Struct dialectic::Chan[][src]

#[repr(C)]
#[must_use]pub struct Chan<S: Session, Tx: Send + 'static, Rx: Send + 'static> { /* fields omitted */ }

A bidirectional communications channel using the session type P over the connections Tx and Rx.

Creating new Chans: use Session

The Session trait is implemented for all valid session types. To create a new Chan for some session type, use one of the provided static methods. Here, we create two Chans with the session type send String and its dual recv String, wrapping an underlying bidirectional transport built from a pair of tokio::sync::mpsc::channels:

use dialectic::prelude::*;
use dialectic_tokio_mpsc as mpsc;

// Make a pair of channels:
// - `c1` with the session type `send String`, and
// - `c2` with the dual session type `recv String`
let (c1, c2) = <Session! { send String }>::channel(|| mpsc::channel(1));

If you already have a sender and receiver and want to wrap them in a Chan, use the wrap method for a session type. This is useful, for example, if you’re talking to another process over a network connection, where it’s not possible to build both halves of the channel on one computer, and instead each computer will wrap one end of the connection:

let (tx, rx) = /* ... */;
let c = <Session! { send String }>::wrap(tx, rx);

Implementations

impl<Tx, Rx, S> Chan<S, Tx, Rx> where
    S: Session,
    Tx: Send + 'static,
    Rx: Send + 'static, 
[src]

pub fn close(self) where
    S: Session<Action = Done>, 
[src]

Close a finished session, dropping the underlying connections.

If called inside a future given to split or call, the underlying connections are implicitly recovered for use in subsequent actions in the session, or if called in a future given to in over, are returned to the caller.

Examples

Starting with a channel whose session type is already Done, we can immediately close the channel.

use dialectic::prelude::*;
use dialectic_tokio_mpsc as mpsc;

let (c1, c2) = <Session! {}>::channel(mpsc::unbounded_channel);
c1.close();
c2.close();

However, if the channel’s session type is not Done, it is a type error to attempt to close the channel. The following code will not compile:

use dialectic::prelude::*;
use dialectic_tokio_mpsc as mpsc;

let (c1, c2) = <Session! { loop { send String } }>::channel(mpsc::unbounded_channel);
c1.close();
c2.close();

If you really want to destruct a channel before the end of its session, use into_inner, but beware that this may cause the party on the other end of the channel to throw errors due to your violation of the channel’s protocol!

pub async fn recv<T, P>(self) -> Result<(T, Chan<P, Tx, Rx>), Rx::Error> where
    S: Session<Action = Recv<T, P>>,
    P: Session,
    Rx: Receive<T>, 
[src]

Receive something of type T on the channel, returning the pair of the received object and the channel.

Errors

This function returns the Receiver::Error for the underlying Rx connection if there was an error while receiving.

Examples

use dialectic::prelude::*;
use dialectic_tokio_mpsc as mpsc;

let (c1, c2) = <Session! { recv String }>::channel(|| mpsc::channel(1));
c2.send("Hello, world!".to_string()).await?;

let (s, c1) = c1.recv().await?;
assert_eq!(s, "Hello, world!");

pub async fn send<T, P>(self, message: T) -> Result<Chan<P, Tx, Rx>, Tx::Error> where
    S: Session<Action = Send<T, P>>,
    P: Session,
    Tx: Transmit<T>,
    T: Send
[src]

Send something of type T on the channel by value, returning the channel.

Errors

This function returns the Transmitter::Error for the underlying Tx connection if there was an error while sending.

Examples

use dialectic::prelude::*;
use dialectic_tokio_mpsc as mpsc;

let (c1, c2) = <Session! { send String }>::channel(|| mpsc::channel(1));
c1.send("Hello, world!".to_string()).await?;

let (s, c2) = c2.recv().await?;
assert_eq!(s, "Hello, world!");

pub async fn send_ref<T, P>(
    self,
    message: &T
) -> Result<Chan<P, Tx, Rx>, Tx::Error> where
    S: Session<Action = Send<T, P>>,
    P: Session,
    Tx: Transmit<T, Ref>,
    T: Send
[src]

Send something of type T on the channel by reference, returning the channel.

Errors

This function returns the Transmitter::Error for the underlying Tx connection if there was an error while sending.

Examples

use dialectic::prelude::*;
use dialectic_tokio_mpsc as mpsc;

let (c1, c2) = <Session! { send String }>::channel(|| mpsc::channel(1));
c1.send_ref(&"Hello, world!".to_string()).await?;

