// dk_runner/workflow/parser.rs
use std::path::Path;
2use std::time::Duration;
3
4use anyhow::{bail, Context, Result};
5
6use super::types::*;
7
8pub fn parse_duration(s: &str) -> Result<Duration> {
9 let s = s.trim();
10 if let Some(mins) = s.strip_suffix('m') {
11 let n: u64 = mins.parse().context("invalid minutes")?;
12 return Ok(Duration::from_secs(n * 60));
13 }
14 if let Some(secs) = s.strip_suffix('s') {
15 let n: u64 = secs.parse().context("invalid seconds")?;
16 return Ok(Duration::from_secs(n));
17 }
18 if let Some(hours) = s.strip_suffix('h') {
19 let n: u64 = hours.parse().context("invalid hours")?;
20 return Ok(Duration::from_secs(n * 3600));
21 }
22 bail!(
23 "unsupported duration format: '{}' (use '5m', '120s', or '2h')",
24 s
25 )
26}
27
28pub fn parse_workflow_file(path: &Path) -> Result<Workflow> {
29 let content = std::fs::read_to_string(path)
30 .with_context(|| format!("failed to read workflow file: {}", path.display()))?;
31 parse_workflow_str(&content)
32}
33
34pub fn parse_workflow_str(content: &str) -> Result<Workflow> {
35 let file: WorkflowFile = toml::from_str(content).context("failed to parse workflow TOML")?;
36 let timeout = parse_duration(&file.pipeline.timeout)?;
37 let stages = file
38 .stage
39 .into_iter()
40 .map(resolve_stage)
41 .collect::<Result<Vec<_>>>()?;
42 Ok(Workflow {
43 name: file.pipeline.name,
44 timeout,
45 stages,
46 allowed_commands: vec![],
47 })
48}
49
50fn resolve_stage(sc: StageConfig) -> Result<Stage> {
51 let steps = sc
52 .step
53 .into_iter()
54 .map(resolve_step)
55 .collect::<Result<Vec<_>>>()?;
56 Ok(Stage {
57 name: sc.name,
58 parallel: sc.parallel,
59 steps,
60 })
61}
62
63fn resolve_step(sc: StepConfig) -> Result<Step> {
64 let timeout = match &sc.timeout {
65 Some(t) => parse_duration(t)?,
66 None => Duration::from_secs(120),
67 };
68 let step_type = match sc.step_type.as_deref() {
69 Some("semantic") => StepType::Semantic { checks: sc.check },
70 Some("agent-review") => StepType::AgentReview {
71 prompt: sc
72 .prompt
73 .unwrap_or_else(|| "Review this changeset".to_string()),
74 },
75 Some("human-approve") => StepType::HumanApprove,
76 Some("command") => {
77 let run = sc.run.context("step with type 'command' must have a 'run' field")?;
78 StepType::Command { run }
79 }
80 Some(other) => bail!("unknown step type: '{}'", other),
81 None => {
82 let run = sc.run.context("step must have either 'run' or 'type'")?;
83 StepType::Command { run }
84 }
85 };
86 Ok(Step {
87 name: sc.name,
88 step_type,
89 timeout,
90 required: sc.required,
91 changeset_aware: sc.changeset_aware,
92 work_dir: None,
93 })
94}
95
96pub async fn parse_yaml_workflow_file(path: &Path) -> Result<Workflow> {
97 let content = tokio::fs::read_to_string(path)
98 .await
99 .with_context(|| format!("failed to read workflow file: {}", path.display()))?;
100 parse_yaml_workflow_str(&content)
101}
102
103pub fn parse_yaml_workflow_str(content: &str) -> Result<Workflow> {
104 let file: YamlWorkflowFile =
105 serde_yml::from_str(content).context("failed to parse pipeline YAML")?;
106 let timeout = parse_duration(&file.pipeline.timeout)?;
107 let stages = file
108 .stages
109 .into_iter()
110 .map(resolve_yaml_stage)
111 .collect::<Result<Vec<_>>>()?;
112 Ok(Workflow {
113 name: file.pipeline.name,
114 timeout,
115 stages,
116 allowed_commands: file.pipeline.allowed_commands,
117 })
118}
119
120fn resolve_yaml_stage(sc: YamlStageConfig) -> Result<Stage> {
121 let steps = sc
122 .steps
123 .into_iter()
124 .map(resolve_yaml_step)
125 .collect::<Result<Vec<_>>>()?;
126 Ok(Stage {
127 name: sc.name,
128 parallel: sc.parallel,
129 steps,
130 })
131}
132
133fn resolve_yaml_step(sc: YamlStepConfig) -> Result<Step> {
134 let timeout = match &sc.timeout {
135 Some(t) => parse_duration(t)?,
136 None => Duration::from_secs(120),
137 };
138 let step_type = match sc.step_type.as_deref() {
139 Some("semantic") => StepType::Semantic { checks: sc.check },
140 Some("agent-review") => StepType::AgentReview {
141 prompt: sc
142 .prompt
143 .unwrap_or_else(|| "Review this changeset".to_string()),
144 },
145 Some("human-approve") => StepType::HumanApprove,
146 Some("command") | None => {
147 let run = sc.run.context("command step must have a 'run' field")?;
148 StepType::Command { run }
149 }
150 Some(other) => bail!("unknown step type: '{}'", other),
151 };
152 Ok(Step {
153 name: sc.name,
154 step_type,
155 timeout,
156 required: sc.required,
157 changeset_aware: sc.changeset_aware,
158 work_dir: None,
159 })
160}
161
#[cfg(test)]
mod tests {
    use super::*;

