wickra_data/aggregator.rs
1//! Roll trade ticks up into candles of an arbitrary timeframe.
2
3use crate::error::{Error, Result};
4use wickra_core::{Candle, Tick};
5
/// Upper bound on how many flat placeholder candles one
/// [`TickAggregator::push`] call may emit while gap filling is enabled.
///
/// One million one-minute buckets cover roughly 1.9 years of contiguous
/// bars, which is far longer than any realistic hole in a production feed.
/// The bound also keeps the returned `Vec<Candle>` to a few tens of
/// megabytes (about 48 MB if a `Candle` is six 8-byte fields — an assumption
/// based on the six arguments of `Candle::new`; confirm against
/// `wickra_core`). A gap larger than this is reported as malformed input
/// instead of being allowed to exhaust memory.
pub const MAX_GAP_FILL_CANDLES: i64 = 1_000_000;
14
15/// A candle bucket size measured in the same unit as the tick timestamps.
16///
17/// Wickra is unit-agnostic about timestamps: choose whichever makes sense for
18/// your source (milliseconds for Binance trade events, microseconds for IB,
19/// seconds for daily bars).
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub struct Timeframe {
22 bucket: i64,
23}
24
25impl Timeframe {
26 /// Construct a timeframe with the given bucket size in the chosen unit.
27 ///
28 /// # Errors
29 /// Returns [`Error::InvalidTimeframe`] if `bucket <= 0`.
30 pub fn new(bucket: i64) -> Result<Self> {
31 if bucket <= 0 {
32 return Err(Error::InvalidTimeframe(format!(
33 "bucket size must be positive, got {bucket}"
34 )));
35 }
36 Ok(Self { bucket })
37 }
38
39 /// Convenience: build a millisecond timeframe.
40 pub fn millis(ms: i64) -> Result<Self> {
41 Self::new(ms)
42 }
43
44 /// Convenience: build a seconds-resolution timeframe.
45 pub fn seconds(s: i64) -> Result<Self> {
46 Self::new(s)
47 }
48
49 /// One-minute timeframe in milliseconds (`60_000`).
50 pub fn one_minute_ms() -> Self {
51 Self::new(60_000).expect("60_000 > 0")
52 }
53
54 /// Convenience: build a timeframe of `n` whole minutes, measured in
55 /// seconds — consistent with [`Timeframe::seconds`].
56 ///
57 /// `minutes(5)` yields a bucket of `300`, for use with second-resolution
58 /// timestamps. For millisecond timestamps (Binance) multiply yourself or
59 /// use [`Timeframe::millis`].
60 ///
61 /// # Errors
62 /// Returns [`Error::InvalidTimeframe`] if `n` is not positive or if
63 /// `n * 60` overflows `i64`.
64 ///
65 /// ```
66 /// use wickra_data::aggregator::Timeframe;
67 /// assert_eq!(Timeframe::minutes(5)?.bucket(), 300);
68 /// # Ok::<(), wickra_data::Error>(())
69 /// ```
70 pub fn minutes(n: i64) -> Result<Self> {
71 let bucket = n
72 .checked_mul(60)
73 .ok_or_else(|| Error::InvalidTimeframe(format!("{n} minutes overflows i64 seconds")))?;
74 Self::new(bucket)
75 }
76
77 /// Convenience: build a timeframe of `n` whole hours, measured in seconds
78 /// (`hours(2)` → a bucket of `7_200`).
79 ///
80 /// # Errors
81 /// Returns [`Error::InvalidTimeframe`] if `n` is not positive or if
82 /// `n * 3_600` overflows `i64`.
83 ///
84 /// ```
85 /// use wickra_data::aggregator::Timeframe;
86 /// assert_eq!(Timeframe::hours(2)?.bucket(), 7_200);
87 /// # Ok::<(), wickra_data::Error>(())
88 /// ```
89 pub fn hours(n: i64) -> Result<Self> {
90 let bucket = n
91 .checked_mul(3_600)
92 .ok_or_else(|| Error::InvalidTimeframe(format!("{n} hours overflows i64 seconds")))?;
93 Self::new(bucket)
94 }
95
96 /// Convenience: build a timeframe of `n` whole days, measured in seconds
97 /// (`days(1)` → a bucket of `86_400`).
98 ///
99 /// # Errors
100 /// Returns [`Error::InvalidTimeframe`] if `n` is not positive or if
101 /// `n * 86_400` overflows `i64`.
102 ///
103 /// ```
104 /// use wickra_data::aggregator::Timeframe;
105 /// assert_eq!(Timeframe::days(1)?.bucket(), 86_400);
106 /// # Ok::<(), wickra_data::Error>(())
107 /// ```
108 pub fn days(n: i64) -> Result<Self> {
109 let bucket = n
110 .checked_mul(86_400)
111 .ok_or_else(|| Error::InvalidTimeframe(format!("{n} days overflows i64 seconds")))?;
112 Self::new(bucket)
113 }
114
115 /// Bucket size.
116 pub const fn bucket(self) -> i64 {
117 self.bucket
118 }
119
120 /// Floor a raw timestamp to this timeframe's bucket boundary.
121 ///
122 /// For a timestamp within one bucket of [`i64::MIN`] the mathematically
123 /// exact boundary lies below `i64::MIN` and cannot be represented; in that
124 /// (practically unreachable) case the result saturates at `i64::MIN`
125 /// rather than overflowing and panicking in debug builds. `bucket` is
126 /// always positive, so `rem_euclid` itself cannot panic.
127 pub fn floor(self, ts: i64) -> i64 {
128 ts.saturating_sub(ts.rem_euclid(self.bucket))
129 }
130}
131
132/// Incrementally builds candles out of arriving ticks.
133///
134/// Each call to [`TickAggregator::push`] returns the candles that closed as a
135/// result of the new tick — normally at most one. Use
136/// [`TickAggregator::flush`] at the end of a stream to capture the final open
137/// bar.
138///
139/// # Gaps
140///
141/// By default a tick that jumps across one or more empty buckets simply opens
142/// the next non-empty bar — the skipped buckets produce no candle, so the
143/// output series can have time holes. Enable [`TickAggregator::with_gap_fill`]
144/// to instead emit a flat placeholder candle for every skipped bucket, giving
145/// downstream indicators an unbroken, evenly spaced series. To bound memory
146/// against an adversarial timestamp jump, gap-filling refuses to emit more
147/// than [`MAX_GAP_FILL_CANDLES`] placeholders in a single step; a larger gap
148/// surfaces as an `Error::Malformed` so the caller can decide how to handle
149/// the discontinuity.
150#[derive(Debug, Clone)]
151pub struct TickAggregator {
152 timeframe: Timeframe,
153 open_bar: Option<OpenBar>,
154 fill_gaps: bool,
155}
156
157#[derive(Debug, Clone, Copy)]
158struct OpenBar {
159 bucket_start: i64,
160 /// Timestamp of the most recently absorbed tick. Used to reject ticks that
161 /// arrive out of order *within* the current bucket — without it an older
162 /// tick would silently overwrite `close` with a stale price.
163 last_ts: i64,
164 open: f64,
165 high: f64,
166 low: f64,
167 close: f64,
168 volume: f64,
169}
170
171impl OpenBar {
172 fn from_tick(t: Tick, bucket_start: i64) -> Self {
173 Self {
174 bucket_start,
175 last_ts: t.timestamp,
176 open: t.price,
177 high: t.price,
178 low: t.price,
179 close: t.price,
180 volume: t.volume,
181 }
182 }
183
184 fn absorb(&mut self, t: Tick) {
185 if t.price > self.high {
186 self.high = t.price;
187 }
188 if t.price < self.low {
189 self.low = t.price;
190 }
191 self.close = t.price;
192 self.volume += t.volume;
193 self.last_ts = t.timestamp;
194 }
195
196 /// Finalise the bar into a validated [`Candle`].
197 ///
198 /// # Errors
199 /// Returns [`Error::Core`] if the accumulated `volume` is no longer finite.
200 /// `volume` is summed across every absorbed tick, so an astronomically
201 /// long or large run can drift it to `inf`; emitting such a candle would
202 /// silently poison every downstream indicator, so it is surfaced instead.
203 /// The OHLC fields are finite and correctly ordered by construction, so
204 /// `Candle::new` only ever rejects this bar for a non-finite volume.
205 fn into_candle(self) -> Result<Candle> {
206 Candle::new(
207 self.open,
208 self.high,
209 self.low,
210 self.close,
211 self.volume,
212 self.bucket_start,
213 )
214 .map_err(Error::from)
215 }
216}
217
218impl TickAggregator {
219 /// Construct a new aggregator for the given timeframe.
220 pub fn new(timeframe: Timeframe) -> Self {
221 Self {
222 timeframe,
223 open_bar: None,
224 fill_gaps: false,
225 }
226 }
227
228 /// Enable or disable gap filling, returning the (re)configured aggregator.
229 ///
230 /// When enabled, [`push`](Self::push) emits a flat candle
231 /// (`open == high == low == close`, `volume == 0`) for every bucket that is
232 /// skipped between two consecutive ticks. The flat candle's price is the
233 /// close of the bar that preceded the gap, so the series stays continuous.
234 #[must_use]
235 pub fn with_gap_fill(mut self, fill: bool) -> Self {
236 self.fill_gaps = fill;
237 self
238 }
239
240 /// Whether gap filling is enabled.
241 pub const fn fills_gaps(&self) -> bool {
242 self.fill_gaps
243 }
244
245 /// Push a tick. Returns every candle that closed as a result — an empty
246 /// vector while the open bar keeps growing, one candle when a bar boundary
247 /// is crossed, and (with gap filling enabled) additionally one flat candle
248 /// per skipped bucket.
249 ///
250 /// # Errors
251 /// Returns [`Error::Malformed`] if `tick.timestamp` goes backwards — both
252 /// across buckets (older than the open bar's start) and within a bucket
253 /// (older than the last tick absorbed into it) — or if gap filling
254 /// overflows the timestamp range. Ticks sharing a timestamp are accepted.
255 pub fn push(&mut self, tick: Tick) -> Result<Vec<Candle>> {
256 let bucket = self.timeframe.floor(tick.timestamp);
257 if let Some(mut bar) = self.open_bar {
258 if bucket < bar.bucket_start {
259 return Err(Error::Malformed(format!(
260 "tick timestamp {} is older than the open bar start {}",
261 tick.timestamp, bar.bucket_start
262 )));
263 }
264 if bucket > bar.bucket_start {
265 // Close the previous bar and start a new one with this tick.
266 let closed = bar.into_candle()?;
267 let mut out = Vec::with_capacity(1);
268 out.push(closed);
269 if self.fill_gaps {
270 self.fill_between(closed, bucket, &mut out)?;
271 }
272 self.open_bar = Some(OpenBar::from_tick(tick, bucket));
273 return Ok(out);
274 }
275 // Same bucket: reject a tick that predates the last one absorbed,
276 // which would otherwise overwrite `close` with a stale price.
277 // Equal timestamps are allowed — several trades can share a
278 // millisecond.
279 if tick.timestamp < bar.last_ts {
280 return Err(Error::Malformed(format!(
281 "tick timestamp {} predates the last tick {} in the same bucket",
282 tick.timestamp, bar.last_ts
283 )));
284 }
285 bar.absorb(tick);
286 self.open_bar = Some(bar);
287 return Ok(Vec::new());
288 }
289 self.open_bar = Some(OpenBar::from_tick(tick, bucket));
290 Ok(Vec::new())
291 }
292
293 /// Append a flat placeholder candle for every empty bucket strictly between
294 /// the just-closed bar and the next bucket that received a tick.
295 ///
296 /// Returns `Error::Malformed` when the gap would exceed
297 /// [`MAX_GAP_FILL_CANDLES`] — an adversarial timestamp jump (a clock-glitch
298 /// tick years in the future) must surface as a defined error, not as an
299 /// out-of-memory panic from allocating millions of placeholder candles.
300 fn fill_between(&self, prev: Candle, next_bucket: i64, out: &mut Vec<Candle>) -> Result<()> {
301 let step = self.timeframe.bucket();
302 let start = prev
303 .timestamp
304 .checked_add(step)
305 .ok_or_else(|| Error::Malformed("timestamp overflow while gap-filling".to_string()))?;
306 if start >= next_bucket {
307 return Ok(());
308 }
309
310 // Compute the gap size up-front so an adversarial timestamp delta
311 // is refused before we allocate. `step > 0` by `Timeframe::new`'s
312 // invariant, so the divisor is safe. Saturating the subtraction
313 // makes the arithmetic infallible; an overflowed-saturated span is
314 // still far above the cap so the limit check below catches it.
315 let span = next_bucket.saturating_sub(start);
316 let gap_count = span / step + i64::from(span % step != 0);
317
318 if gap_count > MAX_GAP_FILL_CANDLES {
319 return Err(Error::Malformed(format!(
320 "gap-fill between bucket {} and {next_bucket} would emit {gap_count} \
321 flat candles at step {step}, exceeding the {MAX_GAP_FILL_CANDLES} \
322 cap; reject the discontinuity instead of allocating",
323 prev.timestamp
324 )));
325 }
326
327 out.reserve(gap_count as usize);
328 // Bucket alignment guarantees start + (gap_count - 1) * step ≤
329 // next_bucket - step < i64::MAX, so iterating `gap_count` times
330 // with `saturating_add(step)` cannot reach i64::MAX inside the
331 // loop body. `prev.close` is finite (it came from a validated
332 // bar) and volume is exactly 0.0, so the OHLCV invariants hold
333 // by construction — skip re-validation via Candle::new_unchecked.
334 let mut t = start;
335 for _ in 0..gap_count {
336 out.push(Candle::new_unchecked(
337 prev.close, prev.close, prev.close, prev.close, 0.0, t,
338 ));
339 t = t.saturating_add(step);
340 }
341 Ok(())
342 }
343
344 /// Drain the currently open bar (if any) and return it. Useful at the end of
345 /// a backtest or when shutting down a live aggregator.
346 ///
347 /// # Errors
348 /// Returns an error if the open bar's accumulated volume is non-finite
349 /// (see [`OpenBar::into_candle`]).
350 pub fn flush(&mut self) -> Result<Option<Candle>> {
351 self.open_bar.take().map(OpenBar::into_candle).transpose()
352 }
353
354 /// Configured timeframe.
355 pub const fn timeframe(&self) -> Timeframe {
356 self.timeframe
357 }
358}
359
#[cfg(test)]
mod tests {
    use super::*;

