1use std::collections::VecDeque;
35use std::future::Future;
36use std::pin::Pin;
37use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
38use std::sync::{Arc, Mutex};
39use std::task::{Context, Poll, Waker};
40
/// Error returned by [`Sender::send`] when a value could not be delivered.
///
/// The rejected value is carried inside the variant so the caller can take it
/// back out by pattern matching.
pub enum SendError<T> {
    /// The receiving half is gone; holds the value that was not sent.
    Closed(T),
}

// Written by hand so that `T` does not need to implement `Debug`; the payload
// is always rendered as `_`.
impl<T> std::fmt::Debug for SendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut tuple = f.debug_tuple("SendError::Closed");
        tuple.field(&format_args!("_"));
        tuple.finish()
    }
}

// Written by hand so that `T` does not need to implement `PartialEq`; the
// payload is never compared.
impl<T> PartialEq for SendError<T> {
    fn eq(&self, other: &Self) -> bool {
        // There is a single variant, so any two errors compare equal.
        match (self, other) {
            (SendError::Closed(_), SendError::Closed(_)) => true,
        }
    }
}

impl<T> Eq for SendError<T> {}

impl<T> std::fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Channel closed")
    }
}

impl<T> std::error::Error for SendError<T> {}
72
/// Error returned by [`Receiver::try_recv`] when no value could be taken.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvError {
    /// No value was available from the channel.
    Closed,
}

impl std::fmt::Display for RecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: adding a variant must force a new message.
        let message = match self {
            RecvError::Closed => "Channel closed",
        };
        f.write_str(message)
    }
}

