Skip to main content

dk_runner/
runner.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use tokio::sync::mpsc;
8use tracing::info;
9use uuid::Uuid;
10
11use dk_engine::repo::Engine;
12
13use crate::executor::Executor;
14use crate::scheduler::{self, StepResult};
15use crate::workflow::parser::parse_yaml_workflow_file;
16use crate::workflow::types::{Stage, Step, StepType, Workflow};
17use crate::workflow::validator::validate_workflow;
18
19/// The top-level runner that loads workflows and executes them.
20pub struct Runner {
21    engine: Arc<Engine>,
22    executor: Box<dyn Executor>,
23}
24
25impl Runner {
26    pub fn new(engine: Arc<Engine>, executor: Box<dyn Executor>) -> Self {
27        Self { engine, executor }
28    }
29
30    /// Run a verification pipeline for a changeset.
31    pub async fn verify(
32        &self,
33        changeset_id: Uuid,
34        repo_name: &str,
35        tx: mpsc::Sender<StepResult>,
36    ) -> Result<bool> {
37        let (repo_id, repo_dir) = {
38            let (repo_id, git_repo) = self.engine.get_repo(repo_name).await?;
39            // GitRepository::path() already returns the working tree directory
40            (repo_id, git_repo.path().to_path_buf())
41        };
42
43        // Create a temp directory with the full repo content, then overlay
44        // changeset files so that cargo/build tools find Cargo.toml and
45        // all workspace metadata alongside the modified source files.
46        let changeset_data = self.engine.changeset_store().get_files(changeset_id).await?;
47        let temp_dir = tempfile::tempdir().context("failed to create temp dir for verify")?;
48        let work_dir = temp_dir.path().to_path_buf();
49
50        // Copy repo working tree into temp dir so Cargo.toml, Cargo.lock,
51        // and all other workspace files are present for build tools.
52        copy_dir_recursive(&repo_dir, &work_dir).await
53            .context("failed to copy repo into temp dir")?;
54
55        // Overlay changeset files on top of the repo copy.
56        let mut changeset_paths: Vec<String> = Vec::with_capacity(changeset_data.len());
57        for file in &changeset_data {
58            changeset_paths.push(file.file_path.clone());
59            if let Some(content) = &file.content {
60                // Security: reject dangerous paths BEFORE any filesystem operations.
61                // 1. Reject traversal components (../)
62                if file.file_path.contains("..") {
63                    anyhow::bail!(
64                        "changeset file path contains traversal component: '{}'",
65                        file.file_path
66                    );
67                }
68                // 2. Reject absolute paths (would discard work_dir base in Path::join)
69                if file.file_path.starts_with('/') || file.file_path.starts_with('\\') {
70                    anyhow::bail!(
71                        "changeset file path is absolute: '{}'",
72                        file.file_path
73                    );
74                }
75                let dest = work_dir.join(&file.file_path);
76                // 3. Lexical prefix check: verify joined path stays under work_dir.
77                //    This catches any remaining edge cases without touching the filesystem.
78                if !dest.starts_with(&work_dir) {
79                    anyhow::bail!(
80                        "changeset file path escapes sandbox: '{}' resolves outside work_dir",
81                        file.file_path
82                    );
83                }
84                // Safe to create directories and write — path is validated
85                if let Some(parent) = dest.parent() {
86                    tokio::fs::create_dir_all(parent).await?;
87                }
88                tokio::fs::write(&dest, content).await?;
89            }
90        }
91
92        info!(
93            "copied repo and overlaid {} changeset files into {} for verification",
94            changeset_paths.len(),
95            work_dir.display()
96        );
97
98        // Intentionally load the pipeline from the canonical repo directory, not from
99        // work_dir (the changeset overlay). This prevents a submitted changeset from
100        // hijacking its own verification pipeline for security.
101        let workflow = self.load_workflow(&repo_dir, repo_id).await?;
102
103        // Auto-none: no pipeline configured, auto-approve with audit trail
104        if workflow.stages.is_empty() {
105            tracing::warn!(
106                changeset_id = %changeset_id,
107                repo = %repo_name,
108                "auto-approving changeset: no verification pipeline and no recognized project type"
109            );
110            return Ok(true);
111        }
112
113        validate_workflow(&workflow).context("workflow validation failed")?;
114
115        let mut env = HashMap::new();
116        env.insert("DKOD_CHANGESET_ID".to_string(), changeset_id.to_string());
117        env.insert("DKOD_REPO_ID".to_string(), repo_id.to_string());
118
119        let passed = tokio::time::timeout(
120            workflow.timeout,
121            scheduler::run_workflow(
122                &workflow,
123                self.executor.as_ref(),
124                &work_dir,
125                &changeset_paths,
126                &env,
127                &tx,
128                Some(&self.engine),
129                Some(repo_id),
130                Some(changeset_id),
131            ),
132        )
133        .await
134        .unwrap_or_else(|_| {
135            tracing::warn!("workflow '{}' timed out after {:?}", workflow.name, workflow.timeout);
136            false
137        });
138
139        // temp_dir cleaned up on drop
140        Ok(passed)
141    }
142
143    async fn load_workflow(&self, repo_dir: &Path, repo_id: Uuid) -> Result<Workflow> {
144        // Priority 1: .dkod/pipeline.yaml in repo
145        let yaml_path = repo_dir.join(".dkod/pipeline.yaml");
146        if yaml_path.exists() {
147            info!("loading workflow from {}", yaml_path.display());
148            let workflow = parse_yaml_workflow_file(&yaml_path).await?;
149            if workflow.stages.is_empty() {
150                anyhow::bail!(
151                    "pipeline.yaml exists but defines no stages — refusing to auto-approve; \
152                     add at least one stage or remove the file to use auto-detection"
153                );
154            }
155            return Ok(workflow);
156        }
157
158        // Check for legacy .dekode/pipeline.toml and warn about migration
159        let legacy_toml = repo_dir.join(".dekode/pipeline.toml");
160        if legacy_toml.exists() {
161            tracing::warn!(
162                path = %legacy_toml.display(),
163                "found legacy .dekode/pipeline.toml \u{2014} this format is no longer loaded; please migrate to .dkod/pipeline.yaml"
164            );
165        }
166
167        // Priority 2: DB-stored pipeline
168        let db_steps = self.engine
169            .pipeline_store()
170            .get_pipeline(repo_id)
171            .await
172            .unwrap_or_default();
173
174        if !db_steps.is_empty() {
175            info!(
176                "loading workflow from DB pipeline ({} steps)",
177                db_steps.len()
178            );
179            return Ok(db_pipeline_to_workflow(db_steps));
180        }
181
182        // Priority 3: Auto-detect from project files
183        info!("auto-detecting verification workflow from project files");
184        Ok(detect_workflow(repo_dir))
185    }
186}
187
188fn db_pipeline_to_workflow(steps: Vec<dk_engine::pipeline::PipelineStep>) -> Workflow {
189    let resolved_steps: Vec<Step> = steps
190        .into_iter()
191        .map(|s| {
192            let command = s
193                .config
194                .get("command")
195                .and_then(|v| v.as_str())
196                .unwrap_or("echo 'no command configured'")
197                .to_string();
198            let timeout_secs = s
199                .config
200                .get("timeout_secs")
201                .and_then(|v| v.as_u64())
202                .unwrap_or(120);
203
204            let step_type = match s.step_type.as_str() {
205                "agent-review" => StepType::AgentReview {
206                    prompt: "Review this changeset".to_string(),
207                },
208                "human-approve" => StepType::HumanApprove,
209                _ => StepType::Command { run: command },
210            };
211
212            Step {
213                name: s.step_type.clone(),
214                step_type,
215                timeout: Duration::from_secs(timeout_secs),
216                required: s.required,
217                changeset_aware: false,
218            }
219        })
220        .collect();
221
222    Workflow {
223        name: "db-pipeline".to_string(),
224        timeout: Duration::from_secs(600),
225        stages: vec![Stage {
226            name: "pipeline".to_string(),
227            parallel: false,
228            steps: resolved_steps,
229        }],
230        allowed_commands: vec![],
231    }
232}
233
234/// Auto-detect verification workflow from project files in the repo.
235/// Returns a no-stage workflow (auto-approve) if no known project type found.
236fn detect_workflow(repo_dir: &Path) -> Workflow {
237    if repo_dir.join("Cargo.toml").exists() {
238        return Workflow {
239            name: "auto-rust".to_string(),
240            timeout: Duration::from_secs(180),
241            allowed_commands: vec![],
242            stages: vec![Stage {
243                name: "checks".to_string(),
244                parallel: false,
245                steps: vec![
246                    Step {
247                        name: "typecheck".to_string(),
248                        step_type: StepType::Command {
249                            run: "cargo check".to_string(),
250                        },
251                        timeout: Duration::from_secs(60),
252                        required: true,
253                        changeset_aware: true,
254                    },
255                    Step {
256                        name: "test".to_string(),
257                        step_type: StepType::Command {
258                            run: "cargo test".to_string(),
259                        },
260                        timeout: Duration::from_secs(60),
261                        required: true,
262                        changeset_aware: true,
263                    },
264                ],
265            }],
266        };
267    }
268
269    if repo_dir.join("package.json").exists() {
270        let is_bun = repo_dir.join("bun.lock").exists() || repo_dir.join("bun.lockb").exists();
271        let (name, install_cmd, test_cmd) = if is_bun {
272            ("auto-bun", "bun install --frozen-lockfile", "bun test")
273        } else {
274            ("auto-node", "npm ci", "npm test")
275        };
276        return Workflow {
277            name: name.to_string(),
278            timeout: Duration::from_secs(300),
279            allowed_commands: vec![],
280            stages: vec![Stage {
281                name: "checks".to_string(),
282                parallel: false,
283                steps: vec![
284                    Step {
285                        name: "install".to_string(),
286                        step_type: StepType::Command {
287                            run: install_cmd.to_string(),
288                        },
289                        timeout: Duration::from_secs(120),
290                        required: true,
291                        changeset_aware: false,
292                    },
293                    Step {
294                        name: "test".to_string(),
295                        step_type: StepType::Command {
296                            run: test_cmd.to_string(),
297                        },
298                        timeout: Duration::from_secs(60),
299                        required: true,
300                        changeset_aware: true,
301                    },
302                ],
303            }],
304        };
305    }
306
307    if repo_dir.join("pyproject.toml").exists() || repo_dir.join("requirements.txt").exists() {
308        let has_requirements = repo_dir.join("requirements.txt").exists();
309        let has_pyproject = repo_dir.join("pyproject.toml").exists();
310        let mut steps = Vec::new();
311        if has_pyproject {
312            steps.push(Step {
313                name: "install".to_string(),
314                step_type: StepType::Command {
315                    run: "pip install -e .".to_string(),
316                },
317                timeout: Duration::from_secs(120),
318                required: true,
319                changeset_aware: false,
320            });
321        }
322        // Also install requirements.txt when both files exist (common dual-file layout)
323        if has_requirements {
324            steps.push(Step {
325                name: "install-deps".to_string(),
326                step_type: StepType::Command {
327                    run: "pip install -r requirements.txt".to_string(),
328                },
329                timeout: Duration::from_secs(120),
330                required: true,
331                changeset_aware: false,
332            });
333        }
334        steps.push(Step {
335            name: "test".to_string(),
336            step_type: StepType::Command {
337                run: "pytest".to_string(),
338            },
339            timeout: Duration::from_secs(60),
340            required: true,
341            changeset_aware: true,
342        });
343        return Workflow {
344            name: "auto-python".to_string(),
345            timeout: Duration::from_secs(420),
346            allowed_commands: vec![],
347            stages: vec![Stage {
348                name: "checks".to_string(),
349                parallel: false,
350                steps,
351            }],
352        };
353    }
354
355    if repo_dir.join("go.mod").exists() {
356        return Workflow {
357            name: "auto-go".to_string(),
358            timeout: Duration::from_secs(300),
359            allowed_commands: vec![],
360            stages: vec![Stage {
361                name: "checks".to_string(),
362                parallel: false,
363                steps: vec![
364                    Step {
365                        name: "build".to_string(),
366                        step_type: StepType::Command {
367                            run: "go build ./...".to_string(),
368                        },
369                        timeout: Duration::from_secs(60),
370                        required: true,
371                        changeset_aware: true,
372                    },
373                    Step {
374                        name: "vet".to_string(),
375                        step_type: StepType::Command {
376                            run: "go vet ./...".to_string(),
377                        },
378                        timeout: Duration::from_secs(60),
379                        required: true,
380                        changeset_aware: true,
381                    },
382                    Step {
383                        name: "test".to_string(),
384                        step_type: StepType::Command {
385                            run: "go test ./...".to_string(),
386                        },
387                        timeout: Duration::from_secs(60),
388                        required: true,
389                        changeset_aware: true,
390                    },
391                ],
392            }],
393        };
394    }
395
396    // No recognized project type — auto-approve (structured warning emitted by caller)
397    Workflow {
398        name: "auto-none".to_string(),
399        timeout: Duration::from_secs(30),
400        allowed_commands: vec![],
401        stages: vec![],
402    }
403}
404
405/// Recursively copy a directory tree, skipping the `.git` directory.
406async fn copy_dir_recursive(src: &Path, dst: &Path) -> Result<()> {
407    tokio::fs::create_dir_all(dst).await?;
408    let mut entries = tokio::fs::read_dir(src).await?;
409    while let Some(entry) = entries.next_entry().await? {
410        let file_name = entry.file_name();
411        // Skip .git to avoid copying potentially large git objects
412        if file_name == ".git" {
413            continue;
414        }
415        let src_path = entry.path();
416        let dst_path = dst.join(&file_name);
417        let file_type = entry.file_type().await?;
418        if file_type.is_dir() {
419            Box::pin(copy_dir_recursive(&src_path, &dst_path)).await?;
420        } else if file_type.is_symlink() {
421            let target = tokio::fs::read_link(&src_path).await?;
422            // Security: only recreate relative symlinks whose resolved target
423            // stays within the destination tree. This prevents sandbox escapes
424            // via crafted symlinks (e.g., link -> /etc/passwd, link -> ../../..).
425            let target_str = target.to_string_lossy();
426            if target_str.starts_with('/') || target_str.contains("..") {
427                tracing::warn!(
428                    src = %src_path.display(),
429                    target = %target.display(),
430                    "skipping symlink that points outside sandbox"
431                );
432                continue;
433            }
434            #[cfg(unix)]
435            tokio::fs::symlink(target, &dst_path).await?;
436        } else {
437            tokio::fs::copy(&src_path, &dst_path).await?;
438        }
439    }
440    Ok(())
441}
442
#[cfg(test)]
mod tests {
    use super::*;