    /// Unit-volume tick with the given price and timestamp.
    fn tick_at(price: f64, ts: i64) -> Tick {
        Tick::new(price, 1.0, ts).unwrap()
    }

    /// Aggregator over 60-unit buckets, gap filling off.
    fn agg60() -> TickAggregator {
        TickAggregator::new(Timeframe::new(60).unwrap())
    }

    /// Aggregator over 60-unit buckets, gap filling on.
    fn gap_filling_agg60() -> TickAggregator {
        agg60().with_gap_fill(true)
    }

    #[test]
    fn timeframe_rejects_non_positive() {
        for bucket in [0, -1] {
            assert!(Timeframe::new(bucket).is_err());
        }
    }

    /// Exercises the thin `millis`, `seconds` and `one_minute_ms`
    /// constructors, which the other tests never call.
    #[test]
    fn timeframe_convenience_constructors() {
        let quarter_second = Timeframe::millis(250).unwrap();
        assert_eq!(quarter_second.bucket(), 250);
        assert!(Timeframe::millis(0).is_err());

        let half_minute = Timeframe::seconds(30).unwrap();
        assert_eq!(half_minute.bucket(), 30);
        assert!(Timeframe::seconds(-1).is_err());

        // one_minute_ms is the infallible 60_000-ms shortcut.
        assert_eq!(Timeframe::one_minute_ms().bucket(), 60_000);
    }

