// oxigdal_cluster/replication.rs

1//! Data replication for reliability and availability.
2//!
3//! This module implements data replication with configurable replication factor,
4//! quorum-based reads/writes, replica placement strategy, and automatic re-replication.
5
6use crate::error::{ClusterError, Result};
7use crate::worker_pool::WorkerId;
8use dashmap::DashMap;
9use serde::{Deserialize, Serialize};
10use std::collections::HashSet;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::time::Instant;
14use tracing::{debug, info, warn};
15
/// Replication manager.
///
/// Cheap to clone: all clones share the same inner state through an `Arc`,
/// so a clone can be handed to each task that needs replication decisions.
#[derive(Clone)]
pub struct ReplicationManager {
    inner: Arc<ReplicationInner>,
}
21
/// Shared state behind a [`ReplicationManager`].
struct ReplicationInner {
    /// Replica locations, keyed by data ID.
    replicas: DashMap<String, ReplicaSet>,

    /// Last-known worker health, consulted for placement decisions.
    worker_health: DashMap<WorkerId, WorkerHealth>,

    /// Configuration (immutable after construction).
    config: ReplicationConfig,

    /// Counters for replication activity.
    stats: Arc<ReplicationStats>,
}
35
/// Replication configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConfig {
    /// Default replication factor (number of replicas per data item).
    pub replication_factor: usize,

    /// Minimum number of workers required before replicas can be placed.
    pub min_replicas: usize,

    /// Maximum replication factor.
    pub max_replicas: usize,

    /// Read quorum size: a quorum read needs at least this many healthy replicas.
    pub read_quorum: usize,

    /// Write quorum size: a quorum write needs at least this many healthy replicas.
    pub write_quorum: usize,

    /// Replica placement strategy.
    pub placement_strategy: PlacementStrategy,

    /// Enable automatic re-replication when healthy replicas fall below
    /// the replication factor.
    pub auto_rereplication: bool,

    /// Interval between re-replication checks.
    pub rereplication_interval: std::time::Duration,

    /// Rack awareness (spread replicas across racks).
    pub rack_aware: bool,
}
66
67impl Default for ReplicationConfig {
68    fn default() -> Self {
69        Self {
70            replication_factor: 3,
71            min_replicas: 2,
72            max_replicas: 5,
73            read_quorum: 2,
74            write_quorum: 2,
75            placement_strategy: PlacementStrategy::Random,
76            auto_rereplication: true,
77            rereplication_interval: std::time::Duration::from_secs(60),
78            rack_aware: false,
79        }
80    }
81}
82
/// Replica placement strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PlacementStrategy {
    /// Random placement across the available workers.
    Random,

    /// Least loaded workers first (workers with unknown health sort last).
    LeastLoaded,

    /// Rack-aware: at most one replica per rack before doubling up.
    RackAware,

    /// Zone-aware: at most one replica per zone before doubling up.
    ZoneAware,
}
98
/// Replica set for a data item.
#[derive(Debug, Clone)]
pub struct ReplicaSet {
    /// Data ID this set belongs to.
    pub data_id: String,

    /// Replica locations (one entry per worker holding the data).
    pub replicas: Vec<Replica>,

    /// Primary replica (for write coordination); `None` until elected.
    pub primary: Option<WorkerId>,

    /// Data version; starts at 1 when the set is created.
    pub version: u64,

    /// When the set was created.
    pub created_at: Instant,

    /// When the set was last modified (replica added/removed).
    pub last_updated: Instant,

    /// Data size (bytes).
    pub size_bytes: u64,
}
123
/// Individual replica of a data item on one worker.
#[derive(Debug, Clone)]
pub struct Replica {
    /// Worker hosting this replica.
    pub worker_id: WorkerId,

    /// Current replica status.
    pub status: ReplicaStatus,

    /// Data version this replica holds.
    pub version: u64,

    /// When the replica was created.
    pub created_at: Instant,

    /// When the replica was last verified, if ever.
    pub last_verified: Option<Instant>,

    /// Rack ID (copied from worker health; used for rack-aware placement).
    pub rack_id: Option<String>,

