Struct Pipeline 

pub struct Pipeline<T: Send + 'static> { /* private fields */ }

A parallel data pipeline backed by bounded crossbeam channels and worker threads.

Each stage method consumes the pipeline and returns a new one whose type reflects the output of that stage. All worker threads are tracked in a shared Arc<Mutex<Vec<JoinHandle>>> so that calling one of the terminal methods (collect, for_each, wait_for_completion) joins every thread spawned by the whole graph.
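
The shared handle registry can be sketched with the standard library alone. This is an illustration under assumptions, not the crate's code: it uses std::sync::mpsc::sync_channel in place of crossbeam's bounded channels, assumes the registry stores JoinHandle<()>, and spawn_tracked, collect_and_join and demo are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Shared registry: every stage pushes its JoinHandle here (element type assumed).
type Handles = Arc<Mutex<Vec<JoinHandle<()>>>>;

/// Spawn a stage thread and record its handle in the shared registry.
fn spawn_tracked<F>(handles: &Handles, f: F)
where
    F: FnOnce() + Send + 'static,
{
    let handle = thread::spawn(f);
    handles.lock().unwrap().push(handle);
}

/// Terminal step: drain the last channel first, then join every registered thread.
fn collect_and_join<T>(rx: Receiver<T>, handles: &Handles) -> Vec<T> {
    let out: Vec<T> = rx.iter().collect();
    // Move the handles out so the mutex is not held while joining.
    let all: Vec<JoinHandle<()>> = handles.lock().unwrap().drain(..).collect();
    for handle in all {
        handle.join().expect("a pipeline thread panicked");
    }
    out
}

/// Two tracked stages (feeder, then doubler) sharing one registry.
fn demo(items: Vec<i32>) -> Vec<i32> {
    let handles: Handles = Arc::new(Mutex::new(Vec::new()));
    let (tx1, rx1) = sync_channel::<i32>(4);
    let (tx2, rx2) = sync_channel::<i32>(4);
    spawn_tracked(&handles, move || {
        for x in items {
            if tx1.send(x).is_err() {
                break;
            }
        }
    });
    spawn_tracked(&handles, move || {
        for x in rx1 {
            if tx2.send(x * 2).is_err() {
                break;
            }
        }
    });
    collect_and_join(rx2, &handles)
}
```

Draining the final receiver before joining matters here: a worker blocked on a full bounded channel could never finish if the terminal step tried to join it first.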

Implementations

impl<T: Send + 'static> Pipeline<T>

pub fn new(iter: impl IntoIterator<Item = T> + Send + 'static) -> Self

Create a pipeline from any value that implements IntoIterator. A feeder thread is spawned immediately to push items into the first bounded channel.
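
A feeder of this kind might look like the following minimal sketch. It assumes std's sync_channel as a stand-in for crossbeam's bounded channel; spawn_feeder and its buffer parameter are hypothetical, with buffer playing the role of the capacity that with_buffer configures.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread::{self, JoinHandle};

/// Hypothetical feeder: starts a thread right away and pushes every item
/// into a bounded channel of capacity `buffer`.
fn spawn_feeder<T, I>(iter: I, buffer: usize) -> (Receiver<T>, JoinHandle<()>)
where
    T: Send + 'static,
    I: IntoIterator<Item = T> + Send + 'static,
{
    let (tx, rx) = sync_channel(buffer);
    let handle = thread::spawn(move || {
        for item in iter {
            // `send` blocks while the channel is full (back-pressure) and
            // fails only if the receiver was dropped, so stop feeding then.
            if tx.send(item).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which closes the channel for the consumer.
    });
    (rx, handle)
}
```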

pub fn with_buffer(self, size: usize) -> Self

Override the inter-stage channel capacity (default: 4, matching olympipe).

impl<T: Send + 'static> Pipeline<T>

pub fn task<U, F>(self, f: F, count: usize) -> Pipeline<U>
where U: Send + 'static, F: Fn(T) -> U + Send + Clone + 'static,

Map each item through f using count worker threads that run concurrently. The workers share the input channel (MPMC) and write to a shared output channel, so when count > 1 the output order is not guaranteed to match the input order.
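
A count-worker map stage can be sketched as below. Because std's Receiver is single-consumer, this sketch shares it behind Arc<Mutex<Receiver<T>>> to emulate the MPMC receiver that crossbeam provides directly; parallel_map and its buffer parameter are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical `count`-worker map stage built on std channels.
fn parallel_map<T, U, F>(input: Receiver<T>, f: F, count: usize, buffer: usize) -> Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Clone + 'static,
{
    let shared_in = Arc::new(Mutex::new(input));
    let (tx, rx) = sync_channel(buffer);
    for _ in 0..count {
        let shared_in = Arc::clone(&shared_in);
        let tx = tx.clone(); // each worker owns a sender to the shared output
        let f = f.clone(); // F: Clone gives every worker its own closure
        thread::spawn(move || loop {
            // The lock guard is a temporary, released at the end of this
            // statement, so `f` runs without holding the lock.
            let next = shared_in.lock().unwrap().recv();
            match next {
                Ok(item) => {
                    if tx.send(f(item)).is_err() {
                        break; // downstream went away
                    }
                }
                Err(_) => break, // upstream closed and drained
            }
        });
    }
    // Drop the original sender: `rx` closes once every worker has exited.
    drop(tx);
    rx
}
```

With count > 1, compare results as a multiset (for example, sort them) rather than relying on arrival order.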

pub fn task_or<U, E, F, H>(self, f: F, on_error: H) -> Pipeline<U>
where T: Clone, U: Send + 'static, E: 'static, F: Fn(T) -> Result<U, E> + Send + 'static, H: Fn(T, E) -> Option<U> + Send + 'static,

Map each item through the fallible function f. On error, on_error is called with a clone of the original item and the error: if it returns Some(v), v is forwarded downstream; if it returns None, the item is skipped.
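
The per-item contract can be illustrated without threads. map_or_recover is a hypothetical, sequential sketch; since f takes T by value, it clones each item before calling f so that on_error can still receive a copy, which is what the T: Clone bound makes possible. In this sketch every item is therefore cloned, not only the ones that fail.

```rust
/// Sequential sketch of task_or's per-item contract (hypothetical name).
fn map_or_recover<T, U, E, F, H>(items: Vec<T>, f: F, on_error: H) -> Vec<U>
where
    T: Clone,
    F: Fn(T) -> Result<U, E>,
    H: Fn(T, E) -> Option<U>,
{
    let mut out = Vec::new();
    for item in items {
        // Keep a copy so the error handler can see the input that failed.
        let backup = item.clone();
        match f(item) {
            Ok(value) => out.push(value),
            Err(err) => {
                // Some(v) is forwarded in place of the failed item; None drops it.
                if let Some(value) = on_error(backup, err) {
                    out.push(value);
                }
            }
        }
    }
    out
}
```

For example, parsing ["1", "x", "3", "-"] with a handler that maps "-" to Some(0) and everything else to None yields [1, 3, 0].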

pub fn filter<F>(self, f: F) -> Pipeline<T>
where F: Fn(&T) -> bool + Send + 'static,

Keep only items for which f returns true.

pub fn batch(self, size: usize) -> Pipeline<Vec<T>>

Group items into Vec<T> batches of at most size elements. The last batch is emitted even if it holds fewer than size items.
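
The batching loop might look like this sketch. batch_from is hypothetical and, to stay short, collects batches into a Vec instead of sending them downstream. It also assumes size > 0 and skips a trailing batch when it is empty; the description above does not specify either case.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical batching loop: emit full batches, then flush the remainder
/// once the upstream channel closes.
fn batch_from<T>(rx: Receiver<T>, size: usize) -> Vec<Vec<T>> {
    assert!(size > 0, "batch size must be positive"); // assumption of this sketch
    let mut batches = Vec::new();
    let mut current = Vec::with_capacity(size);
    for item in rx {
        current.push(item);
        if current.len() == size {
            // Hand off the full batch and start a fresh one.
            batches.push(std::mem::replace(&mut current, Vec::with_capacity(size)));
        }
    }
    if !current.is_empty() {
        batches.push(current); // the last, possibly incomplete batch
    }
    batches
}
```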

pub fn temporal_batch(self, window: Duration) -> Pipeline<Vec<T>>

Collect items that arrive within window of each other into a Vec<T>. The current batch is emitted and a new one begins whenever the gap between consecutive items exceeds window; the batch in progress is also emitted when the upstream channel disconnects.
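
One way to implement gap-based batching is Receiver::recv_timeout from std::sync::mpsc. In this hypothetical sketch (temporal_batches), the first item of each batch is awaited without a timeout, and each later item must arrive within window of the previous one; batches are returned in a Vec rather than sent downstream.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical gap-based batcher: a batch closes when no item arrives
/// within `window` of the previous one, or when upstream disconnects.
fn temporal_batches<T>(rx: Receiver<T>, window: Duration) -> Vec<Vec<T>> {
    let mut batches = Vec::new();
    // Wait (without a timeout) for the first item of each batch.
    while let Ok(first) = rx.recv() {
        let mut current = vec![first];
        loop {
            match rx.recv_timeout(window) {
                Ok(item) => current.push(item), // gap <= window: same batch
                Err(RecvTimeoutError::Timeout) => break, // gap > window: close batch
                Err(RecvTimeoutError::Disconnected) => {
                    batches.push(current); // upstream closed: flush and stop
                    return batches;
                }
            }
        }
        batches.push(current);
    }
    batches
}
```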

pub fn explode<U, I, F>(self, f: F) -> Pipeline<U>
where U: Send + 'static, I: IntoIterator<Item = U>, F: Fn(T) -> I + Send + 'static,

Apply f to each item and forward every element yielded by the returned iterator (flatMap).
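
An explode stage can be sketched as a single thread with a nested loop. explode_stage and its buffer parameter are hypothetical, and std's sync_channel stands in for crossbeam. The iterator returned by f is created and consumed on the worker thread, which is why I itself does not need to be Send, consistent with the bounds above.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical flat-map stage: every element yielded by `f(item)` is
/// forwarded downstream individually.
fn explode_stage<T, U, I, F>(input: Receiver<T>, f: F, buffer: usize) -> Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    I: IntoIterator<Item = U>,
    F: Fn(T) -> I + Send + 'static,
{
    let (tx, rx) = sync_channel(buffer);
    thread::spawn(move || {
        for item in input {
            for out in f(item) {
                if tx.send(out).is_err() {
                    return; // downstream dropped its receiver
                }
            }
        }
    });
    rx
}
```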

pub fn split<A, B, F>(self, f: F) -> (Pipeline<A>, Pipeline<B>)
where A: Send + 'static, B: Send + 'static, F: Fn(T) -> (Option<A>, Option<B>) + Send + 'static,

Route each item to one or both of two output pipelines according to f.

Both returned pipelines share the same handle registry, so a single terminal call on either one will join all threads.

The two output channels are unbounded so that the router thread can fully drain its input even when the two branches are consumed sequentially (calling collect() on one branch at a time). If you add further bounded stages after split, back-pressure is restored there.
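
A router with unbounded outputs can be sketched with std::sync::mpsc::channel, which is unbounded. split_stage is hypothetical, and it ignores send errors so that dropping one branch does not stop the other; the documented implementation may handle that case differently.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Hypothetical router: `f` decides per item whether to feed branch A,
/// branch B, both, or neither.
fn split_stage<T, A, B, F>(input: Receiver<T>, f: F) -> (Receiver<A>, Receiver<B>)
where
    T: Send + 'static,
    A: Send + 'static,
    B: Send + 'static,
    F: Fn(T) -> (Option<A>, Option<B>) + Send + 'static,
{
    let (tx_a, rx_a) = channel();
    let (tx_b, rx_b) = channel();
    thread::spawn(move || {
        for item in input {
            let (a, b) = f(item);
            // Unbounded sends never block, so the router drains its input
            // even while nobody is reading one of the branches.
            if let Some(a) = a {
                let _ = tx_a.send(a);
            }
            if let Some(b) = b {
                let _ = tx_b.send(b);
            }
        }
    });
    (rx_a, rx_b)
}
```

Because the sends never block, a caller can collect one branch to completion and then the other without deadlocking; the cost is that items destined for the branch read later accumulate in memory.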

pub fn gather(self, others: Vec<Pipeline<T>>) -> Pipeline<T>

Merge self and all pipelines in others into a single output stream. One forwarder thread is spawned per source; all handles are merged into the returned pipeline’s handle registry.
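
A gather of this shape can be sketched as follows. gather_stage and its buffer parameter are hypothetical; it returns the forwarders' JoinHandles so the caller can join them, standing in for the merged handle registry.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread::{self, JoinHandle};

/// Hypothetical gather: one forwarder thread per source, all writing into a
/// single bounded output channel.
fn gather_stage<T: Send + 'static>(
    sources: Vec<Receiver<T>>,
    buffer: usize,
) -> (Receiver<T>, Vec<JoinHandle<()>>) {
    let (tx, rx) = sync_channel(buffer);
    let handles: Vec<JoinHandle<()>> = sources
        .into_iter()
        .map(|src| {
            let tx = tx.clone();
            thread::spawn(move || {
                for item in src {
                    if tx.send(item).is_err() {
                        break;
                    }
                }
            })
        })
        .collect();
    // Only the forwarders' clones keep the output open from here on.
    drop(tx);
    (rx, handles)
}
```

Items from different sources interleave in arrival order, so downstream code should not assume any ordering across sources.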

pub fn reduce<U, F>(self, init: U, f: F) -> Pipeline<U>
where U: Send + 'static, F: Fn(U, T) -> U + Send + 'static,

Fold all items into a single accumulated value and emit it as the sole output item.
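
A reduce stage amounts to Iterator::fold over the receiving end of a channel. reduce_stage is a hypothetical sketch using std channels; its output channel carries exactly one value, sent after the upstream channel closes.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical reduce stage: fold the whole input, then emit one value.
fn reduce_stage<T, U, F>(input: Receiver<T>, init: U, f: F) -> Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(U, T) -> U + Send + 'static,
{
    let (tx, rx) = sync_channel(1);
    thread::spawn(move || {
        // Iterating the receiver ends when every upstream sender is dropped.
        let acc = input.into_iter().fold(init, |acc, item| f(acc, item));
        let _ = tx.send(acc); // the sole output item
    });
    rx
}
```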

pub fn limit(self, n: usize) -> Pipeline<T>

Pass at most n items downstream, then stop.
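
A limit stage can be sketched with Iterator::take. limit_stage and its buffer parameter are hypothetical. The point it illustrates is that when the stage returns, it drops its receiver, so an upstream producer's next send fails instead of blocking forever; this is the behavior of std::sync::mpsc and is assumed, not confirmed, for this crate.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical limit stage: forward at most `n` items, then shut down.
fn limit_stage<T: Send + 'static>(input: Receiver<T>, n: usize, buffer: usize) -> Receiver<T> {
    let (tx, rx) = sync_channel(buffer);
    thread::spawn(move || {
        // `take(n)` stops after n items without pulling an (n + 1)-th.
        for item in input.into_iter().take(n) {
            if tx.send(item).is_err() {
                break;
            }
        }
        // Returning drops both ends: downstream sees disconnection, and
        // upstream senders get an error on their next send.
    });
    rx
}
```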

pub fn timeout(self, duration: Duration) -> Pipeline<T>

Fail if no item arrives within duration of the previous one. Failing here means the stage drops its sender, which propagates disconnection downstream.
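
A timeout stage maps naturally onto Receiver::recv_timeout. In this hypothetical sketch (timeout_stage, std channels), the timeout also applies to the first item, measured from when the stage starts waiting; the description above does not say whether the real stage does that.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;
use std::time::Duration;

/// Hypothetical timeout stage: stop forwarding when the gap between items
/// exceeds `duration`.
fn timeout_stage<T: Send + 'static>(input: Receiver<T>, duration: Duration, buffer: usize) -> Receiver<T> {
    let (tx, rx) = sync_channel(buffer);
    thread::spawn(move || {
        // recv_timeout fails on either a timeout or a disconnection;
        // both end the stage.
        while let Ok(item) = input.recv_timeout(duration) {
            if tx.send(item).is_err() {
                break;
            }
        }
        // `tx` is dropped here, so downstream observes the stream closing.
    });
    rx
}
```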

pub fn debug(self) -> Pipeline<T>
where T: Debug,

Print each item to stdout and forward it unchanged.

impl<T: Send + 'static> Pipeline<T>

pub fn collect(self) -> Vec<T>

Collect all output items into a Vec, then join every worker thread.

pub fn for_each<F>(self, f: F)
where F: FnMut(T),

Apply f to each output item, then join every worker thread.

pub fn wait_for_completion(self)

Drain and discard all output, then join every worker thread.

Auto Trait Implementations

impl<T> Freeze for Pipeline<T>

impl<T> RefUnwindSafe for Pipeline<T>

impl<T> Send for Pipeline<T>

impl<T> Sync for Pipeline<T>

impl<T> Unpin for Pipeline<T>
where T: Unpin,

impl<T> UnsafeUnpin for Pipeline<T>

impl<T> UnwindSafe for Pipeline<T>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.