Skip to main content

oven_cli/pipeline/
executor.rs

1use std::{fmt::Write as _, path::PathBuf, sync::Arc};
2
3use anyhow::{Context, Result};
4use rusqlite::Connection;
5use tokio::sync::Mutex;
6use tokio_util::sync::CancellationToken;
7use tracing::{debug, info, warn};
8
9use crate::{
10    agents::{
11        self, AgentContext, AgentInvocation, AgentRole, Complexity, PlannerOutput, Severity,
12        invoke_agent, parse_planner_output, parse_review_output,
13    },
14    config::Config,
15    db::{self, AgentRun, ReviewFinding, Run, RunStatus},
16    git,
17    github::{self, GhClient},
18    issues::{IssueOrigin, IssueProvider, PipelineIssue},
19    process::CommandRunner,
20};
21
22/// Runs a single issue through the full pipeline.
23pub struct PipelineExecutor<R: CommandRunner> {
24    pub runner: Arc<R>,
25    pub github: Arc<GhClient<R>>,
26    pub issues: Arc<dyn IssueProvider>,
27    pub db: Arc<Mutex<Connection>>,
28    pub config: Config,
29    pub cancel_token: CancellationToken,
30    pub repo_dir: PathBuf,
31}
32
33impl<R: CommandRunner + 'static> PipelineExecutor<R> {
34    /// Run the full pipeline for a single issue.
35    pub async fn run_issue(&self, issue: &PipelineIssue, auto_merge: bool) -> Result<()> {
36        self.run_issue_with_complexity(issue, auto_merge, None).await
37    }
38
39    /// Run the full pipeline for a single issue with an optional complexity classification.
40    pub async fn run_issue_with_complexity(
41        &self,
42        issue: &PipelineIssue,
43        auto_merge: bool,
44        complexity: Option<Complexity>,
45    ) -> Result<()> {
46        let run_id = generate_run_id();
47
48        // Determine target repo for worktrees and PRs (multi-repo routing)
49        let (target_dir, is_multi_repo) = self.resolve_target_dir(issue.target_repo.as_ref())?;
50
51        let base_branch = git::default_branch(&target_dir).await?;
52
53        let mut run = new_run(&run_id, issue, auto_merge);
54        if let Some(ref c) = complexity {
55            run.complexity = c.to_string();
56        }
57        {
58            let conn = self.db.lock().await;
59            db::runs::insert_run(&conn, &run)?;
60        }
61
62        self.issues
63            .transition(issue.number, &self.config.labels.ready, &self.config.labels.cooking)
64            .await?;
65
66        let worktree = git::create_worktree(&target_dir, issue.number, &base_branch).await?;
67        self.record_worktree(&run_id, &worktree).await?;
68
69        // Seed branch with an empty commit so GitHub accepts the draft PR
70        git::empty_commit(
71            &worktree.path,
72            &format!("chore: start oven pipeline for issue #{}", issue.number),
73        )
74        .await?;
75
76        info!(
77            run_id = %run_id,
78            issue = issue.number,
79            branch = %worktree.branch,
80            target_repo = ?issue.target_repo,
81            "starting pipeline"
82        );
83
84        let pr_number = self.create_pr(&run_id, issue, &worktree.branch, &target_dir).await?;
85
86        let ctx = AgentContext {
87            issue_number: issue.number,
88            issue_title: issue.title.clone(),
89            issue_body: issue.body.clone(),
90            branch: worktree.branch.clone(),
91            pr_number: Some(pr_number),
92            test_command: self.config.project.test.clone(),
93            lint_command: self.config.project.lint.clone(),
94            review_findings: None,
95            cycle: 1,
96            target_repo: if is_multi_repo { issue.target_repo.clone() } else { None },
97            issue_source: issue.source.as_str().to_string(),
98        };
99
100        let result = self.run_steps(&run_id, &ctx, &worktree.path, auto_merge).await;
101        self.finalize_run(&run_id, issue, pr_number, &result).await?;
102
103        if let Err(e) = git::remove_worktree(&target_dir, &worktree.path).await {
104            warn!(run_id = %run_id, error = %e, "failed to clean up worktree");
105        }
106
107        result
108    }
109
110    /// Invoke the planner agent to decide batching and complexity for a set of issues.
111    ///
112    /// Returns `None` if the planner fails or returns unparseable output (fallback to default).
113    pub async fn plan_issues(&self, issues: &[PipelineIssue]) -> Option<PlannerOutput> {
114        let prompt = match agents::planner::build_prompt(issues) {
115            Ok(p) => p,
116            Err(e) => {
117                warn!(error = %e, "planner prompt build failed");
118                return None;
119            }
120        };
121        let invocation = AgentInvocation {
122            role: AgentRole::Planner,
123            prompt,
124            working_dir: self.repo_dir.clone(),
125            max_turns: Some(self.config.pipeline.turn_limit),
126        };
127
128        match invoke_agent(self.runner.as_ref(), &invocation).await {
129            Ok(result) => {
130                debug!(output = %result.output, "raw planner output");
131                let parsed = parse_planner_output(&result.output);
132                if parsed.is_none() {
133                    warn!(output = %result.output, "planner returned unparseable output, falling back to single batch");
134                }
135                parsed
136            }
137            Err(e) => {
138                warn!(error = %e, "planner agent failed, falling back to single batch");
139                None
140            }
141        }
142    }
143
144    /// Determine the effective repo directory for worktrees and PRs.
145    ///
146    /// Returns `(target_dir, is_multi_repo)`. When multi-repo is disabled or no target
147    /// is specified, falls back to `self.repo_dir`.
148    fn resolve_target_dir(&self, target_repo: Option<&String>) -> Result<(PathBuf, bool)> {
149        if !self.config.multi_repo.enabled {
150            return Ok((self.repo_dir.clone(), false));
151        }
152        match target_repo {
153            Some(name) => {
154                let path = self.config.resolve_repo(name)?;
155                Ok((path, true))
156            }
157            None => Ok((self.repo_dir.clone(), false)),
158        }
159    }
160
161    async fn record_worktree(&self, run_id: &str, worktree: &git::Worktree) -> Result<()> {
162        let conn = self.db.lock().await;
163        db::runs::update_run_worktree(
164            &conn,
165            run_id,
166            &worktree.branch,
167            &worktree.path.to_string_lossy(),
168        )?;
169        drop(conn);
170        Ok(())
171    }
172
173    async fn create_pr(
174        &self,
175        run_id: &str,
176        issue: &PipelineIssue,
177        branch: &str,
178        repo_dir: &std::path::Path,
179    ) -> Result<u32> {
180        let (pr_title, pr_body) = match issue.source {
181            IssueOrigin::Github => (
182                format!("fix(#{}): {}", issue.number, issue.title),
183                format!(
184                    "Resolves #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
185                    issue.number
186                ),
187            ),
188            IssueOrigin::Local => (
189                format!("fix: {}", issue.title),
190                format!(
191                    "From local issue #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
192                    issue.number
193                ),
194            ),
195        };
196
197        git::push_branch(repo_dir, branch).await?;
198        let pr_number =
199            self.github.create_draft_pr_in(&pr_title, branch, &pr_body, repo_dir).await?;
200
201        {
202            let conn = self.db.lock().await;
203            db::runs::update_run_pr(&conn, run_id, pr_number)?;
204        }
205
206        info!(run_id = %run_id, pr = pr_number, "draft PR created");
207        Ok(pr_number)
208    }
209
210    async fn finalize_run(
211        &self,
212        run_id: &str,
213        issue: &PipelineIssue,
214        pr_number: u32,
215        result: &Result<()>,
216    ) -> Result<()> {
217        let (final_status, error_msg) = match result {
218            Ok(()) => {
219                self.issues
220                    .transition(
221                        issue.number,
222                        &self.config.labels.cooking,
223                        &self.config.labels.complete,
224                    )
225                    .await?;
226
227                // Close the issue when the merger can't do it:
228                // - Local issues: merger can't use `gh issue close`
229                // - Multi-repo: merger runs in target repo, can't close god-repo issue
230                let should_close =
231                    issue.source == IssueOrigin::Local || issue.target_repo.is_some();
232
233                if should_close {
234                    let comment = issue.target_repo.as_ref().map_or_else(
235                        || format!("Implemented in #{pr_number}"),
236                        |repo_name| format!("Implemented in {repo_name}#{pr_number}"),
237                    );
238                    if let Err(e) = self.issues.close(issue.number, Some(&comment)).await {
239                        warn!(
240                            run_id = %run_id,
241                            error = %e,
242                            "failed to close issue"
243                        );
244                    }
245                }
246
247                (RunStatus::Complete, None)
248            }
249            Err(e) => {
250                warn!(run_id = %run_id, error = %e, "pipeline failed");
251                github::safe_comment(&self.github, pr_number, &format!("Pipeline failed: {e:#}"))
252                    .await;
253                let _ = self
254                    .issues
255                    .transition(
256                        issue.number,
257                        &self.config.labels.cooking,
258                        &self.config.labels.failed,
259                    )
260                    .await;
261                (RunStatus::Failed, Some(format!("{e:#}")))
262            }
263        };
264
265        let conn = self.db.lock().await;
266        db::runs::finish_run(&conn, run_id, final_status, error_msg.as_deref())
267    }
268
269    async fn run_steps(
270        &self,
271        run_id: &str,
272        ctx: &AgentContext,
273        worktree_path: &std::path::Path,
274        auto_merge: bool,
275    ) -> Result<()> {
276        self.check_cancelled()?;
277
278        // 1. Implement
279        self.update_status(run_id, RunStatus::Implementing).await?;
280        let impl_prompt = agents::implementer::build_prompt(ctx)?;
281        self.run_agent(run_id, AgentRole::Implementer, &impl_prompt, worktree_path, 1).await?;
282
283        git::push_branch(worktree_path, &ctx.branch).await?;
284
285        // 2. Review-fix loop
286        let clean = self.run_review_fix_loop(run_id, ctx, worktree_path).await?;
287
288        if !clean {
289            anyhow::bail!("unresolved findings after max review cycles");
290        }
291
292        // 3. Merge
293        self.check_cancelled()?;
294        ctx.pr_number.context("no PR number for merge step")?;
295        self.update_status(run_id, RunStatus::Merging).await?;
296        let merge_prompt = agents::merger::build_prompt(ctx, auto_merge)?;
297        self.run_agent(run_id, AgentRole::Merger, &merge_prompt, worktree_path, 1).await?;
298
299        Ok(())
300    }
301
302    async fn run_review_fix_loop(
303        &self,
304        run_id: &str,
305        ctx: &AgentContext,
306        worktree_path: &std::path::Path,
307    ) -> Result<bool> {
308        for cycle in 1..=2 {
309            self.check_cancelled()?;
310
311            self.update_status(run_id, RunStatus::Reviewing).await?;
312            let review_prompt = agents::reviewer::build_prompt(ctx)?;
313            let review_result = self
314                .run_agent(run_id, AgentRole::Reviewer, &review_prompt, worktree_path, cycle)
315                .await?;
316
317            let review_output = match parse_review_output(&review_result.output) {
318                Ok(output) => output,
319                Err(e) => {
320                    warn!(run_id = %run_id, cycle, error = %e, "review output unparseable, treating as failed review");
321                    if let Some(pr_number) = ctx.pr_number {
322                        github::safe_comment(
323                            &self.github,
324                            pr_number,
325                            &format!("Review cycle {cycle} returned unparseable output. Stopping pipeline."),
326                        )
327                        .await;
328                    }
329                    anyhow::bail!("reviewer returned unparseable output in cycle {cycle}");
330                }
331            };
332            self.store_findings(run_id, &review_output.findings).await?;
333
334            let actionable: Vec<_> =
335                review_output.findings.iter().filter(|f| f.severity != Severity::Info).collect();
336
337            if actionable.is_empty() {
338                info!(run_id = %run_id, cycle, "review clean");
339                return Ok(true);
340            }
341
342            info!(run_id = %run_id, cycle, findings = actionable.len(), "review found issues");
343
344            if cycle == 2 {
345                if let Some(pr_number) = ctx.pr_number {
346                    let comment = format_unresolved_comment(&actionable);
347                    github::safe_comment(&self.github, pr_number, &comment).await;
348                } else {
349                    warn!(run_id = %run_id, "no PR number, cannot post unresolved findings");
350                }
351                return Ok(false);
352            }
353
354            // Fix
355            self.check_cancelled()?;
356            self.update_status(run_id, RunStatus::Fixing).await?;
357
358            let unresolved = {
359                let conn = self.db.lock().await;
360                db::agent_runs::get_unresolved_findings(&conn, run_id)?
361            };
362
363            let fix_prompt = agents::fixer::build_prompt(ctx, &unresolved)?;
364            self.run_agent(run_id, AgentRole::Fixer, &fix_prompt, worktree_path, cycle).await?;
365
366            git::push_branch(worktree_path, &ctx.branch).await?;
367        }
368
369        Ok(false)
370    }
371
372    async fn store_findings(&self, run_id: &str, findings: &[agents::Finding]) -> Result<()> {
373        let conn = self.db.lock().await;
374        let agent_runs = db::agent_runs::get_agent_runs_for_run(&conn, run_id)?;
375        let reviewer_run_id = agent_runs
376            .iter()
377            .rev()
378            .find_map(|ar| if ar.agent == "reviewer" { Some(ar.id) } else { None });
379        if let Some(ar_id) = reviewer_run_id {
380            for finding in findings {
381                let db_finding = ReviewFinding {
382                    id: 0,
383                    agent_run_id: ar_id,
384                    severity: finding.severity.to_string(),
385                    category: finding.category.clone(),
386                    file_path: finding.file_path.clone(),
387                    line_number: finding.line_number,
388                    message: finding.message.clone(),
389                    resolved: false,
390                };
391                db::agent_runs::insert_finding(&conn, &db_finding)?;
392            }
393        }
394        drop(conn);
395        Ok(())
396    }
397
398    async fn run_agent(
399        &self,
400        run_id: &str,
401        role: AgentRole,
402        prompt: &str,
403        working_dir: &std::path::Path,
404        cycle: u32,
405    ) -> Result<crate::process::AgentResult> {
406        let agent_run_id = self.record_agent_start(run_id, role, cycle).await?;
407
408        info!(run_id = %run_id, agent = %role, cycle, "agent starting");
409
410        let invocation = AgentInvocation {
411            role,
412            prompt: prompt.to_string(),
413            working_dir: working_dir.to_path_buf(),
414            max_turns: Some(self.config.pipeline.turn_limit),
415        };
416
417        let result = invoke_agent(self.runner.as_ref(), &invocation).await;
418
419        match &result {
420            Ok(agent_result) => {
421                self.record_agent_success(run_id, agent_run_id, agent_result).await?;
422            }
423            Err(e) => {
424                let conn = self.db.lock().await;
425                db::agent_runs::finish_agent_run(
426                    &conn,
427                    agent_run_id,
428                    "failed",
429                    0.0,
430                    0,
431                    None,
432                    Some(&format!("{e:#}")),
433                    None,
434                )?;
435            }
436        }
437
438        result
439    }
440
441    async fn record_agent_start(&self, run_id: &str, role: AgentRole, cycle: u32) -> Result<i64> {
442        let agent_run = AgentRun {
443            id: 0,
444            run_id: run_id.to_string(),
445            agent: role.to_string(),
446            cycle,
447            status: "running".to_string(),
448            cost_usd: 0.0,
449            turns: 0,
450            started_at: chrono::Utc::now().to_rfc3339(),
451            finished_at: None,
452            output_summary: None,
453            error_message: None,
454            raw_output: None,
455        };
456        let conn = self.db.lock().await;
457        db::agent_runs::insert_agent_run(&conn, &agent_run)
458    }
459
460    async fn record_agent_success(
461        &self,
462        run_id: &str,
463        agent_run_id: i64,
464        agent_result: &crate::process::AgentResult,
465    ) -> Result<()> {
466        let conn = self.db.lock().await;
467        db::agent_runs::finish_agent_run(
468            &conn,
469            agent_run_id,
470            "complete",
471            agent_result.cost_usd,
472            agent_result.turns,
473            Some(&truncate(&agent_result.output, 500)),
474            None,
475            Some(&agent_result.output),
476        )?;
477
478        let new_cost = db::runs::increment_run_cost(&conn, run_id, agent_result.cost_usd)?;
479        drop(conn);
480
481        if new_cost > self.config.pipeline.cost_budget {
482            anyhow::bail!(
483                "cost budget exceeded: ${:.2} > ${:.2}",
484                new_cost,
485                self.config.pipeline.cost_budget
486            );
487        }
488        Ok(())
489    }
490
491    async fn update_status(&self, run_id: &str, status: RunStatus) -> Result<()> {
492        let conn = self.db.lock().await;
493        db::runs::update_run_status(&conn, run_id, status)
494    }
495
496    fn check_cancelled(&self) -> Result<()> {
497        if self.cancel_token.is_cancelled() {
498            anyhow::bail!("pipeline cancelled");
499        }
500        Ok(())
501    }
502}
503
504fn format_unresolved_comment(actionable: &[&agents::Finding]) -> String {
505    let mut comment = String::from("## Unresolved findings after 2 review cycles\n\n");
506    for f in actionable {
507        let loc = match (&f.file_path, f.line_number) {
508            (Some(path), Some(line)) => format!(" at `{path}:{line}`"),
509            (Some(path), None) => format!(" in `{path}`"),
510            _ => String::new(),
511        };
512        let _ = writeln!(comment, "- **[{}]** {}{}: {}", f.severity, f.category, loc, f.message);
513    }
514    comment
515}
516
517fn new_run(run_id: &str, issue: &PipelineIssue, auto_merge: bool) -> Run {
518    Run {
519        id: run_id.to_string(),
520        issue_number: issue.number,
521        status: RunStatus::Pending,
522        pr_number: None,
523        branch: None,
524        worktree_path: None,
525        cost_usd: 0.0,
526        auto_merge,
527        started_at: chrono::Utc::now().to_rfc3339(),
528        finished_at: None,
529        error_message: None,
530        complexity: "full".to_string(),
531        issue_source: issue.source.to_string(),
532    }
533}
534
535/// Generate an 8-character hex run ID.
536pub fn generate_run_id() -> String {
537    uuid::Uuid::new_v4().to_string()[..8].to_string()
538}
539
/// Truncate a string to at most `max_len` bytes, appending "..." if truncated.
///
/// Reserves 3 bytes for the "..." suffix so the total output never exceeds `max_len`;
/// when `max_len < 3` the ellipsis itself is shortened to keep that guarantee
/// (previously the full 3-byte "..." was emitted, exceeding `max_len`).
/// Always cuts at a valid UTF-8 character boundary to avoid panics on multi-byte input.
pub(crate) fn truncate(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }
    // Walk backwards from the byte target to the nearest char boundary.
    let target = max_len.saturating_sub(3);
    let mut end = target;
    while end > 0 && !s.is_char_boundary(end) {
        end -= 1;
    }
    let mut out = String::with_capacity(max_len);
    out.push_str(&s[..end]);
    // Append as much of the "..." suffix as fits within max_len (normally all 3).
    let dots = max_len.saturating_sub(end).min(3);
    for _ in 0..dots {
        out.push('.');
    }
    out
}
555
#[cfg(test)]
mod tests {
    use proptest::prelude::*;

