1use std::{fmt::Write as _, path::PathBuf, sync::Arc};
2
3use anyhow::{Context, Result};
4use rusqlite::Connection;
5use tokio::sync::Mutex;
6use tokio_util::sync::CancellationToken;
7use tracing::{debug, info, warn};
8
9use crate::{
10 agents::{
11 self, AgentContext, AgentInvocation, AgentRole, Complexity, GraphContextNode,
12 PlannerGraphOutput, Severity, invoke_agent, parse_fixer_output, parse_planner_graph_output,
13 parse_review_output,
14 },
15 config::Config,
16 db::{self, AgentRun, ReviewFinding, Run, RunStatus},
17 git,
18 github::{self, GhClient},
19 issues::{IssueOrigin, IssueProvider, PipelineIssue},
20 process::CommandRunner,
21};
22
23#[derive(Debug)]
25pub struct PipelineOutcome {
26 pub run_id: String,
27 pub pr_number: u32,
28 pub worktree_path: PathBuf,
30 pub target_dir: PathBuf,
32}
33
34pub struct PipelineExecutor<R: CommandRunner> {
36 pub runner: Arc<R>,
37 pub github: Arc<GhClient<R>>,
38 pub issues: Arc<dyn IssueProvider>,
39 pub db: Arc<Mutex<Connection>>,
40 pub config: Config,
41 pub cancel_token: CancellationToken,
42 pub repo_dir: PathBuf,
43}
44
45impl<R: CommandRunner + 'static> PipelineExecutor<R> {
46 pub async fn run_issue(&self, issue: &PipelineIssue, auto_merge: bool) -> Result<()> {
48 self.run_issue_with_complexity(issue, auto_merge, None).await
49 }
50
51 pub async fn run_issue_with_complexity(
53 &self,
54 issue: &PipelineIssue,
55 auto_merge: bool,
56 complexity: Option<Complexity>,
57 ) -> Result<()> {
58 let outcome = self.run_issue_pipeline(issue, auto_merge, complexity).await?;
59 self.finalize_merge(&outcome, issue).await
60 }
61
62 pub async fn run_issue_pipeline(
68 &self,
69 issue: &PipelineIssue,
70 auto_merge: bool,
71 complexity: Option<Complexity>,
72 ) -> Result<PipelineOutcome> {
73 let run_id = generate_run_id();
74
75 let (target_dir, is_multi_repo) = self.resolve_target_dir(issue.target_repo.as_ref())?;
77
78 let base_branch = git::default_branch(&target_dir).await?;
79
80 let mut run = new_run(&run_id, issue, auto_merge);
81 if let Some(ref c) = complexity {
82 run.complexity = c.to_string();
83 }
84 {
85 let conn = self.db.lock().await;
86 db::runs::insert_run(&conn, &run)?;
87 }
88
89 self.issues
90 .transition(issue.number, &self.config.labels.ready, &self.config.labels.cooking)
91 .await?;
92
93 let worktree = git::create_worktree(&target_dir, issue.number, &base_branch).await?;
94 self.record_worktree(&run_id, &worktree).await?;
95
96 git::empty_commit(
98 &worktree.path,
99 &format!("chore: start oven pipeline for issue #{}", issue.number),
100 )
101 .await?;
102
103 info!(
104 run_id = %run_id,
105 issue = issue.number,
106 branch = %worktree.branch,
107 target_repo = ?issue.target_repo,
108 "starting pipeline"
109 );
110
111 let pr_number = self.create_pr(&run_id, issue, &worktree.branch, &target_dir).await?;
112
113 let ctx = AgentContext {
114 issue_number: issue.number,
115 issue_title: issue.title.clone(),
116 issue_body: issue.body.clone(),
117 branch: worktree.branch.clone(),
118 pr_number: Some(pr_number),
119 test_command: self.config.project.test.clone(),
120 lint_command: self.config.project.lint.clone(),
121 review_findings: None,
122 cycle: 1,
123 target_repo: if is_multi_repo { issue.target_repo.clone() } else { None },
124 issue_source: issue.source.as_str().to_string(),
125 base_branch: base_branch.clone(),
126 };
127
128 let result = self.run_steps(&run_id, &ctx, &worktree.path, auto_merge, &target_dir).await;
129
130 if let Err(ref e) = result {
131 self.finalize_run(&run_id, issue, pr_number, &result, &target_dir).await?;
133 if let Err(e) = git::remove_worktree(&target_dir, &worktree.path).await {
134 warn!(run_id = %run_id, error = %e, "failed to clean up worktree");
135 }
136 return Err(anyhow::anyhow!("{e:#}"));
137 }
138
139 self.update_status(&run_id, RunStatus::AwaitingMerge).await?;
141
142 Ok(PipelineOutcome { run_id, pr_number, worktree_path: worktree.path, target_dir })
143 }
144
145 pub async fn finalize_merge(
150 &self,
151 outcome: &PipelineOutcome,
152 issue: &PipelineIssue,
153 ) -> Result<()> {
154 self.finalize_run(&outcome.run_id, issue, outcome.pr_number, &Ok(()), &outcome.target_dir)
155 .await?;
156 if let Err(e) = git::remove_worktree(&outcome.target_dir, &outcome.worktree_path).await {
157 warn!(
158 run_id = %outcome.run_id,
159 error = %e,
160 "failed to clean up worktree after merge"
161 );
162 }
163 Ok(())
164 }
165
166 pub async fn plan_issues(
173 &self,
174 issues: &[PipelineIssue],
175 graph_context: &[GraphContextNode],
176 ) -> Option<PlannerGraphOutput> {
177 let prompt = match agents::planner::build_prompt(issues, graph_context) {
178 Ok(p) => p,
179 Err(e) => {
180 warn!(error = %e, "planner prompt build failed");
181 return None;
182 }
183 };
184 let invocation = AgentInvocation {
185 role: AgentRole::Planner,
186 prompt,
187 working_dir: self.repo_dir.clone(),
188 max_turns: Some(self.config.pipeline.turn_limit),
189 };
190
191 match invoke_agent(self.runner.as_ref(), &invocation).await {
192 Ok(result) => {
193 debug!(output = %result.output, "raw planner output");
194 let parsed = parse_planner_graph_output(&result.output);
195 if parsed.is_none() {
196 warn!(output = %result.output, "planner returned unparseable output, falling back to all-parallel");
197 }
198 parsed
199 }
200 Err(e) => {
201 warn!(error = %e, "planner agent failed, falling back to all-parallel");
202 None
203 }
204 }
205 }
206
207 pub(crate) fn resolve_target_dir(
212 &self,
213 target_repo: Option<&String>,
214 ) -> Result<(PathBuf, bool)> {
215 if !self.config.multi_repo.enabled {
216 return Ok((self.repo_dir.clone(), false));
217 }
218 match target_repo {
219 Some(name) => {
220 let path = self.config.resolve_repo(name)?;
221 Ok((path, true))
222 }
223 None => Ok((self.repo_dir.clone(), false)),
224 }
225 }
226
227 pub fn reconstruct_outcome(
232 &self,
233 issue: &PipelineIssue,
234 run_id: &str,
235 pr_number: u32,
236 ) -> Result<PipelineOutcome> {
237 let (target_dir, _) = self.resolve_target_dir(issue.target_repo.as_ref())?;
238 let worktree_path =
239 target_dir.join(".oven").join("worktrees").join(format!("issue-{}", issue.number));
240 Ok(PipelineOutcome { run_id: run_id.to_string(), pr_number, worktree_path, target_dir })
241 }
242
243 async fn record_worktree(&self, run_id: &str, worktree: &git::Worktree) -> Result<()> {
244 let conn = self.db.lock().await;
245 db::runs::update_run_worktree(
246 &conn,
247 run_id,
248 &worktree.branch,
249 &worktree.path.to_string_lossy(),
250 )?;
251 drop(conn);
252 Ok(())
253 }
254
255 async fn create_pr(
256 &self,
257 run_id: &str,
258 issue: &PipelineIssue,
259 branch: &str,
260 repo_dir: &std::path::Path,
261 ) -> Result<u32> {
262 let (pr_title, pr_body) = match issue.source {
263 IssueOrigin::Github => (
264 format!("fix(#{}): {}", issue.number, issue.title),
265 format!(
266 "Resolves #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
267 issue.number
268 ),
269 ),
270 IssueOrigin::Local => (
271 format!("fix: {}", issue.title),
272 format!(
273 "From local issue #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
274 issue.number
275 ),
276 ),
277 };
278
279 git::push_branch(repo_dir, branch).await?;
280 let pr_number =
281 self.github.create_draft_pr_in(&pr_title, branch, &pr_body, repo_dir).await?;
282
283 {
284 let conn = self.db.lock().await;
285 db::runs::update_run_pr(&conn, run_id, pr_number)?;
286 }
287
288 info!(run_id = %run_id, pr = pr_number, "draft PR created");
289 Ok(pr_number)
290 }
291
292 async fn finalize_run(
293 &self,
294 run_id: &str,
295 issue: &PipelineIssue,
296 pr_number: u32,
297 result: &Result<()>,
298 target_dir: &std::path::Path,
299 ) -> Result<()> {
300 let (final_status, error_msg) = match result {
301 Ok(()) => {
302 self.issues
303 .transition(
304 issue.number,
305 &self.config.labels.cooking,
306 &self.config.labels.complete,
307 )
308 .await?;
309
310 let should_close =
314 issue.source == IssueOrigin::Local || issue.target_repo.is_some();
315
316 if should_close {
317 let comment = issue.target_repo.as_ref().map_or_else(
318 || format!("Implemented in #{pr_number}"),
319 |repo_name| format!("Implemented in {repo_name}#{pr_number}"),
320 );
321 if let Err(e) = self.issues.close(issue.number, Some(&comment)).await {
322 warn!(
323 run_id = %run_id,
324 error = %e,
325 "failed to close issue"
326 );
327 }
328 }
329
330 (RunStatus::Complete, None)
331 }
332 Err(e) => {
333 warn!(run_id = %run_id, error = %e, "pipeline failed");
334 github::safe_comment(
335 &self.github,
336 pr_number,
337 &format_pipeline_failure(e),
338 target_dir,
339 )
340 .await;
341 let _ = self
342 .issues
343 .transition(
344 issue.number,
345 &self.config.labels.cooking,
346 &self.config.labels.failed,
347 )
348 .await;
349 (RunStatus::Failed, Some(format!("{e:#}")))
350 }
351 };
352
353 let conn = self.db.lock().await;
354 db::runs::finish_run(&conn, run_id, final_status, error_msg.as_deref())
355 }
356
357 async fn run_steps(
358 &self,
359 run_id: &str,
360 ctx: &AgentContext,
361 worktree_path: &std::path::Path,
362 auto_merge: bool,
363 target_dir: &std::path::Path,
364 ) -> Result<()> {
365 self.check_cancelled()?;
366
367 self.update_status(run_id, RunStatus::Implementing).await?;
369 let impl_prompt = agents::implementer::build_prompt(ctx)?;
370 let impl_result =
371 self.run_agent(run_id, AgentRole::Implementer, &impl_prompt, worktree_path, 1).await?;
372
373 git::push_branch(worktree_path, &ctx.branch).await?;
374
375 if let Some(pr_number) = ctx.pr_number {
377 let body = build_pr_body(&impl_result.output, ctx);
378 if let Err(e) =
379 self.github.edit_pr_in(pr_number, &pr_title(ctx), &body, target_dir).await
380 {
381 warn!(run_id = %run_id, error = %e, "failed to update PR description");
382 }
383 if let Err(e) = self.github.mark_pr_ready_in(pr_number, target_dir).await {
384 warn!(run_id = %run_id, error = %e, "failed to mark PR ready");
385 }
386 }
387
388 let clean = self.run_review_fix_loop(run_id, ctx, worktree_path, target_dir).await?;
390
391 if !clean {
392 anyhow::bail!("unresolved findings after max review cycles");
393 }
394
395 self.check_cancelled()?;
397 info!(run_id = %run_id, base = %ctx.base_branch, "rebasing onto base branch");
398 if let Err(e) = git::rebase_on_base(worktree_path, &ctx.base_branch).await {
399 if let Some(pr_number) = ctx.pr_number {
400 github::safe_comment(
401 &self.github,
402 pr_number,
403 &format_rebase_failure(&e),
404 target_dir,
405 )
406 .await;
407 }
408 return Err(e);
409 }
410 git::force_push_branch(worktree_path, &ctx.branch).await?;
411 if auto_merge {
413 self.check_cancelled()?;
414 ctx.pr_number.context("no PR number for merge step")?;
415 self.update_status(run_id, RunStatus::Merging).await?;
416 let merge_prompt = agents::merger::build_prompt(ctx, auto_merge)?;
417 self.run_agent(run_id, AgentRole::Merger, &merge_prompt, worktree_path, 1).await?;
418 }
419
420 Ok(())
421 }
422
423 async fn run_review_fix_loop(
424 &self,
425 run_id: &str,
426 ctx: &AgentContext,
427 worktree_path: &std::path::Path,
428 target_dir: &std::path::Path,
429 ) -> Result<bool> {
430 for cycle in 1..=3 {
431 self.check_cancelled()?;
432
433 self.update_status(run_id, RunStatus::Reviewing).await?;
434
435 let (prior_addressed, prior_disputes) = if cycle > 1 {
438 let conn = self.db.lock().await;
439 let all_resolved = db::agent_runs::get_resolved_findings(&conn, run_id)?;
440 drop(conn);
441
442 let (mut addressed, disputed): (Vec<_>, Vec<_>) =
443 all_resolved.into_iter().partition(|f| {
444 f.dispute_reason.as_deref().is_some_and(|r| r.starts_with("ADDRESSED: "))
445 });
446
447 for f in &mut addressed {
449 if let Some(ref mut reason) = f.dispute_reason {
450 if let Some(stripped) = reason.strip_prefix("ADDRESSED: ") {
451 *reason = stripped.to_string();
452 }
453 }
454 }
455
456 (addressed, disputed)
457 } else {
458 (Vec::new(), Vec::new())
459 };
460
461 let review_prompt =
462 agents::reviewer::build_prompt(ctx, &prior_addressed, &prior_disputes)?;
463 let review_result = self
464 .run_agent(run_id, AgentRole::Reviewer, &review_prompt, worktree_path, cycle)
465 .await?;
466
467 let review_output = match parse_review_output(&review_result.output) {
468 Ok(output) => output,
469 Err(e) => {
470 warn!(run_id = %run_id, cycle, error = %e, "review output unparseable, treating as failed review");
471 if let Some(pr_number) = ctx.pr_number {
472 github::safe_comment(
473 &self.github,
474 pr_number,
475 &format_review_parse_failure(cycle),
476 target_dir,
477 )
478 .await;
479 }
480 anyhow::bail!("reviewer returned unparseable output in cycle {cycle}");
481 }
482 };
483 self.store_findings(run_id, &review_output.findings).await?;
484
485 let actionable: Vec<_> =
486 review_output.findings.iter().filter(|f| f.severity != Severity::Info).collect();
487
488 if actionable.is_empty() {
489 info!(run_id = %run_id, cycle, "review clean");
490 return Ok(true);
491 }
492
493 info!(run_id = %run_id, cycle, findings = actionable.len(), "review found issues");
494
495 if cycle == 3 {
496 if let Some(pr_number) = ctx.pr_number {
497 let comment = format_unresolved_comment(&actionable);
498 github::safe_comment(&self.github, pr_number, &comment, target_dir).await;
499 } else {
500 warn!(run_id = %run_id, "no PR number, cannot post unresolved findings");
501 }
502 return Ok(false);
503 }
504
505 self.check_cancelled()?;
507 self.update_status(run_id, RunStatus::Fixing).await?;
508
509 let unresolved = {
510 let conn = self.db.lock().await;
511 db::agent_runs::get_unresolved_findings(&conn, run_id)?
512 };
513
514 let fix_prompt = agents::fixer::build_prompt(ctx, &unresolved)?;
515 let fix_result =
516 self.run_agent(run_id, AgentRole::Fixer, &fix_prompt, worktree_path, cycle).await?;
517
518 let fixer_output = parse_fixer_output(&fix_result.output);
520 self.process_fixer_disputes(run_id, &unresolved, &fixer_output).await?;
521 self.process_fixer_addressed(run_id, &unresolved, &fixer_output).await?;
522
523 git::push_branch(worktree_path, &ctx.branch).await?;
524 }
525
526 Ok(false)
527 }
528
529 async fn process_fixer_disputes(
535 &self,
536 run_id: &str,
537 findings_sent_to_fixer: &[ReviewFinding],
538 fixer_output: &agents::FixerOutput,
539 ) -> Result<()> {
540 if fixer_output.disputed.is_empty() {
541 return Ok(());
542 }
543
544 let conn = self.db.lock().await;
545 for dispute in &fixer_output.disputed {
546 let idx = dispute.finding.saturating_sub(1) as usize;
548 if let Some(finding) = findings_sent_to_fixer.get(idx) {
549 db::agent_runs::resolve_finding(&conn, finding.id, &dispute.reason)?;
550 info!(
551 run_id = %run_id,
552 finding_id = finding.id,
553 reason = %dispute.reason,
554 "finding disputed by fixer, marked resolved"
555 );
556 }
557 }
558 drop(conn);
559 Ok(())
560 }
561
562 async fn process_fixer_addressed(
568 &self,
569 run_id: &str,
570 findings_sent_to_fixer: &[ReviewFinding],
571 fixer_output: &agents::FixerOutput,
572 ) -> Result<()> {
573 if fixer_output.addressed.is_empty() {
574 return Ok(());
575 }
576
577 let conn = self.db.lock().await;
578 for action in &fixer_output.addressed {
579 let idx = action.finding.saturating_sub(1) as usize;
580 if let Some(finding) = findings_sent_to_fixer.get(idx) {
581 let reason = format!("ADDRESSED: {}", action.action);
582 db::agent_runs::resolve_finding(&conn, finding.id, &reason)?;
583 info!(
584 run_id = %run_id,
585 finding_id = finding.id,
586 action = %action.action,
587 "finding addressed by fixer, marked resolved"
588 );
589 }
590 }
591 drop(conn);
592 Ok(())
593 }
594
595 async fn store_findings(&self, run_id: &str, findings: &[agents::Finding]) -> Result<()> {
596 let conn = self.db.lock().await;
597 let agent_runs = db::agent_runs::get_agent_runs_for_run(&conn, run_id)?;
598 let reviewer_run_id = agent_runs
599 .iter()
600 .rev()
601 .find_map(|ar| if ar.agent == "reviewer" { Some(ar.id) } else { None });
602 if let Some(ar_id) = reviewer_run_id {
603 for finding in findings {
604 let db_finding = ReviewFinding {
605 id: 0,
606 agent_run_id: ar_id,
607 severity: finding.severity.to_string(),
608 category: finding.category.clone(),
609 file_path: finding.file_path.clone(),
610 line_number: finding.line_number,
611 message: finding.message.clone(),
612 resolved: false,
613 dispute_reason: None,
614 };
615 db::agent_runs::insert_finding(&conn, &db_finding)?;
616 }
617 }
618 drop(conn);
619 Ok(())
620 }
621
622 async fn run_agent(
623 &self,
624 run_id: &str,
625 role: AgentRole,
626 prompt: &str,
627 working_dir: &std::path::Path,
628 cycle: u32,
629 ) -> Result<crate::process::AgentResult> {
630 let agent_run_id = self.record_agent_start(run_id, role, cycle).await?;
631
632 info!(run_id = %run_id, agent = %role, cycle, "agent starting");
633
634 let invocation = AgentInvocation {
635 role,
636 prompt: prompt.to_string(),
637 working_dir: working_dir.to_path_buf(),
638 max_turns: Some(self.config.pipeline.turn_limit),
639 };
640
641 let result = invoke_agent(self.runner.as_ref(), &invocation).await;
642
643 match &result {
644 Ok(agent_result) => {
645 self.record_agent_success(run_id, agent_run_id, agent_result).await?;
646 }
647 Err(e) => {
648 let conn = self.db.lock().await;
649 db::agent_runs::finish_agent_run(
650 &conn,
651 agent_run_id,
652 "failed",
653 0.0,
654 0,
655 None,
656 Some(&format!("{e:#}")),
657 None,
658 )?;
659 }
660 }
661
662 result
663 }
664
665 async fn record_agent_start(&self, run_id: &str, role: AgentRole, cycle: u32) -> Result<i64> {
666 let agent_run = AgentRun {
667 id: 0,
668 run_id: run_id.to_string(),
669 agent: role.to_string(),
670 cycle,
671 status: "running".to_string(),
672 cost_usd: 0.0,
673 turns: 0,
674 started_at: chrono::Utc::now().to_rfc3339(),
675 finished_at: None,
676 output_summary: None,
677 error_message: None,
678 raw_output: None,
679 };
680 let conn = self.db.lock().await;
681 db::agent_runs::insert_agent_run(&conn, &agent_run)
682 }
683
684 async fn record_agent_success(
685 &self,
686 run_id: &str,
687 agent_run_id: i64,
688 agent_result: &crate::process::AgentResult,
689 ) -> Result<()> {
690 let conn = self.db.lock().await;
691 db::agent_runs::finish_agent_run(
692 &conn,
693 agent_run_id,
694 "complete",
695 agent_result.cost_usd,
696 agent_result.turns,
697 Some(&truncate(&agent_result.output, 500)),
698 None,
699 Some(&agent_result.output),
700 )?;
701
702 let new_cost = db::runs::increment_run_cost(&conn, run_id, agent_result.cost_usd)?;
703 drop(conn);
704
705 if new_cost > self.config.pipeline.cost_budget {
706 anyhow::bail!(
707 "cost budget exceeded: ${:.2} > ${:.2}",
708 new_cost,
709 self.config.pipeline.cost_budget
710 );
711 }
712 Ok(())
713 }
714
715 async fn update_status(&self, run_id: &str, status: RunStatus) -> Result<()> {
716 let conn = self.db.lock().await;
717 db::runs::update_run_status(&conn, run_id, status)
718 }
719
720 fn check_cancelled(&self) -> Result<()> {
721 if self.cancel_token.is_cancelled() {
722 anyhow::bail!("pipeline cancelled");
723 }
724 Ok(())
725 }
726}
727
728const COMMENT_FOOTER: &str =
729 "\n---\nAutomated by [oven](https://github.com/clayharmon/oven-cli) \u{1F35E}";
730
731fn format_unresolved_comment(actionable: &[&agents::Finding]) -> String {
732 let mut comment = String::from(
733 "## Pipeline stopped: unresolved review findings\n\n\
734 The review-fix loop ran 2 cycles but these findings remain unresolved.\n",
735 );
736
737 for severity in &[Severity::Critical, Severity::Warning] {
739 let group: Vec<_> = actionable.iter().filter(|f| &f.severity == severity).collect();
740 if group.is_empty() {
741 continue;
742 }
743 let heading = match severity {
744 Severity::Critical => "Critical",
745 Severity::Warning => "Warning",
746 Severity::Info => unreachable!("loop only iterates Critical and Warning"),
747 };
748 let _ = writeln!(comment, "\n### {heading}\n");
749 for f in group {
750 let loc = match (&f.file_path, f.line_number) {
751 (Some(path), Some(line)) => format!(" in `{path}:{line}`"),
752 (Some(path), None) => format!(" in `{path}`"),
753 _ => String::new(),
754 };
755 let _ = writeln!(comment, "- **{}**{} -- {}", f.category, loc, f.message);
756 }
757 }
758
759 comment.push_str(COMMENT_FOOTER);
760 comment
761}
762
763fn format_pipeline_failure(e: &anyhow::Error) -> String {
764 format!(
765 "## Pipeline failed\n\n\
766 **Error:** {e:#}\n\n\
767 The pipeline hit an unrecoverable error. Check the run logs for detail, \
768 or re-run the pipeline.\
769 {COMMENT_FOOTER}"
770 )
771}
772
773fn format_rebase_failure(e: &anyhow::Error) -> String {
774 format!(
775 "## Pipeline stopped: rebase conflict\n\n\
776 Could not rebase onto the base branch. This usually happens when another \
777 PR merged while this pipeline was running.\n\n\
778 **Error:** {e}\n\n\
779 Rebase manually and re-run the pipeline.\
780 {COMMENT_FOOTER}"
781 )
782}
783
784fn format_review_parse_failure(cycle: u32) -> String {
785 format!(
786 "## Pipeline stopped: review output error\n\n\
787 The reviewer agent returned output that could not be parsed as structured \
788 findings (cycle {cycle}). This usually means the reviewer produced malformed JSON.\n\n\
789 Re-run the pipeline to try again.\
790 {COMMENT_FOOTER}"
791 )
792}
793
794fn pr_title(ctx: &AgentContext) -> String {
799 let prefix = infer_commit_type(&ctx.issue_title);
800 if ctx.issue_source == "github" {
801 format!("{prefix}(#{}): {}", ctx.issue_number, ctx.issue_title)
802 } else {
803 format!("{prefix}: {}", ctx.issue_title)
804 }
805}
806
807fn infer_commit_type(title: &str) -> &'static str {
809 let lower = title.to_lowercase();
810 if lower.starts_with("feat") || lower.contains("add ") || lower.contains("implement ") {
811 "feat"
812 } else if lower.starts_with("refactor") {
813 "refactor"
814 } else if lower.starts_with("docs") || lower.starts_with("document") {
815 "docs"
816 } else if lower.starts_with("test") || lower.starts_with("add test") {
817 "test"
818 } else if lower.starts_with("chore") {
819 "chore"
820 } else {
821 "fix"
822 }
823}
824
825fn build_pr_body(impl_output: &str, ctx: &AgentContext) -> String {
827 let issue_ref = if ctx.issue_source == "github" {
828 format!("Resolves #{}", ctx.issue_number)
829 } else {
830 format!("From local issue #{}", ctx.issue_number)
831 };
832
833 let summary = extract_impl_summary(impl_output);
834
835 let mut body = String::new();
836 let _ = writeln!(body, "{issue_ref}\n");
837 let _ = write!(body, "{summary}");
838 body.push_str(COMMENT_FOOTER);
839 body
840}
841
842fn extract_impl_summary(output: &str) -> String {
848 let idx = output.find("## PR Template").or_else(|| output.find("## Changes Made"));
850
851 if let Some(idx) = idx {
852 let summary = output[idx..].trim();
853 let summary = summary.strip_prefix("## PR Template").map_or(summary, |s| s.trim_start());
855 if summary.len() <= 4000 {
856 return summary.to_string();
857 }
858 return truncate(summary, 4000);
859 }
860 String::from("*No implementation summary available. See commit history for details.*")
863}
864
865fn new_run(run_id: &str, issue: &PipelineIssue, auto_merge: bool) -> Run {
866 Run {
867 id: run_id.to_string(),
868 issue_number: issue.number,
869 status: RunStatus::Pending,
870 pr_number: None,
871 branch: None,
872 worktree_path: None,
873 cost_usd: 0.0,
874 auto_merge,
875 started_at: chrono::Utc::now().to_rfc3339(),
876 finished_at: None,
877 error_message: None,
878 complexity: "full".to_string(),
879 issue_source: issue.source.to_string(),
880 }
881}
882
883pub fn generate_run_id() -> String {
885 uuid::Uuid::new_v4().to_string()[..8].to_string()
886}
887
888pub(crate) fn truncate(s: &str, max_len: usize) -> String {
893 if s.len() <= max_len {
894 return s.to_string();
895 }
896 let target = max_len.saturating_sub(3);
897 let mut end = target;
898 while end > 0 && !s.is_char_boundary(end) {
899 end -= 1;
900 }
901 format!("{}...", &s[..end])
902}
903
904#[cfg(test)]
905mod tests {
906 use proptest::prelude::*;
907
908 use super::*;
909
910 proptest! {
911 #[test]
912 fn run_ids_always_8_hex_chars(_seed in any::<u64>()) {
913 let id = generate_run_id();
914 prop_assert_eq!(id.len(), 8);
915 prop_assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
916 }
917 }
918
919 #[test]
920 fn run_id_is_8_hex_chars() {
921 let id = generate_run_id();
922 assert_eq!(id.len(), 8);
923 assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
924 }
925
926 #[test]
927 fn run_ids_are_unique() {
928 let ids: Vec<_> = (0..100).map(|_| generate_run_id()).collect();
929 let unique: std::collections::HashSet<_> = ids.iter().collect();
930 assert_eq!(ids.len(), unique.len());
931 }
932
933 #[test]
934 fn truncate_short_string() {
935 assert_eq!(truncate("hello", 10), "hello");
936 }
937
938 #[test]
939 fn truncate_long_string() {
940 let long = "a".repeat(100);
941 let result = truncate(&long, 10);
942 assert_eq!(result.len(), 10); assert!(result.ends_with("..."));
944 }
945
946 #[test]
947 fn truncate_multibyte_does_not_panic() {
948 let s = "πππ";
951 let result = truncate(s, 8);
952 assert!(result.ends_with("..."));
953 assert!(result.starts_with("π"));
954 assert!(result.len() <= 8);
955 }
956
957 #[test]
958 fn truncate_cjk_boundary() {
959 let s = "δ½ ε₯½δΈηζ΅θ―"; let result = truncate(s, 10);
963 assert!(result.ends_with("..."));
964 assert!(result.starts_with("δ½ ε₯½"));
965 assert!(result.len() <= 10);
966 }
967
968 #[test]
969 fn extract_impl_summary_finds_changes_made() {
970 let output = "Some preamble text\n\n## Changes Made\n- src/foo.rs: added bar\n\n## Tests Added\n- tests/foo.rs: bar test\n";
971 let summary = extract_impl_summary(output);
972 assert!(summary.starts_with("## Changes Made"));
973 assert!(summary.contains("added bar"));
974 assert!(summary.contains("## Tests Added"));
975 }
976
977 #[test]
978 fn extract_impl_summary_prefers_pr_template() {
979 let output = "Preamble\n\n## PR Template\n## Summary\n- Added auth flow\n\n## Testing\n- Unit tests pass\n";
980 let summary = extract_impl_summary(output);
981 assert!(!summary.contains("## PR Template"));
983 assert!(summary.starts_with("## Summary"));
984 assert!(summary.contains("Added auth flow"));
985 }
986
987 #[test]
988 fn extract_impl_summary_fallback_on_no_heading() {
989 let output = "just some raw agent output with no structure";
990 let summary = extract_impl_summary(output);
991 assert_eq!(
992 summary,
993 "*No implementation summary available. See commit history for details.*"
994 );
995 }
996
997 #[test]
998 fn extract_impl_summary_empty_output() {
999 let placeholder = "*No implementation summary available. See commit history for details.*";
1000 assert_eq!(extract_impl_summary(""), placeholder);
1001 assert_eq!(extract_impl_summary(" "), placeholder);
1002 }
1003
1004 #[test]
1005 fn build_pr_body_github_issue() {
1006 let ctx = AgentContext {
1007 issue_number: 42,
1008 issue_title: "fix the thing".to_string(),
1009 issue_body: String::new(),
1010 branch: "oven/issue-42".to_string(),
1011 pr_number: Some(10),
1012 test_command: None,
1013 lint_command: None,
1014 review_findings: None,
1015 cycle: 1,
1016 target_repo: None,
1017 issue_source: "github".to_string(),
1018 base_branch: "main".to_string(),
1019 };
1020 let body = build_pr_body("## Changes Made\n- added stuff", &ctx);
1021 assert!(body.contains("Resolves #42"));
1022 assert!(body.contains("## Changes Made"));
1023 assert!(body.contains("Automated by [oven]"));
1024 }
1025
1026 #[test]
1027 fn build_pr_body_local_issue() {
1028 let ctx = AgentContext {
1029 issue_number: 7,
1030 issue_title: "local thing".to_string(),
1031 issue_body: String::new(),
1032 branch: "oven/issue-7".to_string(),
1033 pr_number: Some(10),
1034 test_command: None,
1035 lint_command: None,
1036 review_findings: None,
1037 cycle: 1,
1038 target_repo: None,
1039 issue_source: "local".to_string(),
1040 base_branch: "main".to_string(),
1041 };
1042 let body = build_pr_body("## Changes Made\n- did local stuff", &ctx);
1043 assert!(body.contains("From local issue #7"));
1044 assert!(body.contains("## Changes Made"));
1045 }
1046
1047 #[test]
1048 fn pr_title_github() {
1049 let ctx = AgentContext {
1050 issue_number: 42,
1051 issue_title: "fix the thing".to_string(),
1052 issue_body: String::new(),
1053 branch: String::new(),
1054 pr_number: None,
1055 test_command: None,
1056 lint_command: None,
1057 review_findings: None,
1058 cycle: 1,
1059 target_repo: None,
1060 issue_source: "github".to_string(),
1061 base_branch: "main".to_string(),
1062 };
1063 assert_eq!(pr_title(&ctx), "fix(#42): fix the thing");
1064 }
1065
1066 #[test]
1067 fn pr_title_local() {
1068 let ctx = AgentContext {
1069 issue_number: 7,
1070 issue_title: "local thing".to_string(),
1071 issue_body: String::new(),
1072 branch: String::new(),
1073 pr_number: None,
1074 test_command: None,
1075 lint_command: None,
1076 review_findings: None,
1077 cycle: 1,
1078 target_repo: None,
1079 issue_source: "local".to_string(),
1080 base_branch: "main".to_string(),
1081 };
1082 assert_eq!(pr_title(&ctx), "fix: local thing");
1083 }
1084
1085 #[test]
1086 fn infer_commit_type_feat() {
1087 assert_eq!(infer_commit_type("Add dark mode support"), "feat");
1088 assert_eq!(infer_commit_type("Implement caching layer"), "feat");
1089 assert_eq!(infer_commit_type("Feature: new dashboard"), "feat");
1090 }
1091
1092 #[test]
1093 fn infer_commit_type_refactor() {
1094 assert_eq!(infer_commit_type("Refactor auth middleware"), "refactor");
1095 }
1096
1097 #[test]
1098 fn infer_commit_type_docs() {
1099 assert_eq!(infer_commit_type("Document the API endpoints"), "docs");
1100 assert_eq!(infer_commit_type("Docs: update README"), "docs");
1101 }
1102
1103 #[test]
1104 fn infer_commit_type_defaults_to_fix() {
1105 assert_eq!(infer_commit_type("Null pointer in config parser"), "fix");
1106 assert_eq!(infer_commit_type("Crash on empty input"), "fix");
1107 }
1108
1109 #[test]
1110 fn pr_title_feat_github() {
1111 let ctx = AgentContext {
1112 issue_number: 10,
1113 issue_title: "Add dark mode".to_string(),
1114 issue_body: String::new(),
1115 branch: String::new(),
1116 pr_number: None,
1117 test_command: None,
1118 lint_command: None,
1119 review_findings: None,
1120 cycle: 1,
1121 target_repo: None,
1122 issue_source: "github".to_string(),
1123 base_branch: "main".to_string(),
1124 };
1125 assert_eq!(pr_title(&ctx), "feat(#10): Add dark mode");
1126 }
1127
1128 #[test]
1129 fn format_unresolved_comment_groups_by_severity() {
1130 let findings = [
1131 agents::Finding {
1132 severity: Severity::Critical,
1133 category: "bug".to_string(),
1134 file_path: Some("src/main.rs".to_string()),
1135 line_number: Some(42),
1136 message: "null pointer".to_string(),
1137 },
1138 agents::Finding {
1139 severity: Severity::Warning,
1140 category: "style".to_string(),
1141 file_path: None,
1142 line_number: None,
1143 message: "missing docs".to_string(),
1144 },
1145 ];
1146 let refs: Vec<_> = findings.iter().collect();
1147 let comment = format_unresolved_comment(&refs);
1148 assert!(comment.contains("### Critical"));
1149 assert!(comment.contains("### Warning"));
1150 assert!(comment.contains("**bug** in `src/main.rs:42` -- null pointer"));
1151 assert!(comment.contains("**style** -- missing docs"));
1152 assert!(comment.contains("Automated by [oven]"));
1153 }
1154
1155 #[test]
1156 fn format_unresolved_comment_skips_empty_severity_groups() {
1157 let findings = [agents::Finding {
1158 severity: Severity::Warning,
1159 category: "testing".to_string(),
1160 file_path: Some("src/lib.rs".to_string()),
1161 line_number: None,
1162 message: "missing edge case test".to_string(),
1163 }];
1164 let refs: Vec<_> = findings.iter().collect();
1165 let comment = format_unresolved_comment(&refs);
1166 assert!(!comment.contains("### Critical"));
1167 assert!(comment.contains("### Warning"));
1168 }
1169
1170 #[test]
1171 fn format_pipeline_failure_includes_error() {
1172 let err = anyhow::anyhow!("cost budget exceeded: $12.50 > $10.00");
1173 let comment = format_pipeline_failure(&err);
1174 assert!(comment.contains("## Pipeline failed"));
1175 assert!(comment.contains("cost budget exceeded"));
1176 assert!(comment.contains("Automated by [oven]"));
1177 }
1178
1179 #[test]
1180 fn format_rebase_failure_includes_error() {
1181 let err = anyhow::anyhow!("merge conflict in src/config/mod.rs");
1182 let comment = format_rebase_failure(&err);
1183 assert!(comment.contains("## Pipeline stopped: rebase conflict"));
1184 assert!(comment.contains("merge conflict"));
1185 assert!(comment.contains("Rebase manually"));
1186 }
1187
1188 #[test]
1189 fn format_review_parse_failure_includes_cycle() {
1190 let comment = format_review_parse_failure(2);
1191 assert!(comment.contains("## Pipeline stopped: review output error"));
1192 assert!(comment.contains("cycle 2"));
1193 assert!(comment.contains("Re-run the pipeline"));
1194 }
1195}