    /// Exercises the `TickAggregator::timeframe` accessor; the other tests
    /// only look at emitted candles.
    #[test]
    fn aggregator_timeframe_getter() {
        let agg = agg60();
        assert_eq!(agg.timeframe().bucket(), 60);
    }

    #[test]
    fn minute_hour_day_constructors_compute_seconds() {
        let minutes = |n| Timeframe::minutes(n).unwrap().bucket();
        let hours = |n| Timeframe::hours(n).unwrap().bucket();
        let days = |n| Timeframe::days(n).unwrap().bucket();

        assert_eq!(minutes(1), 60);
        assert_eq!(minutes(5), 300);
        assert_eq!(hours(1), 3_600);
        assert_eq!(hours(4), 14_400);
        assert_eq!(days(1), 86_400);
        assert_eq!(days(7), 604_800);
    }

    #[test]
    fn minute_hour_day_constructors_reject_non_positive() {
        for n in [0, -1, -60] {
            assert!(Timeframe::minutes(n).is_err());
            assert!(Timeframe::hours(n).is_err());
            assert!(Timeframe::days(n).is_err());
        }
    }

    #[test]
    fn minute_hour_day_constructors_reject_overflow() {
        // `n * unit` overflows i64 long before `new`'s sign check runs.
        let overflowing = [
            Timeframe::minutes(i64::MAX),
            Timeframe::hours(i64::MAX),
            Timeframe::days(i64::MAX),
        ];
        for result in overflowing {
            assert!(matches!(result, Err(Error::InvalidTimeframe(_))));
        }
    }

