use std::fmt::Debug;
use chrono::{DateTime, Utc};
use crate::{
data::episode::Episode,
error::{ChapatyResult, DataError},
sim::{
cursor::{
EconomicCalendarCursor, EmaCursor, OhlcvCursor, RsiCursor, SmaCursor, StreamCursor,
TpoCursor, TradeCursor, VolumeProfileCursor,
},
data::SimulationData,
},
};
/// Bundle of one cursor per stream family, moved together along a shared
/// timeline.
///
/// The group remembers the timestamp it is currently positioned at and the
/// timestamp it was positioned at before the most recent successful step.
#[derive(Debug, Clone)]
pub struct CursorGroup {
/// Cursor over the OHLCV streams (`SimulationData::ohlcv`).
ohlcv: OhlcvCursor,
/// Cursor over the trade streams (`SimulationData::trade`).
trade: TradeCursor,
/// Cursor over the economic-calendar streams (`SimulationData::economic_cal`).
economic_cal: EconomicCalendarCursor,
/// Cursor over the volume-profile streams (`SimulationData::volume_profile`).
vp: VolumeProfileCursor,
/// Cursor over the TPO streams (`SimulationData::tpo`).
tpo: TpoCursor,
/// Cursor over the EMA streams (`SimulationData::ema`).
ema: EmaCursor,
/// Cursor over the SMA streams (`SimulationData::sma`).
sma: SmaCursor,
/// Cursor over the RSI streams (`SimulationData::rsi`).
rsi: RsiCursor,
/// Timestamp held before the last successful `step`; `None` right after
/// construction, `reset`, or a successful move to a new episode.
previous_ts: Option<DateTime<Utc>>,
/// Timestamp the cursors were last advanced to.
current_ts: DateTime<Utc>,
}
impl CursorGroup {
    /// Creates one cursor per stream family in `sim_data` and advances all of
    /// them to `sim_data.global_availability_start()`.
    ///
    /// The returned group has `current_ts` equal to that start timestamp and
    /// `previous_ts` set to `None`.
    ///
    /// # Errors
    ///
    /// Nothing in this body fails; the `ChapatyResult` wrapper is part of the
    /// signature only.
    pub fn new(sim_data: &SimulationData) -> ChapatyResult<Self> {
        let availability_start = sim_data.global_availability_start();

        let mut cursors = Self {
            ohlcv: OhlcvCursor::new(sim_data.ohlcv()),
            trade: TradeCursor::new(sim_data.trade()),
            economic_cal: EconomicCalendarCursor::new(sim_data.economic_cal()),
            vp: VolumeProfileCursor::new(sim_data.volume_profile()),
            tpo: TpoCursor::new(sim_data.tpo()),
            ema: EmaCursor::new(sim_data.ema()),
            sma: SmaCursor::new(sim_data.sma()),
            rsi: RsiCursor::new(sim_data.rsi()),
            previous_ts: None,
            current_ts: availability_start,
        };
        cursors.advance_all(sim_data, availability_start);

        Ok(cursors)
    }

    /// Timestamp the group is currently positioned at.
    pub fn current_ts(&self) -> DateTime<Utc> {
        self.current_ts
    }

    /// Timestamp the group held before the last successful [`Self::step`], if any.
    pub fn previous_ts(&self) -> Option<DateTime<Utc>> {
        self.previous_ts
    }

    /// Borrows the OHLCV cursor.
    pub fn ohlcv(&self) -> &OhlcvCursor {
        &self.ohlcv
    }

    /// Borrows the trade cursor.
    pub fn trade(&self) -> &TradeCursor {
        &self.trade
    }

    /// Borrows the economic-calendar cursor.
    pub fn economic_cal(&self) -> &EconomicCalendarCursor {
        &self.economic_cal
    }

    /// Borrows the volume-profile cursor.
    pub fn vp(&self) -> &VolumeProfileCursor {
        &self.vp
    }

    /// Borrows the TPO cursor.
    pub fn tpo(&self) -> &TpoCursor {
        &self.tpo
    }

    /// Borrows the EMA cursor.
    pub fn ema(&self) -> &EmaCursor {
        &self.ema
    }

    /// Borrows the SMA cursor.
    pub fn sma(&self) -> &SmaCursor {
        &self.sma
    }

    /// Borrows the RSI cursor.
    pub fn rsi(&self) -> &RsiCursor {
        &self.rsi
    }