    // ---- parse_duration -------------------------------------------------

    /// The `m` suffix is interpreted as minutes.
    #[test]
    fn test_parse_duration_minutes() {
        let parsed = parse_duration("5m").unwrap();
        assert_eq!(parsed, Duration::from_secs(300));
    }

    /// The `s` suffix is interpreted as seconds.
    #[test]
    fn test_parse_duration_seconds() {
        let parsed = parse_duration("120s").unwrap();
        assert_eq!(parsed, Duration::from_secs(120));
    }

    /// The `h` suffix is interpreted as hours.
    #[test]
    fn test_parse_duration_hours() {
        let parsed = parse_duration("2h").unwrap();
        assert_eq!(parsed, Duration::from_secs(7200));
    }

    /// Input without a recognised suffix is rejected.
    #[test]
    fn test_parse_duration_invalid() {
        let outcome = parse_duration("abc");
        assert!(outcome.is_err());
    }

    // ---- TOML workflows -------------------------------------------------

    /// A single parallel stage with two command steps parses into the
    /// expected name, timeout, stage, and step layout.
    #[test]
    fn test_parse_basic_workflow() {
        let source = r#"
[pipeline]
name = "verify"
timeout = "10m"

[[stage]]
name = "checks"
parallel = true

[[stage.step]]
name = "typecheck"
run = "cargo check"
timeout = "2m"

[[stage.step]]
name = "test"
run = "cargo test"
timeout = "5m"
changeset_aware = true
"#;
        let workflow = parse_workflow_str(source).unwrap();
        assert_eq!(workflow.name, "verify");
        assert_eq!(workflow.timeout, Duration::from_secs(600));
        assert_eq!(workflow.stages.len(), 1);

        let stage = &workflow.stages[0];
        assert!(stage.parallel);
        assert_eq!(stage.steps.len(), 2);
        assert!(stage.steps[1].changeset_aware);
    }

