//! Pipeline runner: planner-driven batch execution and the continuous polling loop.
//!
//! File: oven_cli/pipeline/runner.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::Arc,
4    time::Duration,
5};
6
7use anyhow::Result;
8use tokio::{
9    sync::{Mutex, Semaphore},
10    task::JoinSet,
11};
12use tokio_util::sync::CancellationToken;
13use tracing::{error, info, warn};
14
15use super::executor::PipelineExecutor;
16use crate::{
17    agents::{InFlightIssue, PlannerOutput},
18    issues::PipelineIssue,
19    process::CommandRunner,
20};
21
/// An issue the planner has evaluated and placed in a later batch.
///
/// Stored across poll cycles so we skip re-invoking the planner for issues whose
/// dependency chain is already known. The `awaiting` set tracks which issue numbers
/// must complete before this issue can be promoted to in-flight.
#[derive(Debug, Clone)]
struct DeferredIssue {
    // The original issue payload, kept so it can be spawned later without refetching.
    issue: PipelineIssue,
    // Planner-produced metadata (area, predicted files, complexity) reused at spawn time.
    metadata: InFlightIssue,
    // Issue numbers that must finish before this issue may be promoted.
    awaiting: HashSet<u32>,
}
33
34/// Run the pipeline for a batch of issues using planner-driven sequencing.
35///
36/// Used for the explicit-IDs path (`oven on 42,43`). Calls the planner with no
37/// in-flight context, then runs batches sequentially (issues within each batch
38/// run in parallel). Falls back to all-parallel if the planner fails.
39pub async fn run_batch<R: CommandRunner + 'static>(
40    executor: &Arc<PipelineExecutor<R>>,
41    issues: Vec<PipelineIssue>,
42    max_parallel: usize,
43    auto_merge: bool,
44) -> Result<()> {
45    if let Some(plan) = executor.plan_issues(&issues, &[]).await {
46        info!(
47            batches = plan.batches.len(),
48            total = plan.total_issues,
49            "planner produced a plan, running batches sequentially"
50        );
51        run_batches_sequentially(executor, &issues, &plan, max_parallel, auto_merge).await
52    } else {
53        warn!("planner failed, falling back to all-parallel execution");
54        run_all_parallel(executor, issues, max_parallel, auto_merge).await
55    }
56}
57
58/// Run planner batches in sequence: wait for batch N to complete before starting batch N+1.
59/// Issues within each batch run in parallel.
60async fn run_batches_sequentially<R: CommandRunner + 'static>(
61    executor: &Arc<PipelineExecutor<R>>,
62    issues: &[PipelineIssue],
63    plan: &PlannerOutput,
64    max_parallel: usize,
65    auto_merge: bool,
66) -> Result<()> {
67    let issue_map: HashMap<u32, &PipelineIssue> = issues.iter().map(|i| (i.number, i)).collect();
68
69    for batch in &plan.batches {
70        let batch_issues: Vec<PipelineIssue> = batch
71            .issues
72            .iter()
73            .filter_map(|pi| issue_map.get(&pi.number).map(|i| (*i).clone()))
74            .collect();
75
76        if batch_issues.is_empty() {
77            continue;
78        }
79
80        info!(
81            batch = batch.batch,
82            count = batch_issues.len(),
83            reasoning = %batch.reasoning,
84            "starting batch"
85        );
86
87        run_single_batch(executor, batch_issues, &batch.issues, max_parallel, auto_merge).await?;
88    }
89
90    Ok(())
91}
92
93/// Run a single batch of issues in parallel with complexity from planner output.
94async fn run_single_batch<R: CommandRunner + 'static>(
95    executor: &Arc<PipelineExecutor<R>>,
96    issues: Vec<PipelineIssue>,
97    planned: &[crate::agents::PlannedIssue],
98    max_parallel: usize,
99    auto_merge: bool,
100) -> Result<()> {
101    let complexity_map: HashMap<u32, crate::agents::Complexity> =
102        planned.iter().map(|pi| (pi.number, pi.complexity.clone())).collect();
103    let semaphore = Arc::new(Semaphore::new(max_parallel));
104    let mut tasks = JoinSet::new();
105
106    for issue in issues {
107        let permit = semaphore
108            .clone()
109            .acquire_owned()
110            .await
111            .map_err(|e| anyhow::anyhow!("semaphore closed: {e}"))?;
112        let exec = Arc::clone(executor);
113        let complexity = complexity_map.get(&issue.number).cloned();
114        tasks.spawn(async move {
115            let number = issue.number;
116            let result = exec.run_issue_with_complexity(&issue, auto_merge, complexity).await;
117            drop(permit);
118            (number, result)
119        });
120    }
121
122    let mut had_errors = false;
123    while let Some(join_result) = tasks.join_next().await {
124        match join_result {
125            Ok((number, Err(e))) => {
126                error!(issue = number, error = %e, "pipeline failed for issue");
127                had_errors = true;
128            }
129            Err(e) => {
130                error!(error = %e, "pipeline task panicked");
131                had_errors = true;
132            }
133            Ok((number, Ok(()))) => {
134                info!(issue = number, "pipeline completed successfully");
135            }
136        }
137    }
138
139    if had_errors { Err(anyhow::anyhow!("one or more pipelines failed in batch")) } else { Ok(()) }
140}
141
142/// Fallback: run all issues in parallel behind a semaphore (no planner guidance).
143async fn run_all_parallel<R: CommandRunner + 'static>(
144    executor: &Arc<PipelineExecutor<R>>,
145    issues: Vec<PipelineIssue>,
146    max_parallel: usize,
147    auto_merge: bool,
148) -> Result<()> {
149    let semaphore = Arc::new(Semaphore::new(max_parallel));
150    let mut tasks = JoinSet::new();
151
152    for issue in issues {
153        let permit = semaphore
154            .clone()
155            .acquire_owned()
156            .await
157            .map_err(|e| anyhow::anyhow!("semaphore closed: {e}"))?;
158        let exec = Arc::clone(executor);
159        tasks.spawn(async move {
160            let number = issue.number;
161            let result = exec.run_issue(&issue, auto_merge).await;
162            drop(permit);
163            (number, result)
164        });
165    }
166
167    let mut had_errors = false;
168    while let Some(join_result) = tasks.join_next().await {
169        match join_result {
170            Ok((number, Ok(()))) => {
171                info!(issue = number, "pipeline completed successfully");
172            }
173            Ok((number, Err(e))) => {
174                error!(issue = number, error = %e, "pipeline failed for issue");
175                had_errors = true;
176            }
177            Err(e) => {
178                error!(error = %e, "pipeline task panicked");
179                had_errors = true;
180            }
181        }
182    }
183
184    if had_errors {
185        anyhow::bail!("one or more pipelines failed");
186    }
187    Ok(())
188}
189
190fn handle_task_result(result: Result<(u32, Result<()>), tokio::task::JoinError>) {
191    match result {
192        Ok((number, Ok(()))) => {
193            info!(issue = number, "pipeline completed successfully");
194        }
195        Ok((number, Err(e))) => {
196            error!(issue = number, error = %e, "pipeline failed for issue");
197        }
198        Err(e) => {
199            error!(error = %e, "pipeline task panicked");
200        }
201    }
202}
203
/// Poll for new issues and run them through the pipeline.
///
/// Unlike `run_batch`, this function continuously polls for new issues even while
/// existing pipelines are running. Uses a shared semaphore and `JoinSet` that persist
/// across poll cycles, with in-flight and deferred tracking to prevent double-spawning
/// and avoid re-invoking the planner for issues whose dependency chain is already known.
///
/// Deferred issues (batch 2+) are stored locally and promoted automatically when their
/// dependencies complete, saving planner tokens on subsequent poll cycles.
pub async fn polling_loop<R: CommandRunner + 'static>(
    executor: Arc<PipelineExecutor<R>>,
    auto_merge: bool,
    cancel_token: CancellationToken,
) -> Result<()> {
    let poll_interval = Duration::from_secs(executor.config.pipeline.poll_interval);
    let max_parallel = executor.config.pipeline.max_parallel as usize;
    let ready_label = executor.config.labels.ready.clone();
    // Shared across poll cycles: caps concurrent pipelines at `max_parallel`.
    let semaphore = Arc::new(Semaphore::new(max_parallel));
    let mut tasks = JoinSet::new();
    // Issues currently running (number -> planner metadata); prevents double-spawning.
    let in_flight: Arc<Mutex<HashMap<u32, InFlightIssue>>> = Arc::new(Mutex::new(HashMap::new()));
    // Issues planned into later batches, waiting on their dependencies to finish.
    let deferred: Arc<Mutex<HashMap<u32, DeferredIssue>>> = Arc::new(Mutex::new(HashMap::new()));

    info!(poll_interval_secs = poll_interval.as_secs(), max_parallel, "continuous polling started");

    loop {
        tokio::select! {
            // Graceful shutdown: stop polling but drain all in-flight pipelines first.
            () = cancel_token.cancelled() => {
                info!("shutdown signal received, waiting for in-flight pipelines");
                while let Some(result) = tasks.join_next().await {
                    handle_task_result(result);
                }
                break;
            }
            // NOTE(review): the sleep future is re-created on every loop iteration, so a
            // steady stream of task completions resets the countdown and can delay the
            // next poll beyond `poll_interval` — confirm this is acceptable.
            () = tokio::time::sleep(poll_interval) => {
                poll_and_spawn(
                    &executor, &ready_label, &semaphore, &in_flight, &deferred,
                    &mut tasks, auto_merge,
                ).await;
            }
            // Reap finished tasks as they complete. The guard matters: an empty JoinSet
            // returns None immediately, which would otherwise dominate the select.
            Some(result) = tasks.join_next(), if !tasks.is_empty() => {
                handle_task_result(result);
            }
        }
    }

    Ok(())
}
251
252/// Remove deferred entries for issues no longer in the ready list and clear
253/// their numbers from other deferred issues' awaiting sets.
254async fn clean_stale_deferred(
255    deferred: &Arc<Mutex<HashMap<u32, DeferredIssue>>>,
256    ready_numbers: &HashSet<u32>,
257) {
258    let mut def_guard = deferred.lock().await;
259    let stale: HashSet<u32> =
260        def_guard.keys().filter(|num| !ready_numbers.contains(num)).copied().collect();
261    if !stale.is_empty() {
262        info!(count = stale.len(), "removing stale deferred issues");
263        def_guard.retain(|num, _| !stale.contains(num));
264        for d in def_guard.values_mut() {
265            d.awaiting.retain(|n| !stale.contains(n));
266        }
267    }
268}
269
270/// Promote deferred issues whose awaiting sets have fully cleared.
271async fn promote_deferred(
272    deferred: &Arc<Mutex<HashMap<u32, DeferredIssue>>>,
273) -> Vec<(PipelineIssue, InFlightIssue)> {
274    let mut promoted = Vec::new();
275    let mut def_guard = deferred.lock().await;
276    let ready: Vec<u32> =
277        def_guard.iter().filter(|(_, d)| d.awaiting.is_empty()).map(|(num, _)| *num).collect();
278    for num in ready {
279        if let Some(d) = def_guard.remove(&num) {
280            info!(issue = num, "promoting deferred issue (dependencies cleared)");
281            promoted.push((d.issue, d.metadata));
282        }
283    }
284    promoted
285}
286
/// Single poll cycle: promote ready deferred issues, plan genuinely new ones, and spawn.
///
/// Only invokes the planner for issues not already tracked in `in_flight` or `deferred`.
/// Deferred issues whose `awaiting` set has cleared are promoted without a planner call.
async fn poll_and_spawn<R: CommandRunner + 'static>(
    executor: &Arc<PipelineExecutor<R>>,
    ready_label: &str,
    semaphore: &Arc<Semaphore>,
    in_flight: &Arc<Mutex<HashMap<u32, InFlightIssue>>>,
    deferred: &Arc<Mutex<HashMap<u32, DeferredIssue>>>,
    tasks: &mut JoinSet<(u32, Result<()>)>,
    auto_merge: bool,
) {
    // A fetch failure skips this cycle entirely; the next poll retries.
    let ready_issues = match executor.issues.get_ready_issues(ready_label).await {
        Ok(i) => i,
        Err(e) => {
            error!(error = %e, "failed to fetch issues");
            return;
        }
    };

    // Drop deferred entries whose issue left the ready list (e.g. closed or unlabeled).
    let ready_numbers: HashSet<u32> = ready_issues.iter().map(|i| i.number).collect();
    clean_stale_deferred(deferred, &ready_numbers).await;

    // Snapshot in-flight and deferred state, then filter to genuinely new issues.
    // The two locks are taken one at a time and dropped immediately so the spawned
    // tasks (which also lock these maps) are never blocked for long.
    let in_flight_guard = in_flight.lock().await;
    let in_flight_snapshot: Vec<InFlightIssue> = in_flight_guard.values().cloned().collect();
    let in_flight_numbers: HashSet<u32> = in_flight_guard.keys().copied().collect();
    drop(in_flight_guard);

    let deferred_guard = deferred.lock().await;
    let deferred_context: Vec<InFlightIssue> =
        deferred_guard.values().map(|d| d.metadata.clone()).collect();
    let deferred_numbers: HashSet<u32> = deferred_guard.keys().copied().collect();
    drop(deferred_guard);

    // "New" = ready but neither already running nor already deferred.
    let new_issues: Vec<_> = ready_issues
        .into_iter()
        .filter(|i| !in_flight_numbers.contains(&i.number) && !deferred_numbers.contains(&i.number))
        .collect();

    // Deferred issues whose dependencies finished are spawned without a planner call.
    let mut to_spawn = promote_deferred(deferred).await;

    // Only invoke the planner for genuinely new issues
    if !new_issues.is_empty() {
        info!(count = new_issues.len(), "found new issues to evaluate");

        // Give the planner full context: currently running plus already-deferred issues.
        let mut planner_context = in_flight_snapshot;
        planner_context.extend(deferred_context);

        if let Some(plan) = executor.plan_issues(&new_issues, &planner_context).await {
            info!(
                batches = plan.batches.len(),
                total = plan.total_issues,
                "planner produced a plan"
            );
            apply_plan(&new_issues, &plan, &in_flight_numbers, &mut to_spawn, deferred).await;
        } else {
            // Planner failure degrades to "run everything now" rather than stalling.
            warn!("planner failed, spawning all new issues immediately");
            for issue in &new_issues {
                to_spawn.push((issue.clone(), InFlightIssue::from_issue(issue)));
            }
        }
    }

    if to_spawn.is_empty() {
        // Only log the idle message when there was truly nothing to do this cycle.
        if new_issues.is_empty() {
            info!("no actionable issues, waiting");
        }
        return;
    }

    spawn_issues(to_spawn, semaphore, executor, in_flight, deferred, tasks, auto_merge).await;
}
361
362/// Apply a planner output: add batch 1 issues to spawn list, batch 2+ to deferred.
363async fn apply_plan(
364    new_issues: &[PipelineIssue],
365    plan: &PlannerOutput,
366    in_flight_numbers: &HashSet<u32>,
367    to_spawn: &mut Vec<(PipelineIssue, InFlightIssue)>,
368    deferred: &Arc<Mutex<HashMap<u32, DeferredIssue>>>,
369) {
370    let (spawn_map, defer_list) = split_plan(plan, in_flight_numbers);
371    let issue_map: HashMap<u32, &PipelineIssue> =
372        new_issues.iter().map(|i| (i.number, i)).collect();
373
374    for issue in new_issues {
375        if let Some(metadata) = spawn_map.get(&issue.number) {
376            to_spawn.push((issue.clone(), metadata.clone()));
377        }
378    }
379
380    let mut def_guard = deferred.lock().await;
381    for (number, metadata, awaiting) in defer_list {
382        if let Some(issue) = issue_map.get(&number) {
383            info!(
384                issue = number,
385                awaiting_count = awaiting.len(),
386                "deferring issue (waiting for dependencies)"
387            );
388            def_guard.insert(number, DeferredIssue { issue: (*issue).clone(), metadata, awaiting });
389        }
390    }
391}
392
/// Spawn pipeline tasks for a set of issues.
///
/// Each issue is registered in `in_flight` *before* its task starts so a concurrent
/// poll cycle cannot spawn it twice; the task removes itself when it finishes.
async fn spawn_issues<R: CommandRunner + 'static>(
    to_spawn: Vec<(PipelineIssue, InFlightIssue)>,
    semaphore: &Arc<Semaphore>,
    executor: &Arc<PipelineExecutor<R>>,
    in_flight: &Arc<Mutex<HashMap<u32, InFlightIssue>>>,
    deferred: &Arc<Mutex<HashMap<u32, DeferredIssue>>>,
    tasks: &mut JoinSet<(u32, Result<()>)>,
    auto_merge: bool,
) {
    for (issue, metadata) in to_spawn {
        let sem = Arc::clone(semaphore);
        let exec = Arc::clone(executor);
        let in_fl = Arc::clone(in_flight);
        let def = Arc::clone(deferred);
        let number = issue.number;
        let complexity = Some(metadata.complexity.clone());

        // Register before spawning so the next poll cycle sees it as in-flight.
        in_fl.lock().await.insert(number, metadata);

        tasks.spawn(async move {
            // The permit is acquired inside the task: issues beyond `max_parallel`
            // queue here instead of blocking the polling loop.
            let permit = match sem.acquire_owned().await {
                Ok(p) => p,
                Err(e) => {
                    // Semaphore closed: undo the in-flight registration and bail.
                    in_fl.lock().await.remove(&number);
                    return (number, Err(anyhow::anyhow!("semaphore closed: {e}")));
                }
            };
            let result = exec.run_issue_with_complexity(&issue, auto_merge, complexity).await;
            in_fl.lock().await.remove(&number);
            // Clear this issue from deferred awaiting sets so dependents can be promoted
            // NOTE(review): this also runs when the pipeline failed, so dependents are
            // promoted even if their dependency did not succeed — confirm intended.
            {
                let mut def_guard = def.lock().await;
                for d in def_guard.values_mut() {
                    d.awaiting.remove(&number);
                }
            }
            drop(permit);
            (number, result)
        });
    }
}
435
436/// A deferred issue's number, planner metadata, and the set of issues it must wait for.
437type DeferredEntry = (u32, InFlightIssue, HashSet<u32>);
438
439/// Separate a planner output into batch 1 (spawn immediately) and deferred batches.
440///
441/// `in_flight_numbers` are issue numbers currently running -- they form the implicit
442/// "batch 0" that deferred issues must wait for in addition to lower-numbered batches.
443fn split_plan(
444    plan: &PlannerOutput,
445    in_flight_numbers: &HashSet<u32>,
446) -> (HashMap<u32, InFlightIssue>, Vec<DeferredEntry>) {
447    let mut to_spawn = HashMap::new();
448    let mut to_defer = Vec::new();
449    let mut lower_batch: HashSet<u32> = in_flight_numbers.clone();
450
451    for batch in &plan.batches {
452        if batch.batch == 1 {
453            for pi in &batch.issues {
454                to_spawn.insert(pi.number, InFlightIssue::from(pi));
455            }
456        } else {
457            for pi in &batch.issues {
458                to_defer.push((pi.number, InFlightIssue::from(pi), lower_batch.clone()));
459            }
460        }
461        for pi in &batch.issues {
462            lower_batch.insert(pi.number);
463        }
464    }
465
466    (to_spawn, to_defer)
467}
468
469#[cfg(test)]
470mod tests {
471    use std::{collections::HashSet, path::PathBuf};
472
473    use tokio::sync::Mutex;
474
475    use super::*;
476    use crate::{
477        agents::{Batch, Complexity, PlannedIssue},
478        config::Config,
479        github::GhClient,
480        issues::{IssueOrigin, IssueProvider, github::GithubIssueProvider},
481        process::{AgentResult, CommandOutput, MockCommandRunner},
482    };
483
    /// Build a `MockCommandRunner` whose `gh` and agent invocations always succeed,
    /// returning a canned PR URL and a canned clean-review agent result.
    fn mock_runner_for_batch() -> MockCommandRunner {
        let mut mock = MockCommandRunner::new();
        // Any `gh` call yields a PR URL on stdout, as `gh pr create` would.
        mock.expect_run_gh().returning(|_, _| {
            Box::pin(async {
                Ok(CommandOutput {
                    stdout: "https://github.com/user/repo/pull/1\n".to_string(),
                    stderr: String::new(),
                    success: true,
                })
            })
        });
        // Any agent call yields a successful run with an empty findings report.
        mock.expect_run_claude().returning(|_, _, _, _| {
            Box::pin(async {
                Ok(AgentResult {
                    cost_usd: 1.0,
                    duration: Duration::from_secs(5),
                    turns: 3,
                    output: r#"{"findings":[],"summary":"clean"}"#.to_string(),
                    session_id: "sess-1".to_string(),
                    success: true,
                })
            })
        });
        mock
    }
509
    /// Wrap a mocked `GhClient` in the `IssueProvider` trait object the executor expects.
    fn make_github_provider(gh: &Arc<GhClient<MockCommandRunner>>) -> Arc<dyn IssueProvider> {
        Arc::new(GithubIssueProvider::new(Arc::clone(gh), "target_repo"))
    }
513
    /// Cancelling the token before any poll fires must make `polling_loop` return `Ok`.
    #[tokio::test]
    async fn cancellation_stops_polling() {
        let cancel = CancellationToken::new();
        let runner = Arc::new(mock_runner_for_batch());
        let github = Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
        let issues = make_github_provider(&github);
        let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));

        let mut config = Config::default();
        config.pipeline.poll_interval = 3600; // very long so we don't actually poll

        let executor = Arc::new(PipelineExecutor {
            runner,
            github,
            issues,
            db,
            config,
            cancel_token: cancel.clone(),
            repo_dir: PathBuf::from("/tmp"),
        });

        let cancel_clone = cancel.clone();
        let handle = tokio::spawn(async move { polling_loop(executor, false, cancel_clone).await });

        // Cancel immediately
        cancel.cancel();

        // No tasks were spawned, so the drain is instant and the loop returns Ok.
        let result = handle.await.unwrap();
        assert!(result.is_ok());
    }
