Crate async_local_bounded_channel

A same-producer, same-consumer channel, bounded to a single async task.

Implementation details

Internally, this uses the generic-array crate, which relies on types from typenum to specify the capacity at compile time. This lets the space for the queue be allocated inline, but it also means that the capacity of the channel must be specified up front, at compile time.
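To make the idea of inline, compile-time-sized storage concrete, here is a minimal sketch using only the standard library. It is not this crate's implementation: the `InlineQueue` type is hypothetical, and a const generic `N` stands in for the typenum capacity type (such as `U8`) that the crate takes.

```rust
/// Hypothetical fixed-capacity FIFO queue whose storage lives inline.
/// `N` plays the role that a typenum type such as `U8` plays in the crate.
struct InlineQueue<T, const N: usize> {
    slots: [Option<T>; N], // stored inside the struct, no heap allocation
    head: usize,           // index of the oldest item
    len: usize,            // number of items currently stored
}

impl<T, const N: usize> InlineQueue<T, N> {
    fn new() -> Self {
        InlineQueue {
            slots: std::array::from_fn(|_| None),
            head: 0,
            len: 0,
        }
    }

    /// Appends an item, or hands it back if the queue is already full.
    fn push(&mut self, item: T) -> Result<(), T> {
        if self.len == N {
            return Err(item);
        }
        let tail = (self.head + self.len) % N;
        self.slots[tail] = Some(item);
        self.len += 1;
        Ok(())
    }

    /// Removes and returns the oldest item, if any.
    fn pop(&mut self) -> Option<T> {
        if self.len == 0 {
            return None;
        }
        let item = self.slots[self.head].take();
        self.head = (self.head + 1) % N;
        self.len -= 1;
        item
    }
}
```

Because the capacity is part of the type, `InlineQueue<u32, 8>` and `InlineQueue<u32, 16>` are distinct types, and the whole queue can sit on the stack or inside another struct without a separate allocation.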

Examples

Used together with futures::future::select, this can implement something like a coroutine, in which two asynchronous generators cooperate by producing and consuming values.

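// Assumed import paths; `U8` is also re-exported as `generic_array::typenum::U8`.
use async_local_bounded_channel::channel;
use futures::future::{select, Either};
use futures::pin_mut;
use typenum::U8;

futures::executor::block_on(async move {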
    // create a new channel with a capacity of 8 items
    let mut channel = channel::<_, U8>();
    let (mut tx, mut rx) = channel.split();
    let producer = async move {
        for i in 0..100 {
            tx.send(i).await.expect("consumer still alive");
        }
    };
    let consumer = async move {
        let mut expected = 0;
        // Keep receiving until `receive` returns an error.
        while let Ok(v) = rx.receive().await {
            assert_eq!(v, expected);
            expected += 1;
        }
    };
    pin_mut!(producer, consumer);
    let remaining = select(producer, consumer).await.factor_first().1;
    match remaining {
        Either::Left(f) => f.await,
        Either::Right(f) => f.await,
    }
});

This can be useful, for example, when implementing a server. A single task can handle each client: within that task, the producer waits for incoming requests and writes responses, while the consumer waits for requests, handles them, and then generates a response.

Usage notes

Once the transmission endpoints have been acquired via split(), the channel cannot be moved. This is required for safety, since each endpoint contains a reference back to the channel; thus, if the channel were to move, those references would become dangling.

The following example deliberately fails to compile:
let mut channel = channel::<isize, U8>();
let (tx, rx) = channel.split();
std::thread::spawn(move || {
    // nope!
    let channel = channel;
    let tx = tx;
    let rx = rx;
});
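The borrow that pins the channel in place can be sketched with standard-library types only. This is an illustration under stated assumptions, not the crate's code: `Chan`, `Tx`, and `Rx` are hypothetical stand-ins, the queue is a plain `RefCell<VecDeque<i32>>`, and `split` is assumed to take `&mut self` and return endpoints that borrow from the channel.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;

/// Hypothetical stand-in for the crate's channel; not its real implementation.
struct Chan {
    queue: RefCell<VecDeque<i32>>,
}

/// Each endpoint holds a reference back to the channel it came from.
struct Tx<'a> {
    chan: &'a Chan,
}
struct Rx<'a> {
    chan: &'a Chan,
}

impl Chan {
    fn new() -> Self {
        Chan { queue: RefCell::new(VecDeque::new()) }
    }

    /// Taking `&mut self` keeps the channel exclusively borrowed
    /// for as long as either endpoint is still in use.
    fn split(&mut self) -> (Tx<'_>, Rx<'_>) {
        let shared: &Chan = self;
        (Tx { chan: shared }, Rx { chan: shared })
    }
}

impl Tx<'_> {
    fn send(&self, v: i32) {
        self.chan.queue.borrow_mut().push_back(v);
    }
}

impl Rx<'_> {
    fn recv(&self) -> Option<i32> {
        self.chan.queue.borrow_mut().pop_front()
    }
}

/// Splits a channel, uses both endpoints, then moves the channel afterwards.
fn split_then_move() -> Vec<i32> {
    let mut chan = Chan::new();
    let mut seen = Vec::new();
    {
        let (tx, rx) = chan.split();
        tx.send(1);
        tx.send(2);
        // let moved = chan; // error[E0505]: cannot move out of `chan` because it is borrowed
        while let Some(v) = rx.recv() {
            seen.push(v);
        }
    } // the endpoints go out of scope here, so the borrow has ended
    let moved = chan; // fine: no endpoint refers to `chan` any more
    drop(moved);
    seen
}
```

Uncommenting the marked line reproduces the same kind of error as the example above, since the endpoints are still used after the attempted move.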

Further, endpoints must remain anchored to a single thread, since access to the underlying data structures is not thread-safe. Unfortunately, this isn't enforced by the compiler, and scoped thread libraries can allow unsafe usage. For example:

// shouldn't compile, but unfortunately does.
let mut channel = channel::<isize, U8>();
crossbeam::thread::scope(|s| {
    let (tx, rx) = channel.split();
    // don't do this!
    s.spawn(move |_| {
        let tx = tx;
    });
    s.spawn(move |_| {
        let rx = rx;
    });
});

If there are no open endpoints, though, a channel can be safely moved and sent. A channel can even be re-used after the endpoints are dropped.

type C = async_local_bounded_channel::Channel<isize, U8>;

async fn test_channel(mut channel: C) -> C {
    // run the producer-consumer example above.
    channel
}

let channel = channel();
let t = std::thread::spawn(move || {
    let channel = block_on(async move {
        test_channel(channel).await
    });
    block_on(async move {
        test_channel(channel).await
    });
});
t.join().expect("test to pass");

Structs

Channel

A same-producer, same-consumer channel.

Receiver

The endpoint of a channel for receiving values.

Sender

The endpoint of a channel for sending values.

Functions

channel

Create a bounded channel for communicating within a task.