Skip to main content

oven_cli/pipeline/
executor.rs

1use std::{fmt::Write as _, path::PathBuf, sync::Arc};
2
3use anyhow::{Context, Result};
4use rusqlite::Connection;
5use tokio::sync::Mutex;
6use tokio_util::sync::CancellationToken;
7use tracing::{debug, info, warn};
8
9use crate::{
10    agents::{
11        self, AgentContext, AgentInvocation, AgentRole, Complexity, GraphContextNode,
12        PlannerGraphOutput, Severity, invoke_agent, parse_fixer_output, parse_planner_graph_output,
13        parse_review_output,
14    },
15    config::Config,
16    db::{self, AgentRun, ReviewFinding, Run, RunStatus},
17    git::{self, RebaseOutcome},
18    github::{self, GhClient},
19    issues::{IssueOrigin, IssueProvider, PipelineIssue},
20    process::{self, CommandRunner},
21};
22
/// What a pipeline run produced for an issue, captured before the merge happens.
#[derive(Debug)]
pub struct PipelineOutcome {
    /// Identifier of the run this outcome belongs to.
    pub run_id: String,
    /// Number of the pull request opened for the issue.
    pub pr_number: u32,
    /// Name of the branch; kept because it is needed for cleanup once the worktree is gone.
    pub branch: Option<String>,
    /// Path of the worktree, kept so the caller can clean it up after the merge.
    pub worktree_path: PathBuf,
    /// Repository directory that owns the worktree (required by `git::remove_worktree`).
    pub target_dir: PathBuf,
}
35
36/// Runs a single issue through the full pipeline.
37pub struct PipelineExecutor<R: CommandRunner> {
38    pub runner: Arc<R>,
39    pub github: Arc<GhClient<R>>,
40    pub issues: Arc<dyn IssueProvider>,
41    pub db: Arc<Mutex<Connection>>,
42    pub config: Config,
43    pub cancel_token: CancellationToken,
44    pub repo_dir: PathBuf,
45}
46
47impl<R: CommandRunner + 'static> PipelineExecutor<R> {
48    /// Run the full pipeline for a single issue.
49    pub async fn run_issue(&self, issue: &PipelineIssue, auto_merge: bool) -> Result<()> {
50        self.run_issue_with_complexity(issue, auto_merge, None).await
51    }
52
53    /// Run the full pipeline for a single issue with an optional complexity classification.
54    pub async fn run_issue_with_complexity(
55        &self,
56        issue: &PipelineIssue,
57        auto_merge: bool,
58        complexity: Option<Complexity>,
59    ) -> Result<()> {
60        let outcome = self.run_issue_pipeline(issue, auto_merge, complexity).await?;
61        self.finalize_merge(&outcome, issue).await
62    }
63
64    /// Run the pipeline up to (but not including) finalization.
65    ///
66    /// Returns a `PipelineOutcome` with the run ID and PR number.
67    /// The caller is responsible for calling `finalize_run` or `finalize_merge`
68    /// at the appropriate time (e.g., after the PR is actually merged).
69    pub async fn run_issue_pipeline(
70        &self,
71        issue: &PipelineIssue,
72        auto_merge: bool,
73        complexity: Option<Complexity>,
74    ) -> Result<PipelineOutcome> {
75        let run_id = generate_run_id();
76
77        // Determine target repo for worktrees and PRs (multi-repo routing)
78        let (target_dir, is_multi_repo) = self.resolve_target_dir(issue.target_repo.as_ref())?;
79
80        let base_branch = git::default_branch(&target_dir).await?;
81
82        let mut run = new_run(&run_id, issue, auto_merge);
83        if let Some(ref c) = complexity {
84            run.complexity = c.to_string();
85        }
86        {
87            let conn = self.db.lock().await;
88            db::runs::insert_run(&conn, &run)?;
89        }
90
91        self.issues
92            .transition(issue.number, &self.config.labels.ready, &self.config.labels.cooking)
93            .await?;
94
95        let worktree = git::create_worktree(&target_dir, issue.number, &base_branch).await?;
96        self.record_worktree(&run_id, &worktree).await?;
97
98        // Seed branch with an empty commit so GitHub accepts the draft PR
99        git::empty_commit(
100            &worktree.path,
101            &format!("chore: start oven pipeline for issue #{}", issue.number),
102        )
103        .await?;
104
105        info!(
106            run_id = %run_id,
107            issue = issue.number,
108            branch = %worktree.branch,
109            target_repo = ?issue.target_repo,
110            "starting pipeline"
111        );
112
113        let pr_number = self.create_pr(&run_id, issue, &worktree.branch, &target_dir).await?;
114
115        let ctx = AgentContext {
116            issue_number: issue.number,
117            issue_title: issue.title.clone(),
118            issue_body: issue.body.clone(),
119            branch: worktree.branch.clone(),
120            pr_number: Some(pr_number),
121            test_command: self.config.project.test.clone(),
122            lint_command: self.config.project.lint.clone(),
123            review_findings: None,
124            cycle: 1,
125            target_repo: if is_multi_repo { issue.target_repo.clone() } else { None },
126            issue_source: issue.source.as_str().to_string(),
127            base_branch: base_branch.clone(),
128        };
129
130        let result = self.run_steps(&run_id, &ctx, &worktree.path, auto_merge, &target_dir).await;
131
132        if let Err(ref e) = result {
133            // On failure, finalize immediately (no merge to wait for).
134            // Worktree is intentionally preserved so uncommitted work is not lost.
135            // Use `oven clean` to remove worktrees manually.
136            self.finalize_run(&run_id, issue, pr_number, &result, &target_dir).await?;
137            return Err(anyhow::anyhow!("{e:#}"));
138        }
139
140        // Update status to AwaitingMerge
141        self.update_status(&run_id, RunStatus::AwaitingMerge).await?;
142
143        Ok(PipelineOutcome {
144            run_id,
145            pr_number,
146            branch: Some(worktree.branch),
147            worktree_path: worktree.path,
148            target_dir,
149        })
150    }
151
152    /// Finalize a run after its PR has been merged.
153    ///
154    /// Transitions labels, closes issues, marks the run as complete, cleans
155    /// up the worktree, and deletes the local branch. The worktree must be
156    /// removed before the branch because git refuses to delete a branch that
157    /// is checked out in a worktree.
158    pub async fn finalize_merge(
159        &self,
160        outcome: &PipelineOutcome,
161        issue: &PipelineIssue,
162    ) -> Result<()> {
163        self.finalize_run(&outcome.run_id, issue, outcome.pr_number, &Ok(()), &outcome.target_dir)
164            .await?;
165        if let Err(e) = git::remove_worktree(&outcome.target_dir, &outcome.worktree_path).await {
166            warn!(
167                run_id = %outcome.run_id,
168                error = %e,
169                "failed to clean up worktree after merge"
170            );
171        }
172        if let Some(ref branch) = outcome.branch {
173            if let Err(e) = git::delete_branch(&outcome.target_dir, branch).await {
174                warn!(
175                    run_id = %outcome.run_id,
176                    branch = %branch,
177                    error = %e,
178                    "failed to delete local branch after merge"
179                );
180            }
181        }
182        Ok(())
183    }
184
185    /// Invoke the planner agent to decide dependency ordering for a set of issues.
186    ///
187    /// `graph_context` describes the current dependency graph state so the planner
188    /// can avoid scheduling conflicting work alongside in-flight issues.
189    ///
190    /// Returns `None` if the planner fails or returns unparseable output (fallback to default).
191    pub async fn plan_issues(
192        &self,
193        issues: &[PipelineIssue],
194        graph_context: &[GraphContextNode],
195    ) -> Option<PlannerGraphOutput> {
196        let prompt = match agents::planner::build_prompt(issues, graph_context) {
197            Ok(p) => p,
198            Err(e) => {
199                warn!(error = %e, "planner prompt build failed");
200                return None;
201            }
202        };
203        let invocation = AgentInvocation {
204            role: AgentRole::Planner,
205            prompt,
206            working_dir: self.repo_dir.clone(),
207            max_turns: Some(self.config.pipeline.turn_limit),
208            model: self.config.models.model_for(AgentRole::Planner.as_str()).map(String::from),
209        };
210
211        match invoke_agent(self.runner.as_ref(), &invocation).await {
212            Ok(result) => {
213                debug!(output = %result.output, "raw planner output");
214                let parsed = parse_planner_graph_output(&result.output);
215                if parsed.is_none() {
216                    warn!(output = %result.output, "planner returned unparseable output, falling back to all-parallel");
217                }
218                parsed
219            }
220            Err(e) => {
221                warn!(error = %e, "planner agent failed, falling back to all-parallel");
222                None
223            }
224        }
225    }
226
227    /// Determine the effective repo directory for worktrees and PRs.
228    ///
229    /// Returns `(target_dir, is_multi_repo)`. When multi-repo is disabled or no target
230    /// is specified, falls back to `self.repo_dir`.
231    pub(crate) fn resolve_target_dir(
232        &self,
233        target_repo: Option<&String>,
234    ) -> Result<(PathBuf, bool)> {
235        if !self.config.multi_repo.enabled {
236            return Ok((self.repo_dir.clone(), false));
237        }
238        match target_repo {
239            Some(name) => {
240                let path = self.config.resolve_repo(name)?;
241                Ok((path, true))
242            }
243            None => Ok((self.repo_dir.clone(), false)),
244        }
245    }
246
247    /// Reconstruct a `PipelineOutcome` from graph node data (for merge polling).
248    ///
249    /// Worktree paths are deterministic, so we can rebuild the outcome from
250    /// the issue metadata stored on the graph node. The branch name is looked
251    /// up from the runs table (it was recorded when the worktree was created).
252    pub async fn reconstruct_outcome(
253        &self,
254        issue: &PipelineIssue,
255        run_id: &str,
256        pr_number: u32,
257    ) -> Result<PipelineOutcome> {
258        let (target_dir, _) = self.resolve_target_dir(issue.target_repo.as_ref())?;
259        let worktree_path =
260            target_dir.join(".oven").join("worktrees").join(format!("issue-{}", issue.number));
261
262        let branch = {
263            let conn = self.db.lock().await;
264            db::runs::get_run(&conn, run_id)?.and_then(|r| r.branch)
265        };
266
267        Ok(PipelineOutcome {
268            run_id: run_id.to_string(),
269            pr_number,
270            branch,
271            worktree_path,
272            target_dir,
273        })
274    }
275
276    async fn record_worktree(&self, run_id: &str, worktree: &git::Worktree) -> Result<()> {
277        let conn = self.db.lock().await;
278        db::runs::update_run_worktree(
279            &conn,
280            run_id,
281            &worktree.branch,
282            &worktree.path.to_string_lossy(),
283        )?;
284        drop(conn);
285        Ok(())
286    }
287
288    async fn create_pr(
289        &self,
290        run_id: &str,
291        issue: &PipelineIssue,
292        branch: &str,
293        repo_dir: &std::path::Path,
294    ) -> Result<u32> {
295        let (pr_title, pr_body) = match issue.source {
296            IssueOrigin::Github => (
297                format!("fix(#{}): {}", issue.number, issue.title),
298                format!(
299                    "Resolves #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
300                    issue.number
301                ),
302            ),
303            IssueOrigin::Local => (
304                format!("fix: {}", issue.title),
305                format!(
306                    "From local issue #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
307                    issue.number
308                ),
309            ),
310        };
311
312        git::push_branch(repo_dir, branch).await?;
313        let pr_number =
314            self.github.create_draft_pr_in(&pr_title, branch, &pr_body, repo_dir).await?;
315
316        {
317            let conn = self.db.lock().await;
318            db::runs::update_run_pr(&conn, run_id, pr_number)?;
319        }
320
321        info!(run_id = %run_id, pr = pr_number, "draft PR created");
322        Ok(pr_number)
323    }
324
325    async fn finalize_run(
326        &self,
327        run_id: &str,
328        issue: &PipelineIssue,
329        pr_number: u32,
330        result: &Result<()>,
331        target_dir: &std::path::Path,
332    ) -> Result<()> {
333        let (final_status, error_msg) = match result {
334            Ok(()) => {
335                self.issues
336                    .transition(
337                        issue.number,
338                        &self.config.labels.cooking,
339                        &self.config.labels.complete,
340                    )
341                    .await?;
342
343                // Close the issue for local and multi-repo cases. Single-repo
344                // GitHub issues are closed directly in the merge step (run_steps)
345                // because they share the same gh context.
346                let should_close =
347                    issue.source == IssueOrigin::Local || issue.target_repo.is_some();
348
349                if should_close {
350                    let comment = issue.target_repo.as_ref().map_or_else(
351                        || format!("Implemented in #{pr_number}"),
352                        |repo_name| format!("Implemented in {repo_name}#{pr_number}"),
353                    );
354                    if let Err(e) = self.issues.close(issue.number, Some(&comment)).await {
355                        warn!(
356                            run_id = %run_id,
357                            error = %e,
358                            "failed to close issue"
359                        );
360                    }
361                }
362
363                (RunStatus::Complete, None)
364            }
365            Err(e) => {
366                warn!(run_id = %run_id, error = %e, "pipeline failed");
367                github::safe_comment(
368                    &self.github,
369                    pr_number,
370                    &format_pipeline_failure(e),
371                    target_dir,
372                )
373                .await;
374                let _ = self
375                    .issues
376                    .transition(
377                        issue.number,
378                        &self.config.labels.cooking,
379                        &self.config.labels.failed,
380                    )
381                    .await;
382                (RunStatus::Failed, Some(format!("{e:#}")))
383            }
384        };
385
386        let conn = self.db.lock().await;
387        db::runs::finish_run(&conn, run_id, final_status, error_msg.as_deref())
388    }
389
390    async fn run_steps(
391        &self,
392        run_id: &str,
393        ctx: &AgentContext,
394        worktree_path: &std::path::Path,
395        auto_merge: bool,
396        target_dir: &std::path::Path,
397    ) -> Result<()> {
398        self.check_cancelled()?;
399
400        // 1. Implement
401        self.update_status(run_id, RunStatus::Implementing).await?;
402        let impl_prompt = agents::implementer::build_prompt(ctx)?;
403        let impl_result =
404            self.run_agent(run_id, AgentRole::Implementer, &impl_prompt, worktree_path, 1).await?;
405
406        git::push_branch(worktree_path, &ctx.branch).await?;
407
408        // 1b. Update PR description and mark ready for review
409        if let Some(pr_number) = ctx.pr_number {
410            let body = build_pr_body(&impl_result.output, ctx);
411            if let Err(e) =
412                self.github.edit_pr_in(pr_number, &pr_title(ctx), &body, target_dir).await
413            {
414                warn!(run_id = %run_id, error = %e, "failed to update PR description");
415            }
416            if let Err(e) = self.github.mark_pr_ready_in(pr_number, target_dir).await {
417                warn!(run_id = %run_id, error = %e, "failed to mark PR ready");
418            }
419        }
420
421        // 1c. Post implementation comment on PR
422        if let Some(pr_number) = ctx.pr_number {
423            let summary = extract_impl_summary(&impl_result.output);
424            github::safe_comment(
425                &self.github,
426                pr_number,
427                &format_impl_comment(&summary),
428                target_dir,
429            )
430            .await;
431        }
432
433        // 2. Review-fix loop (posts its own step comments on the PR)
434        self.run_review_fix_loop(run_id, ctx, worktree_path, target_dir).await?;
435
436        // 3. Commit any uncommitted agent work before rebasing. Agents (especially
437        //    the fixer) sometimes modify files without committing. Without this,
438        //    git rebase refuses to start on a dirty worktree.
439        if git::commit_all(worktree_path, "chore: commit uncommitted agent changes").await? {
440            info!(run_id = %run_id, "committed uncommitted agent changes before rebase");
441        }
442
443        // 4. Rebase onto base branch to resolve any conflicts from parallel merges
444        self.check_cancelled()?;
445        info!(run_id = %run_id, base = %ctx.base_branch, "rebasing onto base branch");
446        let rebase_outcome =
447            self.rebase_with_agent_fallback(run_id, ctx, worktree_path, target_dir).await?;
448
449        if let Some(pr_number) = ctx.pr_number {
450            github::safe_comment(
451                &self.github,
452                pr_number,
453                &format_rebase_comment(&rebase_outcome),
454                target_dir,
455            )
456            .await;
457        }
458
459        if let RebaseOutcome::Failed(ref msg) = rebase_outcome {
460            anyhow::bail!("rebase failed: {msg}");
461        }
462
463        git::force_push_branch(worktree_path, &ctx.branch).await?;
464
465        // 5. Merge (only when auto-merge is enabled)
466        if auto_merge {
467            let pr_number = ctx.pr_number.context("no PR number for merge step")?;
468            self.update_status(run_id, RunStatus::Merging).await?;
469
470            self.merge_with_retry(run_id, ctx, worktree_path, target_dir, pr_number).await?;
471
472            // Close the issue for single-repo GitHub issues. Multi-repo and local
473            // issues are closed by finalize_run instead (different repo context).
474            if ctx.target_repo.is_none() && ctx.issue_source == "github" {
475                if let Err(e) = self
476                    .github
477                    .close_issue(ctx.issue_number, Some(&format!("Implemented in #{pr_number}")))
478                    .await
479                {
480                    warn!(run_id = %run_id, error = %e, "failed to close issue after merge");
481                }
482            }
483
484            github::safe_comment(&self.github, pr_number, &format_merge_comment(), target_dir)
485                .await;
486        } else if let Some(pr_number) = ctx.pr_number {
487            github::safe_comment(&self.github, pr_number, &format_ready_comment(), target_dir)
488                .await;
489        }
490
491        Ok(())
492    }
493
494    /// Attempt to merge a PR, retrying with a fresh rebase when GitHub reports
495    /// the PR as not mergeable (e.g. because a parallel PR landed first).
496    async fn merge_with_retry(
497        &self,
498        run_id: &str,
499        ctx: &AgentContext,
500        worktree_path: &std::path::Path,
501        target_dir: &std::path::Path,
502        pr_number: u32,
503    ) -> Result<()> {
504        const MAX_MERGE_ATTEMPTS: u32 = 3;
505
506        for attempt in 1..=MAX_MERGE_ATTEMPTS {
507            self.check_cancelled()?;
508
509            match self
510                .github
511                .merge_pr_in(pr_number, &self.config.pipeline.merge_strategy, target_dir)
512                .await
513            {
514                Ok(()) => {
515                    info!(run_id = %run_id, pr = pr_number, attempt, "PR merged");
516                    return Ok(());
517                }
518                Err(e) if attempt < MAX_MERGE_ATTEMPTS && is_retryable_merge_error(&e) => {
519                    warn!(
520                        run_id = %run_id, pr = pr_number, attempt,
521                        "PR not mergeable, re-rebasing and retrying"
522                    );
523
524                    let rebase_outcome = self
525                        .rebase_with_agent_fallback(run_id, ctx, worktree_path, target_dir)
526                        .await?;
527
528                    if let RebaseOutcome::Failed(ref msg) = rebase_outcome {
529                        anyhow::bail!("merge retry rebase failed: {msg}");
530                    }
531
532                    git::force_push_branch(worktree_path, &ctx.branch).await?;
533                    // Brief pause to let GitHub update mergeability state
534                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
535                }
536                Err(e) => return Err(e.context("merge PR failed")),
537            }
538        }
539
540        anyhow::bail!("PR #{pr_number} not mergeable after {MAX_MERGE_ATTEMPTS} rebase attempts");
541    }
542
543    async fn run_review_fix_loop(
544        &self,
545        run_id: &str,
546        ctx: &AgentContext,
547        worktree_path: &std::path::Path,
548        target_dir: &std::path::Path,
549    ) -> Result<()> {
550        let mut pre_fix_ref: Option<String> = None;
551
552        for cycle in 1..=3 {
553            self.check_cancelled()?;
554
555            self.update_status(run_id, RunStatus::Reviewing).await?;
556
557            let (prior_addressed, prior_disputes, prior_unresolved) =
558                self.gather_prior_findings(run_id, cycle).await?;
559
560            let review_prompt = agents::reviewer::build_prompt(
561                ctx,
562                &prior_addressed,
563                &prior_disputes,
564                &prior_unresolved,
565                pre_fix_ref.as_deref(),
566            )?;
567
568            // Reviewer failure: skip review and continue (don't kill pipeline)
569            let review_result = match self
570                .run_agent(run_id, AgentRole::Reviewer, &review_prompt, worktree_path, cycle)
571                .await
572            {
573                Ok(result) => result,
574                Err(e) => {
575                    warn!(run_id = %run_id, cycle, error = %e, "reviewer agent failed, skipping review");
576                    if let Some(pr_number) = ctx.pr_number {
577                        github::safe_comment(
578                            &self.github,
579                            pr_number,
580                            &format_review_skipped_comment(cycle, &e),
581                            target_dir,
582                        )
583                        .await;
584                    }
585                    return Ok(());
586                }
587            };
588
589            let review_output = parse_review_output(&review_result.output);
590            self.store_findings(run_id, &review_output.findings).await?;
591
592            let actionable: Vec<_> =
593                review_output.findings.iter().filter(|f| f.severity != Severity::Info).collect();
594
595            // Post review comment on PR
596            if let Some(pr_number) = ctx.pr_number {
597                github::safe_comment(
598                    &self.github,
599                    pr_number,
600                    &format_review_comment(cycle, &actionable),
601                    target_dir,
602                )
603                .await;
604            }
605
606            if actionable.is_empty() {
607                info!(run_id = %run_id, cycle, "review clean");
608                return Ok(());
609            }
610
611            info!(run_id = %run_id, cycle, findings = actionable.len(), "review found issues");
612
613            if cycle == 3 {
614                if let Some(pr_number) = ctx.pr_number {
615                    let comment = format_unresolved_comment(&actionable);
616                    github::safe_comment(&self.github, pr_number, &comment, target_dir).await;
617                } else {
618                    warn!(run_id = %run_id, "no PR number, cannot post unresolved findings");
619                }
620                return Ok(());
621            }
622
623            // Snapshot HEAD before fix step so next reviewer can scope to fixer changes
624            pre_fix_ref = Some(git::head_sha(worktree_path).await?);
625
626            self.run_fix_step(run_id, ctx, worktree_path, target_dir, cycle).await?;
627        }
628
629        Ok(())
630    }
631
632    /// Gather prior addressed, disputed, and unresolved findings for review cycles 2+.
633    async fn gather_prior_findings(
634        &self,
635        run_id: &str,
636        cycle: u32,
637    ) -> Result<(Vec<ReviewFinding>, Vec<ReviewFinding>, Vec<ReviewFinding>)> {
638        if cycle <= 1 {
639            return Ok((Vec::new(), Vec::new(), Vec::new()));
640        }
641
642        let conn = self.db.lock().await;
643        let all_resolved = db::agent_runs::get_resolved_findings(&conn, run_id)?;
644        let all_unresolved = db::agent_runs::get_unresolved_findings(&conn, run_id)?;
645        drop(conn);
646
647        let (mut addressed, disputed): (Vec<_>, Vec<_>) = all_resolved.into_iter().partition(|f| {
648            f.dispute_reason.as_deref().is_some_and(|r| r.starts_with("ADDRESSED: "))
649        });
650
651        // Strip the "ADDRESSED: " prefix so the template gets clean action text
652        for f in &mut addressed {
653            if let Some(ref mut reason) = f.dispute_reason {
654                if let Some(stripped) = reason.strip_prefix("ADDRESSED: ") {
655                    *reason = stripped.to_string();
656                }
657            }
658        }
659
660        Ok((addressed, disputed, all_unresolved))
661    }
662
663    /// Run the fixer agent for a given cycle, process its output, and push.
664    ///
665    /// If the fixer agent fails, posts a comment on the PR and returns `Ok(())`
666    /// so the caller can continue to the next review cycle.
667    ///
668    /// Handles three fixer outcome scenarios:
669    /// 1. Normal: fixer produces structured JSON with addressed/disputed findings
670    /// 2. Silent commits: fixer makes commits but no structured output (infer from git)
671    /// 3. Did nothing: no commits and no output (mark findings as not actionable)
672    async fn run_fix_step(
673        &self,
674        run_id: &str,
675        ctx: &AgentContext,
676        worktree_path: &std::path::Path,
677        target_dir: &std::path::Path,
678        cycle: u32,
679    ) -> Result<()> {
680        self.check_cancelled()?;
681        self.update_status(run_id, RunStatus::Fixing).await?;
682
683        let actionable = self.filter_actionable_findings(run_id).await?;
684
685        if actionable.is_empty() {
686            info!(run_id = %run_id, cycle, "no actionable findings for fixer, skipping");
687            if let Some(pr_number) = ctx.pr_number {
688                github::safe_comment(
689                    &self.github,
690                    pr_number,
691                    &format!(
692                        "### Fix skipped (cycle {cycle})\n\n\
693                         No actionable findings (all findings lacked file paths).\
694                         {COMMENT_FOOTER}"
695                    ),
696                    target_dir,
697                )
698                .await;
699            }
700            return Ok(());
701        }
702
703        // Snapshot HEAD before fixer runs
704        let pre_fix_head = git::head_sha(worktree_path).await?;
705
706        let fix_prompt = agents::fixer::build_prompt(ctx, &actionable)?;
707
708        // Fixer failure: skip fix (caller continues to next review cycle)
709        let fix_result =
710            match self.run_agent(run_id, AgentRole::Fixer, &fix_prompt, worktree_path, cycle).await
711            {
712                Ok(result) => result,
713                Err(e) => {
714                    warn!(run_id = %run_id, cycle, error = %e, "fixer agent failed, skipping fix");
715                    if let Some(pr_number) = ctx.pr_number {
716                        github::safe_comment(
717                            &self.github,
718                            pr_number,
719                            &format_fix_skipped_comment(cycle, &e),
720                            target_dir,
721                        )
722                        .await;
723                    }
724                    return Ok(());
725                }
726            };
727
728        // Parse fixer output and detect "did nothing" scenarios
729        let fixer_output = parse_fixer_output(&fix_result.output);
730        let fixer_did_nothing =
731            fixer_output.addressed.is_empty() && fixer_output.disputed.is_empty();
732
733        let new_commits = if fixer_did_nothing {
734            git::commit_count_since(worktree_path, &pre_fix_head).await.unwrap_or(0)
735        } else {
736            0
737        };
738
739        if fixer_did_nothing {
740            if new_commits > 0 {
741                // Fixer made commits but didn't produce structured output.
742                // Infer which findings were addressed by checking changed files.
743                warn!(
744                    run_id = %run_id, cycle, commits = new_commits,
745                    "fixer output unparseable but commits exist, inferring addressed from git"
746                );
747                self.infer_addressed_from_git(run_id, &actionable, worktree_path, &pre_fix_head)
748                    .await?;
749            } else {
750                // Fixer did literally nothing. Mark findings so they don't zombie.
751                warn!(
752                    run_id = %run_id, cycle,
753                    "fixer produced no output and no commits, marking findings not actionable"
754                );
755                let conn = self.db.lock().await;
756                for f in &actionable {
757                    db::agent_runs::resolve_finding(
758                        &conn,
759                        f.id,
760                        "ADDRESSED: fixer could not act on this finding (no commits, no output)",
761                    )?;
762                }
763                drop(conn);
764            }
765        } else {
766            self.process_fixer_results(run_id, &actionable, &fixer_output).await?;
767        }
768
769        // Post fix comment on PR
770        if let Some(pr_number) = ctx.pr_number {
771            let comment = if fixer_did_nothing {
772                format_fixer_recovery_comment(cycle, new_commits)
773            } else {
774                format_fix_comment(cycle, &fixer_output)
775            };
776            github::safe_comment(&self.github, pr_number, &comment, target_dir).await;
777        }
778
779        git::push_branch(worktree_path, &ctx.branch).await?;
780        Ok(())
781    }
782
783    /// Process all fixer results (disputes + addressed) in a single lock acquisition.
784    ///
785    /// The fixer references findings by 1-indexed position in the list it received.
786    /// We map those back to the actual `ReviewFinding` IDs and mark them resolved.
787    /// Disputed findings store the fixer's reason directly; addressed findings get
788    /// an `ADDRESSED: ` prefix so we can distinguish them when building the next
789    /// reviewer prompt.
790    async fn process_fixer_results(
791        &self,
792        run_id: &str,
793        findings_sent_to_fixer: &[ReviewFinding],
794        fixer_output: &agents::FixerOutput,
795    ) -> Result<()> {
796        if fixer_output.disputed.is_empty() && fixer_output.addressed.is_empty() {
797            return Ok(());
798        }
799
800        let conn = self.db.lock().await;
801
802        for dispute in &fixer_output.disputed {
803            let idx = dispute.finding.saturating_sub(1) as usize;
804            if let Some(finding) = findings_sent_to_fixer.get(idx) {
805                db::agent_runs::resolve_finding(&conn, finding.id, &dispute.reason)?;
806                info!(
807                    run_id = %run_id,
808                    finding_id = finding.id,
809                    reason = %dispute.reason,
810                    "finding disputed by fixer, marked resolved"
811                );
812            }
813        }
814
815        for action in &fixer_output.addressed {
816            let idx = action.finding.saturating_sub(1) as usize;
817            if let Some(finding) = findings_sent_to_fixer.get(idx) {
818                let reason = format!("ADDRESSED: {}", action.action);
819                db::agent_runs::resolve_finding(&conn, finding.id, &reason)?;
820                info!(
821                    run_id = %run_id,
822                    finding_id = finding.id,
823                    action = %action.action,
824                    "finding addressed by fixer, marked resolved"
825                );
826            }
827        }
828
829        drop(conn);
830        Ok(())
831    }
832
833    /// Filter unresolved findings into actionable (has `file_path`) and non-actionable.
834    ///
835    /// Non-actionable findings are auto-resolved in the DB so they don't accumulate
836    /// as zombie findings across cycles.
837    async fn filter_actionable_findings(&self, run_id: &str) -> Result<Vec<ReviewFinding>> {
838        let conn = self.db.lock().await;
839        let unresolved = db::agent_runs::get_unresolved_findings(&conn, run_id)?;
840
841        let (actionable, non_actionable): (Vec<_>, Vec<_>) =
842            unresolved.into_iter().partition(|f| f.file_path.is_some());
843
844        if !non_actionable.is_empty() {
845            warn!(
846                run_id = %run_id,
847                count = non_actionable.len(),
848                "auto-resolving non-actionable findings (no file_path)"
849            );
850            for f in &non_actionable {
851                db::agent_runs::resolve_finding(
852                    &conn,
853                    f.id,
854                    "ADDRESSED: auto-resolved -- finding has no file path, not actionable by fixer",
855                )?;
856            }
857        }
858
859        drop(conn);
860        Ok(actionable)
861    }
862
863    /// Infer which findings were addressed by the fixer based on git changes.
864    ///
865    /// When the fixer makes commits but doesn't produce structured JSON output,
866    /// we cross-reference the changed files against the finding file paths.
867    async fn infer_addressed_from_git(
868        &self,
869        run_id: &str,
870        findings: &[ReviewFinding],
871        worktree_path: &std::path::Path,
872        pre_fix_head: &str,
873    ) -> Result<()> {
874        let changed_files =
875            git::changed_files_since(worktree_path, pre_fix_head).await.unwrap_or_default();
876
877        let conn = self.db.lock().await;
878        for f in findings {
879            let was_touched =
880                f.file_path.as_ref().is_some_and(|fp| changed_files.iter().any(|cf| cf == fp));
881
882            let reason = if was_touched {
883                "ADDRESSED: inferred from git -- fixer modified this file (no structured output)"
884            } else {
885                "ADDRESSED: inferred from git -- file not modified (no structured output)"
886            };
887
888            db::agent_runs::resolve_finding(&conn, f.id, reason)?;
889            info!(
890                run_id = %run_id,
891                finding_id = f.id,
892                file = ?f.file_path,
893                touched = was_touched,
894                "finding resolved via git inference"
895            );
896        }
897        drop(conn);
898        Ok(())
899    }
900
901    async fn store_findings(&self, run_id: &str, findings: &[agents::Finding]) -> Result<()> {
902        let conn = self.db.lock().await;
903        let agent_runs = db::agent_runs::get_agent_runs_for_run(&conn, run_id)?;
904        let reviewer_run_id = agent_runs
905            .iter()
906            .rev()
907            .find_map(|ar| if ar.agent == "reviewer" { Some(ar.id) } else { None });
908        if let Some(ar_id) = reviewer_run_id {
909            for finding in findings {
910                let db_finding = ReviewFinding {
911                    id: 0,
912                    agent_run_id: ar_id,
913                    severity: finding.severity.to_string(),
914                    category: finding.category.clone(),
915                    file_path: finding.file_path.clone(),
916                    line_number: finding.line_number,
917                    message: finding.message.clone(),
918                    resolved: false,
919                    dispute_reason: None,
920                };
921                db::agent_runs::insert_finding(&conn, &db_finding)?;
922            }
923        }
924        drop(conn);
925        Ok(())
926    }
927
928    /// Rebase with agent-assisted conflict resolution.
929    ///
930    /// Chain: rebase -> if conflicts, agent resolves -> rebase --continue -> loop.
931    async fn rebase_with_agent_fallback(
932        &self,
933        run_id: &str,
934        ctx: &AgentContext,
935        worktree_path: &std::path::Path,
936        target_dir: &std::path::Path,
937    ) -> Result<RebaseOutcome> {
938        const MAX_REBASE_ROUNDS: u32 = 5;
939
940        let outcome = git::start_rebase(worktree_path, &ctx.base_branch).await;
941
942        let mut conflicting_files = match outcome {
943            RebaseOutcome::RebaseConflicts(files) => files,
944            other => return Ok(other),
945        };
946
947        for round in 1..=MAX_REBASE_ROUNDS {
948            self.check_cancelled()?;
949            info!(
950                run_id = %run_id,
951                round,
952                files = ?conflicting_files,
953                "rebase conflicts, attempting agent resolution"
954            );
955
956            if let Some(pr_number) = ctx.pr_number {
957                github::safe_comment(
958                    &self.github,
959                    pr_number,
960                    &format_rebase_conflict_comment(round, &conflicting_files),
961                    target_dir,
962                )
963                .await;
964            }
965
966            let conflict_prompt = format!(
967                "You are resolving rebase conflicts. The following files have unresolved \
968                 conflict markers (<<<<<<< / ======= / >>>>>>> markers):\n\n{}\n\n\
969                 Open each file, find the conflict markers, and resolve them by choosing \
970                 the correct code. Remove all conflict markers. Do NOT add new features \
971                 or refactor -- just resolve the conflicts so the code compiles and tests pass.\n\n\
972                 After resolving, run any test/lint commands if available:\n\
973                 - Test: {}\n\
974                 - Lint: {}",
975                conflicting_files.iter().map(|f| format!("- {f}")).collect::<Vec<_>>().join("\n"),
976                ctx.test_command.as_deref().unwrap_or("(none)"),
977                ctx.lint_command.as_deref().unwrap_or("(none)"),
978            );
979
980            if let Err(e) = self
981                .run_agent(run_id, AgentRole::Implementer, &conflict_prompt, worktree_path, 1)
982                .await
983            {
984                warn!(run_id = %run_id, error = %e, "conflict resolution agent failed");
985                git::abort_rebase(worktree_path).await;
986                return Ok(RebaseOutcome::Failed(format!(
987                    "agent conflict resolution failed: {e:#}"
988                )));
989            }
990
991            // Check if the agent actually resolved the conflicts by inspecting
992            // file content for markers. We cannot use `conflicting_files()` here
993            // because it checks git index state (diff-filter=U), and files remain
994            // "Unmerged" until staged with `git add` -- which hasn't happened yet.
995            let remaining =
996                git::files_with_conflict_markers(worktree_path, &conflicting_files).await;
997            if !remaining.is_empty() {
998                warn!(
999                    run_id = %run_id,
1000                    remaining = ?remaining,
1001                    "agent did not resolve all conflicts"
1002                );
1003                git::abort_rebase(worktree_path).await;
1004                return Ok(RebaseOutcome::Failed(format!(
1005                    "agent could not resolve conflicts in: {}",
1006                    remaining.join(", ")
1007                )));
1008            }
1009
1010            // Stage resolved files and continue the rebase
1011            match git::rebase_continue(worktree_path, &conflicting_files).await {
1012                Ok(None) => {
1013                    info!(run_id = %run_id, "agent resolved rebase conflicts");
1014                    return Ok(RebaseOutcome::AgentResolved);
1015                }
1016                Ok(Some(new_conflicts)) => {
1017                    // Next commit in the rebase also has conflicts -- loop
1018                    conflicting_files = new_conflicts;
1019                }
1020                Err(e) => {
1021                    git::abort_rebase(worktree_path).await;
1022                    return Ok(RebaseOutcome::Failed(format!("rebase --continue failed: {e:#}")));
1023                }
1024            }
1025        }
1026
1027        // Exhausted all rounds
1028        git::abort_rebase(worktree_path).await;
1029        Ok(RebaseOutcome::Failed(format!(
1030            "rebase conflicts persisted after {MAX_REBASE_ROUNDS} resolution rounds"
1031        )))
1032    }
1033
1034    async fn run_agent(
1035        &self,
1036        run_id: &str,
1037        role: AgentRole,
1038        prompt: &str,
1039        working_dir: &std::path::Path,
1040        cycle: u32,
1041    ) -> Result<crate::process::AgentResult> {
1042        let agent_run_id = self.record_agent_start(run_id, role, cycle).await?;
1043
1044        info!(run_id = %run_id, agent = %role, cycle, "agent starting");
1045
1046        let invocation = AgentInvocation {
1047            role,
1048            prompt: prompt.to_string(),
1049            working_dir: working_dir.to_path_buf(),
1050            max_turns: Some(self.config.pipeline.turn_limit),
1051            model: self.config.models.model_for(role.as_str()).map(String::from),
1052        };
1053
1054        let result = process::run_with_retry(self.runner.as_ref(), &invocation).await;
1055
1056        match &result {
1057            Ok(agent_result) => {
1058                self.record_agent_success(run_id, agent_run_id, agent_result).await?;
1059            }
1060            Err(e) => {
1061                let conn = self.db.lock().await;
1062                db::agent_runs::finish_agent_run(
1063                    &conn,
1064                    agent_run_id,
1065                    "failed",
1066                    0.0,
1067                    0,
1068                    None,
1069                    Some(&format!("{e:#}")),
1070                    None,
1071                )?;
1072            }
1073        }
1074
1075        result
1076    }
1077
1078    async fn record_agent_start(&self, run_id: &str, role: AgentRole, cycle: u32) -> Result<i64> {
1079        let agent_run = AgentRun {
1080            id: 0,
1081            run_id: run_id.to_string(),
1082            agent: role.to_string(),
1083            cycle,
1084            status: "running".to_string(),
1085            cost_usd: 0.0,
1086            turns: 0,
1087            started_at: chrono::Utc::now().to_rfc3339(),
1088            finished_at: None,
1089            output_summary: None,
1090            error_message: None,
1091            raw_output: None,
1092        };
1093        let conn = self.db.lock().await;
1094        db::agent_runs::insert_agent_run(&conn, &agent_run)
1095    }
1096
1097    async fn record_agent_success(
1098        &self,
1099        run_id: &str,
1100        agent_run_id: i64,
1101        agent_result: &crate::process::AgentResult,
1102    ) -> Result<()> {
1103        let conn = self.db.lock().await;
1104        db::agent_runs::finish_agent_run(
1105            &conn,
1106            agent_run_id,
1107            "complete",
1108            agent_result.cost_usd,
1109            agent_result.turns,
1110            Some(&truncate(&agent_result.output, 500)),
1111            None,
1112            Some(&agent_result.output),
1113        )?;
1114
1115        let new_cost = db::runs::increment_run_cost(&conn, run_id, agent_result.cost_usd)?;
1116        drop(conn);
1117
1118        if new_cost > self.config.pipeline.cost_budget {
1119            anyhow::bail!(
1120                "cost budget exceeded: ${:.2} > ${:.2}",
1121                new_cost,
1122                self.config.pipeline.cost_budget
1123            );
1124        }
1125        Ok(())
1126    }
1127
1128    async fn update_status(&self, run_id: &str, status: RunStatus) -> Result<()> {
1129        let conn = self.db.lock().await;
1130        db::runs::update_run_status(&conn, run_id, status)
1131    }
1132
1133    fn check_cancelled(&self) -> Result<()> {
1134        if self.cancel_token.is_cancelled() {
1135            anyhow::bail!("pipeline cancelled");
1136        }
1137        Ok(())
1138    }
1139}
1140
/// Substrings that mark a merge error as permanent, i.e. not worth retrying.
///
/// Retry classification uses inverted logic: an error from GitHub's
/// `mergePullRequest` GraphQL mutation is treated as retryable **unless** its
/// message contains one of these substrings. That avoids playing whack-a-mole with
/// GitHub's varied transient error strings (e.g. "not mergeable", "out of date",
/// "Base branch was modified", "is expected", "conflict checking is in progress").
///
/// Matching is a plain, case-sensitive substring test.
const PERMANENT_MERGE_ERRORS: &[&str] = &[
    // Merge strategy not allowed
    "not allowed",
    "Illegal merge strategy",
    // PR state issues
    "still a draft",
    "is closed",
    "already merged",
    // Auth / permission
    "permission",
    "not a collaborator",
    // Branch deleted
    "does not exist",
    // Branch protection hard blocks
    "blocked by branch protection",
    // Merge queue required
    "merge queue",
];
1168
1169fn is_retryable_merge_error(err: &anyhow::Error) -> bool {
1170    let msg = format!("{err:#}");
1171    !PERMANENT_MERGE_ERRORS.iter().any(|p| msg.contains(p))
1172}
1173
/// Footer (horizontal rule plus attribution line) appended to generated comments
/// and PR bodies.
const COMMENT_FOOTER: &str = concat!(
    "\n---\n",
    "Automated by [oven](https://github.com/clayharmon/oven-cli) \u{1F35E}",
);
1176
1177fn format_unresolved_comment(actionable: &[&agents::Finding]) -> String {
1178    let mut comment = String::from(
1179        "### Unresolved review findings\n\n\
1180         The review-fix loop ran 2 cycles but these findings remain unresolved.\n",
1181    );
1182
1183    // Group findings by severity
1184    for severity in &[Severity::Critical, Severity::Warning] {
1185        let group: Vec<_> = actionable.iter().filter(|f| &f.severity == severity).collect();
1186        if group.is_empty() {
1187            continue;
1188        }
1189        let heading = match severity {
1190            Severity::Critical => "Critical",
1191            Severity::Warning => "Warning",
1192            Severity::Info => unreachable!("loop only iterates Critical and Warning"),
1193        };
1194        let _ = writeln!(comment, "\n#### {heading}\n");
1195        for f in group {
1196            let loc = match (&f.file_path, f.line_number) {
1197                (Some(path), Some(line)) => format!(" in `{path}:{line}`"),
1198                (Some(path), None) => format!(" in `{path}`"),
1199                _ => String::new(),
1200            };
1201            let _ = writeln!(comment, "- **{}**{} -- {}", f.category, loc, f.message);
1202        }
1203    }
1204
1205    comment.push_str(COMMENT_FOOTER);
1206    comment
1207}
1208
1209fn format_impl_comment(summary: &str) -> String {
1210    format!("### Implementation complete\n\n{summary}{COMMENT_FOOTER}")
1211}
1212
1213fn format_review_comment(cycle: u32, actionable: &[&agents::Finding]) -> String {
1214    if actionable.is_empty() {
1215        return format!(
1216            "### Review complete (cycle {cycle})\n\n\
1217             Clean review, no actionable findings.{COMMENT_FOOTER}"
1218        );
1219    }
1220
1221    let mut comment = format!(
1222        "### Review complete (cycle {cycle})\n\n\
1223         **{count} finding{s}:**\n",
1224        count = actionable.len(),
1225        s = if actionable.len() == 1 { "" } else { "s" },
1226    );
1227
1228    for f in actionable {
1229        let loc = match (&f.file_path, f.line_number) {
1230            (Some(path), Some(line)) => format!(" in `{path}:{line}`"),
1231            (Some(path), None) => format!(" in `{path}`"),
1232            _ => String::new(),
1233        };
1234        let _ = writeln!(
1235            comment,
1236            "- [{sev}] **{cat}**{loc} -- {msg}",
1237            sev = f.severity,
1238            cat = f.category,
1239            msg = f.message,
1240        );
1241    }
1242
1243    comment.push_str(COMMENT_FOOTER);
1244    comment
1245}
1246
1247fn format_fix_comment(cycle: u32, fixer: &agents::FixerOutput) -> String {
1248    let addressed = fixer.addressed.len();
1249    let disputed = fixer.disputed.len();
1250    format!(
1251        "### Fix complete (cycle {cycle})\n\n\
1252         **Addressed:** {addressed} finding{a_s}\n\
1253         **Disputed:** {disputed} finding{d_s}{COMMENT_FOOTER}",
1254        a_s = if addressed == 1 { "" } else { "s" },
1255        d_s = if disputed == 1 { "" } else { "s" },
1256    )
1257}
1258
1259fn format_rebase_conflict_comment(round: u32, conflicting_files: &[String]) -> String {
1260    format!(
1261        "### Resolving rebase conflicts (round {round})\n\n\
1262         Attempting agent-assisted resolution for {} conflicting file{}: \
1263         {}{COMMENT_FOOTER}",
1264        conflicting_files.len(),
1265        if conflicting_files.len() == 1 { "" } else { "s" },
1266        conflicting_files.iter().map(|f| format!("`{f}`")).collect::<Vec<_>>().join(", "),
1267    )
1268}
1269
1270fn format_fixer_recovery_comment(cycle: u32, new_commits: u32) -> String {
1271    if new_commits > 0 {
1272        format!(
1273            "### Fix complete (cycle {cycle})\n\n\
1274             Fixer made {new_commits} commit{s} but did not produce structured output. \
1275             Addressed findings inferred from changed files.{COMMENT_FOOTER}",
1276            s = if new_commits == 1 { "" } else { "s" },
1277        )
1278    } else {
1279        format!(
1280            "### Fix complete (cycle {cycle})\n\n\
1281             Fixer could not act on the findings (no code changes made). \
1282             Findings marked as not actionable.{COMMENT_FOOTER}"
1283        )
1284    }
1285}
1286
1287fn format_review_skipped_comment(cycle: u32, error: &anyhow::Error) -> String {
1288    format!(
1289        "### Review skipped (cycle {cycle})\n\n\
1290         Reviewer agent encountered an error. Continuing without review.\n\n\
1291         **Error:** {error:#}{COMMENT_FOOTER}"
1292    )
1293}
1294
1295fn format_fix_skipped_comment(cycle: u32, error: &anyhow::Error) -> String {
1296    format!(
1297        "### Fix skipped (cycle {cycle})\n\n\
1298         Fixer agent encountered an error. Continuing to next cycle.\n\n\
1299         **Error:** {error:#}{COMMENT_FOOTER}"
1300    )
1301}
1302
1303fn format_rebase_comment(outcome: &RebaseOutcome) -> String {
1304    match outcome {
1305        RebaseOutcome::Clean => {
1306            format!("### Rebase\n\nRebased onto base branch cleanly.{COMMENT_FOOTER}")
1307        }
1308        RebaseOutcome::AgentResolved => {
1309            format!(
1310                "### Rebase\n\n\
1311                 Rebase had conflicts. Agent resolved them.{COMMENT_FOOTER}"
1312            )
1313        }
1314        RebaseOutcome::RebaseConflicts(_) => {
1315            format!(
1316                "### Rebase\n\n\
1317                 Rebase conflicts present (awaiting resolution).{COMMENT_FOOTER}"
1318            )
1319        }
1320        RebaseOutcome::Failed(msg) => {
1321            format!(
1322                "### Rebase failed\n\n\
1323                 Could not rebase onto the base branch.\n\n\
1324                 **Error:** {msg}{COMMENT_FOOTER}"
1325            )
1326        }
1327    }
1328}
1329
1330fn format_ready_comment() -> String {
1331    format!(
1332        "### Ready for review\n\nPipeline complete. This PR is ready for manual review.{COMMENT_FOOTER}"
1333    )
1334}
1335
1336fn format_merge_comment() -> String {
1337    format!("### Merged\n\nPipeline complete. PR has been merged.{COMMENT_FOOTER}")
1338}
1339
1340fn format_pipeline_failure(e: &anyhow::Error) -> String {
1341    format!(
1342        "## Pipeline failed\n\n\
1343         **Error:** {e:#}\n\n\
1344         The pipeline hit an unrecoverable error. Check the run logs for detail, \
1345         or re-run the pipeline.\
1346         {COMMENT_FOOTER}"
1347    )
1348}
1349
1350/// Build a PR title using the issue metadata.
1351///
1352/// Infers a conventional-commit prefix from the issue title. Falls back to
1353/// `fix` when no keyword matches.
1354fn pr_title(ctx: &AgentContext) -> String {
1355    let prefix = infer_commit_type(&ctx.issue_title);
1356    if ctx.issue_source == "github" {
1357        format!("{prefix}(#{}): {}", ctx.issue_number, ctx.issue_title)
1358    } else {
1359        format!("{prefix}: {}", ctx.issue_title)
1360    }
1361}
1362
/// Infer a conventional-commit type from an issue title.
///
/// Matching is case-insensitive and keyword based; rules are tried in order and
/// the first match wins:
///
/// 1. starts with `add test` -> `test`
/// 2. starts with `feat`, or contains `add ` / `implement ` -> `feat`
/// 3. starts with `refactor` -> `refactor`
/// 4. starts with `docs` or `document` -> `docs`
/// 5. starts with `test` -> `test`
/// 6. starts with `chore` -> `chore`
///
/// Anything else falls back to `fix`.
fn infer_commit_type(title: &str) -> &'static str {
    let lower = title.to_lowercase();

    // Must run before the generic `add ` rule below: every "add test..." title
    // also contains "add ", so checking it later would make this rule unreachable.
    if lower.starts_with("add test") {
        return "test";
    }

    if lower.starts_with("feat") || lower.contains("add ") || lower.contains("implement ") {
        "feat"
    } else if lower.starts_with("refactor") {
        "refactor"
    } else if lower.starts_with("docs") || lower.starts_with("document") {
        "docs"
    } else if lower.starts_with("test") {
        "test"
    } else if lower.starts_with("chore") {
        "chore"
    } else {
        "fix"
    }
}
1380
1381/// Build a full PR body from the implementer's output and issue context.
1382fn build_pr_body(impl_output: &str, ctx: &AgentContext) -> String {
1383    let issue_ref = if ctx.issue_source == "github" {
1384        format!("Resolves #{}", ctx.issue_number)
1385    } else {
1386        format!("From local issue #{}", ctx.issue_number)
1387    };
1388
1389    let summary = extract_impl_summary(impl_output);
1390
1391    let mut body = String::new();
1392    let _ = writeln!(body, "{issue_ref}\n");
1393    let _ = write!(body, "{summary}");
1394    body.push_str(COMMENT_FOOTER);
1395    body
1396}
1397
1398/// Extract the summary section from implementer output.
1399///
1400/// Looks for `## PR Template` (repo-specific PR template) or `## Changes Made`
1401/// (default format) headings. Falls back to the full output (truncated) if
1402/// neither heading is found.
1403fn extract_impl_summary(output: &str) -> String {
1404    // Prefer a filled-out PR template if the implementer found one
1405    let idx = output.find("## PR Template").or_else(|| output.find("## Changes Made"));
1406
1407    if let Some(idx) = idx {
1408        let summary = output[idx..].trim();
1409        // Strip the "## PR Template" heading itself so the body reads cleanly
1410        let summary = summary.strip_prefix("## PR Template").map_or(summary, |s| s.trim_start());
1411        if summary.len() <= 4000 {
1412            return summary.to_string();
1413        }
1414        return truncate(summary, 4000);
1415    }
1416    // Fallback: no structured summary found. Don't dump raw agent narration
1417    // (stream-of-consciousness "Let me read..." text) into the PR body.
1418    String::from("*No implementation summary available. See commit history for details.*")
1419}
1420
1421fn new_run(run_id: &str, issue: &PipelineIssue, auto_merge: bool) -> Run {
1422    Run {
1423        id: run_id.to_string(),
1424        issue_number: issue.number,
1425        status: RunStatus::Pending,
1426        pr_number: None,
1427        branch: None,
1428        worktree_path: None,
1429        cost_usd: 0.0,
1430        auto_merge,
1431        started_at: chrono::Utc::now().to_rfc3339(),
1432        finished_at: None,
1433        error_message: None,
1434        complexity: "full".to_string(),
1435        issue_source: issue.source.to_string(),
1436    }
1437}
1438
1439/// Generate an 8-character hex run ID.
1440pub fn generate_run_id() -> String {
1441    uuid::Uuid::new_v4().to_string()[..8].to_string()
1442}
1443
/// Truncate a string to at most `max_len` bytes, appending "..." if truncated.
///
/// Strings that already fit are returned unchanged. Otherwise 3 bytes are reserved
/// for the "..." suffix so the total output never exceeds `max_len`. When `max_len`
/// is smaller than the suffix itself (`max_len < 3`) there is no room for it, so the
/// string is cut to `max_len` bytes with no suffix.
///
/// Always cuts at a valid UTF-8 character boundary to avoid panics on multi-byte
/// input, so the result may be shorter than `max_len`.
pub(crate) fn truncate(s: &str, max_len: usize) -> String {
    const ELLIPSIS: &str = "...";

    if s.len() <= max_len {
        return s.to_string();
    }

    // Without this fallback a tiny `max_len` would still yield "...", which is
    // longer than the caller asked for.
    let (budget, suffix) = match max_len.checked_sub(ELLIPSIS.len()) {
        Some(room) => (room, ELLIPSIS),
        None => (max_len, ""),
    };

    // Walk back to the nearest char boundary; index 0 always is one.
    let end = (0..=budget).rev().find(|&i| s.is_char_boundary(i)).unwrap_or(0);
    format!("{}{suffix}", &s[..end])
}
1459
1460#[cfg(test)]
1461mod tests {
1462    use proptest::prelude::*;
1463
1464    use super::*;
1465
1466    proptest! {
1467        #[test]
1468        fn run_ids_always_8_hex_chars(_seed in any::<u64>()) {
1469            let id = generate_run_id();
1470            prop_assert_eq!(id.len(), 8);
1471            prop_assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
1472        }
1473    }
1474
1475    #[test]
1476    fn run_id_is_8_hex_chars() {
1477        let id = generate_run_id();
1478        assert_eq!(id.len(), 8);
1479        assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
1480    }
1481
1482    #[test]
1483    fn run_ids_are_unique() {
1484        let ids: Vec<_> = (0..100).map(|_| generate_run_id()).collect();
1485        let unique: std::collections::HashSet<_> = ids.iter().collect();
1486        assert_eq!(ids.len(), unique.len());
1487    }
1488
1489    #[test]
1490    fn truncate_short_string() {
1491        assert_eq!(truncate("hello", 10), "hello");
1492    }
1493
1494    #[test]
1495    fn truncate_long_string() {
1496        let long = "a".repeat(100);
1497        let result = truncate(&long, 10);
1498        assert_eq!(result.len(), 10); // 7 chars + "..."
1499        assert!(result.ends_with("..."));
1500    }
1501
1502    #[test]
1503    fn truncate_multibyte_does_not_panic() {
1504        // Each emoji is 4 bytes. "πŸ˜€πŸ˜€πŸ˜€" = 12 bytes.
1505        // max_len=8, target=5, walks back to boundary at 4 (one emoji).
1506        let s = "πŸ˜€πŸ˜€πŸ˜€";
1507        let result = truncate(s, 8);
1508        assert!(result.ends_with("..."));
1509        assert!(result.starts_with("πŸ˜€"));
1510        assert!(result.len() <= 8);
1511    }
1512
1513    #[test]
1514    fn truncate_cjk_boundary() {
1515        // CJK chars are 3 bytes each
1516        let s = "δ½ ε₯½δΈ–η•Œζ΅‹θ―•"; // 18 bytes
1517        // max_len=10, target=7, walks back to boundary at 6 (two 3-byte chars).
1518        let result = truncate(s, 10);
1519        assert!(result.ends_with("..."));
1520        assert!(result.starts_with("δ½ ε₯½"));
1521        assert!(result.len() <= 10);
1522    }
1523
1524    #[test]
1525    fn extract_impl_summary_finds_changes_made() {
1526        let output = "Some preamble text\n\n## Changes Made\n- src/foo.rs: added bar\n\n## Tests Added\n- tests/foo.rs: bar test\n";
1527        let summary = extract_impl_summary(output);
1528        assert!(summary.starts_with("## Changes Made"));
1529        assert!(summary.contains("added bar"));
1530        assert!(summary.contains("## Tests Added"));
1531    }
1532
1533    #[test]
1534    fn extract_impl_summary_prefers_pr_template() {
1535        let output = "Preamble\n\n## PR Template\n## Summary\n- Added auth flow\n\n## Testing\n- Unit tests pass\n";
1536        let summary = extract_impl_summary(output);
1537        // Should strip the "## PR Template" heading
1538        assert!(!summary.contains("## PR Template"));
1539        assert!(summary.starts_with("## Summary"));
1540        assert!(summary.contains("Added auth flow"));
1541    }
1542
1543    #[test]
1544    fn extract_impl_summary_fallback_on_no_heading() {
1545        let output = "just some raw agent output with no structure";
1546        let summary = extract_impl_summary(output);
1547        assert_eq!(
1548            summary,
1549            "*No implementation summary available. See commit history for details.*"
1550        );
1551    }
1552
1553    #[test]
1554    fn extract_impl_summary_empty_output() {
1555        let placeholder = "*No implementation summary available. See commit history for details.*";
1556        assert_eq!(extract_impl_summary(""), placeholder);
1557        assert_eq!(extract_impl_summary("   "), placeholder);
1558    }
1559
1560    #[test]
1561    fn build_pr_body_github_issue() {
1562        let ctx = AgentContext {
1563            issue_number: 42,
1564            issue_title: "fix the thing".to_string(),
1565            issue_body: String::new(),
1566            branch: "oven/issue-42".to_string(),
1567            pr_number: Some(10),
1568            test_command: None,
1569            lint_command: None,
1570            review_findings: None,
1571            cycle: 1,
1572            target_repo: None,
1573            issue_source: "github".to_string(),
1574            base_branch: "main".to_string(),
1575        };
1576        let body = build_pr_body("## Changes Made\n- added stuff", &ctx);
1577        assert!(body.contains("Resolves #42"));
1578        assert!(body.contains("## Changes Made"));
1579        assert!(body.contains("Automated by [oven]"));
1580    }
1581
1582    #[test]
1583    fn build_pr_body_local_issue() {
1584        let ctx = AgentContext {
1585            issue_number: 7,
1586            issue_title: "local thing".to_string(),
1587            issue_body: String::new(),
1588            branch: "oven/issue-7".to_string(),
1589            pr_number: Some(10),
1590            test_command: None,
1591            lint_command: None,
1592            review_findings: None,
1593            cycle: 1,
1594            target_repo: None,
1595            issue_source: "local".to_string(),
1596            base_branch: "main".to_string(),
1597        };
1598        let body = build_pr_body("## Changes Made\n- did local stuff", &ctx);
1599        assert!(body.contains("From local issue #7"));
1600        assert!(body.contains("## Changes Made"));
1601    }
1602
1603    #[test]
1604    fn pr_title_github() {
1605        let ctx = AgentContext {
1606            issue_number: 42,
1607            issue_title: "fix the thing".to_string(),
1608            issue_body: String::new(),
1609            branch: String::new(),
1610            pr_number: None,
1611            test_command: None,
1612            lint_command: None,
1613            review_findings: None,
1614            cycle: 1,
1615            target_repo: None,
1616            issue_source: "github".to_string(),
1617            base_branch: "main".to_string(),
1618        };
1619        assert_eq!(pr_title(&ctx), "fix(#42): fix the thing");
1620    }
1621
1622    #[test]
1623    fn pr_title_local() {
1624        let ctx = AgentContext {
1625            issue_number: 7,
1626            issue_title: "local thing".to_string(),
1627            issue_body: String::new(),
1628            branch: String::new(),
1629            pr_number: None,
1630            test_command: None,
1631            lint_command: None,
1632            review_findings: None,
1633            cycle: 1,
1634            target_repo: None,
1635            issue_source: "local".to_string(),
1636            base_branch: "main".to_string(),
1637        };
1638        assert_eq!(pr_title(&ctx), "fix: local thing");
1639    }
1640
1641    #[test]
1642    fn infer_commit_type_feat() {
1643        assert_eq!(infer_commit_type("Add dark mode support"), "feat");
1644        assert_eq!(infer_commit_type("Implement caching layer"), "feat");
1645        assert_eq!(infer_commit_type("Feature: new dashboard"), "feat");
1646    }
1647
1648    #[test]
1649    fn infer_commit_type_refactor() {
1650        assert_eq!(infer_commit_type("Refactor auth middleware"), "refactor");
1651    }
1652
1653    #[test]
1654    fn infer_commit_type_docs() {
1655        assert_eq!(infer_commit_type("Document the API endpoints"), "docs");
1656        assert_eq!(infer_commit_type("Docs: update README"), "docs");
1657    }
1658
1659    #[test]
1660    fn infer_commit_type_defaults_to_fix() {
1661        assert_eq!(infer_commit_type("Null pointer in config parser"), "fix");
1662        assert_eq!(infer_commit_type("Crash on empty input"), "fix");
1663    }
1664
1665    #[test]
1666    fn pr_title_feat_github() {
1667        let ctx = AgentContext {
1668            issue_number: 10,
1669            issue_title: "Add dark mode".to_string(),
1670            issue_body: String::new(),
1671            branch: String::new(),
1672            pr_number: None,
1673            test_command: None,
1674            lint_command: None,
1675            review_findings: None,
1676            cycle: 1,
1677            target_repo: None,
1678            issue_source: "github".to_string(),
1679            base_branch: "main".to_string(),
1680        };
1681        assert_eq!(pr_title(&ctx), "feat(#10): Add dark mode");
1682    }
1683
1684    #[test]
1685    fn format_unresolved_comment_groups_by_severity() {
1686        let findings = [
1687            agents::Finding {
1688                severity: Severity::Critical,
1689                category: "bug".to_string(),
1690                file_path: Some("src/main.rs".to_string()),
1691                line_number: Some(42),
1692                message: "null pointer".to_string(),
1693            },
1694            agents::Finding {
1695                severity: Severity::Warning,
1696                category: "style".to_string(),
1697                file_path: None,
1698                line_number: None,
1699                message: "missing docs".to_string(),
1700            },
1701        ];
1702        let refs: Vec<_> = findings.iter().collect();
1703        let comment = format_unresolved_comment(&refs);
1704        assert!(comment.contains("#### Critical"));
1705        assert!(comment.contains("#### Warning"));
1706        assert!(comment.contains("**bug** in `src/main.rs:42` -- null pointer"));
1707        assert!(comment.contains("**style** -- missing docs"));
1708        assert!(comment.contains("Automated by [oven]"));
1709    }
1710
1711    #[test]
1712    fn format_unresolved_comment_skips_empty_severity_groups() {
1713        let findings = [agents::Finding {
1714            severity: Severity::Warning,
1715            category: "testing".to_string(),
1716            file_path: Some("src/lib.rs".to_string()),
1717            line_number: None,
1718            message: "missing edge case test".to_string(),
1719        }];
1720        let refs: Vec<_> = findings.iter().collect();
1721        let comment = format_unresolved_comment(&refs);
1722        assert!(!comment.contains("#### Critical"));
1723        assert!(comment.contains("#### Warning"));
1724    }
1725
1726    #[test]
1727    fn format_pipeline_failure_includes_error() {
1728        let err = anyhow::anyhow!("cost budget exceeded: $12.50 > $10.00");
1729        let comment = format_pipeline_failure(&err);
1730        assert!(comment.contains("## Pipeline failed"));
1731        assert!(comment.contains("cost budget exceeded"));
1732        assert!(comment.contains("Automated by [oven]"));
1733    }
1734
1735    #[test]
1736    fn format_impl_comment_includes_summary() {
1737        let comment = format_impl_comment("Added login endpoint with tests");
1738        assert!(comment.contains("### Implementation complete"));
1739        assert!(comment.contains("Added login endpoint with tests"));
1740        assert!(comment.contains("Automated by [oven]"));
1741    }
1742
1743    #[test]
1744    fn format_review_comment_clean() {
1745        let comment = format_review_comment(1, &[]);
1746        assert!(comment.contains("### Review complete (cycle 1)"));
1747        assert!(comment.contains("Clean review"));
1748    }
1749
1750    #[test]
1751    fn format_review_comment_with_findings() {
1752        let findings = [agents::Finding {
1753            severity: Severity::Critical,
1754            category: "bug".to_string(),
1755            file_path: Some("src/main.rs".to_string()),
1756            line_number: Some(42),
1757            message: "null pointer".to_string(),
1758        }];
1759        let refs: Vec<_> = findings.iter().collect();
1760        let comment = format_review_comment(1, &refs);
1761        assert!(comment.contains("### Review complete (cycle 1)"));
1762        assert!(comment.contains("1 finding"));
1763        assert!(comment.contains("[critical]"));
1764        assert!(comment.contains("`src/main.rs:42`"));
1765    }
1766
1767    #[test]
1768    fn format_fix_comment_counts() {
1769        let fixer = agents::FixerOutput {
1770            addressed: vec![
1771                agents::FixerAction { finding: 1, action: "fixed it".to_string() },
1772                agents::FixerAction { finding: 2, action: "also fixed".to_string() },
1773            ],
1774            disputed: vec![agents::FixerDispute { finding: 3, reason: "not a bug".to_string() }],
1775        };
1776        let comment = format_fix_comment(1, &fixer);
1777        assert!(comment.contains("### Fix complete (cycle 1)"));
1778        assert!(comment.contains("Addressed:** 2 findings"));
1779        assert!(comment.contains("Disputed:** 1 finding\n"));
1780    }
1781
1782    #[test]
1783    fn format_rebase_comment_variants() {
1784        let clean = format_rebase_comment(&RebaseOutcome::Clean);
1785        assert!(clean.contains("Rebased onto base branch cleanly"));
1786
1787        let agent = format_rebase_comment(&RebaseOutcome::AgentResolved);
1788        assert!(agent.contains("Agent resolved them"));
1789
1790        let conflicts =
1791            format_rebase_comment(&RebaseOutcome::RebaseConflicts(vec!["foo.rs".into()]));
1792        assert!(conflicts.contains("awaiting resolution"));
1793
1794        let failed = format_rebase_comment(&RebaseOutcome::Failed("conflict in foo.rs".into()));
1795        assert!(failed.contains("Rebase failed"));
1796        assert!(failed.contains("conflict in foo.rs"));
1797    }
1798
1799    #[test]
1800    fn format_ready_comment_content() {
1801        let comment = format_ready_comment();
1802        assert!(comment.contains("### Ready for review"));
1803        assert!(comment.contains("manual review"));
1804    }
1805
1806    #[test]
1807    fn format_merge_comment_content() {
1808        let comment = format_merge_comment();
1809        assert!(comment.contains("### Merged"));
1810    }
1811
1812    #[test]
1813    fn retryable_merge_error_transient_errors() {
1814        let transient = [
1815            "GraphQL: Pull Request is not mergeable (mergePullRequest)",
1816            "GraphQL: Head branch is out of date. Review and try the merge again. (mergePullRequest)",
1817            "GraphQL: Base branch was modified. Review and try the merge again. (mergePullRequest)",
1818            "GraphQL: Required status check \"ci\" is expected (mergePullRequest)",
1819            "GraphQL: Merge conflict checking is in progress. Try again in a few moments.",
1820            "GraphQL: Something went wrong while executing your query.",
1821            "GraphQL: Pull request is out of date. Review and try the merge again.",
1822            "GraphQL: The merge-base changed after approval. Review and try the merge again.",
1823            "network timeout",
1824        ];
1825        for msg in transient {
1826            let err = anyhow::anyhow!("{msg}");
1827            assert!(is_retryable_merge_error(&err), "should be retryable: {msg}");
1828        }
1829    }
1830
1831    #[test]
1832    fn retryable_merge_error_permanent_errors() {
1833        let permanent = [
1834            "GraphQL: Squash merges are not allowed on this repository. (mergePullRequest)",
1835            "GraphQL: Rebase merges are not allowed on this repository. (mergePullRequest)",
1836            "GraphQL: Illegal merge strategy for this protected branch. (mergePullRequest)",
1837            "GraphQL: Pull request is still a draft (mergePullRequest)",
1838            "GraphQL: Pull request is closed (mergePullRequest)",
1839            "GraphQL: Pull request is already merged (mergePullRequest)",
1840            "GraphQL: You do not have permission to merge this pull request. (mergePullRequest)",
1841            "GraphQL: User is not a collaborator on the repository (mergePullRequest)",
1842            "GraphQL: Head branch does not exist (mergePullRequest)",
1843            "GraphQL: Merge is blocked by branch protection rules (mergePullRequest)",
1844            "GraphQL: Changes must be made through the merge queue (mergePullRequest)",
1845        ];
1846        for msg in permanent {
1847            let err = anyhow::anyhow!("{msg}");
1848            assert!(!is_retryable_merge_error(&err), "should be permanent: {msg}");
1849        }
1850    }
1851}