1use std::{
2 collections::{HashMap, HashSet},
3 sync::Arc,
4 time::Duration,
5};
6
7use anyhow::Result;
8use tokio::{
9 sync::{Mutex, Semaphore},
10 task::JoinSet,
11};
12use tokio_util::sync::CancellationToken;
13use tracing::{error, info, warn};
14
15use super::executor::PipelineExecutor;
16use crate::{
17 agents::{InFlightIssue, PlannerOutput},
18 issues::PipelineIssue,
19 process::CommandRunner,
20};
21
/// A ready issue that is parked until the issues it conflicts with finish.
///
/// `awaiting` holds the issue numbers this one waits on; entries are removed
/// as those pipelines complete (see `spawn_issues`), and the issue is
/// promoted for spawning once the set is empty (see `promote_deferred`).
#[derive(Debug, Clone)]
struct DeferredIssue {
    // Full issue payload, kept so it can be spawned later without re-fetching.
    issue: PipelineIssue,
    // Planner-produced metadata (area, predicted files, complexity, ...).
    metadata: InFlightIssue,
    // Issue numbers this issue must wait for before it may run.
    awaiting: HashSet<u32>,
}
33
34pub async fn run_batch<R: CommandRunner + 'static>(
40 executor: &Arc<PipelineExecutor<R>>,
41 issues: Vec<PipelineIssue>,
42 max_parallel: usize,
43 auto_merge: bool,
44) -> Result<()> {
45 if let Some(plan) = executor.plan_issues(&issues, &[]).await {
46 info!(
47 batches = plan.batches.len(),
48 total = plan.total_issues,
49 "planner produced a plan, running batches sequentially"
50 );
51 run_batches_sequentially(executor, &issues, &plan, max_parallel, auto_merge).await
52 } else {
53 warn!("planner failed, falling back to all-parallel execution");
54 run_all_parallel(executor, issues, max_parallel, auto_merge).await
55 }
56}
57
58async fn run_batches_sequentially<R: CommandRunner + 'static>(
61 executor: &Arc<PipelineExecutor<R>>,
62 issues: &[PipelineIssue],
63 plan: &PlannerOutput,
64 max_parallel: usize,
65 auto_merge: bool,
66) -> Result<()> {
67 let issue_map: HashMap<u32, &PipelineIssue> = issues.iter().map(|i| (i.number, i)).collect();
68
69 for batch in &plan.batches {
70 let batch_issues: Vec<PipelineIssue> = batch
71 .issues
72 .iter()
73 .filter_map(|pi| issue_map.get(&pi.number).map(|i| (*i).clone()))
74 .collect();
75
76 if batch_issues.is_empty() {
77 continue;
78 }
79
80 info!(
81 batch = batch.batch,
82 count = batch_issues.len(),
83 reasoning = %batch.reasoning,
84 "starting batch"
85 );
86
87 run_single_batch(executor, batch_issues, &batch.issues, max_parallel, auto_merge).await?;
88 }
89
90 Ok(())
91}
92
93async fn run_single_batch<R: CommandRunner + 'static>(
95 executor: &Arc<PipelineExecutor<R>>,
96 issues: Vec<PipelineIssue>,
97 planned: &[crate::agents::PlannedIssue],
98 max_parallel: usize,
99 auto_merge: bool,
100) -> Result<()> {
101 let complexity_map: HashMap<u32, crate::agents::Complexity> =
102 planned.iter().map(|pi| (pi.number, pi.complexity.clone())).collect();
103 let semaphore = Arc::new(Semaphore::new(max_parallel));
104 let mut tasks = JoinSet::new();
105
106 for issue in issues {
107 let permit = semaphore
108 .clone()
109 .acquire_owned()
110 .await
111 .map_err(|e| anyhow::anyhow!("semaphore closed: {e}"))?;
112 let exec = Arc::clone(executor);
113 let complexity = complexity_map.get(&issue.number).cloned();
114 tasks.spawn(async move {
115 let number = issue.number;
116 let result = exec.run_issue_with_complexity(&issue, auto_merge, complexity).await;
117 drop(permit);
118 (number, result)
119 });
120 }
121
122 let mut had_errors = false;
123 while let Some(join_result) = tasks.join_next().await {
124 match join_result {
125 Ok((number, Err(e))) => {
126 error!(issue = number, error = %e, "pipeline failed for issue");
127 had_errors = true;
128 }
129 Err(e) => {
130 error!(error = %e, "pipeline task panicked");
131 had_errors = true;
132 }
133 Ok((number, Ok(()))) => {
134 info!(issue = number, "pipeline completed successfully");
135 }
136 }
137 }
138
139 if had_errors { Err(anyhow::anyhow!("one or more pipelines failed in batch")) } else { Ok(()) }
140}
141
142async fn run_all_parallel<R: CommandRunner + 'static>(
144 executor: &Arc<PipelineExecutor<R>>,
145 issues: Vec<PipelineIssue>,
146 max_parallel: usize,
147 auto_merge: bool,
148) -> Result<()> {
149 let semaphore = Arc::new(Semaphore::new(max_parallel));
150 let mut tasks = JoinSet::new();
151
152 for issue in issues {
153 let permit = semaphore
154 .clone()
155 .acquire_owned()
156 .await
157 .map_err(|e| anyhow::anyhow!("semaphore closed: {e}"))?;
158 let exec = Arc::clone(executor);
159 tasks.spawn(async move {
160 let number = issue.number;
161 let result = exec.run_issue(&issue, auto_merge).await;
162 drop(permit);
163 (number, result)
164 });
165 }
166
167 let mut had_errors = false;
168 while let Some(join_result) = tasks.join_next().await {
169 match join_result {
170 Ok((number, Ok(()))) => {
171 info!(issue = number, "pipeline completed successfully");
172 }
173 Ok((number, Err(e))) => {
174 error!(issue = number, error = %e, "pipeline failed for issue");
175 had_errors = true;
176 }
177 Err(e) => {
178 error!(error = %e, "pipeline task panicked");
179 had_errors = true;
180 }
181 }
182 }
183
184 if had_errors {
185 anyhow::bail!("one or more pipelines failed");
186 }
187 Ok(())
188}
189
190fn handle_task_result(result: Result<(u32, Result<()>), tokio::task::JoinError>) {
191 match result {
192 Ok((number, Ok(()))) => {
193 info!(issue = number, "pipeline completed successfully");
194 }
195 Ok((number, Err(e))) => {
196 error!(issue = number, error = %e, "pipeline failed for issue");
197 }
198 Err(e) => {
199 error!(error = %e, "pipeline task panicked");
200 }
201 }
202}
203
/// Long-running poll loop: periodically fetches ready issues and spawns
/// pipelines for them until `cancel_token` fires.
///
/// On cancellation the loop stops polling, waits for every task still in the
/// `JoinSet` to finish, then returns `Ok(())`.
pub async fn polling_loop<R: CommandRunner + 'static>(
    executor: Arc<PipelineExecutor<R>>,
    auto_merge: bool,
    cancel_token: CancellationToken,
) -> Result<()> {
    let poll_interval = Duration::from_secs(executor.config.pipeline.poll_interval);
    let max_parallel = executor.config.pipeline.max_parallel as usize;
    let ready_label = executor.config.labels.ready.clone();
    // Caps the number of concurrently running pipelines across all polls.
    let semaphore = Arc::new(Semaphore::new(max_parallel));
    let mut tasks = JoinSet::new();
    // Issues with a running pipeline task, keyed by issue number.
    let in_flight: Arc<Mutex<HashMap<u32, InFlightIssue>>> = Arc::new(Mutex::new(HashMap::new()));
    // Issues parked until the issue numbers they await have finished.
    let deferred: Arc<Mutex<HashMap<u32, DeferredIssue>>> = Arc::new(Mutex::new(HashMap::new()));

    info!(poll_interval_secs = poll_interval.as_secs(), max_parallel, "continuous polling started");

    loop {
        tokio::select! {
            () = cancel_token.cancelled() => {
                info!("shutdown signal received, waiting for in-flight pipelines");
                // Graceful shutdown: drain already-spawned pipelines to completion.
                while let Some(result) = tasks.join_next().await {
                    handle_task_result(result);
                }
                break;
            }
            () = tokio::time::sleep(poll_interval) => {
                poll_and_spawn(
                    &executor, &ready_label, &semaphore, &in_flight, &deferred,
                    &mut tasks, auto_merge,
                ).await;
            }
            // Reap completed pipelines between polls; the precondition keeps
            // this branch from resolving to `None` while the set is empty.
            Some(result) = tasks.join_next(), if !tasks.is_empty() => {
                handle_task_result(result);
            }
        }
    }

    Ok(())
}
251
252async fn clean_stale_deferred(
255 deferred: &Arc<Mutex<HashMap<u32, DeferredIssue>>>,
256 ready_numbers: &HashSet<u32>,
257) {
258 let mut def_guard = deferred.lock().await;
259 let stale: HashSet<u32> =
260 def_guard.keys().filter(|num| !ready_numbers.contains(num)).copied().collect();
261 if !stale.is_empty() {
262 info!(count = stale.len(), "removing stale deferred issues");
263 def_guard.retain(|num, _| !stale.contains(num));
264 for d in def_guard.values_mut() {
265 d.awaiting.retain(|n| !stale.contains(n));
266 }
267 }
268}
269
270async fn promote_deferred(
272 deferred: &Arc<Mutex<HashMap<u32, DeferredIssue>>>,
273) -> Vec<(PipelineIssue, InFlightIssue)> {
274 let mut promoted = Vec::new();
275 let mut def_guard = deferred.lock().await;
276 let ready: Vec<u32> =
277 def_guard.iter().filter(|(_, d)| d.awaiting.is_empty()).map(|(num, _)| *num).collect();
278 for num in ready {
279 if let Some(d) = def_guard.remove(&num) {
280 info!(issue = num, "promoting deferred issue (dependencies cleared)");
281 promoted.push((d.issue, d.metadata));
282 }
283 }
284 promoted
285}
286
287async fn poll_and_spawn<R: CommandRunner + 'static>(
292 executor: &Arc<PipelineExecutor<R>>,
293 ready_label: &str,
294 semaphore: &Arc<Semaphore>,
295 in_flight: &Arc<Mutex<HashMap<u32, InFlightIssue>>>,
296 deferred: &Arc<Mutex<HashMap<u32, DeferredIssue>>>,
297 tasks: &mut JoinSet<(u32, Result<()>)>,
298 auto_merge: bool,
299) {
300 let ready_issues = match executor.issues.get_ready_issues(ready_label).await {
301 Ok(i) => i,
302 Err(e) => {
303 error!(error = %e, "failed to fetch issues");
304 return;
305 }
306 };
307
308 let ready_numbers: HashSet<u32> = ready_issues.iter().map(|i| i.number).collect();
309 clean_stale_deferred(deferred, &ready_numbers).await;
310
311 let in_flight_guard = in_flight.lock().await;
313 let in_flight_snapshot: Vec<InFlightIssue> = in_flight_guard.values().cloned().collect();
314 let in_flight_numbers: HashSet<u32> = in_flight_guard.keys().copied().collect();
315 drop(in_flight_guard);
316
317 let deferred_guard = deferred.lock().await;
318 let deferred_context: Vec<InFlightIssue> =
319 deferred_guard.values().map(|d| d.metadata.clone()).collect();
320 let deferred_numbers: HashSet<u32> = deferred_guard.keys().copied().collect();
321 drop(deferred_guard);
322
323 let new_issues: Vec<_> = ready_issues
324 .into_iter()
325 .filter(|i| !in_flight_numbers.contains(&i.number) && !deferred_numbers.contains(&i.number))
326 .collect();
327
328 let mut to_spawn = promote_deferred(deferred).await;
329
330 if !new_issues.is_empty() {
332 info!(count = new_issues.len(), "found new issues to evaluate");
333
334 let mut planner_context = in_flight_snapshot;
335 planner_context.extend(deferred_context);
336
337 if let Some(plan) = executor.plan_issues(&new_issues, &planner_context).await {
338 info!(
339 batches = plan.batches.len(),
340 total = plan.total_issues,
341 "planner produced a plan"
342 );
343 apply_plan(&new_issues, &plan, &in_flight_numbers, &mut to_spawn, deferred).await;
344 } else {
345 warn!("planner failed, spawning all new issues immediately");
346 for issue in &new_issues {
347 to_spawn.push((issue.clone(), InFlightIssue::from_issue(issue)));
348 }
349 }
350 }
351
352 if to_spawn.is_empty() {
353 if new_issues.is_empty() {
354 info!("no actionable issues, waiting");
355 }
356 return;
357 }
358
359 spawn_issues(to_spawn, semaphore, executor, in_flight, deferred, tasks, auto_merge).await;
360}
361
362async fn apply_plan(
364 new_issues: &[PipelineIssue],
365 plan: &PlannerOutput,
366 in_flight_numbers: &HashSet<u32>,
367 to_spawn: &mut Vec<(PipelineIssue, InFlightIssue)>,
368 deferred: &Arc<Mutex<HashMap<u32, DeferredIssue>>>,
369) {
370 let (spawn_map, defer_list) = split_plan(plan, in_flight_numbers);
371 let issue_map: HashMap<u32, &PipelineIssue> =
372 new_issues.iter().map(|i| (i.number, i)).collect();
373
374 for issue in new_issues {
375 if let Some(metadata) = spawn_map.get(&issue.number) {
376 to_spawn.push((issue.clone(), metadata.clone()));
377 }
378 }
379
380 let mut def_guard = deferred.lock().await;
381 for (number, metadata, awaiting) in defer_list {
382 if let Some(issue) = issue_map.get(&number) {
383 info!(
384 issue = number,
385 awaiting_count = awaiting.len(),
386 "deferring issue (waiting for dependencies)"
387 );
388 def_guard.insert(number, DeferredIssue { issue: (*issue).clone(), metadata, awaiting });
389 }
390 }
391}
392
/// Register each issue as in-flight and spawn its pipeline task onto `tasks`.
///
/// The `in_flight` entry is inserted *before* the task is spawned, so a poll
/// tick running in between cannot pick the same issue up again. The semaphore
/// permit is acquired inside the task, so this function never blocks waiting
/// for capacity.
async fn spawn_issues<R: CommandRunner + 'static>(
    to_spawn: Vec<(PipelineIssue, InFlightIssue)>,
    semaphore: &Arc<Semaphore>,
    executor: &Arc<PipelineExecutor<R>>,
    in_flight: &Arc<Mutex<HashMap<u32, InFlightIssue>>>,
    deferred: &Arc<Mutex<HashMap<u32, DeferredIssue>>>,
    tasks: &mut JoinSet<(u32, Result<()>)>,
    auto_merge: bool,
) {
    for (issue, metadata) in to_spawn {
        let sem = Arc::clone(semaphore);
        let exec = Arc::clone(executor);
        let in_fl = Arc::clone(in_flight);
        let def = Arc::clone(deferred);
        let number = issue.number;
        let complexity = Some(metadata.complexity.clone());

        // Mark as running before spawning (see note above about poll races).
        in_fl.lock().await.insert(number, metadata);

        tasks.spawn(async move {
            let permit = match sem.acquire_owned().await {
                Ok(p) => p,
                Err(e) => {
                    // Semaphore closed (shutdown): un-register and bail.
                    // NOTE(review): this path does not clear `number` from
                    // deferred `awaiting` sets — likely moot during shutdown,
                    // but confirm.
                    in_fl.lock().await.remove(&number);
                    return (number, Err(anyhow::anyhow!("semaphore closed: {e}")));
                }
            };
            let result = exec.run_issue_with_complexity(&issue, auto_merge, complexity).await;
            // NOTE(review): if the pipeline future panics, neither this
            // removal nor the awaiting-cleanup below runs, so the issue stays
            // in `in_flight` and is filtered from every future poll — confirm
            // that is acceptable.
            in_fl.lock().await.remove(&number);
            {
                // This issue is finished: release any deferred issues waiting
                // on it. Note this happens whether the run succeeded or
                // failed — NOTE(review): confirm failed dependencies should
                // still unblock dependents.
                let mut def_guard = def.lock().await;
                for d in def_guard.values_mut() {
                    d.awaiting.remove(&number);
                }
            }
            drop(permit);
            (number, result)
        });
    }
}
435
436type DeferredEntry = (u32, InFlightIssue, HashSet<u32>);
438
439fn split_plan(
444 plan: &PlannerOutput,
445 in_flight_numbers: &HashSet<u32>,
446) -> (HashMap<u32, InFlightIssue>, Vec<DeferredEntry>) {
447 let mut to_spawn = HashMap::new();
448 let mut to_defer = Vec::new();
449 let mut lower_batch: HashSet<u32> = in_flight_numbers.clone();
450
451 for batch in &plan.batches {
452 if batch.batch == 1 {
453 for pi in &batch.issues {
454 to_spawn.insert(pi.number, InFlightIssue::from(pi));
455 }
456 } else {
457 for pi in &batch.issues {
458 to_defer.push((pi.number, InFlightIssue::from(pi), lower_batch.clone()));
459 }
460 }
461 for pi in &batch.issues {
462 lower_batch.insert(pi.number);
463 }
464 }
465
466 (to_spawn, to_defer)
467}
468
469#[cfg(test)]
470mod tests {
471 use std::{collections::HashSet, path::PathBuf};
472
473 use tokio::sync::Mutex;
474
475 use super::*;
476 use crate::{
477 agents::{Batch, Complexity, PlannedIssue},
478 config::Config,
479 github::GhClient,
480 issues::{IssueOrigin, IssueProvider, github::GithubIssueProvider},
481 process::{AgentResult, CommandOutput, MockCommandRunner},
482 };
483
    /// Mock runner whose `gh` calls return a PR URL and whose Claude calls
    /// return a successful agent run with an empty-findings review payload.
    fn mock_runner_for_batch() -> MockCommandRunner {
        let mut mock = MockCommandRunner::new();
        mock.expect_run_gh().returning(|_, _| {
            Box::pin(async {
                Ok(CommandOutput {
                    stdout: "https://github.com/user/repo/pull/1\n".to_string(),
                    stderr: String::new(),
                    success: true,
                })
            })
        });
        mock.expect_run_claude().returning(|_, _, _, _| {
            Box::pin(async {
                Ok(AgentResult {
                    cost_usd: 1.0,
                    duration: Duration::from_secs(5),
                    turns: 3,
                    output: r#"{"findings":[],"summary":"clean"}"#.to_string(),
                    session_id: "sess-1".to_string(),
                    success: true,
                })
            })
        });
        mock
    }
509
    /// Wrap a mock `GhClient` in the `IssueProvider` trait object used by
    /// `PipelineExecutor.issues`.
    fn make_github_provider(gh: &Arc<GhClient<MockCommandRunner>>) -> Arc<dyn IssueProvider> {
        Arc::new(GithubIssueProvider::new(Arc::clone(gh), "target_repo"))
    }
513
514 #[tokio::test]
515 async fn cancellation_stops_polling() {
516 let cancel = CancellationToken::new();
517 let runner = Arc::new(mock_runner_for_batch());
518 let github = Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
519 let issues = make_github_provider(&github);
520 let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));
521
522 let mut config = Config::default();
523 config.pipeline.poll_interval = 3600; let executor = Arc::new(PipelineExecutor {
526 runner,
527 github,
528 issues,
529 db,
530 config,
531 cancel_token: cancel.clone(),
532 repo_dir: PathBuf::from("/tmp"),
533 });
534
535 let cancel_clone = cancel.clone();
536 let handle = tokio::spawn(async move { polling_loop(executor, false, cancel_clone).await });
537
538 cancel.cancel();
540
541 let result = handle.await.unwrap();
542 assert!(result.is_ok());
543 }
544
    /// Like `cancellation_stops_polling`, but bounds shutdown with a 5s
    /// timeout so the test fails fast if the loop stops honoring cancellation.
    #[tokio::test]
    async fn cancellation_exits_within_timeout() {
        let cancel = CancellationToken::new();
        let runner = Arc::new(mock_runner_for_batch());
        let github = Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
        let issues = make_github_provider(&github);
        let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));

        // Poll interval far longer than the test, so only cancellation fires.
        let mut config = Config::default();
        config.pipeline.poll_interval = 3600;

        let executor = Arc::new(PipelineExecutor {
            runner,
            github,
            issues,
            db,
            config,
            cancel_token: cancel.clone(),
            repo_dir: PathBuf::from("/tmp"),
        });

        let cancel_clone = cancel.clone();
        let handle = tokio::spawn(async move { polling_loop(executor, false, cancel_clone).await });

        cancel.cancel();

        let result = tokio::time::timeout(Duration::from_secs(5), handle)
            .await
            .expect("polling loop should exit within timeout")
            .unwrap();
        assert!(result.is_ok());
    }
