Skip to main content

dk_runner/
scheduler.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use tokio::sync::mpsc;
6use tracing::info;
7use uuid::Uuid;
8
9use dk_engine::repo::Engine;
10
11use crate::changeset::scope_command_to_changeset;
12use crate::executor::{Executor, StepOutput, StepStatus};
13use crate::findings::{Finding, Suggestion};
14use crate::steps::{agent_review, command, human_approve, semantic};
15use crate::workflow::types::{Stage, Step, StepType, Workflow};
16
17/// Result of running a single step, with metadata for streaming.
18#[derive(Debug, Clone)]
19pub struct StepResult {
20    pub stage_name: String,
21    pub step_name: String,
22    pub status: StepStatus,
23    pub output: String,
24    pub required: bool,
25    pub findings: Vec<Finding>,
26    pub suggestions: Vec<Suggestion>,
27}
28
29/// Run an entire workflow: stages sequentially, steps within parallel stages concurrently.
30/// Sends `StepResult`s to `tx` as each step completes. Returns `true` if all required steps passed.
31///
32/// `engine` and `repo_id` are optional — when provided, the semantic step uses the full
33/// Engine-backed analysis. Pass `None` for both in tests or contexts without an Engine.
34pub async fn run_workflow(
35    workflow: &Workflow,
36    executor: &dyn Executor,
37    work_dir: &Path,
38    changeset_files: &[String],
39    env: &HashMap<String, String>,
40    tx: &mpsc::Sender<StepResult>,
41    engine: Option<&Arc<Engine>>,
42    repo_id: Option<Uuid>,
43    changeset_id: Option<Uuid>,
44) -> bool {
45    let mut all_passed = true;
46
47    for stage in &workflow.stages {
48        info!(stage = %stage.name, parallel = stage.parallel, "running stage");
49
50        let results = if stage.parallel {
51            run_stage_parallel(stage, executor, work_dir, changeset_files, env, engine, repo_id, changeset_id)
52                .await
53        } else {
54            run_stage_sequential(stage, executor, work_dir, changeset_files, env, engine, repo_id, changeset_id)
55                .await
56        };
57
58        for result in results {
59            if result.status != StepStatus::Pass && result.required {
60                all_passed = false;
61            }
62            let _ = tx.send(result).await;
63        }
64    }
65
66    all_passed
67}
68
69async fn run_stage_parallel(
70    stage: &Stage,
71    executor: &dyn Executor,
72    work_dir: &Path,
73    changeset_files: &[String],
74    env: &HashMap<String, String>,
75    engine: Option<&Arc<Engine>>,
76    repo_id: Option<Uuid>,
77    changeset_id: Option<Uuid>,
78) -> Vec<StepResult> {
79    let mut futures = Vec::new();
80    for step in &stage.steps {
81        futures.push(run_single_step(
82            &stage.name,
83            step,
84            executor,
85            work_dir,
86            changeset_files,
87            env,
88            engine,
89            repo_id,
90            changeset_id,
91        ));
92    }
93    futures::future::join_all(futures).await
94}
95
96async fn run_stage_sequential(
97    stage: &Stage,
98    executor: &dyn Executor,
99    work_dir: &Path,
100    changeset_files: &[String],
101    env: &HashMap<String, String>,
102    engine: Option<&Arc<Engine>>,
103    repo_id: Option<Uuid>,
104    changeset_id: Option<Uuid>,
105) -> Vec<StepResult> {
106    let mut results = Vec::new();
107    for step in &stage.steps {
108        let result = run_single_step(
109            &stage.name,
110            step,
111            executor,
112            work_dir,
113            changeset_files,
114            env,
115            engine,
116            repo_id,
117            changeset_id,
118        )
119        .await;
120        let failed_required = step.required && result.status != StepStatus::Pass;
121        results.push(result);
122        // Abort early if a required step failed — no point running subsequent
123        // steps (e.g., cargo test after cargo check fails with compile errors)
124        if failed_required {
125            tracing::warn!(
126                stage = %stage.name,
127                step = %step.name,
128                "required step failed — aborting remaining steps in sequential stage"
129            );
130            break;
131        }
132    }
133    results
134}
135
136async fn run_single_step(
137    stage_name: &str,
138    step: &Step,
139    executor: &dyn Executor,
140    work_dir: &Path,
141    changeset_files: &[String],
142    env: &HashMap<String, String>,
143    engine: Option<&Arc<Engine>>,
144    repo_id: Option<Uuid>,
145    changeset_id: Option<Uuid>,
146) -> StepResult {
147    info!(step = %step.name, "running step");
148
149    match &step.step_type {
150        StepType::Command { run } => {
151            let cmd = if step.changeset_aware {
152                scope_command_to_changeset(run, changeset_files)
153                    .unwrap_or_else(|| run.clone())
154            } else {
155                run.clone()
156            };
157            let output =
158                match command::run_command_step(executor, &cmd, work_dir, step.timeout, env).await {
159                    Ok(out) => out,
160                    Err(e) => StepOutput {
161                        status: StepStatus::Fail,
162                        stdout: String::new(),
163                        stderr: e.to_string(),
164                        duration: std::time::Duration::ZERO,
165                    },
166                };
167
168            let combined_output = if output.stderr.is_empty() {
169                output.stdout
170            } else {
171                format!("{}{}", output.stdout, output.stderr)
172            };
173
174            StepResult {
175                stage_name: stage_name.to_string(),
176                step_name: step.name.clone(),
177                status: output.status,
178                output: combined_output,
179                required: step.required,
180                findings: Vec::new(),
181                suggestions: Vec::new(),
182            }
183        }
184        StepType::Semantic { checks } => {
185            if let (Some(eng), Some(rid)) = (engine, repo_id) {
186                // Full Engine-backed semantic analysis
187                let (output, findings, suggestions) = semantic::run_semantic_step(
188                    eng,
189                    rid,
190                    changeset_files,
191                    work_dir,
192                    checks,
193                )
194                .await;
195
196                let combined_output = if output.stderr.is_empty() {
197                    output.stdout
198                } else {
199                    format!("{}{}", output.stdout, output.stderr)
200                };
201
202                StepResult {
203                    stage_name: stage_name.to_string(),
204                    step_name: step.name.clone(),
205                    status: output.status,
206                    output: combined_output,
207                    required: step.required,
208                    findings,
209                    suggestions,
210                }
211            } else {
212                // Fallback to simple shim (no Engine available)
213                let output = semantic::run_semantic_step_simple(checks).await;
214
215                let combined_output = if output.stderr.is_empty() {
216                    output.stdout
217                } else {
218                    format!("{}{}", output.stdout, output.stderr)
219                };
220
221                StepResult {
222                    stage_name: stage_name.to_string(),
223                    step_name: step.name.clone(),
224                    status: output.status,
225                    output: combined_output,
226                    required: step.required,
227                    findings: Vec::new(),
228                    suggestions: Vec::new(),
229                }
230            }
231        }
232        StepType::AgentReview { prompt } => {
233            let provider = agent_review::claude::ClaudeReviewProvider::from_env();
234            if let Some(provider) = provider {
235                let mut diff = String::new();
236                let mut files = Vec::new();
237                for path in changeset_files {
238                    let full_path = work_dir.join(path);
239                    if let Ok(content) = tokio::fs::read_to_string(&full_path).await {
240                        diff.push_str(&format!("--- {path}\n+++ {path}\n{content}\n"));
241                        files.push(agent_review::provider::FileContext {
242                            path: path.clone(),
243                            content,
244                        });
245                    }
246                }
247                let (output, findings, suggestions) =
248                    agent_review::run_agent_review_step_with_provider(
249                        &provider, &diff, files, prompt,
250                    )
251                    .await;
252                return StepResult {
253                    stage_name: stage_name.to_string(),
254                    step_name: step.name.clone(),
255                    status: output.status,
256                    output: if output.stderr.is_empty() {
257                        output.stdout
258                    } else {
259                        format!("{}{}", output.stdout, output.stderr)
260                    },
261                    required: step.required,
262                    findings,
263                    suggestions,
264                };
265            }
266            // No provider: use legacy stub
267            let output = agent_review::run_agent_review_step(prompt).await;
268            StepResult {
269                stage_name: stage_name.to_string(),
270                step_name: step.name.clone(),
271                status: output.status,
272                output: if output.stderr.is_empty() {
273                    output.stdout
274                } else {
275                    format!("{}{}", output.stdout, output.stderr)
276                },
277                required: step.required,
278                findings: Vec::new(),
279                suggestions: Vec::new(),
280            }
281        }
282        StepType::HumanApprove => {
283            if let (Some(eng), Some(cid)) = (engine, changeset_id) {
284                let (output, findings) = human_approve::run_human_approve_step_with_engine(
285                    eng, cid, Some(step.timeout),
286                ).await;
287                return StepResult {
288                    stage_name: stage_name.to_string(),
289                    step_name: step.name.clone(),
290                    status: output.status,
291                    output: if output.stderr.is_empty() { output.stdout } else { format!("{}{}", output.stdout, output.stderr) },
292                    required: step.required,
293                    findings,
294                    suggestions: Vec::new(),
295                };
296            }
297            let output = human_approve::run_human_approve_step().await;
298            StepResult {
299                stage_name: stage_name.to_string(),
300                step_name: step.name.clone(),
301                status: output.status,
302                output: if output.stderr.is_empty() { output.stdout } else { format!("{}{}", output.stdout, output.stderr) },
303                required: step.required,
304                findings: Vec::new(),
305                suggestions: Vec::new(),
306            }
307        }
308    }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::process::ProcessExecutor;
    use crate::workflow::types::*;
    use std::time::Duration;

    /// Required, non-changeset-aware command step with a 5 second timeout.
    fn command_step(name: &str, run: &str) -> Step {
        Step {
            name: name.into(),
            step_type: StepType::Command { run: run.into() },
            timeout: Duration::from_secs(5),
            required: true,
            changeset_aware: false,
        }
    }

    /// Workflow named "test" consisting of exactly one stage.
    fn single_stage_workflow(stage_name: &str, parallel: bool, steps: Vec<Step>) -> Workflow {
        Workflow {
            name: "test".into(),
            timeout: Duration::from_secs(30),
            stages: vec![Stage {
                name: stage_name.into(),
                parallel,
                steps,
            }],
            allowed_commands: vec![],
        }
    }

    /// Run `wf` with a process executor in the temp dir and no Engine; returns
    /// the pass flag and the receiving half of the (already closed) result channel.
    async fn execute(wf: &Workflow) -> (bool, mpsc::Receiver<StepResult>) {
        let exec = ProcessExecutor::new();
        let (tx, rx) = mpsc::channel(32);
        let dir = std::env::temp_dir();

        let passed =
            run_workflow(wf, &exec, &dir, &[], &HashMap::new(), &tx, None, None, None).await;
        drop(tx);
        (passed, rx)
    }

    #[tokio::test]
    async fn test_run_workflow_passes() {
        let wf = single_stage_workflow(
            "checks",
            false,
            vec![command_step("echo-test", "echo hello")],
        );

        let (passed, mut rx) = execute(&wf).await;
        assert!(passed);
        let result = rx.recv().await.unwrap();
        assert_eq!(result.status, StepStatus::Pass);
    }

    #[tokio::test]
    async fn test_failing_required_step() {
        let wf = single_stage_workflow(
            "checks",
            false,
            vec![command_step("disallowed", "false_cmd_not_in_allowlist")],
        );

        let (passed, _rx) = execute(&wf).await;
        assert!(!passed);
    }

    #[tokio::test]
    async fn test_parallel_stage() {
        let wf = single_stage_workflow(
            "parallel-checks",
            true,
            vec![
                command_step("echo-a", "echo a"),
                command_step("echo-b", "echo b"),
            ],
        );

        let (passed, mut rx) = execute(&wf).await;
        assert!(passed);

        let mut results = Vec::new();
        while let Some(r) = rx.recv().await {
            results.push(r);
        }
        assert_eq!(results.len(), 2);
    }
}