let (s, c2) = c2.recv().await?;
assert_eq!(s, "Hello, world!");

pub async fn send_mut<T, P>(
    self,
    message: &mut T
) -> Result<Chan<P, Tx, Rx>, Tx::Error> where
    S: Session<Action = Send<T, P>>,
    P: Session,
    Tx: Transmit<T, Mut>,
    T: Send
[src]

Send something of type T on the channel by mutable reference, returning the channel.

Errors

This function returns the Transmitter::Error for the underlying Tx connection if there was an error while sending.

Examples

use dialectic::prelude::*;
use dialectic_tokio_mpsc as mpsc;

let (c1, c2) = <Session! { send String }>::channel(|| mpsc::channel(1));
let mut string = "Hello, world!".to_string();
c1.send_mut(&mut string).await?;

let (s, c2) = c2.recv().await?;
assert_eq!(s, "Hello, world!");

impl<Tx, Rx, S, Choices, const LENGTH: usize> Chan<S, Tx, Rx> where
    S: Session<Action = Choose<Choices>>,
    Choices: Tuple,
    Choices::AsList: HasLength,
    <Choices::AsList as HasLength>::Length: ToConstant<AsConstant = Number<LENGTH>>,
    Tx: Transmitter + Send + 'static,
    Rx: Send + 'static, 
[src]

pub async fn choose<const N: usize>(
    self
) -> Result<Chan<<Choices::AsList as Select<<Number<N> as ToUnary>::AsUnary>>::Selected, Tx, Rx>, Tx::Error> where
    Number<N>: ToUnary,
    Choices::AsList: Select<<Number<N> as ToUnary>::AsUnary>,
    <Choices::AsList as Select<<Number<N> as ToUnary>::AsUnary>>::Selected: Session
[src]

Actively choose to enter the Nth protocol offered via offer! by the other end of the connection, alerting the other party to this choice by sending the number N over the channel.

The choice N is specified as a const generic usize.

Errors

This function returns the Transmitter::Error for the underlying Tx connection if there was an error while sending the choice.

Examples

use dialectic::prelude::*;
use dialectic_tokio_mpsc as mpsc;

type GiveOrTake = Session! {
    choose {
        0 => send i64,
        1 => recv String,
    }
};

let (c1, c2) = GiveOrTake::channel(|| mpsc::channel(1));

// Spawn a thread to offer a choice
let t1 = tokio::spawn(async move {
    offer!(in c2 {
        0 => { c2.recv().await?; },
        1 => { c2.send("Hello!".to_string()).await?; },
    });
    Ok::<_, mpsc::Error>(())
});

// Choose to send an integer
c1.choose::<0>().await?.send(42).await?;

// Wait for the offering thread to finish
t1.await??;

Attempting to choose an index that’s out of bounds results in a compile-time error:

use dialectic::prelude::*;
use dialectic_tokio_mpsc as mpsc;

type OnlyTwoChoices = Choose<(Done, Done)>;
let (c1, c2) = OnlyTwoChoices::channel(|| mpsc::channel(1));

// Try to choose something out of range (this doesn't typecheck)
c1.choose::<2>().await?;

impl<Tx, Rx, S, Choices, const LENGTH: usize> Chan<S, Tx, Rx> where
    S: Session<Action = Offer<Choices>>,
    Choices: Tuple + 'static,
    Choices::AsList: HasLength + EachScoped + EachHasDual,
    <Choices::AsList as HasLength>::Length: ToConstant<AsConstant = Number<LENGTH>>,
    Z: LessThan<<Choices::AsList as HasLength>::Length>,
    Tx: Send + 'static,
    Rx: Receiver + Send + 'static, 
[src]

pub async fn offer(self) -> Result<Branches<Choices, Tx, Rx>, Rx::Error>[src]

Offer the choice of one or more protocols to the other party, and wait for them to indicate which protocol they’d like to proceed with. Returns a Branches structure representing all the possible channel types which could be returned, which must be eliminated using case.

💡 Where possible, prefer the offer! macro. This has the benefit of ensuring at compile time that no case is left unhandled; it’s also more succinct.

Errors

This function returns the Receiver::Error for the underlying Rx connection if there was an error while receiving.

Examples

use dialectic::prelude::*;
use dialectic_tokio_mpsc as mpsc;

type GiveOrTake = Session! {
    choose {
        0 => send i64,
        1 => recv String,
    }
};

let (c1, c2) = GiveOrTake::channel(|| mpsc::channel(1));

