// Backing implementation for the channel type defined in this file; `Chan`
// wraps `engine::Inner` and delegates every operation to it.
mod engine;
/// Name of the channel engine in use, taken from `engine::ENGINE_NAME`.
///
/// The unit tests in this file expect this to be `"flume"`.
pub const ENGINE: &str = engine::ENGINE_NAME;
/// Go-style channel handle.
///
/// All operations are delegated to the wrapped `engine::Inner<T>`. The tests
/// in this file clone a handle, move the clone to another thread, and observe
/// the sent values on the original handle, i.e. clones refer to the same
/// underlying channel.
///
/// NOTE(review): `#[derive(Clone)]` generates `impl<T: Clone> Clone for Chan<T>`,
/// so a handle can only be cloned when the element type itself is `Clone`.
/// If `engine::Inner<T>` is cloneable for every `T`, a hand-written `Clone`
/// impl would lift that restriction — confirm against the engine module.
#[derive(Clone)]
pub struct Chan<T> {
// Engine-side channel state shared by every method of `Chan`.
inner: engine::Inner<T>,
}
// Method names deliberately use Go-style capitalisation (`Send`, `Recv`,
// `Close`, ...), so the snake-case lint is silenced once for the whole impl
// instead of on a single method.
#[allow(non_snake_case)]
impl<T> Chan<T> {
    /// Creates a channel whose engine is constructed with capacity `cap`.
    ///
    /// The single-argument form of `chan!` passes `0` here.
    pub fn new(cap: usize) -> Self {
        Chan { inner: engine::Inner::new(cap) }
    }

    /// Sends `v` through the engine's synchronous `send`.
    ///
    /// Returns `crate::errors::nil` on success. Any engine failure is
    /// reported as the error `"send on closed channel"`.
    pub fn Send(&self, v: T) -> crate::errors::error {
        match self.inner.send(v) {
            Ok(()) => crate::errors::nil,
            Err(_) => crate::errors::New("send on closed channel"),
        }
    }

    /// Async counterpart of [`Chan::Send`]: awaits the engine's `send_async`.
    ///
    /// Returns `crate::errors::nil` on success and the error
    /// `"send on closed channel"` on any engine failure.
    pub async fn send(&self, v: T) -> crate::errors::error {
        match self.inner.send_async(v).await {
            Ok(()) => crate::errors::nil,
            Err(_) => crate::errors::New("send on closed channel"),
        }
    }

    /// Receives through the engine's synchronous `recv`.
    ///
    /// Returns `(value, true)` when the engine yields a value and
    /// `(T::default(), false)` when it yields `None`. The tests in this file
    /// show the latter once a closed channel has been drained.
    pub fn Recv(&self) -> (T, bool)
    where
        T: Default,
    {
        match self.inner.recv() {
            Some(v) => (v, true),
            None => (T::default(), false),
        }
    }

    /// Async counterpart of [`Chan::Recv`]: awaits the engine's `recv_async`.
    ///
    /// Returns `(value, true)` for a received value and
    /// `(T::default(), false)` when the engine yields `None`.
    pub async fn recv(&self) -> (T, bool)
    where
        T: Default,
    {
        match self.inner.recv_async().await {
            Some(v) => (v, true),
            None => (T::default(), false),
        }
    }

    /// Polls the engine's `try_recv` once.
    ///
    /// Returns `(value, true)` if a value was available, otherwise
    /// `(T::default(), false)`. The `false` case does not distinguish an
    /// empty channel from a closed one; the tests in this file exercise it on
    /// an open, empty channel.
    pub fn TryRecv(&self) -> (T, bool)
    where
        T: Default,
    {
        match self.inner.try_recv() {
            Some(v) => (v, true),
            None => (T::default(), false),
        }
    }

    /// Polls the engine's `try_send` once and reports whether it succeeded.
    ///
    /// On failure the rejected value is dropped.
    pub fn TrySend(&self, v: T) -> bool {
        self.inner.try_send(v).is_ok()
    }

