flo_stream/
stream_publisher.rs1use super::message_publisher::*;
2
3use futures::prelude::*;
4use futures::future::{BoxFuture};
5use futures::task::{Poll, Context};
6
7use std::pin::*;
8
9pub struct StreamPublisher<'a, Publisher, SourceStream> {
14 publisher: &'a mut Publisher,
16
17 when_closed: BoxFuture<'static, ()>,
19
20 stream: Pin<Box<SourceStream>>,
22
23 currently_publishing: Option<BoxFuture<'static, ()>>
25}
26
27impl<'a, Publisher, SourceStream> StreamPublisher<'a, Publisher, SourceStream>
28where Publisher: MessagePublisher,
29 SourceStream: 'a+Stream<Item=Publisher::Message> {
30 pub fn new(publisher: &'a mut Publisher, stream: SourceStream) -> StreamPublisher<'a, Publisher, SourceStream> {
34 StreamPublisher {
35 when_closed: publisher.when_closed(),
36 publisher: publisher,
37 stream: Box::pin(stream),
38 currently_publishing: None
39 }
40 }
41}
42
43impl<'a, Publisher, SourceStream> Future for StreamPublisher<'a, Publisher, SourceStream>
44where Publisher: MessagePublisher,
45 SourceStream: Stream<Item=Publisher::Message> {
46 type Output = ();
47
48 fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
49 let currently_publishing = self.currently_publishing.take();
51
52 if let Some(mut currently_publishing) = currently_publishing {
53 match currently_publishing.poll_unpin(context) {
55 Poll::Pending => { self.currently_publishing = Some(currently_publishing); return Poll::Pending; }
57
58 Poll::Ready(()) => { }
60 }
61 }
62
63 let _when_closed = self.when_closed.poll_unpin(context);
65
66 loop {
68 if self.publisher.is_closed() {
69 return Poll::Ready(());
71 }
72
73 match self.stream.poll_next_unpin(context) {
74 Poll::Pending => {
75 return Poll::Pending;
77 }
78
79 Poll::Ready(None) => {
80 return Poll::Ready(());
82 }
83
84 Poll::Ready(Some(next_message)) => {
85 let mut currently_publishing = self.publisher.publish(next_message);
87
88 match currently_publishing.poll_unpin(context) {
90 Poll::Pending => { self.currently_publishing = Some(currently_publishing); return Poll::Pending; }
92
93 Poll::Ready(()) => { }
95 }
96 }
97 }
98 }
99 }
100}
101
102pub trait SendStreamToPublisher : Sized+MessagePublisher {
106 fn send_all<'a, SourceStream>(&'a mut self, stream: SourceStream) -> StreamPublisher<'a, Self, SourceStream>
110 where SourceStream: 'a+Stream<Item=Self::Message>;
111}
112
113impl<T: Sized+MessagePublisher> SendStreamToPublisher for T {
114 fn send_all<'a, SourceStream>(&'a mut self, stream: SourceStream) -> StreamPublisher<'a, Self, SourceStream>
115 where SourceStream: 'a+Stream<Item=Self::Message> {
116 StreamPublisher::new(self, stream)
117 }
118}