1use std::{
2 collections::HashSet,
3 path::{Path, PathBuf},
4 sync::Arc,
5 time::Duration,
6};
7
8use anyhow::Result;
9use tokio::{sync::Semaphore, task::JoinSet};
10use tokio_util::sync::CancellationToken;
11use tracing::{error, info, warn};
12
13use super::{
14 executor::{PipelineExecutor, PipelineOutcome},
15 graph::DependencyGraph,
16};
17use crate::{
18 agents::Complexity,
19 db::graph::NodeState,
20 git,
21 issues::PipelineIssue,
22 pipeline::{executor::generate_run_id, graph::GraphNode},
23 process::CommandRunner,
24};
25
/// Mutable state threaded through one continuous polling session:
/// the dependency graph, the concurrency limiter, and the set of
/// currently running pipeline tasks.
struct SchedulerState {
    // Per-issue dependency graph; tracks each node's lifecycle state.
    graph: DependencyGraph,
    // Caps the number of concurrently running pipelines (max_parallel permits).
    semaphore: Arc<Semaphore>,
    // In-flight pipeline tasks; each resolves to (issue number, outcome).
    tasks: JoinSet<(u32, Result<PipelineOutcome>)>,
}
35
/// Execute one batch of issues to completion.
///
/// The issues are first handed to the planner to build a dependency graph
/// (falling back to fully independent nodes when planning fails). Ready
/// issues then run in waves: each wave spawns every currently unblocked
/// issue, bounded by `max_parallel` concurrent pipelines, and the loop
/// repeats until every node is in a terminal state.
///
/// Returns an error if any pipeline in the batch failed or panicked.
pub async fn run_batch<R: CommandRunner + 'static>(
    executor: &Arc<PipelineExecutor<R>>,
    issues: Vec<PipelineIssue>,
    max_parallel: usize,
    auto_merge: bool,
) -> Result<()> {
    let session_id = generate_run_id();
    // Try the planner first; on failure every issue becomes an independent node.
    let mut graph = if let Some(plan) = executor.plan_issues(&issues, &[]).await {
        info!(nodes = plan.nodes.len(), total = plan.total_issues, "planner produced a plan");
        DependencyGraph::from_planner_output(&session_id, &plan, &issues)
    } else {
        warn!("planner failed, falling back to all-parallel execution");
        let mut g = DependencyGraph::new(&session_id);
        for issue in &issues {
            g.add_node(standalone_node(issue));
        }
        g
    };

    save_graph(&graph, executor).await;

    let semaphore = Arc::new(Semaphore::new(max_parallel));
    let mut had_errors = false;

    while !graph.all_terminal() {
        let ready = graph.ready_issues();
        if ready.is_empty() {
            // Non-terminal graph but nothing runnable: likely a dependency
            // cycle or inconsistent state. Persist what we have and stop.
            warn!("no ready issues but graph is not terminal, breaking to avoid infinite loop");
            save_graph(&graph, executor).await;
            break;
        }

        let mut tasks: JoinSet<(u32, Result<PipelineOutcome>)> = JoinSet::new();

        // Mark the whole wave InFlight (and persist) before spawning anything,
        // so a crash mid-spawn leaves an accurate picture in the DB.
        for num in &ready {
            graph.transition(*num, NodeState::InFlight);
        }
        save_graph(&graph, executor).await;

        for num in ready {
            let node = graph.node(num).expect("ready issue must exist in graph");
            let issue = node.issue.clone().expect("batch issues have issue attached");
            let complexity = node.complexity.parse::<Complexity>().ok();
            let sem = Arc::clone(&semaphore);
            let exec = Arc::clone(executor);

            tasks.spawn(async move {
                // Bound concurrency: hold a permit for the whole pipeline run.
                let permit = match sem.acquire_owned().await {
                    Ok(p) => p,
                    Err(e) => return (num, Err(anyhow::anyhow!("semaphore closed: {e}"))),
                };
                let result = exec.run_issue_pipeline(&issue, auto_merge, complexity).await;
                let outcome = match result {
                    Ok(outcome) => {
                        // Finalization is best-effort; its failure does not
                        // fail the pipeline itself.
                        if let Err(e) = exec.finalize_merge(&outcome, &issue).await {
                            warn!(issue = num, error = %e, "finalize_merge failed");
                        }
                        Ok(outcome)
                    }
                    Err(e) => Err(e),
                };
                drop(permit);
                (num, outcome)
            });
        }

        // Collect the entire wave before computing the next set of ready issues.
        let mut merged_target_dirs: HashSet<PathBuf> = HashSet::new();
        while let Some(join_result) = tasks.join_next().await {
            match join_result {
                Ok((number, Ok(ref outcome))) => {
                    info!(issue = number, "pipeline completed successfully");
                    graph.set_pr_number(number, outcome.pr_number);
                    graph.set_run_id(number, &outcome.run_id);
                    graph.transition(number, NodeState::Merged);
                    if auto_merge {
                        merged_target_dirs.insert(outcome.target_dir.clone());
                    }
                }
                Ok((number, Err(ref e))) => {
                    error!(issue = number, error = %e, "pipeline failed for issue");
                    graph.transition(number, NodeState::Failed);
                    // Everything depending on this issue can no longer run.
                    let blocked = graph.propagate_failure(number);
                    for b in &blocked {
                        warn!(issue = b, blocked_by = number, "transitively failed");
                    }
                    had_errors = true;
                }
                Err(e) => {
                    error!(error = %e, "pipeline task panicked");
                    had_errors = true;
                }
            }
        }

        // Refresh local base branches so the next wave builds on the merged
        // work; skipped when the graph is already terminal (no wave follows).
        if !merged_target_dirs.is_empty() && !graph.all_terminal() {
            fetch_base_branches(&merged_target_dirs).await;
        }

        save_graph(&graph, executor).await;
    }

    if had_errors {
        anyhow::bail!("one or more pipelines failed in batch");
    }
    Ok(())
}
150
/// Continuously poll for ready issues and schedule them until cancelled.
///
/// Resumes a persisted graph session when one exists. Each loop iteration
/// reacts to exactly one of: cancellation (drains in-flight work before
/// returning), the poll timer firing (fetch/plan/spawn issues), or a
/// completed pipeline task.
pub async fn polling_loop<R: CommandRunner + 'static>(
    executor: Arc<PipelineExecutor<R>>,
    auto_merge: bool,
    cancel_token: CancellationToken,
) -> Result<()> {
    let poll_interval = Duration::from_secs(executor.config.pipeline.poll_interval);
    let max_parallel = executor.config.pipeline.max_parallel as usize;
    let ready_label = executor.config.labels.ready.clone();

    let graph = load_or_create_graph(&executor).await;

    let mut sched = SchedulerState {
        graph,
        semaphore: Arc::new(Semaphore::new(max_parallel)),
        tasks: JoinSet::new(),
    };

    info!(poll_interval_secs = poll_interval.as_secs(), max_parallel, "continuous polling started");

    loop {
        tokio::select! {
            () = cancel_token.cancelled() => {
                info!("shutdown signal received, waiting for in-flight pipelines");
                // Graceful shutdown: let running pipelines finish and persist
                // their results before leaving the loop.
                drain_tasks(&mut sched, &executor).await;
                break;
            }
            () = tokio::time::sleep(poll_interval) => {
                poll_and_spawn(&executor, &ready_label, &mut sched, auto_merge).await;
            }
            // Guard keeps this branch out of the select when nothing is
            // running (join_next on an empty JoinSet resolves immediately
            // with None, which would disable the branch anyway).
            Some(result) = sched.tasks.join_next(), if !sched.tasks.is_empty() => {
                handle_task_result(result, &mut sched.graph, &executor).await;
            }
        }
    }

    Ok(())
}
195
196async fn load_or_create_graph<R: CommandRunner>(
198 executor: &Arc<PipelineExecutor<R>>,
199) -> DependencyGraph {
200 let conn = executor.db.lock().await;
201 match crate::db::graph::get_active_session(&conn) {
202 Ok(Some(session_id)) => match DependencyGraph::from_db(&conn, &session_id) {
203 Ok(graph) => {
204 info!(session_id = %session_id, nodes = graph.node_count(), "resumed existing graph session");
205 return graph;
206 }
207 Err(e) => {
208 warn!(error = %e, "failed to load graph session, starting fresh");
209 }
210 },
211 Ok(None) => {}
212 Err(e) => {
213 warn!(error = %e, "failed to check for active graph session");
214 }
215 }
216 let session_id = generate_run_id();
217 info!(session_id = %session_id, "starting new graph session");
218 DependencyGraph::new(&session_id)
219}
220
221async fn drain_tasks<R: CommandRunner>(
223 sched: &mut SchedulerState,
224 executor: &Arc<PipelineExecutor<R>>,
225) {
226 while let Some(result) = sched.tasks.join_next().await {
227 handle_task_result(result, &mut sched.graph, executor).await;
228 }
229}
230
231async fn handle_task_result<R: CommandRunner>(
233 result: Result<(u32, Result<PipelineOutcome>), tokio::task::JoinError>,
234 graph: &mut DependencyGraph,
235 executor: &Arc<PipelineExecutor<R>>,
236) {
237 match result {
238 Ok((number, Ok(ref outcome))) => {
239 info!(issue = number, "pipeline completed successfully");
240 graph.set_pr_number(number, outcome.pr_number);
241 graph.set_run_id(number, &outcome.run_id);
242 graph.transition(number, NodeState::AwaitingMerge);
243 }
244 Ok((number, Err(ref e))) => {
245 error!(issue = number, error = %e, "pipeline failed for issue");
246 graph.transition(number, NodeState::Failed);
247 let blocked = graph.propagate_failure(number);
248 for b in &blocked {
249 warn!(issue = b, blocked_by = number, "transitively failed");
250 }
251 }
252 Err(e) => {
253 error!(error = %e, "pipeline task panicked");
254 return;
255 }
256 }
257 save_graph(graph, executor).await;
258}
259
/// Check every `AwaitingMerge` node against the live PR state and advance
/// the graph accordingly.
///
/// - Merged PR: finalize (when the node still carries its `PipelineIssue`),
///   then mark the node `Merged`.
/// - Closed PR: mark the node `Failed` and transitively fail dependents.
/// - Open PR: leave the node untouched until a later poll.
///
/// Afterwards, base branches are refreshed in every repo that received a
/// merge and the graph is persisted.
async fn poll_awaiting_merges<R: CommandRunner + 'static>(
    graph: &mut DependencyGraph,
    executor: &Arc<PipelineExecutor<R>>,
) {
    let awaiting = graph.awaiting_merge();
    if awaiting.is_empty() {
        return;
    }

    let mut merged_target_dirs: HashSet<PathBuf> = HashSet::new();
    for num in awaiting {
        let Some(node) = graph.node(num) else { continue };
        let Some(pr_number) = node.pr_number else {
            warn!(issue = num, "AwaitingMerge node has no PR number, skipping");
            continue;
        };
        // Clone what we need out of the node so the borrow on `graph` ends
        // before the transitions below.
        let run_id = node.run_id.clone().unwrap_or_default();
        let issue = node.issue.clone();
        let target_repo = node.target_repo.clone();

        let pr_repo_dir = match executor.resolve_target_dir(target_repo.as_ref()) {
            Ok((dir, _)) => dir,
            Err(e) => {
                warn!(issue = num, error = %e, "failed to resolve target dir for PR state check");
                continue;
            }
        };

        // Transient check failures leave the node in AwaitingMerge for the
        // next poll.
        let pr_state = match executor.github.get_pr_state_in(pr_number, &pr_repo_dir).await {
            Ok(s) => s,
            Err(e) => {
                warn!(issue = num, pr = pr_number, error = %e, "failed to check PR state");
                continue;
            }
        };

        match pr_state {
            crate::github::PrState::Merged => {
                info!(issue = num, pr = pr_number, "PR merged, finalizing");
                if let Some(ref issue) = issue {
                    // Finalization needs a PipelineOutcome; rebuild one from
                    // the persisted run id and PR number. Failures here are
                    // logged but do not block the Merged transition.
                    match executor.reconstruct_outcome(issue, &run_id, pr_number).await {
                        Ok(outcome) => {
                            if let Err(e) = executor.finalize_merge(&outcome, issue).await {
                                warn!(issue = num, error = %e, "finalize_merge after poll failed");
                            }
                        }
                        Err(e) => {
                            warn!(issue = num, error = %e, "failed to reconstruct outcome");
                        }
                    }
                } else {
                    warn!(
                        issue = num,
                        pr = pr_number,
                        "node restored from DB has no PipelineIssue, \
                        skipping finalization (labels and worktree may need manual cleanup)"
                    );
                }
                graph.transition(num, NodeState::Merged);
                merged_target_dirs.insert(pr_repo_dir);
            }
            crate::github::PrState::Closed => {
                warn!(issue = num, pr = pr_number, "PR closed without merge, marking failed");
                graph.transition(num, NodeState::Failed);
                let blocked = graph.propagate_failure(num);
                for b in &blocked {
                    warn!(issue = b, blocked_by = num, "transitively failed (PR closed)");
                }
            }
            // Still open: check again on the next poll.
            crate::github::PrState::Open => {
            }
        }
    }

    if !merged_target_dirs.is_empty() {
        fetch_base_branches(&merged_target_dirs).await;
    }

    save_graph(graph, executor).await;
}
347
/// One polling iteration: settle awaiting merges, sync the graph with the
/// currently labeled issues, plan any new ones, and spawn every ready issue.
async fn poll_and_spawn<R: CommandRunner + 'static>(
    executor: &Arc<PipelineExecutor<R>>,
    ready_label: &str,
    sched: &mut SchedulerState,
    auto_merge: bool,
) {
    // First resolve PRs that were awaiting a merge; this may unblock
    // dependents for this very iteration.
    poll_awaiting_merges(&mut sched.graph, executor).await;

    let ready_issues = match executor.issues.get_ready_issues(ready_label).await {
        Ok(i) => i,
        Err(e) => {
            error!(error = %e, "failed to fetch issues");
            return;
        }
    };

    let ready_numbers: HashSet<u32> = ready_issues.iter().map(|i| i.number).collect();

    // Drop Pending nodes whose issue lost the ready label since the last poll.
    clean_stale_nodes(&mut sched.graph, &ready_numbers);

    let new_issues: Vec<_> =
        ready_issues.into_iter().filter(|i| !sched.graph.contains(i.number)).collect();

    if !new_issues.is_empty() {
        info!(count = new_issues.len(), "found new issues to evaluate");
        // Give the planner the existing graph so new nodes can depend on
        // already-tracked work.
        let graph_context = sched.graph.to_graph_context();

        if let Some(plan) = executor.plan_issues(&new_issues, &graph_context).await {
            info!(nodes = plan.nodes.len(), total = plan.total_issues, "planner produced a plan");
            sched.graph.merge_planner_output(&plan, &new_issues);
        } else {
            warn!("planner failed, adding all new issues as independent nodes");
            add_independent_nodes(&mut sched.graph, &new_issues);
        }

        save_graph(&sched.graph, executor).await;
    }

    // collect_ready_issues also flips the chosen nodes to InFlight.
    let to_spawn = collect_ready_issues(&mut sched.graph);
    if to_spawn.is_empty() {
        if new_issues.is_empty() {
            info!("no actionable issues, waiting");
        }
        return;
    }

    // Persist the InFlight transitions before the tasks actually start.
    save_graph(&sched.graph, executor).await;
    spawn_issues(to_spawn, executor, sched, auto_merge);
}
403
404fn clean_stale_nodes(graph: &mut DependencyGraph, ready_numbers: &HashSet<u32>) {
406 let stale: Vec<u32> = graph
407 .all_issues()
408 .into_iter()
409 .filter(|num| {
410 !ready_numbers.contains(num)
411 && graph.node(*num).is_some_and(|n| n.state == NodeState::Pending)
412 })
413 .collect();
414 if !stale.is_empty() {
415 info!(count = stale.len(), "removing stale pending nodes");
416 for num in stale {
417 graph.remove_node(num);
418 }
419 }
420}
421
422fn add_independent_nodes(graph: &mut DependencyGraph, issues: &[PipelineIssue]) {
424 for issue in issues {
425 if !graph.contains(issue.number) {
426 graph.add_node(standalone_node(issue));
427 }
428 }
429}
430
431fn collect_ready_issues(graph: &mut DependencyGraph) -> Vec<(u32, PipelineIssue, Complexity)> {
433 let ready = graph.ready_issues();
434 let mut to_spawn = Vec::new();
435
436 for num in ready {
437 let Some(node) = graph.node(num) else { continue };
438 let Some(issue) = node.issue.clone() else {
439 warn!(issue = num, "ready node has no PipelineIssue attached, skipping");
440 continue;
441 };
442 let complexity = node.complexity.parse::<Complexity>().unwrap_or(Complexity::Full);
443 graph.transition(num, NodeState::InFlight);
444 to_spawn.push((num, issue, complexity));
445 }
446
447 to_spawn
448}
449
450fn spawn_issues<R: CommandRunner + 'static>(
452 to_spawn: Vec<(u32, PipelineIssue, Complexity)>,
453 executor: &Arc<PipelineExecutor<R>>,
454 sched: &mut SchedulerState,
455 auto_merge: bool,
456) {
457 for (number, issue, complexity) in to_spawn {
458 let sem = Arc::clone(&sched.semaphore);
459 let exec = Arc::clone(executor);
460
461 sched.tasks.spawn(async move {
462 let permit = match sem.acquire_owned().await {
463 Ok(p) => p,
464 Err(e) => return (number, Err(anyhow::anyhow!("semaphore closed: {e}"))),
465 };
466 let outcome = exec.run_issue_pipeline(&issue, auto_merge, Some(complexity)).await;
467 drop(permit);
468 (number, outcome)
469 });
470 }
471}
472
473fn standalone_node(issue: &PipelineIssue) -> GraphNode {
475 GraphNode {
476 issue_number: issue.number,
477 title: issue.title.clone(),
478 area: String::new(),
479 predicted_files: Vec::new(),
480 has_migration: false,
481 complexity: Complexity::Full.to_string(),
482 state: NodeState::Pending,
483 pr_number: None,
484 run_id: None,
485 target_repo: issue.target_repo.clone(),
486 issue: Some(issue.clone()),
487 }
488}
489
490async fn fetch_base_branches(repo_dirs: &HashSet<PathBuf>) {
497 for repo_dir in repo_dirs {
498 fetch_base_branch_in(repo_dir).await;
499 }
500}
501
502async fn fetch_base_branch_in(repo_dir: &Path) {
504 match git::default_branch(repo_dir).await {
505 Ok(branch) => {
506 if let Err(e) = git::fetch_branch(repo_dir, &branch).await {
507 warn!(
508 repo = %repo_dir.display(), error = %e,
509 "failed to fetch base branch after merge"
510 );
511 } else {
512 if let Err(e) = git::advance_local_branch(repo_dir, &branch).await {
513 warn!(
514 repo = %repo_dir.display(), branch = %branch, error = %e,
515 "failed to advance local branch after fetch"
516 );
517 }
518 info!(
519 repo = %repo_dir.display(), branch = %branch,
520 "updated base branch after merge"
521 );
522 }
523 }
524 Err(e) => {
525 warn!(
526 repo = %repo_dir.display(), error = %e,
527 "failed to detect base branch for post-merge fetch"
528 );
529 }
530 }
531}
532
533async fn save_graph<R: CommandRunner>(
535 graph: &DependencyGraph,
536 executor: &Arc<PipelineExecutor<R>>,
537) {
538 let conn = executor.db.lock().await;
539 if let Err(e) = graph.save_to_db(&conn) {
540 warn!(error = %e, "failed to persist dependency graph");
541 }
542}
543
544#[cfg(test)]
545mod tests {
546 use std::path::PathBuf;
547
548 use tokio::sync::Mutex;
549
550 use super::*;
551 use crate::{
552 agents::PlannerGraphOutput,
553 config::Config,
554 github::GhClient,
555 issues::{IssueOrigin, IssueProvider, github::GithubIssueProvider},
556 process::{AgentResult, CommandOutput, MockCommandRunner},
557 };
558
    /// `MockCommandRunner` whose `gh` calls print a PR URL and whose agent
    /// calls return a successful, empty-findings JSON payload.
    fn mock_runner_for_batch() -> MockCommandRunner {
        let mut mock = MockCommandRunner::new();
        // Every `gh` invocation succeeds; stdout mimics a PR-creation URL.
        mock.expect_run_gh().returning(|_, _| {
            Box::pin(async {
                Ok(CommandOutput {
                    stdout: "https://github.com/user/repo/pull/1\n".to_string(),
                    stderr: String::new(),
                    success: true,
                })
            })
        });
        // Every agent invocation succeeds with a clean review result.
        mock.expect_run_claude().returning(|_, _, _, _, _| {
            Box::pin(async {
                Ok(AgentResult {
                    cost_usd: 1.0,
                    duration: Duration::from_secs(5),
                    turns: 3,
                    output: r#"{"findings":[],"summary":"clean"}"#.to_string(),
                    session_id: "sess-1".to_string(),
                    success: true,
                })
            })
        });
        mock
    }
584
585 fn make_github_provider(gh: &Arc<GhClient<MockCommandRunner>>) -> Arc<dyn IssueProvider> {
586 Arc::new(GithubIssueProvider::new(Arc::clone(gh), "target_repo"))
587 }
588
589 fn make_issue(number: u32) -> PipelineIssue {
590 PipelineIssue {
591 number,
592 title: format!("Issue #{number}"),
593 body: String::new(),
594 source: IssueOrigin::Github,
595 target_repo: None,
596 author: None,
597 }
598 }
599
600 #[tokio::test]
601 async fn cancellation_stops_polling() {
602 let cancel = CancellationToken::new();
603 let runner = Arc::new(mock_runner_for_batch());
604 let github = Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
605 let issues = make_github_provider(&github);
606 let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));
607
608 let mut config = Config::default();
609 config.pipeline.poll_interval = 3600; let executor = Arc::new(PipelineExecutor {
612 runner,
613 github,
614 issues,
615 db,
616 config,
617 cancel_token: cancel.clone(),
618 repo_dir: PathBuf::from("/tmp"),
619 });
620
621 let cancel_clone = cancel.clone();
622 let handle = tokio::spawn(async move { polling_loop(executor, false, cancel_clone).await });
623
624 cancel.cancel();
626
627 let result = handle.await.unwrap();
628 assert!(result.is_ok());
629 }
630
    /// Same as `cancellation_stops_polling`, but additionally bounds the
    /// shutdown with a 5s timeout so a hung drain fails the test instead of
    /// blocking forever.
    #[tokio::test]
    async fn cancellation_exits_within_timeout() {
        let cancel = CancellationToken::new();
        let runner = Arc::new(mock_runner_for_batch());
        let github = Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
        let issues = make_github_provider(&github);
        let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));

        let mut config = Config::default();
        // Long poll interval so the loop exits via cancellation, not polling.
        config.pipeline.poll_interval = 3600;

        let executor = Arc::new(PipelineExecutor {
            runner,
            github,
            issues,
            db,
            config,
            cancel_token: cancel.clone(),
            repo_dir: PathBuf::from("/tmp"),
        });

        let cancel_clone = cancel.clone();
        let handle = tokio::spawn(async move { polling_loop(executor, false, cancel_clone).await });

        cancel.cancel();

        let result = tokio::time::timeout(Duration::from_secs(5), handle)
            .await
            .expect("polling loop should exit within timeout")
            .unwrap();
        assert!(result.is_ok());
    }
