dynomite/core/task.rs
1//! Periodic task scheduler built on `tokio::time::interval`.
2//!
3//! The C engine ships a custom red-black-tree-backed task manager
4//! (`schedule_task_1`, `time_to_next_task`, `execute_expired_tasks`)
5//! that fires one-shot timeouts from the main loop. The Rust port
6//! delegates timer wheel maintenance to the tokio runtime: callers
7//! register a periodic callback through [`task_register`] and receive
8//! a [`TaskHandle`] that cancels the underlying tokio task when
9//! dropped or explicitly cancelled.
10//!
11//! A one-shot variant, [`task_schedule_once`], is provided for parity
12//! with `schedule_task_1`. The runtime drives both APIs, so the
13//! per-iteration `time_to_next_task` and `execute_expired_tasks`
14//! helpers from the C reference have no Rust counterpart; tokio's
15//! reactor performs the equivalent work transparently.
16//!
17//! # Examples
18//!
19//! ```
20//! use std::sync::atomic::{AtomicUsize, Ordering};
21//! use std::sync::Arc;
22//! use std::time::Duration;
23//! use dynomite::core::task::task_register;
24//!
25//! let rt = tokio::runtime::Runtime::new().unwrap();
26//! rt.block_on(async {
27//! let counter = Arc::new(AtomicUsize::new(0));
28//! let c = counter.clone();
29//! let handle = task_register(Duration::from_millis(5), Arc::new(move || {
30//! c.fetch_add(1, Ordering::Relaxed);
31//! }));
32//! tokio::time::sleep(Duration::from_millis(40)).await;
33//! handle.cancel();
34//! assert!(counter.load(Ordering::Relaxed) >= 1);
35//! });
36//! ```
37
38use std::sync::Arc;
39use std::time::Duration;
40
41use tokio_util::sync::CancellationToken;
42
43/// A handle that cancels a registered task.
44///
45/// Dropping the handle without calling [`TaskHandle::cancel`] leaves
46/// the task running (the tokio task holds a clone of the cancellation
47/// token). Call [`TaskHandle::cancel`] to stop the task explicitly.
48///
49/// # Examples
50///
51/// ```
52/// use std::sync::Arc;
53/// use std::time::Duration;
54/// use dynomite::core::task::task_register;
55///
56/// let rt = tokio::runtime::Runtime::new().unwrap();
57/// rt.block_on(async {
58/// let h = task_register(Duration::from_millis(50), Arc::new(|| {}));
59/// assert!(!h.is_cancelled());
60/// h.cancel();
61/// assert!(h.is_cancelled());
62/// });
63/// ```
64#[derive(Debug, Clone)]
65pub struct TaskHandle {
66 token: CancellationToken,
67}
68
69impl TaskHandle {
70 /// Cancel the task.
71 ///
72 /// # Examples
73 ///
74 /// ```
75 /// use std::sync::Arc;
76 /// use std::time::Duration;
77 /// use dynomite::core::task::task_register;
78 ///
79 /// let rt = tokio::runtime::Runtime::new().unwrap();
80 /// rt.block_on(async {
81 /// let h = task_register(Duration::from_millis(50), Arc::new(|| {}));
82 /// h.cancel();
83 /// assert!(h.is_cancelled());
84 /// });
85 /// ```
86 pub fn cancel(&self) {
87 self.token.cancel();
88 }
89
90 /// Whether the task has already been cancelled.
91 ///
92 /// # Examples
93 ///
94 /// ```
95 /// use std::sync::Arc;
96 /// use std::time::Duration;
97 /// use dynomite::core::task::task_register;
98 ///
99 /// let rt = tokio::runtime::Runtime::new().unwrap();
100 /// rt.block_on(async {
101 /// let h = task_register(Duration::from_millis(50), Arc::new(|| {}));
102 /// assert!(!h.is_cancelled());
103 /// h.cancel();
104 /// assert!(h.is_cancelled());
105 /// });
106 /// ```
107 pub fn is_cancelled(&self) -> bool {
108 self.token.is_cancelled()
109 }
110}
111
112/// Register a periodic task that fires its callback every `period`.
113///
114/// The first invocation occurs after `period` elapses. The task runs
115/// on the current tokio runtime, so this function must be called from
116/// inside one (e.g. inside `#[tokio::main]` or a `block_on`).
117///
118/// # Examples
119///
120/// ```
121/// use std::sync::atomic::{AtomicUsize, Ordering};
122/// use std::sync::Arc;
123/// use std::time::Duration;
124/// use dynomite::core::task::task_register;
125///
126/// let rt = tokio::runtime::Runtime::new().unwrap();
127/// rt.block_on(async {
128/// let n = Arc::new(AtomicUsize::new(0));
129/// let nn = n.clone();
130/// let handle = task_register(Duration::from_millis(2), Arc::new(move || {
131/// nn.fetch_add(1, Ordering::Relaxed);
132/// }));
133/// tokio::time::sleep(Duration::from_millis(15)).await;
134/// handle.cancel();
135/// });
136/// ```
137pub fn task_register(period: Duration, callback: Arc<dyn Fn() + Send + Sync>) -> TaskHandle {
138 let token = CancellationToken::new();
139 let child = token.clone();
140 tokio::spawn(async move {
141 let mut interval = tokio::time::interval(period);
142 // Skip the immediate-fire first tick that `tokio::time::interval`
143 // produces by default; the C reference fires after `timeout` ms,
144 // not at registration time.
145 interval.tick().await;
146 loop {
147 tokio::select! {
148 () = child.cancelled() => break,
149 _ = interval.tick() => callback(),
150 }
151 }
152 });
153 TaskHandle { token }
154}
155
156/// Register a one-shot task that fires once after `delay` elapses.
157///
158/// Provided for parity with the C `schedule_task_1` shape. The handle
159/// can be cancelled before the deadline to suppress execution.
160///
161/// # Examples
162///
163/// ```
164/// use std::sync::atomic::{AtomicBool, Ordering};
165/// use std::sync::Arc;
166/// use std::time::Duration;
167/// use dynomite::core::task::task_schedule_once;
168///
169/// let rt = tokio::runtime::Runtime::new().unwrap();
170/// rt.block_on(async {
171/// let fired = Arc::new(AtomicBool::new(false));
172/// let f = fired.clone();
173/// let _h = task_schedule_once(Duration::from_millis(5), Box::new(move || {
174/// f.store(true, Ordering::Relaxed);
175/// }));
176/// tokio::time::sleep(Duration::from_millis(20)).await;
177/// assert!(fired.load(Ordering::Relaxed));
178/// });
179/// ```
180pub fn task_schedule_once(delay: Duration, callback: Box<dyn FnOnce() + Send>) -> TaskHandle {
181 let token = CancellationToken::new();
182 let child = token.clone();
183 tokio::spawn(async move {
184 tokio::select! {
185 () = child.cancelled() => {}
186 () = tokio::time::sleep(delay) => {
187 callback();
188 }
189 }
190 });
191 TaskHandle { token }
192}
193
194#[cfg(test)]
195mod tests {
196 use std::sync::atomic::{AtomicUsize, Ordering};
197 use std::sync::Arc;
198 use std::time::Duration;
199
200 use super::*;
201
202 #[tokio::test(start_paused = true)]
203 async fn periodic_task_fires_multiple_times() {
204 let n = Arc::new(AtomicUsize::new(0));
205 let nn = n.clone();
206 let handle = task_register(
207 Duration::from_millis(5),
208 Arc::new(move || {
209 nn.fetch_add(1, Ordering::Relaxed);
210 }),
211 );
212 // Yield once so the spawned task gets polled and its
213 // interval is registered with the paused clock before
214 // the first advance.
215 tokio::task::yield_now().await;
216 // tokio::time is paused; advance() drives the timer
217 // deterministically without depending on wall-clock
218 // scheduling, eliminating CI flake.
219 for _ in 0..8 {
220 tokio::time::advance(Duration::from_millis(5)).await;
221 tokio::task::yield_now().await;
222 }
223 handle.cancel();
224 let after_cancel = n.load(Ordering::Relaxed);
225 assert!(after_cancel >= 2, "expected >=2 fires, got {after_cancel}");
226 tokio::time::advance(Duration::from_millis(20)).await;
227 tokio::task::yield_now().await;
228 // No fires after cancel.
229 let final_count = n.load(Ordering::Relaxed);
230 assert!(
231 final_count <= after_cancel + 1,
232 "task fired after cancel: before={after_cancel} after={final_count}"
233 );
234 }
235
236 #[tokio::test]
237 async fn cancel_before_first_tick_suppresses_callback() {
238 let n = Arc::new(AtomicUsize::new(0));
239 let nn = n.clone();
240 let handle = task_register(
241 Duration::from_millis(50),
242 Arc::new(move || {
243 nn.fetch_add(1, Ordering::Relaxed);
244 }),
245 );
246 handle.cancel();
247 tokio::time::sleep(Duration::from_millis(80)).await;
248 assert_eq!(n.load(Ordering::Relaxed), 0);
249 }
250
251 #[tokio::test]
252 async fn one_shot_fires_exactly_once() {
253 let n = Arc::new(AtomicUsize::new(0));
254 let nn = n.clone();
255 let _handle = task_schedule_once(
256 Duration::from_millis(5),
257 Box::new(move || {
258 nn.fetch_add(1, Ordering::Relaxed);
259 }),
260 );
261 tokio::time::sleep(Duration::from_millis(40)).await;
262 assert_eq!(n.load(Ordering::Relaxed), 1);
263 }
264
265 #[tokio::test]
266 async fn one_shot_can_be_cancelled() {
267 let n = Arc::new(AtomicUsize::new(0));
268 let nn = n.clone();
269 let handle = task_schedule_once(
270 Duration::from_millis(50),
271 Box::new(move || {
272 nn.fetch_add(1, Ordering::Relaxed);
273 }),
274 );
275 handle.cancel();
276 tokio::time::sleep(Duration::from_millis(80)).await;
277 assert_eq!(n.load(Ordering::Relaxed), 0);
278 assert!(handle.is_cancelled());
279 }
280}