use crate::config::{ActionDefinition, ActionType, WorkflowConfig};
use crate::expression::ExpressionEvaluator;
use anyhow::Result;
use flowbuilder_context::SharedContext;
use flowbuilder_core::{FlowBuilder, Step, StepFuture};
use std::future::Future;
use std::pin::Pin;
#[allow(unused_imports)]
use std::sync::Arc;
use std::time::Duration;
pub struct YamlFlowBuilder {
config: WorkflowConfig,
evaluator: ExpressionEvaluator,
}
type StepClosure = Box<
dyn FnMut(
SharedContext,
)
-> Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>
+ Send
+ 'static,
>;
impl YamlFlowBuilder {
pub fn new(config: WorkflowConfig) -> Result<Self> {
let mut evaluator = ExpressionEvaluator::new();
evaluator.set_env_vars(config.workflow.env.clone());
evaluator.set_flow_vars(config.workflow.vars.clone());
Ok(Self { config, evaluator })
}
pub fn build(&self) -> Result<FlowBuilder> {
let mut flow_builder = FlowBuilder::new();
for task in &self.config.workflow.tasks {
for action in &task.task.actions {
let step_closure =
self.create_step_closure_from_action(&action.action)?;
flow_builder = flow_builder.step(step_closure);
}
}
Ok(flow_builder)
}
fn create_step_closure_from_action(
&self,
action: &ActionDefinition,
) -> Result<StepClosure> {
let action_clone = action.clone();
let evaluator_clone = self.evaluator.clone();
Ok(Box::new(move |ctx: SharedContext| {
let action = action_clone.clone();
let evaluator = evaluator_clone.clone();
let future: Pin<
Box<dyn Future<Output = Result<()>> + Send + 'static>,
> = Box::pin(async move {
match action.action_type {
ActionType::Builtin => {
println!("执行内置动作: {}", action.id);
for (key, value) in action.outputs {
let mut guard = ctx.lock().await;
guard.set_variable(key, format!("{value:?}"));
}
}
ActionType::Cmd => {
println!("执行命令动作: {}", action.id);
for (param_name, param) in action.parameters {
let evaluated_value = evaluator
.evaluate(&format!("{:?}", param.value))
.unwrap_or(param.value.clone());
println!(
" 参数 {param_name}: {evaluated_value:?}"
);
}
}
ActionType::Http => {
println!("执行HTTP动作: {}", action.id);
tokio::time::sleep(std::time::Duration::from_millis(
100,
))
.await;
}
ActionType::Wasm => {
println!("执行WASM动作: {}", action.id);
tokio::time::sleep(std::time::Duration::from_millis(
50,
))
.await;
}
}
Ok(())
});
future
}))
}
#[allow(dead_code)]
fn create_step_from_action(
&self,
action: &ActionDefinition,
) -> Result<Step> {
match action.action_type {
ActionType::Builtin => self.create_builtin_step(action),
ActionType::Cmd => self.create_cmd_step(action),
ActionType::Http => self.create_http_step(action),
ActionType::Wasm => self.create_wasm_step(action),
}
}
fn create_builtin_step(&self, action: &ActionDefinition) -> Result<Step> {
let action_id = action.id.clone();
let outputs = action.outputs.clone();
let evaluator = self.evaluator.clone();
Ok(Box::new(move |_ctx: SharedContext| -> StepFuture {
let action_id = action_id.clone();
let outputs = outputs.clone();
let mut evaluator = evaluator.clone();
Box::pin(async move {
println!("执行内置动作: {action_id}");
for (key, value) in outputs {
let full_key = format!("{action_id}.outputs.{key}");
evaluator.set_context_var(full_key, value);
}
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
})
}))
}
fn create_cmd_step(&self, action: &ActionDefinition) -> Result<Step> {
let action_id = action.id.clone();
let parameters = action.parameters.clone();
let outputs = action.outputs.clone();
let evaluator = self.evaluator.clone();
Ok(Box::new(move |_ctx: SharedContext| -> StepFuture {
let action_id = action_id.clone();
let parameters = parameters.clone();
let outputs = outputs.clone();
let mut evaluator = evaluator.clone();
Box::pin(async move {
println!("执行命令动作: {action_id}");
for (param_name, param) in parameters {
let evaluated_value = evaluator
.evaluate(&format!("{:?}", param.value))
.unwrap_or(param.value.clone());
println!(" 参数 {param_name}: {evaluated_value:?}");
}
tokio::time::sleep(Duration::from_millis(200)).await;
for (key, value) in outputs {
let full_key = format!("{action_id}.outputs.{key}");
evaluator.set_context_var(full_key, value);
}
Ok(())
})
}))
}
fn create_http_step(&self, action: &ActionDefinition) -> Result<Step> {
let action_id = action.id.clone();
let parameters = action.parameters.clone();
let outputs = action.outputs.clone();
let evaluator = self.evaluator.clone();
Ok(Box::new(move |_ctx: SharedContext| -> StepFuture {
let action_id = action_id.clone();
let parameters = parameters.clone();
let outputs = outputs.clone();
let mut evaluator = evaluator.clone();
Box::pin(async move {
println!("执行 HTTP 动作: {action_id}");
for (param_name, param) in parameters {
let evaluated_value = evaluator
.evaluate(&format!("{:?}", param.value))
.unwrap_or(param.value.clone());
println!(" HTTP 参数 {param_name}: {evaluated_value:?}");
}
tokio::time::sleep(Duration::from_millis(300)).await;
for (key, value) in outputs {
let full_key = format!("{action_id}.outputs.{key}");
evaluator.set_context_var(full_key, value);
}
Ok(())
})
}))
}
fn create_wasm_step(&self, action: &ActionDefinition) -> Result<Step> {
let action_id = action.id.clone();
let parameters = action.parameters.clone();
let outputs = action.outputs.clone();
let evaluator = self.evaluator.clone();
Ok(Box::new(move |_ctx: SharedContext| -> StepFuture {
let action_id = action_id.clone();
let parameters = parameters.clone();
let outputs = outputs.clone();
let mut evaluator = evaluator.clone();
Box::pin(async move {
println!("执行 WASM 动作: {action_id}");
for (param_name, param) in parameters {
let evaluated_value = evaluator
.evaluate(&format!("{:?}", param.value))
.unwrap_or(param.value.clone());
println!(" WASM 参数 {param_name}: {evaluated_value:?}");
}
tokio::time::sleep(Duration::from_millis(150)).await;
for (key, value) in outputs {
let full_key = format!("{action_id}.outputs.{key}");
evaluator.set_context_var(full_key, value);
}
Ok(())
})
}))
}
pub fn evaluator_mut(&mut self) -> &mut ExpressionEvaluator {
&mut self.evaluator
}
pub fn evaluator(&self) -> &ExpressionEvaluator {
&self.evaluator
}
pub fn config(&self) -> &WorkflowConfig {
&self.config
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::loader::WorkflowLoader;
#[test]
fn test_yaml_flow_builder() {
let yaml_content = r#"
workflow:
version: "1.0"
env:
FLOWBUILDER_ENV: "test"
vars:
name: "Test Workflow"
tasks:
- task:
id: "task1"
name: "Test Task"
description: "A test task"
actions:
- action:
id: "test_action"
name: "Test Action"
description: "A test action"
type: "builtin"
outputs:
status: 200
message: "Test completed"
parameters: {}
"#;
let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
let yaml_builder = YamlFlowBuilder::new(config).unwrap();
let flow_builder = yaml_builder.build().unwrap();
let _flow = flow_builder.build();
}
#[tokio::test]
async fn test_builtin_step_execution() {
let yaml_content = r#"
workflow:
version: "1.0"
tasks:
- task:
id: "task1"
name: "Test Task"
description: "A test task"
actions:
- action:
id: "test_action"
name: "Test Action"
description: "A test action"
type: "builtin"
outputs:
status: 200
parameters: {}
"#;
let config = WorkflowLoader::from_yaml_str(yaml_content).unwrap();
let yaml_builder = YamlFlowBuilder::new(config).unwrap();
let flow_builder = yaml_builder.build().unwrap();
let _context = Arc::new(tokio::sync::Mutex::new(
flowbuilder_context::FlowContext::default(),
));
let flow = flow_builder.build();
let result = flow.execute().await;
assert!(result.is_ok());
}
}