Struct async_pipes::Pipeline

pub struct Pipeline { /* private fields */ }

A pipeline defines the infrastructure for managing stage workers and transferring data between them using pipes defined by the workers.

Examples

Creating a single producer and a single consumer.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::Ordering::Acquire;
use tokio::sync::Mutex;
use async_pipes::{Pipeline, Pipes};

tokio_test::block_on(async {
    let (mut pipeline, mut pipes): (Pipeline, Pipes) = Pipeline::from_pipes(vec!["Pipe"]);
    let (writer, reader) = pipes.create_io::<usize>("Pipe").unwrap();

    // Produce values 1 through 10
    let count = Arc::new(Mutex::new(0));
    pipeline.register_producer("Producer", writer, || async move {
        let mut count = count.lock().await;
        if *count < 10 {
            *count += 1;
            Some(*count)
        } else {
            None
        }
    });

    let sum = Arc::new(AtomicUsize::new(0));
    let task_sum = sum.clone();
    pipeline.register_consumer("Consumer", reader, |n: usize| async move {
        task_sum.fetch_add(n, Ordering::SeqCst);
    });

    pipeline.wait().await;

    assert_eq!(sum.load(Acquire), 55);
});

Creating a branching producer and two consumers, one for each branch.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::Ordering::Acquire;
use tokio::sync::Mutex;
use async_pipes::{Pipeline, Pipes};

tokio_test::block_on(async {

    let (mut pipeline, mut pipes): (Pipeline, Pipes) = Pipeline::from_pipes(vec!["evens", "odds"]);
    let (evens_w, evens_r) = pipes.create_io::<usize>("evens").unwrap();
    let (odds_w, odds_r) = pipes.create_io::<usize>("odds").unwrap();

    let count = Arc::new(Mutex::new(0));
    pipeline.register_branching_producer("producer", vec![evens_w, odds_w], || async move {
        let mut count = count.lock().await;
        if *count >= 10 {
            return None;
        }
        *count += 1;

        let output = if *count % 2 == 0 {
            vec![Some(*count), None]
        } else {
            vec![None, Some(*count)]
        };

        Some(output)
    });

    let odds_sum = Arc::new(AtomicUsize::new(0));
    let task_odds_sum = odds_sum.clone();
    pipeline.register_consumer("odds-consumer", odds_r, |n: usize| async move {
        task_odds_sum.fetch_add(n, Ordering::SeqCst);
    });
    let evens_sum = Arc::new(AtomicUsize::new(0));
    let task_evens_sum = evens_sum.clone();
    pipeline.register_consumer("evens-consumer", evens_r, |n: usize| async move {
        task_evens_sum.fetch_add(n, Ordering::SeqCst);
    });

    pipeline.wait().await;

    assert_eq!(odds_sum.load(Acquire), 25);
    assert_eq!(evens_sum.load(Acquire), 30);
});

Implementations§

source§

impl Pipeline

source

pub fn from_pipes<S: AsRef<str>>(names: Vec<S>) -> (Self, Pipes)

Create a new pipeline using the list of pipe names.

Using the names, an instance of Pipes will also be created, which can be used to create the I/O objects for each pipe. These objects can be given to the pipeline's register methods (for example, Pipeline::register_branching), which tell the pipeline to connect that pipe to the stage worker being registered.

Providing no names will result in a pipeline that can’t transfer any data.
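To illustrate how declared names relate to the I/O objects, here is a minimal std-only sketch. It is not the crate's implementation: `NamedPipes`, `from_names`, and this `create_io` are hypothetical stand-ins built on `std::sync::mpsc`, and the rule that an undeclared name yields `None` is an assumption.

```rust
use std::collections::HashSet;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for `Pipes`: it only remembers the declared names.
struct NamedPipes {
    names: HashSet<String>,
}

impl NamedPipes {
    /// Mirrors the shape of `Pipeline::from_pipes`, minus the pipeline itself.
    fn from_names<S: AsRef<str>>(names: Vec<S>) -> Self {
        let names = names
            .into_iter()
            .map(|name| {
                let s: &str = name.as_ref();
                s.to_owned()
            })
            .collect();
        NamedPipes { names }
    }

    /// Returns a typed (writer, reader) pair only for a declared name.
    /// Assumption: an undeclared name yields `None`.
    fn create_io<T>(&self, name: &str) -> Option<(Sender<T>, Receiver<T>)> {
        if self.names.contains(name) {
            Some(channel())
        } else {
            None
        }
    }
}
```

In this model, a registry built from an empty list can never hand out a writer or reader, which matches the note that a pipeline without names cannot transfer data.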

source

pub async fn wait(self)

Wait for the pipeline to complete.

Once the pipeline is complete, a termination signal is sent to all the workers.

A pipeline progresses to completion by doing the following:

  1. Wait for all “producers” to complete while also progressing stage workers
  2. Wait for either all the stage workers to complete, or wait for the internal synchronizer to notify of completion (i.e. there’s no more data flowing through the pipeline)

Step 1 implies that if the producers never finish, the pipeline will run forever. See Pipeline::register_producer for more info.
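The two-step completion order can be modelled with plain threads and a channel. This is only a sketch under stated assumptions: `run_to_completion` is a hypothetical helper, and closing the channel stands in for the internal synchronizer that the crate uses to detect that no data is flowing.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Runs a tiny producer -> consumer pipeline and returns the consumed sum.
fn run_to_completion(limit: usize) -> usize {
    let (tx, rx) = channel::<usize>();

    // Producer: emits 1..=limit, then drops its sender, which closes the pipe.
    let producer = thread::spawn(move || {
        for n in 1..=limit {
            tx.send(n).expect("consumer hung up early");
        }
    });

    // Stage worker: runs concurrently with the producer and stops once the
    // pipe is closed and drained.
    let consumer = thread::spawn(move || rx.into_iter().sum::<usize>());

    // Step 1: wait for the producers (the worker keeps progressing meanwhile).
    producer.join().expect("producer panicked");
    // Step 2: wait for the remaining workers to finish the in-flight data.
    consumer.join().expect("consumer panicked")
}
```

Calling `run_to_completion(10)` returns 55. If the producer loop never ended, the first `join` would never return, which is the "runs forever" case described above.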

source

pub fn register_inputs<O: Send + 'static>( &mut self, name: impl Into<String>, writer: PipeWriter<O>, inputs: Vec<O> )

Register a set of inputs to be written to a provided pipe.

This effectively creates a producer stage internally, looping over each input and writing it to the pipe.

Arguments
  • name - For debugging purposes; provide an identifying string for this stage.
  • writer - Created by Pipes::create_io, where each input is written to.
  • inputs - A list of inputs to write to the pipe.
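As a rough model of "a producer stage that loops over the inputs", the following std-only sketch spawns a thread that writes each input to a channel. `spawn_inputs` is a hypothetical helper, not part of async_pipes, and a thread plus `std::sync::mpsc` stands in for the async stage worker and the pipe.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Spawns a stage that writes every input to a fresh pipe and returns the
/// reading end of that pipe.
fn spawn_inputs<O: Send + 'static>(inputs: Vec<O>) -> Receiver<O> {
    let (writer, reader) = channel();
    thread::spawn(move || {
        for input in inputs {
            // Stop early if the reading side has gone away.
            if writer.send(input).is_err() {
                break;
            }
        }
        // `writer` is dropped here, which closes the pipe.
    });
    reader
}
```

Collecting `spawn_inputs(vec![1, 2, 3])` yields the inputs in their original order, because a single writer preserves ordering on the channel.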
source

pub fn register_branching_inputs<O: Send + 'static>( &mut self, name: impl Into<String>, writers: Vec<PipeWriter<O>>, inputs: Vec<Vec<O>> )

Register a set of inputs to be written to the provided pipes.

This effectively creates a branching producer stage internally, looping over each input and writing it to the pipes.

Arguments
  • name - For debugging purposes; provide an identifying string for this stage.
  • writers - Created by Pipes::create_io, where the inputs are written to.
  • inputs - A list of inputs, each itself a list of values, to write to the pipes.
source

pub fn register_producer<O, F, Fut>( &mut self, name: impl Into<String>, writer: PipeWriter<O>, task_definition: F )
where O: Send + 'static, F: FnOnce() -> Fut + Clone + Send + 'static, Fut: Future<Output = Option<O>> + Send + 'static,

Register a new stage in the pipeline that produces values and writes them into a pipe.

The producer will continue producing values until the user-provided task function returns None. This means that it is possible to create an infinite stream of values by simply never returning None.

Arguments
  • name - For debugging purposes; provide an identifying string for this stage.
  • writer - Created by Pipes::create_io, where produced output is written to.
  • task_definition - An async function that when executed produces a single output value.

If the output value returned by the task definition is Some(...), it is written to the provided pipe. Otherwise, if the output value is None, the producer terminates.
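The produce-until-None loop can be sketched synchronously with std only. This is an illustration, not the crate's code: `run_producer` and `count_to` are hypothetical names, a plain closure replaces the async task, and calling a fresh clone on every iteration is an assumption suggested by the `FnOnce() + Clone` bound.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};

/// Calls the task repeatedly and writes each `Some` value to the pipe.
/// The loop ends the first time the task returns `None`.
fn run_producer<O, F>(task: F, writer: Sender<O>)
where
    F: FnOnce() -> Option<O> + Clone,
{
    // Each iteration consumes a fresh clone of the task.
    while let Some(value) = (task.clone())() {
        if writer.send(value).is_err() {
            break; // the reading side is gone
        }
    }
}

/// Produces 1..=limit using shared state, like the counter in the examples.
fn count_to(limit: usize) -> Vec<usize> {
    let (writer, reader) = channel();
    let count = Arc::new(Mutex::new(0usize));
    run_producer(
        move || {
            let mut count = count.lock().unwrap();
            if *count < limit {
                *count += 1;
                Some(*count)
            } else {
                None
            }
        },
        writer,
    );
    reader.into_iter().collect()
}
```

Because every call works on a clone, state that must survive between calls lives behind an `Arc`, which is why the counter is wrapped in `Arc<Mutex<_>>`.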

source

pub fn register_branching_producer<O, F, Fut>( &mut self, _name: impl Into<String>, writers: Vec<PipeWriter<O>>, task_definition: F )
where O: Send + 'static, F: FnOnce() -> Fut + Clone + Send + 'static, Fut: Future<Output = Option<Vec<Option<O>>>> + Send + 'static,

Register a new stage in the pipeline that produces multiple values and writes them into their respective pipes.

The producer will continue producing values until the user-provided task function returns None. This means that it is possible to create an infinite stream of values by simply never returning None.

Arguments
  • name - For debugging purposes; provide an identifying string for this stage.
  • writers - Created by Pipes::create_io, where produced outputs are written to. The position of each output in the returned vector maps directly to the position of the writer in the writers vector provided.
  • task_definition - An async function that when executed produces a list of output values.

If the output returned by the task definition is Some(vec![...]), each Some value in the vector is written to its respective pipe, and None entries are skipped. Otherwise, if the output value is None, the producer terminates.
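The two levels of `Option` play different roles: the outer one ends the producer, the inner ones decide per pipe whether anything is written. A synchronous std-only sketch, assuming hypothetical helpers `run_branching_producer` and `split_evens_odds` and using `std::sync::mpsc` in place of the crate's pipes:

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};

/// Outer `None` stops the producer; an inner `None` skips that writer.
fn run_branching_producer<O, F>(task: F, writers: Vec<Sender<O>>)
where
    F: FnOnce() -> Option<Vec<Option<O>>> + Clone,
{
    while let Some(outputs) = (task.clone())() {
        // Position i in the output vector goes to writer i.
        for (writer, output) in writers.iter().zip(outputs) {
            if let Some(value) = output {
                // A closed reader is ignored in this sketch.
                let _ = writer.send(value);
            }
        }
    }
}

/// Counts 1..=limit and routes even numbers to pipe 0, odd numbers to pipe 1.
fn split_evens_odds(limit: usize) -> (Vec<usize>, Vec<usize>) {
    let (evens_w, evens_r) = channel();
    let (odds_w, odds_r) = channel();
    let count = Arc::new(Mutex::new(0usize));
    run_branching_producer(
        move || {
            let mut count = count.lock().unwrap();
            if *count >= limit {
                return None;
            }
            *count += 1;
            Some(if *count % 2 == 0 {
                vec![Some(*count), None]
            } else {
                vec![None, Some(*count)]
            })
        },
        vec![evens_w, odds_w],
    );
    (evens_r.into_iter().collect(), odds_r.into_iter().collect())
}
```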

source

pub fn register<I, O, F, Fut>( &mut self, name: impl Into<String>, reader: PipeReader<I>, writer: PipeWriter<O>, task_definition: F )
where I: Send + 'static, O: Send + 'static, F: FnOnce(I) -> Fut + Clone + Send + 'static, Fut: Future<Output = Option<O>> + Send + 'static,

Register a new stage in the pipeline that operates on incoming data and writes the result to a single output pipe.

This effectively provides a means to do a “mapping” transformation on incoming data, with an additional capability to filter it by returning None in the task definition.

Arguments
  • name - For debugging purposes; provide an identifying string for this stage.
  • reader - Created by Pipes::create_io, where incoming data is read from.
  • writer - Created by Pipes::create_io, where the task’s output is written to.
  • task_definition - An async function that operates on input received from the reader and returns an output that is written to the writer.

If the output returned by the task definition is Some(...), that value will be written. Otherwise, if the output value is None, that value will not be written.
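The "map with optional filter" behaviour can be sketched with std channels. This models the semantics only: `run_stage` and `double_evens` are hypothetical names, the task is a synchronous closure rather than an async function, and `std::sync::mpsc` stands in for the crate's pipes.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Reads every input, applies the task, and forwards only `Some` results.
fn run_stage<I, O, F>(reader: Receiver<I>, writer: Sender<O>, task: F)
where
    F: FnOnce(I) -> Option<O> + Clone,
{
    for input in reader {
        if let Some(output) = (task.clone())(input) {
            if writer.send(output).is_err() {
                break; // the downstream reader is gone
            }
        }
        // A `None` result simply drops the input: this is the filter.
    }
}

/// Doubles even inputs and filters out odd ones.
fn double_evens(inputs: Vec<u32>) -> Vec<u32> {
    let (in_w, in_r) = channel();
    let (out_w, out_r) = channel();
    for n in inputs {
        in_w.send(n).unwrap();
    }
    drop(in_w); // close the input pipe so the stage loop ends
    run_stage(in_r, out_w, |n: u32| if n % 2 == 0 { Some(n * 2) } else { None });
    out_r.into_iter().collect()
}
```

For example, `double_evens(vec![1, 2, 3, 4])` returns `vec![4, 8]`: the odd inputs map to `None` and never reach the output pipe.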

source

pub fn register_branching<I, O, F, Fut>( &mut self, name: impl Into<String>, reader: PipeReader<I>, writers: Vec<PipeWriter<O>>, task_definition: F )
where I: Send + 'static, O: Send + 'static, F: FnOnce(I) -> Fut + Clone + Send + 'static, Fut: Future<Output = Vec<Option<O>>> + Send + 'static,

Register a new stage in the pipeline that operates on incoming data and writes each result to its respective output pipe.

This effectively provides a means to do a “mapping” transformation on incoming data, with an additional capability to filter it by returning None in the task definition.

Arguments
  • name - For debugging purposes; provide an identifying string for this stage.
  • reader - Created by Pipes::create_io, where incoming data is read from.
  • writers - Created by Pipes::create_io, where the task’s outputs are written to. The position of each output in the returned vector maps directly to the position of the writer in the writers vector provided.
  • task_definition - An async function that operates on input received from the reader and returns a list of outputs where each is written to its respective writer.

For each output, if the task definition returns Some(...), that value will be written. Otherwise, if it is None, that value will not be written.
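Since each position in the returned vector is decided independently, one input can reach several pipes or none at all. A std-only sketch of that routing, with hypothetical helpers `run_branching_stage` and `route_multiples` and synchronous closures in place of async tasks:

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// For every input, writes output i to writer i when it is `Some`.
fn run_branching_stage<I, O, F>(reader: Receiver<I>, writers: Vec<Sender<O>>, task: F)
where
    F: FnOnce(I) -> Vec<Option<O>> + Clone,
{
    for input in reader {
        let outputs = (task.clone())(input);
        for (writer, output) in writers.iter().zip(outputs) {
            if let Some(value) = output {
                // A closed reader is ignored in this sketch.
                let _ = writer.send(value);
            }
        }
    }
}

/// Sends multiples of 2 to the first pipe and multiples of 3 to the second.
fn route_multiples(inputs: Vec<u32>) -> (Vec<u32>, Vec<u32>) {
    let (in_w, in_r) = channel();
    let (twos_w, twos_r) = channel();
    let (threes_w, threes_r) = channel();
    for n in inputs {
        in_w.send(n).unwrap();
    }
    drop(in_w); // close the input pipe so the stage loop ends
    run_branching_stage(in_r, vec![twos_w, threes_w], |n: u32| {
        vec![
            if n % 2 == 0 { Some(n) } else { None },
            if n % 3 == 0 { Some(n) } else { None },
        ]
    });
    (twos_r.into_iter().collect(), threes_r.into_iter().collect())
}
```

With the inputs 2, 3, 5, and 6, the value 6 is written to both pipes and 5 to neither.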

source

pub fn register_consumer<I, F, Fut>( &mut self, name: impl Into<String>, reader: PipeReader<I>, task_definition: F )
where I: Send + 'static, F: FnOnce(I) -> Fut + Clone + Send + 'static, Fut: Future<Output = ()> + Send + 'static,

Register a new stage in the pipeline that consumes incoming data from a pipe.

This acts as a “leaf” stage, where data flowing through the pipeline stops.

Arguments
  • name - For debugging purposes; provide an identifying string for this stage.
  • reader - Created by Pipes::create_io, where incoming data is read from.
  • task_definition - An async function that operates on input received from the reader. It returns no output, so nothing is written to any pipe.

Trait Implementations§

source§

impl Debug for Pipeline

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.