rexis_rag/incremental/
batch_processor.rs

1//! # Batch Processing System
2//!
3//! Optimized batch processing for large-scale incremental updates.
4//! Handles queue management, error handling, and performance optimization.
5
6use crate::incremental::index_manager::{IndexUpdate, UpdateResult};
7use crate::{RragError, RragResult};
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::sync::Arc;
11use tokio::sync::{Mutex, RwLock, Semaphore};
12use uuid::Uuid;
13
14/// Batch processing configuration consumed by `BatchProcessor::new`
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct BatchConfig {
17    /// Maximum batch size; `add_operation` queues a batch as soon as it holds this many operations
18    pub max_batch_size: usize,
19
20    /// Minimum batch size a timed-out batch must have before the formation task picks it up
21    pub min_batch_size: usize,
22
23    /// Batch timeout in milliseconds, measured from the first operation added to a batch
24    pub batch_timeout_ms: u64,
25
26    /// Maximum concurrent batches (permit count of the executor semaphore)
27    pub max_concurrent_batches: usize,
28
29    /// Enable priority-based batching (operations are grouped under a `priority_<n>` key, else `default`)
30    pub enable_priority_batching: bool,
31
32    /// Enable adaptive batch sizing (NOTE(review): not read by the code visible in this file)
33    pub enable_adaptive_sizing: bool,
34
35    /// Error handling strategy; decides how `process_batch` derives the batch-level `success` flag
36    pub error_handling: ErrorHandlingStrategy,
37
38    /// Retry configuration (NOTE(review): not read by the code visible in this file)
39    pub retry_config: RetryConfig,
40
41    /// Performance optimization settings; a copy is also handed to the `BatchExecutor`
42    pub optimization: BatchOptimizationConfig,
43}
44
45/// Error handling strategies for batch processing (select how a batch's `success` flag is computed)
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub enum ErrorHandlingStrategy {
48    /// Fail entire batch on first error: the batch succeeds only when no operation failed
49    FailFast,
50    /// Continue processing despite individual failures: the batch succeeds when at least one operation succeeded
51    ContinueOnError,
52    /// Isolate failed items and retry separately: the batch itself is always reported as successful
53    IsolateAndRetry,
54    /// Use circuit breaker pattern: the batch succeeds when failures are fewer than successes
55    CircuitBreaker,
56}
57
58/// Retry configuration (defaults: 3 retries, 1000 ms base delay, x2.0 backoff, 30000 ms cap, 0.1 jitter)
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct RetryConfig {
61    /// Maximum retry attempts
62    pub max_retries: u32,
63
64    /// Base delay between retries in milliseconds
65    pub base_delay_ms: u64,
66
67    /// Exponential backoff multiplier
68    pub backoff_multiplier: f64,
69
70    /// Maximum delay between retries in milliseconds
71    pub max_delay_ms: u64,
72
73    /// Jitter factor (0.0 to 1.0)
74    pub jitter_factor: f64,
75}
76
77/// Batch optimization configuration; only deduplication and reordering are acted on by `optimize_batch`
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct BatchOptimizationConfig {
80    /// Enable batch deduplication (duplicate operations on the same document ID are collapsed)
81    pub enable_deduplication: bool,
82
83    /// Enable operation reordering (priority descending, then timestamp ascending)
84    pub enable_reordering: bool,
85
86    /// Enable batch compression (NOTE(review): not read by the code visible in this file)
87    pub enable_compression: bool,
88
89    /// Memory pool size for batching; the default of 50 * 1024 * 1024 suggests bytes - TODO confirm
90    pub memory_pool_size: usize,
91
92    /// Enable parallel processing within batches (NOTE(review): not read by the code visible in this file)
93    pub enable_parallel_processing: bool,
94
95    /// Target processing time per batch in milliseconds
96    pub target_processing_time_ms: u64,
97}
98
99impl Default for BatchConfig {
100    fn default() -> Self {
101        Self {
102            max_batch_size: 100,
103            min_batch_size: 10,
104            batch_timeout_ms: 5000,
105            max_concurrent_batches: 5,
106            enable_priority_batching: true,
107            enable_adaptive_sizing: true,
108            error_handling: ErrorHandlingStrategy::ContinueOnError,
109            retry_config: RetryConfig::default(),
110            optimization: BatchOptimizationConfig::default(),
111        }
112    }
113}
114
115impl Default for RetryConfig {
116    fn default() -> Self {
117        Self {
118            max_retries: 3,
119            base_delay_ms: 1000,
120            backoff_multiplier: 2.0,
121            max_delay_ms: 30000,
122            jitter_factor: 0.1,
123        }
124    }
125}
126
127impl Default for BatchOptimizationConfig {
128    fn default() -> Self {
129        Self {
130            enable_deduplication: true,
131            enable_reordering: true,
132            enable_compression: false,
133            memory_pool_size: 1024 * 1024 * 50, // 50MB
134            enable_parallel_processing: true,
135            target_processing_time_ms: 10000, // 10 seconds
136        }
137    }
138}
139
140/// Batch operation container: a group of index updates that is queued and processed as one unit
141#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct BatchOperation {
143    /// Batch ID (a UUID v4 string when the batch is built by `create_and_queue_batch`)
144    pub batch_id: String,
145
146    /// Operations in this batch
147    pub operations: Vec<IndexUpdate>,
148
149    /// Batch priority (derived from operations: the highest operation priority, or 5 for an empty batch)
150    pub priority: u8,
151
152    /// Batch creation timestamp (UTC)
153    pub created_at: chrono::DateTime<chrono::Utc>,
154
155    /// Expected processing time estimate in milliseconds (currently a fixed placeholder of 1000)
156    pub estimated_processing_time_ms: u64,
157
158    /// Batch metadata (free-form JSON values keyed by name)
159    pub metadata: HashMap<String, serde_json::Value>,
160}
161
162/// Result of batch processing, as returned by `BatchProcessor::process_batch`
163#[derive(Debug, Clone, Serialize, Deserialize)]
164pub struct BatchResult {
165    /// Batch ID (copied from the processed `BatchOperation`)
166    pub batch_id: String,
167
168    /// Overall success status, derived from the configured `ErrorHandlingStrategy`
169    pub success: bool,
170
171    /// Individual operation results, one entry per operation that was attempted
172    pub operation_results: Vec<UpdateResult>,
173
174    /// Total processing time in milliseconds
175    pub processing_time_ms: u64,
176
177    /// Number of successful operations
178    pub successful_operations: usize,
179
180    /// Number of failed operations (unsuccessful results plus operations that returned an error)
181    pub failed_operations: usize,
182
183    /// Batch-level errors (string form of every error returned while processing an operation)
184    pub batch_errors: Vec<String>,
185
186    /// Performance statistics
187    pub stats: BatchProcessingStats,
188
189    /// Retry information (`process_batch` currently always sets this to `None`)
190    pub retry_info: Option<RetryInfo>,
191}
192
193/// Processing statistics for a batch, embedded in every `BatchResult`
194#[derive(Debug, Clone, Serialize, Deserialize)]
195pub struct BatchProcessingStats {
196    /// Queue wait time in milliseconds
197    pub queue_wait_time_ms: u64,
198
199    /// Actual processing time in milliseconds
200    pub processing_time_ms: u64,
201
202    /// Memory usage during processing (not measured yet: `process_batch` reports 0.0)
203    pub peak_memory_usage_mb: f64,
204
205    /// CPU utilization during processing (not measured yet: `process_batch` reports 0.0)
206    pub cpu_utilization_percent: f64,
207
208    /// Throughput (successful operations per second of processing time)
209    pub throughput_ops_per_second: f64,
210
211    /// Optimization metrics (names of the optimizations reported for this batch)
212    pub optimizations_applied: Vec<String>,
213}
214
215/// Retry information attached to a batch result or stored alongside a batch in the retry queue
216#[derive(Debug, Clone, Serialize, Deserialize)]
217pub struct RetryInfo {
218    /// Current retry attempt
219    pub attempt: u32,
220
221    /// Maximum retries allowed
222    pub max_retries: u32,
223
224    /// Next retry time (UTC)
225    pub next_retry_at: chrono::DateTime<chrono::Utc>,
226
227    /// Retry reason
228    pub retry_reason: String,
229
230    /// Failed operations to retry (presumably operation IDs - TODO confirm against the producer)
231    pub failed_operations: Vec<String>,
232}
233
234/// Queue management system: one FIFO queue per priority band plus a retry queue, each behind its own async mutex
235pub struct QueueManager {
236    /// High priority queue (batches with priority 8 to 10)
237    high_priority_queue: Arc<Mutex<VecDeque<BatchOperation>>>,
238
239    /// Normal priority queue (batches with priority 4 to 7)
240    normal_priority_queue: Arc<Mutex<VecDeque<BatchOperation>>>,
241
242    /// Low priority queue (batches with any other priority)
243    low_priority_queue: Arc<Mutex<VecDeque<BatchOperation>>>,
244
245    /// Retry queue (NOTE(review): nothing in the visible code pushes to or pops from it)
246    retry_queue: Arc<Mutex<VecDeque<(BatchOperation, RetryInfo)>>>,
247
248    /// Queue statistics (exposed as a snapshot through `BatchProcessor::get_queue_stats`)
249    stats: Arc<RwLock<QueueStats>>,
250}
251
252/// Queue statistics (NOTE(review): initialised empty in `BatchProcessor::new`; no visible code updates them)
253#[derive(Debug, Clone, Serialize, Deserialize)]
254pub struct QueueStats {
255    /// Queue sizes by priority (summed by `health_check` to judge queue health)
256    pub queue_sizes: HashMap<String, usize>,
257
258    /// Average wait times in milliseconds, keyed like `queue_sizes` (presumably - TODO confirm)
259    pub average_wait_times_ms: HashMap<String, f64>,
260
261    /// Total items processed
262    pub total_processed: u64,
263
264    /// Current throughput
265    pub current_throughput: f64,
266
267    /// Last updated (UTC)
268    pub last_updated: chrono::DateTime<chrono::Utc>,
269}
270
271/// Batch executor for processing operations: holds the concurrency limiter and executor-level statistics
272pub struct BatchExecutor {
273    /// Configuration (a copy of `BatchConfig::optimization` taken at construction)
274    config: BatchOptimizationConfig,
275
276    /// Concurrency control (`process_batch` holds one permit for the duration of a batch)
277    semaphore: Arc<Semaphore>,
278
279    /// Processing statistics (updated at the start and end of every `process_batch` call)
280    stats: Arc<RwLock<ExecutorStats>>,
281}
282
283/// Executor statistics maintained by `BatchProcessor::process_batch`
284#[derive(Debug, Clone, Serialize, Deserialize)]
285pub struct ExecutorStats {
286    /// Total batches processed
287    pub batches_processed: u64,
288
289    /// Average processing time in milliseconds
290    pub avg_processing_time_ms: f64,
291
292    /// Success rate (fraction of successful operations, 0.0 to 1.0)
293    pub success_rate: f64,
294
295    /// Current active batches (incremented when a batch starts, decremented when it finishes)
296    pub active_batches: usize,
297
298    /// Peak concurrent batches (highest value `active_batches` has reached)
299    pub peak_concurrent_batches: usize,
300
301    /// Last updated (UTC)
302    pub last_updated: chrono::DateTime<chrono::Utc>,
303}
304
305/// Main batch processor: assembles incoming operations into batches, queues them by priority and processes them
306pub struct BatchProcessor {
307    /// Configuration
308    config: BatchConfig,
309
310    /// Queue manager (shared with the background processing task)
311    queue_manager: Arc<QueueManager>,
312
313    /// Batch executor
314    executor: Arc<BatchExecutor>,
315
316    /// Current batches being assembled, keyed by batch key (`priority_<n>` or `default`)
317    current_batches: Arc<RwLock<HashMap<String, Vec<IndexUpdate>>>>,
318
319    /// Batch timers: when the first operation of each batch under assembly arrived, keyed like `current_batches`
320    batch_timers: Arc<RwLock<HashMap<String, tokio::time::Instant>>>,
321
322    /// Background task handles (checked by `health_check` to see whether the tasks are still running)
323    task_handles: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
324
325    /// Performance metrics (exposed as a snapshot through `get_metrics`)
326    metrics: Arc<RwLock<ProcessingMetrics>>,
327}
328
329/// Overall processing metrics accumulated across all processed batches
330#[derive(Debug, Clone, Serialize, Deserialize)]
331pub struct ProcessingMetrics {
332    /// Total operations processed (sum of per-batch operation result counts)
333    pub total_operations: u64,
334
335    /// Total batches processed
336    pub total_batches: u64,
337
338    /// Average batch size (`total_operations / total_batches`)
339    pub avg_batch_size: f64,
340
341    /// Overall throughput (taken from the most recently processed batch)
342    pub throughput_ops_per_second: f64,
343
344    /// Error rates (fraction, 0.0 to 1.0)
345    pub error_rate: f64,
346
347    /// Retry statistics (NOTE(review): initialised to zero; no visible code updates them)
348    pub retry_stats: RetryStats,
349
350    /// Performance trends (one sample appended every 30 seconds, capped at 1000 entries)
351    pub performance_trends: Vec<PerformanceDataPoint>,
352
353    /// Last updated (UTC)
354    pub last_updated: chrono::DateTime<chrono::Utc>,
355}
356
357/// Retry statistics, embedded in `ProcessingMetrics`
358#[derive(Debug, Clone, Serialize, Deserialize)]
359pub struct RetryStats {
360    /// Total retries attempted
361    pub total_retries: u64,
362
363    /// Successful retries
364    pub successful_retries: u64,
365
366    /// Failed retries (exhausted)
367    pub failed_retries: u64,
368
369    /// Average retry attempts per operation
370    pub avg_retry_attempts: f64,
371}
372
373/// Performance data point for trending, sampled periodically by the metrics collection task
374#[derive(Debug, Clone, Serialize, Deserialize)]
375pub struct PerformanceDataPoint {
376    /// Timestamp (UTC) at which the sample was taken
377    pub timestamp: chrono::DateTime<chrono::Utc>,
378
379    /// Throughput at this point (operations per second)
380    pub throughput: f64,
381
382    /// Queue depth at this point (the collection task currently records 0)
383    pub queue_depth: usize,
384
385    /// Error rate at this point
386    pub error_rate: f64,
387
388    /// Average processing time in milliseconds (the collection task currently records 0.0)
389    pub avg_processing_time_ms: f64,
390}
391
392impl BatchProcessor {
393    /// Create a new batch processor
394    pub async fn new(config: BatchConfig) -> RragResult<Self> {
395        let queue_manager = Arc::new(QueueManager {
396            high_priority_queue: Arc::new(Mutex::new(VecDeque::new())),
397            normal_priority_queue: Arc::new(Mutex::new(VecDeque::new())),
398            low_priority_queue: Arc::new(Mutex::new(VecDeque::new())),
399            retry_queue: Arc::new(Mutex::new(VecDeque::new())),
400            stats: Arc::new(RwLock::new(QueueStats {
401                queue_sizes: HashMap::new(),
402                average_wait_times_ms: HashMap::new(),
403                total_processed: 0,
404                current_throughput: 0.0,
405                last_updated: chrono::Utc::now(),
406            })),
407        });
408
409        let executor = Arc::new(BatchExecutor {
410            config: config.optimization.clone(),
411            semaphore: Arc::new(Semaphore::new(config.max_concurrent_batches)),
412            stats: Arc::new(RwLock::new(ExecutorStats {
413                batches_processed: 0,
414                avg_processing_time_ms: 0.0,
415                success_rate: 0.0,
416                active_batches: 0,
417                peak_concurrent_batches: 0,
418                last_updated: chrono::Utc::now(),
419            })),
420        });
421
422        let processor = Self {
423            config,
424            queue_manager,
425            executor,
426            current_batches: Arc::new(RwLock::new(HashMap::new())),
427            batch_timers: Arc::new(RwLock::new(HashMap::new())),
428            task_handles: Arc::new(Mutex::new(Vec::new())),
429            metrics: Arc::new(RwLock::new(ProcessingMetrics {
430                total_operations: 0,
431                total_batches: 0,
432                avg_batch_size: 0.0,
433                throughput_ops_per_second: 0.0,
434                error_rate: 0.0,
435                retry_stats: RetryStats {
436                    total_retries: 0,
437                    successful_retries: 0,
438                    failed_retries: 0,
439                    avg_retry_attempts: 0.0,
440                },
441                performance_trends: Vec::new(),
442                last_updated: chrono::Utc::now(),
443            })),
444        };
445
446        processor.start_background_tasks().await?;
447        Ok(processor)
448    }
449
450    /// Add operation to batch processing queue
451    pub async fn add_operation(&self, operation: IndexUpdate) -> RragResult<String> {
452        let batch_key = self.determine_batch_key(&operation).await?;
453
454        // Add to current batch
455        {
456            let mut current_batches = self.current_batches.write().await;
457            let batch = current_batches
458                .entry(batch_key.clone())
459                .or_insert_with(Vec::new);
460            batch.push(operation);
461
462            // Start timer if this is the first operation in the batch
463            if batch.len() == 1 {
464                let mut timers = self.batch_timers.write().await;
465                timers.insert(batch_key.clone(), tokio::time::Instant::now());
466            }
467
468            // Check if batch is ready for processing
469            if batch.len() >= self.config.max_batch_size {
470                let operations = std::mem::take(batch);
471                drop(current_batches);
472
473                // Remove timer
474                let mut timers = self.batch_timers.write().await;
475                timers.remove(&batch_key);
476                drop(timers);
477
478                // Create and queue batch
479                self.create_and_queue_batch(operations).await?;
480            }
481        }
482
483        Ok(batch_key)
484    }
485
486    /// Process a batch of operations
487    pub async fn process_batch(&self, batch: BatchOperation) -> RragResult<BatchResult> {
488        let _permit = self
489            .executor
490            .semaphore
491            .acquire()
492            .await
493            .map_err(|_e| RragError::timeout("acquire_semaphore", 30000))?;
494
495        let start_time = std::time::Instant::now();
496        let queue_wait_time = start_time.elapsed();
497
498        // Update executor stats
499        {
500            let mut stats = self.executor.stats.write().await;
501            stats.active_batches += 1;
502            stats.peak_concurrent_batches =
503                std::cmp::max(stats.peak_concurrent_batches, stats.active_batches);
504        }
505
506        // Apply optimizations
507        let optimized_operations = self.optimize_batch(&batch.operations).await?;
508
509        // Process operations
510        let mut operation_results = Vec::new();
511        let mut successful_operations = 0;
512        let mut failed_operations = 0;
513        let mut batch_errors = Vec::new();
514
515        for operation in optimized_operations {
516            match self.process_single_operation(&operation).await {
517                Ok(result) => {
518                    if result.success {
519                        successful_operations += 1;
520                    } else {
521                        failed_operations += 1;
522                    }
523                    operation_results.push(result);
524                }
525                Err(e) => {
526                    failed_operations += 1;
527                    batch_errors.push(e.to_string());
528
529                    // Create error result
530                    operation_results.push(UpdateResult {
531                        operation_id: operation.operation_id.clone(),
532                        success: false,
533                        operations_completed: Vec::new(),
534                        conflicts: Vec::new(),
535                        processing_time_ms: 0,
536                        items_affected: 0,
537                        error: Some(e.to_string()),
538                        metadata: HashMap::new(),
539                    });
540                }
541            }
542        }
543
544        let processing_time = start_time.elapsed();
545        let success = match self.config.error_handling {
546            ErrorHandlingStrategy::FailFast => failed_operations == 0,
547            ErrorHandlingStrategy::ContinueOnError => successful_operations > 0,
548            ErrorHandlingStrategy::IsolateAndRetry => true, // Always succeed, handle retries separately
549            ErrorHandlingStrategy::CircuitBreaker => failed_operations < successful_operations,
550        };
551
552        // Update executor stats
553        {
554            let mut stats = self.executor.stats.write().await;
555            stats.active_batches -= 1;
556            stats.batches_processed += 1;
557            stats.avg_processing_time_ms =
558                (stats.avg_processing_time_ms + processing_time.as_millis() as f64) / 2.0;
559            stats.success_rate = if stats.batches_processed > 0 {
560                // Simplified success rate calculation
561                successful_operations as f64 / (successful_operations + failed_operations) as f64
562            } else {
563                0.0
564            };
565            stats.last_updated = chrono::Utc::now();
566        }
567
568        // Create batch result
569        let result = BatchResult {
570            batch_id: batch.batch_id,
571            success,
572            operation_results,
573            processing_time_ms: processing_time.as_millis() as u64,
574            successful_operations,
575            failed_operations,
576            batch_errors,
577            stats: BatchProcessingStats {
578                queue_wait_time_ms: queue_wait_time.as_millis() as u64,
579                processing_time_ms: processing_time.as_millis() as u64,
580                peak_memory_usage_mb: 0.0, // Would be measured in production
581                cpu_utilization_percent: 0.0, // Would be measured in production
582                throughput_ops_per_second: successful_operations as f64
583                    / processing_time.as_secs_f64(),
584                optimizations_applied: vec!["deduplication".to_string()], // Track applied optimizations
585            },
586            retry_info: None,
587        };
588
589        // Update overall metrics
590        self.update_metrics(&result).await?;
591
592        Ok(result)
593    }
594
595    /// Get current processing metrics
596    pub async fn get_metrics(&self) -> ProcessingMetrics {
597        self.metrics.read().await.clone()
598    }
599
600    /// Get queue statistics
601    pub async fn get_queue_stats(&self) -> QueueStats {
602        self.queue_manager.stats.read().await.clone()
603    }
604
605    /// Health check
606    pub async fn health_check(&self) -> RragResult<bool> {
607        // Check if background tasks are running
608        let handles = self.task_handles.lock().await;
609        let all_running = handles.iter().all(|handle| !handle.is_finished());
610
611        // Check queue health
612        let queue_stats = self.get_queue_stats().await;
613        let total_queue_size: usize = queue_stats.queue_sizes.values().sum();
614        let queue_healthy = total_queue_size < self.config.max_batch_size * 10; // Arbitrary health threshold
615
616        Ok(all_running && queue_healthy)
617    }
618
619    /// Start background processing tasks
620    async fn start_background_tasks(&self) -> RragResult<()> {
621        let mut handles = self.task_handles.lock().await;
622
623        // Batch formation task
624        handles.push(self.start_batch_formation_task().await);
625
626        // Batch processing task
627        handles.push(self.start_batch_processing_task().await);
628
629        // Timeout monitoring task
630        handles.push(self.start_timeout_monitoring_task().await);
631
632        // Metrics collection task
633        handles.push(self.start_metrics_collection_task().await);
634
635        Ok(())
636    }
637
638    /// Start batch formation monitoring task
639    async fn start_batch_formation_task(&self) -> tokio::task::JoinHandle<()> {
640        let current_batches = Arc::clone(&self.current_batches);
641        let batch_timers = Arc::clone(&self.batch_timers);
642        let config = self.config.clone();
643
644        tokio::spawn(async move {
645            let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(
646                config.batch_timeout_ms / 4,
647            ));
648
649            loop {
650                interval.tick().await;
651
652                // Check for batches that should be processed due to timeout
653                let mut batches_to_process = Vec::new();
654
655                {
656                    let current_batches_read = current_batches.read().await;
657                    let timers = batch_timers.read().await;
658
659                    for (batch_key, timer) in timers.iter() {
660                        if timer.elapsed().as_millis() as u64 >= config.batch_timeout_ms {
661                            if let Some(operations) = current_batches_read.get(batch_key) {
662                                if operations.len() >= config.min_batch_size {
663                                    batches_to_process.push(batch_key.clone());
664                                }
665                            }
666                        }
667                    }
668                }
669
670                // Process timeout batches
671                for batch_key in batches_to_process {
672                    let operations = {
673                        let mut current_batches_write = current_batches.write().await;
674                        current_batches_write.remove(&batch_key).unwrap_or_default()
675                    };
676
677                    {
678                        let mut timers = batch_timers.write().await;
679                        timers.remove(&batch_key);
680                    }
681
682                    if !operations.is_empty() {
683                        // Create and queue batch (would need access to self here)
684                        // In a real implementation, this would be handled differently
685                    }
686                }
687            }
688        })
689    }
690
691    /// Start batch processing task
692    async fn start_batch_processing_task(&self) -> tokio::task::JoinHandle<()> {
693        let queue_manager = Arc::clone(&self.queue_manager);
694        let _executor = Arc::clone(&self.executor);
695
696        tokio::spawn(async move {
697            loop {
698                // Try to get next batch from queues (priority order)
699                let batch = {
700                    // High priority first
701                    let mut high_queue = queue_manager.high_priority_queue.lock().await;
702                    if let Some(batch) = high_queue.pop_front() {
703                        Some(batch)
704                    } else {
705                        drop(high_queue);
706
707                        // Normal priority next
708                        let mut normal_queue = queue_manager.normal_priority_queue.lock().await;
709                        if let Some(batch) = normal_queue.pop_front() {
710                            Some(batch)
711                        } else {
712                            drop(normal_queue);
713
714                            // Low priority last
715                            let mut low_queue = queue_manager.low_priority_queue.lock().await;
716                            low_queue.pop_front()
717                        }
718                    }
719                };
720
721                if let Some(_batch) = batch {
722                    // Process batch (simplified - would need full context)
723                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
724                } else {
725                    // No batches to process, sleep briefly
726                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
727                }
728            }
729        })
730    }
731
732    /// Start timeout monitoring task
733    async fn start_timeout_monitoring_task(&self) -> tokio::task::JoinHandle<()> {
734        tokio::spawn(async move {
735            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
736
737            loop {
738                interval.tick().await;
739                // Monitor for stuck operations, cleanup expired data, etc.
740                // Implementation would depend on specific requirements
741            }
742        })
743    }
744
745    /// Start metrics collection task
746    async fn start_metrics_collection_task(&self) -> tokio::task::JoinHandle<()> {
747        let metrics = Arc::clone(&self.metrics);
748
749        tokio::spawn(async move {
750            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
751
752            loop {
753                interval.tick().await;
754
755                // Collect performance data point
756                let mut metrics_guard = metrics.write().await;
757                let data_point = PerformanceDataPoint {
758                    timestamp: chrono::Utc::now(),
759                    throughput: metrics_guard.throughput_ops_per_second,
760                    queue_depth: 0, // Would get actual queue depth
761                    error_rate: metrics_guard.error_rate,
762                    avg_processing_time_ms: 0.0, // Would calculate from recent batches
763                };
764
765                metrics_guard.performance_trends.push(data_point);
766
767                // Limit trend history
768                if metrics_guard.performance_trends.len() > 1000 {
769                    metrics_guard.performance_trends.remove(0);
770                }
771
772                metrics_guard.last_updated = chrono::Utc::now();
773            }
774        })
775    }
776
777    /// Determine batch key for operation grouping
778    async fn determine_batch_key(&self, operation: &IndexUpdate) -> RragResult<String> {
779        if self.config.enable_priority_batching {
780            Ok(format!("priority_{}", operation.priority))
781        } else {
782            Ok("default".to_string())
783        }
784    }
785
786    /// Create and queue a batch
787    async fn create_and_queue_batch(&self, operations: Vec<IndexUpdate>) -> RragResult<()> {
788        let batch_id = Uuid::new_v4().to_string();
789        let priority = operations.iter().map(|op| op.priority).max().unwrap_or(5);
790
791        let batch = BatchOperation {
792            batch_id,
793            operations,
794            priority,
795            created_at: chrono::Utc::now(),
796            estimated_processing_time_ms: 1000, // Would estimate based on operations
797            metadata: HashMap::new(),
798        };
799
800        // Add to appropriate priority queue
801        match priority {
802            8..=10 => {
803                let mut queue = self.queue_manager.high_priority_queue.lock().await;
804                queue.push_back(batch);
805            }
806            4..=7 => {
807                let mut queue = self.queue_manager.normal_priority_queue.lock().await;
808                queue.push_back(batch);
809            }
810            _ => {
811                let mut queue = self.queue_manager.low_priority_queue.lock().await;
812                queue.push_back(batch);
813            }
814        }
815
816        Ok(())
817    }
818
819    /// Apply optimizations to a batch
820    async fn optimize_batch(&self, operations: &[IndexUpdate]) -> RragResult<Vec<IndexUpdate>> {
821        let mut optimized = operations.to_vec();
822
823        // Deduplication
824        if self.config.optimization.enable_deduplication {
825            optimized = self.deduplicate_operations(optimized).await?;
826        }
827
828        // Reordering
829        if self.config.optimization.enable_reordering {
830            optimized = self.reorder_operations(optimized).await?;
831        }
832
833        Ok(optimized)
834    }
835
836    /// Remove duplicate operations
837    async fn deduplicate_operations(
838        &self,
839        operations: Vec<IndexUpdate>,
840    ) -> RragResult<Vec<IndexUpdate>> {
841        let mut seen_documents = std::collections::HashSet::new();
842        let mut deduplicated = Vec::new();
843
844        for operation in operations {
845            // Simple deduplication based on document ID
846            let document_id = match &operation.operation {
847                crate::incremental::index_manager::IndexOperation::Add { document, .. } => {
848                    Some(&document.id)
849                }
850                crate::incremental::index_manager::IndexOperation::Update {
851                    document_id, ..
852                } => Some(document_id),
853                crate::incremental::index_manager::IndexOperation::Delete { document_id } => {
854                    Some(document_id)
855                }
856                _ => None,
857            };
858
859            if let Some(doc_id) = document_id {
860                if !seen_documents.contains(doc_id) {
861                    seen_documents.insert(doc_id.clone());
862                    deduplicated.push(operation);
863                }
864                // Skip duplicate operations for the same document
865            } else {
866                // Keep operations that don't have document IDs
867                deduplicated.push(operation);
868            }
869        }
870
871        Ok(deduplicated)
872    }
873
874    /// Reorder operations for optimal processing
875    async fn reorder_operations(
876        &self,
877        mut operations: Vec<IndexUpdate>,
878    ) -> RragResult<Vec<IndexUpdate>> {
879        // Sort by priority (descending) and then by timestamp (ascending)
880        operations.sort_by(|a, b| {
881            b.priority
882                .cmp(&a.priority)
883                .then_with(|| a.timestamp.cmp(&b.timestamp))
884        });
885
886        Ok(operations)
887    }
888
889    /// Process a single operation (placeholder)
890    async fn process_single_operation(&self, operation: &IndexUpdate) -> RragResult<UpdateResult> {
891        // This is a placeholder - in production, this would call the actual index manager
892        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
893
894        Ok(UpdateResult {
895            operation_id: operation.operation_id.clone(),
896            success: true,
897            operations_completed: vec!["processed".to_string()],
898            conflicts: Vec::new(),
899            processing_time_ms: 10,
900            items_affected: 1,
901            error: None,
902            metadata: HashMap::new(),
903        })
904    }
905
906    /// Update overall metrics
907    async fn update_metrics(&self, result: &BatchResult) -> RragResult<()> {
908        let mut metrics = self.metrics.write().await;
909
910        metrics.total_batches += 1;
911        metrics.total_operations += result.operation_results.len() as u64;
912
913        if metrics.total_batches > 0 {
914            metrics.avg_batch_size = metrics.total_operations as f64 / metrics.total_batches as f64;
915        }
916
917        // Update throughput
918        metrics.throughput_ops_per_second = result.stats.throughput_ops_per_second;
919
920        // Update error rate
921        if metrics.total_operations > 0 {
922            metrics.error_rate = result.failed_operations as f64 / metrics.total_operations as f64;
923        }
924
925        metrics.last_updated = chrono::Utc::now();
926
927        Ok(())
928    }
929}
930
#[cfg(test)]
mod tests {
    use super::*;
    use crate::incremental::change_detection::{
        ChangeResult, ChangeTimestamps, ChangeType, ContentDelta, MetadataChanges,
    };
    use crate::incremental::index_manager::IndexOperation;
    use crate::Document;

