// engine/distributed/replication.rs
1//! Replica Placement and Management for Distributed Dakera
2//!
3//! Provides:
4//! - Configurable replication factor per namespace
5//! - Multiple placement strategies (rack-aware, zone-aware, random)
6//! - Replica state tracking and synchronization
7//! - Automatic failover and promotion
8
9use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet};
12use std::sync::Arc;
13use tracing::{debug, info, warn};
14
/// Configuration for replication behavior
///
/// All durations are in milliseconds. Stock values (see the `Default` impl)
/// are 3 copies, majority write acknowledgment, and rack-aware placement.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConfig {
    /// Default replication factor (number of copies including primary)
    pub default_replication_factor: u32,
    /// Minimum replicas required before acknowledging writes
    // NOTE(review): not read anywhere in this module — presumably consumed
    // by the write path elsewhere; confirm before removing.
    pub min_write_replicas: u32,
    /// Write acknowledgment mode
    pub write_mode: WriteAckMode,
    /// Replica placement strategy
    pub placement_strategy: PlacementStrategy,
    /// Maximum time to wait for replica sync (milliseconds)
    pub sync_timeout_ms: u64,
    /// Interval for replica health checks (milliseconds)
    pub health_check_interval_ms: u64,
    /// Maximum replication lag before marking replica as lagging (milliseconds)
    pub max_lag_ms: u64,
    /// Enable automatic failover (gates `ReplicaManager::auto_failover`)
    pub auto_failover: bool,
    /// Minimum healthy replicas before raising alert
    // NOTE(review): also used as the floor on total replica count in
    // `ReplicaManager::remove_replica` — confirm that dual use is intended.
    pub min_healthy_replicas: u32,
}
37
38impl Default for ReplicationConfig {
39    fn default() -> Self {
40        Self {
41            default_replication_factor: 3,
42            min_write_replicas: 2,
43            write_mode: WriteAckMode::Majority,
44            placement_strategy: PlacementStrategy::RackAware,
45            sync_timeout_ms: 5000,
46            health_check_interval_ms: 1000,
47            max_lag_ms: 30000,
48            auto_failover: true,
49            min_healthy_replicas: 1,
50        }
51    }
52}
53
/// Write acknowledgment modes
///
/// Controls how many replicas must confirm a write before it is
/// acknowledged to the client (evaluated by `ReplicaSet::can_write`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum WriteAckMode {
    /// Wait for primary only
    PrimaryOnly,
    /// Wait for majority of replicas (n/2 + 1, where n is the replication factor)
    Majority,
    /// Wait for all replicas
    All,
    /// Wait for a specific number of replicas (the contained count)
    Quorum(u32),
}
66
/// Strategy for placing replicas across nodes
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PlacementStrategy {
    /// Spread replicas across different racks
    RackAware,
    /// Spread replicas across different availability zones
    ZoneAware,
    /// Random placement (for testing or simple deployments)
    Random,
    /// Place replicas close together for low latency
    LocalityFirst,
    /// Custom placement based on node tags
    // NOTE(review): currently falls back to the random selector in
    // `ReplicaManager::select_nodes_for_placement`; tags are not consulted.
    TagBased,
}
81
/// State of a replica
///
/// `InSync` and `Syncing` are the "healthy" states (see
/// `ReplicaInfo::is_healthy`); all others exclude the replica from
/// quorum and write-availability calculations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicaState {
    /// Replica is in sync with primary
    InSync,
    /// Replica is catching up (replication lag)
    Syncing,
    /// Replica is significantly behind
    Lagging,
    /// Replica is unavailable (set after repeated failures)
    Offline,
    /// New replica being initialized
    Initializing,
    /// Replica is being removed
    Decommissioning,
}
98
/// Information about a node's topology placement
///
/// Used by the placement strategies to spread replicas across failure
/// domains. `NodeTopology::new` fills in placeholder location values
/// ("dc1" / "zone-a" / "rack-1") that callers should override.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeTopology {
    /// Node identifier
    pub node_id: String,
    /// Data center / region
    pub datacenter: String,
    /// Availability zone
    pub zone: String,
    /// Physical rack
    pub rack: String,
    /// Custom tags for placement decisions
    pub tags: HashMap<String, String>,
}
113
114impl NodeTopology {
115    pub fn new(node_id: String) -> Self {
116        Self {
117            node_id,
118            datacenter: "dc1".to_string(),
119            zone: "zone-a".to_string(),
120            rack: "rack-1".to_string(),
121            tags: HashMap::new(),
122        }
123    }
124
125    pub fn with_location(mut self, datacenter: &str, zone: &str, rack: &str) -> Self {
126        self.datacenter = datacenter.to_string();
127        self.zone = zone.to_string();
128        self.rack = rack.to_string();
129        self
130    }
131
132    pub fn with_tag(mut self, key: &str, value: &str) -> Self {
133        self.tags.insert(key.to_string(), value.to_string());
134        self
135    }
136}
137
/// Information about a single replica
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaInfo {
    /// Node hosting this replica
    pub node_id: String,
    /// Current state of the replica
    pub state: ReplicaState,
    /// Whether this is the primary replica
    pub is_primary: bool,
    /// Last known replication lag in milliseconds
    pub lag_ms: u64,
    /// Last successful sync timestamp (Unix epoch milliseconds);
    /// `None` until the first sync completes
    pub last_sync: Option<u64>,
    /// Number of consecutive failures (reset to 0 when replica returns to InSync)
    pub failure_count: u32,
    /// Timestamp when replica was created (Unix epoch milliseconds)
    pub created_at: u64,
}
156
157impl ReplicaInfo {
158    pub fn new(node_id: String, is_primary: bool) -> Self {
159        Self {
160            node_id,
161            state: if is_primary {
162                ReplicaState::InSync
163            } else {
164                ReplicaState::Initializing
165            },
166            is_primary,
167            lag_ms: 0,
168            last_sync: None,
169            failure_count: 0,
170            created_at: current_time_ms(),
171        }
172    }
173
174    pub fn is_healthy(&self) -> bool {
175        matches!(self.state, ReplicaState::InSync | ReplicaState::Syncing)
176    }
177}
178
/// Replica set for a shard/namespace
///
/// Tracks the primary plus all secondaries for one shard. Mutations go
/// through `ReplicaManager`, which bumps `version` on every change.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaSet {
    /// Shard or namespace identifier
    pub shard_id: String,
    /// Configured replication factor
    pub replication_factor: u32,
    /// Primary replica node (`None` until placement assigns one)
    pub primary: Option<String>,
    /// All replicas (including primary), keyed by node ID
    pub replicas: HashMap<String, ReplicaInfo>,
    /// Target replication factor (may differ during rebalancing)
    pub target_replicas: u32,
    /// Version for optimistic concurrency; incremented on every mutation
    pub version: u64,
}
195
196impl ReplicaSet {
197    pub fn new(shard_id: String, replication_factor: u32) -> Self {
198        Self {
199            shard_id,
200            replication_factor,
201            primary: None,
202            replicas: HashMap::new(),
203            target_replicas: replication_factor,
204            version: 0,
205        }
206    }
207
208    /// Get count of healthy replicas
209    pub fn healthy_count(&self) -> usize {
210        self.replicas.values().filter(|r| r.is_healthy()).count()
211    }
212
213    /// Get count of in-sync replicas
214    pub fn in_sync_count(&self) -> usize {
215        self.replicas
216            .values()
217            .filter(|r| r.state == ReplicaState::InSync)
218            .count()
219    }
220
221    /// Check if replica set has quorum
222    pub fn has_quorum(&self) -> bool {
223        let required = (self.replication_factor / 2) + 1;
224        self.healthy_count() >= required as usize
225    }
226
227    /// Check if replica set can accept writes
228    pub fn can_write(&self, mode: WriteAckMode) -> bool {
229        let healthy = self.healthy_count() as u32;
230        match mode {
231            WriteAckMode::PrimaryOnly => self.primary.is_some(),
232            WriteAckMode::Majority => healthy > (self.replication_factor / 2),
233            WriteAckMode::All => healthy >= self.replication_factor,
234            WriteAckMode::Quorum(n) => healthy >= n,
235        }
236    }
237
238    /// Get all in-sync replica node IDs
239    pub fn get_in_sync_replicas(&self) -> Vec<String> {
240        self.replicas
241            .iter()
242            .filter(|(_, r)| r.state == ReplicaState::InSync)
243            .map(|(id, _)| id.clone())
244            .collect()
245    }
246
247    /// Get all healthy replica node IDs
248    pub fn get_healthy_replicas(&self) -> Vec<String> {
249        self.replicas
250            .iter()
251            .filter(|(_, r)| r.is_healthy())
252            .map(|(id, _)| id.clone())
253            .collect()
254    }
255}
256
/// Manager for replica placement and lifecycle
///
/// All state lives behind `parking_lot::RwLock`s so methods take `&self`.
// NOTE(review): the `Arc` wrappers only matter if these maps are shared
// with background tasks or clones of the fields; `ReplicaManager` itself
// is not `Clone` in this file — confirm the Arcs are needed.
pub struct ReplicaManager {
    /// Replication configuration
    config: ReplicationConfig,
    /// Replica sets indexed by shard ID
    replica_sets: Arc<RwLock<HashMap<String, ReplicaSet>>>,
    /// Node topology information
    node_topology: Arc<RwLock<HashMap<String, NodeTopology>>>,
    /// Available nodes for placement
    available_nodes: Arc<RwLock<HashSet<String>>>,
}
268
impl ReplicaManager {
    /// Create a new replica manager with no registered nodes and no
    /// replica sets; register nodes before creating replica sets.
    pub fn new(config: ReplicationConfig) -> Self {
        Self {
            config,
            replica_sets: Arc::new(RwLock::new(HashMap::new())),
            node_topology: Arc::new(RwLock::new(HashMap::new())),
            available_nodes: Arc::new(RwLock::new(HashSet::new())),
        }
    }

