multipart_write/stream/mod.rs
1//! Using `MultipartWrite` with streams.
2//!
3//! This module contains the extension [`MultipartStreamExt`] that has adapters
4//! for using a `MultipartWrite` with a stream.
5use futures_core::stream::Stream;
6
7use crate::{FusedMultipartWrite, MultipartWrite};
8
9mod complete_with;
10pub use complete_with::CompleteWith;
11
12mod try_complete_when;
13pub use try_complete_when::TryCompleteWhen;
14
15impl<St: Stream> MultipartStreamExt for St {}
16
17/// An extension trait for `Stream`s that provides combinators to use with
18/// `MultipartWrite`rs.
19pub trait MultipartStreamExt: Stream {
20 /// Consumes a stream by passing to the provided `MultipartWrite`, returning
21 /// the complete output of the writer in a future.
22 ///
23 /// # Examples
24 ///
25 /// ```rust
26 /// # futures::executor::block_on(async {
27 /// use multipart_write::{
28 /// MultipartStreamExt as _, MultipartWriteExt as _, write,
29 /// };
30 ///
31 /// let init: Vec<u8> = Vec::new();
32 /// let writer = write::extend(init).then(|res| async move {
33 /// let vs = res?;
34 /// Ok(vs.iter().sum::<u8>())
35 /// });
36 ///
37 /// let output = futures::stream::iter(1..=5)
38 /// .complete_with(writer)
39 /// .await;
40 ///
41 /// assert!(matches!(output, Ok(n) if n == 15));
42 /// # });
43 /// ```
44 fn complete_with<Wr>(self, writer: Wr) -> CompleteWith<Self, Wr>
45 where
46 Wr: MultipartWrite<Self::Item>,
47 Self: Sized,
48 {
49 CompleteWith::new(self, writer)
50 }
51
52 /// Transforms this stream into a stream of `Result`s returned by polling
53 /// the writer for completion.
54 ///
55 /// `TryCompleteWhen` does this by writing the items as parts to the writer
56 /// until the given closure evaluates to `true`, which triggers the writer
57 /// to produce the complete output result.
58 ///
59 /// Note the stronger requirement of [`FusedMultipartWrite`] on the writer
60 /// type. It must be safe to continue using the writer after it produces
61 /// the next completed item for the stream, so prior to this it must check
62 /// that the inner writer has not terminated. If either the stream or
63 /// the writer are terminated, the stream is ended.
64 ///
65 /// # Examples
66 ///
67 /// ```rust
68 /// # futures::executor::block_on(async {
69 /// use std::sync::Arc;
70 /// use std::sync::atomic::{AtomicU8, Ordering};
71 ///
72 /// use futures::stream::{self, TryStreamExt as _};
73 /// use multipart_write::{
74 /// MultipartStreamExt as _, MultipartWriteExt as _, write,
75 /// };
76 ///
77 /// // The associated `Recv` type for the `FromExtend` writer is (), so
78 /// // there's nothing to decide when to stop and complete a part. So in
79 /// // this contrived example a counter is used to complete the writer every
80 /// // third item in the stream.
81 /// let counter = Arc::new(AtomicU8::new(1));
82 ///
83 /// let init: Vec<u8> = Vec::new();
84 /// let writer = write::extend(init).then(|res| async move {
85 /// let vs = res?;
86 /// Ok(vs.iter().sum::<u8>())
87 /// });
88 ///
89 /// let output = stream::iter(1..=10)
90 /// .try_complete_when(writer, |_| {
91 /// let cnt = Arc::clone(&counter);
92 /// let n = cnt.fetch_add(1, Ordering::SeqCst);
93 /// n % 3 == 0
94 /// })
95 /// .try_collect::<Vec<_>>()
96 /// .await
97 /// .unwrap();
98 ///
99 /// assert_eq!(output, vec![6, 15, 24, 10]);
100 /// # });
101 /// ```
102 fn try_complete_when<Wr, F>(
103 self,
104 writer: Wr,
105 f: F,
106 ) -> TryCompleteWhen<Self, Wr, F>
107 where
108 Wr: FusedMultipartWrite<Self::Item>,
109 F: FnMut(Wr::Recv) -> bool,
110 Self: Sized,
111 {
112 TryCompleteWhen::new(self, writer, f)
113 }
114}