// symbi_runtime/error_handler/mod.rs
//! Agent Error Handler
//!
//! Handles error recovery, escalation, and system resilience

use async_trait::async_trait;
use parking_lot::RwLock;
use std::boxed::Box;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::{mpsc, Notify};
use tokio::time::interval;

use crate::types::*;
/// Error handler trait
///
/// Central entry point for reporting agent errors and configuring how they
/// are recovered from. All methods take `&self`, so implementations are
/// expected to be shared (e.g. behind an `Arc`) and callable concurrently.
#[async_trait]
pub trait ErrorHandler {
    /// Handle an error for a specific agent, returning the immediate
    /// [`ErrorAction`] the caller should take.
    async fn handle_error(
        &self,
        agent_id: AgentId,
        error: RuntimeError,
    ) -> Result<ErrorAction, ErrorHandlerError>;

    /// Register an error recovery strategy for an error type, replacing any
    /// previously registered strategy for that type.
    async fn register_strategy(
        &self,
        error_type: ErrorType,
        strategy: RecoveryStrategy,
    ) -> Result<(), ErrorHandlerError>;

    /// Get error statistics for an agent (all-zero stats if the agent has
    /// no recorded history).
    async fn get_error_stats(
        &self,
        agent_id: AgentId,
    ) -> Result<ErrorStatistics, ErrorHandlerError>;

    /// Get system-wide error statistics aggregated across all agents
    async fn get_system_error_stats(&self) -> SystemErrorStatistics;

    /// Set error thresholds for an agent
    async fn set_error_thresholds(
        &self,
        agent_id: AgentId,
        thresholds: ErrorThresholds,
    ) -> Result<(), ErrorHandlerError>;

    /// Clear error history (and any associated state) for an agent
    async fn clear_error_history(&self, agent_id: AgentId) -> Result<(), ErrorHandlerError>;

    /// Shutdown the error handler and stop its background processing
    async fn shutdown(&self) -> Result<(), ErrorHandlerError>;
}

/// Error handler configuration
///
/// Tunable knobs for history retention, escalation, circuit breaking, and
/// automatic recovery behavior.
#[derive(Debug, Clone)]
pub struct ErrorHandlerConfig {
    pub max_error_history: usize,
    pub error_aggregation_window: Duration,
    pub escalation_threshold: u32,
    pub circuit_breaker_threshold: u32,
    pub circuit_breaker_timeout: Duration,
    pub enable_auto_recovery: bool,
    pub max_recovery_attempts: u32,
    pub recovery_backoff_multiplier: f32,
}

impl Default for ErrorHandlerConfig {
    /// Defaults for a moderately busy runtime: up to 1000 records per agent,
    /// a five-minute aggregation window, and a breaker that trips after ten
    /// failures and cools down for a minute.
    fn default() -> Self {
        ErrorHandlerConfig {
            max_error_history: 1000,
            error_aggregation_window: Duration::from_secs(5 * 60),
            escalation_threshold: 5,
            circuit_breaker_threshold: 10,
            circuit_breaker_timeout: Duration::from_secs(60),
            enable_auto_recovery: true,
            max_recovery_attempts: 3,
            recovery_backoff_multiplier: 2.0,
        }
    }
}

/// Default implementation of the error handler
///
/// State is shared with two spawned background tasks (an event loop and a
/// cleanup loop) via `Arc<RwLock<…>>`. The locks are `parking_lot` locks;
/// in the visible code, guards are scoped so they are not held across an
/// `.await`.
pub struct DefaultErrorHandler {
    config: ErrorHandlerConfig,
    // Per-agent log of recorded errors, trimmed by the cleanup loop.
    error_history: Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
    // Strategy applied per error type; seeded by `default_strategies`.
    recovery_strategies: Arc<RwLock<HashMap<ErrorType, RecoveryStrategy>>>,
    // Per-agent thresholds; absent agents use `ErrorThresholds::default()`.
    error_thresholds: Arc<RwLock<HashMap<AgentId, ErrorThresholds>>>,
    // Per-agent circuit breakers guarding against repeated failures.
    circuit_breakers: Arc<RwLock<HashMap<AgentId, CircuitBreaker>>>,
    // Queue feeding the background event-processing loop.
    event_sender: mpsc::UnboundedSender<ErrorEvent>,
    // Signals both background tasks to exit on shutdown.
    shutdown_notify: Arc<Notify>,
    // Cleared by `shutdown`; checked before accepting new errors.
    is_running: Arc<RwLock<bool>>,
}