577
    /// Issues whose number is already in the in-flight map must be filtered
    /// out of a poll result, so the same issue is never run twice at once.
    /// (Mirrors the in-flight half of the filter in `poll_and_spawn`.)
    #[tokio::test]
    async fn in_flight_map_filters_duplicate_issues() {
        let in_flight: Arc<Mutex<HashMap<u32, InFlightIssue>>> =
            Arc::new(Mutex::new(HashMap::new()));

        // Issue 1 is already being worked on.
        in_flight.lock().await.insert(
            1,
            InFlightIssue {
                number: 1,
                title: "Already running".to_string(),
                area: "auth".to_string(),
                predicted_files: vec!["src/auth.rs".to_string()],
                has_migration: false,
                complexity: Complexity::Full,
            },
        );

        let issues = vec![
            PipelineIssue {
                number: 1,
                title: "Already running".to_string(),
                body: String::new(),
                source: IssueOrigin::Github,
                target_repo: None,
            },
            PipelineIssue {
                number: 2,
                title: "New issue".to_string(),
                body: String::new(),
                source: IssueOrigin::Github,
                target_repo: None,
            },
            PipelineIssue {
                number: 3,
                title: "Another new".to_string(),
                body: String::new(),
                source: IssueOrigin::Github,
                target_repo: None,
            },
        ];

        let guard = in_flight.lock().await;
        let new_issues: Vec<_> =
            issues.into_iter().filter(|i| !guard.contains_key(&i.number)).collect();
        drop(guard);

        // Only issues 2 and 3 survive the filter.
        assert_eq!(new_issues.len(), 2);
        assert_eq!(new_issues[0].number, 2);
        assert_eq!(new_issues[1].number, 3);
    }
