uactor/data/
data_publisher.rs1#[cfg(feature = "async_sender")]
2pub use async_sender::*;
3
4#[cfg(feature = "async_sender")]
5mod async_sender {
6 use tokio::sync::mpsc::{Sender, UnboundedSender};
7 use tokio::sync::{broadcast, mpsc, oneshot, watch};
8
9 pub trait DataPublisher: TryClone {
10 type Item;
11 fn publish(
12 &self,
13 data: Self::Item,
14 ) -> impl std::future::Future<Output = DataPublisherResult> + Send;
15 }
16
17 #[derive(thiserror::Error, Debug)]
18 pub enum DataPublisherErrors {
19 #[error("Channel closed")]
20 Closed,
21 }
22
23 pub trait TryClone: Sized {
24 fn try_clone(&self) -> Result<Self, TryCloneError>;
25 }
26
27 impl<T: ?Sized> TryClone for &T {
28 #[inline(always)]
29 fn try_clone(&self) -> Result<Self, TryCloneError> {
30 Ok(*self)
31 }
32 }
33
34 #[derive(thiserror::Error, Debug)]
35 pub enum TryCloneError {
36 #[error("Can't be cloned")]
37 CantClone,
38 }
39
40 impl Clone for TryCloneError {
41 fn clone(&self) -> Self {
42 todo!()
43 }
44 }
45
46 impl<T> From<mpsc::error::SendError<T>> for DataPublisherErrors {
47 fn from(_: mpsc::error::SendError<T>) -> Self {
48 DataPublisherErrors::Closed
49 }
50 }
51
52 impl From<oneshot::error::RecvError> for DataPublisherErrors {
53 fn from(_: oneshot::error::RecvError) -> Self {
54 DataPublisherErrors::Closed
55 }
56 }
57
58 pub type DataPublisherResult = Result<(), DataPublisherErrors>;
59
60 impl<T> DataPublisher for mpsc::Sender<T>
61 where
62 T: Send,
63 {
64 type Item = T;
65
66 async fn publish(&self, data: Self::Item) -> DataPublisherResult {
67 self.send(data).await.map_err(DataPublisherErrors::from)
68 }
69 }
70
71 impl<T> TryClone for Sender<T>
72 where
73 T: Send,
74 {
75 fn try_clone(&self) -> Result<Self, TryCloneError> {
76 Ok(self.clone())
77 }
78 }
79
80 impl<T> DataPublisher for mpsc::UnboundedSender<T>
81 where
82 T: Send,
83 {
84 type Item = T;
85
86 async fn publish(&self, data: Self::Item) -> DataPublisherResult {
87 self.send(data).map_err(DataPublisherErrors::from)
88 }
89 }
90
91 impl<T> TryClone for UnboundedSender<T>
92 where
93 T: Send,
94 {
95 fn try_clone(&self) -> Result<Self, TryCloneError> {
96 Ok(self.clone())
97 }
98 }
99
100 impl<T> DataPublisher for watch::Sender<T>
101 where
102 T: Send + Sync,
103 {
104 type Item = T;
105
106 async fn publish(&self, data: Self::Item) -> DataPublisherResult {
107 self.send(data).map_err(|_| DataPublisherErrors::Closed)
108 }
109 }
110
111 impl<T> TryClone for watch::Sender<T>
112 where
113 T: Send,
114 {
115 fn try_clone(&self) -> Result<Self, TryCloneError> {
116 Err(TryCloneError::CantClone)
117 }
118 }
119
120 impl<T> DataPublisher for broadcast::Sender<T>
121 where
122 T: Send + Sync,
123 {
124 type Item = T;
125
126 async fn publish(&self, data: Self::Item) -> DataPublisherResult {
127 self.send(data)
128 .map(|_receivers_count| ())
129 .map_err(|_| DataPublisherErrors::Closed)
130 }
131 }
132
133 impl<T> TryClone for broadcast::Sender<T>
134 where
135 T: Send,
136 {
137 fn try_clone(&self) -> Result<Self, TryCloneError> {
138 Ok(self.clone())
139 }
140 }
141}
142
143#[cfg(not(feature = "async_sender"))]
144pub use sync_sender::*;
145
146#[cfg(not(feature = "async_sender"))]
147mod sync_sender {
148 use tokio::sync::{broadcast, mpsc, oneshot, watch};
149
150 pub trait DataPublisher: TryClone {
151 type Item;
152 fn publish(&self, data: Self::Item) -> DataPublisherResult;
153 }
154
155 #[derive(thiserror::Error, Debug)]
156 pub enum DataPublisherErrors {
157 #[error("Channel closed")]
158 Closed,
159 }
160
161 pub trait TryClone: Sized {
162 fn try_clone(&self) -> Result<Self, TryCloneError>;
163 }
164
165 #[derive(thiserror::Error, Debug)]
166 pub enum TryCloneError {
167 #[error("Can't be cloned")]
168 CantClone,
169 }
170
171 impl Clone for TryCloneError {
172 fn clone(&self) -> Self {
173 todo!()
174 }
175 }
176
177 impl<T> From<mpsc::error::SendError<T>> for DataPublisherErrors {
178 fn from(_: mpsc::error::SendError<T>) -> Self {
179 DataPublisherErrors::Closed
180 }
181 }
182
183 impl From<oneshot::error::RecvError> for DataPublisherErrors {
184 fn from(_: oneshot::error::RecvError) -> Self {
185 DataPublisherErrors::Closed
186 }
187 }
188
189 pub type DataPublisherResult = Result<(), DataPublisherErrors>;
190
191 impl<T> DataPublisher for mpsc::UnboundedSender<T>
192 where
193 T: Send,
194 {
195 type Item = T;
196
197 fn publish(&self, data: Self::Item) -> DataPublisherResult {
198 self.send(data).map_err(DataPublisherErrors::from)
199 }
200 }
201
202 impl<T> TryClone for mpsc::UnboundedSender<T>
203 where
204 T: Send,
205 {
206 fn try_clone(&self) -> Result<Self, TryCloneError> {
207 Ok(self.clone())
208 }
209 }
210
211 impl<T> DataPublisher for watch::Sender<T>
212 where
213 T: Send + Sync,
214 {
215 type Item = T;
216
217 fn publish(&self, data: Self::Item) -> DataPublisherResult {
218 self.send(data).map_err(|_| DataPublisherErrors::Closed)
219 }
220 }
221
222 impl<T> TryClone for watch::Sender<T>
223 where
224 T: Send,
225 {
226 fn try_clone(&self) -> Result<Self, TryCloneError> {
227 Err(TryCloneError::CantClone)
228 }
229 }
230
231 impl<T> DataPublisher for broadcast::Sender<T>
232 where
233 T: Send + Sync,
234 {
235 type Item = T;
236
237 fn publish(&self, data: Self::Item) -> DataPublisherResult {
238 self.send(data)
239 .map(|_receivers_count| ())
240 .map_err(|_| DataPublisherErrors::Closed)
241 }
242 }
243
244 impl<T> TryClone for broadcast::Sender<T>
245 where
246 T: Send,
247 {
248 fn try_clone(&self) -> Result<Self, TryCloneError> {
249 Ok(self.clone())
250 }
251 }
252}