1use anyhow::{anyhow, Result};
26use parking_lot::{Mutex, RwLock};
27use serde::{Deserialize, Serialize};
28use std::collections::{HashMap, VecDeque};
29use std::sync::Arc;
30use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
31use tracing::{debug, info, warn};
32
/// Identifier of a data center (e.g. `"dc-us-east"`).
pub type DcId = String;

/// Replica-side vector store: vector id -> (vector, metadata, sequence number
/// of the replication entry that last wrote it).
type VectorStateMap = HashMap<String, (Vec<f32>, HashMap<String, String>, ReplicationSeq)>;

/// Sequence number assigned by the primary to each replication log entry;
/// incremented by one for every published entry.
pub type ReplicationSeq = u64;
41
/// Configuration shared by the primary and replica replication managers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrossDcConfig {
    /// Identifier of this data center; stamped as `source_dc` on entries the
    /// primary publishes.
    pub dc_id: DcId,
    /// Region name of this data center (used in logs and when a replica is
    /// registered with the primary).
    pub region: String,
    /// Must be `true` for `PrimaryDcManager` and `false` for `ReplicaDcManager`;
    /// their constructors reject the other value.
    pub is_primary: bool,
    /// Replica lag budget. NOTE: `ReplicaDcManager::is_within_lag_tolerance`
    /// compares its *millisecond count* against the lag measured in entries
    /// (1 entry is treated as 1 ms).
    pub max_lag_tolerance: Duration,
    /// Maximum number of entries handed to a replica per fetch; also the lag
    /// (in entries) above which the primary reports a replica as lagging.
    pub replication_batch_size: usize,
    /// NOTE(review): not read by the code in this file section; presumably
    /// used by a transport layer for retries — confirm.
    pub retry_interval: Duration,
    /// NOTE(review): not read by the code in this file section — confirm
    /// where it is consumed.
    pub max_retries: usize,
    /// NOTE(review): not read by the code in this file section; default is 0,
    /// presumably meaning "unlimited" — confirm.
    pub bandwidth_limit_bps: u64,
    /// NOTE(review): not read by the code in this file section — confirm.
    pub compression_level: u8,
    /// Whether a replica checks incoming upserts against a newer local version.
    pub conflict_detection: bool,
    /// Strategy a replica uses when a conflict is detected.
    pub conflict_resolution: ConflictResolutionStrategy,
    /// Expected heartbeat period; a replica treats the primary's heartbeat as
    /// recent while less than three intervals have elapsed.
    pub heartbeat_interval: Duration,
    /// NOTE(review): not read by the code in this file section — confirm.
    pub connection_timeout: Duration,
}
72
73impl Default for CrossDcConfig {
74 fn default() -> Self {
75 Self {
76 dc_id: "dc-primary".to_string(),
77 region: "us-east-1".to_string(),
78 is_primary: true,
79 max_lag_tolerance: Duration::from_secs(30),
80 replication_batch_size: 500,
81 retry_interval: Duration::from_secs(5),
82 max_retries: 10,
83 bandwidth_limit_bps: 0,
84 compression_level: 3,
85 conflict_detection: true,
86 conflict_resolution: ConflictResolutionStrategy::LastWriteWins,
87 heartbeat_interval: Duration::from_secs(10),
88 connection_timeout: Duration::from_secs(30),
89 }
90 }
91}
92
/// How a replica resolves an incoming upsert whose local copy was written by a
/// later sequence number (see `ReplicaDcManager::resolve_conflict`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConflictResolutionStrategy {
    /// Keep whichever version carries the higher sequence number.
    LastWriteWins,
    /// Always apply the incoming (primary) version.
    PrimaryWins,
    /// Always keep the local (replica) version.
    ReplicaWins,
    /// NOTE(review): intended to keep both versions, but currently resolved
    /// exactly like `ReplicaWins` (the local version is kept, nothing else).
    KeepBoth,
    /// NOTE(review): intended to merge metadata, but currently decided by
    /// sequence number like `LastWriteWins`; no merge is performed.
    MergeMetadata,
}
107
/// One record of the primary's replication log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationEntry {
    /// Sequence number assigned by the primary when the entry was published.
    pub seq: ReplicationSeq,
    /// DC that published the entry (the primary's `dc_id`).
    pub source_dc: DcId,
    /// Wall-clock publication time in milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
    /// The operation to replay on replicas.
    pub operation: ReplicationOperation,
    /// Estimated payload size, used only for byte statistics.
    pub payload_bytes: usize,
}
122
/// Operation carried by a `ReplicationEntry`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationOperation {
    /// Insert or replace a vector together with its metadata.
    Upsert {
        vector_id: String,
        vector: Vec<f32>,
        metadata: HashMap<String, String>,
    },
    /// Remove a vector.
    Delete { vector_id: String },
    /// Replace the replica's entire vector state; every snapshot entry is
    /// stored with `as_of_seq` as its version.
    Snapshot {
        /// `(vector_id, vector, metadata)` triples.
        entries: Vec<(String, Vec<f32>, HashMap<String, String>)>,
        /// Sequence number the snapshot is stamped with.
        as_of_seq: ReplicationSeq,
    },
    /// Liveness signal from the primary, carrying a primary sequence number.
    Heartbeat { current_seq: ReplicationSeq },
    /// Carries no data; replicas simply advance past it.
    NoOp,
}
144
/// Health of a replica as seen by the primary's `ReplicaTracker`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicaStatus {
    /// Lag of at most 100 entries at the last acknowledgement.
    Healthy,
    /// Lag of 101..=1000 entries, or a recent failure (at most 5 in a row).
    Lagging,
    /// More than 5 consecutive failures.
    Disconnected,
    /// NOTE(review): never assigned by the code in this file section.
    Failed,
    /// Newly registered replica, or lag above 1000 entries.
    Catching,
}
159
160impl std::fmt::Display for ReplicaStatus {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 match self {
163 Self::Healthy => write!(f, "Healthy"),
164 Self::Lagging => write!(f, "Lagging"),
165 Self::Disconnected => write!(f, "Disconnected"),
166 Self::Failed => write!(f, "Failed"),
167 Self::Catching => write!(f, "Catching"),
168 }
169 }
170}
171
/// Primary-side bookkeeping for one registered replica.
#[derive(Debug, Clone)]
pub struct ReplicaTracker {
    /// Replica DC identifier.
    pub dc_id: DcId,
    /// Replica region.
    pub region: String,
    /// Highest sequence number the replica has acknowledged (0 = none).
    pub acked_seq: ReplicationSeq,
    /// Current health classification.
    pub status: ReplicaStatus,
    /// Consecutive failures since the last successful acknowledgement.
    pub failure_count: usize,
    /// Time of the last successful acknowledgement (or of registration).
    pub last_contact: Instant,
    /// Lag derived from the entry count (1 entry is treated as 1 ms); not a
    /// measured wall-clock delay.
    pub lag: Duration,
    /// Cumulative bytes reported by acknowledgements.
    pub bytes_sent: u64,
    /// Cumulative entries reported by acknowledgements.
    pub entries_sent: u64,
}
194
195impl ReplicaTracker {
196 fn new(dc_id: DcId, region: String) -> Self {
197 Self {
198 dc_id,
199 region,
200 acked_seq: 0,
201 status: ReplicaStatus::Catching,
202 failure_count: 0,
203 last_contact: Instant::now(),
204 lag: Duration::ZERO,
205 bytes_sent: 0,
206 entries_sent: 0,
207 }
208 }
209
210 fn on_success(&mut self, new_acked_seq: ReplicationSeq, bytes: u64, entries: u64) {
212 self.acked_seq = new_acked_seq;
213 self.failure_count = 0;
214 self.last_contact = Instant::now();
215 self.bytes_sent += bytes;
216 self.entries_sent += entries;
217 }
218
219 fn on_failure(&mut self) {
221 self.failure_count += 1;
222 self.status = if self.failure_count > 5 {
223 ReplicaStatus::Disconnected
224 } else {
225 ReplicaStatus::Lagging
226 };
227 }
228
229 fn update_lag(&mut self, primary_seq: ReplicationSeq) {
231 let lag_entries = primary_seq.saturating_sub(self.acked_seq);
232 self.lag = Duration::from_millis(lag_entries);
234
235 self.status = match lag_entries {
236 0 => ReplicaStatus::Healthy,
237 1..=100 => ReplicaStatus::Healthy,
238 101..=1000 => ReplicaStatus::Lagging,
239 _ => ReplicaStatus::Catching,
240 };
241 }
242}
243
/// Replication counters reported by both the primary and replica managers.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CrossDcStats {
    /// Entries published (primary) or received (replica).
    pub total_entries: u64,
    /// Sum of the entries' estimated `payload_bytes`.
    pub total_bytes: u64,
    /// Replica side: primary sequence (as learned from heartbeats) minus the
    /// last applied sequence, computed at the end of `apply_pending`.
    pub current_lag_entries: u64,
    /// The entry lag reported as milliseconds (1 entry treated as 1 ms); not a
    /// measured wall-clock delay.
    pub current_lag_ms: u64,
    /// Replica side: upserts whose local copy had a newer sequence number.
    pub conflicts_detected: u64,
    /// Replica side: conflicts passed through the configured resolution strategy.
    pub conflicts_resolved: u64,
    /// Primary side: number of replica failures recorded.
    pub total_retries: u64,
    /// NOTE(review): not updated by the code in this file section.
    pub failed_entries: u64,
    /// Primary side: each registered replica's status rendered as text.
    pub replica_statuses: HashMap<DcId, String>,
    /// Replica side: wall-clock time (ms since Unix epoch) the last heartbeat
    /// was applied.
    pub last_heartbeat_ms: u64,
}
268
/// Primary-side replication state: an in-memory, sequence-ordered log of
/// replication entries plus per-replica acknowledgement tracking.
///
/// All mutable state sits behind `Arc`-wrapped `parking_lot` locks, so the
/// manager is used through `&self`.
#[derive(Debug)]
pub struct PrimaryDcManager {
    config: CrossDcConfig,
    /// Published entries in sequence order; trimmed from the front once it
    /// exceeds `log_retention_entries`.
    replication_log: Arc<RwLock<VecDeque<ReplicationEntry>>>,
    /// Sequence number of the most recently published entry (0 = none yet).
    current_seq: Arc<Mutex<ReplicationSeq>>,
    /// Registered replicas keyed by DC id.
    replicas: Arc<RwLock<HashMap<DcId, ReplicaTracker>>>,
    /// Cumulative publishing and failure statistics.
    stats: Arc<Mutex<CrossDcStats>>,
    /// Maximum number of entries kept in `replication_log` (100 000, set in `new`).
    log_retention_entries: usize,
}
287
288impl PrimaryDcManager {
289 pub fn new(config: CrossDcConfig) -> Result<Self> {
291 if !config.is_primary {
292 return Err(anyhow!("PrimaryDcManager requires is_primary=true"));
293 }
294
295 info!(
296 "Primary DC manager initialized for DC '{}' in region '{}'",
297 config.dc_id, config.region
298 );
299
300 Ok(Self {
301 config,
302 replication_log: Arc::new(RwLock::new(VecDeque::new())),
303 current_seq: Arc::new(Mutex::new(0)),
304 replicas: Arc::new(RwLock::new(HashMap::new())),
305 stats: Arc::new(Mutex::new(CrossDcStats::default())),
306 log_retention_entries: 100_000,
307 })
308 }
309
310 pub fn add_replica(&self, dc_id: DcId, region: String) {
312 let tracker = ReplicaTracker::new(dc_id.clone(), region.clone());
313 self.replicas.write().insert(dc_id.clone(), tracker);
314 info!("Registered replica DC '{}' in region '{}'", dc_id, region);
315 }
316
317 pub fn remove_replica(&self, dc_id: &str) {
319 self.replicas.write().remove(dc_id);
320 info!("Removed replica DC '{}'", dc_id);
321 }
322
323 pub fn publish_upsert(
325 &self,
326 vector_id: String,
327 vector: Vec<f32>,
328 metadata: HashMap<String, String>,
329 ) -> ReplicationSeq {
330 let payload_bytes = vector.len() * 4 + 64; self.publish_entry(
332 ReplicationOperation::Upsert {
333 vector_id,
334 vector,
335 metadata,
336 },
337 payload_bytes,
338 )
339 }
340
341 pub fn publish_delete(&self, vector_id: String) -> ReplicationSeq {
343 self.publish_entry(ReplicationOperation::Delete { vector_id }, 32)
344 }
345
346 pub fn publish_heartbeat(&self) -> ReplicationSeq {
348 let seq = *self.current_seq.lock();
349 self.publish_entry(ReplicationOperation::Heartbeat { current_seq: seq }, 16)
350 }
351
352 fn publish_entry(
353 &self,
354 operation: ReplicationOperation,
355 payload_bytes: usize,
356 ) -> ReplicationSeq {
357 let mut seq = self.current_seq.lock();
358 *seq += 1;
359 let new_seq = *seq;
360
361 let timestamp_ms = SystemTime::now()
362 .duration_since(UNIX_EPOCH)
363 .unwrap_or(Duration::ZERO)
364 .as_millis() as u64;
365
366 let entry = ReplicationEntry {
367 seq: new_seq,
368 source_dc: self.config.dc_id.clone(),
369 timestamp_ms,
370 operation,
371 payload_bytes,
372 };
373
374 let mut log = self.replication_log.write();
375 log.push_back(entry);
376
377 while log.len() > self.log_retention_entries {
379 log.pop_front();
380 }
381
382 let mut stats = self.stats.lock();
383 stats.total_entries += 1;
384 stats.total_bytes += payload_bytes as u64;
385
386 debug!("Published replication entry seq={} to log", new_seq);
387 new_seq
388 }
389
390 pub fn get_entries_for_replica(
394 &self,
395 _replica_dc: &str,
396 after_seq: ReplicationSeq,
397 ) -> Vec<ReplicationEntry> {
398 let log = self.replication_log.read();
399 log.iter()
400 .filter(|e| e.seq > after_seq)
401 .take(self.config.replication_batch_size)
402 .cloned()
403 .collect()
404 }
405
406 pub fn acknowledge_replica(
408 &self,
409 dc_id: &str,
410 acked_seq: ReplicationSeq,
411 bytes_received: u64,
412 entries_received: u64,
413 ) -> Result<()> {
414 let mut replicas = self.replicas.write();
415 let tracker = replicas
416 .get_mut(dc_id)
417 .ok_or_else(|| anyhow!("Unknown replica DC: {}", dc_id))?;
418
419 tracker.on_success(acked_seq, bytes_received, entries_received);
420
421 let primary_seq = *self.current_seq.lock();
422 tracker.update_lag(primary_seq);
423
424 debug!(
425 "Replica '{}' acked seq={}, lag={} entries",
426 dc_id,
427 acked_seq,
428 primary_seq.saturating_sub(acked_seq)
429 );
430
431 Ok(())
432 }
433
434 pub fn record_replica_failure(&self, dc_id: &str) {
436 let mut replicas = self.replicas.write();
437 if let Some(tracker) = replicas.get_mut(dc_id) {
438 tracker.on_failure();
439 let mut stats = self.stats.lock();
440 stats.total_retries += 1;
441 warn!(
442 "Replica '{}' failure #{} - status: {}",
443 dc_id, tracker.failure_count, tracker.status
444 );
445 }
446 }
447
448 pub fn get_replica_status(&self) -> Vec<(DcId, ReplicaStatus, ReplicationSeq, Duration)> {
450 let replicas = self.replicas.read();
451 replicas
452 .values()
453 .map(|t| (t.dc_id.clone(), t.status, t.acked_seq, t.lag))
454 .collect()
455 }
456
457 pub fn has_lagging_replicas(&self) -> bool {
459 let replicas = self.replicas.read();
460 let primary_seq = *self.current_seq.lock();
461
462 replicas.values().any(|t| {
463 let lag_entries = primary_seq.saturating_sub(t.acked_seq);
464 lag_entries > self.config.replication_batch_size as u64
465 })
466 }
467
468 pub fn max_replica_lag_entries(&self) -> u64 {
470 let replicas = self.replicas.read();
471 let primary_seq = *self.current_seq.lock();
472 replicas
473 .values()
474 .map(|t| primary_seq.saturating_sub(t.acked_seq))
475 .max()
476 .unwrap_or(0)
477 }
478
479 pub fn get_stats(&self) -> CrossDcStats {
481 let replicas = self.replicas.read();
482 let replica_statuses: HashMap<DcId, String> = replicas
483 .iter()
484 .map(|(id, t)| (id.clone(), t.status.to_string()))
485 .collect();
486
487 let mut stats = self.stats.lock().clone();
488 stats.replica_statuses = replica_statuses;
489 stats
490 }
491
492 pub fn current_seq(&self) -> ReplicationSeq {
494 *self.current_seq.lock()
495 }
496
497 pub fn log_length(&self) -> usize {
499 self.replication_log.read().len()
500 }
501
502 pub fn replica_count(&self) -> usize {
504 self.replicas.read().len()
505 }
506}
507
/// Replica-side replication state: buffers entries received from the primary,
/// applies them in sequence order to a local vector map, and records conflicts.
#[derive(Debug)]
pub struct ReplicaDcManager {
    config: CrossDcConfig,
    /// Sequence number of the last entry applied locally (0 = none).
    last_applied_seq: Arc<Mutex<ReplicationSeq>>,
    /// Entries received but not yet applied.
    pending_buffer: Arc<Mutex<VecDeque<ReplicationEntry>>>,
    /// Local copy of the vectors: id -> (vector, metadata, writing seq).
    local_state: Arc<RwLock<VectorStateMap>>,
    /// Every conflict resolved so far.
    conflict_log: Arc<Mutex<Vec<ConflictRecord>>>,
    /// Receive/apply statistics.
    stats: Arc<Mutex<CrossDcStats>>,
    /// Primary sequence number as last reported by a heartbeat (0 until one arrives).
    primary_seq: Arc<Mutex<ReplicationSeq>>,
    /// When the last heartbeat was applied (initialised to construction time).
    last_heartbeat: Arc<Mutex<Instant>>,
}
530
/// Audit record of one conflict resolved by a replica.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictRecord {
    /// Vector the conflicting upsert targeted.
    pub vector_id: String,
    /// Sequence number of the local version.
    pub replica_seq: ReplicationSeq,
    /// Sequence number of the incoming (primary) version.
    pub primary_seq: ReplicationSeq,
    /// `"primary_wins"` if the incoming version was applied, else `"replica_wins"`.
    pub resolution: String,
    /// Wall-clock time of resolution in milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
}
540
541impl ReplicaDcManager {
542 pub fn new(config: CrossDcConfig) -> Result<Self> {
544 if config.is_primary {
545 return Err(anyhow!("ReplicaDcManager requires is_primary=false"));
546 }
547
548 info!(
549 "Replica DC manager initialized for DC '{}' in region '{}'",
550 config.dc_id, config.region
551 );
552
553 Ok(Self {
554 config,
555 last_applied_seq: Arc::new(Mutex::new(0)),
556 pending_buffer: Arc::new(Mutex::new(VecDeque::new())),
557 local_state: Arc::new(RwLock::new(HashMap::new())),
558 conflict_log: Arc::new(Mutex::new(Vec::new())),
559 stats: Arc::new(Mutex::new(CrossDcStats::default())),
560 primary_seq: Arc::new(Mutex::new(0)),
561 last_heartbeat: Arc::new(Mutex::new(Instant::now())),
562 })
563 }
564
565 pub fn receive_entries(&self, entries: Vec<ReplicationEntry>) -> ReplicationSeq {
567 if entries.is_empty() {
568 return *self.last_applied_seq.lock();
569 }
570
571 let entries_count = entries.len();
572 let total_bytes: u64 = entries.iter().map(|e| e.payload_bytes as u64).sum();
573
574 let mut buffer = self.pending_buffer.lock();
575 for entry in entries {
576 buffer.push_back(entry);
577 }
578
579 let mut stats = self.stats.lock();
580 stats.total_entries += entries_count as u64;
581 stats.total_bytes += total_bytes;
582
583 *self.last_applied_seq.lock()
584 }
585
586 pub fn apply_pending(&self) -> usize {
588 let mut buffer = self.pending_buffer.lock();
589 let mut local = self.local_state.write();
590 let mut last_seq = self.last_applied_seq.lock();
591 let mut stats = self.stats.lock();
592 let mut applied = 0;
593
594 let mut entries: Vec<ReplicationEntry> = buffer.drain(..).collect();
596 entries.sort_by_key(|e| e.seq);
597
598 for entry in entries {
599 if entry.seq <= *last_seq {
601 debug!("Skipping already-applied seq={}", entry.seq);
602 continue;
603 }
604
605 match &entry.operation {
606 ReplicationOperation::Upsert {
607 vector_id,
608 vector,
609 metadata,
610 } => {
611 let conflict = if let Some((_, _, existing_seq)) = local.get(vector_id.as_str())
613 {
614 *existing_seq > entry.seq && self.config.conflict_detection
615 } else {
616 false
617 };
618
619 if conflict {
620 stats.conflicts_detected += 1;
621 let resolution = self.resolve_conflict(
622 vector_id,
623 entry.seq,
624 local
625 .get(vector_id.as_str())
626 .map(|(_, _, s)| *s)
627 .unwrap_or(0),
628 );
629 if !resolution {
630 debug!("Conflict: keeping local version for '{}'", vector_id);
632 stats.conflicts_resolved += 1;
633 *last_seq = entry.seq;
634 applied += 1;
635 continue;
636 }
637 stats.conflicts_resolved += 1;
638 }
639
640 local.insert(
641 vector_id.clone(),
642 (vector.clone(), metadata.clone(), entry.seq),
643 );
644 applied += 1;
645 debug!("Applied upsert for '{}' at seq={}", vector_id, entry.seq);
646 }
647 ReplicationOperation::Delete { vector_id } => {
648 local.remove(vector_id.as_str());
649 applied += 1;
650 debug!("Applied delete for '{}' at seq={}", vector_id, entry.seq);
651 }
652 ReplicationOperation::Snapshot {
653 entries: snapshot_entries,
654 as_of_seq,
655 } => {
656 local.clear();
658 for (id, vec, meta) in snapshot_entries {
659 local.insert(id.clone(), (vec.clone(), meta.clone(), *as_of_seq));
660 }
661 applied += 1;
662 info!(
663 "Applied snapshot with {} entries at seq={}",
664 snapshot_entries.len(),
665 as_of_seq
666 );
667 }
668 ReplicationOperation::Heartbeat { current_seq } => {
669 *self.primary_seq.lock() = *current_seq;
670 *self.last_heartbeat.lock() = Instant::now();
671
672 let last_heartbeat_ms = SystemTime::now()
673 .duration_since(UNIX_EPOCH)
674 .unwrap_or(Duration::ZERO)
675 .as_millis() as u64;
676 stats.last_heartbeat_ms = last_heartbeat_ms;
677 }
679 ReplicationOperation::NoOp => {
680 }
682 }
683
684 *last_seq = entry.seq;
685 }
686
687 let primary = *self.primary_seq.lock();
689 let last = *last_seq;
690 let lag = primary.saturating_sub(last);
691 stats.current_lag_entries = lag;
692 stats.current_lag_ms = lag; applied
695 }
696
697 fn resolve_conflict(
701 &self,
702 vector_id: &str,
703 incoming_seq: ReplicationSeq,
704 local_seq: ReplicationSeq,
705 ) -> bool {
706 let timestamp_ms = SystemTime::now()
707 .duration_since(UNIX_EPOCH)
708 .unwrap_or(Duration::ZERO)
709 .as_millis() as u64;
710
711 let primary_wins = match self.config.conflict_resolution {
712 ConflictResolutionStrategy::LastWriteWins => incoming_seq > local_seq,
713 ConflictResolutionStrategy::PrimaryWins => true,
714 ConflictResolutionStrategy::ReplicaWins => false,
715 ConflictResolutionStrategy::KeepBoth => false, ConflictResolutionStrategy::MergeMetadata => incoming_seq > local_seq,
717 };
718
719 self.conflict_log.lock().push(ConflictRecord {
720 vector_id: vector_id.to_string(),
721 replica_seq: local_seq,
722 primary_seq: incoming_seq,
723 resolution: if primary_wins {
724 "primary_wins".to_string()
725 } else {
726 "replica_wins".to_string()
727 },
728 timestamp_ms,
729 });
730
731 primary_wins
732 }
733
734 pub fn get_vector(&self, vector_id: &str) -> Option<(Vec<f32>, HashMap<String, String>)> {
736 self.local_state
737 .read()
738 .get(vector_id)
739 .map(|(v, m, _)| (v.clone(), m.clone()))
740 }
741
742 pub fn last_applied_seq(&self) -> ReplicationSeq {
744 *self.last_applied_seq.lock()
745 }
746
747 pub fn lag_entries(&self) -> u64 {
749 let primary = *self.primary_seq.lock();
750 let applied = *self.last_applied_seq.lock();
751 primary.saturating_sub(applied)
752 }
753
754 pub fn is_within_lag_tolerance(&self) -> bool {
756 let lag = self.lag_entries();
757 let tolerance_entries = self.config.max_lag_tolerance.as_millis() as u64;
758 lag <= tolerance_entries
759 }
760
761 pub fn vector_count(&self) -> usize {
763 self.local_state.read().len()
764 }
765
766 pub fn get_stats(&self) -> CrossDcStats {
768 let stats = self.stats.lock();
769 stats.clone()
770 }
771
772 pub fn search_similar(&self, query: &[f32], k: usize) -> Vec<(String, f32)> {
774 let local = self.local_state.read();
775 let mut similarities: Vec<(String, f32)> = local
776 .iter()
777 .filter_map(|(id, (vec, _, _))| {
778 if vec.len() != query.len() {
779 return None;
780 }
781 let dot: f32 = vec.iter().zip(query.iter()).map(|(a, b)| a * b).sum();
782 let na: f32 = vec.iter().map(|x| x * x).sum::<f32>().sqrt();
783 let nb: f32 = query.iter().map(|x| x * x).sum::<f32>().sqrt();
784 let sim = if na < 1e-9 || nb < 1e-9 {
785 0.0
786 } else {
787 dot / (na * nb)
788 };
789 Some((id.clone(), sim))
790 })
791 .collect();
792
793 similarities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
794 similarities.truncate(k);
795 similarities
796 }
797
798 pub fn primary_heartbeat_recent(&self) -> bool {
800 self.last_heartbeat.lock().elapsed() < self.config.heartbeat_interval * 3
801 }
802
803 pub fn conflict_log(&self) -> Vec<ConflictRecord> {
805 self.conflict_log.lock().clone()
806 }
807
808 pub fn pending_count(&self) -> usize {
810 self.pending_buffer.lock().len()
811 }
812}
813
/// Drives replication between one primary manager and a set of replica
/// managers that live in the same process, by passing entries directly.
///
/// NOTE(review): presumably an in-process stand-in for a network transport
/// (used by the tests here) — confirm before relying on it in production.
#[derive(Debug)]
pub struct CrossDcCoordinator {
    /// Primary whose log is replicated.
    primary: Arc<PrimaryDcManager>,
    /// Replica managers keyed by the DC id they were registered under.
    replicas: HashMap<DcId, Arc<ReplicaDcManager>>,
}
824
825impl CrossDcCoordinator {
826 pub fn new(primary: Arc<PrimaryDcManager>) -> Self {
828 Self {
829 primary,
830 replicas: HashMap::new(),
831 }
832 }
833
834 pub fn add_replica_node(&mut self, dc_id: DcId, replica: Arc<ReplicaDcManager>) {
836 self.primary
837 .add_replica(dc_id.clone(), replica.config.region.clone());
838 self.replicas.insert(dc_id, replica);
839 }
840
841 pub fn replicate_once(&self) -> Result<HashMap<DcId, usize>> {
843 let mut applied_counts = HashMap::new();
844
845 for (dc_id, replica) in &self.replicas {
846 let last_seq = replica.last_applied_seq();
847 let entries = self.primary.get_entries_for_replica(dc_id, last_seq);
848
849 if entries.is_empty() {
850 applied_counts.insert(dc_id.clone(), 0);
851 continue;
852 }
853
854 let entry_count = entries.len();
855 let bytes: u64 = entries.iter().map(|e| e.payload_bytes as u64).sum();
856
857 replica.receive_entries(entries);
858 let applied = replica.apply_pending();
859
860 self.primary
861 .acknowledge_replica(dc_id, replica.last_applied_seq(), bytes, entry_count as u64)
862 .map_err(|e| anyhow!("Failed to ack replica {}: {}", dc_id, e))?;
863
864 applied_counts.insert(dc_id.clone(), applied);
865 }
866
867 Ok(applied_counts)
868 }
869
870 pub fn replication_health(&self) -> ReplicationHealth {
872 let has_lagging = self.primary.has_lagging_replicas();
873 let max_lag = self.primary.max_replica_lag_entries();
874
875 let all_healthy = self.replicas.values().all(|r| r.is_within_lag_tolerance());
876
877 ReplicationHealth {
878 is_healthy: !has_lagging && all_healthy,
879 max_lag_entries: max_lag,
880 lagging_replica_count: if has_lagging {
881 self.replicas
882 .values()
883 .filter(|r| !r.is_within_lag_tolerance())
884 .count()
885 } else {
886 0
887 },
888 total_replicas: self.replicas.len(),
889 }
890 }
891}
892
/// Health summary produced by `CrossDcCoordinator::replication_health`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationHealth {
    /// `true` when neither the primary-side nor the replica-side lag checks fail.
    pub is_healthy: bool,
    /// Largest number of entries any replica trails the primary by.
    pub max_lag_entries: u64,
    /// Number of coordinator replicas `replication_health` considers lagging.
    pub lagging_replica_count: usize,
    /// Number of replicas attached to the coordinator.
    pub total_replicas: usize,
}
905
#[cfg(test)]
mod tests {
    use super::*;