629
    /// `handle_task_result` only logs; a successful result must not panic.
    #[test]
    fn handle_task_result_does_not_panic_on_success() {
        handle_task_result(Ok((1, Ok(()))));
    }
634
    /// A pipeline error result is logged, never propagated as a panic.
    #[test]
    fn handle_task_result_does_not_panic_on_error() {
        handle_task_result(Ok((1, Err(anyhow::anyhow!("test error")))));
    }
639
    /// Batch-1 issues land in the spawn map with their planner metadata;
    /// batch-2 issues are deferred awaiting every batch-1 issue.
    #[test]
    fn split_plan_separates_batches() {
        let plan = PlannerOutput {
            batches: vec![
                Batch {
                    batch: 1,
                    issues: vec![
                        PlannedIssue {
                            number: 1,
                            title: "First".to_string(),
                            area: "cli".to_string(),
                            predicted_files: vec!["src/cli.rs".to_string()],
                            has_migration: false,
                            complexity: Complexity::Simple,
                        },
                        PlannedIssue {
                            number: 2,
                            title: "Second".to_string(),
                            area: "config".to_string(),
                            predicted_files: vec!["src/config.rs".to_string()],
                            has_migration: false,
                            complexity: Complexity::Full,
                        },
                    ],
                    reasoning: "independent".to_string(),
                },
                Batch {
                    batch: 2,
                    issues: vec![PlannedIssue {
                        number: 3,
                        title: "Third".to_string(),
                        area: "db".to_string(),
                        predicted_files: vec!["src/db.rs".to_string()],
                        has_migration: true,
                        complexity: Complexity::Full,
                    }],
                    reasoning: "depends on batch 1".to_string(),
                },
            ],
            total_issues: 3,
            parallel_capacity: 2,
        };

        let (spawn_map, defer_list) = split_plan(&plan, &HashSet::new());

        // Planner metadata is carried through into the spawn map.
        assert_eq!(spawn_map.len(), 2);
        assert_eq!(spawn_map.get(&1).unwrap().complexity, Complexity::Simple);
        assert_eq!(spawn_map.get(&1).unwrap().area, "cli");
        assert_eq!(spawn_map.get(&2).unwrap().complexity, Complexity::Full);

        // Issue 3 waits on exactly the batch-1 issues.
        assert_eq!(defer_list.len(), 1);
        let (num, meta, awaiting) = &defer_list[0];
        assert_eq!(*num, 3);
        assert_eq!(meta.area, "db");
        assert!(awaiting.contains(&1));
        assert!(awaiting.contains(&2));
        assert_eq!(awaiting.len(), 2);
    }