    /// Register a node with its topology, making it eligible for placement.
    /// Re-registering an existing node replaces its stored topology.
    pub fn register_node(&self, topology: NodeTopology) {
        let node_id = topology.node_id.clone();
        self.node_topology.write().insert(node_id.clone(), topology);
        self.available_nodes.write().insert(node_id.clone());
        info!(node_id = %node_id, "Registered node for replica placement");
    }

    /// Remove a node from available nodes (it will no longer receive new
    /// replicas).
    // NOTE(review): topology data is kept and existing replicas on the node
    // are not touched — confirm that is intentional.
    pub fn deregister_node(&self, node_id: &str) {
        self.available_nodes.write().remove(node_id);
        info!(node_id = %node_id, "Deregistered node from replica placement");
    }

    /// Create a new replica set for a shard.
    ///
    /// Uses `config.default_replication_factor` when `replication_factor`
    /// is `None`. Fails with `InsufficientNodes` if fewer nodes are
    /// available than the requested factor.
    // NOTE(review): an existing replica set for the same shard_id is
    // silently replaced — confirm callers guarantee uniqueness.
    pub fn create_replica_set(
        &self,
        shard_id: &str,
        replication_factor: Option<u32>,
    ) -> Result<ReplicaSet, ReplicationError> {
        let rf = replication_factor.unwrap_or(self.config.default_replication_factor);
        let available = self.available_nodes.read();

        if available.len() < rf as usize {
            return Err(ReplicationError::InsufficientNodes {
                required: rf as usize,
                available: available.len(),
            });
        }

        let mut replica_set = ReplicaSet::new(shard_id.to_string(), rf);

        // Select nodes for placement (strategy from config)
        let selected_nodes = self.select_nodes_for_placement(&available, rf as usize)?;

        // First node is primary; safe to index [0] because the selectors
        // only return Ok when they found at least `count` nodes.
        let primary_node = selected_nodes[0].clone();
        replica_set.primary = Some(primary_node.clone());
        replica_set
            .replicas
            .insert(primary_node.clone(), ReplicaInfo::new(primary_node, true));

        // Remaining nodes are replicas
        for node_id in selected_nodes.into_iter().skip(1) {
            replica_set
                .replicas
                .insert(node_id.clone(), ReplicaInfo::new(node_id, false));
        }

        // Store the replica set (write lock on replica_sets while still
        // holding the read lock on available_nodes — different locks, no
        // self-deadlock with parking_lot)
        self.replica_sets
            .write()
            .insert(shard_id.to_string(), replica_set.clone());

        info!(
            shard_id = %shard_id,
            replication_factor = rf,
            "Created replica set"
        );

        Ok(replica_set)
    }

