async_sink/
lib.rs

1//! Asynchronous sinks for the [tokio](https://docs.rs/tokio) ecosystem
2//!
3//! This crate contains the `Sink` trait which allows values to be sent
4//! asynchronously.
5
6#![no_std]
7#![doc(test(
8    no_crate_inject,
9    attr(
10        deny(warnings, rust_2018_idioms, single_use_lifetimes),
11        allow(dead_code, unused_assignments, unused_variables)
12    )
13))]
14#![warn(missing_docs, /* unsafe_op_in_unsafe_fn */)] // unsafe_op_in_unsafe_fn requires Rust 1.52
15
16#[cfg(feature = "alloc")]
17extern crate alloc;
18#[cfg(feature = "std")]
19extern crate std;
20
21mod ext;
22pub(crate) mod unfold_state;
23use core::ops::DerefMut;
24use core::pin::Pin;
25use core::task::{Context, Poll};
26pub use ext::*;
27
28#[cfg(feature = "sync")]
29pub mod sync;
30
31/// A `Sink` is a value into which other values can be sent, asynchronously.
32///
33/// Basic examples of sinks include the sending side of:
34///
35/// - Channels
36/// - Sockets
37/// - Pipes
38///
39/// In addition to such "primitive" sinks, it's typical to layer additional
40/// functionality, such as buffering, on top of an existing sink.
41///
42/// Sending to a sink is "asynchronous" in the sense that the value may not be
43/// sent in its entirety immediately. Instead, values are sent in a two-phase
44/// way: first by initiating a send, and then by polling for completion. This
45/// two-phase setup is analogous to buffered writing in synchronous code, where
46/// writes often succeed immediately, but internally are buffered and are
47/// *actually* written only upon flushing.
48///
49/// In addition, the `Sink` may be *full*, in which case it is not even possible
50/// to start the sending process.
51///
52/// As with `Future` and `Stream`, the `Sink` trait is built from a few core
53/// required methods, and a host of default methods for working in a
54/// higher-level way. The `Sink::send_all` combinator is of particular
55/// importance: you can use it to send an entire stream to a sink, which is
56/// the simplest way to ultimately consume a stream.
57#[must_use = "sinks do nothing unless polled"]
58pub trait Sink<Item> {
59    /// The type of value produced by the sink when an error occurs.
60    type Error;
61
62    /// Attempts to prepare the `Sink` to receive a value.
63    ///
64    /// This method must be called and return `Poll::Ready(Ok(()))` prior to
65    /// each call to `start_send`.
66    ///
67    /// This method returns `Poll::Ready` once the underlying sink is ready to
68    /// receive data. If this method returns `Poll::Pending`, the current task
69    /// is registered to be notified (via `cx.waker().wake_by_ref()`) when `poll_ready`
70    /// should be called again.
71    ///
72    /// In most cases, if the sink encounters an error, the sink will
73    /// permanently be unable to receive items.
74    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
75
76    /// Begin the process of sending a value to the sink.
77    /// Each call to this function must be preceded by a successful call to
78    /// `poll_ready` which returned `Poll::Ready(Ok(()))`.
79    ///
80    /// As the name suggests, this method only *begins* the process of sending
81    /// the item. If the sink employs buffering, the item isn't fully processed
82    /// until the buffer is fully flushed. Since sinks are designed to work with
83    /// asynchronous I/O, the process of actually writing out the data to an
84    /// underlying object takes place asynchronously. **You *must* use
85    /// `poll_flush` or `poll_close` in order to guarantee completion of a
86    /// send**.
87    ///
88    /// Implementations of `poll_ready` and `start_send` will usually involve
89    /// flushing behind the scenes in order to make room for new messages.
90    /// It is only necessary to call `poll_flush` if you need to guarantee that
91    /// *all* of the items placed into the `Sink` have been sent.
92    ///
93    /// In most cases, if the sink encounters an error, the sink will
94    /// permanently be unable to receive items.
95    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
96
97    /// Flush any remaining output from this sink.
98    ///
99    /// Returns `Poll::Ready(Ok(()))` when no buffered items remain. If this
100    /// value is returned then it is guaranteed that all previous values sent
101    /// via `start_send` have been flushed.
102    ///
103    /// Returns `Poll::Pending` if there is more work left to do, in which
104    /// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when
105    /// `poll_flush` should be called again.
106    ///
107    /// In most cases, if the sink encounters an error, the sink will
108    /// permanently be unable to receive items.
109    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
110
111    /// Flush any remaining output and close this sink, if necessary.
112    ///
113    /// Returns `Poll::Ready(Ok(()))` when no buffered items remain and the sink
114    /// has been successfully closed.
115    ///
116    /// Returns `Poll::Pending` if there is more work left to do, in which
117    /// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when
118    /// `poll_close` should be called again.
119    ///
120    /// If this function encounters an error, the sink should be considered to
121    /// have failed permanently, and no more `Sink` methods should be called.
122    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
123}
124
125impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for &mut S {
126    type Error = S::Error;
127
128    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
129        Pin::new(&mut **self).poll_ready(cx)
130    }
131
132    fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
133        Pin::new(&mut **self).start_send(item)
134    }
135
136    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
137        Pin::new(&mut **self).poll_flush(cx)
138    }
139
140    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
141        Pin::new(&mut **self).poll_close(cx)
142    }
143}
144
145impl<P, Item> Sink<Item> for Pin<P>
146where
147    P: DerefMut + Unpin,
148    P::Target: Sink<Item>,
149{
150    type Error = <P::Target as Sink<Item>>::Error;
151
152    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
153        self.get_mut().as_mut().poll_ready(cx)
154    }
155
156    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
157        self.get_mut().as_mut().start_send(item)
158    }
159
160    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
161        self.get_mut().as_mut().poll_flush(cx)
162    }
163
164    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
165        self.get_mut().as_mut().poll_close(cx)
166    }
167}
168
169#[cfg(feature = "alloc")]
170impl<T> Sink<T> for alloc::vec::Vec<T> {
171    type Error = core::convert::Infallible;
172
173    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
174        Poll::Ready(Ok(()))
175    }
176
177    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
178        // TODO: impl<T> Unpin for Vec<T> {}
179        unsafe { self.get_unchecked_mut() }.push(item);
180        Ok(())
181    }
182
183    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
184        Poll::Ready(Ok(()))
185    }
186
187    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
188        Poll::Ready(Ok(()))
189    }
190}
191
192#[cfg(feature = "alloc")]
193impl<T> Sink<T> for alloc::collections::VecDeque<T> {
194    type Error = core::convert::Infallible;
195
196    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
197        Poll::Ready(Ok(()))
198    }
199
200    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
201        // TODO: impl<T> Unpin for Vec<T> {}
202        unsafe { self.get_unchecked_mut() }.push_back(item);
203        Ok(())
204    }
205
206    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
207        Poll::Ready(Ok(()))
208    }
209
210    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
211        Poll::Ready(Ok(()))
212    }
213}
214
215#[cfg(feature = "alloc")]
216impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for alloc::boxed::Box<S> {
217    type Error = S::Error;
218
219    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
220        Pin::new(&mut **self).poll_ready(cx)
221    }
222
223    fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
224        Pin::new(&mut **self).start_send(item)
225    }
226
227    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
228        Pin::new(&mut **self).poll_flush(cx)
229    }
230
231    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
232        Pin::new(&mut **self).poll_close(cx)
233    }
234}
235
236impl<SL: Sized + Sink<Item> + Unpin, SR: Sized + Sink<Item> + Unpin, Item> Sink<Item>
237    for either::Either<SL, SR>
238{
239    type Error = either::Either<SL::Error, SR::Error>;
240
241    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
242        match self.get_mut() {
243            either::Either::Left(s) => match Pin::new(s).poll_ready(cx) {
244                Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
245                Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Left(e))),
246                Poll::Pending => Poll::Pending,
247            },
248            either::Either::Right(s) => match Pin::new(s).poll_ready(cx) {
249                Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
250                Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Right(e))),
251                Poll::Pending => Poll::Pending,
252            },
253        }
254    }
255
256    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
257        match self.get_mut() {
258            either::Either::Left(s) => Pin::new(s).start_send(item).map_err(either::Either::Left),
259            either::Either::Right(s) => Pin::new(s).start_send(item).map_err(either::Either::Right),
260        }
261    }
262
263    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
264        match self.get_mut() {
265            either::Either::Left(s) => match Pin::new(s).poll_flush(cx) {
266                Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
267                Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Left(e))),
268                Poll::Pending => Poll::Pending,
269            },
270            either::Either::Right(s) => match Pin::new(s).poll_flush(cx) {
271                Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
272                Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Right(e))),
273                Poll::Pending => Poll::Pending,
274            },
275        }
276    }
277
278    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
279        match self.get_mut() {
280            either::Either::Left(s) => match Pin::new(s).poll_close(cx) {
281                Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
282                Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Left(e))),
283                Poll::Pending => Poll::Pending,
284            },
285            either::Either::Right(s) => match Pin::new(s).poll_close(cx) {
286                Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
287                Poll::Ready(Err(e)) => Poll::Ready(Err(either::Either::Right(e))),
288                Poll::Pending => Poll::Pending,
289            },
290        }
291    }
292}