1use std::collections::VecDeque;
16use std::sync::Arc;
17use std::time::Duration;
18
19use serde::{Deserialize, Serialize};
20
21use crate::clock::{Clock, SystemClock};
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct CircuitBreakerConfig {
26 pub loss_limit: u32,
28 pub window_secs: u64,
30 pub cooldown_secs: u64,
32}
33
34impl Default for CircuitBreakerConfig {
35 fn default() -> Self {
36 Self {
39 loss_limit: 4,
40 window_secs: 14_400,
41 cooldown_secs: 3_600,
42 }
43 }
44}
45
46#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
55pub struct CircuitBreakerSnapshot {
56 pub recent_losses: Vec<u64>,
58 pub tripped_at_unix_secs: Option<u64>,
60}
61
62#[derive(Debug, Clone)]
82pub struct CircuitBreaker {
83 config: CircuitBreakerConfig,
84 recent_losses: VecDeque<u64>,
87 tripped_at_unix_secs: Option<u64>,
88 clock: Arc<dyn Clock>,
89}
90
91impl CircuitBreaker {
92 pub fn new(config: CircuitBreakerConfig) -> Self {
94 Self::with_clock(config, Arc::new(SystemClock))
95 }
96
97 pub fn with_clock(config: CircuitBreakerConfig, clock: Arc<dyn Clock>) -> Self {
100 Self {
101 config,
102 recent_losses: VecDeque::with_capacity(16),
103 tripped_at_unix_secs: None,
104 clock,
105 }
106 }
107
108 pub fn tick(&mut self) {
111 let now = self.clock.now_unix_secs();
112 if let Some(t) = self.tripped_at_unix_secs
113 && now.saturating_sub(t) >= self.config.cooldown_secs
114 {
115 self.reset();
116 }
117 self.evict_old(now);
118 }
119
120 pub fn record_loss(&mut self) {
123 let now = self.clock.now_unix_secs();
124 self.recent_losses.push_back(now);
125 self.evict_old(now);
126
127 if self.recent_losses.len() as u32 >= self.config.loss_limit {
128 self.tripped_at_unix_secs = Some(now);
129 tracing::warn!(
130 losses = self.recent_losses.len(),
131 window_secs = self.config.window_secs,
132 "circuit breaker tripped"
133 );
134 }
135 }
136
137 pub fn record_win(&mut self) {
143 self.evict_old(self.clock.now_unix_secs());
144 }
145
146 pub fn is_tripped(&self) -> bool {
148 self.tripped_at_unix_secs.is_some_and(|t| {
149 self.clock.now_unix_secs().saturating_sub(t) < self.config.cooldown_secs
150 })
151 }
152
153 pub fn snapshot(&self) -> CircuitBreakerSnapshot {
159 CircuitBreakerSnapshot {
160 recent_losses: self.recent_losses.iter().copied().collect(),
161 tripped_at_unix_secs: self.tripped_at_unix_secs,
162 }
163 }
164
165 pub fn restore(&mut self, snap: CircuitBreakerSnapshot) {
172 self.recent_losses = snap.recent_losses.into_iter().collect();
173 self.tripped_at_unix_secs = snap.tripped_at_unix_secs;
174 }
175
176 pub fn reset(&mut self) {
179 self.recent_losses.clear();
180 self.tripped_at_unix_secs = None;
181 }
182
183 pub fn recent_loss_count(&self) -> usize {
185 self.recent_losses.len()
186 }
187
188 pub fn cooldown_remaining(&self) -> Option<Duration> {
190 let t = self.tripped_at_unix_secs?;
191 let elapsed = self.clock.now_unix_secs().saturating_sub(t);
192 (elapsed < self.config.cooldown_secs)
193 .then(|| Duration::from_secs(self.config.cooldown_secs - elapsed))
194 }
195
196 fn evict_old(&mut self, now: u64) {
197 let cutoff = now.saturating_sub(self.config.window_secs);
198 while let Some(&ts) = self.recent_losses.front() {
199 if ts < cutoff {
200 self.recent_losses.pop_front();
201 } else {
202 break;
203 }
204 }
205 }
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::ManualClock;

    /// Unix time at which every manual-clock scenario starts.
    const START: u64 = 1_000_000;

    /// Builds a config from its three thresholds.
    fn config_of(loss_limit: u32, window_secs: u64, cooldown_secs: u64) -> CircuitBreakerConfig {
        CircuitBreakerConfig {
            loss_limit,
            window_secs,
            cooldown_secs,
        }
    }

    /// Builds a breaker driven by a manual clock and returns both, so the test
    /// can advance time.
    fn manual_breaker(
        loss_limit: u32,
        window_secs: u64,
        cooldown_secs: u64,
        start: u64,
    ) -> (CircuitBreaker, Arc<ManualClock>) {
        let clock = Arc::new(ManualClock::new(start));
        let settings = config_of(loss_limit, window_secs, cooldown_secs);
        (CircuitBreaker::with_clock(settings, clock.clone()), clock)
    }

    /// Records `times` consecutive losses on `target`.
    fn lose(target: &mut CircuitBreaker, times: usize) {
        for _ in 0..times {
            target.record_loss();
        }
    }

    #[test]
    fn starts_untripped() {
        let fresh = CircuitBreaker::new(config_of(4, 14400, 3600));
        assert!(!fresh.is_tripped());
        assert_eq!(fresh.recent_loss_count(), 0);
    }

    #[test]
    fn trips_at_limit() {
        let mut subject = CircuitBreaker::new(config_of(3, 14400, 3600));
        lose(&mut subject, 2);
        assert!(!subject.is_tripped());
        lose(&mut subject, 1);
        assert!(subject.is_tripped());
    }

    #[test]
    fn win_does_not_untrip() {
        let mut subject = CircuitBreaker::new(config_of(2, 14400, 3600));
        lose(&mut subject, 2);
        assert!(subject.is_tripped());
        subject.record_win();
        assert!(subject.is_tripped());
    }

    #[test]
    fn reset_clears_state() {
        let mut subject = CircuitBreaker::new(config_of(2, 14400, 3600));
        lose(&mut subject, 2);
        subject.reset();
        assert!(!subject.is_tripped());
        assert_eq!(subject.recent_loss_count(), 0);
    }

    #[test]
    fn old_losses_evicted_from_rolling_window() {
        let (mut subject, clock) = manual_breaker(3, 3600, 600, START);

        lose(&mut subject, 2);
        assert_eq!(subject.recent_loss_count(), 2);

        // Step past the 3600 s window so both earlier losses become stale.
        clock.advance_secs(3_700);
        subject.record_loss();

        assert_eq!(
            subject.recent_loss_count(),
            1,
            "losses outside the rolling window must be evicted"
        );
        assert!(!subject.is_tripped(), "rolling count of 1 should not trip");
    }

    #[test]
    fn cooldown_auto_resets_on_tick() {
        let (mut subject, clock) = manual_breaker(2, 3600, 600, START);

        lose(&mut subject, 2);
        assert!(subject.is_tripped());
        assert_eq!(
            subject.cooldown_remaining(),
            Some(Duration::from_secs(600)),
            "cooldown should report full remaining at trip time"
        );

        // Halfway through the cooldown: still tripped.
        clock.advance_secs(300);
        subject.tick();
        assert!(subject.is_tripped());
        assert_eq!(subject.cooldown_remaining(), Some(Duration::from_secs(300)));

        // One second past the cooldown: tick resets everything.
        clock.advance_secs(301);
        subject.tick();
        assert!(!subject.is_tripped());
        assert_eq!(subject.cooldown_remaining(), None);
        assert_eq!(subject.recent_loss_count(), 0);
    }

    #[test]
    fn losses_spaced_outside_window_never_trip() {
        let (mut subject, clock) = manual_breaker(3, 3600, 600, START);

        // Three losses, each 3700 s after the previous one.
        for round in 0..3 {
            if round > 0 {
                clock.advance_secs(3_700);
            }
            subject.record_loss();
        }

        assert!(!subject.is_tripped());
        assert_eq!(subject.recent_loss_count(), 1);
    }

    #[test]
    fn snapshot_restore_preserves_tripped_state() {
        let (mut source, clock) = manual_breaker(2, 14_400, 3_600, START);
        lose(&mut source, 2);
        assert!(source.is_tripped());

        let saved = source.snapshot();
        assert_eq!(saved.recent_losses.len(), 2);
        assert_eq!(saved.tripped_at_unix_secs, Some(1_000_000));

        // A second breaker on the same clock must pick up the saved state.
        let mut revived = CircuitBreaker::with_clock(config_of(2, 14_400, 3_600), clock.clone());
        revived.restore(saved.clone());
        assert!(revived.is_tripped());
        assert_eq!(revived.recent_loss_count(), 2);
        assert_eq!(revived.snapshot(), saved);
    }

    #[test]
    fn restore_then_tick_resets_after_cooldown_elapsed_during_downtime() {
        let (mut source, _) = manual_breaker(2, 14_400, 600, START);
        lose(&mut source, 2);
        assert!(source.is_tripped());
        let saved = source.snapshot();

        // The new clock starts 700 s later, past the 600 s cooldown.
        let later_clock = Arc::new(ManualClock::new(START + 700));
        let mut revived = CircuitBreaker::with_clock(config_of(2, 14_400, 600), later_clock);
        revived.restore(saved);
        revived.tick();

        assert!(
            !revived.is_tripped(),
            "expired cooldown must reset on tick"
        );
        assert_eq!(revived.recent_loss_count(), 0);
    }

    #[test]
    fn restore_then_tick_evicts_losses_outside_window() {
        let (mut source, _) = manual_breaker(5, 3_600, 600, START);
        lose(&mut source, 2);
        let saved = source.snapshot();

        // The new clock starts 4000 s later, past the 3600 s window.
        let later_clock = Arc::new(ManualClock::new(START + 4_000));
        let mut revived = CircuitBreaker::with_clock(config_of(5, 3_600, 600), later_clock);
        revived.restore(saved);
        revived.tick();

        assert_eq!(
            revived.recent_loss_count(),
            0,
            "stale losses must be evicted"
        );
    }
}