use crate::incremental::change_detection::{ChangeResult, ChangeType};
use crate::{Document, DocumentChunk, Embedding, RragError, RragResult};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use uuid::Uuid;
/// Tunable knobs for the incremental index manager: queue capacity,
/// batching, conflict handling, and operation-log retention/cleanup.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexManagerConfig {
    /// Hard cap on queued-but-unprocessed operations; `submit_update`
    /// rejects new work once this is reached.
    pub max_pending_operations: usize,
    /// Intended batch size for grouped processing.
    pub batch_size: usize,
    /// Per-operation timeout budget, in seconds.
    pub operation_timeout_secs: u64,
    /// Master switch for conflict detection/resolution.
    pub enable_conflict_resolution: bool,
    /// Strategy applied when conflicts are detected.
    pub conflict_resolution: ConflictResolutionStrategy,
    /// Whether completed operations are retained for inspection.
    pub enable_operation_log: bool,
    /// Maximum number of completed operations kept in the log.
    pub max_operation_log: usize,
    /// Whether the periodic cleanup task is spawned.
    pub enable_auto_cleanup: bool,
    /// Interval between cleanup passes, in seconds.
    pub cleanup_interval_secs: u64,
}
/// How concurrent/conflicting index updates are reconciled.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictResolutionStrategy {
    /// The most recent write overwrites earlier ones.
    LastWriteWins,
    /// The first write sticks; later conflicting writes are discarded.
    FirstWriteWins,
    /// Attempt to merge conflicting updates.
    Merge,
    /// Defer to a human/manual queue.
    Manual,
    /// Resolve by comparing operation timestamps.
    Timestamp,
    /// Named custom strategy (identifier only; dispatch is external).
    Custom(String),
}
impl Default for IndexManagerConfig {
    /// Defaults sized for a medium workload: 10k-deep queue, batches of
    /// 100, 5-minute operation timeout, last-write-wins conflicts, a
    /// 10k-entry operation log, and hourly auto-cleanup.
    fn default() -> Self {
        Self {
            max_pending_operations: 10_000,
            batch_size: 100,
            operation_timeout_secs: 300,
            enable_conflict_resolution: true,
            conflict_resolution: ConflictResolutionStrategy::LastWriteWins,
            enable_operation_log: true,
            max_operation_log: 10_000,
            enable_auto_cleanup: true,
            cleanup_interval_secs: 3600,
        }
    }
}
/// A single unit of index work. Variant payloads carry everything the
/// processor needs; `Batch` nests a list of sub-operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexOperation {
    /// Index a brand-new document with its chunks and embeddings.
    Add {
        document: Document,
        chunks: Vec<DocumentChunk>,
        embeddings: Vec<Embedding>,
    },
    /// Re-index an existing document; `change_result` describes what
    /// the change detector observed.
    Update {
        document_id: String,
        document: Document,
        chunks: Vec<DocumentChunk>,
        embeddings: Vec<Embedding>,
        change_result: ChangeResult,
    },
    /// Remove a document (and its version record) from the index.
    Delete { document_id: String },
    /// Replace only the embeddings of an already-indexed document.
    UpdateEmbeddings {
        document_id: String,
        embeddings: Vec<Embedding>,
    },
    /// Replace only the chunks of an already-indexed document.
    UpdateChunks {
        document_id: String,
        chunks: Vec<DocumentChunk>,
    },
    /// Grouped sub-operations tracked under one operation id.
    Batch { operations: Vec<IndexOperation> },
    /// Rebuild a named index over the listed documents.
    Rebuild {
        index_name: String,
        document_ids: Vec<String>,
    },
}
/// An `IndexOperation` plus its submission envelope: identity, priority,
/// provenance, dependencies, and retry bookkeeping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexUpdate {
    /// Caller-supplied unique id; `validate_update` rejects empty ids.
    pub operation_id: String,
    /// The work to perform.
    pub operation: IndexOperation,
    /// Priority in 0-10 (validated); higher wins for batch aggregation.
    pub priority: u8,
    /// Submission timestamp (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Free-form origin label (e.g. "batch_processor").
    pub source: String,
    /// Arbitrary caller metadata.
    pub metadata: HashMap<String, serde_json::Value>,
    /// Operation ids this update depends on.
    pub dependencies: Vec<String>,
    /// Maximum retry attempts allowed.
    pub max_retries: u32,
    /// Retries performed so far.
    pub retry_count: u32,
}
/// Outcome of processing one `IndexUpdate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateResult {
    /// Id of the operation this result belongs to.
    pub operation_id: String,
    /// Overall success flag; on failure `error` carries the message.
    pub success: bool,
    /// Labels of the sub-steps that completed (e.g. "add", "update").
    pub operations_completed: Vec<String>,
    /// Conflicts detected while processing.
    pub conflicts: Vec<ConflictInfo>,
    /// Wall-clock processing duration in milliseconds.
    pub processing_time_ms: u64,
    /// Count of index items touched (document + chunks + embeddings).
    pub items_affected: usize,
    /// Error description when `success` is false.
    pub error: Option<String>,
    /// Extra result metadata.
    pub metadata: HashMap<String, serde_json::Value>,
}
/// Details of a detected conflict and how (or whether) it was resolved.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictInfo {
    /// Document the conflict concerns.
    pub document_id: String,
    /// Classification of the conflict.
    pub conflict_type: ConflictType,
    /// Operation ids involved in the conflict.
    pub conflicting_operations: Vec<String>,
    /// Resolution outcome.
    pub resolution: ConflictResolution,
    /// Additional context for diagnostics/auditing.
    pub context: HashMap<String, serde_json::Value>,
}
/// Categories of index-update conflicts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictType {
    /// Two updates targeted the same document at the same time.
    ConcurrentUpdate,
    /// Stored version differs from the version the update expected.
    VersionMismatch,
    /// A declared dependency conflicts with another operation.
    DependencyConflict,
    /// A required resource lock could not be obtained.
    ResourceLock,
    /// Incompatible schemas between conflicting updates.
    SchemaConflict,
}
/// How a conflict ended up being handled; payload strings describe the
/// resolution (or the failure reason).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConflictResolution {
    /// Resolved automatically by the configured strategy.
    AutoResolved(String),
    /// Resolved by a human operator.
    ManuallyResolved(String),
    /// Queued for later resolution.
    Deferred,
    /// Resolution was attempted and failed.
    Failed(String),
}
/// Lifecycle state of a tracked operation.
/// Typical flow: Queued -> Processing -> Completed | Failed; Cancelled
/// and Waiting/ConflictResolution are side paths.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum OperationStatus {
    /// Sitting in the pending queue.
    Queued,
    /// Currently being executed by the background processor.
    Processing,
    /// Finished successfully.
    Completed,
    /// Finished with an error (message attached).
    Failed(String),
    /// Removed before or during execution via `cancel_operation`.
    Cancelled,
    /// Waiting on dependencies.
    Waiting,
    /// Paused pending conflict resolution.
    ConflictResolution,
}
/// Internal wrapper pairing an `IndexUpdate` with its runtime state as it
/// moves through the pending -> processing -> completed queues.
#[derive(Debug, Clone)]
struct TrackedOperation {
    /// The submitted update itself.
    update: IndexUpdate,
    /// Current lifecycle state.
    status: OperationStatus,
    /// When processing began (None while queued).
    start_time: Option<chrono::DateTime<chrono::Utc>>,
    /// When processing finished or was cancelled.
    end_time: Option<chrono::DateTime<chrono::Utc>>,
    /// Result, populated once processing completes.
    result: Option<UpdateResult>,
}
/// Coordinates incremental index updates: accepts operations into a
/// pending queue, drains them on a background tokio task, tracks
/// in-flight and completed work, and maintains aggregate statistics.
/// All shared state is behind `Arc` so the spawned tasks can own clones.
pub struct IncrementalIndexManager {
    config: IndexManagerConfig,
    /// FIFO of operations awaiting processing.
    pending_operations: Arc<Mutex<VecDeque<TrackedOperation>>>,
    /// Operations currently being executed, keyed by operation id.
    processing_operations: Arc<RwLock<HashMap<String, TrackedOperation>>>,
    /// Bounded log of finished operations (capped at `max_operation_log`).
    completed_operations: Arc<RwLock<VecDeque<TrackedOperation>>>,
    /// Authoritative view of what is indexed and at which version.
    index_state: Arc<RwLock<IndexState>>,
    /// Conflict-resolution strategy plus manual queue and history.
    conflict_resolver: Arc<ConflictResolver>,
    /// Aggregate counters exposed via `get_stats`.
    stats: Arc<RwLock<IndexManagerStats>>,
    /// Handles for the spawned background tasks (processor, cleanup).
    task_handles: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
}
/// Mutable index bookkeeping shared between the manager and its
/// background processor.
#[derive(Debug)]
struct IndexState {
    /// Ids of documents currently present in the index.
    indexed_documents: HashSet<String>,
    /// Monotonically increasing version per document id.
    document_versions: HashMap<String, u64>,
    /// Per-document locks; declared but not yet used by any visible code
    /// path in this file.
    document_locks: HashMap<String, tokio::sync::Mutex<()>>,
    /// Arbitrary index-level metadata, surfaced by `get_index_state`.
    metadata: HashMap<String, serde_json::Value>,
    /// Timestamp of the most recent mutation.
    last_updated: chrono::DateTime<chrono::Utc>,
}
/// Holds the configured resolution strategy plus queues for manual
/// resolution and an audit trail of past resolutions. NOTE(review): the
/// strategy/queues are stored but no resolution logic in this file reads
/// them yet.
struct ConflictResolver {
    strategy: ConflictResolutionStrategy,
    /// Conflicts awaiting manual intervention.
    manual_queue: Arc<Mutex<VecDeque<ConflictInfo>>>,
    /// History of resolved conflicts.
    resolution_history: Arc<RwLock<Vec<ConflictInfo>>>,
}
/// Aggregate manager metrics, exposed via `get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexManagerStats {
    /// Total operations processed (success or failure).
    pub total_operations: u64,
    /// Per-operation-type counts, keyed by variant name.
    pub operations_by_type: HashMap<String, u64>,
    /// Fraction of processed operations that succeeded (0.0-1.0).
    pub success_rate: f64,
    /// Mean processing time per operation, in milliseconds.
    pub avg_processing_time_ms: f64,
    /// Conflicts detected across all operations.
    pub total_conflicts: u64,
    /// Conflicts resolved automatically.
    pub auto_resolved_conflicts: u64,
    /// Pending-queue depth at the last stats refresh.
    pub current_queue_depth: usize,
    /// High-water mark of the pending queue.
    pub max_queue_depth: usize,
    /// Throughput estimate; not yet computed by any visible code path.
    pub throughput_ops_per_second: f64,
    /// When these stats were last refreshed.
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
impl IncrementalIndexManager {
    /// Creates a manager with empty queues and index state, then spawns
    /// the background operation processor (and, when configured, the
    /// periodic cleanup task).
    ///
    /// NOTE(review): the spawned tasks loop forever; there is no shutdown
    /// path, so dropping the manager detaches (leaks) them.
    pub async fn new(config: IndexManagerConfig) -> RragResult<Self> {
        let pending_operations = Arc::new(Mutex::new(VecDeque::new()));
        let processing_operations = Arc::new(RwLock::new(HashMap::new()));
        let completed_operations = Arc::new(RwLock::new(VecDeque::new()));
        let index_state = Arc::new(RwLock::new(IndexState {
            indexed_documents: HashSet::new(),
            document_versions: HashMap::new(),
            document_locks: HashMap::new(),
            metadata: HashMap::new(),
            last_updated: chrono::Utc::now(),
        }));
        let conflict_resolver = Arc::new(ConflictResolver {
            strategy: config.conflict_resolution.clone(),
            manual_queue: Arc::new(Mutex::new(VecDeque::new())),
            resolution_history: Arc::new(RwLock::new(Vec::new())),
        });
        let stats = Arc::new(RwLock::new(IndexManagerStats {
            total_operations: 0,
            operations_by_type: HashMap::new(),
            success_rate: 0.0,
            avg_processing_time_ms: 0.0,
            total_conflicts: 0,
            auto_resolved_conflicts: 0,
            current_queue_depth: 0,
            max_queue_depth: 0,
            throughput_ops_per_second: 0.0,
            last_updated: chrono::Utc::now(),
        }));
        let task_handles = Arc::new(Mutex::new(Vec::new()));
        let manager = Self {
            config,
            pending_operations,
            processing_operations,
            completed_operations,
            index_state,
            conflict_resolver,
            stats,
            task_handles,
        };
        manager.start_background_tasks().await?;
        Ok(manager)
    }

