pub struct Flow<DataType, ErrorType, OutputType> { /* private fields */ }
Expand description
A builder for constructing and executing data processing flows.
Flow
represents a sequence of processing stages that data flows through. It supports
various operations including data transformation, conditional branching, and async execution.
§Type Parameters
DataType
- The type of data flowing through the pipelineErrorType
- The error type that can be produced during processingOutputType
- The output type produced by conditions in branches
§Examples
use cortex_ai::composer::Flow;
use cortex_ai::flow::source::Source;
use cortex_ai::flow::types::SourceOutput;
use cortex_ai::flow::condition::Condition;
use cortex_ai::flow::processor::Processor;
use cortex_ai::FlowComponent;
use cortex_ai::FlowError;
use std::error::Error;
use std::fmt;
use std::pin::Pin;
use std::future::Future;
use flume::{Sender, Receiver};
#[derive(Clone, Debug)]
struct MyData(String);
#[derive(Clone, Debug)]
struct MyError;
impl fmt::Display for MyError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "MyError")
}
}
impl Error for MyError {}
impl From<FlowError> for MyError {
fn from(e: FlowError) -> Self { MyError }
}
struct MySource;
impl FlowComponent for MySource {
type Input = ();
type Output = MyData;
type Error = MyError;
}
impl Source for MySource {
fn stream(&self) -> Pin<Box<dyn Future<Output = Result<SourceOutput<Self::Output, Self::Error>, Self::Error>> + Send>> {
Box::pin(async move {
let (tx, rx) = flume::bounded(10);
let (feedback_tx, _) = flume::bounded(10);
Ok(SourceOutput { receiver: rx, feedback: feedback_tx })
})
}
}
struct MyProcessor;
impl FlowComponent for MyProcessor {
type Input = MyData;
type Output = MyData;
type Error = MyError;
}
impl Processor for MyProcessor {
fn process(&self, input: Self::Input) -> Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>> {
Box::pin(async move { Ok(input) })
}
}
struct MyCondition;
impl FlowComponent for MyCondition {
type Input = MyData;
type Output = bool;
type Error = MyError;
}
impl Condition for MyCondition {
fn evaluate(&self, input: Self::Input) -> Pin<Box<dyn Future<Output = Result<(bool, Option<Self::Output>), Self::Error>> + Send>> {
Box::pin(async move { Ok((true, Some(false))) })
}
}
#[tokio::main]
async fn main() {
let (_, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
let flow = Flow::<MyData, MyError, bool>::new()
.source(MySource)
.process(MyProcessor)
.when(MyCondition)
.process(MyProcessor)
.otherwise()
.process(MyProcessor)
.end();
let _ = flow.run_stream(shutdown_rx).await;
}
Implementations§
Source§impl<DataType, ErrorType, OutputType> Flow<DataType, ErrorType, OutputType>
impl<DataType, ErrorType, OutputType> Flow<DataType, ErrorType, OutputType>
Sourcepub fn when<ConditionType>(
self,
condition: ConditionType,
) -> BranchBuilder<DataType, OutputType, ErrorType>
pub fn when<ConditionType>( self, condition: ConditionType, ) -> BranchBuilder<DataType, OutputType, ErrorType>
Sourcepub async fn run_stream(
self,
shutdown: Receiver<()>,
) -> Result<Vec<DataType>, ErrorType>
pub async fn run_stream( self, shutdown: Receiver<()>, ) -> Result<Vec<DataType>, ErrorType>
Executes the flow asynchronously, processing data from the source through all stages.
§Arguments
shutdown
- A broadcast receiver for graceful shutdown signaling
§Returns
A Result containing either a vector of processed data items or an error
§Errors
Returns an error if:
- The flow source is not set
- Any stage in the flow returns an error during processing
- The task execution fails
Trait Implementations§
Auto Trait Implementations§
impl<DataType, ErrorType, OutputType> Freeze for Flow<DataType, ErrorType, OutputType>
impl<DataType, ErrorType, OutputType> !RefUnwindSafe for Flow<DataType, ErrorType, OutputType>
impl<DataType, ErrorType, OutputType> Send for Flow<DataType, ErrorType, OutputType>
impl<DataType, ErrorType, OutputType> Sync for Flow<DataType, ErrorType, OutputType>
impl<DataType, ErrorType, OutputType> Unpin for Flow<DataType, ErrorType, OutputType>
impl<DataType, ErrorType, OutputType> !UnwindSafe for Flow<DataType, ErrorType, OutputType>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more