//! Pipeline executor (`oven_cli/pipeline/executor.rs`).
use std::{fmt::Write as _, path::PathBuf, sync::Arc};

use anyhow::{Context, Result};
use rusqlite::Connection;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};

use crate::{
    agents::{
        self, AgentContext, AgentInvocation, AgentRole, Complexity, GraphContextNode,
        PlannerGraphOutput, Severity, invoke_agent, parse_fixer_output, parse_planner_graph_output,
        parse_review_output,
    },
    config::Config,
    db::{self, AgentRun, ReviewFinding, Run, RunStatus},
    git,
    github::{self, GhClient},
    issues::{IssueOrigin, IssueProvider, PipelineIssue},
    process::CommandRunner,
};
22
/// The result of running an issue through the pipeline (before merge).
#[derive(Debug)]
pub struct PipelineOutcome {
    /// Database identifier of this pipeline run.
    pub run_id: String,
    /// Number of the draft PR opened for this run.
    pub pr_number: u32,
    /// Worktree path, retained so the caller can clean up after merge.
    pub worktree_path: PathBuf,
    /// Repo directory the worktree belongs to (needed for `git::remove_worktree`).
    pub target_dir: PathBuf,
}
33
/// Runs a single issue through the full pipeline.
pub struct PipelineExecutor<R: CommandRunner> {
    /// Process runner used to invoke agents.
    pub runner: Arc<R>,
    /// GitHub client for PR create/edit/comment operations.
    pub github: Arc<GhClient<R>>,
    /// Issue backend (GitHub or local) used for label transitions and closing.
    pub issues: Arc<dyn IssueProvider>,
    /// Shared SQLite connection, guarded by an async mutex.
    pub db: Arc<Mutex<Connection>>,
    /// Loaded configuration (labels, pipeline limits, multi-repo settings).
    pub config: Config,
    /// Checked between steps to abort the pipeline cooperatively.
    pub cancel_token: CancellationToken,
    /// Default repository directory (fallback when multi-repo routing is off).
    pub repo_dir: PathBuf,
}
44
45impl<R: CommandRunner + 'static> PipelineExecutor<R> {
    /// Run the full pipeline for a single issue.
    ///
    /// Convenience wrapper over [`Self::run_issue_with_complexity`] with no
    /// complexity classification.
    pub async fn run_issue(&self, issue: &PipelineIssue, auto_merge: bool) -> Result<()> {
        self.run_issue_with_complexity(issue, auto_merge, None).await
    }
50
    /// Run the full pipeline for a single issue with an optional complexity classification.
    ///
    /// Executes the pipeline up to `AwaitingMerge`, then immediately finalizes
    /// the merge (label transitions, issue close, worktree cleanup).
    pub async fn run_issue_with_complexity(
        &self,
        issue: &PipelineIssue,
        auto_merge: bool,
        complexity: Option<Complexity>,
    ) -> Result<()> {
        let outcome = self.run_issue_pipeline(issue, auto_merge, complexity).await?;
        self.finalize_merge(&outcome, issue).await
    }
61
    /// Run the pipeline up to (but not including) finalization.
    ///
    /// Returns a `PipelineOutcome` with the run ID and PR number.
    /// The caller is responsible for calling `finalize_run` or `finalize_merge`
    /// at the appropriate time (e.g., after the PR is actually merged).
    ///
    /// On a step failure this method finalizes the run itself (failed status,
    /// worktree removed) and returns the flattened error.
    pub async fn run_issue_pipeline(
        &self,
        issue: &PipelineIssue,
        auto_merge: bool,
        complexity: Option<Complexity>,
    ) -> Result<PipelineOutcome> {
        let run_id = generate_run_id();

        // Determine target repo for worktrees and PRs (multi-repo routing)
        let (target_dir, is_multi_repo) = self.resolve_target_dir(issue.target_repo.as_ref())?;

        let base_branch = git::default_branch(&target_dir).await?;

        let mut run = new_run(&run_id, issue, auto_merge);
        if let Some(ref c) = complexity {
            run.complexity = c.to_string();
        }
        // Persist the run row before any side effects so failures are traceable.
        {
            let conn = self.db.lock().await;
            db::runs::insert_run(&conn, &run)?;
        }

        // Move the issue label from "ready" to "cooking".
        self.issues
            .transition(issue.number, &self.config.labels.ready, &self.config.labels.cooking)
            .await?;

        let worktree = git::create_worktree(&target_dir, issue.number, &base_branch).await?;
        self.record_worktree(&run_id, &worktree).await?;

        // Seed branch with an empty commit so GitHub accepts the draft PR
        git::empty_commit(
            &worktree.path,
            &format!("chore: start oven pipeline for issue #{}", issue.number),
        )
        .await?;

        info!(
            run_id = %run_id,
            issue = issue.number,
            branch = %worktree.branch,
            target_repo = ?issue.target_repo,
            "starting pipeline"
        );

        let pr_number = self.create_pr(&run_id, issue, &worktree.branch, &target_dir).await?;

        let ctx = AgentContext {
            issue_number: issue.number,
            issue_title: issue.title.clone(),
            issue_body: issue.body.clone(),
            branch: worktree.branch.clone(),
            pr_number: Some(pr_number),
            test_command: self.config.project.test.clone(),
            lint_command: self.config.project.lint.clone(),
            review_findings: None,
            cycle: 1,
            // Only propagate the target repo when multi-repo routing is active.
            target_repo: if is_multi_repo { issue.target_repo.clone() } else { None },
            issue_source: issue.source.as_str().to_string(),
            base_branch: base_branch.clone(),
        };

        let result = self.run_steps(&run_id, &ctx, &worktree.path, auto_merge, &target_dir).await;

        if let Err(ref e) = result {
            // On failure, finalize immediately (no merge to wait for)
            self.finalize_run(&run_id, issue, pr_number, &result, &target_dir).await?;
            if let Err(e) = git::remove_worktree(&target_dir, &worktree.path).await {
                warn!(run_id = %run_id, error = %e, "failed to clean up worktree");
            }
            // Flatten the context chain into a single message for the caller.
            return Err(anyhow::anyhow!("{e:#}"));
        }

        // Update status to AwaitingMerge; the worktree is intentionally kept
        // around until the caller finalizes the merge.
        self.update_status(&run_id, RunStatus::AwaitingMerge).await?;

        Ok(PipelineOutcome { run_id, pr_number, worktree_path: worktree.path, target_dir })
    }
