// oxigdal_cluster/data_locality.rs
//! Data locality optimization for minimizing data transfer.
//!
//! This module implements data locality tracking and optimization to minimize
//! network transfer by placing tasks near their required data.

use crate::error::{ClusterError, Result};
use crate::worker_pool::WorkerId;
use dashmap::DashMap;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

/// Data locality optimizer.
///
/// Cheap to clone: all state lives behind a shared `Arc`, so every clone
/// observes and mutates the same tracking data.
#[derive(Clone)]
pub struct DataLocalityOptimizer {
    inner: Arc<DataLocalityInner>,
}
21
/// Shared state behind [`DataLocalityOptimizer`].
struct DataLocalityInner {
    /// Forward index: data_key -> workers holding a copy.
    data_locations: DashMap<String, HashSet<WorkerId>>,

    /// Inverse index of `data_locations`: worker -> data_keys held.
    worker_data: DashMap<WorkerId, HashSet<String>>,

    /// Access counts per data_key (relaxed atomics; approximate under
    /// concurrent access).
    access_patterns: DashMap<String, AtomicU64>,

    /// Affinity: data_key -> recently-accessing workers, most recent last.
    data_affinity: DashMap<String, Vec<WorkerId>>,

    /// Prefetch schedule: data_key -> workers to prefetch onto.
    prefetch_schedule: RwLock<HashMap<String, Vec<WorkerId>>>,

    /// Configuration (immutable after construction).
    config: LocalityConfig,

    /// Statistics counters.
    stats: Arc<LocalityStatistics>,
}
44
/// Locality optimizer configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalityConfig {
    /// Minimum replication factor suggested for hot data.
    pub min_replication: usize,

    /// Maximum replication factor suggested for very hot data.
    pub max_replication: usize,

    /// Access count at or above which data is considered "hot".
    pub hot_data_threshold: u64,

    /// Enable prefetching (`schedule_prefetch` is a no-op when false).
    pub enable_prefetch: bool,

    /// Prefetch lookahead distance.
    // NOTE(review): not read anywhere in this module — presumably consumed
    // by a scheduler elsewhere; confirm before relying on it.
    pub prefetch_lookahead: usize,

    /// Enable affinity tracking on recorded accesses.
    pub enable_affinity: bool,

    /// Affinity list refresh interval, in number of recorded accesses.
    pub affinity_update_interval: u64,
}
69
70impl Default for LocalityConfig {
71    fn default() -> Self {
72        Self {
73            min_replication: 2,
74            max_replication: 5,
75            hot_data_threshold: 100,
76            enable_prefetch: true,
77            prefetch_lookahead: 10,
78            enable_affinity: true,
79            affinity_update_interval: 10,
80        }
81    }
82}
83
/// Internal locality counters. All counters use relaxed atomics, so reads
/// are approximate under concurrent updates.
#[derive(Debug, Default)]
struct LocalityStatistics {
    /// Placements where the chosen worker already held all required data.
    locality_hits: AtomicU64,

    /// Placements where at least one required key was missing locally.
    locality_misses: AtomicU64,

    /// Data transfers recorded via `record_transfer`.
    data_transfers: AtomicU64,

    /// Prefetches recorded via `record_prefetch`.
    prefetches: AtomicU64,

