Skip to main content

rs_zero/resil/
window.rs

1use std::time::{Duration, Instant};
2
/// Sizing parameters for a rolling window.
///
/// A window retains `buckets` slots, each covering `bucket_duration` of time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WindowConfig {
    /// How many buckets the rolling window retains.
    pub buckets: usize,
    /// Span of time covered by a single bucket.
    pub bucket_duration: Duration,
}
11
12impl Default for WindowConfig {
13    fn default() -> Self {
14        Self {
15            buckets: 10,
16            bucket_duration: Duration::from_secs(1),
17        }
18    }
19}
20
/// Result category that can be recorded into a rolling window.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowOutcome {
    /// The protected operation ran and completed successfully.
    Success,
    /// The protected operation reached the backend but failed there.
    Failure,
    /// The protection layer turned the operation away before the backend ran it.
    Drop,
}
31
/// Aggregated view over the live buckets in a rolling window.
///
/// All fields are plain totals; the default value is the all-zero snapshot.
/// `Eq` is derived alongside `PartialEq` because every field has total equality, matching the
/// other public types in this module.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct WindowSnapshot {
    /// Successful operation count.
    pub successes: u64,
    /// Failed operation count.
    pub failures: u64,
    /// Dropped operation count.
    pub drops: u64,
    /// Sum of all recorded latency samples.
    ///
    /// A latency sample is accumulated for whichever outcome it was recorded with, not only for
    /// successes.
    pub latency_sum: Duration,
    /// Number of samples contributing to [`Self::latency_sum`].
    pub latency_samples: u64,
}
46
47impl WindowSnapshot {
48    /// Total number of recorded outcomes.
49    pub fn total(&self) -> u64 {
50        self.successes + self.failures + self.drops
51    }
52
53    /// Failure ratio in the range `[0.0, 1.0]`.
54    pub fn failure_ratio(&self) -> f64 {
55        ratio(self.failures, self.successes + self.failures)
56    }
57
58    /// Drop ratio in the range `[0.0, 1.0]`.
59    pub fn drop_ratio(&self) -> f64 {
60        ratio(self.drops, self.total())
61    }
62
63    /// Average latency for samples recorded with latency.
64    pub fn average_latency(&self) -> Option<Duration> {
65        if self.latency_samples == 0 {
66            return None;
67        }
68
69        Some(Duration::from_secs_f64(
70            self.latency_sum.as_secs_f64() / self.latency_samples as f64,
71        ))
72    }
73}
74
75/// Fixed-bucket sliding window for resilience decisions.
76#[derive(Debug, Clone)]
77pub struct RollingWindow {
78    anchor: Instant,
79    config: WindowConfig,
80    buckets: Vec<WindowBucket>,
81}
82
83impl RollingWindow {
84    /// Creates a rolling window from configuration.
85    pub fn new(config: WindowConfig) -> Self {
86        let config = WindowConfig {
87            buckets: config.buckets.max(1),
88            bucket_duration: if config.bucket_duration.is_zero() {
89                Duration::from_millis(1)
90            } else {
91                config.bucket_duration
92            },
93        };
94        Self {
95            anchor: Instant::now(),
96            config,
97            buckets: vec![WindowBucket::default(); config.buckets],
98        }
99    }
100
101    /// Records an outcome at the current instant.
102    pub fn record(&mut self, outcome: WindowOutcome) {
103        self.record_at(outcome, Instant::now());
104    }
105
106    /// Records an outcome with a latency sample at the current instant.
107    pub fn record_with_latency(&mut self, outcome: WindowOutcome, latency: Duration) {
108        self.record_at_with_latency(outcome, latency, Instant::now());
109    }
110
111    /// Returns an aggregate over all live buckets.
112    pub fn snapshot(&self) -> WindowSnapshot {
113        self.snapshot_at(Instant::now())
114    }
115
116    /// Returns the maximum successful operations observed in one live bucket.
117    pub fn max_successes_per_bucket(&self) -> u64 {
118        self.max_successes_per_bucket_at(Instant::now())
119    }
120
121    /// Returns the minimum average latency observed in one live bucket.
122    pub fn min_average_latency(&self) -> Option<Duration> {
123        self.min_average_latency_at(Instant::now())
124    }
125
126    pub(crate) fn record_at(&mut self, outcome: WindowOutcome, now: Instant) {
127        self.record_at_inner(outcome, None, now);
128    }
129
130    pub(crate) fn record_at_with_latency(
131        &mut self,
132        outcome: WindowOutcome,
133        latency: Duration,
134        now: Instant,
135    ) {
136        self.record_at_inner(outcome, Some(latency), now);
137    }
138
139    pub(crate) fn snapshot_at(&self, now: Instant) -> WindowSnapshot {
140        let current_generation = self.generation(now);
141        self.buckets
142            .iter()
143            .filter(|bucket| bucket.is_live(current_generation, self.config.buckets as u64))
144            .fold(WindowSnapshot::default(), |mut snapshot, bucket| {
145                snapshot.successes += bucket.successes;
146                snapshot.failures += bucket.failures;
147                snapshot.drops += bucket.drops;
148                snapshot.latency_sum += bucket.latency_sum;
149                snapshot.latency_samples += bucket.latency_samples;
150                snapshot
151            })
152    }
153
154    pub(crate) fn max_successes_per_bucket_at(&self, now: Instant) -> u64 {
155        let current_generation = self.generation(now);
156        self.buckets
157            .iter()
158            .filter(|bucket| bucket.is_live(current_generation, self.config.buckets as u64))
159            .map(|bucket| bucket.successes)
160            .max()
161            .unwrap_or_default()
162    }
163
164    pub(crate) fn min_average_latency_at(&self, now: Instant) -> Option<Duration> {
165        let current_generation = self.generation(now);
166        self.buckets
167            .iter()
168            .filter(|bucket| bucket.is_live(current_generation, self.config.buckets as u64))
169            .filter_map(WindowBucket::average_latency)
170            .min()
171    }
172
173    fn record_at_inner(&mut self, outcome: WindowOutcome, latency: Option<Duration>, now: Instant) {
174        let generation = self.generation(now);
175        let index = generation as usize % self.config.buckets;
176        let bucket = &mut self.buckets[index];
177        if bucket.generation != Some(generation) {
178            *bucket = WindowBucket {
179                generation: Some(generation),
180                ..WindowBucket::default()
181            };
182        }
183
184        bucket.record(outcome);
185        if let Some(latency) = latency {
186            bucket.latency_sum += latency;
187            bucket.latency_samples += 1;
188        }
189    }
190
191    fn generation(&self, now: Instant) -> u64 {
192        let elapsed = now.saturating_duration_since(self.anchor);
193        let width = self.config.bucket_duration.as_nanos().max(1);
194        (elapsed.as_nanos() / width) as u64
195    }
196}
197
/// Counters for one slot of the rolling window.
#[derive(Debug, Clone, Default)]
struct WindowBucket {
    /// Generation whose samples this slot holds; `None` until the slot is first written.
    generation: Option<u64>,
    /// Successes counted in this slot.
    successes: u64,
    /// Failures counted in this slot.
    failures: u64,
    /// Drops counted in this slot.
    drops: u64,
    /// Sum of the latency samples stored in this slot.
    latency_sum: Duration,
    /// How many latency samples make up `latency_sum`.
    latency_samples: u64,
}
207
208impl WindowBucket {
209    fn record(&mut self, outcome: WindowOutcome) {
210        match outcome {
211            WindowOutcome::Success => self.successes += 1,
212            WindowOutcome::Failure => self.failures += 1,
213            WindowOutcome::Drop => self.drops += 1,
214        }
215    }
216
217    fn is_live(&self, current_generation: u64, bucket_count: u64) -> bool {
218        self.generation
219            .is_some_and(|generation| current_generation.saturating_sub(generation) < bucket_count)
220    }
221
222    fn average_latency(&self) -> Option<Duration> {
223        if self.latency_samples == 0 {
224            return None;
225        }
226
227        Some(Duration::from_secs_f64(
228            self.latency_sum.as_secs_f64() / self.latency_samples as f64,
229        ))
230    }
231}
232
/// Divides `part` by `total` as floating point, yielding `0.0` when `total` is zero.
fn ratio(part: u64, total: u64) -> f64 {
    match total {
        0 => 0.0,
        denominator => part as f64 / denominator as f64,
    }
}
240
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use super::{RollingWindow, WindowConfig, WindowOutcome};

    /// Three outcomes land in three consecutive generations of a two-bucket window, so the
    /// oldest one (the success) has been evicted by the time the snapshot is taken.
    #[test]
    fn rolling_window_aggregates_live_buckets() {
        let step = Duration::from_millis(10);
        let mut window = RollingWindow::new(WindowConfig {
            buckets: 2,
            bucket_duration: step,
        });
        let start = Instant::now();
        let second = start + Duration::from_millis(10);
        let third = start + Duration::from_millis(20);

        window.record_at(WindowOutcome::Success, start);
        window.record_at(WindowOutcome::Failure, second);
        window.record_at(WindowOutcome::Drop, third);

        let snapshot = window.snapshot_at(third);
        assert_eq!(snapshot.successes, 0);
        assert_eq!(snapshot.failures, 1);
        assert_eq!(snapshot.drops, 1);
        assert_eq!(snapshot.total(), 2);
    }

    /// Two samples recorded at the same instant average to their midpoint.
    #[test]
    fn rolling_window_tracks_average_latency() {
        let mut window = RollingWindow::new(WindowConfig::default());
        let start = Instant::now();

        window.record_at_with_latency(WindowOutcome::Success, Duration::from_millis(10), start);
        window.record_at_with_latency(WindowOutcome::Success, Duration::from_millis(30), start);

        let average = window.snapshot_at(start).average_latency();
        assert_eq!(average, Some(Duration::from_millis(20)));
    }

    /// The first bucket holds two successes; the second holds the single fastest sample.
    #[test]
    fn rolling_window_reports_max_pass_and_min_latency() {
        let mut window = RollingWindow::new(WindowConfig {
            buckets: 2,
            bucket_duration: Duration::from_millis(10),
        });
        let start = Instant::now();
        let next_bucket = start + Duration::from_millis(10);

        window.record_at_with_latency(WindowOutcome::Success, Duration::from_millis(40), start);
        window.record_at_with_latency(WindowOutcome::Success, Duration::from_millis(20), start);
        window.record_at_with_latency(
            WindowOutcome::Success,
            Duration::from_millis(5),
            next_bucket,
        );

        assert_eq!(window.max_successes_per_bucket_at(start), 2);
        assert_eq!(
            window.min_average_latency_at(next_bucket),
            Some(Duration::from_millis(5))
        );
    }
}