Skip to main content

bluetape_rs_async/
control.rs

1//! Timeout, cancellation, and shutdown coordination helpers.
2
3use std::error::Error;
4use std::fmt;
5use std::future::Future;
6use std::time::Duration;
7
8use tokio::sync::watch;
9use tokio::time::Instant;
10
/// Failure reported by the timeout, deadline, cancellation, and shutdown helpers.
#[derive(Debug, Clone, Eq, PartialEq)]
#[non_exhaustive]
pub enum AsyncControlError {
    /// The wrapped operation was still pending when its timeout or deadline passed.
    TimedOut,
    /// The caller-owned cancellation or shutdown signal completed before the operation.
    Cancelled,
}

impl fmt::Display for AsyncControlError {
    /// Writes the fixed, human-readable message that belongs to the variant.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Self::TimedOut => "operation timed out",
            Self::Cancelled => "operation was cancelled",
        };
        formatter.write_str(message)
    }
}

// Neither variant wraps an underlying cause, so the default `source()` (`None`) is kept.
impl Error for AsyncControlError {}
31
32/// Handle used to request cancellation for one or more [`CancellationToken`]s.
33///
34/// Cloning the source creates another owner that can request cancellation.
35/// Dropping every source without calling [`CancellationSource::cancel`] also
36/// wakes tokens; receivers treat a source-less channel as cancelled because no
37/// owner remains able to make a later positive decision.
38#[derive(Debug, Clone)]
39pub struct CancellationSource {
40    sender: watch::Sender<bool>,
41}
42
43impl CancellationSource {
44    /// Creates a new cancellation source and its first token.
45    ///
46    /// # Examples
47    ///
48    /// ```
49    /// use bluetape_rs_async::CancellationSource;
50    ///
51    /// let (source, token) = CancellationSource::new();
52    /// assert!(!token.is_cancelled());
53    /// source.cancel();
54    /// assert!(token.is_cancelled());
55    /// ```
56    #[must_use]
57    pub fn new() -> (Self, CancellationToken) {
58        let (sender, receiver) = watch::channel(false);
59        (Self { sender }, CancellationToken { receiver })
60    }
61
62    /// Creates another token linked to this source.
63    ///
64    /// Tokens are independent receivers over the same cancellation state. A
65    /// late token observes an already-cancelled source immediately.
66    #[must_use]
67    pub fn token(&self) -> CancellationToken {
68        CancellationToken {
69            receiver: self.sender.subscribe(),
70        }
71    }
72
73    /// Requests cancellation.
74    ///
75    /// This method is idempotent. It is not an error if all tokens have already
76    /// been dropped.
77    pub fn cancel(&self) {
78        let _ = self.sender.send(true);
79    }
80
81    /// Returns `true` when cancellation has been requested.
82    #[must_use]
83    pub fn is_cancelled(&self) -> bool {
84        *self.sender.borrow()
85    }
86}
87
88/// Receiver-side cancellation token.
89///
90/// A token is a listener, not an owner. Dropping a token never cancels sibling
91/// tokens; only [`CancellationSource::cancel`] or dropping all sources does.
92#[derive(Debug, Clone)]
93pub struct CancellationToken {
94    receiver: watch::Receiver<bool>,
95}
96
97impl CancellationToken {
98    /// Returns `true` when cancellation has been requested.
99    #[must_use]
100    pub fn is_cancelled(&self) -> bool {
101        *self.receiver.borrow()
102    }
103
104    /// Waits until cancellation is requested or the source is dropped.
105    ///
106    /// Dropping the source is treated as cancellation because no owner remains
107    /// able to make a later positive shutdown decision. In that source-drop
108    /// case [`CancellationToken::is_cancelled`] still reflects the last observed
109    /// boolean state, which can remain `false`.
110    pub async fn cancelled(&mut self) {
111        if *self.receiver.borrow() {
112            return;
113        }
114
115        loop {
116            if self.receiver.changed().await.is_err() {
117                return;
118            }
119            if *self.receiver.borrow_and_update() {
120                return;
121            }
122        }
123    }
124}
125
126/// Trigger side of a shutdown signal pair.
127///
128/// This is a domain-named wrapper around [`CancellationSource`] for graceful
129/// shutdown flows.
130#[derive(Debug, Clone)]
131pub struct ShutdownTrigger {
132    source: CancellationSource,
133}
134
135impl ShutdownTrigger {
136    /// Requests shutdown for all linked [`ShutdownSignal`] listeners.
137    pub fn shutdown(&self) {
138        self.source.cancel();
139    }
140
141    /// Creates another listener linked to this trigger.
142    #[must_use]
143    pub fn signal(&self) -> ShutdownSignal {
144        ShutdownSignal {
145            token: self.source.token(),
146        }
147    }
148
149    /// Returns `true` when shutdown has been requested.
150    #[must_use]
151    pub fn is_shutdown_requested(&self) -> bool {
152        self.source.is_cancelled()
153    }
154}
155
156/// Listener side of a shutdown signal pair.
157///
158/// Dropping all triggers wakes listeners the same way an explicit shutdown
159/// request does, because no owner remains able to keep the service running.
160#[derive(Debug, Clone)]
161pub struct ShutdownSignal {
162    token: CancellationToken,
163}
164
165impl ShutdownSignal {
166    /// Returns `true` when shutdown has been requested.
167    #[must_use]
168    pub fn is_shutdown_requested(&self) -> bool {
169        self.token.is_cancelled()
170    }
171
172    /// Waits until shutdown is requested or the trigger is dropped.
173    pub async fn wait(&mut self) {
174        self.token.cancelled().await;
175    }
176}
177
178/// Creates a shutdown trigger and its first listener.
179#[must_use]
180pub fn shutdown_signal() -> (ShutdownTrigger, ShutdownSignal) {
181    let (source, token) = CancellationSource::new();
182    (ShutdownTrigger { source }, ShutdownSignal { token })
183}
184
185/// Runs a future with a Tokio timeout.
186///
187/// Tokio cancels the wrapped future by dropping it when the timeout elapses.
188/// Dropping this wrapper future still propagates caller cancellation normally.
189///
190/// # Errors
191///
192/// Returns [`AsyncControlError::TimedOut`] when the future does not complete
193/// before `duration`.
194pub async fn with_timeout<F, T>(duration: Duration, future: F) -> Result<T, AsyncControlError>
195where
196    F: Future<Output = T>,
197{
198    tokio::time::timeout(duration, future)
199        .await
200        .map_err(|_| AsyncControlError::TimedOut)
201}
202
203/// Runs a future until a Tokio deadline.
204///
205/// Tokio checks the deadline before polling the future. A future that never
206/// yields can still run past the deadline before Tokio observes the timeout.
207///
208/// # Errors
209///
210/// Returns [`AsyncControlError::TimedOut`] when the future does not complete
211/// before `deadline`.
212pub async fn with_deadline<F, T>(deadline: Instant, future: F) -> Result<T, AsyncControlError>
213where
214    F: Future<Output = T>,
215{
216    tokio::time::timeout_at(deadline, future)
217        .await
218        .map_err(|_| AsyncControlError::TimedOut)
219}
220
221/// Runs a future until either it completes or cancellation is requested.
222///
223/// Dropping this wrapper future does not convert caller cancellation into
224/// [`AsyncControlError::Cancelled`]; the inner future is dropped normally.
225/// Dropping every source for `token` is treated the same as cancellation.
226///
227/// # Errors
228///
229/// Returns [`AsyncControlError::Cancelled`] when `token` completes first.
230pub async fn run_until_cancelled<F, T>(
231    mut token: CancellationToken,
232    future: F,
233) -> Result<T, AsyncControlError>
234where
235    F: Future<Output = T>,
236{
237    tokio::select! {
238        biased;
239        _ = token.cancelled() => Err(AsyncControlError::Cancelled),
240        value = future => Ok(value),
241    }
242}
243
244/// Runs a future until it completes, times out, or cancellation is requested.
245///
246/// Cancellation wins when the token and timeout are both ready.
247/// Dropping every source for `token` is treated the same as cancellation.
248///
249/// # Errors
250///
251/// Returns [`AsyncControlError::Cancelled`] when `token` completes first, or
252/// [`AsyncControlError::TimedOut`] when `duration` elapses first.
253pub async fn with_timeout_or_cancel<F, T>(
254    duration: Duration,
255    mut token: CancellationToken,
256    future: F,
257) -> Result<T, AsyncControlError>
258where
259    F: Future<Output = T>,
260{
261    tokio::select! {
262        biased;
263        _ = token.cancelled() => Err(AsyncControlError::Cancelled),
264        result = tokio::time::timeout(duration, future) => {
265            result.map_err(|_| AsyncControlError::TimedOut)
266        }
267    }
268}
269
#[cfg(test)]
mod tests {
    use std::future::pending;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use tokio::sync::Notify;
    use tokio::time::{Duration, Instant, sleep};

