Skip to main content

multipart_write/stream/
mod.rs

1//! Using `MultipartWrite` with streams.
2//!
3//! This module contains the extension [`MultipartStreamExt`] that has adapters
4//! for using a `MultipartWrite` with a stream.
5use futures_core::stream::Stream;
6
7use crate::{FusedMultipartWrite, MultipartWrite};
8
9mod complete_with;
10pub use complete_with::CompleteWith;
11
12mod try_complete_when;
13pub use try_complete_when::TryCompleteWhen;
14
15impl<St: Stream> MultipartStreamExt for St {}
16
17/// An extension trait for `Stream`s that provides combinators to use with
18/// `MultipartWrite`rs.
19pub trait MultipartStreamExt: Stream {
20    /// Consumes a stream by passing to the provided `MultipartWrite`, returning
21    /// the complete output of the writer in a future.
22    ///
23    /// # Examples
24    ///
25    /// ```rust
26    /// # futures::executor::block_on(async {
27    /// use multipart_write::{
28    ///     MultipartStreamExt as _, MultipartWriteExt as _, write,
29    /// };
30    ///
31    /// let init: Vec<u8> = Vec::new();
32    /// let writer = write::extend(init).then(|res| async move {
33    ///     let vs = res?;
34    ///     Ok(vs.iter().sum::<u8>())
35    /// });
36    ///
37    /// let output = futures::stream::iter(1..=5)
38    ///     .complete_with(writer)
39    ///     .await;
40    ///
41    /// assert!(matches!(output, Ok(n) if n == 15));
42    /// # });
43    /// ```
44    fn complete_with<Wr>(self, writer: Wr) -> CompleteWith<Self, Wr>
45    where
46        Wr: MultipartWrite<Self::Item>,
47        Self: Sized,
48    {
49        CompleteWith::new(self, writer)
50    }
51
52    /// Transforms this stream into a stream of `Result`s returned by polling
53    /// the writer for completion.
54    ///
55    /// `TryCompleteWhen` does this by writing the items as parts to the writer
56    /// until the given closure evaluates to `true`, which triggers the writer
57    /// to produce the complete output result.
58    ///
59    /// Note the stronger requirement of [`FusedMultipartWrite`] on the writer
60    /// type. It must be safe to continue using the writer after it produces
61    /// the next completed item for the stream, so prior to this it must check
62    /// that the inner writer has not terminated. If either the stream or
63    /// the writer are terminated, the stream is ended.
64    ///
65    /// # Examples
66    ///
67    /// ```rust
68    /// # futures::executor::block_on(async {
69    /// use std::sync::Arc;
70    /// use std::sync::atomic::{AtomicU8, Ordering};
71    ///
72    /// use futures::stream::{self, TryStreamExt as _};
73    /// use multipart_write::{
74    ///     MultipartStreamExt as _, MultipartWriteExt as _, write,
75    /// };
76    ///
77    /// // The associated `Recv` type for the `FromExtend` writer is (), so
78    /// // there's nothing to decide when to stop and complete a part.  So in
79    /// // this contrived example a counter is used to complete the writer every
80    /// // third item in the stream.
81    /// let counter = Arc::new(AtomicU8::new(1));
82    ///
83    /// let init: Vec<u8> = Vec::new();
84    /// let writer = write::extend(init).then(|res| async move {
85    ///     let vs = res?;
86    ///     Ok(vs.iter().sum::<u8>())
87    /// });
88    ///
89    /// let output = stream::iter(1..=10)
90    ///     .try_complete_when(writer, |_| {
91    ///         let cnt = Arc::clone(&counter);
92    ///         let n = cnt.fetch_add(1, Ordering::SeqCst);
93    ///         n % 3 == 0
94    ///     })
95    ///     .try_collect::<Vec<_>>()
96    ///     .await
97    ///     .unwrap();
98    ///
99    /// assert_eq!(output, vec![6, 15, 24, 10]);
100    /// # });
101    /// ```
102    fn try_complete_when<Wr, F>(
103        self,
104        writer: Wr,
105        f: F,
106    ) -> TryCompleteWhen<Self, Wr, F>
107    where
108        Wr: FusedMultipartWrite<Self::Item>,
109        F: FnMut(Wr::Recv) -> bool,
110        Self: Sized,
111    {
112        TryCompleteWhen::new(self, writer, f)
113    }
114}