544
    /// Like `cancellation_stops_polling`, but bounds the shutdown with a 5s timeout so a
    /// regression that hangs the loop fails the test instead of hanging the suite.
    #[tokio::test]
    async fn cancellation_exits_within_timeout() {
        let cancel = CancellationToken::new();
        let runner = Arc::new(mock_runner_for_batch());
        let github = Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
        let issues = make_github_provider(&github);
        let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));

        let mut config = Config::default();
        // Huge interval: the loop should only ever wake on the cancellation branch.
        config.pipeline.poll_interval = 3600;

        let executor = Arc::new(PipelineExecutor {
            runner,
            github,
            issues,
            db,
            config,
            cancel_token: cancel.clone(),
            repo_dir: PathBuf::from("/tmp"),
        });

        let cancel_clone = cancel.clone();
        let handle = tokio::spawn(async move { polling_loop(executor, false, cancel_clone).await });

        cancel.cancel();

        let result = tokio::time::timeout(Duration::from_secs(5), handle)
            .await
            .expect("polling loop should exit within timeout")
            .unwrap();
        assert!(result.is_ok());
    }
577
578    #[tokio::test]
579    async fn in_flight_map_filters_duplicate_issues() {
580        let in_flight: Arc<Mutex<HashMap<u32, InFlightIssue>>> =
581            Arc::new(Mutex::new(HashMap::new()));
582
583        // Simulate issue 1 already in flight
584        in_flight.lock().await.insert(
585            1,
586            InFlightIssue {
587                number: 1,
588                title: "Already running".to_string(),
589                area: "auth".to_string(),
590                predicted_files: vec!["src/auth.rs".to_string()],
591                has_migration: false,
592                complexity: Complexity::Full,
593            },
594        );
595
596        let issues = vec![
597            PipelineIssue {
598                number: 1,
599                title: "Already running".to_string(),
600                body: String::new(),
601                source: IssueOrigin::Github,
602                target_repo: None,
603            },
604            PipelineIssue {
605                number: 2,
606                title: "New issue".to_string(),
607                body: String::new(),
608                source: IssueOrigin::Github,
609                target_repo: None,
610            },
611            PipelineIssue {
612                number: 3,
613                title: "Another new".to_string(),
614                body: String::new(),
615                source: IssueOrigin::Github,
616                target_repo: None,
617            },
618        ];
619
620        let guard = in_flight.lock().await;
621        let new_issues: Vec<_> =
622            issues.into_iter().filter(|i| !guard.contains_key(&i.number)).collect();
623        drop(guard);
624
625        assert_eq!(new_issues.len(), 2);
626        assert_eq!(new_issues[0].number, 2);
627        assert_eq!(new_issues[1].number, 3);
628    }
629
630    #[test]
631    fn handle_task_result_does_not_panic_on_success() {
632        handle_task_result(Ok((1, Ok(()))));
633    }
634
635    #[test]
636    fn handle_task_result_does_not_panic_on_error() {
637        handle_task_result(Ok((1, Err(anyhow::anyhow!("test error")))));
638    }
639
    /// Batch 1 issues land in the spawn map with their planner metadata intact;
    /// batch 2 issues are deferred, awaiting every batch-1 issue.
    #[test]
    fn split_plan_separates_batches() {
        let plan = PlannerOutput {
            batches: vec![
                Batch {
                    batch: 1,
                    issues: vec![
                        PlannedIssue {
                            number: 1,
                            title: "First".to_string(),
                            area: "cli".to_string(),
                            predicted_files: vec!["src/cli.rs".to_string()],
                            has_migration: false,
                            complexity: Complexity::Simple,
                        },
                        PlannedIssue {
                            number: 2,
                            title: "Second".to_string(),
                            area: "config".to_string(),
                            predicted_files: vec!["src/config.rs".to_string()],
                            has_migration: false,
                            complexity: Complexity::Full,
                        },
                    ],
                    reasoning: "independent".to_string(),
                },
                Batch {
                    batch: 2,
                    issues: vec![PlannedIssue {
                        number: 3,
                        title: "Third".to_string(),
                        area: "db".to_string(),
                        predicted_files: vec!["src/db.rs".to_string()],
                        has_migration: true,
                        complexity: Complexity::Full,
                    }],
                    reasoning: "depends on batch 1".to_string(),
                },
            ],
            total_issues: 3,
            parallel_capacity: 2,
        };

        // No in-flight issues: awaiting sets come only from earlier batches.
        let (spawn_map, defer_list) = split_plan(&plan, &HashSet::new());

        // Batch 1 spawns immediately, metadata preserved per issue.
        assert_eq!(spawn_map.len(), 2);
        assert_eq!(spawn_map.get(&1).unwrap().complexity, Complexity::Simple);
        assert_eq!(spawn_map.get(&1).unwrap().area, "cli");
        assert_eq!(spawn_map.get(&2).unwrap().complexity, Complexity::Full);

        // Issue 3 must wait for both batch-1 issues and nothing else.
        assert_eq!(defer_list.len(), 1);
        let (num, meta, awaiting) = &defer_list[0];
        assert_eq!(*num, 3);
        assert_eq!(meta.area, "db");
        assert!(awaiting.contains(&1));
        assert!(awaiting.contains(&2));
        assert_eq!(awaiting.len(), 2);
    }