    /// Validates and enqueues a single update, returning its operation id.
    ///
    /// # Errors
    /// Returns a validation error for malformed updates and a storage
    /// error when the pending queue is at `max_pending_operations`.
    pub async fn submit_update(&self, update: IndexUpdate) -> RragResult<String> {
        self.validate_update(&update).await?;
        let tracked_op = TrackedOperation {
            update: update.clone(),
            status: OperationStatus::Queued,
            start_time: None,
            end_time: None,
            result: None,
        };
        // Capture the post-push depth while the queue lock is already
        // held, so we never hold the stats write lock across an await on
        // the queue mutex (the original re-locked the queue for this).
        let queue_depth = {
            let mut queue = self.pending_operations.lock().await;
            if queue.len() >= self.config.max_pending_operations {
                return Err(RragError::storage(
                    "queue_full",
                    std::io::Error::new(std::io::ErrorKind::Other, "Operation queue is full"),
                ));
            }
            queue.push_back(tracked_op);
            queue.len()
        };
        {
            let mut stats = self.stats.write().await;
            stats.current_queue_depth = queue_depth;
            stats.max_queue_depth = std::cmp::max(stats.max_queue_depth, queue_depth);
        }
        Ok(update.operation_id)
    }

    /// Submits each update individually plus one `Batch` wrapper that
    /// groups clones of their operations under a single id for tracking.
    /// Returns the individual ids followed by the batch id (so N inputs
    /// yield N + 1 ids). The wrapper itself is a marker: `Batch`
    /// processing only counts its sub-operations, it does not re-execute
    /// them.
    pub async fn submit_batch(&self, operations: Vec<IndexUpdate>) -> RragResult<Vec<String>> {
        if operations.is_empty() {
            return Ok(Vec::new());
        }
        let batch_id = Uuid::new_v4().to_string();
        let batch_operation = IndexOperation::Batch {
            operations: operations.iter().map(|op| op.operation.clone()).collect(),
        };
        let batch_update = IndexUpdate {
            operation_id: batch_id.clone(),
            operation: batch_operation,
            // Batch inherits the highest member priority (default 5).
            priority: operations.iter().map(|op| op.priority).max().unwrap_or(5),
            timestamp: chrono::Utc::now(),
            source: "batch_processor".to_string(),
            metadata: HashMap::new(),
            dependencies: Vec::new(),
            max_retries: 3,
            retry_count: 0,
        };
        let mut operation_ids = Vec::new();
        for operation in operations {
            let op_id = self.submit_update(operation).await?;
            operation_ids.push(op_id);
        }
        self.submit_update(batch_update).await?;
        operation_ids.push(batch_id);
        Ok(operation_ids)
    }

