high-roller 0.4.0

Rolling maximum, minimum, and sum for streams of numerical data.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
//! # Rolling Sum
//!
//! [`RollingSum`] tracks the sum of values in a fixed-size window.
//! When the window is full, pushing a new value evicts the oldest.
//!
//! API guarantees:
//! - Add is O(1).
//! - Total is O(1).
//! - Overflow and underflow are detected and recoverable.
//! - Zero heap allocations.
//!
//! The value of this implementation is error recovery.
//! [`RollingSum::total`] returns [`None`] for as long as the sum of
//! the window lies above `T::MAX` or below `T::MIN`, and it resumes
//! yielding `Some(&T)` as soon as the sum is back in range.
//!
//! ```
//! use high_roller::rolling_sum::RollingSum;
//!
//! let mut rsum: RollingSum<i8, 3> = RollingSum::default();
//!
//! // Add an initial value.
//! rsum.add(5);
//! assert_eq!(rsum.total().copied(), Some(5));
//!
//! // Stream of -1s takes over the window.
//! for _ in 0..100 {
//!     rsum.add(-1);
//! }
//! assert_eq!(rsum.total().copied(), Some(-3));
//!
//! // Underflow forces total to None.
//! rsum.add(i8::MIN);
//! assert_eq!(rsum.total(), None);
//!
//! // Balanced back to zero.
//! rsum.add(i8::MAX);
//! rsum.add(1);
//! assert_eq!(rsum.total().copied(), Some(0));
//!
//! // Evicting i8::MIN causes overflow.
//! rsum.add(0);
//! assert_eq!(rsum.total(), None);
//!
//! // Cause multiple overflows.
//! for _ in 0..100 {
//!     rsum.add(i8::MAX);
//! }
//! assert_eq!(rsum.total(), None);
//!
//! // And recover back into an expected range.
//! for _ in 0..100 {
//!     rsum.add(1);
//! }
//! assert_eq!(rsum.total().copied(), Some(3));
//! ```

use arraydeque::ArrayDeque;
use num_traits::CheckedAdd;
use num_traits::CheckedSub;
use num_traits::WrappingAdd;
use num_traits::WrappingSub;

// PERF: using a bitvec or double bitvec for RollingSum on
// applicable types could be a pretty nice space optimization.
// Future project :)

/// Tracks a rolling sum of at most `WINDOW` values.
///
/// This type is stack allocated. However a large window
/// might warrant boxing. Clustering multiple instances
/// in the same allocation might offer better cache
/// access patterns.
///
/// ```
/// use high_roller::rolling_sum::RollingSum;
///
/// struct Average {
///     total: RollingSum<u32, 6000>,
///     samples: RollingSum<u8, 6000>
/// }
///
/// // Probably want to box this.
/// const _: () = assert!(core::mem::size_of::<Average>() == 30064);
/// ```
#[derive(Debug)]
pub struct RollingSum<T, const WINDOW: usize> {
    /// Values currently inside the window. New values are pushed at the
    /// back and the oldest value is popped from the front on eviction.
    deq: ArrayDeque<T, WINDOW>,
    /// Running sum, maintained with wrapping arithmetic. It is only
    /// meaningful to callers while `balance` is zero.
    total: T,
    /// Reference value each operand is compared against to decide whether
    /// a wrap was an overflow or an underflow.
    zero: T,
    /// Net number of times `total` has wrapped: incremented for a wrap in
    /// the overflow direction, decremented for a wrap in the underflow
    /// direction. The total is reported only when this is zero.
    balance: isize,
}

impl<T: Default, const WINDOW: usize> Default for RollingSum<T, WINDOW> {
    /// Builds an empty [`RollingSum`] whose starting sum and zero
    /// reference value are both `T::default()`.
    fn default() -> Self {
        let init = T::default();
        let zero = T::default();
        Self::new(init, zero)
    }
}

