// oven_cli/pipeline/executor.rs — single-issue pipeline executor.
1use std::{fmt::Write as _, path::PathBuf, sync::Arc};
2
3use anyhow::{Context, Result};
4use rusqlite::Connection;
5use tokio::sync::Mutex;
6use tokio_util::sync::CancellationToken;
7use tracing::{debug, info, warn};
8
9use crate::{
10    agents::{
11        self, AgentContext, AgentInvocation, AgentRole, Complexity, GraphContextNode,
12        PlannerGraphOutput, Severity, invoke_agent, parse_fixer_output, parse_planner_graph_output,
13        parse_review_output,
14    },
15    config::Config,
16    db::{self, AgentRun, ReviewFinding, Run, RunStatus},
17    git,
18    github::{self, GhClient},
19    issues::{IssueOrigin, IssueProvider, PipelineIssue},
20    process::CommandRunner,
21};
22
/// The result of running an issue through the pipeline (before merge).
#[derive(Debug)]
pub struct PipelineOutcome {
    /// Identifier of the pipeline run (see `generate_run_id`: 8 hex chars).
    pub run_id: String,
    /// Number of the draft PR opened for this run.
    pub pr_number: u32,
    /// Worktree path, retained so the caller can clean up after merge.
    pub worktree_path: PathBuf,
    /// Repo directory the worktree belongs to (needed for `git::remove_worktree`).
    pub target_dir: PathBuf,
}
33
/// Runs a single issue through the full pipeline.
pub struct PipelineExecutor<R: CommandRunner> {
    /// Executes external processes; also handed to the agent invoker.
    pub runner: Arc<R>,
    /// GitHub CLI wrapper for PR creation/editing and comments.
    pub github: Arc<GhClient<R>>,
    /// Issue source; also performs label transitions and issue closing.
    pub issues: Arc<dyn IssueProvider>,
    /// SQLite connection guarding all run / agent-run bookkeeping.
    pub db: Arc<Mutex<Connection>>,
    /// Loaded oven configuration (labels, pipeline limits, multi-repo).
    pub config: Config,
    /// Cooperative cancellation, checked between pipeline steps.
    pub cancel_token: CancellationToken,
    /// Default repository directory, used unless multi-repo routing applies.
    pub repo_dir: PathBuf,
}
44
45impl<R: CommandRunner + 'static> PipelineExecutor<R> {
    /// Run the full pipeline for a single issue.
    ///
    /// Convenience wrapper around [`Self::run_issue_with_complexity`] with no
    /// complexity classification (the run keeps its default complexity).
    pub async fn run_issue(&self, issue: &PipelineIssue, auto_merge: bool) -> Result<()> {
        self.run_issue_with_complexity(issue, auto_merge, None).await
    }
50
    /// Run the full pipeline for a single issue with an optional complexity classification.
    ///
    /// Runs the pipeline and then immediately performs merge finalization
    /// (label transitions, issue closing, worktree cleanup).
    pub async fn run_issue_with_complexity(
        &self,
        issue: &PipelineIssue,
        auto_merge: bool,
        complexity: Option<Complexity>,
    ) -> Result<()> {
        let outcome = self.run_issue_pipeline(issue, auto_merge, complexity).await?;
        self.finalize_merge(&outcome, issue).await
    }
61
    /// Run the pipeline up to (but not including) finalization.
    ///
    /// Returns a `PipelineOutcome` with the run ID and PR number.
    /// The caller is responsible for calling `finalize_run` or `finalize_merge`
    /// at the appropriate time (e.g., after the PR is actually merged).
    ///
    /// On a step failure this finalizes immediately (labels, PR comment, DB
    /// row), removes the worktree, and propagates the error.
    pub async fn run_issue_pipeline(
        &self,
        issue: &PipelineIssue,
        auto_merge: bool,
        complexity: Option<Complexity>,
    ) -> Result<PipelineOutcome> {
        let run_id = generate_run_id();

        // Determine target repo for worktrees and PRs (multi-repo routing)
        let (target_dir, is_multi_repo) = self.resolve_target_dir(issue.target_repo.as_ref())?;

        let base_branch = git::default_branch(&target_dir).await?;

        let mut run = new_run(&run_id, issue, auto_merge);
        if let Some(ref c) = complexity {
            run.complexity = c.to_string();
        }
        // Scope the DB lock so it is released before the async label call.
        {
            let conn = self.db.lock().await;
            db::runs::insert_run(&conn, &run)?;
        }

        // Move the issue from "ready" to "cooking" so other schedulers skip it.
        self.issues
            .transition(issue.number, &self.config.labels.ready, &self.config.labels.cooking)
            .await?;

        let worktree = git::create_worktree(&target_dir, issue.number, &base_branch).await?;
        self.record_worktree(&run_id, &worktree).await?;

        // Seed branch with an empty commit so GitHub accepts the draft PR
        git::empty_commit(
            &worktree.path,
            &format!("chore: start oven pipeline for issue #{}", issue.number),
        )
        .await?;

        info!(
            run_id = %run_id,
            issue = issue.number,
            branch = %worktree.branch,
            target_repo = ?issue.target_repo,
            "starting pipeline"
        );

        let pr_number = self.create_pr(&run_id, issue, &worktree.branch, &target_dir).await?;

        let ctx = AgentContext {
            issue_number: issue.number,
            issue_title: issue.title.clone(),
            issue_body: issue.body.clone(),
            branch: worktree.branch.clone(),
            pr_number: Some(pr_number),
            test_command: self.config.project.test.clone(),
            lint_command: self.config.project.lint.clone(),
            review_findings: None,
            cycle: 1,
            // Only expose the target repo to agents when multi-repo routing
            // actually resolved it (otherwise agents run in the default repo).
            target_repo: if is_multi_repo { issue.target_repo.clone() } else { None },
            issue_source: issue.source.as_str().to_string(),
            base_branch: base_branch.clone(),
        };

        let result = self.run_steps(&run_id, &ctx, &worktree.path, auto_merge, &target_dir).await;

        if let Err(ref e) = result {
            // On failure, finalize immediately (no merge to wait for)
            self.finalize_run(&run_id, issue, pr_number, &result, &target_dir).await?;
            if let Err(e) = git::remove_worktree(&target_dir, &worktree.path).await {
                warn!(run_id = %run_id, error = %e, "failed to clean up worktree");
            }
            return Err(anyhow::anyhow!("{e:#}"));
        }

        // Update status to AwaitingMerge
        self.update_status(&run_id, RunStatus::AwaitingMerge).await?;

        Ok(PipelineOutcome { run_id, pr_number, worktree_path: worktree.path, target_dir })
    }
