multipart_write/write/
fanout.rs1use crate::{FusedMultipartWrite, MultipartWrite};
2
3use futures_core::ready;
4use std::fmt::{self, Debug, Formatter};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8pin_project_lite::pin_project! {
9 #[must_use = "futures do nothing unless polled"]
11 pub struct Fanout<Wr1: MultipartWrite<Part>, Wr2: MultipartWrite<Part>, Part> {
12 #[pin]
13 wr1: Wr1,
14 #[pin]
15 wr2: Wr2,
16 wro1: Option<Wr1::Output>,
17 wro2: Option<Wr2::Output>,
18 }
19}
20
21impl<Wr1: MultipartWrite<Part>, Wr2: MultipartWrite<Part>, Part> Fanout<Wr1, Wr2, Part> {
22 pub(super) fn new(wr1: Wr1, wr2: Wr2) -> Self {
23 Self {
24 wr1,
25 wr2,
26 wro1: None,
27 wro2: None,
28 }
29 }
30}
31
32impl<Wr1, Wr2, Part> FusedMultipartWrite<Part> for Fanout<Wr1, Wr2, Part>
33where
34 Part: Clone,
35 Wr1: FusedMultipartWrite<Part>,
36 Wr2: FusedMultipartWrite<Part, Error = Wr1::Error>,
37{
38 fn is_terminated(&self) -> bool {
39 self.wr1.is_terminated() || self.wr2.is_terminated()
40 }
41}
42
43impl<Wr1, Wr2, Part> MultipartWrite<Part> for Fanout<Wr1, Wr2, Part>
44where
45 Part: Clone,
46 Wr1: MultipartWrite<Part>,
47 Wr2: MultipartWrite<Part, Error = Wr1::Error>,
48{
49 type Ret = (Wr1::Ret, Wr2::Ret);
50 type Output = (Wr1::Output, Wr2::Output);
51 type Error = Wr1::Error;
52
53 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
54 let this = self.project();
55 let ready1 = this.wr1.poll_ready(cx)?.is_ready();
56 let ready2 = this.wr2.poll_ready(cx)?.is_ready();
57 if ready1 && ready2 {
58 Poll::Ready(Ok(()))
59 } else {
60 Poll::Pending
61 }
62 }
63
64 fn start_send(self: Pin<&mut Self>, part: Part) -> Result<Self::Ret, Self::Error> {
65 let this = self.project();
66 let ret1 = this.wr1.start_send(part.clone())?;
67 let ret2 = this.wr2.start_send(part)?;
68 Ok((ret1, ret2))
69 }
70
71 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72 let this = self.project();
73 let ready1 = this.wr1.poll_flush(cx)?.is_ready();
74 let ready2 = this.wr2.poll_flush(cx)?.is_ready();
75 if ready1 && ready2 {
76 Poll::Ready(Ok(()))
77 } else {
78 Poll::Pending
79 }
80 }
81
82 fn poll_complete(
83 self: Pin<&mut Self>,
84 cx: &mut Context<'_>,
85 ) -> Poll<Result<Self::Output, Self::Error>> {
86 let this = self.project();
87 let out1 = ready!(this.wr1.poll_complete(cx))?;
88 *this.wro1 = Some(out1);
89 let out2 = ready!(this.wr2.poll_complete(cx))?;
90 Poll::Ready(Ok((this.wro1.take().unwrap(), out2)))
91 }
92}
93
94impl<Wr1, Wr2, Part> Debug for Fanout<Wr1, Wr2, Part>
95where
96 Wr1: MultipartWrite<Part> + Debug,
97 Wr2: MultipartWrite<Part> + Debug,
98 Wr1::Output: Debug,
99 Wr2::Output: Debug,
100{
101 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
102 f.debug_struct("Fanout")
103 .field("wr1", &self.wr1)
104 .field("wr2", &self.wr2)
105 .field("wro1", &self.wro1)
106 .field("wro2", &self.wro2)
107 .finish()
108 }
109}