multipart_write/write/
fanout.rs1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures::ready;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7#[must_use = "futures do nothing unless polled"]
9#[pin_project::pin_project]
10pub struct Fanout<Wr1: MultipartWrite<Part>, Wr2: MultipartWrite<Part>, Part> {
11 #[pin]
12 wr1: Wr1,
13 #[pin]
14 wr2: Wr2,
15 wro1: Option<Wr1::Output>,
16 wro2: Option<Wr2::Output>,
17}
18
19impl<Wr1: MultipartWrite<Part>, Wr2: MultipartWrite<Part>, Part> Fanout<Wr1, Wr2, Part> {
20 pub(super) fn new(wr1: Wr1, wr2: Wr2) -> Self {
21 Self {
22 wr1,
23 wr2,
24 wro1: None,
25 wro2: None,
26 }
27 }
28}
29
30impl<Wr1, Wr2, Part> FusedMultipartWrite<Part> for Fanout<Wr1, Wr2, Part>
31where
32 Part: Clone,
33 Wr1: FusedMultipartWrite<Part>,
34 Wr2: FusedMultipartWrite<Part, Error = Wr1::Error>,
35{
36 fn is_terminated(&self) -> bool {
37 self.wr1.is_terminated() || self.wr2.is_terminated()
38 }
39}
40
41impl<Wr1, Wr2, Part> MultipartWrite<Part> for Fanout<Wr1, Wr2, Part>
42where
43 Part: Clone,
44 Wr1: MultipartWrite<Part>,
45 Wr2: MultipartWrite<Part, Error = Wr1::Error>,
46{
47 type Ret = (Wr1::Ret, Wr2::Ret);
48 type Output = (Wr1::Output, Wr2::Output);
49 type Error = Wr1::Error;
50
51 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
52 let this = self.project();
53 let ready1 = this.wr1.poll_ready(cx)?.is_ready();
54 let ready2 = this.wr2.poll_ready(cx)?.is_ready();
55 if ready1 && ready2 {
56 Poll::Ready(Ok(()))
57 } else {
58 Poll::Pending
59 }
60 }
61
62 fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
63 let this = self.project();
64 let ret1 = this.wr1.start_send(part.clone())?;
65 let ret2 = this.wr2.start_send(part)?;
66 Ok((ret1, ret2))
67 }
68
69 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
70 let this = self.project();
71 let ready1 = this.wr1.poll_flush(cx)?.is_ready();
72 let ready2 = this.wr2.poll_flush(cx)?.is_ready();
73 if ready1 && ready2 {
74 Poll::Ready(Ok(()))
75 } else {
76 Poll::Pending
77 }
78 }
79
80 fn poll_complete(
81 self: Pin<&mut Self>,
82 cx: &mut Context<'_>,
83 ) -> Poll<Result<Self::Output, Self::Error>> {
84 let this = self.project();
85 let output1 = ready!(this.wr1.poll_complete(cx))?;
86 *this.wro1 = Some(output1);
87 let output2 = ready!(this.wr2.poll_complete(cx))?;
88 Poll::Ready(Ok((this.wro1.take().unwrap(), output2)))
89 }
90}