96impl DefaultErrorHandler {
97    /// Create a new error handler
98    pub async fn new(config: ErrorHandlerConfig) -> Result<Self, ErrorHandlerError> {
99        let error_history = Arc::new(RwLock::new(HashMap::new()));
100        let recovery_strategies = Arc::new(RwLock::new(Self::default_strategies()));
101        let error_thresholds = Arc::new(RwLock::new(HashMap::new()));
102        let circuit_breakers = Arc::new(RwLock::new(HashMap::new()));
103        let (event_sender, event_receiver) = mpsc::unbounded_channel();
104        let shutdown_notify = Arc::new(Notify::new());
105        let is_running = Arc::new(RwLock::new(true));
106
107        let handler = Self {
108            config,
109            error_history,
110            recovery_strategies,
111            error_thresholds,
112            circuit_breakers,
113            event_sender,
114            shutdown_notify,
115            is_running,
116        };
117
118        // Start background tasks
119        handler.start_event_loop(event_receiver).await;
120        handler.start_cleanup_loop().await;
121
122        Ok(handler)
123    }
124
125    /// Create default recovery strategies
126    fn default_strategies() -> HashMap<ErrorType, RecoveryStrategy> {
127        let mut strategies = HashMap::new();
128
129        strategies.insert(
130            ErrorType::ResourceExhaustion,
131            RecoveryStrategy::Retry {
132                max_attempts: 3,
133                backoff: Duration::from_secs(1),
134            },
135        );
136
137        strategies.insert(
138            ErrorType::NetworkError,
139            RecoveryStrategy::Retry {
140                max_attempts: 5,
141                backoff: Duration::from_millis(500),
142            },
143        );
144
145        strategies.insert(
146            ErrorType::SecurityViolation,
147            RecoveryStrategy::Terminate { cleanup: true },
148        );
149
150        strategies.insert(
151            ErrorType::PolicyViolation,
152            RecoveryStrategy::Manual {
153                reason: "Policy violation requires manual review".to_string(),
154            },
155        );
156
157        strategies.insert(
158            ErrorType::SystemError,
159            RecoveryStrategy::Restart {
160                preserve_state: false,
161            },
162        );
163
164        strategies.insert(
165            ErrorType::ValidationError,
166            RecoveryStrategy::Failover { backup_agent: None },
167        );
168
169        strategies
170    }
171
172    /// Start the event processing loop
173    async fn start_event_loop(&self, mut event_receiver: mpsc::UnboundedReceiver<ErrorEvent>) {
174        let error_history = self.error_history.clone();
175        let recovery_strategies = self.recovery_strategies.clone();
176        let error_thresholds = self.error_thresholds.clone();
177        let circuit_breakers = self.circuit_breakers.clone();
178        let shutdown_notify = self.shutdown_notify.clone();
179        let config = self.config.clone();
180
181        tokio::spawn(async move {
182            loop {
183                tokio::select! {
184                    event = event_receiver.recv() => {
185                        if let Some(event) = event {
186                            Self::process_error_event(
187                                event,
188                                &error_history,
189                                &recovery_strategies,
190                                &error_thresholds,
191                                &circuit_breakers,
192                                &config,
193                            ).await;
194                        } else {
195                            break;
196                        }
197                    }
198                    _ = shutdown_notify.notified() => {
199                        break;
200                    }
201                }
202            }
203        });
204    }
205
206    /// Start the cleanup loop for old error records
207    async fn start_cleanup_loop(&self) {
208        let error_history = self.error_history.clone();
209        let circuit_breakers = self.circuit_breakers.clone();
210        let shutdown_notify = self.shutdown_notify.clone();
211        let is_running = self.is_running.clone();
212        let max_history = self.config.max_error_history;
213        let aggregation_window = self.config.error_aggregation_window;
214
215        tokio::spawn(async move {
216            let mut interval = interval(Duration::from_secs(300)); // Cleanup every 5 minutes
217
218            loop {
219                tokio::select! {
220                    _ = interval.tick() => {
221                        if !*is_running.read() {
222                            break;
223                        }
224
225                        Self::cleanup_old_records(&error_history, &circuit_breakers, max_history, aggregation_window).await;
226                    }
227                    _ = shutdown_notify.notified() => {
228                        break;
229                    }
230                }
231            }
232        });
233    }
234
235    /// Process an error event
236    async fn process_error_event(
237        event: ErrorEvent,
238        error_history: &Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
239        recovery_strategies: &Arc<RwLock<HashMap<ErrorType, RecoveryStrategy>>>,
240        error_thresholds: &Arc<RwLock<HashMap<AgentId, ErrorThresholds>>>,
241        circuit_breakers: &Arc<RwLock<HashMap<AgentId, CircuitBreaker>>>,
242        config: &ErrorHandlerConfig,
243    ) {
244        match event {
245            ErrorEvent::ErrorOccurred { agent_id, error } => {
246                // Record the error
247                let error_record = ErrorRecord::new(error.clone());
248                error_history
249                    .write()
250                    .entry(agent_id)
251                    .or_default()
252                    .push(error_record);
253
254                // Check circuit breaker
255                let circuit_breaker_open = {
256                    let mut breakers = circuit_breakers.write();
257                    let breaker = breakers.entry(agent_id).or_insert_with(|| {
258                        CircuitBreaker::new(
259                            config.circuit_breaker_threshold,
260                            config.circuit_breaker_timeout,
261                        )
262                    });
263
264                    if breaker.is_open() {
265                        tracing::warn!(
266                            "Circuit breaker open for agent {}, blocking error handling",
267                            agent_id
268                        );
269                        return;
270                    }
271
272                    breaker.record_failure();
273                    false
274                };
275
276                if circuit_breaker_open {
277                    return;
278                }
279
280                // Check thresholds
281                let thresholds = error_thresholds
282                    .read()
283                    .get(&agent_id)
284                    .cloned()
285                    .unwrap_or_default();
286                let recent_errors = Self::count_recent_errors(
287                    error_history,
288                    agent_id,
289                    config.error_aggregation_window,
290                );
291
292                if recent_errors >= thresholds.max_errors_per_window {
293                    tracing::error!(
294                        "Agent {} exceeded error threshold: {} errors in window",
295                        agent_id,
296                        recent_errors
297                    );
298                    // Could trigger escalation here
299                }
300
301                // Determine recovery action
302                let error_type = Self::classify_error(&error);
303                let strategy_option = {
304                    let strategies = recovery_strategies.read();
305                    strategies.get(&error_type).cloned()
306                };
307
308                if let Some(strategy) = strategy_option {
309                    tracing::info!(
310                        "Applying recovery strategy {:?} for agent {} error: {}",
311                        strategy,
312                        agent_id,
313                        error
314                    );
315
316                    // Simulate recovery attempt (in real implementation this would call actual recovery logic)
317                    let success = match strategy {
318                        RecoveryStrategy::Retry { .. } => true, // Assume retry succeeds
319                        RecoveryStrategy::Restart { .. } => true, // Assume restart succeeds
320                        RecoveryStrategy::Terminate { .. } => true, // Terminate always succeeds
321                        RecoveryStrategy::Failover { .. } => false, // Failover might fail without backup
322                        RecoveryStrategy::Manual { .. } => false,   // Manual requires intervention
323                        RecoveryStrategy::None => false,            // No recovery means failure
324                    };
325
326                    // Send recovery event (this would be done by the actual recovery system)
327                    let recovery_event = ErrorEvent::RecoveryAttempted {
328                        agent_id,
329                        strategy: strategy.clone(),
330                        success,
331                        timestamp: SystemTime::now(),
332                    };
333
334                    // Process the recovery event to demonstrate its usage
335                    Box::pin(Self::process_error_event(
336                        recovery_event,
337                        error_history,
338                        recovery_strategies,
339                        error_thresholds,
340                        circuit_breakers,
341                        config,
342                    ))
343                    .await;
344                } else {
345                    tracing::warn!(
346                        "No recovery strategy found for error type {:?} in agent {}",
347                        error_type,
348                        agent_id
349                    );
350                }
351            }
352            ErrorEvent::RecoveryAttempted {
353                agent_id,
354                strategy,
355                success,
356                timestamp,
357            } => {
358                if success {
359                    tracing::info!(
360                        "Recovery successful for agent {} using strategy {:?} at {:?}",
361                        agent_id,
362                        strategy,
363                        timestamp
364                    );
365
366                    // Reset circuit breaker on successful recovery
367                    {
368                        if let Some(breaker) = circuit_breakers.write().get_mut(&agent_id) {
369                            breaker.record_success();
370                        }
371                    }
372                } else {
373                    tracing::error!(
374                        "Recovery failed for agent {} using strategy {:?} at {:?}",
375                        agent_id,
376                        strategy,
377                        timestamp
378                    );
379                }
380            }
381        }
382    }
383
384    /// Cleanup old error records
385    async fn cleanup_old_records(
386        error_history: &Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
387        circuit_breakers: &Arc<RwLock<HashMap<AgentId, CircuitBreaker>>>,
388        max_history: usize,
389        aggregation_window: Duration,
390    ) {
391        let now = SystemTime::now();
392
393        // Cleanup error history
394        {
395            let mut history = error_history.write();
396            for records in history.values_mut() {
397                // Remove old records
398                records.retain(|record| {
399                    now.duration_since(record.timestamp).unwrap_or_default()
400                        < aggregation_window * 2
401                });
402
403                // Limit history size
404                if records.len() > max_history {
405                    records.drain(0..records.len() - max_history);
406                }
407            }
408
409            // Remove empty entries
410            history.retain(|_, records| !records.is_empty());
411        }
412
413        // Update circuit breakers
414        {
415            let mut breakers = circuit_breakers.write();
416            for breaker in breakers.values_mut() {
417                breaker.update(now);
418            }
419
420            // Remove closed breakers that haven't been used recently
421            breakers.retain(|_, breaker| {
422                breaker.is_open()
423                    || breaker
424                        .last_failure_time
425                        .map(|t| now.duration_since(t).unwrap_or_default() < aggregation_window)
426                        .unwrap_or(false)
427            });
428        }
429    }
430
431    /// Classify an error into a type
432    fn classify_error(error: &RuntimeError) -> ErrorType {
433        match error {
434            RuntimeError::Resource(_) => ErrorType::ResourceExhaustion,
435            RuntimeError::Communication(_) => ErrorType::NetworkError,
436            RuntimeError::Security(_) => ErrorType::SecurityViolation,
437            RuntimeError::Scheduler(_) => ErrorType::SystemError,
438            RuntimeError::Lifecycle(_) => ErrorType::SystemError,
439            RuntimeError::ErrorHandler(_) => ErrorType::SystemError,
440            RuntimeError::Configuration(_) => ErrorType::SystemError,
441            RuntimeError::Policy(_) => ErrorType::SecurityViolation,
442            RuntimeError::Sandbox(_) => ErrorType::SecurityViolation,
443            RuntimeError::Audit(_) => ErrorType::SystemError,
444            RuntimeError::Internal(_) => ErrorType::SystemError,
445            RuntimeError::Authentication(_) => ErrorType::SecurityViolation,
446        }
447    }
448
449    /// Count recent errors for an agent
450    fn count_recent_errors(
451        error_history: &Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
452        agent_id: AgentId,
453        window: Duration,
454    ) -> u32 {
455        let history = error_history.read();
456        if let Some(records) = history.get(&agent_id) {
457            let now = SystemTime::now();
458            records
459                .iter()
460                .filter(|record| now.duration_since(record.timestamp).unwrap_or_default() < window)
461                .count() as u32
462        } else {
463            0
464        }
465    }
466
467    /// Send an error event
468    fn send_event(&self, event: ErrorEvent) -> Result<(), ErrorHandlerError> {
469        self.event_sender
470            .send(event)
471            .map_err(|_| ErrorHandlerError::EventProcessingFailed {
472                reason: "Failed to send error event".to_string(),
473            })
474    }
475}
476
477#[async_trait]
478impl ErrorHandler for DefaultErrorHandler {
479    async fn handle_error(
480        &self,
481        agent_id: AgentId,
482        error: RuntimeError,
483    ) -> Result<ErrorAction, ErrorHandlerError> {
484        if !*self.is_running.read() {
485            return Err(ErrorHandlerError::ShuttingDown);
486        }
487
488        // Send error event for processing
489        self.send_event(ErrorEvent::ErrorOccurred {
490            agent_id,
491            error: error.clone(),
492        })?;
493
494        // Determine immediate action
495        let error_type = Self::classify_error(&error);
496        let strategies = self.recovery_strategies.read();
497
498        if let Some(strategy) = strategies.get(&error_type) {
499            let action = match strategy {
500                RecoveryStrategy::Retry {
501                    max_attempts,
502                    backoff,
503                } => ErrorAction::Retry {
504                    max_attempts: *max_attempts,
505                    backoff: *backoff,
506                },
507                RecoveryStrategy::Restart { .. } => ErrorAction::Restart,
508                RecoveryStrategy::Terminate { .. } => ErrorAction::Terminate,
509                RecoveryStrategy::Failover { .. } => ErrorAction::Failover,
510                RecoveryStrategy::Manual { .. } => ErrorAction::Suspend, // Manual intervention maps to suspend
511                RecoveryStrategy::None => ErrorAction::Terminate, // No recovery maps to terminate
512            };
513
514            Ok(action)
515        } else {
516            // Default action for unknown error types
517            Ok(ErrorAction::Retry {
518                max_attempts: 1,
519                backoff: Duration::from_secs(1),
520            })
521        }
522    }
523
524    async fn register_strategy(
525        &self,
526        error_type: ErrorType,
527        strategy: RecoveryStrategy,
528    ) -> Result<(), ErrorHandlerError> {
529        let error_type_clone = error_type;
530        self.recovery_strategies
531            .write()
532            .insert(error_type, strategy);
533        tracing::info!(
534            "Registered recovery strategy for error type {:?}",
535            error_type_clone
536        );
537        Ok(())
538    }
539
540    async fn get_error_stats(
541        &self,
542        agent_id: AgentId,
543    ) -> Result<ErrorStatistics, ErrorHandlerError> {
544        let history = self.error_history.read();
545        if let Some(records) = history.get(&agent_id) {
546            let now = SystemTime::now();
547            let window = self.config.error_aggregation_window;
548
549            let recent_errors = records
550                .iter()
551                .filter(|record| now.duration_since(record.timestamp).unwrap_or_default() < window)
552                .count() as u32;
553
554            let total_errors = records.len() as u32;
555
556            let error_types = records
557                .iter()
558                .map(|record| Self::classify_error(&record.error))
559                .fold(HashMap::new(), |mut acc, error_type| {
560                    *acc.entry(error_type).or_insert(0) += 1;
561                    acc
562                });
563
564            Ok(ErrorStatistics {
565                agent_id,
566                total_errors,
567                recent_errors,
568                error_types,
569                last_error: records.last().map(|r| r.timestamp),
570            })
571        } else {
572            Ok(ErrorStatistics {
573                agent_id,
574                total_errors: 0,
575                recent_errors: 0,
576                error_types: HashMap::new(),
577                last_error: None,
578            })
579        }
580    }
581
582    async fn get_system_error_stats(&self) -> SystemErrorStatistics {
583        let history = self.error_history.read();
584        let now = SystemTime::now();
585        let window = self.config.error_aggregation_window;
586
587        let mut total_errors = 0;
588        let mut recent_errors = 0;
589        let mut agents_with_errors = 0;
590        let mut error_types = HashMap::new();
591
592        for records in history.values() {
593            if !records.is_empty() {
594                agents_with_errors += 1;
595                total_errors += records.len() as u32;
596
597                let agent_recent_errors = records
598                    .iter()
599                    .filter(|record| {
600                        now.duration_since(record.timestamp).unwrap_or_default() < window
601                    })
602                    .count() as u32;
603
604                recent_errors += agent_recent_errors;
605
606                for record in records {
607                    let error_type = Self::classify_error(&record.error);
608                    *error_types.entry(error_type).or_insert(0) += 1;
609                }
610            }
611        }
612
613        SystemErrorStatistics {
614            total_errors,
615            recent_errors,
616            agents_with_errors,
617            error_types,
618            last_updated: now,
619        }
620    }
621
622    async fn set_error_thresholds(
623        &self,
624        agent_id: AgentId,
625        thresholds: ErrorThresholds,
626    ) -> Result<(), ErrorHandlerError> {
627        self.error_thresholds.write().insert(agent_id, thresholds);
628        tracing::info!("Set error thresholds for agent {}", agent_id);
629        Ok(())
630    }
631
632    async fn clear_error_history(&self, agent_id: AgentId) -> Result<(), ErrorHandlerError> {
633        self.error_history.write().remove(&agent_id);
634        self.circuit_breakers.write().remove(&agent_id);
635        tracing::info!("Cleared error history for agent {}", agent_id);
636        Ok(())
637    }
638
639    async fn shutdown(&self) -> Result<(), ErrorHandlerError> {
640        tracing::info!("Shutting down error handler");
641
642        *self.is_running.write() = false;
643        self.shutdown_notify.notify_waiters();
644
645        Ok(())
646    }
647}
648
/// Error record for tracking
///
/// A single recorded error together with the time it was observed; stored
/// per agent in the handler's history map.
#[derive(Debug, Clone)]
struct ErrorRecord {
    // The error as reported by the runtime.
    error: RuntimeError,
    // When the record was created; used for windowed counting and cleanup.
    timestamp: SystemTime,
}