663
    /// A successful pipeline result moves the node to `AwaitingMerge` and
    /// records the PR number and run id on it.
    #[test]
    fn handle_task_success_transitions_to_awaiting_merge() {
        let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
        rt.block_on(async {
            let executor = {
                let runner = Arc::new(mock_runner_for_batch());
                let github =
                    Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
                let issues = make_github_provider(&github);
                let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));
                Arc::new(PipelineExecutor {
                    runner,
                    github,
                    issues,
                    db,
                    config: Config::default(),
                    cancel_token: CancellationToken::new(),
                    repo_dir: PathBuf::from("/tmp"),
                })
            };

            let mut graph = DependencyGraph::new("test");
            graph.add_node(standalone_node(&make_issue(1)));
            graph.transition(1, NodeState::InFlight);

            let outcome = PipelineOutcome {
                run_id: "run-abc".to_string(),
                pr_number: 42,
                branch: Some("oven/issue-1-abc12345".to_string()),
                worktree_path: PathBuf::from("/tmp/wt"),
                target_dir: PathBuf::from("/tmp"),
            };

            handle_task_result(Ok((1, Ok(outcome))), &mut graph, &executor).await;

            assert_eq!(graph.node(1).unwrap().state, NodeState::AwaitingMerge);
            assert_eq!(graph.node(1).unwrap().pr_number, Some(42));
            assert_eq!(graph.node(1).unwrap().run_id.as_deref(), Some("run-abc"));
        });
    }
