multipart_write/write/mod.rs
1//! `MultipartWrite` combinators.
2//!
3//! This module contains the trait [`MultipartWriteExt`], which provides
4//! adapters for chaining and composing writers.
5use crate::{
6 BoxFusedMultipartWrite, BoxMultipartWrite, FusedMultipartWrite,
7 LocalBoxFusedMultipartWrite, LocalBoxMultipartWrite, MultipartWrite,
8};
9
10use futures_core::future::Future;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14mod buffered;
15pub use buffered::Buffered;
16
17mod complete;
18pub use complete::Complete;
19
20mod extend;
21pub use extend::{Extend, extend, extend_default};
22
23mod fanout;
24pub use fanout::Fanout;
25
26mod feed;
27pub use feed::Feed;
28
29mod filter_map_part;
30pub use filter_map_part::FilterMapPart;
31
32mod filter_part;
33pub use filter_part::FilterPart;
34
35mod flush;
36pub use flush::Flush;
37
38mod fold_sent;
39pub use fold_sent::FoldSent;
40
41mod for_each_recv;
42pub use for_each_recv::ForEachRecv;
43
44mod fuse;
45pub use fuse::Fuse;
46
47mod lift;
48pub use lift::Lift;
49
50mod map_err;
51pub use map_err::MapErr;
52
53mod map_ok;
54pub use map_ok::MapOk;
55
56mod map_sent;
57pub use map_sent::MapSent;
58
59mod ready_part;
60pub use ready_part::ReadyPart;
61
62mod send_flush;
63pub use send_flush::SendFlush;
64
65mod then;
66pub use then::Then;
67
68impl<Wr: MultipartWrite<Part>, Part> MultipartWriteExt<Part> for Wr {}
69
70/// An extension trait for `MultipartWrite` providing a variety of convenient
71/// combinator functions.
72pub trait MultipartWriteExt<Part>: MultipartWrite<Part> {
73 /// Wrap this writer in a `Box`, pinning it.
74 fn boxed<'a>(
75 self,
76 ) -> BoxMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
77 where
78 Self: Sized + Send + 'a,
79 {
80 Box::pin(self)
81 }
82
83 /// Wrap this writer, which additionally has conditions making it a
84 /// [`FusedMultipartWrite`], in a `Box`, pinning it.
85 fn box_fused<'a>(
86 self,
87 ) -> BoxFusedMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
88 where
89 Self: Sized + Send + FusedMultipartWrite<Part> + 'a,
90 {
91 Box::pin(self)
92 }
93
94 /// Wrap this writer, which additionally has conditions making it a
95 /// [`FusedMultipartWrite`], in a `Box`, pinning it.
96 ///
97 /// Similar to `box_fused` but without the `Send` requirement.
98 fn box_fused_local<'a>(
99 self,
100 ) -> LocalBoxFusedMultipartWrite<
101 'a,
102 Part,
103 Self::Recv,
104 Self::Output,
105 Self::Error,
106 >
107 where
108 Self: Sized + FusedMultipartWrite<Part> + 'a,
109 {
110 Box::pin(self)
111 }
112
113 /// Wrap this writer in a `Box`, pinning it.
114 ///
115 /// Similar to `boxed` but without the `Send` requirement.
116 fn boxed_local<'a>(
117 self,
118 ) -> LocalBoxMultipartWrite<'a, Part, Self::Recv, Self::Output, Self::Error>
119 where
120 Self: Sized + 'a,
121 {
122 Box::pin(self)
123 }
124
125 /// Adds a fixed size buffer to the current writer.
126 ///
127 /// The resulting `MultipartWrite` will buffer up to `capacity` items when
128 /// the underlying writer is not able to accept new parts.
129 ///
130 /// The values returned when the underlying writer has received a part are
131 /// also accumulated and returned in batch.
132 fn buffered(
133 self,
134 capacity: impl Into<Option<usize>>,
135 ) -> Buffered<Self, Part>
136 where
137 Self: Sized,
138 {
139 assert_writer::<
140 Part,
141 Option<Vec<Self::Recv>>,
142 Self::Error,
143 Self::Output,
144 _,
145 >(Buffered::new(self, capacity.into().unwrap_or_default()))
146 }
147
148 /// A future that runs this writer to completion, returning the associated
149 /// output.
150 fn complete(&mut self) -> Complete<'_, Self, Part>
151 where
152 Self: Unpin,
153 {
154 Complete::new(self)
155 }
156
157 /// Fanout the part to multiple writers.
158 ///
159 /// This adapter clones each incoming part and forwards it to both writers.
160 ///
161 /// # Examples
162 ///
163 /// ```rust
164 /// # futures::executor::block_on(async {
165 /// use multipart_write::{MultipartWriteExt as _, write};
166 ///
167 /// let init: Vec<u8> = Vec::new();
168 /// let wr1 = write::extend(init.clone());
169 /// let wr2 = write::extend(init);
170 ///
171 /// let mut writer = wr1.fanout(wr2);
172 /// writer.send_flush(1).await.unwrap();
173 /// writer.send_flush(2).await.unwrap();
174 /// writer.send_flush(3).await.unwrap();
175 /// let out = writer.complete().await.unwrap();
176 ///
177 /// assert_eq!(out, (vec![1, 2, 3], vec![1, 2, 3]));
178 /// # })
179 /// ```
180 fn fanout<U>(self, other: U) -> Fanout<Self, U, Part>
181 where
182 Part: Clone,
183 U: MultipartWrite<Part, Error = Self::Error>,
184 Self: Sized,
185 {
186 assert_writer::<
187 Part,
188 (Self::Recv, U::Recv),
189 Self::Error,
190 (Self::Output, U::Output),
191 _,
192 >(Fanout::new(self, other))
193 }
194
195 /// A future that completes after the given part has been received by the
196 /// writer.
197 ///
198 /// Unlike `send_flush`, the returned future does not flush the writer. It
199 /// is the caller's responsibility to ensure all pending items are processed
200 /// by calling `flush` or `complete`.
201 fn feed(&mut self, part: Part) -> Feed<'_, Self, Part>
202 where
203 Self: Unpin,
204 {
205 Feed::new(self, part)
206 }
207
208 /// Apply a filter to this writer's parts, returning a new writer with the
209 /// same output.
210 ///
211 /// The return type of this writer is `Option<Self::Recv>` and is `None`
212 /// when the part did not pass the filter.
213 ///
214 /// # Examples
215 ///
216 /// ```rust
217 /// # futures::executor::block_on(async {
218 /// use multipart_write::{MultipartWriteExt, write};
219 ///
220 /// let init: Vec<u8> = Vec::new();
221 /// let mut writer = write::extend(init).filter_part(|n| n % 2 == 0);
222 ///
223 /// let r1 = writer.send_flush(1).await.unwrap();
224 /// let r2 = writer.send_flush(2).await.unwrap();
225 /// let r3 = writer.send_flush(3).await.unwrap();
226 /// let out = writer.complete().await.unwrap();
227 ///
228 /// assert!(r1.is_none() && r2.is_some() && r3.is_none());
229 /// assert_eq!(out, vec![2]);
230 /// # })
231 /// ```
232 fn filter_part<F>(self, f: F) -> FilterPart<Self, Part, F>
233 where
234 F: FnMut(&Part) -> bool,
235 Self: Sized,
236 {
237 assert_writer::<Part, Option<Self::Recv>, Self::Error, Self::Output, _>(
238 FilterPart::new(self, f),
239 )
240 }
241
242 /// Attempt to map the input to a part for this writer, filtering out the
243 /// inputs where the mapping returns `None`.
244 ///
245 /// The return type of this writer is `Option<Self::Recv>` and is `None`
246 /// when the provided closure returns `None`.
247 ///
248 /// # Examples
249 ///
250 /// ```rust
251 /// # futures::executor::block_on(async {
252 /// use multipart_write::{MultipartWriteExt as _, write};
253 ///
254 /// let init: Vec<String> = Vec::new();
255 /// let mut writer = write::extend(init).filter_map_part(|n: u8| {
256 /// if n % 2 == 0 { Some(n.to_string()) } else { None }
257 /// });
258 ///
259 /// let r1 = writer.send_flush(1).await.unwrap();
260 /// let r2 = writer.send_flush(2).await.unwrap();
261 /// let r3 = writer.send_flush(3).await.unwrap();
262 /// let out = writer.complete().await.unwrap();
263 ///
264 /// assert!(r1.is_none() && r2.is_some() && r3.is_none());
265 /// assert_eq!(out, vec!["2".to_string()]);
266 /// # })
267 /// ```
268 fn filter_map_part<P, F>(self, f: F) -> FilterMapPart<Self, Part, P, F>
269 where
270 F: FnMut(P) -> Option<Part>,
271 Self: Sized,
272 {
273 assert_writer::<P, Option<Self::Recv>, Self::Error, Self::Output, _>(
274 FilterMapPart::new(self, f),
275 )
276 }
277
278 /// A future that completes when the underlying writer has been flushed.
279 fn flush(&mut self) -> Flush<'_, Self, Part>
280 where
281 Self: Unpin,
282 {
283 Flush::new(self)
284 }
285
286 /// Accumulate the values returned by starting a send, returning it with the
287 /// output.
288 ///
289 /// # Examples
290 ///
291 /// ```rust
292 /// # futures::executor::block_on(async {
293 /// use multipart_write::{MultipartWriteExt as _, write};
294 ///
295 /// let init: Vec<u8> = Vec::new();
296 /// let mut writer = write::extend(init).fold_sent(0, |n, _| n + 1);
297 ///
298 /// let r1 = writer.send_flush(1).await.unwrap();
299 /// let r2 = writer.send_flush(2).await.unwrap();
300 /// let r3 = writer.send_flush(3).await.unwrap();
301 /// let out = writer.complete().await.unwrap();
302 ///
303 /// assert_eq!(out, (3, vec![1, 2, 3]));
304 /// # })
305 /// ```
306 fn fold_sent<T, F>(self, id: T, f: F) -> FoldSent<Self, T, F, Part>
307 where
308 F: FnMut(T, &Self::Recv) -> T,
309 Self: Sized,
310 {
311 assert_writer::<Part, Self::Recv, Self::Error, (T, Self::Output), _>(
312 FoldSent::new(self, id, f),
313 )
314 }
315
316 /// Evaluate the given async closure on the associated `Self::Recv` for this
317 /// writer.
318 ///
319 /// The result is a new writer that has all of the same properties as this
320 /// writer, except that `poll_ready` will not accept the next part until the
321 /// future returned by evaluating `F` on the return value resolves.
322 ///
323 /// # Examples
324 ///
325 /// ```rust
326 /// # futures::executor::block_on(async {
327 /// use std::sync::Arc;
328 /// use std::sync::atomic::{AtomicU8, Ordering};
329 ///
330 /// use multipart_write::{MultipartWriteExt as _, write};
331 ///
332 /// let counter = Arc::new(AtomicU8::new(1));
333 ///
334 /// // `extend` has no return type, so `map_sent` makes one for the
335 /// // demonstration.
336 /// let init: Vec<u8> = Vec::new();
337 /// let mut writer = write::extend(init)
338 /// .map_sent(|_| {
339 /// let cnt = Arc::clone(&counter);
340 /// let n = cnt.fetch_add(1, Ordering::SeqCst);
341 /// n
342 /// })
343 /// .for_each_recv(|n| {
344 /// println!("{n} parts written");
345 /// futures::future::ready(())
346 /// });
347 ///
348 /// let r1 = writer.send_flush(1).await.unwrap();
349 /// let r2 = writer.send_flush(2).await.unwrap();
350 /// let r3 = writer.send_flush(3).await.unwrap();
351 /// let out = writer.complete().await.unwrap();
352 ///
353 /// assert_eq!(out, vec![1, 2, 3]);
354 /// # })
355 /// ```
356 fn for_each_recv<Fut, F>(self, f: F) -> ForEachRecv<Self, Part, Fut, F>
357 where
358 Self: Sized,
359 Self::Recv: Clone,
360 F: FnMut(Self::Recv) -> Fut,
361 Fut: Future<Output = ()>,
362 {
363 assert_writer::<Part, Self::Recv, Self::Error, Self::Output, _>(
364 ForEachRecv::new(self, f),
365 )
366 }
367
368 /// Returns a new writer that fuses according to the provided closure.
369 ///
370 /// The resulting writer wraps both `Self::Recv` and `Self::Output` in
371 /// an `Option` and is guaranteed to both output and return `Ok(None)`
372 /// when called after becoming fused.
373 fn fuse<F>(self, f: F) -> Fuse<Self, Part, F>
374 where
375 F: FnMut(&Self::Output) -> bool,
376 Self: Sized,
377 {
378 assert_writer::<
379 Part,
380 Option<Self::Recv>,
381 Self::Error,
382 Option<Self::Output>,
383 _,
384 >(Fuse::new(self, f))
385 }
386
387 /// Produce the parts for this writer from the output of another writer.
388 ///
389 /// # Examples
390 ///
391 /// ```rust
392 /// # futures::executor::block_on(async {
393 /// use multipart_write::{MultipartWriteExt as _, write};
394 ///
395 /// let init: Vec<u8> = Vec::new();
396 /// let wr = write::extend(init.clone()).map_ok(|vs| vs.iter().sum::<u8>());
397 /// let mut writer = write::extend(init).lift(wr);
398 ///
399 /// // We use `feed` and not `send_flush` because `send_flush` will complete
400 /// // the outer writer and write its output to the inner writer after each
401 /// // send, which is not what we want the example to show.
402 /// writer.feed(1).await.unwrap();
403 /// writer.feed(2).await.unwrap();
404 ///
405 /// // Flush the writer manually, which now completes the outer writer and
406 /// // writes its output, the sum of the parts written, to the inner writer.
407 /// writer.flush().await.unwrap();
408 ///
409 /// writer.feed(3).await.unwrap();
410 /// writer.feed(4).await.unwrap();
411 /// writer.feed(5).await.unwrap();
412 /// let out = writer.complete().await.unwrap();
413 ///
414 /// assert_eq!(out, vec![3, 12]);
415 /// # })
416 /// ```
417 fn lift<U, P>(self, other: U) -> Lift<Self, U, P, Part>
418 where
419 Self: Sized,
420 Self::Error: From<U::Error>,
421 U: MultipartWrite<P, Output = Part>,
422 {
423 assert_writer::<P, U::Recv, Self::Error, Self::Output, _>(Lift::new(
424 self, other,
425 ))
426 }
427
428 /// Map this writer's return type to a different value, returning a new
429 /// multipart writer with the given return type.
430 ///
431 /// # Examples
432 ///
433 /// ```rust
434 /// # futures::executor::block_on(async {
435 /// use multipart_write::{MultipartWriteExt as _, write};
436 ///
437 /// let init: Vec<u8> = Vec::new();
438 /// let mut writer = write::extend(init).map_sent(|_| "OK");
439 ///
440 /// let r1 = writer.send_flush(1).await.unwrap();
441 /// let r2 = writer.send_flush(2).await.unwrap();
442 /// let r3 = writer.send_flush(3).await.unwrap();
443 /// let out = writer.complete().await.unwrap();
444 ///
445 /// assert_eq!(vec![r1, r2, r3], vec!["OK", "OK", "OK"]);
446 /// assert_eq!(out, vec![1, 2, 3]);
447 /// # })
448 /// ```
449 fn map_sent<R, F>(self, f: F) -> MapSent<Self, Part, R, F>
450 where
451 F: FnMut(Self::Recv) -> R,
452 Self: Sized,
453 {
454 assert_writer::<Part, R, Self::Error, Self::Output, _>(MapSent::new(
455 self, f,
456 ))
457 }
458
459 /// Map this writer's error type to a different value, returning a new
460 /// multipart writer with the given error type.
461 fn map_err<E, F>(self, f: F) -> MapErr<Self, Part, E, F>
462 where
463 F: FnMut(Self::Error) -> E,
464 Self: Sized,
465 {
466 assert_writer::<Part, Self::Recv, E, Self::Output, _>(MapErr::new(
467 self, f,
468 ))
469 }
470
471 /// Map this writer's output type to a different type, returning a new
472 /// multipart writer with the given output type.
473 fn map_ok<T, F>(self, f: F) -> MapOk<Self, Part, T, F>
474 where
475 F: FnMut(Self::Output) -> T,
476 Self: Sized,
477 {
478 assert_writer::<Part, Self::Recv, Self::Error, T, _>(MapOk::new(
479 self, f,
480 ))
481 }
482
483 /// A convenience method for calling [`MultipartWrite::poll_ready`] on
484 /// [`Unpin`] writer types.
485 #[must_use = "futures do nothing unless polled"]
486 fn poll_ready_unpin(
487 &mut self,
488 cx: &mut Context<'_>,
489 ) -> Poll<Result<(), Self::Error>>
490 where
491 Self: Unpin,
492 {
493 Pin::new(self).poll_ready(cx)
494 }
495
496 /// A convenience method for calling [`MultipartWrite::poll_flush`] on
497 /// [`Unpin`] writer types.
498 #[must_use = "futures do nothing unless polled"]
499 fn poll_flush_unpin(
500 &mut self,
501 cx: &mut Context<'_>,
502 ) -> Poll<Result<(), Self::Error>>
503 where
504 Self: Unpin,
505 {
506 Pin::new(self).poll_flush(cx)
507 }
508
509 /// A convenience method for calling [`MultipartWrite::poll_complete`] on
510 /// [`Unpin`] writer types.
511 #[must_use = "futures do nothing unless polled"]
512 fn poll_complete_unpin(
513 &mut self,
514 cx: &mut Context<'_>,
515 ) -> Poll<Result<Self::Output, Self::Error>>
516 where
517 Self: Unpin,
518 {
519 Pin::new(self).poll_complete(cx)
520 }
521
522 /// Provide a part to this writer in the output of a future.
523 ///
524 /// The result is a new writer over the type `U` that passes each value
525 /// through the function `f`, resolving the output, and sending it to the
526 /// inner writer.
527 ///
528 /// # Examples
529 ///
530 /// ```rust
531 /// # futures::executor::block_on(async {
532 /// use multipart_write::{MultipartWriteExt as _, write};
533 ///
534 /// let init: Vec<u8> = Vec::new();
535 /// let mut writer = write::extend(init)
536 /// .ready_part(|n: u8| futures::future::ready(Ok(n + 1)));
537 ///
538 /// writer.send_flush(1).await.unwrap();
539 /// writer.send_flush(2).await.unwrap();
540 /// writer.send_flush(3).await.unwrap();
541 /// let out = writer.complete().await.unwrap();
542 ///
543 /// assert_eq!(out, vec![2, 3, 4]);
544 /// # })
545 /// ```
546 fn ready_part<P, Fut, F>(self, f: F) -> ReadyPart<Self, Part, P, Fut, F>
547 where
548 F: FnMut(P) -> Fut,
549 Fut: Future<Output = Result<Part, Self::Error>>,
550 Self: Sized,
551 {
552 assert_writer::<P, (), Self::Error, Self::Output, _>(ReadyPart::new(
553 self, f,
554 ))
555 }
556
557 /// A future that completes when a part has been fully processed into the
558 /// writer, including flushing.
559 fn send_flush(&mut self, part: Part) -> SendFlush<'_, Self, Part>
560 where
561 Self: Unpin,
562 {
563 SendFlush::new(self, part)
564 }
565
566 /// Asynchronously map the result of completing this writer to a different
567 /// result.
568 ///
569 /// # Examples
570 ///
571 /// ```rust
572 /// # futures::executor::block_on(async {
573 /// use multipart_write::{MultipartWriteExt as _, write};
574 ///
575 /// let init: Vec<u8> = Vec::new();
576 /// let mut writer = write::extend(init).then(|res| {
577 /// futures::future::ready(res.map(|vs| vs.iter().sum::<u8>()))
578 /// });
579 ///
580 /// writer.send_flush(1).await.unwrap();
581 /// writer.send_flush(2).await.unwrap();
582 /// writer.send_flush(3).await.unwrap();
583 /// let out = writer.complete().await.unwrap();
584 ///
585 /// assert_eq!(out, 6);
586 /// # });
587 /// ```
588 fn then<T, Fut, F>(self, f: F) -> Then<Self, Part, T, Fut, F>
589 where
590 F: FnMut(Result<Self::Output, Self::Error>) -> Fut,
591 Fut: Future<Output = Result<T, Self::Error>>,
592 Self: Sized,
593 {
594 assert_writer::<Part, Self::Recv, Self::Error, T, _>(Then::new(self, f))
595 }
596}
597
598fn assert_writer<Part, R, E, T, Wr>(wr: Wr) -> Wr
599where
600 Wr: MultipartWrite<Part, Recv = R, Error = E, Output = T>,
601{
602 wr
603}