    /// Total bytes recorded as transferred.
    bytes_transferred: AtomicU64,
}
102
/// Data location information snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataLocation {
    /// Data key.
    pub key: String,

    /// Workers that currently hold this data.
    pub workers: Vec<WorkerId>,

    /// Data size (bytes).
    // NOTE(review): sizes are not tracked yet; `get_data_location` always
    // fills this with 0.
    pub size_bytes: u64,

    /// Total recorded access count for this key.
    pub access_count: u64,

    /// Last accessed time. Skipped in serde because `Instant` is not
    /// serializable; currently always populated as `None`.
    #[serde(skip)]
    pub last_accessed: Option<Instant>,
}
122
/// Task placement recommendation produced by `recommend_placement`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlacementRecommendation {
    /// Recommended worker.
    pub worker_id: WorkerId,

    /// Locality score (0.0–1.0, higher is better): fraction of required
    /// data already resident on `worker_id`.
    pub locality_score: f64,

    /// Required data already on the worker.
    pub local_data: Vec<String>,

    /// Required data that must be transferred to the worker.
    pub transfer_data: Vec<String>,

    /// Estimated transfer size (bytes).
    // NOTE(review): always 0 for now — data sizes are not tracked.
    pub estimated_transfer_bytes: u64,
}
141
142impl DataLocalityOptimizer {
143    /// Create a new data locality optimizer.
144    pub fn new(config: LocalityConfig) -> Self {
145        Self {
146            inner: Arc::new(DataLocalityInner {
147                data_locations: DashMap::new(),
148                worker_data: DashMap::new(),
149                access_patterns: DashMap::new(),
150                data_affinity: DashMap::new(),
151                prefetch_schedule: RwLock::new(HashMap::new()),
152                config,
153                stats: Arc::new(LocalityStatistics::default()),
154            }),
155        }
156    }
157
158    /// Create with default configuration.
159    pub fn with_defaults() -> Self {
160        Self::new(LocalityConfig::default())
161    }
162
163    /// Register data location.
164    pub fn register_data(&self, data_key: String, worker_id: WorkerId) -> Result<()> {
165        // Add to data locations
166        self.inner
167            .data_locations
168            .entry(data_key.clone())
169            .or_default()
170            .insert(worker_id);
171
172        // Add to worker data inventory
173        self.inner
174            .worker_data
175            .entry(worker_id)
176            .or_default()
177            .insert(data_key.clone());
178
179        // Initialize access pattern
180        self.inner
181            .access_patterns
182            .entry(data_key)
183            .or_insert_with(|| AtomicU64::new(0));
184
185        Ok(())
186    }
187
188    /// Unregister data location.
189    pub fn unregister_data(&self, data_key: &str, worker_id: WorkerId) -> Result<()> {
190        // Remove from data locations
191        if let Some(mut locations) = self.inner.data_locations.get_mut(data_key) {
192            locations.remove(&worker_id);
193        }
194
195        // Remove from worker data inventory
196        if let Some(mut worker_data) = self.inner.worker_data.get_mut(&worker_id) {
197            worker_data.remove(data_key);
198        }
199
200        Ok(())
201    }
202
203    /// Record data access.
204    pub fn record_access(&self, data_key: &str, worker_id: WorkerId) -> Result<()> {
205        // Increment access count
206        let access_count = self
207            .inner
208            .access_patterns
209            .entry(data_key.to_string())
210            .or_insert_with(|| AtomicU64::new(0))
211            .fetch_add(1, Ordering::Relaxed)
212            + 1;
213
214        // Update affinity if enabled
215        if self.inner.config.enable_affinity
216            && access_count % self.inner.config.affinity_update_interval == 0
217        {
218            self.update_affinity(data_key, worker_id)?;
219        }
220
221        Ok(())
222    }
223
224    /// Update data affinity.
225    fn update_affinity(&self, data_key: &str, worker_id: WorkerId) -> Result<()> {
226        let mut affinity = self
227            .inner
228            .data_affinity
229            .entry(data_key.to_string())
230            .or_default();
231
232        // Add worker if not already in affinity list
233        if !affinity.contains(&worker_id) {
234            affinity.push(worker_id);
235
236            // Keep only top N workers
237            if affinity.len() > 5 {
238                affinity.remove(0);
239            }
240        } else {
241            // Move to end (most recent)
242            if let Some(pos) = affinity.iter().position(|&id| id == worker_id) {
243                affinity.remove(pos);
244                affinity.push(worker_id);
245            }
246        }
247
248        Ok(())
249    }
250
251    /// Get workers that have specific data.
252    pub fn get_workers_with_data(&self, data_key: &str) -> Vec<WorkerId> {
253        self.inner
254            .data_locations
255            .get(data_key)
256            .map(|locations| locations.iter().copied().collect())
257            .unwrap_or_default()
258    }
259
260    /// Get data available on a worker.
261    pub fn get_worker_data(&self, worker_id: WorkerId) -> Vec<String> {
262        self.inner
263            .worker_data
264            .get(&worker_id)
265            .map(|data| data.iter().cloned().collect())
266            .unwrap_or_default()
267    }
268
269    /// Get data location information.
270    pub fn get_data_location(&self, data_key: &str) -> Option<DataLocation> {
271        let workers = self
272            .inner
273            .data_locations
274            .get(data_key)?
275            .iter()
276            .copied()
277            .collect();
278
279        let access_count = self
280            .inner
281            .access_patterns
282            .get(data_key)
283            .map(|c| c.load(Ordering::Relaxed))
284            .unwrap_or(0);
285
286        Some(DataLocation {
287            key: data_key.to_string(),
288            workers,
289            size_bytes: 0, // Would need to track separately
290            access_count,
291            last_accessed: None,
292        })
293    }
294
295    /// Recommend task placement based on data locality.
296    pub fn recommend_placement(
297        &self,
298        required_data: &[String],
299        candidate_workers: &[WorkerId],
300    ) -> Result<PlacementRecommendation> {
301        if candidate_workers.is_empty() {
302            return Err(ClusterError::DataLocalityError(
303                "No candidate workers provided".to_string(),
304            ));
305        }
306
307        let mut best_worker = candidate_workers[0];
308        let mut best_score = 0.0;
309        let mut best_local = Vec::new();
310        let mut best_transfer = Vec::new();
311
312        for &worker_id in candidate_workers {
313            let worker_data = self.get_worker_data(worker_id);
314            let worker_data_set: HashSet<_> = worker_data.iter().collect();
315
316            let mut local_data = Vec::new();
317            let mut transfer_data = Vec::new();
318
319            for data_key in required_data {
320                if worker_data_set.contains(&data_key) {
321                    local_data.push(data_key.clone());
322                } else {
323                    transfer_data.push(data_key.clone());
324                }
325            }
326
327            // Calculate locality score
328            let locality_score = if required_data.is_empty() {
329                1.0
330            } else {
331                local_data.len() as f64 / required_data.len() as f64
332            };
333
334            if locality_score > best_score {
335                best_score = locality_score;
336                best_worker = worker_id;
337                best_local = local_data;
338                best_transfer = transfer_data;
339            }
340        }
341
342        // Record hit or miss
343        if best_score == 1.0 {
344            self.inner
345                .stats
346                .locality_hits
347                .fetch_add(1, Ordering::Relaxed);
348        } else {
349            self.inner
350                .stats
351                .locality_misses
352                .fetch_add(1, Ordering::Relaxed);
353        }
354
355        Ok(PlacementRecommendation {
356            worker_id: best_worker,
357            locality_score: best_score,
358            local_data: best_local,
359            transfer_data: best_transfer,
360            estimated_transfer_bytes: 0, // Would need data size info
361        })
362    }
363
364    /// Schedule data prefetch for upcoming tasks.
365    pub fn schedule_prefetch(&self, data_key: String, target_workers: Vec<WorkerId>) -> Result<()> {
366        if !self.inner.config.enable_prefetch {
367            return Ok(());
368        }
369
370        let mut schedule = self.inner.prefetch_schedule.write();
371        schedule.insert(data_key, target_workers);
372
373        Ok(())
374    }
375
376    /// Get prefetch schedule.
377    pub fn get_prefetch_schedule(&self) -> HashMap<String, Vec<WorkerId>> {
378        self.inner.prefetch_schedule.read().clone()
379    }
380
381    /// Clear prefetch schedule.
382    pub fn clear_prefetch_schedule(&self) {
383        self.inner.prefetch_schedule.write().clear();
384    }
385
386    /// Record data transfer.
387    pub fn record_transfer(
388        &self,
389        data_key: &str,
390        _from_worker: WorkerId,
391        to_worker: WorkerId,
392        bytes: u64,
393    ) -> Result<()> {
394        self.inner
395            .stats
396            .data_transfers
397            .fetch_add(1, Ordering::Relaxed);
398
399        self.inner
400            .stats
401            .bytes_transferred
402            .fetch_add(bytes, Ordering::Relaxed);
403
404        // Register data at new location
405        self.register_data(data_key.to_string(), to_worker)?;
406
407        Ok(())
408    }
409
410    /// Record prefetch.
411    pub fn record_prefetch(&self) {
412        self.inner.stats.prefetches.fetch_add(1, Ordering::Relaxed);
413    }
414
415    /// Get hot data (frequently accessed).
416    pub fn get_hot_data(&self) -> Vec<(String, u64)> {
417        let mut hot_data: Vec<_> = self
418            .inner
419            .access_patterns
420            .iter()
421            .filter_map(|entry| {
422                let key = entry.key().clone();
423                let count = entry.value().load(Ordering::Relaxed);
424                if count >= self.inner.config.hot_data_threshold {
425                    Some((key, count))
426                } else {
427                    None
428                }
429            })
430            .collect();
431
432        hot_data.sort_by_key(|x| std::cmp::Reverse(x.1));
433        hot_data
434    }
435
436    /// Suggest replication for hot data.
437    pub fn suggest_replication(&self) -> Vec<(String, usize)> {
438        let mut suggestions = Vec::new();
439
440        for (data_key, access_count) in self.get_hot_data() {
441            let current_replication = self
442                .inner
443                .data_locations
444                .get(&data_key)
445                .map(|locs| locs.len())
446                .unwrap_or(0);
447
448            // Suggest higher replication for hot data
449            let suggested_replication = if access_count > self.inner.config.hot_data_threshold * 10
450            {
451                self.inner.config.max_replication
452            } else if access_count > self.inner.config.hot_data_threshold * 5 {
453                (self.inner.config.max_replication + self.inner.config.min_replication) / 2
454            } else {
455                self.inner.config.min_replication
456            };
457
458            if suggested_replication > current_replication {
459                suggestions.push((data_key, suggested_replication - current_replication));
460            }
461        }
462
463        suggestions
464    }
465
466    /// Get affinity for data.
467    pub fn get_affinity(&self, data_key: &str) -> Vec<WorkerId> {
468        self.inner
469            .data_affinity
470            .get(data_key)
471            .map(|affinity| affinity.clone())
472            .unwrap_or_default()
473    }
474
475    /// Remove worker from all data tracking.
476    pub fn remove_worker(&self, worker_id: WorkerId) -> Result<()> {
477        // Remove from all data locations
478        if let Some((_, data_keys)) = self.inner.worker_data.remove(&worker_id) {
479            for data_key in data_keys {
480                if let Some(mut locations) = self.inner.data_locations.get_mut(&data_key) {
481                    locations.remove(&worker_id);
482                }
483            }
484        }
485
486        // Remove from affinity lists
487        for mut affinity in self.inner.data_affinity.iter_mut() {
488            affinity.retain(|&id| id != worker_id);
489        }
490
491        Ok(())
492    }
493
494    /// Get locality statistics.
495    pub fn get_statistics(&self) -> LocalityStats {
496        let locality_hits = self.inner.stats.locality_hits.load(Ordering::Relaxed);
497        let locality_misses = self.inner.stats.locality_misses.load(Ordering::Relaxed);
498
499        let total_placements = locality_hits + locality_misses;
500        let hit_rate = if total_placements > 0 {
501            locality_hits as f64 / total_placements as f64
502        } else {
503            0.0
504        };
505
506        LocalityStats {
507            locality_hits,
508            locality_misses,
509            hit_rate,
510            data_transfers: self.inner.stats.data_transfers.load(Ordering::Relaxed),
511            prefetches: self.inner.stats.prefetches.load(Ordering::Relaxed),
512            bytes_transferred: self.inner.stats.bytes_transferred.load(Ordering::Relaxed),
513            tracked_data_keys: self.inner.data_locations.len(),
514            tracked_workers: self.inner.worker_data.len(),
515        }
516    }
517
518    /// Reset statistics.
519    pub fn reset_statistics(&self) {
520        self.inner.stats.locality_hits.store(0, Ordering::Relaxed);
521        self.inner.stats.locality_misses.store(0, Ordering::Relaxed);
522        self.inner.stats.data_transfers.store(0, Ordering::Relaxed);
523        self.inner.stats.prefetches.store(0, Ordering::Relaxed);
524        self.inner
525            .stats
526            .bytes_transferred
527            .store(0, Ordering::Relaxed);
528    }
529}
530
/// Point-in-time snapshot of locality statistics, produced by
/// `DataLocalityOptimizer::get_statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalityStats {
    /// Placements where the chosen worker already held all required data.
    pub locality_hits: u64,

    /// Placements where at least one required key was missing locally.
    pub locality_misses: u64,

    /// Hit rate (0.0–1.0); 0.0 when no placements have been recorded.
    pub hit_rate: f64,

    /// Data transfers recorded.
    pub data_transfers: u64,

    /// Prefetches recorded.
    pub prefetches: u64,

    /// Total bytes transferred.
    pub bytes_transferred: u64,

    /// Number of tracked data keys.
    pub tracked_data_keys: usize,

    /// Number of tracked workers.
    pub tracked_workers: usize,
}
558
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A freshly-built optimizer starts with zeroed statistics.
    #[test]
    fn test_locality_optimizer_creation() {
        let opt = DataLocalityOptimizer::with_defaults();
        assert_eq!(opt.get_statistics().locality_hits, 0);
    }

    /// Registering data makes the worker discoverable by key.
    #[test]
    fn test_register_data() {
        let opt = DataLocalityOptimizer::with_defaults();
        let worker = WorkerId::new();

        assert!(opt.register_data("data1".to_string(), worker).is_ok());

        let holders = opt.get_workers_with_data("data1");
        assert_eq!(holders, vec![worker]);
    }

    /// Access counts accumulate per key.
    #[test]
    fn test_data_access_tracking() {
        let opt = DataLocalityOptimizer::with_defaults();
        let worker = WorkerId::new();

        opt.register_data("data1".to_string(), worker).ok();
        for _ in 0..2 {
            opt.record_access("data1", worker).ok();
        }

        match opt.get_data_location("data1") {
            Some(loc) => assert_eq!(loc.access_count, 2),
            None => panic!("location should exist"),
        }
    }

    /// The worker holding all required keys wins with a perfect score.
    #[test]
    fn test_placement_recommendation() {
        let opt = DataLocalityOptimizer::with_defaults();
        let (alpha, beta) = (WorkerId::new(), WorkerId::new());

        opt.register_data("data1".to_string(), alpha).ok();
        opt.register_data("data2".to_string(), alpha).ok();
        opt.register_data("data3".to_string(), beta).ok();

        let needed = ["data1".to_string(), "data2".to_string()];
        let rec = opt
            .recommend_placement(&needed, &[alpha, beta])
            .expect("placement should succeed");

        assert_eq!(rec.worker_id, alpha);
        assert_eq!(rec.locality_score, 1.0);
    }

    /// Keys accessed past the threshold show up first in the hot list.
    #[test]
    fn test_hot_data_detection() {
        let opt = DataLocalityOptimizer::with_defaults();
        let worker = WorkerId::new();

        opt.register_data("hot_data".to_string(), worker).ok();

        // Exceed the default hot threshold (100 accesses).
        for _ in 0..150 {
            opt.record_access("hot_data", worker).ok();
        }

        let hot = opt.get_hot_data();
        assert_eq!(hot.first().map(|(k, _)| k.as_str()), Some("hot_data"));
    }
}