rexis_rag/incremental/
mod.rs

//! # Incremental Indexing System for RRAG
//!
//! This module provides a comprehensive incremental indexing system that allows
//! efficient document updates in production RAG systems without requiring full re-indexing.
//!
//! ## Key Features
//!
//! - **Incremental Operations**: Add, update, delete operations without full rebuilds
//! - **Change Detection**: Efficient delta processing and conflict resolution
//! - **Vector Index Updates**: Smart vector index management without full rebuilds
//! - **Document Versioning**: Complete document versioning and conflict resolution
//! - **Batch Processing**: Optimized batch processing for large-scale updates
//! - **Consistency Checks**: Index consistency and integrity verification
//! - **Rollback Support**: Complete rollback capabilities for failed operations
//! - **Performance Monitoring**: Comprehensive metrics and performance tracking
//!
//! ## Architecture Overview
//!
//! ```text
//! ┌─────────────────────┐    ┌─────────────────────┐    ┌─────────────────────┐
//! │   Change Detection  │────│   Index Manager     │────│   Vector Store      │
//! │   - Content Hash    │    │   - Update Tracker  │    │   - Incremental     │
//! │   - Version Check   │    │   - Conflict Res.   │    │   - Batch Updates   │
//! └─────────────────────┘    └─────────────────────┘    └─────────────────────┘
//!                                       │
//! ┌─────────────────────┐    ┌─────────────────────┐    ┌─────────────────────┐
//! │   Rollback System   │────│   Batch Processor   │────│   Integrity Check   │
//! │   - Operation Log   │    │   - Queue Mgmt      │    │   - Consistency     │
//! │   - State Snapshots │    │   - Error Handling  │    │   - Validation      │
//! └─────────────────────┘    └─────────────────────┘    └─────────────────────┘
//! ```
32
33pub mod batch_processor;
34pub mod change_detection;
35pub mod index_manager;
36pub mod integrity;
37pub mod monitoring;
38pub mod rollback;
39pub mod vector_updates;
40pub mod versioning;
41
42// Re-exports for convenience
43pub use change_detection::{
44    ChangeDetectionConfig, ChangeDetector, ChangeResult, ChangeType, ContentDelta, DocumentChange,
45};
46
47pub use index_manager::{
48    ConflictResolution, IncrementalIndexManager, IndexManagerConfig, IndexOperation, IndexUpdate,
49    UpdateResult,
50};
51
52pub use batch_processor::{
53    BatchConfig, BatchExecutor, BatchOperation, BatchProcessingStats, BatchProcessor, BatchResult,
54    QueueManager,
55};
56
57pub use versioning::{
58    DocumentVersion, VersionConflict, VersionHistory, VersionManager, VersionResolution,
59    VersioningConfig,
60};
61
62pub use rollback::{
63    OperationLog, RecoveryResult, RollbackConfig, RollbackManager, RollbackOperation, RollbackPoint,
64};
65
66pub use integrity::{
67    ConsistencyReport, HealthMetrics, IntegrityChecker, IntegrityConfig, IntegrityError,
68    ValidationResult,
69};
70
71pub use vector_updates::{
72    EmbeddingUpdate, IndexUpdateStrategy, VectorBatch, VectorOperation, VectorUpdateConfig,
73    VectorUpdateManager,
74};
75
76pub use monitoring::{
77    AlertConfig, IncrementalMetrics, IndexingStats, MetricsCollector, MonitoringConfig,
78    PerformanceTracker,
79};
80
81use crate::RragResult;
82use serde::{Deserialize, Serialize};
83use std::collections::HashMap;
84use std::sync::Arc;
85use tokio::sync::RwLock;
86
87/// Main incremental indexing service that orchestrates all components
88pub struct IncrementalIndexingService {
89    /// Change detection system
90    change_detector: Arc<ChangeDetector>,
91
92    /// Index management system
93    index_manager: Arc<IncrementalIndexManager>,
94
95    /// Batch processing system
96    batch_processor: Arc<BatchProcessor>,
97
98    /// Version management system
99    version_manager: Arc<VersionManager>,
100
101    /// Rollback management system
102    rollback_manager: Arc<RollbackManager>,
103
104    /// Integrity checking system
105    integrity_checker: Arc<IntegrityChecker>,
106
107    /// Vector update system
108    vector_manager: Arc<VectorUpdateManager>,
109
110    /// Performance monitoring
111    metrics: Arc<RwLock<IncrementalMetrics>>,
112
113    /// Service configuration
114    config: IncrementalServiceConfig,
115}
116
117/// Configuration for the incremental indexing service
118#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct IncrementalServiceConfig {
120    /// Enable automatic change detection
121    pub auto_change_detection: bool,
122
123    /// Enable batch processing optimization
124    pub enable_batch_processing: bool,
125
126    /// Enable version conflict resolution
127    pub enable_version_resolution: bool,
128
129    /// Enable automatic integrity checks
130    pub auto_integrity_checks: bool,
131
132    /// Enable rollback capabilities
133    pub enable_rollback: bool,
134
135    /// Enable performance monitoring
136    pub enable_monitoring: bool,
137
138    /// Maximum batch size for operations
139    pub max_batch_size: usize,
140
141    /// Batch timeout in milliseconds
142    pub batch_timeout_ms: u64,
143
144    /// Maximum concurrent operations
145    pub max_concurrent_ops: usize,
146
147    /// Integrity check interval in seconds
148    pub integrity_check_interval_secs: u64,
149
150    /// Enable metrics collection
151    pub collect_metrics: bool,
152
153    /// Performance optimization settings
154    pub optimization: OptimizationConfig,
155}
156
157/// Performance optimization configuration
158#[derive(Debug, Clone, Serialize, Deserialize)]
159pub struct OptimizationConfig {
160    /// Enable vector index optimization
161    pub optimize_vector_index: bool,
162
163    /// Enable smart conflict resolution
164    pub smart_conflict_resolution: bool,
165
166    /// Enable predictive prefetching
167    pub enable_prefetching: bool,
168
169    /// Enable compression for storage
170    pub enable_compression: bool,
171
172    /// Memory pool size for operations
173    pub memory_pool_size: usize,
174
175    /// Enable parallel processing
176    pub enable_parallel_processing: bool,
177}
178
179impl Default for IncrementalServiceConfig {
180    fn default() -> Self {
181        Self {
182            auto_change_detection: true,
183            enable_batch_processing: true,
184            enable_version_resolution: true,
185            auto_integrity_checks: true,
186            enable_rollback: true,
187            enable_monitoring: true,
188            max_batch_size: 1000,
189            batch_timeout_ms: 5000,
190            max_concurrent_ops: 10,
191            integrity_check_interval_secs: 3600, // 1 hour
192            collect_metrics: true,
193            optimization: OptimizationConfig::default(),
194        }
195    }
196}
197
198impl Default for OptimizationConfig {
199    fn default() -> Self {
200        Self {
201            optimize_vector_index: true,
202            smart_conflict_resolution: true,
203            enable_prefetching: false,
204            enable_compression: true,
205            memory_pool_size: 1024 * 1024 * 100, // 100MB
206            enable_parallel_processing: true,
207        }
208    }
209}
210
211/// Service builder for easy configuration
212pub struct IncrementalServiceBuilder {
213    config: IncrementalServiceConfig,
214}
215
216impl IncrementalServiceBuilder {
217    pub fn new() -> Self {
218        Self {
219            config: IncrementalServiceConfig::default(),
220        }
221    }
222
223    pub fn with_batch_size(mut self, size: usize) -> Self {
224        self.config.max_batch_size = size;
225        self
226    }
227
228    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
229        self.config.batch_timeout_ms = timeout_ms;
230        self
231    }
232
233    pub fn with_concurrency(mut self, max_ops: usize) -> Self {
234        self.config.max_concurrent_ops = max_ops;
235        self
236    }
237
238    pub fn enable_feature(mut self, feature: &str, enabled: bool) -> Self {
239        match feature {
240            "auto_change_detection" => self.config.auto_change_detection = enabled,
241            "batch_processing" => self.config.enable_batch_processing = enabled,
242            "version_resolution" => self.config.enable_version_resolution = enabled,
243            "integrity_checks" => self.config.auto_integrity_checks = enabled,
244            "rollback" => self.config.enable_rollback = enabled,
245            "monitoring" => self.config.enable_monitoring = enabled,
246            _ => {} // Ignore unknown features
247        }
248        self
249    }
250
251    pub fn with_optimization(mut self, optimization: OptimizationConfig) -> Self {
252        self.config.optimization = optimization;
253        self
254    }
255
256    pub async fn build(self) -> RragResult<IncrementalIndexingService> {
257        IncrementalIndexingService::new(self.config).await
258    }
259}
260
261impl Default for IncrementalServiceBuilder {
262    fn default() -> Self {
263        Self::new()
264    }
265}
266
267/// Service implementation
268impl IncrementalIndexingService {
269    /// Create a new incremental indexing service
270    pub async fn new(config: IncrementalServiceConfig) -> RragResult<Self> {
271        // Initialize all components with their respective configurations
272        let change_detector =
273            Arc::new(ChangeDetector::new(ChangeDetectionConfig::default()).await?);
274
275        let index_manager =
276            Arc::new(IncrementalIndexManager::new(IndexManagerConfig::default()).await?);
277
278        let batch_processor = Arc::new(BatchProcessor::new(BatchConfig::default()).await?);
279
280        let version_manager = Arc::new(VersionManager::new(VersioningConfig::default()).await?);
281
282        let rollback_manager = Arc::new(RollbackManager::new(RollbackConfig::default()).await?);
283
284        let integrity_checker = Arc::new(IntegrityChecker::new(IntegrityConfig::default()).await?);
285
286        let vector_manager =
287            Arc::new(VectorUpdateManager::new(VectorUpdateConfig::default()).await?);
288
289        let metrics = Arc::new(RwLock::new(IncrementalMetrics::new()));
290
291        Ok(Self {
292            change_detector,
293            index_manager,
294            batch_processor,
295            version_manager,
296            rollback_manager,
297            integrity_checker,
298            vector_manager,
299            metrics,
300            config,
301        })
302    }
303
304    /// Get service metrics
305    pub async fn get_metrics(&self) -> IncrementalMetrics {
306        self.metrics.read().await.clone()
307    }
308
309    /// Perform health check on all components
310    pub async fn health_check(&self) -> RragResult<HashMap<String, bool>> {
311        let mut health_status = HashMap::new();
312
313        health_status.insert(
314            "change_detector".to_string(),
315            self.change_detector.health_check().await?,
316        );
317        health_status.insert(
318            "index_manager".to_string(),
319            self.index_manager.health_check().await?,
320        );
321        health_status.insert(
322            "batch_processor".to_string(),
323            self.batch_processor.health_check().await?,
324        );
325        health_status.insert(
326            "version_manager".to_string(),
327            self.version_manager.health_check().await?,
328        );
329        health_status.insert(
330            "rollback_manager".to_string(),
331            self.rollback_manager.health_check().await?,
332        );
333        health_status.insert(
334            "integrity_checker".to_string(),
335            self.integrity_checker.health_check().await?,
336        );
337        health_status.insert(
338            "vector_manager".to_string(),
339            self.vector_manager.health_check().await?,
340        );
341
342        Ok(health_status)
343    }
344
345    /// Get service configuration
346    pub fn get_config(&self) -> &IncrementalServiceConfig {
347        &self.config
348    }
349
350    /// Update service configuration
351    pub async fn update_config(&mut self, new_config: IncrementalServiceConfig) -> RragResult<()> {
352        self.config = new_config;
353        Ok(())
354    }
355}
356
#[cfg(test)]
mod tests {
    use super::*;

