utils_atomics/channel/
once.rs

1use crate::flag::mpsc::*;
2use alloc::sync::{Arc, Weak};
3use core::cell::UnsafeCell;
4use docfg::docfg;
5
/// Shared storage for the single value exchanged between the two halves of a channel.
///
/// The slot is created empty (`None`), filled by the sending half and emptied by the
/// receiving half.
struct Inner<T> {
    /// The slot itself. It is only ever touched through the raw pointer handed out by
    /// [`UnsafeCell::get`].
    v: UnsafeCell<Option<T>>,
}

// SAFETY: NOTE(review) — `UnsafeCell` is never `Sync` on its own, so the markers are restated
// by hand. Soundness relies on every user of `Inner` ordering its accesses to `v` externally
// (the channel halves do so through their flag) — confirm whenever a new user is added.
unsafe impl<T> Send for Inner<T> where T: Send {}
unsafe impl<T> Sync for Inner<T> where T: Sync {}
11
12/// A channel sender that can only send a single value
13pub struct Sender<T> {
14    inner: Weak<Inner<T>>,
15    flag: Flag,
16}
17
18/// A channel receiver that can only receive a single value
19pub struct Receiver<T> {
20    inner: Arc<Inner<T>>,
21    sub: Subscribe,
22}
23
24impl<T> Sender<T> {
25    /// Sends the value through the channel. If the channel is already closed, the error will be ignored.
26    #[inline]
27    pub fn send(self, t: T) {
28        let _: Result<(), T> = self.try_send(t);
29    }
30
31    /// Attempts to send the value through the channel, returning `Ok` if successfull, and `Err(t)` otherwise.
32    ///
33    /// # Errors
34    /// This method returns an error if the channel has already been used or closed.
35    pub fn try_send(self, t: T) -> Result<(), T> {
36        if let Some(inner) = self.inner.upgrade() {
37            unsafe { *inner.v.get() = Some(t) };
38            self.flag.mark();
39            return Ok(());
40        }
41        return Err(t);
42    }
43}
44
45impl<T> Receiver<T> {
46    /// Blocks the current thread until the value is received.
47    /// If [`Sender`] is dropped before it sends the value, this method returns `None`.
48    #[inline]
49    pub fn wait(self) -> Option<T> {
50        self.sub.wait();
51        return unsafe { &mut *self.inner.v.get() }.take();
52    }
53
54    /// Blocks the current thread until the value is received.
55    /// If [`Sender`] is dropped before it sends the value, this method returns `None`.
56    ///
57    /// # Errors
58    /// This method returns an error if the wait didn't conclude before the specified duration
59    #[docfg(feature = "std")]
60    #[inline]
61    pub fn wait_timeout(&self, dur: core::time::Duration) -> Result<Option<T>, crate::Timeout> {
62        self.sub.wait_timeout(dur)?;
63        return Ok(unsafe { &mut *self.inner.v.get() }.take());
64    }
65}
66
67unsafe impl<T: Send> Send for Sender<T> {}
68unsafe impl<T: Send> Send for Receiver<T> {}
69unsafe impl<T: Send> Sync for Sender<T> {}
70unsafe impl<T: Send> Sync for Receiver<T> {}
71
72/// Creates a new single-value channel
73pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
74    let inner = Arc::new(Inner {
75        v: UnsafeCell::new(None),
76    });
77    let (flag, sub) = crate::flag::mpsc::flag();
78
79    return (
80        Sender {
81            inner: Arc::downgrade(&inner),
82            flag,
83        },
84        Receiver { inner, sub },
85    );
86}
87
88cfg_if::cfg_if! {
89    if #[cfg(feature = "futures")] {
90        /// An asynchronous channel sender that can only send a single value
91        #[cfg_attr(docsrs, doc(cfg(all(feature = "alloc", feature = "futures"))))]
92        pub struct AsyncSender<T> {
93            inner: Weak<Inner<T>>,
94            flag: AsyncFlag
95        }
96
97        pin_project_lite::pin_project! {
98            /// An asynchronous channel receiver that can only receive a single value
99            #[cfg_attr(docsrs, doc(cfg(all(feature = "alloc", feature = "futures"))))]
100            pub struct AsyncReceiver<T> {
101                inner: Arc<Inner<T>>,
102                #[pin]
103                sub: AsyncSubscribe
104            }
105        }
106
107        impl<T> AsyncSender<T> {
108            /// Sends the value through the channel.
109            #[inline]
110            pub fn send (self, t: T) {
111                let _: Result<(), T> = self.try_send(t);
112            }
113
114            /// Attempts to send the value through the channel, returning `Ok` if successfull, and `Err(t)` otherwise.
115            ///
116            /// # Errors
117            /// This method returns an error if the channel has already been used or closed.
118            pub fn try_send(self, t: T) -> Result<(), T> {
119                if let Some(inner) = self.inner.upgrade() {
120                    unsafe { *inner.v.get() = Some(t) };
121                    self.flag.mark();
122                    return Ok(());
123                }
124                return Err(t);
125            }
126        }
127
128        impl<T> futures::Future for AsyncReceiver<T> {
129            type Output = Option<T>;
130
131            #[inline]
132            fn poll(self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> core::task::Poll<Self::Output> {
133                let this = self.project();
134                if this.sub.poll(cx).is_ready() {
135                    return core::task::Poll::Ready(unsafe { &mut *this.inner.v.get() }.take())
136                }
137                return core::task::Poll::Pending
138            }
139        }
140
141        impl<T> futures::future::FusedFuture for AsyncReceiver<T> {
142            #[inline]
143            fn is_terminated(&self) -> bool {
144                self.sub.is_terminated()
145            }
146        }
147
148        unsafe impl<T: Send> Send for AsyncSender<T> {}
149        unsafe impl<T: Send> Send for AsyncReceiver<T> {}
150        unsafe impl<T: Send> Sync for AsyncSender<T> {}
151        unsafe impl<T: Send> Sync for AsyncReceiver<T> {}
152
153        /// Creates a new async and single-value channel
154        pub fn async_channel<T>() -> (AsyncSender<T>, AsyncReceiver<T>) {
155            let inner = Arc::new(Inner {
156                v: UnsafeCell::new(None),
157            });
158            let (flag, sub) = crate::flag::mpsc::async_flag();
159
160            return (
161                AsyncSender {
162                    inner: Arc::downgrade(&inner),
163                    flag,
164                },
165                AsyncReceiver { inner, sub },
166            );
167        }
168    }
169}
170
#[cfg(test)]
mod tests {
    use super::*;

