1use crate::similarity::SimilarityResult;
12use crate::{index::VectorIndex, VectorId};
13use anyhow::{anyhow, Result};
14use std::collections::{HashMap, VecDeque};
15use std::sync::{Arc, RwLock};
16use std::time::{Duration, Instant};
17use tokio::sync::{watch, Mutex};
18use tokio::time::interval;
19
/// A single mutation to apply to the vector index.
///
/// `Batch` nests a list of operations so that a group can travel through
/// the update queue as one unit and be applied together.
#[derive(Debug, Clone)]
pub enum UpdateOperation {
    /// Insert a new vector with its metadata.
    Insert {
        id: VectorId,
        vector: Vec<f32>,
        metadata: HashMap<String, String>,
    },
    /// Replace an existing vector; metadata is replaced only when `Some`.
    Update {
        id: VectorId,
        vector: Vec<f32>,
        metadata: Option<HashMap<String, String>>,
    },
    /// Remove a vector by id.
    Delete { id: VectorId },
    /// A group of operations applied sequentially as one queue entry.
    Batch { operations: Vec<UpdateOperation> },
}
40
/// A timestamped, prioritized group of operations.
///
/// NOTE(review): not constructed anywhere in this file — confirm whether
/// callers elsewhere use it or it is dead code.
#[derive(Debug, Clone)]
pub struct UpdateBatch {
    /// Operations contained in this batch.
    pub operations: Vec<UpdateOperation>,
    /// When the batch was created.
    pub timestamp: Instant,
    /// Scheduling priority of the batch.
    pub priority: UpdatePriority,
}
48
/// Priority levels for update batches; ordered so `Low < Normal < High <
/// Critical` (derived `Ord` follows the explicit discriminants).
///
/// NOTE(review): only referenced by `UpdateBatch` in this file — the
/// processing paths visible here do not consult it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum UpdatePriority {
    Low = 0,
    Normal = 1,
    High = 2,
    Critical = 3,
}
57
/// Tuning parameters for [`RealTimeVectorUpdater`].
#[derive(Debug, Clone)]
pub struct RealTimeConfig {
    /// Maximum number of queued operations moved into one batch.
    pub max_batch_size: usize,
    /// Interval between background batch-processing ticks.
    pub max_batch_wait: Duration,
    /// Maximum queued operations before `submit_update` rejects new ones.
    pub buffer_capacity: usize,
    /// Whether `start` also spawns the periodic compaction task.
    pub background_compaction: bool,
    /// Period of the background compaction task.
    pub compaction_interval: Duration,
    /// Whether automatic index rebuilds are enabled.
    /// NOTE(review): not consulted anywhere visible in this file — confirm
    /// that `rebuild_index_if_needed` is meant to honor it.
    pub enable_rebuilding: bool,
    /// Updates-since-rebuild / index-size ratio that triggers a rebuild.
    pub rebuild_threshold: f64,
}
76
77impl Default for RealTimeConfig {
78 fn default() -> Self {
79 Self {
80 max_batch_size: 1000,
81 max_batch_wait: Duration::from_millis(100),
82 buffer_capacity: 10000,
83 background_compaction: true,
84 compaction_interval: Duration::from_secs(300), enable_rebuilding: true,
86 rebuild_threshold: 0.3, }
88 }
89}
90
/// Applies vector-index mutations asynchronously: operations are queued,
/// batched, and applied by a background task, with optional periodic
/// compaction running alongside.
pub struct RealTimeVectorUpdater {
    /// Batching/compaction tuning knobs.
    config: RealTimeConfig,
    /// Incoming operations awaiting batching (bounded by `buffer_capacity`).
    update_queue: Arc<Mutex<VecDeque<UpdateOperation>>>,
    /// Holds the in-progress batch and rebuild bookkeeping.
    batch_processor: Arc<Mutex<BatchProcessor>>,
    /// The index every operation is applied to.
    index: Arc<RwLock<dyn VectorIndex + Send + Sync>>,
    /// Running counters exposed via `get_stats`.
    stats: Arc<RwLock<UpdateStats>>,
    /// Broadcasts `true` to stop the background tasks.
    shutdown: watch::Sender<bool>,
    /// Join handles for tasks spawned by `start`, joined in `stop`.
    tasks: Vec<tokio::task::JoinHandle<()>>,
}
108
/// Running counters describing updater activity; cloned out by `get_stats`.
#[derive(Debug, Clone, Default)]
pub struct UpdateStats {
    /// Primitive operations applied successfully.
    pub total_updates: u64,
    /// NOTE(review): `total_inserts` and `total_deletes` are never
    /// incremented in this file — confirm they are maintained elsewhere.
    pub total_inserts: u64,
    pub total_deletes: u64,
    /// Batches processed (both processing paths increment this).
    pub total_batches: u64,
    /// Operations whose application returned an error.
    pub failed_updates: u64,
    /// Running average of operations per batch.
    pub average_batch_size: f64,
    /// Running average wall-clock time spent applying a batch.
    pub average_processing_time: Duration,
    /// When the last compaction pass recorded itself, if any.
    pub last_compaction: Option<Instant>,
    /// NOTE(review): `index_size` and `pending_updates` are read (for the
    /// rebuild heuristic) but never written here — confirm who updates them.
    pub index_size: usize,
    pub pending_updates: usize,
}
123
/// Mutable batching state shared between the foreground flush path and the
/// background batch-processing task (always accessed under a tokio Mutex).
pub struct BatchProcessor {
    /// Operations accumulated for the next application pass.
    pending_batch: Vec<UpdateOperation>,
    /// NOTE(review): only ever reset to `None`, never set — confirm intent.
    batch_start_time: Option<Instant>,
    /// Successful operations since the last rebuild; drives the
    /// `rebuild_threshold` heuristic.
    total_updates_since_rebuild: usize,
    /// When the last rebuild bookkeeping reset happened.
    last_rebuild: Option<Instant>,
}
131
132impl RealTimeVectorUpdater {
133 pub fn new(
135 index: Arc<RwLock<dyn VectorIndex + Send + Sync>>,
136 config: RealTimeConfig,
137 ) -> Result<Self> {
138 let (shutdown_tx, _shutdown_rx) = watch::channel(false);
139
140 let updater = Self {
141 config: config.clone(),
142 update_queue: Arc::new(Mutex::new(VecDeque::new())),
143 batch_processor: Arc::new(Mutex::new(BatchProcessor::new())),
144 index: index.clone(),
145 stats: Arc::new(RwLock::new(UpdateStats::default())),
146 shutdown: shutdown_tx,
147 tasks: Vec::new(),
148 };
149
150 Ok(updater)
151 }
152
153 pub async fn start(&mut self) -> Result<()> {
155 let shutdown_rx = self.shutdown.subscribe();
156
157 let batch_task = self
159 .start_batch_processing_task(shutdown_rx.clone())
160 .await?;
161 self.tasks.push(batch_task);
162
163 if self.config.background_compaction {
165 let compaction_task = self.start_compaction_task(shutdown_rx.clone()).await?;
166 self.tasks.push(compaction_task);
167 }
168
169 Ok(())
170 }
171
172 pub async fn stop(&mut self) -> Result<()> {
174 self.shutdown
176 .send(true)
177 .map_err(|_| anyhow!("Failed to send shutdown signal"))?;
178
179 for task in self.tasks.drain(..) {
181 task.await.map_err(|e| anyhow!("Task join error: {}", e))?;
182 }
183
184 self.flush_pending_updates().await?;
186
187 Ok(())
188 }
189
190 pub async fn submit_update(&self, operation: UpdateOperation) -> Result<()> {
192 let mut queue = self.update_queue.lock().await;
193
194 if queue.len() >= self.config.buffer_capacity {
196 return Err(anyhow!("Update queue is full"));
197 }
198
199 queue.push_back(operation);
200 Ok(())
201 }
202
203 pub async fn submit_batch(&self, operations: Vec<UpdateOperation>) -> Result<()> {
205 let batch_op = UpdateOperation::Batch { operations };
206 self.submit_update(batch_op).await
207 }
208
209 pub fn get_stats(&self) -> UpdateStats {
211 self.stats.read().unwrap().clone()
212 }
213
    /// Records a compaction pass in the stats.
    ///
    /// NOTE(review): this looks like a placeholder — the index is locked for
    /// the duration of the pass but no actual compaction work is performed;
    /// only `last_compaction` is set.
    pub async fn compact_index(&self) -> Result<()> {
        // Hold a read lock for the duration of the (currently empty) pass.
        let _index = self.index.read().unwrap();
        let mut stats = self.stats.write().unwrap();
        stats.last_compaction = Some(Instant::now());

        Ok(())
    }
225
226 pub async fn rebuild_index_if_needed(&self) -> Result<bool> {
228 let index_size = {
229 let stats = self.stats.read().unwrap();
230 stats.index_size
231 };
232
233 if index_size == 0 {
234 return Ok(false);
235 }
236
237 let processor = self.batch_processor.lock().await;
238 let update_ratio = processor.total_updates_since_rebuild as f64 / index_size as f64;
239
240 if update_ratio >= self.config.rebuild_threshold {
241 drop(processor);
242
243 self.rebuild_index().await?;
245 Ok(true)
246 } else {
247 Ok(false)
248 }
249 }
250
    /// Resets the rebuild bookkeeping.
    ///
    /// NOTE(review): appears to be a placeholder — no index reconstruction
    /// is performed here; only the counters on the batch processor are
    /// reset and the rebuild time recorded.
    pub async fn rebuild_index(&self) -> Result<()> {
        let mut processor = self.batch_processor.lock().await;
        processor.total_updates_since_rebuild = 0;
        processor.last_rebuild = Some(Instant::now());

        Ok(())
    }
262
263 pub async fn flush_pending_updates(&self) -> Result<()> {
265 let mut queue = self.update_queue.lock().await;
266 let mut processor = self.batch_processor.lock().await;
267
268 while let Some(operation) = queue.pop_front() {
270 processor.pending_batch.push(operation);
271 }
272
273 if !processor.pending_batch.is_empty() {
275 self.process_batch(&mut processor).await?;
276 }
277
278 Ok(())
279 }
280
    /// Spawns the background task that periodically drains the update queue.
    ///
    /// Every `max_batch_wait` tick the task moves queued operations into the
    /// batch processor and applies them; a `true` on the shutdown channel
    /// ends the loop. Processing errors are logged and do not stop the task.
    async fn start_batch_processing_task(
        &self,
        mut shutdown_rx: watch::Receiver<bool>,
    ) -> Result<tokio::task::JoinHandle<()>> {
        // Clone the shared handles the spawned task needs to own.
        let queue = self.update_queue.clone();
        let processor = self.batch_processor.clone();
        let index = self.index.clone();
        let stats = self.stats.clone();
        let config = self.config.clone();

        let task = tokio::spawn(async move {
            let mut interval = interval(config.max_batch_wait);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = Self::process_pending_batch(
                            &queue, &processor, &index, &stats, &config
                        ).await {
                            // Best-effort: keep processing future ticks.
                            eprintln!("Batch processing error: {e}");
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        // Only exit when the flag actually flipped to true.
                        if *shutdown_rx.borrow() {
                            break;
                        }
                    }
                }
            }
        });

        Ok(task)
    }
