Skip to main content

datex_core/channel/
mpsc.rs

1use cfg_if::cfg_if;
2use core::{clone::Clone, prelude::rust_2024::*};
3use futures_util::{SinkExt, StreamExt};
4
5#[cfg(not(feature = "std"))]
6pub use async_unsync::{
7    bounded::{Receiver as _Receiver, Sender as _Sender},
8    unbounded::{
9        UnboundedReceiver as _UnboundedReceiver,
10        UnboundedSender as _UnboundedSender,
11    },
12};
13#[cfg(feature = "std")]
14use futures::channel::mpsc::{
15    Receiver as _Receiver, Sender as _Sender,
16    UnboundedReceiver as _UnboundedReceiver,
17    UnboundedSender as _UnboundedSender,
18};
19
20#[derive(Debug)]
21pub struct Receiver<T>(_Receiver<T>);
22impl<T> Receiver<T> {
23    pub fn new(receiver: _Receiver<T>) -> Self {
24        Receiver(receiver)
25    }
26
27    pub async fn next(&mut self) -> Option<T> {
28        #[cfg(feature = "std")]
29        {
30            self.0.next().await
31        }
32        #[cfg(not(feature = "std"))]
33        {
34            self.0.recv().await
35        }
36    }
37}
38
39#[derive(Debug)]
40pub struct UnboundedReceiver<T>(_UnboundedReceiver<T>);
41impl<T> UnboundedReceiver<T> {
42    pub fn new(receiver: _UnboundedReceiver<T>) -> Self {
43        UnboundedReceiver(receiver)
44    }
45    pub async fn next(&mut self) -> Option<T> {
46        #[cfg(feature = "std")]
47        {
48            self.0.next().await
49        }
50        #[cfg(not(feature = "std"))]
51        {
52            self.0.recv().await
53        }
54    }
55}
56
57#[derive(Debug)]
58pub struct Sender<T>(_Sender<T>);
59
60impl<T> Clone for Sender<T> {
61    fn clone(&self) -> Self {
62        Sender(self.0.clone())
63    }
64}
65impl<T> Sender<T> {
66    pub fn new(sender: _Sender<T>) -> Self {
67        Sender(sender)
68    }
69
70    pub fn start_send(&mut self, item: T) -> Result<(), ()> {
71        #[cfg(feature = "std")]
72        {
73            self.0.start_send(item).map_err(|_| ())
74        }
75        #[cfg(not(feature = "std"))]
76        {
77            self.0.try_send(item).map_err(|_| ())
78        }
79    }
80
81    pub async fn send(&mut self, item: T) -> Result<(), ()> {
82        #[cfg(feature = "std")]
83        {
84            self.0.send(item).await.map_err(|_| ()).map(|_| ())
85        }
86        #[cfg(not(feature = "std"))]
87        {
88            self.0.send(item).await.map(|_| ()).map_err(|_| ())
89        }
90    }
91
92    pub fn close_channel(&mut self) {
93        #[cfg(feature = "std")]
94        {
95            self.0.close_channel();
96        }
97        #[cfg(not(feature = "std"))]
98        {}
99    }
100}
101
102#[derive(Debug)]
103pub struct UnboundedSender<T>(_UnboundedSender<T>);
104
105// FIXME #603: derive Clone?
106impl<T> Clone for UnboundedSender<T> {
107    fn clone(&self) -> Self {
108        UnboundedSender(self.0.clone())
109    }
110}
111
112impl<T> UnboundedSender<T> {
113    pub fn new(sender: _UnboundedSender<T>) -> Self {
114        UnboundedSender(sender)
115    }
116
117    pub fn start_send(&mut self, item: T) -> Result<(), ()> {
118        #[cfg(feature = "std")]
119        {
120            self.0.start_send(item).map_err(|_| ())
121        }
122        #[cfg(not(feature = "std"))]
123        {
124            self.0.send(item).map_err(|_| ())
125        }
126    }
127
128    pub async fn send(&mut self, item: T) -> Result<(), ()> {
129        #[cfg(feature = "std")]
130        {
131            self.0.send(item).await.map_err(|_| ()).map(|_| ())
132        }
133        #[cfg(not(feature = "std"))]
134        {
135            self.0.send(item).map(|_| ()).map_err(|_| ())
136        }
137    }
138
139    pub fn close_channel(&self) {
140        #[cfg(feature = "std")]
141        {
142            self.0.close_channel();
143        }
144        #[cfg(not(feature = "std"))]
145        {}
146    }
147}
148
149cfg_if! {
150    if #[cfg(feature = "std")] {
151        pub fn create_bounded_channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
152            let (sender, receiver) = futures::channel::mpsc::channel::<T>(capacity);
153            (Sender::new(sender), Receiver::new(receiver))
154        }
155        pub fn create_unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
156            let (sender, receiver) = futures::channel::mpsc::unbounded::<T>();
157            (UnboundedSender::new(sender), UnboundedReceiver::new(receiver))
158        }
159    }
160    else {
161        pub fn create_bounded_channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
162            let (sender, receiver) = async_unsync::bounded::channel::<T>(capacity).into_split();
163            (Sender::new(sender), Receiver::new(receiver))
164        }
165        pub fn create_unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
166            let (sender, receiver) = async_unsync::unbounded::channel::<T>().into_split();
167            (UnboundedSender::new(sender), UnboundedReceiver::new(receiver))
168        }
169    }
170}