Skip to main content

oxirs_vec/distributed/
delta_sync.rs

//! Efficient delta synchronisation for distributed vector indexes.
//!
//! Instead of shipping entire index snapshots on every sync cycle, this module
//! computes and applies **deltas** — the minimal set of additions, removals and
//! modifications required to transform one index snapshot into another.
//!
//! # Key Types
//!
//! - `IndexSnapshot` — an in-memory view of the current index state.
//! - `VectorEntry` — a single vector with its ID, data, metadata and version.
//! - `IndexDelta` — the minimal diff between two snapshots.
//! - `DeltaSync` — stateless helper for computing and applying deltas.
//! - `ReplicationLag` — measures and evaluates sync lag between datacenters.
//! - `ReplicationAlert` — generated when lag exceeds a configured threshold.
//!
//! # Pure Rust Policy
//!
//! No CUDA runtime calls or FFI.
19
20use anyhow::{anyhow, Result};
21use serde::{Deserialize, Serialize};
22use std::collections::HashMap;
23
24// ============================================================
25// VectorEntry
26// ============================================================
27
28/// A single vector stored in the index.
29///
30/// Note: a `VectorEntry` type also exists in `raft_index`; this one is
31/// intentionally local to the delta-sync domain (it includes `version` for
32/// conflict detection).
33#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
34pub struct VectorEntry {
35    /// Globally unique vector identifier
36    pub id: u64,
37    /// Optional human-readable label
38    pub label: String,
39    /// The vector data
40    pub vector: Vec<f32>,
41    /// Arbitrary key-value metadata
42    pub metadata: HashMap<String, String>,
43    /// Monotonically increasing version counter (updated on every write)
44    pub version: u64,
45}
46
47impl VectorEntry {
48    /// Create a new entry with the given `id`, `vector` and `version`.
49    pub fn new(id: u64, vector: Vec<f32>, version: u64) -> Self {
50        Self {
51            id,
52            label: String::new(),
53            vector,
54            metadata: HashMap::new(),
55            version,
56        }
57    }
58
59    /// Approximate serialised byte size (id + version + label bytes + vector f32s + metadata).
60    pub fn approx_bytes(&self) -> u64 {
61        let meta_bytes: u64 = self
62            .metadata
63            .iter()
64            .map(|(k, v)| (k.len() + v.len()) as u64)
65            .sum();
66        8 + 8 + self.label.len() as u64 + (self.vector.len() as u64 * 4) + meta_bytes
67    }
68}
69
70// ============================================================
71// IndexSnapshot
72// ============================================================
73
74/// An immutable snapshot of the current index state.
75///
76/// Internally stored as a map from vector ID to `VectorEntry` for O(1) lookups.
77#[derive(Debug, Clone, Default, Serialize, Deserialize)]
78pub struct IndexSnapshot {
79    /// All entries in this snapshot, keyed by vector ID
80    pub entries: HashMap<u64, VectorEntry>,
81    /// Sequence number associated with this snapshot
82    pub seq: u64,
83}
84
85impl IndexSnapshot {
86    /// Create an empty snapshot at sequence 0.
87    pub fn new() -> Self {
88        Self::default()
89    }
90
91    /// Create a snapshot with the given entries and sequence number.
92    pub fn from_entries(entries: Vec<VectorEntry>, seq: u64) -> Self {
93        Self {
94            entries: entries.into_iter().map(|e| (e.id, e)).collect(),
95            seq,
96        }
97    }
98
99    /// Insert or update an entry.
100    pub fn upsert(&mut self, entry: VectorEntry) {
101        self.entries.insert(entry.id, entry);
102    }
103
104    /// Remove an entry by ID.  Returns `true` if the entry was present.
105    pub fn remove(&mut self, id: u64) -> bool {
106        self.entries.remove(&id).is_some()
107    }
108
109    /// Return the entry for the given ID, if present.
110    pub fn get(&self, id: u64) -> Option<&VectorEntry> {
111        self.entries.get(&id)
112    }
113
114    /// Number of entries in the snapshot.
115    pub fn len(&self) -> usize {
116        self.entries.len()
117    }
118
119    /// Returns `true` if the snapshot contains no entries.
120    pub fn is_empty(&self) -> bool {
121        self.entries.is_empty()
122    }
123}
124
125// ============================================================
126// IndexDelta
127// ============================================================
128
129/// The minimal diff between two index snapshots.
130#[derive(Debug, Clone, Default, Serialize, Deserialize)]
131pub struct IndexDelta {
132    /// Entries that are new or have been updated in the newer snapshot
133    pub added: Vec<VectorEntry>,
134    /// IDs of entries that have been removed in the newer snapshot
135    pub removed: Vec<u64>,
136    /// Entries whose vector data or metadata has changed
137    pub modified: Vec<VectorEntry>,
138}
139
140impl IndexDelta {
141    /// Returns `true` if the delta contains no changes.
142    pub fn is_empty(&self) -> bool {
143        self.added.is_empty() && self.removed.is_empty() && self.modified.is_empty()
144    }
145
146    /// Total number of change operations in this delta.
147    pub fn change_count(&self) -> usize {
148        self.added.len() + self.removed.len() + self.modified.len()
149    }
150}
151
152// ============================================================
153// DeltaSync
154// ============================================================
155
156/// Stateless helper for computing and applying index deltas.
157#[derive(Debug, Clone, Copy, Default)]
158pub struct DeltaSync;
159
160impl DeltaSync {
161    /// Create a new `DeltaSync` instance.
162    pub fn new() -> Self {
163        Self
164    }
165
166    /// Compute the delta required to transform `old_index` into `new_index`.
167    ///
168    /// An entry is considered **added** if it exists only in `new_index`.
169    /// An entry is **removed** if it exists only in `old_index`.
170    /// An entry is **modified** if it exists in both but the version number has
171    /// increased in `new_index`.
172    pub fn compute_delta(
173        &self,
174        old_index: &IndexSnapshot,
175        new_index: &IndexSnapshot,
176    ) -> IndexDelta {
177        let mut added = Vec::new();
178        let mut removed = Vec::new();
179        let mut modified = Vec::new();
180
181        // Entries in new that are absent or newer in old
182        for (id, new_entry) in &new_index.entries {
183            match old_index.entries.get(id) {
184                None => added.push(new_entry.clone()),
185                Some(old_entry) => {
186                    if new_entry.version > old_entry.version {
187                        modified.push(new_entry.clone());
188                    }
189                }
190            }
191        }
192
193        // Entries in old that are absent in new
194        for id in old_index.entries.keys() {
195            if !new_index.entries.contains_key(id) {
196                removed.push(*id);
197            }
198        }
199
200        IndexDelta {
201            added,
202            removed,
203            modified,
204        }
205    }
206
207    /// Apply `delta` to `base`, mutating it in place.
208    ///
209    /// Additions and modifications are upserted; removals are deleted.
210    /// Returns an error if a removal targets an ID that does not exist in `base`
211    /// (indicates a logic error in delta computation).
212    pub fn apply_delta(&self, base: &mut IndexSnapshot, delta: &IndexDelta) -> Result<()> {
213        for entry in &delta.added {
214            base.upsert(entry.clone());
215        }
216        for entry in &delta.modified {
217            base.upsert(entry.clone());
218        }
219        for &id in &delta.removed {
220            if !base.remove(id) {
221                return Err(anyhow!(
222                    "Delta removal of ID {} failed: entry not found in base snapshot",
223                    id
224                ));
225            }
226        }
227        Ok(())
228    }
229
230    /// Estimate the serialised byte size of a delta.
231    ///
232    /// Approximates: each removed ID costs 8 bytes; each added/modified entry
233    /// uses `VectorEntry::approx_bytes()`.
234    pub fn delta_size_bytes(&self, delta: &IndexDelta) -> u64 {
235        let added_bytes: u64 = delta.added.iter().map(|e| e.approx_bytes()).sum();
236        let modified_bytes: u64 = delta.modified.iter().map(|e| e.approx_bytes()).sum();
237        let removed_bytes: u64 = (delta.removed.len() as u64) * 8;
238        added_bytes + modified_bytes + removed_bytes
239    }
240}
241
242// ============================================================
243// ReplicationAlert
244// ============================================================
245
246/// Alert generated when replication lag exceeds a configured threshold.
247#[derive(Debug, Clone, Serialize, Deserialize)]
248pub struct ReplicationAlert {
249    /// Source datacenter
250    pub dc_a: String,
251    /// Destination datacenter
252    pub dc_b: String,
253    /// Measured lag in milliseconds
254    pub measured_lag_ms: u64,
255    /// The threshold that was exceeded
256    pub threshold_ms: u64,
257    /// Human-readable description
258    pub message: String,
259}
260
261// ============================================================
262// ReplicationLag
263// ============================================================
264
265/// Measures and evaluates replication lag between pairs of datacenters.
266///
267/// Lag measurements are stored in-memory; in production these would be
268/// populated from heartbeat timestamps.
269#[derive(Debug, Clone, Default)]
270pub struct ReplicationLag {
271    /// Measured lag per DC pair: key = (dc_a, dc_b) in sorted order
272    measurements_ms: HashMap<(String, String), u64>,
273}
274
275impl ReplicationLag {
276    /// Create a new lag tracker with no measurements.
277    pub fn new() -> Self {
278        Self::default()
279    }
280
281    /// Record a lag measurement between `dc_a` and `dc_b` (directional).
282    pub fn record(&mut self, dc_a: impl Into<String>, dc_b: impl Into<String>, lag_ms: u64) {
283        self.measurements_ms
284            .insert((dc_a.into(), dc_b.into()), lag_ms);
285    }
286
287    /// Retrieve the most recently recorded lag between `dc_a` and `dc_b` in
288    /// milliseconds.  Returns 0 if no measurement has been recorded.
289    pub fn lag_ms(&self, dc_a: &str, dc_b: &str) -> u64 {
290        self.measurements_ms
291            .get(&(dc_a.to_string(), dc_b.to_string()))
292            .copied()
293            .unwrap_or(0)
294    }
295
296    /// Returns `true` if `lag_ms` is within the acceptable `sla_ms` bound.
297    pub fn is_acceptable(&self, lag_ms: u64, sla_ms: u64) -> bool {
298        lag_ms <= sla_ms
299    }
300
301    /// Generate an alert if `lag_ms` exceeds `threshold_ms`, otherwise returns
302    /// `None`.
303    pub fn alert_if_excessive(
304        &self,
305        dc_a: &str,
306        dc_b: &str,
307        lag_ms: u64,
308        threshold_ms: u64,
309    ) -> Option<ReplicationAlert> {
310        if lag_ms > threshold_ms {
311            Some(ReplicationAlert {
312                dc_a: dc_a.to_string(),
313                dc_b: dc_b.to_string(),
314                measured_lag_ms: lag_ms,
315                threshold_ms,
316                message: format!(
317                    "Replication lag from {} to {} is {} ms, exceeding threshold {} ms",
318                    dc_a, dc_b, lag_ms, threshold_ms
319                ),
320            })
321        } else {
322            None
323        }
324    }
325
326    /// Check recorded lag between `dc_a` and `dc_b` against a threshold and
327    /// produce an alert if it is excessive.
328    pub fn check_and_alert(
329        &self,
330        dc_a: &str,
331        dc_b: &str,
332        threshold_ms: u64,
333    ) -> Option<ReplicationAlert> {
334        let lag = self.lag_ms(dc_a, dc_b);
335        self.alert_if_excessive(dc_a, dc_b, lag, threshold_ms)
336    }
337}
338
339// ============================================================
340// Tests
341// ============================================================
342
#[cfg(test)]
mod tests {
    use super::*;