    use super::*;

    proptest! {
        // Property: every generated run ID is exactly eight hex characters.
        #[test]
        fn run_ids_always_8_hex_chars(_seed in any::<u64>()) {
            let id = generate_run_id();
            prop_assert_eq!(id.len(), 8);
            prop_assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
        }
    }

    #[test]
    fn run_id_is_8_hex_chars() {
        let id = generate_run_id();
        assert_eq!(id.len(), 8);
        assert!(id.bytes().all(|b| b.is_ascii_hexdigit()));
    }

    #[test]
    fn run_ids_are_unique() {
        // Inserting into a set fails on the first duplicate.
        let mut seen = std::collections::HashSet::new();
        for _ in 0..100 {
            assert!(seen.insert(generate_run_id()));
        }
    }

    #[test]
    fn truncate_short_string() {
        assert_eq!(truncate("hello", 10), "hello");
    }

    #[test]
    fn truncate_long_string() {
        let input = "a".repeat(100);
        let out = truncate(&input, 10);
        assert_eq!(out.len(), 10); // 7 chars + "..."
        assert!(out.ends_with("..."));
    }

    #[test]
    fn truncate_multibyte_does_not_panic() {
        // Each emoji is 4 bytes. "😀😀😀" = 12 bytes.
        // max_len=8, target=5, walks back to boundary at 4 (one emoji).
        let input = "😀😀😀";
        let out = truncate(input, 8);
        assert!(out.ends_with("..."));
        assert!(out.starts_with("😀"));
        assert!(out.len() <= 8);
    }

    #[test]
    fn truncate_cjk_boundary() {
        // CJK chars are 3 bytes each
        let input = "你好世界测试"; // 18 bytes
        // max_len=10, target=7, walks back to boundary at 6 (two 3-byte chars).
        let out = truncate(input, 10);
        assert!(out.ends_with("..."));
        assert!(out.starts_with("你好"));
        assert!(out.len() <= 10);
    }

    #[test]
    fn format_unresolved_comment_includes_findings() {
        let findings = [
            agents::Finding {
                severity: Severity::Critical,
                category: "bug".to_string(),
                file_path: Some("src/main.rs".to_string()),
                line_number: Some(42),
                message: "null pointer".to_string(),
            },
            agents::Finding {
                severity: Severity::Warning,
                category: "style".to_string(),
                file_path: None,
                line_number: None,
                message: "missing docs".to_string(),
            },
        ];
        let refs: Vec<_> = findings.iter().collect();
        let comment = format_unresolved_comment(&refs);
        assert!(comment.contains("Unresolved findings"));
        assert!(comment.contains("null pointer"));
        assert!(comment.contains("`src/main.rs:42`"));
        assert!(comment.contains("missing docs"));
    }
}
645}