sorted_channel/
lib.rs

1//! Multi-producer, multi-consumer sorted channel.
2use std::sync::{
3    atomic::{AtomicUsize, Ordering},
4    Arc,
5};
6
7use parking_lot::{Condvar, Mutex};
8
9/// The sending-half of the [`sorted_channel`] type. This sender can be cloned
10/// and sent to multiple threads.
11///
12/// # Examples
13/// ```
14/// use sorted_channel::sorted_channel;
15/// use std::thread;
16///
17/// let (tx, rx) = sorted_channel();
18/// let tx2 = tx.clone();
19///
20/// let h1 = thread::spawn(move || {
21///     tx.send(2).unwrap();
22///     tx.send(1).unwrap();
23/// });
24///
25/// let h2 = thread::spawn(move || {
26///     tx2.send(4).unwrap();
27///     tx2.send(3).unwrap();
28/// });
29///
30/// h1.join().unwrap();
31/// h2.join().unwrap();
32/// let outputs: Vec<_> = (0..4).map(|_| rx.recv().unwrap()).collect();
33/// assert_eq!(outputs, [4, 3, 2, 1]);
34/// ```
35pub struct Sender<T> {
36    inner: Arc<Inner<T>>,
37}
38
/// An error returned from [`Sender::send`] when there are no [`Receiver`]s left
/// for the channel.
///
/// The value that could not be sent is handed back in field `0`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SendError<T>(pub T);

/// The message never includes the payload, so no bound on `T` is required.
impl<T> std::fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("sending on a channel with no receivers")
    }
}

/// Lets the error be used with `?` and `Box<dyn Error>`; the `Debug` bound is
/// needed because the derived `Debug` impl only exists for `T: Debug`.
impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
43
44/// The receiving-half of the [`sorted_channel`] type. This receiver can be
45/// cloned and sent to multiple threads. See [`Sender`] for an example.
46pub struct Receiver<T> {
47    inner: Arc<Inner<T>>,
48}
49
/// An error returned from [`Receiver::recv`] when there are no [`Sender`]s left
/// for the channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RecvError;

impl std::fmt::Display for RecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("receiving on an empty channel with no senders")
    }
}

