Skip to main content

oxirs_vec/distributed/
delta_sync.rs

1//! Efficient delta synchronisation for distributed vector indexes.
2//!
3//! Instead of shipping entire index snapshots on every sync cycle, this module
4//! computes and applies **deltas** — the minimal set of additions, removals and
5//! modifications required to transform one index snapshot into another.
6//!
7//! # Key Types
8//!
9//! - `IndexSnapshot` — an in-memory view of the current index state.
10//! - `VectorEntry` — a single vector with its ID, data and metadata.
11//! - `IndexDelta` — the minimal diff between two snapshots.
12//! - `DeltaSync` — stateless helper for computing and applying deltas.
13//! - `ReplicationLag` — measures and evaluates sync lag between datacenters.
14//! - `ReplicationAlert` — generated when lag exceeds a configured threshold.
15//!
16//! # Pure Rust Policy
17//!
18//! No CUDA runtime calls or FFI.
19
20use anyhow::{anyhow, Result};
21use serde::{Deserialize, Serialize};
22use std::collections::HashMap;
23
24// ============================================================
25// VectorEntry
26// ============================================================
27
28/// A single vector stored in the index.
29///
30/// Note: a `VectorEntry` type also exists in `raft_index`; this one is
31/// intentionally local to the delta-sync domain (it includes `version` for
32/// conflict detection).
33#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
34pub struct VectorEntry {
35    /// Globally unique vector identifier
36    pub id: u64,
37    /// Optional human-readable label
38    pub label: String,
39    /// The vector data
40    pub vector: Vec<f32>,
41    /// Arbitrary key-value metadata
42    pub metadata: HashMap<String, String>,
43    /// Monotonically increasing version counter (updated on every write)
44    pub version: u64,
45}
46
47impl VectorEntry {
48    /// Create a new entry with the given `id`, `vector` and `version`.
49    pub fn new(id: u64, vector: Vec<f32>, version: u64) -> Self {
50        Self {
51            id,
52            label: String::new(),
53            vector,
54            metadata: HashMap::new(),
55            version,
56        }
57    }
58
59    /// Approximate serialised byte size (id + version + label bytes + vector f32s + metadata).
60    pub fn approx_bytes(&self) -> u64 {
61        let meta_bytes: u64 = self
62            .metadata
63            .iter()
64            .map(|(k, v)| (k.len() + v.len()) as u64)
65            .sum();
66        8 + 8 + self.label.len() as u64 + (self.vector.len() as u64 * 4) + meta_bytes
67    }
68}
69
70// ============================================================
71// IndexSnapshot
72// ============================================================
73
74/// An immutable snapshot of the current index state.
75///
76/// Internally stored as a map from vector ID to `VectorEntry` for O(1) lookups.
77#[derive(Debug, Clone, Default, Serialize, Deserialize)]
78pub struct IndexSnapshot {
79    /// All entries in this snapshot, keyed by vector ID
80    pub entries: HashMap<u64, VectorEntry>,
81    /// Sequence number associated with this snapshot
82    pub seq: u64,
83}
84
85impl IndexSnapshot {
86    /// Create an empty snapshot at sequence 0.
87    pub fn new() -> Self {
88        Self::default()
89    }
90
91    /// Create a snapshot with the given entries and sequence number.
92    pub fn from_entries(entries: Vec<VectorEntry>, seq: u64) -> Self {
93        Self {
94            entries: entries.into_iter().map(|e| (e.id, e)).collect(),
95            seq,
96        }
97    }
98
99    /// Insert or update an entry.
100    pub fn upsert(&mut self, entry: VectorEntry) {
101        self.entries.insert(entry.id, entry);
102    }
103
104    /// Remove an entry by ID.  Returns `true` if the entry was present.
105    pub fn remove(&mut self, id: u64) -> bool {
106        self.entries.remove(&id).is_some()
107    }
108
109    /// Return the entry for the given ID, if present.
110    pub fn get(&self, id: u64) -> Option<&VectorEntry> {
111        self.entries.get(&id)
112    }
113
114    /// Number of entries in the snapshot.
115    pub fn len(&self) -> usize {
116        self.entries.len()
117    }
118
119    /// Returns `true` if the snapshot contains no entries.
120    pub fn is_empty(&self) -> bool {
121        self.entries.is_empty()
122    }
123}
124
125// ============================================================
126// IndexDelta
127// ============================================================
128
129/// The minimal diff between two index snapshots.
130#[derive(Debug, Clone, Default, Serialize, Deserialize)]
131pub struct IndexDelta {
132    /// Entries that are new or have been updated in the newer snapshot
133    pub added: Vec<VectorEntry>,
134    /// IDs of entries that have been removed in the newer snapshot
135    pub removed: Vec<u64>,
136    /// Entries whose vector data or metadata has changed
137    pub modified: Vec<VectorEntry>,
138}
139
140impl IndexDelta {
141    /// Returns `true` if the delta contains no changes.
142    pub fn is_empty(&self) -> bool {
143        self.added.is_empty() && self.removed.is_empty() && self.modified.is_empty()
144    }
145
146    /// Total number of change operations in this delta.
147    pub fn change_count(&self) -> usize {
148        self.added.len() + self.removed.len() + self.modified.len()
149    }
150}
151
152// ============================================================
153// DeltaSync
154// ============================================================
155
/// Zero-sized, stateless entry point for computing, applying and sizing
/// deltas between index snapshots.
#[derive(Clone, Copy, Debug, Default)]
pub struct DeltaSync;
159
160impl DeltaSync {
161    /// Create a new `DeltaSync` instance.
162    pub fn new() -> Self {
163        Self
164    }
165
166    /// Compute the delta required to transform `old_index` into `new_index`.
167    ///
168    /// An entry is considered **added** if it exists only in `new_index`.
169    /// An entry is **removed** if it exists only in `old_index`.
170    /// An entry is **modified** if it exists in both but the version number has
171    /// increased in `new_index`.
172    pub fn compute_delta(
173        &self,
174        old_index: &IndexSnapshot,
175        new_index: &IndexSnapshot,
176    ) -> IndexDelta {
177        let mut added = Vec::new();
178        let mut removed = Vec::new();
179        let mut modified = Vec::new();
180
181        // Entries in new that are absent or newer in old
182        for (id, new_entry) in &new_index.entries {
183            match old_index.entries.get(id) {
184                None => added.push(new_entry.clone()),
185                Some(old_entry) => {
186                    if new_entry.version > old_entry.version {
187                        modified.push(new_entry.clone());
188                    }
189                }
190            }
191        }
192
193        // Entries in old that are absent in new
194        for id in old_index.entries.keys() {
195            if !new_index.entries.contains_key(id) {
196                removed.push(*id);
197            }
198        }
199
200        IndexDelta {
201            added,
202            removed,
203            modified,
204        }
205    }
206
207    /// Apply `delta` to `base`, mutating it in place.
208    ///
209    /// Additions and modifications are upserted; removals are deleted.
210    /// Returns an error if a removal targets an ID that does not exist in `base`
211    /// (indicates a logic error in delta computation).
212    pub fn apply_delta(&self, base: &mut IndexSnapshot, delta: &IndexDelta) -> Result<()> {
213        for entry in &delta.added {
214            base.upsert(entry.clone());
215        }
216        for entry in &delta.modified {
217            base.upsert(entry.clone());
218        }
219        for &id in &delta.removed {
220            if !base.remove(id) {
221                return Err(anyhow!(
222                    "Delta removal of ID {} failed: entry not found in base snapshot",
223                    id
224                ));
225            }
226        }
227        Ok(())
228    }
229
230    /// Estimate the serialised byte size of a delta.
231    ///
232    /// Approximates: each removed ID costs 8 bytes; each added/modified entry
233    /// uses `VectorEntry::approx_bytes()`.
234    pub fn delta_size_bytes(&self, delta: &IndexDelta) -> u64 {
235        let added_bytes: u64 = delta.added.iter().map(|e| e.approx_bytes()).sum();
236        let modified_bytes: u64 = delta.modified.iter().map(|e| e.approx_bytes()).sum();
237        let removed_bytes: u64 = (delta.removed.len() as u64) * 8;
238        added_bytes + modified_bytes + removed_bytes
239    }
240}
241
242// ============================================================
243// ReplicationAlert
244// ============================================================
245
246/// Alert generated when replication lag exceeds a configured threshold.
247#[derive(Debug, Clone, Serialize, Deserialize)]
248pub struct ReplicationAlert {
249    /// Source datacenter
250    pub dc_a: String,
251    /// Destination datacenter
252    pub dc_b: String,
253    /// Measured lag in milliseconds
254    pub measured_lag_ms: u64,
255    /// The threshold that was exceeded
256    pub threshold_ms: u64,
257    /// Human-readable description
258    pub message: String,
259}
260
261// ============================================================
262// ReplicationLag
263// ============================================================
264
/// Measures and evaluates replication lag between pairs of datacenters.
///
/// Lag measurements are stored in-memory via `ReplicationLag::record`; in
/// production these would be populated from heartbeat timestamps.
#[derive(Debug, Clone, Default)]
pub struct ReplicationLag {
    /// Most recent lag per directed DC pair: key = (source dc, destination dc),
    /// exactly as passed to `record`. Keys are not normalised, so `(a, b)` and
    /// `(b, a)` are tracked independently.
    measurements_ms: HashMap<(String, String), u64>,
}
274
275impl ReplicationLag {
276    /// Create a new lag tracker with no measurements.
277    pub fn new() -> Self {
278        Self::default()
279    }
280
281    /// Record a lag measurement between `dc_a` and `dc_b` (directional).
282    pub fn record(&mut self, dc_a: impl Into<String>, dc_b: impl Into<String>, lag_ms: u64) {
283        self.measurements_ms
284            .insert((dc_a.into(), dc_b.into()), lag_ms);
285    }
286
287    /// Retrieve the most recently recorded lag between `dc_a` and `dc_b` in
288    /// milliseconds.  Returns 0 if no measurement has been recorded.
289    pub fn lag_ms(&self, dc_a: &str, dc_b: &str) -> u64 {
290        self.measurements_ms
291            .get(&(dc_a.to_string(), dc_b.to_string()))
292            .copied()
293            .unwrap_or(0)
294    }
295
296    /// Returns `true` if `lag_ms` is within the acceptable `sla_ms` bound.
297    pub fn is_acceptable(&self, lag_ms: u64, sla_ms: u64) -> bool {
298        lag_ms <= sla_ms
299    }
300
301    /// Generate an alert if `lag_ms` exceeds `threshold_ms`, otherwise returns
302    /// `None`.
303    pub fn alert_if_excessive(
304        &self,
305        dc_a: &str,
306        dc_b: &str,
307        lag_ms: u64,
308        threshold_ms: u64,
309    ) -> Option<ReplicationAlert> {
310        if lag_ms > threshold_ms {
311            Some(ReplicationAlert {
312                dc_a: dc_a.to_string(),
313                dc_b: dc_b.to_string(),
314                measured_lag_ms: lag_ms,
315                threshold_ms,
316                message: format!(
317                    "Replication lag from {} to {} is {} ms, exceeding threshold {} ms",
318                    dc_a, dc_b, lag_ms, threshold_ms
319                ),
320            })
321        } else {
322            None
323        }
324    }
325
326    /// Check recorded lag between `dc_a` and `dc_b` against a threshold and
327    /// produce an alert if it is excessive.
328    pub fn check_and_alert(
329        &self,
330        dc_a: &str,
331        dc_b: &str,
332        threshold_ms: u64,
333    ) -> Option<ReplicationAlert> {
334        let lag = self.lag_ms(dc_a, dc_b);
335        self.alert_if_excessive(dc_a, dc_b, lag, threshold_ms)
336    }
337}
338
339// ============================================================
340// Tests
341// ============================================================
342
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;

    /// Build an entry whose vector encodes `id` and `version`, so entries with
    /// different versions also carry different data.
    fn make_entry(id: u64, version: u64) -> VectorEntry {
        VectorEntry::new(id, vec![id as f32, version as f32], version)
    }

    /// Build a snapshot from `(id, version)` pairs; `seq` is the highest version.
    fn make_snapshot(entries: Vec<(u64, u64)>) -> IndexSnapshot {
        let seq = entries.iter().map(|(_, v)| *v).max().unwrap_or(0);
        IndexSnapshot::from_entries(
            entries
                .into_iter()
                .map(|(id, ver)| make_entry(id, ver))
                .collect(),
            seq,
        )
    }

    /// Sorted IDs of `entries`, so assertions do not depend on delta ordering.
    fn sorted_ids(entries: &[VectorEntry]) -> Vec<u64> {
        let mut ids: Vec<u64> = entries.iter().map(|e| e.id).collect();
        ids.sort_unstable();
        ids
    }

    // ---- IndexSnapshot ----

    #[test]
    fn test_snapshot_upsert_and_get() {
        let mut snap = IndexSnapshot::new();
        snap.upsert(make_entry(1, 1));
        assert!(snap.get(1).is_some());
        assert_eq!(snap.len(), 1);
    }

    #[test]
    fn test_snapshot_upsert_replaces_existing() {
        let mut snap = IndexSnapshot::new();
        snap.upsert(make_entry(1, 1));
        snap.upsert(make_entry(1, 2));
        assert_eq!(snap.len(), 1);
        assert_eq!(snap.get(1).map(|e| e.version), Some(2));
    }

    #[test]
    fn test_snapshot_remove_existing() {
        let mut snap = make_snapshot(vec![(1, 1), (2, 1)]);
        assert!(snap.remove(1));
        assert_eq!(snap.len(), 1);
    }

    #[test]
    fn test_snapshot_remove_nonexistent() {
        let mut snap = IndexSnapshot::new();
        assert!(!snap.remove(99));
    }

    #[test]
    fn test_snapshot_is_empty() {
        let snap = IndexSnapshot::new();
        assert!(snap.is_empty());
    }

    // ---- VectorEntry ----

    #[test]
    fn test_vector_entry_approx_bytes_basic() {
        let e = make_entry(1, 1); // 2-element f32 vector
        // 8 (id) + 8 (version) + 0 (empty label) + 8 (2 * 4) + 0 (no meta) = 24
        assert_eq!(e.approx_bytes(), 24);
    }

    #[test]
    fn test_vector_entry_with_metadata_bytes() {
        let mut e = make_entry(1, 1);
        e.metadata.insert("key".into(), "value".into()); // 3 + 5 = 8 bytes
        // 24 (basic entry) + 8 (metadata) = 32
        assert_eq!(e.approx_bytes(), 32);
    }

    // ---- DeltaSync::compute_delta ----

    #[test]
    fn test_compute_delta_empty_to_empty() {
        let ds = DeltaSync::new();
        let old = IndexSnapshot::new();
        let new = IndexSnapshot::new();
        let delta = ds.compute_delta(&old, &new);
        assert!(delta.is_empty());
    }

    #[test]
    fn test_compute_delta_all_added() {
        let ds = DeltaSync::new();
        let old = IndexSnapshot::new();
        let new = make_snapshot(vec![(1, 1), (2, 1), (3, 1)]);
        let delta = ds.compute_delta(&old, &new);
        assert_eq!(sorted_ids(&delta.added), vec![1, 2, 3]);
        assert!(delta.removed.is_empty());
        assert!(delta.modified.is_empty());
    }

    #[test]
    fn test_compute_delta_all_removed() {
        let ds = DeltaSync::new();
        let old = make_snapshot(vec![(1, 1), (2, 1)]);
        let new = IndexSnapshot::new();
        let delta = ds.compute_delta(&old, &new);
        assert!(delta.added.is_empty());
        assert!(delta.modified.is_empty());
        let mut removed = delta.removed;
        removed.sort_unstable();
        assert_eq!(removed, vec![1, 2]);
    }

    #[test]
    fn test_compute_delta_modifications() {
        let ds = DeltaSync::new();
        let old = make_snapshot(vec![(1, 1), (2, 1)]);
        let new = make_snapshot(vec![(1, 2), (2, 1)]); // entry 1 updated
        let delta = ds.compute_delta(&old, &new);
        assert_eq!(delta.modified.len(), 1);
        assert_eq!(delta.modified[0].id, 1);
        assert!(delta.added.is_empty());
        assert!(delta.removed.is_empty());
    }

    #[test]
    fn test_compute_delta_mixed() {
        let ds = DeltaSync::new();
        let old = make_snapshot(vec![(1, 1), (2, 1), (3, 1)]);
        // entry 2 updated, entry 3 removed, entry 4 added
        let new = make_snapshot(vec![(1, 1), (2, 2), (4, 1)]);
        let delta = ds.compute_delta(&old, &new);
        assert_eq!(sorted_ids(&delta.added), vec![4]);
        assert_eq!(delta.removed, vec![3]);
        assert_eq!(sorted_ids(&delta.modified), vec![2]);
    }

    #[test]
    fn test_compute_delta_no_change_no_diff() {
        let ds = DeltaSync::new();
        let snap = make_snapshot(vec![(1, 5), (2, 3)]);
        let delta = ds.compute_delta(&snap, &snap);
        assert!(delta.is_empty());
    }

    #[test]
    fn test_compute_delta_ignores_version_regression() {
        // Only a strictly increasing version counts as a modification.
        let ds = DeltaSync::new();
        let old = make_snapshot(vec![(1, 5)]);
        let new = make_snapshot(vec![(1, 3)]);
        assert!(ds.compute_delta(&old, &new).is_empty());
    }

    // ---- DeltaSync::apply_delta ----

    #[test]
    fn test_apply_delta_add() -> Result<()> {
        let ds = DeltaSync::new();
        let mut base = IndexSnapshot::new();
        let delta = IndexDelta {
            added: vec![make_entry(1, 1)],
            removed: vec![],
            modified: vec![],
        };
        ds.apply_delta(&mut base, &delta)?;
        assert!(base.get(1).is_some());
        Ok(())
    }

    #[test]
    fn test_apply_delta_remove() -> Result<()> {
        let ds = DeltaSync::new();
        let mut base = make_snapshot(vec![(1, 1), (2, 1)]);
        let delta = IndexDelta {
            added: vec![],
            removed: vec![1],
            modified: vec![],
        };
        ds.apply_delta(&mut base, &delta)?;
        assert!(base.get(1).is_none());
        assert_eq!(base.len(), 1);
        Ok(())
    }

    #[test]
    fn test_apply_delta_modify() -> Result<()> {
        let ds = DeltaSync::new();
        let mut base = make_snapshot(vec![(1, 1)]);
        let delta = IndexDelta {
            added: vec![],
            removed: vec![],
            modified: vec![make_entry(1, 2)],
        };
        ds.apply_delta(&mut base, &delta)?;
        assert_eq!(base.get(1).expect("entry 1 should exist").version, 2);
        Ok(())
    }

    #[test]
    fn test_apply_delta_remove_nonexistent_errors() {
        let ds = DeltaSync::new();
        let mut base = IndexSnapshot::new();
        let delta = IndexDelta {
            added: vec![],
            removed: vec![99],
            modified: vec![],
        };
        assert!(ds.apply_delta(&mut base, &delta).is_err());
    }

    #[test]
    fn test_apply_delta_roundtrip() -> Result<()> {
        let ds = DeltaSync::new();
        let old = make_snapshot(vec![(1, 1), (2, 1), (3, 1)]);
        let new = make_snapshot(vec![(1, 2), (2, 1), (4, 1)]);
        let delta = ds.compute_delta(&old, &new);
        let mut applied = old.clone();
        ds.apply_delta(&mut applied, &delta)?;
        // `applied` should now hold exactly the entries of `new`.
        assert_eq!(applied.len(), new.len());
        for (id, entry) in &new.entries {
            assert_eq!(applied.get(*id), Some(entry));
        }
        Ok(())
    }

    // ---- DeltaSync::delta_size_bytes ----

    #[test]
    fn test_delta_size_bytes_empty() {
        let ds = DeltaSync::new();
        let delta = IndexDelta::default();
        assert_eq!(ds.delta_size_bytes(&delta), 0);
    }

    #[test]
    fn test_delta_size_bytes_removed_only() {
        let ds = DeltaSync::new();
        let delta = IndexDelta {
            added: vec![],
            removed: vec![1, 2, 3],
            modified: vec![],
        };
        assert_eq!(ds.delta_size_bytes(&delta), 24); // 3 * 8 bytes
    }

    #[test]
    fn test_delta_size_bytes_added() {
        let ds = DeltaSync::new();
        let entry = make_entry(1, 1); // 2-element vector => 24 bytes
        let expected = entry.approx_bytes();
        let delta = IndexDelta {
            added: vec![entry],
            removed: vec![],
            modified: vec![],
        };
        assert_eq!(ds.delta_size_bytes(&delta), expected);
    }

    // ---- ReplicationLag ----

    #[test]
    fn test_lag_ms_unknown_pair_is_zero() {
        let lag = ReplicationLag::new();
        assert_eq!(lag.lag_ms("dc-a", "dc-b"), 0);
    }

    #[test]
    fn test_lag_ms_after_record() {
        let mut lag = ReplicationLag::new();
        lag.record("dc-a", "dc-b", 500);
        assert_eq!(lag.lag_ms("dc-a", "dc-b"), 500);
    }

    #[test]
    fn test_lag_ms_is_directional() {
        let mut lag = ReplicationLag::new();
        lag.record("dc-a", "dc-b", 500);
        assert_eq!(lag.lag_ms("dc-b", "dc-a"), 0);
    }

    #[test]
    fn test_is_acceptable_within_sla() {
        let lag = ReplicationLag::new();
        assert!(lag.is_acceptable(100, 500));
    }

    #[test]
    fn test_is_acceptable_equals_sla() {
        let lag = ReplicationLag::new();
        assert!(lag.is_acceptable(500, 500));
    }

    #[test]
    fn test_is_acceptable_exceeds_sla() {
        let lag = ReplicationLag::new();
        assert!(!lag.is_acceptable(501, 500));
    }

    #[test]
    fn test_alert_if_excessive_below_threshold() {
        let lag = ReplicationLag::new();
        let alert = lag.alert_if_excessive("dc-a", "dc-b", 100, 500);
        assert!(alert.is_none());
    }

    #[test]
    fn test_alert_if_excessive_above_threshold() {
        let lag = ReplicationLag::new();
        let alert = lag
            .alert_if_excessive("dc-a", "dc-b", 1000, 500)
            .expect("alert expected when lag exceeds threshold");
        assert_eq!(alert.measured_lag_ms, 1000);
        assert_eq!(alert.threshold_ms, 500);
        assert!(!alert.message.is_empty());
    }

    #[test]
    fn test_check_and_alert_uses_recorded_lag() {
        let mut lag = ReplicationLag::new();
        lag.record("dc-a", "dc-b", 999);
        let alert = lag
            .check_and_alert("dc-a", "dc-b", 500)
            .expect("alert expected for recorded lag above threshold");
        assert_eq!(alert.measured_lag_ms, 999);
    }

    #[test]
    fn test_check_and_alert_no_alert_when_below() {
        let mut lag = ReplicationLag::new();
        lag.record("dc-a", "dc-b", 50);
        let alert = lag.check_and_alert("dc-a", "dc-b", 500);
        assert!(alert.is_none());
    }

    #[test]
    fn test_delta_change_count() {
        let delta = IndexDelta {
            added: vec![make_entry(1, 1)],
            removed: vec![2],
            modified: vec![make_entry(3, 2), make_entry(4, 3)],
        };
        assert_eq!(delta.change_count(), 4);
    }
}