Skip to main content

nautilus_common/
timer.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Real-time and test timers for use with `Clock` implementations.
17
18use std::{
19    cmp::Ordering,
20    fmt::{Debug, Display},
21    num::NonZeroU64,
22    rc::Rc,
23    sync::Arc,
24};
25
26#[cfg(feature = "python")]
27use nautilus_core::python::IntoPyObjectNautilusExt;
28use nautilus_core::{
29    UUID4, UnixNanos,
30    correctness::{FAILED, check_valid_string_utf8},
31};
32#[cfg(feature = "python")]
33use pyo3::{Py, PyAny, PyResult, Python, types::PyCapsule};
34use ustr::Ustr;
35
/// Converts a raw nanosecond interval into a [`NonZeroU64`].
///
/// A zero input is coerced to one nanosecond (`NonZeroU64::MIN`); every other
/// value is passed through unchanged.
#[must_use]
pub fn create_valid_interval(interval_ns: u64) -> NonZeroU64 {
    let Some(interval) = NonZeroU64::new(interval_ns) else {
        return NonZeroU64::MIN;
    };
    interval
}
43
// NOTE: the layout is `#[repr(C)]`, so field order is part of the type's layout;
// do not reorder fields without checking whatever relies on that layout.
// Equality is derived and therefore compares all four fields.
#[repr(C)]
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
)]
/// Represents a time event occurring at the event timestamp.
///
/// A `TimeEvent` carries metadata such as the event's name, a unique event ID,
/// and timestamps indicating when the event was scheduled to occur and when it was initialized.
pub struct TimeEvent {
    /// The event name, identifying the nature or purpose of the event.
    pub name: Ustr,
    /// The unique identifier for the event.
    pub event_id: UUID4,
    /// UNIX timestamp (nanoseconds) at which the event was scheduled to occur.
    pub ts_event: UnixNanos,
    /// UNIX timestamp (nanoseconds) when the instance was initialized.
    pub ts_init: UnixNanos,
}
68
impl TimeEvent {
    /// Creates a new [`TimeEvent`] instance.
    ///
    /// The arguments are stored as given; no validation is performed here.
    ///
    /// - `name`: the event name.
    /// - `event_id`: the unique identifier for the event.
    /// - `ts_event`: UNIX timestamp (nanoseconds) of the event.
    /// - `ts_init`: UNIX timestamp (nanoseconds) when the instance was initialized.
    #[must_use]
    pub const fn new(name: Ustr, event_id: UUID4, ts_event: UnixNanos, ts_init: UnixNanos) -> Self {
        Self {
            name,
            event_id,
            ts_event,
            ts_init,
        }
    }
}
81
82impl Display for TimeEvent {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        write!(
85            f,
86            "{}(name={}, event_id={}, ts_event={}, ts_init={})",
87            stringify!(TimeEvent),
88            self.name,
89            self.event_id,
90            self.ts_event,
91            self.ts_init
92        )
93    }
94}
95
/// Wrapper for [`TimeEvent`] that implements ordering by timestamp for heap scheduling.
///
/// This newtype allows time events to be ordered in a priority queue (max heap) by their
/// timestamp while keeping [`TimeEvent`] itself clean with standard field-based equality.
/// Events are ordered in reverse (earlier timestamps have higher priority).
///
/// `PartialEq`/`Eq` are derived, so equality compares every field of the wrapped event.
#[repr(transparent)] // Guarantees zero-cost abstraction with identical memory layout
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ScheduledTimeEvent(pub TimeEvent);
104
105impl ScheduledTimeEvent {
106    /// Creates a new scheduled time event.
107    #[must_use]
108    pub const fn new(event: TimeEvent) -> Self {
109        Self(event)
110    }
111
112    /// Extracts the inner time event.
113    #[must_use]
114    pub fn into_inner(self) -> TimeEvent {
115        self.0
116    }
117}
118
impl PartialOrd for ScheduledTimeEvent {
    /// Delegates to [`Ord::cmp`], so the partial and total orders always agree.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
124
125impl Ord for ScheduledTimeEvent {
126    fn cmp(&self, other: &Self) -> Ordering {
127        // Reverse order for max heap: earlier timestamps have higher priority
128        other.0.ts_event.cmp(&self.0.ts_event)
129    }
130}
131
#[cfg(feature = "python")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
/// Python time event callback argument mode.
///
/// Selects what object a Python callable is handed when a time event fires.
pub enum PythonTimeEventCallbackArg {
    /// Callbacks receive the PyO3 `TimeEvent` object.
    TimeEvent,
    /// Legacy Cython callbacks receive a `PyCapsule` containing the Rust event.
    LegacyCapsule,
}
141
#[cfg(feature = "python")]
/// Python callback for time events.
///
/// Pairs the Python callable with the argument mode used when invoking it.
pub struct PythonTimeEventCallback {
    /// The Python callable to invoke.
    callback: Py<PyAny>,
    /// How the event is passed to `callback`.
    arg: PythonTimeEventCallbackArg,
}
148
#[cfg(feature = "python")]
impl PythonTimeEventCallback {
    /// Builds a [`PythonTimeEventCallback`] from a Python callable and its argument mode.
    #[must_use]
    pub const fn new(callback: Py<PyAny>, arg: PythonTimeEventCallbackArg) -> Self {
        Self { arg, callback }
    }

