// oven_cli/pipeline/runner.rs

1use std::{
2    collections::HashSet,
3    path::{Path, PathBuf},
4    sync::Arc,
5    time::Duration,
6};
7
8use anyhow::Result;
9use tokio::{sync::Semaphore, task::JoinSet};
10use tokio_util::sync::CancellationToken;
11use tracing::{error, info, warn};
12
13use super::{
14    executor::{PipelineExecutor, PipelineOutcome},
15    graph::DependencyGraph,
16};
17use crate::{
18    agents::Complexity,
19    db::graph::NodeState,
20    git,
21    issues::PipelineIssue,
22    pipeline::{executor::generate_run_id, graph::GraphNode},
23    process::CommandRunner,
24};
25
/// Shared mutable state for the polling scheduler.
///
/// The `DependencyGraph` is the single source of truth for issue states,
/// dependency edges, and scheduling decisions.
struct SchedulerState {
    /// Dependency graph of issues; `ready_issues()` on it drives scheduling.
    graph: DependencyGraph,
    /// Caps the number of concurrently running pipelines (`max_parallel` permits).
    semaphore: Arc<Semaphore>,
    /// In-flight pipeline tasks, each resolving to (issue number, pipeline result).
    tasks: JoinSet<(u32, Result<PipelineOutcome>)>,
}
35
36/// Run the pipeline for a batch of issues using planner-driven sequencing.
37///
38/// Used for the explicit-IDs path (`oven on 42,43`). Calls the planner with no
39/// in-flight context, builds a `DependencyGraph`, then runs layers sequentially
40/// (issues within each layer run in parallel). Falls back to all-parallel if the
41/// planner fails.
42pub async fn run_batch<R: CommandRunner + 'static>(
43    executor: &Arc<PipelineExecutor<R>>,
44    issues: Vec<PipelineIssue>,
45    max_parallel: usize,
46    auto_merge: bool,
47) -> Result<()> {
48    let session_id = generate_run_id();
49    let mut graph = if let Some(plan) = executor.plan_issues(&issues, &[]).await {
50        info!(nodes = plan.nodes.len(), total = plan.total_issues, "planner produced a plan");
51        DependencyGraph::from_planner_output(&session_id, &plan, &issues)
52    } else {
53        warn!("planner failed, falling back to all-parallel execution");
54        let mut g = DependencyGraph::new(&session_id);
55        for issue in &issues {
56            g.add_node(standalone_node(issue));
57        }
58        g
59    };
60
61    save_graph(&graph, executor).await;
62
63    let semaphore = Arc::new(Semaphore::new(max_parallel));
64    let mut had_errors = false;
65
66    while !graph.all_terminal() {
67        let ready = graph.ready_issues();
68        if ready.is_empty() {
69            warn!("no ready issues but graph is not terminal, breaking to avoid infinite loop");
70            save_graph(&graph, executor).await;
71            break;
72        }
73
74        let mut tasks: JoinSet<(u32, Result<PipelineOutcome>)> = JoinSet::new();
75
76        for num in &ready {
77            graph.transition(*num, NodeState::InFlight);
78        }
79        save_graph(&graph, executor).await;
80
81        for num in ready {
82            let node = graph.node(num).expect("ready issue must exist in graph");
83            let issue = node.issue.clone().expect("batch issues have issue attached");
84            let complexity = node.complexity.parse::<Complexity>().ok();
85            let sem = Arc::clone(&semaphore);
86            let exec = Arc::clone(executor);
87
88            tasks.spawn(async move {
89                let permit = match sem.acquire_owned().await {
90                    Ok(p) => p,
91                    Err(e) => return (num, Err(anyhow::anyhow!("semaphore closed: {e}"))),
92                };
93                let result = exec.run_issue_pipeline(&issue, auto_merge, complexity).await;
94                let outcome = match result {
95                    Ok(outcome) => {
96                        if let Err(e) = exec.finalize_merge(&outcome, &issue).await {
97                            warn!(issue = num, error = %e, "finalize_merge failed");
98                        }
99                        Ok(outcome)
100                    }
101                    Err(e) => Err(e),
102                };
103                drop(permit);
104                (num, outcome)
105            });
106        }
107
108        let mut merged_target_dirs: HashSet<PathBuf> = HashSet::new();
109        while let Some(join_result) = tasks.join_next().await {
110            match join_result {
111                Ok((number, Ok(ref outcome))) => {
112                    info!(issue = number, "pipeline completed successfully");
113                    graph.set_pr_number(number, outcome.pr_number);
114                    graph.set_run_id(number, &outcome.run_id);
115                    graph.transition(number, NodeState::Merged);
116                    if auto_merge {
117                        merged_target_dirs.insert(outcome.target_dir.clone());
118                    }
119                }
120                Ok((number, Err(ref e))) => {
121                    error!(issue = number, error = %e, "pipeline failed for issue");
122                    graph.transition(number, NodeState::Failed);
123                    let blocked = graph.propagate_failure(number);
124                    for b in &blocked {
125                        warn!(issue = b, blocked_by = number, "transitively failed");
126                    }
127                    had_errors = true;
128                }
129                Err(e) => {
130                    error!(error = %e, "pipeline task panicked");
131                    had_errors = true;
132                }
133            }
134        }
135
136        // After merges land on the remote, update the local base branch so the
137        // next layer's worktrees fork from post-merge state.
138        if !merged_target_dirs.is_empty() && !graph.all_terminal() {
139            fetch_base_branches(&merged_target_dirs).await;
140        }
141
142        save_graph(&graph, executor).await;
143    }
144
145    if had_errors {
146        anyhow::bail!("one or more pipelines failed in batch");
147    }
148    Ok(())
149}
150
/// Poll for new issues and run them through the pipeline.
///
/// Unlike `run_batch`, this function continuously polls for new issues even while
/// existing pipelines are running. The `DependencyGraph` is the single source of
/// truth: `ready_issues()` drives scheduling, `transition()` replaces manual map
/// mutations, and `propagate_failure()` handles dependency cascades.
///
/// Runs until `cancel_token` fires; on cancellation it drains in-flight tasks
/// before returning, so already-started pipelines are not abandoned.
pub async fn polling_loop<R: CommandRunner + 'static>(
    executor: Arc<PipelineExecutor<R>>,
    auto_merge: bool,
    cancel_token: CancellationToken,
) -> Result<()> {
    let poll_interval = Duration::from_secs(executor.config.pipeline.poll_interval);
    let max_parallel = executor.config.pipeline.max_parallel as usize;
    let ready_label = executor.config.labels.ready.clone();

    // Try loading an existing graph session (crash recovery), or create a new one.
    let graph = load_or_create_graph(&executor).await;

    let mut sched = SchedulerState {
        graph,
        semaphore: Arc::new(Semaphore::new(max_parallel)),
        tasks: JoinSet::new(),
    };

    info!(poll_interval_secs = poll_interval.as_secs(), max_parallel, "continuous polling started");

    loop {
        tokio::select! {
            () = cancel_token.cancelled() => {
                info!("shutdown signal received, waiting for in-flight pipelines");
                drain_tasks(&mut sched, &executor).await;
                break;
            }
            // A fresh sleep future is created each loop iteration, so task
            // completions or cancellation reset the poll timer.
            () = tokio::time::sleep(poll_interval) => {
                poll_and_spawn(&executor, &ready_label, &mut sched, auto_merge).await;
            }
            // The guard prevents select from repeatedly polling join_next on
            // an empty JoinSet (which would resolve to None immediately).
            Some(result) = sched.tasks.join_next(), if !sched.tasks.is_empty() => {
                handle_task_result(result, &mut sched.graph, &executor).await;
            }
        }
    }

    Ok(())
}
195
196/// Load an existing active graph session from DB, or create a new empty one.
197async fn load_or_create_graph<R: CommandRunner>(
198    executor: &Arc<PipelineExecutor<R>>,
199) -> DependencyGraph {
200    let conn = executor.db.lock().await;
201    match crate::db::graph::get_active_session(&conn) {
202        Ok(Some(session_id)) => match DependencyGraph::from_db(&conn, &session_id) {
203            Ok(graph) => {
204                info!(session_id = %session_id, nodes = graph.node_count(), "resumed existing graph session");
205                return graph;
206            }
207            Err(e) => {
208                warn!(error = %e, "failed to load graph session, starting fresh");
209            }
210        },
211        Ok(None) => {}
212        Err(e) => {
213            warn!(error = %e, "failed to check for active graph session");
214        }
215    }
216    let session_id = generate_run_id();
217    info!(session_id = %session_id, "starting new graph session");
218    DependencyGraph::new(&session_id)
219}
220
221/// Drain remaining tasks on shutdown.
222async fn drain_tasks<R: CommandRunner>(
223    sched: &mut SchedulerState,
224    executor: &Arc<PipelineExecutor<R>>,
225) {
226    while let Some(result) = sched.tasks.join_next().await {
227        handle_task_result(result, &mut sched.graph, executor).await;
228    }
229}
230
/// Process a completed pipeline task: update graph state and persist.
///
/// Success moves the node to `AwaitingMerge` (the PR is open; a later
/// `poll_awaiting_merges` pass promotes it to `Merged`). Failure moves it to
/// `Failed` and cascades to dependents via `propagate_failure`.
async fn handle_task_result<R: CommandRunner>(
    result: Result<(u32, Result<PipelineOutcome>), tokio::task::JoinError>,
    graph: &mut DependencyGraph,
    executor: &Arc<PipelineExecutor<R>>,
) {
    match result {
        Ok((number, Ok(ref outcome))) => {
            info!(issue = number, "pipeline completed successfully");
            graph.set_pr_number(number, outcome.pr_number);
            graph.set_run_id(number, &outcome.run_id);
            graph.transition(number, NodeState::AwaitingMerge);
        }
        Ok((number, Err(ref e))) => {
            error!(issue = number, error = %e, "pipeline failed for issue");
            graph.transition(number, NodeState::Failed);
            // Dependents of a failed issue can never become ready; fail them now.
            let blocked = graph.propagate_failure(number);
            for b in &blocked {
                warn!(issue = b, blocked_by = number, "transitively failed");
            }
        }
        Err(e) => {
            // A JoinError carries no issue number, so the corresponding node
            // cannot be transitioned here. NOTE(review): in the polling loop
            // that node appears to stay InFlight indefinitely — confirm
            // whether a recovery path exists elsewhere.
            error!(error = %e, "pipeline task panicked");
            return; // nothing in the graph changed; skip the persist below
        }
    }
    save_graph(graph, executor).await;
}
259
/// Check `AwaitingMerge` nodes and transition them to `Merged` or `Failed`
/// based on the PR's actual state on GitHub.
///
/// For each awaiting node: resolve the repo hosting the PR, query its state,
/// then finalize (merged), cascade-fail (closed), or keep waiting (open).
/// Per-node errors are logged and the node skipped, so one bad node cannot
/// stall the rest.
async fn poll_awaiting_merges<R: CommandRunner + 'static>(
    graph: &mut DependencyGraph,
    executor: &Arc<PipelineExecutor<R>>,
) {
    let awaiting = graph.awaiting_merge();
    if awaiting.is_empty() {
        return;
    }

    // Repos that received merges; their base branches are refreshed below.
    let mut merged_target_dirs: HashSet<PathBuf> = HashSet::new();
    for num in awaiting {
        let Some(node) = graph.node(num) else { continue };
        let Some(pr_number) = node.pr_number else {
            warn!(issue = num, "AwaitingMerge node has no PR number, skipping");
            continue;
        };
        let run_id = node.run_id.clone().unwrap_or_default();
        let issue = node.issue.clone();
        let target_repo = node.target_repo.clone();

        // Resolve which repo directory to query for PR state.
        // Multi-repo PRs live in the target repo, not the god repo.
        let pr_repo_dir = match executor.resolve_target_dir(target_repo.as_ref()) {
            Ok((dir, _)) => dir,
            Err(e) => {
                warn!(issue = num, error = %e, "failed to resolve target dir for PR state check");
                continue;
            }
        };

        let pr_state = match executor.github.get_pr_state_in(pr_number, &pr_repo_dir).await {
            Ok(s) => s,
            Err(e) => {
                warn!(issue = num, pr = pr_number, error = %e, "failed to check PR state");
                continue;
            }
        };

        match pr_state {
            crate::github::PrState::Merged => {
                info!(issue = num, pr = pr_number, "PR merged, finalizing");
                if let Some(ref issue) = issue {
                    // Rebuild the outcome from persisted data so finalize_merge
                    // can run its post-merge cleanup.
                    match executor.reconstruct_outcome(issue, &run_id, pr_number).await {
                        Ok(outcome) => {
                            if let Err(e) = executor.finalize_merge(&outcome, issue).await {
                                warn!(issue = num, error = %e, "finalize_merge after poll failed");
                            }
                        }
                        Err(e) => {
                            warn!(issue = num, error = %e, "failed to reconstruct outcome");
                        }
                    }
                } else {
                    warn!(
                        issue = num,
                        pr = pr_number,
                        "node restored from DB has no PipelineIssue, \
                         skipping finalization (labels and worktree may need manual cleanup)"
                    );
                }
                // Marked Merged even if finalization failed: the PR itself landed.
                graph.transition(num, NodeState::Merged);
                merged_target_dirs.insert(pr_repo_dir);
            }
            crate::github::PrState::Closed => {
                warn!(issue = num, pr = pr_number, "PR closed without merge, marking failed");
                graph.transition(num, NodeState::Failed);
                let blocked = graph.propagate_failure(num);
                for b in &blocked {
                    warn!(issue = b, blocked_by = num, "transitively failed (PR closed)");
                }
            }
            crate::github::PrState::Open => {
                // Still open, keep waiting
            }
        }
    }

    // After merges land, update each affected repo's base branch so the next
    // layer's worktrees fork from post-merge state.
    if !merged_target_dirs.is_empty() {
        fetch_base_branches(&merged_target_dirs).await;
    }

    save_graph(graph, executor).await;
}
347
/// Single poll cycle: plan new issues, promote ready ones, and spawn tasks.
///
/// Order matters: merged PRs are reconciled first (which may unblock
/// dependents), then stale pending nodes are dropped, new issues are planned
/// into the graph, and finally everything ready is transitioned and spawned.
async fn poll_and_spawn<R: CommandRunner + 'static>(
    executor: &Arc<PipelineExecutor<R>>,
    ready_label: &str,
    sched: &mut SchedulerState,
    auto_merge: bool,
) {
    // Check if any AwaitingMerge PRs have been merged
    poll_awaiting_merges(&mut sched.graph, executor).await;

    let ready_issues = match executor.issues.get_ready_issues(ready_label).await {
        Ok(i) => i,
        Err(e) => {
            // Provider hiccup: skip this cycle, the next poll retries.
            error!(error = %e, "failed to fetch issues");
            return;
        }
    };

    let ready_numbers: HashSet<u32> = ready_issues.iter().map(|i| i.number).collect();

    // Clean stale nodes: remove Pending nodes whose issues disappeared from the ready list
    clean_stale_nodes(&mut sched.graph, &ready_numbers);

    // Filter to genuinely new issues not already in the graph
    let new_issues: Vec<_> =
        ready_issues.into_iter().filter(|i| !sched.graph.contains(i.number)).collect();

    // Plan and merge new issues into the graph
    if !new_issues.is_empty() {
        info!(count = new_issues.len(), "found new issues to evaluate");
        // Give the planner the current graph so it can wire dependencies onto
        // issues that are already in flight.
        let graph_context = sched.graph.to_graph_context();

        if let Some(plan) = executor.plan_issues(&new_issues, &graph_context).await {
            info!(nodes = plan.nodes.len(), total = plan.total_issues, "planner produced a plan");
            sched.graph.merge_planner_output(&plan, &new_issues);
        } else {
            warn!("planner failed, adding all new issues as independent nodes");
            add_independent_nodes(&mut sched.graph, &new_issues);
        }

        save_graph(&sched.graph, executor).await;
    }

    // Spawn ready issues
    let to_spawn = collect_ready_issues(&mut sched.graph);
    if to_spawn.is_empty() {
        if new_issues.is_empty() {
            info!("no actionable issues, waiting");
        }
        return;
    }

    // Persist the InFlight transitions made by collect_ready_issues before
    // handing work to the tasks.
    save_graph(&sched.graph, executor).await;
    spawn_issues(to_spawn, executor, sched, auto_merge);
}
403
404/// Remove graph nodes that are still `Pending` but no longer in the provider's ready list.
405fn clean_stale_nodes(graph: &mut DependencyGraph, ready_numbers: &HashSet<u32>) {
406    let stale: Vec<u32> = graph
407        .all_issues()
408        .into_iter()
409        .filter(|num| {
410            !ready_numbers.contains(num)
411                && graph.node(*num).is_some_and(|n| n.state == NodeState::Pending)
412        })
413        .collect();
414    if !stale.is_empty() {
415        info!(count = stale.len(), "removing stale pending nodes");
416        for num in stale {
417            graph.remove_node(num);
418        }
419    }
420}
421
422/// Add issues to the graph as independent nodes (no edges) when the planner fails.
423fn add_independent_nodes(graph: &mut DependencyGraph, issues: &[PipelineIssue]) {
424    for issue in issues {
425        if !graph.contains(issue.number) {
426            graph.add_node(standalone_node(issue));
427        }
428    }
429}
430
431/// Find ready issues in the graph, transition them to `InFlight`, return spawn data.
432fn collect_ready_issues(graph: &mut DependencyGraph) -> Vec<(u32, PipelineIssue, Complexity)> {
433    let ready = graph.ready_issues();
434    let mut to_spawn = Vec::new();
435
436    for num in ready {
437        let Some(node) = graph.node(num) else { continue };
438        let Some(issue) = node.issue.clone() else {
439            warn!(issue = num, "ready node has no PipelineIssue attached, skipping");
440            continue;
441        };
442        let complexity = node.complexity.parse::<Complexity>().unwrap_or(Complexity::Full);
443        graph.transition(num, NodeState::InFlight);
444        to_spawn.push((num, issue, complexity));
445    }
446
447    to_spawn
448}
449
450/// Spawn pipeline tasks for a set of issues.
451fn spawn_issues<R: CommandRunner + 'static>(
452    to_spawn: Vec<(u32, PipelineIssue, Complexity)>,
453    executor: &Arc<PipelineExecutor<R>>,
454    sched: &mut SchedulerState,
455    auto_merge: bool,
456) {
457    for (number, issue, complexity) in to_spawn {
458        let sem = Arc::clone(&sched.semaphore);
459        let exec = Arc::clone(executor);
460
461        sched.tasks.spawn(async move {
462            let permit = match sem.acquire_owned().await {
463                Ok(p) => p,
464                Err(e) => return (number, Err(anyhow::anyhow!("semaphore closed: {e}"))),
465            };
466            let outcome = exec.run_issue_pipeline(&issue, auto_merge, Some(complexity)).await;
467            drop(permit);
468            (number, outcome)
469        });
470    }
471}
472
473/// Create a `GraphNode` for an issue with no planner metadata.
474fn standalone_node(issue: &PipelineIssue) -> GraphNode {
475    GraphNode {
476        issue_number: issue.number,
477        title: issue.title.clone(),
478        area: String::new(),
479        predicted_files: Vec::new(),
480        has_migration: false,
481        complexity: Complexity::Full.to_string(),
482        state: NodeState::Pending,
483        pr_number: None,
484        run_id: None,
485        target_repo: issue.target_repo.clone(),
486        issue: Some(issue.clone()),
487    }
488}
489
490/// Update the base branch in every repo where merges landed.
491///
492/// Without this, new worktrees created for the next layer would fork from a
493/// stale local ref, causing the implementer to work against pre-merge code.
494/// In multi-repo mode, merges may land in different target repos, so we fetch
495/// the base branch in each distinct repo directory.
496async fn fetch_base_branches(repo_dirs: &HashSet<PathBuf>) {
497    for repo_dir in repo_dirs {
498        fetch_base_branch_in(repo_dir).await;
499    }
500}
501
502/// Fetch the base branch for a single repo directory.
503async fn fetch_base_branch_in(repo_dir: &Path) {
504    match git::default_branch(repo_dir).await {
505        Ok(branch) => {
506            if let Err(e) = git::fetch_branch(repo_dir, &branch).await {
507                warn!(
508                    repo = %repo_dir.display(), error = %e,
509                    "failed to fetch base branch after merge"
510                );
511            } else {
512                if let Err(e) = git::advance_local_branch(repo_dir, &branch).await {
513                    warn!(
514                        repo = %repo_dir.display(), branch = %branch, error = %e,
515                        "failed to advance local branch after fetch"
516                    );
517                }
518                info!(
519                    repo = %repo_dir.display(), branch = %branch,
520                    "updated base branch after merge"
521                );
522            }
523        }
524        Err(e) => {
525            warn!(
526                repo = %repo_dir.display(), error = %e,
527                "failed to detect base branch for post-merge fetch"
528            );
529        }
530    }
531}
532
533/// Persist graph state to the database.
534async fn save_graph<R: CommandRunner>(
535    graph: &DependencyGraph,
536    executor: &Arc<PipelineExecutor<R>>,
537) {
538    let conn = executor.db.lock().await;
539    if let Err(e) = graph.save_to_db(&conn) {
540        warn!(error = %e, "failed to persist dependency graph");
541    }
542}
543
544#[cfg(test)]
545mod tests {
546    use std::path::PathBuf;
547
548    use tokio::sync::Mutex;
549
550    use super::*;
551    use crate::{
552        agents::PlannerGraphOutput,
553        config::Config,
554        github::GhClient,
555        issues::{IssueOrigin, IssueProvider, github::GithubIssueProvider},
556        process::{AgentResult, CommandOutput, MockCommandRunner},
557    };
558
    /// Build a `MockCommandRunner` whose `gh` and `claude` invocations always
    /// succeed: `gh` returns a canned PR URL, `claude` a clean-review JSON payload.
    fn mock_runner_for_batch() -> MockCommandRunner {
        let mut mock = MockCommandRunner::new();
        mock.expect_run_gh().returning(|_, _| {
            Box::pin(async {
                Ok(CommandOutput {
                    stdout: "https://github.com/user/repo/pull/1\n".to_string(),
                    stderr: String::new(),
                    success: true,
                })
            })
        });
        mock.expect_run_claude().returning(|_, _, _, _, _| {
            Box::pin(async {
                Ok(AgentResult {
                    cost_usd: 1.0,
                    duration: Duration::from_secs(5),
                    turns: 3,
                    output: r#"{"findings":[],"summary":"clean"}"#.to_string(),
                    session_id: "sess-1".to_string(),
                    success: true,
                })
            })
        });
        mock
    }
584
    /// Wrap a mock `GhClient` in the `IssueProvider` trait object the executor expects.
    fn make_github_provider(gh: &Arc<GhClient<MockCommandRunner>>) -> Arc<dyn IssueProvider> {
        Arc::new(GithubIssueProvider::new(Arc::clone(gh), "target_repo"))
    }
588
589    fn make_issue(number: u32) -> PipelineIssue {
590        PipelineIssue {
591            number,
592            title: format!("Issue #{number}"),
593            body: String::new(),
594            source: IssueOrigin::Github,
595            target_repo: None,
596            author: None,
597        }
598    }
599
    /// Cancelling the token before the first poll fires should make
    /// `polling_loop` return `Ok` without doing any work.
    #[tokio::test]
    async fn cancellation_stops_polling() {
        let cancel = CancellationToken::new();
        let runner = Arc::new(mock_runner_for_batch());
        let github = Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
        let issues = make_github_provider(&github);
        let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));

        let mut config = Config::default();
        config.pipeline.poll_interval = 3600; // very long so we don't actually poll

        let executor = Arc::new(PipelineExecutor {
            runner,
            github,
            issues,
            db,
            config,
            cancel_token: cancel.clone(),
            repo_dir: PathBuf::from("/tmp"),
        });

        let cancel_clone = cancel.clone();
        let handle = tokio::spawn(async move { polling_loop(executor, false, cancel_clone).await });

        // Cancel immediately
        cancel.cancel();

        let result = handle.await.unwrap();
        assert!(result.is_ok());
    }
