1use anyhow::{anyhow, Result};
26use parking_lot::{Mutex, RwLock};
27use serde::{Deserialize, Serialize};
28use std::collections::{HashMap, VecDeque};
29use std::sync::Arc;
30use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
31use tracing::{debug, info, warn};
32
/// Identifier of a data center that takes part in replication.
pub type DcId = String;

/// Replica-side vector store: vector id -> (vector, metadata, sequence number stored with it).
type VectorStateMap = HashMap<String, (Vec<f32>, HashMap<String, String>, ReplicationSeq)>;

/// Sequence number of a replication-log entry; the primary assigns these in increasing order.
pub type ReplicationSeq = u64;
41
/// Configuration shared by the primary and replica replication managers.
///
/// Several fields are carried for callers but are not consulted by the
/// managers visible in this file; those are marked below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrossDcConfig {
    /// Identifier of this data center; stamped on published entries as `source_dc`.
    pub dc_id: DcId,
    /// Region label of this data center (used in log messages and replica registration).
    pub region: String,
    /// `true` for a primary; `PrimaryDcManager::new` requires it, `ReplicaDcManager::new` rejects it.
    pub is_primary: bool,
    /// Lag tolerance. NOTE(review): `is_within_lag_tolerance` compares its
    /// millisecond count against a lag measured in entries - confirm intended units.
    pub max_lag_tolerance: Duration,
    /// Maximum number of entries handed to a replica per fetch; also the
    /// entry-lag threshold used by `has_lagging_replicas`.
    pub replication_batch_size: usize,
    /// Retry interval (not read by the code visible here).
    pub retry_interval: Duration,
    /// Maximum retries (not read by the code visible here).
    pub max_retries: usize,
    /// Bandwidth limit in bits or bytes per second (not read by the code
    /// visible here; unit and meaning of `0` - TODO confirm).
    pub bandwidth_limit_bps: u64,
    /// Compression level (not read by the code visible here).
    pub compression_level: u8,
    /// When `true`, a replica checks incoming upserts against newer local versions.
    pub conflict_detection: bool,
    /// Strategy applied when a conflict is detected.
    pub conflict_resolution: ConflictResolutionStrategy,
    /// Expected heartbeat period; a replica treats the primary as recently
    /// seen while the last heartbeat is younger than three periods.
    pub heartbeat_interval: Duration,
    /// Connection timeout (not read by the code visible here).
    pub connection_timeout: Duration,
}
72
73impl Default for CrossDcConfig {
74 fn default() -> Self {
75 Self {
76 dc_id: "dc-primary".to_string(),
77 region: "us-east-1".to_string(),
78 is_primary: true,
79 max_lag_tolerance: Duration::from_secs(30),
80 replication_batch_size: 500,
81 retry_interval: Duration::from_secs(5),
82 max_retries: 10,
83 bandwidth_limit_bps: 0,
84 compression_level: 3,
85 conflict_detection: true,
86 conflict_resolution: ConflictResolutionStrategy::LastWriteWins,
87 heartbeat_interval: Duration::from_secs(10),
88 connection_timeout: Duration::from_secs(30),
89 }
90 }
91}
92
/// How a replica decides between an incoming upsert and a local version
/// that carries a higher sequence number.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConflictResolutionStrategy {
    /// The incoming version is applied only if its sequence number is greater than the local one.
    LastWriteWins,
    /// The incoming (primary) version is always applied.
    PrimaryWins,
    /// The local (replica) version is always kept.
    ReplicaWins,
    /// NOTE(review): `resolve_conflict` currently treats this like keeping
    /// the local version; no second copy is stored.
    KeepBoth,
    /// NOTE(review): `resolve_conflict` currently uses the same sequence
    /// comparison as `LastWriteWins`; no metadata merge is performed.
    MergeMetadata,
}
107
/// One record of the replication log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationEntry {
    /// Sequence number assigned by the primary when the entry was published.
    pub seq: ReplicationSeq,
    /// Identifier of the data center that published the entry.
    pub source_dc: DcId,
    /// Wall-clock publish time in milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
    /// The change (or control message) carried by this entry.
    pub operation: ReplicationOperation,
    /// Estimated payload size in bytes, used for statistics only.
    pub payload_bytes: usize,
}
122
/// Operation carried by a [`ReplicationEntry`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationOperation {
    /// Insert or overwrite a vector together with its metadata.
    Upsert {
        /// Identifier of the vector being written.
        vector_id: String,
        /// Vector components.
        vector: Vec<f32>,
        /// Metadata stored alongside the vector.
        metadata: HashMap<String, String>,
    },
    /// Remove a vector by identifier.
    Delete { vector_id: String },
    /// Replace the replica's entire local state.
    Snapshot {
        /// Full contents as `(vector_id, vector, metadata)` tuples.
        entries: Vec<(String, Vec<f32>, HashMap<String, String>)>,
        /// Sequence number recorded on every vector installed from the snapshot.
        as_of_seq: ReplicationSeq,
    },
    /// Liveness signal carrying a primary sequence number.
    Heartbeat { current_seq: ReplicationSeq },
    /// Entry with no effect other than advancing the applied sequence number.
    NoOp,
}
144
/// Health classification the primary keeps for each replica.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicaStatus {
    /// At most 100 entries behind the primary.
    Healthy,
    /// 101 to 1000 entries behind, or up to five failures since the last success.
    Lagging,
    /// More than five failures since the last success.
    Disconnected,
    /// Not assigned by any code visible in this file.
    Failed,
    /// Initial state of a new replica, and more than 1000 entries behind.
    Catching,
}
159
160impl std::fmt::Display for ReplicaStatus {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 match self {
163 Self::Healthy => write!(f, "Healthy"),
164 Self::Lagging => write!(f, "Lagging"),
165 Self::Disconnected => write!(f, "Disconnected"),
166 Self::Failed => write!(f, "Failed"),
167 Self::Catching => write!(f, "Catching"),
168 }
169 }
170}
171
172#[derive(Debug, Clone)]
174pub struct ReplicaTracker {
175 pub dc_id: DcId,
177 pub region: String,
179 pub acked_seq: ReplicationSeq,
181 pub status: ReplicaStatus,
183 pub failure_count: usize,
185 pub last_contact: Instant,
187 pub lag: Duration,
189 pub bytes_sent: u64,
191 pub entries_sent: u64,
193}
194
195impl ReplicaTracker {
196 fn new(dc_id: DcId, region: String) -> Self {
197 Self {
198 dc_id,
199 region,
200 acked_seq: 0,
201 status: ReplicaStatus::Catching,
202 failure_count: 0,
203 last_contact: Instant::now(),
204 lag: Duration::ZERO,
205 bytes_sent: 0,
206 entries_sent: 0,
207 }
208 }
209
210 fn on_success(&mut self, new_acked_seq: ReplicationSeq, bytes: u64, entries: u64) {
212 self.acked_seq = new_acked_seq;
213 self.failure_count = 0;
214 self.last_contact = Instant::now();
215 self.bytes_sent += bytes;
216 self.entries_sent += entries;
217 }
218
219 fn on_failure(&mut self) {
221 self.failure_count += 1;
222 self.status = if self.failure_count > 5 {
223 ReplicaStatus::Disconnected
224 } else {
225 ReplicaStatus::Lagging
226 };
227 }
228
229 fn update_lag(&mut self, primary_seq: ReplicationSeq) {
231 let lag_entries = primary_seq.saturating_sub(self.acked_seq);
232 self.lag = Duration::from_millis(lag_entries);
234
235 self.status = match lag_entries {
236 0 => ReplicaStatus::Healthy,
237 1..=100 => ReplicaStatus::Healthy,
238 101..=1000 => ReplicaStatus::Lagging,
239 _ => ReplicaStatus::Catching,
240 };
241 }
242}
243
/// Replication counters reported by both the primary and the replica managers.
///
/// Each manager fills only the fields it maintains; the others stay at
/// their default value.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CrossDcStats {
    /// Entries published (primary) or received (replica).
    pub total_entries: u64,
    /// Sum of the `payload_bytes` of those entries.
    pub total_bytes: u64,
    /// Replica only: entries behind the last heartbeat's sequence number.
    pub current_lag_entries: u64,
    /// Replica only. NOTE(review): currently set to the same value as
    /// `current_lag_entries`, i.e. an entry count rather than a duration.
    pub current_lag_ms: u64,
    /// Replica only: conflicts detected while applying upserts.
    pub conflicts_detected: u64,
    /// Replica only: conflicts for which a resolution was chosen.
    pub conflicts_resolved: u64,
    /// Primary only: incremented once per recorded replica failure.
    pub total_retries: u64,
    /// Not written by any code visible in this file.
    pub failed_entries: u64,
    /// Primary only: status label per replica, filled in by `get_stats`.
    pub replica_statuses: HashMap<DcId, String>,
    /// Replica only: wall-clock time (ms since the Unix epoch) at which the
    /// last heartbeat was applied.
    pub last_heartbeat_ms: u64,
}
268
/// Primary-side replication manager: owns the replication log, assigns
/// sequence numbers and tracks the progress of every registered replica.
#[derive(Debug)]
pub struct PrimaryDcManager {
    /// Configuration this manager was created with.
    config: CrossDcConfig,
    /// Published entries in publish order; trimmed from the front once it
    /// grows past `log_retention_entries`.
    replication_log: Arc<RwLock<VecDeque<ReplicationEntry>>>,
    /// Last sequence number handed out (0 before the first publish).
    current_seq: Arc<Mutex<ReplicationSeq>>,
    /// Progress trackers keyed by replica DC id.
    replicas: Arc<RwLock<HashMap<DcId, ReplicaTracker>>>,
    /// Running counters returned by `get_stats`.
    stats: Arc<Mutex<CrossDcStats>>,
    /// Maximum number of entries kept in `replication_log`.
    log_retention_entries: usize,
}
287
288impl PrimaryDcManager {
289 pub fn new(config: CrossDcConfig) -> Result<Self> {
291 if !config.is_primary {
292 return Err(anyhow!("PrimaryDcManager requires is_primary=true"));
293 }
294
295 info!(
296 "Primary DC manager initialized for DC '{}' in region '{}'",
297 config.dc_id, config.region
298 );
299
300 Ok(Self {
301 config,
302 replication_log: Arc::new(RwLock::new(VecDeque::new())),
303 current_seq: Arc::new(Mutex::new(0)),
304 replicas: Arc::new(RwLock::new(HashMap::new())),
305 stats: Arc::new(Mutex::new(CrossDcStats::default())),
306 log_retention_entries: 100_000,
307 })
308 }
309
310 pub fn add_replica(&self, dc_id: DcId, region: String) {
312 let tracker = ReplicaTracker::new(dc_id.clone(), region.clone());
313 self.replicas.write().insert(dc_id.clone(), tracker);
314 info!("Registered replica DC '{}' in region '{}'", dc_id, region);
315 }
316
317 pub fn remove_replica(&self, dc_id: &str) {
319 self.replicas.write().remove(dc_id);
320 info!("Removed replica DC '{}'", dc_id);
321 }
322
323 pub fn publish_upsert(
325 &self,
326 vector_id: String,
327 vector: Vec<f32>,
328 metadata: HashMap<String, String>,
329 ) -> ReplicationSeq {
330 let payload_bytes = vector.len() * 4 + 64; self.publish_entry(
332 ReplicationOperation::Upsert {
333 vector_id,
334 vector,
335 metadata,
336 },
337 payload_bytes,
338 )
339 }
340
341 pub fn publish_delete(&self, vector_id: String) -> ReplicationSeq {
343 self.publish_entry(ReplicationOperation::Delete { vector_id }, 32)
344 }
345
346 pub fn publish_heartbeat(&self) -> ReplicationSeq {
348 let seq = *self.current_seq.lock();
349 self.publish_entry(ReplicationOperation::Heartbeat { current_seq: seq }, 16)
350 }
351
352 fn publish_entry(
353 &self,
354 operation: ReplicationOperation,
355 payload_bytes: usize,
356 ) -> ReplicationSeq {
357 let mut seq = self.current_seq.lock();
358 *seq += 1;
359 let new_seq = *seq;
360
361 let timestamp_ms = SystemTime::now()
362 .duration_since(UNIX_EPOCH)
363 .unwrap_or(Duration::ZERO)
364 .as_millis() as u64;
365
366 let entry = ReplicationEntry {
367 seq: new_seq,
368 source_dc: self.config.dc_id.clone(),
369 timestamp_ms,
370 operation,
371 payload_bytes,
372 };
373
374 let mut log = self.replication_log.write();
375 log.push_back(entry);
376
377 while log.len() > self.log_retention_entries {
379 log.pop_front();
380 }
381
382 let mut stats = self.stats.lock();
383 stats.total_entries += 1;
384 stats.total_bytes += payload_bytes as u64;
385
386 debug!("Published replication entry seq={} to log", new_seq);
387 new_seq
388 }
389
390 pub fn get_entries_for_replica(
394 &self,
395 _replica_dc: &str,
396 after_seq: ReplicationSeq,
397 ) -> Vec<ReplicationEntry> {
398 let log = self.replication_log.read();
399 log.iter()
400 .filter(|e| e.seq > after_seq)
401 .take(self.config.replication_batch_size)
402 .cloned()
403 .collect()
404 }
405
406 pub fn acknowledge_replica(
408 &self,
409 dc_id: &str,
410 acked_seq: ReplicationSeq,
411 bytes_received: u64,
412 entries_received: u64,
413 ) -> Result<()> {
414 let mut replicas = self.replicas.write();
415 let tracker = replicas
416 .get_mut(dc_id)
417 .ok_or_else(|| anyhow!("Unknown replica DC: {}", dc_id))?;
418
419 tracker.on_success(acked_seq, bytes_received, entries_received);
420
421 let primary_seq = *self.current_seq.lock();
422 tracker.update_lag(primary_seq);
423
424 debug!(
425 "Replica '{}' acked seq={}, lag={} entries",
426 dc_id,
427 acked_seq,
428 primary_seq.saturating_sub(acked_seq)
429 );
430
431 Ok(())
432 }
433
434 pub fn record_replica_failure(&self, dc_id: &str) {
436 let mut replicas = self.replicas.write();
437 if let Some(tracker) = replicas.get_mut(dc_id) {
438 tracker.on_failure();
439 let mut stats = self.stats.lock();
440 stats.total_retries += 1;
441 warn!(
442 "Replica '{}' failure #{} - status: {}",
443 dc_id, tracker.failure_count, tracker.status
444 );
445 }
446 }
447
448 pub fn get_replica_status(&self) -> Vec<(DcId, ReplicaStatus, ReplicationSeq, Duration)> {
450 let replicas = self.replicas.read();
451 replicas
452 .values()
453 .map(|t| (t.dc_id.clone(), t.status, t.acked_seq, t.lag))
454 .collect()
455 }
456
457 pub fn has_lagging_replicas(&self) -> bool {
459 let replicas = self.replicas.read();
460 let primary_seq = *self.current_seq.lock();
461
462 replicas.values().any(|t| {
463 let lag_entries = primary_seq.saturating_sub(t.acked_seq);
464 lag_entries > self.config.replication_batch_size as u64
465 })
466 }
467
468 pub fn max_replica_lag_entries(&self) -> u64 {
470 let replicas = self.replicas.read();
471 let primary_seq = *self.current_seq.lock();
472 replicas
473 .values()
474 .map(|t| primary_seq.saturating_sub(t.acked_seq))
475 .max()
476 .unwrap_or(0)
477 }
478
479 pub fn get_stats(&self) -> CrossDcStats {
481 let replicas = self.replicas.read();
482 let replica_statuses: HashMap<DcId, String> = replicas
483 .iter()
484 .map(|(id, t)| (id.clone(), t.status.to_string()))
485 .collect();
486
487 let mut stats = self.stats.lock().clone();
488 stats.replica_statuses = replica_statuses;
489 stats
490 }
491
492 pub fn current_seq(&self) -> ReplicationSeq {
494 *self.current_seq.lock()
495 }
496
497 pub fn log_length(&self) -> usize {
499 self.replication_log.read().len()
500 }
501
502 pub fn replica_count(&self) -> usize {
504 self.replicas.read().len()
505 }
506}
507
/// Replica-side replication manager: buffers entries received from the
/// primary, applies them to a local vector store and tracks its own lag.
#[derive(Debug)]
pub struct ReplicaDcManager {
    /// Configuration this manager was created with.
    config: CrossDcConfig,
    /// Sequence number of the last entry processed by `apply_pending`.
    last_applied_seq: Arc<Mutex<ReplicationSeq>>,
    /// Entries received but not yet applied.
    pending_buffer: Arc<Mutex<VecDeque<ReplicationEntry>>>,
    /// Local vector store.
    local_state: Arc<RwLock<VectorStateMap>>,
    /// One record per conflict passed through `resolve_conflict`.
    conflict_log: Arc<Mutex<Vec<ConflictRecord>>>,
    /// Running counters returned by `get_stats`.
    stats: Arc<Mutex<CrossDcStats>>,
    /// Primary sequence number carried by the last applied heartbeat.
    primary_seq: Arc<Mutex<ReplicationSeq>>,
    /// Time at which the last heartbeat was applied (creation time until then).
    last_heartbeat: Arc<Mutex<Instant>>,
}
530
/// Audit record of one conflict between an incoming upsert and local state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictRecord {
    /// Identifier of the vector the conflict was about.
    pub vector_id: String,
    /// Sequence number of the version held locally by the replica.
    pub replica_seq: ReplicationSeq,
    /// Sequence number of the incoming entry from the primary.
    pub primary_seq: ReplicationSeq,
    /// Outcome label: `"primary_wins"` or `"replica_wins"`.
    pub resolution: String,
    /// Wall-clock time of the resolution in milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
}
540
541impl ReplicaDcManager {
542 pub fn new(config: CrossDcConfig) -> Result<Self> {
544 if config.is_primary {
545 return Err(anyhow!("ReplicaDcManager requires is_primary=false"));
546 }
547
548 info!(
549 "Replica DC manager initialized for DC '{}' in region '{}'",
550 config.dc_id, config.region
551 );
552
553 Ok(Self {
554 config,
555 last_applied_seq: Arc::new(Mutex::new(0)),
556 pending_buffer: Arc::new(Mutex::new(VecDeque::new())),
557 local_state: Arc::new(RwLock::new(HashMap::new())),
558 conflict_log: Arc::new(Mutex::new(Vec::new())),
559 stats: Arc::new(Mutex::new(CrossDcStats::default())),
560 primary_seq: Arc::new(Mutex::new(0)),
561 last_heartbeat: Arc::new(Mutex::new(Instant::now())),
562 })
563 }
564
565 pub fn receive_entries(&self, entries: Vec<ReplicationEntry>) -> ReplicationSeq {
567 if entries.is_empty() {
568 return *self.last_applied_seq.lock();
569 }
570
571 let entries_count = entries.len();
572 let total_bytes: u64 = entries.iter().map(|e| e.payload_bytes as u64).sum();
573
574 let mut buffer = self.pending_buffer.lock();
575 for entry in entries {
576 buffer.push_back(entry);
577 }
578
579 let mut stats = self.stats.lock();
580 stats.total_entries += entries_count as u64;
581 stats.total_bytes += total_bytes;
582
583 *self.last_applied_seq.lock()
584 }
585
586 pub fn apply_pending(&self) -> usize {
588 let mut buffer = self.pending_buffer.lock();
589 let mut local = self.local_state.write();
590 let mut last_seq = self.last_applied_seq.lock();
591 let mut stats = self.stats.lock();
592 let mut applied = 0;
593
594 let mut entries: Vec<ReplicationEntry> = buffer.drain(..).collect();
596 entries.sort_by_key(|e| e.seq);
597
598 for entry in entries {
599 if entry.seq <= *last_seq {
601 debug!("Skipping already-applied seq={}", entry.seq);
602 continue;
603 }
604
605 match &entry.operation {
606 ReplicationOperation::Upsert {
607 vector_id,
608 vector,
609 metadata,
610 } => {
611 let conflict = if let Some((_, _, existing_seq)) = local.get(vector_id.as_str())
613 {
614 *existing_seq > entry.seq && self.config.conflict_detection
615 } else {
616 false
617 };
618
619 if conflict {
620 stats.conflicts_detected += 1;
621 let resolution = self.resolve_conflict(
622 vector_id,
623 entry.seq,
624 local
625 .get(vector_id.as_str())
626 .map(|(_, _, s)| *s)
627 .unwrap_or(0),
628 );
629 if !resolution {
630 debug!("Conflict: keeping local version for '{}'", vector_id);
632 stats.conflicts_resolved += 1;
633 *last_seq = entry.seq;
634 applied += 1;
635 continue;
636 }
637 stats.conflicts_resolved += 1;
638 }
639
640 local.insert(
641 vector_id.clone(),
642 (vector.clone(), metadata.clone(), entry.seq),
643 );
644 applied += 1;
645 debug!("Applied upsert for '{}' at seq={}", vector_id, entry.seq);
646 }
647 ReplicationOperation::Delete { vector_id } => {
648 local.remove(vector_id.as_str());
649 applied += 1;
650 debug!("Applied delete for '{}' at seq={}", vector_id, entry.seq);
651 }
652 ReplicationOperation::Snapshot {
653 entries: snapshot_entries,
654 as_of_seq,
655 } => {
656 local.clear();
658 for (id, vec, meta) in snapshot_entries {
659 local.insert(id.clone(), (vec.clone(), meta.clone(), *as_of_seq));
660 }
661 applied += 1;
662 info!(
663 "Applied snapshot with {} entries at seq={}",
664 snapshot_entries.len(),
665 as_of_seq
666 );
667 }
668 ReplicationOperation::Heartbeat { current_seq } => {
669 *self.primary_seq.lock() = *current_seq;
670 *self.last_heartbeat.lock() = Instant::now();
671
672 let last_heartbeat_ms = SystemTime::now()
673 .duration_since(UNIX_EPOCH)
674 .unwrap_or(Duration::ZERO)
675 .as_millis() as u64;
676 stats.last_heartbeat_ms = last_heartbeat_ms;
677 }
679 ReplicationOperation::NoOp => {
680 }
682 }
683
684 *last_seq = entry.seq;
685 }
686
687 let primary = *self.primary_seq.lock();
689 let last = *last_seq;
690 let lag = primary.saturating_sub(last);
691 stats.current_lag_entries = lag;
692 stats.current_lag_ms = lag; applied
695 }
696
697 fn resolve_conflict(
701 &self,
702 vector_id: &str,
703 incoming_seq: ReplicationSeq,
704 local_seq: ReplicationSeq,
705 ) -> bool {
706 let timestamp_ms = SystemTime::now()
707 .duration_since(UNIX_EPOCH)
708 .unwrap_or(Duration::ZERO)
709 .as_millis() as u64;
710
711 let primary_wins = match self.config.conflict_resolution {
712 ConflictResolutionStrategy::LastWriteWins => incoming_seq > local_seq,
713 ConflictResolutionStrategy::PrimaryWins => true,
714 ConflictResolutionStrategy::ReplicaWins => false,
715 ConflictResolutionStrategy::KeepBoth => false, ConflictResolutionStrategy::MergeMetadata => incoming_seq > local_seq,
717 };
718
719 self.conflict_log.lock().push(ConflictRecord {
720 vector_id: vector_id.to_string(),
721 replica_seq: local_seq,
722 primary_seq: incoming_seq,
723 resolution: if primary_wins {
724 "primary_wins".to_string()
725 } else {
726 "replica_wins".to_string()
727 },
728 timestamp_ms,
729 });
730
731 primary_wins
732 }
733
734 pub fn get_vector(&self, vector_id: &str) -> Option<(Vec<f32>, HashMap<String, String>)> {
736 self.local_state
737 .read()
738 .get(vector_id)
739 .map(|(v, m, _)| (v.clone(), m.clone()))
740 }
741
742 pub fn last_applied_seq(&self) -> ReplicationSeq {
744 *self.last_applied_seq.lock()
745 }
746
747 pub fn lag_entries(&self) -> u64 {
749 let primary = *self.primary_seq.lock();
750 let applied = *self.last_applied_seq.lock();
751 primary.saturating_sub(applied)
752 }
753
754 pub fn is_within_lag_tolerance(&self) -> bool {
756 let lag = self.lag_entries();
757 let tolerance_entries = self.config.max_lag_tolerance.as_millis() as u64;
758 lag <= tolerance_entries
759 }
760
761 pub fn vector_count(&self) -> usize {
763 self.local_state.read().len()
764 }
765
766 pub fn get_stats(&self) -> CrossDcStats {
768 let stats = self.stats.lock();
769 stats.clone()
770 }
771
772 pub fn search_similar(&self, query: &[f32], k: usize) -> Vec<(String, f32)> {
774 let local = self.local_state.read();
775 let mut similarities: Vec<(String, f32)> = local
776 .iter()
777 .filter_map(|(id, (vec, _, _))| {
778 if vec.len() != query.len() {
779 return None;
780 }
781 let dot: f32 = vec.iter().zip(query.iter()).map(|(a, b)| a * b).sum();
782 let na: f32 = vec.iter().map(|x| x * x).sum::<f32>().sqrt();
783 let nb: f32 = query.iter().map(|x| x * x).sum::<f32>().sqrt();
784 let sim = if na < 1e-9 || nb < 1e-9 {
785 0.0
786 } else {
787 dot / (na * nb)
788 };
789 Some((id.clone(), sim))
790 })
791 .collect();
792
793 similarities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
794 similarities.truncate(k);
795 similarities
796 }
797
798 pub fn primary_heartbeat_recent(&self) -> bool {
800 self.last_heartbeat.lock().elapsed() < self.config.heartbeat_interval * 3
801 }
802
803 pub fn conflict_log(&self) -> Vec<ConflictRecord> {
805 self.conflict_log.lock().clone()
806 }
807
808 pub fn pending_count(&self) -> usize {
810 self.pending_buffer.lock().len()
811 }
812}
813
/// In-process coordinator that moves log entries from one primary manager
/// to a set of replica managers.
#[derive(Debug)]
pub struct CrossDcCoordinator {
    /// Source of replication entries.
    primary: Arc<PrimaryDcManager>,
    /// Replica managers keyed by the DC id they were registered under.
    replicas: HashMap<DcId, Arc<ReplicaDcManager>>,
}
824
825impl CrossDcCoordinator {
826 pub fn new(primary: Arc<PrimaryDcManager>) -> Self {
828 Self {
829 primary,
830 replicas: HashMap::new(),
831 }
832 }
833
834 pub fn add_replica_node(&mut self, dc_id: DcId, replica: Arc<ReplicaDcManager>) {
836 self.primary
837 .add_replica(dc_id.clone(), replica.config.region.clone());
838 self.replicas.insert(dc_id, replica);
839 }
840
841 pub fn replicate_once(&self) -> Result<HashMap<DcId, usize>> {
843 let mut applied_counts = HashMap::new();
844
845 for (dc_id, replica) in &self.replicas {
846 let last_seq = replica.last_applied_seq();
847 let entries = self.primary.get_entries_for_replica(dc_id, last_seq);
848
849 if entries.is_empty() {
850 applied_counts.insert(dc_id.clone(), 0);
851 continue;
852 }
853
854 let entry_count = entries.len();
855 let bytes: u64 = entries.iter().map(|e| e.payload_bytes as u64).sum();
856
857 replica.receive_entries(entries);
858 let applied = replica.apply_pending();
859
860 self.primary
861 .acknowledge_replica(dc_id, replica.last_applied_seq(), bytes, entry_count as u64)
862 .map_err(|e| anyhow!("Failed to ack replica {}: {}", dc_id, e))?;
863
864 applied_counts.insert(dc_id.clone(), applied);
865 }
866
867 Ok(applied_counts)
868 }
869
870 pub fn replication_health(&self) -> ReplicationHealth {
872 let has_lagging = self.primary.has_lagging_replicas();
873 let max_lag = self.primary.max_replica_lag_entries();
874
875 let all_healthy = self.replicas.values().all(|r| r.is_within_lag_tolerance());
876
877 ReplicationHealth {
878 is_healthy: !has_lagging && all_healthy,
879 max_lag_entries: max_lag,
880 lagging_replica_count: if has_lagging {
881 self.replicas
882 .values()
883 .filter(|r| !r.is_within_lag_tolerance())
884 .count()
885 } else {
886 0
887 },
888 total_replicas: self.replicas.len(),
889 }
890 }
891}
892
/// Health summary produced by `CrossDcCoordinator::replication_health`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationHealth {
    /// `true` when neither the primary nor any replica reports excessive lag.
    pub is_healthy: bool,
    /// Largest number of unacknowledged entries across replicas, as seen by the primary.
    pub max_lag_entries: u64,
    /// Number of replicas counted as lagging.
    pub lagging_replica_count: usize,
    /// Number of replicas attached to the coordinator.
    pub total_replicas: usize,
}
905
#[cfg(test)]
mod tests {
    use super::*;

