// dk_runner/runner.rs
1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use tokio::sync::mpsc;
8use tracing::info;
9use uuid::Uuid;
10
11use dk_engine::repo::Engine;
12
13use crate::executor::Executor;
14use crate::scheduler::{self, StepResult};
15use crate::workflow::parser::parse_yaml_workflow_file;
16use crate::workflow::types::{Stage, Step, StepType, Workflow};
17use crate::workflow::validator::validate_workflow;
18
/// The top-level runner that loads workflows and executes them.
pub struct Runner {
    /// Shared engine handle: repo lookup, changeset store, pipeline store.
    engine: Arc<Engine>,
    /// Executes individual workflow steps; trait object so the execution
    /// backend (local process, container, …) is pluggable.
    executor: Box<dyn Executor>,
}
24
impl Runner {
    /// Construct a runner from a shared engine handle and a step executor.
    pub fn new(engine: Arc<Engine>, executor: Box<dyn Executor>) -> Self {
        Self { engine, executor }
    }

    /// Run a verification pipeline for a changeset.
    ///
    /// Copies the repo working tree into a temp sandbox, overlays the
    /// changeset files (after path-safety validation), resolves the
    /// workflow (YAML file → DB pipeline → auto-detect), and runs it under
    /// the workflow-level timeout.
    ///
    /// Returns `Ok(true)` when every required step passed — or when no
    /// pipeline is configured at all (auto-approve, logged as a warning) —
    /// and `Ok(false)` on failure or timeout. Per-step results stream
    /// through `tx`.
    pub async fn verify(
        &self,
        changeset_id: Uuid,
        repo_name: &str,
        tx: mpsc::Sender<StepResult>,
    ) -> Result<bool> {
        // Resolve repo id + on-disk working tree; the inner scope drops the
        // repo handle before the potentially long copy below.
        let (repo_id, repo_dir) = {
            let (repo_id, git_repo) = self.engine.get_repo(repo_name).await?;
            // GitRepository::path() already returns the working tree directory
            (repo_id, git_repo.path().to_path_buf())
        };

        // Create a temp directory with the full repo content, then overlay
        // changeset files so that cargo/build tools find Cargo.toml and
        // all workspace metadata alongside the modified source files.
        let changeset_data = self.engine.changeset_store().get_files(changeset_id).await?;
        let temp_dir = tempfile::tempdir().context("failed to create temp dir for verify")?;
        let work_dir = temp_dir.path().to_path_buf();

        // Copy repo working tree into temp dir so Cargo.toml, Cargo.lock,
        // and all other workspace files are present for build tools.
        copy_dir_recursive(&repo_dir, &work_dir).await
            .context("failed to copy repo into temp dir")?;

        // Overlay changeset files on top of the repo copy.
        let mut changeset_paths: Vec<String> = Vec::with_capacity(changeset_data.len());
        for file in &changeset_data {
            changeset_paths.push(file.file_path.clone());
            // NOTE(review): a file with `content == None` is recorded in
            // changeset_paths but neither written to nor removed from the
            // copy — presumably a deletion marker; confirm the scheduler
            // handles that case.
            if let Some(content) = &file.content {
                // Security: reject dangerous paths BEFORE any filesystem operations.
                // 1. Reject traversal components (../)
                if file.file_path.contains("..") {
                    anyhow::bail!(
                        "changeset file path contains traversal component: '{}'",
                        file.file_path
                    );
                }
                // 2. Reject absolute paths (would discard work_dir base in Path::join)
                if file.file_path.starts_with('/') || file.file_path.starts_with('\\') {
                    anyhow::bail!(
                        "changeset file path is absolute: '{}'",
                        file.file_path
                    );
                }
                let dest = work_dir.join(&file.file_path);
                // 3. Lexical prefix check: verify joined path stays under work_dir.
                //    This catches any remaining edge cases without touching the filesystem.
                if !dest.starts_with(&work_dir) {
                    anyhow::bail!(
                        "changeset file path escapes sandbox: '{}' resolves outside work_dir",
                        file.file_path
                    );
                }
                // Safe to create directories and write — path is validated
                if let Some(parent) = dest.parent() {
                    tokio::fs::create_dir_all(parent).await?;
                }
                tokio::fs::write(&dest, content).await?;
            }
        }

        info!(
            "copied repo and overlaid {} changeset files into {} for verification",
            changeset_paths.len(),
            work_dir.display()
        );

        // Intentionally load the pipeline from the canonical repo directory, not from
        // work_dir (the changeset overlay). This prevents a submitted changeset from
        // hijacking its own verification pipeline for security.
        let workflow = self.load_workflow(&repo_dir, repo_id).await?;

        // Auto-none: no pipeline configured, auto-approve with audit trail
        if workflow.stages.is_empty() {
            tracing::warn!(
                changeset_id = %changeset_id,
                repo = %repo_name,
                "auto-approving changeset: no verification pipeline and no recognized project type"
            );
            return Ok(true);
        }

        validate_workflow(&workflow).context("workflow validation failed")?;

        // Environment handed to every step so commands can identify the run.
        let mut env = HashMap::new();
        env.insert("DKOD_CHANGESET_ID".to_string(), changeset_id.to_string());
        env.insert("DKOD_REPO_ID".to_string(), repo_id.to_string());

        // Workflow-level timeout wraps the whole run; a timeout counts as a
        // failed verification rather than an error.
        let passed = tokio::time::timeout(
            workflow.timeout,
            scheduler::run_workflow(
                &workflow,
                self.executor.as_ref(),
                &work_dir,
                &changeset_paths,
                &env,
                &tx,
                Some(&self.engine),
                Some(repo_id),
                Some(changeset_id),
            ),
        )
        .await
        .unwrap_or_else(|_| {
            tracing::warn!("workflow '{}' timed out after {:?}", workflow.name, workflow.timeout);
            false
        });

        // temp_dir cleaned up on drop
        Ok(passed)
    }

