1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
use crate::rope_binding::core::*;

use flo_rope::*;
use ::desync::*;
use futures::task::*;
use futures::prelude::*;
use futures::future::{BoxFuture};

use std::mem;
use std::pin::*;
use std::sync::*;
use std::collections::{VecDeque};

///
/// A stream of the `RopeAction`s generated by a rope binding
///
/// Each stream is known to the shared binding core by its `identifier`: polling the stream asks the core for
/// the changes that are pending for that identifier, and the resulting actions are then handed out one at a
/// time. A typical use is mirroring the edits made to a rope somewhere else, such as in a user interface.
///
pub struct RopeStream<Cell, Attribute>
where
    Cell:       'static + Send + Unpin + Clone + PartialEq,
    Attribute:  'static + Send + Sync + Clone + Unpin + PartialEq + Default,
{
    /// Identifies this stream's entry in the core's list of stream states
    pub(super) identifier: usize,

    /// The binding core that this stream reads its changes from
    pub(super) core: Arc<Desync<RopeBindingCore<Cell, Attribute>>>,

    /// A request to the core that has been started but had not finished when the stream was last polled
    /// (`None` when no request is outstanding)
    pub(super) poll_future: Option<BoxFuture<'static, Poll<Option<VecDeque<RopeAction<Cell, Attribute>>>>>>,

    /// Actions already retrieved from the core that are still waiting to be returned by the stream
    pub(super) draining: VecDeque<RopeAction<Cell, Attribute>>,
}

impl<Cell, Attribute> Stream for RopeStream<Cell, Attribute>
where 
Cell:       'static+Send+Unpin+Clone+PartialEq,
Attribute:  'static+Send+Sync+Clone+Unpin+PartialEq+Default {
    type Item = RopeAction<Cell,Attribute>;

    fn poll_next(mut self: Pin<&mut Self>, ctxt: &mut Context<'_>) -> Poll<Option<RopeAction<Cell, Attribute>>> { 
        // If we've got a set of actions we're already reading, then return those as fast as we can
        if self.draining.len() > 0 {
            return Poll::Ready(self.draining.pop_back());
        }

        // If we're waiting for the core to return to us, borrow the future from there
        let poll_future     = self.poll_future.take();
        let mut poll_future = if let Some(poll_future) = poll_future {
            // We're already waiting for the core to get back to us
            poll_future
        } else {
            // Ask the core for the next stream state
            let stream_id = self.identifier;

            self.core.future_desync(move |core| {
                async move {
                    // Pull any pending changes from the rope
                    core.pull_rope();

                    // Find the state of this stream
                    let stream_state = core.stream_states.iter_mut()
                        .filter(|state| state.identifier == stream_id)
                        .nth(0)
                        .unwrap();

                    // Check for data
                    if stream_state.pending_changes.len() > 0 {
                        // Return the changes to the waiting stream
                        let mut changes = VecDeque::new();
                        mem::swap(&mut changes, &mut stream_state.pending_changes);

                        Poll::Ready(Some(changes))
                    } else if core.usage_count == 0 {
                        // No changes, and nothing is using the core any more
                        Poll::Ready(None)
                    } else {
                        // No changes are waiting
                        Poll::Pending
                    }
                }.boxed()
            })
            .map(|result| {
                // Error would indicate the core had gone away before the request should complete, so we signal this as an end-of-stream event
                match result {
                    Ok(result)  => result,
                    Err(_)      => Poll::Ready(None)
                }
            })
            .boxed()
        };

        // Ask the future for the latest update on this stream
        let future_result = poll_future.poll_unpin(ctxt);

        match future_result {
            Poll::Ready(Poll::Ready(Some(actions))) => {
                if actions.len() == 0 {
                    // Nothing waiting: need to wait until the rope signals a 'pull' event
                    let waker       = ctxt.waker().clone();
                    let stream_id   = self.identifier;

                    self.core.desync(move |core| {
                        core.wake_stream(stream_id, waker);
                    });

                    Poll::Pending
                } else {
                    // Have some actions ready
                    self.draining = actions;
                    Poll::Ready(self.draining.pop_back())
                }
            }

            Poll::Ready(Poll::Ready(None))  => Poll::Ready(None),
            Poll::Ready(Poll::Pending)      => {
                // Wake when the rope generates a 'pull' event
                let waker       = ctxt.waker().clone();
                let stream_id   = self.identifier;

                self.core.desync(move |core| {
                    core.wake_stream(stream_id, waker);
                });

                Poll::Pending
            }

            Poll::Pending                   => {
                // Poll the future again when it notifies
                self.poll_future = Some(poll_future);
                Poll::Pending
            }
        }
    }
}

impl<Cell, Attribute> Drop for RopeStream<Cell, Attribute>
where
    Cell:       'static + Send + Unpin + Clone + PartialEq,
    Attribute:  'static + Send + Sync + Clone + Unpin + PartialEq + Default,
{
    ///
    /// Asks the core to discard the state it holds for this stream
    ///
    fn drop(&mut self) {
        // Copy the identifier so the closure doesn't need to borrow the stream that is being dropped
        let stream_id = self.identifier;

        self.core.desync(move |core| {
            // Keep every stream state except the ones belonging to this stream
            core.stream_states.retain(|state| state.identifier != stream_id);
        });
    }
}