1use std::sync::Arc;
37use std::time::Duration;
38
39use tokio::sync::mpsc;
40
41use crate::boundary::BindingBoundary;
42use crate::handle::{HandleId, NodeId};
43use crate::node::WeakCore;
44
/// Boxed, thread-safe closure run by the timer task when a
/// [`TimerCmd::ScheduleCallback`] timer reaches its deadline.
///
/// The closure takes no arguments and returns nothing; it is invoked
/// synchronously inside the timer task.
pub type TimerCallback = Box<dyn Fn() + Send + Sync>;
52
/// Commands accepted by the timer task over its command channel.
///
/// Timers are identified by a `u32` tag. Scheduling under a tag that is
/// already in use cancels the previous timer for that tag first.
pub enum TimerCmd {
    /// Arm a timer that emits `handle` on the task's node once `delay` has
    /// elapsed.
    ///
    /// If the timer is cancelled (or the task shuts down) before it fires,
    /// `handle` is released through the binding instead of being emitted.
    Schedule {
        /// Identifier of the timer; replaces any timer with the same tag.
        tag: u32,
        /// Time to wait, measured from when the task processes the command.
        delay: Duration,
        /// Handle emitted when the timer fires.
        handle: HandleId,
    },
    /// Arm a timer that invokes `callback` once `delay` has elapsed.
    ///
    /// If the timer is cancelled before it fires, the callback is dropped
    /// without being called.
    ScheduleCallback {
        /// Identifier of the timer; replaces any timer with the same tag.
        tag: u32,
        /// Time to wait, measured from when the task processes the command.
        delay: Duration,
        /// Closure invoked when the timer fires.
        callback: TimerCallback,
    },
    /// Cancel the timer registered under `tag`, if any. An emit timer's
    /// handle is released through the binding.
    Cancel { tag: u32 },
    /// Cancel every pending timer, releasing the handles of all emit timers.
    /// The task itself keeps running and accepts further commands.
    CancelAll,
}
78
79impl std::fmt::Debug for TimerCmd {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 match self {
82 Self::Schedule { tag, delay, handle } => f
83 .debug_struct("Schedule")
84 .field("tag", tag)
85 .field("delay", delay)
86 .field("handle", handle)
87 .finish(),
88 Self::ScheduleCallback { tag, delay, .. } => f
89 .debug_struct("ScheduleCallback")
90 .field("tag", tag)
91 .field("delay", delay)
92 .finish_non_exhaustive(),
93 Self::Cancel { tag } => f.debug_struct("Cancel").field("tag", tag).finish(),
94 Self::CancelAll => write!(f, "CancelAll"),
95 }
96 }
97}
98
/// Unbounded channel sender used to submit [`TimerCmd`]s to a timer task.
pub type TimerSender = mpsc::UnboundedSender<TimerCmd>;
101
/// Owner-side handle to a spawned timer task.
///
/// Holds the command sender for the task. Calling `shutdown` (which also
/// happens on drop) takes the sender out and asks the task to cancel all
/// pending timers.
#[must_use]
pub struct TimerTaskHandle {
    /// Command sender for the task; `None` once `shutdown` has run.
    tx: Option<mpsc::UnboundedSender<TimerCmd>>,
}
107
108impl TimerTaskHandle {
109 #[must_use]
115 pub fn sender(&self) -> TimerSender {
116 self.tx
117 .as_ref()
118 .expect("TimerTaskHandle: sender already taken via shutdown")
119 .clone()
120 }
121
122 pub fn shutdown(&mut self) {
125 if let Some(tx) = self.tx.take() {
126 let _ = tx.send(TimerCmd::CancelAll);
127 }
129 }
130}
131
132impl Drop for TimerTaskHandle {
133 fn drop(&mut self) {
134 self.shutdown();
135 }
136}
137
138pub fn spawn_timer_task(
152 core: WeakCore,
153 node_id: NodeId,
154 binding: Arc<dyn BindingBoundary>,
155) -> TimerTaskHandle {
156 let (tx, rx) = mpsc::unbounded_channel();
157 tokio::spawn(timer_task_loop(rx, core, node_id, binding));
158 TimerTaskHandle { tx: Some(tx) }
159}
160
161async fn timer_task_loop(
167 mut rx: mpsc::UnboundedReceiver<TimerCmd>,
168 core: WeakCore,
169 node_id: NodeId,
170 binding: Arc<dyn BindingBoundary>,
171) {
172 let mut slots: Vec<TimerSlot> = Vec::new();
173
174 loop {
175 let next_fire = nearest_deadline(&slots);
177
178 tokio::select! {
179 biased; cmd = rx.recv() => {
182 match cmd {
183 Some(TimerCmd::Schedule { tag, delay, handle }) => {
184 cancel_slot(&mut slots, tag, &*binding);
185 let deadline = tokio::time::Instant::now() + delay;
186 slots.push(TimerSlot { tag, kind: TimerSlotKind::Emit(handle), deadline });
187 }
188 Some(TimerCmd::ScheduleCallback { tag, delay, callback }) => {
189 cancel_slot(&mut slots, tag, &*binding);
190 let deadline = tokio::time::Instant::now() + delay;
191 slots.push(TimerSlot { tag, kind: TimerSlotKind::Callback(callback), deadline });
192 }
193 Some(TimerCmd::Cancel { tag }) => {
194 cancel_slot(&mut slots, tag, &*binding);
195 }
196 Some(TimerCmd::CancelAll) => {
197 release_all_slots(&mut slots, &*binding);
198 }
199 None => {
200 release_all_slots(&mut slots, &*binding);
202 return;
203 }
204 }
205 }
206
207 () = sleep_until_or_forever(next_fire) => {
208 let now = tokio::time::Instant::now();
210 let mut i = 0;
211 while i < slots.len() {
212 if slots[i].deadline <= now {
213 let slot = slots.swap_remove(i);
214 match slot.kind {
215 TimerSlotKind::Emit(handle) => {
216 if let Some(c) = core.upgrade() {
217 c.emit(node_id, handle);
218 } else {
219 binding.release_handle(handle);
220 release_all_slots(&mut slots, &*binding);
221 return;
222 }
223 }
224 TimerSlotKind::Callback(cb) => {
225 cb();
226 }
227 }
228 } else {
230 i += 1;
231 }
232 }
233 }
234 }
235 }
236}
237
/// What a pending timer does when it fires.
enum TimerSlotKind {
    /// Emit this handle through the core. If the timer never fires, the
    /// handle is released through the binding instead.
    Emit(HandleId),
    /// Invoke this callback. If the timer never fires, the callback is
    /// dropped without being called.
    Callback(TimerCallback),
}
248
/// A single pending timer held by the timer task.
struct TimerSlot {
    /// Tag the timer was scheduled under; used to find it for cancellation.
    tag: u32,
    /// Action performed when the timer fires.
    kind: TimerSlotKind,
    /// Point in time at which the timer becomes due.
    deadline: tokio::time::Instant,
}
254
255fn cancel_slot(slots: &mut Vec<TimerSlot>, tag: u32, binding: &dyn BindingBoundary) {
256 if let Some(pos) = slots.iter().position(|s| s.tag == tag) {
257 let slot = slots.swap_remove(pos);
258 if let TimerSlotKind::Emit(h) = slot.kind {
259 binding.release_handle(h);
260 }
261 }
262}
263
264fn release_all_slots(slots: &mut Vec<TimerSlot>, binding: &dyn BindingBoundary) {
265 for slot in slots.drain(..) {
266 if let TimerSlotKind::Emit(h) = slot.kind {
267 binding.release_handle(h);
268 }
269 }
270}
271
272fn nearest_deadline(slots: &[TimerSlot]) -> Option<tokio::time::Instant> {
273 slots.iter().map(|s| s.deadline).min()
274}
275
276async fn sleep_until_or_forever(deadline: Option<tokio::time::Instant>) {
278 match deadline {
279 Some(d) => tokio::time::sleep_until(d).await,
280 None => std::future::pending::<()>().await,
281 }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::boundary::{DepBatch, FnResult};
    use crate::handle::FnId;

    /// Shared, lock-protected list of handle ids recorded during a test.
    type HandleLog = Arc<parking_lot::Mutex<Vec<HandleId>>>;

    /// Number of cooperative yields performed by [`settle`].
    const SETTLE_YIELDS: usize = 10;

    /// Creates an empty [`HandleLog`].
    fn new_log() -> HandleLog {
        Arc::new(parking_lot::Mutex::new(Vec::new()))
    }

    /// Yields to the scheduler a few times so the spawned timer task gets a
    /// chance to process queued commands and fired timers.
    async fn settle() {
        for _round in 0..SETTLE_YIELDS {
            tokio::task::yield_now().await;
        }
    }

    /// Sends a `Schedule` command for `handle` with a delay of `delay_ms`
    /// milliseconds; panics if the timer task is no longer receiving.
    fn schedule_emit(task: &TimerTaskHandle, tag: u32, delay_ms: u64, handle: HandleId) {
        let cmd = TimerCmd::Schedule {
            tag,
            delay: Duration::from_millis(delay_ms),
            handle,
        };
        task.sender().send(cmd).unwrap();
    }

    /// Binding stub that records every released handle.
    struct TimerTestBinding {
        released: HandleLog,
    }

    impl TimerTestBinding {
        fn new(released: HandleLog) -> Self {
            Self { released }
        }
    }

    impl BindingBoundary for TimerTestBinding {
        fn invoke_fn(&self, _node_id: NodeId, _fn_id: FnId, _dep_data: &[DepBatch]) -> FnResult {
            FnResult::Noop { tracked: None }
        }

        fn custom_equals(&self, _equals_handle: FnId, _a: HandleId, _b: HandleId) -> bool {
            false
        }

        fn release_handle(&self, h: HandleId) {
            let mut log = self.released.lock();
            log.push(h);
        }
    }

    /// Builds a core backed by [`TimerTestBinding`] with one registered state
    /// node, returning the core, the node id and the binding.
    fn make_test_core(
        released: HandleLog,
    ) -> (crate::node::Core, NodeId, Arc<dyn BindingBoundary>) {
        let binding: Arc<dyn BindingBoundary> = Arc::new(TimerTestBinding::new(released));
        let core = crate::node::Core::new(binding.clone());
        let state_node = core
            .register_state(crate::handle::NO_HANDLE, false)
            .unwrap();
        (core, state_node, binding)
    }

    #[tokio::test]
    async fn schedule_fires_after_delay() {
        // Freeze the tokio clock so the test drives time explicitly.
        tokio::time::pause();

        let released = new_log();
        let (core, node, binding) = make_test_core(released.clone());
        let weak = core.weak_handle();

        // Record every data handle emitted on the node.
        let emitted = new_log();
        let sink = emitted.clone();
        let _subscription = core.subscribe(
            node,
            Arc::new(move |msgs| {
                for msg in msgs {
                    if let crate::message::Message::Data(h) = msg {
                        sink.lock().push(*h);
                    }
                }
            }),
        );

        let task = spawn_timer_task(weak, node, binding.clone());
        let h1 = HandleId::new(42);
        binding.retain_handle(h1);

        schedule_emit(&task, 0, 50, h1);

        settle().await;
        tokio::time::advance(Duration::from_millis(51)).await;
        settle().await;

        let got = emitted.lock().clone();
        assert_eq!(got, vec![h1], "timer should have emitted h1");
    }

    #[tokio::test]
    async fn cancel_releases_handle() {
        tokio::time::pause();

        let released = new_log();
        let (core, node, binding) = make_test_core(released.clone());
        let weak = core.weak_handle();

        let task = spawn_timer_task(weak, node, binding.clone());
        let h1 = HandleId::new(42);
        binding.retain_handle(h1);

        schedule_emit(&task, 0, 100, h1);

        // Cancel before the deadline is ever reached.
        task.sender().send(TimerCmd::Cancel { tag: 0 }).unwrap();
        settle().await;

        assert!(
            released.lock().contains(&h1),
            "cancelled handle should be released"
        );
    }

    #[tokio::test]
    async fn reschedule_same_tag_cancels_previous() {
        tokio::time::pause();

        let released = new_log();
        let (core, node, binding) = make_test_core(released.clone());
        let weak = core.weak_handle();

        let emitted = new_log();
        let sink = emitted.clone();
        let _subscription = core.subscribe(
            node,
            Arc::new(move |msgs| {
                for msg in msgs {
                    if let crate::message::Message::Data(h) = msg {
                        sink.lock().push(*h);
                    }
                }
            }),
        );

        let task = spawn_timer_task(weak, node, binding.clone());
        let h1 = HandleId::new(10);
        let h2 = HandleId::new(20);
        binding.retain_handle(h1);
        binding.retain_handle(h2);

        // Both commands use tag 0, so the second replaces the first.
        schedule_emit(&task, 0, 100, h1);
        schedule_emit(&task, 0, 50, h2);

        settle().await;

        assert!(released.lock().contains(&h1));

        tokio::time::advance(Duration::from_millis(51)).await;
        settle().await;

        let got = emitted.lock().clone();
        assert_eq!(got, vec![h2], "only h2 should fire");
    }

    #[tokio::test]
    async fn drop_handle_releases_pending() {
        tokio::time::pause();

        let released = new_log();
        let (core, node, binding) = make_test_core(released.clone());
        let weak = core.weak_handle();

        let mut task = spawn_timer_task(weak, node, binding.clone());
        let h1 = HandleId::new(42);
        binding.retain_handle(h1);

        schedule_emit(&task, 0, 1_000, h1);

        settle().await;

        // Explicit shutdown runs the same path that `Drop` uses.
        task.shutdown();
        settle().await;

        assert!(
            released.lock().contains(&h1),
            "pending handle should be released on shutdown"
        );
    }

    #[tokio::test]
    async fn multiple_tags_fire_independently() {
        tokio::time::pause();

        let released = new_log();
        let (core, node, binding) = make_test_core(released.clone());
        let weak = core.weak_handle();

        let emitted = new_log();
        let sink = emitted.clone();
        let _subscription = core.subscribe(
            node,
            Arc::new(move |msgs| {
                for msg in msgs {
                    if let crate::message::Message::Data(h) = msg {
                        sink.lock().push(*h);
                    }
                }
            }),
        );

        let task = spawn_timer_task(weak, node, binding.clone());
        let h1 = HandleId::new(10);
        let h2 = HandleId::new(20);
        binding.retain_handle(h1);
        binding.retain_handle(h2);

        // Distinct tags: the timers do not replace each other.
        schedule_emit(&task, 0, 100, h1);
        schedule_emit(&task, 1, 50, h2);

        settle().await;
        tokio::time::advance(Duration::from_millis(51)).await;
        settle().await;
        assert_eq!(*emitted.lock(), vec![h2]);

        tokio::time::advance(Duration::from_millis(50)).await;
        settle().await;
        assert_eq!(*emitted.lock(), vec![h2, h1]);
    }
}