roughage

This crate provides a single type, AsyncPipeline, which is an alternative to buffered
streams, FuturesOrdered, and FuturesUnordered.
All of those are prone to deadlocks if any of their buffered/concurrent futures touches an
async lock of any kind, even indirectly. (For example, note that tokio::sync::mpsc
channels use a Semaphore internally.) The problem is that they don't
consistently poll their buffered futures, so a future holding a lock could stop making forward
progress through no fault of its own. AsyncPipeline fixes this whole class of deadlocks by
consistently polling all its in-flight futures until they complete. In other words,
AsyncPipeline will never "snooze" a future.
Here's how easy it is to provoke a deadlock with buffered streams:
use StreamExt;
use Mutex;
use ;
static LOCK: = const_new;
// An innocent example function that touches an async lock. Note
// that the deadlocks below can happen even if this function is
// buried three crates deep in some dependency you never see.
async
iter
.buffered
.for_each
.await;
Here's the same deadlock with FuturesUnordered:
let mut unordered = new;
unordered.push;
unordered.push;
while let Some = unordered.next.await
An AsyncPipeline does not have this problem, because once it's started a future internally,
it never stops polling it:
use AsyncPipeline;
from_iter
.map_concurrent
.map_unordered
.for_each_concurrent
.await;
// Deadlock free!
See AsyncPipeline for more examples.
"Roughage" (ruff-edge) is an older term for dietary fiber. It keeps our pipes running smoothly.