1use std::str::FromStr;
17
18use nautilus_core::{
19 UUID4, UnixNanos,
20 python::{IntoPyObjectNautilusExt, to_pyvalue_err},
21};
22use pyo3::{
23 IntoPyObjectExt,
24 basic::CompareOp,
25 prelude::*,
26 types::{PyInt, PyString, PyTuple},
27};
28use ustr::Ustr;
29
30use crate::timer::{TimeEvent, TimeEventCallback, TimeEventHandler};
31
32#[pyo3::pyclass(
33 module = "nautilus_trader.core.nautilus_pyo3.common",
34 name = "TimeEventHandler"
35)]
36#[derive(Debug)]
43#[allow(non_camel_case_types)]
44pub struct TimeEventHandler_Py {
45 pub event: TimeEvent,
47 pub callback: Py<PyAny>,
49}
50
51impl From<TimeEventHandler> for TimeEventHandler_Py {
52 fn from(value: TimeEventHandler) -> Self {
57 Self {
58 event: value.event,
59 callback: match value.callback {
60 #[cfg(feature = "python")]
61 TimeEventCallback::Python(callback) => callback,
62 TimeEventCallback::Rust(_) | TimeEventCallback::RustLocal(_) => {
63 panic!("Python time event handler is not supported for Rust callbacks")
64 }
65 },
66 }
67 }
68}
69
70#[pymethods]
71#[pyo3_stub_gen::derive::gen_stub_pymethods]
72impl TimeEvent {
73 #[new]
81 fn py_new(name: &str, event_id: UUID4, ts_event: u64, ts_init: u64) -> Self {
82 Self::new(Ustr::from(name), event_id, ts_event.into(), ts_init.into())
83 }
84
85 fn __setstate__(&mut self, state: &Bound<'_, PyAny>) -> PyResult<()> {
86 let py_tuple: &Bound<'_, PyTuple> = state.cast::<PyTuple>()?;
87
88 let ts_event = py_tuple.get_item(2)?.cast::<PyInt>()?.extract::<u64>()?;
89 let ts_init: u64 = py_tuple.get_item(3)?.cast::<PyInt>()?.extract::<u64>()?;
90
91 self.name = Ustr::from(
92 py_tuple
93 .get_item(0)?
94 .cast::<PyString>()?
95 .extract::<&str>()?,
96 );
97 self.event_id = UUID4::from_str(
98 py_tuple
99 .get_item(1)?
100 .cast::<PyString>()?
101 .extract::<&str>()?,
102 )
103 .map_err(to_pyvalue_err)?;
104 self.ts_event = ts_event.into();
105 self.ts_init = ts_init.into();
106
107 Ok(())
108 }
109
110 fn __getstate__(&self, py: Python) -> PyResult<Py<PyAny>> {
111 (
112 self.name.to_string(),
113 self.event_id.to_string(),
114 self.ts_event.as_u64(),
115 self.ts_init.as_u64(),
116 )
117 .into_py_any(py)
118 }
119
120 fn __reduce__(&self, py: Python) -> PyResult<Py<PyAny>> {
121 let safe_constructor = py.get_type::<Self>().getattr("_safe_constructor")?;
122 let state = self.__getstate__(py)?;
123 (safe_constructor, PyTuple::empty(py), state).into_py_any(py)
124 }
125
126 #[staticmethod]
127 fn _safe_constructor() -> Self {
128 Self::new(
129 Ustr::from("NULL"),
130 UUID4::new(),
131 UnixNanos::default(),
132 UnixNanos::default(),
133 )
134 }
135
136 fn __richcmp__(&self, other: &Self, op: CompareOp, py: Python<'_>) -> Py<PyAny> {
137 match op {
138 CompareOp::Eq => self.eq(other).into_py_any_unwrap(py),
139 CompareOp::Ne => self.ne(other).into_py_any_unwrap(py),
140 _ => py.NotImplemented(),
141 }
142 }
143
144 fn __repr__(&self) -> String {
145 self.to_string()
146 }
147
148 #[getter]
149 #[pyo3(name = "name")]
150 fn py_name(&self) -> String {
151 self.name.to_string()
152 }
153
154 #[getter]
155 #[pyo3(name = "event_id")]
156 const fn py_event_id(&self) -> UUID4 {
157 self.event_id
158 }
159
160 #[getter]
161 #[pyo3(name = "ts_event")]
162 const fn py_ts_event(&self) -> u64 {
163 self.ts_event.as_u64()
164 }
165
166 #[getter]
167 #[pyo3(name = "ts_init")]
168 const fn py_ts_init(&self) -> u64 {
169 self.ts_init.as_u64()
170 }
171}
172
#[cfg(test)]
mod tests {
    use std::{num::NonZeroU64, sync::Arc, time::Duration};

