circuitbreaker_rs/
metrics.rs

//! Failure tracking and metrics for the circuit breaker.
2
3use parking_lot::Mutex;
4use smallvec::SmallVec;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::time::{Duration, Instant};
7
/// Receiver for events emitted by a circuit breaker.
///
/// Implementations must be `Send + Sync + 'static`, so a single sink can be
/// shared between threads. All methods take `&self`; a sink that accumulates
/// state needs interior mutability.
pub trait MetricSink: Send + Sync + 'static {
    /// Records a state transition, identified by the labels of the state
    /// being left (`from`) and the state being entered (`to`).
    fn record_state_transition(&self, from: &str, to: &str);

    /// Records the current error rate.
    fn record_error_rate(&self, rate: f64);

    /// Records the outcome of a probe attempt (`true` when it succeeded).
    fn record_probe_attempt(&self, success: bool);

    /// Records the outcome of a call (`true` when it succeeded) and how long
    /// it took.
    fn record_call(&self, success: bool, duration: Duration);
}

/// A [`MetricSink`] that discards every event.
///
/// Being a zero-sized unit type, it is cheap to copy and can be obtained
/// either as `NullMetricSink` or via `Default::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NullMetricSink;

impl MetricSink for NullMetricSink {
    fn record_state_transition(&self, _from: &str, _to: &str) {}

    fn record_error_rate(&self, _rate: f64) {}

    fn record_probe_attempt(&self, _success: bool) {}

