1use crate::incremental::index_manager::{IndexUpdate, UpdateResult};
7use crate::{RragError, RragResult};
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::sync::Arc;
11use tokio::sync::{Mutex, RwLock, Semaphore};
12use uuid::Uuid;
13
/// Configuration for the incremental-index batch processor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchConfig {
    /// Maximum number of operations collected before a batch is flushed immediately.
    pub max_batch_size: usize,

    /// Minimum number of operations required before a timed-out batch is formed.
    pub min_batch_size: usize,

    /// How long (ms) a partially filled batch may wait before being flushed.
    pub batch_timeout_ms: u64,

    /// Upper bound on batches processed concurrently (enforced via a semaphore).
    pub max_concurrent_batches: usize,

    /// When true, operations are grouped into per-priority batch keys.
    pub enable_priority_batching: bool,

    /// When true, batch sizes may be tuned adaptively.
    /// NOTE(review): no code in this module reads this flag yet — confirm intent.
    pub enable_adaptive_sizing: bool,

    /// Strategy used to decide overall batch success when some operations fail.
    pub error_handling: ErrorHandlingStrategy,

    /// Retry/backoff behaviour for failed batches.
    pub retry_config: RetryConfig,

    /// Optimization knobs (deduplication, reordering, parallelism, ...).
    pub optimization: BatchOptimizationConfig,
}
44
/// How the processor judges a batch when some of its operations fail.
///
/// The concrete success predicate for each variant is applied in
/// `BatchProcessor::process_batch`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ErrorHandlingStrategy {
    /// Batch succeeds only if no operation failed.
    FailFast,
    /// Batch succeeds if at least one operation succeeded.
    ContinueOnError,
    /// Batch is always reported successful; failures are expected to be
    /// isolated and retried separately.
    IsolateAndRetry,
    /// Batch succeeds while the failure count stays below the success count.
    CircuitBreaker,
}
57
/// Exponential-backoff retry settings for failed batches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Maximum retry attempts per batch.
    pub max_retries: u32,

    /// Initial delay (ms) before the first retry.
    pub base_delay_ms: u64,

    /// Multiplier applied to the delay after each failed attempt.
    pub backoff_multiplier: f64,

    /// Ceiling (ms) on the backoff delay.
    pub max_delay_ms: u64,

    /// Jitter fraction (0.0–1.0) applied to delays to avoid thundering herds.
    pub jitter_factor: f64,
}
76
/// Knobs controlling pre-execution batch optimization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOptimizationConfig {
    /// Drop duplicate operations targeting the same document before execution.
    pub enable_deduplication: bool,

    /// Reorder operations (by priority, then timestamp) before execution.
    pub enable_reordering: bool,

    /// Compress batch payloads.
    /// NOTE(review): not referenced by any code in this module — confirm intent.
    pub enable_compression: bool,

    /// Memory pool size — presumably bytes (default is 50 MiB); TODO confirm unit.
    pub memory_pool_size: usize,

    /// Allow parallel processing of operations within a batch.
    /// NOTE(review): not referenced by any code in this module yet.
    pub enable_parallel_processing: bool,

    /// Target wall-clock processing time (ms) per batch.
    pub target_processing_time_ms: u64,
}
98
99impl Default for BatchConfig {
100 fn default() -> Self {
101 Self {
102 max_batch_size: 100,
103 min_batch_size: 10,
104 batch_timeout_ms: 5000,
105 max_concurrent_batches: 5,
106 enable_priority_batching: true,
107 enable_adaptive_sizing: true,
108 error_handling: ErrorHandlingStrategy::ContinueOnError,
109 retry_config: RetryConfig::default(),
110 optimization: BatchOptimizationConfig::default(),
111 }
112 }
113}
114
115impl Default for RetryConfig {
116 fn default() -> Self {
117 Self {
118 max_retries: 3,
119 base_delay_ms: 1000,
120 backoff_multiplier: 2.0,
121 max_delay_ms: 30000,
122 jitter_factor: 0.1,
123 }
124 }
125}
126
127impl Default for BatchOptimizationConfig {
128 fn default() -> Self {
129 Self {
130 enable_deduplication: true,
131 enable_reordering: true,
132 enable_compression: false,
133 memory_pool_size: 1024 * 1024 * 50, enable_parallel_processing: true,
135 target_processing_time_ms: 10000, }
137 }
138}
139
/// A formed batch of index updates, queued for execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchOperation {
    /// Unique identifier for the batch (UUID v4).
    pub batch_id: String,

    /// The operations contained in this batch.
    pub operations: Vec<IndexUpdate>,

    /// Batch priority, taken as the maximum priority of its operations
    /// (8–10 → high queue, 4–7 → normal queue, otherwise low queue).
    pub priority: u8,

    /// When the batch was formed.
    pub created_at: chrono::DateTime<chrono::Utc>,

    /// Rough processing-time estimate (ms); currently a fixed placeholder.
    pub estimated_processing_time_ms: u64,

    /// Free-form batch metadata.
    pub metadata: HashMap<String, serde_json::Value>,
}
161
/// Outcome of executing one batch.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchResult {
    /// Identifier of the batch this result belongs to.
    pub batch_id: String,

    /// Overall success, judged by the configured `ErrorHandlingStrategy`.
    pub success: bool,

    /// Per-operation results, in execution order.
    pub operation_results: Vec<UpdateResult>,

    /// Total wall-clock processing time (ms).
    pub processing_time_ms: u64,

    /// Number of operations that completed successfully.
    pub successful_operations: usize,

    /// Number of operations that failed.
    pub failed_operations: usize,

    /// Batch-level error messages collected from failed operations.
    pub batch_errors: Vec<String>,

    /// Detailed timing/throughput statistics for this batch.
    pub stats: BatchProcessingStats,

    /// Retry bookkeeping, if this batch is scheduled for a retry.
    pub retry_info: Option<RetryInfo>,
}
192
/// Fine-grained statistics for a single batch execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchProcessingStats {
    /// Time (ms) the batch waited in a queue before processing started.
    /// NOTE(review): currently always ~0 — see `process_batch`.
    pub queue_wait_time_ms: u64,

    /// Wall-clock processing time (ms).
    pub processing_time_ms: u64,

    /// Peak memory used while processing (MB); currently a placeholder (0.0).
    pub peak_memory_usage_mb: f64,

    /// CPU utilization during processing (%); currently a placeholder (0.0).
    pub cpu_utilization_percent: f64,

    /// Successful operations per second for this batch.
    pub throughput_ops_per_second: f64,

    /// Names of the optimizations that were applied to the batch.
    pub optimizations_applied: Vec<String>,
}
214
/// Bookkeeping for a batch that is being retried.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryInfo {
    /// Current attempt number (1-based, presumably — TODO confirm at call sites).
    pub attempt: u32,

    /// Maximum number of retries allowed for this batch.
    pub max_retries: u32,

    /// Earliest time the next retry should run.
    pub next_retry_at: chrono::DateTime<chrono::Utc>,

    /// Human-readable reason the batch is being retried.
    pub retry_reason: String,

    /// Operation ids of the operations that failed and need retrying.
    pub failed_operations: Vec<String>,
}
233
/// Holds the three priority queues plus a retry queue for formed batches.
///
/// Each queue has its own `Mutex` so producers/consumers of different
/// priorities do not contend with each other.
pub struct QueueManager {
    /// Batches with priority 8–10.
    high_priority_queue: Arc<Mutex<VecDeque<BatchOperation>>>,