    /// Create a temp dir containing the given `(relative path, contents)` files.
    async fn project_with(files: &[(&str, &str)]) -> tempfile::TempDir {
        let dir = tempfile::tempdir().unwrap();
        for &(name, contents) in files {
            tokio::fs::write(dir.path().join(name), contents).await.unwrap();
        }
        dir
    }

    /// Command lines of every `Command` step in the workflow's first stage.
    fn first_stage_commands(wf: &Workflow) -> Vec<&str> {
        wf.stages[0]
            .steps
            .iter()
            .filter_map(|step| match &step.step_type {
                StepType::Command { run } => Some(run.as_str()),
                _ => None,
            })
            .collect()
    }

    #[tokio::test]
    async fn test_detect_workflow_rust() {
        let dir = project_with(&[("Cargo.toml", "[package]\nname = \"test\"")]).await;
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-rust");
        assert_eq!(wf.stages.len(), 1);
        assert_eq!(wf.stages[0].steps.len(), 2);
    }

    #[tokio::test]
    async fn test_detect_workflow_bun() {
        let dir = project_with(&[("package.json", "{}"), ("bun.lock", "")]).await;
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-bun");
        assert_eq!(wf.stages[0].steps.len(), 2);
        let cmds = first_stage_commands(&wf);
        assert!(cmds.contains(&"bun install --frozen-lockfile"));
        assert!(cmds.contains(&"bun test"));
    }

