use std::str::FromStr;
use nautilus_core::{
UUID4, UnixNanos,
python::{IntoPyObjectNautilusExt, to_pyvalue_err},
};
use pyo3::{
IntoPyObjectExt,
basic::CompareOp,
prelude::*,
types::{PyInt, PyString, PyTuple},
};
use ustr::Ustr;
use crate::timer::{TimeEvent, TimeEventCallback, TimeEventHandler};
/// Python-facing pairing of a [`TimeEvent`] with a Python object used as its callback.
///
/// Exposed to Python under the name `TimeEventHandler` in the
/// `nautilus_trader.core.nautilus_pyo3.common` module.
#[pyo3::pyclass(
module = "nautilus_trader.core.nautilus_pyo3.common",
name = "TimeEventHandler"
)]
#[derive(Debug)]
// The `_Py` suffix distinguishes this type from the Rust-side `TimeEventHandler`,
// hence the lint exemption for the non-camel-case name.
#[allow(non_camel_case_types)]
pub struct TimeEventHandler_Py {
/// The time event carried by this handler.
pub event: TimeEvent,
/// The Python object stored as the callback.
// NOTE(review): presumably a callable that receives `event` — confirm against callers.
pub callback: Py<PyAny>,
}
impl From<TimeEventHandler> for TimeEventHandler_Py {
fn from(value: TimeEventHandler) -> Self {
Self {
event: value.event,
callback: match value.callback {
#[cfg(feature = "python")]
TimeEventCallback::Python(callback) => callback,
TimeEventCallback::Rust(_) | TimeEventCallback::RustLocal(_) => {
panic!("Python time event handler is not supported for Rust callbacks")
}
},
}
}
}
#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods]
impl TimeEvent {
    /// Creates a new `TimeEvent` from Python arguments.
    ///
    /// `name` is interned as a `Ustr`; `ts_event` and `ts_init` are converted from
    /// raw `u64` values into the timestamp type expected by `TimeEvent::new`.
    #[new]
    fn py_new(name: &str, event_id: UUID4, ts_event: u64, ts_init: u64) -> Self {
        Self::new(Ustr::from(name), event_id, ts_event.into(), ts_init.into())
    }

    /// Restores this event from a `(name, event_id, ts_event, ts_init)` tuple, the
    /// layout produced by `__getstate__`.
    ///
    /// Every element is decoded and validated before any field is written, so a
    /// failed call leaves the event exactly as it was.
    ///
    /// # Errors
    ///
    /// Returns an error if `state` is not a tuple, if an element is missing or has
    /// the wrong Python type, if a timestamp does not fit in `u64`, or if the
    /// event ID string cannot be parsed as a `UUID4`.
    fn __setstate__(&mut self, state: &Bound<'_, PyAny>) -> PyResult<()> {
        let py_tuple: &Bound<'_, PyTuple> = state.cast::<PyTuple>()?;

        // Decode everything into locals first; `self` is only touched once all
        // four values are known to be valid.
        let ts_event: u64 = py_tuple.get_item(2)?.cast::<PyInt>()?.extract::<u64>()?;
        let ts_init: u64 = py_tuple.get_item(3)?.cast::<PyInt>()?.extract::<u64>()?;
        let name = Ustr::from(
            py_tuple
                .get_item(0)?
                .cast::<PyString>()?
                .extract::<&str>()?,
        );
        let event_id = UUID4::from_str(
            py_tuple
                .get_item(1)?
                .cast::<PyString>()?
                .extract::<&str>()?,
        )
        .map_err(to_pyvalue_err)?;

        self.name = name;
        self.event_id = event_id;
        self.ts_event = ts_event.into();
        self.ts_init = ts_init.into();
        Ok(())
    }

    /// Returns the picklable state as a `(name, event_id, ts_event, ts_init)` tuple,
    /// with the name and event ID rendered as strings and the timestamps as `u64`.
    fn __getstate__(&self, py: Python) -> PyResult<Py<PyAny>> {
        (
            self.name.to_string(),
            self.event_id.to_string(),
            self.ts_event.as_u64(),
            self.ts_init.as_u64(),
        )
            .into_py_any(py)
    }

    /// Returns the `(callable, args, state)` triple used for pickling.
    ///
    /// The callable is `_safe_constructor`, invoked with no arguments; the real
    /// field values travel in `state`.
    fn __reduce__(&self, py: Python) -> PyResult<Py<PyAny>> {
        let safe_constructor = py.get_type::<Self>().getattr("_safe_constructor")?;
        let state = self.__getstate__(py)?;
        (safe_constructor, PyTuple::empty(py), state).into_py_any(py)
    }

    /// Builds a placeholder event named `"NULL"` with a fresh `UUID4` and default
    /// timestamps, used as the argument-free constructor returned by `__reduce__`.
    #[staticmethod]
    fn _safe_constructor() -> Self {
        Self::new(
            Ustr::from("NULL"),
            UUID4::new(),
            UnixNanos::default(),
            UnixNanos::default(),
        )
    }