    /// Select nodes for replica placement based on the configured strategy.
    fn select_nodes_for_placement(
        &self,
        available: &HashSet<String>,
        count: usize,
    ) -> Result<Vec<String>, ReplicationError> {
        let topology = self.node_topology.read();

        match self.config.placement_strategy {
            PlacementStrategy::RackAware => self.select_rack_aware(available, &topology, count),
            PlacementStrategy::ZoneAware => self.select_zone_aware(available, &topology, count),
            PlacementStrategy::Random => self.select_random(available, count),
            PlacementStrategy::LocalityFirst => {
                self.select_locality_first(available, &topology, count)
            }
            PlacementStrategy::TagBased => self.select_random(available, count), // Simplified
        }
    }

    /// Select nodes spread across different racks.
    ///
    /// Greedy two-pass: first take at most one node per rack, then fill
    /// any shortfall with arbitrary remaining nodes. Nodes missing from
    /// the topology map are skipped in the first pass but usable in the
    /// second.
    // NOTE(review): HashSet iteration order is unspecified, so selection
    // (and thus the eventual primary) is effectively arbitrary.
    fn select_rack_aware(
        &self,
        available: &HashSet<String>,
        topology: &HashMap<String, NodeTopology>,
        count: usize,
    ) -> Result<Vec<String>, ReplicationError> {
        let mut selected = Vec::new();
        let mut used_racks: HashSet<String> = HashSet::new();

        // First pass: prefer nodes from different racks
        for node_id in available {
            if let Some(topo) = topology.get(node_id) {
                if !used_racks.contains(&topo.rack) {
                    selected.push(node_id.clone());
                    used_racks.insert(topo.rack.clone());
                    if selected.len() >= count {
                        return Ok(selected);
                    }
                }
            }
        }

        // Second pass: fill remaining with any available nodes
        // (linear `contains` scan is fine for cluster-sized node counts)
        for node_id in available {
            if !selected.contains(node_id) {
                selected.push(node_id.clone());
                if selected.len() >= count {
                    return Ok(selected);
                }
            }
        }

        if selected.len() >= count {
            Ok(selected)
        } else {
            Err(ReplicationError::InsufficientNodes {
                required: count,
                available: selected.len(),
            })
        }
    }

