Skip to main content

graphrefly_core/
timer.rs

1//! Tokio-based timer substrate for deferred emissions.
2//!
3//! Feature-gated behind `tokio`. Core's dispatcher stays fully synchronous;
4//! this module provides a minimal async boundary for timer scheduling.
5//!
6//! # Design (2026-05-11)
7//!
8//! Timer-dependent operators (debounce, throttle, delay, audit) and
9//! infrastructure consumers (storage `debounce_ms`) need to schedule
10//! deferred work — "emit this handle in 50ms" or "flush after 200ms of
11//! quiet." These are async concerns that Core's sync dispatcher can't
12//! own directly.
13//!
14//! The substrate provides:
15//!
16//! - [`TimerTaskHandle`] — command channel from sync code to a tokio task.
17//!   Non-blocking send; drop-to-shutdown.
18//! - [`spawn_timer_task`] — spawns a tokio task that manages tagged timer
19//!   slots for one node, wires the channel, returns the handle.
20//!
21//! Operators send [`TimerCmd`] from their sync fn-fire to schedule,
22//! cancel, or cancel-all timers. The task manages deadlines via
23//! `tokio::time` and, when a timer fires, posts an `Emit` request to
24//! the cross-thread `Arc<CoreMailbox>` (D223/D227/D230). The owner
25//! drains the mailbox via `Core::drain_mailbox`, applying each emit
26//! via the sync `Core::emit`. Under S2c/D248 single-owner `Core` this
27//! is the only cross-thread bridge into the otherwise `!Send` Core;
28//! the prior partition `wave_owner` serialization machinery is
29//! deleted (one owner thread, no cross-thread interleaving wave to
30//! serialize).
31//!
32//! Timer sources (fromTimer, interval) use the producer substrate +
33//! this module: the producer's build closure spawns a timer task on
34//! activation; deactivation drops the handle, shutting down the task.
35//!
36//! # Testing
37//!
38//! Use `tokio::time::pause()` + `tokio::time::advance()` for
39//! deterministic timer control in tests. No mock infrastructure needed.
40
41use std::sync::Arc;
42use std::time::Duration;
43
44use tokio::sync::mpsc;
45
46use crate::boundary::BindingBoundary;
47use crate::handle::{HandleId, NodeId};
48use crate::mailbox::CoreMailbox;
49
50// ---------------------------------------------------------------------------
51// Public types
52// ---------------------------------------------------------------------------
53
/// Boxed closure that a callback-only timer runs once its deadline passes.
///
/// The timer task invokes it directly, so it executes on the tokio task
/// rather than on the thread that scheduled it. It must not block, and it
/// must not hold a lock across a call to `Core::emit`.
pub type TimerCallback = Box<dyn Fn() + Send + Sync>;
57
58/// Command sent from the operator fn (sync) to the timer task (async).
59pub enum TimerCmd {
60    /// Schedule a timer: after `delay`, emit `handle` on the node.
61    /// If a timer with the same `tag` is already pending, it is cancelled
62    /// and the old handle is released.
63    Schedule {
64        tag: u32,
65        delay: Duration,
66        handle: HandleId,
67    },
68    /// Schedule a callback-only timer: after `delay`, invoke `callback`.
69    /// No handle emission — the callback manages state and may call
70    /// `Core::emit` itself. Used by operators (debounce, throttle) that
71    /// need finer control than the emit-on-fire pattern.
72    ScheduleCallback {
73        tag: u32,
74        delay: Duration,
75        callback: TimerCallback,
76    },
77    /// Cancel a pending timer by tag. The held handle (if any) is released.
78    /// Callback-only timers have no handle to release.
79    Cancel { tag: u32 },
80    /// Cancel all pending timers and release all held handles.
81    CancelAll,
82}
83
84impl std::fmt::Debug for TimerCmd {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        match self {
87            Self::Schedule { tag, delay, handle } => f
88                .debug_struct("Schedule")
89                .field("tag", tag)
90                .field("delay", delay)
91                .field("handle", handle)
92                .finish(),
93            Self::ScheduleCallback { tag, delay, .. } => f
94                .debug_struct("ScheduleCallback")
95                .field("tag", tag)
96                .field("delay", delay)
97                .finish_non_exhaustive(),
98            Self::Cancel { tag } => f.debug_struct("Cancel").field("tag", tag).finish(),
99            Self::CancelAll => write!(f, "CancelAll"),
100        }
101    }
102}
103
104/// Sender half — stored in the operator's scratch for sync command dispatch.
105pub type TimerSender = mpsc::UnboundedSender<TimerCmd>;
106
107/// Handle to a running timer task. Dropping this shuts down the task.
108#[must_use]
109pub struct TimerTaskHandle {
110    tx: Option<mpsc::UnboundedSender<TimerCmd>>,
111}
112
113impl TimerTaskHandle {
114    /// Get a clone of the sender for use in operator scratch.
115    ///
116    /// # Panics
117    ///
118    /// Panics if called after `shutdown()`.
119    #[must_use]
120    pub fn sender(&self) -> TimerSender {
121        self.tx
122            .as_ref()
123            .expect("TimerTaskHandle: sender already taken via shutdown")
124            .clone()
125    }
126
127    /// Explicitly shut down the task. Sends `CancelAll` then drops the
128    /// sender so the task exits cleanly.
129    pub fn shutdown(&mut self) {
130        if let Some(tx) = self.tx.take() {
131            let _ = tx.send(TimerCmd::CancelAll);
132            // Drop tx -> channel closes -> task exits on next poll.
133        }
134    }
135}
136
137impl Drop for TimerTaskHandle {
138    fn drop(&mut self) {
139        self.shutdown();
140    }
141}
142
143// ---------------------------------------------------------------------------
144// Task spawn
145// ---------------------------------------------------------------------------
146
147/// Spawn a timer task for a single operator/source node.
148///
149/// The task runs until the channel closes (sender dropped). It manages
150/// pending timers tagged by `u32`. When a timer fires, it calls
151/// `core.emit(node_id, handle)`.
152///
153/// # Panics
154///
155/// Must be called from within a tokio runtime context.
156pub fn spawn_timer_task(
157    mailbox: Arc<CoreMailbox>,
158    node_id: NodeId,
159    binding: Arc<dyn BindingBoundary>,
160) -> TimerTaskHandle {
161    let (tx, rx) = mpsc::unbounded_channel();
162    tokio::spawn(timer_task_loop(rx, mailbox, node_id, binding));
163    TimerTaskHandle { tx: Some(tx) }
164}
165
166// ---------------------------------------------------------------------------
167// Task loop
168// ---------------------------------------------------------------------------
169
170/// Internal: the async task that manages timers for one node.
171async fn timer_task_loop(
172    mut rx: mpsc::UnboundedReceiver<TimerCmd>,
173    mailbox: Arc<CoreMailbox>,
174    node_id: NodeId,
175    binding: Arc<dyn BindingBoundary>,
176) {
177    let mut slots: Vec<TimerSlot> = Vec::new();
178
179    loop {
180        // Find the nearest deadline among active slots.
181        let next_fire = nearest_deadline(&slots);
182
183        tokio::select! {
184            biased; // Prefer commands over timer fires to process cancels first.
185
186            cmd = rx.recv() => {
187                match cmd {
188                    Some(TimerCmd::Schedule { tag, delay, handle }) => {
189                        cancel_slot(&mut slots, tag, &*binding);
190                        let deadline = tokio::time::Instant::now() + delay;
191                        slots.push(TimerSlot { tag, kind: TimerSlotKind::Emit(handle), deadline });
192                    }
193                    Some(TimerCmd::ScheduleCallback { tag, delay, callback }) => {
194                        cancel_slot(&mut slots, tag, &*binding);
195                        let deadline = tokio::time::Instant::now() + delay;
196                        slots.push(TimerSlot { tag, kind: TimerSlotKind::Callback(callback), deadline });
197                    }
198                    Some(TimerCmd::Cancel { tag }) => {
199                        cancel_slot(&mut slots, tag, &*binding);
200                    }
201                    Some(TimerCmd::CancelAll) => {
202                        release_all_slots(&mut slots, &*binding);
203                    }
204                    None => {
205                        // Channel closed — operator deactivated / torn down.
206                        release_all_slots(&mut slots, &*binding);
207                        return;
208                    }
209                }
210            }
211
212            () = sleep_until_or_forever(next_fire) => {
213                // At least one timer fired. Drain all expired slots.
214                let now = tokio::time::Instant::now();
215                let mut i = 0;
216                while i < slots.len() {
217                    if slots[i].deadline <= now {
218                        let slot = slots.swap_remove(i);
219                        match slot.kind {
220                            TimerSlotKind::Emit(handle) => {
221                                // D227/D230: post to the `Send + Sync`
222                                // mailbox instead of upgrading a `Weak<C>`
223                                // (deleted in S2b — the `Core` relocates
224                                // between workers). The owner applies it
225                                // via `Core::drain_mailbox` → sync `emit`
226                                // (no async in Core). `post_emit` returns
227                                // `false` iff the owning `Core` already
228                                // dropped — the exact teardown branch the
229                                // old `WeakCore::upgrade() == None` took
230                                // (release the handle, drain the rest,
231                                // exit the task).
232                                if !mailbox.post_emit(node_id, handle) {
233                                    binding.release_handle(handle);
234                                    release_all_slots(&mut slots, &*binding);
235                                    return;
236                                }
237                            }
238                            TimerSlotKind::Callback(cb) => {
239                                cb();
240                            }
241                        }
242                        // Don't increment i — swap_remove moved the last element here.
243                    } else {
244                        i += 1;
245                    }
246                }
247            }
248        }
249    }
250}
251
252// ---------------------------------------------------------------------------
253// Internal helpers
254// ---------------------------------------------------------------------------
255
256enum TimerSlotKind {
257    /// Emit handle on fire; release on cancel.
258    Emit(HandleId),
259    /// Invoke callback on fire; nothing to release on cancel.
260    Callback(TimerCallback),
261}
262
263struct TimerSlot {
264    tag: u32,
265    kind: TimerSlotKind,
266    deadline: tokio::time::Instant,
267}
268
269fn cancel_slot(slots: &mut Vec<TimerSlot>, tag: u32, binding: &dyn BindingBoundary) {
270    if let Some(pos) = slots.iter().position(|s| s.tag == tag) {
271        let slot = slots.swap_remove(pos);
272        if let TimerSlotKind::Emit(h) = slot.kind {
273            binding.release_handle(h);
274        }
275    }
276}
277
278fn release_all_slots(slots: &mut Vec<TimerSlot>, binding: &dyn BindingBoundary) {
279    for slot in slots.drain(..) {
280        if let TimerSlotKind::Emit(h) = slot.kind {
281            binding.release_handle(h);
282        }
283    }
284}
285
286fn nearest_deadline(slots: &[TimerSlot]) -> Option<tokio::time::Instant> {
287    slots.iter().map(|s| s.deadline).min()
288}
289
290/// Sleep until the given instant, or sleep forever if `None`.
291async fn sleep_until_or_forever(deadline: Option<tokio::time::Instant>) {
292    match deadline {
293        Some(d) => tokio::time::sleep_until(d).await,
294        None => std::future::pending::<()>().await,
295    }
296}
297
298// ---------------------------------------------------------------------------
299// Tests
300// ---------------------------------------------------------------------------
301
#[cfg(test)]
mod tests {
    use super::*;
    use crate::boundary::{DepBatch, FnResult};
    use crate::handle::FnId;