    fn record_call(&self, _success: bool, _duration: Duration) {}
}
32
33/// Statistics for the circuit breaker.
34#[derive(Debug)]
35pub struct BreakerStats {
36    success_count: AtomicU64,
37    failure_count: AtomicU64,
38    consecutive_failures: AtomicU64,
39    consecutive_successes: AtomicU64,
40    last_failure_time: Mutex<Option<Instant>>,
41    last_success_time: Mutex<Option<Instant>>,
42    total_calls: AtomicU64,
43}
44
45impl Default for BreakerStats {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51impl BreakerStats {
52    /// Creates a new empty stats tracker.
53    pub fn new() -> Self {
54        Self {
55            success_count: AtomicU64::new(0),
56            failure_count: AtomicU64::new(0),
57            consecutive_failures: AtomicU64::new(0),
58            consecutive_successes: AtomicU64::new(0),
59            last_failure_time: Mutex::new(None),
60            last_success_time: Mutex::new(None),
61            total_calls: AtomicU64::new(0),
62        }
63    }
64
65    /// Gets the current success count.
66    pub fn get_success_count(&self) -> u64 {
67        self.success_count
68            .load(std::sync::atomic::Ordering::Relaxed)
69    }
70
71    /// Gets the current failure count.
72    pub fn get_failure_count(&self) -> u64 {
73        self.failure_count
74            .load(std::sync::atomic::Ordering::Relaxed)
75    }
76
77    /// Gets the total call count.
78    pub fn get_total_calls(&self) -> u64 {
79        self.total_calls.load(std::sync::atomic::Ordering::Relaxed)
80    }
81
82    /// Gets the last failure time.
83    pub fn get_last_failure_time(&self) -> Option<Instant> {
84        *self.last_failure_time.lock()
85    }
86
87    /// Records a successful call.
88    pub fn record_success(&self) {
89        self.success_count.fetch_add(1, Ordering::Relaxed);
90        self.consecutive_successes.fetch_add(1, Ordering::Relaxed);
91        self.consecutive_failures.store(0, Ordering::Relaxed);
92        self.total_calls.fetch_add(1, Ordering::Relaxed);
93        *self.last_success_time.lock() = Some(Instant::now());
94    }
95
96    /// Records a failed call.
97    pub fn record_failure(&self) {
98        self.failure_count.fetch_add(1, Ordering::Relaxed);
99        self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
100        self.consecutive_successes.store(0, Ordering::Relaxed);
101        self.total_calls.fetch_add(1, Ordering::Relaxed);
102        *self.last_failure_time.lock() = Some(Instant::now());
103    }
104
105    /// Gets the current error rate.
106    pub fn error_rate(&self) -> f64 {
107        let failures = self.failure_count.load(Ordering::Relaxed);
108        let total = self.total_calls.load(Ordering::Relaxed);
109
110        if total == 0 {
111            return 0.0;
112        }
113
114        failures as f64 / total as f64
115    }
116
117    /// Gets the number of consecutive failures.
118    pub fn consecutive_failures(&self) -> u64 {
119        self.consecutive_failures.load(Ordering::Relaxed)
120    }
121
122    /// Gets the number of consecutive successes.
123    pub fn consecutive_successes(&self) -> u64 {
124        self.consecutive_successes.load(Ordering::Relaxed)
125    }
126
127    /// Resets all statistics.
128    pub fn reset(&self) {
129        self.success_count.store(0, Ordering::Relaxed);
130        self.failure_count.store(0, Ordering::Relaxed);
131        self.consecutive_failures.store(0, Ordering::Relaxed);
132        self.consecutive_successes.store(0, Ordering::Relaxed);
133        self.total_calls.store(0, Ordering::Relaxed);
134        *self.last_failure_time.lock() = None;
135        *self.last_success_time.lock() = None;
136    }
137}
138
139/// A time window for tracking failures with fixed buckets.
140pub struct FixedWindow {
141    buckets: Mutex<SmallVec<[(Instant, u64, u64); 16]>>, // (timestamp, successes, failures)
142    window_size: Duration,
143    bucket_size: Duration,
144}
145
146impl FixedWindow {
147    /// Creates a new fixed window tracker.
148    pub fn new(window_size: Duration, bucket_count: usize) -> Self {
149        let bucket_size = window_size / bucket_count as u32;
150        Self {
151            buckets: Mutex::new(SmallVec::new()),
152            window_size,
153            bucket_size,
154        }
155    }
156
157    /// Records a successful call.
158    pub fn record_success(&self) {
159        let mut buckets = self.buckets.lock();
160        self.clean_old_buckets(&mut buckets);
161
162        let now = Instant::now();
163        if let Some(bucket) = buckets.last_mut() {
164            if now.duration_since(bucket.0) < self.bucket_size {
165                bucket.1 += 1;
166                return;
167            }
168        }
169
170        buckets.push((now, 1, 0));
171    }
172
173    /// Records a failed call.
174    pub fn record_failure(&self) {
175        let mut buckets = self.buckets.lock();
176        self.clean_old_buckets(&mut buckets);
177
178        let now = Instant::now();
179        if let Some(bucket) = buckets.last_mut() {
180            if now.duration_since(bucket.0) < self.bucket_size {
181                bucket.2 += 1;
182                return;
183            }
184        }
185
186        buckets.push((now, 0, 1));
187    }
188
189    /// Gets the current error rate in the window.
190    pub fn error_rate(&self) -> f64 {
191        let mut buckets = self.buckets.lock();
192        self.clean_old_buckets(&mut buckets);
193
194        let mut total_success = 0;
195        let mut total_failure = 0;
196
197        for (_, successes, failures) in buckets.iter() {
198            total_success += successes;
199            total_failure += failures;
200        }
201
202        let total = total_success + total_failure;
203        if total == 0 {
204            return 0.0;
205        }
206
207        total_failure as f64 / total as f64
208    }
209
210    fn clean_old_buckets(&self, buckets: &mut SmallVec<[(Instant, u64, u64); 16]>) {
211        let now = Instant::now();
212        let cutoff = now - self.window_size;
213
214        while let Some(bucket) = buckets.first() {
215            if bucket.0 < cutoff {
216                buckets.remove(0);
217            } else {
218                break;
219            }
220        }
221    }
222}
223
/// Error-rate tracker based on an exponential moving average (EMA).
///
/// Every counted call folds its outcome into the average:
/// a failure yields `rate * (1 - alpha) + alpha`, a success yields
/// `rate * (1 - alpha)`. Calls made before the call counter reaches
/// `calls_required` only advance the counter and do not touch the average.
pub struct EMAWindow {
    /// Current average, stored as the bit pattern of an `f64` so it fits in
    /// an atomic. Starts at `0.0`.
    error_rate: AtomicU64,
    /// Weight given to the newest outcome; meaningful values lie in `0.0..=1.0`.
    alpha: f64,
    /// Number of calls that must be seen before the average is updated or
    /// reported.
    calls_required: u64,
    /// Number of calls recorded so far.
    call_count: AtomicU64,
}

impl EMAWindow {
    /// Creates a new EMA window tracker.
    ///
    /// `alpha` is the weight of the newest outcome and is stored as given,
    /// without validation. `calls_required` is the number of calls that must
    /// be recorded before the average starts being updated and reported.
    pub fn new(alpha: f64, calls_required: u64) -> Self {
        Self {
            error_rate: AtomicU64::new(0),
            alpha,
            calls_required,
            call_count: AtomicU64::new(0),
        }
    }

    /// Records a successful call.
    pub fn record_success(&self) {
        self.record(false);
    }

    /// Records a failed call.
    pub fn record_failure(&self) {
        self.record(true);
    }

    /// Returns the current EMA error rate.
    ///
    /// Returns `0.0` until at least `calls_required` calls have been
    /// recorded.
    pub fn error_rate(&self) -> f64 {
        if self.call_count.load(Ordering::Relaxed) < self.calls_required {
            return 0.0;
        }

        f64::from_bits(self.error_rate.load(Ordering::Relaxed))
    }

    /// Counts one call and, once warmed up, folds its outcome into the
    /// average.
    fn record(&self, failed: bool) {
        // Use the value returned by `fetch_add` rather than re-loading the
        // counter, so the warm-up decision is based on this call's own
        // position and not on increments made by other threads in between.
        let seen = self
            .call_count
            .fetch_add(1, Ordering::Relaxed)
            .wrapping_add(1);
        if seen < self.calls_required {
            return;
        }

        // A separate load and store would let two concurrent updates
        // overwrite each other; `fetch_update` retries until this update is
        // applied on top of the latest value. The closure always returns
        // `Some`, so the result is always `Ok` and can be ignored.
        let _ = self
            .error_rate
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                let decayed = f64::from_bits(bits) * (1.0 - self.alpha);
                let next = if failed { decayed + self.alpha } else { decayed };
                Some(next.to_bits())
            });
    }
}
277}