Skip to main content

multipart_write/write/
mod.rs

1//! `MultipartWrite` combinators.
2//!
3//! This module contains the trait [`MultipartWriteExt`], which provides
4//! adapters for chaining and composing writers.
5use crate::{
6    BoxFusedMultipartWrite, BoxMultipartWrite, FusedMultipartWrite,
7    LocalBoxFusedMultipartWrite, LocalBoxMultipartWrite, MultipartWrite,
8};
9
10use futures_core::future::Future;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14mod buffered;
15pub use buffered::Buffered;
16
17mod complete;
18pub use complete::Complete;
19
20mod extend;
21pub use extend::{Extend, extend, extend_default};
22
23mod fanout;
24pub use fanout::Fanout;
25
26mod feed;
27pub use feed::Feed;
28
29mod filter_map_part;
30pub use filter_map_part::FilterMapPart;
31
32mod filter_part;
33pub use filter_part::FilterPart;
34
35mod flush;
36pub use flush::Flush;
37
38mod fold_sent;
39pub use fold_sent::FoldSent;
40
41mod for_each_recv;
42pub use for_each_recv::ForEachRecv;
43
44mod fuse;
45pub use fuse::Fuse;
46
47mod lift;
48pub use lift::Lift;
49
50mod map_err;
51pub use map_err::MapErr;
52
53mod map_ok;
54pub use map_ok::MapOk;
55
56mod map_sent;
57pub use map_sent::MapSent;
58
59mod ready_part;
60pub use ready_part::ReadyPart;
61
62mod send_flush;
63pub use send_flush::SendFlush;
64
65mod then;
66pub use then::Then;
67
// Blanket implementation: every type implementing `MultipartWrite<Part>` gets
// the `MultipartWriteExt<Part>` combinators. The body is empty because all of
// the trait's methods have default implementations.
68impl<Wr: MultipartWrite<Part>, Part> MultipartWriteExt<Part> for Wr {}
69
70/// An extension trait for `MultipartWrite` providing a variety of convenient
71/// combinator functions.
72pub trait MultipartWriteExt<Part>: MultipartWrite<Part> {
73    /// Pin this writer in a `Box` as a [`BoxMultipartWrite`]; requires `Send`.
74    fn boxed<'a>(
75        self,
76    ) -> BoxMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
77    where
78        Self: Sized + Send + 'a,
79    {
80        Box::pin(self)
81    }
82
83    /// Pin this writer, which must also be a [`FusedMultipartWrite`], in a
84    /// `Box` as a [`BoxFusedMultipartWrite`]; requires `Send`.
85    fn box_fused<'a>(
86        self,
87    ) -> BoxFusedMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
88    where
89        Self: Sized + Send + FusedMultipartWrite<Part> + 'a,
90    {
91        Box::pin(self)
92    }
93
94    /// Pin this writer, which must also be a [`FusedMultipartWrite`], in a
95    /// `Box` as a [`LocalBoxFusedMultipartWrite`].
96    ///
97    /// Like `box_fused`, but without the `Send` requirement.
98    fn box_fused_local<'a>(
99        self,
100    ) -> LocalBoxFusedMultipartWrite<
101        'a,
102        Part,
103        Self::Recv,
104        Self::Output,
105        Self::Error,
106    >
107    where
108        Self: Sized + FusedMultipartWrite<Part> + 'a,
109    {
110        Box::pin(self)
111    }
112
113    /// Pin this writer in a `Box` as a [`LocalBoxMultipartWrite`].
114    ///
115    /// Like `boxed`, but without the `Send` requirement.
116    fn boxed_local<'a>(
117        self,
118    ) -> LocalBoxMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
119    where
120        Self: Sized + 'a,
121    {
122        Box::pin(self)
123    }
124
125    /// Add a fixed-size buffer in front of this writer.
126    ///
127    /// The resulting `MultipartWrite` will buffer up to `capacity` parts when
128    /// the underlying writer is not able to accept new parts.
129    ///
130    /// The values returned when the underlying writer has received a part are
131    /// also accumulated and returned in batch, as `Option<Vec<Self::Recv>>`.
    ///
    /// `capacity` may be a `usize` or an `Option<usize>`; `None` is treated as
    /// `0`.
