1use crate::Sink;
2use core::fmt::{Debug, Formatter, Result as FmtResult};
3use core::pin::Pin;
4use core::task::{Context, Poll};
5
6#[must_use = "sinks do nothing unless polled"]
11pub struct Fanout<Si1, Si2> {
12 sink1: Si1,
13 sink2: Si2,
14}
15
16impl<Si1, Si2> Unpin for Fanout<Si1, Si2>
17where
18 Si1: Unpin,
19 Si2: Unpin,
20{
21}
22
23impl<Si1, Si2> Fanout<Si1, Si2> {
24 pub(super) fn new(sink1: Si1, sink2: Si2) -> Self {
25 Self { sink1, sink2 }
26 }
27
28 pub fn get_ref(&self) -> (&Si1, &Si2) {
30 (&self.sink1, &self.sink2)
31 }
32
33 pub fn get_mut(&mut self) -> (&mut Si1, &mut Si2) {
35 (&mut self.sink1, &mut self.sink2)
36 }
37
38 pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) {
40 unsafe {
41 let this = self.get_unchecked_mut();
42 (
43 Pin::new_unchecked(&mut this.sink1),
44 Pin::new_unchecked(&mut this.sink2),
45 )
46 }
47 }
48
49 pub fn into_inner(self) -> (Si1, Si2) {
54 (self.sink1, self.sink2)
55 }
56}
57
58impl<Si1: Debug, Si2: Debug> Debug for Fanout<Si1, Si2> {
59 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
60 f.debug_struct("Fanout")
61 .field("sink1", &self.sink1)
62 .field("sink2", &self.sink2)
63 .finish()
64 }
65}
66
67impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2>
68where
69 Si1: Sink<Item>,
70 Item: Clone,
71 Si2: Sink<Item, Error = Si1::Error>,
72{
73 type Error = Si1::Error;
74
75 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
76 let (sink1, sink2) = self.get_pin_mut();
77
78 match (sink2.poll_ready(cx), sink1.poll_ready(cx)) {
79 (Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())),
80 (Poll::Ready(Err(e)), _) | (_, Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
81 (_, Poll::Pending) | (Poll::Pending, _) => Poll::Pending,
82 }
83 }
84
85 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
86 let (sink1, sink2) = self.get_pin_mut();
87
88 match (sink1.start_send(item.clone()), sink2.start_send(item)) {
89 (Ok(()), Ok(())) => Ok(()),
90 (Err(e), _) | (_, Err(e)) => Err(e),
91 }
92 }
93
94 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
95 let (sink1, sink2) = self.get_pin_mut();
96
97 match (sink2.poll_flush(cx), sink1.poll_flush(cx)) {
98 (Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())),
99 (Poll::Ready(Err(e)), _) | (_, Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
100 (_, Poll::Pending) | (Poll::Pending, _) => Poll::Pending,
101 }
102 }
103
104 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
105 let (sink1, sink2) = self.get_pin_mut();
106
107 match (sink2.poll_close(cx), sink1.poll_close(cx)) {
108 (Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())),
109 (Poll::Ready(Err(e)), _) | (_, Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
110 (_, Poll::Pending) | (Poll::Pending, _) => Poll::Pending,
111 }
112 }
113}