    /// Borrows the wrapped Python callable.
    #[must_use]
    pub const fn callback(&self) -> &Py<PyAny> {
        &self.callback
    }

    /// Calls the Python callable with `event`, passed according to the configured mode.
    ///
    /// An exception raised by the callable is logged as an error and otherwise ignored.
    pub fn call(&self, event: TimeEvent) {
        Python::attach(|py| {
            let callable = &self.callback;
            let outcome = match self.arg {
                PythonTimeEventCallbackArg::LegacyCapsule => {
                    call_legacy_python_time_event_callback(py, event, callable)
                }
                PythonTimeEventCallbackArg::TimeEvent => callable.call1(py, (event,)),
            };

            let Err(e) = outcome else {
                return;
            };
            log::error!("Python time event callback raised exception: {e}");
        });
    }
}
179
#[cfg(feature = "python")]
impl Debug for PythonTimeEventCallback {
    /// Shows only the argument mode; the Python callable itself is elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct(stringify!(PythonTimeEventCallback));
        builder.field("arg", &self.arg);
        builder.finish_non_exhaustive()
    }
}
188
/// Callback type for time events.
///
/// # Variants
///
/// - `Python`: For Python callbacks (requires `python` feature).
/// - `Rust`: Thread-safe callbacks using `Arc`. Use when the closure is `Send + Sync`.
/// - `RustLocal`: Single-threaded callbacks using `Rc`. Use when capturing `Rc<RefCell<...>>`.
///
/// # Choosing Between `Rust` and `RustLocal`
///
/// Use `Rust` (thread-safe) when:
/// - The callback doesn't capture `Rc<RefCell<...>>` or other non-`Send` types.
/// - The closure is `Send + Sync` (most simple closures qualify).
///
/// Use `RustLocal` when:
/// - The callback captures `Rc<RefCell<...>>` for shared mutable state.
/// - Thread safety constraints prevent using `Arc`.
///
/// `RustLocal` works with `TestClock`. With `LiveClock`, use it only for existing
/// single-threaded callback paths; live timer callback registry dispatch is still pending.
///
/// # Automatic Conversion
///
/// - Closures that are `Fn + Send + Sync + 'static` automatically convert to `Rust`.
/// - `Rc<dyn Fn(TimeEvent)>` converts to `RustLocal`.
/// - `Arc<dyn Fn(TimeEvent) + Send + Sync>` converts to `Rust`.
///
/// # Cloning
///
/// Every variant holds its callable behind `Arc` or `Rc`, so cloning shares the
/// same underlying callable rather than duplicating it.
pub enum TimeEventCallback {
    /// Python callable for use from Python via PyO3.
    #[cfg(feature = "python")]
    Python(Arc<PythonTimeEventCallback>),
    /// Thread-safe Rust callback using `Arc` (`Send + Sync`).
    Rust(Arc<dyn Fn(TimeEvent) + Send + Sync>),
    /// Local Rust callback using `Rc` (not `Send`/`Sync`).
    RustLocal(Rc<dyn Fn(TimeEvent)>),
}
224
225impl Clone for TimeEventCallback {
226    fn clone(&self) -> Self {
227        match self {
228            #[cfg(feature = "python")]
229            Self::Python(callback) => Self::Python(callback.clone()),
230            Self::Rust(cb) => Self::Rust(cb.clone()),
231            Self::RustLocal(cb) => Self::RustLocal(cb.clone()),
232        }
233    }
234}
235
236impl Debug for TimeEventCallback {
237    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238        match self {
239            #[cfg(feature = "python")]
240            Self::Python(_) => f.write_str("Python callback"),
241            Self::Rust(_) => f.write_str("Rust callback (thread-safe)"),
242            Self::RustLocal(_) => f.write_str("Rust callback (local)"),
243        }
244    }
245}
246
247impl TimeEventCallback {
248    /// Returns `true` if this is a thread-safe Rust callback.
249    #[must_use]
250    pub const fn is_rust(&self) -> bool {
251        matches!(self, Self::Rust(_))
252    }
253
254    /// Returns `true` if this is a local (non-thread-safe) Rust callback.
255    ///
256    /// Local callbacks use `Rc` internally and require creation, cloning, dropping,
257    /// and invocation to stay on the originating thread.
258    #[must_use]
259    pub const fn is_local(&self) -> bool {
260        matches!(self, Self::RustLocal(_))
261    }
262
263    /// Invokes the callback for the given `TimeEvent`.
264    ///
265    /// For Python callbacks, exceptions are logged as errors rather than panicking.
266    pub fn call(&self, event: TimeEvent) {
267        match self {
268            #[cfg(feature = "python")]
269            Self::Python(callback) => callback.call(event),
270            Self::Rust(callback) => callback(event),
271            Self::RustLocal(callback) => callback(event),
272        }
273    }
274}
275
276impl<F> From<F> for TimeEventCallback
277where
278    F: Fn(TimeEvent) + Send + Sync + 'static,
279{
280    fn from(value: F) -> Self {
281        Self::Rust(Arc::new(value))
282    }
283}
284
/// An already shared, thread-safe callable is stored as the `Rust` variant without re-wrapping.
impl From<Arc<dyn Fn(TimeEvent) + Send + Sync>> for TimeEventCallback {
    fn from(value: Arc<dyn Fn(TimeEvent) + Send + Sync>) -> Self {
        Self::Rust(value)
    }
}
290
/// An `Rc`-held callable is stored as the single-threaded `RustLocal` variant.
impl From<Rc<dyn Fn(TimeEvent)>> for TimeEventCallback {
    fn from(value: Rc<dyn Fn(TimeEvent)>) -> Self {
        Self::RustLocal(value)
    }
}
296
/// A bare Python callable defaults to the mode that receives a PyO3 `TimeEvent`.
#[cfg(feature = "python")]
impl From<Py<PyAny>> for TimeEventCallback {
    fn from(value: Py<PyAny>) -> Self {
        Self::from_python_time_event(value)
    }
}
303
#[cfg(feature = "python")]
impl TimeEventCallback {
    /// Wraps `callback` in the `Python` variant using the given argument mode.
    fn python_with_arg(callback: Py<PyAny>, arg: PythonTimeEventCallbackArg) -> Self {
        let wrapped = PythonTimeEventCallback::new(callback, arg);
        Self::Python(Arc::new(wrapped))
    }