    // Primary configuration shared by the tests: small batches, 10 s tolerance.
    fn make_primary_config() -> CrossDcConfig {
        CrossDcConfig {
            dc_id: "dc-us-east".to_string(),
            region: "us-east-1".to_string(),
            is_primary: true,
            max_lag_tolerance: Duration::from_secs(10),
            replication_batch_size: 100,
            ..Default::default()
        }
    }

    // Replica configuration with the same tuning as the primary.
    fn make_replica_config(dc_id: &str, region: &str) -> CrossDcConfig {
        CrossDcConfig {
            dc_id: dc_id.to_string(),
            region: region.to_string(),
            is_primary: false,
            max_lag_tolerance: Duration::from_secs(10),
            replication_batch_size: 100,
            ..Default::default()
        }
    }

    #[test]
    fn test_primary_manager_creation() {
        let config = make_primary_config();
        let manager = PrimaryDcManager::new(config);
        assert!(manager.is_ok(), "Primary manager creation should succeed");
    }

    #[test]
    fn test_replica_manager_creation() {
        let config = make_replica_config("dc-eu-west", "eu-west-1");
        let manager = ReplicaDcManager::new(config);
        assert!(manager.is_ok(), "Replica manager creation should succeed");
    }

    #[test]
    fn test_primary_requires_is_primary_true() {
        let mut config = make_primary_config();
        config.is_primary = false;
        let result = PrimaryDcManager::new(config);
        assert!(result.is_err(), "Should fail if is_primary=false");
    }

