Skip to main content

high_roller/
rolling_max.rs

1//! # Rolling Max
2//!
3//! A rolling accumulator that tracks the largest value
4//! in a fixed size window.
5//!
6//! - Push is amortized O(1).
7//! - Max is O(1).
8//! - There are no heap allocations.
9//!
10//! Like `std::collections::BinaryHeap`, [`RollingMax`] exposes
11//! a "maximum only" API. A rolling minimum can be found by using
12//! [`core::cmp::Reverse`] within a [`RollingMax`].
13//!
14//! ```
15//! use core::cmp::Reverse;
16//! use high_roller::rolling_max::RollingMax;
17//!
18//! type RollingMin<T, const WINDOW: usize> = RollingMax<Reverse<T>, WINDOW>;
19//! ```
20//!
21//! The example below shows how this might be used to publish
22//! telemetry for the highest latency event among the 100 most
23//! recent samples.
24//!
25//! ```
26//! # use high_roller::rolling_max::RollingMax;
27//! # use rand::Rng;
28//! #
29//! # let stream = (0..1000);
30//! let events = stream.map(|event| network_latency_us(&event));
31//!
32//! let mut window: RollingMax<u32, 100> = RollingMax::new();
33//! for latency in events {
34//!     window.push(latency);
35//!     window.max().copied().map(emit_network_telemetry);
36//! }
37//!
38//! # fn network_latency_us(_: &usize) -> u32 {
39//! #     rand::rng().next_u32()
40//! # }
41//! # fn emit_network_telemetry(max_latency_us: u32) {
42//! #     core::hint::black_box(max_latency_us);
43//! # }
44//! ```
45
46use arraydeque::ArrayDeque;
47
48/// # Rolling Max
49///
50/// Tracks the largest value in a fixed-size window.
51///
52/// # Design
53///
54/// The algorithm for this is well-known but not formalized
55/// anywhere I found easily accessible. The constraint of
56/// accumulating values internally is also a slight divergence
57/// from how this problem is typically presented.
58#[derive(Debug, Default)]
59pub struct RollingMax<T, const WINDOW: usize> {
60    deq: ArrayDeque<T, WINDOW>,
61    ct: usize,
62    // PERF: Expiration could wrap to the window size. In that
63    // case rarely if ever will an expiration exceed u16::MAX.
64    // This incentivizes compressing expirations into a smaller
65    // type than usize. Worth investigating.
66    expires: ArrayDeque<usize, WINDOW>,
67}
68
69impl<T, const W: usize> RollingMax<T, W>
70where
71    T: PartialOrd,
72{
73    /// Constructs a new empty [`RollingMax`].
74    ///
75    /// This type is stored entirely on the stack, so be aware of
76    /// window size. Boxing might be a good idea. Doing so yourself
77    /// enables cache-friendlier patterns than if each [`RollingMax`]
78    /// were unconditionally allocated on the heap.
79    ///
80    /// ```
81    /// use core::cmp::Reverse;
82    /// use high_roller::rolling_max::RollingMax;
83    ///
84    /// const WINDOW: usize = 6000;
85    ///
86    /// #[derive(Default)]
87    /// struct MyTelemetry {
88    ///     max_latency: RollingMax<u32, WINDOW>,
89    ///     min_latency: RollingMax<Reverse<u32>, WINDOW>,
90    ///     largest_batch: RollingMax<usize, WINDOW>
91    /// }
92    ///
93    /// // `MyTelemetry` is too big to live comfortably on the stack.
94    /// // But keeping everything in one allocation may yield friendlier
95    /// // cache access patterns.
96    /// const _: () = assert!(core::mem::size_of::<MyTelemetry>() == 240120);
97    /// let _telemetry = Box::new(MyTelemetry::default());
98    /// ```
99    #[must_use]
100    pub const fn new() -> Self {
101        Self {
102            deq: ArrayDeque::new(),
103            expires: ArrayDeque::new(),
104            ct: 0,
105        }
106    }
107
108    /// Adds an entry to the rolling window. If the window is full,
109    /// the oldest member is evicted.
110    ///
111    /// ```
112    /// use high_roller::rolling_max::RollingMax;
113    ///
114    /// let mut window: RollingMax<usize, 3> = RollingMax::new();
115    ///
116    /// window.push(usize::MAX);
117    /// window.push(1);
118    /// window.push(2);
119    ///
120    /// assert_eq!(window.max().copied(), Some(usize::MAX));
121    ///
122    /// // Evicts the first entry, which was previously the max.
123    /// window.push(0);
124    /// assert_eq!(window.max().copied(), Some(2));
125    /// ```
126    //
127    // Clippy allow:
128    //
129    // Expect is used in this function to guarantee invariants.
130    // See the note within the function.
131    //
132    // It should never panic in user code. So exposing or documenting
133    // the failure case makes the API unnecessarily leaky.
134    #[allow(clippy::expect_used)]
135    #[allow(clippy::missing_panics_doc)]
136    pub fn push(&mut self, entry: T) {
137        self.ct = self.ct.wrapping_add(1);
138
139        while self
140            .expires
141            .front()
142            .is_some_and(|&exp| self.ct.wrapping_sub(exp) <= W)
143        {
144            self.deq.pop_front();
145            self.expires.pop_front();
146        }
147
148        while self.deq.back().is_some_and(|tail| tail <= &entry) {
149            self.deq.pop_back();
150            self.expires.pop_back();
151        }
152
153        // The first loop pops any entry whose expiration equals
154        // or exceeds W. So every entry in the queue has a nonzero
155        // expiration less than W. The queue has capacity W. So the
156        // queue is guaranteed to have at least one spot available.
157        // The calls to `expect` below check this invariant.
158        self.deq
159            .push_back(entry)
160            .expect("expirations guarantee queue is never full at this point");
161        self.expires
162            .push_back(self.ct.wrapping_add(W))
163            .expect("expirations guarantee queue is never full at this point");
164    }
165
166    /// Returns the maximum entry in the window or [`None`] if
167    /// the window is empty.
168    ///
169    /// ```
170    /// use high_roller::rolling_max::RollingMax;
171    ///
172    /// let mut window: RollingMax<usize, 3> = RollingMax::new();
173    /// assert_eq!(window.max(), None);
174    ///
175    /// window.push(5);
176    /// // Once any entry has been pushed, max will never again
177    /// // return None.
178    /// assert_eq!(window.max().copied(), Some(5));
179    /// ```
180    #[must_use]
181    pub fn max(&self) -> Option<&T> {
182        self.deq.front()
183    }
184}
185
186#[cfg(test)]
187pub mod for_tests {
188    use arraydeque::ArrayDeque;
189    use arraydeque::Wrapping;
190
191    /// The simple and easy implementation of RollingMax.
192    /// Useful for verifying correctness and performance
193    /// characteristics. This crate's `RollingMax` is totally
194    /// useless if this meets your needs.
195    #[derive(Default)]
196    pub struct NaiveRollingMax<T, const W: usize> {
197        deq: ArrayDeque<T, W, Wrapping>,
198    }
199
200    impl<T, const W: usize> NaiveRollingMax<T, W>
201    where
202        T: Ord,
203    {
204        #[must_use]
205        pub const fn new() -> Self {
206            Self {
207                deq: ArrayDeque::new(),
208            }
209        }
210
211        pub fn push(&mut self, entry: T) {
212            self.deq.push_back(entry);
213        }
214
215        pub fn max(&self) -> Option<&T> {
216            self.deq.iter().max()
217        }
218    }
219}
220
221#[cfg(test)]
222#[allow(clippy::unwrap_used)]
223mod tests {
224    use super::*;
225    use crate::decimal::D4;
226    use crate::rolling_max::for_tests::NaiveRollingMax;
227
228    use core::fmt::Debug;
229    use rand::distr::Uniform;
230    use rand::rngs::SmallRng;
231    use rand::RngExt;
232    use rand::SeedableRng;
233
234    /// Smoke test for RollingMax correctness.
235    ///
236    /// Accumulates a representative RollingMax and NaiveRollingMax
237    /// to verify their outputs are identical.
238    #[test]
239    fn rng_with_naive() {
240        const QLEN: usize = 1000;
241        const STREAM_LEN: usize = 100_000;
242
243        let sample =
244            SmallRng::seed_from_u64(75).sample_iter(Uniform::new(-65.535, 65.535).unwrap());
245        let mut roller = RollingMax::<D4, QLEN>::new();
246        let mut naive = NaiveRollingMax::<D4, QLEN>::new();
247
248        for val in sample.take(STREAM_LEN) {
249            let d4 = D4::cast(val);
250            roller.push(d4);
251            naive.push(d4);
252            assert_eq!(roller.max(), naive.max());
253        }
254    }
255
256    /// Verifies the minimum behavior required for RollingMax to do
257    /// anything useful.
258    #[test]
259    fn basic_math() {
260        let mut rm: RollingMax<i32, 3> = RollingMax::new();
261        assert_eq!(rm.max(), None);
262
263        rm.push(42);
264        assert_eq!(rm.max(), Some(&42));
265    }
266
267    /// Rolling window behavior in different conditions.
268    #[test]
269    fn basic_rolling() {
270        // Window = 1 is just a fancy variable.
271        expect_max::<i32, 1>([3, 1, 4, 1, 5].into_iter().zip([3, 1, 4, 1, 5]));
272
273        // No evictions.
274        expect_max::<i32, 10>([2, 4, 1].into_iter().zip([2, 4, 4]));
275        expect_max::<i32, 5>([1, 3, 2, 5, 4].into_iter().zip([1, 3, 3, 5, 5]));
276
277        // Basic eviction.
278        expect_max::<i32, 3>([1, 3, 1, 2, 0, 5].into_iter().zip([1, 3, 3, 3, 2, 5]));
279
280        // Monotonic
281        expect_max::<i32, 3>([1, 2, 3, 4, 5].into_iter().zip([1, 2, 3, 4, 5]));
282        expect_max::<i32, 3>([5, 4, 3, 2, 1].into_iter().zip([5, 5, 5, 4, 3]));
283
284        // All equal
285        expect_max::<i32, 3>([7i32; 6].into_iter().zip([7; 6]));
286
287        // Negatives
288        expect_max::<i32, 2>([-3, -1, -4, -1, -5].into_iter().zip([-3, -1, -1, -1, -1]));
289
290        // Floats
291        expect_max::<f32, 2>(
292            [1.0, 3.0, 2.0, 5.0, 4.0]
293                .into_iter()
294                .zip([1.0, 3.0, 3.0, 5.0, 5.0]),
295        );
296    }
297
298    /// Play by play view of max getting evicted.
299    #[test]
300    fn rollover() {
301        let mut rm = RollingMax::<i32, 3>::new();
302        rm.push(99);
303        rm.push(1);
304        rm.push(1);
305        assert_eq!(rm.max(), Some(&99));
306        rm.push(1);
307        assert_eq!(rm.max(), Some(&1));
308    }
309
310    /// Still works when the expiration counter wraps.
311    #[test]
312    fn exp_ct_wraps() {
313        let mut rm: RollingMax<i32, 3> = RollingMax {
314            deq: ArrayDeque::new(),
315            expires: ArrayDeque::new(),
316            ct: usize::MAX - 3,
317        };
318
319        rm.push(10);
320        rm.push(5);
321        rm.push(8);
322        assert_eq!(rm.max(), Some(&10));
323
324        rm.push(6);
325        assert_eq!(rm.max(), Some(&8));
326
327        rm.push(7);
328        assert_eq!(rm.max(), Some(&8));
329
330        rm.push(9);
331        assert_eq!(rm.max(), Some(&9));
332    }
333
334    /// Feeds inputs from an `(input, expected)` iterator into
335    /// a RollingMax. Compares each max to `expected` and panics
336    /// if they're not equal.
337    #[allow(clippy::unwrap_used)]
338    fn expect_max<T, const WINDOW: usize>(input_and_expected: impl Iterator<Item = (T, T)>)
339    where
340        T: PartialOrd + Copy + Debug + PartialEq,
341    {
342        let mut rm: RollingMax<T, WINDOW> = RollingMax::new();
343        for (input, expected) in input_and_expected {
344            rm.push(input);
345            assert_eq!(*rm.max().unwrap(), expected);
346        }
347    }
348}