1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use futures::channel::mpsc;
use futures::lock::Mutex;
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use std::pin::Pin;
/// A heap-allocated, pinned, type-erased future with unit output that may
/// borrow data living for `'a`. The trait object carries no `Send` bound.
type PTask<'a> = Pin<Box<dyn Future<Output = ()> + 'a>>;
/// A set of spawned tasks whose futures may borrow data that lives for `'env`.
///
/// Tasks are handed over through an unbounded channel (`send` / `recv`) and
/// collected in `runq`, where the scheduler polls them.
pub struct Scope<'env> {
/// Tasks that have been taken off the channel and are being polled.
/// Wrapped in an async mutex; the scheduler holds the lock while it runs.
runq: Mutex<FuturesUnordered<PTask<'env>>>,
/// Receiving half of the spawn channel; the scheduler drains it into `runq`.
recv: Mutex<mpsc::UnboundedReceiver<PTask<'env>>>,
/// Sending half of the spawn channel; `spawn` pushes new tasks through it.
send: mpsc::UnboundedSender<PTask<'env>>,
}
impl<'env> Default for Scope<'env> {
fn default() -> Self {
Scope::new()
}
}
impl<'env> Scope<'env> {
    /// Creates an empty scope with a fresh spawn channel and no queued tasks.
    pub fn new() -> Self {
        let (send, recv) = mpsc::unbounded();
        Scope {
            runq: Mutex::new(FuturesUnordered::new()),
            recv: Mutex::new(recv),
            send,
        }
    }

    /// Queues `fut` to be run by this scope.
    ///
    /// The future is boxed and sent through the scope's internal channel. It is
    /// not polled here; it only makes progress while the scope is being driven
    /// through [`Scope::start`].
    ///
    /// # Panics
    ///
    /// Panics if the internal channel is disconnected. The scope owns the
    /// receiving half and never closes it, so this indicates a broken invariant.
    pub fn spawn<F: Future<Output = ()> + 'env>(&self, fut: F) {
        self.send
            .unbounded_send(Box::pin(fut))
            .expect("Scope owns the receiver, so the spawn channel cannot be disconnected");
    }

    /// Drives every spawned task; this future never completes on its own.
    ///
    /// Both locks are held for as long as this future is alive, so a second
    /// concurrent call waits at the first `lock()` until this one is dropped.
    async fn schedule(&self) {
        let mut runq = self.runq.lock().await;
        let mut recv = self.recv.lock().await;
        loop {
            // With nothing queued, `runq.next()` would resolve immediately with
            // `None`; park this arm instead so only the channel can wake us.
            let runq_fut = async {
                if runq.is_empty() {
                    future::pending::<()>().await;
                } else {
                    runq.next().await;
                }
            };
            futures::select! {
                // A task finished (or the arm is parked): loop and poll again.
                _ = runq_fut.fuse() => (),
                // A newly spawned task arrived: move it into the run queue.
                ntsk = recv.next().fuse() => runq.push(
                    ntsk.expect("Scope owns a sender, so the spawn channel cannot end"),
                ),
            }
        }
    }

    /// Runs `fut` to completion while concurrently driving the tasks spawned on
    /// this scope.
    ///
    /// Returns as soon as `fut` finishes. Tasks that have not completed by then
    /// are not dropped; they stay stored in the scope and are only polled again
    /// by a later call to `start`.
    pub async fn start<'future, 'scope: 'future>(
        &'scope self,
        fut: impl Future<Output = ()> + 'future,
    ) {
        run_in_scope(self, fut).await;
    }
}
async fn run_in_scope<'future, 'scope: 'future, T: Future<Output = ()> + 'future>(
s: &Scope<'scope>,
f: T,
) {
futures::select! {_ = f.fuse() => (), _ = s.schedule().fuse() => ()};
}