high_roller/rolling_max.rs
1use arraydeque::ArrayDeque;
2
3/// A rolling accumulator that tracks the largest value
4/// in a fixed size window.
5///
6/// - Push is O(1)
7/// - Get max is O(1)
8/// - There are no heap allocations.
9///
10/// Like [`std::collections::BinaryHeap`], `RollingMax` exposes
11/// a "maximum only" API. The minimum can be found by using
12/// [`core::cmp::Reverse`].
13///
14/// ```
15/// use core::cmp::Reverse;
16/// use high_roller::rolling_max::RollingMax;
17///
18/// type RollingMin<T, const WINDOW: usize> = RollingMax<Reverse<T>, WINDOW>;
19/// ```
20///
21/// The example below shows how this might be used to publish
22/// telemetry for the highest latency event among the most
23/// recent 100 samples.
24///
25/// ```
26/// use high_roller::rolling_max::RollingMax;
27/// use rand::Rng;
28///
29/// // Assume this is unbounded in reality.
30/// let events = (0..1000).map(|_| network_latency_us());
31///
32/// let mut window: RollingMax<u32, 100> = RollingMax::new();
33/// for latency in events {
34/// window.push(latency);
35/// window.max().copied().map(emit_network_telemetry);
36/// }
37///
38/// fn network_latency_us() -> u32 {
39/// rand::rng().next_u32()
40/// }
41///
42/// fn emit_network_telemetry(max_latency_us: u32) {
43/// core::hint::black_box(max_latency_us);
44/// }
45///
46/// ```
47///
48/// # Design
49///
50/// The algorithm for this is well-known but not formalized
51/// anywhere I found easily accessible. The constraint of
52/// accumulating values internally is also a slight divergence
53/// from how this problem is typically presented. While RollingMax
54/// was motivated by a genuine need in production code, I also
55/// verified it against LeetCode 239, which exercises the same
56/// use case.
57#[derive(Debug, Default)]
58pub struct RollingMax<T, const WINDOW: usize> {
59 deq: ArrayDeque<T, WINDOW>,
60 ct: usize,
61 // PERF: Expiration could wrap to the window size. In that
62 // case rarely if ever will an expiration exceed u16::MAX.
63 // This incentivizes compressing expirations into a smaller
64 // type than usize. Worth investigating.
65 expires: ArrayDeque<usize, WINDOW>,
66}
67
68impl<T, const W: usize> RollingMax<T, W>
69where
70 T: PartialOrd,
71{
72 /// Constructs a new empty [`RollingMax`].
73 ///
74 /// This type is stored entirely on the stack, so be aware of
75 /// window size. Boxing might be a good idea. Doing so yourself
76 /// enables cache-friendlier patterns than if each RollingMax
77 /// were unconditionally allocated on the heap.
78 ///
79 /// ```
80 /// use core::cmp::Reverse;
81 /// use high_roller::rolling_max::RollingMax;
82 ///
83 /// const WINDOW: usize = 6000;
84 ///
85 /// #[derive(Default)]
86 /// struct MyTelemetry {
87 /// max_latency: RollingMax<u32, WINDOW>,
88 /// min_latency: RollingMax<Reverse<u32>, WINDOW>,
89 /// largest_batch: RollingMax<usize, WINDOW>
90 /// }
91 ///
92 /// // `MyTelemetry` is too big to live on the stack. But keeping
93 /// // everything in one allocation may yield friendlier cache
94 /// // access patterns.
95 /// let _telemetry = Box::new(MyTelemetry::default());
96 ///
97 /// ```
98 #[must_use]
99 pub const fn new() -> Self {
100 Self {
101 deq: ArrayDeque::new(),
102 expires: ArrayDeque::new(),
103 ct: 0,
104 }
105 }
106
107 // TODO: docs
108 //
109 // Clippy allow:
110 //
111 // Expect is used in this function to guarantee invariants.
112 // See the note within the function.
113 //
114 // It should never panic in user code. So exposing or documenting
115 // the failure case makes the API unnecessarily leaky.
116 #[allow(clippy::expect_used)]
117 #[allow(clippy::missing_panics_doc)]
118 pub fn push(&mut self, entry: T) {
119 self.ct = self.ct.wrapping_add(1);
120
121 while self
122 .expires
123 .front()
124 .is_some_and(|&exp| self.ct.wrapping_sub(exp) <= W)
125 {
126 self.deq.pop_front();
127 self.expires.pop_front();
128 }
129
130 while self.deq.back().is_some_and(|tail| tail <= &entry) {
131 self.deq.pop_back();
132 self.expires.pop_back();
133 }
134
135 // The first loop pops any entry whose expiration equals
136 // or exceeds W. So every entry in the queue has a nonzero
137 // expiration less than W. The queue has capacity W. So the
138 // queue is guaranteed to have at least one spot available.
139 // The calls to `expect` below check this invariant.
140 self.deq
141 .push_back(entry)
142 .expect("expirations guarantee queue is never full at this point");
143 self.expires
144 .push_back(self.ct.wrapping_add(W))
145 .expect("expirations guarantee queue is never full at this point");
146 }
147
148 // TODO: docs
149 #[must_use]
150 pub fn max(&self) -> Option<&T> {
151 self.deq.front()
152 }
153}
154
#[cfg(test)]
pub mod for_tests {
    use arraydeque::{ArrayDeque, Wrapping};