    /// Builds a Python callback that is handed a PyO3 `TimeEvent`.
    #[must_use]
    pub fn from_python_time_event(callback: Py<PyAny>) -> Self {
        Self::python_with_arg(callback, PythonTimeEventCallbackArg::TimeEvent)
    }

    /// Builds a legacy Python callback that is handed a `PyCapsule`.
    #[must_use]
    pub fn from_python_legacy_capsule(callback: Py<PyAny>) -> Self {
        Self::python_with_arg(callback, PythonTimeEventCallbackArg::LegacyCapsule)
    }
}
324
// SAFETY: TimeEventCallback is Send + Sync with the following invariants:
//
// - Python variant: Arc clone/drop does not require the GIL, and the callable is
//   only invoked after acquiring the GIL.
//
// - Rust variant: Arc<dyn Fn + Send + Sync> is inherently Send + Sync.
//
// - RustLocal variant: Rc<dyn Fn> is not Send/Sync. This unsafe impl preserves
//   existing API compatibility and relies on callers to keep RustLocal callbacks
//   on the originating event-loop thread. LiveTimer logs a warning because its
//   callback registry dispatch follow-up is still pending.
//
//   INVARIANT: RustLocal callbacks must only be cloned, dropped, or called from
//   the thread that created them. Violating this invariant causes undefined behavior.
//   Use the Rust variant with Arc if cross-thread execution is needed.
//
// NOTE(review): the compiler cannot enforce the RustLocal invariant; it is upheld
// purely by convention at every call site that moves or shares a callback.
//
// The `unsafe_code` lint is allowed on exactly these two impls, which are
// justified by the SAFETY comment above.
#[allow(unsafe_code)]
unsafe impl Send for TimeEventCallback {}
#[allow(unsafe_code)]
unsafe impl Sync for TimeEventCallback {}
344
/// Invokes a legacy Cython callback, passing `event` wrapped in an unnamed `PyCapsule`.
///
/// # Errors
///
/// Returns the Python error if the capsule cannot be created or if the
/// callable raises. Capsule creation failures are propagated to the caller
/// rather than panicking, since this function already reports failures
/// through `PyResult`.
#[cfg(feature = "python")]
fn call_legacy_python_time_event_callback(
    py: Python<'_>,
    event: TimeEvent,
    callback: &Py<PyAny>,
) -> PyResult<Py<PyAny>> {
    #[allow(
        deprecated,
        reason = "unnamed capsules are required for legacy Cython time-event callbacks"
    )]
    let capsule: Py<PyAny> =
        PyCapsule::new_with_destructor(py, event, None, |_, _| {})?.into_py_any_unwrap(py);

    callback.call1(py, (capsule,))
}
361
#[repr(C)]
#[derive(Clone, Debug)]
/// Represents a time event and its associated handler.
///
/// `TimeEventHandler` associates a `TimeEvent` with a callback function that is triggered
/// when the event's timestamp is reached.
///
/// Equality and ordering are implemented manually and consider only `event`;
/// the callback takes no part in comparisons.
pub struct TimeEventHandler {
    /// The time event.
    pub event: TimeEvent,
    /// The callable handler for the event.
    pub callback: TimeEventCallback,
}
374
375impl TimeEventHandler {
376    /// Creates a new [`TimeEventHandler`] instance.
377    #[must_use]
378    pub const fn new(event: TimeEvent, callback: TimeEventCallback) -> Self {
379        Self { event, callback }
380    }
381
382    fn cmp_event(&self, other: &Self) -> Ordering {
383        self.event
384            .ts_event
385            .cmp(&other.event.ts_event)
386            .then_with(|| self.event.name.cmp(&other.event.name))
387            .then_with(|| self.event.ts_init.cmp(&other.event.ts_init))
388            .then_with(|| {
389                self.event
390                    .event_id
391                    .as_str()
392                    .cmp(other.event.event_id.as_str())
393            })
394    }
395
396    /// Executes the handler by invoking its callback for the associated event.
397    pub fn run(self) {
398        let Self { event, callback } = self;
399        crate::msgbus::dispatch_tap_time_event(&event);
400        callback.call(event);
401    }
402}
403
404impl PartialOrd for TimeEventHandler {
405    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
406        Some(self.cmp(other))
407    }
408}
409
410impl PartialEq for TimeEventHandler {
411    fn eq(&self, other: &Self) -> bool {
412        self.cmp_event(other).is_eq()
413    }
414}
415
416impl Eq for TimeEventHandler {}
417
418impl Ord for TimeEventHandler {
419    fn cmp(&self, other: &Self) -> Ordering {
420        self.cmp_event(other)
421    }
422}
423
/// Crate-internal interface shared by timer implementations.
pub(crate) trait Timer {
    /// Returns whether the timer is expired.
    fn is_expired(&self) -> bool;
    /// Cancels the timer.
    fn cancel(&mut self);
}
428
/// A test timer for use with a `TestClock`.
///
/// `TestTimer` simulates time progression in a controlled environment,
/// allowing for precise control over event generation in test scenarios.
///
/// # Threading
///
/// The timer mutates its internal state and should only be used from its owning thread.
#[derive(Clone, Debug)]
pub struct TestTimer {
    /// The name of the timer.
    pub name: Ustr,
    /// The interval between timer events in nanoseconds.
    pub interval_ns: NonZeroU64,
    /// The start time of the timer in UNIX nanoseconds.
    pub start_time_ns: UnixNanos,
    /// The optional stop time of the timer in UNIX nanoseconds.
    pub stop_time_ns: Option<UnixNanos>,
    /// If the timer should fire immediately at start time.
    pub fire_immediately: bool,
    /// The time (UNIX nanoseconds) at which the next event is scheduled.
    next_time_ns: UnixNanos,
    /// Set once the timer has been cancelled or has reached its stop time.
    is_expired: bool,
}
452
453impl TestTimer {
454    /// Creates a new [`TestTimer`] instance.
455    ///
456    /// # Panics
457    ///
458    /// Panics if `name` is not a valid string.
459    #[must_use]
460    pub fn new(
461        name: Ustr,
462        interval_ns: NonZeroU64,
463        start_time_ns: UnixNanos,
464        stop_time_ns: Option<UnixNanos>,
465        fire_immediately: bool,
466    ) -> Self {
467        check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
468
469        let next_time_ns = if fire_immediately {
470            start_time_ns
471        } else {
472            start_time_ns + interval_ns.get()
473        };
474
475        Self {
476            name,
477            interval_ns,
478            start_time_ns,
479            stop_time_ns,
480            fire_immediately,
481            next_time_ns,
482            is_expired: false,
483        }
484    }
485
486    /// Returns the next time in UNIX nanoseconds when the timer will fire.
487    #[must_use]
488    pub const fn next_time_ns(&self) -> UnixNanos {
489        self.next_time_ns
490    }
491
492    /// Returns whether the timer is expired.
493    #[must_use]
494    pub const fn is_expired(&self) -> bool {
495        self.is_expired
496    }
497
498    #[must_use]
499    pub const fn pop_event(&self, event_id: UUID4, ts_init: UnixNanos) -> TimeEvent {
500        TimeEvent {
501            name: self.name,
502            event_id,
503            ts_event: self.next_time_ns,
504            ts_init,
505        }
506    }
507
508    /// Advance the test timer forward to the given time, generating a sequence
509    /// of events. A [`TimeEvent`] is appended for each time a next event is
510    /// <= the given `to_time_ns`.
511    ///
512    /// This allows testing of multiple time intervals within a single step.
513    pub fn advance(&mut self, to_time_ns: UnixNanos) -> impl Iterator<Item = TimeEvent> + '_ {
514        // Calculate how many events should fire up to and including to_time_ns
515        let advances = if self.next_time_ns <= to_time_ns {
516            ((to_time_ns.as_u64() - self.next_time_ns.as_u64()) / self.interval_ns.get())
517                .saturating_add(1)
518        } else {
519            0
520        };
521        self.take(advances as usize).map(|(event, _)| event)
522    }
523
524    /// Cancels the timer (the timer will not generate an event).
525    ///
526    /// Used to stop the timer before its scheduled stop time.
527    pub const fn cancel(&mut self) {
528        self.is_expired = true;
529    }
530}
531
// These mirror the inherent `TestTimer::is_expired` / `TestTimer::cancel` methods
// so the timer can also be used through the crate-internal `Timer` trait.
impl Timer for TestTimer {
    /// Returns the timer's expired flag.
    fn is_expired(&self) -> bool {
        self.is_expired
    }

