#![cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
extern crate crossbeam_channel;
use std::{marker, mem, ptr};
use std::thread::spawn;
use std::marker::PhantomData;
use std::collections::HashMap;
use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_channel::Select;
pub use Branch::*;
#[must_use]
pub struct Chan<E, P>(Sender<*mut u8>, Receiver<*mut u8>, PhantomData<(E, P)>);
unsafe impl<E: marker::Send, P: marker::Send> marker::Send for Chan<E, P> {}
unsafe fn write_chan<A: marker::Send + 'static, E, P>(&Chan(ref tx, _, _): &Chan<E, P>, x: A) {
tx.send(Box::into_raw(Box::new(x)) as *mut _).unwrap()
}
unsafe fn read_chan<A: marker::Send + 'static, E, P>(&Chan(_, ref rx, _): &Chan<E, P>) -> A {
*Box::from_raw(rx.recv().unwrap() as *mut A)
}
unsafe fn try_read_chan<A: marker::Send + 'static, E, P>(
&Chan(_, ref rx, _): &Chan<E, P>,
) -> Option<A> {
match rx.try_recv() {
Ok(a) => Some(*Box::from_raw(a as *mut A)),
Err(_) => None,
}
}
#[allow(missing_copy_implementations)]
pub struct Z;
pub struct S<N>(PhantomData<N>);
#[allow(missing_copy_implementations)]
pub struct Eps;
pub struct Recv<A, P>(PhantomData<(A, P)>);
pub struct Send<A, P>(PhantomData<(A, P)>);
pub struct Choose<P, Q>(PhantomData<(P, Q)>);
pub struct Offer<P, Q>(PhantomData<(P, Q)>);
pub struct Rec<P>(PhantomData<P>);
pub struct Var<N>(PhantomData<N>);
pub trait HasDual: private::Sealed {
type Dual;
}
impl HasDual for Eps {
type Dual = Eps;
}
impl<A, P: HasDual> HasDual for Send<A, P> {
type Dual = Recv<A, P::Dual>;
}
impl<A, P: HasDual> HasDual for Recv<A, P> {
type Dual = Send<A, P::Dual>;
}
impl<P: HasDual, Q: HasDual> HasDual for Choose<P, Q> {
type Dual = Offer<P::Dual, Q::Dual>;
}
impl<P: HasDual, Q: HasDual> HasDual for Offer<P, Q> {
type Dual = Choose<P::Dual, Q::Dual>;
}
impl HasDual for Var<Z> {
type Dual = Var<Z>;
}
impl<N> HasDual for Var<S<N>> {
type Dual = Var<S<N>>;
}
impl<P: HasDual> HasDual for Rec<P> {
type Dual = Rec<P::Dual>;
}
pub enum Branch<L, R> {
Left(L),
Right(R),
}
impl<E, P> Drop for Chan<E, P> {
fn drop(&mut self) {
panic!("Session channel prematurely dropped");
}
}
impl<E> Chan<E, Eps> {
pub fn close(self) {
let this = mem::ManuallyDrop::new(self);
let sender = unsafe { ptr::read(&(this).0 as *const _) };
let receiver = unsafe { ptr::read(&(this).1 as *const _) };
drop(sender);
drop(receiver); }
}
impl<E, P> Chan<E, P> {
unsafe fn cast<E2, P2>(self) -> Chan<E2, P2> {
let this = mem::ManuallyDrop::new(self);
Chan(ptr::read(&(this).0 as *const _), ptr::read(&(this).1 as *const _), PhantomData)
}
}
impl<E, P, A: marker::Send + 'static> Chan<E, Send<A, P>> {
#[must_use]
pub fn send(self, v: A) -> Chan<E, P> {
unsafe {
write_chan(&self, v);
self.cast()
}
}
}
impl<E, P, A: marker::Send + 'static> Chan<E, Recv<A, P>> {
#[must_use]
pub fn recv(self) -> (Chan<E, P>, A) {
unsafe {
let v = read_chan(&self);
(self.cast(), v)
}
}
#[must_use]
pub fn try_recv(self) -> Result<(Chan<E, P>, A), Self> {
unsafe {
if let Some(v) = try_read_chan(&self) {
Ok((self.cast(), v))
} else {
Err(self)
}
}
}
}
impl<E, P, Q> Chan<E, Choose<P, Q>> {
#[must_use]
pub fn sel1(self) -> Chan<E, P> {
unsafe {
write_chan(&self, true);
self.cast()
}
}
#[must_use]
pub fn sel2(self) -> Chan<E, Q> {
unsafe {
write_chan(&self, false);
self.cast()
}
}
}
impl<Z, A, B> Chan<Z, Choose<A, B>> {
#[must_use]
pub fn skip(self) -> Chan<Z, B> {
self.sel2()
}
}
impl<Z, A, B, C> Chan<Z, Choose<A, Choose<B, C>>> {
#[must_use]
pub fn skip2(self) -> Chan<Z, C> {
self.sel2().sel2()
}
}
impl<Z, A, B, C, D> Chan<Z, Choose<A, Choose<B, Choose<C, D>>>> {
#[must_use]
pub fn skip3(self) -> Chan<Z, D> {
self.sel2().sel2().sel2()
}
}
impl<Z, A, B, C, D, E> Chan<Z, Choose<A, Choose<B, Choose<C, Choose<D, E>>>>> {
#[must_use]
pub fn skip4(self) -> Chan<Z, E> {
self.sel2().sel2().sel2().sel2()
}
}
impl<Z, A, B, C, D, E, F> Chan<Z, Choose<A, Choose<B, Choose<C, Choose<D, Choose<E, F>>>>>> {
#[must_use]
pub fn skip5(self) -> Chan<Z, F> {
self.sel2().sel2().sel2().sel2().sel2()
}
}
impl<Z, A, B, C, D, E, F, G>
Chan<Z, Choose<A, Choose<B, Choose<C, Choose<D, Choose<E, Choose<F, G>>>>>>> {
#[must_use]
pub fn skip6(self) -> Chan<Z, G> {
self.sel2().sel2().sel2().sel2().sel2().sel2()
}
}
impl<Z, A, B, C, D, E, F, G, H>
Chan<Z, Choose<A, Choose<B, Choose<C, Choose<D, Choose<E, Choose<F, Choose<G, H>>>>>>>> {
#[must_use]
pub fn skip7(self) -> Chan<Z, H> {
self.sel2().sel2().sel2().sel2().sel2().sel2().sel2()
}
}
impl<E, P, Q> Chan<E, Offer<P, Q>> {
#[must_use]
pub fn offer(self) -> Branch<Chan<E, P>, Chan<E, Q>> {
unsafe {
let b = read_chan(&self);
if b {
Left(self.cast())
} else {
Right(self.cast())
}
}
}
#[must_use]
pub fn try_offer(self) -> Result<Branch<Chan<E, P>, Chan<E, Q>>, Self> {
unsafe {
if let Some(b) = try_read_chan(&self) {
if b {
Ok(Left(self.cast()))
} else {
Ok(Right(self.cast()))
}
} else {
Err(self)
}
}
}
}
impl<E, P> Chan<E, Rec<P>> {
#[must_use]
pub fn enter(self) -> Chan<(P, E), P> {
unsafe { self.cast() }
}
}
impl<E, P> Chan<(P, E), Var<Z>> {
#[must_use]
pub fn zero(self) -> Chan<(P, E), P> {
unsafe { self.cast() }
}
}
impl<E, P, N> Chan<(P, E), Var<S<N>>> {
#[must_use]
pub fn succ(self) -> Chan<E, Var<N>> {
unsafe { self.cast() }
}
}
#[must_use]
pub fn hselect<E, P, A>(
mut chans: Vec<Chan<E, Recv<A, P>>>,
) -> (Chan<E, Recv<A, P>>, Vec<Chan<E, Recv<A, P>>>) {
let i = iselect(&chans);
let c = chans.remove(i);
(c, chans)
}
pub fn iselect<E, P, A>(chans: &Vec<Chan<E, Recv<A, P>>>) -> usize {
let mut map = HashMap::new();
let id = {
let mut sel = Select::new();
let mut handles = Vec::with_capacity(chans.len());
for (i, chan) in chans.iter().enumerate() {
let &Chan(_, ref rx, _) = chan;
let handle = sel.recv(rx);
map.insert(handle, i);
handles.push(handle);
}
let id = sel.ready();
id
};
map.remove(&id).unwrap()
}
pub struct ChanSelect<'c> {
receivers: Vec<&'c Receiver<*mut u8>>,
}
impl<'c> ChanSelect<'c> {
pub fn new() -> ChanSelect<'c> {
ChanSelect { receivers: Vec::new() }
}
pub fn add_recv<E, P, A: marker::Send>(&mut self, chan: &'c Chan<E, Recv<A, P>>) {
let &Chan(_, ref rx, _) = chan;
let _ = self.receivers.push(rx);
}
pub fn add_offer<E, P, Q>(&mut self, chan: &'c Chan<E, Offer<P, Q>>) {
let &Chan(_, ref rx, _) = chan;
let _ = self.receivers.push(rx);
}
pub fn wait(self) -> usize {
let mut sel = Select::new();
for rx in self.receivers.into_iter() {
sel.recv(rx);
}
sel.ready()
}
pub fn len(&self) -> usize {
self.receivers.len()
}
}
#[must_use]
pub fn session_channel<P: HasDual>() -> (Chan<(), P>, Chan<(), P::Dual>) {
let (tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
let c1 = Chan(tx1, rx2, PhantomData);
let c2 = Chan(tx2, rx1, PhantomData);
(c1, c2)
}
pub fn connect<F1, F2, P>(srv: F1, cli: F2)
where
F1: Fn(Chan<(), P>) + marker::Send + 'static,
F2: Fn(Chan<(), P::Dual>) + marker::Send,
P: HasDual + marker::Send + 'static,
P::Dual: HasDual + marker::Send + 'static
{
let (c1, c2) = session_channel();
let t = spawn(move || srv(c1));
cli(c2);
t.join().unwrap();
}
mod private {
use super::*;
pub trait Sealed {}
impl Sealed for Eps {}
impl<A, P> Sealed for Send<A, P> {}
impl<A, P> Sealed for Recv<A, P> {}
impl<P, Q> Sealed for Choose<P, Q> {}
impl<P, Q> Sealed for Offer<P, Q> {}
impl<Z> Sealed for Var<Z> {}
impl<P> Sealed for Rec<P> {}
}
#[macro_export]
macro_rules! offer {
(
$id:ident, $branch:ident => $code:expr, $($t:tt)+
) => (
match $id.offer() {
$crate::Left($id) => $code,
$crate::Right($id) => offer!{ $id, $($t)+ }
}
);
(
$id:ident, $branch:ident => $code:expr
) => (
$code
)
}
#[macro_export]
macro_rules! try_offer {
(
$id:ident, $branch:ident => $code:expr, $($t:tt)+
) => (
match $id.try_offer() {
Ok($crate::Left($id)) => $code,
Ok($crate::Right($id)) => try_offer!{ $id, $($t)+ },
Err($id) => Err($id)
}
);
(
$id:ident, $branch:ident => $code:expr
) => (
$code
)
}
#[macro_export]
macro_rules! chan_select {
(
$(($c:ident, $name:pat) = $rx:ident.recv() => $code:expr),+
) => ({
let index = {
let mut sel = $crate::ChanSelect::new();
$( sel.add_recv(&$rx); )+
sel.wait()
};
let mut i = 0;
$( if index == { i += 1; i - 1 } { let ($c, $name) = $rx.recv(); $code }
else )+
{ unreachable!() }
});
(
$($res:ident = $rx:ident.offer() => { $($t:tt)+ }),+
) => ({
let index = {
let mut sel = $crate::ChanSelect::new();
$( sel.add_offer(&$rx); )+
sel.wait()
};
let mut i = 0;
$( if index == { i += 1; i - 1 } { $res = offer!{ $rx, $($t)+ } } else )+
{ unreachable!() }
})
}