// multipart_write/write/mod.rs

1//! `MultipartWrite` combinators.
2//!
3//! This module contains the trait [`MultipartWriteExt`], which provides
4//! adapters for chaining and composing `MultipartWrite`rs.
5use crate::{
6    BoxFusedMultipartWrite, BoxMultipartWrite, FusedMultipartWrite,
7    LocalBoxFusedMultipartWrite, LocalBoxMultipartWrite, MultipartWrite,
8};
9
10use futures_core::future::Future;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14mod and_then;
15pub use and_then::AndThen;
16
17mod buffered;
18pub use buffered::Buffered;
19
20mod complete;
21pub use complete::Complete;
22
23mod from_extend;
24pub use from_extend::{FromExtend, from_extend};
25
26mod fanout;
27pub use fanout::Fanout;
28
29mod feed;
30pub use feed::Feed;
31
32mod filter_map_part;
33pub use filter_map_part::FilterMapPart;
34
35mod filter_part;
36pub use filter_part::FilterPart;
37
38mod flush;
39pub use flush::Flush;
40
41mod fold_sent;
42pub use fold_sent::FoldSent;
43
44mod fuse;
45pub use fuse::Fuse;
46
47mod lift;
48pub use lift::Lift;
49
50mod map_err;
51pub use map_err::MapErr;
52
53mod map_ok;
54pub use map_ok::MapOk;
55
56mod map_sent;
57pub use map_sent::MapSent;
58
59mod ready_part;
60pub use ready_part::ReadyPart;
61
62mod resolve;
63pub use resolve::{Resolve, resolve};
64
65mod send_flush;
66pub use send_flush::SendFlush;
67
68impl<Wr: MultipartWrite<Part>, Part> MultipartWriteExt<Part> for Wr {}
69
70/// An extension trait for `MultipartWrite` providing a variety of convenient
71/// combinator functions.
72pub trait MultipartWriteExt<Part>: MultipartWrite<Part> {
73    /// Compute from this writer's output type a new result using an
74    /// asynchronous closure.
75    ///
76    /// Calling `poll_complete` on this writer will complete the inner writer,
77    /// then run the provided closure `f` with the output to produce the final
78    /// output of this writer.
79    ///
80    /// # Examples
81    ///
82    /// ```rust
83    /// # futures::executor::block_on(async {
84    /// use std::io::Error as IoError;
85    ///
86    /// use futures::future;
87    /// use multipart_write::{MultipartWriteExt as _, write};
88    ///
89    /// // `from_extend` turns an impl of `std::iter::Extend` into a writer,
90    /// // here one that writes a `u8` and outputs a `Vec<u8>`.
91    /// let mut writer = write::from_extend::<u8, Vec<u8>>().and_then(|vs| {
92    ///     future::ready(Ok::<_, IoError>(vs.iter().fold(0, |acc, n| acc + n)))
93    /// });
94    ///
95    /// writer.send_flush(1).await.unwrap();
96    /// writer.send_flush(2).await.unwrap();
97    /// writer.send_flush(3).await.unwrap();
98    /// let out = writer.complete().await.unwrap();
99    ///
100    /// assert_eq!(out, 6);
101    /// # });
102    /// ```
103    fn and_then<T, E, Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
104    where
105        F: FnMut(Self::Output) -> Fut,
106        Fut: Future<Output = Result<T, E>>,
107        E: From<Self::Error>,
108        Self: Sized,
109    {
110        assert_writer::<Part, Self::Recv, E, T, _>(AndThen::new(self, f))
111    }
112
113    /// Wrap this writer in a `Box`, pinning it.
114    fn boxed<'a>(
115        self,
116    ) -> BoxMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
117    where
118        Self: Sized + Send + 'a,
119    {
120        Box::pin(self)
121    }
122
123    /// Wrap this writer, which additionally has conditions making it a
124    /// [`FusedMultipartWrite`], in a `Box`, pinning it.
125    fn box_fused<'a>(
126        self,
127    ) -> BoxFusedMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
128    where
129        Self: Sized + Send + FusedMultipartWrite<Part> + 'a,
130    {
131        Box::pin(self)
132    }
133
134    /// Wrap this writer, which additionally has conditions making it a
135    /// [`FusedMultipartWrite`], in a `Box`, pinning it.
136    ///
137    /// Similar to `box_fused` but without the `Send` requirement.
138    fn box_fused_local<'a>(
139        self,
140    ) -> LocalBoxFusedMultipartWrite<
141        'a,
142        Part,
143        Self::Recv,
144        Self::Output,
145        Self::Error,
146    >
147    where
148        Self: Sized + FusedMultipartWrite<Part> + 'a,
149    {
150        Box::pin(self)
151    }
152
153    /// Wrap this writer in a `Box`, pinning it.
154    ///
155    /// Similar to `boxed` but without the `Send` requirement.
156    fn boxed_local<'a>(
157        self,
158    ) -> LocalBoxMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
159    where
160        Self: Sized + 'a,
161    {
162        Box::pin(self)
163    }
164
165    /// Adds a fixed size buffer to the current writer.
166    ///
167    /// The resulting `MultipartWrite` will buffer up to `capacity` items when
168    /// the underlying writer is not able to accept new parts.
169    ///
170    /// The values returned when the underlying writer has received a part are
171    /// also accumulated and returned in batch.
172    fn buffered(
173        self,
174        capacity: impl Into<Option<usize>>,
175    ) -> Buffered<Self, Part>
176    where
177        Self: Sized,
178    {
179        assert_writer::<
180            Part,
181            Option<Vec<Self::Recv>>,
182            Self::Error,
183            Self::Output,
184            _,
185        >(Buffered::new(self, capacity.into().unwrap_or_default()))
186    }
187
188    /// A future that runs this writer to completion, returning the associated
189    /// output.
190    fn complete(&mut self) -> Complete<'_, Self, Part>
191    where
192        Self: Unpin,
193    {
194        Complete::new(self)
195    }
196
197    /// Fanout the part to multiple writers.
198    ///
199    /// This adapter clones each incoming part and forwards it to both writers.
200    ///
201    /// # Examples
202    ///
203    /// ```rust
204    /// # futures::executor::block_on(async {
205    /// use multipart_write::{MultipartWriteExt as _, write};
206    ///
207    /// let wr1 = write::from_extend::<u8, Vec<u8>>();
208    /// let wr2 = write::from_extend::<u8, Vec<u8>>();
209    /// let mut writer = wr1.fanout(wr2);
210    ///
211    /// writer.send_flush(1).await.unwrap();
212    /// writer.send_flush(2).await.unwrap();
213    /// writer.send_flush(3).await.unwrap();
214    /// let out = writer.complete().await.unwrap();
215    ///
216    /// assert_eq!(out, (vec![1, 2, 3], vec![1, 2, 3]));
217    /// # })
218    /// ```
219    fn fanout<U>(self, other: U) -> Fanout<Self, U, Part>
220    where
221        Part: Clone,
222        U: MultipartWrite<Part, Error = Self::Error>,
223        Self: Sized,
224    {
225        assert_writer::<
226            Part,
227            (Self::Recv, U::Recv),
228            Self::Error,
229            (Self::Output, U::Output),
230            _,
231        >(Fanout::new(self, other))
232    }
233
234    /// A future that completes after the given part has been received by the
235    /// writer.
236    ///
237    /// Unlike `send_flush`, the returned future does not flush the writer.  It
238    /// is the caller's responsibility to ensure all pending items are
239    /// processed, which can be done with `flush` or `complete`.
240    fn feed(&mut self, part: Part) -> Feed<'_, Self, Part>
241    where
242        Self: Unpin,
243    {
244        Feed::new(self, part)
245    }
246
247    /// Apply a filter to this writer's parts, returning a new writer with the
248    /// same output.
249    ///
250    /// The return type of this writer is `Option<Self::Recv>` and is `None`
251    /// when the part did not pass the filter.
252    ///
253    /// # Examples
254    ///
255    /// ```rust
256    /// # futures::executor::block_on(async {
257    /// use multipart_write::{MultipartWriteExt, write};
258    ///
259    /// let mut writer =
260    ///     write::from_extend::<u8, Vec<u8>>().filter_part(|n| n % 2 == 0);
261    ///
262    /// let r1 = writer.send_flush(1).await.unwrap();
263    /// let r2 = writer.send_flush(2).await.unwrap();
264    /// let r3 = writer.send_flush(3).await.unwrap();
265    /// let out = writer.complete().await.unwrap();
266    ///
267    /// assert!(r1.is_none() && r2.is_some() && r3.is_none());
268    /// assert_eq!(out, vec![2]);
269    /// # })
270    /// ```
271    fn filter_part<F>(self, f: F) -> FilterPart<Self, F>
272    where
273        F: FnMut(&Part) -> bool,
274        Self: Sized,
275    {
276        assert_writer::<Part, Option<Self::Recv>, Self::Error, Self::Output, _>(
277            FilterPart::new(self, f),
278        )
279    }
280
281    /// Attempt to map the input to a part for this writer, filtering out the
282    /// inputs where the mapping returns `None`.
283    ///
284    /// The return type of this writer is `Option<Self::Recv>` and is `None`
285    /// when the provided closure returns `None`.
286    ///
287    /// # Examples
288    ///
289    /// ```rust
290    /// # futures::executor::block_on(async {
291    /// use multipart_write::{MultipartWriteExt as _, write};
292    ///
293    /// let mut writer = write::from_extend::<String, Vec<String>>()
294    ///     .filter_map_part::<u8, _>(|n| {
295    ///         if n % 2 == 0 { Some(n.to_string()) } else { None }
296    ///     });
297    ///
298    /// let r1 = writer.send_flush(1).await.unwrap();
299    /// let r2 = writer.send_flush(2).await.unwrap();
300    /// let r3 = writer.send_flush(3).await.unwrap();
301    /// let out = writer.complete().await.unwrap();
302    ///
303    /// assert!(r1.is_none() && r2.is_some() && r3.is_none());
304    /// assert_eq!(out, vec!["2".to_string()]);
305    /// # })
306    /// ```
307    fn filter_map_part<U, F>(self, f: F) -> FilterMapPart<Self, F>
308    where
309        F: FnMut(U) -> Option<Part>,
310        Self: Sized,
311    {
312        assert_writer::<U, Option<Self::Recv>, Self::Error, Self::Output, _>(
313            FilterMapPart::new(self, f),
314        )
315    }
316
317    /// A future that completes when the underlying writer has been flushed.
318    fn flush(&mut self) -> Flush<'_, Self, Part>
319    where
320        Self: Unpin,
321    {
322        Flush::new(self)
323    }
324
325    /// Accumulate the values returned by starting a send, returning it with the
326    /// output.
327    ///
328    /// # Examples
329    ///
330    /// ```rust
331    /// # futures::executor::block_on(async {
332    /// use multipart_write::{MultipartWriteExt as _, write};
333    ///
334    /// let mut writer =
335    ///     write::from_extend::<u8, Vec<u8>>().fold_sent(0, |n, _| n + 1);
336    ///
337    /// let r1 = writer.send_flush(1).await.unwrap();
338    /// let r2 = writer.send_flush(2).await.unwrap();
339    /// let r3 = writer.send_flush(3).await.unwrap();
340    /// let out = writer.complete().await.unwrap();
341    ///
342    /// assert_eq!(out, (3, vec![1, 2, 3]));
343    /// # })
344    /// ```
345    fn fold_sent<T, F>(self, id: T, f: F) -> FoldSent<Self, T, F>
346    where
347        F: FnMut(T, &Self::Recv) -> T,
348        Self: Sized,
349    {
350        assert_writer::<Part, Self::Recv, Self::Error, (T, Self::Output), _>(
351            FoldSent::new(self, id, f),
352        )
353    }
354
355    /// Returns a new writer that fuses according to the provided closure.
356    ///
357    /// The resulting writer wraps both `Self::Recv` and `Self::Output` in
358    /// an `Option` and is guaranted to both output and return `Ok(None)`
359    /// when called after becoming fused.
360    fn fuse<F>(self, f: F) -> Fuse<Self, F>
361    where
362        F: FnMut(&Self::Output) -> bool,
363        Self: Sized,
364    {
365        assert_writer::<
366            Part,
367            Option<Self::Recv>,
368            Self::Error,
369            Option<Self::Output>,
370            _,
371        >(Fuse::new(self, f))
372    }
373
374    /// Produce the parts for this writer from the output of another writer.
375    ///
376    /// # Examples
377    ///
378    /// ```rust
379    /// # futures::executor::block_on(async {
380    /// use multipart_write::{MultipartWriteExt as _, write};
381    ///
382    /// let wr = write::from_extend::<u8, Vec<u8>>()
383    ///     .map_ok(|vs| vs.iter().fold(0, |acc, n| acc + n));
384    /// let mut writer = write::from_extend::<u8, Vec<u8>>().lift(wr);
385    ///
386    /// // We use `feed` and not `send_flush` because `send_flush` will complete
387    /// // the outer writer and write its output to the inner writer after each
388    /// // send, which is not what we want the example to show.
389    /// writer.feed(1).await.unwrap();
390    /// writer.feed(2).await.unwrap();
391    ///
392    /// // Flush the writer manually, which now completes the outer writer and
393    /// // writes its output, the sum of the parts written, to the inner writer.
394    /// writer.flush().await.unwrap();
395    ///
396    /// writer.feed(3).await.unwrap();
397    /// writer.feed(4).await.unwrap();
398    /// writer.feed(5).await.unwrap();
399    /// let out = writer.complete().await.unwrap();
400    ///
401    /// assert_eq!(out, vec![3, 12]);
402    /// # })
403    /// ```
404    fn lift<U, T>(self, other: U) -> Lift<Self, U, Part>
405    where
406        Self: Sized,
407        Self::Error: From<U::Error>,
408        U: MultipartWrite<T, Output = Part>,
409    {
410        assert_writer::<T, U::Recv, Self::Error, Self::Output, _>(Lift::new(
411            self, other,
412        ))
413    }
414
415    /// Map this writer's return type to a different value, returning a new
416    /// multipart writer with the given return type.
417    ///
418    /// # Examples
419    ///
420    /// ```rust
421    /// # futures::executor::block_on(async {
422    /// use multipart_write::{MultipartWriteExt as _, write};
423    ///
424    /// let mut writer = write::from_extend::<u8, Vec<u8>>()
425    ///     .map_sent::<&'static str, _>(|_| "OK");
426    ///
427    /// let r1 = writer.send_flush(1).await.unwrap();
428    /// let r2 = writer.send_flush(2).await.unwrap();
429    /// let r3 = writer.send_flush(3).await.unwrap();
430    /// let out = writer.complete().await.unwrap();
431    ///
432    /// assert_eq!(vec![r1, r2, r3], vec!["OK", "OK", "OK"]);
433    /// assert_eq!(out, vec![1, 2, 3]);
434    /// # })
435    /// ```
436    fn map_sent<U, F>(self, f: F) -> MapSent<Self, F>
437    where
438        F: FnMut(Self::Recv) -> U,
439        Self: Sized,
440    {
441        assert_writer::<Part, U, Self::Error, Self::Output, _>(MapSent::new(
442            self, f,
443        ))
444    }
445
446    /// Map this writer's error type to a different value, returning a new
447    /// multipart writer with the given error type.
448    fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
449    where
450        F: FnMut(Self::Error) -> E,
451        Self: Sized,
452    {
453        assert_writer::<Part, Self::Recv, E, Self::Output, _>(MapErr::new(
454            self, f,
455        ))
456    }
457
458    /// Map this writer's output type to a different type, returning a new
459    /// multipart writer with the given output type.
460    fn map_ok<U, F>(self, f: F) -> MapOk<Self, F>
461    where
462        F: FnMut(Self::Output) -> U,
463        Self: Sized,
464    {
465        assert_writer::<Part, Self::Recv, Self::Error, U, _>(MapOk::new(
466            self, f,
467        ))
468    }
469
470    /// A convenience method for calling [`MultipartWrite::poll_ready`] on
471    /// [`Unpin`] writer types.
472    #[must_use = "futures do nothing unless polled"]
473    fn poll_ready_unpin(
474        &mut self,
475        cx: &mut Context<'_>,
476    ) -> Poll<Result<(), Self::Error>>
477    where
478        Self: Unpin,
479    {
480        Pin::new(self).poll_ready(cx)
481    }
482
483    /// A convenience method for calling [`MultipartWrite::poll_flush`] on
484    /// [`Unpin`] writer types.
485    #[must_use = "futures do nothing unless polled"]
486    fn poll_flush_unpin(
487        &mut self,
488        cx: &mut Context<'_>,
489    ) -> Poll<Result<(), Self::Error>>
490    where
491        Self: Unpin,
492    {
493        Pin::new(self).poll_flush(cx)
494    }
495
496    /// A convenience method for calling [`MultipartWrite::poll_complete`] on
497    /// [`Unpin`] writer types.
498    #[must_use = "futures do nothing unless polled"]
499    fn poll_complete_unpin(
500        &mut self,
501        cx: &mut Context<'_>,
502    ) -> Poll<Result<Self::Output, Self::Error>>
503    where
504        Self: Unpin,
505    {
506        Pin::new(self).poll_complete(cx)
507    }
508
509    /// Provide a part to this writer in the output of a future.
510    ///
511    /// The result is a new writer over the type `U` that passes each value
512    /// through the function `f`, resolving the output, and sending it to the
513    /// inner writer.
514    ///
515    /// # Examples
516    ///
517    /// ```rust
518    /// # futures::executor::block_on(async {
519    /// use std::io::Error as IoError;
520    ///
521    /// use futures::future;
522    /// use multipart_write::{MultipartWriteExt as _, write};
523    ///
524    /// let mut writer = write::from_extend::<u8, Vec<u8>>()
525    ///     .ready_part(|n| future::ready(Ok::<_, IoError>(n + 1_u8)));
526    ///
527    /// writer.send_flush(1).await.unwrap();
528    /// writer.send_flush(2).await.unwrap();
529    /// writer.send_flush(3).await.unwrap();
530    /// let out = writer.complete().await.unwrap();
531    ///
532    /// assert_eq!(out, vec![2, 3, 4]);
533    /// # })
534    /// ```
535    fn ready_part<U, E, Fut, F>(self, f: F) -> ReadyPart<Self, Part, Fut, F>
536    where
537        F: FnMut(U) -> Fut,
538        Fut: Future<Output = Result<Part, E>>,
539        E: From<Self::Error>,
540        Self: Sized,
541    {
542        assert_writer::<U, (), E, Self::Output, _>(ReadyPart::new(self, f))
543    }
544
545    /// A future that completes when a part has been fully processed into the
546    /// writer, including flushing.
547    fn send_flush(&mut self, part: Part) -> SendFlush<'_, Self, Part>
548    where
549        Self: Unpin,
550    {
551        SendFlush::new(self, part)
552    }
553}
554
555fn assert_writer<Part, R, E, T, Wr>(wr: Wr) -> Wr
556where
557    Wr: MultipartWrite<Part, Recv = R, Error = E, Output = T>,
558{
559    wr
560}