    /// Select nodes spread across different zones.
    ///
    /// Same greedy two-pass shape as `select_rack_aware`, keyed on zone
    /// instead of rack.
    fn select_zone_aware(
        &self,
        available: &HashSet<String>,
        topology: &HashMap<String, NodeTopology>,
        count: usize,
    ) -> Result<Vec<String>, ReplicationError> {
        let mut selected = Vec::new();
        let mut used_zones: HashSet<String> = HashSet::new();

        // First pass: prefer nodes from different zones
        for node_id in available {
            if let Some(topo) = topology.get(node_id) {
                if !used_zones.contains(&topo.zone) {
                    selected.push(node_id.clone());
                    used_zones.insert(topo.zone.clone());
                    if selected.len() >= count {
                        return Ok(selected);
                    }
                }
            }
        }

        // Second pass: fill remaining
        for node_id in available {
            if !selected.contains(node_id) {
                selected.push(node_id.clone());
                if selected.len() >= count {
                    return Ok(selected);
                }
            }
        }

        if selected.len() >= count {
            Ok(selected)
        } else {
            Err(ReplicationError::InsufficientNodes {
                required: count,
                available: selected.len(),
            })
        }
    }

    /// Select nodes "randomly".
    // NOTE(review): this is NOT random — it takes the first `count` nodes
    // in HashSet iteration order (unspecified but not uniformly random).
    // Fine for tests; use a real RNG shuffle for production randomness.
    fn select_random(
        &self,
        available: &HashSet<String>,
        count: usize,
    ) -> Result<Vec<String>, ReplicationError> {
        let nodes: Vec<String> = available.iter().cloned().collect();
        if nodes.len() < count {
            return Err(ReplicationError::InsufficientNodes {
                required: count,
                available: nodes.len(),
            });
        }

        // Simple selection (in production, use proper random selection)
        Ok(nodes.into_iter().take(count).collect())
    }

    /// Select nodes prioritizing locality (same rack/zone when possible).
    ///
    /// Anchors on the first node encountered, then prefers nodes sharing
    /// its rack or zone; falls back to arbitrary nodes to reach `count`.
    fn select_locality_first(
        &self,
        available: &HashSet<String>,
        topology: &HashMap<String, NodeTopology>,
        count: usize,
    ) -> Result<Vec<String>, ReplicationError> {
        // For locality-first, we pick first available node and then
        // try to pick nodes from the same rack/zone
        let mut selected = Vec::new();
        let mut reference_topo: Option<&NodeTopology> = None;

        for node_id in available {
            if let Some(topo) = topology.get(node_id) {
                if selected.is_empty() {
                    selected.push(node_id.clone());
                    reference_topo = Some(topo);
                } else if let Some(ref_topo) = reference_topo {
                    // Prefer same rack, then same zone
                    if topo.rack == ref_topo.rack || topo.zone == ref_topo.zone {
                        selected.push(node_id.clone());
                    }
                }
                if selected.len() >= count {
                    return Ok(selected);
                }
            }
        }

        // Fill remaining with any nodes
        for node_id in available {
            if !selected.contains(node_id) {
                selected.push(node_id.clone());
                if selected.len() >= count {
                    return Ok(selected);
                }
            }
        }

        if selected.len() >= count {
            Ok(selected)
        } else {
            Err(ReplicationError::InsufficientNodes {
                required: count,
                available: selected.len(),
            })
        }
    }

    /// Get a snapshot (clone) of the replica set for a shard, if any.
    pub fn get_replica_set(&self, shard_id: &str) -> Option<ReplicaSet> {
        self.replica_sets.read().get(shard_id).cloned()
    }

    /// Update replica state (and optionally its lag) for one replica.
    ///
    /// Transitioning to `InSync` records the sync time and resets the
    /// consecutive-failure counter. Bumps the set's version.
    ///
    /// # Errors
    /// `ShardNotFound` / `ReplicaNotFound` when the target does not exist.
    pub fn update_replica_state(
        &self,
        shard_id: &str,
        node_id: &str,
        state: ReplicaState,
        lag_ms: Option<u64>,
    ) -> Result<(), ReplicationError> {
        let mut sets = self.replica_sets.write();
        let set = sets
            .get_mut(shard_id)
            .ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;

        let replica = set
            .replicas
            .get_mut(node_id)
            .ok_or_else(|| ReplicationError::ReplicaNotFound(node_id.to_string()))?;

        let old_state = replica.state;
        replica.state = state;
        if let Some(lag) = lag_ms {
            replica.lag_ms = lag;
        }
        if state == ReplicaState::InSync {
            replica.last_sync = Some(current_time_ms());
            replica.failure_count = 0;
        }
        set.version += 1;

        if old_state != state {
            debug!(
                shard_id = %shard_id,
                node_id = %node_id,
                old_state = ?old_state,
                new_state = ?state,
                "Replica state changed"
            );
        }

        Ok(())
    }