    /// Shared, lock-protected list of handles (released or emitted).
    type HandleLog = Arc<parking_lot::Mutex<Vec<HandleId>>>;

    /// Fresh, empty [`HandleLog`].
    fn new_log() -> HandleLog {
        Arc::new(parking_lot::Mutex::new(Vec::new()))
    }

    /// Let spawned tasks run, then pump the mailbox on the owner side (D230).
    ///
    /// Timer tasks only *post* `Emit`s to the `CoreMailbox`; it is
    /// `drain_mailbox` that applies them via the sync `emit` → wave → sink.
    /// This is the embedder's pump point. The repeated yields give the timer
    /// task a chance to process queued commands or fire due timers first.
    /// Draining an empty mailbox is an idempotent no-op, so calling this
    /// before any timer has fired is safe.
    async fn settle(core: &crate::node::Core) {
        for _ in 0..10 {
            tokio::task::yield_now().await;
        }
        core.drain_mailbox();
    }

    /// Send a `Schedule` command through a fresh sender clone, panicking if
    /// the task is no longer receiving.
    fn schedule(task: &TimerTaskHandle, tag: u32, delay: Duration, handle: HandleId) {
        task.sender()
            .send(TimerCmd::Schedule { tag, delay, handle })
            .unwrap();
    }