    #[tokio::test]
    async fn test_detect_workflow_bun_lockb() {
        let dir = project_with(&[("package.json", "{}"), ("bun.lockb", "\x00")]).await;
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-bun");
        assert_eq!(wf.stages[0].steps.len(), 2);
        let cmds = first_stage_commands(&wf);
        assert!(cmds.contains(&"bun install --frozen-lockfile"));
        assert!(cmds.contains(&"bun test"));
    }

    #[tokio::test]
    async fn test_detect_workflow_npm() {
        let dir = project_with(&[("package.json", "{}")]).await;
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-node");
        assert_eq!(wf.stages[0].steps.len(), 2);
        let cmds = first_stage_commands(&wf);
        assert!(cmds.contains(&"npm ci"));
        assert!(cmds.contains(&"npm test"));
    }

    #[tokio::test]
    async fn test_detect_workflow_python_pyproject() {
        let dir = project_with(&[("pyproject.toml", "[project]")]).await;
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-python");
        // pyproject.toml only — install via pip install -e . plus test
        assert_eq!(wf.stages[0].steps.len(), 2);
        let cmds = first_stage_commands(&wf);
        assert!(cmds.contains(&"pip install -e ."));
        assert!(cmds.contains(&"pytest"));
    }

    #[tokio::test]
    async fn test_detect_workflow_python_dual_file() {
        let dir = project_with(&[
            ("pyproject.toml", "[project]"),
            ("requirements.txt", "pytest\nrequests"),
        ])
        .await;
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-python");
        // Both files present — install pyproject + requirements + test
        assert_eq!(wf.stages[0].steps.len(), 3);
        let cmds = first_stage_commands(&wf);
        assert!(cmds.contains(&"pip install -e ."));
        assert!(cmds.contains(&"pip install -r requirements.txt"));
        assert!(cmds.contains(&"pytest"));
    }

