pub mod batch_processor;
pub mod change_detection;
pub mod index_manager;
pub mod integrity;
pub mod monitoring;
pub mod rollback;
pub mod vector_updates;
pub mod versioning;
pub use change_detection::{
ChangeDetectionConfig, ChangeDetector, ChangeResult, ChangeType, ContentDelta, DocumentChange,
};
pub use index_manager::{
ConflictResolution, IncrementalIndexManager, IndexManagerConfig, IndexOperation, IndexUpdate,
UpdateResult,
};
pub use batch_processor::{
BatchConfig, BatchExecutor, BatchOperation, BatchProcessingStats, BatchProcessor, BatchResult,
QueueManager,
};
pub use versioning::{
DocumentVersion, VersionConflict, VersionHistory, VersionManager, VersionResolution,
VersioningConfig,
};
pub use rollback::{
OperationLog, RecoveryResult, RollbackConfig, RollbackManager, RollbackOperation, RollbackPoint,
};
pub use integrity::{
ConsistencyReport, HealthMetrics, IntegrityChecker, IntegrityConfig, IntegrityError,
ValidationResult,
};
pub use vector_updates::{
EmbeddingUpdate, IndexUpdateStrategy, VectorBatch, VectorOperation, VectorUpdateConfig,
VectorUpdateManager,
};
pub use monitoring::{
AlertConfig, IncrementalMetrics, IndexingStats, MetricsCollector, MonitoringConfig,
PerformanceTracker,
};
use crate::RragResult;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Facade that owns one shared handle to each incremental-indexing component.
///
/// Every component is held behind an `Arc`; the metrics value additionally
/// sits behind an async `RwLock`. The component types come from the sibling
/// modules re-exported at the top of this file.
pub struct IncrementalIndexingService {
/// Shared handle to the `ChangeDetector` component.
change_detector: Arc<ChangeDetector>,
/// Shared handle to the `IncrementalIndexManager` component.
index_manager: Arc<IncrementalIndexManager>,
/// Shared handle to the `BatchProcessor` component.
batch_processor: Arc<BatchProcessor>,
/// Shared handle to the `VersionManager` component.
version_manager: Arc<VersionManager>,
/// Shared handle to the `RollbackManager` component.
rollback_manager: Arc<RollbackManager>,
/// Shared handle to the `IntegrityChecker` component.
integrity_checker: Arc<IntegrityChecker>,
/// Shared handle to the `VectorUpdateManager` component.
vector_manager: Arc<VectorUpdateManager>,
/// Metrics value guarded by an async read/write lock; `get_metrics` clones it under a read lock.
metrics: Arc<RwLock<IncrementalMetrics>>,
/// Configuration passed to `new`, or the one most recently given to `update_config`.
config: IncrementalServiceConfig,
}
/// Top-level settings for `IncrementalIndexingService`.
///
/// The defaults quoted below are the ones set by this type's `Default` impl.
///
/// NOTE(review): within this file the values are only stored and handed back by
/// the service; `IncrementalIndexingService::new` builds every component from
/// that component's own default config. Confirm where (or whether) these
/// settings are consumed elsewhere.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalServiceConfig {
/// Automatic change detection flag. Default `true`; builder key `"auto_change_detection"`.
pub auto_change_detection: bool,
/// Batch processing flag. Default `true`; builder key `"batch_processing"`.
pub enable_batch_processing: bool,
/// Version resolution flag. Default `true`; builder key `"version_resolution"`.
pub enable_version_resolution: bool,
/// Automatic integrity checks flag. Default `true`; builder key `"integrity_checks"`.
pub auto_integrity_checks: bool,
/// Rollback flag. Default `true`; builder key `"rollback"`.
pub enable_rollback: bool,
/// Monitoring flag. Default `true`; builder key `"monitoring"`.
pub enable_monitoring: bool,
/// Maximum batch size. Default `1000`; set through the builder's `with_batch_size`.
/// Presumably counted in operations per batch — confirm against the batch processor.
pub max_batch_size: usize,
/// Batch timeout, in milliseconds per the `_ms` suffix. Default `5000`; set through `with_timeout`.
pub batch_timeout_ms: u64,
/// Maximum number of concurrent operations. Default `10`; set through `with_concurrency`.
pub max_concurrent_ops: usize,
/// Interval between integrity checks, in seconds per the `_secs` suffix. Default `3600`.
pub integrity_check_interval_secs: u64,
/// Metrics collection flag. Default `true`. The builder's `enable_feature` has no key for it.
pub collect_metrics: bool,
/// Nested optimization settings; replaced wholesale by the builder's `with_optimization`.
pub optimization: OptimizationConfig,
}
/// Optimization switches nested inside `IncrementalServiceConfig`.
///
/// The defaults quoted below are the ones set by this type's `Default` impl.
/// Nothing in this file reads these fields, so their exact effect has to be
/// confirmed in the components that consume them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationConfig {
/// Vector index optimization flag. Default `true`.
pub optimize_vector_index: bool,
/// Smart conflict resolution flag. Default `true`.
pub smart_conflict_resolution: bool,
/// Prefetching flag. Default `false` (the only switch that starts disabled).
pub enable_prefetching: bool,
/// Compression flag. Default `true`.
pub enable_compression: bool,
/// Memory pool size. Default `1024 * 1024 * 100` (104_857_600).
/// The unit is not stated in this file — presumably bytes (100 MiB); confirm.
pub memory_pool_size: usize,
/// Parallel processing flag. Default `true`.
pub enable_parallel_processing: bool,
}
impl Default for IncrementalServiceConfig {
fn default() -> Self {
Self {
auto_change_detection: true,
enable_batch_processing: true,
enable_version_resolution: true,
auto_integrity_checks: true,
enable_rollback: true,
enable_monitoring: true,
max_batch_size: 1000,
batch_timeout_ms: 5000,
max_concurrent_ops: 10,
integrity_check_interval_secs: 3600, collect_metrics: true,
optimization: OptimizationConfig::default(),
}
}
}
impl Default for OptimizationConfig {
fn default() -> Self {
Self {
optimize_vector_index: true,
smart_conflict_resolution: true,
enable_prefetching: false,
enable_compression: true,
memory_pool_size: 1024 * 1024 * 100, enable_parallel_processing: true,
}
}
}
pub struct IncrementalServiceBuilder {
config: IncrementalServiceConfig,
}
impl IncrementalServiceBuilder {
pub fn new() -> Self {
Self {
config: IncrementalServiceConfig::default(),
}
}
pub fn with_batch_size(mut self, size: usize) -> Self {
self.config.max_batch_size = size;
self
}
pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
self.config.batch_timeout_ms = timeout_ms;
self
}
pub fn with_concurrency(mut self, max_ops: usize) -> Self {
self.config.max_concurrent_ops = max_ops;
self
}
pub fn enable_feature(mut self, feature: &str, enabled: bool) -> Self {
match feature {
"auto_change_detection" => self.config.auto_change_detection = enabled,
"batch_processing" => self.config.enable_batch_processing = enabled,
"version_resolution" => self.config.enable_version_resolution = enabled,
"integrity_checks" => self.config.auto_integrity_checks = enabled,
"rollback" => self.config.enable_rollback = enabled,
"monitoring" => self.config.enable_monitoring = enabled,
_ => {} }
self
}
pub fn with_optimization(mut self, optimization: OptimizationConfig) -> Self {
self.config.optimization = optimization;
self
}
pub async fn build(self) -> RragResult<IncrementalIndexingService> {
IncrementalIndexingService::new(self.config).await
}
}
impl Default for IncrementalServiceBuilder {
fn default() -> Self {
Self::new()
}
}
impl IncrementalIndexingService {
    /// Creates the service and all of its components.
    ///
    /// Components are constructed one after another, in field order, each from
    /// its own `Default` configuration; the first constructor that fails aborts
    /// the whole call with that error. The supplied `config` is stored on the
    /// service as-is and is not forwarded to any component here.
    pub async fn new(config: IncrementalServiceConfig) -> RragResult<Self> {
        // Field initializers run top to bottom, so construction order matches
        // the order in which the fields are listed.
        Ok(Self {
            change_detector: Arc::new(
                ChangeDetector::new(ChangeDetectionConfig::default()).await?,
            ),
            index_manager: Arc::new(
                IncrementalIndexManager::new(IndexManagerConfig::default()).await?,
            ),
            batch_processor: Arc::new(BatchProcessor::new(BatchConfig::default()).await?),
            version_manager: Arc::new(VersionManager::new(VersioningConfig::default()).await?),
            rollback_manager: Arc::new(RollbackManager::new(RollbackConfig::default()).await?),
            integrity_checker: Arc::new(
                IntegrityChecker::new(IntegrityConfig::default()).await?,
            ),
            vector_manager: Arc::new(
                VectorUpdateManager::new(VectorUpdateConfig::default()).await?,
            ),
            metrics: Arc::new(RwLock::new(IncrementalMetrics::new())),
            config,
        })
    }

