multipart_write/write/mod.rs
1//! `MultipartWrite` combinators.
2//!
3//! This module contains the trait [`MultipartWriteExt`], which provides
4//! adapters for chaining and composing writers.
5use crate::{
6 BoxFusedMultipartWrite, BoxMultipartWrite, FusedMultipartWrite,
7 LocalBoxFusedMultipartWrite, LocalBoxMultipartWrite, MultipartWrite,
8};
9
10use futures_core::future::Future;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14mod buffered;
15pub use buffered::Buffered;
16
17mod complete;
18pub use complete::Complete;
19
20mod from_extend;
21pub use from_extend::{FromExtend, from_extend};
22
23mod fanout;
24pub use fanout::Fanout;
25
26mod feed;
27pub use feed::Feed;
28
29mod filter_map_part;
30pub use filter_map_part::FilterMapPart;
31
32mod filter_part;
33pub use filter_part::FilterPart;
34
35mod flush;
36pub use flush::Flush;
37
38mod fold_sent;
39pub use fold_sent::FoldSent;
40
41mod for_each_recv;
42pub use for_each_recv::ForEachRecv;
43
44mod fuse;
45pub use fuse::Fuse;
46
47mod lift;
48pub use lift::Lift;
49
50mod map_err;
51pub use map_err::MapErr;
52
53mod map_ok;
54pub use map_ok::MapOk;
55
56mod map_sent;
57pub use map_sent::MapSent;
58
59mod ready_part;
60pub use ready_part::ReadyPart;
61
62mod send_flush;
63pub use send_flush::SendFlush;
64
65mod then;
66pub use then::Then;
67
68impl<Wr: MultipartWrite<Part>, Part> MultipartWriteExt<Part> for Wr {}
69
70/// An extension trait for `MultipartWrite` providing a variety of convenient
71/// combinator functions.
72pub trait MultipartWriteExt<Part>: MultipartWrite<Part> {
73 /// Wrap this writer in a `Box`, pinning it.
74 fn boxed<'a>(
75 self,
76 ) -> BoxMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
77 where
78 Self: Sized + Send + 'a,
79 {
80 Box::pin(self)
81 }
82
83 /// Wrap this writer, which additionally has conditions making it a
84 /// [`FusedMultipartWrite`], in a `Box`, pinning it.
85 fn box_fused<'a>(
86 self,
87 ) -> BoxFusedMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
88 where
89 Self: Sized + Send + FusedMultipartWrite<Part> + 'a,
90 {
91 Box::pin(self)
92 }
93
94 /// Wrap this writer, which additionally has conditions making it a
95 /// [`FusedMultipartWrite`], in a `Box`, pinning it.
96 ///
97 /// Similar to `box_fused` but without the `Send` requirement.
98 fn box_fused_local<'a>(
99 self,
100 ) -> LocalBoxFusedMultipartWrite<
101 'a,
102 Part,
103 Self::Recv,
104 Self::Output,
105 Self::Error,
106 >
107 where
108 Self: Sized + FusedMultipartWrite<Part> + 'a,
109 {
110 Box::pin(self)
111 }
112
113 /// Wrap this writer in a `Box`, pinning it.
114 ///
115 /// Similar to `boxed` but without the `Send` requirement.
116 fn boxed_local<'a>(
117 self,
118 ) -> LocalBoxMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
119 where
120 Self: Sized + 'a,
121 {
122 Box::pin(self)
123 }
124
125 /// Adds a fixed size buffer to the current writer.
126 ///
127 /// The resulting `MultipartWrite` will buffer up to `capacity` items when
128 /// the underlying writer is not able to accept new parts.
129 ///
130 /// The values returned when the underlying writer has received a part are
131 /// also accumulated and returned in batch.
132 fn buffered(
133 self,
134 capacity: impl Into<Option<usize>>,
135 ) -> Buffered<Self, Part>
136 where
137 Self: Sized,
138 {
139 assert_writer::<
140 Part,
141 Option<Vec<Self::Recv>>,
142 Self::Error,
143 Self::Output,
144 _,
145 >(Buffered::new(self, capacity.into().unwrap_or_default()))
146 }
147
148 /// A future that runs this writer to completion, returning the associated
149 /// output.
150 fn complete(&mut self) -> Complete<'_, Self, Part>
151 where
152 Self: Unpin,
153 {
154 Complete::new(self)
155 }
156
157 /// Fanout the part to multiple writers.
158 ///
159 /// This adapter clones each incoming part and forwards it to both writers.
160 ///
161 /// # Examples
162 ///
163 /// ```rust
164 /// # futures::executor::block_on(async {
165 /// use multipart_write::{MultipartWriteExt as _, write};
166 ///
167 /// let wr1 = write::from_extend::<u8, Vec<u8>>();
168 /// let wr2 = write::from_extend::<u8, Vec<u8>>();
169 /// let mut writer = wr1.fanout(wr2);
170 ///
171 /// writer.send_flush(1).await.unwrap();
172 /// writer.send_flush(2).await.unwrap();
173 /// writer.send_flush(3).await.unwrap();
174 /// let out = writer.complete().await.unwrap();
175 ///
176 /// assert_eq!(out, (vec![1, 2, 3], vec![1, 2, 3]));
177 /// # })
178 /// ```
179 fn fanout<U>(self, other: U) -> Fanout<Self, U, Part>
180 where
181 Part: Clone,
182 U: MultipartWrite<Part, Error = Self::Error>,
183 Self: Sized,
184 {
185 assert_writer::<
186 Part,
187 (Self::Recv, U::Recv),
188 Self::Error,
189 (Self::Output, U::Output),
190 _,
191 >(Fanout::new(self, other))
192 }
193
194 /// A future that completes after the given part has been received by the
195 /// writer.
196 ///
197 /// Unlike `send_flush`, the returned future does not flush the writer. It
198 /// is the caller's responsibility to ensure all pending items are
199 /// processed, which can be done with `flush` or `complete`.
200 fn feed(&mut self, part: Part) -> Feed<'_, Self, Part>
201 where
202 Self: Unpin,
203 {
204 Feed::new(self, part)
205 }
206
207 /// Apply a filter to this writer's parts, returning a new writer with the
208 /// same output.
209 ///
210 /// The return type of this writer is `Option<Self::Recv>` and is `None`
211 /// when the part did not pass the filter.
212 ///
213 /// # Examples
214 ///
215 /// ```rust
216 /// # futures::executor::block_on(async {
217 /// use multipart_write::{MultipartWriteExt, write};
218 ///
219 /// let mut writer =
220 /// write::from_extend::<u8, Vec<u8>>().filter_part(|n| n % 2 == 0);
221 ///
222 /// let r1 = writer.send_flush(1).await.unwrap();
223 /// let r2 = writer.send_flush(2).await.unwrap();
224 /// let r3 = writer.send_flush(3).await.unwrap();
225 /// let out = writer.complete().await.unwrap();
226 ///
227 /// assert!(r1.is_none() && r2.is_some() && r3.is_none());
228 /// assert_eq!(out, vec![2]);
229 /// # })
230 /// ```
231 fn filter_part<F>(self, f: F) -> FilterPart<Self, F>
232 where
233 F: FnMut(&Part) -> bool,
234 Self: Sized,
235 {
236 assert_writer::<Part, Option<Self::Recv>, Self::Error, Self::Output, _>(
237 FilterPart::new(self, f),
238 )
239 }
240
241 /// Attempt to map the input to a part for this writer, filtering out the
242 /// inputs where the mapping returns `None`.
243 ///
244 /// The return type of this writer is `Option<Self::Recv>` and is `None`
245 /// when the provided closure returns `None`.
246 ///
247 /// # Examples
248 ///
249 /// ```rust
250 /// # futures::executor::block_on(async {
251 /// use multipart_write::{MultipartWriteExt as _, write};
252 ///
253 /// let mut writer = write::from_extend::<String, Vec<String>>()
254 /// .filter_map_part::<u8, _>(|n| {
255 /// if n % 2 == 0 { Some(n.to_string()) } else { None }
256 /// });
257 ///
258 /// let r1 = writer.send_flush(1).await.unwrap();
259 /// let r2 = writer.send_flush(2).await.unwrap();
260 /// let r3 = writer.send_flush(3).await.unwrap();
261 /// let out = writer.complete().await.unwrap();
262 ///
263 /// assert!(r1.is_none() && r2.is_some() && r3.is_none());
264 /// assert_eq!(out, vec!["2".to_string()]);
265 /// # })
266 /// ```
267 fn filter_map_part<U, F>(self, f: F) -> FilterMapPart<Self, F>
268 where
269 F: FnMut(U) -> Option<Part>,
270 Self: Sized,
271 {
272 assert_writer::<U, Option<Self::Recv>, Self::Error, Self::Output, _>(
273 FilterMapPart::new(self, f),
274 )
275 }
276
277 /// A future that completes when the underlying writer has been flushed.
278 fn flush(&mut self) -> Flush<'_, Self, Part>
279 where
280 Self: Unpin,
281 {
282 Flush::new(self)
283 }
284
285 /// Accumulate the values returned by starting a send, returning it with the
286 /// output.
287 ///
288 /// # Examples
289 ///
290 /// ```rust
291 /// # futures::executor::block_on(async {
292 /// use multipart_write::{MultipartWriteExt as _, write};
293 ///
294 /// let mut writer =
295 /// write::from_extend::<u8, Vec<u8>>().fold_sent(0, |n, _| n + 1);
296 ///
297 /// let r1 = writer.send_flush(1).await.unwrap();
298 /// let r2 = writer.send_flush(2).await.unwrap();
299 /// let r3 = writer.send_flush(3).await.unwrap();
300 /// let out = writer.complete().await.unwrap();
301 ///
302 /// assert_eq!(out, (3, vec![1, 2, 3]));
303 /// # })
304 /// ```
305 fn fold_sent<T, F>(self, id: T, f: F) -> FoldSent<Self, T, F>
306 where
307 F: FnMut(T, &Self::Recv) -> T,
308 Self: Sized,
309 {
310 assert_writer::<Part, Self::Recv, Self::Error, (T, Self::Output), _>(
311 FoldSent::new(self, id, f),
312 )
313 }
314
315 /// Evaluate the given async closure on the associated `Self::Recv` for this
316 /// writer.
317 ///
318 /// The result is a new writer that has all of the same properties as this
319 /// writer, except that `poll_ready` will not accept the next part until the
320 /// future returned by evaluating `F` on the return value resolves.
321 ///
322 /// # Examples
323 ///
324 /// ```rust
325 /// # futures::executor::block_on(async {
326 /// use std::sync::Arc;
327 /// use std::sync::atomic::{AtomicU8, Ordering};
328 ///
329 /// use multipart_write::{MultipartWriteExt as _, write};
330 ///
331 /// let counter = Arc::new(AtomicU8::new(1));
332 ///
333 /// // `from_extend` has no return type, so `map_sent` makes one for the
334 /// // demonstration.
335 /// let wr = write::from_extend::<u8, Vec<u8>>().map_sent::<u8, _>(|_| {
336 /// let cnt = Arc::clone(&counter);
337 /// let n = cnt.fetch_add(1, Ordering::SeqCst);
338 /// n
339 /// });
340 ///
341 /// let mut writer = wr.for_each_recv(|n| {
342 /// println!("{n} parts written");
343 /// futures::future::ready(())
344 /// });
345 ///
346 /// let r1 = writer.send_flush(1).await.unwrap();
347 /// let r2 = writer.send_flush(2).await.unwrap();
348 /// let r3 = writer.send_flush(3).await.unwrap();
349 /// let out = writer.complete().await.unwrap();
350 ///
351 /// assert_eq!(out, vec![1, 2, 3]);
352 /// # })
353 /// ```
354 fn for_each_recv<Fut, F>(self, f: F) -> ForEachRecv<Self, Fut, F>
355 where
356 Self: Sized,
357 Self::Recv: Clone,
358 F: FnMut(Self::Recv) -> Fut,
359 Fut: Future<Output = ()>,
360 {
361 assert_writer::<Part, Self::Recv, Self::Error, Self::Output, _>(
362 ForEachRecv::new(self, f),
363 )
364 }
365
366 /// Returns a new writer that fuses according to the provided closure.
367 ///
368 /// The resulting writer wraps both `Self::Recv` and `Self::Output` in
369 /// an `Option` and is guaranteed to both output and return `Ok(None)`
370 /// when called after becoming fused.
371 fn fuse<F>(self, f: F) -> Fuse<Self, F>
372 where
373 F: FnMut(&Self::Output) -> bool,
374 Self: Sized,
375 {
376 assert_writer::<
377 Part,
378 Option<Self::Recv>,
379 Self::Error,
380 Option<Self::Output>,
381 _,
382 >(Fuse::new(self, f))
383 }
384
385 /// Produce the parts for this writer from the output of another writer.
386 ///
387 /// # Examples
388 ///
389 /// ```rust
390 /// # futures::executor::block_on(async {
391 /// use multipart_write::{MultipartWriteExt as _, write};
392 ///
393 /// let wr =
394 /// write::from_extend::<u8, Vec<u8>>().map_ok(|vs| vs.iter().sum::<u8>());
395 /// let mut writer = write::from_extend::<u8, Vec<u8>>().lift(wr);
396 ///
397 /// // We use `feed` and not `send_flush` because `send_flush` will complete
398 /// // the outer writer and write its output to the inner writer after each
399 /// // send, which is not what we want the example to show.
400 /// writer.feed(1).await.unwrap();
401 /// writer.feed(2).await.unwrap();
402 ///
403 /// // Flush the writer manually, which now completes the outer writer and
404 /// // writes its output, the sum of the parts written, to the inner writer.
405 /// writer.flush().await.unwrap();
406 ///
407 /// writer.feed(3).await.unwrap();
408 /// writer.feed(4).await.unwrap();
409 /// writer.feed(5).await.unwrap();
410 /// let out = writer.complete().await.unwrap();
411 ///
412 /// assert_eq!(out, vec![3, 12]);
413 /// # })
414 /// ```
415 fn lift<U, T>(self, other: U) -> Lift<Self, U, Part>
416 where
417 Self: Sized,
418 Self::Error: From<U::Error>,
419 U: MultipartWrite<T, Output = Part>,
420 {
421 assert_writer::<T, U::Recv, Self::Error, Self::Output, _>(Lift::new(
422 self, other,
423 ))
424 }
425
426 /// Map this writer's return type to a different value, returning a new
427 /// multipart writer with the given return type.
428 ///
429 /// # Examples
430 ///
431 /// ```rust
432 /// # futures::executor::block_on(async {
433 /// use multipart_write::{MultipartWriteExt as _, write};
434 ///
435 /// let mut writer = write::from_extend::<u8, Vec<u8>>()
436 /// .map_sent::<&'static str, _>(|_| "OK");
437 ///
438 /// let r1 = writer.send_flush(1).await.unwrap();
439 /// let r2 = writer.send_flush(2).await.unwrap();
440 /// let r3 = writer.send_flush(3).await.unwrap();
441 /// let out = writer.complete().await.unwrap();
442 ///
443 /// assert_eq!(vec![r1, r2, r3], vec!["OK", "OK", "OK"]);
444 /// assert_eq!(out, vec![1, 2, 3]);
445 /// # })
446 /// ```
447 fn map_sent<U, F>(self, f: F) -> MapSent<Self, F>
448 where
449 F: FnMut(Self::Recv) -> U,
450 Self: Sized,
451 {
452 assert_writer::<Part, U, Self::Error, Self::Output, _>(MapSent::new(
453 self, f,
454 ))
455 }
456
457 /// Map this writer's error type to a different value, returning a new
458 /// multipart writer with the given error type.
459 fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
460 where
461 F: FnMut(Self::Error) -> E,
462 Self: Sized,
463 {
464 assert_writer::<Part, Self::Recv, E, Self::Output, _>(MapErr::new(
465 self, f,
466 ))
467 }
468
469 /// Map this writer's output type to a different type, returning a new
470 /// multipart writer with the given output type.
471 fn map_ok<U, F>(self, f: F) -> MapOk<Self, F>
472 where
473 F: FnMut(Self::Output) -> U,
474 Self: Sized,
475 {
476 assert_writer::<Part, Self::Recv, Self::Error, U, _>(MapOk::new(
477 self, f,
478 ))
479 }
480
481 /// A convenience method for calling [`MultipartWrite::poll_ready`] on
482 /// [`Unpin`] writer types.
483 #[must_use = "futures do nothing unless polled"]
484 fn poll_ready_unpin(
485 &mut self,
486 cx: &mut Context<'_>,
487 ) -> Poll<Result<(), Self::Error>>
488 where
489 Self: Unpin,
490 {
491 Pin::new(self).poll_ready(cx)
492 }
493
494 /// A convenience method for calling [`MultipartWrite::poll_flush`] on
495 /// [`Unpin`] writer types.
496 #[must_use = "futures do nothing unless polled"]
497 fn poll_flush_unpin(
498 &mut self,
499 cx: &mut Context<'_>,
500 ) -> Poll<Result<(), Self::Error>>
501 where
502 Self: Unpin,
503 {
504 Pin::new(self).poll_flush(cx)
505 }
506
507 /// A convenience method for calling [`MultipartWrite::poll_complete`] on
508 /// [`Unpin`] writer types.
509 #[must_use = "futures do nothing unless polled"]
510 fn poll_complete_unpin(
511 &mut self,
512 cx: &mut Context<'_>,
513 ) -> Poll<Result<Self::Output, Self::Error>>
514 where
515 Self: Unpin,
516 {
517 Pin::new(self).poll_complete(cx)
518 }
519
520 /// Provide a part to this writer in the output of a future.
521 ///
522 /// The result is a new writer over the type `U` that passes each value
523 /// through the function `f`, resolving the output, and sending it to the
524 /// inner writer.
525 ///
526 /// # Examples
527 ///
528 /// ```rust
529 /// # futures::executor::block_on(async {
530 /// use std::io::Error as IoError;
531 ///
532 /// use futures::future;
533 /// use multipart_write::{MultipartWriteExt as _, write};
534 ///
535 /// let mut writer = write::from_extend::<u8, Vec<u8>>()
536 /// .ready_part(|n| future::ready(Ok::<_, IoError>(n + 1_u8)));
537 ///
538 /// writer.send_flush(1).await.unwrap();
539 /// writer.send_flush(2).await.unwrap();
540 /// writer.send_flush(3).await.unwrap();
541 /// let out = writer.complete().await.unwrap();
542 ///
543 /// assert_eq!(out, vec![2, 3, 4]);
544 /// # })
545 /// ```
546 fn ready_part<U, E, Fut, F>(self, f: F) -> ReadyPart<Self, Part, Fut, F>
547 where
548 F: FnMut(U) -> Fut,
549 Fut: Future<Output = Result<Part, E>>,
550 E: From<Self::Error>,
551 Self: Sized,
552 {
553 assert_writer::<U, (), E, Self::Output, _>(ReadyPart::new(self, f))
554 }
555
556 /// A future that completes when a part has been fully processed into the
557 /// writer, including flushing.
558 fn send_flush(&mut self, part: Part) -> SendFlush<'_, Self, Part>
559 where
560 Self: Unpin,
561 {
562 SendFlush::new(self, part)
563 }
564
565 /// Asynchronously map the result of completing this writer to a different
566 /// result.
567 ///
568 /// # Examples
569 ///
570 /// ```rust
571 /// # futures::executor::block_on(async {
572 /// use multipart_write::{MultipartWriteExt as _, write};
573 ///
574 /// // `from_extend` turns an impl of `std::iter::Extend` into a writer,
575 /// // here one that writes a `u8` and outputs a `Vec<u8>`.
576 /// let mut writer = write::from_extend::<u8, Vec<u8>>().then(|res| {
577 /// futures::future::ready(res.map(|vs| vs.iter().sum::<u8>()))
578 /// });
579 ///
580 /// writer.send_flush(1).await.unwrap();
581 /// writer.send_flush(2).await.unwrap();
582 /// writer.send_flush(3).await.unwrap();
583 /// let out = writer.complete().await.unwrap();
584 ///
585 /// assert_eq!(out, 6);
586 /// # });
587 /// ```
588 fn then<T, Fut, F>(self, f: F) -> Then<Self, Fut, F>
589 where
590 F: FnMut(Result<Self::Output, Self::Error>) -> Fut,
591 Fut: Future<Output = Result<T, Self::Error>>,
592 Self: Sized,
593 {
594 assert_writer::<Part, Self::Recv, Self::Error, T, _>(Then::new(self, f))
595 }
596}
597
598fn assert_writer<Part, R, E, T, Wr>(wr: Wr) -> Wr
599where
600 Wr: MultipartWrite<Part, Recv = R, Error = E, Output = T>,
601{
602 wr
603}