datex_core/channel/
mpsc.rs1use cfg_if::cfg_if;
2use core::{clone::Clone, prelude::rust_2024::*};
3use futures_util::{SinkExt, StreamExt};
4
5#[cfg(not(feature = "std"))]
6pub use async_unsync::{
7 bounded::{Receiver as _Receiver, Sender as _Sender},
8 unbounded::{
9 UnboundedReceiver as _UnboundedReceiver,
10 UnboundedSender as _UnboundedSender,
11 },
12};
13#[cfg(feature = "std")]
14use futures::channel::mpsc::{
15 Receiver as _Receiver, Sender as _Sender,
16 UnboundedReceiver as _UnboundedReceiver,
17 UnboundedSender as _UnboundedSender,
18};
19
20#[derive(Debug)]
21pub struct Receiver<T>(_Receiver<T>);
22impl<T> Receiver<T> {
23 pub fn new(receiver: _Receiver<T>) -> Self {
24 Receiver(receiver)
25 }
26
27 pub async fn next(&mut self) -> Option<T> {
28 #[cfg(feature = "std")]
29 {
30 self.0.next().await
31 }
32 #[cfg(not(feature = "std"))]
33 {
34 self.0.recv().await
35 }
36 }
37}
38
39#[derive(Debug)]
40pub struct UnboundedReceiver<T>(_UnboundedReceiver<T>);
41impl<T> UnboundedReceiver<T> {
42 pub fn new(receiver: _UnboundedReceiver<T>) -> Self {
43 UnboundedReceiver(receiver)
44 }
45 pub async fn next(&mut self) -> Option<T> {
46 #[cfg(feature = "std")]
47 {
48 self.0.next().await
49 }
50 #[cfg(not(feature = "std"))]
51 {
52 self.0.recv().await
53 }
54 }
55}
56
57#[derive(Debug)]
58pub struct Sender<T>(_Sender<T>);
59
60impl<T> Clone for Sender<T> {
61 fn clone(&self) -> Self {
62 Sender(self.0.clone())
63 }
64}
65impl<T> Sender<T> {
66 pub fn new(sender: _Sender<T>) -> Self {
67 Sender(sender)
68 }
69
70 pub fn start_send(&mut self, item: T) -> Result<(), ()> {
71 #[cfg(feature = "std")]
72 {
73 self.0.start_send(item).map_err(|_| ())
74 }
75 #[cfg(not(feature = "std"))]
76 {
77 self.0.try_send(item).map_err(|_| ())
78 }
79 }
80
81 pub async fn send(&mut self, item: T) -> Result<(), ()> {
82 #[cfg(feature = "std")]
83 {
84 self.0.send(item).await.map_err(|_| ()).map(|_| ())
85 }
86 #[cfg(not(feature = "std"))]
87 {
88 self.0.send(item).await.map(|_| ()).map_err(|_| ())
89 }
90 }
91
92 pub fn close_channel(&mut self) {
93 #[cfg(feature = "std")]
94 {
95 self.0.close_channel();
96 }
97 #[cfg(not(feature = "std"))]
98 {}
99 }
100}
101
102#[derive(Debug)]
103pub struct UnboundedSender<T>(_UnboundedSender<T>);
104
105impl<T> Clone for UnboundedSender<T> {
107 fn clone(&self) -> Self {
108 UnboundedSender(self.0.clone())
109 }
110}
111
112impl<T> UnboundedSender<T> {
113 pub fn new(sender: _UnboundedSender<T>) -> Self {
114 UnboundedSender(sender)
115 }
116
117 pub fn start_send(&mut self, item: T) -> Result<(), ()> {
118 #[cfg(feature = "std")]
119 {
120 self.0.start_send(item).map_err(|_| ())
121 }
122 #[cfg(not(feature = "std"))]
123 {
124 self.0.send(item).map_err(|_| ())
125 }
126 }
127
128 pub async fn send(&mut self, item: T) -> Result<(), ()> {
129 #[cfg(feature = "std")]
130 {
131 self.0.send(item).await.map_err(|_| ()).map(|_| ())
132 }
133 #[cfg(not(feature = "std"))]
134 {
135 self.0.send(item).map(|_| ()).map_err(|_| ())
136 }
137 }
138
139 pub fn close_channel(&self) {
140 #[cfg(feature = "std")]
141 {
142 self.0.close_channel();
143 }
144 #[cfg(not(feature = "std"))]
145 {}
146 }
147}
148
149cfg_if! {
150 if #[cfg(feature = "std")] {
151 pub fn create_bounded_channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
152 let (sender, receiver) = futures::channel::mpsc::channel::<T>(capacity);
153 (Sender::new(sender), Receiver::new(receiver))
154 }
155 pub fn create_unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
156 let (sender, receiver) = futures::channel::mpsc::unbounded::<T>();
157 (UnboundedSender::new(sender), UnboundedReceiver::new(receiver))
158 }
159 }
160 else {
161 pub fn create_bounded_channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
162 let (sender, receiver) = async_unsync::bounded::channel::<T>(capacity).into_split();
163 (Sender::new(sender), Receiver::new(receiver))
164 }
165 pub fn create_unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
166 let (sender, receiver) = async_unsync::unbounded::channel::<T>().into_split();
167 (UnboundedSender::new(sender), UnboundedReceiver::new(receiver))
168 }
169 }
170}