    #[test]
    fn test_replica_requires_is_primary_false() {
        let mut config = make_replica_config("dc-x", "region-x");
        config.is_primary = true;
        let result = ReplicaDcManager::new(config);
        assert!(result.is_err(), "Should fail if is_primary=true");
    }

    #[test]
    fn test_publish_upsert() -> Result<()> {
        let manager = PrimaryDcManager::new(make_primary_config())?;
        let seq = manager.publish_upsert("v1".to_string(), vec![1.0, 2.0], HashMap::new());
        assert_eq!(seq, 1);
        assert_eq!(manager.log_length(), 1);
        assert_eq!(manager.current_seq(), 1);
        Ok(())
    }

    #[test]
    fn test_publish_delete() -> Result<()> {
        let manager = PrimaryDcManager::new(make_primary_config())?;
        manager.publish_upsert("v1".to_string(), vec![1.0], HashMap::new());
        let seq = manager.publish_delete("v1".to_string());
        assert_eq!(seq, 2);
        assert_eq!(manager.log_length(), 2);
        Ok(())
    }

    #[test]
    fn test_publish_heartbeat() -> Result<()> {
        let manager = PrimaryDcManager::new(make_primary_config())?;
        let seq = manager.publish_heartbeat();
        assert_eq!(seq, 1);
        Ok(())
    }

