// ralph_core/event_logger.rs

//! Event logging for debugging and post-mortem analysis.
//!
//! Logs every event as one JSON line, by default to `.ralph/events.jsonl` as
//! specified in the event-loop spec (a loop context can redirect this to a
//! timestamped events file). The observer pattern allows hooking into the event
//! bus without modifying routing.

use crate::loop_context::LoopContext;
use crate::text::floor_char_boundary;
use ralph_proto::{Event, HatId};
use serde::{Deserialize, Deserializer, Serialize};
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, ErrorKind, Write};
use std::path::{Path, PathBuf};
use tracing::{debug, warn};

15/// Custom deserializer that accepts both String and structured JSON payloads.
16///
17/// Agents sometimes write structured data as JSON objects instead of strings.
18/// This deserializer accepts both formats:
19/// - `"payload": "string"` → `"string"`
20/// - `"payload": {...}` → `"{...}"` (serialized to JSON string)
21/// - `"payload": null` or missing → `""` (empty string)
22fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<String, D::Error>
23where
24    D: Deserializer<'de>,
25{
26    #[derive(Deserialize)]
27    #[serde(untagged)]
28    enum FlexiblePayload {
29        String(String),
30        Object(serde_json::Value),
31    }
32
33    let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
34    Ok(opt
35        .map(|flex| match flex {
36            FlexiblePayload::String(s) => s,
37            FlexiblePayload::Object(serde_json::Value::Null) => String::new(),
38            FlexiblePayload::Object(obj) => {
39                // Serialize the object back to a JSON string
40                serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
41            }
42        })
43        .unwrap_or_default())
44}
45
46/// A logged event record for debugging.
47///
48/// Supports two schemas:
49/// 1. Rich internal format (logged by Ralph):
50///    `{"ts":"2024-01-15T10:23:45Z","iteration":1,"hat":"loop","topic":"task.start","triggered":"planner","payload":"..."}`
51/// 2. Simple agent format (written by agents):
52///    `{"topic":"build.task","payload":"...","ts":"2024-01-15T10:24:12Z"}`
53///
54/// Fields that don't exist in the agent format default to sensible values.
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct EventRecord {
57    /// ISO 8601 timestamp.
58    pub ts: String,
59
60    /// Loop iteration number (0 if not provided by agent-written events).
61    #[serde(default)]
62    pub iteration: u32,
63
64    /// Hat that was active when event was published (empty string if not provided).
65    #[serde(default)]
66    pub hat: String,
67
68    /// Event topic.
69    pub topic: String,
70
71    /// Hat that will be triggered by this event.
72    #[serde(skip_serializing_if = "Option::is_none")]
73    pub triggered: Option<String>,
74
75    /// Event content (truncated if large). Defaults to empty string for agent events without payload.
76    /// Accepts both string and object payloads - objects are serialized to JSON strings.
77    #[serde(default, deserialize_with = "deserialize_flexible_payload")]
78    pub payload: String,
79
80    /// How many times this task has blocked (optional).
81    #[serde(skip_serializing_if = "Option::is_none")]
82    pub blocked_count: Option<u32>,
83}
84
85impl EventRecord {
86    /// Maximum payload length before truncation.
87    const MAX_PAYLOAD_LEN: usize = 500;
88
89    /// Creates a new event record.
90    pub fn new(
91        iteration: u32,
92        hat: impl Into<String>,
93        event: &Event,
94        triggered: Option<&HatId>,
95    ) -> Self {
96        let payload = if event.payload.len() > Self::MAX_PAYLOAD_LEN {
97            // Find a valid UTF-8 char boundary at or before MAX_PAYLOAD_LEN.
98            let truncate_at = floor_char_boundary(&event.payload, Self::MAX_PAYLOAD_LEN);
99            format!(
100                "{}... [truncated, {} chars total]",
101                &event.payload[..truncate_at],
102                event.payload.chars().count()
103            )
104        } else {
105            event.payload.clone()
106        };
107
108        Self {
109            ts: chrono::Utc::now().to_rfc3339(),
110            iteration,
111            hat: hat.into(),
112            topic: event.topic.to_string(),
113            triggered: triggered.map(|h| h.to_string()),
114            payload,
115            blocked_count: None,
116        }
117    }
118
119    /// Sets the blocked count for this record.
120    pub fn with_blocked_count(mut self, count: u32) -> Self {
121        self.blocked_count = Some(count);
122        self
123    }
124}
125
/// Appends [`EventRecord`]s to a JSON Lines file, one record per line.
///
/// The file is opened lazily on the first write and the handle is kept
/// for subsequent writes.
pub struct EventLogger {
    /// Destination JSONL file.
    path: PathBuf,