    /// Resolve the workflow for a repo, in priority order:
    /// 1. `.dkod/pipeline.yaml` in the repo (must define at least one stage),
    /// 2. pipeline steps stored in the DB for `repo_id`,
    /// 3. auto-detection from project marker files.
    async fn load_workflow(&self, repo_dir: &Path, repo_id: Uuid) -> Result<Workflow> {
        // Priority 1: .dkod/pipeline.yaml in repo
        let yaml_path = repo_dir.join(".dkod/pipeline.yaml");
        if yaml_path.exists() {
            info!("loading workflow from {}", yaml_path.display());
            let workflow = parse_yaml_workflow_file(&yaml_path).await?;
            // An explicit-but-empty pipeline file is an error rather than a
            // silent auto-approve.
            if workflow.stages.is_empty() {
                anyhow::bail!(
                    "pipeline.yaml exists but defines no stages — refusing to auto-approve; \
                     add at least one stage or remove the file to use auto-detection"
                );
            }
            return Ok(workflow);
        }

        // Check for legacy .dekode/pipeline.toml and warn about migration
        let legacy_toml = repo_dir.join(".dekode/pipeline.toml");
        if legacy_toml.exists() {
            tracing::warn!(
                path = %legacy_toml.display(),
                "found legacy .dekode/pipeline.toml \u{2014} this format is no longer loaded; please migrate to .dkod/pipeline.yaml"
            );
        }

        // Priority 2: DB-stored pipeline
        // NOTE(review): DB errors are swallowed here (`unwrap_or_default`)
        // and fall through to auto-detection — confirm that is intended.
        let db_steps = self.engine
            .pipeline_store()
            .get_pipeline(repo_id)
            .await
            .unwrap_or_default();

        if !db_steps.is_empty() {
            info!(
                "loading workflow from DB pipeline ({} steps)",
                db_steps.len()
            );
            return Ok(db_pipeline_to_workflow(db_steps));
        }

        // Priority 3: Auto-detect from project files
        info!("auto-detecting verification workflow from project files");
        Ok(detect_workflow(repo_dir))
    }
}
187
188fn db_pipeline_to_workflow(steps: Vec<dk_engine::pipeline::PipelineStep>) -> Workflow {
189    let resolved_steps: Vec<Step> = steps
190        .into_iter()
191        .map(|s| {
192            let command = s
193                .config
194                .get("command")
195                .and_then(|v| v.as_str())
196                .unwrap_or("echo 'no command configured'")
197                .to_string();
198            let timeout_secs = s
199                .config
200                .get("timeout_secs")
201                .and_then(|v| v.as_u64())
202                .unwrap_or(120);
203
204            let step_type = match s.step_type.as_str() {
205                "agent-review" => StepType::AgentReview {
206                    prompt: "Review this changeset".to_string(),
207                },
208                "human-approve" => StepType::HumanApprove,
209                _ => StepType::Command { run: command },
210            };
211
212            Step {
213                name: s.step_type.clone(),
214                step_type,
215                timeout: Duration::from_secs(timeout_secs),
216                required: s.required,
217                changeset_aware: false,
218                work_dir: None,
219            }
220        })
221        .collect();
222
223    Workflow {
224        name: "db-pipeline".to_string(),
225        timeout: Duration::from_secs(600),
226        stages: vec![Stage {
227            name: "pipeline".to_string(),
228            parallel: false,
229            steps: resolved_steps,
230        }],
231        allowed_commands: vec![],
232    }
233}
234
235/// Auto-detect verification workflow from project files in the repo.
236/// Scans the repo root AND immediate subdirectories for ALL known language
237/// markers and creates steps for each.  When a marker is found in a
238/// subdirectory, `step.work_dir` is set so the scheduler runs the command
239/// in the correct directory.
240/// Returns a no-stage workflow (auto-approve) if no known project type found.
241pub fn detect_workflow(repo_dir: &Path) -> Workflow {
242    use std::path::PathBuf;
243
244    let mut steps: Vec<Step> = Vec::new();
245
246    // Directories to scan: repo root + immediate subdirectories.
247    // Each entry is (subdir_name, full_path).  Root uses "" as the name.
248    // Skip hidden dirs, node_modules, and target to avoid noise.
249    let skip = ["node_modules", "target"];
250    let mut scan_dirs: Vec<(String, std::path::PathBuf)> = vec![("".to_string(), repo_dir.to_path_buf())];
251    if let Ok(entries) = std::fs::read_dir(repo_dir) {
252        for entry in entries.flatten() {
253            let path = entry.path();
254            if !path.is_dir() {
255                continue;
256            }
257            let name = entry.file_name();
258            let name_str = name.to_string_lossy().to_string();
259            if name_str.starts_with('.') || skip.contains(&name_str.as_str()) {
260                continue;
261            }
262            scan_dirs.push((name_str, path));
263        }
264    }
265
266    // Sort subdirectories alphabetically for deterministic ordering.
267    // Keep root (index 0) first, sort only subdirs.
268    if scan_dirs.len() > 1 {
269        scan_dirs[1..].sort_by(|a, b| a.0.cmp(&b.0));
270    }
271
272    // First pass: collect languages present in the root directory so that
273    // subdirectory matches of the same language are suppressed (root wins).
274    // Sibling subdirectories are NOT deduplicated against each other — e.g.
275    // frontend/package.json and backend/package.json both get steps.
276    let root_dir = &scan_dirs[0].1;
277    let mut root_languages: std::collections::HashSet<&str> = std::collections::HashSet::new();
278    if root_dir.join("Cargo.toml").exists() {
279        root_languages.insert("rust");
280    }
281    if root_dir.join("package.json").exists() {
282        let is_bun = root_dir.join("bun.lock").exists() || root_dir.join("bun.lockb").exists();
283        root_languages.insert(if is_bun { "bun" } else { "node" });
284    }
285    if root_dir.join("pyproject.toml").exists() || root_dir.join("requirements.txt").exists() {
286        root_languages.insert("python");
287    }
288    if root_dir.join("go.mod").exists() {
289        root_languages.insert("go");
290    }
291
292    for (subdir_name, dir) in &scan_dirs {
293        // Determine work_dir for steps detected in this directory.
294        let is_root = subdir_name.is_empty();
295        let step_work_dir: Option<PathBuf> = if is_root {
296            None
297        } else {
298            Some(PathBuf::from(subdir_name))
299        };
300
301        // ── Rust ──
302        if dir.join("Cargo.toml").exists() && (is_root || !root_languages.contains("rust")) {
303            let name_prefix = if is_root {
304                "rust".to_string()
305            } else {
306                format!("rust({subdir_name})")
307            };
308            steps.push(Step {
309                name: format!("{name_prefix}:check"),
310                step_type: StepType::Command { run: "cargo check".to_string() },
311                timeout: Duration::from_secs(60),
312                required: true,
313                changeset_aware: true,
314                work_dir: step_work_dir.clone(),
315            });
316            steps.push(Step {
317                name: format!("{name_prefix}:test"),
318                step_type: StepType::Command { run: "cargo test".to_string() },
319                timeout: Duration::from_secs(60),
320                required: true,
321                changeset_aware: true,
322                work_dir: step_work_dir.clone(),
323            });
324        }
325
326        // ── Node / Bun ──
327        if dir.join("package.json").exists() {
328            let is_bun = dir.join("bun.lock").exists()
329                || dir.join("bun.lockb").exists();
330            let lang_key = if is_bun { "bun" } else { "node" };
331            if is_root || !root_languages.contains(lang_key) {
332                let (label, install_cmd, test_cmd) = if is_bun {
333                    ("bun", "bun install --frozen-lockfile", "bun test")
334                } else {
335                    ("node", "npm ci", "npm test")
336                };
337                let name_prefix = if is_root {
338                    label.to_string()
339                } else {
340                    format!("{label}({subdir_name})")
341                };
342                steps.push(Step {
343                    name: format!("{name_prefix}:install"),
344                    step_type: StepType::Command { run: install_cmd.to_string() },
345                    timeout: Duration::from_secs(120),
346                    required: true,
347                    changeset_aware: false,
348                    work_dir: step_work_dir.clone(),
349                });
350                steps.push(Step {
351                    name: format!("{name_prefix}:test"),
352                    step_type: StepType::Command { run: test_cmd.to_string() },
353                    timeout: Duration::from_secs(60),
354                    required: true,
355                    changeset_aware: true,
356                    work_dir: step_work_dir.clone(),
357                });
358            }
359        }
360
361        // ── Python ──
362        if (dir.join("pyproject.toml").exists()
363            || dir.join("requirements.txt").exists())
364            && (is_root || !root_languages.contains("python"))
365        {
366            let name_prefix = if is_root {
367                "python".to_string()
368            } else {
369                format!("python({subdir_name})")
370            };
371            if dir.join("pyproject.toml").exists() {
372                steps.push(Step {
373                    name: format!("{name_prefix}:install"),
374                    step_type: StepType::Command { run: "pip install -e .".to_string() },
375                    timeout: Duration::from_secs(120),
376                    required: true,
377                    changeset_aware: false,
378                    work_dir: step_work_dir.clone(),
379                });
380            }
381            if dir.join("requirements.txt").exists() {
382                steps.push(Step {
383                    name: format!("{name_prefix}:install-deps"),
384                    step_type: StepType::Command {
385                        run: "pip install -r requirements.txt".to_string(),
386                    },
387                    timeout: Duration::from_secs(120),
388                    required: true,
389                    changeset_aware: false,
390                    work_dir: step_work_dir.clone(),
391                });
392            }
393            steps.push(Step {
394                name: format!("{name_prefix}:test"),
395                step_type: StepType::Command { run: "pytest".to_string() },
396                timeout: Duration::from_secs(60),
397                required: true,
398                changeset_aware: true,
399                work_dir: step_work_dir.clone(),
400            });
401        }
402
403        // ── Go ──
404        if dir.join("go.mod").exists() && (is_root || !root_languages.contains("go")) {
405            let name_prefix = if is_root {
406                "go".to_string()
407            } else {
408                format!("go({subdir_name})")
409            };
410            steps.push(Step {
411                name: format!("{name_prefix}:build"),
412                step_type: StepType::Command { run: "go build ./...".to_string() },
413                timeout: Duration::from_secs(60),
414                required: true,
415                changeset_aware: true,
416                work_dir: step_work_dir.clone(),
417            });
418            steps.push(Step {
419                name: format!("{name_prefix}:vet"),
420                step_type: StepType::Command { run: "go vet ./...".to_string() },
421                timeout: Duration::from_secs(60),
422                required: true,
423                changeset_aware: true,
424                work_dir: step_work_dir.clone(),
425            });
426            steps.push(Step {
427                name: format!("{name_prefix}:test"),
428                step_type: StepType::Command { run: "go test ./...".to_string() },
429                timeout: Duration::from_secs(60),
430                required: true,
431                changeset_aware: true,
432                work_dir: step_work_dir.clone(),
433            });
434        }
435    }
436
437    if steps.is_empty() {
438        return Workflow {
439            name: "auto-none".to_string(),
440            timeout: Duration::from_secs(30),
441            allowed_commands: vec![],
442            stages: vec![],
443        };
444    }
445
446    let unique_langs = steps.iter().map(|s| s.name.split(':').next().unwrap_or("")).collect::<std::collections::HashSet<_>>();
447    let unique_work_dirs = steps.iter().map(|s| s.work_dir.as_deref()).collect::<std::collections::HashSet<_>>();
448    let name = if unique_langs.len() > 1 || unique_work_dirs.len() > 1 {
449        "auto-polyglot".to_string()
450    } else {
451        format!("auto-{}", steps[0].name.split(':').next().unwrap_or("unknown"))
452    };
453
454    // Derive timeout from the sum of individual step timeouts (with a floor of 60s).
455    let total_timeout_secs = steps.iter().map(|s| s.timeout.as_secs()).sum::<u64>().max(60);
456
457    Workflow {
458        name,
459        timeout: Duration::from_secs(total_timeout_secs),
460        allowed_commands: vec![],
461        stages: vec![Stage {
462            name: "checks".to_string(),
463            parallel: false,
464            steps,
465        }],
466    }
467}
468
469/// Recursively copy a directory tree, skipping the `.git` directory.
470async fn copy_dir_recursive(src: &Path, dst: &Path) -> Result<()> {
471    tokio::fs::create_dir_all(dst).await?;
472    let mut entries = tokio::fs::read_dir(src).await?;
473    while let Some(entry) = entries.next_entry().await? {
474        let file_name = entry.file_name();
475        // Skip .git to avoid copying potentially large git objects
476        if file_name == ".git" {
477            continue;
478        }
479        let src_path = entry.path();
480        let dst_path = dst.join(&file_name);
481        let file_type = entry.file_type().await?;
482        if file_type.is_dir() {
483            Box::pin(copy_dir_recursive(&src_path, &dst_path)).await?;
484        } else if file_type.is_symlink() {
485            let target = tokio::fs::read_link(&src_path).await?;
486            // Security: only recreate relative symlinks whose resolved target
487            // stays within the destination tree. This prevents sandbox escapes
488            // via crafted symlinks (e.g., link -> /etc/passwd, link -> ../../..).
489            let target_str = target.to_string_lossy();
490            if target_str.starts_with('/') || target_str.contains("..") {
491                tracing::warn!(
492                    src = %src_path.display(),
493                    target = %target.display(),
494                    "skipping symlink that points outside sandbox"
495                );
496                continue;
497            }
498            #[cfg(unix)]
499            tokio::fs::symlink(target, &dst_path).await?;
500        } else {
501            tokio::fs::copy(&src_path, &dst_path).await?;
502        }
503    }
504    Ok(())
505}
506
507#[cfg(test)]
508mod tests {
509    use super::*;
510
    // Root-level Cargo.toml → "auto-rust" with check + test steps.
    #[tokio::test]
    async fn test_detect_workflow_rust() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("Cargo.toml"), b"[package]\nname = \"test\"")
            .await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-rust");
        assert_eq!(wf.stages.len(), 1);
        assert_eq!(wf.stages[0].steps.len(), 2);
    }

    // package.json plus a textual bun.lock selects bun commands over npm.
    #[tokio::test]
    async fn test_detect_workflow_bun() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("package.json"), b"{}").await.unwrap();
        tokio::fs::write(dir.path().join("bun.lock"), b"").await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-bun");
        assert_eq!(wf.stages[0].steps.len(), 2);
        let cmds: Vec<_> = wf.stages[0].steps.iter().filter_map(|s| {
            if let StepType::Command { run } = &s.step_type { Some(run.as_str()) } else { None }
        }).collect();
        assert!(cmds.contains(&"bun install --frozen-lockfile"));
        assert!(cmds.contains(&"bun test"));
    }

    // The binary lockfile form (bun.lockb) is also recognized as bun.
    #[tokio::test]
    async fn test_detect_workflow_bun_lockb() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("package.json"), b"{}").await.unwrap();
        tokio::fs::write(dir.path().join("bun.lockb"), b"\x00").await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-bun");
        assert_eq!(wf.stages[0].steps.len(), 2);
        let cmds: Vec<_> = wf.stages[0].steps.iter().filter_map(|s| {
            if let StepType::Command { run } = &s.step_type { Some(run.as_str()) } else { None }
        }).collect();
        assert!(cmds.contains(&"bun install --frozen-lockfile"));
        assert!(cmds.contains(&"bun test"));
    }

    // package.json with no bun lockfile defaults to npm.
    #[tokio::test]
    async fn test_detect_workflow_npm() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("package.json"), b"{}").await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-node");
        assert_eq!(wf.stages[0].steps.len(), 2);
        let cmds: Vec<_> = wf.stages[0].steps.iter().filter_map(|s| {
            if let StepType::Command { run } = &s.step_type { Some(run.as_str()) } else { None }
        }).collect();
        assert!(cmds.contains(&"npm ci"));
        assert!(cmds.contains(&"npm test"));
    }

    #[tokio::test]
    async fn test_detect_workflow_python_pyproject() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("pyproject.toml"), b"[project]").await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-python");
        // pyproject.toml only — install via pip install -e . plus test
        assert_eq!(wf.stages[0].steps.len(), 2);
        let cmds: Vec<_> = wf.stages[0].steps.iter().filter_map(|s| {
            if let StepType::Command { run } = &s.step_type { Some(run.as_str()) } else { None }
        }).collect();
        assert!(cmds.contains(&"pip install -e ."));
        assert!(cmds.contains(&"pytest"));
    }

    // NOTE(review): identical setup and assertions to
    // test_detect_workflow_python_both_files below — likely a duplicated
    // test; consider removing one.
    #[tokio::test]
    async fn test_detect_workflow_python_dual_file() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("pyproject.toml"), b"[project]").await.unwrap();
        tokio::fs::write(dir.path().join("requirements.txt"), b"pytest\nrequests").await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-python");
        // Both files present — install pyproject + requirements + test
        assert_eq!(wf.stages[0].steps.len(), 3);
        let cmds: Vec<_> = wf.stages[0].steps.iter().filter_map(|s| {
            if let StepType::Command { run } = &s.step_type { Some(run.as_str()) } else { None }
        }).collect();
        assert!(cmds.contains(&"pip install -e ."));
        assert!(cmds.contains(&"pip install -r requirements.txt"));
        assert!(cmds.contains(&"pytest"));
    }

    #[tokio::test]
    async fn test_detect_workflow_python_requirements() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("requirements.txt"), b"pytest\nrequests").await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-python");
        // requirements.txt only — install-deps + test
        assert_eq!(wf.stages[0].steps.len(), 2);
        let cmds: Vec<_> = wf.stages[0].steps.iter().filter_map(|s| {
            if let StepType::Command { run } = &s.step_type { Some(run.as_str()) } else { None }
        }).collect();
        assert!(cmds.contains(&"pip install -r requirements.txt"));
        assert!(cmds.contains(&"pytest"));
    }

    // NOTE(review): duplicate of test_detect_workflow_python_dual_file above.
    #[tokio::test]
    async fn test_detect_workflow_python_both_files() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("pyproject.toml"), b"[project]").await.unwrap();
        tokio::fs::write(dir.path().join("requirements.txt"), b"pytest\nrequests").await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-python");
        // Both files — install-package + install-deps + test
        assert_eq!(wf.stages[0].steps.len(), 3);
        let cmds: Vec<_> = wf.stages[0].steps.iter().filter_map(|s| {
            if let StepType::Command { run } = &s.step_type { Some(run.as_str()) } else { None }
        }).collect();
        assert!(cmds.contains(&"pip install -e ."));
        assert!(cmds.contains(&"pip install -r requirements.txt"));
        assert!(cmds.contains(&"pytest"));
    }

    // go.mod → build + vet + test steps.
    #[tokio::test]
    async fn test_detect_workflow_go() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("go.mod"), b"module example.com/test").await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-go");
        assert_eq!(wf.stages[0].steps.len(), 3);
    }

    // No markers at all → stage-less "auto-none" workflow (auto-approve).
    #[tokio::test]
    async fn test_detect_workflow_unknown() {
        let dir = tempfile::tempdir().unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-none");
        assert!(wf.stages.is_empty());
    }
