1use anyhow::{anyhow, Result};
21use serde::{Deserialize, Serialize};
22use std::collections::HashMap;
23
24#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
34pub struct VectorEntry {
35 pub id: u64,
37 pub label: String,
39 pub vector: Vec<f32>,
41 pub metadata: HashMap<String, String>,
43 pub version: u64,
45}
46
47impl VectorEntry {
48 pub fn new(id: u64, vector: Vec<f32>, version: u64) -> Self {
50 Self {
51 id,
52 label: String::new(),
53 vector,
54 metadata: HashMap::new(),
55 version,
56 }
57 }
58
59 pub fn approx_bytes(&self) -> u64 {
61 let meta_bytes: u64 = self
62 .metadata
63 .iter()
64 .map(|(k, v)| (k.len() + v.len()) as u64)
65 .sum();
66 8 + 8 + self.label.len() as u64 + (self.vector.len() as u64 * 4) + meta_bytes
67 }
68}
69
70#[derive(Debug, Clone, Default, Serialize, Deserialize)]
78pub struct IndexSnapshot {
79 pub entries: HashMap<u64, VectorEntry>,
81 pub seq: u64,
83}
84
85impl IndexSnapshot {
86 pub fn new() -> Self {
88 Self::default()
89 }
90
91 pub fn from_entries(entries: Vec<VectorEntry>, seq: u64) -> Self {
93 Self {
94 entries: entries.into_iter().map(|e| (e.id, e)).collect(),
95 seq,
96 }
97 }
98
99 pub fn upsert(&mut self, entry: VectorEntry) {
101 self.entries.insert(entry.id, entry);
102 }
103
104 pub fn remove(&mut self, id: u64) -> bool {
106 self.entries.remove(&id).is_some()
107 }
108
109 pub fn get(&self, id: u64) -> Option<&VectorEntry> {
111 self.entries.get(&id)
112 }
113
114 pub fn len(&self) -> usize {
116 self.entries.len()
117 }
118
119 pub fn is_empty(&self) -> bool {
121 self.entries.is_empty()
122 }
123}
124
125#[derive(Debug, Clone, Default, Serialize, Deserialize)]
131pub struct IndexDelta {
132 pub added: Vec<VectorEntry>,
134 pub removed: Vec<u64>,
136 pub modified: Vec<VectorEntry>,
138}
139
140impl IndexDelta {
141 pub fn is_empty(&self) -> bool {
143 self.added.is_empty() && self.removed.is_empty() && self.modified.is_empty()
144 }
145
146 pub fn change_count(&self) -> usize {
148 self.added.len() + self.removed.len() + self.modified.len()
149 }
150}
151
/// Stateless helper that computes, applies and sizes deltas between index
/// snapshots. It has no fields, so copies are interchangeable.
#[derive(Clone, Copy, Debug, Default)]
pub struct DeltaSync;
159
160impl DeltaSync {
161 pub fn new() -> Self {
163 Self
164 }
165
166 pub fn compute_delta(
173 &self,
174 old_index: &IndexSnapshot,
175 new_index: &IndexSnapshot,
176 ) -> IndexDelta {
177 let mut added = Vec::new();
178 let mut removed = Vec::new();
179 let mut modified = Vec::new();
180
181 for (id, new_entry) in &new_index.entries {
183 match old_index.entries.get(id) {
184 None => added.push(new_entry.clone()),
185 Some(old_entry) => {
186 if new_entry.version > old_entry.version {
187 modified.push(new_entry.clone());
188 }
189 }
190 }
191 }
192
193 for id in old_index.entries.keys() {
195 if !new_index.entries.contains_key(id) {
196 removed.push(*id);
197 }
198 }
199
200 IndexDelta {
201 added,
202 removed,
203 modified,
204 }
205 }
206
207 pub fn apply_delta(&self, base: &mut IndexSnapshot, delta: &IndexDelta) -> Result<()> {
213 for entry in &delta.added {
214 base.upsert(entry.clone());
215 }
216 for entry in &delta.modified {
217 base.upsert(entry.clone());
218 }
219 for &id in &delta.removed {
220 if !base.remove(id) {
221 return Err(anyhow!(
222 "Delta removal of ID {} failed: entry not found in base snapshot",
223 id
224 ));
225 }
226 }
227 Ok(())
228 }
229
230 pub fn delta_size_bytes(&self, delta: &IndexDelta) -> u64 {
235 let added_bytes: u64 = delta.added.iter().map(|e| e.approx_bytes()).sum();
236 let modified_bytes: u64 = delta.modified.iter().map(|e| e.approx_bytes()).sum();
237 let removed_bytes: u64 = (delta.removed.len() as u64) * 8;
238 added_bytes + modified_bytes + removed_bytes
239 }
240}
241
242#[derive(Debug, Clone, Serialize, Deserialize)]
248pub struct ReplicationAlert {
249 pub dc_a: String,
251 pub dc_b: String,
253 pub measured_lag_ms: u64,
255 pub threshold_ms: u64,
257 pub message: String,
259}
260
/// Stores the most recently recorded replication lag per data-center pair.
#[derive(Clone, Debug, Default)]
pub struct ReplicationLag {
    /// Lag in milliseconds, keyed by the ordered pair `(dc_a, dc_b)`.
    /// `(a, b)` and `(b, a)` are distinct keys.
    measurements_ms: HashMap<(String, String), u64>,
}
274
275impl ReplicationLag {
276 pub fn new() -> Self {
278 Self::default()
279 }
280
281 pub fn record(&mut self, dc_a: impl Into<String>, dc_b: impl Into<String>, lag_ms: u64) {
283 self.measurements_ms
284 .insert((dc_a.into(), dc_b.into()), lag_ms);
285 }
286
287 pub fn lag_ms(&self, dc_a: &str, dc_b: &str) -> u64 {
290 self.measurements_ms
291 .get(&(dc_a.to_string(), dc_b.to_string()))
292 .copied()
293 .unwrap_or(0)
294 }
295
296 pub fn is_acceptable(&self, lag_ms: u64, sla_ms: u64) -> bool {
298 lag_ms <= sla_ms
299 }
300
301 pub fn alert_if_excessive(
304 &self,
305 dc_a: &str,
306 dc_b: &str,
307 lag_ms: u64,
308 threshold_ms: u64,
309 ) -> Option<ReplicationAlert> {
310 if lag_ms > threshold_ms {
311 Some(ReplicationAlert {
312 dc_a: dc_a.to_string(),
313 dc_b: dc_b.to_string(),
314 measured_lag_ms: lag_ms,
315 threshold_ms,
316 message: format!(
317 "Replication lag from {} to {} is {} ms, exceeding threshold {} ms",
318 dc_a, dc_b, lag_ms, threshold_ms
319 ),
320 })
321 } else {
322 None
323 }
324 }
325
326 pub fn check_and_alert(
329 &self,
330 dc_a: &str,
331 dc_b: &str,
332 threshold_ms: u64,
333 ) -> Option<ReplicationAlert> {
334 let lag = self.lag_ms(dc_a, dc_b);
335 self.alert_if_excessive(dc_a, dc_b, lag, threshold_ms)
336 }
337}
338
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;

    /// Entry whose vector is `[id, version]`, so every fixture carries exactly
    /// two components, an empty label and no metadata.
    fn make_entry(id: u64, version: u64) -> VectorEntry {
        let vector = vec![id as f32, version as f32];
        VectorEntry::new(id, vector, version)
    }

    /// Snapshot built from `(id, version)` pairs; `seq` is the highest version
    /// among them, or 0 for an empty list.
    fn make_snapshot(entries: Vec<(u64, u64)>) -> IndexSnapshot {
        let mut seq = 0;
        let mut built = Vec::with_capacity(entries.len());
        for (id, version) in entries {
            seq = seq.max(version);
            built.push(make_entry(id, version));
        }
        IndexSnapshot::from_entries(built, seq)
    }

    // ---- IndexSnapshot -----------------------------------------------------

    #[test]
    fn test_snapshot_upsert_and_get() {
        let mut snapshot = IndexSnapshot::new();
        snapshot.upsert(make_entry(1, 1));
        assert_eq!(snapshot.len(), 1);
        assert!(snapshot.get(1).is_some());
    }

    #[test]
    fn test_snapshot_remove_existing() {
        let mut snapshot = make_snapshot(vec![(1, 1), (2, 1)]);
        let was_removed = snapshot.remove(1);
        assert!(was_removed);
        assert_eq!(snapshot.len(), 1);
    }

    #[test]
    fn test_snapshot_remove_nonexistent() {
        let mut snapshot = IndexSnapshot::new();
        let was_removed = snapshot.remove(99);
        assert!(!was_removed);
    }

    #[test]
    fn test_snapshot_is_empty() {
        assert!(IndexSnapshot::new().is_empty());
    }

    // ---- VectorEntry::approx_bytes -----------------------------------------

    #[test]
    fn test_vector_entry_approx_bytes_basic() {
        // id (8) + version (8) + two f32 components (2 * 4).
        assert_eq!(make_entry(1, 1).approx_bytes(), 24);
    }

    #[test]
    fn test_vector_entry_with_metadata_bytes() {
        let mut entry = make_entry(1, 1);
        entry.metadata.insert("key".into(), "value".into());
        // 24 base bytes + "key" (3) + "value" (5).
        assert_eq!(entry.approx_bytes(), 32);
    }

    // ---- DeltaSync::compute_delta ------------------------------------------

    #[test]
    fn test_compute_delta_empty_to_empty() {
        let before = IndexSnapshot::new();
        let after = IndexSnapshot::new();
        let delta = DeltaSync::new().compute_delta(&before, &after);
        assert!(delta.is_empty());
    }

    #[test]
    fn test_compute_delta_all_added() {
        let before = IndexSnapshot::new();
        let after = make_snapshot(vec![(1, 1), (2, 1), (3, 1)]);
        let delta = DeltaSync::new().compute_delta(&before, &after);
        assert_eq!(delta.added.len(), 3);
        assert!(delta.removed.is_empty());
        assert!(delta.modified.is_empty());
    }

    #[test]
    fn test_compute_delta_all_removed() {
        let before = make_snapshot(vec![(1, 1), (2, 1)]);
        let after = IndexSnapshot::new();
        let delta = DeltaSync::new().compute_delta(&before, &after);
        assert_eq!(delta.removed.len(), 2);
        assert!(delta.added.is_empty());
        assert!(delta.modified.is_empty());
    }

    #[test]
    fn test_compute_delta_modifications() {
        let before = make_snapshot(vec![(1, 1), (2, 1)]);
        // Only entry 1 has its version bumped.
        let after = make_snapshot(vec![(1, 2), (2, 1)]);
        let delta = DeltaSync::new().compute_delta(&before, &after);
        assert_eq!(delta.modified.len(), 1);
        assert_eq!(delta.modified[0].id, 1);
        assert!(delta.added.is_empty());
        assert!(delta.removed.is_empty());
    }

    #[test]
    fn test_compute_delta_mixed() {
        let before = make_snapshot(vec![(1, 1), (2, 1), (3, 1)]);
        // 1 unchanged, 2 bumped, 3 dropped, 4 new.
        let after = make_snapshot(vec![(1, 1), (2, 2), (4, 1)]);
        let delta = DeltaSync::new().compute_delta(&before, &after);
        assert_eq!(delta.added.len(), 1);
        assert_eq!(delta.removed.len(), 1);
        assert_eq!(delta.modified.len(), 1);
    }

    #[test]
    fn test_compute_delta_no_change_no_diff() {
        let snapshot = make_snapshot(vec![(1, 5), (2, 3)]);
        let delta = DeltaSync::new().compute_delta(&snapshot, &snapshot);
        assert!(delta.is_empty());
    }

    // ---- DeltaSync::apply_delta --------------------------------------------

    #[test]
    fn test_apply_delta_add() -> Result<()> {
        let mut base = IndexSnapshot::new();
        let delta = IndexDelta {
            added: vec![make_entry(1, 1)],
            ..IndexDelta::default()
        };
        DeltaSync::new().apply_delta(&mut base, &delta)?;
        assert!(base.get(1).is_some());
        Ok(())
    }

    #[test]
    fn test_apply_delta_remove() -> Result<()> {
        let mut base = make_snapshot(vec![(1, 1), (2, 1)]);
        let delta = IndexDelta {
            removed: vec![1],
            ..IndexDelta::default()
        };
        DeltaSync::new().apply_delta(&mut base, &delta)?;
        assert_eq!(base.len(), 1);
        assert!(base.get(1).is_none());
        Ok(())
    }

    #[test]
    fn test_apply_delta_modify() -> Result<()> {
        let mut base = make_snapshot(vec![(1, 1)]);
        let delta = IndexDelta {
            modified: vec![make_entry(1, 2)],
            ..IndexDelta::default()
        };
        DeltaSync::new().apply_delta(&mut base, &delta)?;
        let stored = base.get(1).expect("test value");
        assert_eq!(stored.version, 2);
        Ok(())
    }

    #[test]
    fn test_apply_delta_remove_nonexistent_errors() {
        let mut base = IndexSnapshot::new();
        let delta = IndexDelta {
            removed: vec![99],
            ..IndexDelta::default()
        };
        let outcome = DeltaSync::new().apply_delta(&mut base, &delta);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_apply_delta_roundtrip() -> Result<()> {
        let sync = DeltaSync::new();
        let before = make_snapshot(vec![(1, 1), (2, 1), (3, 1)]);
        let after = make_snapshot(vec![(1, 2), (2, 1), (4, 1)]);

        let delta = sync.compute_delta(&before, &after);
        let mut replayed = before.clone();
        sync.apply_delta(&mut replayed, &delta)?;

        assert_eq!(replayed.len(), after.len());
        for (id, expected) in &after.entries {
            let actual_version = replayed.get(*id).map(|entry| entry.version);
            assert_eq!(actual_version, Some(expected.version));
        }
        Ok(())
    }

    // ---- DeltaSync::delta_size_bytes ---------------------------------------

    #[test]
    fn test_delta_size_bytes_empty() {
        let size = DeltaSync::new().delta_size_bytes(&IndexDelta::default());
        assert_eq!(size, 0);
    }

    #[test]
    fn test_delta_size_bytes_removed_only() {
        let delta = IndexDelta {
            removed: vec![1, 2, 3],
            ..IndexDelta::default()
        };
        // Three removed ids at 8 bytes each.
        assert_eq!(DeltaSync::new().delta_size_bytes(&delta), 24);
    }

    #[test]
    fn test_delta_size_bytes_added() {
        let entry = make_entry(1, 1);
        let expected = entry.approx_bytes();
        let delta = IndexDelta {
            added: vec![entry],
            ..IndexDelta::default()
        };
        assert_eq!(DeltaSync::new().delta_size_bytes(&delta), expected);
    }

    // ---- ReplicationLag ----------------------------------------------------

    #[test]
    fn test_lag_ms_unknown_pair_is_zero() {
        let tracker = ReplicationLag::new();
        assert_eq!(tracker.lag_ms("dc-a", "dc-b"), 0);
    }

    #[test]
    fn test_lag_ms_after_record() {
        let mut tracker = ReplicationLag::new();
        tracker.record("dc-a", "dc-b", 500);
        assert_eq!(tracker.lag_ms("dc-a", "dc-b"), 500);
    }

    #[test]
    fn test_is_acceptable_within_sla() {
        assert!(ReplicationLag::new().is_acceptable(100, 500));
    }

    #[test]
    fn test_is_acceptable_equals_sla() {
        assert!(ReplicationLag::new().is_acceptable(500, 500));
    }

    #[test]
    fn test_is_acceptable_exceeds_sla() {
        assert!(!ReplicationLag::new().is_acceptable(501, 500));
    }

    #[test]
    fn test_alert_if_excessive_below_threshold() {
        let tracker = ReplicationLag::new();
        assert!(tracker
            .alert_if_excessive("dc-a", "dc-b", 100, 500)
            .is_none());
    }

    #[test]
    fn test_alert_if_excessive_above_threshold() -> Result<()> {
        let tracker = ReplicationLag::new();
        let maybe_alert = tracker.alert_if_excessive("dc-a", "dc-b", 1000, 500);
        assert!(maybe_alert.is_some());
        let alert = maybe_alert.expect("alert was None");
        assert_eq!(alert.measured_lag_ms, 1000);
        assert_eq!(alert.threshold_ms, 500);
        assert!(!alert.message.is_empty());
        Ok(())
    }

    #[test]
    fn test_check_and_alert_uses_recorded_lag() -> Result<()> {
        let mut tracker = ReplicationLag::new();
        tracker.record("dc-a", "dc-b", 999);
        let maybe_alert = tracker.check_and_alert("dc-a", "dc-b", 500);
        assert!(maybe_alert.is_some());
        let alert = maybe_alert.expect("test value");
        assert_eq!(alert.measured_lag_ms, 999);
        Ok(())
    }

    #[test]
    fn test_check_and_alert_no_alert_when_below() {
        let mut tracker = ReplicationLag::new();
        tracker.record("dc-a", "dc-b", 50);
        assert!(tracker.check_and_alert("dc-a", "dc-b", 500).is_none());
    }

    // ---- IndexDelta --------------------------------------------------------

    #[test]
    fn test_delta_change_count() {
        let delta = IndexDelta {
            added: vec![make_entry(1, 1)],
            removed: vec![2],
            modified: vec![make_entry(3, 2), make_entry(4, 3)],
        };
        // 1 added + 1 removed + 2 modified.
        assert_eq!(delta.change_count(), 4);
    }
}