    fn make_entry(id: u64, version: u64) -> VectorEntry {
        VectorEntry::new(id, vec![id as f32, version as f32], version)
    }

    fn make_snapshot(entries: Vec<(u64, u64)>) -> IndexSnapshot {
        let seq = entries.iter().map(|(_, v)| *v).max().unwrap_or(0);
        IndexSnapshot::from_entries(
            entries
                .into_iter()
                .map(|(id, ver)| make_entry(id, ver))
                .collect(),
            seq,
        )
    }

    /// Sorted IDs of `entries`, so delta lists are compared order-insensitively.
    fn sorted_ids(entries: &[VectorEntry]) -> Vec<u64> {
        let mut ids: Vec<u64> = entries.iter().map(|e| e.id).collect();
        ids.sort_unstable();
        ids
    }

    /// Sorted copy of `ids`, for order-insensitive comparison.
    fn sorted(mut ids: Vec<u64>) -> Vec<u64> {
        ids.sort_unstable();
        ids
    }

    // ---- IndexSnapshot ----

    #[test]
    fn test_snapshot_upsert_and_get() {
        let mut snap = IndexSnapshot::new();
        snap.upsert(make_entry(1, 1));
        assert_eq!(snap.get(1).map(|e| e.version), Some(1));
        assert_eq!(snap.len(), 1);
    }

