Skip to main content

dynomite/core/
task.rs

1//! Periodic task scheduler built on `tokio::time::interval`.
2//!
3//! The C engine ships a custom red-black-tree-backed task manager
4//! (`schedule_task_1`, `time_to_next_task`, `execute_expired_tasks`)
5//! that fires one-shot timeouts from the main loop. The Rust port
6//! delegates timer wheel maintenance to the tokio runtime: callers
//! register a periodic callback through [`task_register`] and receive
//! a [`TaskHandle`] that stops the underlying tokio task when
//! [`TaskHandle::cancel`] is called. Dropping the handle does not
//! cancel the task.
10//!
11//! A one-shot variant, [`task_schedule_once`], is provided for parity
12//! with `schedule_task_1`. The runtime drives both APIs, so the
13//! per-iteration `time_to_next_task` and `execute_expired_tasks`
14//! helpers from the C reference have no Rust counterpart; tokio's
15//! reactor performs the equivalent work transparently.
16//!
17//! # Examples
18//!
19//! ```
20//! use std::sync::atomic::{AtomicUsize, Ordering};
21//! use std::sync::Arc;
22//! use std::time::Duration;
23//! use dynomite::core::task::task_register;
24//!
25//! let rt = tokio::runtime::Runtime::new().unwrap();
26//! rt.block_on(async {
27//!     let counter = Arc::new(AtomicUsize::new(0));
28//!     let c = counter.clone();
29//!     let handle = task_register(Duration::from_millis(5), Arc::new(move || {
30//!         c.fetch_add(1, Ordering::Relaxed);
31//!     }));
32//!     tokio::time::sleep(Duration::from_millis(40)).await;
33//!     handle.cancel();
34//!     assert!(counter.load(Ordering::Relaxed) >= 1);
35//! });
36//! ```
37
38use std::sync::Arc;
39use std::time::Duration;
40
41use tokio_util::sync::CancellationToken;
42
43/// A handle that cancels a registered task.
44///
45/// Dropping the handle without calling [`TaskHandle::cancel`] leaves
46/// the task running (the tokio task holds a clone of the cancellation
47/// token). Call [`TaskHandle::cancel`] to stop the task explicitly.
48///
49/// # Examples
50///
51/// ```
52/// use std::sync::Arc;
53/// use std::time::Duration;
54/// use dynomite::core::task::task_register;
55///
56/// let rt = tokio::runtime::Runtime::new().unwrap();
57/// rt.block_on(async {
58///     let h = task_register(Duration::from_millis(50), Arc::new(|| {}));
59///     assert!(!h.is_cancelled());
60///     h.cancel();
61///     assert!(h.is_cancelled());
62/// });
63/// ```
64#[derive(Debug, Clone)]
65pub struct TaskHandle {
66    token: CancellationToken,
67}
68
69impl TaskHandle {
70    /// Cancel the task.
71    ///
72    /// # Examples
73    ///
74    /// ```
75    /// use std::sync::Arc;
76    /// use std::time::Duration;
77    /// use dynomite::core::task::task_register;
78    ///
79    /// let rt = tokio::runtime::Runtime::new().unwrap();
80    /// rt.block_on(async {
81    ///     let h = task_register(Duration::from_millis(50), Arc::new(|| {}));
82    ///     h.cancel();
83    ///     assert!(h.is_cancelled());
84    /// });
85    /// ```
86    pub fn cancel(&self) {
87        self.token.cancel();
88    }
89
90    /// Whether the task has already been cancelled.
91    ///
92    /// # Examples
93    ///
94    /// ```
95    /// use std::sync::Arc;
96    /// use std::time::Duration;
97    /// use dynomite::core::task::task_register;
98    ///
99    /// let rt = tokio::runtime::Runtime::new().unwrap();
100    /// rt.block_on(async {
101    ///     let h = task_register(Duration::from_millis(50), Arc::new(|| {}));
102    ///     assert!(!h.is_cancelled());
103    ///     h.cancel();
104    ///     assert!(h.is_cancelled());
105    /// });
106    /// ```
107    pub fn is_cancelled(&self) -> bool {
108        self.token.is_cancelled()
109    }
110}
111
112/// Register a periodic task that fires its callback every `period`.
113///
114/// The first invocation occurs after `period` elapses. The task runs
115/// on the current tokio runtime, so this function must be called from
116/// inside one (e.g. inside `#[tokio::main]` or a `block_on`).
117///
118/// # Examples
119///
120/// ```
121/// use std::sync::atomic::{AtomicUsize, Ordering};
122/// use std::sync::Arc;
123/// use std::time::Duration;
124/// use dynomite::core::task::task_register;
125///
126/// let rt = tokio::runtime::Runtime::new().unwrap();
127/// rt.block_on(async {
128///     let n = Arc::new(AtomicUsize::new(0));
129///     let nn = n.clone();
130///     let handle = task_register(Duration::from_millis(2), Arc::new(move || {
131///         nn.fetch_add(1, Ordering::Relaxed);
132///     }));
133///     tokio::time::sleep(Duration::from_millis(15)).await;
134///     handle.cancel();
135/// });
136/// ```
137pub fn task_register(period: Duration, callback: Arc<dyn Fn() + Send + Sync>) -> TaskHandle {
138    let token = CancellationToken::new();
139    let child = token.clone();
140    tokio::spawn(async move {
141        let mut interval = tokio::time::interval(period);
142        // Skip the immediate-fire first tick that `tokio::time::interval`
143        // produces by default; the C reference fires after `timeout` ms,
144        // not at registration time.
145        interval.tick().await;
146        loop {
147            tokio::select! {
148                () = child.cancelled() => break,
149                _ = interval.tick() => callback(),
150            }
151        }
152    });
153    TaskHandle { token }
154}
155
156/// Register a one-shot task that fires once after `delay` elapses.
157///
158/// Provided for parity with the C `schedule_task_1` shape. The handle
159/// can be cancelled before the deadline to suppress execution.
160///
161/// # Examples
162///
163/// ```
164/// use std::sync::atomic::{AtomicBool, Ordering};
165/// use std::sync::Arc;
166/// use std::time::Duration;
167/// use dynomite::core::task::task_schedule_once;
168///
169/// let rt = tokio::runtime::Runtime::new().unwrap();
170/// rt.block_on(async {
171///     let fired = Arc::new(AtomicBool::new(false));
172///     let f = fired.clone();
173///     let _h = task_schedule_once(Duration::from_millis(5), Box::new(move || {
174///         f.store(true, Ordering::Relaxed);
175///     }));
176///     tokio::time::sleep(Duration::from_millis(20)).await;
177///     assert!(fired.load(Ordering::Relaxed));
178/// });
179/// ```
180pub fn task_schedule_once(delay: Duration, callback: Box<dyn FnOnce() + Send>) -> TaskHandle {
181    let token = CancellationToken::new();
182    let child = token.clone();
183    tokio::spawn(async move {
184        tokio::select! {
185            () = child.cancelled() => {}
186            () = tokio::time::sleep(delay) => {
187                callback();
188            }
189        }
190    });
191    TaskHandle { token }
192}
193
#[cfg(test)]
mod tests {
    // Every test runs on a paused tokio clock (`start_paused = true`).
    // Time only moves through explicit `advance()` calls or through
    // tokio's auto-advance when the runtime is otherwise idle, so the
    // tests are deterministic and do not wait on the wall clock.

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    use super::*;

