futures_ext/future/abort_handle_ref.rs
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under both the MIT license found in the
5 * LICENSE-MIT file in the root directory of this source tree and the Apache
6 * License, Version 2.0 found in the LICENSE-APACHE file in the root directory
7 * of this source tree.
8 */
9
10use std::future::Future;
11use std::sync::Arc;
12
13use futures::future::abortable;
14use futures::future::AbortHandle;
15
16/// Spawns a new task returning an abort handle for it.
17///
18/// It is similar to [tokio::task::spawn] but instad of returning a JoinHandle it will return
19/// an [ControlledHandle]. The [ControlledHandle] can be used to directly abort the task that was
20/// spawned. [ControlledHandle] can be cloned resulting in a new handle to the same underlying
21/// task. Dropping all [ControlledHandle] instances pointing to a given task will result in the
22/// abort of that task.
23///
24/// The use case this function is tasks that "run in the background" and are tied to a specific
25/// object. We attach the [ControlledHandle] to the object in question so that the background task is
26/// "dropped" (aborted) when the object is dropped.
27pub fn spawn_controlled<T>(t: T) -> ControlledHandle
28where
29 T: Future + Send + 'static,
30 T::Output: Send + 'static,
31{
32 let (abortable_future, abort_handle) = abortable(t);
33 tokio::task::spawn(abortable_future);
34 ControlledHandle::new(abort_handle)
35}
36
37/// A handle that can abort the spawned task that it is associated with aborted. The underlying
38/// task also gets aborted when there are no more handles referencing it.
39#[derive(Clone, Debug)]
40pub struct ControlledHandle(Arc<Inner>);
41
42impl ControlledHandle {
43 fn new(abort_handle: AbortHandle) -> Self {
44 Self(Arc::new(Inner(abort_handle)))
45 }
46 // There's probably nothing wrong with adding an explicit abort function but we don't need it
47 // right now.
48}
49
50#[derive(Debug)]
51struct Inner(AbortHandle);
52
53impl Drop for Inner {
54 fn drop(&mut self) {
55 self.0.abort();
56 }
57}
58
59#[cfg(test)]
60mod tests {
61 use tokio::sync::mpsc;
62
63 use super::*;
64
65 fn handle_and_counting_receiver() -> (ControlledHandle, mpsc::Receiver<u64>) {
66 let (tx, rx) = mpsc::channel(1);
67 let handle = spawn_controlled(async move {
68 let mut x: u64 = 0;
69 loop {
70 tx.send(x).await.unwrap();
71 x += 1;
72 }
73 });
74 (handle, rx)
75 }
76
77 #[tokio::test]
78 async fn test_no_handles_abort() {
79 let (handle, mut rx) = handle_and_counting_receiver();
80 assert_eq!(rx.recv().await, Some(0));
81 {
82 let _ = handle.clone();
83 }
84 assert_eq!(rx.recv().await, Some(1));
85 drop(handle);
86 assert_eq!(rx.recv().await, None);
87 }
88}