    /// Batches with priority 4–7.
    normal_priority_queue: Arc<Mutex<VecDeque<BatchOperation>>>,

    /// Batches with priority 0–3.
    low_priority_queue: Arc<Mutex<VecDeque<BatchOperation>>>,

    /// Failed batches awaiting retry, paired with their retry bookkeeping.
    retry_queue: Arc<Mutex<VecDeque<(BatchOperation, RetryInfo)>>>,

    /// Aggregate queue statistics.
    stats: Arc<RwLock<QueueStats>>,
}
251
/// Aggregate statistics across all batch queues.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueStats {
    /// Current depth of each queue, keyed by queue name.
    pub queue_sizes: HashMap<String, usize>,

    /// Average wait time (ms) per queue, keyed by queue name.
    pub average_wait_times_ms: HashMap<String, f64>,

    /// Total number of batches dequeued and processed.
    pub total_processed: u64,

    /// Current throughput (batches or ops per second — TODO confirm unit).
    pub current_throughput: f64,

    /// When these statistics were last refreshed.
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
270
/// Executes batches under a concurrency limit.
pub struct BatchExecutor {
    /// Optimization settings applied before execution.
    config: BatchOptimizationConfig,

    /// Bounds concurrent batch executions to `max_concurrent_batches` permits.
    semaphore: Arc<Semaphore>,

