1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use tokio::sync::mpsc;
6use tracing::info;
7use uuid::Uuid;
8
9use dk_engine::repo::Engine;
10
11use crate::changeset::scope_command_to_changeset;
12use crate::executor::{Executor, StepOutput, StepStatus};
13use crate::findings::{Finding, Suggestion};
14use crate::steps::{agent_review, command, human_approve, semantic};
15use crate::workflow::types::{Stage, Step, StepType, Workflow};
16
17#[derive(Debug, Clone)]
19pub struct StepResult {
20 pub stage_name: String,
21 pub step_name: String,
22 pub status: StepStatus,
23 pub output: String,
24 pub required: bool,
25 pub findings: Vec<Finding>,
26 pub suggestions: Vec<Suggestion>,
27}
28
29pub async fn run_workflow(
35 workflow: &Workflow,
36 executor: &dyn Executor,
37 work_dir: &Path,
38 changeset_files: &[String],
39 env: &HashMap<String, String>,
40 tx: &mpsc::Sender<StepResult>,
41 engine: Option<&Arc<Engine>>,
42 repo_id: Option<Uuid>,
43 changeset_id: Option<Uuid>,
44) -> bool {
45 let mut all_passed = true;
46
47 for stage in &workflow.stages {
48 info!(stage = %stage.name, parallel = stage.parallel, "running stage");
49
50 let results = if stage.parallel {
51 run_stage_parallel(stage, executor, work_dir, changeset_files, env, engine, repo_id, changeset_id)
52 .await
53 } else {
54 run_stage_sequential(stage, executor, work_dir, changeset_files, env, engine, repo_id, changeset_id)
55 .await
56 };
57
58 for result in results {
59 if result.status != StepStatus::Pass && result.required {
60 all_passed = false;
61 }
62 let _ = tx.send(result).await;
63 }
64 }
65
66 all_passed
67}
68
69async fn run_stage_parallel(
70 stage: &Stage,
71 executor: &dyn Executor,
72 work_dir: &Path,
73 changeset_files: &[String],
74 env: &HashMap<String, String>,
75 engine: Option<&Arc<Engine>>,
76 repo_id: Option<Uuid>,
77 changeset_id: Option<Uuid>,
78) -> Vec<StepResult> {
79 let mut futures = Vec::new();
80 for step in &stage.steps {
81 futures.push(run_single_step(
82 &stage.name,
83 step,
84 executor,
85 work_dir,
86 changeset_files,
87 env,
88 engine,
89 repo_id,
90 changeset_id,
91 ));
92 }
93 futures::future::join_all(futures).await
94}
95
96async fn run_stage_sequential(
97 stage: &Stage,
98 executor: &dyn Executor,
99 work_dir: &Path,
100 changeset_files: &[String],
101 env: &HashMap<String, String>,
102 engine: Option<&Arc<Engine>>,
103 repo_id: Option<Uuid>,
104 changeset_id: Option<Uuid>,
105) -> Vec<StepResult> {
106 let mut results = Vec::new();
107 for step in &stage.steps {
108 let result = run_single_step(
109 &stage.name,
110 step,
111 executor,
112 work_dir,
113 changeset_files,
114 env,
115 engine,
116 repo_id,
117 changeset_id,
118 )
119 .await;
120 let failed_required = step.required && result.status != StepStatus::Pass;
121 results.push(result);
122 if failed_required {
125 tracing::warn!(
126 stage = %stage.name,
127 step = %step.name,
128 "required step failed — aborting remaining steps in sequential stage"
129 );
130 break;
131 }
132 }
133 results
134}
135
136async fn run_single_step(
137 stage_name: &str,
138 step: &Step,
139 executor: &dyn Executor,
140 work_dir: &Path,
141 changeset_files: &[String],
142 env: &HashMap<String, String>,
143 engine: Option<&Arc<Engine>>,
144 repo_id: Option<Uuid>,
145 changeset_id: Option<Uuid>,
146) -> StepResult {
147 info!(step = %step.name, "running step");
148
149 match &step.step_type {
150 StepType::Command { run } => {
151 let cmd = if step.changeset_aware {
152 scope_command_to_changeset(run, changeset_files)
153 .unwrap_or_else(|| run.clone())
154 } else {
155 run.clone()
156 };
157 let output =
158 match command::run_command_step(executor, &cmd, work_dir, step.timeout, env).await {
159 Ok(out) => out,
160 Err(e) => StepOutput {
161 status: StepStatus::Fail,
162 stdout: String::new(),
163 stderr: e.to_string(),
164 duration: std::time::Duration::ZERO,
165 },
166 };
167
168 let combined_output = if output.stderr.is_empty() {
169 output.stdout
170 } else {
171 format!("{}{}", output.stdout, output.stderr)
172 };
173
174 StepResult {
175 stage_name: stage_name.to_string(),
176 step_name: step.name.clone(),
177 status: output.status,
178 output: combined_output,
179 required: step.required,
180 findings: Vec::new(),
181 suggestions: Vec::new(),
182 }
183 }
184 StepType::Semantic { checks } => {
185 if let (Some(eng), Some(rid)) = (engine, repo_id) {
186 let (output, findings, suggestions) = semantic::run_semantic_step(
188 eng,
189 rid,
190 changeset_files,
191 work_dir,
192 checks,
193 )
194 .await;
195
196 let combined_output = if output.stderr.is_empty() {
197 output.stdout
198 } else {
199 format!("{}{}", output.stdout, output.stderr)
200 };
201
202 StepResult {
203 stage_name: stage_name.to_string(),
204 step_name: step.name.clone(),
205 status: output.status,
206 output: combined_output,
207 required: step.required,
208 findings,
209 suggestions,
210 }
211 } else {
212 let output = semantic::run_semantic_step_simple(checks).await;
214
215 let combined_output = if output.stderr.is_empty() {
216 output.stdout
217 } else {
218 format!("{}{}", output.stdout, output.stderr)
219 };
220
221 StepResult {
222 stage_name: stage_name.to_string(),
223 step_name: step.name.clone(),
224 status: output.status,
225 output: combined_output,
226 required: step.required,
227 findings: Vec::new(),
228 suggestions: Vec::new(),
229 }
230 }
231 }
232 StepType::AgentReview { prompt } => {
233 let provider = agent_review::claude::ClaudeReviewProvider::from_env();
234 if let Some(provider) = provider {
235 let mut diff = String::new();
236 let mut files = Vec::new();
237 for path in changeset_files {
238 let full_path = work_dir.join(path);
239 if let Ok(content) = tokio::fs::read_to_string(&full_path).await {
240 diff.push_str(&format!("--- {path}\n+++ {path}\n{content}\n"));
241 files.push(agent_review::provider::FileContext {
242 path: path.clone(),
243 content,
244 });
245 }
246 }
247 let (output, findings, suggestions) =
248 agent_review::run_agent_review_step_with_provider(
249 &provider, &diff, files, prompt,
250 )
251 .await;
252 return StepResult {
253 stage_name: stage_name.to_string(),
254 step_name: step.name.clone(),
255 status: output.status,
256 output: if output.stderr.is_empty() {
257 output.stdout
258 } else {
259 format!("{}{}", output.stdout, output.stderr)
260 },
261 required: step.required,
262 findings,
263 suggestions,
264 };
265 }
266 let output = agent_review::run_agent_review_step(prompt).await;
268 StepResult {
269 stage_name: stage_name.to_string(),
270 step_name: step.name.clone(),
271 status: output.status,
272 output: if output.stderr.is_empty() {
273 output.stdout
274 } else {
275 format!("{}{}", output.stdout, output.stderr)
276 },
277 required: step.required,
278 findings: Vec::new(),
279 suggestions: Vec::new(),
280 }
281 }
282 StepType::HumanApprove => {
283 if let (Some(eng), Some(cid)) = (engine, changeset_id) {
284 let (output, findings) = human_approve::run_human_approve_step_with_engine(
285 eng, cid, Some(step.timeout),
286 ).await;
287 return StepResult {
288 stage_name: stage_name.to_string(),
289 step_name: step.name.clone(),
290 status: output.status,
291 output: if output.stderr.is_empty() { output.stdout } else { format!("{}{}", output.stdout, output.stderr) },
292 required: step.required,
293 findings,
294 suggestions: Vec::new(),
295 };
296 }
297 let output = human_approve::run_human_approve_step().await;
298 StepResult {
299 stage_name: stage_name.to_string(),
300 step_name: step.name.clone(),
301 status: output.status,
302 output: if output.stderr.is_empty() { output.stdout } else { format!("{}{}", output.stdout, output.stderr) },
303 required: step.required,
304 findings: Vec::new(),
305 suggestions: Vec::new(),
306 }
307 }
308 }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::process::ProcessExecutor;
    use crate::workflow::types::*;
    use std::time::Duration;

    /// Builds a required, non-changeset-aware command step with a 5 second timeout.
    fn command_step(name: &'static str, run: &'static str) -> Step {
        Step {
            name: name.into(),
            step_type: StepType::Command { run: run.into() },
            timeout: Duration::from_secs(5),
            required: true,
            changeset_aware: false,
        }
    }

    /// Builds a workflow named "test" with a single stage and an empty allow-list.
    fn single_stage_workflow(stage: &'static str, parallel: bool, steps: Vec<Step>) -> Workflow {
        Workflow {
            name: "test".into(),
            timeout: Duration::from_secs(30),
            stages: vec![Stage {
                name: stage.into(),
                parallel,
                steps,
            }],
            allowed_commands: vec![],
        }
    }

    /// Runs `wf` with a process executor in the system temp directory, with no
    /// changeset files, environment, engine or ids. Returns the verdict and the
    /// receiving end of the results channel (the sender is already dropped).
    async fn execute(wf: &Workflow) -> (bool, mpsc::Receiver<StepResult>) {
        let exec = ProcessExecutor::new();
        let (tx, rx) = mpsc::channel(32);
        let dir = std::env::temp_dir();

        let passed =
            run_workflow(wf, &exec, &dir, &[], &HashMap::new(), &tx, None, None, None).await;
        drop(tx);

        (passed, rx)
    }

    #[tokio::test]
    async fn test_run_workflow_passes() {
        let wf = single_stage_workflow(
            "checks",
            false,
            vec![command_step("echo-test", "echo hello")],
        );

        let (passed, mut rx) = execute(&wf).await;
        assert!(passed);

        let first = rx.recv().await.unwrap();
        assert_eq!(first.status, StepStatus::Pass);
    }

    #[tokio::test]
    async fn test_failing_required_step() {
        let wf = single_stage_workflow(
            "checks",
            false,
            vec![command_step("disallowed", "false_cmd_not_in_allowlist")],
        );

        // The receiver is kept alive for the run but its contents are not inspected.
        let (passed, _rx) = execute(&wf).await;
        assert!(!passed);
    }

    #[tokio::test]
    async fn test_parallel_stage() {
        let wf = single_stage_workflow(
            "parallel-checks",
            true,
            vec![
                command_step("echo-a", "echo a"),
                command_step("echo-b", "echo b"),
            ],
        );

        let (passed, mut rx) = execute(&wf).await;
        assert!(passed);

        // The sender is gone, so `recv` yields `None` once the channel is drained.
        let mut received = 0;
        while rx.recv().await.is_some() {
            received += 1;
        }
        assert_eq!(received, 2);
    }
}