1use crate::error::{ClusterError, Result};
7use crate::worker_pool::WorkerId;
8use dashmap::DashMap;
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet};
12use std::sync::Arc;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::time::Instant;
15
16#[derive(Clone)]
18pub struct DataLocalityOptimizer {
19 inner: Arc<DataLocalityInner>,
20}
21
22struct DataLocalityInner {
23 data_locations: DashMap<String, HashSet<WorkerId>>,
25
26 worker_data: DashMap<WorkerId, HashSet<String>>,
28
29 access_patterns: DashMap<String, AtomicU64>,
31
32 data_affinity: DashMap<String, Vec<WorkerId>>,
34
35 prefetch_schedule: RwLock<HashMap<String, Vec<WorkerId>>>,
37
38 config: LocalityConfig,
40
41 stats: Arc<LocalityStatistics>,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct LocalityConfig {
48 pub min_replication: usize,
50
51 pub max_replication: usize,
53
54 pub hot_data_threshold: u64,
56
57 pub enable_prefetch: bool,
59
60 pub prefetch_lookahead: usize,
62
63 pub enable_affinity: bool,
65
66 pub affinity_update_interval: u64,
68}
69
70impl Default for LocalityConfig {
71 fn default() -> Self {
72 Self {
73 min_replication: 2,
74 max_replication: 5,
75 hot_data_threshold: 100,
76 enable_prefetch: true,
77 prefetch_lookahead: 10,
78 enable_affinity: true,
79 affinity_update_interval: 10,
80 }
81 }
82}
83
/// Atomic counters describing placement and transfer activity.
///
/// `Default` yields all counters at zero.
#[derive(Default, Debug)]
struct LocalityStatistics {
    /// Placement recommendations whose chosen worker held all required data.
    locality_hits: AtomicU64,

    /// Placement recommendations that would need at least one transfer.
    locality_misses: AtomicU64,

    /// Number of recorded data transfers.
    data_transfers: AtomicU64,

    /// Number of recorded prefetches.
    prefetches: AtomicU64,

    /// Sum of the byte counts reported with recorded transfers.
    bytes_transferred: AtomicU64,
}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct DataLocation {
106 pub key: String,
108
109 pub workers: Vec<WorkerId>,
111
112 pub size_bytes: u64,
114
115 pub access_count: u64,
117
118 #[serde(skip)]
120 pub last_accessed: Option<Instant>,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct PlacementRecommendation {
126 pub worker_id: WorkerId,
128
129 pub locality_score: f64,
131
132 pub local_data: Vec<String>,
134
135 pub transfer_data: Vec<String>,
137
138 pub estimated_transfer_bytes: u64,
140}
141
142impl DataLocalityOptimizer {
143 pub fn new(config: LocalityConfig) -> Self {
145 Self {
146 inner: Arc::new(DataLocalityInner {
147 data_locations: DashMap::new(),
148 worker_data: DashMap::new(),
149 access_patterns: DashMap::new(),
150 data_affinity: DashMap::new(),
151 prefetch_schedule: RwLock::new(HashMap::new()),
152 config,
153 stats: Arc::new(LocalityStatistics::default()),
154 }),
155 }
156 }
157
158 pub fn with_defaults() -> Self {
160 Self::new(LocalityConfig::default())
161 }
162
163 pub fn register_data(&self, data_key: String, worker_id: WorkerId) -> Result<()> {
165 self.inner
167 .data_locations
168 .entry(data_key.clone())
169 .or_default()
170 .insert(worker_id);
171
172 self.inner
174 .worker_data
175 .entry(worker_id)
176 .or_default()
177 .insert(data_key.clone());
178
179 self.inner
181 .access_patterns
182 .entry(data_key)
183 .or_insert_with(|| AtomicU64::new(0));
184
185 Ok(())
186 }
187
188 pub fn unregister_data(&self, data_key: &str, worker_id: WorkerId) -> Result<()> {
190 if let Some(mut locations) = self.inner.data_locations.get_mut(data_key) {
192 locations.remove(&worker_id);
193 }
194
195 if let Some(mut worker_data) = self.inner.worker_data.get_mut(&worker_id) {
197 worker_data.remove(data_key);
198 }
199
200 Ok(())
201 }
202
203 pub fn record_access(&self, data_key: &str, worker_id: WorkerId) -> Result<()> {
205 let access_count = self
207 .inner
208 .access_patterns
209 .entry(data_key.to_string())
210 .or_insert_with(|| AtomicU64::new(0))
211 .fetch_add(1, Ordering::Relaxed)
212 + 1;
213
214 if self.inner.config.enable_affinity
216 && access_count % self.inner.config.affinity_update_interval == 0
217 {
218 self.update_affinity(data_key, worker_id)?;
219 }
220
221 Ok(())
222 }
223
224 fn update_affinity(&self, data_key: &str, worker_id: WorkerId) -> Result<()> {
226 let mut affinity = self
227 .inner
228 .data_affinity
229 .entry(data_key.to_string())
230 .or_default();
231
232 if !affinity.contains(&worker_id) {
234 affinity.push(worker_id);
235
236 if affinity.len() > 5 {
238 affinity.remove(0);
239 }
240 } else {
241 if let Some(pos) = affinity.iter().position(|&id| id == worker_id) {
243 affinity.remove(pos);
244 affinity.push(worker_id);
245 }
246 }
247
248 Ok(())
249 }
250
251 pub fn get_workers_with_data(&self, data_key: &str) -> Vec<WorkerId> {
253 self.inner
254 .data_locations
255 .get(data_key)
256 .map(|locations| locations.iter().copied().collect())
257 .unwrap_or_default()
258 }
259
260 pub fn get_worker_data(&self, worker_id: WorkerId) -> Vec<String> {
262 self.inner
263 .worker_data
264 .get(&worker_id)
265 .map(|data| data.iter().cloned().collect())
266 .unwrap_or_default()
267 }
268
269 pub fn get_data_location(&self, data_key: &str) -> Option<DataLocation> {
271 let workers = self
272 .inner
273 .data_locations
274 .get(data_key)?
275 .iter()
276 .copied()
277 .collect();
278
279 let access_count = self
280 .inner
281 .access_patterns
282 .get(data_key)
283 .map(|c| c.load(Ordering::Relaxed))
284 .unwrap_or(0);
285
286 Some(DataLocation {
287 key: data_key.to_string(),
288 workers,
289 size_bytes: 0, access_count,
291 last_accessed: None,
292 })
293 }
294
295 pub fn recommend_placement(
297 &self,
298 required_data: &[String],
299 candidate_workers: &[WorkerId],
300 ) -> Result<PlacementRecommendation> {
301 if candidate_workers.is_empty() {
302 return Err(ClusterError::DataLocalityError(
303 "No candidate workers provided".to_string(),
304 ));
305 }
306
307 let mut best_worker = candidate_workers[0];
308 let mut best_score = 0.0;
309 let mut best_local = Vec::new();
310 let mut best_transfer = Vec::new();
311
312 for &worker_id in candidate_workers {
313 let worker_data = self.get_worker_data(worker_id);
314 let worker_data_set: HashSet<_> = worker_data.iter().collect();
315
316 let mut local_data = Vec::new();
317 let mut transfer_data = Vec::new();
318
319 for data_key in required_data {
320 if worker_data_set.contains(&data_key) {
321 local_data.push(data_key.clone());
322 } else {
323 transfer_data.push(data_key.clone());
324 }
325 }
326
327 let locality_score = if required_data.is_empty() {
329 1.0
330 } else {
331 local_data.len() as f64 / required_data.len() as f64
332 };
333
334 if locality_score > best_score {
335 best_score = locality_score;
336 best_worker = worker_id;
337 best_local = local_data;
338 best_transfer = transfer_data;
339 }
340 }
341
342 if best_score == 1.0 {
344 self.inner
345 .stats
346 .locality_hits
347 .fetch_add(1, Ordering::Relaxed);
348 } else {
349 self.inner
350 .stats
351 .locality_misses
352 .fetch_add(1, Ordering::Relaxed);
353 }
354
355 Ok(PlacementRecommendation {
356 worker_id: best_worker,
357 locality_score: best_score,
358 local_data: best_local,
359 transfer_data: best_transfer,
360 estimated_transfer_bytes: 0, })
362 }
363
364 pub fn schedule_prefetch(&self, data_key: String, target_workers: Vec<WorkerId>) -> Result<()> {
366 if !self.inner.config.enable_prefetch {
367 return Ok(());
368 }
369
370 let mut schedule = self.inner.prefetch_schedule.write();
371 schedule.insert(data_key, target_workers);
372
373 Ok(())
374 }
375
376 pub fn get_prefetch_schedule(&self) -> HashMap<String, Vec<WorkerId>> {
378 self.inner.prefetch_schedule.read().clone()
379 }
380
381 pub fn clear_prefetch_schedule(&self) {
383 self.inner.prefetch_schedule.write().clear();
384 }
385
386 pub fn record_transfer(
388 &self,
389 data_key: &str,
390 _from_worker: WorkerId,
391 to_worker: WorkerId,
392 bytes: u64,
393 ) -> Result<()> {
394 self.inner
395 .stats
396 .data_transfers
397 .fetch_add(1, Ordering::Relaxed);
398
399 self.inner
400 .stats
401 .bytes_transferred
402 .fetch_add(bytes, Ordering::Relaxed);
403
404 self.register_data(data_key.to_string(), to_worker)?;
406
407 Ok(())
408 }
409
410 pub fn record_prefetch(&self) {
412 self.inner.stats.prefetches.fetch_add(1, Ordering::Relaxed);
413 }
414
415 pub fn get_hot_data(&self) -> Vec<(String, u64)> {
417 let mut hot_data: Vec<_> = self
418 .inner
419 .access_patterns
420 .iter()
421 .filter_map(|entry| {
422 let key = entry.key().clone();
423 let count = entry.value().load(Ordering::Relaxed);
424 if count >= self.inner.config.hot_data_threshold {
425 Some((key, count))
426 } else {
427 None
428 }
429 })
430 .collect();
431
432 hot_data.sort_by_key(|x| std::cmp::Reverse(x.1));
433 hot_data
434 }
435
436 pub fn suggest_replication(&self) -> Vec<(String, usize)> {
438 let mut suggestions = Vec::new();
439
440 for (data_key, access_count) in self.get_hot_data() {
441 let current_replication = self
442 .inner
443 .data_locations
444 .get(&data_key)
445 .map(|locs| locs.len())
446 .unwrap_or(0);
447
448 let suggested_replication = if access_count > self.inner.config.hot_data_threshold * 10
450 {
451 self.inner.config.max_replication
452 } else if access_count > self.inner.config.hot_data_threshold * 5 {
453 (self.inner.config.max_replication + self.inner.config.min_replication) / 2
454 } else {
455 self.inner.config.min_replication
456 };
457
458 if suggested_replication > current_replication {
459 suggestions.push((data_key, suggested_replication - current_replication));
460 }
461 }
462
463 suggestions
464 }
465
466 pub fn get_affinity(&self, data_key: &str) -> Vec<WorkerId> {
468 self.inner
469 .data_affinity
470 .get(data_key)
471 .map(|affinity| affinity.clone())
472 .unwrap_or_default()
473 }
474
475 pub fn remove_worker(&self, worker_id: WorkerId) -> Result<()> {
477 if let Some((_, data_keys)) = self.inner.worker_data.remove(&worker_id) {
479 for data_key in data_keys {
480 if let Some(mut locations) = self.inner.data_locations.get_mut(&data_key) {
481 locations.remove(&worker_id);
482 }
483 }
484 }
485
486 for mut affinity in self.inner.data_affinity.iter_mut() {
488 affinity.retain(|&id| id != worker_id);
489 }
490
491 Ok(())
492 }
493
494 pub fn get_statistics(&self) -> LocalityStats {
496 let locality_hits = self.inner.stats.locality_hits.load(Ordering::Relaxed);
497 let locality_misses = self.inner.stats.locality_misses.load(Ordering::Relaxed);
498
499 let total_placements = locality_hits + locality_misses;
500 let hit_rate = if total_placements > 0 {
501 locality_hits as f64 / total_placements as f64
502 } else {
503 0.0
504 };
505
506 LocalityStats {
507 locality_hits,
508 locality_misses,
509 hit_rate,
510 data_transfers: self.inner.stats.data_transfers.load(Ordering::Relaxed),
511 prefetches: self.inner.stats.prefetches.load(Ordering::Relaxed),
512 bytes_transferred: self.inner.stats.bytes_transferred.load(Ordering::Relaxed),
513 tracked_data_keys: self.inner.data_locations.len(),
514 tracked_workers: self.inner.worker_data.len(),
515 }
516 }
517
518 pub fn reset_statistics(&self) {
520 self.inner.stats.locality_hits.store(0, Ordering::Relaxed);
521 self.inner.stats.locality_misses.store(0, Ordering::Relaxed);
522 self.inner.stats.data_transfers.store(0, Ordering::Relaxed);
523 self.inner.stats.prefetches.store(0, Ordering::Relaxed);
524 self.inner
525 .stats
526 .bytes_transferred
527 .store(0, Ordering::Relaxed);
528 }
529}
530
531#[derive(Debug, Clone, Serialize, Deserialize)]
533pub struct LocalityStats {
534 pub locality_hits: u64,
536
537 pub locality_misses: u64,
539
540 pub hit_rate: f64,
542
543 pub data_transfers: u64,
545
546 pub prefetches: u64,
548
549 pub bytes_transferred: u64,
551
552 pub tracked_data_keys: usize,
554
555 pub tracked_workers: usize,
557}
558
#[cfg(test)]
// Tests fail loudly on unexpected errors, so `expect`/`unwrap` are intended.
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A fresh optimizer starts with zeroed counters and empty tables.
    #[test]
    fn test_locality_optimizer_creation() {
        let optimizer = DataLocalityOptimizer::with_defaults();
        let stats = optimizer.get_statistics();

        assert_eq!(stats.locality_hits, 0);
        assert_eq!(stats.locality_misses, 0);
        assert_eq!(stats.tracked_data_keys, 0);
        assert_eq!(stats.tracked_workers, 0);
    }

    /// Registering data makes the worker discoverable through the key.
    #[test]
    fn test_register_data() {
        let optimizer = DataLocalityOptimizer::with_defaults();
        let worker_id = WorkerId::new();

        optimizer
            .register_data("data1".to_string(), worker_id)
            .expect("registering data should succeed");

        let workers = optimizer.get_workers_with_data("data1");
        assert_eq!(workers.len(), 1);
        assert_eq!(workers[0], worker_id);
    }

    /// Every recorded access is reflected in the location snapshot.
    #[test]
    fn test_data_access_tracking() {
        let optimizer = DataLocalityOptimizer::with_defaults();
        let worker_id = WorkerId::new();

        optimizer
            .register_data("data1".to_string(), worker_id)
            .expect("registering data should succeed");
        for _ in 0..2 {
            optimizer
                .record_access("data1", worker_id)
                .expect("recording an access should succeed");
        }

        let location = optimizer
            .get_data_location("data1")
            .expect("registered data should have a location");
        assert_eq!(location.access_count, 2);
    }

    /// The worker that already holds all required data is recommended.
    #[test]
    fn test_placement_recommendation() {
        let optimizer = DataLocalityOptimizer::with_defaults();

        let worker1 = WorkerId::new();
        let worker2 = WorkerId::new();

        for (key, worker) in [("data1", worker1), ("data2", worker1), ("data3", worker2)] {
            optimizer
                .register_data(key.to_string(), worker)
                .expect("registering data should succeed");
        }

        let required_data = vec!["data1".to_string(), "data2".to_string()];
        let candidates = vec![worker1, worker2];

        let rec = optimizer
            .recommend_placement(&required_data, &candidates)
            .expect("placement with candidates should succeed");

        assert_eq!(rec.worker_id, worker1);
        assert_eq!(rec.locality_score, 1.0);
        assert!(rec.transfer_data.is_empty());
    }

    /// Data accessed more often than the default threshold is reported hot.
    #[test]
    fn test_hot_data_detection() {
        let optimizer = DataLocalityOptimizer::with_defaults();
        let worker_id = WorkerId::new();

        optimizer
            .register_data("hot_data".to_string(), worker_id)
            .expect("registering data should succeed");

        // 150 accesses exceed the default hot-data threshold of 100.
        for _ in 0..150 {
            optimizer
                .record_access("hot_data", worker_id)
                .expect("recording an access should succeed");
        }

        let hot_data = optimizer.get_hot_data();
        assert!(!hot_data.is_empty());
        assert_eq!(hot_data[0].0, "hot_data");
        assert_eq!(hot_data[0].1, 150);
    }
}