multipart_write/write/mod.rs
1//! `MultipartWrite` combinators.
2//!
3//! This module contains the trait [`MultipartWriteExt`], which provides
4//! adapters for chaining and composing `MultipartWrite`rs.
5use crate::{
6 BoxFusedMultipartWrite, BoxMultipartWrite, FusedMultipartWrite,
7 LocalBoxFusedMultipartWrite, LocalBoxMultipartWrite, MultipartWrite,
8};
9
10use futures_core::future::Future;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14mod and_then;
15pub use and_then::AndThen;
16
17mod buffered;
18pub use buffered::Buffered;
19
20mod complete;
21pub use complete::Complete;
22
23mod from_extend;
24pub use from_extend::{FromExtend, from_extend};
25
26mod fanout;
27pub use fanout::Fanout;
28
29mod feed;
30pub use feed::Feed;
31
32mod filter_map_part;
33pub use filter_map_part::FilterMapPart;
34
35mod filter_part;
36pub use filter_part::FilterPart;
37
38mod flush;
39pub use flush::Flush;
40
41mod fold_sent;
42pub use fold_sent::FoldSent;
43
44mod fuse;
45pub use fuse::Fuse;
46
47mod lift;
48pub use lift::Lift;
49
50mod map_err;
51pub use map_err::MapErr;
52
53mod map_ok;
54pub use map_ok::MapOk;
55
56mod map_sent;
57pub use map_sent::MapSent;
58
59mod ready_part;
60pub use ready_part::ReadyPart;
61
62mod resolve;
63pub use resolve::{Resolve, resolve};
64
65mod send_flush;
66pub use send_flush::SendFlush;
67
68impl<Wr: MultipartWrite<Part>, Part> MultipartWriteExt<Part> for Wr {}
69
70/// An extension trait for `MultipartWrite` providing a variety of convenient
71/// combinator functions.
72pub trait MultipartWriteExt<Part>: MultipartWrite<Part> {
73 /// Compute from this writer's output type a new result using an
74 /// asynchronous closure.
75 ///
76 /// Calling `poll_complete` on this writer will complete the inner writer,
77 /// then run the provided closure `f` with the output to produce the final
78 /// output of this writer.
79 ///
80 /// # Examples
81 ///
82 /// ```rust
83 /// # futures::executor::block_on(async {
84 /// use std::io::Error as IoError;
85 ///
86 /// use futures::future;
87 /// use multipart_write::{MultipartWriteExt as _, write};
88 ///
89 /// // `from_extend` turns an impl of `std::iter::Extend` into a writer,
90 /// // here one that writes a `u8` and outputs a `Vec<u8>`.
91 /// let mut writer = write::from_extend::<u8, Vec<u8>>().and_then(|vs| {
92 /// future::ready(Ok::<_, IoError>(vs.iter().fold(0, |acc, n| acc + n)))
93 /// });
94 ///
95 /// writer.send_flush(1).await.unwrap();
96 /// writer.send_flush(2).await.unwrap();
97 /// writer.send_flush(3).await.unwrap();
98 /// let out = writer.complete().await.unwrap();
99 ///
100 /// assert_eq!(out, 6);
101 /// # });
102 /// ```
103 fn and_then<T, E, Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
104 where
105 F: FnMut(Self::Output) -> Fut,
106 Fut: Future<Output = Result<T, E>>,
107 E: From<Self::Error>,
108 Self: Sized,
109 {
110 assert_writer::<Part, Self::Recv, E, T, _>(AndThen::new(self, f))
111 }
112
113 /// Wrap this writer in a `Box`, pinning it.
114 fn boxed<'a>(
115 self,
116 ) -> BoxMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
117 where
118 Self: Sized + Send + 'a,
119 {
120 Box::pin(self)
121 }
122
123 /// Wrap this writer, which additionally has conditions making it a
124 /// [`FusedMultipartWrite`], in a `Box`, pinning it.
125 fn box_fused<'a>(
126 self,
127 ) -> BoxFusedMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
128 where
129 Self: Sized + Send + FusedMultipartWrite<Part> + 'a,
130 {
131 Box::pin(self)
132 }
133
134 /// Wrap this writer, which additionally has conditions making it a
135 /// [`FusedMultipartWrite`], in a `Box`, pinning it.
136 ///
137 /// Similar to `box_fused` but without the `Send` requirement.
138 fn box_fused_local<'a>(
139 self,
140 ) -> LocalBoxFusedMultipartWrite<
141 'a,
142 Part,
143 Self::Recv,
144 Self::Output,
145 Self::Error,
146 >
147 where
148 Self: Sized + FusedMultipartWrite<Part> + 'a,
149 {
150 Box::pin(self)
151 }
152
153 /// Wrap this writer in a `Box`, pinning it.
154 ///
155 /// Similar to `boxed` but without the `Send` requirement.
156 fn boxed_local<'a>(
157 self,
158 ) -> LocalBoxMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
159 where
160 Self: Sized + 'a,
161 {
162 Box::pin(self)
163 }
164
165 /// Adds a fixed size buffer to the current writer.
166 ///
167 /// The resulting `MultipartWrite` will buffer up to `capacity` items when
168 /// the underlying writer is not able to accept new parts.
169 ///
170 /// The values returned when the underlying writer has received a part are
171 /// also accumulated and returned in batch.
172 fn buffered(
173 self,
174 capacity: impl Into<Option<usize>>,
175 ) -> Buffered<Self, Part>
176 where
177 Self: Sized,
178 {
179 assert_writer::<
180 Part,
181 Option<Vec<Self::Recv>>,
182 Self::Error,
183 Self::Output,
184 _,
185 >(Buffered::new(self, capacity.into().unwrap_or_default()))
186 }
187
188 /// A future that runs this writer to completion, returning the associated
189 /// output.
190 fn complete(&mut self) -> Complete<'_, Self, Part>
191 where
192 Self: Unpin,
193 {
194 Complete::new(self)
195 }
196
197 /// Fanout the part to multiple writers.
198 ///
199 /// This adapter clones each incoming part and forwards it to both writers.
200 ///
201 /// # Examples
202 ///
203 /// ```rust
204 /// # futures::executor::block_on(async {
205 /// use multipart_write::{MultipartWriteExt as _, write};
206 ///
207 /// let wr1 = write::from_extend::<u8, Vec<u8>>();
208 /// let wr2 = write::from_extend::<u8, Vec<u8>>();
209 /// let mut writer = wr1.fanout(wr2);
210 ///
211 /// writer.send_flush(1).await.unwrap();
212 /// writer.send_flush(2).await.unwrap();
213 /// writer.send_flush(3).await.unwrap();
214 /// let out = writer.complete().await.unwrap();
215 ///
216 /// assert_eq!(out, (vec![1, 2, 3], vec![1, 2, 3]));
217 /// # })
218 /// ```
219 fn fanout<U>(self, other: U) -> Fanout<Self, U, Part>
220 where
221 Part: Clone,
222 U: MultipartWrite<Part, Error = Self::Error>,
223 Self: Sized,
224 {
225 assert_writer::<
226 Part,
227 (Self::Recv, U::Recv),
228 Self::Error,
229 (Self::Output, U::Output),
230 _,
231 >(Fanout::new(self, other))
232 }
233
234 /// A future that completes after the given part has been received by the
235 /// writer.
236 ///
237 /// Unlike `send_flush`, the returned future does not flush the writer. It
238 /// is the caller's responsibility to ensure all pending items are
239 /// processed, which can be done with `flush` or `complete`.
240 fn feed(&mut self, part: Part) -> Feed<'_, Self, Part>
241 where
242 Self: Unpin,
243 {
244 Feed::new(self, part)
245 }
246
247 /// Apply a filter to this writer's parts, returning a new writer with the
248 /// same output.
249 ///
250 /// The return type of this writer is `Option<Self::Recv>` and is `None`
251 /// when the part did not pass the filter.
252 ///
253 /// # Examples
254 ///
255 /// ```rust
256 /// # futures::executor::block_on(async {
257 /// use multipart_write::{MultipartWriteExt, write};
258 ///
259 /// let mut writer =
260 /// write::from_extend::<u8, Vec<u8>>().filter_part(|n| n % 2 == 0);
261 ///
262 /// let r1 = writer.send_flush(1).await.unwrap();
263 /// let r2 = writer.send_flush(2).await.unwrap();
264 /// let r3 = writer.send_flush(3).await.unwrap();
265 /// let out = writer.complete().await.unwrap();
266 ///
267 /// assert!(r1.is_none() && r2.is_some() && r3.is_none());
268 /// assert_eq!(out, vec![2]);
269 /// # })
270 /// ```
271 fn filter_part<F>(self, f: F) -> FilterPart<Self, F>
272 where
273 F: FnMut(&Part) -> bool,
274 Self: Sized,
275 {
276 assert_writer::<Part, Option<Self::Recv>, Self::Error, Self::Output, _>(
277 FilterPart::new(self, f),
278 )
279 }
280
281 /// Attempt to map the input to a part for this writer, filtering out the
282 /// inputs where the mapping returns `None`.
283 ///
284 /// The return type of this writer is `Option<Self::Recv>` and is `None`
285 /// when the provided closure returns `None`.
286 ///
287 /// # Examples
288 ///
289 /// ```rust
290 /// # futures::executor::block_on(async {
291 /// use multipart_write::{MultipartWriteExt as _, write};
292 ///
293 /// let mut writer = write::from_extend::<String, Vec<String>>()
294 /// .filter_map_part::<u8, _>(|n| {
295 /// if n % 2 == 0 { Some(n.to_string()) } else { None }
296 /// });
297 ///
298 /// let r1 = writer.send_flush(1).await.unwrap();
299 /// let r2 = writer.send_flush(2).await.unwrap();
300 /// let r3 = writer.send_flush(3).await.unwrap();
301 /// let out = writer.complete().await.unwrap();
302 ///
303 /// assert!(r1.is_none() && r2.is_some() && r3.is_none());
304 /// assert_eq!(out, vec!["2".to_string()]);
305 /// # })
306 /// ```
307 fn filter_map_part<U, F>(self, f: F) -> FilterMapPart<Self, F>
308 where
309 F: FnMut(U) -> Option<Part>,
310 Self: Sized,
311 {
312 assert_writer::<U, Option<Self::Recv>, Self::Error, Self::Output, _>(
313 FilterMapPart::new(self, f),
314 )
315 }
316
317 /// A future that completes when the underlying writer has been flushed.
318 fn flush(&mut self) -> Flush<'_, Self, Part>
319 where
320 Self: Unpin,
321 {
322 Flush::new(self)
323 }
324
325 /// Accumulate the values returned by starting a send, returning it with the
326 /// output.
327 ///
328 /// # Examples
329 ///
330 /// ```rust
331 /// # futures::executor::block_on(async {
332 /// use multipart_write::{MultipartWriteExt as _, write};
333 ///
334 /// let mut writer =
335 /// write::from_extend::<u8, Vec<u8>>().fold_sent(0, |n, _| n + 1);
336 ///
337 /// let r1 = writer.send_flush(1).await.unwrap();
338 /// let r2 = writer.send_flush(2).await.unwrap();
339 /// let r3 = writer.send_flush(3).await.unwrap();
340 /// let out = writer.complete().await.unwrap();
341 ///
342 /// assert_eq!(out, (3, vec![1, 2, 3]));
343 /// # })
344 /// ```
345 fn fold_sent<T, F>(self, id: T, f: F) -> FoldSent<Self, T, F>
346 where
347 F: FnMut(T, &Self::Recv) -> T,
348 Self: Sized,
349 {
350 assert_writer::<Part, Self::Recv, Self::Error, (T, Self::Output), _>(
351 FoldSent::new(self, id, f),
352 )
353 }
354
355 /// Returns a new writer that fuses according to the provided closure.
356 ///
357 /// The resulting writer wraps both `Self::Recv` and `Self::Output` in
358 /// an `Option` and is guaranted to both output and return `Ok(None)`
359 /// when called after becoming fused.
360 fn fuse<F>(self, f: F) -> Fuse<Self, F>
361 where
362 F: FnMut(&Self::Output) -> bool,
363 Self: Sized,
364 {
365 assert_writer::<
366 Part,
367 Option<Self::Recv>,
368 Self::Error,
369 Option<Self::Output>,
370 _,
371 >(Fuse::new(self, f))
372 }
373
374 /// Produce the parts for this writer from the output of another writer.
375 ///
376 /// # Examples
377 ///
378 /// ```rust
379 /// # futures::executor::block_on(async {
380 /// use multipart_write::{MultipartWriteExt as _, write};
381 ///
382 /// let wr = write::from_extend::<u8, Vec<u8>>()
383 /// .map_ok(|vs| vs.iter().fold(0, |acc, n| acc + n));
384 /// let mut writer = write::from_extend::<u8, Vec<u8>>().lift(wr);
385 ///
386 /// // We use `feed` and not `send_flush` because `send_flush` will complete
387 /// // the outer writer and write its output to the inner writer after each
388 /// // send, which is not what we want the example to show.
389 /// writer.feed(1).await.unwrap();
390 /// writer.feed(2).await.unwrap();
391 ///
392 /// // Flush the writer manually, which now completes the outer writer and
393 /// // writes its output, the sum of the parts written, to the inner writer.
394 /// writer.flush().await.unwrap();
395 ///
396 /// writer.feed(3).await.unwrap();
397 /// writer.feed(4).await.unwrap();
398 /// writer.feed(5).await.unwrap();
399 /// let out = writer.complete().await.unwrap();
400 ///
401 /// assert_eq!(out, vec![3, 12]);
402 /// # })
403 /// ```
404 fn lift<U, T>(self, other: U) -> Lift<Self, U, Part>
405 where
406 Self: Sized,
407 Self::Error: From<U::Error>,
408 U: MultipartWrite<T, Output = Part>,
409 {
410 assert_writer::<T, U::Recv, Self::Error, Self::Output, _>(Lift::new(
411 self, other,
412 ))
413 }
414
415 /// Map this writer's return type to a different value, returning a new
416 /// multipart writer with the given return type.
417 ///
418 /// # Examples
419 ///
420 /// ```rust
421 /// # futures::executor::block_on(async {
422 /// use multipart_write::{MultipartWriteExt as _, write};
423 ///
424 /// let mut writer = write::from_extend::<u8, Vec<u8>>()
425 /// .map_sent::<&'static str, _>(|_| "OK");
426 ///
427 /// let r1 = writer.send_flush(1).await.unwrap();
428 /// let r2 = writer.send_flush(2).await.unwrap();
429 /// let r3 = writer.send_flush(3).await.unwrap();
430 /// let out = writer.complete().await.unwrap();
431 ///
432 /// assert_eq!(vec![r1, r2, r3], vec!["OK", "OK", "OK"]);
433 /// assert_eq!(out, vec![1, 2, 3]);
434 /// # })
435 /// ```
436 fn map_sent<U, F>(self, f: F) -> MapSent<Self, F>
437 where
438 F: FnMut(Self::Recv) -> U,
439 Self: Sized,
440 {
441 assert_writer::<Part, U, Self::Error, Self::Output, _>(MapSent::new(
442 self, f,
443 ))
444 }
445
446 /// Map this writer's error type to a different value, returning a new
447 /// multipart writer with the given error type.
448 fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
449 where
450 F: FnMut(Self::Error) -> E,
451 Self: Sized,
452 {
453 assert_writer::<Part, Self::Recv, E, Self::Output, _>(MapErr::new(
454 self, f,
455 ))
456 }
457
458 /// Map this writer's output type to a different type, returning a new
459 /// multipart writer with the given output type.
460 fn map_ok<U, F>(self, f: F) -> MapOk<Self, F>
461 where
462 F: FnMut(Self::Output) -> U,
463 Self: Sized,
464 {
465 assert_writer::<Part, Self::Recv, Self::Error, U, _>(MapOk::new(
466 self, f,
467 ))
468 }
469
470 /// A convenience method for calling [`MultipartWrite::poll_ready`] on
471 /// [`Unpin`] writer types.
472 #[must_use = "futures do nothing unless polled"]
473 fn poll_ready_unpin(
474 &mut self,
475 cx: &mut Context<'_>,
476 ) -> Poll<Result<(), Self::Error>>
477 where
478 Self: Unpin,
479 {
480 Pin::new(self).poll_ready(cx)
481 }
482
483 /// A convenience method for calling [`MultipartWrite::poll_flush`] on
484 /// [`Unpin`] writer types.
485 #[must_use = "futures do nothing unless polled"]
486 fn poll_flush_unpin(
487 &mut self,
488 cx: &mut Context<'_>,
489 ) -> Poll<Result<(), Self::Error>>
490 where
491 Self: Unpin,
492 {
493 Pin::new(self).poll_flush(cx)
494 }
495
496 /// A convenience method for calling [`MultipartWrite::poll_complete`] on
497 /// [`Unpin`] writer types.
498 #[must_use = "futures do nothing unless polled"]
499 fn poll_complete_unpin(
500 &mut self,
501 cx: &mut Context<'_>,
502 ) -> Poll<Result<Self::Output, Self::Error>>
503 where
504 Self: Unpin,
505 {
506 Pin::new(self).poll_complete(cx)
507 }
508
509 /// Provide a part to this writer in the output of a future.
510 ///
511 /// The result is a new writer over the type `U` that passes each value
512 /// through the function `f`, resolving the output, and sending it to the
513 /// inner writer.
514 ///
515 /// # Examples
516 ///
517 /// ```rust
518 /// # futures::executor::block_on(async {
519 /// use std::io::Error as IoError;
520 ///
521 /// use futures::future;
522 /// use multipart_write::{MultipartWriteExt as _, write};
523 ///
524 /// let mut writer = write::from_extend::<u8, Vec<u8>>()
525 /// .ready_part(|n| future::ready(Ok::<_, IoError>(n + 1_u8)));
526 ///
527 /// writer.send_flush(1).await.unwrap();
528 /// writer.send_flush(2).await.unwrap();
529 /// writer.send_flush(3).await.unwrap();
530 /// let out = writer.complete().await.unwrap();
531 ///
532 /// assert_eq!(out, vec![2, 3, 4]);
533 /// # })
534 /// ```
535 fn ready_part<U, E, Fut, F>(self, f: F) -> ReadyPart<Self, Part, Fut, F>
536 where
537 F: FnMut(U) -> Fut,
538 Fut: Future<Output = Result<Part, E>>,
539 E: From<Self::Error>,
540 Self: Sized,
541 {
542 assert_writer::<U, (), E, Self::Output, _>(ReadyPart::new(self, f))
543 }
544
545 /// A future that completes when a part has been fully processed into the
546 /// writer, including flushing.
547 fn send_flush(&mut self, part: Part) -> SendFlush<'_, Self, Part>
548 where
549 Self: Unpin,
550 {
551 SendFlush::new(self, part)
552 }
553}
554
555fn assert_writer<Part, R, E, T, Wr>(wr: Wr) -> Wr
556where
557 Wr: MultipartWrite<Part, Recv = R, Error = E, Output = T>,
558{
559 wr
560}