use super::err::{ReceiverError, SenderError};
use crate::core::configuration::WaitPolicy;
#[cfg(feature = "ff")]
use super::channel_ff as backend;
#[cfg(feature = "crossbeam")]
use super::channel_cb as backend;
#[cfg(feature = "kanal")]
use super::channel_kanal as backend;
#[cfg(feature = "flume")]
use super::channel_flume as backend;
pub trait Receiver<T> {
fn receive(&self) -> Result<Option<T>, ReceiverError>;
fn is_empty(&self) -> bool;
}
pub trait Sender<T> {
fn send(&self, msg: T) -> Result<(), SenderError>;
}
pub struct ReceiverChannel<T> {
rx: Box<dyn Receiver<T> + Sync + Send>,
blocking: bool,
}
impl<T> ReceiverChannel<T>
where
T: Send,
{
pub fn receive(&self) -> Result<Option<T>, ReceiverError> {
self.rx.receive()
}
pub fn try_receive_all(&self) -> Vec<T> {
let mut res = Vec::new();
while !self.is_empty() {
match self.receive() {
Ok(Some(msg)) => res.push(msg),
Ok(None) => break, Err(_e) => break, }
}
res
}
pub fn is_blocking(&self) -> bool {
self.blocking
}
pub fn is_empty(&self) -> bool {
self.rx.is_empty()
}
}
pub struct SenderChannel<T> {
tx: Box<dyn Sender<T> + Sync + Send>,
}
impl<T> SenderChannel<T>
where
T: Send,
{
pub fn send(&self, msg: T) -> Result<(), SenderError> {
self.tx.send(msg)
}
}
pub struct Channel;
impl Channel {
pub fn channel<T: Send + 'static>(
wait_policy: WaitPolicy,
) -> (ReceiverChannel<T>, SenderChannel<T>) {
let blocking = match wait_policy {
WaitPolicy::Active => false,
WaitPolicy::Passive => true,
};
let (rx, tx) = backend::Channel::channel(blocking);
(ReceiverChannel { rx, blocking }, SenderChannel { tx })
}
}
#[cfg(test)]
mod tests {
use serial_test::parallel;
use super::Channel;
#[test]
#[parallel]
fn test_non_blocking() {
let (rx, tx) = Channel::channel(crate::core::configuration::WaitPolicy::Active);
let mut check = true;
for i in 0..1000 {
let _ = tx.send(i);
}
for i in 0..1000 {
match rx.receive() {
Ok(Some(msg)) => {
if msg != i {
check = false;
}
}
Ok(None) => {}
Err(_) => check = false,
}
}
assert!(check)
}
#[test]
#[parallel]
fn test_blocking() {
let (rx, tx) = Channel::channel(crate::core::configuration::WaitPolicy::Passive);
let mut check = true;
for i in 0..1000 {
let _ = tx.send(i);
}
drop(tx);
for i in 0..1000 {
match rx.receive() {
Ok(Some(msg)) => {
if msg != i {
check = false;
}
}
Ok(None) => {}
Err(_) => check = false,
}
}
if rx.receive().is_ok() {
check = false;
}
assert!(check)
}
}