dk_runner/workflow/
parser.rs1use std::path::Path;
2use std::time::Duration;
3
4use anyhow::{bail, Context, Result};
5
6use super::types::*;
7
8pub fn parse_duration(s: &str) -> Result<Duration> {
9 let s = s.trim();
10 if let Some(mins) = s.strip_suffix('m') {
11 let n: u64 = mins.parse().context("invalid minutes")?;
12 return Ok(Duration::from_secs(n * 60));
13 }
14 if let Some(secs) = s.strip_suffix('s') {
15 let n: u64 = secs.parse().context("invalid seconds")?;
16 return Ok(Duration::from_secs(n));
17 }
18 if let Some(hours) = s.strip_suffix('h') {
19 let n: u64 = hours.parse().context("invalid hours")?;
20 return Ok(Duration::from_secs(n * 3600));
21 }
22 bail!(
23 "unsupported duration format: '{}' (use '5m', '120s', or '2h')",
24 s
25 )
26}
27
28pub fn parse_workflow_file(path: &Path) -> Result<Workflow> {
29 let content = std::fs::read_to_string(path)
30 .with_context(|| format!("failed to read workflow file: {}", path.display()))?;
31 parse_workflow_str(&content)
32}
33
34pub fn parse_workflow_str(content: &str) -> Result<Workflow> {
35 let file: WorkflowFile = toml::from_str(content).context("failed to parse workflow TOML")?;
36 let timeout = parse_duration(&file.pipeline.timeout)?;
37 let stages = file
38 .stage
39 .into_iter()
40 .map(resolve_stage)
41 .collect::<Result<Vec<_>>>()?;
42 Ok(Workflow {
43 name: file.pipeline.name,
44 timeout,
45 stages,
46 allowed_commands: vec![],
47 })
48}
49
50fn resolve_stage(sc: StageConfig) -> Result<Stage> {
51 let steps = sc
52 .step
53 .into_iter()
54 .map(resolve_step)
55 .collect::<Result<Vec<_>>>()?;
56 Ok(Stage {
57 name: sc.name,
58 parallel: sc.parallel,
59 steps,
60 })
61}
62
63fn resolve_step(sc: StepConfig) -> Result<Step> {
64 let timeout = match &sc.timeout {
65 Some(t) => parse_duration(t)?,
66 None => Duration::from_secs(120),
67 };
68 let step_type = match sc.step_type.as_deref() {
69 Some("semantic") => StepType::Semantic { checks: sc.check },
70 Some("agent-review") => StepType::AgentReview {
71 prompt: sc
72 .prompt
73 .unwrap_or_else(|| "Review this changeset".to_string()),
74 },
75 Some("human-approve") => StepType::HumanApprove,
76 Some("command") => {
77 let run = sc.run.context("step with type 'command' must have a 'run' field")?;
78 StepType::Command { run }
79 }
80 Some(other) => bail!("unknown step type: '{}'", other),
81 None => {
82 let run = sc.run.context("step must have either 'run' or 'type'")?;
83 StepType::Command { run }
84 }
85 };
86 Ok(Step {
87 name: sc.name,
88 step_type,
89 timeout,
90 required: sc.required,
91 changeset_aware: sc.changeset_aware,
92 })
93}
94
95pub async fn parse_yaml_workflow_file(path: &Path) -> Result<Workflow> {
96 let content = tokio::fs::read_to_string(path)
97 .await
98 .with_context(|| format!("failed to read workflow file: {}", path.display()))?;
99 parse_yaml_workflow_str(&content)
100}
101
102pub fn parse_yaml_workflow_str(content: &str) -> Result<Workflow> {
103 let file: YamlWorkflowFile =
104 serde_yml::from_str(content).context("failed to parse pipeline YAML")?;
105 let timeout = parse_duration(&file.pipeline.timeout)?;
106 let stages = file
107 .stages
108 .into_iter()
109 .map(resolve_yaml_stage)
110 .collect::<Result<Vec<_>>>()?;
111 Ok(Workflow {
112 name: file.pipeline.name,
113 timeout,
114 stages,
115 allowed_commands: file.pipeline.allowed_commands,
116 })
117}
118
119fn resolve_yaml_stage(sc: YamlStageConfig) -> Result<Stage> {
120 let steps = sc
121 .steps
122 .into_iter()
123 .map(resolve_yaml_step)
124 .collect::<Result<Vec<_>>>()?;
125 Ok(Stage {
126 name: sc.name,
127 parallel: sc.parallel,
128 steps,
129 })
130}
131
132fn resolve_yaml_step(sc: YamlStepConfig) -> Result<Step> {
133 let timeout = match &sc.timeout {
134 Some(t) => parse_duration(t)?,
135 None => Duration::from_secs(120),
136 };
137 let step_type = match sc.step_type.as_deref() {
138 Some("semantic") => StepType::Semantic { checks: sc.check },
139 Some("agent-review") => StepType::AgentReview {
140 prompt: sc
141 .prompt
142 .unwrap_or_else(|| "Review this changeset".to_string()),
143 },
144 Some("human-approve") => StepType::HumanApprove,
145 Some("command") | None => {
146 let run = sc.run.context("command step must have a 'run' field")?;
147 StepType::Command { run }
148 }
149 Some(other) => bail!("unknown step type: '{}'", other),
150 };
151 Ok(Step {
152 name: sc.name,
153 step_type,
154 timeout,
155 required: sc.required,
156 changeset_aware: sc.changeset_aware,
157 })
158}
159
/// Unit tests for duration parsing and for the TOML and YAML workflow parsers.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_duration_minutes() {
        assert_eq!(parse_duration("5m").unwrap(), Duration::from_secs(300));
    }

    #[test]
    fn test_parse_duration_seconds() {
        assert_eq!(parse_duration("120s").unwrap(), Duration::from_secs(120));
    }

    #[test]
    fn test_parse_duration_hours() {
        assert_eq!(parse_duration("2h").unwrap(), Duration::from_secs(7200));
    }

    #[test]
    fn test_parse_duration_invalid() {
        assert!(parse_duration("abc").is_err());
    }

    // TOML: one parallel stage with two implicit command steps.
    #[test]
    fn test_parse_basic_workflow() {
        let toml = r#"
[pipeline]
name = "verify"
timeout = "10m"

[[stage]]
name = "checks"
parallel = true

[[stage.step]]
name = "typecheck"
run = "cargo check"
timeout = "2m"

[[stage.step]]
name = "test"
run = "cargo test"
timeout = "5m"
changeset_aware = true
"#;
        let wf = parse_workflow_str(toml).unwrap();
        assert_eq!(wf.name, "verify");
        assert_eq!(wf.timeout, Duration::from_secs(600));
        assert_eq!(wf.stages.len(), 1);
        assert!(wf.stages[0].parallel);
        assert_eq!(wf.stages[0].steps.len(), 2);
        assert!(wf.stages[0].steps[1].changeset_aware);
    }

    // TOML: every non-command step type resolves to its matching variant.
    #[test]
    fn test_parse_gates_stage() {
        let toml = r#"
[pipeline]
name = "full"

[[stage]]
name = "gates"

[[stage.step]]
name = "semantic-check"
type = "semantic"
check = ["no-unsafe-added", "types-consistent"]

[[stage.step]]
name = "agent-review"
type = "agent-review"
prompt = "Check for security issues"

[[stage.step]]
name = "human-approval"
type = "human-approve"
"#;
        let wf = parse_workflow_str(toml).unwrap();
        assert_eq!(wf.stages.len(), 1);
        let steps = &wf.stages[0].steps;
        assert_eq!(steps.len(), 3);
        assert!(
            matches!(&steps[0].step_type, StepType::Semantic { checks } if checks.len() == 2)
        );
        assert!(matches!(&steps[1].step_type, StepType::AgentReview { .. }));
        assert!(matches!(&steps[2].step_type, StepType::HumanApprove));
    }

    #[test]
    fn test_step_without_run_or_type_fails() {
        let toml = r#"
[pipeline]
name = "bad"

[[stage]]
name = "s"

[[stage.step]]
name = "missing"
"#;
        assert!(parse_workflow_str(toml).is_err());
    }

    // YAML fixtures below rely on indentation for nesting: stages contain
    // steps, and each step's keys are indented under its `- name:` entry.
    #[test]
    fn test_parse_yaml_basic_workflow() {
        let yaml = r#"
pipeline:
  name: verify
  timeout: 10m

stages:
  - name: checks
    parallel: true
    steps:
      - name: typecheck
        run: cargo check
        timeout: 2m

      - name: test
        run: cargo test
        timeout: 5m
        changeset_aware: true
"#;
        let wf = parse_yaml_workflow_str(yaml).unwrap();
        assert_eq!(wf.name, "verify");
        assert_eq!(wf.timeout, Duration::from_secs(600));
        assert_eq!(wf.stages.len(), 1);
        assert!(wf.stages[0].parallel);
        assert_eq!(wf.stages[0].steps.len(), 2);
        assert!(wf.stages[0].steps[1].changeset_aware);
        assert!(wf.allowed_commands.is_empty());
    }

    #[test]
    fn test_parse_yaml_with_allowed_commands() {
        let yaml = r#"
pipeline:
  name: custom
  timeout: 5m
  allowed_commands:
    - cargo check
    - cargo clippy
    - eslint

stages:
  - name: lint
    steps:
      - name: clippy
        run: cargo clippy
        timeout: 60s
"#;
        let wf = parse_yaml_workflow_str(yaml).unwrap();
        assert_eq!(wf.allowed_commands.len(), 3);
        assert_eq!(wf.allowed_commands[0], "cargo check");
    }

    #[test]
    fn test_parse_yaml_gates_stage() {
        let yaml = r#"
pipeline:
  name: full

stages:
  - name: gates
    steps:
      - name: semantic-check
        type: semantic
        check:
          - no-unsafe-added
          - types-consistent

      - name: agent-review
        type: agent-review
        prompt: Check for security issues

      - name: human-approval
        type: human-approve
"#;
        let wf = parse_yaml_workflow_str(yaml).unwrap();
        let steps = &wf.stages[0].steps;
        assert_eq!(steps.len(), 3);
        assert!(
            matches!(&steps[0].step_type, StepType::Semantic { checks } if checks.len() == 2)
        );
        assert!(matches!(&steps[1].step_type, StepType::AgentReview { .. }));
        assert!(matches!(&steps[2].step_type, StepType::HumanApprove));
    }

    #[test]
    fn test_parse_yaml_step_without_run_or_type_fails() {
        let yaml = r#"
pipeline:
  name: bad

stages:
  - name: s
    steps:
      - name: missing
"#;
        assert!(parse_yaml_workflow_str(yaml).is_err());
    }
}