use crate::error::{Error, Result};
use wickra_core::{Candle, Tick};
pub const MAX_GAP_FILL_CANDLES: i64 = 1_000_000;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Timeframe {
bucket: i64,
}
impl Timeframe {
pub fn new(bucket: i64) -> Result<Self> {
if bucket <= 0 {
return Err(Error::InvalidTimeframe(format!(
"bucket size must be positive, got {bucket}"
)));
}
Ok(Self { bucket })
}
pub fn millis(ms: i64) -> Result<Self> {
Self::new(ms)
}
pub fn seconds(s: i64) -> Result<Self> {
Self::new(s)
}
pub fn one_minute_ms() -> Self {
Self::new(60_000).expect("60_000 > 0")
}
pub fn minutes(n: i64) -> Result<Self> {
let bucket = n
.checked_mul(60)
.ok_or_else(|| Error::InvalidTimeframe(format!("{n} minutes overflows i64 seconds")))?;
Self::new(bucket)
}
pub fn hours(n: i64) -> Result<Self> {
let bucket = n
.checked_mul(3_600)
.ok_or_else(|| Error::InvalidTimeframe(format!("{n} hours overflows i64 seconds")))?;
Self::new(bucket)
}
pub fn days(n: i64) -> Result<Self> {
let bucket = n
.checked_mul(86_400)
.ok_or_else(|| Error::InvalidTimeframe(format!("{n} days overflows i64 seconds")))?;
Self::new(bucket)
}
pub const fn bucket(self) -> i64 {
self.bucket
}
pub fn floor(self, ts: i64) -> i64 {
ts.saturating_sub(ts.rem_euclid(self.bucket))
}
}
#[derive(Debug, Clone)]
pub struct TickAggregator {
timeframe: Timeframe,
open_bar: Option<OpenBar>,
fill_gaps: bool,
}
#[derive(Debug, Clone, Copy)]
struct OpenBar {
bucket_start: i64,
last_ts: i64,
open: f64,
high: f64,
low: f64,
close: f64,
volume: f64,
}
impl OpenBar {
fn from_tick(t: Tick, bucket_start: i64) -> Self {
Self {
bucket_start,
last_ts: t.timestamp,
open: t.price,
high: t.price,
low: t.price,
close: t.price,
volume: t.volume,
}
}
fn absorb(&mut self, t: Tick) {
if t.price > self.high {
self.high = t.price;
}
if t.price < self.low {
self.low = t.price;
}
self.close = t.price;
self.volume += t.volume;
self.last_ts = t.timestamp;
}
fn into_candle(self) -> Result<Candle> {
Candle::new(
self.open,
self.high,
self.low,
self.close,
self.volume,
self.bucket_start,
)
.map_err(Error::from)
}
}
impl TickAggregator {
pub fn new(timeframe: Timeframe) -> Self {
Self {
timeframe,
open_bar: None,
fill_gaps: false,
}
}
#[must_use]
pub fn with_gap_fill(mut self, fill: bool) -> Self {
self.fill_gaps = fill;
self
}
pub const fn fills_gaps(&self) -> bool {
self.fill_gaps
}
pub fn push(&mut self, tick: Tick) -> Result<Vec<Candle>> {
let bucket = self.timeframe.floor(tick.timestamp);
if let Some(mut bar) = self.open_bar {
if bucket < bar.bucket_start {
return Err(Error::Malformed(format!(
"tick timestamp {} is older than the open bar start {}",
tick.timestamp, bar.bucket_start
)));
}
if bucket > bar.bucket_start {
let closed = bar.into_candle()?;
let mut out = Vec::with_capacity(1);
out.push(closed);
if self.fill_gaps {
self.fill_between(closed, bucket, &mut out)?;
}
self.open_bar = Some(OpenBar::from_tick(tick, bucket));
return Ok(out);
}
if tick.timestamp < bar.last_ts {
return Err(Error::Malformed(format!(
"tick timestamp {} predates the last tick {} in the same bucket",
tick.timestamp, bar.last_ts
)));
}
bar.absorb(tick);
self.open_bar = Some(bar);
return Ok(Vec::new());
}
self.open_bar = Some(OpenBar::from_tick(tick, bucket));
Ok(Vec::new())
}
fn fill_between(&self, prev: Candle, next_bucket: i64, out: &mut Vec<Candle>) -> Result<()> {
let step = self.timeframe.bucket();
let start = prev
.timestamp
.checked_add(step)
.ok_or_else(|| Error::Malformed("timestamp overflow while gap-filling".to_string()))?;
if start >= next_bucket {
return Ok(());
}
let span = next_bucket.saturating_sub(start);
let gap_count = span / step + i64::from(span % step != 0);
if gap_count > MAX_GAP_FILL_CANDLES {
return Err(Error::Malformed(format!(
"gap-fill between bucket {} and {next_bucket} would emit {gap_count} \
flat candles at step {step}, exceeding the {MAX_GAP_FILL_CANDLES} \
cap; reject the discontinuity instead of allocating",
prev.timestamp
)));
}
out.reserve(gap_count as usize);
let mut t = start;
for _ in 0..gap_count {
out.push(Candle::new_unchecked(
prev.close, prev.close, prev.close, prev.close, 0.0, t,
));
t = t.saturating_add(step);
}
Ok(())
}
pub fn flush(&mut self) -> Result<Option<Candle>> {
self.open_bar.take().map(OpenBar::into_candle).transpose()
}
pub const fn timeframe(&self) -> Timeframe {
self.timeframe
}
}
#[cfg(test)]
mod tests {
use super::*;
fn t(price: f64, ts: i64) -> Tick {
Tick::new(price, 1.0, ts).unwrap()
}
#[test]
fn timeframe_rejects_non_positive() {
assert!(Timeframe::new(0).is_err());
assert!(Timeframe::new(-1).is_err());
}
#[test]
fn timeframe_convenience_constructors() {
assert_eq!(Timeframe::millis(250).unwrap().bucket(), 250);
assert!(Timeframe::millis(0).is_err());
assert_eq!(Timeframe::seconds(30).unwrap().bucket(), 30);
assert!(Timeframe::seconds(-1).is_err());
assert_eq!(Timeframe::one_minute_ms().bucket(), 60_000);
}
#[test]
fn aggregator_timeframe_getter() {
let tf = Timeframe::new(60).unwrap();
let agg = TickAggregator::new(tf);
assert_eq!(agg.timeframe().bucket(), 60);
}
#[test]
fn minute_hour_day_constructors_compute_seconds() {
assert_eq!(Timeframe::minutes(1).unwrap().bucket(), 60);
assert_eq!(Timeframe::minutes(5).unwrap().bucket(), 300);
assert_eq!(Timeframe::hours(1).unwrap().bucket(), 3_600);
assert_eq!(Timeframe::hours(4).unwrap().bucket(), 14_400);
assert_eq!(Timeframe::days(1).unwrap().bucket(), 86_400);
assert_eq!(Timeframe::days(7).unwrap().bucket(), 604_800);
}
#[test]
fn minute_hour_day_constructors_reject_non_positive() {
for n in [0, -1, -60] {
assert!(Timeframe::minutes(n).is_err());
assert!(Timeframe::hours(n).is_err());
assert!(Timeframe::days(n).is_err());
}
}
#[test]
fn minute_hour_day_constructors_reject_overflow() {
assert!(matches!(
Timeframe::minutes(i64::MAX),
Err(Error::InvalidTimeframe(_))
));
assert!(matches!(
Timeframe::hours(i64::MAX),
Err(Error::InvalidTimeframe(_))
));
assert!(matches!(
Timeframe::days(i64::MAX),
Err(Error::InvalidTimeframe(_))
));
}
#[test]
fn floors_to_bucket_boundary() {
let tf = Timeframe::new(100).unwrap();
assert_eq!(tf.floor(0), 0);
assert_eq!(tf.floor(99), 0);
assert_eq!(tf.floor(100), 100);
assert_eq!(tf.floor(150), 100);
assert_eq!(tf.floor(250), 200);
assert_eq!(tf.floor(-1), -100);
assert_eq!(tf.floor(-100), -100);
assert_eq!(tf.floor(-101), -200);
}
#[test]
fn floor_saturates_instead_of_overflowing_at_min() {
let tf = Timeframe::new(100).unwrap();
assert_eq!(tf.floor(i64::MIN), i64::MIN);
let hi = tf.floor(i64::MAX);
assert!(hi > i64::MAX - 100 && hi % 100 == 0);
}
#[test]
fn aggregates_ticks_into_one_candle_within_bucket() {
let mut agg = TickAggregator::new(Timeframe::new(60).unwrap());
assert!(agg.push(t(10.0, 0)).unwrap().is_empty());
assert!(agg.push(t(12.0, 15)).unwrap().is_empty());
assert!(agg.push(t(8.0, 30)).unwrap().is_empty());
assert!(agg.push(t(11.0, 50)).unwrap().is_empty());
let bar = agg.flush().unwrap().expect("open bar");
assert_eq!(bar.open, 10.0);
assert_eq!(bar.high, 12.0);
assert_eq!(bar.low, 8.0);
assert_eq!(bar.close, 11.0);
assert!((bar.volume - 4.0).abs() < 1e-12);
assert_eq!(bar.timestamp, 0);
}
#[test]
fn emits_candle_on_bucket_crossing() {
let mut agg = TickAggregator::new(Timeframe::new(60).unwrap());
agg.push(t(10.0, 0)).unwrap();
agg.push(t(12.0, 30)).unwrap();
let closed = agg.push(t(15.0, 60)).unwrap();
assert_eq!(closed.len(), 1);
let closed = closed[0];
assert_eq!(closed.open, 10.0);
assert_eq!(closed.high, 12.0);
assert_eq!(closed.low, 10.0);
assert_eq!(closed.close, 12.0);
let still_open = agg.flush().unwrap().unwrap();
assert_eq!(still_open.open, 15.0);
assert_eq!(still_open.timestamp, 60);
}
#[test]
fn rejects_out_of_order_ticks() {
let mut agg = TickAggregator::new(Timeframe::new(60).unwrap());
agg.push(t(10.0, 100)).unwrap();
let err = agg.push(t(11.0, 30)).unwrap_err();
assert!(matches!(err, Error::Malformed(_)));
}
#[test]
fn rejects_same_bucket_out_of_order_tick() {
let mut agg = TickAggregator::new(Timeframe::new(60).unwrap());
agg.push(t(10.0, 50)).unwrap();
let err = agg.push(t(99.0, 10)).unwrap_err();
assert!(matches!(err, Error::Malformed(_)));
assert_eq!(agg.flush().unwrap().unwrap().close, 10.0);
}
#[test]
fn accepts_same_bucket_ticks_sharing_a_timestamp() {
let mut agg = TickAggregator::new(Timeframe::new(60).unwrap());
agg.push(t(10.0, 20)).unwrap();
agg.push(t(12.0, 20)).unwrap();
agg.push(t(11.0, 20)).unwrap();
let bar = agg.flush().unwrap().unwrap();
assert_eq!(bar.high, 12.0);
assert_eq!(bar.close, 11.0);
}
#[test]
fn flushes_a_non_finite_volume_as_an_error() {
let mut agg = TickAggregator::new(Timeframe::new(60).unwrap());
agg.push(Tick::new(10.0, f64::MAX, 0).unwrap()).unwrap();
agg.push(Tick::new(10.0, f64::MAX, 1).unwrap()).unwrap();
let err = agg.flush().unwrap_err();
assert!(matches!(err, Error::Core(_)));
}
#[test]
fn skips_empty_buckets_without_gap_fill() {
let mut agg = TickAggregator::new(Timeframe::new(60).unwrap());
assert!(!agg.fills_gaps());
agg.push(t(10.0, 0)).unwrap();
let closed = agg.push(t(20.0, 200)).unwrap();
assert_eq!(closed.len(), 1, "only the real bar closes");
assert_eq!(closed[0].timestamp, 0);
}
#[test]
fn gap_fill_rejects_runaway_timestamp_jump() {
let mut agg = TickAggregator::new(Timeframe::new(60).unwrap()).with_gap_fill(true);
agg.push(t(10.0, 0)).unwrap();
let err = agg.push(t(20.0, 2_000_000_000)).unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("gap-fill") && msg.contains("cap"),
"expected a malformed-gap error, got: {msg}"
);
}
#[test]
fn gap_fill_at_the_cap_succeeds() {
let mut agg = TickAggregator::new(Timeframe::new(60).unwrap()).with_gap_fill(true);
agg.push(t(10.0, 0)).unwrap();
let out = agg.push(t(20.0, 60_000_060)).unwrap();
assert_eq!(out.len(), 1 + MAX_GAP_FILL_CANDLES as usize);
}
#[test]
fn gap_fill_emits_flat_candles_for_skipped_buckets() {
let mut agg = TickAggregator::new(Timeframe::new(60).unwrap()).with_gap_fill(true);
assert!(agg.fills_gaps());
agg.push(t(10.0, 0)).unwrap();
agg.push(t(13.0, 30)).unwrap(); let out = agg.push(t(20.0, 200)).unwrap();
assert_eq!(out.len(), 3, "real bar + two flat fillers");
let real = out[0];
assert_eq!(real.timestamp, 0);
assert_eq!(real.close, 13.0);
for (filler, ts) in out[1..].iter().zip([60, 120]) {
assert_eq!(filler.timestamp, ts);
assert_eq!(filler.open, 13.0);
assert_eq!(filler.high, 13.0);
assert_eq!(filler.low, 13.0);
assert_eq!(filler.close, 13.0);
assert_eq!(filler.volume, 0.0);
}
assert_eq!(agg.flush().unwrap().unwrap().timestamp, 180);
}
#[test]
fn gap_fill_emits_nothing_extra_for_adjacent_buckets() {
let mut agg = TickAggregator::new(Timeframe::new(60).unwrap()).with_gap_fill(true);
agg.push(t(10.0, 0)).unwrap();
let out = agg.push(t(11.0, 70)).unwrap();
assert_eq!(out.len(), 1);
assert_eq!(out[0].timestamp, 0);
}
}