316
    /// Spawns the background task that runs a compaction pass every
    /// `compaction_interval` until shutdown is signalled.
    ///
    /// Compaction errors are logged and do not stop the task.
    async fn start_compaction_task(
        &self,
        mut shutdown_rx: watch::Receiver<bool>,
    ) -> Result<tokio::task::JoinHandle<()>> {
        // Clone the shared handles the spawned task needs to own.
        let index = self.index.clone();
        let stats = self.stats.clone();
        let config = self.config.clone();

        let task = tokio::spawn(async move {
            let mut interval = interval(config.compaction_interval);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = Self::perform_compaction(&index, &stats).await {
                            // Best-effort: keep running future passes.
                            eprintln!("Compaction error: {e}");
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        // Only exit when the flag actually flipped to true.
                        if *shutdown_rx.borrow() {
                            break;
                        }
                    }
                }
            }
        });

        Ok(task)
    }
348
349 async fn process_pending_batch(
351 queue: &Arc<Mutex<VecDeque<UpdateOperation>>>,
352 processor: &Arc<Mutex<BatchProcessor>>,
353 index: &Arc<RwLock<dyn VectorIndex + Send + Sync>>,
354 stats: &Arc<RwLock<UpdateStats>>,
355 config: &RealTimeConfig,
356 ) -> Result<()> {
357 let operations = {
359 let mut queue_guard = queue.lock().await;
360 let mut processor_guard = processor.lock().await;
361
362 let mut batch_size = processor_guard.pending_batch.len();
364 while batch_size < config.max_batch_size && !queue_guard.is_empty() {
365 if let Some(operation) = queue_guard.pop_front() {
366 processor_guard.pending_batch.push(operation);
367 batch_size += 1;
368 }
369 }
370
371 if !processor_guard.pending_batch.is_empty() {
373 std::mem::take(&mut processor_guard.pending_batch)
374 } else {
375 return Ok(());
376 }
377 }; let start_time = Instant::now();
381 let (successful_ops, failed_ops) = {
382 let index_guard = index.write();
383 if let Ok(mut index_ref) = index_guard {
384 let mut successful = 0;
385 let mut failed = 0;
386
387 for operation in &operations {
388 match Self::apply_operation(&mut *index_ref, operation) {
389 Ok(_) => successful += 1,
390 Err(_) => failed += 1,
391 }
392 }
393 (successful, failed)
394 } else {
395 return Err(anyhow!("Failed to acquire index lock"));
396 }
397 }; let processing_time = start_time.elapsed();
400
401 {
403 let stats_guard = stats.write();
404 if let Ok(mut stats_ref) = stats_guard {
405 stats_ref.total_batches += 1;
406 stats_ref.total_updates += successful_ops;
407 stats_ref.failed_updates += failed_ops;
408 stats_ref.average_batch_size = (stats_ref.average_batch_size
409 * (stats_ref.total_batches - 1) as f64
410 + operations.len() as f64)
411 / stats_ref.total_batches as f64;
412
413 let total_time = stats_ref.average_processing_time.as_nanos() as f64
415 * (stats_ref.total_batches - 1) as f64
416 + processing_time.as_nanos() as f64;
417 stats_ref.average_processing_time =
418 Duration::from_nanos((total_time / stats_ref.total_batches as f64) as u64);
419 }
420 }; {
424 let mut processor_guard = processor.lock().await;
425 processor_guard.total_updates_since_rebuild += successful_ops as usize;
426 }
427
428 Ok(())
429 }
430
431 fn count_operations(operation: &UpdateOperation) -> u64 {
434 match operation {
435 UpdateOperation::Insert { .. }
436 | UpdateOperation::Update { .. }
437 | UpdateOperation::Delete { .. } => 1,
438 UpdateOperation::Batch { operations } => {
439 operations.iter().map(Self::count_operations).sum()
440 }
441 }
442 }
443
    /// Applies one operation to the index, recursing into `Batch` variants.
    ///
    /// A failure inside a `Batch` stops at the first error; operations
    /// already applied from that batch are not rolled back.
    fn apply_operation(index: &mut dyn VectorIndex, operation: &UpdateOperation) -> Result<()> {
        match operation {
            UpdateOperation::Insert {
                id,
                vector,
                metadata,
            } => {
                let vector_obj = crate::Vector::new(vector.clone());
                index.add_vector(id.clone(), vector_obj, Some(metadata.clone()))?;
            }
            UpdateOperation::Update {
                id,
                vector,
                metadata,
            } => {
                let vector_obj = crate::Vector::new(vector.clone());
                index.update_vector(id.clone(), vector_obj)?;

                // Metadata is replaced only when explicitly provided.
                if let Some(meta) = metadata {
                    index.update_metadata(id.clone(), meta.clone())?;
                }
            }
            UpdateOperation::Delete { id } => {
                index.remove_vector(id.clone())?;
            }
            UpdateOperation::Batch { operations } => {
                for op in operations {
                    Self::apply_operation(index, op)?;
                }
            }
        }
        Ok(())
    }
