symbi_runtime/error_handler/mod.rs

1//! Agent Error Handler
2//!
3//! Handles error recovery, escalation, and system resilience
4
5use async_trait::async_trait;
6use parking_lot::RwLock;
7use std::boxed::Box;
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::{Duration, SystemTime};
11use tokio::sync::{mpsc, Notify};
12use tokio::time::interval;
13
14use crate::types::*;
15
/// Error handler trait.
///
/// Implementors provide error recovery, escalation, and per-agent error
/// tracking for the runtime. All methods are async and take `&self`, so an
/// implementation is expected to use interior mutability for its state.
#[async_trait]
pub trait ErrorHandler {
    /// Handle an error for a specific agent and return the immediate
    /// [`ErrorAction`] the caller should take.
    async fn handle_error(
        &self,
        agent_id: AgentId,
        error: RuntimeError,
    ) -> Result<ErrorAction, ErrorHandlerError>;

    /// Register (or replace) the recovery strategy used for a given
    /// error type.
    async fn register_strategy(
        &self,
        error_type: ErrorType,
        strategy: RecoveryStrategy,
    ) -> Result<(), ErrorHandlerError>;

    /// Get error statistics for a single agent.
    async fn get_error_stats(
        &self,
        agent_id: AgentId,
    ) -> Result<ErrorStatistics, ErrorHandlerError>;

    /// Get system-wide error statistics aggregated over all agents.
    async fn get_system_error_stats(&self) -> SystemErrorStatistics;

    /// Set error thresholds for an agent (replaces any previous thresholds).
    async fn set_error_thresholds(
        &self,
        agent_id: AgentId,
        thresholds: ErrorThresholds,
    ) -> Result<(), ErrorHandlerError>;

    /// Clear error history for an agent.
    async fn clear_error_history(&self, agent_id: AgentId) -> Result<(), ErrorHandlerError>;

    /// Shutdown the error handler and stop its background processing.
    async fn shutdown(&self) -> Result<(), ErrorHandlerError>;
}
55
/// Error handler configuration.
#[derive(Debug, Clone)]
pub struct ErrorHandlerConfig {
    /// Maximum number of error records retained per agent.
    pub max_error_history: usize,
    /// Sliding window used when counting "recent" errors.
    pub error_aggregation_window: Duration,
    /// Error count at which escalation is considered.
    pub escalation_threshold: u32,
    /// Consecutive failures before an agent's circuit breaker opens.
    pub circuit_breaker_threshold: u32,
    /// Time an open circuit breaker waits before going half-open.
    pub circuit_breaker_timeout: Duration,
    /// Whether recovery strategies are applied automatically.
    pub enable_auto_recovery: bool,
    /// Maximum automatic recovery attempts per error.
    pub max_recovery_attempts: u32,
    /// Multiplier applied to the backoff between recovery attempts.
    pub recovery_backoff_multiplier: f32,
}

