Skip to main content

engine/
online_update.rs

1//! Online Index Updates
2//!
3//! Provides a unified interface for incrementally updating indexes without
4//! requiring a full rebuild. Supports:
5//!
6//! - Incremental vector insertion
7//! - Vector deletion with lazy cleanup
8//! - Vector updates (delete + insert)
9//! - Background maintenance operations
10//! - Concurrent update handling
11//!
12//! # Design Principles
13//!
14//! 1. **Non-blocking reads**: Updates don't block ongoing searches
15//! 2. **Eventual consistency**: Updates visible after brief delay
16//! 3. **Lazy cleanup**: Deleted vectors marked, cleaned up in background
17//! 4. **Batch efficiency**: Batching updates for better performance
18
19use serde::{Deserialize, Serialize};
20use std::collections::{HashMap, HashSet};
21use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22use std::sync::{Arc, RwLock};
23
24use common::VectorId;
25
/// Statistics for online update operations.
///
/// A plain, cloneable snapshot of counters. In this module it is produced by
/// `UpdateBuffer::stats` (assembled from atomics) and `OnlineIndex::stats`
/// (cloned from behind a lock), and is the return type of
/// `OnlineUpdatable::online_stats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct OnlineUpdateStats {
    /// Total vectors inserted since index creation
    pub total_inserts: u64,
    /// Total vectors deleted since index creation
    pub total_deletes: u64,
    /// Total vectors updated since index creation
    pub total_updates: u64,
    /// Vectors pending cleanup (deleted but not yet removed)
    pub pending_cleanup: usize,
    /// Last maintenance operation timestamp (unix millis).
    /// `UpdateBuffer::stats` always reports 0 here.
    pub last_maintenance_ms: u64,
    /// Number of maintenance operations performed.
    /// `UpdateBuffer::stats` always reports 0 here.
    pub maintenance_count: u64,
}
42
/// Configuration for online updates.
///
/// See the `Default` implementation for the stock values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OnlineUpdateConfig {
    /// Maximum pending deletes before triggering cleanup.
    /// Compared with `>=` by `UpdateBuffer::should_flush` and
    /// `OnlineIndex::needs_maintenance`.
    pub max_pending_deletes: usize,
    /// Maximum pending inserts before triggering batch commit.
    /// Compared with `>=` by `UpdateBuffer::should_flush`.
    pub max_pending_inserts: usize,
    /// Whether to enable background maintenance.
    /// NOTE(review): not read by any code visible in this module; presumably
    /// consumed by whoever schedules maintenance — confirm.
    pub enable_background_maintenance: bool,
    /// Interval for background maintenance in milliseconds.
    /// NOTE(review): not read by any code visible in this module — confirm
    /// where it is consumed.
    pub maintenance_interval_ms: u64,
}
55
56impl Default for OnlineUpdateConfig {
57    fn default() -> Self {
58        Self {
59            max_pending_deletes: 1000,
60            max_pending_inserts: 100,
61            enable_background_maintenance: true,
62            maintenance_interval_ms: 60_000, // 1 minute
63        }
64    }
65}
66
67/// Trait for indexes that support online updates
68pub trait OnlineUpdatable: Send + Sync {
69    /// Insert a single vector
70    fn online_insert(&self, id: VectorId, vector: Vec<f32>) -> Result<(), String>;
71
72    /// Insert multiple vectors in a batch
73    fn online_insert_batch(&self, vectors: Vec<(VectorId, Vec<f32>)>) -> Result<usize, String>;
74
75    /// Delete a vector by ID (may be lazy)
76    fn online_delete(&self, id: &VectorId) -> Result<bool, String>;
77
78    /// Delete multiple vectors by ID
79    fn online_delete_batch(&self, ids: &[VectorId]) -> Result<usize, String>;
80
81    /// Update a vector (atomic delete + insert)
82    fn online_update(&self, id: VectorId, vector: Vec<f32>) -> Result<(), String> {
83        self.online_delete(&id)?;
84        self.online_insert(id, vector)
85    }
86
87    /// Update multiple vectors
88    fn online_update_batch(&self, vectors: Vec<(VectorId, Vec<f32>)>) -> Result<usize, String> {
89        let ids: Vec<_> = vectors.iter().map(|(id, _)| id.clone()).collect();
90        self.online_delete_batch(&ids)?;
91        self.online_insert_batch(vectors)
92    }
93
94    /// Check if a vector exists
95    fn contains(&self, id: &VectorId) -> bool;
96
97    /// Get the current vector count
98    fn len(&self) -> usize;
99
100    /// Check if the index is empty
101    fn is_empty(&self) -> bool {
102        self.len() == 0
103    }
104
105    /// Perform maintenance operations (cleanup deleted vectors, optimize structure)
106    fn maintenance(&self) -> Result<MaintenanceResult, String>;
107
108    /// Get online update statistics
109    fn online_stats(&self) -> OnlineUpdateStats;
110}
111
/// Result of a maintenance operation.
///
/// This is the `Ok` payload of `OnlineUpdatable::maintenance`; the derived
/// `Default` yields zeros and `restructured == false`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MaintenanceResult {
    /// Number of vectors cleaned up
    pub vectors_cleaned: usize,
    /// Memory reclaimed in bytes
    pub memory_reclaimed_bytes: usize,
    /// Duration of maintenance in milliseconds
    pub duration_ms: u64,
    /// Whether the index was restructured
    pub restructured: bool,
}
124
/// Buffer for batching online updates.
///
/// Inserts and deletes are queued behind separate locks and handed back to the
/// caller by `flush_inserts` / `flush_deletes`; the buffer itself never applies
/// them to an index.
#[derive(Debug)]
pub struct UpdateBuffer {
    // Supplies the thresholds checked by `should_flush`.
    config: OnlineUpdateConfig,
    /// Pending inserts, kept in arrival order
    pending_inserts: RwLock<Vec<(VectorId, Vec<f32>)>>,
    /// Pending deletes; a set, so repeated deletes of one id collapse
    pending_deletes: RwLock<HashSet<VectorId>>,
    /// Statistics
    stats: UpdateBufferStats,
}
136
// Lock-free counters backing `UpdateBuffer::stats`.
#[derive(Debug, Default)]
struct UpdateBufferStats {
    // Grows by the number of entries returned from each `flush_inserts`.
    total_inserts: AtomicU64,
    // Grows by the number of ids returned from each `flush_deletes`.
    total_deletes: AtomicU64,
    // Reported by `stats`; no `UpdateBuffer` method visible here modifies it.
    total_updates: AtomicU64,
    // Inserts currently queued: +1 per `buffer_insert`, reduced on
    // `flush_inserts`. Deletes do not touch it.
    pending_count: AtomicUsize,
}
144
145impl UpdateBuffer {
146    /// Create a new update buffer
147    pub fn new(config: OnlineUpdateConfig) -> Self {
148        Self {
149            config,
150            pending_inserts: RwLock::new(Vec::new()),
151            pending_deletes: RwLock::new(HashSet::new()),
152            stats: UpdateBufferStats::default(),
153        }
154    }
155
156    /// Add an insert to the buffer
157    pub fn buffer_insert(&self, id: VectorId, vector: Vec<f32>) {
158        let mut inserts = self
159            .pending_inserts
160            .write()
161            .expect("pending_inserts lock poisoned in buffer_insert");
162        inserts.push((id, vector));
163        self.stats.pending_count.fetch_add(1, Ordering::Relaxed);
164    }
165
166    /// Add a delete to the buffer
167    pub fn buffer_delete(&self, id: VectorId) {
168        let mut deletes = self
169            .pending_deletes
170            .write()
171            .expect("pending_deletes lock poisoned in buffer_delete");
172        deletes.insert(id);
173    }
174
175    /// Check if buffer should be flushed
176    pub fn should_flush(&self) -> bool {
177        let inserts = self
178            .pending_inserts
179            .read()
180            .expect("pending_inserts lock poisoned in should_flush");
181        let deletes = self
182            .pending_deletes
183            .read()
184            .expect("pending_deletes lock poisoned in should_flush");
185
186        inserts.len() >= self.config.max_pending_inserts
187            || deletes.len() >= self.config.max_pending_deletes
188    }
189
190    /// Flush pending inserts and return them
191    pub fn flush_inserts(&self) -> Vec<(VectorId, Vec<f32>)> {
192        let mut inserts = self
193            .pending_inserts
194            .write()
195            .expect("pending_inserts lock poisoned in flush_inserts");
196        let flushed: Vec<_> = inserts.drain(..).collect();
197        self.stats
198            .total_inserts
199            .fetch_add(flushed.len() as u64, Ordering::Relaxed);
200        self.stats
201            .pending_count
202            .fetch_sub(flushed.len(), Ordering::Relaxed);
203        flushed
204    }
205
206    /// Flush pending deletes and return them
207    pub fn flush_deletes(&self) -> HashSet<VectorId> {
208        let mut deletes = self
209            .pending_deletes
210            .write()
211            .expect("pending_deletes lock poisoned in flush_deletes");
212        let flushed: HashSet<_> = deletes.drain().collect();
213        self.stats
214            .total_deletes
215            .fetch_add(flushed.len() as u64, Ordering::Relaxed);
216        flushed
217    }
218
219    /// Get buffer statistics
220    pub fn stats(&self) -> OnlineUpdateStats {
221        OnlineUpdateStats {
222            total_inserts: self.stats.total_inserts.load(Ordering::Relaxed),
223            total_deletes: self.stats.total_deletes.load(Ordering::Relaxed),
224            total_updates: self.stats.total_updates.load(Ordering::Relaxed),
225            pending_cleanup: self
226                .pending_deletes
227                .read()
228                .expect("pending_deletes lock poisoned in stats")
229                .len(),
230            last_maintenance_ms: 0,
231            maintenance_count: 0,
232        }
233    }
234}
235
/// Wrapper that adds online update capabilities to any index.
///
/// The wrapper keeps its own bookkeeping (tombstones, a copy of each stored
/// vector, counters) next to the wrapped index; it places no trait bounds on
/// `T` and never calls into it — callers reach the index through
/// `read()` / `write()`.
pub struct OnlineIndex<T> {
    /// The underlying index
    inner: Arc<RwLock<T>>,
    /// Update buffer (reserved for batch operations); built in `new`, not
    /// otherwise used by the methods visible in this module
    _buffer: UpdateBuffer,
    /// Deleted vector IDs (for lazy cleanup)
    deleted_ids: RwLock<HashSet<VectorId>>,
    /// ID to vector mapping for updates
    id_to_vector: RwLock<HashMap<VectorId, Vec<f32>>>,
    /// Statistics
    stats: Arc<RwLock<OnlineUpdateStats>>,
    /// Configuration; `max_pending_deletes` drives `needs_maintenance`
    config: OnlineUpdateConfig,
}
251
252impl<T> OnlineIndex<T> {
253    /// Create a new online-updatable index wrapper
254    pub fn new(inner: T, config: OnlineUpdateConfig) -> Self {
255        Self {
256            inner: Arc::new(RwLock::new(inner)),
257            _buffer: UpdateBuffer::new(config.clone()),
258            deleted_ids: RwLock::new(HashSet::new()),
259            id_to_vector: RwLock::new(HashMap::new()),
260            stats: Arc::new(RwLock::new(OnlineUpdateStats::default())),
261            config,
262        }
263    }
264
265    /// Get read access to the inner index
266    pub fn read(&self) -> std::sync::RwLockReadGuard<'_, T> {
267        self.inner
268            .read()
269            .expect("inner index lock poisoned in read")
270    }
271
272    /// Get write access to the inner index
273    pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, T> {
274        self.inner
275            .write()
276            .expect("inner index lock poisoned in write")
277    }
278
279    /// Check if a vector ID is deleted
280    pub fn is_deleted(&self, id: &VectorId) -> bool {
281        self.deleted_ids
282            .read()
283            .expect("deleted_ids lock poisoned in is_deleted")
284            .contains(id)
285    }
286
287    /// Mark a vector as deleted
288    pub fn mark_deleted(&self, id: VectorId) -> bool {
289        let mut deleted = self
290            .deleted_ids
291            .write()
292            .expect("deleted_ids lock poisoned in mark_deleted");
293        let was_new = deleted.insert(id.clone());
294
295        if was_new {
296            let mut stats = self
297                .stats
298                .write()
299                .expect("stats lock poisoned in mark_deleted");
300            stats.total_deletes += 1;
301            stats.pending_cleanup += 1;
302        }
303
304        was_new
305    }
306
307    /// Clear deleted marks after cleanup
308    pub fn clear_deleted(&self, ids: &[VectorId]) {
309        let mut deleted = self
310            .deleted_ids
311            .write()
312            .expect("deleted_ids lock poisoned in clear_deleted");
313        let mut stats = self
314            .stats
315            .write()
316            .expect("stats lock poisoned in clear_deleted");
317
318        for id in ids {
319            if deleted.remove(id) {
320                stats.pending_cleanup = stats.pending_cleanup.saturating_sub(1);
321            }
322        }
323    }
324
325    /// Get number of pending deletes
326    pub fn pending_deletes(&self) -> usize {
327        self.deleted_ids
328            .read()
329            .expect("deleted_ids lock poisoned in pending_deletes")
330            .len()
331    }
332
333    /// Check if maintenance is needed
334    pub fn needs_maintenance(&self) -> bool {
335        self.pending_deletes() >= self.config.max_pending_deletes
336    }
337
338    /// Record an insert
339    pub fn record_insert(&self) {
340        let mut stats = self
341            .stats
342            .write()
343            .expect("stats lock poisoned in record_insert");
344        stats.total_inserts += 1;
345    }
346
347    /// Record an update
348    pub fn record_update(&self) {
349        let mut stats = self
350            .stats
351            .write()
352            .expect("stats lock poisoned in record_update");
353        stats.total_updates += 1;
354    }
355
356    /// Get statistics
357    pub fn stats(&self) -> OnlineUpdateStats {
358        self.stats
359            .read()
360            .expect("stats lock poisoned in stats")
361            .clone()
362    }
363
364    /// Store vector for potential retrieval
365    pub fn store_vector(&self, id: VectorId, vector: Vec<f32>) {
366        let mut map = self
367            .id_to_vector
368            .write()
369            .expect("id_to_vector lock poisoned in store_vector");
370        map.insert(id, vector);
371    }
372
373    /// Remove stored vector
374    pub fn remove_vector(&self, id: &VectorId) -> Option<Vec<f32>> {
375        let mut map = self
376            .id_to_vector
377            .write()
378            .expect("id_to_vector lock poisoned in remove_vector");
379        map.remove(id)
380    }
381
382    /// Check if vector exists in storage
383    pub fn has_vector(&self, id: &VectorId) -> bool {
384        let map = self
385            .id_to_vector
386            .read()
387            .expect("id_to_vector lock poisoned in has_vector");
388        map.contains_key(id) && !self.is_deleted(id)
389    }
390
391    /// Get vector count (excluding deleted)
392    pub fn vector_count(&self) -> usize {
393        let map = self
394            .id_to_vector
395            .read()
396            .expect("id_to_vector lock poisoned in vector_count");
397        let deleted = self
398            .deleted_ids
399            .read()
400            .expect("deleted_ids lock poisoned in vector_count");
401        map.len().saturating_sub(deleted.len())
402    }
403}
404
/// Delta log for tracking changes between index snapshots.
///
/// Inserts and deletes are kept in two separate lists, so the relative order
/// between an insert and a delete is not recorded.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DeltaLog {
    /// Inserted vectors since last snapshot, in recording order
    pub inserts: Vec<(VectorId, Vec<f32>)>,
    /// Deleted vector IDs since last snapshot, in recording order
    pub deletes: Vec<VectorId>,
    /// Timestamp of the base snapshot
    pub base_snapshot_time: u64,
    /// Current log sequence number; incremented once per recorded operation
    /// and left unchanged by `clear`
    pub sequence_number: u64,
}
417
418impl DeltaLog {
419    /// Create a new delta log
420    pub fn new(base_snapshot_time: u64) -> Self {
421        Self {
422            inserts: Vec::new(),
423            deletes: Vec::new(),
424            base_snapshot_time,
425            sequence_number: 0,
426        }
427    }
428
429    /// Record an insert
430    pub fn record_insert(&mut self, id: VectorId, vector: Vec<f32>) {
431        self.inserts.push((id, vector));
432        self.sequence_number += 1;
433    }
434
435    /// Record a delete
436    pub fn record_delete(&mut self, id: VectorId) {
437        self.deletes.push(id);
438        self.sequence_number += 1;
439    }
440
441    /// Check if log should be compacted
442    pub fn should_compact(&self, threshold: usize) -> bool {
443        self.inserts.len() + self.deletes.len() > threshold
444    }
445
446    /// Clear the log after snapshot
447    pub fn clear(&mut self, new_base_time: u64) {
448        self.inserts.clear();
449        self.deletes.clear();
450        self.base_snapshot_time = new_base_time;
451        // Keep sequence number for ordering
452    }
453
454    /// Get total operations in log
455    pub fn len(&self) -> usize {
456        self.inserts.len() + self.deletes.len()
457    }
458
459    /// Check if log is empty
460    pub fn is_empty(&self) -> bool {
461        self.inserts.is_empty() && self.deletes.is_empty()
462    }
463}
464
#[cfg(test)]
mod tests {
    use super::*;