    #[test]
    fn test_add_and_remove_replica() -> Result<()> {
        let manager = PrimaryDcManager::new(make_primary_config())?;
        manager.add_replica("dc-eu".to_string(), "eu-west-1".to_string());
        assert_eq!(manager.replica_count(), 1);
        manager.remove_replica("dc-eu");
        assert_eq!(manager.replica_count(), 0);
        Ok(())
    }

    #[test]
    fn test_get_entries_for_replica() -> Result<()> {
        let manager = PrimaryDcManager::new(make_primary_config())?;
        manager.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        for i in 0..5 {
            manager.publish_upsert(format!("v{}", i), vec![i as f32], HashMap::new());
        }

        let entries = manager.get_entries_for_replica("dc-eu", 0);
        assert_eq!(entries.len(), 5);

        // Only the entries strictly after seq 3 remain: seq 4 and seq 5.
        let partial = manager.get_entries_for_replica("dc-eu", 3);
        assert_eq!(partial.len(), 2);
        assert_eq!(partial[0].seq, 4);
        assert_eq!(partial[1].seq, 5);
        Ok(())
    }

    #[test]
    fn test_replica_receive_and_apply() -> Result<()> {
        let primary = PrimaryDcManager::new(make_primary_config())?;
        primary.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        let replica = ReplicaDcManager::new(make_replica_config("dc-eu", "eu-west-1"))?;

        primary.publish_upsert("v1".to_string(), vec![1.0, 0.0], HashMap::new());
        primary.publish_upsert("v2".to_string(), vec![0.0, 1.0], HashMap::new());

        let entries = primary.get_entries_for_replica("dc-eu", 0);
        assert_eq!(entries.len(), 2);
        replica.receive_entries(entries);
        let applied = replica.apply_pending();

        assert_eq!(applied, 2);
        assert_eq!(replica.vector_count(), 2);
        assert_eq!(replica.last_applied_seq(), 2);

        let v1 = replica.get_vector("v1");
        assert!(v1.is_some());
        assert_eq!(v1.expect("test value").0, vec![1.0, 0.0]);
        Ok(())
    }