    /// Record a replica failure; marks the replica `Offline` after 3
    /// consecutive failures.
    // NOTE(review): an unknown node_id is silently ignored (returns Ok) —
    // asymmetric with update_replica_state; confirm this is deliberate.
    pub fn record_replica_failure(
        &self,
        shard_id: &str,
        node_id: &str,
    ) -> Result<(), ReplicationError> {
        let mut sets = self.replica_sets.write();
        let set = sets
            .get_mut(shard_id)
            .ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;

        if let Some(replica) = set.replicas.get_mut(node_id) {
            replica.failure_count += 1;

            // Mark as offline after too many failures (hardcoded threshold)
            if replica.failure_count >= 3 {
                replica.state = ReplicaState::Offline;
                warn!(
                    shard_id = %shard_id,
                    node_id = %node_id,
                    failure_count = replica.failure_count,
                    "Replica marked offline due to failures"
                );
            }
            set.version += 1;
        }

        Ok(())
    }

    /// Promote a replica to primary, demoting the current primary (which
    /// remains in the set as a secondary).
    ///
    /// # Errors
    /// `ShardNotFound` / `ReplicaNotFound` for missing targets;
    /// `UnhealthyReplica` if the candidate is not InSync/Syncing.
    pub fn promote_replica(
        &self,
        shard_id: &str,
        new_primary_node: &str,
    ) -> Result<(), ReplicationError> {
        let mut sets = self.replica_sets.write();
        let set = sets
            .get_mut(shard_id)
            .ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;

        // Verify the new primary exists and is healthy
        let replica = set
            .replicas
            .get(new_primary_node)
            .ok_or_else(|| ReplicationError::ReplicaNotFound(new_primary_node.to_string()))?;

        if !replica.is_healthy() {
            return Err(ReplicationError::UnhealthyReplica(
                new_primary_node.to_string(),
            ));
        }

        // Demote old primary
        if let Some(old_primary) = &set.primary {
            if let Some(old_replica) = set.replicas.get_mut(old_primary) {
                old_replica.is_primary = false;
            }
        }

        // Promote new primary
        if let Some(new_replica) = set.replicas.get_mut(new_primary_node) {
            new_replica.is_primary = true;
        }

        let old_primary = set.primary.clone();
        set.primary = Some(new_primary_node.to_string());
        set.version += 1;

        info!(
            shard_id = %shard_id,
            old_primary = ?old_primary,
            new_primary = %new_primary_node,
            "Promoted replica to primary"
        );

        Ok(())
    }

    /// Add a new (secondary) replica to a shard on an available node.
    ///
    /// # Errors
    /// `ShardNotFound`, `ReplicaExists`, or `NodeNotAvailable`.
    pub fn add_replica(&self, shard_id: &str, node_id: &str) -> Result<(), ReplicationError> {
        let mut sets = self.replica_sets.write();
        let set = sets
            .get_mut(shard_id)
            .ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;

        if set.replicas.contains_key(node_id) {
            return Err(ReplicationError::ReplicaExists(node_id.to_string()));
        }

        if !self.available_nodes.read().contains(node_id) {
            return Err(ReplicationError::NodeNotAvailable(node_id.to_string()));
        }

        set.replicas.insert(
            node_id.to_string(),
            ReplicaInfo::new(node_id.to_string(), false),
        );
        set.version += 1;

        info!(
            shard_id = %shard_id,
            node_id = %node_id,
            "Added new replica"
        );

        Ok(())
    }

    /// Remove a (non-primary) replica from a shard.
    ///
    /// # Errors
    /// `CannotRemovePrimary` for the primary; `MinimumReplicasRequired`
    /// when the removal would drop the total replica count to or below
    /// `config.min_healthy_replicas`.
    // NOTE(review): the floor compares TOTAL replicas against
    // min_HEALTHY_replicas — confirm that is the intended guard.
    pub fn remove_replica(&self, shard_id: &str, node_id: &str) -> Result<(), ReplicationError> {
        let mut sets = self.replica_sets.write();
        let set = sets
            .get_mut(shard_id)
            .ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;

        // Cannot remove primary
        if set.primary.as_deref() == Some(node_id) {
            return Err(ReplicationError::CannotRemovePrimary);
        }

        // Ensure minimum replicas
        if set.replicas.len() <= self.config.min_healthy_replicas as usize {
            return Err(ReplicationError::MinimumReplicasRequired);
        }

        set.replicas.remove(node_id);
        set.version += 1;

        info!(
            shard_id = %shard_id,
            node_id = %node_id,
            "Removed replica"
        );

        Ok(())
    }