144
145    /// Finalize a run after its PR has been merged.
146    ///
147    /// Transitions labels, closes issues, marks the run as complete, and cleans
148    /// up the worktree that was left around while awaiting merge.
149    pub async fn finalize_merge(
150        &self,
151        outcome: &PipelineOutcome,
152        issue: &PipelineIssue,
153    ) -> Result<()> {
154        self.finalize_run(&outcome.run_id, issue, outcome.pr_number, &Ok(()), &outcome.target_dir)
155            .await?;
156        if let Err(e) = git::remove_worktree(&outcome.target_dir, &outcome.worktree_path).await {
157            warn!(
158                run_id = %outcome.run_id,
159                error = %e,
160                "failed to clean up worktree after merge"
161            );
162        }
163        Ok(())
164    }
165
166    /// Invoke the planner agent to decide dependency ordering for a set of issues.
167    ///
168    /// `graph_context` describes the current dependency graph state so the planner
169    /// can avoid scheduling conflicting work alongside in-flight issues.
170    ///
171    /// Returns `None` if the planner fails or returns unparseable output (fallback to default).
172    pub async fn plan_issues(
173        &self,
174        issues: &[PipelineIssue],
175        graph_context: &[GraphContextNode],
176    ) -> Option<PlannerGraphOutput> {
177        let prompt = match agents::planner::build_prompt(issues, graph_context) {
178            Ok(p) => p,
179            Err(e) => {
180                warn!(error = %e, "planner prompt build failed");
181                return None;
182            }
183        };
184        let invocation = AgentInvocation {
185            role: AgentRole::Planner,
186            prompt,
187            working_dir: self.repo_dir.clone(),
188            max_turns: Some(self.config.pipeline.turn_limit),
189        };
190
191        match invoke_agent(self.runner.as_ref(), &invocation).await {
192            Ok(result) => {
193                debug!(output = %result.output, "raw planner output");
194                let parsed = parse_planner_graph_output(&result.output);
195                if parsed.is_none() {
196                    warn!(output = %result.output, "planner returned unparseable output, falling back to all-parallel");
197                }
198                parsed
199            }
200            Err(e) => {
201                warn!(error = %e, "planner agent failed, falling back to all-parallel");
202                None
203            }
204        }
205    }
206
207    /// Determine the effective repo directory for worktrees and PRs.
208    ///
209    /// Returns `(target_dir, is_multi_repo)`. When multi-repo is disabled or no target
210    /// is specified, falls back to `self.repo_dir`.
211    pub(crate) fn resolve_target_dir(
212        &self,
213        target_repo: Option<&String>,
214    ) -> Result<(PathBuf, bool)> {
215        if !self.config.multi_repo.enabled {
216            return Ok((self.repo_dir.clone(), false));
217        }
218        match target_repo {
219            Some(name) => {
220                let path = self.config.resolve_repo(name)?;
221                Ok((path, true))
222            }
223            None => Ok((self.repo_dir.clone(), false)),
224        }
225    }
226
227    /// Reconstruct a `PipelineOutcome` from graph node data (for merge polling).
228    ///
229    /// Worktree paths are deterministic, so we can rebuild the outcome from
230    /// the issue metadata stored on the graph node.
231    pub fn reconstruct_outcome(
232        &self,
233        issue: &PipelineIssue,
234        run_id: &str,
235        pr_number: u32,
236    ) -> Result<PipelineOutcome> {
237        let (target_dir, _) = self.resolve_target_dir(issue.target_repo.as_ref())?;
238        let worktree_path =
239            target_dir.join(".oven").join("worktrees").join(format!("issue-{}", issue.number));
240        Ok(PipelineOutcome { run_id: run_id.to_string(), pr_number, worktree_path, target_dir })
241    }
242
243    async fn record_worktree(&self, run_id: &str, worktree: &git::Worktree) -> Result<()> {
244        let conn = self.db.lock().await;
245        db::runs::update_run_worktree(
246            &conn,
247            run_id,
248            &worktree.branch,
249            &worktree.path.to_string_lossy(),
250        )?;
251        drop(conn);
252        Ok(())
253    }
254
255    async fn create_pr(
256        &self,
257        run_id: &str,
258        issue: &PipelineIssue,
259        branch: &str,
260        repo_dir: &std::path::Path,
261    ) -> Result<u32> {
262        let (pr_title, pr_body) = match issue.source {
263            IssueOrigin::Github => (
264                format!("fix(#{}): {}", issue.number, issue.title),
265                format!(
266                    "Resolves #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
267                    issue.number
268                ),
269            ),
270            IssueOrigin::Local => (
271                format!("fix: {}", issue.title),
272                format!(
273                    "From local issue #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
274                    issue.number
275                ),
276            ),
277        };
278
279        git::push_branch(repo_dir, branch).await?;
280        let pr_number =
281            self.github.create_draft_pr_in(&pr_title, branch, &pr_body, repo_dir).await?;
282
283        {
284            let conn = self.db.lock().await;
285            db::runs::update_run_pr(&conn, run_id, pr_number)?;
286        }
287
288        info!(run_id = %run_id, pr = pr_number, "draft PR created");
289        Ok(pr_number)
290    }
291
    /// Finalize a run's labels, issue state, and database row.
    ///
    /// On success: moves the label from "cooking" to "complete", optionally
    /// closes the issue, and marks the run `Complete`. On failure: posts a
    /// failure comment on the PR, moves the label to "failed" (best-effort),
    /// and marks the run `Failed` with the flattened error message.
    async fn finalize_run(
        &self,
        run_id: &str,
        issue: &PipelineIssue,
        pr_number: u32,
        result: &Result<()>,
        target_dir: &std::path::Path,
    ) -> Result<()> {
        let (final_status, error_msg) = match result {
            Ok(()) => {
                self.issues
                    .transition(
                        issue.number,
                        &self.config.labels.cooking,
                        &self.config.labels.complete,
                    )
                    .await?;

                // Close the issue when the merger can't do it:
                // - Local issues: merger can't use `gh issue close`
                // - Multi-repo: merger runs in target repo, can't close god-repo issue
                let should_close =
                    issue.source == IssueOrigin::Local || issue.target_repo.is_some();

                if should_close {
                    // Cross-repo PRs need the repo-qualified reference form.
                    let comment = issue.target_repo.as_ref().map_or_else(
                        || format!("Implemented in #{pr_number}"),
                        |repo_name| format!("Implemented in {repo_name}#{pr_number}"),
                    );
                    // Closing is best-effort; a failure here should not fail the run.
                    if let Err(e) = self.issues.close(issue.number, Some(&comment)).await {
                        warn!(
                            run_id = %run_id,
                            error = %e,
                            "failed to close issue"
                        );
                    }
                }

                (RunStatus::Complete, None)
            }
            Err(e) => {
                warn!(run_id = %run_id, error = %e, "pipeline failed");
                // Surface the failure on the PR so it's visible without log access.
                github::safe_comment(
                    &self.github,
                    pr_number,
                    &format_pipeline_failure(e),
                    target_dir,
                )
                .await;
                // Best-effort label transition; the run row below is the source of truth.
                let _ = self
                    .issues
                    .transition(
                        issue.number,
                        &self.config.labels.cooking,
                        &self.config.labels.failed,
                    )
                    .await;
                (RunStatus::Failed, Some(format!("{e:#}")))
            }
        };

        let conn = self.db.lock().await;
        db::runs::finish_run(&conn, run_id, final_status, error_msg.as_deref())
    }
