ralph/commands/run/parallel/
mod.rs1use crate::agent::AgentOverrides;
22use crate::config;
23use crate::git;
24use crate::timeutil;
25use anyhow::{Context, Result, bail};
26use std::path::{Path, PathBuf};
27
28mod args;
29mod cleanup_guard;
30mod integration;
31mod orchestration;
32mod path_map;
33pub mod state;
34mod state_init;
35mod sync;
36mod worker;
37mod workspace_cleanup;
38
/// Repo-relative path of the marker written when CI fails.
/// NOTE(review): the writer/reader of this marker live outside this file —
/// confirm semantics against the `integration` module.
pub const CI_FAILURE_MARKER_FILE: &str = ".ralph/cache/ci-failure-marker";

/// Repo-relative path of the JSON marker recording a push that was blocked
/// during a parallel run (presumably consumed by the integration loop —
/// TODO confirm).
pub const BLOCKED_PUSH_MARKER_FILE: &str = ".ralph/cache/parallel/blocked_push.json";

/// Fallback CI-failure marker at the repo root, presumably used when the
/// `.ralph/cache` location is unavailable — TODO(review): confirm against
/// the code that writes it.
pub const CI_FAILURE_MARKER_FALLBACK_FILE: &str = ".ralph-ci-failure-marker";
52
/// Default backoff schedule (in milliseconds) applied between push retries
/// when `parallel.push_backoff_ms` is not configured.
pub fn default_push_backoff_ms() -> Vec<u64> {
    [500, 2000, 5000, 10000].to_vec()
}
57
58pub(crate) use integration::run_integration_loop;
60pub use integration::{IntegrationConfig, IntegrationOutcome, RemediationHandoff};
61pub(crate) use orchestration::run_loop_parallel;
62pub use state::{WorkerLifecycle, WorkerRecord};
63
64use cleanup_guard::ParallelCleanupGuard;
65use state_init::load_or_init_parallel_state;
66
/// Options supplied by the caller when launching a parallel run.
pub(crate) struct ParallelRunOptions {
    /// Cap on tasks started in this invocation; `0` means "no cap"
    /// (see `can_start_more_tasks`).
    pub max_tasks: u32,
    /// Number of concurrent worker processes to run.
    pub workers: u8,
    /// Agent configuration overrides forwarded to each worker
    /// (adjusted by `overrides_for_parallel_workers`).
    pub agent_overrides: AgentOverrides,
    /// Force flag — semantics not visible in this file; TODO(review):
    /// confirm against the orchestration module.
    pub force: bool,
}
73
/// Effective settings for a parallel run, resolved from config defaults
/// and per-invocation options (see `resolve_parallel_settings`).
// NOTE(review): `dead_code` allow suggests some fields are only read in
// cfg-gated or not-yet-wired paths — confirm before removing.
#[allow(dead_code)]
pub(crate) struct ParallelSettings {
    /// Number of concurrent workers (copied from `ParallelRunOptions`).
    pub(crate) workers: u8,
    /// Directory under which clone workspaces are created.
    pub(crate) workspace_root: PathBuf,
    /// Maximum push attempts before giving up (config default: 50).
    pub(crate) max_push_attempts: u8,
    /// Backoff schedule in ms between push attempts
    /// (config default: `default_push_backoff_ms()`).
    pub(crate) push_backoff_ms: Vec<u64>,
    /// Hours to retain old workspaces before cleanup (config default: 24).
    pub(crate) workspace_retention_hours: u32,
}
82
83fn resolve_parallel_settings(
85 resolved: &config::Resolved,
86 opts: &ParallelRunOptions,
87) -> Result<ParallelSettings> {
88 let cfg = &resolved.config.parallel;
89 Ok(ParallelSettings {
90 workers: opts.workers,
91 workspace_root: git::workspace_root(&resolved.repo_root, &resolved.config),
92 max_push_attempts: cfg.max_push_attempts.unwrap_or(50),
93 push_backoff_ms: cfg
94 .push_backoff_ms
95 .clone()
96 .unwrap_or_else(default_push_backoff_ms),
97 workspace_retention_hours: cfg.workspace_retention_hours.unwrap_or(24),
98 })
99}
100
101fn overrides_for_parallel_workers(
102 resolved: &config::Resolved,
103 overrides: &AgentOverrides,
104) -> AgentOverrides {
105 let repoprompt_flags =
106 crate::agent::resolve_repoprompt_flags_from_overrides(overrides, resolved);
107 if repoprompt_flags.plan_required || repoprompt_flags.tool_injection {
108 log::warn!(
109 "Parallel workers disable RepoPrompt plan/tooling instructions to keep edits in workspace clones."
110 );
111 }
112
113 let mut worker_overrides = overrides.clone();
114 worker_overrides.repoprompt_plan_required = Some(false);
115 worker_overrides.repoprompt_tool_injection = Some(false);
116 worker_overrides
117}
118
/// Preflight check: if `workspace_root` resolves inside the repo, it must be
/// gitignored (otherwise clone workspaces would dirty the repo).
///
/// Returns `Ok(())` when the root is outside the repo or is ignored;
/// bails with an actionable message when it equals the repo root or is
/// inside the repo but not ignored.
fn preflight_parallel_workspace_root_is_gitignored(
    repo_root: &Path,
    workspace_root: &Path,
) -> Result<()> {
    // Outside the repo entirely: nothing to check.
    let Ok(rel) = workspace_root.strip_prefix(repo_root) else {
        return Ok(());
    };

    // Normalize to forward slashes (Windows paths) and strip edge slashes
    // so the path is in the form `git check-ignore` expects.
    let rel_str = rel.to_string_lossy().replace('\\', "/");
    let rel_trimmed = rel_str.trim_matches('/');

    // Empty relative path means workspace_root == repo_root: refuse.
    if rel_trimmed.is_empty() {
        bail!(
            "Parallel preflight: parallel.workspace_root resolves to the repo root ({}). Refusing to run.",
            repo_root.display()
        );
    }

    // Probe both the directory itself and a dummy child path: some ignore
    // rules (e.g. `dir/*`) only match contents, not the directory entry.
    let dir_candidate = rel_trimmed.to_string();
    let dummy_candidate = format!("{}/__ralph_ignore_probe__", rel_trimmed);

    let ignored_dir = git::is_path_ignored(repo_root, &dir_candidate)
        .with_context(|| format!("Parallel preflight: check-ignore {}", dir_candidate))?;
    let ignored_dummy = git::is_path_ignored(repo_root, &dummy_candidate)
        .with_context(|| format!("Parallel preflight: check-ignore {}", dummy_candidate))?;

    if ignored_dir || ignored_dummy {
        return Ok(());
    }

    // Suggest a trailing-slash rule so only the directory is ignored.
    let ignore_rule = format!("{}/", rel_trimmed.trim_end_matches('/'));
    bail!(
        "Parallel preflight: parallel.workspace_root resolves inside the repo but is not gitignored.\n\
workspace_root: {}\n\
repo_root: {}\n\
\n\
Ralph will create clone workspaces under this directory, which would leave untracked files and make the repo appear dirty.\n\
\n\
Fix options:\n\
1) Recommended: set parallel.workspace_root to an absolute path OUTSIDE the repo (or remove it to use the default outside-repo location).\n\
2) If you intentionally keep workspaces inside the repo, ignore it:\n\
   - Shared (tracked): add `{}` to `.gitignore` and commit it\n\
   - Local-only: add `{}` to `.git/info/exclude`\n",
        workspace_root.display(),
        repo_root.display(),
        ignore_rule,
        ignore_rule
    );
}
172
173fn spawn_worker_with_registered_workspace<CreateWorkspace, SyncWorkspace, SpawnWorker>(
175 guard: &mut ParallelCleanupGuard,
176 task_id: &str,
177 create_workspace: CreateWorkspace,
178 sync_workspace: SyncWorkspace,
179 spawn: SpawnWorker,
180) -> Result<(git::WorkspaceSpec, std::process::Child)>
181where
182 CreateWorkspace: FnOnce() -> Result<git::WorkspaceSpec>,
183 SyncWorkspace: FnOnce(&Path) -> Result<()>,
184 SpawnWorker: FnOnce(&git::WorkspaceSpec) -> Result<std::process::Child>,
185{
186 let workspace = create_workspace()?;
187 guard.register_workspace(task_id.to_string(), workspace.clone());
188 sync_workspace(&workspace.path)?;
189 let child = spawn(&workspace)?;
190 Ok((workspace, child))
191}
192
193fn prune_stale_workers(state_file: &mut state::ParallelStateFile) -> Vec<String> {
195 let now = time::OffsetDateTime::now_utc();
196 let ttl_secs: i64 = crate::constants::timeouts::PARALLEL_TERMINAL_WORKER_TTL
197 .as_secs()
198 .try_into()
199 .unwrap_or(i64::MAX);
200
201 let mut dropped = Vec::new();
202 state_file.workers.retain(|worker| {
203 if !worker.is_terminal() {
205 return true;
206 }
207
208 let Some(started_at) = timeutil::parse_rfc3339_opt(&worker.started_at) else {
210 log::warn!(
211 "Dropping stale worker {} with invalid started_at (workspace: {}).",
212 worker.task_id,
213 worker.workspace_path.display()
214 );
215 dropped.push(worker.task_id.clone());
216 return false;
217 };
218
219 let age_secs = (now.unix_timestamp() - started_at.unix_timestamp()).max(0);
220 if age_secs >= ttl_secs {
221 log::warn!(
222 "Dropping stale worker {} after TTL (age_secs={}, ttl_secs={}, started_at='{}', workspace: {}).",
223 worker.task_id,
224 age_secs,
225 ttl_secs,
226 worker.started_at,
227 worker.workspace_path.display()
228 );
229 dropped.push(worker.task_id.clone());
230 return false;
231 }
232
233 true
234 });
235 dropped
236}
237
/// Number of workers currently considered active.
///
/// The cleanup guard's in-flight count is treated as authoritative; the
/// persisted state file is deliberately ignored (it may still contain
/// terminal records — see `effective_active_worker_count_uses_max` test).
fn effective_active_worker_count(
    _state_file: &state::ParallelStateFile,
    guard_in_flight_len: usize,
) -> usize {
    guard_in_flight_len
}
245
/// Initial started-task counter for a new invocation.
///
/// Always 0: the per-invocation task budget is not carried over from
/// previously persisted state (even failed workers in the state file do
/// not count — see `initial_tasks_started_starts_at_zero_per_invocation`).
fn initial_tasks_started(_state_file: &state::ParallelStateFile) -> u32 {
    0
}
249
/// Whether another task may be started: `max_tasks == 0` means no cap;
/// otherwise the cap must not yet be reached.
fn can_start_more_tasks(tasks_started: u32, max_tasks: u32) -> bool {
    match max_tasks {
        0 => true,
        cap => tasks_started < cap,
    }
}
253
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::Cell;
    use tempfile::TempDir;

    /// Builds a cleanup guard rooted in `temp` with a fresh state file,
    /// for exercising workspace-registration failure paths.
    fn create_test_cleanup_guard(temp: &TempDir) -> ParallelCleanupGuard {
        let workspace_root = temp.path().join("workspaces");
        std::fs::create_dir_all(&workspace_root).expect("create workspace root");

        let state_path = temp.path().join("state.json");
        let state_file =
            state::ParallelStateFile::new("2026-02-20T00:00:00Z".to_string(), "main".to_string());

        ParallelCleanupGuard::new_simple(state_path, state_file, workspace_root)
    }

    // A freshly-completed (terminal) worker is kept even when its workspace
    // path no longer exists — only the TTL governs pruning.
    #[test]
    fn prune_stale_workers_retains_recent_terminal_with_missing_workspace() -> Result<()> {
        let mut state_file =
            state::ParallelStateFile::new("2026-02-20T00:00:00Z".to_string(), "main".to_string());

        let mut worker = WorkerRecord::new(
            "RQ-0001",
            PathBuf::from("/nonexistent/path/RQ-0001"),
            timeutil::now_utc_rfc3339_or_fallback(),
        );
        worker.mark_completed(timeutil::now_utc_rfc3339_or_fallback());
        state_file.upsert_worker(worker);

        let dropped = prune_stale_workers(&mut state_file);

        assert!(dropped.is_empty());
        assert_eq!(state_file.workers.len(), 1);
        assert_eq!(state_file.workers[0].task_id, "RQ-0001");
        Ok(())
    }

    // Non-terminal workers are never pruned regardless of age.
    #[test]
    fn prune_stale_workers_retains_active() -> Result<()> {
        let temp = TempDir::new()?;
        let workspace_path = temp.path().join("RQ-0002");
        std::fs::create_dir_all(&workspace_path)?;

        let mut state_file =
            state::ParallelStateFile::new("2026-02-20T00:00:00Z".to_string(), "main".to_string());

        let worker = WorkerRecord::new(
            "RQ-0002",
            workspace_path,
            timeutil::now_utc_rfc3339_or_fallback(),
        );
        state_file.upsert_worker(worker);

        let dropped = prune_stale_workers(&mut state_file);

        assert!(dropped.is_empty());
        assert_eq!(state_file.workers.len(), 1);
        Ok(())
    }

    // If spawning fails after workspace creation, the guard (which was told
    // about the workspace before the spawn) removes it during cleanup.
    #[test]
    fn spawn_failure_cleans_registered_workspace() -> Result<()> {
        let temp = TempDir::new()?;
        let mut guard = create_test_cleanup_guard(&temp);
        let workspace_root = temp.path().join("workspaces");
        let workspace_path = workspace_root.join("RQ-0001");

        let result = spawn_worker_with_registered_workspace(
            &mut guard,
            "RQ-0001",
            || {
                std::fs::create_dir_all(&workspace_path)?;
                Ok(git::WorkspaceSpec {
                    path: workspace_path.clone(),
                    branch: "main".to_string(),
                })
            },
            |_| Ok(()),
            |_| Err(anyhow::anyhow!("spawn failed")),
        );

        assert!(result.is_err());
        guard.cleanup()?;
        assert!(!workspace_path.exists());
        Ok(())
    }

    // A sync failure short-circuits before spawn runs, and the registered
    // workspace is still cleaned up.
    #[test]
    fn sync_failure_cleans_registered_workspace_without_spawning() -> Result<()> {
        let temp = TempDir::new()?;
        let mut guard = create_test_cleanup_guard(&temp);
        let workspace_root = temp.path().join("workspaces");
        let workspace_path = workspace_root.join("RQ-0002");
        // Cell lets the FnOnce closure record whether it was invoked.
        let spawn_called = Cell::new(false);

        let result = spawn_worker_with_registered_workspace(
            &mut guard,
            "RQ-0002",
            || {
                std::fs::create_dir_all(&workspace_path)?;
                Ok(git::WorkspaceSpec {
                    path: workspace_path.clone(),
                    branch: "main".to_string(),
                })
            },
            |_| Err(anyhow::anyhow!("sync failed")),
            |_| {
                spawn_called.set(true);
                Err(anyhow::anyhow!("spawn should not run"))
            },
        );

        assert!(result.is_err());
        assert!(!spawn_called.get());
        guard.cleanup()?;
        assert!(!workspace_path.exists());
        Ok(())
    }

    // The guard's in-flight length wins even when the state file holds a
    // worker record; 0 in-flight means 0 active.
    #[test]
    fn effective_active_worker_count_uses_max() {
        let mut state_file =
            state::ParallelStateFile::new("2026-02-20T00:00:00Z".to_string(), "main".to_string());
        state_file.upsert_worker(WorkerRecord::new(
            "RQ-0001",
            PathBuf::from("/tmp/ws/RQ-0001"),
            "2026-02-20T00:00:00Z".to_string(),
        ));

        assert_eq!(effective_active_worker_count(&state_file, 2), 2);
        assert_eq!(effective_active_worker_count(&state_file, 0), 0);
    }

    // A failed worker persisted from a previous run does not consume this
    // invocation's task budget.
    #[test]
    fn initial_tasks_started_starts_at_zero_per_invocation() {
        let mut state_file =
            state::ParallelStateFile::new("2026-02-20T00:00:00Z".to_string(), "main".to_string());
        let mut failed = WorkerRecord::new(
            "RQ-0001",
            PathBuf::from("/tmp/ws/RQ-0001"),
            "2026-02-20T00:00:00Z".to_string(),
        );
        failed.mark_failed("2026-02-20T00:05:00Z".to_string(), "failed");
        state_file.upsert_worker(failed);

        assert_eq!(initial_tasks_started(&state_file), 0);
    }

    // max_tasks == 0 disables the cap; otherwise starting is allowed
    // strictly below the cap.
    #[test]
    fn can_start_more_tasks_logic() {
        assert!(can_start_more_tasks(100, 0));

        assert!(can_start_more_tasks(0, 5));
        assert!(can_start_more_tasks(4, 5));
        assert!(!can_start_more_tasks(5, 5));
        assert!(!can_start_more_tasks(6, 5));
    }
}