pforge_runtime/handlers/
pipeline.rs1use crate::{HandlerRegistry, Result};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5#[derive(Debug, Clone)]
6pub struct PipelineHandler {
7 pub steps: Vec<PipelineStep>,
8}
9
10#[derive(Debug, Clone)]
11pub struct PipelineStep {
12 pub tool: String,
13 pub input: Option<serde_json::Value>,
14 pub output_var: Option<String>,
15 pub condition: Option<String>,
16 pub error_policy: ErrorPolicy,
17}
18
/// How a pipeline reacts when dispatching a step fails.
///
/// The enum carries no data, so it is `Copy` and `Eq` in addition to
/// `Clone` and `PartialEq`; it can be passed by value and compared freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorPolicy {
    /// Abort the pipeline and return the step's error to the caller.
    FailFast,
    /// Record the failure in the step results and move on to the next step.
    Continue,
}
24
25#[derive(Debug, Deserialize)]
26pub struct PipelineInput {
27 #[serde(default)]
28 pub variables: HashMap<String, serde_json::Value>,
29}
30
31#[derive(Debug, Serialize)]
32pub struct PipelineOutput {
33 pub results: Vec<StepResult>,
34 pub variables: HashMap<String, serde_json::Value>,
35}
36
37#[derive(Debug, Serialize)]
38pub struct StepResult {
39 pub tool: String,
40 pub success: bool,
41 pub output: Option<serde_json::Value>,
42 pub error: Option<String>,
43}
44
45impl PipelineHandler {
46 pub fn new(steps: Vec<PipelineStep>) -> Self {
47 Self { steps }
48 }
49
50 pub async fn execute(
51 &self,
52 input: PipelineInput,
53 registry: &HandlerRegistry,
54 ) -> Result<PipelineOutput> {
55 let mut variables = input.variables;
56 let mut results = Vec::new();
57
58 for step in &self.steps {
59 if let Some(condition) = &step.condition {
61 if !self.evaluate_condition(condition, &variables) {
62 continue;
63 }
64 }
65
66 let step_input = if let Some(input_template) = &step.input {
68 self.interpolate_variables(input_template, &variables)
69 } else {
70 serde_json::json!({})
71 };
72
73 let step_result = match registry
75 .dispatch(&step.tool, &serde_json::to_vec(&step_input)?)
76 .await
77 {
78 Ok(output) => {
79 let output_value: serde_json::Value = serde_json::from_slice(&output)?;
80
81 if let Some(var_name) = &step.output_var {
83 variables.insert(var_name.clone(), output_value.clone());
84 }
85
86 StepResult {
87 tool: step.tool.clone(),
88 success: true,
89 output: Some(output_value),
90 error: None,
91 }
92 }
93 Err(e) => {
94 let result = StepResult {
95 tool: step.tool.clone(),
96 success: false,
97 output: None,
98 error: Some(e.to_string()),
99 };
100
101 if step.error_policy == ErrorPolicy::FailFast {
103 results.push(result);
104 return Err(e);
105 }
106
107 result
108 }
109 };
110
111 results.push(step_result);
112 }
113
114 Ok(PipelineOutput { results, variables })
115 }
116
117 fn evaluate_condition(&self, condition: &str, variables: &HashMap<String, serde_json::Value>) -> bool {
118 if let Some(var_name) = condition.strip_prefix('!') {
121 !variables.contains_key(var_name)
122 } else {
123 variables.contains_key(condition)
124 }
125 }
126
127 fn interpolate_variables(
128 &self,
129 template: &serde_json::Value,
130 variables: &HashMap<String, serde_json::Value>,
131 ) -> serde_json::Value {
132 match template {
133 serde_json::Value::String(s) => {
134 let mut result = s.clone();
136 for (key, value) in variables {
137 let pattern = format!("{{{{{}}}}}", key);
138 if let Some(value_str) = value.as_str() {
139 result = result.replace(&pattern, value_str);
140 }
141 }
142 serde_json::Value::String(result)
143 }
144 serde_json::Value::Object(obj) => {
145 let mut new_obj = serde_json::Map::new();
146 for (k, v) in obj {
147 new_obj.insert(k.clone(), self.interpolate_variables(v, variables));
148 }
149 serde_json::Value::Object(new_obj)
150 }
151 serde_json::Value::Array(arr) => {
152 let new_arr: Vec<_> = arr
153 .iter()
154 .map(|v| self.interpolate_variables(v, variables))
155 .collect();
156 serde_json::Value::Array(new_arr)
157 }
158 other => other.clone(),
159 }
160 }
161}