698
    /// An empty plan yields nothing to spawn and nothing to defer.
    #[test]
    fn split_plan_empty() {
        let plan = PlannerOutput { batches: vec![], total_issues: 0, parallel_capacity: 0 };
        let (spawn_map, defer_list) = split_plan(&plan, &HashSet::new());
        assert!(spawn_map.is_empty());
        assert!(defer_list.is_empty());
    }
706
    /// Deferred issues must also wait on pipelines that were already running
    /// before this plan (`in_flight_numbers`), not just on earlier batches.
    #[test]
    fn split_plan_includes_in_flight_in_awaiting() {
        let plan = PlannerOutput {
            batches: vec![
                Batch {
                    batch: 1,
                    issues: vec![PlannedIssue {
                        number: 5,
                        title: "New".to_string(),
                        area: "cli".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Simple,
                    }],
                    reasoning: "ok".to_string(),
                },
                Batch {
                    batch: 2,
                    issues: vec![PlannedIssue {
                        number: 6,
                        title: "Depends".to_string(),
                        area: "db".to_string(),
                        predicted_files: vec![],
                        has_migration: true,
                        complexity: Complexity::Full,
                    }],
                    reasoning: "conflicts".to_string(),
                },
            ],
            total_issues: 2,
            parallel_capacity: 1,
        };

        // Issues 10 and 11 are already running when the plan is applied.
        let in_flight_nums: HashSet<u32> = [10, 11].into_iter().collect();
        let (spawn_map, defer_list) = split_plan(&plan, &in_flight_nums);

        assert_eq!(spawn_map.len(), 1);
        assert!(spawn_map.contains_key(&5));

        // Issue 6 waits on both in-flight issues and the batch-1 issue.
        assert_eq!(defer_list.len(), 1);
        let (num, _, awaiting) = &defer_list[0];
        assert_eq!(*num, 6);
        assert!(awaiting.contains(&10));
        assert!(awaiting.contains(&11));
        assert!(awaiting.contains(&5));
        assert_eq!(awaiting.len(), 3);
    }
