1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
//! `safe-async-scoped` is a minimal wrapper around `FuturesUnordered` in the `futures` crate that simulates **scoped tasks**. Unlike `async-scoped` and even `crossbeam-scoped` etc, `safe-async-scoped` //! //! - Is _completely_ safe to use //! - Has no `unsafe` code in its implementation //! - Has no dependencies other than `futures` //! - Is completely runtime-agnostic //! //! Note that "tasks" spawned with `safe-async-scoped` will not be sent to the executor as separate tasks, and will thus only run on one thread. On the plus side, none of the futures have to be `Send`. //! //! # Example //! //! ```rust //! let listener = Async::new(TcpListener::bind("127.0.0.1:8080").unwrap()).unwrap(); //! let lala = String::from("hello"); //! { //! let scope = Scope::new(); //! scope //! .start(async { //! loop { //! let (client, _) = listener.accept().await.unwrap(); //! scope.spawn(async { //! handle(client, &lala).await; //! }); //! } //! }) //! .await; //!} //!``` use futures::channel::mpsc; use futures::lock::Mutex; use futures::prelude::*; use futures::stream::FuturesUnordered; use std::pin::Pin; type PTask<'a> = Pin<Box<dyn Future<Output = ()> + 'a>>; /// An opaque utility for spawning local tasks that has access to the scope that `Scope` has access to. /// /// # Example /// /// ``` /// let listener = Async::new(TcpListener::bind("127.0.0.1:8080").unwrap()).unwrap(); /// let lala = String::from("hello"); /// { /// let scope = Scope::new(); /// scope /// .start(async { /// loop { /// let (client, _) = listener.accept().await.unwrap(); /// scope.spawn(async { /// handle(client, &lala).await; /// }); /// } /// }) /// .await; /// } /// ``` pub struct Scope<'env> { runq: Mutex<FuturesUnordered<PTask<'env>>>, recv: Mutex<mpsc::UnboundedReceiver<PTask<'env>>>, send: mpsc::UnboundedSender<PTask<'env>>, } impl<'env> Default for Scope<'env> { fn default() -> Self { Scope::new() } } impl<'env> Scope<'env> { /// Creates a new scope. pub fn new() -> Self { let (send, recv) = mpsc::unbounded(); Scope { runq: Mutex::new(FuturesUnordered::new()), recv: Mutex::new(recv), send, } } /// Schedules a future to be spawned onto the scope. pub fn spawn<F: Future<Output = ()> + 'env>(&self, fut: F) { self.send.unbounded_send(Box::pin(fut)).unwrap(); } async fn schedule(&self) { let mut runq = self.runq.lock().await; let mut recv = self.recv.lock().await; loop { let runq_fut = { async { if runq.len() == 0 { future::pending::<()>().await; } else { runq.next().await; } } }; futures::select! { _ = runq_fut.fuse() => (), ntsk = recv.next().fuse() => runq.push(ntsk.unwrap()), } } } /// Starts a context in which scoped tasks can be spawned. When `fut` resolves, all spawned tasks are forcibly dropped. pub async fn start<'future, 'scope: 'future>( &'scope self, fut: impl Future<Output = ()> + 'future, ) { run_in_scope(self, fut).await; } } async fn run_in_scope<'future, 'scope: 'future, T: Future<Output = ()> + 'future>( s: &Scope<'scope>, f: T, ) { futures::select! {_ = f.fuse() => (), _ = s.schedule().fuse() => ()}; } // fn main() { // smol::run(async { // let listener = Async::new(TcpListener::bind("127.0.0.1:8080").unwrap()).unwrap(); // let lala = String::from("hello"); // { // let scope = Scope::new(); // scope // .start(async { // loop { // let (client, _) = listener.accept().await.unwrap(); // scope.spawn(async { // handle(client, &lala).await; // }); // } // }) // .await; // } // }); // } // async fn handle(stream: Async<TcpStream>, lala: &str) { // let reader = async_dup::Arc::new(stream); // let mut writer = reader.clone(); // println!("lala is {}", lala.to_ascii_uppercase()); // futures::io::copy(reader, &mut writer).await.unwrap(); // }