async_sink/
lib.rs

1//! Asynchronous sinks for the [tokio](https://docs.rs/tokio) ecosystem
2//!
3//! This crate contains the `Sink` trait which allows values to be sent
4//! asynchronously.
5
6#![no_std]
7#![doc(test(
8    no_crate_inject,
9    attr(
10        deny(warnings, rust_2018_idioms, single_use_lifetimes),
11        allow(dead_code, unused_assignments, unused_variables)
12    )
13))]
14#![warn(missing_docs, /* unsafe_op_in_unsafe_fn */)] // unsafe_op_in_unsafe_fn requires Rust 1.52
15
16#[cfg(feature = "alloc")]
17extern crate alloc;
18#[cfg(feature = "std")]
19extern crate std;
20
21mod ext;
22pub(crate) mod unfold_state;
23use core::ops::DerefMut;
24use core::pin::Pin;
25use core::task::{Context, Poll};
26pub use ext::*;
27
/// A destination that accepts values of type `Item` asynchronously.
///
/// Typical sinks are the sending halves of:
///
/// - Channels
/// - Sockets
/// - Pipes
///
/// On top of such "primitive" sinks it is common to stack further sinks that
/// add behaviour, buffering being the usual example.
///
/// Sending is "asynchronous" because a value handed to the sink is not
/// necessarily delivered in full right away. A send happens in two phases:
/// it is first *started*, and afterwards *driven to completion* by polling.
/// This mirrors buffered writing in synchronous code, where a write usually
/// returns at once but the bytes only really go out when the buffer is
/// flushed.
///
/// A `Sink` can also be *full*; while it is, a send cannot even be started.
///
/// Like `Future` and `Stream`, the trait itself consists of a few core
/// required methods, with higher-level combinators layered on top of them.
/// The `send_all` combinator deserves special mention: it feeds a whole
/// stream into a sink, which is the simplest way to ultimately consume a
/// stream.
#[must_use = "sinks do nothing unless polled"]
pub trait Sink<Item> {
    /// The error type reported by the sink when an operation fails.
    type Error;

    /// Tries to get the `Sink` ready to accept one value.
    ///
    /// Every call to `start_send` must be preceded by a call to this method
    /// that returned `Poll::Ready(Ok(()))`.
    ///
    /// `Poll::Ready` is returned as soon as the underlying sink can take
    /// data. When `Poll::Pending` is returned instead, the current task has
    /// been registered for a wake-up (via `cx.waker().wake_by_ref()`) at the
    /// point where `poll_ready` is worth calling again.
    ///
    /// In most cases a sink that reports an error will never be able to
    /// receive items again.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    /// Starts sending `item` into the sink.
    ///
    /// A call to `poll_ready` that returned `Poll::Ready(Ok(()))` must come
    /// before each call to this method.
    ///
    /// True to its name, this only *starts* the send. A buffering sink has
    /// not fully processed the item until its buffer has been completely
    /// flushed, and because sinks target asynchronous I/O, the actual write
    /// to the underlying object happens asynchronously. **To guarantee that
    /// a send has completed you *must* use `poll_flush` or `poll_close`.**
    ///
    /// Implementations of `poll_ready` and `start_send` usually flush behind
    /// the scenes to make room for new messages, so `poll_flush` only has to
    /// be called when *all* items placed into the `Sink` must be known to
    /// have been sent.
    ///
    /// In most cases a sink that reports an error will never be able to
    /// receive items again.
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;