/// Lets the error be used with `?` and `Box<dyn Error>`.
impl std::error::Error for RecvError {}
54
/// State shared, through an `Arc`, by every [`Sender`] and [`Receiver`] of one
/// channel.
struct Inner<T> {
    /// Values waiting to be received. `send` leaves it in ascending order and
    /// `recv` pops from the back.
    queue: Mutex<Vec<T>>,
    /// Number of live [`Sender`] handles; starts at 1.
    senders: AtomicUsize,
    /// Number of live [`Receiver`] handles; starts at 1.
    receivers: AtomicUsize,
    /// Signalled by `send` after a value is queued; `recv` waits on it while
    /// the queue is empty.
    new_values: Condvar,
}
61
62impl<T: Ord> Sender<T> {
63    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
64        if self.inner.receivers.load(Ordering::Relaxed) == 0 {
65            return Err(SendError(t));
66        }
67        let mut queue = self.inner.queue.lock();
68        queue.push(t);
69        queue.sort_unstable();
70        self.inner.new_values.notify_one();
71        Ok(())
72    }
73}
74
75impl<T> Clone for Sender<T> {
76    fn clone(&self) -> Self {
77        self.inner.senders.fetch_add(1, Ordering::Relaxed);
78        Self {
79            inner: Arc::clone(&self.inner),
80        }
81    }
82}
83
84impl<T> Drop for Sender<T> {
85    fn drop(&mut self) {
86        self.inner.senders.fetch_sub(1, Ordering::Relaxed);
87    }
88}
89
90impl<T> Receiver<T> {
91    /// Wait for data in the channel by blocking the current thread, returning
92    /// an error if there are no more [`Sender`]s connected to this channel.
93    ///
94    /// # Examples
95    /// ```
96    /// use sorted_channel::sorted_channel;
97    /// use std::thread;
98    /// let (tx, rx) = sorted_channel();
99    /// thread::spawn(move || {
100    ///     tx.send(7).unwrap();
101    /// });
102    ///
103    /// assert_eq!(rx.recv(), Ok(7));
104    /// ```
105    ///
106    /// Failure from missing sender:
107    /// ```
108    /// use sorted_channel::sorted_channel;
109    /// use std::thread;
110    /// let (tx, rx) = sorted_channel::<()>();
111    /// let handle = thread::spawn(move || {
112    ///     drop(tx);
113    /// });
114    ///
115    /// handle.join().unwrap();
116    /// assert!(rx.recv().is_err());
117    /// ```
118    pub fn recv(&self) -> Result<T, RecvError> {
119        let mut queue = self.inner.queue.lock();
120        loop {
121            match queue.pop() {
122                Some(t) => return Ok(t),
123                None => {
124                    if self.inner.senders.load(Ordering::Relaxed) == 0 {
125                        return Err(RecvError);
126                    }
127                    self.inner.new_values.wait(&mut queue);
128                }
129            }
130        }
131    }
132}
133
134impl<T> Clone for Receiver<T> {
135    fn clone(&self) -> Self {
136        self.inner.receivers.fetch_add(1, Ordering::Relaxed);
137        Self {
138            inner: Arc::clone(&self.inner),
139        }
140    }
141}
142
143impl<T> Drop for Receiver<T> {
144    fn drop(&mut self) {
145        self.inner.receivers.fetch_sub(1, Ordering::Relaxed);
146    }
147}
148
149/// Creates a new sorted channel returning the [`Sender`]/[`Receiver`] halves.
150///
151/// # Examples
152/// ```
153/// use sorted_channel::sorted_channel;
154/// use std::thread;
155///
156/// let (tx, rx) = sorted_channel();
157///
158/// let handle = thread::spawn(move || {
159///     for i in [0, 9, 1, 8, 2, 7, 3, 6, 4, 5] {
160///         tx.send(i).unwrap();
161///     }
162/// });
163///
164/// handle.join().unwrap();
165///
166/// for i in (0..10).rev() {
167///     assert_eq!(rx.recv(), Ok(i));
168/// }
169/// ```
170pub fn sorted_channel<T>() -> (Sender<T>, Receiver<T>) {
171    let inner = Arc::new(Inner {
172        queue: Mutex::default(),
173        senders: AtomicUsize::new(1),
174        receivers: AtomicUsize::new(1),
175        new_values: Condvar::new(),
176    });
177
178    (
179        Sender {
180            inner: inner.clone(),
181        },
182        Receiver { inner },
183    )
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// The values 0 through 9 in a shuffled order, used as channel input.
    const SHUFFLED: [i32; 10] = [0, 9, 1, 8, 2, 7, 3, 6, 4, 5];

    /// Values sent in arbitrary order come back greatest-first.
    #[test]
    fn send_receive_sorted() {
        let (tx, rx) = sorted_channel();

        for value in SHUFFLED {
            tx.send(value).unwrap();
        }
        for expected in (0..10).rev() {
            assert_eq!(rx.recv().unwrap(), expected);
        }
    }

    /// Receiving on an empty channel whose only sender is gone fails.
    #[test]
    fn no_senders() {
        let (tx, rx) = sorted_channel::<()>();
        drop(tx);
        assert_eq!(rx.recv(), Err(RecvError));
    }

    /// Sending with no receiver left hands the value back in the error.
    #[test]
    fn no_receivers() {
        let (tx, rx) = sorted_channel();
        drop(rx);
        assert_eq!(tx.send(7), Err(SendError(7)));
    }

    /// Two receivers draining alternately still see one global ordering.
    #[test]
    fn multiple_receivers() {
        let (tx, rx) = sorted_channel();
        let rx2 = rx.clone();
        for value in SHUFFLED {
            tx.send(value).unwrap();
        }
        for expected in (0..10).rev() {
            // Even values are taken through `rx`, odd ones through `rx2`.
            let source = if expected % 2 == 0 { &rx } else { &rx2 };
            assert_eq!(source.recv().unwrap(), expected);
        }
    }

    /// Values from two senders are merged into one ordering.
    #[test]
    fn multiple_senders() {
        let (tx, rx) = sorted_channel();
        let tx2 = tx.clone();
        for value in SHUFFLED {
            // Even values go through `tx`, odd ones through `tx2`.
            let sink = if value % 2 == 0 { &tx } else { &tx2 };
            sink.send(value).unwrap();
        }
        for expected in (0..10).rev() {
            assert_eq!(rx.recv().unwrap(), expected);
        }
    }
}