646
    // Regular files and nested dirs are copied; .git is excluded entirely.
    #[tokio::test]
    async fn test_copy_dir_recursive_copies_files() {
        let src = tempfile::tempdir().unwrap();
        let dst = tempfile::tempdir().unwrap();

        tokio::fs::write(src.path().join("Cargo.toml"), b"[package]\nname = \"test\"")
            .await
            .unwrap();
        tokio::fs::create_dir_all(src.path().join("src")).await.unwrap();
        tokio::fs::write(src.path().join("src/main.rs"), b"fn main() {}")
            .await
            .unwrap();

        // .git dir should be skipped
        tokio::fs::create_dir_all(src.path().join(".git/objects")).await.unwrap();
        tokio::fs::write(src.path().join(".git/HEAD"), b"ref: refs/heads/main")
            .await
            .unwrap();

        copy_dir_recursive(src.path(), dst.path()).await.unwrap();

        assert!(dst.path().join("Cargo.toml").exists(), "Cargo.toml must be at dst root");
        assert!(dst.path().join("src/main.rs").exists(), "src/main.rs must exist");
        assert!(!dst.path().join(".git").exists(), ".git must be skipped");
    }

    // A safe relative file symlink is recreated as a symlink, not a copy.
    #[tokio::test]
    async fn test_copy_dir_recursive_handles_symlinks() {
        let src = tempfile::tempdir().unwrap();
        let dst = tempfile::tempdir().unwrap();

        // Create a regular file and a symlink to it
        tokio::fs::write(src.path().join("real.txt"), b"hello").await.unwrap();
        #[cfg(unix)]
        tokio::fs::symlink("real.txt", src.path().join("link.txt")).await.unwrap();

        copy_dir_recursive(src.path(), dst.path()).await.unwrap();

        assert!(dst.path().join("real.txt").exists());
        #[cfg(unix)]
        {
            let meta = tokio::fs::symlink_metadata(dst.path().join("link.txt")).await.unwrap();
            assert!(meta.file_type().is_symlink(), "symlink should be preserved");
            let target = tokio::fs::read_link(dst.path().join("link.txt")).await.unwrap();
            assert_eq!(target.to_str().unwrap(), "real.txt");
        }
    }

    // A safe relative directory symlink is also recreated as a symlink
    // (not recursed into as a directory).
    #[tokio::test]
    async fn test_copy_dir_recursive_handles_dir_symlinks() {
        let src = tempfile::tempdir().unwrap();
        let dst = tempfile::tempdir().unwrap();

        // Create a real directory and a symlink to it
        tokio::fs::create_dir_all(src.path().join("real_dir")).await.unwrap();
        tokio::fs::write(src.path().join("real_dir/file.txt"), b"content").await.unwrap();
        #[cfg(unix)]
        tokio::fs::symlink("real_dir", src.path().join("linked_dir")).await.unwrap();

        copy_dir_recursive(src.path(), dst.path()).await.unwrap();

        assert!(dst.path().join("real_dir/file.txt").exists());
        #[cfg(unix)]
        {
            let meta = tokio::fs::symlink_metadata(dst.path().join("linked_dir")).await.unwrap();
            assert!(meta.file_type().is_symlink(), "dir symlink should be preserved");
            let target = tokio::fs::read_link(dst.path().join("linked_dir")).await.unwrap();
            assert_eq!(target.to_str().unwrap(), "real_dir");
        }
    }