    /// Running execution statistics.
    stats: Arc<RwLock<ExecutorStats>>,
}
282
/// Running statistics for the batch executor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutorStats {
    /// Total number of batches processed so far.
    pub batches_processed: u64,

    /// Smoothed average processing time per batch (ms).
    pub avg_processing_time_ms: f64,

    /// Fraction of operations that succeeded (0.0–1.0).
    /// NOTE(review): currently reflects only the most recent batch — see
    /// `process_batch`.
    pub success_rate: f64,

    /// Number of batches currently being processed.
    pub active_batches: usize,

    /// High-water mark of concurrently active batches.
    pub peak_concurrent_batches: usize,

    /// When these statistics were last refreshed.
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
304
/// Top-level batch processor: accumulates operations into batches, queues
/// them by priority, and runs background tasks for formation, execution,
/// timeout monitoring and metrics collection.
pub struct BatchProcessor {
    /// Processor configuration.
    config: BatchConfig,

    /// Priority/retry queues for formed batches.
    queue_manager: Arc<QueueManager>,

    /// Executor enforcing the concurrency limit.
    executor: Arc<BatchExecutor>,

    /// Batches still being formed, keyed by batch key (see `determine_batch_key`).
    current_batches: Arc<RwLock<HashMap<String, Vec<IndexUpdate>>>>,

    /// Formation-start timestamps per batch key, used for timeout flushing.
    batch_timers: Arc<RwLock<HashMap<String, tokio::time::Instant>>>,

    /// Handles of the spawned background tasks (checked by `health_check`).
    task_handles: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,

    /// Aggregated processing metrics.
    metrics: Arc<RwLock<ProcessingMetrics>>,
}
328
/// Aggregated processing metrics across all batches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessingMetrics {
    /// Total operations processed (counted per batch result).
    pub total_operations: u64,

    /// Total batches processed.
    pub total_batches: u64,

    /// Mean operations per batch (total_operations / total_batches).
    pub avg_batch_size: f64,

    /// Throughput of the most recent batch (ops/second).
    pub throughput_ops_per_second: f64,

    /// Error rate; see NOTE(review) in `update_metrics` about its derivation.
    pub error_rate: f64,

    /// Retry counters.
    pub retry_stats: RetryStats,

    /// Rolling window (max 1000 points) of periodic performance samples.
    pub performance_trends: Vec<PerformanceDataPoint>,

    /// When these metrics were last refreshed.
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
356
/// Counters describing retry activity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryStats {
    /// Total retries attempted.
    pub total_retries: u64,

    /// Retries that eventually succeeded.
    pub successful_retries: u64,

    /// Retries that exhausted their attempts and failed.
    pub failed_retries: u64,

    /// Mean number of retry attempts per retried batch.
    pub avg_retry_attempts: f64,
}
372
/// One periodic sample of processor performance, appended by the metrics
/// collection background task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceDataPoint {
    /// When the sample was taken.
    pub timestamp: chrono::DateTime<chrono::Utc>,

    /// Throughput at sample time (ops/second).
    pub throughput: f64,

    /// Total queue depth at sample time; currently a placeholder (0).
    pub queue_depth: usize,

    /// Error rate at sample time.
    pub error_rate: f64,

    /// Average processing time (ms); currently a placeholder (0.0).
    pub avg_processing_time_ms: f64,
}
391
impl BatchProcessor {
    /// Creates a processor with empty queues/metrics and spawns the four
    /// background tasks (batch formation, batch processing, timeout
    /// monitoring, metrics collection).
    ///
    /// # Errors
    /// Propagates any error from `start_background_tasks`.
    pub async fn new(config: BatchConfig) -> RragResult<Self> {
        let queue_manager = Arc::new(QueueManager {
            high_priority_queue: Arc::new(Mutex::new(VecDeque::new())),
            normal_priority_queue: Arc::new(Mutex::new(VecDeque::new())),
            low_priority_queue: Arc::new(Mutex::new(VecDeque::new())),
            retry_queue: Arc::new(Mutex::new(VecDeque::new())),
            stats: Arc::new(RwLock::new(QueueStats {
                queue_sizes: HashMap::new(),
                average_wait_times_ms: HashMap::new(),
                total_processed: 0,
                current_throughput: 0.0,
                last_updated: chrono::Utc::now(),
            })),
        });

        let executor = Arc::new(BatchExecutor {
            config: config.optimization.clone(),
            // Permit count caps concurrent batch executions.
            semaphore: Arc::new(Semaphore::new(config.max_concurrent_batches)),
            stats: Arc::new(RwLock::new(ExecutorStats {
                batches_processed: 0,
                avg_processing_time_ms: 0.0,
                success_rate: 0.0,
                active_batches: 0,
                peak_concurrent_batches: 0,
                last_updated: chrono::Utc::now(),
            })),
        });

        let processor = Self {
            config,
            queue_manager,
            executor,
            current_batches: Arc::new(RwLock::new(HashMap::new())),
            batch_timers: Arc::new(RwLock::new(HashMap::new())),
            task_handles: Arc::new(Mutex::new(Vec::new())),
            metrics: Arc::new(RwLock::new(ProcessingMetrics {
                total_operations: 0,
                total_batches: 0,
                avg_batch_size: 0.0,
                throughput_ops_per_second: 0.0,
                error_rate: 0.0,
                retry_stats: RetryStats {
                    total_retries: 0,
                    successful_retries: 0,
                    failed_retries: 0,
                    avg_retry_attempts: 0.0,
                },
                performance_trends: Vec::new(),
                last_updated: chrono::Utc::now(),
            })),
        };

        processor.start_background_tasks().await?;
        Ok(processor)
    }