    #[tokio::test]
    async fn test_detect_workflow_python_requirements() {
        let dir = project_with(&[("requirements.txt", "pytest\nrequests")]).await;
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-python");
        // requirements.txt only — install-deps + test
        assert_eq!(wf.stages[0].steps.len(), 2);
        let cmds = first_stage_commands(&wf);
        assert!(cmds.contains(&"pip install -r requirements.txt"));
        assert!(cmds.contains(&"pytest"));
    }

    #[tokio::test]
    async fn test_detect_workflow_python_both_files() {
        let dir = project_with(&[
            ("pyproject.toml", "[project]"),
            ("requirements.txt", "pytest\nrequests"),
        ])
        .await;
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-python");
        // Both files — install-package + install-deps + test
        assert_eq!(wf.stages[0].steps.len(), 3);
        let cmds = first_stage_commands(&wf);
        assert!(cmds.contains(&"pip install -e ."));
        assert!(cmds.contains(&"pip install -r requirements.txt"));
        assert!(cmds.contains(&"pytest"));
    }

    #[tokio::test]
    async fn test_detect_workflow_go() {
        let dir = project_with(&[("go.mod", "module example.com/test")]).await;
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-go");
        assert_eq!(wf.stages[0].steps.len(), 3);
    }

    #[tokio::test]
    async fn test_detect_workflow_unknown() {
        let dir = project_with(&[]).await;
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-none");
        assert!(wf.stages.is_empty());
    }