    /// Inserts and deletes queued in an `UpdateBuffer` come back out on flush.
    #[test]
    fn test_update_buffer() {
        let buffer = UpdateBuffer::new(OnlineUpdateConfig::default());

        // Queue two inserts; the default threshold is far away.
        buffer.buffer_insert(String::from("v1"), vec![1.0, 2.0, 3.0]);
        buffer.buffer_insert(String::from("v2"), vec![4.0, 5.0, 6.0]);
        assert!(!buffer.should_flush());

        // Queue one delete.
        buffer.buffer_delete(String::from("v1"));

        // Everything queued is returned by the flush calls.
        let flushed_inserts = buffer.flush_inserts();
        assert_eq!(flushed_inserts.len(), 2);

        let flushed_deletes = buffer.flush_deletes();
        assert_eq!(flushed_deletes.len(), 1);
        assert!(flushed_deletes.contains("v1"));
    }

    /// `DeltaLog` counts operations and keeps its sequence number across `clear`.
    #[test]
    fn test_delta_log() {
        let mut delta = DeltaLog::new(1000);

        delta.record_insert(String::from("v1"), vec![1.0, 2.0]);
        delta.record_insert(String::from("v2"), vec![3.0, 4.0]);
        delta.record_delete(String::from("v0"));

        assert_eq!(delta.len(), 3);
        assert_eq!(delta.inserts.len(), 2);
        assert_eq!(delta.deletes.len(), 1);
        assert_eq!(delta.sequence_number, 3);

        // A snapshot empties the log but not the sequence counter.
        delta.clear(2000);
        assert!(delta.is_empty());
        assert_eq!(delta.base_snapshot_time, 2000);
        assert_eq!(delta.sequence_number, 3);
    }

