use std::{cmp::Reverse, collections::BinaryHeap};
use nautilus_common::{clock::TestClock, timer::TimeEventHandler};
use nautilus_core::UnixNanos;
/// Accumulates time event handlers and hands them back in sorted order.
///
/// Handlers are kept in a [`BinaryHeap`] of [`Reverse`]-wrapped values, so the
/// heap yields the *smallest* handler first according to the `Ord`
/// implementation of `TimeEventHandler` (the tests in this file expect the
/// earliest `ts_event` first, with ties resolved by event name).
#[derive(Debug)]
pub struct TimeEventAccumulator {
    /// Pending handlers; `Reverse` turns the std max-heap into a min-heap.
    heap: BinaryHeap<Reverse<TimeEventHandler>>,
}
impl TimeEventAccumulator {
    /// Creates a new, empty [`TimeEventAccumulator`].
    #[must_use]
    pub fn new() -> Self {
        let heap = BinaryHeap::new();
        Self { heap }
    }

    /// Advances `clock` to `to_time_ns` and queues every resulting handler.
    ///
    /// The events returned by `clock.advance_time(to_time_ns, set_time)` are
    /// resolved to handlers via `clock.match_handlers`, and each handler is
    /// pushed onto the internal heap. `set_time` is forwarded to the clock
    /// unchanged.
    pub fn advance_clock(&mut self, clock: &mut TestClock, to_time_ns: UnixNanos, set_time: bool) {
        let events = clock.advance_time(to_time_ns, set_time);
        clock
            .match_handlers(events)
            .into_iter()
            .map(Reverse)
            .for_each(|entry| self.heap.push(entry));
    }

    /// Returns the `ts_event` of the handler at the top of the heap without
    /// removing it, or `None` when nothing is queued.
    #[must_use]
    pub fn peek_next_time(&self) -> Option<UnixNanos> {
        self.heap
            .peek()
            .map(|Reverse(handler)| handler.event.ts_event)
    }

    /// Removes and returns the next handler if its `ts_event` is at or before `ts`.
    ///
    /// Returns `None` (leaving the heap untouched) when the heap is empty or
    /// when the handler at the top of the heap has a `ts_event` later than `ts`.
    pub fn pop_next_at_or_before(&mut self, ts: UnixNanos) -> Option<TimeEventHandler> {
        let next_is_due = self
            .heap
            .peek()
            .is_some_and(|Reverse(next)| next.event.ts_event <= ts);
        if !next_is_due {
            return None;
        }
        self.heap.pop().map(|Reverse(handler)| handler)
    }

    /// Returns `true` when no handlers are queued.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }

    /// Returns the number of queued handlers.
    #[must_use]
    pub fn len(&self) -> usize {
        self.heap.len()
    }

    /// Discards every queued handler.
    pub fn clear(&mut self) {
        self.heap.clear();
    }

    /// Removes all queued handlers and returns them in heap pop order.
    ///
    /// Handlers are popped one at a time, so the returned vector starts with
    /// the handler at the top of the heap. The accumulator is empty afterwards.
    pub fn drain(&mut self) -> Vec<TimeEventHandler> {
        // Sized up front so collecting the popped handlers never reallocates.
        let mut drained = Vec::with_capacity(self.heap.len());
        drained.extend(std::iter::from_fn(|| self.heap.pop()).map(|Reverse(handler)| handler));
        drained
    }
}
impl Default for TimeEventAccumulator {
fn default() -> Self {
Self::new()
}
}
#[cfg(all(test, feature = "python"))]
mod tests {
    use nautilus_common::timer::{TimeEvent, TimeEventCallback};
    use nautilus_core::UUID4;
    use pyo3::{Py, Python, prelude::*, types::PyList};
    use rstest::*;
    use ustr::Ustr;

    use super::*;

    /// Builds a callback from the bound `append` method of `py_list`.
    ///
    /// None of the tests below invoke the callback themselves; it is only
    /// needed to construct `TimeEventHandler` values.
    fn append_callback(py_list: &Bound<'_, PyList>) -> TimeEventCallback {
        let py_append = Py::from(py_list.getattr("append").unwrap());
        TimeEventCallback::from(py_append.into_any())
    }

    /// Creates a time event called `name` whose `ts_event` and `ts_init` are both `ts`.
    fn event_at(name: &str, ts: u64) -> TimeEvent {
        TimeEvent::new(Ustr::from(name), UUID4::new(), ts.into(), ts.into())
    }

    /// Pushes `event` paired with `callback` straight onto the accumulator's heap.
    fn push_event(
        accumulator: &mut TimeEventAccumulator,
        event: TimeEvent,
        callback: TimeEventCallback,
    ) {
        accumulator
            .heap
            .push(Reverse(TimeEventHandler::new(event, callback)));
    }

    #[rstest]
    fn test_accumulator_pop_in_order() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let callback = append_callback(&py_list);
            let mut accumulator = TimeEventAccumulator::new();

            let earliest = event_at("TEST_EVENT_1", 100);
            let latest = event_at("TEST_EVENT_2", 300);
            let middle = event_at("TEST_EVENT_3", 200);

            // Deliberately inserted out of timestamp order.
            for event in [&earliest, &latest, &middle] {
                push_event(&mut accumulator, event.clone(), callback.clone());
            }
            assert_eq!(accumulator.len(), 3);