// Spawn a thread to offer a choice
let t1 = tokio::spawn(async move {
    match c2.offer().await?.case::<0>() {
        Ok(c2) => { c2.recv().await?; },
        Err(rest) => match rest.case::<0>() {
            Ok(c2) => { c2.send("Hello!".to_string()).await?; },
            Err(rest) => rest.empty_case(),
        }
    }
    Ok::<_, mpsc::Error>(())
});

// Choose to send an integer
c1.choose::<0>().await?.send(42).await?;

// Wait for the offering thread to finish
t1.await??;

Notice how the handling of cases by manual match is harder to read than the equivalent in terms of offer!:

offer!(in c2 {
    0 => { c2.recv().await?; },
    1 => { c2.send("Hello!".to_string()).await?; },
});

impl<Tx, Rx, S> Chan<S, Tx, Rx> where
    S: Session,
    Tx: Send + 'static,
    Rx: Send + 'static, 
[src]

pub async fn call<T, E, P, Q, F, Fut>(
    self,
    first: F
) -> Result<(T, Result<Chan<Q, Tx, Rx>, SessionIncomplete<Tx, Rx>>), E> where
    S: Session<Action = Call<P, Q>>,
    P: Session,
    Q: Session,
    F: FnOnce(Chan<P, Tx, Rx>) -> Fut,
    Fut: Future<Output = Result<T, E>>, 
[src]

Execute the session type P as a subroutine in a closure.

This operation takes as input an asynchronous closure that runs a channel for the session type P to completion and returns either an error Err or some result value T. The result of this (provided that no errors occurred during P) is a channel ready to execute the session type Q.

Errors

The closure must finish the session P on the channel given to it and drop the finished channel before the future returns. If the channel is dropped before completing P or is not dropped after completing P, a SessionIncomplete error will be returned instead of a channel for Q. The best way to ensure this error does not occur is to call close on the channel before returning from the future, because this statically checks that the session is complete and drops the channel.

Additionally, this function returns an Err if the closure returns an Err.

Examples

This can be used to cleanly modularize a session-typed program by splitting it up into independent subroutines:

use dialectic::prelude::*;
use dialectic_tokio_mpsc as mpsc;

let (c1, c2) = <Session! {
    call { send String };
    send String;
}>::channel(mpsc::unbounded_channel);

let ((), c1_result) = c1.call(|c| async move {
    let c = c.send("Hello!".to_string()).await?;
    // Because we're done with this subroutine, we can "close" the channel here, but it
    // will remain open to the calling context so it can run the rest of the session:
    c.close();
    Ok::<_, mpsc::Error>(())
}).await?;
let c1 = c1_result?;

let c1 = c1.send("World!".to_string()).await?;
c1.close();

More generally, this construct permits the expression of context-free session types, by allowing recursion in the first parameter to Call. For a demonstration of this, see the stack example. For more background on context-free session types, see the paper Context-Free Session Type Inference by Luca Padovani. When comparing with that paper, note that the call operation is roughly equivalent to the paper’s @= operator, and the Call type is equivalent to the paper’s ; type operator.

pub async fn split<T, E, P, Q, R, F, Fut>(
    self,
    with_parts: F
) -> Result<(T, Result<Chan<R, Tx, Rx>, SessionIncomplete<Tx, Rx>>), E> where
    S: Session<Action = Split<P, Q, R>>,
    P: Session,
    Q: Session,
    R: Session,
    F: FnOnce(Chan<P, Tx, Unavailable>, Chan<Q, Unavailable, Rx>) -> Fut,
    Fut: Future<Output = Result<T, E>>, 
[src]

Split a channel into transmit-only and receive-only ends and manipulate them, potentially concurrently, in the given closure.

This is akin to call, except the closure is given two Chans: one which can only do Transmit operations (Send and Choose) and one which can only do Receive operations (Recv and Offer). The result of the call to split is a re-unified Chan ready to execute the session R.

Errors

The closure must finish the session for both the send-only and receive-only ends of the channel and drop or close each end before the future completes. If either end is dropped before finishing its session, or is not closed after finishing its session, a SessionIncomplete error will be returned instead of a finished channel.

Examples

In this example, both ends of a channel concurrently interact with its split send/receive halves. If the underlying channel implementation allows for parallelism, this simultaneous interaction can be faster than sequentially sending data back and forth.

use dialectic::prelude::*;
use dialectic_tokio_mpsc as mpsc;

type SendAndRecv = Session! {
    split {
        -> send Vec<usize>,
        <- recv String,
    }
};