    /// Marks the timer as expired so that it yields no further events.
    fn cancel(&mut self) {
        self.is_expired = true;
    }
}
541
542impl Iterator for TestTimer {
543    type Item = (TimeEvent, UnixNanos);
544
545    fn next(&mut self) -> Option<Self::Item> {
546        if self.is_expired {
547            None
548        } else {
549            // Check if current event would exceed stop time before creating the event
550            if let Some(stop_time_ns) = self.stop_time_ns
551                && self.next_time_ns > stop_time_ns
552            {
553                self.is_expired = true;
554                return None;
555            }
556
557            let item = (
558                TimeEvent {
559                    name: self.name,
560                    event_id: UUID4::new(),
561                    ts_event: self.next_time_ns,
562                    ts_init: self.next_time_ns,
563                },
564                self.next_time_ns,
565            );
566
567            // Check if we should expire after this event (for repeating timers at stop boundary)
568            if let Some(stop_time_ns) = self.stop_time_ns
569                && self.next_time_ns == stop_time_ns
570            {
571                self.is_expired = true;
572            }
573
574            self.next_time_ns += self.interval_ns;
575
576            Some(item)
577        }
578    }
579}
580
581#[cfg(test)]
582mod tests {
583    use std::{cell::RefCell, num::NonZeroU64, rc::Rc};
584
585    use nautilus_core::{UUID4, UnixNanos};
586    #[cfg(feature = "python")]
587    use pyo3::{
588        Bound, PyResult, Python,
589        types::{
590            PyAnyMethods, PyCFunction, PyDict, PyList, PyListMethods, PyTuple, PyTupleMethods,
591            PyTypeMethods,
592        },
593    };
594    use rstest::*;
595    use ustr::Ustr;
596
597    use super::{TestTimer, TimeEvent, TimeEventCallback, TimeEventHandler, create_valid_interval};
598    use crate::msgbus::{
599        BusTap, Endpoint, MStr, MessagingSwitchboard, Topic, clear_bus_tap, set_bus_tap,
600    };
601
602    #[rstest]
603    #[case(0, 1)]
604    #[case(1, 1)]
605    #[case(25, 25)]
606    fn test_create_valid_interval(#[case] interval_ns: u64, #[case] expected: u64) {
607        assert_eq!(create_valid_interval(interval_ns).get(), expected);
608    }
609
610    #[rstest]
611    fn test_test_timer_pop_event() {
612        let mut timer = TestTimer::new(
613            Ustr::from("TEST_TIMER"),
614            NonZeroU64::new(1).unwrap(),
615            UnixNanos::from(1),
616            None,
617            false,
618        );
619
620        assert!(timer.next().is_some());
621        assert!(timer.next().is_some());
622        timer.is_expired = true;
623        assert!(timer.next().is_none());
624    }
625
626    #[rstest]
627    fn test_test_timer_advance_within_next_time_ns() {
628        let mut timer = TestTimer::new(
629            Ustr::from("TEST_TIMER"),
630            NonZeroU64::new(5).unwrap(),
631            UnixNanos::default(),
632            None,
633            false,
634        );
635        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(1)).collect();
636        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(2)).collect();
637        let _: Vec<TimeEvent> = timer.advance(UnixNanos::from(3)).collect();
638        assert_eq!(timer.advance(UnixNanos::from(4)).count(), 0);
639        assert_eq!(timer.next_time_ns, 5);
640        assert!(!timer.is_expired);
641    }
642
643    #[rstest]
644    fn test_test_timer_advance_up_to_next_time_ns() {
645        let mut timer = TestTimer::new(
646            Ustr::from("TEST_TIMER"),
647            NonZeroU64::new(1).unwrap(),
648            UnixNanos::default(),
649            None,
650            false,
651        );
652        assert_eq!(timer.advance(UnixNanos::from(1)).count(), 1);
653        assert!(!timer.is_expired);
654    }
655
656    #[rstest]
657    fn test_test_timer_advance_up_to_next_time_ns_with_stop_time() {
658        let mut timer = TestTimer::new(
659            Ustr::from("TEST_TIMER"),
660            NonZeroU64::new(1).unwrap(),
661            UnixNanos::default(),
662            Some(UnixNanos::from(2)),
663            false,
664        );
665        assert_eq!(timer.advance(UnixNanos::from(2)).count(), 2);
666        assert!(timer.is_expired);
667    }
668
669    #[rstest]
670    fn test_test_timer_advance_beyond_next_time_ns() {
671        let mut timer = TestTimer::new(
672            Ustr::from("TEST_TIMER"),
673            NonZeroU64::new(1).unwrap(),
674            UnixNanos::default(),
675            Some(UnixNanos::from(5)),
676            false,
677        );
678        assert_eq!(timer.advance(UnixNanos::from(5)).count(), 5);
679        assert!(timer.is_expired);
680    }
681
682    #[rstest]
683    fn test_test_timer_advance_beyond_stop_time() {
684        let mut timer = TestTimer::new(
685            Ustr::from("TEST_TIMER"),
686            NonZeroU64::new(1).unwrap(),
687            UnixNanos::default(),
688            Some(UnixNanos::from(5)),
689            false,
690        );
691        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 5);
692        assert!(timer.is_expired);
693    }
694
695    #[rstest]
696    fn test_test_timer_advance_exact_boundary() {
697        let mut timer = TestTimer::new(
698            Ustr::from("TEST_TIMER"),
699            NonZeroU64::new(5).unwrap(),
700            UnixNanos::from(0),
701            None,
702            false,
703        );
704        assert_eq!(
705            timer.advance(UnixNanos::from(5)).count(),
706            1,
707            "Expected one event at the 5 ns boundary"
708        );
709        assert_eq!(
710            timer.advance(UnixNanos::from(10)).count(),
711            1,
712            "Expected one event at the 10 ns boundary"
713        );
714    }
715
716    #[rstest]
717    fn test_test_timer_fire_immediately_true() {
718        let mut timer = TestTimer::new(
719            Ustr::from("TEST_TIMER"),
720            NonZeroU64::new(5).unwrap(),
721            UnixNanos::from(10),
722            None,
723            true, // fire_immediately = true
724        );
725
726        // With fire_immediately=true, next_time_ns should be start_time_ns
727        assert_eq!(timer.next_time_ns(), UnixNanos::from(10));
728
729        // Advance to start time should produce an event
730        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(10)).collect();
731        assert_eq!(events.len(), 1);
732        assert_eq!(events[0].ts_event, UnixNanos::from(10));
733
734        // Next event should be at start_time + interval
735        assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
736    }
737
738    #[rstest]
739    fn test_test_timer_fire_immediately_false() {
740        let mut timer = TestTimer::new(
741            Ustr::from("TEST_TIMER"),
742            NonZeroU64::new(5).unwrap(),
743            UnixNanos::from(10),
744            None,
745            false, // fire_immediately = false
746        );
747
748        // With fire_immediately=false, next_time_ns should be start_time_ns + interval
749        assert_eq!(timer.next_time_ns(), UnixNanos::from(15));
750
751        // Advance to start time should produce no events
752        assert_eq!(timer.advance(UnixNanos::from(10)).count(), 0);
753
754        // Advance to first interval should produce an event
755        let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(15)).collect();
756        assert_eq!(events.len(), 1);
757        assert_eq!(events[0].ts_event, UnixNanos::from(15));
758    }
759
760    #[rstest]
761    fn test_time_event_handler_ordering_uses_tie_breakers() {
762        let callback = TimeEventCallback::from(|_: TimeEvent| {});
763
764        let later_name = TimeEventHandler::new(
765            TimeEvent::new(
766                Ustr::from("TIME_BAR_ESM4-2-MINUTE-ASK-INTERNAL"),
767                UUID4::from("00000000-0000-4000-8000-000000000003"),
768                100.into(),
769                100.into(),
770            ),
771            callback.clone(),
772        );
773        let earlier_name = TimeEventHandler::new(
774            TimeEvent::new(
775                Ustr::from("SPREAD_QUOTE_ESM4"),
776                UUID4::from("00000000-0000-4000-8000-000000000002"),
777                100.into(),
778                100.into(),
779            ),
780            callback.clone(),
781        );
782        let later_init = TimeEventHandler::new(
783            TimeEvent::new(
784                Ustr::from("SPREAD_QUOTE_ESM4"),
785                UUID4::from("00000000-0000-4000-8000-000000000004"),
786                100.into(),
787                101.into(),
788            ),
789            callback.clone(),
790        );
791        let later_id = TimeEventHandler::new(
792            TimeEvent::new(
793                Ustr::from("SPREAD_QUOTE_ESM4"),
794                UUID4::from("00000000-0000-4000-8000-000000000005"),
795                100.into(),
796                100.into(),
797            ),
798            callback,
799        );
800
801        assert!(earlier_name < later_name);
802        assert!(earlier_name < later_init);
803        assert!(earlier_name < later_id);
804        assert_ne!(earlier_name, later_id);
805    }
806
    // Checks that each Python callback mode hands the callable the expected
    // argument type: a `TimeEvent` object, then a `PyCapsule`.
    #[cfg(feature = "python")]
    #[rstest]
    fn test_python_callback_modes_pass_expected_argument_types() {
        Python::initialize();

        Python::attach(|py| {
            // Python list that records the type name of each received argument
            let seen = PyList::empty(py);
            let seen_obj = seen.clone().unbind().into_any();
            let callback = new_sync_py_callback(
                py,
                move |args: &Bound<'_, PyTuple>,
                      _kwargs: Option<&Bound<'_, PyDict>>|
                      -> PyResult<()> {
                    let arg = args.get_item(0)?;
                    let type_name = arg.get_type().name()?.to_string();
                    Python::attach(|py| seen_obj.call_method1(py, "append", (type_name,)))?;
                    Ok(())
                },
            )
            .expect("callback should create")
            .into_any()
            .unbind();

            let event = TimeEvent::new(
                Ustr::from("PY_CALLBACK_MODE"),
                UUID4::from("00000000-0000-4000-8000-000000000007"),
                UnixNanos::from(100),
                UnixNanos::from(99),
            );

            // Same callable, invoked once per mode, in this order
            TimeEventCallback::from_python_time_event(callback.clone_ref(py)).call(event.clone());
            TimeEventCallback::from_python_legacy_capsule(callback).call(event);

            assert_eq!(
                seen.get_item(0).unwrap().extract::<String>().unwrap(),
                "TimeEvent"
            );
            assert_eq!(
                seen.get_item(1).unwrap().extract::<String>().unwrap(),
                "PyCapsule"
            );
        });
    }
