safe_async_scoped/lib.rs
1//! `safe-async-scoped` is a minimal wrapper around `FuturesUnordered` in the `futures` crate that simulates **scoped tasks**. Unlike `async-scoped` and even `crossbeam-scoped` etc, `safe-async-scoped`
2//!
3//! - Is _completely_ safe to use
4//! - Has no `unsafe` code in its implementation
5//! - Has no dependencies other than `futures`
6//! - Is completely runtime-agnostic
7//!
8//! Note that "tasks" spawned with `safe-async-scoped` will not be sent to the executor as separate tasks, and will thus only run on one thread. On the plus side, none of the futures have to be `Send`.
9//!
10//! # Example
11//!
12//! ```rust
13//! let listener = Async::new(TcpListener::bind("127.0.0.1:8080").unwrap()).unwrap();
14//! let lala = String::from("hello");
15//! {
16//! let scope = Scope::new();
17//! scope
18//! .start(async {
19//! loop {
20//! let (client, _) = listener.accept().await.unwrap();
21//! scope.spawn(async {
22//! handle(client, &lala).await;
23//! });
24//! }
25//! })
26//! .await;
27//!}
28//!```
29
30use futures::channel::mpsc;
31use futures::lock::Mutex;
32use futures::prelude::*;
33use futures::stream::FuturesUnordered;
34use std::pin::Pin;
35
36type PTask<'a> = Pin<Box<dyn Future<Output = ()> + 'a>>;
37
38/// An opaque utility for spawning local tasks that has access to the scope that `Scope` has access to.
39///
40/// # Example
41///
42/// ```
43/// let listener = Async::new(TcpListener::bind("127.0.0.1:8080").unwrap()).unwrap();
44/// let lala = String::from("hello");
45/// {
46/// let scope = Scope::new();
47/// scope
48/// .start(async {
49/// loop {
50/// let (client, _) = listener.accept().await.unwrap();
51/// scope.spawn(async {
52/// handle(client, &lala).await;
53/// });
54/// }
55/// })
56/// .await;
57/// }
58/// ```
59pub struct Scope<'env> {
60 runq: Mutex<FuturesUnordered<PTask<'env>>>,
61 recv: Mutex<mpsc::UnboundedReceiver<PTask<'env>>>,
62 send: mpsc::UnboundedSender<PTask<'env>>,
63}
64
65impl<'env> Default for Scope<'env> {
66 fn default() -> Self {
67 Scope::new()
68 }
69}
70
71impl<'env> Scope<'env> {
72 /// Creates a new scope.
73 pub fn new() -> Self {
74 let (send, recv) = mpsc::unbounded();
75 Scope {
76 runq: Mutex::new(FuturesUnordered::new()),
77 recv: Mutex::new(recv),
78 send,
79 }
80 }
81 /// Schedules a future to be spawned onto the scope.
82 pub fn spawn<F: Future<Output = ()> + 'env>(&self, fut: F) {
83 self.send.unbounded_send(Box::pin(fut)).unwrap();
84 }
85
86 async fn schedule(&self) {
87 let mut runq = self.runq.lock().await;
88 let mut recv = self.recv.lock().await;
89 loop {
90 let runq_fut = {
91 async {
92 if runq.len() == 0 {
93 future::pending::<()>().await;
94 } else {
95 runq.next().await;
96 }
97 }
98 };
99 futures::select! {
100 _ = runq_fut.fuse() => (),
101 ntsk = recv.next().fuse() => runq.push(ntsk.unwrap()),
102 }
103 }
104 }
105
106 /// Starts a context in which scoped tasks can be spawned. When `fut` resolves, all spawned tasks are forcibly dropped.
107 pub async fn start<'future, 'scope: 'future>(
108 &'scope self,
109 fut: impl Future<Output = ()> + 'future,
110 ) {
111 run_in_scope(self, fut).await;
112 }
113}
114
115async fn run_in_scope<'future, 'scope: 'future, T: Future<Output = ()> + 'future>(
116 s: &Scope<'scope>,
117 f: T,
118) {
119 futures::select! {_ = f.fuse() => (), _ = s.schedule().fuse() => ()};
120}
121
122// fn main() {
123// smol::run(async {
124// let listener = Async::new(TcpListener::bind("127.0.0.1:8080").unwrap()).unwrap();
125// let lala = String::from("hello");
126// {
127// let scope = Scope::new();
128// scope
129// .start(async {
130// loop {
131// let (client, _) = listener.accept().await.unwrap();
132// scope.spawn(async {
133// handle(client, &lala).await;
134// });
135// }
136// })
137// .await;
138// }
139// });
140// }
141
142// async fn handle(stream: Async<TcpStream>, lala: &str) {
143// let reader = async_dup::Arc::new(stream);
144// let mut writer = reader.clone();
145// println!("lala is {}", lala.to_ascii_uppercase());
146// futures::io::copy(reader, &mut writer).await.unwrap();
147// }