// No trait bounds are needed to construct a `RollingSum`; only the
// `Default` impl requires `T: Default`.
impl<T, const WINDOW: usize> RollingSum<T, WINDOW> {
    /// Constructs a new [`RollingSum`] with an empty window.
    ///
    /// Prefer using [`RollingSum::default`] for a cleaner API when
    /// `T: Default` and both arguments would be `T::default()`.
    ///
    /// - `init` is the sum's initial value. It is not stored in the window,
    ///   so it is never evicted and remains part of every reported total.
    /// - `zero` is the value separating positive and negative for this type.
    ///   While identifying zero is strange, passing it as a parameter avoids
    ///   requiring a `Zero` trait impl, which isn't fun for anybody. Zero is
    ///   required for differentiating overflow and underflow.
    ///
    /// ```
    /// use high_roller::rolling_sum::RollingSum;
    ///
    /// let _sum: RollingSum::<usize, 10> = RollingSum::new(0, 0);
    /// ```
    ///
    /// # Compile errors
    ///
    /// Using `WINDOW == 0` is a compile-time error.
    ///
    /// ```compile_fail
    /// # use high_roller::rolling_sum::RollingSum;
    /// // This will fail to compile. A rolling sum of capacity 0 is nonsensical.
    /// let _sum: RollingSum::<usize, 0> = RollingSum::default();
    /// ```
    #[must_use]
    pub const fn new(init: T, zero: T) -> Self {
        // Evaluated per instantiation, so a zero-sized window is rejected
        // at compile time rather than at runtime.
        const { assert!(WINDOW != 0, "RollingSum with WINDOW == 0 is not permitted") };
        Self {
            deq: ArrayDeque::new(),
            total: init,
            zero,
            // No wraps have happened yet.
            balance: 0,
        }
    }
}

impl<T, const WINDOW: usize> RollingSum<T, WINDOW>
where
    T: WrappingAdd + WrappingSub + CheckedAdd + CheckedSub + PartialOrd + Copy,
{
    /// Adds `val` to the rolling sum, displacing the oldest
    /// member first if the window is full to capacity.
    ///
    /// The sum is maintained with wrapping arithmetic, and every wrap is
    /// recorded in a signed counter. While that counter is nonzero,
    /// [`RollingSum::total`] returns [`None`]. It returns `Some` again once
    /// later additions or evictions have wrapped the sum back in the
    /// opposite direction the same number of times.
    ///
    /// ```
    /// use high_roller::rolling_sum::RollingSum;
    ///
    /// let mut rsum: RollingSum<i32, 1000> = RollingSum::default();
    ///
    /// rsum.add(100);
    /// rsum.add(1);
    /// rsum.add(-2);
    ///
    /// assert_eq!(rsum.total().copied(), Some(99));
    /// ```
    ///
    /// # Panics
    ///
    /// This function panics if the [`isize`] signed overflow balance
    /// counter itself overflows. Such a panic is impossible as long
    /// as `WINDOW <= isize::MAX`. And an array of size `isize::MAX`
    /// bytes is fantasy on all current hardware anyway.
    //
    // Clippy allow:
    // Explained inline. This is easily provable, will never occur,
    // and should not be exposed to the user.
    #[allow(clippy::expect_used)]
    #[allow(clippy::missing_panics_doc)]
    pub fn add(&mut self, val: T) {
        if self.deq.is_full() {
            // Construction has a const assertion that WINDOW is not zero.
            // So `is_full` guarantees there's something to pop.
            let evicted = self.deq.pop_front().expect(
                "len is equal to capacity, and capacity is nonzero. So an element must exist.",
            );

            let wrapped = self.total.checked_sub(&evicted).is_none();
            self.total = self.total.wrapping_sub(&evicted);

            if wrapped {
                // Removing a non-negative value can only wrap downward;
                // removing a negative value can only wrap upward.
                let delta = if evicted >= self.zero { -1 } else { 1 };
                self.shift_balance(delta);
            }
        }

        let wrapped = self.total.checked_add(&val).is_none();
        self.total = self.total.wrapping_add(&val);

        if wrapped {
            // Adding a non-negative value can only wrap upward;
            // adding a negative value can only wrap downward.
            let delta = if val >= self.zero { 1 } else { -1 };
            self.shift_balance(delta);
        }

        // The `if` condition above guarantees the deque
        // is not full. So there's space to push a value.
        self.deq.push_back(val).expect("deq is not full");
    }

    /// Returns the accumulated total of all added values that
    /// fit within the rolling window's capacity.
    ///
    /// Returns [`None`] while the wrap counter is nonzero, i.e. while the
    /// sum has overflowed or underflowed `T`. [`RollingSum`] will resume
    /// returning `Some(&T)` when the last element causing the overflow is
    /// pushed out of the window.
    ///
    /// ```
    /// use high_roller::rolling_sum::RollingSum;
    ///
    /// let mut rsum: RollingSum<i32, 100> = RollingSum::default();
    ///
    /// rsum.add(i32::MIN);
    /// assert_eq!(rsum.total().copied(), Some(i32::MIN));
    ///
    /// for _ in 0..1000 {
    ///     rsum.add(i32::MIN);
    ///     assert_eq!(rsum.total(), None);
    /// }
    ///
    /// for _ in 0..100 {
    ///     rsum.add(-1);
    /// }
    /// assert_eq!(rsum.total().copied(), Some(-100));
    /// ```
    #[must_use]
    pub fn total(&self) -> Option<&T> {
        (self.balance == 0).then_some(&self.total)
    }

    /// Applies `delta` (`1` or `-1`) to the signed wrap counter.
    ///
    /// Panics if the counter itself overflows; see the `# Panics`
    /// section of [`RollingSum::add`] for why that is not reachable
    /// in practice.
    //
    // Clippy allow:
    // Same rationale as `add`: this is an internal invariant check.
    #[allow(clippy::expect_used)]
    fn shift_balance(&mut self, delta: isize) {
        self.balance = self
            .balance
            .checked_add(delta)
            .expect("overflow count itself overflowed");
    }
}

