1use std::collections::{HashMap, HashSet};
20use std::path::Path;
21use std::thread;
22use std::time::{Duration, Instant};
23
24use anyhow::Result;
25use colored::Colorize;
26
27use crate::commands::spawn::agent;
28use crate::commands::spawn::terminal::{self, Harness};
29use crate::models::phase::Phase;
30use crate::models::task::{Task, TaskStatus};
31use crate::storage::Storage;
32
33use super::events::EventWriter;
34use super::session::{RoundState, SwarmSession};
35
36pub struct BeadsConfig {
38 pub max_concurrent: usize,
40 pub poll_interval: Duration,
42}
43
44impl Default for BeadsConfig {
45 fn default() -> Self {
46 Self {
47 max_concurrent: 5,
48 poll_interval: Duration::from_secs(3),
49 }
50 }
51}
52
53#[derive(Clone, Debug)]
55pub struct ReadyTask {
56 pub task: Task,
57 pub tag: String,
58}
59
60pub struct BeadsResult {
62 pub tasks_completed: usize,
63 pub tasks_failed: usize,
64 pub total_duration: Duration,
65}
66
67pub fn get_ready_tasks(
75 all_phases: &HashMap<String, Phase>,
76 phase_tag: &str,
77 all_tags: bool,
78) -> Vec<ReadyTask> {
79 let mut ready = Vec::new();
80
81 let all_task_refs: Vec<&Task> = all_phases.values().flat_map(|p| &p.tasks).collect();
83
84 let phase_tags: Vec<&String> = if all_tags {
86 all_phases.keys().collect()
87 } else {
88 all_phases
89 .keys()
90 .filter(|t| t.as_str() == phase_tag)
91 .collect()
92 };
93
94 for tag in phase_tags {
95 if let Some(phase) = all_phases.get(tag) {
96 for task in &phase.tasks {
97 if is_task_ready(task, phase, &all_task_refs) {
98 ready.push(ReadyTask {
99 task: task.clone(),
100 tag: tag.clone(),
101 });
102 }
103 }
104 }
105 }
106
107 ready.sort_by(|a, b| {
109 use crate::models::task::Priority;
110 let priority_ord = |p: &Priority| match p {
111 Priority::Critical => 0,
112 Priority::High => 1,
113 Priority::Medium => 2,
114 Priority::Low => 3,
115 };
116 priority_ord(&a.task.priority)
117 .cmp(&priority_ord(&b.task.priority))
118 .then_with(|| a.task.id.cmp(&b.task.id))
119 });
120
121 ready
122}
123
124fn is_task_ready(task: &Task, phase: &Phase, all_tasks: &[&Task]) -> bool {
126 if task.status != TaskStatus::Pending {
128 return false;
129 }
130
131 if task.is_expanded() {
133 return false;
134 }
135
136 if let Some(ref parent_id) = task.parent_id {
138 let parent_expanded = phase
139 .get_task(parent_id)
140 .map(|p| p.is_expanded())
141 .unwrap_or(false);
142 if !parent_expanded {
143 return false;
144 }
145 }
146
147 task.has_dependencies_met_refs(all_tasks)
150}
151
152pub fn count_in_progress(
154 all_phases: &HashMap<String, Phase>,
155 phase_tag: &str,
156 all_tags: bool,
157) -> usize {
158 let tags: Vec<&String> = if all_tags {
159 all_phases.keys().collect()
160 } else {
161 all_phases
162 .keys()
163 .filter(|t| t.as_str() == phase_tag)
164 .collect()
165 };
166
167 tags.iter()
168 .filter_map(|tag| all_phases.get(*tag))
169 .flat_map(|phase| &phase.tasks)
170 .filter(|t| t.status == TaskStatus::InProgress)
171 .count()
172}
173
174pub fn count_remaining(
176 all_phases: &HashMap<String, Phase>,
177 phase_tag: &str,
178 all_tags: bool,
179) -> usize {
180 let tags: Vec<&String> = if all_tags {
181 all_phases.keys().collect()
182 } else {
183 all_phases
184 .keys()
185 .filter(|t| t.as_str() == phase_tag)
186 .collect()
187 };
188
189 tags.iter()
190 .filter_map(|tag| all_phases.get(*tag))
191 .flat_map(|phase| &phase.tasks)
192 .filter(|t| {
193 t.status == TaskStatus::InProgress
194 || (t.status == TaskStatus::Pending && !t.is_expanded())
195 })
196 .count()
197}
198
199pub fn claim_task(storage: &Storage, task_id: &str, tag: &str) -> Result<bool> {
201 let mut phase = storage.load_group(tag)?;
202
203 if let Some(task) = phase.get_task_mut(task_id) {
204 if task.status == TaskStatus::Pending {
206 task.set_status(TaskStatus::InProgress);
207 storage.update_group(tag, &phase)?;
208 return Ok(true);
209 }
210 }
211
212 Ok(false)
213}
214
215pub fn spawn_agent_tmux(
217 ready_task: &ReadyTask,
218 working_dir: &Path,
219 session_name: &str,
220 default_harness: Harness,
221) -> Result<String> {
222 let config = agent::resolve_agent_config(
224 &ready_task.task,
225 &ready_task.tag,
226 default_harness,
227 None,
228 working_dir,
229 );
230
231 let window_index = terminal::spawn_terminal_with_harness_and_model(
233 &ready_task.task.id,
234 &config.prompt,
235 working_dir,
236 session_name,
237 config.harness,
238 config.model.as_deref(),
239 )?;
240
241 Ok(format!("{}:{}", session_name, window_index))
242}
243
244pub fn run_beads_loop(
250 storage: &Storage,
251 phase_tag: &str,
252 all_tags: bool,
253 working_dir: &Path,
254 session_name: &str,
255 default_harness: Harness,
256 config: &BeadsConfig,
257 session: &mut SwarmSession,
258) -> Result<BeadsResult> {
259 let start_time = Instant::now();
260 let mut tasks_completed = 0;
261 let mut tasks_failed = 0;
262 let mut spawned_tasks: HashSet<String> = HashSet::new();
263 let mut spawned_times: HashMap<String, Instant> = HashMap::new();
264 let mut round_state = RoundState::new(0); let event_writer = EventWriter::new(working_dir, session_name)
268 .map_err(|e| anyhow::anyhow!("Failed to initialize event writer: {}", e))?;
269
270 println!();
271 println!("{}", "Beads Execution Mode".cyan().bold());
272 println!("{}", "═".repeat(50));
273 println!(
274 " {} Continuous ready-task polling",
275 "Mode:".dimmed()
276 );
277 println!(
278 " {} {}",
279 "Event log:".dimmed(),
280 event_writer.session_file().display().to_string().dimmed()
281 );
282 println!(
283 " {} {}",
284 "Max concurrent:".dimmed(),
285 config.max_concurrent.to_string().cyan()
286 );
287 println!(
288 " {} {}ms",
289 "Poll interval:".dimmed(),
290 config.poll_interval.as_millis().to_string().cyan()
291 );
292 println!();
293
294 loop {
295 let all_phases = storage.load_tasks()?;
297
298 let in_progress = count_in_progress(&all_phases, phase_tag, all_tags);
300 let remaining = count_remaining(&all_phases, phase_tag, all_tags);
301
302 if remaining == 0 {
304 println!();
305 println!("{}", "All tasks complete!".green().bold());
306 break;
307 }
308
309 let ready_tasks = get_ready_tasks(&all_phases, phase_tag, all_tags);
311
312 let ready_tasks: Vec<_> = ready_tasks
314 .into_iter()
315 .filter(|rt| !spawned_tasks.contains(&rt.task.id))
316 .collect();
317
318 if ready_tasks.is_empty() {
319 if in_progress > 0 {
320 print!(
322 "\r {} {} task(s) in progress, waiting... ",
323 "⏳".dimmed(),
324 in_progress.to_string().cyan()
325 );
326 std::io::Write::flush(&mut std::io::stdout())?;
327 thread::sleep(config.poll_interval);
328 continue;
329 } else {
330 println!();
332 println!(
333 "{}",
334 "No ready tasks and none in progress.".yellow()
335 );
336 println!(
337 " {} {} remaining task(s) may be blocked.",
338 "!".yellow(),
339 remaining
340 );
341 println!(" Check for circular dependencies or missing dependencies.");
342 break;
343 }
344 }
345
346 print!("\r{}\r", " ".repeat(60));
348
349 let available_slots = config.max_concurrent.saturating_sub(in_progress);
351 let to_spawn = ready_tasks.into_iter().take(available_slots);
352
353 for ready_task in to_spawn {
355 if !claim_task(storage, &ready_task.task.id, &ready_task.tag)? {
357 continue;
359 }
360
361 spawned_tasks.insert(ready_task.task.id.clone());
363 spawned_times.insert(ready_task.task.id.clone(), Instant::now());
364
365 if let Err(e) = event_writer.log_spawned(&ready_task.task.id) {
367 eprintln!("Warning: Failed to log spawn event: {}", e);
368 }
369
370 match spawn_agent_tmux(&ready_task, working_dir, session_name, default_harness) {
372 Ok(window_info) => {
373 println!(
374 " {} Spawned: {} | {} [{}]",
375 "✓".green(),
376 ready_task.task.id.cyan(),
377 ready_task.task.title.dimmed(),
378 window_info.dimmed()
379 );
380 round_state.task_ids.push(ready_task.task.id.clone());
381 round_state.tags.push(ready_task.tag.clone());
382 }
383 Err(e) => {
384 println!(
385 " {} Failed: {} - {}",
386 "✗".red(),
387 ready_task.task.id.red(),
388 e
389 );
390 round_state.failures.push(ready_task.task.id.clone());
391 tasks_failed += 1;
392
393 if let Err(log_err) = event_writer.log_completed(&ready_task.task.id, false, 0) {
395 eprintln!("Warning: Failed to log completion event: {}", log_err);
396 }
397
398 if let Ok(mut phase) = storage.load_group(&ready_task.tag) {
400 if let Some(task) = phase.get_task_mut(&ready_task.task.id) {
401 task.set_status(TaskStatus::Failed);
402 let _ = storage.update_group(&ready_task.tag, &phase);
403 }
404 }
405 }
406 }
407 }
408
409 let mut newly_completed: Vec<(String, bool)> = Vec::new();
411 for task_id in &spawned_tasks {
412 if !spawned_times.contains_key(task_id) {
414 continue;
415 }
416 for phase in all_phases.values() {
417 if let Some(task) = phase.get_task(task_id) {
418 match task.status {
419 TaskStatus::Done => {
420 newly_completed.push((task_id.clone(), true));
421 }
422 TaskStatus::Failed => {
423 newly_completed.push((task_id.clone(), false));
424 }
425 _ => {}
426 }
427 break;
428 }
429 }
430 }
431
432 for (task_id, success) in newly_completed {
434 if let Some(spawn_time) = spawned_times.remove(&task_id) {
435 spawned_tasks.remove(&task_id);
437
438 let duration_ms = spawn_time.elapsed().as_millis() as u64;
439 if let Err(e) = event_writer.log_completed(&task_id, success, duration_ms) {
440 eprintln!("Warning: Failed to log completion: {}", e);
441 }
442 if success {
443 tasks_completed += 1;
444 println!(
445 " {} Completed: {} ({}ms)",
446 "✓".green(),
447 task_id.cyan(),
448 duration_ms
449 );
450
451 for phase in all_phases.values() {
454 for potential_unblocked in &phase.tasks {
455 if potential_unblocked.status == TaskStatus::Pending
456 && potential_unblocked.dependencies.contains(&task_id)
457 {
458 if let Err(e) = event_writer.log_unblocked(&potential_unblocked.id, &task_id) {
459 eprintln!("Warning: Failed to log unblock: {}", e);
460 }
461 }
462 }
463 }
464 } else {
465 tasks_failed += 1;
466 }
467 }
468 }
469
470 if in_progress >= config.max_concurrent {
472 thread::sleep(config.poll_interval);
473 } else {
474 thread::sleep(Duration::from_millis(100));
476 }
477 }
478
479 let mut wave_state = super::session::WaveState::new(1);
481 wave_state.rounds.push(round_state);
482 session.waves.push(wave_state);
483
484 Ok(BeadsResult {
485 tasks_completed,
486 tasks_failed,
487 total_duration: start_time.elapsed(),
488 })
489}
490
491#[cfg(test)]
495mod tests {
496 use super::*;
497 use crate::models::task::Priority;
498 use tempfile::TempDir;
499
500 fn create_test_task(id: &str, status: TaskStatus, deps: Vec<&str>) -> Task {
501 let mut task = Task::new(id.to_string(), format!("Task {}", id), "Description".to_string());
502 task.status = status;
503 task.dependencies = deps.into_iter().map(String::from).collect();
504 task
505 }
506
507 fn setup_storage_with_phase(phase: &Phase, tag: &str) -> (TempDir, Storage) {
508 let temp_dir = TempDir::new().unwrap();
509 let storage = Storage::new(Some(temp_dir.path().to_path_buf()));
510 storage.update_group(tag, phase).unwrap();
511 (temp_dir, storage)
512 }
513
514 #[test]
515 fn test_get_ready_tasks_no_deps() {
516 let mut phase = Phase::new("test".to_string());
517 phase.tasks.push(create_test_task("1", TaskStatus::Pending, vec![]));
518 phase.tasks.push(create_test_task("2", TaskStatus::Pending, vec![]));
519
520 let mut phases = HashMap::new();
521 phases.insert("test".to_string(), phase);
522
523 let ready = get_ready_tasks(&phases, "test", false);
524 assert_eq!(ready.len(), 2);
525 }
526
527 #[test]
528 fn test_get_ready_tasks_with_deps_met() {
529 let mut phase = Phase::new("test".to_string());
530 phase.tasks.push(create_test_task("1", TaskStatus::Done, vec![]));
531 phase.tasks.push(create_test_task("2", TaskStatus::Pending, vec!["1"]));
532
533 let mut phases = HashMap::new();
534 phases.insert("test".to_string(), phase);
535
536 let ready = get_ready_tasks(&phases, "test", false);
537 assert_eq!(ready.len(), 1);
538 assert_eq!(ready[0].task.id, "2");
539 }
540
541 #[test]
542 fn test_get_ready_tasks_with_deps_not_met() {
543 let mut phase = Phase::new("test".to_string());
544 phase.tasks.push(create_test_task("1", TaskStatus::InProgress, vec![]));
545 phase.tasks.push(create_test_task("2", TaskStatus::Pending, vec!["1"]));
546
547 let mut phases = HashMap::new();
548 phases.insert("test".to_string(), phase);
549
550 let ready = get_ready_tasks(&phases, "test", false);
551 assert_eq!(ready.len(), 0);
552 }
553
554 #[test]
555 fn test_get_ready_tasks_skips_expanded() {
556 let mut phase = Phase::new("test".to_string());
557 let mut expanded_task = create_test_task("1", TaskStatus::Expanded, vec![]);
558 expanded_task.subtasks = vec!["1.1".to_string()];
559 phase.tasks.push(expanded_task);
560
561 let mut subtask = create_test_task("1.1", TaskStatus::Pending, vec![]);
562 subtask.parent_id = Some("1".to_string());
563 phase.tasks.push(subtask);
564
565 let mut phases = HashMap::new();
566 phases.insert("test".to_string(), phase);
567
568 let ready = get_ready_tasks(&phases, "test", false);
569 assert_eq!(ready.len(), 1);
570 assert_eq!(ready[0].task.id, "1.1");
571 }
572
573 #[test]
574 fn test_get_ready_tasks_priority_sort() {
575 let mut phase = Phase::new("test".to_string());
576
577 let mut low = create_test_task("low", TaskStatus::Pending, vec![]);
578 low.priority = Priority::Low;
579
580 let mut critical = create_test_task("critical", TaskStatus::Pending, vec![]);
581 critical.priority = Priority::Critical;
582
583 let mut high = create_test_task("high", TaskStatus::Pending, vec![]);
584 high.priority = Priority::High;
585
586 phase.tasks.push(low);
587 phase.tasks.push(critical);
588 phase.tasks.push(high);
589
590 let mut phases = HashMap::new();
591 phases.insert("test".to_string(), phase);
592
593 let ready = get_ready_tasks(&phases, "test", false);
594 assert_eq!(ready.len(), 3);
595 assert_eq!(ready[0].task.id, "critical");
596 assert_eq!(ready[1].task.id, "high");
597 assert_eq!(ready[2].task.id, "low");
598 }
599
600 #[test]
601 fn test_count_in_progress() {
602 let mut phase = Phase::new("test".to_string());
603 phase.tasks.push(create_test_task("1", TaskStatus::InProgress, vec![]));
604 phase.tasks.push(create_test_task("2", TaskStatus::InProgress, vec![]));
605 phase.tasks.push(create_test_task("3", TaskStatus::Pending, vec![]));
606 phase.tasks.push(create_test_task("4", TaskStatus::Done, vec![]));
607
608 let mut phases = HashMap::new();
609 phases.insert("test".to_string(), phase);
610
611 assert_eq!(count_in_progress(&phases, "test", false), 2);
612 }
613
614 #[test]
615 fn test_count_remaining() {
616 let mut phase = Phase::new("test".to_string());
617 phase.tasks.push(create_test_task("1", TaskStatus::InProgress, vec![]));
618 phase.tasks.push(create_test_task("2", TaskStatus::Pending, vec![]));
619 phase.tasks.push(create_test_task("3", TaskStatus::Done, vec![]));
620 phase.tasks.push(create_test_task("4", TaskStatus::Failed, vec![]));
621
622 let mut phases = HashMap::new();
623 phases.insert("test".to_string(), phase);
624
625 assert_eq!(count_remaining(&phases, "test", false), 2); }
627
628 #[test]
629 fn test_claim_task_pending() {
630 let mut phase = Phase::new("test".to_string());
631 phase.tasks.push(create_test_task("1", TaskStatus::Pending, vec![]));
632
633 let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
634
635 let claimed = claim_task(&storage, "1", "test").unwrap();
637 assert!(claimed);
638
639 let reloaded = storage.load_group("test").unwrap();
641 assert_eq!(reloaded.get_task("1").unwrap().status, TaskStatus::InProgress);
642 }
643
644 #[test]
645 fn test_claim_task_already_in_progress() {
646 let mut phase = Phase::new("test".to_string());
647 phase.tasks.push(create_test_task("1", TaskStatus::InProgress, vec![]));
648
649 let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
650
651 let claimed = claim_task(&storage, "1", "test").unwrap();
653 assert!(!claimed);
654 }
655
656 #[test]
657 fn test_claim_task_nonexistent() {
658 let mut phase = Phase::new("test".to_string());
659 phase.tasks.push(create_test_task("1", TaskStatus::Pending, vec![]));
660
661 let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
662
663 let claimed = claim_task(&storage, "nonexistent", "test").unwrap();
665 assert!(!claimed);
666 }
667
668 #[test]
669 fn test_claim_task_already_done() {
670 let mut phase = Phase::new("test".to_string());
671 phase.tasks.push(create_test_task("1", TaskStatus::Done, vec![]));
672
673 let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
674
675 let claimed = claim_task(&storage, "1", "test").unwrap();
677 assert!(!claimed);
678 }
679}