1use std::{fmt::Write as _, path::PathBuf, sync::Arc};
2
3use anyhow::{Context, Result};
4use rusqlite::Connection;
5use tokio::sync::Mutex;
6use tokio_util::sync::CancellationToken;
7use tracing::{debug, info, warn};
8
9use crate::{
10 agents::{
11 self, AgentContext, AgentInvocation, AgentRole, Complexity, PlannerOutput, Severity,
12 invoke_agent, parse_planner_output, parse_review_output,
13 },
14 config::Config,
15 db::{self, AgentRun, ReviewFinding, Run, RunStatus},
16 git,
17 github::{self, GhClient},
18 issues::{IssueOrigin, IssueProvider, PipelineIssue},
19 process::CommandRunner,
20};
21
22pub struct PipelineExecutor<R: CommandRunner> {
24 pub runner: Arc<R>,
25 pub github: Arc<GhClient<R>>,
26 pub issues: Arc<dyn IssueProvider>,
27 pub db: Arc<Mutex<Connection>>,
28 pub config: Config,
29 pub cancel_token: CancellationToken,
30 pub repo_dir: PathBuf,
31}
32
33impl<R: CommandRunner + 'static> PipelineExecutor<R> {
34 pub async fn run_issue(&self, issue: &PipelineIssue, auto_merge: bool) -> Result<()> {
36 self.run_issue_with_complexity(issue, auto_merge, None).await
37 }
38
39 pub async fn run_issue_with_complexity(
41 &self,
42 issue: &PipelineIssue,
43 auto_merge: bool,
44 complexity: Option<Complexity>,
45 ) -> Result<()> {
46 let run_id = generate_run_id();
47
48 let (target_dir, is_multi_repo) = self.resolve_target_dir(issue.target_repo.as_ref())?;
50
51 let base_branch = git::default_branch(&target_dir).await?;
52
53 let mut run = new_run(&run_id, issue, auto_merge);
54 if let Some(ref c) = complexity {
55 run.complexity = c.to_string();
56 }
57 {
58 let conn = self.db.lock().await;
59 db::runs::insert_run(&conn, &run)?;
60 }
61
62 self.issues
63 .transition(issue.number, &self.config.labels.ready, &self.config.labels.cooking)
64 .await?;
65
66 let worktree = git::create_worktree(&target_dir, issue.number, &base_branch).await?;
67 self.record_worktree(&run_id, &worktree).await?;
68
69 git::empty_commit(
71 &worktree.path,
72 &format!("chore: start oven pipeline for issue #{}", issue.number),
73 )
74 .await?;
75
76 info!(
77 run_id = %run_id,
78 issue = issue.number,
79 branch = %worktree.branch,
80 target_repo = ?issue.target_repo,
81 "starting pipeline"
82 );
83
84 let pr_number = self.create_pr(&run_id, issue, &worktree.branch, &target_dir).await?;
85
86 let ctx = AgentContext {
87 issue_number: issue.number,
88 issue_title: issue.title.clone(),
89 issue_body: issue.body.clone(),
90 branch: worktree.branch.clone(),
91 pr_number: Some(pr_number),
92 test_command: self.config.project.test.clone(),
93 lint_command: self.config.project.lint.clone(),
94 review_findings: None,
95 cycle: 1,
96 target_repo: if is_multi_repo { issue.target_repo.clone() } else { None },
97 issue_source: issue.source.as_str().to_string(),
98 };
99
100 let result = self.run_steps(&run_id, &ctx, &worktree.path, auto_merge).await;
101 self.finalize_run(&run_id, issue, pr_number, &result).await?;
102
103 if let Err(e) = git::remove_worktree(&target_dir, &worktree.path).await {
104 warn!(run_id = %run_id, error = %e, "failed to clean up worktree");
105 }
106
107 result
108 }
109
110 pub async fn plan_issues(&self, issues: &[PipelineIssue]) -> Option<PlannerOutput> {
114 let prompt = match agents::planner::build_prompt(issues) {
115 Ok(p) => p,
116 Err(e) => {
117 warn!(error = %e, "planner prompt build failed");
118 return None;
119 }
120 };
121 let invocation = AgentInvocation {
122 role: AgentRole::Planner,
123 prompt,
124 working_dir: self.repo_dir.clone(),
125 max_turns: Some(self.config.pipeline.turn_limit),
126 };
127
128 match invoke_agent(self.runner.as_ref(), &invocation).await {
129 Ok(result) => {
130 debug!(output = %result.output, "raw planner output");
131 let parsed = parse_planner_output(&result.output);
132 if parsed.is_none() {
133 warn!(output = %result.output, "planner returned unparseable output, falling back to single batch");
134 }
135 parsed
136 }
137 Err(e) => {
138 warn!(error = %e, "planner agent failed, falling back to single batch");
139 None
140 }
141 }
142 }
143
144 fn resolve_target_dir(&self, target_repo: Option<&String>) -> Result<(PathBuf, bool)> {
149 if !self.config.multi_repo.enabled {
150 return Ok((self.repo_dir.clone(), false));
151 }
152 match target_repo {
153 Some(name) => {
154 let path = self.config.resolve_repo(name)?;
155 Ok((path, true))
156 }
157 None => Ok((self.repo_dir.clone(), false)),
158 }
159 }
160
161 async fn record_worktree(&self, run_id: &str, worktree: &git::Worktree) -> Result<()> {
162 let conn = self.db.lock().await;
163 db::runs::update_run_worktree(
164 &conn,
165 run_id,
166 &worktree.branch,
167 &worktree.path.to_string_lossy(),
168 )?;
169 drop(conn);
170 Ok(())
171 }
172
173 async fn create_pr(
174 &self,
175 run_id: &str,
176 issue: &PipelineIssue,
177 branch: &str,
178 repo_dir: &std::path::Path,
179 ) -> Result<u32> {
180 let (pr_title, pr_body) = match issue.source {
181 IssueOrigin::Github => (
182 format!("fix(#{}): {}", issue.number, issue.title),
183 format!(
184 "Resolves #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
185 issue.number
186 ),
187 ),
188 IssueOrigin::Local => (
189 format!("fix: {}", issue.title),
190 format!(
191 "From local issue #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
192 issue.number
193 ),
194 ),
195 };
196
197 git::push_branch(repo_dir, branch).await?;
198 let pr_number =
199 self.github.create_draft_pr_in(&pr_title, branch, &pr_body, repo_dir).await?;
200
201 {
202 let conn = self.db.lock().await;
203 db::runs::update_run_pr(&conn, run_id, pr_number)?;
204 }
205
206 info!(run_id = %run_id, pr = pr_number, "draft PR created");
207 Ok(pr_number)
208 }
209
210 async fn finalize_run(
211 &self,
212 run_id: &str,
213 issue: &PipelineIssue,
214 pr_number: u32,
215 result: &Result<()>,
216 ) -> Result<()> {
217 let (final_status, error_msg) = match result {
218 Ok(()) => {
219 self.issues
220 .transition(
221 issue.number,
222 &self.config.labels.cooking,
223 &self.config.labels.complete,
224 )
225 .await?;
226
227 let should_close =
231 issue.source == IssueOrigin::Local || issue.target_repo.is_some();
232
233 if should_close {
234 let comment = issue.target_repo.as_ref().map_or_else(
235 || format!("Implemented in #{pr_number}"),
236 |repo_name| format!("Implemented in {repo_name}#{pr_number}"),
237 );
238 if let Err(e) = self.issues.close(issue.number, Some(&comment)).await {
239 warn!(
240 run_id = %run_id,
241 error = %e,
242 "failed to close issue"
243 );
244 }
245 }
246
247 (RunStatus::Complete, None)
248 }
249 Err(e) => {
250 warn!(run_id = %run_id, error = %e, "pipeline failed");
251 github::safe_comment(&self.github, pr_number, &format!("Pipeline failed: {e:#}"))
252 .await;
253 let _ = self
254 .issues
255 .transition(
256 issue.number,
257 &self.config.labels.cooking,
258 &self.config.labels.failed,
259 )
260 .await;
261 (RunStatus::Failed, Some(format!("{e:#}")))
262 }
263 };
264
265 let conn = self.db.lock().await;
266 db::runs::finish_run(&conn, run_id, final_status, error_msg.as_deref())
267 }
268
269 async fn run_steps(
270 &self,
271 run_id: &str,
272 ctx: &AgentContext,
273 worktree_path: &std::path::Path,
274 auto_merge: bool,
275 ) -> Result<()> {
276 self.check_cancelled()?;
277
278 self.update_status(run_id, RunStatus::Implementing).await?;
280 let impl_prompt = agents::implementer::build_prompt(ctx)?;
281 self.run_agent(run_id, AgentRole::Implementer, &impl_prompt, worktree_path, 1).await?;
282
283 git::push_branch(worktree_path, &ctx.branch).await?;
284
285 let clean = self.run_review_fix_loop(run_id, ctx, worktree_path).await?;
287
288 if !clean {
289 anyhow::bail!("unresolved findings after max review cycles");
290 }
291
292 self.check_cancelled()?;
294 ctx.pr_number.context("no PR number for merge step")?;
295 self.update_status(run_id, RunStatus::Merging).await?;
296 let merge_prompt = agents::merger::build_prompt(ctx, auto_merge)?;
297 self.run_agent(run_id, AgentRole::Merger, &merge_prompt, worktree_path, 1).await?;
298
299 Ok(())
300 }
301
302 async fn run_review_fix_loop(
303 &self,
304 run_id: &str,
305 ctx: &AgentContext,
306 worktree_path: &std::path::Path,
307 ) -> Result<bool> {
308 for cycle in 1..=2 {
309 self.check_cancelled()?;
310
311 self.update_status(run_id, RunStatus::Reviewing).await?;
312 let review_prompt = agents::reviewer::build_prompt(ctx)?;
313 let review_result = self
314 .run_agent(run_id, AgentRole::Reviewer, &review_prompt, worktree_path, cycle)
315 .await?;
316
317 let review_output = match parse_review_output(&review_result.output) {
318 Ok(output) => output,
319 Err(e) => {
320 warn!(run_id = %run_id, cycle, error = %e, "review output unparseable, treating as failed review");
321 if let Some(pr_number) = ctx.pr_number {
322 github::safe_comment(
323 &self.github,
324 pr_number,
325 &format!("Review cycle {cycle} returned unparseable output. Stopping pipeline."),
326 )
327 .await;
328 }
329 anyhow::bail!("reviewer returned unparseable output in cycle {cycle}");
330 }
331 };
332 self.store_findings(run_id, &review_output.findings).await?;
333
334 let actionable: Vec<_> =
335 review_output.findings.iter().filter(|f| f.severity != Severity::Info).collect();
336
337 if actionable.is_empty() {
338 info!(run_id = %run_id, cycle, "review clean");
339 return Ok(true);
340 }
341
342 info!(run_id = %run_id, cycle, findings = actionable.len(), "review found issues");
343
344 if cycle == 2 {
345 if let Some(pr_number) = ctx.pr_number {
346 let comment = format_unresolved_comment(&actionable);
347 github::safe_comment(&self.github, pr_number, &comment).await;
348 } else {
349 warn!(run_id = %run_id, "no PR number, cannot post unresolved findings");
350 }
351 return Ok(false);
352 }
353
354 self.check_cancelled()?;
356 self.update_status(run_id, RunStatus::Fixing).await?;
357
358 let unresolved = {
359 let conn = self.db.lock().await;
360 db::agent_runs::get_unresolved_findings(&conn, run_id)?
361 };
362
363 let fix_prompt = agents::fixer::build_prompt(ctx, &unresolved)?;
364 self.run_agent(run_id, AgentRole::Fixer, &fix_prompt, worktree_path, cycle).await?;
365
366 git::push_branch(worktree_path, &ctx.branch).await?;
367 }
368
369 Ok(false)
370 }
371
372 async fn store_findings(&self, run_id: &str, findings: &[agents::Finding]) -> Result<()> {
373 let conn = self.db.lock().await;
374 let agent_runs = db::agent_runs::get_agent_runs_for_run(&conn, run_id)?;
375 let reviewer_run_id = agent_runs
376 .iter()
377 .rev()
378 .find_map(|ar| if ar.agent == "reviewer" { Some(ar.id) } else { None });
379 if let Some(ar_id) = reviewer_run_id {
380 for finding in findings {
381 let db_finding = ReviewFinding {
382 id: 0,
383 agent_run_id: ar_id,
384 severity: finding.severity.to_string(),
385 category: finding.category.clone(),
386 file_path: finding.file_path.clone(),
387 line_number: finding.line_number,
388 message: finding.message.clone(),
389 resolved: false,
390 };
391 db::agent_runs::insert_finding(&conn, &db_finding)?;
392 }
393 }
394 drop(conn);
395 Ok(())
396 }
397
398 async fn run_agent(
399 &self,
400 run_id: &str,
401 role: AgentRole,
402 prompt: &str,
403 working_dir: &std::path::Path,
404 cycle: u32,
405 ) -> Result<crate::process::AgentResult> {
406 let agent_run_id = self.record_agent_start(run_id, role, cycle).await?;
407
408 info!(run_id = %run_id, agent = %role, cycle, "agent starting");
409
410 let invocation = AgentInvocation {
411 role,
412 prompt: prompt.to_string(),
413 working_dir: working_dir.to_path_buf(),
414 max_turns: Some(self.config.pipeline.turn_limit),
415 };
416
417 let result = invoke_agent(self.runner.as_ref(), &invocation).await;
418
419 match &result {
420 Ok(agent_result) => {
421 self.record_agent_success(run_id, agent_run_id, agent_result).await?;
422 }
423 Err(e) => {
424 let conn = self.db.lock().await;
425 db::agent_runs::finish_agent_run(
426 &conn,
427 agent_run_id,
428 "failed",
429 0.0,
430 0,
431 None,
432 Some(&format!("{e:#}")),
433 None,
434 )?;
435 }
436 }
437
438 result
439 }
440
441 async fn record_agent_start(&self, run_id: &str, role: AgentRole, cycle: u32) -> Result<i64> {
442 let agent_run = AgentRun {
443 id: 0,
444 run_id: run_id.to_string(),
445 agent: role.to_string(),
446 cycle,
447 status: "running".to_string(),
448 cost_usd: 0.0,
449 turns: 0,
450 started_at: chrono::Utc::now().to_rfc3339(),
451 finished_at: None,
452 output_summary: None,
453 error_message: None,
454 raw_output: None,
455 };
456 let conn = self.db.lock().await;
457 db::agent_runs::insert_agent_run(&conn, &agent_run)
458 }
459
460 async fn record_agent_success(
461 &self,
462 run_id: &str,
463 agent_run_id: i64,
464 agent_result: &crate::process::AgentResult,
465 ) -> Result<()> {
466 let conn = self.db.lock().await;
467 db::agent_runs::finish_agent_run(
468 &conn,
469 agent_run_id,
470 "complete",
471 agent_result.cost_usd,
472 agent_result.turns,
473 Some(&truncate(&agent_result.output, 500)),
474 None,
475 Some(&agent_result.output),
476 )?;
477
478 let new_cost = db::runs::increment_run_cost(&conn, run_id, agent_result.cost_usd)?;
479 drop(conn);
480
481 if new_cost > self.config.pipeline.cost_budget {
482 anyhow::bail!(
483 "cost budget exceeded: ${:.2} > ${:.2}",
484 new_cost,
485 self.config.pipeline.cost_budget
486 );
487 }
488 Ok(())
489 }
490
491 async fn update_status(&self, run_id: &str, status: RunStatus) -> Result<()> {
492 let conn = self.db.lock().await;
493 db::runs::update_run_status(&conn, run_id, status)
494 }
495
496 fn check_cancelled(&self) -> Result<()> {
497 if self.cancel_token.is_cancelled() {
498 anyhow::bail!("pipeline cancelled");
499 }
500 Ok(())
501 }
502}
503
504fn format_unresolved_comment(actionable: &[&agents::Finding]) -> String {
505 let mut comment = String::from("## Unresolved findings after 2 review cycles\n\n");
506 for f in actionable {
507 let loc = match (&f.file_path, f.line_number) {
508 (Some(path), Some(line)) => format!(" at `{path}:{line}`"),
509 (Some(path), None) => format!(" in `{path}`"),
510 _ => String::new(),
511 };
512 let _ = writeln!(comment, "- **[{}]** {}{}: {}", f.severity, f.category, loc, f.message);
513 }
514 comment
515}
516
517fn new_run(run_id: &str, issue: &PipelineIssue, auto_merge: bool) -> Run {
518 Run {
519 id: run_id.to_string(),
520 issue_number: issue.number,
521 status: RunStatus::Pending,
522 pr_number: None,
523 branch: None,
524 worktree_path: None,
525 cost_usd: 0.0,
526 auto_merge,
527 started_at: chrono::Utc::now().to_rfc3339(),
528 finished_at: None,
529 error_message: None,
530 complexity: "full".to_string(),
531 issue_source: issue.source.to_string(),
532 }
533}
534
535pub fn generate_run_id() -> String {
537 uuid::Uuid::new_v4().to_string()[..8].to_string()
538}
539
540pub(crate) fn truncate(s: &str, max_len: usize) -> String {
545 if s.len() <= max_len {
546 return s.to_string();
547 }
548 let target = max_len.saturating_sub(3);
549 let mut end = target;
550 while end > 0 && !s.is_char_boundary(end) {
551 end -= 1;
552 }
553 format!("{}...", &s[..end])
554}
555
556#[cfg(test)]
557mod tests {
558 use proptest::prelude::*;
559
560 use super::*;
561
562 proptest! {
563 #[test]
564 fn run_ids_always_8_hex_chars(_seed in any::<u64>()) {
565 let id = generate_run_id();
566 prop_assert_eq!(id.len(), 8);
567 prop_assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
568 }
569 }
570
571 #[test]
572 fn run_id_is_8_hex_chars() {
573 let id = generate_run_id();
574 assert_eq!(id.len(), 8);
575 assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
576 }
577
578 #[test]
579 fn run_ids_are_unique() {
580 let ids: Vec<_> = (0..100).map(|_| generate_run_id()).collect();
581 let unique: std::collections::HashSet<_> = ids.iter().collect();
582 assert_eq!(ids.len(), unique.len());
583 }
584
585 #[test]
586 fn truncate_short_string() {
587 assert_eq!(truncate("hello", 10), "hello");
588 }
589
590 #[test]
591 fn truncate_long_string() {
592 let long = "a".repeat(100);
593 let result = truncate(&long, 10);
594 assert_eq!(result.len(), 10); assert!(result.ends_with("..."));
596 }
597
598 #[test]
599 fn truncate_multibyte_does_not_panic() {
600 let s = "πππ";
603 let result = truncate(s, 8);
604 assert!(result.ends_with("..."));
605 assert!(result.starts_with("π"));
606 assert!(result.len() <= 8);
607 }
608
609 #[test]
610 fn truncate_cjk_boundary() {
611 let s = "δ½ ε₯½δΈηζ΅θ―"; let result = truncate(s, 10);
615 assert!(result.ends_with("..."));
616 assert!(result.starts_with("δ½ ε₯½"));
617 assert!(result.len() <= 10);
618 }
619
620 #[test]
621 fn format_unresolved_comment_includes_findings() {
622 let findings = [
623 agents::Finding {
624 severity: Severity::Critical,
625 category: "bug".to_string(),
626 file_path: Some("src/main.rs".to_string()),
627 line_number: Some(42),
628 message: "null pointer".to_string(),
629 },
630 agents::Finding {
631 severity: Severity::Warning,
632 category: "style".to_string(),
633 file_path: None,
634 line_number: None,
635 message: "missing docs".to_string(),
636 },
637 ];
638 let refs: Vec<_> = findings.iter().collect();
639 let comment = format_unresolved_comment(&refs);
640 assert!(comment.contains("Unresolved findings"));
641 assert!(comment.contains("null pointer"));
642 assert!(comment.contains("`src/main.rs:42`"));
643 assert!(comment.contains("missing docs"));
644 }
645}