    /// Looks up an operation's status across the processing map, the
    /// pending queue, and the completed log (in that order). `None` means
    /// the id is unknown or the record was already evicted from the log.
    pub async fn get_operation_status(
        &self,
        operation_id: &str,
    ) -> RragResult<Option<OperationStatus>> {
        {
            let processing = self.processing_operations.read().await;
            if let Some(op) = processing.get(operation_id) {
                return Ok(Some(op.status.clone()));
            }
        }
        {
            let queue = self.pending_operations.lock().await;
            for op in queue.iter() {
                if op.update.operation_id == operation_id {
                    return Ok(Some(op.status.clone()));
                }
            }
        }
        {
            let completed = self.completed_operations.read().await;
            for op in completed.iter() {
                if op.update.operation_id == operation_id {
                    return Ok(Some(op.status.clone()));
                }
            }
        }
        Ok(None)
    }

    /// Returns the `UpdateResult` for an operation, if it has produced
    /// one (in-flight operations may still have `None`).
    pub async fn get_operation_result(
        &self,
        operation_id: &str,
    ) -> RragResult<Option<UpdateResult>> {
        {
            let processing = self.processing_operations.read().await;
            if let Some(op) = processing.get(operation_id) {
                return Ok(op.result.clone());
            }
        }
        {
            let completed = self.completed_operations.read().await;
            for op in completed.iter() {
                if op.update.operation_id == operation_id {
                    return Ok(op.result.clone());
                }
            }
        }
        Ok(None)
    }

