Skip to main content

ralph/commands/run/parallel/
mod.rs

1//! Parallel run loop supervisor and worker orchestration for direct-push mode.
2//!
3//! Responsibilities:
4//! - Coordinate parallel task execution across multiple workers.
5//! - Manage settings resolution and preflight validation.
6//! - Track worker capacity and task pruning.
7//! - Handle direct-push integration from workers.
8//!
9//! Not handled here:
10//! - Main orchestration loop (see `orchestration.rs`).
11//! - State initialization (see `state_init.rs`).
12//! - Worker lifecycle (see `worker.rs`).
13//! - Integration loop logic (see `integration.rs`).
14//!
15//! Invariants/assumptions:
16//! - Queue order is authoritative for task selection.
17//! - Workers run in isolated per-task workspaces on the target base branch.
18//! - Workers push directly to the target branch (no PRs).
19//! - One active worker per task ID (enforced by upsert_worker).
20
21use crate::agent::AgentOverrides;
22use crate::config;
23use crate::git;
24use crate::timeutil;
25use anyhow::{Context, Result, bail};
26use std::path::{Path, PathBuf};
27
28mod args;
29mod cleanup_guard;
30mod integration;
31mod orchestration;
32mod path_map;
33pub mod state;
34mod state_init;
35mod sync;
36mod worker;
37mod workspace_cleanup;
38
39// =============================================================================
40// Marker File Constants (for CI failure detection)
41// =============================================================================
42
/// Marker file name for CI gate failure diagnostics.
/// Written to workspace when CI fails so coordinator/status tooling can inspect failures.
/// Path is relative — presumably resolved against the workspace root; TODO confirm at write site.
pub const CI_FAILURE_MARKER_FILE: &str = ".ralph/cache/ci-failure-marker";

/// Marker file name for blocked push outcomes from integration loop.
/// JSON payload — schema defined by the integration module (see `integration.rs`).
pub const BLOCKED_PUSH_MARKER_FILE: &str = ".ralph/cache/parallel/blocked_push.json";

