// ralph/commands/run/parallel/state.rs
//! Parallel run state persistence for crash recovery.
//!
//! Responsibilities:
//! - Define the parallel state file format and helpers for direct-push mode.
//! - Persist and reload state for in-flight workers.
//!
//! Not handled here:
//! - Worker orchestration or process management (see `parallel/mod.rs`).
//! - Integration loop logic (see `worker.rs`).
//!
//! Invariants/assumptions:
//! - State file lives at `.ralph/cache/parallel/state.json`.
//! - Callers update and persist state after each significant transition.
//! - Deserialization is tolerant of missing/unknown fields; callers normalize and persist the canonical shape.
//! - Schema version migrations are applied on load to ensure compatibility.

use anyhow::{Context, Result};
use crate::fsutil;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};

// =============================================================================
// Schema Version and Migration
// =============================================================================

/// Current parallel state schema version.
///
/// Bump this whenever the on-disk shape changes, and teach `migrate_state`
/// how to upgrade (or reset) older files at the same time.
///
/// Version history:
/// - v1: Legacy schema with PR metadata and finished_without_pr
/// - v2: Minimal restart-safe schema with PR records and pending merges
/// - v3: Direct-push mode - worker lifecycle only, no PR/merge tracking
pub const PARALLEL_STATE_SCHEMA_VERSION: u32 = 3;

// =============================================================================
// Worker Lifecycle States
// =============================================================================

/// Lifecycle states for a parallel worker.
///
/// Serialized in snake_case (e.g. `blocked_push`). `Running` is the
/// `Default`, so a `WorkerRecord` whose serialized form lacks a lifecycle
/// field loads as a running worker.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
#[serde(rename_all = "snake_case")]
pub enum WorkerLifecycle {
    /// Worker is running task phases.
    #[default]
    Running,
    /// Worker is in the integration loop (rebase, conflict resolution, push).
    Integrating,
    /// Worker completed successfully (push succeeded).
    Completed,
    /// Worker failed with a terminal error.
    Failed,
    /// Push is blocked (conflicts, CI failure, or non-retryable error).
    BlockedPush,
}

/// A worker record tracking task execution and integration.
///
/// Fields marked `#[serde(default)]` tolerate state files that predate them,
/// in line with the module invariant that deserialization is lenient.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerRecord {
    /// Task ID associated with this worker.
    pub task_id: String,
    /// Absolute path to the workspace directory.
    pub workspace_path: PathBuf,
    /// Current lifecycle state.
    ///
    /// Missing in older files; defaults to `WorkerLifecycle::Running`.
    #[serde(default)]
    pub lifecycle: WorkerLifecycle,
    /// Timestamp when the worker was started (RFC3339).
    pub started_at: String,
    /// Timestamp when the worker completed/failed (RFC3339).
    ///
    /// `None` while the worker is still active; omitted from JSON when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<String>,
    /// Number of push attempts made.
    #[serde(default)]
    pub push_attempts: u32,
    /// Last error message if failed/blocked.
    ///
    /// Omitted from JSON when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
}

78impl WorkerRecord {
79    pub fn new(task_id: impl Into<String>, workspace_path: PathBuf, started_at: String) -> Self {
80        Self {
81            task_id: task_id.into(),
82            workspace_path,
83            lifecycle: WorkerLifecycle::Running,
84            started_at,
85            completed_at: None,
86            push_attempts: 0,
87            last_error: None,
88        }
89    }
90
91    /// Mark the worker as transitioning to integration phase.
92    pub fn start_integration(&mut self) {
93        self.lifecycle = WorkerLifecycle::Integrating;
94    }
95
96    /// Mark the worker as completed successfully.
97    pub fn mark_completed(&mut self, timestamp: String) {
98        self.lifecycle = WorkerLifecycle::Completed;
99        self.completed_at = Some(timestamp);
100    }
101
102    /// Mark the worker as failed.
103    pub fn mark_failed(&mut self, timestamp: String, error: impl Into<String>) {
104        self.lifecycle = WorkerLifecycle::Failed;
105        self.completed_at = Some(timestamp);
106        self.last_error = Some(error.into());
107    }
108
109    /// Mark the worker as blocked on push.
110    pub fn mark_blocked(&mut self, timestamp: String, error: impl Into<String>) {
111        self.lifecycle = WorkerLifecycle::BlockedPush;
112        self.completed_at = Some(timestamp);
113        self.last_error = Some(error.into());
114    }
115
116    /// Increment push attempt counter.
117    pub fn increment_push_attempt(&mut self) {
118        self.push_attempts += 1;
119    }
120
121    /// Returns true if the worker is in a terminal state.
122    pub fn is_terminal(&self) -> bool {
123        matches!(
124            self.lifecycle,
125            WorkerLifecycle::Completed | WorkerLifecycle::Failed | WorkerLifecycle::BlockedPush
126        )
127    }
128}
129
// =============================================================================
// State File
// =============================================================================

/// Parallel state file for direct-push mode.
///
/// Every field carries a serde default so partially-written or legacy files
/// still deserialize; `migrate_state` then normalizes the result on load.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParallelStateFile {
    /// Schema version for migration compatibility.
    ///
    /// Defaults to 1 (see `default_schema_version`) so files written before
    /// versioning are recognized as legacy and migrated on load.
    #[serde(default = "default_schema_version")]
    pub schema_version: u32,
    /// Timestamp when the parallel run started (RFC3339).
    #[serde(default)]
    pub started_at: String,
    /// Target branch for direct pushes.
    #[serde(default)]
    pub target_branch: String,
    /// Active workers (one per task ID).
    #[serde(default)]
    pub workers: Vec<WorkerRecord>,
}

