use crate::Sink;
use core::fmt::{Debug, Formatter, Result as FmtResult};
use core::pin::Pin;
use core::task::{Context, Poll};
5
/// Sink that clones every incoming item and forwards it to two inner sinks.
///
/// The `Sink` implementation for this type reports readiness (and completion
/// of a flush or close) only once *both* inner sinks have done so, so the pair
/// advances at the pace of the slower sink.
#[must_use = "sinks do nothing unless polled"]
pub struct Fanout<Si1, Si2> {
    /// First destination; it is handed a clone of each item.
    sink1: Si1,
    /// Second destination; it is handed the original item.
    sink2: Si2,
}
15
16impl<Si1, Si2> Unpin for Fanout<Si1, Si2>
17where
18 Si1: Unpin,
19 Si2: Unpin,
20{
21}
22
23impl<Si1, Si2> Fanout<Si1, Si2> {
24 pub(super) fn new(sink1: Si1, sink2: Si2) -> Self {
25 Self { sink1, sink2 }
26 }
27
28 pub fn get_ref(&self) -> (&Si1, &Si2) {
30 (&self.sink1, &self.sink2)
31 }
32
33 pub fn get_mut(&mut self) -> (&mut Si1, &mut Si2) {
35 (&mut self.sink1, &mut self.sink2)
36 }
37
38 pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) {
40 unsafe {
41 let this = self.get_unchecked_mut();
42 (
43 Pin::new_unchecked(&mut this.sink1),
44 Pin::new_unchecked(&mut this.sink2),
45 )
46 }
47 }
48
49 pub fn into_inner(self) -> (Si1, Si2) {
54 (self.sink1, self.sink2)
55 }
56}
57
58impl<Si1: Debug, Si2: Debug> Debug for Fanout<Si1, Si2> {
59 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
60 f.debug_struct("Fanout")
61 .field("sink1", &self.sink1)
62 .field("sink2", &self.sink2)
63 .finish()
64 }
65}
66
67impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2>
68where
69 Si1: Sink<Item>,
70 Item: Clone,
71 Si2: Sink<Item, Error = Si1::Error>,
72{
73 type Error = Si1::Error;
74
75 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
76 let (sink1, sink2) = self.get_pin_mut();
77
78 let sink1_ready = match sink1.poll_ready(cx) {
79 Poll::Ready(Ok(())) => true,
80 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
81 Poll::Pending => false,
82 };
83
84 let sink2_ready = match sink2.poll_ready(cx) {
85 Poll::Ready(Ok(())) => true,
86 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
87 Poll::Pending => false,
88 };
89
90 if sink1_ready && sink2_ready {
91 Poll::Ready(Ok(()))
92 } else {
93 Poll::Pending
94 }
95 }
96
97 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
98 let (sink1, sink2) = self.get_pin_mut();
99
100 sink1.start_send(item.clone())?;
101 sink2.start_send(item)?;
102 Ok(())
103 }
104
105 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106 let (sink1, sink2) = self.get_pin_mut();
107
108 let sink1_ready = match sink1.poll_flush(cx) {
109 Poll::Ready(Ok(())) => true,
110 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
111 Poll::Pending => false,
112 };
113
114 let sink2_ready = match sink2.poll_flush(cx) {
115 Poll::Ready(Ok(())) => true,
116 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
117 Poll::Pending => false,
118 };
119
120 if sink1_ready && sink2_ready {
121 Poll::Ready(Ok(()))
122 } else {
123 Poll::Pending
124 }
125 }
126
127 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
128 let (sink1, sink2) = self.get_pin_mut();
129
130 let sink1_ready = match sink1.poll_close(cx) {
131 Poll::Ready(Ok(())) => true,
132 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
133 Poll::Pending => false,
134 };
135 let sink2_ready = match sink2.poll_close(cx) {
136 Poll::Ready(Ok(())) => true,
137 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
138 Poll::Pending => false,
139 };
140
141 if sink1_ready && sink2_ready {
142 Poll::Ready(Ok(()))
143 } else {
144 Poll::Pending
145 }
146 }
147}