132    fn buffered(
133        self,
134        capacity: impl Into<Option<usize>>,
135    ) -> Buffered<Self, Part>
136    where
137        Self: Sized,
138    {
139        assert_writer::<
140            Part,
141            Option<Vec<Self::Recv>>,
142            Self::Error,
143            Self::Output,
144            _,
145        >(Buffered::new(self, capacity.into().unwrap_or_default()))
146    }
147
148    /// A future that runs this writer to completion, resolving to the
149    /// writer's output; borrows the writer mutably and requires `Unpin`.
150    fn complete(&mut self) -> Complete<'_, Self, Part>
151    where
152        Self: Unpin,
153    {
154        Complete::new(self)
155    }
156
157    /// Fan out each part to two writers.
158    ///
159    /// This adapter clones each incoming part and forwards it to both writers.
    /// Both writers must share the same `Error` type; the combined writer's
    /// `Recv` and `Output` are tuples of the two writers' respective values.
160    ///
161    /// # Examples
162    ///
163    /// ```rust
164    /// # futures::executor::block_on(async {
165    /// use multipart_write::{MultipartWriteExt as _, write};
166    ///
167    /// let init: Vec<u8> = Vec::new();
168    /// let wr1 = write::extend(init.clone());
169    /// let wr2 = write::extend(init);
170    ///
171    /// let mut writer = wr1.fanout(wr2);
172    /// writer.send_flush(1).await.unwrap();
173    /// writer.send_flush(2).await.unwrap();
174    /// writer.send_flush(3).await.unwrap();
175    /// let out = writer.complete().await.unwrap();
176    ///
177    /// assert_eq!(out, (vec![1, 2, 3], vec![1, 2, 3]));
178    /// # })
179    /// ```
180    fn fanout<U>(self, other: U) -> Fanout<Self, U, Part>
181    where
182        Part: Clone,
183        U: MultipartWrite<Part, Error = Self::Error>,
184        Self: Sized,
185    {
186        assert_writer::<
187            Part,
188            (Self::Recv, U::Recv),
189            Self::Error,
190            (Self::Output, U::Output),
191            _,
192        >(Fanout::new(self, other))
193    }
194
195    /// A future that completes after the given part has been received by the
196    /// writer.
197    ///
198    /// Unlike `send_flush`, the returned future does not flush the writer. It
199    /// is the caller's responsibility to ensure all pending parts are processed
200    /// by calling `flush` or `complete`.
201    fn feed(&mut self, part: Part) -> Feed<'_, Self, Part>
202    where
203        Self: Unpin,
204    {
205        Feed::new(self, part)
206    }
207
208    /// Apply a filter to this writer's parts, returning a new writer with the
209    /// same output.
210    ///
211    /// The `Recv` type of the new writer is `Option<Self::Recv>`, which is
212    /// `None` when the part did not pass the filter.
213    ///
214    /// # Examples
215    ///
216    /// ```rust
217    /// # futures::executor::block_on(async {
218    /// use multipart_write::{MultipartWriteExt as _, write};
219    ///
220    /// let init: Vec<u8> = Vec::new();
221    /// let mut writer = write::extend(init).filter_part(|n| n % 2 == 0);
222    ///
223    /// let r1 = writer.send_flush(1).await.unwrap();
224    /// let r2 = writer.send_flush(2).await.unwrap();
225    /// let r3 = writer.send_flush(3).await.unwrap();
226    /// let out = writer.complete().await.unwrap();
227    ///
228    /// assert!(r1.is_none() && r2.is_some() && r3.is_none());
229    /// assert_eq!(out, vec![2]);
230    /// # })
231    /// ```
232    fn filter_part<F>(self, f: F) -> FilterPart<Self, Part, F>
233    where
234        F: FnMut(&Part) -> bool,
235        Self: Sized,
236    {
237        assert_writer::<Part, Option<Self::Recv>, Self::Error, Self::Output, _>(
238            FilterPart::new(self, f),
239        )
240    }
241
242    /// Attempt to map the input to a part for this writer, filtering out the
243    /// inputs where the mapping returns `None`.
244    ///
245    /// The new writer accepts inputs of type `P`; its `Recv` type is
246    /// `Option<Self::Recv>`, which is `None` when the closure returns `None`.
247    ///
248    /// # Examples
249    ///
250    /// ```rust
251    /// # futures::executor::block_on(async {
252    /// use multipart_write::{MultipartWriteExt as _, write};
253    ///
254    /// let init: Vec<String> = Vec::new();
255    /// let mut writer = write::extend(init).filter_map_part(|n: u8| {
256    ///     if n % 2 == 0 { Some(n.to_string()) } else { None }
257    /// });
258    ///
259    /// let r1 = writer.send_flush(1).await.unwrap();
260    /// let r2 = writer.send_flush(2).await.unwrap();
261    /// let r3 = writer.send_flush(3).await.unwrap();
262    /// let out = writer.complete().await.unwrap();
263    ///
264    /// assert!(r1.is_none() && r2.is_some() && r3.is_none());
265    /// assert_eq!(out, vec!["2".to_string()]);
266    /// # })
267    /// ```
268    fn filter_map_part<P, F>(self, f: F) -> FilterMapPart<Self, Part, P, F>
269    where
270        F: FnMut(P) -> Option<Part>,
271        Self: Sized,
272    {
273        assert_writer::<P, Option<Self::Recv>, Self::Error, Self::Output, _>(
274            FilterMapPart::new(self, f),
275        )
276    }
277
278    /// A future that completes once this writer, which must be `Unpin`, is flushed.
279    fn flush(&mut self) -> Flush<'_, Self, Part>
280    where
281        Self: Unpin,
282    {
283        Flush::new(self)
284    }
285
286    /// Fold the values returned by starting a send into an accumulator that is
287    /// returned alongside the output as `(T, Self::Output)`.
    ///
    /// `id` is the starting accumulator value; `f` takes the current
    /// accumulator and a reference to a `Self::Recv` and returns the next one.
