1use std::{any::Any, collections::BTreeMap, fmt::Debug, ops::Deref, time::Duration};
19
20use ahash::AHashMap;
21use chrono::{DateTime, Utc};
22use nautilus_core::{
23 AtomicTime, UnixNanos,
24 correctness::{check_positive_u64, check_predicate_true, check_valid_string_utf8},
25 formatting::Separable,
26};
27use ustr::Ustr;
28
29use crate::timer::{
30 TestTimer, TimeEvent, TimeEventCallback, TimeEventHandler, create_valid_interval,
31};
32
33pub trait Clock: Debug + Any {
39 fn utc_now(&self) -> DateTime<Utc> {
41 DateTime::from_timestamp_nanos(self.timestamp_ns().as_i64())
42 }
43
44 fn timestamp_ns(&self) -> UnixNanos;
46
47 fn timestamp_us(&self) -> u64;
49
50 fn timestamp_ms(&self) -> u64;
52
53 fn timestamp(&self) -> f64;
55
56 fn timer_names(&self) -> Vec<&str>;
58
59 fn timer_count(&self) -> usize;
61
62 fn timer_exists(&self, name: &Ustr) -> bool;
64
65 fn register_default_handler(&mut self, callback: TimeEventCallback);
68
69 fn get_handler(&self, event: TimeEvent) -> TimeEventHandler;
73
74 #[allow(clippy::too_many_arguments)]
88 fn set_time_alert(
89 &mut self,
90 name: &str,
91 alert_time: DateTime<Utc>,
92 callback: Option<TimeEventCallback>,
93 allow_past: Option<bool>,
94 ) -> anyhow::Result<()> {
95 self.set_time_alert_ns(name, alert_time.into(), callback, allow_past)
96 }
97
98 #[allow(clippy::too_many_arguments)]
119 fn set_time_alert_ns(
120 &mut self,
121 name: &str,
122 alert_time_ns: UnixNanos,
123 callback: Option<TimeEventCallback>,
124 allow_past: Option<bool>,
125 ) -> anyhow::Result<()>;
126
127 #[allow(clippy::too_many_arguments)]
143 fn set_timer(
144 &mut self,
145 name: &str,
146 interval: Duration,
147 start_time: Option<DateTime<Utc>>,
148 stop_time: Option<DateTime<Utc>>,
149 callback: Option<TimeEventCallback>,
150 allow_past: Option<bool>,
151 fire_immediately: Option<bool>,
152 ) -> anyhow::Result<()> {
153 self.set_timer_ns(
154 name,
155 interval.as_nanos() as u64,
156 start_time.map(UnixNanos::from),
157 stop_time.map(UnixNanos::from),
158 callback,
159 allow_past,
160 fire_immediately,
161 )
162 }
163
164 #[allow(clippy::too_many_arguments)]
192 fn set_timer_ns(
193 &mut self,
194 name: &str,
195 interval_ns: u64,
196 start_time_ns: Option<UnixNanos>,
197 stop_time_ns: Option<UnixNanos>,
198 callback: Option<TimeEventCallback>,
199 allow_past: Option<bool>,
200 fire_immediately: Option<bool>,
201 ) -> anyhow::Result<()>;
202
203 fn next_time_ns(&self, name: &str) -> Option<UnixNanos>;
207
208 fn cancel_timer(&mut self, name: &str);
210
211 fn cancel_timers(&mut self);
213
214 fn reset(&mut self);
216}
217
218impl dyn Clock {
219 pub fn as_any(&self) -> &dyn std::any::Any {
221 self
222 }
223 pub fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
225 self
226 }
227}
228
229#[derive(Debug, Default)]
234pub struct CallbackRegistry {
235 default_callback: Option<TimeEventCallback>,
236 callbacks: AHashMap<Ustr, TimeEventCallback>,
237}
238
239impl CallbackRegistry {
240 #[must_use]
242 pub fn new() -> Self {
243 Self {
244 default_callback: None,
245 callbacks: AHashMap::new(),
246 }
247 }
248
249 pub fn register_default_handler(&mut self, callback: TimeEventCallback) {
251 self.default_callback = Some(callback);
252 }
253
254 pub fn register_callback(&mut self, name: Ustr, callback: TimeEventCallback) {
256 self.callbacks.insert(name, callback);
257 }
258
259 #[must_use]
261 pub fn has_any_callback(&self, name: &Ustr) -> bool {
262 self.callbacks.contains_key(name) || self.default_callback.is_some()
263 }
264
265 #[must_use]
267 pub fn get_callback(&self, name: &Ustr) -> Option<TimeEventCallback> {
268 self.callbacks
269 .get(name)
270 .cloned()
271 .or_else(|| self.default_callback.clone())
272 }
273
274 #[must_use]
280 pub fn get_handler(&self, event: TimeEvent) -> TimeEventHandler {
281 let callback = self
282 .get_callback(&event.name)
283 .unwrap_or_else(|| panic!("Event '{}' should have associated handler", event.name));
284
285 TimeEventHandler::new(event, callback)
286 }
287
288 pub fn clear(&mut self) {
290 self.callbacks.clear();
291 }
292}
293
294pub fn validate_and_prepare_time_alert(
302 name: &str,
303 mut alert_time_ns: UnixNanos,
304 allow_past: Option<bool>,
305 ts_now: UnixNanos,
306) -> anyhow::Result<(Ustr, UnixNanos)> {
307 check_valid_string_utf8(name, stringify!(name))?;
308
309 let name = Ustr::from(name);
310 let allow_past = allow_past.unwrap_or(true);
311
312 if alert_time_ns < ts_now {
313 if allow_past {
314 alert_time_ns = ts_now;
315 log::warn!(
316 "Timer '{name}' alert time {} was in the past, adjusted to current time for immediate firing",
317 alert_time_ns.to_rfc3339(),
318 );
319 } else {
320 anyhow::bail!(
321 "Timer '{name}' alert time {} was in the past (current time is {ts_now})",
322 alert_time_ns.to_rfc3339(),
323 );
324 }
325 }
326
327 Ok((name, alert_time_ns))
328}
329
330pub fn validate_and_prepare_timer(
339 name: &str,
340 interval_ns: u64,
341 start_time_ns: Option<UnixNanos>,
342 stop_time_ns: Option<UnixNanos>,
343 allow_past: Option<bool>,
344 fire_immediately: Option<bool>,
345 ts_now: UnixNanos,
346) -> anyhow::Result<(Ustr, UnixNanos, Option<UnixNanos>, bool, bool)> {
347 check_valid_string_utf8(name, stringify!(name))?;
348 check_positive_u64(interval_ns, stringify!(interval_ns))?;
349
350 let name = Ustr::from(name);
351 let allow_past = allow_past.unwrap_or(true);
352 let fire_immediately = fire_immediately.unwrap_or(false);
353
354 let mut start_time_ns = start_time_ns.unwrap_or_default();
355
356 if start_time_ns == 0 {
357 start_time_ns = ts_now;
359 } else if !allow_past {
360 let next_event_time = if fire_immediately {
361 start_time_ns
362 } else {
363 start_time_ns + interval_ns
364 };
365
366 if next_event_time < ts_now {
367 anyhow::bail!(
368 "Timer '{name}' next event time {} would be in the past (current time is {ts_now})",
369 next_event_time.to_rfc3339(),
370 );
371 }
372 }
373
374 if let Some(stop_time) = stop_time_ns {
375 if stop_time <= start_time_ns {
376 anyhow::bail!(
377 "Timer '{name}' stop time {} must be after start time {}",
378 stop_time.to_rfc3339(),
379 start_time_ns.to_rfc3339(),
380 );
381 }
382
383 if !allow_past && stop_time <= ts_now {
384 anyhow::bail!(
385 "Timer '{name}' stop time {} is in the past (current time is {ts_now})",
386 stop_time.to_rfc3339(),
387 );
388 }
389 }
390
391 Ok((
392 name,
393 start_time_ns,
394 stop_time_ns,
395 allow_past,
396 fire_immediately,
397 ))
398}
399
400#[derive(Debug)]
408pub struct TestClock {
409 time: AtomicTime,
410 timers: BTreeMap<Ustr, TestTimer>,
412 callbacks: CallbackRegistry,
413}
414
415impl TestClock {
416 #[must_use]
418 pub fn new() -> Self {
419 Self {
420 time: AtomicTime::new(false, UnixNanos::default()),
421 timers: BTreeMap::new(),
422 callbacks: CallbackRegistry::new(),
423 }
424 }
425
426 #[must_use]
428 pub const fn get_timers(&self) -> &BTreeMap<Ustr, TestTimer> {
429 &self.timers
430 }
431
432 pub fn advance_time(&mut self, to_time_ns: UnixNanos, set_time: bool) -> Vec<TimeEvent> {
449 const WARN_TIME_EVENTS_THRESHOLD: usize = 1_000_000;
450
451 let from_time_ns = self.time.get_time_ns();
452
453 assert!(
454 to_time_ns >= from_time_ns,
455 "Invariant violated: time must be non-decreasing, `to_time_ns` {to_time_ns} < `from_time_ns` {from_time_ns}"
456 );
457
458 if set_time {
459 self.time.set_time(to_time_ns);
460 }
461
462 let mut events: Vec<TimeEvent> = Vec::new();
464 self.timers.retain(|_, timer| {
465 timer.advance(to_time_ns).for_each(|event| {
466 events.push(event);
467 });
468
469 !timer.is_expired()
470 });
471
472 if events.len() >= WARN_TIME_EVENTS_THRESHOLD {
473 log::warn!(
474 "Allocated {} time events during clock advancement from {} to {}, \
475 consider stopping the timer between large time ranges with no data points",
476 events.len().separate_with_commas(),
477 from_time_ns,
478 to_time_ns
479 );
480 }
481
482 events.sort_by_key(|a| a.ts_event);
483 events
484 }
485
486 #[must_use]
496 pub fn match_handlers(&self, events: Vec<TimeEvent>) -> Vec<TimeEventHandler> {
497 events
498 .into_iter()
499 .map(|event| self.callbacks.get_handler(event))
500 .collect()
501 }
502
503 fn replace_existing_timer_if_needed(&mut self, name: &Ustr) {
504 if self.timer_exists(name) {
505 self.cancel_timer(name.as_str());
506 log::warn!("Timer '{name}' replaced");
507 }
508 }
509}
510
impl Default for TestClock {
    /// Equivalent to [`TestClock::new`].
    fn default() -> Self {
        Self::new()
    }
}
517
/// Exposes the underlying `AtomicTime`, so its methods can be called directly on a
/// `TestClock` (this file relies on that for `get_time_ns` and `set_time`).
impl Deref for TestClock {
    type Target = AtomicTime;