    /// List shards with fewer healthy replicas than their replication
    /// factor, as `(shard_id, missing_count)` pairs.
    pub fn check_under_replicated(&self) -> Vec<(String, usize)> {
        let sets = self.replica_sets.read();
        let mut under_replicated = Vec::new();

        for (shard_id, set) in sets.iter() {
            let healthy = set.healthy_count();
            if healthy < set.replication_factor as usize {
                under_replicated
                    .push((shard_id.clone(), set.replication_factor as usize - healthy));
            }
        }

        under_replicated
    }

    /// Auto-failover: promote the lowest-lag in-sync replica if the
    /// primary is unhealthy. Returns the promoted node ID, or `None`
    /// when failover is disabled, the primary is fine, or no candidate
    /// exists.
    pub fn auto_failover(&self, shard_id: &str) -> Result<Option<String>, ReplicationError> {
        if !self.config.auto_failover {
            return Ok(None);
        }

        let sets = self.replica_sets.read();
        let set = sets
            .get(shard_id)
            .ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;

        // Check if primary is healthy
        if let Some(primary) = &set.primary {
            if let Some(replica) = set.replicas.get(primary) {
                if replica.is_healthy() {
                    return Ok(None); // Primary is fine
                }
            }
        }

        // Find best candidate for promotion: in-sync secondary with least lag
        let candidate = set
            .replicas
            .iter()
            .filter(|(_, r)| !r.is_primary && r.state == ReplicaState::InSync)
            .min_by_key(|(_, r)| r.lag_ms)
            .map(|(id, _)| id.clone());

        // Release the read lock before promote_replica takes the write
        // lock (parking_lot RwLock is not reentrant).
        // NOTE(review): this opens a small race window in which the
        // candidate's state could change before promotion; promote_replica
        // re-validates health, so the worst case is an UnhealthyReplica error.
        drop(sets);

        if let Some(new_primary) = candidate.clone() {
            self.promote_replica(shard_id, &new_primary)?;
        }

        Ok(candidate)
    }

    /// Aggregate replication statistics across all replica sets.
    pub fn get_stats(&self) -> ReplicationStats {
        let sets = self.replica_sets.read();
        let nodes = self.available_nodes.read();

        let mut total_replicas = 0;
        let mut healthy_replicas = 0;
        let mut in_sync_replicas = 0;
        let mut under_replicated = 0;

        for set in sets.values() {
            total_replicas += set.replicas.len();
            healthy_replicas += set.healthy_count();
            in_sync_replicas += set.in_sync_count();
            if set.healthy_count() < set.replication_factor as usize {
                under_replicated += 1;
            }
        }

        ReplicationStats {
            total_replica_sets: sets.len(),
            total_replicas,
            healthy_replicas,
            in_sync_replicas,
            under_replicated_shards: under_replicated,
            available_nodes: nodes.len(),
        }
    }

    /// Get the replication configuration this manager was built with.
    pub fn config(&self) -> &ReplicationConfig {
        &self.config
    }
}
787
/// Statistics about replication state
///
/// A point-in-time aggregate snapshot produced by `ReplicaManager::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationStats {
    /// Number of replica sets (shards) being tracked
    pub total_replica_sets: usize,
    /// Total replicas across all sets (primaries included)
    pub total_replicas: usize,
    /// Replicas currently InSync or Syncing
    pub healthy_replicas: usize,
    /// Replicas currently fully InSync
    pub in_sync_replicas: usize,
    /// Shards with fewer healthy replicas than their replication factor
    pub under_replicated_shards: usize,
    /// Nodes currently registered and available for placement
    pub available_nodes: usize,
}
798
/// Errors that can occur during replication operations
#[derive(Debug, Clone, thiserror::Error)]
pub enum ReplicationError {
    /// Not enough registered nodes to satisfy the requested replication factor.
    #[error("Insufficient nodes for replication: required {required}, available {available}")]
    InsufficientNodes { required: usize, available: usize },

    /// No replica set exists for the given shard ID.
    #[error("Shard not found: {0}")]
    ShardNotFound(String),

    /// The shard exists but has no replica on the given node.
    #[error("Replica not found: {0}")]
    ReplicaNotFound(String),

    /// A replica for this node is already part of the set.
    #[error("Replica already exists: {0}")]
    ReplicaExists(String),

    /// The node is not registered as available for placement.
    #[error("Node not available: {0}")]
    NodeNotAvailable(String),

    /// Primaries must be demoted (via promotion of another replica) before removal.
    #[error("Cannot remove primary replica")]
    CannotRemovePrimary,

    /// Removal would drop the set below the configured minimum replica count.
    #[error("Minimum replicas required")]
    MinimumReplicasRequired,

    /// The replica is not in a healthy (InSync/Syncing) state.
    #[error("Replica is unhealthy: {0}")]
    UnhealthyReplica(String),

