quicksink/
lib.rs

1// Copyright (c) 2019-2020 Parity Technologies (UK) Ltd.
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9//! Create a [`Sink`] implementation from an initial value and a closure
10//! returning a [`Future`].
11//!
12//! This is very similar to how `futures::stream::unfold` creates a `Stream`
13//! implementation from a seed value and a future-returning closure.
14//!
15//! # Examples
16//!
17//! ```no_run
18//! use async_std::io;
19//! use futures::prelude::*;
20//! use quicksink::Action;
21//!
22//! quicksink::make_sink(io::stdout(), |mut stdout, action| async move {
23//!     match action {
24//!         Action::Send(x) => stdout.write_all(x).await?,
25//!         Action::Flush => stdout.flush().await?,
26//!         Action::Close => stdout.close().await?
27//!     }
28//!     Ok::<_, io::Error>(stdout)
29//! });
30//! ```
31//!
32//! # Panics
33//!
34//! - If any of the [`Sink`] methods produce an error, the sink transitions
35//! to a failure state and none of its methods must be called afterwards or
36//! else a panic will occur.
37//! - If [`Sink::poll_close`] has been called, no other sink method must be
38//! called afterwards or else a panic will be caused.
39//!
40
41use futures_core::ready;
42use futures_sink::Sink;
43use pin_project_lite::pin_project;
44use std::{future::Future, pin::Pin, task::{Context, Poll}};
45
46/// Returns a `Sink` impl based on the initial value and the given closure.
47///
48/// The closure will be applied to the initial value and an [`Action`] that
49/// informs it about the action it should perform. The returned [`Future`]
50/// will resolve to another value and the process starts over using this
51/// output.
52pub fn make_sink<S, F, T, A, E>(init: S, f: F) -> SinkImpl<S, F, T, A, E>
53where
54    F: FnMut(S, Action<A>) -> T,
55    T: Future<Output = Result<S, E>>,
56{
57    SinkImpl {
58        lambda: f,
59        future: None,
60        param: Some(init),
61        state: State::Empty,
62        _mark: std::marker::PhantomData
63    }
64}
65
/// The command handed to the closure, telling it which operation to perform.
///
/// Presumably the closure encapsulates a resource to perform I/O. Each
/// variant corresponds to one method of the [`Sink`] trait, which gives the
/// closure enough information to decide what to do with that resource.
///
/// `Hash` is derived alongside `Eq` so that actions can be stored in hashed
/// collections whenever the payload type `A` is hashable.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Action<A> {
    /// Send the given value.
    ///
    /// Corresponds to [`Sink::start_send`].
    Send(A),
    /// Flush the resource.
    ///
    /// Corresponds to [`Sink::poll_flush`].
    Flush,
    /// Close the resource.
    ///
    /// Corresponds to [`Sink::poll_close`].
    Close,
}
83
/// Life-cycle stages of a `SinkImpl`.
#[derive(Debug, Eq, PartialEq)]
enum State {
    /// Idle: no action is in flight.
    Empty,
    /// A send action is in flight.
    Sending,
    /// A flush action is in flight.
    Flushing,
    /// A close action is in flight.
    Closing,
    /// The close action completed (terminal state).
    Closed,
    /// An action reported an error (terminal state).
    Failed,
}
100
101pin_project!
102{
103    /// `SinkImpl` implements the `Sink` trait.
104    #[derive(Debug)]
105    pub struct SinkImpl<S, F, T, A, E> {
106        lambda: F,
107        #[pin] future: Option<T>,
108        param: Option<S>,
109        state: State,
110        _mark: std::marker::PhantomData<(A, E)>
111    }
112}
113
114impl<S, F, T, A, E> Sink<A> for SinkImpl<S, F, T, A, E>
115where
116    F: FnMut(S, Action<A>) -> T,
117    T: Future<Output = Result<S, E>>
118{
119    type Error = E;
120
121    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
122        let mut this = self.project();
123        match this.state {
124            State::Sending | State::Flushing => {
125                match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
126                    Ok(p) => {
127                        this.future.set(None);
128                        *this.param = Some(p);
129                        *this.state = State::Empty;
130                        Poll::Ready(Ok(()))
131                    }
132                    Err(e) => {
133                        this.future.set(None);
134                        *this.state = State::Failed;
135                        Poll::Ready(Err(e))
136                    }
137                }
138            }
139            State::Closing => {
140                match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
141                    Ok(_) => {
142                        this.future.set(None);
143                        *this.state = State::Closed;
144                        panic!("SinkImpl::poll_ready called on a closing sink.")
145                    }
146                    Err(e) => {
147                        this.future.set(None);
148                        *this.state = State::Failed;
149                        Poll::Ready(Err(e))
150                    }
151                }
152            }
153            State::Empty => {
154                assert!(this.param.is_some());
155                Poll::Ready(Ok(()))
156            }
157            State::Closed => panic!("SinkImpl::poll_ready called on a closed sink."),
158            State::Failed => panic!("SinkImpl::poll_ready called after error.")
159        }
160    }
161
162    fn start_send(self: Pin<&mut Self>, item: A) -> Result<(), Self::Error> {
163        assert_eq!(State::Empty, self.state);
164        let mut this = self.project();
165        let param = this.param.take().unwrap();
166        let future = (this.lambda)(param, Action::Send(item));
167        this.future.set(Some(future));
168        *this.state = State::Sending;
169        Ok(())
170    }
171
172    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
173        loop {
174            let mut this = self.as_mut().project();
175            match this.state {
176                State::Empty =>
177                    if let Some(p) = this.param.take() {
178                        let future = (this.lambda)(p, Action::Flush);
179                        this.future.set(Some(future));
180                        *this.state = State::Flushing
181                    } else {
182                        return Poll::Ready(Ok(()))
183                    }
184                State::Sending =>
185                    match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
186                        Ok(p) => {
187                            this.future.set(None);
188                            *this.param = Some(p);
189                            *this.state = State::Empty
190                        }
191                        Err(e) => {
192                            this.future.set(None);
193                            *this.state = State::Failed;
194                            return Poll::Ready(Err(e))
195                        }
196                    }
197                State::Flushing =>
198                    match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
199                        Ok(p) => {
200                            this.future.set(None);
201                            *this.param = Some(p);
202                            *this.state = State::Empty;
203                            return Poll::Ready(Ok(()))
204                        }
205                        Err(e) => {
206                            this.future.set(None);
207                            *this.state = State::Failed;
208                            return Poll::Ready(Err(e))
209                        }
210                    }
211                State::Closing =>
212                    match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
213                        Ok(_) => {
214                            this.future.set(None);
215                            *this.state = State::Closed;
216                            return Poll::Ready(Ok(()))
217                        }
218                        Err(e) => {
219                            this.future.set(None);
220                            *this.state = State::Failed;
221                            return Poll::Ready(Err(e))
222                        }
223                    }
224                State::Closed => return Poll::Ready(Ok(())),
225                State::Failed => panic!("SinkImpl::poll_flush called after error.")
226            }
227        }
228    }
229
230    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
231        loop {
232            let mut this = self.as_mut().project();
233            match this.state {
234                State::Empty =>
235                    if let Some(p) = this.param.take() {
236                        let future = (this.lambda)(p, Action::Close);
237                        this.future.set(Some(future));
238                        *this.state = State::Closing;
239                    } else {
240                        return Poll::Ready(Ok(()))
241                    }
242                State::Sending =>
243                    match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
244                        Ok(p) => {
245                            this.future.set(None);
246                            *this.param = Some(p);
247                            *this.state = State::Empty
248                        }
249                        Err(e) => {
250                            this.future.set(None);
251                            *this.state = State::Failed;
252                            return Poll::Ready(Err(e))
253                        }
254                    }
255                State::Flushing =>
256                    match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
257                        Ok(p) => {
258                            this.future.set(None);
259                            *this.param = Some(p);
260                            *this.state = State::Empty
261                        }
262                        Err(e) => {
263                            this.future.set(None);
264                            *this.state = State::Failed;
265                            return Poll::Ready(Err(e))
266                        }
267                    }
268                State::Closing =>
269                    match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
270                        Ok(_) => {
271                            this.future.set(None);
272                            *this.state = State::Closed;
273                            return Poll::Ready(Ok(()))
274                        }
275                        Err(e) => {
276                            this.future.set(None);
277                            *this.state = State::Failed;
278                            return Poll::Ready(Err(e))
279                        }
280                    }
281                State::Closed => return Poll::Ready(Ok(())),
282                State::Failed => panic!("SinkImpl::poll_closed called after error.")
283            }
284        }
285    }
286}
287
#[cfg(test)]
mod tests {
    use async_std::{io, task};
    use futures::{channel::mpsc, prelude::*, stream};
    use crate::{Action, make_sink};