    #[test]
    fn test_snapshot_upsert_replaces_existing() {
        let mut snap = make_snapshot(vec![(1, 1)]);
        snap.upsert(make_entry(1, 4));
        assert_eq!(snap.len(), 1);
        assert_eq!(snap.get(1).map(|e| e.version), Some(4));
    }

    #[test]
    fn test_snapshot_from_entries_last_duplicate_wins() {
        let snap = IndexSnapshot::from_entries(vec![make_entry(1, 1), make_entry(1, 7)], 3);
        assert_eq!(snap.len(), 1);
        assert_eq!(snap.seq, 3);
        assert_eq!(snap.get(1).map(|e| e.version), Some(7));
    }

    #[test]
    fn test_snapshot_remove_existing() {
        let mut snap = make_snapshot(vec![(1, 1), (2, 1)]);
        assert!(snap.remove(1));
        assert!(snap.get(1).is_none());
        assert_eq!(snap.len(), 1);
    }

    #[test]
    fn test_snapshot_remove_nonexistent() {
        let mut snap = IndexSnapshot::new();
        assert!(!snap.remove(99));
    }

    #[test]
    fn test_snapshot_is_empty() {
        let snap = IndexSnapshot::new();
        assert!(snap.is_empty());
        assert_eq!(snap.seq, 0);
    }

