1use parking_lot::RwLock;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet};
12use std::sync::Arc;
13use tracing::{debug, info, warn};
14
/// Tunable parameters governing replica placement, write acknowledgement,
/// and failure handling for the replication subsystem.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationConfig {
    /// Replicas created per shard when no explicit factor is supplied.
    pub default_replication_factor: u32,
    /// Minimum replicas that must acknowledge a write.
    pub min_write_replicas: u32,
    /// How many replicas must acknowledge a write before it is accepted.
    pub write_mode: WriteAckMode,
    /// Strategy used to spread replicas across the cluster topology.
    pub placement_strategy: PlacementStrategy,
    /// Timeout for replica synchronization operations, in milliseconds.
    pub sync_timeout_ms: u64,
    /// Interval between replica health checks, in milliseconds.
    pub health_check_interval_ms: u64,
    /// Maximum tolerated replication lag, in milliseconds.
    pub max_lag_ms: u64,
    /// Whether `auto_failover` may promote a replica when the primary fails.
    pub auto_failover: bool,
    /// Floor on the replica count enforced by `remove_replica`.
    pub min_healthy_replicas: u32,
}
37
impl Default for ReplicationConfig {
    /// Conservative production-style defaults: 3-way replication with
    /// majority acknowledgement and rack-aware placement.
    fn default() -> Self {
        Self {
            default_replication_factor: 3,
            min_write_replicas: 2,
            write_mode: WriteAckMode::Majority,
            placement_strategy: PlacementStrategy::RackAware,
            sync_timeout_ms: 5000,
            health_check_interval_ms: 1000,
            max_lag_ms: 30000,
            auto_failover: true,
            min_healthy_replicas: 1,
        }
    }
}
53
/// Acknowledgement level a write must reach before it is considered durable
/// (see `ReplicaSet::can_write`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum WriteAckMode {
    /// Only the primary must accept the write.
    PrimaryOnly,
    /// More than half of the replication factor must be healthy.
    Majority,
    /// Every replica (up to the replication factor) must be healthy.
    All,
    /// A fixed number of healthy replicas is required.
    Quorum(u32),
}
66
/// Algorithm used by `ReplicaManager` to choose nodes for new replicas.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PlacementStrategy {
    /// Prefer at most one replica per rack.
    RackAware,
    /// Prefer at most one replica per zone.
    ZoneAware,
    /// Arbitrary selection (see `select_random` — hash order, not a true RNG).
    Random,
    /// Co-locate replicas near the first selected node (same rack or zone).
    LocalityFirst,
    /// Tag-driven placement. NOTE: currently falls back to `Random` in
    /// `select_nodes_for_placement`.
    TagBased,
}
81
/// Lifecycle state of a single replica. `InSync` and `Syncing` count as
/// healthy (see `ReplicaInfo::is_healthy`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicaState {
    /// Fully caught up with the primary.
    InSync,
    /// Actively catching up.
    Syncing,
    /// Behind the primary beyond acceptable lag.
    Lagging,
    /// Unreachable or taken offline after repeated failures.
    Offline,
    /// Newly created, first sync not yet completed.
    Initializing,
    /// Being drained ahead of removal.
    Decommissioning,
}
98
/// Physical/logical location of a node, used by placement strategies to
/// spread replicas across failure domains.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeTopology {
    /// Cluster-unique node identifier.
    pub node_id: String,
    /// Datacenter the node lives in.
    pub datacenter: String,
    /// Availability zone within the datacenter.
    pub zone: String,
    /// Rack within the zone.
    pub rack: String,
    /// Free-form key/value labels (intended for `TagBased` placement).
    pub tags: HashMap<String, String>,
}
113
114impl NodeTopology {
115 pub fn new(node_id: String) -> Self {
116 Self {
117 node_id,
118 datacenter: "dc1".to_string(),
119 zone: "zone-a".to_string(),
120 rack: "rack-1".to_string(),
121 tags: HashMap::new(),
122 }
123 }
124
125 pub fn with_location(mut self, datacenter: &str, zone: &str, rack: &str) -> Self {
126 self.datacenter = datacenter.to_string();
127 self.zone = zone.to_string();
128 self.rack = rack.to_string();
129 self
130 }
131
132 pub fn with_tag(mut self, key: &str, value: &str) -> Self {
133 self.tags.insert(key.to_string(), value.to_string());
134 self
135 }
136}
137
/// Per-replica tracking state within a `ReplicaSet`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaInfo {
    /// Node hosting this replica.
    pub node_id: String,
    /// Current lifecycle state.
    pub state: ReplicaState,
    /// Whether this replica is the set's primary.
    pub is_primary: bool,
    /// Last observed replication lag in milliseconds.
    pub lag_ms: u64,
    /// Timestamp (ms since epoch) of the last transition to `InSync`, if any.
    pub last_sync: Option<u64>,
    /// Consecutive failures recorded since the last successful sync.
    pub failure_count: u32,
    /// Creation timestamp (ms since epoch).
    pub created_at: u64,
}
156
157impl ReplicaInfo {
158 pub fn new(node_id: String, is_primary: bool) -> Self {
159 Self {
160 node_id,
161 state: if is_primary {
162 ReplicaState::InSync
163 } else {
164 ReplicaState::Initializing
165 },
166 is_primary,
167 lag_ms: 0,
168 last_sync: None,
169 failure_count: 0,
170 created_at: current_time_ms(),
171 }
172 }
173
174 pub fn is_healthy(&self) -> bool {
175 matches!(self.state, ReplicaState::InSync | ReplicaState::Syncing)
176 }
177}
178
/// The full replication group for one shard: an optional primary plus a map
/// of member replicas, with a version bumped on every mutation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaSet {
    /// Shard this set replicates.
    pub shard_id: String,
    /// Desired number of replicas.
    pub replication_factor: u32,
    /// Node id of the current primary, if one has been elected.
    pub primary: Option<String>,
    /// Member replicas keyed by node id (includes the primary).
    pub replicas: HashMap<String, ReplicaInfo>,
    /// Target replica count (initialized to the replication factor).
    pub target_replicas: u32,
    /// Monotonic version, incremented on every membership/state change.
    pub version: u64,
}
195
196impl ReplicaSet {
197 pub fn new(shard_id: String, replication_factor: u32) -> Self {
198 Self {
199 shard_id,
200 replication_factor,
201 primary: None,
202 replicas: HashMap::new(),
203 target_replicas: replication_factor,
204 version: 0,
205 }
206 }
207
208 pub fn healthy_count(&self) -> usize {
210 self.replicas.values().filter(|r| r.is_healthy()).count()
211 }
212
213 pub fn in_sync_count(&self) -> usize {
215 self.replicas
216 .values()
217 .filter(|r| r.state == ReplicaState::InSync)
218 .count()
219 }
220
221 pub fn has_quorum(&self) -> bool {
223 let required = (self.replication_factor / 2) + 1;
224 self.healthy_count() >= required as usize
225 }
226
227 pub fn can_write(&self, mode: WriteAckMode) -> bool {
229 let healthy = self.healthy_count() as u32;
230 match mode {
231 WriteAckMode::PrimaryOnly => self.primary.is_some(),
232 WriteAckMode::Majority => healthy > (self.replication_factor / 2),
233 WriteAckMode::All => healthy >= self.replication_factor,
234 WriteAckMode::Quorum(n) => healthy >= n,
235 }
236 }
237
238 pub fn get_in_sync_replicas(&self) -> Vec<String> {
240 self.replicas
241 .iter()
242 .filter(|(_, r)| r.state == ReplicaState::InSync)
243 .map(|(id, _)| id.clone())
244 .collect()
245 }
246
247 pub fn get_healthy_replicas(&self) -> Vec<String> {
249 self.replicas
250 .iter()
251 .filter(|(_, r)| r.is_healthy())
252 .map(|(id, _)| id.clone())
253 .collect()
254 }
255}
256
/// Thread-safe coordinator for replica sets: tracks node topology and
/// availability, places new replica sets, and drives state transitions,
/// failure accounting, and failover.
pub struct ReplicaManager {
    /// Static configuration; immutable after construction.
    config: ReplicationConfig,
    /// All replica sets, keyed by shard id.
    replica_sets: Arc<RwLock<HashMap<String, ReplicaSet>>>,
    /// Registered topology per node id (retained even after deregistration).
    node_topology: Arc<RwLock<HashMap<String, NodeTopology>>>,
    /// Nodes currently eligible to receive replicas.
    available_nodes: Arc<RwLock<HashSet<String>>>,
}
268
269impl ReplicaManager {
270 pub fn new(config: ReplicationConfig) -> Self {
272 Self {
273 config,
274 replica_sets: Arc::new(RwLock::new(HashMap::new())),
275 node_topology: Arc::new(RwLock::new(HashMap::new())),
276 available_nodes: Arc::new(RwLock::new(HashSet::new())),
277 }
278 }
279
    /// Records `topology` for the node and marks it available for placement.
    /// Re-registering a node overwrites its previous topology entry.
    pub fn register_node(&self, topology: NodeTopology) {
        let node_id = topology.node_id.clone();
        self.node_topology.write().insert(node_id.clone(), topology);
        self.available_nodes.write().insert(node_id.clone());
        info!(node_id = %node_id, "Registered node for replica placement");
    }
287
    /// Removes the node from the placement pool. Its topology entry is
    /// deliberately kept so existing replicas on the node can still be
    /// located by topology-aware code.
    pub fn deregister_node(&self, node_id: &str) {
        self.available_nodes.write().remove(node_id);
        info!(node_id = %node_id, "Deregistered node from replica placement");
    }
293
    /// Creates and registers a replica set for `shard_id`, selecting nodes
    /// via the configured placement strategy. The first selected node
    /// becomes the primary. Returns a clone of the stored set.
    ///
    /// # Errors
    /// `InsufficientNodes` when fewer nodes are available than the requested
    /// replication factor (or when placement cannot reach it).
    pub fn create_replica_set(
        &self,
        shard_id: &str,
        replication_factor: Option<u32>,
    ) -> Result<ReplicaSet, ReplicationError> {
        let rf = replication_factor.unwrap_or(self.config.default_replication_factor);
        // NOTE(review): this read guard is held across placement and the
        // replica_sets write below — confirm lock ordering vs. other methods.
        let available = self.available_nodes.read();

        if available.len() < rf as usize {
            return Err(ReplicationError::InsufficientNodes {
                required: rf as usize,
                available: available.len(),
            });
        }

        let mut replica_set = ReplicaSet::new(shard_id.to_string(), rf);

        let selected_nodes = self.select_nodes_for_placement(&available, rf as usize)?;

        // NOTE(review): a caller-supplied factor of 0 would make
        // selected_nodes empty and panic on this index — confirm callers
        // always pass rf >= 1 (the default is 3).
        let primary_node = selected_nodes[0].clone();
        replica_set.primary = Some(primary_node.clone());
        replica_set
            .replicas
            .insert(primary_node.clone(), ReplicaInfo::new(primary_node, true));

        // Remaining selections become secondary replicas.
        for node_id in selected_nodes.into_iter().skip(1) {
            replica_set
                .replicas
                .insert(node_id.clone(), ReplicaInfo::new(node_id, false));
        }

        // Creating a set for an existing shard_id silently replaces it.
        self.replica_sets
            .write()
            .insert(shard_id.to_string(), replica_set.clone());

        info!(
            shard_id = %shard_id,
            replication_factor = rf,
            "Created replica set"
        );

        Ok(replica_set)
    }
342
    /// Dispatches to the strategy-specific node selector. Returns exactly
    /// `count` node ids on success.
    fn select_nodes_for_placement(
        &self,
        available: &HashSet<String>,
        count: usize,
    ) -> Result<Vec<String>, ReplicationError> {
        let topology = self.node_topology.read();

        match self.config.placement_strategy {
            PlacementStrategy::RackAware => self.select_rack_aware(available, &topology, count),
            PlacementStrategy::ZoneAware => self.select_zone_aware(available, &topology, count),
            PlacementStrategy::Random => self.select_random(available, count),
            PlacementStrategy::LocalityFirst => {
                self.select_locality_first(available, &topology, count)
            }
            // Tag-based placement is not implemented yet; it currently
            // falls back to the random selector.
            PlacementStrategy::TagBased => self.select_random(available, count),
        }
    }
361
    /// Two-pass rack-aware placement: first pick at most one node per rack,
    /// then (if still short) fill with any remaining nodes regardless of
    /// rack. HashSet iteration order is arbitrary, so the concrete choice is
    /// not deterministic across runs.
    fn select_rack_aware(
        &self,
        available: &HashSet<String>,
        topology: &HashMap<String, NodeTopology>,
        count: usize,
    ) -> Result<Vec<String>, ReplicationError> {
        let mut selected = Vec::new();
        let mut used_racks: HashSet<String> = HashSet::new();

        // Pass 1: one node per distinct rack. Nodes without a registered
        // topology are skipped here but remain eligible in pass 2.
        for node_id in available {
            if let Some(topo) = topology.get(node_id) {
                if !used_racks.contains(&topo.rack) {
                    selected.push(node_id.clone());
                    used_racks.insert(topo.rack.clone());
                    if selected.len() >= count {
                        return Ok(selected);
                    }
                }
            }
        }

        // Pass 2: relax rack diversity to reach the requested count.
        for node_id in available {
            if !selected.contains(node_id) {
                selected.push(node_id.clone());
                if selected.len() >= count {
                    return Ok(selected);
                }
            }
        }

        if selected.len() >= count {
            Ok(selected)
        } else {
            Err(ReplicationError::InsufficientNodes {
                required: count,
                available: selected.len(),
            })
        }
    }
404
405 fn select_zone_aware(
407 &self,
408 available: &HashSet<String>,
409 topology: &HashMap<String, NodeTopology>,
410 count: usize,
411 ) -> Result<Vec<String>, ReplicationError> {
412 let mut selected = Vec::new();
413 let mut used_zones: HashSet<String> = HashSet::new();
414
415 for node_id in available {
417 if let Some(topo) = topology.get(node_id) {
418 if !used_zones.contains(&topo.zone) {
419 selected.push(node_id.clone());
420 used_zones.insert(topo.zone.clone());
421 if selected.len() >= count {
422 return Ok(selected);
423 }
424 }
425 }
426 }
427
428 for node_id in available {
430 if !selected.contains(node_id) {
431 selected.push(node_id.clone());
432 if selected.len() >= count {
433 return Ok(selected);
434 }
435 }
436 }
437
438 if selected.len() >= count {
439 Ok(selected)
440 } else {
441 Err(ReplicationError::InsufficientNodes {
442 required: count,
443 available: selected.len(),
444 })
445 }
446 }
447
    /// "Random" placement: takes the first `count` nodes in the HashSet's
    /// (arbitrary, hash-order) iteration order. No RNG is involved, so the
    /// result is unpredictable but NOT uniformly random — TODO confirm
    /// whether true randomness is required here.
    fn select_random(
        &self,
        available: &HashSet<String>,
        count: usize,
    ) -> Result<Vec<String>, ReplicationError> {
        let nodes: Vec<String> = available.iter().cloned().collect();
        if nodes.len() < count {
            return Err(ReplicationError::InsufficientNodes {
                required: count,
                available: nodes.len(),
            });
        }

        Ok(nodes.into_iter().take(count).collect())
    }
465
    /// Locality-first placement: the first node with a topology entry acts
    /// as the reference; subsequent nodes are preferred when they share the
    /// reference's rack OR zone. A second pass fills any shortfall with
    /// arbitrary remaining nodes.
    fn select_locality_first(
        &self,
        available: &HashSet<String>,
        topology: &HashMap<String, NodeTopology>,
        count: usize,
    ) -> Result<Vec<String>, ReplicationError> {
        let mut selected = Vec::new();
        // Topology of the first selected node; all locality comparisons are
        // made against this reference.
        let mut reference_topo: Option<&NodeTopology> = None;

        for node_id in available {
            if let Some(topo) = topology.get(node_id) {
                if selected.is_empty() {
                    selected.push(node_id.clone());
                    reference_topo = Some(topo);
                } else if let Some(ref_topo) = reference_topo {
                    if topo.rack == ref_topo.rack || topo.zone == ref_topo.zone {
                        selected.push(node_id.clone());
                    }
                }
                if selected.len() >= count {
                    return Ok(selected);
                }
            }
        }

        // Fallback pass: ignore locality to reach the requested count.
        for node_id in available {
            if !selected.contains(node_id) {
                selected.push(node_id.clone());
                if selected.len() >= count {
                    return Ok(selected);
                }
            }
        }

        if selected.len() >= count {
            Ok(selected)
        } else {
            Err(ReplicationError::InsufficientNodes {
                required: count,
                available: selected.len(),
            })
        }
    }
514
    /// Returns a point-in-time clone of the replica set for `shard_id`, or
    /// `None` if the shard is unknown.
    pub fn get_replica_set(&self, shard_id: &str) -> Option<ReplicaSet> {
        self.replica_sets.read().get(shard_id).cloned()
    }
519
    /// Transitions the replica for `node_id` in `shard_id` to `state`.
    ///
    /// `lag_ms`, when given, replaces the stored lag. Reaching `InSync`
    /// stamps `last_sync` and clears the failure counter. The set's version
    /// is bumped on every call, even if the state did not change.
    ///
    /// # Errors
    /// `ShardNotFound` if no replica set exists for `shard_id`;
    /// `ReplicaNotFound` if the node is not a member of the set.
    pub fn update_replica_state(
        &self,
        shard_id: &str,
        node_id: &str,
        state: ReplicaState,
        lag_ms: Option<u64>,
    ) -> Result<(), ReplicationError> {
        let mut sets = self.replica_sets.write();
        let set = sets
            .get_mut(shard_id)
            .ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;

        let replica = set
            .replicas
            .get_mut(node_id)
            .ok_or_else(|| ReplicationError::ReplicaNotFound(node_id.to_string()))?;

        let old_state = replica.state;
        replica.state = state;
        if let Some(lag) = lag_ms {
            replica.lag_ms = lag;
        }
        if state == ReplicaState::InSync {
            // A successful sync wipes the failure history.
            replica.last_sync = Some(current_time_ms());
            replica.failure_count = 0;
        }
        set.version += 1;

        if old_state != state {
            debug!(
                shard_id = %shard_id,
                node_id = %node_id,
                old_state = ?old_state,
                new_state = ?state,
                "Replica state changed"
            );
        }

        Ok(())
    }
561
562 pub fn record_replica_failure(
564 &self,
565 shard_id: &str,
566 node_id: &str,
567 ) -> Result<(), ReplicationError> {
568 let mut sets = self.replica_sets.write();
569 let set = sets
570 .get_mut(shard_id)
571 .ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;
572
573 if let Some(replica) = set.replicas.get_mut(node_id) {
574 replica.failure_count += 1;
575
576 if replica.failure_count >= 3 {
578 replica.state = ReplicaState::Offline;
579 warn!(
580 shard_id = %shard_id,
581 node_id = %node_id,
582 failure_count = replica.failure_count,
583 "Replica marked offline due to failures"
584 );
585 }
586 set.version += 1;
587 }
588
589 Ok(())
590 }
591
    /// Makes `new_primary_node` the primary of `shard_id`'s replica set,
    /// demoting the current primary (if any) in the same locked section.
    ///
    /// # Errors
    /// `ShardNotFound` / `ReplicaNotFound` for unknown ids, and
    /// `UnhealthyReplica` if the candidate is not `InSync`/`Syncing`.
    pub fn promote_replica(
        &self,
        shard_id: &str,
        new_primary_node: &str,
    ) -> Result<(), ReplicationError> {
        let mut sets = self.replica_sets.write();
        let set = sets
            .get_mut(shard_id)
            .ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;

        // Validate the candidate with an immutable lookup first; the
        // mutable lookups below happen only after all checks pass.
        let replica = set
            .replicas
            .get(new_primary_node)
            .ok_or_else(|| ReplicationError::ReplicaNotFound(new_primary_node.to_string()))?;

        if !replica.is_healthy() {
            return Err(ReplicationError::UnhealthyReplica(
                new_primary_node.to_string(),
            ));
        }

        // Demote the old primary if it is still a member of the set.
        if let Some(old_primary) = &set.primary {
            if let Some(old_replica) = set.replicas.get_mut(old_primary) {
                old_replica.is_primary = false;
            }
        }

        if let Some(new_replica) = set.replicas.get_mut(new_primary_node) {
            new_replica.is_primary = true;
        }

        let old_primary = set.primary.clone();
        set.primary = Some(new_primary_node.to_string());
        set.version += 1;

        info!(
            shard_id = %shard_id,
            old_primary = ?old_primary,
            new_primary = %new_primary_node,
            "Promoted replica to primary"
        );

        Ok(())
    }
640
641 pub fn add_replica(&self, shard_id: &str, node_id: &str) -> Result<(), ReplicationError> {
643 let mut sets = self.replica_sets.write();
644 let set = sets
645 .get_mut(shard_id)
646 .ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;
647
648 if set.replicas.contains_key(node_id) {
649 return Err(ReplicationError::ReplicaExists(node_id.to_string()));
650 }
651
652 if !self.available_nodes.read().contains(node_id) {
653 return Err(ReplicationError::NodeNotAvailable(node_id.to_string()));
654 }
655
656 set.replicas.insert(
657 node_id.to_string(),
658 ReplicaInfo::new(node_id.to_string(), false),
659 );
660 set.version += 1;
661
662 info!(
663 shard_id = %shard_id,
664 node_id = %node_id,
665 "Added new replica"
666 );
667
668 Ok(())
669 }
670
    /// Removes a non-primary replica from `shard_id`'s set.
    ///
    /// # Errors
    /// `ShardNotFound`, `CannotRemovePrimary`, or `MinimumReplicasRequired`
    /// when the set is already at the configured floor.
    ///
    /// NOTE(review): the floor compares the TOTAL replica count against
    /// `min_healthy_replicas`, not the healthy count the config name
    /// suggests — confirm whether `healthy_count()` was intended here.
    pub fn remove_replica(&self, shard_id: &str, node_id: &str) -> Result<(), ReplicationError> {
        let mut sets = self.replica_sets.write();
        let set = sets
            .get_mut(shard_id)
            .ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;

        if set.primary.as_deref() == Some(node_id) {
            return Err(ReplicationError::CannotRemovePrimary);
        }

        if set.replicas.len() <= self.config.min_healthy_replicas as usize {
            return Err(ReplicationError::MinimumReplicasRequired);
        }

        // Removing an id that is not a member is a silent no-op (the
        // version still bumps).
        set.replicas.remove(node_id);
        set.version += 1;

        info!(
            shard_id = %shard_id,
            node_id = %node_id,
            "Removed replica"
        );

        Ok(())
    }
699
700 pub fn check_under_replicated(&self) -> Vec<(String, usize)> {
702 let sets = self.replica_sets.read();
703 let mut under_replicated = Vec::new();
704
705 for (shard_id, set) in sets.iter() {
706 let healthy = set.healthy_count();
707 if healthy < set.replication_factor as usize {
708 under_replicated
709 .push((shard_id.clone(), set.replication_factor as usize - healthy));
710 }
711 }
712
713 under_replicated
714 }
715
    /// Attempts to fail over `shard_id` to a new primary when the current
    /// primary is missing or unhealthy. Returns the promoted node id, or
    /// `Ok(None)` when failover is disabled or unnecessary.
    ///
    /// The candidate is the non-primary `InSync` replica with the smallest
    /// lag. The read lock is dropped before promotion (promote_replica needs
    /// the write lock), so the candidate's state may change in the gap;
    /// `promote_replica` re-validates health, turning a stale choice into an
    /// `UnhealthyReplica` error rather than a bad promotion.
    pub fn auto_failover(&self, shard_id: &str) -> Result<Option<String>, ReplicationError> {
        if !self.config.auto_failover {
            return Ok(None);
        }

        let sets = self.replica_sets.read();
        let set = sets
            .get(shard_id)
            .ok_or_else(|| ReplicationError::ShardNotFound(shard_id.to_string()))?;

        // Nothing to do while the current primary is healthy.
        if let Some(primary) = &set.primary {
            if let Some(replica) = set.replicas.get(primary) {
                if replica.is_healthy() {
                    return Ok(None);
                }
            }
        }

        let candidate = set
            .replicas
            .iter()
            .filter(|(_, r)| !r.is_primary && r.state == ReplicaState::InSync)
            .min_by_key(|(_, r)| r.lag_ms)
            .map(|(id, _)| id.clone());

        // Release the read lock before promote_replica takes the write lock.
        drop(sets);

        if let Some(new_primary) = candidate.clone() {
            self.promote_replica(shard_id, &new_primary)?;
        }

        Ok(candidate)
    }
752
753 pub fn get_stats(&self) -> ReplicationStats {
755 let sets = self.replica_sets.read();
756 let nodes = self.available_nodes.read();
757
758 let mut total_replicas = 0;
759 let mut healthy_replicas = 0;
760 let mut in_sync_replicas = 0;
761 let mut under_replicated = 0;
762
763 for set in sets.values() {
764 total_replicas += set.replicas.len();
765 healthy_replicas += set.healthy_count();
766 in_sync_replicas += set.in_sync_count();
767 if set.healthy_count() < set.replication_factor as usize {
768 under_replicated += 1;
769 }
770 }
771
772 ReplicationStats {
773 total_replica_sets: sets.len(),
774 total_replicas,
775 healthy_replicas,
776 in_sync_replicas,
777 under_replicated_shards: under_replicated,
778 available_nodes: nodes.len(),
779 }
780 }
781
    /// Read-only access to the manager's configuration.
    pub fn config(&self) -> &ReplicationConfig {
        &self.config
    }
786}
787
/// Snapshot of cluster-wide replication health produced by
/// `ReplicaManager::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationStats {
    /// Number of replica sets under management.
    pub total_replica_sets: usize,
    /// Total replicas across all sets (primaries included).
    pub total_replicas: usize,
    /// Replicas currently `InSync` or `Syncing`.
    pub healthy_replicas: usize,
    /// Replicas currently fully `InSync`.
    pub in_sync_replicas: usize,
    /// Sets whose healthy count is below their replication factor.
    pub under_replicated_shards: usize,
    /// Nodes currently eligible for placement.
    pub available_nodes: usize,
}
798
/// Errors produced by the replication subsystem. Display text comes from
/// the `thiserror` attributes.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ReplicationError {
    #[error("Insufficient nodes for replication: required {required}, available {available}")]
    InsufficientNodes { required: usize, available: usize },

    #[error("Shard not found: {0}")]
    ShardNotFound(String),

    #[error("Replica not found: {0}")]
    ReplicaNotFound(String),

    #[error("Replica already exists: {0}")]
    ReplicaExists(String),

    #[error("Node not available: {0}")]
    NodeNotAvailable(String),

    #[error("Cannot remove primary replica")]
    CannotRemovePrimary,

    #[error("Minimum replicas required")]
    MinimumReplicasRequired,

    #[error("Replica is unhealthy: {0}")]
    UnhealthyReplica(String),

    #[error("No healthy replicas available")]
    NoHealthyReplicas,
}
829
/// Milliseconds since the Unix epoch. Falls back to 0 if the system clock
/// reads before the epoch (rather than panicking).
fn current_time_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    elapsed.as_millis() as u64
}
836
#[cfg(test)]
mod tests {
    use super::*;