    /// Minimal binding for timer tests — records every released handle.
    struct TimerTestBinding {
        released: HandleLog,
    }

    impl TimerTestBinding {
        fn new(released: HandleLog) -> Self {
            Self { released }
        }
    }

    impl BindingBoundary for TimerTestBinding {
        fn invoke_fn(&self, _node_id: NodeId, _fn_id: FnId, _dep_data: &[DepBatch]) -> FnResult {
            FnResult::Noop { tracked: None }
        }

        fn custom_equals(&self, _equals_handle: FnId, _a: HandleId, _b: HandleId) -> bool {
            false
        }

        fn release_handle(&self, h: HandleId) {
            self.released.lock().push(h);
        }
    }

    /// Build a `Core` over a [`TimerTestBinding`] with one registered state
    /// node; returns the core, that node's id, and the binding.
    fn make_test_core(released: HandleLog) -> (crate::node::Core, NodeId, Arc<dyn BindingBoundary>) {
        let binding: Arc<dyn BindingBoundary> = Arc::new(TimerTestBinding::new(released));
        let core = crate::node::Core::new(binding.clone());
        let state_node = core
            .register_state(crate::handle::NO_HANDLE, false)
            .unwrap();
        (core, state_node, binding)
    }

    #[tokio::test]
    async fn schedule_fires_after_delay() {
        tokio::time::pause();

        let released = new_log();
        let (core, node, binding) = make_test_core(released.clone());
        let mailbox = core.mailbox();

        // Record every DATA message the node delivers.
        let emitted = new_log();
        let sink_log = emitted.clone();
        let _sub = core.subscribe(
            node,
            Arc::new(move |msgs| {
                for m in msgs {
                    if let crate::message::Message::Data(h) = m {
                        sink_log.lock().push(*h);
                    }
                }
            }),
        );

        let task = spawn_timer_task(mailbox, node, binding.clone());
        let h1 = HandleId::new(42);
        binding.retain_handle(h1);

        schedule(&task, 0, Duration::from_millis(50), h1);

        settle(&core).await; // task processes Schedule, enters sleep_until

        tokio::time::advance(Duration::from_millis(51)).await;
        settle(&core).await; // task fires timer, emit → wave → sink

        let got = emitted.lock().clone();
        assert_eq!(got, vec![h1], "timer should have emitted h1");
    }

