Skip to main content

ralph_core/
event_logger.rs

//! Event logging for debugging and post-mortem analysis.
//!
//! Logs all events as JSON lines, by default to `.ralph/events.jsonl` as specified in the
//! event-loop spec (a loop context may redirect them to a timestamped events file).
//! The observer pattern allows hooking into the event bus without modifying routing.
5
6use crate::loop_context::LoopContext;
7use crate::text::floor_char_boundary;
8use ralph_proto::{Event, HatId};
9use serde::{Deserialize, Deserializer, Serialize};
10use std::fs::{self, File, OpenOptions};
11use std::io::{BufRead, BufReader, Write};
12use std::path::{Path, PathBuf};
13use tracing::{debug, warn};
14
15/// Custom deserializer that accepts both String and structured JSON payloads.
16///
17/// Agents sometimes write structured data as JSON objects instead of strings.
18/// This deserializer accepts both formats:
19/// - `"payload": "string"` → `"string"`
20/// - `"payload": {...}` → `"{...}"` (serialized to JSON string)
21/// - `"payload": null` or missing → `""` (empty string)
22fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<String, D::Error>
23where
24    D: Deserializer<'de>,
25{
26    #[derive(Deserialize)]
27    #[serde(untagged)]
28    enum FlexiblePayload {
29        String(String),
30        Object(serde_json::Value),
31    }
32
33    let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
34    Ok(opt
35        .map(|flex| match flex {
36            FlexiblePayload::String(s) => s,
37            FlexiblePayload::Object(serde_json::Value::Null) => String::new(),
38            FlexiblePayload::Object(obj) => {
39                // Serialize the object back to a JSON string
40                serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
41            }
42        })
43        .unwrap_or_default())
44}
45
46/// A logged event record for debugging.
47///
48/// Supports two schemas:
49/// 1. Rich internal format (logged by Ralph):
50///    `{"ts":"2024-01-15T10:23:45Z","iteration":1,"hat":"loop","topic":"task.start","triggered":"planner","payload":"..."}`
51/// 2. Simple agent format (written by agents):
52///    `{"topic":"build.task","payload":"...","ts":"2024-01-15T10:24:12Z"}`
53///
54/// Fields that don't exist in the agent format default to sensible values.
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct EventRecord {
57    /// ISO 8601 timestamp.
58    pub ts: String,
59
60    /// Loop iteration number (0 if not provided by agent-written events).
61    #[serde(default)]
62    pub iteration: u32,
63
64    /// Hat that was active when event was published (empty string if not provided).
65    #[serde(default)]
66    pub hat: String,
67
68    /// Event topic.
69    pub topic: String,
70
71    /// Hat that will be triggered by this event.
72    #[serde(skip_serializing_if = "Option::is_none")]
73    pub triggered: Option<String>,
74
75    /// Event content (truncated if large). Defaults to empty string for agent events without payload.
76    /// Accepts both string and object payloads - objects are serialized to JSON strings.
77    #[serde(default, deserialize_with = "deserialize_flexible_payload")]
78    pub payload: String,
79
80    /// How many times this task has blocked (optional).
81    #[serde(skip_serializing_if = "Option::is_none")]
82    pub blocked_count: Option<u32>,
83
84    /// Wave correlation ID.
85    #[serde(default, skip_serializing_if = "Option::is_none")]
86    pub wave_id: Option<String>,
87
88    /// Index of this event within the wave (0-based).
89    #[serde(default, skip_serializing_if = "Option::is_none")]
90    pub wave_index: Option<u32>,
91
92    /// Total number of events in the wave.
93    #[serde(default, skip_serializing_if = "Option::is_none")]
94    pub wave_total: Option<u32>,
95}
96
97impl EventRecord {
98    /// Maximum payload length before truncation.
99    const MAX_PAYLOAD_LEN: usize = 500;
100
101    /// Creates a new event record.
102    pub fn new(
103        iteration: u32,
104        hat: impl Into<String>,
105        event: &Event,
106        triggered: Option<&HatId>,
107    ) -> Self {
108        let payload = if event.payload.len() > Self::MAX_PAYLOAD_LEN {
109            // Find a valid UTF-8 char boundary at or before MAX_PAYLOAD_LEN.
110            let truncate_at = floor_char_boundary(&event.payload, Self::MAX_PAYLOAD_LEN);
111            format!(
112                "{}... [truncated, {} chars total]",
113                &event.payload[..truncate_at],
114                event.payload.chars().count()
115            )
116        } else {
117            event.payload.clone()
118        };
119
120        Self {
121            ts: chrono::Utc::now().to_rfc3339(),
122            iteration,
123            hat: hat.into(),
124            topic: event.topic.to_string(),
125            triggered: triggered.map(|h| h.to_string()),
126            payload,
127            blocked_count: None,
128            wave_id: event.wave_id.clone(),
129            wave_index: event.wave_index,
130            wave_total: event.wave_total,
131        }
132    }
133
134    /// Sets the blocked count for this record.
135    pub fn with_blocked_count(mut self, count: u32) -> Self {
136        self.blocked_count = Some(count);
137        self
138    }
139}
140
/// Appends [`EventRecord`]s to a JSONL file, one JSON object per line.
///
/// The file is opened lazily on the first write, and the handle is kept for
/// subsequent writes.
#[derive(Debug)]
pub struct EventLogger {
    /// Destination file of the log.
    path: PathBuf,

