1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
extern crate generator;
#[macro_use]
extern crate cogo;
use std::time::Duration;
use cogo::{coroutine, cqueue};
// this is wrapper to work around the compile error
// we are safe to share the data in bottom half since we run them orderly
struct SyncCell<T>(T);
impl<T> SyncCell<T> {
unsafe fn get_mut(&mut self) -> &mut T {
&mut *(&self.0 as *const _ as *mut T)
}
}
// sum ten data resources
fn main() {
let mut gv = vec![];
let mut total = SyncCell(0);
// create the event producers
for i in 0..10 {
let g = generator::Gn::new_scoped(move |mut s| {
let mut data = 10;
loop {
coroutine::sleep(Duration::from_millis(500 * (i + 1)));
// println!("coroutine{}: data = {:?}", i, data);
s.yield_with(data);
data += 10;
}
});
gv.push(g);
}
// the select body that monitor the rx event and recalc the new total
cqueue::scope(|cqueue| {
// registe select coroutines
for t in 0..10 {
go!(cqueue, t, |es| {
let mut last = 0;
let token = es.get_token();
while let Some(data) = gv[token].next() {
// =====================================================
es.send(0);
// =====================================================
let delta = data - last;
let total = unsafe { total.get_mut() };
// bottom half that will run sequencially in the poller
println!(
"in selector: update from {}, delta={}, last_total={}",
token, delta, total
);
*total += delta;
last = data;
}
});
}
// register timer for poller
cqueue_add_oneshot!(cqueue, 9999, _ = coroutine::sleep(Duration::from_secs(10)) => {
println!("poll time over!");
});
let total = unsafe { total.get_mut() };
// poll the event
while let Ok(ev) = cqueue.poll(None) {
if ev.token == 9999 {
break;
}
// print the new total
println!("in poller: total={}", total);
}
});
println!("done");
}