1use crate::incremental::change_detection::{ChangeResult, ChangeType};
7use crate::{Document, DocumentChunk, Embedding, RragError, RragResult};
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::sync::Arc;
11use tokio::sync::{Mutex, RwLock};
12use uuid::Uuid;
13
/// Configuration for [`IncrementalIndexManager`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexManagerConfig {
    /// Hard cap on queued operations; `submit_update` returns a
    /// `queue_full` storage error once the pending queue reaches this size.
    pub max_pending_operations: usize,

    /// Intended batch size for grouped processing.
    /// NOTE(review): not read anywhere in this module — confirm usage.
    pub batch_size: usize,

    /// Per-operation timeout in seconds.
    /// NOTE(review): not enforced anywhere in this module — confirm usage.
    pub operation_timeout_secs: u64,

    /// Whether conflict resolution is enabled.
    /// NOTE(review): not consulted in this module's visible code.
    pub enable_conflict_resolution: bool,

    /// Strategy handed to the internal `ConflictResolver` at construction.
    pub conflict_resolution: ConflictResolutionStrategy,

    /// Whether the operation log is enabled.
    /// NOTE(review): the completed-operation log is kept regardless of this
    /// flag; only `max_operation_log` bounds it.
    pub enable_operation_log: bool,

    /// Maximum number of completed operations retained in the in-memory log.
    pub max_operation_log: usize,

    /// When true, a background task periodically trims the completed log.
    pub enable_auto_cleanup: bool,

    /// Interval, in seconds, between cleanup passes.
    pub cleanup_interval_secs: u64,
}
44
/// How concurrent/conflicting index operations should be reconciled.
///
/// NOTE(review): no resolution logic is visible in this module; the variants
/// are currently descriptive only, with meanings following their names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictResolutionStrategy {
    /// The most recent write overrides earlier ones.
    LastWriteWins,
    /// The earliest write is kept; later conflicting writes are dropped.
    FirstWriteWins,
    /// Conflicting changes are merged together.
    Merge,
    /// Conflicts are queued for manual resolution.
    Manual,
    /// Resolution is decided by comparing operation timestamps.
    Timestamp,
    /// A named, externally-defined resolution routine.
    Custom(String),
}
61
62impl Default for IndexManagerConfig {
63 fn default() -> Self {
64 Self {
65 max_pending_operations: 10000,
66 batch_size: 100,
67 operation_timeout_secs: 300, enable_conflict_resolution: true,
69 conflict_resolution: ConflictResolutionStrategy::LastWriteWins,
70 enable_operation_log: true,
71 max_operation_log: 10000,
72 enable_auto_cleanup: true,
73 cleanup_interval_secs: 3600, }
75 }
76}
77
/// A single mutation to apply to the index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexOperation {
    /// Index a new document together with its chunks and embeddings.
    Add {
        document: Document,
        chunks: Vec<DocumentChunk>,
        embeddings: Vec<Embedding>,
    },

    /// Replace an existing document's content, chunks, and embeddings,
    /// carrying the change-detection result that triggered the update.
    Update {
        document_id: String,
        document: Document,
        chunks: Vec<DocumentChunk>,
        embeddings: Vec<Embedding>,
        change_result: ChangeResult,
    },

    /// Remove a document from the index.
    Delete { document_id: String },

    /// Refresh only the embeddings of an existing document.
    UpdateEmbeddings {
        document_id: String,
        embeddings: Vec<Embedding>,
    },

    /// Refresh only the chunks of an existing document.
    UpdateChunks {
        document_id: String,
        chunks: Vec<DocumentChunk>,
    },

    /// A group of operations submitted together.
    /// NOTE(review): the processor currently treats this as a marker and
    /// does not execute the nested operations.
    Batch { operations: Vec<IndexOperation> },

    /// Rebuild a named index over the given documents.
    /// NOTE(review): currently a placeholder in the processor.
    Rebuild {
        index_name: String,
        document_ids: Vec<String>,
    },
}
121
/// An operation plus the scheduling/bookkeeping metadata needed to queue it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexUpdate {
    /// Unique id used to track the operation through the queue and logs.
    pub operation_id: String,

    /// The actual index mutation to perform.
    pub operation: IndexOperation,

    /// Priority in 0–10 (upper bound enforced by `validate_update`).
    /// NOTE(review): the pending queue itself is FIFO; priority is not used
    /// for ordering in this module's visible code.
    pub priority: u8,

    /// When the update was created.
    pub timestamp: chrono::DateTime<chrono::Utc>,

    /// Originating component (e.g. "batch_processor", "test").
    pub source: String,

    /// Free-form metadata attached by the submitter.
    pub metadata: HashMap<String, serde_json::Value>,

    /// Ids of operations this one depends on.
    /// NOTE(review): dependency ordering is not enforced in this module.
    pub dependencies: Vec<String>,

    /// Maximum retry attempts allowed for this operation.
    pub max_retries: u32,

    /// Number of retries performed so far.
    /// NOTE(review): no retry loop is visible in this module.
    pub retry_count: u32,
}
152
/// Outcome of processing a single [`IndexUpdate`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateResult {
    /// Id of the operation this result belongs to.
    pub operation_id: String,

    /// Whether processing succeeded.
    pub success: bool,

    /// Labels of the sub-steps that completed (e.g. "add", "update").
    pub operations_completed: Vec<String>,

    /// Conflicts detected while processing.
    pub conflicts: Vec<ConflictInfo>,

    /// Wall-clock processing time in milliseconds.
    pub processing_time_ms: u64,

    /// Count of items touched (document + chunks + embeddings).
    pub items_affected: usize,

    /// Error message when `success` is false.
    pub error: Option<String>,

    /// Additional result metadata.
    pub metadata: HashMap<String, serde_json::Value>,
}
180
/// Details about a conflict detected between index operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictInfo {
    /// Document the conflict concerns.
    pub document_id: String,

    /// What kind of conflict occurred.
    pub conflict_type: ConflictType,

    /// Ids of the operations involved in the conflict.
    pub conflicting_operations: Vec<String>,

    /// How (or whether) the conflict was resolved.
    pub resolution: ConflictResolution,

    /// Extra context for diagnostics or manual resolution.
    pub context: HashMap<String, serde_json::Value>,
}
199
/// Categories of conflicts between index operations.
///
/// NOTE(review): no detection logic for these is visible in this module;
/// descriptions follow the variant names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictType {
    /// Two operations modified the same document at the same time.
    ConcurrentUpdate,
    /// An operation was based on a stale document version.
    VersionMismatch,
    /// An operation's declared dependencies conflict.
    DependencyConflict,
    /// A required resource/lock was unavailable.
    ResourceLock,
    /// The operation is incompatible with the index schema.
    SchemaConflict,
}
214
/// Outcome of attempting to resolve a conflict.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictResolution {
    /// Resolved automatically; the payload describes how.
    AutoResolved(String),
    /// Resolved by a human; the payload describes how.
    ManuallyResolved(String),
    /// Resolution postponed.
    Deferred,
    /// Resolution was attempted and failed, with a reason.
    Failed(String),
}
227
/// Lifecycle state of a tracked operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OperationStatus {
    /// Waiting in the pending queue.
    Queued,
    /// Currently being executed by the processor task.
    Processing,
    /// Finished successfully.
    Completed,
    /// Finished with an error (message attached).
    Failed(String),
    /// Removed via `cancel_operation` before completion.
    Cancelled,
    /// Waiting on something external (e.g. a dependency).
    /// NOTE(review): never assigned in this module's visible code.
    Waiting,
    /// Held for conflict resolution.
    /// NOTE(review): never assigned in this module's visible code.
    ConflictResolution,
}
246
/// Internal wrapper pairing an [`IndexUpdate`] with its runtime bookkeeping.
#[derive(Debug, Clone)]
struct TrackedOperation {
    /// The submitted update being tracked.
    update: IndexUpdate,