    #[tokio::test]
    async fn test_copy_dir_recursive_copies_files() {
        let src = tempfile::tempdir().unwrap();
        let dst = tempfile::tempdir().unwrap();
        let (src_root, dst_root) = (src.path(), dst.path());

        tokio::fs::write(src_root.join("Cargo.toml"), b"[package]\nname = \"test\"")
            .await
            .unwrap();
        tokio::fs::create_dir_all(src_root.join("src")).await.unwrap();
        tokio::fs::write(src_root.join("src/main.rs"), b"fn main() {}")
            .await
            .unwrap();

        // .git dir should be skipped
        tokio::fs::create_dir_all(src_root.join(".git/objects")).await.unwrap();
        tokio::fs::write(src_root.join(".git/HEAD"), b"ref: refs/heads/main")
            .await
            .unwrap();

        copy_dir_recursive(src_root, dst_root).await.unwrap();

        assert!(dst_root.join("Cargo.toml").exists(), "Cargo.toml must be at dst root");
        assert!(dst_root.join("src/main.rs").exists(), "src/main.rs must exist");
        assert!(!dst_root.join(".git").exists(), ".git must be skipped");
    }

    #[tokio::test]
    async fn test_copy_dir_recursive_handles_symlinks() {
        let src = tempfile::tempdir().unwrap();
        let dst = tempfile::tempdir().unwrap();

        // A regular file plus a relative symlink pointing at it
        tokio::fs::write(src.path().join("real.txt"), b"hello").await.unwrap();
        #[cfg(unix)]
        tokio::fs::symlink("real.txt", src.path().join("link.txt")).await.unwrap();

        copy_dir_recursive(src.path(), dst.path()).await.unwrap();

        assert!(dst.path().join("real.txt").exists());
        #[cfg(unix)]
        {
            let copied_link = dst.path().join("link.txt");
            let meta = tokio::fs::symlink_metadata(&copied_link).await.unwrap();
            assert!(meta.file_type().is_symlink(), "symlink should be preserved");
            let target = tokio::fs::read_link(&copied_link).await.unwrap();
            assert_eq!(target.to_str().unwrap(), "real.txt");
        }
    }

