1use std::{
19 cmp::Ordering,
20 fmt::{Debug, Display},
21 num::NonZeroU64,
22 rc::Rc,
23 sync::Arc,
24};
25
26use nautilus_core::{
27 UUID4, UnixNanos,
28 correctness::{FAILED, check_valid_string_utf8},
29};
30#[cfg(feature = "python")]
31use pyo3::{Py, PyAny, Python};
32use ustr::Ustr;
33
/// Converts a raw nanosecond interval into a [`NonZeroU64`], clamping zero up to one.
///
/// This function never panics: an `interval_ns` of `0` yields an interval of one
/// nanosecond, and every other value is returned unchanged.
#[must_use]
pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
    // `NonZeroU64::MIN` is 1, which is exactly the value a zero input is clamped to,
    // so no fallible unwrap is needed.
    NonZeroU64::new(interval_ns).unwrap_or(NonZeroU64::MIN)
}
43
44#[repr(C)]
45#[derive(Clone, Debug, PartialEq, Eq)]
46#[cfg_attr(
47 feature = "python",
48 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common")
49)]
50pub struct TimeEvent {
55 pub name: Ustr,
57 pub event_id: UUID4,
59 pub ts_event: UnixNanos,
61 pub ts_init: UnixNanos,
63}
64
65impl TimeEvent {
66 #[must_use]
72 pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
73 Self {
74 name,
75 event_id,
76 ts_event,
77 ts_init,
78 }
79 }
80}
81
82impl Display for TimeEvent {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 write!(
85 f,
86 "{}(name={}, event_id={}, ts_event={}, ts_init={})",
87 stringify!(TimeEvent),
88 self.name,
89 self.event_id,
90 self.ts_event,
91 self.ts_init
92 )
93 }
94}
95
96#[repr(transparent)] #[derive(Clone, Debug, PartialEq, Eq)]
103pub struct ScheduledTimeEvent(pub TimeEvent);
104
105impl ScheduledTimeEvent {
106 #[must_use]
108 pub const fn new(event: TimeEvent) -> Self {
109 Self(event)
110 }
111
112 #[must_use]
114 pub fn into_inner(self) -> TimeEvent {
115 self.0
116 }
117}
118
119impl PartialOrd for ScheduledTimeEvent {
120 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
121 Some(self.cmp(other))
122 }
123}
124
125impl Ord for ScheduledTimeEvent {
126 fn cmp(&self, other: &Self) -> Ordering {
127 other.0.ts_event.cmp(&self.0.ts_event)
129 }
130}
131
132pub enum TimeEventCallback {
160 #[cfg(feature = "python")]
162 Python(Py<PyAny>),
163 Rust(Arc<dyn Fn(TimeEvent) + Send + Sync>),
165 RustLocal(Rc<dyn Fn(TimeEvent)>),
167}
168
169impl Clone for TimeEventCallback {
170 fn clone(&self) -> Self {
171 match self {
172 #[cfg(feature = "python")]
173 Self::Python(obj) => Self::Python(nautilus_core::python::clone_py_object(obj)),
174 Self::Rust(cb) => Self::Rust(cb.clone()),
175 Self::RustLocal(cb) => Self::RustLocal(cb.clone()),
176 }
177 }
178}
179
180impl Debug for TimeEventCallback {
181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182 match self {
183 #[cfg(feature = "python")]
184 Self::Python(_) => f.write_str("Python callback"),
185 Self::Rust(_) => f.write_str("Rust callback (thread-safe)"),
186 Self::RustLocal(_) => f.write_str("Rust callback (local)"),
187 }
188 }
189}
190
191impl TimeEventCallback {
192 #[must_use]
194 pub const fn is_rust(&self) -> bool {
195 matches!(self, Self::Rust(_))
196 }
197
198 #[must_use]
203 pub const fn is_local(&self) -> bool {
204 matches!(self, Self::RustLocal(_))
205 }
206
207 pub fn call(&self, event: TimeEvent) {
211 match self {
212 #[cfg(feature = "python")]
213 Self::Python(callback) => {
214 Python::attach(|py| {
215 if let Err(e) = callback.call1(py, (event,)) {
216 log::error!("Python time event callback raised exception: {e}");
217 }
218 });
219 }
220 Self::Rust(callback) => callback(event),
221 Self::RustLocal(callback) => callback(event),
222 }
223 }
224}
225
226impl<F> From<F> for TimeEventCallback
227where
228 F: Fn(TimeEvent) + Send + Sync + 'static,
229{
230 fn from(value: F) -> Self {
231 Self::Rust(Arc::new(value))
232 }
233}
234
235impl From<Arc<dyn Fn(TimeEvent) + Send + Sync>> for TimeEventCallback {
236 fn from(value: Arc<dyn Fn(TimeEvent) + Send + Sync>) -> Self {
237 Self::Rust(value)
238 }
239}
240
241impl From<Rc<dyn Fn(TimeEvent)>> for TimeEventCallback {
242 fn from(value: Rc<dyn Fn(TimeEvent)>) -> Self {
243 Self::RustLocal(value)
244 }
245}
246
/// Stores a Python object as a [`TimeEventCallback::Python`] callback.
///
/// The object is not checked for being callable at conversion time.
#[cfg(feature = "python")]
impl From<Py<PyAny>> for TimeEventCallback {
    fn from(value: Py<PyAny>) -> Self {
        Self::Python(value)
    }
}
253
254#[allow(unsafe_code)]
270unsafe impl Send for TimeEventCallback {}
271#[allow(unsafe_code)]
272unsafe impl Sync for TimeEventCallback {}
273
274#[repr(C)]
275#[derive(Clone, Debug)]
276pub struct TimeEventHandlerV2 {
281 pub event: TimeEvent,
283 pub callback: TimeEventCallback,
285}
286
287impl TimeEventHandlerV2 {
288 #[must_use]
290 pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
291 Self { event, callback }
292 }
293
294 pub fn run(self) {
300 let Self { event, callback } = self;
301 callback.call(event);
302 }
303}
304
305impl PartialOrd for TimeEventHandlerV2 {
306 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
307 Some(self.cmp(other))
308 }
309}
310
311impl PartialEq for TimeEventHandlerV2 {
312 fn eq(&self, other: &Self) -> bool {
313 self.event.ts_event == other.event.ts_event
314 }
315}
316
317impl Eq for TimeEventHandlerV2 {}
318
319impl Ord for TimeEventHandlerV2 {
320 fn cmp(&self, other: &Self) -> Ordering {
321 self.event.ts_event.cmp(&other.event.ts_event)
322 }
323}
324
325#[derive(Clone, Copy, Debug)]
334pub struct TestTimer {
335 pub name: Ustr,
337 pub interval_ns: NonZeroU64,
339 pub start_time_ns: UnixNanos,
341 pub stop_time_ns: Option<UnixNanos>,
343 pub fire_immediately: bool,
345 next_time_ns: UnixNanos,
346 is_expired: bool,
347}
348
349impl TestTimer {
350 #[must_use]
356 pub fn new(
357 name: Ustr,
358 interval_ns: NonZeroU64,
359 start_time_ns: UnixNanos,
360 stop_time_ns: Option<UnixNanos>,
361 fire_immediately: bool,
362 ) -> Self {
363 check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
364
365 let next_time_ns = if fire_immediately {
366 start_time_ns
367 } else {
368 start_time_ns + interval_ns.get()
369 };
370
371 Self {
372 name,
373 interval_ns,
374 start_time_ns,
375 stop_time_ns,
376 fire_immediately,
377 next_time_ns,
378 is_expired: false,
379 }
380 }
381
382 #[must_use]
384 pub const fn next_time_ns(&self) -> UnixNanos {
385 self.next_time_ns
386 }
387
388 #[must_use]
390 pub const fn is_expired(&self) -> bool {
391 self.is_expired
392 }
393
394 #[must_use]
395 pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
396 TimeEvent {
397 name: self.name,
398 event_id,
399 ts_event: self.next_time_ns,
400 ts_init,
401 }
402 }
403
404 pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
410 let advances = if self.next_time_ns <= to_time_ns {
412 ((to_time_ns.as_u64() - self.next_time_ns.as_u64()) / self.interval_ns.get())
413 .saturating_add(1)
414 } else {
415 0
416 };
417 self.take(advances as usize).map(|(event, _)| event)
418 }
419
420 pub const fn cancel(&mut self) {
424 self.is_expired = true;
425 }
426}
427
428impl Iterator for TestTimer {
429 type Item = (TimeEvent, UnixNanos);
430
431 fn next(&mut self) -> Option<Self::Item> {
432 if self.is_expired {
433 None
434 } else {
435 if let Some(stop_time_ns) = self.stop_time_ns
437 && self.next_time_ns > stop_time_ns
438 {
439 self.is_expired = true;
440 return None;
441 }
442
443 let item = (
444 TimeEvent {
445 name: self.name,
446 event_id: UUID4::new(),
447 ts_event: self.next_time_ns,
448 ts_init: self.next_time_ns,
449 },
450 self.next_time_ns,
451 );
452
453 if let Some(stop_time_ns) = self.stop_time_ns
455 && self.next_time_ns == stop_time_ns
456 {
457 self.is_expired = true;
458 }
459
460 self.next_time_ns += self.interval_ns;
461
462 Some(item)
463 }
464 }
465}
466
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use nautilus_core::UnixNanos;
    use proptest::prelude::*;
    use rstest::*;
    use ustr::Ustr;

    use super::{TestTimer, TimeEvent};

    /// Builds a [`TestTimer`] from plain integers so the individual tests stay short.
    fn build_timer(
        name: &str,
        interval_ns: u64,
        start_time_ns: u64,
        stop_time_ns: Option<u64>,
        fire_immediately: bool,
    ) -> TestTimer {
        TestTimer::new(
            Ustr::from(name),
            NonZeroU64::new(interval_ns).unwrap(),
            UnixNanos::from(start_time_ns),
            stop_time_ns.map(UnixNanos::from),
            fire_immediately,
        )
    }

    /// Iterating yields events until the timer is flagged as expired.
    #[rstest]
    fn test_test_timer_pop_event() {
        let mut timer = build_timer("TEST_TIMER", 1, 1, None, false);

        assert!(timer.next().is_some());
        assert!(timer.next().is_some());
        timer.is_expired = true;
        assert!(timer.next().is_none());
    }

    /// Advancing to times before the first boundary produces nothing and leaves the
    /// schedule untouched.
    #[rstest]
    fn test_test_timer_advance_within_next_time_ns() {
        let mut timer = build_timer("TEST_TIMER", 5, 0, None, false);

        for to_time_ns in 1..=3u64 {
            let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time_ns)).collect();
        }

        assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
        assert_eq!(timer.next_time_ns, 5);
        assert!(!timer.is_expired);
    }

    /// Advancing exactly onto the first boundary produces one event.
    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns() {
        let mut timer = build_timer("TEST_TIMER", 1, 0, None, false);

        assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
        assert!(!timer.is_expired);
    }

    /// Reaching the stop time produces the final event and expires the timer.
    #[rstest]
    fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
        let mut timer = build_timer("TEST_TIMER", 1, 0, Some(2), false);

        assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
        assert!(timer.is_expired);
    }

    /// Advancing across several boundaries up to the stop time produces one event each.
    #[rstest]
    fn test_test_timer_advance_beyond_next_time_ns() {
        let mut timer = build_timer("TEST_TIMER", 1, 0, Some(5), false);

        assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
        assert!(timer.is_expired);
    }

    /// Advancing past the stop time produces no events beyond it.
    #[rstest]
    fn test_test_timer_advance_beyond_stop_time() {
        let mut timer = build_timer("TEST_TIMER", 1, 0, Some(5), false);

        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
        assert!(timer.is_expired);
    }

    /// Each exact interval boundary produces exactly one event.
    #[rstest]
    fn test_test_timer_advance_exact_boundary() {
        let mut timer = build_timer("TEST_TIMER", 5, 0, None, false);

        assert_eq!(
            timer.advance(UnixNanos::from(5)).count(),
            1,
            "Expected one event at the 5 ns boundary"
        );
        assert_eq!(
            timer.advance(UnixNanos::from(10)).count(),
            1,
            "Expected one event at the 10 ns boundary"
        );
    }

    /// With `fire_immediately` the first event is scheduled at the start time itself.
    #[rstest]
    fn test_test_timer_fire_immediately_true() {
        let mut timer = build_timer("TEST_TIMER", 5, 10, None, true);

        assert_eq!(timer.next_time_ns(), UnixNanos::from(10));

        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].ts_event, UnixNanos::from(10));

        assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
    }

    /// Without `fire_immediately` the first event is one interval after the start time.
    #[rstest]
    fn test_test_timer_fire_immediately_false() {
        let mut timer = build_timer("TEST_TIMER", 5, 10, None, false);

        assert_eq!(timer.next_time_ns(), UnixNanos::from(15));

        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 0);

        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(15)).collect();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].ts_event, UnixNanos::from(15));
    }

    /// A single step of the property-based timer scenario.
    #[derive(Clone, Debug)]
    enum TimerOperation {
        /// Advance the clock by the given number of nanoseconds.
        AdvanceTime(u64),
        /// Cancel the timer.
        Cancel,
    }

    /// Generates operations: advances by 1..=1000 ns (weight 8) or a cancel (weight 2).
    fn timer_operation_strategy() -> impl Strategy<Value = TimerOperation> {
        prop_oneof![
            8 => prop::num::u64::ANY.prop_map(|v| TimerOperation::AdvanceTime(v % 1000 + 1)),
            2 => Just(TimerOperation::Cancel),
        ]
    }

    /// Generates `(interval_ns, start_time_ns, stop_time_ns, fire_immediately)`.
    fn timer_config_strategy() -> impl Strategy<Value = (u64, u64, Option<u64>, bool)> {
        (
            1u64..=100u64,                    // interval_ns
            0u64..=50u64,                     // start_time_ns
            prop::option::of(51u64..=200u64), // stop_time_ns, always after the start time
            prop::bool::ANY,                  // fire_immediately
        )
    }

    /// Generates between 5 and 50 operations together with a timer configuration.
    fn timer_test_strategy()
    -> impl Strategy<Value = (Vec<TimerOperation>, (u64, u64, Option<u64>, bool))> {
        (
            prop::collection::vec(timer_operation_strategy(), 5..=50),
            timer_config_strategy(),
        )
    }

    /// Replays `operations` against a freshly configured timer and checks its invariants
    /// after every step.
    // The events are inspected more than once, so they are collected first.
    #[allow(clippy::needless_collect)]
    fn test_timer_with_operations(
        operations: Vec<TimerOperation>,
        (interval_ns, start_time_ns, stop_time_ns, fire_immediately): (u64, u64, Option<u64>, bool),
    ) {
        let mut timer = build_timer(
            "PROP_TEST_TIMER",
            interval_ns,
            start_time_ns,
            stop_time_ns,
            fire_immediately,
        );

        let mut current_time = start_time_ns;

        for operation in operations {
            if timer.is_expired() {
                break;
            }

            match operation {
                TimerOperation::AdvanceTime(delta) => {
                    let to_time = current_time + delta;
                    let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
                    current_time = to_time;

                    let mut previous: Option<&TimeEvent> = None;
                    for event in &events {
                        if let Some(earlier) = previous {
                            assert!(
                                event.ts_event >= earlier.ts_event,
                                "Events should be in chronological order"
                            );
                        }

                        let ts_event = event.ts_event.as_u64();

                        assert!(
                            ts_event >= start_time_ns,
                            "Event timestamp should not be before start time"
                        );

                        assert!(
                            ts_event <= to_time,
                            "Event timestamp should not be after advance time"
                        );

                        if let Some(stop_time_ns) = stop_time_ns {
                            assert!(
                                ts_event <= stop_time_ns,
                                "Event timestamp should not exceed stop time"
                            );
                        }

                        previous = Some(event);
                    }
                }
                TimerOperation::Cancel => {
                    timer.cancel();
                    assert!(timer.is_expired(), "Timer should be expired after cancel");
                }
            }

            if timer.is_expired() {
                continue;
            }

            let earliest_allowed = if fire_immediately {
                start_time_ns
            } else {
                start_time_ns + interval_ns
            };
            assert!(
                timer.next_time_ns().as_u64() >= earliest_allowed,
                "Next time should respect interval spacing"
            );

            let overshot_stop = stop_time_ns.filter(|&stop| timer.next_time_ns().as_u64() > stop);
            if let Some(stop_time_ns) = overshot_stop {
                // `TestTimer` is `Copy`: probe a copy so the timer under test is untouched.
                let mut probe = timer;
                let events: Vec<TimeEvent> = probe
                    .advance(UnixNanos::from(stop_time_ns + 1))
                    .collect();
                assert!(
                    events.is_empty() || probe.is_expired(),
                    "Timer should not generate events beyond stop time"
                );
            }
        }

        let pending_stop = stop_time_ns.filter(|_| !timer.is_expired());
        if let Some(stop_time_ns) = pending_stop {
            let events: Vec<TimeEvent> = timer
                .advance(UnixNanos::from(stop_time_ns + 1000))
                .collect();
            assert!(
                timer.is_expired() || events.is_empty(),
                "Timer should eventually expire or stop generating events"
            );
        }
    }

    proptest! {
        #[rstest]
        fn prop_timer_advance_operations((operations, config) in timer_test_strategy()) {
            test_timer_with_operations(operations, config);
        }

        #[rstest]
        fn prop_timer_interval_consistency(
            interval_ns in 1u64..=100u64,
            start_time_ns in 0u64..=50u64,
            fire_immediately in prop::bool::ANY,
            advance_count in 1usize..=20usize,
        ) {
            let mut timer = build_timer(
                "CONSISTENCY_TEST",
                interval_ns,
                start_time_ns,
                None,
                fire_immediately,
            );

            // Time at which the next event is expected, stepped by one interval per round.
            let mut expected_time = if fire_immediately {
                start_time_ns
            } else {
                start_time_ns + interval_ns
            };

            for _ in 0..advance_count {
                let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(expected_time)).collect();

                if let Some(first) = events.first() {
                    prop_assert_eq!(events.len(), 1);
                    prop_assert_eq!(first.ts_event.as_u64(), expected_time);
                }

                expected_time += interval_ns;
            }
        }
    }
}