Skip to main content

wickra_data/
aggregator.rs

1//! Roll trade ticks up into candles of an arbitrary timeframe.
2
3use crate::error::{Error, Result};
4use wickra_core::{Candle, Tick};
5
6/// Hard cap on the number of placeholder candles a single
7/// [`TickAggregator::push`] call may emit when gap-fill is enabled. One
8/// million minute-candles is roughly 1.9 years of contiguous one-minute bars
9/// — orders of magnitude beyond any realistic missing-data window in
10/// production while still keeping the resulting `Vec<Candle>` to well under
11/// 50 MB. Any larger gap is treated as malformed input rather than allowed
12/// to OOM the process.
13pub const MAX_GAP_FILL_CANDLES: i64 = 1_000_000;
14
15/// A candle bucket size measured in the same unit as the tick timestamps.
16///
17/// Wickra is unit-agnostic about timestamps: choose whichever makes sense for
18/// your source (milliseconds for Binance trade events, microseconds for IB,
19/// seconds for daily bars).
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub struct Timeframe {
22    bucket: i64,
23}
24
25impl Timeframe {
26    /// Construct a timeframe with the given bucket size in the chosen unit.
27    ///
28    /// # Errors
29    /// Returns [`Error::InvalidTimeframe`] if `bucket <= 0`.
30    pub fn new(bucket: i64) -> Result<Self> {
31        if bucket <= 0 {
32            return Err(Error::InvalidTimeframe(format!(
33                "bucket size must be positive, got {bucket}"
34            )));
35        }
36        Ok(Self { bucket })
37    }
38
39    /// Convenience: build a millisecond timeframe.
40    pub fn millis(ms: i64) -> Result<Self> {
41        Self::new(ms)
42    }
43
44    /// Convenience: build a seconds-resolution timeframe.
45    pub fn seconds(s: i64) -> Result<Self> {
46        Self::new(s)
47    }
48
49    /// One-minute timeframe in milliseconds (`60_000`).
50    pub fn one_minute_ms() -> Self {
51        Self::new(60_000).expect("60_000 > 0")
52    }
53
54    /// Convenience: build a timeframe of `n` whole minutes, measured in
55    /// seconds — consistent with [`Timeframe::seconds`].
56    ///
57    /// `minutes(5)` yields a bucket of `300`, for use with second-resolution
58    /// timestamps. For millisecond timestamps (Binance) multiply yourself or
59    /// use [`Timeframe::millis`].
60    ///
61    /// # Errors
62    /// Returns [`Error::InvalidTimeframe`] if `n` is not positive or if
63    /// `n * 60` overflows `i64`.
64    ///
65    /// ```
66    /// use wickra_data::aggregator::Timeframe;
67    /// assert_eq!(Timeframe::minutes(5)?.bucket(), 300);
68    /// # Ok::<(), wickra_data::Error>(())
69    /// ```
70    pub fn minutes(n: i64) -> Result<Self> {
71        let bucket = n
72            .checked_mul(60)
73            .ok_or_else(|| Error::InvalidTimeframe(format!("{n} minutes overflows i64 seconds")))?;
74        Self::new(bucket)
75    }
76
77    /// Convenience: build a timeframe of `n` whole hours, measured in seconds
78    /// (`hours(2)` → a bucket of `7_200`).
79    ///
80    /// # Errors
81    /// Returns [`Error::InvalidTimeframe`] if `n` is not positive or if
82    /// `n * 3_600` overflows `i64`.
83    ///
84    /// ```
85    /// use wickra_data::aggregator::Timeframe;
86    /// assert_eq!(Timeframe::hours(2)?.bucket(), 7_200);
87    /// # Ok::<(), wickra_data::Error>(())
88    /// ```
89    pub fn hours(n: i64) -> Result<Self> {
90        let bucket = n
91            .checked_mul(3_600)
92            .ok_or_else(|| Error::InvalidTimeframe(format!("{n} hours overflows i64 seconds")))?;
93        Self::new(bucket)
94    }
95
96    /// Convenience: build a timeframe of `n` whole days, measured in seconds
97    /// (`days(1)` → a bucket of `86_400`).
98    ///
99    /// # Errors
100    /// Returns [`Error::InvalidTimeframe`] if `n` is not positive or if
101    /// `n * 86_400` overflows `i64`.
102    ///
103    /// ```
104    /// use wickra_data::aggregator::Timeframe;
105    /// assert_eq!(Timeframe::days(1)?.bucket(), 86_400);
106    /// # Ok::<(), wickra_data::Error>(())
107    /// ```
108    pub fn days(n: i64) -> Result<Self> {
109        let bucket = n
110            .checked_mul(86_400)
111            .ok_or_else(|| Error::InvalidTimeframe(format!("{n} days overflows i64 seconds")))?;
112        Self::new(bucket)
113    }
114
115    /// Bucket size.
116    pub const fn bucket(self) -> i64 {
117        self.bucket
118    }
119
120    /// Floor a raw timestamp to this timeframe's bucket boundary.
121    ///
122    /// For a timestamp within one bucket of [`i64::MIN`] the mathematically
123    /// exact boundary lies below `i64::MIN` and cannot be represented; in that
124    /// (practically unreachable) case the result saturates at `i64::MIN`
125    /// rather than overflowing and panicking in debug builds. `bucket` is
126    /// always positive, so `rem_euclid` itself cannot panic.
127    pub fn floor(self, ts: i64) -> i64 {
128        ts.saturating_sub(ts.rem_euclid(self.bucket))
129    }
130}
131
132/// Incrementally builds candles out of arriving ticks.
133///
134/// Each call to [`TickAggregator::push`] returns the candles that closed as a
135/// result of the new tick — normally at most one. Use
136/// [`TickAggregator::flush`] at the end of a stream to capture the final open
137/// bar.
138///
139/// # Gaps
140///
141/// By default a tick that jumps across one or more empty buckets simply opens
142/// the next non-empty bar — the skipped buckets produce no candle, so the
143/// output series can have time holes. Enable [`TickAggregator::with_gap_fill`]
144/// to instead emit a flat placeholder candle for every skipped bucket, giving
145/// downstream indicators an unbroken, evenly spaced series. To bound memory
146/// against an adversarial timestamp jump, gap-filling refuses to emit more
147/// than [`MAX_GAP_FILL_CANDLES`] placeholders in a single step; a larger gap
148/// surfaces as an `Error::Malformed` so the caller can decide how to handle
149/// the discontinuity.
150#[derive(Debug, Clone)]
151pub struct TickAggregator {
152    timeframe: Timeframe,
153    open_bar: Option<OpenBar>,
154    fill_gaps: bool,
155}
156
157#[derive(Debug, Clone, Copy)]
158struct OpenBar {
159    bucket_start: i64,
160    /// Timestamp of the most recently absorbed tick. Used to reject ticks that
161    /// arrive out of order *within* the current bucket — without it an older
162    /// tick would silently overwrite `close` with a stale price.
163    last_ts: i64,
164    open: f64,
165    high: f64,
166    low: f64,
167    close: f64,
168    volume: f64,
169}
170
171impl OpenBar {
172    fn from_tick(t: Tick, bucket_start: i64) -> Self {
173        Self {
174            bucket_start,
175            last_ts: t.timestamp,
176            open: t.price,
177            high: t.price,
178            low: t.price,
179            close: t.price,
180            volume: t.volume,
181        }
182    }
183
184    fn absorb(&mut self, t: Tick) {
185        if t.price > self.high {
186            self.high = t.price;
187        }
188        if t.price < self.low {
189            self.low = t.price;
190        }
191        self.close = t.price;
192        self.volume += t.volume;
193        self.last_ts = t.timestamp;
194    }
195
196    /// Finalise the bar into a validated [`Candle`].
197    ///
198    /// # Errors
199    /// Returns [`Error::Core`] if the accumulated `volume` is no longer finite.
200    /// `volume` is summed across every absorbed tick, so an astronomically
201    /// long or large run can drift it to `inf`; emitting such a candle would
202    /// silently poison every downstream indicator, so it is surfaced instead.
203    /// The OHLC fields are finite and correctly ordered by construction, so
204    /// `Candle::new` only ever rejects this bar for a non-finite volume.
205    fn into_candle(self) -> Result<Candle> {
206        Candle::new(
207            self.open,
208            self.high,
209            self.low,
210            self.close,
211            self.volume,
212            self.bucket_start,
213        )
214        .map_err(Error::from)
215    }
216}
217
218impl TickAggregator {
219    /// Construct a new aggregator for the given timeframe.
220    pub fn new(timeframe: Timeframe) -> Self {
221        Self {
222            timeframe,
223            open_bar: None,
224            fill_gaps: false,
225        }
226    }
227
228    /// Enable or disable gap filling, returning the (re)configured aggregator.
229    ///
230    /// When enabled, [`push`](Self::push) emits a flat candle
231    /// (`open == high == low == close`, `volume == 0`) for every bucket that is
232    /// skipped between two consecutive ticks. The flat candle's price is the
233    /// close of the bar that preceded the gap, so the series stays continuous.
234    #[must_use]
235    pub fn with_gap_fill(mut self, fill: bool) -> Self {
236        self.fill_gaps = fill;
237        self
238    }
239
240    /// Whether gap filling is enabled.
241    pub const fn fills_gaps(&self) -> bool {
242        self.fill_gaps
243    }
244
245    /// Push a tick. Returns every candle that closed as a result — an empty
246    /// vector while the open bar keeps growing, one candle when a bar boundary
247    /// is crossed, and (with gap filling enabled) additionally one flat candle
248    /// per skipped bucket.
249    ///
250    /// # Errors
251    /// Returns [`Error::Malformed`] if `tick.timestamp` goes backwards — both
252    /// across buckets (older than the open bar's start) and within a bucket
253    /// (older than the last tick absorbed into it) — or if gap filling
254    /// overflows the timestamp range. Ticks sharing a timestamp are accepted.
255    pub fn push(&mut self, tick: Tick) -> Result<Vec<Candle>> {
256        let bucket = self.timeframe.floor(tick.timestamp);
257        if let Some(mut bar) = self.open_bar {
258            if bucket < bar.bucket_start {
259                return Err(Error::Malformed(format!(
260                    "tick timestamp {} is older than the open bar start {}",
261                    tick.timestamp, bar.bucket_start
262                )));
263            }
264            if bucket > bar.bucket_start {
265                // Close the previous bar and start a new one with this tick.
266                let closed = bar.into_candle()?;
267                let mut out = Vec::with_capacity(1);
268                out.push(closed);
269                if self.fill_gaps {
270                    self.fill_between(closed, bucket, &mut out)?;
271                }
272                self.open_bar = Some(OpenBar::from_tick(tick, bucket));
273                return Ok(out);
274            }
275            // Same bucket: reject a tick that predates the last one absorbed,
276            // which would otherwise overwrite `close` with a stale price.
277            // Equal timestamps are allowed — several trades can share a
278            // millisecond.
279            if tick.timestamp < bar.last_ts {
280                return Err(Error::Malformed(format!(
281                    "tick timestamp {} predates the last tick {} in the same bucket",
282                    tick.timestamp, bar.last_ts
283                )));
284            }
285            bar.absorb(tick);
286            self.open_bar = Some(bar);
287            return Ok(Vec::new());
288        }
289        self.open_bar = Some(OpenBar::from_tick(tick, bucket));
290        Ok(Vec::new())
291    }
292
293    /// Append a flat placeholder candle for every empty bucket strictly between
294    /// the just-closed bar and the next bucket that received a tick.
295    ///
296    /// Returns `Error::Malformed` when the gap would exceed
297    /// [`MAX_GAP_FILL_CANDLES`] — an adversarial timestamp jump (a clock-glitch
298    /// tick years in the future) must surface as a defined error, not as an
299    /// out-of-memory panic from allocating millions of placeholder candles.
300    fn fill_between(&self, prev: Candle, next_bucket: i64, out: &mut Vec<Candle>) -> Result<()> {
301        let step = self.timeframe.bucket();
302        let start = prev
303            .timestamp
304            .checked_add(step)
305            .ok_or_else(|| Error::Malformed("timestamp overflow while gap-filling".to_string()))?;
306        if start >= next_bucket {
307            return Ok(());
308        }
309
310        // Compute the gap size up-front so an adversarial timestamp delta
311        // is refused before we allocate. `step > 0` by `Timeframe::new`'s
312        // invariant, so the divisor is safe. Saturating the subtraction
313        // makes the arithmetic infallible; an overflowed-saturated span is
314        // still far above the cap so the limit check below catches it.
315        let span = next_bucket.saturating_sub(start);
316        let gap_count = span / step + i64::from(span % step != 0);
317
318        if gap_count > MAX_GAP_FILL_CANDLES {
319            return Err(Error::Malformed(format!(
320                "gap-fill between bucket {} and {next_bucket} would emit {gap_count} \
321                 flat candles at step {step}, exceeding the {MAX_GAP_FILL_CANDLES} \
322                 cap; reject the discontinuity instead of allocating",
323                prev.timestamp
324            )));
325        }
326
327        out.reserve(gap_count as usize);
328        // Bucket alignment guarantees start + (gap_count - 1) * step ≤
329        // next_bucket - step < i64::MAX, so iterating `gap_count` times
330        // with `saturating_add(step)` cannot reach i64::MAX inside the
331        // loop body. `prev.close` is finite (it came from a validated
332        // bar) and volume is exactly 0.0, so the OHLCV invariants hold
333        // by construction — skip re-validation via Candle::new_unchecked.
334        let mut t = start;
335        for _ in 0..gap_count {
336            out.push(Candle::new_unchecked(
337                prev.close, prev.close, prev.close, prev.close, 0.0, t,
338            ));
339            t = t.saturating_add(step);
340        }
341        Ok(())
342    }
343
344    /// Drain the currently open bar (if any) and return it. Useful at the end of
345    /// a backtest or when shutting down a live aggregator.
346    ///
347    /// # Errors
348    /// Returns an error if the open bar's accumulated volume is non-finite
349    /// (see [`OpenBar::into_candle`]).
350    pub fn flush(&mut self) -> Result<Option<Candle>> {
351        self.open_bar.take().map(OpenBar::into_candle).transpose()
352    }
353
354    /// Configured timeframe.
355    pub const fn timeframe(&self) -> Timeframe {
356        self.timeframe
357    }
358}
359
360#[cfg(test)]
361mod tests {
362    use super::*;
363
364    fn t(price: f64, ts: i64) -> Tick {
365        Tick::new(price, 1.0, ts).unwrap()
366    }
367
368    #[test]
369    fn timeframe_rejects_non_positive() {
370        assert!(Timeframe::new(0).is_err());
371        assert!(Timeframe::new(-1).is_err());
372    }
373
374    /// Cover the `Timeframe::millis`, `Timeframe::seconds`, and
375    /// `Timeframe::one_minute_ms` convenience constructors (lines 40-52).
376    /// All existing tests build Timeframes via `new` / `minutes` / `hours` /
377    /// `days`, never via the three thin convenience wrappers.
378    #[test]
379    fn timeframe_convenience_constructors() {
380        assert_eq!(Timeframe::millis(250).unwrap().bucket(), 250);
381        assert!(Timeframe::millis(0).is_err());
382        assert_eq!(Timeframe::seconds(30).unwrap().bucket(), 30);
383        assert!(Timeframe::seconds(-1).is_err());
384        // one_minute_ms is the infallible 60_000-ms shortcut.
385        assert_eq!(Timeframe::one_minute_ms().bucket(), 60_000);
386    }
387
388    /// Cover the `TickAggregator::timeframe` const accessor (lines 353-355).
389    /// Existing tests only inspect emitted candles, never query the
390    /// configured timeframe back out.
391    #[test]
392    fn aggregator_timeframe_getter() {
393        let tf = Timeframe::new(60).unwrap();
394        let agg = TickAggregator::new(tf);
395        assert_eq!(agg.timeframe().bucket(), 60);
396    }
397
398    #[test]
399    fn minute_hour_day_constructors_compute_seconds() {
400        assert_eq!(Timeframe::minutes(1).unwrap().bucket(), 60);
401        assert_eq!(Timeframe::minutes(5).unwrap().bucket(), 300);
402        assert_eq!(Timeframe::hours(1).unwrap().bucket(), 3_600);
403        assert_eq!(Timeframe::hours(4).unwrap().bucket(), 14_400);
404        assert_eq!(Timeframe::days(1).unwrap().bucket(), 86_400);
405        assert_eq!(Timeframe::days(7).unwrap().bucket(), 604_800);
406    }
407
408    #[test]
409    fn minute_hour_day_constructors_reject_non_positive() {
410        for n in [0, -1, -60] {
411            assert!(Timeframe::minutes(n).is_err());
412            assert!(Timeframe::hours(n).is_err());
413            assert!(Timeframe::days(n).is_err());
414        }
415    }
416
417    #[test]
418    fn minute_hour_day_constructors_reject_overflow() {
419        // `n * unit` overflows i64 long before `new`'s sign check runs.
420        assert!(matches!(
421            Timeframe::minutes(i64::MAX),
422            Err(Error::InvalidTimeframe(_))
423        ));
424        assert!(matches!(
425            Timeframe::hours(i64::MAX),
426            Err(Error::InvalidTimeframe(_))
427        ));
428        assert!(matches!(
429            Timeframe::days(i64::MAX),
430            Err(Error::InvalidTimeframe(_))
431        ));
432    }
433
434    #[test]
435    fn floors_to_bucket_boundary() {
436        let tf = Timeframe::new(100).unwrap();
437        assert_eq!(tf.floor(0), 0);
438        assert_eq!(tf.floor(99), 0);
439        assert_eq!(tf.floor(100), 100);
440        assert_eq!(tf.floor(150), 100);
441        assert_eq!(tf.floor(250), 200);
442        // Negative timestamps still floor toward negative infinity.
443        assert_eq!(tf.floor(-1), -100);
444        assert_eq!(tf.floor(-100), -100);
445        assert_eq!(tf.floor(-101), -200);
446    }
447
448    #[test]
449    fn floor_saturates_instead_of_overflowing_at_min() {
450        let tf = Timeframe::new(100).unwrap();
451        // The exact boundary lies below i64::MIN — must not panic.
452        assert_eq!(tf.floor(i64::MIN), i64::MIN);
453        // i64::MAX must not overflow either (subtracting a non-negative).
454        let hi = tf.floor(i64::MAX);
455        assert!(hi > i64::MAX - 100 && hi % 100 == 0);
456    }
457
458    #[test]
459    fn aggregates_ticks_into_one_candle_within_bucket() {
460        let mut agg = TickAggregator::new(Timeframe::new(60).unwrap());
461        assert!(agg.push(t(10.0, 0)).unwrap().is_empty());
462        assert!(agg.push(t(12.0, 15)).unwrap().is_empty());
463        assert!(agg.push(t(8.0, 30)).unwrap().is_empty());
464        assert!(agg.push(t(11.0, 50)).unwrap().is_empty());
465        let bar = agg.flush().unwrap().expect("open bar");
466        assert_eq!(bar.open, 10.0);
467        assert_eq!(bar.high, 12.0);
468        assert_eq!(bar.low, 8.0);
469        assert_eq!(bar.close, 11.0);
470        assert!((bar.volume - 4.0).abs() < 1e-12);
471        assert_eq!(bar.timestamp, 0);
472    }
473
474    #[test]
475    fn emits_candle_on_bucket_crossing() {
476        let mut agg = TickAggregator::new(Timeframe::new(60).unwrap());
477        agg.push(t(10.0, 0)).unwrap();
478        agg.push(t(12.0, 30)).unwrap();
479        let closed = agg.push(t(15.0, 60)).unwrap();
480        assert_eq!(closed.len(), 1);
481        let closed = closed[0];
482        assert_eq!(closed.open, 10.0);
483        assert_eq!(closed.high, 12.0);
484        assert_eq!(closed.low, 10.0);
485        assert_eq!(closed.close, 12.0);
486
487        // The new tick at ts=60 opens the next bar.
488        let still_open = agg.flush().unwrap().unwrap();
489        assert_eq!(still_open.open, 15.0);
490        assert_eq!(still_open.timestamp, 60);
491    }
492
493    #[test]
494    fn rejects_out_of_order_ticks() {
495        let mut agg = TickAggregator::new(Timeframe::new(60).unwrap());
496        agg.push(t(10.0, 100)).unwrap();
497        let err = agg.push(t(11.0, 30)).unwrap_err();
498        assert!(matches!(err, Error::Malformed(_)));
499    }
500
501    #[test]
502    fn rejects_same_bucket_out_of_order_tick() {
503        let mut agg = TickAggregator::new(Timeframe::new(60).unwrap());
504        agg.push(t(10.0, 50)).unwrap();
505        // ts=10 is still bucket 0 but predates the tick at ts=50 — rejecting
506        // it prevents a stale price silently overwriting `close`.
507        let err = agg.push(t(99.0, 10)).unwrap_err();
508        assert!(matches!(err, Error::Malformed(_)));
509        // The open bar is untouched: close is still the ts=50 price.
510        assert_eq!(agg.flush().unwrap().unwrap().close, 10.0);
511    }
512
513    #[test]
514    fn accepts_same_bucket_ticks_sharing_a_timestamp() {
515        let mut agg = TickAggregator::new(Timeframe::new(60).unwrap());
516        agg.push(t(10.0, 20)).unwrap();
517        // Two trades in the same millisecond are legitimate.
518        agg.push(t(12.0, 20)).unwrap();
519        agg.push(t(11.0, 20)).unwrap();
520        let bar = agg.flush().unwrap().unwrap();
521        assert_eq!(bar.high, 12.0);
522        assert_eq!(bar.close, 11.0);
523    }
524
525    #[test]
526    fn flushes_a_non_finite_volume_as_an_error() {
527        let mut agg = TickAggregator::new(Timeframe::new(60).unwrap());
528        // Two near-max volumes sum to +inf — the closed candle would carry a
529        // non-finite volume that poisons every downstream indicator.
530        agg.push(Tick::new(10.0, f64::MAX, 0).unwrap()).unwrap();
531        agg.push(Tick::new(10.0, f64::MAX, 1).unwrap()).unwrap();
532        let err = agg.flush().unwrap_err();
533        assert!(matches!(err, Error::Core(_)));
534    }
535
536    #[test]
537    fn skips_empty_buckets_without_gap_fill() {
538        let mut agg = TickAggregator::new(Timeframe::new(60).unwrap());
539        assert!(!agg.fills_gaps());
540        agg.push(t(10.0, 0)).unwrap();
541        // Jump from bucket 0 straight to bucket 180 — buckets 60 and 120 empty.
542        let closed = agg.push(t(20.0, 200)).unwrap();
543        assert_eq!(closed.len(), 1, "only the real bar closes");
544        assert_eq!(closed[0].timestamp, 0);
545    }
546
547    #[test]
548    fn gap_fill_rejects_runaway_timestamp_jump() {
549        // An adversarial clock-glitch tick years in the future must surface
550        // as an Error::Malformed rather than allocating millions of flat
551        // candles and OOMing. Found by the `tick_aggregator` fuzz target.
552        let mut agg = TickAggregator::new(Timeframe::new(60).unwrap()).with_gap_fill(true);
553        agg.push(t(10.0, 0)).unwrap();
554        // Two-billion-second jump = ~63 years of minute bars = ~33 million
555        // candles, well above the 1_000_000 cap.
556        let err = agg.push(t(20.0, 2_000_000_000)).unwrap_err();
557        let msg = err.to_string();
558        assert!(
559            msg.contains("gap-fill") && msg.contains("cap"),
560            "expected a malformed-gap error, got: {msg}"
561        );
562    }
563
564    #[test]
565    fn gap_fill_at_the_cap_succeeds() {
566        // Exactly one million minute-buckets between the two ticks (one real
567        // bar + one million flat fillers + the third tick's open bar) — the
568        // limit is inclusive, so this must succeed.
569        let mut agg = TickAggregator::new(Timeframe::new(60).unwrap()).with_gap_fill(true);
570        agg.push(t(10.0, 0)).unwrap();
571        // bucket 0 closes; jump straight to bucket 60_000_060 (1_000_001 buckets
572        // away). fill_between emits 1_000_000 flat candles between them, then
573        // the new tick opens its own bucket. Output: 1 real bar + 1_000_000 fillers.
574        let out = agg.push(t(20.0, 60_000_060)).unwrap();
575        assert_eq!(out.len(), 1 + MAX_GAP_FILL_CANDLES as usize);
576    }
577
578    #[test]
579    fn gap_fill_emits_flat_candles_for_skipped_buckets() {
580        let mut agg = TickAggregator::new(Timeframe::new(60).unwrap()).with_gap_fill(true);
581        assert!(agg.fills_gaps());
582        agg.push(t(10.0, 0)).unwrap();
583        agg.push(t(13.0, 30)).unwrap(); // still bucket 0, close = 13.0
584                                        // Next tick lands in bucket 180 — buckets 60 and 120 are skipped.
585        let out = agg.push(t(20.0, 200)).unwrap();
586        assert_eq!(out.len(), 3, "real bar + two flat fillers");
587
588        let real = out[0];
589        assert_eq!(real.timestamp, 0);
590        assert_eq!(real.close, 13.0);
591
592        for (filler, ts) in out[1..].iter().zip([60, 120]) {
593            assert_eq!(filler.timestamp, ts);
594            assert_eq!(filler.open, 13.0);
595            assert_eq!(filler.high, 13.0);
596            assert_eq!(filler.low, 13.0);
597            assert_eq!(filler.close, 13.0);
598            assert_eq!(filler.volume, 0.0);
599        }
600
601        // The tick at ts=200 opens bucket 180.
602        assert_eq!(agg.flush().unwrap().unwrap().timestamp, 180);
603    }
604
605    #[test]
606    fn gap_fill_emits_nothing_extra_for_adjacent_buckets() {
607        let mut agg = TickAggregator::new(Timeframe::new(60).unwrap()).with_gap_fill(true);
608        agg.push(t(10.0, 0)).unwrap();
609        // Bucket 60 directly follows bucket 0 — no gap to fill.
610        let out = agg.push(t(11.0, 70)).unwrap();
611        assert_eq!(out.len(), 1);
612        assert_eq!(out[0].timestamp, 0);
613    }
614}