#![allow(clippy::match_same_arms)]
#[cfg(feature = "crossbeam-channel")]
pub use crossbeam_channel::{SendError, TrySendError, RecvError, TryRecvError};
#[cfg(feature = "crossbeam-channel")]
use crossbeam_channel::{
Receiver as PlainReceiver,
Sender as PlainSender,
Sender as PlainSyncSender,
bounded as plain_bounded,
unbounded as plain_unbounded,
};
#[cfg(not(feature = "crossbeam-channel"))]
pub use std::sync::mpsc::{SendError, TrySendError, RecvError, TryRecvError};
#[cfg(not(feature = "crossbeam-channel"))]
use std::sync::mpsc::{
Receiver as PlainReceiver,
Sender as PlainSender,
SyncSender as PlainSyncSender,
sync_channel as plain_bounded,
channel as plain_unbounded,
};
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::iter::FusedIterator;
enum SenderKind<T> {
Bounded(PlainSyncSender<T>),
Unbounded(PlainSender<T>),
}
pub struct Sender<T> {
sender: SenderKind<OrderByKey<T>>,
}
impl<T> Clone for Sender<T> {
#[inline]
fn clone(&self) -> Self {
Self { sender: match &self.sender {
SenderKind::Bounded(s) => SenderKind::Bounded(s.clone()),
SenderKind::Unbounded(s) => SenderKind::Unbounded(s.clone()),
} }
}
}
pub struct Receiver<T> {
receiver: PlainReceiver<OrderByKey<T>>,
next_index: usize,
receive_buffer: BinaryHeap<OrderByKey<T>>,
}
impl<T> Receiver<T> {
pub fn recv(&mut self) -> Result<T, RecvError> {
while self.receive_buffer.peek().map_or(true, |i| i.0 > self.next_index) {
match self.receiver.recv() {
Ok(OrderByKey(index, item)) if index <= self.next_index => {
self.next_index = index + 1;
return Ok(item);
},
Ok(queued) => {
self.receive_buffer.push(queued);
},
Err(_) => {
break;
},
}
}
if let Some(OrderByKey(index, item)) = self.receive_buffer.pop() {
self.next_index = index + 1;
Ok(item)
} else {
Err(RecvError)
}
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
while self.receive_buffer.peek().map_or(true, |i| i.0 > self.next_index) {
match self.receiver.try_recv() {
Ok(OrderByKey(index, item)) if index <= self.next_index => {
self.next_index = index + 1;
return Ok(item);
},
Ok(queued) => {
self.receive_buffer.push(queued);
},
Err(e @ TryRecvError::Empty) => {
return Err(e);
},
Err(TryRecvError::Disconnected) => {
break;
},
}
}
if let Some(OrderByKey(index, item)) = self.receive_buffer.pop() {
self.next_index = index + 1;
Ok(item)
} else {
Err(TryRecvError::Disconnected)
}
}
}
#[inline]
#[must_use] pub fn bounded<T>(depth: usize) -> (Sender<T>, Receiver<T>) {
let (tx, rx) = plain_bounded(depth);
make(SenderKind::Bounded(tx), rx)
}
#[inline]
#[must_use] pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
let (tx, rx) = plain_unbounded();
make(SenderKind::Unbounded(tx), rx)
}
fn make<T>(sender: SenderKind<OrderByKey<T>>, receiver: PlainReceiver<OrderByKey<T>>) -> (Sender<T>, Receiver<T>) {
(Sender {
sender,
}, Receiver {
receiver,
next_index: 0,
receive_buffer: BinaryHeap::new()
})
}
impl<T: Send> Sender<T> {
#[inline]
pub fn send(&self, index: usize, item: T) -> Result<(), SendError<T>> {
let pair = OrderByKey(index, item);
match &self.sender {
SenderKind::Bounded(s) => s.send(pair),
SenderKind::Unbounded(s) => s.send(pair),
}.map_err(|e| SendError(e.0.1))
}
#[inline]
pub fn try_send(&self, index: usize, item: T) -> Result<(), TrySendError<T>> {
let pair = OrderByKey(index, item);
match &self.sender {
SenderKind::Bounded(s) => s.try_send(pair).map_err(|e| match e {
TrySendError::Full(e) => TrySendError::Full(e.1),
TrySendError::Disconnected(e) => TrySendError::Disconnected(e.1),
}),
SenderKind::Unbounded(s) => s.send(pair).map_err(|e| TrySendError::Disconnected(e.0.1)),
}
}
}
impl<T> FusedIterator for Receiver<T> {}
impl<T> Iterator for Receiver<T> {
type Item = T;
#[inline]
fn next(&mut self) -> Option<T> {
self.recv().ok()
}
}
struct OrderByKey<T>(usize, T);
impl<T> PartialEq for OrderByKey<T> {
#[inline]
fn eq(&self, o: &Self) -> bool { o.0.eq(&self.0) }
}
impl<T> Eq for OrderByKey<T> {}
impl<T> PartialOrd for OrderByKey<T> {
#[inline]
fn partial_cmp(&self, o: &Self) -> Option<Ordering> { Some(self.cmp(o)) }
}
impl<T> Ord for OrderByKey<T> {
#[inline]
fn cmp(&self, o: &Self) -> Ordering { o.0.cmp(&self.0) }
}
#[test]
fn test() {
let (s, mut r) = bounded(10);
s.send(1, "B").unwrap();
s.send(0, "A").unwrap();
s.send(200, "X").unwrap();
s.send(0, "bad A").unwrap();
std::thread::spawn(move || {
s.send(2, "C").unwrap();
});
assert_eq!("A", r.recv().unwrap());
assert_eq!("B", r.recv().unwrap());
assert_eq!("bad A", r.recv().unwrap());
assert_eq!("C", r.recv().unwrap());
assert_eq!("X", r.recv().unwrap());
assert!(r.recv().is_err());
}
#[test]
fn test_try() {
let (s, mut r) = bounded(10);
s.try_send(1, "B").unwrap();
s.try_send(0, "A").unwrap();
s.try_send(2, "C").unwrap();
assert_eq!("A", r.try_recv().unwrap());
assert_eq!("B", r.try_recv().unwrap());
assert_eq!("C", r.try_recv().unwrap());
drop(s);
assert!(r.try_recv().is_err());
assert!(r.recv().is_err());
}