impl ErrorRecord {
    /// Wrap an error, stamping it with the current time.
    fn new(error: RuntimeError) -> Self {
        Self {
            error,
            timestamp: SystemTime::now(),
        }
    }
}

/// Circuit breaker for error handling
///
/// Starts `Closed`; after `failure_threshold` recorded failures it trips to
/// `Open`. `update` moves an `Open` breaker to `HalfOpen` once `timeout` has
/// elapsed since the last failure, and a recorded success closes it again.
#[derive(Debug, Clone)]
struct CircuitBreaker {
    failure_threshold: u32,
    timeout: Duration,
    failure_count: u32,
    last_failure_time: Option<SystemTime>,
    state: CircuitBreakerState,
}

/// The three classic circuit-breaker states.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CircuitBreakerState {
    Closed,
    Open,
    HalfOpen,
}

impl CircuitBreaker {
    /// Build a closed breaker that trips after `failure_threshold` failures.
    fn new(failure_threshold: u32, timeout: Duration) -> Self {
        Self {
            failure_threshold,
            timeout,
            failure_count: 0,
            last_failure_time: None,
            state: CircuitBreakerState::Closed,
        }
    }

    /// True only while fully open; a half-open breaker lets traffic through.
    fn is_open(&self) -> bool {
        matches!(self.state, CircuitBreakerState::Open)
    }

