use crate::config::{Config, WorkflowStep, WorkflowStepType};
use crate::session::layers::types::GenericLayer;
use crate::session::layers::Layer;
use crate::session::Session;
use anyhow::Result;
use super::parser::PatternParser;
/// Borrowed description of where a step sits inside a running workflow.
///
/// The workflow name is borrowed for `'a`; use the inherent `to_owned` method
/// to obtain a variant that owns its name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkflowContext<'a> {
    /// Index of the step this context describes.
    pub step_index: usize,
    /// Total number of steps in the workflow.
    pub total_steps: usize,
    /// Name of the workflow the step belongs to.
    pub workflow_name: &'a str,
}
impl<'a> WorkflowContext<'a> {
pub fn to_owned(&self) -> OwnedWorkflowContext {
OwnedWorkflowContext {
step_index: self.step_index,
total_steps: self.total_steps,
workflow_name: self.workflow_name.to_string(),
}
}
}
/// Owning counterpart of the borrowed workflow context.
///
/// It stores the workflow name as a `String`, so it carries no lifetime
/// parameter and can be moved into contexts that must own their data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OwnedWorkflowContext {
    /// Index of the step this context describes.
    pub step_index: usize,
    /// Total number of steps in the workflow.
    pub total_steps: usize,
    /// Name of the workflow the step belongs to.
    pub workflow_name: String,
}
impl OwnedWorkflowContext {
    /// Lends this context out as a [`WorkflowContext`] whose workflow name
    /// borrows from `self`.
    pub fn as_ref(&self) -> WorkflowContext<'_> {
        let workflow_name = self.workflow_name.as_str();
        WorkflowContext {
            workflow_name,
            step_index: self.step_index,
            total_steps: self.total_steps,
        }
    }
}
/// Outcome of executing one workflow step.
#[derive(Debug, Clone)]
pub struct StepExecutionResult {
    /// Final text produced by the step.
    pub output: String,
    /// Name of the step that produced the output.
    pub step_name: String,
    /// Step index taken from the workflow context the step ran with.
    pub step_index: usize,
    /// Total step count taken from the workflow context the step ran with.
    pub total_steps: usize,
    /// Elapsed time of the step, in milliseconds.
    pub duration_ms: u64,
}
/// Stateless namespace for the workflow step execution routines; every
/// operation is an associated function, so no instance is required.
#[derive(Debug, Clone, Copy, Default)]
pub struct StepExecutor;
impl StepExecutor {
/// Runs one workflow step and reports its output together with timing data.
///
/// The step is dispatched on `step.step_type` to the matching handler. The
/// future is returned boxed, which lets the loop and foreach handlers call
/// back into this function for their substeps.
///
/// # Errors
/// Fails with "Operation cancelled" when `operation_cancelled` already holds
/// `true` on entry, and forwards any error raised by the selected handler.
pub fn execute_step<'a>(
    step: &'a WorkflowStep,
    input: &'a str,
    session: &'a mut Session,
    config: &'a Config,
    operation_cancelled: tokio::sync::watch::Receiver<bool>,
    context: WorkflowContext<'a>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<StepExecutionResult>> + Send + 'a>>
{
    Box::pin(async move {
        // Read the flag into a plain bool so no watch guard outlives this statement.
        let already_cancelled = *operation_cancelled.borrow();
        if already_cancelled {
            return Err(anyhow::anyhow!("Operation cancelled"));
        }

        crate::log_debug!(
            "Executing workflow step: {} (type: {:?})",
            step.name,
            step.step_type
        );

        let started_at = std::time::Instant::now();
        let ctx = &context;
        // Each handler yields a distinct future type, so every arm awaits on its own;
        // the shared error propagation happens once, after the match.
        let outcome = match step.step_type {
            WorkflowStepType::Once => {
                Self::execute_once(step, input, session, config, operation_cancelled, ctx).await
            }
            WorkflowStepType::Loop => {
                Self::execute_loop(step, input, session, config, operation_cancelled, ctx).await
            }
            WorkflowStepType::Foreach => {
                Self::execute_foreach(step, input, session, config, operation_cancelled, ctx).await
            }
            WorkflowStepType::Conditional => {
                Self::execute_conditional(step, input, session, config, operation_cancelled, ctx)
                    .await
            }
            WorkflowStepType::Parallel => {
                Self::execute_parallel(step, input, session, config, operation_cancelled, ctx).await
            }
        };
        let output = outcome?;

        let duration_ms = started_at.elapsed().as_millis() as u64;
        Ok(StepExecutionResult {
            output,
            step_name: step.name.clone(),
            step_index: context.step_index,
            total_steps: context.total_steps,
            duration_ms,
        })
    })
}
/// Handles a `Once` step by running the single layer named in `step.layer`
/// on `input` and returning that layer's output.
///
/// # Errors
/// Fails when the step has no layer configured, or when the layer execution fails.
async fn execute_once(
    step: &WorkflowStep,
    input: &str,
    session: &mut Session,
    config: &Config,
    operation_cancelled: tokio::sync::watch::Receiver<bool>,
    context: &WorkflowContext<'_>,
) -> Result<String> {
    match &step.layer {
        Some(layer_name) => {
            Self::execute_layer(
                layer_name,
                input,
                session,
                config,
                operation_cancelled,
                context,
            )
            .await
        }
        None => Err(anyhow::anyhow!("Once step requires layer name")),
    }
}
/// Handles a `Loop` step: repeatedly pipes the current text through every
/// substep until the text matches `step.exit_pattern` or the iteration limit
/// is reached.
///
/// The limit is `step.max_iterations`, defaulting to 10. The output of each
/// substep becomes the input of the next, and the output of one iteration
/// becomes the input of the following one.
///
/// # Errors
/// Fails when `exit_pattern` is missing, when a substep or the pattern match
/// fails, or with "Operation cancelled" when cancellation is observed after
/// a substep completes.
async fn execute_loop(
    step: &WorkflowStep,
    input: &str,
    session: &mut Session,
    config: &Config,
    operation_cancelled: tokio::sync::watch::Receiver<bool>,
    context: &WorkflowContext<'_>,
) -> Result<String> {
    let exit_pattern = step
        .exit_pattern
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Loop step requires exit_pattern"))?;
    let max_iterations = step.max_iterations.unwrap_or(10);
    let mut current_input = input.to_string();

    for iteration in 0..max_iterations {
        crate::log_debug!("Loop iteration {}/{}", iteration + 1, max_iterations);

        for substep in &step.substeps {
            let substep_result = Self::execute_step(
                substep,
                &current_input,
                session,
                config,
                operation_cancelled.clone(),
                context.clone(),
            )
            .await?;
            current_input = substep_result.output;

            if *operation_cancelled.borrow() {
                return Err(anyhow::anyhow!("Operation cancelled"));
            }
        }

        if PatternParser::matches(&current_input, exit_pattern)? {
            break;
        }

        // Reached only when the exit pattern did not match on the final pass.
        if iteration + 1 == max_iterations {
            crate::log_info!("Loop max iterations reached for step: {}", step.name);
        }
    }

    Ok(current_input)
}
/// Handles a `Foreach` step: splits `input` into items with
/// `step.parse_pattern`, pipes each item through every substep in order, and
/// joins the per-item final outputs with a blank line.
///
/// # Errors
/// Fails when `parse_pattern` is missing, when item parsing or a substep
/// fails, or with "Operation cancelled" when cancellation is observed after
/// a substep completes.
async fn execute_foreach(
    step: &WorkflowStep,
    input: &str,
    session: &mut Session,
    config: &Config,
    operation_cancelled: tokio::sync::watch::Receiver<bool>,
    context: &WorkflowContext<'_>,
) -> Result<String> {
    let pattern = step
        .parse_pattern
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Foreach step requires parse_pattern"))?;
    let items = PatternParser::parse_items(input, pattern)?;
    let total_items = items.len();
    let mut results = Vec::with_capacity(total_items);

    // Items are consumed by value: each one seeds its own substep pipeline.
    for (i, item) in items.into_iter().enumerate() {
        crate::log_debug!("Processing item {}/{}: {}", i + 1, total_items, item);
        let mut current_input = item;

        for substep in &step.substeps {
            let substep_result = Self::execute_step(
                substep,
                &current_input,
                session,
                config,
                operation_cancelled.clone(),
                context.clone(),
            )
            .await?;
            current_input = substep_result.output;

            if *operation_cancelled.borrow() {
                return Err(anyhow::anyhow!("Operation cancelled"));
            }
        }

        results.push(current_input);
    }

    Ok(results.join("\n\n"))
}
/// Handles a `Conditional` step: runs `step.layer` on `input`, tests its
/// output against `step.condition_pattern`, then chains the output through
/// the layers in `step.on_match` or `step.on_no_match` accordingly.
///
/// When the selected branch lists no layers, the output of the condition
/// layer is returned unchanged.
///
/// # Errors
/// Fails when `layer` or `condition_pattern` is missing, or when the pattern
/// match or any layer execution fails.
async fn execute_conditional(
    step: &WorkflowStep,
    input: &str,
    session: &mut Session,
    config: &Config,
    operation_cancelled: tokio::sync::watch::Receiver<bool>,
    context: &WorkflowContext<'_>,
) -> Result<String> {
    let layer_name = step
        .layer
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Conditional step requires layer"))?;
    let condition_pattern = step
        .condition_pattern
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Conditional step requires condition_pattern"))?;

    let output = Self::execute_layer(
        layer_name,
        input,
        session,
        config,
        operation_cancelled.clone(),
        context,
    )
    .await?;

    let layers_to_execute = if PatternParser::matches(&output, condition_pattern)? {
        crate::log_debug!("Condition matched for step: {}", step.name);
        &step.on_match
    } else {
        crate::log_debug!("Condition not matched for step: {}", step.name);
        &step.on_no_match
    };

    // Chain the branch: each layer receives the previous layer's output.
    let mut current_input = output;
    for layer_name in layers_to_execute {
        current_input = Self::execute_layer(
            layer_name,
            &current_input,
            session,
            config,
            operation_cancelled.clone(),
            context,
        )
        .await?;
    }

    Ok(current_input)
}
/// Handles a `Parallel` step: spawns one task per entry in
/// `step.parallel_layers`, all receiving the same `input`, and waits for
/// every task to finish.
///
/// Each task works on its own clone of the session and config. With an
/// aggregator configured, the outputs are joined with `---` separators and
/// passed through the aggregator layer using the caller's session; otherwise
/// the outputs are joined with a blank line.
///
/// # Errors
/// Fails when a spawned task cannot be joined, or when any layer (including
/// the aggregator) returns an error.
async fn execute_parallel(
    step: &WorkflowStep,
    input: &str,
    session: &mut Session,
    config: &Config,
    operation_cancelled: tokio::sync::watch::Receiver<bool>,
    context: &WorkflowContext<'_>,
) -> Result<String> {
    // Spawned tasks must own everything they touch, hence the per-task clones.
    let handles: Vec<_> = step
        .parallel_layers
        .iter()
        .map(|name| {
            let layer_name = name.clone();
            let task_input = input.to_string();
            let mut task_session = session.clone();
            let task_config = config.clone();
            let task_cancelled = operation_cancelled.clone();
            let task_context = context.to_owned();
            tokio::spawn(async move {
                Self::execute_layer(
                    &layer_name,
                    &task_input,
                    &mut task_session,
                    &task_config,
                    task_cancelled,
                    &task_context.as_ref(),
                )
                .await
            })
        })
        .collect();

    let joined = futures::future::join_all(handles).await;

    let mut outputs = Vec::with_capacity(joined.len());
    for task_outcome in joined {
        // First `?` surfaces a join failure, the second the layer's own error.
        let layer_outcome = task_outcome?;
        outputs.push(layer_outcome?);
    }

    match &step.aggregator {
        Some(aggregator) => {
            let combined_input = outputs.join("\n\n---\n\n");
            Self::execute_layer(
                aggregator,
                &combined_input,
                session,
                config,
                operation_cancelled,
                context,
            )
            .await
        }
        None => Ok(outputs.join("\n\n")),
    }
}
/// Runs the configured layer called `layer_name` on `input`, applies the
/// layer's output mode to `session`, prints the response, and returns the
/// layer's last output (an empty string when it produced none).
///
/// Output modes, as applied to `session`:
/// - `None`: the session is left untouched.
/// - `Append`: every output is added as a message with the layer's output role.
/// - `Replace`: messages are replaced by the first existing system message
///   (if any) followed by one message per output.
/// - `Last`: only the last output is added as a message.
/// - `Restart`: all messages are cleared, then the last output is added.
///
/// # Errors
/// Fails when the config defines no layers, when no layer is named
/// `layer_name`, or when processing the layer fails.
async fn execute_layer(
    layer_name: &str,
    input: &str,
    session: &mut Session,
    config: &Config,
    operation_cancelled: tokio::sync::watch::Receiver<bool>,
    context: &WorkflowContext<'_>,
) -> Result<String> {
    use crate::session::layers::OutputMode;
    use colored::Colorize;

    let mut layer_config = match &config.layers {
        Some(layers) => layers
            .iter()
            .find(|l| l.name == layer_name)
            .ok_or_else(|| anyhow::anyhow!("Layer '{}' not found in config", layer_name))?
            .clone(),
        None => {
            return Err(anyhow::anyhow!(
                "No layers defined in config, cannot execute layer '{}'",
                layer_name
            ));
        }
    };

    let current_dir = crate::mcp::get_thread_working_directory();
    layer_config
        .process_and_cache_system_prompt(&current_dir)
        .await;

    let mut layer = GenericLayer::new(layer_config);
    layer.set_workflow_context(
        context.step_index,
        context.total_steps,
        context.workflow_name.to_string(),
    );

    let result = layer
        .process(input, session, config, operation_cancelled)
        .await?;

    // The last output is both the value written by `Last`/`Restart` and the return value.
    let last_output = result.outputs.last().cloned().unwrap_or_default();

    match layer.config().output_mode {
        OutputMode::None => {
            crate::log_debug!(
                "Layer '{}': output_mode=none (intermediate layer)",
                layer_name
            );
        }
        OutputMode::Append => {
            crate::log_debug!(
                "Layer '{}': output_mode=append (adding {} outputs)",
                layer_name,
                result.outputs.len()
            );
            for output_text in &result.outputs {
                session.add_message(layer.config().output_role.as_str(), output_text);
            }
        }
        OutputMode::Replace => {
            crate::log_debug!(
                "Layer '{}': output_mode=replace (replacing session)",
                layer_name
            );
            // Only the first system message survives the replacement.
            let system_message = session
                .messages
                .iter()
                .find(|m| m.role == "system")
                .cloned();
            let mut final_messages = Vec::with_capacity(result.outputs.len() + 1);
            final_messages.extend(system_message);
            for output_text in &result.outputs {
                let output_msg = crate::session::Message {
                    role: layer.config().output_role.as_str().to_string(),
                    content: output_text.clone(),
                    timestamp: std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_secs(),
                    cached: false,
                    ..Default::default()
                };
                final_messages.push(output_msg);
            }
            session.messages = final_messages;
        }
        OutputMode::Last => {
            crate::log_debug!(
                "Layer '{}': output_mode=last (adding last output)",
                layer_name
            );
            session.add_message(layer.config().output_role.as_str(), &last_output);
        }
        OutputMode::Restart => {
            crate::log_debug!(
                "Layer '{}': output_mode=restart (replacing with last output)",
                layer_name
            );
            session.messages.clear();
            session.add_message(layer.config().output_role.as_str(), &last_output);
        }
    }

    let had_tool_calls = matches!(&result.tool_calls, Some(calls) if !calls.is_empty());

    if let Some(output) = result
        .outputs
        .last()
        .filter(|text| !text.trim().is_empty())
    {
        // The header rule is skipped when the layer made tool calls.
        if !had_tool_calls {
            let response_header = format!(
                " {} | {} | Step {}/{} ",
                context.workflow_name.bright_yellow(),
                layer_name.bright_cyan(),
                context.step_index,
                context.total_steps
            );
            // Measure the uncoloured text: the coloured header may carry ANSI escape
            // bytes that take up no terminal columns and would shorten the rule.
            let visible_width = format!(
                " {} | {} | Step {}/{} ",
                context.workflow_name, layer_name, context.step_index, context.total_steps
            )
            .chars()
            .count();
            let separator_length = 70.max(visible_width + 4);
            let dashes = "─".repeat(separator_length - visible_width);
            let separator = format!("──{}{}──", response_header, dashes.dimmed());
            println!("{}", separator);
        }

        println!();
        use crate::session::chat::assistant_output::print_assistant_response;
        print_assistant_response(output, config, "", &None);
        println!();
    }

    Ok(last_output)
}
}