    // Builds a manager with RF=3 defaults and five registered nodes
    // (node-0..node-4) spread across two zones and five distinct racks.
    fn create_test_manager() -> ReplicaManager {
        let config = ReplicationConfig {
            default_replication_factor: 3,
            min_write_replicas: 2,
            ..Default::default()
        };
        let manager = ReplicaManager::new(config);

        for i in 0..5 {
            let topo = NodeTopology::new(format!("node-{}", i)).with_location(
                "dc1",
                &format!("zone-{}", i % 2),
                &format!("rack-{}", i),
            );
            manager.register_node(topo);
        }

        manager
    }

    // Default RF (3) yields a 3-member set with a primary assigned.
    #[test]
    fn test_create_replica_set() {
        let manager = create_test_manager();

        let result = manager.create_replica_set("shard-1", None);
        assert!(result.is_ok());

        let set = result.unwrap();
        assert_eq!(set.shard_id, "shard-1");
        assert_eq!(set.replication_factor, 3);
        assert_eq!(set.replicas.len(), 3);
        assert!(set.primary.is_some());
    }

    // An explicit factor overrides the configured default.
    #[test]
    fn test_replica_set_with_custom_factor() {
        let manager = create_test_manager();

        let result = manager.create_replica_set("shard-2", Some(2));
        assert!(result.is_ok());

        let set = result.unwrap();
        assert_eq!(set.replication_factor, 2);
        assert_eq!(set.replicas.len(), 2);
    }

