use async_trait::async_trait;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::{mpsc, Notify};
use tokio::time::interval;

use crate::types::*;

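/// Central error-handling interface for the runtime: classifies agent
/// errors, applies recovery strategies, and exposes per-agent and
/// system-wide error statistics.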
#[async_trait]
pub trait ErrorHandler {
    async fn handle_error(
        &self,
        agent_id: AgentId,
        error: RuntimeError,
    ) -> Result<ErrorAction, ErrorHandlerError>;

    async fn register_strategy(
        &self,
        error_type: ErrorType,
        strategy: RecoveryStrategy,
    ) -> Result<(), ErrorHandlerError>;

    async fn get_error_stats(
        &self,
        agent_id: AgentId,
    ) -> Result<ErrorStatistics, ErrorHandlerError>;

    async fn get_system_error_stats(&self) -> SystemErrorStatistics;

    async fn set_error_thresholds(
        &self,
        agent_id: AgentId,
        thresholds: ErrorThresholds,
    ) -> Result<(), ErrorHandlerError>;

    async fn clear_error_history(&self, agent_id: AgentId) -> Result<(), ErrorHandlerError>;

    async fn shutdown(&self) -> Result<(), ErrorHandlerError>;
}

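/// Tuning knobs for `DefaultErrorHandler`. Note that not every field is
/// consulted by the default implementation yet (e.g. the escalation,
/// auto-recovery, and backoff-multiplier settings).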
#[derive(Debug, Clone)]
pub struct ErrorHandlerConfig {
    pub max_error_history: usize,
    pub error_aggregation_window: Duration,
    pub escalation_threshold: u32,
    pub circuit_breaker_threshold: u32,
    pub circuit_breaker_timeout: Duration,
    pub enable_auto_recovery: bool,
    pub max_recovery_attempts: u32,
    pub recovery_backoff_multiplier: f32,
}

impl Default for ErrorHandlerConfig {
    fn default() -> Self {
        Self {
            max_error_history: 1000,
            error_aggregation_window: Duration::from_secs(300),
            escalation_threshold: 5,
            circuit_breaker_threshold: 10,
            circuit_breaker_timeout: Duration::from_secs(60),
            enable_auto_recovery: true,
            max_recovery_attempts: 3,
            recovery_backoff_multiplier: 2.0,
        }
    }
}

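/// Default `ErrorHandler` backed by two background tasks: an event loop
/// that records errors and applies recovery strategies, and a cleanup
/// loop that prunes stale records and circuit breakers.
///
/// A minimal usage sketch (`agent_id` and `err` stand in for values
/// built from `crate::types`, as in the tests below):
///
/// ```ignore
/// let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default()).await?;
/// let action = handler
///     .handle_error(agent_id, RuntimeError::Resource(err))
///     .await?;
/// ```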
pub struct DefaultErrorHandler {
    config: ErrorHandlerConfig,
    error_history: Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
    recovery_strategies: Arc<RwLock<HashMap<ErrorType, RecoveryStrategy>>>,
    error_thresholds: Arc<RwLock<HashMap<AgentId, ErrorThresholds>>>,
    circuit_breakers: Arc<RwLock<HashMap<AgentId, CircuitBreaker>>>,
    event_sender: mpsc::UnboundedSender<ErrorEvent>,
    shutdown_notify: Arc<Notify>,
    is_running: Arc<RwLock<bool>>,
}

impl DefaultErrorHandler {
    pub async fn new(config: ErrorHandlerConfig) -> Result<Self, ErrorHandlerError> {
        let error_history = Arc::new(RwLock::new(HashMap::new()));
        let recovery_strategies = Arc::new(RwLock::new(Self::default_strategies()));
        let error_thresholds = Arc::new(RwLock::new(HashMap::new()));
        let circuit_breakers = Arc::new(RwLock::new(HashMap::new()));
        let (event_sender, event_receiver) = mpsc::unbounded_channel();
        let shutdown_notify = Arc::new(Notify::new());
        let is_running = Arc::new(RwLock::new(true));

        let handler = Self {
            config,
            error_history,
            recovery_strategies,
            error_thresholds,
            circuit_breakers,
            event_sender,
            shutdown_notify,
            is_running,
        };

        handler.start_event_loop(event_receiver).await;
        handler.start_cleanup_loop().await;

        Ok(handler)
    }

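    /// Built-in strategy table: retries for resource and network errors,
    /// termination for security violations, manual review for policy
    /// violations, restart for system errors, and failover for
    /// validation errors.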
    fn default_strategies() -> HashMap<ErrorType, RecoveryStrategy> {
        let mut strategies = HashMap::new();

        strategies.insert(
            ErrorType::ResourceExhaustion,
            RecoveryStrategy::Retry {
                max_attempts: 3,
                backoff: Duration::from_secs(1),
            },
        );

        strategies.insert(
            ErrorType::NetworkError,
            RecoveryStrategy::Retry {
                max_attempts: 5,
                backoff: Duration::from_millis(500),
            },
        );

        strategies.insert(
            ErrorType::SecurityViolation,
            RecoveryStrategy::Terminate { cleanup: true },
        );

        strategies.insert(
            ErrorType::PolicyViolation,
            RecoveryStrategy::Manual {
                reason: "Policy violation requires manual review".to_string(),
            },
        );

        strategies.insert(
            ErrorType::SystemError,
            RecoveryStrategy::Restart {
                preserve_state: false,
            },
        );

        strategies.insert(
            ErrorType::ValidationError,
            RecoveryStrategy::Failover { backup_agent: None },
        );

        strategies
    }

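    /// Spawns the background task that drains the error-event channel
    /// until the channel closes or shutdown is signalled.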
    async fn start_event_loop(&self, mut event_receiver: mpsc::UnboundedReceiver<ErrorEvent>) {
        let error_history = self.error_history.clone();
        let recovery_strategies = self.recovery_strategies.clone();
        let error_thresholds = self.error_thresholds.clone();
        let circuit_breakers = self.circuit_breakers.clone();
        let shutdown_notify = self.shutdown_notify.clone();
        let config = self.config.clone();

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    event = event_receiver.recv() => {
                        if let Some(event) = event {
                            Self::process_error_event(
                                event,
                                &error_history,
                                &recovery_strategies,
                                &error_thresholds,
                                &circuit_breakers,
                                &config,
                            ).await;
                        } else {
                            break;
                        }
                    }
                    _ = shutdown_notify.notified() => {
                        break;
                    }
                }
            }
        });
    }

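    /// Spawns the periodic task that prunes old error records and
    /// expired circuit breakers every five minutes.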
    async fn start_cleanup_loop(&self) {
        let error_history = self.error_history.clone();
        let circuit_breakers = self.circuit_breakers.clone();
        let shutdown_notify = self.shutdown_notify.clone();
        let is_running = self.is_running.clone();
        let max_history = self.config.max_error_history;
        let aggregation_window = self.config.error_aggregation_window;

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(300));

            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if !*is_running.read() {
                            break;
                        }

                        Self::cleanup_old_records(&error_history, &circuit_breakers, max_history, aggregation_window).await;
                    }
                    _ = shutdown_notify.notified() => {
                        break;
                    }
                }
            }
        });
    }

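    /// Core event processor: records the error, consults the agent's
    /// circuit breaker, checks thresholds, and applies the registered
    /// recovery strategy, emitting a follow-up `RecoveryAttempted` event.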
    async fn process_error_event(
        event: ErrorEvent,
        error_history: &Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
        recovery_strategies: &Arc<RwLock<HashMap<ErrorType, RecoveryStrategy>>>,
        error_thresholds: &Arc<RwLock<HashMap<AgentId, ErrorThresholds>>>,
        circuit_breakers: &Arc<RwLock<HashMap<AgentId, CircuitBreaker>>>,
        config: &ErrorHandlerConfig,
    ) {
        match event {
            ErrorEvent::ErrorOccurred { agent_id, error } => {
                let error_record = ErrorRecord::new(error.clone());
                error_history
                    .write()
                    .entry(agent_id)
                    .or_default()
                    .push(error_record);

                // Consult the agent's circuit breaker: if it is open, skip
                // handling entirely; otherwise count this failure against it.
                {
                    let mut breakers = circuit_breakers.write();
                    let breaker = breakers.entry(agent_id).or_insert_with(|| {
                        CircuitBreaker::new(
                            config.circuit_breaker_threshold,
                            config.circuit_breaker_timeout,
                        )
                    });

                    if breaker.is_open() {
                        tracing::warn!(
                            "Circuit breaker open for agent {}, blocking error handling",
                            agent_id
                        );
                        return;
                    }

                    breaker.record_failure();
                }

                let thresholds = error_thresholds
                    .read()
                    .get(&agent_id)
                    .cloned()
                    .unwrap_or_default();
                let recent_errors = Self::count_recent_errors(
                    error_history,
                    agent_id,
                    config.error_aggregation_window,
                );

                // Threshold breaches are currently surfaced via logging only.
                if recent_errors >= thresholds.max_errors_per_window {
                    tracing::error!(
                        "Agent {} exceeded error threshold: {} errors in window",
                        agent_id,
                        recent_errors
                    );
                }

                let error_type = Self::classify_error(&error);
                let strategy_option = {
                    let strategies = recovery_strategies.read();
                    strategies.get(&error_type).cloned()
                };

                if let Some(strategy) = strategy_option {
                    tracing::info!(
                        "Applying recovery strategy {:?} for agent {} error: {}",
                        strategy,
                        agent_id,
                        error
                    );

                    // Automated strategies are optimistically reported as
                    // successful; strategies needing external action are not.
                    let success = match strategy {
                        RecoveryStrategy::Retry { .. } => true,
                        RecoveryStrategy::Restart { .. } => true,
                        RecoveryStrategy::Terminate { .. } => true,
                        RecoveryStrategy::Failover { .. } => false,
                        RecoveryStrategy::Manual { .. } => false,
                        RecoveryStrategy::None => false,
                    };

                    let recovery_event = ErrorEvent::RecoveryAttempted {
                        agent_id,
                        strategy: strategy.clone(),
                        success,
                        timestamp: SystemTime::now(),
                    };

                    // Recursive async call requires boxing the future.
                    Box::pin(Self::process_error_event(
                        recovery_event,
                        error_history,
                        recovery_strategies,
                        error_thresholds,
                        circuit_breakers,
                        config,
                    ))
                    .await;
                } else {
                    tracing::warn!(
                        "No recovery strategy found for error type {:?} in agent {}",
                        error_type,
                        agent_id
                    );
                }
            }
            ErrorEvent::RecoveryAttempted {
                agent_id,
                strategy,
                success,
                timestamp,
            } => {
                if success {
                    tracing::info!(
                        "Recovery successful for agent {} using strategy {:?} at {:?}",
                        agent_id,
                        strategy,
                        timestamp
                    );

                    // A successful recovery resets the agent's breaker.
                    if let Some(breaker) = circuit_breakers.write().get_mut(&agent_id) {
                        breaker.record_success();
                    }
                } else {
                    tracing::error!(
                        "Recovery failed for agent {} using strategy {:?} at {:?}",
                        agent_id,
                        strategy,
                        timestamp
                    );
                }
            }
        }
    }

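    /// Drops error records older than twice the aggregation window, caps
    /// per-agent history at `max_history`, and retires breakers that are
    /// closed with no recent failures.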
    async fn cleanup_old_records(
        error_history: &Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
        circuit_breakers: &Arc<RwLock<HashMap<AgentId, CircuitBreaker>>>,
        max_history: usize,
        aggregation_window: Duration,
    ) {
        let now = SystemTime::now();

        {
            let mut history = error_history.write();
            for records in history.values_mut() {
                records.retain(|record| {
                    now.duration_since(record.timestamp).unwrap_or_default()
                        < aggregation_window * 2
                });

                if records.len() > max_history {
                    records.drain(0..records.len() - max_history);
                }
            }

            history.retain(|_, records| !records.is_empty());
        }

        {
            let mut breakers = circuit_breakers.write();
            for breaker in breakers.values_mut() {
                breaker.update(now);
            }

            breakers.retain(|_, breaker| {
                breaker.is_open()
                    || breaker
                        .last_failure_time
                        .map(|t| now.duration_since(t).unwrap_or_default() < aggregation_window)
                        .unwrap_or(false)
            });
        }
    }

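    /// Maps a `RuntimeError` variant onto the coarser `ErrorType` used
    /// for strategy lookup.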
    fn classify_error(error: &RuntimeError) -> ErrorType {
        match error {
            RuntimeError::Resource(_) => ErrorType::ResourceExhaustion,
            RuntimeError::Communication(_) => ErrorType::NetworkError,
            RuntimeError::Security(_) => ErrorType::SecurityViolation,
            RuntimeError::Scheduler(_) => ErrorType::SystemError,
            RuntimeError::Lifecycle(_) => ErrorType::SystemError,
            RuntimeError::ErrorHandler(_) => ErrorType::SystemError,
            RuntimeError::Configuration(_) => ErrorType::SystemError,
            RuntimeError::Policy(_) => ErrorType::SecurityViolation,
            RuntimeError::Sandbox(_) => ErrorType::SecurityViolation,
            RuntimeError::Audit(_) => ErrorType::SystemError,
            RuntimeError::Internal(_) => ErrorType::SystemError,
            RuntimeError::Authentication(_) => ErrorType::SecurityViolation,
        }
    }

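    /// Counts an agent's errors that fall inside the aggregation window.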
    fn count_recent_errors(
        error_history: &Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
        agent_id: AgentId,
        window: Duration,
    ) -> u32 {
        let history = error_history.read();
        if let Some(records) = history.get(&agent_id) {
            let now = SystemTime::now();
            records
                .iter()
                .filter(|record| now.duration_since(record.timestamp).unwrap_or_default() < window)
                .count() as u32
        } else {
            0
        }
    }

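    /// Forwards an event to the background loop; fails only when the
    /// receiver has been dropped.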
    fn send_event(&self, event: ErrorEvent) -> Result<(), ErrorHandlerError> {
        self.event_sender
            .send(event)
            .map_err(|_| ErrorHandlerError::EventProcessingFailed {
                reason: "Failed to send error event".to_string(),
            })
    }
}

#[async_trait]
impl ErrorHandler for DefaultErrorHandler {
    async fn handle_error(
        &self,
        agent_id: AgentId,
        error: RuntimeError,
    ) -> Result<ErrorAction, ErrorHandlerError> {
        if !*self.is_running.read() {
            return Err(ErrorHandlerError::ShuttingDown);
        }

        self.send_event(ErrorEvent::ErrorOccurred {
            agent_id,
            error: error.clone(),
        })?;

        let error_type = Self::classify_error(&error);
        let strategies = self.recovery_strategies.read();

        if let Some(strategy) = strategies.get(&error_type) {
            let action = match strategy {
                RecoveryStrategy::Retry {
                    max_attempts,
                    backoff,
                } => ErrorAction::Retry {
                    max_attempts: *max_attempts,
                    backoff: *backoff,
                },
                RecoveryStrategy::Restart { .. } => ErrorAction::Restart,
                RecoveryStrategy::Terminate { .. } => ErrorAction::Terminate,
                RecoveryStrategy::Failover { .. } => ErrorAction::Failover,
                RecoveryStrategy::Manual { .. } => ErrorAction::Suspend,
                RecoveryStrategy::None => ErrorAction::Terminate,
            };

            Ok(action)
        } else {
            // No registered strategy: fall back to a single retry.
            Ok(ErrorAction::Retry {
                max_attempts: 1,
                backoff: Duration::from_secs(1),
            })
        }
    }

    async fn register_strategy(
        &self,
        error_type: ErrorType,
        strategy: RecoveryStrategy,
    ) -> Result<(), ErrorHandlerError> {
        // `ErrorType` is `Copy`, so it can be reused after the insert.
        self.recovery_strategies
            .write()
            .insert(error_type, strategy);
        tracing::info!(
            "Registered recovery strategy for error type {:?}",
            error_type
        );
        Ok(())
    }

    async fn get_error_stats(
        &self,
        agent_id: AgentId,
    ) -> Result<ErrorStatistics, ErrorHandlerError> {
        let history = self.error_history.read();
        if let Some(records) = history.get(&agent_id) {
            let now = SystemTime::now();
            let window = self.config.error_aggregation_window;

            let recent_errors = records
                .iter()
                .filter(|record| now.duration_since(record.timestamp).unwrap_or_default() < window)
                .count() as u32;

            let total_errors = records.len() as u32;

            let error_types = records
                .iter()
                .map(|record| Self::classify_error(&record.error))
                .fold(HashMap::new(), |mut acc, error_type| {
                    *acc.entry(error_type).or_insert(0) += 1;
                    acc
                });

            Ok(ErrorStatistics {
                agent_id,
                total_errors,
                recent_errors,
                error_types,
                last_error: records.last().map(|r| r.timestamp),
            })
        } else {
            Ok(ErrorStatistics {
                agent_id,
                total_errors: 0,
                recent_errors: 0,
                error_types: HashMap::new(),
                last_error: None,
            })
        }
    }

    async fn get_system_error_stats(&self) -> SystemErrorStatistics {
        let history = self.error_history.read();
        let now = SystemTime::now();
        let window = self.config.error_aggregation_window;

        let mut total_errors = 0;
        let mut recent_errors = 0;
        let mut agents_with_errors = 0;
        let mut error_types = HashMap::new();

        for records in history.values() {
            if !records.is_empty() {
                agents_with_errors += 1;
                total_errors += records.len() as u32;

                let agent_recent_errors = records
                    .iter()
                    .filter(|record| {
                        now.duration_since(record.timestamp).unwrap_or_default() < window
                    })
                    .count() as u32;

                recent_errors += agent_recent_errors;

                for record in records {
                    let error_type = Self::classify_error(&record.error);
                    *error_types.entry(error_type).or_insert(0) += 1;
                }
            }
        }

        SystemErrorStatistics {
            total_errors,
            recent_errors,
            agents_with_errors,
            error_types,
            last_updated: now,
        }
    }

    async fn set_error_thresholds(
        &self,
        agent_id: AgentId,
        thresholds: ErrorThresholds,
    ) -> Result<(), ErrorHandlerError> {
        self.error_thresholds.write().insert(agent_id, thresholds);
        tracing::info!("Set error thresholds for agent {}", agent_id);
        Ok(())
    }

    async fn clear_error_history(&self, agent_id: AgentId) -> Result<(), ErrorHandlerError> {
        self.error_history.write().remove(&agent_id);
        self.circuit_breakers.write().remove(&agent_id);
        tracing::info!("Cleared error history for agent {}", agent_id);
        Ok(())
    }

    async fn shutdown(&self) -> Result<(), ErrorHandlerError> {
        tracing::info!("Shutting down error handler");

        *self.is_running.write() = false;
        self.shutdown_notify.notify_waiters();

        Ok(())
    }
}

#[derive(Debug, Clone)]
struct ErrorRecord {
    error: RuntimeError,
    timestamp: SystemTime,
}

impl ErrorRecord {
    fn new(error: RuntimeError) -> Self {
        Self {
            error,
            timestamp: SystemTime::now(),
        }
    }
}

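/// Per-agent circuit breaker: `Closed` counts failures, `Open` blocks
/// handling until `timeout` elapses after the last failure (moving to
/// `HalfOpen`), and any recorded success closes it again.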
#[derive(Debug, Clone)]
struct CircuitBreaker {
    failure_threshold: u32,
    timeout: Duration,
    failure_count: u32,
    last_failure_time: Option<SystemTime>,
    state: CircuitBreakerState,
}

#[derive(Debug, Clone, PartialEq, Eq)]
enum CircuitBreakerState {
    Closed,
    Open,
    HalfOpen,
}

impl CircuitBreaker {
    fn new(failure_threshold: u32, timeout: Duration) -> Self {
        Self {
            failure_threshold,
            timeout,
            failure_count: 0,
            last_failure_time: None,
            state: CircuitBreakerState::Closed,
        }
    }

    fn is_open(&self) -> bool {
        self.state == CircuitBreakerState::Open
    }

    fn record_failure(&mut self) {
        self.failure_count += 1;
        self.last_failure_time = Some(SystemTime::now());

        if self.failure_count >= self.failure_threshold {
            self.state = CircuitBreakerState::Open;
        }
    }

    fn record_success(&mut self) {
        self.failure_count = 0;
        self.state = CircuitBreakerState::Closed;
    }

    fn update(&mut self, now: SystemTime) {
        if self.state == CircuitBreakerState::Open {
            if let Some(last_failure) = self.last_failure_time {
                if now.duration_since(last_failure).unwrap_or_default() > self.timeout {
                    self.state = CircuitBreakerState::HalfOpen;
                }
            }
        }
    }
}

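/// Per-agent error limits evaluated against the aggregation window.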
#[derive(Debug, Clone)]
pub struct ErrorThresholds {
    pub max_errors_per_window: u32,
    pub escalation_threshold: u32,
}

impl Default for ErrorThresholds {
    fn default() -> Self {
        Self {
            max_errors_per_window: 10,
            escalation_threshold: 5,
        }
    }
}

#[derive(Debug, Clone)]
pub struct ErrorStatistics {
    pub agent_id: AgentId,
    pub total_errors: u32,
    pub recent_errors: u32,
    pub error_types: HashMap<ErrorType, u32>,
    pub last_error: Option<SystemTime>,
}

#[derive(Debug, Clone)]
pub struct SystemErrorStatistics {
    pub total_errors: u32,
    pub recent_errors: u32,
    pub agents_with_errors: u32,
    pub error_types: HashMap<ErrorType, u32>,
    pub last_updated: SystemTime,
}

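/// Action the caller should take in response to a handled error.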
#[derive(Debug, Clone)]
pub enum ErrorAction {
    Retry {
        max_attempts: u32,
        backoff: Duration,
    },
    Restart,
    Suspend,
    Terminate,
    Failover,
}

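/// Coarse error classes used as keys for recovery-strategy lookup.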
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorType {
    ResourceExhaustion,
    NetworkError,
    SecurityViolation,
    PolicyViolation,
    SystemError,
    ValidationError,
}

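/// Internal messages exchanged with the background event loop.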
#[derive(Debug, Clone)]
enum ErrorEvent {
    ErrorOccurred {
        agent_id: AgentId,
        error: RuntimeError,
    },
    RecoveryAttempted {
        agent_id: AgentId,
        strategy: RecoveryStrategy,
        success: bool,
        timestamp: SystemTime,
    },
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_error_handling() {
        let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
            .await
            .unwrap();
        let agent_id = AgentId::new();
        let error =
            RuntimeError::Resource(ResourceError::Insufficient("Memory exhausted".to_string()));

        let action = handler.handle_error(agent_id, error).await.unwrap();

        match action {
            ErrorAction::Retry { max_attempts, .. } => {
                assert_eq!(max_attempts, 3);
            }
            _ => panic!("Expected retry action for resource error"),
        }
    }

    #[tokio::test]
    async fn test_error_statistics() {
        let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
            .await
            .unwrap();
        let agent_id = AgentId::new();

        for _ in 0..5 {
            let error = RuntimeError::Resource(ResourceError::Insufficient(
                "Memory exhausted".to_string(),
            ));
            handler.handle_error(agent_id, error).await.unwrap();
        }

        // Give the background event loop a moment to record the errors.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let stats = handler.get_error_stats(agent_id).await.unwrap();
        assert_eq!(stats.total_errors, 5);
        assert_eq!(stats.recent_errors, 5);
        assert!(stats
            .error_types
            .contains_key(&ErrorType::ResourceExhaustion));
    }

    #[tokio::test]
    async fn test_recovery_strategy_registration() {
        let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
            .await
            .unwrap();

        let strategy = RecoveryStrategy::Terminate { cleanup: true };
        let result = handler
            .register_strategy(ErrorType::SecurityViolation, strategy)
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_error_thresholds() {
        let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
            .await
            .unwrap();
        let agent_id = AgentId::new();

        let thresholds = ErrorThresholds {
            max_errors_per_window: 3,
            escalation_threshold: 2,
        };

        let result = handler.set_error_thresholds(agent_id, thresholds).await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_circuit_breaker() {
        let mut breaker = CircuitBreaker::new(3, Duration::from_secs(60));

        assert!(!breaker.is_open());

        breaker.record_failure();
        breaker.record_failure();
        assert!(!breaker.is_open());

        breaker.record_failure();
        assert!(breaker.is_open());

        breaker.record_success();
        assert!(!breaker.is_open());
    }
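
    // Additional sketch: verifies the open -> half-open transition by
    // passing a simulated future instant into `update` (assuming, per
    // `is_open` above, that `HalfOpen` should no longer report as open).
    #[test]
    fn test_circuit_breaker_half_open_after_timeout() {
        let mut breaker = CircuitBreaker::new(1, Duration::from_secs(60));

        breaker.record_failure();
        assert!(breaker.is_open());

        // Simulate the timeout elapsing since the last failure.
        let later = SystemTime::now() + Duration::from_secs(120);
        breaker.update(later);
        assert!(!breaker.is_open());
    }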
}