    /// Zone ID (copied from worker health; used for zone-aware placement).
    pub zone_id: Option<String>,
}
148
/// Replica status.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicaStatus {
    /// Replica is healthy and counts toward quorums.
    Healthy,

    /// Replica is being created (data transfer in progress).
    Replicating,

    /// Replica is stale (version mismatch with the set).
    Stale,

    /// Replica is unavailable (worker failed or unreachable).
    Unavailable,
}
164
/// Worker health information, used for replica placement decisions.
#[derive(Debug, Clone)]
pub struct WorkerHealth {
    /// Worker ID.
    pub worker_id: WorkerId,

    /// Whether the worker is considered healthy.
    pub healthy: bool,

    /// Load in the range 0.0 (idle) to 1.0 (saturated).
    pub load: f64,

    /// Available storage (bytes).
    pub available_storage: u64,

    /// Rack ID, if the worker's rack is known.
    pub rack_id: Option<String>,

    /// Zone ID, if the worker's zone is known.
    pub zone_id: Option<String>,

    /// When this health record was last updated.
    pub last_updated: Instant,
}
189
/// Internal replication counters (relaxed atomics; approximate under
/// concurrency, which is fine for monitoring).
#[derive(Debug, Default)]
struct ReplicationStats {
    /// Total replicas created.
    replicas_created: AtomicU64,

    /// Replicas removed.
    replicas_removed: AtomicU64,

    /// Re-replications scheduled.
    rereplications: AtomicU64,

    /// Successful quorum reads.
    quorum_reads: AtomicU64,

    /// Successful quorum writes.
    quorum_writes: AtomicU64,

