1use std::{fmt::Write as _, path::PathBuf, sync::Arc};
2
3use anyhow::{Context, Result};
4use rusqlite::Connection;
5use tokio::sync::Mutex;
6use tokio_util::sync::CancellationToken;
7use tracing::{debug, info, warn};
8
9use crate::{
10 agents::{
11 self, AgentContext, AgentInvocation, AgentRole, Complexity, InFlightIssue, PlannerOutput,
12 Severity, invoke_agent, parse_planner_output, parse_review_output,
13 },
14 config::Config,
15 db::{self, AgentRun, ReviewFinding, Run, RunStatus},
16 git,
17 github::{self, GhClient},
18 issues::{IssueOrigin, IssueProvider, PipelineIssue},
19 process::CommandRunner,
20};
21
/// Drives a single issue through the pipeline: worktree and draft-PR setup, the
/// implement / review / fix agent steps, an optional merge step, and final bookkeeping.
///
/// All fields are public so callers can assemble an executor directly.
pub struct PipelineExecutor<R: CommandRunner> {
    /// Command runner handed to `invoke_agent` for every agent invocation.
    pub runner: Arc<R>,
    /// GitHub client used to open draft PRs and to post PR comments.
    pub github: Arc<GhClient<R>>,
    /// Issue backend used for label transitions and for closing issues.
    pub issues: Arc<dyn IssueProvider>,
    /// Database connection behind an async mutex; it is locked around each DB call.
    pub db: Arc<Mutex<Connection>>,
    /// Configuration read for labels, pipeline limits, project commands and multi-repo settings.
    pub config: Config,
    /// Cancellation token polled between pipeline steps (see `check_cancelled`).
    pub cancel_token: CancellationToken,
    /// Default repository directory: the planner's working directory, and the target
    /// directory whenever no other repository is resolved for an issue.
    pub repo_dir: PathBuf,
}
32
33impl<R: CommandRunner + 'static> PipelineExecutor<R> {
34 pub async fn run_issue(&self, issue: &PipelineIssue, auto_merge: bool) -> Result<()> {
36 self.run_issue_with_complexity(issue, auto_merge, None).await
37 }
38
39 pub async fn run_issue_with_complexity(
41 &self,
42 issue: &PipelineIssue,
43 auto_merge: bool,
44 complexity: Option<Complexity>,
45 ) -> Result<()> {
46 let run_id = generate_run_id();
47
48 let (target_dir, is_multi_repo) = self.resolve_target_dir(issue.target_repo.as_ref())?;
50
51 let base_branch = git::default_branch(&target_dir).await?;
52
53 let mut run = new_run(&run_id, issue, auto_merge);
54 if let Some(ref c) = complexity {
55 run.complexity = c.to_string();
56 }
57 {
58 let conn = self.db.lock().await;
59 db::runs::insert_run(&conn, &run)?;
60 }
61
62 self.issues
63 .transition(issue.number, &self.config.labels.ready, &self.config.labels.cooking)
64 .await?;
65
66 let worktree = git::create_worktree(&target_dir, issue.number, &base_branch).await?;
67 self.record_worktree(&run_id, &worktree).await?;
68
69 git::empty_commit(
71 &worktree.path,
72 &format!("chore: start oven pipeline for issue #{}", issue.number),
73 )
74 .await?;
75
76 info!(
77 run_id = %run_id,
78 issue = issue.number,
79 branch = %worktree.branch,
80 target_repo = ?issue.target_repo,
81 "starting pipeline"
82 );
83
84 let pr_number = self.create_pr(&run_id, issue, &worktree.branch, &target_dir).await?;
85
86 let ctx = AgentContext {
87 issue_number: issue.number,
88 issue_title: issue.title.clone(),
89 issue_body: issue.body.clone(),
90 branch: worktree.branch.clone(),
91 pr_number: Some(pr_number),
92 test_command: self.config.project.test.clone(),
93 lint_command: self.config.project.lint.clone(),
94 review_findings: None,
95 cycle: 1,
96 target_repo: if is_multi_repo { issue.target_repo.clone() } else { None },
97 issue_source: issue.source.as_str().to_string(),
98 base_branch: base_branch.clone(),
99 };
100
101 let result = self.run_steps(&run_id, &ctx, &worktree.path, auto_merge, &base_branch).await;
102 self.finalize_run(&run_id, issue, pr_number, &result).await?;
103
104 if let Err(e) = git::remove_worktree(&target_dir, &worktree.path).await {
105 warn!(run_id = %run_id, error = %e, "failed to clean up worktree");
106 }
107
108 result
109 }
110
111 pub async fn plan_issues(
118 &self,
119 issues: &[PipelineIssue],
120 in_flight: &[InFlightIssue],
121 ) -> Option<PlannerOutput> {
122 let prompt = match agents::planner::build_prompt(issues, in_flight) {
123 Ok(p) => p,
124 Err(e) => {
125 warn!(error = %e, "planner prompt build failed");
126 return None;
127 }
128 };
129 let invocation = AgentInvocation {
130 role: AgentRole::Planner,
131 prompt,
132 working_dir: self.repo_dir.clone(),
133 max_turns: Some(self.config.pipeline.turn_limit),
134 };
135
136 match invoke_agent(self.runner.as_ref(), &invocation).await {
137 Ok(result) => {
138 debug!(output = %result.output, "raw planner output");
139 let parsed = parse_planner_output(&result.output);
140 if parsed.is_none() {
141 warn!(output = %result.output, "planner returned unparseable output, falling back to single batch");
142 }
143 parsed
144 }
145 Err(e) => {
146 warn!(error = %e, "planner agent failed, falling back to single batch");
147 None
148 }
149 }
150 }
151
152 fn resolve_target_dir(&self, target_repo: Option<&String>) -> Result<(PathBuf, bool)> {
157 if !self.config.multi_repo.enabled {
158 return Ok((self.repo_dir.clone(), false));
159 }
160 match target_repo {
161 Some(name) => {
162 let path = self.config.resolve_repo(name)?;
163 Ok((path, true))
164 }
165 None => Ok((self.repo_dir.clone(), false)),
166 }
167 }
168
169 async fn record_worktree(&self, run_id: &str, worktree: &git::Worktree) -> Result<()> {
170 let conn = self.db.lock().await;
171 db::runs::update_run_worktree(
172 &conn,
173 run_id,
174 &worktree.branch,
175 &worktree.path.to_string_lossy(),
176 )?;
177 drop(conn);
178 Ok(())
179 }
180
181 async fn create_pr(
182 &self,
183 run_id: &str,
184 issue: &PipelineIssue,
185 branch: &str,
186 repo_dir: &std::path::Path,
187 ) -> Result<u32> {
188 let (pr_title, pr_body) = match issue.source {
189 IssueOrigin::Github => (
190 format!("fix(#{}): {}", issue.number, issue.title),
191 format!(
192 "Resolves #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
193 issue.number
194 ),
195 ),
196 IssueOrigin::Local => (
197 format!("fix: {}", issue.title),
198 format!(
199 "From local issue #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
200 issue.number
201 ),
202 ),
203 };
204
205 git::push_branch(repo_dir, branch).await?;
206 let pr_number =
207 self.github.create_draft_pr_in(&pr_title, branch, &pr_body, repo_dir).await?;
208
209 {
210 let conn = self.db.lock().await;
211 db::runs::update_run_pr(&conn, run_id, pr_number)?;
212 }
213
214 info!(run_id = %run_id, pr = pr_number, "draft PR created");
215 Ok(pr_number)
216 }
217
218 async fn finalize_run(
219 &self,
220 run_id: &str,
221 issue: &PipelineIssue,
222 pr_number: u32,
223 result: &Result<()>,
224 ) -> Result<()> {
225 let (final_status, error_msg) = match result {
226 Ok(()) => {
227 self.issues
228 .transition(
229 issue.number,
230 &self.config.labels.cooking,
231 &self.config.labels.complete,
232 )
233 .await?;
234
235 let should_close =
239 issue.source == IssueOrigin::Local || issue.target_repo.is_some();
240
241 if should_close {
242 let comment = issue.target_repo.as_ref().map_or_else(
243 || format!("Implemented in #{pr_number}"),
244 |repo_name| format!("Implemented in {repo_name}#{pr_number}"),
245 );
246 if let Err(e) = self.issues.close(issue.number, Some(&comment)).await {
247 warn!(
248 run_id = %run_id,
249 error = %e,
250 "failed to close issue"
251 );
252 }
253 }
254
255 (RunStatus::Complete, None)
256 }
257 Err(e) => {
258 warn!(run_id = %run_id, error = %e, "pipeline failed");
259 github::safe_comment(&self.github, pr_number, &format!("Pipeline failed: {e:#}"))
260 .await;
261 let _ = self
262 .issues
263 .transition(
264 issue.number,
265 &self.config.labels.cooking,
266 &self.config.labels.failed,
267 )
268 .await;
269 (RunStatus::Failed, Some(format!("{e:#}")))
270 }
271 };
272
273 let conn = self.db.lock().await;
274 db::runs::finish_run(&conn, run_id, final_status, error_msg.as_deref())
275 }
276
277 async fn run_steps(
278 &self,
279 run_id: &str,
280 ctx: &AgentContext,
281 worktree_path: &std::path::Path,
282 auto_merge: bool,
283 base_branch: &str,
284 ) -> Result<()> {
285 self.check_cancelled()?;
286
287 self.update_status(run_id, RunStatus::Implementing).await?;
289 let impl_prompt = agents::implementer::build_prompt(ctx)?;
290 self.run_agent(run_id, AgentRole::Implementer, &impl_prompt, worktree_path, 1).await?;
291
292 git::push_branch(worktree_path, &ctx.branch).await?;
293
294 let clean = self.run_review_fix_loop(run_id, ctx, worktree_path).await?;
296
297 if !clean {
298 anyhow::bail!("unresolved findings after max review cycles");
299 }
300
301 self.check_cancelled()?;
303 info!(run_id = %run_id, base = base_branch, "rebasing onto base branch");
304 if let Err(e) = git::rebase_on_base(worktree_path, base_branch).await {
305 if let Some(pr_number) = ctx.pr_number {
306 github::safe_comment(
307 &self.github,
308 pr_number,
309 &format!(
310 "Pipeline stopped: {e}\n\nPlease rebase manually and re-run the pipeline."
311 ),
312 )
313 .await;
314 }
315 return Err(e);
316 }
317 git::force_push_branch(worktree_path, &ctx.branch).await?;
318
319 if auto_merge {
321 self.check_cancelled()?;
322 ctx.pr_number.context("no PR number for merge step")?;
323 self.update_status(run_id, RunStatus::Merging).await?;
324 let merge_prompt = agents::merger::build_prompt(ctx, auto_merge)?;
325 self.run_agent(run_id, AgentRole::Merger, &merge_prompt, worktree_path, 1).await?;
326 }
327
328 Ok(())
329 }
330
331 async fn run_review_fix_loop(
332 &self,
333 run_id: &str,
334 ctx: &AgentContext,
335 worktree_path: &std::path::Path,
336 ) -> Result<bool> {
337 for cycle in 1..=3 {
338 self.check_cancelled()?;
339
340 self.update_status(run_id, RunStatus::Reviewing).await?;
341 let review_prompt = agents::reviewer::build_prompt(ctx)?;
342 let review_result = self
343 .run_agent(run_id, AgentRole::Reviewer, &review_prompt, worktree_path, cycle)
344 .await?;
345
346 let review_output = match parse_review_output(&review_result.output) {
347 Ok(output) => output,
348 Err(e) => {
349 warn!(run_id = %run_id, cycle, error = %e, "review output unparseable, treating as failed review");
350 if let Some(pr_number) = ctx.pr_number {
351 github::safe_comment(
352 &self.github,
353 pr_number,
354 &format!("Review cycle {cycle} returned unparseable output. Stopping pipeline."),
355 )
356 .await;
357 }
358 anyhow::bail!("reviewer returned unparseable output in cycle {cycle}");
359 }
360 };
361 self.store_findings(run_id, &review_output.findings).await?;
362
363 let actionable: Vec<_> =
364 review_output.findings.iter().filter(|f| f.severity != Severity::Info).collect();
365
366 if actionable.is_empty() {
367 info!(run_id = %run_id, cycle, "review clean");
368 return Ok(true);
369 }
370
371 info!(run_id = %run_id, cycle, findings = actionable.len(), "review found issues");
372
373 if cycle == 3 {
374 if let Some(pr_number) = ctx.pr_number {
375 let comment = format_unresolved_comment(&actionable);
376 github::safe_comment(&self.github, pr_number, &comment).await;
377 } else {
378 warn!(run_id = %run_id, "no PR number, cannot post unresolved findings");
379 }
380 return Ok(false);
381 }
382
383 self.check_cancelled()?;
385 self.update_status(run_id, RunStatus::Fixing).await?;
386
387 let unresolved = {
388 let conn = self.db.lock().await;
389 db::agent_runs::get_unresolved_findings(&conn, run_id)?
390 };
391
392 let fix_prompt = agents::fixer::build_prompt(ctx, &unresolved)?;
393 self.run_agent(run_id, AgentRole::Fixer, &fix_prompt, worktree_path, cycle).await?;
394
395 git::push_branch(worktree_path, &ctx.branch).await?;
396 }
397
398 Ok(false)
399 }
400
401 async fn store_findings(&self, run_id: &str, findings: &[agents::Finding]) -> Result<()> {
402 let conn = self.db.lock().await;
403 let agent_runs = db::agent_runs::get_agent_runs_for_run(&conn, run_id)?;
404 let reviewer_run_id = agent_runs
405 .iter()
406 .rev()
407 .find_map(|ar| if ar.agent == "reviewer" { Some(ar.id) } else { None });
408 if let Some(ar_id) = reviewer_run_id {
409 for finding in findings {
410 let db_finding = ReviewFinding {
411 id: 0,
412 agent_run_id: ar_id,
413 severity: finding.severity.to_string(),
414 category: finding.category.clone(),
415 file_path: finding.file_path.clone(),
416 line_number: finding.line_number,
417 message: finding.message.clone(),
418 resolved: false,
419 };
420 db::agent_runs::insert_finding(&conn, &db_finding)?;
421 }
422 }
423 drop(conn);
424 Ok(())
425 }
426
427 async fn run_agent(
428 &self,
429 run_id: &str,
430 role: AgentRole,
431 prompt: &str,
432 working_dir: &std::path::Path,
433 cycle: u32,
434 ) -> Result<crate::process::AgentResult> {
435 let agent_run_id = self.record_agent_start(run_id, role, cycle).await?;
436
437 info!(run_id = %run_id, agent = %role, cycle, "agent starting");
438
439 let invocation = AgentInvocation {
440 role,
441 prompt: prompt.to_string(),
442 working_dir: working_dir.to_path_buf(),
443 max_turns: Some(self.config.pipeline.turn_limit),
444 };
445
446 let result = invoke_agent(self.runner.as_ref(), &invocation).await;
447
448 match &result {
449 Ok(agent_result) => {
450 self.record_agent_success(run_id, agent_run_id, agent_result).await?;
451 }
452 Err(e) => {
453 let conn = self.db.lock().await;
454 db::agent_runs::finish_agent_run(
455 &conn,
456 agent_run_id,
457 "failed",
458 0.0,
459 0,
460 None,
461 Some(&format!("{e:#}")),
462 None,
463 )?;
464 }
465 }
466
467 result
468 }
469
470 async fn record_agent_start(&self, run_id: &str, role: AgentRole, cycle: u32) -> Result<i64> {
471 let agent_run = AgentRun {
472 id: 0,
473 run_id: run_id.to_string(),
474 agent: role.to_string(),
475 cycle,
476 status: "running".to_string(),
477 cost_usd: 0.0,
478 turns: 0,
479 started_at: chrono::Utc::now().to_rfc3339(),
480 finished_at: None,
481 output_summary: None,
482 error_message: None,
483 raw_output: None,
484 };
485 let conn = self.db.lock().await;
486 db::agent_runs::insert_agent_run(&conn, &agent_run)
487 }
488
489 async fn record_agent_success(
490 &self,
491 run_id: &str,
492 agent_run_id: i64,
493 agent_result: &crate::process::AgentResult,
494 ) -> Result<()> {
495 let conn = self.db.lock().await;
496 db::agent_runs::finish_agent_run(
497 &conn,
498 agent_run_id,
499 "complete",
500 agent_result.cost_usd,
501 agent_result.turns,
502 Some(&truncate(&agent_result.output, 500)),
503 None,
504 Some(&agent_result.output),
505 )?;
506
507 let new_cost = db::runs::increment_run_cost(&conn, run_id, agent_result.cost_usd)?;
508 drop(conn);
509
510 if new_cost > self.config.pipeline.cost_budget {
511 anyhow::bail!(
512 "cost budget exceeded: ${:.2} > ${:.2}",
513 new_cost,
514 self.config.pipeline.cost_budget
515 );
516 }
517 Ok(())
518 }
519
520 async fn update_status(&self, run_id: &str, status: RunStatus) -> Result<()> {
521 let conn = self.db.lock().await;
522 db::runs::update_run_status(&conn, run_id, status)
523 }
524
525 fn check_cancelled(&self) -> Result<()> {
526 if self.cancel_token.is_cancelled() {
527 anyhow::bail!("pipeline cancelled");
528 }
529 Ok(())
530 }
531}
532
533fn format_unresolved_comment(actionable: &[&agents::Finding]) -> String {
534 let mut comment = String::from("## Unresolved findings after max review cycles\n\n");
535 for f in actionable {
536 let loc = match (&f.file_path, f.line_number) {
537 (Some(path), Some(line)) => format!(" at `{path}:{line}`"),
538 (Some(path), None) => format!(" in `{path}`"),
539 _ => String::new(),
540 };
541 let _ = writeln!(comment, "- **[{}]** {}{}: {}", f.severity, f.category, loc, f.message);
542 }
543 comment
544}
545
546fn new_run(run_id: &str, issue: &PipelineIssue, auto_merge: bool) -> Run {
547 Run {
548 id: run_id.to_string(),
549 issue_number: issue.number,
550 status: RunStatus::Pending,
551 pr_number: None,
552 branch: None,
553 worktree_path: None,
554 cost_usd: 0.0,
555 auto_merge,
556 started_at: chrono::Utc::now().to_rfc3339(),
557 finished_at: None,
558 error_message: None,
559 complexity: "full".to_string(),
560 issue_source: issue.source.to_string(),
561 }
562}
563
564pub fn generate_run_id() -> String {
566 uuid::Uuid::new_v4().to_string()[..8].to_string()
567}
568
/// Shortens `s` to at most `max_len` bytes without splitting a UTF-8 character.
///
/// Strings that already fit are returned unchanged. Longer strings are cut at the
/// nearest character boundary and suffixed with `...`; the ellipsis counts towards
/// `max_len`. When `max_len` is too small to hold the ellipsis (fewer than 3 bytes),
/// a bare prefix is returned instead, so the result never exceeds `max_len` bytes.
pub(crate) fn truncate(s: &str, max_len: usize) -> String {
    const ELLIPSIS: &str = "...";

    if s.len() <= max_len {
        return s.to_string();
    }

    // Reserve room for the ellipsis only when it actually fits in the budget.
    let (budget, suffix) = match max_len.checked_sub(ELLIPSIS.len()) {
        Some(room) => (room, ELLIPSIS),
        None => (max_len, ""),
    };

    // Back up to the closest character boundary; index 0 is always one.
    let mut end = budget;
    while end > 0 && !s.is_char_boundary(end) {
        end -= 1;
    }

    format!("{}{}", &s[..end], suffix)
}
584
#[cfg(test)]
mod tests {
    use proptest::prelude::*;