144
    /// Finalize a run after its PR has been merged.
    ///
    /// Transitions labels, closes issues, marks the run as complete, and cleans
    /// up the worktree that was left around while awaiting merge.
    ///
    /// Worktree removal failures are only logged: the run itself has already
    /// completed successfully at this point.
    pub async fn finalize_merge(
        &self,
        outcome: &PipelineOutcome,
        issue: &PipelineIssue,
    ) -> Result<()> {
        self.finalize_run(&outcome.run_id, issue, outcome.pr_number, &Ok(()), &outcome.target_dir)
            .await?;
        if let Err(e) = git::remove_worktree(&outcome.target_dir, &outcome.worktree_path).await {
            warn!(
                run_id = %outcome.run_id,
                error = %e,
                "failed to clean up worktree after merge"
            );
        }
        Ok(())
    }
165
166    /// Invoke the planner agent to decide dependency ordering for a set of issues.
167    ///
168    /// `graph_context` describes the current dependency graph state so the planner
169    /// can avoid scheduling conflicting work alongside in-flight issues.
170    ///
171    /// Returns `None` if the planner fails or returns unparseable output (fallback to default).
172    pub async fn plan_issues(
173        &self,
174        issues: &[PipelineIssue],
175        graph_context: &[GraphContextNode],
176    ) -> Option<PlannerGraphOutput> {
177        let prompt = match agents::planner::build_prompt(issues, graph_context) {
178            Ok(p) => p,
179            Err(e) => {
180                warn!(error = %e, "planner prompt build failed");
181                return None;
182            }
183        };
184        let invocation = AgentInvocation {
185            role: AgentRole::Planner,
186            prompt,
187            working_dir: self.repo_dir.clone(),
188            max_turns: Some(self.config.pipeline.turn_limit),
189        };
190
191        match invoke_agent(self.runner.as_ref(), &invocation).await {
192            Ok(result) => {
193                debug!(output = %result.output, "raw planner output");
194                let parsed = parse_planner_graph_output(&result.output);
195                if parsed.is_none() {
196                    warn!(output = %result.output, "planner returned unparseable output, falling back to all-parallel");
197                }
198                parsed
199            }
200            Err(e) => {
201                warn!(error = %e, "planner agent failed, falling back to all-parallel");
202                None
203            }
204        }
205    }
206
207    /// Determine the effective repo directory for worktrees and PRs.
208    ///
209    /// Returns `(target_dir, is_multi_repo)`. When multi-repo is disabled or no target
210    /// is specified, falls back to `self.repo_dir`.
211    pub(crate) fn resolve_target_dir(
212        &self,
213        target_repo: Option<&String>,
214    ) -> Result<(PathBuf, bool)> {
215        if !self.config.multi_repo.enabled {
216            return Ok((self.repo_dir.clone(), false));
217        }
218        match target_repo {
219            Some(name) => {
220                let path = self.config.resolve_repo(name)?;
221                Ok((path, true))
222            }
223            None => Ok((self.repo_dir.clone(), false)),
224        }
225    }
226
    /// Reconstruct a `PipelineOutcome` from graph node data (for merge polling).
    ///
    /// Worktree paths are deterministic, so we can rebuild the outcome from
    /// the issue metadata stored on the graph node.
    ///
    /// NOTE(review): the `.oven/worktrees/issue-N` layout assumed here must
    /// stay in sync with `git::create_worktree` — confirm when changing either.
    pub fn reconstruct_outcome(
        &self,
        issue: &PipelineIssue,
        run_id: &str,
        pr_number: u32,
    ) -> Result<PipelineOutcome> {
        let (target_dir, _) = self.resolve_target_dir(issue.target_repo.as_ref())?;
        let worktree_path =
            target_dir.join(".oven").join("worktrees").join(format!("issue-{}", issue.number));
        Ok(PipelineOutcome { run_id: run_id.to_string(), pr_number, worktree_path, target_dir })
    }