356
    /// Execute the core pipeline steps inside the worktree:
    /// implement → publish PR → review/fix loop → rebase → (optional) merge.
    ///
    /// Errors out on cancellation, agent failure, unresolved findings, or a
    /// rebase conflict (the latter is also posted to the PR).
    async fn run_steps(
        &self,
        run_id: &str,
        ctx: &AgentContext,
        worktree_path: &std::path::Path,
        auto_merge: bool,
        target_dir: &std::path::Path,
    ) -> Result<()> {
        self.check_cancelled()?;

        // 1. Implement
        self.update_status(run_id, RunStatus::Implementing).await?;
        let impl_prompt = agents::implementer::build_prompt(ctx)?;
        let impl_result =
            self.run_agent(run_id, AgentRole::Implementer, &impl_prompt, worktree_path, 1).await?;

        git::push_branch(worktree_path, &ctx.branch).await?;

        // 1b. Update PR description and mark ready for review
        // (both best-effort: PR metadata failures only log a warning)
        if let Some(pr_number) = ctx.pr_number {
            let body = build_pr_body(&impl_result.output, ctx);
            if let Err(e) =
                self.github.edit_pr_in(pr_number, &pr_title(ctx), &body, target_dir).await
            {
                warn!(run_id = %run_id, error = %e, "failed to update PR description");
            }
            if let Err(e) = self.github.mark_pr_ready_in(pr_number, target_dir).await {
                warn!(run_id = %run_id, error = %e, "failed to mark PR ready");
            }
        }

        // 2. Review-fix loop
        let clean = self.run_review_fix_loop(run_id, ctx, worktree_path, target_dir).await?;

        if !clean {
            anyhow::bail!("unresolved findings after max review cycles");
        }

        // 3. Rebase onto base branch to resolve any conflicts from parallel merges
        self.check_cancelled()?;
        info!(run_id = %run_id, base = %ctx.base_branch, "rebasing onto base branch");
        if let Err(e) = git::rebase_on_base(worktree_path, &ctx.base_branch).await {
            // Tell the PR why the pipeline stopped before propagating the error.
            if let Some(pr_number) = ctx.pr_number {
                github::safe_comment(
                    &self.github,
                    pr_number,
                    &format_rebase_failure(&e),
                    target_dir,
                )
                .await;
            }
            return Err(e);
        }
        // The rebase rewrote history, so a force push is required.
        git::force_push_branch(worktree_path, &ctx.branch).await?;
        // 4. Merge (only when auto-merge is enabled)
        if auto_merge {
            self.check_cancelled()?;
            // Sanity check only: the merger needs a PR to operate on.
            ctx.pr_number.context("no PR number for merge step")?;
            self.update_status(run_id, RunStatus::Merging).await?;
            let merge_prompt = agents::merger::build_prompt(ctx, auto_merge)?;
            self.run_agent(run_id, AgentRole::Merger, &merge_prompt, worktree_path, 1).await?;
        }

        Ok(())
    }
