1use crate::{HandlerRegistry, Result};
2use rustc_hash::FxHashMap;
3use serde::{Deserialize, Serialize};
4
5#[derive(Debug, Clone)]
6pub struct PipelineHandler {
7 pub steps: Vec<PipelineStep>,
8}
9
10#[derive(Debug, Clone)]
11pub struct PipelineStep {
12 pub tool: String,
13 pub input: Option<serde_json::Value>,
14 pub output_var: Option<String>,
15 pub condition: Option<String>,
16 pub error_policy: ErrorPolicy,
17}
18
19#[derive(Debug, Clone, PartialEq)]
20pub enum ErrorPolicy {
21 FailFast,
22 Continue,
23}
24
25#[derive(Debug, Deserialize)]
26pub struct PipelineInput {
27 #[serde(default)]
28 pub variables: FxHashMap<String, serde_json::Value>,
29}
30
31#[derive(Debug, Serialize)]
32pub struct PipelineOutput {
33 pub results: Vec<StepResult>,
34 pub variables: FxHashMap<String, serde_json::Value>,
35}
36
37#[derive(Debug, Serialize)]
38pub struct StepResult {
39 pub tool: String,
40 pub success: bool,
41 pub output: Option<serde_json::Value>,
42 pub error: Option<String>,
43}
44
45impl PipelineHandler {
46 pub fn new(steps: Vec<PipelineStep>) -> Self {
47 Self { steps }
48 }
49
50 pub async fn execute(
51 &self,
52 input: PipelineInput,
53 registry: &HandlerRegistry,
54 ) -> Result<PipelineOutput> {
55 let mut variables = input.variables;
56 let mut results = Vec::new();
57
58 for step in &self.steps {
59 if let Some(condition) = &step.condition {
61 if !self.evaluate_condition(condition, &variables) {
62 continue;
63 }
64 }
65
66 let step_input = if let Some(input_template) = &step.input {
68 self.interpolate_variables(input_template, &variables)
69 } else {
70 serde_json::json!({})
71 };
72
73 let step_result = match registry
75 .dispatch(&step.tool, &serde_json::to_vec(&step_input)?)
76 .await
77 {
78 Ok(output) => {
79 let output_value: serde_json::Value = serde_json::from_slice(&output)?;
80
81 if let Some(var_name) = &step.output_var {
83 variables.insert(var_name.clone(), output_value.clone());
84 }
85
86 StepResult {
87 tool: step.tool.clone(),
88 success: true,
89 output: Some(output_value),
90 error: None,
91 }
92 }
93 Err(e) => {
94 let result = StepResult {
95 tool: step.tool.clone(),
96 success: false,
97 output: None,
98 error: Some(e.to_string()),
99 };
100
101 if step.error_policy == ErrorPolicy::FailFast {
103 results.push(result);
104 return Err(e);
105 }
106
107 result
108 }
109 };
110
111 results.push(step_result);
112 }
113
114 Ok(PipelineOutput { results, variables })
115 }
116
117 fn evaluate_condition(
118 &self,
119 condition: &str,
120 variables: &FxHashMap<String, serde_json::Value>,
121 ) -> bool {
122 if let Some(var_name) = condition.strip_prefix('!') {
125 !variables.contains_key(var_name)
126 } else {
127 variables.contains_key(condition)
128 }
129 }
130
131 fn interpolate_variables(
132 &self,
133 template: &serde_json::Value,
134 variables: &FxHashMap<String, serde_json::Value>,
135 ) -> serde_json::Value {
136 interpolate_value(template, variables)
137 }
138}
139
140fn interpolate_value(
143 template: &serde_json::Value,
144 variables: &FxHashMap<String, serde_json::Value>,
145) -> serde_json::Value {
146 match template {
147 serde_json::Value::String(s) => {
148 let mut result = s.clone();
150 for (key, value) in variables {
151 let pattern = format!("{{{{{}}}}}", key);
152 if let Some(value_str) = value.as_str() {
153 result = result.replace(&pattern, value_str);
154 }
155 }
156 serde_json::Value::String(result)
157 }
158 serde_json::Value::Object(obj) => {
159 let mut new_obj = serde_json::Map::new();
160 for (k, v) in obj {
161 new_obj.insert(k.clone(), interpolate_value(v, variables));
162 }
163 serde_json::Value::Object(new_obj)
164 }
165 serde_json::Value::Array(arr) => {
166 let new_arr: Vec<_> = arr
167 .iter()
168 .map(|v| interpolate_value(v, variables))
169 .collect();
170 serde_json::Value::Array(new_arr)
171 }
172 other => other.clone(),
173 }
174}
175
176pub struct PipelineHandlerAdapter {
179 handler: PipelineHandler,
180 registry: std::sync::Arc<tokio::sync::RwLock<crate::HandlerRegistry>>,
181}
182
183impl PipelineHandlerAdapter {
184 pub fn new(
186 steps: Vec<PipelineStep>,
187 registry: std::sync::Arc<tokio::sync::RwLock<crate::HandlerRegistry>>,
188 ) -> Self {
189 Self {
190 handler: PipelineHandler::new(steps),
191 registry,
192 }
193 }
194
195 pub fn from_config_steps(
197 config_steps: &[pforge_config::PipelineStep],
198 registry: std::sync::Arc<tokio::sync::RwLock<crate::HandlerRegistry>>,
199 ) -> Self {
200 let steps = config_steps
201 .iter()
202 .map(|s| PipelineStep {
203 tool: s.tool.clone(),
204 input: s.input.clone(),
205 output_var: s.output_var.clone(),
206 condition: s.condition.clone(),
207 error_policy: match s.error_policy {
208 pforge_config::ErrorPolicy::FailFast => ErrorPolicy::FailFast,
209 pforge_config::ErrorPolicy::Continue => ErrorPolicy::Continue,
210 },
211 })
212 .collect();
213
214 Self {
215 handler: PipelineHandler::new(steps),
216 registry,
217 }
218 }
219}
220
221use schemars::JsonSchema;
222
223#[derive(Debug, Deserialize, JsonSchema)]
225pub struct PipelineAdapterInput {
226 #[serde(default)]
228 pub variables: FxHashMap<String, serde_json::Value>,
229}
230
231#[derive(Debug, Serialize, JsonSchema)]
233pub struct PipelineAdapterOutput {
234 pub results: Vec<StepResultSchema>,
236 pub variables: FxHashMap<String, serde_json::Value>,
238}
239
240#[derive(Debug, Serialize, JsonSchema)]
242pub struct StepResultSchema {
243 pub tool: String,
245 pub success: bool,
247 pub output: Option<serde_json::Value>,
249 pub error: Option<String>,
251}
252
253#[async_trait::async_trait]
254impl crate::Handler for PipelineHandlerAdapter {
255 type Input = PipelineAdapterInput;
256 type Output = PipelineAdapterOutput;
257 type Error = crate::Error;
258
259 async fn handle(&self, input: Self::Input) -> Result<Self::Output> {
260 let registry = self.registry.read().await;
261 let pipeline_input = PipelineInput {
262 variables: input.variables,
263 };
264
265 let output = self.handler.execute(pipeline_input, ®istry).await?;
266
267 Ok(PipelineAdapterOutput {
268 results: output
269 .results
270 .into_iter()
271 .map(|r| StepResultSchema {
272 tool: r.tool,
273 success: r.success,
274 output: r.output,
275 error: r.error,
276 })
277 .collect(),
278 variables: output.variables,
279 })
280 }
281}
282
283#[cfg(test)]
284mod tests {
285 use super::*;
286
287 #[test]
288 fn test_pipeline_handler_new() {
289 let steps = vec![PipelineStep {
290 tool: "test_tool".to_string(),
291 input: None,
292 output_var: None,
293 condition: None,
294 error_policy: ErrorPolicy::FailFast,
295 }];
296
297 let handler = PipelineHandler::new(steps);
298 assert_eq!(handler.steps.len(), 1);
299 assert_eq!(handler.steps[0].tool, "test_tool");
300 }
301
302 #[test]
303 fn test_error_policy_equality() {
304 assert_eq!(ErrorPolicy::FailFast, ErrorPolicy::FailFast);
305 assert_eq!(ErrorPolicy::Continue, ErrorPolicy::Continue);
306 assert_ne!(ErrorPolicy::FailFast, ErrorPolicy::Continue);
307 }
308
309 #[test]
310 fn test_evaluate_condition_exists() {
311 let handler = PipelineHandler::new(vec![]);
312 let mut vars = FxHashMap::default();
313 vars.insert("key".to_string(), serde_json::json!("value"));
314
315 assert!(handler.evaluate_condition("key", &vars));
316 assert!(!handler.evaluate_condition("missing", &vars));
317 }
318
319 #[test]
320 fn test_evaluate_condition_not_exists() {
321 let handler = PipelineHandler::new(vec![]);
322 let mut vars = FxHashMap::default();
323 vars.insert("key".to_string(), serde_json::json!("value"));
324
325 assert!(!handler.evaluate_condition("!key", &vars));
326 assert!(handler.evaluate_condition("!missing", &vars));
327 }
328
329 #[test]
330 fn test_interpolate_variables_string() {
331 let handler = PipelineHandler::new(vec![]);
332 let mut vars = FxHashMap::default();
333 vars.insert("name".to_string(), serde_json::json!("Alice"));
334
335 let template = serde_json::json!("Hello {{name}}!");
336 let result = handler.interpolate_variables(&template, &vars);
337
338 assert_eq!(result, serde_json::json!("Hello Alice!"));
339 }
340
341 #[test]
342 fn test_interpolate_variables_object() {
343 let handler = PipelineHandler::new(vec![]);
344 let mut vars = FxHashMap::default();
345 vars.insert("user".to_string(), serde_json::json!("Bob"));
346
347 let template = serde_json::json!({"greeting": "Hi {{user}}"});
348 let result = handler.interpolate_variables(&template, &vars);
349
350 assert_eq!(result["greeting"], "Hi Bob");
351 }
352
353 #[test]
354 fn test_interpolate_variables_array() {
355 let handler = PipelineHandler::new(vec![]);
356 let mut vars = FxHashMap::default();
357 vars.insert("item".to_string(), serde_json::json!("test"));
358
359 let template = serde_json::json!(["{{item}}", "other"]);
360 let result = handler.interpolate_variables(&template, &vars);
361
362 assert_eq!(result[0], "test");
363 assert_eq!(result[1], "other");
364 }
365
366 #[test]
367 fn test_interpolate_variables_no_match() {
368 let handler = PipelineHandler::new(vec![]);
369 let vars = FxHashMap::default();
370
371 let template = serde_json::json!("Hello {{missing}}!");
372 let result = handler.interpolate_variables(&template, &vars);
373
374 assert_eq!(result, serde_json::json!("Hello {{missing}}!"));
375 }
376
377 #[test]
378 fn test_pipeline_input_deserialization() {
379 let json = r#"{"variables": {"key": "value"}}"#;
380 let input: PipelineInput = serde_json::from_str(json).unwrap();
381
382 assert_eq!(input.variables.len(), 1);
383 assert_eq!(input.variables["key"], "value");
384 }
385
386 #[test]
387 fn test_pipeline_output_serialization() {
388 let output = PipelineOutput {
389 results: vec![StepResult {
390 tool: "test".to_string(),
391 success: true,
392 output: Some(serde_json::json!({"result": "ok"})),
393 error: None,
394 }],
395 variables: FxHashMap::default(),
396 };
397
398 let json = serde_json::to_string(&output).unwrap();
399 assert!(json.contains("\"tool\":\"test\""));
400 assert!(json.contains("\"success\":true"));
401 }
402
403 #[tokio::test]
404 async fn test_pipeline_execute_simple() {
405 use crate::{Handler, HandlerRegistry};
406 use schemars::JsonSchema;
407
408 #[derive(Debug, serde::Deserialize, JsonSchema)]
410 struct TestInput {
411 value: String,
412 }
413
414 #[derive(Debug, serde::Serialize, JsonSchema)]
415 struct TestOutput {
416 result: String,
417 }
418
419 struct TestHandler;
420
421 #[async_trait::async_trait]
422 impl Handler for TestHandler {
423 type Input = TestInput;
424 type Output = TestOutput;
425 type Error = crate::Error;
426
427 async fn handle(&self, input: Self::Input) -> crate::Result<Self::Output> {
428 Ok(TestOutput {
429 result: format!("processed: {}", input.value),
430 })
431 }
432 }
433
434 let mut registry = HandlerRegistry::new();
436 registry.register("test_tool", TestHandler);
437
438 let handler = PipelineHandler::new(vec![PipelineStep {
440 tool: "test_tool".to_string(),
441 input: Some(serde_json::json!({"value": "hello"})),
442 output_var: Some("result".to_string()),
443 condition: None,
444 error_policy: ErrorPolicy::FailFast,
445 }]);
446
447 let input = PipelineInput {
448 variables: FxHashMap::default(),
449 };
450
451 let output = handler.execute(input, ®istry).await.unwrap();
452
453 assert_eq!(output.results.len(), 1);
454 assert!(output.results[0].success);
455 assert!(output.variables.contains_key("result"));
456 }
457
458 #[tokio::test]
459 async fn test_pipeline_execute_with_condition_skip() {
460 use crate::HandlerRegistry;
461
462 let registry = HandlerRegistry::new();
463
464 let handler = PipelineHandler::new(vec![PipelineStep {
465 tool: "nonexistent".to_string(),
466 input: None,
467 output_var: None,
468 condition: Some("missing_var".to_string()),
469 error_policy: ErrorPolicy::FailFast,
470 }]);
471
472 let input = PipelineInput {
473 variables: FxHashMap::default(),
474 };
475
476 let output = handler.execute(input, ®istry).await.unwrap();
477
478 assert_eq!(output.results.len(), 0);
480 }
481
482 #[tokio::test]
483 async fn test_pipeline_execute_continue_on_error() {
484 use crate::HandlerRegistry;
485
486 let registry = HandlerRegistry::new();
487
488 let handler = PipelineHandler::new(vec![
489 PipelineStep {
490 tool: "nonexistent1".to_string(),
491 input: None,
492 output_var: None,
493 condition: None,
494 error_policy: ErrorPolicy::Continue,
495 },
496 PipelineStep {
497 tool: "nonexistent2".to_string(),
498 input: None,
499 output_var: None,
500 condition: None,
501 error_policy: ErrorPolicy::Continue,
502 },
503 ]);
504
505 let input = PipelineInput {
506 variables: FxHashMap::default(),
507 };
508
509 let output = handler.execute(input, ®istry).await.unwrap();
510
511 assert_eq!(output.results.len(), 2);
513 assert!(!output.results[0].success);
514 assert!(!output.results[1].success);
515 }
516
517 #[tokio::test]
518 async fn test_pipeline_execute_fail_fast() {
519 use crate::HandlerRegistry;
520
521 let registry = HandlerRegistry::new();
522
523 let handler = PipelineHandler::new(vec![
524 PipelineStep {
525 tool: "nonexistent1".to_string(),
526 input: None,
527 output_var: None,
528 condition: None,
529 error_policy: ErrorPolicy::FailFast,
530 },
531 PipelineStep {
532 tool: "nonexistent2".to_string(),
533 input: None,
534 output_var: None,
535 condition: None,
536 error_policy: ErrorPolicy::FailFast,
537 },
538 ]);
539
540 let input = PipelineInput {
541 variables: FxHashMap::default(),
542 };
543
544 let result = handler.execute(input, ®istry).await;
545
546 assert!(result.is_err());
548 }
549}