1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use super::core::*;
use super::event::*;
use super::super::controller::*;
use desync::*;
use futures::*;
use futures::task;
use std::sync::*;
///
/// A futures `Sink` for a UI session: batches of `UiEvent`s sent here are queued
/// on the session core and dispatched there against the controller.
///
/// Each batch is given a sequential ID when it is submitted. `poll_complete` compares
/// the latest assigned ID with the latest retired ID to decide whether any work is
/// still outstanding.
///
pub struct UiEventSink {
/// Controller handed to the core's `dispatch_event` along with each batch of events
controller: Arc<Controller>,
/// The session core; event dispatch is queued on it as asynchronous jobs
core: Arc<Desync<UiSessionCore>>,
/// ID assigned to the most recently submitted batch (0 until the first `start_send`)
last_event: Mutex<usize>,
/// Highest ID among the batches whose dispatch has finished. Held in an `Arc` so the
/// jobs queued on the core can update it after the `start_send` call has returned
last_finished_event: Arc<Mutex<usize>>
}
impl UiEventSink {
///
/// Creates a new event sink
///
pub fn new<CoreController: 'static+Controller>(controller: Arc<CoreController>, core: Arc<Desync<UiSessionCore>>) -> UiEventSink {
UiEventSink {
controller: controller,
core: core,
last_event: Mutex::new(0),
last_finished_event: Arc::new(Mutex::new(0))
}
}
}
impl Sink for UiEventSink {
    type SinkItem = Vec<UiEvent>;
    type SinkError = ();

    ///
    /// Queues a batch of events for dispatch on the core.
    ///
    /// The batch is given the next sequential event ID, and a job is queued on the
    /// core that dispatches it and then records that ID as retired. The item is
    /// always accepted: this returns `Ok(AsyncSink::Ready)` without waiting for the
    /// dispatch to happen.
    ///
    fn start_send(&mut self, item: Vec<UiEvent>) -> StartSend<Vec<UiEvent>, ()> {
        // Take the next ID; the counter lock is only held for the increment
        let event_id = {
            let mut counter = self.last_event.lock().unwrap();
            *counter += 1;
            *counter
        };

        // The queued job outlives this call, so it needs handles of its own
        let retired_marker  = Arc::clone(&self.last_finished_event);
        let controller      = Arc::clone(&self.controller);

        // The core does the actual work of the sink
        self.core.async(move |core| {
            core.dispatch_event(item, &*controller);

            // Mark this batch as retired; the marker only ever moves forwards
            let mut retired = retired_marker.lock().unwrap();
            *retired = (*retired).max(event_id);
        });

        Ok(AsyncSink::Ready)
    }

    ///
    /// Reports whether every batch submitted so far has been dispatched.
    ///
    /// Returns `Ready` when the latest assigned event ID equals the latest retired
    /// one. Otherwise it queues a job on the core that notifies the current task
    /// and returns `NotReady`.
    ///
    fn poll_complete(&mut self) -> Poll<(), ()> {
        let dispatched  = *self.last_event.lock().unwrap();
        let retired     = *self.last_finished_event.lock().unwrap();

        if dispatched != retired {
            // Work is still queued or running: arrange a wake-up from the core.
            // This relies on the core running queued jobs in submission order, so the
            // notification fires after the batches queued so far have been retired
            // (Desync behaviour, not visible in this file). Batches submitted in the
            // meantime can make the next poll return NotReady again.
            let waiting_task = task::current();
            self.core.async(move |_| {
                waiting_task.notify();
            });

            return Ok(Async::NotReady);
        }

        // Nothing outstanding
        Ok(Async::Ready(()))
    }
}