242
243    async fn record_worktree(&self, run_id: &str, worktree: &git::Worktree) -> Result<()> {
244        let conn = self.db.lock().await;
245        db::runs::update_run_worktree(
246            &conn,
247            run_id,
248            &worktree.branch,
249            &worktree.path.to_string_lossy(),
250        )?;
251        drop(conn);
252        Ok(())
253    }
254
    /// Push the branch and open a draft PR, recording the PR number on the run.
    ///
    /// GitHub-sourced issues get a `Resolves #N` body so the issue auto-closes
    /// on merge; local issues only get an informational reference.
    async fn create_pr(
        &self,
        run_id: &str,
        issue: &PipelineIssue,
        branch: &str,
        repo_dir: &std::path::Path,
    ) -> Result<u32> {
        let (pr_title, pr_body) = match issue.source {
            IssueOrigin::Github => (
                format!("fix(#{}): {}", issue.number, issue.title),
                format!(
                    "Resolves #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
                    issue.number
                ),
            ),
            IssueOrigin::Local => (
                format!("fix: {}", issue.title),
                format!(
                    "From local issue #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
                    issue.number
                ),
            ),
        };

        git::push_branch(repo_dir, branch).await?;
        let pr_number =
            self.github.create_draft_pr_in(&pr_title, branch, &pr_body, repo_dir).await?;

        // Scope the DB lock so it is not held across the log call below.
        {
            let conn = self.db.lock().await;
            db::runs::update_run_pr(&conn, run_id, pr_number)?;
        }

        info!(run_id = %run_id, pr = pr_number, "draft PR created");
        Ok(pr_number)
    }
291
    /// Record the terminal state of a run.
    ///
    /// On success: transitions labels cooking -> complete and closes the issue
    /// when the merger cannot do it itself. On failure: posts a failure
    /// comment on the PR and transitions cooking -> failed (best effort).
    /// Always writes the final status and error message to the DB.
    async fn finalize_run(
        &self,
        run_id: &str,
        issue: &PipelineIssue,
        pr_number: u32,
        result: &Result<()>,
        target_dir: &std::path::Path,
    ) -> Result<()> {
        let (final_status, error_msg) = match result {
            Ok(()) => {
                self.issues
                    .transition(
                        issue.number,
                        &self.config.labels.cooking,
                        &self.config.labels.complete,
                    )
                    .await?;

                // Close the issue when the merger can't do it:
                // - Local issues: merger can't use `gh issue close`
                // - Multi-repo: merger runs in target repo, can't close god-repo issue
                let should_close =
                    issue.source == IssueOrigin::Local || issue.target_repo.is_some();

                if should_close {
                    let comment = issue.target_repo.as_ref().map_or_else(
                        || format!("Implemented in #{pr_number}"),
                        |repo_name| format!("Implemented in {repo_name}#{pr_number}"),
                    );
                    // Closing is best-effort; a failure here should not fail
                    // an otherwise-successful run.
                    if let Err(e) = self.issues.close(issue.number, Some(&comment)).await {
                        warn!(
                            run_id = %run_id,
                            error = %e,
                            "failed to close issue"
                        );
                    }
                }

                (RunStatus::Complete, None)
            }
            Err(e) => {
                warn!(run_id = %run_id, error = %e, "pipeline failed");
                github::safe_comment(
                    &self.github,
                    pr_number,
                    &format_pipeline_failure(e),
                    target_dir,
                )
                .await;
                // Best-effort label transition: the run already failed, so a
                // labeling error must not mask the original failure.
                let _ = self
                    .issues
                    .transition(
                        issue.number,
                        &self.config.labels.cooking,
                        &self.config.labels.failed,
                    )
                    .await;
                (RunStatus::Failed, Some(format!("{e:#}")))
            }
        };

        let conn = self.db.lock().await;
        db::runs::finish_run(&conn, run_id, final_status, error_msg.as_deref())
    }
356
    /// Execute the agent steps for one run in order: implement, publish the
    /// PR, review/fix loop, rebase onto the base branch, and (when
    /// `auto_merge` is set) the merger agent.
    async fn run_steps(
        &self,
        run_id: &str,
        ctx: &AgentContext,
        worktree_path: &std::path::Path,
        auto_merge: bool,
        target_dir: &std::path::Path,
    ) -> Result<()> {
        self.check_cancelled()?;

        // 1. Implement
        self.update_status(run_id, RunStatus::Implementing).await?;
        let impl_prompt = agents::implementer::build_prompt(ctx)?;
        let impl_result =
            self.run_agent(run_id, AgentRole::Implementer, &impl_prompt, worktree_path, 1).await?;

        git::push_branch(worktree_path, &ctx.branch).await?;

        // 1b. Update PR description and mark ready for review
        // (both are best-effort: a GitHub hiccup here should not kill the run)
        if let Some(pr_number) = ctx.pr_number {
            let body = build_pr_body(&impl_result.output, ctx);
            if let Err(e) =
                self.github.edit_pr_in(pr_number, &pr_title(ctx), &body, target_dir).await
            {
                warn!(run_id = %run_id, error = %e, "failed to update PR description");
            }
            if let Err(e) = self.github.mark_pr_ready_in(pr_number, target_dir).await {
                warn!(run_id = %run_id, error = %e, "failed to mark PR ready");
            }
        }

        // 2. Review-fix loop
        let clean = self.run_review_fix_loop(run_id, ctx, worktree_path, target_dir).await?;

        if !clean {
            anyhow::bail!("unresolved findings after max review cycles");
        }

        // 3. Rebase onto base branch to resolve any conflicts from parallel merges
        self.check_cancelled()?;
        info!(run_id = %run_id, base = %ctx.base_branch, "rebasing onto base branch");
        if let Err(e) = git::rebase_on_base(worktree_path, &ctx.base_branch).await {
            // Surface the conflict on the PR before failing the run.
            if let Some(pr_number) = ctx.pr_number {
                github::safe_comment(
                    &self.github,
                    pr_number,
                    &format_rebase_failure(&e),
                    target_dir,
                )
                .await;
            }
            return Err(e);
        }
        // Rebase rewrites history, so a force push is required here.
        git::force_push_branch(worktree_path, &ctx.branch).await?;
        // 4. Merge (only when auto-merge is enabled)
        if auto_merge {
            self.check_cancelled()?;
            ctx.pr_number.context("no PR number for merge step")?;
            self.update_status(run_id, RunStatus::Merging).await?;
            let merge_prompt = agents::merger::build_prompt(ctx, auto_merge)?;
            self.run_agent(run_id, AgentRole::Merger, &merge_prompt, worktree_path, 1).await?;
        }

        Ok(())
    }
