1use futures_core::ready;
42use futures_sink::Sink;
43use pin_project_lite::pin_project;
44use std::{future::Future, pin::Pin, task::{Context, Poll}};
45
46pub fn make_sink<S, F, T, A, E>(init: S, f: F) -> SinkImpl<S, F, T, A, E>
53where
54 F: FnMut(S, Action<A>) -> T,
55 T: Future<Output = Result<S, E>>,
56{
57 SinkImpl {
58 lambda: f,
59 future: None,
60 param: Some(init),
61 state: State::Empty,
62 _mark: std::marker::PhantomData
63 }
64}
65
66#[derive(Clone, Debug, PartialEq, Eq)]
72pub enum Action<A> {
73 Send(A),
76 Flush,
79 Close
82}
83
84#[derive(Debug, PartialEq, Eq)]
86enum State {
87 Empty,
89 Sending,
91 Flushing,
93 Closing,
95 Closed,
97 Failed
99}
100
101pin_project!
102{
103 #[derive(Debug)]
105 pub struct SinkImpl<S, F, T, A, E> {
106 lambda: F,
107 #[pin] future: Option<T>,
108 param: Option<S>,
109 state: State,
110 _mark: std::marker::PhantomData<(A, E)>
111 }
112}
113
114impl<S, F, T, A, E> Sink<A> for SinkImpl<S, F, T, A, E>
115where
116 F: FnMut(S, Action<A>) -> T,
117 T: Future<Output = Result<S, E>>
118{
119 type Error = E;
120
121 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
122 let mut this = self.project();
123 match this.state {
124 State::Sending | State::Flushing => {
125 match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
126 Ok(p) => {
127 this.future.set(None);
128 *this.param = Some(p);
129 *this.state = State::Empty;
130 Poll::Ready(Ok(()))
131 }
132 Err(e) => {
133 this.future.set(None);
134 *this.state = State::Failed;
135 Poll::Ready(Err(e))
136 }
137 }
138 }
139 State::Closing => {
140 match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
141 Ok(_) => {
142 this.future.set(None);
143 *this.state = State::Closed;
144 panic!("SinkImpl::poll_ready called on a closing sink.")
145 }
146 Err(e) => {
147 this.future.set(None);
148 *this.state = State::Failed;
149 Poll::Ready(Err(e))
150 }
151 }
152 }
153 State::Empty => {
154 assert!(this.param.is_some());
155 Poll::Ready(Ok(()))
156 }
157 State::Closed => panic!("SinkImpl::poll_ready called on a closed sink."),
158 State::Failed => panic!("SinkImpl::poll_ready called after error.")
159 }
160 }
161
162 fn start_send(self: Pin<&mut Self>, item: A) -> Result<(), Self::Error> {
163 assert_eq!(State::Empty, self.state);
164 let mut this = self.project();
165 let param = this.param.take().unwrap();
166 let future = (this.lambda)(param, Action::Send(item));
167 this.future.set(Some(future));
168 *this.state = State::Sending;
169 Ok(())
170 }
171
172 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
173 loop {
174 let mut this = self.as_mut().project();
175 match this.state {
176 State::Empty =>
177 if let Some(p) = this.param.take() {
178 let future = (this.lambda)(p, Action::Flush);
179 this.future.set(Some(future));
180 *this.state = State::Flushing
181 } else {
182 return Poll::Ready(Ok(()))
183 }
184 State::Sending =>
185 match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
186 Ok(p) => {
187 this.future.set(None);
188 *this.param = Some(p);
189 *this.state = State::Empty
190 }
191 Err(e) => {
192 this.future.set(None);
193 *this.state = State::Failed;
194 return Poll::Ready(Err(e))
195 }
196 }
197 State::Flushing =>
198 match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
199 Ok(p) => {
200 this.future.set(None);
201 *this.param = Some(p);
202 *this.state = State::Empty;
203 return Poll::Ready(Ok(()))
204 }
205 Err(e) => {
206 this.future.set(None);
207 *this.state = State::Failed;
208 return Poll::Ready(Err(e))
209 }
210 }
211 State::Closing =>
212 match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
213 Ok(_) => {
214 this.future.set(None);
215 *this.state = State::Closed;
216 return Poll::Ready(Ok(()))
217 }
218 Err(e) => {
219 this.future.set(None);
220 *this.state = State::Failed;
221 return Poll::Ready(Err(e))
222 }
223 }
224 State::Closed => return Poll::Ready(Ok(())),
225 State::Failed => panic!("SinkImpl::poll_flush called after error.")
226 }
227 }
228 }
229
230 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
231 loop {
232 let mut this = self.as_mut().project();
233 match this.state {
234 State::Empty =>
235 if let Some(p) = this.param.take() {
236 let future = (this.lambda)(p, Action::Close);
237 this.future.set(Some(future));
238 *this.state = State::Closing;
239 } else {
240 return Poll::Ready(Ok(()))
241 }
242 State::Sending =>
243 match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
244 Ok(p) => {
245 this.future.set(None);
246 *this.param = Some(p);
247 *this.state = State::Empty
248 }
249 Err(e) => {
250 this.future.set(None);
251 *this.state = State::Failed;
252 return Poll::Ready(Err(e))
253 }
254 }
255 State::Flushing =>
256 match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
257 Ok(p) => {
258 this.future.set(None);
259 *this.param = Some(p);
260 *this.state = State::Empty
261 }
262 Err(e) => {
263 this.future.set(None);
264 *this.state = State::Failed;
265 return Poll::Ready(Err(e))
266 }
267 }
268 State::Closing =>
269 match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
270 Ok(_) => {
271 this.future.set(None);
272 *this.state = State::Closed;
273 return Poll::Ready(Ok(()))
274 }
275 Err(e) => {
276 this.future.set(None);
277 *this.state = State::Failed;
278 return Poll::Ready(Err(e))
279 }
280 }
281 State::Closed => return Poll::Ready(Ok(())),
282 State::Failed => panic!("SinkImpl::poll_closed called after error.")
283 }
284 }
285 }
286}
287
288#[cfg(test)]
289mod tests {
290 use async_std::{io, task};
291 use futures::{channel::mpsc, prelude::*, stream};
292 use crate::{Action, make_sink};
293
294 #[test]
295 fn smoke_test() {
296 task::block_on(async {
297 let sink = make_sink(io::stdout(), |mut stdout, action| async move {
298 match action {
299 Action::Send(x) => stdout.write_all(x).await?,
300 Action::Flush => stdout.flush().await?,
301 Action::Close => stdout.close().await?
302 }
303 Ok::<_, io::Error>(stdout)
304 });
305
306 let values = vec![Ok(&b"hello\n"[..]), Ok(&b"world\n"[..])];
307 assert!(stream::iter(values).forward(sink).await.is_ok())
308 })
309 }
310
311 #[test]
312 fn replay() {
313 task::block_on(async {
314 let (tx, rx) = mpsc::unbounded();
315
316 let sink = make_sink(tx, |mut tx, action| async move {
317 tx.send(action.clone()).await?;
318 if action == Action::Close {
319 tx.close().await?
320 }
321 Ok::<_, mpsc::SendError>(tx)
322 });
323
324 futures::pin_mut!(sink);
325
326 let expected = [
327 Action::Send("hello\n"),
328 Action::Flush,
329 Action::Send("world\n"),
330 Action::Flush,
331 Action::Close
332 ];
333
334 for &item in &["hello\n", "world\n"] {
335 sink.send(item).await.unwrap()
336 }
337
338 sink.close().await.unwrap();
339
340 let actual = rx.collect::<Vec<_>>().await;
341
342 assert_eq!(&expected[..], &actual[..])
343 });
344 }
345}