flo_stream/
sink.rs

1use super::subscriber::*;
2use super::message_publisher::*;
3
4use futures::prelude::*;
5use futures::task::{Context, Poll};
6use futures::future::{BoxFuture};
7
8use std::pin::{Pin};
9
10///
11/// An implementation of the Sink trait that can be applied to publishers
12///
13pub struct PublisherSink<Publisher>
14where Publisher: MessagePublisher {
15    /// The publisher that is being turned into a sink
16    publisher: Option<Publisher>,
17
18    /// Future for awaiting the message sender
19    future_sender: Option<BoxFuture<'static, MessageSender<Publisher::Message>>>,
20
21    /// The sender returned by poll_ready
22    next_sender: Option<MessageSender<Publisher::Message>>,
23
24    /// The future waiting for the publisher to flush
25    future_flush: Option<BoxFuture<'static, ()>>
26}
27
28impl<Publisher> PublisherSink<Publisher> 
29where Publisher: MessagePublisher {
30    ///
31    /// Provides access to the underlying MessagePublisher for this sink
32    ///
33    pub fn as_publisher<'a>(&'a mut self) -> Option<&'a mut Publisher> {
34        self.publisher.as_mut()
35    }
36
37    ///
38    /// Creates a subscription to this publisher
39    /// 
40    /// Any future messages sent here will also be sent to this subscriber. Returns None if the sink has been closed
41    /// 
42    pub fn subscribe(&mut self) -> Option<Subscriber<Publisher::Message>> {
43        self.publisher.as_mut().map(|publisher| publisher.subscribe())
44    }
45
46    ///
47    /// Reserves a space for a message with the subscribers, returning when it's ready
48    ///
49    pub fn when_ready(&mut self) -> Option<BoxFuture<'static, MessageSender<Publisher::Message>>> {
50        self.publisher.as_mut().map(|publisher| publisher.when_ready())
51    }
52
53    ///
54    /// Waits until all subscribers have consumed all pending messages
55    /// 
56    /// Returns None if the sink has been closed.
57    ///
58    pub fn when_empty(&mut self) -> Option<BoxFuture<'static, ()>> {
59        self.publisher.as_mut().map(|publisher| publisher.when_empty())
60    }
61
62    ///
63    /// Publishes a message to the subscribers of this object 
64    /// 
65    /// Returns None if the sink has been closed
66    ///
67    pub fn publish(&mut self, message: Publisher::Message) -> Option<BoxFuture<'static, ()>> {
68        self.publisher.as_mut().map(|publisher| publisher.publish(message))
69    }
70}
71
72impl<Publisher> Sink<Publisher::Message> for PublisherSink<Publisher>
73where Publisher: MessagePublisher,
74Self: Unpin {
75    type Error = ();
76
77    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
78        // Get or create the future sender (get_or_insert_with won't work here due to the multiple borrow of self)
79        let future_sender   = match self.future_sender {
80            Some(ref mut future_sender) => future_sender,
81            None                        => {
82                self.future_sender = self.when_ready();
83                if self.future_sender.is_none() { return Poll::Ready(Err(())); }
84                self.future_sender.as_mut().unwrap()
85            }
86        };
87
88        // Poll for the next sender and ready it if possible
89        match future_sender.poll_unpin(cx) {
90            Poll::Ready(sender) => {
91                self.future_sender  = None;
92                self.next_sender    = Some(sender);
93                Poll::Ready(Ok(()))
94            },
95
96            Poll::Pending       => Poll::Pending
97        }
98    }
99
100    fn start_send(mut self: Pin<&mut Self>, item: Publisher::Message) -> Result<(), Self::Error> {
101        // Send to the next sender if one has been prepared by calling poll_ready
102        self.next_sender.take().map(move |sender| sender.send(item));
103        Ok(())
104    }
105
106    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
107        // Get or create the flush future (get_or_insert_with won't work here due to the multiple borrow of self)
108        let future_flush    = match self.future_flush {
109            Some(ref mut future_flush)  => future_flush,
110            None                        => {
111                self.future_flush = self.when_empty();
112                if self.future_flush.is_none() { return Poll::Ready(Err(())); }
113                self.future_flush.as_mut().unwrap()
114            }
115        };
116
117        // Poll the future for when the publisher is empty
118        match future_flush.poll_unpin(cx) {
119            Poll::Ready(_)  => {
120                self.future_flush  = None;
121                Poll::Ready(Ok(()))
122            },
123
124            Poll::Pending   => Poll::Pending
125        }
126    }
127
128    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
129        // This is the same as poll_flush, but the use of Pin prevents us from calling that and unsetting the future
130        // The flush future will keep the publisher around until all the subscriber messages are processed
131
132        // Get or create the flush future (get_or_insert_with won't work here due to the multiple borrow of self)
133        let future_flush    = match self.future_flush {
134            Some(ref mut future_flush)  => future_flush,
135            None                        => {
136                self.future_flush = self.when_empty();
137                if self.future_flush.is_none() { return Poll::Ready(Err(())); }
138                self.future_flush.as_mut().unwrap()
139            }
140        };
141
142        // Need to poll here as we can't set publisher to none while we've borrowed the future_flush future
143        let result = future_flush.poll_unpin(cx);
144
145        // Unset the publisher so it's dropped when the flush is done
146        self.publisher      = None;
147        self.future_sender  = None;
148        self.next_sender    = None;
149
150        // Poll the future for when the publisher is empty
151        match result {
152            Poll::Ready(_)  => {
153                self.future_flush  = None;
154                Poll::Ready(Ok(()))
155            },
156
157            Poll::Pending   => Poll::Pending
158        }
159    }
160}
161
162///
163/// Trait that turns publishers into sinks
164///
165pub trait ToPublisherSink : Sized+MessagePublisher {
166    ///
167    /// Converts this publisher into a futures Sink
168    ///
169    fn to_sink(self) -> PublisherSink<Self>;
170}
171
172impl<Publisher> ToPublisherSink for Publisher
173where Publisher: Sized+MessagePublisher {
174    fn to_sink(self) -> PublisherSink<Self> {
175        PublisherSink {
176            publisher:      Some(self),
177            future_sender:  None,
178            next_sender:    None,
179            future_flush:   None,
180        }
181    }
182}