    /// Explicit `type` values map onto the semantic, agent-review, and
    /// human-approve step variants.
    #[test]
    fn test_parse_gates_stage() {
        let source = r#"
[pipeline]
name = "full"

[[stage]]
name = "gates"

[[stage.step]]
name = "semantic-check"
type = "semantic"
check = ["no-unsafe-added", "types-consistent"]

[[stage.step]]
name = "agent-review"
type = "agent-review"
prompt = "Check for security issues"

[[stage.step]]
name = "human-approval"
type = "human-approve"
"#;
        let workflow = parse_workflow_str(source).unwrap();
        assert_eq!(workflow.stages.len(), 1);

        let steps = &workflow.stages[0].steps;
        assert_eq!(steps.len(), 3);
        match &steps[0].step_type {
            StepType::Semantic { checks } => assert_eq!(checks.len(), 2),
            _ => panic!("expected a semantic step"),
        }
        match &steps[1].step_type {
            StepType::AgentReview { .. } => {}
            _ => panic!("expected an agent-review step"),
        }
        match &steps[2].step_type {
            StepType::HumanApprove => {}
            _ => panic!("expected a human-approve step"),
        }
    }

    /// A step with neither `run` nor `type` is an error.
    #[test]
    fn test_step_without_run_or_type_fails() {
        let source = r#"
[pipeline]
name = "bad"

[[stage]]
name = "s"

[[stage.step]]
name = "missing"
"#;
        let outcome = parse_workflow_str(source);
        assert!(outcome.is_err());
    }

    // ---- YAML workflows -------------------------------------------------

    /// YAML counterpart of the basic TOML workflow test; with no
    /// `allowed_commands` key the resulting list is empty.
    #[test]
    fn test_parse_yaml_basic_workflow() {
        let source = r#"
pipeline:
  name: verify
  timeout: 10m

stages:
  - name: checks
    parallel: true
    steps:
      - name: typecheck
        run: cargo check
        timeout: 2m

      - name: test
        run: cargo test
        timeout: 5m
        changeset_aware: true
"#;
        let workflow = parse_yaml_workflow_str(source).unwrap();
        assert_eq!(workflow.name, "verify");
        assert_eq!(workflow.timeout, Duration::from_secs(600));
        assert_eq!(workflow.stages.len(), 1);

        let stage = &workflow.stages[0];
        assert!(stage.parallel);
        assert_eq!(stage.steps.len(), 2);
        assert!(stage.steps[1].changeset_aware);
        assert!(workflow.allowed_commands.is_empty());
    }

    /// `pipeline.allowed_commands` is carried through to the workflow in order.
    #[test]
    fn test_parse_yaml_with_allowed_commands() {
        let source = r#"
pipeline:
  name: custom
  timeout: 5m
  allowed_commands:
    - cargo check
    - cargo clippy
    - eslint

stages:
  - name: lint
    steps:
      - name: clippy
        run: cargo clippy
        timeout: 60s
"#;
        let workflow = parse_yaml_workflow_str(source).unwrap();
        assert_eq!(workflow.allowed_commands.len(), 3);
        assert_eq!(workflow.allowed_commands[0], "cargo check");
    }

    /// Explicit `type` values in YAML map onto the semantic, agent-review,
    /// and human-approve step variants.
    #[test]
    fn test_parse_yaml_gates_stage() {
        let source = r#"
pipeline:
  name: full

stages:
  - name: gates
    steps:
      - name: semantic-check
        type: semantic
        check:
          - no-unsafe-added
          - types-consistent

      - name: agent-review
        type: agent-review
        prompt: Check for security issues

      - name: human-approval
        type: human-approve
"#;
        let workflow = parse_yaml_workflow_str(source).unwrap();

        let steps = &workflow.stages[0].steps;
        assert_eq!(steps.len(), 3);
        match &steps[0].step_type {
            StepType::Semantic { checks } => assert_eq!(checks.len(), 2),
            _ => panic!("expected a semantic step"),
        }
        match &steps[1].step_type {
            StepType::AgentReview { .. } => {}
            _ => panic!("expected an agent-review step"),
        }
        match &steps[2].step_type {
            StepType::HumanApprove => {}
            _ => panic!("expected a human-approve step"),
        }
    }

    /// A YAML step with neither `run` nor `type` is an error.
    #[test]
    fn test_parse_yaml_step_without_run_or_type_fails() {
        let source = r#"
pipeline:
  name: bad

stages:
  - name: s
    steps:
      - name: missing
"#;
        let outcome = parse_yaml_workflow_str(source);
        assert!(outcome.is_err());
    }
}