multipart_write/stream/mod.rs
1//! Using `MultipartWrite` with streams.
2//!
3//! This module contains the extension [`MultipartStreamExt`] that has adapters
4//! for composing `MultipartWrite` with streams.
5use futures_core::stream::Stream;
6
7use crate::{FusedMultipartWrite, MultipartWrite};
8
9mod complete_with;
10pub use complete_with::CompleteWith;
11
12mod try_complete_when;
13pub use try_complete_when::TryCompleteWhen;
14
15impl<St: Stream> MultipartStreamExt for St {}
16
17/// An extension trait for `Stream`s that provides combinators to use with
18/// `MultipartWrite`rs.
19pub trait MultipartStreamExt: Stream {
20 /// Consumes a stream by passing to the provided `MultipartWrite`, returning
21 /// the complete output of the writer in a future.
22 ///
23 /// # Examples
24 ///
25 /// ```rust
26 /// # futures::executor::block_on(async {
27 /// use multipart_write::{
28 /// MultipartStreamExt as _, MultipartWriteExt as _, write,
29 /// };
30 ///
31 /// let writer = write::from_extend::<u8, Vec<u8>>().and_then(|vs| {
32 /// futures::future::ready(Ok::<u8, std::io::Error>(vs.iter().sum()))
33 /// });
34 ///
35 /// let vs: Vec<u8> = (1..=5).collect();
36 /// let output = futures::stream::iter(vs)
37 /// .complete_with(writer)
38 /// .await;
39 ///
40 /// assert!(matches!(output, Ok(n) if n == 15));
41 /// # });
42 /// ```
43 fn complete_with<Wr>(self, writer: Wr) -> CompleteWith<Self, Wr>
44 where
45 Wr: MultipartWrite<Self::Item>,
46 Self: Sized,
47 {
48 CompleteWith::new(self, writer)
49 }
50
51 /// Transforms this stream into a stream of `Result`s returned by polling
52 /// the writer for completion.
53 ///
54 /// `TryCompleteWhen` does this by writing the items as parts to the writer
55 /// until the given closure evaluates to `true`, which triggers the writer
56 /// to produce the complete output result.
57 ///
58 /// Note the stronger requirement of [`FusedMultipartWrite`] on the writer
59 /// type. It must be safe to continue using the writer after it produces
60 /// the next completed item for the stream, so prior to this it must check
61 /// that the inner writer has not terminated. If either the stream or
62 /// the writer are terminated, the stream is ended.
63 ///
64 /// # Examples
65 ///
66 /// ```rust
67 /// # futures::executor::block_on(async {
68 /// use std::sync::Arc;
69 /// use std::sync::atomic::{AtomicU8, Ordering};
70 ///
71 /// use futures::TryStreamExt as _;
72 /// use multipart_write::{
73 /// MultipartStreamExt as _, MultipartWriteExt as _, write,
74 /// };
75 ///
76 /// // The associated `Recv` type for the `FromExtend` writer is (), so
77 /// // there's nothing to decide when to stop and complete a part. So here
78 /// // we keep a counter for the purpose of writing out a part every third
79 /// // item in the stream.
80 /// let counter = Arc::new(AtomicU8::new(1));
81 ///
82 /// let writer = write::from_extend::<u8, Vec<u8>>().and_then(|vs| {
83 /// futures::future::ready(Ok::<u8, std::io::Error>(vs.iter().sum()))
84 /// });
85 ///
86 /// let output = futures::stream::iter(1..=10)
87 /// .try_complete_when(writer, |_| {
88 /// let cnt = Arc::clone(&counter);
89 /// let n = cnt.fetch_add(1, Ordering::SeqCst);
90 /// n % 3 == 0
91 /// })
92 /// .try_collect::<Vec<_>>()
93 /// .await
94 /// .unwrap();
95 ///
96 /// assert_eq!(output, vec![6, 15, 24, 10]);
97 /// # });
98 /// ```
99 fn try_complete_when<Wr, F>(
100 self,
101 writer: Wr,
102 f: F,
103 ) -> TryCompleteWhen<Self, Wr, F>
104 where
105 Wr: FusedMultipartWrite<Self::Item>,
106 F: FnMut(Wr::Recv) -> bool,
107 Self: Sized,
108 {
109 TryCompleteWhen::new(self, writer, f)
110 }
111}