1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use tokio::sync::mpsc;
8use tracing::info;
9use uuid::Uuid;
10
11use dk_engine::repo::Engine;
12
13use crate::executor::Executor;
14use crate::scheduler::{self, StepResult};
15use crate::workflow::parser::parse_yaml_workflow_file;
16use crate::workflow::types::{Stage, Step, StepType, Workflow};
17use crate::workflow::validator::validate_workflow;
18
/// Verifies changesets by replaying them into a sandboxed copy of the repo
/// and running the repo's verification workflow against it.
pub struct Runner {
    // Shared engine handle for repo, changeset, and pipeline stores.
    engine: Arc<Engine>,
    // Executes individual workflow steps (commands, reviews, approvals).
    executor: Box<dyn Executor>,
}
24
impl Runner {
    /// Creates a runner backed by `engine` for repo/changeset access and
    /// `executor` for running workflow steps.
    pub fn new(engine: Arc<Engine>, executor: Box<dyn Executor>) -> Self {
        Self { engine, executor }
    }

    /// Verifies `changeset_id` against `repo_name`:
    ///  1. copies the repo into a fresh temp dir (`.git` excluded),
    ///  2. overlays the changeset's files on top (with path-traversal checks),
    ///  3. resolves a workflow (pipeline.yaml → DB pipeline → auto-detect),
    ///  4. runs it under the workflow-level timeout, streaming step results
    ///     through `tx`.
    ///
    /// Returns `Ok(true)` when the workflow passed (or there was nothing to
    /// verify), `Ok(false)` on step failure or timeout, and `Err` only for
    /// infrastructure problems (I/O, store access, invalid workflow).
    pub async fn verify(
        &self,
        changeset_id: Uuid,
        repo_name: &str,
        tx: mpsc::Sender<StepResult>,
    ) -> Result<bool> {
        // Scope the repo handle so only the id and on-disk path outlive it.
        let (repo_id, repo_dir) = {
            let (repo_id, git_repo) = self.engine.get_repo(repo_name).await?;
            (repo_id, git_repo.path().to_path_buf())
        };

        let changeset_data = self.engine.changeset_store().get_files(changeset_id).await?;
        // `temp_dir` must stay alive until the workflow completes: dropping the
        // guard deletes the directory.
        let temp_dir = tempfile::tempdir().context("failed to create temp dir for verify")?;
        let work_dir = temp_dir.path().to_path_buf();

        copy_dir_recursive(&repo_dir, &work_dir).await
            .context("failed to copy repo into temp dir")?;

        let mut changeset_paths: Vec<String> = Vec::with_capacity(changeset_data.len());
        for file in &changeset_data {
            // Every changed path is reported to the scheduler, even when no
            // content is written below.
            changeset_paths.push(file.file_path.clone());
            if let Some(content) = &file.content {
                // Reject traversal components before joining onto work_dir;
                // `join` does not normalize `..`.
                if file.file_path.contains("..") {
                    anyhow::bail!(
                        "changeset file path contains traversal component: '{}'",
                        file.file_path
                    );
                }
                // An absolute path would make `join` discard work_dir entirely.
                if file.file_path.starts_with('/') || file.file_path.starts_with('\\') {
                    anyhow::bail!(
                        "changeset file path is absolute: '{}'",
                        file.file_path
                    );
                }
                let dest = work_dir.join(&file.file_path);
                // Defense in depth: the joined path must stay under work_dir.
                if !dest.starts_with(&work_dir) {
                    anyhow::bail!(
                        "changeset file path escapes sandbox: '{}' resolves outside work_dir",
                        file.file_path
                    );
                }
                if let Some(parent) = dest.parent() {
                    tokio::fs::create_dir_all(parent).await?;
                }
                tokio::fs::write(&dest, content).await?;
            }
            // NOTE(review): entries with `content == None` are recorded but the
            // corresponding file is never removed from work_dir — if `None`
            // denotes a deletion, deletions are not reflected in the sandbox;
            // confirm intended semantics.
        }

        info!(
            "copied repo and overlaid {} changeset files into {} for verification",
            changeset_paths.len(),
            work_dir.display()
        );

        let workflow = self.load_workflow(&repo_dir, repo_id).await?;

        // No pipeline configured and no recognizable project type: nothing to
        // run, so the changeset is auto-approved (with a loud warning).
        if workflow.stages.is_empty() {
            tracing::warn!(
                changeset_id = %changeset_id,
                repo = %repo_name,
                "auto-approving changeset: no verification pipeline and no recognized project type"
            );
            return Ok(true);
        }

        validate_workflow(&workflow).context("workflow validation failed")?;

        let mut env = HashMap::new();
        env.insert("DKOD_CHANGESET_ID".to_string(), changeset_id.to_string());
        env.insert("DKOD_REPO_ID".to_string(), repo_id.to_string());

        // The workflow-level timeout caps the entire run; expiry is mapped to
        // a verification failure (`false`) rather than an error.
        let passed = tokio::time::timeout(
            workflow.timeout,
            scheduler::run_workflow(
                &workflow,
                self.executor.as_ref(),
                &work_dir,
                &changeset_paths,
                &env,
                &tx,
                Some(&self.engine),
                Some(repo_id),
                Some(changeset_id),
            ),
        )
        .await
        .unwrap_or_else(|_| {
            tracing::warn!("workflow '{}' timed out after {:?}", workflow.name, workflow.timeout);
            false
        });

        Ok(passed)
    }

