1pub mod batch_processor;
34pub mod change_detection;
35pub mod index_manager;
36pub mod integrity;
37pub mod monitoring;
38pub mod rollback;
39pub mod vector_updates;
40pub mod versioning;
41
42pub use change_detection::{
44 ChangeDetectionConfig, ChangeDetector, ChangeResult, ChangeType, ContentDelta, DocumentChange,
45};
46
47pub use index_manager::{
48 ConflictResolution, IncrementalIndexManager, IndexManagerConfig, IndexOperation, IndexUpdate,
49 UpdateResult,
50};
51
52pub use batch_processor::{
53 BatchConfig, BatchExecutor, BatchOperation, BatchProcessingStats, BatchProcessor, BatchResult,
54 QueueManager,
55};
56
57pub use versioning::{
58 DocumentVersion, VersionConflict, VersionHistory, VersionManager, VersionResolution,
59 VersioningConfig,
60};
61
62pub use rollback::{
63 OperationLog, RecoveryResult, RollbackConfig, RollbackManager, RollbackOperation, RollbackPoint,
64};
65
66pub use integrity::{
67 ConsistencyReport, HealthMetrics, IntegrityChecker, IntegrityConfig, IntegrityError,
68 ValidationResult,
69};
70
71pub use vector_updates::{
72 EmbeddingUpdate, IndexUpdateStrategy, VectorBatch, VectorOperation, VectorUpdateConfig,
73 VectorUpdateManager,
74};
75
76pub use monitoring::{
77 AlertConfig, IncrementalMetrics, IndexingStats, MetricsCollector, MonitoringConfig,
78 PerformanceTracker,
79};
80
81use crate::RragResult;
82use serde::{Deserialize, Serialize};
83use std::collections::HashMap;
84use std::sync::Arc;
85use tokio::sync::RwLock;
86
87pub struct IncrementalIndexingService {
89 change_detector: Arc<ChangeDetector>,
91
92 index_manager: Arc<IncrementalIndexManager>,
94
95 batch_processor: Arc<BatchProcessor>,
97
98 version_manager: Arc<VersionManager>,
100
101 rollback_manager: Arc<RollbackManager>,
103
104 integrity_checker: Arc<IntegrityChecker>,
106
107 vector_manager: Arc<VectorUpdateManager>,
109
110 metrics: Arc<RwLock<IncrementalMetrics>>,
112
113 config: IncrementalServiceConfig,
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct IncrementalServiceConfig {
120 pub auto_change_detection: bool,
122
123 pub enable_batch_processing: bool,
125
126 pub enable_version_resolution: bool,
128
129 pub auto_integrity_checks: bool,
131
132 pub enable_rollback: bool,
134
135 pub enable_monitoring: bool,
137
138 pub max_batch_size: usize,
140
141 pub batch_timeout_ms: u64,
143
144 pub max_concurrent_ops: usize,
146
147 pub integrity_check_interval_secs: u64,
149
150 pub collect_metrics: bool,
152
153 pub optimization: OptimizationConfig,
155}
156
157#[derive(Debug, Clone, Serialize, Deserialize)]
159pub struct OptimizationConfig {
160 pub optimize_vector_index: bool,
162
163 pub smart_conflict_resolution: bool,
165
166 pub enable_prefetching: bool,
168
169 pub enable_compression: bool,
171
172 pub memory_pool_size: usize,
174
175 pub enable_parallel_processing: bool,
177}
178
179impl Default for IncrementalServiceConfig {
180 fn default() -> Self {
181 Self {
182 auto_change_detection: true,
183 enable_batch_processing: true,
184 enable_version_resolution: true,
185 auto_integrity_checks: true,
186 enable_rollback: true,
187 enable_monitoring: true,
188 max_batch_size: 1000,
189 batch_timeout_ms: 5000,
190 max_concurrent_ops: 10,
191 integrity_check_interval_secs: 3600, collect_metrics: true,
193 optimization: OptimizationConfig::default(),
194 }
195 }
196}
197
198impl Default for OptimizationConfig {
199 fn default() -> Self {
200 Self {
201 optimize_vector_index: true,
202 smart_conflict_resolution: true,
203 enable_prefetching: false,
204 enable_compression: true,
205 memory_pool_size: 1024 * 1024 * 100, enable_parallel_processing: true,
207 }
208 }
209}
210
211pub struct IncrementalServiceBuilder {
213 config: IncrementalServiceConfig,
214}
215
216impl IncrementalServiceBuilder {
217 pub fn new() -> Self {
218 Self {
219 config: IncrementalServiceConfig::default(),
220 }
221 }
222
223 pub fn with_batch_size(mut self, size: usize) -> Self {
224 self.config.max_batch_size = size;
225 self
226 }
227
228 pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
229 self.config.batch_timeout_ms = timeout_ms;
230 self
231 }
232
233 pub fn with_concurrency(mut self, max_ops: usize) -> Self {
234 self.config.max_concurrent_ops = max_ops;
235 self
236 }
237
238 pub fn enable_feature(mut self, feature: &str, enabled: bool) -> Self {
239 match feature {
240 "auto_change_detection" => self.config.auto_change_detection = enabled,
241 "batch_processing" => self.config.enable_batch_processing = enabled,
242 "version_resolution" => self.config.enable_version_resolution = enabled,
243 "integrity_checks" => self.config.auto_integrity_checks = enabled,
244 "rollback" => self.config.enable_rollback = enabled,
245 "monitoring" => self.config.enable_monitoring = enabled,
246 _ => {} }
248 self
249 }
250
251 pub fn with_optimization(mut self, optimization: OptimizationConfig) -> Self {
252 self.config.optimization = optimization;
253 self
254 }
255
256 pub async fn build(self) -> RragResult<IncrementalIndexingService> {
257 IncrementalIndexingService::new(self.config).await
258 }
259}
260
261impl Default for IncrementalServiceBuilder {
262 fn default() -> Self {
263 Self::new()
264 }
265}
266
267impl IncrementalIndexingService {
269 pub async fn new(config: IncrementalServiceConfig) -> RragResult<Self> {
271 let change_detector =
273 Arc::new(ChangeDetector::new(ChangeDetectionConfig::default()).await?);
274
275 let index_manager =
276 Arc::new(IncrementalIndexManager::new(IndexManagerConfig::default()).await?);
277
278 let batch_processor = Arc::new(BatchProcessor::new(BatchConfig::default()).await?);
279
280 let version_manager = Arc::new(VersionManager::new(VersioningConfig::default()).await?);
281
282 let rollback_manager = Arc::new(RollbackManager::new(RollbackConfig::default()).await?);
283
284 let integrity_checker = Arc::new(IntegrityChecker::new(IntegrityConfig::default()).await?);
285
286 let vector_manager =
287 Arc::new(VectorUpdateManager::new(VectorUpdateConfig::default()).await?);
288
289 let metrics = Arc::new(RwLock::new(IncrementalMetrics::new()));
290
291 Ok(Self {
292 change_detector,
293 index_manager,
294 batch_processor,
295 version_manager,
296 rollback_manager,
297 integrity_checker,
298 vector_manager,
299 metrics,
300 config,
301 })
302 }
303
304 pub async fn get_metrics(&self) -> IncrementalMetrics {
306 self.metrics.read().await.clone()
307 }
308
309 pub async fn health_check(&self) -> RragResult<HashMap<String, bool>> {
311 let mut health_status = HashMap::new();
312
313 health_status.insert(
314 "change_detector".to_string(),
315 self.change_detector.health_check().await?,
316 );
317 health_status.insert(
318 "index_manager".to_string(),
319 self.index_manager.health_check().await?,
320 );
321 health_status.insert(
322 "batch_processor".to_string(),
323 self.batch_processor.health_check().await?,
324 );
325 health_status.insert(
326 "version_manager".to_string(),
327 self.version_manager.health_check().await?,
328 );
329 health_status.insert(
330 "rollback_manager".to_string(),
331 self.rollback_manager.health_check().await?,
332 );
333 health_status.insert(
334 "integrity_checker".to_string(),
335 self.integrity_checker.health_check().await?,
336 );
337 health_status.insert(
338 "vector_manager".to_string(),
339 self.vector_manager.health_check().await?,
340 );
341
342 Ok(health_status)
343 }
344
345 pub fn get_config(&self) -> &IncrementalServiceConfig {
347 &self.config
348 }
349
350 pub async fn update_config(&mut self, new_config: IncrementalServiceConfig) -> RragResult<()> {
352 self.config = new_config;
353 Ok(())
354 }
355}
356
#[cfg(test)]
mod tests {
    use super::*;