422
423    async fn run_review_fix_loop(
424        &self,
425        run_id: &str,
426        ctx: &AgentContext,
427        worktree_path: &std::path::Path,
428        target_dir: &std::path::Path,
429    ) -> Result<bool> {
430        for cycle in 1..=3 {
431            self.check_cancelled()?;
432
433            self.update_status(run_id, RunStatus::Reviewing).await?;
434
435            // On cycles 2+, include prior resolved findings so the reviewer sees
436            // what was addressed (fixed) and what was disputed (rejected) by the fixer.
437            let (prior_addressed, prior_disputes) = if cycle > 1 {
438                let conn = self.db.lock().await;
439                let all_resolved = db::agent_runs::get_resolved_findings(&conn, run_id)?;
440                drop(conn);
441
442                let (mut addressed, disputed): (Vec<_>, Vec<_>) =
443                    all_resolved.into_iter().partition(|f| {
444                        f.dispute_reason.as_deref().is_some_and(|r| r.starts_with("ADDRESSED: "))
445                    });
446
447                // Strip the "ADDRESSED: " prefix so the template gets clean action text
448                for f in &mut addressed {
449                    if let Some(ref mut reason) = f.dispute_reason {
450                        if let Some(stripped) = reason.strip_prefix("ADDRESSED: ") {
451                            *reason = stripped.to_string();
452                        }
453                    }
454                }
455
456                (addressed, disputed)
457            } else {
458                (Vec::new(), Vec::new())
459            };
460
461            let review_prompt =
462                agents::reviewer::build_prompt(ctx, &prior_addressed, &prior_disputes)?;
463            let review_result = self
464                .run_agent(run_id, AgentRole::Reviewer, &review_prompt, worktree_path, cycle)
465                .await?;
466
467            let review_output = match parse_review_output(&review_result.output) {
468                Ok(output) => output,
469                Err(e) => {
470                    warn!(run_id = %run_id, cycle, error = %e, "review output unparseable, treating as failed review");
471                    if let Some(pr_number) = ctx.pr_number {
472                        github::safe_comment(
473                            &self.github,
474                            pr_number,
475                            &format_review_parse_failure(cycle),
476                            target_dir,
477                        )
478                        .await;
479                    }
480                    anyhow::bail!("reviewer returned unparseable output in cycle {cycle}");
481                }
482            };
483            self.store_findings(run_id, &review_output.findings).await?;
484
485            let actionable: Vec<_> =
486                review_output.findings.iter().filter(|f| f.severity != Severity::Info).collect();
487
488            if actionable.is_empty() {
489                info!(run_id = %run_id, cycle, "review clean");
490                return Ok(true);
491            }
492
493            info!(run_id = %run_id, cycle, findings = actionable.len(), "review found issues");
494
495            if cycle == 3 {
496                if let Some(pr_number) = ctx.pr_number {
497                    let comment = format_unresolved_comment(&actionable);
498                    github::safe_comment(&self.github, pr_number, &comment, target_dir).await;
499                } else {
500                    warn!(run_id = %run_id, "no PR number, cannot post unresolved findings");
501                }
502                return Ok(false);
503            }
504
505            // Fix
506            self.check_cancelled()?;
507            self.update_status(run_id, RunStatus::Fixing).await?;
508
509            let unresolved = {
510                let conn = self.db.lock().await;
511                db::agent_runs::get_unresolved_findings(&conn, run_id)?
512            };
513
514            let fix_prompt = agents::fixer::build_prompt(ctx, &unresolved)?;
515            let fix_result =
516                self.run_agent(run_id, AgentRole::Fixer, &fix_prompt, worktree_path, cycle).await?;
517
518            // Parse fixer output and mark disputed + addressed findings as resolved
519            let fixer_output = parse_fixer_output(&fix_result.output);
520            self.process_fixer_disputes(run_id, &unresolved, &fixer_output).await?;
521            self.process_fixer_addressed(run_id, &unresolved, &fixer_output).await?;
522
523            git::push_branch(worktree_path, &ctx.branch).await?;
524        }
525
526        Ok(false)
527    }
528
529    /// Process fixer disputes by marking corresponding review findings as resolved.
530    ///
531    /// The fixer references findings by 1-indexed position in the list it received.
532    /// We map those back to the actual `ReviewFinding` IDs and mark them resolved
533    /// with the fixer's dispute reason.
534    async fn process_fixer_disputes(
535        &self,
536        run_id: &str,
537        findings_sent_to_fixer: &[ReviewFinding],
538        fixer_output: &agents::FixerOutput,
539    ) -> Result<()> {
540        if fixer_output.disputed.is_empty() {
541            return Ok(());
542        }
543
544        let conn = self.db.lock().await;
545        for dispute in &fixer_output.disputed {
546            // finding numbers are 1-indexed
547            let idx = dispute.finding.saturating_sub(1) as usize;
548            if let Some(finding) = findings_sent_to_fixer.get(idx) {
549                db::agent_runs::resolve_finding(&conn, finding.id, &dispute.reason)?;
550                info!(
551                    run_id = %run_id,
552                    finding_id = finding.id,
553                    reason = %dispute.reason,
554                    "finding disputed by fixer, marked resolved"
555                );
556            }
557        }
558        drop(conn);
559        Ok(())
560    }
561
562    /// Process fixer addressed actions by marking corresponding review findings as resolved.
563    ///
564    /// Similar to `process_fixer_disputes`, but for findings the fixer actually fixed.
565    /// Stores the action with an `ADDRESSED: ` prefix so we can distinguish addressed
566    /// findings from disputed ones when building the next reviewer prompt.
567    async fn process_fixer_addressed(
568        &self,
569        run_id: &str,
570        findings_sent_to_fixer: &[ReviewFinding],
571        fixer_output: &agents::FixerOutput,
572    ) -> Result<()> {
573        if fixer_output.addressed.is_empty() {
574            return Ok(());
575        }
576
577        let conn = self.db.lock().await;
578        for action in &fixer_output.addressed {
579            let idx = action.finding.saturating_sub(1) as usize;
580            if let Some(finding) = findings_sent_to_fixer.get(idx) {
581                let reason = format!("ADDRESSED: {}", action.action);
582                db::agent_runs::resolve_finding(&conn, finding.id, &reason)?;
583                info!(
584                    run_id = %run_id,
585                    finding_id = finding.id,
586                    action = %action.action,
587                    "finding addressed by fixer, marked resolved"
588                );
589            }
590        }
591        drop(conn);
592        Ok(())
593    }
594
595    async fn store_findings(&self, run_id: &str, findings: &[agents::Finding]) -> Result<()> {
596        let conn = self.db.lock().await;
597        let agent_runs = db::agent_runs::get_agent_runs_for_run(&conn, run_id)?;
598        let reviewer_run_id = agent_runs
599            .iter()
600            .rev()
601            .find_map(|ar| if ar.agent == "reviewer" { Some(ar.id) } else { None });
602        if let Some(ar_id) = reviewer_run_id {
603            for finding in findings {
604                let db_finding = ReviewFinding {
605                    id: 0,
606                    agent_run_id: ar_id,
607                    severity: finding.severity.to_string(),
608                    category: finding.category.clone(),
609                    file_path: finding.file_path.clone(),
610                    line_number: finding.line_number,
611                    message: finding.message.clone(),
612                    resolved: false,
613                    dispute_reason: None,
614                };
615                db::agent_runs::insert_finding(&conn, &db_finding)?;
616            }
617        }
618        drop(conn);
619        Ok(())
620    }
621
    /// Invoke one agent and bracket it with database bookkeeping.
    ///
    /// Inserts a "running" agent-run row, invokes the agent in `working_dir`
    /// with the configured turn limit, then records either success
    /// (cost/turns/output) or failure before returning the agent's result.
    async fn run_agent(
        &self,
        run_id: &str,
        role: AgentRole,
        prompt: &str,
        working_dir: &std::path::Path,
        cycle: u32,
    ) -> Result<crate::process::AgentResult> {
        let agent_run_id = self.record_agent_start(run_id, role, cycle).await?;

        info!(run_id = %run_id, agent = %role, cycle, "agent starting");

        let invocation = AgentInvocation {
            role,
            prompt: prompt.to_string(),
            working_dir: working_dir.to_path_buf(),
            max_turns: Some(self.config.pipeline.turn_limit),
        };

        let result = invoke_agent(self.runner.as_ref(), &invocation).await;

        match &result {
            Ok(agent_result) => {
                // May itself error (e.g. cost budget exceeded), overriding `result`.
                self.record_agent_success(run_id, agent_run_id, agent_result).await?;
            }
            Err(e) => {
                // Mark the row failed with the flattened error chain.
                let conn = self.db.lock().await;
                db::agent_runs::finish_agent_run(
                    &conn,
                    agent_run_id,
                    "failed",
                    0.0,
                    0,
                    None,
                    Some(&format!("{e:#}")),
                    None,
                )?;
            }
        }

        result
    }
