#![feature(never_type)]
extern crate crossbeam_channel;
extern crate either;
use std::boxed::Box;
use std::error::Error;
use std::fmt;
use std::marker;
use std::mem;
use std::thread::{JoinHandle, spawn};
use std::panic;
use crossbeam_channel::{Sender, Receiver, bounded, Select};
use either::Either;
#[must_use]
#[derive(Debug)]
pub struct End {
sender: Sender<()>,
receiver: Receiver<()>,
}
#[must_use]
#[derive(Debug)]
pub struct Send<T, S: Session> {
channel: Sender<(T, S::Dual)>,
}
#[must_use]
#[derive(Debug)]
pub struct Recv<T, S: Session> {
channel: Receiver<(T, S)>,
}
pub trait Session: marker::Sized + marker::Send {
type Dual: Session<Dual=Self>;
#[doc(hidden)]
fn new() -> (Self, Self::Dual);
}
impl Session for End {
type Dual = End;
fn new() -> (Self, Self::Dual) {
let (sender1, receiver1) = bounded::<()>(1);
let (sender2, receiver2) = bounded::<()>(1);
return (End { sender: sender1, receiver: receiver2 },
End { sender: sender2, receiver: receiver1 });
}
}
impl<T: marker::Send, S: Session> Session for Send<T, S> {
type Dual = Recv<T, S::Dual>;
fn new() -> (Self, Self::Dual) {
let (sender, receiver) = bounded::<(T, S::Dual)>(1);
return (Send { channel: sender }, Recv { channel: receiver });
}
}
impl<T: marker::Send, S: Session> Session for Recv<T, S> {
type Dual = Send<T, S::Dual>;
fn new() -> (Self, Self::Dual) {
let (there, here) = Self::Dual::new();
return (here, there);
}
}
pub fn send<T, S>(x: T, s: Send<T, S>) -> S
where
T: marker::Send,
S: Session,
{
let (here, there) = S::new();
s.channel.send((x, there)).unwrap_or(());
here
}
pub fn recv<T, S>(s: Recv<T, S>) -> Result<(T, S), Box<dyn Error>>
where
T: marker::Send,
S: Session,
{
let (v, s) = s.channel.recv()?;
Ok((v, s))
}
pub fn cancel<T>(x: T) -> () {
mem::drop(x);
}
pub fn close(s: End) -> Result<(), Box<dyn Error>> {
s.sender.send(()).unwrap_or(());
s.receiver.recv()?;
Ok(())
}
pub fn fork_with_thread_id<S, P>(p: P) -> (JoinHandle<()>, S::Dual)
where
S: Session + 'static,
P: FnOnce(S) -> Result<(), Box<dyn Error>> + marker::Send + 'static
{
let (there, here) = Session::new();
let other_thread = spawn(move || {
panic::set_hook(Box::new(|_info| {
}));
match p(there) {
Ok(()) => (),
Err(e) => panic!("{}", e.description()),
}
});
(other_thread, here)
}
pub fn fork<S, P>(p: P) -> S::Dual
where
S: Session + 'static,
P: FnOnce(S) -> Result<(), Box<dyn Error>> + marker::Send + 'static
{
fork_with_thread_id(p).1
}
pub type Offer<S1, S2> =
Recv<Either<S1, S2>, End>;
pub type Choose<S1, S2> =
Send<Either<<S1 as Session>::Dual, <S2 as Session>::Dual>, End>;
pub fn offer_either<'a, S1, S2, F, G, R>(s: Offer<S1, S2>, f: F, g: G)
-> Result<R, Box<dyn Error + 'a>>
where
S1: Session,
S2: Session,
F: FnOnce(S1) -> Result<R, Box<dyn Error + 'a>>,
G: FnOnce(S2) -> Result<R, Box<dyn Error + 'a>>,
{
let (e, s) = recv(s)?;
cancel(s);
e.either(f, g)
}
pub fn choose_left<'a, S1, S2>(s: Choose<S1, S2>) -> S1
where
S1: Session + 'a,
S2: Session + 'a,
{
let (here, there) = S1::new();
let s = send(Either::Left(there), s);
cancel(s);
here
}
pub fn choose_right<'a, S1, S2>(s: Choose<S1, S2>) -> S2
where
S1: Session + 'a,
S2: Session + 'a,
{
let (here, there) = S2::new();
let s = send(Either::Right(there), s);
cancel(s);
here
}
#[macro_export]
macro_rules! offer {
($session:expr, { $($pat:pat => $result:expr,)* }) => {
(move || -> Result<_, _> {
let (l, s) = recv($session)?;
cancel(s);
match l {
$(
$pat => $result,
)*
}
})()
};
}
#[macro_export]
macro_rules! choose {
($label:path, $session:expr) => {{
let (here, there) = <_ as Session>::new();
let s = send($label(there), $session);
cancel(s);
here
}};
}
#[derive(Debug)]
enum SelectError {
EmptyVec,
}
impl fmt::Display for SelectError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
SelectError::EmptyVec =>
write!(f, "please use a vector with at least one element"),
}
}
}
impl Error for SelectError {
fn description(&self) -> &str {
match *self {
SelectError::EmptyVec => "empty vectors not allowed",
}
}
fn cause(&self) -> Option<&dyn Error> {
match *self {
SelectError::EmptyVec => None,
}
}
}
pub fn select_mut<T, S>(rs: &mut Vec<Recv<T, S>>) -> Result<(T, S), Box<dyn Error>>
where
T: marker::Send,
S: Session,
{
if rs.is_empty() {
Err(Box::new(SelectError::EmptyVec))
}
else {
let (index, res) = {
let mut sel = Select::new();
let iter = rs.iter();
for r in iter {
sel.recv(&r.channel);
}
loop {
let index = sel.ready();
let res = rs[index].channel.try_recv();
if let Err(e) = res {
if e.is_empty() {
continue;
}
}
break (index, res);
}
};
let _ = rs.swap_remove(index);
match res {
Ok(res) => Ok(res),
Err(e) => Err(Box::new(e)),
}
}
}
pub fn select<T, S>(rs: Vec<Recv<T, S>>) -> (Result<(T, S), Box<dyn Error>>, Vec<Recv<T, S>>)
where
T: marker::Send,
S: Session,
{
let mut rs = rs;
let res = select_mut(&mut rs);
(res, rs)
}