422
    /// Alternate reviewer and fixer agents for up to three review cycles.
    ///
    /// Returns `Ok(true)` as soon as a review yields no actionable
    /// (non-`Info`) findings, `Ok(false)` when findings survive the final
    /// cycle (after posting them on the PR), and `Err` when an agent fails or
    /// the reviewer output cannot be parsed.
    async fn run_review_fix_loop(
        &self,
        run_id: &str,
        ctx: &AgentContext,
        worktree_path: &std::path::Path,
        target_dir: &std::path::Path,
    ) -> Result<bool> {
        // Hard cap of 3 review cycles; the fixer runs only after cycles 1 and 2.
        for cycle in 1..=3 {
            self.check_cancelled()?;

            self.update_status(run_id, RunStatus::Reviewing).await?;

            // On cycles 2+, include prior disputes so the reviewer sees what the
            // fixer already tried and why certain findings were skipped.
            let prior_disputes = if cycle > 1 {
                let conn = self.db.lock().await;
                db::agent_runs::get_resolved_findings(&conn, run_id)?
            } else {
                Vec::new()
            };

            let review_prompt = agents::reviewer::build_prompt(ctx, &prior_disputes)?;
            let review_result = self
                .run_agent(run_id, AgentRole::Reviewer, &review_prompt, worktree_path, cycle)
                .await?;

            let review_output = match parse_review_output(&review_result.output) {
                Ok(output) => output,
                Err(e) => {
                    warn!(run_id = %run_id, cycle, error = %e, "review output unparseable, treating as failed review");
                    if let Some(pr_number) = ctx.pr_number {
                        github::safe_comment(
                            &self.github,
                            pr_number,
                            &format_review_parse_failure(cycle),
                            target_dir,
                        )
                        .await;
                    }
                    anyhow::bail!("reviewer returned unparseable output in cycle {cycle}");
                }
            };
            self.store_findings(run_id, &review_output.findings).await?;

            // Info-level findings are informational only and never block.
            let actionable: Vec<_> =
                review_output.findings.iter().filter(|f| f.severity != Severity::Info).collect();

            if actionable.is_empty() {
                info!(run_id = %run_id, cycle, "review clean");
                return Ok(true);
            }

            info!(run_id = %run_id, cycle, findings = actionable.len(), "review found issues");

            if cycle == 3 {
                // Out of cycles: report what's left instead of fixing again.
                if let Some(pr_number) = ctx.pr_number {
                    let comment = format_unresolved_comment(&actionable);
                    github::safe_comment(&self.github, pr_number, &comment, target_dir).await;
                } else {
                    warn!(run_id = %run_id, "no PR number, cannot post unresolved findings");
                }
                return Ok(false);
            }

            // Fix
            self.check_cancelled()?;
            self.update_status(run_id, RunStatus::Fixing).await?;

            let unresolved = {
                let conn = self.db.lock().await;
                db::agent_runs::get_unresolved_findings(&conn, run_id)?
            };

            let fix_prompt = agents::fixer::build_prompt(ctx, &unresolved)?;
            let fix_result =
                self.run_agent(run_id, AgentRole::Fixer, &fix_prompt, worktree_path, cycle).await?;

            // Parse fixer output and mark disputed findings as resolved
            let fixer_output = parse_fixer_output(&fix_result.output);
            self.process_fixer_disputes(run_id, &unresolved, &fixer_output).await?;

            git::push_branch(worktree_path, &ctx.branch).await?;
        }

        // Unreachable in practice: the cycle-3 iteration always returns above.
        Ok(false)
    }
