1use std::{
19 cmp::Ordering,
20 fmt::{Debug, Display},
21 num::NonZeroU64,
22 rc::Rc,
23 sync::Arc,
24};
25
26use nautilus_core::{
27 UUID4, UnixNanos,
28 correctness::{FAILED, check_valid_string_utf8},
29};
30#[cfg(feature = "python")]
31use pyo3::{Py, PyAny, Python};
32use ustr::Ustr;
33
34#[must_use]
38#[allow(clippy::missing_panics_doc)] pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
40 NonZeroU64::new(std::cmp::max(interval_ns, 1)).expect("`interval_ns` must be positive")
41}
42
43#[repr(C)]
44#[derive(Clone, Debug, PartialEq, Eq)]
45#[cfg_attr(
46 feature = "python",
47 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
48)]
49pub struct TimeEvent {
54 pub name: Ustr,
56 pub event_id: UUID4,
58 pub ts_event: UnixNanos,
60 pub ts_init: UnixNanos,
62}
63
64impl TimeEvent {
65 #[must_use]
71 pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
72 Self {
73 name,
74 event_id,
75 ts_event,
76 ts_init,
77 }
78 }
79}
80
81impl Display for TimeEvent {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 write!(
84 f,
85 "{}(name={}, event_id={}, ts_event={}, ts_init={})",
86 stringify!(TimeEvent),
87 self.name,
88 self.event_id,
89 self.ts_event,
90 self.ts_init
91 )
92 }
93}
94
95#[repr(transparent)] #[derive(Clone, Debug, PartialEq, Eq)]
102pub struct ScheduledTimeEvent(pub TimeEvent);
103
104impl ScheduledTimeEvent {
105 #[must_use]
107 pub const fn new(event: TimeEvent) -> Self {
108 Self(event)
109 }
110
111 #[must_use]
113 pub fn into_inner(self) -> TimeEvent {
114 self.0
115 }
116}
117
118impl PartialOrd for ScheduledTimeEvent {
119 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
120 Some(self.cmp(other))
121 }
122}
123
124impl Ord for ScheduledTimeEvent {
125 fn cmp(&self, other: &Self) -> Ordering {
126 other.0.ts_event.cmp(&self.0.ts_event)
128 }
129}
130
131pub enum TimeEventCallback {
159 #[cfg(feature = "python")]
161 Python(Py<PyAny>),
162 Rust(Arc<dyn Fn(TimeEvent) + Send + Sync>),
164 RustLocal(Rc<dyn Fn(TimeEvent)>),
166}
167
168impl Clone for TimeEventCallback {
169 fn clone(&self) -> Self {
170 match self {
171 #[cfg(feature = "python")]
172 Self::Python(obj) => Self::Python(nautilus_core::python::clone_py_object(obj)),
173 Self::Rust(cb) => Self::Rust(cb.clone()),
174 Self::RustLocal(cb) => Self::RustLocal(cb.clone()),
175 }
176 }
177}
178
179impl Debug for TimeEventCallback {
180 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 match self {
182 #[cfg(feature = "python")]
183 Self::Python(_) => f.write_str("Python callback"),
184 Self::Rust(_) => f.write_str("Rust callback (thread-safe)"),
185 Self::RustLocal(_) => f.write_str("Rust callback (local)"),
186 }
187 }
188}
189
190impl TimeEventCallback {
191 #[must_use]
193 pub const fn is_rust(&self) -> bool {
194 matches!(self, Self::Rust(_))
195 }
196
197 #[must_use]
202 pub const fn is_local(&self) -> bool {
203 matches!(self, Self::RustLocal(_))
204 }
205
206 pub fn call(&self, event: TimeEvent) {
210 match self {
211 #[cfg(feature = "python")]
212 Self::Python(callback) => {
213 Python::attach(|py| {
214 if let Err(e) = callback.call1(py, (event,)) {
215 log::error!("Python time event callback raised exception: {e}");
216 }
217 });
218 }
219 Self::Rust(callback) => callback(event),
220 Self::RustLocal(callback) => callback(event),
221 }
222 }
223}
224
225impl<F> From<F> for TimeEventCallback
226where
227 F: Fn(TimeEvent) + Send + Sync + 'static,
228{
229 fn from(value: F) -> Self {
230 Self::Rust(Arc::new(value))
231 }
232}
233
234impl From<Arc<dyn Fn(TimeEvent) + Send + Sync>> for TimeEventCallback {
235 fn from(value: Arc<dyn Fn(TimeEvent) + Send + Sync>) -> Self {
236 Self::Rust(value)
237 }
238}
239
240impl From<Rc<dyn Fn(TimeEvent)>> for TimeEventCallback {
241 fn from(value: Rc<dyn Fn(TimeEvent)>) -> Self {
242 Self::RustLocal(value)
243 }
244}
245
246#[cfg(feature = "python")]
247impl From<Py<PyAny>> for TimeEventCallback {
248 fn from(value: Py<PyAny>) -> Self {
249 Self::Python(value)
250 }
251}
252
253#[allow(unsafe_code)]
269unsafe impl Send for TimeEventCallback {}
270#[allow(unsafe_code)]
271unsafe impl Sync for TimeEventCallback {}
272
273#[repr(C)]
274#[derive(Clone, Debug)]
275pub struct TimeEventHandler {
280 pub event: TimeEvent,
282 pub callback: TimeEventCallback,
284}
285
286impl TimeEventHandler {
287 #[must_use]
289 pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
290 Self { event, callback }
291 }
292
293 pub fn run(self) {
295 let Self { event, callback } = self;
296 callback.call(event);
297 }
298}
299
300impl PartialOrd for TimeEventHandler {
301 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
302 Some(self.cmp(other))
303 }
304}
305
306impl PartialEq for TimeEventHandler {
307 fn eq(&self, other: &Self) -> bool {
308 self.event.ts_event == other.event.ts_event
309 }
310}
311
312impl Eq for TimeEventHandler {}
313
314impl Ord for TimeEventHandler {
315 fn cmp(&self, other: &Self) -> Ordering {
316 self.event.ts_event.cmp(&other.event.ts_event)
317 }
318}
319
320#[derive(Clone, Debug)]
329pub struct TestTimer {
330 pub name: Ustr,
332 pub interval_ns: NonZeroU64,
334 pub start_time_ns: UnixNanos,
336 pub stop_time_ns: Option<UnixNanos>,
338 pub fire_immediately: bool,
340 next_time_ns: UnixNanos,
341 is_expired: bool,
342}
343
344impl TestTimer {
345 #[must_use]
351 pub fn new(
352 name: Ustr,
353 interval_ns: NonZeroU64,
354 start_time_ns: UnixNanos,
355 stop_time_ns: Option<UnixNanos>,
356 fire_immediately: bool,
357 ) -> Self {
358 check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
359
360 let next_time_ns = if fire_immediately {
361 start_time_ns
362 } else {
363 start_time_ns + interval_ns.get()
364 };
365
366 Self {
367 name,
368 interval_ns,
369 start_time_ns,
370 stop_time_ns,
371 fire_immediately,
372 next_time_ns,
373 is_expired: false,
374 }
375 }
376
377 #[must_use]
379 pub const fn next_time_ns(&self) -> UnixNanos {
380 self.next_time_ns
381 }
382
383 #[must_use]
385 pub const fn is_expired(&self) -> bool {
386 self.is_expired
387 }
388
389 #[must_use]
390 pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
391 TimeEvent {
392 name: self.name,
393 event_id,
394 ts_event: self.next_time_ns,
395 ts_init,
396 }
397 }
398
399 pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
405 let advances = if self.next_time_ns <= to_time_ns {
407 ((to_time_ns.as_u64() - self.next_time_ns.as_u64()) / self.interval_ns.get())
408 .saturating_add(1)
409 } else {
410 0
411 };
412 self.take(advances as usize).map(|(event, _)| event)
413 }
414
415 pub const fn cancel(&mut self) {
419 self.is_expired = true;
420 }
421}
422
423impl Iterator for TestTimer {
424 type Item = (TimeEvent, UnixNanos);
425
426 fn next(&mut self) -> Option<Self::Item> {
427 if self.is_expired {
428 None
429 } else {
430 if let Some(stop_time_ns) = self.stop_time_ns
432 && self.next_time_ns > stop_time_ns
433 {
434 self.is_expired = true;
435 return None;
436 }
437
438 let item = (
439 TimeEvent {
440 name: self.name,
441 event_id: UUID4::new(),
442 ts_event: self.next_time_ns,
443 ts_init: self.next_time_ns,
444 },
445 self.next_time_ns,
446 );
447
448 if let Some(stop_time_ns) = self.stop_time_ns
450 && self.next_time_ns == stop_time_ns
451 {
452 self.is_expired = true;
453 }
454
455 self.next_time_ns += self.interval_ns;
456
457 Some(item)
458 }
459 }
460}
461
462#[cfg(test)]
463mod tests {
464 use std::num::NonZeroU64;
465
466 use nautilus_core::UnixNanos;
467 use rstest::*;
468 use ustr::Ustr;
469
470 use super::{TestTimer, TimeEvent};
471
472 #[rstest]
473 fn test_test_timer_pop_event() {
474 let mut timer = TestTimer::new(
475 Ustr::from("TEST_TIMER"),
476 NonZeroU64::new(1).unwrap(),
477 UnixNanos::from(1),
478 None,
479 false,
480 );
481
482 assert!(timer.next().is_some());
483 assert!(timer.next().is_some());
484 timer.is_expired = true;
485 assert!(timer.next().is_none());
486 }
487
488 #[rstest]
489 fn test_test_timer_advance_within_next_time_ns() {
490 let mut timer = TestTimer::new(
491 Ustr::from("TEST_TIMER"),
492 NonZeroU64::new(5).unwrap(),
493 UnixNanos::default(),
494 None,
495 false,
496 );
497 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
498 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
499 let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
500 assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
501 assert_eq!(timer.next_time_ns, 5);
502 assert!(!timer.is_expired);
503 }
504
505 #[rstest]
506 fn test_test_timer_advance_up_to_next_time_ns() {
507 let mut timer = TestTimer::new(
508 Ustr::from("TEST_TIMER"),
509 NonZeroU64::new(1).unwrap(),
510 UnixNanos::default(),
511 None,
512 false,
513 );
514 assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
515 assert!(!timer.is_expired);
516 }
517
518 #[rstest]
519 fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
520 let mut timer = TestTimer::new(
521 Ustr::from("TEST_TIMER"),
522 NonZeroU64::new(1).unwrap(),
523 UnixNanos::default(),
524 Some(UnixNanos::from(2)),
525 false,
526 );
527 assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
528 assert!(timer.is_expired);
529 }
530
531 #[rstest]
532 fn test_test_timer_advance_beyond_next_time_ns() {
533 let mut timer = TestTimer::new(
534 Ustr::from("TEST_TIMER"),
535 NonZeroU64::new(1).unwrap(),
536 UnixNanos::default(),
537 Some(UnixNanos::from(5)),
538 false,
539 );
540 assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
541 assert!(timer.is_expired);
542 }
543
544 #[rstest]
545 fn test_test_timer_advance_beyond_stop_time() {
546 let mut timer = TestTimer::new(
547 Ustr::from("TEST_TIMER"),
548 NonZeroU64::new(1).unwrap(),
549 UnixNanos::default(),
550 Some(UnixNanos::from(5)),
551 false,
552 );
553 assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
554 assert!(timer.is_expired);
555 }
556
557 #[rstest]
558 fn test_test_timer_advance_exact_boundary() {
559 let mut timer = TestTimer::new(
560 Ustr::from("TEST_TIMER"),
561 NonZeroU64::new(5).unwrap(),
562 UnixNanos::from(0),
563 None,
564 false,
565 );
566 assert_eq!(
567 timer.advance(UnixNanos::from(5)).count(),
568 1,
569 "Expected one event at the 5 ns boundary"
570 );
571 assert_eq!(
572 timer.advance(UnixNanos::from(10)).count(),
573 1,
574 "Expected one event at the 10 ns boundary"
575 );
576 }
577
578 #[rstest]
579 fn test_test_timer_fire_immediately_true() {
580 let mut timer = TestTimer::new(
581 Ustr::from("TEST_TIMER"),
582 NonZeroU64::new(5).unwrap(),
583 UnixNanos::from(10),
584 None,
585 true, );
587
588 assert_eq!(timer.next_time_ns(), UnixNanos::from(10));
590
591 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
593 assert_eq!(events.len(), 1);
594 assert_eq!(events[0].ts_event, UnixNanos::from(10));
595
596 assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
598 }
599
600 #[rstest]
601 fn test_test_timer_fire_immediately_false() {
602 let mut timer = TestTimer::new(
603 Ustr::from("TEST_TIMER"),
604 NonZeroU64::new(5).unwrap(),
605 UnixNanos::from(10),
606 None,
607 false, );
609
610 assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
612
613 assert_eq!(timer.advance(UnixNanos::from(10)).count(), 0);
615
616 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(15)).collect();
618 assert_eq!(events.len(), 1);
619 assert_eq!(events[0].ts_event, UnixNanos::from(15));
620 }
621
622 use proptest::prelude::*;
627
628 #[derive(Clone, Debug)]
629 enum TimerOperation {
630 AdvanceTime(u64),
631 Cancel,
632 }
633
634 fn timer_operation_strategy() -> impl Strategy<Value = TimerOperation> {
635 prop_oneof![
636 8 => prop::num::u64::ANY.prop_map(|v| TimerOperation::AdvanceTime(v % 1000 + 1)),
637 2 => Just(TimerOperation::Cancel),
638 ]
639 }
640
641 fn timer_config_strategy() -> impl Strategy<Value = (u64, u64, Option<u64>, bool)> {
642 (
643 1u64..=100u64, 0u64..=50u64, prop::option::of(51u64..=200u64), prop::bool::ANY, )
648 }
649
650 fn timer_test_strategy()
651 -> impl Strategy<Value = (Vec<TimerOperation>, (u64, u64, Option<u64>, bool))> {
652 (
653 prop::collection::vec(timer_operation_strategy(), 5..=50),
654 timer_config_strategy(),
655 )
656 }
657
658 #[allow(clippy::needless_collect)] fn test_timer_with_operations(
660 operations: Vec<TimerOperation>,
661 (interval_ns, start_time_ns, stop_time_ns, fire_immediately): (u64, u64, Option<u64>, bool),
662 ) {
663 let mut timer = TestTimer::new(
664 Ustr::from("PROP_TEST_TIMER"),
665 NonZeroU64::new(interval_ns).unwrap(),
666 UnixNanos::from(start_time_ns),
667 stop_time_ns.map(UnixNanos::from),
668 fire_immediately,
669 );
670
671 let mut current_time = start_time_ns;
672
673 for operation in operations {
674 if timer.is_expired() {
675 break;
676 }
677
678 match operation {
679 TimerOperation::AdvanceTime(delta) => {
680 let to_time = current_time + delta;
681 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
682 current_time = to_time;
683
684 for (i, event) in events.iter().enumerate() {
686 if i > 0 {
688 assert!(
689 event.ts_event >= events[i - 1].ts_event,
690 "Events should be in chronological order"
691 );
692 }
693
694 assert!(
696 event.ts_event.as_u64() >= start_time_ns,
697 "Event timestamp should not be before start time"
698 );
699
700 assert!(
701 event.ts_event.as_u64() <= to_time,
702 "Event timestamp should not be after advance time"
703 );
704
705 if let Some(stop_time_ns) = stop_time_ns {
707 assert!(
708 event.ts_event.as_u64() <= stop_time_ns,
709 "Event timestamp should not exceed stop time"
710 );
711 }
712 }
713 }
714 TimerOperation::Cancel => {
715 timer.cancel();
716 assert!(timer.is_expired(), "Timer should be expired after cancel");
717 }
718 }
719
720 if !timer.is_expired() {
722 let expected_interval_multiple = if fire_immediately {
724 timer.next_time_ns().as_u64() >= start_time_ns
725 } else {
726 timer.next_time_ns().as_u64() >= start_time_ns + interval_ns
727 };
728 assert!(
729 expected_interval_multiple,
730 "Next time should respect interval spacing"
731 );
732
733 if let Some(stop_time_ns) = stop_time_ns
736 && timer.next_time_ns().as_u64() > stop_time_ns
737 {
738 let mut test_timer = timer.clone();
740 let events: Vec<TimeEvent> = test_timer
741 .advance(UnixNanos::from(stop_time_ns + 1))
742 .collect();
743 assert!(
744 events.is_empty() || test_timer.is_expired(),
745 "Timer should not generate events beyond stop time"
746 );
747 }
748 }
749 }
750
751 if !timer.is_expired()
754 && let Some(stop_time_ns) = stop_time_ns
755 {
756 let events: Vec<TimeEvent> = timer
757 .advance(UnixNanos::from(stop_time_ns + 1000))
758 .collect();
759 assert!(
760 timer.is_expired() || events.is_empty(),
761 "Timer should eventually expire or stop generating events"
762 );
763 }
764 }
765
766 proptest! {
767 #[rstest]
768 fn prop_timer_advance_operations((operations, config) in timer_test_strategy()) {
769 test_timer_with_operations(operations, config);
770 }
771
772 #[rstest]
773 fn prop_timer_interval_consistency(
774 interval_ns in 1u64..=100u64,
775 start_time_ns in 0u64..=50u64,
776 fire_immediately in prop::bool::ANY,
777 advance_count in 1usize..=20usize,
778 ) {
779 let mut timer = TestTimer::new(
780 Ustr::from("CONSISTENCY_TEST"),
781 NonZeroU64::new(interval_ns).unwrap(),
782 UnixNanos::from(start_time_ns),
783 None, fire_immediately,
785 );
786
787 let mut previous_event_time = if fire_immediately { start_time_ns } else { start_time_ns + interval_ns };
788
789 for _ in 0..advance_count {
790 let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(previous_event_time)).collect();
791
792 if !events.is_empty() {
793 prop_assert_eq!(events.len(), 1);
795 prop_assert_eq!(events[0].ts_event.as_u64(), previous_event_time);
796 }
797
798 previous_event_time += interval_ns;
799 }
800 }
801 }
802}