    /// A periodic task fires repeatedly and stops after cancellation.
    #[tokio::test(start_paused = true)]
    async fn periodic_task_fires_multiple_times() {
        let n = Arc::new(AtomicUsize::new(0));
        let nn = n.clone();
        let handle = task_register(
            Duration::from_millis(5),
            Arc::new(move || {
                nn.fetch_add(1, Ordering::Relaxed);
            }),
        );
        // Yield once so the spawned task gets polled and its
        // interval is registered with the paused clock before
        // the first advance.
        tokio::task::yield_now().await;
        // tokio::time is paused; advance() drives the timer
        // deterministically without depending on wall-clock
        // scheduling, eliminating CI flake.
        for _ in 0..8 {
            tokio::time::advance(Duration::from_millis(5)).await;
            tokio::task::yield_now().await;
        }
        handle.cancel();
        let after_cancel = n.load(Ordering::Relaxed);
        assert!(after_cancel >= 2, "expected >=2 fires, got {after_cancel}");
        tokio::time::advance(Duration::from_millis(20)).await;
        tokio::task::yield_now().await;
        // The task must stop after cancel; at most one fire that was
        // already in flight when cancel was requested is tolerated.
        let final_count = n.load(Ordering::Relaxed);
        assert!(
            final_count <= after_cancel + 1,
            "task fired after cancel: before={after_cancel} after={final_count}"
        );
    }

    /// Cancelling before the first period elapses means the callback
    /// never runs.
    #[tokio::test(start_paused = true)]
    async fn cancel_before_first_tick_suppresses_callback() {
        let n = Arc::new(AtomicUsize::new(0));
        let nn = n.clone();
        let handle = task_register(
            Duration::from_millis(50),
            Arc::new(move || {
                nn.fetch_add(1, Ordering::Relaxed);
            }),
        );
        handle.cancel();
        // With the clock paused this sleep completes via auto-advance
        // once the spawned task has had a chance to observe the cancel.
        tokio::time::sleep(Duration::from_millis(80)).await;
        assert_eq!(n.load(Ordering::Relaxed), 0);
    }

    /// A one-shot task runs its callback exactly once.
    #[tokio::test(start_paused = true)]
    async fn one_shot_fires_exactly_once() {
        let n = Arc::new(AtomicUsize::new(0));
        let nn = n.clone();
        let _handle = task_schedule_once(
            Duration::from_millis(5),
            Box::new(move || {
                nn.fetch_add(1, Ordering::Relaxed);
            }),
        );
        // Auto-advance fires the 5 ms timer first, then this sleep.
        tokio::time::sleep(Duration::from_millis(40)).await;
        assert_eq!(n.load(Ordering::Relaxed), 1);
    }

    /// Cancelling a one-shot task before its deadline suppresses the
    /// callback and is reflected by `is_cancelled`.
    #[tokio::test(start_paused = true)]
    async fn one_shot_can_be_cancelled() {
        let n = Arc::new(AtomicUsize::new(0));
        let nn = n.clone();
        let handle = task_schedule_once(
            Duration::from_millis(50),
            Box::new(move || {
                nn.fetch_add(1, Ordering::Relaxed);
            }),
        );
        handle.cancel();
        tokio::time::sleep(Duration::from_millis(80)).await;
        assert_eq!(n.load(Ordering::Relaxed), 0);
        assert!(handle.is_cancelled());
    }
}