1use std::{fmt::Write as _, path::PathBuf, sync::Arc};
2
3use anyhow::{Context, Result};
4use rusqlite::Connection;
5use tokio::sync::Mutex;
6use tokio_util::sync::CancellationToken;
7use tracing::{debug, info, warn};
8
9use crate::{
10 agents::{
11 self, AgentContext, AgentInvocation, AgentRole, Complexity, GraphContextNode,
12 PlannerGraphOutput, Severity, invoke_agent, parse_fixer_output, parse_planner_graph_output,
13 parse_review_output,
14 },
15 config::Config,
16 db::{self, AgentRun, ReviewFinding, Run, RunStatus},
17 git,
18 github::{self, GhClient},
19 issues::{IssueOrigin, IssueProvider, PipelineIssue},
20 process::CommandRunner,
21};
22
23#[derive(Debug)]
25pub struct PipelineOutcome {
26 pub run_id: String,
27 pub pr_number: u32,
28 pub worktree_path: PathBuf,
30 pub target_dir: PathBuf,
32}
33
34pub struct PipelineExecutor<R: CommandRunner> {
36 pub runner: Arc<R>,
37 pub github: Arc<GhClient<R>>,
38 pub issues: Arc<dyn IssueProvider>,
39 pub db: Arc<Mutex<Connection>>,
40 pub config: Config,
41 pub cancel_token: CancellationToken,
42 pub repo_dir: PathBuf,
43}
44
45impl<R: CommandRunner + 'static> PipelineExecutor<R> {
46 pub async fn run_issue(&self, issue: &PipelineIssue, auto_merge: bool) -> Result<()> {
48 self.run_issue_with_complexity(issue, auto_merge, None).await
49 }
50
51 pub async fn run_issue_with_complexity(
53 &self,
54 issue: &PipelineIssue,
55 auto_merge: bool,
56 complexity: Option<Complexity>,
57 ) -> Result<()> {
58 let outcome = self.run_issue_pipeline(issue, auto_merge, complexity).await?;
59 self.finalize_merge(&outcome, issue).await
60 }
61
62 pub async fn run_issue_pipeline(
68 &self,
69 issue: &PipelineIssue,
70 auto_merge: bool,
71 complexity: Option<Complexity>,
72 ) -> Result<PipelineOutcome> {
73 let run_id = generate_run_id();
74
75 let (target_dir, is_multi_repo) = self.resolve_target_dir(issue.target_repo.as_ref())?;
77
78 let base_branch = git::default_branch(&target_dir).await?;
79
80 let mut run = new_run(&run_id, issue, auto_merge);
81 if let Some(ref c) = complexity {
82 run.complexity = c.to_string();
83 }
84 {
85 let conn = self.db.lock().await;
86 db::runs::insert_run(&conn, &run)?;
87 }
88
89 self.issues
90 .transition(issue.number, &self.config.labels.ready, &self.config.labels.cooking)
91 .await?;
92
93 let worktree = git::create_worktree(&target_dir, issue.number, &base_branch).await?;
94 self.record_worktree(&run_id, &worktree).await?;
95
96 git::empty_commit(
98 &worktree.path,
99 &format!("chore: start oven pipeline for issue #{}", issue.number),
100 )
101 .await?;
102
103 info!(
104 run_id = %run_id,
105 issue = issue.number,
106 branch = %worktree.branch,
107 target_repo = ?issue.target_repo,
108 "starting pipeline"
109 );
110
111 let pr_number = self.create_pr(&run_id, issue, &worktree.branch, &target_dir).await?;
112
113 let ctx = AgentContext {
114 issue_number: issue.number,
115 issue_title: issue.title.clone(),
116 issue_body: issue.body.clone(),
117 branch: worktree.branch.clone(),
118 pr_number: Some(pr_number),
119 test_command: self.config.project.test.clone(),
120 lint_command: self.config.project.lint.clone(),
121 review_findings: None,
122 cycle: 1,
123 target_repo: if is_multi_repo { issue.target_repo.clone() } else { None },
124 issue_source: issue.source.as_str().to_string(),
125 base_branch: base_branch.clone(),
126 };
127
128 let result = self.run_steps(&run_id, &ctx, &worktree.path, auto_merge, &target_dir).await;
129
130 if let Err(ref e) = result {
131 self.finalize_run(&run_id, issue, pr_number, &result, &target_dir).await?;
133 if let Err(e) = git::remove_worktree(&target_dir, &worktree.path).await {
134 warn!(run_id = %run_id, error = %e, "failed to clean up worktree");
135 }
136 return Err(anyhow::anyhow!("{e:#}"));
137 }
138
139 self.update_status(&run_id, RunStatus::AwaitingMerge).await?;
141
142 Ok(PipelineOutcome { run_id, pr_number, worktree_path: worktree.path, target_dir })
143 }
144
145 pub async fn finalize_merge(
150 &self,
151 outcome: &PipelineOutcome,
152 issue: &PipelineIssue,
153 ) -> Result<()> {
154 self.finalize_run(&outcome.run_id, issue, outcome.pr_number, &Ok(()), &outcome.target_dir)
155 .await?;
156 if let Err(e) = git::remove_worktree(&outcome.target_dir, &outcome.worktree_path).await {
157 warn!(
158 run_id = %outcome.run_id,
159 error = %e,
160 "failed to clean up worktree after merge"
161 );
162 }
163 Ok(())
164 }
165
166 pub async fn plan_issues(
173 &self,
174 issues: &[PipelineIssue],
175 graph_context: &[GraphContextNode],
176 ) -> Option<PlannerGraphOutput> {
177 let prompt = match agents::planner::build_prompt(issues, graph_context) {
178 Ok(p) => p,
179 Err(e) => {
180 warn!(error = %e, "planner prompt build failed");
181 return None;
182 }
183 };
184 let invocation = AgentInvocation {
185 role: AgentRole::Planner,
186 prompt,
187 working_dir: self.repo_dir.clone(),
188 max_turns: Some(self.config.pipeline.turn_limit),
189 };
190
191 match invoke_agent(self.runner.as_ref(), &invocation).await {
192 Ok(result) => {
193 debug!(output = %result.output, "raw planner output");
194 let parsed = parse_planner_graph_output(&result.output);
195 if parsed.is_none() {
196 warn!(output = %result.output, "planner returned unparseable output, falling back to all-parallel");
197 }
198 parsed
199 }
200 Err(e) => {
201 warn!(error = %e, "planner agent failed, falling back to all-parallel");
202 None
203 }
204 }
205 }
206
207 pub(crate) fn resolve_target_dir(
212 &self,
213 target_repo: Option<&String>,
214 ) -> Result<(PathBuf, bool)> {
215 if !self.config.multi_repo.enabled {
216 return Ok((self.repo_dir.clone(), false));
217 }
218 match target_repo {
219 Some(name) => {
220 let path = self.config.resolve_repo(name)?;
221 Ok((path, true))
222 }
223 None => Ok((self.repo_dir.clone(), false)),
224 }
225 }
226
227 pub fn reconstruct_outcome(
232 &self,
233 issue: &PipelineIssue,
234 run_id: &str,
235 pr_number: u32,
236 ) -> Result<PipelineOutcome> {
237 let (target_dir, _) = self.resolve_target_dir(issue.target_repo.as_ref())?;
238 let worktree_path =
239 target_dir.join(".oven").join("worktrees").join(format!("issue-{}", issue.number));
240 Ok(PipelineOutcome { run_id: run_id.to_string(), pr_number, worktree_path, target_dir })
241 }
242
243 async fn record_worktree(&self, run_id: &str, worktree: &git::Worktree) -> Result<()> {
244 let conn = self.db.lock().await;
245 db::runs::update_run_worktree(
246 &conn,
247 run_id,
248 &worktree.branch,
249 &worktree.path.to_string_lossy(),
250 )?;
251 drop(conn);
252 Ok(())
253 }
254
255 async fn create_pr(
256 &self,
257 run_id: &str,
258 issue: &PipelineIssue,
259 branch: &str,
260 repo_dir: &std::path::Path,
261 ) -> Result<u32> {
262 let (pr_title, pr_body) = match issue.source {
263 IssueOrigin::Github => (
264 format!("fix(#{}): {}", issue.number, issue.title),
265 format!(
266 "Resolves #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
267 issue.number
268 ),
269 ),
270 IssueOrigin::Local => (
271 format!("fix: {}", issue.title),
272 format!(
273 "From local issue #{}\n\nAutomated by [oven](https://github.com/clayharmon/oven-cli).",
274 issue.number
275 ),
276 ),
277 };
278
279 git::push_branch(repo_dir, branch).await?;
280 let pr_number =
281 self.github.create_draft_pr_in(&pr_title, branch, &pr_body, repo_dir).await?;
282
283 {
284 let conn = self.db.lock().await;
285 db::runs::update_run_pr(&conn, run_id, pr_number)?;
286 }
287
288 info!(run_id = %run_id, pr = pr_number, "draft PR created");
289 Ok(pr_number)
290 }
291
292 async fn finalize_run(
293 &self,
294 run_id: &str,
295 issue: &PipelineIssue,
296 pr_number: u32,
297 result: &Result<()>,
298 target_dir: &std::path::Path,
299 ) -> Result<()> {
300 let (final_status, error_msg) = match result {
301 Ok(()) => {
302 self.issues
303 .transition(
304 issue.number,
305 &self.config.labels.cooking,
306 &self.config.labels.complete,
307 )
308 .await?;
309
310 let should_close =
314 issue.source == IssueOrigin::Local || issue.target_repo.is_some();
315
316 if should_close {
317 let comment = issue.target_repo.as_ref().map_or_else(
318 || format!("Implemented in #{pr_number}"),
319 |repo_name| format!("Implemented in {repo_name}#{pr_number}"),
320 );
321 if let Err(e) = self.issues.close(issue.number, Some(&comment)).await {
322 warn!(
323 run_id = %run_id,
324 error = %e,
325 "failed to close issue"
326 );
327 }
328 }
329
330 (RunStatus::Complete, None)
331 }
332 Err(e) => {
333 warn!(run_id = %run_id, error = %e, "pipeline failed");
334 github::safe_comment(
335 &self.github,
336 pr_number,
337 &format_pipeline_failure(e),
338 target_dir,
339 )
340 .await;
341 let _ = self
342 .issues
343 .transition(
344 issue.number,
345 &self.config.labels.cooking,
346 &self.config.labels.failed,
347 )
348 .await;
349 (RunStatus::Failed, Some(format!("{e:#}")))
350 }
351 };
352
353 let conn = self.db.lock().await;
354 db::runs::finish_run(&conn, run_id, final_status, error_msg.as_deref())
355 }
356
357 async fn run_steps(
358 &self,
359 run_id: &str,
360 ctx: &AgentContext,
361 worktree_path: &std::path::Path,
362 auto_merge: bool,
363 target_dir: &std::path::Path,
364 ) -> Result<()> {
365 self.check_cancelled()?;
366
367 self.update_status(run_id, RunStatus::Implementing).await?;
369 let impl_prompt = agents::implementer::build_prompt(ctx)?;
370 let impl_result =
371 self.run_agent(run_id, AgentRole::Implementer, &impl_prompt, worktree_path, 1).await?;
372
373 git::push_branch(worktree_path, &ctx.branch).await?;
374
375 if let Some(pr_number) = ctx.pr_number {
377 let body = build_pr_body(&impl_result.output, ctx);
378 if let Err(e) =
379 self.github.edit_pr_in(pr_number, &pr_title(ctx), &body, target_dir).await
380 {
381 warn!(run_id = %run_id, error = %e, "failed to update PR description");
382 }
383 if let Err(e) = self.github.mark_pr_ready_in(pr_number, target_dir).await {
384 warn!(run_id = %run_id, error = %e, "failed to mark PR ready");
385 }
386 }
387
388 let clean = self.run_review_fix_loop(run_id, ctx, worktree_path, target_dir).await?;
390
391 if !clean {
392 anyhow::bail!("unresolved findings after max review cycles");
393 }
394
395 self.check_cancelled()?;
397 info!(run_id = %run_id, base = %ctx.base_branch, "rebasing onto base branch");
398 if let Err(e) = git::rebase_on_base(worktree_path, &ctx.base_branch).await {
399 if let Some(pr_number) = ctx.pr_number {
400 github::safe_comment(
401 &self.github,
402 pr_number,
403 &format_rebase_failure(&e),
404 target_dir,
405 )
406 .await;
407 }
408 return Err(e);
409 }
410 git::force_push_branch(worktree_path, &ctx.branch).await?;
411 if auto_merge {
413 self.check_cancelled()?;
414 ctx.pr_number.context("no PR number for merge step")?;
415 self.update_status(run_id, RunStatus::Merging).await?;
416 let merge_prompt = agents::merger::build_prompt(ctx, auto_merge)?;
417 self.run_agent(run_id, AgentRole::Merger, &merge_prompt, worktree_path, 1).await?;
418 }
419
420 Ok(())
421 }
422
423 async fn run_review_fix_loop(
424 &self,
425 run_id: &str,
426 ctx: &AgentContext,
427 worktree_path: &std::path::Path,
428 target_dir: &std::path::Path,
429 ) -> Result<bool> {
430 for cycle in 1..=3 {
431 self.check_cancelled()?;
432
433 self.update_status(run_id, RunStatus::Reviewing).await?;
434
435 let prior_disputes = if cycle > 1 {
438 let conn = self.db.lock().await;
439 db::agent_runs::get_resolved_findings(&conn, run_id)?
440 } else {
441 Vec::new()
442 };
443
444 let review_prompt = agents::reviewer::build_prompt(ctx, &prior_disputes)?;
445 let review_result = self
446 .run_agent(run_id, AgentRole::Reviewer, &review_prompt, worktree_path, cycle)
447 .await?;
448
449 let review_output = match parse_review_output(&review_result.output) {
450 Ok(output) => output,
451 Err(e) => {
452 warn!(run_id = %run_id, cycle, error = %e, "review output unparseable, treating as failed review");
453 if let Some(pr_number) = ctx.pr_number {
454 github::safe_comment(
455 &self.github,
456 pr_number,
457 &format_review_parse_failure(cycle),
458 target_dir,
459 )
460 .await;
461 }
462 anyhow::bail!("reviewer returned unparseable output in cycle {cycle}");
463 }
464 };
465 self.store_findings(run_id, &review_output.findings).await?;
466
467 let actionable: Vec<_> =
468 review_output.findings.iter().filter(|f| f.severity != Severity::Info).collect();
469
470 if actionable.is_empty() {
471 info!(run_id = %run_id, cycle, "review clean");
472 return Ok(true);
473 }
474
475 info!(run_id = %run_id, cycle, findings = actionable.len(), "review found issues");
476
477 if cycle == 3 {
478 if let Some(pr_number) = ctx.pr_number {
479 let comment = format_unresolved_comment(&actionable);
480 github::safe_comment(&self.github, pr_number, &comment, target_dir).await;
481 } else {
482 warn!(run_id = %run_id, "no PR number, cannot post unresolved findings");
483 }
484 return Ok(false);
485 }
486
487 self.check_cancelled()?;
489 self.update_status(run_id, RunStatus::Fixing).await?;
490
491 let unresolved = {
492 let conn = self.db.lock().await;
493 db::agent_runs::get_unresolved_findings(&conn, run_id)?
494 };
495
496 let fix_prompt = agents::fixer::build_prompt(ctx, &unresolved)?;
497 let fix_result =
498 self.run_agent(run_id, AgentRole::Fixer, &fix_prompt, worktree_path, cycle).await?;
499
500 let fixer_output = parse_fixer_output(&fix_result.output);
502 self.process_fixer_disputes(run_id, &unresolved, &fixer_output).await?;
503
504 git::push_branch(worktree_path, &ctx.branch).await?;
505 }
506
507 Ok(false)
508 }
509
510 async fn process_fixer_disputes(
516 &self,
517 run_id: &str,
518 findings_sent_to_fixer: &[ReviewFinding],
519 fixer_output: &agents::FixerOutput,
520 ) -> Result<()> {
521 if fixer_output.disputed.is_empty() {
522 return Ok(());
523 }
524
525 let conn = self.db.lock().await;
526 for dispute in &fixer_output.disputed {
527 let idx = dispute.finding.saturating_sub(1) as usize;
529 if let Some(finding) = findings_sent_to_fixer.get(idx) {
530 db::agent_runs::resolve_finding(&conn, finding.id, &dispute.reason)?;
531 info!(
532 run_id = %run_id,
533 finding_id = finding.id,
534 reason = %dispute.reason,
535 "finding disputed by fixer, marked resolved"
536 );
537 }
538 }
539 drop(conn);
540 Ok(())
541 }
542
543 async fn store_findings(&self, run_id: &str, findings: &[agents::Finding]) -> Result<()> {
544 let conn = self.db.lock().await;
545 let agent_runs = db::agent_runs::get_agent_runs_for_run(&conn, run_id)?;
546 let reviewer_run_id = agent_runs
547 .iter()
548 .rev()
549 .find_map(|ar| if ar.agent == "reviewer" { Some(ar.id) } else { None });
550 if let Some(ar_id) = reviewer_run_id {
551 for finding in findings {
552 let db_finding = ReviewFinding {
553 id: 0,
554 agent_run_id: ar_id,
555 severity: finding.severity.to_string(),
556 category: finding.category.clone(),
557 file_path: finding.file_path.clone(),
558 line_number: finding.line_number,
559 message: finding.message.clone(),
560 resolved: false,
561 dispute_reason: None,
562 };
563 db::agent_runs::insert_finding(&conn, &db_finding)?;
564 }
565 }
566 drop(conn);
567 Ok(())
568 }
569
570 async fn run_agent(
571 &self,
572 run_id: &str,
573 role: AgentRole,
574 prompt: &str,
575 working_dir: &std::path::Path,
576 cycle: u32,
577 ) -> Result<crate::process::AgentResult> {
578 let agent_run_id = self.record_agent_start(run_id, role, cycle).await?;
579
580 info!(run_id = %run_id, agent = %role, cycle, "agent starting");
581
582 let invocation = AgentInvocation {
583 role,
584 prompt: prompt.to_string(),
585 working_dir: working_dir.to_path_buf(),
586 max_turns: Some(self.config.pipeline.turn_limit),
587 };
588
589 let result = invoke_agent(self.runner.as_ref(), &invocation).await;
590
591 match &result {
592 Ok(agent_result) => {
593 self.record_agent_success(run_id, agent_run_id, agent_result).await?;
594 }
595 Err(e) => {
596 let conn = self.db.lock().await;
597 db::agent_runs::finish_agent_run(
598 &conn,
599 agent_run_id,
600 "failed",
601 0.0,
602 0,
603 None,
604 Some(&format!("{e:#}")),
605 None,
606 )?;
607 }
608 }
609
610 result
611 }
612
613 async fn record_agent_start(&self, run_id: &str, role: AgentRole, cycle: u32) -> Result<i64> {
614 let agent_run = AgentRun {
615 id: 0,
616 run_id: run_id.to_string(),
617 agent: role.to_string(),
618 cycle,
619 status: "running".to_string(),
620 cost_usd: 0.0,
621 turns: 0,
622 started_at: chrono::Utc::now().to_rfc3339(),
623 finished_at: None,
624 output_summary: None,
625 error_message: None,
626 raw_output: None,
627 };
628 let conn = self.db.lock().await;
629 db::agent_runs::insert_agent_run(&conn, &agent_run)
630 }
631
632 async fn record_agent_success(
633 &self,
634 run_id: &str,
635 agent_run_id: i64,
636 agent_result: &crate::process::AgentResult,
637 ) -> Result<()> {
638 let conn = self.db.lock().await;
639 db::agent_runs::finish_agent_run(
640 &conn,
641 agent_run_id,
642 "complete",
643 agent_result.cost_usd,
644 agent_result.turns,
645 Some(&truncate(&agent_result.output, 500)),
646 None,
647 Some(&agent_result.output),
648 )?;
649
650 let new_cost = db::runs::increment_run_cost(&conn, run_id, agent_result.cost_usd)?;
651 drop(conn);
652
653 if new_cost > self.config.pipeline.cost_budget {
654 anyhow::bail!(
655 "cost budget exceeded: ${:.2} > ${:.2}",
656 new_cost,
657 self.config.pipeline.cost_budget
658 );
659 }
660 Ok(())
661 }
662
663 async fn update_status(&self, run_id: &str, status: RunStatus) -> Result<()> {
664 let conn = self.db.lock().await;
665 db::runs::update_run_status(&conn, run_id, status)
666 }
667
668 fn check_cancelled(&self) -> Result<()> {
669 if self.cancel_token.is_cancelled() {
670 anyhow::bail!("pipeline cancelled");
671 }
672 Ok(())
673 }
674}
675
676const COMMENT_FOOTER: &str =
677 "\n---\nAutomated by [oven](https://github.com/clayharmon/oven-cli) \u{1F35E}";
678
679fn format_unresolved_comment(actionable: &[&agents::Finding]) -> String {
680 let mut comment = String::from(
681 "## Pipeline stopped: unresolved review findings\n\n\
682 The review-fix loop ran 2 cycles but these findings remain unresolved.\n",
683 );
684
685 for severity in &[Severity::Critical, Severity::Warning] {
687 let group: Vec<_> = actionable.iter().filter(|f| &f.severity == severity).collect();
688 if group.is_empty() {
689 continue;
690 }
691 let heading = match severity {
692 Severity::Critical => "Critical",
693 Severity::Warning => "Warning",
694 Severity::Info => unreachable!("loop only iterates Critical and Warning"),
695 };
696 let _ = writeln!(comment, "\n### {heading}\n");
697 for f in group {
698 let loc = match (&f.file_path, f.line_number) {
699 (Some(path), Some(line)) => format!(" in `{path}:{line}`"),
700 (Some(path), None) => format!(" in `{path}`"),
701 _ => String::new(),
702 };
703 let _ = writeln!(comment, "- **{}**{} -- {}", f.category, loc, f.message);
704 }
705 }
706
707 comment.push_str(COMMENT_FOOTER);
708 comment
709}
710
711fn format_pipeline_failure(e: &anyhow::Error) -> String {
712 format!(
713 "## Pipeline failed\n\n\
714 **Error:** {e:#}\n\n\
715 The pipeline hit an unrecoverable error. Check the run logs for detail, \
716 or re-run the pipeline.\
717 {COMMENT_FOOTER}"
718 )
719}
720
721fn format_rebase_failure(e: &anyhow::Error) -> String {
722 format!(
723 "## Pipeline stopped: rebase conflict\n\n\
724 Could not rebase onto the base branch. This usually happens when another \
725 PR merged while this pipeline was running.\n\n\
726 **Error:** {e}\n\n\
727 Rebase manually and re-run the pipeline.\
728 {COMMENT_FOOTER}"
729 )
730}
731
732fn format_review_parse_failure(cycle: u32) -> String {
733 format!(
734 "## Pipeline stopped: review output error\n\n\
735 The reviewer agent returned output that could not be parsed as structured \
736 findings (cycle {cycle}). This usually means the reviewer produced malformed JSON.\n\n\
737 Re-run the pipeline to try again.\
738 {COMMENT_FOOTER}"
739 )
740}
741
742fn pr_title(ctx: &AgentContext) -> String {
747 let prefix = infer_commit_type(&ctx.issue_title);
748 if ctx.issue_source == "github" {
749 format!("{prefix}(#{}): {}", ctx.issue_number, ctx.issue_title)
750 } else {
751 format!("{prefix}: {}", ctx.issue_title)
752 }
753}
754
755fn infer_commit_type(title: &str) -> &'static str {
757 let lower = title.to_lowercase();
758 if lower.starts_with("feat") || lower.contains("add ") || lower.contains("implement ") {
759 "feat"
760 } else if lower.starts_with("refactor") {
761 "refactor"
762 } else if lower.starts_with("docs") || lower.starts_with("document") {
763 "docs"
764 } else if lower.starts_with("test") || lower.starts_with("add test") {
765 "test"
766 } else if lower.starts_with("chore") {
767 "chore"
768 } else {
769 "fix"
770 }
771}
772
773fn build_pr_body(impl_output: &str, ctx: &AgentContext) -> String {
775 let issue_ref = if ctx.issue_source == "github" {
776 format!("Resolves #{}", ctx.issue_number)
777 } else {
778 format!("From local issue #{}", ctx.issue_number)
779 };
780
781 let summary = extract_impl_summary(impl_output);
782
783 let mut body = String::new();
784 let _ = writeln!(body, "{issue_ref}\n");
785 let _ = write!(body, "{summary}");
786 body.push_str(COMMENT_FOOTER);
787 body
788}
789
790fn extract_impl_summary(output: &str) -> String {
796 let idx = output.find("## PR Template").or_else(|| output.find("## Changes Made"));
798
799 if let Some(idx) = idx {
800 let summary = output[idx..].trim();
801 let summary = summary.strip_prefix("## PR Template").map_or(summary, |s| s.trim_start());
803 if summary.len() <= 4000 {
804 return summary.to_string();
805 }
806 return truncate(summary, 4000);
807 }
808 if output.trim().is_empty() {
810 return String::from("*No implementation summary available.*");
811 }
812 truncate(output.trim(), 2000)
813}
814
815fn new_run(run_id: &str, issue: &PipelineIssue, auto_merge: bool) -> Run {
816 Run {
817 id: run_id.to_string(),
818 issue_number: issue.number,
819 status: RunStatus::Pending,
820 pr_number: None,
821 branch: None,
822 worktree_path: None,
823 cost_usd: 0.0,
824 auto_merge,
825 started_at: chrono::Utc::now().to_rfc3339(),
826 finished_at: None,
827 error_message: None,
828 complexity: "full".to_string(),
829 issue_source: issue.source.to_string(),
830 }
831}
832
833pub fn generate_run_id() -> String {
835 uuid::Uuid::new_v4().to_string()[..8].to_string()
836}
837
838pub(crate) fn truncate(s: &str, max_len: usize) -> String {
843 if s.len() <= max_len {
844 return s.to_string();
845 }
846 let target = max_len.saturating_sub(3);
847 let mut end = target;
848 while end > 0 && !s.is_char_boundary(end) {
849 end -= 1;
850 }
851 format!("{}...", &s[..end])
852}
853
854#[cfg(test)]
855mod tests {
856 use proptest::prelude::*;
857
858 use super::*;
859
860 proptest! {
861 #[test]
862 fn run_ids_always_8_hex_chars(_seed in any::<u64>()) {
863 let id = generate_run_id();
864 prop_assert_eq!(id.len(), 8);
865 prop_assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
866 }
867 }
868
869 #[test]
870 fn run_id_is_8_hex_chars() {
871 let id = generate_run_id();
872 assert_eq!(id.len(), 8);
873 assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
874 }
875
876 #[test]
877 fn run_ids_are_unique() {
878 let ids: Vec<_> = (0..100).map(|_| generate_run_id()).collect();
879 let unique: std::collections::HashSet<_> = ids.iter().collect();
880 assert_eq!(ids.len(), unique.len());
881 }
882
883 #[test]
884 fn truncate_short_string() {
885 assert_eq!(truncate("hello", 10), "hello");
886 }
887
888 #[test]
889 fn truncate_long_string() {
890 let long = "a".repeat(100);
891 let result = truncate(&long, 10);
892 assert_eq!(result.len(), 10); assert!(result.ends_with("..."));
894 }
895
896 #[test]
897 fn truncate_multibyte_does_not_panic() {
898 let s = "πππ";
901 let result = truncate(s, 8);
902 assert!(result.ends_with("..."));
903 assert!(result.starts_with("π"));
904 assert!(result.len() <= 8);
905 }
906
907 #[test]
908 fn truncate_cjk_boundary() {
909 let s = "δ½ ε₯½δΈηζ΅θ―"; let result = truncate(s, 10);
913 assert!(result.ends_with("..."));
914 assert!(result.starts_with("δ½ ε₯½"));
915 assert!(result.len() <= 10);
916 }
917
918 #[test]
919 fn extract_impl_summary_finds_changes_made() {
920 let output = "Some preamble text\n\n## Changes Made\n- src/foo.rs: added bar\n\n## Tests Added\n- tests/foo.rs: bar test\n";
921 let summary = extract_impl_summary(output);
922 assert!(summary.starts_with("## Changes Made"));
923 assert!(summary.contains("added bar"));
924 assert!(summary.contains("## Tests Added"));
925 }
926
927 #[test]
928 fn extract_impl_summary_prefers_pr_template() {
929 let output = "Preamble\n\n## PR Template\n## Summary\n- Added auth flow\n\n## Testing\n- Unit tests pass\n";
930 let summary = extract_impl_summary(output);
931 assert!(!summary.contains("## PR Template"));
933 assert!(summary.starts_with("## Summary"));
934 assert!(summary.contains("Added auth flow"));
935 }
936
937 #[test]
938 fn extract_impl_summary_fallback_on_no_heading() {
939 let output = "just some raw agent output with no structure";
940 let summary = extract_impl_summary(output);
941 assert!(summary.contains("just some raw agent output"));
942 }
943
944 #[test]
945 fn extract_impl_summary_empty_output() {
946 assert_eq!(extract_impl_summary(""), "*No implementation summary available.*");
947 assert_eq!(extract_impl_summary(" "), "*No implementation summary available.*");
948 }
949
950 #[test]
951 fn build_pr_body_github_issue() {
952 let ctx = AgentContext {
953 issue_number: 42,
954 issue_title: "fix the thing".to_string(),
955 issue_body: String::new(),
956 branch: "oven/issue-42".to_string(),
957 pr_number: Some(10),
958 test_command: None,
959 lint_command: None,
960 review_findings: None,
961 cycle: 1,
962 target_repo: None,
963 issue_source: "github".to_string(),
964 base_branch: "main".to_string(),
965 };
966 let body = build_pr_body("## Changes Made\n- added stuff", &ctx);
967 assert!(body.contains("Resolves #42"));
968 assert!(body.contains("## Changes Made"));
969 assert!(body.contains("Automated by [oven]"));
970 }
971
972 #[test]
973 fn build_pr_body_local_issue() {
974 let ctx = AgentContext {
975 issue_number: 7,
976 issue_title: "local thing".to_string(),
977 issue_body: String::new(),
978 branch: "oven/issue-7".to_string(),
979 pr_number: Some(10),
980 test_command: None,
981 lint_command: None,
982 review_findings: None,
983 cycle: 1,
984 target_repo: None,
985 issue_source: "local".to_string(),
986 base_branch: "main".to_string(),
987 };
988 let body = build_pr_body("raw output", &ctx);
989 assert!(body.contains("From local issue #7"));
990 }
991
992 #[test]
993 fn pr_title_github() {
994 let ctx = AgentContext {
995 issue_number: 42,
996 issue_title: "fix the thing".to_string(),
997 issue_body: String::new(),
998 branch: String::new(),
999 pr_number: None,
1000 test_command: None,
1001 lint_command: None,
1002 review_findings: None,
1003 cycle: 1,
1004 target_repo: None,
1005 issue_source: "github".to_string(),
1006 base_branch: "main".to_string(),
1007 };
1008 assert_eq!(pr_title(&ctx), "fix(#42): fix the thing");
1009 }
1010
1011 #[test]
1012 fn pr_title_local() {
1013 let ctx = AgentContext {
1014 issue_number: 7,
1015 issue_title: "local thing".to_string(),
1016 issue_body: String::new(),
1017 branch: String::new(),
1018 pr_number: None,
1019 test_command: None,
1020 lint_command: None,
1021 review_findings: None,
1022 cycle: 1,
1023 target_repo: None,
1024 issue_source: "local".to_string(),
1025 base_branch: "main".to_string(),
1026 };
1027 assert_eq!(pr_title(&ctx), "fix: local thing");
1028 }
1029
1030 #[test]
1031 fn infer_commit_type_feat() {
1032 assert_eq!(infer_commit_type("Add dark mode support"), "feat");
1033 assert_eq!(infer_commit_type("Implement caching layer"), "feat");
1034 assert_eq!(infer_commit_type("Feature: new dashboard"), "feat");
1035 }
1036
1037 #[test]
1038 fn infer_commit_type_refactor() {
1039 assert_eq!(infer_commit_type("Refactor auth middleware"), "refactor");
1040 }
1041
1042 #[test]
1043 fn infer_commit_type_docs() {
1044 assert_eq!(infer_commit_type("Document the API endpoints"), "docs");
1045 assert_eq!(infer_commit_type("Docs: update README"), "docs");
1046 }
1047
1048 #[test]
1049 fn infer_commit_type_defaults_to_fix() {
1050 assert_eq!(infer_commit_type("Null pointer in config parser"), "fix");
1051 assert_eq!(infer_commit_type("Crash on empty input"), "fix");
1052 }
1053
1054 #[test]
1055 fn pr_title_feat_github() {
1056 let ctx = AgentContext {
1057 issue_number: 10,
1058 issue_title: "Add dark mode".to_string(),
1059 issue_body: String::new(),
1060 branch: String::new(),
1061 pr_number: None,
1062 test_command: None,
1063 lint_command: None,
1064 review_findings: None,
1065 cycle: 1,
1066 target_repo: None,
1067 issue_source: "github".to_string(),
1068 base_branch: "main".to_string(),
1069 };
1070 assert_eq!(pr_title(&ctx), "feat(#10): Add dark mode");
1071 }
1072
1073 #[test]
1074 fn format_unresolved_comment_groups_by_severity() {
1075 let findings = [
1076 agents::Finding {
1077 severity: Severity::Critical,
1078 category: "bug".to_string(),
1079 file_path: Some("src/main.rs".to_string()),
1080 line_number: Some(42),
1081 message: "null pointer".to_string(),
1082 },
1083 agents::Finding {
1084 severity: Severity::Warning,
1085 category: "style".to_string(),
1086 file_path: None,
1087 line_number: None,
1088 message: "missing docs".to_string(),
1089 },
1090 ];
1091 let refs: Vec<_> = findings.iter().collect();
1092 let comment = format_unresolved_comment(&refs);
1093 assert!(comment.contains("### Critical"));
1094 assert!(comment.contains("### Warning"));
1095 assert!(comment.contains("**bug** in `src/main.rs:42` -- null pointer"));
1096 assert!(comment.contains("**style** -- missing docs"));
1097 assert!(comment.contains("Automated by [oven]"));
1098 }
1099
1100 #[test]
1101 fn format_unresolved_comment_skips_empty_severity_groups() {
1102 let findings = [agents::Finding {
1103 severity: Severity::Warning,
1104 category: "testing".to_string(),
1105 file_path: Some("src/lib.rs".to_string()),
1106 line_number: None,
1107 message: "missing edge case test".to_string(),
1108 }];
1109 let refs: Vec<_> = findings.iter().collect();
1110 let comment = format_unresolved_comment(&refs);
1111 assert!(!comment.contains("### Critical"));
1112 assert!(comment.contains("### Warning"));
1113 }
1114
1115 #[test]
1116 fn format_pipeline_failure_includes_error() {
1117 let err = anyhow::anyhow!("cost budget exceeded: $12.50 > $10.00");
1118 let comment = format_pipeline_failure(&err);
1119 assert!(comment.contains("## Pipeline failed"));
1120 assert!(comment.contains("cost budget exceeded"));
1121 assert!(comment.contains("Automated by [oven]"));
1122 }
1123
1124 #[test]
1125 fn format_rebase_failure_includes_error() {
1126 let err = anyhow::anyhow!("merge conflict in src/config/mod.rs");
1127 let comment = format_rebase_failure(&err);
1128 assert!(comment.contains("## Pipeline stopped: rebase conflict"));
1129 assert!(comment.contains("merge conflict"));
1130 assert!(comment.contains("Rebase manually"));
1131 }
1132
1133 #[test]
1134 fn format_review_parse_failure_includes_cycle() {
1135 let comment = format_review_parse_failure(2);
1136 assert!(comment.contains("## Pipeline stopped: review output error"));
1137 assert!(comment.contains("cycle 2"));
1138 assert!(comment.contains("Re-run the pipeline"));
1139 }
1140}