// ralph/commands/run/parallel/state.rs
1//! Parallel run state persistence for crash recovery.
2//!
3//! Responsibilities:
4//! - Define the parallel state file format and helpers for direct-push mode.
5//! - Persist and reload state for in-flight workers.
6//!
7//! Not handled here:
8//! - Worker orchestration or process management (see `parallel/mod.rs`).
9//! - Integration loop logic (see `worker.rs`).
10//!
11//! Invariants/assumptions:
12//! - State file lives at `.ralph/cache/parallel/state.json`.
13//! - Callers update and persist state after each significant transition.
14//! - Deserialization is tolerant of missing/unknown fields; callers normalize and persist the canonical shape.
15//! - Schema version migrations are applied on load to ensure compatibility.
16
17use crate::fsutil;
18use anyhow::{Context, Result};
19use serde::{Deserialize, Serialize};
20use std::path::{Path, PathBuf};
21
22// =============================================================================
23// Schema Version and Migration
24// =============================================================================
25
/// Current parallel state schema version.
///
/// Version history:
/// - v1: Legacy schema with PR metadata and finished_without_pr
/// - v2: Minimal restart-safe schema with PR records and pending merges
/// - v3: Direct-push mode - worker lifecycle only, no PR/merge tracking
///
/// `migrate_state` upgrades any older on-disk state to this version at load
/// time (dropping legacy fields and clearing the worker list).
pub const PARALLEL_STATE_SCHEMA_VERSION: u32 = 3;
33
34// =============================================================================
35// Worker Lifecycle States
36// =============================================================================
37
/// Lifecycle states for a parallel worker.
///
/// Serialized in snake_case (e.g. `BlockedPush` is written as
/// `"blocked_push"`); `Running` is the serde default, so a record missing its
/// `lifecycle` field deserializes as a running worker.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
#[serde(rename_all = "snake_case")]
pub enum WorkerLifecycle {
    /// Worker is running task phases.
    #[default]
    Running,
    /// Worker is in the integration loop (rebase, conflict resolution, push).
    Integrating,
    /// Worker completed successfully (push succeeded).
    Completed,
    /// Worker failed with a terminal error.
    Failed,
    /// Push is blocked (conflicts, CI failure, or non-retryable error).
    BlockedPush,
}
54
/// A worker record tracking task execution and integration.
///
/// `lifecycle` and `push_attempts` carry serde defaults so records written by
/// older builds (or hand-edited files) still deserialize; optional fields are
/// omitted from the JSON when `None` to keep the state file minimal.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerRecord {
    /// Task ID associated with this worker.
    pub task_id: String,
    /// Absolute path to the workspace directory.
    pub workspace_path: PathBuf,
    /// Current lifecycle state.
    #[serde(default)]
    pub lifecycle: WorkerLifecycle,
    /// Timestamp when the worker was started (RFC3339).
    /// NOTE(review): caller-supplied string; not validated as RFC3339 here.
    pub started_at: String,
    /// Timestamp when the worker completed/failed (RFC3339).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<String>,
    /// Number of push attempts made.
    #[serde(default)]
    pub push_attempts: u32,
    /// Last error message if failed/blocked.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
}
77
78impl WorkerRecord {
79    pub fn new(task_id: impl Into<String>, workspace_path: PathBuf, started_at: String) -> Self {
80        Self {
81            task_id: task_id.into(),
82            workspace_path,
83            lifecycle: WorkerLifecycle::Running,
84            started_at,
85            completed_at: None,
86            push_attempts: 0,
87            last_error: None,
88        }
89    }
90
91    /// Mark the worker as transitioning to integration phase.
92    pub fn start_integration(&mut self) {
93        self.lifecycle = WorkerLifecycle::Integrating;
94    }
95
96    /// Mark the worker as completed successfully.
97    pub fn mark_completed(&mut self, timestamp: String) {
98        self.lifecycle = WorkerLifecycle::Completed;
99        self.completed_at = Some(timestamp);
100    }
101
102    /// Mark the worker as failed.
103    pub fn mark_failed(&mut self, timestamp: String, error: impl Into<String>) {
104        self.lifecycle = WorkerLifecycle::Failed;
105        self.completed_at = Some(timestamp);
106        self.last_error = Some(error.into());
107    }
108
109    /// Mark the worker as blocked on push.
110    pub fn mark_blocked(&mut self, timestamp: String, error: impl Into<String>) {
111        self.lifecycle = WorkerLifecycle::BlockedPush;
112        self.completed_at = Some(timestamp);
113        self.last_error = Some(error.into());
114    }
115
116    /// Increment push attempt counter.
117    pub fn increment_push_attempt(&mut self) {
118        self.push_attempts += 1;
119    }
120
121    /// Returns true if the worker is in a terminal state.
122    pub fn is_terminal(&self) -> bool {
123        matches!(
124            self.lifecycle,
125            WorkerLifecycle::Completed | WorkerLifecycle::Failed | WorkerLifecycle::BlockedPush
126        )
127    }
128}
129
130// =============================================================================
131// State File
132// =============================================================================
133
/// Parallel state file for direct-push mode.
///
/// Every field carries a serde default so partially-written or legacy files
/// still deserialize; `schema_version` defaults to 1 (see
/// `default_schema_version`) so unversioned files are treated as legacy and
/// migrated on load.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParallelStateFile {
    /// Schema version for migration compatibility.
    #[serde(default = "default_schema_version")]
    pub schema_version: u32,
    /// Timestamp when the parallel run started (RFC3339).
    #[serde(default)]
    pub started_at: String,
    /// Target branch for direct pushes.
    #[serde(default)]
    pub target_branch: String,
    /// Active workers (one per task ID).
    #[serde(default)]
    pub workers: Vec<WorkerRecord>,
}
150
/// Serde default for `schema_version`.
///
/// Deliberately 1 (not `PARALLEL_STATE_SCHEMA_VERSION`): a file with no
/// `schema_version` field predates versioning, so it is treated as legacy v1
/// and upgraded (workers cleared) by `migrate_state` on load.
fn default_schema_version() -> u32 {
    1
}
154
155impl ParallelStateFile {
156    pub fn new(started_at: impl Into<String>, target_branch: impl Into<String>) -> Self {
157        Self {
158            schema_version: PARALLEL_STATE_SCHEMA_VERSION,
159            started_at: started_at.into(),
160            target_branch: target_branch.into(),
161            workers: Vec::new(),
162        }
163    }
164
165    /// Upsert a worker record. Replaces existing if task_id matches.
166    pub fn upsert_worker(&mut self, record: WorkerRecord) {
167        if let Some(existing) = self
168            .workers
169            .iter_mut()
170            .find(|w| w.task_id == record.task_id)
171        {
172            *existing = record;
173        } else {
174            self.workers.push(record);
175        }
176    }
177
178    /// Remove a worker by task_id.
179    pub fn remove_worker(&mut self, task_id: &str) {
180        self.workers.retain(|w| w.task_id != task_id);
181    }
182
183    /// Get a worker by task_id.
184    pub fn get_worker(&self, task_id: &str) -> Option<&WorkerRecord> {
185        self.workers.iter().find(|w| w.task_id == task_id)
186    }
187
188    /// Get a mutable worker by task_id.
189    pub fn get_worker_mut(&mut self, task_id: &str) -> Option<&mut WorkerRecord> {
190        self.workers.iter_mut().find(|w| w.task_id == task_id)
191    }
192
193    /// Returns true if there's a worker for this task_id.
194    pub fn has_worker(&self, task_id: &str) -> bool {
195        self.workers.iter().any(|w| w.task_id == task_id)
196    }
197
198    /// Get all workers in a specific lifecycle state.
199    pub fn workers_by_lifecycle(
200        &self,
201        lifecycle: WorkerLifecycle,
202    ) -> impl Iterator<Item = &WorkerRecord> {
203        self.workers
204            .iter()
205            .filter(move |w| w.lifecycle == lifecycle)
206    }
207
208    /// Count workers that are not in a terminal state.
209    pub fn active_worker_count(&self) -> usize {
210        self.workers.iter().filter(|w| !w.is_terminal()).count()
211    }
212
213    /// Count workers in the blocked_push state.
214    pub fn blocked_worker_count(&self) -> usize {
215        self.workers_by_lifecycle(WorkerLifecycle::BlockedPush)
216            .count()
217    }
218}
219
/// Location of the parallel state file under `repo_root`:
/// `.ralph/cache/parallel/state.json`.
pub fn state_file_path(repo_root: &Path) -> PathBuf {
    let mut path = repo_root.to_path_buf();
    path.push(".ralph/cache/parallel/state.json");
    path
}
223
224/// Migrate legacy state to current schema version.
225///
226/// v1/v2 -> v3:
227/// - Drop PR records, pending merges, tasks_in_flight
228/// - Create fresh v3 state with empty workers list
229fn migrate_state(mut state: ParallelStateFile) -> ParallelStateFile {
230    if state.schema_version < PARALLEL_STATE_SCHEMA_VERSION {
231        log::info!(
232            "Migrating parallel state from schema v{} to v{}",
233            state.schema_version,
234            PARALLEL_STATE_SCHEMA_VERSION
235        );
236        // v3 is a clean break - we drop legacy fields and start fresh
237        // Any in-flight work from v1/v2 is lost (should be handled by caller)
238        state.schema_version = PARALLEL_STATE_SCHEMA_VERSION;
239        state.workers.clear();
240    }
241    state
242}
243
244pub fn load_state(path: &Path) -> Result<Option<ParallelStateFile>> {
245    if !path.exists() {
246        return Ok(None);
247    }
248    let raw = std::fs::read_to_string(path)
249        .with_context(|| format!("read parallel state {}", path.display()))?;
250    let state: ParallelStateFile =
251        crate::jsonc::parse_jsonc::<ParallelStateFile>(&raw, "parallel state")?;
252
253    // Apply migrations
254    let state = migrate_state(state);
255
256    Ok(Some(state))
257}
258
259pub fn save_state(path: &Path, state: &ParallelStateFile) -> Result<()> {
260    if let Some(parent) = path.parent() {
261        std::fs::create_dir_all(parent)
262            .with_context(|| format!("create parallel state dir {}", parent.display()))?;
263    }
264    let rendered = serde_json::to_string_pretty(state).context("serialize parallel state")?;
265    fsutil::write_atomic(path, rendered.as_bytes())
266        .with_context(|| format!("write parallel state {}", path.display()))?;
267    Ok(())
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // =========================================================================
    // Schema Version and Migration Tests
    // =========================================================================

    #[test]
    fn new_state_has_current_schema_version() {
        let state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");
        assert_eq!(state.schema_version, PARALLEL_STATE_SCHEMA_VERSION);
    }

    #[test]
    fn state_migration_v2_to_v3() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("state.json");

        // v2 state with legacy fields (will be dropped)
        // Legacy keys (base_branch, prs, pending_merges, tasks_in_flight) are
        // unknown to the v3 struct; serde tolerates and discards them.
        let v2_state = r#"{
            "schema_version": 2,
            "started_at": "2026-02-01T00:00:00Z",
            "base_branch": "main",
            "merge_method": "squash",
            "merge_when": "as_created",
            "tasks_in_flight": [{"task_id": "RQ-0001", "workspace_path": "/tmp/ws", "branch": "b", "pid": 123}],
            "prs": [{"task_id": "RQ-0001", "pr_number": 5}],
            "pending_merges": [{"task_id": "RQ-0001", "pr_number": 5, "queued_at": "2026-02-01T00:00:00Z"}]
        }"#;

        std::fs::write(&path, v2_state)?;

        let state = load_state(&path)?.expect("state");
        assert_eq!(state.schema_version, PARALLEL_STATE_SCHEMA_VERSION);
        // Workers should be empty after migration
        assert!(state.workers.is_empty());
        Ok(())
    }

    // =========================================================================
    // Worker Record Tests
    // =========================================================================

    #[test]
    fn worker_record_lifecycle_transitions() {
        // Walk the happy path: Running -> Integrating -> Completed.
        let mut worker = WorkerRecord::new(
            "RQ-0001",
            PathBuf::from("/tmp/ws"),
            "2026-02-20T00:00:00Z".into(),
        );

        assert!(matches!(worker.lifecycle, WorkerLifecycle::Running));
        assert!(!worker.is_terminal());

        worker.start_integration();
        assert!(matches!(worker.lifecycle, WorkerLifecycle::Integrating));
        assert!(!worker.is_terminal());

        worker.mark_completed("2026-02-20T01:00:00Z".into());
        assert!(matches!(worker.lifecycle, WorkerLifecycle::Completed));
        assert!(worker.is_terminal());
        assert!(worker.completed_at.is_some());
    }

    #[test]
    fn worker_record_mark_failed() {
        let mut worker = WorkerRecord::new(
            "RQ-0001",
            PathBuf::from("/tmp/ws"),
            "2026-02-20T00:00:00Z".into(),
        );

        worker.mark_failed("2026-02-20T01:00:00Z".into(), "CI failed");

        assert!(matches!(worker.lifecycle, WorkerLifecycle::Failed));
        assert!(worker.is_terminal());
        assert_eq!(worker.last_error, Some("CI failed".into()));
    }

    #[test]
    fn worker_record_mark_blocked() {
        let mut worker = WorkerRecord::new(
            "RQ-0001",
            PathBuf::from("/tmp/ws"),
            "2026-02-20T00:00:00Z".into(),
        );

        worker.mark_blocked("2026-02-20T01:00:00Z".into(), "merge conflict");

        assert!(matches!(worker.lifecycle, WorkerLifecycle::BlockedPush));
        assert!(worker.is_terminal());
        assert_eq!(worker.last_error, Some("merge conflict".into()));
    }

    #[test]
    fn worker_record_push_attempts() {
        let mut worker = WorkerRecord::new(
            "RQ-0001",
            PathBuf::from("/tmp/ws"),
            "2026-02-20T00:00:00Z".into(),
        );

        assert_eq!(worker.push_attempts, 0);
        worker.increment_push_attempt();
        assert_eq!(worker.push_attempts, 1);
        worker.increment_push_attempt();
        assert_eq!(worker.push_attempts, 2);
    }

    // =========================================================================
    // State File Tests
    // =========================================================================

    #[test]
    fn state_upsert_worker_replaces_existing() {
        let mut state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");

        state.upsert_worker(WorkerRecord::new(
            "RQ-0001",
            PathBuf::from("/tmp/ws1"),
            "t1".into(),
        ));
        state.upsert_worker(WorkerRecord::new(
            "RQ-0002",
            PathBuf::from("/tmp/ws2"),
            "t2".into(),
        ));

        // Update RQ-0001 with new path
        let mut updated =
            WorkerRecord::new("RQ-0001", PathBuf::from("/tmp/ws1-new"), "t1-new".into());
        updated.start_integration();
        state.upsert_worker(updated);

        // Replacement, not append: still two workers, RQ-0001 fully swapped.
        assert_eq!(state.workers.len(), 2);
        let w1 = state.get_worker("RQ-0001").unwrap();
        assert_eq!(w1.workspace_path, PathBuf::from("/tmp/ws1-new"));
        assert!(matches!(w1.lifecycle, WorkerLifecycle::Integrating));
    }

    #[test]
    fn state_remove_worker() {
        let mut state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");

        state.upsert_worker(WorkerRecord::new(
            "RQ-0001",
            PathBuf::from("/tmp/ws1"),
            "t1".into(),
        ));
        state.upsert_worker(WorkerRecord::new(
            "RQ-0002",
            PathBuf::from("/tmp/ws2"),
            "t2".into(),
        ));

        state.remove_worker("RQ-0001");

        assert_eq!(state.workers.len(), 1);
        assert!(state.get_worker("RQ-0001").is_none());
        assert!(state.get_worker("RQ-0002").is_some());
    }

    #[test]
    fn state_active_worker_count() {
        let mut state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");

        let w1 = WorkerRecord::new("RQ-0001", PathBuf::from("/tmp/ws1"), "t1".into());
        let mut w2 = WorkerRecord::new("RQ-0002", PathBuf::from("/tmp/ws2"), "t2".into());
        let mut w3 = WorkerRecord::new("RQ-0003", PathBuf::from("/tmp/ws3"), "t3".into());

        w2.mark_completed("t".into());
        w3.mark_blocked("t".into(), "error");

        state.upsert_worker(w1);
        state.upsert_worker(w2);
        state.upsert_worker(w3);

        // Only RQ-0001 is active (not terminal)
        assert_eq!(state.active_worker_count(), 1);
    }

    #[test]
    fn state_round_trips() -> Result<()> {
        // save_state -> load_state must preserve lifecycle and push_attempts.
        let temp = TempDir::new()?;
        let path = temp.path().join("state.json");

        let mut state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");
        let mut worker = WorkerRecord::new(
            "RQ-0001",
            PathBuf::from("/tmp/ws"),
            "2026-02-20T00:00:00Z".into(),
        );
        worker.start_integration();
        worker.increment_push_attempt();
        state.upsert_worker(worker);

        save_state(&path, &state)?;
        let loaded = load_state(&path)?.expect("state");

        assert_eq!(loaded.schema_version, PARALLEL_STATE_SCHEMA_VERSION);
        assert_eq!(loaded.target_branch, "main");
        assert_eq!(loaded.workers.len(), 1);

        let w = &loaded.workers[0];
        assert_eq!(w.task_id, "RQ-0001");
        assert_eq!(w.workspace_path, PathBuf::from("/tmp/ws"));
        assert!(matches!(w.lifecycle, WorkerLifecycle::Integrating));
        assert_eq!(w.push_attempts, 1);

        Ok(())
    }

    #[test]
    fn state_deserialization_ignores_unknown_fields() -> Result<()> {
        // Tolerant-deserialization invariant from the module docs: unknown
        // keys at both top level and worker level are silently dropped.
        let raw = r#"{
            "schema_version": 3,
            "started_at": "2026-02-20T00:00:00Z",
            "target_branch": "main",
            "unknown_top": "ignored",
            "workers": [{
                "task_id": "RQ-0001",
                "workspace_path": "/tmp/ws",
                "started_at": "2026-02-20T00:00:00Z",
                "unknown_worker": "ignored"
            }]
        }"#;

        let state: ParallelStateFile = serde_json::from_str(raw)?;
        assert_eq!(state.workers.len(), 1);
        assert_eq!(state.workers[0].task_id, "RQ-0001");

        Ok(())
    }
}
505}