use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use crate::boundary::BindingBoundary;
use crate::handle::{HandleId, NodeId};
use crate::mailbox::CoreMailbox;
/// Boxed closure carried by [`TimerCmd::ScheduleCallback`] and invoked by the
/// timer task when that timer's deadline passes.
///
/// The closure is `Fn` (callable through a shared reference) and must be
/// `Send + Sync`.
pub type TimerCallback = Box<dyn Fn() + Send + Sync>;
/// Commands accepted by the timer task over its command channel.
///
/// Timers are identified by a caller-chosen `tag`. The tag namespace is shared
/// by both scheduling variants: scheduling a tag that is already pending
/// replaces the pending timer, whichever variant created it.
pub enum TimerCmd {
/// Arm timer `tag` so that, once `delay` has elapsed, `handle` is posted to
/// the core mailbox for the task's node.
///
/// If the timer is cancelled or replaced before it fires, `handle` is
/// released through the binding instead.
Schedule {
tag: u32,
delay: Duration,
handle: HandleId,
},
/// Arm timer `tag` so that `callback` is invoked once `delay` has elapsed.
///
/// If the timer is cancelled or replaced before it fires, the callback is
/// dropped without being called.
ScheduleCallback {
tag: u32,
delay: Duration,
callback: TimerCallback,
},
/// Cancel the pending timer with this `tag`; a no-op when none is pending.
Cancel { tag: u32 },
/// Cancel every pending timer. The task keeps running afterwards.
CancelAll,
}
/// Hand-written `Debug`: the callback payload of `ScheduleCallback` is a boxed
/// closure, so it is omitted and the variant is rendered as non-exhaustive.
impl std::fmt::Debug for TimerCmd {
    /// Formats the command as a struct-like value (`Variant { field: .. }`),
    /// or as the bare variant name for `CancelAll`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::CancelAll => f.write_str("CancelAll"),
            Self::Cancel { tag } => {
                let mut out = f.debug_struct("Cancel");
                out.field("tag", tag);
                out.finish()
            }
            Self::Schedule { tag, delay, handle } => {
                let mut out = f.debug_struct("Schedule");
                out.field("tag", tag)
                    .field("delay", delay)
                    .field("handle", handle);
                out.finish()
            }
            Self::ScheduleCallback { tag, delay, .. } => {
                let mut out = f.debug_struct("ScheduleCallback");
                out.field("tag", tag).field("delay", delay);
                // The closure itself cannot be printed; mark the output as partial.
                out.finish_non_exhaustive()
            }
        }
    }
}
/// Sending half of the timer task's unbounded command channel, as returned by
/// [`TimerTaskHandle::sender`].
pub type TimerSender = mpsc::UnboundedSender<TimerCmd>;
/// Owning handle for a timer task created by [`spawn_timer_task`].
///
/// Dropping the handle (or calling `shutdown`) sends [`TimerCmd::CancelAll`]
/// to the task and gives up this handle's sender.
#[must_use]
pub struct TimerTaskHandle {
// Command channel to the task; `None` once `shutdown` has run.
tx: Option<mpsc::UnboundedSender<TimerCmd>>,
}
impl TimerTaskHandle {
    /// Returns a new sender for issuing [`TimerCmd`]s to the timer task.
    ///
    /// # Panics
    ///
    /// Panics if [`shutdown`](Self::shutdown) has already been called on this
    /// handle.
    #[must_use]
    pub fn sender(&self) -> TimerSender {
        let tx = self
            .tx
            .as_ref()
            .expect("TimerTaskHandle: sender already taken via shutdown");
        tx.clone()
    }