    /// Adds one operation to its in-formation batch, flushing the batch to
    /// the priority queues once it reaches `max_batch_size`.
    ///
    /// Returns the batch key the operation was assigned to.
    pub async fn add_operation(&self, operation: IndexUpdate) -> RragResult<String> {
        let batch_key = self.determine_batch_key(&operation).await?;

        {
            let mut current_batches = self.current_batches.write().await;
            let batch = current_batches
                .entry(batch_key.clone())
                .or_insert_with(Vec::new);
            batch.push(operation);

            // First element starts the flush timer for this batch key.
            if batch.len() == 1 {
                let mut timers = self.batch_timers.write().await;
                timers.insert(batch_key.clone(), tokio::time::Instant::now());
            }

            // Size-triggered flush: move the accumulated operations out
            // (leaving an empty Vec behind) and queue them as a batch.
            if batch.len() >= self.config.max_batch_size {
                let operations = std::mem::take(batch);
                drop(current_batches);

                let mut timers = self.batch_timers.write().await;
                timers.remove(&batch_key);
                drop(timers);

                self.create_and_queue_batch(operations).await?;
            }
        }

        Ok(batch_key)
    }

    /// Executes a batch under the concurrency semaphore: optimizes it,
    /// processes each operation, updates executor stats, and returns a
    /// `BatchResult` judged by the configured error-handling strategy.
    pub async fn process_batch(&self, batch: BatchOperation) -> RragResult<BatchResult> {
        // Permit is held (RAII) for the whole execution, bounding concurrency.
        let _permit = self
            .executor
            .semaphore
            .acquire()
            .await
            .map_err(|_e| RragError::timeout("acquire_semaphore", 30000))?;

        let start_time = std::time::Instant::now();
        // NOTE(review): elapsed() of an Instant created on the previous line
        // is always ~0 — this does not measure actual queue wait time. The
        // wait should be measured from `batch.created_at` (or enqueue time).
        let queue_wait_time = start_time.elapsed();

        {
            let mut stats = self.executor.stats.write().await;
            stats.active_batches += 1;
            stats.peak_concurrent_batches =
                std::cmp::max(stats.peak_concurrent_batches, stats.active_batches);
            // NOTE(review): if optimize_batch/update_metrics below return Err
            // via `?`, active_batches is never decremented and leaks.
        }

        let optimized_operations = self.optimize_batch(&batch.operations).await?;

        let mut operation_results = Vec::new();
        let mut successful_operations = 0;
        let mut failed_operations = 0;
        let mut batch_errors = Vec::new();

        // NOTE(review): even under ErrorHandlingStrategy::FailFast this loop
        // runs to completion; "fail fast" only affects the final verdict.
        for operation in optimized_operations {
            match self.process_single_operation(&operation).await {
                Ok(result) => {
                    if result.success {
                        successful_operations += 1;
                    } else {
                        failed_operations += 1;
                    }
                    operation_results.push(result);
                }
                Err(e) => {
                    failed_operations += 1;
                    batch_errors.push(e.to_string());

                    // Synthesize a failed UpdateResult so the per-operation
                    // result list stays aligned with the input operations.
                    operation_results.push(UpdateResult {
                        operation_id: operation.operation_id.clone(),
                        success: false,
                        operations_completed: Vec::new(),
                        conflicts: Vec::new(),
                        processing_time_ms: 0,
                        items_affected: 0,
                        error: Some(e.to_string()),
                        metadata: HashMap::new(),
                    });
                }
            }
        }

        let processing_time = start_time.elapsed();
        // Batch-level success verdict per configured strategy.
        let success = match self.config.error_handling {
            ErrorHandlingStrategy::FailFast => failed_operations == 0,
            ErrorHandlingStrategy::ContinueOnError => successful_operations > 0,
            ErrorHandlingStrategy::IsolateAndRetry => true,
            ErrorHandlingStrategy::CircuitBreaker => failed_operations < successful_operations,
        };

        {
            let mut stats = self.executor.stats.write().await;
            stats.active_batches -= 1;
            stats.batches_processed += 1;
            // NOTE(review): (old + new) / 2 over-weights the latest batch; a
            // proper running mean would weight by batches_processed.
            stats.avg_processing_time_ms =
                (stats.avg_processing_time_ms + processing_time.as_millis() as f64) / 2.0;
            // NOTE(review): this overwrites the cumulative success rate with
            // the current batch's rate only; also divides by zero (NaN) when
            // the optimized batch is empty.
            stats.success_rate = if stats.batches_processed > 0 {
                successful_operations as f64 / (successful_operations + failed_operations) as f64
            } else {
                0.0
            };
            stats.last_updated = chrono::Utc::now();
        }

        let result = BatchResult {
            batch_id: batch.batch_id,
            success,
            operation_results,
            processing_time_ms: processing_time.as_millis() as u64,
            successful_operations,
            failed_operations,
            batch_errors,
            stats: BatchProcessingStats {
                queue_wait_time_ms: queue_wait_time.as_millis() as u64,
                processing_time_ms: processing_time.as_millis() as u64,
                // Placeholder resource metrics — not yet measured.
                peak_memory_usage_mb: 0.0,
                cpu_utilization_percent: 0.0,
                throughput_ops_per_second: successful_operations as f64
                    / processing_time.as_secs_f64(),
                // NOTE(review): hard-coded; should reflect the optimizations
                // actually enabled in config.optimization.
                optimizations_applied: vec!["deduplication".to_string()],
            },
            retry_info: None,
        };

        self.update_metrics(&result).await?;

        Ok(result)
    }

