multipart_write/write/
fanout.rs

1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures::ready;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7/// `MultipartWrite` for [`fanout`](super::MultipartWriteExt::fanout).
8#[must_use = "futures do nothing unless polled"]
9#[pin_project::pin_project]
10pub struct Fanout<Wr1: MultipartWrite<Part>, Wr2: MultipartWrite<Part>, Part> {
11    #[pin]
12    wr1: Wr1,
13    #[pin]
14    wr2: Wr2,
15    wro1: Option<Wr1::Output>,
16    wro2: Option<Wr2::Output>,
17}
18
19impl<Wr1: MultipartWrite<Part>, Wr2: MultipartWrite<Part>, Part> Fanout<Wr1, Wr2, Part> {
20    pub(super) fn new(wr1: Wr1, wr2: Wr2) -> Self {
21        Self {
22            wr1,
23            wr2,
24            wro1: None,
25            wro2: None,
26        }
27    }
28}
29
30impl<Wr1, Wr2, Part> FusedMultipartWrite<Part> for Fanout<Wr1, Wr2, Part>
31where
32    Part: Clone,
33    Wr1: FusedMultipartWrite<Part>,
34    Wr2: FusedMultipartWrite<Part, Error = Wr1::Error>,
35{
36    fn is_terminated(&self) -> bool {
37        self.wr1.is_terminated() || self.wr2.is_terminated()
38    }
39}
40
41impl<Wr1, Wr2, Part> MultipartWrite<Part> for Fanout<Wr1, Wr2, Part>
42where
43    Part: Clone,
44    Wr1: MultipartWrite<Part>,
45    Wr2: MultipartWrite<Part, Error = Wr1::Error>,
46{
47    type Ret = (Wr1::Ret, Wr2::Ret);
48    type Output = (Wr1::Output, Wr2::Output);
49    type Error = Wr1::Error;
50
51    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
52        let this = self.project();
53        let ready1 = this.wr1.poll_ready(cx)?.is_ready();
54        let ready2 = this.wr2.poll_ready(cx)?.is_ready();
55        if ready1 && ready2 {
56            Poll::Ready(Ok(()))
57        } else {
58            Poll::Pending
59        }
60    }
61
62    fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
63        let this = self.project();
64        let ret1 = this.wr1.start_send(part.clone())?;
65        let ret2 = this.wr2.start_send(part)?;
66        Ok((ret1, ret2))
67    }
68
69    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
70        let this = self.project();
71        let ready1 = this.wr1.poll_flush(cx)?.is_ready();
72        let ready2 = this.wr2.poll_flush(cx)?.is_ready();
73        if ready1 && ready2 {
74            Poll::Ready(Ok(()))
75        } else {
76            Poll::Pending
77        }
78    }
79
80    fn poll_complete(
81        self: Pin<&mut Self>,
82        cx: &mut Context<'_>,
83    ) -> Poll<Result<Self::Output, Self::Error>> {
84        let this = self.project();
85        let output1 = ready!(this.wr1.poll_complete(cx))?;
86        *this.wro1 = Some(output1);
87        let output2 = ready!(this.wr2.poll_complete(cx))?;
88        Poll::Ready(Ok((this.wro1.take().unwrap(), output2)))
89    }
90}