use std::{
num::NonZeroU64,
sync::{
Arc,
atomic::{self, AtomicU64},
},
};
#[cfg(feature = "python")]
use nautilus_core::consts::NAUTILUS_PREFIX;
use nautilus_core::{
UUID4, UnixNanos,
correctness::{FAILED, check_valid_string_utf8},
datetime::floor_to_nearest_microsecond,
time::get_atomic_clock_realtime,
};
#[cfg(feature = "python")]
use pyo3::{Py, PyAny, Python};
use tokio::{
task::JoinHandle,
time::{Duration, Instant},
};
use ustr::Ustr;
use super::runtime::get_runtime;
use crate::{
runner::TimeEventSender,
timer::{TimeEvent, TimeEventCallback, TimeEventHandler, Timer},
};
/// A timer that emits [`TimeEvent`]s at a fixed interval from a task spawned on
/// the runtime returned by `get_runtime`.
///
/// Construct it with [`LiveTimer::new`], spawn the tick task with
/// [`LiveTimer::start`], and abort that task with [`LiveTimer::cancel`].
#[derive(Debug)]
pub struct LiveTimer {
/// Timer name; also used as the name of every event the timer emits.
pub name: Ustr,
/// Period between consecutive ticks, in nanoseconds (never zero).
pub interval_ns: NonZeroU64,
/// Start time in UNIX nanoseconds, from which the first fire time is derived.
pub start_time_ns: UnixNanos,
/// Optional stop time in UNIX nanoseconds; when set, the tick task ends once
/// the later of the next scheduled time and the current time reaches it.
pub stop_time_ns: Option<UnixNanos>,
/// When `true` the first fire time is `start_time_ns` itself, otherwise it is
/// `start_time_ns + interval_ns`.
pub fire_immediately: bool,
// Next scheduled fire time (UNIX nanoseconds), shared with the spawned task,
// which advances it by `interval_ns` after every tick.
next_time_ns: Arc<AtomicU64>,
// Callback that receives the time events; how it is invoked depends on its variant.
callback: TimeEventCallback,
// Handle of the spawned tick task; `None` until `start` has been called.
task_handle: Option<JoinHandle<()>>,
// Channel used to hand events to Rust callbacks; the tick task panics if a
// Rust callback fires while this is `None`.
sender: Option<Arc<dyn TimeEventSender>>,
}
impl LiveTimer {
#[must_use]
pub fn new(
name: Ustr,
interval_ns: NonZeroU64,
start_time_ns: UnixNanos,
stop_time_ns: Option<UnixNanos>,
callback: TimeEventCallback,
fire_immediately: bool,
sender: Option<Arc<dyn TimeEventSender>>,
) -> Self {
check_valid_string_utf8(name, stringify!(name)).expect(FAILED);
let next_time_ns = if fire_immediately {
start_time_ns.as_u64()
} else {
start_time_ns.as_u64() + interval_ns.get()
};
log::debug!("Creating timer '{name}'");
Self {
name,
interval_ns,
start_time_ns,
stop_time_ns,
fire_immediately,
next_time_ns: Arc::new(AtomicU64::new(next_time_ns)),
callback,
task_handle: None,
sender,
}
}
#[must_use]
pub fn next_time_ns(&self) -> UnixNanos {
UnixNanos::from(self.next_time_ns.load(atomic::Ordering::SeqCst))
}
#[must_use]
pub fn is_expired(&self) -> bool {
self.task_handle
.as_ref()
.is_some_and(tokio::task::JoinHandle::is_finished)
}
#[allow(unused_variables)]
pub fn start(&mut self) {
let event_name = self.name;
let stop_time_ns = self.stop_time_ns;
let interval_ns = self.interval_ns.get();
let callback = self.callback.clone();
let clock = get_atomic_clock_realtime();
let now_ns = clock.get_time_ns();
let now_raw = now_ns.as_u64();
let mut observed_next = self.next_time_ns.load(atomic::Ordering::SeqCst);
if observed_next <= now_raw {
loop {
match self.next_time_ns.compare_exchange(
observed_next,
now_raw,
atomic::Ordering::SeqCst,
atomic::Ordering::SeqCst,
) {
Ok(_) => {
if observed_next < now_raw {
let original = UnixNanos::from(observed_next);
log::warn!(
"Timer '{event_name}' alert time {} was in the past, adjusted to current time for immediate fire",
original.to_rfc3339(),
);
}
observed_next = now_raw;
break;
}
Err(actual) => {
observed_next = actual;
if observed_next > now_raw {
break;
}
}
}
}
}
let mut next_time_ns = UnixNanos::from(floor_to_nearest_microsecond(observed_next));
let next_time_atomic = self.next_time_ns.clone();
next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
let sender = self.sender.clone();
let rt = get_runtime();
let handle = rt.spawn(async move {
let clock = get_atomic_clock_realtime();
let overhead = Duration::from_millis(1);
let delay_ns = next_time_ns.saturating_sub(now_ns.as_u64());
let mut delay = Duration::from_nanos(delay_ns);
if delay > overhead {
delay -= overhead;
} else {
delay = Duration::from_nanos(0);
}
let start = Instant::now() + delay;
let mut timer = tokio::time::interval_at(start, Duration::from_nanos(interval_ns));
loop {
timer.tick().await;
let now_ns = clock.get_time_ns();
let event = TimeEvent::new(event_name, UUID4::new(), next_time_ns, now_ns);
match callback {
#[cfg(feature = "python")]
TimeEventCallback::Python(ref callback) => {
call_python_with_time_event(event, callback);
}
TimeEventCallback::Rust(_) | TimeEventCallback::RustLocal(_) => {
debug_assert!(
sender.is_some(),
"LiveTimer with Rust callback requires TimeEventSender"
);
let sender = sender
.as_ref()
.expect("timer event sender was unset for Rust callback system");
let handler = TimeEventHandler::new(event, callback.clone());
sender.send(handler);
}
}
next_time_ns += interval_ns;
next_time_atomic.store(next_time_ns.as_u64(), atomic::Ordering::SeqCst);
if let Some(stop_time_ns) = stop_time_ns
&& std::cmp::max(next_time_ns, now_ns) >= stop_time_ns
{
break; }
}
});
self.task_handle = Some(handle);
}
pub fn cancel(&mut self) {
log::debug!("Cancel timer '{}'", self.name);
if let Some(ref handle) = self.task_handle {
handle.abort();
}
}
}
impl Timer for LiveTimer {
fn is_expired(&self) -> bool {
self.task_handle
.as_ref()
.is_some_and(tokio::task::JoinHandle::is_finished)
}
fn cancel(&mut self) {
Self::cancel(self);
}
}
/// Invokes a Python `callback` with `event` wrapped in a `PyCapsule`.
///
/// Runs inside `Python::attach`. An error returned by the Python callable is
/// printed to stderr and otherwise ignored.
///
/// # Panics
///
/// Panics if the `PyCapsule` cannot be created.
#[cfg(feature = "python")]
fn call_python_with_time_event(event: TimeEvent, callback: &Py<PyAny>) {
    use nautilus_core::python::IntoPyObjectNautilusExt;
    use pyo3::types::PyCapsule;

    Python::attach(|py| {
        // The capsule is created with a no-op destructor closure.
        let wrapped = PyCapsule::new_with_destructor(py, event, None, |_, _| {})
            .expect("Error creating `PyCapsule`");
        let capsule: Py<PyAny> = wrapped.into_py_any_unwrap(py);

        if let Err(e) = callback.call1(py, (capsule,)) {
            eprintln!("{NAUTILUS_PREFIX} Error on callback: {e:?}");
        }
    });
}
#[cfg(test)]
mod tests {
    use std::{num::NonZeroU64, sync::Arc};