let (c1, c2) = SendAndRecv::channel(|| mpsc::channel(1));

// Spawn a thread to simultaneously send a `Vec<usize>` and receive a `String`:
let t1 = tokio::spawn(async move {
    c1.split(|tx, rx| async move {
        let send_vec = tokio::spawn(async move {
            tx.send(vec![1, 2, 3, 4, 5]).await?;
            Ok::<_, mpsc::Error>(())
        });
        let recv_string = tokio::spawn(async move {
            let (string, _) = rx.recv().await?;
            Ok::<_, mpsc::Error>(string)
        });
        send_vec.await.unwrap()?;
        let string = recv_string.await.unwrap()?;
        Ok::<_, mpsc::Error>(string)
    }).await
});

// Simultaneously *receive* a `Vec<usize>` *from*, and *send* a `String` *to*,
// the task above:
c2.split(|tx, rx| async move {
    let send_string = tokio::spawn(async move {
        tx.send("Hello!".to_string()).await?;
        Ok::<_, mpsc::Error>(())
    });
    let recv_vec = tokio::spawn(async move {
        let (vec, _) = rx.recv().await?;
        Ok::<_, mpsc::Error>(vec)
    });

    // Examine the result values:
    send_string.await??;
    let vec = recv_vec.await??;
    let string = t1.await??.0;
    assert_eq!(vec, &[1, 2, 3, 4, 5]);
    assert_eq!(string, "Hello!");

    Ok::<_, Box<dyn std::error::Error>>(())
}).await?;

pub fn into_inner(self) -> (Tx, Rx)[src]

Unwrap a channel into its transmit and receive ends, exiting the regimen of session typing, potentially before the end of the session.

Errors

If this function is used before the end of a session, it may result in errors when the other end of the channel attempts to continue the session.

Examples

use dialectic::prelude::*;
use dialectic_tokio_mpsc as mpsc;

let (c1, c2) = <Session! { send String }>::channel(mpsc::unbounded_channel);
let (tx1, rx1) = c1.into_inner();
let (tx2, rx2) = c2.into_inner();

Trait Implementations

impl<S: Session, Tx: Send + 'static, Rx: Send + 'static> Debug for Chan<S, Tx, Rx> where
    Tx: Debug,
    Rx: Debug
[src]

impl<Tx, Rx, S> Drop for Chan<S, Tx, Rx> where
    Tx: Send + 'static,
    Rx: Send + 'static,
    S: Session
[src]

Auto Trait Implementations

impl<S, Tx, Rx> RefUnwindSafe for Chan<S, Tx, Rx> where
    Rx: RefUnwindSafe,
    Tx: RefUnwindSafe

impl<S, Tx, Rx> Send for Chan<S, Tx, Rx>

impl<S, Tx, Rx> Sync for Chan<S, Tx, Rx> where
    Rx: Sync,
    Tx: Sync

impl<S, Tx, Rx> Unpin for Chan<S, Tx, Rx> where
    Rx: Unpin,
    Tx: Unpin

impl<S, Tx, Rx> UnwindSafe for Chan<S, Tx, Rx> where
    Rx: UnwindSafe,
    Tx: UnwindSafe

Blanket Implementations

impl<T> Any for T where
    T: 'static + ?Sized
[src]

impl<'a, T, S> As<'a, Val, T> for S where
    S: Into<T>, 
[src]

impl<T> Borrow<T> for T where
    T: ?Sized
[src]

impl<T> BorrowMut<T> for T where
    T: ?Sized
[src]

impl<'a, T> By<'a, Mut> for T where
    T: 'a, 
[src]

type Type = &'a mut T

The type of Self when called by Convention.

impl<'a, T> By<'a, Ref> for T where
    T: 'a, 
[src]

type Type = &'a T

The type of Self when called by Convention.

impl<'a, T> By<'a, Val> for T[src]

type Type = T

The type of Self when called by Convention.

impl<'a, T> Convert<'a, Mut, Mut> for T where
    T: 'a, 
[src]

impl<'a, T> Convert<'a, Mut, Ref> for T where
    T: 'a, 
[src]

impl<'a, T> Convert<'a, Ref, Ref> for T where
    T: 'a, 
[src]

impl<'a, T> Convert<'a, Val, Val> for T[src]

impl<T> From<T> for T[src]

impl<T, U> Into<U> for T where
    U: From<T>, 
[src]

impl<T, U> TryFrom<U> for T where
    U: Into<T>, 
[src]

type Error = Infallible

The type returned in the event of a conversion error.

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>, 
[src]

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.