    /// Returns a snapshot of the aggregated processing metrics.
    pub async fn get_metrics(&self) -> ProcessingMetrics {
        self.metrics.read().await.clone()
    }

    /// Returns a snapshot of the queue statistics.
    pub async fn get_queue_stats(&self) -> QueueStats {
        self.queue_manager.stats.read().await.clone()
    }

    /// Health check: all background tasks still running and total queue
    /// depth below `max_batch_size * 10`.
    pub async fn health_check(&self) -> RragResult<bool> {
        let handles = self.task_handles.lock().await;
        let all_running = handles.iter().all(|handle| !handle.is_finished());

        let queue_stats = self.get_queue_stats().await;
        let total_queue_size: usize = queue_stats.queue_sizes.values().sum();
        let queue_healthy = total_queue_size < self.config.max_batch_size * 10;
        Ok(all_running && queue_healthy)
    }

    /// Spawns the four background loops and records their join handles so
    /// `health_check` can detect a dead task.
    async fn start_background_tasks(&self) -> RragResult<()> {
        let mut handles = self.task_handles.lock().await;

        handles.push(self.start_batch_formation_task().await);

        handles.push(self.start_batch_processing_task().await);

        handles.push(self.start_timeout_monitoring_task().await);

        handles.push(self.start_metrics_collection_task().await);

        Ok(())
    }

    /// Spawns the loop that flushes in-formation batches whose timer has
    /// exceeded `batch_timeout_ms` (and that hold at least `min_batch_size`
    /// operations). Polls at a quarter of the timeout interval.
    async fn start_batch_formation_task(&self) -> tokio::task::JoinHandle<()> {
        let current_batches = Arc::clone(&self.current_batches);
        let batch_timers = Arc::clone(&self.batch_timers);
        let config = self.config.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(
                config.batch_timeout_ms / 4,
            ));

