Skip to main content

graphrefly_core/
timer.rs

1//! Tokio-based timer substrate for deferred emissions.
2//!
3//! Feature-gated behind `tokio`. Core's dispatcher stays fully synchronous;
4//! this module provides a minimal async boundary for timer scheduling.
5//!
6//! # Design (2026-05-11)
7//!
8//! Timer-dependent operators (debounce, throttle, delay, audit) and
9//! infrastructure consumers (storage `debounce_ms`) need to schedule
10//! deferred work — "emit this handle in 50ms" or "flush after 200ms of
11//! quiet." These are async concerns that Core's sync dispatcher can't
12//! own directly.
13//!
14//! The substrate provides:
15//!
16//! - [`TimerTaskHandle`] — command channel from sync code to a tokio task.
17//!   Non-blocking send; drop-to-shutdown.
18//! - [`spawn_timer_task`] — spawns a tokio task that manages tagged timer
19//!   slots for one node, wires the channel, returns the handle.
20//!
21//! Operators send [`TimerCmd`] from their sync fn-fire to schedule,
22//! cancel, or cancel-all timers. The task manages deadlines via
23//! `tokio::time` and calls `Core::emit()` when a timer fires, which
24//! enters the wave engine via the normal cross-thread emit path
25//! (partition `wave_owner` serializes).
26//!
27//! Timer sources (fromTimer, interval) use the producer substrate +
28//! this module: the producer's build closure spawns a timer task on
29//! activation; deactivation drops the handle, shutting down the task.
30//!
31//! # Testing
32//!
33//! Use `tokio::time::pause()` + `tokio::time::advance()` for
34//! deterministic timer control in tests. No mock infrastructure needed.
35
36use std::sync::Arc;
37use std::time::Duration;
38
39use tokio::sync::mpsc;
40
41use crate::boundary::BindingBoundary;
42use crate::handle::{HandleId, NodeId};
43use crate::node::WeakCore;
44
45// ---------------------------------------------------------------------------
46// Public types
47// ---------------------------------------------------------------------------
48
49/// Callback invoked when a timer fires. Runs on the tokio task thread.
50/// Must not block or hold locks across `Core::emit`.
51pub type TimerCallback = Box<dyn Fn() + Send + Sync>;
52
53/// Command sent from the operator fn (sync) to the timer task (async).
54pub enum TimerCmd {
55    /// Schedule a timer: after `delay`, emit `handle` on the node.
56    /// If a timer with the same `tag` is already pending, it is cancelled
57    /// and the old handle is released.
58    Schedule {
59        tag: u32,
60        delay: Duration,
61        handle: HandleId,
62    },
63    /// Schedule a callback-only timer: after `delay`, invoke `callback`.
64    /// No handle emission — the callback manages state and may call
65    /// `Core::emit` itself. Used by operators (debounce, throttle) that
66    /// need finer control than the emit-on-fire pattern.
67    ScheduleCallback {
68        tag: u32,
69        delay: Duration,
70        callback: TimerCallback,
71    },
72    /// Cancel a pending timer by tag. The held handle (if any) is released.
73    /// Callback-only timers have no handle to release.
74    Cancel { tag: u32 },
75    /// Cancel all pending timers and release all held handles.
76    CancelAll,
77}
78
79impl std::fmt::Debug for TimerCmd {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        match self {
82            Self::Schedule { tag, delay, handle } => f
83                .debug_struct("Schedule")
84                .field("tag", tag)
85                .field("delay", delay)
86                .field("handle", handle)
87                .finish(),
88            Self::ScheduleCallback { tag, delay, .. } => f
89                .debug_struct("ScheduleCallback")
90                .field("tag", tag)
91                .field("delay", delay)
92                .finish_non_exhaustive(),
93            Self::Cancel { tag } => f.debug_struct("Cancel").field("tag", tag).finish(),
94            Self::CancelAll => write!(f, "CancelAll"),
95        }
96    }
97}
98
99/// Sender half — stored in the operator's scratch for sync command dispatch.
100pub type TimerSender = mpsc::UnboundedSender<TimerCmd>;
101
102/// Handle to a running timer task. Dropping this shuts down the task.
103#[must_use]
104pub struct TimerTaskHandle {
105    tx: Option<mpsc::UnboundedSender<TimerCmd>>,
106}
107
108impl TimerTaskHandle {
109    /// Get a clone of the sender for use in operator scratch.
110    ///
111    /// # Panics
112    ///
113    /// Panics if called after `shutdown()`.
114    #[must_use]
115    pub fn sender(&self) -> TimerSender {
116        self.tx
117            .as_ref()
118            .expect("TimerTaskHandle: sender already taken via shutdown")
119            .clone()
120    }
121
122    /// Explicitly shut down the task. Sends `CancelAll` then drops the
123    /// sender so the task exits cleanly.
124    pub fn shutdown(&mut self) {
125        if let Some(tx) = self.tx.take() {
126            let _ = tx.send(TimerCmd::CancelAll);
127            // Drop tx -> channel closes -> task exits on next poll.
128        }
129    }
130}
131
132impl Drop for TimerTaskHandle {
133    fn drop(&mut self) {
134        self.shutdown();
135    }
136}
137
138// ---------------------------------------------------------------------------
139// Task spawn
140// ---------------------------------------------------------------------------
141
142/// Spawn a timer task for a single operator/source node.
143///
144/// The task runs until the channel closes (sender dropped). It manages
145/// pending timers tagged by `u32`. When a timer fires, it calls
146/// `core.emit(node_id, handle)`.
147///
148/// # Panics
149///
150/// Must be called from within a tokio runtime context.
151pub fn spawn_timer_task(
152    core: WeakCore,
153    node_id: NodeId,
154    binding: Arc<dyn BindingBoundary>,
155) -> TimerTaskHandle {
156    let (tx, rx) = mpsc::unbounded_channel();
157    tokio::spawn(timer_task_loop(rx, core, node_id, binding));
158    TimerTaskHandle { tx: Some(tx) }
159}
160
161// ---------------------------------------------------------------------------
162// Task loop
163// ---------------------------------------------------------------------------
164
165/// Internal: the async task that manages timers for one node.
166async fn timer_task_loop(
167    mut rx: mpsc::UnboundedReceiver<TimerCmd>,
168    core: WeakCore,
169    node_id: NodeId,
170    binding: Arc<dyn BindingBoundary>,
171) {
172    let mut slots: Vec<TimerSlot> = Vec::new();
173
174    loop {
175        // Find the nearest deadline among active slots.
176        let next_fire = nearest_deadline(&slots);
177
178        tokio::select! {
179            biased; // Prefer commands over timer fires to process cancels first.
180
181            cmd = rx.recv() => {
182                match cmd {
183                    Some(TimerCmd::Schedule { tag, delay, handle }) => {
184                        cancel_slot(&mut slots, tag, &*binding);
185                        let deadline = tokio::time::Instant::now() + delay;
186                        slots.push(TimerSlot { tag, kind: TimerSlotKind::Emit(handle), deadline });
187                    }
188                    Some(TimerCmd::ScheduleCallback { tag, delay, callback }) => {
189                        cancel_slot(&mut slots, tag, &*binding);
190                        let deadline = tokio::time::Instant::now() + delay;
191                        slots.push(TimerSlot { tag, kind: TimerSlotKind::Callback(callback), deadline });
192                    }
193                    Some(TimerCmd::Cancel { tag }) => {
194                        cancel_slot(&mut slots, tag, &*binding);
195                    }
196                    Some(TimerCmd::CancelAll) => {
197                        release_all_slots(&mut slots, &*binding);
198                    }
199                    None => {
200                        // Channel closed — operator deactivated / torn down.
201                        release_all_slots(&mut slots, &*binding);
202                        return;
203                    }
204                }
205            }
206
207            () = sleep_until_or_forever(next_fire) => {
208                // At least one timer fired. Drain all expired slots.
209                let now = tokio::time::Instant::now();
210                let mut i = 0;
211                while i < slots.len() {
212                    if slots[i].deadline <= now {
213                        let slot = slots.swap_remove(i);
214                        match slot.kind {
215                            TimerSlotKind::Emit(handle) => {
216                                if let Some(c) = core.upgrade() {
217                                    c.emit(node_id, handle);
218                                } else {
219                                    binding.release_handle(handle);
220                                    release_all_slots(&mut slots, &*binding);
221                                    return;
222                                }
223                            }
224                            TimerSlotKind::Callback(cb) => {
225                                cb();
226                            }
227                        }
228                        // Don't increment i — swap_remove moved the last element here.
229                    } else {
230                        i += 1;
231                    }
232                }
233            }
234        }
235    }
236}
237
238// ---------------------------------------------------------------------------
239// Internal helpers
240// ---------------------------------------------------------------------------
241
242enum TimerSlotKind {
243    /// Emit handle on fire; release on cancel.
244    Emit(HandleId),
245    /// Invoke callback on fire; nothing to release on cancel.
246    Callback(TimerCallback),
247}
248
249struct TimerSlot {
250    tag: u32,
251    kind: TimerSlotKind,
252    deadline: tokio::time::Instant,
253}
254
255fn cancel_slot(slots: &mut Vec<TimerSlot>, tag: u32, binding: &dyn BindingBoundary) {
256    if let Some(pos) = slots.iter().position(|s| s.tag == tag) {
257        let slot = slots.swap_remove(pos);
258        if let TimerSlotKind::Emit(h) = slot.kind {
259            binding.release_handle(h);
260        }
261    }
262}
263
264fn release_all_slots(slots: &mut Vec<TimerSlot>, binding: &dyn BindingBoundary) {
265    for slot in slots.drain(..) {
266        if let TimerSlotKind::Emit(h) = slot.kind {
267            binding.release_handle(h);
268        }
269    }
270}
271
272fn nearest_deadline(slots: &[TimerSlot]) -> Option<tokio::time::Instant> {
273    slots.iter().map(|s| s.deadline).min()
274}
275
276/// Sleep until the given instant, or sleep forever if `None`.
277async fn sleep_until_or_forever(deadline: Option<tokio::time::Instant>) {
278    match deadline {
279        Some(d) => tokio::time::sleep_until(d).await,
280        None => std::future::pending::<()>().await,
281    }
282}
283
284// ---------------------------------------------------------------------------
285// Tests
286// ---------------------------------------------------------------------------
287
288#[cfg(test)]
289mod tests {
290    use super::*;
291    use crate::boundary::{DepBatch, FnResult};
292    use crate::handle::FnId;
293
294    /// Yield multiple times to let spawned tasks process commands and
295    /// complete emit→wave→sink chains in the current-thread runtime.
296    async fn settle() {
297        for _ in 0..10 {
298            tokio::task::yield_now().await;
299        }
300    }
301
302    /// Minimal binding for timer tests — tracks released handles.
303    struct TimerTestBinding {
304        released: Arc<parking_lot::Mutex<Vec<HandleId>>>,
305    }
306
307    impl TimerTestBinding {
308        fn new(released: Arc<parking_lot::Mutex<Vec<HandleId>>>) -> Self {
309            Self { released }
310        }
311    }
312
313    impl BindingBoundary for TimerTestBinding {
314        fn invoke_fn(&self, _node_id: NodeId, _fn_id: FnId, _dep_data: &[DepBatch]) -> FnResult {
315            FnResult::Noop { tracked: None }
316        }
317
318        fn custom_equals(&self, _equals_handle: FnId, _a: HandleId, _b: HandleId) -> bool {
319            false
320        }
321
322        fn release_handle(&self, h: HandleId) {
323            self.released.lock().push(h);
324        }
325    }
326
327    fn make_test_core(
328        released: Arc<parking_lot::Mutex<Vec<HandleId>>>,
329    ) -> (crate::node::Core, NodeId, Arc<dyn BindingBoundary>) {
330        let binding: Arc<dyn BindingBoundary> = Arc::new(TimerTestBinding::new(released));
331        let core = crate::node::Core::new(binding.clone());
332        let s = core
333            .register_state(crate::handle::NO_HANDLE, false)
334            .unwrap();
335        (core, s, binding)
336    }
337
338    #[tokio::test]
339    async fn schedule_fires_after_delay() {
340        tokio::time::pause();
341
342        let released = Arc::new(parking_lot::Mutex::new(Vec::new()));
343        let (core, node, binding) = make_test_core(released.clone());
344        let weak = core.weak_handle();
345
346        let emitted = Arc::new(parking_lot::Mutex::new(Vec::<HandleId>::new()));
347        let em = emitted.clone();
348        let _sub = core.subscribe(
349            node,
350            Arc::new(move |msgs| {
351                for m in msgs {
352                    if let crate::message::Message::Data(h) = m {
353                        em.lock().push(*h);
354                    }
355                }
356            }),
357        );
358
359        let task = spawn_timer_task(weak, node, binding.clone());
360        let h1 = HandleId::new(42);
361        binding.retain_handle(h1);
362
363        task.sender()
364            .send(TimerCmd::Schedule {
365                tag: 0,
366                delay: Duration::from_millis(50),
367                handle: h1,
368            })
369            .unwrap();
370
371        settle().await; // task processes Schedule, enters sleep_until
372
373        tokio::time::advance(Duration::from_millis(51)).await;
374        settle().await; // task fires timer, emit → wave → sink
375
376        let got = emitted.lock().clone();
377        assert_eq!(got, vec![h1], "timer should have emitted h1");
378    }
379
380    #[tokio::test]
381    async fn cancel_releases_handle() {
382        tokio::time::pause();
383
384        let released = Arc::new(parking_lot::Mutex::new(Vec::new()));
385        let (core, node, binding) = make_test_core(released.clone());
386        let weak = core.weak_handle();
387
388        let task = spawn_timer_task(weak, node, binding.clone());
389        let h1 = HandleId::new(42);
390        binding.retain_handle(h1);
391
392        task.sender()
393            .send(TimerCmd::Schedule {
394                tag: 0,
395                delay: Duration::from_millis(100),
396                handle: h1,
397            })
398            .unwrap();
399
400        task.sender().send(TimerCmd::Cancel { tag: 0 }).unwrap();
401        settle().await;
402
403        assert!(
404            released.lock().contains(&h1),
405            "cancelled handle should be released"
406        );
407    }
408
409    #[tokio::test]
410    async fn reschedule_same_tag_cancels_previous() {
411        tokio::time::pause();
412
413        let released = Arc::new(parking_lot::Mutex::new(Vec::new()));
414        let (core, node, binding) = make_test_core(released.clone());
415        let weak = core.weak_handle();
416
417        let emitted = Arc::new(parking_lot::Mutex::new(Vec::<HandleId>::new()));
418        let em = emitted.clone();
419        let _sub = core.subscribe(
420            node,
421            Arc::new(move |msgs| {
422                for m in msgs {
423                    if let crate::message::Message::Data(h) = m {
424                        em.lock().push(*h);
425                    }
426                }
427            }),
428        );
429
430        let task = spawn_timer_task(weak, node, binding.clone());
431        let h1 = HandleId::new(10);
432        let h2 = HandleId::new(20);
433        binding.retain_handle(h1);
434        binding.retain_handle(h2);
435
436        // Schedule h1 at tag 0.
437        task.sender()
438            .send(TimerCmd::Schedule {
439                tag: 0,
440                delay: Duration::from_millis(100),
441                handle: h1,
442            })
443            .unwrap();
444
445        // Reschedule tag 0 with h2 — cancels h1.
446        task.sender()
447            .send(TimerCmd::Schedule {
448                tag: 0,
449                delay: Duration::from_millis(50),
450                handle: h2,
451            })
452            .unwrap();
453
454        settle().await;
455
456        // h1 released (cancelled).
457        assert!(released.lock().contains(&h1));
458
459        // Advance past h2's deadline.
460        tokio::time::advance(Duration::from_millis(51)).await;
461        settle().await;
462
463        let got = emitted.lock().clone();
464        assert_eq!(got, vec![h2], "only h2 should fire");
465    }
466
467    #[tokio::test]
468    async fn drop_handle_releases_pending() {
469        tokio::time::pause();
470
471        let released = Arc::new(parking_lot::Mutex::new(Vec::new()));
472        let (core, node, binding) = make_test_core(released.clone());
473        let weak = core.weak_handle();
474
475        let mut task = spawn_timer_task(weak, node, binding.clone());
476        let h1 = HandleId::new(42);
477        binding.retain_handle(h1);
478
479        task.sender()
480            .send(TimerCmd::Schedule {
481                tag: 0,
482                delay: Duration::from_secs(1),
483                handle: h1,
484            })
485            .unwrap();
486
487        settle().await;
488
489        // Shutdown (simulates operator deactivation).
490        task.shutdown();
491        settle().await;
492
493        assert!(
494            released.lock().contains(&h1),
495            "pending handle should be released on shutdown"
496        );
497    }
498
499    #[tokio::test]
500    async fn multiple_tags_fire_independently() {
501        tokio::time::pause();
502
503        let released = Arc::new(parking_lot::Mutex::new(Vec::new()));
504        let (core, node, binding) = make_test_core(released.clone());
505        let weak = core.weak_handle();
506
507        let emitted = Arc::new(parking_lot::Mutex::new(Vec::<HandleId>::new()));
508        let em = emitted.clone();
509        let _sub = core.subscribe(
510            node,
511            Arc::new(move |msgs| {
512                for m in msgs {
513                    if let crate::message::Message::Data(h) = m {
514                        em.lock().push(*h);
515                    }
516                }
517            }),
518        );
519
520        let task = spawn_timer_task(weak, node, binding.clone());
521        let h1 = HandleId::new(10);
522        let h2 = HandleId::new(20);
523        binding.retain_handle(h1);
524        binding.retain_handle(h2);
525
526        // Tag 0 at 100ms, tag 1 at 50ms.
527        task.sender()
528            .send(TimerCmd::Schedule {
529                tag: 0,
530                delay: Duration::from_millis(100),
531                handle: h1,
532            })
533            .unwrap();
534        task.sender()
535            .send(TimerCmd::Schedule {
536                tag: 1,
537                delay: Duration::from_millis(50),
538                handle: h2,
539            })
540            .unwrap();
541
542        // Yield so the task processes both Schedule commands.
543        settle().await; // task processes both Schedule commands
544
545        // Advance to 51ms — h2 fires.
546        tokio::time::advance(Duration::from_millis(51)).await;
547        settle().await;
548        assert_eq!(*emitted.lock(), vec![h2]);
549
550        // Advance to 101ms — h1 fires.
551        tokio::time::advance(Duration::from_millis(50)).await;
552        settle().await;
553        assert_eq!(*emitted.lock(), vec![h2, h1]);
554    }
555}