utils_atomics/channel/
once.rs1use crate::flag::mpsc::*;
2use alloc::sync::{Arc, Weak};
3use core::cell::UnsafeCell;
4use docfg::docfg;
5
/// Shared storage slot of a one-shot channel.
///
/// The cell starts out as `None`. The sending side stores `Some(value)` into it and the
/// receiving side later `take`s the value back out. All access goes through raw pointers
/// obtained from the [`UnsafeCell`], so the users of this type are responsible for making
/// sure the write and the read never overlap.
struct Inner<T> {
    v: UnsafeCell<Option<T>>,
}

// SAFETY: moving an `Inner<T>` to another thread moves the `T` it may contain, which is
// fine whenever `T: Send`.
unsafe impl<T: Send> Send for Inner<T> {}
// SAFETY: a shared `&Inner<T>` is only ever used to move a `T` into the slot from one
// thread and out of it from another; no `&T` is handed out. Transferring ownership across
// threads needs `T: Send` (the same bound `Mutex<T>` uses), not `T: Sync`.
unsafe impl<T: Send> Sync for Inner<T> {}
11
/// Sending half of a one-shot channel, created by [`channel`].
///
/// Both sending methods take `self` by value, so a sender can deliver at most one value.
pub struct Sender<T> {
    // Weak handle to the shared slot: the receiver owns the only strong reference, so
    // `upgrade` fails once the receiver has been dropped.
    inner: Weak<Inner<T>>,
    // Marked by `try_send` right after the value has been stored in the slot.
    flag: Flag,
}
17
18pub struct Receiver<T> {
20 inner: Arc<Inner<T>>,
21 sub: Subscribe,
22}
23
24impl<T> Sender<T> {
25 #[inline]
27 pub fn send(self, t: T) {
28 let _: Result<(), T> = self.try_send(t);
29 }
30
31 pub fn try_send(self, t: T) -> Result<(), T> {
36 if let Some(inner) = self.inner.upgrade() {
37 unsafe { *inner.v.get() = Some(t) };
38 self.flag.mark();
39 return Ok(());
40 }
41 return Err(t);
42 }
43}
44
45impl<T> Receiver<T> {
46 #[inline]
49 pub fn wait(self) -> Option<T> {
50 self.sub.wait();
51 return unsafe { &mut *self.inner.v.get() }.take();
52 }
53
54 #[docfg(feature = "std")]
60 #[inline]
61 pub fn wait_timeout(&self, dur: core::time::Duration) -> Result<Option<T>, crate::Timeout> {
62 self.sub.wait_timeout(dur)?;
63 return Ok(unsafe { &mut *self.inner.v.get() }.take());
64 }
65}
66
67unsafe impl<T: Send> Send for Sender<T> {}
68unsafe impl<T: Send> Send for Receiver<T> {}
69unsafe impl<T: Send> Sync for Sender<T> {}
70unsafe impl<T: Send> Sync for Receiver<T> {}
71
72pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
74 let inner = Arc::new(Inner {
75 v: UnsafeCell::new(None),
76 });
77 let (flag, sub) = crate::flag::mpsc::flag();
78
79 return (
80 Sender {
81 inner: Arc::downgrade(&inner),
82 flag,
83 },
84 Receiver { inner, sub },
85 );
86}
87
88cfg_if::cfg_if! {
89 if #[cfg(feature = "futures")] {
90 #[cfg_attr(docsrs, doc(cfg(all(feature = "alloc", feature = "futures"))))]
92 pub struct AsyncSender<T> {
93 inner: Weak<Inner<T>>,
94 flag: AsyncFlag
95 }
96
97 pin_project_lite::pin_project! {
98 #[cfg_attr(docsrs, doc(cfg(all(feature = "alloc", feature = "futures"))))]
100 pub struct AsyncReceiver<T> {
101 inner: Arc<Inner<T>>,
102 #[pin]
103 sub: AsyncSubscribe
104 }
105 }
106
107 impl<T> AsyncSender<T> {
108 #[inline]
110 pub fn send (self, t: T) {
111 let _: Result<(), T> = self.try_send(t);
112 }
113
114 pub fn try_send(self, t: T) -> Result<(), T> {
119 if let Some(inner) = self.inner.upgrade() {
120 unsafe { *inner.v.get() = Some(t) };
121 self.flag.mark();
122 return Ok(());
123 }
124 return Err(t);
125 }
126 }
127
128 impl<T> futures::Future for AsyncReceiver<T> {
129 type Output = Option<T>;
130
131 #[inline]
132 fn poll(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> core::task::Poll<Self::Output> {
133 let this = self.project();
134 if this.sub.poll(cx).is_ready() {
135 return core::task::Poll::Ready(unsafe { &mut *this.inner.v.get() }.take())
136 }
137 return core::task::Poll::Pending
138 }
139 }
140
141 impl<T> futures::future::FusedFuture for AsyncReceiver<T> {
142 #[inline]
143 fn is_terminated(&self) -> bool {
144 self.sub.is_terminated()
145 }
146 }
147
148 unsafe impl<T: Send> Send for AsyncSender<T> {}
149 unsafe impl<T: Send> Send for AsyncReceiver<T> {}
150 unsafe impl<T: Send> Sync for AsyncSender<T> {}
151 unsafe impl<T: Send> Sync for AsyncReceiver<T> {}
152
153 pub fn async_channel<T>() -> (AsyncSender<T>, AsyncReceiver<T>) {
155 let inner = Arc::new(Inner {
156 v: UnsafeCell::new(None),
157 });
158 let (flag, sub) = crate::flag::mpsc::async_flag();
159
160 return (
161 AsyncSender {
162 inner: Arc::downgrade(&inner),
163 flag,
164 },
165 AsyncReceiver { inner, sub },
166 );
167 }
168 }
169}
170
#[cfg(test)]
mod tests {
    use super::*;