    /// Values set on the builder must show up in the built service's config.
    #[tokio::test]
    async fn test_service_builder() {
        let builder = IncrementalServiceBuilder::new()
            .with_batch_size(500)
            .with_timeout(1000)
            .with_concurrency(5)
            .enable_feature("monitoring", true);
        let service = builder.build().await.unwrap();

        let cfg = service.get_config();
        assert_eq!(cfg.max_batch_size, 500);
        assert_eq!(cfg.batch_timeout_ms, 1000);
        assert_eq!(cfg.max_concurrent_ops, 5);
        assert!(cfg.enable_monitoring);
    }

    /// A service created from the default config keeps the default switches.
    #[tokio::test]
    async fn test_service_creation() {
        let service = IncrementalIndexingService::new(IncrementalServiceConfig::default())
            .await
            .unwrap();

        let cfg = service.get_config();
        assert!(cfg.auto_change_detection);
        assert!(cfg.enable_batch_processing);
        assert!(cfg.enable_version_resolution);
    }

    /// A freshly built service reports at least seven components, all healthy.
    #[tokio::test]
    async fn test_health_check() {
        let service = IncrementalServiceBuilder::new().build().await.unwrap();
        let health = service.health_check().await.unwrap();

        // One entry per component held by the service.
        assert!(health.len() >= 7);
        assert!(health.values().all(|healthy| *healthy));
    }
}