futures_ext/future/
abort_handle_ref.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under both the MIT license found in the
5 * LICENSE-MIT file in the root directory of this source tree and the Apache
6 * License, Version 2.0 found in the LICENSE-APACHE file in the root directory
7 * of this source tree.
8 */
9
10use std::future::Future;
11use std::sync::Arc;
12
13use futures::future::abortable;
14use futures::future::AbortHandle;
15
16/// Spawns a new task returning an abort handle for it.
17///
18/// It is similar to [tokio::task::spawn] but instad of returning a JoinHandle it will return
19/// an [ControlledHandle]. The [ControlledHandle] can be used to directly abort the task that was
20/// spawned. [ControlledHandle] can be cloned resulting in a new handle to the same underlying
21/// task. Dropping all [ControlledHandle] instances pointing to a given task will result in the
22/// abort of that task.
23///
24/// The use case this function is tasks that "run in the background" and are tied to a specific
25/// object. We attach the [ControlledHandle] to the object in question so that the background task is
26/// "dropped" (aborted) when the object is dropped.
27pub fn spawn_controlled<T>(t: T) -> ControlledHandle
28where
29    T: Future + Send + 'static,
30    T::Output: Send + 'static,
31{
32    let (abortable_future, abort_handle) = abortable(t);
33    tokio::task::spawn(abortable_future);
34    ControlledHandle::new(abort_handle)
35}
36
37/// A handle that can abort the spawned task that it is associated with aborted. The underlying
38/// task also gets aborted when there are no more handles referencing it.
39#[derive(Clone, Debug)]
40pub struct ControlledHandle(Arc<Inner>);
41
42impl ControlledHandle {
43    fn new(abort_handle: AbortHandle) -> Self {
44        Self(Arc::new(Inner(abort_handle)))
45    }
46    // There's probably nothing wrong with adding an explicit abort function but we don't need it
47    // right now.
48}
49
50#[derive(Debug)]
51struct Inner(AbortHandle);
52
53impl Drop for Inner {
54    fn drop(&mut self) {
55        self.0.abort();
56    }
57}
58
#[cfg(test)]
mod tests {
    use tokio::sync::mpsc;

    use super::*;

    /// Spawns a controlled task that sends 0, 1, 2, ... into a channel of capacity 1, and
    /// returns the task's handle together with the receiving end of that channel.
    fn handle_and_counting_receiver() -> (ControlledHandle, mpsc::Receiver<u64>) {
        let (sender, receiver) = mpsc::channel(1);
        let counting_task = async move {
            let mut next: u64 = 0;
            loop {
                sender.send(next).await.unwrap();
                next += 1;
            }
        };
        (spawn_controlled(counting_task), receiver)
    }

    #[tokio::test]
    async fn test_no_handles_abort() {
        let (handle, mut rx) = handle_and_counting_receiver();
        assert_eq!(rx.recv().await, Some(0));

        // Dropping a clone must leave the task running while the original handle is alive.
        drop(handle.clone());
        assert_eq!(rx.recv().await, Some(1));

        // Dropping the last handle aborts the task; the sender goes away with it, so the
        // channel is expected to report closure.
        drop(handle);
        assert_eq!(rx.recv().await, None);
    }
}
88}