    // Requesting more replicas than registered nodes must fail up front.
    #[test]
    fn test_insufficient_nodes() {
        let config = ReplicationConfig::default();
        let manager = ReplicaManager::new(config);

        manager.register_node(NodeTopology::new("node-1".to_string()));
        manager.register_node(NodeTopology::new("node-2".to_string()));

        let result = manager.create_replica_set("shard-1", Some(3));
        assert!(matches!(
            result,
            Err(ReplicationError::InsufficientNodes { .. })
        ));
    }

    // State and lag updates are persisted on the stored replica.
    #[test]
    fn test_update_replica_state() {
        let manager = create_test_manager();
        manager.create_replica_set("shard-1", None).unwrap();

        let set = manager.get_replica_set("shard-1").unwrap();
        let node_id = set.replicas.keys().next().unwrap().clone();

        let result =
            manager.update_replica_state("shard-1", &node_id, ReplicaState::Syncing, Some(100));
        assert!(result.is_ok());

        let updated_set = manager.get_replica_set("shard-1").unwrap();
        let replica = updated_set.replicas.get(&node_id).unwrap();
        assert_eq!(replica.state, ReplicaState::Syncing);
        assert_eq!(replica.lag_ms, 100);
    }

    // Promotion flips is_primary on both old and new primaries.
    #[test]
    fn test_promote_replica() {
        let manager = create_test_manager();
        manager.create_replica_set("shard-1", None).unwrap();

        let set = manager.get_replica_set("shard-1").unwrap();
        let old_primary = set.primary.clone().unwrap();

        let new_primary = set
            .replicas
            .iter()
            .find(|(_, r)| !r.is_primary)
            .map(|(id, _)| id.clone())
            .unwrap();

        // Secondaries start Initializing (unhealthy), so mark the candidate
        // InSync first — promote_replica rejects unhealthy candidates.
        manager
            .update_replica_state("shard-1", &new_primary, ReplicaState::InSync, None)
            .unwrap();

        let result = manager.promote_replica("shard-1", &new_primary);
        assert!(result.is_ok());

        let updated_set = manager.get_replica_set("shard-1").unwrap();
        assert_eq!(updated_set.primary, Some(new_primary.clone()));
        assert!(!updated_set.replicas.get(&old_primary).unwrap().is_primary);
        assert!(updated_set.replicas.get(&new_primary).unwrap().is_primary);
    }

