// ralph/commands/run/parallel/state.rs
1//! Parallel run state persistence for crash recovery.
2//!
3//! Responsibilities:
4//! - Define the parallel state file format and helpers for direct-push mode.
5//! - Persist and reload state for in-flight workers.
6//!
7//! Not handled here:
8//! - Worker orchestration or process management (see `parallel/mod.rs`).
9//! - Integration loop logic (see `worker.rs`).
10//!
11//! Invariants/assumptions:
12//! - State file lives at `.ralph/cache/parallel/state.json`.
13//! - Callers update and persist state after each significant transition.
14//! - Deserialization is tolerant of missing/unknown fields; callers normalize and persist the canonical shape.
15//! - Schema version migrations are applied on load to ensure compatibility.
16
17use crate::fsutil;
18use anyhow::{Context, Result};
19use serde::{Deserialize, Serialize};
20use std::path::{Path, PathBuf};
21
22// =============================================================================
23// Schema Version and Migration
24// =============================================================================
25
/// Current parallel state schema version.
///
/// Bumped whenever the on-disk format changes; `migrate_state` upgrades
/// any older file to this version when it is loaded.
///
/// Version history:
/// - v1: Legacy schema with PR metadata and finished_without_pr
/// - v2: Minimal restart-safe schema with PR records and pending merges
/// - v3: Direct-push mode - worker lifecycle only, no PR/merge tracking
pub const PARALLEL_STATE_SCHEMA_VERSION: u32 = 3;
33
34// =============================================================================
35// Worker Lifecycle States
36// =============================================================================
37
/// Lifecycle states for a parallel worker.
///
/// Serialized in snake_case (e.g. `blocked_push`) in the state file.
/// `Running` is the `Default` variant, so a persisted record with no
/// `lifecycle` field deserializes as a still-running worker.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
#[serde(rename_all = "snake_case")]
pub enum WorkerLifecycle {
    /// Worker is running task phases.
    #[default]
    Running,
    /// Worker is in the integration loop (rebase, conflict resolution, push).
    Integrating,
    /// Worker completed successfully (push succeeded).
    Completed,
    /// Worker failed with a terminal error.
    Failed,
    /// Push is blocked (conflicts, CI failure, or non-retryable error).
    BlockedPush,
}
54
/// A worker record tracking task execution and integration.
///
/// One record exists per task (`ParallelStateFile::upsert_worker` replaces
/// by `task_id`). Optional fields are omitted when serializing and missing
/// fields are defaulted when deserializing, so the on-disk JSON stays
/// minimal and partial/older records still load.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerRecord {
    /// Task ID associated with this worker.
    pub task_id: String,
    /// Absolute path to the workspace directory.
    pub workspace_path: PathBuf,
    /// Current lifecycle state.
    /// Defaults to `Running` when absent from the JSON.
    #[serde(default)]
    pub lifecycle: WorkerLifecycle,
    /// Timestamp when the worker was started (RFC3339).
    pub started_at: String,
    /// Timestamp when the worker completed/failed (RFC3339).
    /// `None` while the worker is still active; omitted from the JSON when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<String>,
    /// Number of push attempts made.
    /// Defaults to 0 when absent from the JSON.
    #[serde(default)]
    pub push_attempts: u32,
    /// Last error message if failed/blocked.
    /// Omitted from the JSON when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
}
77
78impl WorkerRecord {
79    pub fn new(task_id: impl Into<String>, workspace_path: PathBuf, started_at: String) -> Self {
80        Self {
81            task_id: task_id.into(),
82            workspace_path,
83            lifecycle: WorkerLifecycle::Running,
84            started_at,
85            completed_at: None,
86            push_attempts: 0,
87            last_error: None,
88        }
89    }
90
91    /// Mark the worker as transitioning to integration phase.
92    pub fn start_integration(&mut self) {
93        self.lifecycle = WorkerLifecycle::Integrating;
94    }
95
96    /// Mark the worker as completed successfully.
97    pub fn mark_completed(&mut self, timestamp: String) {
98        self.lifecycle = WorkerLifecycle::Completed;
99        self.completed_at = Some(timestamp);
100    }
101
102    /// Mark the worker as failed.
103    pub fn mark_failed(&mut self, timestamp: String, error: impl Into<String>) {
104        self.lifecycle = WorkerLifecycle::Failed;
105        self.completed_at = Some(timestamp);
106        self.last_error = Some(error.into());
107    }
108
109    /// Mark the worker as blocked on push.
110    pub fn mark_blocked(&mut self, timestamp: String, error: impl Into<String>) {
111        self.lifecycle = WorkerLifecycle::BlockedPush;
112        self.completed_at = Some(timestamp);
113        self.last_error = Some(error.into());
114    }
115
116    /// Increment push attempt counter.
117    pub fn increment_push_attempt(&mut self) {
118        self.push_attempts += 1;
119    }
120
121    /// Returns true if the worker is in a terminal state.
122    pub fn is_terminal(&self) -> bool {
123        matches!(
124            self.lifecycle,
125            WorkerLifecycle::Completed | WorkerLifecycle::Failed | WorkerLifecycle::BlockedPush
126        )
127    }
128}
129
130// =============================================================================
131// State File
132// =============================================================================
133
/// Parallel state file for direct-push mode.
///
/// Persisted at `.ralph/cache/parallel/state.json` (see `state_file_path`).
/// Every field has a serde default, so partially-written or legacy files
/// still deserialize; a missing `schema_version` is treated as v1 so the
/// load path can migrate the file forward.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParallelStateFile {
    /// Schema version for migration compatibility.
    /// Defaults to 1 (pre-versioning legacy), NOT the current version.
    #[serde(default = "default_schema_version")]
    pub schema_version: u32,
    /// Timestamp when the parallel run started (RFC3339).
    #[serde(default)]
    pub started_at: String,
    /// Target branch for direct pushes.
    #[serde(default)]
    pub target_branch: String,
    /// Active workers (one per task ID).
    #[serde(default)]
    pub workers: Vec<WorkerRecord>,
}
150
/// Serde default for `ParallelStateFile::schema_version`.
///
/// Deliberately returns 1 rather than `PARALLEL_STATE_SCHEMA_VERSION`:
/// a state file with no `schema_version` field predates versioning, so it
/// must be treated as the oldest schema and migrated on load.
fn default_schema_version() -> u32 {
    1
}
154
155impl ParallelStateFile {
156    pub fn new(started_at: impl Into<String>, target_branch: impl Into<String>) -> Self {
157        Self {
158            schema_version: PARALLEL_STATE_SCHEMA_VERSION,
159            started_at: started_at.into(),
160            target_branch: target_branch.into(),
161            workers: Vec::new(),
162        }
163    }
164
165    /// Upsert a worker record. Replaces existing if task_id matches.
166    pub fn upsert_worker(&mut self, record: WorkerRecord) {
167        if let Some(existing) = self
168            .workers
169            .iter_mut()
170            .find(|w| w.task_id == record.task_id)
171        {
172            *existing = record;
173        } else {
174            self.workers.push(record);
175        }
176    }
177
178    /// Remove a worker by task_id.
179    pub fn remove_worker(&mut self, task_id: &str) {
180        self.workers.retain(|w| w.task_id != task_id);
181    }
182
183    /// Get a worker by task_id.
184    pub fn get_worker(&self, task_id: &str) -> Option<&WorkerRecord> {
185        self.workers.iter().find(|w| w.task_id == task_id)
186    }
187
188    /// Get a mutable worker by task_id.
189    pub fn get_worker_mut(&mut self, task_id: &str) -> Option<&mut WorkerRecord> {
190        self.workers.iter_mut().find(|w| w.task_id == task_id)
191    }
192
193    /// Returns true if there's a worker for this task_id.
194    pub fn has_worker(&self, task_id: &str) -> bool {
195        self.workers.iter().any(|w| w.task_id == task_id)
196    }
197
198    /// Get all workers in a specific lifecycle state.
199    pub fn workers_by_lifecycle(
200        &self,
201        lifecycle: WorkerLifecycle,
202    ) -> impl Iterator<Item = &WorkerRecord> {
203        self.workers
204            .iter()
205            .filter(move |w| w.lifecycle == lifecycle)
206    }
207
208    /// Count workers that are not in a terminal state.
209    pub fn active_worker_count(&self) -> usize {
210        self.workers.iter().filter(|w| !w.is_terminal()).count()
211    }
212
213    /// Count workers in the blocked_push state.
214    pub fn blocked_worker_count(&self) -> usize {
215        self.workers_by_lifecycle(WorkerLifecycle::BlockedPush)
216            .count()
217    }
218}
219
/// Path of the parallel state file beneath the repository root.
pub fn state_file_path(repo_root: &Path) -> PathBuf {
    repo_root
        .join(".ralph")
        .join("cache")
        .join("parallel")
        .join("state.json")
}
223
224/// Migrate legacy state to current schema version.
225///
226/// v1/v2 -> v3:
227/// - Drop PR records, pending merges, tasks_in_flight
228/// - Create fresh v3 state with empty workers list
229fn migrate_state(mut state: ParallelStateFile) -> ParallelStateFile {
230    if state.schema_version < PARALLEL_STATE_SCHEMA_VERSION {
231        log::info!(
232            "Migrating parallel state from schema v{} to v{}",
233            state.schema_version,
234            PARALLEL_STATE_SCHEMA_VERSION
235        );
236        // v3 is a clean break - we drop legacy fields and start fresh
237        // Any in-flight work from v1/v2 is lost (should be handled by caller)
238        state.schema_version = PARALLEL_STATE_SCHEMA_VERSION;
239        state.workers.clear();
240    }
241    state
242}
243
244pub fn load_state(path: &Path) -> Result<Option<ParallelStateFile>> {
245    if !path.exists() {
246        return Ok(None);
247    }
248    let raw = std::fs::read_to_string(path)
249        .with_context(|| format!("read parallel state {}", path.display()))?;
250    let state: ParallelStateFile =
251        crate::jsonc::parse_jsonc::<ParallelStateFile>(&raw, "parallel state")?;
252
253    // Apply migrations
254    let state = migrate_state(state);
255
256    Ok(Some(state))
257}
258
259pub fn save_state(path: &Path, state: &ParallelStateFile) -> Result<()> {
260    if let Some(parent) = path.parent() {
261        std::fs::create_dir_all(parent)
262            .with_context(|| format!("create parallel state dir {}", parent.display()))?;
263    }
264    let rendered = serde_json::to_string_pretty(state).context("serialize parallel state")?;
265    fsutil::write_atomic(path, rendered.as_bytes())
266        .with_context(|| format!("write parallel state {}", path.display()))?;
267    Ok(())
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // =========================================================================
    // Schema Version and Migration Tests
    // =========================================================================

    // A freshly constructed state starts at the latest schema version.
    #[test]
    fn new_state_has_current_schema_version() {
        let state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");
        assert_eq!(state.schema_version, PARALLEL_STATE_SCHEMA_VERSION);
    }

    // Loading a v2 file upgrades it in-memory to v3 and drops legacy
    // PR/merge bookkeeping (unknown fields are ignored, workers cleared).
    #[test]
    fn state_migration_v2_to_v3() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("state.json");
        let workspace_path = crate::testsupport::path::portable_abs_path("ws");

        // v2 state with legacy fields (will be dropped)
        let v2_state = serde_json::json!({
            "schema_version": 2,
            "started_at": "2026-02-01T00:00:00Z",
            "base_branch": "main",
            "merge_method": "squash",
            "merge_when": "as_created",
            "tasks_in_flight": [{
                "task_id": "RQ-0001",
                "workspace_path": workspace_path,
                "branch": "b",
                "pid": 123
            }],
            "prs": [{"task_id": "RQ-0001", "pr_number": 5}],
            "pending_merges": [{
                "task_id": "RQ-0001",
                "pr_number": 5,
                "queued_at": "2026-02-01T00:00:00Z"
            }]
        });

        std::fs::write(&path, serde_json::to_string_pretty(&v2_state)?)?;

        let state = load_state(&path)?.expect("state");
        assert_eq!(state.schema_version, PARALLEL_STATE_SCHEMA_VERSION);
        // Workers should be empty after migration
        assert!(state.workers.is_empty());
        Ok(())
    }

    // =========================================================================
    // Worker Record Tests
    // =========================================================================

    // Running -> Integrating -> Completed, checking terminality at each step.
    #[test]
    fn worker_record_lifecycle_transitions() {
        let workspace_path = crate::testsupport::path::portable_abs_path("ws");
        let mut worker =
            WorkerRecord::new("RQ-0001", workspace_path, "2026-02-20T00:00:00Z".into());

        assert!(matches!(worker.lifecycle, WorkerLifecycle::Running));
        assert!(!worker.is_terminal());

        worker.start_integration();
        assert!(matches!(worker.lifecycle, WorkerLifecycle::Integrating));
        assert!(!worker.is_terminal());

        worker.mark_completed("2026-02-20T01:00:00Z".into());
        assert!(matches!(worker.lifecycle, WorkerLifecycle::Completed));
        assert!(worker.is_terminal());
        assert!(worker.completed_at.is_some());
    }

    // mark_failed is terminal and records the error message.
    #[test]
    fn worker_record_mark_failed() {
        let workspace_path = crate::testsupport::path::portable_abs_path("ws");
        let mut worker =
            WorkerRecord::new("RQ-0001", workspace_path, "2026-02-20T00:00:00Z".into());

        worker.mark_failed("2026-02-20T01:00:00Z".into(), "CI failed");

        assert!(matches!(worker.lifecycle, WorkerLifecycle::Failed));
        assert!(worker.is_terminal());
        assert_eq!(worker.last_error, Some("CI failed".into()));
    }

    // mark_blocked is terminal and records the error message.
    #[test]
    fn worker_record_mark_blocked() {
        let workspace_path = crate::testsupport::path::portable_abs_path("ws");
        let mut worker =
            WorkerRecord::new("RQ-0001", workspace_path, "2026-02-20T00:00:00Z".into());

        worker.mark_blocked("2026-02-20T01:00:00Z".into(), "merge conflict");

        assert!(matches!(worker.lifecycle, WorkerLifecycle::BlockedPush));
        assert!(worker.is_terminal());
        assert_eq!(worker.last_error, Some("merge conflict".into()));
    }

    // Push attempts start at zero and increment by one per call.
    #[test]
    fn worker_record_push_attempts() {
        let workspace_path = crate::testsupport::path::portable_abs_path("ws");
        let mut worker =
            WorkerRecord::new("RQ-0001", workspace_path, "2026-02-20T00:00:00Z".into());

        assert_eq!(worker.push_attempts, 0);
        worker.increment_push_attempt();
        assert_eq!(worker.push_attempts, 1);
        worker.increment_push_attempt();
        assert_eq!(worker.push_attempts, 2);
    }

    // =========================================================================
    // State File Tests
    // =========================================================================

    // Upserting an existing task_id replaces the record rather than
    // appending a duplicate.
    #[test]
    fn state_upsert_worker_replaces_existing() {
        let mut state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");
        let ws1 = crate::testsupport::path::portable_abs_path("ws1");
        let ws2 = crate::testsupport::path::portable_abs_path("ws2");
        let ws1_new = crate::testsupport::path::portable_abs_path("ws1-new");

        state.upsert_worker(WorkerRecord::new("RQ-0001", ws1, "t1".into()));
        state.upsert_worker(WorkerRecord::new("RQ-0002", ws2, "t2".into()));

        // Update RQ-0001 with new path
        let mut updated = WorkerRecord::new("RQ-0001", ws1_new.clone(), "t1-new".into());
        updated.start_integration();
        state.upsert_worker(updated);

        assert_eq!(state.workers.len(), 2);
        let w1 = state.get_worker("RQ-0001").unwrap();
        assert_eq!(w1.workspace_path, ws1_new);
        assert!(matches!(w1.lifecycle, WorkerLifecycle::Integrating));
    }

    // remove_worker deletes only the matching record.
    #[test]
    fn state_remove_worker() {
        let mut state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");
        let ws1 = crate::testsupport::path::portable_abs_path("ws1");
        let ws2 = crate::testsupport::path::portable_abs_path("ws2");

        state.upsert_worker(WorkerRecord::new("RQ-0001", ws1, "t1".into()));
        state.upsert_worker(WorkerRecord::new("RQ-0002", ws2, "t2".into()));

        state.remove_worker("RQ-0001");

        assert_eq!(state.workers.len(), 1);
        assert!(state.get_worker("RQ-0001").is_none());
        assert!(state.get_worker("RQ-0002").is_some());
    }

    // active_worker_count counts only non-terminal workers.
    #[test]
    fn state_active_worker_count() {
        let mut state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");
        let ws1 = crate::testsupport::path::portable_abs_path("ws1");
        let ws2 = crate::testsupport::path::portable_abs_path("ws2");
        let ws3 = crate::testsupport::path::portable_abs_path("ws3");

        let w1 = WorkerRecord::new("RQ-0001", ws1, "t1".into());
        let mut w2 = WorkerRecord::new("RQ-0002", ws2, "t2".into());
        let mut w3 = WorkerRecord::new("RQ-0003", ws3, "t3".into());

        w2.mark_completed("t".into());
        w3.mark_blocked("t".into(), "error");

        state.upsert_worker(w1);
        state.upsert_worker(w2);
        state.upsert_worker(w3);

        // Only RQ-0001 is active (not terminal)
        assert_eq!(state.active_worker_count(), 1);
    }

    // save_state then load_state preserves every field of a worker record.
    #[test]
    fn state_round_trips() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("state.json");
        let workspace_path = crate::testsupport::path::portable_abs_path("ws");

        let mut state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");
        let mut worker = WorkerRecord::new(
            "RQ-0001",
            workspace_path.clone(),
            "2026-02-20T00:00:00Z".into(),
        );
        worker.start_integration();
        worker.increment_push_attempt();
        state.upsert_worker(worker);

        save_state(&path, &state)?;
        let loaded = load_state(&path)?.expect("state");

        assert_eq!(loaded.schema_version, PARALLEL_STATE_SCHEMA_VERSION);
        assert_eq!(loaded.target_branch, "main");
        assert_eq!(loaded.workers.len(), 1);

        let w = &loaded.workers[0];
        assert_eq!(w.task_id, "RQ-0001");
        assert_eq!(w.workspace_path, workspace_path);
        assert!(matches!(w.lifecycle, WorkerLifecycle::Integrating));
        assert_eq!(w.push_attempts, 1);

        Ok(())
    }

    // Unknown fields at both the top level and inside worker records are
    // tolerated, per the module's forward-compatibility invariant.
    #[test]
    fn state_deserialization_ignores_unknown_fields() -> Result<()> {
        let raw = serde_json::json!({
            "schema_version": 3,
            "started_at": "2026-02-20T00:00:00Z",
            "target_branch": "main",
            "unknown_top": "ignored",
            "workers": [{
                "task_id": "RQ-0001",
                "workspace_path": crate::testsupport::path::portable_abs_path("ws"),
                "started_at": "2026-02-20T00:00:00Z",
                "unknown_worker": "ignored"
            }]
        });

        let state: ParallelStateFile = serde_json::from_value(raw)?;
        assert_eq!(state.workers.len(), 1);
        assert_eq!(state.workers[0].task_id, "RQ-0001");

        Ok(())
    }
}
499}