    /// Tombstoning a stored vector hides it and shows up in the statistics.
    #[test]
    fn test_online_index_wrapper() {
        // A plain Vec stands in for a real index.
        let placeholder: Vec<(String, Vec<f32>)> = Vec::new();
        let online = OnlineIndex::new(placeholder, OnlineUpdateConfig::default());
        let first = String::from("v1");
        let second = String::from("v2");

        online.store_vector(first.clone(), vec![1.0, 2.0]);
        online.store_vector(second.clone(), vec![3.0, 4.0]);

        assert!(online.has_vector(&first));
        assert!(online.has_vector(&second));
        assert_eq!(online.vector_count(), 2);

        // Tombstone the first vector.
        online.mark_deleted(first.clone());
        assert!(!online.has_vector(&first));
        assert_eq!(online.vector_count(), 1);
        assert_eq!(online.pending_deletes(), 1);

        // The delete is reflected in the counters.
        let snapshot = online.stats();
        assert_eq!(snapshot.total_deletes, 1);
    }

    /// `should_flush` flips exactly when the insert threshold is reached.
    #[test]
    fn test_flush_threshold() {
        let buffer = UpdateBuffer::new(OnlineUpdateConfig {
            max_pending_inserts: 5,
            max_pending_deletes: 3,
            ..Default::default()
        });

        // Four inserts: one short of the threshold.
        for n in 0..4 {
            buffer.buffer_insert(format!("v{}", n), vec![n as f32]);
        }
        assert!(!buffer.should_flush());

        // The fifth insert reaches it.
        buffer.buffer_insert(String::from("v4"), vec![4.0]);
        assert!(buffer.should_flush());
    }
}