    use super::*;

    /// Guard that bumps a shared counter when dropped, to prove a future was dropped.
    struct DropCounter {
        drops: Arc<AtomicUsize>,
    }

    impl Drop for DropCounter {
        fn drop(&mut self) {
            self.drops.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Operation that needs one full second of (virtual) time before yielding `7`.
    async fn seven_after_one_second() -> i32 {
        sleep(Duration::from_secs(1)).await;
        7
    }

    #[test]
    fn async_control_error_formats_public_error_messages() {
        let timed_out = AsyncControlError::TimedOut;
        let cancelled = AsyncControlError::Cancelled;

        assert_eq!(timed_out.to_string(), "operation timed out");
        assert_eq!(cancelled.to_string(), "operation was cancelled");
        assert!(timed_out.source().is_none());
        assert!(cancelled.source().is_none());
    }

    #[tokio::test]
    async fn with_timeout_returns_value_before_deadline() {
        let outcome = with_timeout(Duration::from_secs(1), async { 7 }).await;

        assert_eq!(outcome, Ok(7));
    }

    #[tokio::test(start_paused = true)]
    async fn with_timeout_reports_elapsed_operation() {
        let outcome = with_timeout(Duration::from_millis(10), seven_after_one_second()).await;

        assert_eq!(outcome, Err(AsyncControlError::TimedOut));
    }

    #[tokio::test(start_paused = true)]
    async fn with_deadline_reports_elapsed_operation() {
        let deadline = Instant::now() + Duration::from_millis(10);

        let outcome = with_deadline(deadline, seven_after_one_second()).await;

        assert_eq!(outcome, Err(AsyncControlError::TimedOut));
    }

    #[tokio::test(start_paused = true)]
    async fn with_timeout_or_cancel_reports_timeout_when_token_is_idle() {
        // The source must stay alive, otherwise the token would complete on source drop.
        let (_source, token) = CancellationSource::new();

        let outcome =
            with_timeout_or_cancel(Duration::from_millis(10), token, seven_after_one_second())
                .await;

        assert_eq!(outcome, Err(AsyncControlError::TimedOut));
    }

    #[tokio::test]
    async fn run_until_cancelled_returns_value_before_cancellation() {
        let (_source, token) = CancellationSource::new();

        let outcome = run_until_cancelled(token, async { 7 }).await;

        assert_eq!(outcome, Ok(7));
    }

    #[tokio::test]
    async fn cancellation_token_completes_when_all_sources_are_dropped() {
        let (source, mut token) = CancellationSource::new();
        drop(source);

        token.cancelled().await;

        // The boolean state was never set; only the channel closed.
        assert!(!token.is_cancelled());
    }

    #[tokio::test]
    async fn run_until_cancelled_reports_cancelled_when_source_is_dropped() {
        let (source, token) = CancellationSource::new();
        drop(source);

        let outcome = run_until_cancelled(token, pending::<()>()).await;

        assert_eq!(outcome, Err(AsyncControlError::Cancelled));
    }

    #[tokio::test]
    async fn run_until_cancelled_reports_cancellation_and_drops_future() {
        let (source, token) = CancellationSource::new();
        let drop_count = Arc::new(AtomicUsize::new(0));
        let entered = Arc::new(Notify::new());

        let guard_drops = Arc::clone(&drop_count);
        let entered_tx = Arc::clone(&entered);
        let worker = tokio::spawn(async move {
            run_until_cancelled(token, async move {
                let _guard = DropCounter { drops: guard_drops };
                entered_tx.notify_one();
                pending::<()>().await;
                7
            })
            .await
        });

        // Cancel only after the inner future has started, so its guard exists.
        entered.notified().await;
        source.cancel();
        let outcome = worker.await.unwrap();

        assert_eq!(outcome, Err(AsyncControlError::Cancelled));
        assert_eq!(drop_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test(start_paused = true)]
    async fn with_timeout_or_cancel_prefers_cancellation() {
        let (source, token) = CancellationSource::new();
        source.cancel();

        let outcome =
            with_timeout_or_cancel(Duration::from_millis(10), token, seven_after_one_second())
                .await;

        assert_eq!(outcome, Err(AsyncControlError::Cancelled));
    }

    #[tokio::test]
    async fn with_timeout_or_cancel_reports_cancelled_when_source_is_dropped() {
        let (source, token) = CancellationSource::new();
        drop(source);

        let outcome =
            with_timeout_or_cancel(Duration::from_secs(1), token, pending::<()>()).await;

        assert_eq!(outcome, Err(AsyncControlError::Cancelled));
    }

    #[tokio::test]
    async fn shutdown_signal_notifies_all_listeners() {
        let (trigger, mut first) = shutdown_signal();
        let mut second = trigger.signal();

        let first_listener = tokio::spawn(async move {
            first.wait().await;
            first.is_shutdown_requested()
        });
        let second_listener = tokio::spawn(async move {
            second.wait().await;
            second.is_shutdown_requested()
        });

        trigger.shutdown();

        assert!(first_listener.await.unwrap());
        assert!(second_listener.await.unwrap());
        assert!(trigger.is_shutdown_requested());
    }

    #[tokio::test]
    async fn shutdown_signal_waits_until_trigger_is_dropped() {
        let (trigger, mut signal) = shutdown_signal();
        drop(trigger);

        signal.wait().await;

        // Dropping the trigger wakes the listener without setting the flag.
        assert!(!signal.is_shutdown_requested());
    }
}