/// Fallback marker file used only when primary marker path is unavailable.
/// Lives at the top level (dotfile) rather than under `.ralph/cache/`.
pub const CI_FAILURE_MARKER_FALLBACK_FILE: &str = ".ralph-ci-failure-marker";
52
/// Default push backoff intervals in milliseconds.
///
/// Fallback used by `resolve_parallel_settings` when the config does not
/// set `parallel.push_backoff_ms`.
pub fn default_push_backoff_ms() -> Vec<u64> {
    [500, 2000, 5000, 10000].to_vec()
}
57
58// Re-export public APIs from submodules
59pub(crate) use integration::run_integration_loop;
60pub use integration::{IntegrationConfig, IntegrationOutcome, RemediationHandoff};
61pub(crate) use orchestration::run_loop_parallel;
62pub use state::{WorkerLifecycle, WorkerRecord};
63
64use cleanup_guard::ParallelCleanupGuard;
65use state_init::load_or_init_parallel_state;
66
/// CLI-supplied options for a single parallel run invocation.
pub(crate) struct ParallelRunOptions {
    /// Cap on tasks started by this invocation; 0 means unlimited
    /// (see `can_start_more_tasks`).
    pub max_tasks: u32,
    /// Number of concurrent workers to run.
    pub workers: u8,
    /// Agent overrides forwarded to workers; RepoPrompt plan/tooling flags
    /// are force-disabled per worker (see `overrides_for_parallel_workers`).
    pub agent_overrides: AgentOverrides,
    /// Force flag — consumed by the orchestration loop, not in this module.
    /// NOTE(review): exact semantics live in `orchestration.rs`; confirm there.
    pub force: bool,
}
73
/// Resolved runtime settings for parallel mode, built by
/// `resolve_parallel_settings` from config plus CLI options.
// allow(dead_code): presumably some fields are only read by submodules or
// feature-gated paths — TODO confirm and narrow if possible.
#[allow(dead_code)]
pub(crate) struct ParallelSettings {
    /// Concurrent worker count (taken directly from CLI options).
    pub(crate) workers: u8,
    /// Root directory under which per-task clone workspaces are created.
    pub(crate) workspace_root: PathBuf,
    /// Maximum push attempts (config `max_push_attempts`, default 50).
    pub(crate) max_push_attempts: u8,
    /// Per-attempt backoff intervals in milliseconds
    /// (config `push_backoff_ms`, default `default_push_backoff_ms()`).
    pub(crate) push_backoff_ms: Vec<u64>,
    /// Workspace retention window in hours (config default 24).
    /// NOTE(review): enforcement presumably lives in `workspace_cleanup` — confirm.
    pub(crate) workspace_retention_hours: u32,
}
82
83// Settings resolution
84fn resolve_parallel_settings(
85    resolved: &config::Resolved,
86    opts: &ParallelRunOptions,
87) -> Result<ParallelSettings> {
88    let cfg = &resolved.config.parallel;
89    Ok(ParallelSettings {
90        workers: opts.workers,
91        workspace_root: git::workspace_root(&resolved.repo_root, &resolved.config),
92        max_push_attempts: cfg.max_push_attempts.unwrap_or(50),
93        push_backoff_ms: cfg
94            .push_backoff_ms
95            .clone()
96            .unwrap_or_else(default_push_backoff_ms),
97        workspace_retention_hours: cfg.workspace_retention_hours.unwrap_or(24),
98    })
99}
100
101fn overrides_for_parallel_workers(
102    resolved: &config::Resolved,
103    overrides: &AgentOverrides,
104) -> AgentOverrides {
105    let repoprompt_flags =
106        crate::agent::resolve_repoprompt_flags_from_overrides(overrides, resolved);
107    if repoprompt_flags.plan_required || repoprompt_flags.tool_injection {
108        log::warn!(
109            "Parallel workers disable RepoPrompt plan/tooling instructions to keep edits in workspace clones."
110        );
111    }
112
113    let mut worker_overrides = overrides.clone();
114    worker_overrides.repoprompt_plan_required = Some(false);
115    worker_overrides.repoprompt_tool_injection = Some(false);
116    worker_overrides
117}
118
119// Preflight check: require workspace_root to be gitignored if inside repo
120fn preflight_parallel_workspace_root_is_gitignored(
121    repo_root: &Path,
122    workspace_root: &Path,
123) -> Result<()> {
124    // Only enforce when workspace_root is inside the repo.
125    let Ok(rel) = workspace_root.strip_prefix(repo_root) else {
126        return Ok(());
127    };
128
129    let rel_str = rel.to_string_lossy().replace('\\', "/");
130    let rel_trimmed = rel_str.trim_matches('/');
131
132    // If workspace_root == repo_root, that effectively asks to ignore the whole repo (nonsense).
133    if rel_trimmed.is_empty() {
134        bail!(
135            "Parallel preflight: parallel.workspace_root resolves to the repo root ({}). Refusing to run.",
136            repo_root.display()
137        );
138    }
139
140    // Check ignore rules without creating the directory:
141    let dir_candidate = rel_trimmed.to_string();
142    let dummy_candidate = format!("{}/__ralph_ignore_probe__", rel_trimmed);
143
144    let ignored_dir = git::is_path_ignored(repo_root, &dir_candidate)
145        .with_context(|| format!("Parallel preflight: check-ignore {}", dir_candidate))?;
146    let ignored_dummy = git::is_path_ignored(repo_root, &dummy_candidate)
147        .with_context(|| format!("Parallel preflight: check-ignore {}", dummy_candidate))?;
148
149    if ignored_dir || ignored_dummy {
150        return Ok(());
151    }
152
153    let ignore_rule = format!("{}/", rel_trimmed.trim_end_matches('/'));
154    bail!(
155        "Parallel preflight: parallel.workspace_root resolves inside the repo but is not gitignored.\n\
156workspace_root: {}\n\
157repo_root: {}\n\
158\n\
159Ralph will create clone workspaces under this directory, which would leave untracked files and make the repo appear dirty.\n\
160\n\
161Fix options:\n\
1621) Recommended: set parallel.workspace_root to an absolute path OUTSIDE the repo (or remove it to use the default outside-repo location).\n\
1632) If you intentionally keep workspaces inside the repo, ignore it:\n\
164   - Shared (tracked): add `{}` to `.gitignore` and commit it\n\
165   - Local-only: add `{}` to `.git/info/exclude`\n",
166        workspace_root.display(),
167        repo_root.display(),
168        ignore_rule,
169        ignore_rule
170    );
171}
172
173// Worker spawning helper
174fn spawn_worker_with_registered_workspace<CreateWorkspace, SyncWorkspace, SpawnWorker>(
175    guard: &mut ParallelCleanupGuard,
176    task_id: &str,
177    create_workspace: CreateWorkspace,
178    sync_workspace: SyncWorkspace,
179    spawn: SpawnWorker,
180) -> Result<(git::WorkspaceSpec, std::process::Child)>
181where
182    CreateWorkspace: FnOnce() -> Result<git::WorkspaceSpec>,
183    SyncWorkspace: FnOnce(&Path) -> Result<()>,
184    SpawnWorker: FnOnce(&git::WorkspaceSpec) -> Result<std::process::Child>,
185{
186    let workspace = create_workspace()?;
187    guard.register_workspace(task_id.to_string(), workspace.clone());
188    sync_workspace(&workspace.path)?;
189    let child = spawn(&workspace)?;
190    Ok((workspace, child))
191}
192
193// Task pruning: remove stale records
194fn prune_stale_workers(state_file: &mut state::ParallelStateFile) -> Vec<String> {
195    let now = time::OffsetDateTime::now_utc();
196    let ttl_secs: i64 = crate::constants::timeouts::PARALLEL_TERMINAL_WORKER_TTL
197        .as_secs()
198        .try_into()
199        .unwrap_or(i64::MAX);
200
201    let mut dropped = Vec::new();
202    state_file.workers.retain(|worker| {
203        // Don't prune active workers
204        if !worker.is_terminal() {
205            return true;
206        }
207
208        // Time-bound terminal workers so they don't block capacity forever
209        let Some(started_at) = timeutil::parse_rfc3339_opt(&worker.started_at) else {
210            log::warn!(
211                "Dropping stale worker {} with invalid started_at (workspace: {}).",
212                worker.task_id,
213                worker.workspace_path.display()
214            );
215            dropped.push(worker.task_id.clone());
216            return false;
217        };
218
219        let age_secs = (now.unix_timestamp() - started_at.unix_timestamp()).max(0);
220        if age_secs >= ttl_secs {
221            log::warn!(
222                "Dropping stale worker {} after TTL (age_secs={}, ttl_secs={}, started_at='{}', workspace: {}).",
223                worker.task_id,
224                age_secs,
225                ttl_secs,
226                worker.started_at,
227                worker.workspace_path.display()
228            );
229            dropped.push(worker.task_id.clone());
230            return false;
231        }
232
233        true
234    });
235    dropped
236}
237
// Capacity tracking
/// Number of workers currently consuming capacity.
///
/// Only the guard's in-flight count matters: persisted state records are
/// history and must not consume capacity (pinned by the
/// `effective_active_worker_count_uses_max` test below).
fn effective_active_worker_count(
    _state_file: &state::ParallelStateFile,
    guard_in_flight_len: usize,
) -> usize {
    // `_state_file` stays in the signature — presumably for call-site
    // uniformity / future use; TODO confirm before removing.
    guard_in_flight_len
}
245
/// Initial value of the per-invocation tasks-started counter.
///
/// Always 0: `max_tasks` caps tasks started by *this* invocation, so worker
/// records already in persisted state do not count toward the cap
/// (pinned by `initial_tasks_started_starts_at_zero_per_invocation`).
fn initial_tasks_started(_state_file: &state::ParallelStateFile) -> u32 {
    0
}
249
/// Whether another task may be started this invocation.
///
/// A `max_tasks` of 0 means unlimited; otherwise the started count must
/// still be strictly below the cap.
fn can_start_more_tasks(tasks_started: u32, max_tasks: u32) -> bool {
    if max_tasks == 0 {
        true
    } else {
        tasks_started < max_tasks
    }
}
253
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::Cell;
    use tempfile::TempDir;

    // Builds a cleanup guard over a fresh `workspaces/` dir inside `temp`,
    // backed by an empty state file (base branch "main").
    fn create_test_cleanup_guard(temp: &TempDir) -> ParallelCleanupGuard {
        let workspace_root = temp.path().join("workspaces");
        std::fs::create_dir_all(&workspace_root).expect("create workspace root");

        let state_path = temp.path().join("state.json");
        let state_file =
            state::ParallelStateFile::new("2026-02-20T00:00:00Z".to_string(), "main".to_string());

        ParallelCleanupGuard::new_simple(state_path, state_file, workspace_root)
    }

    // A terminal worker started "now" is within the TTL and must survive
    // pruning even though its workspace path does not exist on disk —
    // pruning is time-based, not existence-based.
    #[test]
    fn prune_stale_workers_retains_recent_terminal_with_missing_workspace() -> Result<()> {
        let mut state_file =
            state::ParallelStateFile::new("2026-02-20T00:00:00Z".to_string(), "main".to_string());

        let mut worker = WorkerRecord::new(
            "RQ-0001",
            PathBuf::from("/nonexistent/path/RQ-0001"),
            timeutil::now_utc_rfc3339_or_fallback(),
        );
        worker.mark_completed(timeutil::now_utc_rfc3339_or_fallback());
        state_file.upsert_worker(worker);

        let dropped = prune_stale_workers(&mut state_file);

        assert!(dropped.is_empty());
        assert_eq!(state_file.workers.len(), 1);
        assert_eq!(state_file.workers[0].task_id, "RQ-0001");
        Ok(())
    }

    // Non-terminal (active) workers are never pruned, regardless of age.
    #[test]
    fn prune_stale_workers_retains_active() -> Result<()> {
        let temp = TempDir::new()?;
        let workspace_path = temp.path().join("RQ-0002");
        std::fs::create_dir_all(&workspace_path)?;

        let mut state_file =
            state::ParallelStateFile::new("2026-02-20T00:00:00Z".to_string(), "main".to_string());

        // Active worker (not terminal)
        let worker = WorkerRecord::new(
            "RQ-0002",
            workspace_path,
            timeutil::now_utc_rfc3339_or_fallback(),
        );
        state_file.upsert_worker(worker);

        let dropped = prune_stale_workers(&mut state_file);

        assert!(dropped.is_empty());
        assert_eq!(state_file.workers.len(), 1);
        Ok(())
    }

    // If spawn fails after the workspace was created and registered, the
    // guard's cleanup must remove the workspace directory.
    #[test]
    fn spawn_failure_cleans_registered_workspace() -> Result<()> {
        let temp = TempDir::new()?;
        let mut guard = create_test_cleanup_guard(&temp);
        let workspace_root = temp.path().join("workspaces");
        let workspace_path = workspace_root.join("RQ-0001");

        let result = spawn_worker_with_registered_workspace(
            &mut guard,
            "RQ-0001",
            || {
                std::fs::create_dir_all(&workspace_path)?;
                Ok(git::WorkspaceSpec {
                    path: workspace_path.clone(),
                    branch: "main".to_string(),
                })
            },
            |_| Ok(()),
            |_| Err(anyhow::anyhow!("spawn failed")),
        );

        assert!(result.is_err());
        guard.cleanup()?;
        assert!(!workspace_path.exists());
        Ok(())
    }

    // A sync failure must short-circuit before spawn runs (spawn closure is
    // instrumented with a Cell flag), and cleanup must still remove the
    // registered workspace.
    #[test]
    fn sync_failure_cleans_registered_workspace_without_spawning() -> Result<()> {
        let temp = TempDir::new()?;
        let mut guard = create_test_cleanup_guard(&temp);
        let workspace_root = temp.path().join("workspaces");
        let workspace_path = workspace_root.join("RQ-0002");
        let spawn_called = Cell::new(false);

        let result = spawn_worker_with_registered_workspace(
            &mut guard,
            "RQ-0002",
            || {
                std::fs::create_dir_all(&workspace_path)?;
                Ok(git::WorkspaceSpec {
                    path: workspace_path.clone(),
                    branch: "main".to_string(),
                })
            },
            |_| Err(anyhow::anyhow!("sync failed")),
            |_| {
                spawn_called.set(true);
                Err(anyhow::anyhow!("spawn should not run"))
            },
        );

        assert!(result.is_err());
        assert!(!spawn_called.get());
        guard.cleanup()?;
        assert!(!workspace_path.exists());
        Ok(())
    }

    // Capacity must reflect only the guard's in-flight count: a worker
    // record in persisted state contributes nothing.
    #[test]
    fn effective_active_worker_count_uses_max() {
        let mut state_file =
            state::ParallelStateFile::new("2026-02-20T00:00:00Z".to_string(), "main".to_string());
        state_file.upsert_worker(WorkerRecord::new(
            "RQ-0001",
            PathBuf::from("/tmp/ws/RQ-0001"),
            "2026-02-20T00:00:00Z".to_string(),
        ));

        // Guard in-flight workers are the only capacity authority.
        // State records are persisted history and must not consume capacity.
        assert_eq!(effective_active_worker_count(&state_file, 2), 2);
        assert_eq!(effective_active_worker_count(&state_file, 0), 0);
    }

    // The tasks-started counter is per-invocation: a failed record from a
    // previous run must not count toward max_tasks.
    #[test]
    fn initial_tasks_started_starts_at_zero_per_invocation() {
        let mut state_file =
            state::ParallelStateFile::new("2026-02-20T00:00:00Z".to_string(), "main".to_string());
        let mut failed = WorkerRecord::new(
            "RQ-0001",
            PathBuf::from("/tmp/ws/RQ-0001"),
            "2026-02-20T00:00:00Z".to_string(),
        );
        failed.mark_failed("2026-02-20T00:05:00Z".to_string(), "failed");
        state_file.upsert_worker(failed);

        assert_eq!(initial_tasks_started(&state_file), 0);
    }

    // Exhaustive boundary check of the capacity predicate.
    #[test]
    fn can_start_more_tasks_logic() {
        // max_tasks=0 means unlimited
        assert!(can_start_more_tasks(100, 0));

        // With max_tasks=5
        assert!(can_start_more_tasks(0, 5));
        assert!(can_start_more_tasks(4, 5));
        assert!(!can_start_more_tasks(5, 5));
        assert!(!can_start_more_tasks(6, 5));
    }
}
417}