    /// No healthy replicas available
    // NOTE(review): not constructed anywhere in this module — presumably
    // used by callers elsewhere; confirm before pruning.
    #[error("No healthy replicas available")]
    NoHealthyReplicas,
}
829
/// Current wall-clock time as Unix-epoch milliseconds.
///
/// Returns 0 if the system clock reads before the Unix epoch
/// (same fallback as `unwrap_or_default`).
fn current_time_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
836
837#[cfg(test)]
838mod tests {
839    use super::*;
840
841    fn create_test_manager() -> ReplicaManager {
842        let config = ReplicationConfig {
843            default_replication_factor: 3,
844            min_write_replicas: 2,
845            ..Default::default()
846        };
847        let manager = ReplicaManager::new(config);
848
849        // Register test nodes
850        for i in 0..5 {
851            let topo = NodeTopology::new(format!("node-{}", i)).with_location(
852                "dc1",
853                &format!("zone-{}", i % 2),
854                &format!("rack-{}", i),
855            );
856            manager.register_node(topo);
857        }
858
859        manager
860    }
861
862    #[test]
863    fn test_create_replica_set() {
864        let manager = create_test_manager();
865
866        let result = manager.create_replica_set("shard-1", None);
867        assert!(result.is_ok());
868
869        let set = result.unwrap();
870        assert_eq!(set.shard_id, "shard-1");
871        assert_eq!(set.replication_factor, 3);
872        assert_eq!(set.replicas.len(), 3);
873        assert!(set.primary.is_some());
874    }
875
876    #[test]
877    fn test_replica_set_with_custom_factor() {
878        let manager = create_test_manager();
879
880        let result = manager.create_replica_set("shard-2", Some(2));
881        assert!(result.is_ok());
882
883        let set = result.unwrap();
884        assert_eq!(set.replication_factor, 2);
885        assert_eq!(set.replicas.len(), 2);
886    }
887
888    #[test]
889    fn test_insufficient_nodes() {
890        let config = ReplicationConfig::default();
891        let manager = ReplicaManager::new(config);
892
893        // Only register 2 nodes for replication factor of 3
894        manager.register_node(NodeTopology::new("node-1".to_string()));
895        manager.register_node(NodeTopology::new("node-2".to_string()));
896
897        let result = manager.create_replica_set("shard-1", Some(3));
898        assert!(matches!(
899            result,
900            Err(ReplicationError::InsufficientNodes { .. })
901        ));
902    }
903
904    #[test]
905    fn test_update_replica_state() {
906        let manager = create_test_manager();
907        manager.create_replica_set("shard-1", None).unwrap();
908
909        let set = manager.get_replica_set("shard-1").unwrap();
910        let node_id = set.replicas.keys().next().unwrap().clone();
911
912        let result =
913            manager.update_replica_state("shard-1", &node_id, ReplicaState::Syncing, Some(100));
914        assert!(result.is_ok());
915
916        let updated_set = manager.get_replica_set("shard-1").unwrap();
917        let replica = updated_set.replicas.get(&node_id).unwrap();
918        assert_eq!(replica.state, ReplicaState::Syncing);
919        assert_eq!(replica.lag_ms, 100);
920    }
921
922    #[test]
923    fn test_promote_replica() {
924        let manager = create_test_manager();
925        manager.create_replica_set("shard-1", None).unwrap();
926
927        let set = manager.get_replica_set("shard-1").unwrap();
928        let old_primary = set.primary.clone().unwrap();
929
930        // Find a non-primary replica
931        let new_primary = set
932            .replicas
933            .iter()
934            .find(|(_, r)| !r.is_primary)
935            .map(|(id, _)| id.clone())
936            .unwrap();
937
938        // Mark it as in-sync first
939        manager
940            .update_replica_state("shard-1", &new_primary, ReplicaState::InSync, None)
941            .unwrap();
942
943        // Promote
944        let result = manager.promote_replica("shard-1", &new_primary);
945        assert!(result.is_ok());
946
947        let updated_set = manager.get_replica_set("shard-1").unwrap();
948        assert_eq!(updated_set.primary, Some(new_primary.clone()));
949        assert!(!updated_set.replicas.get(&old_primary).unwrap().is_primary);
950        assert!(updated_set.replicas.get(&new_primary).unwrap().is_primary);
951    }
952
953    #[test]
954    fn test_add_remove_replica() {
955        let manager = create_test_manager();
956        manager.create_replica_set("shard-1", Some(2)).unwrap();
957
958        // Find a node not already in the replica set
959        let set = manager.get_replica_set("shard-1").unwrap();
960        let new_node = (0..5)
961            .map(|i| format!("node-{}", i))
962            .find(|n| !set.replicas.contains_key(n))
963            .expect("Should have available node");
964
965        // Add a replica using a node that's not already in the set
966        let result = manager.add_replica("shard-1", &new_node);
967        assert!(result.is_ok());
968
969        let set = manager.get_replica_set("shard-1").unwrap();
970        assert_eq!(set.replicas.len(), 3);
971
972        // Remove a non-primary replica
973        let non_primary = set
974            .replicas
975            .iter()
976            .find(|(_, r)| !r.is_primary)
977            .map(|(id, _)| id.clone())
978            .unwrap();
979
980        let result = manager.remove_replica("shard-1", &non_primary);
981        assert!(result.is_ok());
982
983        let set = manager.get_replica_set("shard-1").unwrap();
984        assert_eq!(set.replicas.len(), 2);
985    }
986
987    #[test]
988    fn test_cannot_remove_primary() {
989        let manager = create_test_manager();
990        manager.create_replica_set("shard-1", None).unwrap();
991
992        let set = manager.get_replica_set("shard-1").unwrap();
993        let primary = set.primary.unwrap();
994
995        let result = manager.remove_replica("shard-1", &primary);
996        assert!(matches!(result, Err(ReplicationError::CannotRemovePrimary)));
997    }
998
999    #[test]
1000    fn test_replica_set_quorum() {
1001        let manager = create_test_manager();
1002        manager.create_replica_set("shard-1", Some(3)).unwrap();
1003
1004        // Mark all as in-sync
1005        let set = manager.get_replica_set("shard-1").unwrap();
1006        for node_id in set.replicas.keys() {
1007            manager
1008                .update_replica_state("shard-1", node_id, ReplicaState::InSync, None)
1009                .unwrap();
1010        }
1011
1012        let set = manager.get_replica_set("shard-1").unwrap();
1013        assert!(set.has_quorum());
1014        assert!(set.can_write(WriteAckMode::Majority));
1015        assert!(set.can_write(WriteAckMode::All));
1016    }
1017
1018    #[test]
1019    fn test_check_under_replicated() {
1020        let manager = create_test_manager();
1021        manager.create_replica_set("shard-1", Some(3)).unwrap();
1022
1023        // Mark one replica as offline
1024        let set = manager.get_replica_set("shard-1").unwrap();
1025        let node_id = set.replicas.keys().next().unwrap().clone();
1026        manager
1027            .update_replica_state("shard-1", &node_id, ReplicaState::Offline, None)
1028            .unwrap();
1029
1030        let under_replicated = manager.check_under_replicated();
1031        assert_eq!(under_replicated.len(), 1);
1032        assert_eq!(under_replicated[0].0, "shard-1");
1033    }
1034
1035    #[test]
1036    fn test_replication_stats() {
1037        let manager = create_test_manager();
1038        manager.create_replica_set("shard-1", Some(3)).unwrap();
1039        manager.create_replica_set("shard-2", Some(2)).unwrap();
1040
1041        let stats = manager.get_stats();
1042        assert_eq!(stats.total_replica_sets, 2);
1043        assert_eq!(stats.total_replicas, 5);
1044        assert_eq!(stats.available_nodes, 5);
1045    }
1046
1047    #[test]
1048    fn test_rack_aware_placement() {
1049        let config = ReplicationConfig {
1050            default_replication_factor: 3,
1051            placement_strategy: PlacementStrategy::RackAware,
1052            ..Default::default()
1053        };
1054        let manager = ReplicaManager::new(config);
1055
1056        // Register nodes in different racks
1057        for i in 0..5 {
1058            let topo = NodeTopology::new(format!("node-{}", i)).with_location(
1059                "dc1",
1060                "zone-a",
1061                &format!("rack-{}", i),
1062            );
1063            manager.register_node(topo);
1064        }
1065
1066        let set = manager.create_replica_set("shard-1", None).unwrap();
1067
1068        // Check that replicas are in different racks
1069        let topology = manager.node_topology.read();
1070        let racks: HashSet<_> = set
1071            .replicas
1072            .keys()
1073            .filter_map(|id| topology.get(id))
1074            .map(|t| t.rack.clone())
1075            .collect();
1076
1077        assert_eq!(racks.len(), 3); // 3 different racks for 3 replicas
1078    }
1079
1080    #[test]
1081    fn test_record_replica_failure() {
1082        let manager = create_test_manager();
1083        manager.create_replica_set("shard-1", None).unwrap();
1084
1085        let set = manager.get_replica_set("shard-1").unwrap();
1086        let node_id = set.replicas.keys().next().unwrap().clone();
1087
1088        // Record multiple failures
1089        for _ in 0..3 {
1090            manager.record_replica_failure("shard-1", &node_id).unwrap();
1091        }
1092
1093        let set = manager.get_replica_set("shard-1").unwrap();
1094        let replica = set.replicas.get(&node_id).unwrap();
1095        assert_eq!(replica.state, ReplicaState::Offline);
1096        assert_eq!(replica.failure_count, 3);
1097    }
1098}