ralph/commands/run/
parallel_ops.rs1use crate::commands::run::parallel::state::{load_state, state_file_path};
16use anyhow::{Context, Result};
17use std::collections::HashMap;
18
19pub fn parallel_status(resolved: &crate::config::Resolved, json: bool) -> Result<()> {
23 let state_path = state_file_path(&resolved.repo_root);
24
25 let state_opt = load_state(&state_path).with_context(|| {
26 format!(
27 "Failed to load parallel state from {}",
28 state_path.display()
29 )
30 })?;
31
32 let state = match state_opt {
33 Some(s) => s,
34 None => {
35 if json {
36 println!(
37 "{{\"schema_version\":3,\"workers\":[],\"message\":\"No parallel state found\"}}"
38 );
39 } else {
40 println!("No parallel run state found.");
41 println!("Run `ralph run loop --parallel N` to start parallel execution.");
42 }
43 return Ok(());
44 }
45 };
46
47 if json {
48 let json_str =
50 serde_json::to_string_pretty(&state).context("Failed to serialize state to JSON")?;
51 println!("{}", json_str);
52 } else {
53 print_status_table(&state);
55 }
56
57 Ok(())
58}
59
60fn print_status_table(state: &crate::commands::run::parallel::state::ParallelStateFile) {
61 use crate::commands::run::parallel::state::WorkerLifecycle;
62
63 println!("Parallel Run Status");
64 println!("===================");
65 println!("Schema Version: {}", state.schema_version);
66 println!("Started: {}", state.started_at);
67 println!("Target Branch: {}", state.target_branch);
68 println!();
69
70 if state.workers.is_empty() {
71 println!("No workers tracked.");
72 return;
73 }
74
75 let mut by_lifecycle: HashMap<
77 WorkerLifecycle,
78 Vec<&crate::commands::run::parallel::state::WorkerRecord>,
79 > = HashMap::new();
80 for worker in &state.workers {
81 by_lifecycle
82 .entry(worker.lifecycle.clone())
83 .or_default()
84 .push(worker);
85 }
86
87 println!(
89 "Total: {} | Running: {} | Integrating: {} | Completed: {} | Failed: {} | Blocked: {}",
90 state.workers.len(),
91 by_lifecycle
92 .get(&WorkerLifecycle::Running)
93 .map_or(0, |v| v.len()),
94 by_lifecycle
95 .get(&WorkerLifecycle::Integrating)
96 .map_or(0, |v| v.len()),
97 by_lifecycle
98 .get(&WorkerLifecycle::Completed)
99 .map_or(0, |v| v.len()),
100 by_lifecycle
101 .get(&WorkerLifecycle::Failed)
102 .map_or(0, |v| v.len()),
103 by_lifecycle
104 .get(&WorkerLifecycle::BlockedPush)
105 .map_or(0, |v| v.len()),
106 );
107 println!();
108
109 if let Some(active) = by_lifecycle.get(&WorkerLifecycle::Running) {
111 println!("Running Workers:");
112 for w in active {
113 println!(
114 " {} - started {} ({} attempts)",
115 w.task_id, w.started_at, w.push_attempts
116 );
117 }
118 println!();
119 }
120
121 if let Some(integrating) = by_lifecycle.get(&WorkerLifecycle::Integrating) {
122 println!("Integrating Workers:");
123 for w in integrating {
124 println!(
125 " {} - started {} ({} attempts)",
126 w.task_id, w.started_at, w.push_attempts
127 );
128 }
129 println!();
130 }
131
132 if let Some(completed) = by_lifecycle.get(&WorkerLifecycle::Completed) {
134 println!("Completed Workers:");
135 for w in completed {
136 println!(
137 " {} - completed {}",
138 w.task_id,
139 w.completed_at.as_deref().unwrap_or("unknown")
140 );
141 }
142 println!();
143 }
144
145 if let Some(failed) = by_lifecycle.get(&WorkerLifecycle::Failed) {
146 println!("Failed Workers:");
147 for w in failed {
148 println!(
149 " {} - {}",
150 w.task_id,
151 w.last_error.as_deref().unwrap_or("no error")
152 );
153 }
154 println!();
155 }
156
157 if let Some(blocked) = by_lifecycle.get(&WorkerLifecycle::BlockedPush) {
158 println!("Blocked Workers (use `ralph run parallel retry --task <ID>`):");
159 for w in blocked {
160 println!(
161 " {} - {} ({} attempts)",
162 w.task_id,
163 w.last_error.as_deref().unwrap_or("blocked"),
164 w.push_attempts
165 );
166 }
167 println!();
168 }
169}
170
171pub fn parallel_retry(
176 resolved: &crate::config::Resolved,
177 task_id: &str,
178 _force: bool,
179) -> Result<()> {
180 use crate::commands::run::parallel::state::{WorkerLifecycle, load_state, save_state};
181
182 let state_path = state_file_path(&resolved.repo_root);
183
184 let mut state = match load_state(&state_path).with_context(|| {
185 format!(
186 "Failed to load parallel state from {}",
187 state_path.display()
188 )
189 })? {
190 Some(s) => s,
191 None => {
192 anyhow::bail!("No parallel run state found. Run `ralph run loop --parallel N` first.");
193 }
194 };
195
196 let worker = state
198 .get_worker(task_id)
199 .ok_or_else(|| anyhow::anyhow!("Task {} not found in parallel state", task_id))?;
200
201 match worker.lifecycle {
203 WorkerLifecycle::BlockedPush | WorkerLifecycle::Failed => {
204 let mut updated_worker = worker.clone();
206 updated_worker.lifecycle = WorkerLifecycle::Running;
207 updated_worker.last_error = None;
208 state.upsert_worker(updated_worker);
211 save_state(&state_path, &state).context("Failed to save updated worker state")?;
212
213 println!("Worker {} marked for retry.", task_id);
214 println!("Run `ralph run loop --parallel N` to resume parallel execution.");
215
216 Ok(())
217 }
218 WorkerLifecycle::Completed => {
219 anyhow::bail!(
220 "Task {} has already completed successfully. No retry needed.",
221 task_id
222 )
223 }
224 WorkerLifecycle::Running | WorkerLifecycle::Integrating => {
225 anyhow::bail!(
226 "Task {} is currently {}. Cannot retry an active worker.",
227 task_id,
228 match worker.lifecycle {
229 WorkerLifecycle::Running => "running",
230 WorkerLifecycle::Integrating => "integrating",
231 _ => unreachable!(),
232 }
233 )
234 }
235 }
236}