754
    /// With three batches, each deferred issue awaits all earlier batches but
    /// never its own: batch 2 awaits {1}, batch 3 awaits {1, 2}.
    #[test]
    fn split_plan_three_batches_chain_awaiting() {
        let plan = PlannerOutput {
            batches: vec![
                Batch {
                    batch: 1,
                    issues: vec![PlannedIssue {
                        number: 1,
                        title: "A".to_string(),
                        area: "a".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Simple,
                    }],
                    reasoning: String::new(),
                },
                Batch {
                    batch: 2,
                    issues: vec![PlannedIssue {
                        number: 2,
                        title: "B".to_string(),
                        area: "b".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Full,
                    }],
                    reasoning: String::new(),
                },
                Batch {
                    batch: 3,
                    issues: vec![PlannedIssue {
                        number: 3,
                        title: "C".to_string(),
                        area: "c".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Full,
                    }],
                    reasoning: String::new(),
                },
            ],
            total_issues: 3,
            parallel_capacity: 1,
        };

        let (spawn_map, defer_list) = split_plan(&plan, &HashSet::new());

        assert_eq!(spawn_map.len(), 1);
        assert!(spawn_map.contains_key(&1));

        assert_eq!(defer_list.len(), 2);
        let (_, _, awaiting_2) = &defer_list[0];
        assert_eq!(*awaiting_2, HashSet::from([1]));
        let (_, _, awaiting_3) = &defer_list[1];
        assert_eq!(*awaiting_3, HashSet::from([1, 2]));
    }