    #[test]
    fn test_replica_apply_delete() -> Result<()> {
        let primary = PrimaryDcManager::new(make_primary_config())?;
        let replica = ReplicaDcManager::new(make_replica_config("dc-eu", "eu-west-1"))?;
        primary.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        primary.publish_upsert("v1".to_string(), vec![1.0], HashMap::new());
        primary.publish_delete("v1".to_string());

        let entries = primary.get_entries_for_replica("dc-eu", 0);
        replica.receive_entries(entries);
        replica.apply_pending();

        assert_eq!(replica.vector_count(), 0);
        assert!(replica.get_vector("v1").is_none());
        Ok(())
    }

    #[test]
    fn test_coordinator_replicate_once() -> Result<()> {
        let primary = Arc::new(PrimaryDcManager::new(make_primary_config())?);
        let replica = Arc::new(ReplicaDcManager::new(make_replica_config(
            "dc-eu",
            "eu-west-1",
        ))?);

        let mut coordinator = CrossDcCoordinator::new(Arc::clone(&primary));
        coordinator.add_replica_node("dc-eu".to_string(), Arc::clone(&replica));

        for i in 0..10 {
            primary.publish_upsert(format!("v{}", i), vec![i as f32], HashMap::new());
        }

        let applied = coordinator.replicate_once()?;
        assert_eq!(applied.get("dc-eu"), Some(&10));
        assert_eq!(replica.vector_count(), 10);
        Ok(())
    }