    /// Append-mode handle; `None` until the first record is logged.
    file: Option<File>,
}

135impl EventLogger {
136    /// Default path for the events file.
137    pub const DEFAULT_PATH: &'static str = ".ralph/events.jsonl";
138
139    /// Creates a new event logger.
140    ///
141    /// The `.ralph/` directory is created if it doesn't exist.
142    pub fn new(path: impl Into<PathBuf>) -> Self {
143        Self {
144            path: path.into(),
145            file: None,
146        }
147    }
148
149    /// Creates a logger with the default path.
150    pub fn default_path() -> Self {
151        Self::new(Self::DEFAULT_PATH)
152    }
153
154    /// Creates a logger using the events path from a LoopContext.
155    ///
156    /// This reads the timestamped events path from the marker file if it exists,
157    /// falling back to the default events path. This ensures the logger writes
158    /// to the correct location when running in a worktree or other isolated workspace.
159    pub fn from_context(context: &LoopContext) -> Self {
160        // Read timestamped events path from marker file, fall back to default
161        // The marker file contains a relative path like ".ralph/events-20260127-123456.jsonl"
162        // which we resolve relative to the workspace root
163        let events_path = std::fs::read_to_string(context.current_events_marker())
164            .map(|s| {
165                let relative = s.trim();
166                context.workspace().join(relative)
167            })
168            .unwrap_or_else(|_| context.events_path());
169        Self::new(events_path)
170    }
171
172    /// Ensures the parent directory exists and opens the file.
173    fn ensure_open(&mut self) -> std::io::Result<&mut File> {
174        if self.file.is_none() {
175            if let Some(parent) = self.path.parent() {
176                fs::create_dir_all(parent)?;
177            }
178            let file = OpenOptions::new()
179                .create(true)
180                .append(true)
181                .open(&self.path)?;
182            self.file = Some(file);
183        }
184        Ok(self.file.as_mut().unwrap())
185    }
186
187    /// Logs an event record.
188    ///
189    /// Uses a single `write_all` call to ensure the JSON line is written atomically.
190    /// This prevents corruption when multiple processes append to the same file
191    /// concurrently (e.g., during parallel merge queue processing).
192    pub fn log(&mut self, record: &EventRecord) -> std::io::Result<()> {
193        let file = self.ensure_open()?;
194        let mut json = serde_json::to_string(record)?;
195        json.push('\n');
196        // Single write_all ensures atomic append on POSIX with O_APPEND
197        file.write_all(json.as_bytes())?;
198        file.flush()?;
199        debug!(topic = %record.topic, iteration = record.iteration, "Event logged");
200        Ok(())
201    }
202
203    /// Convenience method to log an event directly.
204    pub fn log_event(
205        &mut self,
206        iteration: u32,
207        hat: &str,
208        event: &Event,
209        triggered: Option<&HatId>,
210    ) -> std::io::Result<()> {
211        let record = EventRecord::new(iteration, hat, event, triggered);
212        self.log(&record)
213    }
214
215    /// Returns the path to the log file.
216    pub fn path(&self) -> &Path {
217        &self.path
218    }
219}
220
/// Read-only view of a JSONL event history file, as written by
/// [`EventLogger`] and by agents appending to the same file.
pub struct EventHistory {
    /// JSONL file to read from.
    path: PathBuf,
}

