Skip to main content

ralph/commands/run/
parallel_ops.rs

//! Parallel operations commands (status, retry) for the CLI.
//!
//! Responsibilities:
//! - Implement `ralph run parallel status` to show worker states.
//! - Implement `ralph run parallel retry` to mark blocked or failed workers for retry.
//!
//! Not handled here:
//! - Worker orchestration (see `parallel/orchestration.rs`).
//! - Integration loop logic (see `parallel/integration.rs`).
//!
//! Invariants/assumptions:
//! - Commands run in coordinator repo context (CWD is repo root).
//! - State file is at `.ralph/cache/parallel/state.json`.

15use crate::commands::run::parallel::state::{load_state, state_file_path};
16use anyhow::{Context, Result};
17use std::collections::HashMap;
18
19/// Show status of parallel workers.
20///
21/// If `json` is true, outputs structured JSON. Otherwise, prints human-readable table.
22pub fn parallel_status(resolved: &crate::config::Resolved, json: bool) -> Result<()> {
23    let state_path = state_file_path(&resolved.repo_root);
24
25    let state_opt = load_state(&state_path).with_context(|| {
26        format!(
27            "Failed to load parallel state from {}",
28            state_path.display()
29        )
30    })?;
31
32    let state = match state_opt {
33        Some(s) => s,
34        None => {
35            if json {
36                println!(
37                    "{{\"schema_version\":3,\"workers\":[],\"message\":\"No parallel state found\"}}"
38                );
39            } else {
40                println!("No parallel run state found.");
41                println!("Run `ralph run loop --parallel N` to start parallel execution.");
42            }
43            return Ok(());
44        }
45    };
46
47    if json {
48        // Output raw state as JSON
49        let json_str =
50            serde_json::to_string_pretty(&state).context("Failed to serialize state to JSON")?;
51        println!("{}", json_str);
52    } else {
53        // Human-readable output
54        print_status_table(&state);
55    }
56
57    Ok(())
58}
59
60fn print_status_table(state: &crate::commands::run::parallel::state::ParallelStateFile) {
61    use crate::commands::run::parallel::state::WorkerLifecycle;
62
63    println!("Parallel Run Status");
64    println!("===================");
65    println!("Schema Version: {}", state.schema_version);
66    println!("Started:        {}", state.started_at);
67    println!("Target Branch:  {}", state.target_branch);
68    println!();
69
70    if state.workers.is_empty() {
71        println!("No workers tracked.");
72        return;
73    }
74
75    // Group workers by lifecycle
76    let mut by_lifecycle: HashMap<
77        WorkerLifecycle,
78        Vec<&crate::commands::run::parallel::state::WorkerRecord>,
79    > = HashMap::new();
80    for worker in &state.workers {
81        by_lifecycle
82            .entry(worker.lifecycle.clone())
83            .or_default()
84            .push(worker);
85    }
86
87    // Print summary
88    println!(
89        "Total: {} | Running: {} | Integrating: {} | Completed: {} | Failed: {} | Blocked: {}",
90        state.workers.len(),
91        by_lifecycle
92            .get(&WorkerLifecycle::Running)
93            .map_or(0, |v| v.len()),
94        by_lifecycle
95            .get(&WorkerLifecycle::Integrating)
96            .map_or(0, |v| v.len()),
97        by_lifecycle
98            .get(&WorkerLifecycle::Completed)
99            .map_or(0, |v| v.len()),
100        by_lifecycle
101            .get(&WorkerLifecycle::Failed)
102            .map_or(0, |v| v.len()),
103        by_lifecycle
104            .get(&WorkerLifecycle::BlockedPush)
105            .map_or(0, |v| v.len()),
106    );
107    println!();
108
109    // Print active workers (running, integrating)
110    if let Some(active) = by_lifecycle.get(&WorkerLifecycle::Running) {
111        println!("Running Workers:");
112        for w in active {
113            println!(
114                "  {} - started {} ({} attempts)",
115                w.task_id, w.started_at, w.push_attempts
116            );
117        }
118        println!();
119    }
120
121    if let Some(integrating) = by_lifecycle.get(&WorkerLifecycle::Integrating) {
122        println!("Integrating Workers:");
123        for w in integrating {
124            println!(
125                "  {} - started {} ({} attempts)",
126                w.task_id, w.started_at, w.push_attempts
127            );
128        }
129        println!();
130    }
131
132    // Print terminal workers
133    if let Some(completed) = by_lifecycle.get(&WorkerLifecycle::Completed) {
134        println!("Completed Workers:");
135        for w in completed {
136            println!(
137                "  {} - completed {}",
138                w.task_id,
139                w.completed_at.as_deref().unwrap_or("unknown")
140            );
141        }
142        println!();
143    }
144
145    if let Some(failed) = by_lifecycle.get(&WorkerLifecycle::Failed) {
146        println!("Failed Workers:");
147        for w in failed {
148            println!(
149                "  {} - {}",
150                w.task_id,
151                w.last_error.as_deref().unwrap_or("no error")
152            );
153        }
154        println!();
155    }
156
157    if let Some(blocked) = by_lifecycle.get(&WorkerLifecycle::BlockedPush) {
158        println!("Blocked Workers (use `ralph run parallel retry --task <ID>`):");
159        for w in blocked {
160            println!(
161                "  {} - {} ({} attempts)",
162                w.task_id,
163                w.last_error.as_deref().unwrap_or("blocked"),
164                w.push_attempts
165            );
166        }
167        println!();
168    }
169}
170
171/// Retry a blocked or failed parallel worker.
172///
173/// This resumes the integration loop for a worker that is in a terminal
174/// state (BlockedPush or Failed).
175pub fn parallel_retry(
176    resolved: &crate::config::Resolved,
177    task_id: &str,
178    _force: bool,
179) -> Result<()> {
180    use crate::commands::run::parallel::state::{WorkerLifecycle, load_state, save_state};
181
182    let state_path = state_file_path(&resolved.repo_root);
183
184    let mut state = match load_state(&state_path).with_context(|| {
185        format!(
186            "Failed to load parallel state from {}",
187            state_path.display()
188        )
189    })? {
190        Some(s) => s,
191        None => {
192            anyhow::bail!("No parallel run state found. Run `ralph run loop --parallel N` first.");
193        }
194    };
195
196    // Find the worker
197    let worker = state
198        .get_worker(task_id)
199        .ok_or_else(|| anyhow::anyhow!("Task {} not found in parallel state", task_id))?;
200
201    // Check if retryable
202    match worker.lifecycle {
203        WorkerLifecycle::BlockedPush | WorkerLifecycle::Failed => {
204            // Retryable - reset to Running and let the worker resume
205            let mut updated_worker = worker.clone();
206            updated_worker.lifecycle = WorkerLifecycle::Running;
207            updated_worker.last_error = None;
208            // Don't reset push_attempts - keep history
209
210            state.upsert_worker(updated_worker);
211            save_state(&state_path, &state).context("Failed to save updated worker state")?;
212
213            println!("Worker {} marked for retry.", task_id);
214            println!("Run `ralph run loop --parallel N` to resume parallel execution.");
215
216            Ok(())
217        }
218        WorkerLifecycle::Completed => {
219            anyhow::bail!(
220                "Task {} has already completed successfully. No retry needed.",
221                task_id
222            )
223        }
224        WorkerLifecycle::Running | WorkerLifecycle::Integrating => {
225            anyhow::bail!(
226                "Task {} is currently {}. Cannot retry an active worker.",
227                task_id,
228                match worker.lifecycle {
229                    WorkerLifecycle::Running => "running",
230                    WorkerLifecycle::Integrating => "integrating",
231                    _ => unreachable!(),
232                }
233            )
234        }
235    }
236}