Skip to main content

scud/commands/swarm/
beads.rs

//! Beads-style continuous execution mode
//!
//! Unlike wave-based execution, which batches tasks and waits for the whole
//! batch to complete, beads-style execution continuously polls for ready tasks:
//!
//! 1. Query for all tasks whose dependencies are met
//! 2. Claim a task (mark it in-progress)
//! 3. Spawn an agent for it
//! 4. Immediately loop back to step 1 (no waiting for a batch)
//!
//! This enables more fluid execution where downstream tasks can start
//! immediately when their dependencies complete, rather than waiting
//! for artificial wave boundaries.
//!
//! Inspired by the Beads project (<https://github.com/steveyegge/beads>)
//! and Gas Town's GUPP principle: "When an agent finds work on their hook,
//! they execute immediately. No confirmation. No questions. No waiting."
18
19use std::collections::{HashMap, HashSet};
20use std::path::Path;
21use std::thread;
22use std::time::{Duration, Instant};
23
24use anyhow::Result;
25use colored::Colorize;
26
27use crate::commands::spawn::agent;
28use crate::commands::spawn::terminal::{self, Harness};
29use crate::models::phase::Phase;
30use crate::models::task::{Task, TaskStatus};
31use crate::storage::Storage;
32
33use super::events::EventWriter;
34use super::session::{RoundState, SwarmSession};
35
/// Configuration for beads execution.
///
/// Derives `Debug` and `Clone` so the configuration can be logged and
/// duplicated by callers, in line with `ReadyTask` in this module.
#[derive(Debug, Clone)]
pub struct BeadsConfig {
    /// Maximum concurrent agents
    pub max_concurrent: usize,
    /// Poll interval when no tasks are ready but some are in-progress
    pub poll_interval: Duration,
}