    /// Borrows the clock's time source.
    fn deref(&self) -> &Self::Target {
        &self.time
    }
}
525
526impl Clock for TestClock {
527 fn timestamp_ns(&self) -> UnixNanos {
528 self.time.get_time_ns()
529 }
530
531 fn timestamp_us(&self) -> u64 {
532 self.time.get_time_us()
533 }
534
535 fn timestamp_ms(&self) -> u64 {
536 self.time.get_time_ms()
537 }
538
539 fn timestamp(&self) -> f64 {
540 self.time.get_time()
541 }
542
543 fn timer_names(&self) -> Vec<&str> {
544 self.timers
545 .iter()
546 .filter(|(_, timer)| !timer.is_expired())
547 .map(|(k, _)| k.as_str())
548 .collect()
549 }
550
551 fn timer_count(&self) -> usize {
552 self.timers
553 .iter()
554 .filter(|(_, timer)| !timer.is_expired())
555 .count()
556 }
557
558 fn timer_exists(&self, name: &Ustr) -> bool {
559 self.timers.contains_key(name)
560 }
561
562 fn register_default_handler(&mut self, callback: TimeEventCallback) {
563 self.callbacks.register_default_handler(callback);
564 }
565
566 fn get_handler(&self, event: TimeEvent) -> TimeEventHandler {
572 self.callbacks.get_handler(event)
573 }
574
575 fn set_time_alert_ns(
576 &mut self,
577 name: &str,
578 alert_time_ns: UnixNanos,
579 callback: Option<TimeEventCallback>,
580 allow_past: Option<bool>,
581 ) -> anyhow::Result<()> {
582 let ts_now = self.get_time_ns();
583 let (name, alert_time_ns) =
584 validate_and_prepare_time_alert(name, alert_time_ns, allow_past, ts_now)?;
585
586 self.replace_existing_timer_if_needed(&name);
587
588 check_predicate_true(
589 callback.is_some() | self.callbacks.has_any_callback(&name),
590 "No callbacks provided",
591 )?;
592
593 if let Some(callback) = callback {
594 self.callbacks.register_callback(name, callback);
595 }
596
597 let interval_ns = create_valid_interval((alert_time_ns - ts_now).into());
599 let fire_immediately = alert_time_ns == ts_now;
600
601 let timer = TestTimer::new(
602 name,
603 interval_ns,
604 ts_now,
605 Some(alert_time_ns),
606 fire_immediately,
607 );
608 self.timers.insert(name, timer);
609
610 Ok(())
611 }
612
613 fn set_timer_ns(
614 &mut self,
615 name: &str,
616 interval_ns: u64,
617 start_time_ns: Option<UnixNanos>,
618 stop_time_ns: Option<UnixNanos>,
619 callback: Option<TimeEventCallback>,
620 allow_past: Option<bool>,
621 fire_immediately: Option<bool>,
622 ) -> anyhow::Result<()> {
623 let ts_now = self.get_time_ns();
624 let (name, start_time_ns, stop_time_ns, _allow_past, fire_immediately) =
625 validate_and_prepare_timer(
626 name,
627 interval_ns,
628 start_time_ns,
629 stop_time_ns,
630 allow_past,
631 fire_immediately,
632 ts_now,
633 )?;
634
635 check_predicate_true(
636 callback.is_some() | self.callbacks.has_any_callback(&name),
637 "No callbacks provided",
638 )?;
639
640 self.replace_existing_timer_if_needed(&name);
641
642 if let Some(callback) = callback {
643 self.callbacks.register_callback(name, callback);
644 }
645
646 let interval_ns = create_valid_interval(interval_ns);
647
648 let timer = TestTimer::new(
649 name,
650 interval_ns,
651 start_time_ns,
652 stop_time_ns,
653 fire_immediately,
654 );
655 self.timers.insert(name, timer);
656
657 Ok(())
658 }
659
660 fn next_time_ns(&self, name: &str) -> Option<UnixNanos> {
661 self.timers
662 .get(&Ustr::from(name))
663 .map(|timer| timer.next_time_ns())
664 }
665
666 fn cancel_timer(&mut self, name: &str) {
667 let timer = self.timers.remove(&Ustr::from(name));
668 if let Some(mut timer) = timer {
669 timer.cancel();
670 }
671 }
672
673 fn cancel_timers(&mut self) {
674 for timer in &mut self.timers.values_mut() {
675 timer.cancel();
676 }
677
678 self.timers.clear();
679 }
680
681 fn reset(&mut self) {
682 self.time = AtomicTime::new(false, UnixNanos::default());
683 self.timers = BTreeMap::new();
684 self.callbacks.clear();
685 }
686}
687
688#[cfg(test)]
689mod tests {
690 use std::{
691 sync::{Arc, Mutex},
692 time::Duration,
693 };
694
695 use nautilus_core::{MUTEX_POISONED, UnixNanos};
696 use rstest::{fixture, rstest};
697 use ustr::Ustr;
698
699 use super::*;
700 use crate::timer::{TimeEvent, TimeEventCallback};
701
702 #[derive(Debug, Default)]
703 struct TestCallback {
704 called: Arc<Mutex<bool>>,
706 }
707
708 impl TestCallback {
709 fn new(called: Arc<Mutex<bool>>) -> Self {
710 Self { called }
711 }
712 }
713
714 impl From<TestCallback> for TimeEventCallback {
715 fn from(callback: TestCallback) -> Self {
716 Self::from(move |_event: TimeEvent| {
717 if let Ok(mut called) = callback.called.lock() {
718 *called = true;
719 }
720 })
721 }
722 }
723
724 #[fixture]
725 pub fn test_clock() -> TestClock {
726 let mut clock = TestClock::new();
727 clock.register_default_handler(TestCallback::default().into());
728 clock
729 }
730
731 #[rstest]
732 fn test_time_monotonicity(mut test_clock: TestClock) {
733 let initial_time = test_clock.timestamp_ns();
734 test_clock.advance_time(UnixNanos::from(*initial_time + 1000), true);
735 assert!(test_clock.timestamp_ns() > initial_time);
736 }
737
738 #[rstest]
739 fn test_timer_registration(mut test_clock: TestClock) {
740 test_clock
741 .set_time_alert_ns(
742 "test_timer",
743 (*test_clock.timestamp_ns() + 1000).into(),
744 None,
745 None,
746 )
747 .unwrap();
748 assert_eq!(test_clock.timer_count(), 1);
749 assert_eq!(test_clock.timer_names(), vec!["test_timer"]);
750 }
751
752 #[rstest]
753 fn test_timer_expiration(mut test_clock: TestClock) {
754 let alert_time = (*test_clock.timestamp_ns() + 1000).into();
755 test_clock
756 .set_time_alert_ns("test_timer", alert_time, None, None)
757 .unwrap();
758 let events = test_clock.advance_time(alert_time, true);
759 assert_eq!(events.len(), 1);
760 assert_eq!(events[0].name.as_str(), "test_timer");
761 }
762
763 #[rstest]
764 fn test_timer_cancellation(mut test_clock: TestClock) {
765 test_clock
766 .set_time_alert_ns(
767 "test_timer",
768 (*test_clock.timestamp_ns() + 1000).into(),
769 None,
770 None,
771 )
772 .unwrap();
773 assert_eq!(test_clock.timer_count(), 1);
774 test_clock.cancel_timer("test_timer");
775 assert_eq!(test_clock.timer_count(), 0);
776 }
777
778 #[rstest]
779 fn test_time_advancement(mut test_clock: TestClock) {
780 let start_time = test_clock.timestamp_ns();
781 test_clock
782 .set_timer_ns("test_timer", 1000, Some(start_time), None, None, None, None)
783 .unwrap();
784 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2500), true);
785 assert_eq!(events.len(), 2);
786 assert_eq!(*events[0].ts_event, *start_time + 1000);
787 assert_eq!(*events[1].ts_event, *start_time + 2000);
788 }
789
790 #[rstest]
791 fn test_default_and_custom_callbacks() {
792 let mut clock = TestClock::new();
793 let default_called = Arc::new(Mutex::new(false));
794 let custom_called = Arc::new(Mutex::new(false));
795
796 let default_callback = TestCallback::new(Arc::clone(&default_called));
797 let custom_callback = TestCallback::new(Arc::clone(&custom_called));
798
799 clock.register_default_handler(TimeEventCallback::from(default_callback));
800 clock
801 .set_time_alert_ns(
802 "default_timer",
803 (*clock.timestamp_ns() + 1000).into(),
804 None,
805 None,
806 )
807 .unwrap();
808 clock
809 .set_time_alert_ns(
810 "custom_timer",
811 (*clock.timestamp_ns() + 1000).into(),
812 Some(TimeEventCallback::from(custom_callback)),
813 None,
814 )
815 .unwrap();
816
817 let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
818 let handlers = clock.match_handlers(events);
819
820 for handler in handlers {
821 handler.callback.call(handler.event);
822 }
823
824 assert!(*default_called.lock().expect(MUTEX_POISONED));
825 assert!(*custom_called.lock().expect(MUTEX_POISONED));
826 }
827
828 #[rstest]
829 fn test_timer_with_rust_local_callback() {
830 use std::{cell::RefCell, rc::Rc};
831
832 let mut clock = TestClock::new();
833 let call_count = Rc::new(RefCell::new(0_u32));
834 let call_count_clone = Rc::clone(&call_count);
835
836 let callback: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event: TimeEvent| {
838 *call_count_clone.borrow_mut() += 1;
839 });
840
841 clock
842 .set_time_alert_ns(
843 "local_timer",
844 (*clock.timestamp_ns() + 1000).into(),
845 Some(TimeEventCallback::from(callback)),
846 None,
847 )
848 .unwrap();
849
850 let events = clock.advance_time(UnixNanos::from(*clock.timestamp_ns() + 1000), true);
851 let handlers = clock.match_handlers(events);
852
853 for handler in handlers {
854 handler.callback.call(handler.event);
855 }
856
857 assert_eq!(*call_count.borrow(), 1);
858 }
859
860 #[rstest]
861 fn test_multiple_timers(mut test_clock: TestClock) {
862 let start_time = test_clock.timestamp_ns();
863 test_clock
864 .set_timer_ns("timer1", 1000, Some(start_time), None, None, None, None)
865 .unwrap();
866 test_clock
867 .set_timer_ns("timer2", 2000, Some(start_time), None, None, None, None)
868 .unwrap();
869 let events = test_clock.advance_time(UnixNanos::from(*start_time + 2000), true);
870 assert_eq!(events.len(), 3);
871 assert_eq!(events[0].name.as_str(), "timer1");
872 assert_eq!(events[1].name.as_str(), "timer1");
873 assert_eq!(events[2].name.as_str(), "timer2");
874 }
875
876 #[rstest]
877 fn test_allow_past_parameter_true(mut test_clock: TestClock) {
878 test_clock.set_time(UnixNanos::from(2000));
879 let current_time = test_clock.timestamp_ns();
880 let past_time = UnixNanos::from(current_time.as_u64() - 1000);
881
882 test_clock
884 .set_time_alert_ns("past_timer", past_time, None, Some(true))
885 .unwrap();
886
887 assert_eq!(test_clock.timer_count(), 1);
889 assert_eq!(test_clock.timer_names(), vec!["past_timer"]);
890
891 let next_time = test_clock.next_time_ns("past_timer").unwrap();
893 assert!(next_time >= current_time);
894 }
895
896 #[rstest]
897 fn test_allow_past_parameter_false(mut test_clock: TestClock) {
898 test_clock.set_time(UnixNanos::from(2000));
899 let current_time = test_clock.timestamp_ns();
900 let past_time = current_time - 1000;
901
902 let result = test_clock.set_time_alert_ns("past_timer", past_time, None, Some(false));
904
905 assert!(result.is_err());
907 assert!(format!("{}", result.unwrap_err()).contains("was in the past"));
908
909 assert_eq!(test_clock.timer_count(), 0);
911 assert!(test_clock.timer_names().is_empty());
912 }
913
914 #[rstest]
915 fn test_invalid_stop_time_validation(mut test_clock: TestClock) {
916 test_clock.set_time(UnixNanos::from(2000));
917 let current_time = test_clock.timestamp_ns();
918 let start_time = current_time + 1000;
919 let stop_time = current_time + 500; let result = test_clock.set_timer_ns(
923 "invalid_timer",
924 100,
925 Some(start_time),
926 Some(stop_time),
927 None,
928 None,
929 None,
930 );
931
932 assert!(result.is_err());
934 assert!(format!("{}", result.unwrap_err()).contains("must be after start time"));
935
936 assert_eq!(test_clock.timer_count(), 0);
938 }
939
940 #[rstest]
941 fn test_set_timer_ns_fire_immediately_true(mut test_clock: TestClock) {
942 let start_time = test_clock.timestamp_ns();
943 let interval_ns = 1000;
944
945 test_clock
946 .set_timer_ns(
947 "fire_immediately_timer",
948 interval_ns,
949 Some(start_time),
950 None,
951 None,
952 None,
953 Some(true),
954 )
955 .unwrap();
956
957 let events = test_clock.advance_time(start_time + 2500, true);
959
960 assert_eq!(events.len(), 3);
962 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *start_time + 1000); assert_eq!(*events[2].ts_event, *start_time + 2000); }
966
967 #[rstest]
968 fn test_set_timer_ns_fire_immediately_false(mut test_clock: TestClock) {
969 let start_time = test_clock.timestamp_ns();
970 let interval_ns = 1000;
971
972 test_clock
973 .set_timer_ns(
974 "normal_timer",
975 interval_ns,
976 Some(start_time),
977 None,
978 None,
979 None,
980 Some(false),
981 )
982 .unwrap();
983
984 let events = test_clock.advance_time(start_time + 2500, true);
986
987 assert_eq!(events.len(), 2);
989 assert_eq!(*events[0].ts_event, *start_time + 1000); assert_eq!(*events[1].ts_event, *start_time + 2000); }
992
993 #[rstest]
994 fn test_set_timer_ns_fire_immediately_default_is_false(mut test_clock: TestClock) {
995 let start_time = test_clock.timestamp_ns();
996 let interval_ns = 1000;
997
998 test_clock
1000 .set_timer_ns(
1001 "default_timer",
1002 interval_ns,
1003 Some(start_time),
1004 None,
1005 None,
1006 None,
1007 None,
1008 )
1009 .unwrap();
1010
1011 let events = test_clock.advance_time(start_time + 1500, true);
1012
1013 assert_eq!(events.len(), 1);
1015 assert_eq!(*events[0].ts_event, *start_time + 1000); }
1017
1018 #[rstest]
1019 fn test_set_timer_ns_fire_immediately_with_zero_start_time(mut test_clock: TestClock) {
1020 test_clock.set_time(5000.into());
1021 let interval_ns = 1000;
1022
1023 test_clock
1024 .set_timer_ns(
1025 "zero_start_timer",
1026 interval_ns,
1027 None,
1028 None,
1029 None,
1030 None,
1031 Some(true),
1032 )
1033 .unwrap();
1034
1035 let events = test_clock.advance_time(UnixNanos::from(7000), true);
1036
1037 assert_eq!(events.len(), 3);
1040 assert_eq!(*events[0].ts_event, 5000); assert_eq!(*events[1].ts_event, 6000);
1042 assert_eq!(*events[2].ts_event, 7000);
1043 }
1044
1045 #[rstest]
1046 fn test_multiple_timers_different_fire_immediately_settings(mut test_clock: TestClock) {
1047 let start_time = test_clock.timestamp_ns();
1048 let interval_ns = 1000;
1049
1050 test_clock
1052 .set_timer_ns(
1053 "immediate_timer",
1054 interval_ns,
1055 Some(start_time),
1056 None,
1057 None,
1058 None,
1059 Some(true),
1060 )
1061 .unwrap();
1062
1063 test_clock
1065 .set_timer_ns(
1066 "normal_timer",
1067 interval_ns,
1068 Some(start_time),
1069 None,
1070 None,
1071 None,
1072 Some(false),
1073 )
1074 .unwrap();
1075
1076 let events = test_clock.advance_time(start_time + 1500, true);
1077
1078 assert_eq!(events.len(), 3);
1080
1081 let mut event_times: Vec<u64> = events.iter().map(|e| e.ts_event.as_u64()).collect();
1083 event_times.sort_unstable();
1084
1085 assert_eq!(event_times[0], start_time.as_u64()); assert_eq!(event_times[1], start_time.as_u64() + 1000); assert_eq!(event_times[2], start_time.as_u64() + 1000); }
1089
1090 #[rstest]
1091 fn test_timer_name_collision_overwrites(mut test_clock: TestClock) {
1092 let start_time = test_clock.timestamp_ns();
1093
1094 test_clock
1096 .set_timer_ns(
1097 "collision_timer",
1098 1000,
1099 Some(start_time),
1100 None,
1101 None,
1102 None,
1103 None,
1104 )
1105 .unwrap();
1106
1107 let result = test_clock.set_timer_ns(
1109 "collision_timer",
1110 2000,
1111 Some(start_time),
1112 None,
1113 None,
1114 None,
1115 None,
1116 );
1117
1118 assert!(result.is_ok());
1119 assert_eq!(test_clock.timer_count(), 1);
1121
1122 let next_time = test_clock.next_time_ns("collision_timer").unwrap();
1124 assert_eq!(next_time, start_time + 2000);
1126 }
1127
1128 #[rstest]
1129 fn test_timer_zero_interval_error(mut test_clock: TestClock) {
1130 let start_time = test_clock.timestamp_ns();
1131
1132 let result =
1134 test_clock.set_timer_ns("zero_interval", 0, Some(start_time), None, None, None, None);
1135
1136 assert!(result.is_err());
1137 assert_eq!(test_clock.timer_count(), 0);
1138 }
1139
1140 #[rstest]
1141 fn test_timer_empty_name_error(mut test_clock: TestClock) {
1142 let start_time = test_clock.timestamp_ns();
1143
1144 let result = test_clock.set_timer_ns("", 1000, Some(start_time), None, None, None, None);
1146
1147 assert!(result.is_err());
1148 assert_eq!(test_clock.timer_count(), 0);
1149 }
1150
1151 #[rstest]
1152 fn test_timer_exists(mut test_clock: TestClock) {
1153 let name = Ustr::from("exists_timer");
1154 assert!(!test_clock.timer_exists(&name));
1155
1156 test_clock
1157 .set_time_alert_ns(
1158 name.as_str(),
1159 (*test_clock.timestamp_ns() + 1_000).into(),
1160 None,
1161 None,
1162 )
1163 .unwrap();
1164
1165 assert!(test_clock.timer_exists(&name));
1166 }
1167
1168 #[rstest]
1169 fn test_timer_rejects_past_stop_time_when_not_allowed(mut test_clock: TestClock) {
1170 test_clock.set_time(UnixNanos::from(10_000));
1171 let current = test_clock.timestamp_ns();
1172
1173 let result = test_clock.set_timer_ns(
1174 "past_stop",
1175 10_000,
1176 Some(current - 500),
1177 Some(current - 100),
1178 None,
1179 Some(false),
1180 None,
1181 );
1182
1183 let err = result.expect_err("expected stop time validation error");
1184 let err_msg = err.to_string();
1185 assert!(err_msg.contains("stop time"));
1186 assert!(err_msg.contains("in the past"));
1187 }
1188
1189 #[rstest]
1190 fn test_timer_accepts_future_stop_time(mut test_clock: TestClock) {
1191 let current = test_clock.timestamp_ns();
1192
1193 let result = test_clock.set_timer_ns(
1194 "future_stop",
1195 1_000,
1196 Some(current),
1197 Some(current + 10_000),
1198 None,
1199 Some(false),
1200 None,
1201 );
1202
1203 assert!(result.is_ok());
1204 }
1205
1206 #[rstest]
1207 fn test_timer_fire_immediately_at_exact_stop_time(mut test_clock: TestClock) {
1208 let start_time = test_clock.timestamp_ns();
1209 let interval_ns = 1000;
1210 let stop_time = start_time + interval_ns; test_clock
1213 .set_timer_ns(
1214 "exact_stop",
1215 interval_ns,
1216 Some(start_time),
1217 Some(stop_time),
1218 None,
1219 None,
1220 Some(true),
1221 )
1222 .unwrap();
1223
1224 let events = test_clock.advance_time(stop_time, true);
1225
1226 assert_eq!(events.len(), 2);
1228 assert_eq!(*events[0].ts_event, *start_time); assert_eq!(*events[1].ts_event, *stop_time); }
1231
1232 #[rstest]
1233 fn test_timer_advance_to_exact_next_time(mut test_clock: TestClock) {
1234 let start_time = test_clock.timestamp_ns();
1235 let interval_ns = 1000;
1236
1237 test_clock
1238 .set_timer_ns(
1239 "exact_advance",
1240 interval_ns,
1241 Some(start_time),
1242 None,
1243 None,
1244 None,
1245 Some(false),
1246 )
1247 .unwrap();
1248
1249 let next_time = test_clock.next_time_ns("exact_advance").unwrap();
1251 let events = test_clock.advance_time(next_time, true);
1252
1253 assert_eq!(events.len(), 1);
1254 assert_eq!(*events[0].ts_event, *next_time);
1255 }
1256
1257 #[rstest]
1258 fn test_allow_past_bar_aggregation_use_case(mut test_clock: TestClock) {
1259 test_clock.set_time(UnixNanos::from(100_500)); let bar_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1269 "bar_timer",
1270 interval_ns,
1271 Some(bar_start_time),
1272 None,
1273 None,
1274 Some(false), Some(false), );
1277
1278 assert!(result.is_ok());
1280 assert_eq!(test_clock.timer_count(), 1);
1281
1282 let next_time = test_clock.next_time_ns("bar_timer").unwrap();
1284 assert_eq!(*next_time, 101_000);
1285 }
1286
1287 #[rstest]
1288 fn test_allow_past_false_rejects_when_next_event_in_past(mut test_clock: TestClock) {
1289 test_clock.set_time(UnixNanos::from(102_000)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000; let result = test_clock.set_timer_ns(
1298 "past_event_timer",
1299 interval_ns,
1300 Some(past_start_time),
1301 None,
1302 None,
1303 Some(false), Some(false), );
1306
1307 assert!(result.is_err());
1309 assert!(
1310 result
1311 .unwrap_err()
1312 .to_string()
1313 .contains("would be in the past")
1314 );
1315 }
1316
1317 #[rstest]
1318 fn test_allow_past_false_with_fire_immediately_true(mut test_clock: TestClock) {
1319 test_clock.set_time(UnixNanos::from(100_500)); let past_start_time = UnixNanos::from(100_000); let interval_ns = 1000;
1323
1324 let result = test_clock.set_timer_ns(
1327 "immediate_past_timer",
1328 interval_ns,
1329 Some(past_start_time),
1330 None,
1331 None,
1332 Some(false), Some(true), );
1335
1336 assert!(result.is_err());
1338 assert!(
1339 result
1340 .unwrap_err()
1341 .to_string()
1342 .contains("would be in the past")
1343 );
1344 }
1345
1346 #[rstest]
1347 fn test_cancel_timer_during_execution(mut test_clock: TestClock) {
1348 let start_time = test_clock.timestamp_ns();
1349
1350 test_clock
1351 .set_timer_ns(
1352 "cancel_test",
1353 1000,
1354 Some(start_time),
1355 None,
1356 None,
1357 None,
1358 None,
1359 )
1360 .unwrap();
1361
1362 assert_eq!(test_clock.timer_count(), 1);
1363
1364 test_clock.cancel_timer("cancel_test");
1366
1367 assert_eq!(test_clock.timer_count(), 0);
1368
1369 let events = test_clock.advance_time(start_time + 2000, true);
1371 assert_eq!(events.len(), 0);
1372 }
1373
1374 #[rstest]
1375 fn test_cancel_all_timers(mut test_clock: TestClock) {
1376 test_clock
1378 .set_timer_ns("timer1", 1000, None, None, None, None, None)
1379 .unwrap();
1380 test_clock
1381 .set_timer_ns("timer2", 1500, None, None, None, None, None)
1382 .unwrap();
1383 test_clock
1384 .set_timer_ns("timer3", 2000, None, None, None, None, None)
1385 .unwrap();
1386
1387 assert_eq!(test_clock.timer_count(), 3);
1388
1389 test_clock.cancel_timers();
1391
1392 assert_eq!(test_clock.timer_count(), 0);
1393
1394 let events = test_clock.advance_time(UnixNanos::from(5000), true);
1396 assert_eq!(events.len(), 0);
1397 }
1398
1399 #[rstest]
1400 fn test_clock_reset_clears_timers(mut test_clock: TestClock) {
1401 test_clock
1402 .set_timer_ns("reset_test", 1000, None, None, None, None, None)
1403 .unwrap();
1404
1405 assert_eq!(test_clock.timer_count(), 1);
1406
1407 test_clock.reset();
1409
1410 assert_eq!(test_clock.timer_count(), 0);
1411 assert_eq!(test_clock.timestamp_ns(), UnixNanos::default()); }
1413
1414 #[rstest]
1415 fn test_set_time_alert_default_impl(mut test_clock: TestClock) {
1416 let current_time = test_clock.utc_now();
1417 let alert_time = current_time + chrono::Duration::seconds(1);
1418
1419 test_clock
1421 .set_time_alert("alert_test", alert_time, None, None)
1422 .unwrap();
1423
1424 assert_eq!(test_clock.timer_count(), 1);
1425 assert_eq!(test_clock.timer_names(), vec!["alert_test"]);
1426
1427 let expected_ns = UnixNanos::from(alert_time);
1429 let next_time = test_clock.next_time_ns("alert_test").unwrap();
1430
1431 let diff = if next_time >= expected_ns {
1433 next_time.as_u64() - expected_ns.as_u64()
1434 } else {
1435 expected_ns.as_u64() - next_time.as_u64()
1436 };
1437 assert!(
1438 diff < 1000,
1439 "Timer should be set within 1 microsecond of expected time"
1440 );
1441 }
1442
1443 #[rstest]
1444 fn test_set_timer_default_impl(mut test_clock: TestClock) {
1445 let current_time = test_clock.utc_now();
1446 let start_time = current_time + chrono::Duration::seconds(1);
1447 let interval = Duration::from_millis(500);
1448
1449 test_clock
1451 .set_timer(
1452 "timer_test",
1453 interval,
1454 Some(start_time),
1455 None,
1456 None,
1457 None,
1458 None,
1459 )
1460 .unwrap();
1461
1462 assert_eq!(test_clock.timer_count(), 1);
1463 assert_eq!(test_clock.timer_names(), vec!["timer_test"]);
1464
1465 let start_ns = UnixNanos::from(start_time);
1467 let interval_ns = interval.as_nanos() as u64;
1468
1469 let events = test_clock.advance_time(start_ns + interval_ns * 3, true);
1470 assert_eq!(events.len(), 3); assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1474 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1475 assert_eq!(*events[2].ts_event, *start_ns + interval_ns * 3);
1476 }
1477
1478 #[rstest]
1479 fn test_set_timer_with_stop_time_default_impl(mut test_clock: TestClock) {
1480 let current_time = test_clock.utc_now();
1481 let start_time = current_time + chrono::Duration::seconds(1);
1482 let stop_time = current_time + chrono::Duration::seconds(3);
1483 let interval = Duration::from_secs(1);
1484
1485 test_clock
1487 .set_timer(
1488 "timer_with_stop",
1489 interval,
1490 Some(start_time),
1491 Some(stop_time),
1492 None,
1493 None,
1494 None,
1495 )
1496 .unwrap();
1497
1498 assert_eq!(test_clock.timer_count(), 1);
1499
1500 let stop_ns = UnixNanos::from(stop_time);
1502 let events = test_clock.advance_time(stop_ns + 1000, true);
1503
1504 assert_eq!(events.len(), 2);
1506
1507 let start_ns = UnixNanos::from(start_time);
1508 let interval_ns = interval.as_nanos() as u64;
1509 assert_eq!(*events[0].ts_event, *start_ns + interval_ns);
1510 assert_eq!(*events[1].ts_event, *start_ns + interval_ns * 2);
1511 }
1512
1513 #[rstest]
1514 fn test_set_timer_fire_immediately_default_impl(mut test_clock: TestClock) {
1515 let current_time = test_clock.utc_now();
1516 let start_time = current_time + chrono::Duration::seconds(1);
1517 let interval = Duration::from_millis(500);
1518
1519 test_clock
1521 .set_timer(
1522 "immediate_timer",
1523 interval,
1524 Some(start_time),
1525 None,
1526 None,
1527 None,
1528 Some(true),
1529 )
1530 .unwrap();
1531
1532 let start_ns = UnixNanos::from(start_time);
1533 let interval_ns = interval.as_nanos() as u64;
1534
1535 let events = test_clock.advance_time(start_ns + interval_ns, true);
1537
1538 assert_eq!(events.len(), 2);
1540 assert_eq!(*events[0].ts_event, *start_ns); assert_eq!(*events[1].ts_event, *start_ns + interval_ns); }
1543
1544 #[rstest]
1545 fn test_set_time_alert_when_alert_time_equals_current_time(mut test_clock: TestClock) {
1546 let current_time = test_clock.timestamp_ns();
1547
1548 test_clock
1550 .set_time_alert_ns("alert_at_current_time", current_time, None, None)
1551 .unwrap();
1552
1553 assert_eq!(test_clock.timer_count(), 1);
1554
1555 let events = test_clock.advance_time(current_time, true);
1557
1558 assert_eq!(events.len(), 1);
1560 assert_eq!(events[0].name.as_str(), "alert_at_current_time");
1561 assert_eq!(*events[0].ts_event, *current_time);
1562 }
1563
1564 #[rstest]
1565 fn test_cancel_and_reschedule_same_name(mut test_clock: TestClock) {
1566 let start = test_clock.timestamp_ns();
1567
1568 test_clock
1569 .set_time_alert_ns("timer", UnixNanos::from(*start + 1000), None, None)
1570 .unwrap();
1571 assert_eq!(test_clock.timer_count(), 1);
1572
1573 test_clock.cancel_timer("timer");
1574 assert_eq!(test_clock.timer_count(), 0);
1575
1576 test_clock
1577 .set_time_alert_ns("timer", UnixNanos::from(*start + 2000), None, None)
1578 .unwrap();
1579 assert_eq!(test_clock.timer_count(), 1);
1580
1581 let events = test_clock.advance_time(UnixNanos::from(*start + 1500), true);
1582 assert!(events.is_empty());
1583
1584 let events = test_clock.advance_time(UnixNanos::from(*start + 2000), true);
1585 assert_eq!(events.len(), 1);
1586 assert_eq!(*events[0].ts_event, *start + 2000);
1587 }
1588
1589 #[rstest]
1590 fn test_multiple_timers_same_timestamp_all_fire(mut test_clock: TestClock) {
1591 let fire_time = UnixNanos::from(*test_clock.timestamp_ns() + 1000);
1592
1593 for i in 0..5 {
1594 test_clock
1595 .set_time_alert_ns(&format!("timer_{i}"), fire_time, None, None)
1596 .unwrap();
1597 }
1598 assert_eq!(test_clock.timer_count(), 5);
1599
1600 let events = test_clock.advance_time(fire_time, true);
1601 assert_eq!(events.len(), 5);
1602 for event in &events {
1603 assert_eq!(*event.ts_event, *fire_time);
1604 }
1605 }
1606
1607 #[rstest]
1608 fn test_events_ordered_by_timestamp_after_advance() {
1609 let mut clock = TestClock::new();
1610 clock.register_default_handler(TestCallback::default().into());
1611 let start = clock.timestamp_ns();
1612
1613 clock
1614 .set_time_alert_ns("third", UnixNanos::from(*start + 300), None, None)
1615 .unwrap();
1616 clock
1617 .set_time_alert_ns("first", UnixNanos::from(*start + 100), None, None)
1618 .unwrap();
1619 clock
1620 .set_time_alert_ns("second", UnixNanos::from(*start + 200), None, None)
1621 .unwrap();
1622
1623 let events = clock.advance_time(UnixNanos::from(*start + 400), true);
1624 assert_eq!(events.len(), 3);
1625 assert_eq!(events[0].name.as_str(), "first");
1626 assert_eq!(events[1].name.as_str(), "second");
1627 assert_eq!(events[2].name.as_str(), "third");
1628 }
1629
1630 #[rstest]
1631 fn test_large_interval_does_not_overflow(mut test_clock: TestClock) {
1632 let start = test_clock.timestamp_ns();
1633 let large_interval: u64 = 1_000_000_000 * 60 * 60 * 24 * 365; test_clock
1636 .set_timer_ns(
1637 "large_interval",
1638 large_interval,
1639 Some(start),
1640 None,
1641 None,
1642 None,
1643 None,
1644 )
1645 .unwrap();
1646
1647 let events = test_clock.advance_time(UnixNanos::from(*start + large_interval), true);
1648 assert_eq!(events.len(), 1);
1649 assert_eq!(*events[0].ts_event, *start + large_interval);
1650 }
1651
1652 #[rstest]
1653 fn test_near_zero_interval_fires_correctly(mut test_clock: TestClock) {
1654 let start = test_clock.timestamp_ns();
1655
1656 test_clock
1657 .set_timer_ns("tiny", 1, Some(start), None, None, None, None)
1658 .unwrap();
1659
1660 let events = test_clock.advance_time(UnixNanos::from(*start + 10), true);
1661 assert_eq!(events.len(), 10);
1662
1663 for i in 1..events.len() {
1664 assert!(events[i].ts_event >= events[i - 1].ts_event);
1665 }
1666 }
1667
1668 #[rstest]
1669 fn test_repeated_advance_to_same_time_no_double_fire(mut test_clock: TestClock) {
1670 let fire_time = UnixNanos::from(*test_clock.timestamp_ns() + 1000);
1671
1672 test_clock
1673 .set_time_alert_ns("once", fire_time, None, None)
1674 .unwrap();
1675
1676 let events1 = test_clock.advance_time(fire_time, true);
1677 assert_eq!(events1.len(), 1);
1678
1679 let events2 = test_clock.advance_time(fire_time, true);
1680 assert!(events2.is_empty());
1681 }
1682
1683 #[rstest]
1684 fn test_advance_with_no_timers(mut test_clock: TestClock) {
1685 let start = test_clock.timestamp_ns();
1686
1687 let events = test_clock.advance_time(UnixNanos::from(*start + 1000), true);
1688 assert!(events.is_empty());
1689 assert_eq!(*test_clock.timestamp_ns(), *start + 1000);
1690 }
1691}