664
665    async fn record_agent_start(&self, run_id: &str, role: AgentRole, cycle: u32) -> Result<i64> {
666        let agent_run = AgentRun {
667            id: 0,
668            run_id: run_id.to_string(),
669            agent: role.to_string(),
670            cycle,
671            status: "running".to_string(),
672            cost_usd: 0.0,
673            turns: 0,
674            started_at: chrono::Utc::now().to_rfc3339(),
675            finished_at: None,
676            output_summary: None,
677            error_message: None,
678            raw_output: None,
679        };
680        let conn = self.db.lock().await;
681        db::agent_runs::insert_agent_run(&conn, &agent_run)
682    }
683
684    async fn record_agent_success(
685        &self,
686        run_id: &str,
687        agent_run_id: i64,
688        agent_result: &crate::process::AgentResult,
689    ) -> Result<()> {
690        let conn = self.db.lock().await;
691        db::agent_runs::finish_agent_run(
692            &conn,
693            agent_run_id,
694            "complete",
695            agent_result.cost_usd,
696            agent_result.turns,
697            Some(&truncate(&agent_result.output, 500)),
698            None,
699            Some(&agent_result.output),
700        )?;
701
702        let new_cost = db::runs::increment_run_cost(&conn, run_id, agent_result.cost_usd)?;
703        drop(conn);
704
705        if new_cost > self.config.pipeline.cost_budget {
706            anyhow::bail!(
707                "cost budget exceeded: ${:.2} > ${:.2}",
708                new_cost,
709                self.config.pipeline.cost_budget
710            );
711        }
712        Ok(())
713    }
714
    /// Persist a new pipeline status on the run row.
    async fn update_status(&self, run_id: &str, status: RunStatus) -> Result<()> {
        let conn = self.db.lock().await;
        db::runs::update_run_status(&conn, run_id, status)
    }
719
720    fn check_cancelled(&self) -> Result<()> {
721        if self.cancel_token.is_cancelled() {
722            anyhow::bail!("pipeline cancelled");
723        }
724        Ok(())
725    }
726}
727
/// Footer appended to bot-authored PR bodies and comments
/// (U+1F35E is the bread emoji).
const COMMENT_FOOTER: &str =
    "\n---\nAutomated by [oven](https://github.com/clayharmon/oven-cli) \u{1F35E}";
