#![feature(trait_alias)]
#![feature(async_closure)]
use std::{error::Error, marker, marker::PhantomData, mem::ManuallyDrop, thread, time::Duration};
use tokio::time::timeout as timeout_after;
/// Receiving half of the unbounded tokio mpsc channel used to carry session messages.
pub type Receiver<T> = tokio::sync::mpsc::UnboundedReceiver<T>;
/// Sending half of the unbounded tokio mpsc channel used to carry session messages.
pub type Sender<T> = tokio::sync::mpsc::UnboundedSender<T>;
/// Creates one unbounded tokio mpsc channel and returns its `(sender, receiver)` halves.
fn unbounded_channel<T>() -> (Sender<T>, Receiver<T>) {
    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<T>();
    (sender, receiver)
}
#[cfg(feature = "mux")]
pub mod multiplexing;
mod repr;
pub use repr::{DynMessage, Repr};
/// Errors reported by session channel operations.
#[derive(Debug)]
pub enum SessionError {
/// A received message could not be converted into the type the caller asked for;
/// carries the rejected message.
UnexpectedMessage(DynMessage),
/// The underlying channel is closed: a send failed or a receive yielded no message.
Disconnected,
/// No message arrived within the timeout passed to the receive operation.
Timeout,
/// The session was abandoned via `Chan::abort` / `Chan::abort_dyn`;
/// carries the error supplied by the caller.
Abort(Box<dyn Error + marker::Send + 'static>),
}
/// `Display` for `SessionError` simply reuses the derived `Debug` rendering.
impl std::fmt::Display for SessionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Go through `format_args!` (rather than calling `Debug::fmt` directly) so
        // the formatter's own flags are not forwarded to the `Debug` output.
        f.write_fmt(format_args!("{:?}", self))
    }
}
impl Error for SessionError {}
/// Result type of session operations: `T` on success, `SessionError` on failure.
pub type SessionResult<T> = Result<T, SessionError>;
/// Wraps `value` in a successful `SessionResult`.
///
/// Handy where the error type of a bare `Ok(..)` could not otherwise be inferred.
pub fn ok<T>(value: T) -> SessionResult<T> {
    Result::Ok(value)
}
/// Converts a wire-level message into the concrete type `T`.
///
/// When the conversion is refused, the rejected message is boxed and returned
/// inside `SessionError::UnexpectedMessage`.
fn downcast<T, R: Repr<T>>(msg: R) -> SessionResult<T> {
    match msg.try_into() {
        Ok(value) => Ok(value),
        Err(rejected) => Err(SessionError::UnexpectedMessage(Box::new(rejected))),
    }
}
/// One endpoint of a session-typed channel.
///
/// `P` is the protocol that remains to be run on this endpoint, `E` is the
/// environment of enclosing `Rec` bodies (a nested tuple extended by `enter`),
/// and `R` is the message representation actually carried by the channel.
///
/// Every field is wrapped in `ManuallyDrop` because `Chan` implements `Drop`:
/// `cast` moves the fields into a retyped channel and `close_chan` drops them
/// one by one, neither of which is possible with plain fields.
pub struct Chan<P, E, R> {
// Outgoing half: messages written by this endpoint.
tx: ManuallyDrop<Sender<R>>,
// Incoming half: messages read by this endpoint.
rx: ManuallyDrop<Receiver<R>>,
// A message already pulled from `rx` by `offer`, returned by the next read before touching `rx`.
stash: ManuallyDrop<Option<R>>,
// Carries the protocol and environment types, which have no runtime representation.
_phantom: PhantomData<(P, E)>,
}
impl<P, E, R> Chan<P, E, R> {
fn new(tx: Sender<R>, rx: Receiver<R>) -> Chan<P, E, R> {
Chan {
tx: ManuallyDrop::new(tx),
rx: ManuallyDrop::new(rx),
stash: ManuallyDrop::new(None),
_phantom: PhantomData,
}
}
}
/// Converts `v` into the wire representation and pushes it onto the channel's sender.
///
/// A failed send is reported as `SessionError::Disconnected`.
fn write_chan<T, P, E, R: Repr<T>>(chan: &Chan<P, E, R>, v: T) -> SessionResult<()> {
    match chan.tx.send(Repr::from(v)) {
        Ok(()) => Ok(()),
        Err(_) => Err(SessionError::Disconnected),
    }
}
async fn read_chan<T, P, E, R: Repr<T>>(
chan: &mut Chan<P, E, R>,
timeout: Duration,
) -> SessionResult<T> {
let msg = read_chan_dyn(chan, timeout).await?;
downcast(msg)
}
/// Reads the next message in its wire representation.
///
/// A stashed message, if present, is returned immediately without touching the
/// receiver. Otherwise the receiver is awaited: without a deadline when
/// `timeout` is `Duration::MAX`, and under `timeout` in every other case.
///
/// Returns `SessionError::Disconnected` when the receiver yields no message and
/// `SessionError::Timeout` when the deadline elapses first.
async fn read_chan_dyn<P, E, R>(chan: &mut Chan<P, E, R>, timeout: Duration) -> SessionResult<R> {
    if let Some(stashed) = chan.stash.take() {
        return Ok(stashed);
    }
    let received = if timeout == Duration::MAX {
        // `Duration::MAX` means "wait forever": skip the timer entirely.
        chan.rx.recv().await
    } else {
        match timeout_after(timeout, chan.rx.recv()).await {
            Ok(received) => received,
            Err(_) => return Err(SessionError::Timeout),
        }
    };
    received.ok_or(SessionError::Disconnected)
}
fn close_chan<P, E, R>(chan: Chan<P, E, R>) {
let mut this = ManuallyDrop::new(chan);
unsafe {
ManuallyDrop::drop(&mut this.tx);
ManuallyDrop::drop(&mut this.rx);
ManuallyDrop::drop(&mut this.stash);
}
}
/// Type-level zero; `Var<Z>` refers to the innermost enclosing `Rec`.
pub struct Z;
/// Type-level successor of `N`; `Var<S<N>>` skips one enclosing `Rec` and continues as `Var<N>`.
pub struct S<N>(PhantomData<N>);
/// End of the protocol; a channel in this state can be `close`d.
pub struct Eps;
/// Receive a value of type `T`, then continue with protocol `P`.
pub struct Recv<T, P>(PhantomData<(T, P)>);
/// Send a value of type `T`, then continue with protocol `P`.
pub struct Send<T, P>(PhantomData<(T, P)>);
/// This endpoint chooses whether to continue with `P` (`sel1`) or `Q` (`sel2`).
pub struct Choose<P: Outgoing, Q: Outgoing>(PhantomData<(P, Q)>);
/// This endpoint continues with `P` or `Q` depending on the next incoming message (see `offer`).
pub struct Offer<P: Incoming, Q: Incoming>(PhantomData<(P, Q)>);
/// Recursive protocol with body `P`; `enter` pushes `P` onto the environment.
pub struct Rec<P>(PhantomData<P>);
/// Jump back to an enclosing `Rec`, selected by the type-level index `N`.
pub struct Var<N>(PhantomData<N>);
/// Protocol states that start by reading a message from the peer.
pub trait Incoming {
/// Type of the first message this protocol state reads.
type Expected;
}
// A plain receive expects exactly its payload type.
impl<T, P> Incoming for Recv<T, P> {
type Expected = T;
}
// An offer expects whatever its first alternative expects; `offer` tests incoming
// messages against this type to decide between the two alternatives.
impl<P: Incoming, Q: Incoming> Incoming for Offer<P, Q> {
type Expected = P::Expected;
}
/// Marker for protocol states that start with this endpoint sending (`Send`) or choosing (`Choose`).
pub trait Outgoing {}
impl<T, P> Outgoing for Send<T, P> {}
impl<P: Outgoing, Q: Outgoing> Outgoing for Choose<P, Q> {}
/// Maps a protocol to the protocol the opposite endpoint has to follow.
pub trait HasDual {
/// The peer's view of this protocol.
type Dual;
}
// The end of a protocol is its own dual.
impl HasDual for Eps {
type Dual = Eps;
}
// A send is mirrored by a receive of the same type, followed by the dual continuation.
impl<A, P: HasDual> HasDual for Send<A, P> {
type Dual = Recv<A, P::Dual>;
}
// A receive is mirrored by a send of the same type, followed by the dual continuation.
impl<A, P: HasDual> HasDual for Recv<A, P> {
type Dual = Send<A, P::Dual>;
}
// A choice on this side is an offer on the other side, with both alternatives dualised.
impl<P: HasDual, Q: HasDual> HasDual for Choose<P, Q>
where
P: Outgoing,
Q: Outgoing,
P::Dual: Incoming,
Q::Dual: Incoming,
{
type Dual = Offer<P::Dual, Q::Dual>;
}
// An offer on this side is a choice on the other side, with both alternatives dualised.
impl<P: HasDual, Q: HasDual> HasDual for Offer<P, Q>
where
P: Incoming,
Q: Incoming,
P::Dual: Outgoing,
Q::Dual: Outgoing,
{
type Dual = Choose<P::Dual, Q::Dual>;
}
// Recursion variables are unchanged by duality.
impl HasDual for Var<Z> {
type Dual = Var<Z>;
}
impl<N> HasDual for Var<S<N>> {
type Dual = Var<S<N>>;
}
// The dual of a recursive protocol is the recursion over the dual body.
impl<P: HasDual> HasDual for Rec<P> {
type Dual = Rec<P::Dual>;
}
/// Outcome of `offer`: which of the two alternatives applies.
pub enum Branch<L, R> {
/// The first alternative; holds the channel retyped to that alternative's protocol.
Left(L),
/// The second alternative; holds the channel retyped to that alternative's protocol.
Right(R),
}
impl<P, E, R> Drop for Chan<P, E, R> {
fn drop(&mut self) {
if !thread::panicking() {
panic!("Session channel prematurely dropped. Must call `.close()`.");
}
}
}
impl<E, R> Chan<Eps, E, R> {
pub fn close(self) -> SessionResult<()> {
close_chan(self);
Ok(())
}
}
impl<P, E, R> Chan<P, E, R> {
    /// Retypes the channel to protocol `P2` and environment `E2`, keeping the
    /// same sender, receiver and stash.
    fn cast<P2, E2>(self) -> Chan<P2, E2, R> {
        // Wrapping the old channel keeps its panicking `Drop` impl from running.
        let mut old = ManuallyDrop::new(self);
        // SAFETY: `old` is owned here and never used again, so each field is
        // moved out exactly once and is not observed afterwards.
        let (tx, rx, stash) = unsafe {
            (
                ManuallyDrop::take(&mut old.tx),
                ManuallyDrop::take(&mut old.rx),
                ManuallyDrop::take(&mut old.stash),
            )
        };
        Chan {
            tx: ManuallyDrop::new(tx),
            rx: ManuallyDrop::new(rx),
            stash: ManuallyDrop::new(stash),
            _phantom: PhantomData,
        }
    }

