1use std::time::{Duration, Instant};
2
3#[derive(Debug, Clone, Copy, PartialEq, Eq)]
5pub struct WindowConfig {
6 pub buckets: usize,
8 pub bucket_duration: Duration,
10}
11
12impl Default for WindowConfig {
13 fn default() -> Self {
14 Self {
15 buckets: 10,
16 bucket_duration: Duration::from_secs(1),
17 }
18 }
19}
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub enum WindowOutcome {
24 Success,
26 Failure,
28 Drop,
30}
31
32#[derive(Debug, Clone, Copy, Default, PartialEq)]
34pub struct WindowSnapshot {
35 pub successes: u64,
37 pub failures: u64,
39 pub drops: u64,
41 pub latency_sum: Duration,
43 pub latency_samples: u64,
45}
46
47impl WindowSnapshot {
48 pub fn total(&self) -> u64 {
50 self.successes + self.failures + self.drops
51 }
52
53 pub fn failure_ratio(&self) -> f64 {
55 ratio(self.failures, self.successes + self.failures)
56 }
57
58 pub fn drop_ratio(&self) -> f64 {
60 ratio(self.drops, self.total())
61 }
62
63 pub fn average_latency(&self) -> Option<Duration> {
65 if self.latency_samples == 0 {
66 return None;
67 }
68
69 Some(Duration::from_secs_f64(
70 self.latency_sum.as_secs_f64() / self.latency_samples as f64,
71 ))
72 }
73}
74
75#[derive(Debug, Clone)]
77pub struct RollingWindow {
78 anchor: Instant,
79 config: WindowConfig,
80 buckets: Vec<WindowBucket>,
81}
82
83impl RollingWindow {
84 pub fn new(config: WindowConfig) -> Self {
86 let config = WindowConfig {
87 buckets: config.buckets.max(1),
88 bucket_duration: if config.bucket_duration.is_zero() {
89 Duration::from_millis(1)
90 } else {
91 config.bucket_duration
92 },
93 };
94 Self {
95 anchor: Instant::now(),
96 config,
97 buckets: vec![WindowBucket::default(); config.buckets],
98 }
99 }
100
101 pub fn record(&mut self, outcome: WindowOutcome) {
103 self.record_at(outcome, Instant::now());
104 }
105
106 pub fn record_with_latency(&mut self, outcome: WindowOutcome, latency: Duration) {
108 self.record_at_with_latency(outcome, latency, Instant::now());
109 }
110
111 pub fn snapshot(&self) -> WindowSnapshot {
113 self.snapshot_at(Instant::now())
114 }
115
116 pub fn max_successes_per_bucket(&self) -> u64 {
118 self.max_successes_per_bucket_at(Instant::now())
119 }
120
121 pub fn min_average_latency(&self) -> Option<Duration> {
123 self.min_average_latency_at(Instant::now())
124 }
125
126 pub(crate) fn record_at(&mut self, outcome: WindowOutcome, now: Instant) {
127 self.record_at_inner(outcome, None, now);
128 }
129
130 pub(crate) fn record_at_with_latency(
131 &mut self,
132 outcome: WindowOutcome,
133 latency: Duration,
134 now: Instant,
135 ) {
136 self.record_at_inner(outcome, Some(latency), now);
137 }
138
139 pub(crate) fn snapshot_at(&self, now: Instant) -> WindowSnapshot {
140 let current_generation = self.generation(now);
141 self.buckets
142 .iter()
143 .filter(|bucket| bucket.is_live(current_generation, self.config.buckets as u64))
144 .fold(WindowSnapshot::default(), |mut snapshot, bucket| {
145 snapshot.successes += bucket.successes;
146 snapshot.failures += bucket.failures;
147 snapshot.drops += bucket.drops;
148 snapshot.latency_sum += bucket.latency_sum;
149 snapshot.latency_samples += bucket.latency_samples;
150 snapshot
151 })
152 }
153
154 pub(crate) fn max_successes_per_bucket_at(&self, now: Instant) -> u64 {
155 let current_generation = self.generation(now);
156 self.buckets
157 .iter()
158 .filter(|bucket| bucket.is_live(current_generation, self.config.buckets as u64))
159 .map(|bucket| bucket.successes)
160 .max()
161 .unwrap_or_default()
162 }
163
164 pub(crate) fn min_average_latency_at(&self, now: Instant) -> Option<Duration> {
165 let current_generation = self.generation(now);
166 self.buckets
167 .iter()
168 .filter(|bucket| bucket.is_live(current_generation, self.config.buckets as u64))
169 .filter_map(WindowBucket::average_latency)
170 .min()
171 }
172
173 fn record_at_inner(&mut self, outcome: WindowOutcome, latency: Option<Duration>, now: Instant) {
174 let generation = self.generation(now);
175 let index = generation as usize % self.config.buckets;
176 let bucket = &mut self.buckets[index];
177 if bucket.generation != Some(generation) {
178 *bucket = WindowBucket {
179 generation: Some(generation),
180 ..WindowBucket::default()
181 };
182 }
183
184 bucket.record(outcome);
185 if let Some(latency) = latency {
186 bucket.latency_sum += latency;
187 bucket.latency_samples += 1;
188 }
189 }
190
191 fn generation(&self, now: Instant) -> u64 {
192 let elapsed = now.saturating_duration_since(self.anchor);
193 let width = self.config.bucket_duration.as_nanos().max(1);
194 (elapsed.as_nanos() / width) as u64
195 }
196}
197
198#[derive(Debug, Clone, Default)]
199struct WindowBucket {
200 generation: Option<u64>,
201 successes: u64,
202 failures: u64,
203 drops: u64,
204 latency_sum: Duration,
205 latency_samples: u64,
206}
207
208impl WindowBucket {
209 fn record(&mut self, outcome: WindowOutcome) {
210 match outcome {
211 WindowOutcome::Success => self.successes += 1,
212 WindowOutcome::Failure => self.failures += 1,
213 WindowOutcome::Drop => self.drops += 1,
214 }
215 }
216
217 fn is_live(&self, current_generation: u64, bucket_count: u64) -> bool {
218 self.generation
219 .is_some_and(|generation| current_generation.saturating_sub(generation) < bucket_count)
220 }
221
222 fn average_latency(&self) -> Option<Duration> {
223 if self.latency_samples == 0 {
224 return None;
225 }
226
227 Some(Duration::from_secs_f64(
228 self.latency_sum.as_secs_f64() / self.latency_samples as f64,
229 ))
230 }
231}
232
233fn ratio(part: u64, total: u64) -> f64 {
234 if total == 0 {
235 0.0
236 } else {
237 part as f64 / total as f64
238 }
239}
240
241#[cfg(test)]
242mod tests {
243 use std::time::{Duration, Instant};
244
245 use super::{RollingWindow, WindowConfig, WindowOutcome};
246
247 #[test]
248 fn rolling_window_aggregates_live_buckets() {
249 let mut window = RollingWindow::new(WindowConfig {
250 buckets: 2,
251 bucket_duration: Duration::from_millis(10),
252 });
253 let now = Instant::now();
254
255 window.record_at(WindowOutcome::Success, now);
256 window.record_at(WindowOutcome::Failure, now + Duration::from_millis(10));
257 window.record_at(WindowOutcome::Drop, now + Duration::from_millis(20));
258
259 let snapshot = window.snapshot_at(now + Duration::from_millis(20));
260 assert_eq!(snapshot.successes, 0);
261 assert_eq!(snapshot.failures, 1);
262 assert_eq!(snapshot.drops, 1);
263 assert_eq!(snapshot.total(), 2);
264 }
265
266 #[test]
267 fn rolling_window_tracks_average_latency() {
268 let mut window = RollingWindow::new(WindowConfig::default());
269 let now = Instant::now();
270
271 window.record_at_with_latency(WindowOutcome::Success, Duration::from_millis(10), now);
272 window.record_at_with_latency(WindowOutcome::Success, Duration::from_millis(30), now);
273
274 assert_eq!(
275 window.snapshot_at(now).average_latency(),
276 Some(Duration::from_millis(20))
277 );
278 }
279
280 #[test]
281 fn rolling_window_reports_max_pass_and_min_latency() {
282 let mut window = RollingWindow::new(WindowConfig {
283 buckets: 2,
284 bucket_duration: Duration::from_millis(10),
285 });
286 let now = Instant::now();
287
288 window.record_at_with_latency(WindowOutcome::Success, Duration::from_millis(40), now);
289 window.record_at_with_latency(WindowOutcome::Success, Duration::from_millis(20), now);
290 window.record_at_with_latency(
291 WindowOutcome::Success,
292 Duration::from_millis(5),
293 now + Duration::from_millis(10),
294 );
295
296 assert_eq!(window.max_successes_per_bucket_at(now), 2);
297 assert_eq!(
298 window.min_average_latency_at(now + Duration::from_millis(10)),
299 Some(Duration::from_millis(5))
300 );
301 }
302}