1use std::collections::{HashMap, HashSet};
20use std::path::Path;
21use std::thread;
22use std::time::{Duration, Instant};
23
24use anyhow::Result;
25use colored::Colorize;
26
27use crate::commands::spawn::agent;
28use crate::commands::spawn::terminal::{self, Harness};
29use crate::models::phase::Phase;
30use crate::models::task::{Task, TaskStatus};
31use crate::storage::Storage;
32
33use super::events::EventWriter;
34use super::session::{RoundState, SwarmSession};
35
/// Tuning knobs for the beads execution loop (`run_beads_loop`).
///
/// Derives `Debug` and `Clone` so callers can log the effective configuration and
/// hand copies to other components.
#[derive(Debug, Clone)]
pub struct BeadsConfig {
    /// Upper bound on the number of tasks allowed to be in progress at once. The loop
    /// only spawns into the slots left over after subtracting the tasks already running.
    pub max_concurrent: usize,
    /// How long the loop sleeps between polls while it is waiting on running tasks.
    pub poll_interval: Duration,
}

impl Default for BeadsConfig {
    /// Defaults to at most 5 concurrent tasks and a 3 second poll interval.
    fn default() -> Self {
        Self {
            max_concurrent: 5,
            poll_interval: Duration::from_secs(3),
        }
    }
}
52
53#[derive(Clone, Debug)]
55pub struct ReadyTask {
56 pub task: Task,
57 pub tag: String,
58}
59
/// Summary returned by the beads execution loop once it stops.
///
/// Derives `Debug` and `Clone` so the summary can be logged and passed around freely.
#[derive(Debug, Clone)]
pub struct BeadsResult {
    /// Number of tasks the loop observed finishing successfully.
    pub tasks_completed: usize,
    /// Number of tasks that failed, either while spawning or as reported by storage.
    pub tasks_failed: usize,
    /// Wall-clock time between the start of the loop and its exit.
    pub total_duration: Duration,
}
66
67pub fn get_ready_tasks(
75 all_phases: &HashMap<String, Phase>,
76 phase_tag: &str,
77 all_tags: bool,
78) -> Vec<ReadyTask> {
79 let mut ready = Vec::new();
80
81 let all_task_refs: Vec<&Task> = all_phases.values().flat_map(|p| &p.tasks).collect();
83
84 let phase_tags: Vec<&String> = if all_tags {
86 all_phases.keys().collect()
87 } else {
88 all_phases
89 .keys()
90 .filter(|t| t.as_str() == phase_tag)
91 .collect()
92 };
93
94 for tag in phase_tags {
95 if let Some(phase) = all_phases.get(tag) {
96 for task in &phase.tasks {
97 if is_task_ready(task, phase, &all_task_refs) {
98 ready.push(ReadyTask {
99 task: task.clone(),
100 tag: tag.clone(),
101 });
102 }
103 }
104 }
105 }
106
107 ready.sort_by(|a, b| {
109 use crate::models::task::Priority;
110 let priority_ord = |p: &Priority| match p {
111 Priority::Critical => 0,
112 Priority::High => 1,
113 Priority::Medium => 2,
114 Priority::Low => 3,
115 };
116 priority_ord(&a.task.priority)
117 .cmp(&priority_ord(&b.task.priority))
118 .then_with(|| a.task.id.cmp(&b.task.id))
119 });
120
121 ready
122}
123
124fn is_task_ready(task: &Task, phase: &Phase, all_tasks: &[&Task]) -> bool {
126 if task.status != TaskStatus::Pending {
128 return false;
129 }
130
131 if task.is_expanded() {
133 return false;
134 }
135
136 if let Some(ref parent_id) = task.parent_id {
138 let parent_expanded = phase
139 .get_task(parent_id)
140 .map(|p| p.is_expanded())
141 .unwrap_or(false);
142 if !parent_expanded {
143 return false;
144 }
145 }
146
147 task.has_dependencies_met_refs(all_tasks)
150}
151
152pub fn count_in_progress(
154 all_phases: &HashMap<String, Phase>,
155 phase_tag: &str,
156 all_tags: bool,
157) -> usize {
158 let tags: Vec<&String> = if all_tags {
159 all_phases.keys().collect()
160 } else {
161 all_phases
162 .keys()
163 .filter(|t| t.as_str() == phase_tag)
164 .collect()
165 };
166
167 tags.iter()
168 .filter_map(|tag| all_phases.get(*tag))
169 .flat_map(|phase| &phase.tasks)
170 .filter(|t| t.status == TaskStatus::InProgress)
171 .count()
172}
173
174pub fn count_remaining(
176 all_phases: &HashMap<String, Phase>,
177 phase_tag: &str,
178 all_tags: bool,
179) -> usize {
180 let tags: Vec<&String> = if all_tags {
181 all_phases.keys().collect()
182 } else {
183 all_phases
184 .keys()
185 .filter(|t| t.as_str() == phase_tag)
186 .collect()
187 };
188
189 tags.iter()
190 .filter_map(|tag| all_phases.get(*tag))
191 .flat_map(|phase| &phase.tasks)
192 .filter(|t| {
193 t.status == TaskStatus::InProgress
194 || (t.status == TaskStatus::Pending && !t.is_expanded())
195 })
196 .count()
197}
198
199pub fn claim_task(storage: &Storage, task_id: &str, tag: &str) -> Result<bool> {
201 let mut phase = storage.load_group(tag)?;
202
203 if let Some(task) = phase.get_task_mut(task_id) {
204 if task.status == TaskStatus::Pending {
206 task.set_status(TaskStatus::InProgress);
207 storage.update_group(tag, &phase)?;
208 return Ok(true);
209 }
210 }
211
212 Ok(false)
213}
214
215pub fn spawn_agent_tmux(
217 ready_task: &ReadyTask,
218 working_dir: &Path,
219 session_name: &str,
220 default_harness: Harness,
221) -> Result<String> {
222 let config = agent::resolve_agent_config(
224 &ready_task.task,
225 &ready_task.tag,
226 default_harness,
227 None,
228 working_dir,
229 );
230
231 let window_index = terminal::spawn_terminal_with_harness_and_model(
233 &ready_task.task.id,
234 &config.prompt,
235 working_dir,
236 session_name,
237 config.harness,
238 config.model.as_deref(),
239 )?;
240
241 Ok(format!("{}:{}", session_name, window_index))
242}
243
244pub fn run_beads_loop(
250 storage: &Storage,
251 phase_tag: &str,
252 all_tags: bool,
253 working_dir: &Path,
254 session_name: &str,
255 default_harness: Harness,
256 config: &BeadsConfig,
257 session: &mut SwarmSession,
258) -> Result<BeadsResult> {
259 let start_time = Instant::now();
260 let mut tasks_completed = 0;
261 let mut tasks_failed = 0;
262 let mut spawned_tasks: HashSet<String> = HashSet::new();
263 let mut spawned_times: HashMap<String, Instant> = HashMap::new();
264 let mut round_state = RoundState::new(0); let event_writer = EventWriter::new(working_dir, session_name)
268 .map_err(|e| anyhow::anyhow!("Failed to initialize event writer: {}", e))?;
269
270 println!();
271 println!("{}", "Beads Execution Mode".cyan().bold());
272 println!("{}", "═".repeat(50));
273 println!(" {} Continuous ready-task polling", "Mode:".dimmed());
274 if let Some(session_file) = event_writer.session_file() {
275 println!(
276 " {} {}",
277 "Event log:".dimmed(),
278 session_file.display().to_string().dimmed()
279 );
280 }
281 println!(
282 " {} {}",
283 "Max concurrent:".dimmed(),
284 config.max_concurrent.to_string().cyan()
285 );
286 println!(
287 " {} {}ms",
288 "Poll interval:".dimmed(),
289 config.poll_interval.as_millis().to_string().cyan()
290 );
291 println!();
292
293 loop {
294 let all_phases = storage.load_tasks()?;
296
297 let in_progress = count_in_progress(&all_phases, phase_tag, all_tags);
299 let remaining = count_remaining(&all_phases, phase_tag, all_tags);
300
301 if remaining == 0 {
303 println!();
304 println!("{}", "All tasks complete!".green().bold());
305 break;
306 }
307
308 let ready_tasks = get_ready_tasks(&all_phases, phase_tag, all_tags);
310
311 let ready_tasks: Vec<_> = ready_tasks
313 .into_iter()
314 .filter(|rt| !spawned_tasks.contains(&rt.task.id))
315 .collect();
316
317 if ready_tasks.is_empty() {
318 if in_progress > 0 {
319 print!(
321 "\r {} {} task(s) in progress, waiting... ",
322 "⏳".dimmed(),
323 in_progress.to_string().cyan()
324 );
325 std::io::Write::flush(&mut std::io::stdout())?;
326 thread::sleep(config.poll_interval);
327 continue;
328 } else {
329 println!();
331 println!("{}", "No ready tasks and none in progress.".yellow());
332 println!(
333 " {} {} remaining task(s) may be blocked.",
334 "!".yellow(),
335 remaining
336 );
337 println!(" Check for circular dependencies or missing dependencies.");
338 break;
339 }
340 }
341
342 print!("\r{}\r", " ".repeat(60));
344
345 let available_slots = config.max_concurrent.saturating_sub(in_progress);
347 let to_spawn = ready_tasks.into_iter().take(available_slots);
348
349 for ready_task in to_spawn {
351 if !claim_task(storage, &ready_task.task.id, &ready_task.tag)? {
353 continue;
355 }
356
357 spawned_tasks.insert(ready_task.task.id.clone());
359 spawned_times.insert(ready_task.task.id.clone(), Instant::now());
360
361 if let Err(e) = event_writer.log_spawned(&ready_task.task.id) {
363 eprintln!("Warning: Failed to log spawn event: {}", e);
364 }
365
366 match spawn_agent_tmux(&ready_task, working_dir, session_name, default_harness) {
368 Ok(window_info) => {
369 println!(
370 " {} Spawned: {} | {} [{}]",
371 "✓".green(),
372 ready_task.task.id.cyan(),
373 ready_task.task.title.dimmed(),
374 window_info.dimmed()
375 );
376 round_state.task_ids.push(ready_task.task.id.clone());
377 round_state.tags.push(ready_task.tag.clone());
378 }
379 Err(e) => {
380 println!(
381 " {} Failed: {} - {}",
382 "✗".red(),
383 ready_task.task.id.red(),
384 e
385 );
386 round_state.failures.push(ready_task.task.id.clone());
387 tasks_failed += 1;
388
389 if let Err(log_err) = event_writer.log_completed(&ready_task.task.id, false, 0)
391 {
392 eprintln!("Warning: Failed to log completion event: {}", log_err);
393 }
394
395 if let Ok(mut phase) = storage.load_group(&ready_task.tag) {
397 if let Some(task) = phase.get_task_mut(&ready_task.task.id) {
398 task.set_status(TaskStatus::Failed);
399 let _ = storage.update_group(&ready_task.tag, &phase);
400 }
401 }
402 }
403 }
404 }
405
406 let mut newly_completed: Vec<(String, bool)> = Vec::new();
408 for task_id in &spawned_tasks {
409 if !spawned_times.contains_key(task_id) {
411 continue;
412 }
413 for phase in all_phases.values() {
414 if let Some(task) = phase.get_task(task_id) {
415 match task.status {
416 TaskStatus::Done => {
417 newly_completed.push((task_id.clone(), true));
418 }
419 TaskStatus::Failed => {
420 newly_completed.push((task_id.clone(), false));
421 }
422 _ => {}
423 }
424 break;
425 }
426 }
427 }
428
429 for (task_id, success) in newly_completed {
431 if let Some(spawn_time) = spawned_times.remove(&task_id) {
432 spawned_tasks.remove(&task_id);
434
435 let duration_ms = spawn_time.elapsed().as_millis() as u64;
436 if let Err(e) = event_writer.log_completed(&task_id, success, duration_ms) {
437 eprintln!("Warning: Failed to log completion: {}", e);
438 }
439 if success {
440 tasks_completed += 1;
441 println!(
442 " {} Completed: {} ({}ms)",
443 "✓".green(),
444 task_id.cyan(),
445 duration_ms
446 );
447
448 for phase in all_phases.values() {
451 for potential_unblocked in &phase.tasks {
452 if potential_unblocked.status == TaskStatus::Pending
453 && potential_unblocked.dependencies.contains(&task_id)
454 {
455 if let Err(e) =
456 event_writer.log_unblocked(&potential_unblocked.id, &task_id)
457 {
458 eprintln!("Warning: Failed to log unblock: {}", e);
459 }
460 }
461 }
462 }
463 } else {
464 tasks_failed += 1;
465 }
466 }
467 }
468
469 if in_progress >= config.max_concurrent {
471 thread::sleep(config.poll_interval);
472 } else {
473 thread::sleep(Duration::from_millis(100));
475 }
476 }
477
478 let mut wave_state = super::session::WaveState::new(1);
480 wave_state.rounds.push(round_state);
481 session.waves.push(wave_state);
482
483 Ok(BeadsResult {
484 tasks_completed,
485 tasks_failed,
486 total_duration: start_time.elapsed(),
487 })
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::task::Priority;
    use tempfile::TempDir;

    /// Builds a task with the given id, status and dependency ids.
    fn create_test_task(id: &str, status: TaskStatus, deps: Vec<&str>) -> Task {
        let mut task = Task::new(
            id.to_string(),
            format!("Task {}", id),
            "Description".to_string(),
        );
        task.status = status;
        task.dependencies = deps.into_iter().map(str::to_owned).collect();
        task
    }

    /// Builds the phase named "test" holding `tasks` in the given order.
    fn phase_of(tasks: Vec<Task>) -> Phase {
        let mut phase = Phase::new("test".to_string());
        for task in tasks {
            phase.tasks.push(task);
        }
        phase
    }

    /// Builds a phases map with a single entry, "test", holding `tasks`.
    fn phases_of(tasks: Vec<Task>) -> HashMap<String, Phase> {
        let mut phases = HashMap::new();
        phases.insert("test".to_string(), phase_of(tasks));
        phases
    }

    /// Creates storage in a fresh temp dir and stores `phase` under `tag`. The `TempDir`
    /// is returned so the directory outlives the storage handle.
    fn setup_storage_with_phase(phase: &Phase, tag: &str) -> (TempDir, Storage) {
        let temp_dir = TempDir::new().unwrap();
        let storage = Storage::new(Some(temp_dir.path().to_path_buf()));
        storage.update_group(tag, phase).unwrap();
        (temp_dir, storage)
    }

    /// Stores a single task with id "1" and the given status under "test".
    fn storage_with_single_task(status: TaskStatus) -> (TempDir, Storage) {
        let phase = phase_of(vec![create_test_task("1", status, vec![])]);
        setup_storage_with_phase(&phase, "test")
    }

    #[test]
    fn test_get_ready_tasks_no_deps() {
        let phases = phases_of(vec![
            create_test_task("1", TaskStatus::Pending, vec![]),
            create_test_task("2", TaskStatus::Pending, vec![]),
        ]);

        let ready = get_ready_tasks(&phases, "test", false);
        assert_eq!(ready.len(), 2);
    }

    #[test]
    fn test_get_ready_tasks_with_deps_met() {
        let phases = phases_of(vec![
            create_test_task("1", TaskStatus::Done, vec![]),
            create_test_task("2", TaskStatus::Pending, vec!["1"]),
        ]);

        let ready = get_ready_tasks(&phases, "test", false);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].task.id, "2");
    }

    #[test]
    fn test_get_ready_tasks_with_deps_not_met() {
        let phases = phases_of(vec![
            create_test_task("1", TaskStatus::InProgress, vec![]),
            create_test_task("2", TaskStatus::Pending, vec!["1"]),
        ]);

        let ready = get_ready_tasks(&phases, "test", false);
        assert_eq!(ready.len(), 0);
    }

    #[test]
    fn test_get_ready_tasks_skips_expanded() {
        let mut expanded_task = create_test_task("1", TaskStatus::Expanded, vec![]);
        expanded_task.subtasks = vec!["1.1".to_string()];

        let mut subtask = create_test_task("1.1", TaskStatus::Pending, vec![]);
        subtask.parent_id = Some("1".to_string());

        let phases = phases_of(vec![expanded_task, subtask]);

        // Only the subtask is runnable; the expanded parent itself is skipped.
        let ready = get_ready_tasks(&phases, "test", false);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].task.id, "1.1");
    }

    #[test]
    fn test_get_ready_tasks_priority_sort() {
        let pending_with_priority = |id: &str, priority: Priority| {
            let mut task = create_test_task(id, TaskStatus::Pending, vec![]);
            task.priority = priority;
            task
        };

        // Inserted out of order on purpose.
        let phases = phases_of(vec![
            pending_with_priority("low", Priority::Low),
            pending_with_priority("critical", Priority::Critical),
            pending_with_priority("high", Priority::High),
        ]);

        let ready = get_ready_tasks(&phases, "test", false);
        assert_eq!(ready.len(), 3);
        assert_eq!(ready[0].task.id, "critical");
        assert_eq!(ready[1].task.id, "high");
        assert_eq!(ready[2].task.id, "low");
    }

    #[test]
    fn test_count_in_progress() {
        let phases = phases_of(vec![
            create_test_task("1", TaskStatus::InProgress, vec![]),
            create_test_task("2", TaskStatus::InProgress, vec![]),
            create_test_task("3", TaskStatus::Pending, vec![]),
            create_test_task("4", TaskStatus::Done, vec![]),
        ]);

        assert_eq!(count_in_progress(&phases, "test", false), 2);
    }

    #[test]
    fn test_count_remaining() {
        let phases = phases_of(vec![
            create_test_task("1", TaskStatus::InProgress, vec![]),
            create_test_task("2", TaskStatus::Pending, vec![]),
            create_test_task("3", TaskStatus::Done, vec![]),
            create_test_task("4", TaskStatus::Failed, vec![]),
        ]);

        // One in progress plus one pending; done and failed tasks do not count.
        assert_eq!(count_remaining(&phases, "test", false), 2);
    }

    #[test]
    fn test_claim_task_pending() {
        let (_temp_dir, storage) = storage_with_single_task(TaskStatus::Pending);

        let claimed = claim_task(&storage, "1", "test").unwrap();
        assert!(claimed);

        // The claim must be persisted, not just applied in memory.
        let reloaded = storage.load_group("test").unwrap();
        assert_eq!(
            reloaded.get_task("1").unwrap().status,
            TaskStatus::InProgress
        );
    }

    #[test]
    fn test_claim_task_already_in_progress() {
        let (_temp_dir, storage) = storage_with_single_task(TaskStatus::InProgress);

        let claimed = claim_task(&storage, "1", "test").unwrap();
        assert!(!claimed);
    }

    #[test]
    fn test_claim_task_nonexistent() {
        let (_temp_dir, storage) = storage_with_single_task(TaskStatus::Pending);

        let claimed = claim_task(&storage, "nonexistent", "test").unwrap();
        assert!(!claimed);
    }

    #[test]
    fn test_claim_task_already_done() {
        let (_temp_dir, storage) = storage_with_single_task(TaskStatus::Done);

        let claimed = claim_task(&storage, "1", "test").unwrap();
        assert!(!claimed);
    }
}