pforge_runtime/handlers/
pipeline.rs

1use crate::{HandlerRegistry, Result};
2use rustc_hash::FxHashMap;
3use serde::{Deserialize, Serialize};
4
5#[derive(Debug, Clone)]
6pub struct PipelineHandler {
7    pub steps: Vec<PipelineStep>,
8}
9
10#[derive(Debug, Clone)]
11pub struct PipelineStep {
12    pub tool: String,
13    pub input: Option<serde_json::Value>,
14    pub output_var: Option<String>,
15    pub condition: Option<String>,
16    pub error_policy: ErrorPolicy,
17}
18
19#[derive(Debug, Clone, PartialEq)]
20pub enum ErrorPolicy {
21    FailFast,
22    Continue,
23}
24
25#[derive(Debug, Deserialize)]
26pub struct PipelineInput {
27    #[serde(default)]
28    pub variables: FxHashMap<String, serde_json::Value>,
29}
30
31#[derive(Debug, Serialize)]
32pub struct PipelineOutput {
33    pub results: Vec<StepResult>,
34    pub variables: FxHashMap<String, serde_json::Value>,
35}
36
37#[derive(Debug, Serialize)]
38pub struct StepResult {
39    pub tool: String,
40    pub success: bool,
41    pub output: Option<serde_json::Value>,
42    pub error: Option<String>,
43}
44
45impl PipelineHandler {
46    pub fn new(steps: Vec<PipelineStep>) -> Self {
47        Self { steps }
48    }
49
50    pub async fn execute(
51        &self,
52        input: PipelineInput,
53        registry: &HandlerRegistry,
54    ) -> Result<PipelineOutput> {
55        let mut variables = input.variables;
56        let mut results = Vec::new();
57
58        for step in &self.steps {
59            // Check condition if present
60            if let Some(condition) = &step.condition {
61                if !self.evaluate_condition(condition, &variables) {
62                    continue;
63                }
64            }
65
66            // Interpolate input with variables
67            let step_input = if let Some(input_template) = &step.input {
68                self.interpolate_variables(input_template, &variables)
69            } else {
70                serde_json::json!({})
71            };
72
73            // Execute step
74            let step_result = match registry
75                .dispatch(&step.tool, &serde_json::to_vec(&step_input)?)
76                .await
77            {
78                Ok(output) => {
79                    let output_value: serde_json::Value = serde_json::from_slice(&output)?;
80
81                    // Store output in variable if specified
82                    if let Some(var_name) = &step.output_var {
83                        variables.insert(var_name.clone(), output_value.clone());
84                    }
85
86                    StepResult {
87                        tool: step.tool.clone(),
88                        success: true,
89                        output: Some(output_value),
90                        error: None,
91                    }
92                }
93                Err(e) => {
94                    let result = StepResult {
95                        tool: step.tool.clone(),
96                        success: false,
97                        output: None,
98                        error: Some(e.to_string()),
99                    };
100
101                    // Handle error based on policy
102                    if step.error_policy == ErrorPolicy::FailFast {
103                        results.push(result);
104                        return Err(e);
105                    }
106
107                    result
108                }
109            };
110
111            results.push(step_result);
112        }
113
114        Ok(PipelineOutput { results, variables })
115    }
116
117    fn evaluate_condition(
118        &self,
119        condition: &str,
120        variables: &FxHashMap<String, serde_json::Value>,
121    ) -> bool {
122        // Simple variable existence check for MVP
123        // Format: "variable_name" or "!variable_name"
124        if let Some(var_name) = condition.strip_prefix('!') {
125            !variables.contains_key(var_name)
126        } else {
127            variables.contains_key(condition)
128        }
129    }
130
131    fn interpolate_variables(
132        &self,
133        template: &serde_json::Value,
134        variables: &FxHashMap<String, serde_json::Value>,
135    ) -> serde_json::Value {
136        interpolate_value(template, variables)
137    }
138}
139
140/// Interpolate template variables in a JSON value.
141/// Supports {{var}} syntax for variable substitution.
142fn interpolate_value(
143    template: &serde_json::Value,
144    variables: &FxHashMap<String, serde_json::Value>,
145) -> serde_json::Value {
146    match template {
147        serde_json::Value::String(s) => {
148            // Replace {{var}} with variable value
149            let mut result = s.clone();
150            for (key, value) in variables {
151                let pattern = format!("{{{{{}}}}}", key);
152                if let Some(value_str) = value.as_str() {
153                    result = result.replace(&pattern, value_str);
154                }
155            }
156            serde_json::Value::String(result)
157        }
158        serde_json::Value::Object(obj) => {
159            let mut new_obj = serde_json::Map::new();
160            for (k, v) in obj {
161                new_obj.insert(k.clone(), interpolate_value(v, variables));
162            }
163            serde_json::Value::Object(new_obj)
164        }
165        serde_json::Value::Array(arr) => {
166            let new_arr: Vec<_> = arr
167                .iter()
168                .map(|v| interpolate_value(v, variables))
169                .collect();
170            serde_json::Value::Array(new_arr)
171        }
172        other => other.clone(),
173    }
174}
175
176/// Adapter that wraps PipelineHandler to implement the Handler trait.
177/// Captures a registry clone for dispatching sub-tool calls.
178pub struct PipelineHandlerAdapter {
179    handler: PipelineHandler,
180    registry: std::sync::Arc<tokio::sync::RwLock<crate::HandlerRegistry>>,
181}
182
183impl PipelineHandlerAdapter {
184    /// Create a new pipeline handler adapter with the given steps and registry.
185    pub fn new(
186        steps: Vec<PipelineStep>,
187        registry: std::sync::Arc<tokio::sync::RwLock<crate::HandlerRegistry>>,
188    ) -> Self {
189        Self {
190            handler: PipelineHandler::new(steps),
191            registry,
192        }
193    }
194
195    /// Convert config steps to runtime steps.
196    pub fn from_config_steps(
197        config_steps: &[pforge_config::PipelineStep],
198        registry: std::sync::Arc<tokio::sync::RwLock<crate::HandlerRegistry>>,
199    ) -> Self {
200        let steps = config_steps
201            .iter()
202            .map(|s| PipelineStep {
203                tool: s.tool.clone(),
204                input: s.input.clone(),
205                output_var: s.output_var.clone(),
206                condition: s.condition.clone(),
207                error_policy: match s.error_policy {
208                    pforge_config::ErrorPolicy::FailFast => ErrorPolicy::FailFast,
209                    pforge_config::ErrorPolicy::Continue => ErrorPolicy::Continue,
210                },
211            })
212            .collect();
213
214        Self {
215            handler: PipelineHandler::new(steps),
216            registry,
217        }
218    }
219}
220
221use schemars::JsonSchema;
222
223/// Input for the pipeline handler adapter.
224#[derive(Debug, Deserialize, JsonSchema)]
225pub struct PipelineAdapterInput {
226    /// Variables to pass to the pipeline.
227    #[serde(default)]
228    pub variables: FxHashMap<String, serde_json::Value>,
229}
230
231/// Output from the pipeline handler adapter.
232#[derive(Debug, Serialize, JsonSchema)]
233pub struct PipelineAdapterOutput {
234    /// Results from each step.
235    pub results: Vec<StepResultSchema>,
236    /// Final variable state.
237    pub variables: FxHashMap<String, serde_json::Value>,
238}
239
240/// Step result with JSON schema support.
241#[derive(Debug, Serialize, JsonSchema)]
242pub struct StepResultSchema {
243    /// Tool name that was executed.
244    pub tool: String,
245    /// Whether the step succeeded.
246    pub success: bool,
247    /// Output from the step (if successful).
248    pub output: Option<serde_json::Value>,
249    /// Error message (if failed).
250    pub error: Option<String>,
251}
252
253#[async_trait::async_trait]
254impl crate::Handler for PipelineHandlerAdapter {
255    type Input = PipelineAdapterInput;
256    type Output = PipelineAdapterOutput;
257    type Error = crate::Error;
258
259    async fn handle(&self, input: Self::Input) -> Result<Self::Output> {
260        let registry = self.registry.read().await;
261        let pipeline_input = PipelineInput {
262            variables: input.variables,
263        };
264
265        let output = self.handler.execute(pipeline_input, &registry).await?;
266
267        Ok(PipelineAdapterOutput {
268            results: output
269                .results
270                .into_iter()
271                .map(|r| StepResultSchema {
272                    tool: r.tool,
273                    success: r.success,
274                    output: r.output,
275                    error: r.error,
276                })
277                .collect(),
278            variables: output.variables,
279        })
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use super::*;
286
287    #[test]
288    fn test_pipeline_handler_new() {
289        let steps = vec![PipelineStep {
290            tool: "test_tool".to_string(),
291            input: None,
292            output_var: None,
293            condition: None,
294            error_policy: ErrorPolicy::FailFast,
295        }];
296
297        let handler = PipelineHandler::new(steps);
298        assert_eq!(handler.steps.len(), 1);
299        assert_eq!(handler.steps[0].tool, "test_tool");
300    }
301
302    #[test]
303    fn test_error_policy_equality() {
304        assert_eq!(ErrorPolicy::FailFast, ErrorPolicy::FailFast);
305        assert_eq!(ErrorPolicy::Continue, ErrorPolicy::Continue);
306        assert_ne!(ErrorPolicy::FailFast, ErrorPolicy::Continue);
307    }
308
309    #[test]
310    fn test_evaluate_condition_exists() {
311        let handler = PipelineHandler::new(vec![]);
312        let mut vars = FxHashMap::default();
313        vars.insert("key".to_string(), serde_json::json!("value"));
314
315        assert!(handler.evaluate_condition("key", &vars));
316        assert!(!handler.evaluate_condition("missing", &vars));
317    }
318
319    #[test]
320    fn test_evaluate_condition_not_exists() {
321        let handler = PipelineHandler::new(vec![]);
322        let mut vars = FxHashMap::default();
323        vars.insert("key".to_string(), serde_json::json!("value"));
324
325        assert!(!handler.evaluate_condition("!key", &vars));
326        assert!(handler.evaluate_condition("!missing", &vars));
327    }
328
329    #[test]
330    fn test_interpolate_variables_string() {
331        let handler = PipelineHandler::new(vec![]);
332        let mut vars = FxHashMap::default();
333        vars.insert("name".to_string(), serde_json::json!("Alice"));
334
335        let template = serde_json::json!("Hello {{name}}!");
336        let result = handler.interpolate_variables(&template, &vars);
337
338        assert_eq!(result, serde_json::json!("Hello Alice!"));
339    }
340
341    #[test]
342    fn test_interpolate_variables_object() {
343        let handler = PipelineHandler::new(vec![]);
344        let mut vars = FxHashMap::default();
345        vars.insert("user".to_string(), serde_json::json!("Bob"));
346
347        let template = serde_json::json!({"greeting": "Hi {{user}}"});
348        let result = handler.interpolate_variables(&template, &vars);
349
350        assert_eq!(result["greeting"], "Hi Bob");
351    }
352
353    #[test]
354    fn test_interpolate_variables_array() {
355        let handler = PipelineHandler::new(vec![]);
356        let mut vars = FxHashMap::default();
357        vars.insert("item".to_string(), serde_json::json!("test"));
358
359        let template = serde_json::json!(["{{item}}", "other"]);
360        let result = handler.interpolate_variables(&template, &vars);
361
362        assert_eq!(result[0], "test");
363        assert_eq!(result[1], "other");
364    }
365
366    #[test]
367    fn test_interpolate_variables_no_match() {
368        let handler = PipelineHandler::new(vec![]);
369        let vars = FxHashMap::default();
370
371        let template = serde_json::json!("Hello {{missing}}!");
372        let result = handler.interpolate_variables(&template, &vars);
373
374        assert_eq!(result, serde_json::json!("Hello {{missing}}!"));
375    }
376
377    #[test]
378    fn test_pipeline_input_deserialization() {
379        let json = r#"{"variables": {"key": "value"}}"#;
380        let input: PipelineInput = serde_json::from_str(json).unwrap();
381
382        assert_eq!(input.variables.len(), 1);
383        assert_eq!(input.variables["key"], "value");
384    }
385
386    #[test]
387    fn test_pipeline_output_serialization() {
388        let output = PipelineOutput {
389            results: vec![StepResult {
390                tool: "test".to_string(),
391                success: true,
392                output: Some(serde_json::json!({"result": "ok"})),
393                error: None,
394            }],
395            variables: FxHashMap::default(),
396        };
397
398        let json = serde_json::to_string(&output).unwrap();
399        assert!(json.contains("\"tool\":\"test\""));
400        assert!(json.contains("\"success\":true"));
401    }
402
403    #[tokio::test]
404    async fn test_pipeline_execute_simple() {
405        use crate::{Handler, HandlerRegistry};
406        use schemars::JsonSchema;
407
408        // Create a simple test handler
409        #[derive(Debug, serde::Deserialize, JsonSchema)]
410        struct TestInput {
411            value: String,
412        }
413
414        #[derive(Debug, serde::Serialize, JsonSchema)]
415        struct TestOutput {
416            result: String,
417        }
418
419        struct TestHandler;
420
421        #[async_trait::async_trait]
422        impl Handler for TestHandler {
423            type Input = TestInput;
424            type Output = TestOutput;
425            type Error = crate::Error;
426
427            async fn handle(&self, input: Self::Input) -> crate::Result<Self::Output> {
428                Ok(TestOutput {
429                    result: format!("processed: {}", input.value),
430                })
431            }
432        }
433
434        // Setup registry
435        let mut registry = HandlerRegistry::new();
436        registry.register("test_tool", TestHandler);
437
438        // Create pipeline with one step
439        let handler = PipelineHandler::new(vec![PipelineStep {
440            tool: "test_tool".to_string(),
441            input: Some(serde_json::json!({"value": "hello"})),
442            output_var: Some("result".to_string()),
443            condition: None,
444            error_policy: ErrorPolicy::FailFast,
445        }]);
446
447        let input = PipelineInput {
448            variables: FxHashMap::default(),
449        };
450
451        let output = handler.execute(input, &registry).await.unwrap();
452
453        assert_eq!(output.results.len(), 1);
454        assert!(output.results[0].success);
455        assert!(output.variables.contains_key("result"));
456    }
457
458    #[tokio::test]
459    async fn test_pipeline_execute_with_condition_skip() {
460        use crate::HandlerRegistry;
461
462        let registry = HandlerRegistry::new();
463
464        let handler = PipelineHandler::new(vec![PipelineStep {
465            tool: "nonexistent".to_string(),
466            input: None,
467            output_var: None,
468            condition: Some("missing_var".to_string()),
469            error_policy: ErrorPolicy::FailFast,
470        }]);
471
472        let input = PipelineInput {
473            variables: FxHashMap::default(),
474        };
475
476        let output = handler.execute(input, &registry).await.unwrap();
477
478        // Step should be skipped due to failed condition
479        assert_eq!(output.results.len(), 0);
480    }
481
482    #[tokio::test]
483    async fn test_pipeline_execute_continue_on_error() {
484        use crate::HandlerRegistry;
485
486        let registry = HandlerRegistry::new();
487
488        let handler = PipelineHandler::new(vec![
489            PipelineStep {
490                tool: "nonexistent1".to_string(),
491                input: None,
492                output_var: None,
493                condition: None,
494                error_policy: ErrorPolicy::Continue,
495            },
496            PipelineStep {
497                tool: "nonexistent2".to_string(),
498                input: None,
499                output_var: None,
500                condition: None,
501                error_policy: ErrorPolicy::Continue,
502            },
503        ]);
504
505        let input = PipelineInput {
506            variables: FxHashMap::default(),
507        };
508
509        let output = handler.execute(input, &registry).await.unwrap();
510
511        // Both steps should fail but continue
512        assert_eq!(output.results.len(), 2);
513        assert!(!output.results[0].success);
514        assert!(!output.results[1].success);
515    }
516
517    #[tokio::test]
518    async fn test_pipeline_execute_fail_fast() {
519        use crate::HandlerRegistry;
520
521        let registry = HandlerRegistry::new();
522
523        let handler = PipelineHandler::new(vec![
524            PipelineStep {
525                tool: "nonexistent1".to_string(),
526                input: None,
527                output_var: None,
528                condition: None,
529                error_policy: ErrorPolicy::FailFast,
530            },
531            PipelineStep {
532                tool: "nonexistent2".to_string(),
533                input: None,
534                output_var: None,
535                condition: None,
536                error_policy: ErrorPolicy::FailFast,
537            },
538        ]);
539
540        let input = PipelineInput {
541            variables: FxHashMap::default(),
542        };
543
544        let result = handler.execute(input, &registry).await;
545
546        // Should fail on first error
547        assert!(result.is_err());
548    }
549}