811
    /// A polled issue that is either running (in-flight) or parked (deferred)
    /// must not be treated as new — mirrors the full filter in
    /// `poll_and_spawn`.
    #[tokio::test]
    async fn deferred_issues_filtered_from_new_issues() {
        let in_flight: Arc<Mutex<HashMap<u32, InFlightIssue>>> =
            Arc::new(Mutex::new(HashMap::new()));
        let deferred: Arc<Mutex<HashMap<u32, DeferredIssue>>> =
            Arc::new(Mutex::new(HashMap::new()));

        // Issue 1 is running.
        in_flight.lock().await.insert(
            1,
            InFlightIssue {
                number: 1,
                title: "Running".to_string(),
                area: "auth".to_string(),
                predicted_files: vec![],
                has_migration: false,
                complexity: Complexity::Full,
            },
        );

        // Issue 2 is deferred, waiting on issue 1.
        deferred.lock().await.insert(
            2,
            DeferredIssue {
                issue: PipelineIssue {
                    number: 2,
                    title: "Waiting".to_string(),
                    body: String::new(),
                    source: IssueOrigin::Github,
                    target_repo: None,
                },
                metadata: InFlightIssue {
                    number: 2,
                    title: "Waiting".to_string(),
                    area: "db".to_string(),
                    predicted_files: vec![],
                    has_migration: false,
                    complexity: Complexity::Full,
                },
                awaiting: HashSet::from([1]),
            },
        );

        let issues = vec![
            PipelineIssue {
                number: 1,
                title: "Running".to_string(),
                body: String::new(),
                source: IssueOrigin::Github,
                target_repo: None,
            },
            PipelineIssue {
                number: 2,
                title: "Waiting".to_string(),
                body: String::new(),
                source: IssueOrigin::Github,
                target_repo: None,
            },
            PipelineIssue {
                number: 3,
                title: "New".to_string(),
                body: String::new(),
                source: IssueOrigin::Github,
                target_repo: None,
            },
        ];

        let ifl = in_flight.lock().await;
        let def = deferred.lock().await;
        let new_issues: Vec<_> = issues
            .into_iter()
            .filter(|i| !ifl.contains_key(&i.number) && !def.contains_key(&i.number))
            .collect();
        drop(ifl);
        drop(def);

        // Only issue 3 is genuinely new.
        assert_eq!(new_issues.len(), 1);
        assert_eq!(new_issues[0].number, 3);
    }