698
699    #[test]
700    fn split_plan_empty() {
701        let plan = PlannerOutput { batches: vec![], total_issues: 0, parallel_capacity: 0 };
702        let (spawn_map, defer_list) = split_plan(&plan, &HashSet::new());
703        assert!(spawn_map.is_empty());
704        assert!(defer_list.is_empty());
705    }
706
    /// Already-running issues form the implicit "batch 0": deferred issues must await
    /// them in addition to every earlier planned batch.
    #[test]
    fn split_plan_includes_in_flight_in_awaiting() {
        let plan = PlannerOutput {
            batches: vec![
                Batch {
                    batch: 1,
                    issues: vec![PlannedIssue {
                        number: 5,
                        title: "New".to_string(),
                        area: "cli".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Simple,
                    }],
                    reasoning: "ok".to_string(),
                },
                Batch {
                    batch: 2,
                    issues: vec![PlannedIssue {
                        number: 6,
                        title: "Depends".to_string(),
                        area: "db".to_string(),
                        predicted_files: vec![],
                        has_migration: true,
                        complexity: Complexity::Full,
                    }],
                    reasoning: "conflicts".to_string(),
                },
            ],
            total_issues: 2,
            parallel_capacity: 1,
        };

        // Issues 10 and 11 are already running when the plan is applied.
        let in_flight_nums: HashSet<u32> = [10, 11].into_iter().collect();
        let (spawn_map, defer_list) = split_plan(&plan, &in_flight_nums);

        assert_eq!(spawn_map.len(), 1);
        assert!(spawn_map.contains_key(&5));

        // Issue 6 awaits the in-flight issues (10, 11) plus the batch-1 issue (5).
        assert_eq!(defer_list.len(), 1);
        let (num, _, awaiting) = &defer_list[0];
        assert_eq!(*num, 6);
        assert!(awaiting.contains(&10));
        assert!(awaiting.contains(&11));
        assert!(awaiting.contains(&5));
        assert_eq!(awaiting.len(), 3);
    }
