rexis_rag/incremental/
index_manager.rs

1//! # Incremental Index Manager
2//!
3//! Manages incremental updates to document indexes without requiring full rebuilds.
4//! Handles conflict resolution, operation queuing, and index consistency.
5
6use crate::incremental::change_detection::{ChangeResult, ChangeType};
7use crate::{Document, DocumentChunk, Embedding, RragError, RragResult};
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::sync::Arc;
11use tokio::sync::{Mutex, RwLock};
12use uuid::Uuid;
13
14/// Index manager configuration
/// Configuration for the incremental index manager.
///
/// Controls queue capacity, batching, timeouts, conflict handling,
/// operation logging, and background cleanup behavior.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexManagerConfig {
    /// Maximum number of operations allowed in the pending queue;
    /// submissions beyond this limit are rejected with a storage error.
    pub max_pending_operations: usize,

    /// Number of operations processed together in one batch.
    pub batch_size: usize,

    /// Timeout for a single operation, in seconds.
    pub operation_timeout_secs: u64,

    /// Whether conflict resolution is attempted at all.
    pub enable_conflict_resolution: bool,

    /// Strategy used when concurrent updates conflict.
    pub conflict_resolution: ConflictResolutionStrategy,

    /// Whether completed operations are retained in an in-memory log.
    pub enable_operation_log: bool,

    /// Maximum number of completed operations kept in the log
    /// (older entries are evicted first).
    pub max_operation_log: usize,

    /// Whether a periodic background cleanup task is started.
    pub enable_auto_cleanup: bool,

    /// Interval between cleanup passes, in seconds.
    pub cleanup_interval_secs: u64,
}
44
45/// Conflict resolution strategies
/// Strategies for resolving conflicting index operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictResolutionStrategy {
    /// The most recent write overrides earlier ones.
    LastWriteWins,
    /// The earliest write is kept; later writes are discarded.
    FirstWriteWins,
    /// Attempt to merge changes when possible.
    Merge,
    /// Queue the conflict for manual resolution by an operator.
    Manual,
    /// Resolve based on version timestamps.
    Timestamp,
    /// Custom resolution logic identified by name.
    Custom(String),
}
61
62impl Default for IndexManagerConfig {
63    fn default() -> Self {
64        Self {
65            max_pending_operations: 10000,
66            batch_size: 100,
67            operation_timeout_secs: 300, // 5 minutes
68            enable_conflict_resolution: true,
69            conflict_resolution: ConflictResolutionStrategy::LastWriteWins,
70            enable_operation_log: true,
71            max_operation_log: 10000,
72            enable_auto_cleanup: true,
73            cleanup_interval_secs: 3600, // 1 hour
74        }
75    }
76}
77
78/// Types of index operations
/// Types of index operations that can be queued against the index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexOperation {
    /// Add a new document together with its chunks and embeddings.
    Add {
        document: Document,
        chunks: Vec<DocumentChunk>,
        embeddings: Vec<Embedding>,
    },

    /// Replace an existing document's content, chunks, and embeddings,
    /// carrying the change-detection result that triggered the update.
    Update {
        document_id: String,
        document: Document,
        chunks: Vec<DocumentChunk>,
        embeddings: Vec<Embedding>,
        change_result: ChangeResult,
    },

    /// Delete a document and all data associated with it.
    Delete { document_id: String },

    /// Replace only the embeddings of a document.
    UpdateEmbeddings {
        document_id: String,
        embeddings: Vec<Embedding>,
    },

    /// Replace only the chunks of a document.
    UpdateChunks {
        document_id: String,
        chunks: Vec<DocumentChunk>,
    },

    /// A group of operations submitted together.
    /// NOTE(review): the processor currently treats this as a marker and
    /// does not execute the nested operations itself.
    Batch { operations: Vec<IndexOperation> },

    /// Rebuild a named index covering the given documents.
    /// NOTE(review): currently handled as a simplified no-op by the processor.
    Rebuild {
        index_name: String,
        document_ids: Vec<String>,
    },
}
121
122/// Index update specification
/// Specification of a single index update submitted to the manager.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexUpdate {
    /// Unique operation ID (used for status lookup and cancellation).
    pub operation_id: String,

    /// The operation to perform.
    pub operation: IndexOperation,

    /// Priority level, 0-10 (higher = more priority); values above 10
    /// are rejected by validation.
    pub priority: u8,

    /// Time the operation was created.
    pub timestamp: chrono::DateTime<chrono::Utc>,

    /// Identifier of the component that submitted the operation.
    pub source: String,

    /// Arbitrary operation metadata.
    pub metadata: HashMap<String, serde_json::Value>,

    /// IDs of operations this one depends on.
    pub dependencies: Vec<String>,

    /// Maximum retry attempts before giving up.
    pub max_retries: u32,

    /// Number of retries attempted so far.
    pub retry_count: u32,
}
152
153/// Result of an update operation
/// Result of a processed update operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateResult {
    /// ID of the operation this result belongs to.
    pub operation_id: String,

    /// Whether the operation succeeded.
    pub success: bool,

    /// Names of the sub-operations performed (e.g. "add", "update").
    pub operations_completed: Vec<String>,

    /// Conflicts encountered while applying the operation.
    pub conflicts: Vec<ConflictInfo>,

    /// Wall-clock processing time, in milliseconds.
    pub processing_time_ms: u64,

    /// Number of items (documents, chunks, embeddings) affected.
    pub items_affected: usize,

    /// Error details when `success` is false; `None` on success.
    pub error: Option<String>,

    /// Additional metadata about the operation.
    pub metadata: HashMap<String, serde_json::Value>,
}
180
181/// Conflict information
/// Details about a conflict detected during index updates.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictInfo {
    /// ID of the document where the conflict occurred.
    pub document_id: String,

    /// Category of the conflict.
    pub conflict_type: ConflictType,

    /// IDs of the operations that conflicted with each other.
    pub conflicting_operations: Vec<String>,

    /// How (or whether) the conflict was resolved.
    pub resolution: ConflictResolution,

    /// Additional context for diagnostics or manual resolution.
    pub context: HashMap<String, serde_json::Value>,
}
199
200/// Types of conflicts
/// Categories of conflicts that can arise between index operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictType {
    /// Multiple concurrent updates targeting the same document.
    ConcurrentUpdate,
    /// Stored document version does not match the expected one.
    VersionMismatch,
    /// An operation's dependencies conflict with another operation.
    DependencyConflict,
    /// A required resource lock was held by another operation.
    ResourceLock,
    /// Incompatible schema between conflicting operations.
    SchemaConflict,
}
214
215/// Conflict resolution applied
/// Outcome of attempting to resolve a conflict.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictResolution {
    /// Resolved automatically; the payload describes how.
    AutoResolved(String),
    /// Resolved by an operator; the payload describes how.
    ManuallyResolved(String),
    /// Resolution postponed for later handling.
    Deferred,
    /// Resolution was attempted and failed; the payload gives the reason.
    Failed(String),
}
227
228/// Operation status tracking
/// Lifecycle status of a tracked operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OperationStatus {
    /// Waiting in the pending queue.
    Queued,
    /// Currently being executed by the processor.
    Processing,
    /// Finished successfully.
    Completed,
    /// Finished with an error; the payload holds the error message.
    Failed(String),
    /// Removed before completion (by `cancel_operation`).
    Cancelled,
    /// Blocked until its dependencies complete.
    Waiting,
    /// Blocked until a conflict is resolved.
    ConflictResolution,
}
246
247/// Tracked operation state
/// Internal bookkeeping wrapper around a submitted update.
#[derive(Debug, Clone)]
struct TrackedOperation {
    /// The update as submitted by the caller.
    update: IndexUpdate,