impl Default for BeadsConfig {
    /// Returns the default configuration: at most 5 concurrent agents and a
    /// 3-second poll interval.
    fn default() -> Self {
        Self {
            max_concurrent: 5,
            poll_interval: Duration::from_secs(3),
        }
    }
}
52
53/// Task info with tag for tracking
54#[derive(Clone, Debug)]
55pub struct ReadyTask {
56    pub task: Task,
57    pub tag: String,
58}
59
/// Result of a beads execution run.
///
/// Derives `Debug` and `Clone` so callers can log and copy the summary.
#[derive(Debug, Clone)]
pub struct BeadsResult {
    /// Number of spawned tasks observed to finish with status `Done`
    pub tasks_completed: usize,
    /// Number of tasks that failed (spawn failures and tasks observed as `Failed`)
    pub tasks_failed: usize,
    /// Wall-clock time spent inside the execution loop
    pub total_duration: Duration,
}
66
67/// Get all tasks that are ready to execute (dependencies met, not in-progress)
68///
69/// A task is ready when:
70/// - Status is Pending
71/// - Not expanded (or is subtask of expanded parent)
72/// - All dependencies have status Done
73/// - Not blocked by in-progress tasks (unlike waves, we allow execution while others run)
74pub fn get_ready_tasks(
75    all_phases: &HashMap<String, Phase>,
76    phase_tag: &str,
77    all_tags: bool,
78) -> Vec<ReadyTask> {
79    let mut ready = Vec::new();
80
81    // Collect all tasks as references for dependency checking
82    let all_task_refs: Vec<&Task> = all_phases.values().flat_map(|p| &p.tasks).collect();
83
84    // Determine which phases to check
85    let phase_tags: Vec<&String> = if all_tags {
86        all_phases.keys().collect()
87    } else {
88        all_phases
89            .keys()
90            .filter(|t| t.as_str() == phase_tag)
91            .collect()
92    };
93
94    for tag in phase_tags {
95        if let Some(phase) = all_phases.get(tag) {
96            for task in &phase.tasks {
97                if is_task_ready(task, phase, &all_task_refs) {
98                    ready.push(ReadyTask {
99                        task: task.clone(),
100                        tag: tag.clone(),
101                    });
102                }
103            }
104        }
105    }
106
107    // Sort by priority (Critical > High > Medium > Low), then by ID
108    ready.sort_by(|a, b| {
109        use crate::models::task::Priority;
110        let priority_ord = |p: &Priority| match p {
111            Priority::Critical => 0,
112            Priority::High => 1,
113            Priority::Medium => 2,
114            Priority::Low => 3,
115        };
116        priority_ord(&a.task.priority)
117            .cmp(&priority_ord(&b.task.priority))
118            .then_with(|| a.task.id.cmp(&b.task.id))
119    });
120
121    ready
122}
123
124/// Check if a task is ready to execute
125fn is_task_ready(task: &Task, phase: &Phase, all_tasks: &[&Task]) -> bool {
126    // Must be pending
127    if task.status != TaskStatus::Pending {
128        return false;
129    }
130
131    // Skip expanded tasks (they have subtasks to do instead)
132    if task.is_expanded() {
133        return false;
134    }
135
136    // If subtask, parent must be expanded
137    if let Some(ref parent_id) = task.parent_id {
138        let parent_expanded = phase
139            .get_task(parent_id)
140            .map(|p| p.is_expanded())
141            .unwrap_or(false);
142        if !parent_expanded {
143            return false;
144        }
145    }
146
147    // All dependencies must be Done (not just "not pending")
148    // This uses the effective dependencies which includes inherited parent deps
149    task.has_dependencies_met_refs(all_tasks)
150}
151
152/// Count tasks currently in progress
153pub fn count_in_progress(
154    all_phases: &HashMap<String, Phase>,
155    phase_tag: &str,
156    all_tags: bool,
157) -> usize {
158    let tags: Vec<&String> = if all_tags {
159        all_phases.keys().collect()
160    } else {
161        all_phases
162            .keys()
163            .filter(|t| t.as_str() == phase_tag)
164            .collect()
165    };
166
167    tags.iter()
168        .filter_map(|tag| all_phases.get(*tag))
169        .flat_map(|phase| &phase.tasks)
170        .filter(|t| t.status == TaskStatus::InProgress)
171        .count()
172}
173
174/// Count remaining tasks (pending or in-progress)
175pub fn count_remaining(
176    all_phases: &HashMap<String, Phase>,
177    phase_tag: &str,
178    all_tags: bool,
179) -> usize {
180    let tags: Vec<&String> = if all_tags {
181        all_phases.keys().collect()
182    } else {
183        all_phases
184            .keys()
185            .filter(|t| t.as_str() == phase_tag)
186            .collect()
187    };
188
189    tags.iter()
190        .filter_map(|tag| all_phases.get(*tag))
191        .flat_map(|phase| &phase.tasks)
192        .filter(|t| {
193            t.status == TaskStatus::InProgress
194                || (t.status == TaskStatus::Pending && !t.is_expanded())
195        })
196        .count()
197}
198
199/// Claim a task by marking it as in-progress
200pub fn claim_task(storage: &Storage, task_id: &str, tag: &str) -> Result<bool> {
201    let mut phase = storage.load_group(tag)?;
202
203    if let Some(task) = phase.get_task_mut(task_id) {
204        // Only claim if still pending (prevent race conditions)
205        if task.status == TaskStatus::Pending {
206            task.set_status(TaskStatus::InProgress);
207            storage.update_group(tag, &phase)?;
208            return Ok(true);
209        }
210    }
211
212    Ok(false)
213}
214
215/// Spawn an agent for a task using tmux
216pub fn spawn_agent_tmux(
217    ready_task: &ReadyTask,
218    working_dir: &Path,
219    session_name: &str,
220    default_harness: Harness,
221) -> Result<String> {
222    // Resolve agent config (harness, model, prompt) from task's agent_type
223    let config = agent::resolve_agent_config(
224        &ready_task.task,
225        &ready_task.tag,
226        default_harness,
227        None,
228        working_dir,
229    );
230
231    // Spawn in tmux
232    let window_index = terminal::spawn_terminal_with_harness_and_model(
233        &ready_task.task.id,
234        &config.prompt,
235        working_dir,
236        session_name,
237        config.harness,
238        config.model.as_deref(),
239    )?;
240
241    Ok(format!("{}:{}", session_name, window_index))
242}
243
244/// Main beads execution loop
245///
246/// Continuously polls for ready tasks and spawns agents immediately.
247/// Does not wait for batches - new tasks can start as soon as their
248/// dependencies complete.
249#[allow(clippy::too_many_arguments)]
250pub fn run_beads_loop(
251    storage: &Storage,
252    phase_tag: &str,
253    all_tags: bool,
254    working_dir: &Path,
255    session_name: &str,
256    default_harness: Harness,
257    config: &BeadsConfig,
258    session: &mut SwarmSession,
259) -> Result<BeadsResult> {
260    let start_time = Instant::now();
261    let mut tasks_completed = 0;
262    let mut tasks_failed = 0;
263    let mut spawned_tasks: HashSet<String> = HashSet::new();
264    let mut spawned_times: HashMap<String, Instant> = HashMap::new();
265    let mut round_state = RoundState::new(0); // Single continuous "round"
266
267    // Initialize event writer for retrospective logging
268    let event_writer = EventWriter::new(working_dir, session_name)
269        .map_err(|e| anyhow::anyhow!("Failed to initialize event writer: {}", e))?;
270
271    println!();
272    println!("{}", "Beads Execution Mode".cyan().bold());
273    println!("{}", "═".repeat(50));
274    println!("  {} Continuous ready-task polling", "Mode:".dimmed());
275    if let Some(session_file) = event_writer.session_file() {
276        println!(
277            "  {} {}",
278            "Event log:".dimmed(),
279            session_file.display().to_string().dimmed()
280        );
281    }
282    println!(
283        "  {} {}",
284        "Max concurrent:".dimmed(),
285        config.max_concurrent.to_string().cyan()
286    );
287    println!(
288        "  {} {}ms",
289        "Poll interval:".dimmed(),
290        config.poll_interval.as_millis().to_string().cyan()
291    );
292    println!();
293
294    loop {
295        // Reload task state to see completed tasks
296        let all_phases = storage.load_tasks()?;
297
298        // Count current state
299        let in_progress = count_in_progress(&all_phases, phase_tag, all_tags);
300        let remaining = count_remaining(&all_phases, phase_tag, all_tags);
301
302        // Check for completion
303        if remaining == 0 {
304            println!();
305            println!("{}", "All tasks complete!".green().bold());
306            break;
307        }
308
309        // Get ready tasks
310        let ready_tasks = get_ready_tasks(&all_phases, phase_tag, all_tags);
311
312        // Filter out tasks we've already spawned (in case status update is delayed)
313        let ready_tasks: Vec<_> = ready_tasks
314            .into_iter()
315            .filter(|rt| !spawned_tasks.contains(&rt.task.id))
316            .collect();
317
318        if ready_tasks.is_empty() {
319            if in_progress > 0 {
320                // Some tasks running but none ready - wait for completion
321                print!(
322                    "\r  {} {} task(s) in progress, waiting...   ",
323                    "⏳".dimmed(),
324                    in_progress.to_string().cyan()
325                );
326                std::io::Write::flush(&mut std::io::stdout())?;
327                thread::sleep(config.poll_interval);
328                continue;
329            } else {
330                // No tasks ready and none in progress - might be blocked
331                println!();
332                println!("{}", "No ready tasks and none in progress.".yellow());
333                println!(
334                    "  {} {} remaining task(s) may be blocked.",
335                    "!".yellow(),
336                    remaining
337                );
338                println!("  Check for circular dependencies or missing dependencies.");
339                break;
340            }
341        }
342
343        // Clear waiting line if we were waiting
344        print!("\r{}\r", " ".repeat(60));
345
346        // Calculate how many we can spawn
347        let available_slots = config.max_concurrent.saturating_sub(in_progress);
348        let to_spawn = ready_tasks.into_iter().take(available_slots);
349
350        // Spawn agents for ready tasks
351        for ready_task in to_spawn {
352            // Try to claim the task
353            if !claim_task(storage, &ready_task.task.id, &ready_task.tag)? {
354                // Task was claimed by another process or status changed
355                continue;
356            }
357
358            // Mark as spawned locally
359            spawned_tasks.insert(ready_task.task.id.clone());
360            spawned_times.insert(ready_task.task.id.clone(), Instant::now());
361
362            // Log spawn event
363            if let Err(e) = event_writer.log_spawned(&ready_task.task.id) {
364                eprintln!("Warning: Failed to log spawn event: {}", e);
365            }
366
367            // Spawn agent
368            match spawn_agent_tmux(&ready_task, working_dir, session_name, default_harness) {
369                Ok(window_info) => {
370                    println!(
371                        "  {} Spawned: {} | {} [{}]",
372                        "✓".green(),
373                        ready_task.task.id.cyan(),
374                        ready_task.task.title.dimmed(),
375                        window_info.dimmed()
376                    );
377                    round_state.task_ids.push(ready_task.task.id.clone());
378                    round_state.tags.push(ready_task.tag.clone());
379                }
380                Err(e) => {
381                    println!(
382                        "  {} Failed: {} - {}",
383                        "✗".red(),
384                        ready_task.task.id.red(),
385                        e
386                    );
387                    round_state.failures.push(ready_task.task.id.clone());
388                    tasks_failed += 1;
389
390                    // Log failure event
391                    if let Err(log_err) = event_writer.log_completed(&ready_task.task.id, false, 0)
392                    {
393                        eprintln!("Warning: Failed to log completion event: {}", log_err);
394                    }
395
396                    // Reset task status on spawn failure
397                    if let Ok(mut phase) = storage.load_group(&ready_task.tag) {
398                        if let Some(task) = phase.get_task_mut(&ready_task.task.id) {
399                            task.set_status(TaskStatus::Failed);
400                            let _ = storage.update_group(&ready_task.tag, &phase);
401                        }
402                    }
403                }
404            }
405        }
406
407        // Detect newly completed tasks and log them
408        let mut newly_completed: Vec<(String, bool)> = Vec::new();
409        for task_id in &spawned_tasks {
410            // Skip if we've already counted this task
411            if !spawned_times.contains_key(task_id) {
412                continue;
413            }
414            for phase in all_phases.values() {
415                if let Some(task) = phase.get_task(task_id) {
416                    match task.status {
417                        TaskStatus::Done => {
418                            newly_completed.push((task_id.clone(), true));
419                        }
420                        TaskStatus::Failed => {
421                            newly_completed.push((task_id.clone(), false));
422                        }
423                        _ => {}
424                    }
425                    break;
426                }
427            }
428        }
429
430        // Log completion events and track what unblocked what
431        for (task_id, success) in newly_completed {
432            if let Some(spawn_time) = spawned_times.remove(&task_id) {
433                // Clean up spawned_tasks to prevent unbounded growth
434                spawned_tasks.remove(&task_id);
435
436                let duration_ms = spawn_time.elapsed().as_millis() as u64;
437                if let Err(e) = event_writer.log_completed(&task_id, success, duration_ms) {
438                    eprintln!("Warning: Failed to log completion: {}", e);
439                }
440                if success {
441                    tasks_completed += 1;
442                    println!(
443                        "  {} Completed: {} ({}ms)",
444                        "✓".green(),
445                        task_id.cyan(),
446                        duration_ms
447                    );
448
449                    // Check what tasks were unblocked by this completion
450                    // by looking at all pending tasks that depend on this one
451                    for phase in all_phases.values() {
452                        for potential_unblocked in &phase.tasks {
453                            if potential_unblocked.status == TaskStatus::Pending
454                                && potential_unblocked.dependencies.contains(&task_id)
455                            {
456                                if let Err(e) =
457                                    event_writer.log_unblocked(&potential_unblocked.id, &task_id)
458                                {
459                                    eprintln!("Warning: Failed to log unblock: {}", e);
460                                }
461                            }
462                        }
463                    }
464                } else {
465                    tasks_failed += 1;
466                }
467            }
468        }
469
470        // Short sleep to avoid tight polling when at max capacity
471        if in_progress >= config.max_concurrent {
472            thread::sleep(config.poll_interval);
473        } else {
474            // Brief yield to allow other processes
475            thread::sleep(Duration::from_millis(100));
476        }
477    }
478
479    // Save session state
480    let mut wave_state = super::session::WaveState::new(1);
481    wave_state.rounds.push(round_state);
482    session.waves.push(wave_state);
483
484    Ok(BeadsResult {
485        tasks_completed,
486        tasks_failed,
487        total_duration: start_time.elapsed(),
488    })
489}
490
// Note: Beads extensions mode (async subprocess) is planned but not yet implemented.
// For now, beads mode uses tmux-based execution via run_beads_loop().
493
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::task::Priority;
    use tempfile::TempDir;

    /// Tag of the single phase used by every test.
    const TAG: &str = "test";

    /// Builds a task with the given id, status and dependency ids.
    fn create_test_task(id: &str, status: TaskStatus, deps: Vec<&str>) -> Task {
        let mut task = Task::new(
            id.to_string(),
            format!("Task {}", id),
            "Description".to_string(),
        );
        task.status = status;
        task.dependencies = deps.into_iter().map(String::from).collect();
        task
    }

    /// Builds the test phase holding `tasks` in the given order.
    fn phase_of(tasks: Vec<Task>) -> Phase {
        let mut phase = Phase::new(TAG.to_string());
        for task in tasks {
            phase.tasks.push(task);
        }
        phase
    }

    /// Builds a phase map containing only the test phase with `tasks`.
    fn phases_of(tasks: Vec<Task>) -> HashMap<String, Phase> {
        let mut phases = HashMap::new();
        phases.insert(TAG.to_string(), phase_of(tasks));
        phases
    }

    /// Creates storage in a fresh temp dir and stores `phase` under `tag`.
    /// The `TempDir` is returned so the directory outlives the test body.
    fn setup_storage_with_phase(phase: &Phase, tag: &str) -> (TempDir, Storage) {
        let temp_dir = TempDir::new().unwrap();
        let storage = Storage::new(Some(temp_dir.path().to_path_buf()));
        storage.update_group(tag, phase).unwrap();
        (temp_dir, storage)
    }

    /// Creates storage whose test phase contains a single task "1" with `status`.
    fn storage_with_task_one(status: TaskStatus) -> (TempDir, Storage) {
        let phase = phase_of(vec![create_test_task("1", status, vec![])]);
        setup_storage_with_phase(&phase, TAG)
    }

    #[test]
    fn test_get_ready_tasks_no_deps() {
        let phases = phases_of(vec![
            create_test_task("1", TaskStatus::Pending, vec![]),
            create_test_task("2", TaskStatus::Pending, vec![]),
        ]);

        let ready = get_ready_tasks(&phases, TAG, false);
        assert_eq!(ready.len(), 2);
    }

    #[test]
    fn test_get_ready_tasks_with_deps_met() {
        let phases = phases_of(vec![
            create_test_task("1", TaskStatus::Done, vec![]),
            create_test_task("2", TaskStatus::Pending, vec!["1"]),
        ]);

        let ready = get_ready_tasks(&phases, TAG, false);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].task.id, "2");
    }

    #[test]
    fn test_get_ready_tasks_with_deps_not_met() {
        let phases = phases_of(vec![
            create_test_task("1", TaskStatus::InProgress, vec![]),
            create_test_task("2", TaskStatus::Pending, vec!["1"]),
        ]);

        let ready = get_ready_tasks(&phases, TAG, false);
        assert_eq!(ready.len(), 0);
    }

    #[test]
    fn test_get_ready_tasks_skips_expanded() {
        let mut expanded_task = create_test_task("1", TaskStatus::Expanded, vec![]);
        expanded_task.subtasks = vec!["1.1".to_string()];

        let mut subtask = create_test_task("1.1", TaskStatus::Pending, vec![]);
        subtask.parent_id = Some("1".to_string());

        let phases = phases_of(vec![expanded_task, subtask]);

        // Only the subtask is runnable; the expanded parent is skipped.
        let ready = get_ready_tasks(&phases, TAG, false);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].task.id, "1.1");
    }

    #[test]
    fn test_get_ready_tasks_priority_sort() {
        let mut low = create_test_task("low", TaskStatus::Pending, vec![]);
        low.priority = Priority::Low;

        let mut critical = create_test_task("critical", TaskStatus::Pending, vec![]);
        critical.priority = Priority::Critical;

        let mut high = create_test_task("high", TaskStatus::Pending, vec![]);
        high.priority = Priority::High;

        // Inserted out of priority order on purpose.
        let phases = phases_of(vec![low, critical, high]);

        let ready = get_ready_tasks(&phases, TAG, false);
        assert_eq!(ready.len(), 3);
        assert_eq!(ready[0].task.id, "critical");
        assert_eq!(ready[1].task.id, "high");
        assert_eq!(ready[2].task.id, "low");
    }

    #[test]
    fn test_count_in_progress() {
        let phases = phases_of(vec![
            create_test_task("1", TaskStatus::InProgress, vec![]),
            create_test_task("2", TaskStatus::InProgress, vec![]),
            create_test_task("3", TaskStatus::Pending, vec![]),
            create_test_task("4", TaskStatus::Done, vec![]),
        ]);

        assert_eq!(count_in_progress(&phases, TAG, false), 2);
    }

    #[test]
    fn test_count_remaining() {
        let phases = phases_of(vec![
            create_test_task("1", TaskStatus::InProgress, vec![]),
            create_test_task("2", TaskStatus::Pending, vec![]),
            create_test_task("3", TaskStatus::Done, vec![]),
            create_test_task("4", TaskStatus::Failed, vec![]),
        ]);

        assert_eq!(count_remaining(&phases, TAG, false), 2); // InProgress + Pending
    }

    #[test]
    fn test_claim_task_pending() {
        let (_temp_dir, storage) = storage_with_task_one(TaskStatus::Pending);

        // Should successfully claim the pending task
        let claimed = claim_task(&storage, "1", TAG).unwrap();
        assert!(claimed);

        // Verify the task is now in-progress
        let reloaded = storage.load_group(TAG).unwrap();
        assert_eq!(
            reloaded.get_task("1").unwrap().status,
            TaskStatus::InProgress
        );
    }

    #[test]
    fn test_claim_task_already_in_progress() {
        let (_temp_dir, storage) = storage_with_task_one(TaskStatus::InProgress);

        // Should fail to claim a task that's already in-progress
        let claimed = claim_task(&storage, "1", TAG).unwrap();
        assert!(!claimed);
    }

    #[test]
    fn test_claim_task_nonexistent() {
        let (_temp_dir, storage) = storage_with_task_one(TaskStatus::Pending);

        // Should fail to claim a task that doesn't exist
        let claimed = claim_task(&storage, "nonexistent", TAG).unwrap();
        assert!(!claimed);
    }

    #[test]
    fn test_claim_task_already_done() {
        let (_temp_dir, storage) = storage_with_task_one(TaskStatus::Done);

        // Should fail to claim a task that's already done
        let claimed = claim_task(&storage, "1", TAG).unwrap();
        assert!(!claimed);
    }
}
722}