    // Membership grows on add_replica and shrinks on remove_replica.
    #[test]
    fn test_add_remove_replica() {
        let manager = create_test_manager();
        manager.create_replica_set("shard-1", Some(2)).unwrap();

        let set = manager.get_replica_set("shard-1").unwrap();
        // Pick any registered node not already in the 2-member set.
        let new_node = (0..5)
            .map(|i| format!("node-{}", i))
            .find(|n| !set.replicas.contains_key(n))
            .expect("Should have available node");

        let result = manager.add_replica("shard-1", &new_node);
        assert!(result.is_ok());

        let set = manager.get_replica_set("shard-1").unwrap();
        assert_eq!(set.replicas.len(), 3);

        let non_primary = set
            .replicas
            .iter()
            .find(|(_, r)| !r.is_primary)
            .map(|(id, _)| id.clone())
            .unwrap();

        let result = manager.remove_replica("shard-1", &non_primary);
        assert!(result.is_ok());

        let set = manager.get_replica_set("shard-1").unwrap();
        assert_eq!(set.replicas.len(), 2);
    }

    // Removing the primary is always rejected.
    #[test]
    fn test_cannot_remove_primary() {
        let manager = create_test_manager();
        manager.create_replica_set("shard-1", None).unwrap();

        let set = manager.get_replica_set("shard-1").unwrap();
        let primary = set.primary.unwrap();

        let result = manager.remove_replica("shard-1", &primary);
        assert!(matches!(result, Err(ReplicationError::CannotRemovePrimary)));
    }

