1use std::sync::{
3 atomic::{AtomicUsize, Ordering},
4 Arc,
5};
6
7use parking_lot::{Condvar, Mutex};
8
9pub struct Sender<T> {
36 inner: Arc<Inner<T>>,
37}
38
39#[derive(Debug, PartialEq)]
42pub struct SendError<T>(pub T);
43
44pub struct Receiver<T> {
47 inner: Arc<Inner<T>>,
48}
49
50#[derive(Debug, PartialEq)]
53pub struct RecvError;
54
55struct Inner<T> {
56 queue: Mutex<Vec<T>>,
57 senders: AtomicUsize,
58 receivers: AtomicUsize,
59 new_values: Condvar,
60}
61
62impl<T: Ord> Sender<T> {
63 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
64 if self.inner.receivers.load(Ordering::Relaxed) == 0 {
65 return Err(SendError(t));
66 }
67 let mut queue = self.inner.queue.lock();
68 queue.push(t);
69 queue.sort_unstable();
70 self.inner.new_values.notify_one();
71 Ok(())
72 }
73}
74
75impl<T> Clone for Sender<T> {
76 fn clone(&self) -> Self {
77 self.inner.senders.fetch_add(1, Ordering::Relaxed);
78 Self {
79 inner: Arc::clone(&self.inner),
80 }
81 }
82}
83
84impl<T> Drop for Sender<T> {
85 fn drop(&mut self) {
86 self.inner.senders.fetch_sub(1, Ordering::Relaxed);
87 }
88}
89
90impl<T> Receiver<T> {
91 pub fn recv(&self) -> Result<T, RecvError> {
119 let mut queue = self.inner.queue.lock();
120 loop {
121 match queue.pop() {
122 Some(t) => return Ok(t),
123 None => {
124 if self.inner.senders.load(Ordering::Relaxed) == 0 {
125 return Err(RecvError);
126 }
127 self.inner.new_values.wait(&mut queue);
128 }
129 }
130 }
131 }
132}
133
134impl<T> Clone for Receiver<T> {
135 fn clone(&self) -> Self {
136 self.inner.receivers.fetch_add(1, Ordering::Relaxed);
137 Self {
138 inner: Arc::clone(&self.inner),
139 }
140 }
141}
142
143impl<T> Drop for Receiver<T> {
144 fn drop(&mut self) {
145 self.inner.receivers.fetch_sub(1, Ordering::Relaxed);
146 }
147}
148
149pub fn sorted_channel<T>() -> (Sender<T>, Receiver<T>) {
171 let inner = Arc::new(Inner {
172 queue: Mutex::default(),
173 senders: AtomicUsize::new(1),
174 receivers: AtomicUsize::new(1),
175 new_values: Condvar::new(),
176 });
177
178 (
179 Sender {
180 inner: inner.clone(),
181 },
182 Receiver { inner },
183 )
184}
185
186#[cfg(test)]
187mod tests {
188 use super::*;
189
190 #[test]
191 fn send_receive_sorted() {
192 let (tx, rx) = sorted_channel();
193
194 for i in [0, 9, 1, 8, 2, 7, 3, 6, 4, 5] {
195 tx.send(i).unwrap();
196 }
197 for i in (0..10).rev() {
198 assert_eq!(rx.recv().unwrap(), i);
199 }
200 }
201
202 #[test]
203 fn no_senders() {
204 let (tx, rx) = sorted_channel::<()>();
205 drop(tx);
206 assert_eq!(rx.recv(), Err(RecvError));
207 }
208
209 #[test]
210 fn no_receivers() {
211 let (tx, rx) = sorted_channel();
212 drop(rx);
213 let value = 7;
214 assert_eq!(tx.send(7), Err(SendError(value)));
215 }
216
217 #[test]
218 fn multiple_receivers() {
219 let (tx, rx) = sorted_channel();
220 let rx2 = rx.clone();
221 for i in [0, 9, 1, 8, 2, 7, 3, 6, 4, 5] {
222 tx.send(i).unwrap();
223 }
224 for i in (0..10).rev() {
225 if i % 2 == 0 {
226 assert_eq!(rx.recv().unwrap(), i);
227 } else {
228 assert_eq!(rx2.recv().unwrap(), i);
229 }
230 }
231 }
232
233 #[test]
234 fn multiple_senders() {
235 let (tx, rx) = sorted_channel();
236 let tx2 = tx.clone();
237 for i in [0, 9, 1, 8, 2, 7, 3, 6, 4, 5] {
238 if i % 2 == 0 {
239 tx.send(i).unwrap();
240 } else {
241 tx2.send(i).unwrap();
242 }
243 }
244 for i in (0..10).rev() {
245 assert_eq!(rx.recv().unwrap(), i);
246 }
247 }
248}