    /// Current lifecycle status.
    status: OperationStatus,

    /// When processing began; `None` while still queued.
    start_time: Option<chrono::DateTime<chrono::Utc>>,

    /// When processing (or cancellation) finished; `None` until then.
    end_time: Option<chrono::DateTime<chrono::Utc>>,

    /// Final result; populated only once processing completes.
    result: Option<UpdateResult>,
}
265
266/// Incremental index manager
/// Incremental index manager.
///
/// Accepts index update operations, queues them, executes them on a
/// background task, tracks their status and results, and maintains
/// running statistics — all without requiring a full index rebuild.
pub struct IncrementalIndexManager {
    /// Manager configuration.
    config: IndexManagerConfig,

    /// FIFO queue of operations awaiting processing.
    pending_operations: Arc<Mutex<VecDeque<TrackedOperation>>>,

    /// Operations currently being executed, keyed by operation ID.
    processing_operations: Arc<RwLock<HashMap<String, TrackedOperation>>>,

    /// Bounded history of finished operations (oldest evicted first).
    completed_operations: Arc<RwLock<VecDeque<TrackedOperation>>>,

    /// Shared view of what is currently indexed.
    index_state: Arc<RwLock<IndexState>>,

    /// Conflict resolution subsystem.
    conflict_resolver: Arc<ConflictResolver>,