    fn make_primary_config() -> CrossDcConfig {
        CrossDcConfig {
            dc_id: "dc-us-east".to_string(),
            region: "us-east-1".to_string(),
            is_primary: true,
            max_lag_tolerance: Duration::from_secs(10),
            replication_batch_size: 100,
            ..Default::default()
        }
    }

    fn make_replica_config(dc_id: &str, region: &str) -> CrossDcConfig {
        CrossDcConfig {
            dc_id: dc_id.to_string(),
            region: region.to_string(),
            is_primary: false,
            max_lag_tolerance: Duration::from_secs(10),
            replication_batch_size: 100,
            ..Default::default()
        }
    }

    #[test]
    fn test_primary_manager_creation() {
        let config = make_primary_config();
        let manager = PrimaryDcManager::new(config);
        assert!(manager.is_ok(), "Primary manager creation should succeed");
    }

    #[test]
    fn test_replica_manager_creation() {
        let config = make_replica_config("dc-eu-west", "eu-west-1");
        let manager = ReplicaDcManager::new(config);
        assert!(manager.is_ok(), "Replica manager creation should succeed");
    }

    #[test]
    fn test_primary_requires_is_primary_true() {
        let mut config = make_primary_config();
        config.is_primary = false;
        let result = PrimaryDcManager::new(config);
        assert!(result.is_err(), "Should fail if is_primary=false");
    }