    /// Returns the smallest `next_point_in_time` reported by any cursor, or
    /// `None` when no cursor reports one. The group itself is not modified.
    pub fn peek(&self, sim_data: &SimulationData) -> Option<DateTime<Utc>> {
        let upcoming = [
            self.ohlcv.next_point_in_time(sim_data.ohlcv()),
            self.trade.next_point_in_time(sim_data.trade()),
            self.economic_cal
                .next_point_in_time(sim_data.economic_cal()),
            self.vp.next_point_in_time(sim_data.volume_profile()),
            self.tpo.next_point_in_time(sim_data.tpo()),
            self.ema.next_point_in_time(sim_data.ema()),
            self.sma.next_point_in_time(sim_data.sma()),
            self.rsi.next_point_in_time(sim_data.rsi()),
        ];

        upcoming.into_iter().flatten().min()
    }

    /// Moves the group forward to the next point in time, clamped to `ep.end()`.
    ///
    /// The call leaves the group untouched when [`Self::peek`] yields nothing,
    /// or when the clamped target is not strictly later than `current_ts`.
    /// Otherwise the old `current_ts` becomes `previous_ts` and every cursor
    /// is advanced to the target.
    ///
    /// # Errors
    ///
    /// Nothing in this body fails; it always returns `Ok(())`.
    pub fn step(&mut self, sim_data: &SimulationData, ep: &Episode) -> ChapatyResult<()> {
        let target_ts = match self.peek(sim_data) {
            // Never run past the end of the active episode.
            Some(candidate) => candidate.min(ep.end()),
            None => return Ok(()),
        };

        if target_ts > self.current_ts {
            self.previous_ts = Some(self.current_ts);
            self.current_ts = target_ts;
            self.advance_all(sim_data, target_ts);
        }

        Ok(())
    }

    /// Repositions the group at the start of the episode that follows `ep`.
    ///
    /// The next episode starts at the smallest `find_first_open_at_or_after`
    /// result for `ep.end()` across all cursors. When no cursor reports one,
    /// every cursor is moved to its end and `Ok(None)` is returned;
    /// `current_ts` and `previous_ts` are left unchanged in that case.
    ///
    /// Otherwise `current_ts` becomes the smallest
    /// `find_first_point_in_time_at_or_after` result for that start,
    /// `previous_ts` is cleared, all cursors are advanced to the new
    /// `current_ts`, and `ep.next(start)` is returned.
    ///
    /// # Errors
    ///
    /// Returns `DataError::CausalityViolation` when an episode start was found
    /// but no cursor reports a point in time at or after it.
    pub fn advance_to_next_episode(
        &mut self,
        sim_data: &SimulationData,
        ep: Episode,
    ) -> ChapatyResult<Option<Episode>> {
        let episode_end = ep.end();

        let earliest_open = [
            self.ohlcv
                .find_first_open_at_or_after(sim_data.ohlcv(), episode_end),
            self.trade
                .find_first_open_at_or_after(sim_data.trade(), episode_end),
            self.economic_cal
                .find_first_open_at_or_after(sim_data.economic_cal(), episode_end),
            self.vp
                .find_first_open_at_or_after(sim_data.volume_profile(), episode_end),
            self.tpo
                .find_first_open_at_or_after(sim_data.tpo(), episode_end),
            self.ema
                .find_first_open_at_or_after(sim_data.ema(), episode_end),
            self.sma
                .find_first_open_at_or_after(sim_data.sma(), episode_end),
            self.rsi
                .find_first_open_at_or_after(sim_data.rsi(), episode_end),
        ]
        .into_iter()
        .flatten()
        .min();

        let next_start = match earliest_open {
            Some(open_ts) => open_ts,
            None => {
                // Nothing opens after this episode: exhaust every cursor.
                self.advance_all_to_end(sim_data);
                return Ok(None);
            }
        };

        let earliest_availability = [
            self.ohlcv
                .find_first_point_in_time_at_or_after(sim_data.ohlcv(), next_start),
            self.trade
                .find_first_point_in_time_at_or_after(sim_data.trade(), next_start),
            self.economic_cal
                .find_first_point_in_time_at_or_after(sim_data.economic_cal(), next_start),
            self.vp
                .find_first_point_in_time_at_or_after(sim_data.volume_profile(), next_start),
            self.tpo
                .find_first_point_in_time_at_or_after(sim_data.tpo(), next_start),
            self.ema
                .find_first_point_in_time_at_or_after(sim_data.ema(), next_start),
            self.sma
                .find_first_point_in_time_at_or_after(sim_data.sma(), next_start),
            self.rsi
                .find_first_point_in_time_at_or_after(sim_data.rsi(), next_start),
        ]
        .into_iter()
        .flatten()
        .min();

        // An open was found, so a missing availability point is reported as a
        // causality violation.
        let availability_ts =
            earliest_availability.ok_or_else(|| DataError::CausalityViolation {
                open: next_start.to_string(),
                stream: "Unknown (Aggregation)".to_string(),
            })?;

        let following_episode = ep.next(next_start);

        self.previous_ts = None;
        self.current_ts = availability_ts;
        self.advance_all(sim_data, availability_ts);

        Ok(Some(following_episode))
    }