    /// Receive probe used by the `select!` expansion.
    ///
    /// * `Some((value, true))` — a value was available.
    /// * `Some((T::default(), false))` — the channel is closed and no value
    ///   is left to receive.
    /// * `None` — nothing available and the channel is not closed.
    #[doc(hidden)]
    pub fn __select_try_recv(&self) -> Option<(T, bool)>
    where
        T: Default,
    {
        if let Some(v) = self.inner.try_recv() {
            return Some((v, true));
        }
        if !self.inner.is_closed() {
            return None;
        }
        // The close was observed *after* the empty poll above. A sender may
        // have enqueued a value and closed the channel in between, so poll
        // once more before reporting "closed": buffered values must be
        // delivered before the zero value. This assumes nothing can be
        // enqueued after close, which matches `Send` returning an error on a
        // closed channel in this file's tests.
        Some(match self.inner.try_recv() {
            Some(v) => (v, true),
            None => (T::default(), false),
        })
    }

    /// Send probe used by the `select!` expansion.
    ///
    /// Forwards to the engine's `try_send`; on failure the value is handed
    /// back in `Err`.
    #[doc(hidden)]
    pub fn __select_try_send(&self, v: T) -> Result<(), T> {
        self.inner.try_send(v)
    }

    /// Closes the channel by calling the engine's `close`.
    pub fn Close(&self) {
        self.inner.close();
    }

    /// Engine-reported length, cast to `crate::types::int`.
    pub fn Len(&self) -> crate::types::int {
        self.inner.len() as crate::types::int
    }

    /// Engine-reported capacity, cast to `crate::types::int`.
    pub fn Cap(&self) -> crate::types::int {
        self.inner.cap() as crate::types::int
    }