            loop {
                interval.tick().await;

                let mut batches_to_process = Vec::new();

                // Read-only pass: collect the keys of timed-out batches.
                {
                    let current_batches_read = current_batches.read().await;
                    let timers = batch_timers.read().await;

                    for (batch_key, timer) in timers.iter() {
                        if timer.elapsed().as_millis() as u64 >= config.batch_timeout_ms {
                            if let Some(operations) = current_batches_read.get(batch_key) {
                                if operations.len() >= config.min_batch_size {
                                    batches_to_process.push(batch_key.clone());
                                }
                            }
                        }
                    }
                }

                // Write pass: remove each timed-out batch and its timer.
                for batch_key in batches_to_process {
                    let operations = {
                        let mut current_batches_write = current_batches.write().await;
                        current_batches_write.remove(&batch_key).unwrap_or_default()
                    };

                    {
                        let mut timers = batch_timers.write().await;
                        timers.remove(&batch_key);
                    }

                    if !operations.is_empty() {
                        // NOTE(review): the removed operations are silently
                        // dropped here — they are never queued for execution.
                        // TODO: hand them to create_and_queue_batch (requires
                        // sharing the queue manager with this task).
                    }
                }
            }
        })
    }

    /// Spawns the loop that drains the priority queues (high, then normal,
    /// then low).
    ///
    /// NOTE(review): popped batches are not actually executed — the loop
    /// only sleeps. TODO: route dequeued batches through process_batch.
    async fn start_batch_processing_task(&self) -> tokio::task::JoinHandle<()> {
        let queue_manager = Arc::clone(&self.queue_manager);
        let _executor = Arc::clone(&self.executor);

        tokio::spawn(async move {
            loop {
                // Strict priority order; each lock is dropped before trying
                // the next queue so producers are not blocked.
                let batch = {
                    let mut high_queue = queue_manager.high_priority_queue.lock().await;
                    if let Some(batch) = high_queue.pop_front() {
                        Some(batch)
                    } else {
                        drop(high_queue);

                        let mut normal_queue = queue_manager.normal_priority_queue.lock().await;
                        if let Some(batch) = normal_queue.pop_front() {
                            Some(batch)
                        } else {
                            drop(normal_queue);

                            let mut low_queue = queue_manager.low_priority_queue.lock().await;
                            low_queue.pop_front()
                        }
                    }
                };

                if let Some(_batch) = batch {
                    // Placeholder for actual batch execution.
                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                } else {
                    // Idle back-off when all queues are empty.
                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                }
            }
        })
    }

    /// Spawns the timeout-monitoring loop.
    ///
    /// NOTE(review): currently a no-op heartbeat ticking once a minute.
    async fn start_timeout_monitoring_task(&self) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));

            loop {
                interval.tick().await;
            }
        })
    }

    /// Spawns the loop that appends a `PerformanceDataPoint` every 30 s,
    /// keeping at most 1000 trend points (oldest dropped first).
    async fn start_metrics_collection_task(&self) -> tokio::task::JoinHandle<()> {
        let metrics = Arc::clone(&self.metrics);

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));

            loop {
                interval.tick().await;

                let mut metrics_guard = metrics.write().await;
                let data_point = PerformanceDataPoint {
                    timestamp: chrono::Utc::now(),
                    throughput: metrics_guard.throughput_ops_per_second,
                    // Placeholder values — queue depth and avg time are not
                    // wired into this task yet.
                    queue_depth: 0,
                    error_rate: metrics_guard.error_rate,
                    avg_processing_time_ms: 0.0,
                };

                metrics_guard.performance_trends.push(data_point);

                // Bounded window: keep only the most recent 1000 samples.
                if metrics_guard.performance_trends.len() > 1000 {
                    metrics_guard.performance_trends.remove(0);
                }

                metrics_guard.last_updated = chrono::Utc::now();
            }
        })
    }

    /// Chooses the formation-batch key for an operation: per-priority keys
    /// when priority batching is enabled, otherwise a single shared key.
    async fn determine_batch_key(&self, operation: &IndexUpdate) -> RragResult<String> {
        if self.config.enable_priority_batching {
            Ok(format!("priority_{}", operation.priority))
        } else {
            Ok("default".to_string())
        }
    }

    /// Wraps operations into a `BatchOperation` (priority = max operation
    /// priority, default 5 for an empty batch) and pushes it onto the
    /// matching priority queue.
    async fn create_and_queue_batch(&self, operations: Vec<IndexUpdate>) -> RragResult<()> {
        let batch_id = Uuid::new_v4().to_string();
        let priority = operations.iter().map(|op| op.priority).max().unwrap_or(5);

        let batch = BatchOperation {
            batch_id,
            operations,
            priority,
            created_at: chrono::Utc::now(),
            // Fixed placeholder estimate — no sizing model yet.
            estimated_processing_time_ms: 1000,
            metadata: HashMap::new(),
        };

        // Route by priority band: 8-10 high, 4-7 normal, rest low.
        match priority {
            8..=10 => {
                let mut queue = self.queue_manager.high_priority_queue.lock().await;
                queue.push_back(batch);
            }
            4..=7 => {
                let mut queue = self.queue_manager.normal_priority_queue.lock().await;
                queue.push_back(batch);
            }
            _ => {
                let mut queue = self.queue_manager.low_priority_queue.lock().await;
                queue.push_back(batch);
            }
        }

        Ok(())
    }

    /// Applies the enabled batch optimizations (deduplication, then
    /// reordering) and returns the optimized operation list.
    async fn optimize_batch(&self, operations: &[IndexUpdate]) -> RragResult<Vec<IndexUpdate>> {
        let mut optimized = operations.to_vec();

        if self.config.optimization.enable_deduplication {
            optimized = self.deduplicate_operations(optimized).await?;
        }

        if self.config.optimization.enable_reordering {
            optimized = self.reorder_operations(optimized).await?;
        }

        Ok(optimized)
    }

    /// Keeps only the first operation seen per document id; operations
    /// without a document id are always kept.
    ///
    /// NOTE(review): keeping the FIRST occurrence discards later operations
    /// on the same document — e.g. an Update followed by a Delete loses the
    /// Delete, and repeated Updates lose the newest content. Confirm whether
    /// last-wins semantics were intended.
    async fn deduplicate_operations(
        &self,
        operations: Vec<IndexUpdate>,
    ) -> RragResult<Vec<IndexUpdate>> {
        let mut seen_documents = std::collections::HashSet::new();
        let mut deduplicated = Vec::new();

        for operation in operations {
            // Extract the target document id, if this operation has one.
            let document_id = match &operation.operation {
                crate::incremental::index_manager::IndexOperation::Add { document, .. } => {
                    Some(&document.id)
                }
                crate::incremental::index_manager::IndexOperation::Update {
                    document_id, ..
                } => Some(document_id),
                crate::incremental::index_manager::IndexOperation::Delete { document_id } => {
                    Some(document_id)
                }
                _ => None,
            };

            if let Some(doc_id) = document_id {
                if !seen_documents.contains(doc_id) {
                    seen_documents.insert(doc_id.clone());
                    deduplicated.push(operation);
                }
            } else {
                // No document id — cannot dedup, keep unconditionally.
                deduplicated.push(operation);
            }
        }

        Ok(deduplicated)
    }

    /// Sorts operations by descending priority, breaking ties by ascending
    /// timestamp (oldest first).
    async fn reorder_operations(
        &self,
        mut operations: Vec<IndexUpdate>,
    ) -> RragResult<Vec<IndexUpdate>> {
        operations.sort_by(|a, b| {
            b.priority
                .cmp(&a.priority)
                .then_with(|| a.timestamp.cmp(&b.timestamp))
        });

        Ok(operations)
    }

    /// Processes one operation. Currently a stub that simulates ~10 ms of
    /// work and reports success; TODO: dispatch to the index manager.
    async fn process_single_operation(&self, operation: &IndexUpdate) -> RragResult<UpdateResult> {
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        Ok(UpdateResult {
            operation_id: operation.operation_id.clone(),
            success: true,
            operations_completed: vec!["processed".to_string()],
            conflicts: Vec::new(),
            processing_time_ms: 10,
            items_affected: 1,
            error: None,
            metadata: HashMap::new(),
        })
    }

    /// Folds a batch result into the aggregated processing metrics.
    async fn update_metrics(&self, result: &BatchResult) -> RragResult<()> {
        let mut metrics = self.metrics.write().await;

        metrics.total_batches += 1;
        metrics.total_operations += result.operation_results.len() as u64;

        if metrics.total_batches > 0 {
            metrics.avg_batch_size = metrics.total_operations as f64 / metrics.total_batches as f64;
        }

        // Latest-batch throughput overwrites the previous value (not averaged).
        metrics.throughput_ops_per_second = result.stats.throughput_ops_per_second;

        // NOTE(review): divides this batch's failures by the CUMULATIVE
        // operation count — the resulting "error rate" is neither per-batch
        // nor cumulative. A running failure total would fix this.
        if metrics.total_operations > 0 {
            metrics.error_rate = result.failed_operations as f64 / metrics.total_operations as f64;
        }

        metrics.last_updated = chrono::Utc::now();

        Ok(())
    }
}
930
#[cfg(test)]
mod tests {
    use super::*;
    use crate::incremental::index_manager::IndexOperation;
    use crate::Document;