889
    /// A deferred issue becomes promotable only after *every* number in its
    /// `awaiting` set is removed, not after the first one.
    #[tokio::test]
    async fn deferred_promotion_when_awaiting_clears() {
        let deferred: Arc<Mutex<HashMap<u32, DeferredIssue>>> =
            Arc::new(Mutex::new(HashMap::new()));

        // Issue 3 waits on issues 1 and 2.
        deferred.lock().await.insert(
            3,
            DeferredIssue {
                issue: PipelineIssue {
                    number: 3,
                    title: "Deferred".to_string(),
                    body: String::new(),
                    source: IssueOrigin::Github,
                    target_repo: None,
                },
                metadata: InFlightIssue {
                    number: 3,
                    title: "Deferred".to_string(),
                    area: "db".to_string(),
                    predicted_files: vec![],
                    has_migration: true,
                    complexity: Complexity::Full,
                },
                awaiting: HashSet::from([1, 2]),
            },
        );

        // Dependency 1 completes (same cleanup `spawn_issues` performs).
        {
            let mut guard = deferred.lock().await;
            for d in guard.values_mut() {
                d.awaiting.remove(&1);
            }
        }

        assert!(
            deferred.lock().await.values().all(|d| !d.awaiting.is_empty()),
            "should not be promotable yet"
        );

        // Dependency 2 completes.
        {
            let mut guard = deferred.lock().await;
            for d in guard.values_mut() {
                d.awaiting.remove(&2);
            }
        }

        // Now the awaiting set is empty and issue 3 is promotable.
        {
            let guard = deferred.lock().await;
            let promotable: Vec<u32> =
                guard.iter().filter(|(_, d)| d.awaiting.is_empty()).map(|(n, _)| *n).collect();
            assert_eq!(promotable, vec![3]);
            drop(guard);
        }
    }
