use crate::similarity::SimilarityResult;
use crate::{index::VectorIndex, VectorId};
use anyhow::{anyhow, Result};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use tokio::sync::{watch, Mutex};
use tokio::time::interval;

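/// A single mutation applied to the vector index: insert, update, delete, or a nested batch.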
#[derive(Debug, Clone)]
pub enum UpdateOperation {
    Insert {
        id: VectorId,
        vector: Vec<f32>,
        metadata: HashMap<String, String>,
    },
    Update {
        id: VectorId,
        vector: Vec<f32>,
        metadata: Option<HashMap<String, String>>,
    },
    Delete { id: VectorId },
    Batch { operations: Vec<UpdateOperation> },
}

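/// A group of operations submitted together, stamped with creation time and priority.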
#[derive(Debug, Clone)]
pub struct UpdateBatch {
    pub operations: Vec<UpdateOperation>,
    pub timestamp: Instant,
    pub priority: UpdatePriority,
}

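/// Relative urgency of an update batch; higher variants order after lower ones.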
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum UpdatePriority {
    Low = 0,
    Normal = 1,
    High = 2,
    Critical = 3,
}

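/// Tuning knobs for real-time updating: batch sizing, queue capacity, and background maintenance.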
#[derive(Debug, Clone)]
pub struct RealTimeConfig {
    pub max_batch_size: usize,
    pub max_batch_wait: Duration,
    pub buffer_capacity: usize,
    pub background_compaction: bool,
    pub compaction_interval: Duration,
    pub enable_rebuilding: bool,
    pub rebuild_threshold: f64,
}

impl Default for RealTimeConfig {
    fn default() -> Self {
        Self {
            max_batch_size: 1000,
            max_batch_wait: Duration::from_millis(100),
            buffer_capacity: 10000,
            background_compaction: true,
            compaction_interval: Duration::from_secs(300),
            enable_rebuilding: true,
            rebuild_threshold: 0.3,
        }
    }
}

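/// Coordinates real-time updates to a shared vector index: operations are queued,
/// drained into batches by a background task, and optional compaction/rebuild
/// maintenance runs on a timer.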
pub struct RealTimeVectorUpdater {
    config: RealTimeConfig,
    update_queue: Arc<Mutex<VecDeque<UpdateOperation>>>,
    batch_processor: Arc<Mutex<BatchProcessor>>,
    index: Arc<RwLock<dyn VectorIndex + Send + Sync>>,
    stats: Arc<RwLock<UpdateStats>>,
    shutdown: watch::Sender<bool>,
    tasks: Vec<tokio::task::JoinHandle<()>>,
}

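/// Counters and timings accumulated while applying updates.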
#[derive(Debug, Clone, Default)]
pub struct UpdateStats {
    pub total_updates: u64,
    pub total_inserts: u64,
    pub total_deletes: u64,
    pub total_batches: u64,
    pub failed_updates: u64,
    pub average_batch_size: f64,
    pub average_processing_time: Duration,
    pub last_compaction: Option<Instant>,
    pub index_size: usize,
    pub pending_updates: usize,
}

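/// Internal accumulator for the batch currently being assembled, plus rebuild bookkeeping.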
pub struct BatchProcessor {
    pending_batch: Vec<UpdateOperation>,
    batch_start_time: Option<Instant>,
    total_updates_since_rebuild: usize,
    last_rebuild: Option<Instant>,
}

impl RealTimeVectorUpdater {
    pub fn new(
        index: Arc<RwLock<dyn VectorIndex + Send + Sync>>,
        config: RealTimeConfig,
    ) -> Result<Self> {
        let (shutdown_tx, _shutdown_rx) = watch::channel(false);

        let updater = Self {
            config: config.clone(),
            update_queue: Arc::new(Mutex::new(VecDeque::new())),
            batch_processor: Arc::new(Mutex::new(BatchProcessor::new())),
            index: index.clone(),
            stats: Arc::new(RwLock::new(UpdateStats::default())),
            shutdown: shutdown_tx,
            tasks: Vec::new(),
        };

        Ok(updater)
    }

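    /// Spawns the background batch-processing task and, if enabled, the compaction task.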
    pub async fn start(&mut self) -> Result<()> {
        let shutdown_rx = self.shutdown.subscribe();

        let batch_task = self
            .start_batch_processing_task(shutdown_rx.clone())
            .await?;
        self.tasks.push(batch_task);

        if self.config.background_compaction {
            let compaction_task = self.start_compaction_task(shutdown_rx.clone()).await?;
            self.tasks.push(compaction_task);
        }

        Ok(())
    }

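    /// Signals shutdown, waits for the background tasks to finish, then flushes any queued updates.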
    pub async fn stop(&mut self) -> Result<()> {
        self.shutdown
            .send(true)
            .map_err(|_| anyhow!("Failed to send shutdown signal"))?;

        for task in self.tasks.drain(..) {
            task.await.map_err(|e| anyhow!("Task join error: {}", e))?;
        }

        self.flush_pending_updates().await?;

        Ok(())
    }

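    /// Enqueues a single operation, rejecting it if the buffer is already at capacity.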
    pub async fn submit_update(&self, operation: UpdateOperation) -> Result<()> {
        let mut queue = self.update_queue.lock().await;

        if queue.len() >= self.config.buffer_capacity {
            return Err(anyhow!("Update queue is full"));
        }

        queue.push_back(operation);
        Ok(())
    }

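    /// Wraps the operations in a single `Batch` and enqueues it as one unit.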
    pub async fn submit_batch(&self, operations: Vec<UpdateOperation>) -> Result<()> {
        let batch_op = UpdateOperation::Batch { operations };
        self.submit_update(batch_op).await
    }

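    /// Returns a snapshot of the current update statistics.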
    pub fn get_stats(&self) -> UpdateStats {
        self.stats
            .read()
            .expect("rwlock should not be poisoned")
            .clone()
    }

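    /// Records a compaction pass in the statistics.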
    pub async fn compact_index(&self) -> Result<()> {
        let _index = self.index.read().expect("rwlock should not be poisoned");
        let mut stats = self.stats.write().expect("rwlock should not be poisoned");
        stats.last_compaction = Some(Instant::now());

        Ok(())
    }

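    /// Rebuilds the index when the ratio of updates since the last rebuild to the
    /// index size reaches the configured threshold. Returns `true` if a rebuild ran.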
    pub async fn rebuild_index_if_needed(&self) -> Result<bool> {
        let index_size = {
            let stats = self.stats.read().expect("rwlock should not be poisoned");
            stats.index_size
        };

        if index_size == 0 {
            return Ok(false);
        }

        let processor = self.batch_processor.lock().await;
        let update_ratio = processor.total_updates_since_rebuild as f64 / index_size as f64;

        if update_ratio >= self.config.rebuild_threshold {
            drop(processor);

            self.rebuild_index().await?;
            Ok(true)
        } else {
            Ok(false)
        }
    }

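    /// Marks the index as rebuilt: resets the update counter and records the rebuild time.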
    pub async fn rebuild_index(&self) -> Result<()> {
        let mut processor = self.batch_processor.lock().await;
        processor.total_updates_since_rebuild = 0;
        processor.last_rebuild = Some(Instant::now());

        Ok(())
    }

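    /// Drains the queue into the pending batch and processes it immediately.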
    pub async fn flush_pending_updates(&self) -> Result<()> {
        let mut queue = self.update_queue.lock().await;
        let mut processor = self.batch_processor.lock().await;

        while let Some(operation) = queue.pop_front() {
            processor.pending_batch.push(operation);
        }

        if !processor.pending_batch.is_empty() {
            self.process_batch(&mut processor).await?;
        }

        Ok(())
    }

    async fn start_batch_processing_task(
        &self,
        mut shutdown_rx: watch::Receiver<bool>,
    ) -> Result<tokio::task::JoinHandle<()>> {
        let queue = self.update_queue.clone();
        let processor = self.batch_processor.clone();
        let index = self.index.clone();
        let stats = self.stats.clone();
        let config = self.config.clone();

        let task = tokio::spawn(async move {
            let mut interval = interval(config.max_batch_wait);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = Self::process_pending_batch(
                            &queue, &processor, &index, &stats, &config
                        ).await {
                            eprintln!("Batch processing error: {e}");
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            break;
                        }
                    }
                }
            }
        });

        Ok(task)
    }

    async fn start_compaction_task(
        &self,
        mut shutdown_rx: watch::Receiver<bool>,
    ) -> Result<tokio::task::JoinHandle<()>> {
        let index = self.index.clone();
        let stats = self.stats.clone();
        let config = self.config.clone();

        let task = tokio::spawn(async move {
            let mut interval = interval(config.compaction_interval);

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if let Err(e) = Self::perform_compaction(&index, &stats).await {
                            eprintln!("Compaction error: {e}");
                        }
                    }
                    _ = shutdown_rx.changed() => {
                        if *shutdown_rx.borrow() {
                            break;
                        }
                    }
                }
            }
        });

        Ok(task)
    }

    async fn process_pending_batch(
        queue: &Arc<Mutex<VecDeque<UpdateOperation>>>,
        processor: &Arc<Mutex<BatchProcessor>>,
        index: &Arc<RwLock<dyn VectorIndex + Send + Sync>>,
        stats: &Arc<RwLock<UpdateStats>>,
        config: &RealTimeConfig,
    ) -> Result<()> {
        let operations = {
            let mut queue_guard = queue.lock().await;
            let mut processor_guard = processor.lock().await;

            let mut batch_size = processor_guard.pending_batch.len();
            while batch_size < config.max_batch_size && !queue_guard.is_empty() {
                if let Some(operation) = queue_guard.pop_front() {
                    processor_guard.pending_batch.push(operation);
                    batch_size += 1;
                }
            }

            if !processor_guard.pending_batch.is_empty() {
                std::mem::take(&mut processor_guard.pending_batch)
            } else {
                return Ok(());
            }
        };

        let start_time = Instant::now();
        let (successful_ops, failed_ops) = {
            let index_guard = index.write();
            if let Ok(mut index_ref) = index_guard {
                let mut successful = 0;
                let mut failed = 0;

                for operation in &operations {
                    match Self::apply_operation(&mut *index_ref, operation) {
                        Ok(_) => successful += 1,
                        Err(_) => failed += 1,
                    }
                }
                (successful, failed)
            } else {
                return Err(anyhow!("Failed to acquire index lock"));
            }
        };

        let processing_time = start_time.elapsed();

        {
            let stats_guard = stats.write();
            if let Ok(mut stats_ref) = stats_guard {
                stats_ref.total_batches += 1;
                stats_ref.total_updates += successful_ops;
                stats_ref.failed_updates += failed_ops;
                stats_ref.average_batch_size = (stats_ref.average_batch_size
                    * (stats_ref.total_batches - 1) as f64
                    + operations.len() as f64)
                    / stats_ref.total_batches as f64;

                let total_time = stats_ref.average_processing_time.as_nanos() as f64
                    * (stats_ref.total_batches - 1) as f64
                    + processing_time.as_nanos() as f64;
                stats_ref.average_processing_time =
                    Duration::from_nanos((total_time / stats_ref.total_batches as f64) as u64);
            }
        }

        {
            let mut processor_guard = processor.lock().await;
            processor_guard.total_updates_since_rebuild += successful_ops as usize;
        }

        Ok(())
    }

    fn count_operations(operation: &UpdateOperation) -> u64 {
        match operation {
            UpdateOperation::Insert { .. }
            | UpdateOperation::Update { .. }
            | UpdateOperation::Delete { .. } => 1,
            UpdateOperation::Batch { operations } => {
                operations.iter().map(Self::count_operations).sum()
            }
        }
    }

    fn apply_operation(index: &mut dyn VectorIndex, operation: &UpdateOperation) -> Result<()> {
        match operation {
            UpdateOperation::Insert {
                id,
                vector,
                metadata,
            } => {
                let vector_obj = crate::Vector::new(vector.clone());
                index.add_vector(id.clone(), vector_obj, Some(metadata.clone()))?;
            }
            UpdateOperation::Update {
                id,
                vector,
                metadata,
            } => {
                let vector_obj = crate::Vector::new(vector.clone());
                index.update_vector(id.clone(), vector_obj)?;

                if let Some(meta) = metadata {
                    index.update_metadata(id.clone(), meta.clone())?;
                }
            }
            UpdateOperation::Delete { id } => {
                index.remove_vector(id.clone())?;
            }
            UpdateOperation::Batch { operations } => {
                for op in operations {
                    Self::apply_operation(index, op)?;
                }
            }
        }
        Ok(())
    }

    async fn perform_compaction(
        index: &Arc<RwLock<dyn VectorIndex + Send + Sync>>,
        stats: &Arc<RwLock<UpdateStats>>,
    ) -> Result<()> {
        let index_guard = index.read().expect("rwlock should not be poisoned");
        drop(index_guard);

        let mut stats_guard = stats.write().expect("rwlock should not be poisoned");
        stats_guard.last_compaction = Some(Instant::now());

        Ok(())
    }

    async fn process_batch(&self, processor: &mut BatchProcessor) -> Result<()> {
        if processor.pending_batch.is_empty() {
            return Ok(());
        }

        let start_time = Instant::now();
        let operations = std::mem::take(&mut processor.pending_batch);

        let mut index = self.index.write().expect("rwlock should not be poisoned");
        let mut successful_ops = 0;
        let mut failed_ops = 0;

        for operation in &operations {
            match Self::apply_operation(&mut *index, operation) {
                Ok(_) => {
                    successful_ops += Self::count_operations(operation);
                }
                Err(_) => {
                    failed_ops += Self::count_operations(operation);
                }
            }
        }

        drop(index);

        let processing_time = start_time.elapsed();
        let mut stats = self.stats.write().expect("rwlock should not be poisoned");
        stats.total_batches += 1;
        stats.total_updates += successful_ops;
        stats.failed_updates += failed_ops;

        let total_time = stats.average_processing_time.as_nanos() as f64
            * (stats.total_batches - 1) as f64
            + processing_time.as_nanos() as f64;
        stats.average_processing_time =
            Duration::from_nanos((total_time / stats.total_batches as f64) as u64);

        processor.total_updates_since_rebuild += successful_ops as usize;
        processor.batch_start_time = None;

        Ok(())
    }
}

impl BatchProcessor {
    fn new() -> Self {
        Self {
            pending_batch: Vec::new(),
            batch_start_time: None,
            total_updates_since_rebuild: 0,
            last_rebuild: None,
        }
    }
}

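/// Cache mapping a query hash to its results and the instant they were stored.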
type SearchCache = Arc<RwLock<HashMap<String, (Vec<SimilarityResult>, Instant)>>>;

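/// Search front-end over the real-time updater that caches similarity results for a short TTL.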
pub struct RealTimeVectorSearch {
    updater: Arc<RealTimeVectorUpdater>,
    search_cache: SearchCache,
    cache_ttl: Duration,
}

impl RealTimeVectorSearch {
    pub fn new(updater: Arc<RealTimeVectorUpdater>) -> Self {
        Self {
            updater,
            search_cache: Arc::new(RwLock::new(HashMap::new())),
            cache_ttl: Duration::from_secs(60),
        }
    }

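    /// Runs a k-nearest-neighbour search against the index, serving from and populating the TTL cache.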
    pub async fn similarity_search(
        &self,
        query_vector: &[f32],
        k: usize,
    ) -> Result<Vec<SimilarityResult>> {
        let query_hash = self.compute_query_hash(query_vector, k);

        if let Some(cached_results) = self.get_cached_results(&query_hash) {
            return Ok(cached_results);
        }

        let index = self
            .updater
            .index
            .read()
            .expect("rwlock should not be poisoned");
        let query_vec = crate::Vector::new(query_vector.to_vec());
        let search_results = index.search_knn(&query_vec, k)?;
        drop(index);

        let results: Vec<crate::similarity::SimilarityResult> = search_results
            .into_iter()
            .enumerate()
            .map(
                |(idx, (uri, similarity))| crate::similarity::SimilarityResult {
                    id: format!(
                        "rt_{}_{}",
                        idx,
                        std::time::SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .unwrap_or_default()
                            .as_millis()
                    ),
                    uri,
                    similarity,
                    metrics: std::collections::HashMap::new(),
                    metadata: None,
                },
            )
            .collect();

        self.cache_results(query_hash, &results);

        Ok(results)
    }

    pub fn invalidate_cache(&self) {
        let mut cache = self
            .search_cache
            .write()
            .expect("rwlock should not be poisoned");
        cache.clear();
    }

    fn get_cached_results(&self, query_hash: &str) -> Option<Vec<SimilarityResult>> {
        let cache = self
            .search_cache
            .read()
            .expect("rwlock should not be poisoned");
        cache.get(query_hash).and_then(|(results, timestamp)| {
            if timestamp.elapsed() < self.cache_ttl {
                Some(results.clone())
            } else {
                None
            }
        })
    }

    fn cache_results(&self, query_hash: String, results: &[SimilarityResult]) {
        let mut cache = self
            .search_cache
            .write()
            .expect("rwlock should not be poisoned");
        cache.insert(query_hash, (results.to_vec(), Instant::now()));

        cache.retain(|_, (_, timestamp)| timestamp.elapsed() < self.cache_ttl);
    }

    fn compute_query_hash(&self, query_vector: &[f32], k: usize) -> String {
        let mut hash = k as u64;
        for &value in query_vector {
            hash = hash.wrapping_mul(31).wrapping_add(value.to_bits() as u64);
        }
        hash.to_string()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::MemoryVectorIndex;

    #[tokio::test]
    async fn test_real_time_updater() {
        let index = Arc::new(RwLock::new(MemoryVectorIndex::new()));
        let config = RealTimeConfig::default();
        let updater = RealTimeVectorUpdater::new(index, config).unwrap();

        let operation = UpdateOperation::Insert {
            id: "1".to_string(),
            vector: vec![1.0, 2.0, 3.0],
            metadata: HashMap::new(),
        };

        updater.submit_update(operation).await.unwrap();
        updater.flush_pending_updates().await.unwrap();

        let stats = updater.get_stats();
        assert!(stats.total_updates > 0);
    }

    #[tokio::test]
    async fn test_batch_operations() {
        let index = Arc::new(RwLock::new(MemoryVectorIndex::new()));
        let config = RealTimeConfig::default();
        let updater = RealTimeVectorUpdater::new(index, config).unwrap();

        let operations = vec![
            UpdateOperation::Insert {
                id: "1".to_string(),
                vector: vec![1.0, 0.0],
                metadata: HashMap::new(),
            },
            UpdateOperation::Insert {
                id: "2".to_string(),
                vector: vec![0.0, 1.0],
                metadata: HashMap::new(),
            },
        ];

        updater.submit_batch(operations).await.unwrap();
        updater.flush_pending_updates().await.unwrap();

        let stats = updater.get_stats();
        assert_eq!(stats.total_updates, 2);
    }
}