    /// Current lifecycle state.
    status: OperationStatus,

    /// When processing started (`None` while still queued).
    start_time: Option<chrono::DateTime<chrono::Utc>>,

    /// When processing finished or was cancelled.
    end_time: Option<chrono::DateTime<chrono::Utc>>,

    /// Result recorded once processing completes.
    result: Option<UpdateResult>,
}
265
/// Asynchronous manager that queues, processes, and tracks incremental
/// index operations using background tokio tasks.
pub struct IncrementalIndexManager {
    /// Behavior/limits configuration.
    config: IndexManagerConfig,

    /// FIFO queue of operations awaiting processing.
    pending_operations: Arc<Mutex<VecDeque<TrackedOperation>>>,

    /// Operations currently being executed, keyed by operation id.
    processing_operations: Arc<RwLock<HashMap<String, TrackedOperation>>>,

    /// Bounded log of finished (or cancelled) operations.
    completed_operations: Arc<RwLock<VecDeque<TrackedOperation>>>,

    /// Shared view of what is currently indexed.
    index_state: Arc<RwLock<IndexState>>,

    /// Conflict-resolution strategy and bookkeeping.
    conflict_resolver: Arc<ConflictResolver>,

    /// Aggregate runtime statistics.
    stats: Arc<RwLock<IndexManagerStats>>,