    /// Supports `==` and `!=` between two events; every other comparison
    /// returns `NotImplemented`.
    fn __richcmp__(&self, other: &Self, op: CompareOp, py: Python<'_>) -> Py<PyAny> {
        match op {
            CompareOp::Eq => self.eq(other).into_py_any_unwrap(py),
            CompareOp::Ne => self.ne(other).into_py_any_unwrap(py),
            _ => py.NotImplemented(),
        }
    }

    /// Returns the event's `Display` representation.
    fn __repr__(&self) -> String {
        self.to_string()
    }

    /// The event name as an owned string.
    #[getter]
    #[pyo3(name = "name")]
    fn py_name(&self) -> String {
        self.name.to_string()
    }

    /// The event ID.
    #[getter]
    #[pyo3(name = "event_id")]
    const fn py_event_id(&self) -> UUID4 {
        self.event_id
    }

    /// The `ts_event` timestamp as a raw `u64`.
    #[getter]
    #[pyo3(name = "ts_event")]
    const fn py_ts_event(&self) -> u64 {
        self.ts_event.as_u64()
    }

    /// The `ts_init` timestamp as a raw `u64`.
    #[getter]
    #[pyo3(name = "ts_init")]
    const fn py_ts_init(&self) -> u64 {
        self.ts_init.as_u64()
    }
}
#[cfg(test)]
mod tests {
    use std::{num::NonZeroU64, sync::Arc, time::Duration};

    use nautilus_core::{
        UnixNanos, datetime::NANOSECONDS_IN_MILLISECOND, python::IntoPyObjectNautilusExt,
        time::get_atomic_clock_realtime,
    };
    use pyo3::prelude::*;

    use crate::{
        live::timer::LiveTimer,
        runner::{TimeEventSender, set_time_event_sender},
        testing::wait_until,
        timer::{TimeEvent, TimeEventCallback},
    };

    /// No-op function exposed to Python and used as the timer callback.
    #[pyfunction]
    const fn receive_event(_py: Python, _event: TimeEvent) {}

    /// Sender that discards every handler it receives.
    #[derive(Debug)]
    struct TestTimeEventSender;

    impl TimeEventSender for TestTimeEventSender {
        fn send(&self, _handler: crate::timer::TimeEventHandler) {}
    }

    /// Initializes the interpreter and wraps `receive_event` as a timer callback.
    fn python_callback() -> TimeEventCallback {
        Python::initialize();
        Python::attach(|py| {
            let py_fn = wrap_pyfunction!(receive_event, py).unwrap();
            TimeEventCallback::from(py_fn.into_py_any_unwrap(py))
        })
    }

    /// Builds the `TEST_TIMER` live timer shared by all tests, wired to a fresh
    /// `TestTimeEventSender`.
    fn new_test_timer(
        interval_ns: NonZeroU64,
        start_time: UnixNanos,
        stop_time: Option<UnixNanos>,
        callback: TimeEventCallback,
    ) -> LiveTimer {
        let test_sender = Arc::new(TestTimeEventSender);
        LiveTimer::new(
            "TEST_TIMER".into(),
            interval_ns,
            start_time,
            stop_time,
            callback,
            false,
            Some(test_sender),
        )
    }

    #[tokio::test]
    async fn test_live_timer_starts_and_stops() {
        set_time_event_sender(Arc::new(TestTimeEventSender));
        let callback = python_callback();

        // Read the clock only after the interpreter is up, so the start time is fresh.
        let start_time = get_atomic_clock_realtime().get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
        let mut timer = new_test_timer(interval_ns, start_time, None, callback);
        let initial_next_ns = timer.next_time_ns();

        timer.start();
        tokio::time::sleep(Duration::from_millis(300)).await;
        timer.cancel();

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_ns);
    }

    #[tokio::test]
    async fn test_live_timer_with_stop_time() {
        set_time_event_sender(Arc::new(TestTimeEventSender));
        let callback = python_callback();

        let start_time = get_atomic_clock_realtime().get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
        let stop_time = start_time + 500 * NANOSECONDS_IN_MILLISECOND;
        let mut timer = new_test_timer(interval_ns, start_time, Some(stop_time), callback);
        let initial_next_ns = timer.next_time_ns();

        timer.start();
        tokio::time::sleep(Duration::from_secs(1)).await;

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_ns);
    }

    #[tokio::test]
    async fn test_live_timer_with_zero_interval_and_immediate_stop_time() {
        set_time_event_sender(Arc::new(TestTimeEventSender));
        let callback = python_callback();

        let clock = get_atomic_clock_realtime();
        let start_time = UnixNanos::default();
        let interval_ns = NonZeroU64::new(1).unwrap();
        let stop_time = clock.get_time_ns();
        let mut timer = new_test_timer(interval_ns, start_time, Some(stop_time), callback);

        timer.start();

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
    }
}