    /// Cancels every pending timer and gives up this handle's sender.
    ///
    /// Sends [`TimerCmd::CancelAll`] on a best-effort basis (a send failure is
    /// ignored) and then drops the sender. Calling it again is a no-op.
    /// Senders previously obtained from [`sender`](Self::sender) are not
    /// affected.
    pub fn shutdown(&mut self) {
        let Some(tx) = self.tx.take() else {
            // Already shut down.
            return;
        };
        let _ = tx.send(TimerCmd::CancelAll);
    }
}
impl Drop for TimerTaskHandle {
fn drop(&mut self) {
self.shutdown();
}
}
/// Spawns a timer task for `node_id` on the current tokio runtime and returns
/// the handle that controls it.
///
/// The task receives [`TimerCmd`]s over an unbounded channel. Fired emit
/// timers are posted to `mailbox`; handles of timers that never fire are
/// released through `binding`. The spawned task is detached: its join handle
/// is not kept.
pub fn spawn_timer_task(
    mailbox: Arc<CoreMailbox>,
    node_id: NodeId,
    binding: Arc<dyn BindingBoundary>,
) -> TimerTaskHandle {
    let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
    let task = timer_task_loop(cmd_rx, mailbox, node_id, binding);
    tokio::spawn(task);
    TimerTaskHandle { tx: Some(cmd_tx) }
}
async fn timer_task_loop(
mut rx: mpsc::UnboundedReceiver<TimerCmd>,
mailbox: Arc<CoreMailbox>,
node_id: NodeId,
binding: Arc<dyn BindingBoundary>,
) {
let mut slots: Vec<TimerSlot> = Vec::new();
loop {
let next_fire = nearest_deadline(&slots);
tokio::select! {
biased;
cmd = rx.recv() => {
match cmd {
Some(TimerCmd::Schedule { tag, delay, handle }) => {
cancel_slot(&mut slots, tag, &*binding);
let deadline = tokio::time::Instant::now() + delay;
slots.push(TimerSlot { tag, kind: TimerSlotKind::Emit(handle), deadline });
}
Some(TimerCmd::ScheduleCallback { tag, delay, callback }) => {
cancel_slot(&mut slots, tag, &*binding);
let deadline = tokio::time::Instant::now() + delay;
slots.push(TimerSlot { tag, kind: TimerSlotKind::Callback(callback), deadline });
}
Some(TimerCmd::Cancel { tag }) => {
cancel_slot(&mut slots, tag, &*binding);
}
Some(TimerCmd::CancelAll) => {
release_all_slots(&mut slots, &*binding);
}
None => {
release_all_slots(&mut slots, &*binding);
return;
}
}
}
() = sleep_until_or_forever(next_fire) => {
let now = tokio::time::Instant::now();
let mut i = 0;
while i < slots.len() {
if slots[i].deadline <= now {
let slot = slots.swap_remove(i);
match slot.kind {
TimerSlotKind::Emit(handle) => {
if !mailbox.post_emit(node_id, handle) {
binding.release_handle(handle);
release_all_slots(&mut slots, &*binding);
return;
}
}
TimerSlotKind::Callback(cb) => {
cb();
}
}
} else {
i += 1;
}
}
}
}
}
}
/// What a pending timer does when its deadline passes.
enum TimerSlotKind {
/// Post this handle to the core mailbox; released through the binding if
/// the timer is cancelled instead.
Emit(HandleId),
/// Invoke this callback; simply dropped if the timer is cancelled.
Callback(TimerCallback),
}
/// One pending timer tracked by the timer task.
struct TimerSlot {
// Caller-chosen identifier used to cancel or replace this timer.
tag: u32,
// Action to perform when the timer fires.
kind: TimerSlotKind,
// Instant (tokio clock) at or after which the timer fires.
deadline: tokio::time::Instant,
}
/// Removes the first slot tagged `tag`, if there is one.
///
/// An emit slot's handle is released through `binding`; a callback slot is
/// dropped without being invoked. Removal uses `swap_remove`, so the order of
/// the remaining slots is not preserved.
fn cancel_slot(slots: &mut Vec<TimerSlot>, tag: u32, binding: &dyn BindingBoundary) {
    let Some(index) = slots.iter().position(|slot| slot.tag == tag) else {
        return;
    };
    match slots.swap_remove(index).kind {
        TimerSlotKind::Emit(handle) => binding.release_handle(handle),
        TimerSlotKind::Callback(_) => {}
    }
}
/// Empties `slots`, releasing the handle of every emit slot through `binding`
/// in storage order. Callback slots are dropped without being invoked.
fn release_all_slots(slots: &mut Vec<TimerSlot>, binding: &dyn BindingBoundary) {
    slots
        .drain(..)
        .filter_map(|slot| match slot.kind {
            TimerSlotKind::Emit(handle) => Some(handle),
            TimerSlotKind::Callback(_) => None,
        })
        .for_each(|handle| binding.release_handle(handle));
}
/// Returns the earliest deadline among `slots`, or `None` when there are no
/// pending slots.
fn nearest_deadline(slots: &[TimerSlot]) -> Option<tokio::time::Instant> {
    slots
        .iter()
        .min_by_key(|slot| slot.deadline)
        .map(|slot| slot.deadline)
}
/// Completes once `deadline` is reached; with no deadline it never completes.
///
/// The never-completing case lets the caller's `select!` wait on commands
/// alone while no timer is pending.
async fn sleep_until_or_forever(deadline: Option<tokio::time::Instant>) {
    if let Some(when) = deadline {
        tokio::time::sleep_until(when).await;
    } else {
        std::future::pending::<()>().await;
    }
}
// Unit tests for the timer task. Every test pauses tokio's clock and moves it
// forward explicitly with `tokio::time::advance`, so no real time elapses.
#[cfg(test)]
mod tests {
use super::*;
use crate::boundary::{DepBatch, FnResult};
use crate::handle::FnId;
/// Yields to the scheduler ten times, then drains the core mailbox.
///
/// Presumably the yields give the spawned timer task a chance to process
/// queued commands and fire due timers before the drain.
async fn settle(core: &crate::node::Core) {
for _ in 0..10 {
tokio::task::yield_now().await;
}
core.drain_mailbox();
}
/// Binding stub that records every handle passed to `release_handle`.
struct TimerTestBinding {
released: Arc<parking_lot::Mutex<Vec<HandleId>>>,
}
impl TimerTestBinding {
/// Wraps the shared `released` log the tests inspect afterwards.
fn new(released: Arc<parking_lot::Mutex<Vec<HandleId>>>) -> Self {
Self { released }
}
}
impl BindingBoundary for TimerTestBinding {
// Node functions do nothing in these tests.
fn invoke_fn(&self, _node_id: NodeId, _fn_id: FnId, _dep_data: &[DepBatch]) -> FnResult {
FnResult::Noop { tracked: None }
}
// Never reports two handles as equal.
fn custom_equals(&self, _equals_handle: FnId, _a: HandleId, _b: HandleId) -> bool {
false
}
// Appends the released handle to the shared log.
fn release_handle(&self, h: HandleId) {
self.released.lock().push(h);
}
}
/// Builds a `Core` backed by `TimerTestBinding` and registers one state node.
///
/// Returns the core, the id of the registered node, and the shared binding.
fn make_test_core(
released: Arc<parking_lot::Mutex<Vec<HandleId>>>,
) -> (crate::node::Core, NodeId, Arc<dyn BindingBoundary>) {
let binding: Arc<dyn BindingBoundary> = Arc::new(TimerTestBinding::new(released));
let core = crate::node::Core::new(binding.clone());
let s = core
.register_state(crate::handle::NO_HANDLE, false)
.unwrap();
(core, s, binding)
}
/// A scheduled handle reaches the node's subscriber once the delay has passed.
#[tokio::test]
async fn schedule_fires_after_delay() {
tokio::time::pause();
let released = Arc::new(parking_lot::Mutex::new(Vec::new()));
let (core, node, binding) = make_test_core(released.clone());
let mailbox = core.mailbox();
// Collect every `Data` handle delivered to the subscriber.
let emitted = Arc::new(parking_lot::Mutex::new(Vec::<HandleId>::new()));
let em = emitted.clone();
let _sub = core.subscribe(
node,
Arc::new(move |msgs| {
for m in msgs {
if let crate::message::Message::Data(h) = m {
em.lock().push(*h);
}
}
}),
);
let task = spawn_timer_task(mailbox, node, binding.clone());
let h1 = HandleId::new(42);
// NOTE(review): `retain_handle` is not implemented by `TimerTestBinding`,
// so it presumably comes from a default on `BindingBoundary` — confirm.
binding.retain_handle(h1);
task.sender()
.send(TimerCmd::Schedule {
tag: 0,
delay: Duration::from_millis(50),
handle: h1,
})
.unwrap();
// First settle lets the task register the timer; then step past the deadline.
settle(&core).await;
tokio::time::advance(Duration::from_millis(51)).await;
settle(&core).await;
let got = emitted.lock().clone();
assert_eq!(got, vec![h1], "timer should have emitted h1");
}
/// Cancelling a pending emit timer releases its handle through the binding.
#[tokio::test]
async fn cancel_releases_handle() {
tokio::time::pause();
let released = Arc::new(parking_lot::Mutex::new(Vec::new()));
let (core, node, binding) = make_test_core(released.clone());
let mailbox = core.mailbox();
let task = spawn_timer_task(mailbox, node, binding.clone());
let h1 = HandleId::new(42);
binding.retain_handle(h1);
task.sender()
.send(TimerCmd::Schedule {
tag: 0,
delay: Duration::from_millis(100),
handle: h1,
})
.unwrap();
// Cancel is queued right behind Schedule, before the clock moves at all.
task.sender().send(TimerCmd::Cancel { tag: 0 }).unwrap();
settle(&core).await;
assert!(
released.lock().contains(&h1),
"cancelled handle should be released"
);
}
/// Scheduling an already-pending tag replaces the old timer: the old handle is
/// released and only the new handle fires.
#[tokio::test]
async fn reschedule_same_tag_cancels_previous() {
tokio::time::pause();
let released = Arc::new(parking_lot::Mutex::new(Vec::new()));
let (core, node, binding) = make_test_core(released.clone());
let mailbox = core.mailbox();
let emitted = Arc::new(parking_lot::Mutex::new(Vec::<HandleId>::new()));
let em = emitted.clone();
let _sub = core.subscribe(
node,
Arc::new(move |msgs| {
for m in msgs {
if let crate::message::Message::Data(h) = m {
em.lock().push(*h);
}
}
}),
);
let task = spawn_timer_task(mailbox, node, binding.clone());
let h1 = HandleId::new(10);
let h2 = HandleId::new(20);
binding.retain_handle(h1);
binding.retain_handle(h2);
task.sender()
.send(TimerCmd::Schedule {
tag: 0,
delay: Duration::from_millis(100),
handle: h1,
})
.unwrap();
// Same tag again: this must replace the h1 timer.
task.sender()
.send(TimerCmd::Schedule {
tag: 0,
delay: Duration::from_millis(50),
handle: h2,
})
.unwrap();
settle(&core).await;
assert!(released.lock().contains(&h1));
tokio::time::advance(Duration::from_millis(51)).await;
settle(&core).await;
let got = emitted.lock().clone();
assert_eq!(got, vec![h2], "only h2 should fire");
}
/// An explicit `shutdown()` (the same call `Drop` makes) releases the handles
/// of timers that are still pending.
#[tokio::test]
async fn drop_handle_releases_pending() {
tokio::time::pause();
let released = Arc::new(parking_lot::Mutex::new(Vec::new()));
let (core, node, binding) = make_test_core(released.clone());
let mailbox = core.mailbox();
let mut task = spawn_timer_task(mailbox, node, binding.clone());
let h1 = HandleId::new(42);
binding.retain_handle(h1);
task.sender()
.send(TimerCmd::Schedule {
tag: 0,
delay: Duration::from_secs(1),
handle: h1,
})
.unwrap();
settle(&core).await;
task.shutdown();
settle(&core).await;
assert!(
released.lock().contains(&h1),
"pending handle should be released on shutdown"
);
}
/// Timers with different tags do not replace each other; each fires at its
/// own deadline.
#[tokio::test]
async fn multiple_tags_fire_independently() {
tokio::time::pause();
let released = Arc::new(parking_lot::Mutex::new(Vec::new()));
let (core, node, binding) = make_test_core(released.clone());
let mailbox = core.mailbox();
let emitted = Arc::new(parking_lot::Mutex::new(Vec::<HandleId>::new()));
let em = emitted.clone();
let _sub = core.subscribe(
node,
Arc::new(move |msgs| {
for m in msgs {
if let crate::message::Message::Data(h) = m {
em.lock().push(*h);
}
}
}),
);
let task = spawn_timer_task(mailbox, node, binding.clone());
let h1 = HandleId::new(10);
let h2 = HandleId::new(20);
binding.retain_handle(h1);
binding.retain_handle(h2);
task.sender()
.send(TimerCmd::Schedule {
tag: 0,
delay: Duration::from_millis(100),
handle: h1,
})
.unwrap();
task.sender()
.send(TimerCmd::Schedule {
tag: 1,
delay: Duration::from_millis(50),
handle: h2,
})
.unwrap();
settle(&core).await;
// At ~51 ms only the 50 ms timer (h2) is due.
tokio::time::advance(Duration::from_millis(51)).await;
settle(&core).await;
assert_eq!(*emitted.lock(), vec![h2]);
// At ~101 ms the 100 ms timer (h1) follows.
tokio::time::advance(Duration::from_millis(50)).await;
settle(&core).await;
assert_eq!(*emitted.lock(), vec![h2, h1]);
}
}