    /// Wrap an operation in an `IndexUpdate` carrying the fixture values shared by these tests.
    fn make_update(operation: IndexOperation) -> IndexUpdate {
        IndexUpdate {
            operation_id: Uuid::new_v4().to_string(),
            operation,
            priority: 5,
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: HashMap::new(),
            dependencies: Vec::new(),
            max_retries: 3,
            retry_count: 0,
        }
    }

    /// A processor built from the default configuration reports itself healthy.
    #[tokio::test]
    async fn test_batch_processor_creation() {
        let processor = BatchProcessor::new(BatchConfig::default())
            .await
            .unwrap();
        let healthy = processor.health_check().await.unwrap();
        assert!(healthy);
    }

    /// Queuing an operation yields a non-empty batch key.
    #[tokio::test]
    async fn test_add_operation_to_batch() {
        let processor = BatchProcessor::new(BatchConfig::default()).await.unwrap();

        let operation = IndexOperation::Add {
            document: Document::new("Test content"),
            chunks: Vec::new(),
            embeddings: Vec::new(),
        };

        let batch_key = processor
            .add_operation(make_update(operation))
            .await
            .unwrap();
        assert!(!batch_key.is_empty());
    }

    /// Several updates targeting one document collapse to a single operation.
    #[tokio::test]
    async fn test_batch_optimization() {
        let processor = BatchProcessor::new(BatchConfig::default()).await.unwrap();

        // Three updates that all target the same document ID (should be deduplicated)
        let operations: Vec<IndexUpdate> = (0..3)
            .map(|i| {
                let document = Document::with_id("same_doc", format!("Content {}", i));
                let operation = IndexOperation::Update {
                    document_id: "same_doc".to_string(),
                    document,
                    chunks: Vec::new(),
                    embeddings: Vec::new(),
                    change_result: ChangeResult {
                        change_type: ChangeType::ContentChanged,
                        document_id: "same_doc".to_string(),
                        previous_hash: None,
                        current_hash: format!("hash_{}", i),
                        delta: ContentDelta {
                            added_chars: 10,
                            removed_chars: 0,
                            modified_chars: 5,
                            previous_size: 10,
                            current_size: 20,
                            change_percentage: 0.5,
                        },
                        metadata_changes: MetadataChanges {
                            added_keys: Vec::new(),
                            removed_keys: Vec::new(),
                            modified_keys: Vec::new(),
                            previous_metadata: HashMap::new(),
                            current_metadata: HashMap::new(),
                        },
                        timestamps: ChangeTimestamps {
                            detected_at: chrono::Utc::now(),
                            last_modified: None,
                            previous_check: None,
                            time_since_change: None,
                        },
                        chunk_changes: Vec::new(),
                        confidence: 1.0,
                    },
                };

                make_update(operation)
            })
            .collect();

        let optimized = processor.optimize_batch(&operations).await.unwrap();

        // Only one operation should survive deduplication
        assert_eq!(optimized.len(), 1);
    }

    /// Every error-handling strategy has a distinct `Debug` representation.
    #[test]
    fn test_error_handling_strategies() {
        let strategies = [
            ErrorHandlingStrategy::FailFast,
            ErrorHandlingStrategy::ContinueOnError,
            ErrorHandlingStrategy::IsolateAndRetry,
            ErrorHandlingStrategy::CircuitBreaker,
        ];

        let labels: Vec<String> = strategies
            .iter()
            .map(|strategy| format!("{:?}", strategy))
            .collect();

        // Inequality is symmetric, so checking each unordered pair once is enough
        for (index, first) in labels.iter().enumerate() {
            for second in labels.iter().skip(index + 1) {
                assert_ne!(first, second);
            }
        }
    }

    /// The default retry configuration carries the expected values.
    #[test]
    fn test_retry_config_defaults() {
        let defaults = RetryConfig::default();
        assert_eq!(defaults.max_retries, 3);
        assert_eq!(defaults.base_delay_ms, 1000);
        assert_eq!(defaults.backoff_multiplier, 2.0);
        assert!((0.0..=1.0).contains(&defaults.jitter_factor));
    }
}