use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::thread;
use std::time::{Duration, Instant};
use anyhow::Result;
use colored::Colorize;
use crate::commands::spawn::agent;
use crate::commands::spawn::terminal::{self, Harness};
use crate::commands::task_selection::{
count_in_progress_tasks, is_actionable_pending_task, scoped_phases,
};
use crate::models::phase::Phase;
use crate::models::task::{Task, TaskStatus};
use crate::storage::Storage;
use super::events::EventWriter;
use super::session::{RoundState, SwarmSession};
/// Tuning knobs for the beads execution loop.
#[derive(Debug, Clone)]
pub struct BeadsConfig {
    /// Upper bound on the number of in-progress tasks the loop will allow
    /// before it stops spawning new agents.
    pub max_concurrent: usize,
    /// How long the loop sleeps between polls while it is waiting for
    /// running tasks or for a free slot.
    pub poll_interval: Duration,
}

impl Default for BeadsConfig {
    /// Defaults to five concurrent tasks and a three second poll interval.
    fn default() -> Self {
        Self {
            max_concurrent: 5,
            poll_interval: Duration::from_secs(3),
        }
    }
}
/// A task that passed the readiness check, paired with the tag of the phase
/// it was found in.
#[derive(Clone, Debug)]
pub struct ReadyTask {
/// Owned copy of the ready task (cloned out of its phase).
pub task: Task,
/// Key under which the owning phase is stored in the phase map.
pub tag: String,
}
/// Summary returned by the beads execution loop once it stops.
#[derive(Debug, Clone)]
pub struct BeadsResult {
    /// Number of tasks the loop counted as finished successfully.
    pub tasks_completed: usize,
    /// Number of tasks the loop counted as failed.
    pub tasks_failed: usize,
    /// Wall-clock time between the start of the loop and its return.
    pub total_duration: Duration,
}
/// Collects the tasks that can be started right now, most urgent first.
///
/// When `all_tags` is `true` every phase in `all_phases` is scanned;
/// otherwise only the phase stored under `phase_tag` is. Readiness itself is
/// always judged against the tasks of *all* phases, which are handed to
/// `is_task_ready` as the dependency lookup set.
///
/// The result is ordered by priority (critical, high, medium, low) and then
/// by task id.
pub fn get_ready_tasks(
    all_phases: &HashMap<String, Phase>,
    phase_tag: &str,
    all_tags: bool,
) -> Vec<ReadyTask> {
    use crate::models::task::Priority;

    // Every task from every phase, used for dependency resolution.
    let every_task: Vec<&Task> = all_phases
        .values()
        .flat_map(|phase| phase.tasks.iter())
        .collect();

    let mut ready: Vec<ReadyTask> = Vec::new();
    let in_scope = all_phases
        .iter()
        .filter(|(tag, _)| all_tags || tag.as_str() == phase_tag);
    for (tag, phase) in in_scope {
        ready.extend(
            phase
                .tasks
                .iter()
                .filter(|candidate| is_task_ready(candidate, phase, &every_task))
                .map(|candidate| ReadyTask {
                    task: candidate.clone(),
                    tag: tag.clone(),
                }),
        );
    }

    // Smaller rank means more urgent.
    let rank = |priority: &Priority| -> u8 {
        match priority {
            Priority::Critical => 0,
            Priority::High => 1,
            Priority::Medium => 2,
            Priority::Low => 3,
        }
    };
    // Stable sort: entries with equal (rank, id) keep their collection order.
    ready.sort_by(|lhs, rhs| {
        let left_key = (rank(&lhs.task.priority), &lhs.task.id);
        let right_key = (rank(&rhs.task.priority), &rhs.task.id);
        left_key.cmp(&right_key)
    });

    ready
}
/// Returns `true` when `task` is an actionable pending task of `phase` and
/// its dependencies are met according to `all_tasks`.
///
/// The dependency check is only evaluated once the actionable-pending check
/// has passed.
fn is_task_ready(task: &Task, phase: &Phase, all_tasks: &[&Task]) -> bool {
    is_actionable_pending_task(task, phase) && task.has_dependencies_met_refs(all_tasks)
}
/// Counts in-progress tasks for the requested scope.
///
/// Thin wrapper: all three arguments are forwarded unchanged to
/// `count_in_progress_tasks` from the `task_selection` module, and its result
/// is returned as-is.
pub fn count_in_progress(
all_phases: &HashMap<String, Phase>,
phase_tag: &str,
all_tags: bool,
) -> usize {
count_in_progress_tasks(all_phases, phase_tag, all_tags)
}
/// Counts the tasks in scope that still represent outstanding work.
///
/// The scope is whatever `scoped_phases` yields for `phase_tag` / `all_tags`.
/// A task counts as remaining when it is in progress, or when it is pending
/// and `is_expanded()` reports `false` for it. Every other status is ignored.
pub fn count_remaining(
    all_phases: &HashMap<String, Phase>,
    phase_tag: &str,
    all_tags: bool,
) -> usize {
    scoped_phases(all_phases, phase_tag, all_tags)
        .into_iter()
        .map(|phase| {
            phase
                .tasks
                .iter()
                .filter(|task| match task.status {
                    TaskStatus::InProgress => true,
                    TaskStatus::Pending => !task.is_expanded(),
                    _ => false,
                })
                .count()
        })
        .sum::<usize>()
}
/// Tries to claim `task_id` in the phase stored under `tag`.
///
/// The phase is loaded from `storage`; if the task exists and is still
/// pending it is switched to in-progress and the phase is written back.
///
/// Returns `Ok(true)` when the claim was written, and `Ok(false)` when the
/// task does not exist or is in any status other than pending (nothing is
/// written in that case).
///
/// # Errors
///
/// Propagates any error from loading or updating the phase.
pub fn claim_task(storage: &Storage, task_id: &str, tag: &str) -> Result<bool> {
    let mut phase = storage.load_group(tag)?;

    match phase.get_task_mut(task_id) {
        Some(task) if task.status == TaskStatus::Pending => {
            task.set_status(TaskStatus::InProgress);
        }
        // Unknown task, or somebody else already moved it out of pending.
        _ => return Ok(false),
    }

    storage.update_group(tag, &phase)?;
    Ok(true)
}
pub fn spawn_agent_tmux(
ready_task: &ReadyTask,
working_dir: &Path,
session_name: &str,
default_harness: Harness,
) -> Result<String> {
let config = agent::resolve_agent_config(
&ready_task.task,
&ready_task.tag,
default_harness,
None,
working_dir,
);
let spawn_config = terminal::SpawnConfig {
task_id: &ready_task.task.id,
prompt: &config.prompt,
working_dir,
session_name,
harness: config.harness,
model: config.model.as_deref(),
task_list_id: None,
};
let window_index = terminal::spawn_tmux_agent(&spawn_config)?;
Ok(format!("{}:{}", session_name, window_index))
}
/// Runs the beads execution mode: a polling loop that keeps spawning agents
/// for ready tasks until nothing is left to do.
///
/// Each iteration:
/// 1. reloads all phases from `storage`;
/// 2. records tasks spawned by this loop that have since reached `Done` or
///    `Failed` (completion/unblock events are written to the event log);
/// 3. stops if no pending or in-progress work remains in scope;
/// 4. otherwise claims and spawns ready tasks, up to
///    `config.max_concurrent` minus the number already in progress;
/// 5. if nothing is ready, waits `config.poll_interval` while tasks are in
///    progress, or stops with a "blocked" notice when none are.
///
/// On exit a single wave containing one round (the spawned task ids, their
/// tags and the spawn failures) is appended to `session.waves`.
///
/// # Errors
///
/// Fails if the event writer cannot be created, if tasks cannot be loaded,
/// if claiming a task fails, or if stdout cannot be flushed. Event-log write
/// failures and failures to mark a task as failed are reported as warnings
/// on stderr and do not abort the loop.
#[allow(clippy::too_many_arguments)]
pub fn run_beads_loop(
    storage: &Storage,
    phase_tag: &str,
    all_tags: bool,
    working_dir: &Path,
    session_name: &str,
    default_harness: Harness,
    config: &BeadsConfig,
    session: &mut SwarmSession,
) -> Result<BeadsResult> {
    let start_time = Instant::now();
    let mut tasks_completed = 0;
    let mut tasks_failed = 0;
    // Tasks with a live agent spawned by this loop; used to avoid re-spawning.
    let mut spawned_tasks: HashSet<String> = HashSet::new();
    // Spawn timestamp per live task, used to compute completion durations.
    let mut spawned_times: HashMap<String, Instant> = HashMap::new();
    let mut round_state = RoundState::new(0);
    let event_writer = EventWriter::new(working_dir, session_name)
        .map_err(|e| anyhow::anyhow!("Failed to initialize event writer: {}", e))?;

    println!();
    println!("{}", "Beads Execution Mode".cyan().bold());
    println!("{}", "═".repeat(50));
    println!("  {} Continuous ready-task polling", "Mode:".dimmed());
    if let Some(session_file) = event_writer.session_file() {
        println!(
            "  {} {}",
            "Event log:".dimmed(),
            session_file.display().to_string().dimmed()
        );
    }
    println!(
        "  {} {}",
        "Max concurrent:".dimmed(),
        config.max_concurrent.to_string().cyan()
    );
    println!(
        "  {} {}ms",
        "Poll interval:".dimmed(),
        config.poll_interval.as_millis().to_string().cyan()
    );
    println!();

    loop {
        let all_phases = storage.load_tasks()?;

        // Detect finished tasks FIRST, on the fresh snapshot. Doing this
        // before the exit/wait checks guarantees that tasks finishing while
        // the loop is only waiting, or finishing last, are still counted and
        // logged before the loop continues or breaks.
        let mut newly_completed: Vec<(String, bool)> = Vec::new();
        for task_id in spawned_times.keys() {
            for phase in all_phases.values() {
                if let Some(task) = phase.get_task(task_id) {
                    match task.status {
                        TaskStatus::Done => newly_completed.push((task_id.clone(), true)),
                        TaskStatus::Failed => newly_completed.push((task_id.clone(), false)),
                        _ => {}
                    }
                    // Only the first phase containing this id is consulted.
                    break;
                }
            }
        }
        if !newly_completed.is_empty() {
            // Wipe a possibly pending "waiting..." status line before printing.
            print!("\r{}\r", " ".repeat(60));
        }
        for (task_id, success) in newly_completed {
            if let Some(spawn_time) = spawned_times.remove(&task_id) {
                spawned_tasks.remove(&task_id);
                let duration_ms = spawn_time.elapsed().as_millis() as u64;
                if let Err(e) = event_writer.log_completed(&task_id, success, duration_ms) {
                    eprintln!("Warning: Failed to log completion: {}", e);
                }
                if success {
                    tasks_completed += 1;
                    println!(
                        "  {} Completed: {} ({}ms)",
                        "✓".green(),
                        task_id.cyan(),
                        duration_ms
                    );
                    // Record every pending task that lists this one as a dependency.
                    for phase in all_phases.values() {
                        for potential_unblocked in &phase.tasks {
                            if potential_unblocked.status == TaskStatus::Pending
                                && potential_unblocked.dependencies.contains(&task_id)
                            {
                                if let Err(e) =
                                    event_writer.log_unblocked(&potential_unblocked.id, &task_id)
                                {
                                    eprintln!("Warning: Failed to log unblock: {}", e);
                                }
                            }
                        }
                    }
                } else {
                    tasks_failed += 1;
                }
            }
        }

        let in_progress = count_in_progress(&all_phases, phase_tag, all_tags);
        let remaining = count_remaining(&all_phases, phase_tag, all_tags);
        if remaining == 0 {
            println!();
            println!("{}", "All tasks complete!".green().bold());
            break;
        }

        let ready_tasks = get_ready_tasks(&all_phases, phase_tag, all_tags);
        let ready_tasks: Vec<_> = ready_tasks
            .into_iter()
            .filter(|rt| !spawned_tasks.contains(&rt.task.id))
            .collect();
        if ready_tasks.is_empty() {
            if in_progress > 0 {
                print!(
                    "\r  {} {} task(s) in progress, waiting...  ",
                    "⏳".dimmed(),
                    in_progress.to_string().cyan()
                );
                std::io::Write::flush(&mut std::io::stdout())?;
                thread::sleep(config.poll_interval);
                continue;
            } else {
                println!();
                println!("{}", "No ready tasks and none in progress.".yellow());
                println!(
                    "  {} {} remaining task(s) may be blocked.",
                    "!".yellow(),
                    remaining
                );
                println!("  Check for circular dependencies or missing dependencies.");
                break;
            }
        }

        // Clear the "waiting..." status line before printing spawn results.
        print!("\r{}\r", " ".repeat(60));
        let available_slots = config.max_concurrent.saturating_sub(in_progress);
        let to_spawn = ready_tasks.into_iter().take(available_slots);
        for ready_task in to_spawn {
            // Skip tasks that are no longer pending in storage.
            if !claim_task(storage, &ready_task.task.id, &ready_task.tag)? {
                continue;
            }
            let spawn_started = Instant::now();
            if let Err(e) = event_writer.log_spawned(&ready_task.task.id) {
                eprintln!("Warning: Failed to log spawn event: {}", e);
            }
            match spawn_agent_tmux(&ready_task, working_dir, session_name, default_harness) {
                Ok(window_info) => {
                    // Track the task only once an agent really exists, so a
                    // failed spawn is never picked up again (and counted a
                    // second time) by the completion detection above.
                    spawned_tasks.insert(ready_task.task.id.clone());
                    spawned_times.insert(ready_task.task.id.clone(), spawn_started);
                    println!(
                        "  {} Spawned: {} | {} [{}]",
                        "✓".green(),
                        ready_task.task.id.cyan(),
                        ready_task.task.title.dimmed(),
                        window_info.dimmed()
                    );
                    round_state.task_ids.push(ready_task.task.id.clone());
                    round_state.tags.push(ready_task.tag.clone());
                }
                Err(e) => {
                    println!(
                        "  {} Failed: {} - {}",
                        "✗".red(),
                        ready_task.task.id.red(),
                        e
                    );
                    round_state.failures.push(ready_task.task.id.clone());
                    tasks_failed += 1;
                    if let Err(log_err) = event_writer.log_completed(&ready_task.task.id, false, 0)
                    {
                        eprintln!("Warning: Failed to log completion event: {}", log_err);
                    }
                    // Best effort: move the claimed task from in-progress to
                    // failed. Problems are reported but never abort the loop.
                    match storage.load_group(&ready_task.tag) {
                        Ok(mut phase) => {
                            if let Some(task) = phase.get_task_mut(&ready_task.task.id) {
                                task.set_status(TaskStatus::Failed);
                                if let Err(store_err) =
                                    storage.update_group(&ready_task.tag, &phase)
                                {
                                    eprintln!(
                                        "Warning: Failed to mark task {} as failed: {}",
                                        ready_task.task.id, store_err
                                    );
                                }
                            }
                        }
                        Err(store_err) => {
                            eprintln!(
                                "Warning: Failed to mark task {} as failed: {}",
                                ready_task.task.id, store_err
                            );
                        }
                    }
                }
            }
        }

        // Back off for a full poll interval when saturated; otherwise come
        // back quickly to fill the remaining slots.
        if in_progress >= config.max_concurrent {
            thread::sleep(config.poll_interval);
        } else {
            thread::sleep(Duration::from_millis(100));
        }
    }

    let mut wave_state = super::session::WaveState::new(1);
    wave_state.rounds.push(round_state);
    session.waves.push(wave_state);

    Ok(BeadsResult {
        tasks_completed,
        tasks_failed,
        total_duration: start_time.elapsed(),
    })
}
// Unit tests for ready-task selection, the counting helpers and task claiming.
#[cfg(test)]
mod tests {
use super::*;
use crate::models::task::Priority;
use tempfile::TempDir;
/// Builds a task with the given id, status and dependency ids; the title is
/// "Task <id>" and the description is a fixed placeholder.
fn create_test_task(id: &str, status: TaskStatus, deps: Vec<&str>) -> Task {
let mut task = Task::new(
id.to_string(),
format!("Task {}", id),
"Description".to_string(),
);
task.status = status;
task.dependencies = deps.into_iter().map(String::from).collect();
task
}
/// Creates a storage rooted in a fresh temporary directory and stores `phase`
/// under `tag`. The `TempDir` is returned so the caller keeps it alive for the
/// duration of the test.
fn setup_storage_with_phase(phase: &Phase, tag: &str) -> (TempDir, Storage) {
let temp_dir = TempDir::new().unwrap();
let storage = Storage::new(Some(temp_dir.path().to_path_buf()));
storage.update_group(tag, phase).unwrap();
(temp_dir, storage)
}
// Two pending tasks without dependencies are both ready.
#[test]
fn test_get_ready_tasks_no_deps() {
let mut phase = Phase::new("test".to_string());
phase
.tasks
.push(create_test_task("1", TaskStatus::Pending, vec![]));
phase
.tasks
.push(create_test_task("2", TaskStatus::Pending, vec![]));
let mut phases = HashMap::new();
phases.insert("test".to_string(), phase);
let ready = get_ready_tasks(&phases, "test", false);
assert_eq!(ready.len(), 2);
}
// A pending task whose only dependency is done is ready; the done task is not listed.
#[test]
fn test_get_ready_tasks_with_deps_met() {
let mut phase = Phase::new("test".to_string());
phase
.tasks
.push(create_test_task("1", TaskStatus::Done, vec![]));
phase
.tasks
.push(create_test_task("2", TaskStatus::Pending, vec!["1"]));
let mut phases = HashMap::new();
phases.insert("test".to_string(), phase);
let ready = get_ready_tasks(&phases, "test", false);
assert_eq!(ready.len(), 1);
assert_eq!(ready[0].task.id, "2");
}
// A pending task whose dependency is still in progress is not ready.
#[test]
fn test_get_ready_tasks_with_deps_not_met() {
let mut phase = Phase::new("test".to_string());
phase
.tasks
.push(create_test_task("1", TaskStatus::InProgress, vec![]));
phase
.tasks
.push(create_test_task("2", TaskStatus::Pending, vec!["1"]));
let mut phases = HashMap::new();
phases.insert("test".to_string(), phase);
let ready = get_ready_tasks(&phases, "test", false);
assert_eq!(ready.len(), 0);
}
// An expanded parent is skipped; only its pending subtask is reported as ready.
#[test]
fn test_get_ready_tasks_skips_expanded() {
let mut phase = Phase::new("test".to_string());
let mut expanded_task = create_test_task("1", TaskStatus::Expanded, vec![]);
expanded_task.subtasks = vec!["1.1".to_string()];
phase.tasks.push(expanded_task);
let mut subtask = create_test_task("1.1", TaskStatus::Pending, vec![]);
subtask.parent_id = Some("1".to_string());
phase.tasks.push(subtask);
let mut phases = HashMap::new();
phases.insert("test".to_string(), phase);
let ready = get_ready_tasks(&phases, "test", false);
assert_eq!(ready.len(), 1);
assert_eq!(ready[0].task.id, "1.1");
}
// Ready tasks come back ordered critical, high, low regardless of insertion order.
#[test]
fn test_get_ready_tasks_priority_sort() {
let mut phase = Phase::new("test".to_string());
let mut low = create_test_task("low", TaskStatus::Pending, vec![]);
low.priority = Priority::Low;
let mut critical = create_test_task("critical", TaskStatus::Pending, vec![]);
critical.priority = Priority::Critical;
let mut high = create_test_task("high", TaskStatus::Pending, vec![]);
high.priority = Priority::High;
phase.tasks.push(low);
phase.tasks.push(critical);
phase.tasks.push(high);
let mut phases = HashMap::new();
phases.insert("test".to_string(), phase);
let ready = get_ready_tasks(&phases, "test", false);
assert_eq!(ready.len(), 3);
assert_eq!(ready[0].task.id, "critical");
assert_eq!(ready[1].task.id, "high");
assert_eq!(ready[2].task.id, "low");
}
// Only the two in-progress tasks are counted; pending and done are ignored.
#[test]
fn test_count_in_progress() {
let mut phase = Phase::new("test".to_string());
phase
.tasks
.push(create_test_task("1", TaskStatus::InProgress, vec![]));
phase
.tasks
.push(create_test_task("2", TaskStatus::InProgress, vec![]));
phase
.tasks
.push(create_test_task("3", TaskStatus::Pending, vec![]));
phase
.tasks
.push(create_test_task("4", TaskStatus::Done, vec![]));
let mut phases = HashMap::new();
phases.insert("test".to_string(), phase);
assert_eq!(count_in_progress(&phases, "test", false), 2);
}
// In-progress and pending tasks count as remaining; done and failed do not.
#[test]
fn test_count_remaining() {
let mut phase = Phase::new("test".to_string());
phase
.tasks
.push(create_test_task("1", TaskStatus::InProgress, vec![]));
phase
.tasks
.push(create_test_task("2", TaskStatus::Pending, vec![]));
phase
.tasks
.push(create_test_task("3", TaskStatus::Done, vec![]));
phase
.tasks
.push(create_test_task("4", TaskStatus::Failed, vec![]));
let mut phases = HashMap::new();
phases.insert("test".to_string(), phase);
assert_eq!(count_remaining(&phases, "test", false), 2); }
// Claiming a pending task succeeds and persists the in-progress status.
#[test]
fn test_claim_task_pending() {
let mut phase = Phase::new("test".to_string());
phase
.tasks
.push(create_test_task("1", TaskStatus::Pending, vec![]));
let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
let claimed = claim_task(&storage, "1", "test").unwrap();
assert!(claimed);
let reloaded = storage.load_group("test").unwrap();
assert_eq!(
reloaded.get_task("1").unwrap().status,
TaskStatus::InProgress
);
}
// A task that is already in progress cannot be claimed again.
#[test]
fn test_claim_task_already_in_progress() {
let mut phase = Phase::new("test".to_string());
phase
.tasks
.push(create_test_task("1", TaskStatus::InProgress, vec![]));
let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
let claimed = claim_task(&storage, "1", "test").unwrap();
assert!(!claimed);
}
// Claiming an id that does not exist in the phase returns false without error.
#[test]
fn test_claim_task_nonexistent() {
let mut phase = Phase::new("test".to_string());
phase
.tasks
.push(create_test_task("1", TaskStatus::Pending, vec![]));
let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
let claimed = claim_task(&storage, "nonexistent", "test").unwrap();
assert!(!claimed);
}
// A task that is already done cannot be claimed.
#[test]
fn test_claim_task_already_done() {
let mut phase = Phase::new("test".to_string());
phase
.tasks
.push(create_test_task("1", TaskStatus::Done, vec![]));
let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
let claimed = claim_task(&storage, "1", "test").unwrap();
assert!(!claimed);
}
}