1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
//! `safe-async-scoped` is a minimal wrapper around `FuturesUnordered` in the `futures` crate that simulates **scoped tasks**. Unlike `async-scoped` and even `crossbeam-scoped` etc, `safe-async-scoped`
//!
//! - Is _completely_ safe to use
//! - Has no `unsafe` code in its implementation
//! - Has no dependencies other than `futures`
//! - Is completely runtime-agnostic
//!
//! Note that "tasks" spawned with `safe-async-scoped` will not be sent to the executor as separate tasks, and will thus only run on one thread. On the plus side, none of the futures have to be `Send`.
//!
//! # Example
//!
//! ```rust
//! let listener = Async::new(TcpListener::bind("127.0.0.1:8080").unwrap()).unwrap();
//! let lala = String::from("hello");
//! {
//!    let scope = Scope::new();
//!    scope
//!        .start(async {
//!            loop {
//!                let (client, _) = listener.accept().await.unwrap();
//!                scope.spawn(async {
//!                    handle(client, &lala).await;
//!                });
//!            }
//!        })
//!        .await;
//!}
//!```

use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use std::pin::Pin;

type PTask<'a> = Pin<Box<dyn Future<Output = ()> + 'a>>;

/// An opaque utility for spawning local tasks that has access to the scope that `Scope` has access to.
///
/// # Example
///
/// ```
/// let listener = Async::new(TcpListener::bind("127.0.0.1:8080").unwrap()).unwrap();
/// let lala = String::from("hello");
/// {
///     let scope = Scope::new();
///     scope
///         .start(async {
///             loop {
///                 let (client, _) = listener.accept().await.unwrap();
///                 scope.spawn(async {
///                     handle(client, &lala).await;
///                 });
///             }
///         })
///         .await;
/// }
/// ```
pub struct Scope<'env> {
    runq: Mutex<FuturesUnordered<PTask<'env>>>,
    recv: Mutex<mpsc::UnboundedReceiver<PTask<'env>>>,
    send: mpsc::UnboundedSender<PTask<'env>>,
}

impl<'env> Default for Scope<'env> {
    fn default() -> Self {
        Scope::new()
    }
}

impl<'env> Scope<'env> {
    /// Creates a new scope.
    pub fn new() -> Self {
        let (send, recv) = mpsc::unbounded();
        Scope {
            runq: Mutex::new(FuturesUnordered::new()),
            recv: Mutex::new(recv),
            send,
        }
    }
    /// Schedules a future to be spawned onto the scope.
    pub fn spawn<F: Future<Output = ()> + 'env>(&self, fut: F) {
        self.send.unbounded_send(Box::pin(fut)).unwrap();
    }

    async fn schedule(&self) {
        let mut runq = self.runq.lock().await;
        let mut recv = self.recv.lock().await;
        loop {
            let runq_fut = {
                async {
                    if runq.len() == 0 {
                        future::pending::<()>().await;
                    } else {
                        runq.next().await;
                    }
                }
            };
            futures::select! {
                _ = runq_fut.fuse() => (),
                ntsk = recv.next().fuse() => runq.push(ntsk.unwrap()),
            }
        }
    }

    /// Starts a context in which scoped tasks can be spawned. When `fut` resolves, all spawned tasks are forcibly dropped.
    pub async fn start<'future, 'scope: 'future>(
        &'scope self,
        fut: impl Future<Output = ()> + 'future,
    ) {
        run_in_scope(self, fut).await;
    }
}

async fn run_in_scope<'future, 'scope: 'future, T: Future<Output = ()> + 'future>(
    s: &Scope<'scope>,
    f: T,
) {
    futures::select! {_ = f.fuse() => (), _ = s.schedule().fuse() => ()};
}

// fn main() {
//     smol::run(async {
//         let listener = Async::new(TcpListener::bind("127.0.0.1:8080").unwrap()).unwrap();
//         let lala = String::from("hello");
//         {
//             let scope = Scope::new();
//             scope
//                 .start(async {
//                     loop {
//                         let (client, _) = listener.accept().await.unwrap();
//                         scope.spawn(async {
//                             handle(client, &lala).await;
//                         });
//                     }
//                 })
//                 .await;
//         }
//     });
// }

// async fn handle(stream: Async<TcpStream>, lala: &str) {
//     let reader = async_dup::Arc::new(stream);
//     let mut writer = reader.clone();
//     println!("lala is {}", lala.to_ascii_uppercase());
//     futures::io::copy(reader, &mut writer).await.unwrap();
// }