    /// The simple and easy implementation of RollingMax.
    /// Useful for verifying correctness and performance
    /// characteristics. This crate's `RollingMax` is totally
    /// useless if this meets your needs.
    ///
    /// Every sample is kept in a `Wrapping` deque of capacity `W`,
    /// and the maximum is found by scanning all of them.
    #[derive(Default)]
    pub struct NaiveRollingMax<T, const W: usize> {
        // The retained samples themselves, not just candidates.
        deq: ArrayDeque<T, W, Wrapping>,
    }

    impl<T, const W: usize> NaiveRollingMax<T, W>
    where
        T: Ord,
    {
        /// Constructs a new empty [`NaiveRollingMax`].
        #[must_use]
        pub const fn new() -> Self {
            Self {
                deq: ArrayDeque::new(),
            }
        }

        /// Appends `entry` to the back of the deque.
        ///
        /// NOTE(review): presumably the `Wrapping` behavior drops the
        /// oldest sample once the deque is full — confirm against the
        /// `arraydeque` documentation.
        pub fn push(&mut self, entry: T) {
            self.deq.push_back(entry);
        }

        /// Returns the largest retained sample, or `None` if the
        /// deque is empty. Scans every retained sample.
        #[must_use]
        pub fn max(&self) -> Option<&T> {
            self.deq.iter().max()
        }
    }
}
188
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::{decimal::D4, rolling_max::for_tests::NaiveRollingMax};

    use core::fmt::Debug;
    use rand::{distr::Uniform, rngs::SmallRng, RngExt, SeedableRng};

    /// Differential smoke test: drives a RollingMax and a
    /// NaiveRollingMax with the same seeded random stream and
    /// requires their maxima to agree after every push.
    #[test]
    fn rng_with_naive() {
        const QLEN: usize = 1000;
        const STREAM_LEN: usize = 100_000;

        let mut fast = RollingMax::<D4, QLEN>::new();
        let mut reference = NaiveRollingMax::<D4, QLEN>::new();
        let stream =
            SmallRng::seed_from_u64(75).sample_iter(Uniform::new(-65.535, 65.535).unwrap());

        for raw in stream.take(STREAM_LEN) {
            let sample = D4::cast(raw);
            fast.push(sample);
            reference.push(sample);
            assert_eq!(fast.max(), reference.max());
        }
    }

    /// Zero-state guarantee: with nothing pushed, max is None.
    #[test]
    fn max_on_empty_is_none() {
        let fresh: RollingMax<i32, 3> = RollingMax::new();
        assert_eq!(fresh.max(), None);
    }

    /// One push is enough to get Some back, whatever the window size.
    #[test]
    fn single_push_yields_some() {
        let mut tracker: RollingMax<i32, 5> = RollingMax::new();
        tracker.push(42);
        assert_eq!(tracker.max(), Some(&42));
    }

    /// Window=1: each element is its own maximum. Exercises the path
    /// where the whole deque is evicted on every push.
    #[test]
    fn window_of_one() {
        let pushed = [3, 1, 4, 1, 5];
        let maxima = [3, 1, 4, 1, 5];
        expect_max::<i32, 1>(pushed.into_iter().zip(maxima));
    }

    /// Window larger than the whole input: nothing is ever evicted,
    /// so the running max never decreases.
    #[test]
    fn window_larger_than_input() {
        let pushed = [2, 4, 1];
        let maxima = [2, 4, 4];
        expect_max::<i32, 10>(pushed.into_iter().zip(maxima));
    }

    /// Window exactly as long as the input: the global max only
    /// settles once the last element has been pushed.
    #[test]
    fn window_equals_input_length() {
        let pushed = [1, 3, 2, 5, 4];
        let maxima = [1, 3, 3, 5, 5];
        expect_max::<i32, 5>(pushed.into_iter().zip(maxima));
    }

    /// Core sliding-window case. This exact sequence caught the
    /// off-by-one expiry bug where element `3` incorrectly survived
    /// into window [1,2,0].
    #[test]
    fn sliding_window_canonical() {
        let pushed = [1, 3, 1, 2, 0, 5];
        let maxima = [1, 3, 3, 3, 2, 5];
        expect_max::<i32, 3>(pushed.into_iter().zip(maxima));
    }

    /// Strictly increasing input: every push discards all of its
    /// predecessors, so the deque always holds exactly one element.
    #[test]
    fn strictly_increasing() {
        let pushed = [1, 2, 3, 4, 5];
        let maxima = [1, 2, 3, 4, 5];
        expect_max::<i32, 3>(pushed.into_iter().zip(maxima));
    }

    /// Strictly decreasing input: the oldest value leads the deque
    /// until it expires, then hands over to the next oldest.
    #[test]
    fn strictly_decreasing() {
        let pushed = [5, 4, 3, 2, 1];
        let maxima = [5, 5, 5, 4, 3];
        expect_max::<i32, 3>(pushed.into_iter().zip(maxima));
    }

    /// All-equal input: equal elements are pruned from the back
    /// (`<=`), so the deque stays bounded instead of growing.
    #[test]
    fn all_equal() {
        let pushed = [7i32; 6];
        let maxima = [7; 6];
        expect_max::<i32, 3>(pushed.into_iter().zip(maxima));
    }

    /// Negative values: nothing may assume a sign or a zero floor.
    #[test]
    fn negative_values() {
        let pushed = [-3, -1, -4, -1, -5];
        let maxima = [-3, -1, -1, -1, -1];
        expect_max::<i32, 2>(pushed.into_iter().zip(maxima));
    }

    /// Float input: exercises the PartialOrd bound on a non-Ord type.
    #[test]
    fn float_values() {
        let pushed = [1.0, 3.0, 2.0, 5.0, 4.0];
        let maxima = [1.0, 3.0, 3.0, 5.0, 5.0];
        expect_max::<f32, 2>(pushed.into_iter().zip(maxima));
    }

    /// The maximum must survive exactly `cap` pushes and be gone on
    /// the next one. Guards against off-by-one errors at the expiry
    /// boundary.
    #[test]
    fn max_expires_at_exact_boundary() {
        let mut tracker = RollingMax::<i32, 3>::new();
        for sample in [99, 1, 1] {
            tracker.push(sample);
        }
        // 99 is still inside [99, 1, 1].
        assert_eq!(tracker.max(), Some(&99));

        tracker.push(1);
        // 99 has been evicted; the window is now [1, 1, 1].
        assert_eq!(tracker.max(), Some(&1));
    }

    /// Exercises the `usize` counter wrap-around: `ct` is pre-seeded
    /// so that expiry values cross the `usize::MAX → 0` boundary,
    /// checking that the wrapping arithmetic evicts and retains the
    /// right elements.
    #[test]
    fn expiry_counter_wrapping() {
        let mut tracker: RollingMax<i32, 3> = RollingMax {
            ct: usize::MAX - 3,
            deq: ArrayDeque::new(),
            expires: ArrayDeque::new(),
        };

        // ct = usize::MAX-2, exp = 0 (wraps)
        tracker.push(10);
        // ct = usize::MAX-1, exp = 1 (wraps)
        tracker.push(5);
        // ct = usize::MAX, exp = 2 (wraps)
        tracker.push(8);
        // window = [10, 5, 8]
        assert_eq!(tracker.max(), Some(&10));

        // ct = 0 (wrap). exp=0 matches ct, so 10 is evicted.
        // window = [5, 8, 6]
        tracker.push(6);
        assert_eq!(tracker.max(), Some(&8));

        // ct = 1. Nothing expires yet; the monotone step pops 6.
        // window = [8, 6, 7]
        tracker.push(7);
        assert_eq!(tracker.max(), Some(&8));

        // ct = 2. exp=2 matches ct, so 8 is evicted; the monotone
        // step pops 7. window = [6, 7, 9]
        tracker.push(9);
        assert_eq!(tracker.max(), Some(&9));
    }

    /// Pushes each `input` of an `(input, expected)` iterator into a
    /// fresh RollingMax and panics as soon as the reported max
    /// differs from the paired `expected`.
    fn expect_max<T, const WINDOW: usize>(input_and_expected: impl Iterator<Item = (T, T)>)
    where
        T: PartialOrd + Copy + Debug,
    {
        let mut tracker: RollingMax<T, WINDOW> = RollingMax::new();
        input_and_expected.for_each(|(sample, want)| {
            tracker.push(sample);
            assert_eq!(tracker.max().copied().unwrap(), want);
        });
    }
}