    #[tokio::test]
    async fn cancel_releases_handle() {
        tokio::time::pause();

        let released = new_log();
        let (core, node, binding) = make_test_core(released.clone());
        let mailbox = core.mailbox();

        let task = spawn_timer_task(mailbox, node, binding.clone());
        let h1 = HandleId::new(42);
        binding.retain_handle(h1);

        schedule(&task, 0, Duration::from_millis(100), h1);

        // Cancel before the task has had a chance to run either command.
        task.sender().send(TimerCmd::Cancel { tag: 0 }).unwrap();
        settle(&core).await;

        assert!(
            released.lock().contains(&h1),
            "cancelled handle should be released"
        );
    }

    #[tokio::test]
    async fn reschedule_same_tag_cancels_previous() {
        tokio::time::pause();

        let released = new_log();
        let (core, node, binding) = make_test_core(released.clone());
        let mailbox = core.mailbox();

        let emitted = new_log();
        let sink_log = emitted.clone();
        let _sub = core.subscribe(
            node,
            Arc::new(move |msgs| {
                for m in msgs {
                    if let crate::message::Message::Data(h) = m {
                        sink_log.lock().push(*h);
                    }
                }
            }),
        );

        let task = spawn_timer_task(mailbox, node, binding.clone());
        let h1 = HandleId::new(10);
        let h2 = HandleId::new(20);
        binding.retain_handle(h1);
        binding.retain_handle(h2);

        // First h1 under tag 0, then h2 under the same tag — the second
        // command replaces (cancels) the first.
        schedule(&task, 0, Duration::from_millis(100), h1);
        schedule(&task, 0, Duration::from_millis(50), h2);

        settle(&core).await;

        // h1 released (cancelled).
        assert!(released.lock().contains(&h1));

        // Advance past h2's deadline.
        tokio::time::advance(Duration::from_millis(51)).await;
        settle(&core).await;

        let got = emitted.lock().clone();
        assert_eq!(got, vec![h2], "only h2 should fire");
    }

