Struct dialectic::Chan
A bidirectional communications channel using the session type P over the connections Tx and Rx.
Creating new Chans: use Session
The Session trait is implemented for all valid session types. To create a new Chan for some session type, use one of the provided static methods. Here, we create two Chans with the session type send String and its dual recv String, wrapping an underlying bidirectional transport built from a pair of tokio::sync::mpsc::channels:
    use dialectic::prelude::*;
    use dialectic_tokio_mpsc as mpsc;

    // Make a pair of channels:
    // - `c1` with the session type `send String`, and
    // - `c2` with the dual session type `recv String`
    let (c1, c2) = <Session! { send String }>::channel(|| mpsc::channel(1));
If you already have a sender and receiver and want to wrap them in a Chan, use the wrap method for a session type. This is useful, for example, if you’re talking to another process over a network connection, where it’s not possible to build both halves of the channel on one computer, and instead each computer will wrap one end of the connection:
    let (tx, rx) = /* ... */;
    let c = <Session! { send String }>::wrap(tx, rx);
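To make the idea of a channel indexed by a session type more concrete, here is a minimal std-only sketch of the typestate pattern. It is a toy model and not how dialectic is implemented: the names SendStr, RecvStr, pair, and demo are hypothetical, the toy Session trait only records duals, and the transport is a pair of blocking std::sync::mpsc channels carrying String only.

```rust
use std::marker::PhantomData;
use std::sync::mpsc::{channel, Receiver, RecvError, SendError, Sender};

/// Toy session types (hypothetical; dialectic's real types are richer).
pub struct Done;
pub struct SendStr<P>(PhantomData<P>);
pub struct RecvStr<P>(PhantomData<P>);

/// Each toy session type names its dual: sends become receives and vice versa.
pub trait Session {
    type Dual: Session;
}
impl Session for Done {
    type Dual = Done;
}
impl<P: Session> Session for SendStr<P> {
    type Dual = RecvStr<P::Dual>;
}
impl<P: Session> Session for RecvStr<P> {
    type Dual = SendStr<P::Dual>;
}

/// A channel whose type parameter `S` tracks the remaining protocol.
pub struct Chan<S> {
    tx: Sender<String>,
    rx: Receiver<String>,
    _session: PhantomData<S>,
}

/// Build two connected channels whose session types are dual to each other.
pub fn pair<S: Session>() -> (Chan<S>, Chan<S::Dual>) {
    let (tx_a, rx_b) = channel();
    let (tx_b, rx_a) = channel();
    (
        Chan { tx: tx_a, rx: rx_a, _session: PhantomData },
        Chan { tx: tx_b, rx: rx_b, _session: PhantomData },
    )
}

impl<P> Chan<SendStr<P>> {
    /// Sending consumes the channel and returns it at the continuation type `P`.
    pub fn send(self, msg: String) -> Result<Chan<P>, SendError<String>> {
        self.tx.send(msg)?;
        Ok(Chan { tx: self.tx, rx: self.rx, _session: PhantomData })
    }
}

impl<P> Chan<RecvStr<P>> {
    /// Receiving returns the message together with the continuation channel.
    pub fn recv(self) -> Result<(String, Chan<P>), RecvError> {
        let msg = self.rx.recv()?;
        Ok((msg, Chan { tx: self.tx, rx: self.rx, _session: PhantomData }))
    }
}

impl Chan<Done> {
    /// Only a finished session can be closed; other states lack this method.
    pub fn close(self) {}
}

/// Run `send String` against its dual `recv String` and return the message.
pub fn demo() -> String {
    let (c1, c2) = pair::<SendStr<Done>>();
    let c1 = c1.send("Hello, world!".to_string()).expect("send failed");
    let (s, c2) = c2.recv().expect("recv failed");
    c1.close();
    c2.close();
    s
}
```

Because each method is defined only for the matching state, calling recv on c1 or close on an unfinished channel is rejected by the compiler in this toy model, which mirrors the guarantee described above.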
Implementations
impl<Tx, Rx, S> Chan<S, Tx, Rx> where
S: Session,
Tx: Send + 'static,
Rx: Send + 'static,
pub fn close(self) where
S: Session<Action = Done>,
Close a finished session, dropping the underlying connections.
If called inside a future given to split or call, the underlying connections are implicitly recovered for use in subsequent actions in the session; if called inside a future given to over, they are returned to the caller.
Examples
Starting with a channel whose session type is already Done, we can immediately close the channel.
    use dialectic::prelude::*;
    use dialectic_tokio_mpsc as mpsc;

    let (c1, c2) = <Session! {}>::channel(mpsc::unbounded_channel);
    c1.close();
    c2.close();
However, if the channel’s session type is not Done, it is a type error to attempt to close the channel. The following code will not compile:
    use dialectic::prelude::*;
    use dialectic_tokio_mpsc as mpsc;

    let (c1, c2) = <Session! { loop { send String } }>::channel(mpsc::unbounded_channel);
    c1.close();
    c2.close();
If you really want to destruct a channel before the end of its session, use into_inner, but beware that this may cause the party on the other end of the channel to throw errors due to your violation of the channel’s protocol!
pub async fn recv<T, P>(self) -> Result<(T, Chan<P, Tx, Rx>), Rx::Error> where
S: Session<Action = Recv<T, P>>,
P: Session,
Rx: Receive<T>,
Receive something of type T on the channel, returning the pair of the received object and the channel.
Errors
This function returns the Receiver::Error for the underlying Rx connection if there was an error while receiving.
Examples
    use dialectic::prelude::*;
    use dialectic_tokio_mpsc as mpsc;

    let (c1, c2) = <Session! { recv String }>::channel(|| mpsc::channel(1));
    c2.send("Hello, world!".to_string()).await?;

    let (s, c1) = c1.recv().await?;
    assert_eq!(s, "Hello, world!");
pub async fn send<T, P>(self, message: T) -> Result<Chan<P, Tx, Rx>, Tx::Error> where
S: Session<Action = Send<T, P>>,
P: Session,
Tx: Transmit<T>,
T: Send,
Send something of type T on the channel by value, returning the channel.
Errors
This function returns the Transmitter::Error for the underlying Tx connection if there was an error while sending.
Examples
    use dialectic::prelude::*;
    use dialectic_tokio_mpsc as mpsc;

    let (c1, c2) = <Session! { send String }>::channel(|| mpsc::channel(1));
    c1.send("Hello, world!".to_string()).await?;

    let (s, c2) = c2.recv().await?;
    assert_eq!(s, "Hello, world!");
pub async fn send_ref<T, P>(
self,
message: &T
) -> Result<Chan<P, Tx, Rx>, Tx::Error> where
S: Session<Action = Send<T, P>>,
P: Session,
Tx: Transmit<T, Ref>,
T: Send,
Send something of type T on the channel by reference, returning the channel.
Errors
This function returns the Transmitter::Error for the underlying Tx connection if there was an error while sending.
Examples
    use dialectic::prelude::*;
    use dialectic_tokio_mpsc as mpsc;

    let (c1, c2) = <Session! { send String }>::channel(|| mpsc::channel(1));
    c1.send_ref(&"Hello, world!".to_string()).await?;

    let (s, c2) = c2.recv().await?;
    assert_eq!(s, "Hello, world!");
pub async fn send_mut<T, P>(
self,
message: &mut T
) -> Result<Chan<P, Tx, Rx>, Tx::Error> where
S: Session<Action = Send<T, P>>,
P: Session,
Tx: Transmit<T, Mut>,
T: Send,
Send something of type T on the channel by mutable reference, returning the channel.
Errors
This function returns the Transmitter::Error for the underlying Tx connection if there was an error while sending.
Examples
    use dialectic::prelude::*;
    use dialectic_tokio_mpsc as mpsc;

    let (c1, c2) = <Session! { send String }>::channel(|| mpsc::channel(1));
    let mut string = "Hello, world!".to_string();
    c1.send_mut(&mut string).await?;

    let (s, c2) = c2.recv().await?;
    assert_eq!(s, "Hello, world!");
impl<Tx, Rx, S, Choices, const LENGTH: usize> Chan<S, Tx, Rx> where
S: Session<Action = Choose<Choices>>,
Choices: Tuple,
Choices::AsList: HasLength,
<Choices::AsList as HasLength>::Length: ToConstant<AsConstant = Number<LENGTH>>,
Tx: Transmitter + Send + 'static,
Rx: Send + 'static,
pub async fn choose<const N: usize>(
self
) -> Result<Chan<<Choices::AsList as Select<<Number<N> as ToUnary>::AsUnary>>::Selected, Tx, Rx>, Tx::Error> where
Number<N>: ToUnary,
Choices::AsList: Select<<Number<N> as ToUnary>::AsUnary>,
<Choices::AsList as Select<<Number<N> as ToUnary>::AsUnary>>::Selected: Session,
Actively choose to enter the Nth protocol offered via offer! by the other end of the connection, alerting the other party to this choice by sending the number N over the channel.
The choice N is specified as a const generic usize.
Errors
This function returns the Transmitter::Error for the underlying Tx connection if there was an error while sending the choice.
Examples
    use dialectic::prelude::*;
    use dialectic_tokio_mpsc as mpsc;

    type GiveOrTake = Session! {
        choose {
            0 => send i64,
            1 => recv String,
        }
    };

    let (c1, c2) = GiveOrTake::channel(|| mpsc::channel(1));

    // Spawn a task to offer a choice
    let t1 = tokio::spawn(async move {
        offer!(in c2 {
            0 => { c2.recv().await?; },
            1 => { c2.send("Hello!".to_string()).await?; },
        });
        Ok::<_, mpsc::Error>(())
    });

    // Choose to send an integer
    c1.choose::<0>().await?.send(42).await?;

    // Wait for the offering task to finish
    t1.await??;
Attempting to choose an index that’s out of bounds results in a compile-time error:
    use dialectic::prelude::*;
    use dialectic_tokio_mpsc as mpsc;

    type OnlyTwoChoices = Choose<(Done, Done)>;
    let (c1, c2) = OnlyTwoChoices::channel(|| mpsc::channel(1));

    // Try to choose something out of range (this doesn't typecheck)
    c1.choose::<2>().await?;
impl<Tx, Rx, S, Choices, const LENGTH: usize> Chan<S, Tx, Rx> where
S: Session<Action = Offer<Choices>>,
Choices: Tuple + 'static,
Choices::AsList: HasLength + EachScoped + EachHasDual,
<Choices::AsList as HasLength>::Length: ToConstant<AsConstant = Number<LENGTH>>,
Z: LessThan<<Choices::AsList as HasLength>::Length>,
Tx: Send + 'static,
Rx: Receiver + Send + 'static,
pub async fn offer(self) -> Result<Branches<Choices, Tx, Rx>, Rx::Error>
Offer the choice of one or more protocols to the other party, and wait for them to indicate which protocol they’d like to proceed with. Returns a Branches structure representing all the possible channel types which could be returned, which must be eliminated using case.
💡 Where possible, prefer the offer! macro. This has the benefit of ensuring at compile time that no case is left unhandled; it’s also more succinct.
Errors
This function returns the Receiver::Error for the underlying Rx connection if there was an error while receiving.
Examples
    use dialectic::prelude::*;
    use dialectic_tokio_mpsc as mpsc;

    type GiveOrTake = Session! {
        choose {
            0 => send i64,
            1 => recv String,
        }
    };

    let (c1, c2) = GiveOrTake::channel(|| mpsc::channel(1));

    // Spawn a task to offer a choice
    let t1 = tokio::spawn(async move {
        match c2.offer().await?.case::<0>() {
            Ok(c2) => { c2.recv().await?; },
            Err(rest) => match rest.case::<0>() {
                Ok(c2) => { c2.send("Hello!".to_string()).await?; },
                Err(rest) => rest.empty_case(),
            }
        }
        Ok::<_, mpsc::Error>(())
    });

    // Choose to send an integer
    c1.choose::<0>().await?.send(42).await?;

    // Wait for the offering task to finish
    t1.await??;
Notice how the handling of cases by manual match is harder to read than the equivalent in terms of offer!:
    offer!(in c2 {
        0 => { c2.recv().await?; },
        1 => { c2.send("Hello!".to_string()).await?; },
    });
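The interplay between choose and offer (one side announces a branch index, the other waits for it and dispatches) can be modelled at runtime with a tagged message. The following std-only sketch is an illustration under stated assumptions, not dialectic's wire format or implementation: the Msg enum, BRANCHES, InRange, and demo are hypothetical names, and the range check uses a const assertion rather than the type-level machinery shown in the signatures above.

```rust
use std::sync::mpsc::{channel, Receiver, SendError, Sender};

/// Hypothetical wire format for this toy protocol (not dialectic's).
#[derive(Debug, PartialEq)]
pub enum Msg {
    Choice(u8),
    Int(i64),
    Text(String),
}

/// Number of branches in the toy `GiveOrTake` protocol.
pub const BRANCHES: u8 = 2;

/// Compile-time range check, evaluated when `choose::<N>` is instantiated.
struct InRange<const N: u8>;
impl<const N: u8> InRange<N> {
    const OK: () = assert!(N < BRANCHES, "choice index out of range");
}

/// Announce branch `N` to the peer by sending it as a tag.
pub fn choose<const N: u8>(tx: &Sender<Msg>) -> Result<(), SendError<Msg>> {
    let () = InRange::<N>::OK; // building `choose::<2>` fails here
    tx.send(Msg::Choice(N))
}

/// Wait for the peer's tag, then run the matching branch.
pub fn offer(rx: &Receiver<Msg>, tx: &Sender<Msg>) -> Option<Msg> {
    match rx.recv().ok()? {
        // Branch 0: the peer sends an integer, which we receive.
        Msg::Choice(0) => rx.recv().ok(),
        // Branch 1: we send a string to the peer.
        Msg::Choice(1) => {
            tx.send(Msg::Text("Hello!".to_string())).ok()?;
            None
        }
        // Anything else violates the toy protocol.
        _ => None,
    }
}

/// Choose branch 0, send an integer, and let the offering side handle it.
pub fn demo() -> Option<Msg> {
    let (to_offerer, offerer_rx) = channel();
    let (to_chooser, _chooser_rx) = channel();
    choose::<0>(&to_offerer).ok()?;
    to_offerer.send(Msg::Int(42)).ok()?;
    offer(&offerer_rx, &to_chooser)
}
```

Unlike this sketch, where a malformed tag is only detected at runtime by the catch-all arm, the session-typed API described above rules out unhandled branches when offer! is used.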
impl<Tx, Rx, S> Chan<S, Tx, Rx> where
S: Session,
Tx: Send + 'static,
Rx: Send + 'static,
pub async fn call<T, E, P, Q, F, Fut>(
self,
first: F
) -> Result<(T, Result<Chan<Q, Tx, Rx>, SessionIncomplete<Tx, Rx>>), E> where
S: Session<Action = Call<P, Q>>,
P: Session,
Q: Session,
F: FnOnce(Chan<P, Tx, Rx>) -> Fut,
Fut: Future<Output = Result<T, E>>,
Execute the session type P as a subroutine in a closure.
This operation takes as input an asynchronous closure that runs a channel for the session type P to completion and returns either an error E or some result value T. The result of this (provided that no errors occurred during P) is a channel ready to execute the session type Q.
Errors
The closure must finish the session P on the channel given to it and drop the finished channel before the future returns. If the channel is dropped before completing P or is not dropped after completing P, a SessionIncomplete error will be returned instead of a channel for Q. The best way to ensure this error does not occur is to call close on the channel before returning from the future, because this statically checks that the session is complete and drops the channel.
Additionally, this function returns an Err if the closure returns an Err.
Examples
This can be used to cleanly modularize a session-typed program by splitting it up into independent subroutines:
    use dialectic::prelude::*;
    use dialectic_tokio_mpsc as mpsc;

    let (c1, c2) = <Session! {
        call { send String };
        send String;
    }>::channel(mpsc::unbounded_channel);

    let ((), c1_result) = c1.call(|c| async move {
        let c = c.send("Hello!".to_string()).await?;
        // Because we're done with this subroutine, we can "close" the channel here, but it
        // will remain open to the calling context so it can run the rest of the session:
        c.close();
        Ok::<_, mpsc::Error>(())
    }).await?;

    let c1 = c1_result?;
    let c1 = c1.send("World!".to_string()).await?;
    c1.close();
More generally, this construct permits the expression of context-free session types, by allowing recursion in the first parameter to Call. For a demonstration of this, see the stack example. For more background on context-free session types, see the paper Context-Free Session Type Inference by Luca Padovani. When comparing with that paper, note that the call operation is roughly equivalent to the paper’s @= operator, and the Call type is equivalent to the paper’s ; type operator.
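One way to picture how a subroutine can hand its connection back to the caller, and how the caller can detect that it did not, is a shared slot that close fills in. The sketch below is a simplified, synchronous, std-only model and makes no claim about dialectic's internals: Sub, call, and demo are hypothetical names, the "connection" is just a Vec<String> log, and (unlike the behaviour described above) this toy recovers the connection only on an explicit close and does not track whether a session type was completed.

```rust
use std::sync::{Arc, Mutex};

/// Returned when the subroutine did not hand its connection back.
#[derive(Debug, PartialEq)]
pub struct SessionIncomplete;

/// Toy subroutine channel: owns the connection until `close` returns it.
pub struct Sub<C> {
    conn: C,
    slot: Arc<Mutex<Option<C>>>,
}

impl<C> Sub<C> {
    /// Access the underlying connection while the subroutine runs.
    pub fn conn(&mut self) -> &mut C {
        &mut self.conn
    }

    /// Finish the subroutine, placing the connection where the caller can find it.
    pub fn close(self) {
        let Sub { conn, slot } = self;
        *slot.lock().unwrap() = Some(conn);
    }
}

/// Run `first` as a subroutine, then try to recover the connection.
pub fn call<C, T>(
    conn: C,
    first: impl FnOnce(Sub<C>) -> T,
) -> (T, Result<C, SessionIncomplete>) {
    let slot = Arc::new(Mutex::new(None));
    let out = first(Sub { conn, slot: Arc::clone(&slot) });
    let recovered = slot.lock().unwrap().take();
    (out, recovered.ok_or(SessionIncomplete))
}

/// One subroutine that closes its channel, and one that merely drops it.
pub fn demo() -> (
    Result<Vec<String>, SessionIncomplete>,
    Result<Vec<String>, SessionIncomplete>,
) {
    let ((), ok) = call(Vec::<String>::new(), |mut c| {
        c.conn().push("Hello!".to_string());
        c.close();
    });
    let ((), bad) = call(Vec::<String>::new(), |c| drop(c));
    (ok, bad)
}
```

In this model the first call yields the connection back (with the message recorded in it), while the second yields SessionIncomplete, which is the same two-level result shape as the Result<(T, Result<Chan<Q, Tx, Rx>, SessionIncomplete<Tx, Rx>>), E> signature above, minus the outer error.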
pub async fn split<T, E, P, Q, R, F, Fut>(
self,
with_parts: F
) -> Result<(T, Result<Chan<R, Tx, Rx>, SessionIncomplete<Tx, Rx>>), E> where
S: Session<Action = Split<P, Q, R>>,
P: Session,
Q: Session,
R: Session,
F: FnOnce(Chan<P, Tx, Unavailable>, Chan<Q, Unavailable, Rx>) -> Fut,
Fut: Future<Output = Result<T, E>>,
Split a channel into transmit-only and receive-only ends and manipulate them, potentially concurrently, in the given closure.
This is akin to call, except the closure is given two Chans: one which can only do Transmit operations (Send and Choose) and one which can only do Receive operations (Recv and Offer). The result of the call to split is a re-unified Chan ready to execute the session R.
Errors
The closure must finish the session for both the send-only and receive-only ends of the channel and drop or close each end before the future completes. If either end is dropped before finishing its session, or is not closed after finishing its session, a SessionIncomplete error will be returned instead of a finished channel.
Examples
In this example, both ends of a channel concurrently interact with its split send/receive halves. If the underlying channel implementation allows for parallelism, this simultaneous interaction can be faster than sequentially sending data back and forth.
    use dialectic::prelude::*;
    use dialectic_tokio_mpsc as mpsc;

    type SendAndRecv = Session! {
        split {
            -> send Vec<usize>,
            <- recv String,
        }
    };

    let (c1, c2) = SendAndRecv::channel(|| mpsc::channel(1));

    // Spawn a task to simultaneously send a `Vec<usize>` and receive a `String`:
    let t1 = tokio::spawn(async move {
        c1.split(|tx, rx| async move {
            let send_vec = tokio::spawn(async move {
                tx.send(vec![1, 2, 3, 4, 5]).await?;
                Ok::<_, mpsc::Error>(())
            });
            let recv_string = tokio::spawn(async move {
                let (string, _) = rx.recv().await?;
                Ok::<_, mpsc::Error>(string)
            });
            send_vec.await.unwrap()?;
            let string = recv_string.await.unwrap()?;
            Ok::<_, mpsc::Error>(string)
        }).await
    });

    // Simultaneously *receive* a `Vec<usize>` *from*, and *send* a `String` *to*,
    // the task above:
    c2.split(|tx, rx| async move {
        let send_string = tokio::spawn(async move {
            tx.send("Hello!".to_string()).await?;
            Ok::<_, mpsc::Error>(())
        });
        let recv_vec = tokio::spawn(async move {
            let (vec, _) = rx.recv().await?;
            Ok::<_, mpsc::Error>(vec)
        });

        // Examine the result values:
        send_string.await??;
        let vec = recv_vec.await??;
        let string = t1.await??.0;
        assert_eq!(vec, &[1, 2, 3, 4, 5]);
        assert_eq!(string, "Hello!");

        Ok::<_, Box<dyn std::error::Error>>(())
    }).await?;
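The essence of splitting, namely using the transmit half and the receive half independently and then putting them back together, can also be sketched with plain threads. This is a std-only illustration with blocking channels, not dialectic's mechanism: split_exchange and demo are hypothetical names, and the sketch does no session-type checking at all.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Send `msg` on one thread while receiving on another, then return the
/// received value together with both halves so they can be reused.
pub fn split_exchange<Out, In>(
    tx: Sender<Out>,
    rx: Receiver<In>,
    msg: Out,
) -> (In, Sender<Out>, Receiver<In>)
where
    Out: Send + 'static,
    In: Send + 'static,
{
    // Transmit-only half: runs on its own thread and returns the sender when done.
    let send_half = thread::spawn(move || {
        tx.send(msg).expect("peer hung up");
        tx
    });
    // Receive-only half: returns the receiver alongside the message it got.
    let recv_half = thread::spawn(move || {
        let got = rx.recv().expect("peer hung up");
        (got, rx)
    });
    // Re-unify the two halves once both threads have finished.
    let tx = send_half.join().expect("send half panicked");
    let (got, rx) = recv_half.join().expect("receive half panicked");
    (got, tx, rx)
}

/// Two parties exchange a `Vec<usize>` and a `String` in both directions at once.
pub fn demo() -> (Vec<usize>, String) {
    let (tx1, rx2) = channel::<Vec<usize>>();
    let (tx2, rx1) = channel::<String>();
    // Party 1 runs on its own thread; party 2 runs on the current one.
    let party1 = thread::spawn(move || split_exchange(tx1, rx1, vec![1, 2, 3, 4, 5]).0);
    let (numbers, _tx2, _rx2) = split_exchange(tx2, rx2, "Hello!".to_string());
    let string = party1.join().expect("party 1 panicked");
    (numbers, string)
}
```

Neither direction waits on the other here, which is the property that lets a split session overlap sending and receiving when the transport supports it.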
pub fn into_inner(self) -> (Tx, Rx)
Unwrap a channel into its transmit and receive ends, exiting the regimen of session typing, potentially before the end of the session.
Errors
If this function is used before the end of a session, it may result in errors when the other end of the channel attempts to continue the session.
Examples
    use dialectic::prelude::*;
    use dialectic_tokio_mpsc as mpsc;

    let (c1, c2) = <Session! { send String }>::channel(mpsc::unbounded_channel);
    let (tx1, rx1) = c1.into_inner();
    let (tx2, rx2) = c2.into_inner();
Trait Implementations
impl<S: Session, Tx: Send + 'static, Rx: Send + 'static> Debug for Chan<S, Tx, Rx> where
Tx: Debug,
Rx: Debug,
impl<Tx, Rx, S> Drop for Chan<S, Tx, Rx> where
Tx: Send + 'static,
Rx: Send + 'static,
S: Session,
Auto Trait Implementations
impl<S, Tx, Rx> RefUnwindSafe for Chan<S, Tx, Rx> where
Rx: RefUnwindSafe,
Tx: RefUnwindSafe,
impl<S, Tx, Rx> Send for Chan<S, Tx, Rx>
impl<S, Tx, Rx> Sync for Chan<S, Tx, Rx> where
Rx: Sync,
Tx: Sync,
impl<S, Tx, Rx> Unpin for Chan<S, Tx, Rx> where
Rx: Unpin,
Tx: Unpin,
impl<S, Tx, Rx> UnwindSafe for Chan<S, Tx, Rx> where
Rx: UnwindSafe,
Tx: UnwindSafe,
Blanket Implementations
impl<T> Any for T where
T: 'static + ?Sized,
impl<'a, T, S> As<'a, Val, T> for S where
S: Into<T>,
pub fn as_convention(this: S) -> T
impl<T> Borrow<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
pub fn borrow_mut(&mut self) -> &mut T
impl<'a, T> By<'a, Mut> for T where
T: 'a,
impl<'a, T> By<'a, Ref> for T where
T: 'a,
impl<'a, T> By<'a, Val> for T
type Type = T
The type of Self when called by Convention.
impl<'a, T> Convert<'a, Mut, Mut> for T where
T: 'a,
impl<'a, T> Convert<'a, Mut, Ref> for T where
T: 'a,
impl<'a, T> Convert<'a, Ref, Ref> for T where
T: 'a,
impl<'a, T> Convert<'a, Val, Val> for T
impl<T> From<T> for T
impl<T, U> Into<U> for T where
U: From<T>,
impl<T, U> TryFrom<U> for T where
U: Into<T>,
type Error = Infallible
The type returned in the event of a conversion error.
pub fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>
impl<T, U> TryInto<U> for T where
U: TryFrom<T>,