288    ///
289    /// # Examples
290    ///
291    /// ```rust
292    /// # futures::executor::block_on(async {
293    /// use multipart_write::{MultipartWriteExt as _, write};
294    ///
295    /// let init: Vec<u8> = Vec::new();
296    /// let mut writer = write::extend(init).fold_sent(0, |n, _| n + 1);
297    ///
298    /// let r1 = writer.send_flush(1).await.unwrap();
299    /// let r2 = writer.send_flush(2).await.unwrap();
300    /// let r3 = writer.send_flush(3).await.unwrap();
301    /// let out = writer.complete().await.unwrap();
302    ///
303    /// assert_eq!(out, (3, vec![1, 2, 3]));
304    /// # })
305    /// ```
306    fn fold_sent<T, F>(self, id: T, f: F) -> FoldSent<Self, T, F, Part>
307    where
308        F: FnMut(T, &Self::Recv) -> T,
309        Self: Sized,
310    {
311        assert_writer::<Part, Self::Recv, Self::Error, (T, Self::Output), _>(
312            FoldSent::new(self, id, f),
313        )
314    }
315
316    /// Evaluate the given async closure on the associated `Self::Recv` for this
317    /// writer.
318    ///
319    /// The result is a new writer with the same `Recv`, `Error` and `Output`
320    /// as this one, except that `poll_ready` will not accept the next part
321    /// until the future returned by `f` resolves (needs `Self::Recv: Clone`).
322    ///
323    /// # Examples
324    ///
325    /// ```rust
326    /// # futures::executor::block_on(async {
327    /// use std::sync::Arc;
328    /// use std::sync::atomic::{AtomicU8, Ordering};
329    ///
330    /// use multipart_write::{MultipartWriteExt as _, write};
331    ///
332    /// let counter = Arc::new(AtomicU8::new(1));
333    ///
334    /// // `extend` has no return type, so `map_sent` makes one for the
335    /// // demonstration.
336    /// let init: Vec<u8> = Vec::new();
337    /// let mut writer = write::extend(init)
338    ///     .map_sent(|_| {
339    ///         let cnt = Arc::clone(&counter);
340    ///         let n = cnt.fetch_add(1, Ordering::SeqCst);
341    ///         n
342    ///     })
343    ///     .for_each_recv(|n| {
344    ///         println!("{n} parts written");
345    ///         futures::future::ready(())
346    ///     });
347    ///
348    /// let r1 = writer.send_flush(1).await.unwrap();
349    /// let r2 = writer.send_flush(2).await.unwrap();
350    /// let r3 = writer.send_flush(3).await.unwrap();
351    /// let out = writer.complete().await.unwrap();
352    ///
353    /// assert_eq!(out, vec![1, 2, 3]);
354    /// # })
355    /// ```
356    fn for_each_recv<Fut, F>(self, f: F) -> ForEachRecv<Self, Part, Fut, F>
357    where
358        Self: Sized,
359        Self::Recv: Clone,
360        F: FnMut(Self::Recv) -> Fut,
361        Fut: Future<Output = ()>,
362    {
363        assert_writer::<Part, Self::Recv, Self::Error, Self::Output, _>(
364            ForEachRecv::new(self, f),
365        )
366    }
367
368    /// Returns a new writer that fuses according to the provided closure.
369    ///
370    /// The closure is given `&Self::Output`. The resulting writer wraps both
371    /// `Self::Recv` and `Self::Output` in an `Option` and is guaranteed to
372    /// both output and return `Ok(None)` when called after becoming fused.
373    fn fuse<F>(self, f: F) -> Fuse<Self, Part, F>
374    where
375        F: FnMut(&Self::Output) -> bool,
376        Self: Sized,
377    {
378        assert_writer::<
379            Part,
380            Option<Self::Recv>,
381            Self::Error,
382            Option<Self::Output>,
383            _,
384        >(Fuse::new(self, f))
385    }
386
387    /// Produce the parts for this writer from the output of another writer.
    ///
    /// `other` accepts parts of type `P` and its `Output` must be this
    /// writer's `Part`; `Self::Error` must implement `From<U::Error>`. The
    /// new writer accepts `P`, returns `U::Recv`, and keeps `Self::Output`.