    #[test]
    fn test_replica_requires_is_primary_false() {
        let mut config = make_replica_config("dc-x", "region-x");
        config.is_primary = true;
        let result = ReplicaDcManager::new(config);
        assert!(result.is_err(), "Should fail if is_primary=true");
    }

    #[test]
    fn test_publish_upsert() {
        let manager = PrimaryDcManager::new(make_primary_config()).unwrap();
        let seq = manager.publish_upsert("v1".to_string(), vec![1.0, 2.0], HashMap::new());
        assert_eq!(seq, 1);
        assert_eq!(manager.log_length(), 1);
        assert_eq!(manager.current_seq(), 1);
    }

    #[test]
    fn test_publish_delete() {
        let manager = PrimaryDcManager::new(make_primary_config()).unwrap();
        manager.publish_upsert("v1".to_string(), vec![1.0], HashMap::new());
        let seq = manager.publish_delete("v1".to_string());
        assert_eq!(seq, 2);
        assert_eq!(manager.log_length(), 2);
    }

    #[test]
    fn test_publish_heartbeat() {
        let manager = PrimaryDcManager::new(make_primary_config()).unwrap();
        let seq = manager.publish_heartbeat();
        assert_eq!(seq, 1);
    }

    #[test]
    fn test_add_and_remove_replica() {
        let manager = PrimaryDcManager::new(make_primary_config()).unwrap();
        manager.add_replica("dc-eu".to_string(), "eu-west-1".to_string());
        assert_eq!(manager.replica_count(), 1);
        manager.remove_replica("dc-eu");
        assert_eq!(manager.replica_count(), 0);
    }

