simple_channels/mpsc/mod.rs
1mod error;
2mod ring_buf;
3
4pub use error::{RecvError, SendError, TryRecvError};
5
6use ring_buf::RingBuf;
7use std::{sync::Arc, thread};
8
9/// The sending side of a channel.
10pub struct Sender<T> {
11 channel: Arc<RingBuf<T>>,
12}
13
14/// The receiving side of a channel.
15pub struct Receiver<T> {
16 channel: Arc<RingBuf<T>>,
17}
18
19/// Creates a new bounded MPSC channel with a specified capacity.
20///
21/// The capacity must be a power of two.
22///
23/// # Panics
24///
25/// Panics if the capacity is not a power of two.
26pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
27 let channel = Arc::new(RingBuf::new(cap));
28 let sender = Sender {
29 channel: Arc::clone(&channel),
30 };
31 let receiver = Receiver { channel };
32 (sender, receiver)
33}
34
35impl<T> Sender<T> {
36 /// Sends a value down the channel.
37 ///
38 /// This method will block if the channel's buffer is full.
39 ///
40 /// An error is returned if the receiver has been dropped.
41 pub fn send(&self, value: T) -> Result<(), SendError<T>> {
42 // To check for disconnection, we see if the Arc has only one reference left.
43 // If so, it must be this Sender, meaning the Receiver is gone.
44 // We use a relaxed ordering because we don't need to synchronize memory with this check.
45 if Arc::strong_count(&self.channel) == 1 {
46 return Err(SendError(value));
47 }
48
49 let mut current_value = value;
50 loop {
51 // Attempt to push the value into the ring buffer.
52 match self.channel.push(current_value) {
53 Ok(()) => return Ok(()),
54 Err(v) => {
55 // The buffer is full. We store the value back and yield.
56 current_value = v;
57 thread::yield_now(); // Yield to allow the receiver to catch up.
58
59 // After yielding, we must re-check for disconnection.
60 if Arc::strong_count(&self.channel) == 1 {
61 return Err(SendError(current_value));
62 }
63 }
64 }
65 }
66 }
67}
68
69// Implement Clone to allow for multiple producers.
70impl<T> Clone for Sender<T> {
71 fn clone(&self) -> Self {
72 Sender {
73 channel: Arc::clone(&self.channel),
74 }
75 }
76}
77
78impl<T> Receiver<T> {
79 /// Receives a value from the channel.
80 ///
81 /// This method will block until a message is available.
82 ///
83 /// An error is returned if the channel is empty and all senders have been dropped.
84 pub fn recv(&self) -> Result<T, RecvError> {
85 loop {
86 // Attempt to pop a value from the buffer.
87 match self.channel.pop() {
88 Some(value) => return Ok(value),
89 None => {
90 // Buffer is empty. Check if senders are still connected.
91 // If the strong count is 1, only this Receiver holds the Arc.
92 if Arc::strong_count(&self.channel) == 1 {
93 return Err(RecvError::Disconnected);
94 }
95 // Yield to allow senders to produce a message.
96 thread::yield_now();
97 }
98 }
99 }
100 }
101
102 /// Attempts to receive a value from the channel without blocking.
103 pub fn try_recv(&self) -> Result<T, TryRecvError> {
104 match self.channel.pop() {
105 Some(value) => Ok(value),
106 None => {
107 // Buffer is empty. Check for disconnection.
108 if Arc::strong_count(&self.channel) == 1 {
109 Err(TryRecvError::Disconnected)
110 } else {
111 Err(TryRecvError::Empty)
112 }
113 }
114 }
115 }
116}