948
    /// `clean_stale_deferred` removes deferred issues that are no longer in
    /// the ready set and scrubs their numbers from other issues' awaiting
    /// sets, while leaving still-pending dependencies (issue 1) intact.
    #[tokio::test]
    async fn stale_deferred_issues_cleaned_up() {
        let deferred: Arc<Mutex<HashMap<u32, DeferredIssue>>> =
            Arc::new(Mutex::new(HashMap::new()));

        {
            let mut guard = deferred.lock().await;
            guard.insert(
                2,
                DeferredIssue {
                    issue: PipelineIssue {
                        number: 2,
                        title: "Two".to_string(),
                        body: String::new(),
                        source: IssueOrigin::Github,
                        target_repo: None,
                    },
                    metadata: InFlightIssue {
                        number: 2,
                        title: "Two".to_string(),
                        area: "a".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Full,
                    },
                    awaiting: HashSet::from([1]),
                },
            );
            guard.insert(
                3,
                DeferredIssue {
                    issue: PipelineIssue {
                        number: 3,
                        title: "Three".to_string(),
                        body: String::new(),
                        source: IssueOrigin::Github,
                        target_repo: None,
                    },
                    metadata: InFlightIssue {
                        number: 3,
                        title: "Three".to_string(),
                        area: "b".to_string(),
                        predicted_files: vec![],
                        has_migration: false,
                        complexity: Complexity::Full,
                    },
                    awaiting: HashSet::from([1, 2]),
                },
            );
        }

        // Only issue 3 is still ready; deferred issue 2 has gone stale.
        let ready_numbers: HashSet<u32> = HashSet::from([3]);
        clean_stale_deferred(&deferred, &ready_numbers).await;

        let guard = deferred.lock().await;
        assert!(!guard.contains_key(&2));
        let d3 = guard.get(&3).unwrap();
        let has_2 = d3.awaiting.contains(&2);
        let has_1 = d3.awaiting.contains(&1);
        drop(guard);
        // Stale issue 2 was scrubbed from issue 3's awaiting set; issue 1,
        // which was never deferred, remains a pending dependency.
        assert!(!has_2);
        assert!(has_1);
    }
1013
    /// When the Claude mock returns unparseable planner output,
    /// `plan_issues` yields `None` — the value that makes `run_batch` and
    /// `poll_and_spawn` take their all-parallel / spawn-immediately fallback.
    #[tokio::test]
    async fn planner_failure_falls_back_to_all_parallel() {
        let mut mock = MockCommandRunner::new();
        mock.expect_run_gh().returning(|_, _| {
            Box::pin(async {
                Ok(CommandOutput { stdout: String::new(), stderr: String::new(), success: true })
            })
        });
        // Agent reply that is not valid planner JSON.
        mock.expect_run_claude().returning(|_, _, _, _| {
            Box::pin(async {
                Ok(AgentResult {
                    cost_usd: 0.5,
                    duration: Duration::from_secs(2),
                    turns: 1,
                    output: "I don't know how to plan".to_string(),
                    session_id: "sess-plan".to_string(),
                    success: true,
                })
            })
        });

        let runner = Arc::new(mock);
        let github = Arc::new(GhClient::new(mock_runner_for_batch(), std::path::Path::new("/tmp")));
        let issues_provider = make_github_provider(&github);
        let db = Arc::new(Mutex::new(crate::db::open_in_memory().unwrap()));

        let executor = Arc::new(PipelineExecutor {
            runner,
            github,
            issues: issues_provider,
            db,
            config: Config::default(),
            cancel_token: CancellationToken::new(),
            repo_dir: PathBuf::from("/tmp"),
        });

        let issues = vec![PipelineIssue {
            number: 1,
            title: "Test".to_string(),
            body: "body".to_string(),
            source: IssueOrigin::Github,
            target_repo: None,
        }];

        let plan = executor.plan_issues(&issues, &[]).await;
        assert!(plan.is_none());
    }
1062}