    /// Running operation statistics.
    stats: Arc<RwLock<IndexManagerStats>>,

    /// Join handles of the spawned background tasks.
    task_handles: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
}
292
293/// Index state tracking
/// Shared, mutable view of the index contents.
#[derive(Debug)]
struct IndexState {
    /// IDs of documents currently present in the index.
    indexed_documents: HashSet<String>,

    /// Per-document version counter, incremented on each update.
    document_versions: HashMap<String, u64>,

    /// Per-document locks for concurrent access.
    /// NOTE(review): never populated or consulted by the code paths
    /// visible here — confirm before relying on it.
    document_locks: HashMap<String, tokio::sync::Mutex<()>>,

    /// Arbitrary index-level metadata.
    metadata: HashMap<String, serde_json::Value>,

    /// Timestamp of the most recent state mutation.
    last_updated: chrono::DateTime<chrono::Utc>,
}
311
312/// Conflict resolution system
/// Conflict resolution subsystem.
struct ConflictResolver {
    /// Configured resolution strategy (copied from the manager config).
    strategy: ConflictResolutionStrategy,

    /// Conflicts awaiting manual resolution by an operator.
    manual_queue: Arc<Mutex<VecDeque<ConflictInfo>>>,

    /// Record of conflicts resolved so far.
    resolution_history: Arc<RwLock<Vec<ConflictInfo>>>,
}
323
324/// Index manager statistics
/// Running statistics maintained by the index manager.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexManagerStats {
    /// Total operations processed since startup.
    pub total_operations: u64,

    /// Operation counts keyed by operation variant name.
    pub operations_by_type: HashMap<String, u64>,

    /// Fraction of processed operations that succeeded (0.0–1.0).
    pub success_rate: f64,

    /// Average per-operation processing time, in milliseconds.
    pub avg_processing_time_ms: f64,

    /// Total conflicts encountered.
    pub total_conflicts: u64,

    /// Conflicts that were resolved without manual intervention.
    pub auto_resolved_conflicts: u64,

    /// Current number of operations waiting in the queue.
    pub current_queue_depth: usize,
    /// Largest queue depth observed so far.
    pub max_queue_depth: usize,

    /// Observed throughput, in operations per second.
    pub throughput_ops_per_second: f64,

    /// When these statistics were last refreshed.
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
355
356impl IncrementalIndexManager {
357    /// Create a new index manager
358    pub async fn new(config: IndexManagerConfig) -> RragResult<Self> {
359        let pending_operations = Arc::new(Mutex::new(VecDeque::new()));
360        let processing_operations = Arc::new(RwLock::new(HashMap::new()));
361        let completed_operations = Arc::new(RwLock::new(VecDeque::new()));
362
363        let index_state = Arc::new(RwLock::new(IndexState {
364            indexed_documents: HashSet::new(),
365            document_versions: HashMap::new(),
366            document_locks: HashMap::new(),
367            metadata: HashMap::new(),
368            last_updated: chrono::Utc::now(),
369        }));
370
371        let conflict_resolver = Arc::new(ConflictResolver {
372            strategy: config.conflict_resolution.clone(),
373            manual_queue: Arc::new(Mutex::new(VecDeque::new())),
374            resolution_history: Arc::new(RwLock::new(Vec::new())),
375        });
376
377        let stats = Arc::new(RwLock::new(IndexManagerStats {
378            total_operations: 0,
379            operations_by_type: HashMap::new(),
380            success_rate: 0.0,
381            avg_processing_time_ms: 0.0,
382            total_conflicts: 0,
383            auto_resolved_conflicts: 0,
384            current_queue_depth: 0,
385            max_queue_depth: 0,
386            throughput_ops_per_second: 0.0,
387            last_updated: chrono::Utc::now(),
388        }));
389
390        let task_handles = Arc::new(Mutex::new(Vec::new()));
391
392        let manager = Self {
393            config,
394            pending_operations,
395            processing_operations,
396            completed_operations,
397            index_state,
398            conflict_resolver,
399            stats,
400            task_handles,
401        };
402
403        // Start background processing tasks
404        manager.start_background_tasks().await?;
405
406        Ok(manager)
407    }
408
409    /// Submit an update operation
410    pub async fn submit_update(&self, update: IndexUpdate) -> RragResult<String> {
411        // Validate update
412        self.validate_update(&update).await?;
413
414        // Create tracked operation
415        let tracked_op = TrackedOperation {
416            update: update.clone(),
417            status: OperationStatus::Queued,
418            start_time: None,
419            end_time: None,
420            result: None,
421        };
422
423        // Add to queue
424        {
425            let mut queue = self.pending_operations.lock().await;
426
427            // Check queue capacity
428            if queue.len() >= self.config.max_pending_operations {
429                return Err(RragError::storage(
430                    "queue_full",
431                    std::io::Error::new(std::io::ErrorKind::Other, "Operation queue is full"),
432                ));
433            }
434
435            queue.push_back(tracked_op);
436        }
437
438        // Update statistics
439        {
440            let mut stats = self.stats.write().await;
441            stats.current_queue_depth = {
442                let queue = self.pending_operations.lock().await;
443                queue.len()
444            };
445            stats.max_queue_depth = std::cmp::max(stats.max_queue_depth, stats.current_queue_depth);
446        }
447
448        Ok(update.operation_id)
449    }
450
451    /// Submit multiple operations as a batch
452    pub async fn submit_batch(&self, operations: Vec<IndexUpdate>) -> RragResult<Vec<String>> {
453        if operations.is_empty() {
454            return Ok(Vec::new());
455        }
456
457        // Create batch operation
458        let batch_id = Uuid::new_v4().to_string();
459        let batch_operation = IndexOperation::Batch {
460            operations: operations.iter().map(|op| op.operation.clone()).collect(),
461        };
462
463        let batch_update = IndexUpdate {
464            operation_id: batch_id.clone(),
465            operation: batch_operation,
466            priority: operations.iter().map(|op| op.priority).max().unwrap_or(5),
467            timestamp: chrono::Utc::now(),
468            source: "batch_processor".to_string(),
469            metadata: HashMap::new(),
470            dependencies: Vec::new(),
471            max_retries: 3,
472            retry_count: 0,
473        };
474
475        // Submit individual operations
476        let mut operation_ids = Vec::new();
477        for operation in operations {
478            let op_id = self.submit_update(operation).await?;
479            operation_ids.push(op_id);
480        }
481
482        // Submit batch operation
483        self.submit_update(batch_update).await?;
484        operation_ids.push(batch_id);
485
486        Ok(operation_ids)
487    }
488
489    /// Get operation status
490    pub async fn get_operation_status(
491        &self,
492        operation_id: &str,
493    ) -> RragResult<Option<OperationStatus>> {
494        // Check processing operations
495        {
496            let processing = self.processing_operations.read().await;
497            if let Some(op) = processing.get(operation_id) {
498                return Ok(Some(op.status.clone()));
499            }
500        }
501
502        // Check pending operations
503        {
504            let queue = self.pending_operations.lock().await;
505            for op in queue.iter() {
506                if op.update.operation_id == operation_id {
507                    return Ok(Some(op.status.clone()));
508                }
509            }
510        }
511
512        // Check completed operations
513        {
514            let completed = self.completed_operations.read().await;
515            for op in completed.iter() {
516                if op.update.operation_id == operation_id {
517                    return Ok(Some(op.status.clone()));
518                }
519            }
520        }
521
522        Ok(None)
523    }
524
525    /// Get operation result
526    pub async fn get_operation_result(
527        &self,
528        operation_id: &str,
529    ) -> RragResult<Option<UpdateResult>> {
530        // Check processing operations first
531        {
532            let processing = self.processing_operations.read().await;
533            if let Some(op) = processing.get(operation_id) {
534                return Ok(op.result.clone());
535            }
536        }
537
538        // Check completed operations
539        {
540            let completed = self.completed_operations.read().await;
541            for op in completed.iter() {
542                if op.update.operation_id == operation_id {
543                    return Ok(op.result.clone());
544                }
545            }
546        }
547
548        Ok(None)
549    }
550
551    /// Cancel a pending operation
552    pub async fn cancel_operation(&self, operation_id: &str) -> RragResult<bool> {
553        // Try to cancel from pending queue
554        {
555            let mut queue = self.pending_operations.lock().await;
556            if let Some(pos) = queue
557                .iter()
558                .position(|op| op.update.operation_id == operation_id)
559            {
560                queue.remove(pos);
561                return Ok(true);
562            }
563        }
564
565        // Try to cancel from processing (if not too far along)
566        {
567            let mut processing = self.processing_operations.write().await;
568            if let Some(mut op) = processing.remove(operation_id) {
569                op.status = OperationStatus::Cancelled;
570                op.end_time = Some(chrono::Utc::now());
571
572                // Move to completed
573                let mut completed = self.completed_operations.write().await;
574                completed.push_back(op);
575
576                return Ok(true);
577            }
578        }
579
580        Ok(false)
581    }
582
583    /// Get current statistics
584    pub async fn get_stats(&self) -> IndexManagerStats {
585        let mut stats = self.stats.read().await.clone();
586        stats.current_queue_depth = {
587            let queue = self.pending_operations.lock().await;
588            queue.len()
589        };
590        stats.last_updated = chrono::Utc::now();
591        stats
592    }
593
594    /// Get index state information
595    pub async fn get_index_state(&self) -> RragResult<HashMap<String, serde_json::Value>> {
596        let state = self.index_state.read().await;
597        let mut info = HashMap::new();
598
599        info.insert(
600            "indexed_documents_count".to_string(),
601            serde_json::Value::Number(state.indexed_documents.len().into()),
602        );
603        info.insert(
604            "last_updated".to_string(),
605            serde_json::Value::String(state.last_updated.to_rfc3339()),
606        );
607        info.insert(
608            "metadata".to_string(),
609            serde_json::Value::Object(state.metadata.clone().into_iter().collect()),
610        );
611
612        Ok(info)
613    }
614
615    /// Health check
616    pub async fn health_check(&self) -> RragResult<bool> {
617        // Check if background tasks are running
618        let handles = self.task_handles.lock().await;
619        let all_running = handles.iter().all(|handle| !handle.is_finished());
620
621        // Check queue health
622        let queue_size = {
623            let queue = self.pending_operations.lock().await;
624            queue.len()
625        };
626        let queue_healthy = queue_size < self.config.max_pending_operations;
627
628        Ok(all_running && queue_healthy)
629    }
630
631    /// Start background processing tasks
632    async fn start_background_tasks(&self) -> RragResult<()> {
633        let mut handles = self.task_handles.lock().await;
634
635        // Operation processor task
636        let processor_handle = self.start_operation_processor().await;
637        handles.push(processor_handle);
638
639        // Cleanup task
640        if self.config.enable_auto_cleanup {
641            let cleanup_handle = self.start_cleanup_task().await;
642            handles.push(cleanup_handle);
643        }
644
645        Ok(())
646    }
647
648    /// Start the main operation processor
649    async fn start_operation_processor(&self) -> tokio::task::JoinHandle<()> {
650        let pending_ops = Arc::clone(&self.pending_operations);
651        let processing_ops = Arc::clone(&self.processing_operations);
652        let completed_ops = Arc::clone(&self.completed_operations);
653        let index_state = Arc::clone(&self.index_state);
654        let conflict_resolver = Arc::clone(&self.conflict_resolver);
655        let stats = Arc::clone(&self.stats);
656        let config = self.config.clone();
657
658        tokio::spawn(async move {
659            loop {
660                // Process next operation
661                let operation = {
662                    let mut queue = pending_ops.lock().await;
663                    queue.pop_front()
664                };
665
666                if let Some(mut tracked_op) = operation {
667                    tracked_op.status = OperationStatus::Processing;
668                    tracked_op.start_time = Some(chrono::Utc::now());
669
670                    let operation_id = tracked_op.update.operation_id.clone();
671
672                    // Move to processing
673                    {
674                        let mut processing = processing_ops.write().await;
675                        processing.insert(operation_id.clone(), tracked_op.clone());
676                    }
677
678                    // Process the operation
679                    let result = Self::process_operation(
680                        &tracked_op.update,
681                        &index_state,
682                        &conflict_resolver,
683                        &config,
684                    )
685                    .await;
686
687                    // Update tracked operation
688                    tracked_op.end_time = Some(chrono::Utc::now());
689                    tracked_op.result = Some(result.clone());
690                    tracked_op.status = if result.success {
691                        OperationStatus::Completed
692                    } else {
693                        OperationStatus::Failed(result.error.unwrap_or_default())
694                    };
695
696                    // Save operation type for statistics before moving tracked_op
697                    let op_type = format!("{:?}", tracked_op.update.operation)
698                        .split('{')
699                        .next()
700                        .unwrap_or("Unknown")
701                        .to_string();
702
703                    // Move to completed
704                    {
705                        let mut processing = processing_ops.write().await;
706                        processing.remove(&operation_id);
707                    }
708                    {
709                        let mut completed = completed_ops.write().await;
710                        completed.push_back(tracked_op);
711
712                        // Limit completed operations history
713                        if completed.len() > config.max_operation_log {
714                            completed.pop_front();
715                        }
716                    }
717
718                    // Update statistics
719                    {
720                        let mut stats_guard = stats.write().await;
721                        stats_guard.total_operations += 1;
722
723                        *stats_guard.operations_by_type.entry(op_type).or_insert(0) += 1;
724
725                        stats_guard.success_rate = if stats_guard.total_operations > 0 {
726                            let successful = stats_guard.operations_by_type.values().sum::<u64>();
727                            successful as f64 / stats_guard.total_operations as f64
728                        } else {
729                            0.0
730                        };
731
732                        stats_guard.avg_processing_time_ms = (stats_guard.avg_processing_time_ms
733                            + result.processing_time_ms as f64)
734                            / 2.0;
735
736                        stats_guard.last_updated = chrono::Utc::now();
737                    }
738                } else {
739                    // No operations pending, sleep briefly
740                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
741                }
742            }
743        })
744    }
745
746    /// Start cleanup task
747    async fn start_cleanup_task(&self) -> tokio::task::JoinHandle<()> {
748        let completed_ops = Arc::clone(&self.completed_operations);
749        let config = self.config.clone();
750
751        tokio::spawn(async move {
752            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
753                config.cleanup_interval_secs,
754            ));
755
756            loop {
757                interval.tick().await;
758
759                // Cleanup old completed operations
760                {
761                    let mut completed = completed_ops.write().await;
762                    while completed.len() > config.max_operation_log {
763                        completed.pop_front();
764                    }
765                }
766            }
767        })
768    }
769
770    /// Process a single operation (static method for background task)
771    async fn process_operation(
772        update: &IndexUpdate,
773        index_state: &Arc<RwLock<IndexState>>,
774        conflict_resolver: &Arc<ConflictResolver>,
775        _config: &IndexManagerConfig,
776    ) -> UpdateResult {
777        let start_time = std::time::Instant::now();
778        let mut conflicts = Vec::new();
779        let mut items_affected = 0;
780        let mut operations_completed = Vec::new();
781
782        let success = match &update.operation {
783            IndexOperation::Add {
784                document,
785                chunks,
786                embeddings,
787            } => {
788                match Self::process_add_operation(document, chunks, embeddings, index_state).await {
789                    Ok(count) => {
790                        items_affected = count;
791                        operations_completed.push("add".to_string());
792                        true
793                    }
794                    Err(_) => false,
795                }
796            }
797
798            IndexOperation::Update {
799                document_id,
800                document,
801                chunks,
802                embeddings,
803                change_result,
804            } => {
805                match Self::process_update_operation(
806                    document_id,
807                    document,
808                    chunks,
809                    embeddings,
810                    change_result,
811                    index_state,
812                    conflict_resolver,
813                )
814                .await
815                {
816                    Ok((count, detected_conflicts)) => {
817                        items_affected = count;
818                        conflicts = detected_conflicts;
819                        operations_completed.push("update".to_string());
820                        true
821                    }
822                    Err(_) => false,
823                }
824            }
825
826            IndexOperation::Delete { document_id } => {
827                match Self::process_delete_operation(document_id, index_state).await {
828                    Ok(count) => {
829                        items_affected = count;
830                        operations_completed.push("delete".to_string());
831                        true
832                    }
833                    Err(_) => false,
834                }
835            }
836
837            IndexOperation::UpdateEmbeddings {
838                document_id,
839                embeddings,
840            } => match Self::process_embedding_update(document_id, embeddings, index_state).await {
841                Ok(count) => {
842                    items_affected = count;
843                    operations_completed.push("update_embeddings".to_string());
844                    true
845                }
846                Err(_) => false,
847            },
848
849            IndexOperation::UpdateChunks {
850                document_id,
851                chunks,
852            } => match Self::process_chunk_update(document_id, chunks, index_state).await {
853                Ok(count) => {
854                    items_affected = count;
855                    operations_completed.push("update_chunks".to_string());
856                    true
857                }
858                Err(_) => false,
859            },
860
861            IndexOperation::Batch { operations } => {
862                operations_completed.push("batch".to_string());
863                items_affected = operations.len();
864                true // Simplified for batch operations
865            }
866
867            IndexOperation::Rebuild {
868                index_name: _,
869                document_ids,
870            } => {
871                operations_completed.push("rebuild".to_string());
872                items_affected = document_ids.len();
873                true // Simplified for rebuild operations
874            }
875        };
876
877        UpdateResult {
878            operation_id: update.operation_id.clone(),
879            success,
880            operations_completed,
881            conflicts,
882            processing_time_ms: start_time.elapsed().as_millis() as u64,
883            items_affected,
884            error: if success {
885                None
886            } else {
887                Some("Operation failed".to_string())
888            },
889            metadata: HashMap::new(),
890        }
891    }
892
893    /// Process add operation
894    async fn process_add_operation(
895        document: &Document,
896        chunks: &[DocumentChunk],
897        embeddings: &[Embedding],
898        index_state: &Arc<RwLock<IndexState>>,
899    ) -> RragResult<usize> {
900        let mut state = index_state.write().await;
901
902        // Add document to index
903        state.indexed_documents.insert(document.id.clone());
904        state.document_versions.insert(document.id.clone(), 1);
905        state.last_updated = chrono::Utc::now();
906
907        Ok(1 + chunks.len() + embeddings.len())
908    }
909
910    /// Process update operation
911    async fn process_update_operation(
912        document_id: &str,
913        document: &Document,
914        chunks: &[DocumentChunk],
915        embeddings: &[Embedding],
916        change_result: &ChangeResult,
917        index_state: &Arc<RwLock<IndexState>>,
918        _conflict_resolver: &Arc<ConflictResolver>,
919    ) -> RragResult<(usize, Vec<ConflictInfo>)> {
920        let mut state = index_state.write().await;
921        let conflicts = Vec::new();
922
923        // Check for conflicts
924        if let Some(_current_version) = state.document_versions.get(document_id) {
925            // Simple conflict detection - in production, would be more sophisticated
926            if change_result.change_type == ChangeType::NoChange {
927                // No actual conflict, but could indicate race condition
928            }
929        }
930
931        // Update document in index
932        state.indexed_documents.insert(document.id.clone());
933        let new_version = state.document_versions.get(document_id).unwrap_or(&0) + 1;
934        state
935            .document_versions
936            .insert(document_id.to_string(), new_version);
937        state.last_updated = chrono::Utc::now();
938
939        Ok((1 + chunks.len() + embeddings.len(), conflicts))
940    }
941
942    /// Process delete operation
943    async fn process_delete_operation(
944        document_id: &str,
945        index_state: &Arc<RwLock<IndexState>>,
946    ) -> RragResult<usize> {
947        let mut state = index_state.write().await;
948
949        let was_present = state.indexed_documents.remove(document_id);
950        state.document_versions.remove(document_id);
951        state.last_updated = chrono::Utc::now();
952
953        Ok(if was_present { 1 } else { 0 })
954    }
955
956    /// Process embedding update
957    async fn process_embedding_update(
958        _document_id: &str,
959        embeddings: &[Embedding],
960        index_state: &Arc<RwLock<IndexState>>,
961    ) -> RragResult<usize> {
962        let mut state = index_state.write().await;
963        state.last_updated = chrono::Utc::now();
964        Ok(embeddings.len())
965    }
966
967    /// Process chunk update
968    async fn process_chunk_update(
969        _document_id: &str,
970        chunks: &[DocumentChunk],
971        index_state: &Arc<RwLock<IndexState>>,
972    ) -> RragResult<usize> {
973        let mut state = index_state.write().await;
974        state.last_updated = chrono::Utc::now();
975        Ok(chunks.len())
976    }
977
978    /// Validate an update operation
979    async fn validate_update(&self, update: &IndexUpdate) -> RragResult<()> {
980        // Basic validation
981        if update.operation_id.is_empty() {
982            return Err(RragError::validation("operation_id", "non-empty", "empty"));
983        }
984
985        if update.priority > 10 {
986            return Err(RragError::validation(
987                "priority",
988                "0-10",
989                &update.priority.to_string(),
990            ));
991        }
992
993        // Validate operation-specific requirements
994        match &update.operation {
995            IndexOperation::Add { document, .. } => {
996                if document.id.is_empty() {
997                    return Err(RragError::validation("document.id", "non-empty", "empty"));
998                }
999            }
1000            IndexOperation::Update { document_id, .. } => {
1001                if document_id.is_empty() {
1002                    return Err(RragError::validation("document_id", "non-empty", "empty"));
1003                }
1004            }
1005            IndexOperation::Delete { document_id } => {
1006                if document_id.is_empty() {
1007                    return Err(RragError::validation("document_id", "non-empty", "empty"));
1008                }
1009            }
1010            _ => {} // Other validations as needed
1011        }
1012
1013        Ok(())
1014    }
1015}
1016
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Document;

    /// Wrap `operation` in a test `IndexUpdate` with default test metadata.
    /// Extracted to remove the 9-field literal previously duplicated in
    /// three tests.
    fn make_update(operation: IndexOperation) -> IndexUpdate {
        IndexUpdate {
            operation_id: Uuid::new_v4().to_string(),
            operation,
            priority: 5,
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: HashMap::new(),
            dependencies: Vec::new(),
            max_retries: 3,
            retry_count: 0,
        }
    }

    /// Build an `Add` operation for `document` with no chunks or embeddings.
    fn add_operation(document: Document) -> IndexOperation {
        IndexOperation::Add {
            document,
            chunks: Vec::new(),
            embeddings: Vec::new(),
        }
    }

    #[tokio::test]
    async fn test_index_manager_creation() {
        let config = IndexManagerConfig::default();
        let manager = IncrementalIndexManager::new(config).await.unwrap();
        assert!(manager.health_check().await.unwrap());
    }

    #[tokio::test]
    async fn test_submit_add_operation() {
        let manager = IncrementalIndexManager::new(IndexManagerConfig::default())
            .await
            .unwrap();

        let update = make_update(add_operation(Document::new("Test content")));

        let op_id = manager.submit_update(update).await.unwrap();
        assert!(!op_id.is_empty());

        // Check that operation was queued
        let status = manager.get_operation_status(&op_id).await.unwrap();
        assert!(status.is_some());
    }

    #[tokio::test]
    async fn test_batch_operations() {
        let manager = IncrementalIndexManager::new(IndexManagerConfig::default())
            .await
            .unwrap();

        let operations: Vec<IndexUpdate> = (0..3)
            .map(|i| make_update(add_operation(Document::new(format!("Test content {}", i)))))
            .collect();

        let op_ids = manager.submit_batch(operations).await.unwrap();
        assert_eq!(op_ids.len(), 4); // 3 individual + 1 batch operation
    }

    #[tokio::test]
    async fn test_operation_cancellation() {
        let manager = IncrementalIndexManager::new(IndexManagerConfig::default())
            .await
            .unwrap();

        let update = make_update(add_operation(Document::new("Test content")));
        let op_id = manager.submit_update(update).await.unwrap();

        // Try to cancel the operation
        let cancelled = manager.cancel_operation(&op_id).await.unwrap();
        assert!(cancelled);
    }

    #[test]
    fn test_conflict_resolution_strategies() {
        let strategies = vec![
            ConflictResolutionStrategy::LastWriteWins,
            ConflictResolutionStrategy::FirstWriteWins,
            ConflictResolutionStrategy::Merge,
            ConflictResolutionStrategy::Manual,
            ConflictResolutionStrategy::Timestamp,
            ConflictResolutionStrategy::Custom("custom_logic".to_string()),
        ];

        // Ensure all strategies have distinct Debug representations.
        for (i, strategy1) in strategies.iter().enumerate() {
            for (j, strategy2) in strategies.iter().enumerate() {
                if i != j {
                    assert_ne!(format!("{:?}", strategy1), format!("{:?}", strategy2));
                }
            }
        }
    }
}