use std::{fmt::Debug, ops::Range};
use chrono::{DateTime, Utc};
use crate::{
data::event::{
EconomicCalendarId, EmaId, MarketEvent, OhlcvId, RsiId, SmaId, StreamId, TpoId, TradesId,
VolumeProfileId,
},
sim::data::EventMap,
sorted_vec_map::SortedVecMap,
};
pub trait StreamEntity: StreamId {
type Storage;
}
impl StreamEntity for OhlcvId {
type Storage = EventMap<Self>;
}
impl StreamEntity for TradesId {
type Storage = EventMap<Self>;
}
impl StreamEntity for EconomicCalendarId {
type Storage = EventMap<Self>;
}
impl StreamEntity for VolumeProfileId {
type Storage = EventMap<Self>;
}
impl StreamEntity for TpoId {
type Storage = EventMap<Self>;
}
impl StreamEntity for EmaId {
type Storage = EventMap<Self>;
}
impl StreamEntity for SmaId {
type Storage = EventMap<Self>;
}
impl StreamEntity for RsiId {
type Storage = EventMap<Self>;
}
/// A forward-only read position over a storage of event streams.
///
/// A cursor is built from its storage with [`StreamCursor::new`], and every later call is
/// expected to receive that same storage. The [`Cursor`] implementation debug-asserts this
/// pairing ("Cursor desynchronized from Storage!").
pub trait StreamCursor {
    /// The event container this cursor walks over.
    type Storage;

    // --- construction and movement -------------------------------------------------

    /// Creates a cursor positioned before the first event of every stream in `data`.
    fn new(data: &Self::Storage) -> Self;

    /// Moves the cursor back to its initial position.
    fn rewind(&mut self);

    /// Moves the cursor forward past every event that is available at or before `ts`.
    fn advance(&mut self, data: &Self::Storage, ts: DateTime<Utc>);

    /// Moves the cursor past every event in `data`.
    fn to_end(&mut self, data: &Self::Storage);

    // --- queries --------------------------------------------------------------------

    /// Returns `true` when no stream has an event left ahead of the cursor.
    fn is_done(&self, data: &Self::Storage) -> bool;

    /// Availability time of the earliest next event ahead of the cursor, if any.
    fn next_point_in_time(&self, data: &Self::Storage) -> Option<DateTime<Utc>>;

    /// Earliest availability time at or after `ts` among events ahead of the cursor.
    fn find_first_point_in_time_at_or_after(
        &self,
        data: &Self::Storage,
        ts: DateTime<Utc>,
    ) -> Option<DateTime<Utc>>;

    /// Earliest open time at or after `ts` among events ahead of the cursor.
    fn find_first_open_at_or_after(
        &self,
        data: &Self::Storage,
        ts: DateTime<Utc>,
    ) -> Option<DateTime<Utc>>;
}
/// Per-stream visibility window over an [`EventMap`].
///
/// Holds one `Range<usize>` per stream id. The [`StreamCursor`] implementation keeps every
/// range starting at 0, so `end` is the number of that stream's events already made visible.
#[derive(Debug, Clone)]
pub struct Cursor<S: StreamId>(pub SortedVecMap<S, Range<usize>>);

// Written by hand rather than derived: `#[derive(Default)]` would add an `S: Default` bound.
impl<S: StreamId> Default for Cursor<S> {
    /// An empty cursor that tracks no streams.
    fn default() -> Self {
        Cursor(SortedVecMap::new())
    }
}