    /// Forwards a short stream of byte slices into a sink backed by stdout.
    #[test]
    fn smoke_test() {
        task::block_on(async {
            let stdout_sink = make_sink(io::stdout(), |mut out, action| async move {
                match action {
                    Action::Send(bytes) => out.write_all(bytes).await?,
                    Action::Flush => out.flush().await?,
                    Action::Close => out.close().await?,
                }
                Ok::<_, io::Error>(out)
            });

            let lines = vec![Ok(&b"hello\n"[..]), Ok(&b"world\n"[..])];
            let outcome = stream::iter(lines).forward(stdout_sink).await;
            assert!(outcome.is_ok())
        })
    }

    /// Records every action the closure receives and checks their order.
    #[test]
    fn replay() {
        task::block_on(async {
            let (tx, rx) = mpsc::unbounded();

            // The closure echoes each action into the channel, and closes the
            // channel once it has seen `Action::Close`.
            let sink = make_sink(tx, |mut chan, action| async move {
                chan.send(action.clone()).await?;
                if action == Action::Close {
                    chan.close().await?
                }
                Ok::<_, mpsc::SendError>(chan)
            });

            futures::pin_mut!(sink);

            for item in ["hello\n", "world\n"].iter() {
                sink.send(*item).await.unwrap()
            }

            sink.close().await.unwrap();

            let actual = rx.collect::<Vec<_>>().await;

            let expected = [
                Action::Send("hello\n"),
                Action::Flush,
                Action::Send("world\n"),
                Action::Flush,
                Action::Close,
            ];

            assert_eq!(&expected[..], &actual[..])
        });
    }
}
343        });
344    }
345}