Skip to main content

hyperi_rustlib/concurrency/
periodic.rs

1// Project:   hyperi-rustlib
2// File:      src/concurrency/periodic.rs
3// Purpose:   PeriodicWorker -- timer-driven loop with biased shutdown
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Timer-driven background loop, generic over the task type.
10//!
11//! Use for everything that's "do X every N seconds": config reload
12//! polling, scaling pressure recompute, periodic health snapshots,
13//! version-check pings to crates.io. Encapsulates the
14//! `tokio::time::interval` + `tokio::select! { biased; shutdown | tick }`
15//! boilerplate that would otherwise be open-coded per call site.
16//!
17//! # Shape
18//!
19//! ```text
20//! tokio::time::interval ──tick──► actor task ──tick()──► PeriodicTask impl
21//!                                     ▲
22//!                                     │ biased select
23//!                                     │
24//!                              CancellationToken
25//! ```
26//!
27//! Tick errors are logged at WARN and do NOT terminate the worker --
28//! the next tick still fires. Use the `shutdown` hook for cleanup.
29
30use std::future::Future;
31use std::time::Duration;
32
33use tokio::task::JoinHandle;
34use tokio::time::{MissedTickBehavior, interval};
35use tokio_util::sync::CancellationToken;
36use tracing::warn;
37
38use super::error::TickError;
39
/// A task that runs on a fixed interval.
///
/// Implementations may hold mutable state -- `tick` takes `&mut self`.
/// Replies / outputs propagate through the state itself (`AtomicU64`
/// counters, `ArcSwap<T>` config handles, etc.) -- `tick` returns
/// `Result<(), TickError>` purely for error signalling.
///
/// The `Send + 'static` bounds on the trait, together with the `Send`
/// bound on both returned futures, are what allow [`PeriodicWorker`]
/// to move the task into a `tokio::spawn`ed future.
pub trait PeriodicTask: Send + 'static {
    /// Called once per interval tick.
    ///
    /// Returning `Err` does not stop the worker: the error is logged
    /// at WARN and the next tick still fires.
    fn tick(&mut self) -> impl Future<Output = Result<(), TickError>> + Send;

    /// Called once after the shutdown token fires, before the worker
    /// task exits. Default: no-op that resolves immediately to `Ok(())`.
    ///
    /// Returning `Err` is logged at WARN; the worker exits either way.
    fn shutdown(&mut self) -> impl Future<Output = Result<(), TickError>> + Send {
        std::future::ready(Ok(()))
    }
}
56
57/// Handle for the worker task.
58///
59/// Dropping the handle does NOT abort the task -- the task lives until
60/// the `CancellationToken` is fired or the runtime shuts down. Use
61/// [`Self::join`] for graceful drain after signalling shutdown.
62pub struct PeriodicWorker {
63    join: JoinHandle<()>,
64}
65
66impl PeriodicWorker {
67    /// Spawn a periodic worker. The first `tick()` fires after one
68    /// `interval_duration` elapses (NOT at t=0). This avoids the
69    /// common bug where every spawned worker bursts a tick immediately
70    /// on startup, hammering downstream dependencies during deploy.
71    pub fn spawn<T: PeriodicTask>(
72        mut task: T,
73        interval_duration: Duration,
74        shutdown: CancellationToken,
75    ) -> Self {
76        let join = tokio::spawn(async move {
77            let mut tick = interval(interval_duration);
78            tick.set_missed_tick_behavior(MissedTickBehavior::Delay);
79            // Consume the immediate first tick.
80            tick.tick().await;
81
82            loop {
83                tokio::select! {
84                    biased;
85                    () = shutdown.cancelled() => {
86                        if let Err(e) = task.shutdown().await {
87                            warn!(error = %e, "periodic task shutdown hook failed");
88                        }
89                        return;
90                    }
91                    _ = tick.tick() => {
92                        if let Err(e) = task.tick().await {
93                            warn!(error = %e, "periodic task tick failed");
94                        }
95                    }
96                }
97            }
98        });
99        Self { join }
100    }
101
102    /// Await the worker's clean exit. Use after `shutdown.cancel()`.
103    pub async fn join(self) -> Result<(), tokio::task::JoinError> {
104        self.join.await
105    }
106}
107
#[cfg(test)]
mod tests {
    // NOTE(review): every test here runs against the real clock, so the
    // tick-count bounds are approximate. If tokio's `test-util` feature
    // is available, paused time would make them deterministic -- confirm
    // before changing.
    use super::*;
    use std::sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    };
    use std::time::Instant;

    /// Shared counter: bumped by a test task, read by the test body.
    type Counter = Arc<AtomicU32>;

    /// Fresh counter starting at zero.
    fn counter() -> Counter {
        Arc::new(AtomicU32::new(0))
    }

    /// Counts ticks; every tick succeeds.
    struct CountingTask {
        ticks: Counter,
    }

    impl PeriodicTask for CountingTask {
        async fn tick(&mut self) -> Result<(), TickError> {
            self.ticks.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Counts ticks and shutdown-hook invocations separately.
    struct ShutdownTask {
        ticks: Counter,
        shutdown_called: Counter,
    }

    impl PeriodicTask for ShutdownTask {
        async fn tick(&mut self) -> Result<(), TickError> {
            self.ticks.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn shutdown(&mut self) -> Result<(), TickError> {
            self.shutdown_called.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Counts ticks; every tick reports an error.
    struct FailingTask {
        ticks: Counter,
    }

    impl PeriodicTask for FailingTask {
        async fn tick(&mut self) -> Result<(), TickError> {
            self.ticks.fetch_add(1, Ordering::SeqCst);
            Err(TickError::Generic("simulated".into()))
        }
    }

    #[tokio::test]
    async fn tick_fires_at_interval() {
        let seen = counter();
        let token = CancellationToken::new();
        let task = CountingTask {
            ticks: Arc::clone(&seen),
        };
        let _worker = PeriodicWorker::spawn(task, Duration::from_millis(20), token.clone());

        // About 110ms at a 20ms period is roughly 5 ticks; the accepted
        // range leaves room for scheduling jitter.
        tokio::time::sleep(Duration::from_millis(110)).await;
        token.cancel();

        let n = seen.load(Ordering::SeqCst);
        assert!((4..=7).contains(&n), "got {n} ticks, expected 4-7");
    }

    #[tokio::test]
    async fn first_tick_is_delayed_not_immediate() {
        // Regression guard for the classic tokio::time::interval gotcha:
        // the worker must not run a tick at t=0.
        let seen = counter();
        let token = CancellationToken::new();
        let task = CountingTask {
            ticks: Arc::clone(&seen),
        };
        let _worker = PeriodicWorker::spawn(task, Duration::from_millis(100), token.clone());

        // 10ms in, well short of the 100ms period, nothing may have
        // ticked: the worker swallowed the interval's immediate first
        // tick before entering its loop.
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert_eq!(seen.load(Ordering::SeqCst), 0);
        token.cancel();
    }

    #[tokio::test]
    async fn shutdown_hook_called_exactly_once() {
        let seen = counter();
        let hook_runs = counter();
        let token = CancellationToken::new();
        let task = ShutdownTask {
            ticks: Arc::clone(&seen),
            shutdown_called: Arc::clone(&hook_runs),
        };
        // One-minute period: far longer than the test, so no tick is due.
        let worker = PeriodicWorker::spawn(task, Duration::from_mins(1), token.clone());

        token.cancel();
        worker.join().await.expect("clean exit");

        assert_eq!(hook_runs.load(Ordering::SeqCst), 1);
        // The worker lived too briefly for any tick to fire.
        assert_eq!(seen.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn failing_tick_does_not_stop_worker() {
        let seen = counter();
        let token = CancellationToken::new();
        let task = FailingTask {
            ticks: Arc::clone(&seen),
        };
        let _worker = PeriodicWorker::spawn(task, Duration::from_millis(15), token.clone());

        // Long enough for several ticks, each of which returns Err.
        tokio::time::sleep(Duration::from_millis(80)).await;
        token.cancel();

        // Repeated ticks after the first error show the worker neither
        // panicked nor exited.
        let n = seen.load(Ordering::SeqCst);
        assert!(n >= 3, "got {n} ticks, expected >=3 even with errors");
    }

    #[tokio::test]
    async fn biased_select_prioritises_shutdown_over_tick() {
        // With a 1ms period a tick is nearly always due when shutdown
        // arrives. Branch ordering cannot be observed directly, so this
        // checks the observable consequence: cancel followed by join
        // completes promptly and cleanly.
        let seen = counter();
        let token = CancellationToken::new();
        let task = CountingTask {
            ticks: Arc::clone(&seen),
        };
        let worker = PeriodicWorker::spawn(task, Duration::from_millis(1), token.clone());

        let t0 = Instant::now();
        // Let the worker tick for a while before cancelling.
        tokio::time::sleep(Duration::from_millis(20)).await;
        token.cancel();
        worker.join().await.expect("clean exit");
        let elapsed = t0.elapsed();

        // The whole run, join included, must finish far inside one
        // second, i.e. shutdown was not held up behind ticking.
        assert!(
            elapsed < Duration::from_millis(500),
            "worker took {elapsed:?} to shut down (expected <500ms)",
        );
    }
}