1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use std::pin::Pin;

/// A pinned, heap-allocated, type-erased task: a future yielding `()` that may borrow data for `'a`.
/// There is no `Send` bound, so tasks are not required to be thread-safe.
type PTask<'a> = Pin<Box<dyn Future<Output = ()> + 'a>>;

/// A scope for spawning local tasks that may borrow from the environment the `Scope` lives in.
///
/// Spawned futures only need to outlive `'env` and carry no `Send` bound, so they can hold
/// references to data declared outside the scope.
///
/// # Example
///
/// The snippet is marked `ignore` because it needs an async context and items (`Async`,
/// `TcpListener`, `handle`) that are not defined in this file.
///
/// ```ignore
/// let listener = Async::new(TcpListener::bind("127.0.0.1:8080").unwrap()).unwrap();
/// let lala = String::from("hello");
/// {
///     let scope = Scope::new();
///     scope
///         .start(async {
///             loop {
///                 let (client, _) = listener.accept().await.unwrap();
///                 scope.spawn(async {
///                     handle(client, &lala).await;
///                 });
///             }
///         })
///         .await;
/// }
/// ```
pub struct Scope<'env> {
    /// Tasks that the scheduler has taken off the channel and is currently driving.
    runq: Mutex<FuturesUnordered<PTask<'env>>>,
    /// Receiving end of the spawn channel; read by the scheduler.
    recv: Mutex<mpsc::UnboundedReceiver<PTask<'env>>>,
    /// Sending end of the spawn channel; used by `spawn` to queue new tasks.
    send: mpsc::UnboundedSender<PTask<'env>>,
}

impl<'env> Default for Scope<'env> {
    fn default() -> Self {
        Scope::new()
    }
}

impl<'env> Scope<'env> {
    /// Creates a new scope with an empty run queue and an empty spawn channel.
    pub fn new() -> Self {
        let (send, recv) = mpsc::unbounded();
        Scope {
            runq: Mutex::new(FuturesUnordered::new()),
            recv: Mutex::new(recv),
            send,
        }
    }

    /// Schedules a future to be spawned onto the scope.
    ///
    /// The future is boxed and queued on the spawn channel; it is not polled here. Queued
    /// tasks are picked up and driven by the scheduler that runs inside [`Scope::start`].
    ///
    /// # Panics
    ///
    /// Panics if the spawn channel is closed. The scope owns the receiving end, so this is
    /// not expected to happen while a `&self` exists.
    pub fn spawn<F: Future<Output = ()> + 'env>(&self, fut: F) {
        self.send
            .unbounded_send(Box::pin(fut))
            .expect("scope owns the receiver, so the spawn channel cannot be closed");
    }

    /// Scheduler loop: moves newly spawned tasks from the channel into the run queue and
    /// drives the queued tasks. The loop has no exit, so this future only stops when dropped.
    ///
    /// Both locks are held for as long as the returned future is alive.
    async fn schedule(&self) {
        let mut runq = self.runq.lock().await;
        let mut recv = self.recv.lock().await;
        loop {
            // An empty `FuturesUnordered` yields `None` immediately, which would make this
            // loop spin; park this arm instead until a task arrives on the channel.
            let runq_fut = async {
                if runq.is_empty() {
                    future::pending::<()>().await;
                } else {
                    runq.next().await;
                }
            };
            futures::select! {
                // A task finished (or the arm is parked): nothing to do, loop again.
                _ = runq_fut.fuse() => (),
                // A new task was spawned: start driving it.
                ntsk = recv.next().fuse() => runq.push(
                    ntsk.expect("scope owns a sender, so the spawn channel cannot terminate"),
                ),
            }
        }
    }

    /// Starts a context in which scoped tasks can be spawned and runs `fut` inside it.
    ///
    /// Spawned tasks are driven concurrently with `fut`. When `fut` resolves, the scheduler
    /// is dropped and spawned tasks are no longer polled. Tasks that have not finished are
    /// not dropped at that point: they remain queued inside the scope and are dropped when
    /// the `Scope` itself is dropped (or are resumed by a later call to `start`).
    pub async fn start<'future, 'scope: 'future>(
        &'scope self,
        fut: impl Future<Output = ()> + 'future,
    ) {
        run_in_scope(self, fut).await;
    }
}

/// Races `task` against the scope's scheduler loop and returns as soon as either arm
/// completes; the other arm is dropped on return.
async fn run_in_scope<'future, 'scope, T>(scope: &Scope<'scope>, task: T)
where
    'scope: 'future,
    T: Future<Output = ()> + 'future,
{
    futures::select! {
        _ = task.fuse() => (),
        _ = scope.schedule().fuse() => (),
    };
}

// fn main() {
//     smol::run(async {
//         let listener = Async::new(TcpListener::bind("127.0.0.1:8080").unwrap()).unwrap();
//         let lala = String::from("hello");
//         {
//             let scope = Scope::new();
//             scope
//                 .start(async {
//                     loop {
//                         let (client, _) = listener.accept().await.unwrap();
//                         scope.spawn(async {
//                             handle(client, &lala).await;
//                         });
//                     }
//                 })
//                 .await;
//         }
//     });
// }

// async fn handle(stream: Async<TcpStream>, lala: &str) {
//     let reader = async_dup::Arc::new(stream);
//     let mut writer = reader.clone();
//     println!("lala is {}", lala.to_ascii_uppercase());
//     futures::io::copy(reader, &mut writer).await.unwrap();
// }