Skip to main content

scud/commands/swarm/
beads.rs

1//! Beads-style continuous execution mode
2//!
3//! Unlike wave-based execution which batches tasks and waits for all to complete,
4//! beads-style execution uses continuous polling for ready tasks:
5//!
6//! 1. Query for all tasks where dependencies are met
7//! 2. Claim task (mark in-progress)
8//! 3. Spawn agent
9//! 4. Immediately loop back to step 1 (no waiting for batch)
10//!
11//! This enables more fluid execution where downstream tasks can start
12//! immediately when their dependencies complete, rather than waiting
13//! for artificial wave boundaries.
14//!
15//! Inspired by the Beads project (https://github.com/steveyegge/beads)
16//! and Gas Town's GUPP principle: "When an agent finds work on their hook,
17//! they execute immediately. No confirmation. No questions. No waiting."
18
19use std::collections::{HashMap, HashSet};
20use std::path::Path;
21use std::thread;
22use std::time::{Duration, Instant};
23
24use anyhow::Result;
25use colored::Colorize;
26
27use crate::commands::spawn::agent;
28use crate::commands::spawn::terminal::{self, Harness};
29use crate::models::phase::Phase;
30use crate::models::task::{Task, TaskStatus};
31use crate::storage::Storage;
32
33use super::events::EventWriter;
34use super::session::{RoundState, SwarmSession};
35
36/// Configuration for beads execution
37pub struct BeadsConfig {
38    /// Maximum concurrent agents
39    pub max_concurrent: usize,
40    /// Poll interval when no tasks are ready but some are in-progress
41    pub poll_interval: Duration,
42}
43
44impl Default for BeadsConfig {
45    fn default() -> Self {
46        Self {
47            max_concurrent: 5,
48            poll_interval: Duration::from_secs(3),
49        }
50    }
51}
52
53/// Task info with tag for tracking
54#[derive(Clone, Debug)]
55pub struct ReadyTask {
56    pub task: Task,
57    pub tag: String,
58}
59
60/// Result of a beads execution run
61pub struct BeadsResult {
62    pub tasks_completed: usize,
63    pub tasks_failed: usize,
64    pub total_duration: Duration,
65}
66
67/// Get all tasks that are ready to execute (dependencies met, not in-progress)
68///
69/// A task is ready when:
70/// - Status is Pending
71/// - Not expanded (or is subtask of expanded parent)
72/// - All dependencies have status Done
73/// - Not blocked by in-progress tasks (unlike waves, we allow execution while others run)
74pub fn get_ready_tasks(
75    all_phases: &HashMap<String, Phase>,
76    phase_tag: &str,
77    all_tags: bool,
78) -> Vec<ReadyTask> {
79    let mut ready = Vec::new();
80
81    // Collect all tasks as references for dependency checking
82    let all_task_refs: Vec<&Task> = all_phases.values().flat_map(|p| &p.tasks).collect();
83
84    // Determine which phases to check
85    let phase_tags: Vec<&String> = if all_tags {
86        all_phases.keys().collect()
87    } else {
88        all_phases
89            .keys()
90            .filter(|t| t.as_str() == phase_tag)
91            .collect()
92    };
93
94    for tag in phase_tags {
95        if let Some(phase) = all_phases.get(tag) {
96            for task in &phase.tasks {
97                if is_task_ready(task, phase, &all_task_refs) {
98                    ready.push(ReadyTask {
99                        task: task.clone(),
100                        tag: tag.clone(),
101                    });
102                }
103            }
104        }
105    }
106
107    // Sort by priority (Critical > High > Medium > Low), then by ID
108    ready.sort_by(|a, b| {
109        use crate::models::task::Priority;
110        let priority_ord = |p: &Priority| match p {
111            Priority::Critical => 0,
112            Priority::High => 1,
113            Priority::Medium => 2,
114            Priority::Low => 3,
115        };
116        priority_ord(&a.task.priority)
117            .cmp(&priority_ord(&b.task.priority))
118            .then_with(|| a.task.id.cmp(&b.task.id))
119    });
120
121    ready
122}
123
124/// Check if a task is ready to execute
125fn is_task_ready(task: &Task, phase: &Phase, all_tasks: &[&Task]) -> bool {
126    // Must be pending
127    if task.status != TaskStatus::Pending {
128        return false;
129    }
130
131    // Skip expanded tasks (they have subtasks to do instead)
132    if task.is_expanded() {
133        return false;
134    }
135
136    // If subtask, parent must be expanded
137    if let Some(ref parent_id) = task.parent_id {
138        let parent_expanded = phase
139            .get_task(parent_id)
140            .map(|p| p.is_expanded())
141            .unwrap_or(false);
142        if !parent_expanded {
143            return false;
144        }
145    }
146
147    // All dependencies must be Done (not just "not pending")
148    // This uses the effective dependencies which includes inherited parent deps
149    task.has_dependencies_met_refs(all_tasks)
150}
151
152/// Count tasks currently in progress
153pub fn count_in_progress(
154    all_phases: &HashMap<String, Phase>,
155    phase_tag: &str,
156    all_tags: bool,
157) -> usize {
158    let tags: Vec<&String> = if all_tags {
159        all_phases.keys().collect()
160    } else {
161        all_phases
162            .keys()
163            .filter(|t| t.as_str() == phase_tag)
164            .collect()
165    };
166
167    tags.iter()
168        .filter_map(|tag| all_phases.get(*tag))
169        .flat_map(|phase| &phase.tasks)
170        .filter(|t| t.status == TaskStatus::InProgress)
171        .count()
172}
173
174/// Count remaining tasks (pending or in-progress)
175pub fn count_remaining(
176    all_phases: &HashMap<String, Phase>,
177    phase_tag: &str,
178    all_tags: bool,
179) -> usize {
180    let tags: Vec<&String> = if all_tags {
181        all_phases.keys().collect()
182    } else {
183        all_phases
184            .keys()
185            .filter(|t| t.as_str() == phase_tag)
186            .collect()
187    };
188
189    tags.iter()
190        .filter_map(|tag| all_phases.get(*tag))
191        .flat_map(|phase| &phase.tasks)
192        .filter(|t| {
193            t.status == TaskStatus::InProgress
194                || (t.status == TaskStatus::Pending && !t.is_expanded())
195        })
196        .count()
197}
198
199/// Claim a task by marking it as in-progress
200pub fn claim_task(storage: &Storage, task_id: &str, tag: &str) -> Result<bool> {
201    let mut phase = storage.load_group(tag)?;
202
203    if let Some(task) = phase.get_task_mut(task_id) {
204        // Only claim if still pending (prevent race conditions)
205        if task.status == TaskStatus::Pending {
206            task.set_status(TaskStatus::InProgress);
207            storage.update_group(tag, &phase)?;
208            return Ok(true);
209        }
210    }
211
212    Ok(false)
213}
214
215/// Spawn an agent for a task using tmux
216pub fn spawn_agent_tmux(
217    ready_task: &ReadyTask,
218    working_dir: &Path,
219    session_name: &str,
220    default_harness: Harness,
221) -> Result<String> {
222    // Resolve agent config (harness, model, prompt) from task's agent_type
223    let config = agent::resolve_agent_config(
224        &ready_task.task,
225        &ready_task.tag,
226        default_harness,
227        None,
228        working_dir,
229    );
230
231    // Spawn in tmux
232    let window_index = terminal::spawn_terminal_with_harness_and_model(
233        &ready_task.task.id,
234        &config.prompt,
235        working_dir,
236        session_name,
237        config.harness,
238        config.model.as_deref(),
239    )?;
240
241    Ok(format!("{}:{}", session_name, window_index))
242}
243
244/// Main beads execution loop
245///
246/// Continuously polls for ready tasks and spawns agents immediately.
247/// Does not wait for batches - new tasks can start as soon as their
248/// dependencies complete.
249pub fn run_beads_loop(
250    storage: &Storage,
251    phase_tag: &str,
252    all_tags: bool,
253    working_dir: &Path,
254    session_name: &str,
255    default_harness: Harness,
256    config: &BeadsConfig,
257    session: &mut SwarmSession,
258) -> Result<BeadsResult> {
259    let start_time = Instant::now();
260    let mut tasks_completed = 0;
261    let mut tasks_failed = 0;
262    let mut spawned_tasks: HashSet<String> = HashSet::new();
263    let mut spawned_times: HashMap<String, Instant> = HashMap::new();
264    let mut round_state = RoundState::new(0); // Single continuous "round"
265
266    // Initialize event writer for retrospective logging
267    let event_writer = EventWriter::new(working_dir, session_name)
268        .map_err(|e| anyhow::anyhow!("Failed to initialize event writer: {}", e))?;
269
270    println!();
271    println!("{}", "Beads Execution Mode".cyan().bold());
272    println!("{}", "═".repeat(50));
273    println!(
274        "  {} Continuous ready-task polling",
275        "Mode:".dimmed()
276    );
277    println!(
278        "  {} {}",
279        "Event log:".dimmed(),
280        event_writer.session_file().display().to_string().dimmed()
281    );
282    println!(
283        "  {} {}",
284        "Max concurrent:".dimmed(),
285        config.max_concurrent.to_string().cyan()
286    );
287    println!(
288        "  {} {}ms",
289        "Poll interval:".dimmed(),
290        config.poll_interval.as_millis().to_string().cyan()
291    );
292    println!();
293
294    loop {
295        // Reload task state to see completed tasks
296        let all_phases = storage.load_tasks()?;
297
298        // Count current state
299        let in_progress = count_in_progress(&all_phases, phase_tag, all_tags);
300        let remaining = count_remaining(&all_phases, phase_tag, all_tags);
301
302        // Check for completion
303        if remaining == 0 {
304            println!();
305            println!("{}", "All tasks complete!".green().bold());
306            break;
307        }
308
309        // Get ready tasks
310        let ready_tasks = get_ready_tasks(&all_phases, phase_tag, all_tags);
311
312        // Filter out tasks we've already spawned (in case status update is delayed)
313        let ready_tasks: Vec<_> = ready_tasks
314            .into_iter()
315            .filter(|rt| !spawned_tasks.contains(&rt.task.id))
316            .collect();
317
318        if ready_tasks.is_empty() {
319            if in_progress > 0 {
320                // Some tasks running but none ready - wait for completion
321                print!(
322                    "\r  {} {} task(s) in progress, waiting...   ",
323                    "⏳".dimmed(),
324                    in_progress.to_string().cyan()
325                );
326                std::io::Write::flush(&mut std::io::stdout())?;
327                thread::sleep(config.poll_interval);
328                continue;
329            } else {
330                // No tasks ready and none in progress - might be blocked
331                println!();
332                println!(
333                    "{}",
334                    "No ready tasks and none in progress.".yellow()
335                );
336                println!(
337                    "  {} {} remaining task(s) may be blocked.",
338                    "!".yellow(),
339                    remaining
340                );
341                println!("  Check for circular dependencies or missing dependencies.");
342                break;
343            }
344        }
345
346        // Clear waiting line if we were waiting
347        print!("\r{}\r", " ".repeat(60));
348
349        // Calculate how many we can spawn
350        let available_slots = config.max_concurrent.saturating_sub(in_progress);
351        let to_spawn = ready_tasks.into_iter().take(available_slots);
352
353        // Spawn agents for ready tasks
354        for ready_task in to_spawn {
355            // Try to claim the task
356            if !claim_task(storage, &ready_task.task.id, &ready_task.tag)? {
357                // Task was claimed by another process or status changed
358                continue;
359            }
360
361            // Mark as spawned locally
362            spawned_tasks.insert(ready_task.task.id.clone());
363            spawned_times.insert(ready_task.task.id.clone(), Instant::now());
364
365            // Log spawn event
366            if let Err(e) = event_writer.log_spawned(&ready_task.task.id) {
367                eprintln!("Warning: Failed to log spawn event: {}", e);
368            }
369
370            // Spawn agent
371            match spawn_agent_tmux(&ready_task, working_dir, session_name, default_harness) {
372                Ok(window_info) => {
373                    println!(
374                        "  {} Spawned: {} | {} [{}]",
375                        "✓".green(),
376                        ready_task.task.id.cyan(),
377                        ready_task.task.title.dimmed(),
378                        window_info.dimmed()
379                    );
380                    round_state.task_ids.push(ready_task.task.id.clone());
381                    round_state.tags.push(ready_task.tag.clone());
382                }
383                Err(e) => {
384                    println!(
385                        "  {} Failed: {} - {}",
386                        "✗".red(),
387                        ready_task.task.id.red(),
388                        e
389                    );
390                    round_state.failures.push(ready_task.task.id.clone());
391                    tasks_failed += 1;
392
393                    // Log failure event
394                    if let Err(log_err) = event_writer.log_completed(&ready_task.task.id, false, 0) {
395                        eprintln!("Warning: Failed to log completion event: {}", log_err);
396                    }
397
398                    // Reset task status on spawn failure
399                    if let Ok(mut phase) = storage.load_group(&ready_task.tag) {
400                        if let Some(task) = phase.get_task_mut(&ready_task.task.id) {
401                            task.set_status(TaskStatus::Failed);
402                            let _ = storage.update_group(&ready_task.tag, &phase);
403                        }
404                    }
405                }
406            }
407        }
408
409        // Detect newly completed tasks and log them
410        let mut newly_completed: Vec<(String, bool)> = Vec::new();
411        for task_id in &spawned_tasks {
412            // Skip if we've already counted this task
413            if !spawned_times.contains_key(task_id) {
414                continue;
415            }
416            for phase in all_phases.values() {
417                if let Some(task) = phase.get_task(task_id) {
418                    match task.status {
419                        TaskStatus::Done => {
420                            newly_completed.push((task_id.clone(), true));
421                        }
422                        TaskStatus::Failed => {
423                            newly_completed.push((task_id.clone(), false));
424                        }
425                        _ => {}
426                    }
427                    break;
428                }
429            }
430        }
431
432        // Log completion events and track what unblocked what
433        for (task_id, success) in newly_completed {
434            if let Some(spawn_time) = spawned_times.remove(&task_id) {
435                // Clean up spawned_tasks to prevent unbounded growth
436                spawned_tasks.remove(&task_id);
437
438                let duration_ms = spawn_time.elapsed().as_millis() as u64;
439                if let Err(e) = event_writer.log_completed(&task_id, success, duration_ms) {
440                    eprintln!("Warning: Failed to log completion: {}", e);
441                }
442                if success {
443                    tasks_completed += 1;
444                    println!(
445                        "  {} Completed: {} ({}ms)",
446                        "✓".green(),
447                        task_id.cyan(),
448                        duration_ms
449                    );
450
451                    // Check what tasks were unblocked by this completion
452                    // by looking at all pending tasks that depend on this one
453                    for phase in all_phases.values() {
454                        for potential_unblocked in &phase.tasks {
455                            if potential_unblocked.status == TaskStatus::Pending
456                                && potential_unblocked.dependencies.contains(&task_id)
457                            {
458                                if let Err(e) = event_writer.log_unblocked(&potential_unblocked.id, &task_id) {
459                                    eprintln!("Warning: Failed to log unblock: {}", e);
460                                }
461                            }
462                        }
463                    }
464                } else {
465                    tasks_failed += 1;
466                }
467            }
468        }
469
470        // Short sleep to avoid tight polling when at max capacity
471        if in_progress >= config.max_concurrent {
472            thread::sleep(config.poll_interval);
473        } else {
474            // Brief yield to allow other processes
475            thread::sleep(Duration::from_millis(100));
476        }
477    }
478
479    // Save session state
480    let mut wave_state = super::session::WaveState::new(1);
481    wave_state.rounds.push(round_state);
482    session.waves.push(wave_state);
483
484    Ok(BeadsResult {
485        tasks_completed,
486        tasks_failed,
487        total_duration: start_time.elapsed(),
488    })
489}
490
491// Note: Beads extensions mode (async subprocess) is planned but not yet implemented.
492// For now, beads mode uses tmux-based execution via run_beads_loop().
493
494#[cfg(test)]
495mod tests {
496    use super::*;
497    use crate::models::task::Priority;
498    use tempfile::TempDir;
499
500    fn create_test_task(id: &str, status: TaskStatus, deps: Vec<&str>) -> Task {
501        let mut task = Task::new(id.to_string(), format!("Task {}", id), "Description".to_string());
502        task.status = status;
503        task.dependencies = deps.into_iter().map(String::from).collect();
504        task
505    }
506
507    fn setup_storage_with_phase(phase: &Phase, tag: &str) -> (TempDir, Storage) {
508        let temp_dir = TempDir::new().unwrap();
509        let storage = Storage::new(Some(temp_dir.path().to_path_buf()));
510        storage.update_group(tag, phase).unwrap();
511        (temp_dir, storage)
512    }
513
514    #[test]
515    fn test_get_ready_tasks_no_deps() {
516        let mut phase = Phase::new("test".to_string());
517        phase.tasks.push(create_test_task("1", TaskStatus::Pending, vec![]));
518        phase.tasks.push(create_test_task("2", TaskStatus::Pending, vec![]));
519
520        let mut phases = HashMap::new();
521        phases.insert("test".to_string(), phase);
522
523        let ready = get_ready_tasks(&phases, "test", false);
524        assert_eq!(ready.len(), 2);
525    }
526
527    #[test]
528    fn test_get_ready_tasks_with_deps_met() {
529        let mut phase = Phase::new("test".to_string());
530        phase.tasks.push(create_test_task("1", TaskStatus::Done, vec![]));
531        phase.tasks.push(create_test_task("2", TaskStatus::Pending, vec!["1"]));
532
533        let mut phases = HashMap::new();
534        phases.insert("test".to_string(), phase);
535
536        let ready = get_ready_tasks(&phases, "test", false);
537        assert_eq!(ready.len(), 1);
538        assert_eq!(ready[0].task.id, "2");
539    }
540
541    #[test]
542    fn test_get_ready_tasks_with_deps_not_met() {
543        let mut phase = Phase::new("test".to_string());
544        phase.tasks.push(create_test_task("1", TaskStatus::InProgress, vec![]));
545        phase.tasks.push(create_test_task("2", TaskStatus::Pending, vec!["1"]));
546
547        let mut phases = HashMap::new();
548        phases.insert("test".to_string(), phase);
549
550        let ready = get_ready_tasks(&phases, "test", false);
551        assert_eq!(ready.len(), 0);
552    }
553
554    #[test]
555    fn test_get_ready_tasks_skips_expanded() {
556        let mut phase = Phase::new("test".to_string());
557        let mut expanded_task = create_test_task("1", TaskStatus::Expanded, vec![]);
558        expanded_task.subtasks = vec!["1.1".to_string()];
559        phase.tasks.push(expanded_task);
560
561        let mut subtask = create_test_task("1.1", TaskStatus::Pending, vec![]);
562        subtask.parent_id = Some("1".to_string());
563        phase.tasks.push(subtask);
564
565        let mut phases = HashMap::new();
566        phases.insert("test".to_string(), phase);
567
568        let ready = get_ready_tasks(&phases, "test", false);
569        assert_eq!(ready.len(), 1);
570        assert_eq!(ready[0].task.id, "1.1");
571    }
572
573    #[test]
574    fn test_get_ready_tasks_priority_sort() {
575        let mut phase = Phase::new("test".to_string());
576
577        let mut low = create_test_task("low", TaskStatus::Pending, vec![]);
578        low.priority = Priority::Low;
579
580        let mut critical = create_test_task("critical", TaskStatus::Pending, vec![]);
581        critical.priority = Priority::Critical;
582
583        let mut high = create_test_task("high", TaskStatus::Pending, vec![]);
584        high.priority = Priority::High;
585
586        phase.tasks.push(low);
587        phase.tasks.push(critical);
588        phase.tasks.push(high);
589
590        let mut phases = HashMap::new();
591        phases.insert("test".to_string(), phase);
592
593        let ready = get_ready_tasks(&phases, "test", false);
594        assert_eq!(ready.len(), 3);
595        assert_eq!(ready[0].task.id, "critical");
596        assert_eq!(ready[1].task.id, "high");
597        assert_eq!(ready[2].task.id, "low");
598    }
599
600    #[test]
601    fn test_count_in_progress() {
602        let mut phase = Phase::new("test".to_string());
603        phase.tasks.push(create_test_task("1", TaskStatus::InProgress, vec![]));
604        phase.tasks.push(create_test_task("2", TaskStatus::InProgress, vec![]));
605        phase.tasks.push(create_test_task("3", TaskStatus::Pending, vec![]));
606        phase.tasks.push(create_test_task("4", TaskStatus::Done, vec![]));
607
608        let mut phases = HashMap::new();
609        phases.insert("test".to_string(), phase);
610
611        assert_eq!(count_in_progress(&phases, "test", false), 2);
612    }
613
614    #[test]
615    fn test_count_remaining() {
616        let mut phase = Phase::new("test".to_string());
617        phase.tasks.push(create_test_task("1", TaskStatus::InProgress, vec![]));
618        phase.tasks.push(create_test_task("2", TaskStatus::Pending, vec![]));
619        phase.tasks.push(create_test_task("3", TaskStatus::Done, vec![]));
620        phase.tasks.push(create_test_task("4", TaskStatus::Failed, vec![]));
621
622        let mut phases = HashMap::new();
623        phases.insert("test".to_string(), phase);
624
625        assert_eq!(count_remaining(&phases, "test", false), 2); // InProgress + Pending
626    }
627
628    #[test]
629    fn test_claim_task_pending() {
630        let mut phase = Phase::new("test".to_string());
631        phase.tasks.push(create_test_task("1", TaskStatus::Pending, vec![]));
632
633        let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
634
635        // Should successfully claim the pending task
636        let claimed = claim_task(&storage, "1", "test").unwrap();
637        assert!(claimed);
638
639        // Verify the task is now in-progress
640        let reloaded = storage.load_group("test").unwrap();
641        assert_eq!(reloaded.get_task("1").unwrap().status, TaskStatus::InProgress);
642    }
643
644    #[test]
645    fn test_claim_task_already_in_progress() {
646        let mut phase = Phase::new("test".to_string());
647        phase.tasks.push(create_test_task("1", TaskStatus::InProgress, vec![]));
648
649        let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
650
651        // Should fail to claim a task that's already in-progress
652        let claimed = claim_task(&storage, "1", "test").unwrap();
653        assert!(!claimed);
654    }
655
656    #[test]
657    fn test_claim_task_nonexistent() {
658        let mut phase = Phase::new("test".to_string());
659        phase.tasks.push(create_test_task("1", TaskStatus::Pending, vec![]));
660
661        let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
662
663        // Should fail to claim a task that doesn't exist
664        let claimed = claim_task(&storage, "nonexistent", "test").unwrap();
665        assert!(!claimed);
666    }
667
668    #[test]
669    fn test_claim_task_already_done() {
670        let mut phase = Phase::new("test".to_string());
671        phase.tasks.push(create_test_task("1", TaskStatus::Done, vec![]));
672
673        let (_temp_dir, storage) = setup_storage_with_phase(&phase, "test");
674
675        // Should fail to claim a task that's already done
676        let claimed = claim_task(&storage, "1", "test").unwrap();
677        assert!(!claimed);
678    }
679}