    /// Join handles of spawned background tasks, used by `health_check`.
    task_handles: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
}
292
/// Mutable snapshot of what the index currently contains.
#[derive(Debug)]
struct IndexState {
    /// Ids of documents currently present in the index.
    indexed_documents: HashSet<String>,

    /// Monotonically increasing version counter per document.
    document_versions: HashMap<String, u64>,

    /// Per-document locks.
    /// NOTE(review): never populated or consulted in this module's visible
    /// code — confirm whether this is dead state.
    document_locks: HashMap<String, tokio::sync::Mutex<()>>,

    /// Arbitrary state metadata, surfaced via `get_index_state`.
    metadata: HashMap<String, serde_json::Value>,

    /// Timestamp of the most recent state mutation.
    last_updated: chrono::DateTime<chrono::Utc>,
}
311
/// Holds the configured conflict strategy plus queues/history for
/// resolution bookkeeping.
///
/// NOTE(review): no resolution methods are visible in this module; these
/// fields are constructed but not yet consumed.
struct ConflictResolver {
    /// Strategy copied from the manager configuration.
    strategy: ConflictResolutionStrategy,

    /// Conflicts awaiting manual resolution.
    manual_queue: Arc<Mutex<VecDeque<ConflictInfo>>>,

    /// Record of conflicts that have been resolved.
    resolution_history: Arc<RwLock<Vec<ConflictInfo>>>,
}
323
/// Aggregate runtime statistics, snapshot-able via `get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexManagerStats {
    /// Total operations processed since startup.
    pub total_operations: u64,

    /// Operation counts keyed by a coarse type label derived from the
    /// operation's Debug representation.
    pub operations_by_type: HashMap<String, u64>,

    /// Fraction of processed operations that succeeded (0.0–1.0).
    pub success_rate: f64,

    /// Rolling average processing time per operation, in milliseconds.
    pub avg_processing_time_ms: f64,

    /// Total number of conflicts detected.
    pub total_conflicts: u64,

    /// Conflicts that were resolved automatically.
    pub auto_resolved_conflicts: u64,

    /// Current depth of the pending queue.
    pub current_queue_depth: usize,
    /// Largest queue depth observed.
    pub max_queue_depth: usize,

    /// Observed throughput.
    /// NOTE(review): never updated in this module's visible code.
    pub throughput_ops_per_second: f64,

    /// When these statistics were last refreshed.
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
355
356impl IncrementalIndexManager {
357 pub async fn new(config: IndexManagerConfig) -> RragResult<Self> {
359 let pending_operations = Arc::new(Mutex::new(VecDeque::new()));
360 let processing_operations = Arc::new(RwLock::new(HashMap::new()));
361 let completed_operations = Arc::new(RwLock::new(VecDeque::new()));
362
363 let index_state = Arc::new(RwLock::new(IndexState {
364 indexed_documents: HashSet::new(),
365 document_versions: HashMap::new(),
366 document_locks: HashMap::new(),
367 metadata: HashMap::new(),
368 last_updated: chrono::Utc::now(),
369 }));
370
371 let conflict_resolver = Arc::new(ConflictResolver {
372 strategy: config.conflict_resolution.clone(),
373 manual_queue: Arc::new(Mutex::new(VecDeque::new())),
374 resolution_history: Arc::new(RwLock::new(Vec::new())),
375 });
376
377 let stats = Arc::new(RwLock::new(IndexManagerStats {
378 total_operations: 0,
379 operations_by_type: HashMap::new(),
380 success_rate: 0.0,
381 avg_processing_time_ms: 0.0,
382 total_conflicts: 0,
383 auto_resolved_conflicts: 0,
384 current_queue_depth: 0,
385 max_queue_depth: 0,
386 throughput_ops_per_second: 0.0,
387 last_updated: chrono::Utc::now(),
388 }));
389
390 let task_handles = Arc::new(Mutex::new(Vec::new()));
391
392 let manager = Self {
393 config,
394 pending_operations,
395 processing_operations,
396 completed_operations,
397 index_state,
398 conflict_resolver,
399 stats,
400 task_handles,
401 };
402
403 manager.start_background_tasks().await?;
405
406 Ok(manager)
407 }
408
409 pub async fn submit_update(&self, update: IndexUpdate) -> RragResult<String> {
411 self.validate_update(&update).await?;
413
414 let tracked_op = TrackedOperation {
416 update: update.clone(),
417 status: OperationStatus::Queued,
418 start_time: None,
419 end_time: None,
420 result: None,
421 };
422
423 {
425 let mut queue = self.pending_operations.lock().await;
426
427 if queue.len() >= self.config.max_pending_operations {
429 return Err(RragError::storage(
430 "queue_full",
431 std::io::Error::new(std::io::ErrorKind::Other, "Operation queue is full"),
432 ));
433 }
434
435 queue.push_back(tracked_op);
436 }
437
438 {
440 let mut stats = self.stats.write().await;
441 stats.current_queue_depth = {
442 let queue = self.pending_operations.lock().await;
443 queue.len()
444 };
445 stats.max_queue_depth = std::cmp::max(stats.max_queue_depth, stats.current_queue_depth);
446 }
447
448 Ok(update.operation_id)
449 }
450
    /// Submit a group of updates together.
    ///
    /// Each update is submitted individually, and an additional `Batch`
    /// operation wrapping clones of all the member operations is submitted
    /// as well, so the returned id list has `operations.len() + 1` entries
    /// (the batch id last).
    ///
    /// NOTE(review): the Batch wrapper currently acts only as a marker —
    /// `process_operation` does not execute nested batch operations — so the
    /// real work is done by the individually-submitted updates. Confirm this
    /// duplication is intentional.
    pub async fn submit_batch(&self, operations: Vec<IndexUpdate>) -> RragResult<Vec<String>> {
        if operations.is_empty() {
            return Ok(Vec::new());
        }

        // Wrap clones of every member operation in a single Batch marker.
        let batch_id = Uuid::new_v4().to_string();
        let batch_operation = IndexOperation::Batch {
            operations: operations.iter().map(|op| op.operation.clone()).collect(),
        };

        let batch_update = IndexUpdate {
            operation_id: batch_id.clone(),
            operation: batch_operation,
            // The batch inherits the highest priority among its members.
            priority: operations.iter().map(|op| op.priority).max().unwrap_or(5),
            timestamp: chrono::Utc::now(),
            source: "batch_processor".to_string(),
            metadata: HashMap::new(),
            dependencies: Vec::new(),
            max_retries: 3,
            retry_count: 0,
        };

        // Submit each member operation on its own.
        let mut operation_ids = Vec::new();
        for operation in operations {
            let op_id = self.submit_update(operation).await?;
            operation_ids.push(op_id);
        }

        // Then submit the batch marker itself.
        self.submit_update(batch_update).await?;
        operation_ids.push(batch_id);

        Ok(operation_ids)
    }
488
489 pub async fn get_operation_status(
491 &self,
492 operation_id: &str,
493 ) -> RragResult<Option<OperationStatus>> {
494 {
496 let processing = self.processing_operations.read().await;
497 if let Some(op) = processing.get(operation_id) {
498 return Ok(Some(op.status.clone()));
499 }
500 }
501
502 {
504 let queue = self.pending_operations.lock().await;
505 for op in queue.iter() {
506 if op.update.operation_id == operation_id {
507 return Ok(Some(op.status.clone()));
508 }
509 }
510 }
511
512 {
514 let completed = self.completed_operations.read().await;
515 for op in completed.iter() {
516 if op.update.operation_id == operation_id {
517 return Ok(Some(op.status.clone()));
518 }
519 }
520 }
521
522 Ok(None)
523 }
524
525 pub async fn get_operation_result(
527 &self,
528 operation_id: &str,
529 ) -> RragResult<Option<UpdateResult>> {
530 {
532 let processing = self.processing_operations.read().await;
533 if let Some(op) = processing.get(operation_id) {
534 return Ok(op.result.clone());
535 }
536 }
537
538 {
540 let completed = self.completed_operations.read().await;
541 for op in completed.iter() {
542 if op.update.operation_id == operation_id {
543 return Ok(op.result.clone());
544 }
545 }
546 }
547
548 Ok(None)
549 }
550
551 pub async fn cancel_operation(&self, operation_id: &str) -> RragResult<bool> {
553 {
555 let mut queue = self.pending_operations.lock().await;
556 if let Some(pos) = queue
557 .iter()
558 .position(|op| op.update.operation_id == operation_id)
559 {
560 queue.remove(pos);
561 return Ok(true);
562 }
563 }
564
565 {
567 let mut processing = self.processing_operations.write().await;
568 if let Some(mut op) = processing.remove(operation_id) {
569 op.status = OperationStatus::Cancelled;
570 op.end_time = Some(chrono::Utc::now());
571
572 let mut completed = self.completed_operations.write().await;
574 completed.push_back(op);
575
576 return Ok(true);
577 }
578 }
579
580 Ok(false)
581 }
582
583 pub async fn get_stats(&self) -> IndexManagerStats {
585 let mut stats = self.stats.read().await.clone();
586 stats.current_queue_depth = {
587 let queue = self.pending_operations.lock().await;
588 queue.len()
589 };
590 stats.last_updated = chrono::Utc::now();
591 stats
592 }
593
594 pub async fn get_index_state(&self) -> RragResult<HashMap<String, serde_json::Value>> {
596 let state = self.index_state.read().await;
597 let mut info = HashMap::new();
598
599 info.insert(
600 "indexed_documents_count".to_string(),
601 serde_json::Value::Number(state.indexed_documents.len().into()),
602 );
603 info.insert(
604 "last_updated".to_string(),
605 serde_json::Value::String(state.last_updated.to_rfc3339()),
606 );
607 info.insert(
608 "metadata".to_string(),
609 serde_json::Value::Object(state.metadata.clone().into_iter().collect()),
610 );
611
612 Ok(info)
613 }
614
615 pub async fn health_check(&self) -> RragResult<bool> {
617 let handles = self.task_handles.lock().await;
619 let all_running = handles.iter().all(|handle| !handle.is_finished());
620
621 let queue_size = {
623 let queue = self.pending_operations.lock().await;
624 queue.len()
625 };
626 let queue_healthy = queue_size < self.config.max_pending_operations;
627
628 Ok(all_running && queue_healthy)
629 }
630
631 async fn start_background_tasks(&self) -> RragResult<()> {
633 let mut handles = self.task_handles.lock().await;
634
635 let processor_handle = self.start_operation_processor().await;
637 handles.push(processor_handle);
638
639 if self.config.enable_auto_cleanup {
641 let cleanup_handle = self.start_cleanup_task().await;
642 handles.push(cleanup_handle);
643 }
644
645 Ok(())
646 }
647
648 async fn start_operation_processor(&self) -> tokio::task::JoinHandle<()> {
650 let pending_ops = Arc::clone(&self.pending_operations);
651 let processing_ops = Arc::clone(&self.processing_operations);
652 let completed_ops = Arc::clone(&self.completed_operations);
653 let index_state = Arc::clone(&self.index_state);
654 let conflict_resolver = Arc::clone(&self.conflict_resolver);
655 let stats = Arc::clone(&self.stats);
656 let config = self.config.clone();
657
658 tokio::spawn(async move {
659 loop {
660 let operation = {
662 let mut queue = pending_ops.lock().await;
663 queue.pop_front()
664 };
665
666 if let Some(mut tracked_op) = operation {
667 tracked_op.status = OperationStatus::Processing;
668 tracked_op.start_time = Some(chrono::Utc::now());
669
670 let operation_id = tracked_op.update.operation_id.clone();
671
672 {
674 let mut processing = processing_ops.write().await;
675 processing.insert(operation_id.clone(), tracked_op.clone());
676 }
677
678 let result = Self::process_operation(
680 &tracked_op.update,
681 &index_state,
682 &conflict_resolver,
683 &config,
684 )
685 .await;
686
687 tracked_op.end_time = Some(chrono::Utc::now());
689 tracked_op.result = Some(result.clone());
690 tracked_op.status = if result.success {
691 OperationStatus::Completed
692 } else {
693 OperationStatus::Failed(result.error.unwrap_or_default())
694 };
695
696 let op_type = format!("{:?}", tracked_op.update.operation)
698 .split('{')
699 .next()
700 .unwrap_or("Unknown")
701 .to_string();
702
703 {
705 let mut processing = processing_ops.write().await;
706 processing.remove(&operation_id);
707 }
708 {
709 let mut completed = completed_ops.write().await;
710 completed.push_back(tracked_op);
711
712 if completed.len() > config.max_operation_log {
714 completed.pop_front();
715 }
716 }
717
718 {
720 let mut stats_guard = stats.write().await;
721 stats_guard.total_operations += 1;
722
723 *stats_guard.operations_by_type.entry(op_type).or_insert(0) += 1;
724
725 stats_guard.success_rate = if stats_guard.total_operations > 0 {
726 let successful = stats_guard.operations_by_type.values().sum::<u64>();
727 successful as f64 / stats_guard.total_operations as f64
728 } else {
729 0.0
730 };
731
732 stats_guard.avg_processing_time_ms = (stats_guard.avg_processing_time_ms
733 + result.processing_time_ms as f64)
734 / 2.0;
735
736 stats_guard.last_updated = chrono::Utc::now();
737 }
738 } else {
739 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
741 }
742 }
743 })
744 }
745
746 async fn start_cleanup_task(&self) -> tokio::task::JoinHandle<()> {
748 let completed_ops = Arc::clone(&self.completed_operations);
749 let config = self.config.clone();
750
751 tokio::spawn(async move {
752 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
753 config.cleanup_interval_secs,
754 ));
755
756 loop {
757 interval.tick().await;
758
759 {
761 let mut completed = completed_ops.write().await;
762 while completed.len() > config.max_operation_log {
763 completed.pop_front();
764 }
765 }
766 }
767 })
768 }
769
    /// Execute a single operation against the shared index state and build
    /// its `UpdateResult` (success flag, items affected, timing, conflicts).
    ///
    /// NOTE(review): the `Batch` and `Rebuild` arms are placeholders — they
    /// only record counts and do not execute nested operations or rebuild
    /// anything. Errors from the per-type helpers are collapsed into a
    /// generic "Operation failed" message.
    async fn process_operation(
        update: &IndexUpdate,
        index_state: &Arc<RwLock<IndexState>>,
        conflict_resolver: &Arc<ConflictResolver>,
        _config: &IndexManagerConfig,
    ) -> UpdateResult {
        let start_time = std::time::Instant::now();
        let mut conflicts = Vec::new();
        let mut items_affected = 0;
        let mut operations_completed = Vec::new();

        // Dispatch on operation type; each arm records what completed and
        // how many items were touched, and maps helper errors to `false`.
        let success = match &update.operation {
            IndexOperation::Add {
                document,
                chunks,
                embeddings,
            } => {
                match Self::process_add_operation(document, chunks, embeddings, index_state).await {
                    Ok(count) => {
                        items_affected = count;
                        operations_completed.push("add".to_string());
                        true
                    }
                    Err(_) => false,
                }
            }

            IndexOperation::Update {
                document_id,
                document,
                chunks,
                embeddings,
                change_result,
            } => {
                match Self::process_update_operation(
                    document_id,
                    document,
                    chunks,
                    embeddings,
                    change_result,
                    index_state,
                    conflict_resolver,
                )
                .await
                {
                    Ok((count, detected_conflicts)) => {
                        items_affected = count;
                        conflicts = detected_conflicts;
                        operations_completed.push("update".to_string());
                        true
                    }
                    Err(_) => false,
                }
            }

            IndexOperation::Delete { document_id } => {
                match Self::process_delete_operation(document_id, index_state).await {
                    Ok(count) => {
                        items_affected = count;
                        operations_completed.push("delete".to_string());
                        true
                    }
                    Err(_) => false,
                }
            }

            IndexOperation::UpdateEmbeddings {
                document_id,
                embeddings,
            } => match Self::process_embedding_update(document_id, embeddings, index_state).await {
                Ok(count) => {
                    items_affected = count;
                    operations_completed.push("update_embeddings".to_string());
                    true
                }
                Err(_) => false,
            },

            IndexOperation::UpdateChunks {
                document_id,
                chunks,
            } => match Self::process_chunk_update(document_id, chunks, index_state).await {
                Ok(count) => {
                    items_affected = count;
                    operations_completed.push("update_chunks".to_string());
                    true
                }
                Err(_) => false,
            },

            // Placeholder: counts members but does not run them.
            IndexOperation::Batch { operations } => {
                operations_completed.push("batch".to_string());
                items_affected = operations.len();
                true
            }

            // Placeholder: counts documents but performs no rebuild.
            IndexOperation::Rebuild {
                index_name: _,
                document_ids,
            } => {
                operations_completed.push("rebuild".to_string());
                items_affected = document_ids.len();
                true
            }
        };

        UpdateResult {
            operation_id: update.operation_id.clone(),
            success,
            operations_completed,
            conflicts,
            processing_time_ms: start_time.elapsed().as_millis() as u64,
            items_affected,
            error: if success {
                None
            } else {
                Some("Operation failed".to_string())
            },
            metadata: HashMap::new(),
        }
    }
