simple_channels/mpsc/
mod.rs

1mod error;
2mod ring_buf;
3
4pub use error::{RecvError, SendError, TryRecvError};
5
6use ring_buf::RingBuf;
7use std::{sync::Arc, thread};
8
9/// The sending side of a channel.
10pub struct Sender<T> {
11    channel: Arc<RingBuf<T>>,
12}
13
14/// The receiving side of a channel.
15pub struct Receiver<T> {
16    channel: Arc<RingBuf<T>>,
17}
18
19/// Creates a new bounded MPSC channel with a specified capacity.
20///
21/// The capacity must be a power of two.
22///
23/// # Panics
24///
25/// Panics if the capacity is not a power of two.
26pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
27    let channel = Arc::new(RingBuf::new(cap));
28    let sender = Sender {
29        channel: Arc::clone(&channel),
30    };
31    let receiver = Receiver { channel };
32    (sender, receiver)
33}
34
35impl<T> Sender<T> {
36    /// Sends a value down the channel.
37    ///
38    /// This method will block if the channel's buffer is full.
39    ///
40    /// An error is returned if the receiver has been dropped.
41    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
42        // To check for disconnection, we see if the Arc has only one reference left.
43        // If so, it must be this Sender, meaning the Receiver is gone.
44        // We use a relaxed ordering because we don't need to synchronize memory with this check.
45        if Arc::strong_count(&self.channel) == 1 {
46            return Err(SendError(value));
47        }
48
49        let mut current_value = value;
50        loop {
51            // Attempt to push the value into the ring buffer.
52            match self.channel.push(current_value) {
53                Ok(()) => return Ok(()),
54                Err(v) => {
55                    // The buffer is full. We store the value back and yield.
56                    current_value = v;
57                    thread::yield_now(); // Yield to allow the receiver to catch up.
58
59                    // After yielding, we must re-check for disconnection.
60                    if Arc::strong_count(&self.channel) == 1 {
61                        return Err(SendError(current_value));
62                    }
63                }
64            }
65        }
66    }
67}
68
69// Implement Clone to allow for multiple producers.
70impl<T> Clone for Sender<T> {
71    fn clone(&self) -> Self {
72        Sender {
73            channel: Arc::clone(&self.channel),
74        }
75    }
76}
77
78impl<T> Receiver<T> {
79    /// Receives a value from the channel.
80    ///
81    /// This method will block until a message is available.
82    ///
83    /// An error is returned if the channel is empty and all senders have been dropped.
84    pub fn recv(&self) -> Result<T, RecvError> {
85        loop {
86            // Attempt to pop a value from the buffer.
87            match self.channel.pop() {
88                Some(value) => return Ok(value),
89                None => {
90                    // Buffer is empty. Check if senders are still connected.
91                    // If the strong count is 1, only this Receiver holds the Arc.
92                    if Arc::strong_count(&self.channel) == 1 {
93                        return Err(RecvError::Disconnected);
94                    }
95                    // Yield to allow senders to produce a message.
96                    thread::yield_now();
97                }
98            }
99        }
100    }
101
102    /// Attempts to receive a value from the channel without blocking.
103    pub fn try_recv(&self) -> Result<T, TryRecvError> {
104        match self.channel.pop() {
105            Some(value) => Ok(value),
106            None => {
107                // Buffer is empty. Check for disconnection.
108                if Arc::strong_count(&self.channel) == 1 {
109                    Err(TryRecvError::Disconnected)
110                } else {
111                    Err(TryRecvError::Empty)
112                }
113            }
114        }
115    }
116}