630
    /// Shutdown must be bounded: after cancellation the polling loop exits
    /// well within the 5-second timeout (no in-flight tasks to drain here).
    #[tokio::test]
    async fn cancellation_exits_within_timeout() {
        let cancel = CancellationToken::new();
        let runner = Arc::new(mock_runner_for_batch());
        let github = Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
        let issues = make_github_provider(&github);
        let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));

        let mut config = Config::default();
        config.pipeline.poll_interval = 3600;

        let executor = Arc::new(PipelineExecutor {
            runner,
            github,
            issues,
            db,
            config,
            cancel_token: cancel.clone(),
            repo_dir: PathBuf::from("/tmp"),
        });

        let cancel_clone = cancel.clone();
        let handle = tokio::spawn(async move { polling_loop(executor, false, cancel_clone).await });

        cancel.cancel();

        let result = tokio::time::timeout(Duration::from_secs(5), handle)
            .await
            .expect("polling loop should exit within timeout")
            .unwrap();
        assert!(result.is_ok());
    }
663
    /// A successful pipeline result should move the node to `AwaitingMerge`
    /// and record the PR number and run id on it.
    #[test]
    fn handle_task_success_transitions_to_awaiting_merge() {
        let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
        rt.block_on(async {
            // Executor is only needed for graph persistence inside handle_task_result.
            let executor = {
                let runner = Arc::new(mock_runner_for_batch());
                let github =
                    Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
                let issues = make_github_provider(&github);
                let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));
                Arc::new(PipelineExecutor {
                    runner,
                    github,
                    issues,
                    db,
                    config: Config::default(),
                    cancel_token: CancellationToken::new(),
                    repo_dir: PathBuf::from("/tmp"),
                })
            };

            let mut graph = DependencyGraph::new("test");
            graph.add_node(standalone_node(&make_issue(1)));
            graph.transition(1, NodeState::InFlight);

            let outcome = PipelineOutcome {
                run_id: "run-abc".to_string(),
                pr_number: 42,
                branch: Some("oven/issue-1-abc12345".to_string()),
                worktree_path: PathBuf::from("/tmp/wt"),
                target_dir: PathBuf::from("/tmp"),
            };

            handle_task_result(Ok((1, Ok(outcome))), &mut graph, &executor).await;

            assert_eq!(graph.node(1).unwrap().state, NodeState::AwaitingMerge);
            assert_eq!(graph.node(1).unwrap().pr_number, Some(42));
            assert_eq!(graph.node(1).unwrap().run_id.as_deref(), Some("run-abc"));
        });
    }
