// circuitbreaker_rs/metrics.rs
use parking_lot::Mutex;
use smallvec::SmallVec;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
7
/// Receiver for the metrics events declared below.
///
/// Implementors must be `Send + Sync + 'static`, so a single sink value can be
/// shared across threads (for example behind an `Arc<dyn MetricSink>`).
pub trait MetricSink: Send + Sync + 'static {
    /// Records a state transition, identified by the names of the state being
    /// left (`from`) and the state being entered (`to`).
    fn record_state_transition(&self, from: &str, to: &str);

    /// Records an error-rate sample.
    ///
    /// NOTE(review): presumably a fraction in `0.0..=1.0`, matching the
    /// `error_rate()` methods in this file — confirm against callers.
    fn record_error_rate(&self, rate: f64);

    /// Records the outcome of a probe attempt; `success` is `true` when the
    /// probe succeeded.
    fn record_probe_attempt(&self, success: bool);

    /// Records the outcome of a call together with how long it took.
    fn record_call(&self, success: bool, duration: Duration);
}
22
23pub struct NullMetricSink;
25
26impl MetricSink for NullMetricSink {
27 fn record_state_transition(&self, _from: &str, _to: &str) {}
28 fn record_error_rate(&self, _rate: f64) {}
29 fn record_probe_attempt(&self, _success: bool) {}
30 fn record_call(&self, _success: bool, _duration: Duration) {}
31}
32
33#[derive(Debug)]
35pub struct BreakerStats {
36 success_count: AtomicU64,
37 failure_count: AtomicU64,
38 consecutive_failures: AtomicU64,
39 consecutive_successes: AtomicU64,
40 last_failure_time: Mutex<Option<Instant>>,
41 last_success_time: Mutex<Option<Instant>>,
42 total_calls: AtomicU64,
43}
44
45impl Default for BreakerStats {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51impl BreakerStats {
52 pub fn new() -> Self {
54 Self {
55 success_count: AtomicU64::new(0),
56 failure_count: AtomicU64::new(0),
57 consecutive_failures: AtomicU64::new(0),
58 consecutive_successes: AtomicU64::new(0),
59 last_failure_time: Mutex::new(None),
60 last_success_time: Mutex::new(None),
61 total_calls: AtomicU64::new(0),
62 }
63 }
64
65 pub fn get_success_count(&self) -> u64 {
67 self.success_count
68 .load(std::sync::atomic::Ordering::Relaxed)
69 }
70
71 pub fn get_failure_count(&self) -> u64 {
73 self.failure_count
74 .load(std::sync::atomic::Ordering::Relaxed)
75 }
76
77 pub fn get_total_calls(&self) -> u64 {
79 self.total_calls.load(std::sync::atomic::Ordering::Relaxed)
80 }
81
82 pub fn get_last_failure_time(&self) -> Option<Instant> {
84 *self.last_failure_time.lock()
85 }
86
87 pub fn record_success(&self) {
89 self.success_count.fetch_add(1, Ordering::Relaxed);
90 self.consecutive_successes.fetch_add(1, Ordering::Relaxed);
91 self.consecutive_failures.store(0, Ordering::Relaxed);
92 self.total_calls.fetch_add(1, Ordering::Relaxed);
93 *self.last_success_time.lock() = Some(Instant::now());
94 }
95
96 pub fn record_failure(&self) {
98 self.failure_count.fetch_add(1, Ordering::Relaxed);
99 self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
100 self.consecutive_successes.store(0, Ordering::Relaxed);
101 self.total_calls.fetch_add(1, Ordering::Relaxed);
102 *self.last_failure_time.lock() = Some(Instant::now());
103 }
104
105 pub fn error_rate(&self) -> f64 {
107 let failures = self.failure_count.load(Ordering::Relaxed);
108 let total = self.total_calls.load(Ordering::Relaxed);
109
110 if total == 0 {
111 return 0.0;
112 }
113
114 failures as f64 / total as f64
115 }
116
117 pub fn consecutive_failures(&self) -> u64 {
119 self.consecutive_failures.load(Ordering::Relaxed)
120 }
121
122 pub fn consecutive_successes(&self) -> u64 {
124 self.consecutive_successes.load(Ordering::Relaxed)
125 }
126
127 pub fn reset(&self) {
129 self.success_count.store(0, Ordering::Relaxed);
130 self.failure_count.store(0, Ordering::Relaxed);
131 self.consecutive_failures.store(0, Ordering::Relaxed);
132 self.consecutive_successes.store(0, Ordering::Relaxed);
133 self.total_calls.store(0, Ordering::Relaxed);
134 *self.last_failure_time.lock() = None;
135 *self.last_success_time.lock() = None;
136 }
137}
138
139pub struct FixedWindow {
141 buckets: Mutex<SmallVec<[(Instant, u64, u64); 16]>>, window_size: Duration,
143 bucket_size: Duration,
144}
145
146impl FixedWindow {
147 pub fn new(window_size: Duration, bucket_count: usize) -> Self {
149 let bucket_size = window_size / bucket_count as u32;
150 Self {
151 buckets: Mutex::new(SmallVec::new()),
152 window_size,
153 bucket_size,
154 }
155 }
156
157 pub fn record_success(&self) {
159 let mut buckets = self.buckets.lock();
160 self.clean_old_buckets(&mut buckets);
161
162 let now = Instant::now();
163 if let Some(bucket) = buckets.last_mut() {
164 if now.duration_since(bucket.0) < self.bucket_size {
165 bucket.1 += 1;
166 return;
167 }
168 }
169
170 buckets.push((now, 1, 0));
171 }
172
173 pub fn record_failure(&self) {
175 let mut buckets = self.buckets.lock();
176 self.clean_old_buckets(&mut buckets);
177
178 let now = Instant::now();
179 if let Some(bucket) = buckets.last_mut() {
180 if now.duration_since(bucket.0) < self.bucket_size {
181 bucket.2 += 1;
182 return;
183 }
184 }
185
186 buckets.push((now, 0, 1));
187 }
188
189 pub fn error_rate(&self) -> f64 {
191 let mut buckets = self.buckets.lock();
192 self.clean_old_buckets(&mut buckets);
193
194 let mut total_success = 0;
195 let mut total_failure = 0;
196
197 for (_, successes, failures) in buckets.iter() {
198 total_success += successes;
199 total_failure += failures;
200 }
201
202 let total = total_success + total_failure;
203 if total == 0 {
204 return 0.0;
205 }
206
207 total_failure as f64 / total as f64
208 }
209
210 fn clean_old_buckets(&self, buckets: &mut SmallVec<[(Instant, u64, u64); 16]>) {
211 let now = Instant::now();
212 let cutoff = now - self.window_size;
213
214 while let Some(bucket) = buckets.first() {
215 if bucket.0 < cutoff {
216 buckets.remove(0);
217 } else {
218 break;
219 }
220 }
221 }
222}
223
/// Error-rate tracker based on an exponential moving average (EMA).
///
/// Calls are counted from the start, but outcomes only begin to move the
/// average once `calls_required` calls have been seen; until then
/// [`EMAWindow::error_rate`] reports `0.0`.
pub struct EMAWindow {
    /// Current average, stored as the bit pattern of an `f64` so it can live
    /// in an atomic.
    error_rate: AtomicU64,
    /// Weight given to each new outcome.
    alpha: f64,
    /// Number of calls that must be seen before the average is used.
    calls_required: u64,
    /// Number of calls recorded so far.
    call_count: AtomicU64,
}

impl EMAWindow {
    /// Creates a tracker with smoothing factor `alpha` and a warm-up of
    /// `calls_required` calls. The average starts at `0.0`.
    ///
    /// `alpha` is used as given; NOTE(review): values outside `0.0..=1.0`
    /// are not rejected here — confirm callers validate it.
    pub fn new(alpha: f64, calls_required: u64) -> Self {
        Self {
            error_rate: AtomicU64::new(0),
            alpha,
            calls_required,
            call_count: AtomicU64::new(0),
        }
    }

    /// Records a successful call: past the warm-up, the average decays by a
    /// factor of `1 - alpha`.
    pub fn record_success(&self) {
        self.record(false);
    }

    /// Records a failed call: past the warm-up, the average decays by a
    /// factor of `1 - alpha` and then gains `alpha`.
    pub fn record_failure(&self) {
        self.record(true);
    }

    /// Returns the current average, or `0.0` while fewer than
    /// `calls_required` calls have been recorded.
    pub fn error_rate(&self) -> f64 {
        if self.call_count.load(Ordering::Relaxed) < self.calls_required {
            return 0.0;
        }

        f64::from_bits(self.error_rate.load(Ordering::Relaxed))
    }

    /// Counts one call and, once the warm-up is over, folds its outcome into
    /// the average.
    fn record(&self, failed: bool) {
        // Use the value returned by `fetch_add` (this call's own position)
        // instead of re-loading the counter, which other threads may have
        // advanced in between.
        let seen = self
            .call_count
            .fetch_add(1, Ordering::Relaxed)
            .wrapping_add(1);
        if seen < self.calls_required {
            return;
        }

        // A separate load followed by a store would drop concurrent updates;
        // `fetch_update` retries until the new value is applied to the value
        // it was computed from. The closure always returns `Some`, so the
        // result is always `Ok` and can be ignored.
        let _ = self
            .error_rate
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                let decayed = f64::from_bits(bits) * (1.0 - self.alpha);
                let next = if failed { decayed + self.alpha } else { decayed };
                Some(next.to_bits())
            });
    }
}