high_roller/rolling_max.rs
1//! # Rolling Max
2//!
3//! A rolling accumulator that tracks the largest value
4//! in a fixed size window.
5//!
6//! - Push is amortized O(1).
7//! - Max is O(1).
8//! - There are no heap allocations.
9//!
10//! Like `std::collections::BinaryHeap`, [`RollingMax`] exposes
11//! a "maximum only" API. A rolling minimum can be found by using
12//! [`core::cmp::Reverse`] within a [`RollingMax`].
13//!
14//! ```
15//! use core::cmp::Reverse;
16//! use high_roller::rolling_max::RollingMax;
17//!
18//! type RollingMin<T, const WINDOW: usize> = RollingMax<Reverse<T>, WINDOW>;
19//! ```
20//!
21//! The example below shows how this might be used to publish
22//! telemetry for the highest latency event among the 100 most
23//! recent samples.
24//!
25//! ```
26//! # use high_roller::rolling_max::RollingMax;
27//! # use rand::Rng;
28//! #
29//! # let stream = (0..1000);
30//! let events = stream.map(|event| network_latency_us(&event));
31//!
32//! let mut window: RollingMax<u32, 100> = RollingMax::new();
33//! for latency in events {
34//! window.push(latency);
35//! window.max().copied().map(emit_network_telemetry);
36//! }
37//!
38//! # fn network_latency_us(_: &usize) -> u32 {
39//! # rand::rng().next_u32()
40//! # }
41//! # fn emit_network_telemetry(max_latency_us: u32) {
42//! # core::hint::black_box(max_latency_us);
43//! # }
44//! ```
45
46use arraydeque::ArrayDeque;
47
48/// # Rolling Max
49///
50/// Tracks the largest value in a fixed-size window.
51///
52/// # Design
53///
54/// The algorithm for this is well-known but not formalized
55/// anywhere I found easily accessible. The constraint of
56/// accumulating values internally is also a slight divergence
57/// from how this problem is typically presented.
58#[derive(Debug, Default)]
59pub struct RollingMax<T, const WINDOW: usize> {
60 deq: ArrayDeque<T, WINDOW>,
61 ct: usize,
62 // PERF: Expiration could wrap to the window size. In that
63 // case rarely if ever will an expiration exceed u16::MAX.
64 // This incentivizes compressing expirations into a smaller
65 // type than usize. Worth investigating.
66 expires: ArrayDeque<usize, WINDOW>,
67}
68
69impl<T, const W: usize> RollingMax<T, W>
70where
71 T: PartialOrd,
72{
73 /// Constructs a new empty [`RollingMax`].
74 ///
75 /// This type is stored entirely on the stack, so be aware of
76 /// window size. Boxing might be a good idea. Doing so yourself
77 /// enables cache-friendlier patterns than if each [`RollingMax`]
78 /// were unconditionally allocated on the heap.
79 ///
80 /// ```
81 /// use core::cmp::Reverse;
82 /// use high_roller::rolling_max::RollingMax;
83 ///
84 /// const WINDOW: usize = 6000;
85 ///
86 /// #[derive(Default)]
87 /// struct MyTelemetry {
88 /// max_latency: RollingMax<u32, WINDOW>,
89 /// min_latency: RollingMax<Reverse<u32>, WINDOW>,
90 /// largest_batch: RollingMax<usize, WINDOW>
91 /// }
92 ///
93 /// // `MyTelemetry` is too big to live comfortably on the stack.
94 /// // But keeping everything in one allocation may yield friendlier
95 /// // cache access patterns.
96 /// const _: () = assert!(core::mem::size_of::<MyTelemetry>() == 240120);
97 /// let _telemetry = Box::new(MyTelemetry::default());
98 /// ```
99 #[must_use]
100 pub const fn new() -> Self {
101 Self {
102 deq: ArrayDeque::new(),
103 expires: ArrayDeque::new(),
104 ct: 0,
105 }
106 }
107
108 /// Adds an entry to the rolling window. If the window is full,
109 /// the oldest member is evicted.
110 ///
111 /// ```
112 /// use high_roller::rolling_max::RollingMax;
113 ///
114 /// let mut window: RollingMax<usize, 3> = RollingMax::new();
115 ///
116 /// window.push(usize::MAX);
117 /// window.push(1);
118 /// window.push(2);
119 ///
120 /// assert_eq!(window.max().copied(), Some(usize::MAX));
121 ///
122 /// // Evicts the first entry, which was previously the max.
123 /// window.push(0);
124 /// assert_eq!(window.max().copied(), Some(2));
125 /// ```
126 //
127 // Clippy allow:
128 //
129 // Expect is used in this function to guarantee invariants.
130 // See the note within the function.
131 //
132 // It should never panic in user code. So exposing or documenting
133 // the failure case makes the API unnecessarily leaky.
134 #[allow(clippy::expect_used)]
135 #[allow(clippy::missing_panics_doc)]
136 pub fn push(&mut self, entry: T) {
137 self.ct = self.ct.wrapping_add(1);
138
139 while self
140 .expires
141 .front()
142 .is_some_and(|&exp| self.ct.wrapping_sub(exp) <= W)
143 {
144 self.deq.pop_front();
145 self.expires.pop_front();
146 }
147
148 while self.deq.back().is_some_and(|tail| tail <= &entry) {
149 self.deq.pop_back();
150 self.expires.pop_back();
151 }
152
153 // The first loop pops any entry whose expiration equals
154 // or exceeds W. So every entry in the queue has a nonzero
155 // expiration less than W. The queue has capacity W. So the
156 // queue is guaranteed to have at least one spot available.
157 // The calls to `expect` below check this invariant.
158 self.deq
159 .push_back(entry)
160 .expect("expirations guarantee queue is never full at this point");
161 self.expires
162 .push_back(self.ct.wrapping_add(W))
163 .expect("expirations guarantee queue is never full at this point");
164 }
165
166 /// Returns the maximum entry in the window or [`None`] if
167 /// the window is empty.
168 ///
169 /// ```
170 /// use high_roller::rolling_max::RollingMax;
171 ///
172 /// let mut window: RollingMax<usize, 3> = RollingMax::new();
173 /// assert_eq!(window.max(), None);
174 ///
175 /// window.push(5);
176 /// // Once any entry has been pushed, max will never again
177 /// // return None.
178 /// assert_eq!(window.max().copied(), Some(5));
179 /// ```
180 #[must_use]
181 pub fn max(&self) -> Option<&T> {
182 self.deq.front()
183 }
184}
185
#[cfg(test)]
pub mod for_tests {
    use arraydeque::{ArrayDeque, Wrapping};