704
    /// A failed pipeline result should mark the node `Failed` and cascade the
    /// failure to nodes that depend on it (issue 2 depends on issue 1).
    #[test]
    fn handle_task_failure_propagates_to_dependents() {
        let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
        rt.block_on(async {
            // Executor is only needed for graph persistence inside handle_task_result.
            let executor = {
                let runner = Arc::new(mock_runner_for_batch());
                let github =
                    Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
                let issues = make_github_provider(&github);
                let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));
                Arc::new(PipelineExecutor {
                    runner,
                    github,
                    issues,
                    db,
                    config: Config::default(),
                    cancel_token: CancellationToken::new(),
                    repo_dir: PathBuf::from("/tmp"),
                })
            };

            // Two-node plan: node 2 depends on node 1.
            let plan = PlannerGraphOutput {
                nodes: vec![
                    crate::agents::PlannedNode {
                        number: 1,
                        title: "Root".to_string(),
                        area: "a".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Full,
                        depends_on: vec![],
                        reasoning: String::new(),
                    },
                    crate::agents::PlannedNode {
                        number: 2,
                        title: "Dep".to_string(),
                        area: "b".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Full,
                        depends_on: vec![1],
                        reasoning: String::new(),
                    },
                ],
                total_issues: 2,
                parallel_capacity: 1,
            };
            let issues = vec![make_issue(1), make_issue(2)];
            let mut graph = DependencyGraph::from_planner_output("test", &plan, &issues);
            graph.transition(1, NodeState::InFlight);

            handle_task_result(
                Ok((1, Err(anyhow::anyhow!("pipeline failed")))),
                &mut graph,
                &executor,
            )
            .await;

            assert_eq!(graph.node(1).unwrap().state, NodeState::Failed);
            assert_eq!(graph.node(2).unwrap().state, NodeState::Failed);
        });
    }
