1use std::collections::{HashMap, HashSet};
20use std::path::Path;
21use std::thread;
22use std::time::{Duration, Instant};
23
24use anyhow::Result;
25use colored::Colorize;
26
27use crate::commands::spawn::agent;
28use crate::commands::spawn::terminal::{self, Harness};
29use crate::commands::task_selection::{
30 count_in_progress_tasks, is_actionable_pending_task, scoped_phases,
31};
32use crate::models::phase::Phase;
33use crate::models::task::{Task, TaskStatus};
34use crate::storage::Storage;
35
36use super::events::EventWriter;
37use super::session::{RoundState, SwarmSession};
38
/// Tuning knobs for the beads execution loop.
///
/// `Debug` and `Clone` are derived so the configuration can be logged and
/// handed to more than one consumer without manual copying.
#[derive(Debug, Clone)]
pub struct BeadsConfig {
    /// Upper bound on simultaneously running tasks; the loop only spawns
    /// into the slots left after subtracting the in-progress count.
    pub max_concurrent: usize,
    /// How long the loop sleeps between storage polls while it is waiting
    /// for running tasks or for a free slot.
    pub poll_interval: Duration,
}
46
47impl Default for BeadsConfig {
48 fn default() -> Self {
49 Self {
50 max_concurrent: 5,
51 poll_interval: Duration::from_secs(3),
52 }
53 }
54}
55
56#[derive(Clone, Debug)]
58pub struct ReadyTask {
59 pub task: Task,
60 pub tag: String,
61}
62
/// Summary returned when the beads execution loop finishes.
///
/// `Debug` and `Clone` are derived so callers can log or store the summary.
#[derive(Debug, Clone)]
pub struct BeadsResult {
    /// Number of spawned tasks that were observed to reach `Done`.
    pub tasks_completed: usize,
    /// Number of tasks that failed to spawn or were observed as `Failed`.
    pub tasks_failed: usize,
    /// Wall-clock time from loop start to loop exit.
    pub total_duration: Duration,
}
69
70pub fn get_ready_tasks(
78 all_phases: &HashMap<String, Phase>,
79 phase_tag: &str,
80 all_tags: bool,
81) -> Vec<ReadyTask> {
82 let mut ready = Vec::new();
83
84 let all_task_refs: Vec<&Task> = all_phases.values().flat_map(|p| &p.tasks).collect();
86
87 let phase_tags: Vec<&String> = if all_tags {
89 all_phases.keys().collect()
90 } else {
91 all_phases
92 .keys()
93 .filter(|t| t.as_str() == phase_tag)
94 .collect()
95 };
96
97 for tag in phase_tags {
98 if let Some(phase) = all_phases.get(tag) {
99 for task in &phase.tasks {
100 if is_task_ready(task, phase, &all_task_refs) {
101 ready.push(ReadyTask {
102 task: task.clone(),
103 tag: tag.clone(),
104 });
105 }
106 }
107 }
108 }
109
110 ready.sort_by(|a, b| {
112 use crate::models::task::Priority;
113 let priority_ord = |p: &Priority| match p {
114 Priority::Critical => 0,
115 Priority::High => 1,
116 Priority::Medium => 2,
117 Priority::Low => 3,
118 };
119 priority_ord(&a.task.priority)
120 .cmp(&priority_ord(&b.task.priority))
121 .then_with(|| a.task.id.cmp(&b.task.id))
122 });
123
124 ready
125}
126
127fn is_task_ready(task: &Task, phase: &Phase, all_tasks: &[&Task]) -> bool {
129 if !is_actionable_pending_task(task, phase) {
130 return false;
131 }
132
133 task.has_dependencies_met_refs(all_tasks)
136}
137
/// Returns the number of in-progress tasks in the selected scope.
///
/// Thin wrapper that forwards its arguments unchanged to
/// `count_in_progress_tasks`; the scoping rules for `phase_tag` / `all_tags`
/// are whatever that helper implements.
pub fn count_in_progress(
    all_phases: &HashMap<String, Phase>,
    phase_tag: &str,
    all_tags: bool,
) -> usize {
    count_in_progress_tasks(all_phases, phase_tag, all_tags)
}
146
147pub fn count_remaining(
149 all_phases: &HashMap<String, Phase>,
150 phase_tag: &str,
151 all_tags: bool,
152) -> usize {
153 scoped_phases(all_phases, phase_tag, all_tags)
154 .into_iter()
155 .flat_map(|phase| &phase.tasks)
156 .filter(|t| {
157 t.status == TaskStatus::InProgress
158 || (t.status == TaskStatus::Pending && !t.is_expanded())
159 })
160 .count()
161}
162
163pub fn claim_task(storage: &Storage, task_id: &str, tag: &str) -> Result<bool> {
165 let mut phase = storage.load_group(tag)?;
166
167 if let Some(task) = phase.get_task_mut(task_id) {
168 if task.status == TaskStatus::Pending {
170 task.set_status(TaskStatus::InProgress);
171 storage.update_group(tag, &phase)?;
172 return Ok(true);
173 }
174 }
175
176 Ok(false)
177}
178
179pub fn spawn_agent_tmux(
181 ready_task: &ReadyTask,
182 working_dir: &Path,
183 session_name: &str,
184 default_harness: Harness,
185) -> Result<String> {
186 let config = agent::resolve_agent_config(
188 &ready_task.task,
189 &ready_task.tag,
190 default_harness,
191 None,
192 working_dir,
193 );
194
195 let spawn_config = terminal::SpawnConfig {
197 task_id: &ready_task.task.id,
198 prompt: &config.prompt,
199 working_dir,
200 session_name,
201 harness: config.harness,
202 model: config.model.as_deref(),
203 task_list_id: None,
204 };
205 let window_index = terminal::spawn_tmux_agent(&spawn_config)?;
206
207 Ok(format!("{}:{}", session_name, window_index))
208}
209
210#[allow(clippy::too_many_arguments)]
216pub fn run_beads_loop(
217 storage: &Storage,
218 phase_tag: &str,
219 all_tags: bool,
220 working_dir: &Path,
221 session_name: &str,
222 default_harness: Harness,
223 config: &BeadsConfig,
224 session: &mut SwarmSession,
225) -> Result<BeadsResult> {
226 let start_time = Instant::now();
227 let mut tasks_completed = 0;
228 let mut tasks_failed = 0;
229 let mut spawned_tasks: HashSet<String> = HashSet::new();
230 let mut spawned_times: HashMap<String, Instant> = HashMap::new();
231 let mut round_state = RoundState::new(0); let event_writer = EventWriter::new(working_dir, session_name)
235 .map_err(|e| anyhow::anyhow!("Failed to initialize event writer: {}", e))?;
236
237 println!();
238 println!("{}", "Beads Execution Mode".cyan().bold());
239 println!("{}", "═".repeat(50));
240 println!(" {} Continuous ready-task polling", "Mode:".dimmed());
241 if let Some(session_file) = event_writer.session_file() {
242 println!(
243 " {} {}",
244 "Event log:".dimmed(),
245 session_file.display().to_string().dimmed()
246 );
247 }
248 println!(
249 " {} {}",
250 "Max concurrent:".dimmed(),
251 config.max_concurrent.to_string().cyan()
252 );
253 println!(
254 " {} {}ms",
255 "Poll interval:".dimmed(),
256 config.poll_interval.as_millis().to_string().cyan()
257 );
258 println!();
259
260 loop {
261 let all_phases = storage.load_tasks()?;
263
264 let in_progress = count_in_progress(&all_phases, phase_tag, all_tags);
266 let remaining = count_remaining(&all_phases, phase_tag, all_tags);
267
268 if remaining == 0 {
270 println!();
271 println!("{}", "All tasks complete!".green().bold());
272 break;
273 }
274
275 let ready_tasks = get_ready_tasks(&all_phases, phase_tag, all_tags);
277
278 let ready_tasks: Vec<_> = ready_tasks
280 .into_iter()
281 .filter(|rt| !spawned_tasks.contains(&rt.task.id))
282 .collect();
283
284 if ready_tasks.is_empty() {
285 if in_progress > 0 {
286 print!(
288 "\r {} {} task(s) in progress, waiting... ",
289 "⏳".dimmed(),
290 in_progress.to_string().cyan()
291 );
292 std::io::Write::flush(&mut std::io::stdout())?;
293 thread::sleep(config.poll_interval);
294 continue;
295 } else {
296 println!();
298 println!("{}", "No ready tasks and none in progress.".yellow());
299 println!(
300 " {} {} remaining task(s) may be blocked.",
301 "!".yellow(),
302 remaining
303 );
304 println!(" Check for circular dependencies or missing dependencies.");
305 break;
306 }
307 }
308
309 print!("\r{}\r", " ".repeat(60));
311
312 let available_slots = config.max_concurrent.saturating_sub(in_progress);
314 let to_spawn = ready_tasks.into_iter().take(available_slots);
315
316 for ready_task in to_spawn {
318 if !claim_task(storage, &ready_task.task.id, &ready_task.tag)? {
320 continue;
322 }
323
324 spawned_tasks.insert(ready_task.task.id.clone());
326 spawned_times.insert(ready_task.task.id.clone(), Instant::now());
327
328 if let Err(e) = event_writer.log_spawned(&ready_task.task.id) {
330 eprintln!("Warning: Failed to log spawn event: {}", e);
331 }
332
333 match spawn_agent_tmux(&ready_task, working_dir, session_name, default_harness) {
335 Ok(window_info) => {
336 println!(
337 " {} Spawned: {} | {} [{}]",
338 "✓".green(),
339 ready_task.task.id.cyan(),
340 ready_task.task.title.dimmed(),
341 window_info.dimmed()
342 );
343 round_state.task_ids.push(ready_task.task.id.clone());
344 round_state.tags.push(ready_task.tag.clone());
345 }
346 Err(e) => {
347 println!(
348 " {} Failed: {} - {}",
349 "✗".red(),
350 ready_task.task.id.red(),
351 e
352 );
353 round_state.failures.push(ready_task.task.id.clone());
354 tasks_failed += 1;
355
356 if let Err(log_err) = event_writer.log_completed(&ready_task.task.id, false, 0)
358 {
359 eprintln!("Warning: Failed to log completion event: {}", log_err);
360 }
361
362 if let Ok(mut phase) = storage.load_group(&ready_task.tag) {
364 if let Some(task) = phase.get_task_mut(&ready_task.task.id) {
365 task.set_status(TaskStatus::Failed);
366 let _ = storage.update_group(&ready_task.tag, &phase);
367 }
368 }
369 }
370 }
371 }
372
373 let mut newly_completed: Vec<(String, bool)> = Vec::new();
375 for task_id in &spawned_tasks {
376 if !spawned_times.contains_key(task_id) {
378 continue;
379 }
380 for phase in all_phases.values() {
381 if let Some(task) = phase.get_task(task_id) {
382 match task.status {
383 TaskStatus::Done => {
384 newly_completed.push((task_id.clone(), true));
385 }
386 TaskStatus::Failed => {
387 newly_completed.push((task_id.clone(), false));
388 }
389 _ => {}
390 }
391 break;
392 }
393 }
394 }
395
396 for (task_id, success) in newly_completed {
398 if let Some(spawn_time) = spawned_times.remove(&task_id) {
399 spawned_tasks.remove(&task_id);
401
402 let duration_ms = spawn_time.elapsed().as_millis() as u64;
403 if let Err(e) = event_writer.log_completed(&task_id, success, duration_ms) {
404 eprintln!("Warning: Failed to log completion: {}", e);
405 }
406 if success {
407 tasks_completed += 1;
408 println!(
409 " {} Completed: {} ({}ms)",
410 "✓".green(),
411 task_id.cyan(),
412 duration_ms
413 );
414
415 for phase in all_phases.values() {
418 for potential_unblocked in &phase.tasks {
419 if potential_unblocked.status == TaskStatus::Pending
420 && potential_unblocked.dependencies.contains(&task_id)
421 {
422 if let Err(e) =
423 event_writer.log_unblocked(&potential_unblocked.id, &task_id)
424 {
425 eprintln!("Warning: Failed to log unblock: {}", e);
426 }
427 }
428 }
429 }
430 } else {
431 tasks_failed += 1;
432 }
433 }
434 }
435
436 if in_progress >= config.max_concurrent {
438 thread::sleep(config.poll_interval);
439 } else {
440 thread::sleep(Duration::from_millis(100));
442 }
443 }
444
445 let mut wave_state = super::session::WaveState::new(1);
447 wave_state.rounds.push(round_state);
448 session.waves.push(wave_state);
449
450 Ok(BeadsResult {
451 tasks_completed,
452 tasks_failed,
453 total_duration: start_time.elapsed(),
454 })
455}
456
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::task::Priority;
    use tempfile::TempDir;

    /// Tag shared by every single-phase fixture in this module.
    const TAG: &str = "test";

    /// Builds a task with the given id, status and dependency ids.
    fn create_test_task(id: &str, status: TaskStatus, deps: Vec<&str>) -> Task {
        let title = format!("Task {}", id);
        let mut task = Task::new(id.to_string(), title, "Description".to_string());
        task.status = status;
        task.dependencies = deps.into_iter().map(String::from).collect();
        task
    }

    /// Builds a phase named [`TAG`] holding `tasks` in the given order.
    fn phase_with(tasks: Vec<Task>) -> Phase {
        let mut phase = Phase::new(TAG.to_string());
        for task in tasks {
            phase.tasks.push(task);
        }
        phase
    }

    /// Builds a phase map with a single [`TAG`] entry holding `tasks`.
    fn phases_with(tasks: Vec<Task>) -> HashMap<String, Phase> {
        let mut phases = HashMap::new();
        phases.insert(TAG.to_string(), phase_with(tasks));
        phases
    }

    /// Ids of the ready tasks for the [`TAG`] phase, in dispatch order.
    fn ready_ids(phases: &HashMap<String, Phase>) -> Vec<String> {
        get_ready_tasks(phases, TAG, false)
            .into_iter()
            .map(|ready| ready.task.id)
            .collect()
    }

    /// Creates temp-dir backed storage seeded with `phase` under `tag`.
    /// The `TempDir` must be kept alive for as long as the storage is used.
    fn setup_storage_with_phase(phase: &Phase, tag: &str) -> (TempDir, Storage) {
        let temp_dir = TempDir::new().unwrap();
        let storage = Storage::new(Some(temp_dir.path().to_path_buf()));
        storage.update_group(tag, phase).unwrap();
        (temp_dir, storage)
    }

    /// Storage seeded with one task, id `"1"`, in the given status.
    fn storage_with_single_task(status: TaskStatus) -> (TempDir, Storage) {
        let phase = phase_with(vec![create_test_task("1", status, vec![])]);
        setup_storage_with_phase(&phase, TAG)
    }

    #[test]
    fn test_get_ready_tasks_no_deps() {
        let phases = phases_with(vec![
            create_test_task("1", TaskStatus::Pending, vec![]),
            create_test_task("2", TaskStatus::Pending, vec![]),
        ]);

        assert_eq!(get_ready_tasks(&phases, TAG, false).len(), 2);
    }

    #[test]
    fn test_get_ready_tasks_with_deps_met() {
        let phases = phases_with(vec![
            create_test_task("1", TaskStatus::Done, vec![]),
            create_test_task("2", TaskStatus::Pending, vec!["1"]),
        ]);

        assert_eq!(ready_ids(&phases), vec!["2"]);
    }

    #[test]
    fn test_get_ready_tasks_with_deps_not_met() {
        let phases = phases_with(vec![
            create_test_task("1", TaskStatus::InProgress, vec![]),
            create_test_task("2", TaskStatus::Pending, vec!["1"]),
        ]);

        assert!(get_ready_tasks(&phases, TAG, false).is_empty());
    }

    #[test]
    fn test_get_ready_tasks_skips_expanded() {
        let mut parent = create_test_task("1", TaskStatus::Expanded, vec![]);
        parent.subtasks = vec!["1.1".to_string()];

        let mut child = create_test_task("1.1", TaskStatus::Pending, vec![]);
        child.parent_id = Some("1".to_string());

        let phases = phases_with(vec![parent, child]);

        // Only the pending subtask is dispatchable, not its expanded parent.
        assert_eq!(ready_ids(&phases), vec!["1.1"]);
    }

    #[test]
    fn test_get_ready_tasks_priority_sort() {
        let with_priority = |id: &str, priority: Priority| {
            let mut task = create_test_task(id, TaskStatus::Pending, vec![]);
            task.priority = priority;
            task
        };

        // Inserted out of order on purpose.
        let phases = phases_with(vec![
            with_priority("low", Priority::Low),
            with_priority("critical", Priority::Critical),
            with_priority("high", Priority::High),
        ]);

        assert_eq!(ready_ids(&phases), vec!["critical", "high", "low"]);
    }

    #[test]
    fn test_count_in_progress() {
        let phases = phases_with(vec![
            create_test_task("1", TaskStatus::InProgress, vec![]),
            create_test_task("2", TaskStatus::InProgress, vec![]),
            create_test_task("3", TaskStatus::Pending, vec![]),
            create_test_task("4", TaskStatus::Done, vec![]),
        ]);

        assert_eq!(count_in_progress(&phases, TAG, false), 2);
    }

    #[test]
    fn test_count_remaining() {
        let phases = phases_with(vec![
            create_test_task("1", TaskStatus::InProgress, vec![]),
            create_test_task("2", TaskStatus::Pending, vec![]),
            create_test_task("3", TaskStatus::Done, vec![]),
            create_test_task("4", TaskStatus::Failed, vec![]),
        ]);

        // Only the in-progress and the pending task are still outstanding.
        assert_eq!(count_remaining(&phases, TAG, false), 2);
    }

    #[test]
    fn test_claim_task_pending() {
        let (_temp_dir, storage) = storage_with_single_task(TaskStatus::Pending);

        assert!(claim_task(&storage, "1", TAG).unwrap());

        // The claim must have been persisted.
        let reloaded = storage.load_group(TAG).unwrap();
        assert_eq!(
            reloaded.get_task("1").unwrap().status,
            TaskStatus::InProgress
        );
    }

    #[test]
    fn test_claim_task_already_in_progress() {
        let (_temp_dir, storage) = storage_with_single_task(TaskStatus::InProgress);

        assert!(!claim_task(&storage, "1", TAG).unwrap());
    }

    #[test]
    fn test_claim_task_nonexistent() {
        let (_temp_dir, storage) = storage_with_single_task(TaskStatus::Pending);

        assert!(!claim_task(&storage, "nonexistent", TAG).unwrap());
    }

    #[test]
    fn test_claim_task_already_done() {
        let (_temp_dir, storage) = storage_with_single_task(TaskStatus::Done);

        assert!(!claim_task(&storage, "1", TAG).unwrap());
    }
}