Skip to main content

ralph_core/
loop_history.rs

//! Event-sourced loop history for crash recovery and debugging.
//!
//! Each loop maintains an append-only event log in `.ralph/history.jsonl`.
//! This provides:
//! - **Crash recovery**: Resume from the last known state after a crash
//! - **Debugging**: Replay loop execution to understand failures
//! - **Auditing**: A complete trace of what happened and when
//! - **Source of truth**: Registry state can be derived from the history
9
10use std::fs::{File, OpenOptions};
11use std::io::{BufRead, BufReader, Write};
12use std::path::{Path, PathBuf};
13
14use chrono::{DateTime, Utc};
15use serde::{Deserialize, Serialize};
16use thiserror::Error;
17
18use crate::file_lock::FileLock;
19
20/// Errors that can occur during history operations.
21#[derive(Debug, Error)]
22pub enum HistoryError {
23    #[error("I/O error: {0}")]
24    Io(#[from] std::io::Error),
25
26    #[error("JSON serialization error: {0}")]
27    Json(#[from] serde_json::Error),
28}
29
30/// A single event in the loop history.
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct HistoryEvent {
33    /// Timestamp when the event occurred.
34    #[serde(rename = "ts")]
35    pub timestamp: DateTime<Utc>,
36
37    /// Type of the event.
38    #[serde(rename = "type")]
39    pub event_type: HistoryEventType,
40
41    /// Optional additional data.
42    #[serde(skip_serializing_if = "Option::is_none")]
43    pub data: Option<serde_json::Value>,
44}
45
46impl HistoryEvent {
47    /// Create a new history event with current timestamp.
48    pub fn new(event_type: HistoryEventType) -> Self {
49        Self {
50            timestamp: Utc::now(),
51            event_type,
52            data: None,
53        }
54    }
55
56    /// Create a new history event with data.
57    pub fn with_data(event_type: HistoryEventType, data: serde_json::Value) -> Self {
58        Self {
59            timestamp: Utc::now(),
60            event_type,
61            data: Some(data),
62        }
63    }
64}
65
66/// Types of events that can be recorded in loop history.
67#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
68#[serde(tag = "kind", rename_all = "snake_case")]
69pub enum HistoryEventType {
70    /// Loop started with given prompt.
71    LoopStarted { prompt: String },
72
73    /// Iteration started.
74    IterationStarted { iteration: u32 },
75
76    /// An event was published during the iteration.
77    EventPublished { topic: String, payload: String },
78
79    /// Iteration completed.
80    IterationCompleted { iteration: u32, success: bool },
81
82    /// Loop completed successfully.
83    LoopCompleted { reason: String },
84
85    /// Loop was resumed from a previous state.
86    LoopResumed { from_iteration: u32 },
87
88    /// Loop was terminated (SIGTERM or similar).
89    LoopTerminated { signal: String },
90
91    /// Loop was queued for merge.
92    MergeQueued,
93
94    /// Merge-ralph started.
95    MergeStarted { pid: u32 },
96
97    /// Merge completed successfully.
98    MergeCompleted { commit: String },
99
100    /// Merge failed.
101    MergeFailed { reason: String },
102
103    /// Loop was discarded.
104    LoopDiscarded { reason: String },
105}
106
/// Handle to the event log of one loop.
///
/// A thin wrapper around the path of an append-only JSONL file; every
/// operation opens the file afresh.
pub struct LoopHistory {
    /// Location of the JSONL history file.
    path: PathBuf,
}
113
114impl LoopHistory {
115    /// Create a new loop history at the given path.
116    pub fn new(path: impl AsRef<Path>) -> Self {
117        Self {
118            path: path.as_ref().to_path_buf(),
119        }
120    }
121
122    /// Create a loop history from a loop context.
123    pub fn from_context(context: &crate::LoopContext) -> Self {
124        Self::new(context.history_path())
125    }
126
127    /// Get the path to the history file.
128    pub fn path(&self) -> &Path {
129        &self.path
130    }
131
132    /// Append an event to the history file.
133    ///
134    /// This is thread-safe via file locking.
135    pub fn append(&self, event: HistoryEvent) -> Result<(), HistoryError> {
136        // Ensure parent directory exists
137        if let Some(parent) = self.path.parent() {
138            std::fs::create_dir_all(parent)?;
139        }
140
141        // Acquire exclusive lock
142        let file_lock = FileLock::new(&self.path)?;
143        let _lock = file_lock.exclusive()?;
144
145        // Open file in append mode
146        let mut file = OpenOptions::new()
147            .create(true)
148            .append(true)
149            .open(&self.path)?;
150
151        // Serialize and write
152        let json = serde_json::to_string(&event)?;
153        writeln!(file, "{}", json)?;
154        file.flush()?;
155
156        Ok(())
157    }
158
159    /// Read all events from the history file.
160    pub fn read_all(&self) -> Result<Vec<HistoryEvent>, HistoryError> {
161        if !self.path.exists() {
162            return Ok(Vec::new());
163        }
164
165        // Acquire shared lock
166        let file_lock = FileLock::new(&self.path)?;
167        let _lock = file_lock.shared()?;
168
169        let file = File::open(&self.path)?;
170        let reader = BufReader::new(file);
171
172        let mut events = Vec::new();
173        for line in reader.lines() {
174            let line = line?;
175            if line.trim().is_empty() {
176                continue;
177            }
178
179            // Skip malformed lines (best-effort parsing)
180            if let Ok(event) = serde_json::from_str::<HistoryEvent>(&line) {
181                events.push(event);
182            }
183        }
184
185        Ok(events)
186    }
187
188    /// Find the last completed iteration number.
189    ///
190    /// Returns None if no iterations have been completed.
191    pub fn last_iteration(&self) -> Result<Option<u32>, HistoryError> {
192        let events = self.read_all()?;
193
194        let mut last_completed = None;
195
196        for event in events {
197            if let HistoryEventType::IterationCompleted { iteration, .. } = event.event_type {
198                last_completed = Some(iteration);
199            }
200        }
201
202        Ok(last_completed)
203    }
204
205    /// Check if the loop completed successfully.
206    pub fn is_completed(&self) -> Result<bool, HistoryError> {
207        let events = self.read_all()?;
208
209        for event in events.iter().rev() {
210            match &event.event_type {
211                HistoryEventType::LoopCompleted { .. } => return Ok(true),
212                HistoryEventType::LoopTerminated { .. } => return Ok(false),
213                HistoryEventType::LoopDiscarded { .. } => return Ok(false),
214                _ => {}
215            }
216        }
217
218        Ok(false)
219    }
220
221    /// Get the original prompt that started the loop.
222    pub fn get_prompt(&self) -> Result<Option<String>, HistoryError> {
223        let events = self.read_all()?;
224
225        for event in events {
226            if let HistoryEventType::LoopStarted { prompt } = event.event_type {
227                return Ok(Some(prompt));
228            }
229        }
230
231        Ok(None)
232    }
233
234    /// Get summary statistics about the loop.
235    pub fn summary(&self) -> Result<HistorySummary, HistoryError> {
236        let events = self.read_all()?;
237
238        let mut summary = HistorySummary::default();
239
240        for event in &events {
241            match &event.event_type {
242                HistoryEventType::LoopStarted { prompt } => {
243                    summary.prompt = Some(prompt.clone());
244                    summary.started_at = Some(event.timestamp);
245                }
246                HistoryEventType::IterationCompleted { iteration, success } => {
247                    summary.iterations_completed = *iteration;
248                    if !success {
249                        summary.iterations_failed += 1;
250                    }
251                }
252                HistoryEventType::EventPublished { .. } => {
253                    summary.events_published += 1;
254                }
255                HistoryEventType::LoopCompleted { reason } => {
256                    summary.completed = true;
257                    summary.completion_reason = Some(reason.clone());
258                    summary.ended_at = Some(event.timestamp);
259                }
260                HistoryEventType::LoopTerminated { signal } => {
261                    summary.terminated = true;
262                    summary.termination_signal = Some(signal.clone());
263                    summary.ended_at = Some(event.timestamp);
264                }
265                HistoryEventType::MergeCompleted { commit } => {
266                    summary.merge_commit = Some(commit.clone());
267                }
268                HistoryEventType::MergeFailed { reason } => {
269                    summary.merge_failed = true;
270                    summary.merge_failure_reason = Some(reason.clone());
271                }
272                _ => {}
273            }
274        }
275
276        Ok(summary)
277    }
278
279    /// Record loop started event.
280    pub fn record_started(&self, prompt: &str) -> Result<(), HistoryError> {
281        self.append(HistoryEvent::new(HistoryEventType::LoopStarted {
282            prompt: prompt.to_string(),
283        }))
284    }
285
286    /// Record iteration started event.
287    pub fn record_iteration_started(&self, iteration: u32) -> Result<(), HistoryError> {
288        self.append(HistoryEvent::new(HistoryEventType::IterationStarted {
289            iteration,
290        }))
291    }
292
293    /// Record event published event.
294    pub fn record_event_published(&self, topic: &str, payload: &str) -> Result<(), HistoryError> {
295        self.append(HistoryEvent::new(HistoryEventType::EventPublished {
296            topic: topic.to_string(),
297            payload: payload.to_string(),
298        }))
299    }
300
301    /// Record iteration completed event.
302    pub fn record_iteration_completed(
303        &self,
304        iteration: u32,
305        success: bool,
306    ) -> Result<(), HistoryError> {
307        self.append(HistoryEvent::new(HistoryEventType::IterationCompleted {
308            iteration,
309            success,
310        }))
311    }
312
313    /// Record loop completed event.
314    pub fn record_completed(&self, reason: &str) -> Result<(), HistoryError> {
315        self.append(HistoryEvent::new(HistoryEventType::LoopCompleted {
316            reason: reason.to_string(),
317        }))
318    }
319
320    /// Record loop resumed event.
321    pub fn record_resumed(&self, from_iteration: u32) -> Result<(), HistoryError> {
322        self.append(HistoryEvent::new(HistoryEventType::LoopResumed {
323            from_iteration,
324        }))
325    }
326
327    /// Record loop terminated event.
328    pub fn record_terminated(&self, signal: &str) -> Result<(), HistoryError> {
329        self.append(HistoryEvent::new(HistoryEventType::LoopTerminated {
330            signal: signal.to_string(),
331        }))
332    }
333
334    /// Record merge queued event.
335    pub fn record_merge_queued(&self) -> Result<(), HistoryError> {
336        self.append(HistoryEvent::new(HistoryEventType::MergeQueued))
337    }
338
339    /// Record merge started event.
340    pub fn record_merge_started(&self, pid: u32) -> Result<(), HistoryError> {
341        self.append(HistoryEvent::new(HistoryEventType::MergeStarted { pid }))
342    }
343
344    /// Record merge completed event.
345    pub fn record_merge_completed(&self, commit: &str) -> Result<(), HistoryError> {
346        self.append(HistoryEvent::new(HistoryEventType::MergeCompleted {
347            commit: commit.to_string(),
348        }))
349    }
350
351    /// Record merge failed event.
352    pub fn record_merge_failed(&self, reason: &str) -> Result<(), HistoryError> {
353        self.append(HistoryEvent::new(HistoryEventType::MergeFailed {
354            reason: reason.to_string(),
355        }))
356    }
357
358    /// Record loop discarded event.
359    pub fn record_discarded(&self, reason: &str) -> Result<(), HistoryError> {
360        self.append(HistoryEvent::new(HistoryEventType::LoopDiscarded {
361            reason: reason.to_string(),
362        }))
363    }
364}
365
366/// Summary statistics for a loop history.
367#[derive(Debug, Default)]
368pub struct HistorySummary {
369    /// Original prompt.
370    pub prompt: Option<String>,
371
372    /// When the loop started.
373    pub started_at: Option<DateTime<Utc>>,
374
375    /// When the loop ended.
376    pub ended_at: Option<DateTime<Utc>>,
377
378    /// Number of completed iterations.
379    pub iterations_completed: u32,
380
381    /// Number of failed iterations.
382    pub iterations_failed: u32,
383
384    /// Number of events published.
385    pub events_published: u32,
386
387    /// Whether the loop completed successfully.
388    pub completed: bool,
389
390    /// Completion reason (if completed).
391    pub completion_reason: Option<String>,
392
393    /// Whether the loop was terminated.
394    pub terminated: bool,
395
396    /// Termination signal (if terminated).
397    pub termination_signal: Option<String>,
398
399    /// Merge commit SHA (if merged).
400    pub merge_commit: Option<String>,
401
402    /// Whether merge failed.
403    pub merge_failed: bool,
404
405    /// Merge failure reason (if failed).
406    pub merge_failure_reason: Option<String>,
407}
408
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a history inside a fresh temporary directory.
    ///
    /// The `TempDir` is handed back so the directory stays alive for as long
    /// as the test holds on to it.
    fn temp_history() -> (TempDir, LoopHistory) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("history.jsonl");
        (dir, LoopHistory::new(path))
    }

    /// Reads the history back and keeps only the event types.
    fn event_types(history: &LoopHistory) -> Vec<HistoryEventType> {
        history
            .read_all()
            .unwrap()
            .into_iter()
            .map(|event| event.event_type)
            .collect()
    }

    #[test]
    fn test_append_and_read() {
        let (_dir, history) = temp_history();

        history.record_started("test prompt").unwrap();
        history.record_iteration_started(1).unwrap();
        history.record_iteration_completed(1, true).unwrap();
        history.record_completed("completion_promise").unwrap();

        let kinds = event_types(&history);
        assert_eq!(kinds.len(), 4);

        assert!(matches!(kinds[0], HistoryEventType::LoopStarted { .. }));
        assert_eq!(kinds[1], HistoryEventType::IterationStarted { iteration: 1 });
        assert_eq!(
            kinds[2],
            HistoryEventType::IterationCompleted {
                iteration: 1,
                success: true
            }
        );
        assert!(matches!(kinds[3], HistoryEventType::LoopCompleted { .. }));
    }

    #[test]
    fn test_last_iteration() {
        let (_dir, history) = temp_history();

        // Nothing recorded yet.
        assert_eq!(history.last_iteration().unwrap(), None);

        history.record_started("test").unwrap();

        // The last completed iteration is reported whether it succeeded or not.
        for &(iteration, success) in &[(1u32, true), (2, true), (3, false)] {
            history.record_iteration_started(iteration).unwrap();
            history
                .record_iteration_completed(iteration, success)
                .unwrap();
            assert_eq!(history.last_iteration().unwrap(), Some(iteration));
        }
    }

    #[test]
    fn test_is_completed() {
        let (_dir, history) = temp_history();

        // No file at all.
        assert!(!history.is_completed().unwrap());

        // Started but not finished.
        history.record_started("test").unwrap();
        assert!(!history.is_completed().unwrap());

        // Finished.
        history.record_completed("done").unwrap();
        assert!(history.is_completed().unwrap());
    }

    #[test]
    fn test_is_completed_terminated() {
        let (_dir, history) = temp_history();

        history.record_started("test").unwrap();
        history.record_terminated("SIGTERM").unwrap();

        let completed = history.is_completed().unwrap();
        assert!(!completed);
    }

    #[test]
    fn test_get_prompt() {
        let (_dir, history) = temp_history();

        assert!(history.get_prompt().unwrap().is_none());

        history.record_started("my test prompt").unwrap();
        let prompt = history.get_prompt().unwrap();
        assert_eq!(prompt, Some("my test prompt".to_string()));
    }

    #[test]
    fn test_summary() {
        let (_dir, history) = temp_history();

        history.record_started("test prompt").unwrap();

        // Two successful iterations, each publishing one event.
        history.record_iteration_started(1).unwrap();
        history
            .record_event_published("build.task", "task 1")
            .unwrap();
        history.record_iteration_completed(1, true).unwrap();

        history.record_iteration_started(2).unwrap();
        history
            .record_event_published("build.done", "done")
            .unwrap();
        history.record_iteration_completed(2, true).unwrap();

        history.record_completed("completion_promise").unwrap();

        let summary = history.summary().unwrap();
        assert_eq!(summary.prompt, Some("test prompt".to_string()));
        assert_eq!(summary.iterations_completed, 2);
        assert_eq!(summary.events_published, 2);
        assert!(summary.completed);
        assert_eq!(
            summary.completion_reason,
            Some("completion_promise".to_string())
        );
    }

    #[test]
    fn test_empty_file() {
        let (_dir, history) = temp_history();

        // The file was never created, so there is nothing to read.
        assert!(history.read_all().unwrap().is_empty());
    }

    #[test]
    fn test_merge_events() {
        let (_dir, history) = temp_history();

        history.record_merge_queued().unwrap();
        history.record_merge_started(12345).unwrap();
        history.record_merge_completed("abc123").unwrap();

        let kinds = event_types(&history);
        assert_eq!(kinds.len(), 3);

        assert_eq!(kinds[0], HistoryEventType::MergeQueued);
        assert_eq!(kinds[1], HistoryEventType::MergeStarted { pid: 12345 });
        assert!(matches!(kinds[2], HistoryEventType::MergeCompleted { .. }));
    }

    #[test]
    fn test_serialization_format() {
        let event = HistoryEvent::new(HistoryEventType::LoopStarted {
            prompt: "test".to_string(),
        });

        let json = serde_json::to_string(&event).unwrap();

        // The serialized form must carry the renamed fields and the kind tag.
        for needle in [
            "\"ts\"",
            "\"type\"",
            "\"kind\":\"loop_started\"",
            "\"prompt\":\"test\"",
        ]
        .iter()
        {
            assert!(json.contains(needle));
        }

        // And it must round-trip.
        let parsed: HistoryEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(
            parsed.event_type,
            HistoryEventType::LoopStarted {
                prompt: "test".to_string()
            }
        );
    }
}