    // ---- VectorEntry ----

    #[test]
    fn test_vector_entry_approx_bytes_basic() {
        let e = make_entry(1, 1); // 2-element f32 vector
        let bytes = e.approx_bytes();
        // 8 (id) + 8 (version) + 0 (empty label) + 8 (2 * 4) + 0 (no meta) = 24
        assert_eq!(bytes, 24);
    }

    #[test]
    fn test_vector_entry_with_metadata_bytes() {
        let mut e = make_entry(1, 1);
        e.metadata.insert("key".into(), "value".into()); // 3 + 5 = 8 bytes
        let bytes = e.approx_bytes();
        assert_eq!(bytes, 32);
    }

    #[test]
    fn test_vector_entry_label_bytes() {
        let mut e = make_entry(1, 1);
        e.label = "abcd".into(); // 4 bytes
        assert_eq!(e.approx_bytes(), 28);
    }

    // ---- DeltaSync::compute_delta ----

    #[test]
    fn test_compute_delta_empty_to_empty() {
        let ds = DeltaSync::new();
        let old = IndexSnapshot::new();
        let new = IndexSnapshot::new();
        let delta = ds.compute_delta(&old, &new);
        assert!(delta.is_empty());
    }

    #[test]
    fn test_compute_delta_all_added() {
        let ds = DeltaSync::new();
        let old = IndexSnapshot::new();
        let new = make_snapshot(vec![(1, 1), (2, 1), (3, 1)]);
        let delta = ds.compute_delta(&old, &new);
        assert_eq!(sorted_ids(&delta.added), vec![1, 2, 3]);
        assert!(delta.removed.is_empty());
        assert!(delta.modified.is_empty());
    }

    #[test]
    fn test_compute_delta_all_removed() {
        let ds = DeltaSync::new();
        let old = make_snapshot(vec![(1, 1), (2, 1)]);
        let new = IndexSnapshot::new();
        let delta = ds.compute_delta(&old, &new);
        assert_eq!(sorted(delta.removed.clone()), vec![1, 2]);
        assert!(delta.added.is_empty());
        assert!(delta.modified.is_empty());
    }

    #[test]
    fn test_compute_delta_modifications() {
        let ds = DeltaSync::new();
        let old = make_snapshot(vec![(1, 1), (2, 1)]);
        let new = make_snapshot(vec![(1, 2), (2, 1)]); // entry 1 updated
        let delta = ds.compute_delta(&old, &new);
        assert_eq!(delta.modified.len(), 1);
        assert_eq!(delta.modified[0].id, 1);
        assert_eq!(delta.modified[0].version, 2);
        assert!(delta.added.is_empty());
        assert!(delta.removed.is_empty());
    }

    #[test]
    fn test_compute_delta_ignores_stale_version() {
        let ds = DeltaSync::new();
        let old = make_snapshot(vec![(1, 5)]);
        let new = make_snapshot(vec![(1, 3)]); // older version in `new`
        let delta = ds.compute_delta(&old, &new);
        assert!(delta.is_empty());
    }

