use std::collections::HashMap;
use chrono::{DateTime, Utc};
use crate::{
db::models::OhlcCandleRow,
shared::{Lookback, OhlcResolution, Period},
util::DateTimeExt,
};
use super::error::{BacktestError, Result};
/// Running aggregate of the source candles that fall into one resolution bucket.
///
/// Filled through `add_candle` and converted into an `OhlcCandleRow` with
/// `to_candle_row`.
#[derive(Clone)]
struct BucketAccumulator {
/// Timestamp written to `time` of the emitted row.
bucket_time: DateTime<Utc>,
/// Earliest `time` among the candles added so far; that candle supplies `open`.
first_candle_time: DateTime<Utc>,
/// Latest `time` among the candles added so far; that candle supplies `close`.
last_candle_time: DateTime<Utc>,
/// `open` of the earliest candle added (0.0 until a candle is added).
open: f64,
/// Largest `high` seen so far (starts at `f64::MIN`).
high: f64,
/// Smallest `low` seen so far (starts at `f64::MAX`).
low: f64,
/// `close` of the latest candle added (0.0 until a candle is added).
close: f64,
/// Sum of `volume` over all candles added.
volume: i64,
/// Smallest `created_at` over all candles added.
min_created_at: DateTime<Utc>,
/// Largest `updated_at` over all candles added.
max_updated_at: DateTime<Utc>,
/// `true` while every candle added so far had `stable == true`.
all_stable: bool,
}
impl BucketAccumulator {
    /// Creates an empty accumulator for the bucket stamped `bucket_time`.
    ///
    /// Every "min"-style field starts at its type's maximum and every "max"-style
    /// field at its minimum, so the first candle passed to `add_candle` always
    /// replaces them.
    fn new(bucket_time: DateTime<Utc>) -> Self {
        let earliest = DateTime::<Utc>::MIN_UTC;
        let latest = DateTime::<Utc>::MAX_UTC;

        Self {
            bucket_time,
            first_candle_time: latest,
            last_candle_time: earliest,
            open: 0.0,
            high: f64::MIN,
            low: f64::MAX,
            close: 0.0,
            volume: 0,
            min_created_at: latest,
            max_updated_at: earliest,
            all_stable: true,
        }
    }

    /// Folds one source candle into the aggregate.
    ///
    /// `open` follows the candle with the earliest `time` and `close` the candle
    /// with the latest `time`, so candles may arrive in any order within a bucket.
    /// Both comparisons are strict: a candle whose `time` equals one already seen
    /// does not replace `open` or `close`.
    fn add_candle(&mut self, candle: &OhlcCandleRow) {
        let is_new_earliest = candle.time < self.first_candle_time;
        let is_new_latest = candle.time > self.last_candle_time;

        if is_new_earliest {
            self.open = candle.open;
            self.first_candle_time = candle.time;
        }
        if is_new_latest {
            self.close = candle.close;
            self.last_candle_time = candle.time;
        }

        self.high = f64::max(self.high, candle.high);
        self.low = f64::min(self.low, candle.low);
        self.volume += candle.volume;

        if candle.created_at < self.min_created_at {
            self.min_created_at = candle.created_at;
        }
        if candle.updated_at > self.max_updated_at {
            self.max_updated_at = candle.updated_at;
        }

        self.all_stable &= candle.stable;
    }

