1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use tokio::sync::mpsc;
8use tracing::info;
9use uuid::Uuid;
10
11use dk_engine::repo::Engine;
12
13use crate::executor::Executor;
14use crate::scheduler::{self, StepResult};
15use crate::workflow::parser::parse_yaml_workflow_file;
16use crate::workflow::types::{Stage, Step, StepType, Workflow};
17use crate::workflow::validator::validate_workflow;
18
19pub struct Runner {
21 engine: Arc<Engine>,
22 executor: Box<dyn Executor>,
23}
24
25impl Runner {
26 pub fn new(engine: Arc<Engine>, executor: Box<dyn Executor>) -> Self {
27 Self { engine, executor }
28 }
29
30 pub async fn verify(
32 &self,
33 changeset_id: Uuid,
34 repo_name: &str,
35 tx: mpsc::Sender<StepResult>,
36 ) -> Result<bool> {
37 let (repo_id, repo_dir) = {
38 let (repo_id, git_repo) = self.engine.get_repo(repo_name).await?;
39 (repo_id, git_repo.path().to_path_buf())
41 };
42
43 let changeset_data = self.engine.changeset_store().get_files(changeset_id).await?;
47 let temp_dir = tempfile::tempdir().context("failed to create temp dir for verify")?;
48 let work_dir = temp_dir.path().to_path_buf();
49
50 copy_dir_recursive(&repo_dir, &work_dir).await
53 .context("failed to copy repo into temp dir")?;
54
55 let mut changeset_paths: Vec<String> = Vec::with_capacity(changeset_data.len());
57 for file in &changeset_data {
58 changeset_paths.push(file.file_path.clone());
59 if let Some(content) = &file.content {
60 if file.file_path.contains("..") {
63 anyhow::bail!(
64 "changeset file path contains traversal component: '{}'",
65 file.file_path
66 );
67 }
68 if file.file_path.starts_with('/') || file.file_path.starts_with('\\') {
70 anyhow::bail!(
71 "changeset file path is absolute: '{}'",
72 file.file_path
73 );
74 }
75 let dest = work_dir.join(&file.file_path);
76 if !dest.starts_with(&work_dir) {
79 anyhow::bail!(
80 "changeset file path escapes sandbox: '{}' resolves outside work_dir",
81 file.file_path
82 );
83 }
84 if let Some(parent) = dest.parent() {
86 tokio::fs::create_dir_all(parent).await?;
87 }
88 tokio::fs::write(&dest, content).await?;
89 }
90 }
91
92 info!(
93 "copied repo and overlaid {} changeset files into {} for verification",
94 changeset_paths.len(),
95 work_dir.display()
96 );
97
98 let workflow = self.load_workflow(&repo_dir, repo_id).await?;
102
103 if workflow.stages.is_empty() {
105 tracing::warn!(
106 changeset_id = %changeset_id,
107 repo = %repo_name,
108 "auto-approving changeset: no verification pipeline and no recognized project type"
109 );
110 return Ok(true);
111 }
112
113 validate_workflow(&workflow).context("workflow validation failed")?;
114
115 let mut env = HashMap::new();
116 env.insert("DKOD_CHANGESET_ID".to_string(), changeset_id.to_string());
117 env.insert("DKOD_REPO_ID".to_string(), repo_id.to_string());
118
119 let passed = tokio::time::timeout(
120 workflow.timeout,
121 scheduler::run_workflow(
122 &workflow,
123 self.executor.as_ref(),
124 &work_dir,
125 &changeset_paths,
126 &env,
127 &tx,
128 Some(&self.engine),
129 Some(repo_id),
130 Some(changeset_id),
131 ),
132 )
133 .await
134 .unwrap_or_else(|_| {
135 tracing::warn!("workflow '{}' timed out after {:?}", workflow.name, workflow.timeout);
136 false
137 });
138
139 Ok(passed)
141 }
142
143 async fn load_workflow(&self, repo_dir: &Path, repo_id: Uuid) -> Result<Workflow> {
144 let yaml_path = repo_dir.join(".dkod/pipeline.yaml");
146 if yaml_path.exists() {
147 info!("loading workflow from {}", yaml_path.display());
148 let workflow = parse_yaml_workflow_file(&yaml_path).await?;
149 if workflow.stages.is_empty() {
150 anyhow::bail!(
151 "pipeline.yaml exists but defines no stages — refusing to auto-approve; \
152 add at least one stage or remove the file to use auto-detection"
153 );
154 }
155 return Ok(workflow);
156 }
157
158 let legacy_toml = repo_dir.join(".dekode/pipeline.toml");
160 if legacy_toml.exists() {
161 tracing::warn!(
162 path = %legacy_toml.display(),
163 "found legacy .dekode/pipeline.toml \u{2014} this format is no longer loaded; please migrate to .dkod/pipeline.yaml"
164 );
165 }
166
167 let db_steps = self.engine
169 .pipeline_store()
170 .get_pipeline(repo_id)
171 .await
172 .unwrap_or_default();
173
174 if !db_steps.is_empty() {
175 info!(
176 "loading workflow from DB pipeline ({} steps)",
177 db_steps.len()
178 );
179 return Ok(db_pipeline_to_workflow(db_steps));
180 }
181
182 info!("auto-detecting verification workflow from project files");
184 Ok(detect_workflow(repo_dir))
185 }
186}
187
188fn db_pipeline_to_workflow(steps: Vec<dk_engine::pipeline::PipelineStep>) -> Workflow {
189 let resolved_steps: Vec<Step> = steps
190 .into_iter()
191 .map(|s| {
192 let command = s
193 .config
194 .get("command")
195 .and_then(|v| v.as_str())
196 .unwrap_or("echo 'no command configured'")
197 .to_string();
198 let timeout_secs = s
199 .config
200 .get("timeout_secs")
201 .and_then(|v| v.as_u64())
202 .unwrap_or(120);
203
204 let step_type = match s.step_type.as_str() {
205 "agent-review" => StepType::AgentReview {
206 prompt: "Review this changeset".to_string(),
207 },
208 "human-approve" => StepType::HumanApprove,
209 _ => StepType::Command { run: command },
210 };
211
212 Step {
213 name: s.step_type.clone(),
214 step_type,
215 timeout: Duration::from_secs(timeout_secs),
216 required: s.required,
217 changeset_aware: false,
218 work_dir: None,
219 }
220 })
221 .collect();
222
223 Workflow {
224 name: "db-pipeline".to_string(),
225 timeout: Duration::from_secs(600),
226 stages: vec![Stage {
227 name: "pipeline".to_string(),
228 parallel: false,
229 steps: resolved_steps,
230 }],
231 allowed_commands: vec![],
232 }
233}
234
235pub fn detect_workflow(repo_dir: &Path) -> Workflow {
242 use std::path::PathBuf;
243
244 let mut steps: Vec<Step> = Vec::new();
245
246 let skip = ["node_modules", "target"];
250 let mut scan_dirs: Vec<(String, std::path::PathBuf)> = vec![("".to_string(), repo_dir.to_path_buf())];
251 if let Ok(entries) = std::fs::read_dir(repo_dir) {
252 for entry in entries.flatten() {
253 let path = entry.path();
254 if !path.is_dir() {
255 continue;
256 }
257 let name = entry.file_name();
258 let name_str = name.to_string_lossy().to_string();
259 if name_str.starts_with('.') || skip.contains(&name_str.as_str()) {
260 continue;
261 }
262 scan_dirs.push((name_str, path));
263 }
264 }
265
266 if scan_dirs.len() > 1 {
269 scan_dirs[1..].sort_by(|a, b| a.0.cmp(&b.0));
270 }
271
272 let root_dir = &scan_dirs[0].1;
277 let mut root_languages: std::collections::HashSet<&str> = std::collections::HashSet::new();
278 if root_dir.join("Cargo.toml").exists() {
279 root_languages.insert("rust");
280 }
281 if root_dir.join("package.json").exists() {
282 let is_bun = root_dir.join("bun.lock").exists() || root_dir.join("bun.lockb").exists();
283 root_languages.insert(if is_bun { "bun" } else { "node" });
284 }
285 if root_dir.join("pyproject.toml").exists() || root_dir.join("requirements.txt").exists() {
286 root_languages.insert("python");
287 }
288 if root_dir.join("go.mod").exists() {
289 root_languages.insert("go");
290 }
291
292 for (subdir_name, dir) in &scan_dirs {
293 let is_root = subdir_name.is_empty();
295 let step_work_dir: Option<PathBuf> = if is_root {
296 None
297 } else {
298 Some(PathBuf::from(subdir_name))
299 };
300
301 if dir.join("Cargo.toml").exists() && (is_root || !root_languages.contains("rust")) {
303 let name_prefix = if is_root {
304 "rust".to_string()
305 } else {
306 format!("rust({subdir_name})")
307 };
308 steps.push(Step {
309 name: format!("{name_prefix}:check"),
310 step_type: StepType::Command { run: "cargo check".to_string() },
311 timeout: Duration::from_secs(60),
312 required: true,
313 changeset_aware: true,
314 work_dir: step_work_dir.clone(),
315 });
316 steps.push(Step {
317 name: format!("{name_prefix}:test"),
318 step_type: StepType::Command { run: "cargo test".to_string() },
319 timeout: Duration::from_secs(60),
320 required: true,
321 changeset_aware: true,
322 work_dir: step_work_dir.clone(),
323 });
324 }
325
326 if dir.join("package.json").exists() {
328 let is_bun = dir.join("bun.lock").exists()
329 || dir.join("bun.lockb").exists();
330 let lang_key = if is_bun { "bun" } else { "node" };
331 if is_root || !root_languages.contains(lang_key) {
332 let (label, install_cmd, test_cmd) = if is_bun {
333 ("bun", "bun install --frozen-lockfile", "bun test")
334 } else {
335 ("node", "npm ci", "npm test")
336 };
337 let name_prefix = if is_root {
338 label.to_string()
339 } else {
340 format!("{label}({subdir_name})")
341 };
342 steps.push(Step {
343 name: format!("{name_prefix}:install"),
344 step_type: StepType::Command { run: install_cmd.to_string() },
345 timeout: Duration::from_secs(120),
346 required: true,
347 changeset_aware: false,
348 work_dir: step_work_dir.clone(),
349 });
350 steps.push(Step {
351 name: format!("{name_prefix}:test"),
352 step_type: StepType::Command { run: test_cmd.to_string() },
353 timeout: Duration::from_secs(60),
354 required: true,
355 changeset_aware: true,
356 work_dir: step_work_dir.clone(),
357 });
358 }
359 }
360
361 if (dir.join("pyproject.toml").exists()
363 || dir.join("requirements.txt").exists())
364 && (is_root || !root_languages.contains("python"))
365 {
366 let name_prefix = if is_root {
367 "python".to_string()
368 } else {
369 format!("python({subdir_name})")
370 };
371 if dir.join("pyproject.toml").exists() {
372 steps.push(Step {
373 name: format!("{name_prefix}:install"),
374 step_type: StepType::Command { run: "pip install -e .".to_string() },
375 timeout: Duration::from_secs(120),
376 required: true,
377 changeset_aware: false,
378 work_dir: step_work_dir.clone(),
379 });
380 }
381 if dir.join("requirements.txt").exists() {
382 steps.push(Step {
383 name: format!("{name_prefix}:install-deps"),
384 step_type: StepType::Command {
385 run: "pip install -r requirements.txt".to_string(),
386 },
387 timeout: Duration::from_secs(120),
388 required: true,
389 changeset_aware: false,
390 work_dir: step_work_dir.clone(),
391 });
392 }
393 steps.push(Step {
394 name: format!("{name_prefix}:test"),
395 step_type: StepType::Command { run: "pytest".to_string() },
396 timeout: Duration::from_secs(60),
397 required: true,
398 changeset_aware: true,
399 work_dir: step_work_dir.clone(),
400 });
401 }
402
403 if dir.join("go.mod").exists() && (is_root || !root_languages.contains("go")) {
405 let name_prefix = if is_root {
406 "go".to_string()
407 } else {
408 format!("go({subdir_name})")
409 };
410 steps.push(Step {
411 name: format!("{name_prefix}:build"),
412 step_type: StepType::Command { run: "go build ./...".to_string() },
413 timeout: Duration::from_secs(60),
414 required: true,
415 changeset_aware: true,
416 work_dir: step_work_dir.clone(),
417 });
418 steps.push(Step {
419 name: format!("{name_prefix}:vet"),
420 step_type: StepType::Command { run: "go vet ./...".to_string() },
421 timeout: Duration::from_secs(60),
422 required: true,
423 changeset_aware: true,
424 work_dir: step_work_dir.clone(),
425 });
426 steps.push(Step {
427 name: format!("{name_prefix}:test"),
428 step_type: StepType::Command { run: "go test ./...".to_string() },
429 timeout: Duration::from_secs(60),
430 required: true,
431 changeset_aware: true,
432 work_dir: step_work_dir.clone(),
433 });
434 }
435 }
436
437 if steps.is_empty() {
438 return Workflow {
439 name: "auto-none".to_string(),
440 timeout: Duration::from_secs(30),
441 allowed_commands: vec![],
442 stages: vec![],
443 };
444 }
445
446 let unique_langs = steps.iter().map(|s| s.name.split(':').next().unwrap_or("")).collect::<std::collections::HashSet<_>>();
447 let unique_work_dirs = steps.iter().map(|s| s.work_dir.as_deref()).collect::<std::collections::HashSet<_>>();
448 let name = if unique_langs.len() > 1 || unique_work_dirs.len() > 1 {
449 "auto-polyglot".to_string()
450 } else {
451 format!("auto-{}", steps[0].name.split(':').next().unwrap_or("unknown"))
452 };
453
454 let total_timeout_secs = steps.iter().map(|s| s.timeout.as_secs()).sum::<u64>().max(60);
456
457 Workflow {
458 name,
459 timeout: Duration::from_secs(total_timeout_secs),
460 allowed_commands: vec![],
461 stages: vec![Stage {
462 name: "checks".to_string(),
463 parallel: false,
464 steps,
465 }],
466 }
467}
468
469async fn copy_dir_recursive(src: &Path, dst: &Path) -> Result<()> {
471 tokio::fs::create_dir_all(dst).await?;
472 let mut entries = tokio::fs::read_dir(src).await?;
473 while let Some(entry) = entries.next_entry().await? {
474 let file_name = entry.file_name();
475 if file_name == ".git" {
477 continue;
478 }
479 let src_path = entry.path();
480 let dst_path = dst.join(&file_name);
481 let file_type = entry.file_type().await?;
482 if file_type.is_dir() {
483 Box::pin(copy_dir_recursive(&src_path, &dst_path)).await?;
484 } else if file_type.is_symlink() {
485 let target = tokio::fs::read_link(&src_path).await?;
486 let target_str = target.to_string_lossy();
490 if target_str.starts_with('/') || target_str.contains("..") {
491 tracing::warn!(
492 src = %src_path.display(),
493 target = %target.display(),
494 "skipping symlink that points outside sandbox"
495 );
496 continue;
497 }
498 #[cfg(unix)]
499 tokio::fs::symlink(target, &dst_path).await?;
500 } else {
501 tokio::fs::copy(&src_path, &dst_path).await?;
502 }
503 }
504 Ok(())
505}
506
507#[cfg(test)]
508mod tests {
509 use super::*;
510
511 #[tokio::test]
512 async fn test_detect_workflow_rust() {
513 let dir = tempfile::tempdir().unwrap();
514 tokio::fs::write(dir.path().join("Cargo.toml"), b"[package]\nname = \"test\"")
515 .await.unwrap();
516 let wf = detect_workflow(dir.path());
517 assert_eq!(wf.name, "auto-rust");
518 assert_eq!(wf.stages.len(), 1);
519 assert_eq!(wf.stages[0].steps.len(), 2);
520 }
521
522 #[tokio::test]
523 async fn test_detect_workflow_bun() {
524 let dir = tempfile::tempdir().unwrap();
525 tokio::fs::write(dir.path().join("package.json"), b"{}").await.unwrap();
526 tokio::fs::write(dir.path().join("bun.lock"), b"").await.unwrap();
527 let wf = detect_workflow(dir.path());
528 assert_eq!(wf.name, "auto-bun");
529 assert_eq!(wf.stages[0].steps.len(), 2);
530 let cmds: Vec<_> = wf.stages[0].steps.iter().filter_map(|s| {
531 if let StepType::Command { run } = &s.step_type { Some(run.as_str()) } else { None }
532 }).collect();
533 assert!(cmds.contains(&"bun install --frozen-lockfile"));
534 assert!(cmds.contains(&"bun test"));
535 }
536
537 #[tokio::test]
538 async fn test_detect_workflow_bun_lockb() {
539 let dir = tempfile::tempdir().unwrap();
540 tokio::fs::write(dir.path().join("package.json"), b"{}").await.unwrap();
541 tokio::fs::write(dir.path().join("bun.lockb"), b"\x00").await.unwrap();
542 let wf = detect_workflow(dir.path());
543 assert_eq!(wf.name, "auto-bun");
544 assert_eq!(wf.stages[0].steps.len(), 2);
545 let cmds: Vec<_> = wf.stages[0].steps.iter().filter_map(|s| {
546 if let StepType::Command { run } = &s.step_type { Some(run.as_str()) } else { None }
547 }).collect();
548 assert!(cmds.contains(&"bun install --frozen-lockfile"));
549 assert!(cmds.contains(&"bun test"));
550 }
551
552 #[tokio::test]
553 async fn test_detect_workflow_npm() {
554 let dir = tempfile::tempdir().unwrap();
555 tokio::fs::write(dir.path().join("package.json"), b"{}").await.unwrap();
556 let wf = detect_workflow(dir.path());
557 assert_eq!(wf.name, "auto-node");
558 assert_eq!(wf.stages[0].steps.len(), 2);
559 let cmds: Vec<_> = wf.stages[0].steps.iter().filter_map(|s| {
560 if let StepType::Command { run } = &s.step_type { Some(run.as_str()) } else { None }
561 }).collect();
562 assert!(cmds.contains(&"npm ci"));
563 assert!(cmds.contains(&"npm test"));
564 }
565
566 #[tokio::test]
567 async fn test_detect_workflow_python_pyproject() {
568 let dir = tempfile::tempdir().unwrap();
569 tokio::fs::write(dir.path().join("pyproject.toml"), b"[project]").await.unwrap();
570 let wf = detect_workflow(dir.path());
571 assert_eq!(wf.name, "auto-python");
572 assert_eq!(wf.stages[0].steps.len(), 2);
574 let cmds: Vec<_> = wf.stages[0].steps.iter().filter_map(|s| {
575 if let StepType::Command { run } = &s.step_type { Some(run.as_str()) } else { None }
576 }).collect();
577 assert!(cmds.contains(&"pip install -e ."));
578 assert!(cmds.contains(&"pytest"));
579 }
580
581 #[tokio::test]
582 async fn test_detect_workflow_python_dual_file() {
583 let dir = tempfile::tempdir().unwrap();
584 tokio::fs::write(dir.path().join("pyproject.toml"), b"[project]").await.unwrap();
585 tokio::fs::write(dir.path().join("requirements.txt"), b"pytest\nrequests").await.unwrap();
586 let wf = detect_workflow(dir.path());
587 assert_eq!(wf.name, "auto-python");
588 assert_eq!(wf.stages[0].steps.len(), 3);
590 let cmds: Vec<_> = wf.stages[0].steps.iter().filter_map(|s| {
591 if let StepType::Command { run } = &s.step_type { Some(run.as_str()) } else { None }
592 }).collect();
593 assert!(cmds.contains(&"pip install -e ."));
594 assert!(cmds.contains(&"pip install -r requirements.txt"));
595 assert!(cmds.contains(&"pytest"));
596 }
597
598 #[tokio::test]
599 async fn test_detect_workflow_python_requirements() {
600 let dir = tempfile::tempdir().unwrap();
601 tokio::fs::write(dir.path().join("requirements.txt"), b"pytest\nrequests").await.unwrap();
602 let wf = detect_workflow(dir.path());
603 assert_eq!(wf.name, "auto-python");
604 assert_eq!(wf.stages[0].steps.len(), 2);
606 let cmds: Vec<_> = wf.stages[0].steps.iter().filter_map(|s| {
607 if let StepType::Command { run } = &s.step_type { Some(run.as_str()) } else { None }
608 }).collect();
609 assert!(cmds.contains(&"pip install -r requirements.txt"));
610 assert!(cmds.contains(&"pytest"));
611 }
612
613 #[tokio::test]
614 async fn test_detect_workflow_python_both_files() {
615 let dir = tempfile::tempdir().unwrap();
616 tokio::fs::write(dir.path().join("pyproject.toml"), b"[project]").await.unwrap();
617 tokio::fs::write(dir.path().join("requirements.txt"), b"pytest\nrequests").await.unwrap();
618 let wf = detect_workflow(dir.path());
619 assert_eq!(wf.name, "auto-python");
620 assert_eq!(wf.stages[0].steps.len(), 3);
622 let cmds: Vec<_> = wf.stages[0].steps.iter().filter_map(|s| {
623 if let StepType::Command { run } = &s.step_type { Some(run.as_str()) } else { None }
624 }).collect();
625 assert!(cmds.contains(&"pip install -e ."));
626 assert!(cmds.contains(&"pip install -r requirements.txt"));
627 assert!(cmds.contains(&"pytest"));
628 }
629
630 #[tokio::test]
631 async fn test_detect_workflow_go() {
632 let dir = tempfile::tempdir().unwrap();
633 tokio::fs::write(dir.path().join("go.mod"), b"module example.com/test").await.unwrap();
634 let wf = detect_workflow(dir.path());
635 assert_eq!(wf.name, "auto-go");
636 assert_eq!(wf.stages[0].steps.len(), 3);
637 }
638
639 #[tokio::test]
640 async fn test_detect_workflow_unknown() {
641 let dir = tempfile::tempdir().unwrap();
642 let wf = detect_workflow(dir.path());
643 assert_eq!(wf.name, "auto-none");
644 assert!(wf.stages.is_empty());
645 }
646
647 #[tokio::test]
648 async fn test_copy_dir_recursive_copies_files() {
649 let src = tempfile::tempdir().unwrap();
650 let dst = tempfile::tempdir().unwrap();
651
652 tokio::fs::write(src.path().join("Cargo.toml"), b"[package]\nname = \"test\"")
653 .await
654 .unwrap();
655 tokio::fs::create_dir_all(src.path().join("src")).await.unwrap();
656 tokio::fs::write(src.path().join("src/main.rs"), b"fn main() {}")
657 .await
658 .unwrap();
659
660 tokio::fs::create_dir_all(src.path().join(".git/objects")).await.unwrap();
662 tokio::fs::write(src.path().join(".git/HEAD"), b"ref: refs/heads/main")
663 .await
664 .unwrap();
665
666 copy_dir_recursive(src.path(), dst.path()).await.unwrap();
667
668 assert!(dst.path().join("Cargo.toml").exists(), "Cargo.toml must be at dst root");
669 assert!(dst.path().join("src/main.rs").exists(), "src/main.rs must exist");
670 assert!(!dst.path().join(".git").exists(), ".git must be skipped");
671 }
672
673 #[tokio::test]
674 async fn test_copy_dir_recursive_handles_symlinks() {
675 let src = tempfile::tempdir().unwrap();
676 let dst = tempfile::tempdir().unwrap();
677
678 tokio::fs::write(src.path().join("real.txt"), b"hello").await.unwrap();
680 #[cfg(unix)]
681 tokio::fs::symlink("real.txt", src.path().join("link.txt")).await.unwrap();
682
683 copy_dir_recursive(src.path(), dst.path()).await.unwrap();
684
685 assert!(dst.path().join("real.txt").exists());
686 #[cfg(unix)]
687 {
688 let meta = tokio::fs::symlink_metadata(dst.path().join("link.txt")).await.unwrap();
689 assert!(meta.file_type().is_symlink(), "symlink should be preserved");
690 let target = tokio::fs::read_link(dst.path().join("link.txt")).await.unwrap();
691 assert_eq!(target.to_str().unwrap(), "real.txt");
692 }
693 }
694
695 #[tokio::test]
696 async fn test_copy_dir_recursive_handles_dir_symlinks() {
697 let src = tempfile::tempdir().unwrap();
698 let dst = tempfile::tempdir().unwrap();
699
700 tokio::fs::create_dir_all(src.path().join("real_dir")).await.unwrap();
702 tokio::fs::write(src.path().join("real_dir/file.txt"), b"content").await.unwrap();
703 #[cfg(unix)]
704 tokio::fs::symlink("real_dir", src.path().join("linked_dir")).await.unwrap();
705
706 copy_dir_recursive(src.path(), dst.path()).await.unwrap();
707
708 assert!(dst.path().join("real_dir/file.txt").exists());
709 #[cfg(unix)]
710 {
711 let meta = tokio::fs::symlink_metadata(dst.path().join("linked_dir")).await.unwrap();
712 assert!(meta.file_type().is_symlink(), "dir symlink should be preserved");
713 let target = tokio::fs::read_link(dst.path().join("linked_dir")).await.unwrap();
714 assert_eq!(target.to_str().unwrap(), "real_dir");
715 }
716 }
717
718 #[test]
719 fn test_db_pipeline_conversion() {
720 let steps = vec![
721 dk_engine::pipeline::PipelineStep {
722 repo_id: Uuid::new_v4(),
723 step_order: 1,
724 step_type: "typecheck".to_string(),
725 config: serde_json::json!({"command": "cargo check", "timeout_secs": 120}),
726 required: true,
727 },
728 dk_engine::pipeline::PipelineStep {
729 repo_id: Uuid::new_v4(),
730 step_order: 2,
731 step_type: "test".to_string(),
732 config: serde_json::json!({"command": "cargo test", "timeout_secs": 300}),
733 required: true,
734 },
735 ];
736 let wf = db_pipeline_to_workflow(steps);
737 assert_eq!(wf.stages.len(), 1);
738 assert_eq!(wf.stages[0].steps.len(), 2);
739 }
740
741 #[tokio::test]
742 async fn test_detect_workflow_polyglot_rust_and_node() {
743 let dir = tempfile::tempdir().unwrap();
744 tokio::fs::write(dir.path().join("Cargo.toml"), b"[package]\nname = \"test\"").await.unwrap();
745 tokio::fs::write(dir.path().join("package.json"), b"{}").await.unwrap();
746 let wf = detect_workflow(dir.path());
747 assert_eq!(wf.name, "auto-polyglot");
748 assert_eq!(wf.stages.len(), 1);
749 let step_names: Vec<&str> = wf.stages[0].steps.iter().map(|s| s.name.as_str()).collect();
750 assert!(step_names.iter().any(|n| n.starts_with("rust:")), "missing rust steps");
751 assert!(step_names.iter().any(|n| n.starts_with("node:")), "missing node steps");
752 }
753
754 #[tokio::test]
755 async fn test_detect_workflow_polyglot_three_languages() {
756 let dir = tempfile::tempdir().unwrap();
757 tokio::fs::write(dir.path().join("Cargo.toml"), b"[package]\nname = \"test\"").await.unwrap();
758 tokio::fs::write(dir.path().join("package.json"), b"{}").await.unwrap();
759 tokio::fs::write(dir.path().join("pyproject.toml"), b"[project]\nname = \"test\"").await.unwrap();
760 let wf = detect_workflow(dir.path());
761 assert_eq!(wf.name, "auto-polyglot");
762 let step_names: Vec<&str> = wf.stages[0].steps.iter().map(|s| s.name.as_str()).collect();
763 assert!(step_names.iter().any(|n| n.starts_with("rust:")), "missing rust");
764 assert!(step_names.iter().any(|n| n.starts_with("node:")), "missing node");
765 assert!(step_names.iter().any(|n| n.starts_with("python:")), "missing python");
766 }
767
768 #[tokio::test]
769 async fn test_detect_workflow_polyglot_bun_and_go() {
770 let dir = tempfile::tempdir().unwrap();
771 tokio::fs::write(dir.path().join("package.json"), b"{}").await.unwrap();
772 tokio::fs::write(dir.path().join("bun.lock"), b"").await.unwrap();
773 tokio::fs::write(dir.path().join("go.mod"), b"module example.com/test").await.unwrap();
774 let wf = detect_workflow(dir.path());
775 assert_eq!(wf.name, "auto-polyglot");
776 let step_names: Vec<&str> = wf.stages[0].steps.iter().map(|s| s.name.as_str()).collect();
777 assert!(step_names.iter().any(|n| n.starts_with("bun:")), "missing bun steps");
778 assert!(step_names.iter().any(|n| n.starts_with("go:")), "missing go steps");
779 }
780
781 #[tokio::test]
782 async fn test_detect_workflow_subdirectory() {
783 let dir = tempfile::tempdir().unwrap();
784 std::fs::create_dir_all(dir.path().join("rust")).unwrap();
786 std::fs::write(dir.path().join("rust/Cargo.toml"), "[package]\nname = \"test\"").unwrap();
787 std::fs::create_dir_all(dir.path().join("python")).unwrap();
788 std::fs::write(dir.path().join("python/requirements.txt"), "flask\n").unwrap();
789
790 let wf = detect_workflow(dir.path());
791 assert_eq!(wf.name, "auto-polyglot");
792
793 let steps: Vec<&Step> = wf.stages.iter()
794 .flat_map(|s| s.steps.iter())
795 .collect();
796
797 let rust_steps: Vec<&&Step> = steps.iter()
800 .filter(|s| s.name.starts_with("rust("))
801 .collect();
802 assert!(!rust_steps.is_empty(), "should detect rust steps");
803 for step in &rust_steps {
804 assert_eq!(step.work_dir.as_ref().map(|p| p.to_str().unwrap()), Some("rust"),
805 "rust steps should have work_dir = 'rust'");
806 if let StepType::Command { run } = &step.step_type {
807 assert!(!run.contains("cd "), "commands should not contain cd prefix");
808 }
809 }
810
811 let python_steps: Vec<&&Step> = steps.iter()
812 .filter(|s| s.name.starts_with("python("))
813 .collect();
814 assert!(!python_steps.is_empty(), "should detect python steps");
815 for step in &python_steps {
816 assert_eq!(step.work_dir.as_ref().map(|p| p.to_str().unwrap()), Some("python"),
817 "python steps should have work_dir = 'python'");
818 if let StepType::Command { run } = &step.step_type {
819 assert!(!run.contains("cd "), "commands should not contain cd prefix");
820 }
821 }
822
823 let first_lang = steps[0].name.split(':').next().unwrap();
825 assert_eq!(first_lang, "python(python)", "python should come before rust (alphabetical subdirectory order)");
826 }
827
828 #[tokio::test]
829 async fn test_detect_workflow_same_language_sibling_subdirs() {
830 let dir = tempfile::tempdir().unwrap();
831 std::fs::create_dir_all(dir.path().join("backend")).unwrap();
833 std::fs::write(dir.path().join("backend/package.json"), "{}").unwrap();
834 std::fs::create_dir_all(dir.path().join("frontend")).unwrap();
835 std::fs::write(dir.path().join("frontend/package.json"), "{}").unwrap();
836
837 let wf = detect_workflow(dir.path());
838 assert_eq!(wf.name, "auto-polyglot");
839
840 let work_dirs: Vec<Option<&Path>> = wf.stages.iter()
842 .flat_map(|s| s.steps.iter())
843 .map(|step| step.work_dir.as_deref())
844 .collect();
845
846 assert!(work_dirs.iter().any(|wd| wd == &Some(Path::new("backend"))),
847 "backend should have steps");
848 assert!(work_dirs.iter().any(|wd| wd == &Some(Path::new("frontend"))),
849 "frontend should have steps");
850
851 let step_names: Vec<&str> = wf.stages.iter()
853 .flat_map(|s| s.steps.iter())
854 .map(|step| step.name.as_str())
855 .collect();
856
857 assert!(step_names.contains(&"node(backend):install"), "missing node(backend):install");
858 assert!(step_names.contains(&"node(backend):test"), "missing node(backend):test");
859 assert!(step_names.contains(&"node(frontend):install"), "missing node(frontend):install");
860 assert!(step_names.contains(&"node(frontend):test"), "missing node(frontend):test");
861
862 let unique_names: std::collections::HashSet<&str> = step_names.iter().copied().collect();
864 assert_eq!(step_names.len(), unique_names.len(),
865 "step names must be unique, got: {:?}", step_names);
866 }
867}