Struct async_pipes::Pipeline
source · pub struct Pipeline { /* private fields */ }Expand description
A pipeline provides the infrastructure for managing a set of workers that operate on and transfer data between them using pipes.
Examples
Creating a single producer and a single consumer.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::Ordering::{Acquire, SeqCst};
use tokio::sync::Mutex;
use async_pipes::Pipeline;
tokio_test::block_on(async {
let count = Arc::new(Mutex::new(0usize));
let sum = Arc::new(AtomicUsize::new(0));
let task_sum = sum.clone();
Pipeline::builder()
// Produce values 1 through 10
.with_producer("data", move || {
let c = count.clone();
async move {
let mut c = c.lock().await;
if *c < 10 {
*c += 1;
Some(*c)
} else {
None
}
}
})
.with_consumer("data", move |value: usize| {
let ts = task_sum.clone();
async move {
ts.fetch_add(value, SeqCst);
}
})
.build()
.expect("failed to build pipeline")
.wait()
.await;
assert_eq!(sum.load(Acquire), 55);
});Creating a branching producer and two consumers for each branch.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::Ordering::Acquire;
use tokio::sync::Mutex;
use async_pipes::{BoxedAnySend, Pipeline};
tokio_test::block_on(async {
let count = Arc::new(Mutex::new(0usize));
let odds_sum = Arc::new(AtomicUsize::new(0));
let task_odds_sum = odds_sum.clone();
let evens_sum = Arc::new(AtomicUsize::new(0));
let task_evens_sum = evens_sum.clone();
Pipeline::builder()
.with_branching_producer(vec!["evens", "odds"], move || {
let c = count.clone();
async move {
let mut c = c.lock().await;
if *c >= 10 {
return None;
}
*c += 1;
let output: Vec<Option<BoxedAnySend>> = if *c % 2 == 0 {
vec![Some(Box::new(*c)), None]
} else {
vec![None, Some(Box::new(*c))]
};
Some(output)
}
})
.with_consumer("odds", move |n: usize| {
let odds_sum = task_odds_sum.clone();
async move {
odds_sum.fetch_add(n, Ordering::SeqCst);
}
})
.with_consumer("evens", move |n: usize| {
let evens_sum = task_evens_sum.clone();
async move {
evens_sum.fetch_add(n, Ordering::SeqCst);
}
})
.build()
.expect("failed to build pipeline!")
.wait()
.await;
assert_eq!(odds_sum.load(Acquire), 25);
assert_eq!(evens_sum.load(Acquire), 30);
});Implementations§
source§impl Pipeline
impl Pipeline
sourcepub fn builder() -> PipelineBuilder
pub fn builder() -> PipelineBuilder
Create a new pipeline builder.
sourcepub async fn wait(self)
pub async fn wait(self)
Wait for the pipeline to complete.
Once the pipeline is complete, a termination signal is sent to to all the workers.
A pipeline progresses to completion by doing the following:
- Wait for all “producers” to complete while also progressing stage workers
- Wait for either all the stage workers to complete, or wait for the internal synchronizer to notify of completion (i.e. there’s no more data flowing through the pipeline)
Step 1 implies that if the producers never finish, the pipeline will run forever. See [Pipeline::register_producer] for more info.
Trait Implementations§
Auto Trait Implementations§
impl !RefUnwindSafe for Pipeline
impl Send for Pipeline
impl Sync for Pipeline
impl Unpin for Pipeline
impl !UnwindSafe for Pipeline
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more