    #[test]
    fn test_get_entries_for_replica() {
        let manager = PrimaryDcManager::new(make_primary_config()).unwrap();
        manager.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        for i in 0..5 {
            manager.publish_upsert(format!("v{}", i), vec![i as f32], HashMap::new());
        }

        let entries = manager.get_entries_for_replica("dc-eu", 0);
        assert_eq!(entries.len(), 5);

        let partial = manager.get_entries_for_replica("dc-eu", 3);
        assert_eq!(partial.len(), 2);
    }

    #[test]
    fn test_get_entries_respects_batch_size_and_order() {
        let manager = PrimaryDcManager::new(make_primary_config()).unwrap();
        manager.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        for i in 0..150 {
            manager.publish_upsert(format!("v{}", i), vec![i as f32], HashMap::new());
        }

        let first = manager.get_entries_for_replica("dc-eu", 0);
        assert_eq!(first.len(), 100, "Batch must be capped at replication_batch_size");
        assert_eq!(first.first().unwrap().seq, 1);
        assert_eq!(first.last().unwrap().seq, 100);

        let second = manager.get_entries_for_replica("dc-eu", 100);
        assert_eq!(second.len(), 50);
        assert_eq!(second.first().unwrap().seq, 101);

        assert!(manager.get_entries_for_replica("dc-eu", 150).is_empty());
    }

