uactor/data/
data_publisher.rs

1#[cfg(feature = "async_sender")]
2pub use async_sender::*;
3
4#[cfg(feature = "async_sender")]
5mod async_sender {
6    use tokio::sync::mpsc::{Sender, UnboundedSender};
7    use tokio::sync::{broadcast, mpsc, oneshot, watch};
8
9    pub trait DataPublisher: TryClone {
10        type Item;
11        fn publish(
12            &self,
13            data: Self::Item,
14        ) -> impl std::future::Future<Output = DataPublisherResult> + Send;
15    }
16
17    #[derive(thiserror::Error, Debug)]
18    pub enum DataPublisherErrors {
19        #[error("Channel closed")]
20        Closed,
21    }
22
23    pub trait TryClone: Sized {
24        fn try_clone(&self) -> Result<Self, TryCloneError>;
25    }
26
27    impl<T: ?Sized> TryClone for &T {
28        #[inline(always)]
29        fn try_clone(&self) -> Result<Self, TryCloneError> {
30            Ok(*self)
31        }
32    }
33
34    #[derive(thiserror::Error, Debug)]
35    pub enum TryCloneError {
36        #[error("Can't be cloned")]
37        CantClone,
38    }
39
40    impl Clone for TryCloneError {
41        fn clone(&self) -> Self {
42            todo!()
43        }
44    }
45
46    impl<T> From<mpsc::error::SendError<T>> for DataPublisherErrors {
47        fn from(_: mpsc::error::SendError<T>) -> Self {
48            DataPublisherErrors::Closed
49        }
50    }
51
52    impl From<oneshot::error::RecvError> for DataPublisherErrors {
53        fn from(_: oneshot::error::RecvError) -> Self {
54            DataPublisherErrors::Closed
55        }
56    }
57
58    pub type DataPublisherResult = Result<(), DataPublisherErrors>;
59
60    impl<T> DataPublisher for mpsc::Sender<T>
61    where
62        T: Send,
63    {
64        type Item = T;
65
66        async fn publish(&self, data: Self::Item) -> DataPublisherResult {
67            self.send(data).await.map_err(DataPublisherErrors::from)
68        }
69    }
70
71    impl<T> TryClone for Sender<T>
72    where
73        T: Send,
74    {
75        fn try_clone(&self) -> Result<Self, TryCloneError> {
76            Ok(self.clone())
77        }
78    }
79
80    impl<T> DataPublisher for mpsc::UnboundedSender<T>
81    where
82        T: Send,
83    {
84        type Item = T;
85
86        async fn publish(&self, data: Self::Item) -> DataPublisherResult {
87            self.send(data).map_err(DataPublisherErrors::from)
88        }
89    }
90
91    impl<T> TryClone for UnboundedSender<T>
92    where
93        T: Send,
94    {
95        fn try_clone(&self) -> Result<Self, TryCloneError> {
96            Ok(self.clone())
97        }
98    }
99
100    impl<T> DataPublisher for watch::Sender<T>
101    where
102        T: Send + Sync,
103    {
104        type Item = T;
105
106        async fn publish(&self, data: Self::Item) -> DataPublisherResult {
107            self.send(data).map_err(|_| DataPublisherErrors::Closed)
108        }
109    }
110
111    impl<T> TryClone for watch::Sender<T>
112    where
113        T: Send,
114    {
115        fn try_clone(&self) -> Result<Self, TryCloneError> {
116            Err(TryCloneError::CantClone)
117        }
118    }
119
120    impl<T> DataPublisher for broadcast::Sender<T>
121    where
122        T: Send + Sync,
123    {
124        type Item = T;
125
126        async fn publish(&self, data: Self::Item) -> DataPublisherResult {
127            self.send(data)
128                .map(|_receivers_count| ())
129                .map_err(|_| DataPublisherErrors::Closed)
130        }
131    }
132
133    impl<T> TryClone for broadcast::Sender<T>
134    where
135        T: Send,
136    {
137        fn try_clone(&self) -> Result<Self, TryCloneError> {
138            Ok(self.clone())
139        }
140    }
141}
142
143#[cfg(not(feature = "async_sender"))]
144pub use sync_sender::*;
145
146#[cfg(not(feature = "async_sender"))]
147mod sync_sender {
148    use tokio::sync::{broadcast, mpsc, oneshot, watch};
149
150    pub trait DataPublisher: TryClone {
151        type Item;
152        fn publish(&self, data: Self::Item) -> DataPublisherResult;
153    }
154
155    #[derive(thiserror::Error, Debug)]
156    pub enum DataPublisherErrors {
157        #[error("Channel closed")]
158        Closed,
159    }
160
161    pub trait TryClone: Sized {
162        fn try_clone(&self) -> Result<Self, TryCloneError>;
163    }
164
165    #[derive(thiserror::Error, Debug)]
166    pub enum TryCloneError {
167        #[error("Can't be cloned")]
168        CantClone,
169    }
170
171    impl Clone for TryCloneError {
172        fn clone(&self) -> Self {
173            todo!()
174        }
175    }
176
177    impl<T> From<mpsc::error::SendError<T>> for DataPublisherErrors {
178        fn from(_: mpsc::error::SendError<T>) -> Self {
179            DataPublisherErrors::Closed
180        }
181    }
182
183    impl From<oneshot::error::RecvError> for DataPublisherErrors {
184        fn from(_: oneshot::error::RecvError) -> Self {
185            DataPublisherErrors::Closed
186        }
187    }
188
189    pub type DataPublisherResult = Result<(), DataPublisherErrors>;
190
191    impl<T> DataPublisher for mpsc::UnboundedSender<T>
192    where
193        T: Send,
194    {
195        type Item = T;
196
197        fn publish(&self, data: Self::Item) -> DataPublisherResult {
198            self.send(data).map_err(DataPublisherErrors::from)
199        }
200    }
201
202    impl<T> TryClone for mpsc::UnboundedSender<T>
203    where
204        T: Send,
205    {
206        fn try_clone(&self) -> Result<Self, TryCloneError> {
207            Ok(self.clone())
208        }
209    }
210
211    impl<T> DataPublisher for watch::Sender<T>
212    where
213        T: Send + Sync,
214    {
215        type Item = T;
216
217        fn publish(&self, data: Self::Item) -> DataPublisherResult {
218            self.send(data).map_err(|_| DataPublisherErrors::Closed)
219        }
220    }
221
222    impl<T> TryClone for watch::Sender<T>
223    where
224        T: Send,
225    {
226        fn try_clone(&self) -> Result<Self, TryCloneError> {
227            Err(TryCloneError::CantClone)
228        }
229    }
230
231    impl<T> DataPublisher for broadcast::Sender<T>
232    where
233        T: Send + Sync,
234    {
235        type Item = T;
236
237        fn publish(&self, data: Self::Item) -> DataPublisherResult {
238            self.send(data)
239                .map(|_receivers_count| ())
240                .map_err(|_| DataPublisherErrors::Closed)
241        }
242    }
243
244    impl<T> TryClone for broadcast::Sender<T>
245    where
246        T: Send,
247    {
248        fn try_clone(&self) -> Result<Self, TryCloneError> {
249            Ok(self.clone())
250        }
251    }
252}