    #[tokio::test]
    async fn drop_handle_releases_pending() {
        tokio::time::pause();

        let released = new_log();
        let (core, node, binding) = make_test_core(released.clone());
        let mailbox = core.mailbox();

        let mut task = spawn_timer_task(mailbox, node, binding.clone());
        let h1 = HandleId::new(42);
        binding.retain_handle(h1);

        schedule(&task, 0, Duration::from_secs(1), h1);

        settle(&core).await;

        // Shutdown (simulates operator deactivation).
        task.shutdown();
        settle(&core).await;

        assert!(
            released.lock().contains(&h1),
            "pending handle should be released on shutdown"
        );
    }

    #[tokio::test]
    async fn multiple_tags_fire_independently() {
        tokio::time::pause();

        let released = new_log();
        let (core, node, binding) = make_test_core(released.clone());
        let mailbox = core.mailbox();

        let emitted = new_log();
        let sink_log = emitted.clone();
        let _sub = core.subscribe(
            node,
            Arc::new(move |msgs| {
                for m in msgs {
                    if let crate::message::Message::Data(h) = m {
                        sink_log.lock().push(*h);
                    }
                }
            }),
        );

        let task = spawn_timer_task(mailbox, node, binding.clone());
        let h1 = HandleId::new(10);
        let h2 = HandleId::new(20);
        binding.retain_handle(h1);
        binding.retain_handle(h2);

        // Tag 0 at 100ms, tag 1 at 50ms.
        schedule(&task, 0, Duration::from_millis(100), h1);
        schedule(&task, 1, Duration::from_millis(50), h2);

        // Yield so the task processes both Schedule commands.
        settle(&core).await;

        // Advance to 51ms — h2 fires.
        tokio::time::advance(Duration::from_millis(51)).await;
        settle(&core).await;
        assert_eq!(*emitted.lock(), vec![h2]);

        // Advance to 101ms — h1 fires.
        tokio::time::advance(Duration::from_millis(50)).await;
        settle(&core).await;
        assert_eq!(*emitted.lock(), vec![h2, h1]);
    }
}