479
    /// Background-task entry point for a compaction pass.
    ///
    /// NOTE(review): placeholder — the index read lock is acquired and
    /// immediately released without doing any work; only `last_compaction`
    /// is recorded. Panics if either lock is poisoned.
    async fn perform_compaction(
        index: &Arc<RwLock<dyn VectorIndex + Send + Sync>>,
        stats: &Arc<RwLock<UpdateStats>>,
    ) -> Result<()> {
        // Acquire-then-drop acts as a synchronization point with writers.
        let index_guard = index.read().unwrap();
        drop(index_guard);

        let mut stats_guard = stats.write().unwrap();
        stats_guard.last_compaction = Some(Instant::now());

        Ok(())
    }
495
496 async fn process_batch(&self, processor: &mut BatchProcessor) -> Result<()> {
498 if processor.pending_batch.is_empty() {
499 return Ok(());
500 }
501
502 let start_time = Instant::now();
503 let operations = std::mem::take(&mut processor.pending_batch);
504
505 let mut index = self.index.write().unwrap();
506 let mut successful_ops = 0;
507 let mut failed_ops = 0;
508
509 for operation in &operations {
510 match Self::apply_operation(&mut *index, operation) {
511 Ok(_) => {
512 successful_ops += Self::count_operations(operation);
514 }
515 Err(_) => {
516 failed_ops += Self::count_operations(operation);
517 }
518 }
519 }
520
521 drop(index);
522
523 let processing_time = start_time.elapsed();
525 let mut stats = self.stats.write().unwrap();
526 stats.total_batches += 1;
527 stats.total_updates += successful_ops;
528 stats.failed_updates += failed_ops;
529
530 let total_time = stats.average_processing_time.as_nanos() as f64
532 * (stats.total_batches - 1) as f64
533 + processing_time.as_nanos() as f64;
534 stats.average_processing_time =
535 Duration::from_nanos((total_time / stats.total_batches as f64) as u64);
536
537 processor.total_updates_since_rebuild += successful_ops as usize;
538 processor.batch_start_time = None;
539
540 Ok(())
541 }
542}
543
impl BatchProcessor {
    /// Creates an empty processor: no pending operations and no recorded
    /// batch or rebuild times.
    fn new() -> Self {
        Self {
            pending_batch: Vec::new(),
            batch_start_time: None,
            total_updates_since_rebuild: 0,
            last_rebuild: None,
        }
    }
}
554
/// Query-hash -> (results, insertion time) map shared behind an `RwLock`.
type SearchCache = Arc<RwLock<HashMap<String, (Vec<SimilarityResult>, Instant)>>>;
557
/// Read side over a [`RealTimeVectorUpdater`]: runs k-NN searches against
/// the shared index with a TTL-based result cache.
pub struct RealTimeVectorSearch {
    /// Provides access to the live index being updated.
    updater: Arc<RealTimeVectorUpdater>,
    /// Cached results keyed by `compute_query_hash` output.
    search_cache: SearchCache,
    /// How long a cached result set stays valid.
    cache_ttl: Duration,
}
564
565impl RealTimeVectorSearch {
566 pub fn new(updater: Arc<RealTimeVectorUpdater>) -> Self {
568 Self {
569 updater,
570 search_cache: Arc::new(RwLock::new(HashMap::new())),
571 cache_ttl: Duration::from_secs(60), }
573 }
574
    /// Returns the `k` nearest neighbours of `query_vector`, serving from
    /// the cache when a fresh entry exists for the same query/k pair.
    ///
    /// Result ids are synthesized from the result position and the current
    /// wall-clock millis, so they differ between cache misses for the same
    /// query.
    ///
    /// # Panics
    /// Panics if the index or cache lock is poisoned.
    pub async fn similarity_search(
        &self,
        query_vector: &[f32],
        k: usize,
    ) -> Result<Vec<SimilarityResult>> {
        let query_hash = self.compute_query_hash(query_vector, k);

        // Fast path: a cached, non-expired result set.
        if let Some(cached_results) = self.get_cached_results(&query_hash) {
            return Ok(cached_results);
        }

        // Hold the index read lock only for the duration of the search.
        let index = self.updater.index.read().unwrap();
        let query_vec = crate::Vector::new(query_vector.to_vec());
        let search_results = index.search_knn(&query_vec, k)?;
        drop(index);

        let results: Vec<crate::similarity::SimilarityResult> = search_results
            .into_iter()
            .enumerate()
            .map(
                |(idx, (uri, similarity))| crate::similarity::SimilarityResult {
                    id: format!(
                        "rt_{}_{}",
                        idx,
                        std::time::SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .unwrap_or_default()
                            .as_millis()
                    ),
                    uri,
                    similarity,
                    metrics: std::collections::HashMap::new(),
                    metadata: None,
                },
            )
            .collect();

        self.cache_results(query_hash, &results);

        Ok(results)
    }
622
623 pub fn invalidate_cache(&self) {
625 let mut cache = self.search_cache.write().unwrap();
626 cache.clear();
627 }
628
629 fn get_cached_results(&self, query_hash: &str) -> Option<Vec<SimilarityResult>> {
631 let cache = self.search_cache.read().unwrap();
632 cache.get(query_hash).and_then(|(results, timestamp)| {
633 if timestamp.elapsed() < self.cache_ttl {
634 Some(results.clone())
635 } else {
636 None
637 }
638 })
639 }
640
641 fn cache_results(&self, query_hash: String, results: &[SimilarityResult]) {
643 let mut cache = self.search_cache.write().unwrap();
644 cache.insert(query_hash, (results.to_vec(), Instant::now()));
645
646 cache.retain(|_, (_, timestamp)| timestamp.elapsed() < self.cache_ttl);
648 }
649
650 fn compute_query_hash(&self, query_vector: &[f32], k: usize) -> String {
652 let mut hash = k as u64;
654 for &value in query_vector {
655 hash = hash.wrapping_mul(31).wrapping_add(value.to_bits() as u64);
656 }
657 hash.to_string()
658 }
659}
660
#[cfg(test)]
mod tests {
    use super::*;
    use crate::MemoryVectorIndex;