    #[test]
    fn test_compute_delta_mixed() {
        let ds = DeltaSync::new();
        let old = make_snapshot(vec![(1, 1), (2, 1), (3, 1)]);
        // entry 2 updated, entry 3 removed, entry 4 added
        let new = make_snapshot(vec![(1, 1), (2, 2), (4, 1)]);
        let delta = ds.compute_delta(&old, &new);
        assert_eq!(sorted_ids(&delta.added), vec![4]);
        assert_eq!(sorted(delta.removed.clone()), vec![3]);
        assert_eq!(sorted_ids(&delta.modified), vec![2]);
        assert_eq!(delta.modified[0].version, 2);
        assert_eq!(delta.change_count(), 3);
    }

    #[test]
    fn test_compute_delta_no_change_no_diff() {
        let ds = DeltaSync::new();
        let snap = make_snapshot(vec![(1, 5), (2, 3)]);
        let delta = ds.compute_delta(&snap, &snap);
        assert!(delta.is_empty());
    }

    // ---- DeltaSync::apply_delta ----

    #[test]
    fn test_apply_delta_add() {
        let ds = DeltaSync::new();
        let mut base = IndexSnapshot::new();
        let delta = IndexDelta {
            added: vec![make_entry(1, 1)],
            removed: vec![],
            modified: vec![],
        };
        ds.apply_delta(&mut base, &delta).unwrap();
        assert_eq!(base.get(1), Some(&make_entry(1, 1)));
    }

    #[test]
    fn test_apply_delta_empty_is_noop() {
        let ds = DeltaSync::new();
        let mut base = make_snapshot(vec![(1, 1), (2, 2)]);
        let before = base.clone();
        ds.apply_delta(&mut base, &IndexDelta::default()).unwrap();
        assert_eq!(base.entries, before.entries);
        assert_eq!(base.seq, before.seq);
    }

    #[test]
    fn test_apply_delta_remove() {
        let ds = DeltaSync::new();
        let mut base = make_snapshot(vec![(1, 1), (2, 1)]);
        let delta = IndexDelta {
            added: vec![],
            removed: vec![1],
            modified: vec![],
        };
        ds.apply_delta(&mut base, &delta).unwrap();
        assert!(base.get(1).is_none());
        assert_eq!(base.len(), 1);
    }

    #[test]
    fn test_apply_delta_modify() {
        let ds = DeltaSync::new();
        let mut base = make_snapshot(vec![(1, 1)]);
        let updated = make_entry(1, 2);
        let delta = IndexDelta {
            added: vec![],
            removed: vec![],
            modified: vec![updated.clone()],
        };
        ds.apply_delta(&mut base, &delta).unwrap();
        assert_eq!(base.get(1), Some(&updated));
    }

    #[test]
    fn test_apply_delta_remove_nonexistent_errors() {
        let ds = DeltaSync::new();
        let mut base = IndexSnapshot::new();
        let delta = IndexDelta {
            added: vec![],
            removed: vec![99],
            modified: vec![],
        };
        let err = ds.apply_delta(&mut base, &delta).unwrap_err();
        assert!(err.to_string().contains("99"));
    }

    #[test]
    fn test_apply_delta_duplicate_removal_errors() {
        let ds = DeltaSync::new();
        let mut base = make_snapshot(vec![(1, 1)]);
        let delta = IndexDelta {
            added: vec![],
            removed: vec![1, 1],
            modified: vec![],
        };
        assert!(ds.apply_delta(&mut base, &delta).is_err());
    }

    #[test]
    fn test_apply_delta_roundtrip() {
        let ds = DeltaSync::new();
        let old = make_snapshot(vec![(1, 1), (2, 1), (3, 1)]);
        let new = make_snapshot(vec![(1, 2), (2, 1), (4, 1)]);
        let delta = ds.compute_delta(&old, &new);
        let mut applied = old.clone();
        ds.apply_delta(&mut applied, &delta).unwrap();
        // applied should now hold exactly the entries of new
        assert_eq!(applied.len(), new.len());
        for (id, entry) in &new.entries {
            assert_eq!(applied.get(*id), Some(entry));
        }
        assert!(applied.get(3).is_none());
    }

    // ---- DeltaSync::delta_size_bytes ----

    #[test]
    fn test_delta_size_bytes_empty() {
        let ds = DeltaSync::new();
        let delta = IndexDelta::default();
        assert_eq!(ds.delta_size_bytes(&delta), 0);
    }