    use nautilus_core::{
        UnixNanos, datetime::floor_to_nearest_microsecond, time::get_atomic_clock_realtime,
    };
    use rstest::*;
    use ustr::Ustr;

    use super::LiveTimer;
    use crate::{
        runner::TimeEventSender,
        timer::{TimeEventCallback, TimeEventHandler},
    };

    /// With `fire_immediately = true` the first fire time is the start time itself.
    #[rstest]
    fn test_live_timer_fire_immediately_field() {
        let timer = LiveTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1000).unwrap(),
            UnixNanos::from(100),
            None,
            TimeEventCallback::from(|_| {}),
            true, // fire_immediately
            None, // sender
        );

        assert!(timer.fire_immediately);
        assert_eq!(timer.next_time_ns(), UnixNanos::from(100));
    }

    /// With `fire_immediately = false` the first fire time is one interval after the start.
    #[rstest]
    fn test_live_timer_fire_immediately_false_field() {
        let timer = LiveTimer::new(
            Ustr::from("TEST_TIMER"),
            NonZeroU64::new(1000).unwrap(),
            UnixNanos::from(100),
            None,
            TimeEventCallback::from(|_| {}),
            false, // fire_immediately
            None,  // sender
        );

        assert!(!timer.fire_immediately);
        assert_eq!(timer.next_time_ns(), UnixNanos::from(1100));
    }

    /// Starting a timer whose first fire time lies in the past moves the
    /// schedule up to the current time.
    #[rstest]
    fn test_live_timer_adjusts_past_due_start_time() {
        #[derive(Debug)]
        struct NoopSender;

        impl TimeEventSender for NoopSender {
            fn send(&self, _handler: TimeEventHandler) {}
        }

        let sender = Arc::new(NoopSender);
        let mut timer = LiveTimer::new(
            Ustr::from("PAST_TIMER"),
            NonZeroU64::new(1).unwrap(),
            UnixNanos::from(0),
            None,
            TimeEventCallback::from(|_| {}),
            true,
            Some(sender),
        );

        // `start` passes the adjusted time through `floor_to_nearest_microsecond`,
        // so the stored value may be slightly earlier than a raw clock reading
        // taken just before the call. Compare against the floored reading to keep
        // the assertion independent of sub-microsecond timing.
        let before = get_atomic_clock_realtime().get_time_ns();
        let earliest_expected = UnixNanos::from(floor_to_nearest_microsecond(before.as_u64()));

        timer.start();

        assert!(timer.next_time_ns() >= earliest_expected);

        timer.cancel();
    }
}