Skip to main content

dk_runner/workflow/
parser.rs

1use std::path::Path;
2use std::time::Duration;
3
4use anyhow::{bail, Context, Result};
5
6use super::types::*;
7
8pub fn parse_duration(s: &str) -> Result<Duration> {
9    let s = s.trim();
10    if let Some(mins) = s.strip_suffix('m') {
11        let n: u64 = mins.parse().context("invalid minutes")?;
12        return Ok(Duration::from_secs(n * 60));
13    }
14    if let Some(secs) = s.strip_suffix('s') {
15        let n: u64 = secs.parse().context("invalid seconds")?;
16        return Ok(Duration::from_secs(n));
17    }
18    if let Some(hours) = s.strip_suffix('h') {
19        let n: u64 = hours.parse().context("invalid hours")?;
20        return Ok(Duration::from_secs(n * 3600));
21    }
22    bail!(
23        "unsupported duration format: '{}' (use '5m', '120s', or '2h')",
24        s
25    )
26}
27
28pub fn parse_workflow_file(path: &Path) -> Result<Workflow> {
29    let content = std::fs::read_to_string(path)
30        .with_context(|| format!("failed to read workflow file: {}", path.display()))?;
31    parse_workflow_str(&content)
32}
33
34pub fn parse_workflow_str(content: &str) -> Result<Workflow> {
35    let file: WorkflowFile = toml::from_str(content).context("failed to parse workflow TOML")?;
36    let timeout = parse_duration(&file.pipeline.timeout)?;
37    let stages = file
38        .stage
39        .into_iter()
40        .map(resolve_stage)
41        .collect::<Result<Vec<_>>>()?;
42    Ok(Workflow {
43        name: file.pipeline.name,
44        timeout,
45        stages,
46        allowed_commands: vec![],
47    })
48}
49
50fn resolve_stage(sc: StageConfig) -> Result<Stage> {
51    let steps = sc
52        .step
53        .into_iter()
54        .map(resolve_step)
55        .collect::<Result<Vec<_>>>()?;
56    Ok(Stage {
57        name: sc.name,
58        parallel: sc.parallel,
59        steps,
60    })
61}
62
63fn resolve_step(sc: StepConfig) -> Result<Step> {
64    let timeout = match &sc.timeout {
65        Some(t) => parse_duration(t)?,
66        None => Duration::from_secs(120),
67    };
68    let step_type = match sc.step_type.as_deref() {
69        Some("semantic") => StepType::Semantic { checks: sc.check },
70        Some("agent-review") => StepType::AgentReview {
71            prompt: sc
72                .prompt
73                .unwrap_or_else(|| "Review this changeset".to_string()),
74        },
75        Some("human-approve") => StepType::HumanApprove,
76        Some("command") => {
77            let run = sc.run.context("step with type 'command' must have a 'run' field")?;
78            StepType::Command { run }
79        }
80        Some(other) => bail!("unknown step type: '{}'", other),
81        None => {
82            let run = sc.run.context("step must have either 'run' or 'type'")?;
83            StepType::Command { run }
84        }
85    };
86    Ok(Step {
87        name: sc.name,
88        step_type,
89        timeout,
90        required: sc.required,
91        changeset_aware: sc.changeset_aware,
92        work_dir: None,
93    })
94}
95
96pub async fn parse_yaml_workflow_file(path: &Path) -> Result<Workflow> {
97    let content = tokio::fs::read_to_string(path)
98        .await
99        .with_context(|| format!("failed to read workflow file: {}", path.display()))?;
100    parse_yaml_workflow_str(&content)
101}
102
103pub fn parse_yaml_workflow_str(content: &str) -> Result<Workflow> {
104    let file: YamlWorkflowFile =
105        serde_yml::from_str(content).context("failed to parse pipeline YAML")?;
106    let timeout = parse_duration(&file.pipeline.timeout)?;
107    let stages = file
108        .stages
109        .into_iter()
110        .map(resolve_yaml_stage)
111        .collect::<Result<Vec<_>>>()?;
112    Ok(Workflow {
113        name: file.pipeline.name,
114        timeout,
115        stages,
116        allowed_commands: file.pipeline.allowed_commands,
117    })
118}
119
120fn resolve_yaml_stage(sc: YamlStageConfig) -> Result<Stage> {
121    let steps = sc
122        .steps
123        .into_iter()
124        .map(resolve_yaml_step)
125        .collect::<Result<Vec<_>>>()?;
126    Ok(Stage {
127        name: sc.name,
128        parallel: sc.parallel,
129        steps,
130    })
131}
132
133fn resolve_yaml_step(sc: YamlStepConfig) -> Result<Step> {
134    let timeout = match &sc.timeout {
135        Some(t) => parse_duration(t)?,
136        None => Duration::from_secs(120),
137    };
138    let step_type = match sc.step_type.as_deref() {
139        Some("semantic") => StepType::Semantic { checks: sc.check },
140        Some("agent-review") => StepType::AgentReview {
141            prompt: sc
142                .prompt
143                .unwrap_or_else(|| "Review this changeset".to_string()),
144        },
145        Some("human-approve") => StepType::HumanApprove,
146        Some("command") | None => {
147            let run = sc.run.context("command step must have a 'run' field")?;
148            StepType::Command { run }
149        }
150        Some(other) => bail!("unknown step type: '{}'", other),
151    };
152    Ok(Step {
153        name: sc.name,
154        step_type,
155        timeout,
156        required: sc.required,
157        changeset_aware: sc.changeset_aware,
158        work_dir: None,
159    })
160}
161
162#[cfg(test)]
163mod tests {
164    use super::*;
165
166    #[test]
167    fn test_parse_duration_minutes() {
168        assert_eq!(parse_duration("5m").unwrap(), Duration::from_secs(300));
169    }
170
171    #[test]
172    fn test_parse_duration_seconds() {
173        assert_eq!(parse_duration("120s").unwrap(), Duration::from_secs(120));
174    }
175
176    #[test]
177    fn test_parse_duration_hours() {
178        assert_eq!(parse_duration("2h").unwrap(), Duration::from_secs(7200));
179    }
180
181    #[test]
182    fn test_parse_duration_invalid() {
183        assert!(parse_duration("abc").is_err());
184    }
185
186    #[test]
187    fn test_parse_basic_workflow() {
188        let toml = r#"
189[pipeline]
190name = "verify"
191timeout = "10m"
192
193[[stage]]
194name = "checks"
195parallel = true
196
197[[stage.step]]
198name = "typecheck"
199run = "cargo check"
200timeout = "2m"
201
202[[stage.step]]
203name = "test"
204run = "cargo test"
205timeout = "5m"
206changeset_aware = true
207"#;
208        let wf = parse_workflow_str(toml).unwrap();
209        assert_eq!(wf.name, "verify");
210        assert_eq!(wf.timeout, Duration::from_secs(600));
211        assert_eq!(wf.stages.len(), 1);
212        assert!(wf.stages[0].parallel);
213        assert_eq!(wf.stages[0].steps.len(), 2);
214        assert!(wf.stages[0].steps[1].changeset_aware);
215    }
216
217    #[test]
218    fn test_parse_gates_stage() {
219        let toml = r#"
220[pipeline]
221name = "full"
222
223[[stage]]
224name = "gates"
225
226[[stage.step]]
227name = "semantic-check"
228type = "semantic"
229check = ["no-unsafe-added", "types-consistent"]
230
231[[stage.step]]
232name = "agent-review"
233type = "agent-review"
234prompt = "Check for security issues"
235
236[[stage.step]]
237name = "human-approval"
238type = "human-approve"
239"#;
240        let wf = parse_workflow_str(toml).unwrap();
241        assert_eq!(wf.stages.len(), 1);
242        let steps = &wf.stages[0].steps;
243        assert_eq!(steps.len(), 3);
244        assert!(
245            matches!(&steps[0].step_type, StepType::Semantic { checks } if checks.len() == 2)
246        );
247        assert!(matches!(&steps[1].step_type, StepType::AgentReview { .. }));
248        assert!(matches!(&steps[2].step_type, StepType::HumanApprove));
249    }
250
251    #[test]
252    fn test_step_without_run_or_type_fails() {
253        let toml = r#"
254[pipeline]
255name = "bad"
256
257[[stage]]
258name = "s"
259
260[[stage.step]]
261name = "missing"
262"#;
263        assert!(parse_workflow_str(toml).is_err());
264    }
265
266    #[test]
267    fn test_parse_yaml_basic_workflow() {
268        let yaml = r#"
269pipeline:
270  name: verify
271  timeout: 10m
272
273stages:
274  - name: checks
275    parallel: true
276    steps:
277      - name: typecheck
278        run: cargo check
279        timeout: 2m
280
281      - name: test
282        run: cargo test
283        timeout: 5m
284        changeset_aware: true
285"#;
286        let wf = parse_yaml_workflow_str(yaml).unwrap();
287        assert_eq!(wf.name, "verify");
288        assert_eq!(wf.timeout, Duration::from_secs(600));
289        assert_eq!(wf.stages.len(), 1);
290        assert!(wf.stages[0].parallel);
291        assert_eq!(wf.stages[0].steps.len(), 2);
292        assert!(wf.stages[0].steps[1].changeset_aware);
293        assert!(wf.allowed_commands.is_empty());
294    }
295
296    #[test]
297    fn test_parse_yaml_with_allowed_commands() {
298        let yaml = r#"
299pipeline:
300  name: custom
301  timeout: 5m
302  allowed_commands:
303    - cargo check
304    - cargo clippy
305    - eslint
306
307stages:
308  - name: lint
309    steps:
310      - name: clippy
311        run: cargo clippy
312        timeout: 60s
313"#;
314        let wf = parse_yaml_workflow_str(yaml).unwrap();
315        assert_eq!(wf.allowed_commands.len(), 3);
316        assert_eq!(wf.allowed_commands[0], "cargo check");
317    }
318
319    #[test]
320    fn test_parse_yaml_gates_stage() {
321        let yaml = r#"
322pipeline:
323  name: full
324
325stages:
326  - name: gates
327    steps:
328      - name: semantic-check
329        type: semantic
330        check:
331          - no-unsafe-added
332          - types-consistent
333
334      - name: agent-review
335        type: agent-review
336        prompt: Check for security issues
337
338      - name: human-approval
339        type: human-approve
340"#;
341        let wf = parse_yaml_workflow_str(yaml).unwrap();
342        let steps = &wf.stages[0].steps;
343        assert_eq!(steps.len(), 3);
344        assert!(matches!(&steps[0].step_type, StepType::Semantic { checks } if checks.len() == 2));
345        assert!(matches!(&steps[1].step_type, StepType::AgentReview { .. }));
346        assert!(matches!(&steps[2].step_type, StepType::HumanApprove));
347    }
348
349    #[test]
350    fn test_parse_yaml_step_without_run_or_type_fails() {
351        let yaml = r#"
352pipeline:
353  name: bad
354
355stages:
356  - name: s
357    steps:
358      - name: missing
359"#;
360        assert!(parse_yaml_workflow_str(yaml).is_err());
361    }
362}