impl std::error::Error for RecvError {}
91
92#[must_use]
106pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
107 let shared = Arc::new(ChannelShared {
108 buffer: Mutex::new(VecDeque::new()),
109 sender_count: AtomicUsize::new(1),
110 is_receiver_alive: AtomicBool::new(true),
111 recv_waker: Mutex::new(None),
112 });
113
114 let sender = Sender {
115 shared: shared.clone(),
116 };
117 let receiver = Receiver { shared };
118
119 (sender, receiver)
120}
121
122#[must_use]
136pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
137 let shared = Arc::new(ChannelShared {
138 buffer: Mutex::new(VecDeque::with_capacity(cap)),
139 sender_count: AtomicUsize::new(1),
140 is_receiver_alive: AtomicBool::new(true),
141 recv_waker: Mutex::new(None),
142 });
143
144 let sender = Sender {
145 shared: shared.clone(),
146 };
147 let receiver = Receiver { shared };
148
149 (sender, receiver)
150}
151
/// State shared by every [`Sender`] and the [`Receiver`] of one channel.
struct ChannelShared<T> {
    /// FIFO queue of values that have been sent but not yet received.
    buffer: Mutex<VecDeque<T>>,
    /// Number of live `Sender` handles: starts at 1, incremented by
    /// `Sender::clone`, decremented when a `Sender` is dropped.
    sender_count: AtomicUsize,
    /// `true` until the `Receiver` is dropped; checked by `Sender::send`.
    is_receiver_alive: AtomicBool,
    /// Waker stored by a pending `RecvFuture`; taken and woken by
    /// `Sender::send` and by the drop of the last `Sender`.
    recv_waker: Mutex<Option<Waker>>,
}
168
/// Sending half of a channel.
///
/// Cloning produces another handle to the same channel; each live handle is
/// counted in the shared `sender_count`.
pub struct Sender<T> {
    /// Channel state shared with the other senders and the receiver.
    shared: Arc<ChannelShared<T>>,
}
177
178impl<T> Clone for Sender<T> {
179 fn clone(&self) -> Self {
180 self.shared.sender_count.fetch_add(1, Ordering::Relaxed);
183 Self {
184 shared: Arc::clone(&self.shared),
185 }
186 }
187}
188
189impl<T> Sender<T> {
190 pub fn send(&self, value: T) -> Result<(), SendError<T>> {
203 if !self.shared.is_receiver_alive.load(Ordering::Acquire) {
204 return Err(SendError::Closed(value));
205 }
206
207 let mut buffer = self.shared.buffer.lock().unwrap();
208 buffer.push_back(value);
209
210 if let Some(waker) = self.shared.recv_waker.lock().unwrap().take() {
213 drop(buffer);
214 waker.wake();
215 }
216
217 Ok(())
218 }
219
220 #[must_use]
223 pub fn is_closed(&self) -> bool {
224 !self.shared.is_receiver_alive.load(Ordering::Acquire)
225 }
226
227 #[must_use]
230 pub fn sender_count(&self) -> usize {
231 self.shared.sender_count.load(Ordering::Acquire)
232 }
233}
234
235impl<T> Drop for Sender<T> {
236 fn drop(&mut self) {
237 let prev = self.shared.sender_count.fetch_sub(1, Ordering::AcqRel);
240
241 if prev == 1 {
242 if let Some(waker) = self.shared.recv_waker.lock().unwrap().take() {
245 waker.wake();
246 }
247 }
248 }
249}
250
/// Receiving half of a channel.
///
/// Dropping it marks the channel as closed for every [`Sender`].
pub struct Receiver<T> {
    /// Channel state shared with the senders.
    shared: Arc<ChannelShared<T>>,
}
256
257impl<T> Receiver<T> {
258 pub fn recv(&mut self) -> RecvFuture<'_, T> {
261 RecvFuture::new(self)
262 }
263
264 pub fn try_recv(&mut self) -> Result<T, RecvError> {
280 let mut buffer = self.shared.buffer.lock().unwrap();
281
282 if let Some(value) = buffer.pop_front() {
283 Ok(value)
284 } else if self.shared.sender_count.load(Ordering::Acquire) == 0 {
285 Err(RecvError::Closed)
288 } else {
289 Err(RecvError::Closed)
292 }
293 }
294
295 #[must_use]
303 pub fn len(&self) -> usize {
304 self.shared.buffer.lock().unwrap().len()
305 }
306
307 #[must_use]
310 pub fn is_empty(&self) -> bool {
311 self.len() == 0
312 }
313}
314
315impl<T> Drop for Receiver<T> {
316 fn drop(&mut self) {
317 self.shared
320 .is_receiver_alive
321 .store(false, Ordering::Release);
322 }
323}
324
/// Future returned by [`Receiver::recv`]; resolves to `Option<T>`.
pub struct RecvFuture<'a, T> {
    /// Own handle to the channel state, cloned from the receiver.
    shared: Arc<ChannelShared<T>>,
    /// Ties the future to the `&'a mut Receiver` it was created from, so the
    /// receiver stays exclusively borrowed while the future exists.
    _marker: std::marker::PhantomData<&'a mut Receiver<T>>,
}
335
336impl<'a, T> RecvFuture<'a, T> {
337 fn new(receiver: &'a mut Receiver<T>) -> Self {
339 Self {
342 shared: Arc::clone(&receiver.shared),
343 _marker: std::marker::PhantomData,
344 }
345 }
346}
347
348unsafe impl<T: Send> Send for RecvFuture<'_, T> {}
349unsafe impl<T: Sync> Sync for RecvFuture<'_, T> {}
350
351impl<T> Future for RecvFuture<'_, T> {
352 type Output = Option<T>;
353
354 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
355 let mut buffer = self.shared.buffer.lock().unwrap();
356
357 if let Some(value) = buffer.pop_front() {
358 Poll::Ready(Some(value))
359 } else if self.shared.sender_count.load(Ordering::Acquire) == 0 {
360 Poll::Ready(None)
363 } else {
364 *self.shared.recv_waker.lock().unwrap() = Some(cx.waker().clone());
367 Poll::Pending
368 }
369 }
370}
371
// Unit tests for the synchronous parts of the channel API.
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh unbounded channel is open and has exactly one sender.
    #[test]
    fn test_unbounded_channel_creation() {
        let (tx, _rx) = unbounded::<i32>();
        assert!(!tx.is_closed());
        assert_eq!(tx.sender_count(), 1);
    }

    // A fresh bounded channel is open and has exactly one sender.
    #[test]
    fn test_bounded_channel_creation() {
        let (tx, _rx) = bounded::<i32>(16);
        assert!(!tx.is_closed());
        assert_eq!(tx.sender_count(), 1);
    }

    // Cloning bumps the shared sender count; dropping a handle lowers it.
    #[test]
    fn test_sender_clone() {
        let (tx, _rx) = unbounded::<i32>();
        let tx2 = tx.clone();
        assert_eq!(tx.sender_count(), 2);
        assert_eq!(tx2.sender_count(), 2);
        drop(tx);
        assert_eq!(tx2.sender_count(), 1);
    }

    // A receiver with nothing sent reports an empty queue.
    #[test]
    fn test_receiver_empty() {
        let (_tx, rx) = unbounded::<i32>();
        assert!(rx.is_empty());
        assert_eq!(rx.len(), 0);
    }

    // Values sent synchronously come back in order through `try_recv`.
    #[test]
    fn test_sync_send() {
        let (tx, mut rx) = unbounded::<i32>();

        assert!(tx.send(42).is_ok());
        assert!(tx.send(100).is_ok());

        assert_eq!(rx.len(), 2);
        assert!(!rx.is_empty());

        assert_eq!(rx.try_recv().unwrap(), 42);
        assert_eq!(rx.try_recv().unwrap(), 100);
        // The queue is now empty; `try_recv` reports `Closed` even though `tx`
        // is still alive, because `RecvError` has no other variant.
        assert_eq!(rx.try_recv(), Err(RecvError::Closed));
    }

    // Sending after the receiver is gone fails and hands the value back.
    #[test]
    fn test_send_after_receiver_drop() {
        let (tx, rx) = unbounded::<i32>();
        drop(rx);
        assert!(tx.is_closed());

        let err = tx.send(42).unwrap_err();
        assert!(matches!(err, SendError::Closed(42)));
        assert_eq!(err.to_string(), "Channel closed");
    }

    // `RecvError` has the expected display text.
    #[test]
    fn test_recv_error() {
        let err = RecvError::Closed;

        assert_eq!(err.to_string(), "Channel closed");
    }

    // The channel is FIFO.
    #[test]
    fn test_unbounded_send_recv_order() {
        let (tx, mut rx) = unbounded::<String>();
        for i in 0..10 {
            tx.send(format!("msg-{i}")).unwrap();
        }
        for i in 0..10 {
            assert_eq!(rx.try_recv().unwrap(), format!("msg-{i}"));
        }
    }

    // Despite the name, this pins the current behaviour that the capacity of
    // a bounded channel is not enforced: a third send into a channel created
    // with capacity 2 succeeds and the queue holds three values.
    #[test]
    fn test_bounded_channel_full() {
        let (tx, rx) = bounded::<i32>(2);
        assert!(tx.send(1).is_ok());
        assert!(tx.send(2).is_ok());
        assert!(tx.send(3).is_ok());
        assert_eq!(rx.len(), 3);
    }

    // Values sent before the senders are dropped remain receivable afterwards.
    #[test]
    fn test_close_after_all_senders_drop() {
        let (tx, mut rx) = unbounded::<i32>();
        let tx2 = tx.clone();
        tx.send(1).unwrap();
        drop(tx);
        // Dropping one sender does not close the channel for the others.
        assert!(!tx2.is_closed());
        tx2.send(2).unwrap();
        drop(tx2);
        assert_eq!(rx.try_recv().unwrap(), 1);
        assert_eq!(rx.try_recv().unwrap(), 2);
    }
}