    #[test]
    fn test_coordinator_incremental_replication() -> Result<()> {
        let primary = Arc::new(PrimaryDcManager::new(make_primary_config())?);
        let replica = Arc::new(ReplicaDcManager::new(make_replica_config(
            "dc-ap",
            "ap-southeast-1",
        ))?);

        let mut coordinator = CrossDcCoordinator::new(Arc::clone(&primary));
        coordinator.add_replica_node("dc-ap".to_string(), Arc::clone(&replica));

        for i in 0..5 {
            primary.publish_upsert(format!("v{}", i), vec![i as f32], HashMap::new());
        }
        coordinator.replicate_once()?;
        assert_eq!(replica.vector_count(), 5);

        for i in 5..10 {
            primary.publish_upsert(format!("v{}", i), vec![i as f32], HashMap::new());
        }
        coordinator.replicate_once()?;
        assert_eq!(replica.vector_count(), 10);
        Ok(())
    }

    #[test]
    fn test_replication_health_healthy() -> Result<()> {
        let primary = Arc::new(PrimaryDcManager::new(make_primary_config())?);
        let replica = Arc::new(ReplicaDcManager::new(make_replica_config(
            "dc-eu",
            "eu-west-1",
        ))?);

        let mut coordinator = CrossDcCoordinator::new(Arc::clone(&primary));
        coordinator.add_replica_node("dc-eu".to_string(), Arc::clone(&replica));

        primary.publish_upsert("v1".to_string(), vec![1.0], HashMap::new());
        coordinator.replicate_once()?;

        // The replica is fully caught up, so every health indicator must
        // agree; a disjunction here would hide a regression in any of them.
        let health = coordinator.replication_health();
        assert_eq!(health.total_replicas, 1);
        assert!(health.is_healthy);
        assert_eq!(health.max_lag_entries, 0);
        assert_eq!(health.lagging_replica_count, 0);
        Ok(())
    }