    /// Values set through the builder must show up in the built service's config.
    #[tokio::test]
    async fn test_service_builder() {
        let builder = IncrementalServiceBuilder::new()
            .with_batch_size(500)
            .with_timeout(1000)
            .with_concurrency(5)
            .enable_feature("monitoring", true);
        let service = builder.build().await.unwrap();

        let cfg = &service.config;
        assert_eq!(cfg.max_batch_size, 500);
        assert_eq!(cfg.batch_timeout_ms, 1000);
        assert_eq!(cfg.max_concurrent_ops, 5);
        assert!(cfg.enable_monitoring);
    }

    /// A service created from the default config keeps the default toggles.
    #[tokio::test]
    async fn test_service_creation() {
        let service = IncrementalIndexingService::new(IncrementalServiceConfig::default())
            .await
            .unwrap();

        let cfg = &service.config;
        assert!(cfg.auto_change_detection);
        assert!(cfg.enable_batch_processing);
        assert!(cfg.enable_version_resolution);
    }

    /// A freshly built service reports every component as healthy.
    #[tokio::test]
    async fn test_health_check() {
        let service = IncrementalServiceBuilder::new().build().await.unwrap();
        let report = service.health_check().await.unwrap();

        // One entry per component is expected.
        assert!(report.len() >= 7);
        // Nothing should be unhealthy right after construction.
        assert!(report.values().all(|healthy| *healthy));
    }
}