1use anyhow::{anyhow, Result};
21use serde::{Deserialize, Serialize};
22use std::collections::HashMap;
23
24#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
34pub struct VectorEntry {
35 pub id: u64,
37 pub label: String,
39 pub vector: Vec<f32>,
41 pub metadata: HashMap<String, String>,
43 pub version: u64,
45}
46
47impl VectorEntry {
48 pub fn new(id: u64, vector: Vec<f32>, version: u64) -> Self {
50 Self {
51 id,
52 label: String::new(),
53 vector,
54 metadata: HashMap::new(),
55 version,
56 }
57 }
58
59 pub fn approx_bytes(&self) -> u64 {
61 let meta_bytes: u64 = self
62 .metadata
63 .iter()
64 .map(|(k, v)| (k.len() + v.len()) as u64)
65 .sum();
66 8 + 8 + self.label.len() as u64 + (self.vector.len() as u64 * 4) + meta_bytes
67 }
68}
69
70#[derive(Debug, Clone, Default, Serialize, Deserialize)]
78pub struct IndexSnapshot {
79 pub entries: HashMap<u64, VectorEntry>,
81 pub seq: u64,
83}
84
85impl IndexSnapshot {
86 pub fn new() -> Self {
88 Self::default()
89 }
90
91 pub fn from_entries(entries: Vec<VectorEntry>, seq: u64) -> Self {
93 Self {
94 entries: entries.into_iter().map(|e| (e.id, e)).collect(),
95 seq,
96 }
97 }
98
99 pub fn upsert(&mut self, entry: VectorEntry) {
101 self.entries.insert(entry.id, entry);
102 }
103
104 pub fn remove(&mut self, id: u64) -> bool {
106 self.entries.remove(&id).is_some()
107 }
108
109 pub fn get(&self, id: u64) -> Option<&VectorEntry> {
111 self.entries.get(&id)
112 }
113
114 pub fn len(&self) -> usize {
116 self.entries.len()
117 }
118
119 pub fn is_empty(&self) -> bool {
121 self.entries.is_empty()
122 }
123}
124
125#[derive(Debug, Clone, Default, Serialize, Deserialize)]
131pub struct IndexDelta {
132 pub added: Vec<VectorEntry>,
134 pub removed: Vec<u64>,
136 pub modified: Vec<VectorEntry>,
138}
139
140impl IndexDelta {
141 pub fn is_empty(&self) -> bool {
143 self.added.is_empty() && self.removed.is_empty() && self.modified.is_empty()
144 }
145
146 pub fn change_count(&self) -> usize {
148 self.added.len() + self.removed.len() + self.modified.len()
149 }
150}
151
152#[derive(Debug, Clone, Copy, Default)]
158pub struct DeltaSync;
159
160impl DeltaSync {
161 pub fn new() -> Self {
163 Self
164 }
165
166 pub fn compute_delta(
173 &self,
174 old_index: &IndexSnapshot,
175 new_index: &IndexSnapshot,
176 ) -> IndexDelta {
177 let mut added = Vec::new();
178 let mut removed = Vec::new();
179 let mut modified = Vec::new();
180
181 for (id, new_entry) in &new_index.entries {
183 match old_index.entries.get(id) {
184 None => added.push(new_entry.clone()),
185 Some(old_entry) => {
186 if new_entry.version > old_entry.version {
187 modified.push(new_entry.clone());
188 }
189 }
190 }
191 }
192
193 for id in old_index.entries.keys() {
195 if !new_index.entries.contains_key(id) {
196 removed.push(*id);
197 }
198 }
199
200 IndexDelta {
201 added,
202 removed,
203 modified,
204 }
205 }
206
207 pub fn apply_delta(&self, base: &mut IndexSnapshot, delta: &IndexDelta) -> Result<()> {
213 for entry in &delta.added {
214 base.upsert(entry.clone());
215 }
216 for entry in &delta.modified {
217 base.upsert(entry.clone());
218 }
219 for &id in &delta.removed {
220 if !base.remove(id) {
221 return Err(anyhow!(
222 "Delta removal of ID {} failed: entry not found in base snapshot",
223 id
224 ));
225 }
226 }
227 Ok(())
228 }
229
230 pub fn delta_size_bytes(&self, delta: &IndexDelta) -> u64 {
235 let added_bytes: u64 = delta.added.iter().map(|e| e.approx_bytes()).sum();
236 let modified_bytes: u64 = delta.modified.iter().map(|e| e.approx_bytes()).sum();
237 let removed_bytes: u64 = (delta.removed.len() as u64) * 8;
238 added_bytes + modified_bytes + removed_bytes
239 }
240}
241
242#[derive(Debug, Clone, Serialize, Deserialize)]
248pub struct ReplicationAlert {
249 pub dc_a: String,
251 pub dc_b: String,
253 pub measured_lag_ms: u64,
255 pub threshold_ms: u64,
257 pub message: String,
259}
260
261#[derive(Debug, Clone, Default)]
270pub struct ReplicationLag {
271 measurements_ms: HashMap<(String, String), u64>,
273}
274
275impl ReplicationLag {
276 pub fn new() -> Self {
278 Self::default()
279 }
280
281 pub fn record(&mut self, dc_a: impl Into<String>, dc_b: impl Into<String>, lag_ms: u64) {
283 self.measurements_ms
284 .insert((dc_a.into(), dc_b.into()), lag_ms);
285 }
286
287 pub fn lag_ms(&self, dc_a: &str, dc_b: &str) -> u64 {
290 self.measurements_ms
291 .get(&(dc_a.to_string(), dc_b.to_string()))
292 .copied()
293 .unwrap_or(0)
294 }
295
296 pub fn is_acceptable(&self, lag_ms: u64, sla_ms: u64) -> bool {
298 lag_ms <= sla_ms
299 }
300
301 pub fn alert_if_excessive(
304 &self,
305 dc_a: &str,
306 dc_b: &str,
307 lag_ms: u64,
308 threshold_ms: u64,
309 ) -> Option<ReplicationAlert> {
310 if lag_ms > threshold_ms {
311 Some(ReplicationAlert {
312 dc_a: dc_a.to_string(),
313 dc_b: dc_b.to_string(),
314 measured_lag_ms: lag_ms,
315 threshold_ms,
316 message: format!(
317 "Replication lag from {} to {} is {} ms, exceeding threshold {} ms",
318 dc_a, dc_b, lag_ms, threshold_ms
319 ),
320 })
321 } else {
322 None
323 }
324 }
325
326 pub fn check_and_alert(
329 &self,
330 dc_a: &str,
331 dc_b: &str,
332 threshold_ms: u64,
333 ) -> Option<ReplicationAlert> {
334 let lag = self.lag_ms(dc_a, dc_b);
335 self.alert_if_excessive(dc_a, dc_b, lag, threshold_ms)
336 }
337}
338
339#[cfg(test)]
344mod tests {
345 use super::*;
346
347 fn make_entry(id: u64, version: u64) -> VectorEntry {
348 VectorEntry::new(id, vec![id as f32, version as f32], version)
349 }
350
351 fn make_snapshot(entries: Vec<(u64, u64)>) -> IndexSnapshot {
352 let seq = entries.iter().map(|(_, v)| *v).max().unwrap_or(0);
353 IndexSnapshot::from_entries(
354 entries
355 .into_iter()
356 .map(|(id, ver)| make_entry(id, ver))
357 .collect(),
358 seq,
359 )
360 }
361
362 #[test]
365 fn test_snapshot_upsert_and_get() {
366 let mut snap = IndexSnapshot::new();
367 snap.upsert(make_entry(1, 1));
368 assert!(snap.get(1).is_some());
369 assert_eq!(snap.len(), 1);
370 }
371
372 #[test]
373 fn test_snapshot_remove_existing() {
374 let mut snap = make_snapshot(vec![(1, 1), (2, 1)]);
375 assert!(snap.remove(1));
376 assert_eq!(snap.len(), 1);
377 }
378
379 #[test]
380 fn test_snapshot_remove_nonexistent() {
381 let mut snap = IndexSnapshot::new();
382 assert!(!snap.remove(99));
383 }
384
385 #[test]
386 fn test_snapshot_is_empty() {
387 let snap = IndexSnapshot::new();
388 assert!(snap.is_empty());
389 }
390
391 #[test]
394 fn test_vector_entry_approx_bytes_basic() {
395 let e = make_entry(1, 1); let bytes = e.approx_bytes();
397 assert_eq!(bytes, 24);
399 }
400
401 #[test]
402 fn test_vector_entry_with_metadata_bytes() {
403 let mut e = make_entry(1, 1);
404 e.metadata.insert("key".into(), "value".into()); let bytes = e.approx_bytes();
406 assert_eq!(bytes, 32);
407 }
408
409 #[test]
412 fn test_compute_delta_empty_to_empty() {
413 let ds = DeltaSync::new();
414 let old = IndexSnapshot::new();
415 let new = IndexSnapshot::new();
416 let delta = ds.compute_delta(&old, &new);
417 assert!(delta.is_empty());
418 }
419
420 #[test]
421 fn test_compute_delta_all_added() {
422 let ds = DeltaSync::new();
423 let old = IndexSnapshot::new();
424 let new = make_snapshot(vec![(1, 1), (2, 1), (3, 1)]);
425 let delta = ds.compute_delta(&old, &new);
426 assert_eq!(delta.added.len(), 3);
427 assert!(delta.removed.is_empty());
428 assert!(delta.modified.is_empty());
429 }
430
431 #[test]
432 fn test_compute_delta_all_removed() {
433 let ds = DeltaSync::new();
434 let old = make_snapshot(vec![(1, 1), (2, 1)]);
435 let new = IndexSnapshot::new();
436 let delta = ds.compute_delta(&old, &new);
437 assert_eq!(delta.removed.len(), 2);
438 assert!(delta.added.is_empty());
439 assert!(delta.modified.is_empty());
440 }
441
442 #[test]
443 fn test_compute_delta_modifications() {
444 let ds = DeltaSync::new();
445 let old = make_snapshot(vec![(1, 1), (2, 1)]);
446 let new = make_snapshot(vec![(1, 2), (2, 1)]); let delta = ds.compute_delta(&old, &new);
448 assert_eq!(delta.modified.len(), 1);
449 assert_eq!(delta.modified[0].id, 1);
450 assert!(delta.added.is_empty());
451 assert!(delta.removed.is_empty());
452 }
453
454 #[test]
455 fn test_compute_delta_mixed() {
456 let ds = DeltaSync::new();
457 let old = make_snapshot(vec![(1, 1), (2, 1), (3, 1)]);
458 let new = make_snapshot(vec![(1, 1), (2, 2), (4, 1)]);
460 let delta = ds.compute_delta(&old, &new);
461 assert_eq!(delta.added.len(), 1); assert_eq!(delta.removed.len(), 1); assert_eq!(delta.modified.len(), 1); }
465
466 #[test]
467 fn test_compute_delta_no_change_no_diff() {
468 let ds = DeltaSync::new();
469 let snap = make_snapshot(vec![(1, 5), (2, 3)]);
470 let delta = ds.compute_delta(&snap, &snap);
471 assert!(delta.is_empty());
472 }
473
474 #[test]
477 fn test_apply_delta_add() {
478 let ds = DeltaSync::new();
479 let mut base = IndexSnapshot::new();
480 let delta = IndexDelta {
481 added: vec![make_entry(1, 1)],
482 removed: vec![],
483 modified: vec![],
484 };
485 ds.apply_delta(&mut base, &delta).unwrap();
486 assert!(base.get(1).is_some());
487 }
488
489 #[test]
490 fn test_apply_delta_remove() {
491 let ds = DeltaSync::new();
492 let mut base = make_snapshot(vec![(1, 1), (2, 1)]);
493 let delta = IndexDelta {
494 added: vec![],
495 removed: vec![1],
496 modified: vec![],
497 };
498 ds.apply_delta(&mut base, &delta).unwrap();
499 assert!(base.get(1).is_none());
500 assert_eq!(base.len(), 1);
501 }
502
503 #[test]
504 fn test_apply_delta_modify() {
505 let ds = DeltaSync::new();
506 let mut base = make_snapshot(vec![(1, 1)]);
507 let updated = make_entry(1, 2);
508 let delta = IndexDelta {
509 added: vec![],
510 removed: vec![],
511 modified: vec![updated.clone()],
512 };
513 ds.apply_delta(&mut base, &delta).unwrap();
514 assert_eq!(base.get(1).unwrap().version, 2);
515 }
516
517 #[test]
518 fn test_apply_delta_remove_nonexistent_errors() {
519 let ds = DeltaSync::new();
520 let mut base = IndexSnapshot::new();
521 let delta = IndexDelta {
522 added: vec![],
523 removed: vec![99],
524 modified: vec![],
525 };
526 assert!(ds.apply_delta(&mut base, &delta).is_err());
527 }
528
529 #[test]
530 fn test_apply_delta_roundtrip() {
531 let ds = DeltaSync::new();
532 let old = make_snapshot(vec![(1, 1), (2, 1), (3, 1)]);
533 let new = make_snapshot(vec![(1, 2), (2, 1), (4, 1)]);
534 let delta = ds.compute_delta(&old, &new);
535 let mut applied = old.clone();
536 ds.apply_delta(&mut applied, &delta).unwrap();
537 assert_eq!(applied.len(), new.len());
539 for (id, entry) in &new.entries {
540 assert_eq!(applied.get(*id).map(|e| e.version), Some(entry.version));
541 }
542 }
543
544 #[test]
547 fn test_delta_size_bytes_empty() {
548 let ds = DeltaSync::new();
549 let delta = IndexDelta::default();
550 assert_eq!(ds.delta_size_bytes(&delta), 0);
551 }
552
553 #[test]
554 fn test_delta_size_bytes_removed_only() {
555 let ds = DeltaSync::new();
556 let delta = IndexDelta {
557 added: vec![],
558 removed: vec![1, 2, 3],
559 modified: vec![],
560 };
561 assert_eq!(ds.delta_size_bytes(&delta), 24); }
563
564 #[test]
565 fn test_delta_size_bytes_added() {
566 let ds = DeltaSync::new();
567 let entry = make_entry(1, 1); let expected = entry.approx_bytes();
569 let delta = IndexDelta {
570 added: vec![entry],
571 removed: vec![],
572 modified: vec![],
573 };
574 assert_eq!(ds.delta_size_bytes(&delta), expected);
575 }
576
577 #[test]
580 fn test_lag_ms_unknown_pair_is_zero() {
581 let lag = ReplicationLag::new();
582 assert_eq!(lag.lag_ms("dc-a", "dc-b"), 0);
583 }
584
585 #[test]
586 fn test_lag_ms_after_record() {
587 let mut lag = ReplicationLag::new();
588 lag.record("dc-a", "dc-b", 500);
589 assert_eq!(lag.lag_ms("dc-a", "dc-b"), 500);
590 }
591
592 #[test]
593 fn test_is_acceptable_within_sla() {
594 let lag = ReplicationLag::new();
595 assert!(lag.is_acceptable(100, 500));
596 }
597
598 #[test]
599 fn test_is_acceptable_equals_sla() {
600 let lag = ReplicationLag::new();
601 assert!(lag.is_acceptable(500, 500));
602 }
603
604 #[test]
605 fn test_is_acceptable_exceeds_sla() {
606 let lag = ReplicationLag::new();
607 assert!(!lag.is_acceptable(501, 500));
608 }
609
610 #[test]
611 fn test_alert_if_excessive_below_threshold() {
612 let lag = ReplicationLag::new();
613 let alert = lag.alert_if_excessive("dc-a", "dc-b", 100, 500);
614 assert!(alert.is_none());
615 }
616
617 #[test]
618 fn test_alert_if_excessive_above_threshold() {
619 let lag = ReplicationLag::new();
620 let alert = lag.alert_if_excessive("dc-a", "dc-b", 1000, 500);
621 assert!(alert.is_some());
622 let a = alert.unwrap();
623 assert_eq!(a.measured_lag_ms, 1000);
624 assert_eq!(a.threshold_ms, 500);
625 assert!(!a.message.is_empty());
626 }
627
628 #[test]
629 fn test_check_and_alert_uses_recorded_lag() {
630 let mut lag = ReplicationLag::new();
631 lag.record("dc-a", "dc-b", 999);
632 let alert = lag.check_and_alert("dc-a", "dc-b", 500);
633 assert!(alert.is_some());
634 assert_eq!(alert.unwrap().measured_lag_ms, 999);
635 }
636
637 #[test]
638 fn test_check_and_alert_no_alert_when_below() {
639 let mut lag = ReplicationLag::new();
640 lag.record("dc-a", "dc-b", 50);
641 let alert = lag.check_and_alert("dc-a", "dc-b", 500);
642 assert!(alert.is_none());
643 }
644
645 #[test]
646 fn test_delta_change_count() {
647 let delta = IndexDelta {
648 added: vec![make_entry(1, 1)],
649 removed: vec![2],
650 modified: vec![make_entry(3, 2), make_entry(4, 3)],
651 };
652 assert_eq!(delta.change_count(), 4);
653 }
654}