    #[test]
    fn floors_to_bucket_boundary() {
        let tf = Timeframe::new(100).unwrap();
        // (timestamp, expected bucket start). The negative rows check that
        // flooring goes toward negative infinity.
        let cases = [
            (0, 0),
            (99, 0),
            (100, 100),
            (150, 100),
            (250, 200),
            (-1, -100),
            (-100, -100),
            (-101, -200),
        ];
        for (ts, expected) in cases {
            assert_eq!(tf.floor(ts), expected);
        }
    }

    #[test]
    fn floor_saturates_instead_of_overflowing_at_min() {
        let tf = Timeframe::new(100).unwrap();
        // The exact boundary lies below i64::MIN — must not panic.
        assert_eq!(tf.floor(i64::MIN), i64::MIN);
        // i64::MAX must not overflow either (subtracting a non-negative).
        let top = tf.floor(i64::MAX);
        assert!(top > i64::MAX - 100 && top % 100 == 0);
    }

    #[test]
    fn aggregates_ticks_into_one_candle_within_bucket() {
        let mut agg = agg60();
        for (price, ts) in [(10.0, 0), (12.0, 15), (8.0, 30), (11.0, 50)] {
            assert!(agg.push(tick_at(price, ts)).unwrap().is_empty());
        }
        let bar = agg.flush().unwrap().expect("open bar");
        assert_eq!(bar.open, 10.0);
        assert_eq!(bar.high, 12.0);
        assert_eq!(bar.low, 8.0);
        assert_eq!(bar.close, 11.0);
        assert!((bar.volume - 4.0).abs() < 1e-12);
        assert_eq!(bar.timestamp, 0);
    }