    /// Cancels a queued operation (removed outright) or an in-flight one
    /// (moved to the completed log as `Cancelled`). Returns whether a
    /// matching operation was found.
    ///
    /// NOTE(review): cancelling an in-flight op does not stop the
    /// processor, which holds its own clone and will push a second,
    /// completed record for the same id when it finishes.
    pub async fn cancel_operation(&self, operation_id: &str) -> RragResult<bool> {
        {
            let mut queue = self.pending_operations.lock().await;
            if let Some(pos) = queue
                .iter()
                .position(|op| op.update.operation_id == operation_id)
            {
                queue.remove(pos);
                return Ok(true);
            }
        }
        {
            let mut processing = self.processing_operations.write().await;
            if let Some(mut op) = processing.remove(operation_id) {
                op.status = OperationStatus::Cancelled;
                op.end_time = Some(chrono::Utc::now());
                let mut completed = self.completed_operations.write().await;
                completed.push_back(op);
                return Ok(true);
            }
        }
        Ok(false)
    }

    /// Returns a snapshot of the stats with a freshly-sampled queue depth
    /// and timestamp.
    pub async fn get_stats(&self) -> IndexManagerStats {
        let mut stats = self.stats.read().await.clone();
        stats.current_queue_depth = {
            let queue = self.pending_operations.lock().await;
            queue.len()
        };
        stats.last_updated = chrono::Utc::now();
        stats
    }

