ralph/commands/run/parallel/
state.rs1use crate::fsutil;
18use anyhow::{Context, Result};
19use serde::{Deserialize, Serialize};
20use std::path::{Path, PathBuf};
21
/// Schema version written by this build for the parallel state file.
///
/// `migrate_state` stamps any state with a lower version up to this value
/// (clearing its worker list), and `ParallelStateFile::new` uses it for fresh state.
pub const PARALLEL_STATE_SCHEMA_VERSION: u32 = 3;
33
34#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
40#[serde(rename_all = "snake_case")]
41pub enum WorkerLifecycle {
42 #[default]
44 Running,
45 Integrating,
47 Completed,
49 Failed,
51 BlockedPush,
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct WorkerRecord {
58 pub task_id: String,
60 pub workspace_path: PathBuf,
62 #[serde(default)]
64 pub lifecycle: WorkerLifecycle,
65 pub started_at: String,
67 #[serde(skip_serializing_if = "Option::is_none")]
69 pub completed_at: Option<String>,
70 #[serde(default)]
72 pub push_attempts: u32,
73 #[serde(skip_serializing_if = "Option::is_none")]
75 pub last_error: Option<String>,
76}
77
78impl WorkerRecord {
79 pub fn new(task_id: impl Into<String>, workspace_path: PathBuf, started_at: String) -> Self {
80 Self {
81 task_id: task_id.into(),
82 workspace_path,
83 lifecycle: WorkerLifecycle::Running,
84 started_at,
85 completed_at: None,
86 push_attempts: 0,
87 last_error: None,
88 }
89 }
90
91 pub fn start_integration(&mut self) {
93 self.lifecycle = WorkerLifecycle::Integrating;
94 }
95
96 pub fn mark_completed(&mut self, timestamp: String) {
98 self.lifecycle = WorkerLifecycle::Completed;
99 self.completed_at = Some(timestamp);
100 }
101
102 pub fn mark_failed(&mut self, timestamp: String, error: impl Into<String>) {
104 self.lifecycle = WorkerLifecycle::Failed;
105 self.completed_at = Some(timestamp);
106 self.last_error = Some(error.into());
107 }
108
109 pub fn mark_blocked(&mut self, timestamp: String, error: impl Into<String>) {
111 self.lifecycle = WorkerLifecycle::BlockedPush;
112 self.completed_at = Some(timestamp);
113 self.last_error = Some(error.into());
114 }
115
116 pub fn increment_push_attempt(&mut self) {
118 self.push_attempts += 1;
119 }
120
121 pub fn is_terminal(&self) -> bool {
123 matches!(
124 self.lifecycle,
125 WorkerLifecycle::Completed | WorkerLifecycle::Failed | WorkerLifecycle::BlockedPush
126 )
127 }
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize)]
136pub struct ParallelStateFile {
137 #[serde(default = "default_schema_version")]
139 pub schema_version: u32,
140 #[serde(default)]
142 pub started_at: String,
143 #[serde(default)]
145 pub target_branch: String,
146 #[serde(default)]
148 pub workers: Vec<WorkerRecord>,
149}
150
/// Serde fallback for a missing `schema_version` field.
///
/// Such documents are treated as version 1, so `migrate_state` will upgrade
/// them (presumably files predating the field — confirm against history).
fn default_schema_version() -> u32 {
    const UNVERSIONED_SCHEMA: u32 = 1;
    UNVERSIONED_SCHEMA
}
154
155impl ParallelStateFile {
156 pub fn new(started_at: impl Into<String>, target_branch: impl Into<String>) -> Self {
157 Self {
158 schema_version: PARALLEL_STATE_SCHEMA_VERSION,
159 started_at: started_at.into(),
160 target_branch: target_branch.into(),
161 workers: Vec::new(),
162 }
163 }
164
165 pub fn upsert_worker(&mut self, record: WorkerRecord) {
167 if let Some(existing) = self
168 .workers
169 .iter_mut()
170 .find(|w| w.task_id == record.task_id)
171 {
172 *existing = record;
173 } else {
174 self.workers.push(record);
175 }
176 }
177
178 pub fn remove_worker(&mut self, task_id: &str) {
180 self.workers.retain(|w| w.task_id != task_id);
181 }
182
183 pub fn get_worker(&self, task_id: &str) -> Option<&WorkerRecord> {
185 self.workers.iter().find(|w| w.task_id == task_id)
186 }
187
188 pub fn get_worker_mut(&mut self, task_id: &str) -> Option<&mut WorkerRecord> {
190 self.workers.iter_mut().find(|w| w.task_id == task_id)
191 }
192
193 pub fn has_worker(&self, task_id: &str) -> bool {
195 self.workers.iter().any(|w| w.task_id == task_id)
196 }
197
198 pub fn workers_by_lifecycle(
200 &self,
201 lifecycle: WorkerLifecycle,
202 ) -> impl Iterator<Item = &WorkerRecord> {
203 self.workers
204 .iter()
205 .filter(move |w| w.lifecycle == lifecycle)
206 }
207
208 pub fn active_worker_count(&self) -> usize {
210 self.workers.iter().filter(|w| !w.is_terminal()).count()
211 }
212
213 pub fn blocked_worker_count(&self) -> usize {
215 self.workers_by_lifecycle(WorkerLifecycle::BlockedPush)
216 .count()
217 }
218}
219
/// Location of the parallel state file inside the repository at `repo_root`.
pub fn state_file_path(repo_root: &Path) -> PathBuf {
    const RELATIVE_STATE_FILE: &str = ".ralph/cache/parallel/state.json";
    repo_root.join(RELATIVE_STATE_FILE)
}
223
224fn migrate_state(mut state: ParallelStateFile) -> ParallelStateFile {
230 if state.schema_version < PARALLEL_STATE_SCHEMA_VERSION {
231 log::info!(
232 "Migrating parallel state from schema v{} to v{}",
233 state.schema_version,
234 PARALLEL_STATE_SCHEMA_VERSION
235 );
236 state.schema_version = PARALLEL_STATE_SCHEMA_VERSION;
239 state.workers.clear();
240 }
241 state
242}
243
244pub fn load_state(path: &Path) -> Result<Option<ParallelStateFile>> {
245 if !path.exists() {
246 return Ok(None);
247 }
248 let raw = std::fs::read_to_string(path)
249 .with_context(|| format!("read parallel state {}", path.display()))?;
250 let state: ParallelStateFile =
251 crate::jsonc::parse_jsonc::<ParallelStateFile>(&raw, "parallel state")?;
252
253 let state = migrate_state(state);
255
256 Ok(Some(state))
257}
258
259pub fn save_state(path: &Path, state: &ParallelStateFile) -> Result<()> {
260 if let Some(parent) = path.parent() {
261 std::fs::create_dir_all(parent)
262 .with_context(|| format!("create parallel state dir {}", parent.display()))?;
263 }
264 let rendered = serde_json::to_string_pretty(state).context("serialize parallel state")?;
265 fsutil::write_atomic(path, rendered.as_bytes())
266 .with_context(|| format!("write parallel state {}", path.display()))?;
267 Ok(())
268}
269
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn new_state_has_current_schema_version() {
        let state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");
        assert_eq!(state.schema_version, PARALLEL_STATE_SCHEMA_VERSION);
    }

    #[test]
    fn state_migration_v2_to_v3() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("state.json");
        let workspace_path = crate::testsupport::path::portable_abs_path("ws");

        // A v2 document uses an entirely different layout; none of it maps onto workers.
        let v2_state = serde_json::json!({
            "schema_version": 2,
            "started_at": "2026-02-01T00:00:00Z",
            "base_branch": "main",
            "merge_method": "squash",
            "merge_when": "as_created",
            "tasks_in_flight": [{
                "task_id": "RQ-0001",
                "workspace_path": workspace_path,
                "branch": "b",
                "pid": 123
            }],
            "prs": [{"task_id": "RQ-0001", "pr_number": 5}],
            "pending_merges": [{
                "task_id": "RQ-0001",
                "pr_number": 5,
                "queued_at": "2026-02-01T00:00:00Z"
            }]
        });

        std::fs::write(&path, serde_json::to_string_pretty(&v2_state)?)?;

        let state = load_state(&path)?.expect("state");
        assert_eq!(state.schema_version, PARALLEL_STATE_SCHEMA_VERSION);
        assert!(state.workers.is_empty());
        Ok(())
    }

    #[test]
    fn state_migration_without_schema_version_clears_workers() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("state.json");

        // No schema_version: deserializes as v1 and must be migrated.
        let legacy = serde_json::json!({
            "started_at": "2026-02-20T00:00:00Z",
            "target_branch": "main",
            "workers": [{
                "task_id": "RQ-0001",
                "workspace_path": crate::testsupport::path::portable_abs_path("ws"),
                "started_at": "2026-02-20T00:00:00Z"
            }]
        });
        std::fs::write(&path, serde_json::to_string_pretty(&legacy)?)?;

        let state = load_state(&path)?.expect("state");
        assert_eq!(state.schema_version, PARALLEL_STATE_SCHEMA_VERSION);
        assert_eq!(state.target_branch, "main");
        assert!(state.workers.is_empty());
        Ok(())
    }

    #[test]
    fn load_state_keeps_workers_from_newer_schema() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("state.json");
        let newer_version = PARALLEL_STATE_SCHEMA_VERSION + 1;

        let newer = serde_json::json!({
            "schema_version": newer_version,
            "started_at": "2026-02-20T00:00:00Z",
            "target_branch": "main",
            "workers": [{
                "task_id": "RQ-0001",
                "workspace_path": crate::testsupport::path::portable_abs_path("ws"),
                "started_at": "2026-02-20T00:00:00Z"
            }]
        });
        std::fs::write(&path, serde_json::to_string_pretty(&newer)?)?;

        let state = load_state(&path)?.expect("state");
        assert_eq!(state.schema_version, newer_version);
        assert_eq!(state.workers.len(), 1);
        Ok(())
    }

    #[test]
    fn load_state_missing_file_returns_none() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("does-not-exist.json");
        assert!(load_state(&path)?.is_none());
        Ok(())
    }

    #[test]
    fn save_state_creates_missing_parent_dirs() -> Result<()> {
        let temp = TempDir::new()?;
        let path = state_file_path(temp.path());
        assert!(!path.exists());

        save_state(&path, &ParallelStateFile::new("2026-02-20T00:00:00Z", "main"))?;

        assert!(path.exists());
        let loaded = load_state(&path)?.expect("state");
        assert!(loaded.workers.is_empty());
        Ok(())
    }

    #[test]
    fn worker_record_lifecycle_transitions() {
        let workspace_path = crate::testsupport::path::portable_abs_path("ws");
        let mut worker =
            WorkerRecord::new("RQ-0001", workspace_path, "2026-02-20T00:00:00Z".into());

        assert!(matches!(worker.lifecycle, WorkerLifecycle::Running));
        assert!(!worker.is_terminal());

        worker.start_integration();
        assert!(matches!(worker.lifecycle, WorkerLifecycle::Integrating));
        assert!(!worker.is_terminal());

        worker.mark_completed("2026-02-20T01:00:00Z".into());
        assert!(matches!(worker.lifecycle, WorkerLifecycle::Completed));
        assert!(worker.is_terminal());
        assert!(worker.completed_at.is_some());
    }

    #[test]
    fn worker_record_mark_failed() {
        let workspace_path = crate::testsupport::path::portable_abs_path("ws");
        let mut worker =
            WorkerRecord::new("RQ-0001", workspace_path, "2026-02-20T00:00:00Z".into());

        worker.mark_failed("2026-02-20T01:00:00Z".into(), "CI failed");

        assert!(matches!(worker.lifecycle, WorkerLifecycle::Failed));
        assert!(worker.is_terminal());
        assert_eq!(worker.last_error, Some("CI failed".into()));
    }

    #[test]
    fn worker_record_mark_blocked() {
        let workspace_path = crate::testsupport::path::portable_abs_path("ws");
        let mut worker =
            WorkerRecord::new("RQ-0001", workspace_path, "2026-02-20T00:00:00Z".into());

        worker.mark_blocked("2026-02-20T01:00:00Z".into(), "merge conflict");

        assert!(matches!(worker.lifecycle, WorkerLifecycle::BlockedPush));
        assert!(worker.is_terminal());
        assert_eq!(worker.last_error, Some("merge conflict".into()));
    }

    #[test]
    fn worker_record_push_attempts() {
        let workspace_path = crate::testsupport::path::portable_abs_path("ws");
        let mut worker =
            WorkerRecord::new("RQ-0001", workspace_path, "2026-02-20T00:00:00Z".into());

        assert_eq!(worker.push_attempts, 0);
        worker.increment_push_attempt();
        assert_eq!(worker.push_attempts, 1);
        worker.increment_push_attempt();
        assert_eq!(worker.push_attempts, 2);
    }

    #[test]
    fn state_upsert_worker_replaces_existing() {
        let mut state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");
        let ws1 = crate::testsupport::path::portable_abs_path("ws1");
        let ws2 = crate::testsupport::path::portable_abs_path("ws2");
        let ws1_new = crate::testsupport::path::portable_abs_path("ws1-new");

        state.upsert_worker(WorkerRecord::new("RQ-0001", ws1, "t1".into()));
        state.upsert_worker(WorkerRecord::new("RQ-0002", ws2, "t2".into()));

        // Same task_id as the first worker: must replace it, not append.
        let mut updated = WorkerRecord::new("RQ-0001", ws1_new.clone(), "t1-new".into());
        updated.start_integration();
        state.upsert_worker(updated);

        assert_eq!(state.workers.len(), 2);
        let w1 = state.get_worker("RQ-0001").unwrap();
        assert_eq!(w1.workspace_path, ws1_new);
        assert!(matches!(w1.lifecycle, WorkerLifecycle::Integrating));
    }

    #[test]
    fn state_remove_worker() {
        let mut state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");
        let ws1 = crate::testsupport::path::portable_abs_path("ws1");
        let ws2 = crate::testsupport::path::portable_abs_path("ws2");

        state.upsert_worker(WorkerRecord::new("RQ-0001", ws1, "t1".into()));
        state.upsert_worker(WorkerRecord::new("RQ-0002", ws2, "t2".into()));

        state.remove_worker("RQ-0001");

        assert_eq!(state.workers.len(), 1);
        assert!(state.get_worker("RQ-0001").is_none());
        assert!(state.get_worker("RQ-0002").is_some());
    }

    #[test]
    fn state_has_worker_and_lifecycle_queries() {
        let mut state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");
        let ws1 = crate::testsupport::path::portable_abs_path("ws1");
        let ws2 = crate::testsupport::path::portable_abs_path("ws2");

        let mut blocked = WorkerRecord::new("RQ-0001", ws1, "t1".into());
        blocked.mark_blocked("t".into(), "push rejected");
        state.upsert_worker(blocked);
        state.upsert_worker(WorkerRecord::new("RQ-0002", ws2, "t2".into()));

        assert!(state.has_worker("RQ-0001"));
        assert!(state.has_worker("RQ-0002"));
        assert!(!state.has_worker("RQ-0003"));
        assert_eq!(state.blocked_worker_count(), 1);
        assert_eq!(
            state
                .workers_by_lifecycle(WorkerLifecycle::Running)
                .map(|w| w.task_id.as_str())
                .collect::<Vec<_>>(),
            vec!["RQ-0002"]
        );
    }

    #[test]
    fn state_get_worker_mut_updates_in_place() {
        let mut state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");
        let ws = crate::testsupport::path::portable_abs_path("ws");
        state.upsert_worker(WorkerRecord::new("RQ-0001", ws, "t1".into()));

        state
            .get_worker_mut("RQ-0001")
            .expect("worker")
            .increment_push_attempt();

        assert_eq!(state.get_worker("RQ-0001").map(|w| w.push_attempts), Some(1));
        assert!(state.get_worker_mut("RQ-9999").is_none());
    }

    #[test]
    fn state_active_worker_count() {
        let mut state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");
        let ws1 = crate::testsupport::path::portable_abs_path("ws1");
        let ws2 = crate::testsupport::path::portable_abs_path("ws2");
        let ws3 = crate::testsupport::path::portable_abs_path("ws3");

        let w1 = WorkerRecord::new("RQ-0001", ws1, "t1".into());
        let mut w2 = WorkerRecord::new("RQ-0002", ws2, "t2".into());
        let mut w3 = WorkerRecord::new("RQ-0003", ws3, "t3".into());

        w2.mark_completed("t".into());
        w3.mark_blocked("t".into(), "error");

        state.upsert_worker(w1);
        state.upsert_worker(w2);
        state.upsert_worker(w3);

        // Only the still-running worker counts as active.
        assert_eq!(state.active_worker_count(), 1);
    }

    #[test]
    fn state_round_trips() -> Result<()> {
        let temp = TempDir::new()?;
        let path = temp.path().join("state.json");
        let workspace_path = crate::testsupport::path::portable_abs_path("ws");

        let mut state = ParallelStateFile::new("2026-02-20T00:00:00Z", "main");
        let mut worker = WorkerRecord::new(
            "RQ-0001",
            workspace_path.clone(),
            "2026-02-20T00:00:00Z".into(),
        );
        worker.start_integration();
        worker.increment_push_attempt();
        state.upsert_worker(worker);

        save_state(&path, &state)?;
        let loaded = load_state(&path)?.expect("state");

        assert_eq!(loaded.schema_version, PARALLEL_STATE_SCHEMA_VERSION);
        assert_eq!(loaded.target_branch, "main");
        assert_eq!(loaded.workers.len(), 1);

        let w = &loaded.workers[0];
        assert_eq!(w.task_id, "RQ-0001");
        assert_eq!(w.workspace_path, workspace_path);
        assert!(matches!(w.lifecycle, WorkerLifecycle::Integrating));
        assert_eq!(w.push_attempts, 1);

        Ok(())
    }

    #[test]
    fn state_deserialization_ignores_unknown_fields() -> Result<()> {
        let raw = serde_json::json!({
            "schema_version": 3,
            "started_at": "2026-02-20T00:00:00Z",
            "target_branch": "main",
            "unknown_top": "ignored",
            "workers": [{
                "task_id": "RQ-0001",
                "workspace_path": crate::testsupport::path::portable_abs_path("ws"),
                "started_at": "2026-02-20T00:00:00Z",
                "unknown_worker": "ignored"
            }]
        });

        let state: ParallelStateFile = serde_json::from_value(raw)?;
        assert_eq!(state.workers.len(), 1);
        assert_eq!(state.workers[0].task_id, "RQ-0001");

        Ok(())
    }
}