Struct Flow

Source
pub struct Flow<DataType, ErrorType, OutputType> { /* private fields */ }
Expand description

A builder for constructing and executing data processing flows.

Flow represents a sequence of processing stages that data flows through. It supports various operations including data transformation, conditional branching, and async execution.

§Type Parameters

  • DataType - The type of data flowing through the pipeline
  • ErrorType - The error type that can be produced during processing
  • OutputType - The output type produced by conditions in branches

§Examples

use cortex_ai::composer::Flow;
use cortex_ai::flow::source::Source;
use cortex_ai::flow::types::SourceOutput;
use cortex_ai::flow::condition::Condition;
use cortex_ai::flow::processor::Processor;
use cortex_ai::FlowComponent;
use cortex_ai::FlowError;
use std::error::Error;
use std::fmt;
use std::pin::Pin;
use std::future::Future;
use flume::{Sender, Receiver};

#[derive(Clone, Debug)]
struct MyData(String);

#[derive(Clone, Debug)]
struct MyError;

impl fmt::Display for MyError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "MyError")
    }
}

impl Error for MyError {}

impl From<FlowError> for MyError {
    fn from(e: FlowError) -> Self { MyError }
}

struct MySource;

impl FlowComponent for MySource {
    type Input = ();
    type Output = MyData;
    type Error = MyError;
}

impl Source for MySource {
    fn stream(&self) -> Pin<Box<dyn Future<Output = Result<SourceOutput<Self::Output, Self::Error>, Self::Error>> + Send>> {
        Box::pin(async move {
            let (tx, rx) = flume::bounded(10);
            let (feedback_tx, _) = flume::bounded(10);
            Ok(SourceOutput { receiver: rx, feedback: feedback_tx })
        })
    }
}

struct MyProcessor;
impl FlowComponent for MyProcessor {
    type Input = MyData;
    type Output = MyData;
    type Error = MyError;
}

impl Processor for MyProcessor {
    fn process(&self, input: Self::Input) -> Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>> {
        Box::pin(async move { Ok(input) })
    }
}

struct MyCondition;
impl FlowComponent for MyCondition {
    type Input = MyData;
    type Output = bool;
    type Error = MyError;
}

impl Condition for MyCondition {
    fn evaluate(&self, input: Self::Input) -> Pin<Box<dyn Future<Output = Result<(bool, Option<Self::Output>), Self::Error>> + Send>> {
        Box::pin(async move { Ok((true, Some(false))) })
    }
}

#[tokio::main]
async fn main() {
    let (_, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
     
    let flow = Flow::<MyData, MyError, bool>::new()
        .source(MySource)
        .process(MyProcessor)
        .when(MyCondition)
        .process(MyProcessor)
        .otherwise()
        .process(MyProcessor)
        .end();

    let _ = flow.run_stream(shutdown_rx).await;
}

Implementations§

Source§

impl<DataType, ErrorType, OutputType> Flow<DataType, ErrorType, OutputType>
where DataType: Clone + Send + Sync + 'static, OutputType: Send + Sync + 'static, ErrorType: Error + Send + Sync + Clone + 'static + From<FlowError>,

Source

pub fn new() -> Self

Creates a new empty Flow.

§Returns

A new instance of Flow with no source or stages configured

Source

pub fn source<SourceType>(self, source: SourceType) -> Self
where SourceType: Source<Input = (), Output = DataType, Error = ErrorType> + Send + Sync + 'static,

Sets the data source for the flow.

§Arguments
  • source - The source that will produce data for the flow
§Returns

The flow builder for method chaining

Source

pub fn process<ProcessorType>(self, processor: ProcessorType) -> Self
where ProcessorType: Processor<Input = DataType, Output = DataType, Error = ErrorType> + Send + Sync + 'static,

Adds a processor stage to the flow.

§Arguments
  • processor - The processor to add to the flow
§Returns

The flow builder for method chaining

Source

pub fn when<ConditionType>( self, condition: ConditionType, ) -> BranchBuilder<DataType, OutputType, ErrorType>
where ConditionType: Condition<Input = DataType, Output = OutputType, Error = ErrorType> + Send + Sync + 'static,

Starts building a conditional branch in the flow.

§Arguments
  • condition - The condition that determines which branch to take
§Returns

A BranchBuilder for constructing the conditional branches

Source

pub async fn run_stream( self, shutdown: Receiver<()>, ) -> Result<Vec<DataType>, ErrorType>

Executes the flow asynchronously, processing data from the source through all stages.

§Arguments
  • shutdown - A broadcast receiver for graceful shutdown signaling
§Returns

A Result containing either a vector of processed data items or an error

§Errors

Returns an error if:

  • The flow source is not set
  • Any stage in the flow returns an error during processing
  • The task execution fails

Trait Implementations§

Source§

impl<DataType, ErrorType, OutputType> Default for Flow<DataType, ErrorType, OutputType>
where DataType: Clone + Send + Sync + 'static, OutputType: Send + Sync + 'static, ErrorType: Error + Send + Sync + Clone + 'static + From<FlowError>,

Source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

§

impl<DataType, ErrorType, OutputType> Freeze for Flow<DataType, ErrorType, OutputType>

§

impl<DataType, ErrorType, OutputType> !RefUnwindSafe for Flow<DataType, ErrorType, OutputType>

§

impl<DataType, ErrorType, OutputType> Send for Flow<DataType, ErrorType, OutputType>
where DataType: Send, ErrorType: Send, OutputType: Send,

§

impl<DataType, ErrorType, OutputType> Sync for Flow<DataType, ErrorType, OutputType>
where DataType: Sync, ErrorType: Sync, OutputType: Sync,

§

impl<DataType, ErrorType, OutputType> Unpin for Flow<DataType, ErrorType, OutputType>

§

impl<DataType, ErrorType, OutputType> !UnwindSafe for Flow<DataType, ErrorType, OutputType>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more