    /// Returns a clone of the current metrics, taken under a read lock.
    pub async fn get_metrics(&self) -> IncrementalMetrics {
        let snapshot = self.metrics.read().await;
        snapshot.clone()
    }

    /// Queries every component's `health_check` and reports the results keyed
    /// by component name.
    ///
    /// Components are queried sequentially. If any check returns an error the
    /// call returns that error immediately and the remaining components are
    /// not queried.
    pub async fn health_check(&self) -> RragResult<HashMap<String, bool>> {
        let change_detector = self.change_detector.health_check().await?;
        let index_manager = self.index_manager.health_check().await?;
        let batch_processor = self.batch_processor.health_check().await?;
        let version_manager = self.version_manager.health_check().await?;
        let rollback_manager = self.rollback_manager.health_check().await?;
        let integrity_checker = self.integrity_checker.health_check().await?;
        let vector_manager = self.vector_manager.health_check().await?;

        let report = [
            ("change_detector", change_detector),
            ("index_manager", index_manager),
            ("batch_processor", batch_processor),
            ("version_manager", version_manager),
            ("rollback_manager", rollback_manager),
            ("integrity_checker", integrity_checker),
            ("vector_manager", vector_manager),
        ];

        Ok(report
            .iter()
            .map(|&(component, healthy)| (component.to_string(), healthy))
            .collect())
    }

