Struct Pipeline

Source
pub struct Pipeline { /* private fields */ }
Expand description

A pipeline provides the infrastructure for managing a set of workers that run user-defined “tasks” on data going through the pipes.

§Examples

Creating a single producer and a single consumer.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::Ordering::{Acquire, SeqCst};
use tokio::sync::Mutex;
use async_pipes::Pipeline;

#[tokio::main]
async fn main() {
    use async_pipes::WorkerOptions;
let count = Arc::new(Mutex::new(0usize));

    let sum = Arc::new(AtomicUsize::new(0));
    let task_sum = sum.clone();

    Pipeline::builder()
        // Produce values 1 through 10
        .with_producer("data", move || {
            let count = count.clone();
            async move {
                let mut count = count.lock().await;
                if *count < 10 {
                    *count += 1;
                    Some(*count)
                } else {
                    None
                }
            }
        })
        .with_consumer("data", WorkerOptions::default_single_task(), move |value: usize| {
            let sum = task_sum.clone();
            async move {
                sum.fetch_add(value, SeqCst);
            }
        })
        .build()
        .expect("failed to build pipeline")
        .wait()
        .await;

    assert_eq!(sum.load(Acquire), 55);
}

Creating a branching producer and two consumers for each branch.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::Ordering::Acquire;
use tokio::sync::Mutex;
use async_pipes::{branch, NoOutput, Pipeline};

#[tokio::main]
async fn main() {
    use async_pipes::WorkerOptions;
let count = Arc::new(Mutex::new(0usize));

    let odds_sum = Arc::new(AtomicUsize::new(0));
    let task_odds_sum = odds_sum.clone();

    let evens_sum = Arc::new(AtomicUsize::new(0));
    let task_evens_sum = evens_sum.clone();

    Pipeline::builder()
        .with_branching_producer(vec!["evens", "odds"], move || {
            let c = count.clone();
            async move {
                let mut c = c.lock().await;
                if *c >= 10 {
                    return None;
                }
                *c += 1;

                let result = if *c % 2 == 0 {
                    branch![*c, NoOutput]
                } else {
                    branch![NoOutput, *c]
                };
                Some(result)
            }
        })
        .with_consumer("odds", WorkerOptions::default_single_task(), move |n: usize| {
            let odds_sum = task_odds_sum.clone();
            async move {
                odds_sum.fetch_add(n, Ordering::SeqCst);
            }
        })
        .with_consumer("evens", WorkerOptions::default_single_task(), move |n: usize| {
            let evens_sum = task_evens_sum.clone();
            async move {
                evens_sum.fetch_add(n, Ordering::SeqCst);
            }
        })
        .build()
        .expect("failed to build pipeline!")
        .wait()
        .await;

    assert_eq!(odds_sum.load(Acquire), 25);
    assert_eq!(evens_sum.load(Acquire), 30);
}

Implementations§

Source§

impl Pipeline

Source

pub fn builder() -> PipelineBuilder

Create a new pipeline builder.

Source

pub async fn wait(self)

Wait for the pipeline to complete.

Once the pipeline is complete, a termination signal is sent to to all the workers.

A pipeline progresses to completion by doing the following:

  1. Wait for all “producers” to complete while also progressing stage workers
  2. Wait for either all the stage workers to complete, or wait for the internal synchronizer to notify of completion (i.e. there’s no more data flowing through the pipeline)

Step 1 implies that if the producers never finish, the pipeline will run forever. See PipelineBuilder::with_producer for more info.

Trait Implementations§

Source§

impl Debug for Pipeline

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.