    /// Append-mode handle; `None` until the first write opens it.
    file: Option<File>,
}
149
150impl EventLogger {
151    /// Default path for the events file.
152    pub const DEFAULT_PATH: &'static str = ".ralph/events.jsonl";
153
154    /// Creates a new event logger.
155    ///
156    /// The `.ralph/` directory is created if it doesn't exist.
157    pub fn new(path: impl Into<PathBuf>) -> Self {
158        Self {
159            path: path.into(),
160            file: None,
161        }
162    }
163
164    /// Creates a logger with the default path.
165    pub fn default_path() -> Self {
166        Self::new(Self::DEFAULT_PATH)
167    }
168
169    /// Creates a logger using the events path from a LoopContext.
170    ///
171    /// This reads the timestamped events path from the marker file if it exists,
172    /// falling back to the default events path. This ensures the logger writes
173    /// to the correct location when running in a worktree or other isolated workspace.
174    pub fn from_context(context: &LoopContext) -> Self {
175        // Read timestamped events path from marker file, fall back to default
176        // The marker file contains a relative path like ".ralph/events-20260127-123456.jsonl"
177        // which we resolve relative to the workspace root
178        let events_path = std::fs::read_to_string(context.current_events_marker())
179            .map(|s| {
180                let relative = s.trim();
181                context.workspace().join(relative)
182            })
183            .unwrap_or_else(|_| context.events_path());
184        Self::new(events_path)
185    }
186
187    /// Ensures the parent directory exists and opens the file.
188    fn ensure_open(&mut self) -> std::io::Result<&mut File> {
189        if self.file.is_none() {
190            if let Some(parent) = self.path.parent() {
191                fs::create_dir_all(parent)?;
192            }
193            let file = OpenOptions::new()
194                .create(true)
195                .append(true)
196                .open(&self.path)?;
197            self.file = Some(file);
198        }
199        Ok(self.file.as_mut().unwrap())
200    }
201
202    /// Logs an event record.
203    ///
204    /// Uses a single `write_all` call to ensure the JSON line is written atomically.
205    /// This prevents corruption when multiple processes append to the same file
206    /// concurrently (e.g., during parallel merge queue processing).
207    pub fn log(&mut self, record: &EventRecord) -> std::io::Result<()> {
208        let file = self.ensure_open()?;
209        let mut json = serde_json::to_string(record)?;
210        json.push('\n');
211        // Single write_all ensures atomic append on POSIX with O_APPEND
212        file.write_all(json.as_bytes())?;
213        file.flush()?;
214        debug!(topic = %record.topic, iteration = record.iteration, "Event logged");
215        Ok(())
216    }
217
218    /// Convenience method to log an event directly.
219    pub fn log_event(
220        &mut self,
221        iteration: u32,
222        hat: &str,
223        event: &Event,
224        triggered: Option<&HatId>,
225    ) -> std::io::Result<()> {
226        let record = EventRecord::new(iteration, hat, event, triggered);
227        self.log(&record)
228    }
229
230    /// Returns the path to the log file.
231    pub fn path(&self) -> &Path {
232        &self.path
233    }
234}
235
/// Reader for a JSONL event history file, such as the one written by `EventLogger`.
#[derive(Debug, Clone)]
pub struct EventHistory {
    /// File the history is read from (and deleted by `clear`).
    path: PathBuf,
}
240
241impl EventHistory {
242    /// Creates a new history reader.
243    pub fn new(path: impl Into<PathBuf>) -> Self {
244        Self { path: path.into() }
245    }
246
247    /// Creates a reader for the default path.
248    pub fn default_path() -> Self {
249        Self::new(EventLogger::DEFAULT_PATH)
250    }
251
252    /// Creates a history reader using the events path from a LoopContext.
253    ///
254    /// This ensures the reader looks in the correct location when running
255    /// in a worktree or other isolated workspace.
256    pub fn from_context(context: &LoopContext) -> Self {
257        Self::new(context.events_path())
258    }
259
260    /// Returns true if the history file exists.
261    pub fn exists(&self) -> bool {
262        self.path.exists()
263    }
264
265    /// Reads all event records from the file.
266    pub fn read_all(&self) -> std::io::Result<Vec<EventRecord>> {
267        if !self.exists() {
268            return Ok(Vec::new());
269        }
270
271        let file = File::open(&self.path)?;
272        let reader = BufReader::new(file);
273        let mut records = Vec::new();
274
275        for (line_num, line) in reader.lines().enumerate() {
276            let line = line?;
277            if line.trim().is_empty() {
278                continue;
279            }
280            match serde_json::from_str(&line) {
281                Ok(record) => records.push(record),
282                Err(e) => {
283                    warn!(line = line_num + 1, error = %e, "Failed to parse event record");
284                }
285            }
286        }
287
288        Ok(records)
289    }
290
291    /// Reads the last N event records.
292    pub fn read_last(&self, n: usize) -> std::io::Result<Vec<EventRecord>> {
293        let all = self.read_all()?;
294        let start = all.len().saturating_sub(n);
295        Ok(all[start..].to_vec())
296    }
297
298    /// Reads events filtered by topic.
299    pub fn filter_by_topic(&self, topic: &str) -> std::io::Result<Vec<EventRecord>> {
300        let all = self.read_all()?;
301        Ok(all.into_iter().filter(|r| r.topic == topic).collect())
302    }
303
304    /// Reads events filtered by iteration.
305    pub fn filter_by_iteration(&self, iteration: u32) -> std::io::Result<Vec<EventRecord>> {
306        let all = self.read_all()?;
307        Ok(all
308            .into_iter()
309            .filter(|r| r.iteration == iteration)
310            .collect())
311    }
312
313    /// Clears the event history file.
314    pub fn clear(&self) -> std::io::Result<()> {
315        if self.exists() {
316            fs::remove_file(&self.path)?;
317        }
318        Ok(())
319    }
320}
321
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a plain event (no wave metadata) with the given topic and payload.
    fn make_event(topic: &str, payload: &str) -> Event {
        Event::new(topic, payload)
    }

    #[test]
    fn test_log_and_read() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut logger = EventLogger::new(&path);

        // Log some events
        let event1 = make_event("task.start", "Starting task");
        let event2 = make_event("build.done", "Build complete");

        logger
            .log_event(1, "loop", &event1, Some(&HatId::new("planner")))
            .unwrap();
        logger
            .log_event(2, "builder", &event2, Some(&HatId::new("planner")))
            .unwrap();

        // Read them back
        let history = EventHistory::new(&path);
        let records = history.read_all().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].topic, "task.start");
        assert_eq!(records[0].iteration, 1);
        assert_eq!(records[0].hat, "loop");
        assert_eq!(records[0].triggered, Some("planner".to_string()));
        assert_eq!(records[1].topic, "build.done");
    }

    #[test]
    fn test_read_last() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut logger = EventLogger::new(&path);

        for i in 1..=10 {
            let event = make_event("test", &format!("Event {}", i));
            logger.log_event(i, "hat", &event, None).unwrap();
        }

        let history = EventHistory::new(&path);
        let last_3 = history.read_last(3).unwrap();

        assert_eq!(last_3.len(), 3);
        assert_eq!(last_3[0].iteration, 8);
        assert_eq!(last_3[2].iteration, 10);
    }

    #[test]
    fn test_read_last_more_than_available() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut logger = EventLogger::new(&path);
        for i in 1..=2 {
            logger
                .log_event(i, "hat", &make_event("test", "payload"), None)
                .unwrap();
        }

        // Asking for more records than exist returns all of them, oldest first.
        let records = EventHistory::new(&path).read_last(10).unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].iteration, 1);
        assert_eq!(records[1].iteration, 2);
    }

    #[test]
    fn test_filter_by_topic() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut logger = EventLogger::new(&path);

        logger
            .log_event(1, "hat", &make_event("build.done", "a"), None)
            .unwrap();
        logger
            .log_event(2, "hat", &make_event("build.blocked", "b"), None)
            .unwrap();
        logger
            .log_event(3, "hat", &make_event("build.done", "c"), None)
            .unwrap();

        let history = EventHistory::new(&path);
        let blocked = history.filter_by_topic("build.blocked").unwrap();

        assert_eq!(blocked.len(), 1);
        assert_eq!(blocked[0].iteration, 2);
    }

    #[test]
    fn test_payload_truncation() {
        let long_payload = "x".repeat(1000);
        let event = make_event("test", &long_payload);
        let record = EventRecord::new(1, "hat", &event, None);

        assert!(record.payload.len() < 1000);
        assert!(record.payload.contains("[truncated"));
    }

    #[test]
    fn test_payload_at_limit_not_truncated() {
        // Exactly MAX_PAYLOAD_LEN bytes is kept verbatim; truncation starts above it.
        let payload = "x".repeat(500);
        let record = EventRecord::new(1, "hat", &make_event("test", &payload), None);

        assert_eq!(record.payload, payload);
    }

    #[test]
    fn test_payload_truncation_with_multibyte_chars() {
        // Multi-byte UTF-8 characters (✅ is 3 bytes) straddle the 500-byte limit,
        // so slicing at the raw byte offset would split a character and panic.
        let mut payload = "x".repeat(498);
        payload.push_str("✅✅✅"); // occupies bytes 498..507
        payload.push_str(&"y".repeat(500));

        let event = make_event("test", &payload);
        // This should NOT panic
        let record = EventRecord::new(1, "hat", &event, None);

        assert!(record.payload.contains("[truncated"));
        // The cut backs off to the boundary before the first ✅ (byte 498),
        // so no partial character is kept.
        assert!(record
            .payload
            .starts_with(&format!("{}...", "x".repeat(498))));
        // The marker reports the original length in characters, not bytes.
        assert!(record
            .payload
            .contains(&format!("{} chars total", 498 + 3 + 500)));
    }

    #[test]
    fn test_creates_parent_directory() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("nested/dir/events.jsonl");

        let mut logger = EventLogger::new(&path);
        let event = make_event("test", "payload");
        logger.log_event(1, "hat", &event, None).unwrap();

        assert!(path.exists());
    }

    #[test]
    fn test_empty_history() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("nonexistent.jsonl");

        let history = EventHistory::new(&path);
        assert!(!history.exists());

        let records = history.read_all().unwrap();
        assert!(records.is_empty());
    }

    #[test]
    fn test_clear_removes_file_and_tolerates_missing_file() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        {
            let mut logger = EventLogger::new(&path);
            logger
                .log_event(1, "hat", &make_event("test", "payload"), None)
                .unwrap();
        }

        let history = EventHistory::new(&path);
        assert!(history.exists());

        history.clear().unwrap();
        assert!(!history.exists());

        // Clearing an already-absent history is not an error.
        history.clear().unwrap();
    }

    #[test]
    fn test_agent_written_events_without_iteration() {
        // Agent events use simple format: {"topic":"...","payload":"...","ts":"..."}
        // They don't include iteration, hat, or triggered fields
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        // Write agent-style events (without iteration field)
        let mut file = File::create(&path).unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.task","payload":"Implement auth","ts":"2024-01-15T10:00:00Z"}}"#
        )
        .unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.done","ts":"2024-01-15T10:30:00Z"}}"#
        )
        .unwrap();

        // Should read without warnings (iteration defaults to 0)
        let history = EventHistory::new(&path);
        let records = history.read_all().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].topic, "build.task");
        assert_eq!(records[0].payload, "Implement auth");
        assert_eq!(records[0].iteration, 0); // Defaults to 0
        assert_eq!(records[0].hat, ""); // Defaults to empty string
        assert_eq!(records[1].topic, "build.done");
        assert_eq!(records[1].payload, ""); // Defaults to empty when not provided
    }

    #[test]
    fn test_mixed_event_formats() {
        // Test that both agent-written and Ralph-logged events can coexist
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        // Write a Ralph-logged event (full format)
        let mut logger = EventLogger::new(&path);
        let event = make_event("task.start", "Initial task");
        logger
            .log_event(1, "loop", &event, Some(&HatId::new("planner")))
            .unwrap();

        // Write an agent-style event (simple format)
        let mut file = OpenOptions::new().append(true).open(&path).unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.task","payload":"Agent wrote this","ts":"2024-01-15T10:05:00Z"}}"#
        )
        .unwrap();

        // Should read both without warnings
        let history = EventHistory::new(&path);
        let records = history.read_all().unwrap();

        assert_eq!(records.len(), 2);
        // First is Ralph's full-format event
        assert_eq!(records[0].topic, "task.start");
        assert_eq!(records[0].iteration, 1);
        assert_eq!(records[0].hat, "loop");
        // Second is agent's simple format
        assert_eq!(records[1].topic, "build.task");
        assert_eq!(records[1].iteration, 0); // Defaulted
        assert_eq!(records[1].hat, ""); // Defaulted
    }

    #[test]
    fn test_event_record_propagates_wave_metadata() {
        let event = make_event("review.file", "src/main.rs").with_wave("w-1a2b3c4d", 1, 3);
        let record = EventRecord::new(1, "dispatcher", &event, None);

        assert_eq!(record.wave_id.as_deref(), Some("w-1a2b3c4d"));
        assert_eq!(record.wave_index, Some(1));
        assert_eq!(record.wave_total, Some(3));
    }

    #[test]
    fn test_event_record_no_wave_metadata() {
        let event = make_event("build.done", "success");
        let record = EventRecord::new(1, "builder", &event, None);

        assert!(record.wave_id.is_none());
        assert!(record.wave_index.is_none());
        assert!(record.wave_total.is_none());
    }

    #[test]
    fn test_event_record_wave_roundtrip_through_jsonl() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut logger = EventLogger::new(&path);

        // Log event with wave metadata
        let event = make_event("review.file", "src/main.rs").with_wave("w-deadbeef", 0, 5);
        logger.log_event(1, "dispatcher", &event, None).unwrap();

        // Log event without wave metadata
        let plain_event = make_event("build.done", "ok");
        logger.log_event(2, "builder", &plain_event, None).unwrap();

        let history = EventHistory::new(&path);
        let records = history.read_all().unwrap();

        assert_eq!(records.len(), 2);
        // First has wave metadata
        assert_eq!(records[0].wave_id.as_deref(), Some("w-deadbeef"));
        assert_eq!(records[0].wave_index, Some(0));
        assert_eq!(records[0].wave_total, Some(5));
        // Second has no wave metadata
        assert!(records[1].wave_id.is_none());
        assert!(records[1].wave_index.is_none());
        assert!(records[1].wave_total.is_none());
    }

    #[test]
    fn test_event_record_wave_fields_not_serialized_when_none() {
        let event = make_event("test", "payload");
        let record = EventRecord::new(1, "hat", &event, None);
        let json = serde_json::to_string(&record).unwrap();
        assert!(!json.contains("wave_id"));
        assert!(!json.contains("wave_index"));
        assert!(!json.contains("wave_total"));
    }

    #[test]
    fn test_event_record_backwards_compat_no_wave_fields() {
        // Simulate reading a JSONL line written before wave support
        let json = r#"{"ts":"2024-01-15T10:00:00Z","iteration":1,"hat":"builder","topic":"build.done","payload":"ok"}"#;
        let record: EventRecord = serde_json::from_str(json).unwrap();
        assert!(record.wave_id.is_none());
        assert!(record.wave_index.is_none());
        assert!(record.wave_total.is_none());
        assert_eq!(record.topic, "build.done");
    }

    #[test]
    fn test_object_payload_from_ralph_emit_json() {
        // Test that `ralph emit --json` object payloads are parsed correctly
        // This was the root cause of "invalid type: map, expected a string" errors
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut file = File::create(&path).unwrap();

        // String payload (normal case)
        writeln!(
            file,
            r#"{{"ts":"2024-01-15T10:00:00Z","topic":"task.start","payload":"implement feature"}}"#
        )
        .unwrap();

        // Object payload (from `ralph emit --json`)
        writeln!(
            file,
            r#"{{"topic":"task.complete","payload":{{"status":"verified","tasks":["auth","api"]}},"ts":"2024-01-15T10:30:00Z"}}"#
        )
        .unwrap();

        // Nested object payload
        writeln!(
            file,
            r#"{{"topic":"loop.recovery","payload":{{"status":"recovered","evidence":{{"tests":"pass"}}}},"ts":"2024-01-15T10:45:00Z"}}"#
        )
        .unwrap();

        let history = EventHistory::new(&path);
        let records = history.read_all().unwrap();

        assert_eq!(records.len(), 3);

        // String payload unchanged
        assert_eq!(records[0].topic, "task.start");
        assert_eq!(records[0].payload, "implement feature");

        // Object payload converted to JSON string
        assert_eq!(records[1].topic, "task.complete");
        assert!(records[1].payload.contains("\"status\""));
        assert!(records[1].payload.contains("\"verified\""));
        // Should be valid JSON
        let parsed: serde_json::Value = serde_json::from_str(&records[1].payload).unwrap();
        assert_eq!(parsed["status"], "verified");

        // Nested object also works
        assert_eq!(records[2].topic, "loop.recovery");
        let parsed: serde_json::Value = serde_json::from_str(&records[2].payload).unwrap();
        assert_eq!(parsed["evidence"]["tests"], "pass");
    }

    #[test]
    fn test_null_and_scalar_payloads() {
        // An explicit null is treated like a missing payload.
        let record: EventRecord = serde_json::from_str(
            r#"{"topic":"build.done","payload":null,"ts":"2024-01-15T10:00:00Z"}"#,
        )
        .unwrap();
        assert_eq!(record.payload, "");

        // Non-string scalars and arrays are stored as their compact JSON encoding.
        let record: EventRecord = serde_json::from_str(
            r#"{"topic":"build.done","payload":42,"ts":"2024-01-15T10:00:00Z"}"#,
        )
        .unwrap();
        assert_eq!(record.payload, "42");

        let record: EventRecord = serde_json::from_str(
            r#"{"topic":"build.done","payload":["a","b"],"ts":"2024-01-15T10:00:00Z"}"#,
        )
        .unwrap();
        assert_eq!(record.payload, r#"["a","b"]"#);
    }
}