async_sink/ext/
mod.rs

1//! Asynchronous sinks.
2//!
3//! This module contains:
4//!
5//! - The [`Sink`] trait, which allows you to asynchronously write data.
6//! - The [`SinkExt`] trait, which provides adapters for chaining and composing
7//!   sinks.
8
9use core::future::Future;
10use core::pin::Pin;
11use core::task::{Context, Poll};
12use either::Either;
13use tokio_stream::Stream;
14
15pub use super::Sink;
16
17mod close;
18pub use close::Close;
19
20mod drain;
21pub use drain::{drain, Drain};
22
23mod fanout;
24pub use fanout::Fanout;
25
26mod feed;
27pub use feed::Feed;
28
29mod flush;
30pub use flush::Flush;
31
32mod err_into;
33pub use err_into::SinkErrInto;
34
35mod map_err;
36pub use map_err::SinkMapErr;
37
38mod send;
39pub use send::Send;
40
41mod send_all;
42pub use send_all::SendAll;
43
44mod unfold;
45pub use unfold::{unfold, Unfold};
46
47mod with;
48pub use with::With;
49
50mod with_flat_map;
51pub use with_flat_map::WithFlatMap;
52
53#[cfg(feature = "alloc")]
54mod buffer;
55#[cfg(feature = "alloc")]
56pub use buffer::Buffer;
57
58impl<T: ?Sized, Item> SinkExt<Item> for T where T: Sink<Item> {}
59
60/// An extension trait for `Sink`s that provides a variety of convenient
61/// combinator functions.
62pub trait SinkExt<Item>: Sink<Item> {
63    /// Composes a function *in front of* the sink.
64    ///
65    /// This adapter produces a new sink that passes each value through the
66    /// given function `f` before sending it to `self`.
67    ///
68    /// To process each value, `f` produces a *future*, which is then polled to
69    /// completion before passing its result down to the underlying sink. If the
70    /// future produces an error, that error is returned by the new sink.
71    ///
72    /// Note that this function consumes the given sink, returning a wrapped
73    /// version, much like `Iterator::map`.
74    fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F, E>
75    where
76        F: FnMut(U) -> Fut,
77        Fut: Future<Output = Result<Item, E>>,
78        E: From<Self::Error>,
79        Self: Sized,
80    {
81        With::new(self, f)
82    }
83
84    /// Composes a function *in front of* the sink.
85    ///
86    /// This adapter produces a new sink that passes each value through the
87    /// given function `f` before sending it to `self`.
88    ///
89    /// To process each value, `f` produces a *stream*, of which each value
90    /// is passed to the underlying sink. A new value will not be accepted until
91    /// the stream has been drained
92    ///
93    /// Note that this function consumes the given sink, returning a wrapped
94    /// version, much like `Iterator::flat_map`.
95    ///
96    /// # Examples
97    ///
98    /// ```rust
99    /// use core::pin::Pin;
100    /// use core::task::{Context, Poll};
101    /// use tokio::sync::mpsc;
102    /// use async_sink::{Sink, SinkExt};
103    /// struct MySender<T>(mpsc::UnboundedSender<T>);
104    ///
105    /// impl<T> Sink<T> for MySender<T> {
106    ///     type Error = mpsc::error::SendError<T>;
107    ///
108    ///     fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
109    ///         Poll::Ready(Ok(()))
110    ///     }
111    ///
112    ///     fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
113    ///         self.0.send(item)
114    ///     }
115    ///
116    ///     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
117    ///         Poll::Ready(Ok(()))
118    ///     }
119    ///
120    ///     fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
121    ///         Poll::Ready(Ok(()))
122    ///     }
123    /// }
124    /// #[tokio::main]
125    /// async fn main() {
126    /// let (tx, mut rx) = mpsc::unbounded_channel();
127    /// let tx = MySender(tx);
128    ///
129    /// let mut tx = tx.with_flat_map(|x: usize| {
130    ///     tokio_stream::iter(vec![Ok(42); x])
131    /// });
132    ///
133    /// tx.send(5).await.unwrap();
134    /// drop(tx);
135    /// let mut received = Vec::new();
136    /// while let Some(i) = rx.recv().await { received.push(i); }
137    /// assert_eq!(received, vec![42, 42, 42, 42, 42]);
138    /// }
139    /// ```
140    fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
141    where
142        F: FnMut(U) -> St,
143        St: Stream<Item = Result<Item, Self::Error>>,
144        Self: Sized,
145    {
146        WithFlatMap::new(self, f)
147    }
148
149    /*
150    fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F>
151        where F: FnMut(U) -> Self::SinkItem,
152              Self: Sized;
153
154    fn with_filter<F>(self, f: F) -> WithFilter<Self, F>
155        where F: FnMut(Self::SinkItem) -> bool,
156              Self: Sized;
157
158    fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F>
159        where F: FnMut(U) -> Option<Self::SinkItem>,
160              Self: Sized;
161     */
162
163    /// Transforms the error returned by the sink.
164    fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
165    where
166        F: FnOnce(Self::Error) -> E,
167        Self: Sized,
168    {
169        SinkMapErr::new(self, f)
170    }
171
172    /// Map this sink's error to a different error type using the `Into` trait.
173    ///
174    /// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
175    fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E>
176    where
177        Self: Sized,
178        Self::Error: Into<E>,
179    {
180        err_into::SinkErrInto::new(self)
181    }
182
183    /// Adds a fixed-size buffer to the current sink.
184    ///
185    /// The resulting sink will buffer up to `capacity` items when the
186    /// underlying sink is unwilling to accept additional items. Calling `flush`
187    /// on the buffered sink will attempt to both empty the buffer and complete
188    /// processing on the underlying sink.
189    ///
190    /// Note that this function consumes the given sink, returning a wrapped
191    /// version, much like `Iterator::map`.
192    ///
193    /// This method is only available when the `std` or `alloc` feature of this
194    /// library is activated, and it is activated by default.
195    #[cfg(feature = "alloc")]
196    fn buffer(self, capacity: usize) -> Buffer<Self, Item>
197    where
198        Self: Sized,
199    {
200        Buffer::new(self, capacity)
201    }
202
203    /// Close the sink.
204    fn close(&mut self) -> Close<'_, Self, Item>
205    where
206        Self: Unpin,
207    {
208        Close::new(self)
209    }
210
211    /// Fanout items to multiple sinks.
212    ///
213    /// This adapter clones each incoming item and forwards it to both this as well as
214    /// the other sink at the same time.
215    fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
216    where
217        Self: Sized,
218        Item: Clone,
219        Si: Sink<Item, Error = Self::Error>,
220    {
221        Fanout::new(self, other)
222    }
223
224    /// Flush the sink, processing all pending items.
225    ///
226    /// This adapter is intended to be used when you want to stop sending to the sink
227    /// until all current requests are processed.
228    fn flush(&mut self) -> Flush<'_, Self, Item>
229    where
230        Self: Unpin,
231    {
232        Flush::new(self)
233    }
234
235    /// A future that completes after the given item has been fully processed
236    /// into the sink, including flushing.
237    ///
238    /// Note that, **because of the flushing requirement, it is usually better
239    /// to batch together items to send via `feed` or `send_all`,
240    /// rather than flushing between each item.**
241    fn send(&mut self, item: Item) -> Send<'_, Self, Item>
242    where
243        Self: Unpin,
244    {
245        Send::new(self, item)
246    }
247
248    /// A future that completes after the given item has been received
249    /// by the sink.
250    ///
251    /// Unlike `send`, the returned future does not flush the sink.
252    /// It is the caller's responsibility to ensure all pending items
253    /// are processed, which can be done via `flush` or `close`.
254    fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>
255    where
256        Self: Unpin,
257    {
258        Feed::new(self, item)
259    }
260
261    /// A future that completes after the given stream has been fully processed
262    /// into the sink, including flushing.
263    ///
264    /// This future will drive the stream to keep producing items until it is
265    /// exhausted, sending each item to the sink. It will complete once both the
266    /// stream is exhausted, the sink has received all items, and the sink has
267    /// been flushed. Note that the sink is **not** closed. If the stream produces
268    /// an error, that error will be returned by this future without flushing the sink.
269    ///
270    /// Doing `sink.send_all(stream)` is roughly equivalent to
271    /// `stream.forward(sink)`. The returned future will exhaust all items from
272    /// `stream` and send them to `self`.
273    fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, Item, St>
274    where
275        St: Stream<Item = Result<Item, Self::Error>> + Unpin + ?Sized,
276        Self: Unpin,
277    {
278        SendAll::new(self, stream)
279    }
280
281    /// Wrap this sink in an `Either` sink, making it the left-hand variant
282    /// of that `Either`.
283    ///
284    /// This can be used in combination with the `right_sink` method to write `if`
285    /// statements that evaluate to different streams in different branches.
286    fn left_sink<Si2>(self) -> Either<Self, Si2>
287    where
288        Si2: Sink<Item, Error = Self::Error>,
289        Self: Sized,
290    {
291        Either::Left(self)
292    }
293
294    /// Wrap this stream in an `Either` stream, making it the right-hand variant
295    /// of that `Either`.
296    ///
297    /// This can be used in combination with the `left_sink` method to write `if`
298    /// statements that evaluate to different streams in different branches.
299    fn right_sink<Si1>(self) -> Either<Si1, Self>
300    where
301        Si1: Sink<Item, Error = Self::Error>,
302        Self: Sized,
303    {
304        Either::Right(self)
305    }
306
307    /// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`]
308    /// sink types.
309    fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
310    where
311        Self: Unpin,
312    {
313        Pin::new(self).poll_ready(cx)
314    }
315
316    /// A convenience method for calling [`Sink::start_send`] on [`Unpin`]
317    /// sink types.
318    fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
319    where
320        Self: Unpin,
321    {
322        Pin::new(self).start_send(item)
323    }
324
325    /// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`]
326    /// sink types.
327    fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
328    where
329        Self: Unpin,
330    {
331        Pin::new(self).poll_flush(cx)
332    }
333
334    /// A convenience method for calling [`Sink::poll_close`] on [`Unpin`]
335    /// sink types.
336    fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
337    where
338        Self: Unpin,
339    {
340        Pin::new(self).poll_close(cx)
341    }
342}