767
    /// Only `Pending` nodes that vanished from the provider's ready list are
    /// removed; nodes in other states are kept even when absent from it.
    #[test]
    fn stale_node_removed_when_issue_disappears() {
        let mut graph = DependencyGraph::new("test");
        graph.add_node(standalone_node(&make_issue(1)));
        graph.add_node(standalone_node(&make_issue(2)));
        graph.add_node(standalone_node(&make_issue(3)));
        graph.transition(2, NodeState::InFlight);

        // Only issue 1 and 2 remain in provider; 3 disappeared
        let ready_numbers: HashSet<u32> = HashSet::from([1, 2]);
        clean_stale_nodes(&mut graph, &ready_numbers);

        assert!(graph.contains(1)); // still Pending + in ready list
        assert!(graph.contains(2)); // InFlight, not removed even if not in ready
        assert!(!graph.contains(3)); // Pending + not in ready = removed
    }
784
    /// `collect_ready_issues` must transition everything it returns to
    /// `InFlight`, so a second call yields nothing.
    #[test]
    fn collect_ready_issues_transitions_to_in_flight() {
        let mut graph = DependencyGraph::new("test");
        graph.add_node(standalone_node(&make_issue(1)));
        graph.add_node(standalone_node(&make_issue(2)));

        let spawnable = collect_ready_issues(&mut graph);
        assert_eq!(spawnable.len(), 2);

        // Both should now be InFlight
        assert_eq!(graph.node(1).unwrap().state, NodeState::InFlight);
        assert_eq!(graph.node(2).unwrap().state, NodeState::InFlight);

        // No more ready issues
        assert!(collect_ready_issues(&mut graph).is_empty());
    }
