Skip to main content

ralph_core/
event_logger.rs

1//! Event logging for debugging and post-mortem analysis.
2//!
3//! Logs all events to `.ralph/events.jsonl` as specified in the event-loop spec.
4//! The observer pattern allows hooking into the event bus without modifying routing.
5
6use crate::loop_context::LoopContext;
7use ralph_proto::{Event, HatId};
8use serde::{Deserialize, Deserializer, Serialize};
9use std::fs::{self, File, OpenOptions};
10use std::io::{BufRead, BufReader, Write};
11use std::path::{Path, PathBuf};
12use tracing::{debug, warn};
13
14/// Custom deserializer that accepts both String and structured JSON payloads.
15///
16/// Agents sometimes write structured data as JSON objects instead of strings.
17/// This deserializer accepts both formats:
18/// - `"payload": "string"` → `"string"`
19/// - `"payload": {...}` → `"{...}"` (serialized to JSON string)
20/// - `"payload": null` or missing → `""` (empty string)
21fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<String, D::Error>
22where
23    D: Deserializer<'de>,
24{
25    #[derive(Deserialize)]
26    #[serde(untagged)]
27    enum FlexiblePayload {
28        String(String),
29        Object(serde_json::Value),
30    }
31
32    let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
33    Ok(opt
34        .map(|flex| match flex {
35            FlexiblePayload::String(s) => s,
36            FlexiblePayload::Object(serde_json::Value::Null) => String::new(),
37            FlexiblePayload::Object(obj) => {
38                // Serialize the object back to a JSON string
39                serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
40            }
41        })
42        .unwrap_or_default())
43}
44
45/// A logged event record for debugging.
46///
47/// Supports two schemas:
48/// 1. Rich internal format (logged by Ralph):
49///    `{"ts":"2024-01-15T10:23:45Z","iteration":1,"hat":"loop","topic":"task.start","triggered":"planner","payload":"..."}`
50/// 2. Simple agent format (written by agents):
51///    `{"topic":"build.task","payload":"...","ts":"2024-01-15T10:24:12Z"}`
52///
53/// Fields that don't exist in the agent format default to sensible values.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct EventRecord {
56    /// ISO 8601 timestamp.
57    pub ts: String,
58
59    /// Loop iteration number (0 if not provided by agent-written events).
60    #[serde(default)]
61    pub iteration: u32,
62
63    /// Hat that was active when event was published (empty string if not provided).
64    #[serde(default)]
65    pub hat: String,
66
67    /// Event topic.
68    pub topic: String,
69
70    /// Hat that will be triggered by this event.
71    #[serde(skip_serializing_if = "Option::is_none")]
72    pub triggered: Option<String>,
73
74    /// Event content (truncated if large). Defaults to empty string for agent events without payload.
75    /// Accepts both string and object payloads - objects are serialized to JSON strings.
76    #[serde(default, deserialize_with = "deserialize_flexible_payload")]
77    pub payload: String,
78
79    /// How many times this task has blocked (optional).
80    #[serde(skip_serializing_if = "Option::is_none")]
81    pub blocked_count: Option<u32>,
82}
83
84impl EventRecord {
85    /// Maximum payload length before truncation.
86    const MAX_PAYLOAD_LEN: usize = 500;
87
88    /// Creates a new event record.
89    pub fn new(
90        iteration: u32,
91        hat: impl Into<String>,
92        event: &Event,
93        triggered: Option<&HatId>,
94    ) -> Self {
95        let payload = if event.payload.len() > Self::MAX_PAYLOAD_LEN {
96            // Find a valid UTF-8 char boundary at or before MAX_PAYLOAD_LEN.
97            // We walk backwards from the limit until we find a char boundary.
98            let mut truncate_at = Self::MAX_PAYLOAD_LEN;
99            while truncate_at > 0 && !event.payload.is_char_boundary(truncate_at) {
100                truncate_at -= 1;
101            }
102            format!(
103                "{}... [truncated, {} chars total]",
104                &event.payload[..truncate_at],
105                event.payload.chars().count()
106            )
107        } else {
108            event.payload.clone()
109        };
110
111        Self {
112            ts: chrono::Utc::now().to_rfc3339(),
113            iteration,
114            hat: hat.into(),
115            topic: event.topic.to_string(),
116            triggered: triggered.map(|h| h.to_string()),
117            payload,
118            blocked_count: None,
119        }
120    }
121
122    /// Sets the blocked count for this record.
123    pub fn with_blocked_count(mut self, count: u32) -> Self {
124        self.blocked_count = Some(count);
125        self
126    }
127}
128
/// Logger that writes events to a JSONL file.
///
/// The file is opened on the first write, not at construction, so `file`
/// stays `None` until then.
#[derive(Debug)]
pub struct EventLogger {
    /// Path to the events file.
    path: PathBuf,