impl Default for ErrorHandlerConfig {
    fn default() -> Self {
        ErrorHandlerConfig {
            max_error_history: 1000,
            // Five-minute aggregation window.
            error_aggregation_window: Duration::from_secs(300),
            escalation_threshold: 5,
            circuit_breaker_threshold: 10,
            circuit_breaker_timeout: Duration::from_secs(60),
            enable_auto_recovery: true,
            max_recovery_attempts: 3,
            recovery_backoff_multiplier: 2.0,
        }
    }
}
83
/// Default implementation of the error handler.
///
/// State is shared between the handler and its spawned background tasks via
/// `Arc<RwLock<…>>`; errors are funneled through an unbounded mpsc channel to
/// the event loop for asynchronous processing.
pub struct DefaultErrorHandler {
    config: ErrorHandlerConfig,
    // Per-agent error records, pruned periodically by the cleanup loop.
    error_history: Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
    // Strategy applied per error type; seeded with `default_strategies()`.
    recovery_strategies: Arc<RwLock<HashMap<ErrorType, RecoveryStrategy>>>,
    // Optional per-agent thresholds; `ErrorThresholds::default()` otherwise.
    error_thresholds: Arc<RwLock<HashMap<AgentId, ErrorThresholds>>>,
    // One circuit breaker per agent, created lazily on first error.
    circuit_breakers: Arc<RwLock<HashMap<AgentId, CircuitBreaker>>>,
    // Producer side of the event channel consumed by the event loop.
    event_sender: mpsc::UnboundedSender<ErrorEvent>,
    // Signals both background tasks to stop.
    shutdown_notify: Arc<Notify>,
    // Cleared by `shutdown()`; checked before accepting new errors.
    is_running: Arc<RwLock<bool>>,
}
95
96impl DefaultErrorHandler {
97    /// Create a new error handler
98    pub async fn new(config: ErrorHandlerConfig) -> Result<Self, ErrorHandlerError> {
99        let error_history = Arc::new(RwLock::new(HashMap::new()));
100        let recovery_strategies = Arc::new(RwLock::new(Self::default_strategies()));
101        let error_thresholds = Arc::new(RwLock::new(HashMap::new()));
102        let circuit_breakers = Arc::new(RwLock::new(HashMap::new()));
103        let (event_sender, event_receiver) = mpsc::unbounded_channel();
104        let shutdown_notify = Arc::new(Notify::new());
105        let is_running = Arc::new(RwLock::new(true));
106
107        let handler = Self {
108            config,
109            error_history,
110            recovery_strategies,
111            error_thresholds,
112            circuit_breakers,
113            event_sender,
114            shutdown_notify,
115            is_running,
116        };
117
118        // Start background tasks
119        handler.start_event_loop(event_receiver).await;
120        handler.start_cleanup_loop().await;
121
122        Ok(handler)
123    }
124
125    /// Create default recovery strategies
126    fn default_strategies() -> HashMap<ErrorType, RecoveryStrategy> {
127        let mut strategies = HashMap::new();
128
129        strategies.insert(
130            ErrorType::ResourceExhaustion,
131            RecoveryStrategy::Retry {
132                max_attempts: 3,
133                backoff: Duration::from_secs(1),
134            },
135        );
136
137        strategies.insert(
138            ErrorType::NetworkError,
139            RecoveryStrategy::Retry {
140                max_attempts: 5,
141                backoff: Duration::from_millis(500),
142            },
143        );
144
145        strategies.insert(
146            ErrorType::SecurityViolation,
147            RecoveryStrategy::Terminate { cleanup: true },
148        );
149
150        strategies.insert(
151            ErrorType::PolicyViolation,
152            RecoveryStrategy::Manual {
153                reason: "Policy violation requires manual review".to_string(),
154            },
155        );
156
157        strategies.insert(
158            ErrorType::SystemError,
159            RecoveryStrategy::Restart {
160                preserve_state: false,
161            },
162        );
163
164        strategies.insert(
165            ErrorType::ValidationError,
166            RecoveryStrategy::Failover { backup_agent: None },
167        );
168
169        strategies
170    }
171
172    /// Start the event processing loop
173    async fn start_event_loop(&self, mut event_receiver: mpsc::UnboundedReceiver<ErrorEvent>) {
174        let error_history = self.error_history.clone();
175        let recovery_strategies = self.recovery_strategies.clone();
176        let error_thresholds = self.error_thresholds.clone();
177        let circuit_breakers = self.circuit_breakers.clone();
178        let shutdown_notify = self.shutdown_notify.clone();
179        let config = self.config.clone();
180
181        tokio::spawn(async move {
182            loop {
183                tokio::select! {
184                    event = event_receiver.recv() => {
185                        if let Some(event) = event {
186                            Self::process_error_event(
187                                event,
188                                &error_history,
189                                &recovery_strategies,
190                                &error_thresholds,
191                                &circuit_breakers,
192                                &config,
193                            ).await;
194                        } else {
195                            break;
196                        }
197                    }
198                    _ = shutdown_notify.notified() => {
199                        break;
200                    }
201                }
202            }
203        });
204    }
205
206    /// Start the cleanup loop for old error records
207    async fn start_cleanup_loop(&self) {
208        let error_history = self.error_history.clone();
209        let circuit_breakers = self.circuit_breakers.clone();
210        let shutdown_notify = self.shutdown_notify.clone();
211        let is_running = self.is_running.clone();
212        let max_history = self.config.max_error_history;
213        let aggregation_window = self.config.error_aggregation_window;
214
215        tokio::spawn(async move {
216            let mut interval = interval(Duration::from_secs(300)); // Cleanup every 5 minutes
217
218            loop {
219                tokio::select! {
220                    _ = interval.tick() => {
221                        if !*is_running.read() {
222                            break;
223                        }
224
225                        Self::cleanup_old_records(&error_history, &circuit_breakers, max_history, aggregation_window).await;
226                    }
227                    _ = shutdown_notify.notified() => {
228                        break;
229                    }
230                }
231            }
232        });
233    }
234
235    /// Process an error event
236    async fn process_error_event(
237        event: ErrorEvent,
238        error_history: &Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
239        recovery_strategies: &Arc<RwLock<HashMap<ErrorType, RecoveryStrategy>>>,
240        error_thresholds: &Arc<RwLock<HashMap<AgentId, ErrorThresholds>>>,
241        circuit_breakers: &Arc<RwLock<HashMap<AgentId, CircuitBreaker>>>,
242        config: &ErrorHandlerConfig,
243    ) {
244        match event {
245            ErrorEvent::ErrorOccurred { agent_id, error } => {
246                // Record the error
247                let error_record = ErrorRecord::new(error.clone());
248                error_history
249                    .write()
250                    .entry(agent_id)
251                    .or_default()
252                    .push(error_record);
253
254                // Check circuit breaker
255                let circuit_breaker_open = {
256                    let mut breakers = circuit_breakers.write();
257                    let breaker = breakers.entry(agent_id).or_insert_with(|| {
258                        CircuitBreaker::new(
259                            config.circuit_breaker_threshold,
260                            config.circuit_breaker_timeout,
261                        )
262                    });
263
264                    if breaker.is_open() {
265                        tracing::warn!(
266                            "Circuit breaker open for agent {}, blocking error handling",
267                            agent_id
268                        );
269                        return;
270                    }
271
272                    breaker.record_failure();
273                    false
274                };
275
276                if circuit_breaker_open {
277                    return;
278                }
279
280                // Check thresholds
281                let thresholds = error_thresholds
282                    .read()
283                    .get(&agent_id)
284                    .cloned()
285                    .unwrap_or_default();
286                let recent_errors = Self::count_recent_errors(
287                    error_history,
288                    agent_id,
289                    config.error_aggregation_window,
290                );
291
292                if recent_errors >= thresholds.max_errors_per_window {
293                    tracing::error!(
294                        "Agent {} exceeded error threshold: {} errors in window",
295                        agent_id,
296                        recent_errors
297                    );
298                    // Could trigger escalation here
299                }
300
301                // Determine recovery action
302                let error_type = Self::classify_error(&error);
303                let strategy_option = {
304                    let strategies = recovery_strategies.read();
305                    strategies.get(&error_type).cloned()
306                };
307
308                if let Some(strategy) = strategy_option {
309                    tracing::info!(
310                        "Applying recovery strategy {:?} for agent {} error: {}",
311                        strategy,
312                        agent_id,
313                        error
314                    );
315
316                    // Simulate recovery attempt (in real implementation this would call actual recovery logic)
317                    let success = match strategy {
318                        RecoveryStrategy::Retry { .. } => true, // Assume retry succeeds
319                        RecoveryStrategy::Restart { .. } => true, // Assume restart succeeds
320                        RecoveryStrategy::Terminate { .. } => true, // Terminate always succeeds
321                        RecoveryStrategy::Failover { .. } => false, // Failover might fail without backup
322                        RecoveryStrategy::Manual { .. } => false,   // Manual requires intervention
323                        RecoveryStrategy::None => false,            // No recovery means failure
324                    };
325
326                    // Send recovery event (this would be done by the actual recovery system)
327                    let recovery_event = ErrorEvent::RecoveryAttempted {
328                        agent_id,
329                        strategy: strategy.clone(),
330                        success,
331                        timestamp: SystemTime::now(),
332                    };
333
334                    // Process the recovery event to demonstrate its usage
335                    Box::pin(Self::process_error_event(
336                        recovery_event,
337                        error_history,
338                        recovery_strategies,
339                        error_thresholds,
340                        circuit_breakers,
341                        config,
342                    ))
343                    .await;
344                } else {
345                    tracing::warn!(
346                        "No recovery strategy found for error type {:?} in agent {}",
347                        error_type,
348                        agent_id
349                    );
350                }
351            }
352            ErrorEvent::RecoveryAttempted {
353                agent_id,
354                strategy,
355                success,
356                timestamp,
357            } => {
358                if success {
359                    tracing::info!(
360                        "Recovery successful for agent {} using strategy {:?} at {:?}",
361                        agent_id,
362                        strategy,
363                        timestamp
364                    );
365
366                    // Reset circuit breaker on successful recovery
367                    {
368                        if let Some(breaker) = circuit_breakers.write().get_mut(&agent_id) {
369                            breaker.record_success();
370                        }
371                    }
372                } else {
373                    tracing::error!(
374                        "Recovery failed for agent {} using strategy {:?} at {:?}",
375                        agent_id,
376                        strategy,
377                        timestamp
378                    );
379                }
380            }
381        }
382    }
383
384    /// Cleanup old error records
385    async fn cleanup_old_records(
386        error_history: &Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
387        circuit_breakers: &Arc<RwLock<HashMap<AgentId, CircuitBreaker>>>,
388        max_history: usize,
389        aggregation_window: Duration,
390    ) {
391        let now = SystemTime::now();
392
393        // Cleanup error history
394        {
395            let mut history = error_history.write();
396            for records in history.values_mut() {
397                // Remove old records
398                records.retain(|record| {
399                    now.duration_since(record.timestamp).unwrap_or_default()
400                        < aggregation_window * 2
401                });
402
403                // Limit history size
404                if records.len() > max_history {
405                    records.drain(0..records.len() - max_history);
406                }
407            }
408
409            // Remove empty entries
410            history.retain(|_, records| !records.is_empty());
411        }
412
413        // Update circuit breakers
414        {
415            let mut breakers = circuit_breakers.write();
416            for breaker in breakers.values_mut() {
417                breaker.update(now);
418            }
419
420            // Remove closed breakers that haven't been used recently
421            breakers.retain(|_, breaker| {
422                breaker.is_open()
423                    || breaker
424                        .last_failure_time
425                        .map(|t| now.duration_since(t).unwrap_or_default() < aggregation_window)
426                        .unwrap_or(false)
427            });
428        }
429    }
430
431    /// Classify an error into a type
432    fn classify_error(error: &RuntimeError) -> ErrorType {
433        match error {
434            RuntimeError::Resource(_) => ErrorType::ResourceExhaustion,
435            RuntimeError::Communication(_) => ErrorType::NetworkError,
436            RuntimeError::Security(_) => ErrorType::SecurityViolation,
437            RuntimeError::Scheduler(_) => ErrorType::SystemError,
438            RuntimeError::Lifecycle(_) => ErrorType::SystemError,
439            RuntimeError::ErrorHandler(_) => ErrorType::SystemError,
440            RuntimeError::Configuration(_) => ErrorType::SystemError,
441            RuntimeError::Policy(_) => ErrorType::SecurityViolation,
442            RuntimeError::Sandbox(_) => ErrorType::SecurityViolation,
443            RuntimeError::Audit(_) => ErrorType::SystemError,
444            RuntimeError::Internal(_) => ErrorType::SystemError,
445        }
446    }
447
448    /// Count recent errors for an agent
449    fn count_recent_errors(
450        error_history: &Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
451        agent_id: AgentId,
452        window: Duration,
453    ) -> u32 {
454        let history = error_history.read();
455        if let Some(records) = history.get(&agent_id) {
456            let now = SystemTime::now();
457            records
458                .iter()
459                .filter(|record| now.duration_since(record.timestamp).unwrap_or_default() < window)
460                .count() as u32
461        } else {
462            0
463        }
464    }
465
466    /// Send an error event
467    fn send_event(&self, event: ErrorEvent) -> Result<(), ErrorHandlerError> {
468        self.event_sender
469            .send(event)
470            .map_err(|_| ErrorHandlerError::EventProcessingFailed {
471                reason: "Failed to send error event".to_string(),
472            })
473    }
474}
475
476#[async_trait]
477impl ErrorHandler for DefaultErrorHandler {
478    async fn handle_error(
479        &self,
480        agent_id: AgentId,
481        error: RuntimeError,
482    ) -> Result<ErrorAction, ErrorHandlerError> {
483        if !*self.is_running.read() {
484            return Err(ErrorHandlerError::ShuttingDown);
485        }
486
487        // Send error event for processing
488        self.send_event(ErrorEvent::ErrorOccurred {
489            agent_id,
490            error: error.clone(),
491        })?;
492
493        // Determine immediate action
494        let error_type = Self::classify_error(&error);
495        let strategies = self.recovery_strategies.read();
496
497        if let Some(strategy) = strategies.get(&error_type) {
498            let action = match strategy {
499                RecoveryStrategy::Retry {
500                    max_attempts,
501                    backoff,
502                } => ErrorAction::Retry {
503                    max_attempts: *max_attempts,
504                    backoff: *backoff,
505                },
506                RecoveryStrategy::Restart { .. } => ErrorAction::Restart,
507                RecoveryStrategy::Terminate { .. } => ErrorAction::Terminate,
508                RecoveryStrategy::Failover { .. } => ErrorAction::Failover,
509                RecoveryStrategy::Manual { .. } => ErrorAction::Suspend, // Manual intervention maps to suspend
510                RecoveryStrategy::None => ErrorAction::Terminate, // No recovery maps to terminate
511            };
512
513            Ok(action)
514        } else {
515            // Default action for unknown error types
516            Ok(ErrorAction::Retry {
517                max_attempts: 1,
518                backoff: Duration::from_secs(1),
519            })
520        }
521    }
522
523    async fn register_strategy(
524        &self,
525        error_type: ErrorType,
526        strategy: RecoveryStrategy,
527    ) -> Result<(), ErrorHandlerError> {
528        let error_type_clone = error_type;
529        self.recovery_strategies
530            .write()
531            .insert(error_type, strategy);
532        tracing::info!(
533            "Registered recovery strategy for error type {:?}",
534            error_type_clone
535        );
536        Ok(())
537    }
538
539    async fn get_error_stats(
540        &self,
541        agent_id: AgentId,
542    ) -> Result<ErrorStatistics, ErrorHandlerError> {
543        let history = self.error_history.read();
544        if let Some(records) = history.get(&agent_id) {
545            let now = SystemTime::now();
546            let window = self.config.error_aggregation_window;
547
548            let recent_errors = records
549                .iter()
550                .filter(|record| now.duration_since(record.timestamp).unwrap_or_default() < window)
551                .count() as u32;
552
553            let total_errors = records.len() as u32;
554
555            let error_types = records
556                .iter()
557                .map(|record| Self::classify_error(&record.error))
558                .fold(HashMap::new(), |mut acc, error_type| {
559                    *acc.entry(error_type).or_insert(0) += 1;
560                    acc
561                });
562
563            Ok(ErrorStatistics {
564                agent_id,
565                total_errors,
566                recent_errors,
567                error_types,
568                last_error: records.last().map(|r| r.timestamp),
569            })
570        } else {
571            Ok(ErrorStatistics {
572                agent_id,
573                total_errors: 0,
574                recent_errors: 0,
575                error_types: HashMap::new(),
576                last_error: None,
577            })
578        }
579    }
580
581    async fn get_system_error_stats(&self) -> SystemErrorStatistics {
582        let history = self.error_history.read();
583        let now = SystemTime::now();
584        let window = self.config.error_aggregation_window;
585
586        let mut total_errors = 0;
587        let mut recent_errors = 0;
588        let mut agents_with_errors = 0;
589        let mut error_types = HashMap::new();
590
591        for records in history.values() {
592            if !records.is_empty() {
593                agents_with_errors += 1;
594                total_errors += records.len() as u32;
595
596                let agent_recent_errors = records
597                    .iter()
598                    .filter(|record| {
599                        now.duration_since(record.timestamp).unwrap_or_default() < window
600                    })
601                    .count() as u32;
602
603                recent_errors += agent_recent_errors;
604
605                for record in records {
606                    let error_type = Self::classify_error(&record.error);
607                    *error_types.entry(error_type).or_insert(0) += 1;
608                }
609            }
610        }
611
612        SystemErrorStatistics {
613            total_errors,
614            recent_errors,
615            agents_with_errors,
616            error_types,
617            last_updated: now,
618        }
619    }
620
621    async fn set_error_thresholds(
622        &self,
623        agent_id: AgentId,
624        thresholds: ErrorThresholds,
625    ) -> Result<(), ErrorHandlerError> {
626        self.error_thresholds.write().insert(agent_id, thresholds);
627        tracing::info!("Set error thresholds for agent {}", agent_id);
628        Ok(())
629    }
630
631    async fn clear_error_history(&self, agent_id: AgentId) -> Result<(), ErrorHandlerError> {
632        self.error_history.write().remove(&agent_id);
633        self.circuit_breakers.write().remove(&agent_id);
634        tracing::info!("Cleared error history for agent {}", agent_id);
635        Ok(())
636    }
637
638    async fn shutdown(&self) -> Result<(), ErrorHandlerError> {
639        tracing::info!("Shutting down error handler");
640
641        *self.is_running.write() = false;
642        self.shutdown_notify.notify_waiters();
643
644        Ok(())
645    }
646}
647
/// Error record for tracking.
///
/// Pairs a runtime error with the moment it was recorded; the timestamp
/// drives the "recent errors" window and history cleanup.
#[derive(Debug, Clone)]
struct ErrorRecord {
    // The error as reported by the runtime.
    error: RuntimeError,
    // When the record was created (not when the error originally occurred).
    timestamp: SystemTime,
}