509
510    /// Process fixer disputes by marking corresponding review findings as resolved.
511    ///
512    /// The fixer references findings by 1-indexed position in the list it received.
513    /// We map those back to the actual `ReviewFinding` IDs and mark them resolved
514    /// with the fixer's dispute reason.
515    async fn process_fixer_disputes(
516        &self,
517        run_id: &str,
518        findings_sent_to_fixer: &[ReviewFinding],
519        fixer_output: &agents::FixerOutput,
520    ) -> Result<()> {
521        if fixer_output.disputed.is_empty() {
522            return Ok(());
523        }
524
525        let conn = self.db.lock().await;
526        for dispute in &fixer_output.disputed {
527            // finding numbers are 1-indexed
528            let idx = dispute.finding.saturating_sub(1) as usize;
529            if let Some(finding) = findings_sent_to_fixer.get(idx) {
530                db::agent_runs::resolve_finding(&conn, finding.id, &dispute.reason)?;
531                info!(
532                    run_id = %run_id,
533                    finding_id = finding.id,
534                    reason = %dispute.reason,
535                    "finding disputed by fixer, marked resolved"
536                );
537            }
538        }
539        drop(conn);
540        Ok(())
541    }
542
543    async fn store_findings(&self, run_id: &str, findings: &[agents::Finding]) -> Result<()> {
544        let conn = self.db.lock().await;
545        let agent_runs = db::agent_runs::get_agent_runs_for_run(&conn, run_id)?;
546        let reviewer_run_id = agent_runs
547            .iter()
548            .rev()
549            .find_map(|ar| if ar.agent == "reviewer" { Some(ar.id) } else { None });
550        if let Some(ar_id) = reviewer_run_id {
551            for finding in findings {
552                let db_finding = ReviewFinding {
553                    id: 0,
554                    agent_run_id: ar_id,
555                    severity: finding.severity.to_string(),
556                    category: finding.category.clone(),
557                    file_path: finding.file_path.clone(),
558                    line_number: finding.line_number,
559                    message: finding.message.clone(),
560                    resolved: false,
561                    dispute_reason: None,
562                };
563                db::agent_runs::insert_finding(&conn, &db_finding)?;
564            }
565        }
566        drop(conn);
567        Ok(())
568    }
569
    /// Invoke a single agent and persist its lifecycle in the DB.
    ///
    /// Records a "running" row before the invocation, then marks it complete
    /// (with cost/turns/output) or failed. Note: a DB error while recording a
    /// *failed* agent propagates via `?` and replaces the agent's own error.
    async fn run_agent(
        &self,
        run_id: &str,
        role: AgentRole,
        prompt: &str,
        working_dir: &std::path::Path,
        cycle: u32,
    ) -> Result<crate::process::AgentResult> {
        let agent_run_id = self.record_agent_start(run_id, role, cycle).await?;

        info!(run_id = %run_id, agent = %role, cycle, "agent starting");

        let invocation = AgentInvocation {
            role,
            prompt: prompt.to_string(),
            working_dir: working_dir.to_path_buf(),
            max_turns: Some(self.config.pipeline.turn_limit),
        };

        let result = invoke_agent(self.runner.as_ref(), &invocation).await;

        match &result {
            Ok(agent_result) => {
                // Also enforces the cost budget and may bail the pipeline.
                self.record_agent_success(run_id, agent_run_id, agent_result).await?;
            }
            Err(e) => {
                let conn = self.db.lock().await;
                db::agent_runs::finish_agent_run(
                    &conn,
                    agent_run_id,
                    "failed",
                    0.0,
                    0,
                    None,
                    Some(&format!("{e:#}")),
                    None,
                )?;
            }
        }

        result
    }
612
613    async fn record_agent_start(&self, run_id: &str, role: AgentRole, cycle: u32) -> Result<i64> {
614        let agent_run = AgentRun {
615            id: 0,
616            run_id: run_id.to_string(),
617            agent: role.to_string(),
618            cycle,
619            status: "running".to_string(),
620            cost_usd: 0.0,
621            turns: 0,
622            started_at: chrono::Utc::now().to_rfc3339(),
623            finished_at: None,
624            output_summary: None,
625            error_message: None,
626            raw_output: None,
627        };
628        let conn = self.db.lock().await;
629        db::agent_runs::insert_agent_run(&conn, &agent_run)
630    }
631
    /// Persist a successful agent run and enforce the per-run cost budget.
    ///
    /// Increments the run's cumulative cost; bails (failing the pipeline)
    /// when the new total exceeds `pipeline.cost_budget`.
    async fn record_agent_success(
        &self,
        run_id: &str,
        agent_run_id: i64,
        agent_result: &crate::process::AgentResult,
    ) -> Result<()> {
        let conn = self.db.lock().await;
        db::agent_runs::finish_agent_run(
            &conn,
            agent_run_id,
            "complete",
            agent_result.cost_usd,
            agent_result.turns,
            // Short summary for listings; full raw output stored alongside.
            Some(&truncate(&agent_result.output, 500)),
            None,
            Some(&agent_result.output),
        )?;

        let new_cost = db::runs::increment_run_cost(&conn, run_id, agent_result.cost_usd)?;
        drop(conn);

        if new_cost > self.config.pipeline.cost_budget {
            anyhow::bail!(
                "cost budget exceeded: ${:.2} > ${:.2}",
                new_cost,
                self.config.pipeline.cost_budget
            );
        }
        Ok(())
    }
662
    /// Persist a new status value on the run row.
    async fn update_status(&self, run_id: &str, status: RunStatus) -> Result<()> {
        let conn = self.db.lock().await;
        db::runs::update_run_status(&conn, run_id, status)
    }
667
668    fn check_cancelled(&self) -> Result<()> {
669        if self.cancel_token.is_cancelled() {
670            anyhow::bail!("pipeline cancelled");
671        }
672        Ok(())
673    }
674}
675
/// Footer appended to bot-authored PR bodies and comments (trailing bread
/// emoji, U+1F35E).
const COMMENT_FOOTER: &str =
    "\n---\nAutomated by [oven](https://github.com/clayharmon/oven-cli) \u{1F35E}";