    /// Count a failure and trip the breaker once the threshold is reached.
    fn record_failure(&mut self) {
        self.failure_count += 1;
        self.last_failure_time = Some(SystemTime::now());
        if self.failure_count >= self.failure_threshold {
            self.state = CircuitBreakerState::Open;
        }
    }

    /// A success fully resets the breaker back to `Closed`.
    fn record_success(&mut self) {
        self.failure_count = 0;
        self.state = CircuitBreakerState::Closed;
    }

    /// Move an `Open` breaker to `HalfOpen` once the timeout has elapsed
    /// since the last recorded failure.
    fn update(&mut self, now: SystemTime) {
        let timed_out = self
            .last_failure_time
            .map(|t| now.duration_since(t).unwrap_or_default() > self.timeout)
            .unwrap_or(false);
        if self.state == CircuitBreakerState::Open && timed_out {
            self.state = CircuitBreakerState::HalfOpen;
        }
    }
}

/// Error thresholds for an agent
///
/// Per-agent error-rate limits consulted when processing error events.
#[derive(Debug, Clone)]
pub struct ErrorThresholds {
    pub max_errors_per_window: u32,
    pub escalation_threshold: u32,
}

impl Default for ErrorThresholds {
    /// Defaults used for any agent without explicit thresholds: ten errors
    /// per aggregation window, escalating at five.
    fn default() -> Self {
        ErrorThresholds {
            escalation_threshold: 5,
            max_errors_per_window: 10,
        }
    }
}