730
731fn format_unresolved_comment(actionable: &[&agents::Finding]) -> String {
732    let mut comment = String::from(
733        "## Pipeline stopped: unresolved review findings\n\n\
734         The review-fix loop ran 2 cycles but these findings remain unresolved.\n",
735    );
736
737    // Group findings by severity
738    for severity in &[Severity::Critical, Severity::Warning] {
739        let group: Vec<_> = actionable.iter().filter(|f| &f.severity == severity).collect();
740        if group.is_empty() {
741            continue;
742        }
743        let heading = match severity {
744            Severity::Critical => "Critical",
745            Severity::Warning => "Warning",
746            Severity::Info => unreachable!("loop only iterates Critical and Warning"),
747        };
748        let _ = writeln!(comment, "\n### {heading}\n");
749        for f in group {
750            let loc = match (&f.file_path, f.line_number) {
751                (Some(path), Some(line)) => format!(" in `{path}:{line}`"),
752                (Some(path), None) => format!(" in `{path}`"),
753                _ => String::new(),
754            };
755            let _ = writeln!(comment, "- **{}**{} -- {}", f.category, loc, f.message);
756        }
757    }
758
759    comment.push_str(COMMENT_FOOTER);
760    comment
761}
762
/// Format the PR comment posted when the pipeline aborts with an error.
///
/// `{e:#}` renders the full anyhow context chain, not just the top error.
fn format_pipeline_failure(e: &anyhow::Error) -> String {
    format!(
        "## Pipeline failed\n\n\
         **Error:** {e:#}\n\n\
         The pipeline hit an unrecoverable error. Check the run logs for detail, \
         or re-run the pipeline.\
         {COMMENT_FOOTER}"
    )
}
772
/// Format the PR comment posted when rebasing onto the base branch fails
/// (typically a conflict with a PR merged while this pipeline ran).
fn format_rebase_failure(e: &anyhow::Error) -> String {
    format!(
        "## Pipeline stopped: rebase conflict\n\n\
         Could not rebase onto the base branch. This usually happens when another \
         PR merged while this pipeline was running.\n\n\
         **Error:** {e}\n\n\
         Rebase manually and re-run the pipeline.\
         {COMMENT_FOOTER}"
    )
}
783
/// Format the PR comment posted when the reviewer's output cannot be parsed
/// as structured findings in the given review `cycle`.
fn format_review_parse_failure(cycle: u32) -> String {
    format!(
        "## Pipeline stopped: review output error\n\n\
         The reviewer agent returned output that could not be parsed as structured \
         findings (cycle {cycle}). This usually means the reviewer produced malformed JSON.\n\n\
         Re-run the pipeline to try again.\
         {COMMENT_FOOTER}"
    )
}
793
794/// Build a PR title using the issue metadata.
795///
796/// Infers a conventional-commit prefix from the issue title. Falls back to
797/// `fix` when no keyword matches.
798fn pr_title(ctx: &AgentContext) -> String {
799    let prefix = infer_commit_type(&ctx.issue_title);
800    if ctx.issue_source == "github" {
801        format!("{prefix}(#{}): {}", ctx.issue_number, ctx.issue_title)
802    } else {
803        format!("{prefix}: {}", ctx.issue_title)
804    }
805}
806
/// Infer a conventional-commit type from an issue title.
///
/// Matching is case-insensitive. The `test` check runs before the `add `
/// heuristic so titles like "Add tests for X" classify as `test` — previously
/// the `add test` arm was unreachable because `contains("add ")` matched first
/// and returned `feat`. Falls back to `fix` when nothing matches.
fn infer_commit_type(title: &str) -> &'static str {
    let lower = title.to_lowercase();
    if lower.starts_with("test") || lower.starts_with("add test") {
        "test"
    } else if lower.starts_with("feat") || lower.contains("add ") || lower.contains("implement ") {
        "feat"
    } else if lower.starts_with("refactor") {
        "refactor"
    } else if lower.starts_with("docs") || lower.starts_with("document") {
        "docs"
    } else if lower.starts_with("chore") {
        "chore"
    } else {
        "fix"
    }
}
824
825/// Build a full PR body from the implementer's output and issue context.
826fn build_pr_body(impl_output: &str, ctx: &AgentContext) -> String {
827    let issue_ref = if ctx.issue_source == "github" {
828        format!("Resolves #{}", ctx.issue_number)
829    } else {
830        format!("From local issue #{}", ctx.issue_number)
831    };
832
833    let summary = extract_impl_summary(impl_output);
834
835    let mut body = String::new();
836    let _ = writeln!(body, "{issue_ref}\n");
837    let _ = write!(body, "{summary}");
838    body.push_str(COMMENT_FOOTER);
839    body
840}
841
/// Extract the summary section from implementer output.
///
/// Looks for a `## PR Template` heading (repo-specific PR template) first,
/// then `## Changes Made` (default format). Returns a fixed placeholder when
/// neither heading is present, so raw agent narration never leaks into the
/// PR body. Results longer than 4000 bytes are truncated with a `...` suffix
/// at a valid UTF-8 character boundary.
fn extract_impl_summary(output: &str) -> String {
    const MAX_LEN: usize = 4000;

    let heading = output.find("## PR Template").or_else(|| output.find("## Changes Made"));
    let Some(start) = heading else {
        // No structured section found — don't dump stream-of-consciousness
        // agent text into the PR body.
        return String::from("*No implementation summary available. See commit history for details.*");
    };

    let mut summary = output[start..].trim();
    // Drop the "## PR Template" heading itself so the body reads cleanly.
    if let Some(rest) = summary.strip_prefix("## PR Template") {
        summary = rest.trim_start();
    }
    if summary.len() <= MAX_LEN {
        return summary.to_string();
    }
    // Over budget: cut at a char boundary, reserving 3 bytes for "..." so the
    // total never exceeds MAX_LEN.
    let mut end = MAX_LEN - 3;
    while end > 0 && !summary.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}...", &summary[..end])
}
864
865fn new_run(run_id: &str, issue: &PipelineIssue, auto_merge: bool) -> Run {
866    Run {
867        id: run_id.to_string(),
868        issue_number: issue.number,
869        status: RunStatus::Pending,
870        pr_number: None,
871        branch: None,
872        worktree_path: None,
873        cost_usd: 0.0,
874        auto_merge,
875        started_at: chrono::Utc::now().to_rfc3339(),
876        finished_at: None,
877        error_message: None,
878        complexity: "full".to_string(),
879        issue_source: issue.source.to_string(),
880    }
881}
882
883/// Generate an 8-character hex run ID.
884pub fn generate_run_id() -> String {
885    uuid::Uuid::new_v4().to_string()[..8].to_string()
886}
887
888/// Truncate a string to at most `max_len` bytes, appending "..." if truncated.
889///
890/// Reserves 3 bytes for the "..." suffix so the total output never exceeds `max_len`.
891/// Always cuts at a valid UTF-8 character boundary to avoid panics on multi-byte input.
892pub(crate) fn truncate(s: &str, max_len: usize) -> String {
893    if s.len() <= max_len {
894        return s.to_string();
895    }
896    let target = max_len.saturating_sub(3);
897    let mut end = target;
898    while end > 0 && !s.is_char_boundary(end) {
899        end -= 1;
900    }
901    format!("{}...", &s[..end])
902}
903
// Unit tests for the pure helpers in this module: run-ID generation,
// byte-budget truncation, summary extraction, PR title/body building, and
// PR-comment formatting. Everything here is string-in/string-out — no I/O,
// database, or git state is touched.
#[cfg(test)]
mod tests {
    use proptest::prelude::*;