    /// Abandons the session at any protocol state, closing the channel.
    ///
    /// Always returns `Err(SessionError::Abort(..))` wrapping `e`; the `T`
    /// parameter only lets the call fit whatever result type the caller needs.
    pub fn abort<T, F: Error + marker::Send + 'static>(self, e: F) -> SessionResult<T> {
        self.abort_dyn(Box::new(e))
    }

    /// Same as `abort`, for an error that is already boxed.
    pub fn abort_dyn<T>(self, e: Box<dyn Error + marker::Send>) -> SessionResult<T> {
        close_chan(self);
        let failure = SessionError::Abort(e);
        Err(failure)
    }
}
impl<P, E, T, R: Repr<T>> Chan<Send<T, P>, E, R> {
    /// Sends `v` and returns the channel advanced to the continuation `P`.
    ///
    /// If the send fails the channel is closed and the error is returned.
    pub fn send(self, v: T) -> SessionResult<Chan<P, E, R>> {
        if let Err(e) = write_chan(&self, v) {
            close_chan(self);
            return Err(e);
        }
        Ok(self.cast())
    }
}
impl<P, E, T, R: Repr<T>> Chan<Recv<T, P>, E, R> {
    /// Waits up to `timeout` for a `T` and returns it together with the channel
    /// advanced to the continuation `P`.
    ///
    /// On any read error (disconnect, timeout, unexpected message type) the
    /// channel is closed and the error is returned.
    pub async fn recv(mut self, timeout: Duration) -> SessionResult<(Chan<P, E, R>, T)> {
        let value = match read_chan::<T, _, _, _>(&mut self, timeout).await {
            Ok(value) => value,
            Err(e) => {
                close_chan(self);
                return Err(e);
            }
        };
        Ok((self.cast(), value))
    }
}
impl<P: Outgoing, Q: Outgoing, E, R> Chan<Choose<P, Q>, E, R> {
    /// Picks the first alternative: the channel continues with protocol `P`.
    ///
    /// Nothing is written to the channel by the selection itself.
    pub fn sel1(self) -> Chan<P, E, R> {
        self.cast::<P, E>()
    }