/// Error statistics for an agent
#[derive(Debug, Clone)]
pub struct ErrorStatistics {
    pub agent_id: AgentId,
    // All errors ever recorded for this agent (post-cleanup).
    pub total_errors: u32,
    // Errors recorded within the configured aggregation window.
    pub recent_errors: u32,
    // Count of recorded errors per classified type.
    pub error_types: HashMap<ErrorType, u32>,
    // Timestamp of the most recently recorded error, if any.
    pub last_error: Option<SystemTime>,
}

/// System-wide error statistics
#[derive(Debug, Clone)]
pub struct SystemErrorStatistics {
    // Sum of all recorded errors across every agent.
    pub total_errors: u32,
    // Errors recorded within the aggregation window, across all agents.
    pub recent_errors: u32,
    // Number of agents with at least one recorded error.
    pub agents_with_errors: u32,
    // Count of recorded errors per classified type, across all agents.
    pub error_types: HashMap<ErrorType, u32>,
    // When this snapshot was computed.
    pub last_updated: SystemTime,
}

/// Error action to take
///
/// Immediate, caller-facing action derived from the recovery strategy
/// registered for the error's classified type.
#[derive(Debug, Clone)]
pub enum ErrorAction {
    /// Retry the failed operation up to `max_attempts` times, waiting
    /// `backoff` between attempts.
    Retry {
        max_attempts: u32,
        backoff: Duration,
    },
    /// Restart the agent.
    Restart,
    /// Suspend the agent (used when a strategy requires manual intervention).
    Suspend,
    /// Terminate the agent.
    Terminate,
    /// Fail over to a backup agent.
    Failover,
}