    use nautilus_core::{
        UnixNanos, datetime::NANOSECONDS_IN_MILLISECOND, python::IntoPyObjectNautilusExt,
        time::get_atomic_clock_realtime,
    };
    use pyo3::prelude::*;

    use crate::{
        live::timer::LiveTimer,
        runner::{TimeEventSender, set_time_event_sender},
        testing::wait_until,
        timer::{TimeEvent, TimeEventCallback},
    };

    /// No-op Python-callable used as the timer callback in these tests.
    #[pyfunction]
    const fn receive_event(_py: Python, _event: TimeEvent) {}

    /// Sender stub that discards every handler it is given.
    #[derive(Debug)]
    struct TestTimeEventSender;

    impl TimeEventSender for TestTimeEventSender {
        fn send(&self, _handler: crate::timer::TimeEventHandler) {}
    }

    /// Shared per-test setup: registers a stub sender via
    /// `set_time_event_sender`, initializes the Python interpreter, and wraps
    /// `receive_event` as a [`TimeEventCallback`].
    fn setup_python_callback() -> TimeEventCallback {
        set_time_event_sender(Arc::new(TestTimeEventSender));

        Python::initialize();
        Python::attach(|py| {
            let py_fn = wrap_pyfunction!(receive_event, py).unwrap();
            TimeEventCallback::from(py_fn.into_py_any_unwrap(py))
        })
    }

    /// A 100 ms timer with no stop time is started, left running for 300 ms,
    /// cancelled, and must then report expired with an advanced next time.
    #[tokio::test]
    async fn test_live_timer_starts_and_stops() {
        let callback = setup_python_callback();

        let start_time = get_atomic_clock_realtime().get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();

        let sender = Arc::new(TestTimeEventSender);
        let mut timer = LiveTimer::new(
            "TEST_TIMER".into(),
            interval_ns,
            start_time,
            None,
            callback,
            false,
            Some(sender),
        );

        let initial_next_ns = timer.next_time_ns();
        timer.start();

        tokio::time::sleep(Duration::from_millis(300)).await;

        timer.cancel();
        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_ns);
    }

    /// A 100 ms timer with a stop time 500 ms after start must expire without
    /// an explicit cancel and have advanced its next time.
    #[tokio::test]
    async fn test_live_timer_with_stop_time() {
        let callback = setup_python_callback();

        let start_time = get_atomic_clock_realtime().get_time_ns();
        let interval_ns = NonZeroU64::new(100 * NANOSECONDS_IN_MILLISECOND).unwrap();
        let stop_time = start_time + 500 * NANOSECONDS_IN_MILLISECOND;

        let sender = Arc::new(TestTimeEventSender);
        let mut timer = LiveTimer::new(
            "TEST_TIMER".into(),
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
            false,
            Some(sender),
        );

        let initial_next_ns = timer.next_time_ns();
        timer.start();

        tokio::time::sleep(Duration::from_secs(1)).await;

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
        assert!(timer.next_time_ns() > initial_next_ns);
    }

    /// A timer with the smallest non-zero interval (1 ns), a default start
    /// time, and a stop time of "now" must reach the expired state.
    #[tokio::test]
    async fn test_live_timer_with_zero_interval_and_immediate_stop_time() {
        let callback = setup_python_callback();

        let clock = get_atomic_clock_realtime();
        let start_time = UnixNanos::default();
        let interval_ns = NonZeroU64::new(1).unwrap();
        let stop_time = clock.get_time_ns();

        let sender = Arc::new(TestTimeEventSender);
        let mut timer = LiveTimer::new(
            "TEST_TIMER".into(),
            interval_ns,
            start_time,
            Some(stop_time),
            callback,
            false,
            Some(sender),
        );

        timer.start();

        wait_until(|| timer.is_expired(), Duration::from_secs(2));
    }
}