678
/// Build the PR comment posted when actionable findings survive all review
/// cycles, grouped by severity (Critical first, then Warning).
///
/// NOTE(review): the "ran 2 cycles" wording appears to count fix cycles, while
/// `run_review_fix_loop` performs 3 review passes — confirm before changing
/// either number.
fn format_unresolved_comment(actionable: &[&agents::Finding]) -> String {
    let mut comment = String::from(
        "## Pipeline stopped: unresolved review findings\n\n\
         The review-fix loop ran 2 cycles but these findings remain unresolved.\n",
    );

    // Group findings by severity
    for severity in &[Severity::Critical, Severity::Warning] {
        let group: Vec<_> = actionable.iter().filter(|f| &f.severity == severity).collect();
        if group.is_empty() {
            continue;
        }
        let heading = match severity {
            Severity::Critical => "Critical",
            Severity::Warning => "Warning",
            Severity::Info => unreachable!("loop only iterates Critical and Warning"),
        };
        let _ = writeln!(comment, "\n### {heading}\n");
        for f in group {
            // Render " in `path:line`" / " in `path`" when location is known.
            let loc = match (&f.file_path, f.line_number) {
                (Some(path), Some(line)) => format!(" in `{path}:{line}`"),
                (Some(path), None) => format!(" in `{path}`"),
                _ => String::new(),
            };
            let _ = writeln!(comment, "- **{}**{} -- {}", f.category, loc, f.message);
        }
    }

    comment.push_str(COMMENT_FOOTER);
    comment
}
710
/// PR comment body for an unrecoverable pipeline error (full anyhow chain).
fn format_pipeline_failure(e: &anyhow::Error) -> String {
    format!(
        "## Pipeline failed\n\n\
         **Error:** {e:#}\n\n\
         The pipeline hit an unrecoverable error. Check the run logs for detail, \
         or re-run the pipeline.\
         {COMMENT_FOOTER}"
    )
}
720
/// PR comment body for a failed rebase onto the base branch.
fn format_rebase_failure(e: &anyhow::Error) -> String {
    format!(
        "## Pipeline stopped: rebase conflict\n\n\
         Could not rebase onto the base branch. This usually happens when another \
         PR merged while this pipeline was running.\n\n\
         **Error:** {e}\n\n\
         Rebase manually and re-run the pipeline.\
         {COMMENT_FOOTER}"
    )
}
731
732fn format_review_parse_failure(cycle: u32) -> String {
733    format!(
734        "## Pipeline stopped: review output error\n\n\
735         The reviewer agent returned output that could not be parsed as structured \
736         findings (cycle {cycle}). This usually means the reviewer produced malformed JSON.\n\n\
737         Re-run the pipeline to try again.\
738         {COMMENT_FOOTER}"
739    )
740}
741
742/// Build a PR title using the issue metadata.
743///
744/// Infers a conventional-commit prefix from the issue title. Falls back to
745/// `fix` when no keyword matches.
746fn pr_title(ctx: &AgentContext) -> String {
747    let prefix = infer_commit_type(&ctx.issue_title);
748    if ctx.issue_source == "github" {
749        format!("{prefix}(#{}): {}", ctx.issue_number, ctx.issue_title)
750    } else {
751        format!("{prefix}: {}", ctx.issue_title)
752    }
753}
754
/// Infer a conventional-commit type from an issue title.
///
/// Test-related titles are checked first: a title like "Add tests for X"
/// contains "add ", so the generic feature heuristic would otherwise shadow
/// the explicit `starts_with("add test")` case (which was unreachable in the
/// previous ordering) and misclassify test work as a feature.
fn infer_commit_type(title: &str) -> &'static str {
    let lower = title.to_lowercase();
    if lower.starts_with("test") || lower.starts_with("add test") {
        "test"
    } else if lower.starts_with("feat") || lower.contains("add ") || lower.contains("implement ") {
        "feat"
    } else if lower.starts_with("refactor") {
        "refactor"
    } else if lower.starts_with("docs") || lower.starts_with("document") {
        "docs"
    } else if lower.starts_with("chore") {
        "chore"
    } else {
        // Default bucket for anything that doesn't match a keyword.
        "fix"
    }
}
772
773/// Build a full PR body from the implementer's output and issue context.
774fn build_pr_body(impl_output: &str, ctx: &AgentContext) -> String {
775    let issue_ref = if ctx.issue_source == "github" {
776        format!("Resolves #{}", ctx.issue_number)
777    } else {
778        format!("From local issue #{}", ctx.issue_number)
779    };
780
781    let summary = extract_impl_summary(impl_output);
782
783    let mut body = String::new();
784    let _ = writeln!(body, "{issue_ref}\n");
785    let _ = write!(body, "{summary}");
786    body.push_str(COMMENT_FOOTER);
787    body
788}
789
790/// Extract the summary section from implementer output.
791///
792/// Looks for `## PR Template` (repo-specific PR template) or `## Changes Made`
793/// (default format) headings. Falls back to the full output (truncated) if
794/// neither heading is found.
795fn extract_impl_summary(output: &str) -> String {
796    // Prefer a filled-out PR template if the implementer found one
797    let idx = output.find("## PR Template").or_else(|| output.find("## Changes Made"));
798
799    if let Some(idx) = idx {
800        let summary = output[idx..].trim();
801        // Strip the "## PR Template" heading itself so the body reads cleanly
802        let summary = summary.strip_prefix("## PR Template").map_or(summary, |s| s.trim_start());
803        if summary.len() <= 4000 {
804            return summary.to_string();
805        }
806        return truncate(summary, 4000);
807    }
808    // Fallback: no structured summary found
809    if output.trim().is_empty() {
810        return String::from("*No implementation summary available.*");
811    }
812    truncate(output.trim(), 2000)
813}
814
/// Build a fresh `Run` row in `Pending` state with zero cost and the default
/// "full" complexity (overridden later when a classification is supplied).
fn new_run(run_id: &str, issue: &PipelineIssue, auto_merge: bool) -> Run {
    Run {
        id: run_id.to_string(),
        issue_number: issue.number,
        status: RunStatus::Pending,
        pr_number: None,
        branch: None,
        worktree_path: None,
        cost_usd: 0.0,
        auto_merge,
        started_at: chrono::Utc::now().to_rfc3339(),
        finished_at: None,
        error_message: None,
        complexity: "full".to_string(),
        issue_source: issue.source.to_string(),
    }
}
832
833/// Generate an 8-character hex run ID.
834pub fn generate_run_id() -> String {
835    uuid::Uuid::new_v4().to_string()[..8].to_string()
836}
837
838/// Truncate a string to at most `max_len` bytes, appending "..." if truncated.
839///
840/// Reserves 3 bytes for the "..." suffix so the total output never exceeds `max_len`.
841/// Always cuts at a valid UTF-8 character boundary to avoid panics on multi-byte input.
842pub(crate) fn truncate(s: &str, max_len: usize) -> String {
843    if s.len() <= max_len {
844        return s.to_string();
845    }
846    let target = max_len.saturating_sub(3);
847    let mut end = target;
848    while end > 0 && !s.is_char_boundary(end) {
849        end -= 1;
850    }
851    format!("{}...", &s[..end])
852}
853
#[cfg(test)]
mod tests {
    use proptest::prelude::*;

