1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use crate::rope_binding::core::*;
use flo_rope::*;
use ::desync::*;
use futures::task::*;
use futures::prelude::*;
use futures::future::{BoxFuture};
use std::mem;
use std::pin::*;
use std::sync::*;
use std::collections::{VecDeque};
///
/// A rope stream monitors a rope binding, and supplies them as a stream so they can be mirrored elsewhere
///
/// An example of a use for a rope stream is to send updates from a rope to a user interface.
///
pub struct RopeStream<Cell, Attribute>
where
Cell: 'static + Send + Unpin + Clone + PartialEq,
Attribute: 'static + Send + Sync + Clone + Unpin + PartialEq + Default,
{
/// The identifier for this stream
pub (super) identifier: usize,
/// The core of the rope
pub (super) core: Arc<Desync<RopeBindingCore<Cell, Attribute>>>,
/// A future that will return the next poll result
pub (super) poll_future: Option<BoxFuture<'static, Poll<Option<VecDeque<RopeAction<Cell, Attribute>>>>>>,
/// The actions that are currently being drained through this stream
pub (super) draining: VecDeque<RopeAction<Cell, Attribute>>,
/// Set to true if this should reduce the usage count on the core when it's dropped
pub (super) retains_core: bool
}
impl<Cell, Attribute> Stream for RopeStream<Cell, Attribute>
where
Cell: 'static + Send + Unpin + Clone + PartialEq,
Attribute: 'static + Send + Sync + Clone + Unpin + PartialEq + Default,
{
type Item = RopeAction<Cell,Attribute>;
fn poll_next(mut self: Pin<&mut Self>, ctxt: &mut Context<'_>) -> Poll<Option<RopeAction<Cell, Attribute>>> {
// If we've got a set of actions we're already reading, then return those as fast as we can
if self.draining.len() > 0 {
return Poll::Ready(self.draining.pop_back());
}
// If we're waiting for the core to return to us, borrow the future from there
let poll_future = self.poll_future.take();
let mut poll_future = if let Some(poll_future) = poll_future {
// We're already waiting for the core to get back to us
poll_future
} else {
// Ask the core for the next stream state
let stream_id = self.identifier;
self.core.future_desync(move |core| {
async move {
// Pull any pending changes from the rope
core.pull_rope();
// Find the state of this stream
let stream_state = core.stream_states.iter_mut()
.filter(|state| state.identifier == stream_id)
.nth(0)
.unwrap();
// Check for data
if stream_state.pending_changes.len() > 0 {
// Return the changes to the waiting stream
let mut changes = VecDeque::new();
mem::swap(&mut changes, &mut stream_state.pending_changes);
Poll::Ready(Some(changes))
} else if core.usage_count == 0 {
// No changes, and nothing is using the core any more
Poll::Ready(None)
} else {
// No changes are waiting
Poll::Pending
}
}.boxed()
})
.map(|result| {
// Error would indicate the core had gone away before the request should complete, so we signal this as an end-of-stream event
match result {
Ok(result) => result,
Err(_) => Poll::Ready(None)
}
})
.boxed()
};
// Wake when the rope generates a 'pull' event
let waker = ctxt.waker().clone();
let stream_id = self.identifier;
self.core.desync(move |core| {
core.wake_stream(stream_id, waker);
});
// Ask the future for the latest update on this stream
let future_result = poll_future.poll_unpin(ctxt);
match future_result {
Poll::Ready(Poll::Ready(Some(actions))) => {
if actions.len() == 0 {
// Nothing waiting: need to wait until the rope signals a 'pull' event
Poll::Pending
} else {
// Have some actions ready
self.draining = actions;
Poll::Ready(self.draining.pop_back())
}
}
Poll::Ready(Poll::Ready(None)) => Poll::Ready(None),
Poll::Ready(Poll::Pending) => Poll::Pending,
Poll::Pending => {
// Poll the future again when it notifies
self.poll_future = Some(poll_future);
Poll::Pending
}
}
}
}
impl<Cell, Attribute> Drop for RopeStream<Cell, Attribute>
where
Cell: 'static + Send + Unpin + Clone + PartialEq,
Attribute: 'static + Send + Sync + Clone + Unpin + PartialEq + Default
{
fn drop(&mut self) {
// Remove the stream state when the stream is no more
let dropped_stream_id = self.identifier;
let retains_core = self.retains_core;
self.core.desync(move |core| {
core.stream_states.retain(|state| state.identifier != dropped_stream_id);
if retains_core {
// Core is no longer in use
core.usage_count -= 1;
// Counts as a notification if this is the last binding using this core
if core.usage_count == 0 {
core.pull_rope();
}
}
});
}
}