850
    // Test helper: wraps a Rust closure as a Python callable via
    // `PyCFunction::new_closure`, with no name and no docstring.
    #[cfg(feature = "python")]
    fn new_sync_py_callback<F>(py: Python<'_>, closure: F) -> PyResult<Bound<'_, PyCFunction>>
    where
        F: Fn(&Bound<'_, PyTuple>, Option<&Bound<'_, PyDict>>) -> PyResult<()>
            + Send
            + Sync
            + 'static,
    {
        PyCFunction::new_closure(py, None, None, closure)
    }
861
    // Bus tap used by the tests below: records every published `TimeEvent`
    // together with the topic it was published on.
    #[derive(Default)]
    struct RecordingTimeEventTap {
        // (topic, event) pairs in the order they were observed
        time_events: RefCell<Vec<(String, TimeEvent)>>,
    }
866
867    impl RecordingTimeEventTap {
868        fn time_events(&self) -> Vec<(String, TimeEvent)> {
869            self.time_events.borrow().clone()
870        }
871    }
872
873    impl BusTap for RecordingTimeEventTap {
874        fn on_publish(&self, topic: MStr<Topic>, message: &dyn std::any::Any) {
875            if let Some(event) = message.downcast_ref::<TimeEvent>() {
876                self.time_events
877                    .borrow_mut()
878                    .push((topic.to_string(), event.clone()));
879            }
880        }
881
882        fn on_send(&self, _endpoint: MStr<Endpoint>, _message: &dyn std::any::Any) {}
883    }
884
    // Verifies the order inside `TimeEventHandler::run`: the bus tap must have
    // seen the event by the time the callback is invoked.
    #[rstest]
    fn test_time_event_handler_run_dispatches_tap_before_callback() {
        let event = TimeEvent::new(
            Ustr::from("strategy.heartbeat"),
            UUID4::from("00000000-0000-4000-8000-000000000006"),
            UnixNanos::from(100),
            UnixNanos::from(99),
        );
        let tap = Rc::new(RecordingTimeEventTap::default());
        let callback_seen: Rc<RefCell<Vec<TimeEvent>>> = Rc::new(RefCell::new(Vec::new()));
        let expected_topic = MessagingSwitchboard::time_event_topic().to_string();
        // Separate copies for the closure, which takes ownership of what it captures
        let callback_expected = event.clone();
        let callback_expected_topic = expected_topic.clone();
        let callback_tap = Rc::clone(&tap);
        let callback_seen_ref = Rc::clone(&callback_seen);
        let callback: Rc<dyn Fn(TimeEvent)> = Rc::new(move |callback_event| {
            // The tap must already hold the event when the callback runs
            assert_eq!(
                callback_tap.time_events(),
                vec![(callback_expected_topic.clone(), callback_expected.clone())],
            );
            callback_seen_ref.borrow_mut().push(callback_event);
        });

        // Install the tap only around the call under test
        set_bus_tap(tap.clone());
        TimeEventHandler::new(event.clone(), TimeEventCallback::from(callback)).run();
        clear_bus_tap();

        // Both the tap and the callback saw the event exactly once
        assert_eq!(tap.time_events(), vec![(expected_topic, event.clone())]);
        assert_eq!(*callback_seen.borrow(), vec![event]);
    }
915
916    ////////////////////////////////////////////////////////////////////////////////
917    // Property-based testing
918    ////////////////////////////////////////////////////////////////////////////////
919
920    use proptest::prelude::*;
921
    /// A single step applied to the timer under test by the property-based tests.
    #[derive(Clone, Debug)]
    enum TimerOperation {
        /// Advance the timer by the contained delta, which is added to the current test time.
        AdvanceTime(u64),
        /// Cancel the timer.
        Cancel,
    }
927
928    fn timer_operation_strategy() -> impl Strategy<Value = TimerOperation> {
929        prop_oneof![
930            8 => prop::num::u64::ANY.prop_map(|v| TimerOperation::AdvanceTime(v % 1000 + 1)),
931            2 => Just(TimerOperation::Cancel),
932        ]
933    }
934
935    fn timer_config_strategy() -> impl Strategy<Value = (u64, u64, Option<u64>, bool)> {
936        (
937            1u64..=100u64,                    // interval_ns (1-100)
938            0u64..=50u64,                     // start_time_ns (0-50)
939            prop::option::of(51u64..=200u64), // stop_time_ns (51-200 or None)
940            prop::bool::ANY,                  // fire_immediately
941        )
942    }
943
944    fn timer_test_strategy()
945    -> impl Strategy<Value = (Vec<TimerOperation>, (u64, u64, Option<u64>, bool))> {
946        (
947            prop::collection::vec(timer_operation_strategy(), 5..=50),
948            timer_config_strategy(),
949        )
950    }
951
952    #[expect(clippy::needless_collect)] // Collect needed for indexing and .is_empty()
953    fn test_timer_with_operations(
954        operations: Vec<TimerOperation>,
955        (interval_ns, start_time_ns, stop_time_ns, fire_immediately): (u64, u64, Option<u64>, bool),
956    ) {
957        let mut timer = TestTimer::new(
958            Ustr::from("PROP_TEST_TIMER"),
959            NonZeroU64::new(interval_ns).unwrap(),
960            UnixNanos::from(start_time_ns),
961            stop_time_ns.map(UnixNanos::from),
962            fire_immediately,
963        );
964
965        let mut current_time = start_time_ns;
966
967        for operation in operations {
968            if timer.is_expired() {
969                break;
970            }
971
972            match operation {
973                TimerOperation::AdvanceTime(delta) => {
974                    let to_time = current_time + delta;
975                    let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(to_time)).collect();
976                    current_time = to_time;
977
978                    // Verify event ordering and timing
979                    for (i, event) in events.iter().enumerate() {
980                        // Event timestamps should be in order
981                        if i > 0 {
982                            assert!(
983                                event.ts_event >= events[i - 1].ts_event,
984                                "Events should be in chronological order"
985                            );
986                        }
987
988                        // Event timestamp should be within reasonable bounds
989                        assert!(
990                            event.ts_event.as_u64() >= start_time_ns,
991                            "Event timestamp should not be before start time"
992                        );
993
994                        assert!(
995                            event.ts_event.as_u64() <= to_time,
996                            "Event timestamp should not be after advance time"
997                        );
998
999                        // If there's a stop time, event should not exceed it
1000                        if let Some(stop_time_ns) = stop_time_ns {
1001                            assert!(
1002                                event.ts_event.as_u64() <= stop_time_ns,
1003                                "Event timestamp should not exceed stop time"
1004                            );
1005                        }
1006                    }
1007                }
1008                TimerOperation::Cancel => {
1009                    timer.cancel();
1010                    assert!(timer.is_expired(), "Timer should be expired after cancel");
1011                }
1012            }
1013
1014            // Timer invariants
1015            if !timer.is_expired() {
1016                // Next time should be properly spaced
1017                let expected_interval_multiple = if fire_immediately {
1018                    timer.next_time_ns().as_u64() >= start_time_ns
1019                } else {
1020                    timer.next_time_ns().as_u64() >= start_time_ns + interval_ns
1021                };
1022                assert!(
1023                    expected_interval_multiple,
1024                    "Next time should respect interval spacing"
1025                );
1026
1027                // If timer has stop time, check if it should be considered logically expired
1028                // Note: Timer only becomes actually expired when advance() or next() is called
1029                if let Some(stop_time_ns) = stop_time_ns
1030                    && timer.next_time_ns().as_u64() > stop_time_ns
1031                {
1032                    // The timer should expire on the next advance/iteration
1033                    let mut test_timer = timer.clone();
1034                    let events: Vec<TimeEvent> = test_timer
1035                        .advance(UnixNanos::from(stop_time_ns + 1))
1036                        .collect();
1037                    assert!(
1038                        events.is_empty() || test_timer.is_expired(),
1039                        "Timer should not generate events beyond stop time"
1040                    );
1041                }
1042            }
1043        }
1044
1045        // Final consistency check: if timer is not expired and we haven't hit stop time,
1046        // advancing far enough should eventually expire it
1047        if !timer.is_expired()
1048            && let Some(stop_time_ns) = stop_time_ns
1049        {
1050            let events: Vec<TimeEvent> = timer
1051                .advance(UnixNanos::from(stop_time_ns + 1000))
1052                .collect();
1053            assert!(
1054                timer.is_expired() || events.is_empty(),
1055                "Timer should eventually expire or stop generating events"
1056            );
1057        }
1058    }
1059
1060    proptest! {
1061        #[rstest]
1062        fn prop_timer_advance_operations((operations, config) in timer_test_strategy()) {
1063            test_timer_with_operations(operations, config);
1064        }
1065
1066        #[rstest]
1067        fn prop_timer_interval_consistency(
1068            interval_ns in 1u64..=100u64,
1069            start_time_ns in 0u64..=50u64,
1070            fire_immediately in prop::bool::ANY,
1071            advance_count in 1usize..=20usize,
1072        ) {
1073            let mut timer = TestTimer::new(
1074                Ustr::from("CONSISTENCY_TEST"),
1075                NonZeroU64::new(interval_ns).unwrap(),
1076                UnixNanos::from(start_time_ns),
1077                None, // No stop time for this test
1078                fire_immediately,
1079            );
1080
1081            let mut previous_event_time = if fire_immediately { start_time_ns } else { start_time_ns + interval_ns };
1082
1083            for _ in 0..advance_count {
1084                let events: Vec<TimeEvent> = timer.advance(UnixNanos::from(previous_event_time)).collect();
1085
1086                if !events.is_empty() {
1087                    // Should get exactly one event at the expected time
1088                    prop_assert_eq!(events.len(), 1);
1089                    prop_assert_eq!(events[0].ts_event.as_u64(), previous_event_time);
1090                }
1091
1092                previous_event_time += interval_ns;
1093            }
1094        }
1095    }
1096}