    /// Engine-reported length as a plain `usize` (same quantity as
    /// [`Chan::Len`], without the cast).
    pub fn len(&self) -> usize {
        self.inner.len()
    }
}
/// Constructs a `Chan` for the given element type.
///
/// * `chan!(T)` calls `Chan::<T>::new(0)`.
/// * `chan!(T, cap)` calls `Chan::<T>::new(cap)`.
#[macro_export]
macro_rules! chan {
    ($elem:ty) => {
        $crate::chan::Chan::<$elem>::new(0)
    };
    ($elem:ty, $capacity:expr) => {
        $crate::chan::Chan::<$elem>::new($capacity)
    };
}
/// Go-style `close(ch)`: expands to a call of `Close()` on the given
/// expression.
#[macro_export]
macro_rules! close {
    ($target:expr) => {
        ($target).Close()
    };
}
/// Go-style `select` over channel operations.
///
/// This macro is only the entry point: it hands every token it receives,
/// unchanged, to `__select_parse!` in its `@arms` state with an empty
/// accumulator. The accepted arm syntax is defined by that macro.
#[macro_export]
macro_rules! select {
    ($($arm_tokens:tt)*) => {
        $crate::__select_parse!(@arms [] $($arm_tokens)*)
    };
}
// Implementation of `select!`; exported only so the expansion can reach it
// through `$crate`.
//
// The macro works in two phases:
//
// * `@arms`  — consumes the user's arms one at a time and appends a tagged
//              record (`RecvBind2`, `RecvBind1`, `RecvDrop`, `Send`) to the
//              accumulator. A `default` arm, if present, must be the last arm.
// * `@emit`  — generates a polling loop. On every pass the arms are tried in
//              source order and the first one whose probe succeeds runs its
//              body. If none fired, the `default` body runs (when present);
//              otherwise the thread sleeps for 1 ms and the loop repeats.
//
// Consequences of this expansion that callers should be aware of:
//
// * Arm selection is by source order, not random.
// * The channel expression of every arm, and the value expression of a
//   `send` arm, are expanded inside the loop and are therefore re-evaluated
//   on every pass in which the arm is tried. A value rejected by
//   `__select_try_send` is dropped.
// * Without `default` the macro blocks the calling thread by polling.
#[doc(hidden)]
#[macro_export]
macro_rules! __select_parse {
    // ---- @arms: parsing -------------------------------------------------
    // `recv(ch) |v, ok| => { .. }` — binds the value and the ok flag.
    (@arms [$($acc:tt)*] recv($ch:expr) |$v:ident, $ok:ident| => $body:block $(, $($rest:tt)*)?) => {
        $crate::__select_parse!(@arms [$($acc)* (RecvBind2 ($ch) ($v) ($ok) ($body))] $($($rest)*)?)
    };
    // `recv(ch) |v| => { .. }` — binds the value only.
    (@arms [$($acc:tt)*] recv($ch:expr) |$v:ident| => $body:block $(, $($rest:tt)*)?) => {
        $crate::__select_parse!(@arms [$($acc)* (RecvBind1 ($ch) ($v) ($body))] $($($rest)*)?)
    };
    // `recv(ch) => { .. }` — receives and discards the result.
    (@arms [$($acc:tt)*] recv($ch:expr) => $body:block $(, $($rest:tt)*)?) => {
        $crate::__select_parse!(@arms [$($acc)* (RecvDrop ($ch) ($body))] $($($rest)*)?)
    };
    // `send(ch, value) => { .. }`.
    (@arms [$($acc:tt)*] send($ch:expr, $v:expr) => $body:block $(, $($rest:tt)*)?) => {
        $crate::__select_parse!(@arms [$($acc)* (Send ($ch) ($v) ($body))] $($($rest)*)?)
    };
    // `default => { .. }` — only accepted as the final arm.
    (@arms [$($acc:tt)*] default => $body:block $(,)?) => {
        $crate::__select_parse!(@emit [$($acc)*] [$body])
    };
    // End of input without a `default` arm.
    (@arms [$($acc:tt)*]) => {
        $crate::__select_parse!(@emit [$($acc)*] [])
    };

    // ---- @emit: code generation ----------------------------------------
    (@emit [$($arms:tt)*] [$($def:tt)*]) => {{
        #[allow(unused_mut, unused_assignments)]
        let mut __goish_fired = false;
        loop {
            // One pass over all arms, in source order.
            $crate::__select_parse!(@try __goish_fired $($arms)*);
            if __goish_fired {
                break;
            }
            // Nothing was ready: run `default`, or back off and poll again.
            $crate::__select_parse!(@default_or_spin __goish_fired [$($def)*]);
            if __goish_fired {
                break;
            }
        }
    }};
    // A `default` arm exists: run it and mark the select as done.
    (@default_or_spin $fired:ident [$($def:tt)+]) => {{
        { $($def)+ }
        $fired = true;
    }};
    // No `default` arm: sleep briefly before the next polling pass. The path
    // is absolute (`::std`) because paths in `macro_rules!` expansions resolve
    // at the call site, where a local item named `std` could shadow the crate.
    (@default_or_spin $fired:ident []) => {{
        ::std::thread::sleep(::std::time::Duration::from_millis(1));
    }};

    // ---- @try: one probe per arm ---------------------------------------
    // Each rule is a no-op once an earlier arm of the same pass has fired.
    (@try $fired:ident (RecvBind2 ($ch:expr) ($v:ident) ($ok:ident) ($body:block)) $($rest:tt)*) => {
        if !$fired {
            if let Some(($v, $ok)) = ($ch).__select_try_recv() {
                $body
                $fired = true;
            }
        }
        $crate::__select_parse!(@try $fired $($rest)*);
    };
    (@try $fired:ident (RecvBind1 ($ch:expr) ($v:ident) ($body:block)) $($rest:tt)*) => {
        if !$fired {
            // The ok flag is discarded for the single-binding form.
            if let Some(($v, _)) = ($ch).__select_try_recv() {
                $body
                $fired = true;
            }
        }
        $crate::__select_parse!(@try $fired $($rest)*);
    };
    (@try $fired:ident (RecvDrop ($ch:expr) ($body:block)) $($rest:tt)*) => {
        if !$fired {
            if ($ch).__select_try_recv().is_some() {
                $body
                $fired = true;
            }
        }
        $crate::__select_parse!(@try $fired $($rest)*);
    };
    (@try $fired:ident (Send ($ch:expr) ($v:expr) ($body:block)) $($rest:tt)*) => {
        if !$fired {
            if ($ch).__select_try_send($v).is_ok() {
                $body
                $fired = true;
            }
        }
        $crate::__select_parse!(@try $fired $($rest)*);
    };
    // All arms of this pass have been tried.
    (@try $fired:ident) => {};
}
#[cfg(test)]
mod tests {
    // Values sent into a buffered channel are received in the order sent.
    #[test]
    fn buffered_send_then_recv() {
        let queue = crate::chan!(i64, 4);
        queue.Send(10);
        queue.Send(20);
        for expected in [10i64, 20] {
            let (value, ok) = queue.Recv();
            assert!(ok);
            assert_eq!(value, expected);
        }
    }