388    ///
389    /// # Examples
390    ///
391    /// ```rust
392    /// # futures::executor::block_on(async {
393    /// use multipart_write::{MultipartWriteExt as _, write};
394    ///
395    /// let init: Vec<u8> = Vec::new();
396    /// let wr = write::extend(init.clone()).map_ok(|vs| vs.iter().sum::<u8>());
397    /// let mut writer = write::extend(init).lift(wr);
398    ///
399    /// // We use `feed` and not `send_flush` because `send_flush` will complete
400    /// // the outer writer and write its output to the inner writer after each
401    /// // send, which is not what we want the example to show.
402    /// writer.feed(1).await.unwrap();
403    /// writer.feed(2).await.unwrap();
404    ///
405    /// // Flush the writer manually, which now completes the outer writer and
406    /// // writes its output, the sum of the parts written, to the inner writer.
407    /// writer.flush().await.unwrap();
408    ///
409    /// writer.feed(3).await.unwrap();
410    /// writer.feed(4).await.unwrap();
411    /// writer.feed(5).await.unwrap();
412    /// let out = writer.complete().await.unwrap();
413    ///
414    /// assert_eq!(out, vec![3, 12]);
415    /// # })
416    /// ```
417    fn lift<U, P>(self, other: U) -> Lift<Self, U, P, Part>
418    where
419        Self: Sized,
420        Self::Error: From<U::Error>,
421        U: MultipartWrite<P, Output = Part>,
422    {
423        assert_writer::<P, U::Recv, Self::Error, Self::Output, _>(Lift::new(
424            self, other,
425        ))
426    }
427
428    /// Map this writer's `Recv` value through `f`, returning a new multipart
429    /// writer whose `Recv` type is `R`.
430    ///
431    /// # Examples
432    ///
433    /// ```rust
434    /// # futures::executor::block_on(async {
435    /// use multipart_write::{MultipartWriteExt as _, write};
436    ///
437    /// let init: Vec<u8> = Vec::new();
438    /// let mut writer = write::extend(init).map_sent(|_| "OK");
439    ///
440    /// let r1 = writer.send_flush(1).await.unwrap();
441    /// let r2 = writer.send_flush(2).await.unwrap();
442    /// let r3 = writer.send_flush(3).await.unwrap();
443    /// let out = writer.complete().await.unwrap();
444    ///
445    /// assert_eq!(vec![r1, r2, r3], vec!["OK", "OK", "OK"]);
446    /// assert_eq!(out, vec![1, 2, 3]);
447    /// # })
448    /// ```
449    fn map_sent<R, F>(self, f: F) -> MapSent<Self, Part, R, F>
450    where
451        F: FnMut(Self::Recv) -> R,
452        Self: Sized,
453    {
454        assert_writer::<Part, R, Self::Error, Self::Output, _>(MapSent::new(
455            self, f,
456        ))
457    }
458
459    /// Map this writer's error through `f`, returning a new multipart writer
460    /// whose `Error` type is `E`.
461    fn map_err<E, F>(self, f: F) -> MapErr<Self, Part, E, F>
462    where
463        F: FnMut(Self::Error) -> E,
464        Self: Sized,
465    {
466        assert_writer::<Part, Self::Recv, E, Self::Output, _>(MapErr::new(
467            self, f,
468        ))
469    }
470
471    /// Map this writer's output through `f`, returning a new multipart writer
472    /// whose `Output` type is `T`.
473    fn map_ok<T, F>(self, f: F) -> MapOk<Self, Part, T, F>
474    where
475        F: FnMut(Self::Output) -> T,
476        Self: Sized,
477    {
478        assert_writer::<Part, Self::Recv, Self::Error, T, _>(MapOk::new(
479            self, f,
480        ))
481    }
482
483    /// A convenience method for calling [`MultipartWrite::poll_ready`] on
484    /// [`Unpin`] writer types, pinning `self` with [`Pin::new`].
485    #[must_use = "futures do nothing unless polled"]
486    fn poll_ready_unpin(
487        &mut self,
488        cx: &mut Context<'_>,
489    ) -> Poll<Result<(), Self::Error>>
490    where
491        Self: Unpin,
492    {
493        Pin::new(self).poll_ready(cx)
494    }
495
496    /// A convenience method for calling [`MultipartWrite::poll_flush`] on
497    /// [`Unpin`] writer types, pinning `self` with [`Pin::new`].
498    #[must_use = "futures do nothing unless polled"]
499    fn poll_flush_unpin(
500        &mut self,
501        cx: &mut Context<'_>,
502    ) -> Poll<Result<(), Self::Error>>
503    where
504        Self: Unpin,
505    {
506        Pin::new(self).poll_flush(cx)
507    }
508
509    /// A convenience method for calling [`MultipartWrite::poll_complete`] on
510    /// [`Unpin`] writer types, pinning `self` with [`Pin::new`].
511    #[must_use = "futures do nothing unless polled"]
512    fn poll_complete_unpin(
513        &mut self,
514        cx: &mut Context<'_>,
515    ) -> Poll<Result<Self::Output, Self::Error>>
516    where
517        Self: Unpin,
518    {
519        Pin::new(self).poll_complete(cx)
520    }
521
522    /// Provide a part to this writer in the output of a future.
523    ///
524    /// The result is a new writer over the type `P` that passes each value
525    /// through the function `f`, resolving the output, and sending it to the
526    /// inner writer. Its `Recv` type is `()`.
527    ///
528    /// # Examples
529    ///
530    /// ```rust
531    /// # futures::executor::block_on(async {
532    /// use multipart_write::{MultipartWriteExt as _, write};
533    ///
534    /// let init: Vec<u8> = Vec::new();
535    /// let mut writer = write::extend(init)
536    ///     .ready_part(|n: u8| futures::future::ready(Ok(n + 1)));
537    ///
538    /// writer.send_flush(1).await.unwrap();
539    /// writer.send_flush(2).await.unwrap();
540    /// writer.send_flush(3).await.unwrap();
541    /// let out = writer.complete().await.unwrap();
542    ///
543    /// assert_eq!(out, vec![2, 3, 4]);
544    /// # })
545    /// ```
546    fn ready_part<P, Fut, F>(self, f: F) -> ReadyPart<Self, Part, P, Fut, F>
547    where
548        F: FnMut(P) -> Fut,
549        Fut: Future<Output = Result<Part, Self::Error>>,
550        Self: Sized,
551    {
552        assert_writer::<P, (), Self::Error, Self::Output, _>(ReadyPart::new(
553            self, f,
554        ))
555    }
556
557    /// A future that completes when a part has been fully processed into the
558    /// writer, including flushing; `feed` is the variant that does not flush.
559    fn send_flush(&mut self, part: Part) -> SendFlush<'_, Self, Part>
560    where
561        Self: Unpin,
562    {
563        SendFlush::new(self, part)
564    }
565
566    /// Asynchronously map the result of completing this writer to a
567    /// `Result<T, Self::Error>`, where `T` is the new output type.
568    ///
569    /// # Examples
570    ///
571    /// ```rust
572    /// # futures::executor::block_on(async {
573    /// use multipart_write::{MultipartWriteExt as _, write};
574    ///
575    /// let init: Vec<u8> = Vec::new();
576    /// let mut writer = write::extend(init).then(|res| {
577    ///     futures::future::ready(res.map(|vs| vs.iter().sum::<u8>()))
578    /// });
579    ///
580    /// writer.send_flush(1).await.unwrap();
581    /// writer.send_flush(2).await.unwrap();
582    /// writer.send_flush(3).await.unwrap();
583    /// let out = writer.complete().await.unwrap();
584    ///
585    /// assert_eq!(out, 6);
586    /// # })
587    /// ```
588    fn then<T, Fut, F>(self, f: F) -> Then<Self, Part, T, Fut, F>
589    where
590        F: FnMut(Result<Self::Output, Self::Error>) -> Fut,
591        Fut: Future<Output = Result<T, Self::Error>>,
592        Self: Sized,
593    {
594        assert_writer::<Part, Self::Recv, Self::Error, T, _>(Then::new(self, f))
595    }
596}
597
/// Return `wr` unchanged.
///
/// The explicit type parameters let each combinator state the `Part`, `Recv`,
/// `Error` and `Output` types it expects its adapter to have; the call only
/// compiles when `Wr` implements `MultipartWrite<Part>` with exactly those
/// associated types.
598fn assert_writer<Part, R, E, T, Wr>(wr: Wr) -> Wr
599where
600    Wr: MultipartWrite<Part, Recv = R, Error = E, Output = T>,
601{
602    wr
603}