    /// A sent value is returned by `wait`.
    #[test]
    fn test_send_receive() {
        let (sender, receiver) = channel::<i32>();

        sender.send(42);
        let result = receiver.wait();

        assert_eq!(result, Some(42));
    }

    /// Dropping the sender without sending makes `wait` return `None`.
    #[test]
    fn test_sender_dropped() {
        let (sender, receiver) = channel::<i32>();

        drop(sender);
        let result = receiver.wait();

        assert_eq!(result, None);
    }

    /// `try_send` succeeds while the receiver is alive.
    #[test]
    fn test_try_send() {
        let (sender, receiver) = channel::<i32>();

        let result = sender.try_send(42);
        assert!(result.is_ok());

        let value = receiver.wait();
        assert_eq!(value, Some(42));
    }

    /// `try_send` gives the value back once the receiver has been dropped.
    #[test]
    fn test_try_send_after_used() {
        let (sender, receiver) = channel::<i32>();
        drop(receiver);
        let result = sender.try_send(43);

        assert!(result.is_err());
        assert_eq!(result.unwrap_err(), 43);
    }

    /// `wait_timeout` reports an error when nothing is sent within the timeout.
    #[cfg(feature = "std")]
    #[test]
    fn test_try_receive_timeout() {
        let (sender, receiver) = channel::<i32>();

        let wait = std::thread::spawn(move || {
            receiver.wait_timeout(core::time::Duration::from_millis(100))
        });
        // Send only after the receiver's timeout has elapsed.
        std::thread::sleep(core::time::Duration::from_millis(200));
        sender.send(2);

        assert!(wait.join().unwrap().is_err())
    }

    #[cfg(feature = "futures")]
    mod async_tests {
        use super::*;
        use tokio::runtime::Runtime;

        /// A sent value is the output of the receiver future.
        #[test]
        fn test_async_send_receive() {
            let rt = Runtime::new().unwrap();
            let (async_sender, async_receiver) = async_channel::<i32>();

            async_sender.send(42);
            let result = rt.block_on(async_receiver);

            assert_eq!(result, Some(42));
        }

        /// Dropping the sender without sending resolves the receiver to `None`.
        #[test]
        fn test_async_sender_dropped() {
            let rt = Runtime::new().unwrap();
            let (async_sender, async_receiver) = async_channel::<i32>();

            drop(async_sender);
            let result = rt.block_on(async_receiver);

            assert_eq!(result, None);
        }

        /// `try_send` succeeds while the receiver is alive.
        #[test]
        fn test_async_try_send() {
            let rt = Runtime::new().unwrap();
            let (async_sender, async_receiver) = async_channel::<i32>();

            let result = async_sender.try_send(42);
            assert!(result.is_ok());

            let value = rt.block_on(async_receiver);
            assert_eq!(value, Some(42));
        }

        /// `try_send` gives the value back once the receiver has been dropped
        /// (async counterpart of the blocking `test_try_send_after_used`).
        #[test]
        fn test_async_try_send_after_used() {
            let (async_sender, async_receiver) = async_channel::<i32>();
            drop(async_receiver);
            let result = async_sender.try_send(43);

            assert!(result.is_err());
            assert_eq!(result.unwrap_err(), 43);
        }
    }
}