[−][src]Struct dialectic::canonical::CanonicalChan
A bidirectional communications channel using the session type P
over the connections Tx
and
Rx
. ⚠️ Important: in type signatures, always write the type synonym Chan
,
not CanonicalChan
directly. Read more
here.
The fourth parameter, E
, to a CanonicalChan
is a type-level list describing the session
environment of the channel: the stack of Loop
s the channel has entered. When a loop
repeats, the next session type is retrieved by selecting the N
th element of this list.
Creating new Chan
s: use NewSession
To construct a new Chan
, use one of the static methods of NewSession
on
the session type for which you want to create a channel. Here, we create two Chan
s with the
session type Send<String>
and its dual Recv<String>
, wrapping an underlying bidirectional
transport built from a pair of tokio::sync::mpsc::channel
s:
use dialectic::prelude::*; use dialectic::backend::mpsc; // Make a pair of channels: // - `c1` with the session type `Send<String>`, and // - `c2` with the dual session type `Recv<String>` let (c1, c2) = <Send<String>>::channel(|| mpsc::channel(1));
If you already have a sender and receiver and want to wrap them in a Chan
, use the
wrap
method for a session type. This is useful, for example, if
you're talking to another process over a network connection, where it's not possible to build
both halves of the channel on one computer, and instead each computer will wrap one end of the
connection:
let (tx, rx) = mpsc::channel(1); let c = <Send<String>>::wrap(tx, rx);
Implementations
impl<Tx: Send + 'static, Rx: Send + 'static> CanonicalChan<Tx, Rx, Done, ()>
[src]
pub fn close(self)
[src]
Close a finished session, dropping the underlying connections.
If called inside a future given to split
or
seq
, the underlying connections are implicitly recovered for use in
subsequent actions in the session, or if called in a future given to
over
, are returned to the caller.
Examples
Starting with a channel whose session type is already Done
, we can immediately close the
channel.
use dialectic::prelude::*; use dialectic::backend::mpsc; let (c1, c2) = Done::channel(mpsc::unbounded_channel); c1.close(); c2.close();
However, if the channel's session type is not Done
, it is a type error to attempt to
close the channel. The following code will not compile:
use dialectic::prelude::*; use dialectic::backend::mpsc; let (c1, c2) = <Loop<Send<String, Continue>>>::channel(mpsc::unbounded_channel); c1.close(); c2.close();
If you really want to destruct a channel before the end of its session, use
unwrap
, but beware that this may cause the party on the other end
of the channel to throw errors due to your violation of the channel's protocol!
impl<'a, Tx, Rx, E, T, P> CanonicalChan<Tx, Rx, Recv<T, P>, E> where
Tx: Send + 'static,
Rx: Receive<T> + Send + 'static,
T: Send + 'static,
P: Actionable<E>,
E: Environment,
[src]
Tx: Send + 'static,
Rx: Receive<T> + Send + 'static,
T: Send + 'static,
P: Actionable<E>,
E: Environment,
pub async fn recv(self) -> Result<(T, Chan<Tx, Rx, P, E>), Rx::Error>
[src]
Receive something of type T
on the channel, returning the pair of the received object and
the channel.
Errors
This function returns the Receive::Error
for the underlying Rx
connection if there was
an error while receiving.
Examples
use dialectic::prelude::*; use dialectic::backend::mpsc; let (c1, c2) = <Recv<String>>::channel(|| mpsc::channel(1)); c2.send("Hello, world!".to_string()).await?; let (s, c1) = c1.recv().await?; assert_eq!(s, "Hello, world!");
impl<'a, Tx, Rx, E, T: 'static, P> CanonicalChan<Tx, Rx, Send<T, P>, E> where
Tx: Send + 'static,
Rx: Send + 'static,
P: Actionable<E>,
E: Environment,
[src]
Tx: Send + 'static,
Rx: Send + 'static,
P: Actionable<E>,
E: Environment,
pub async fn send<'b, Convention: CallingConvention>(
self,
message: <T as CallBy<'b, Convention>>::Type
) -> Result<Chan<Tx, Rx, P, E>, <Tx as Transmit<T, Convention>>::Error> where
Tx: Transmit<T, Convention>,
T: CallBy<'b, Convention>,
<T as CallBy<'b, Convention>>::Type: Send,
[src]
self,
message: <T as CallBy<'b, Convention>>::Type
) -> Result<Chan<Tx, Rx, P, E>, <Tx as Transmit<T, Convention>>::Error> where
Tx: Transmit<T, Convention>,
T: CallBy<'b, Convention>,
<T as CallBy<'b, Convention>>::Type: Send,
Send something of type T
on the channel, returning the channel.
The underlying sending channel Tx
may be able to send a T
using multiple different
CallingConvention
s: by Val
, by Ref
and/or by Mut
. To disambiguate, use
"turbofish" syntax when calling send
, e.g. chan.send::<Val>(1)
or
chan.send::<Ref>(&true)
.
Errors
This function returns the Transmit::Error
for the underlying Tx
connection if there
was an error while sending.
Examples
use dialectic::prelude::*; use dialectic::backend::mpsc; let (c1, c2) = <Send<String>>::channel(|| mpsc::channel(1)); c1.send("Hello, world!".to_string()).await?; let (s, c2) = c2.recv().await?; assert_eq!(s, "Hello, world!");
impl<Tx, Rx, E, Choices: 'static> CanonicalChan<Tx, Rx, Choose<Choices>, E> where
Tx: Transmit<Choice<<Choices::AsList as HasLength>::Length>, Val> + Send + 'static,
Rx: Send + 'static,
Choices: Tuple,
Choices::AsList: HasLength,
<Choices::AsList as HasLength>::Length: Send,
<Choices::AsList as EachSession>::Dual: List,
E: Environment,
Choices::AsList: EachScoped<E::Depth>,
[src]
Tx: Transmit<Choice<<Choices::AsList as HasLength>::Length>, Val> + Send + 'static,
Rx: Send + 'static,
Choices: Tuple,
Choices::AsList: HasLength,
<Choices::AsList as HasLength>::Length: Send,
<Choices::AsList as EachSession>::Dual: List,
E: Environment,
Choices::AsList: EachScoped<E::Depth>,
pub async fn choose<N: Unary>(
self,
_choice: N
) -> Result<Chan<Tx, Rx, <Choices::AsList as Select<N>>::Selected, E>, Tx::Error> where
N: LessThan<_128>,
Choices::AsList: Select<N>,
<Choices::AsList as Select<N>>::Selected: Actionable<E>,
[src]
self,
_choice: N
) -> Result<Chan<Tx, Rx, <Choices::AsList as Select<N>>::Selected, E>, Tx::Error> where
N: LessThan<_128>,
Choices::AsList: Select<N>,
<Choices::AsList as Select<N>>::Selected: Actionable<E>,
Actively choose to enter the N
th protocol offered via offer!
by the
other end of the connection, alerting the other party to this choice by sending the number
N
over the channel.
The choice N
is specified as a type-level Unary
number. Predefined constants for all
supported numbers of choices (up to a maximum of 127) are available in the
constants
module, each named for its corresponding
decimal number prefixed with an underscore (e.g. _0
, or _42
).
Errors
This function returns the Transmit::Error
for the underlying Tx
connection if there
was an error while sending the choice.
Examples
use dialectic::prelude::*; use dialectic::backend::mpsc; type GiveOrTake = Choose<(Send<i64>, Recv<String>)>; let (c1, c2) = GiveOrTake::channel(|| mpsc::channel(1)); // Spawn a thread to offer a choice let t1 = tokio::spawn(async move { offer!(c2 => { _0 => { c2.recv().await?; }, _1 => { c2.send("Hello!".to_string()).await?; }, }); Ok::<_, mpsc::Error>(()) }); // Choose to send an integer c1.choose(_0).await?.send(42).await?; // Wait for the offering thread to finish t1.await??;
Attempting to choose an index that's out of bounds results in a compile-time error:
use dialectic::prelude::*; use dialectic::backend::mpsc; type OnlyTwoChoices = Choose<(Done, Done)>; let (c1, c2) = OnlyTwoChoices::channel(|| mpsc::channel(1)); // Try to choose something out of range (this doesn't typecheck) c1.choose(_2).await?;
impl<'a, Tx, Rx, E, Choices: 'static> CanonicalChan<Tx, Rx, Offer<Choices>, E> where
Tx: Send + 'static,
Rx: Receive<Choice<<Choices::AsList as HasLength>::Length>> + Send + 'static,
Choices: Tuple,
Choices::AsList: HasLength,
<Choices::AsList as EachSession>::Dual: List,
E: Environment,
Choices::AsList: EachActionable<E>,
Choices::AsList: EachScoped<E::Depth>,
_0: LessThan<<Choices::AsList as HasLength>::Length>,
[src]
Tx: Send + 'static,
Rx: Receive<Choice<<Choices::AsList as HasLength>::Length>> + Send + 'static,
Choices: Tuple,
Choices::AsList: HasLength,
<Choices::AsList as EachSession>::Dual: List,
E: Environment,
Choices::AsList: EachActionable<E>,
Choices::AsList: EachScoped<E::Depth>,
_0: LessThan<<Choices::AsList as HasLength>::Length>,
pub async fn offer(self) -> Result<Branches<Tx, Rx, Choices, E>, Rx::Error>
[src]
Offer the choice of one or more protocols to the other party, and wait for them to indicate, by sending a number, which protocol to proceed with.
💡 Where possible, prefer the offer!
macro. This has the benefit of
ensuring at compile time that no case is left unhandled; it's also more succinct.
Errors
This function returns the Receive::Error
for the underlying Rx
connection if there was
an error while receiving.
Examples
use dialectic::prelude::*; use dialectic::backend::mpsc; type GiveOrTake = Choose<(Send<i64>, Recv<String>)>; let (c1, c2) = GiveOrTake::channel(|| mpsc::channel(1)); // Spawn a thread to offer a choice let t1 = tokio::spawn(async move { match c2.offer().await?.case() { Ok(c2) => { c2.recv().await?; }, Err(rest) => match rest.case() { Ok(c2) => { c2.send("Hello!".to_string()).await?; }, Err(rest) => rest.empty_case(), } } Ok::<_, mpsc::Error>(()) }); // Choose to send an integer c1.choose(_0).await?.send(42).await?; // Wait for the offering thread to finish t1.await??;
Notice how the handling of cases by manual match
is harder to read than the equivalent in
terms of offer!
:
offer!(c2 => { _0 => { c2.recv().await?; }, _1 => { c2.send("Hello!".to_string()).await?; }, });
impl<'a, Tx, Rx, E, P, Q> CanonicalChan<Tx, Rx, Split<P, Q>, E> where
Tx: Send + 'static,
Rx: Send + 'static,
P: Actionable<E>,
Q: Actionable<E>,
E: Environment,
[src]
Tx: Send + 'static,
Rx: Send + 'static,
P: Actionable<E>,
Q: Actionable<E>,
E: Environment,
pub async fn split<T, Err, F, Fut>(
self,
with_parts: F
) -> Result<(T, Result<Chan<Tx, Rx, Done>, SessionIncomplete<Tx, Rx>>), Err> where
F: FnOnce(Chan<Available<Tx>, Unavailable<Rx>, P, E>, Chan<Unavailable<Tx>, Available<Rx>, Q, E>) -> Fut,
Fut: Future<Output = Result<T, Err>>,
[src]
self,
with_parts: F
) -> Result<(T, Result<Chan<Tx, Rx, Done>, SessionIncomplete<Tx, Rx>>), Err> where
F: FnOnce(Chan<Available<Tx>, Unavailable<Rx>, P, E>, Chan<Unavailable<Tx>, Available<Rx>, Q, E>) -> Fut,
Fut: Future<Output = Result<T, Err>>,
Split a channel into transmit-only and receive-only ends and manipulate them, potentially concurrently, in the given closure.
To use the channel as a reunited whole after it has been split, combine this operation with
seq
to sequence further operations after it.
Errors
The closure must finish the session for both the send-only and receive-only ends of the
channel and drop or close
each end before the future completes.
If either end is dropped before finishing its session, or is neither dropped nor closed after finishing its
session, a SessionIncomplete
error will be returned instead of a finished channel.
Examples
In this example, both ends of a channel concurrently interact with its split send/receive halves. If the underlying channel implementation allows for parallelism, this simultaneous interaction can be faster than sequentially sending data back and forth.
use dialectic::prelude::*; use dialectic::backend::mpsc; type SendAndRecv = Split<Send<Vec<usize>>, Recv<String>>; let (c1, c2) = SendAndRecv::channel(|| mpsc::channel(1)); // Spawn a thread to simultaneously send a `Vec<usize>` and receive a `String`: let t1 = tokio::spawn(async move { c1.split(|tx, rx| async move { let send_vec = tokio::spawn(async move { tx.send(vec![1, 2, 3, 4, 5]).await?; Ok::<_, mpsc::Error>(()) }); let recv_string = tokio::spawn(async move { let (string, _) = rx.recv().await?; Ok::<_, mpsc::Error>(string) }); send_vec.await.unwrap()?; let string = recv_string.await.unwrap()?; Ok::<_, mpsc::Error>(string) }).await }); // Simultaneously *receive* a `Vec<usize>` *from*, and *send* a `String` *to*, // the task above: c2.split(|tx, rx| async move { let send_string = tokio::spawn(async move { tx.send("Hello!".to_string()).await?; Ok::<_, mpsc::Error>(()) }); let recv_vec = tokio::spawn(async move { let (vec, _) = rx.recv().await?; Ok::<_, mpsc::Error>(vec) }); // Examine the result values: send_string.await??; let vec = recv_vec.await??; let string = t1.await??.0; assert_eq!(vec, &[1, 2, 3, 4, 5]); assert_eq!(string, "Hello!"); Ok::<_, Box<dyn std::error::Error>>(()) }).await?;
impl<'a, Tx, Rx, E, P, Q> CanonicalChan<Tx, Rx, Seq<P, Q>, E> where
Tx: Send + 'static,
Rx: Send + 'static,
P: Actionable<E>,
Q: Actionable<E>,
E: Environment,
[src]
Tx: Send + 'static,
Rx: Send + 'static,
P: Actionable<E>,
Q: Actionable<E>,
E: Environment,
pub async fn seq<T, Err, F, Fut>(
self,
first: F
) -> Result<(T, Result<Chan<Tx, Rx, Q, E>, SessionIncomplete<Tx, Rx>>), Err> where
F: FnOnce(Chan<Tx, Rx, P, E>) -> Fut,
Fut: Future<Output = Result<T, Err>>,
[src]
self,
first: F
) -> Result<(T, Result<Chan<Tx, Rx, Q, E>, SessionIncomplete<Tx, Rx>>), Err> where
F: FnOnce(Chan<Tx, Rx, P, E>) -> Fut,
Fut: Future<Output = Result<T, Err>>,
Sequence an arbitrary session P
before another session Q
.
This operation takes as input an asynchronous closure that runs a channel for the session
type P
to completion and returns either an error Err
or some result value T
. The
result of this (provided that no errors occurred during P
) is a channel ready to execute
the session type Q
.
Errors
The closure must finish the session P
on the channel given to it and drop the finished
channel before the future returns. If the channel is dropped before completing P
or is not
dropped after completing P
, a SessionIncomplete
error will be returned instead of a
channel for Q
. The best way to ensure this error does not occur is to call
close
on the channel before returning from the future, because
this statically checks that the session is complete and drops the channel.
Additionally, this function returns an Err
if the closure returns an Err
.
Examples
This can be used to cleanly modularize a session-typed program by splitting it up into independent subroutines:
use dialectic::prelude::*; use dialectic::backend::mpsc; let (c1, c2) = <Seq<Send<String>, Send<String>>>::channel(mpsc::unbounded_channel); let ((), c1_result) = c1.seq(|c| async move { let c = c.send("Hello!".to_string()).await?; // Because we're done with this subroutine, we can "close" the channel here, but it // will remain open to the calling context so it can run the rest of the session: c.close(); Ok::<_, mpsc::Error>(()) }).await?; let c1 = c1_result?; let c1 = c1.send("World!".to_string()).await?; c1.close();
An advanced example: context-free sessions
More generally, Seq
allows for arbitrary context-free session types, by permitting
multiple Continue
s to be sequenced together. In the following example, we define and
implement a session type for valid operations on a stack: that is, the session type
statically prevents programs that would attempt to pop from an empty stack. This session
type would not be expressible without Seq
, because it requires a point of recursion that
is not at the end of a session type.
use std::{marker, error::Error, fmt::Debug, future::Future, pin::Pin, any::Any}; type Stack<T> = Loop<Offer<(Break, Recv<T, Seq<Continue, Send<T, Continue>>>)>>; // A server over the `mpsc` backend for the `Stack<T>` protocol fn stack<T>( mut chan: mpsc::Chan<Stack<T>>, ) -> Pin<Box<dyn Future<Output = Result<(), mpsc::Error>> + marker::Send>> where T: marker::Send + 'static, { Box::pin(async move { loop { chan = offer!(chan => { // Client doesn't want to push a value _0 => break chan.close(), // Client wants to push a value _1 => { let (t, chan) = chan.recv().await?; // Receive pushed value let ((), chan) = chan.seq(stack).await?; // Recursively do `Stack<T>` chan.unwrap().send(t).await? // Send back that pushed value }, }) } Ok(()) }) } // A client over the `mpsc` backend for the `Stack<T>` protocol, which uses the // server's stack to reverse a given iterator fn reverse_with_stack<T>( mut chan: mpsc::Chan<<Stack<T> as Session>::Dual>, mut iter: impl Iterator<Item = T> + marker::Send + 'static, ) -> Pin<Box<dyn Future<Output = Result<Vec<T>, mpsc::Error>> + marker::Send>> where T: marker::Send + 'static, { Box::pin(async move { if let Some(t) = iter.next() { // If there is a value left in the iterator... let (mut reversed, chan) = chan.choose(_1).await? // Choose to push a value .send(t).await? // Push the value // Recursively push the rest of the iterator .seq(|chan| reverse_with_stack(chan, iter)).await?; let (t, chan) = chan.unwrap().recv().await?; // Pop a value reversed.push(t); // Add it to the reversed `Vec` chan.choose(_0).await?.close(); // Choose to complete the session Ok(reversed) } else { // If there are no values left in the iterator... chan.choose(_0).await?.close(); // Choose to complete the session Ok(vec![]) } }) } // Using the server and client above, let's reverse a list! 
let (server_chan, client_chan) = <Stack<usize>>::channel(|| mpsc::channel(1)); let server_thread = tokio::spawn(stack(server_chan)); let input = vec![1, 2, 3, 4, 5].into_iter(); let result = reverse_with_stack(client_chan, input).await?; assert_eq!(result, vec![5, 4, 3, 2, 1]); server_thread.await?;
For more on context-free session types, see "Context-Free Session Type Inference" by Luca
Padovani: https://doi.org/10.1145/3229062. When comparing with that paper, note that the
seq
operator is roughly equivalent to its @=
operator, and the
Seq
type is equivalent to ;
.
impl<'a, Tx: 'a, Rx: 'a, E, P> CanonicalChan<Tx, Rx, P, E> where
Tx: Send + 'static,
Rx: Send + 'static,
P: Actionable<E, Action = P, Env = E>,
E: Environment,
[src]
Tx: Send + 'static,
Rx: Send + 'static,
P: Actionable<E, Action = P, Env = E>,
E: Environment,
pub fn unwrap(mut self: Self) -> (Tx, Rx)
[src]
Unwrap a channel into its transmit and receive ends, exiting the regimen of session typing, potentially before the end of the session.
Errors
If this function is used before the end of a session, it may result in errors when the other end of the channel attempts to continue the session.
Examples
use dialectic::prelude::*; use dialectic::backend::mpsc; let (c1, c2) = <Send<String>>::channel(mpsc::unbounded_channel); let (tx1, rx1) = c1.unwrap(); let (tx2, rx2) = c2.unwrap();
Trait Implementations
impl<Tx: Send + 'static, Rx: Send + 'static, P: Actionable<E, Action = P, Env = E>, E: Environment> Debug for CanonicalChan<Tx, Rx, P, E> where
Tx: Debug,
Rx: Debug,
[src]
Tx: Debug,
Rx: Debug,
impl<Tx, Rx, P, E> Drop for CanonicalChan<Tx, Rx, P, E> where
Tx: Send + 'static,
Rx: Send + 'static,
P: Actionable<E, Action = P, Env = E>,
E: Environment,
[src]
Tx: Send + 'static,
Rx: Send + 'static,
P: Actionable<E, Action = P, Env = E>,
E: Environment,
Auto Trait Implementations
impl<Tx, Rx, P, E = ()> !RefUnwindSafe for CanonicalChan<Tx, Rx, P, E>
[src]
impl<Tx, Rx, P, E> Send for CanonicalChan<Tx, Rx, P, E> where
E: Send,
P: Send,
[src]
E: Send,
P: Send,
impl<Tx, Rx, P, E = ()> !Sync for CanonicalChan<Tx, Rx, P, E>
[src]
impl<Tx, Rx, P, E> Unpin for CanonicalChan<Tx, Rx, P, E> where
E: Unpin,
P: Unpin,
Rx: Unpin,
Tx: Unpin,
[src]
E: Unpin,
P: Unpin,
Rx: Unpin,
Tx: Unpin,
impl<Tx, Rx, P, E = ()> !UnwindSafe for CanonicalChan<Tx, Rx, P, E>
[src]
Blanket Implementations
impl<T> Any for T where
T: 'static + ?Sized,
[src]
T: 'static + ?Sized,
impl<T> Borrow<T> for T where
T: ?Sized,
[src]
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
[src]
T: ?Sized,
pub fn borrow_mut(&mut self) -> &mut T
[src]
impl<'a, T> CallBy<'a, Mut> for T where
T: 'a,
[src]
T: 'a,
impl<'a, T> CallBy<'a, Ref> for T where
T: 'a,
[src]
T: 'a,
impl<'a, T> CallBy<'a, Val> for T
[src]
type Type = T
The type of Self
when called by Convention
.
impl<T> From<T> for T
[src]
impl<T, U> Into<U> for T where
U: From<T>,
[src]
U: From<T>,
impl<T, U> TryFrom<U> for T where
U: Into<T>,
[src]
U: Into<T>,
type Error = Infallible
The type returned in the event of a conversion error.
pub fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>
[src]
impl<T, U> TryInto<U> for T where
U: TryFrom<T>,
[src]
U: TryFrom<T>,