safe_async_scoped/
lib.rs

1//! `safe-async-scoped` is a minimal wrapper around `FuturesUnordered` in the `futures` crate that simulates **scoped tasks**. Unlike `async-scoped` and even `crossbeam-scoped` etc, `safe-async-scoped`
2//!
3//! - Is _completely_ safe to use
4//! - Has no `unsafe` code in its implementation
5//! - Has no dependencies other than `futures`
6//! - Is completely runtime-agnostic
7//!
8//! Note that "tasks" spawned with `safe-async-scoped` will not be sent to the executor as separate tasks, and will thus only run on one thread. On the plus side, none of the futures have to be `Send`.
9//!
10//! # Example
11//!
12//! ```rust
13//! let listener = Async::new(TcpListener::bind("127.0.0.1:8080").unwrap()).unwrap();
14//! let lala = String::from("hello");
15//! {
16//!    let scope = Scope::new();
17//!    scope
18//!        .start(async {
19//!            loop {
20//!                let (client, _) = listener.accept().await.unwrap();
21//!                scope.spawn(async {
22//!                    handle(client, &lala).await;
23//!                });
24//!            }
25//!        })
26//!        .await;
27//!}
28//!```
29
30use futures::channel::mpsc;
31use futures::lock::Mutex;
32use futures::prelude::*;
33use futures::stream::FuturesUnordered;
34use std::pin::Pin;
35
36type PTask<'a> = Pin<Box<dyn Future<Output = ()> + 'a>>;
37
38/// An opaque utility for spawning local tasks that has access to the scope that `Scope` has access to.
39///
40/// # Example
41///
42/// ```
43/// let listener = Async::new(TcpListener::bind("127.0.0.1:8080").unwrap()).unwrap();
44/// let lala = String::from("hello");
45/// {
46///     let scope = Scope::new();
47///     scope
48///         .start(async {
49///             loop {
50///                 let (client, _) = listener.accept().await.unwrap();
51///                 scope.spawn(async {
52///                     handle(client, &lala).await;
53///                 });
54///             }
55///         })
56///         .await;
57/// }
58/// ```
59pub struct Scope<'env> {
60    runq: Mutex<FuturesUnordered<PTask<'env>>>,
61    recv: Mutex<mpsc::UnboundedReceiver<PTask<'env>>>,
62    send: mpsc::UnboundedSender<PTask<'env>>,
63}
64
65impl<'env> Default for Scope<'env> {
66    fn default() -> Self {
67        Scope::new()
68    }
69}
70
71impl<'env> Scope<'env> {
72    /// Creates a new scope.
73    pub fn new() -> Self {
74        let (send, recv) = mpsc::unbounded();
75        Scope {
76            runq: Mutex::new(FuturesUnordered::new()),
77            recv: Mutex::new(recv),
78            send,
79        }
80    }
81    /// Schedules a future to be spawned onto the scope.
82    pub fn spawn<F: Future<Output = ()> + 'env>(&self, fut: F) {
83        self.send.unbounded_send(Box::pin(fut)).unwrap();
84    }
85
86    async fn schedule(&self) {
87        let mut runq = self.runq.lock().await;
88        let mut recv = self.recv.lock().await;
89        loop {
90            let runq_fut = {
91                async {
92                    if runq.len() == 0 {
93                        future::pending::<()>().await;
94                    } else {
95                        runq.next().await;
96                    }
97                }
98            };
99            futures::select! {
100                _ = runq_fut.fuse() => (),
101                ntsk = recv.next().fuse() => runq.push(ntsk.unwrap()),
102            }
103        }
104    }
105
106    /// Starts a context in which scoped tasks can be spawned. When `fut` resolves, all spawned tasks are forcibly dropped.
107    pub async fn start<'future, 'scope: 'future>(
108        &'scope self,
109        fut: impl Future<Output = ()> + 'future,
110    ) {
111        run_in_scope(self, fut).await;
112    }
113}
114
115async fn run_in_scope<'future, 'scope: 'future, T: Future<Output = ()> + 'future>(
116    s: &Scope<'scope>,
117    f: T,
118) {
119    futures::select! {_ = f.fuse() => (), _ = s.schedule().fuse() => ()};
120}
121
122// fn main() {
123//     smol::run(async {
124//         let listener = Async::new(TcpListener::bind("127.0.0.1:8080").unwrap()).unwrap();
125//         let lala = String::from("hello");
126//         {
127//             let scope = Scope::new();
128//             scope
129//                 .start(async {
130//                     loop {
131//                         let (client, _) = listener.accept().await.unwrap();
132//                         scope.spawn(async {
133//                             handle(client, &lala).await;
134//                         });
135//                     }
136//                 })
137//                 .await;
138//         }
139//     });
140// }
141
142// async fn handle(stream: Async<TcpStream>, lala: &str) {
143//     let reader = async_dup::Arc::new(stream);
144//     let mut writer = reader.clone();
145//     println!("lala is {}", lala.to_ascii_uppercase());
146//     futures::io::copy(reader, &mut writer).await.unwrap();
147// }