roughage 0.1.1

provides `AsyncPipeline`, a deadlock-free replacement for buffered streams
Documentation

roughage crates.io docs.rs

This crate provides a single type, AsyncPipeline, which is an alternative to buffered streams, FuturesOrdered, and FuturesUnordered.

All of those are prone to deadlocks if any of their buffered/concurrent futures touches an async lock of any kind, even indirectly. (For example, note that tokio::sync::mpsc channels use a Semaphore internally.) The problem is that they don't consistently poll their buffered futures, so a future holding a lock could stop making forward progress through no fault of its own. AsyncPipeline fixes this whole class of deadlocks by consistently polling all its in-flight futures until they complete. In other words, AsyncPipeline will never "snooze" a future.

Here's how easy it is to provoke a deadlock with buffered streams:

use futures::StreamExt;
use tokio::sync::Mutex;
use tokio::time::{Duration, sleep};

static LOCK: Mutex<()> = Mutex::const_new(());

// An innocent example function that touches an async lock. Note
// that the deadlocks below can happen even if this function is
// buried three crates deep in some dependency you never see.
async fn foo() {
    let _guard = LOCK.lock().await;
    sleep(Duration::from_millis(1)).await;
}

futures::stream::iter([foo(), foo()])
    .buffered(2)
    .for_each(|_| async {
        foo().await; // Deadlock!
    })
    .await;

Here's the same deadlock with FuturesUnordered:

let mut unordered = futures::stream::FuturesUnordered::new();
unordered.push(foo());
unordered.push(foo());
while let Some(_) = unordered.next().await {
    foo().await; // Deadlock!
}

An AsyncPipeline does not have this problem, because once it's started a future internally, it never stops polling it:

use roughage::AsyncPipeline;

AsyncPipeline::from_iter(0..100)
    .map_concurrent(|_| foo(), 10)
    .map_unordered(|_| foo(), 10)
    .for_each_concurrent(|_| foo(), 10)
    .await;
// Deadlock free!

See AsyncPipeline for more examples.

"Roughage" (ruff-edge) is an older term for dietary fiber. It keeps our pipes running smoothly.