754
    /// Awaiting sets accumulate across batches: batch 3 waits on batches 1 AND 2,
    /// not just the immediately preceding batch.
    #[test]
    fn split_plan_three_batches_chain_awaiting() {
        let plan = PlannerOutput {
            batches: vec![
                Batch {
                    batch: 1,
                    issues: vec![PlannedIssue {
                        number: 1,
                        title: "A".to_string(),
                        area: "a".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Simple,
                    }],
                    reasoning: String::new(),
                },
                Batch {
                    batch: 2,
                    issues: vec![PlannedIssue {
                        number: 2,
                        title: "B".to_string(),
                        area: "b".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Full,
                    }],
                    reasoning: String::new(),
                },
                Batch {
                    batch: 3,
                    issues: vec![PlannedIssue {
                        number: 3,
                        title: "C".to_string(),
                        area: "c".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Full,
                    }],
                    reasoning: String::new(),
                },
            ],
            total_issues: 3,
            parallel_capacity: 1,
        };

        let (spawn_map, defer_list) = split_plan(&plan, &HashSet::new());

        assert_eq!(spawn_map.len(), 1);
        assert!(spawn_map.contains_key(&1));

        // defer_list preserves batch order: index 0 is batch 2, index 1 is batch 3.
        assert_eq!(defer_list.len(), 2);
        let (_, _, awaiting_2) = &defer_list[0];
        assert_eq!(*awaiting_2, HashSet::from([1]));
        let (_, _, awaiting_3) = &defer_list[1];
        assert_eq!(*awaiting_3, HashSet::from([1, 2]));
    }
