1use serde::{Deserialize, Serialize};
20use std::collections::{HashMap, HashSet};
21use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
22use std::sync::{Arc, RwLock};
23
24use common::VectorId;
25
26#[derive(Debug, Clone, Default, Serialize, Deserialize)]
28pub struct OnlineUpdateStats {
29 pub total_inserts: u64,
31 pub total_deletes: u64,
33 pub total_updates: u64,
35 pub pending_cleanup: usize,
37 pub last_maintenance_ms: u64,
39 pub maintenance_count: u64,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct OnlineUpdateConfig {
46 pub max_pending_deletes: usize,
48 pub max_pending_inserts: usize,
50 pub enable_background_maintenance: bool,
52 pub maintenance_interval_ms: u64,
54}
55
56impl Default for OnlineUpdateConfig {
57 fn default() -> Self {
58 Self {
59 max_pending_deletes: 1000,
60 max_pending_inserts: 100,
61 enable_background_maintenance: true,
62 maintenance_interval_ms: 60_000, }
64 }
65}
66
67pub trait OnlineUpdatable: Send + Sync {
69 fn online_insert(&self, id: VectorId, vector: Vec<f32>) -> Result<(), String>;
71
72 fn online_insert_batch(&self, vectors: Vec<(VectorId, Vec<f32>)>) -> Result<usize, String>;
74
75 fn online_delete(&self, id: &VectorId) -> Result<bool, String>;
77
78 fn online_delete_batch(&self, ids: &[VectorId]) -> Result<usize, String>;
80
81 fn online_update(&self, id: VectorId, vector: Vec<f32>) -> Result<(), String> {
83 self.online_delete(&id)?;
84 self.online_insert(id, vector)
85 }
86
87 fn online_update_batch(&self, vectors: Vec<(VectorId, Vec<f32>)>) -> Result<usize, String> {
89 let ids: Vec<_> = vectors.iter().map(|(id, _)| id.clone()).collect();
90 self.online_delete_batch(&ids)?;
91 self.online_insert_batch(vectors)
92 }
93
94 fn contains(&self, id: &VectorId) -> bool;
96
97 fn len(&self) -> usize;
99
100 fn is_empty(&self) -> bool {
102 self.len() == 0
103 }
104
105 fn maintenance(&self) -> Result<MaintenanceResult, String>;
107
108 fn online_stats(&self) -> OnlineUpdateStats;
110}
111
112#[derive(Debug, Clone, Default, Serialize, Deserialize)]
114pub struct MaintenanceResult {
115 pub vectors_cleaned: usize,
117 pub memory_reclaimed_bytes: usize,
119 pub duration_ms: u64,
121 pub restructured: bool,
123}
124
125#[derive(Debug)]
127pub struct UpdateBuffer {
128 config: OnlineUpdateConfig,
129 pending_inserts: RwLock<Vec<(VectorId, Vec<f32>)>>,
131 pending_deletes: RwLock<HashSet<VectorId>>,
133 stats: UpdateBufferStats,
135}
136
/// Atomic counters backing `UpdateBuffer::stats`.
#[derive(Default, Debug)]
struct UpdateBufferStats {
    /// Inserts handed out by `flush_inserts`.
    total_inserts: AtomicU64,
    /// Deletes handed out by `flush_deletes`.
    total_deletes: AtomicU64,
    /// Updates; never incremented by `UpdateBuffer` itself.
    total_updates: AtomicU64,
    /// Inserts currently buffered (incremented on insert, decremented on flush).
    pending_count: AtomicUsize,
}
144
145impl UpdateBuffer {
146 pub fn new(config: OnlineUpdateConfig) -> Self {
148 Self {
149 config,
150 pending_inserts: RwLock::new(Vec::new()),
151 pending_deletes: RwLock::new(HashSet::new()),
152 stats: UpdateBufferStats::default(),
153 }
154 }
155
156 pub fn buffer_insert(&self, id: VectorId, vector: Vec<f32>) {
158 let mut inserts = self
159 .pending_inserts
160 .write()
161 .expect("pending_inserts lock poisoned in buffer_insert");
162 inserts.push((id, vector));
163 self.stats.pending_count.fetch_add(1, Ordering::Relaxed);
164 }
165
166 pub fn buffer_delete(&self, id: VectorId) {
168 let mut deletes = self
169 .pending_deletes
170 .write()
171 .expect("pending_deletes lock poisoned in buffer_delete");
172 deletes.insert(id);
173 }
174
175 pub fn should_flush(&self) -> bool {
177 let inserts = self
178 .pending_inserts
179 .read()
180 .expect("pending_inserts lock poisoned in should_flush");
181 let deletes = self
182 .pending_deletes
183 .read()
184 .expect("pending_deletes lock poisoned in should_flush");
185
186 inserts.len() >= self.config.max_pending_inserts
187 || deletes.len() >= self.config.max_pending_deletes
188 }
189
190 pub fn flush_inserts(&self) -> Vec<(VectorId, Vec<f32>)> {
192 let mut inserts = self
193 .pending_inserts
194 .write()
195 .expect("pending_inserts lock poisoned in flush_inserts");
196 let flushed: Vec<_> = inserts.drain(..).collect();
197 self.stats
198 .total_inserts
199 .fetch_add(flushed.len() as u64, Ordering::Relaxed);
200 self.stats
201 .pending_count
202 .fetch_sub(flushed.len(), Ordering::Relaxed);
203 flushed
204 }
205
206 pub fn flush_deletes(&self) -> HashSet<VectorId> {
208 let mut deletes = self
209 .pending_deletes
210 .write()
211 .expect("pending_deletes lock poisoned in flush_deletes");
212 let flushed: HashSet<_> = deletes.drain().collect();
213 self.stats
214 .total_deletes
215 .fetch_add(flushed.len() as u64, Ordering::Relaxed);
216 flushed
217 }
218
219 pub fn stats(&self) -> OnlineUpdateStats {
221 OnlineUpdateStats {
222 total_inserts: self.stats.total_inserts.load(Ordering::Relaxed),
223 total_deletes: self.stats.total_deletes.load(Ordering::Relaxed),
224 total_updates: self.stats.total_updates.load(Ordering::Relaxed),
225 pending_cleanup: self
226 .pending_deletes
227 .read()
228 .expect("pending_deletes lock poisoned in stats")
229 .len(),
230 last_maintenance_ms: 0,
231 maintenance_count: 0,
232 }
233 }
234}
235
236pub struct OnlineIndex<T> {
238 inner: Arc<RwLock<T>>,
240 _buffer: UpdateBuffer,
242 deleted_ids: RwLock<HashSet<VectorId>>,
244 id_to_vector: RwLock<HashMap<VectorId, Vec<f32>>>,
246 stats: Arc<RwLock<OnlineUpdateStats>>,
248 config: OnlineUpdateConfig,
250}
251
252impl<T> OnlineIndex<T> {
253 pub fn new(inner: T, config: OnlineUpdateConfig) -> Self {
255 Self {
256 inner: Arc::new(RwLock::new(inner)),
257 _buffer: UpdateBuffer::new(config.clone()),
258 deleted_ids: RwLock::new(HashSet::new()),
259 id_to_vector: RwLock::new(HashMap::new()),
260 stats: Arc::new(RwLock::new(OnlineUpdateStats::default())),
261 config,
262 }
263 }
264
265 pub fn read(&self) -> std::sync::RwLockReadGuard<'_, T> {
267 self.inner
268 .read()
269 .expect("inner index lock poisoned in read")
270 }
271
272 pub fn write(&self) -> std::sync::RwLockWriteGuard<'_, T> {
274 self.inner
275 .write()
276 .expect("inner index lock poisoned in write")
277 }
278
279 pub fn is_deleted(&self, id: &VectorId) -> bool {
281 self.deleted_ids
282 .read()
283 .expect("deleted_ids lock poisoned in is_deleted")
284 .contains(id)
285 }
286
287 pub fn mark_deleted(&self, id: VectorId) -> bool {
289 let mut deleted = self
290 .deleted_ids
291 .write()
292 .expect("deleted_ids lock poisoned in mark_deleted");
293 let was_new = deleted.insert(id.clone());
294
295 if was_new {
296 let mut stats = self
297 .stats
298 .write()
299 .expect("stats lock poisoned in mark_deleted");
300 stats.total_deletes += 1;
301 stats.pending_cleanup += 1;
302 }
303
304 was_new
305 }
306
307 pub fn clear_deleted(&self, ids: &[VectorId]) {
309 let mut deleted = self
310 .deleted_ids
311 .write()
312 .expect("deleted_ids lock poisoned in clear_deleted");
313 let mut stats = self
314 .stats
315 .write()
316 .expect("stats lock poisoned in clear_deleted");
317
318 for id in ids {
319 if deleted.remove(id) {
320 stats.pending_cleanup = stats.pending_cleanup.saturating_sub(1);
321 }
322 }
323 }
324
325 pub fn pending_deletes(&self) -> usize {
327 self.deleted_ids
328 .read()
329 .expect("deleted_ids lock poisoned in pending_deletes")
330 .len()
331 }
332
333 pub fn needs_maintenance(&self) -> bool {
335 self.pending_deletes() >= self.config.max_pending_deletes
336 }
337
338 pub fn record_insert(&self) {
340 let mut stats = self
341 .stats
342 .write()
343 .expect("stats lock poisoned in record_insert");
344 stats.total_inserts += 1;
345 }
346
347 pub fn record_update(&self) {
349 let mut stats = self
350 .stats
351 .write()
352 .expect("stats lock poisoned in record_update");
353 stats.total_updates += 1;
354 }
355
356 pub fn stats(&self) -> OnlineUpdateStats {
358 self.stats
359 .read()
360 .expect("stats lock poisoned in stats")
361 .clone()
362 }
363
364 pub fn store_vector(&self, id: VectorId, vector: Vec<f32>) {
366 let mut map = self
367 .id_to_vector
368 .write()
369 .expect("id_to_vector lock poisoned in store_vector");
370 map.insert(id, vector);
371 }
372
373 pub fn remove_vector(&self, id: &VectorId) -> Option<Vec<f32>> {
375 let mut map = self
376 .id_to_vector
377 .write()
378 .expect("id_to_vector lock poisoned in remove_vector");
379 map.remove(id)
380 }
381
382 pub fn has_vector(&self, id: &VectorId) -> bool {
384 let map = self
385 .id_to_vector
386 .read()
387 .expect("id_to_vector lock poisoned in has_vector");
388 map.contains_key(id) && !self.is_deleted(id)
389 }
390
391 pub fn vector_count(&self) -> usize {
393 let map = self
394 .id_to_vector
395 .read()
396 .expect("id_to_vector lock poisoned in vector_count");
397 let deleted = self
398 .deleted_ids
399 .read()
400 .expect("deleted_ids lock poisoned in vector_count");
401 map.len().saturating_sub(deleted.len())
402 }
403}
404
405#[derive(Debug, Clone, Default, Serialize, Deserialize)]
407pub struct DeltaLog {
408 pub inserts: Vec<(VectorId, Vec<f32>)>,
410 pub deletes: Vec<VectorId>,
412 pub base_snapshot_time: u64,
414 pub sequence_number: u64,
416}
417
418impl DeltaLog {
419 pub fn new(base_snapshot_time: u64) -> Self {
421 Self {
422 inserts: Vec::new(),
423 deletes: Vec::new(),
424 base_snapshot_time,
425 sequence_number: 0,
426 }
427 }
428
429 pub fn record_insert(&mut self, id: VectorId, vector: Vec<f32>) {
431 self.inserts.push((id, vector));
432 self.sequence_number += 1;
433 }
434
435 pub fn record_delete(&mut self, id: VectorId) {
437 self.deletes.push(id);
438 self.sequence_number += 1;
439 }
440
441 pub fn should_compact(&self, threshold: usize) -> bool {
443 self.inserts.len() + self.deletes.len() > threshold
444 }
445
446 pub fn clear(&mut self, new_base_time: u64) {
448 self.inserts.clear();
449 self.deletes.clear();
450 self.base_snapshot_time = new_base_time;
451 }
453
454 pub fn len(&self) -> usize {
456 self.inserts.len() + self.deletes.len()
457 }
458
459 pub fn is_empty(&self) -> bool {
461 self.inserts.is_empty() && self.deletes.is_empty()
462 }
463}
464
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `VectorId` from a string literal (ids are `String`s in these tests).
    fn vid(s: &str) -> VectorId {
        s.to_string()
    }

    #[test]
    fn test_update_buffer() {
        let buffer = UpdateBuffer::new(OnlineUpdateConfig::default());

        buffer.buffer_insert(vid("v1"), vec![1.0, 2.0, 3.0]);
        buffer.buffer_insert(vid("v2"), vec![4.0, 5.0, 6.0]);

        // Two inserts are far below the default threshold.
        assert!(!buffer.should_flush());

        buffer.buffer_delete(vid("v1"));

        assert_eq!(buffer.flush_inserts().len(), 2);

        let deletes = buffer.flush_deletes();
        assert_eq!(deletes.len(), 1);
        assert!(deletes.contains("v1"));
    }

    #[test]
    fn test_delta_log() {
        let mut log = DeltaLog::new(1000);

        for (id, vector) in [("v1", vec![1.0, 2.0]), ("v2", vec![3.0, 4.0])] {
            log.record_insert(vid(id), vector);
        }
        log.record_delete(vid("v0"));

        assert_eq!(log.len(), 3);
        assert_eq!(log.inserts.len(), 2);
        assert_eq!(log.deletes.len(), 1);
        assert_eq!(log.sequence_number, 3);

        // Clearing rebases the log but keeps the sequence number monotonic.
        log.clear(2000);
        assert!(log.is_empty());
        assert_eq!(log.base_snapshot_time, 2000);
        assert_eq!(log.sequence_number, 3);
    }

    #[test]
    fn test_online_index_wrapper() {
        let online = OnlineIndex::new(
            Vec::<(String, Vec<f32>)>::new(),
            OnlineUpdateConfig::default(),
        );

        online.store_vector(vid("v1"), vec![1.0, 2.0]);
        online.store_vector(vid("v2"), vec![3.0, 4.0]);

        assert!(online.has_vector(&vid("v1")));
        assert!(online.has_vector(&vid("v2")));
        assert_eq!(online.vector_count(), 2);

        online.mark_deleted(vid("v1"));

        assert!(!online.has_vector(&vid("v1")));
        assert_eq!(online.vector_count(), 1);
        assert_eq!(online.pending_deletes(), 1);
        assert_eq!(online.stats().total_deletes, 1);
    }

    #[test]
    fn test_flush_threshold() {
        let buffer = UpdateBuffer::new(OnlineUpdateConfig {
            max_pending_inserts: 5,
            max_pending_deletes: 3,
            ..Default::default()
        });

        (0..4).for_each(|i| buffer.buffer_insert(format!("v{}", i), vec![i as f32]));
        assert!(!buffer.should_flush());

        // The fifth insert reaches the (inclusive) threshold.
        buffer.buffer_insert(vid("v4"), vec![4.0]);
        assert!(buffer.should_flush());
    }
}