704
    /// A failed pipeline marks its node `Failed` and transitively fails
    /// every dependent node (here: issue 2 depends on issue 1).
    #[test]
    fn handle_task_failure_propagates_to_dependents() {
        let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
        rt.block_on(async {
            let executor = {
                let runner = Arc::new(mock_runner_for_batch());
                let github =
                    Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
                let issues = make_github_provider(&github);
                let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));
                Arc::new(PipelineExecutor {
                    runner,
                    github,
                    issues,
                    db,
                    config: Config::default(),
                    cancel_token: CancellationToken::new(),
                    repo_dir: PathBuf::from("/tmp"),
                })
            };

            // Two-node plan: 2 depends on 1.
            let plan = PlannerGraphOutput {
                nodes: vec![
                    crate::agents::PlannedNode {
                        number: 1,
                        title: "Root".to_string(),
                        area: "a".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Full,
                        depends_on: vec![],
                        reasoning: String::new(),
                    },
                    crate::agents::PlannedNode {
                        number: 2,
                        title: "Dep".to_string(),
                        area: "b".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Full,
                        depends_on: vec![1],
                        reasoning: String::new(),
                    },
                ],
                total_issues: 2,
                parallel_capacity: 1,
            };
            let issues = vec![make_issue(1), make_issue(2)];
            let mut graph = DependencyGraph::from_planner_output("test", &plan, &issues);
            graph.transition(1, NodeState::InFlight);

            handle_task_result(
                Ok((1, Err(anyhow::anyhow!("pipeline failed")))),
                &mut graph,
                &executor,
            )
            .await;

            assert_eq!(graph.node(1).unwrap().state, NodeState::Failed);
            assert_eq!(graph.node(2).unwrap().state, NodeState::Failed);
        });
    }