    /// Summarizes the index state (document count, last update time,
    /// metadata) as a JSON-friendly map.
    pub async fn get_index_state(&self) -> RragResult<HashMap<String, serde_json::Value>> {
        let state = self.index_state.read().await;
        let mut info = HashMap::new();
        info.insert(
            "indexed_documents_count".to_string(),
            serde_json::Value::Number(state.indexed_documents.len().into()),
        );
        info.insert(
            "last_updated".to_string(),
            serde_json::Value::String(state.last_updated.to_rfc3339()),
        );
        info.insert(
            "metadata".to_string(),
            serde_json::Value::Object(state.metadata.clone().into_iter().collect()),
        );
        Ok(info)
    }

    /// Healthy means: every spawned background task is still running and
    /// the pending queue is below capacity.
    pub async fn health_check(&self) -> RragResult<bool> {
        let handles = self.task_handles.lock().await;
        let all_running = handles.iter().all(|handle| !handle.is_finished());
        let queue_size = {
            let queue = self.pending_operations.lock().await;
            queue.len()
        };
        let queue_healthy = queue_size < self.config.max_pending_operations;
        Ok(all_running && queue_healthy)
    }

    /// Spawns the operation processor and, when `enable_auto_cleanup` is
    /// set, the cleanup task, retaining their join handles.
    async fn start_background_tasks(&self) -> RragResult<()> {
        let mut handles = self.task_handles.lock().await;
        let processor_handle = self.start_operation_processor().await;
        handles.push(processor_handle);
        if self.config.enable_auto_cleanup {
            let cleanup_handle = self.start_cleanup_task().await;
            handles.push(cleanup_handle);
        }
        Ok(())
    }

    /// Spawns the drain loop: pop one pending operation, mark it
    /// processing, execute it, record the result in the completed log,
    /// and refresh stats. Sleeps 100ms between polls when idle.
    async fn start_operation_processor(&self) -> tokio::task::JoinHandle<()> {
        let pending_ops = Arc::clone(&self.pending_operations);
        let processing_ops = Arc::clone(&self.processing_operations);
        let completed_ops = Arc::clone(&self.completed_operations);
        let index_state = Arc::clone(&self.index_state);
        let conflict_resolver = Arc::clone(&self.conflict_resolver);
        let stats = Arc::clone(&self.stats);
        let config = self.config.clone();
        tokio::spawn(async move {
            loop {
                let operation = {
                    let mut queue = pending_ops.lock().await;
                    queue.pop_front()
                };
                if let Some(mut tracked_op) = operation {
                    tracked_op.status = OperationStatus::Processing;
                    tracked_op.start_time = Some(chrono::Utc::now());
                    let operation_id = tracked_op.update.operation_id.clone();
                    {
                        let mut processing = processing_ops.write().await;
                        processing.insert(operation_id.clone(), tracked_op.clone());
                    }
                    let result = Self::process_operation(
                        &tracked_op.update,
                        &index_state,
                        &conflict_resolver,
                        &config,
                    )
                    .await;
                    tracked_op.end_time = Some(chrono::Utc::now());
                    tracked_op.result = Some(result.clone());
                    tracked_op.status = if result.success {
                        OperationStatus::Completed
                    } else {
                        OperationStatus::Failed(result.error.clone().unwrap_or_default())
                    };
                    // Variant name from the Debug string, e.g.
                    // "Add { .. }" -> "Add". BUGFIX: trim the trailing
                    // space the original left after split('{'), which
                    // produced keys like "Add " in operations_by_type.
                    let op_type = format!("{:?}", tracked_op.update.operation)
                        .split('{')
                        .next()
                        .unwrap_or("Unknown")
                        .trim()
                        .to_string();
                    {
                        let mut processing = processing_ops.write().await;
                        processing.remove(&operation_id);
                    }
                    {
                        let mut completed = completed_ops.write().await;
                        completed.push_back(tracked_op);
                        // Keep the log bounded; evict oldest first.
                        if completed.len() > config.max_operation_log {
                            completed.pop_front();
                        }
                    }
                    {
                        let mut stats_guard = stats.write().await;
                        stats_guard.total_operations += 1;
                        *stats_guard.operations_by_type.entry(op_type).or_insert(0) += 1;
                        let n = stats_guard.total_operations as f64;
                        // BUGFIX: the original summed operations_by_type
                        // (which counts every op, successful or not), so
                        // success_rate was always 1.0. Fold the actual
                        // outcome into a running mean instead.
                        let outcome = if result.success { 1.0 } else { 0.0 };
                        stats_guard.success_rate =
                            (stats_guard.success_rate * (n - 1.0) + outcome) / n;
                        // BUGFIX: replace the (avg + new) / 2 estimate,
                        // which exponentially overweights recent ops,
                        // with a true running mean.
                        stats_guard.avg_processing_time_ms = (stats_guard.avg_processing_time_ms
                            * (n - 1.0)
                            + result.processing_time_ms as f64)
                            / n;
                        stats_guard.last_updated = chrono::Utc::now();
                    }
                } else {
                    // Queue empty: back off briefly before polling again.
                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                }
            }
        })
    }

