async_sink/ext/
mod.rs

1//! Asynchronous sinks.
2//!
3//! This module contains:
4//!
5//! - The [`Sink`] trait, which allows you to asynchronously write data.
6//! - The [`SinkExt`] trait, which provides adapters for chaining and composing
7//!   sinks.
8
9use core::future::Future;
10use core::pin::Pin;
11use core::task::{Context, Poll};
12use either::Either;
13use tokio_stream::Stream;
14use tokio_stream_util::TryStream;
15
16pub use super::Sink;
17
18mod close;
19pub use close::Close;
20
21mod drain;
22pub use drain::{drain, Drain};
23
24mod fanout;
25pub use fanout::Fanout;
26
27mod feed;
28pub use feed::Feed;
29
30mod flush;
31pub use flush::Flush;
32
33mod err_into;
34pub use err_into::SinkErrInto;
35
36mod map_err;
37pub use map_err::SinkMapErr;
38
39mod send;
40pub use send::Send;
41
42mod send_all;
43pub use send_all::SendAll;
44
45mod unfold;
46pub use unfold::{unfold, Unfold};
47
48mod with;
49pub use with::With;
50
51mod with_flat_map;
52pub use with_flat_map::WithFlatMap;
53
54#[cfg(feature = "alloc")]
55mod buffer;
56#[cfg(feature = "alloc")]
57pub use buffer::Buffer;
58
59impl<T: ?Sized, Item> SinkExt<Item> for T where T: Sink<Item> {}
60
61/// An extension trait for `Sink`s that provides a variety of convenient
62/// combinator functions.
63pub trait SinkExt<Item>: Sink<Item> {
64    /// Composes a function *in front of* the sink.
65    ///
66    /// This adapter produces a new sink that passes each value through the
67    /// given function `f` before sending it to `self`.
68    ///
69    /// To process each value, `f` produces a *future*, which is then polled to
70    /// completion before passing its result down to the underlying sink. If the
71    /// future produces an error, that error is returned by the new sink.
72    ///
73    /// Note that this function consumes the given sink, returning a wrapped
74    /// version, much like `Iterator::map`.
75    fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
76    where
77        F: FnMut(U) -> Fut,
78        Fut: Future<Output = Result<Item, E>>,
79        E: From<Self::Error>,
80        Self: Sized,
81    {
82        With::new(self, f)
83    }
84
85    /// Composes a function *in front of* the sink.
86    ///
87    /// This adapter produces a new sink that passes each value through the
88    /// given function `f` before sending it to `self`.
89    ///
90    /// To process each value, `f` produces a *stream*, of which each value
91    /// is passed to the underlying sink. A new value will not be accepted until
92    /// the stream has been drained
93    ///
94    /// Note that this function consumes the given sink, returning a wrapped
95    /// version, much like `Iterator::flat_map`.
96    ///
97    /// # Examples
98    ///
99    /// ```
100    /// ```rust
101    /// # #[tokio::main]
102    /// # async fn main() {
103    /// # use core::pin::Pin;
104    /// # use core::task::{Context, Poll};
105    /// # use tokio::sync::mpsc;
106    /// # use tokio_stream::{self as stream, StreamExt};
107    /// # use crate::sink::{Sink, SinkExt};
108    /// // This example requires a `Sink` implementation for `mpsc::UnboundedSender`.
109    /// struct MySender<T>(mpsc::UnboundedSender<T>);
110    ///
111    /// impl<T> Sink<T> for MySender<T> {
112    ///     type Error = mpsc::error::SendError<T>;
113    ///
114    ///     fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
115    ///         Poll::Ready(Ok(()))
116    ///     }
117    ///
118    ///     fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
119    ///         self.0.send(item)
120    ///     }
121    ///
122    ///     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
123    ///         Poll::Ready(Ok(()))
124    ///     }
125    ///
126    ///     fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
127    ///         Poll::Ready(Ok(()))
128    ///     }
129    /// }
130    ///
131    /// let (tx, mut rx) = mpsc::unbounded_channel();
132    /// let tx = MySender(tx);
133    ///
134    /// let mut tx = tx.with_flat_map(|x: usize| {
135    ///     stream::iter(vec![Ok(42); x])
136    /// });
137    ///
138    /// tx.send(5).await.unwrap();
139    /// drop(tx);
140    /// let mut received = Vec::new();
141    /// while let Some(i) = rx.recv().await { received.push(i); }
142    /// assert_eq!(received, vec![42, 42, 42, 42, 42]);
143    /// # }
144    /// ```
145    fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
146    where
147        F: FnMut(U) -> St,
148        St: Stream<Item = Result<Item, Self::Error>>,
149        Self: Sized,
150    {
151        WithFlatMap::new(self, f)
152    }
153
154    /*
155    fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F>
156        where F: FnMut(U) -> Self::SinkItem,
157              Self: Sized;
158
159    fn with_filter<F>(self, f: F) -> WithFilter<Self, F>
160        where F: FnMut(Self::SinkItem) -> bool,
161              Self: Sized;
162
163    fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F>
164        where F: FnMut(U) -> Option<Self::SinkItem>,
165              Self: Sized;
166     */
167
168    /// Transforms the error returned by the sink.
169    fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
170    where
171        F: FnOnce(Self::Error) -> E,
172        Self: Sized,
173    {
174        SinkMapErr::new(self, f)
175    }
176
177    /// Map this sink's error to a different error type using the `Into` trait.
178    ///
179    /// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
180    fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E>
181    where
182        Self: Sized,
183        Self::Error: Into<E>,
184    {
185        err_into::SinkErrInto::new(self)
186    }
187
188    /// Adds a fixed-size buffer to the current sink.
189    ///
190    /// The resulting sink will buffer up to `capacity` items when the
191    /// underlying sink is unwilling to accept additional items. Calling `flush`
192    /// on the buffered sink will attempt to both empty the buffer and complete
193    /// processing on the underlying sink.
194    ///
195    /// Note that this function consumes the given sink, returning a wrapped
196    /// version, much like `Iterator::map`.
197    ///
198    /// This method is only available when the `std` or `alloc` feature of this
199    /// library is activated, and it is activated by default.
200    #[cfg(feature = "alloc")]
201    fn buffer(self, capacity: usize) -> Buffer<Self, Item>
202    where
203        Self: Sized,
204    {
205        Buffer::new(self, capacity)
206    }
207
208    /// Close the sink.
209    fn close(&mut self) -> Close<'_, Self, Item>
210    where
211        Self: Unpin,
212    {
213        Close::new(self)
214    }
215
216    /// Fanout items to multiple sinks.
217    ///
218    /// This adapter clones each incoming item and forwards it to both this as well as
219    /// the other sink at the same time.
220    fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
221    where
222        Self: Sized,
223        Item: Clone,
224        Si: Sink<Item, Error = Self::Error>,
225    {
226        Fanout::new(self, other)
227    }
228
229    /// Flush the sink, processing all pending items.
230    ///
231    /// This adapter is intended to be used when you want to stop sending to the sink
232    /// until all current requests are processed.
233    fn flush(&mut self) -> Flush<'_, Self, Item>
234    where
235        Self: Unpin,
236    {
237        Flush::new(self)
238    }
239
240    /// A future that completes after the given item has been fully processed
241    /// into the sink, including flushing.
242    ///
243    /// Note that, **because of the flushing requirement, it is usually better
244    /// to batch together items to send via `feed` or `send_all`,
245    /// rather than flushing between each item.**
246    fn send(&mut self, item: Item) -> Send<'_, Self, Item>
247    where
248        Self: Unpin,
249    {
250        Send::new(self, item)
251    }
252
253    /// A future that completes after the given item has been received
254    /// by the sink.
255    ///
256    /// Unlike `send`, the returned future does not flush the sink.
257    /// It is the caller's responsibility to ensure all pending items
258    /// are processed, which can be done via `flush` or `close`.
259    fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>
260    where
261        Self: Unpin,
262    {
263        Feed::new(self, item)
264    }
265
266    /// A future that completes after the given stream has been fully processed
267    /// into the sink, including flushing.
268    ///
269    /// This future will drive the stream to keep producing items until it is
270    /// exhausted, sending each item to the sink. It will complete once both the
271    /// stream is exhausted, the sink has received all items, and the sink has
272    /// been flushed. Note that the sink is **not** closed. If the stream produces
273    /// an error, that error will be returned by this future without flushing the sink.
274    ///
275    /// Doing `sink.send_all(stream)` is roughly equivalent to
276    /// `stream.forward(sink)`. The returned future will exhaust all items from
277    /// `stream` and send them to `self`.
278    fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
279    where
280        St: TryStream<Ok = Item, Error = Self::Error> + Unpin + ?Sized,
281        Self: Unpin,
282    {
283        SendAll::new(self, stream)
284    }
285
286    /// Wrap this sink in an `Either` sink, making it the left-hand variant
287    /// of that `Either`.
288    ///
289    /// This can be used in combination with the `right_sink` method to write `if`
290    /// statements that evaluate to different streams in different branches.
291    fn left_sink<Si2>(self) -> Either<Self, Si2>
292    where
293        Si2: Sink<Item, Error = Self::Error>,
294        Self: Sized,
295    {
296        Either::Left(self)
297    }
298
299    /// Wrap this stream in an `Either` stream, making it the right-hand variant
300    /// of that `Either`.
301    ///
302    /// This can be used in combination with the `left_sink` method to write `if`
303    /// statements that evaluate to different streams in different branches.
304    fn right_sink<Si1>(self) -> Either<Si1, Self>
305    where
306        Si1: Sink<Item, Error = Self::Error>,
307        Self: Sized,
308    {
309        Either::Right(self)
310    }
311
312    /// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`]
313    /// sink types.
314    fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
315    where
316        Self: Unpin,
317    {
318        Pin::new(self).poll_ready(cx)
319    }
320
321    /// A convenience method for calling [`Sink::start_send`] on [`Unpin`]
322    /// sink types.
323    fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
324    where
325        Self: Unpin,
326    {
327        Pin::new(self).start_send(item)
328    }
329
330    /// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`]
331    /// sink types.
332    fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
333    where
334        Self: Unpin,
335    {
336        Pin::new(self).poll_flush(cx)
337    }
338
339    /// A convenience method for calling [`Sink::poll_close`] on [`Unpin`]
340    /// sink types.
341    fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
342    where
343        Self: Unpin,
344    {
345        Pin::new(self).poll_close(cx)
346    }
347}