multipart_write/write/
fanout.rs1use std::fmt::{self, Debug, Formatter};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_core::ready;
6
7use crate::{FusedMultipartWrite, MultipartWrite};
8
9pin_project_lite::pin_project! {
10 #[must_use = "futures do nothing unless polled"]
12 pub struct Fanout<Wr1: MultipartWrite<Part>, Wr2: MultipartWrite<Part>, Part> {
13 #[pin]
14 wr1: Wr1,
15 #[pin]
16 wr2: Wr2,
17 wro1: Option<Wr1::Output>,
18 wro2: Option<Wr2::Output>,
19 }
20}
21
22impl<Wr1: MultipartWrite<Part>, Wr2: MultipartWrite<Part>, Part>
23 Fanout<Wr1, Wr2, Part>
24{
25 pub(super) fn new(wr1: Wr1, wr2: Wr2) -> Self {
26 Self { wr1, wr2, wro1: None, wro2: None }
27 }
28}
29
30impl<Wr1, Wr2, Part> FusedMultipartWrite<Part> for Fanout<Wr1, Wr2, Part>
31where
32 Part: Clone,
33 Wr1: FusedMultipartWrite<Part>,
34 Wr2: FusedMultipartWrite<Part, Error = Wr1::Error>,
35{
36 fn is_terminated(&self) -> bool {
37 self.wr1.is_terminated() || self.wr2.is_terminated()
38 }
39}
40
41impl<Wr1, Wr2, Part> MultipartWrite<Part> for Fanout<Wr1, Wr2, Part>
42where
43 Part: Clone,
44 Wr1: MultipartWrite<Part>,
45 Wr2: MultipartWrite<Part, Error = Wr1::Error>,
46{
47 type Error = Wr1::Error;
48 type Output = (Wr1::Output, Wr2::Output);
49 type Recv = (Wr1::Recv, Wr2::Recv);
50
51 fn poll_ready(
52 self: Pin<&mut Self>,
53 cx: &mut Context<'_>,
54 ) -> Poll<Result<(), Self::Error>> {
55 let this = self.project();
56 let ready1 = this.wr1.poll_ready(cx)?.is_ready();
57 let ready2 = this.wr2.poll_ready(cx)?.is_ready();
58 if ready1 && ready2 { Poll::Ready(Ok(())) } else { Poll::Pending }
59 }
60
61 fn start_send(
62 self: Pin<&mut Self>,
63 part: Part,
64 ) -> Result<Self::Recv, Self::Error> {
65 let this = self.project();
66 let ret1 = this.wr1.start_send(part.clone())?;
67 let ret2 = this.wr2.start_send(part)?;
68 Ok((ret1, ret2))
69 }
70
71 fn poll_flush(
72 self: Pin<&mut Self>,
73 cx: &mut Context<'_>,
74 ) -> Poll<Result<(), Self::Error>> {
75 let this = self.project();
76 let ready1 = this.wr1.poll_flush(cx)?.is_ready();
77 let ready2 = this.wr2.poll_flush(cx)?.is_ready();
78 if ready1 && ready2 { Poll::Ready(Ok(())) } else { Poll::Pending }
79 }
80
81 fn poll_complete(
82 self: Pin<&mut Self>,
83 cx: &mut Context<'_>,
84 ) -> Poll<Result<Self::Output, Self::Error>> {
85 let this = self.project();
86 let out1 = ready!(this.wr1.poll_complete(cx))?;
87 *this.wro1 = Some(out1);
88 let out2 = ready!(this.wr2.poll_complete(cx))?;
89 Poll::Ready(Ok((this.wro1.take().unwrap(), out2)))
90 }
91}
92
93impl<Wr1, Wr2, Part> Debug for Fanout<Wr1, Wr2, Part>
94where
95 Wr1: MultipartWrite<Part> + Debug,
96 Wr2: MultipartWrite<Part> + Debug,
97 Wr1::Output: Debug,
98 Wr2::Output: Debug,
99{
100 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
101 f.debug_struct("Fanout")
102 .field("wr1", &self.wr1)
103 .field("wr2", &self.wr2)
104 .field("wro1", &self.wro1)
105 .field("wro2", &self.wro2)
106 .finish()
107 }
108}