1use super::subscriber::*;
2use super::message_publisher::*;
3
4use futures::prelude::*;
5use futures::task::{Context, Poll};
6use futures::future::{BoxFuture};
7
8use std::pin::{Pin};
9
10pub struct PublisherSink<Publisher>
14where Publisher: MessagePublisher {
15 publisher: Option<Publisher>,
17
18 future_sender: Option<BoxFuture<'static, MessageSender<Publisher::Message>>>,
20
21 next_sender: Option<MessageSender<Publisher::Message>>,
23
24 future_flush: Option<BoxFuture<'static, ()>>
26}
27
28impl<Publisher> PublisherSink<Publisher>
29where Publisher: MessagePublisher {
30 pub fn as_publisher<'a>(&'a mut self) -> Option<&'a mut Publisher> {
34 self.publisher.as_mut()
35 }
36
37 pub fn subscribe(&mut self) -> Option<Subscriber<Publisher::Message>> {
43 self.publisher.as_mut().map(|publisher| publisher.subscribe())
44 }
45
46 pub fn when_ready(&mut self) -> Option<BoxFuture<'static, MessageSender<Publisher::Message>>> {
50 self.publisher.as_mut().map(|publisher| publisher.when_ready())
51 }
52
53 pub fn when_empty(&mut self) -> Option<BoxFuture<'static, ()>> {
59 self.publisher.as_mut().map(|publisher| publisher.when_empty())
60 }
61
62 pub fn publish(&mut self, message: Publisher::Message) -> Option<BoxFuture<'static, ()>> {
68 self.publisher.as_mut().map(|publisher| publisher.publish(message))
69 }
70}
71
72impl<Publisher> Sink<Publisher::Message> for PublisherSink<Publisher>
73where Publisher: MessagePublisher,
74Self: Unpin {
75 type Error = ();
76
77 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
78 let future_sender = match self.future_sender {
80 Some(ref mut future_sender) => future_sender,
81 None => {
82 self.future_sender = self.when_ready();
83 if self.future_sender.is_none() { return Poll::Ready(Err(())); }
84 self.future_sender.as_mut().unwrap()
85 }
86 };
87
88 match future_sender.poll_unpin(cx) {
90 Poll::Ready(sender) => {
91 self.future_sender = None;
92 self.next_sender = Some(sender);
93 Poll::Ready(Ok(()))
94 },
95
96 Poll::Pending => Poll::Pending
97 }
98 }
99
100 fn start_send(mut self: Pin<&mut Self>, item: Publisher::Message) -> Result<(), Self::Error> {
101 self.next_sender.take().map(move |sender| sender.send(item));
103 Ok(())
104 }
105
106 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
107 let future_flush = match self.future_flush {
109 Some(ref mut future_flush) => future_flush,
110 None => {
111 self.future_flush = self.when_empty();
112 if self.future_flush.is_none() { return Poll::Ready(Err(())); }
113 self.future_flush.as_mut().unwrap()
114 }
115 };
116
117 match future_flush.poll_unpin(cx) {
119 Poll::Ready(_) => {
120 self.future_flush = None;
121 Poll::Ready(Ok(()))
122 },
123
124 Poll::Pending => Poll::Pending
125 }
126 }
127
128 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
129 let future_flush = match self.future_flush {
134 Some(ref mut future_flush) => future_flush,
135 None => {
136 self.future_flush = self.when_empty();
137 if self.future_flush.is_none() { return Poll::Ready(Err(())); }
138 self.future_flush.as_mut().unwrap()
139 }
140 };
141
142 let result = future_flush.poll_unpin(cx);
144
145 self.publisher = None;
147 self.future_sender = None;
148 self.next_sender = None;
149
150 match result {
152 Poll::Ready(_) => {
153 self.future_flush = None;
154 Poll::Ready(Ok(()))
155 },
156
157 Poll::Pending => Poll::Pending
158 }
159 }
160}
161
162pub trait ToPublisherSink : Sized+MessagePublisher {
166 fn to_sink(self) -> PublisherSink<Self>;
170}
171
172impl<Publisher> ToPublisherSink for Publisher
173where Publisher: Sized+MessagePublisher {
174 fn to_sink(self) -> PublisherSink<Self> {
175 PublisherSink {
176 publisher: Some(self),
177 future_sender: None,
178 next_sender: None,
179 future_flush: None,
180 }
181 }
182}