    #[test]
    fn emits_candle_on_bucket_crossing() {
        let mut agg = agg60();
        agg.push(tick_at(10.0, 0)).unwrap();
        agg.push(tick_at(12.0, 30)).unwrap();
        let emitted = agg.push(tick_at(15.0, 60)).unwrap();
        assert_eq!(emitted.len(), 1);
        let first_bar = emitted[0];
        assert_eq!(first_bar.open, 10.0);
        assert_eq!(first_bar.high, 12.0);
        assert_eq!(first_bar.low, 10.0);
        assert_eq!(first_bar.close, 12.0);

        // The new tick at ts=60 opens the next bar.
        let next_bar = agg.flush().unwrap().unwrap();
        assert_eq!(next_bar.open, 15.0);
        assert_eq!(next_bar.timestamp, 60);
    }

    #[test]
    fn rejects_out_of_order_ticks() {
        let mut agg = agg60();
        agg.push(tick_at(10.0, 100)).unwrap();
        let outcome = agg.push(tick_at(11.0, 30));
        assert!(matches!(outcome.unwrap_err(), Error::Malformed(_)));
    }

    #[test]
    fn rejects_same_bucket_out_of_order_tick() {
        let mut agg = agg60();
        agg.push(tick_at(10.0, 50)).unwrap();
        // ts=10 is still bucket 0 but predates the tick at ts=50 — rejecting
        // it prevents a stale price silently overwriting `close`.
        let outcome = agg.push(tick_at(99.0, 10));
        assert!(matches!(outcome.unwrap_err(), Error::Malformed(_)));
        // The open bar is untouched: close is still the ts=50 price.
        let bar = agg.flush().unwrap().unwrap();
        assert_eq!(bar.close, 10.0);
    }

    #[test]
    fn accepts_same_bucket_ticks_sharing_a_timestamp() {
        let mut agg = agg60();
        // Several trades in the same millisecond are legitimate.
        for price in [10.0, 12.0, 11.0] {
            agg.push(tick_at(price, 20)).unwrap();
        }
        let bar = agg.flush().unwrap().unwrap();
        assert_eq!(bar.high, 12.0);
        assert_eq!(bar.close, 11.0);
    }