    #[test]
    fn test_replica_receive_and_apply() {
        let primary = PrimaryDcManager::new(make_primary_config()).unwrap();
        primary.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        let replica = ReplicaDcManager::new(make_replica_config("dc-eu", "eu-west-1")).unwrap();

        primary.publish_upsert("v1".to_string(), vec![1.0, 0.0], HashMap::new());
        primary.publish_upsert("v2".to_string(), vec![0.0, 1.0], HashMap::new());

        let entries = primary.get_entries_for_replica("dc-eu", 0);
        assert_eq!(entries.len(), 2);
        replica.receive_entries(entries);
        assert_eq!(replica.pending_count(), 2);
        let applied = replica.apply_pending();

        assert_eq!(applied, 2);
        assert_eq!(replica.pending_count(), 0);
        assert_eq!(replica.vector_count(), 2);
        assert_eq!(replica.last_applied_seq(), 2);

        let v1 = replica.get_vector("v1");
        assert!(v1.is_some());
        assert_eq!(v1.unwrap().0, vec![1.0, 0.0]);
    }

    #[test]
    fn test_replica_skips_already_applied_entries() {
        let primary = PrimaryDcManager::new(make_primary_config()).unwrap();
        primary.add_replica("dc-eu".to_string(), "eu-west-1".to_string());
        let replica = ReplicaDcManager::new(make_replica_config("dc-eu", "eu-west-1")).unwrap();

        primary.publish_upsert("v1".to_string(), vec![1.0], HashMap::new());
        primary.publish_upsert("v2".to_string(), vec![2.0], HashMap::new());

        let entries = primary.get_entries_for_replica("dc-eu", 0);
        replica.receive_entries(entries.clone());
        assert_eq!(replica.apply_pending(), 2);

        // Redelivering the same entries must be a no-op.
        replica.receive_entries(entries);
        assert_eq!(replica.apply_pending(), 0);
        assert_eq!(replica.last_applied_seq(), 2);
        assert_eq!(replica.vector_count(), 2);
    }

