1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use super::message_publisher::*;
use futures::prelude::*;
use futures::future::{BoxFuture};
use futures::task::{Poll, Context};
use std::pin::*;
///
/// The `StreamPublisher` struct sends a stream to a publisher. It implements the `Future` trait, and
/// will return an empty value when the entire stream has been sent to the publisher.
///
pub struct StreamPublisher<'a, Publisher, SourceStream> {
/// The publisher where the stream will be sent to
publisher: &'a mut Publisher,
/// Future that's polled when the publisher is closed
when_closed: BoxFuture<'static, ()>,
/// The stream that is being sent to the publisher
stream: Pin<Box<SourceStream>>,
/// The value that's currently being published to the publisher
currently_publishing: Option<BoxFuture<'static, ()>>
}
impl<'a, Publisher, SourceStream> StreamPublisher<'a, Publisher, SourceStream>
where Publisher: MessagePublisher,
SourceStream: 'a+Stream<Item=Publisher::Message> {
///
/// Creates a new stream publisher
///
pub fn new(publisher: &'a mut Publisher, stream: SourceStream) -> StreamPublisher<'a, Publisher, SourceStream> {
StreamPublisher {
when_closed: publisher.when_closed(),
publisher: publisher,
stream: Box::pin(stream),
currently_publishing: None
}
}
}
impl<'a, Publisher, SourceStream> Future for StreamPublisher<'a, Publisher, SourceStream>
where Publisher: MessagePublisher,
SourceStream: Stream<Item=Publisher::Message> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
// If we're currently trying to publish a value, then poll that future first
let currently_publishing = self.currently_publishing.take();
if let Some(mut currently_publishing) = currently_publishing {
// Poll the value we're currently publishing
match currently_publishing.poll_unpin(context) {
// If still pending, return pending and keep publishing the value
Poll::Pending => { self.currently_publishing = Some(currently_publishing); return Poll::Pending; }
// If ready, carry on and try to read from the stream
Poll::Ready(()) => { }
}
}
// Poll when_closed (we check the flag later so the result of this future doesn't matter right now: we just need to make sure it wakes us)
let _when_closed = self.when_closed.poll_unpin(context);
// Attempt to read a value from the stream
loop {
if self.publisher.is_closed() {
// If the publisher has closed, then immediately complete the future (and stop reading from the stream)
return Poll::Ready(());
}
match self.stream.poll_next_unpin(context) {
Poll::Pending => {
// Stream is waiting to send more data
return Poll::Pending;
}
Poll::Ready(None) => {
// Stream has finished
return Poll::Ready(());
}
Poll::Ready(Some(next_message)) => {
// Start to send a new message
let mut currently_publishing = self.publisher.publish(next_message);
// Try to complete the send immediately
match currently_publishing.poll_unpin(context) {
// If still pending, return pending and keep publishing the value
Poll::Pending => { self.currently_publishing = Some(currently_publishing); return Poll::Pending; }
// If ready, carry on and try to read from the stream
Poll::Ready(()) => { }
}
}
}
}
}
}
///
/// Provides a way to send the values generated by a stream to a publisher
///
pub trait SendStreamToPublisher : Sized+MessagePublisher {
///
/// Sends everything from a particular source stream to this publisher
///
fn send_all<'a, SourceStream>(&'a mut self, stream: SourceStream) -> StreamPublisher<'a, Self, SourceStream>
where SourceStream: 'a+Stream<Item=Self::Message>;
}
impl<T: Sized+MessagePublisher> SendStreamToPublisher for T {
fn send_all<'a, SourceStream>(&'a mut self, stream: SourceStream) -> StreamPublisher<'a, Self, SourceStream>
where SourceStream: 'a+Stream<Item=Self::Message> {
StreamPublisher::new(self, stream)
}
}