pforge_runtime/handlers/
pipeline.rs

1use crate::{HandlerRegistry, Result};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4
5#[derive(Debug, Clone)]
6pub struct PipelineHandler {
7    pub steps: Vec<PipelineStep>,
8}
9
10#[derive(Debug, Clone)]
11pub struct PipelineStep {
12    pub tool: String,
13    pub input: Option<serde_json::Value>,
14    pub output_var: Option<String>,
15    pub condition: Option<String>,
16    pub error_policy: ErrorPolicy,
17}
18
19#[derive(Debug, Clone, PartialEq)]
20pub enum ErrorPolicy {
21    FailFast,
22    Continue,
23}
24
25#[derive(Debug, Deserialize)]
26pub struct PipelineInput {
27    #[serde(default)]
28    pub variables: HashMap<String, serde_json::Value>,
29}
30
31#[derive(Debug, Serialize)]
32pub struct PipelineOutput {
33    pub results: Vec<StepResult>,
34    pub variables: HashMap<String, serde_json::Value>,
35}
36
37#[derive(Debug, Serialize)]
38pub struct StepResult {
39    pub tool: String,
40    pub success: bool,
41    pub output: Option<serde_json::Value>,
42    pub error: Option<String>,
43}
44
45impl PipelineHandler {
46    pub fn new(steps: Vec<PipelineStep>) -> Self {
47        Self { steps }
48    }
49
50    pub async fn execute(
51        &self,
52        input: PipelineInput,
53        registry: &HandlerRegistry,
54    ) -> Result<PipelineOutput> {
55        let mut variables = input.variables;
56        let mut results = Vec::new();
57
58        for step in &self.steps {
59            // Check condition if present
60            if let Some(condition) = &step.condition {
61                if !self.evaluate_condition(condition, &variables) {
62                    continue;
63                }
64            }
65
66            // Interpolate input with variables
67            let step_input = if let Some(input_template) = &step.input {
68                self.interpolate_variables(input_template, &variables)
69            } else {
70                serde_json::json!({})
71            };
72
73            // Execute step
74            let step_result = match registry
75                .dispatch(&step.tool, &serde_json::to_vec(&step_input)?)
76                .await
77            {
78                Ok(output) => {
79                    let output_value: serde_json::Value = serde_json::from_slice(&output)?;
80
81                    // Store output in variable if specified
82                    if let Some(var_name) = &step.output_var {
83                        variables.insert(var_name.clone(), output_value.clone());
84                    }
85
86                    StepResult {
87                        tool: step.tool.clone(),
88                        success: true,
89                        output: Some(output_value),
90                        error: None,
91                    }
92                }
93                Err(e) => {
94                    let result = StepResult {
95                        tool: step.tool.clone(),
96                        success: false,
97                        output: None,
98                        error: Some(e.to_string()),
99                    };
100
101                    // Handle error based on policy
102                    if step.error_policy == ErrorPolicy::FailFast {
103                        results.push(result);
104                        return Err(e);
105                    }
106
107                    result
108                }
109            };
110
111            results.push(step_result);
112        }
113
114        Ok(PipelineOutput { results, variables })
115    }
116
117    fn evaluate_condition(&self, condition: &str, variables: &HashMap<String, serde_json::Value>) -> bool {
118        // Simple variable existence check for MVP
119        // Format: "variable_name" or "!variable_name"
120        if let Some(var_name) = condition.strip_prefix('!') {
121            !variables.contains_key(var_name)
122        } else {
123            variables.contains_key(condition)
124        }
125    }
126
127    fn interpolate_variables(
128        &self,
129        template: &serde_json::Value,
130        variables: &HashMap<String, serde_json::Value>,
131    ) -> serde_json::Value {
132        match template {
133            serde_json::Value::String(s) => {
134                // Replace {{var}} with variable value
135                let mut result = s.clone();
136                for (key, value) in variables {
137                    let pattern = format!("{{{{{}}}}}", key);
138                    if let Some(value_str) = value.as_str() {
139                        result = result.replace(&pattern, value_str);
140                    }
141                }
142                serde_json::Value::String(result)
143            }
144            serde_json::Value::Object(obj) => {
145                let mut new_obj = serde_json::Map::new();
146                for (k, v) in obj {
147                    new_obj.insert(k.clone(), self.interpolate_variables(v, variables));
148                }
149                serde_json::Value::Object(new_obj)
150            }
151            serde_json::Value::Array(arr) => {
152                let new_arr: Vec<_> = arr
153                    .iter()
154                    .map(|v| self.interpolate_variables(v, variables))
155                    .collect();
156                serde_json::Value::Array(new_arr)
157            }
158            other => other.clone(),
159        }
160    }
161}