            for expected in [&earliest, &middle, &latest] {
                let popped = accumulator.pop_next_at_or_before(1000.into()).unwrap();
                assert_eq!(popped.event.ts_event, expected.ts_event);
            }
            assert!(accumulator.is_empty());
        });
    }

    #[rstest]
    fn test_accumulator_pop_same_timestamp_in_name_order() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let callback = append_callback(&py_list);
            let mut accumulator = TimeEventAccumulator::new();

            let spread_event = event_at("spread_quote_ESM4", 100);
            let time_bar_event = event_at("time_bar_ESM4-2-MINUTE-ASK-INTERNAL", 100);

            // Pushed in the opposite order to the one expected on the way out.
            push_event(&mut accumulator, time_bar_event.clone(), callback.clone());
            push_event(&mut accumulator, spread_event.clone(), callback);

            for expected in [&spread_event, &time_bar_event] {
                let popped = accumulator.pop_next_at_or_before(100.into()).unwrap();
                assert_eq!(popped.event.ts_event, expected.ts_event);
                assert_eq!(popped.event.name, expected.name);
            }
        });
    }

    #[rstest]
    fn test_accumulator_pop_respects_timestamp() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let callback = append_callback(&py_list);
            let mut accumulator = TimeEventAccumulator::new();

            let time_event1 = event_at("TEST_EVENT_1", 100);
            let time_event2 = event_at("TEST_EVENT_2", 300);
            push_event(&mut accumulator, time_event1.clone(), callback.clone());
            push_event(&mut accumulator, time_event2.clone(), callback);

            let popped1 = accumulator.pop_next_at_or_before(200.into()).unwrap();
            assert_eq!(popped1.event.ts_event, time_event1.ts_event);

            // The remaining event is at 300, so a cutoff of 200 yields nothing.
            assert!(accumulator.pop_next_at_or_before(200.into()).is_none());

            let popped2 = accumulator.pop_next_at_or_before(300.into()).unwrap();
            assert_eq!(popped2.event.ts_event, time_event2.ts_event);
        });
    }

    #[rstest]
    fn test_peek_next_time() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let callback = append_callback(&py_list);
            let mut accumulator = TimeEventAccumulator::new();
            assert!(accumulator.peek_next_time().is_none());

            let time_event1 = event_at("TEST_EVENT_1", 200);
            let time_event2 = event_at("TEST_EVENT_2", 100);

            push_event(&mut accumulator, time_event1, callback.clone());
            assert_eq!(accumulator.peek_next_time(), Some(200.into()));

            // An earlier event must take over the top of the heap.
            push_event(&mut accumulator, time_event2, callback);
            assert_eq!(accumulator.peek_next_time(), Some(100.into()));
        });
    }

    #[rstest]
    fn test_drain_returns_in_order() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let callback = append_callback(&py_list);
            let mut accumulator = TimeEventAccumulator::new();

            for ts in [300u64, 100, 200] {
                push_event(&mut accumulator, event_at("TEST", ts), callback.clone());
            }

            let drained: Vec<u64> = accumulator
                .drain()
                .iter()
                .map(|handler| handler.event.ts_event.as_u64())
                .collect();
            assert_eq!(drained, vec![100, 200, 300]);
            assert!(accumulator.is_empty());
        });
    }

    #[rstest]
    fn test_interleaved_push_pop_maintains_order() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let callback = append_callback(&py_list);
            let mut accumulator = TimeEventAccumulator::new();

            for ts in [100u64, 300] {
                push_event(&mut accumulator, event_at("TEST", ts), callback.clone());
            }

            let first = accumulator.pop_next_at_or_before(1000.into()).unwrap();
            let mut popped_timestamps: Vec<u64> = vec![first.event.ts_event.as_u64()];

            // Added after the first pop; it must still come out before 300.
            push_event(&mut accumulator, event_at("NEW", 150), callback);

            while let Some(handler) = accumulator.pop_next_at_or_before(1000.into()) {
                popped_timestamps.push(handler.event.ts_event.as_u64());
            }
            assert_eq!(popped_timestamps, vec![100, 150, 300]);
        });
    }

    #[rstest]
    fn test_same_timestamp_events() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let callback = append_callback(&py_list);
            let mut accumulator = TimeEventAccumulator::new();

            for i in 0..3 {
                let event = event_at(&format!("EVENT_{i}"), 100);
                push_event(&mut accumulator, event, callback.clone());
            }

            let mut count = 0;
            while let Some(handler) = accumulator.pop_next_at_or_before(100.into()) {
                assert_eq!(handler.event.ts_event.as_u64(), 100);
                count += 1;
            }
            assert_eq!(count, 3);
        });
    }

    #[rstest]
    fn test_pop_at_exact_timestamp_boundary() {
        Python::initialize();
        Python::attach(|py| {
            let py_list = PyList::empty(py);
            let callback = append_callback(&py_list);
            let mut accumulator = TimeEventAccumulator::new();

            // A cutoff equal to the event's timestamp is inclusive.
            push_event(&mut accumulator, event_at("TEST", 100), callback);
            let handler = accumulator.pop_next_at_or_before(100.into());
            assert!(handler.is_some());
            assert_eq!(handler.unwrap().event.ts_event.as_u64(), 100);

            // One nanosecond short of the timestamp must not release the event.
            push_event(
                &mut accumulator,
                event_at("TEST2", 200),
                append_callback(&py_list),
            );
            assert!(accumulator.pop_next_at_or_before(199.into()).is_none());
            assert!(accumulator.pop_next_at_or_before(200.into()).is_some());
        });
    }
}