    /// Produces the aggregated row for this bucket.
    ///
    /// The row is marked `stable` only when `is_complete` is `true` and every
    /// candle added so far was itself stable.
    fn to_candle_row(&self, is_complete: bool) -> OhlcCandleRow {
        let stable = is_complete && self.all_stable;

        OhlcCandleRow {
            time: self.bucket_time,
            open: self.open,
            high: self.high,
            low: self.low,
            close: self.close,
            volume: self.volume,
            created_at: self.min_created_at,
            updated_at: self.max_updated_at,
            stable,
        }
    }
}
/// Incrementally consolidates pushed candles into rows at the lookback's
/// resolution, keeping at most `lookback.period()` rows.
pub(super) struct RuntimeConsolidator {
/// Target resolution and the maximum number of rows retained.
lookback: Lookback,
/// Consolidated rows, oldest first. While a bucket is open, the last row is a
/// snapshot of that bucket.
candles: Vec<OhlcCandleRow>,
/// Accumulator for the bucket currently being built, if any.
current_bucket: Option<BucketAccumulator>,
}
impl RuntimeConsolidator {
    /// Builds a consolidator seeded from `initial_candles`.
    ///
    /// Candles with `time > time_cursor` are ignored.
    ///
    /// * For a one-minute lookback, candles strictly before `time_cursor` are
    ///   stored unchanged, and a candle exactly at `time_cursor` becomes the
    ///   open bucket (its row is emitted with `stable == false`).
    /// * For every other resolution the remaining candles are fed through
    ///   [`Self::push`].
    ///
    /// # Errors
    ///
    /// Returns `BacktestError::OutOfOrderCandle` when a one-minute seed candle
    /// is older than the one before it, or when [`Self::push`] rejects a candle.
    pub fn new(
        lookback: Lookback,
        initial_candles: &[OhlcCandleRow],
        time_cursor: DateTime<Utc>,
    ) -> Result<Self> {
        let mut consolidator = Self {
            lookback,
            // One spare slot: a new row is appended before the oldest one is
            // trimmed, so the buffer briefly holds `period + 1` rows.
            candles: Vec::with_capacity(lookback.period().as_usize().saturating_add(1)),
            current_bucket: None,
        };

        let seed = initial_candles
            .iter()
            .filter(|candle| candle.time <= time_cursor);

        if !matches!(lookback.resolution(), OhlcResolution::OneMinute) {
            for candle in seed {
                consolidator.push(candle)?;
            }
            return Ok(consolidator);
        }

        // One-minute lookback: source candles already have the target
        // resolution, so completed ones are passed through untouched.
        let mut last_time: Option<DateTime<Utc>> = None;
        for candle in seed {
            match last_time {
                Some(prev_time) if candle.time < prev_time => {
                    return Err(BacktestError::OutOfOrderCandle {
                        candle_time: candle.time,
                        bucket_time: prev_time,
                    });
                }
                _ => last_time = Some(candle.time),
            }

            if candle.time < time_cursor {
                consolidator.candles.push(candle.clone());
            } else {
                // The candle sits exactly on the cursor: it is still forming.
                let mut bucket = BucketAccumulator::new(candle.time);
                bucket.add_candle(candle);
                consolidator.candles.push(bucket.to_candle_row(false));
                consolidator.current_bucket = Some(bucket);
            }
            consolidator.trim_old_candles();
        }

        Ok(consolidator)
    }

    /// Drops the oldest rows so that at most `lookback.period()` remain.
    fn trim_old_candles(&mut self) {
        let max_len = self.lookback.period().as_usize();
        let excess = self.candles.len().saturating_sub(max_len);
        // A single drain shifts the remaining rows once, however many are dropped.
        self.candles.drain(..excess);
    }

    /// Closes the open bucket, if any: the last row is overwritten with the
    /// bucket's completed form and the bucket is cleared.
    fn finalize_current_bucket(&mut self) {
        let Some(finished) = self.current_bucket.take() else {
            return;
        };

        if let Some(slot) = self.candles.last_mut() {
            *slot = finished.to_candle_row(true);
        }
        self.trim_old_candles();
    }

    /// Refreshes the last row with an in-progress snapshot of the open bucket.
    ///
    /// Does nothing when no bucket is open; appends the snapshot when no row
    /// exists yet.
    fn sync_current_bucket(&mut self) {
        let Some(current) = &self.current_bucket else {
            return;
        };

        let snapshot = current.to_candle_row(false);
        match self.candles.last_mut() {
            Some(slot) => *slot = snapshot,
            None => self.candles.push(snapshot),
        }
    }

    /// Maps `time` to its bucket timestamp via `floor_to_resolution` for the
    /// lookback's resolution.
    fn floor_to_bucket(&self, time: DateTime<Utc>) -> DateTime<Utc> {
        time.floor_to_resolution(self.lookback.resolution())
    }

