1use crate::{Embedding, RragResult};
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use uuid::Uuid;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct VectorUpdateConfig {
16 pub enable_batch_processing: bool,
18
19 pub max_batch_size: usize,
21
22 pub batch_timeout_ms: u64,
24
25 pub update_strategy: IndexUpdateStrategy,
27
28 pub enable_optimization: bool,
30
31 pub optimization_interval_secs: u64,
33
34 pub enable_similarity_updates: bool,
36
37 pub similarity_threshold: f32,
39
40 pub max_concurrent_operations: usize,
42
43 pub monitoring: VectorMonitoringConfig,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub enum IndexUpdateStrategy {
50 Immediate,
52 Batch,
54 Lazy,
56 Adaptive,
58 Custom(String),
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct VectorMonitoringConfig {
65 pub enable_performance_tracking: bool,
67
68 pub enable_memory_monitoring: bool,
70
71 pub enable_quality_metrics: bool,
73
74 pub metrics_interval_secs: u64,
76}
77
78impl Default for VectorUpdateConfig {
79 fn default() -> Self {
80 Self {
81 enable_batch_processing: true,
82 max_batch_size: 1000,
83 batch_timeout_ms: 5000,
84 update_strategy: IndexUpdateStrategy::Batch,
85 enable_optimization: true,
86 optimization_interval_secs: 3600, enable_similarity_updates: true,
88 similarity_threshold: 0.7,
89 max_concurrent_operations: 10,
90 monitoring: VectorMonitoringConfig::default(),
91 }
92 }
93}
94
95impl Default for VectorMonitoringConfig {
96 fn default() -> Self {
97 Self {
98 enable_performance_tracking: true,
99 enable_memory_monitoring: true,
100 enable_quality_metrics: true,
101 metrics_interval_secs: 300, }
103 }
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
108pub enum VectorOperation {
109 Add {
111 embeddings: Vec<Embedding>,
112 index_name: String,
113 },
114
115 Update {
117 embedding_updates: Vec<EmbeddingUpdate>,
118 index_name: String,
119 },
120
121 Remove {
123 embedding_ids: Vec<String>,
124 index_name: String,
125 },
126
127 Optimize {
129 index_name: String,
130 optimization_type: OptimizationType,
131 },
132
133 Rebuild {
135 index_name: String,
136 embeddings: Vec<Embedding>,
137 },
138
139 UpdateThresholds {
141 index_name: String,
142 new_threshold: f32,
143 },
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
148pub struct EmbeddingUpdate {
149 pub embedding_id: String,
151
152 pub new_embedding: Embedding,
154
155 pub update_reason: UpdateReason,
157
158 pub metadata: HashMap<String, serde_json::Value>,
160}
161
162#[derive(Debug, Clone, Serialize, Deserialize)]
164pub enum UpdateReason {
165 ContentChanged,
167 ModelUpdated,
169 QualityImprovement,
171 MetadataUpdated,
173 ErrorCorrection,
175 Manual,
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize)]
181pub enum OptimizationType {
182 Compact,
184 RebuildTrees,
186 UpdateClustering,
188 QueryOptimization,
190 MemoryOptimization,
192 Full,
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct VectorBatch {
199 pub batch_id: String,
201
202 pub operations: Vec<VectorOperation>,
204
205 pub created_at: chrono::DateTime<chrono::Utc>,
207
208 pub index_name: String,
210
211 pub priority: u8,
213
214 pub estimated_duration_ms: u64,
216
217 pub metadata: HashMap<String, serde_json::Value>,
219}
220
221#[derive(Debug, Clone, Serialize, Deserialize)]
223pub struct VectorOperationResult {
224 pub operation_id: String,
226
227 pub success: bool,
229
230 pub embeddings_processed: usize,
232
233 pub processing_time_ms: u64,
235
236 pub index_stats: Option<IndexStats>,
238
239 pub performance_metrics: OperationMetrics,
241
242 pub errors: Vec<String>,
244
245 pub metadata: HashMap<String, serde_json::Value>,
247}
248
249#[derive(Debug, Clone, Serialize, Deserialize)]
251pub struct IndexStats {
252 pub index_name: String,
254
255 pub embedding_count: usize,
257
258 pub size_bytes: u64,
260
261 pub dimensions: usize,
263
264 pub index_type: String,
266
267 pub memory_usage_bytes: u64,
269
270 pub last_optimized_at: Option<chrono::DateTime<chrono::Utc>>,
272
273 pub quality_metrics: IndexQualityMetrics,
275
276 pub performance_metrics: IndexPerformanceMetrics,
278}
279
280#[derive(Debug, Clone, Serialize, Deserialize)]
282pub struct IndexQualityMetrics {
283 pub recall_at_10: f32,
285
286 pub precision: f32,
288
289 pub freshness_score: f32,
291
292 pub clustering_quality: f32,
294
295 pub distribution_balance: f32,
297}
298
299#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct IndexPerformanceMetrics {
302 pub avg_query_time_ms: f32,
304
305 pub p95_query_time_ms: f32,
307
308 pub queries_per_second: f32,
310
311 pub build_time_ms: u64,
313
314 pub memory_efficiency: f32,
316}
317
318#[derive(Debug, Clone, Serialize, Deserialize)]
320pub struct OperationMetrics {
321 pub cpu_time_ms: u64,
323
324 pub peak_memory_mb: f32,
326
327 pub io_operations: u64,
329
330 pub cache_hit_rate: f32,
332
333 pub throughput_eps: f32,
335}
336
337pub struct VectorUpdateManager {
339 config: VectorUpdateConfig,
341
342 pending_operations: Arc<RwLock<VecDeque<VectorOperation>>>,
344
345 active_batches: Arc<RwLock<HashMap<String, VectorBatch>>>,
347
348 index_metadata: Arc<RwLock<HashMap<String, IndexStats>>>,
350
351 operation_history: Arc<RwLock<VecDeque<VectorOperationResult>>>,
353
354 metrics: Arc<RwLock<VectorUpdateMetrics>>,
356
357 task_handles: Arc<tokio::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>>,
359}
360
361#[derive(Debug, Clone, Serialize, Deserialize)]
363pub struct VectorUpdateMetrics {
364 pub total_operations: u64,
366
367 pub operations_by_type: HashMap<String, u64>,
369
370 pub success_rate: f32,
372
373 pub avg_processing_time_ms: f32,
375
376 pub total_embeddings_processed: u64,
378
379 pub index_stats: HashMap<String, IndexStats>,
381
382 pub system_performance: SystemPerformanceMetrics,
384
385 pub last_updated: chrono::DateTime<chrono::Utc>,
387}
388
389#[derive(Debug, Clone, Serialize, Deserialize)]
391pub struct SystemPerformanceMetrics {
392 pub overall_throughput_eps: f32,
394
395 pub memory_usage_mb: f32,
397
398 pub cpu_utilization_percent: f32,
400
401 pub queue_depth: usize,
403
404 pub active_operations: usize,
406
407 pub health_score: f32,
409}
410
411impl VectorUpdateManager {
412 pub async fn new(config: VectorUpdateConfig) -> RragResult<Self> {
414 let manager = Self {
415 config: config.clone(),
416 pending_operations: Arc::new(RwLock::new(VecDeque::new())),
417 active_batches: Arc::new(RwLock::new(HashMap::new())),
418 index_metadata: Arc::new(RwLock::new(HashMap::new())),
419 operation_history: Arc::new(RwLock::new(VecDeque::new())),
420 metrics: Arc::new(RwLock::new(VectorUpdateMetrics {
421 total_operations: 0,
422 operations_by_type: HashMap::new(),
423 success_rate: 1.0,
424 avg_processing_time_ms: 0.0,
425 total_embeddings_processed: 0,
426 index_stats: HashMap::new(),
427 system_performance: SystemPerformanceMetrics {
428 overall_throughput_eps: 0.0,
429 memory_usage_mb: 0.0,
430 cpu_utilization_percent: 0.0,
431 queue_depth: 0,
432 active_operations: 0,
433 health_score: 1.0,
434 },
435 last_updated: chrono::Utc::now(),
436 })),
437 task_handles: Arc::new(tokio::sync::Mutex::new(Vec::new())),
438 };
439
440 manager.start_background_tasks().await?;
441 Ok(manager)
442 }
443
444 pub async fn submit_operation(&self, operation: VectorOperation) -> RragResult<String> {
446 let operation_id = Uuid::new_v4().to_string();
447
448 if self.config.enable_batch_processing {
449 match self.config.update_strategy {
450 IndexUpdateStrategy::Batch => {
451 self.add_to_batch(operation).await?;
452 }
453 IndexUpdateStrategy::Immediate => {
454 self.process_immediate(operation).await?;
455 }
456 _ => {
457 let mut pending = self.pending_operations.write().await;
458 pending.push_back(operation);
459 }
460 }
461 } else {
462 self.process_immediate(operation).await?;
463 }
464
465 Ok(operation_id)
466 }
467
468 pub async fn process_embedding_updates(
470 &self,
471 updates: Vec<EmbeddingUpdate>,
472 index_name: &str,
473 ) -> RragResult<VectorOperationResult> {
474 let start_time = std::time::Instant::now();
475 let operation_id = Uuid::new_v4().to_string();
476
477 let mut processed_count = 0;
479 let mut errors = Vec::new();
480
481 for update in &updates {
482 match self
483 .process_single_embedding_update(update, index_name)
484 .await
485 {
486 Ok(_) => processed_count += 1,
487 Err(e) => errors.push(e.to_string()),
488 }
489 }
490
491 let processing_time = start_time.elapsed().as_millis() as u64;
492 let success = processed_count > 0;
493
494 let index_stats = self.update_index_stats(index_name, processed_count).await?;
496
497 let result = VectorOperationResult {
498 operation_id,
499 success,
500 embeddings_processed: processed_count,
501 processing_time_ms: processing_time,
502 index_stats: Some(index_stats),
503 performance_metrics: OperationMetrics {
504 cpu_time_ms: processing_time,
505 peak_memory_mb: 10.0, io_operations: processed_count as u64,
507 cache_hit_rate: 0.8,
508 throughput_eps: processed_count as f32 / (processing_time as f32 / 1000.0),
509 },
510 errors,
511 metadata: HashMap::new(),
512 };
513
514 self.store_operation_result(result.clone()).await?;
516
517 Ok(result)
518 }
519
520 pub async fn optimize_index(
522 &self,
523 index_name: &str,
524 optimization_type: OptimizationType,
525 ) -> RragResult<VectorOperationResult> {
526 let start_time = std::time::Instant::now();
527 let operation_id = Uuid::new_v4().to_string();
528
529 let optimization_result = self
531 .perform_optimization(index_name, &optimization_type)
532 .await?;
533 let processing_time = start_time.elapsed().as_millis() as u64;
534
535 let mut index_metadata = self.index_metadata.write().await;
537 if let Some(stats) = index_metadata.get_mut(index_name) {
538 stats.last_optimized_at = Some(chrono::Utc::now());
539 stats.quality_metrics = optimization_result.new_quality_metrics;
540 stats.performance_metrics = optimization_result.new_performance_metrics;
541 }
542
543 let result = VectorOperationResult {
544 operation_id,
545 success: optimization_result.success,
546 embeddings_processed: optimization_result.embeddings_affected,
547 processing_time_ms: processing_time,
548 index_stats: index_metadata.get(index_name).cloned(),
549 performance_metrics: OperationMetrics {
550 cpu_time_ms: processing_time,
551 peak_memory_mb: optimization_result.peak_memory_usage,
552 io_operations: optimization_result.io_operations,
553 cache_hit_rate: 0.9,
554 throughput_eps: 0.0, },
556 errors: optimization_result.errors,
557 metadata: optimization_result.metadata,
558 };
559
560 self.store_operation_result(result.clone()).await?;
561 Ok(result)
562 }
563
564 pub async fn get_index_stats(&self, index_name: &str) -> RragResult<Option<IndexStats>> {
566 let metadata = self.index_metadata.read().await;
567 Ok(metadata.get(index_name).cloned())
568 }
569
570 pub async fn get_all_index_stats(&self) -> RragResult<HashMap<String, IndexStats>> {
572 let metadata = self.index_metadata.read().await;
573 Ok(metadata.clone())
574 }
575
576 pub async fn get_metrics(&self) -> VectorUpdateMetrics {
578 let mut metrics = self.metrics.read().await.clone();
579
580 metrics.system_performance.queue_depth = {
582 let pending = self.pending_operations.read().await;
583 pending.len()
584 };
585
586 metrics.system_performance.active_operations = {
587 let batches = self.active_batches.read().await;
588 batches.len()
589 };
590
591 metrics.last_updated = chrono::Utc::now();
592 metrics
593 }
594
595 pub async fn get_operation_history(
597 &self,
598 limit: Option<usize>,
599 ) -> RragResult<Vec<VectorOperationResult>> {
600 let history = self.operation_history.read().await;
601 let limit = limit.unwrap_or(history.len());
602 Ok(history.iter().rev().take(limit).cloned().collect())
603 }
604
605 pub async fn health_check(&self) -> RragResult<bool> {
607 let handles = self.task_handles.lock().await;
608 let all_running = handles.iter().all(|handle| !handle.is_finished());
609
610 let metrics = self.get_metrics().await;
611 let healthy_performance = metrics.system_performance.health_score > 0.8;
612 let low_error_rate = metrics.success_rate > 0.9;
613
614 Ok(all_running && healthy_performance && low_error_rate)
615 }
616
617 async fn start_background_tasks(&self) -> RragResult<()> {
619 let mut handles = self.task_handles.lock().await;
620
621 handles.push(self.start_operation_processor().await);
623
624 if self.config.enable_batch_processing {
626 handles.push(self.start_batch_processor().await);
627 }
628
629 if self.config.enable_optimization {
631 handles.push(self.start_index_optimizer().await);
632 }
633
634 if self.config.monitoring.enable_performance_tracking {
636 handles.push(self.start_metrics_collector().await);
637 }
638
639 Ok(())
640 }
641
642 async fn start_operation_processor(&self) -> tokio::task::JoinHandle<()> {
644 let pending_operations = Arc::clone(&self.pending_operations);
645
646 tokio::spawn(async move {
647 loop {
648 let operation = {
649 let mut pending = pending_operations.write().await;
650 pending.pop_front()
651 };
652
653 if let Some(_op) = operation {
654 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
656 } else {
657 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
659 }
660 }
661 })
662 }
663
664 async fn start_batch_processor(&self) -> tokio::task::JoinHandle<()> {
666 let active_batches = Arc::clone(&self.active_batches);
667 let config = self.config.clone();
668
669 tokio::spawn(async move {
670 let mut interval =
671 tokio::time::interval(tokio::time::Duration::from_millis(config.batch_timeout_ms));
672
673 loop {
674 interval.tick().await;
675
676 let batches_to_process = {
678 let batches = active_batches.read().await;
679 batches.values().cloned().collect::<Vec<_>>()
680 };
681
682 for batch in batches_to_process {
683 if batch.operations.len() >= config.max_batch_size
684 || chrono::Utc::now()
685 .signed_duration_since(batch.created_at)
686 .num_milliseconds()
687 >= config.batch_timeout_ms as i64
688 {
689 {
691 let mut batches = active_batches.write().await;
692 batches.remove(&batch.batch_id);
693 }
694 }
696 }
697 }
698 })
699 }
700
701 async fn start_index_optimizer(&self) -> tokio::task::JoinHandle<()> {
703 let index_metadata = Arc::clone(&self.index_metadata);
704 let config = self.config.clone();
705
706 tokio::spawn(async move {
707 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
708 config.optimization_interval_secs,
709 ));
710
711 loop {
712 interval.tick().await;
713
714 let indexes_to_optimize = {
716 let metadata = index_metadata.read().await;
717 metadata
718 .keys()
719 .filter(|index_name| {
720 if let Some(stats) = metadata.get(*index_name) {
721 stats.last_optimized_at.map_or(true, |last_opt| {
723 chrono::Utc::now()
724 .signed_duration_since(last_opt)
725 .num_hours()
726 >= 1
727 })
728 } else {
729 false
730 }
731 })
732 .cloned()
733 .collect::<Vec<String>>()
734 };
735
736 for index_name in indexes_to_optimize {
738 tracing::debug!("Triggering optimization for index: {}", index_name);
740 }
741 }
742 })
743 }
744
745 async fn start_metrics_collector(&self) -> tokio::task::JoinHandle<()> {
747 let metrics = Arc::clone(&self.metrics);
748 let config = self.config.clone();
749
750 tokio::spawn(async move {
751 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
752 config.monitoring.metrics_interval_secs,
753 ));
754
755 loop {
756 interval.tick().await;
757
758 let mut metrics_guard = metrics.write().await;
760
761 metrics_guard.system_performance = SystemPerformanceMetrics {
762 overall_throughput_eps: 100.0, memory_usage_mb: 256.0, cpu_utilization_percent: 45.0, queue_depth: 0, active_operations: 0, health_score: 0.95, };
769
770 metrics_guard.last_updated = chrono::Utc::now();
771 }
772 })
773 }
774
775 async fn add_to_batch(&self, operation: VectorOperation) -> RragResult<()> {
777 let index_name = self.extract_index_name(&operation)?;
778 let batch_id = format!("batch_{}", index_name);
779
780 let mut batches = self.active_batches.write().await;
781 let batch = batches
782 .entry(batch_id.clone())
783 .or_insert_with(|| VectorBatch {
784 batch_id: batch_id.clone(),
785 operations: Vec::new(),
786 created_at: chrono::Utc::now(),
787 index_name: index_name.clone(),
788 priority: 5,
789 estimated_duration_ms: 1000,
790 metadata: HashMap::new(),
791 });
792
793 batch.operations.push(operation);
794 Ok(())
795 }
796
797 async fn process_immediate(&self, _operation: VectorOperation) -> RragResult<()> {
799 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
801 Ok(())
802 }
803
804 async fn process_single_embedding_update(
806 &self,
807 _update: &EmbeddingUpdate,
808 _index_name: &str,
809 ) -> RragResult<()> {
810 tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
812 Ok(())
813 }
814
815 async fn update_index_stats(
817 &self,
818 index_name: &str,
819 processed_count: usize,
820 ) -> RragResult<IndexStats> {
821 let mut metadata = self.index_metadata.write().await;
822 let stats = metadata.entry(index_name.to_string()).or_insert_with(|| {
823 IndexStats {
824 index_name: index_name.to_string(),
825 embedding_count: 0,
826 size_bytes: 0,
827 dimensions: 768, index_type: "flat".to_string(),
829 memory_usage_bytes: 0,
830 last_optimized_at: None,
831 quality_metrics: IndexQualityMetrics {
832 recall_at_10: 0.9,
833 precision: 0.85,
834 freshness_score: 1.0,
835 clustering_quality: 0.8,
836 distribution_balance: 0.75,
837 },
838 performance_metrics: IndexPerformanceMetrics {
839 avg_query_time_ms: 10.0,
840 p95_query_time_ms: 50.0,
841 queries_per_second: 100.0,
842 build_time_ms: 1000,
843 memory_efficiency: 0.8,
844 },
845 }
846 });
847
848 stats.embedding_count += processed_count;
849 stats.size_bytes += processed_count as u64 * 768 * 4; stats.memory_usage_bytes = stats.size_bytes * 2; Ok(stats.clone())
853 }
854
855 async fn store_operation_result(&self, result: VectorOperationResult) -> RragResult<()> {
857 let mut history = self.operation_history.write().await;
858 history.push_back(result.clone());
859
860 if history.len() > 1000 {
862 history.pop_front();
863 }
864
865 let mut metrics = self.metrics.write().await;
867 metrics.total_operations += 1;
868 metrics.total_embeddings_processed += result.embeddings_processed as u64;
869
870 if result.success {
871 metrics.success_rate = (metrics.success_rate * (metrics.total_operations - 1) as f32
872 + 1.0)
873 / metrics.total_operations as f32;
874 } else {
875 metrics.success_rate = (metrics.success_rate * (metrics.total_operations - 1) as f32)
876 / metrics.total_operations as f32;
877 }
878
879 metrics.avg_processing_time_ms = (metrics.avg_processing_time_ms
880 * (metrics.total_operations - 1) as f32
881 + result.processing_time_ms as f32)
882 / metrics.total_operations as f32;
883
884 Ok(())
885 }
886
887 fn extract_index_name(&self, operation: &VectorOperation) -> RragResult<String> {
889 match operation {
890 VectorOperation::Add { index_name, .. } => Ok(index_name.clone()),
891 VectorOperation::Update { index_name, .. } => Ok(index_name.clone()),
892 VectorOperation::Remove { index_name, .. } => Ok(index_name.clone()),
893 VectorOperation::Optimize { index_name, .. } => Ok(index_name.clone()),
894 VectorOperation::Rebuild { index_name, .. } => Ok(index_name.clone()),
895 VectorOperation::UpdateThresholds { index_name, .. } => Ok(index_name.clone()),
896 }
897 }
898
899 async fn perform_optimization(
901 &self,
902 _index_name: &str,
903 _optimization_type: &OptimizationType,
904 ) -> RragResult<OptimizationResult> {
905 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
907
908 Ok(OptimizationResult {
909 success: true,
910 embeddings_affected: 1000,
911 peak_memory_usage: 100.0,
912 io_operations: 1000,
913 errors: Vec::new(),
914 metadata: HashMap::new(),
915 new_quality_metrics: IndexQualityMetrics {
916 recall_at_10: 0.95,
917 precision: 0.90,
918 freshness_score: 1.0,
919 clustering_quality: 0.85,
920 distribution_balance: 0.80,
921 },
922 new_performance_metrics: IndexPerformanceMetrics {
923 avg_query_time_ms: 8.0,
924 p95_query_time_ms: 40.0,
925 queries_per_second: 120.0,
926 build_time_ms: 800,
927 memory_efficiency: 0.85,
928 },
929 })
930 }
931}
932
933#[derive(Debug)]
935struct OptimizationResult {
936 success: bool,
937 embeddings_affected: usize,
938 peak_memory_usage: f32,
939 io_operations: u64,
940 errors: Vec<String>,
941 metadata: HashMap<String, serde_json::Value>,
942 new_quality_metrics: IndexQualityMetrics,
943 new_performance_metrics: IndexPerformanceMetrics,
944}
945
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Embedding;

    /// Manager built from the default configuration.
    async fn default_manager() -> VectorUpdateManager {
        VectorUpdateManager::new(VectorUpdateConfig::default())
            .await
            .unwrap()
    }

    /// Small fixed embedding shared by the tests.
    fn sample_embedding() -> Embedding {
        Embedding::new("test_id".to_string(), vec![0.1, 0.2, 0.3])
    }

    /// `Add` operation carrying the sample embedding for `test_index`.
    fn sample_add_operation() -> VectorOperation {
        VectorOperation::Add {
            embeddings: vec![sample_embedding()],
            index_name: "test_index".to_string(),
        }
    }

    /// Asserts that every pair of distinct positions holds different labels.
    fn assert_pairwise_distinct(labels: &[String]) {
        for (i, left) in labels.iter().enumerate() {
            for (j, right) in labels.iter().enumerate() {
                if i != j {
                    assert_ne!(left, right);
                }
            }
        }
    }

    #[tokio::test]
    async fn test_vector_update_manager_creation() {
        let manager = default_manager().await;
        let healthy = manager.health_check().await.unwrap();
        assert!(healthy);
    }

    #[tokio::test]
    async fn test_submit_operation() {
        let manager = default_manager().await;

        let op_id = manager
            .submit_operation(sample_add_operation())
            .await
            .unwrap();

        assert!(!op_id.is_empty());
    }

    #[tokio::test]
    async fn test_embedding_updates() {
        let manager = default_manager().await;

        let update = EmbeddingUpdate {
            embedding_id: "test_id".to_string(),
            new_embedding: sample_embedding(),
            update_reason: UpdateReason::ContentChanged,
            metadata: HashMap::new(),
        };

        let result = manager
            .process_embedding_updates(vec![update], "test_index")
            .await
            .unwrap();

        assert!(result.success);
        assert_eq!(result.embeddings_processed, 1);
    }

    #[tokio::test]
    async fn test_index_optimization() {
        let manager = default_manager().await;

        let result = manager
            .optimize_index("test_index", OptimizationType::Compact)
            .await
            .unwrap();

        assert!(result.success);
        assert!(result.processing_time_ms > 0);
    }

    #[tokio::test]
    async fn test_metrics_collection() {
        let manager = default_manager().await;

        manager
            .submit_operation(sample_add_operation())
            .await
            .unwrap();

        let health = manager.get_metrics().await.system_performance.health_score;
        assert!(health >= 0.0);
        assert!(health <= 1.0);
    }

    #[test]
    fn test_update_strategies() {
        let strategies = vec![
            IndexUpdateStrategy::Immediate,
            IndexUpdateStrategy::Batch,
            IndexUpdateStrategy::Lazy,
            IndexUpdateStrategy::Adaptive,
            IndexUpdateStrategy::Custom("custom".to_string()),
        ];

        // Variants are compared through their `Debug` rendering.
        let labels: Vec<String> = strategies.iter().map(|s| format!("{:?}", s)).collect();
        assert_pairwise_distinct(&labels);
    }

    #[test]
    fn test_optimization_types() {
        let opt_types = vec![
            OptimizationType::Compact,
            OptimizationType::RebuildTrees,
            OptimizationType::UpdateClustering,
            OptimizationType::QueryOptimization,
            OptimizationType::MemoryOptimization,
            OptimizationType::Full,
        ];

        // Variants are compared through their `Debug` rendering.
        let labels: Vec<String> = opt_types.iter().map(|t| format!("{:?}", t)).collect();
        assert_pairwise_distinct(&labels);
    }
}