767
768 #[test]
769 fn stale_node_removed_when_issue_disappears() {
770 let mut graph = DependencyGraph::new("test");
771 graph.add_node(standalone_node(&make_issue(1)));
772 graph.add_node(standalone_node(&make_issue(2)));
773 graph.add_node(standalone_node(&make_issue(3)));
774 graph.transition(2, NodeState::InFlight);
775
776 let ready_numbers: HashSet<u32> = HashSet::from([1, 2]);
778 clean_stale_nodes(&mut graph, &ready_numbers);
779
780 assert!(graph.contains(1)); assert!(graph.contains(2)); assert!(!graph.contains(3)); }
784
    /// `collect_ready_issues` returns each ready node exactly once, flips it
    /// to `InFlight`, and therefore yields nothing on a second call.
    #[test]
    fn collect_ready_issues_transitions_to_in_flight() {
        let mut graph = DependencyGraph::new("test");
        graph.add_node(standalone_node(&make_issue(1)));
        graph.add_node(standalone_node(&make_issue(2)));

        let spawnable = collect_ready_issues(&mut graph);
        assert_eq!(spawnable.len(), 2);

        assert_eq!(graph.node(1).unwrap().state, NodeState::InFlight);
        assert_eq!(graph.node(2).unwrap().state, NodeState::InFlight);

        // InFlight nodes are no longer ready, so a repeat call is empty.
        assert!(collect_ready_issues(&mut graph).is_empty());
    }