    // With all 3 replicas InSync, quorum and every write mode should pass.
    #[test]
    fn test_replica_set_quorum() {
        let manager = create_test_manager();
        manager.create_replica_set("shard-1", Some(3)).unwrap();

        let set = manager.get_replica_set("shard-1").unwrap();
        for node_id in set.replicas.keys() {
            manager
                .update_replica_state("shard-1", node_id, ReplicaState::InSync, None)
                .unwrap();
        }

        let set = manager.get_replica_set("shard-1").unwrap();
        assert!(set.has_quorum());
        assert!(set.can_write(WriteAckMode::Majority));
        assert!(set.can_write(WriteAckMode::All));
    }

    // Taking one of three replicas offline makes the shard under-replicated.
    #[test]
    fn test_check_under_replicated() {
        let manager = create_test_manager();
        manager.create_replica_set("shard-1", Some(3)).unwrap();

        let set = manager.get_replica_set("shard-1").unwrap();
        let node_id = set.replicas.keys().next().unwrap().clone();
        manager
            .update_replica_state("shard-1", &node_id, ReplicaState::Offline, None)
            .unwrap();

        let under_replicated = manager.check_under_replicated();
        assert_eq!(under_replicated.len(), 1);
        assert_eq!(under_replicated[0].0, "shard-1");
    }