    #[test]
    fn test_replica_apply_delete() {
        let primary = PrimaryDcManager::new(make_primary_config()).unwrap();
        let replica = ReplicaDcManager::new(make_replica_config("dc-eu", "eu-west-1")).unwrap();
        primary.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        primary.publish_upsert("v1".to_string(), vec![1.0], HashMap::new());
        primary.publish_delete("v1".to_string());

        let entries = primary.get_entries_for_replica("dc-eu", 0);
        replica.receive_entries(entries);
        replica.apply_pending();

        assert_eq!(replica.vector_count(), 0);
        assert!(replica.get_vector("v1").is_none());
    }

    #[test]
    fn test_coordinator_replicate_once() {
        let primary = Arc::new(PrimaryDcManager::new(make_primary_config()).unwrap());
        let replica =
            Arc::new(ReplicaDcManager::new(make_replica_config("dc-eu", "eu-west-1")).unwrap());

        let mut coordinator = CrossDcCoordinator::new(Arc::clone(&primary));
        coordinator.add_replica_node("dc-eu".to_string(), Arc::clone(&replica));

        for i in 0..10 {
            primary.publish_upsert(format!("v{}", i), vec![i as f32], HashMap::new());
        }

        let applied = coordinator.replicate_once().unwrap();
        assert_eq!(applied.get("dc-eu"), Some(&10));
        assert_eq!(replica.vector_count(), 10);
    }

