pumps-rs
Eager streams for Rust. If a stream allows water to flow down the hill, a pump forces it up.
The futures Stream API is awesome, but it has some unfortunate issues:
- Futures run in a surprising and unintuitive order. Read "Barbara battles buffered streams".
- Prone to surprising deadlocks. See "Fixing the Next Thousand Deadlocks: Why Buffered Streams Are Broken and How To Make Them Safer".
- Rust Stream API visualized and exposed
- Fixing these issues will require new features in the language, such as the proposed poll_progress.
This crate offers an alternative approach to async pipelining in Rust.
Main features:
- Designed with common async pipelining needs at heart
- Explicit concurrency, ordering, and backpressure control
- Eager: work is done before downstream methods consume it
- Builds on top of Rust async tools such as tasks and channels
- For now, only supports the Tokio async runtime
- TBA: additional operators
Example:
let = from_iter
.map
.map
.filter_map
.map
.build;
while let Some = output_receiver.recv.await
Pumps
A Pump is a wrapper around a common async programming (or rather multithreading) pattern: concurrent work is split into several tasks that communicate with each other using channels.
let = channel;
let = channel;
let = channel;
spawn;
spawn;
// send data to input channel
send_input.await;
while let Some = receiver2.recv.await
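For readers without Tokio at hand, the same pattern can be sketched with only the standard library. This is an analogue under stated assumptions, not pumps-rs code: std::thread::spawn stands in for Tokio tasks, std::sync::mpsc::channel stands in for tokio::sync::mpsc::channel, and two_stage_pipeline with its doubling and increment stages is hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical two-stage pipeline: stage 1 doubles each number,
/// stage 2 adds one. Each stage is a thread; channels connect them.
fn two_stage_pipeline(input: Vec<i32>) -> Vec<i32> {
    // Input channel, channel between the stages, and output channel.
    let (send_input, receiver0) = mpsc::channel::<i32>();
    let (sender1, receiver1) = mpsc::channel::<i32>();
    let (sender2, receiver2) = mpsc::channel::<i32>();

    // Stage 1: read from the input channel, double, forward.
    let stage1 = thread::spawn(move || {
        for x in receiver0 {
            if sender1.send(x * 2).is_err() {
                break; // downstream hung up
            }
        }
        // sender1 is dropped here, which ends stage 2's loop.
    });

    // Stage 2: read from stage 1, add one, forward.
    let stage2 = thread::spawn(move || {
        for x in receiver1 {
            if sender2.send(x + 1).is_err() {
                break;
            }
        }
    });

    // Send data to the input channel, then drop the sender to signal the end.
    for x in input {
        send_input.send(x).unwrap();
    }
    drop(send_input);

    // Drain the output; iteration stops once stage 2 drops sender2.
    let output: Vec<i32> = receiver2.iter().collect();
    stage1.join().unwrap();
    stage2.join().unwrap();
    output
}
```

For example, two_stage_pipeline(vec![1, 2, 3]) yields [3, 5, 7]. Dropping a Sender is what tells the next stage that its input has ended.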
A 'Pump' is one step of such a pipeline: a task together with its input and output channels. For example, the Map Pump spawns a task that receives input via a Receiver, runs an async function on it, and sends the output to a Sender.
A Pipeline is a chain of Pumps. Each Pump receives its input from the output channel of its predecessor.
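To make the Pump/Pipeline relationship concrete, here is a std-only sketch. map_pump and run_chain are hypothetical names, not the crate's API, and a thread stands in for a Tokio task. The point is only that each pump owns one worker and one output channel, and chaining means handing one pump's Receiver to the next.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread::{self, JoinHandle};

/// Hypothetical "Map Pump": spawns a worker that reads from `input`,
/// applies `f`, and writes to a fresh output channel. Returns that
/// channel's receiver plus the worker's join handle so pumps can chain.
fn map_pump<T, U, F>(input: Receiver<T>, f: F) -> (Receiver<U>, JoinHandle<()>)
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        for item in input {
            // Stop early if the downstream receiver was dropped.
            if tx.send(f(item)).is_err() {
                break;
            }
        }
    });
    (rx, handle)
}

/// Chains two map pumps: the second consumes the first one's output channel.
fn run_chain(items: Vec<u32>) -> Vec<String> {
    let (source_tx, source_rx) = mpsc::channel();
    let (squared_rx, h1) = map_pump(source_rx, |x: u32| x * x);
    let (labelled_rx, h2) = map_pump(squared_rx, |x: u32| format!("#{x}"));

    for x in items {
        source_tx.send(x).unwrap();
    }
    drop(source_tx); // closing the source lets each pump finish in turn

    let out: Vec<String> = labelled_rx.iter().collect();
    h1.join().unwrap();
    h2.join().unwrap();
    out
}
```

run_chain(vec![1, 2, 3]) produces ["#1", "#4", "#9"]: the first pump squares, the second formats.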
Creation
// from channel
let = from;
// from a stream
let = from_stream;
// create an iterator
let = from_iter;
The .build() method returns a tuple of a tokio::sync::mpsc::Receiver and a join handle for the internally spawned tasks.
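A std-only sketch of an iterator source, assuming the (Receiver, JoinHandle) shape described above. from_iter_pump is a hypothetical name, and std's Receiver and JoinHandle stand in for tokio::sync::mpsc::Receiver and Tokio's join handle.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread::{self, JoinHandle};

/// Hypothetical source pump: a thread pushes every item of `iter` into a
/// channel. The caller gets the receiving end and the thread's join handle.
fn from_iter_pump<I>(iter: I) -> (Receiver<I::Item>, JoinHandle<()>)
where
    I: IntoIterator + Send + 'static,
    I::Item: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        for item in iter {
            if tx.send(item).is_err() {
                break; // the consumer dropped the receiver
            }
        }
        // tx is dropped here, which closes the channel for the consumer.
    });
    (rx, handle)
}
```

Usage: let (receiver, handle) = from_iter_pump(1..=3); then drain receiver and join handle once the channel closes.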
Concurrency control
Each Pump operation receives a Concurrency struct that defines the concurrency characteristics of the operation.
- Serial execution: Concurrency::serial()
- Concurrent execution: Concurrency::concurrent_ordered(n), Concurrency::concurrent_unordered(n)
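The difference between the two concurrent modes can be sketched with std threads. This assumes concurrent_ordered(n) means up to n items in flight with outputs kept in input order, and concurrent_unordered(n) means outputs are emitted in completion order; concurrent_map and its ordered flag are hypothetical, not the crate's implementation.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical concurrent map: `n` worker threads pull (index, item) jobs
/// from a shared channel and apply `f`. With `ordered = true`, results are
/// re-sequenced by input index; otherwise they come back in completion order.
fn concurrent_map<T, U, F>(items: Vec<T>, n: usize, ordered: bool, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    assert!(n > 0, "concurrency must be at least 1");
    let (job_tx, job_rx) = mpsc::channel::<(usize, T)>();
    let job_rx = Arc::new(Mutex::new(job_rx));
    let (out_tx, out_rx) = mpsc::channel::<(usize, U)>();
    let f = Arc::new(f);

    let workers: Vec<_> = (0..n)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let out_tx = out_tx.clone();
            let f = Arc::clone(&f);
            thread::spawn(move || loop {
                // The lock guard is dropped at the end of this statement,
                // so the lock is held only while taking the next job.
                let job = job_rx.lock().unwrap().recv();
                match job {
                    Ok((i, item)) => {
                        if out_tx.send((i, f(item))).is_err() {
                            break;
                        }
                    }
                    Err(_) => break, // input closed and drained
                }
            })
        })
        .collect();
    drop(out_tx); // only the workers' clones keep the output channel open

    for job in items.into_iter().enumerate() {
        job_tx.send(job).unwrap();
    }
    drop(job_tx);

    let mut results = Vec::new();
    if ordered {
        // Buffer out-of-order results until the next expected index arrives.
        let mut pending = BTreeMap::new();
        let mut next = 0;
        for (i, value) in out_rx {
            pending.insert(i, value);
            while let Some(value) = pending.remove(&next) {
                results.push(value);
                next += 1;
            }
        }
    } else {
        results.extend(out_rx.into_iter().map(|(_, value)| value));
    }
    for w in workers {
        w.join().unwrap();
    }
    results
}
```

The reorder buffer is the price of ordering: a fast result for a later item must wait in memory until every earlier item has finished.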
Backpressure
Backpressure defines how much unconsumed data can accumulate in memory. Without backpressure, an eager operation keeps processing data and storing it in memory, so a slow downstream consumer results in unbounded memory usage.
The .backpressure(n) definition caps the capacity of a Pump's output channel, so the Pump stops processing data until the output channel has been consumed.
By default, the backpressure equals the concurrency number.
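Bounded channels are a common way to get this kind of backpressure. Assuming a similar mechanism (the crate's internals are not shown here), the sketch below uses std::sync::mpsc::sync_channel to show a capacity-n buffer refusing further items when nobody consumes. accepted_before_full is a hypothetical helper.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Hypothetical helper: offers up to `attempts` items to a bounded channel
/// of capacity `backpressure` with no consumer, and counts how many were
/// accepted before the channel pushed back.
fn accepted_before_full(backpressure: usize, attempts: usize) -> usize {
    // `_rx` keeps the receiver alive (a bare `_` would drop it immediately).
    let (tx, _rx) = mpsc::sync_channel::<usize>(backpressure);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // Buffer full: a blocking send() would wait here until the
            // consumer drains the channel, pausing the producer.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
    accepted
}
```

With a capacity of 3 and 10 attempts, exactly 3 items are accepted; a producer using the blocking send() would pause at that point instead of growing memory without bound.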