Skip to main content

multipart_write/write/
fanout.rs

1use std::fmt::{self, Debug, Formatter};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_core::ready;
6
7use crate::{FusedMultipartWrite, MultipartWrite};
8
9pin_project_lite::pin_project! {
10    /// `MultipartWrite` for [`fanout`](super::MultipartWriteExt::fanout).
11    #[must_use = "futures do nothing unless polled"]
12    pub struct Fanout<Wr1: MultipartWrite<Part>, Wr2: MultipartWrite<Part>, Part> {
13        #[pin]
14        wr1: Wr1,
15        #[pin]
16        wr2: Wr2,
17        wro1: Option<Wr1::Output>,
18        wro2: Option<Wr2::Output>,
19    }
20}
21
22impl<Wr1: MultipartWrite<Part>, Wr2: MultipartWrite<Part>, Part>
23    Fanout<Wr1, Wr2, Part>
24{
25    pub(super) fn new(wr1: Wr1, wr2: Wr2) -> Self {
26        Self { wr1, wr2, wro1: None, wro2: None }
27    }
28}
29
30impl<Wr1, Wr2, Part> FusedMultipartWrite<Part> for Fanout<Wr1, Wr2, Part>
31where
32    Part: Clone,
33    Wr1: FusedMultipartWrite<Part>,
34    Wr2: FusedMultipartWrite<Part, Error = Wr1::Error>,
35{
36    fn is_terminated(&self) -> bool {
37        self.wr1.is_terminated() || self.wr2.is_terminated()
38    }
39}
40
41impl<Wr1, Wr2, Part> MultipartWrite<Part> for Fanout<Wr1, Wr2, Part>
42where
43    Part: Clone,
44    Wr1: MultipartWrite<Part>,
45    Wr2: MultipartWrite<Part, Error = Wr1::Error>,
46{
47    type Error = Wr1::Error;
48    type Output = (Wr1::Output, Wr2::Output);
49    type Recv = (Wr1::Recv, Wr2::Recv);
50
51    fn poll_ready(
52        self: Pin<&mut Self>,
53        cx: &mut Context<'_>,
54    ) -> Poll<Result<(), Self::Error>> {
55        let this = self.project();
56        let ready1 = this.wr1.poll_ready(cx)?.is_ready();
57        let ready2 = this.wr2.poll_ready(cx)?.is_ready();
58        if ready1 && ready2 { Poll::Ready(Ok(())) } else { Poll::Pending }
59    }
60
61    fn start_send(
62        self: Pin<&mut Self>,
63        part: Part,
64    ) -> Result<Self::Recv, Self::Error> {
65        let this = self.project();
66        let ret1 = this.wr1.start_send(part.clone())?;
67        let ret2 = this.wr2.start_send(part)?;
68        Ok((ret1, ret2))
69    }
70
71    fn poll_flush(
72        self: Pin<&mut Self>,
73        cx: &mut Context<'_>,
74    ) -> Poll<Result<(), Self::Error>> {
75        let this = self.project();
76        let ready1 = this.wr1.poll_flush(cx)?.is_ready();
77        let ready2 = this.wr2.poll_flush(cx)?.is_ready();
78        if ready1 && ready2 { Poll::Ready(Ok(())) } else { Poll::Pending }
79    }
80
81    fn poll_complete(
82        self: Pin<&mut Self>,
83        cx: &mut Context<'_>,
84    ) -> Poll<Result<Self::Output, Self::Error>> {
85        let this = self.project();
86        let out1 = ready!(this.wr1.poll_complete(cx))?;
87        *this.wro1 = Some(out1);
88        let out2 = ready!(this.wr2.poll_complete(cx))?;
89        Poll::Ready(Ok((this.wro1.take().unwrap(), out2)))
90    }
91}
92
93impl<Wr1, Wr2, Part> Debug for Fanout<Wr1, Wr2, Part>
94where
95    Wr1: MultipartWrite<Part> + Debug,
96    Wr2: MultipartWrite<Part> + Debug,
97    Wr1::Output: Debug,
98    Wr2::Output: Debug,
99{
100    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
101        f.debug_struct("Fanout")
102            .field("wr1", &self.wr1)
103            .field("wr2", &self.wr2)
104            .field("wro1", &self.wro1)
105            .field("wro2", &self.wro2)
106            .finish()
107    }
108}