    // `TryRecv` reports `false` on an empty channel and the value once sent.
    #[test]
    fn try_recv_on_empty() {
        let slot = crate::chan!(i64, 1);
        let (_, had_value) = slot.TryRecv();
        assert!(!had_value);
        slot.Send(99);
        let (value, had_value) = slot.TryRecv();
        assert!(had_value);
        assert_eq!(value, 99);
    }

    // A cloned handle moved to another thread feeds the original handle.
    #[test]
    fn cross_thread_buffered() {
        let consumer = crate::chan!(i64, 8);
        let tx = consumer.clone();
        let worker = std::thread::spawn(move || {
            for i in 0..5 {
                tx.Send(i);
            }
        });
        let total: i64 = (0..5).map(|_| consumer.Recv().0).sum();
        worker.join().unwrap();
        assert_eq!(total, 10);
    }

    // Capacity-0 channel: the value is handed from the sender thread to the
    // receiver.
    #[test]
    fn unbuffered_rendezvous() {
        let consumer = crate::chan!(i64);
        let tx = consumer.clone();
        let worker = std::thread::spawn(move || {
            tx.Send(42);
        });
        let (value, ok) = consumer.Recv();
        worker.join().unwrap();
        assert!(ok);
        assert_eq!(value, 42);
    }

    // After `Close`, buffered values are still delivered; then the zero value
    // with `ok == false`.
    #[test]
    fn close_drains_then_returns_zero_ok_false() {
        let queue = crate::chan!(i64, 4);
        queue.Send(1);
        queue.Send(2);
        queue.Close();
        assert_eq!(queue.Recv(), (1, true));
        assert_eq!(queue.Recv(), (2, true));
        assert_eq!(queue.Recv(), (0, false));
    }

    // Sending on a closed channel yields a non-nil error.
    #[test]
    fn send_on_closed_returns_error() {
        let closed = crate::chan!(i64, 1);
        closed.Close();
        let outcome = closed.Send(42);
        assert!(outcome != crate::errors::nil);
    }

    #[test]
    fn engine_name_is_flume() {
        assert_eq!(crate::chan::ENGINE, "flume");
    }

    // With nothing to receive, `select!` falls through to `default`.
    #[test]
    fn select_default_fires_when_empty() {
        let idle = crate::chan!(i64, 1);
        let mut default_ran = false;
        crate::select! {
            recv(idle) => {},
            default => { default_ran = true; },
        }
        assert!(default_ran);
    }

    // A buffered value makes the `recv` arm fire and binds the value.
    #[test]
    fn select_recv_fires_when_ready() {
        let ready = crate::chan!(i64, 1);
        ready.Send(42);
        let mut received = -1i64;
        crate::select! {
            recv(ready) |value| => { received = value; },
            default => {},
        }
        assert_eq!(received, 42);
    }

    // On a closed, empty channel the `recv` arm fires with `ok == false`.
    #[test]
    fn select_recv_fires_on_closed_drained() {
        let closed = crate::chan!(i64, 1);
        closed.Close();
        let mut arm_ran = false;
        let mut observed_ok = true;
        crate::select! {
            recv(closed) |_value, ok| => { arm_ran = true; observed_ok = ok; },
            default => {},
        }
        assert!(arm_ran, "recv case should fire on closed channel");
        assert!(!observed_ok, "ok should be false on closed channel");
    }

    // With free buffer space the `send` arm fires and the value is enqueued.
    #[test]
    fn select_send_fires_when_space() {
        let roomy = crate::chan!(i64, 1);
        let mut send_ran = false;
        crate::select! {
            send(roomy, 99) => { send_ran = true; },
            default => {},
        }
        assert!(send_ran);
        let (value, _) = roomy.Recv();
        assert_eq!(value, 99);
    }

    // With a full buffer the `send` arm cannot fire and `default` runs.
    #[test]
    fn select_send_default_when_full() {
        let full = crate::chan!(i64, 1);
        full.Send(1);
        let mut default_ran = false;
        crate::select! {
            send(full, 99) => {},
            default => { default_ran = true; },
        }
        assert!(default_ran);
    }
}