use crate::core::error::RustChainError;
use crate::engine::{DagExecutor, ExecutionContext, MissionStep, StepType};
use serde_json::Value;
use std::collections::HashMap;
use tracing::{debug, error, info};
#[derive(Debug, Clone)]
pub struct ChainExecutor {
chain_id: String,
}
impl ChainExecutor {
pub fn new(chain_id: String) -> Self {
Self { chain_id }
}
pub async fn execute_chain_steps(
&self,
chain_steps: &[ChainSubStep],
parent_context: &mut ExecutionContext,
) -> Result<String, RustChainError> {
info!("Executing chain '{}' with {} sub-steps", self.chain_id, chain_steps.len());
let mut chain_results = Vec::new();
let mut chain_context = ExecutionContext::new();
for (key, value) in parent_context.get_all_variables() {
chain_context.set_variable(key, value);
}
for (idx, sub_step) in chain_steps.iter().enumerate() {
debug!("Executing chain sub-step {}: {}", idx + 1, sub_step.step_name);
let mission_step = self.convert_sub_step_to_mission_step(sub_step, &chain_context)?;
let step_result = Box::pin(DagExecutor::execute_step(&mission_step, &mut chain_context)).await?;
if step_result.status != crate::engine::StepStatus::Success {
let error_msg = step_result.error.unwrap_or_else(|| format!("Chain sub-step {} failed", sub_step.step_name));
error!("Chain sub-step failed: {}", error_msg);
return Err(RustChainError::Execution(crate::core::error::ExecutionError::StepFailed {
mission_id: self.chain_id.clone(),
step_id: sub_step.step_name.clone(),
reason: error_msg,
}));
}
if let Some(output) = step_result.output.as_str() {
chain_results.push(format!("Step {}: {}", sub_step.step_name, output));
}
info!("Chain sub-step '{}' completed successfully", sub_step.step_name);
}
let final_result = chain_results.join("\n\n");
for (key, value) in chain_context.get_all_variables() {
if key.starts_with(&format!("{}_", self.chain_id)) ||
chain_steps.iter().any(|step| key.starts_with(&format!("{}_", step.step_name))) {
parent_context.set_variable(key, value);
}
}
let chain_result_key = format!("{}_result", self.chain_id);
parent_context.set_variable(&chain_result_key, &final_result);
Ok(final_result)
}
fn convert_sub_step_to_mission_step(
&self,
sub_step: &ChainSubStep,
context: &ExecutionContext,
) -> Result<MissionStep, RustChainError> {
let mut processed_parameters = sub_step.parameters.clone();
self.substitute_variables_in_value(&mut processed_parameters, context);
Ok(MissionStep {
id: sub_step.step_name.clone(),
name: sub_step.step_name.clone(),
step_type: sub_step.step_type.clone(),
parameters: processed_parameters,
depends_on: None, timeout_seconds: sub_step.timeout_seconds,
continue_on_error: None, })
}
fn substitute_variables_in_value(&self, value: &mut Value, context: &ExecutionContext) {
match value {
Value::String(s) => {
*s = context.substitute_variables(s);
}
Value::Object(map) => {
for (_, v) in map.iter_mut() {
self.substitute_variables_in_value(v, context);
}
}
Value::Array(arr) => {
for v in arr.iter_mut() {
self.substitute_variables_in_value(v, context);
}
}
_ => {} }
}
}
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ChainSubStep {
pub step_name: String,
pub step_type: StepType,
pub parameters: Value,
pub timeout_seconds: Option<u64>,
}
impl ExecutionContext {
pub fn get_all_variables(&self) -> &HashMap<String, String> {
&self.variables
}
}