multipart_write/write/
fanout.rs

1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::ready;
4use std::fmt::{self, Debug, Formatter};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8pin_project_lite::pin_project! {
9    /// `MultipartWrite` for [`fanout`](super::MultipartWriteExt::fanout).
10    #[must_use = "futures do nothing unless polled"]
11    pub struct Fanout<Wr1: MultipartWrite<Part>, Wr2: MultipartWrite<Part>, Part> {
12        #[pin]
13        wr1: Wr1,
14        #[pin]
15        wr2: Wr2,
16        wro1: Option<Wr1::Output>,
17        wro2: Option<Wr2::Output>,
18    }
19}
20
21impl<Wr1: MultipartWrite<Part>, Wr2: MultipartWrite<Part>, Part> Fanout<Wr1, Wr2, Part> {
22    pub(super) fn new(wr1: Wr1, wr2: Wr2) -> Self {
23        Self {
24            wr1,
25            wr2,
26            wro1: None,
27            wro2: None,
28        }
29    }
30}
31
32impl<Wr1, Wr2, Part> FusedMultipartWrite<Part> for Fanout<Wr1, Wr2, Part>
33where
34    Part: Clone,
35    Wr1: FusedMultipartWrite<Part>,
36    Wr2: FusedMultipartWrite<Part, Error = Wr1::Error>,
37{
38    fn is_terminated(&self) -> bool {
39        self.wr1.is_terminated() || self.wr2.is_terminated()
40    }
41}
42
43impl<Wr1, Wr2, Part> MultipartWrite<Part> for Fanout<Wr1, Wr2, Part>
44where
45    Part: Clone,
46    Wr1: MultipartWrite<Part>,
47    Wr2: MultipartWrite<Part, Error = Wr1::Error>,
48{
49    type Ret = (Wr1::Ret, Wr2::Ret);
50    type Output = (Wr1::Output, Wr2::Output);
51    type Error = Wr1::Error;
52
53    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
54        let this = self.project();
55        let ready1 = this.wr1.poll_ready(cx)?.is_ready();
56        let ready2 = this.wr2.poll_ready(cx)?.is_ready();
57        if ready1 && ready2 {
58            Poll::Ready(Ok(()))
59        } else {
60            Poll::Pending
61        }
62    }
63
64    fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
65        let this = self.project();
66        let ret1 = this.wr1.start_send(part.clone())?;
67        let ret2 = this.wr2.start_send(part)?;
68        Ok((ret1, ret2))
69    }
70
71    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72        let this = self.project();
73        let ready1 = this.wr1.poll_flush(cx)?.is_ready();
74        let ready2 = this.wr2.poll_flush(cx)?.is_ready();
75        if ready1 && ready2 {
76            Poll::Ready(Ok(()))
77        } else {
78            Poll::Pending
79        }
80    }
81
82    fn poll_complete(
83        self: Pin<&mut Self>,
84        cx: &mut Context<'_>,
85    ) -> Poll<Result<Self::Output, Self::Error>> {
86        let this = self.project();
87        let out1 = ready!(this.wr1.poll_complete(cx))?;
88        *this.wro1 = Some(out1);
89        let out2 = ready!(this.wr2.poll_complete(cx))?;
90        Poll::Ready(Ok((this.wro1.take().unwrap(), out2)))
91    }
92}
93
94impl<Wr1, Wr2, Part> Debug for Fanout<Wr1, Wr2, Part>
95where
96    Wr1: MultipartWrite<Part> + Debug,
97    Wr2: MultipartWrite<Part> + Debug,
98    Wr1::Output: Debug,
99    Wr2::Output: Debug,
100{
101    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
102        f.debug_struct("Fanout")
103            .field("wr1", &self.wr1)
104            .field("wr2", &self.wr2)
105            .field("wro1", &self.wro1)
106            .field("wro2", &self.wro2)
107            .finish()
108    }
109}