Async Pipes
Create a lightweight, concurrent data processing pipeline for Rust applications.
Description
Async Pipes provides a simple way to create high-throughput data processing pipelines by utilizing Rust's
asynchronous runtime capabilities. This is done by this library providing the infrastructure for managing
asynchronous tasks and data transfer between the tasks so the developer only has to worry about the task-specific
implementation for each stage in the pipeline.
Simple, Linear Pipeline Example
use std::sync::Arc;
use tokio::sync::Mutex;
use async_pipes::Pipeline;
use async_pipes::atomic_mut;
#[tokio::main]
async fn main() {
let (mut pipeline, mut pipes) =
Pipeline::from_pipes(vec!["MapInput", "MapToReduce", "ReduceToLog"]);
let (map_input_w, map_input_r) = pipes.create_io("MapInput").unwrap();
let (map_to_reduce_w, map_to_reduce_r) = pipes.create_io("MapToReduce").unwrap();
pipeline.register_inputs("Producer", map_input_w, vec!["a", "bb", "ccc"]);
pipeline.register("MapStage", map_input_r, map_to_reduce_w, |value: &'static str| async move {
let new_value = format!("{}!", value);
Some(new_value)
});
let total_count = atomic_mut(0);
let reduce_total_count = total_count.clone();
pipeline.register_consumer("ReduceStage", map_to_reduce_r, |value: String| async move {
*reduce_total_count.lock().await += value.len();
});
pipeline.wait().await;
assert_eq!(*total_count.lock().await, 9);
}
Documentation
WIP