/// Error types for classification
///
/// Coarse buckets produced by `DefaultErrorHandler::classify_error`; used
/// as keys for recovery-strategy lookup and statistics aggregation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorType {
    ResourceExhaustion,
    NetworkError,
    SecurityViolation,
    PolicyViolation,
    SystemError,
    ValidationError,
}

/// Error events for internal processing
///
/// Messages consumed by the background event loop; not part of the public
/// API.
#[derive(Debug, Clone)]
enum ErrorEvent {
    /// An agent reported an error (sent from `handle_error`).
    ErrorOccurred {
        agent_id: AgentId,
        error: RuntimeError,
    },
    /// A recovery strategy was applied, successfully or not.
    RecoveryAttempted {
        agent_id: AgentId,
        strategy: RecoveryStrategy,
        success: bool,
        timestamp: SystemTime,
    },
}

#[cfg(test)]
mod tests {
    use super::*;

    // A resource error should map to the default retry strategy
    // (3 attempts, per `default_strategies`).
    #[tokio::test]
    async fn test_error_handling() {
        let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
            .await
            .unwrap();
        let agent_id = AgentId::new();
        let error =
            RuntimeError::Resource(ResourceError::Insufficient("Memory exhausted".to_string()));

        let action = handler.handle_error(agent_id, error).await.unwrap();

        match action {
            ErrorAction::Retry { max_attempts, .. } => {
                assert_eq!(max_attempts, 3);
            }
            _ => panic!("Expected retry action for resource error"),
        }
    }

