// scud/commands/swarm/beads.rs
//! Beads-style continuous execution mode
//!
//! Unlike wave-based execution which batches tasks and waits for all to complete,
//! beads-style execution uses continuous polling for ready tasks:
//!
//! 1. Query for all tasks where dependencies are met
//! 2. Claim task (mark in-progress)
//! 3. Spawn agent
//! 4. Immediately loop back to step 1 (no waiting for batch)
//!
//! This enables more fluid execution where downstream tasks can start
//! immediately when their dependencies complete, rather than waiting
//! for artificial wave boundaries.
//!
//! Inspired by the Beads project (<https://github.com/steveyegge/beads>)
//! and Gas Town's GUPP principle: "When an agent finds work on their hook,
//! they execute immediately. No confirmation. No questions. No waiting."

19use std::collections::{HashMap, HashSet};
20use std::path::Path;
21use std::thread;
22use std::time::{Duration, Instant};
23
24use anyhow::Result;
25use colored::Colorize;
26
27use crate::commands::spawn::agent;
28use crate::commands::spawn::terminal::{self, Harness};
29use crate::commands::task_selection::{
30    count_in_progress_tasks, is_actionable_pending_task, scoped_phases,
31};
32use crate::models::phase::Phase;
33use crate::models::task::{Task, TaskStatus};
34use crate::storage::Storage;
35
36use super::events::EventWriter;
37use super::session::{RoundState, SwarmSession};
38
/// Configuration for beads execution
pub struct BeadsConfig {
    /// Maximum concurrent agents; spawning pauses once this many are in progress.
    pub max_concurrent: usize,
    /// Poll interval when no tasks are ready but some are in-progress
    pub poll_interval: Duration,
}
46
impl Default for BeadsConfig {
    fn default() -> Self {
        Self {
            // Conservative defaults: up to 5 agents at once, re-poll every 3s.
            max_concurrent: 5,
            poll_interval: Duration::from_secs(3),
        }
    }
}
55
/// Task info with tag for tracking
#[derive(Clone, Debug)]
pub struct ReadyTask {
    // Snapshot of the ready task (cloned out of its phase).
    pub task: Task,
    // Phase tag the task belongs to; needed later to load/update its group.
    pub tag: String,
}
62
/// Result of a beads execution run
pub struct BeadsResult {
    // Number of tasks observed reaching Done status during the run.
    pub tasks_completed: usize,
    // Number of failures (spawn errors plus tasks observed as Failed).
    pub tasks_failed: usize,
    // Wall-clock time of the whole loop.
    pub total_duration: Duration,
}
69
70/// Get all tasks that are ready to execute (dependencies met, not in-progress)
71///
72/// A task is ready when:
73/// - Status is Pending
74/// - Not expanded (or is subtask of expanded parent)
75/// - All dependencies have status Done
76/// - Not blocked by in-progress tasks (unlike waves, we allow execution while others run)
77pub fn get_ready_tasks(
78    all_phases: &HashMap<String, Phase>,
79    phase_tag: &str,
80    all_tags: bool,
81) -> Vec<ReadyTask> {
82    let mut ready = Vec::new();
83
84    // Collect all tasks as references for dependency checking
85    let all_task_refs: Vec<&Task> = all_phases.values().flat_map(|p| &p.tasks).collect();
86
87    // Determine which phases to check
88    let phase_tags: Vec<&String> = if all_tags {
89        all_phases.keys().collect()
90    } else {
91        all_phases
92            .keys()
93            .filter(|t| t.as_str() == phase_tag)
94            .collect()
95    };
96
97    for tag in phase_tags {
98        if let Some(phase) = all_phases.get(tag) {
99            for task in &phase.tasks {
100                if is_task_ready(task, phase, &all_task_refs) {
101                    ready.push(ReadyTask {
102                        task: task.clone(),
103                        tag: tag.clone(),
104                    });
105                }
106            }
107        }
108    }
109
110    // Sort by priority (Critical > High > Medium > Low), then by ID
111    ready.sort_by(|a, b| {
112        use crate::models::task::Priority;
113        let priority_ord = |p: &Priority| match p {
114            Priority::Critical => 0,
115            Priority::High => 1,
116            Priority::Medium => 2,
117            Priority::Low => 3,
118        };
119        priority_ord(&a.task.priority)
120            .cmp(&priority_ord(&b.task.priority))
121            .then_with(|| a.task.id.cmp(&b.task.id))
122    });
123
124    ready
125}
126
127/// Check if a task is ready to execute
128fn is_task_ready(task: &Task, phase: &Phase, all_tasks: &[&Task]) -> bool {
129    if !is_actionable_pending_task(task, phase) {
130        return false;
131    }
132
133    // All dependencies must be Done (not just "not pending")
134    // This uses the effective dependencies which includes inherited parent deps
135    task.has_dependencies_met_refs(all_tasks)
136}
137
/// Count tasks currently in progress
///
/// Thin wrapper over `count_in_progress_tasks`, scoped to `phase_tag`
/// unless `all_tags` is set (then counts across every phase).
pub fn count_in_progress(
    all_phases: &HashMap<String, Phase>,
    phase_tag: &str,
    all_tags: bool,
) -> usize {
    count_in_progress_tasks(all_phases, phase_tag, all_tags)
}
146
147/// Count remaining tasks (pending or in-progress)
148pub fn count_remaining(
149    all_phases: &HashMap<String, Phase>,
150    phase_tag: &str,
151    all_tags: bool,
152) -> usize {
153    scoped_phases(all_phases, phase_tag, all_tags)
154        .into_iter()
155        .flat_map(|phase| &phase.tasks)
156        .filter(|t| {
157            t.status == TaskStatus::InProgress
158                || (t.status == TaskStatus::Pending && !t.is_expanded())
159        })
160        .count()
161}
162
163/// Claim a task by marking it as in-progress
164pub fn claim_task(storage: &Storage, task_id: &str, tag: &str) -> Result<bool> {
165    let mut phase = storage.load_group(tag)?;
166
167    if let Some(task) = phase.get_task_mut(task_id) {
168        // Only claim if still pending (prevent race conditions)
169        if task.status == TaskStatus::Pending {
170            task.set_status(TaskStatus::InProgress);
171            storage.update_group(tag, &phase)?;
172            return Ok(true);
173        }
174    }
175
176    Ok(false)
177}
178
179/// Spawn an agent for a task using tmux
180pub fn spawn_agent_tmux(
181    ready_task: &ReadyTask,
182    working_dir: &Path,
183    session_name: &str,
184    default_harness: Harness,
185) -> Result<String> {
186    // Resolve agent config (harness, model, prompt) from task's agent_type
187    let config = agent::resolve_agent_config(
188        &ready_task.task,
189        &ready_task.tag,
190        default_harness,
191        None,
192        working_dir,
193    );
194
195    // Spawn in tmux
196    let spawn_config = terminal::SpawnConfig {
197        task_id: &ready_task.task.id,
198        prompt: &config.prompt,
199        working_dir,
200        session_name,
201        harness: config.harness,
202        model: config.model.as_deref(),
203        task_list_id: None,
204    };
205    let window_index = terminal::spawn_tmux_agent(&spawn_config)?;
206
207    Ok(format!("{}:{}", session_name, window_index))
208}
209
210/// Main beads execution loop
211///
212/// Continuously polls for ready tasks and spawns agents immediately.
213/// Does not wait for batches - new tasks can start as soon as their
214/// dependencies complete.
215#[allow(clippy::too_many_arguments)]
216pub fn run_beads_loop(
217    storage: &Storage,
218    phase_tag: &str,
219    all_tags: bool,
220    working_dir: &Path,
221    session_name: &str,
222    default_harness: Harness,
223    config: &BeadsConfig,
224    session: &mut SwarmSession,
225) -> Result<BeadsResult> {
226    let start_time = Instant::now();
227    let mut tasks_completed = 0;
228    let mut tasks_failed = 0;
229    let mut spawned_tasks: HashSet<String> = HashSet::new();
230    let mut spawned_times: HashMap<String, Instant> = HashMap::new();
231    let mut round_state = RoundState::new(0); // Single continuous "round"
232
233    // Initialize event writer for retrospective logging
234    let event_writer = EventWriter::new(working_dir, session_name)
235        .map_err(|e| anyhow::anyhow!("Failed to initialize event writer: {}", e))?;
236
237    println!();
238    println!("{}", "Beads Execution Mode".cyan().bold());
239    println!("{}", "═".repeat(50));
240    println!("  {} Continuous ready-task polling", "Mode:".dimmed());
241    if let Some(session_file) = event_writer.session_file() {
242        println!(
243            "  {} {}",
244            "Event log:".dimmed(),
245            session_file.display().to_string().dimmed()
246        );
247    }
248    println!(
249        "  {} {}",
250        "Max concurrent:".dimmed(),
251        config.max_concurrent.to_string().cyan()
252    );
253    println!(
254        "  {} {}ms",
255        "Poll interval:".dimmed(),
256        config.poll_interval.as_millis().to_string().cyan()
257    );
258    println!();
259
260    loop {
261        // Reload task state to see completed tasks
262        let all_phases = storage.load_tasks()?;
263
264        // Count current state
265        let in_progress = count_in_progress(&all_phases, phase_tag, all_tags);
266        let remaining = count_remaining(&all_phases, phase_tag, all_tags);
267
268        // Check for completion
269        if remaining == 0 {
270            println!();
271            println!("{}", "All tasks complete!".green().bold());
272            break;
273        }
274
275        // Get ready tasks
276        let ready_tasks = get_ready_tasks(&all_phases, phase_tag, all_tags);
277
278        // Filter out tasks we've already spawned (in case status update is delayed)
279        let ready_tasks: Vec<_> = ready_tasks
280            .into_iter()
281            .filter(|rt| !spawned_tasks.contains(&rt.task.id))
282            .collect();
283
284        if ready_tasks.is_empty() {
285            if in_progress > 0 {
286                // Some tasks running but none ready - wait for completion
287                print!(
288                    "\r  {} {} task(s) in progress, waiting...   ",
289                    "⏳".dimmed(),
290                    in_progress.to_string().cyan()
291                );
292                std::io::Write::flush(&mut std::io::stdout())?;
293                thread::sleep(config.poll_interval);
294                continue;
295            } else {
296                // No tasks ready and none in progress - might be blocked
297                println!();
298                println!("{}", "No ready tasks and none in progress.".yellow());
299                println!(
300                    "  {} {} remaining task(s) may be blocked.",
301                    "!".yellow(),
302                    remaining
303                );
304                println!("  Check for circular dependencies or missing dependencies.");
305                break;
306            }
307        }
308
309        // Clear waiting line if we were waiting
310        print!("\r{}\r", " ".repeat(60));
311
312        // Calculate how many we can spawn
313        let available_slots = config.max_concurrent.saturating_sub(in_progress);
314        let to_spawn = ready_tasks.into_iter().take(available_slots);
315
316        // Spawn agents for ready tasks
317        for ready_task in to_spawn {
318            // Try to claim the task
319            if !claim_task(storage, &ready_task.task.id, &ready_task.tag)? {
320                // Task was claimed by another process or status changed
321                continue;
322            }
323
324            // Mark as spawned locally
325            spawned_tasks.insert(ready_task.task.id.clone());
326            spawned_times.insert(ready_task.task.id.clone(), Instant::now());
327
328            // Log spawn event
329            if let Err(e) = event_writer.log_spawned(&ready_task.task.id) {
330                eprintln!("Warning: Failed to log spawn event: {}", e);
331            }
332
333            // Spawn agent
334            match spawn_agent_tmux(&ready_task, working_dir, session_name, default_harness) {
335                Ok(window_info) => {
336                    println!(
337                        "  {} Spawned: {} | {} [{}]",
338                        "✓".green(),
339                        ready_task.task.id.cyan(),
340                        ready_task.task.title.dimmed(),
341                        window_info.dimmed()
342                    );
343                    round_state.task_ids.push(ready_task.task.id.clone());
344                    round_state.tags.push(ready_task.tag.clone());
345                }
346                Err(e) => {
347                    println!(
348                        "  {} Failed: {} - {}",
349                        "✗".red(),
350                        ready_task.task.id.red(),
351                        e
352                    );
353                    round_state.failures.push(ready_task.task.id.clone());
354                    tasks_failed += 1;
355
356                    // Log failure event
357                    if let Err(log_err) = event_writer.log_completed(&ready_task.task.id, false, 0)
358                    {
359                        eprintln!("Warning: Failed to log completion event: {}", log_err);
360                    }
361
362                    // Reset task status on spawn failure
363                    if let Ok(mut phase) = storage.load_group(&ready_task.tag) {
364                        if let Some(task) = phase.get_task_mut(&ready_task.task.id) {
365                            task.set_status(TaskStatus::Failed);
366                            let _ = storage.update_group(&ready_task.tag, &phase);
367                        }
368                    }
369                }
370            }
371        }
372
373        // Detect newly completed tasks and log them
374        let mut newly_completed: Vec<(String, bool)> = Vec::new();
375        for task_id in &spawned_tasks {
376            // Skip if we've already counted this task
377            if !spawned_times.contains_key(task_id) {
378                continue;
379            }
380            for phase in all_phases.values() {
381                if let Some(task) = phase.get_task(task_id) {
382                    match task.status {
383                        TaskStatus::Done => {
384                            newly_completed.push((task_id.clone(), true));
385                        }
386                        TaskStatus::Failed => {
387                            newly_completed.push((task_id.clone(), false));
388                        }
389                        _ => {}
390                    }
391                    break;
392                }
393            }
394        }
395
396        // Log completion events and track what unblocked what
397        for (task_id, success) in newly_completed {
398            if let Some(spawn_time) = spawned_times.remove(&task_id) {
399                // Clean up spawned_tasks to prevent unbounded growth
400                spawned_tasks.remove(&task_id);
401
402                let duration_ms = spawn_time.elapsed().as_millis() as u64;
403                if let Err(e) = event_writer.log_completed(&task_id, success, duration_ms) {
404                    eprintln!("Warning: Failed to log completion: {}", e);
405                }
406                if success {
407                    tasks_completed += 1;
408                    println!(
409                        "  {} Completed: {} ({}ms)",
410                        "✓".green(),
411                        task_id.cyan(),
412                        duration_ms
413                    );
414
415                    // Check what tasks were unblocked by this completion
416                    // by looking at all pending tasks that depend on this one
417                    for phase in all_phases.values() {
418                        for potential_unblocked in &phase.tasks {
419                            if potential_unblocked.status == TaskStatus::Pending
420                                && potential_unblocked.dependencies.contains(&task_id)
421                            {
422                                if let Err(e) =
423                                    event_writer.log_unblocked(&potential_unblocked.id, &task_id)
424                                {
425                                    eprintln!("Warning: Failed to log unblock: {}", e);
426                                }
427                            }
428                        }
429                    }
430                } else {
431                    tasks_failed += 1;
432                }
433            }
434        }
435
436        // Short sleep to avoid tight polling when at max capacity
437        if in_progress >= config.max_concurrent {
438            thread::sleep(config.poll_interval);
439        } else {
440            // Brief yield to allow other processes
441            thread::sleep(Duration::from_millis(100));
442        }
443    }
444
445    // Save session state
446    let mut wave_state = super::session::WaveState::new(1);
447    wave_state.rounds.push(round_state);
448    session.waves.push(wave_state);
449
450    Ok(BeadsResult {
451        tasks_completed,
452        tasks_failed,
453        total_duration: start_time.elapsed(),
454    })
455}
456
// Note: Beads extensions mode (async subprocess) is planned but not yet implemented.
// For now, beads mode uses tmux-based execution via run_beads_loop().
459
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::task::Priority;
    use tempfile::TempDir;

    // Build a minimal task with the given id, status, and dependency ids.
    fn create_test_task(id: &str, status: TaskStatus, deps: Vec<&str>) -> Task {
        let mut task = Task::new(
            id.to_string(),
            format!("Task {}", id),
            "Description".to_string(),
        );
        task.status = status;
        task.dependencies = deps.into_iter().map(String::from).collect();
        task
    }

    // Create a temp-dir-backed Storage seeded with one phase under `tag`.
    // The TempDir is returned so it stays alive for the test's duration.
    fn setup_storage_with_phase(phase: &Phase, tag: &str) -> (TempDir, Storage) {
        let temp_dir = TempDir::new().unwrap();
        let storage = Storage::new(Some(temp_dir.path().to_path_buf()));
        storage.update_group(tag, phase).unwrap();
        (temp_dir, storage)
    }

    #[test]
    fn test_get_ready_tasks_no_deps() {
        let mut phase = Phase::new("test".to_string());
        phase
            .tasks
            .push(create_test_task("1", TaskStatus::Pending, vec![]));
        phase
            .tasks
            .push(create_test_task("2", TaskStatus::Pending, vec![]));

        let mut phases = HashMap::new();
        phases.insert("test".to_string(), phase);

        // Both pending tasks have no deps, so both are ready.
        let ready = get_ready_tasks(&phases, "test", false);
        assert_eq!(ready.len(), 2);
    }

    #[test]
    fn test_get_ready_tasks_with_deps_met() {
        let mut phase = Phase::new("test".to_string());
        phase
            .tasks
            .push(create_test_task("1", TaskStatus::Done, vec![]));
        phase
            .tasks
            .push(create_test_task("2", TaskStatus::Pending, vec!["1"]));

        let mut phases = HashMap::new();
        phases.insert("test".to_string(), phase);

        // Task 2's only dependency is Done, so only task 2 is ready.
        let ready = get_ready_tasks(&phases, "test", false);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].task.id, "2");
    }

    #[test]
    fn test_get_ready_tasks_with_deps_not_met() {
        let mut phase = Phase::new("test".to_string());
        phase
            .tasks
            .push(create_test_task("1", TaskStatus::InProgress, vec![]));
        phase
            .tasks
            .push(create_test_task("2", TaskStatus::Pending, vec!["1"]));

        let mut phases = HashMap::new();
        phases.insert("test".to_string(), phase);

        // Dependency "1" is InProgress (not Done), so nothing is ready.
        let ready = get_ready_tasks(&phases, "test", false);
        assert_eq!(ready.len(), 0);
    }

    #[test]
    fn test_get_ready_tasks_skips_expanded() {
        let mut phase = Phase::new("test".to_string());
        let mut expanded_task = create_test_task("1", TaskStatus::Expanded, vec![]);
        expanded_task.subtasks = vec!["1.1".to_string()];
        phase.tasks.push(expanded_task);

        let mut subtask = create_test_task("1.1", TaskStatus::Pending, vec![]);
        subtask.parent_id = Some("1".to_string());
        phase.tasks.push(subtask);

        let mut phases = HashMap::new();
        phases.insert("test".to_string(), phase);

        // The expanded parent is skipped; its pending subtask is ready.
        let ready = get_ready_tasks(&phases, "test", false);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].task.id, "1.1");
    }

    #[test]
    fn test_get_ready_tasks_priority_sort() {
        let mut phase = Phase::new("test".to_string());

        let mut low = create_test_task("low", TaskStatus::Pending, vec![]);
        low.priority = Priority::Low;

        let mut critical = create_test_task("critical", TaskStatus::Pending, vec![]);
        critical.priority = Priority::Critical;

        let mut high = create_test_task("high", TaskStatus::Pending, vec![]);
        high.priority = Priority::High;

        phase.tasks.push(low);
        phase.tasks.push(critical);
        phase.tasks.push(high);

        let mut phases = HashMap::new();
        phases.insert("test".to_string(), phase);

        // Output must be ordered Critical > High > Low regardless of insertion order.
        let ready = get_ready_tasks(&phases, "test", false);
        assert_eq!(ready.len(), 3);
        assert_eq!(ready[0].task.id, "critical");
        assert_eq!(ready[1].task.id, "high");
        assert_eq!(ready[2].task.id, "low");
    }

    #[test]
    fn test_count_in_progress() {
        let mut phase = Phase::new("test".to_string());
        phase
            .tasks
            .push(create_test_task("1", TaskStatus::InProgress, vec![]));
        phase
            .tasks
            .push(create_test_task("2", TaskStatus::InProgress, vec![]));
        phase
            .tasks
            .push(create_test_task("3", TaskStatus::Pending, vec![]));
        phase
            .tasks
            .push(create_test_task("4", TaskStatus::Done, vec![]));

        let mut phases = HashMap::new();
        phases.insert("test".to_string(), phase);

        assert_eq!(count_in_progress(&phases, "test", false), 2);
    }

    #[test]
    fn test_count_remaining() {
        let mut phase = Phase::new("test".to_string());
        phase
            .tasks
            .push(create_test_task("1", TaskStatus::InProgress, vec![]));
        phase
            .tasks
            .push(create_test_task("2", TaskStatus::Pending, vec![]));
        phase
            .tasks
            .push(create_test_task("3", TaskStatus::Done, vec![]));
        phase
            .tasks
            .push(create_test_task("4", TaskStatus::Failed, vec![]));

        let mut phases = HashMap::new();
        phases.insert("test".to_string(), phase);

        assert_eq!(count_remaining(&phases, "test", false), 2); // InProgress + Pending
    }

    #[test]
    fn test_claim_task_pending() {
        let mut phase = Phase::new("test".to_string());
        phase
            .tasks
            .push(create_test_task("1", TaskStatus::Pending, vec![]));

        let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");

        // Should successfully claim the pending task
        let claimed = claim_task(&storage, "1", "test").unwrap();
        assert!(claimed);

        // Verify the task is now in-progress
        let reloaded = storage.load_group("test").unwrap();
        assert_eq!(
            reloaded.get_task("1").unwrap().status,
            TaskStatus::InProgress
        );
    }

    #[test]
    fn test_claim_task_already_in_progress() {
        let mut phase = Phase::new("test".to_string());
        phase
            .tasks
            .push(create_test_task("1", TaskStatus::InProgress, vec![]));

        let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");

        // Should fail to claim a task that's already in-progress
        let claimed = claim_task(&storage, "1", "test").unwrap();
        assert!(!claimed);
    }

    #[test]
    fn test_claim_task_nonexistent() {
        let mut phase = Phase::new("test".to_string());
        phase
            .tasks
            .push(create_test_task("1", TaskStatus::Pending, vec![]));

        let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");

        // Should fail to claim a task that doesn't exist
        let claimed = claim_task(&storage, "nonexistent", "test").unwrap();
        assert!(!claimed);
    }

    #[test]
    fn test_claim_task_already_done() {
        let mut phase = Phase::new("test".to_string());
        phase
            .tasks
            .push(create_test_task("1", TaskStatus::Done, vec![]));

        let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");

        // Should fail to claim a task that's already done
        let claimed = claim_task(&storage, "1", "test").unwrap();
        assert!(!claimed);
    }
}