/// Cursor over OHLCV candle streams.
pub type OhlcvCursor = Cursor<OhlcvId>;
/// Cursor over trade streams.
pub type TradeCursor = Cursor<TradesId>;
/// Cursor over economic-calendar streams.
pub type EconomicCalendarCursor = Cursor<EconomicCalendarId>;
/// Cursor over volume-profile streams.
pub type VolumeProfileCursor = Cursor<VolumeProfileId>;
/// Cursor over TPO streams.
pub type TpoCursor = Cursor<TpoId>;
/// Cursor over EMA indicator streams.
pub type EmaCursor = Cursor<EmaId>;
/// Cursor over SMA indicator streams.
pub type SmaCursor = Cursor<SmaId>;
/// Cursor over RSI indicator streams.
pub type RsiCursor = Cursor<RsiId>;
impl<S> Cursor<S>
where
    S: StreamEntity<Storage = EventMap<S>>,
{
    /// Debug-only check that this cursor tracks exactly as many streams as `data`.
    ///
    /// Every [`StreamCursor`] method below walks `self.0` and `data` together with
    /// [`Iterator::zip`], which silently stops at the shorter side. The per-stream id
    /// assertions therefore cannot catch a cursor that is missing *trailing* streams (for
    /// example one built with [`Default`] and then used against non-empty storage). Those
    /// streams would simply be skipped, and `is_done` would report `true` without ever
    /// looking at them.
    fn debug_assert_lockstep(&self, data: &EventMap<S>) {
        debug_assert_eq!(
            self.0.len(),
            data.len(),
            "Cursor desynchronized from Storage!"
        );
    }
}