    use super::*;

    // Property test: `generate_run_id` takes no input, so `_seed` only drives
    // proptest's iteration count — each case checks a fresh random ID.
    proptest! {
        #[test]
        fn run_ids_always_8_hex_chars(_seed in any::<u64>()) {
            let id = generate_run_id();
            prop_assert_eq!(id.len(), 8);
            prop_assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
        }
    }

    #[test]
    fn run_id_is_8_hex_chars() {
        let id = generate_run_id();
        assert_eq!(id.len(), 8);
        assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
    }

    // 8 hex chars = 32 bits of entropy; 100 draws colliding would be a bug,
    // not bad luck.
    #[test]
    fn run_ids_are_unique() {
        let ids: Vec<_> = (0..100).map(|_| generate_run_id()).collect();
        let unique: std::collections::HashSet<_> = ids.iter().collect();
        assert_eq!(ids.len(), unique.len());
    }

    // --- truncate: byte budget and UTF-8 boundary behavior ---

    #[test]
    fn truncate_short_string() {
        assert_eq!(truncate("hello", 10), "hello");
    }

    #[test]
    fn truncate_long_string() {
        let long = "a".repeat(100);
        let result = truncate(&long, 10);
        assert_eq!(result.len(), 10); // 7 chars + "..."
        assert!(result.ends_with("..."));
    }

    #[test]
    fn truncate_multibyte_does_not_panic() {
        // Each emoji is 4 bytes. "πŸ˜€πŸ˜€πŸ˜€" = 12 bytes.
        // max_len=8, target=5, walks back to boundary at 4 (one emoji).
        let s = "πŸ˜€πŸ˜€πŸ˜€";
        let result = truncate(s, 8);
        assert!(result.ends_with("..."));
        assert!(result.starts_with("πŸ˜€"));
        assert!(result.len() <= 8);
    }

    #[test]
    fn truncate_cjk_boundary() {
        // CJK chars are 3 bytes each
        let s = "δ½ ε₯½δΈ–η•Œζ΅‹θ―•"; // 18 bytes
        // max_len=10, target=7, walks back to boundary at 6 (two 3-byte chars).
        let result = truncate(s, 10);
        assert!(result.ends_with("..."));
        assert!(result.starts_with("δ½ ε₯½"));
        assert!(result.len() <= 10);
    }

    // --- extract_impl_summary: PR-body section extraction ---

    #[test]
    fn extract_impl_summary_finds_changes_made() {
        let output = "Some preamble text\n\n## Changes Made\n- src/foo.rs: added bar\n\n## Tests Added\n- tests/foo.rs: bar test\n";
        let summary = extract_impl_summary(output);
        assert!(summary.starts_with("## Changes Made"));
        assert!(summary.contains("added bar"));
        assert!(summary.contains("## Tests Added"));
    }

    #[test]
    fn extract_impl_summary_prefers_pr_template() {
        let output = "Preamble\n\n## PR Template\n## Summary\n- Added auth flow\n\n## Testing\n- Unit tests pass\n";
        let summary = extract_impl_summary(output);
        // Should strip the "## PR Template" heading
        assert!(!summary.contains("## PR Template"));
        assert!(summary.starts_with("## Summary"));
        assert!(summary.contains("Added auth flow"));
    }

    #[test]
    fn extract_impl_summary_fallback_on_no_heading() {
        let output = "just some raw agent output with no structure";
        let summary = extract_impl_summary(output);
        assert_eq!(
            summary,
            "*No implementation summary available. See commit history for details.*"
        );
    }

    // Empty/whitespace output takes the same placeholder path as unstructured
    // output — `find` never matches.
    #[test]
    fn extract_impl_summary_empty_output() {
        let placeholder = "*No implementation summary available. See commit history for details.*";
        assert_eq!(extract_impl_summary(""), placeholder);
        assert_eq!(extract_impl_summary("   "), placeholder);
    }

    // --- build_pr_body / pr_title: issue-source-dependent formatting ---

    #[test]
    fn build_pr_body_github_issue() {
        let ctx = AgentContext {
            issue_number: 42,
            issue_title: "fix the thing".to_string(),
            issue_body: String::new(),
            branch: "oven/issue-42".to_string(),
            pr_number: Some(10),
            test_command: None,
            lint_command: None,
            review_findings: None,
            cycle: 1,
            target_repo: None,
            issue_source: "github".to_string(),
            base_branch: "main".to_string(),
        };
        let body = build_pr_body("## Changes Made\n- added stuff", &ctx);
        assert!(body.contains("Resolves #42"));
        assert!(body.contains("## Changes Made"));
        assert!(body.contains("Automated by [oven]"));
    }