    /// Feeds one source candle into the consolidator.
    ///
    /// * Same bucket as the open one: the candle is merged and the last row
    ///   is refreshed.
    /// * Later bucket (or no open bucket): the open bucket is finalized, a new
    ///   bucket is opened, and its in-progress row is appended.
    ///
    /// Ordering is only checked against the open bucket; when no bucket is
    /// open, the candle is accepted without comparison to existing rows.
    ///
    /// # Errors
    ///
    /// Returns `BacktestError::OutOfOrderCandle` when the candle belongs to a
    /// bucket earlier than the open one.
    pub fn push(&mut self, candle: &OhlcCandleRow) -> Result<()> {
        let candle_bucket_time = self.floor_to_bucket(candle.time);

        if let Some(current) = self.current_bucket.as_mut() {
            if candle_bucket_time == current.bucket_time {
                current.add_candle(candle);
                self.sync_current_bucket();
                return Ok(());
            }
            if candle_bucket_time < current.bucket_time {
                return Err(BacktestError::OutOfOrderCandle {
                    candle_time: candle.time,
                    bucket_time: current.bucket_time,
                });
            }
        }

        // The candle starts a new bucket: close the previous one first.
        self.finalize_current_bucket();

        let mut new_bucket = BucketAccumulator::new(candle_bucket_time);
        new_bucket.add_candle(candle);
        self.candles.push(new_bucket.to_candle_row(false));
        self.current_bucket = Some(new_bucket);
        self.trim_old_candles();

        Ok(())
    }

    /// Returns the retained rows, oldest first.
    pub fn get_candles(&self) -> &[OhlcCandleRow] {
        &self.candles
    }
}
/// One `RuntimeConsolidator` per resolution, keyed by that resolution.
pub(super) struct MultiResolutionConsolidator(HashMap<OhlcResolution, RuntimeConsolidator>);
impl MultiResolutionConsolidator {
    /// Creates one consolidator per entry of `resolution_to_max_period`, each
    /// seeded with the same `initial_candles` and `time_cursor`.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by `RuntimeConsolidator::new`.
    ///
    /// # Panics
    ///
    /// Panics if `Lookback::new` rejects a resolution/period pair.
    pub fn new(
        resolution_to_max_period: HashMap<OhlcResolution, Period>,
        initial_candles: &[OhlcCandleRow],
        time_cursor: DateTime<Utc>,
    ) -> Result<Self> {
        let per_resolution = resolution_to_max_period
            .into_iter()
            .map(|(resolution, max_period)| {
                let lookback = Lookback::new(resolution, max_period).expect("is valid");
                RuntimeConsolidator::new(lookback, initial_candles, time_cursor)
                    .map(|consolidator| (resolution, consolidator))
            })
            .collect::<Result<HashMap<_, _>>>()?;

        Ok(Self(per_resolution))
    }

    /// Pushes `candle` into every consolidator, stopping at the first error.
    pub fn push(&mut self, candle: &OhlcCandleRow) -> Result<()> {
        self.0
            .values_mut()
            .try_for_each(|consolidator| consolidator.push(candle))
    }

    /// Returns the rows held for `resolution`, or `None` when that resolution
    /// was not configured.
    pub fn get_candles(&self, resolution: OhlcResolution) -> Option<&[OhlcCandleRow]> {
        let consolidator = self.0.get(&resolution)?;
        Some(consolidator.get_candles())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{Duration, TimeZone};

    /// UTC timestamp on 2026-01-15 at `hour:minute:00`.
    fn at(hour: u32, minute: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 1, 15, hour, minute, 0).unwrap()
    }

    /// Stable candle at `time` that opens and closes at `price`, with a
    /// high/low 100 above/below it.
    fn make_candle(time: DateTime<Utc>, price: f64) -> OhlcCandleRow {
        OhlcCandleRow {
            time,
            open: price,
            high: price + 100.0,
            low: price - 100.0,
            close: price,
            volume: 100_000,
            created_at: time,
            updated_at: time,
            stable: true,
        }
    }

    /// Consolidator with no seed candles and a cursor at midnight.
    fn empty_consolidator(resolution: OhlcResolution, period: u64) -> RuntimeConsolidator {
        let lookback = Lookback::new(resolution, period).unwrap();
        RuntimeConsolidator::new(lookback, &[], at(0, 0)).unwrap()
    }

    #[test]
    fn incremental_push_same_bucket() {
        let mut consolidator = empty_consolidator(OhlcResolution::FiveMinutes, 10);
        let start = at(10, 0);

        // Three one-minute candles that all land in the 10:00 five-minute bucket.
        for (offset, price) in [(0, 90_000.0), (1, 90_100.0), (2, 90_200.0)] {
            consolidator
                .push(&make_candle(start + Duration::minutes(offset), price))
                .unwrap();
        }

        let rows = consolidator.get_candles();
        assert_eq!(rows.len(), 1);

        let only = &rows[0];
        assert!(!only.stable);
        assert_eq!(only.open, 90_000.0);
        assert_eq!(only.close, 90_200.0);
    }