801
802    #[tokio::test]
803    async fn planner_failure_falls_back_to_all_parallel() {
804        let mut mock = MockCommandRunner::new();
805        mock.expect_run_gh().returning(|_, _| {
806            Box::pin(async {
807                Ok(CommandOutput { stdout: String::new(), stderr: String::new(), success: true })
808            })
809        });
810        mock.expect_run_claude().returning(|_, _, _, _, _| {
811            Box::pin(async {
812                Ok(AgentResult {
813                    cost_usd: 0.5,
814                    duration: Duration::from_secs(2),
815                    turns: 1,
816                    output: "I don't know how to plan".to_string(),
817                    session_id: "sess-plan".to_string(),
818                    success: true,
819                })
820            })
821        });
822
823        let runner = Arc::new(mock);
824        let github = Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
825        let issues_provider = make_github_provider(&github);
826        let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));
827
828        let executor = Arc::new(PipelineExecutor {
829            runner,
830            github,
831            issues: issues_provider,
832            db,
833            config: Config::default(),
834            cancel_token: CancellationToken::new(),
835            repo_dir: PathBuf::from("/tmp"),
836        });
837
838        let issues = vec![PipelineIssue {
839            number: 1,
840            title: "Test".to_string(),
841            body: "body".to_string(),
842            source: IssueOrigin::Github,
843            target_repo: None,
844            author: None,
845        }];
846
847        // plan_issues returns None for unparseable output
848        let plan = executor.plan_issues(&issues, &[]).await;
849        assert!(plan.is_none());
850    }
851
852    #[test]
853    fn graph_persisted_after_state_change() {
854        let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
855        rt.block_on(async {
856            let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));
857            let runner = Arc::new(mock_runner_for_batch());
858            let github =
859                Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
860            let issues = make_github_provider(&github);
861            let executor = Arc::new(PipelineExecutor {
862                runner,
863                github,
864                issues,
865                db: Arc::clone(&db),
866                config: Config::default(),
867                cancel_token: CancellationToken::new(),
868                repo_dir: PathBuf::from("/tmp"),
869            });
870
871            let mut graph = DependencyGraph::new("persist-test");
872            graph.add_node(standalone_node(&make_issue(1)));
873            graph.transition(1, NodeState::InFlight);
874
875            let outcome = PipelineOutcome {
876                run_id: "run-1".to_string(),
877                pr_number: 10,
878                branch: Some("oven/issue-1-abc12345".to_string()),
879                worktree_path: PathBuf::from("/tmp/wt"),
880                target_dir: PathBuf::from("/tmp"),
881            };
882            handle_task_result(Ok((1, Ok(outcome))), &mut graph, &executor).await;
883
884            // Load from DB and verify
885            let loaded = DependencyGraph::from_db(&*db.lock().await, "persist-test").unwrap();
886            assert_eq!(loaded.node(1).unwrap().state, NodeState::AwaitingMerge);
887            assert_eq!(loaded.node(1).unwrap().pr_number, Some(10));
888        });
889    }
890
891    fn mock_runner_with_pr_state(state: &'static str) -> MockCommandRunner {
892        let mut mock = MockCommandRunner::new();
893        mock.expect_run_gh().returning(move |args, _| {
894            let args = args.to_vec();
895            Box::pin(async move {
896                if args.iter().any(|a| a == "view") {
897                    Ok(CommandOutput {
898                        stdout: format!(r#"{{"state":"{state}"}}"#),
899                        stderr: String::new(),
900                        success: true,
901                    })
902                } else {
903                    Ok(CommandOutput {
904                        stdout: String::new(),
905                        stderr: String::new(),
906                        success: true,
907                    })
908                }
909            })
910        });
911        mock.expect_run_claude().returning(|_, _, _, _, _| {
912            Box::pin(async {
913                Ok(AgentResult {
914                    cost_usd: 0.0,
915                    duration: Duration::from_secs(0),
916                    turns: 0,
917                    output: String::new(),
918                    session_id: String::new(),
919                    success: true,
920                })
921            })
922        });
923        mock
924    }
925
926    fn make_merge_poll_executor(state: &'static str) -> Arc<PipelineExecutor<MockCommandRunner>> {
927        let gh_mock = mock_runner_with_pr_state(state);
928        let github = Arc::new(GhClient::new(gh_mock, std::path::Path::new("/tmp")));
929        let issues = make_github_provider(&github);
930        let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));
931        let runner = Arc::new(mock_runner_with_pr_state(state));
932        Arc::new(PipelineExecutor {
933            runner,
934            github,
935            issues,
936            db,
937            config: Config::default(),
938            cancel_token: CancellationToken::new(),
939            repo_dir: PathBuf::from("/tmp"),
940        })
941    }
942
943    #[test]
944    fn merge_polling_transitions_merged_pr() {
945        let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
946        rt.block_on(async {
947            let executor = make_merge_poll_executor("MERGED");
948
949            let mut graph = DependencyGraph::new("merge-poll-test");
950            let mut node = standalone_node(&make_issue(1));
951            node.pr_number = Some(42);
952            node.run_id = Some("run-1".to_string());
953            graph.add_node(node);
954            graph.transition(1, NodeState::AwaitingMerge);
955
956            poll_awaiting_merges(&mut graph, &executor).await;
957
958            assert_eq!(graph.node(1).unwrap().state, NodeState::Merged);
959        });
960    }
961
962    #[test]
963    fn merge_polling_transitions_node_without_issue() {
964        let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
965        rt.block_on(async {
966            let executor = make_merge_poll_executor("MERGED");
967
968            let mut graph = DependencyGraph::new("db-restore-test");
969            // Simulate a node restored from DB (no PipelineIssue attached)
970            let mut node = GraphNode {
971                issue_number: 1,
972                title: "Issue #1".to_string(),
973                area: "test".to_string(),
974                predicted_files: vec![],
975                has_migration: false,
976                complexity: "full".to_string(),
977                state: NodeState::Pending,
978                pr_number: Some(42),
979                run_id: Some("run-1".to_string()),
980                issue: None,
981                target_repo: None,
982            };
983            node.state = NodeState::Pending;
984            graph.add_node(node);
985            graph.transition(1, NodeState::AwaitingMerge);
986
987            poll_awaiting_merges(&mut graph, &executor).await;
988
989            // Should still transition to Merged even without issue data
990            assert_eq!(graph.node(1).unwrap().state, NodeState::Merged);
991        });
992    }
993
994    #[test]
995    fn merge_polling_handles_closed_pr() {
996        let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
997        rt.block_on(async {
998            let executor = make_merge_poll_executor("CLOSED");
999
1000            let plan = PlannerGraphOutput {
1001                nodes: vec![
1002                    crate::agents::PlannedNode {
1003                        number: 1,
1004                        title: "Root".to_string(),
1005                        area: "a".to_string(),
1006                        predicted_files: vec![],
1007                        has_migration: false,
1008                        complexity: Complexity::Full,
1009                        depends_on: vec![],
1010                        reasoning: String::new(),
1011                    },
1012                    crate::agents::PlannedNode {
1013                        number: 2,
1014                        title: "Dep".to_string(),
1015                        area: "b".to_string(),
1016                        predicted_files: vec![],
1017                        has_migration: false,
1018                        complexity: Complexity::Full,
1019                        depends_on: vec![1],
1020                        reasoning: String::new(),
1021                    },
1022                ],
1023                total_issues: 2,
1024                parallel_capacity: 1,
1025            };
1026            let test_issues = vec![make_issue(1), make_issue(2)];
1027            let mut graph =
1028                DependencyGraph::from_planner_output("merge-poll-close", &plan, &test_issues);
1029            graph.transition(1, NodeState::AwaitingMerge);
1030            graph.set_pr_number(1, 42);
1031            graph.set_run_id(1, "run-1");
1032
1033            poll_awaiting_merges(&mut graph, &executor).await;
1034
1035            assert_eq!(graph.node(1).unwrap().state, NodeState::Failed);
1036            // Dependent should be transitively failed
1037            assert_eq!(graph.node(2).unwrap().state, NodeState::Failed);
1038        });
1039    }
1040
1041    #[test]
1042    fn merge_unlocks_dependent() {
1043        let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
1044        rt.block_on(async {
1045            let executor = make_merge_poll_executor("MERGED");
1046
1047            let plan = PlannerGraphOutput {
1048                nodes: vec![
1049                    crate::agents::PlannedNode {
1050                        number: 1,
1051                        title: "Root".to_string(),
1052                        area: "a".to_string(),
1053                        predicted_files: vec![],
1054                        has_migration: false,
1055                        complexity: Complexity::Full,
1056                        depends_on: vec![],
1057                        reasoning: String::new(),
1058                    },
1059                    crate::agents::PlannedNode {
1060                        number: 2,
1061                        title: "Dep".to_string(),
1062                        area: "b".to_string(),
1063                        predicted_files: vec![],
1064                        has_migration: false,
1065                        complexity: Complexity::Full,
1066                        depends_on: vec![1],
1067                        reasoning: String::new(),
1068                    },
1069                ],
1070                total_issues: 2,
1071                parallel_capacity: 1,
1072            };
1073            let test_issues = vec![make_issue(1), make_issue(2)];
1074            let mut graph =
1075                DependencyGraph::from_planner_output("merge-unlock", &plan, &test_issues);
1076            graph.transition(1, NodeState::AwaitingMerge);
1077            graph.set_pr_number(1, 42);
1078            graph.set_run_id(1, "run-1");
1079
1080            // Before polling: node 2 is not ready (dep 1 is AwaitingMerge)
1081            assert!(graph.ready_issues().is_empty());
1082
1083            poll_awaiting_merges(&mut graph, &executor).await;
1084
1085            // After polling: node 1 merged, node 2 should now be ready
1086            assert_eq!(graph.node(1).unwrap().state, NodeState::Merged);
1087            assert_eq!(graph.ready_issues(), vec![2]);
1088        });
1089    }
1090
    #[tokio::test]
    async fn fetch_base_branches_handles_multiple_repos() {
        // Create two independent repos with remotes so `default_branch` and
        // `fetch_branch` work.
        //
        // Returns (working repo, bare "origin" repo); both TempDirs must stay
        // alive for the duration of the test or the paths are deleted.
        async fn repo_with_remote() -> (tempfile::TempDir, tempfile::TempDir) {
            use tokio::process::Command as TokioCmd;

            // Init the working repo with an identity so `git commit` succeeds
            // without global config.
            let dir = tempfile::tempdir().unwrap();
            for (args, cwd) in [
                (vec!["init"], dir.path()),
                (vec!["config", "user.email", "test@test.com"], dir.path()),
                (vec!["config", "user.name", "Test"], dir.path()),
            ] {
                TokioCmd::new("git").args(&args).current_dir(cwd).output().await.unwrap();
            }
            // One commit so the repo has a default branch to fetch.
            tokio::fs::write(dir.path().join("README.md"), "init").await.unwrap();
            TokioCmd::new("git").args(["add", "."]).current_dir(dir.path()).output().await.unwrap();
            TokioCmd::new("git")
                .args(["commit", "-m", "init"])
                .current_dir(dir.path())
                .output()
                .await
                .unwrap();

            // Bare clone serves as the `origin` remote.
            let remote = tempfile::tempdir().unwrap();
            TokioCmd::new("git")
                .args(["clone", "--bare", dir.path().to_string_lossy().as_ref(), "."])
                .current_dir(remote.path())
                .output()
                .await
                .unwrap();
            TokioCmd::new("git")
                .args(["remote", "add", "origin", remote.path().to_string_lossy().as_ref()])
                .current_dir(dir.path())
                .output()
                .await
                .unwrap();
            // Prime the origin/* tracking refs so default-branch detection works.
            TokioCmd::new("git")
                .args(["fetch", "origin"])
                .current_dir(dir.path())
                .output()
                .await
                .unwrap();

            (dir, remote)
        }

        // Remote TempDirs are bound (underscore-prefixed) to keep them alive.
        let (repo_a, _remote_a) = repo_with_remote().await;
        let (repo_b, _remote_b) = repo_with_remote().await;

        let mut dirs = HashSet::new();
        dirs.insert(repo_a.path().to_path_buf());
        dirs.insert(repo_b.path().to_path_buf());

        // Should not panic or error -- just fetches both repos
        fetch_base_branches(&dirs).await;
    }
1148
1149    #[tokio::test]
1150    async fn fetch_base_branches_skips_invalid_repo_gracefully() {
1151        let mut dirs = HashSet::new();
1152        dirs.insert(PathBuf::from("/tmp/nonexistent-repo-12345"));
1153
1154        // Should log a warning but not panic
1155        fetch_base_branches(&dirs).await;
1156    }
1157}