1use std::sync::Arc;
42use std::time::Duration;
43
44use tokio::sync::mpsc;
45
46use crate::boundary::BindingBoundary;
47use crate::handle::{HandleId, NodeId};
48use crate::mailbox::CoreMailbox;
49
/// Boxed closure that the timer task runs when a callback timer comes due.
///
/// The closure takes no arguments, returns nothing, and is only ever called
/// through `Fn`. The `Send + Sync` bounds let it be moved into the spawned
/// timer task.
pub type TimerCallback = Box<dyn Fn() + Send + Sync>;
57
58pub enum TimerCmd {
60 Schedule {
64 tag: u32,
65 delay: Duration,
66 handle: HandleId,
67 },
68 ScheduleCallback {
73 tag: u32,
74 delay: Duration,
75 callback: TimerCallback,
76 },
77 Cancel { tag: u32 },
80 CancelAll,
82}
83
84impl std::fmt::Debug for TimerCmd {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 match self {
87 Self::Schedule { tag, delay, handle } => f
88 .debug_struct("Schedule")
89 .field("tag", tag)
90 .field("delay", delay)
91 .field("handle", handle)
92 .finish(),
93 Self::ScheduleCallback { tag, delay, .. } => f
94 .debug_struct("ScheduleCallback")
95 .field("tag", tag)
96 .field("delay", delay)
97 .finish_non_exhaustive(),
98 Self::Cancel { tag } => f.debug_struct("Cancel").field("tag", tag).finish(),
99 Self::CancelAll => write!(f, "CancelAll"),
100 }
101 }
102}
103
104pub type TimerSender = mpsc::UnboundedSender<TimerCmd>;
106
107#[must_use]
109pub struct TimerTaskHandle {
110 tx: Option<mpsc::UnboundedSender<TimerCmd>>,
111}
112
113impl TimerTaskHandle {
114 #[must_use]
120 pub fn sender(&self) -> TimerSender {
121 self.tx
122 .as_ref()
123 .expect("TimerTaskHandle: sender already taken via shutdown")
124 .clone()
125 }
126
127 pub fn shutdown(&mut self) {
130 if let Some(tx) = self.tx.take() {
131 let _ = tx.send(TimerCmd::CancelAll);
132 }
134 }
135}
136
137impl Drop for TimerTaskHandle {
138 fn drop(&mut self) {
139 self.shutdown();
140 }
141}
142
143pub fn spawn_timer_task(
157 mailbox: Arc<CoreMailbox>,
158 node_id: NodeId,
159 binding: Arc<dyn BindingBoundary>,
160) -> TimerTaskHandle {
161 let (tx, rx) = mpsc::unbounded_channel();
162 tokio::spawn(timer_task_loop(rx, mailbox, node_id, binding));
163 TimerTaskHandle { tx: Some(tx) }
164}
165
166async fn timer_task_loop(
172 mut rx: mpsc::UnboundedReceiver<TimerCmd>,
173 mailbox: Arc<CoreMailbox>,
174 node_id: NodeId,
175 binding: Arc<dyn BindingBoundary>,
176) {
177 let mut slots: Vec<TimerSlot> = Vec::new();
178
179 loop {
180 let next_fire = nearest_deadline(&slots);
182
183 tokio::select! {
184 biased; cmd = rx.recv() => {
187 match cmd {
188 Some(TimerCmd::Schedule { tag, delay, handle }) => {
189 cancel_slot(&mut slots, tag, &*binding);
190 let deadline = tokio::time::Instant::now() + delay;
191 slots.push(TimerSlot { tag, kind: TimerSlotKind::Emit(handle), deadline });
192 }
193 Some(TimerCmd::ScheduleCallback { tag, delay, callback }) => {
194 cancel_slot(&mut slots, tag, &*binding);
195 let deadline = tokio::time::Instant::now() + delay;
196 slots.push(TimerSlot { tag, kind: TimerSlotKind::Callback(callback), deadline });
197 }
198 Some(TimerCmd::Cancel { tag }) => {
199 cancel_slot(&mut slots, tag, &*binding);
200 }
201 Some(TimerCmd::CancelAll) => {
202 release_all_slots(&mut slots, &*binding);
203 }
204 None => {
205 release_all_slots(&mut slots, &*binding);
207 return;
208 }
209 }
210 }
211
212 () = sleep_until_or_forever(next_fire) => {
213 let now = tokio::time::Instant::now();
215 let mut i = 0;
216 while i < slots.len() {
217 if slots[i].deadline <= now {
218 let slot = slots.swap_remove(i);
219 match slot.kind {
220 TimerSlotKind::Emit(handle) => {
221 if !mailbox.post_emit(node_id, handle) {
233 binding.release_handle(handle);
234 release_all_slots(&mut slots, &*binding);
235 return;
236 }
237 }
238 TimerSlotKind::Callback(cb) => {
239 cb();
240 }
241 }
242 } else {
244 i += 1;
245 }
246 }
247 }
248 }
249 }
250}
251
252enum TimerSlotKind {
257 Emit(HandleId),
259 Callback(TimerCallback),
261}
262
263struct TimerSlot {
264 tag: u32,
265 kind: TimerSlotKind,
266 deadline: tokio::time::Instant,
267}
268
269fn cancel_slot(slots: &mut Vec<TimerSlot>, tag: u32, binding: &dyn BindingBoundary) {
270 if let Some(pos) = slots.iter().position(|s| s.tag == tag) {
271 let slot = slots.swap_remove(pos);
272 if let TimerSlotKind::Emit(h) = slot.kind {
273 binding.release_handle(h);
274 }
275 }
276}
277
278fn release_all_slots(slots: &mut Vec<TimerSlot>, binding: &dyn BindingBoundary) {
279 for slot in slots.drain(..) {
280 if let TimerSlotKind::Emit(h) = slot.kind {
281 binding.release_handle(h);
282 }
283 }
284}
285
286fn nearest_deadline(slots: &[TimerSlot]) -> Option<tokio::time::Instant> {
287 slots.iter().map(|s| s.deadline).min()
288}
289
290async fn sleep_until_or_forever(deadline: Option<tokio::time::Instant>) {
292 match deadline {
293 Some(d) => tokio::time::sleep_until(d).await,
294 None => std::future::pending::<()>().await,
295 }
296}
297
// Tests for the timer task. All of them run on Tokio's paused clock so that
// time only moves when a test calls `tokio::time::advance`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::boundary::{DepBatch, FnResult};
    use crate::handle::FnId;
    use crate::message::Message;

    /// Shared, lock-protected list of handles recorded by a test.
    type HandleLog = Arc<parking_lot::Mutex<Vec<HandleId>>>;

    /// Number of scheduler yields performed by `settle`.
    const SETTLE_YIELDS: usize = 10;

    /// Creates an empty handle log.
    fn new_log() -> HandleLog {
        Arc::new(parking_lot::Mutex::new(Vec::new()))
    }

    /// Yields to the scheduler a few times so the spawned timer task can
    /// run, then drains the core's mailbox.
    async fn settle(core: &crate::node::Core) {
        for _ in 0..SETTLE_YIELDS {
            tokio::task::yield_now().await;
        }
        core.drain_mailbox();
    }

    /// Sends a `Schedule` command through a fresh sender clone, panicking if
    /// the send fails.
    fn schedule_emit(task: &TimerTaskHandle, tag: u32, delay: Duration, handle: HandleId) {
        task.sender()
            .send(TimerCmd::Schedule { tag, delay, handle })
            .unwrap();
    }

    /// Binding stub that records every released handle.
    struct TimerTestBinding {
        /// Handles passed to `release_handle`, in call order.
        released: HandleLog,
    }

    impl TimerTestBinding {
        fn new(released: HandleLog) -> Self {
            Self { released }
        }
    }

    impl BindingBoundary for TimerTestBinding {
        fn invoke_fn(&self, _node_id: NodeId, _fn_id: FnId, _dep_data: &[DepBatch]) -> FnResult {
            FnResult::Noop { tracked: None }
        }

        fn custom_equals(&self, _equals_handle: FnId, _a: HandleId, _b: HandleId) -> bool {
            false
        }

        fn release_handle(&self, h: HandleId) {
            self.released.lock().push(h);
        }
    }

    /// Builds a core backed by the recording binding, with one registered
    /// state node, and returns the core, that node and the binding.
    fn make_test_core(
        released: HandleLog,
    ) -> (crate::node::Core, NodeId, Arc<dyn BindingBoundary>) {
        let binding: Arc<dyn BindingBoundary> = Arc::new(TimerTestBinding::new(released));
        let core = crate::node::Core::new(Arc::clone(&binding));
        let state = core
            .register_state(crate::handle::NO_HANDLE, false)
            .unwrap();
        (core, state, binding)
    }

    #[tokio::test]
    async fn schedule_fires_after_delay() {
        tokio::time::pause();

        let released = new_log();
        let (core, node, binding) = make_test_core(Arc::clone(&released));
        let mailbox = core.mailbox();

        let emitted = new_log();
        let sink = Arc::clone(&emitted);
        let _sub = core.subscribe(
            node,
            Arc::new(move |msgs| {
                for m in msgs {
                    if let Message::Data(h) = m {
                        sink.lock().push(*h);
                    }
                }
            }),
        );

        let task = spawn_timer_task(mailbox, node, Arc::clone(&binding));
        let h1 = HandleId::new(42);
        binding.retain_handle(h1);

        schedule_emit(&task, 0, Duration::from_millis(50), h1);

        // Let the task pick up the command before moving the clock.
        settle(&core).await;
        tokio::time::advance(Duration::from_millis(51)).await;
        settle(&core).await;

        let got = emitted.lock().clone();
        assert_eq!(got, vec![h1], "timer should have emitted h1");
    }

    #[tokio::test]
    async fn cancel_releases_handle() {
        tokio::time::pause();

        let released = new_log();
        let (core, node, binding) = make_test_core(Arc::clone(&released));
        let mailbox = core.mailbox();

        let task = spawn_timer_task(mailbox, node, Arc::clone(&binding));
        let h1 = HandleId::new(42);
        binding.retain_handle(h1);

        schedule_emit(&task, 0, Duration::from_millis(100), h1);
        task.sender().send(TimerCmd::Cancel { tag: 0 }).unwrap();
        settle(&core).await;

        assert!(
            released.lock().contains(&h1),
            "cancelled handle should be released"
        );
    }

    #[tokio::test]
    async fn reschedule_same_tag_cancels_previous() {
        tokio::time::pause();

        let released = new_log();
        let (core, node, binding) = make_test_core(Arc::clone(&released));
        let mailbox = core.mailbox();

        let emitted = new_log();
        let sink = Arc::clone(&emitted);
        let _sub = core.subscribe(
            node,
            Arc::new(move |msgs| {
                for m in msgs {
                    if let Message::Data(h) = m {
                        sink.lock().push(*h);
                    }
                }
            }),
        );

        let task = spawn_timer_task(mailbox, node, Arc::clone(&binding));
        let h1 = HandleId::new(10);
        let h2 = HandleId::new(20);
        binding.retain_handle(h1);
        binding.retain_handle(h2);

        // Same tag twice: the second command must replace the first.
        schedule_emit(&task, 0, Duration::from_millis(100), h1);
        schedule_emit(&task, 0, Duration::from_millis(50), h2);

        settle(&core).await;

        assert!(released.lock().contains(&h1));

        tokio::time::advance(Duration::from_millis(51)).await;
        settle(&core).await;

        let got = emitted.lock().clone();
        assert_eq!(got, vec![h2], "only h2 should fire");
    }

    #[tokio::test]
    async fn drop_handle_releases_pending() {
        tokio::time::pause();

        let released = new_log();
        let (core, node, binding) = make_test_core(Arc::clone(&released));
        let mailbox = core.mailbox();

        let mut task = spawn_timer_task(mailbox, node, Arc::clone(&binding));
        let h1 = HandleId::new(42);
        binding.retain_handle(h1);

        schedule_emit(&task, 0, Duration::from_secs(1), h1);

        settle(&core).await;

        task.shutdown();
        settle(&core).await;

        assert!(
            released.lock().contains(&h1),
            "pending handle should be released on shutdown"
        );
    }

    #[tokio::test]
    async fn multiple_tags_fire_independently() {
        tokio::time::pause();

        let released = new_log();
        let (core, node, binding) = make_test_core(Arc::clone(&released));
        let mailbox = core.mailbox();

        let emitted = new_log();
        let sink = Arc::clone(&emitted);
        let _sub = core.subscribe(
            node,
            Arc::new(move |msgs| {
                for m in msgs {
                    if let Message::Data(h) = m {
                        sink.lock().push(*h);
                    }
                }
            }),
        );

        let task = spawn_timer_task(mailbox, node, Arc::clone(&binding));
        let h1 = HandleId::new(10);
        let h2 = HandleId::new(20);
        binding.retain_handle(h1);
        binding.retain_handle(h2);

        schedule_emit(&task, 0, Duration::from_millis(100), h1);
        schedule_emit(&task, 1, Duration::from_millis(50), h2);

        settle(&core).await;
        tokio::time::advance(Duration::from_millis(51)).await;
        settle(&core).await;
        assert_eq!(*emitted.lock(), vec![h2]);

        tokio::time::advance(Duration::from_millis(50)).await;
        settle(&core).await;
        assert_eq!(*emitted.lock(), vec![h2, h1]);
    }
}