    #[test]
    fn test_snapshot_operation() -> Result<()> {
        let _primary = Arc::new(PrimaryDcManager::new(make_primary_config())?);
        let replica = Arc::new(ReplicaDcManager::new(make_replica_config(
            "dc-eu",
            "eu-west-1",
        ))?);

        let snapshot_entries = vec![
            ("v1".to_string(), vec![1.0, 0.0], HashMap::new()),
            ("v2".to_string(), vec![0.0, 1.0], HashMap::new()),
        ];

        let snapshot_op = ReplicationOperation::Snapshot {
            entries: snapshot_entries,
            as_of_seq: 100,
        };

        let entry = ReplicationEntry {
            seq: 1,
            source_dc: "dc-us-east".to_string(),
            timestamp_ms: 0,
            operation: snapshot_op,
            payload_bytes: 256,
        };

        replica.receive_entries(vec![entry]);
        replica.apply_pending();

        assert_eq!(replica.vector_count(), 2);
        assert_eq!(
            replica.get_vector("v1").expect("v1 from snapshot").0,
            vec![1.0, 0.0]
        );
        assert_eq!(
            replica.get_vector("v2").expect("v2 from snapshot").0,
            vec![0.0, 1.0]
        );
        Ok(())
    }

    #[test]
    fn test_heartbeat_replication() -> Result<()> {
        let primary = Arc::new(PrimaryDcManager::new(make_primary_config())?);
        let replica = Arc::new(ReplicaDcManager::new(make_replica_config(
            "dc-eu",
            "eu-west-1",
        ))?);

        let mut coordinator = CrossDcCoordinator::new(Arc::clone(&primary));
        coordinator.add_replica_node("dc-eu".to_string(), Arc::clone(&replica));

        primary.publish_heartbeat();
        coordinator.replicate_once()?;

        // The heartbeat must reach the replica, advance its applied position
        // and refresh its heartbeat bookkeeping without touching vector state.
        let stats = replica.get_stats();
        assert_eq!(stats.total_entries, 1);
        assert!(stats.last_heartbeat_ms > 0);
        assert_eq!(replica.last_applied_seq(), 1);
        assert_eq!(replica.vector_count(), 0);
        assert!(replica.primary_heartbeat_recent());
        Ok(())
    }

    #[test]
    fn test_acknowledge_replica() -> Result<()> {
        let manager = PrimaryDcManager::new(make_primary_config())?;
        manager.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        for _ in 0..5 {
            manager.publish_upsert("v".to_string(), vec![1.0], HashMap::new());
        }

        let result = manager.acknowledge_replica("dc-eu", 5, 500, 5);
        assert!(result.is_ok());

        let status = manager.get_replica_status();
        assert!(!status.is_empty());
        let (_, status_val, acked, _) = &status[0];
        assert_eq!(*acked, 5);
        assert_eq!(*status_val, ReplicaStatus::Healthy);
        Ok(())
    }

    #[test]
    fn test_acknowledge_unknown_replica_fails() -> Result<()> {
        let manager = PrimaryDcManager::new(make_primary_config())?;
        let result = manager.acknowledge_replica("unknown-dc", 1, 0, 0);
        assert!(result.is_err(), "Should fail for unknown replica");
        Ok(())
    }

    #[test]
    fn test_record_replica_failure() -> Result<()> {
        let manager = PrimaryDcManager::new(make_primary_config())?;
        manager.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        // Six failures in a row cross the disconnect threshold (> 5).
        for _ in 0..6 {
            manager.record_replica_failure("dc-eu");
        }

        let status = manager.get_replica_status();
        let (_, s, _, _) = &status[0];
        assert_eq!(*s, ReplicaStatus::Disconnected);
        assert_eq!(manager.get_stats().total_retries, 6);
        Ok(())
    }