impl ErrorRecord {
    /// Wrap an error, stamping it with the current time.
    fn new(error: RuntimeError) -> Self {
        Self {
            error,
            timestamp: SystemTime::now(),
        }
    }
}
663
/// Circuit breaker guarding error handling for a single agent.
///
/// The breaker trips (opens) once `failure_threshold` failures accumulate,
/// and is moved to half-open by `update` after `timeout` has elapsed since
/// the last recorded failure. A success fully resets it.
#[derive(Debug, Clone)]
struct CircuitBreaker {
    failure_threshold: u32,
    timeout: Duration,
    failure_count: u32,
    last_failure_time: Option<SystemTime>,
    state: CircuitBreakerState,
}

/// Classic three-state breaker lifecycle.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CircuitBreakerState {
    Closed,
    Open,
    HalfOpen,
}

impl CircuitBreaker {
    /// Create a closed breaker with the given threshold and timeout.
    fn new(failure_threshold: u32, timeout: Duration) -> Self {
        Self {
            state: CircuitBreakerState::Closed,
            failure_count: 0,
            last_failure_time: None,
            failure_threshold,
            timeout,
        }
    }

    /// Whether the breaker is currently rejecting work.
    fn is_open(&self) -> bool {
        matches!(self.state, CircuitBreakerState::Open)
    }