717
    // Two DB steps convert into one sequential stage with two steps.
    #[test]
    fn test_db_pipeline_conversion() {
        let steps = vec![
            dk_engine::pipeline::PipelineStep {
                repo_id: Uuid::new_v4(),
                step_order: 1,
                step_type: "typecheck".to_string(),
                config: serde_json::json!({"command": "cargo check", "timeout_secs": 120}),
                required: true,
            },
            dk_engine::pipeline::PipelineStep {
                repo_id: Uuid::new_v4(),
                step_order: 2,
                step_type: "test".to_string(),
                config: serde_json::json!({"command": "cargo test", "timeout_secs": 300}),
                required: true,
            },
        ];
        let wf = db_pipeline_to_workflow(steps);
        assert_eq!(wf.stages.len(), 1);
        assert_eq!(wf.stages[0].steps.len(), 2);
    }
740
741    #[tokio::test]
742    async fn test_detect_workflow_polyglot_rust_and_node() {
743        let dir = tempfile::tempdir().unwrap();
744        tokio::fs::write(dir.path().join("Cargo.toml"), b"[package]\nname = \"test\"").await.unwrap();
745        tokio::fs::write(dir.path().join("package.json"), b"{}").await.unwrap();
746        let wf = detect_workflow(dir.path());
747        assert_eq!(wf.name, "auto-polyglot");
748        assert_eq!(wf.stages.len(), 1);
749        let step_names: Vec<&str> = wf.stages[0].steps.iter().map(|s| s.name.as_str()).collect();
750        assert!(step_names.iter().any(|n| n.starts_with("rust:")), "missing rust steps");
751        assert!(step_names.iter().any(|n| n.starts_with("node:")), "missing node steps");
752    }
753
754    #[tokio::test]
755    async fn test_detect_workflow_polyglot_three_languages() {
756        let dir = tempfile::tempdir().unwrap();
757        tokio::fs::write(dir.path().join("Cargo.toml"), b"[package]\nname = \"test\"").await.unwrap();
758        tokio::fs::write(dir.path().join("package.json"), b"{}").await.unwrap();
759        tokio::fs::write(dir.path().join("pyproject.toml"), b"[project]\nname = \"test\"").await.unwrap();
760        let wf = detect_workflow(dir.path());
761        assert_eq!(wf.name, "auto-polyglot");
762        let step_names: Vec<&str> = wf.stages[0].steps.iter().map(|s| s.name.as_str()).collect();
763        assert!(step_names.iter().any(|n| n.starts_with("rust:")), "missing rust");
764        assert!(step_names.iter().any(|n| n.starts_with("node:")), "missing node");
765        assert!(step_names.iter().any(|n| n.starts_with("python:")), "missing python");
766    }
767
768    #[tokio::test]
769    async fn test_detect_workflow_polyglot_bun_and_go() {
770        let dir = tempfile::tempdir().unwrap();
771        tokio::fs::write(dir.path().join("package.json"), b"{}").await.unwrap();
772        tokio::fs::write(dir.path().join("bun.lock"), b"").await.unwrap();
773        tokio::fs::write(dir.path().join("go.mod"), b"module example.com/test").await.unwrap();
774        let wf = detect_workflow(dir.path());
775        assert_eq!(wf.name, "auto-polyglot");
776        let step_names: Vec<&str> = wf.stages[0].steps.iter().map(|s| s.name.as_str()).collect();
777        assert!(step_names.iter().any(|n| n.starts_with("bun:")), "missing bun steps");
778        assert!(step_names.iter().any(|n| n.starts_with("go:")), "missing go steps");
779    }
780
781    #[tokio::test]
782    async fn test_detect_workflow_subdirectory() {
783        let dir = tempfile::tempdir().unwrap();
784        // Create markers in subdirectories, not root
785        std::fs::create_dir_all(dir.path().join("rust")).unwrap();
786        std::fs::write(dir.path().join("rust/Cargo.toml"), "[package]\nname = \"test\"").unwrap();
787        std::fs::create_dir_all(dir.path().join("python")).unwrap();
788        std::fs::write(dir.path().join("python/requirements.txt"), "flask\n").unwrap();
789
790        let wf = detect_workflow(dir.path());
791        assert_eq!(wf.name, "auto-polyglot");
792
793        let steps: Vec<&Step> = wf.stages.iter()
794            .flat_map(|s| s.steps.iter())
795            .collect();
796
797        // Commands should be plain (no cd prefix) with work_dir set
798        // Subdirectory steps include the subdir in the name: rust(rust):check
799        let rust_steps: Vec<&&Step> = steps.iter()
800            .filter(|s| s.name.starts_with("rust("))
801            .collect();
802        assert!(!rust_steps.is_empty(), "should detect rust steps");
803        for step in &rust_steps {
804            assert_eq!(step.work_dir.as_ref().map(|p| p.to_str().unwrap()), Some("rust"),
805                "rust steps should have work_dir = 'rust'");
806            if let StepType::Command { run } = &step.step_type {
807                assert!(!run.contains("cd "), "commands should not contain cd prefix");
808            }
809        }
810
811        let python_steps: Vec<&&Step> = steps.iter()
812            .filter(|s| s.name.starts_with("python("))
813            .collect();
814        assert!(!python_steps.is_empty(), "should detect python steps");
815        for step in &python_steps {
816            assert_eq!(step.work_dir.as_ref().map(|p| p.to_str().unwrap()), Some("python"),
817                "python steps should have work_dir = 'python'");
818            if let StepType::Command { run } = &step.step_type {
819                assert!(!run.contains("cd "), "commands should not contain cd prefix");
820            }
821        }
822
823        // Verify deterministic ordering: python before rust (alphabetical)
824        let first_lang = steps[0].name.split(':').next().unwrap();
825        assert_eq!(first_lang, "python(python)", "python should come before rust (alphabetical subdirectory order)");
826    }
827
828    #[tokio::test]
829    async fn test_detect_workflow_same_language_sibling_subdirs() {
830        let dir = tempfile::tempdir().unwrap();
831        // Two sibling subdirectories with the same language (Node)
832        std::fs::create_dir_all(dir.path().join("backend")).unwrap();
833        std::fs::write(dir.path().join("backend/package.json"), "{}").unwrap();
834        std::fs::create_dir_all(dir.path().join("frontend")).unwrap();
835        std::fs::write(dir.path().join("frontend/package.json"), "{}").unwrap();
836
837        let wf = detect_workflow(dir.path());
838        assert_eq!(wf.name, "auto-polyglot");
839
840        // Both subdirectories should have steps, not just the first alphabetically
841        let work_dirs: Vec<Option<&Path>> = wf.stages.iter()
842            .flat_map(|s| s.steps.iter())
843            .map(|step| step.work_dir.as_deref())
844            .collect();
845
846        assert!(work_dirs.iter().any(|wd| wd == &Some(Path::new("backend"))),
847            "backend should have steps");
848        assert!(work_dirs.iter().any(|wd| wd == &Some(Path::new("frontend"))),
849            "frontend should have steps");
850
851        // Step names must include the subdirectory and be unique
852        let step_names: Vec<&str> = wf.stages.iter()
853            .flat_map(|s| s.steps.iter())
854            .map(|step| step.name.as_str())
855            .collect();
856
857        assert!(step_names.contains(&"node(backend):install"), "missing node(backend):install");
858        assert!(step_names.contains(&"node(backend):test"), "missing node(backend):test");
859        assert!(step_names.contains(&"node(frontend):install"), "missing node(frontend):install");
860        assert!(step_names.contains(&"node(frontend):test"), "missing node(frontend):test");
861
862        // All step names must be unique (no duplicates)
863        let unique_names: std::collections::HashSet<&str> = step_names.iter().copied().collect();
864        assert_eq!(step_names.len(), unique_names.len(),
865            "step names must be unique, got: {:?}", step_names);
866    }
867}