    #[tokio::test]
    async fn test_copy_dir_recursive_handles_dir_symlinks() {
        let src = tempfile::tempdir().unwrap();
        let dst = tempfile::tempdir().unwrap();

        // A real directory plus a relative symlink pointing at it
        tokio::fs::create_dir_all(src.path().join("real_dir")).await.unwrap();
        tokio::fs::write(src.path().join("real_dir/file.txt"), b"content").await.unwrap();
        #[cfg(unix)]
        tokio::fs::symlink("real_dir", src.path().join("linked_dir")).await.unwrap();

        copy_dir_recursive(src.path(), dst.path()).await.unwrap();

        assert!(dst.path().join("real_dir/file.txt").exists());
        #[cfg(unix)]
        {
            let copied_link = dst.path().join("linked_dir");
            let meta = tokio::fs::symlink_metadata(&copied_link).await.unwrap();
            assert!(meta.file_type().is_symlink(), "dir symlink should be preserved");
            let target = tokio::fs::read_link(&copied_link).await.unwrap();
            assert_eq!(target.to_str().unwrap(), "real_dir");
        }
    }

    #[test]
    fn test_db_pipeline_conversion() {
        // Two command-style steps stored for (unrelated) repos.
        let db_step = |order, kind: &str, config| dk_engine::pipeline::PipelineStep {
            repo_id: Uuid::new_v4(),
            step_order: order,
            step_type: kind.to_string(),
            config,
            required: true,
        };
        let steps = vec![
            db_step(
                1,
                "typecheck",
                serde_json::json!({"command": "cargo check", "timeout_secs": 120}),
            ),
            db_step(
                2,
                "test",
                serde_json::json!({"command": "cargo test", "timeout_secs": 300}),
            ),
        ];
        let wf = db_pipeline_to_workflow(steps);
        assert_eq!(wf.stages.len(), 1);
        assert_eq!(wf.stages[0].steps.len(), 2);
    }
}