1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use crate::releasable::*;
use crate::rope_binding::stream_state::*;
use flo_rope::*;
use futures::task::*;
///
/// The core of a rope binding represents the data that's shared amongst all ropes
///
/// It holds the rope itself, the per-stream state for every stream reading from the
/// rope, and the list of notifiables to mark as changed when the binding changes.
///
pub (super) struct RopeBindingCore<Cell, Attribute>
where
    Cell:       Clone+PartialEq,
    Attribute:  Clone+PartialEq+Default {
    /// The number of items that are using the core
    pub (super) usage_count: usize,

    /// The rope that stores this binding
    ///
    /// (`Fn()` already returns `()`, so the boxed callback type needs no explicit unit return type)
    pub (super) rope: PullRope<AttributedRope<Cell, Attribute>, Box<dyn Fn()+Send+Sync>>,

    /// The states of any streams reading from this rope
    pub (super) stream_states: Vec<RopeStreamState<Cell, Attribute>>,

    /// The next ID to assign to a stream state
    pub (super) next_stream_id: usize,

    /// List of things to call when this binding changes
    pub (super) when_changed: Vec<ReleasableNotifiable>
}
impl<Cell, Attribute> RopeBindingCore<Cell, Attribute>
where
    Cell:       'static+Send+Unpin+Clone+PartialEq,
    Attribute:  'static+Send+Sync+Clone+Unpin+PartialEq+Default {
    ///
    /// If there are any notifiables in this object that aren't in use, remove them
    ///
    pub (super) fn filter_unused_notifications(&mut self) {
        // Keep only the entries that report themselves as still being in use
        self.when_changed.retain(|notifiable| notifiable.is_in_use());
    }

    ///
    /// Wake anything that's listening to the core
    ///
    /// Every notifiable in `when_changed` is marked as changed. Each stream that has a
    /// waker registered has that waker consumed and woken; each stream without one is
    /// flagged with `needs_pull` instead.
    ///
    pub (super) fn wake(&mut self) {
        // Notify anything that's listening
        for notifiable in &self.when_changed {
            notifiable.mark_as_changed();
        }

        // Wake any streams that are waiting for changes to be pulled
        for stream in &mut self.stream_states {
            // Taking the waker leaves `None` behind, so each registered waker is woken at most once
            match stream.waker.take() {
                // Wake the stream so that it pulls the changes
                Some(waker) => waker.wake(),

                // No waker is registered: set the flag so that `wake_stream` wakes the stream
                // immediately the next time it tries to register one
                None        => stream.needs_pull = true
            }
        }
    }

    ///
    /// Callback: the rope has changes to pull
    ///
    /// Discards the notifiables that are no longer in use and then wakes everything
    /// that is still listening.
    ///
    pub (super) fn on_pull(&mut self) {
        // Clear out any notifications that are not being used any more
        self.filter_unused_notifications();

        // Notify anything that's listening
        self.wake();
    }

    ///
    /// Pulls values from the rope and send to all attached streams
    ///
    /// The `needs_pull` flag of every stream is cleared before the changes are read.
    /// If the rope produces no actions, nothing is queued and nothing is woken;
    /// otherwise every stream receives its own copy of the actions in
    /// `pending_changes` and `wake()` is called.
    ///
    pub (super) fn pull_rope(&mut self) {
        // Stop the streams from waking up (no changes pending)
        for stream in &mut self.stream_states {
            stream.needs_pull = false;
        }

        // Collect the actions
        let actions = self.rope.pull_changes().collect::<Vec<_>>();

        // Don't wake anything if there are no actions to perform
        if actions.is_empty() {
            return;
        }

        // Push to each stream (each stream gets its own clone of every action)
        for stream in &mut self.stream_states {
            stream.pending_changes.extend(actions.iter().cloned());
        }

        // Wake all of the streams
        self.wake();
    }

    ///
    /// Sets a stream to wake when the rope changes
    ///
    /// `stream_id` is matched against the `identifier` of each stream state and only the
    /// first match is updated; if no stream has that identifier the call does nothing
    /// and `waker` is dropped without being woken.
    ///
    pub (super) fn wake_stream(&mut self, stream_id: usize, waker: Waker) {
        let matching_state = self.stream_states
            .iter_mut()
            .find(|state| state.identifier == stream_id);

        if let Some(state) = matching_state {
            if state.needs_pull {
                // There are pending values so we should immediately re-awaken the stream
                // Disable the waker in case there's a stale one
                state.waker = None;

                // Wake the stream so it reads the next value
                waker.wake();
            } else {
                // There are no pending values so we should wait for the rope to pull some extra data
                // Wake the stream when there's some more data to receive
                state.waker = Some(waker);
            }
        }
    }
}