1use std::collections::{HashMap, HashSet};
20use std::path::Path;
21use std::thread;
22use std::time::{Duration, Instant};
23
24use anyhow::Result;
25use colored::Colorize;
26
27use crate::commands::spawn::agent;
28use crate::commands::spawn::terminal::{self, Harness};
29use crate::models::phase::Phase;
30use crate::models::task::{Task, TaskStatus};
31use crate::storage::Storage;
32
33use super::events::EventWriter;
34use super::session::{RoundState, SwarmSession};
35
36pub struct BeadsConfig {
38 pub max_concurrent: usize,
40 pub poll_interval: Duration,
42}
43
44impl Default for BeadsConfig {
45 fn default() -> Self {
46 Self {
47 max_concurrent: 5,
48 poll_interval: Duration::from_secs(3),
49 }
50 }
51}
52
53#[derive(Clone, Debug)]
55pub struct ReadyTask {
56 pub task: Task,
57 pub tag: String,
58}
59
60pub struct BeadsResult {
62 pub tasks_completed: usize,
63 pub tasks_failed: usize,
64 pub total_duration: Duration,
65}
66
67pub fn get_ready_tasks(
75 all_phases: &HashMap<String, Phase>,
76 phase_tag: &str,
77 all_tags: bool,
78) -> Vec<ReadyTask> {
79 let mut ready = Vec::new();
80
81 let all_task_refs: Vec<&Task> = all_phases.values().flat_map(|p| &p.tasks).collect();
83
84 let phase_tags: Vec<&String> = if all_tags {
86 all_phases.keys().collect()
87 } else {
88 all_phases
89 .keys()
90 .filter(|t| t.as_str() == phase_tag)
91 .collect()
92 };
93
94 for tag in phase_tags {
95 if let Some(phase) = all_phases.get(tag) {
96 for task in &phase.tasks {
97 if is_task_ready(task, phase, &all_task_refs) {
98 ready.push(ReadyTask {
99 task: task.clone(),
100 tag: tag.clone(),
101 });
102 }
103 }
104 }
105 }
106
107 ready.sort_by(|a, b| {
109 use crate::models::task::Priority;
110 let priority_ord = |p: &Priority| match p {
111 Priority::Critical => 0,
112 Priority::High => 1,
113 Priority::Medium => 2,
114 Priority::Low => 3,
115 };
116 priority_ord(&a.task.priority)
117 .cmp(&priority_ord(&b.task.priority))
118 .then_with(|| a.task.id.cmp(&b.task.id))
119 });
120
121 ready
122}
123
124fn is_task_ready(task: &Task, phase: &Phase, all_tasks: &[&Task]) -> bool {
126 if task.status != TaskStatus::Pending {
128 return false;
129 }
130
131 if task.is_expanded() {
133 return false;
134 }
135
136 if let Some(ref parent_id) = task.parent_id {
138 let parent_expanded = phase
139 .get_task(parent_id)
140 .map(|p| p.is_expanded())
141 .unwrap_or(false);
142 if !parent_expanded {
143 return false;
144 }
145 }
146
147 task.has_dependencies_met_refs(all_tasks)
150}
151
152pub fn count_in_progress(
154 all_phases: &HashMap<String, Phase>,
155 phase_tag: &str,
156 all_tags: bool,
157) -> usize {
158 let tags: Vec<&String> = if all_tags {
159 all_phases.keys().collect()
160 } else {
161 all_phases
162 .keys()
163 .filter(|t| t.as_str() == phase_tag)
164 .collect()
165 };
166
167 tags.iter()
168 .filter_map(|tag| all_phases.get(*tag))
169 .flat_map(|phase| &phase.tasks)
170 .filter(|t| t.status == TaskStatus::InProgress)
171 .count()
172}
173
174pub fn count_remaining(
176 all_phases: &HashMap<String, Phase>,
177 phase_tag: &str,
178 all_tags: bool,
179) -> usize {
180 let tags: Vec<&String> = if all_tags {
181 all_phases.keys().collect()
182 } else {
183 all_phases
184 .keys()
185 .filter(|t| t.as_str() == phase_tag)
186 .collect()
187 };
188
189 tags.iter()
190 .filter_map(|tag| all_phases.get(*tag))
191 .flat_map(|phase| &phase.tasks)
192 .filter(|t| {
193 t.status == TaskStatus::InProgress
194 || (t.status == TaskStatus::Pending && !t.is_expanded())
195 })
196 .count()
197}
198
199pub fn claim_task(storage: &Storage, task_id: &str, tag: &str) -> Result<bool> {
201 let mut phase = storage.load_group(tag)?;
202
203 if let Some(task) = phase.get_task_mut(task_id) {
204 if task.status == TaskStatus::Pending {
206 task.set_status(TaskStatus::InProgress);
207 storage.update_group(tag, &phase)?;
208 return Ok(true);
209 }
210 }
211
212 Ok(false)
213}
214
215pub fn spawn_agent_tmux(
217 ready_task: &ReadyTask,
218 working_dir: &Path,
219 session_name: &str,
220 default_harness: Harness,
221) -> Result<String> {
222 let config = agent::resolve_agent_config(
224 &ready_task.task,
225 &ready_task.tag,
226 default_harness,
227 None,
228 working_dir,
229 );
230
231 let window_index = terminal::spawn_terminal_with_harness_and_model(
233 &ready_task.task.id,
234 &config.prompt,
235 working_dir,
236 session_name,
237 config.harness,
238 config.model.as_deref(),
239 )?;
240
241 Ok(format!("{}:{}", session_name, window_index))
242}
243
244#[allow(clippy::too_many_arguments)]
250pub fn run_beads_loop(
251 storage: &Storage,
252 phase_tag: &str,
253 all_tags: bool,
254 working_dir: &Path,
255 session_name: &str,
256 default_harness: Harness,
257 config: &BeadsConfig,
258 session: &mut SwarmSession,
259) -> Result<BeadsResult> {
260 let start_time = Instant::now();
261 let mut tasks_completed = 0;
262 let mut tasks_failed = 0;
263 let mut spawned_tasks: HashSet<String> = HashSet::new();
264 let mut spawned_times: HashMap<String, Instant> = HashMap::new();
265 let mut round_state = RoundState::new(0); let event_writer = EventWriter::new(working_dir, session_name)
269 .map_err(|e| anyhow::anyhow!("Failed to initialize event writer: {}", e))?;
270
271 println!();
272 println!("{}", "Beads Execution Mode".cyan().bold());
273 println!("{}", "═".repeat(50));
274 println!(" {} Continuous ready-task polling", "Mode:".dimmed());
275 if let Some(session_file) = event_writer.session_file() {
276 println!(
277 " {} {}",
278 "Event log:".dimmed(),
279 session_file.display().to_string().dimmed()
280 );
281 }
282 println!(
283 " {} {}",
284 "Max concurrent:".dimmed(),
285 config.max_concurrent.to_string().cyan()
286 );
287 println!(
288 " {} {}ms",
289 "Poll interval:".dimmed(),
290 config.poll_interval.as_millis().to_string().cyan()
291 );
292 println!();
293
294 loop {
295 let all_phases = storage.load_tasks()?;
297
298 let in_progress = count_in_progress(&all_phases, phase_tag, all_tags);
300 let remaining = count_remaining(&all_phases, phase_tag, all_tags);
301
302 if remaining == 0 {
304 println!();
305 println!("{}", "All tasks complete!".green().bold());
306 break;
307 }
308
309 let ready_tasks = get_ready_tasks(&all_phases, phase_tag, all_tags);
311
312 let ready_tasks: Vec<_> = ready_tasks
314 .into_iter()
315 .filter(|rt| !spawned_tasks.contains(&rt.task.id))
316 .collect();
317
318 if ready_tasks.is_empty() {
319 if in_progress > 0 {
320 print!(
322 "\r {} {} task(s) in progress, waiting... ",
323 "⏳".dimmed(),
324 in_progress.to_string().cyan()
325 );
326 std::io::Write::flush(&mut std::io::stdout())?;
327 thread::sleep(config.poll_interval);
328 continue;
329 } else {
330 println!();
332 println!("{}", "No ready tasks and none in progress.".yellow());
333 println!(
334 " {} {} remaining task(s) may be blocked.",
335 "!".yellow(),
336 remaining
337 );
338 println!(" Check for circular dependencies or missing dependencies.");
339 break;
340 }
341 }
342
343 print!("\r{}\r", " ".repeat(60));
345
346 let available_slots = config.max_concurrent.saturating_sub(in_progress);
348 let to_spawn = ready_tasks.into_iter().take(available_slots);
349
350 for ready_task in to_spawn {
352 if !claim_task(storage, &ready_task.task.id, &ready_task.tag)? {
354 continue;
356 }
357
358 spawned_tasks.insert(ready_task.task.id.clone());
360 spawned_times.insert(ready_task.task.id.clone(), Instant::now());
361
362 if let Err(e) = event_writer.log_spawned(&ready_task.task.id) {
364 eprintln!("Warning: Failed to log spawn event: {}", e);
365 }
366
367 match spawn_agent_tmux(&ready_task, working_dir, session_name, default_harness) {
369 Ok(window_info) => {
370 println!(
371 " {} Spawned: {} | {} [{}]",
372 "✓".green(),
373 ready_task.task.id.cyan(),
374 ready_task.task.title.dimmed(),
375 window_info.dimmed()
376 );
377 round_state.task_ids.push(ready_task.task.id.clone());
378 round_state.tags.push(ready_task.tag.clone());
379 }
380 Err(e) => {
381 println!(
382 " {} Failed: {} - {}",
383 "✗".red(),
384 ready_task.task.id.red(),
385 e
386 );
387 round_state.failures.push(ready_task.task.id.clone());
388 tasks_failed += 1;
389
390 if let Err(log_err) = event_writer.log_completed(&ready_task.task.id, false, 0)
392 {
393 eprintln!("Warning: Failed to log completion event: {}", log_err);
394 }
395
396 if let Ok(mut phase) = storage.load_group(&ready_task.tag) {
398 if let Some(task) = phase.get_task_mut(&ready_task.task.id) {
399 task.set_status(TaskStatus::Failed);
400 let _ = storage.update_group(&ready_task.tag, &phase);
401 }
402 }
403 }
404 }
405 }
406
407 let mut newly_completed: Vec<(String, bool)> = Vec::new();
409 for task_id in &spawned_tasks {
410 if !spawned_times.contains_key(task_id) {
412 continue;
413 }
414 for phase in all_phases.values() {
415 if let Some(task) = phase.get_task(task_id) {
416 match task.status {
417 TaskStatus::Done => {
418 newly_completed.push((task_id.clone(), true));
419 }
420 TaskStatus::Failed => {
421 newly_completed.push((task_id.clone(), false));
422 }
423 _ => {}
424 }
425 break;
426 }
427 }
428 }
429
430 for (task_id, success) in newly_completed {
432 if let Some(spawn_time) = spawned_times.remove(&task_id) {
433 spawned_tasks.remove(&task_id);
435
436 let duration_ms = spawn_time.elapsed().as_millis() as u64;
437 if let Err(e) = event_writer.log_completed(&task_id, success, duration_ms) {
438 eprintln!("Warning: Failed to log completion: {}", e);
439 }
440 if success {
441 tasks_completed += 1;
442 println!(
443 " {} Completed: {} ({}ms)",
444 "✓".green(),
445 task_id.cyan(),
446 duration_ms
447 );
448
449 for phase in all_phases.values() {
452 for potential_unblocked in &phase.tasks {
453 if potential_unblocked.status == TaskStatus::Pending
454 && potential_unblocked.dependencies.contains(&task_id)
455 {
456 if let Err(e) =
457 event_writer.log_unblocked(&potential_unblocked.id, &task_id)
458 {
459 eprintln!("Warning: Failed to log unblock: {}", e);
460 }
461 }
462 }
463 }
464 } else {
465 tasks_failed += 1;
466 }
467 }
468 }
469
470 if in_progress >= config.max_concurrent {
472 thread::sleep(config.poll_interval);
473 } else {
474 thread::sleep(Duration::from_millis(100));
476 }
477 }
478
479 let mut wave_state = super::session::WaveState::new(1);
481 wave_state.rounds.push(round_state);
482 session.waves.push(wave_state);
483
484 Ok(BeadsResult {
485 tasks_completed,
486 tasks_failed,
487 total_duration: start_time.elapsed(),
488 })
489}
490
491#[cfg(test)]
495mod tests {
496 use super::*;
497 use crate::models::task::Priority;
498 use tempfile::TempDir;
499
500 fn create_test_task(id: &str, status: TaskStatus, deps: Vec<&str>) -> Task {
501 let mut task = Task::new(
502 id.to_string(),
503 format!("Task {}", id),
504 "Description".to_string(),
505 );
506 task.status = status;
507 task.dependencies = deps.into_iter().map(String::from).collect();
508 task
509 }
510
511 fn setup_storage_with_phase(phase: &Phase, tag: &str) -> (TempDir, Storage) {
512 let temp_dir = TempDir::new().unwrap();
513 let storage = Storage::new(Some(temp_dir.path().to_path_buf()));
514 storage.update_group(tag, phase).unwrap();
515 (temp_dir, storage)
516 }
517
518 #[test]
519 fn test_get_ready_tasks_no_deps() {
520 let mut phase = Phase::new("test".to_string());
521 phase
522 .tasks
523 .push(create_test_task("1", TaskStatus::Pending, vec![]));
524 phase
525 .tasks
526 .push(create_test_task("2", TaskStatus::Pending, vec![]));
527
528 let mut phases = HashMap::new();
529 phases.insert("test".to_string(), phase);
530
531 let ready = get_ready_tasks(&phases, "test", false);
532 assert_eq!(ready.len(), 2);
533 }
534
535 #[test]
536 fn test_get_ready_tasks_with_deps_met() {
537 let mut phase = Phase::new("test".to_string());
538 phase
539 .tasks
540 .push(create_test_task("1", TaskStatus::Done, vec![]));
541 phase
542 .tasks
543 .push(create_test_task("2", TaskStatus::Pending, vec!["1"]));
544
545 let mut phases = HashMap::new();
546 phases.insert("test".to_string(), phase);
547
548 let ready = get_ready_tasks(&phases, "test", false);
549 assert_eq!(ready.len(), 1);
550 assert_eq!(ready[0].task.id, "2");
551 }
552
553 #[test]
554 fn test_get_ready_tasks_with_deps_not_met() {
555 let mut phase = Phase::new("test".to_string());
556 phase
557 .tasks
558 .push(create_test_task("1", TaskStatus::InProgress, vec![]));
559 phase
560 .tasks
561 .push(create_test_task("2", TaskStatus::Pending, vec!["1"]));
562
563 let mut phases = HashMap::new();
564 phases.insert("test".to_string(), phase);
565
566 let ready = get_ready_tasks(&phases, "test", false);
567 assert_eq!(ready.len(), 0);
568 }
569
570 #[test]
571 fn test_get_ready_tasks_skips_expanded() {
572 let mut phase = Phase::new("test".to_string());
573 let mut expanded_task = create_test_task("1", TaskStatus::Expanded, vec![]);
574 expanded_task.subtasks = vec!["1.1".to_string()];
575 phase.tasks.push(expanded_task);
576
577 let mut subtask = create_test_task("1.1", TaskStatus::Pending, vec![]);
578 subtask.parent_id = Some("1".to_string());
579 phase.tasks.push(subtask);
580
581 let mut phases = HashMap::new();
582 phases.insert("test".to_string(), phase);
583
584 let ready = get_ready_tasks(&phases, "test", false);
585 assert_eq!(ready.len(), 1);
586 assert_eq!(ready[0].task.id, "1.1");
587 }
588
589 #[test]
590 fn test_get_ready_tasks_priority_sort() {
591 let mut phase = Phase::new("test".to_string());
592
593 let mut low = create_test_task("low", TaskStatus::Pending, vec![]);
594 low.priority = Priority::Low;
595
596 let mut critical = create_test_task("critical", TaskStatus::Pending, vec![]);
597 critical.priority = Priority::Critical;
598
599 let mut high = create_test_task("high", TaskStatus::Pending, vec![]);
600 high.priority = Priority::High;
601
602 phase.tasks.push(low);
603 phase.tasks.push(critical);
604 phase.tasks.push(high);
605
606 let mut phases = HashMap::new();
607 phases.insert("test".to_string(), phase);
608
609 let ready = get_ready_tasks(&phases, "test", false);
610 assert_eq!(ready.len(), 3);
611 assert_eq!(ready[0].task.id, "critical");
612 assert_eq!(ready[1].task.id, "high");
613 assert_eq!(ready[2].task.id, "low");
614 }
615
616 #[test]
617 fn test_count_in_progress() {
618 let mut phase = Phase::new("test".to_string());
619 phase
620 .tasks
621 .push(create_test_task("1", TaskStatus::InProgress, vec![]));
622 phase
623 .tasks
624 .push(create_test_task("2", TaskStatus::InProgress, vec![]));
625 phase
626 .tasks
627 .push(create_test_task("3", TaskStatus::Pending, vec![]));
628 phase
629 .tasks
630 .push(create_test_task("4", TaskStatus::Done, vec![]));
631
632 let mut phases = HashMap::new();
633 phases.insert("test".to_string(), phase);
634
635 assert_eq!(count_in_progress(&phases, "test", false), 2);
636 }
637
638 #[test]
639 fn test_count_remaining() {
640 let mut phase = Phase::new("test".to_string());
641 phase
642 .tasks
643 .push(create_test_task("1", TaskStatus::InProgress, vec![]));
644 phase
645 .tasks
646 .push(create_test_task("2", TaskStatus::Pending, vec![]));
647 phase
648 .tasks
649 .push(create_test_task("3", TaskStatus::Done, vec![]));
650 phase
651 .tasks
652 .push(create_test_task("4", TaskStatus::Failed, vec![]));
653
654 let mut phases = HashMap::new();
655 phases.insert("test".to_string(), phase);
656
657 assert_eq!(count_remaining(&phases, "test", false), 2); }
659
660 #[test]
661 fn test_claim_task_pending() {
662 let mut phase = Phase::new("test".to_string());
663 phase
664 .tasks
665 .push(create_test_task("1", TaskStatus::Pending, vec![]));
666
667 let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
668
669 let claimed = claim_task(&storage, "1", "test").unwrap();
671 assert!(claimed);
672
673 let reloaded = storage.load_group("test").unwrap();
675 assert_eq!(
676 reloaded.get_task("1").unwrap().status,
677 TaskStatus::InProgress
678 );
679 }
680
681 #[test]
682 fn test_claim_task_already_in_progress() {
683 let mut phase = Phase::new("test".to_string());
684 phase
685 .tasks
686 .push(create_test_task("1", TaskStatus::InProgress, vec![]));
687
688 let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
689
690 let claimed = claim_task(&storage, "1", "test").unwrap();
692 assert!(!claimed);
693 }
694
695 #[test]
696 fn test_claim_task_nonexistent() {
697 let mut phase = Phase::new("test".to_string());
698 phase
699 .tasks
700 .push(create_test_task("1", TaskStatus::Pending, vec![]));
701
702 let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
703
704 let claimed = claim_task(&storage, "nonexistent", "test").unwrap();
706 assert!(!claimed);
707 }
708
709 #[test]
710 fn test_claim_task_already_done() {
711 let mut phase = Phase::new("test".to_string());
712 phase
713 .tasks
714 .push(create_test_task("1", TaskStatus::Done, vec![]));
715
716 let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
717
718 let claimed = claim_task(&storage, "1", "test").unwrap();
720 assert!(!claimed);
721 }
722}