Crate mpmc_async

source ·
Expand description

A multi-producer, multi-consumer async channel with reservations.

Example usage:

tokio_test::block_on(async {
    let (tx1, rx1) = mpmc_async::channel(2);

    let task = tokio::spawn(async move {
      let rx2 = rx1.clone();
      assert_eq!(rx1.recv().await.unwrap(), 2);
      assert_eq!(rx2.recv().await.unwrap(), 1);
    });

    let tx2 = tx1.clone();
    let permit = tx1.reserve().await.unwrap();
    tx2.send(1).await.unwrap();
    permit.send(2);

    task.await.unwrap();
});

A more complex example with multiple sender and receiver tasks:

use std::collections::BTreeSet;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex};

tokio_test::block_on(async {
    let (tx, rx) = mpmc_async::channel(1);

    let num_workers = 10;
    let count = 10;
    let mut tasks = Vec::with_capacity(num_workers);

    for i in 0..num_workers {
        let mut tx = tx.clone();
        let task = tokio::spawn(async move {
            for j in 0..count {
                let val = i * count + j;
                tx.reserve().await.expect("no error").send(val);
            }
        });
        tasks.push(task);
    }

    let total = count * num_workers;
    let values = Arc::new(Mutex::new(BTreeSet::new()));

    for _ in 0..num_workers {
        let values = values.clone();
        let rx = rx.clone();
        let task = tokio::spawn(async move {
            for _ in 0..count {
                let val = rx.recv().await.expect("Failed to recv");
                values.lock().unwrap().insert(val);
            }
        });
        tasks.push(task);
    }

    for task in tasks {
        task.await.expect("failed to join task");
    }

    let exp = (0..total).collect::<Vec<_>>();
    let got = std::mem::take(values.lock().unwrap().deref_mut())
        .into_iter()
        .collect::<Vec<_>>();
    assert_eq!(exp, got);
});

Structs§

Enums§

  • Occurs when the channel is empty or all senders have been dropped.
  • Occurs when the channel is full, or all receivers have been dropped.
  • TrySendError occurs when the channel is full or all receivers have been dropped.

Functions§

  • Creates a new bounded channel. When cap is 0 it will be increased to 1.