pub struct Pipeline { /* private fields */ }
Expand description
A pipeline provides the infrastructure for managing a set of workers that run user-defined “tasks” on data going through the pipes.
§Examples
Creating a single producer and a single consumer.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::Ordering::{Acquire, SeqCst};
use tokio::sync::Mutex;
use async_pipes::Pipeline;
#[tokio::main]
async fn main() {
use async_pipes::WorkerOptions;
let count = Arc::new(Mutex::new(0usize));
let sum = Arc::new(AtomicUsize::new(0));
let task_sum = sum.clone();
Pipeline::builder()
// Produce values 1 through 10
.with_producer("data", move || {
let count = count.clone();
async move {
let mut count = count.lock().await;
if *count < 10 {
*count += 1;
Some(*count)
} else {
None
}
}
})
.with_consumer("data", WorkerOptions::default_single_task(), move |value: usize| {
let sum = task_sum.clone();
async move {
sum.fetch_add(value, SeqCst);
}
})
.build()
.expect("failed to build pipeline")
.wait()
.await;
assert_eq!(sum.load(Acquire), 55);
}
Creating a branching producer and two consumers for each branch.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::Ordering::Acquire;
use tokio::sync::Mutex;
use async_pipes::{branch, NoOutput, Pipeline};
#[tokio::main]
async fn main() {
use async_pipes::WorkerOptions;
let count = Arc::new(Mutex::new(0usize));
let odds_sum = Arc::new(AtomicUsize::new(0));
let task_odds_sum = odds_sum.clone();
let evens_sum = Arc::new(AtomicUsize::new(0));
let task_evens_sum = evens_sum.clone();
Pipeline::builder()
.with_branching_producer(vec!["evens", "odds"], move || {
let c = count.clone();
async move {
let mut c = c.lock().await;
if *c >= 10 {
return None;
}
*c += 1;
let result = if *c % 2 == 0 {
branch![*c, NoOutput]
} else {
branch![NoOutput, *c]
};
Some(result)
}
})
.with_consumer("odds", WorkerOptions::default_single_task(), move |n: usize| {
let odds_sum = task_odds_sum.clone();
async move {
odds_sum.fetch_add(n, Ordering::SeqCst);
}
})
.with_consumer("evens", WorkerOptions::default_single_task(), move |n: usize| {
let evens_sum = task_evens_sum.clone();
async move {
evens_sum.fetch_add(n, Ordering::SeqCst);
}
})
.build()
.expect("failed to build pipeline!")
.wait()
.await;
assert_eq!(odds_sum.load(Acquire), 25);
assert_eq!(evens_sum.load(Acquire), 30);
}
Implementations§
Source§impl Pipeline
impl Pipeline
Sourcepub fn builder() -> PipelineBuilder
pub fn builder() -> PipelineBuilder
Create a new pipeline builder.
Sourcepub async fn wait(self)
pub async fn wait(self)
Wait for the pipeline to complete.
Once the pipeline is complete, a termination signal is sent to to all the workers.
A pipeline progresses to completion by doing the following:
- Wait for all “producers” to complete while also progressing stage workers
- Wait for either all the stage workers to complete, or wait for the internal synchronizer to notify of completion (i.e. there’s no more data flowing through the pipeline)
Step 1 implies that if the producers never finish, the pipeline will run forever. See PipelineBuilder::with_producer for more info.
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Pipeline
impl RefUnwindSafe for Pipeline
impl Send for Pipeline
impl Sync for Pipeline
impl Unpin for Pipeline
impl UnwindSafe for Pipeline
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read more