    /// Spawns the periodic cleanup task that trims the completed-
    /// operations log down to `max_operation_log` entries.
    async fn start_cleanup_task(&self) -> tokio::task::JoinHandle<()> {
        let completed_ops = Arc::clone(&self.completed_operations);
        let config = self.config.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
                config.cleanup_interval_secs,
            ));
            loop {
                interval.tick().await;
                {
                    let mut completed = completed_ops.write().await;
                    while completed.len() > config.max_operation_log {
                        completed.pop_front();
                    }
                }
            }
        })
    }

    /// Dispatches one operation to its handler and packages the outcome
    /// into an `UpdateResult`. `Batch` and `Rebuild` are currently
    /// tracking-only: they report affected counts without touching state.
    async fn process_operation(
        update: &IndexUpdate,
        index_state: &Arc<RwLock<IndexState>>,
        conflict_resolver: &Arc<ConflictResolver>,
        _config: &IndexManagerConfig,
    ) -> UpdateResult {
        let start_time = std::time::Instant::now();
        let mut conflicts = Vec::new();
        let mut items_affected = 0;
        let mut operations_completed = Vec::new();
        let success = match &update.operation {
            IndexOperation::Add {
                document,
                chunks,
                embeddings,
            } => {
                match Self::process_add_operation(document, chunks, embeddings, index_state).await {
                    Ok(count) => {
                        items_affected = count;
                        operations_completed.push("add".to_string());
                        true
                    }
                    Err(_) => false,
                }
            }
            IndexOperation::Update {
                document_id,
                document,
                chunks,
                embeddings,
                change_result,
            } => {
                match Self::process_update_operation(
                    document_id,
                    document,
                    chunks,
                    embeddings,
                    change_result,
                    index_state,
                    conflict_resolver,
                )
                .await
                {
                    Ok((count, detected_conflicts)) => {
                        items_affected = count;
                        conflicts = detected_conflicts;
                        operations_completed.push("update".to_string());
                        true
                    }
                    Err(_) => false,
                }
            }
            IndexOperation::Delete { document_id } => {
                match Self::process_delete_operation(document_id, index_state).await {
                    Ok(count) => {
                        items_affected = count;
                        operations_completed.push("delete".to_string());
                        true
                    }
                    Err(_) => false,
                }
            }
            IndexOperation::UpdateEmbeddings {
                document_id,
                embeddings,
            } => match Self::process_embedding_update(document_id, embeddings, index_state).await {
                Ok(count) => {
                    items_affected = count;
                    operations_completed.push("update_embeddings".to_string());
                    true
                }
                Err(_) => false,
            },
            IndexOperation::UpdateChunks {
                document_id,
                chunks,
            } => match Self::process_chunk_update(document_id, chunks, index_state).await {
                Ok(count) => {
                    items_affected = count;
                    operations_completed.push("update_chunks".to_string());
                    true
                }
                Err(_) => false,
            },
            IndexOperation::Batch { operations } => {
                operations_completed.push("batch".to_string());
                items_affected = operations.len();
                true
            }
            IndexOperation::Rebuild {
                index_name: _,
                document_ids,
            } => {
                operations_completed.push("rebuild".to_string());
                items_affected = document_ids.len();
                true
            }
        };
        UpdateResult {
            operation_id: update.operation_id.clone(),
            success,
            operations_completed,
            conflicts,
            processing_time_ms: start_time.elapsed().as_millis() as u64,
            items_affected,
            error: if success {
                None
            } else {
                Some("Operation failed".to_string())
            },
            metadata: HashMap::new(),
        }
    }

    /// Records a new document at version 1. Returns the affected-item
    /// count (document + chunks + embeddings); chunk/embedding payloads
    /// are only counted here, not stored.
    async fn process_add_operation(
        document: &Document,
        chunks: &[DocumentChunk],
        embeddings: &[Embedding],
        index_state: &Arc<RwLock<IndexState>>,
    ) -> RragResult<usize> {
        let mut state = index_state.write().await;
        state.indexed_documents.insert(document.id.clone());
        state.document_versions.insert(document.id.clone(), 1);
        state.last_updated = chrono::Utc::now();
        Ok(1 + chunks.len() + embeddings.len())
    }

    /// Re-records a document and bumps its version (starting from 1 for
    /// previously-unknown ids). Conflict detection is stubbed: the
    /// returned conflict list is always empty.
    async fn process_update_operation(
        document_id: &str,
        document: &Document,
        chunks: &[DocumentChunk],
        embeddings: &[Embedding],
        change_result: &ChangeResult,
        index_state: &Arc<RwLock<IndexState>>,
        _conflict_resolver: &Arc<ConflictResolver>,
    ) -> RragResult<(usize, Vec<ConflictInfo>)> {
        let mut state = index_state.write().await;
        let conflicts = Vec::new();
        if let Some(_current_version) = state.document_versions.get(document_id) {
            // Placeholder: a NoChange update could be skipped here, but
            // currently falls through and still bumps the version.
            if change_result.change_type == ChangeType::NoChange {}
        }
        state.indexed_documents.insert(document.id.clone());
        let new_version = state.document_versions.get(document_id).unwrap_or(&0) + 1;
        state
            .document_versions
            .insert(document_id.to_string(), new_version);
        state.last_updated = chrono::Utc::now();
        Ok((1 + chunks.len() + embeddings.len(), conflicts))
    }

    /// Removes a document and its version record; returns 1 if the
    /// document was present, 0 otherwise.
    async fn process_delete_operation(
        document_id: &str,
        index_state: &Arc<RwLock<IndexState>>,
    ) -> RragResult<usize> {
        let mut state = index_state.write().await;
        let was_present = state.indexed_documents.remove(document_id);
        state.document_versions.remove(document_id);
        state.last_updated = chrono::Utc::now();
        Ok(if was_present { 1 } else { 0 })
    }

    /// Touches `last_updated` and reports the embedding count; actual
    /// embedding storage is handled elsewhere.
    async fn process_embedding_update(
        _document_id: &str,
        embeddings: &[Embedding],
        index_state: &Arc<RwLock<IndexState>>,
    ) -> RragResult<usize> {
        let mut state = index_state.write().await;
        state.last_updated = chrono::Utc::now();
        Ok(embeddings.len())
    }

    /// Touches `last_updated` and reports the chunk count; actual chunk
    /// storage is handled elsewhere.
    async fn process_chunk_update(
        _document_id: &str,
        chunks: &[DocumentChunk],
        index_state: &Arc<RwLock<IndexState>>,
    ) -> RragResult<usize> {
        let mut state = index_state.write().await;
        state.last_updated = chrono::Utc::now();
        Ok(chunks.len())
    }

    /// Rejects updates with an empty operation id, a priority above 10,
    /// or an empty document id on Add/Update/Delete. Other variants
    /// (including nested Batch members) are accepted as-is.
    async fn validate_update(&self, update: &IndexUpdate) -> RragResult<()> {
        if update.operation_id.is_empty() {
            return Err(RragError::validation("operation_id", "non-empty", "empty"));
        }
        if update.priority > 10 {
            return Err(RragError::validation(
                "priority",
                "0-10",
                &update.priority.to_string(),
            ));
        }
        match &update.operation {
            IndexOperation::Add { document, .. } => {
                if document.id.is_empty() {
                    return Err(RragError::validation("document.id", "non-empty", "empty"));
                }
            }
            IndexOperation::Update { document_id, .. } => {
                if document_id.is_empty() {
                    return Err(RragError::validation("document_id", "non-empty", "empty"));
                }
            }
            IndexOperation::Delete { document_id } => {
                if document_id.is_empty() {
                    return Err(RragError::validation("document_id", "non-empty", "empty"));
                }
            }
            _ => {}
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Document;

    /// A freshly-constructed manager should report healthy: background
    /// tasks running and queue below capacity.
    #[tokio::test]
    async fn test_index_manager_creation() {
        let config = IndexManagerConfig::default();
        let manager = IncrementalIndexManager::new(config).await.unwrap();
        assert!(manager.health_check().await.unwrap());
    }

    /// Submitting a valid Add returns its id and the operation is
    /// findable (queued, processing, or completed) right after.
    #[tokio::test]
    async fn test_submit_add_operation() {
        let manager = IncrementalIndexManager::new(IndexManagerConfig::default())
            .await
            .unwrap();
        let doc = Document::new("Test content");
        let operation = IndexOperation::Add {
            document: doc.clone(),
            chunks: Vec::new(),
            embeddings: Vec::new(),
        };
        let update = IndexUpdate {
            operation_id: Uuid::new_v4().to_string(),
            operation,
            priority: 5,
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: HashMap::new(),
            dependencies: Vec::new(),
            max_retries: 3,
            retry_count: 0,
        };
        let op_id = manager.submit_update(update).await.unwrap();
        assert!(!op_id.is_empty());
        let status = manager.get_operation_status(&op_id).await.unwrap();
        assert!(status.is_some());
    }

    /// submit_batch returns one id per member plus the batch wrapper id:
    /// 3 operations -> 4 ids.
    #[tokio::test]
    async fn test_batch_operations() {
        let manager = IncrementalIndexManager::new(IndexManagerConfig::default())
            .await
            .unwrap();
        let mut operations = Vec::new();
        for i in 0..3 {
            let doc = Document::new(format!("Test content {}", i));
            let operation = IndexOperation::Add {
                document: doc,
                chunks: Vec::new(),
                embeddings: Vec::new(),
            };
            let update = IndexUpdate {
                operation_id: Uuid::new_v4().to_string(),
                operation,
                priority: 5,
                timestamp: chrono::Utc::now(),
                source: "test".to_string(),
                metadata: HashMap::new(),
                dependencies: Vec::new(),
                max_retries: 3,
                retry_count: 0,
            };
            operations.push(update);
        }
        let op_ids = manager.submit_batch(operations).await.unwrap();
        assert_eq!(op_ids.len(), 4);
    }

    /// A just-submitted operation can be cancelled (found either in the
    /// pending queue or the processing map).
    #[tokio::test]
    async fn test_operation_cancellation() {
        let manager = IncrementalIndexManager::new(IndexManagerConfig::default())
            .await
            .unwrap();
        let doc = Document::new("Test content");
        let operation = IndexOperation::Add {
            document: doc,
            chunks: Vec::new(),
            embeddings: Vec::new(),
        };
        let update = IndexUpdate {
            operation_id: Uuid::new_v4().to_string(),
            operation,
            priority: 5,
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: HashMap::new(),
            dependencies: Vec::new(),
            max_retries: 3,
            retry_count: 0,
        };
        let op_id = manager.submit_update(update).await.unwrap();
        let cancelled = manager.cancel_operation(&op_id).await.unwrap();
        assert!(cancelled);
    }

    /// Sanity check that every strategy variant has a distinct Debug
    /// representation.
    #[test]
    fn test_conflict_resolution_strategies() {
        let strategies = vec![
            ConflictResolutionStrategy::LastWriteWins,
            ConflictResolutionStrategy::FirstWriteWins,
            ConflictResolutionStrategy::Merge,
            ConflictResolutionStrategy::Manual,
            ConflictResolutionStrategy::Timestamp,
            ConflictResolutionStrategy::Custom("custom_logic".to_string()),
        ];
        for (i, strategy1) in strategies.iter().enumerate() {
            for (j, strategy2) in strategies.iter().enumerate() {
                if i != j {
                    assert_ne!(format!("{:?}", strategy1), format!("{:?}", strategy2));
                }
            }
        }
    }
}