892
893 async fn process_add_operation(
895 document: &Document,
896 chunks: &[DocumentChunk],
897 embeddings: &[Embedding],
898 index_state: &Arc<RwLock<IndexState>>,
899 ) -> RragResult<usize> {
900 let mut state = index_state.write().await;
901
902 state.indexed_documents.insert(document.id.clone());
904 state.document_versions.insert(document.id.clone(), 1);
905 state.last_updated = chrono::Utc::now();
906
907 Ok(1 + chunks.len() + embeddings.len())
908 }
909
    /// Apply a document update to the index state: re-registers the document
    /// and bumps its version counter. Returns the number of items affected
    /// (document + chunks + embeddings) plus any detected conflicts
    /// (currently always empty — no conflict detection is implemented here).
    async fn process_update_operation(
        document_id: &str,
        document: &Document,
        chunks: &[DocumentChunk],
        embeddings: &[Embedding],
        change_result: &ChangeResult,
        index_state: &Arc<RwLock<IndexState>>,
        _conflict_resolver: &Arc<ConflictResolver>,
    ) -> RragResult<(usize, Vec<ConflictInfo>)> {
        let mut state = index_state.write().await;
        let conflicts = Vec::new();

        // NOTE(review): this branch is a stub — a NoChange result is detected
        // but not acted upon (no early return / skip), so unchanged documents
        // still get their version bumped below. Confirm intended behavior.
        if let Some(_current_version) = state.document_versions.get(document_id) {
            if change_result.change_type == ChangeType::NoChange {
            }
        }

        // Re-register the document and advance its version (0 -> 1 if new).
        state.indexed_documents.insert(document.id.clone());
        let new_version = state.document_versions.get(document_id).unwrap_or(&0) + 1;
        state
            .document_versions
            .insert(document_id.to_string(), new_version);
        state.last_updated = chrono::Utc::now();

        Ok((1 + chunks.len() + embeddings.len(), conflicts))
    }
