// oven_cli/pipeline/executor.rs

1use std::{fmt::Write as _, path::PathBuf, sync::Arc};
2
3use anyhow::{Context, Result};
4use rusqlite::Connection;
5use tokio::sync::Mutex;
6use tokio_util::sync::CancellationToken;
7use tracing::{debug, info, warn};
8
9use crate::{
10    agents::{
11        self, AgentContext, AgentInvocation, AgentRole, Complexity, InFlightIssue, PlannerOutput,
12        Severity, invoke_agent, parse_planner_output, parse_review_output,
13    },
14    config::Config,
15    db::{self, AgentRun, ReviewFinding, Run, RunStatus},
16    git,
17    github::{self, GhClient},
18    issues::{IssueOrigin, IssueProvider, PipelineIssue},
19    process::CommandRunner,
20};
21
22/// Runs a single issue through the full pipeline.
23pub struct PipelineExecutor<R: CommandRunner> {
24    pub runner: Arc<R>,
25    pub github: Arc<GhClient<R>>,
26    pub issues: Arc<dyn IssueProvider>,
27    pub db: Arc<Mutex<Connection>>,
28    pub config: Config,
29    pub cancel_token: CancellationToken,
30    pub repo_dir: PathBuf,
31}
32
33impl<R: CommandRunner + 'static> PipelineExecutor<R> {
34    /// Run the full pipeline for a single issue.
35    pub async fn run_issue(&self, issue: &PipelineIssue, auto_merge: bool) -> Result<()> {
36        self.run_issue_with_complexity(issue, auto_merge, None).await
37    }
38
39    /// Run the full pipeline for a single issue with an optional complexity classification.
40    pub async fn run_issue_with_complexity(
41        &self,
42        issue: &PipelineIssue,
43        auto_merge: bool,
44        complexity: Option<Complexity>,
45    ) -> Result<()> {
46        let run_id = generate_run_id();
47
48        // Determine target repo for worktrees and PRs (multi-repo routing)
49        let (target_dir, is_multi_repo) = self.resolve_target_dir(issue.target_repo.as_ref())?;
50
51        let base_branch = git::default_branch(&target_dir).await?;
52
53        let mut run = new_run(&run_id, issue, auto_merge);
54        if let Some(ref c) = complexity {
55            run.complexity = c.to_string();
56        }
57        {
58            let conn = self.db.lock().await;
59            db::runs::insert_run(&conn, &run)?;
60        }
61
62        self.issues
63            .transition(issue.number, &self.config.labels.ready, &self.config.labels.cooking)
64            .await?;
65
66        let worktree = git::create_worktree(&target_dir, issue.number, &base_branch).await?;
67        self.record_worktree(&run_id, &worktree).await?;
68
69        // Seed branch with an empty commit so GitHub accepts the draft PR
70        git::empty_commit(
71            &worktree.path,
72            &format!("chore: start oven pipeline for issue #{}", issue.number),
73        )
74        .await?;
75
76        info!(
77            run_id = %run_id,
78            issue = issue.number,
79            branch = %worktree.branch,
80            target_repo = ?issue.target_repo,
81            "starting pipeline"
82        );
83
84        let pr_number = self.create_pr(&run_id, issue, &worktree.branch, &target_dir).await?;
85
86        let ctx = AgentContext {
87            issue_number: issue.number,
88            issue_title: issue.title.clone(),
89            issue_body: issue.body.clone(),
90            branch: worktree.branch.clone(),
91            pr_number: Some(pr_number),
92            test_command: self.config.project.test.clone(),
93            lint_command: self.config.project.lint.clone(),
94            review_findings: None,
95            cycle: 1,
96            target_repo: if is_multi_repo { issue.target_repo.clone() } else { None },
97            issue_source: issue.source.as_str().to_string(),
98            base_branch: base_branch.clone(),
99        };
100
101        let result = self.run_steps(&run_id, &ctx, &worktree.path, auto_merge, &base_branch).await;
102        self.finalize_run(&run_id, issue, pr_number, &result).await?;
103
104        if let Err(e) = git::remove_worktree(&target_dir, &worktree.path).await {
105            warn!(run_id = %run_id, error = %e, "failed to clean up worktree");
106        }
107
108        result
109    }
110
111    /// Invoke the planner agent to decide batching and complexity for a set of issues.
112    ///
113    /// `in_flight` describes issues currently running through the pipeline so the planner
114    /// can avoid scheduling conflicting work in batch 1.
115    ///
116    /// Returns `None` if the planner fails or returns unparseable output (fallback to default).
117    pub async fn plan_issues(
118        &self,
119        issues: &[PipelineIssue],
120        in_flight: &[InFlightIssue],
121    ) -> Option<PlannerOutput> {
122        let prompt = match agents::planner::build_prompt(issues, in_flight) {
123            Ok(p) => p,
124            Err(e) => {
125                warn!(error = %e, "planner prompt build failed");
126                return None;
127            }
128        };
129        let invocation = AgentInvocation {
130            role: AgentRole::Planner,
131            prompt,
132            working_dir: self.repo_dir.clone(),
133            max_turns: Some(self.config.pipeline.turn_limit),
134        };
135
136        match invoke_agent(self.runner.as_ref(), &invocation).await {
137            Ok(result) => {
138                debug!(output = %result.output, "raw planner output");
139                let parsed = parse_planner_output(&result.output);
140                if parsed.is_none() {
141                    warn!(output = %result.output, "planner returned unparseable output, falling back to single batch");
142                }
143                parsed
144            }
145            Err(e) => {
146                warn!(error = %e, "planner agent failed, falling back to single batch");
147                None
148            }
149        }
150    }
151
152    /// Determine the effective repo directory for worktrees and PRs.
153    ///
154    /// Returns `(target_dir, is_multi_repo)`. When multi-repo is disabled or no target
155    /// is specified, falls back to `self.repo_dir`.
156    fn resolve_target_dir(&self, target_repo: Option<&String>) -> Result<(PathBuf, bool)> {
157        if !self.config.multi_repo.enabled {
158            return Ok((self.repo_dir.clone(), false));
159        }
160        match target_repo {
161            Some(name) => {
162                let path = self.config.resolve_repo(name)?;
163                Ok((path, true))
164            }
165            None => Ok((self.repo_dir.clone(), false)),
166        }
167    }
168
169    async fn record_worktree(&self, run_id: &str, worktree: &git::Worktree) -> Result<()> {
170        let conn = self.db.lock().await;
171        db::runs::update_run_worktree(
172            &conn,
173            run_id,
174            &worktree.branch,
175            &worktree.path.to_string_lossy(),
176        )?;
177        drop(conn);
178        Ok(())
179    }
180
181    async fn create_pr(
182        &self,
183        run_id: &str,
184        issue: &PipelineIssue,
185        branch: &str,
186        repo_dir: &std::path::Path,
187    ) -> Result<u32> {
188        let (pr_title, pr_body) = match issue.source {
189            IssueOrigin::Github => (
190                format!("fix(#{}): {}", issue.number, issue.title),
191                format!(
192                    "Resolves #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
193                    issue.number
194                ),
195            ),
196            IssueOrigin::Local => (
197                format!("fix: {}", issue.title),
198                format!(
199                    "From local issue #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
200                    issue.number
201                ),
202            ),
203        };
204
205        git::push_branch(repo_dir, branch).await?;
206        let pr_number =
207            self.github.create_draft_pr_in(&pr_title, branch, &pr_body, repo_dir).await?;
208
209        {
210            let conn = self.db.lock().await;
211            db::runs::update_run_pr(&conn, run_id, pr_number)?;
212        }
213
214        info!(run_id = %run_id, pr = pr_number, "draft PR created");
215        Ok(pr_number)
216    }
217
218    async fn finalize_run(
219        &self,
220        run_id: &str,
221        issue: &PipelineIssue,
222        pr_number: u32,
223        result: &Result<()>,
224    ) -> Result<()> {
225        let (final_status, error_msg) = match result {
226            Ok(()) => {
227                self.issues
228                    .transition(
229                        issue.number,
230                        &self.config.labels.cooking,
231                        &self.config.labels.complete,
232                    )
233                    .await?;
234
235                // Close the issue when the merger can't do it:
236                // - Local issues: merger can't use `gh issue close`
237                // - Multi-repo: merger runs in target repo, can't close god-repo issue
238                let should_close =
239                    issue.source == IssueOrigin::Local || issue.target_repo.is_some();
240
241                if should_close {
242                    let comment = issue.target_repo.as_ref().map_or_else(
243                        || format!("Implemented in #{pr_number}"),
244                        |repo_name| format!("Implemented in {repo_name}#{pr_number}"),
245                    );
246                    if let Err(e) = self.issues.close(issue.number, Some(&comment)).await {
247                        warn!(
248                            run_id = %run_id,
249                            error = %e,
250                            "failed to close issue"
251                        );
252                    }
253                }
254
255                (RunStatus::Complete, None)
256            }
257            Err(e) => {
258                warn!(run_id = %run_id, error = %e, "pipeline failed");
259                github::safe_comment(&self.github, pr_number, &format!("Pipeline failed: {e:#}"))
260                    .await;
261                let _ = self
262                    .issues
263                    .transition(
264                        issue.number,
265                        &self.config.labels.cooking,
266                        &self.config.labels.failed,
267                    )
268                    .await;
269                (RunStatus::Failed, Some(format!("{e:#}")))
270            }
271        };
272
273        let conn = self.db.lock().await;
274        db::runs::finish_run(&conn, run_id, final_status, error_msg.as_deref())
275    }
276
277    async fn run_steps(
278        &self,
279        run_id: &str,
280        ctx: &AgentContext,
281        worktree_path: &std::path::Path,
282        auto_merge: bool,
283        base_branch: &str,
284    ) -> Result<()> {
285        self.check_cancelled()?;
286
287        // 1. Implement
288        self.update_status(run_id, RunStatus::Implementing).await?;
289        let impl_prompt = agents::implementer::build_prompt(ctx)?;
290        self.run_agent(run_id, AgentRole::Implementer, &impl_prompt, worktree_path, 1).await?;
291
292        git::push_branch(worktree_path, &ctx.branch).await?;
293
294        // 2. Review-fix loop
295        let clean = self.run_review_fix_loop(run_id, ctx, worktree_path).await?;
296
297        if !clean {
298            anyhow::bail!("unresolved findings after max review cycles");
299        }
300
301        // 3. Rebase onto base branch to resolve any conflicts from parallel merges
302        self.check_cancelled()?;
303        info!(run_id = %run_id, base = base_branch, "rebasing onto base branch");
304        if let Err(e) = git::rebase_on_base(worktree_path, base_branch).await {
305            if let Some(pr_number) = ctx.pr_number {
306                github::safe_comment(
307                    &self.github,
308                    pr_number,
309                    &format!(
310                        "Pipeline stopped: {e}\n\nPlease rebase manually and re-run the pipeline."
311                    ),
312                )
313                .await;
314            }
315            return Err(e);
316        }
317        git::force_push_branch(worktree_path, &ctx.branch).await?;
318
319        // 4. Merge (only when auto-merge is enabled)
320        if auto_merge {
321            self.check_cancelled()?;
322            ctx.pr_number.context("no PR number for merge step")?;
323            self.update_status(run_id, RunStatus::Merging).await?;
324            let merge_prompt = agents::merger::build_prompt(ctx, auto_merge)?;
325            self.run_agent(run_id, AgentRole::Merger, &merge_prompt, worktree_path, 1).await?;
326        }
327
328        Ok(())
329    }
330
331    async fn run_review_fix_loop(
332        &self,
333        run_id: &str,
334        ctx: &AgentContext,
335        worktree_path: &std::path::Path,
336    ) -> Result<bool> {
337        for cycle in 1..=3 {
338            self.check_cancelled()?;
339
340            self.update_status(run_id, RunStatus::Reviewing).await?;
341            let review_prompt = agents::reviewer::build_prompt(ctx)?;
342            let review_result = self
343                .run_agent(run_id, AgentRole::Reviewer, &review_prompt, worktree_path, cycle)
344                .await?;
345
346            let review_output = match parse_review_output(&review_result.output) {
347                Ok(output) => output,
348                Err(e) => {
349                    warn!(run_id = %run_id, cycle, error = %e, "review output unparseable, treating as failed review");
350                    if let Some(pr_number) = ctx.pr_number {
351                        github::safe_comment(
352                            &self.github,
353                            pr_number,
354                            &format!("Review cycle {cycle} returned unparseable output. Stopping pipeline."),
355                        )
356                        .await;
357                    }
358                    anyhow::bail!("reviewer returned unparseable output in cycle {cycle}");
359                }
360            };
361            self.store_findings(run_id, &review_output.findings).await?;
362
363            let actionable: Vec<_> =
364                review_output.findings.iter().filter(|f| f.severity != Severity::Info).collect();
365
366            if actionable.is_empty() {
367                info!(run_id = %run_id, cycle, "review clean");
368                return Ok(true);
369            }
370
371            info!(run_id = %run_id, cycle, findings = actionable.len(), "review found issues");
372
373            if cycle == 3 {
374                if let Some(pr_number) = ctx.pr_number {
375                    let comment = format_unresolved_comment(&actionable);
376                    github::safe_comment(&self.github, pr_number, &comment).await;
377                } else {
378                    warn!(run_id = %run_id, "no PR number, cannot post unresolved findings");
379                }
380                return Ok(false);
381            }
382
383            // Fix
384            self.check_cancelled()?;
385            self.update_status(run_id, RunStatus::Fixing).await?;
386
387            let unresolved = {
388                let conn = self.db.lock().await;
389                db::agent_runs::get_unresolved_findings(&conn, run_id)?
390            };
391
392            let fix_prompt = agents::fixer::build_prompt(ctx, &unresolved)?;
393            self.run_agent(run_id, AgentRole::Fixer, &fix_prompt, worktree_path, cycle).await?;
394
395            git::push_branch(worktree_path, &ctx.branch).await?;
396        }
397
398        Ok(false)
399    }
400
401    async fn store_findings(&self, run_id: &str, findings: &[agents::Finding]) -> Result<()> {
402        let conn = self.db.lock().await;
403        let agent_runs = db::agent_runs::get_agent_runs_for_run(&conn, run_id)?;
404        let reviewer_run_id = agent_runs
405            .iter()
406            .rev()
407            .find_map(|ar| if ar.agent == "reviewer" { Some(ar.id) } else { None });
408        if let Some(ar_id) = reviewer_run_id {
409            for finding in findings {
410                let db_finding = ReviewFinding {
411                    id: 0,
412                    agent_run_id: ar_id,
413                    severity: finding.severity.to_string(),
414                    category: finding.category.clone(),
415                    file_path: finding.file_path.clone(),
416                    line_number: finding.line_number,
417                    message: finding.message.clone(),
418                    resolved: false,
419                };
420                db::agent_runs::insert_finding(&conn, &db_finding)?;
421            }
422        }
423        drop(conn);
424        Ok(())
425    }
426
427    async fn run_agent(
428        &self,
429        run_id: &str,
430        role: AgentRole,
431        prompt: &str,
432        working_dir: &std::path::Path,
433        cycle: u32,
434    ) -> Result<crate::process::AgentResult> {
435        let agent_run_id = self.record_agent_start(run_id, role, cycle).await?;
436
437        info!(run_id = %run_id, agent = %role, cycle, "agent starting");
438
439        let invocation = AgentInvocation {
440            role,
441            prompt: prompt.to_string(),
442            working_dir: working_dir.to_path_buf(),
443            max_turns: Some(self.config.pipeline.turn_limit),
444        };
445
446        let result = invoke_agent(self.runner.as_ref(), &invocation).await;
447
448        match &result {
449            Ok(agent_result) => {
450                self.record_agent_success(run_id, agent_run_id, agent_result).await?;
451            }
452            Err(e) => {
453                let conn = self.db.lock().await;
454                db::agent_runs::finish_agent_run(
455                    &conn,
456                    agent_run_id,
457                    "failed",
458                    0.0,
459                    0,
460                    None,
461                    Some(&format!("{e:#}")),
462                    None,
463                )?;
464            }
465        }
466
467        result
468    }
469
470    async fn record_agent_start(&self, run_id: &str, role: AgentRole, cycle: u32) -> Result<i64> {
471        let agent_run = AgentRun {
472            id: 0,
473            run_id: run_id.to_string(),
474            agent: role.to_string(),
475            cycle,
476            status: "running".to_string(),
477            cost_usd: 0.0,
478            turns: 0,
479            started_at: chrono::Utc::now().to_rfc3339(),
480            finished_at: None,
481            output_summary: None,
482            error_message: None,
483            raw_output: None,
484        };
485        let conn = self.db.lock().await;
486        db::agent_runs::insert_agent_run(&conn, &agent_run)
487    }
488
489    async fn record_agent_success(
490        &self,
491        run_id: &str,
492        agent_run_id: i64,
493        agent_result: &crate::process::AgentResult,
494    ) -> Result<()> {
495        let conn = self.db.lock().await;
496        db::agent_runs::finish_agent_run(
497            &conn,
498            agent_run_id,
499            "complete",
500            agent_result.cost_usd,
501            agent_result.turns,
502            Some(&truncate(&agent_result.output, 500)),
503            None,
504            Some(&agent_result.output),
505        )?;
506
507        let new_cost = db::runs::increment_run_cost(&conn, run_id, agent_result.cost_usd)?;
508        drop(conn);
509
510        if new_cost > self.config.pipeline.cost_budget {
511            anyhow::bail!(
512                "cost budget exceeded: ${:.2} > ${:.2}",
513                new_cost,
514                self.config.pipeline.cost_budget
515            );
516        }
517        Ok(())
518    }
519
520    async fn update_status(&self, run_id: &str, status: RunStatus) -> Result<()> {
521        let conn = self.db.lock().await;
522        db::runs::update_run_status(&conn, run_id, status)
523    }
524
525    fn check_cancelled(&self) -> Result<()> {
526        if self.cancel_token.is_cancelled() {
527            anyhow::bail!("pipeline cancelled");
528        }
529        Ok(())
530    }
531}
532
533fn format_unresolved_comment(actionable: &[&agents::Finding]) -> String {
534    let mut comment = String::from("## Unresolved findings after max review cycles\n\n");
535    for f in actionable {
536        let loc = match (&f.file_path, f.line_number) {
537            (Some(path), Some(line)) => format!(" at `{path}:{line}`"),
538            (Some(path), None) => format!(" in `{path}`"),
539            _ => String::new(),
540        };
541        let _ = writeln!(comment, "- **[{}]** {}{}: {}", f.severity, f.category, loc, f.message);
542    }
543    comment
544}
545
546fn new_run(run_id: &str, issue: &PipelineIssue, auto_merge: bool) -> Run {
547    Run {
548        id: run_id.to_string(),
549        issue_number: issue.number,
550        status: RunStatus::Pending,
551        pr_number: None,
552        branch: None,
553        worktree_path: None,
554        cost_usd: 0.0,
555        auto_merge,
556        started_at: chrono::Utc::now().to_rfc3339(),
557        finished_at: None,
558        error_message: None,
559        complexity: "full".to_string(),
560        issue_source: issue.source.to_string(),
561    }
562}
563
564/// Generate an 8-character hex run ID.
565pub fn generate_run_id() -> String {
566    uuid::Uuid::new_v4().to_string()[..8].to_string()
567}
568
/// Truncate a string to at most `max_len` bytes, appending "..." if truncated.
///
/// Strings that already fit are returned unchanged. Otherwise 3 bytes are reserved for the
/// "..." suffix so the total output never exceeds `max_len`. When `max_len` is smaller than
/// the suffix itself there is no room for it, so the longest prefix that fits is returned
/// without a suffix.
///
/// Always cuts at a valid UTF-8 character boundary to avoid panics on multi-byte input.
pub(crate) fn truncate(s: &str, max_len: usize) -> String {
    const ELLIPSIS: &str = "...";

    if s.len() <= max_len {
        return s.to_string();
    }

    // Decide how many bytes of `s` may be kept and whether the suffix fits at all.
    let (budget, suffix) = if max_len >= ELLIPSIS.len() {
        (max_len - ELLIPSIS.len(), ELLIPSIS)
    } else {
        (max_len, "")
    };

    // Walk back to the nearest char boundary; index 0 is always a boundary.
    let mut end = budget;
    while end > 0 && !s.is_char_boundary(end) {
        end -= 1;
    }

    format!("{}{suffix}", &s[..end])
}
584
/// Unit tests for run ID generation, `truncate`, and unresolved-findings formatting.
#[cfg(test)]
mod tests {
    use proptest::prelude::*;

