Struct dialectic::canonical::CanonicalChan

#[must_use]pub struct CanonicalChan<Tx: Send + 'static, Rx: Send + 'static, P: Actionable<E, Action = P, Env = E>, E: Environment = ()> { /* fields omitted */ }

A bidirectional communications channel using the session type P over the connections Tx and Rx.

⚠️ Important: in type signatures, always write the type synonym Chan, not CanonicalChan directly. Read more here.

The fourth E parameter to a CanonicalChan is a type-level list describing the session environment of the channel: the stack of Loops the channel has entered. When a loop repeats, the next session type is retrieved by selecting the Nth element of this list.
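The lookup described above can be pictured as indexing into a type-level list. The following is a minimal sketch using only the standard library, not dialectic's actual definitions: the names `Z`, `S`, `Select`, `InnerLoopBody`, `OuterLoopBody`, `Env` and `selected_name` are all hypothetical, and placing the innermost loop at the head of the list is an assumption made for illustration.

```rust
use std::marker::PhantomData;

// Type-level unary numbers: `Z` is zero, `S<N>` is N + 1.
struct Z;
struct S<N>(PhantomData<N>);

// Type-level lists: `()` is the empty list, `(Head, Tail)` is a cons cell.
// `Select<N>` names the Nth element of such a list.
trait Select<N> {
    type Selected;
}

impl<Head, Tail> Select<Z> for (Head, Tail) {
    type Selected = Head;
}

impl<Head, Tail, N> Select<S<N>> for (Head, Tail)
where
    Tail: Select<N>,
{
    type Selected = <Tail as Select<N>>::Selected;
}

// Hypothetical stand-ins for the bodies of two nested loops.
struct InnerLoopBody;
struct OuterLoopBody;

// Assumption for this sketch: the innermost loop is at the head of the list.
type Env = (InnerLoopBody, (OuterLoopBody, ()));

// Report which type the Nth entry of `Env` resolves to.
fn selected_name<N>() -> &'static str
where
    Env: Select<N>,
{
    std::any::type_name::<<Env as Select<N>>::Selected>()
}
```

Here `selected_name::<Z>()` names the inner loop body and `selected_name::<S<Z>>()` names the outer one; asking for an index past the end of the list fails to compile because no `Select` implementation applies.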

Creating new Chans: use NewSession

To construct a new Chan, use one of the associated functions of NewSession on the session type for which you want to create a channel. Here, we create two Chans with the session type Send<String> and its dual Recv<String>, wrapping an underlying bidirectional transport built from a pair of tokio::sync::mpsc::channels:

use dialectic::prelude::*;
use dialectic::backend::mpsc;

// Make a pair of channels:
// - `c1` with the session type `Send<String>`, and
// - `c2` with the dual session type `Recv<String>`
let (c1, c2) = <Send<String>>::channel(|| mpsc::channel(1));

If you already have a sender and receiver and want to wrap them in a Chan, use the wrap method for a session type. This is useful, for example, when talking to another process over a network connection: both halves of the channel cannot be built on one computer, so each computer wraps its own end of the connection:

let (tx, rx) = mpsc::channel(1);
let c = <Send<String>>::wrap(tx, rx);

Implementations

impl<Tx: Send + 'static, Rx: Send + 'static> CanonicalChan<Tx, Rx, Done, ()>[src]

pub fn close(self)[src]

Close a finished session, dropping the underlying connections.

If called inside a future given to split or seq, the underlying connections are implicitly recovered for use in subsequent actions in the session; if called inside a future given to over, they are returned to the caller.

Examples

Starting with a channel whose session type is already Done, we can immediately close the channel.

use dialectic::prelude::*;
use dialectic::backend::mpsc;

let (c1, c2) = Done::channel(mpsc::unbounded_channel);
c1.close();
c2.close();

However, if the channel's session type is not Done, it is a type error to attempt to close the channel. The following code will not compile:

use dialectic::prelude::*;
use dialectic::backend::mpsc;

let (c1, c2) = <Loop<Send<String, Continue>>>::channel(mpsc::unbounded_channel);
c1.close();
c2.close();

If you really want to tear down a channel before the end of its session, use unwrap, but beware that this may cause the party on the other end of the channel to encounter errors due to your violation of the channel's protocol!
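The compile-time restriction on close is an instance of the typestate pattern: a method is defined only for the type that represents a finished session. The following is a minimal sketch of that idea over std::sync::mpsc, not dialectic's implementation; `TypedChan`, `SendMsg`, and `run_session` are hypothetical names introduced for illustration.

```rust
use std::marker::PhantomData;
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical session markers, much simpler than dialectic's.
struct Done;
struct SendMsg<T, Next>(PhantomData<(T, Next)>);

// A sender tagged with the session that remains to be run.
struct TypedChan<T, P> {
    tx: Sender<T>,
    session: PhantomData<P>,
}

impl<T, Next> TypedChan<T, SendMsg<T, Next>> {
    // Sending consumes the channel and returns it at the next session type.
    fn send(self, message: T) -> TypedChan<T, Next> {
        // Send failures are ignored to keep the sketch short.
        let _ = self.tx.send(message);
        TypedChan { tx: self.tx, session: PhantomData }
    }
}

impl<T> TypedChan<T, Done> {
    // `close` exists only when the remaining session is `Done`.
    fn close(self) {}
}

fn run_session() -> Vec<String> {
    let (tx, rx): (Sender<String>, Receiver<String>) = channel();
    let chan: TypedChan<String, SendMsg<String, SendMsg<String, Done>>> =
        TypedChan { tx, session: PhantomData };
    // chan.close(); // would not compile: the session is not yet `Done`
    let chan = chan.send("Hello".to_string());
    let chan = chan.send("world".to_string());
    chan.close();
    // The sender was dropped by `close`, so this iteration terminates.
    rx.iter().collect()
}
```

Because each `send` consumes the channel and returns one with a shorter session type, the only value on which `close` type-checks is the one returned by the final `send`.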

impl<'a, Tx, Rx, E, T, P> CanonicalChan<Tx, Rx, Recv<T, P>, E> where
    Tx: Send + 'static,
    Rx: Receive<T> + Send + 'static,
    T: Send + 'static,
    P: Actionable<E>,
    E: Environment
[src]

pub async fn recv(self) -> Result<(T, Chan<Tx, Rx, P, E>), Rx::Error>[src]

Receive something of type T on the channel, returning the pair of the received object and the channel.

Errors

This function returns the Receive::Error for the underlying Rx connection if there was an error while receiving.

Examples

use dialectic::prelude::*;
use dialectic::backend::mpsc;

let (c1, c2) = <Recv<String>>::channel(|| mpsc::channel(1));
c2.send("Hello, world!".to_string()).await?;

let (s, c1) = c1.recv().await?;
assert_eq!(s, "Hello, world!");

impl<'a, Tx, Rx, E, T: 'static, P> CanonicalChan<Tx, Rx, Send<T, P>, E> where
    Tx: Send + 'static,
    Rx: Send + 'static,
    P: Actionable<E>,
    E: Environment
[src]

pub async fn send<'b, Convention: CallingConvention>(
    self,
    message: <T as CallBy<'b, Convention>>::Type
) -> Result<Chan<Tx, Rx, P, E>, <Tx as Transmit<T, Convention>>::Error> where
    Tx: Transmit<T, Convention>,
    T: CallBy<'b, Convention>,
    <T as CallBy<'b, Convention>>::Type: Send
[src]

Send something of type T on the channel, returning the channel.

The underlying sending channel Tx may be able to send a T using multiple different CallingConventions: by Val, by Ref and/or by Mut. To disambiguate, use "turbofish" syntax when calling send, i.e. chan.send::<Val>(1) or chan.send::<Ref>(&true).
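To see why the turbofish is needed, it can help to model calling conventions in miniature. The sketch below is an assumption-laden simplification, not dialectic's real traits: `Val`, `Ref`, `CallBy`, and `Transmit` reuse names from this page but are redefined locally with simplified signatures, while `Log`, `SketchChan`, and `send_both_ways` are hypothetical. It shows one transport accepting a `String` both by value and by reference, with the convention selected at the call site.

```rust
use std::marker::PhantomData;

// Marker types naming two calling conventions.
struct Val;
struct Ref;

// Maps a convention to the argument type used to pass a `T`.
trait CallBy<'a, Convention> {
    type Type;
}
impl<'a, T> CallBy<'a, Val> for T {
    type Type = T;
}
impl<'a, T: 'a> CallBy<'a, Ref> for T {
    type Type = &'a T;
}

// A transport that can transmit a `T` using a particular convention.
trait Transmit<'a, T, Convention>
where
    T: CallBy<'a, Convention>,
{
    fn transmit(&mut self, message: <T as CallBy<'a, Convention>>::Type);
}

// Hypothetical transport that records every message it is given.
struct Log(Vec<String>);

impl<'a> Transmit<'a, String, Val> for Log {
    fn transmit(&mut self, message: String) {
        self.0.push(message);
    }
}
impl<'a> Transmit<'a, String, Ref> for Log {
    fn transmit(&mut self, message: &'a String) {
        self.0.push(message.clone());
    }
}

// A channel that fixes the message type `T` but not the convention.
struct SketchChan<Tx, T> {
    tx: Tx,
    message_type: PhantomData<T>,
}

impl<Tx, T> SketchChan<Tx, T> {
    fn send<'a, Convention>(&mut self, message: <T as CallBy<'a, Convention>>::Type)
    where
        T: CallBy<'a, Convention>,
        Tx: Transmit<'a, T, Convention>,
    {
        <Tx as Transmit<'a, T, Convention>>::transmit(&mut self.tx, message);
    }
}

fn send_both_ways() -> Vec<String> {
    let mut chan = SketchChan { tx: Log(Vec::new()), message_type: PhantomData::<String> };
    let borrowed = "by reference".to_string();
    chan.send::<Val>("by value".to_string()); // the argument type is `String`
    chan.send::<Ref>(&borrowed); // the argument type is `&String`
    chan.tx.0
}
```

The argument type of `send` is a projection that depends on `Convention`, so the compiler cannot work backwards from the argument to the convention; naming it explicitly resolves the ambiguity.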

Errors

This function returns the Transmit::Error for the underlying Tx connection if there was an error while sending.

Examples

use dialectic::prelude::*;
use dialectic::backend::mpsc;

let (c1, c2) = <Send<String>>::channel(|| mpsc::channel(1));
c1.send("Hello, world!".to_string()).await?;

let (s, c2) = c2.recv().await?;
assert_eq!(s, "Hello, world!");

impl<Tx, Rx, E, Choices: 'static> CanonicalChan<Tx, Rx, Choose<Choices>, E> where
    Tx: Transmit<Choice<<Choices::AsList as HasLength>::Length>, Val> + Send + 'static,
    Rx: Send + 'static,
    Choices: Tuple,
    Choices::AsList: HasLength,
    <Choices::AsList as HasLength>::Length: Send,
    <Choices::AsList as EachSession>::Dual: List,
    E: Environment,
    Choices::AsList: EachScoped<E::Depth>, 
[src]

pub async fn choose<N: Unary>(
    self,
    _choice: N
) -> Result<Chan<Tx, Rx, <Choices::AsList as Select<N>>::Selected, E>, Tx::Error> where
    N: LessThan<_128>,
    Choices::AsList: Select<N>,
    <Choices::AsList as Select<N>>::Selected: Actionable<E>, 
[src]

Actively choose to enter the Nth protocol offered via offer! by the other end of the connection, alerting the other party to this choice by sending the number N over the channel.

The choice N is specified as a type-level Unary number. Predefined constants for all supported numbers of choices (up to a maximum of 127) are available in the constants module, each named for its corresponding decimal number prefixed with an underscore (e.g. _0, or _42).
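As a rough illustration of how a type-level number can both be bounds-checked at compile time and turned into the runtime number that is sent, consider the following sketch. It is written against the standard library only and is not dialectic's implementation: `Z`, `S`, `Unary`, `LessThan`, and the constants are redefined locally in simplified form, and `encode_choice` and `Two` are hypothetical.

```rust
use std::marker::PhantomData;

// Type-level unary numbers: `Z` is zero, `S<N>` is N + 1.
struct Z;
struct S<N>(PhantomData<N>);

// Each unary number knows its runtime value.
trait Unary {
    const VALUE: u8;
}
impl Unary for Z {
    const VALUE: u8 = 0;
}
impl<N: Unary> Unary for S<N> {
    const VALUE: u8 = N::VALUE + 1;
}

// `M: LessThan<N>` holds exactly when M < N.
trait LessThan<Bound> {}
impl<N> LessThan<S<N>> for Z {}
impl<M, N> LessThan<S<N>> for S<M> where M: LessThan<N> {}

// Value-level constants, named like the ones described above.
const _0: Z = Z;
const _1: S<Z> = S(PhantomData);
const _2: S<S<Z>> = S(PhantomData);

// The number of available choices in this sketch.
type Two = S<S<Z>>;

// Accept a choice only if it is below `Length`, and return the number
// that would be sent to the other party.
fn encode_choice<N, Length>(_choice: N) -> u8
where
    N: Unary + LessThan<Length>,
{
    N::VALUE
}
```

With these definitions, `encode_choice::<_, Two>(_1)` evaluates to `1`, whereas `encode_choice::<_, Two>(_2)` is rejected by the type checker because `S<S<Z>>: LessThan<S<S<Z>>>` has no implementation.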

Errors

This function returns the Transmit::Error for the underlying Tx connection if there was an error while sending the choice.

Examples

use dialectic::prelude::*;
use dialectic::backend::mpsc;

type GiveOrTake = Choose<(Send<i64>, Recv<String>)>;

let (c1, c2) = GiveOrTake::channel(|| mpsc::channel(1));

// Spawn a task to offer a choice
let t1 = tokio::spawn(async move {
    offer!(c2 => {
        _0 => { c2.recv().await?; },
        _1 => { c2.send("Hello!".to_string()).await?; },
    });
    Ok::<_, mpsc::Error>(())
});

// Choose to send an integer
c1.choose(_0).await?.send(42).await?;

// Wait for the offering task to finish
t1.await??;

Attempting to choose an index that's out of bounds results in a compile-time error:

use dialectic::prelude::*;
use dialectic::backend::mpsc;

type OnlyTwoChoices = Choose<(Done, Done)>;
let (c1, c2) = OnlyTwoChoices::channel(|| mpsc::channel(1));

// Try to choose something out of range (this doesn't typecheck)
c1.choose(_2).await?;

impl<'a, Tx, Rx, E, Choices: 'static> CanonicalChan<Tx, Rx, Offer<Choices>, E> where
    Tx: Send + 'static,
    Rx: Receive<Choice<<Choices::AsList as HasLength>::Length>> + Send + 'static,
    Choices: Tuple,
    Choices::AsList: HasLength,
    <Choices::AsList as EachSession>::Dual: List,
    E: Environment,
    Choices::AsList: EachActionable<E>,
    Choices::AsList: EachScoped<E::Depth>,
    _0: LessThan<<Choices::AsList as HasLength>::Length>, 
[src]

pub async fn offer(self) -> Result<Branches<Tx, Rx, Choices, E>, Rx::Error>[src]

Offer the choice of one or more protocols to the other party, and wait for them to indicate, by sending a number, which protocol to proceed with.

💡 Where possible, prefer the offer! macro. This has the benefit of ensuring at compile time that no case is left unhandled; it's also more succinct.

Errors

This function returns the Receive::Error for the underlying Rx connection if there was an error while receiving.

Examples

use dialectic::prelude::*;
use dialectic::backend::mpsc;

type GiveOrTake = Choose<(Send<i64>, Recv<String>)>;

let (c1, c2) = GiveOrTake::channel(|| mpsc::channel(1));

// Spawn a task to offer a choice
let t1 = tokio::spawn(async move {
    match c2.offer().await?.case() {
        Ok(c2) => { c2.recv().await?; },
        Err(rest) => match rest.case() {
            Ok(c2) => { c2.send("Hello!".to_string()).await?; },
            Err(rest) => rest.empty_case(),
        }
    }
    Ok::<_, mpsc::Error>(())
});

// Choose to send an integer
c1.choose(_0).await?.send(42).await?;

// Wait for the offering task to finish
t1.await??;

Notice how the handling of cases by manual match is harder to read than the equivalent in terms of offer!:

offer!(c2 => {
    _0 => { c2.recv().await?; },
    _1 => { c2.send("Hello!".to_string()).await?; },
});
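The nested match in the manual version works by peeling one candidate protocol off at a time. The sketch below models that peeling with the standard library only; it is not dialectic's implementation. `Branches` is redefined locally in simplified form (its `case` returns a protocol name rather than a channel), and `Protocol`, `RecvInt`, `SendString`, and `chosen_protocol` are hypothetical.

```rust
use std::marker::PhantomData;

// Hypothetical protocol markers; each reports a name so the result is observable.
trait Protocol {
    const NAME: &'static str;
}
struct RecvInt;
struct SendString;
impl Protocol for RecvInt {
    const NAME: &'static str = "RecvInt";
}
impl Protocol for SendString {
    const NAME: &'static str = "SendString";
}

// The index received from the other party, tagged with the protocols still
// under consideration: `()` is the empty list, `(Head, Tail)` is a cons cell.
struct Branches<Choices> {
    index: u8,
    choices: PhantomData<Choices>,
}

impl<Head: Protocol, Tail> Branches<(Head, Tail)> {
    // Either the first remaining protocol was chosen, or the rest must be examined.
    fn case(self) -> Result<&'static str, Branches<Tail>> {
        if self.index == 0 {
            Ok(Head::NAME)
        } else {
            Err(Branches { index: self.index - 1, choices: PhantomData })
        }
    }
}

impl Branches<()> {
    // No protocols remain, so this point can be reached only if the
    // received index was out of range, which this sketch assumes never happens.
    fn empty_case<T>(self) -> T {
        unreachable!("the received index is assumed to be in range")
    }
}

fn chosen_protocol(index: u8) -> &'static str {
    let branches: Branches<(RecvInt, (SendString, ()))> =
        Branches { index, choices: PhantomData };
    match branches.case() {
        Ok(name) => name,
        Err(rest) => match rest.case() {
            Ok(name) => name,
            Err(rest) => rest.empty_case(),
        },
    }
}
```

Each call to `case` shortens the list in the type of the remaining branches, so the compiler only permits `empty_case` once every protocol has been handled.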

impl<'a, Tx, Rx, E, P, Q> CanonicalChan<Tx, Rx, Split<P, Q>, E> where
    Tx: Send + 'static,
    Rx: Send + 'static,
    P: Actionable<E>,
    Q: Actionable<E>,
    E: Environment
[src]

pub async fn split<T, Err, F, Fut>(
    self,
    with_parts: F
) -> Result<(T, Result<Chan<Tx, Rx, Done>, SessionIncomplete<Tx, Rx>>), Err> where
    F: FnOnce(Chan<Available<Tx>, Unavailable<Rx>, P, E>, Chan<Unavailable<Tx>, Available<Rx>, Q, E>) -> Fut,
    Fut: Future<Output = Result<T, Err>>, 
[src]

Split a channel into transmit-only and receive-only ends and manipulate them, potentially concurrently, in the given closure.

To use the channel as a reunited whole after it has been split, combine this operation with seq to sequence further operations after it.

Errors

The closure must finish the session for both the send-only and receive-only ends of the channel and drop or close each end before the future completes. If either end is dropped before finishing its session, or is not closed after finishing its session, a SessionIncomplete error will be returned instead of a finished channel.

Examples

In this example, both ends of a channel concurrently interact with its split send/receive halves. If the underlying channel implementation allows for parallelism, this simultaneous interaction can be faster than sequentially sending data back and forth.

use dialectic::prelude::*;
use dialectic::backend::mpsc;

type SendAndRecv = Split<Send<Vec<usize>>, Recv<String>>;

let (c1, c2) = SendAndRecv::channel(|| mpsc::channel(1));

// Spawn a task to simultaneously send a `Vec<usize>` and receive a `String`:
let t1 = tokio::spawn(async move {
    c1.split(|tx, rx| async move {
        let send_vec = tokio::spawn(async move {
            tx.send(vec![1, 2, 3, 4, 5]).await?;
            Ok::<_, mpsc::Error>(())
        });
        let recv_string = tokio::spawn(async move {
            let (string, _) = rx.recv().await?;
            Ok::<_, mpsc::Error>(string)
        });
        send_vec.await.unwrap()?;
        let string = recv_string.await.unwrap()?;
        Ok::<_, mpsc::Error>(string)
    }).await
});

// Simultaneously *receive* a `Vec<usize>` *from*, and *send* a `String` *to*,
// the task above:
c2.split(|tx, rx| async move {
    let send_string = tokio::spawn(async move {
        tx.send("Hello!".to_string()).await?;
        Ok::<_, mpsc::Error>(())
    });
    let recv_vec = tokio::spawn(async move {
        let (vec, _) = rx.recv().await?;
        Ok::<_, mpsc::Error>(vec)
    });

    // Examine the result values:
    send_string.await??;
    let vec = recv_vec.await??;
    let string = t1.await??.0;
    assert_eq!(vec, &[1, 2, 3, 4, 5]);
    assert_eq!(string, "Hello!");

    Ok::<_, Box<dyn std::error::Error>>(())
}).await?;

impl<'a, Tx, Rx, E, P, Q> CanonicalChan<Tx, Rx, Seq<P, Q>, E> where
    Tx: Send + 'static,
    Rx: Send + 'static,
    P: Actionable<E>,
    Q: Actionable<E>,
    E: Environment
[src]

pub async fn seq<T, Err, F, Fut>(
    self,
    first: F
) -> Result<(T, Result<Chan<Tx, Rx, Q, E>, SessionIncomplete<Tx, Rx>>), Err> where
    F: FnOnce(Chan<Tx, Rx, P, E>) -> Fut,
    Fut: Future<Output = Result<T, Err>>, 
[src]

Sequence an arbitrary session P before another session Q.

This operation takes as input an asynchronous closure that runs a channel for the session type P to completion and returns either an error Err or some result value T. The result of this (provided that no errors occurred during P) is a channel ready to execute the session type Q.

Errors

The closure must finish the session P on the channel given to it and drop the finished channel before the future returns. If the channel is dropped before completing P or is not dropped after completing P, a SessionIncomplete error will be returned instead of a channel for Q. The best way to ensure this error does not occur is to call close on the channel before returning from the future, because this statically checks that the session is complete and drops the channel.

Additionally, this function returns an Err if the closure returns an Err.
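One way to picture how the connection can be recovered after the closure returns, and how an incomplete session can be detected, is a drop handler that hands the connection back through a shared slot. The following is a sketch under that assumption, using only the standard library; it is not dialectic's implementation. `Incomplete`, `Slot`, `Inner`, and `seq_sketch` are hypothetical, and whether the session has finished is tracked with a runtime flag here, whereas the real close is checked statically.

```rust
use std::sync::{Arc, Mutex};

#[derive(Debug, PartialEq)]
enum Incomplete {
    // The inner channel was dropped before its session finished.
    Unfinished,
    // The inner channel was still alive when the closure returned.
    Unclosed,
}

// Where a dropped inner channel leaves its connection, plus whether it had finished.
type Slot = Arc<Mutex<Option<(String, bool)>>>;

// Hypothetical inner channel; a `String` stands in for the connection.
struct Inner {
    conn: Option<String>,
    finished: bool,
    slot: Slot,
}

impl Inner {
    // Stand-in for running the sub-session to completion.
    fn finish(mut self) -> Self {
        self.finished = true;
        self
    }

    // Dropping is what returns the connection; `close` only makes that explicit.
    fn close(self) {}
}

impl Drop for Inner {
    fn drop(&mut self) {
        if let Some(conn) = self.conn.take() {
            *self.slot.lock().unwrap() = Some((conn, self.finished));
        }
    }
}

fn seq_sketch<T, F>(conn: String, first: F) -> (T, Result<String, Incomplete>)
where
    F: FnOnce(Inner) -> T,
{
    let slot: Slot = Arc::new(Mutex::new(None));
    let inner = Inner { conn: Some(conn), finished: false, slot: Arc::clone(&slot) };
    let value = first(inner);
    // Inspect what, if anything, the inner channel left behind when dropped.
    let recovered = match slot.lock().unwrap().take() {
        Some((conn, true)) => Ok(conn),
        Some((_, false)) => Err(Incomplete::Unfinished),
        None => Err(Incomplete::Unclosed),
    };
    (value, recovered)
}
```

In this model, finishing and closing yields the connection, dropping early yields `Unfinished`, and smuggling the inner channel out through the closure's return value yields `Unclosed`.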

Examples

This can be used to cleanly modularize a session-typed program by splitting it up into independent subroutines:

use dialectic::prelude::*;
use dialectic::backend::mpsc;

let (c1, c2) = <Seq<Send<String>, Send<String>>>::channel(mpsc::unbounded_channel);

let ((), c1_result) = c1.seq(|c| async move {
    let c = c.send("Hello!".to_string()).await?;
    // Because we're done with this subroutine, we can "close" the channel here, but it
    // will remain open to the calling context so it can run the rest of the session:
    c.close();
    Ok::<_, mpsc::Error>(())
}).await?;
let c1 = c1_result?;

let c1 = c1.send("World!".to_string()).await?;
c1.close();

An advanced example: context-free sessions

More generally, Seq allows for arbitrary context-free session types, by permitting multiple Continues to be sequenced together. In the following example, we define and implement a session type for valid operations on a stack: that is, the session type statically prevents programs that would attempt to pop from an empty stack. This session type would not be expressible without Seq, because it requires a point of recursion that is not at the end of a session type.

use dialectic::prelude::*;
use dialectic::backend::mpsc;
use std::{future::Future, marker, pin::Pin};

type Stack<T> =
    Loop<Offer<(Break, Recv<T, Seq<Continue, Send<T, Continue>>>)>>;

// A server over the `mpsc` backend for the `Stack<T>` protocol
fn stack<T>(
    mut chan: mpsc::Chan<Stack<T>>,
) -> Pin<Box<dyn Future<Output = Result<(), mpsc::Error>> + marker::Send>>
where
    T: marker::Send + 'static,
{
    Box::pin(async move {
        loop {
            chan = offer!(chan => {
                // Client doesn't want to push a value
                _0 => break chan.close(),
                // Client wants to push a value
                _1 => {
                    let (t, chan) = chan.recv().await?;       // Receive pushed value
                    let ((), chan) = chan.seq(stack).await?;  // Recursively do `Stack<T>`
                    chan.unwrap().send(t).await?              // Send back that pushed value
                },
            })
        }
        Ok(())
    })
}

// A client over the `mpsc` backend for the `Stack<T>` protocol, which uses the
// server's stack to reverse a given iterator
fn reverse_with_stack<T>(
    mut chan: mpsc::Chan<<Stack<T> as Session>::Dual>,
    mut iter: impl Iterator<Item = T> + marker::Send + 'static,
) -> Pin<Box<dyn Future<Output = Result<Vec<T>, mpsc::Error>> + marker::Send>>
where
    T: marker::Send + 'static,
{
    Box::pin(async move {
        if let Some(t) = iter.next() {
            // If there is a value left in the iterator...
            let (mut reversed, chan) =
                chan.choose(_1).await?  // Choose to push a value
                    .send(t).await?     // Push the value
                    // Recursively push the rest of the iterator
                    .seq(|chan| reverse_with_stack(chan, iter)).await?;
            let (t, chan) = chan.unwrap().recv().await?;  // Pop a value
            reversed.push(t);                             // Add it to the reversed `Vec`
            chan.choose(_0).await?.close();               // Choose to complete the session
            Ok(reversed)
        } else {
            // If there are no values left in the iterator...
            chan.choose(_0).await?.close();  // Choose to complete the session
            Ok(vec![])
        }
    })
}

// Using the server and client above, let's reverse a list!
let (server_chan, client_chan) = <Stack<usize>>::channel(|| mpsc::channel(1));
let server_thread = tokio::spawn(stack(server_chan));
let input = vec![1, 2, 3, 4, 5].into_iter();
let result = reverse_with_stack(client_chan, input).await?;
assert_eq!(result, vec![5, 4, 3, 2, 1]);
server_thread.await??;

For more on context-free session types, see "Context-Free Session Type Inference" by Luca Padovani: https://doi.org/10.1145/3229062. When comparing with that paper, note that the seq operator is roughly equivalent to its @= operator, and the Seq type is equivalent to ;.

impl<'a, Tx: 'a, Rx: 'a, E, P> CanonicalChan<Tx, Rx, P, E> where
    Tx: Send + 'static,
    Rx: Send + 'static,
    P: Actionable<E, Action = P, Env = E>,
    E: Environment
[src]

pub fn unwrap(mut self: Self) -> (Tx, Rx)[src]

Unwrap a channel into its transmit and receive ends, exiting the regimen of session typing, potentially before the end of the session.

Errors

If this function is used before the end of a session, it may result in errors when the other end of the channel attempts to continue the session.

Examples

use dialectic::prelude::*;
use dialectic::backend::mpsc;

let (c1, c2) = <Send<String>>::channel(mpsc::unbounded_channel);
let (tx1, rx1) = c1.unwrap();
let (tx2, rx2) = c2.unwrap();

Trait Implementations

impl<Tx: Send + 'static, Rx: Send + 'static, P: Actionable<E, Action = P, Env = E>, E: Environment> Debug for CanonicalChan<Tx, Rx, P, E> where
    Tx: Debug,
    Rx: Debug
[src]

impl<Tx, Rx, P, E> Drop for CanonicalChan<Tx, Rx, P, E> where
    Tx: Send + 'static,
    Rx: Send + 'static,
    P: Actionable<E, Action = P, Env = E>,
    E: Environment
[src]

Auto Trait Implementations

impl<Tx, Rx, P, E = ()> !RefUnwindSafe for CanonicalChan<Tx, Rx, P, E>[src]

impl<Tx, Rx, P, E> Send for CanonicalChan<Tx, Rx, P, E> where
    E: Send,
    P: Send
[src]

impl<Tx, Rx, P, E = ()> !Sync for CanonicalChan<Tx, Rx, P, E>[src]

impl<Tx, Rx, P, E> Unpin for CanonicalChan<Tx, Rx, P, E> where
    E: Unpin,
    P: Unpin,
    Rx: Unpin,
    Tx: Unpin
[src]

impl<Tx, Rx, P, E = ()> !UnwindSafe for CanonicalChan<Tx, Rx, P, E>[src]

Blanket Implementations

impl<T> Any for T where
    T: 'static + ?Sized
[src]

impl<T> Borrow<T> for T where
    T: ?Sized
[src]

impl<T> BorrowMut<T> for T where
    T: ?Sized
[src]

impl<'a, T> CallBy<'a, Mut> for T where
    T: 'a, 
[src]

type Type = &'a mut T

The type of Self when called by Convention.

impl<'a, T> CallBy<'a, Ref> for T where
    T: 'a, 
[src]

type Type = &'a T

The type of Self when called by Convention.

impl<'a, T> CallBy<'a, Val> for T[src]

type Type = T

The type of Self when called by Convention.

impl<T> From<T> for T[src]

impl<T, U> Into<U> for T where
    U: From<T>, 
[src]

impl<T, U> TryFrom<U> for T where
    U: Into<T>, 
[src]

type Error = Infallible

The type returned in the event of a conversion error.

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>, 
[src]

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.