    /// Processor built with defaults should start its background tasks and
    /// report healthy.
    #[tokio::test]
    async fn test_batch_processor_creation() {
        let config = BatchConfig::default();
        let processor = BatchProcessor::new(config).await.unwrap();
        assert!(processor.health_check().await.unwrap());
    }

    /// Adding a single operation should assign it to a non-empty batch key.
    #[tokio::test]
    async fn test_add_operation_to_batch() {
        let processor = BatchProcessor::new(BatchConfig::default()).await.unwrap();

        let doc = Document::new("Test content");
        let operation = IndexOperation::Add {
            document: doc,
            chunks: Vec::new(),
            embeddings: Vec::new(),
        };

        let update = IndexUpdate {
            operation_id: Uuid::new_v4().to_string(),
            operation,
            priority: 5,
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: HashMap::new(),
            dependencies: Vec::new(),
            max_retries: 3,
            retry_count: 0,
        };

        let batch_key = processor.add_operation(update).await.unwrap();
        assert!(!batch_key.is_empty());
    }

    /// Deduplication should collapse three updates targeting the same
    /// document id down to one operation (first occurrence kept).
    #[tokio::test]
    async fn test_batch_optimization() {
        let processor = BatchProcessor::new(BatchConfig::default()).await.unwrap();

        // Build three Update operations against the same document id.
        let mut operations = Vec::new();
        for i in 0..3 {
            let doc = Document::with_id("same_doc", format!("Content {}", i));
            let operation = IndexOperation::Update {
                document_id: "same_doc".to_string(),
                document: doc,
                chunks: Vec::new(),
                embeddings: Vec::new(),
                change_result: crate::incremental::change_detection::ChangeResult {
                    change_type: crate::incremental::change_detection::ChangeType::ContentChanged,
                    document_id: "same_doc".to_string(),
                    previous_hash: None,
                    current_hash: format!("hash_{}", i),
                    delta: crate::incremental::change_detection::ContentDelta {
                        added_chars: 10,
                        removed_chars: 0,
                        modified_chars: 5,
                        previous_size: 10,
                        current_size: 20,
                        change_percentage: 0.5,
                    },
                    metadata_changes: crate::incremental::change_detection::MetadataChanges {
                        added_keys: Vec::new(),
                        removed_keys: Vec::new(),
                        modified_keys: Vec::new(),
                        previous_metadata: HashMap::new(),
                        current_metadata: HashMap::new(),
                    },
                    timestamps: crate::incremental::change_detection::ChangeTimestamps {
                        detected_at: chrono::Utc::now(),
                        last_modified: None,
                        previous_check: None,
                        time_since_change: None,
                    },
                    chunk_changes: Vec::new(),
                    confidence: 1.0,
                },
            };

            let update = IndexUpdate {
                operation_id: Uuid::new_v4().to_string(),
                operation,
                priority: 5,
                timestamp: chrono::Utc::now(),
                source: "test".to_string(),
                metadata: HashMap::new(),
                dependencies: Vec::new(),
                max_retries: 3,
                retry_count: 0,
            };

            operations.push(update);
        }

        let optimized = processor.optimize_batch(&operations).await.unwrap();

        // All three share a document id, so dedup keeps exactly one.
        assert_eq!(optimized.len(), 1);
    }

    /// All strategy variants should have distinct Debug representations.
    #[test]
    fn test_error_handling_strategies() {
        let strategies = vec![
            ErrorHandlingStrategy::FailFast,
            ErrorHandlingStrategy::ContinueOnError,
            ErrorHandlingStrategy::IsolateAndRetry,
            ErrorHandlingStrategy::CircuitBreaker,
        ];

        for (i, strategy1) in strategies.iter().enumerate() {
            for (j, strategy2) in strategies.iter().enumerate() {
                if i != j {
                    assert_ne!(format!("{:?}", strategy1), format!("{:?}", strategy2));
                }
            }
        }
    }

    /// Default retry config should match the documented values.
    #[test]
    fn test_retry_config_defaults() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.base_delay_ms, 1000);
        assert_eq!(config.backoff_multiplier, 2.0);
        assert!(config.jitter_factor >= 0.0 && config.jitter_factor <= 1.0);
    }
}
1063}