    #[test]
    fn test_coordinator_incremental_replication() {
        let primary = Arc::new(PrimaryDcManager::new(make_primary_config()).unwrap());
        let replica = Arc::new(
            ReplicaDcManager::new(make_replica_config("dc-ap", "ap-southeast-1")).unwrap(),
        );

        let mut coordinator = CrossDcCoordinator::new(Arc::clone(&primary));
        coordinator.add_replica_node("dc-ap".to_string(), Arc::clone(&replica));

        for i in 0..5 {
            primary.publish_upsert(format!("v{}", i), vec![i as f32], HashMap::new());
        }
        coordinator.replicate_once().unwrap();
        assert_eq!(replica.vector_count(), 5);

        for i in 5..10 {
            primary.publish_upsert(format!("v{}", i), vec![i as f32], HashMap::new());
        }
        coordinator.replicate_once().unwrap();
        assert_eq!(replica.vector_count(), 10);
        assert_eq!(replica.last_applied_seq(), 10);
    }

    #[test]
    fn test_replication_health_healthy() {
        let primary = Arc::new(PrimaryDcManager::new(make_primary_config()).unwrap());
        let replica =
            Arc::new(ReplicaDcManager::new(make_replica_config("dc-eu", "eu-west-1")).unwrap());

        let mut coordinator = CrossDcCoordinator::new(Arc::clone(&primary));
        coordinator.add_replica_node("dc-eu".to_string(), Arc::clone(&replica));

        primary.publish_upsert("v1".to_string(), vec![1.0], HashMap::new());
        coordinator.replicate_once().unwrap();

        let health = coordinator.replication_health();
        assert_eq!(health.total_replicas, 1);
        assert!(health.is_healthy, "Fully caught-up replica should be healthy");
        assert_eq!(health.max_lag_entries, 0);
        assert_eq!(health.lagging_replica_count, 0);
    }

    #[test]
    fn test_snapshot_operation() {
        let replica =
            Arc::new(ReplicaDcManager::new(make_replica_config("dc-eu", "eu-west-1")).unwrap());

        let snapshot_entries = vec![
            ("v1".to_string(), vec![1.0, 0.0], HashMap::new()),
            ("v2".to_string(), vec![0.0, 1.0], HashMap::new()),
        ];

        let snapshot_op = ReplicationOperation::Snapshot {
            entries: snapshot_entries,
            as_of_seq: 100,
        };

        let entry = ReplicationEntry {
            seq: 1,
            source_dc: "dc-us-east".to_string(),
            timestamp_ms: 0,
            operation: snapshot_op,
            payload_bytes: 256,
        };

        replica.receive_entries(vec![entry]);
        let applied = replica.apply_pending();

        assert_eq!(applied, 1);
        assert_eq!(replica.vector_count(), 2);
        assert_eq!(replica.get_vector("v2").unwrap().0, vec![0.0, 1.0]);
    }

    #[test]
    fn test_heartbeat_replication() {
        let primary = Arc::new(PrimaryDcManager::new(make_primary_config()).unwrap());
        let replica =
            Arc::new(ReplicaDcManager::new(make_replica_config("dc-eu", "eu-west-1")).unwrap());

        let mut coordinator = CrossDcCoordinator::new(Arc::clone(&primary));
        coordinator.add_replica_node("dc-eu".to_string(), Arc::clone(&replica));

        primary.publish_heartbeat();
        let applied = coordinator.replicate_once().unwrap();

        // Heartbeats advance the replica but are not counted as data operations.
        assert_eq!(applied.get("dc-eu"), Some(&0));
        assert_eq!(replica.last_applied_seq(), 1);
        assert_eq!(replica.vector_count(), 0);
        assert_eq!(replica.lag_entries(), 0);
        assert!(replica.primary_heartbeat_recent());

        let stats = replica.get_stats();
        assert_eq!(stats.total_entries, 1);
        assert!(stats.last_heartbeat_ms > 0);

        let status = primary.get_replica_status();
        assert_eq!(status[0].2, 1, "Primary should see the heartbeat acknowledged");
    }

    #[test]
    fn test_acknowledge_replica() {
        let manager = PrimaryDcManager::new(make_primary_config()).unwrap();
        manager.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        for _ in 0..5 {
            manager.publish_upsert("v".to_string(), vec![1.0], HashMap::new());
        }

        let result = manager.acknowledge_replica("dc-eu", 5, 500, 5);
        assert!(result.is_ok());

        let status = manager.get_replica_status();
        assert!(!status.is_empty());
        let (_, status_val, acked, _) = &status[0];
        assert_eq!(*acked, 5);
        assert_eq!(*status_val, ReplicaStatus::Healthy);
    }

    #[test]
    fn test_acknowledge_unknown_replica_fails() {
        let manager = PrimaryDcManager::new(make_primary_config()).unwrap();
        let result = manager.acknowledge_replica("unknown-dc", 1, 0, 0);
        assert!(result.is_err(), "Should fail for unknown replica");
    }

    #[test]
    fn test_record_replica_failure() {
        let manager = PrimaryDcManager::new(make_primary_config()).unwrap();
        manager.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        for _ in 0..6 {
            manager.record_replica_failure("dc-eu");
        }

        let status = manager.get_replica_status();
        let (_, s, _, _) = &status[0];
        assert_eq!(*s, ReplicaStatus::Disconnected);
        assert_eq!(manager.get_stats().total_retries, 6);
    }