impl<S> StreamCursor for Cursor<S>
where
    S: StreamEntity<Storage = EventMap<S>>,
{
    type Storage = S::Storage;

    /// Builds a cursor with an empty `0..0` range for every stream in `data`, so no event
    /// is visible yet.
    fn new(data: &EventMap<S>) -> Self {
        Self(data.iter().map(|(id, _)| (*id, 0..0)).collect())
    }

    /// Makes visible, per stream, every not-yet-visible event with `point_in_time() <= ts`.
    ///
    /// Scanning stops at the first event whose `point_in_time()` is after `ts`, so events
    /// become visible strictly in storage order. NOTE(review): this assumes each stream is
    /// stored ordered by `point_in_time()`.
    ///
    /// `range.end` only ever grows, so a call with an earlier `ts` than a previous call
    /// never hides events again.
    fn advance(&mut self, data: &EventMap<S>, ts: DateTime<Utc>) {
        self.debug_assert_lockstep(data);
        self.0
            .iter_mut()
            .zip(data.iter())
            .for_each(|((cursor_id, range), (data_id, events))| {
                debug_assert_eq!(cursor_id, data_id, "Cursor desynchronized from Storage!");
                let future_events = &events[range.end..];
                // Cheap early-out: nothing to do while the next event is still in the future.
                if future_events
                    .first()
                    .is_some_and(|e| e.point_in_time() <= ts)
                {
                    // Everything before the first still-future event becomes visible; if no
                    // such event exists, the whole remainder does.
                    let advance_by = future_events
                        .iter()
                        .position(|e| e.point_in_time() > ts)
                        .unwrap_or(future_events.len());
                    range.end += advance_by;
                }
            });
    }

    /// Hides every event again by resetting each stream's range to `0..0`.
    fn rewind(&mut self) {
        self.0.iter_mut().for_each(|(_, range)| range.end = 0);
    }

    /// Makes every event of every stream visible.
    fn to_end(&mut self, data: &EventMap<S>) {
        self.debug_assert_lockstep(data);
        self.0
            .iter_mut()
            .zip(data.iter())
            .for_each(|((cursor_id, range), (data_id, events))| {
                debug_assert_eq!(cursor_id, data_id, "Cursor desynchronized from Storage!");
                range.end = events.len();
            });
    }

    /// For each stream, takes the first not-yet-visible event (in storage order) whose
    /// `opened_at()` is at or after `ts`. Returns the smallest such `opened_at()` across
    /// streams, or `None` if no stream has one.
    fn find_first_open_at_or_after(
        &self,
        data: &EventMap<S>,
        ts: DateTime<Utc>,
    ) -> Option<DateTime<Utc>> {
        self.debug_assert_lockstep(data);
        self.0
            .iter()
            .zip(data.iter())
            .filter_map(|((cursor_id, range), (data_id, events))| {
                debug_assert_eq!(cursor_id, data_id, "Cursor desynchronized from Storage!");
                events[range.end..]
                    .iter()
                    .find(|e| e.opened_at() >= ts)
                    .map(|e| e.opened_at())
            })
            .min()
    }

    /// For each stream, takes the first not-yet-visible event (in storage order) whose
    /// `point_in_time()` is at or after `ts`. Returns the smallest such `point_in_time()`
    /// across streams, or `None` if no stream has one.
    fn find_first_point_in_time_at_or_after(
        &self,
        data: &EventMap<S>,
        ts: DateTime<Utc>,
    ) -> Option<DateTime<Utc>> {
        self.debug_assert_lockstep(data);
        self.0
            .iter()
            .zip(data.iter())
            .filter_map(|((cursor_id, range), (data_id, events))| {
                debug_assert_eq!(cursor_id, data_id, "Cursor desynchronized from Storage!");
                events[range.end..]
                    .iter()
                    .find(|e| e.point_in_time() >= ts)
                    .map(|e| e.point_in_time())
            })
            .min()
    }

    /// Smallest `point_in_time()` among the next not-yet-visible event of each stream.
    /// Returns `None` when every stream is exhausted.
    fn next_point_in_time(&self, data: &EventMap<S>) -> Option<DateTime<Utc>> {
        self.debug_assert_lockstep(data);
        self.0
            .iter()
            .zip(data.iter())
            .filter_map(|((cursor_id, range), (data_id, events))| {
                debug_assert_eq!(cursor_id, data_id, "Cursor desynchronized from Storage!");
                events.get(range.end).map(|e| e.point_in_time())
            })
            .min()
    }

    /// Returns `true` once every event of every stream is visible. The result is vacuously
    /// `true` when there are no streams.
    fn is_done(&self, data: &EventMap<S>) -> bool {
        self.debug_assert_lockstep(data);
        self.0
            .iter()
            .zip(data.iter())
            .all(|((cursor_id, range), (data_id, events))| {
                debug_assert_eq!(cursor_id, data_id, "Cursor desynchronized from Storage!");
                range.end >= events.len()
            })
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::data::{
        domain::{
            CountryCode, DataBroker, EconomicEventImpact, EconomicValue, Exchange, Period, Price,
            Quantity, SpotPair, Symbol,
        },
        event::{EconomicCalendarId, EconomicEvent, Ohlcv, OhlcvId, TradeEvent, TradesId},
    };

    /// Parses an RFC 3339 timestamp into UTC, panicking on malformed input.
    fn ts(s: &str) -> DateTime<Utc> {
        DateTime::parse_from_rfc3339(s).unwrap().with_timezone(&Utc)
    }

    /// Candle spanning `open_ts..close_ts` with fixed prices and volume; the tests only care
    /// about its timestamps.
    fn ohlcv(open_ts: DateTime<Utc>, close_ts: DateTime<Utc>) -> Ohlcv {
        Ohlcv {
            open_timestamp: open_ts,
            close_timestamp: close_ts,
            open: Price(100.0),
            high: Price(110.0),
            low: Price(90.0),
            close: Price(105.0),
            volume: Quantity(1000.0),
            quote_asset_volume: None,
            number_of_trades: None,
            taker_buy_base_asset_volume: None,
            taker_buy_quote_asset_volume: None,
        }
    }

    /// Binance spot OHLCV stream id for `symbol` at `period`.
    fn ohlcv_id_for(symbol: Symbol, period: Period) -> OhlcvId {
        OhlcvId {
            broker: DataBroker::Binance,
            exchange: Exchange::Binance,
            symbol,
            period,
        }
    }

    /// Binance BTC/USDT spot OHLCV stream id at `period`.
    fn ohlcv_id(period: Period) -> OhlcvId {
        ohlcv_id_for(Symbol::Spot(SpotPair::BtcUsdt), period)
    }

    /// Trade at `timestamp` with fixed price and size.
    fn trade(timestamp: DateTime<Utc>) -> TradeEvent {
        TradeEvent {
            timestamp,
            price: Price(100.0),
            quantity: Quantity(1.0),
            trade_id: None,
            quote_asset_volume: None,
            is_buyer_maker: None,
            is_best_match: None,
        }
    }

    /// Binance BTC/USDT spot trades stream id.
    fn trade_id() -> TradesId {
        TradesId {
            broker: DataBroker::Binance,
            exchange: Exchange::Binance,
            symbol: Symbol::Spot(SpotPair::BtcUsdt),
        }
    }

    /// High-impact US "Inflation" calendar event named `name` at `timestamp`.
    fn economic_event(timestamp: DateTime<Utc>, name: &str) -> EconomicEvent {
        EconomicEvent {
            timestamp,
            data_source: "investingcom".to_string(),
            category: "Inflation".to_string(),
            news_name: name.to_string(),
            country_code: CountryCode::Us,
            currency_code: "USD".to_string(),
            economic_impact: EconomicEventImpact::High,
            news_type: Some("CPI".to_string()),
            news_type_confidence: Some(0.95),
            news_type_source: Some("manual".to_string()),
            period: Some("yoy".to_string()),
            actual: Some(EconomicValue(2.5)),
            forecast: Some(EconomicValue(2.3)),
            previous: Some(EconomicValue(2.4)),
        }
    }

    /// Investing.com economic-calendar stream id with `country_code` set to US.
    fn econ_id() -> EconomicCalendarId {
        EconomicCalendarId {
            broker: DataBroker::InvestingCom,
            data_source: None,
            country_code: Some(CountryCode::Us),
            category: None,
            importance: None,
        }
    }

    #[test]
    fn test_cursor_new_initializes_at_zero() {
        let id = ohlcv_id(Period::Minute(3));
        let events = vec![
            ohlcv(ts("2025-01-01T00:00:00Z"), ts("2025-01-01T00:03:00Z")),
            ohlcv(ts("2025-01-01T00:03:00Z"), ts("2025-01-01T00:06:00Z")),
        ];
        let mut data = SortedVecMap::new();
        data.insert(id, events.into_boxed_slice());
        let cursor = OhlcvCursor::new(&data);
        assert_eq!(
            cursor.0.get(&id).unwrap(),
            &(0..0),
            "Cursor should initialize with empty range"
        );
    }

    #[test]
    fn test_cursor_simultaneous_events_news_case() {
        let id = econ_id();
        let events = vec![
            economic_event(ts("2025-06-15T08:30:00Z"), "CPI YoY"),
            economic_event(ts("2025-06-15T08:30:00Z"), "CPI MoM"),
            economic_event(ts("2025-06-15T08:30:00Z"), "CPI QoQ"),
            economic_event(ts("2025-06-15T08:31:00Z"), "Core CPI"),
        ];
        let mut data = SortedVecMap::new();
        data.insert(id, events.into_boxed_slice());
        let mut cursor = EconomicCalendarCursor::new(&data);
        cursor.advance(&data, ts("2025-06-15T08:30:00Z"));
        assert_eq!(
            cursor.0.get(&id).unwrap(),
            &(0..3),
            "All three simultaneous news events at 08:30:00 should be visible, \
             but NOT the event at 08:31:00. Logic: position(|e| e.point_in_time() > ts)"
        );
    }

    #[test]
    fn test_cursor_simultaneous_events_ohlcv_case() {
        let id = ohlcv_id(Period::Minute(1));
        // Open timestamps are deliberately out of order: only the close time
        // (point_in_time) decides when a candle becomes visible.
        let events = vec![
            ohlcv(ts("2025-03-01T00:02:00Z"), ts("2025-03-01T00:03:00Z")),
            ohlcv(ts("2025-03-01T00:00:00Z"), ts("2025-03-01T00:03:00Z")),
            ohlcv(ts("2025-03-01T00:02:00Z"), ts("2025-03-01T00:03:00Z")),
            ohlcv(ts("2025-03-01T00:03:00Z"), ts("2025-03-01T00:04:00Z")),
        ];
        let mut data = SortedVecMap::new();
        data.insert(id, events.into_boxed_slice());
        let mut cursor = OhlcvCursor::new(&data);
        cursor.advance(&data, ts("2025-03-01T00:03:00Z"));
        assert_eq!(
            cursor.0.get(&id).unwrap(),
            &(0..3),
            "Should consume ALL THREE events at 00:03:00, but NOT the one at 00:04:00"
        );
    }

    #[test]
    fn test_cursor_strict_forward_only_slice_grows() {
        let id = ohlcv_id(Period::Minute(3));
        let events = vec![
            ohlcv(ts("2025-02-01T00:00:00Z"), ts("2025-02-01T00:03:00Z")),
            ohlcv(ts("2025-02-01T00:03:00Z"), ts("2025-02-01T00:06:00Z")),
            ohlcv(ts("2025-02-01T00:06:00Z"), ts("2025-02-01T00:09:00Z")),
            ohlcv(ts("2025-02-01T00:09:00Z"), ts("2025-02-01T00:12:00Z")),
        ];
        let mut data = SortedVecMap::new();
        data.insert(id, events.into_boxed_slice());
        let mut cursor = OhlcvCursor::new(&data);
        cursor.advance(&data, ts("2025-02-01T00:03:00Z"));
        assert_eq!(
            cursor.0.get(&id).unwrap(),
            &(0..1),
            "Step 1: 1 event visible"
        );
        cursor.advance(&data, ts("2025-02-01T00:06:00Z"));
        assert_eq!(
            cursor.0.get(&id).unwrap(),
            &(0..2),
            "Step 2: slice grew to 2 events"
        );
        cursor.advance(&data, ts("2025-02-01T00:12:00Z"));
        assert_eq!(
            cursor.0.get(&id).unwrap(),
            &(0..4),
            "Step 3: slice grew to all 4 events"
        );
        cursor.advance(&data, ts("2025-02-01T00:06:00Z"));
        assert_eq!(
            cursor.0.get(&id).unwrap(),
            &(0..4),
            "Step 4: slice MUST NOT shrink on earlier timestamp"
        );
    }

    #[test]
    fn test_cursor_advance_before_any_events() {
        let id = ohlcv_id(Period::Minute(3));
        let events = vec![ohlcv(
            ts("2025-04-01T00:00:00Z"),
            ts("2025-04-01T00:03:00Z"),
        )];
        let mut data = SortedVecMap::new();
        data.insert(id, events.into_boxed_slice());
        let mut cursor = OhlcvCursor::new(&data);
        cursor.advance(&data, ts("2025-04-01T00:02:00Z"));
        assert_eq!(
            cursor.0.get(&id).unwrap(),
            &(0..0),
            "Should not consume events before they're available"
        );
    }

    #[test]
    fn test_cursor_advance_exactly_at_availability() {
        let id = ohlcv_id(Period::Minute(5));
        let events = vec![ohlcv(
            ts("2025-05-01T00:00:00Z"),
            ts("2025-05-01T00:05:00Z"),
        )];
        let mut data = SortedVecMap::new();
        data.insert(id, events.into_boxed_slice());
        let mut cursor = OhlcvCursor::new(&data);
        cursor.advance(&data, ts("2025-05-01T00:05:00Z"));
        assert_eq!(
            cursor.0.get(&id).unwrap(),
            &(0..1),
            "Event should be consumed when ts == point_in_time()"
        );
    }

    #[test]
    fn test_cursor_large_jump_consumes_all_intermediate() {
        let id = ohlcv_id(Period::Minute(3));
        let events = vec![
            ohlcv(ts("2025-07-01T00:00:00Z"), ts("2025-07-01T00:03:00Z")),
            ohlcv(ts("2025-07-01T00:03:00Z"), ts("2025-07-01T00:06:00Z")),
            ohlcv(ts("2025-07-01T00:06:00Z"), ts("2025-07-01T00:09:00Z")),
            ohlcv(ts("2025-07-01T00:09:00Z"), ts("2025-07-01T00:12:00Z")),
            ohlcv(ts("2025-07-01T00:12:00Z"), ts("2025-07-01T00:15:00Z")),
        ];
        let mut data = SortedVecMap::new();
        data.insert(id, events.into_boxed_slice());
        let mut cursor = OhlcvCursor::new(&data);
        cursor.advance(&data, ts("2025-07-01T00:12:00Z"));
        assert_eq!(
            cursor.0.get(&id).unwrap(),
            &(0..4),
            "Should consume all events up to and including timestamp"
        );
    }

    #[test]
    fn test_cursor_multiple_streams_different_availability() {
        let id_3m = ohlcv_id(Period::Minute(3));
        let id_5m = ohlcv_id(Period::Minute(5));
        let events_3m = vec![
            ohlcv(ts("2025-08-01T00:00:00Z"), ts("2025-08-01T00:03:00Z")),
            ohlcv(ts("2025-08-01T00:03:00Z"), ts("2025-08-01T00:06:00Z")),
        ];
        let events_5m = vec![
            ohlcv(ts("2025-08-01T00:00:00Z"), ts("2025-08-01T00:05:00Z")),
            ohlcv(ts("2025-08-01T00:05:00Z"), ts("2025-08-01T00:10:00Z")),
        ];
        let mut data = SortedVecMap::new();
        data.insert(id_3m, events_3m.into_boxed_slice());
        data.insert(id_5m, events_5m.into_boxed_slice());
        let mut cursor = OhlcvCursor::new(&data);
        cursor.advance(&data, ts("2025-08-01T00:05:00Z"));
        assert_eq!(
            cursor.0.get(&id_3m).unwrap(),
            &(0..1),
            "3m stream: only event at 00:03 is consumed (00:06 > 00:05)"
        );
        assert_eq!(
            cursor.0.get(&id_5m).unwrap(),
            &(0..1),
            "5m stream: only event at 00:05 is consumed (00:10 > 00:05)"
        );
    }

    #[test]
    fn test_cursor_is_done() {
        let id = ohlcv_id(Period::Minute(3));
        let events = vec![ohlcv(
            ts("2025-09-01T00:00:00Z"),
            ts("2025-09-01T00:03:00Z"),
        )];
        let mut data = SortedVecMap::new();
        data.insert(id, events.into_boxed_slice());
        let mut cursor = OhlcvCursor::new(&data);
        assert!(!cursor.is_done(&data), "Should not be done at start");
        cursor.advance(&data, ts("2025-09-01T00:03:00Z"));
        assert!(
            cursor.is_done(&data),
            "Should be done after consuming all events"
        );
    }

    #[test]
    fn test_cursor_rewind() {
        let id = ohlcv_id(Period::Minute(3));
        let events = vec![
            ohlcv(ts("2025-10-01T00:00:00Z"), ts("2025-10-01T00:03:00Z")),
            ohlcv(ts("2025-10-01T00:03:00Z"), ts("2025-10-01T00:06:00Z")),
        ];
        let mut data = SortedVecMap::new();
        data.insert(id, events.into_boxed_slice());
        let mut cursor = OhlcvCursor::new(&data);
        cursor.advance(&data, ts("2025-10-01T00:06:00Z"));
        assert_eq!(cursor.0.get(&id).unwrap().end, 2);
        cursor.rewind();
        assert_eq!(
            cursor.0.get(&id).unwrap(),
            &(0..0),
            "Rewind should reset to zero"
        );
    }

    #[test]
    fn test_cursor_rewind_then_replay() {
        let id = ohlcv_id(Period::Minute(3));
        let events = vec![
            ohlcv(ts("2025-10-02T00:00:00Z"), ts("2025-10-02T00:03:00Z")),
            ohlcv(ts("2025-10-02T00:03:00Z"), ts("2025-10-02T00:06:00Z")),
        ];
        let mut data = SortedVecMap::new();
        data.insert(id, events.into_boxed_slice());
        let mut cursor = OhlcvCursor::new(&data);
        cursor.advance(&data, ts("2025-10-02T00:06:00Z"));
        assert!(cursor.is_done(&data));
        cursor.rewind();
        assert!(
            !cursor.is_done(&data),
            "After rewind the events must be pending again"
        );
        cursor.advance(&data, ts("2025-10-02T00:03:00Z"));
        assert_eq!(
            cursor.0.get(&id).unwrap(),
            &(0..1),
            "Replay after rewind should consume events from the start again"
        );
    }

    #[test]
    fn test_cursor_to_end() {
        let id = ohlcv_id(Period::Minute(3));
        let events = vec![
            ohlcv(ts("2025-11-01T00:00:00Z"), ts("2025-11-01T00:03:00Z")),
            ohlcv(ts("2025-11-01T00:03:00Z"), ts("2025-11-01T00:06:00Z")),
            ohlcv(ts("2025-11-01T00:06:00Z"), ts("2025-11-01T00:09:00Z")),
        ];
        let mut data = SortedVecMap::new();
        data.insert(id, events.into_boxed_slice());
        let mut cursor = OhlcvCursor::new(&data);
        cursor.to_end(&data);
        assert_eq!(
            cursor.0.get(&id).unwrap(),
            &(0..3),
            "to_end should move cursor to include all events"
        );
    }

    #[test]
    fn test_cursor_next_point_in_time() {
        let id = ohlcv_id(Period::Minute(3));
        let events = vec![
            ohlcv(ts("2025-12-01T00:00:00Z"), ts("2025-12-01T00:03:00Z")),
            ohlcv(ts("2025-12-01T00:03:00Z"), ts("2025-12-01T00:06:00Z")),
        ];
        let mut data = SortedVecMap::new();
        data.insert(id, events.into_boxed_slice());
        let cursor = OhlcvCursor::new(&data);
        assert_eq!(
            cursor.next_point_in_time(&data),
            Some(ts("2025-12-01T00:03:00Z")),
            "Should return the first unconsumed event's availability"
        );
    }

    #[test]
    fn test_cursor_next_point_in_time_across_streams() {
        let id_3m = ohlcv_id(Period::Minute(3));
        let id_5m = ohlcv_id(Period::Minute(5));
        let mut data = SortedVecMap::new();
        data.insert(
            id_3m,
            vec![ohlcv(ts("2025-12-02T00:00:00Z"), ts("2025-12-02T00:03:00Z"))]
                .into_boxed_slice(),
        );
        data.insert(
            id_5m,
            vec![ohlcv(ts("2025-12-02T00:00:00Z"), ts("2025-12-02T00:05:00Z"))]
                .into_boxed_slice(),
        );
        let mut cursor = OhlcvCursor::new(&data);
        assert_eq!(
            cursor.next_point_in_time(&data),
            Some(ts("2025-12-02T00:03:00Z")),
            "Should return the earliest pending availability across streams"
        );
        cursor.advance(&data, ts("2025-12-02T00:03:00Z"));
        assert_eq!(
            cursor.next_point_in_time(&data),
            Some(ts("2025-12-02T00:05:00Z")),
            "Exhausted 3m stream must not hide the pending 5m event"
        );
        cursor.advance(&data, ts("2025-12-02T00:05:00Z"));
        assert_eq!(
            cursor.next_point_in_time(&data),
            None,
            "No pending events once every stream is consumed"
        );
        assert!(cursor.is_done(&data));
    }

    #[test]
    fn test_cursor_find_first_open_at_or_after() {
        let id = ohlcv_id(Period::Minute(3));
        let events = vec![
            ohlcv(ts("2026-01-01T00:00:00Z"), ts("2026-01-01T00:03:00Z")),
            ohlcv(ts("2026-01-01T00:03:00Z"), ts("2026-01-01T00:06:00Z")),
            ohlcv(ts("2026-01-01T00:06:00Z"), ts("2026-01-01T00:09:00Z")),
        ];
        let mut data = SortedVecMap::new();
        data.insert(id, events.into_boxed_slice());
        let cursor = OhlcvCursor::new(&data);
        let result = cursor.find_first_open_at_or_after(&data, ts("2026-01-01T00:02:00Z"));
        assert_eq!(
            result,
            Some(ts("2026-01-01T00:03:00Z")),
            "Should find candle opening at 00:03:00"
        );
        let result = cursor.find_first_open_at_or_after(&data, ts("2026-01-01T00:03:00Z"));
        assert_eq!(
            result,
            Some(ts("2026-01-01T00:03:00Z")),
            "Should find candle opening exactly at requested time"
        );
    }

    #[test]
    fn test_cursor_find_first_point_in_time_at_or_after() {
        let id = ohlcv_id(Period::Minute(3));
        let events = vec![
            ohlcv(ts("2026-01-05T00:00:00Z"), ts("2026-01-05T00:03:00Z")),
            ohlcv(ts("2026-01-05T00:03:00Z"), ts("2026-01-05T00:06:00Z")),
            ohlcv(ts("2026-01-05T00:06:00Z"), ts("2026-01-05T00:09:00Z")),
        ];
        let mut data = SortedVecMap::new();
        data.insert(id, events.into_boxed_slice());
        let cursor = OhlcvCursor::new(&data);
        let result = cursor.find_first_point_in_time_at_or_after(&data, ts("2026-01-05T00:05:00Z"));
        assert_eq!(
            result,
            Some(ts("2026-01-05T00:06:00Z")),
            "Should find next available event after the query time"
        );
    }

    #[test]
    fn test_cursor_find_skips_consumed_events() {
        let id = ohlcv_id(Period::Minute(3));
        let events = vec![
            ohlcv(ts("2026-01-07T00:00:00Z"), ts("2026-01-07T00:03:00Z")),
            ohlcv(ts("2026-01-07T00:03:00Z"), ts("2026-01-07T00:06:00Z")),
            ohlcv(ts("2026-01-07T00:06:00Z"), ts("2026-01-07T00:09:00Z")),
        ];
        let mut data = SortedVecMap::new();
        data.insert(id, events.into_boxed_slice());
        let mut cursor = OhlcvCursor::new(&data);
        cursor.advance(&data, ts("2026-01-07T00:03:00Z"));
        assert_eq!(
            cursor.find_first_open_at_or_after(&data, ts("2026-01-07T00:00:00Z")),
            Some(ts("2026-01-07T00:03:00Z")),
            "Consumed candle opening at 00:00 must not be returned"
        );
        assert_eq!(
            cursor.find_first_point_in_time_at_or_after(&data, ts("2026-01-07T00:00:00Z")),
            Some(ts("2026-01-07T00:06:00Z")),
            "Consumed candle closing at 00:03 must not be returned"
        );
        assert_eq!(
            cursor.find_first_point_in_time_at_or_after(&data, ts("2026-01-07T00:10:00Z")),
            None,
            "No event becomes available after the last close"
        );
    }

    #[test]
    fn test_cursor_empty_storage() {
        let data = SortedVecMap::new();
        let mut cursor = OhlcvCursor::new(&data);
        assert_eq!(cursor.0.len(), 0, "Empty storage yields an empty cursor");
        cursor.advance(&data, ts("2026-01-08T00:00:00Z"));
        assert!(cursor.is_done(&data), "Nothing to consume means done");
        assert_eq!(cursor.next_point_in_time(&data), None);
        assert_eq!(
            cursor.find_first_open_at_or_after(&data, ts("2026-01-08T00:00:00Z")),
            None
        );
    }

    #[test]
    fn test_cursor_lockstep_invariant() {
        let id1 = ohlcv_id(Period::Minute(3));
        let id2 = ohlcv_id_for(Symbol::Spot(SpotPair::EthUsdt), Period::Minute(3));
        let events1 = vec![ohlcv(
            ts("2026-01-10T00:00:00Z"),
            ts("2026-01-10T00:03:00Z"),
        )];
        let events2 = vec![ohlcv(
            ts("2026-01-10T00:00:00Z"),
            ts("2026-01-10T00:03:00Z"),
        )];
        let mut data = SortedVecMap::new();
        data.insert(id1, events1.into_boxed_slice());
        data.insert(id2, events2.into_boxed_slice());
        let mut cursor = OhlcvCursor::new(&data);
        assert_eq!(
            cursor.0.len(),
            data.len(),
            "Cursor should have same number of keys as data"
        );
        cursor.advance(&data, ts("2026-01-10T00:03:00Z"));
        assert_eq!(cursor.0.get(&id1).unwrap(), &(0..1));
        assert_eq!(cursor.0.get(&id2).unwrap(), &(0..1));
    }

    #[test]
    fn test_cursor_lockstep_invariant_mixed_types() {
        let candle_id = ohlcv_id(Period::Minute(3));
        let ohlcv_events = vec![ohlcv(
            ts("2026-01-10T00:00:00Z"),
            ts("2026-01-10T00:03:00Z"),
        )];
        let trades_id = trade_id();
        let trade_events = vec![trade(ts("2026-01-10T00:02:00Z"))];
        let mut ohlcv_map = SortedVecMap::new();
        ohlcv_map.insert(candle_id, ohlcv_events.into_boxed_slice());
        let mut trade_map = SortedVecMap::new();
        trade_map.insert(trades_id, trade_events.into_boxed_slice());
        let mut ohlcv_cursor = OhlcvCursor::new(&ohlcv_map);
        let mut trade_cursor = TradeCursor::new(&trade_map);
        assert_eq!(ohlcv_cursor.0.len(), ohlcv_map.len());
        assert_eq!(trade_cursor.0.len(), trade_map.len());
        trade_cursor.advance(&trade_map, ts("2026-01-10T00:02:00Z"));
        assert_eq!(trade_cursor.0.get(&trades_id).unwrap(), &(0..1));
        ohlcv_cursor.advance(&ohlcv_map, ts("2026-01-10T00:03:00Z"));
        assert_eq!(ohlcv_cursor.0.get(&candle_id).unwrap(), &(0..1));
    }
}