#[cfg(test)]
pub mod for_tests {
    use arraydeque::ArrayDeque;
    use arraydeque::Wrapping;
    use num_bigint::BigInt;

    /// A simple implementation satisfying the same API as
    /// this crate's `RollingSum` type. This is used for both
    /// correctness and performance testing.
    ///
    /// The window is held in a deque using the `Wrapping` behavior and the
    /// running sum is held in a [`BigInt`]. See [`NaiveRollingSum::total`]
    /// for how that sum is narrowed back to `T`.
    ///
    /// The original note here states that this is not as robust against
    /// underflow as RollingSum.
    /// NOTE(review): the reason for that is not visible in this module — confirm.
    ///
    /// NOTE: the derived `Default` does not go through [`NaiveRollingSum::new`],
    /// so it does not run the `WINDOW != 0` assertion.
    #[derive(Debug, Default)]
    pub struct NaiveRollingSum<T, const WINDOW: usize> {
        /// Values currently inside the window.
        deq: ArrayDeque<T, WINDOW, Wrapping>,
        /// Sum of the window (plus any initial value passed to `new`).
        sum: BigInt,
    }

    impl<T, const WINDOW: usize> NaiveRollingSum<T, WINDOW>
    where
        T: Clone + Default + Into<BigInt> + for<'a> TryFrom<&'a BigInt>,
    {
        /// Constructs an empty window whose sum starts at `init`.
        ///
        /// Using `WINDOW == 0` fails the const assertion below.
        #[must_use]
        pub fn new(init: T) -> Self {
            const { assert!(WINDOW != 0, "RollingSum with WINDOW == 0 is not permitted") };
            Self {
                deq: ArrayDeque::new(),
                sum: init.into(),
            }
        }

        /// Adds `val` to the sum and pushes it into the window. If the
        /// push hands back a displaced element, that element is
        /// subtracted from the sum.
        // Rationalizing `allow`
        // - This is test code. Expect is used as an assertion. We want
        //   assertions in test code.
        // - Internal correctness assertions shouldn't be part of the public
        //   API. They just make my problem someone else's problem.
        #[allow(clippy::expect_used)]
        #[allow(clippy::missing_panics_doc)]
        pub fn add(&mut self, val: T) {
            // Account for the incoming value first.
            self.sum = self
                .sum
                .checked_add(&Into::<BigInt>::into(val.clone()))
                .expect("no bigint overflow");
            // `push_back` yields `Some(old)` when an existing element was
            // replaced; remove its contribution from the sum.
            if let Some(replaced) = self.deq.push_back(val) {
                self.sum = self
                    .sum
                    .checked_sub(&Into::<BigInt>::into(replaced))
                    .expect("no bigint underflow");
            }
        }

        /// Returns the current sum converted to `T`.
        ///
        /// Returns [`None`] when the `TryFrom<&BigInt>` conversion fails,
        /// presumably because the sum does not fit in `T`.
        #[must_use]
        pub fn total(&self) -> Option<T> {
            (&self.sum).try_into().ok()
        }
    }
}