    /// Flushes whatever output is still pending in this sink.
    ///
    /// `Poll::Ready(Ok(()))` means that nothing is buffered any more: every
    /// value previously passed to `start_send` is guaranteed to have been
    /// flushed.
    ///
    /// `Poll::Pending` means work remains; the current task has been
    /// scheduled (via `cx.waker().wake_by_ref()`) to wake up when
    /// `poll_flush` should be called again.
    ///
    /// In most cases a sink that reports an error will never be able to
    /// receive items again.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    /// Flushes whatever output is still pending and then closes the sink, if
    /// closing is necessary.
    ///
    /// `Poll::Ready(Ok(()))` means that nothing is buffered any more and the
    /// sink has been closed successfully.
    ///
    /// `Poll::Pending` means work remains; the current task has been
    /// scheduled (via `cx.waker().wake_by_ref()`) to wake up when
    /// `poll_close` should be called again.
    ///
    /// After an error from this method the sink should be treated as
    /// permanently failed, and no further `Sink` methods should be called.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}
121
122impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for &mut S {
123    type Error = S::Error;
124
125    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
126        Pin::new(&mut **self).poll_ready(cx)
127    }
128
129    fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
130        Pin::new(&mut **self).start_send(item)
131    }
132
133    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
134        Pin::new(&mut **self).poll_flush(cx)
135    }
136
137    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138        Pin::new(&mut **self).poll_close(cx)
139    }
140}
141
142impl<P, Item> Sink<Item> for Pin<P>
143where
144    P: DerefMut + Unpin,
145    P::Target: Sink<Item>,
146{
147    type Error = <P::Target as Sink<Item>>::Error;
148
149    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
150        self.get_mut().as_mut().poll_ready(cx)
151    }
152
153    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
154        self.get_mut().as_mut().start_send(item)
155    }
156
157    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
158        self.get_mut().as_mut().poll_flush(cx)
159    }
160
161    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
162        self.get_mut().as_mut().poll_close(cx)
163    }
164}
165
/// A `Vec` is an always-ready, infallible sink: each item sent is pushed onto
/// the end of the vector, and flushing or closing has nothing to do.
#[cfg(feature = "alloc")]
impl<T> Sink<T> for alloc::vec::Vec<T> {
    type Error = core::convert::Infallible;

    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        // TODO: impl<T> Unpin for Vec<T> {}
        //
        // SAFETY: the reference is used only to call `Vec::push`. `Vec` makes
        // no pinning guarantees about its elements (it offers no pinned access
        // to them), so letting `push` move them on reallocation cannot break a
        // pinning contract. `get_unchecked_mut` is needed only because
        // `Vec<T>` is not `Unpin` for every `T`.
        let vec = unsafe { self.get_unchecked_mut() };
        vec.push(item);
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}
188
/// A `VecDeque` is an always-ready, infallible sink: each item sent is pushed
/// onto the back of the deque, and flushing or closing has nothing to do.
#[cfg(feature = "alloc")]
impl<T> Sink<T> for alloc::collections::VecDeque<T> {
    type Error = core::convert::Infallible;

    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        // TODO: impl<T> Unpin for VecDeque<T> {}
        //
        // SAFETY: the reference is used only to call `VecDeque::push_back`.
        // `VecDeque` makes no pinning guarantees about its elements (it offers
        // no pinned access to them), so letting `push_back` move them on
        // reallocation cannot break a pinning contract. `get_unchecked_mut` is
        // needed only because `VecDeque<T>` is not `Unpin` for every `T`.
        let deque = unsafe { self.get_unchecked_mut() };
        deque.push_back(item);
        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }
}
211
/// A boxed sink is itself a sink: every operation is forwarded to the boxed
/// `S`.
///
/// `S: Unpin` is required because the boxed value is re-pinned with
/// `Pin::new`, which is only available for `Unpin` targets.
#[cfg(feature = "alloc")]
impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for alloc::boxed::Box<S> {
    type Error = S::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // `Box<S>` is `Unpin`, so the outer pin can be removed safely.
        let boxed: &mut S = &mut **self.get_mut();
        Pin::new(boxed).poll_ready(cx)
    }

    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        let boxed: &mut S = &mut **self.get_mut();
        Pin::new(boxed).start_send(item)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let boxed: &mut S = &mut **self.get_mut();
        Pin::new(boxed).poll_flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let boxed: &mut S = &mut **self.get_mut();
        Pin::new(boxed).poll_close(cx)
    }
}
232
233impl<SL: Sized + Sink<Item> + Unpin, SR: Sized + Sink<Item> + Unpin, Item> Sink<Item>
234    for either::Either<SL, SR>
235{
236    type Error = either::Either<SL::Error, SR::Error>;
237
238    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
239        match self.get_mut() {
240            either::Either::Left(s) => match Pin::new(s).poll_ready(cx) {
241                Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
242                Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Left(e))),
243                Poll::Pending => Poll::Pending,
244            },
245            either::Either::Right(s) => match Pin::new(s).poll_ready(cx) {
246                Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
247                Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Right(e))),
248                Poll::Pending => Poll::Pending,
249            },
250        }
251    }
252
253    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
254        match self.get_mut() {
255            either::Either::Left(s) => Pin::new(s).start_send(item).map_err(either::Either::Left),
256            either::Either::Right(s) => Pin::new(s).start_send(item).map_err(either::Either::Right),
257        }
258    }
259
260    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
261        match self.get_mut() {
262            either::Either::Left(s) => match Pin::new(s).poll_flush(cx) {
263                Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
264                Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Left(e))),
265                Poll::Pending => Poll::Pending,
266            },
267            either::Either::Right(s) => match Pin::new(s).poll_flush(cx) {
268                Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
269                Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Right(e))),
270                Poll::Pending => Poll::Pending,
271            },
272        }
273    }
274
275    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
276        match self.get_mut() {
277            either::Either::Left(s) => match Pin::new(s).poll_close(cx) {
278                Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
279                Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Left(e))),
280                Poll::Pending => Poll::Pending,
281            },
282            either::Either::Right(s) => match Pin::new(s).poll_close(cx) {
283                Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
284                Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Right(e))),
285                Poll::Pending => Poll::Pending,
286            },
287        }
288    }
289}