    /// Register a failure; trips the breaker once the threshold is reached.
    fn record_failure(&mut self) {
        self.last_failure_time = Some(SystemTime::now());
        self.failure_count += 1;
        if self.failure_count >= self.failure_threshold {
            self.state = CircuitBreakerState::Open;
        }
    }

    /// Register a success, fully resetting the breaker.
    fn record_success(&mut self) {
        self.state = CircuitBreakerState::Closed;
        self.failure_count = 0;
    }

    /// Transition an open breaker to half-open once the timeout has elapsed
    /// since the last failure. `now` is injected by the cleanup loop.
    fn update(&mut self, now: SystemTime) {
        let timed_out = self
            .last_failure_time
            .map(|last| now.duration_since(last).unwrap_or_default() > self.timeout)
            .unwrap_or(false);
        if self.state == CircuitBreakerState::Open && timed_out {
            self.state = CircuitBreakerState::HalfOpen;
        }
    }
}
720
/// Per-agent error thresholds.
#[derive(Debug, Clone)]
pub struct ErrorThresholds {
    /// Maximum errors tolerated within the aggregation window.
    pub max_errors_per_window: u32,
    /// Error count at which escalation is considered.
    pub escalation_threshold: u32,
}

impl Default for ErrorThresholds {
    fn default() -> Self {
        ErrorThresholds {
            escalation_threshold: 5,
            max_errors_per_window: 10,
        }
    }
}
736
/// Error statistics for an agent.
#[derive(Debug, Clone)]
pub struct ErrorStatistics {
    /// The agent these statistics describe.
    pub agent_id: AgentId,
    /// Total errors currently retained in history (post-cleanup).
    pub total_errors: u32,
    /// Errors within the configured aggregation window.
    pub recent_errors: u32,
    /// Count of retained errors per classification.
    pub error_types: HashMap<ErrorType, u32>,
    /// Timestamp of the most recent retained error, if any.
    pub last_error: Option<SystemTime>,
}
746
/// System-wide error statistics aggregated over all agents.
#[derive(Debug, Clone)]
pub struct SystemErrorStatistics {
    /// Total retained errors across all agents.
    pub total_errors: u32,
    /// Errors within the aggregation window across all agents.
    pub recent_errors: u32,
    /// Number of agents with at least one retained error.
    pub agents_with_errors: u32,
    /// Count of retained errors per classification, system-wide.
    pub error_types: HashMap<ErrorType, u32>,
    /// When this snapshot was taken.
    pub last_updated: SystemTime,
}
756
/// Immediate action the caller of `handle_error` should take.
#[derive(Debug, Clone)]
pub enum ErrorAction {
    /// Retry the failed operation with the given attempt budget and backoff.
    Retry {
        max_attempts: u32,
        backoff: Duration,
    },
    /// Restart the agent.
    Restart,
    /// Suspend the agent pending manual intervention.
    Suspend,
    /// Terminate the agent.
    Terminate,
    /// Fail over to a backup agent.
    Failover,
}
769
/// Error classifications used to select a recovery strategy.
///
/// `classify_error` maps each `RuntimeError` variant onto one of these.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorType {
    ResourceExhaustion,
    NetworkError,
    SecurityViolation,
    PolicyViolation,
    SystemError,
    ValidationError,
}
780
/// Error events for internal processing by the background event loop.
#[derive(Debug, Clone)]
enum ErrorEvent {
    /// A new error was reported for an agent via `handle_error`.
    ErrorOccurred {
        agent_id: AgentId,
        error: RuntimeError,
    },
    /// A recovery strategy was applied (currently simulated) with the
    /// given outcome.
    RecoveryAttempted {
        agent_id: AgentId,
        strategy: RecoveryStrategy,
        success: bool,
        timestamp: SystemTime,
    },
}
795
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_error_handling() {
        let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
            .await
            .unwrap();
        let agent = AgentId::new();
        let failure =
            RuntimeError::Resource(ResourceError::Insufficient("Memory exhausted".to_string()));

        // Resource errors map to the default retry strategy (3 attempts).
        match handler.handle_error(agent, failure).await.unwrap() {
            ErrorAction::Retry { max_attempts, .. } => assert_eq!(max_attempts, 3),
            _ => panic!("Expected retry action for resource error"),
        }
    }

    #[tokio::test]
    async fn test_error_statistics() {
        let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
            .await
            .unwrap();
        let agent = AgentId::new();

        // Report the same resource error five times.
        for _ in 0..5 {
            let failure = RuntimeError::Resource(ResourceError::Insufficient(
                "Memory exhausted".to_string(),
            ));
            handler.handle_error(agent, failure).await.unwrap();
        }

        // Let the background event loop record the errors.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let stats = handler.get_error_stats(agent).await.unwrap();
        assert_eq!(stats.total_errors, 5);
        assert_eq!(stats.recent_errors, 5);
        assert!(stats
            .error_types
            .contains_key(&ErrorType::ResourceExhaustion));
    }

    #[tokio::test]
    async fn test_recovery_strategy_registration() {
        let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
            .await
            .unwrap();

        // Overriding the built-in strategy for a type should succeed.
        let replacement = RecoveryStrategy::Terminate { cleanup: true };
        assert!(handler
            .register_strategy(ErrorType::SecurityViolation, replacement)
            .await
            .is_ok());
    }

    #[tokio::test]
    async fn test_error_thresholds() {
        let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
            .await
            .unwrap();
        let agent = AgentId::new();

        let custom = ErrorThresholds {
            max_errors_per_window: 3,
            escalation_threshold: 2,
        };

        assert!(handler.set_error_thresholds(agent, custom).await.is_ok());
    }

    #[test]
    fn test_circuit_breaker() {
        let mut cb = CircuitBreaker::new(3, Duration::from_secs(60));
        assert!(!cb.is_open());

        // Two failures stay below the threshold of three.
        cb.record_failure();
        cb.record_failure();
        assert!(!cb.is_open());

        // The third failure trips the breaker.
        cb.record_failure();
        assert!(cb.is_open());

        // A success resets it to closed.
        cb.record_success();
        assert!(!cb.is_open());
    }
}