    // Stats aggregate across sets: 3 + 2 replicas over 5 nodes.
    #[test]
    fn test_replication_stats() {
        let manager = create_test_manager();
        manager.create_replica_set("shard-1", Some(3)).unwrap();
        manager.create_replica_set("shard-2", Some(2)).unwrap();

        let stats = manager.get_stats();
        assert_eq!(stats.total_replica_sets, 2);
        assert_eq!(stats.total_replicas, 5);
        assert_eq!(stats.available_nodes, 5);
    }

    // With 5 nodes on 5 distinct racks, rack-aware placement must land the
    // 3 replicas on 3 different racks.
    #[test]
    fn test_rack_aware_placement() {
        let config = ReplicationConfig {
            default_replication_factor: 3,
            placement_strategy: PlacementStrategy::RackAware,
            ..Default::default()
        };
        let manager = ReplicaManager::new(config);

        for i in 0..5 {
            let topo = NodeTopology::new(format!("node-{}", i)).with_location(
                "dc1",
                "zone-a",
                &format!("rack-{}", i),
            );
            manager.register_node(topo);
        }

        let set = manager.create_replica_set("shard-1", None).unwrap();

        let topology = manager.node_topology.read();
        let racks: HashSet<_> = set
            .replicas
            .keys()
            .filter_map(|id| topology.get(id))
            .map(|t| t.rack.clone())
            .collect();

        assert_eq!(racks.len(), 3);
    }

    // Three recorded failures push a replica to Offline.
    #[test]
    fn test_record_replica_failure() {
        let manager = create_test_manager();
        manager.create_replica_set("shard-1", None).unwrap();

        let set = manager.get_replica_set("shard-1").unwrap();
        let node_id = set.replicas.keys().next().unwrap().clone();

        for _ in 0..3 {
            manager.record_replica_failure("shard-1", &node_id).unwrap();
        }

        let set = manager.get_replica_set("shard-1").unwrap();
        let replica = set.replicas.get(&node_id).unwrap();
        assert_eq!(replica.state, ReplicaState::Offline);
        assert_eq!(replica.failure_count, 3);
    }
}