/// Serde default for `ParallelStateFile::schema_version`.
///
/// Deliberately 1, NOT `PARALLEL_STATE_SCHEMA_VERSION`: a state file with no
/// `schema_version` field predates versioning entirely, so it must be treated
/// as the oldest schema and run through `migrate_state` on load. Do not
/// "fix" this to the current version — that would skip migration.
fn default_schema_version() -> u32 {
    1
}

155impl ParallelStateFile {
156    pub fn new(started_at: impl Into<String>, target_branch: impl Into<String>) -> Self {
157        Self {
158            schema_version: PARALLEL_STATE_SCHEMA_VERSION,
159            started_at: started_at.into(),
160            target_branch: target_branch.into(),
161            workers: Vec::new(),
162        }
163    }
164
165    /// Upsert a worker record. Replaces existing if task_id matches.
166    pub fn upsert_worker(&mut self, record: WorkerRecord) {
167        if let Some(existing) = self
168            .workers
169            .iter_mut()
170            .find(|w| w.task_id == record.task_id)
171        {
172            *existing = record;
173        } else {
174            self.workers.push(record);
175        }
176    }
177
178    /// Remove a worker by task_id.
179    pub fn remove_worker(&mut self, task_id: &str) {
180        self.workers.retain(|w| w.task_id != task_id);
181    }
182
183    /// Get a worker by task_id.
184    pub fn get_worker(&self, task_id: &str) -> Option<&WorkerRecord> {
185        self.workers.iter().find(|w| w.task_id == task_id)
186    }
187
188    /// Get a mutable worker by task_id.
189    pub fn get_worker_mut(&mut self, task_id: &str) -> Option<&mut WorkerRecord> {
190        self.workers.iter_mut().find(|w| w.task_id == task_id)
191    }
192
193    /// Returns true if there's a worker for this task_id.
194    pub fn has_worker(&self, task_id: &str) -> bool {
195        self.workers.iter().any(|w| w.task_id == task_id)
196    }
197
198    /// Get all workers in a specific lifecycle state.
199    pub fn workers_by_lifecycle(
200        &self,
201        lifecycle: WorkerLifecycle,
202    ) -> impl Iterator<Item = &WorkerRecord> {
203        self.workers
204            .iter()
205            .filter(move |w| w.lifecycle == lifecycle)
206    }
207
208    /// Count workers that are not in a terminal state.
209    pub fn active_worker_count(&self) -> usize {
210        self.workers.iter().filter(|w| !w.is_terminal()).count()
211    }
212
213    /// Count workers in the blocked_push state.
214    pub fn blocked_worker_count(&self) -> usize {
215        self.workers_by_lifecycle(WorkerLifecycle::BlockedPush)
216            .count()
217    }
218}
219
/// Location of the parallel state file: `<repo_root>/.ralph/cache/parallel/state.json`.
pub fn state_file_path(repo_root: &Path) -> PathBuf {
    let mut path = repo_root.to_path_buf();
    path.push(".ralph/cache/parallel/state.json");
    path
}

224/// Migrate legacy state to current schema version.
225///
226/// v1/v2 -> v3:
227/// - Drop PR records, pending merges, tasks_in_flight
228/// - Create fresh v3 state with empty workers list
229fn migrate_state(mut state: ParallelStateFile) -> ParallelStateFile {
230    if state.schema_version < PARALLEL_STATE_SCHEMA_VERSION {
231        log::info!(
232            "Migrating parallel state from schema v{} to v{}",
233            state.schema_version,
234            PARALLEL_STATE_SCHEMA_VERSION
235        );
236        // v3 is a clean break - we drop legacy fields and start fresh
237        // Any in-flight work from v1/v2 is lost (should be handled by caller)
238        state.schema_version = PARALLEL_STATE_SCHEMA_VERSION;
239        state.workers.clear();
240    }
241    state
242}
243
244pub fn load_state(path: &Path) -> Result<Option<ParallelStateFile>> {
245    if !path.exists() {
246        return Ok(None);
247    }
248    let raw = std::fs::read_to_string(path)
249        .with_context(|| format!("read parallel state {}", path.display()))?;
250    let state: ParallelStateFile =
251        crate::jsonc::parse_jsonc::<ParallelStateFile>(&raw, "parallel state")?;
252
253    // Apply migrations
254    let state = migrate_state(state);
255
256    Ok(Some(state))
257}
258
259pub fn save_state(path: &Path, state: &ParallelStateFile) -> Result<()> {
260    if let Some(parent) = path.parent() {
261        std::fs::create_dir_all(parent)
262            .with_context(|| format!("create parallel state dir {}", parent.display()))?;
263    }
264    let rendered = serde_json::to_string_pretty(state).context("serialize parallel state")?;
265    fsutil::write_atomic(path, rendered.as_bytes())
266        .with_context(|| format!("write parallel state {}", path.display()))?;
267    Ok(())
268}
269
// Unit tests live out-of-line in `state/tests.rs`, compiled only for tests.
#[cfg(test)]
#[path = "state/tests.rs"]
mod tests;