811
    /// The "new issue" filter must exclude both in-flight AND deferred issues, so the
    /// planner is only ever invoked for genuinely unseen work.
    #[tokio::test]
    async fn deferred_issues_filtered_from_new_issues() {
        let in_flight: Arc<Mutex<HashMap<u32, InFlightIssue>>> =
            Arc::new(Mutex::new(HashMap::new()));
        let deferred: Arc<Mutex<HashMap<u32, DeferredIssue>>> =
            Arc::new(Mutex::new(HashMap::new()));

        // Issue 1: currently running.
        in_flight.lock().await.insert(
            1,
            InFlightIssue {
                number: 1,
                title: "Running".to_string(),
                area: "auth".to_string(),
                predicted_files: vec![],
                has_migration: false,
                complexity: Complexity::Full,
            },
        );

        // Issue 2: deferred, waiting on issue 1.
        deferred.lock().await.insert(
            2,
            DeferredIssue {
                issue: PipelineIssue {
                    number: 2,
                    title: "Waiting".to_string(),
                    body: String::new(),
                    source: IssueOrigin::Github,
                    target_repo: None,
                },
                metadata: InFlightIssue {
                    number: 2,
                    title: "Waiting".to_string(),
                    area: "db".to_string(),
                    predicted_files: vec![],
                    has_migration: false,
                    complexity: Complexity::Full,
                },
                awaiting: HashSet::from([1]),
            },
        );

        // The ready list contains all three; only issue 3 is genuinely new.
        let issues = vec![
            PipelineIssue {
                number: 1,
                title: "Running".to_string(),
                body: String::new(),
                source: IssueOrigin::Github,
                target_repo: None,
            },
            PipelineIssue {
                number: 2,
                title: "Waiting".to_string(),
                body: String::new(),
                source: IssueOrigin::Github,
                target_repo: None,
            },
            PipelineIssue {
                number: 3,
                title: "New".to_string(),
                body: String::new(),
                source: IssueOrigin::Github,
                target_repo: None,
            },
        ];

        let ifl = in_flight.lock().await;
        let def = deferred.lock().await;
        let new_issues: Vec<_> = issues
            .into_iter()
            .filter(|i| !ifl.contains_key(&i.number) && !def.contains_key(&i.number))
            .collect();
        drop(ifl);
        drop(def);

        assert_eq!(new_issues.len(), 1);
        assert_eq!(new_issues[0].number, 3);
    }