    #[test]
    fn flushes_a_non_finite_volume_as_an_error() {
        let mut agg = agg60();
        // Two near-max volumes sum to +inf — the closed candle would carry a
        // non-finite volume that poisons every downstream indicator.
        for ts in [0, 1] {
            agg.push(Tick::new(10.0, f64::MAX, ts).unwrap()).unwrap();
        }
        let outcome = agg.flush();
        assert!(matches!(outcome.unwrap_err(), Error::Core(_)));
    }

    #[test]
    fn skips_empty_buckets_without_gap_fill() {
        let mut agg = agg60();
        assert!(!agg.fills_gaps());
        agg.push(tick_at(10.0, 0)).unwrap();
        // Jump from bucket 0 straight to bucket 180 — buckets 60 and 120 empty.
        let emitted = agg.push(tick_at(20.0, 200)).unwrap();
        assert_eq!(emitted.len(), 1, "only the real bar closes");
        assert_eq!(emitted[0].timestamp, 0);
    }

    #[test]
    fn gap_fill_rejects_runaway_timestamp_jump() {
        // An adversarial clock-glitch tick years in the future must surface
        // as an Error::Malformed rather than allocating millions of flat
        // candles and OOMing. Found by the `tick_aggregator` fuzz target.
        let mut agg = gap_filling_agg60();
        agg.push(tick_at(10.0, 0)).unwrap();
        // Two-billion-second jump = ~63 years of minute bars = ~33 million
        // candles, well above the 1_000_000 cap.
        let msg = agg
            .push(tick_at(20.0, 2_000_000_000))
            .unwrap_err()
            .to_string();
        assert!(
            msg.contains("gap-fill") && msg.contains("cap"),
            "expected a malformed-gap error, got: {msg}"
        );
    }

    #[test]
    fn gap_fill_at_the_cap_succeeds() {
        // Exactly one million empty minute-buckets lie between the two ticks.
        // The limit is inclusive, so this must succeed.
        let mut agg = gap_filling_agg60();
        agg.push(tick_at(10.0, 0)).unwrap();
        // Bucket 0 closes; the second tick lands in bucket 60_000_060, which
        // is 1_000_001 buckets away. fill_between emits 1_000_000 flat
        // candles in between and the second tick opens its own bucket.
        // Output: 1 real bar + 1_000_000 fillers.
        let emitted = agg.push(tick_at(20.0, 60_000_060)).unwrap();
        assert_eq!(emitted.len(), 1 + MAX_GAP_FILL_CANDLES as usize);
    }

    #[test]
    fn gap_fill_emits_flat_candles_for_skipped_buckets() {
        let mut agg = gap_filling_agg60();
        assert!(agg.fills_gaps());
        agg.push(tick_at(10.0, 0)).unwrap();
        agg.push(tick_at(13.0, 30)).unwrap(); // still bucket 0, close = 13.0
        // Next tick lands in bucket 180 — buckets 60 and 120 are skipped.
        let emitted = agg.push(tick_at(20.0, 200)).unwrap();
        assert_eq!(emitted.len(), 3, "real bar + two flat fillers");

        let real = emitted[0];
        assert_eq!(real.timestamp, 0);
        assert_eq!(real.close, 13.0);

        for (filler, ts) in emitted.iter().skip(1).zip([60, 120]) {
            assert_eq!(filler.timestamp, ts);
            assert_eq!(filler.open, 13.0);
            assert_eq!(filler.high, 13.0);
            assert_eq!(filler.low, 13.0);
            assert_eq!(filler.close, 13.0);
            assert_eq!(filler.volume, 0.0);
        }

        // The tick at ts=200 opens bucket 180.
        let open = agg.flush().unwrap().unwrap();
        assert_eq!(open.timestamp, 180);
    }

    #[test]
    fn gap_fill_emits_nothing_extra_for_adjacent_buckets() {
        let mut agg = gap_filling_agg60();
        agg.push(tick_at(10.0, 0)).unwrap();
        // Bucket 60 directly follows bucket 0 — no gap to fill.
        let emitted = agg.push(tick_at(11.0, 70)).unwrap();
        assert_eq!(emitted.len(), 1);
        assert_eq!(emitted[0].timestamp, 0);
    }
}
614}