Skip to main content

mur_common/
pipeline.rs

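//! Pipeline expression AST and parser.
//!
//! Supports three composition operators with precedence (high → low):
//! - `|`  Pipe — pass output of left as input to right
//! - `&&` Sequential — run right only if left succeeds (exit code 0)
//! - `,`  Parallel — run all branches concurrently
//!
//! `|` and `&&` chains are left-associative (`a | b | c` is `(a | b) | c`), and
//! operators are recognised with or without surrounding whitespace.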
7
8use serde::{Deserialize, Serialize};
9
10/// Pipeline expression AST.
11#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
12pub enum PipelineExpr {
13    /// A single workflow reference (by name or ID).
14    Single(String),
15    /// `w1 | w2` — pipe output of left into right.
16    Pipe(Box<PipelineExpr>, Box<PipelineExpr>),
17    /// `w1 && w2` — run right only if left succeeds.
18    Sequential(Box<PipelineExpr>, Box<PipelineExpr>),
19    /// `w1, w2, w3` — run all concurrently.
20    Parallel(Vec<PipelineExpr>),
21}
22
23/// Status of a pipeline stage execution.
24#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
25#[serde(rename_all = "lowercase")]
26pub enum PipelineStatus {
27    Success,
28    Failed,
29    Skipped,
30}
31
32/// Output produced by a single pipeline stage.
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct PipelineOutput {
35    pub workflow_id: String,
36    pub status: PipelineStatus,
37    pub output_text: Option<String>,
38    pub output_data: Option<serde_json::Value>,
39    pub exit_code: i32,
40    pub duration_ms: u64,
41    /// Real LLM tokens (input + output) consumed producing this output, summed
42    /// across any delegated agent turns. 0 when unknown (e.g. shell stages or a
43    /// reply that carried no usage). Drives the fleet budget guard's real-cost
44    /// accounting. `#[serde(default)]` keeps older serialized outputs readable.
45    #[serde(default)]
46    pub tokens_used: u64,
47}
48
49/// Replace `{{input}}` placeholders in a template string with the given value.
50/// If `input` is `None`, replaces with empty string.
51///
52/// The replacement is shell-escaped to prevent injection when the result
53/// is passed to `sh -c`.
54pub fn inject_input(template: &str, input: Option<&str>) -> String {
55    let replacement = input.unwrap_or("");
56    let escaped = shell_escape::escape(replacement.into());
57    template.replace("{{input}}", &escaped)
58}
59
/// Returns `true` if `input` looks like a pipeline expression rather than a
/// single workflow reference.
///
/// The operators are detected with different strictness:
/// - `|` and `&&` count only when surrounded by single spaces (`" | "`,
///   `" && "`). Per the original inline note, this is so that a `|` inside a
///   workflow name does not trigger pipeline handling.
/// - Any `,` counts, with or without surrounding whitespace.
///
/// [`parse_pipeline_expr`] splits on `|`, `&&` and `,` regardless of spacing.
/// So an input such as `"a|b"` returns `false` here, but is still parsed as a
/// pipe if handed to the parser directly.
pub fn has_pipeline_syntax(input: &str) -> bool {
    // `contains(',')` already covers `", "`, so no separate check is needed.
    input.contains(',') || input.contains(" | ") || input.contains(" && ")
}
65
66/// Parse a pipeline expression string into an AST.
67///
68/// Operator precedence (high → low): `|` > `&&` > `,`
69///
70/// Parsing strategy: split by lowest-precedence operator first (`,`),
71/// then by `&&`, then by `|`.
72pub fn parse_pipeline_expr(input: &str) -> Result<PipelineExpr, PipelineParseError> {
73    let input = input.trim();
74    if input.is_empty() {
75        return Err(PipelineParseError::EmptyInput);
76    }
77    parse_parallel(input)
78}
79
80/// Errors from pipeline expression parsing.
81#[derive(Debug, Clone, PartialEq, thiserror::Error)]
82pub enum PipelineParseError {
83    #[error("empty pipeline expression")]
84    EmptyInput,
85    #[error("empty segment in pipeline expression")]
86    EmptySegment,
87}
88
89/// Parse comma-separated parallel branches (lowest precedence).
90fn parse_parallel(input: &str) -> Result<PipelineExpr, PipelineParseError> {
91    let parts: Vec<&str> = input.split(',').collect();
92    if parts.len() == 1 {
93        return parse_sequential(input);
94    }
95
96    let mut exprs = Vec::with_capacity(parts.len());
97    for part in parts {
98        let part = part.trim();
99        if part.is_empty() {
100            return Err(PipelineParseError::EmptySegment);
101        }
102        exprs.push(parse_sequential(part)?);
103    }
104
105    if exprs.len() == 1 {
106        Ok(exprs.into_iter().next().unwrap())
107    } else {
108        Ok(PipelineExpr::Parallel(exprs))
109    }
110}
111
112/// Parse `&&`-separated sequential chains (middle precedence).
113fn parse_sequential(input: &str) -> Result<PipelineExpr, PipelineParseError> {
114    let parts: Vec<&str> = input.split("&&").collect();
115    if parts.len() == 1 {
116        return parse_pipe(input);
117    }
118
119    let mut iter = parts.into_iter();
120    let first = iter.next().unwrap().trim();
121    if first.is_empty() {
122        return Err(PipelineParseError::EmptySegment);
123    }
124    let mut expr = parse_pipe(first)?;
125
126    for part in iter {
127        let part = part.trim();
128        if part.is_empty() {
129            return Err(PipelineParseError::EmptySegment);
130        }
131        let right = parse_pipe(part)?;
132        expr = PipelineExpr::Sequential(Box::new(expr), Box::new(right));
133    }
134
135    Ok(expr)
136}
137
138/// Parse `|`-separated pipe chains (highest precedence).
139fn parse_pipe(input: &str) -> Result<PipelineExpr, PipelineParseError> {
140    let parts: Vec<&str> = input.split('|').collect();
141    if parts.len() == 1 {
142        return parse_single(input);
143    }
144
145    let mut iter = parts.into_iter();
146    let first = iter.next().unwrap().trim();
147    if first.is_empty() {
148        return Err(PipelineParseError::EmptySegment);
149    }
150    let mut expr = parse_single(first)?;
151
152    for part in iter {
153        let part = part.trim();
154        if part.is_empty() {
155            return Err(PipelineParseError::EmptySegment);
156        }
157        let right = parse_single(part)?;
158        expr = PipelineExpr::Pipe(Box::new(expr), Box::new(right));
159    }
160
161    Ok(expr)
162}
163
164/// Parse a single workflow identifier.
165fn parse_single(input: &str) -> Result<PipelineExpr, PipelineParseError> {
166    let input = input.trim();
167    if input.is_empty() {
168        return Err(PipelineParseError::EmptySegment);
169    }
170    Ok(PipelineExpr::Single(input.to_string()))
171}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a `Single` leaf.
    fn single(name: &str) -> PipelineExpr {
        PipelineExpr::Single(name.into())
    }

    #[test]
    fn test_single() {
        assert_eq!(
            parse_pipeline_expr("w1").unwrap(),
            PipelineExpr::Single("w1".into())
        );
    }

    #[test]
    fn test_pipe() {
        assert_eq!(
            parse_pipeline_expr("w1 | w2").unwrap(),
            PipelineExpr::Pipe(
                Box::new(PipelineExpr::Single("w1".into())),
                Box::new(PipelineExpr::Single("w2".into())),
            )
        );
    }

    #[test]
    fn test_sequential() {
        assert_eq!(
            parse_pipeline_expr("w1 && w2").unwrap(),
            PipelineExpr::Sequential(
                Box::new(PipelineExpr::Single("w1".into())),
                Box::new(PipelineExpr::Single("w2".into())),
            )
        );
    }

    #[test]
    fn test_parallel() {
        assert_eq!(
            parse_pipeline_expr("w1, w2").unwrap(),
            PipelineExpr::Parallel(vec![
                PipelineExpr::Single("w1".into()),
                PipelineExpr::Single("w2".into()),
            ])
        );
    }

    #[test]
    fn test_pipe_then_sequential() {
        // "w1 | w2 && w3" → Sequential(Pipe(w1, w2), w3)
        assert_eq!(
            parse_pipeline_expr("w1 | w2 && w3").unwrap(),
            PipelineExpr::Sequential(
                Box::new(PipelineExpr::Pipe(
                    Box::new(PipelineExpr::Single("w1".into())),
                    Box::new(PipelineExpr::Single("w2".into())),
                )),
                Box::new(PipelineExpr::Single("w3".into())),
            )
        );
    }

    #[test]
    fn test_full_combo() {
        // "w1 | w2 && w3, w4" →
        // Parallel([
        //   Sequential(Pipe(w1, w2), w3),
        //   w4,
        // ])
        assert_eq!(
            parse_pipeline_expr("w1 | w2 && w3, w4").unwrap(),
            PipelineExpr::Parallel(vec![
                PipelineExpr::Sequential(
                    Box::new(PipelineExpr::Pipe(
                        Box::new(PipelineExpr::Single("w1".into())),
                        Box::new(PipelineExpr::Single("w2".into())),
                    )),
                    Box::new(PipelineExpr::Single("w3".into())),
                ),
                PipelineExpr::Single("w4".into()),
            ])
        );
    }

    #[test]
    fn test_empty_input() {
        assert_eq!(parse_pipeline_expr(""), Err(PipelineParseError::EmptyInput));
    }

    #[test]
    fn test_whitespace_only_input() {
        assert_eq!(
            parse_pipeline_expr(" \t\n "),
            Err(PipelineParseError::EmptyInput)
        );
    }

    #[test]
    fn test_triple_pipe() {
        // "w1 | w2 | w3" → Pipe(Pipe(w1, w2), w3)
        assert_eq!(
            parse_pipeline_expr("w1 | w2 | w3").unwrap(),
            PipelineExpr::Pipe(
                Box::new(PipelineExpr::Pipe(
                    Box::new(PipelineExpr::Single("w1".into())),
                    Box::new(PipelineExpr::Single("w2".into())),
                )),
                Box::new(PipelineExpr::Single("w3".into())),
            )
        );
    }

    #[test]
    fn test_chained_sequential_is_left_associative() {
        // "w1 && w2 && w3" → Sequential(Sequential(w1, w2), w3)
        assert_eq!(
            parse_pipeline_expr("w1 && w2 && w3").unwrap(),
            PipelineExpr::Sequential(
                Box::new(PipelineExpr::Sequential(
                    Box::new(single("w1")),
                    Box::new(single("w2")),
                )),
                Box::new(single("w3")),
            )
        );
    }

    #[test]
    fn test_parallel_three_branches() {
        assert_eq!(
            parse_pipeline_expr("w1, w2,w3").unwrap(),
            PipelineExpr::Parallel(vec![single("w1"), single("w2"), single("w3")])
        );
    }

    #[test]
    fn test_operators_without_spaces() {
        // The parser recognises operators regardless of surrounding whitespace.
        assert_eq!(
            parse_pipeline_expr("w1|w2&&w3,w4").unwrap(),
            parse_pipeline_expr("w1 | w2 && w3, w4").unwrap()
        );
    }

    #[test]
    fn test_workflow_names_keep_inner_spaces() {
        assert_eq!(parse_pipeline_expr("  my flow  ").unwrap(), single("my flow"));
    }

    #[test]
    fn test_empty_segments_rejected() {
        let cases = [
            "w1,,w2",
            "w1,",
            ", w2",
            "w1 &&",
            "&& w2",
            "w1 && && w2",
            "w1 |",
            "| w2",
            "w1 || w2",
        ];
        for expr in cases.iter() {
            assert_eq!(
                parse_pipeline_expr(expr),
                Err(PipelineParseError::EmptySegment),
                "input: {:?}",
                expr
            );
        }
    }

    #[test]
    fn test_parse_error_messages() {
        assert_eq!(
            PipelineParseError::EmptyInput.to_string(),
            "empty pipeline expression"
        );
        assert_eq!(
            PipelineParseError::EmptySegment.to_string(),
            "empty segment in pipeline expression"
        );
    }

    #[test]
    fn test_pipeline_expr_json_roundtrip() {
        let expr = parse_pipeline_expr("w1 | w2 && w3, w4").unwrap();
        let json = serde_json::to_string(&expr).unwrap();
        let back: PipelineExpr = serde_json::from_str(&json).unwrap();
        assert_eq!(back, expr);
    }

    #[test]
    fn test_pipeline_status_serde_lowercase() {
        assert_eq!(
            serde_json::to_string(&PipelineStatus::Success).unwrap(),
            "\"success\""
        );
        assert_eq!(
            serde_json::to_string(&PipelineStatus::Skipped).unwrap(),
            "\"skipped\""
        );
        let parsed: PipelineStatus = serde_json::from_str("\"failed\"").unwrap();
        assert_eq!(parsed, PipelineStatus::Failed);
    }

    #[test]
    fn test_pipeline_output_tokens_used_defaults_to_zero() {
        // Older serialized outputs predate `tokens_used`; they must still load.
        let json = r#"{"workflow_id":"w1","status":"success","output_text":null,"output_data":null,"exit_code":0,"duration_ms":5}"#;
        let out: PipelineOutput = serde_json::from_str(json).unwrap();
        assert_eq!(out.tokens_used, 0);
        assert_eq!(out.status, PipelineStatus::Success);
        assert_eq!(out.duration_ms, 5);
    }

    #[test]
    fn test_has_pipeline_syntax() {
        assert!(!has_pipeline_syntax("simple-workflow"));
        assert!(has_pipeline_syntax("w1 | w2"));
        assert!(has_pipeline_syntax("w1 && w2"));
        assert!(has_pipeline_syntax("w1, w2"));
    }

    #[test]
    fn test_has_pipeline_syntax_unspaced_comma() {
        assert!(has_pipeline_syntax("w1,w2"));
        assert!(!has_pipeline_syntax(""));
    }

    #[test]
    fn test_inject_input_with_value() {
        // "hello world" contains a space, so shell-escape wraps in quotes
        let result = inject_input("Analyze this: {{input}}\nDone.", Some("hello world"));
        // Unix uses single quotes, Windows uses double quotes
        assert!(
            result == "Analyze this: 'hello world'\nDone."
                || result == "Analyze this: \"hello world\"\nDone."
        );
    }

    #[test]
    fn test_inject_input_none() {
        let result = inject_input("Prefix {{input}} suffix", None);
        assert!(result == "Prefix '' suffix" || result == "Prefix \"\" suffix");
    }

    #[test]
    fn test_inject_input_no_placeholder() {
        let result = inject_input("no placeholder here", Some("data"));
        assert_eq!(result, "no placeholder here");
    }

    #[test]
    fn test_inject_input_multiple() {
        let result = inject_input("{{input}} and {{input}}", Some("x"));
        assert_eq!(result, "x and x");
    }

    #[test]
    fn test_inject_input_shell_injection_prevented() {
        let result = inject_input("echo {{input}}", Some("hello; rm -rf /"));
        // The malicious input should be escaped, not executed literally
        // Unix: 'hello; rm -rf /', Windows: "hello; rm -rf /"
        assert!(result.contains("'hello; rm -rf /'") || result.contains("\"hello; rm -rf /\""));
    }
}