889
890    #[tokio::test]
891    async fn deferred_promotion_when_awaiting_clears() {
892        let deferred: Arc<Mutex<HashMap<u32, DeferredIssue>>> =
893            Arc::new(Mutex::new(HashMap::new()));
894
895        deferred.lock().await.insert(
896            3,
897            DeferredIssue {
898                issue: PipelineIssue {
899                    number: 3,
900                    title: "Deferred".to_string(),
901                    body: String::new(),
902                    source: IssueOrigin::Github,
903                    target_repo: None,
904                },
905                metadata: InFlightIssue {
906                    number: 3,
907                    title: "Deferred".to_string(),
908                    area: "db".to_string(),
909                    predicted_files: vec![],
910                    has_migration: true,
911                    complexity: Complexity::Full,
912                },
913                awaiting: HashSet::from([1, 2]),
914            },
915        );
916
917        // Issue 1 completes
918        {
919            let mut guard = deferred.lock().await;
920            for d in guard.values_mut() {
921                d.awaiting.remove(&1);
922            }
923        }
924
925        // Still waiting on issue 2
926        assert!(
927            deferred.lock().await.values().all(|d| !d.awaiting.is_empty()),
928            "should not be promotable yet"
929        );
930
931        // Issue 2 completes
932        {
933            let mut guard = deferred.lock().await;
934            for d in guard.values_mut() {
935                d.awaiting.remove(&2);
936            }
937        }
938
939        // Now issue 3 is promotable
940        {
941            let guard = deferred.lock().await;
942            let promotable: Vec<u32> =
943                guard.iter().filter(|(_, d)| d.awaiting.is_empty()).map(|(n, _)| *n).collect();
944            assert_eq!(promotable, vec![3]);
945            drop(guard);
946        }
947    }
948
949    #[tokio::test]
950    async fn stale_deferred_issues_cleaned_up() {
951        let deferred: Arc<Mutex<HashMap<u32, DeferredIssue>>> =
952            Arc::new(Mutex::new(HashMap::new()));
953
954        {
955            let mut guard = deferred.lock().await;
956            guard.insert(
957                2,
958                DeferredIssue {
959                    issue: PipelineIssue {
960                        number: 2,
961                        title: "Two".to_string(),
962                        body: String::new(),
963                        source: IssueOrigin::Github,
964                        target_repo: None,
965                    },
966                    metadata: InFlightIssue {
967                        number: 2,
968                        title: "Two".to_string(),
969                        area: "a".to_string(),
970                        predicted_files: vec![],
971                        has_migration: false,
972                        complexity: Complexity::Full,
973                    },
974                    awaiting: HashSet::from([1]),
975                },
976            );
977            guard.insert(
978                3,
979                DeferredIssue {
980                    issue: PipelineIssue {
981                        number: 3,
982                        title: "Three".to_string(),
983                        body: String::new(),
984                        source: IssueOrigin::Github,
985                        target_repo: None,
986                    },
987                    metadata: InFlightIssue {
988                        number: 3,
989                        title: "Three".to_string(),
990                        area: "b".to_string(),
991                        predicted_files: vec![],
992                        has_migration: false,
993                        complexity: Complexity::Full,
994                    },
995                    awaiting: HashSet::from([1, 2]),
996                },
997            );
998        }
999
1000        // Issue 2 no longer in ready list (closed externally)
1001        let ready_numbers: HashSet<u32> = HashSet::from([3]);
1002        clean_stale_deferred(&deferred, &ready_numbers).await;
1003
1004        let guard = deferred.lock().await;
1005        assert!(!guard.contains_key(&2));
1006        let d3 = guard.get(&3).unwrap();
1007        let has_2 = d3.awaiting.contains(&2);
1008        let has_1 = d3.awaiting.contains(&1);
1009        drop(guard);
1010        assert!(!has_2);
1011        assert!(has_1);
1012    }
1013
1014    #[tokio::test]
1015    async fn planner_failure_falls_back_to_all_parallel() {
1016        let mut mock = MockCommandRunner::new();
1017        mock.expect_run_gh().returning(|_, _| {
1018            Box::pin(async {
1019                Ok(CommandOutput { stdout: String::new(), stderr: String::new(), success: true })
1020            })
1021        });
1022        mock.expect_run_claude().returning(|_, _, _, _| {
1023            Box::pin(async {
1024                Ok(AgentResult {
1025                    cost_usd: 0.5,
1026                    duration: Duration::from_secs(2),
1027                    turns: 1,
1028                    output: "I don't know how to plan".to_string(),
1029                    session_id: "sess-plan".to_string(),
1030                    success: true,
1031                })
1032            })
1033        });
1034
1035        let runner = Arc::new(mock);
1036        let github = Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
1037        let issues_provider = make_github_provider(&github);
1038        let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));
1039
1040        let executor = Arc::new(PipelineExecutor {
1041            runner,
1042            github,
1043            issues: issues_provider,
1044            db,
1045            config: Config::default(),
1046            cancel_token: CancellationToken::new(),
1047            repo_dir: PathBuf::from("/tmp"),
1048        });
1049
1050        let issues = vec![PipelineIssue {
1051            number: 1,
1052            title: "Test".to_string(),
1053            body: "body".to_string(),
1054            source: IssueOrigin::Github,
1055            target_repo: None,
1056        }];
1057
1058        // plan_issues returns None for unparseable output
1059        let plan = executor.plan_issues(&issues, &[]).await;
1060        assert!(plan.is_none());
1061    }
1062}