    /// Reference implementation of a rolling maximum: keep the raw
    /// window and scan all of it on every query.
    ///
    /// Handy as an oracle for correctness checks and as a baseline
    /// for performance comparisons. If this is good enough for your
    /// use case, this crate's `RollingMax` has nothing to offer you.
    #[derive(Default)]
    pub struct NaiveRollingMax<T, const W: usize> {
        /// The retained entries, stored in a `Wrapping` deque of capacity `W`.
        deq: ArrayDeque<T, W, Wrapping>,
    }

    impl<T: Ord, const W: usize> NaiveRollingMax<T, W> {
        /// Creates an empty window.
        #[must_use]
        pub const fn new() -> Self {
            let deq = ArrayDeque::new();
            Self { deq }
        }

        /// Appends `entry` to the window.
        ///
        /// NOTE(review): relies on the `Wrapping` behavior to discard
        /// the oldest entry once the deque is full; confirm against
        /// the `arraydeque` documentation.
        pub fn push(&mut self, entry: T) {
            self.deq.push_back(entry);
        }

        /// Scans the whole window and returns its largest entry, or
        /// `None` when nothing has been pushed.
        pub fn max(&self) -> Option<&T> {
            self.deq.iter().max_by(|lhs, rhs| lhs.cmp(rhs))
        }
    }
}
220
#[cfg(test)]
// Tests unwrap freely: a failed unwrap is simply a failed test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::decimal::D4;
    use crate::rolling_max::for_tests::NaiveRollingMax;

    use core::fmt::Debug;
    use rand::distr::Uniform;
    use rand::rngs::SmallRng;
    use rand::RngExt;
    use rand::SeedableRng;

    /// Differential smoke test.
    ///
    /// Drives a RollingMax and a NaiveRollingMax with the same seeded
    /// random stream and requires them to agree after every push.
    #[test]
    fn rng_with_naive() {
        const QLEN: usize = 1000;
        const STREAM_LEN: usize = 100_000;

        let rng = SmallRng::seed_from_u64(75);
        let distribution = Uniform::new(-65.535, 65.535).unwrap();

        let mut roller = RollingMax::<D4, QLEN>::new();
        let mut naive = NaiveRollingMax::<D4, QLEN>::new();

        for val in rng.sample_iter(distribution).take(STREAM_LEN) {
            let d4 = D4::cast(val);
            roller.push(d4);
            naive.push(d4);
            assert_eq!(roller.max(), naive.max());
        }
    }

    /// Zero-state guarantee: a freshly built window reports no maximum.
    #[test]
    fn max_on_empty_is_none() {
        let rm = RollingMax::<i32, 3>::new();
        assert_eq!(rm.max(), None);
    }

    /// One push is enough for max to yield Some, whatever the window size.
    #[test]
    fn single_push_yields_some() {
        let mut rm = RollingMax::<i32, 5>::new();
        rm.push(42);
        assert_eq!(rm.max(), Some(&42));
    }

    /// Window=1: each element is its own maximum; covers the path where
    /// the whole deque is evicted on every push.
    #[test]
    fn window_of_one() {
        let inputs = [3, 1, 4, 1, 5];
        let expected = [3, 1, 4, 1, 5];
        expect_max::<i32, 1>(inputs.into_iter().zip(expected));
    }

    /// Window wider than the whole input: nothing is ever evicted, so
    /// the reported max never decreases.
    #[test]
    fn window_larger_than_input() {
        let inputs = [2, 4, 1];
        let expected = [2, 4, 4];
        expect_max::<i32, 10>(inputs.into_iter().zip(expected));
    }

    /// Window exactly as long as the input: the global max is only
    /// settled once the final element has been pushed.
    #[test]
    fn window_equals_input_length() {
        let inputs = [1, 3, 2, 5, 4];
        let expected = [1, 3, 3, 5, 5];
        expect_max::<i32, 5>(inputs.into_iter().zip(expected));
    }

    /// Core sliding-window case; this exact sequence caught the off-by-one
    /// expiry bug where element `3` incorrectly survived into window [1,2,0].
    #[test]
    fn sliding_window_canonical() {
        let inputs = [1, 3, 1, 2, 0, 5];
        let expected = [1, 3, 3, 3, 2, 5];
        expect_max::<i32, 3>(inputs.into_iter().zip(expected));
    }

    /// Strictly increasing input: every new element displaces all of
    /// its predecessors, so the deque holds exactly one element.
    #[test]
    fn strictly_increasing() {
        let inputs = [1, 2, 3, 4, 5];
        let expected = [1, 2, 3, 4, 5];
        expect_max::<i32, 3>(inputs.into_iter().zip(expected));
    }

    /// Strictly decreasing input: the oldest value heads the deque until
    /// it expires, then hands over to the next oldest.
    #[test]
    fn strictly_decreasing() {
        let inputs = [5, 4, 3, 2, 1];
        let expected = [5, 5, 5, 4, 3];
        expect_max::<i32, 3>(inputs.into_iter().zip(expected));
    }

    /// All-equal input: equal elements are pruned from the back (`<=`),
    /// so the deque stays bounded instead of accumulating duplicates.
    #[test]
    fn all_equal() {
        let inputs = [7i32; 6];
        let expected = [7; 6];
        expect_max::<i32, 3>(inputs.into_iter().zip(expected));
    }

    /// Negative values: nothing may assume a particular sign or zero.
    #[test]
    fn negative_values() {
        let inputs = [-3, -1, -4, -1, -5];
        let expected = [-3, -1, -1, -1, -1];
        expect_max::<i32, 2>(inputs.into_iter().zip(expected));
    }

    /// Float input: covers the PartialOrd bound with a type that is not Ord.
    #[test]
    fn float_values() {
        let inputs = [1.0, 3.0, 2.0, 5.0, 4.0];
        let expected = [1.0, 3.0, 3.0, 5.0, 5.0];
        expect_max::<f32, 2>(inputs.into_iter().zip(expected));
    }

    /// The maximum has to survive exactly `cap` pushes and vanish on the
    /// next one; protects the expiry boundary against off-by-one errors.
    #[test]
    fn max_expires_at_exact_boundary() {
        let mut rm = RollingMax::<i32, 3>::new();

        for value in [99, 1, 1] {
            rm.push(value);
        }
        // 99 is still inside the window [99, 1, 1].
        assert_eq!(rm.max(), Some(&99));

        rm.push(1);
        // 99 has been evicted; the window is now [1, 1, 1].
        assert_eq!(rm.max(), Some(&1));
    }

    /// Covers wrap-around of the `usize` counter: `ct` is pre-seeded so
    /// the expiry values cross the `usize::MAX → 0` boundary, checking
    /// that the wrapping arithmetic evicts and retains the right elements.
    #[test]
    fn expiry_counter_wrapping() {
        let mut rm: RollingMax<i32, 3> = RollingMax {
            deq: ArrayDeque::new(),
            expires: ArrayDeque::new(),
            ct: usize::MAX - 3,
        };

        // ct = usize::MAX-2, expiry wraps to 0.
        rm.push(10);
        // ct = usize::MAX-1, expiry wraps to 1.
        rm.push(5);
        // ct = usize::MAX, expiry wraps to 2.
        rm.push(8);
        // Window is [10, 5, 8].
        assert_eq!(rm.max(), Some(&10));

        // ct wraps to 0, which equals the expiry of 10, so 10 is evicted.
        // Window is [5, 8, 6].
        rm.push(6);
        assert_eq!(rm.max(), Some(&8));

        // ct = 1. Nothing expires; 6 is pruned from the back.
        // Window is [8, 6, 7].
        rm.push(7);
        assert_eq!(rm.max(), Some(&8));

        // ct = 2, which equals the expiry of 8, so 8 is evicted; 7 is
        // pruned from the back. Window is [6, 7, 9].
        rm.push(9);
        assert_eq!(rm.max(), Some(&9));
    }

    /// Pushes each input of an `(input, expected)` iterator into a
    /// fresh RollingMax and panics unless the max reported right after
    /// the push equals `expected`.
    fn expect_max<T, const WINDOW: usize>(input_and_expected: impl Iterator<Item = (T, T)>)
    where
        T: PartialOrd + Copy + Debug + PartialEq,
    {
        let mut tracker = RollingMax::<T, WINDOW>::new();
        input_and_expected.for_each(|(input, expected)| {
            tracker.push(input);
            let observed = tracker.max().unwrap();
            assert_eq!(*observed, expected);
        });
    }
}
389}