    use super::*;

    proptest! {
        // The seed is unused; it only makes proptest call the generator many times.
        #[test]
        fn run_ids_always_8_hex_chars(_seed in any::<u64>()) {
            let id = generate_run_id();
            prop_assert_eq!(id.len(), 8);
            prop_assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
        }
    }

    #[test]
    fn run_id_is_8_hex_chars() {
        let id = generate_run_id();
        assert_eq!(id.len(), 8);
        assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
    }

    #[test]
    fn run_ids_are_unique() {
        let ids: Vec<_> = (0..100).map(|_| generate_run_id()).collect();
        let unique: std::collections::HashSet<_> = ids.iter().collect();
        assert_eq!(ids.len(), unique.len());
    }

    #[test]
    fn truncate_short_string() {
        assert_eq!(truncate("hello", 10), "hello");
    }

    #[test]
    fn truncate_long_string() {
        let long = "a".repeat(100);
        let result = truncate(&long, 10);
        assert_eq!(result.len(), 10); // 7 chars + "..."
        assert!(result.ends_with("..."));
    }

    #[test]
    fn truncate_multibyte_does_not_panic() {
        // Each emoji is 4 bytes. "😀😀😀" = 12 bytes.
        // max_len=8, target=5, walks back to boundary at 4 (one emoji).
        let s = "😀😀😀";
        let result = truncate(s, 8);
        assert!(result.ends_with("..."));
        assert!(result.starts_with("😀"));
        assert!(result.len() <= 8);
    }

    #[test]
    fn truncate_cjk_boundary() {
        // CJK chars are 3 bytes each
        let s = "你好世界测试"; // 18 bytes
        // max_len=10, target=7, walks back to boundary at 6 (two 3-byte chars).
        let result = truncate(s, 10);
        assert!(result.ends_with("..."));
        assert!(result.starts_with("你好"));
        assert!(result.len() <= 10);
    }

    #[test]
    fn format_unresolved_comment_includes_findings() {
        let findings = [
            agents::Finding {
                severity: Severity::Critical,
                category: "bug".to_string(),
                file_path: Some("src/main.rs".to_string()),
                line_number: Some(42),
                message: "null pointer".to_string(),
            },
            agents::Finding {
                severity: Severity::Warning,
                category: "style".to_string(),
                file_path: None,
                line_number: None,
                message: "missing docs".to_string(),
            },
        ];
        let refs: Vec<_> = findings.iter().collect();
        let comment = format_unresolved_comment(&refs);
        assert!(comment.contains("Unresolved findings"));
        assert!(comment.contains("null pointer"));
        assert!(comment.contains("`src/main.rs:42`"));
        assert!(comment.contains("missing docs"));
    }
}