    #[test]
    fn test_delta_size_bytes_removed_only() {
        let ds = DeltaSync::new();
        let delta = IndexDelta {
            added: vec![],
            removed: vec![1, 2, 3],
            modified: vec![],
        };
        assert_eq!(ds.delta_size_bytes(&delta), 24); // 3 * 8 bytes
    }

    #[test]
    fn test_delta_size_bytes_added() {
        let ds = DeltaSync::new();
        let entry = make_entry(1, 1); // 2-element vector => 24 bytes
        let expected = entry.approx_bytes();
        let delta = IndexDelta {
            added: vec![entry],
            removed: vec![],
            modified: vec![],
        };
        assert_eq!(ds.delta_size_bytes(&delta), expected);
    }

    #[test]
    fn test_delta_size_bytes_mixed() {
        let ds = DeltaSync::new();
        let mut labelled = make_entry(2, 2);
        labelled.label = "ab".into(); // 24 + 2 = 26 bytes
        let delta = IndexDelta {
            added: vec![make_entry(1, 1)], // 24 bytes
            removed: vec![3, 4],           // 16 bytes
            modified: vec![labelled],
        };
        assert_eq!(ds.delta_size_bytes(&delta), 24 + 26 + 16);
    }

    // ---- ReplicationLag ----

    #[test]
    fn test_lag_ms_unknown_pair_is_zero() {
        let lag = ReplicationLag::new();
        assert_eq!(lag.lag_ms("dc-a", "dc-b"), 0);
    }

    #[test]
    fn test_lag_ms_after_record() {
        let mut lag = ReplicationLag::new();
        lag.record("dc-a", "dc-b", 500);
        assert_eq!(lag.lag_ms("dc-a", "dc-b"), 500);
    }

    #[test]
    fn test_lag_ms_is_directional() {
        let mut lag = ReplicationLag::new();
        lag.record("dc-a", "dc-b", 500);
        assert_eq!(lag.lag_ms("dc-b", "dc-a"), 0);
    }

    #[test]
    fn test_is_acceptable_within_sla() {
        let lag = ReplicationLag::new();
        assert!(lag.is_acceptable(100, 500));
    }

    #[test]
    fn test_is_acceptable_equals_sla() {
        let lag = ReplicationLag::new();
        assert!(lag.is_acceptable(500, 500));
    }

    #[test]
    fn test_is_acceptable_exceeds_sla() {
        let lag = ReplicationLag::new();
        assert!(!lag.is_acceptable(501, 500));
    }

    #[test]
    fn test_alert_if_excessive_below_threshold() {
        let lag = ReplicationLag::new();
        let alert = lag.alert_if_excessive("dc-a", "dc-b", 100, 500);
        assert!(alert.is_none());
    }

    #[test]
    fn test_alert_if_excessive_at_threshold() {
        let lag = ReplicationLag::new();
        assert!(lag.alert_if_excessive("dc-a", "dc-b", 500, 500).is_none());
    }

    #[test]
    fn test_alert_if_excessive_above_threshold() {
        let lag = ReplicationLag::new();
        let alert = lag.alert_if_excessive("dc-a", "dc-b", 1000, 500);
        assert!(alert.is_some());
        let a = alert.unwrap();
        assert_eq!(a.dc_a, "dc-a");
        assert_eq!(a.dc_b, "dc-b");
        assert_eq!(a.measured_lag_ms, 1000);
        assert_eq!(a.threshold_ms, 500);
        assert_eq!(
            a.message,
            "Replication lag from dc-a to dc-b is 1000 ms, exceeding threshold 500 ms"
        );
    }

    #[test]
    fn test_check_and_alert_uses_recorded_lag() {
        let mut lag = ReplicationLag::new();
        lag.record("dc-a", "dc-b", 999);
        let alert = lag.check_and_alert("dc-a", "dc-b", 500);
        assert!(alert.is_some());
        assert_eq!(alert.unwrap().measured_lag_ms, 999);
    }

    #[test]
    fn test_check_and_alert_no_alert_when_below() {
        let mut lag = ReplicationLag::new();
        lag.record("dc-a", "dc-b", 50);
        let alert = lag.check_and_alert("dc-a", "dc-b", 500);
        assert!(alert.is_none());
    }

    #[test]
    fn test_delta_change_count() {
        let delta = IndexDelta {
            added: vec![make_entry(1, 1)],
            removed: vec![2],
            modified: vec![make_entry(3, 2), make_entry(4, 3)],
        };
        assert_eq!(delta.change_count(), 4);
        assert!(!delta.is_empty());
    }
}