async_sink/ext/mod.rs
1//! Asynchronous sinks.
2//!
3//! This module contains:
4//!
5//! - The [`Sink`] trait, which allows you to asynchronously write data.
6//! - The [`SinkExt`] trait, which provides adapters for chaining and composing
7//! sinks.
8
9use core::future::Future;
10use core::pin::Pin;
11use core::task::{Context, Poll};
12use either::Either;
13use tokio_stream::Stream;
14use tokio_stream_util::TryStream;
15
16pub use super::Sink;
17
18mod close;
19pub use close::Close;
20
21mod drain;
22pub use drain::{drain, Drain};
23
24mod fanout;
25pub use fanout::Fanout;
26
27mod feed;
28pub use feed::Feed;
29
30mod flush;
31pub use flush::Flush;
32
33mod err_into;
34pub use err_into::SinkErrInto;
35
36mod map_err;
37pub use map_err::SinkMapErr;
38
39mod send;
40pub use send::Send;
41
42mod send_all;
43pub use send_all::SendAll;
44
45mod unfold;
46pub use unfold::{unfold, Unfold};
47
48mod with;
49pub use with::With;
50
51mod with_flat_map;
52pub use with_flat_map::WithFlatMap;
53
54#[cfg(feature = "alloc")]
55mod buffer;
56#[cfg(feature = "alloc")]
57pub use buffer::Buffer;
58
59impl<T: ?Sized, Item> SinkExt<Item> for T where T: Sink<Item> {}
60
61/// An extension trait for `Sink`s that provides a variety of convenient
62/// combinator functions.
63pub trait SinkExt<Item>: Sink<Item> {
64 /// Composes a function *in front of* the sink.
65 ///
66 /// This adapter produces a new sink that passes each value through the
67 /// given function `f` before sending it to `self`.
68 ///
69 /// To process each value, `f` produces a *future*, which is then polled to
70 /// completion before passing its result down to the underlying sink. If the
71 /// future produces an error, that error is returned by the new sink.
72 ///
73 /// Note that this function consumes the given sink, returning a wrapped
74 /// version, much like `Iterator::map`.
75 fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
76 where
77 F: FnMut(U) -> Fut,
78 Fut: Future<Output = Result<Item, E>>,
79 E: From<Self::Error>,
80 Self: Sized,
81 {
82 With::new(self, f)
83 }
84
85 /// Composes a function *in front of* the sink.
86 ///
87 /// This adapter produces a new sink that passes each value through the
88 /// given function `f` before sending it to `self`.
89 ///
90 /// To process each value, `f` produces a *stream*, of which each value
91 /// is passed to the underlying sink. A new value will not be accepted until
92 /// the stream has been drained
93 ///
94 /// Note that this function consumes the given sink, returning a wrapped
95 /// version, much like `Iterator::flat_map`.
96 ///
97 /// # Examples
98 ///
99 /// ```
100 /// ```rust
101 /// # #[tokio::main]
102 /// # async fn main() {
103 /// # use core::pin::Pin;
104 /// # use core::task::{Context, Poll};
105 /// # use tokio::sync::mpsc;
106 /// # use tokio_stream::{self as stream, StreamExt};
107 /// # use crate::sink::{Sink, SinkExt};
108 /// // This example requires a `Sink` implementation for `mpsc::UnboundedSender`.
109 /// struct MySender<T>(mpsc::UnboundedSender<T>);
110 ///
111 /// impl<T> Sink<T> for MySender<T> {
112 /// type Error = mpsc::error::SendError<T>;
113 ///
114 /// fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
115 /// Poll::Ready(Ok(()))
116 /// }
117 ///
118 /// fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
119 /// self.0.send(item)
120 /// }
121 ///
122 /// fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
123 /// Poll::Ready(Ok(()))
124 /// }
125 ///
126 /// fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
127 /// Poll::Ready(Ok(()))
128 /// }
129 /// }
130 ///
131 /// let (tx, mut rx) = mpsc::unbounded_channel();
132 /// let tx = MySender(tx);
133 ///
134 /// let mut tx = tx.with_flat_map(|x: usize| {
135 /// stream::iter(vec![Ok(42); x])
136 /// });
137 ///
138 /// tx.send(5).await.unwrap();
139 /// drop(tx);
140 /// let mut received = Vec::new();
141 /// while let Some(i) = rx.recv().await { received.push(i); }
142 /// assert_eq!(received, vec![42, 42, 42, 42, 42]);
143 /// # }
144 /// ```
145 fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
146 where
147 F: FnMut(U) -> St,
148 St: Stream<Item = Result<Item, Self::Error>>,
149 Self: Sized,
150 {
151 WithFlatMap::new(self, f)
152 }
153
154 /*
155 fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F>
156 where F: FnMut(U) -> Self::SinkItem,
157 Self: Sized;
158
159 fn with_filter<F>(self, f: F) -> WithFilter<Self, F>
160 where F: FnMut(Self::SinkItem) -> bool,
161 Self: Sized;
162
163 fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F>
164 where F: FnMut(U) -> Option<Self::SinkItem>,
165 Self: Sized;
166 */
167
168 /// Transforms the error returned by the sink.
169 fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
170 where
171 F: FnOnce(Self::Error) -> E,
172 Self: Sized,
173 {
174 SinkMapErr::new(self, f)
175 }
176
177 /// Map this sink's error to a different error type using the `Into` trait.
178 ///
179 /// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
180 fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E>
181 where
182 Self: Sized,
183 Self::Error: Into<E>,
184 {
185 err_into::SinkErrInto::new(self)
186 }
187
188 /// Adds a fixed-size buffer to the current sink.
189 ///
190 /// The resulting sink will buffer up to `capacity` items when the
191 /// underlying sink is unwilling to accept additional items. Calling `flush`
192 /// on the buffered sink will attempt to both empty the buffer and complete
193 /// processing on the underlying sink.
194 ///
195 /// Note that this function consumes the given sink, returning a wrapped
196 /// version, much like `Iterator::map`.
197 ///
198 /// This method is only available when the `std` or `alloc` feature of this
199 /// library is activated, and it is activated by default.
200 #[cfg(feature = "alloc")]
201 fn buffer(self, capacity: usize) -> Buffer<Self, Item>
202 where
203 Self: Sized,
204 {
205 Buffer::new(self, capacity)
206 }
207
208 /// Close the sink.
209 fn close(&mut self) -> Close<'_, Self, Item>
210 where
211 Self: Unpin,
212 {
213 Close::new(self)
214 }
215
216 /// Fanout items to multiple sinks.
217 ///
218 /// This adapter clones each incoming item and forwards it to both this as well as
219 /// the other sink at the same time.
220 fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
221 where
222 Self: Sized,
223 Item: Clone,
224 Si: Sink<Item, Error = Self::Error>,
225 {
226 Fanout::new(self, other)
227 }
228
229 /// Flush the sink, processing all pending items.
230 ///
231 /// This adapter is intended to be used when you want to stop sending to the sink
232 /// until all current requests are processed.
233 fn flush(&mut self) -> Flush<'_, Self, Item>
234 where
235 Self: Unpin,
236 {
237 Flush::new(self)
238 }
239
240 /// A future that completes after the given item has been fully processed
241 /// into the sink, including flushing.
242 ///
243 /// Note that, **because of the flushing requirement, it is usually better
244 /// to batch together items to send via `feed` or `send_all`,
245 /// rather than flushing between each item.**
246 fn send(&mut self, item: Item) -> Send<'_, Self, Item>
247 where
248 Self: Unpin,
249 {
250 Send::new(self, item)
251 }
252
253 /// A future that completes after the given item has been received
254 /// by the sink.
255 ///
256 /// Unlike `send`, the returned future does not flush the sink.
257 /// It is the caller's responsibility to ensure all pending items
258 /// are processed, which can be done via `flush` or `close`.
259 fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>
260 where
261 Self: Unpin,
262 {
263 Feed::new(self, item)
264 }
265
266 /// A future that completes after the given stream has been fully processed
267 /// into the sink, including flushing.
268 ///
269 /// This future will drive the stream to keep producing items until it is
270 /// exhausted, sending each item to the sink. It will complete once both the
271 /// stream is exhausted, the sink has received all items, and the sink has
272 /// been flushed. Note that the sink is **not** closed. If the stream produces
273 /// an error, that error will be returned by this future without flushing the sink.
274 ///
275 /// Doing `sink.send_all(stream)` is roughly equivalent to
276 /// `stream.forward(sink)`. The returned future will exhaust all items from
277 /// `stream` and send them to `self`.
278 fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
279 where
280 St: TryStream<Ok = Item, Error = Self::Error> + Unpin + ?Sized,
281 Self: Unpin,
282 {
283 SendAll::new(self, stream)
284 }
285
286 /// Wrap this sink in an `Either` sink, making it the left-hand variant
287 /// of that `Either`.
288 ///
289 /// This can be used in combination with the `right_sink` method to write `if`
290 /// statements that evaluate to different streams in different branches.
291 fn left_sink<Si2>(self) -> Either<Self, Si2>
292 where
293 Si2: Sink<Item, Error = Self::Error>,
294 Self: Sized,
295 {
296 Either::Left(self)
297 }
298
299 /// Wrap this stream in an `Either` stream, making it the right-hand variant
300 /// of that `Either`.
301 ///
302 /// This can be used in combination with the `left_sink` method to write `if`
303 /// statements that evaluate to different streams in different branches.
304 fn right_sink<Si1>(self) -> Either<Si1, Self>
305 where
306 Si1: Sink<Item, Error = Self::Error>,
307 Self: Sized,
308 {
309 Either::Right(self)
310 }
311
312 /// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`]
313 /// sink types.
314 fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
315 where
316 Self: Unpin,
317 {
318 Pin::new(self).poll_ready(cx)
319 }
320
321 /// A convenience method for calling [`Sink::start_send`] on [`Unpin`]
322 /// sink types.
323 fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
324 where
325 Self: Unpin,
326 {
327 Pin::new(self).start_send(item)
328 }
329
330 /// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`]
331 /// sink types.
332 fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
333 where
334 Self: Unpin,
335 {
336 Pin::new(self).poll_flush(cx)
337 }
338
339 /// A convenience method for calling [`Sink::poll_close`] on [`Unpin`]
340 /// sink types.
341 fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
342 where
343 Self: Unpin,
344 {
345 Pin::new(self).poll_close(cx)
346 }
347}