    /// Borrows the configuration currently stored on the service.
    pub fn get_config(&self) -> &IncrementalServiceConfig {
        &self.config
    }

    /// Replaces the stored configuration and always returns `Ok(())`.
    ///
    /// Only the stored value changes; no component is rebuilt or notified.
    pub async fn update_config(&mut self, new_config: IncrementalServiceConfig) -> RragResult<()> {
        self.config = new_config;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Values set through the builder must show up on the built service.
    #[tokio::test]
    async fn test_service_builder() {
        let builder = IncrementalServiceBuilder::new()
            .with_batch_size(500)
            .with_timeout(1000)
            .with_concurrency(5)
            .enable_feature("monitoring", true);
        let service = builder.build().await.unwrap();

        let cfg = service.get_config();
        assert_eq!(cfg.max_batch_size, 500);
        assert_eq!(cfg.batch_timeout_ms, 1000);
        assert_eq!(cfg.max_concurrent_ops, 5);
        assert!(cfg.enable_monitoring);
    }

    /// A service built from the default config keeps its feature flags on.
    #[tokio::test]
    async fn test_service_creation() {
        let service = IncrementalIndexingService::new(IncrementalServiceConfig::default())
            .await
            .unwrap();

        let cfg = service.get_config();
        assert!(cfg.auto_change_detection);
        assert!(cfg.enable_batch_processing);
        assert!(cfg.enable_version_resolution);
    }

    /// The health report covers at least the seven components and all are healthy.
    #[tokio::test]
    async fn test_health_check() {
        let service = IncrementalServiceBuilder::new().build().await.unwrap();
        let report = service.health_check().await.unwrap();

        assert!(report.len() >= 7);
        assert!(report.iter().all(|(_, &ok)| ok));
    }
}