Skip to main content

scud/commands/swarm/
beads.rs

//! Beads-style continuous execution mode
//!
//! Unlike wave-based execution, which batches tasks and waits for all of them to
//! complete, beads-style execution continuously polls for ready tasks:
//!
//! 1. Query for all tasks whose dependencies are met
//! 2. Claim a task (mark it in-progress)
//! 3. Spawn an agent for it
//! 4. Immediately loop back to step 1 (no waiting for a batch)
//!
//! This enables more fluid execution where downstream tasks can start
//! immediately when their dependencies complete, rather than waiting
//! for artificial wave boundaries.
//!
//! Inspired by the Beads project (<https://github.com/steveyegge/beads>)
//! and Gas Town's GUPP principle: "When an agent finds work on their hook,
//! they execute immediately. No confirmation. No questions. No waiting."
18
19use std::collections::{HashMap, HashSet};
20use std::path::Path;
21use std::thread;
22use std::time::{Duration, Instant};
23
24use anyhow::Result;
25use colored::Colorize;
26
27use crate::commands::spawn::agent;
28use crate::commands::spawn::terminal::{self, Harness};
29use crate::models::phase::Phase;
30use crate::models::task::{Task, TaskStatus};
31use crate::storage::Storage;
32
33use super::events::EventWriter;
34use super::session::{RoundState, SwarmSession};
35
/// Configuration for beads execution.
///
/// Use [`BeadsConfig::default`] for the stock settings (5 concurrent agents,
/// 3 second poll interval) and override individual fields as needed.
#[derive(Debug, Clone)]
pub struct BeadsConfig {
    /// Maximum number of agents allowed to run concurrently.
    pub max_concurrent: usize,
    /// Poll interval used when no tasks are ready but some are in-progress.
    pub poll_interval: Duration,
}

