1use async_trait::async_trait;
6use parking_lot::RwLock;
7use std::boxed::Box;
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::{Duration, SystemTime};
11use tokio::sync::{mpsc, Notify};
12use tokio::time::interval;
13
14use crate::types::*;
15
16#[async_trait]
18pub trait ErrorHandler {
19 async fn handle_error(
21 &self,
22 agent_id: AgentId,
23 error: RuntimeError,
24 ) -> Result<ErrorAction, ErrorHandlerError>;
25
26 async fn register_strategy(
28 &self,
29 error_type: ErrorType,
30 strategy: RecoveryStrategy,
31 ) -> Result<(), ErrorHandlerError>;
32
33 async fn get_error_stats(
35 &self,
36 agent_id: AgentId,
37 ) -> Result<ErrorStatistics, ErrorHandlerError>;
38
39 async fn get_system_error_stats(&self) -> SystemErrorStatistics;
41
42 async fn set_error_thresholds(
44 &self,
45 agent_id: AgentId,
46 thresholds: ErrorThresholds,
47 ) -> Result<(), ErrorHandlerError>;
48
49 async fn clear_error_history(&self, agent_id: AgentId) -> Result<(), ErrorHandlerError>;
51
52 async fn shutdown(&self) -> Result<(), ErrorHandlerError>;
54}
55
56#[derive(Debug, Clone)]
58pub struct ErrorHandlerConfig {
59 pub max_error_history: usize,
60 pub error_aggregation_window: Duration,
61 pub escalation_threshold: u32,
62 pub circuit_breaker_threshold: u32,
63 pub circuit_breaker_timeout: Duration,
64 pub enable_auto_recovery: bool,
65 pub max_recovery_attempts: u32,
66 pub recovery_backoff_multiplier: f32,
67}
68
69impl Default for ErrorHandlerConfig {
70 fn default() -> Self {
71 Self {
72 max_error_history: 1000,
73 error_aggregation_window: Duration::from_secs(300), escalation_threshold: 5,
75 circuit_breaker_threshold: 10,
76 circuit_breaker_timeout: Duration::from_secs(60),
77 enable_auto_recovery: true,
78 max_recovery_attempts: 3,
79 recovery_backoff_multiplier: 2.0,
80 }
81 }
82}
83
84pub struct DefaultErrorHandler {
86 config: ErrorHandlerConfig,
87 error_history: Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
88 recovery_strategies: Arc<RwLock<HashMap<ErrorType, RecoveryStrategy>>>,
89 error_thresholds: Arc<RwLock<HashMap<AgentId, ErrorThresholds>>>,
90 circuit_breakers: Arc<RwLock<HashMap<AgentId, CircuitBreaker>>>,
91 event_sender: mpsc::UnboundedSender<ErrorEvent>,
92 shutdown_notify: Arc<Notify>,
93 is_running: Arc<RwLock<bool>>,
94}
95
96impl DefaultErrorHandler {
97 pub async fn new(config: ErrorHandlerConfig) -> Result<Self, ErrorHandlerError> {
99 let error_history = Arc::new(RwLock::new(HashMap::new()));
100 let recovery_strategies = Arc::new(RwLock::new(Self::default_strategies()));
101 let error_thresholds = Arc::new(RwLock::new(HashMap::new()));
102 let circuit_breakers = Arc::new(RwLock::new(HashMap::new()));
103 let (event_sender, event_receiver) = mpsc::unbounded_channel();
104 let shutdown_notify = Arc::new(Notify::new());
105 let is_running = Arc::new(RwLock::new(true));
106
107 let handler = Self {
108 config,
109 error_history,
110 recovery_strategies,
111 error_thresholds,
112 circuit_breakers,
113 event_sender,
114 shutdown_notify,
115 is_running,
116 };
117
118 handler.start_event_loop(event_receiver).await;
120 handler.start_cleanup_loop().await;
121
122 Ok(handler)
123 }
124
125 fn default_strategies() -> HashMap<ErrorType, RecoveryStrategy> {
127 let mut strategies = HashMap::new();
128
129 strategies.insert(
130 ErrorType::ResourceExhaustion,
131 RecoveryStrategy::Retry {
132 max_attempts: 3,
133 backoff: Duration::from_secs(1),
134 },
135 );
136
137 strategies.insert(
138 ErrorType::NetworkError,
139 RecoveryStrategy::Retry {
140 max_attempts: 5,
141 backoff: Duration::from_millis(500),
142 },
143 );
144
145 strategies.insert(
146 ErrorType::SecurityViolation,
147 RecoveryStrategy::Terminate { cleanup: true },
148 );
149
150 strategies.insert(
151 ErrorType::PolicyViolation,
152 RecoveryStrategy::Manual {
153 reason: "Policy violation requires manual review".to_string(),
154 },
155 );
156
157 strategies.insert(
158 ErrorType::SystemError,
159 RecoveryStrategy::Restart {
160 preserve_state: false,
161 },
162 );
163
164 strategies.insert(
165 ErrorType::ValidationError,
166 RecoveryStrategy::Failover { backup_agent: None },
167 );
168
169 strategies
170 }
171
172 async fn start_event_loop(&self, mut event_receiver: mpsc::UnboundedReceiver<ErrorEvent>) {
174 let error_history = self.error_history.clone();
175 let recovery_strategies = self.recovery_strategies.clone();
176 let error_thresholds = self.error_thresholds.clone();
177 let circuit_breakers = self.circuit_breakers.clone();
178 let shutdown_notify = self.shutdown_notify.clone();
179 let config = self.config.clone();
180
181 tokio::spawn(async move {
182 loop {
183 tokio::select! {
184 event = event_receiver.recv() => {
185 if let Some(event) = event {
186 Self::process_error_event(
187 event,
188 &error_history,
189 &recovery_strategies,
190 &error_thresholds,
191 &circuit_breakers,
192 &config,
193 ).await;
194 } else {
195 break;
196 }
197 }
198 _ = shutdown_notify.notified() => {
199 break;
200 }
201 }
202 }
203 });
204 }
205
206 async fn start_cleanup_loop(&self) {
208 let error_history = self.error_history.clone();
209 let circuit_breakers = self.circuit_breakers.clone();
210 let shutdown_notify = self.shutdown_notify.clone();
211 let is_running = self.is_running.clone();
212 let max_history = self.config.max_error_history;
213 let aggregation_window = self.config.error_aggregation_window;
214
215 tokio::spawn(async move {
216 let mut interval = interval(Duration::from_secs(300)); loop {
219 tokio::select! {
220 _ = interval.tick() => {
221 if !*is_running.read() {
222 break;
223 }
224
225 Self::cleanup_old_records(&error_history, &circuit_breakers, max_history, aggregation_window).await;
226 }
227 _ = shutdown_notify.notified() => {
228 break;
229 }
230 }
231 }
232 });
233 }
234
235 async fn process_error_event(
237 event: ErrorEvent,
238 error_history: &Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
239 recovery_strategies: &Arc<RwLock<HashMap<ErrorType, RecoveryStrategy>>>,
240 error_thresholds: &Arc<RwLock<HashMap<AgentId, ErrorThresholds>>>,
241 circuit_breakers: &Arc<RwLock<HashMap<AgentId, CircuitBreaker>>>,
242 config: &ErrorHandlerConfig,
243 ) {
244 match event {
245 ErrorEvent::ErrorOccurred { agent_id, error } => {
246 let error_record = ErrorRecord::new(error.clone());
248 error_history
249 .write()
250 .entry(agent_id)
251 .or_default()
252 .push(error_record);
253
254 let circuit_breaker_open = {
256 let mut breakers = circuit_breakers.write();
257 let breaker = breakers.entry(agent_id).or_insert_with(|| {
258 CircuitBreaker::new(
259 config.circuit_breaker_threshold,
260 config.circuit_breaker_timeout,
261 )
262 });
263
264 if breaker.is_open() {
265 tracing::warn!(
266 "Circuit breaker open for agent {}, blocking error handling",
267 agent_id
268 );
269 return;
270 }
271
272 breaker.record_failure();
273 false
274 };
275
276 if circuit_breaker_open {
277 return;
278 }
279
280 let thresholds = error_thresholds
282 .read()
283 .get(&agent_id)
284 .cloned()
285 .unwrap_or_default();
286 let recent_errors = Self::count_recent_errors(
287 error_history,
288 agent_id,
289 config.error_aggregation_window,
290 );
291
292 if recent_errors >= thresholds.max_errors_per_window {
293 tracing::error!(
294 "Agent {} exceeded error threshold: {} errors in window",
295 agent_id,
296 recent_errors
297 );
298 }
300
301 let error_type = Self::classify_error(&error);
303 let strategy_option = {
304 let strategies = recovery_strategies.read();
305 strategies.get(&error_type).cloned()
306 };
307
308 if let Some(strategy) = strategy_option {
309 tracing::info!(
310 "Applying recovery strategy {:?} for agent {} error: {}",
311 strategy,
312 agent_id,
313 error
314 );
315
316 let success = match strategy {
318 RecoveryStrategy::Retry { .. } => true, RecoveryStrategy::Restart { .. } => true, RecoveryStrategy::Terminate { .. } => true, RecoveryStrategy::Failover { .. } => false, RecoveryStrategy::Manual { .. } => false, RecoveryStrategy::None => false, };
325
326 let recovery_event = ErrorEvent::RecoveryAttempted {
328 agent_id,
329 strategy: strategy.clone(),
330 success,
331 timestamp: SystemTime::now(),
332 };
333
334 Box::pin(Self::process_error_event(
336 recovery_event,
337 error_history,
338 recovery_strategies,
339 error_thresholds,
340 circuit_breakers,
341 config,
342 ))
343 .await;
344 } else {
345 tracing::warn!(
346 "No recovery strategy found for error type {:?} in agent {}",
347 error_type,
348 agent_id
349 );
350 }
351 }
352 ErrorEvent::RecoveryAttempted {
353 agent_id,
354 strategy,
355 success,
356 timestamp,
357 } => {
358 if success {
359 tracing::info!(
360 "Recovery successful for agent {} using strategy {:?} at {:?}",
361 agent_id,
362 strategy,
363 timestamp
364 );
365
366 {
368 if let Some(breaker) = circuit_breakers.write().get_mut(&agent_id) {
369 breaker.record_success();
370 }
371 }
372 } else {
373 tracing::error!(
374 "Recovery failed for agent {} using strategy {:?} at {:?}",
375 agent_id,
376 strategy,
377 timestamp
378 );
379 }
380 }
381 }
382 }
383
384 async fn cleanup_old_records(
386 error_history: &Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
387 circuit_breakers: &Arc<RwLock<HashMap<AgentId, CircuitBreaker>>>,
388 max_history: usize,
389 aggregation_window: Duration,
390 ) {
391 let now = SystemTime::now();
392
393 {
395 let mut history = error_history.write();
396 for records in history.values_mut() {
397 records.retain(|record| {
399 now.duration_since(record.timestamp).unwrap_or_default()
400 < aggregation_window * 2
401 });
402
403 if records.len() > max_history {
405 records.drain(0..records.len() - max_history);
406 }
407 }
408
409 history.retain(|_, records| !records.is_empty());
411 }
412
413 {
415 let mut breakers = circuit_breakers.write();
416 for breaker in breakers.values_mut() {
417 breaker.update(now);
418 }
419
420 breakers.retain(|_, breaker| {
422 breaker.is_open()
423 || breaker
424 .last_failure_time
425 .map(|t| now.duration_since(t).unwrap_or_default() < aggregation_window)
426 .unwrap_or(false)
427 });
428 }
429 }
430
431 fn classify_error(error: &RuntimeError) -> ErrorType {
433 match error {
434 RuntimeError::Resource(_) => ErrorType::ResourceExhaustion,
435 RuntimeError::Communication(_) => ErrorType::NetworkError,
436 RuntimeError::Security(_) => ErrorType::SecurityViolation,
437 RuntimeError::Scheduler(_) => ErrorType::SystemError,
438 RuntimeError::Lifecycle(_) => ErrorType::SystemError,
439 RuntimeError::ErrorHandler(_) => ErrorType::SystemError,
440 RuntimeError::Configuration(_) => ErrorType::SystemError,
441 RuntimeError::Policy(_) => ErrorType::SecurityViolation,
442 RuntimeError::Sandbox(_) => ErrorType::SecurityViolation,
443 RuntimeError::Audit(_) => ErrorType::SystemError,
444 RuntimeError::Internal(_) => ErrorType::SystemError,
445 }
446 }
447
448 fn count_recent_errors(
450 error_history: &Arc<RwLock<HashMap<AgentId, Vec<ErrorRecord>>>>,
451 agent_id: AgentId,
452 window: Duration,
453 ) -> u32 {
454 let history = error_history.read();
455 if let Some(records) = history.get(&agent_id) {
456 let now = SystemTime::now();
457 records
458 .iter()
459 .filter(|record| now.duration_since(record.timestamp).unwrap_or_default() < window)
460 .count() as u32
461 } else {
462 0
463 }
464 }
465
466 fn send_event(&self, event: ErrorEvent) -> Result<(), ErrorHandlerError> {
468 self.event_sender
469 .send(event)
470 .map_err(|_| ErrorHandlerError::EventProcessingFailed {
471 reason: "Failed to send error event".to_string(),
472 })
473 }
474}
475
476#[async_trait]
477impl ErrorHandler for DefaultErrorHandler {
478 async fn handle_error(
479 &self,
480 agent_id: AgentId,
481 error: RuntimeError,
482 ) -> Result<ErrorAction, ErrorHandlerError> {
483 if !*self.is_running.read() {
484 return Err(ErrorHandlerError::ShuttingDown);
485 }
486
487 self.send_event(ErrorEvent::ErrorOccurred {
489 agent_id,
490 error: error.clone(),
491 })?;
492
493 let error_type = Self::classify_error(&error);
495 let strategies = self.recovery_strategies.read();
496
497 if let Some(strategy) = strategies.get(&error_type) {
498 let action = match strategy {
499 RecoveryStrategy::Retry {
500 max_attempts,
501 backoff,
502 } => ErrorAction::Retry {
503 max_attempts: *max_attempts,
504 backoff: *backoff,
505 },
506 RecoveryStrategy::Restart { .. } => ErrorAction::Restart,
507 RecoveryStrategy::Terminate { .. } => ErrorAction::Terminate,
508 RecoveryStrategy::Failover { .. } => ErrorAction::Failover,
509 RecoveryStrategy::Manual { .. } => ErrorAction::Suspend, RecoveryStrategy::None => ErrorAction::Terminate, };
512
513 Ok(action)
514 } else {
515 Ok(ErrorAction::Retry {
517 max_attempts: 1,
518 backoff: Duration::from_secs(1),
519 })
520 }
521 }
522
523 async fn register_strategy(
524 &self,
525 error_type: ErrorType,
526 strategy: RecoveryStrategy,
527 ) -> Result<(), ErrorHandlerError> {
528 let error_type_clone = error_type;
529 self.recovery_strategies
530 .write()
531 .insert(error_type, strategy);
532 tracing::info!(
533 "Registered recovery strategy for error type {:?}",
534 error_type_clone
535 );
536 Ok(())
537 }
538
539 async fn get_error_stats(
540 &self,
541 agent_id: AgentId,
542 ) -> Result<ErrorStatistics, ErrorHandlerError> {
543 let history = self.error_history.read();
544 if let Some(records) = history.get(&agent_id) {
545 let now = SystemTime::now();
546 let window = self.config.error_aggregation_window;
547
548 let recent_errors = records
549 .iter()
550 .filter(|record| now.duration_since(record.timestamp).unwrap_or_default() < window)
551 .count() as u32;
552
553 let total_errors = records.len() as u32;
554
555 let error_types = records
556 .iter()
557 .map(|record| Self::classify_error(&record.error))
558 .fold(HashMap::new(), |mut acc, error_type| {
559 *acc.entry(error_type).or_insert(0) += 1;
560 acc
561 });
562
563 Ok(ErrorStatistics {
564 agent_id,
565 total_errors,
566 recent_errors,
567 error_types,
568 last_error: records.last().map(|r| r.timestamp),
569 })
570 } else {
571 Ok(ErrorStatistics {
572 agent_id,
573 total_errors: 0,
574 recent_errors: 0,
575 error_types: HashMap::new(),
576 last_error: None,
577 })
578 }
579 }
580
581 async fn get_system_error_stats(&self) -> SystemErrorStatistics {
582 let history = self.error_history.read();
583 let now = SystemTime::now();
584 let window = self.config.error_aggregation_window;
585
586 let mut total_errors = 0;
587 let mut recent_errors = 0;
588 let mut agents_with_errors = 0;
589 let mut error_types = HashMap::new();
590
591 for records in history.values() {
592 if !records.is_empty() {
593 agents_with_errors += 1;
594 total_errors += records.len() as u32;
595
596 let agent_recent_errors = records
597 .iter()
598 .filter(|record| {
599 now.duration_since(record.timestamp).unwrap_or_default() < window
600 })
601 .count() as u32;
602
603 recent_errors += agent_recent_errors;
604
605 for record in records {
606 let error_type = Self::classify_error(&record.error);
607 *error_types.entry(error_type).or_insert(0) += 1;
608 }
609 }
610 }
611
612 SystemErrorStatistics {
613 total_errors,
614 recent_errors,
615 agents_with_errors,
616 error_types,
617 last_updated: now,
618 }
619 }
620
621 async fn set_error_thresholds(
622 &self,
623 agent_id: AgentId,
624 thresholds: ErrorThresholds,
625 ) -> Result<(), ErrorHandlerError> {
626 self.error_thresholds.write().insert(agent_id, thresholds);
627 tracing::info!("Set error thresholds for agent {}", agent_id);
628 Ok(())
629 }
630
631 async fn clear_error_history(&self, agent_id: AgentId) -> Result<(), ErrorHandlerError> {
632 self.error_history.write().remove(&agent_id);
633 self.circuit_breakers.write().remove(&agent_id);
634 tracing::info!("Cleared error history for agent {}", agent_id);
635 Ok(())
636 }
637
638 async fn shutdown(&self) -> Result<(), ErrorHandlerError> {
639 tracing::info!("Shutting down error handler");
640
641 *self.is_running.write() = false;
642 self.shutdown_notify.notify_waiters();
643
644 Ok(())
645 }
646}
647
648#[derive(Debug, Clone)]
650struct ErrorRecord {
651 error: RuntimeError,
652 timestamp: SystemTime,
653}
654
655impl ErrorRecord {
656 fn new(error: RuntimeError) -> Self {
657 Self {
658 error,
659 timestamp: SystemTime::now(),
660 }
661 }
662}
663
664#[derive(Debug, Clone)]
666struct CircuitBreaker {
667 failure_threshold: u32,
668 timeout: Duration,
669 failure_count: u32,
670 last_failure_time: Option<SystemTime>,
671 state: CircuitBreakerState,
672}
673
674#[derive(Debug, Clone, PartialEq, Eq)]
675enum CircuitBreakerState {
676 Closed,
677 Open,
678 HalfOpen,
679}
680
681impl CircuitBreaker {
682 fn new(failure_threshold: u32, timeout: Duration) -> Self {
683 Self {
684 failure_threshold,
685 timeout,
686 failure_count: 0,
687 last_failure_time: None,
688 state: CircuitBreakerState::Closed,
689 }
690 }
691
692 fn is_open(&self) -> bool {
693 self.state == CircuitBreakerState::Open
694 }
695
696 fn record_failure(&mut self) {
697 self.failure_count += 1;
698 self.last_failure_time = Some(SystemTime::now());
699
700 if self.failure_count >= self.failure_threshold {
701 self.state = CircuitBreakerState::Open;
702 }
703 }
704
705 fn record_success(&mut self) {
706 self.failure_count = 0;
707 self.state = CircuitBreakerState::Closed;
708 }
709
710 fn update(&mut self, now: SystemTime) {
711 if self.state == CircuitBreakerState::Open {
712 if let Some(last_failure) = self.last_failure_time {
713 if now.duration_since(last_failure).unwrap_or_default() > self.timeout {
714 self.state = CircuitBreakerState::HalfOpen;
715 }
716 }
717 }
718 }
719}
720
721#[derive(Debug, Clone)]
723pub struct ErrorThresholds {
724 pub max_errors_per_window: u32,
725 pub escalation_threshold: u32,
726}
727
728impl Default for ErrorThresholds {
729 fn default() -> Self {
730 Self {
731 max_errors_per_window: 10,
732 escalation_threshold: 5,
733 }
734 }
735}
736
737#[derive(Debug, Clone)]
739pub struct ErrorStatistics {
740 pub agent_id: AgentId,
741 pub total_errors: u32,
742 pub recent_errors: u32,
743 pub error_types: HashMap<ErrorType, u32>,
744 pub last_error: Option<SystemTime>,
745}
746
747#[derive(Debug, Clone)]
749pub struct SystemErrorStatistics {
750 pub total_errors: u32,
751 pub recent_errors: u32,
752 pub agents_with_errors: u32,
753 pub error_types: HashMap<ErrorType, u32>,
754 pub last_updated: SystemTime,
755}
756
757#[derive(Debug, Clone)]
759pub enum ErrorAction {
760 Retry {
761 max_attempts: u32,
762 backoff: Duration,
763 },
764 Restart,
765 Suspend,
766 Terminate,
767 Failover,
768}
769
770#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
772pub enum ErrorType {
773 ResourceExhaustion,
774 NetworkError,
775 SecurityViolation,
776 PolicyViolation,
777 SystemError,
778 ValidationError,
779}
780
781#[derive(Debug, Clone)]
783enum ErrorEvent {
784 ErrorOccurred {
785 agent_id: AgentId,
786 error: RuntimeError,
787 },
788 RecoveryAttempted {
789 agent_id: AgentId,
790 strategy: RecoveryStrategy,
791 success: bool,
792 timestamp: SystemTime,
793 },
794}
795
796#[cfg(test)]
797mod tests {
798 use super::*;
799
800 #[tokio::test]
801 async fn test_error_handling() {
802 let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
803 .await
804 .unwrap();
805 let agent_id = AgentId::new();
806 let error =
807 RuntimeError::Resource(ResourceError::Insufficient("Memory exhausted".to_string()));
808
809 let action = handler.handle_error(agent_id, error).await.unwrap();
810
811 match action {
812 ErrorAction::Retry { max_attempts, .. } => {
813 assert_eq!(max_attempts, 3);
814 }
815 _ => panic!("Expected retry action for resource error"),
816 }
817 }
818
819 #[tokio::test]
820 async fn test_error_statistics() {
821 let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
822 .await
823 .unwrap();
824 let agent_id = AgentId::new();
825
826 for _ in 0..5 {
828 let error =
829 RuntimeError::Resource(ResourceError::Insufficient("Memory exhausted".to_string()));
830 handler.handle_error(agent_id, error).await.unwrap();
831 }
832
833 tokio::time::sleep(Duration::from_millis(50)).await;
835
836 let stats = handler.get_error_stats(agent_id).await.unwrap();
837 assert_eq!(stats.total_errors, 5);
838 assert_eq!(stats.recent_errors, 5);
839 assert!(stats
840 .error_types
841 .contains_key(&ErrorType::ResourceExhaustion));
842 }
843
844 #[tokio::test]
845 async fn test_recovery_strategy_registration() {
846 let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
847 .await
848 .unwrap();
849
850 let strategy = RecoveryStrategy::Terminate { cleanup: true };
851 let result = handler
852 .register_strategy(ErrorType::SecurityViolation, strategy)
853 .await;
854 assert!(result.is_ok());
855 }
856
857 #[tokio::test]
858 async fn test_error_thresholds() {
859 let handler = DefaultErrorHandler::new(ErrorHandlerConfig::default())
860 .await
861 .unwrap();
862 let agent_id = AgentId::new();
863
864 let thresholds = ErrorThresholds {
865 max_errors_per_window: 3,
866 escalation_threshold: 2,
867 };
868
869 let result = handler.set_error_thresholds(agent_id, thresholds).await;
870 assert!(result.is_ok());
871 }
872
873 #[test]
874 fn test_circuit_breaker() {
875 let mut breaker = CircuitBreaker::new(3, Duration::from_secs(60));
876
877 assert!(!breaker.is_open());
878
879 breaker.record_failure();
881 breaker.record_failure();
882 assert!(!breaker.is_open());
883
884 breaker.record_failure();
885 assert!(breaker.is_open());
886
887 breaker.record_success();
889 assert!(!breaker.is_open());
890 }
891}