    /// Rewinds every cursor and re-advances the group to
    /// `sim_data.global_availability_start()`, clearing `previous_ts`.
    pub fn reset(&mut self, sim_data: &SimulationData) {
        self.rewind();

        let availability_start = sim_data.global_availability_start();
        self.advance_all(sim_data, availability_start);

        self.previous_ts = None;
        self.current_ts = availability_start;
    }

    /// Returns `true` only when every cursor reports `is_done` for its streams.
    ///
    /// Cursors are checked in declaration order and the check stops at the
    /// first one that is not done.
    pub fn is_end_of_data(&self, sim_data: &SimulationData) -> bool {
        let event_streams_done = self.ohlcv.is_done(sim_data.ohlcv())
            && self.trade.is_done(sim_data.trade())
            && self.economic_cal.is_done(sim_data.economic_cal());

        let profile_streams_done = event_streams_done
            && self.vp.is_done(sim_data.volume_profile())
            && self.tpo.is_done(sim_data.tpo());

        profile_streams_done
            && self.ema.is_done(sim_data.ema())
            && self.sma.is_done(sim_data.sma())
            && self.rsi.is_done(sim_data.rsi())
    }
}
impl CursorGroup {
    /// Calls `to_end` on every cursor with its matching stream collection.
    fn advance_all_to_end(&mut self, sim_data: &SimulationData) {
        let Self {
            ohlcv,
            trade,
            economic_cal,
            vp,
            tpo,
            ema,
            sma,
            rsi,
            ..
        } = self;

        ohlcv.to_end(sim_data.ohlcv());
        trade.to_end(sim_data.trade());
        economic_cal.to_end(sim_data.economic_cal());
        vp.to_end(sim_data.volume_profile());
        tpo.to_end(sim_data.tpo());
        ema.to_end(sim_data.ema());
        sma.to_end(sim_data.sma());
        rsi.to_end(sim_data.rsi());
    }

    /// Calls `rewind` on every cursor. The stored timestamps are not touched.
    fn rewind(&mut self) {
        let Self {
            ohlcv,
            trade,
            economic_cal,
            vp,
            tpo,
            ema,
            sma,
            rsi,
            ..
        } = self;

        ohlcv.rewind();
        trade.rewind();
        economic_cal.rewind();
        vp.rewind();
        tpo.rewind();
        ema.rewind();
        sma.rewind();
        rsi.rewind();
    }

