1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use super::message_publisher::*;
use futures::prelude::*;
use futures::future::{BoxFuture};
use futures::task::{Poll, Context};
use std::pin::*;
pub struct StreamPublisher<'a, Publisher, SourceStream> {
publisher: &'a mut Publisher,
when_closed: BoxFuture<'static, ()>,
stream: Pin<Box<SourceStream>>,
currently_publishing: Option<BoxFuture<'static, ()>>
}
impl<'a, Publisher, SourceStream> StreamPublisher<'a, Publisher, SourceStream>
where Publisher: MessagePublisher,
SourceStream: 'a+Stream<Item=Publisher::Message> {
pub fn new(publisher: &'a mut Publisher, stream: SourceStream) -> StreamPublisher<'a, Publisher, SourceStream> {
StreamPublisher {
when_closed: publisher.when_closed(),
publisher: publisher,
stream: Box::pin(stream),
currently_publishing: None
}
}
}
impl<'a, Publisher, SourceStream> Future for StreamPublisher<'a, Publisher, SourceStream>
where Publisher: MessagePublisher,
SourceStream: Stream<Item=Publisher::Message> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
let currently_publishing = self.currently_publishing.take();
if let Some(mut currently_publishing) = currently_publishing {
match currently_publishing.poll_unpin(context) {
Poll::Pending => { self.currently_publishing = Some(currently_publishing); return Poll::Pending; }
Poll::Ready(()) => { }
}
}
let _when_closed = self.when_closed.poll_unpin(context);
loop {
if self.publisher.is_closed() {
return Poll::Ready(());
}
match self.stream.poll_next_unpin(context) {
Poll::Pending => {
return Poll::Pending;
}
Poll::Ready(None) => {
return Poll::Ready(());
}
Poll::Ready(Some(next_message)) => {
let mut currently_publishing = self.publisher.publish(next_message);
match currently_publishing.poll_unpin(context) {
Poll::Pending => { self.currently_publishing = Some(currently_publishing); return Poll::Pending; }
Poll::Ready(()) => { }
}
}
}
}
}
}
pub trait SendStreamToPublisher : Sized+MessagePublisher {
fn send_all<'a, SourceStream>(&'a mut self, stream: SourceStream) -> StreamPublisher<'a, Self, SourceStream>
where SourceStream: 'a+Stream<Item=Self::Message>;
}
impl<T: Sized+MessagePublisher> SendStreamToPublisher for T {
fn send_all<'a, SourceStream>(&'a mut self, stream: SourceStream) -> StreamPublisher<'a, Self, SourceStream>
where SourceStream: 'a+Stream<Item=Self::Message> {
StreamPublisher::new(self, stream)
}
}