    #[test]
    fn test_few_replica_failures_mark_lagging() {
        let manager = PrimaryDcManager::new(make_primary_config()).unwrap();
        manager.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        for _ in 0..5 {
            manager.record_replica_failure("dc-eu");
        }

        let status = manager.get_replica_status();
        assert_eq!(status[0].1, ReplicaStatus::Lagging);
    }

    #[test]
    fn test_primary_stats_include_replica_status() {
        let manager = PrimaryDcManager::new(make_primary_config()).unwrap();
        manager.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        let stats = manager.get_stats();
        assert_eq!(
            stats.replica_statuses.get("dc-eu").map(String::as_str),
            Some("Catching")
        );
    }

    #[test]
    fn test_replica_search_similar() {
        let primary = PrimaryDcManager::new(make_primary_config()).unwrap();
        let replica = ReplicaDcManager::new(make_replica_config("dc-eu", "eu-west-1")).unwrap();
        primary.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        primary.publish_upsert("v1".to_string(), vec![1.0, 0.0, 0.0], HashMap::new());
        primary.publish_upsert("v2".to_string(), vec![0.0, 1.0, 0.0], HashMap::new());

        let entries = primary.get_entries_for_replica("dc-eu", 0);
        replica.receive_entries(entries);
        replica.apply_pending();

        let results = replica.search_similar(&[1.0, 0.0, 0.0], 2);
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].0, "v1");
        assert!((results[0].1 - 1.0).abs() < 1e-6);
        assert_eq!(results[1].0, "v2");

        assert_eq!(replica.search_similar(&[1.0, 0.0, 0.0], 1).len(), 1);
        assert!(replica.search_similar(&[1.0, 0.0], 5).is_empty());
    }

    #[test]
    fn test_cross_dc_config_default() {
        let config = CrossDcConfig::default();
        assert!(config.is_primary);
        assert_eq!(config.compression_level, 3);
        assert!(config.conflict_detection);
    }

    #[test]
    fn test_conflict_resolution_last_write_wins() {
        let mut config = make_replica_config("dc-eu", "eu-west-1");
        config.conflict_resolution = ConflictResolutionStrategy::LastWriteWins;
        config.conflict_detection = true;

        let replica = ReplicaDcManager::new(config).unwrap();

        {
            let mut local = replica.local_state.write();
            local.insert("v1".to_string(), (vec![2.0], HashMap::new(), 100));
        }

        let entry = ReplicationEntry {
            seq: 1,
            source_dc: "dc-us-east".to_string(),
            timestamp_ms: 0,
            operation: ReplicationOperation::Upsert {
                vector_id: "v1".to_string(),
                vector: vec![1.0],
                metadata: HashMap::new(),
            },
            payload_bytes: 16,
        };

        replica.receive_entries(vec![entry]);
        replica.apply_pending();

        let v1 = replica.get_vector("v1");
        assert!(v1.is_some());
        assert_eq!(v1.unwrap().0, vec![2.0], "Local version should be retained");
        assert_eq!(replica.last_applied_seq(), 1);

        let conflicts = replica.conflict_log();
        assert_eq!(conflicts.len(), 1);
        assert_eq!(conflicts[0].resolution, "replica_wins");
        assert_eq!(conflicts[0].replica_seq, 100);
        assert_eq!(conflicts[0].primary_seq, 1);

        let stats = replica.get_stats();
        assert_eq!(stats.conflicts_detected, 1);
        assert_eq!(stats.conflicts_resolved, 1);
    }

    #[test]
    fn test_conflict_resolution_primary_wins() {
        let mut config = make_replica_config("dc-eu", "eu-west-1");
        config.conflict_resolution = ConflictResolutionStrategy::PrimaryWins;
        config.conflict_detection = true;

        let replica = ReplicaDcManager::new(config).unwrap();

        {
            let mut local = replica.local_state.write();
            local.insert("v1".to_string(), (vec![2.0], HashMap::new(), 100));
        }

        let entry = ReplicationEntry {
            seq: 1,
            source_dc: "dc-us-east".to_string(),
            timestamp_ms: 0,
            operation: ReplicationOperation::Upsert {
                vector_id: "v1".to_string(),
                vector: vec![1.0],
                metadata: HashMap::new(),
            },
            payload_bytes: 16,
        };

        replica.receive_entries(vec![entry]);
        replica.apply_pending();

        let v1 = replica.get_vector("v1");
        assert!(v1.is_some());
        assert_eq!(v1.unwrap().0, vec![1.0], "Primary version should win");

        let conflicts = replica.conflict_log();
        assert_eq!(conflicts.len(), 1);
        assert_eq!(conflicts[0].resolution, "primary_wins");
    }

    #[test]
    fn test_pending_buffer_tracking() {
        let replica = ReplicaDcManager::new(make_replica_config("dc-eu", "eu-west-1")).unwrap();

        assert_eq!(replica.pending_count(), 0);

        let entry = ReplicationEntry {
            seq: 1,
            source_dc: "dc-us".to_string(),
            timestamp_ms: 0,
            operation: ReplicationOperation::NoOp,
            payload_bytes: 0,
        };

        let entry2 = ReplicationEntry {
            seq: 1,
            source_dc: "dc-us".to_string(),
            timestamp_ms: 0,
            operation: ReplicationOperation::Heartbeat { current_seq: 0 },
            payload_bytes: 0,
        };

        replica.receive_entries(vec![entry, entry2]);
        assert_eq!(replica.pending_count(), 2);
    }

    #[test]
    fn test_max_lag_entries_calculation() {
        let manager = PrimaryDcManager::new(make_primary_config()).unwrap();
        manager.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        for _ in 0..20 {
            manager.publish_upsert("v".to_string(), vec![1.0], HashMap::new());
        }

        let lag = manager.max_replica_lag_entries();
        assert_eq!(lag, 20, "Lag should be 20 entries");
        assert!(!manager.has_lagging_replicas(), "20 entries is within the batch size");
    }

    #[test]
    fn test_replication_stats() {
        let manager = PrimaryDcManager::new(make_primary_config()).unwrap();

        for i in 0..5 {
            manager.publish_upsert(format!("v{}", i), vec![i as f32], HashMap::new());
        }

        let stats = manager.get_stats();
        assert_eq!(stats.total_entries, 5);
        assert!(stats.total_bytes > 0);
    }
}