    #[test]
    fn incremental_push_new_bucket_finalizes_previous() {
        let mut consolidator = empty_consolidator(OhlcResolution::FiveMinutes, 10);
        let start = at(10, 0);

        // The candle at +5 minutes opens the next bucket and closes the first.
        for (offset, price) in [(0, 90_000.0), (1, 90_100.0), (5, 91_000.0)] {
            consolidator
                .push(&make_candle(start + Duration::minutes(offset), price))
                .unwrap();
        }

        let rows = consolidator.get_candles();
        assert_eq!(rows.len(), 2);

        let (closed, open) = (&rows[0], &rows[1]);
        assert!(closed.stable);
        assert_eq!(closed.time, at(10, 0));
        assert!(!open.stable);
        assert_eq!(open.time, at(10, 5));
    }

    #[test]
    fn new_with_initial_candles() {
        // Ninety one-minute candles covering 09:00 through 10:29.
        let first = at(9, 0);
        let seed: Vec<OhlcCandleRow> = (0..90)
            .map(|i| make_candle(first + Duration::minutes(i), 90_000.0 + i as f64 * 10.0))
            .collect();

        let lookback = Lookback::new(OhlcResolution::OneHour, 10).unwrap();
        let consolidator = RuntimeConsolidator::new(lookback, &seed, at(10, 30)).unwrap();

        let rows = consolidator.get_candles();
        assert_eq!(rows.len(), 2);

        assert_eq!(rows[0].time, at(9, 0));
        assert!(rows[0].stable);

        assert_eq!(rows[1].time, at(10, 0));
        assert!(!rows[1].stable);
    }

    #[test]
    fn lookback_trimming() {
        let mut consolidator = empty_consolidator(OhlcResolution::FiveMinutes, 5);
        let start = at(10, 0);

        // Seven distinct five-minute buckets pushed into a lookback of five.
        for step in 0..7 {
            let time = start + Duration::minutes(step * 5);
            consolidator.push(&make_candle(time, 90_000.0)).unwrap();
        }

        let rows = consolidator.get_candles();
        assert_eq!(rows.len(), 5);
        assert_eq!(rows[0].time, at(10, 10));
        assert_eq!(rows[4].time, at(10, 30));
        assert!(!rows[4].stable);
    }

    #[test]
    fn aggregates_ohlc_correctly() {
        let mut consolidator = empty_consolidator(OhlcResolution::FiveMinutes, 10);
        let start = at(10, 0);

        // (minute offset, open, high, low, close, volume)
        let inputs = [
            (0, 90_000.0, 90_500.0, 89_800.0, 90_200.0, 100_000),
            (1, 90_200.0, 91_000.0, 90_100.0, 90_800.0, 150_000),
            (2, 90_800.0, 90_900.0, 89_500.0, 89_700.0, 200_000),
        ];
        for (offset, open, high, low, close, volume) in inputs {
            let row = OhlcCandleRow {
                time: start + Duration::minutes(offset),
                open,
                high,
                low,
                close,
                volume,
                created_at: start,
                updated_at: start,
                stable: true,
            };
            consolidator.push(&row).unwrap();
        }

        // Opening the next bucket finalizes the one under test.
        consolidator
            .push(&make_candle(start + Duration::minutes(5), 91_000.0))
            .unwrap();

        let merged = &consolidator.get_candles()[0];
        assert_eq!(merged.open, 90_000.0);
        assert_eq!(merged.high, 91_000.0);
        assert_eq!(merged.low, 89_500.0);
        assert_eq!(merged.close, 89_700.0);
        assert_eq!(merged.volume, 450_000);
        assert!(merged.stable);
    }

    #[test]
    fn one_minute_resolution_passthrough() {
        let start = at(10, 0);
        let seed: Vec<OhlcCandleRow> = (0..10)
            .map(|i| make_candle(start + Duration::minutes(i), 90_000.0 + i as f64 * 10.0))
            .collect();

        let lookback = Lookback::new(OhlcResolution::OneMinute, 5).unwrap();
        let consolidator = RuntimeConsolidator::new(lookback, &seed, at(10, 8)).unwrap();

        let rows = consolidator.get_candles();
        assert_eq!(rows.len(), 5);

        // Rows before the cursor are complete; the row at the cursor is still open.
        assert!(rows[..4].iter().all(|row| row.stable));
        assert!(!rows[4].stable);
    }
}