impl Default for BeadsConfig {
    /// Returns the default configuration: 5 concurrent agents, polling every 3 seconds.
    fn default() -> Self {
        const DEFAULT_MAX_CONCURRENT: usize = 5;
        const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(3);

        Self {
            max_concurrent: DEFAULT_MAX_CONCURRENT,
            poll_interval: DEFAULT_POLL_INTERVAL,
        }
    }
}
52
53/// Task info with tag for tracking
54#[derive(Clone, Debug)]
55pub struct ReadyTask {
56    pub task: Task,
57    pub tag: String,
58}
59
/// Summary of a beads execution run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BeadsResult {
    /// Number of tasks counted as completed during the run.
    pub tasks_completed: usize,
    /// Number of tasks counted as failed during the run.
    pub tasks_failed: usize,
    /// Total duration of the run.
    pub total_duration: Duration,
}
66
67/// Get all tasks that are ready to execute (dependencies met, not in-progress)
68///
69/// A task is ready when:
70/// - Status is Pending
71/// - Not expanded (or is subtask of expanded parent)
72/// - All dependencies have status Done
73/// - Not blocked by in-progress tasks (unlike waves, we allow execution while others run)
74pub fn get_ready_tasks(
75    all_phases: &HashMap<String, Phase>,
76    phase_tag: &str,
77    all_tags: bool,
78) -> Vec<ReadyTask> {
79    let mut ready = Vec::new();
80
81    // Collect all tasks as references for dependency checking
82    let all_task_refs: Vec<&Task> = all_phases.values().flat_map(|p| &p.tasks).collect();
83
84    // Determine which phases to check
85    let phase_tags: Vec<&String> = if all_tags {
86        all_phases.keys().collect()
87    } else {
88        all_phases
89            .keys()
90            .filter(|t| t.as_str() == phase_tag)
91            .collect()
92    };
93
94    for tag in phase_tags {
95        if let Some(phase) = all_phases.get(tag) {
96            for task in &phase.tasks {
97                if is_task_ready(task, phase, &all_task_refs) {
98                    ready.push(ReadyTask {
99                        task: task.clone(),
100                        tag: tag.clone(),
101                    });
102                }
103            }
104        }
105    }
106
107    // Sort by priority (Critical > High > Medium > Low), then by ID
108    ready.sort_by(|a, b| {
109        use crate::models::task::Priority;
110        let priority_ord = |p: &Priority| match p {
111            Priority::Critical => 0,
112            Priority::High => 1,
113            Priority::Medium => 2,
114            Priority::Low => 3,
115        };
116        priority_ord(&a.task.priority)
117            .cmp(&priority_ord(&b.task.priority))
118            .then_with(|| a.task.id.cmp(&b.task.id))
119    });
120
121    ready
122}
123
124/// Check if a task is ready to execute
125fn is_task_ready(task: &Task, phase: &Phase, all_tasks: &[&Task]) -> bool {
126    // Must be pending
127    if task.status != TaskStatus::Pending {
128        return false;
129    }
130
131    // Skip expanded tasks (they have subtasks to do instead)
132    if task.is_expanded() {
133        return false;
134    }
135
136    // If subtask, parent must be expanded
137    if let Some(ref parent_id) = task.parent_id {
138        let parent_expanded = phase
139            .get_task(parent_id)
140            .map(|p| p.is_expanded())
141            .unwrap_or(false);
142        if !parent_expanded {
143            return false;
144        }
145    }
146
147    // All dependencies must be Done (not just "not pending")
148    // This uses the effective dependencies which includes inherited parent deps
149    task.has_dependencies_met_refs(all_tasks)
150}
151
152/// Count tasks currently in progress
153pub fn count_in_progress(
154    all_phases: &HashMap<String, Phase>,
155    phase_tag: &str,
156    all_tags: bool,
157) -> usize {
158    let tags: Vec<&String> = if all_tags {
159        all_phases.keys().collect()
160    } else {
161        all_phases
162            .keys()
163            .filter(|t| t.as_str() == phase_tag)
164            .collect()
165    };
166
167    tags.iter()
168        .filter_map(|tag| all_phases.get(*tag))
169        .flat_map(|phase| &phase.tasks)
170        .filter(|t| t.status == TaskStatus::InProgress)
171        .count()
172}
173
174/// Count remaining tasks (pending or in-progress)
175pub fn count_remaining(
176    all_phases: &HashMap<String, Phase>,
177    phase_tag: &str,
178    all_tags: bool,
179) -> usize {
180    let tags: Vec<&String> = if all_tags {
181        all_phases.keys().collect()
182    } else {
183        all_phases
184            .keys()
185            .filter(|t| t.as_str() == phase_tag)
186            .collect()
187    };
188
189    tags.iter()
190        .filter_map(|tag| all_phases.get(*tag))
191        .flat_map(|phase| &phase.tasks)
192        .filter(|t| {
193            t.status == TaskStatus::InProgress
194                || (t.status == TaskStatus::Pending && !t.is_expanded())
195        })
196        .count()
197}
198
199/// Claim a task by marking it as in-progress
200pub fn claim_task(storage: &Storage, task_id: &str, tag: &str) -> Result<bool> {
201    let mut phase = storage.load_group(tag)?;
202
203    if let Some(task) = phase.get_task_mut(task_id) {
204        // Only claim if still pending (prevent race conditions)
205        if task.status == TaskStatus::Pending {
206            task.set_status(TaskStatus::InProgress);
207            storage.update_group(tag, &phase)?;
208            return Ok(true);
209        }
210    }
211
212    Ok(false)
213}
214
215/// Spawn an agent for a task using tmux
216pub fn spawn_agent_tmux(
217    ready_task: &ReadyTask,
218    working_dir: &Path,
219    session_name: &str,
220    default_harness: Harness,
221) -> Result<String> {
222    // Resolve agent config (harness, model, prompt) from task's agent_type
223    let config = agent::resolve_agent_config(
224        &ready_task.task,
225        &ready_task.tag,
226        default_harness,
227        None,
228        working_dir,
229    );
230
231    // Spawn in tmux
232    let window_index = terminal::spawn_terminal_with_harness_and_model(
233        &ready_task.task.id,
234        &config.prompt,
235        working_dir,
236        session_name,
237        config.harness,
238        config.model.as_deref(),
239    )?;
240
241    Ok(format!("{}:{}", session_name, window_index))
242}
243
244/// Main beads execution loop
245///
246/// Continuously polls for ready tasks and spawns agents immediately.
247/// Does not wait for batches - new tasks can start as soon as their
248/// dependencies complete.
249pub fn run_beads_loop(
250    storage: &Storage,
251    phase_tag: &str,
252    all_tags: bool,
253    working_dir: &Path,
254    session_name: &str,
255    default_harness: Harness,
256    config: &BeadsConfig,
257    session: &mut SwarmSession,
258) -> Result<BeadsResult> {
259    let start_time = Instant::now();
260    let mut tasks_completed = 0;
261    let mut tasks_failed = 0;
262    let mut spawned_tasks: HashSet<String> = HashSet::new();
263    let mut spawned_times: HashMap<String, Instant> = HashMap::new();
264    let mut round_state = RoundState::new(0); // Single continuous "round"
265
266    // Initialize event writer for retrospective logging
267    let event_writer = EventWriter::new(working_dir, session_name)
268        .map_err(|e| anyhow::anyhow!("Failed to initialize event writer: {}", e))?;
269
270    println!();
271    println!("{}", "Beads Execution Mode".cyan().bold());
272    println!("{}", "═".repeat(50));
273    println!("  {} Continuous ready-task polling", "Mode:".dimmed());
274    if let Some(session_file) = event_writer.session_file() {
275        println!(
276            "  {} {}",
277            "Event log:".dimmed(),
278            session_file.display().to_string().dimmed()
279        );
280    }
281    println!(
282        "  {} {}",
283        "Max concurrent:".dimmed(),
284        config.max_concurrent.to_string().cyan()
285    );
286    println!(
287        "  {} {}ms",
288        "Poll interval:".dimmed(),
289        config.poll_interval.as_millis().to_string().cyan()
290    );
291    println!();
292
293    loop {
294        // Reload task state to see completed tasks
295        let all_phases = storage.load_tasks()?;
296
297        // Count current state
298        let in_progress = count_in_progress(&all_phases, phase_tag, all_tags);
299        let remaining = count_remaining(&all_phases, phase_tag, all_tags);
300
301        // Check for completion
302        if remaining == 0 {
303            println!();
304            println!("{}", "All tasks complete!".green().bold());
305            break;
306        }
307
308        // Get ready tasks
309        let ready_tasks = get_ready_tasks(&all_phases, phase_tag, all_tags);
310
311        // Filter out tasks we've already spawned (in case status update is delayed)
312        let ready_tasks: Vec<_> = ready_tasks
313            .into_iter()
314            .filter(|rt| !spawned_tasks.contains(&rt.task.id))
315            .collect();
316
317        if ready_tasks.is_empty() {
318            if in_progress > 0 {
319                // Some tasks running but none ready - wait for completion
320                print!(
321                    "\r  {} {} task(s) in progress, waiting...   ",
322                    "⏳".dimmed(),
323                    in_progress.to_string().cyan()
324                );
325                std::io::Write::flush(&mut std::io::stdout())?;
326                thread::sleep(config.poll_interval);
327                continue;
328            } else {
329                // No tasks ready and none in progress - might be blocked
330                println!();
331                println!("{}", "No ready tasks and none in progress.".yellow());
332                println!(
333                    "  {} {} remaining task(s) may be blocked.",
334                    "!".yellow(),
335                    remaining
336                );
337                println!("  Check for circular dependencies or missing dependencies.");
338                break;
339            }
340        }
341
342        // Clear waiting line if we were waiting
343        print!("\r{}\r", " ".repeat(60));
344
345        // Calculate how many we can spawn
346        let available_slots = config.max_concurrent.saturating_sub(in_progress);
347        let to_spawn = ready_tasks.into_iter().take(available_slots);
348
349        // Spawn agents for ready tasks
350        for ready_task in to_spawn {
351            // Try to claim the task
352            if !claim_task(storage, &ready_task.task.id, &ready_task.tag)? {
353                // Task was claimed by another process or status changed
354                continue;
355            }
356
357            // Mark as spawned locally
358            spawned_tasks.insert(ready_task.task.id.clone());
359            spawned_times.insert(ready_task.task.id.clone(), Instant::now());
360
361            // Log spawn event
362            if let Err(e) = event_writer.log_spawned(&ready_task.task.id) {
363                eprintln!("Warning: Failed to log spawn event: {}", e);
364            }
365
366            // Spawn agent
367            match spawn_agent_tmux(&ready_task, working_dir, session_name, default_harness) {
368                Ok(window_info) => {
369                    println!(
370                        "  {} Spawned: {} | {} [{}]",
371                        "✓".green(),
372                        ready_task.task.id.cyan(),
373                        ready_task.task.title.dimmed(),
374                        window_info.dimmed()
375                    );
376                    round_state.task_ids.push(ready_task.task.id.clone());
377                    round_state.tags.push(ready_task.tag.clone());
378                }
379                Err(e) => {
380                    println!(
381                        "  {} Failed: {} - {}",
382                        "✗".red(),
383                        ready_task.task.id.red(),
384                        e
385                    );
386                    round_state.failures.push(ready_task.task.id.clone());
387                    tasks_failed += 1;
388
389                    // Log failure event
390                    if let Err(log_err) = event_writer.log_completed(&ready_task.task.id, false, 0)
391                    {
392                        eprintln!("Warning: Failed to log completion event: {}", log_err);
393                    }
394
395                    // Reset task status on spawn failure
396                    if let Ok(mut phase) = storage.load_group(&ready_task.tag) {
397                        if let Some(task) = phase.get_task_mut(&ready_task.task.id) {
398                            task.set_status(TaskStatus::Failed);
399                            let _ = storage.update_group(&ready_task.tag, &phase);
400                        }
401                    }
402                }
403            }
404        }
405
406        // Detect newly completed tasks and log them
407        let mut newly_completed: Vec<(String, bool)> = Vec::new();
408        for task_id in &spawned_tasks {
409            // Skip if we've already counted this task
410            if !spawned_times.contains_key(task_id) {
411                continue;
412            }
413            for phase in all_phases.values() {
414                if let Some(task) = phase.get_task(task_id) {
415                    match task.status {
416                        TaskStatus::Done => {
417                            newly_completed.push((task_id.clone(), true));
418                        }
419                        TaskStatus::Failed => {
420                            newly_completed.push((task_id.clone(), false));
421                        }
422                        _ => {}
423                    }
424                    break;
425                }
426            }
427        }
428
429        // Log completion events and track what unblocked what
430        for (task_id, success) in newly_completed {
431            if let Some(spawn_time) = spawned_times.remove(&task_id) {
432                // Clean up spawned_tasks to prevent unbounded growth
433                spawned_tasks.remove(&task_id);
434
435                let duration_ms = spawn_time.elapsed().as_millis() as u64;
436                if let Err(e) = event_writer.log_completed(&task_id, success, duration_ms) {
437                    eprintln!("Warning: Failed to log completion: {}", e);
438                }
439                if success {
440                    tasks_completed += 1;
441                    println!(
442                        "  {} Completed: {} ({}ms)",
443                        "✓".green(),
444                        task_id.cyan(),
445                        duration_ms
446                    );
447
448                    // Check what tasks were unblocked by this completion
449                    // by looking at all pending tasks that depend on this one
450                    for phase in all_phases.values() {
451                        for potential_unblocked in &phase.tasks {
452                            if potential_unblocked.status == TaskStatus::Pending
453                                && potential_unblocked.dependencies.contains(&task_id)
454                            {
455                                if let Err(e) =
456                                    event_writer.log_unblocked(&potential_unblocked.id, &task_id)
457                                {
458                                    eprintln!("Warning: Failed to log unblock: {}", e);
459                                }
460                            }
461                        }
462                    }
463                } else {
464                    tasks_failed += 1;
465                }
466            }
467        }
468
469        // Short sleep to avoid tight polling when at max capacity
470        if in_progress >= config.max_concurrent {
471            thread::sleep(config.poll_interval);
472        } else {
473            // Brief yield to allow other processes
474            thread::sleep(Duration::from_millis(100));
475        }
476    }
477
478    // Save session state
479    let mut wave_state = super::session::WaveState::new(1);
480    wave_state.rounds.push(round_state);
481    session.waves.push(wave_state);
482
483    Ok(BeadsResult {
484        tasks_completed,
485        tasks_failed,
486        total_duration: start_time.elapsed(),
487    })
488}
489
490// Note: Beads extensions mode (async subprocess) is planned but not yet implemented.
491// For now, beads mode uses tmux-based execution via run_beads_loop().
492
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::task::Priority;
    use tempfile::TempDir;

    /// Tag used for the single phase every test works with.
    const TAG: &str = "test";

    /// Build a task with the given id, status and dependency ids.
    fn make_task(id: &str, status: TaskStatus, deps: &[&str]) -> Task {
        let mut task = Task::new(
            id.to_string(),
            format!("Task {}", id),
            "Description".to_string(),
        );
        task.status = status;
        task.dependencies = deps.iter().copied().map(String::from).collect();
        task
    }

    /// Build the test phase holding `tasks` in the given order.
    fn phase_of(tasks: Vec<Task>) -> Phase {
        let mut phase = Phase::new(TAG.to_string());
        phase.tasks.extend(tasks);
        phase
    }

    /// Build a phase map containing only the test phase.
    fn phases_of(tasks: Vec<Task>) -> HashMap<String, Phase> {
        let mut phases = HashMap::new();
        phases.insert(TAG.to_string(), phase_of(tasks));
        phases
    }

    /// Create temp-dir backed storage pre-populated with the test phase.
    /// The `TempDir` is returned so it outlives the storage handle.
    fn storage_of(tasks: Vec<Task>) -> (TempDir, Storage) {
        let temp_dir = TempDir::new().unwrap();
        let storage = Storage::new(Some(temp_dir.path().to_path_buf()));
        storage.update_group(TAG, &phase_of(tasks)).unwrap();
        (temp_dir, storage)
    }

    #[test]
    fn test_get_ready_tasks_no_deps() {
        let phases = phases_of(vec![
            make_task("1", TaskStatus::Pending, &[]),
            make_task("2", TaskStatus::Pending, &[]),
        ]);

        let ready = get_ready_tasks(&phases, TAG, false);
        assert_eq!(ready.len(), 2);
    }

    #[test]
    fn test_get_ready_tasks_with_deps_met() {
        let phases = phases_of(vec![
            make_task("1", TaskStatus::Done, &[]),
            make_task("2", TaskStatus::Pending, &["1"]),
        ]);

        let ready = get_ready_tasks(&phases, TAG, false);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].task.id, "2");
    }

    #[test]
    fn test_get_ready_tasks_with_deps_not_met() {
        let phases = phases_of(vec![
            make_task("1", TaskStatus::InProgress, &[]),
            make_task("2", TaskStatus::Pending, &["1"]),
        ]);

        let ready = get_ready_tasks(&phases, TAG, false);
        assert_eq!(ready.len(), 0);
    }

    #[test]
    fn test_get_ready_tasks_skips_expanded() {
        let mut parent = make_task("1", TaskStatus::Expanded, &[]);
        parent.subtasks = vec!["1.1".to_string()];

        let mut child = make_task("1.1", TaskStatus::Pending, &[]);
        child.parent_id = Some("1".to_string());

        let phases = phases_of(vec![parent, child]);

        // Only the subtask is runnable; the expanded parent is skipped.
        let ready = get_ready_tasks(&phases, TAG, false);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].task.id, "1.1");
    }

    #[test]
    fn test_get_ready_tasks_priority_sort() {
        let mut low = make_task("low", TaskStatus::Pending, &[]);
        low.priority = Priority::Low;

        let mut critical = make_task("critical", TaskStatus::Pending, &[]);
        critical.priority = Priority::Critical;

        let mut high = make_task("high", TaskStatus::Pending, &[]);
        high.priority = Priority::High;

        // Inserted out of order on purpose.
        let phases = phases_of(vec![low, critical, high]);

        let ready = get_ready_tasks(&phases, TAG, false);
        assert_eq!(ready.len(), 3);
        assert_eq!(ready[0].task.id, "critical");
        assert_eq!(ready[1].task.id, "high");
        assert_eq!(ready[2].task.id, "low");
    }

    #[test]
    fn test_count_in_progress() {
        let phases = phases_of(vec![
            make_task("1", TaskStatus::InProgress, &[]),
            make_task("2", TaskStatus::InProgress, &[]),
            make_task("3", TaskStatus::Pending, &[]),
            make_task("4", TaskStatus::Done, &[]),
        ]);

        assert_eq!(count_in_progress(&phases, TAG, false), 2);
    }

    #[test]
    fn test_count_remaining() {
        let phases = phases_of(vec![
            make_task("1", TaskStatus::InProgress, &[]),
            make_task("2", TaskStatus::Pending, &[]),
            make_task("3", TaskStatus::Done, &[]),
            make_task("4", TaskStatus::Failed, &[]),
        ]);

        // InProgress + Pending
        assert_eq!(count_remaining(&phases, TAG, false), 2);
    }

    #[test]
    fn test_claim_task_pending() {
        let (_temp_dir, storage) = storage_of(vec![make_task("1", TaskStatus::Pending, &[])]);

        // A pending task can be claimed...
        let claimed = claim_task(&storage, "1", TAG).unwrap();
        assert!(claimed);

        // ...and is persisted as in-progress afterwards.
        let reloaded = storage.load_group(TAG).unwrap();
        assert_eq!(
            reloaded.get_task("1").unwrap().status,
            TaskStatus::InProgress
        );
    }

    #[test]
    fn test_claim_task_already_in_progress() {
        let (_temp_dir, storage) = storage_of(vec![make_task("1", TaskStatus::InProgress, &[])]);

        // A task that is already in-progress cannot be claimed again.
        let claimed = claim_task(&storage, "1", TAG).unwrap();
        assert!(!claimed);
    }

    #[test]
    fn test_claim_task_nonexistent() {
        let (_temp_dir, storage) = storage_of(vec![make_task("1", TaskStatus::Pending, &[])]);

        // An unknown task id is reported as "not claimed", not as an error.
        let claimed = claim_task(&storage, "nonexistent", TAG).unwrap();
        assert!(!claimed);
    }

    #[test]
    fn test_claim_task_already_done() {
        let (_temp_dir, storage) = storage_of(vec![make_task("1", TaskStatus::Done, &[])]);

        // A finished task cannot be claimed.
        let claimed = claim_task(&storage, "1", TAG).unwrap();
        assert!(!claimed);
    }
}
721}