use super::BranchBuilder;
use crate::{
flow::{condition::Condition, processor::Processor, source::Source, stage::Stage},
FlowError,
};
use std::{error::Error, sync::Arc};
use tokio::sync::broadcast;
use tracing::{debug, error, info, instrument, warn};
/// A data pipeline made of an optional source and an ordered list of stages.
///
/// `DataType` is the item type produced by the source and passed from stage to
/// stage, `ErrorType` is the error type shared by the source and the stages, and
/// `OutputType` is the third parameter forwarded to [`Stage`] (the `Output` of
/// branch conditions registered through `when`).
pub struct Flow<DataType, ErrorType, OutputType> {
/// Boxed source that takes `()` as input and yields `DataType` items.
/// `None` until a source has been attached to the flow.
pub(crate) source:
Option<Box<dyn Source<Input = (), Output = DataType, Error = ErrorType> + Send + Sync>>,
/// Stages applied to every item, in the order they were added.
pub(crate) stages: Vec<Stage<DataType, ErrorType, OutputType>>,
}
impl<DataType, ErrorType, OutputType> Flow<DataType, ErrorType, OutputType>
where
    DataType: Clone + Send + Sync + 'static,
    OutputType: Send + Sync + 'static,
    ErrorType: Error + Send + Sync + Clone + 'static + From<FlowError>,
{
    /// Creates an empty flow with no source and no stages.
    #[must_use]
    pub fn new() -> Self {
        Self {
            source: None,
            stages: Vec::new(),
        }
    }

    /// Sets the source that feeds the flow, replacing any previously set source.
    #[must_use]
    pub fn source<SourceType>(mut self, source: SourceType) -> Self
    where
        SourceType:
            Source<Input = (), Output = DataType, Error = ErrorType> + Send + Sync + 'static,
    {
        self.source = Some(Box::new(source));
        self
    }

    /// Appends a processor stage; stages run in the order they were added.
    #[must_use]
    pub fn process<ProcessorType>(mut self, processor: ProcessorType) -> Self
    where
        ProcessorType: Processor<Input = DataType, Output = DataType, Error = ErrorType>
            + Send
            + Sync
            + 'static,
    {
        self.stages.push(Stage::Process(Box::new(processor)));
        self
    }

    /// Starts a conditional branch by handing this flow and the boxed `condition`
    /// to a [`BranchBuilder`].
    #[must_use]
    pub fn when<ConditionType>(
        self,
        condition: ConditionType,
    ) -> BranchBuilder<DataType, OutputType, ErrorType>
    where
        ConditionType: Condition<Input = DataType, Output = OutputType, Error = ErrorType>
            + Send
            + Sync
            + 'static,
    {
        BranchBuilder::new(Box::new(condition), self)
    }

    /// Runs the flow until the source channel closes or `shutdown` fires.
    ///
    /// Every item received from the source is processed in its own spawned Tokio
    /// task, which applies the stages in order. For a branch stage the condition
    /// is evaluated on a clone of the current item and the stages of the selected
    /// side are then applied; branches nested inside a branch are followed as
    /// well. The outcome of each task (success or error) is sent to the source's
    /// feedback channel; a failure to send feedback is ignored.
    ///
    /// Intake stops as soon as `shutdown.recv()` completes with *any* result, so
    /// a closed or lagged shutdown channel also stops the flow. Tasks that were
    /// already spawned are still awaited afterwards.
    ///
    /// On success the processed items are returned in the order their tasks were
    /// spawned.
    ///
    /// # Errors
    ///
    /// * `FlowError::NoSource` (converted into `ErrorType`) if no source was set.
    /// * The error returned by `source.stream()`.
    /// * The first error, in spawn order, reported by the source for an item, by
    ///   a processor, or by a branch condition.
    /// * `FlowError::Custom` (converted into `ErrorType`) if a task could not be
    ///   joined.
    #[instrument(skip(self))]
    pub async fn run_stream(
        mut self,
        shutdown: broadcast::Receiver<()>,
    ) -> Result<Vec<DataType>, ErrorType> {
        info!("Starting flow execution");
        let source = self.source.take().ok_or_else(|| {
            error!("Flow source not set");
            ErrorType::from(FlowError::NoSource)
        })?;

        debug!("Initializing source stream");
        let source_output = source.stream().await?;
        let receiver = source_output.receiver;
        let feedback = source_output.feedback;

        let mut shutdown_rx = shutdown;
        let mut handles = Vec::new();
        // Shared read-only between all spawned tasks.
        let stages = Arc::new(self.stages);

        info!("Starting message processing loop");
        loop {
            tokio::select! {
                _ = shutdown_rx.recv() => {
                    warn!("Received shutdown signal");
                    break;
                }
                item = receiver.recv_async() => {
                    if let Ok(item) = item {
                        let feedback = feedback.clone();
                        let stages = Arc::clone(&stages);
                        debug!("Spawning task for data processing");
                        let handle = tokio::spawn(async move {
                            // The pipeline runs in an inner future so that every
                            // failure path funnels through the single
                            // feedback/return point below.
                            let outcome = async move {
                                let mut current_item = item.map_err(|e| {
                                    error!("Source error: {:?}", e);
                                    e
                                })?;
                                debug!("Processing new item");

                                // Explicit stack of stage iterators: the top-level
                                // pipeline sits at the bottom and each taken branch
                                // pushes its own stage list. This walks nested
                                // branches without async recursion instead of
                                // silently skipping them.
                                let mut pending = vec![stages.iter()];
                                loop {
                                    let depth = pending.len();
                                    let next_stage = match pending.last_mut() {
                                        Some(level) => level.next(),
                                        None => break,
                                    };
                                    let stage = match next_stage {
                                        Some(stage) => stage,
                                        None => {
                                            // Current stage list exhausted; resume
                                            // the enclosing one.
                                            pending.pop();
                                            continue;
                                        }
                                    };
                                    match stage {
                                        Stage::Process(processor) => {
                                            let in_branch = depth > 1;
                                            if !in_branch {
                                                debug!("Executing processor stage");
                                            }
                                            current_item = processor
                                                .process(current_item)
                                                .await
                                                .map_err(|e| {
                                                    if in_branch {
                                                        error!("Branch processor error: {:?}", e);
                                                    } else {
                                                        error!("Processor error: {:?}", e);
                                                    }
                                                    e
                                                })?;
                                        }
                                        Stage::Branch(branch) => {
                                            debug!("Evaluating branch condition");
                                            let (condition_met, _) = branch
                                                .condition
                                                .evaluate(current_item.clone())
                                                .await
                                                .map_err(|e| {
                                                    error!("Branch condition error: {:?}", e);
                                                    e
                                                })?;
                                            let chosen = if condition_met {
                                                debug!("Taking then branch");
                                                &branch.then_branch
                                            } else {
                                                debug!("Taking else branch");
                                                &branch.else_branch
                                            };
                                            pending.push(chosen.iter());
                                        }
                                    }
                                }
                                Ok::<DataType, ErrorType>(current_item)
                            }
                            .await;

                            if outcome.is_ok() {
                                debug!("Data processing completed successfully");
                            }
                            // Feedback is best-effort: a dropped feedback receiver
                            // must not fail the task.
                            // NOTE(review): `send` is not awaited; if the feedback
                            // channel is bounded and its `send` blocks, this would
                            // stall the executor thread — confirm against the
                            // `Source` implementation.
                            let _ = feedback.send(outcome.clone());
                            outcome
                        });
                        handles.push(handle);
                    } else {
                        debug!("Source channel closed");
                        break;
                    }
                }
            }
        }

        debug!("Collecting results from all tasks");
        let mut results = Vec::with_capacity(handles.len());
        for handle in handles {
            match handle.await {
                Ok(Ok(result)) => results.push(result),
                Ok(Err(e)) => {
                    error!("Task error: {:?}", e);
                    return Err(e);
                }
                Err(e) => {
                    error!("Task join error: {:?}", e);
                    return Err(ErrorType::from(FlowError::Custom(e.to_string())));
                }
            }
        }

        debug!("Flow execution completed successfully");
        Ok(results)
    }
}
/// The default flow is the empty flow: no source and no stages.
impl<DataType, ErrorType, OutputType> Default for Flow<DataType, ErrorType, OutputType>
where
    DataType: Clone + Send + Sync + 'static,
    OutputType: Send + Sync + 'static,
    ErrorType: Error + Send + Sync + Clone + 'static + From<FlowError>,
{
    /// Builds a flow with no source attached and an empty stage list.
    fn default() -> Self {
        Self {
            source: None,
            stages: Vec::new(),
        }
    }
}