    #[test]
    fn build_pr_body_local_issue() {
        let ctx = AgentContext {
            issue_number: 7,
            issue_title: "local thing".to_string(),
            issue_body: String::new(),
            branch: "oven/issue-7".to_string(),
            pr_number: Some(10),
            test_command: None,
            lint_command: None,
            review_findings: None,
            cycle: 1,
            target_repo: None,
            issue_source: "local".to_string(),
            base_branch: "main".to_string(),
        };
        let body = build_pr_body("## Changes Made\n- did local stuff", &ctx);
        assert!(body.contains("From local issue #7"));
        assert!(body.contains("## Changes Made"));
    }

    #[test]
    fn pr_title_github() {
        let ctx = AgentContext {
            issue_number: 42,
            issue_title: "fix the thing".to_string(),
            issue_body: String::new(),
            branch: String::new(),
            pr_number: None,
            test_command: None,
            lint_command: None,
            review_findings: None,
            cycle: 1,
            target_repo: None,
            issue_source: "github".to_string(),
            base_branch: "main".to_string(),
        };
        assert_eq!(pr_title(&ctx), "fix(#42): fix the thing");
    }

    // Local issues have no GitHub issue to reference, so no "(#N)" scope.
    #[test]
    fn pr_title_local() {
        let ctx = AgentContext {
            issue_number: 7,
            issue_title: "local thing".to_string(),
            issue_body: String::new(),
            branch: String::new(),
            pr_number: None,
            test_command: None,
            lint_command: None,
            review_findings: None,
            cycle: 1,
            target_repo: None,
            issue_source: "local".to_string(),
            base_branch: "main".to_string(),
        };
        assert_eq!(pr_title(&ctx), "fix: local thing");
    }

    // --- infer_commit_type: conventional-commit prefix heuristics ---

    #[test]
    fn infer_commit_type_feat() {
        assert_eq!(infer_commit_type("Add dark mode support"), "feat");
        assert_eq!(infer_commit_type("Implement caching layer"), "feat");
        assert_eq!(infer_commit_type("Feature: new dashboard"), "feat");
    }

    #[test]
    fn infer_commit_type_refactor() {
        assert_eq!(infer_commit_type("Refactor auth middleware"), "refactor");
    }

    #[test]
    fn infer_commit_type_docs() {
        assert_eq!(infer_commit_type("Document the API endpoints"), "docs");
        assert_eq!(infer_commit_type("Docs: update README"), "docs");
    }

    #[test]
    fn infer_commit_type_defaults_to_fix() {
        assert_eq!(infer_commit_type("Null pointer in config parser"), "fix");
        assert_eq!(infer_commit_type("Crash on empty input"), "fix");
    }

    #[test]
    fn pr_title_feat_github() {
        let ctx = AgentContext {
            issue_number: 10,
            issue_title: "Add dark mode".to_string(),
            issue_body: String::new(),
            branch: String::new(),
            pr_number: None,
            test_command: None,
            lint_command: None,
            review_findings: None,
            cycle: 1,
            target_repo: None,
            issue_source: "github".to_string(),
            base_branch: "main".to_string(),
        };
        assert_eq!(pr_title(&ctx), "feat(#10): Add dark mode");
    }

    // --- PR-comment formatters ---

    #[test]
    fn format_unresolved_comment_groups_by_severity() {
        let findings = [
            agents::Finding {
                severity: Severity::Critical,
                category: "bug".to_string(),
                file_path: Some("src/main.rs".to_string()),
                line_number: Some(42),
                message: "null pointer".to_string(),
            },
            agents::Finding {
                severity: Severity::Warning,
                category: "style".to_string(),
                file_path: None,
                line_number: None,
                message: "missing docs".to_string(),
            },
        ];
        let refs: Vec<_> = findings.iter().collect();
        let comment = format_unresolved_comment(&refs);
        assert!(comment.contains("### Critical"));
        assert!(comment.contains("### Warning"));
        assert!(comment.contains("**bug** in `src/main.rs:42` -- null pointer"));
        assert!(comment.contains("**style** -- missing docs"));
        assert!(comment.contains("Automated by [oven]"));
    }

    // A severity level with no findings must not emit an empty heading.
    #[test]
    fn format_unresolved_comment_skips_empty_severity_groups() {
        let findings = [agents::Finding {
            severity: Severity::Warning,
            category: "testing".to_string(),
            file_path: Some("src/lib.rs".to_string()),
            line_number: None,
            message: "missing edge case test".to_string(),
        }];
        let refs: Vec<_> = findings.iter().collect();
        let comment = format_unresolved_comment(&refs);
        assert!(!comment.contains("### Critical"));
        assert!(comment.contains("### Warning"));
    }

    #[test]
    fn format_pipeline_failure_includes_error() {
        let err = anyhow::anyhow!("cost budget exceeded: $12.50 > $10.00");
        let comment = format_pipeline_failure(&err);
        assert!(comment.contains("## Pipeline failed"));
        assert!(comment.contains("cost budget exceeded"));
        assert!(comment.contains("Automated by [oven]"));
    }

    #[test]
    fn format_rebase_failure_includes_error() {
        let err = anyhow::anyhow!("merge conflict in src/config/mod.rs");
        let comment = format_rebase_failure(&err);
        assert!(comment.contains("## Pipeline stopped: rebase conflict"));
        assert!(comment.contains("merge conflict"));
        assert!(comment.contains("Rebase manually"));
    }

    #[test]
    fn format_review_parse_failure_includes_cycle() {
        let comment = format_review_parse_failure(2);
        assert!(comment.contains("## Pipeline stopped: review output error"));
        assert!(comment.contains("cycle 2"));
        assert!(comment.contains("Re-run the pipeline"));
    }
}