1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
extern crate generator;
#[macro_use]
extern crate may;
use std::cell::UnsafeCell;
use std::time::Duration;
use may::{coroutine, cqueue};
/// Thin wrapper around [`UnsafeCell`] used to hold the running total in `main`.
///
/// Per the original author's note, it exists to work around a compile error
/// when the value is touched from the selector closures, and sharing it is
/// considered fine because those code paths run in order rather than
/// concurrently.
struct SyncCell<T>(UnsafeCell<T>);

impl<T> SyncCell<T> {
    /// Wraps `data` in a new cell.
    fn new(data: T) -> Self {
        SyncCell(UnsafeCell::new(data))
    }

    /// Returns a mutable reference to the wrapped value.
    ///
    /// # Safety
    ///
    /// The body performs no unsafe operation by itself (it goes through the
    /// exclusive `&mut self` borrow). The function is nevertheless declared
    /// `unsafe`; presumably this flags that callers hand the reference to
    /// code whose borrows the compiler cannot fully track, so they must make
    /// sure accesses to the value never overlap.
    unsafe fn get_mut(&mut self) -> &mut T {
        UnsafeCell::get_mut(&mut self.0)
    }
}
/// Sums the values produced by ten data sources.
///
/// Ten scoped generators each yield a growing value (10, 20, 30, ...) after a
/// per-generator sleep. Ten selector closures registered on a `cqueue` drain
/// those generators, add each change to a shared total and signal the poller,
/// which prints the total until a ten-second one-shot timer event arrives.
fn main() {
    let mut sources = Vec::new();
    let mut total = SyncCell::new(0);

    // Event producers: source `idx` sleeps 500 * (idx + 1) ms before each
    // yield, then raises its value by 10 for the next round.
    for idx in 0..10 {
        let source = generator::Gn::new_scoped(move |mut scope| {
            let mut value = 10;
            loop {
                coroutine::sleep(Duration::from_millis(500 * (idx + 1)));
                scope.yield_with(value);
                value += 10;
            }
        });
        sources.push(source);
    }

    // Select body: watches the producers and recalculates the total.
    cqueue::scope(|cqueue| {
        // One selector per producer, registered under the producer's index.
        for slot in 0..10 {
            go!(cqueue, slot, |events| {
                let token = events.get_token();
                let mut previous = 0;
                for reading in sources[token].by_ref() {
                    // Signal the poller first; the statements below are the
                    // "bottom half" which, per the original author's note,
                    // runs sequentially in the poller.
                    events.send(0);
                    let delta = reading - previous;
                    // SAFETY (author's claim, not checkable from this file):
                    // the bottom halves run one after another, so the total
                    // is never accessed concurrently.
                    let total = unsafe { total.get_mut() };
                    println!("in selector: update from {token}, delta={delta}, last_total={total}");
                    *total += delta;
                    previous = reading;
                }
            });
        }

        // One-shot timer under token 9999: fires after ten seconds.
        cqueue_add_oneshot!(cqueue, 9999, _ = coroutine::sleep(Duration::from_secs(10)) => {
            println!("poll time over!");
        });

        // SAFETY (author's claim): same ordering argument as in the selectors.
        let total = unsafe { total.get_mut() };

        // Poll events and print the new total after each one; stop when the
        // timer token shows up or polling returns an error.
        loop {
            match cqueue.poll(None) {
                Ok(ev) if ev.token != 9999 => println!("in poller: total={total}"),
                _ => break,
            }
        }
    });

    println!("done");
}