226impl EventHistory {
227    /// Creates a new history reader.
228    pub fn new(path: impl Into<PathBuf>) -> Self {
229        Self { path: path.into() }
230    }
231
232    /// Creates a reader for the default path.
233    pub fn default_path() -> Self {
234        Self::new(EventLogger::DEFAULT_PATH)
235    }
236
237    /// Creates a history reader using the events path from a LoopContext.
238    ///
239    /// This ensures the reader looks in the correct location when running
240    /// in a worktree or other isolated workspace.
241    pub fn from_context(context: &LoopContext) -> Self {
242        Self::new(context.events_path())
243    }
244
245    /// Returns true if the history file exists.
246    pub fn exists(&self) -> bool {
247        self.path.exists()
248    }
249
250    /// Reads all event records from the file.
251    pub fn read_all(&self) -> std::io::Result<Vec<EventRecord>> {
252        if !self.exists() {
253            return Ok(Vec::new());
254        }
255
256        let file = File::open(&self.path)?;
257        let reader = BufReader::new(file);
258        let mut records = Vec::new();
259
260        for (line_num, line) in reader.lines().enumerate() {
261            let line = line?;
262            if line.trim().is_empty() {
263                continue;
264            }
265            match serde_json::from_str(&line) {
266                Ok(record) => records.push(record),
267                Err(e) => {
268                    warn!(line = line_num + 1, error = %e, "Failed to parse event record");
269                }
270            }
271        }
272
273        Ok(records)
274    }
275
276    /// Reads the last N event records.
277    pub fn read_last(&self, n: usize) -> std::io::Result<Vec<EventRecord>> {
278        let all = self.read_all()?;
279        let start = all.len().saturating_sub(n);
280        Ok(all[start..].to_vec())
281    }
282
283    /// Reads events filtered by topic.
284    pub fn filter_by_topic(&self, topic: &str) -> std::io::Result<Vec<EventRecord>> {
285        let all = self.read_all()?;
286        Ok(all.into_iter().filter(|r| r.topic == topic).collect())
287    }
288
289    /// Reads events filtered by iteration.
290    pub fn filter_by_iteration(&self, iteration: u32) -> std::io::Result<Vec<EventRecord>> {
291        let all = self.read_all()?;
292        Ok(all
293            .into_iter()
294            .filter(|r| r.iteration == iteration)
295            .collect())
296    }
297
298    /// Clears the event history file.
299    pub fn clear(&self) -> std::io::Result<()> {
300        if self.exists() {
301            fs::remove_file(&self.path)?;
302        }
303        Ok(())
304    }
305}
306
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    fn make_event(topic: &str, payload: &str) -> Event {
        Event::new(topic, payload)
    }

    /// Writes each entry of `lines` verbatim as its own line, replacing any
    /// existing file. This mimics an agent writing the events file by hand.
    fn write_raw_lines(path: &Path, lines: &[&str]) {
        let mut file = File::create(path).unwrap();
        for line in lines {
            writeln!(file, "{}", line).unwrap();
        }
    }

    /// Collects record topics for compact, order-sensitive assertions.
    fn topics(records: &[EventRecord]) -> Vec<&str> {
        records.iter().map(|r| r.topic.as_str()).collect()
    }

    #[test]
    fn test_log_and_read() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut logger = EventLogger::new(&path);

        // Log some events
        let event1 = make_event("task.start", "Starting task");
        let event2 = make_event("build.done", "Build complete");

        logger
            .log_event(1, "loop", &event1, Some(&HatId::new("planner")))
            .unwrap();
        logger
            .log_event(2, "builder", &event2, Some(&HatId::new("planner")))
            .unwrap();

        // Read them back
        let history = EventHistory::new(&path);
        let records = history.read_all().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].topic, "task.start");
        assert_eq!(records[0].iteration, 1);
        assert_eq!(records[0].hat, "loop");
        assert_eq!(records[0].triggered, Some("planner".to_string()));
        assert_eq!(records[0].payload, "Starting task");
        assert_eq!(records[1].topic, "build.done");
        assert_eq!(records[1].iteration, 2);
        assert_eq!(records[1].hat, "builder");
    }

    #[test]
    fn test_read_last() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut logger = EventLogger::new(&path);

        for i in 1..=10 {
            let event = make_event("test", &format!("Event {}", i));
            logger.log_event(i, "hat", &event, None).unwrap();
        }

        let history = EventHistory::new(&path);
        let last_3 = history.read_last(3).unwrap();

        assert_eq!(last_3.len(), 3);
        assert_eq!(last_3[0].iteration, 8);
        assert_eq!(last_3[2].iteration, 10);

        // Asking for more than exists returns everything; asking for none is empty.
        assert_eq!(history.read_last(100).unwrap().len(), 10);
        assert!(history.read_last(0).unwrap().is_empty());
    }

    #[test]
    fn test_filter_by_topic() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut logger = EventLogger::new(&path);

        logger
            .log_event(1, "hat", &make_event("build.done", "a"), None)
            .unwrap();
        logger
            .log_event(2, "hat", &make_event("build.blocked", "b"), None)
            .unwrap();
        logger
            .log_event(3, "hat", &make_event("build.done", "c"), None)
            .unwrap();

        let history = EventHistory::new(&path);
        let blocked = history.filter_by_topic("build.blocked").unwrap();

        assert_eq!(blocked.len(), 1);
        assert_eq!(blocked[0].iteration, 2);

        let done = history.filter_by_topic("build.done").unwrap();
        assert_eq!(
            done.iter().map(|r| r.iteration).collect::<Vec<_>>(),
            vec![1, 3]
        );
    }

    #[test]
    fn test_filter_by_iteration() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut logger = EventLogger::new(&path);
        logger
            .log_event(1, "hat", &make_event("first", "1"), None)
            .unwrap();
        logger
            .log_event(2, "hat", &make_event("second", "2"), None)
            .unwrap();
        logger
            .log_event(2, "hat", &make_event("third", "3"), None)
            .unwrap();

        let history = EventHistory::new(&path);
        let second = history.filter_by_iteration(2).unwrap();
        assert_eq!(topics(&second), ["second", "third"]);
        assert!(history.filter_by_iteration(7).unwrap().is_empty());
    }

    #[test]
    fn test_payload_truncation() {
        let long_payload = "x".repeat(1000);
        let event = make_event("test", &long_payload);
        let record = EventRecord::new(1, "hat", &event, None);

        // The first 500 bytes are kept, followed by a marker with the original length.
        assert_eq!(
            record.payload,
            format!("{}... [truncated, 1000 chars total]", "x".repeat(500))
        );
    }

    #[test]
    fn test_payload_at_limit_is_not_truncated() {
        let payload = "y".repeat(500);
        let record = EventRecord::new(1, "hat", &make_event("test", &payload), None);
        assert_eq!(record.payload, payload);
    }

    #[test]
    fn test_payload_truncation_with_multibyte_chars() {
        // `✅` is 3 bytes in UTF-8. Behind 498 ASCII bytes, the first one spans
        // bytes 498..501, so the 500-byte limit falls inside it and naive byte
        // slicing there would panic.
        let mut payload = "x".repeat(498);
        payload.push_str("✅✅✅");
        payload.push_str(&"y".repeat(500));

        let event = make_event("test", &payload);
        // This should NOT panic
        let record = EventRecord::new(1, "hat", &event, None);

        // The cut backs off to the boundary before the first emoji. The total is
        // reported in characters (498 + 3 + 500), not bytes.
        assert_eq!(
            record.payload,
            format!("{}... [truncated, 1001 chars total]", "x".repeat(498))
        );
    }

    #[test]
    fn test_creates_parent_directory() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("nested/dir/events.jsonl");

        let mut logger = EventLogger::new(&path);
        let event = make_event("test", "payload");
        logger.log_event(1, "hat", &event, None).unwrap();

        assert!(path.exists());
    }

    #[test]
    fn test_empty_history() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("nonexistent.jsonl");

        let history = EventHistory::new(&path);
        assert!(!history.exists());

        let records = history.read_all().unwrap();
        assert!(records.is_empty());
    }

    #[test]
    fn test_clear_removes_history() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");
        let history = EventHistory::new(&path);

        // Clearing a history that was never written is a no-op, not an error.
        history.clear().unwrap();

        {
            let mut logger = EventLogger::new(&path);
            logger
                .log_event(1, "hat", &make_event("test", "payload"), None)
                .unwrap();
        } // Drop the logger so its file handle is closed before removal.
        assert!(history.exists());

        history.clear().unwrap();
        assert!(!history.exists());
        assert!(history.read_all().unwrap().is_empty());
    }

    #[test]
    fn test_unparseable_lines_are_skipped() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        write_raw_lines(
            &path,
            &[
                r#"{"topic":"first","ts":"2024-01-15T10:00:00Z"}"#,
                "not json at all",
                "",
                r#"{"payload":"missing the required topic and ts"}"#,
                r#"{"topic":"second","ts":"2024-01-15T10:01:00Z"}"#,
            ],
        );

        let records = EventHistory::new(&path).read_all().unwrap();
        assert_eq!(topics(&records), ["first", "second"]);
    }

    #[test]
    fn test_agent_written_events_without_iteration() {
        // Agent events use simple format: {"topic":"...","payload":"...","ts":"..."}
        // They don't include iteration, hat, or triggered fields
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        // Write agent-style events (without iteration field)
        let mut file = File::create(&path).unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.task","payload":"Implement auth","ts":"2024-01-15T10:00:00Z"}}"#
        )
        .unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.done","ts":"2024-01-15T10:30:00Z"}}"#
        )
        .unwrap();

        // Should read without warnings (iteration defaults to 0)
        let history = EventHistory::new(&path);
        let records = history.read_all().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].topic, "build.task");
        assert_eq!(records[0].payload, "Implement auth");
        assert_eq!(records[0].iteration, 0); // Defaults to 0
        assert_eq!(records[0].hat, ""); // Defaults to empty string
        assert_eq!(records[0].triggered, None); // Absent optional field
        assert_eq!(records[1].topic, "build.done");
        assert_eq!(records[1].payload, ""); // Defaults to empty when not provided
    }

    #[test]
    fn test_mixed_event_formats() {
        // Test that both agent-written and Ralph-logged events can coexist
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        // Write a Ralph-logged event (full format)
        let mut logger = EventLogger::new(&path);
        let event = make_event("task.start", "Initial task");
        logger
            .log_event(1, "loop", &event, Some(&HatId::new("planner")))
            .unwrap();

        // Write an agent-style event (simple format)
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.task","payload":"Agent wrote this","ts":"2024-01-15T10:05:00Z"}}"#
        )
        .unwrap();

        // Should read both without warnings
        let history = EventHistory::new(&path);
        let records = history.read_all().unwrap();

        assert_eq!(records.len(), 2);
        // First is Ralph's full-format event
        assert_eq!(records[0].topic, "task.start");
        assert_eq!(records[0].iteration, 1);
        assert_eq!(records[0].hat, "loop");
        // Second is agent's simple format
        assert_eq!(records[1].topic, "build.task");
        assert_eq!(records[1].iteration, 0); // Defaulted
        assert_eq!(records[1].hat, ""); // Defaulted
    }

    #[test]
    fn test_object_payload_from_ralph_emit_json() {
        // Test that `ralph emit --json` object payloads are parsed correctly
        // This was the root cause of "invalid type: map, expected a string" errors
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut file = File::create(&path).unwrap();

        // String payload (normal case)
        writeln!(
            file,
            r#"{{"ts":"2024-01-15T10:00:00Z","topic":"task.start","payload":"implement feature"}}"#
        )
        .unwrap();

        // Object payload (from `ralph emit --json`)
        writeln!(
            file,
            r#"{{"topic":"task.complete","payload":{{"status":"verified","tasks":["auth","api"]}},"ts":"2024-01-15T10:30:00Z"}}"#
        )
        .unwrap();

        // Nested object payload
        writeln!(
            file,
            r#"{{"topic":"loop.recovery","payload":{{"status":"recovered","evidence":{{"tests":"pass"}}}},"ts":"2024-01-15T10:45:00Z"}}"#
        )
        .unwrap();

        let history = EventHistory::new(&path);
        let records = history.read_all().unwrap();

        assert_eq!(records.len(), 3);

        // String payload unchanged
        assert_eq!(records[0].topic, "task.start");
        assert_eq!(records[0].payload, "implement feature");

        // Object payload converted to JSON string
        assert_eq!(records[1].topic, "task.complete");
        assert!(records[1].payload.contains("\"status\""));
        assert!(records[1].payload.contains("\"verified\""));
        // Should be valid JSON
        let parsed: serde_json::Value = serde_json::from_str(&records[1].payload).unwrap();
        assert_eq!(parsed["status"], "verified");

        // Nested object also works
        assert_eq!(records[2].topic, "loop.recovery");
        let parsed: serde_json::Value = serde_json::from_str(&records[2].payload).unwrap();
        assert_eq!(parsed["evidence"]["tests"], "pass");
    }

    #[test]
    fn test_non_string_payloads() {
        // Null, numbers, arrays and booleans are accepted alongside strings/objects.
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        write_raw_lines(
            &path,
            &[
                r#"{"topic":"null","payload":null,"ts":"2024-01-15T10:00:00Z"}"#,
                r#"{"topic":"number","payload":42,"ts":"2024-01-15T10:00:00Z"}"#,
                r#"{"topic":"array","payload":["a","b"],"ts":"2024-01-15T10:00:00Z"}"#,
                r#"{"topic":"bool","payload":true,"ts":"2024-01-15T10:00:00Z"}"#,
            ],
        );

        let records = EventHistory::new(&path).read_all().unwrap();
        let payloads: Vec<&str> = records.iter().map(|r| r.payload.as_str()).collect();
        assert_eq!(payloads, ["", "42", r#"["a","b"]"#, "true"]);
    }

    #[test]
    fn test_blocked_count_round_trip() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let record = EventRecord::new(4, "builder", &make_event("build.blocked", "stuck"), None)
            .with_blocked_count(3);
        EventLogger::new(&path).log(&record).unwrap();

        let records = EventHistory::new(&path).read_all().unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].blocked_count, Some(3));
        assert_eq!(records[0].triggered, None);
        assert_eq!(records[0].hat, "builder");
        assert_eq!(records[0].iteration, 4);
    }

    #[test]
    fn test_serialization_omits_absent_optional_fields() {
        let record = EventRecord::new(1, "hat", &make_event("topic", "body"), None);
        let json = serde_json::to_string(&record).unwrap();
        assert!(!json.contains("\"triggered\""));
        assert!(!json.contains("\"blocked_count\""));

        let json = serde_json::to_string(&record.with_blocked_count(3)).unwrap();
        assert!(json.contains("\"blocked_count\":3"));
    }
}