flo_stream/
stream_publisher.rs

1use super::message_publisher::*;
2
3use futures::prelude::*;
4use futures::future::{BoxFuture};
5use futures::task::{Poll, Context};
6
7use std::pin::*;
8
9///
10/// The `StreamPublisher` struct sends a stream to a publisher. It implements the `Future` trait, and
11/// will return an empty value when the entire stream has been sent to the publisher.
12///
13pub struct StreamPublisher<'a, Publisher, SourceStream> {
14    /// The publisher where the stream will be sent to
15    publisher: &'a mut Publisher,
16
17    /// Future that's polled when the publisher is closed
18    when_closed: BoxFuture<'static, ()>,
19
20    /// The stream that is being sent to the publisher
21    stream: Pin<Box<SourceStream>>,
22
23    /// The value that's currently being published to the publisher
24    currently_publishing: Option<BoxFuture<'static, ()>>
25}
26
27impl<'a, Publisher, SourceStream> StreamPublisher<'a, Publisher, SourceStream>
28where   Publisher:      MessagePublisher,
29        SourceStream:   'a+Stream<Item=Publisher::Message> {
30    ///
31    /// Creates a new stream publisher
32    ///
33    pub fn new(publisher: &'a mut Publisher, stream: SourceStream) -> StreamPublisher<'a, Publisher, SourceStream> {
34        StreamPublisher {
35            when_closed:            publisher.when_closed(),
36            publisher:              publisher,
37            stream:                 Box::pin(stream),
38            currently_publishing:   None
39        }
40    }
41}
42
43impl<'a, Publisher, SourceStream> Future for StreamPublisher<'a, Publisher, SourceStream>
44where   Publisher:      MessagePublisher,
45        SourceStream:   Stream<Item=Publisher::Message> {
46    type Output = ();
47
48    fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
49        // If we're currently trying to publish a value, then poll that future first
50        let currently_publishing = self.currently_publishing.take();
51
52        if let Some(mut currently_publishing) = currently_publishing {
53            // Poll the value we're currently publishing
54            match currently_publishing.poll_unpin(context) {
55                // If still pending, return pending and keep publishing the value
56                Poll::Pending   => { self.currently_publishing = Some(currently_publishing); return Poll::Pending; }
57
58                // If ready, carry on and try to read from the stream
59                Poll::Ready(()) => { }
60            }
61        }
62
63        // Poll when_closed (we check the flag later so the result of this future doesn't matter right now: we just need to make sure it wakes us)
64        let _when_closed = self.when_closed.poll_unpin(context);
65
66        // Attempt to read a value from the stream
67        loop {
68            if self.publisher.is_closed() {
69                // If the publisher has closed, then immediately complete the future (and stop reading from the stream)
70                return Poll::Ready(());
71            }
72
73            match self.stream.poll_next_unpin(context) {
74                Poll::Pending => {
75                    // Stream is waiting to send more data
76                    return Poll::Pending;
77                }
78
79                Poll::Ready(None) => {
80                    // Stream has finished
81                    return Poll::Ready(());
82                }
83
84                Poll::Ready(Some(next_message)) => {
85                    // Start to send a new message
86                    let mut currently_publishing = self.publisher.publish(next_message);
87
88                    // Try to complete the send immediately
89                    match currently_publishing.poll_unpin(context) {
90                        // If still pending, return pending and keep publishing the value
91                        Poll::Pending   => { self.currently_publishing = Some(currently_publishing); return Poll::Pending; }
92
93                        // If ready, carry on and try to read from the stream
94                        Poll::Ready(()) => { }
95                    }
96                }
97            }
98        }
99    }
100}
101
102///
103/// Provides a way to send the values generated by a stream to a publisher
104///
105pub trait SendStreamToPublisher : Sized+MessagePublisher {
106    ///
107    /// Sends everything from a particular source stream to this publisher
108    ///
109    fn send_all<'a, SourceStream>(&'a mut self, stream: SourceStream) -> StreamPublisher<'a, Self, SourceStream>
110    where SourceStream: 'a+Stream<Item=Self::Message>;
111}
112
113impl<T: Sized+MessagePublisher> SendStreamToPublisher for T {
114    fn send_all<'a, SourceStream>(&'a mut self, stream: SourceStream) -> StreamPublisher<'a, Self, SourceStream>
115    where SourceStream: 'a+Stream<Item=Self::Message> {
116        StreamPublisher::new(self, stream)
117    }
118}