    /// Quorum failures (read or write).
    quorum_failures: AtomicU64,
}
211
212impl ReplicationManager {
213    /// Create a new replication manager.
214    pub fn new(config: ReplicationConfig) -> Self {
215        Self {
216            inner: Arc::new(ReplicationInner {
217                replicas: DashMap::new(),
218                worker_health: DashMap::new(),
219                config,
220                stats: Arc::new(ReplicationStats::default()),
221            }),
222        }
223    }
224
225    /// Create with default configuration.
226    pub fn with_defaults() -> Self {
227        Self::new(ReplicationConfig::default())
228    }
229
230    /// Create replica set for data.
231    pub fn create_replicas(
232        &self,
233        data_id: String,
234        size_bytes: u64,
235        available_workers: &[WorkerId],
236    ) -> Result<ReplicaSet> {
237        if available_workers.len() < self.inner.config.min_replicas {
238            return Err(ClusterError::ReplicaPlacementError(format!(
239                "Not enough workers: need {}, have {}",
240                self.inner.config.min_replicas,
241                available_workers.len()
242            )));
243        }
244
245        // Select workers for replicas
246        let selected_workers =
247            self.select_replica_workers(available_workers, self.inner.config.replication_factor)?;
248
249        let now = Instant::now();
250        let mut replicas = Vec::new();
251
252        for worker_id in selected_workers {
253            let health = self.inner.worker_health.get(&worker_id);
254
255            let replica = Replica {
256                worker_id,
257                status: ReplicaStatus::Healthy,
258                version: 1,
259                created_at: now,
260                last_verified: Some(now),
261                rack_id: health.as_ref().and_then(|h| h.rack_id.clone()),
262                zone_id: health.as_ref().and_then(|h| h.zone_id.clone()),
263            };
264
265            replicas.push(replica);
266
267            self.inner
268                .stats
269                .replicas_created
270                .fetch_add(1, Ordering::Relaxed);
271        }
272
273        let replica_set = ReplicaSet {
274            data_id: data_id.clone(),
275            replicas,
276            primary: None, // Will be elected if needed
277            version: 1,
278            created_at: now,
279            last_updated: now,
280            size_bytes,
281        };
282
283        self.inner.replicas.insert(data_id, replica_set.clone());
284
285        Ok(replica_set)
286    }
287
288    /// Select workers for replica placement.
289    fn select_replica_workers(
290        &self,
291        available: &[WorkerId],
292        count: usize,
293    ) -> Result<Vec<WorkerId>> {
294        let count = count.min(available.len());
295
296        match self.inner.config.placement_strategy {
297            PlacementStrategy::Random => {
298                // Simple random selection
299                Ok(available.iter().take(count).copied().collect())
300            }
301            PlacementStrategy::LeastLoaded => {
302                // Sort by load and select least loaded
303                let mut workers_with_load: Vec<_> = available
304                    .iter()
305                    .map(|&id| {
306                        let load = self
307                            .inner
308                            .worker_health
309                            .get(&id)
310                            .map(|h| h.load)
311                            .unwrap_or(1.0);
312                        (id, load)
313                    })
314                    .collect();
315
316                workers_with_load
317                    .sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
318
319                Ok(workers_with_load
320                    .into_iter()
321                    .take(count)
322                    .map(|(id, _)| id)
323                    .collect())
324            }
325            PlacementStrategy::RackAware => {
326                // Spread across racks
327                self.select_rack_aware(available, count)
328            }
329            PlacementStrategy::ZoneAware => {
330                // Spread across zones
331                self.select_zone_aware(available, count)
332            }
333        }
334    }
335
336    /// Select workers with rack awareness.
337    fn select_rack_aware(&self, available: &[WorkerId], count: usize) -> Result<Vec<WorkerId>> {
338        let mut selected = Vec::new();
339        let mut used_racks = HashSet::new();
340
341        // First pass: select one from each rack
342        for &worker_id in available {
343            if selected.len() >= count {
344                break;
345            }
346
347            if let Some(health) = self.inner.worker_health.get(&worker_id) {
348                if let Some(rack_id) = &health.rack_id {
349                    if !used_racks.contains(rack_id) {
350                        selected.push(worker_id);
351                        used_racks.insert(rack_id.clone());
352                    }
353                }
354            }
355        }
356
357        // Second pass: fill remaining slots
358        for &worker_id in available {
359            if selected.len() >= count {
360                break;
361            }
362            if !selected.contains(&worker_id) {
363                selected.push(worker_id);
364            }
365        }
366
367        Ok(selected)
368    }
369
370    /// Select workers with zone awareness.
371    fn select_zone_aware(&self, available: &[WorkerId], count: usize) -> Result<Vec<WorkerId>> {
372        let mut selected = Vec::new();
373        let mut used_zones = HashSet::new();
374
375        // First pass: select one from each zone
376        for &worker_id in available {
377            if selected.len() >= count {
378                break;
379            }
380
381            if let Some(health) = self.inner.worker_health.get(&worker_id) {
382                if let Some(zone_id) = &health.zone_id {
383                    if !used_zones.contains(zone_id) {
384                        selected.push(worker_id);
385                        used_zones.insert(zone_id.clone());
386                    }
387                }
388            }
389        }
390
391        // Second pass: fill remaining slots
392        for &worker_id in available {
393            if selected.len() >= count {
394                break;
395            }
396            if !selected.contains(&worker_id) {
397                selected.push(worker_id);
398            }
399        }
400
401        Ok(selected)
402    }
403
404    /// Get replica set for data.
405    pub fn get_replicas(&self, data_id: &str) -> Option<ReplicaSet> {
406        self.inner.replicas.get(data_id).map(|r| r.clone())
407    }
408
409    /// Perform quorum read.
410    pub async fn quorum_read(&self, data_id: &str) -> Result<Vec<WorkerId>> {
411        let replica_set = self
412            .get_replicas(data_id)
413            .ok_or_else(|| ClusterError::DataNotAvailable(data_id.to_string()))?;
414
415        let healthy_replicas: Vec<_> = replica_set
416            .replicas
417            .iter()
418            .filter(|r| r.status == ReplicaStatus::Healthy)
419            .map(|r| r.worker_id)
420            .collect();
421
422        if healthy_replicas.len() < self.inner.config.read_quorum {
423            self.inner
424                .stats
425                .quorum_failures
426                .fetch_add(1, Ordering::Relaxed);
427
428            return Err(ClusterError::QuorumNotReached {
429                required: self.inner.config.read_quorum,
430                actual: healthy_replicas.len(),
431            });
432        }
433
434        self.inner
435            .stats
436            .quorum_reads
437            .fetch_add(1, Ordering::Relaxed);
438
439        Ok(healthy_replicas
440            .into_iter()
441            .take(self.inner.config.read_quorum)
442            .collect())
443    }
444
445    /// Perform quorum write.
446    pub async fn quorum_write(&self, data_id: &str) -> Result<Vec<WorkerId>> {
447        let replica_set = self
448            .get_replicas(data_id)
449            .ok_or_else(|| ClusterError::DataNotAvailable(data_id.to_string()))?;
450
451        let healthy_replicas: Vec<_> = replica_set
452            .replicas
453            .iter()
454            .filter(|r| r.status == ReplicaStatus::Healthy)
455            .map(|r| r.worker_id)
456            .collect();
457
458        if healthy_replicas.len() < self.inner.config.write_quorum {
459            self.inner
460                .stats
461                .quorum_failures
462                .fetch_add(1, Ordering::Relaxed);
463
464            return Err(ClusterError::QuorumNotReached {
465                required: self.inner.config.write_quorum,
466                actual: healthy_replicas.len(),
467            });
468        }
469
470        self.inner
471            .stats
472            .quorum_writes
473            .fetch_add(1, Ordering::Relaxed);
474
475        Ok(healthy_replicas
476            .into_iter()
477            .take(self.inner.config.write_quorum)
478            .collect())
479    }
480
481    /// Update worker health.
482    pub fn update_worker_health(&self, health: WorkerHealth) {
483        self.inner.worker_health.insert(health.worker_id, health);
484    }
485
486    /// Mark replica as failed.
487    pub fn mark_replica_failed(&self, data_id: &str, worker_id: WorkerId) -> Result<()> {
488        if let Some(mut replica_set) = self.inner.replicas.get_mut(data_id) {
489            for replica in &mut replica_set.replicas {
490                if replica.worker_id == worker_id {
491                    replica.status = ReplicaStatus::Unavailable;
492                    warn!(
493                        "Marked replica as unavailable: {} on {}",
494                        data_id, worker_id
495                    );
496                    break;
497                }
498            }
499        }
500
501        Ok(())
502    }
503
504    /// Check and perform re-replication if needed.
505    pub fn check_rereplication(&self, available_workers: &[WorkerId]) -> Vec<(String, WorkerId)> {
506        if !self.inner.config.auto_rereplication {
507            return Vec::new();
508        }
509
510        let mut rereplications = Vec::new();
511
512        for entry in self.inner.replicas.iter() {
513            let data_id = entry.key().clone();
514            let replica_set = entry.value();
515
516            let healthy_count = replica_set
517                .replicas
518                .iter()
519                .filter(|r| r.status == ReplicaStatus::Healthy)
520                .count();
521
522            if healthy_count < self.inner.config.replication_factor {
523                // Need to create more replicas to maintain replication factor
524                let needed = self.inner.config.replication_factor - healthy_count;
525
526                // Find workers not already having this data
527                let existing: HashSet<_> =
528                    replica_set.replicas.iter().map(|r| r.worker_id).collect();
529
530                let candidates: Vec<_> = available_workers
531                    .iter()
532                    .filter(|w| !existing.contains(w))
533                    .copied()
534                    .collect();
535
536                match self.select_replica_workers(&candidates, needed) {
537                    Ok(new_workers) => {
538                        for worker_id in new_workers {
539                            rereplications.push((data_id.clone(), worker_id));
540
541                            self.inner
542                                .stats
543                                .rereplications
544                                .fetch_add(1, Ordering::Relaxed);
545                        }
546                    }
547                    Err(e) => {
548                        warn!(
549                            "Failed to select workers for re-replication of {}: {}",
550                            data_id, e
551                        );
552                    }
553                }
554            }
555        }
556
557        if !rereplications.is_empty() {
558            info!("Scheduled {} re-replications", rereplications.len());
559        }
560
561        rereplications
562    }
563
564    /// Add replica to existing set.
565    pub fn add_replica(&self, data_id: &str, worker_id: WorkerId) -> Result<()> {
566        if let Some(mut replica_set) = self.inner.replicas.get_mut(data_id) {
567            let health = self.inner.worker_health.get(&worker_id);
568
569            let replica = Replica {
570                worker_id,
571                status: ReplicaStatus::Healthy,
572                version: replica_set.version,
573                created_at: Instant::now(),
574                last_verified: Some(Instant::now()),
575                rack_id: health.as_ref().and_then(|h| h.rack_id.clone()),
576                zone_id: health.as_ref().and_then(|h| h.zone_id.clone()),
577            };
578
579            replica_set.replicas.push(replica);
580            replica_set.last_updated = Instant::now();
581
582            self.inner
583                .stats
584                .replicas_created
585                .fetch_add(1, Ordering::Relaxed);
586
587            debug!("Added replica for {} on worker {}", data_id, worker_id);
588        }
589
590        Ok(())
591    }
592
593    /// Remove replica.
594    pub fn remove_replica(&self, data_id: &str, worker_id: WorkerId) -> Result<()> {
595        if let Some(mut replica_set) = self.inner.replicas.get_mut(data_id) {
596            replica_set.replicas.retain(|r| r.worker_id != worker_id);
597            replica_set.last_updated = Instant::now();
598
599            self.inner
600                .stats
601                .replicas_removed
602                .fetch_add(1, Ordering::Relaxed);
603
604            debug!("Removed replica for {} from worker {}", data_id, worker_id);
605        }
606
607        Ok(())
608    }
609
610    /// Get statistics.
611    pub fn get_statistics(&self) -> ReplicationStatistics {
612        ReplicationStatistics {
613            replicas_created: self.inner.stats.replicas_created.load(Ordering::Relaxed),
614            replicas_removed: self.inner.stats.replicas_removed.load(Ordering::Relaxed),
615            rereplications: self.inner.stats.rereplications.load(Ordering::Relaxed),
616            quorum_reads: self.inner.stats.quorum_reads.load(Ordering::Relaxed),
617            quorum_writes: self.inner.stats.quorum_writes.load(Ordering::Relaxed),
618            quorum_failures: self.inner.stats.quorum_failures.load(Ordering::Relaxed),
619            total_replica_sets: self.inner.replicas.len(),
620            total_workers: self.inner.worker_health.len(),
621        }
622    }
623}
624
/// Point-in-time snapshot of replication statistics, serializable for
/// monitoring endpoints.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationStatistics {
    /// Total replicas created.
    pub replicas_created: u64,

    /// Replicas removed.
    pub replicas_removed: u64,

    /// Re-replications scheduled.
    pub rereplications: u64,

    /// Successful quorum reads.
    pub quorum_reads: u64,

    /// Successful quorum writes.
    pub quorum_writes: u64,

    /// Quorum failures (read or write).
    pub quorum_failures: u64,

    /// Total replica sets currently tracked.
    pub total_replica_sets: usize,

    /// Total workers with health records.
    pub total_workers: usize,
}
652
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Build a pool of `n` distinct worker ids.
    fn worker_pool(n: usize) -> Vec<WorkerId> {
        (0..n).map(|_| WorkerId::new()).collect()
    }

    #[test]
    fn test_replication_manager_creation() {
        let manager = ReplicationManager::with_defaults();
        assert_eq!(manager.get_statistics().replicas_created, 0);
    }

    #[test]
    fn test_create_replicas() {
        let manager = ReplicationManager::with_defaults();
        let workers = worker_pool(5);

        let result = manager.create_replicas("data1".to_string(), 1000, &workers);
        assert!(result.is_ok());

        if let Ok(replica_set) = result {
            // Default replication factor is 3.
            assert_eq!(replica_set.replicas.len(), 3);
        }
    }

    #[tokio::test]
    async fn test_quorum_operations() {
        let manager = ReplicationManager::with_defaults();
        let workers = worker_pool(5);

        manager
            .create_replicas("data1".to_string(), 1000, &workers)
            .ok();

        assert!(manager.quorum_read("data1").await.is_ok());
        assert!(manager.quorum_write("data1").await.is_ok());
    }

    #[test]
    fn test_rereplication() {
        let manager = ReplicationManager::with_defaults();
        let workers = worker_pool(5);

        manager
            .create_replicas("data1".to_string(), 1000, &workers)
            .ok();

        // Losing one replica drops the healthy count below the replication
        // factor, so a re-replication must be scheduled.
        manager.mark_replica_failed("data1", workers[0]).ok();

        let planned = manager.check_rereplication(&workers);
        assert!(!planned.is_empty());
    }
}