    // Statistics should reflect every error fed through `handle_error`.
    #[tokio::test]
    async fn test_error_statistics() {
        let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
            .await
            .unwrap();
        let agent_id = AgentId::new();

        // Generate some errors
        for _ in 0..5 {
            let error =
                RuntimeError::Resource(ResourceError::Insufficient("Memory exhausted".to_string()));
            handler.handle_error(agent_id, error).await.unwrap();
        }

        // Give the event loop time to process
        // NOTE(review): a fixed 50 ms sleep can flake on a loaded CI host;
        // consider polling until `total_errors == 5` with a deadline instead.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let stats = handler.get_error_stats(agent_id).await.unwrap();
        assert_eq!(stats.total_errors, 5);
        assert_eq!(stats.recent_errors, 5);
        assert!(stats
            .error_types
            .contains_key(&ErrorType::ResourceExhaustion));
    }

    // Registering a custom strategy should succeed.
    #[tokio::test]
    async fn test_recovery_strategy_registration() {
        let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
            .await
            .unwrap();

        let strategy = RecoveryStrategy::Terminate { cleanup: true };
        let result = handler
            .register_strategy(ErrorType::SecurityViolation, strategy)
            .await;
        assert!(result.is_ok());
    }

    // Setting per-agent thresholds should succeed.
    #[tokio::test]
    async fn test_error_thresholds() {
        let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
            .await
            .unwrap();
        let agent_id = AgentId::new();

        let thresholds = ErrorThresholds {
            max_errors_per_window: 3,
            escalation_threshold: 2,
        };

        let result = handler.set_error_thresholds(agent_id, thresholds).await;
        assert!(result.is_ok());
    }

    // The breaker trips at the threshold and resets on success.
    #[test]
    fn test_circuit_breaker() {
        let mut breaker = CircuitBreaker::new(3, Duration::from_secs(60));

        assert!(!breaker.is_open());

        // Record failures
        breaker.record_failure();
        breaker.record_failure();
        assert!(!breaker.is_open());

        breaker.record_failure();
        assert!(breaker.is_open());

        // Record success should reset
        breaker.record_success();
        assert!(!breaker.is_open());
    }
}