    #[test]
    fn test_replica_search_similar() -> Result<()> {
        let primary = PrimaryDcManager::new(make_primary_config())?;
        let replica = ReplicaDcManager::new(make_replica_config("dc-eu", "eu-west-1"))?;
        primary.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        primary.publish_upsert("v1".to_string(), vec![1.0, 0.0, 0.0], HashMap::new());
        primary.publish_upsert("v2".to_string(), vec![0.0, 1.0, 0.0], HashMap::new());

        let entries = primary.get_entries_for_replica("dc-eu", 0);
        replica.receive_entries(entries);
        replica.apply_pending();

        let results = replica.search_similar(&[1.0, 0.0, 0.0], 2);
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].0, "v1");
        assert_eq!(results[1].0, "v2");

        // `k` caps the number of results.
        assert_eq!(replica.search_similar(&[1.0, 0.0, 0.0], 1).len(), 1);
        Ok(())
    }

    #[test]
    fn test_cross_dc_config_default() {
        let config = CrossDcConfig::default();
        assert!(config.is_primary);
        assert_eq!(config.compression_level, 3);
        assert!(config.conflict_detection);
    }

    #[test]
    fn test_conflict_resolution_last_write_wins() -> Result<()> {
        let mut config = make_replica_config("dc-eu", "eu-west-1");
        config.conflict_resolution = ConflictResolutionStrategy::LastWriteWins;
        config.conflict_detection = true;

        let replica = ReplicaDcManager::new(config)?;

        // Seed a local version that is newer (seq 100) than the incoming one.
        {
            let mut local = replica.local_state.write();
            local.insert("v1".to_string(), (vec![2.0], HashMap::new(), 100));
        }

        let entry = ReplicationEntry {
            seq: 1,
            source_dc: "dc-us-east".to_string(),
            timestamp_ms: 0,
            operation: ReplicationOperation::Upsert {
                vector_id: "v1".to_string(),
                vector: vec![1.0],
                metadata: HashMap::new(),
            },
            payload_bytes: 16,
        };

        replica.receive_entries(vec![entry]);
        replica.apply_pending();

        let v1 = replica.get_vector("v1");
        assert!(v1.is_some());
        assert_eq!(
            v1.expect("test value").0,
            vec![2.0],
            "Local version should be retained"
        );

        let conflicts = replica.conflict_log();
        assert_eq!(conflicts.len(), 1);
        assert_eq!(conflicts[0].resolution, "replica_wins");
        Ok(())
    }

    #[test]
    fn test_conflict_resolution_primary_wins() -> Result<()> {
        let mut config = make_replica_config("dc-eu", "eu-west-1");
        config.conflict_resolution = ConflictResolutionStrategy::PrimaryWins;
        config.conflict_detection = true;

        let replica = ReplicaDcManager::new(config)?;

        // Seed a local version that is newer (seq 100) than the incoming one.
        {
            let mut local = replica.local_state.write();
            local.insert("v1".to_string(), (vec![2.0], HashMap::new(), 100));
        }

        let entry = ReplicationEntry {
            seq: 1,
            source_dc: "dc-us-east".to_string(),
            timestamp_ms: 0,
            operation: ReplicationOperation::Upsert {
                vector_id: "v1".to_string(),
                vector: vec![1.0],
                metadata: HashMap::new(),
            },
            payload_bytes: 16,
        };

        replica.receive_entries(vec![entry]);
        replica.apply_pending();

        let v1 = replica.get_vector("v1");
        assert!(v1.is_some());
        assert_eq!(
            v1.expect("test value").0,
            vec![1.0],
            "Primary version should win"
        );

        let conflicts = replica.conflict_log();
        assert_eq!(conflicts.len(), 1);
        assert_eq!(conflicts[0].resolution, "primary_wins");
        Ok(())
    }

    #[test]
    fn test_pending_buffer_tracking() -> Result<()> {
        let replica = ReplicaDcManager::new(make_replica_config("dc-eu", "eu-west-1"))?;

        assert_eq!(replica.pending_count(), 0);

        let entry = ReplicationEntry {
            seq: 1,
            source_dc: "dc-us".to_string(),
            timestamp_ms: 0,
            operation: ReplicationOperation::NoOp,
            payload_bytes: 0,
        };

        let entry2 = ReplicationEntry {
            seq: 1,
            source_dc: "dc-us".to_string(),
            timestamp_ms: 0,
            operation: ReplicationOperation::Heartbeat { current_seq: 0 },
            payload_bytes: 0,
        };

        replica.receive_entries(vec![entry, entry2]);
        assert_eq!(replica.pending_count(), 2);

        // Applying drains the buffer even though neither entry changes vector state.
        replica.apply_pending();
        assert_eq!(replica.pending_count(), 0);
        assert_eq!(replica.last_applied_seq(), 1);
        Ok(())
    }

    #[test]
    fn test_max_lag_entries_calculation() -> Result<()> {
        let manager = PrimaryDcManager::new(make_primary_config())?;
        manager.add_replica("dc-eu".to_string(), "eu-west-1".to_string());

        for _ in 0..20 {
            manager.publish_upsert("v".to_string(), vec![1.0], HashMap::new());
        }

        let lag = manager.max_replica_lag_entries();
        assert_eq!(lag, 20, "Lag should be 20 entries");
        Ok(())
    }

    #[test]
    fn test_replication_stats() -> Result<()> {
        let manager = PrimaryDcManager::new(make_primary_config())?;

        for i in 0..5 {
            manager.publish_upsert(format!("v{}", i), vec![i as f32], HashMap::new());
        }

        let stats = manager.get_stats();
        assert_eq!(stats.total_entries, 5);
        assert!(stats.total_bytes > 0);
        Ok(())
    }
}
1400}