1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
use super::subscriber::*;
use super::message_publisher::*;

use futures::prelude::*;
use futures::task::{Context, Poll};
use futures::future::{BoxFuture};

use std::pin::{Pin};

///
/// An implementation of the Sink trait that can be applied to publishers
///
pub struct PublisherSink<Publisher>
where Publisher: MessagePublisher {
    /// The publisher that is being turned into a sink
    publisher: Option<Publisher>,

    /// Future for awaiting the message sender
    future_sender: Option<BoxFuture<'static, MessageSender<Publisher::Message>>>,

    /// The sender returned by poll_ready
    next_sender: Option<MessageSender<Publisher::Message>>,

    /// The future waiting for the publisher to flush
    future_flush: Option<BoxFuture<'static, ()>>
}

impl<Publisher> PublisherSink<Publisher> 
where Publisher: MessagePublisher {
    ///
    /// Provides access to the underlying MessagePublisher for this sink
    ///
    pub fn as_publisher<'a>(&'a mut self) -> Option<&'a mut Publisher> {
        self.publisher.as_mut()
    }

    ///
    /// Creates a subscription to this publisher
    /// 
    /// Any future messages sent here will also be sent to this subscriber. Returns None if the sink has been closed
    /// 
    pub fn subscribe(&mut self) -> Option<Subscriber<Publisher::Message>> {
        self.publisher.as_mut().map(|publisher| publisher.subscribe())
    }

    ///
    /// Reserves a space for a message with the subscribers, returning when it's ready
    ///
    pub fn when_ready(&mut self) -> Option<BoxFuture<'static, MessageSender<Publisher::Message>>> {
        self.publisher.as_mut().map(|publisher| publisher.when_ready())
    }

    ///
    /// Waits until all subscribers have consumed all pending messages
    /// 
    /// Returns None if the sink has been closed.
    ///
    pub fn when_empty(&mut self) -> Option<BoxFuture<'static, ()>> {
        self.publisher.as_mut().map(|publisher| publisher.when_empty())
    }

    ///
    /// Publishes a message to the subscribers of this object 
    /// 
    /// Returns None if the sink has been closed
    ///
    pub fn publish(&mut self, message: Publisher::Message) -> Option<BoxFuture<'static, ()>> {
        self.publisher.as_mut().map(|publisher| publisher.publish(message))
    }
}

impl<Publisher> Sink<Publisher::Message> for PublisherSink<Publisher>
where Publisher: MessagePublisher,
Self: Unpin {
    type Error = ();

    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        // Get or create the future sender (get_or_insert_with won't work here due to the multiple borrow of self)
        let future_sender   = match self.future_sender {
            Some(ref mut future_sender) => future_sender,
            None                        => {
                self.future_sender = self.when_ready();
                if self.future_sender.is_none() { return Poll::Ready(Err(())); }
                self.future_sender.as_mut().unwrap()
            }
        };

        // Poll for the next sender and ready it if possible
        match future_sender.poll_unpin(cx) {
            Poll::Ready(sender) => {
                self.future_sender  = None;
                self.next_sender    = Some(sender);
                Poll::Ready(Ok(()))
            },

            Poll::Pending       => Poll::Pending
        }
    }

    fn start_send(mut self: Pin<&mut Self>, item: Publisher::Message) -> Result<(), Self::Error> {
        // Send to the next sender if one has been prepared by calling poll_ready
        self.next_sender.take().map(move |sender| sender.send(item));
        Ok(())
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        // Get or create the flush future (get_or_insert_with won't work here due to the multiple borrow of self)
        let future_flush    = match self.future_flush {
            Some(ref mut future_flush)  => future_flush,
            None                        => {
                self.future_flush = self.when_empty();
                if self.future_flush.is_none() { return Poll::Ready(Err(())); }
                self.future_flush.as_mut().unwrap()
            }
        };

        // Poll the future for when the publisher is empty
        match future_flush.poll_unpin(cx) {
            Poll::Ready(_)  => {
                self.future_flush  = None;
                Poll::Ready(Ok(()))
            },

            Poll::Pending   => Poll::Pending
        }
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        // This is the same as poll_flush, but the use of Pin prevents us from calling that and unsetting the future
        // The flush future will keep the publisher around until all the subscriber messages are processed

        // Get or create the flush future (get_or_insert_with won't work here due to the multiple borrow of self)
        let future_flush    = match self.future_flush {
            Some(ref mut future_flush)  => future_flush,
            None                        => {
                self.future_flush = self.when_empty();
                if self.future_flush.is_none() { return Poll::Ready(Err(())); }
                self.future_flush.as_mut().unwrap()
            }
        };

        // Need to poll here as we can't set publisher to none while we've borrowed the future_flush future
        let result = future_flush.poll_unpin(cx);

        // Unset the publisher so it's dropped when the flush is done
        self.publisher      = None;
        self.future_sender  = None;
        self.next_sender    = None;

        // Poll the future for when the publisher is empty
        match result {
            Poll::Ready(_)  => {
                self.future_flush  = None;
                Poll::Ready(Ok(()))
            },

            Poll::Pending   => Poll::Pending
        }
    }
}

///
/// Trait that turns publishers into sinks
///
pub trait ToPublisherSink : Sized+MessagePublisher {
    ///
    /// Converts this publisher into a futures Sink
    ///
    fn to_sink(self) -> PublisherSink<Self>;
}

impl<Publisher> ToPublisherSink for Publisher
where Publisher: Sized+MessagePublisher {
    fn to_sink(self) -> PublisherSink<Self> {
        PublisherSink {
            publisher:      Some(self),
            future_sender:  None,
            next_sender:    None,
            future_flush:   None,
        }
    }
}