801
    /// When the planner agent returns output that is not a parseable plan,
    /// `plan_issues` yields `None` so callers fall back to all-parallel
    /// scheduling.
    #[tokio::test]
    async fn planner_failure_falls_back_to_all_parallel() {
        let mut mock = MockCommandRunner::new();
        mock.expect_run_gh().returning(|_, _| {
            Box::pin(async {
                Ok(CommandOutput { stdout: String::new(), stderr: String::new(), success: true })
            })
        });
        // The agent "succeeds" but produces prose instead of a JSON plan.
        mock.expect_run_claude().returning(|_, _, _, _, _| {
            Box::pin(async {
                Ok(AgentResult {
                    cost_usd: 0.5,
                    duration: Duration::from_secs(2),
                    turns: 1,
                    output: "I don't know how to plan".to_string(),
                    session_id: "sess-plan".to_string(),
                    success: true,
                })
            })
        });

        let runner = Arc::new(mock);
        let github = Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
        let issues_provider = make_github_provider(&github);
        let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));

        let executor = Arc::new(PipelineExecutor {
            runner,
            github,
            issues: issues_provider,
            db,
            config: Config::default(),
            cancel_token: CancellationToken::new(),
            repo_dir: PathBuf::from("/tmp"),
        });

        let issues = vec![PipelineIssue {
            number: 1,
            title: "Test".to_string(),
            body: "body".to_string(),
            source: IssueOrigin::Github,
            target_repo: None,
            author: None,
        }];

        let plan = executor.plan_issues(&issues, &[]).await;
        assert!(plan.is_none());
    }