    /// File handle for appending; `None` until the first record is logged.
    file: Option<File>,
}
137
138impl EventLogger {
139    /// Default path for the events file.
140    pub const DEFAULT_PATH: &'static str = ".ralph/events.jsonl";
141
142    /// Creates a new event logger.
143    ///
144    /// The `.ralph/` directory is created if it doesn't exist.
145    pub fn new(path: impl Into<PathBuf>) -> Self {
146        Self {
147            path: path.into(),
148            file: None,
149        }
150    }
151
152    /// Creates a logger with the default path.
153    pub fn default_path() -> Self {
154        Self::new(Self::DEFAULT_PATH)
155    }
156
157    /// Creates a logger using the events path from a LoopContext.
158    ///
159    /// This reads the timestamped events path from the marker file if it exists,
160    /// falling back to the default events path. This ensures the logger writes
161    /// to the correct location when running in a worktree or other isolated workspace.
162    pub fn from_context(context: &LoopContext) -> Self {
163        // Read timestamped events path from marker file, fall back to default
164        // The marker file contains a relative path like ".ralph/events-20260127-123456.jsonl"
165        // which we resolve relative to the workspace root
166        let events_path = std::fs::read_to_string(context.current_events_marker())
167            .map(|s| {
168                let relative = s.trim();
169                context.workspace().join(relative)
170            })
171            .unwrap_or_else(|_| context.events_path());
172        Self::new(events_path)
173    }
174
175    /// Ensures the parent directory exists and opens the file.
176    fn ensure_open(&mut self) -> std::io::Result<&mut File> {
177        if self.file.is_none() {
178            if let Some(parent) = self.path.parent() {
179                fs::create_dir_all(parent)?;
180            }
181            let file = OpenOptions::new()
182                .create(true)
183                .append(true)
184                .open(&self.path)?;
185            self.file = Some(file);
186        }
187        Ok(self.file.as_mut().unwrap())
188    }
189
190    /// Logs an event record.
191    ///
192    /// Uses a single `write_all` call to ensure the JSON line is written atomically.
193    /// This prevents corruption when multiple processes append to the same file
194    /// concurrently (e.g., during parallel merge queue processing).
195    pub fn log(&mut self, record: &EventRecord) -> std::io::Result<()> {
196        let file = self.ensure_open()?;
197        let mut json = serde_json::to_string(record)?;
198        json.push('\n');
199        // Single write_all ensures atomic append on POSIX with O_APPEND
200        file.write_all(json.as_bytes())?;
201        file.flush()?;
202        debug!(topic = %record.topic, iteration = record.iteration, "Event logged");
203        Ok(())
204    }
205
206    /// Convenience method to log an event directly.
207    pub fn log_event(
208        &mut self,
209        iteration: u32,
210        hat: &str,
211        event: &Event,
212        triggered: Option<&HatId>,
213    ) -> std::io::Result<()> {
214        let record = EventRecord::new(iteration, hat, event, triggered);
215        self.log(&record)
216    }
217
218    /// Returns the path to the log file.
219    pub fn path(&self) -> &Path {
220        &self.path
221    }
222}
223
/// Reader for event history files.
///
/// Holds only the path; the file is opened afresh by each read operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventHistory {
    /// Path to the JSONL events file to read.
    path: PathBuf,
}
228
229impl EventHistory {
230    /// Creates a new history reader.
231    pub fn new(path: impl Into<PathBuf>) -> Self {
232        Self { path: path.into() }
233    }
234
235    /// Creates a reader for the default path.
236    pub fn default_path() -> Self {
237        Self::new(EventLogger::DEFAULT_PATH)
238    }
239
240    /// Creates a history reader using the events path from a LoopContext.
241    ///
242    /// This ensures the reader looks in the correct location when running
243    /// in a worktree or other isolated workspace.
244    pub fn from_context(context: &LoopContext) -> Self {
245        Self::new(context.events_path())
246    }
247
248    /// Returns true if the history file exists.
249    pub fn exists(&self) -> bool {
250        self.path.exists()
251    }
252
253    /// Reads all event records from the file.
254    pub fn read_all(&self) -> std::io::Result<Vec<EventRecord>> {
255        if !self.exists() {
256            return Ok(Vec::new());
257        }
258
259        let file = File::open(&self.path)?;
260        let reader = BufReader::new(file);
261        let mut records = Vec::new();
262
263        for (line_num, line) in reader.lines().enumerate() {
264            let line = line?;
265            if line.trim().is_empty() {
266                continue;
267            }
268            match serde_json::from_str(&line) {
269                Ok(record) => records.push(record),
270                Err(e) => {
271                    warn!(line = line_num + 1, error = %e, "Failed to parse event record");
272                }
273            }
274        }
275
276        Ok(records)
277    }
278
279    /// Reads the last N event records.
280    pub fn read_last(&self, n: usize) -> std::io::Result<Vec<EventRecord>> {
281        let all = self.read_all()?;
282        let start = all.len().saturating_sub(n);
283        Ok(all[start..].to_vec())
284    }
285
286    /// Reads events filtered by topic.
287    pub fn filter_by_topic(&self, topic: &str) -> std::io::Result<Vec<EventRecord>> {
288        let all = self.read_all()?;
289        Ok(all.into_iter().filter(|r| r.topic == topic).collect())
290    }
291
292    /// Reads events filtered by iteration.
293    pub fn filter_by_iteration(&self, iteration: u32) -> std::io::Result<Vec<EventRecord>> {
294        let all = self.read_all()?;
295        Ok(all
296            .into_iter()
297            .filter(|r| r.iteration == iteration)
298            .collect())
299    }
300
301    /// Clears the event history file.
302    pub fn clear(&self) -> std::io::Result<()> {
303        if self.exists() {
304            fs::remove_file(&self.path)?;
305        }
306        Ok(())
307    }
308}
309
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds an event on `topic` carrying `payload`.
    fn make_event(topic: &str, payload: &str) -> Event {
        Event::new(topic, payload)
    }

    /// Creates a scratch directory and returns it with the path of `name` inside it.
    ///
    /// The returned `TempDir` guard must be kept alive for as long as the path is used.
    fn scratch(name: &str) -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join(name);
        (dir, path)
    }

    /// Events written through the logger come back unchanged from the reader.
    #[test]
    fn test_log_and_read() {
        let (_dir, path) = scratch("events.jsonl");
        let planner = HatId::new("planner");

        let mut logger = EventLogger::new(&path);
        let started = make_event("task.start", "Starting task");
        let built = make_event("build.done", "Build complete");
        logger.log_event(1, "loop", &started, Some(&planner)).unwrap();
        logger
            .log_event(2, "builder", &built, Some(&planner))
            .unwrap();

        let records = EventHistory::new(&path).read_all().unwrap();

        assert_eq!(records.len(), 2);
        let first = &records[0];
        assert_eq!(first.topic, "task.start");
        assert_eq!(first.iteration, 1);
        assert_eq!(first.hat, "loop");
        assert_eq!(first.triggered, Some("planner".to_string()));
        assert_eq!(records[1].topic, "build.done");
    }

    /// `read_last` returns the tail of the history in file order.
    #[test]
    fn test_read_last() {
        let (_dir, path) = scratch("events.jsonl");

        let mut logger = EventLogger::new(&path);
        for iteration in 1..=10 {
            let event = make_event("test", &format!("Event {}", iteration));
            logger.log_event(iteration, "hat", &event, None).unwrap();
        }

        let tail = EventHistory::new(&path).read_last(3).unwrap();

        assert_eq!(tail.len(), 3);
        assert_eq!(tail[0].iteration, 8);
        assert_eq!(tail[2].iteration, 10);
    }

    /// Only records with the requested topic are returned.
    #[test]
    fn test_filter_by_topic() {
        let (_dir, path) = scratch("events.jsonl");

        let mut logger = EventLogger::new(&path);
        let done_a = make_event("build.done", "a");
        let blocked_b = make_event("build.blocked", "b");
        let done_c = make_event("build.done", "c");
        logger.log_event(1, "hat", &done_a, None).unwrap();
        logger.log_event(2, "hat", &blocked_b, None).unwrap();
        logger.log_event(3, "hat", &done_c, None).unwrap();

        let blocked = EventHistory::new(&path)
            .filter_by_topic("build.blocked")
            .unwrap();

        assert_eq!(blocked.len(), 1);
        assert_eq!(blocked[0].iteration, 2);
    }

    /// Oversized payloads are shortened and marked as truncated.
    #[test]
    fn test_payload_truncation() {
        let oversized = "x".repeat(1000);
        let record = EventRecord::new(1, "hat", &make_event("test", &oversized), None);

        assert!(record.payload.len() < 1000);
        assert!(record.payload.contains("[truncated"));
    }

    /// Truncation must not split a multi-byte character.
    #[test]
    fn test_payload_truncation_with_multibyte_chars() {
        // ✅ is 3 bytes in UTF-8; the three emoji occupy bytes 498-506, so the
        // 500-byte limit falls inside one of them.
        let payload = format!("{}{}{}", "x".repeat(498), "✅✅✅", "y".repeat(500));

        let event = make_event("test", &payload);
        // Building the record must not panic.
        let record = EventRecord::new(1, "hat", &event, None);

        assert!(record.payload.contains("[truncated"));
        // Walk every character of the result to confirm it is well formed.
        record.payload.chars().for_each(drop);
    }

    /// Missing parent directories are created on the first write.
    #[test]
    fn test_creates_parent_directory() {
        let (_dir, path) = scratch("nested/dir/events.jsonl");

        let mut logger = EventLogger::new(&path);
        logger
            .log_event(1, "hat", &make_event("test", "payload"), None)
            .unwrap();

        assert!(path.exists());
    }

    /// Reading a file that does not exist yields no records.
    #[test]
    fn test_empty_history() {
        let (_dir, path) = scratch("nonexistent.jsonl");

        let history = EventHistory::new(&path);
        assert!(!history.exists());
        assert!(history.read_all().unwrap().is_empty());
    }

    /// Agent-written lines lack `iteration`, `hat` and `triggered`; they must
    /// still parse, with the missing fields defaulted.
    #[test]
    fn test_agent_written_events_without_iteration() {
        let (_dir, path) = scratch("events.jsonl");

        // Simple agent format: {"topic":"...","payload":"...","ts":"..."}
        let mut out = File::create(&path).unwrap();
        writeln!(
            out,
            r#"{{"topic":"build.task","payload":"Implement auth","ts":"2024-01-15T10:00:00Z"}}"#
        )
        .unwrap();
        writeln!(
            out,
            r#"{{"topic":"build.done","ts":"2024-01-15T10:30:00Z"}}"#
        )
        .unwrap();

        let records = EventHistory::new(&path).read_all().unwrap();

        assert_eq!(records.len(), 2);
        let task = &records[0];
        assert_eq!(task.topic, "build.task");
        assert_eq!(task.payload, "Implement auth");
        assert_eq!(task.iteration, 0); // defaulted
        assert_eq!(task.hat, ""); // defaulted
        let done = &records[1];
        assert_eq!(done.topic, "build.done");
        assert_eq!(done.payload, ""); // no payload given
    }

    /// Ralph-logged and agent-written lines can live in the same file.
    #[test]
    fn test_mixed_event_formats() {
        let (_dir, path) = scratch("events.jsonl");

        // Full-format line, written through the logger.
        let mut logger = EventLogger::new(&path);
        let initial = make_event("task.start", "Initial task");
        logger
            .log_event(1, "loop", &initial, Some(&HatId::new("planner")))
            .unwrap();

        // Simple-format line, appended directly as an agent would.
        let mut out = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        writeln!(
            out,
            r#"{{"topic":"build.task","payload":"Agent wrote this","ts":"2024-01-15T10:05:00Z"}}"#
        )
        .unwrap();

        let records = EventHistory::new(&path).read_all().unwrap();

        assert_eq!(records.len(), 2);
        // Ralph's full-format event comes first.
        let rich = &records[0];
        assert_eq!(rich.topic, "task.start");
        assert_eq!(rich.iteration, 1);
        assert_eq!(rich.hat, "loop");
        // The agent's simple-format event follows, with defaults filled in.
        let simple = &records[1];
        assert_eq!(simple.topic, "build.task");
        assert_eq!(simple.iteration, 0);
        assert_eq!(simple.hat, "");
    }

    /// Object payloads (as produced by `ralph emit --json`) are accepted and
    /// stored as JSON text. This was the root cause of
    /// "invalid type: map, expected a string" errors.
    #[test]
    fn test_object_payload_from_ralph_emit_json() {
        let (_dir, path) = scratch("events.jsonl");

        let mut out = File::create(&path).unwrap();

        // Plain string payload.
        writeln!(
            out,
            r#"{{"ts":"2024-01-15T10:00:00Z","topic":"task.start","payload":"implement feature"}}"#
        )
        .unwrap();

        // Object payload.
        writeln!(
            out,
            r#"{{"topic":"task.complete","payload":{{"status":"verified","tasks":["auth","api"]}},"ts":"2024-01-15T10:30:00Z"}}"#
        )
        .unwrap();

        // Object payload with a nested object.
        writeln!(
            out,
            r#"{{"topic":"loop.recovery","payload":{{"status":"recovered","evidence":{{"tests":"pass"}}}},"ts":"2024-01-15T10:45:00Z"}}"#
        )
        .unwrap();

        let records = EventHistory::new(&path).read_all().unwrap();
        assert_eq!(records.len(), 3);

        // The string payload is stored as-is.
        let plain = &records[0];
        assert_eq!(plain.topic, "task.start");
        assert_eq!(plain.payload, "implement feature");

        // The object payload becomes JSON text that parses back.
        let object = &records[1];
        assert_eq!(object.topic, "task.complete");
        assert!(object.payload.contains("\"status\""));
        assert!(object.payload.contains("\"verified\""));
        let reparsed: serde_json::Value = serde_json::from_str(&object.payload).unwrap();
        assert_eq!(reparsed["status"], "verified");

        // Nesting survives the round trip too.
        let nested = &records[2];
        assert_eq!(nested.topic, "loop.recovery");
        let reparsed: serde_json::Value = serde_json::from_str(&nested.payload).unwrap();
        assert_eq!(reparsed["evidence"]["tests"], "pass");
    }
}