use super::{ComposableWorkflow, WorkflowComposer};
use crate::cook::workflow::WorkflowContext;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubWorkflow {
pub source: PathBuf,
#[serde(default)]
pub parameters: HashMap<String, Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub inputs: Option<HashMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub outputs: Option<Vec<String>>,
#[serde(default)]
pub parallel: bool,
#[serde(default)]
pub continue_on_error: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub timeout: Option<Duration>,
#[serde(skip_serializing_if = "Option::is_none")]
pub working_dir: Option<PathBuf>,
}
#[derive(Debug, Clone, Serialize)]
pub struct SubWorkflowResult {
pub success: bool,
pub outputs: HashMap<String, Value>,
pub duration: Duration,
pub error: Option<String>,
pub logs: Vec<String>,
}
pub struct SubWorkflowExecutor {
composer: Arc<WorkflowComposer>,
}
impl SubWorkflowExecutor {
pub fn new(composer: Arc<WorkflowComposer>) -> Self {
Self { composer }
}
pub async fn execute_sub_workflow(
&self,
parent_context: &mut WorkflowContext,
name: &str,
sub_workflow: &SubWorkflow,
) -> Result<SubWorkflowResult> {
let start_time = std::time::Instant::now();
let mut logs = Vec::new();
tracing::info!(
"Executing sub-workflow '{}' from {:?}",
name,
sub_workflow.source
);
logs.push(format!(
"Starting sub-workflow '{}' from {:?}",
name, sub_workflow.source
));
let mut sub_context = self.create_sub_context(parent_context, &sub_workflow.inputs)?;
if let Some(_working_dir) = &sub_workflow.working_dir {
tracing::debug!("Working directory override not yet implemented");
}
let composed = self
.composer
.compose(&sub_workflow.source, sub_workflow.parameters.clone())
.await
.with_context(|| format!("Failed to compose sub-workflow '{}'", name))?;
logs.push(format!(
"Composed sub-workflow with {} commands",
composed.workflow.config.commands.len()
));
let result = if let Some(timeout) = sub_workflow.timeout {
tokio::time::timeout(
timeout,
self.execute_composed(&composed.workflow, &mut sub_context),
)
.await
.map_err(|_| anyhow::anyhow!("Sub-workflow '{}' timed out", name))
.and_then(|r| r)
} else {
self.execute_composed(&composed.workflow, &mut sub_context)
.await
};
let duration = start_time.elapsed();
match result {
Ok(_) => {
logs.push(format!(
"Sub-workflow '{}' completed successfully in {:?}",
name, duration
));
let outputs = self.extract_outputs(&sub_context, &sub_workflow.outputs)?;
self.merge_outputs(parent_context, &outputs)?;
Ok(SubWorkflowResult {
success: true,
outputs,
duration,
error: None,
logs,
})
}
Err(e) => {
let error_msg = format!("Sub-workflow '{}' failed: {}", name, e);
logs.push(error_msg.clone());
if sub_workflow.continue_on_error {
tracing::warn!("{} (continuing due to continue_on_error)", error_msg);
Ok(SubWorkflowResult {
success: false,
outputs: HashMap::new(),
duration,
error: Some(e.to_string()),
logs,
})
} else {
Err(anyhow::anyhow!(error_msg))
}
}
}
}
pub async fn execute_parallel_sub_workflows(
&self,
parent_context: &mut WorkflowContext,
sub_workflows: Vec<(&str, &SubWorkflow)>,
) -> Result<Vec<(String, SubWorkflowResult)>> {
let mut handles = Vec::new();
for (name, sub_workflow) in sub_workflows {
let name = name.to_string();
let sub_workflow = sub_workflow.clone();
let parent_ctx = parent_context.clone();
let executor = self.clone();
let handle = tokio::spawn(async move {
let mut ctx = parent_ctx;
let result = executor
.execute_sub_workflow(&mut ctx, &name, &sub_workflow)
.await;
(name, result)
});
handles.push(handle);
}
let mut results = Vec::new();
for handle in handles {
let (name, result) = handle
.await
.map_err(|e| anyhow::anyhow!("Failed to join sub-workflow task: {}", e))?;
match result {
Ok(sub_result) => {
results.push((name, sub_result));
}
Err(e) => {
return Err(anyhow::anyhow!("Sub-workflow '{}' failed: {}", name, e));
}
}
}
for (_, result) in &results {
self.merge_outputs(parent_context, &result.outputs)?;
}
Ok(results)
}
async fn execute_composed(
&self,
workflow: &ComposableWorkflow,
_context: &mut WorkflowContext,
) -> Result<()> {
tracing::info!(
"Would execute composed workflow with {} commands",
workflow.config.commands.len()
);
Ok(())
}
fn create_sub_context(
&self,
parent_context: &WorkflowContext,
inputs: &Option<HashMap<String, String>>,
) -> Result<WorkflowContext> {
let mut sub_context = parent_context.clone();
sub_context.variables.clear();
if let Some(inputs) = inputs {
for (key, var_name) in inputs {
if let Some(value) = parent_context.variables.get(var_name) {
sub_context.variables.insert(key.clone(), value.clone());
}
}
}
Ok(sub_context)
}
fn extract_outputs(
&self,
sub_context: &WorkflowContext,
outputs: &Option<Vec<String>>,
) -> Result<HashMap<String, Value>> {
let mut result = HashMap::new();
if let Some(output_names) = outputs {
for name in output_names {
if let Some(value) = sub_context.variables.get(name) {
let json_value = serde_json::to_value(value)
.with_context(|| format!("Failed to serialize output '{}'", name))?;
result.insert(name.clone(), json_value);
}
}
}
Ok(result)
}
fn merge_outputs(
&self,
parent_context: &mut WorkflowContext,
outputs: &HashMap<String, Value>,
) -> Result<()> {
for (key, value) in outputs {
let str_value = match value {
Value::String(s) => s.clone(),
_ => value.to_string(),
};
parent_context.variables.insert(key.clone(), str_value);
}
Ok(())
}
}
impl Clone for SubWorkflowExecutor {
fn clone(&self) -> Self {
Self {
composer: Arc::clone(&self.composer),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sub_workflow_configuration() {
let sub = SubWorkflow {
source: PathBuf::from("test.yml"),
parameters: HashMap::new(),
inputs: Some(HashMap::from([("input1".to_string(), "var1".to_string())])),
outputs: Some(vec!["result".to_string()]),
parallel: false,
continue_on_error: false,
timeout: Some(Duration::from_secs(60)),
working_dir: None,
};
assert_eq!(sub.source, PathBuf::from("test.yml"));
assert!(sub.inputs.is_some());
assert!(sub.outputs.is_some());
}
#[test]
fn test_sub_workflow_result() {
let result = SubWorkflowResult {
success: true,
outputs: HashMap::from([("key".to_string(), Value::String("value".to_string()))]),
duration: Duration::from_secs(10),
error: None,
logs: vec!["Log entry".to_string()],
};
assert!(result.success);
assert_eq!(result.outputs.len(), 1);
assert_eq!(result.duration, Duration::from_secs(10));
}
}