851
    /// After `handle_task_result`, the graph state must be readable back from
    /// the shared in-memory DB under the same session id.
    #[test]
    fn graph_persisted_after_state_change() {
        let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
        rt.block_on(async {
            let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));
            let runner = Arc::new(mock_runner_for_batch());
            let github =
                Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
            let issues = make_github_provider(&github);
            let executor = Arc::new(PipelineExecutor {
                runner,
                github,
                issues,
                db: Arc::clone(&db),
                config: Config::default(),
                cancel_token: CancellationToken::new(),
                repo_dir: PathBuf::from("/tmp"),
            });

            let mut graph = DependencyGraph::new("persist-test");
            graph.add_node(standalone_node(&make_issue(1)));
            graph.transition(1, NodeState::InFlight);

            let outcome = PipelineOutcome {
                run_id: "run-1".to_string(),
                pr_number: 10,
                branch: Some("oven/issue-1-abc12345".to_string()),
                worktree_path: PathBuf::from("/tmp/wt"),
                target_dir: PathBuf::from("/tmp"),
            };
            handle_task_result(Ok((1, Ok(outcome))), &mut graph, &executor).await;

            // Reload from the DB and verify the transition and PR number
            // survived the round-trip.
            let loaded = DependencyGraph::from_db(&*db.lock().await, "persist-test").unwrap();
            assert_eq!(loaded.node(1).unwrap().state, NodeState::AwaitingMerge);
            assert_eq!(loaded.node(1).unwrap().pr_number, Some(10));
        });
    }