// Unit tests for `RollingSum`. `unwrap` is allowed here because a failed
// unwrap is simply a failed assertion in test code.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::decimal::D1;
    use crate::decimal::D5;
    use crate::rolling_sum::for_tests::NaiveRollingSum;
    use core::fmt::Debug;
    use rand::distr::Uniform;
    use rand::rngs::SmallRng;
    use rand::RngExt;
    use rand::SeedableRng;

    /// Smoke test for RollingSum correctness.
    ///
    /// Feeds the same seeded random stream into a RollingSum and a
    /// NaiveRollingSum and verifies their totals are identical after
    /// every element.
    #[test]
    fn rng_with_naive() {
        const QLEN: usize = 1000;
        const STREAM_LEN: usize = 100_000;

        // Fixed seed keeps the stream, and therefore the test, deterministic.
        let sample = SmallRng::seed_from_u64(57).sample_iter(Uniform::new(-800f32, 800.).unwrap());
        let mut roller = RollingSum::<D5, QLEN>::default();
        let mut naive = NaiveRollingSum::<D5, QLEN>::default();

        for val in sample.take(STREAM_LEN) {
            let d4 = D5::cast(val.into());
            roller.add(d4);
            naive.add(d4);
            assert_eq!(roller.total(), naive.total().as_ref());
        }
    }

    /// If this doesn't work, nothing will.
    #[test]
    fn basic_math() {
        let mut rs: RollingSum<u32, 3> = RollingSum::default();
        assert_eq!(rs.total(), Some(&0u32));

        rs.add(10);
        assert_eq!(rs.total(), Some(&10));
    }

    /// Various behaviors when pushing and popping from the window.
    #[test]
    fn basic_rolling() {
        // Basic accumulation.
        self::expect_total::<u32, 3>([1, 2, 3].into_iter().zip([1, 3, 6]));

        // Old elements are properly evicted from the sum.
        self::expect_total::<u32, 3>([1, 2, 3, 4].into_iter().zip([1, 3, 6, 9]));

        // Same window size as above but with a longer input stream,
        // so more than one eviction happens.
        self::expect_total::<u32, 3>([5, 3, 8, 2, 6].into_iter().zip([5, 8, 16, 13, 16]));

        // Capacity equals one is just a complicated variable.
        self::expect_total::<u32, 1>([5, 3, 9, 1].into_iter().zip([5, 3, 9, 1]));

        // Giant window never has eviction.
        self::expect_total::<u32, 100>([1, 2, 3, 4, 5].into_iter().zip([1, 3, 6, 10, 15]));

        // Negatives work too.
        self::expect_total::<i32, 2>([-3, 5, -2, 4].into_iter().zip([-3, 2, 3, 2]));
    }

    /// Hits the u8 limit exactly and then recovers.
    #[test]
    fn limit_boundary() {
        let mut rs = RollingSum::<u8, 3>::default();
        rs.add(100);
        rs.add(100);
        rs.add(55);
        assert_eq!(rs.total(), Some(&255));
        rs.add(0);
        assert_eq!(rs.total(), Some(&155));
    }

    /// Sum overflows and then recovers.
    #[test]
    fn limit_overflow() {
        let mut rs = RollingSum::<u8, 2>::default();
        rs.add(200);
        assert_eq!(rs.total(), Some(&200)); // single element, no overflow
        rs.add(100); // 200+100=300 > u8::MAX → balance=1
        assert_eq!(rs.total(), None); // overflow detected
        rs.add(50); // evicts 200; window=[100,50], sum=150
        assert_eq!(rs.total(), Some(&150)); // overflow healed
    }

    /// Sum really overflows and then recovers.
    #[test]
    fn limit_overflow_2x() {
        let mut rs = RollingSum::<u8, 3>::default();

        // Window fills with 200s: true sums are 200, 400, 600.
        rs.add(200);
        assert_eq!(rs.total(), Some(&200));
        rs.add(200);
        assert_eq!(rs.total(), None);
        rs.add(200);
        assert_eq!(rs.total(), None);

        // Each 10 evicts one 200: true sums are 410, 220, 30.
        rs.add(10);
        assert_eq!(rs.total(), None);
        rs.add(10);
        assert_eq!(rs.total(), Some(&220));
        rs.add(10);
        assert_eq!(rs.total(), Some(&30));
    }

    /// The other tests use u8. Maybe I left a weird type expectation
    /// somewhere. This uses large u64s.
    #[test]
    fn sanity_check_big() {
        // `u64::MAX as f64` rounds up to 2^64, so this evaluates to
        // 2^63 - 1 and `HALF * 2` still fits in a u64.
        const HALF: u64 = (u64::MAX as f64 / 2.) as u64 - 1;
        let mut rs = RollingSum::<_, 2>::default();
        rs.add(HALF);
        rs.add(HALF);
        assert_eq!(rs.total(), Some(&(HALF * 2)));
        rs.add(1);
        assert_eq!(rs.total(), Some(&(HALF + 1)));
    }

    /// Brief overflow and recovery using only min, max, and 0.
    #[test]
    fn limit_extremes() {
        let mut rs = RollingSum::<i32, 3>::default();

        rs.add(i32::MAX);
        assert!(rs.total().is_some());

        rs.add(i32::MIN);
        assert!(rs.total().is_some());

        rs.add(i32::MAX);
        assert!(rs.total().is_some());

        // Evicts the first MAX; window is [MIN, MAX, MAX].
        rs.add(i32::MAX);
        assert!(rs.total().is_some());

        // Evicts MIN; window is [MAX, MAX, 0], which exceeds i32::MAX.
        rs.add(0);
        assert!(rs.total().is_none());

        // Evicts one MAX; window is [MAX, 0, 0].
        rs.add(0);
        assert_eq!(rs.total(), Some(&i32::MAX));
    }

    /// Same as overflow test but negative.
    #[test]
    fn limit_underflow() {
        let mut rs = RollingSum::<i32, 3>::default();

        rs.add(i32::MIN);
        assert!(rs.total().is_some());

        rs.add(-1);
        assert!(rs.total().is_none());

        rs.add(1);
        assert_eq!(rs.total(), Some(&i32::MIN));
    }

    /// Using `Decimal32` for its intended purpose.
    #[test]
    fn decimal_overflow() {
        let mut rs = RollingSum::<D1, 4>::default();

        rs.add(D1::MAX);
        assert!(rs.total().is_some());

        for _ in 0..100 {
            rs.add(D1::MAX);
            assert!(rs.total().is_none());
        }

        // Three zeros leave a single MAX in the four-slot window.
        for _ in 0..3 {
            rs.add(D1::ZERO);
        }

        // The fourth push evicts the last MAX.
        rs.add(D1::MIN_UNIT);
        assert!(matches!(rs.total(), Some(&D1::MIN_UNIT)));
    }

    /// Feeds inputs from an `(input, expected)` iterator into
    /// a RollingSum. Compares each total to `expected` and panics
    /// if they're not equal, or if the total is `None`.
    fn expect_total<T, const WINDOW: usize>(input_and_expected: impl Iterator<Item = (T, T)>)
    where
        T: WrappingAdd
            + WrappingSub
            + CheckedAdd
            + CheckedSub
            + PartialOrd
            + Copy
            + Default
            + Debug,
    {
        let mut roll: RollingSum<T, WINDOW> = RollingSum::default();
        for (input, expected) in input_and_expected {
            roll.add(input);
            assert_eq!(*roll.total().unwrap(), expected);
        }
    }
}