Skip to main content

multipart_write/write/
mod.rs

1//! `MultipartWrite` combinators.
2//!
3//! This module contains the trait [`MultipartWriteExt`], which provides
4//! adapters for chaining and composing writers.
5use crate::{
6    BoxFusedMultipartWrite, BoxMultipartWrite, FusedMultipartWrite,
7    LocalBoxFusedMultipartWrite, LocalBoxMultipartWrite, MultipartWrite,
8};
9
10use futures_core::future::Future;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14mod buffered;
15pub use buffered::Buffered;
16
17mod complete;
18pub use complete::Complete;
19
20mod from_extend;
21pub use from_extend::{FromExtend, from_extend};
22
23mod fanout;
24pub use fanout::Fanout;
25
26mod feed;
27pub use feed::Feed;
28
29mod filter_map_part;
30pub use filter_map_part::FilterMapPart;
31
32mod filter_part;
33pub use filter_part::FilterPart;
34
35mod flush;
36pub use flush::Flush;
37
38mod fold_sent;
39pub use fold_sent::FoldSent;
40
41mod for_each_recv;
42pub use for_each_recv::ForEachRecv;
43
44mod fuse;
45pub use fuse::Fuse;
46
47mod lift;
48pub use lift::Lift;
49
50mod map_err;
51pub use map_err::MapErr;
52
53mod map_ok;
54pub use map_ok::MapOk;
55
56mod map_sent;
57pub use map_sent::MapSent;
58
59mod ready_part;
60pub use ready_part::ReadyPart;
61
62mod send_flush;
63pub use send_flush::SendFlush;
64
65mod then;
66pub use then::Then;
67
68impl<Wr: MultipartWrite<Part>, Part> MultipartWriteExt<Part> for Wr {}
69
70/// An extension trait for `MultipartWrite` providing a variety of convenient
71/// combinator functions.
72pub trait MultipartWriteExt<Part>: MultipartWrite<Part> {
73    /// Wrap this writer in a `Box`, pinning it.
74    fn boxed<'a>(
75        self,
76    ) -> BoxMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
77    where
78        Self: Sized + Send + 'a,
79    {
80        Box::pin(self)
81    }
82
83    /// Wrap this writer, which additionally has conditions making it a
84    /// [`FusedMultipartWrite`], in a `Box`, pinning it.
85    fn box_fused<'a>(
86        self,
87    ) -> BoxFusedMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
88    where
89        Self: Sized + Send + FusedMultipartWrite<Part> + 'a,
90    {
91        Box::pin(self)
92    }
93
94    /// Wrap this writer, which additionally has conditions making it a
95    /// [`FusedMultipartWrite`], in a `Box`, pinning it.
96    ///
97    /// Similar to `box_fused` but without the `Send` requirement.
98    fn box_fused_local<'a>(
99        self,
100    ) -> LocalBoxFusedMultipartWrite<
101        'a,
102        Part,
103        Self::Recv,
104        Self::Output,
105        Self::Error,
106    >
107    where
108        Self: Sized + FusedMultipartWrite<Part> + 'a,
109    {
110        Box::pin(self)
111    }
112
113    /// Wrap this writer in a `Box`, pinning it.
114    ///
115    /// Similar to `boxed` but without the `Send` requirement.
116    fn boxed_local<'a>(
117        self,
118    ) -> LocalBoxMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
119    where
120        Self: Sized + 'a,
121    {
122        Box::pin(self)
123    }
124
125    /// Adds a fixed size buffer to the current writer.
126    ///
127    /// The resulting `MultipartWrite` will buffer up to `capacity` items when
128    /// the underlying writer is not able to accept new parts.
129    ///
130    /// The values returned when the underlying writer has received a part are
131    /// also accumulated and returned in batch.
132    fn buffered(
133        self,
134        capacity: impl Into<Option<usize>>,
135    ) -> Buffered<Self, Part>
136    where
137        Self: Sized,
138    {
139        assert_writer::<
140            Part,
141            Option<Vec<Self::Recv>>,
142            Self::Error,
143            Self::Output,
144            _,
145        >(Buffered::new(self, capacity.into().unwrap_or_default()))
146    }
147
148    /// A future that runs this writer to completion, returning the associated
149    /// output.
150    fn complete(&mut self) -> Complete<'_, Self, Part>
151    where
152        Self: Unpin,
153    {
154        Complete::new(self)
155    }
156
157    /// Fanout the part to multiple writers.
158    ///
159    /// This adapter clones each incoming part and forwards it to both writers.
160    ///
161    /// # Examples
162    ///
163    /// ```rust
164    /// # futures::executor::block_on(async {
165    /// use multipart_write::{MultipartWriteExt as _, write};
166    ///
167    /// let wr1 = write::from_extend::<u8, Vec<u8>>();
168    /// let wr2 = write::from_extend::<u8, Vec<u8>>();
169    /// let mut writer = wr1.fanout(wr2);
170    ///
171    /// writer.send_flush(1).await.unwrap();
172    /// writer.send_flush(2).await.unwrap();
173    /// writer.send_flush(3).await.unwrap();
174    /// let out = writer.complete().await.unwrap();
175    ///
176    /// assert_eq!(out, (vec![1, 2, 3], vec![1, 2, 3]));
177    /// # })
178    /// ```
179    fn fanout<U>(self, other: U) -> Fanout<Self, U, Part>
180    where
181        Part: Clone,
182        U: MultipartWrite<Part, Error = Self::Error>,
183        Self: Sized,
184    {
185        assert_writer::<
186            Part,
187            (Self::Recv, U::Recv),
188            Self::Error,
189            (Self::Output, U::Output),
190            _,
191        >(Fanout::new(self, other))
192    }
193
194    /// A future that completes after the given part has been received by the
195    /// writer.
196    ///
197    /// Unlike `send_flush`, the returned future does not flush the writer.  It
198    /// is the caller's responsibility to ensure all pending items are
199    /// processed, which can be done with `flush` or `complete`.
200    fn feed(&mut self, part: Part) -> Feed<'_, Self, Part>
201    where
202        Self: Unpin,
203    {
204        Feed::new(self, part)
205    }
206
207    /// Apply a filter to this writer's parts, returning a new writer with the
208    /// same output.
209    ///
210    /// The return type of this writer is `Option<Self::Recv>` and is `None`
211    /// when the part did not pass the filter.
212    ///
213    /// # Examples
214    ///
215    /// ```rust
216    /// # futures::executor::block_on(async {
217    /// use multipart_write::{MultipartWriteExt, write};
218    ///
219    /// let mut writer =
220    ///     write::from_extend::<u8, Vec<u8>>().filter_part(|n| n % 2 == 0);
221    ///
222    /// let r1 = writer.send_flush(1).await.unwrap();
223    /// let r2 = writer.send_flush(2).await.unwrap();
224    /// let r3 = writer.send_flush(3).await.unwrap();
225    /// let out = writer.complete().await.unwrap();
226    ///
227    /// assert!(r1.is_none() && r2.is_some() && r3.is_none());
228    /// assert_eq!(out, vec![2]);
229    /// # })
230    /// ```
231    fn filter_part<F>(self, f: F) -> FilterPart<Self, F>
232    where
233        F: FnMut(&Part) -> bool,
234        Self: Sized,
235    {
236        assert_writer::<Part, Option<Self::Recv>, Self::Error, Self::Output, _>(
237            FilterPart::new(self, f),
238        )
239    }
240
241    /// Attempt to map the input to a part for this writer, filtering out the
242    /// inputs where the mapping returns `None`.
243    ///
244    /// The return type of this writer is `Option<Self::Recv>` and is `None`
245    /// when the provided closure returns `None`.
246    ///
247    /// # Examples
248    ///
249    /// ```rust
250    /// # futures::executor::block_on(async {
251    /// use multipart_write::{MultipartWriteExt as _, write};
252    ///
253    /// let mut writer = write::from_extend::<String, Vec<String>>()
254    ///     .filter_map_part::<u8, _>(|n| {
255    ///         if n % 2 == 0 { Some(n.to_string()) } else { None }
256    ///     });
257    ///
258    /// let r1 = writer.send_flush(1).await.unwrap();
259    /// let r2 = writer.send_flush(2).await.unwrap();
260    /// let r3 = writer.send_flush(3).await.unwrap();
261    /// let out = writer.complete().await.unwrap();
262    ///
263    /// assert!(r1.is_none() && r2.is_some() && r3.is_none());
264    /// assert_eq!(out, vec!["2".to_string()]);
265    /// # })
266    /// ```
267    fn filter_map_part<U, F>(self, f: F) -> FilterMapPart<Self, F>
268    where
269        F: FnMut(U) -> Option<Part>,
270        Self: Sized,
271    {
272        assert_writer::<U, Option<Self::Recv>, Self::Error, Self::Output, _>(
273            FilterMapPart::new(self, f),
274        )
275    }
276
277    /// A future that completes when the underlying writer has been flushed.
278    fn flush(&mut self) -> Flush<'_, Self, Part>
279    where
280        Self: Unpin,
281    {
282        Flush::new(self)
283    }
284
285    /// Accumulate the values returned by starting a send, returning it with the
286    /// output.
287    ///
288    /// # Examples
289    ///
290    /// ```rust
291    /// # futures::executor::block_on(async {
292    /// use multipart_write::{MultipartWriteExt as _, write};
293    ///
294    /// let mut writer =
295    ///     write::from_extend::<u8, Vec<u8>>().fold_sent(0, |n, _| n + 1);
296    ///
297    /// let r1 = writer.send_flush(1).await.unwrap();
298    /// let r2 = writer.send_flush(2).await.unwrap();
299    /// let r3 = writer.send_flush(3).await.unwrap();
300    /// let out = writer.complete().await.unwrap();
301    ///
302    /// assert_eq!(out, (3, vec![1, 2, 3]));
303    /// # })
304    /// ```
305    fn fold_sent<T, F>(self, id: T, f: F) -> FoldSent<Self, T, F>
306    where
307        F: FnMut(T, &Self::Recv) -> T,
308        Self: Sized,
309    {
310        assert_writer::<Part, Self::Recv, Self::Error, (T, Self::Output), _>(
311            FoldSent::new(self, id, f),
312        )
313    }
314
315    /// Evaluate the given async closure on the associated `Self::Recv` for this
316    /// writer.
317    ///
318    /// The result is a new writer that has all of the same properties as this
319    /// writer, except that `poll_ready` will not accept the next part until the
320    /// future returned by evaluating `F` on the return value resolves.
321    ///
322    /// # Examples
323    ///
324    /// ```rust
325    /// # futures::executor::block_on(async {
326    /// use std::sync::Arc;
327    /// use std::sync::atomic::{AtomicU8, Ordering};
328    ///
329    /// use multipart_write::{MultipartWriteExt as _, write};
330    ///
331    /// let counter = Arc::new(AtomicU8::new(1));
332    ///
333    /// // `from_extend` has no return type, so `map_sent` makes one for the
334    /// // demonstration.
335    /// let wr = write::from_extend::<u8, Vec<u8>>().map_sent::<u8, _>(|_| {
336    ///     let cnt = Arc::clone(&counter);
337    ///     let n = cnt.fetch_add(1, Ordering::SeqCst);
338    ///     n
339    /// });
340    ///
341    /// let mut writer = wr.for_each_recv(|n| {
342    ///     println!("{n} parts written");
343    ///     futures::future::ready(())
344    /// });
345    ///
346    /// let r1 = writer.send_flush(1).await.unwrap();
347    /// let r2 = writer.send_flush(2).await.unwrap();
348    /// let r3 = writer.send_flush(3).await.unwrap();
349    /// let out = writer.complete().await.unwrap();
350    ///
351    /// assert_eq!(out, vec![1, 2, 3]);
352    /// # })
353    /// ```
354    fn for_each_recv<Fut, F>(self, f: F) -> ForEachRecv<Self, Fut, F>
355    where
356        Self: Sized,
357        Self::Recv: Clone,
358        F: FnMut(Self::Recv) -> Fut,
359        Fut: Future<Output = ()>,
360    {
361        assert_writer::<Part, Self::Recv, Self::Error, Self::Output, _>(
362            ForEachRecv::new(self, f),
363        )
364    }
365
366    /// Returns a new writer that fuses according to the provided closure.
367    ///
368    /// The resulting writer wraps both `Self::Recv` and `Self::Output` in
369    /// an `Option` and is guaranteed to both output and return `Ok(None)`
370    /// when called after becoming fused.
371    fn fuse<F>(self, f: F) -> Fuse<Self, F>
372    where
373        F: FnMut(&Self::Output) -> bool,
374        Self: Sized,
375    {
376        assert_writer::<
377            Part,
378            Option<Self::Recv>,
379            Self::Error,
380            Option<Self::Output>,
381            _,
382        >(Fuse::new(self, f))
383    }
384
385    /// Produce the parts for this writer from the output of another writer.
386    ///
387    /// # Examples
388    ///
389    /// ```rust
390    /// # futures::executor::block_on(async {
391    /// use multipart_write::{MultipartWriteExt as _, write};
392    ///
393    /// let wr =
394    ///     write::from_extend::<u8, Vec<u8>>().map_ok(|vs| vs.iter().sum::<u8>());
395    /// let mut writer = write::from_extend::<u8, Vec<u8>>().lift(wr);
396    ///
397    /// // We use `feed` and not `send_flush` because `send_flush` will complete
398    /// // the outer writer and write its output to the inner writer after each
399    /// // send, which is not what we want the example to show.
400    /// writer.feed(1).await.unwrap();
401    /// writer.feed(2).await.unwrap();
402    ///
403    /// // Flush the writer manually, which now completes the outer writer and
404    /// // writes its output, the sum of the parts written, to the inner writer.
405    /// writer.flush().await.unwrap();
406    ///
407    /// writer.feed(3).await.unwrap();
408    /// writer.feed(4).await.unwrap();
409    /// writer.feed(5).await.unwrap();
410    /// let out = writer.complete().await.unwrap();
411    ///
412    /// assert_eq!(out, vec![3, 12]);
413    /// # })
414    /// ```
415    fn lift<U, T>(self, other: U) -> Lift<Self, U, Part>
416    where
417        Self: Sized,
418        Self::Error: From<U::Error>,
419        U: MultipartWrite<T, Output = Part>,
420    {
421        assert_writer::<T, U::Recv, Self::Error, Self::Output, _>(Lift::new(
422            self, other,
423        ))
424    }
425
426    /// Map this writer's return type to a different value, returning a new
427    /// multipart writer with the given return type.
428    ///
429    /// # Examples
430    ///
431    /// ```rust
432    /// # futures::executor::block_on(async {
433    /// use multipart_write::{MultipartWriteExt as _, write};
434    ///
435    /// let mut writer = write::from_extend::<u8, Vec<u8>>()
436    ///     .map_sent::<&'static str, _>(|_| "OK");
437    ///
438    /// let r1 = writer.send_flush(1).await.unwrap();
439    /// let r2 = writer.send_flush(2).await.unwrap();
440    /// let r3 = writer.send_flush(3).await.unwrap();
441    /// let out = writer.complete().await.unwrap();
442    ///
443    /// assert_eq!(vec![r1, r2, r3], vec!["OK", "OK", "OK"]);
444    /// assert_eq!(out, vec![1, 2, 3]);
445    /// # })
446    /// ```
447    fn map_sent<U, F>(self, f: F) -> MapSent<Self, F>
448    where
449        F: FnMut(Self::Recv) -> U,
450        Self: Sized,
451    {
452        assert_writer::<Part, U, Self::Error, Self::Output, _>(MapSent::new(
453            self, f,
454        ))
455    }
456
457    /// Map this writer's error type to a different value, returning a new
458    /// multipart writer with the given error type.
459    fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
460    where
461        F: FnMut(Self::Error) -> E,
462        Self: Sized,
463    {
464        assert_writer::<Part, Self::Recv, E, Self::Output, _>(MapErr::new(
465            self, f,
466        ))
467    }
468
469    /// Map this writer's output type to a different type, returning a new
470    /// multipart writer with the given output type.
471    fn map_ok<U, F>(self, f: F) -> MapOk<Self, F>
472    where
473        F: FnMut(Self::Output) -> U,
474        Self: Sized,
475    {
476        assert_writer::<Part, Self::Recv, Self::Error, U, _>(MapOk::new(
477            self, f,
478        ))
479    }
480
481    /// A convenience method for calling [`MultipartWrite::poll_ready`] on
482    /// [`Unpin`] writer types.
483    #[must_use = "futures do nothing unless polled"]
484    fn poll_ready_unpin(
485        &mut self,
486        cx: &mut Context<'_>,
487    ) -> Poll<Result<(), Self::Error>>
488    where
489        Self: Unpin,
490    {
491        Pin::new(self).poll_ready(cx)
492    }
493
494    /// A convenience method for calling [`MultipartWrite::poll_flush`] on
495    /// [`Unpin`] writer types.
496    #[must_use = "futures do nothing unless polled"]
497    fn poll_flush_unpin(
498        &mut self,
499        cx: &mut Context<'_>,
500    ) -> Poll<Result<(), Self::Error>>
501    where
502        Self: Unpin,
503    {
504        Pin::new(self).poll_flush(cx)
505    }
506
507    /// A convenience method for calling [`MultipartWrite::poll_complete`] on
508    /// [`Unpin`] writer types.
509    #[must_use = "futures do nothing unless polled"]
510    fn poll_complete_unpin(
511        &mut self,
512        cx: &mut Context<'_>,
513    ) -> Poll<Result<Self::Output, Self::Error>>
514    where
515        Self: Unpin,
516    {
517        Pin::new(self).poll_complete(cx)
518    }
519
520    /// Provide a part to this writer in the output of a future.
521    ///
522    /// The result is a new writer over the type `U` that passes each value
523    /// through the function `f`, resolving the output, and sending it to the
524    /// inner writer.
525    ///
526    /// # Examples
527    ///
528    /// ```rust
529    /// # futures::executor::block_on(async {
530    /// use std::io::Error as IoError;
531    ///
532    /// use futures::future;
533    /// use multipart_write::{MultipartWriteExt as _, write};
534    ///
535    /// let mut writer = write::from_extend::<u8, Vec<u8>>()
536    ///     .ready_part(|n| future::ready(Ok::<_, IoError>(n + 1_u8)));
537    ///
538    /// writer.send_flush(1).await.unwrap();
539    /// writer.send_flush(2).await.unwrap();
540    /// writer.send_flush(3).await.unwrap();
541    /// let out = writer.complete().await.unwrap();
542    ///
543    /// assert_eq!(out, vec![2, 3, 4]);
544    /// # })
545    /// ```
546    fn ready_part<U, E, Fut, F>(self, f: F) -> ReadyPart<Self, Part, Fut, F>
547    where
548        F: FnMut(U) -> Fut,
549        Fut: Future<Output = Result<Part, E>>,
550        E: From<Self::Error>,
551        Self: Sized,
552    {
553        assert_writer::<U, (), E, Self::Output, _>(ReadyPart::new(self, f))
554    }
555
556    /// A future that completes when a part has been fully processed into the
557    /// writer, including flushing.
558    fn send_flush(&mut self, part: Part) -> SendFlush<'_, Self, Part>
559    where
560        Self: Unpin,
561    {
562        SendFlush::new(self, part)
563    }
564
565    /// Asynchronously map the result of completing this writer to a different
566    /// result.
567    ///
568    /// # Examples
569    ///
570    /// ```rust
571    /// # futures::executor::block_on(async {
572    /// use multipart_write::{MultipartWriteExt as _, write};
573    ///
574    /// // `from_extend` turns an impl of `std::iter::Extend` into a writer,
575    /// // here one that writes a `u8` and outputs a `Vec<u8>`.
576    /// let mut writer = write::from_extend::<u8, Vec<u8>>().then(|res| {
577    ///     futures::future::ready(res.map(|vs| vs.iter().sum::<u8>()))
578    /// });
579    ///
580    /// writer.send_flush(1).await.unwrap();
581    /// writer.send_flush(2).await.unwrap();
582    /// writer.send_flush(3).await.unwrap();
583    /// let out = writer.complete().await.unwrap();
584    ///
585    /// assert_eq!(out, 6);
586    /// # });
587    /// ```
588    fn then<T, Fut, F>(self, f: F) -> Then<Self, Fut, F>
589    where
590        F: FnMut(Result<Self::Output, Self::Error>) -> Fut,
591        Fut: Future<Output = Result<T, Self::Error>>,
592        Self: Sized,
593    {
594        assert_writer::<Part, Self::Recv, Self::Error, T, _>(Then::new(self, f))
595    }
596}
597
598fn assert_writer<Part, R, E, T, Wr>(wr: Wr) -> Wr
599where
600    Wr: MultipartWrite<Part, Recv = R, Error = E, Output = T>,
601{
602    wr
603}