890
    /// Mock runner whose `gh` calls containing a `view` argument return
    /// `{"state": <state>}` (a PR state query), while all other `gh` and
    /// agent calls succeed with empty output.
    fn mock_runner_with_pr_state(state: &'static str) -> MockCommandRunner {
        let mut mock = MockCommandRunner::new();
        mock.expect_run_gh().returning(move |args, _| {
            // Clone the args so the async block can inspect them after return.
            let args = args.to_vec();
            Box::pin(async move {
                if args.iter().any(|a| a == "view") {
                    Ok(CommandOutput {
                        stdout: format!(r#"{{"state":"{state}"}}"#),
                        stderr: String::new(),
                        success: true,
                    })
                } else {
                    Ok(CommandOutput {
                        stdout: String::new(),
                        stderr: String::new(),
                        success: true,
                    })
                }
            })
        });
        mock.expect_run_claude().returning(|_, _, _, _, _| {
            Box::pin(async {
                Ok(AgentResult {
                    cost_usd: 0.0,
                    duration: Duration::from_secs(0),
                    turns: 0,
                    output: String::new(),
                    session_id: String::new(),
                    success: true,
                })
            })
        });
        mock
    }
925
926 fn make_merge_poll_executor(state: &'static str) -> Arc<PipelineExecutor<MockCommandRunner>> {
927 let gh_mock = mock_runner_with_pr_state(state);
928 let github = Arc::new(GhClient::new(gh_mock, std::path::Path::new("/tmp")));
929 let issues = make_github_provider(&github);
930 let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));
931 let runner = Arc::new(mock_runner_with_pr_state(state));
932 Arc::new(PipelineExecutor {
933 runner,
934 github,
935 issues,
936 db,
937 config: Config::default(),
938 cancel_token: CancellationToken::new(),
939 repo_dir: PathBuf::from("/tmp"),
940 })
941 }
942
    /// An `AwaitingMerge` node whose PR reports MERGED transitions to
    /// `Merged` after one poll.
    #[test]
    fn merge_polling_transitions_merged_pr() {
        let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
        rt.block_on(async {
            let executor = make_merge_poll_executor("MERGED");

            let mut graph = DependencyGraph::new("merge-poll-test");
            let mut node = standalone_node(&make_issue(1));
            node.pr_number = Some(42);
            node.run_id = Some("run-1".to_string());
            graph.add_node(node);
            graph.transition(1, NodeState::AwaitingMerge);

            poll_awaiting_merges(&mut graph, &executor).await;

            assert_eq!(graph.node(1).unwrap().state, NodeState::Merged);
        });
    }
961
962 #[test]
963 fn merge_polling_transitions_node_without_issue() {
964 let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
965 rt.block_on(async {
966 let executor = make_merge_poll_executor("MERGED");
967
968 let mut graph = DependencyGraph::new("db-restore-test");
969 let mut node = GraphNode {
971 issue_number: 1,
972 title: "Issue #1".to_string(),
973 area: "test".to_string(),
974 predicted_files: vec![],
975 has_migration: false,
976 complexity: "full".to_string(),
977 state: NodeState::Pending,
978 pr_number: Some(42),
979 run_id: Some("run-1".to_string()),
980 issue: None,
981 target_repo: None,
982 };
983 node.state = NodeState::Pending;
984 graph.add_node(node);
985 graph.transition(1, NodeState::AwaitingMerge);
986
987 poll_awaiting_merges(&mut graph, &executor).await;
988
989 assert_eq!(graph.node(1).unwrap().state, NodeState::Merged);
991 });
992 }
993
    /// A PR closed without merging marks its node `Failed` and transitively
    /// fails dependents (issue 2 depends on issue 1).
    #[test]
    fn merge_polling_handles_closed_pr() {
        let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
        rt.block_on(async {
            let executor = make_merge_poll_executor("CLOSED");

            // Two-node plan: 2 depends on 1.
            let plan = PlannerGraphOutput {
                nodes: vec![
                    crate::agents::PlannedNode {
                        number: 1,
                        title: "Root".to_string(),
                        area: "a".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Full,
                        depends_on: vec![],
                        reasoning: String::new(),
                    },
                    crate::agents::PlannedNode {
                        number: 2,
                        title: "Dep".to_string(),
                        area: "b".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Full,
                        depends_on: vec![1],
                        reasoning: String::new(),
                    },
                ],
                total_issues: 2,
                parallel_capacity: 1,
            };
            let test_issues = vec![make_issue(1), make_issue(2)];
            let mut graph =
                DependencyGraph::from_planner_output("merge-poll-close", &plan, &test_issues);
            graph.transition(1, NodeState::AwaitingMerge);
            graph.set_pr_number(1, 42);
            graph.set_run_id(1, "run-1");

            poll_awaiting_merges(&mut graph, &executor).await;

            assert_eq!(graph.node(1).unwrap().state, NodeState::Failed);
            assert_eq!(graph.node(2).unwrap().state, NodeState::Failed);
        });
    }
