//! vex_runtime/sync/mpsc.rs — async multi-producer, single-consumer channels.

use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
/// Shared channel state, protected by the `Mutex` held via `Arc` in the
/// sender and receiver handles.
struct Inner<T> {
    /// Buffered values, delivered FIFO.
    queue: VecDeque<T>,
    /// `Some(n)` for a bounded channel of capacity `n`; `None` for unbounded.
    capacity: Option<usize>,
    /// Number of live sender handles; the channel is closed at zero.
    sender_count: usize,
    /// Waker for a receiver parked in `RecvFuture::poll`, if any.
    recv_waker: Option<Waker>,
    /// Wakers for senders parked in `SendFuture::poll` awaiting capacity.
    send_wakers: VecDeque<Waker>,
}
27
28impl<T> Inner<T> {
29 fn new(capacity: Option<usize>) -> Self {
30 Self {
31 queue: VecDeque::new(),
32 capacity,
33 sender_count: 1,
34 recv_waker: None,
35 send_wakers: VecDeque::new(),
36 }
37 }
38
39 fn has_capacity(&self) -> bool {
41 match self.capacity {
42 None => true,
43 Some(cap) => self.queue.len() < cap,
44 }
45 }
46
47 fn is_closed(&self) -> bool {
49 self.sender_count == 0
50 }
51}
52
53pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
60 let inner = Arc::new(Mutex::new(Inner::new(Some(capacity.max(1)))));
61 (
62 Sender {
63 inner: inner.clone(),
64 },
65 Receiver { inner },
66 )
67}
68
69pub struct Sender<T> {
73 inner: Arc<Mutex<Inner<T>>>,
74}
75
76impl<T> Clone for Sender<T> {
77 fn clone(&self) -> Self {
78 self.inner.lock().unwrap().sender_count += 1;
79 Self {
80 inner: self.inner.clone(),
81 }
82 }
83}
84
85impl<T> Drop for Sender<T> {
86 fn drop(&mut self) {
87 let mut g = self.inner.lock().unwrap();
88 g.sender_count -= 1;
89 if g.sender_count == 0 {
90 if let Some(w) = g.recv_waker.take() {
92 drop(g);
93 w.wake();
94 }
95 }
96 }
97}
98
99impl<T> Sender<T> {
100 pub fn send(&self, value: T) -> SendFuture<'_, T> {
104 SendFuture {
105 inner: &self.inner,
106 value: Some(value),
107 }
108 }
109}
110
111pub struct SendFuture<'a, T> {
113 inner: &'a Arc<Mutex<Inner<T>>>,
114 value: Option<T>,
116}
117
118impl<T> Future for SendFuture<'_, T> {
119 type Output = Result<(), T>;
120
121 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
122 let this = unsafe { self.get_unchecked_mut() };
126 let mut g = this.inner.lock().unwrap();
127 if g.is_closed() {
128 return Poll::Ready(Err(this.value.take().unwrap()));
130 }
131 if g.has_capacity() {
132 let val = this.value.take().unwrap();
133 g.queue.push_back(val);
134 if let Some(w) = g.recv_waker.take() {
136 drop(g);
137 w.wake();
138 }
139 Poll::Ready(Ok(()))
140 } else {
141 g.send_wakers.push_back(cx.waker().clone());
143 Poll::Pending
144 }
145 }
146}
147
148pub fn unbounded<T>() -> (UnboundedSender<T>, Receiver<T>) {
154 let inner = Arc::new(Mutex::new(Inner::new(None)));
155 (
156 UnboundedSender {
157 inner: inner.clone(),
158 },
159 Receiver { inner },
160 )
161}
162
163pub struct UnboundedSender<T> {
165 inner: Arc<Mutex<Inner<T>>>,
166}
167
168impl<T> Clone for UnboundedSender<T> {
169 fn clone(&self) -> Self {
170 self.inner.lock().unwrap().sender_count += 1;
171 Self {
172 inner: self.inner.clone(),
173 }
174 }
175}
176
177impl<T> Drop for UnboundedSender<T> {
178 fn drop(&mut self) {
179 let mut g = self.inner.lock().unwrap();
180 g.sender_count -= 1;
181 if g.sender_count == 0 {
182 if let Some(w) = g.recv_waker.take() {
183 drop(g);
184 w.wake();
185 }
186 }
187 }
188}
189
190impl<T> UnboundedSender<T> {
191 pub fn send(&self, value: T) -> Result<(), T> {
195 let mut g = self.inner.lock().unwrap();
196 if g.is_closed() {
197 return Err(value);
198 }
199 g.queue.push_back(value);
200 if let Some(w) = g.recv_waker.take() {
201 drop(g);
202 w.wake();
203 }
204 Ok(())
205 }
206}
207
208pub struct Receiver<T> {
212 inner: Arc<Mutex<Inner<T>>>,
213}
214
215impl<T> Receiver<T> {
216 pub fn recv(&mut self) -> RecvFuture<'_, T> {
221 RecvFuture { inner: &self.inner }
222 }
223}
224
225pub struct RecvFuture<'a, T> {
227 inner: &'a Arc<Mutex<Inner<T>>>,
228}
229
230impl<T> Future for RecvFuture<'_, T> {
231 type Output = Option<T>;
232
233 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
234 let mut g = self.inner.lock().unwrap();
235 if let Some(val) = g.queue.pop_front() {
236 if let Some(w) = g.send_wakers.pop_front() {
238 drop(g);
239 w.wake();
240 }
241 Poll::Ready(Some(val))
242 } else if g.is_closed() {
243 Poll::Ready(None)
244 } else {
245 g.recv_waker = Some(cx.waker().clone());
246 Poll::Pending
247 }
248 }
249}
250
#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::{block_on, block_on_with_spawn, spawn};

    #[test]
    fn bounded_send_recv_basic() {
        block_on(async {
            let (tx, mut rx) = channel::<u32>(4);
            tx.send(1).await.unwrap();
            tx.send(2).await.unwrap();
            assert_eq!(rx.recv().await, Some(1));
            assert_eq!(rx.recv().await, Some(2));
        });
    }

    #[test]
    fn bounded_channel_close_on_sender_drop() {
        block_on(async {
            let (tx, mut rx) = channel::<u32>(4);
            tx.send(42).await.unwrap();
            drop(tx);
            // The buffered value is still delivered; only then does the
            // closed channel yield `None`.
            assert_eq!(rx.recv().await, Some(42));
            assert_eq!(rx.recv().await, None);
        });
    }

    #[test]
    fn unbounded_multi_producer() {
        block_on_with_spawn(async {
            let (tx1, mut rx) = unbounded::<u32>();
            let tx2 = tx1.clone();
            let jh1 = spawn(async move {
                tx1.send(10).unwrap();
            });
            let jh2 = spawn(async move {
                tx2.send(20).unwrap();
            });
            jh1.await.unwrap();
            jh2.await.unwrap();
            // Arrival order is scheduler-dependent; compare after sorting.
            let mut vals = vec![rx.recv().await.unwrap(), rx.recv().await.unwrap()];
            vals.sort();
            assert_eq!(vals, vec![10, 20]);
        });
    }

    #[test]
    fn bounded_backpressure_unblocks_when_consumed() {
        block_on_with_spawn(async {
            let (tx, mut rx) = channel::<u32>(1);
            tx.send(1).await.unwrap();
            // Channel is now full; the spawned send must park until the
            // receiver frees the slot.
            let jh = spawn(async move {
                tx.send(2).await.unwrap();
            });
            assert_eq!(rx.recv().await, Some(1));
            jh.await.unwrap();
            assert_eq!(rx.recv().await, Some(2));
        });
    }

    #[test]
    fn unbounded_close_returns_none() {
        block_on(async {
            let (tx, mut rx) = unbounded::<i32>();
            drop(tx);
            assert_eq!(rx.recv().await, None);
        });
    }

    #[test]
    fn bounded_send_after_receiver_drop_currently_succeeds() {
        // The previous version of this test claimed sends to a dropped
        // receiver return `Err`, but never called `send` at all. Closure is
        // tracked only via `sender_count`, so dropping the `Receiver` does
        // NOT close the channel and the send succeeds into a queue nobody
        // reads. Pin the actual current behavior here.
        // TODO: track receiver liveness in `Inner` and make this `Err(value)`.
        block_on(async {
            let (tx, rx) = channel::<u32>(4);
            drop(rx);
            assert!(tx.send(7).await.is_ok());
        });
    }
}