    /// Calls `advance` on every cursor with its matching stream collection and
    /// the target timestamp `ts`. The stored timestamps are not touched.
    fn advance_all(&mut self, sim_data: &SimulationData, ts: DateTime<Utc>) {
        let Self {
            ohlcv,
            trade,
            economic_cal,
            vp,
            tpo,
            ema,
            sma,
            rsi,
            ..
        } = self;

        ohlcv.advance(sim_data.ohlcv(), ts);
        trade.advance(sim_data.trade(), ts);
        economic_cal.advance(sim_data.economic_cal(), ts);
        vp.advance(sim_data.volume_profile(), ts);
        tpo.advance(sim_data.tpo(), ts);
        ema.advance(sim_data.ema(), ts);
        sma.advance(sim_data.sma(), ts);
        rsi.advance(sim_data.rsi(), ts);
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::{
        data::{
            domain::{DataBroker, Exchange, Period, Price, Quantity, SpotPair, Symbol},
            episode::{EpisodeBuilder, EpisodeLength},
            event::{Ohlcv, OhlcvId, TradeEvent, TradesId},
        },
        gym::trading::config::EnvConfig,
        sim::data::{SimulationDataBuilder, Streams},
        sorted_vec_map::SortedVecMap,
    };

    /// Parses an RFC 3339 string into a UTC timestamp; panics on malformed input.
    fn ts(s: &str) -> DateTime<Utc> {
        DateTime::parse_from_rfc3339(s).unwrap().with_timezone(&Utc)
    }

    /// Builds a candle with fixed prices and volume spanning `open_ts..close_ts`.
    fn ohlcv(open_ts: DateTime<Utc>, close_ts: DateTime<Utc>) -> Ohlcv {
        Ohlcv {
            open_timestamp: open_ts,
            close_timestamp: close_ts,
            open: Price(100.0),
            high: Price(110.0),
            low: Price(90.0),
            close: Price(105.0),
            volume: Quantity(1000.0),
            quote_asset_volume: None,
            number_of_trades: None,
            taker_buy_base_asset_volume: None,
            taker_buy_quote_asset_volume: None,
        }
    }

    /// Binance BTC/USDT spot OHLCV stream id for the given candle period.
    fn ohlcv_id(period: Period) -> OhlcvId {
        OhlcvId {
            broker: DataBroker::Binance,
            exchange: Exchange::Binance,
            symbol: Symbol::Spot(SpotPair::BtcUsdt),
            period,
        }
    }

    /// Builds a trade event with fixed price and quantity at `timestamp`.
    fn trade(timestamp: DateTime<Utc>) -> TradeEvent {
        TradeEvent {
            timestamp,
            price: Price(100.0),
            quantity: Quantity(1.0),
            trade_id: None,
            quote_asset_volume: None,
            is_buyer_maker: None,
            is_best_match: None,
        }
    }

    /// Binance BTC/USDT spot trade stream id.
    fn trade_id() -> TradesId {
        TradesId {
            broker: DataBroker::Binance,
            exchange: Exchange::Binance,
            symbol: Symbol::Spot(SpotPair::BtcUsdt),
        }
    }

    /// Binance ETH/USDT spot trade stream id, used as a second trade stream.
    fn trade_id_alt() -> TradesId {
        TradesId {
            broker: DataBroker::Binance,
            exchange: Exchange::Binance,
            symbol: Symbol::Spot(SpotPair::EthUsdt),
        }
    }

    /// Builds simulation data containing a single OHLCV stream.
    fn sim_data_with_ohlcv(id: OhlcvId, events: Vec<Ohlcv>) -> SimulationData {
        let mut ohlcv_map = SortedVecMap::new();
        ohlcv_map.insert(id, events.into_boxed_slice());
        let streams = Streams::default().with_ohlcv(ohlcv_map);
        SimulationDataBuilder::new(streams)
            .build(EnvConfig::default())
            .unwrap()
    }

    /// Builds simulation data containing one OHLCV stream and one trade stream.
    fn sim_data_multi_stream(
        oid: OhlcvId,
        ohlcv_events: Vec<Ohlcv>,
        tid: TradesId,
        trade_events: Vec<TradeEvent>,
    ) -> SimulationData {
        let mut ohlcv_map = SortedVecMap::new();
        ohlcv_map.insert(oid, ohlcv_events.into_boxed_slice());
        let mut trade_map = SortedVecMap::new();
        trade_map.insert(tid, trade_events.into_boxed_slice());
        let streams = Streams::default()
            .with_ohlcv(ohlcv_map)
            .with_trade(trade_map);
        SimulationDataBuilder::new(streams)
            .build(EnvConfig::default())
            .unwrap()
    }

    /// Builds an episode with the given start and length; panics if the builder fails.
    fn episode(start: DateTime<Utc>, length: EpisodeLength) -> Episode {
        EpisodeBuilder::new()
            .with_start(start)
            .with_length(length)
            .build()
            .unwrap()
    }

    /// Interleaves two candles and two trades and checks the group timestamp
    /// and both cursor ranges after construction and after each step.
    #[test]
    fn test_cursor_group_synchronization_ohlcv_and_trade() {
        let oid = ohlcv_id(Period::Minute(5));
        let tid = trade_id();
        let ohlcv_events = vec![
            ohlcv(ts("2025-03-01T00:00:00Z"), ts("2025-03-01T00:05:00Z")),
            ohlcv(ts("2025-03-01T00:05:00Z"), ts("2025-03-01T00:10:00Z")),
        ];
        let trade_events = vec![
            trade(ts("2025-03-01T00:02:00Z")),
            trade(ts("2025-03-01T00:07:00Z")),
        ];
        let sim_data = sim_data_multi_stream(oid, ohlcv_events, tid, trade_events);
        let ep = episode(ts("2025-03-01T00:00:00Z"), EpisodeLength::Day);
        let mut cursor_group = CursorGroup::new(&sim_data).unwrap();

        assert_eq!(
            cursor_group.current_ts,
            ts("2025-03-01T00:02:00Z"),
            "Initial: Should start at earliest available event (trade at 00:02)"
        );
        assert_eq!(cursor_group.trade.0.get(&tid).unwrap(), &(0..1));
        assert_eq!(cursor_group.ohlcv.0.get(&oid).unwrap(), &(0..0));

        cursor_group.step(&sim_data, &ep).unwrap();
        assert_eq!(cursor_group.current_ts, ts("2025-03-01T00:05:00Z"));
        assert_eq!(
            cursor_group.ohlcv.0.get(&oid).unwrap(),
            &(0..1),
            "Step 1: OHLCV cursor should have consumed first candle"
        );
        assert_eq!(
            cursor_group.trade.0.get(&tid).unwrap(),
            &(0..1),
            "Step 1: Trade cursor should remain at 1 (no new trades)"
        );

        cursor_group.step(&sim_data, &ep).unwrap();
        assert_eq!(cursor_group.current_ts, ts("2025-03-01T00:07:00Z"));
        assert_eq!(
            cursor_group.trade.0.get(&tid).unwrap(),
            &(0..2),
            "Step 2: Trade cursor should now have consumed both trades"
        );
        assert_eq!(
            cursor_group.ohlcv.0.get(&oid).unwrap(),
            &(0..1),
            "Step 2: OHLCV cursor should remain at 1"
        );

        cursor_group.step(&sim_data, &ep).unwrap();
        assert_eq!(cursor_group.current_ts, ts("2025-03-01T00:10:00Z"));
        assert_eq!(
            cursor_group.ohlcv.0.get(&oid).unwrap(),
            &(0..2),
            "Step 3: OHLCV cursor should have consumed both candles"
        );
        assert_eq!(
            cursor_group.trade.0.get(&tid).unwrap(),
            &(0..2),
            "Step 3: Trade cursor should remain at 2"
        );
        assert!(cursor_group.is_end_of_data(&sim_data));
    }

    /// A freshly built group sits at the first candle's close timestamp with
    /// that candle already inside the cursor range.
    #[test]
    fn test_cursor_group_initialization_advances_to_first_available() {
        let oid = ohlcv_id(Period::Minute(3));
        let events = vec![
            ohlcv(ts("2025-04-01T00:00:00Z"), ts("2025-04-01T00:03:00Z")),
            ohlcv(ts("2025-04-01T00:03:00Z"), ts("2025-04-01T00:06:00Z")),
        ];
        let sim_data = sim_data_with_ohlcv(oid, events);
        let cursor_group = CursorGroup::new(&sim_data).unwrap();

        assert_eq!(
            cursor_group.current_ts,
            ts("2025-04-01T00:03:00Z"),
            "Should initialize at first available event"
        );
        assert_eq!(
            cursor_group.previous_ts, None,
            "previous_ts should be None on init"
        );
        assert_eq!(
            cursor_group.ohlcv.0.get(&oid).unwrap(),
            &(0..1),
            "First event should be consumed on initialization"
        );
    }

    /// `rewind` puts both cursor ranges back to `0..0` after they have moved.
    #[test]
    fn test_cursor_group_rewind_resets_all_cursors() {
        let oid = ohlcv_id(Period::Minute(3));
        let tid = trade_id();
        let ohlcv_events = vec![
            ohlcv(ts("2025-05-01T00:00:00Z"), ts("2025-05-01T00:03:00Z")),
            ohlcv(ts("2025-05-01T00:03:00Z"), ts("2025-05-01T00:06:00Z")),
        ];
        let trade_events = vec![
            trade(ts("2025-05-01T00:02:00Z")),
            trade(ts("2025-05-01T00:04:00Z")),
        ];
        let sim_data = sim_data_multi_stream(oid, ohlcv_events, tid, trade_events);
        let ep = episode(ts("2025-05-01T00:00:00Z"), EpisodeLength::Day);
        let mut cursor_group = CursorGroup::new(&sim_data).unwrap();

        cursor_group.step(&sim_data, &ep).unwrap();
        cursor_group.step(&sim_data, &ep).unwrap();
        assert!(cursor_group.ohlcv.0.get(&oid).unwrap().end > 0);
        assert!(cursor_group.trade.0.get(&tid).unwrap().end > 0);

        cursor_group.rewind();
        assert_eq!(
            cursor_group.ohlcv.0.get(&oid).unwrap(),
            &(0..0),
            "OHLCV cursor should be reset to 0..0"
        );
        assert_eq!(
            cursor_group.trade.0.get(&tid).unwrap(),
            &(0..0),
            "Trade cursor should be reset to 0..0"
        );
    }

    /// `is_end_of_data` stays false while any stream still has events left.
    #[test]
    fn test_cursor_group_is_end_of_data_requires_all_exhausted() {
        let oid = ohlcv_id(Period::Minute(3));
        let tid = trade_id();
        let ohlcv_events = vec![ohlcv(
            ts("2025-06-01T00:00:00Z"),
            ts("2025-06-01T00:03:00Z"),
        )];
        let trade_events = vec![trade(ts("2025-06-01T00:05:00Z"))];
        let sim_data = sim_data_multi_stream(oid, ohlcv_events, tid, trade_events);
        let ep = episode(ts("2025-06-01T00:00:00Z"), EpisodeLength::Day);
        let mut cursor_group = CursorGroup::new(&sim_data).unwrap();

        assert!(
            !cursor_group.is_end_of_data(&sim_data),
            "Should NOT be at end initially"
        );

        cursor_group.step(&sim_data, &ep).unwrap();
        assert_eq!(cursor_group.current_ts, ts("2025-06-01T00:05:00Z"));
        assert!(
            cursor_group.is_end_of_data(&sim_data),
            "Should be at end when ALL streams are exhausted"
        );
    }

    /// Further `step` calls after two initial steps leave timestamps and the
    /// cursor range unchanged, even though a later candle exists on the next day.
    #[test]
    fn test_cursor_group_step_idempotent_at_episode_end() {
        let oid = ohlcv_id(Period::Minute(3));
        let events = vec![
            ohlcv(ts("2025-07-01T00:00:00Z"), ts("2025-07-01T00:03:00Z")),
            ohlcv(ts("2025-07-01T00:03:00Z"), ts("2025-07-01T00:06:00Z")),
            ohlcv(ts("2025-07-02T00:06:00Z"), ts("2025-07-02T00:09:00Z")),
        ];
        let sim_data = sim_data_with_ohlcv(oid, events);
        let ep = episode(ts("2025-07-01T00:00:00Z"), EpisodeLength::Day);
        let mut cursor_group = CursorGroup::new(&sim_data).unwrap();

        cursor_group.step(&sim_data, &ep).unwrap();
        cursor_group.step(&sim_data, &ep).unwrap();
        let ts_before = cursor_group.current_ts;
        let prev_before = cursor_group.previous_ts;
        let range_before = cursor_group.ohlcv.0.get(&oid).unwrap().clone();

        cursor_group.step(&sim_data, &ep).unwrap();
        cursor_group.step(&sim_data, &ep).unwrap();
        assert_eq!(
            cursor_group.current_ts, ts_before,
            "current_ts should not change"
        );
        assert_eq!(
            cursor_group.previous_ts, prev_before,
            "previous_ts should not change"
        );
        assert_eq!(
            cursor_group.ohlcv.0.get(&oid).unwrap(),
            &range_before,
            "cursor range should not change"
        );
    }

    /// Moving to the next episode clears `previous_ts` and lands on the close
    /// timestamp of the next day's candle.
    #[test]
    fn test_cursor_group_advance_to_next_episode_resets_previous_ts() {
        let oid = ohlcv_id(Period::Minute(3));
        let events = vec![
            ohlcv(ts("2025-08-01T00:00:00Z"), ts("2025-08-01T00:03:00Z")),
            ohlcv(ts("2025-08-01T00:03:00Z"), ts("2025-08-01T00:06:00Z")),
            ohlcv(ts("2025-08-02T00:00:00Z"), ts("2025-08-02T00:03:00Z")),
        ];
        let sim_data = sim_data_with_ohlcv(oid, events);
        let ep1 = episode(ts("2025-08-01T00:00:00Z"), EpisodeLength::Day);
        let mut cursor_group = CursorGroup::new(&sim_data).unwrap();

        cursor_group.step(&sim_data, &ep1).unwrap();
        assert!(
            cursor_group.previous_ts.is_some(),
            "previous_ts should be set after step"
        );

        let ep2 = cursor_group
            .advance_to_next_episode(&sim_data, ep1)
            .unwrap();
        assert!(ep2.is_some());
        assert_eq!(
            cursor_group.previous_ts, None,
            "previous_ts should be reset to None for new episode"
        );
        assert_eq!(
            cursor_group.current_ts,
            ts("2025-08-02T00:03:00Z"),
            "current_ts should be at first event of new episode"
        );
    }

    /// With no data after the episode, the call returns `None` and leaves
    /// every cursor exhausted.
    #[test]
    fn test_cursor_group_advance_to_next_episode_returns_none_when_no_more_data() {
        let oid = ohlcv_id(Period::Minute(3));
        let events = vec![ohlcv(
            ts("2025-09-01T00:00:00Z"),
            ts("2025-09-01T00:03:00Z"),
        )];
        let sim_data = sim_data_with_ohlcv(oid, events);
        let ep = episode(ts("2025-09-01T00:00:00Z"), EpisodeLength::Day);
        let mut cursor_group = CursorGroup::new(&sim_data).unwrap();

        let next_ep = cursor_group.advance_to_next_episode(&sim_data, ep).unwrap();
        assert!(
            next_ep.is_none(),
            "Should return None when no more data exists"
        );
        assert!(
            cursor_group.is_end_of_data(&sim_data),
            "All cursors should be exhausted"
        );
    }

    /// A two-week gap in the data is skipped: the next episode starts at the
    /// open of the next candle and the group sits at its close.
    #[test]
    fn test_cursor_group_handles_sparse_data_across_episodes() {
        let oid = ohlcv_id(Period::Minute(3));
        let events = vec![
            ohlcv(ts("2025-10-01T00:00:00Z"), ts("2025-10-01T00:03:00Z")),
            ohlcv(ts("2025-10-15T10:00:00Z"), ts("2025-10-15T10:03:00Z")),
        ];
        let sim_data = sim_data_with_ohlcv(oid, events);
        let ep1 = episode(ts("2025-10-01T00:00:00Z"), EpisodeLength::Day);
        let mut cursor_group = CursorGroup::new(&sim_data).unwrap();

        let ep2 = cursor_group
            .advance_to_next_episode(&sim_data, ep1)
            .unwrap();
        assert!(ep2.is_some());
        let ep2 = ep2.unwrap();
        assert_eq!(
            ep2.start(),
            ts("2025-10-15T10:00:00Z"),
            "Episode should start at actual data location"
        );
        assert_eq!(
            cursor_group.current_ts,
            ts("2025-10-15T10:03:00Z"),
            "current_ts should be at first available event"
        );
    }

    /// Each successful step shifts `current_ts` into `previous_ts`.
    #[test]
    fn test_cursor_group_step_tracks_time_correctly() {
        let oid = ohlcv_id(Period::Minute(3));
        let events = vec![
            ohlcv(ts("2025-11-01T00:00:00Z"), ts("2025-11-01T00:03:00Z")),
            ohlcv(ts("2025-11-01T00:03:00Z"), ts("2025-11-01T00:06:00Z")),
            ohlcv(ts("2025-11-01T00:06:00Z"), ts("2025-11-01T00:09:00Z")),
        ];
        let sim_data = sim_data_with_ohlcv(oid, events);
        let ep = episode(ts("2025-11-01T00:00:00Z"), EpisodeLength::Day);
        let mut cursor_group = CursorGroup::new(&sim_data).unwrap();

        assert_eq!(cursor_group.current_ts, ts("2025-11-01T00:03:00Z"));
        assert_eq!(cursor_group.previous_ts, None);

        cursor_group.step(&sim_data, &ep).unwrap();
        assert_eq!(cursor_group.current_ts, ts("2025-11-01T00:06:00Z"));
        assert_eq!(cursor_group.previous_ts, Some(ts("2025-11-01T00:03:00Z")));

        cursor_group.step(&sim_data, &ep).unwrap();
        assert_eq!(cursor_group.current_ts, ts("2025-11-01T00:09:00Z"));
        assert_eq!(cursor_group.previous_ts, Some(ts("2025-11-01T00:06:00Z")));
    }

    /// A candle close and a trade sharing one timestamp are both picked up by
    /// the same positioning.
    #[test]
    fn test_cursor_group_simultaneous_events_across_different_streams() {
        let oid = ohlcv_id(Period::Minute(5));
        let tid = trade_id();
        let ohlcv_events = vec![ohlcv(
            ts("2025-12-01T00:00:00Z"),
            ts("2025-12-01T00:05:00Z"),
        )];
        let trade_events = vec![trade(ts("2025-12-01T00:05:00Z"))];
        let sim_data = sim_data_multi_stream(oid, ohlcv_events, tid, trade_events);
        let cursor_group = CursorGroup::new(&sim_data).unwrap();

        assert_eq!(
            cursor_group.current_ts,
            ts("2025-12-01T00:05:00Z"),
            "Should be at the simultaneous timestamp"
        );
        assert_eq!(
            cursor_group.ohlcv.0.get(&oid).unwrap(),
            &(0..1),
            "OHLCV should have consumed its event"
        );
        assert_eq!(
            cursor_group.trade.0.get(&tid).unwrap(),
            &(0..1),
            "Trade should have consumed its event"
        );
        assert!(cursor_group.is_end_of_data(&sim_data));
    }

    /// Walks two episodes end to end: steps through day one, moves to day two,
    /// steps once more, then runs out of data.
    #[test]
    fn test_cursor_group_integration_full_workflow() {
        let oid = ohlcv_id(Period::Minute(5));
        let tid = trade_id();
        let ohlcv_events = vec![
            ohlcv(ts("2026-01-05T00:00:00Z"), ts("2026-01-05T00:05:00Z")),
            ohlcv(ts("2026-01-05T00:05:00Z"), ts("2026-01-05T00:10:00Z")),
            ohlcv(ts("2026-01-06T00:00:00Z"), ts("2026-01-06T00:05:00Z")),
        ];
        let trade_events = vec![
            trade(ts("2026-01-05T00:03:00Z")),
            trade(ts("2026-01-05T00:08:00Z")),
            trade(ts("2026-01-06T00:02:00Z")),
        ];
        let sim_data = sim_data_multi_stream(oid, ohlcv_events, tid, trade_events);
        let ep1 = episode(ts("2026-01-05T00:00:00Z"), EpisodeLength::Day);
        let mut cursor_group = CursorGroup::new(&sim_data).unwrap();

        assert_eq!(cursor_group.current_ts, ts("2026-01-05T00:03:00Z"));
        cursor_group.step(&sim_data, &ep1).unwrap();
        assert_eq!(cursor_group.current_ts, ts("2026-01-05T00:05:00Z"));
        cursor_group.step(&sim_data, &ep1).unwrap();
        assert_eq!(cursor_group.current_ts, ts("2026-01-05T00:08:00Z"));
        cursor_group.step(&sim_data, &ep1).unwrap();
        assert_eq!(cursor_group.current_ts, ts("2026-01-05T00:10:00Z"));

        let ep2 = cursor_group
            .advance_to_next_episode(&sim_data, ep1)
            .unwrap();
        assert!(ep2.is_some());
        let ep2 = ep2.unwrap();
        assert_eq!(cursor_group.current_ts, ts("2026-01-06T00:02:00Z"));
        assert_eq!(cursor_group.previous_ts, None);

        cursor_group.step(&sim_data, &ep2).unwrap();
        assert_eq!(cursor_group.current_ts, ts("2026-01-06T00:05:00Z"));
        assert_eq!(cursor_group.previous_ts, Some(ts("2026-01-06T00:02:00Z")));

        let ep3 = cursor_group
            .advance_to_next_episode(&sim_data, ep2)
            .unwrap();
        assert!(ep3.is_none());
        assert!(cursor_group.is_end_of_data(&sim_data));
    }

    /// Two trade streams for different symbols keep independent ranges inside
    /// the same trade cursor.
    #[test]
    fn test_cursor_group_multi_symbol_same_type() {
        let btc_id = trade_id();
        let eth_id = trade_id_alt();
        let btc_events = vec![trade(ts("2025-01-01T10:00:00Z"))];
        let eth_events = vec![trade(ts("2025-01-01T10:01:00Z"))];
        let mut trade_map = SortedVecMap::new();
        trade_map.insert(btc_id, btc_events.into_boxed_slice());
        trade_map.insert(eth_id, eth_events.into_boxed_slice());
        let mut ohlcv_map = SortedVecMap::new();
        let dummy_ohlcv = ohlcv_id(Period::Minute(1));
        ohlcv_map.insert(dummy_ohlcv, vec![].into_boxed_slice());
        let streams = Streams::default()
            .with_ohlcv(ohlcv_map)
            .with_trade(trade_map);
        let sim_data = SimulationDataBuilder::new(streams)
            .build(EnvConfig::default())
            .unwrap();
        let ep = episode(ts("2025-01-01T00:00:00Z"), EpisodeLength::Day);
        let mut cursor_group = CursorGroup::new(&sim_data).unwrap();

        assert_eq!(cursor_group.current_ts, ts("2025-01-01T10:00:00Z"));
        assert_eq!(cursor_group.trade.0.get(&btc_id).unwrap(), &(0..1));
        // The map lookup takes the key by reference, like every other lookup here.
        assert_eq!(cursor_group.trade.0.get(&eth_id).unwrap(), &(0..0));

        cursor_group.step(&sim_data, &ep).unwrap();
        assert_eq!(cursor_group.current_ts, ts("2025-01-01T10:01:00Z"));
        assert_eq!(cursor_group.trade.0.get(&btc_id).unwrap(), &(0..1));
        assert_eq!(cursor_group.trade.0.get(&eth_id).unwrap(), &(0..1));
    }
}