1040
    /// Merging a dependency's PR unlocks its dependent: before the poll no
    /// issue is ready, afterwards issue 2 becomes ready.
    #[test]
    fn merge_unlocks_dependent() {
        let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
        rt.block_on(async {
            let executor = make_merge_poll_executor("MERGED");

            // Two-node plan: 2 depends on 1.
            let plan = PlannerGraphOutput {
                nodes: vec![
                    crate::agents::PlannedNode {
                        number: 1,
                        title: "Root".to_string(),
                        area: "a".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Full,
                        depends_on: vec![],
                        reasoning: String::new(),
                    },
                    crate::agents::PlannedNode {
                        number: 2,
                        title: "Dep".to_string(),
                        area: "b".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Full,
                        depends_on: vec![1],
                        reasoning: String::new(),
                    },
                ],
                total_issues: 2,
                parallel_capacity: 1,
            };
            let test_issues = vec![make_issue(1), make_issue(2)];
            let mut graph =
                DependencyGraph::from_planner_output("merge-unlock", &plan, &test_issues);
            graph.transition(1, NodeState::AwaitingMerge);
            graph.set_pr_number(1, 42);
            graph.set_run_id(1, "run-1");

            // While 1 awaits merge, nothing is ready.
            assert!(graph.ready_issues().is_empty());

            poll_awaiting_merges(&mut graph, &executor).await;

            assert_eq!(graph.node(1).unwrap().state, NodeState::Merged);
            assert_eq!(graph.ready_issues(), vec![2]);
        });
    }
1090
    /// End-to-end smoke test with two real temporary git repos, each wired to
    /// a bare `origin` remote: refreshing base branches for both must
    /// complete without panicking.
    #[tokio::test]
    async fn fetch_base_branches_handles_multiple_repos() {
        /// Create a work repo with one commit plus a bare clone registered as
        /// its `origin`; returns both tempdirs so they stay alive.
        async fn repo_with_remote() -> (tempfile::TempDir, tempfile::TempDir) {
            use tokio::process::Command as TokioCmd;

            let dir = tempfile::tempdir().unwrap();
            for (args, cwd) in [
                (vec!["init"], dir.path()),
                (vec!["config", "user.email", "test@test.com"], dir.path()),
                (vec!["config", "user.name", "Test"], dir.path()),
            ] {
                TokioCmd::new("git").args(&args).current_dir(cwd).output().await.unwrap();
            }
            tokio::fs::write(dir.path().join("README.md"), "init").await.unwrap();
            TokioCmd::new("git").args(["add", "."]).current_dir(dir.path()).output().await.unwrap();
            TokioCmd::new("git")
                .args(["commit", "-m", "init"])
                .current_dir(dir.path())
                .output()
                .await
                .unwrap();

            // Bare clone acts as the remote; register and fetch it.
            let remote = tempfile::tempdir().unwrap();
            TokioCmd::new("git")
                .args(["clone", "--bare", dir.path().to_string_lossy().as_ref(), "."])
                .current_dir(remote.path())
                .output()
                .await
                .unwrap();
            TokioCmd::new("git")
                .args(["remote", "add", "origin", remote.path().to_string_lossy().as_ref()])
                .current_dir(dir.path())
                .output()
                .await
                .unwrap();
            TokioCmd::new("git")
                .args(["fetch", "origin"])
                .current_dir(dir.path())
                .output()
                .await
                .unwrap();

            (dir, remote)
        }

        let (repo_a, _remote_a) = repo_with_remote().await;
        let (repo_b, _remote_b) = repo_with_remote().await;

        let mut dirs = HashSet::new();
        dirs.insert(repo_a.path().to_path_buf());
        dirs.insert(repo_b.path().to_path_buf());

        fetch_base_branches(&dirs).await;
    }
1148
1149 #[tokio::test]
1150 async fn fetch_base_branches_skips_invalid_repo_gracefully() {
1151 let mut dirs = HashSet::new();
1152 dirs.insert(PathBuf::from("/tmp/nonexistent-repo-12345"));
1153
1154 fetch_base_branches(&dirs).await;
1156 }
1157}