    use super::*;

    // --- generate_run_id -------------------------------------------------
    // Property test: every generated ID is exactly 8 ASCII hex digits,
    // regardless of RNG state.
    proptest! {
        #[test]
        fn run_ids_always_8_hex_chars(_seed in any::<u64>()) {
            let id = generate_run_id();
            prop_assert_eq!(id.len(), 8);
            prop_assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
        }
    }

    #[test]
    fn run_id_is_8_hex_chars() {
        let id = generate_run_id();
        assert_eq!(id.len(), 8);
        assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn run_ids_are_unique() {
        let ids: Vec<_> = (0..100).map(|_| generate_run_id()).collect();
        let unique: std::collections::HashSet<_> = ids.iter().collect();
        assert_eq!(ids.len(), unique.len());
    }

    // --- truncate ---------------------------------------------------------
    #[test]
    fn truncate_short_string() {
        assert_eq!(truncate("hello", 10), "hello");
    }

    #[test]
    fn truncate_long_string() {
        let long = "a".repeat(100);
        let result = truncate(&long, 10);
        assert_eq!(result.len(), 10); // 7 chars + "..."
        assert!(result.ends_with("..."));
    }

    #[test]
    fn truncate_multibyte_does_not_panic() {
        // Each emoji is 4 bytes. "πŸ˜€πŸ˜€πŸ˜€" = 12 bytes.
        // max_len=8, target=5, walks back to boundary at 4 (one emoji).
        let s = "πŸ˜€πŸ˜€πŸ˜€";
        let result = truncate(s, 8);
        assert!(result.ends_with("..."));
        assert!(result.starts_with("πŸ˜€"));
        assert!(result.len() <= 8);
    }

    #[test]
    fn truncate_cjk_boundary() {
        // CJK chars are 3 bytes each
        let s = "δ½ ε₯½δΈ–η•Œζ΅‹θ―•"; // 18 bytes
        // max_len=10, target=7, walks back to boundary at 6 (two 3-byte chars).
        let result = truncate(s, 10);
        assert!(result.ends_with("..."));
        assert!(result.starts_with("δ½ ε₯½"));
        assert!(result.len() <= 10);
    }

    // --- extract_impl_summary ----------------------------------------------
    #[test]
    fn extract_impl_summary_finds_changes_made() {
        let output = "Some preamble text\n\n## Changes Made\n- src/foo.rs: added bar\n\n## Tests Added\n- tests/foo.rs: bar test\n";
        let summary = extract_impl_summary(output);
        assert!(summary.starts_with("## Changes Made"));
        assert!(summary.contains("added bar"));
        assert!(summary.contains("## Tests Added"));
    }

    #[test]
    fn extract_impl_summary_prefers_pr_template() {
        let output = "Preamble\n\n## PR Template\n## Summary\n- Added auth flow\n\n## Testing\n- Unit tests pass\n";
        let summary = extract_impl_summary(output);
        // Should strip the "## PR Template" heading
        assert!(!summary.contains("## PR Template"));
        assert!(summary.starts_with("## Summary"));
        assert!(summary.contains("Added auth flow"));
    }

    #[test]
    fn extract_impl_summary_fallback_on_no_heading() {
        let output = "just some raw agent output with no structure";
        let summary = extract_impl_summary(output);
        assert!(summary.contains("just some raw agent output"));
    }

    #[test]
    fn extract_impl_summary_empty_output() {
        assert_eq!(extract_impl_summary(""), "*No implementation summary available.*");
        assert_eq!(extract_impl_summary("   "), "*No implementation summary available.*");
    }

    // --- build_pr_body ------------------------------------------------------
    #[test]
    fn build_pr_body_github_issue() {
        let ctx = AgentContext {
            issue_number: 42,
            issue_title: "fix the thing".to_string(),
            issue_body: String::new(),
            branch: "oven/issue-42".to_string(),
            pr_number: Some(10),
            test_command: None,
            lint_command: None,
            review_findings: None,
            cycle: 1,
            target_repo: None,
            issue_source: "github".to_string(),
            base_branch: "main".to_string(),
        };
        let body = build_pr_body("## Changes Made\n- added stuff", &ctx);
        assert!(body.contains("Resolves #42"));
        assert!(body.contains("## Changes Made"));
        assert!(body.contains("Automated by [oven]"));
    }

    #[test]
    fn build_pr_body_local_issue() {
        let ctx = AgentContext {
            issue_number: 7,
            issue_title: "local thing".to_string(),
            issue_body: String::new(),
            branch: "oven/issue-7".to_string(),
            pr_number: Some(10),
            test_command: None,
            lint_command: None,
            review_findings: None,
            cycle: 1,
            target_repo: None,
            issue_source: "local".to_string(),
            base_branch: "main".to_string(),
        };
        let body = build_pr_body("raw output", &ctx);
        assert!(body.contains("From local issue #7"));
    }

    // --- pr_title / infer_commit_type (defined earlier in this file) --------
    #[test]
    fn pr_title_github() {
        let ctx = AgentContext {
            issue_number: 42,
            issue_title: "fix the thing".to_string(),
            issue_body: String::new(),
            branch: String::new(),
            pr_number: None,
            test_command: None,
            lint_command: None,
            review_findings: None,
            cycle: 1,
            target_repo: None,
            issue_source: "github".to_string(),
            base_branch: "main".to_string(),
        };
        assert_eq!(pr_title(&ctx), "fix(#42): fix the thing");
    }

    #[test]
    fn pr_title_local() {
        let ctx = AgentContext {
            issue_number: 7,
            issue_title: "local thing".to_string(),
            issue_body: String::new(),
            branch: String::new(),
            pr_number: None,
            test_command: None,
            lint_command: None,
            review_findings: None,
            cycle: 1,
            target_repo: None,
            issue_source: "local".to_string(),
            base_branch: "main".to_string(),
        };
        assert_eq!(pr_title(&ctx), "fix: local thing");
    }

    #[test]
    fn infer_commit_type_feat() {
        assert_eq!(infer_commit_type("Add dark mode support"), "feat");
        assert_eq!(infer_commit_type("Implement caching layer"), "feat");
        assert_eq!(infer_commit_type("Feature: new dashboard"), "feat");
    }

    #[test]
    fn infer_commit_type_refactor() {
        assert_eq!(infer_commit_type("Refactor auth middleware"), "refactor");
    }

    #[test]
    fn infer_commit_type_docs() {
        assert_eq!(infer_commit_type("Document the API endpoints"), "docs");
        assert_eq!(infer_commit_type("Docs: update README"), "docs");
    }

    #[test]
    fn infer_commit_type_defaults_to_fix() {
        assert_eq!(infer_commit_type("Null pointer in config parser"), "fix");
        assert_eq!(infer_commit_type("Crash on empty input"), "fix");
    }

    #[test]
    fn pr_title_feat_github() {
        let ctx = AgentContext {
            issue_number: 10,
            issue_title: "Add dark mode".to_string(),
            issue_body: String::new(),
            branch: String::new(),
            pr_number: None,
            test_command: None,
            lint_command: None,
            review_findings: None,
            cycle: 1,
            target_repo: None,
            issue_source: "github".to_string(),
            base_branch: "main".to_string(),
        };
        assert_eq!(pr_title(&ctx), "feat(#10): Add dark mode");
    }

    // --- comment formatters (defined earlier in this file) ------------------
    #[test]
    fn format_unresolved_comment_groups_by_severity() {
        let findings = [
            agents::Finding {
                severity: Severity::Critical,
                category: "bug".to_string(),
                file_path: Some("src/main.rs".to_string()),
                line_number: Some(42),
                message: "null pointer".to_string(),
            },
            agents::Finding {
                severity: Severity::Warning,
                category: "style".to_string(),
                file_path: None,
                line_number: None,
                message: "missing docs".to_string(),
            },
        ];
        let refs: Vec<_> = findings.iter().collect();
        let comment = format_unresolved_comment(&refs);
        assert!(comment.contains("### Critical"));
        assert!(comment.contains("### Warning"));
        assert!(comment.contains("**bug** in `src/main.rs:42` -- null pointer"));
        assert!(comment.contains("**style** -- missing docs"));
        assert!(comment.contains("Automated by [oven]"));
    }

    #[test]
    fn format_unresolved_comment_skips_empty_severity_groups() {
        let findings = [agents::Finding {
            severity: Severity::Warning,
            category: "testing".to_string(),
            file_path: Some("src/lib.rs".to_string()),
            line_number: None,
            message: "missing edge case test".to_string(),
        }];
        let refs: Vec<_> = findings.iter().collect();
        let comment = format_unresolved_comment(&refs);
        assert!(!comment.contains("### Critical"));
        assert!(comment.contains("### Warning"));
    }

    #[test]
    fn format_pipeline_failure_includes_error() {
        let err = anyhow::anyhow!("cost budget exceeded: $12.50 > $10.00");
        let comment = format_pipeline_failure(&err);
        assert!(comment.contains("## Pipeline failed"));
        assert!(comment.contains("cost budget exceeded"));
        assert!(comment.contains("Automated by [oven]"));
    }

    #[test]
    fn format_rebase_failure_includes_error() {
        let err = anyhow::anyhow!("merge conflict in src/config/mod.rs");
        let comment = format_rebase_failure(&err);
        assert!(comment.contains("## Pipeline stopped: rebase conflict"));
        assert!(comment.contains("merge conflict"));
        assert!(comment.contains("Rebase manually"));
    }

    #[test]
    fn format_review_parse_failure_includes_cycle() {
        let comment = format_review_parse_failure(2);
        assert!(comment.contains("## Pipeline stopped: review output error"));
        assert!(comment.contains("cycle 2"));
        assert!(comment.contains("Re-run the pipeline"));
    }
}