    use super::*;

    proptest! {
        // The seed is unused: the property only needs to be exercised many times.
        #[test]
        fn run_ids_always_8_hex_chars(_seed in any::<u64>()) {
            let id = generate_run_id();
            prop_assert_eq!(id.len(), 8);
            prop_assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
        }
    }

    #[test]
    fn run_id_is_8_hex_chars() {
        let id = generate_run_id();
        assert_eq!(id.len(), 8);
        assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn run_ids_are_unique() {
        let ids: Vec<_> = (0..100).map(|_| generate_run_id()).collect();
        let unique: std::collections::HashSet<_> = ids.iter().collect();
        assert_eq!(ids.len(), unique.len());
    }

    #[test]
    fn truncate_short_string() {
        assert_eq!(truncate("hello", 10), "hello");
    }

    #[test]
    fn truncate_long_string() {
        let long = "a".repeat(100);
        let result = truncate(&long, 10);
        // 7 characters of content plus the 3-byte ellipsis.
        assert_eq!(result.len(), 10);
        assert!(result.ends_with("..."));
    }

    #[test]
    fn truncate_multibyte_does_not_panic() {
        // Each emoji is 4 bytes (12 in total), so a limit of 8 forces a cut that would
        // land inside the second emoji without boundary handling.
        let s = "🔥🔥🔥";
        let result = truncate(s, 8);
        assert!(result.ends_with("..."));
        assert!(result.starts_with("🔥"));
        assert!(result.len() <= 8);
    }

    #[test]
    fn truncate_cjk_boundary() {
        // Each CJK character is 3 bytes (18 in total); a limit of 10 leaves 7 bytes of
        // content, which must be rounded down to the 6-byte boundary.
        let s = "你好世界测试";
        let result = truncate(s, 10);
        assert!(result.ends_with("..."));
        assert!(result.starts_with("你好"));
        assert!(result.len() <= 10);
    }

    #[test]
    fn format_unresolved_comment_includes_findings() {
        let findings = [
            agents::Finding {
                severity: Severity::Critical,
                category: "bug".to_string(),
                file_path: Some("src/main.rs".to_string()),
                line_number: Some(42),
                message: "null pointer".to_string(),
            },
            agents::Finding {
                severity: Severity::Warning,
                category: "style".to_string(),
                file_path: None,
                line_number: None,
                message: "missing docs".to_string(),
            },
        ];
        let refs: Vec<_> = findings.iter().collect();
        let comment = format_unresolved_comment(&refs);
        assert!(comment.contains("Unresolved findings"));
        assert!(comment.contains("null pointer"));
        assert!(comment.contains("`src/main.rs:42`"));
        assert!(comment.contains("missing docs"));
    }
}