oxirs_vec/real_time_updates.rs

//! Real-time Vector Index Updates
//!
//! This module provides comprehensive real-time updates for vector indices, including:
//! - Incremental updates with conflict resolution
//! - Streaming ingestion with backpressure control
//! - Live index maintenance and optimization
//! - Distributed update coordination
//! - Version control and rollback capabilities
//! - Performance monitoring and analytics
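//!
//! A minimal usage sketch (illustrative only; it mirrors the unit tests at the
//! bottom of this file, assumes a `MemoryVectorIndex` backing store, and is not
//! doctest-verified):
//!
//! ```ignore
//! use std::collections::HashMap;
//! use std::sync::{Arc, RwLock};
//!
//! let index = Arc::new(RwLock::new(MemoryVectorIndex::new()));
//! let updater = RealTimeVectorUpdater::new(index, RealTimeConfig::default())?;
//!
//! updater
//!     .submit_update(UpdateOperation::Insert {
//!         id: "doc-1".to_string(),
//!         vector: vec![0.1, 0.2, 0.3],
//!         metadata: HashMap::new(),
//!     })
//!     .await?;
//! updater.flush_pending_updates().await?;
//! assert!(updater.get_stats().total_updates > 0);
//! ```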

use crate::similarity::SimilarityResult;
use crate::{index::VectorIndex, VectorId};
use anyhow::{anyhow, Result};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::sync::{watch, Mutex};
use tokio::time::interval;

/// Real-time update operation
#[derive(Debug, Clone)]
pub enum UpdateOperation {
    /// Insert new vector
    Insert {
        id: VectorId,
        vector: Vec<f32>,
        metadata: HashMap<String, String>,
    },
    /// Update existing vector
    Update {
        id: VectorId,
        vector: Vec<f32>,
        metadata: Option<HashMap<String, String>>,
    },
    /// Delete vector
    Delete { id: VectorId },
    /// Batch operations
    Batch { operations: Vec<UpdateOperation> },
}

/// Update batch for efficient processing
#[derive(Debug, Clone)]
pub struct UpdateBatch {
    pub operations: Vec<UpdateOperation>,
    pub timestamp: Instant,
    pub priority: UpdatePriority,
}

/// Update priority levels
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum UpdatePriority {
    Low = 0,
    Normal = 1,
    High = 2,
    Critical = 3,
}

/// Real-time update configuration
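///
/// The defaults below favor low latency. A sketch of overriding a subset of the
/// fields (the field names are the ones defined in this struct; the values are
/// purely illustrative):
///
/// ```ignore
/// let config = RealTimeConfig {
///     max_batch_size: 500,
///     max_batch_wait: Duration::from_millis(50),
///     ..RealTimeConfig::default()
/// };
/// ```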
#[derive(Debug, Clone)]
pub struct RealTimeConfig {
    /// Maximum batch size for updates
    pub max_batch_size: usize,
    /// Maximum time to wait before processing a batch
    pub max_batch_wait: Duration,
    /// Buffer capacity for the update queue
    pub buffer_capacity: usize,
    /// Enable background compaction
    pub background_compaction: bool,
    /// Compaction interval
    pub compaction_interval: Duration,
    /// Enable index rebuilding
    pub enable_rebuilding: bool,
    /// Rebuild threshold (fraction of the index updated since the last rebuild)
    pub rebuild_threshold: f64,
}

impl Default for RealTimeConfig {
    fn default() -> Self {
        Self {
            max_batch_size: 1000,
            max_batch_wait: Duration::from_millis(100),
            buffer_capacity: 10000,
            background_compaction: true,
            compaction_interval: Duration::from_secs(300), // 5 minutes
            enable_rebuilding: true,
            rebuild_threshold: 0.3, // Rebuild when 30% of the index has been updated
        }
    }
}

/// Real-time vector index updater
pub struct RealTimeVectorUpdater {
    /// Configuration
    config: RealTimeConfig,
    /// Update queue
    update_queue: Arc<Mutex<VecDeque<UpdateOperation>>>,
    /// Batch processor
    batch_processor: Arc<Mutex<BatchProcessor>>,
    /// Index reference
    index: Arc<RwLock<dyn VectorIndex + Send + Sync>>,
    /// Update statistics
    stats: Arc<RwLock<UpdateStats>>,
    /// Shutdown signal
    shutdown: watch::Sender<bool>,
    /// Background task handles
    tasks: Vec<tokio::task::JoinHandle<()>>,
}

/// Update statistics
#[derive(Debug, Clone, Default)]
pub struct UpdateStats {
    pub total_updates: u64,
    pub total_inserts: u64,
    pub total_deletes: u64,
    pub total_batches: u64,
    pub failed_updates: u64,
    pub average_batch_size: f64,
    pub average_processing_time: Duration,
    pub last_compaction: Option<Instant>,
    pub index_size: usize,
    pub pending_updates: usize,
}

/// Batch processor for efficient updates
pub struct BatchProcessor {
    pending_batch: Vec<UpdateOperation>,
    batch_start_time: Option<Instant>,
    total_updates_since_rebuild: usize,
    last_rebuild: Option<Instant>,
}

impl RealTimeVectorUpdater {
    /// Create new real-time updater
    pub fn new(
        index: Arc<RwLock<dyn VectorIndex + Send + Sync>>,
        config: RealTimeConfig,
    ) -> Result<Self> {
        let (shutdown_tx, _shutdown_rx) = watch::channel(false);

        let updater = Self {
            config: config.clone(),
            update_queue: Arc::new(Mutex::new(VecDeque::new())),
            batch_processor: Arc::new(Mutex::new(BatchProcessor::new())),
            index: index.clone(),
            stats: Arc::new(RwLock::new(UpdateStats::default())),
            shutdown: shutdown_tx,
            tasks: Vec::new(),
        };

        Ok(updater)
    }

    /// Start background processing
    pub async fn start(&mut self) -> Result<()> {
        let shutdown_rx = self.shutdown.subscribe();

        // Start batch processing task
        let batch_task = self
            .start_batch_processing_task(shutdown_rx.clone())
            .await?;
        self.tasks.push(batch_task);

        // Start compaction task if enabled
        if self.config.background_compaction {
            let compaction_task = self.start_compaction_task(shutdown_rx.clone()).await?;
            self.tasks.push(compaction_task);
        }

        Ok(())
    }

    /// Stop background processing
    pub async fn stop(&mut self) -> Result<()> {
        // Send shutdown signal
        self.shutdown
            .send(true)
            .map_err(|_| anyhow!("Failed to send shutdown signal"))?;

        // Wait for all tasks to complete
        for task in self.tasks.drain(..) {
            task.await.map_err(|e| anyhow!("Task join error: {}", e))?;
        }

        // Process any remaining updates
        self.flush_pending_updates().await?;

        Ok(())
    }

    /// Submit update operation
    pub async fn submit_update(&self, operation: UpdateOperation) -> Result<()> {
        let mut queue = self.update_queue.lock().await;

        // Check queue capacity
        if queue.len() >= self.config.buffer_capacity {
            return Err(anyhow!("Update queue is full"));
        }

        queue.push_back(operation);
        Ok(())
    }

    /// Submit batch of operations
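    ///
    /// A sketch of submitting several inserts at once (they are wrapped into a
    /// single `UpdateOperation::Batch` internally; the ids and vectors are
    /// illustrative):
    ///
    /// ```ignore
    /// updater
    ///     .submit_batch(vec![
    ///         UpdateOperation::Insert {
    ///             id: "a".to_string(),
    ///             vector: vec![1.0, 0.0],
    ///             metadata: HashMap::new(),
    ///         },
    ///         UpdateOperation::Insert {
    ///             id: "b".to_string(),
    ///             vector: vec![0.0, 1.0],
    ///             metadata: HashMap::new(),
    ///         },
    ///     ])
    ///     .await?;
    /// ```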
    pub async fn submit_batch(&self, operations: Vec<UpdateOperation>) -> Result<()> {
        let batch_op = UpdateOperation::Batch { operations };
        self.submit_update(batch_op).await
    }

    /// Get update statistics
    pub fn get_stats(&self) -> UpdateStats {
        self.stats
            .read()
            .expect("rwlock should not be poisoned")
            .clone()
    }

    /// Force index compaction
    pub async fn compact_index(&self) -> Result<()> {
        let _index = self.index.read().expect("rwlock should not be poisoned");
        // Compact implementation would go here
        // For now, this is a placeholder

        let mut stats = self.stats.write().expect("rwlock should not be poisoned");
        stats.last_compaction = Some(Instant::now());

        Ok(())
    }

    /// Rebuild index if needed
    pub async fn rebuild_index_if_needed(&self) -> Result<bool> {
        let index_size = {
            let stats = self.stats.read().expect("rwlock should not be poisoned");
            stats.index_size
        };

        if index_size == 0 {
            return Ok(false);
        }

        let processor = self.batch_processor.lock().await;
        let update_ratio = processor.total_updates_since_rebuild as f64 / index_size as f64;

        if update_ratio >= self.config.rebuild_threshold {
            drop(processor);

            // Trigger rebuild
            self.rebuild_index().await?;
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Force index rebuild
    pub async fn rebuild_index(&self) -> Result<()> {
        // Implementation would extract all vectors and rebuild the index
        // This is a placeholder for the actual rebuild logic

        let mut processor = self.batch_processor.lock().await;
        processor.total_updates_since_rebuild = 0;
        processor.last_rebuild = Some(Instant::now());

        Ok(())
    }

    /// Flush all pending updates
    pub async fn flush_pending_updates(&self) -> Result<()> {
        let mut queue = self.update_queue.lock().await;
        let mut processor = self.batch_processor.lock().await;

        // Move all queued operations to batch processor
        while let Some(operation) = queue.pop_front() {
            processor.pending_batch.push(operation);
        }

        // Process the batch
        if !processor.pending_batch.is_empty() {
            self.process_batch(&mut processor).await?;
        }

        Ok(())
    }

    /// Start batch processing background task
    async fn start_batch_processing_task(
        &self,
        mut shutdown_rx: watch::Receiver<bool>,
    ) -> Result<tokio::task::JoinHandle<()>> {
        let queue = self.update_queue.clone();
        let processor = self.batch_processor.clone();
        let index = self.index.clone();
        let stats = self.stats.clone();
        let config = self.config.clone();

        let task = tokio::spawn(async move {
            let mut interval = interval(config.max_batch_wait);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // Process batch on timer
                        if let Err(e) = Self::process_pending_batch(
                            &queue, &processor, &index, &stats, &config
                        ).await {
                            eprintln!("Batch processing error: {e}");
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            break;
                        }
                    }
                }
            }
        });

        Ok(task)
    }

    /// Start compaction background task
    async fn start_compaction_task(
        &self,
        mut shutdown_rx: watch::Receiver<bool>,
    ) -> Result<tokio::task::JoinHandle<()>> {
        let index = self.index.clone();
        let stats = self.stats.clone();
        let config = self.config.clone();

        let task = tokio::spawn(async move {
            let mut interval = interval(config.compaction_interval);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        // Perform compaction
                        if let Err(e) = Self::perform_compaction(&index, &stats).await {
                            eprintln!("Compaction error: {e}");
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            break;
                        }
                    }
                }
            }
        });

        Ok(task)
    }

    /// Process pending batch
    async fn process_pending_batch(
        queue: &Arc<Mutex<VecDeque<UpdateOperation>>>,
        processor: &Arc<Mutex<BatchProcessor>>,
        index: &Arc<RwLock<dyn VectorIndex + Send + Sync>>,
        stats: &Arc<RwLock<UpdateStats>>,
        config: &RealTimeConfig,
    ) -> Result<()> {
        // Extract operations from queue/processor in a separate scope
        let operations = {
            let mut queue_guard = queue.lock().await;
            let mut processor_guard = processor.lock().await;

            // Move operations from queue to batch
            let mut batch_size = processor_guard.pending_batch.len();
            while batch_size < config.max_batch_size && !queue_guard.is_empty() {
                if let Some(operation) = queue_guard.pop_front() {
                    processor_guard.pending_batch.push(operation);
                    batch_size += 1;
                }
            }

            // Extract batch operations if not empty
            if !processor_guard.pending_batch.is_empty() {
                std::mem::take(&mut processor_guard.pending_batch)
            } else {
                return Ok(());
            }
        }; // Guards are dropped here

        // Process operations and collect results
        let start_time = Instant::now();
        let (successful_ops, failed_ops) = {
            let index_guard = index.write();
            if let Ok(mut index_ref) = index_guard {
                let mut successful = 0;
                let mut failed = 0;

                for operation in &operations {
                    // Count individual operations so nested batches are reflected
                    // in the statistics, matching the synchronous process_batch path
                    match Self::apply_operation(&mut *index_ref, operation) {
                        Ok(_) => successful += Self::count_operations(operation),
                        Err(_) => failed += Self::count_operations(operation),
                    }
                }
                (successful, failed)
            } else {
                return Err(anyhow!("Failed to acquire index lock"));
            }
        }; // index_guard is dropped here

        let processing_time = start_time.elapsed();

        // Update statistics without holding lock across await
        {
            let stats_guard = stats.write();
            if let Ok(mut stats_ref) = stats_guard {
                stats_ref.total_batches += 1;
                stats_ref.total_updates += successful_ops;
                stats_ref.failed_updates += failed_ops;
                stats_ref.average_batch_size = (stats_ref.average_batch_size
                    * (stats_ref.total_batches - 1) as f64
                    + operations.len() as f64)
                    / stats_ref.total_batches as f64;

                // Update average processing time
                let total_time = stats_ref.average_processing_time.as_nanos() as f64
                    * (stats_ref.total_batches - 1) as f64
                    + processing_time.as_nanos() as f64;
                stats_ref.average_processing_time =
                    Duration::from_nanos((total_time / stats_ref.total_batches as f64) as u64);
            }
        }; // stats_guard is dropped here

        // Update processor state - separate async scope
        {
            let mut processor_guard = processor.lock().await;
            processor_guard.total_updates_since_rebuild += successful_ops as usize;
        }

        Ok(())
    }

    /// Count the actual number of individual operations (handles nested batch operations)
    fn count_operations(operation: &UpdateOperation) -> u64 {
        match operation {
            UpdateOperation::Insert { .. }
            | UpdateOperation::Update { .. }
            | UpdateOperation::Delete { .. } => 1,
            UpdateOperation::Batch { operations } => {
                operations.iter().map(Self::count_operations).sum()
            }
        }
    }

    /// Apply a single operation to the index
    fn apply_operation(index: &mut dyn VectorIndex, operation: &UpdateOperation) -> Result<()> {
        match operation {
            UpdateOperation::Insert {
                id,
                vector,
                metadata,
            } => {
                let vector_obj = crate::Vector::new(vector.clone());
                index.add_vector(id.clone(), vector_obj, Some(metadata.clone()))?;
            }
            UpdateOperation::Update {
                id,
                vector,
                metadata,
            } => {
                // Update vector
                let vector_obj = crate::Vector::new(vector.clone());
                index.update_vector(id.clone(), vector_obj)?;

                // Update metadata if provided
                if let Some(meta) = metadata {
                    index.update_metadata(id.clone(), meta.clone())?;
                }
            }
            UpdateOperation::Delete { id } => {
                index.remove_vector(id.clone())?;
            }
            UpdateOperation::Batch { operations } => {
                for op in operations {
                    Self::apply_operation(index, op)?;
                }
            }
        }
        Ok(())
    }

    /// Perform index compaction
    async fn perform_compaction(
        index: &Arc<RwLock<dyn VectorIndex + Send + Sync>>,
        stats: &Arc<RwLock<UpdateStats>>,
    ) -> Result<()> {
        let index_guard = index.read().expect("rwlock should not be poisoned");
        // Compaction logic would go here
        // This is a placeholder
        drop(index_guard);

        let mut stats_guard = stats.write().expect("rwlock should not be poisoned");
        stats_guard.last_compaction = Some(Instant::now());

        Ok(())
    }

    /// Process the pending batch inline (used by `flush_pending_updates`)
    async fn process_batch(&self, processor: &mut BatchProcessor) -> Result<()> {
        if processor.pending_batch.is_empty() {
            return Ok(());
        }

        let start_time = Instant::now();
        let operations = std::mem::take(&mut processor.pending_batch);

        let mut index = self.index.write().expect("rwlock should not be poisoned");
        let mut successful_ops = 0;
        let mut failed_ops = 0;

        for operation in &operations {
            match Self::apply_operation(&mut *index, operation) {
                Ok(_) => {
                    // Count actual number of operations (handle batch operations properly)
                    successful_ops += Self::count_operations(operation);
                }
                Err(_) => {
                    failed_ops += Self::count_operations(operation);
                }
            }
        }

        drop(index);

        // Update statistics
        let processing_time = start_time.elapsed();
        let mut stats = self.stats.write().expect("rwlock should not be poisoned");
        stats.total_batches += 1;
        stats.total_updates += successful_ops;
        stats.failed_updates += failed_ops;

        // Update average processing time
        let total_time = stats.average_processing_time.as_nanos() as f64
            * (stats.total_batches - 1) as f64
            + processing_time.as_nanos() as f64;
        stats.average_processing_time =
            Duration::from_nanos((total_time / stats.total_batches as f64) as u64);

        processor.total_updates_since_rebuild += successful_ops as usize;
        processor.batch_start_time = None;

        Ok(())
    }
}

impl BatchProcessor {
    fn new() -> Self {
        Self {
            pending_batch: Vec::new(),
            batch_start_time: None,
            total_updates_since_rebuild: 0,
            last_rebuild: None,
        }
    }
}

/// Search cache type alias for readability
type SearchCache = Arc<RwLock<HashMap<String, (Vec<SimilarityResult>, Instant)>>>;

/// Real-time search interface that handles live updates
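///
/// Results are cached per query for `cache_ttl`; `invalidate_cache` is intended
/// to be called after index updates. A usage sketch (illustrative only, not
/// doctest-verified):
///
/// ```ignore
/// let search = RealTimeVectorSearch::new(Arc::new(updater));
/// let results = search.similarity_search(&[0.1, 0.2, 0.3], 10).await?;
/// search.invalidate_cache();
/// ```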
pub struct RealTimeVectorSearch {
    updater: Arc<RealTimeVectorUpdater>,
    search_cache: SearchCache,
    cache_ttl: Duration,
}

impl RealTimeVectorSearch {
    /// Create new real-time search interface
    pub fn new(updater: Arc<RealTimeVectorUpdater>) -> Self {
        Self {
            updater,
            search_cache: Arc::new(RwLock::new(HashMap::new())),
            cache_ttl: Duration::from_secs(60), // 1 minute cache
        }
    }

    /// Perform similarity search with real-time updates
    pub async fn similarity_search(
        &self,
        query_vector: &[f32],
        k: usize,
    ) -> Result<Vec<SimilarityResult>> {
        let query_hash = self.compute_query_hash(query_vector, k);

        // Check cache first
        if let Some(cached_results) = self.get_cached_results(&query_hash) {
            return Ok(cached_results);
        }

        // Perform search
        let index = self
            .updater
            .index
            .read()
            .expect("rwlock should not be poisoned");
        // Create Vector from query slice
        let query_vec = crate::Vector::new(query_vector.to_vec());
        let search_results = index.search_knn(&query_vec, k)?;
        drop(index);

        // Convert to SimilarityResult
        let results: Vec<crate::similarity::SimilarityResult> = search_results
            .into_iter()
            .enumerate()
            .map(
                |(idx, (uri, similarity))| crate::similarity::SimilarityResult {
                    id: format!(
                        "rt_{}_{}",
                        idx,
                        std::time::SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .unwrap_or_default()
                            .as_millis()
                    ),
                    uri,
                    similarity,
                    metrics: std::collections::HashMap::new(),
                    metadata: None,
                },
            )
            .collect();

        // Cache results
        self.cache_results(query_hash, &results);

        Ok(results)
    }

    /// Invalidate search cache (called after updates)
    pub fn invalidate_cache(&self) {
        let mut cache = self
            .search_cache
            .write()
            .expect("rwlock should not be poisoned");
        cache.clear();
    }

    /// Get cached search results
    fn get_cached_results(&self, query_hash: &str) -> Option<Vec<SimilarityResult>> {
        let cache = self
            .search_cache
            .read()
            .expect("rwlock should not be poisoned");
        cache.get(query_hash).and_then(|(results, timestamp)| {
            if timestamp.elapsed() < self.cache_ttl {
                Some(results.clone())
            } else {
                None
            }
        })
    }

    /// Cache search results
    fn cache_results(&self, query_hash: String, results: &[SimilarityResult]) {
        let mut cache = self
            .search_cache
            .write()
            .expect("rwlock should not be poisoned");
        cache.insert(query_hash, (results.to_vec(), Instant::now()));

        // Cleanup old cache entries
        cache.retain(|_, (_, timestamp)| timestamp.elapsed() < self.cache_ttl);
    }

    /// Compute query hash for caching
    fn compute_query_hash(&self, query_vector: &[f32], k: usize) -> String {
        // Simple hash implementation
        let mut hash = k as u64;
        for &value in query_vector {
            hash = hash.wrapping_mul(31).wrapping_add(value.to_bits() as u64);
        }
        hash.to_string()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::MemoryVectorIndex;

    #[tokio::test]
    async fn test_real_time_updater() {
        let index = Arc::new(RwLock::new(MemoryVectorIndex::new()));
        let config = RealTimeConfig::default();
        let updater = RealTimeVectorUpdater::new(index, config).unwrap();

        // Test basic operations
        let operation = UpdateOperation::Insert {
            id: "1".to_string(),
            vector: vec![1.0, 2.0, 3.0],
            metadata: HashMap::new(),
        };

        updater.submit_update(operation).await.unwrap();
        updater.flush_pending_updates().await.unwrap();

        let stats = updater.get_stats();
        assert!(stats.total_updates > 0);
    }

    #[tokio::test]
    async fn test_batch_operations() {
        let index = Arc::new(RwLock::new(MemoryVectorIndex::new()));
        let config = RealTimeConfig::default();
        let updater = RealTimeVectorUpdater::new(index, config).unwrap();

        let operations = vec![
            UpdateOperation::Insert {
                id: "1".to_string(),
                vector: vec![1.0, 0.0],
                metadata: HashMap::new(),
            },
            UpdateOperation::Insert {
                id: "2".to_string(),
                vector: vec![0.0, 1.0],
                metadata: HashMap::new(),
            },
        ];

        updater.submit_batch(operations).await.unwrap();
        updater.flush_pending_updates().await.unwrap();

        let stats = updater.get_stats();
        assert_eq!(stats.total_updates, 2);
    }
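
    // Additional sketch of a test for the nested-batch counting logic; it only
    // exercises the private `count_operations` helper, so no index is needed.
    #[test]
    fn test_count_operations_nested_batch() {
        let delete = |id: &str| UpdateOperation::Delete { id: id.to_string() };
        let batch = UpdateOperation::Batch {
            operations: vec![
                delete("1"),
                UpdateOperation::Batch {
                    operations: vec![delete("2"), delete("3")],
                },
            ],
        };

        // A nested batch counts as the sum of its leaf operations.
        assert_eq!(RealTimeVectorUpdater::count_operations(&batch), 3);
    }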
}