    /// Resolves the verification workflow for `repo_dir`, in priority order:
    /// `.dkod/pipeline.yaml`, then the DB pipeline for `repo_id`, then
    /// auto-detection from project marker files.
    async fn load_workflow(&self, repo_dir: &Path, repo_id: Uuid) -> Result<Workflow> {
        let yaml_path = repo_dir.join(".dkod/pipeline.yaml");
        if yaml_path.exists() {
            info!("loading workflow from {}", yaml_path.display());
            let workflow = parse_yaml_workflow_file(&yaml_path).await?;
            // An explicit-but-empty pipeline is a configuration error: a stray
            // file must not silently disable verification via auto-approval.
            if workflow.stages.is_empty() {
                anyhow::bail!(
                    "pipeline.yaml exists but defines no stages — refusing to auto-approve; \
                    add at least one stage or remove the file to use auto-detection"
                );
            }
            return Ok(workflow);
        }

        // Legacy config is detected only to warn; it is deliberately not loaded.
        let legacy_toml = repo_dir.join(".dekode/pipeline.toml");
        if legacy_toml.exists() {
            tracing::warn!(
                path = %legacy_toml.display(),
                "found legacy .dekode/pipeline.toml \u{2014} this format is no longer loaded; please migrate to .dkod/pipeline.yaml"
            );
        }

        // NOTE(review): store errors are swallowed here (`unwrap_or_default`),
        // so a failing pipeline store silently falls through to auto-detection.
        let db_steps = self.engine
            .pipeline_store()
            .get_pipeline(repo_id)
            .await
            .unwrap_or_default();

        if !db_steps.is_empty() {
            info!(
                "loading workflow from DB pipeline ({} steps)",
                db_steps.len()
            );
            return Ok(db_pipeline_to_workflow(db_steps));
        }

        info!("auto-detecting verification workflow from project files");
        Ok(detect_workflow(repo_dir))
    }
}
187
188fn db_pipeline_to_workflow(steps: Vec<dk_engine::pipeline::PipelineStep>) -> Workflow {
189 let resolved_steps: Vec<Step> = steps
190 .into_iter()
191 .map(|s| {
192 let command = s
193 .config
194 .get("command")
195 .and_then(|v| v.as_str())
196 .unwrap_or("echo 'no command configured'")
197 .to_string();
198 let timeout_secs = s
199 .config
200 .get("timeout_secs")
201 .and_then(|v| v.as_u64())
202 .unwrap_or(120);
203
204 let step_type = match s.step_type.as_str() {
205 "agent-review" => StepType::AgentReview {
206 prompt: "Review this changeset".to_string(),
207 },
208 "human-approve" => StepType::HumanApprove,
209 _ => StepType::Command { run: command },
210 };
211
212 Step {
213 name: s.step_type.clone(),
214 step_type,
215 timeout: Duration::from_secs(timeout_secs),
216 required: s.required,
217 changeset_aware: false,
218 }
219 })
220 .collect();
221
222 Workflow {
223 name: "db-pipeline".to_string(),
224 timeout: Duration::from_secs(600),
225 stages: vec![Stage {
226 name: "pipeline".to_string(),
227 parallel: false,
228 steps: resolved_steps,
229 }],
230 allowed_commands: vec![],
231 }
232}
233
234fn detect_workflow(repo_dir: &Path) -> Workflow {
237 if repo_dir.join("Cargo.toml").exists() {
238 return Workflow {
239 name: "auto-rust".to_string(),
240 timeout: Duration::from_secs(180),
241 allowed_commands: vec![],
242 stages: vec![Stage {
243 name: "checks".to_string(),
244 parallel: false,
245 steps: vec![
246 Step {
247 name: "typecheck".to_string(),
248 step_type: StepType::Command {
249 run: "cargo check".to_string(),
250 },
251 timeout: Duration::from_secs(60),
252 required: true,
253 changeset_aware: true,
254 },
255 Step {
256 name: "test".to_string(),
257 step_type: StepType::Command {
258 run: "cargo test".to_string(),
259 },
260 timeout: Duration::from_secs(60),
261 required: true,
262 changeset_aware: true,
263 },
264 ],
265 }],
266 };
267 }
268
269 if repo_dir.join("package.json").exists() {
270 let is_bun = repo_dir.join("bun.lock").exists() || repo_dir.join("bun.lockb").exists();
271 let (name, install_cmd, test_cmd) = if is_bun {
272 ("auto-bun", "bun install --frozen-lockfile", "bun test")
273 } else {
274 ("auto-node", "npm ci", "npm test")
275 };
276 return Workflow {
277 name: name.to_string(),
278 timeout: Duration::from_secs(300),
279 allowed_commands: vec![],
280 stages: vec![Stage {
281 name: "checks".to_string(),
282 parallel: false,
283 steps: vec![
284 Step {
285 name: "install".to_string(),
286 step_type: StepType::Command {
287 run: install_cmd.to_string(),
288 },
289 timeout: Duration::from_secs(120),
290 required: true,
291 changeset_aware: false,
292 },
293 Step {
294 name: "test".to_string(),
295 step_type: StepType::Command {
296 run: test_cmd.to_string(),
297 },
298 timeout: Duration::from_secs(60),
299 required: true,
300 changeset_aware: true,
301 },
302 ],
303 }],
304 };
305 }
306
307 if repo_dir.join("pyproject.toml").exists() || repo_dir.join("requirements.txt").exists() {
308 let has_requirements = repo_dir.join("requirements.txt").exists();
309 let has_pyproject = repo_dir.join("pyproject.toml").exists();
310 let mut steps = Vec::new();
311 if has_pyproject {
312 steps.push(Step {
313 name: "install".to_string(),
314 step_type: StepType::Command {
315 run: "pip install -e .".to_string(),
316 },
317 timeout: Duration::from_secs(120),
318 required: true,
319 changeset_aware: false,
320 });
321 }
322 if has_requirements {
324 steps.push(Step {
325 name: "install-deps".to_string(),
326 step_type: StepType::Command {
327 run: "pip install -r requirements.txt".to_string(),
328 },
329 timeout: Duration::from_secs(120),
330 required: true,
331 changeset_aware: false,
332 });
333 }
334 steps.push(Step {
335 name: "test".to_string(),
336 step_type: StepType::Command {
337 run: "pytest".to_string(),
338 },
339 timeout: Duration::from_secs(60),
340 required: true,
341 changeset_aware: true,
342 });
343 return Workflow {
344 name: "auto-python".to_string(),
345 timeout: Duration::from_secs(420),
346 allowed_commands: vec![],
347 stages: vec![Stage {
348 name: "checks".to_string(),
349 parallel: false,
350 steps,
351 }],
352 };
353 }
354
355 if repo_dir.join("go.mod").exists() {
356 return Workflow {
357 name: "auto-go".to_string(),
358 timeout: Duration::from_secs(300),
359 allowed_commands: vec![],
360 stages: vec![Stage {
361 name: "checks".to_string(),
362 parallel: false,
363 steps: vec![
364 Step {
365 name: "build".to_string(),
366 step_type: StepType::Command {
367 run: "go build ./...".to_string(),
368 },
369 timeout: Duration::from_secs(60),
370 required: true,
371 changeset_aware: true,
372 },
373 Step {
374 name: "vet".to_string(),
375 step_type: StepType::Command {
376 run: "go vet ./...".to_string(),
377 },
378 timeout: Duration::from_secs(60),
379 required: true,
380 changeset_aware: true,
381 },
382 Step {
383 name: "test".to_string(),
384 step_type: StepType::Command {
385 run: "go test ./...".to_string(),
386 },
387 timeout: Duration::from_secs(60),
388 required: true,
389 changeset_aware: true,
390 },
391 ],
392 }],
393 };
394 }
395
396 Workflow {
398 name: "auto-none".to_string(),
399 timeout: Duration::from_secs(30),
400 allowed_commands: vec![],
401 stages: vec![],
402 }
403}
404
405async fn copy_dir_recursive(src: &Path, dst: &Path) -> Result<()> {
407 tokio::fs::create_dir_all(dst).await?;
408 let mut entries = tokio::fs::read_dir(src).await?;
409 while let Some(entry) = entries.next_entry().await? {
410 let file_name = entry.file_name();
411 if file_name == ".git" {
413 continue;
414 }
415 let src_path = entry.path();
416 let dst_path = dst.join(&file_name);
417 let file_type = entry.file_type().await?;
418 if file_type.is_dir() {
419 Box::pin(copy_dir_recursive(&src_path, &dst_path)).await?;
420 } else if file_type.is_symlink() {
421 let target = tokio::fs::read_link(&src_path).await?;
422 let target_str = target.to_string_lossy();
426 if target_str.starts_with('/') || target_str.contains("..") {
427 tracing::warn!(
428 src = %src_path.display(),
429 target = %target.display(),
430 "skipping symlink that points outside sandbox"
431 );
432 continue;
433 }
434 #[cfg(unix)]
435 tokio::fs::symlink(target, &dst_path).await?;
436 } else {
437 tokio::fs::copy(&src_path, &dst_path).await?;
438 }
439 }
440 Ok(())
441}
442
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the `run` strings of the `Command` steps in the workflow's
    /// first stage, in order. Replaces seven copy-pasted `filter_map` chains.
    fn command_runs(wf: &Workflow) -> Vec<&str> {
        wf.stages[0]
            .steps
            .iter()
            .filter_map(|s| match &s.step_type {
                StepType::Command { run } => Some(run.as_str()),
                _ => None,
            })
            .collect()
    }

    #[tokio::test]
    async fn test_detect_workflow_rust() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("Cargo.toml"), b"[package]\nname = \"test\"")
            .await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-rust");
        assert_eq!(wf.stages.len(), 1);
        assert_eq!(wf.stages[0].steps.len(), 2);
    }

    #[tokio::test]
    async fn test_detect_workflow_bun() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("package.json"), b"{}").await.unwrap();
        tokio::fs::write(dir.path().join("bun.lock"), b"").await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-bun");
        assert_eq!(wf.stages[0].steps.len(), 2);
        let cmds = command_runs(&wf);
        assert!(cmds.contains(&"bun install --frozen-lockfile"));
        assert!(cmds.contains(&"bun test"));
    }

    #[tokio::test]
    async fn test_detect_workflow_bun_lockb() {
        // The binary lockfile variant must also select the bun toolchain.
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("package.json"), b"{}").await.unwrap();
        tokio::fs::write(dir.path().join("bun.lockb"), b"\x00").await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-bun");
        assert_eq!(wf.stages[0].steps.len(), 2);
        let cmds = command_runs(&wf);
        assert!(cmds.contains(&"bun install --frozen-lockfile"));
        assert!(cmds.contains(&"bun test"));
    }

    #[tokio::test]
    async fn test_detect_workflow_npm() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("package.json"), b"{}").await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-node");
        assert_eq!(wf.stages[0].steps.len(), 2);
        let cmds = command_runs(&wf);
        assert!(cmds.contains(&"npm ci"));
        assert!(cmds.contains(&"npm test"));
    }

    #[tokio::test]
    async fn test_detect_workflow_python_pyproject() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("pyproject.toml"), b"[project]").await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-python");
        assert_eq!(wf.stages[0].steps.len(), 2);
        let cmds = command_runs(&wf);
        assert!(cmds.contains(&"pip install -e ."));
        assert!(cmds.contains(&"pytest"));
    }

    // Covers what used to be two byte-identical tests
    // (`python_dual_file` and `python_both_files`); the duplicate was removed.
    #[tokio::test]
    async fn test_detect_workflow_python_dual_file() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("pyproject.toml"), b"[project]").await.unwrap();
        tokio::fs::write(dir.path().join("requirements.txt"), b"pytest\nrequests").await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-python");
        assert_eq!(wf.stages[0].steps.len(), 3);
        let cmds = command_runs(&wf);
        assert!(cmds.contains(&"pip install -e ."));
        assert!(cmds.contains(&"pip install -r requirements.txt"));
        assert!(cmds.contains(&"pytest"));
    }

    #[tokio::test]
    async fn test_detect_workflow_python_requirements() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("requirements.txt"), b"pytest\nrequests").await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-python");
        assert_eq!(wf.stages[0].steps.len(), 2);
        let cmds = command_runs(&wf);
        assert!(cmds.contains(&"pip install -r requirements.txt"));
        assert!(cmds.contains(&"pytest"));
    }

    #[tokio::test]
    async fn test_detect_workflow_go() {
        let dir = tempfile::tempdir().unwrap();
        tokio::fs::write(dir.path().join("go.mod"), b"module example.com/test").await.unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-go");
        assert_eq!(wf.stages[0].steps.len(), 3);
    }

    #[tokio::test]
    async fn test_detect_workflow_unknown() {
        let dir = tempfile::tempdir().unwrap();
        let wf = detect_workflow(dir.path());
        assert_eq!(wf.name, "auto-none");
        assert!(wf.stages.is_empty());
    }

    #[tokio::test]
    async fn test_copy_dir_recursive_copies_files() {
        let src = tempfile::tempdir().unwrap();
        let dst = tempfile::tempdir().unwrap();

        tokio::fs::write(src.path().join("Cargo.toml"), b"[package]\nname = \"test\"")
            .await
            .unwrap();
        tokio::fs::create_dir_all(src.path().join("src")).await.unwrap();
        tokio::fs::write(src.path().join("src/main.rs"), b"fn main() {}")
            .await
            .unwrap();

        tokio::fs::create_dir_all(src.path().join(".git/objects")).await.unwrap();
        tokio::fs::write(src.path().join(".git/HEAD"), b"ref: refs/heads/main")
            .await
            .unwrap();

        copy_dir_recursive(src.path(), dst.path()).await.unwrap();

        assert!(dst.path().join("Cargo.toml").exists(), "Cargo.toml must be at dst root");
        assert!(dst.path().join("src/main.rs").exists(), "src/main.rs must exist");
        assert!(!dst.path().join(".git").exists(), ".git must be skipped");
    }

    #[tokio::test]
    async fn test_copy_dir_recursive_handles_symlinks() {
        let src = tempfile::tempdir().unwrap();
        let dst = tempfile::tempdir().unwrap();

        tokio::fs::write(src.path().join("real.txt"), b"hello").await.unwrap();
        #[cfg(unix)]
        tokio::fs::symlink("real.txt", src.path().join("link.txt")).await.unwrap();

        copy_dir_recursive(src.path(), dst.path()).await.unwrap();

        assert!(dst.path().join("real.txt").exists());
        #[cfg(unix)]
        {
            let meta = tokio::fs::symlink_metadata(dst.path().join("link.txt")).await.unwrap();
            assert!(meta.file_type().is_symlink(), "symlink should be preserved");
            let target = tokio::fs::read_link(dst.path().join("link.txt")).await.unwrap();
            assert_eq!(target.to_str().unwrap(), "real.txt");
        }
    }

    #[tokio::test]
    async fn test_copy_dir_recursive_handles_dir_symlinks() {
        let src = tempfile::tempdir().unwrap();
        let dst = tempfile::tempdir().unwrap();

        tokio::fs::create_dir_all(src.path().join("real_dir")).await.unwrap();
        tokio::fs::write(src.path().join("real_dir/file.txt"), b"content").await.unwrap();
        #[cfg(unix)]
        tokio::fs::symlink("real_dir", src.path().join("linked_dir")).await.unwrap();

        copy_dir_recursive(src.path(), dst.path()).await.unwrap();

        assert!(dst.path().join("real_dir/file.txt").exists());
        #[cfg(unix)]
        {
            let meta = tokio::fs::symlink_metadata(dst.path().join("linked_dir")).await.unwrap();
            assert!(meta.file_type().is_symlink(), "dir symlink should be preserved");
            let target = tokio::fs::read_link(dst.path().join("linked_dir")).await.unwrap();
            assert_eq!(target.to_str().unwrap(), "real_dir");
        }
    }

    #[test]
    fn test_db_pipeline_conversion() {
        let steps = vec![
            dk_engine::pipeline::PipelineStep {
                repo_id: Uuid::new_v4(),
                step_order: 1,
                step_type: "typecheck".to_string(),
                config: serde_json::json!({"command": "cargo check", "timeout_secs": 120}),
                required: true,
            },
            dk_engine::pipeline::PipelineStep {
                repo_id: Uuid::new_v4(),
                step_order: 2,
                step_type: "test".to_string(),
                config: serde_json::json!({"command": "cargo test", "timeout_secs": 300}),
                required: true,
            },
        ];
        let wf = db_pipeline_to_workflow(steps);
        assert_eq!(wf.stages.len(), 1);
        assert_eq!(wf.stages[0].steps.len(), 2);
    }
}