use crate::core::types::{TrackingError, TrackingResult};
use crate::export::error_handling::{ConflictType, ExportError, ExportStage, ResourceType};
use crate::export::fast_export_coordinator::{CompleteExportStats, FastExportConfig};
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};
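
/// Coordinates error recovery for the fast export pipeline: it picks a
/// recovery strategy (retry, graceful degradation, partial save, config
/// adjustment, resource release, or skip) for each `ExportError` and tracks
/// recovery statistics and degradation state.
///
/// A minimal usage sketch (hedged: the error variant and context fields are
/// taken from this module's own tests; the operation name is arbitrary):
///
/// ```ignore
/// let mut manager = ErrorRecoveryManager::new(RecoveryConfig::default());
/// let error = ExportError::ParallelProcessingError {
///     shard_index: 5,
///     thread_id: "thread-1".to_string(),
///     error_message: "worker failed".to_string(),
///     partial_results: None,
/// };
/// let context = ErrorContext {
///     current_config: FastExportConfig::default(),
///     progress_percentage: 50.0,
///     processed_data_size: 1_000,
///     operation_start_time: Instant::now(),
///     current_stats: None,
/// };
/// let result = manager.handle_export_error(&error, "export_shards", &context)?;
/// assert!(result.success);
/// ```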
#[derive(Debug)]
pub struct ErrorRecoveryManager {
    /// Recovery behaviour configuration.
    config: RecoveryConfig,
    /// Aggregate recovery statistics.
    stats: RecoveryStats,
    /// Per-operation retry history, keyed by operation name.
    retry_history: HashMap<String, RetryHistory>,
    /// Current graceful degradation state.
    degradation_state: DegradationState,
}

/// Configuration for error recovery behaviour.
#[derive(Debug, Clone)]
pub struct RecoveryConfig {
    /// Enable automatic retries for recoverable errors.
    pub enable_auto_retry: bool,
    /// Maximum number of retry attempts per operation.
    pub max_retry_attempts: usize,
    /// Initial retry interval in milliseconds.
    pub retry_interval_ms: u64,
    /// Multiplier applied to the retry interval after each attempt.
    pub retry_backoff_factor: f64,
    /// Upper bound on the retry interval in milliseconds.
    pub max_retry_interval_ms: u64,

    /// Enable graceful degradation when the error rate climbs.
    pub enable_graceful_degradation: bool,
    /// Error-rate level above which degradation is triggered.
    pub degradation_threshold: f64,
    /// Error-rate level below which normal operation is restored.
    pub recovery_threshold: f64,

    /// Enable saving partial results when an export cannot complete.
    pub enable_partial_save: bool,
    /// Directory where partial results are written.
    pub partial_save_directory: PathBuf,
    /// Interval between partial saves.
    pub partial_save_interval: usize,

    /// Emit verbose recovery logs.
    pub verbose_logging: bool,
}

impl Default for RecoveryConfig {
    fn default() -> Self {
        Self {
            enable_auto_retry: true,
            max_retry_attempts: 3,
            retry_interval_ms: 1000,
            retry_backoff_factor: 2.0,
            max_retry_interval_ms: 10000,

            enable_graceful_degradation: true,
            degradation_threshold: 10.0,
            recovery_threshold: 2.0,

            enable_partial_save: true,
            partial_save_directory: PathBuf::from("./partial_exports"),
            partial_save_interval: 1000,

            verbose_logging: false,
        }
    }
}

/// Aggregate statistics about recovery activity.
#[derive(Debug, Clone, Default)]
pub struct RecoveryStats {
    /// Total number of errors handled.
    pub total_errors: usize,
    /// Number of successful recoveries.
    pub successful_recoveries: usize,
    /// Number of failed recoveries.
    pub failed_recoveries: usize,
    /// Total number of retry attempts made.
    pub total_retries: usize,
    /// Number of times degradation was triggered.
    pub degradation_count: usize,
    /// Number of partial saves performed.
    pub partial_saves: usize,
    /// Total time spent on recovery, in milliseconds.
    pub total_recovery_time_ms: u64,
}

/// Retry bookkeeping for a single operation.
#[derive(Debug, Clone)]
pub struct RetryHistory {
    /// Name of the operation being retried.
    pub operation: String,
    /// Number of attempts made so far.
    pub attempt_count: usize,
    /// Time of the most recent attempt.
    pub last_attempt: Instant,
    /// Interval to wait before the next attempt, in milliseconds.
    pub next_interval_ms: u64,
    /// Error messages collected across attempts.
    pub error_history: Vec<String>,
}

/// Current graceful degradation state.
#[derive(Debug, Clone)]
pub struct DegradationState {
    /// Whether the pipeline is currently degraded.
    pub is_degraded: bool,
    /// When the current degradation started, if any.
    pub degradation_start: Option<Instant>,
    /// Exponentially weighted error rate.
    pub current_error_rate: f64,
    /// Current degradation level.
    pub degradation_level: DegradationLevel,
    /// Human-readable reason for the degradation, if any.
    pub degradation_reason: Option<String>,
}

/// How aggressively the export pipeline is degraded.
#[derive(Debug, Clone, PartialEq)]
pub enum DegradationLevel {
    /// Full functionality.
    Normal,
    /// Slightly reduced parallelism.
    Light,
    /// Complex features disabled.
    Moderate,
    /// Only basic features enabled.
    Severe,
    /// Minimal functionality to keep the export alive.
    Emergency,
}

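/// A recovery strategy chosen by [`ErrorRecoveryManager`] for a specific
/// error. Each variant carries the parameters needed to execute it.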
#[derive(Debug, Clone)]
pub enum RecoveryStrategy {
    /// Retry the failed operation automatically with exponential backoff.
    AutoRetry {
        max_attempts: usize,
        interval_ms: u64,
        backoff_factor: f64,
    },
    /// Reduce functionality to keep the export running.
    GracefulDegradation {
        target_level: DegradationLevel,
        reason: String,
    },
    /// Persist whatever has been produced so far.
    PartialSave {
        save_path: PathBuf,
        progress_percentage: f64,
    },
    /// Retry with an adjusted export configuration.
    ConfigAdjustment {
        new_config: Box<FastExportConfig>,
        reason: String,
    },
    /// Release a specific resource before retrying.
    ResourceRelease {
        resource_type: ResourceType,
        amount: usize,
    },
    /// Skip the failing operation entirely.
    SkipOperation {
        operation: String,
        reason: String,
    },
}

/// Outcome of executing a recovery strategy.
#[derive(Debug, Clone)]
pub struct RecoveryResult {
    /// Whether the recovery action succeeded.
    pub success: bool,
    /// The strategy that was executed.
    pub strategy: RecoveryStrategy,
    /// Human-readable summary of the recovery action.
    pub message: String,
    /// Time spent on the recovery action, in milliseconds.
    pub recovery_time_ms: u64,
    /// Path to a partial result file, if one was written.
    pub partial_result_path: Option<PathBuf>,
    /// Follow-up actions suggested to the caller.
    pub suggested_actions: Vec<String>,
}

impl ErrorRecoveryManager {
    /// Creates a new recovery manager, ensuring the partial-save directory
    /// exists when partial saves are enabled.
    pub fn new(config: RecoveryConfig) -> Self {
        if config.enable_partial_save {
            if let Err(e) = std::fs::create_dir_all(&config.partial_save_directory) {
                tracing::warn!("Unable to create partial save directory: {e}");
            }
        }

        Self {
            config,
            stats: RecoveryStats::default(),
            retry_history: HashMap::new(),
            degradation_state: DegradationState {
                is_degraded: false,
                degradation_start: None,
                current_error_rate: 0.0,
                degradation_level: DegradationLevel::Normal,
                degradation_reason: None,
            },
        }
    }

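    /// Handles an export error end to end: selects a recovery strategy for
    /// the error, executes it, records the recovery time and outcome in the
    /// statistics, and updates the degradation state.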
    pub fn handle_export_error(
        &mut self,
        error: &ExportError,
        operation: &str,
        context: &ErrorContext,
    ) -> TrackingResult<RecoveryResult> {
        let recovery_start = Instant::now();
        self.stats.total_errors += 1;

        if self.config.verbose_logging {
            tracing::debug!("Error recovery: {} - {}", operation, error);
        }

        let strategy = self.select_recovery_strategy(error, operation, context)?;

        let result = self.execute_recovery_strategy(strategy, error, operation, context)?;

        let recovery_time = recovery_start.elapsed().as_millis() as u64;
        self.stats.total_recovery_time_ms += recovery_time;

        if result.success {
            self.stats.successful_recoveries += 1;
        } else {
            self.stats.failed_recoveries += 1;
        }

        self.update_degradation_state(error, &result);

        if self.config.verbose_logging {
            tracing::debug!(
                "Recovery completed: {} ({}ms)",
                result.message,
                recovery_time
            );
        }

        Ok(result)
    }

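    /// Maps an [`ExportError`] to a recovery strategy: parallel-processing and
    /// concurrency errors favour retries (falling back to degradation once the
    /// retry budget is spent), resource and performance problems favour config
    /// adjustment or degradation, interrupted exports and late-stage data
    /// quality issues favour a partial save, and unrecoverable corruption is
    /// skipped.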
    fn select_recovery_strategy(
        &self,
        error: &ExportError,
        operation: &str,
        context: &ErrorContext,
    ) -> TrackingResult<RecoveryStrategy> {
        match error {
            ExportError::ParallelProcessingError { shard_index, .. } => {
                if self.should_retry(operation) {
                    Ok(RecoveryStrategy::AutoRetry {
                        max_attempts: self.config.max_retry_attempts,
                        interval_ms: self.config.retry_interval_ms,
                        backoff_factor: self.config.retry_backoff_factor,
                    })
                } else {
                    Ok(RecoveryStrategy::GracefulDegradation {
                        target_level: DegradationLevel::Light,
                        reason: format!(
                            "shard {shard_index} processing failed, degrading parallelism"
                        ),
                    })
                }
            }

            ExportError::ResourceLimitExceeded {
                resource_type,
                suggested_action,
                ..
            } => {
                match resource_type {
                    ResourceType::Memory => Ok(RecoveryStrategy::ConfigAdjustment {
                        new_config: Box::new(self.create_memory_optimized_config(context)),
                        reason: "memory limit exceeded, switching to memory-optimized configuration"
                            .to_string(),
                    }),
                    ResourceType::CPU => Ok(RecoveryStrategy::GracefulDegradation {
                        target_level: DegradationLevel::Moderate,
                        reason: "CPU usage exceeded, reducing processing intensity".to_string(),
                    }),
                    _ => Ok(RecoveryStrategy::ConfigAdjustment {
                        new_config: Box::new(context.current_config.clone()),
                        reason: suggested_action.clone(),
                    }),
                }
            }

            ExportError::DataQualityError {
                affected_records, ..
            } => {
                if self.config.enable_partial_save && context.progress_percentage > 10.0 {
                    Ok(RecoveryStrategy::PartialSave {
                        save_path: self.generate_partial_save_path(operation),
                        progress_percentage: context.progress_percentage,
                    })
                } else {
                    Ok(RecoveryStrategy::SkipOperation {
                        operation: operation.to_string(),
                        reason: format!(
                            "data quality issue affects {affected_records} records, skipping processing"
                        ),
                    })
                }
            }

            ExportError::PerformanceThresholdExceeded { stage, .. } => {
                match stage {
                    ExportStage::ParallelProcessing => Ok(RecoveryStrategy::ConfigAdjustment {
                        new_config: Box::new(self.create_performance_optimized_config(context)),
                        reason: "performance threshold exceeded, adjusting configuration".to_string(),
                    }),
                    _ => Ok(RecoveryStrategy::GracefulDegradation {
                        target_level: DegradationLevel::Light,
                        reason: format!(
                            "performance threshold exceeded in stage {stage:?}, degrading processing"
                        ),
                    }),
                }
            }

            ExportError::ConcurrencyConflict {
                conflict_type,
                retry_count,
                ..
            } => {
                if *retry_count < self.config.max_retry_attempts {
                    let interval = match conflict_type {
                        ConflictType::LockContention => self.config.retry_interval_ms * 2,
                        ConflictType::ThreadPoolExhaustion => self.config.retry_interval_ms * 3,
                        _ => self.config.retry_interval_ms,
                    };

                    Ok(RecoveryStrategy::AutoRetry {
                        max_attempts: self.config.max_retry_attempts - retry_count,
                        interval_ms: interval,
                        backoff_factor: self.config.retry_backoff_factor,
                    })
                } else {
                    Ok(RecoveryStrategy::GracefulDegradation {
                        target_level: DegradationLevel::Moderate,
                        reason: "concurrency conflict, degrading processing".to_string(),
                    })
                }
            }

            ExportError::InsufficientResources { .. } => {
                Ok(RecoveryStrategy::GracefulDegradation {
                    target_level: DegradationLevel::Emergency,
                    reason: "system resources severely insufficient, enabling emergency mode"
                        .to_string(),
                })
            }

            ExportError::ExportInterrupted {
                progress_percentage,
                ..
            } => {
                Ok(RecoveryStrategy::PartialSave {
                    save_path: self.generate_partial_save_path(operation),
                    progress_percentage: *progress_percentage,
                })
            }

            ExportError::DataCorruption {
                recovery_possible, ..
            } => {
                if *recovery_possible {
                    Ok(RecoveryStrategy::AutoRetry {
                        max_attempts: 1,
                        interval_ms: self.config.retry_interval_ms,
                        backoff_factor: 1.0,
                    })
                } else {
                    Ok(RecoveryStrategy::SkipOperation {
                        operation: operation.to_string(),
                        reason: "data corrupted and unrecoverable, skipping operation"
                            .to_string(),
                    })
                }
            }
        }
    }

    /// Dispatches the selected strategy to its execution helper.
    fn execute_recovery_strategy(
        &mut self,
        strategy: RecoveryStrategy,
        _error: &ExportError,
        operation: &str,
        context: &ErrorContext,
    ) -> TrackingResult<RecoveryResult> {
        let _execution_start = Instant::now();

        match strategy {
            RecoveryStrategy::AutoRetry {
                max_attempts,
                interval_ms,
                backoff_factor,
            } => self.execute_auto_retry(operation, max_attempts, interval_ms, backoff_factor),

            RecoveryStrategy::GracefulDegradation {
                target_level,
                reason,
            } => self.execute_graceful_degradation(target_level, reason),

            RecoveryStrategy::PartialSave {
                save_path,
                progress_percentage,
            } => self.execute_partial_save(save_path, progress_percentage, context),

            RecoveryStrategy::ConfigAdjustment { new_config, reason } => {
                self.execute_config_adjustment(*new_config, reason)
            }

            RecoveryStrategy::ResourceRelease {
                resource_type,
                amount,
            } => self.execute_resource_release(resource_type, amount),

            RecoveryStrategy::SkipOperation { operation, reason } => {
                self.execute_skip_operation(operation, reason)
            }
        }
    }

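    /// Prepares the next retry for `operation`. The first attempt proceeds
    /// without waiting; each later attempt sleeps for the interval recorded
    /// after the previous one. After every attempt the stored interval is
    /// multiplied by `backoff_factor` and capped at `max_retry_interval_ms`.
    /// Once `max_attempts` is reached the result reports failure.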
    fn execute_auto_retry(
        &mut self,
        operation: &str,
        max_attempts: usize,
        interval_ms: u64,
        backoff_factor: f64,
    ) -> TrackingResult<RecoveryResult> {
        let history = self
            .retry_history
            .entry(operation.to_string())
            .or_insert_with(|| RetryHistory {
                operation: operation.to_string(),
                attempt_count: 0,
                last_attempt: Instant::now(),
                next_interval_ms: interval_ms,
                error_history: Vec::new(),
            });

        if history.attempt_count >= max_attempts {
            return Ok(RecoveryResult {
                success: false,
                strategy: RecoveryStrategy::AutoRetry {
                    max_attempts,
                    interval_ms,
                    backoff_factor,
                },
                message: format!("reached maximum retry limit ({max_attempts})"),
                recovery_time_ms: 0,
                partial_result_path: None,
                suggested_actions: vec![
                    "consider manual intervention or configuration adjustment".to_string()
                ],
            });
        }

        if history.attempt_count > 0 {
            std::thread::sleep(Duration::from_millis(history.next_interval_ms));
        }

        history.attempt_count += 1;
        history.last_attempt = Instant::now();
        history.next_interval_ms = (history.next_interval_ms as f64 * backoff_factor) as u64;
        history.next_interval_ms = history
            .next_interval_ms
            .min(self.config.max_retry_interval_ms);

        self.stats.total_retries += 1;

        Ok(RecoveryResult {
            success: true,
            strategy: RecoveryStrategy::AutoRetry {
                max_attempts,
                interval_ms,
                backoff_factor,
            },
            message: format!(
                "preparing retry {} of {}",
                history.attempt_count, max_attempts
            ),
            recovery_time_ms: history.next_interval_ms,
            partial_result_path: None,
            suggested_actions: vec![
                "monitor retry results".to_string(),
                "if retries keep failing, consider adjusting the strategy".to_string(),
            ],
        })
    }

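    /// Switches the manager into the requested degradation level and records
    /// when and why the degradation started.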
    fn execute_graceful_degradation(
        &mut self,
        target_level: DegradationLevel,
        reason: String,
    ) -> TrackingResult<RecoveryResult> {
        self.degradation_state.is_degraded = true;
        self.degradation_state.degradation_start = Some(Instant::now());
        self.degradation_state.degradation_level = target_level.clone();
        self.degradation_state.degradation_reason = Some(reason.clone());
        self.stats.degradation_count += 1;

        let message = match target_level {
            DegradationLevel::Light => "Enabled light degradation mode: reduced parallelism",
            DegradationLevel::Moderate => "Enabled moderate degradation mode: complex features disabled",
            DegradationLevel::Severe => "Enabled severe degradation mode: basic features only",
            DegradationLevel::Emergency => "Enabled emergency mode: minimal features",
            DegradationLevel::Normal => "Returned to normal mode",
        };

        Ok(RecoveryResult {
            success: true,
            strategy: RecoveryStrategy::GracefulDegradation {
                target_level,
                reason,
            },
            message: message.to_string(),
            recovery_time_ms: 0,
            partial_result_path: None,
            suggested_actions: vec![
                "monitor system status".to_string(),
                "when conditions improve, consider returning to normal mode".to_string(),
            ],
        })
    }

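    /// Writes a small JSON stub describing the interrupted export (progress
    /// percentage and a Unix timestamp) to `save_path`, creating the parent
    /// directory if needed.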
    fn execute_partial_save(
        &mut self,
        save_path: PathBuf,
        progress_percentage: f64,
        _context: &ErrorContext,
    ) -> TrackingResult<RecoveryResult> {
        if let Some(parent) = save_path.parent() {
            std::fs::create_dir_all(parent).map_err(|e| {
                TrackingError::IoError(format!("failed to create partial save directory: {e}"))
            })?;
        }

        let partial_data = format!(
            "{{\"partial_export\":true,\"progress\":{progress_percentage},\"timestamp\":\"{}\",\"context\":\"unknown\"}}",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs()
        );

        std::fs::write(&save_path, partial_data)
            .map_err(|e| TrackingError::IoError(format!("failed to save partial results: {e}")))?;

        self.stats.partial_saves += 1;

        Ok(RecoveryResult {
            success: true,
            strategy: RecoveryStrategy::PartialSave {
                save_path: save_path.clone(),
                progress_percentage,
            },
            message: format!("partial results saved ({progress_percentage:.1}% completed)"),
            recovery_time_ms: 0,
            partial_result_path: Some(save_path),
            suggested_actions: vec![
                "check the partial result file".to_string(),
                "resume from it after fixing the issue".to_string(),
            ],
        })
    }

    fn execute_config_adjustment(
        &self,
        new_config: FastExportConfig,
        reason: String,
    ) -> TrackingResult<RecoveryResult> {
        Ok(RecoveryResult {
            success: true,
            strategy: RecoveryStrategy::ConfigAdjustment {
                new_config: Box::new(new_config),
                reason: reason.clone(),
            },
            message: format!("config adjusted: {reason}"),
            recovery_time_ms: 0,
            partial_result_path: None,
            suggested_actions: vec![
                "retry the export with the new config".to_string(),
                "monitor the effects of the new config".to_string(),
            ],
        })
    }

    fn execute_resource_release(
        &self,
        resource_type: ResourceType,
        amount: usize,
    ) -> TrackingResult<RecoveryResult> {
        let message = match resource_type {
            ResourceType::Memory => format!("try to release {amount} bytes of memory"),
            ResourceType::CPU => format!("reduce CPU usage by {amount}%"),
            ResourceType::Disk => format!("clean up {amount} bytes of disk space"),
            ResourceType::FileHandles => format!("close {amount} file handles"),
            ResourceType::ThreadPool => format!("reduce the thread pool by {amount} threads"),
        };

        Ok(RecoveryResult {
            success: true,
            strategy: RecoveryStrategy::ResourceRelease {
                resource_type,
                amount,
            },
            message,
            recovery_time_ms: 0,
            partial_result_path: None,
            suggested_actions: vec![
                "monitor resource usage".to_string(),
                "retry the failed operation".to_string(),
            ],
        })
    }

    fn execute_skip_operation(
        &self,
        operation: String,
        reason: String,
    ) -> TrackingResult<RecoveryResult> {
        Ok(RecoveryResult {
            success: true,
            strategy: RecoveryStrategy::SkipOperation {
                operation: operation.clone(),
                reason: reason.clone(),
            },
            message: format!("skipping operation '{operation}': {reason}"),
            recovery_time_ms: 0,
            partial_result_path: None,
            suggested_actions: vec![
                "check the impact of skipping the operation".to_string(),
                "consider manually handling the skipped part".to_string(),
            ],
        })
    }

    /// Returns true when auto-retry is enabled and the operation has not yet
    /// exhausted its retry budget.
    fn should_retry(&self, operation: &str) -> bool {
        if !self.config.enable_auto_retry {
            return false;
        }

        if let Some(history) = self.retry_history.get(operation) {
            history.attempt_count < self.config.max_retry_attempts
        } else {
            true
        }
    }

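    /// Updates the smoothed error rate and the degradation flag. Each error is
    /// assigned a severity weight and folded into an exponentially weighted
    /// moving average, `rate = 0.9 * rate + 0.1 * weight`; degradation starts
    /// when the rate exceeds `degradation_threshold` and is lifted once it
    /// falls below `recovery_threshold`.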
    fn update_degradation_state(&mut self, error: &ExportError, _result: &RecoveryResult) {
        let error_weight = match error {
            ExportError::ParallelProcessingError { .. } => 1.0,
            ExportError::ResourceLimitExceeded { .. } => 2.0,
            ExportError::DataQualityError { .. } => 1.5,
            ExportError::PerformanceThresholdExceeded { .. } => 1.0,
            ExportError::ConcurrencyConflict { .. } => 1.0,
            ExportError::DataCorruption { .. } => 3.0,
            ExportError::InsufficientResources { .. } => 2.5,
            ExportError::ExportInterrupted { .. } => 1.5,
        };

        self.degradation_state.current_error_rate =
            (self.degradation_state.current_error_rate * 0.9) + (error_weight * 0.1);

        if !self.degradation_state.is_degraded
            && self.degradation_state.current_error_rate > self.config.degradation_threshold
        {
            self.degradation_state.is_degraded = true;
            self.degradation_state.degradation_start = Some(Instant::now());
            self.degradation_state.degradation_level = DegradationLevel::Light;
        } else if self.degradation_state.is_degraded
            && self.degradation_state.current_error_rate < self.config.recovery_threshold
        {
            self.degradation_state.is_degraded = false;
            self.degradation_state.degradation_start = None;
            self.degradation_state.degradation_level = DegradationLevel::Normal;
            self.degradation_state.degradation_reason = None;
        }
    }

    /// Builds a timestamped file path inside the partial-save directory.
    fn generate_partial_save_path(&self, operation: &str) -> PathBuf {
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let filename = format!("partial_export_{operation}_{timestamp}.json");
        self.config.partial_save_directory.join(filename)
    }

    /// Derives a memory-optimized configuration: fewer threads, smaller
    /// shards and buffers, and data localization disabled.
    fn create_memory_optimized_config(&self, context: &ErrorContext) -> FastExportConfig {
        let mut config = context.current_config.clone();

        config.shard_config.max_threads = Some(2);
        config.shard_config.shard_size /= 2;

        config.writer_config.buffer_size /= 2;

        config.enable_data_localization = false;

        config
    }

    /// Derives a performance-optimized configuration: smaller shards, a lower
    /// parallel threshold, and logging/monitoring disabled.
    fn create_performance_optimized_config(&self, context: &ErrorContext) -> FastExportConfig {
        let mut config = context.current_config.clone();

        config.shard_config.shard_size = 500;
        config.shard_config.parallel_threshold = 1000;
        config.verbose_logging = false;
        config.enable_performance_monitoring = false;

        config
    }

    /// Returns the accumulated recovery statistics.
    pub fn get_stats(&self) -> &RecoveryStats {
        &self.stats
    }

    /// Returns the current degradation state.
    pub fn get_degradation_state(&self) -> &DegradationState {
        &self.degradation_state
    }

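    /// Summarizes recovery activity. The success rate is
    /// `successful_recoveries / total_errors * 100`, and the average recovery
    /// time divides the total recovery time by the number of successful
    /// recoveries (mirroring `test_recovery_report` below: 8 of 10 errors
    /// recovered in 1000 ms total gives 80% and 125 ms).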
    pub fn generate_recovery_report(&self) -> RecoveryReport {
        let success_rate = if self.stats.total_errors > 0 {
            (self.stats.successful_recoveries as f64 / self.stats.total_errors as f64) * 100.0
        } else {
            0.0
        };

        let avg_recovery_time = if self.stats.successful_recoveries > 0 {
            self.stats.total_recovery_time_ms as f64 / self.stats.successful_recoveries as f64
        } else {
            0.0
        };

        RecoveryReport {
            total_errors: self.stats.total_errors,
            successful_recoveries: self.stats.successful_recoveries,
            failed_recoveries: self.stats.failed_recoveries,
            success_rate,
            total_retries: self.stats.total_retries,
            degradation_count: self.stats.degradation_count,
            partial_saves: self.stats.partial_saves,
            avg_recovery_time_ms: avg_recovery_time,
            current_degradation_level: self.degradation_state.degradation_level.clone(),
            is_currently_degraded: self.degradation_state.is_degraded,
        }
    }
}

/// Context supplied by the caller when an export error occurs.
#[derive(Debug, Clone)]
pub struct ErrorContext {
    /// Export configuration in effect when the error occurred.
    pub current_config: FastExportConfig,
    /// How far the export had progressed, as a percentage.
    pub progress_percentage: f64,
    /// Amount of data processed so far.
    pub processed_data_size: usize,
    /// When the failing operation started.
    pub operation_start_time: Instant,
    /// Export statistics collected so far, if available.
    pub current_stats: Option<CompleteExportStats>,
}

/// Summary of recovery activity produced by [`ErrorRecoveryManager::generate_recovery_report`].
#[derive(Debug, Clone)]
pub struct RecoveryReport {
    /// Total number of errors handled.
    pub total_errors: usize,
    /// Number of successful recoveries.
    pub successful_recoveries: usize,
    /// Number of failed recoveries.
    pub failed_recoveries: usize,
    /// Percentage of errors that were recovered.
    pub success_rate: f64,
    /// Total number of retry attempts made.
    pub total_retries: usize,
    /// Number of times degradation was triggered.
    pub degradation_count: usize,
    /// Number of partial saves performed.
    pub partial_saves: usize,
    /// Average recovery time in milliseconds.
    pub avg_recovery_time_ms: f64,
    /// Degradation level at report time.
    pub current_degradation_level: DegradationLevel,
    /// Whether the pipeline is currently degraded.
    pub is_currently_degraded: bool,
}

impl RecoveryReport {
    /// Logs a human-readable summary of the report via `tracing`.
    pub fn print_detailed_report(&self) {
        tracing::info!("\nrecovery report");
        tracing::info!("================");

        tracing::info!("total statistics:");
        tracing::info!("  total errors: {}", self.total_errors);
        tracing::info!(
            "  successful recoveries: {} ({:.1}%)",
            self.successful_recoveries,
            self.success_rate
        );
        tracing::info!("  failed recoveries: {}", self.failed_recoveries);
        tracing::info!("  total retries: {}", self.total_retries);
        tracing::info!("  degradation count: {}", self.degradation_count);
        tracing::info!("  partial saves: {}", self.partial_saves);
        tracing::info!(
            "  average recovery time: {:.2}ms",
            self.avg_recovery_time_ms
        );

        tracing::info!("\ncurrent state:");
        tracing::info!("  degradation level: {:?}", self.current_degradation_level);
        tracing::info!(
            "  is degraded: {}",
            if self.is_currently_degraded {
                "yes"
            } else {
                "no"
            }
        );
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::export::fast_export_coordinator::FastExportConfig;

    fn create_test_context() -> ErrorContext {
        ErrorContext {
            current_config: FastExportConfig::default(),
            progress_percentage: 50.0,
            processed_data_size: 1000,
            operation_start_time: Instant::now(),
            current_stats: None,
        }
    }

    #[test]
    fn test_error_recovery_manager_creation() {
        let config = RecoveryConfig::default();
        let manager = ErrorRecoveryManager::new(config);
        assert_eq!(manager.stats.total_errors, 0);
        assert!(!manager.degradation_state.is_degraded);
    }

    #[test]
    fn test_handle_parallel_processing_error() {
        let mut manager = ErrorRecoveryManager::new(RecoveryConfig::default());
        let error = ExportError::ParallelProcessingError {
            shard_index: 5,
            thread_id: "thread-1".to_string(),
            error_message: "Test error".to_string(),
            partial_results: None,
        };
        let context = create_test_context();

        let result = manager.handle_export_error(&error, "test_operation", &context);
        assert!(result.is_ok());

        let recovery_result = result.expect("Failed to handle export error");
        assert!(recovery_result.success);
        assert_eq!(manager.stats.total_errors, 1);
    }

    #[test]
    fn test_graceful_degradation() {
        let mut manager = ErrorRecoveryManager::new(RecoveryConfig::default());

        let result = manager
            .execute_graceful_degradation(DegradationLevel::Light, "test degradation".to_string());

        assert!(result.is_ok());
        let recovery_result = result.expect("Failed to get test value");
        assert!(recovery_result.success);
        assert!(manager.degradation_state.is_degraded);
        assert_eq!(
            manager.degradation_state.degradation_level,
            DegradationLevel::Light
        );
    }

    #[test]
    fn test_partial_save() {
        let config = RecoveryConfig {
            partial_save_directory: std::env::temp_dir().join("test_partial_saves"),
            ..Default::default()
        };
        let mut manager = ErrorRecoveryManager::new(config);
        let context = create_test_context();

        let save_path = manager.generate_partial_save_path("test_op");
        let result = manager.execute_partial_save(save_path.clone(), 75.0, &context);

        assert!(result.is_ok());
        let recovery_result = result.expect("Failed to get test value");
        assert!(recovery_result.success);
        assert_eq!(recovery_result.partial_result_path, Some(save_path.clone()));

        let _ = std::fs::remove_file(save_path);
    }

    #[test]
    fn test_recovery_report() {
        let mut manager = ErrorRecoveryManager::new(RecoveryConfig::default());

        manager.stats.total_errors = 10;
        manager.stats.successful_recoveries = 8;
        manager.stats.failed_recoveries = 2;
        manager.stats.total_retries = 5;
        manager.stats.degradation_count = 1;
        manager.stats.partial_saves = 2;
        manager.stats.total_recovery_time_ms = 1000;

        let report = manager.generate_recovery_report();
        assert_eq!(report.total_errors, 10);
        assert_eq!(report.successful_recoveries, 8);
        assert_eq!(report.success_rate, 80.0);
        assert_eq!(report.avg_recovery_time_ms, 125.0);
    }
}