    /// Submitting a single insert and flushing should register at least one
    /// successful update in the stats.
    #[tokio::test]
    async fn test_real_time_updater() {
        let index = Arc::new(RwLock::new(MemoryVectorIndex::new()));
        let config = RealTimeConfig::default();
        let updater = RealTimeVectorUpdater::new(index, config).unwrap();

        let operation = UpdateOperation::Insert {
            id: "1".to_string(),
            vector: vec![1.0, 2.0, 3.0],
            metadata: HashMap::new(),
        };

        updater.submit_update(operation).await.unwrap();
        // Apply synchronously — the background tasks were never started.
        updater.flush_pending_updates().await.unwrap();

        let stats = updater.get_stats();
        assert!(stats.total_updates > 0);
    }

    /// A two-operation batch flushed synchronously should count both
    /// nested inserts toward `total_updates`.
    #[tokio::test]
    async fn test_batch_operations() {
        let index = Arc::new(RwLock::new(MemoryVectorIndex::new()));
        let config = RealTimeConfig::default();
        let updater = RealTimeVectorUpdater::new(index, config).unwrap();

        let operations = vec![
            UpdateOperation::Insert {
                id: "1".to_string(),
                vector: vec![1.0, 0.0],
                metadata: HashMap::new(),
            },
            UpdateOperation::Insert {
                id: "2".to_string(),
                vector: vec![0.0, 1.0],
                metadata: HashMap::new(),
            },
        ];

        updater.submit_batch(operations).await.unwrap();
        updater.flush_pending_updates().await.unwrap();

        let stats = updater.get_stats();
        assert_eq!(stats.total_updates, 2);
    }
}