    /// A value sent before the receiver waits is delivered intact.
    #[test]
    fn test_send_receive() {
        let (sender, receiver) = channel::<i32>();

        sender.send(42);
        let result = receiver.wait();

        assert_eq!(result, Some(42));
    }

    /// Dropping the sender without sending wakes the receiver with `None`.
    #[test]
    fn test_sender_dropped() {
        let (sender, receiver) = channel::<i32>();

        drop(sender);
        let result = receiver.wait();

        assert_eq!(result, None);
    }

    /// `try_send` reports success and the value reaches the receiver.
    #[test]
    fn test_try_send() {
        let (sender, receiver) = channel::<i32>();

        let result = sender.try_send(42);
        assert!(result.is_ok());

        let value = receiver.wait();
        assert_eq!(value, Some(42));
    }

    /// Once the receiver is gone, `try_send` hands the value back to the caller.
    #[test]
    fn test_try_send_after_used() {
        let (sender, receiver) = channel::<i32>();
        drop(receiver);
        let result = sender.try_send(43);

        assert_eq!(result, Err(43));
    }

    /// A wait that times out before the value is sent reports an error.
    #[docfg(feature = "std")]
    #[test]
    fn test_try_receive_timeout() {
        let (sender, receiver) = channel::<i32>();

        let wait = std::thread::spawn(move || {
            receiver.wait_timeout(core::time::Duration::from_millis(100))
        });
        // Send only after the receiver's timeout has certainly elapsed.
        std::thread::sleep(core::time::Duration::from_millis(200));
        sender.send(2);

        assert!(wait.join().unwrap().is_err());
    }

    #[cfg(feature = "futures")]
    mod async_tests {
        use super::*;
        use tokio::runtime::Runtime;

        /// A value sent before the receiver is awaited is delivered intact.
        #[test]
        fn test_async_send_receive() {
            let rt = Runtime::new().unwrap();
            let (async_sender, async_receiver) = async_channel::<i32>();

            async_sender.send(42);
            let result = rt.block_on(async_receiver);

            assert_eq!(result, Some(42));
        }

        /// Dropping the sender without sending resolves the receiver to `None`.
        #[test]
        fn test_async_sender_dropped() {
            let rt = Runtime::new().unwrap();
            let (async_sender, async_receiver) = async_channel::<i32>();

            drop(async_sender);
            let result = rt.block_on(async_receiver);

            assert_eq!(result, None);
        }

        /// `try_send` reports success and the value reaches the receiver.
        #[test]
        fn test_async_try_send() {
            let rt = Runtime::new().unwrap();
            let (async_sender, async_receiver) = async_channel::<i32>();

            let result = async_sender.try_send(42);
            assert!(result.is_ok());

            let value = rt.block_on(async_receiver);
            assert_eq!(value, Some(42));
        }

        /// Once the receiver is gone, `try_send` hands the value back to the caller.
        ///
        /// Mirrors the synchronous `test_try_send_after_used`; the previous version of this
        /// test never called `try_send` and only repeated `test_async_send_receive`.
        #[test]
        fn test_async_try_send_after_used() {
            let (async_sender, async_receiver) = async_channel::<i32>();
            drop(async_receiver);
            let result = async_sender.try_send(43);

            assert_eq!(result, Err(43));
        }
    }
}