    /// Picks the second alternative: the channel continues with protocol `Q`.
    ///
    /// Nothing is written to the channel by the selection itself.
    pub fn sel2(self) -> Chan<Q, E, R> {
        self.cast::<Q, E>()
    }
}
/// Result of `offer`: the channel retyped to either the `P` or the `Q` alternative.
type OfferBranch<P, Q, E, R> = Branch<Chan<P, E, R>, Chan<Q, E, R>>;
impl<P: Incoming, Q: Incoming, E, R> Chan<Offer<P, Q>, E, R>
where
    P::Expected: 'static,
    R: Repr<P::Expected>,
{
    /// Parks `msg` in the stash slot so the next read on this channel returns
    /// it instead of pulling from the receiver.
    fn stash(mut self, msg: R) -> Self {
        let parked = ManuallyDrop::new(Some(msg));
        self.stash = parked;
        self
    }

    /// Waits up to `t` for the next message and uses it to pick an alternative.
    ///
    /// The message is not consumed: it is stashed so the chosen alternative
    /// reads it as its own first message. `Branch::Left` is returned when the
    /// message can be converted into `P::Expected`, `Branch::Right` otherwise.
    ///
    /// On a read error the channel is closed and the error is returned.
    pub async fn offer(mut self, t: Duration) -> SessionResult<OfferBranch<P, Q, E, R>> {
        match read_chan_dyn(&mut self, t).await {
            Err(e) => {
                close_chan(self);
                Err(e)
            }
            Ok(msg) => {
                let fits_left = Repr::<P::Expected>::can_into(&msg);
                let pending = self.stash(msg);
                let branch = if fits_left {
                    Branch::Left(pending.cast())
                } else {
                    Branch::Right(pending.cast())
                };
                Ok(branch)
            }
        }
    }
}
impl<P, E, R> Chan<Rec<P>, E, R> {
    /// Enters the recursive protocol: continues with its body `P` and pushes
    /// `P` onto the environment so a later `Var` can jump back to it.
    pub fn enter(self) -> Chan<P, (P, E), R> {
        self.cast::<P, (P, E)>()
    }
}
impl<P, E, R> Chan<Var<Z>, (P, E), R> {
    /// Jumps back to the innermost enclosing `Rec`: continues with the protocol
    /// `P` on top of the environment, which is left unchanged.
    ///
    /// Always returns `Ok`.
    pub fn zero(self) -> SessionResult<Chan<P, (P, E), R>> {
        let restarted = self.cast::<P, (P, E)>();
        Ok(restarted)
    }
}
impl<P, E, N, R> Chan<Var<S<N>>, (P, E), R> {
    /// Steps one level outwards: pops the top entry of the environment and
    /// decrements the recursion index from `S<N>` to `N`.
    pub fn succ(self) -> Chan<Var<N>, E, R> {
        self.cast::<Var<N>, E>()
    }
}
/// Both endpoints of a session: one running `P`, the other running `P`'s dual, each with an empty environment.
type ChanPair<P, R> = (Chan<P, (), R>, Chan<<P as HasDual>::Dual, (), R>);
/// A typed endpoint running `P` plus the raw sender/receiver pair for the opposite side.
type ChanDynPair<P, R> = (Chan<P, (), R>, (Sender<R>, Receiver<R>));
/// Creates a connected pair of typed endpoints.
///
/// The first endpoint runs protocol `P`, the second runs `P::Dual`. Whatever
/// one endpoint sends is what the other one receives.
pub fn session_channel<P: HasDual, R>() -> ChanPair<P, R> {
    // Two one-way channels, one per direction.
    let (to_dual, from_primary) = unbounded_channel();
    let (to_primary, from_dual) = unbounded_channel();
    (
        Chan::new(to_dual, from_dual),
        Chan::new(to_primary, from_primary),
    )
}
/// Creates a typed endpoint running `P` together with the raw, untyped halves
/// for the opposite side.
///
/// The returned sender feeds the endpoint's receiver, and the returned receiver
/// yields whatever the endpoint sends.
pub fn session_channel_dyn<P, R>() -> ChanDynPair<P, R> {
    let (chan_tx, peer_rx) = unbounded_channel();
    let (peer_tx, chan_rx) = unbounded_channel();
    let endpoint = Chan::new(chan_tx, chan_rx);
    (endpoint, (peer_tx, peer_rx))
}
/// Dispatches over a chain of nested `Offer`s.
///
/// Syntax: `offer!{ chan, timeout, Label1 => expr1, Label2 => expr2, ... }`,
/// with an optional trailing comma.
///
/// For every branch except the last, the macro calls
/// `chan.offer(timeout).await?`. On `Branch::Left` it evaluates that branch's
/// expression with `chan` rebound to the left channel; on `Branch::Right` it
/// rebinds `chan` to the right channel and moves on to the remaining
/// branches. The last branch's expression is evaluated directly, without a
/// further `offer` call.
///
/// The labels are purely documentary: they are not used in the expansion.
/// Because the expansion uses `.await` and `?`, the macro must be invoked
/// inside an `async` context whose return type can absorb the error produced
/// by `offer`.
///
/// The recursive step goes through `$crate::offer!` so the macro also works
/// when it is invoked by path and is not imported at the call site.
#[macro_export]
macro_rules! offer {
    (
        $id:ident, $timeout:expr, $branch:ident => $code:expr, $($t:tt)+
    ) => (
        match $id.offer($timeout).await? {
            $crate::Branch::Left($id) => $code,
            $crate::Branch::Right($id) => $crate::offer!{ $id, $timeout, $($t)+ }
        }
    );
    (
        $id:ident, $timeout:expr, $branch:ident => $code:expr $(,)?
    ) => (
        $code
    )
}
#[cfg(test)]
mod test;