use super::Session;
use futures::{channel::oneshot, FutureExt};
use std::marker;
#[must_use]
pub struct Recv<T, S: Session = ()> {
rx: oneshot::Receiver<Exchange<T, S>>,
}
#[must_use]
pub struct Send<T, S: Session = ()> {
tx: oneshot::Sender<Exchange<T, S::Dual>>,
}
enum Exchange<T, S: Session> {
Send((T, S)),
Link(Recv<T, S>),
}
impl<T, S: Session> Session for Recv<T, S>
where
T: marker::Send + 'static,
{
type Dual = Send<T, S::Dual>;
fn fork_sync(f: impl FnOnce(Self::Dual)) -> Self {
let (recv, send) = endpoints();
f(send);
recv
}
fn link(self, dual: Self::Dual) {
dual.link(self)
}
}
impl<T, S: Session> Session for Send<T, S>
where
T: marker::Send + 'static,
{
type Dual = Recv<T, S::Dual>;
fn fork_sync(f: impl FnOnce(Self::Dual)) -> Self {
let (recv, send) = endpoints();
f(recv);
send
}
fn link(self, dual: Self::Dual) {
self.tx
.send(Exchange::Link(dual))
.ok()
.expect("receiver dropped")
}
}
fn endpoints<T, S: Session>() -> (Recv<T, S>, Send<T, S::Dual>)
where
T: marker::Send + 'static,
{
let (tx, rx) = oneshot::channel();
(Recv { rx }, Send { tx })
}
impl<T, S: Session> Recv<T, S>
where
T: marker::Send + 'static,
{
#[must_use]
pub async fn recv(mut self) -> (T, S) {
loop {
match self.rx.await.expect("sender dropped") {
Exchange::Send(x) => break x,
Exchange::Link(r) => self = r,
}
}
}
#[must_use]
pub fn poll_recv(mut self, cx: &mut std::task::Context<'_>) -> Result<(T, S), Self> {
use std::task::Poll;
loop {
self = match self.rx.poll_unpin(cx) {
Poll::Ready(Ok(Exchange::Send(x))) => return Ok(x),
Poll::Ready(Ok(Exchange::Link(r))) => r,
Poll::Ready(Err(oneshot::Canceled)) => panic!("sender dropped"),
Poll::Pending => return Err(self),
}
}
}
}
impl<T> Recv<T, ()>
where
T: marker::Send + 'static,
{
pub async fn recv1(self) -> T {
self.recv().await.0
}
#[must_use]
pub fn poll_recv1(self, cx: &mut std::task::Context<'_>) -> Result<T, Self> {
self.poll_recv(cx).map(|(t, ())| t)
}
}
impl<T, S: Session> Send<T, S>
where
T: marker::Send + 'static,
{
#[must_use]
pub fn send(self, value: T) -> S {
S::fork_sync(|dual| {
self.tx
.send(Exchange::Send((value, dual)))
.ok()
.expect("receiver dropped")
})
}
}
impl<T> Send<T, ()>
where
T: marker::Send + 'static,
{
pub fn send1(self, value: T) {
self.send(value)
}
#[must_use]
pub fn choose<S: Session>(self, choice: impl FnOnce(S) -> T) -> S::Dual {
S::Dual::fork_sync(|session| self.send1(choice(session)))
}
}
impl<S: Session> Send<S, ()> {
#[must_use]
pub fn handle(self) -> S::Dual {
S::Dual::fork_sync(|session| self.send1(session))
}
}