941
942 async fn process_delete_operation(
944 document_id: &str,
945 index_state: &Arc<RwLock<IndexState>>,
946 ) -> RragResult<usize> {
947 let mut state = index_state.write().await;
948
949 let was_present = state.indexed_documents.remove(document_id);
950 state.document_versions.remove(document_id);
951 state.last_updated = chrono::Utc::now();
952
953 Ok(if was_present { 1 } else { 0 })
954 }
955
956 async fn process_embedding_update(
958 _document_id: &str,
959 embeddings: &[Embedding],
960 index_state: &Arc<RwLock<IndexState>>,
961 ) -> RragResult<usize> {
962 let mut state = index_state.write().await;
963 state.last_updated = chrono::Utc::now();
964 Ok(embeddings.len())
965 }
966
967 async fn process_chunk_update(
969 _document_id: &str,
970 chunks: &[DocumentChunk],
971 index_state: &Arc<RwLock<IndexState>>,
972 ) -> RragResult<usize> {
973 let mut state = index_state.write().await;
974 state.last_updated = chrono::Utc::now();
975 Ok(chunks.len())
976 }
977
978 async fn validate_update(&self, update: &IndexUpdate) -> RragResult<()> {
980 if update.operation_id.is_empty() {
982 return Err(RragError::validation("operation_id", "non-empty", "empty"));
983 }
984
985 if update.priority > 10 {
986 return Err(RragError::validation(
987 "priority",
988 "0-10",
989 &update.priority.to_string(),
990 ));
991 }
992
993 match &update.operation {
995 IndexOperation::Add { document, .. } => {
996 if document.id.is_empty() {
997 return Err(RragError::validation("document.id", "non-empty", "empty"));
998 }
999 }
1000 IndexOperation::Update { document_id, .. } => {
1001 if document_id.is_empty() {
1002 return Err(RragError::validation("document_id", "non-empty", "empty"));
1003 }
1004 }
1005 IndexOperation::Delete { document_id } => {
1006 if document_id.is_empty() {
1007 return Err(RragError::validation("document_id", "non-empty", "empty"));
1008 }
1009 }
1010 _ => {} }
1012
1013 Ok(())
1014 }
1015}
1016
1017#[cfg(test)]
1018mod tests {
1019 use super::*;
1020 use crate::Document;
1021
    /// Manager construction should succeed and report healthy immediately.
    #[tokio::test]
    async fn test_index_manager_creation() {
        let config = IndexManagerConfig::default();
        let manager = IncrementalIndexManager::new(config).await.unwrap();
        assert!(manager.health_check().await.unwrap());
    }
1028
1029 #[tokio::test]
1030 async fn test_submit_add_operation() {
1031 let manager = IncrementalIndexManager::new(IndexManagerConfig::default())
1032 .await
1033 .unwrap();
1034
1035 let doc = Document::new("Test content");
1036 let operation = IndexOperation::Add {
1037 document: doc.clone(),
1038 chunks: Vec::new(),
1039 embeddings: Vec::new(),
1040 };
1041
1042 let update = IndexUpdate {
1043 operation_id: Uuid::new_v4().to_string(),
1044 operation,
1045 priority: 5,
1046 timestamp: chrono::Utc::now(),
1047 source: "test".to_string(),
1048 metadata: HashMap::new(),
1049 dependencies: Vec::new(),
1050 max_retries: 3,
1051 retry_count: 0,
1052 };
1053
1054 let op_id = manager.submit_update(update).await.unwrap();
1055 assert!(!op_id.is_empty());
1056
1057 let status = manager.get_operation_status(&op_id).await.unwrap();
1059 assert!(status.is_some());
1060 }
1061
    /// Submitting three updates as a batch yields four ids: one per
    /// individual operation plus one for the wrapping Batch marker.
    #[tokio::test]
    async fn test_batch_operations() {
        let manager = IncrementalIndexManager::new(IndexManagerConfig::default())
            .await
            .unwrap();

        let mut operations = Vec::new();
        for i in 0..3 {
            let doc = Document::new(format!("Test content {}", i));
            let operation = IndexOperation::Add {
                document: doc,
                chunks: Vec::new(),
                embeddings: Vec::new(),
            };

            let update = IndexUpdate {
                operation_id: Uuid::new_v4().to_string(),
                operation,
                priority: 5,
                timestamp: chrono::Utc::now(),
                source: "test".to_string(),
                metadata: HashMap::new(),
                dependencies: Vec::new(),
                max_retries: 3,
                retry_count: 0,
            };

            operations.push(update);
        }

        let op_ids = manager.submit_batch(operations).await.unwrap();
        assert_eq!(op_ids.len(), 4); // 3 individual ids + 1 batch id
    }
1095
1096 #[tokio::test]
1097 async fn test_operation_cancellation() {
1098 let manager = IncrementalIndexManager::new(IndexManagerConfig::default())
1099 .await
1100 .unwrap();
1101
1102 let doc = Document::new("Test content");
1103 let operation = IndexOperation::Add {
1104 document: doc,
1105 chunks: Vec::new(),
1106 embeddings: Vec::new(),
1107 };
1108
1109 let update = IndexUpdate {
1110 operation_id: Uuid::new_v4().to_string(),
1111 operation,
1112 priority: 5,
1113 timestamp: chrono::Utc::now(),
1114 source: "test".to_string(),
1115 metadata: HashMap::new(),
1116 dependencies: Vec::new(),
1117 max_retries: 3,
1118 retry_count: 0,
1119 };
1120
1121 let op_id = manager.submit_update(update).await.unwrap();
1122
1123 let cancelled = manager.cancel_operation(&op_id).await.unwrap();
1125 assert!(cancelled);
1126 }
1127
1128 #[test]
1129 fn test_conflict_resolution_strategies() {
1130 let strategies = vec![
1131 ConflictResolutionStrategy::LastWriteWins,
1132 ConflictResolutionStrategy::FirstWriteWins,
1133 ConflictResolutionStrategy::Merge,
1134 ConflictResolutionStrategy::Manual,
1135 ConflictResolutionStrategy::Timestamp,
1136 ConflictResolutionStrategy::Custom("custom_logic".to_string()),
1137 ];
1138
1139 for (i, strategy1) in strategies.iter().enumerate() {
1141 for (j, strategy2) in strategies.iter().enumerate() {
1142 if i != j {
1143 assert_ne!(format!("{:?}", strategy1), format!("{:?}", strategy2));
1144 }
1145 }
1146 }
1147 }
1148}