Skip to main content

ralph_core/
event_logger.rs

1//! Event logging for debugging and post-mortem analysis.
2//!
3//! Logs all events to `.agent/events.jsonl` as specified in the event-loop spec.
4//! The observer pattern allows hooking into the event bus without modifying routing.
5
6use ralph_proto::{Event, HatId};
7use serde::{Deserialize, Deserializer, Serialize};
8use std::fs::{self, File, OpenOptions};
9use std::io::{BufRead, BufReader, Write};
10use std::path::{Path, PathBuf};
11use tracing::{debug, warn};
12
13/// Custom deserializer that accepts both String and structured JSON payloads.
14///
15/// Agents sometimes write structured data as JSON objects instead of strings.
16/// This deserializer accepts both formats:
17/// - `"payload": "string"` → `"string"`
18/// - `"payload": {...}` → `"{...}"` (serialized to JSON string)
19/// - `"payload": null` or missing → `""` (empty string)
20fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<String, D::Error>
21where
22    D: Deserializer<'de>,
23{
24    #[derive(Deserialize)]
25    #[serde(untagged)]
26    enum FlexiblePayload {
27        String(String),
28        Object(serde_json::Value),
29    }
30
31    let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
32    Ok(opt
33        .map(|flex| match flex {
34            FlexiblePayload::String(s) => s,
35            FlexiblePayload::Object(serde_json::Value::Null) => String::new(),
36            FlexiblePayload::Object(obj) => {
37                // Serialize the object back to a JSON string
38                serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
39            }
40        })
41        .unwrap_or_default())
42}
43
/// One line of the event log, as written to and read back from the JSONL file.
///
/// Supports two schemas:
/// 1. Rich internal format (logged by Ralph):
///    `{"ts":"2024-01-15T10:23:45Z","iteration":1,"hat":"loop","topic":"task.start","triggered":"planner","payload":"..."}`
/// 2. Simple agent format (written by agents):
///    `{"topic":"build.task","payload":"...","ts":"2024-01-15T10:24:12Z"}`
///
/// Fields that the agent format omits fall back to their defaults (`0`, empty
/// string, or `None`). `ts` and `topic` carry no default and must be present
/// for a line to parse.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventRecord {
    /// ISO 8601 timestamp. Required when deserializing.
    pub ts: String,

    /// Loop iteration number (0 if not provided by agent-written events).
    #[serde(default)]
    pub iteration: u32,

    /// Hat that was active when event was published (empty string if not provided).
    #[serde(default)]
    pub hat: String,

    /// Event topic. Required when deserializing.
    pub topic: String,

    /// Hat that will be triggered by this event.
    /// Left out of the serialized JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub triggered: Option<String>,

    /// Event content. `EventRecord::new` truncates it when it exceeds
    /// `MAX_PAYLOAD_LEN` bytes. Defaults to empty string for agent events without payload.
    /// Accepts both string and object payloads - objects are serialized to JSON strings.
    #[serde(default, deserialize_with = "deserialize_flexible_payload")]
    pub payload: String,

    /// How many times this task has blocked (optional).
    /// Left out of the serialized JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub blocked_count: Option<u32>,
}
82
83impl EventRecord {
84    /// Maximum payload length before truncation.
85    const MAX_PAYLOAD_LEN: usize = 500;
86
87    /// Creates a new event record.
88    pub fn new(
89        iteration: u32,
90        hat: impl Into<String>,
91        event: &Event,
92        triggered: Option<&HatId>,
93    ) -> Self {
94        let payload = if event.payload.len() > Self::MAX_PAYLOAD_LEN {
95            // Find a valid UTF-8 char boundary at or before MAX_PAYLOAD_LEN.
96            // We walk backwards from the limit until we find a char boundary.
97            let mut truncate_at = Self::MAX_PAYLOAD_LEN;
98            while truncate_at > 0 && !event.payload.is_char_boundary(truncate_at) {
99                truncate_at -= 1;
100            }
101            format!(
102                "{}... [truncated, {} chars total]",
103                &event.payload[..truncate_at],
104                event.payload.chars().count()
105            )
106        } else {
107            event.payload.clone()
108        };
109
110        Self {
111            ts: chrono::Utc::now().to_rfc3339(),
112            iteration,
113            hat: hat.into(),
114            topic: event.topic.to_string(),
115            triggered: triggered.map(|h| h.to_string()),
116            payload,
117            blocked_count: None,
118        }
119    }
120
121    /// Sets the blocked count for this record.
122    pub fn with_blocked_count(mut self, count: u32) -> Self {
123        self.blocked_count = Some(count);
124        self
125    }
126}
127
128/// Logger that writes events to a JSONL file.
129pub struct EventLogger {
130    /// Path to the events file.
131    path: PathBuf,
132
133    /// File handle for appending.
134    file: Option<File>,
135}
136
137impl EventLogger {
138    /// Default path for the events file.
139    pub const DEFAULT_PATH: &'static str = ".agent/events.jsonl";
140
141    /// Creates a new event logger.
142    ///
143    /// The `.agent/` directory is created if it doesn't exist.
144    pub fn new(path: impl Into<PathBuf>) -> Self {
145        Self {
146            path: path.into(),
147            file: None,
148        }
149    }
150
151    /// Creates a logger with the default path.
152    pub fn default_path() -> Self {
153        Self::new(Self::DEFAULT_PATH)
154    }
155
156    /// Ensures the parent directory exists and opens the file.
157    fn ensure_open(&mut self) -> std::io::Result<&mut File> {
158        if self.file.is_none() {
159            if let Some(parent) = self.path.parent() {
160                fs::create_dir_all(parent)?;
161            }
162            let file = OpenOptions::new()
163                .create(true)
164                .append(true)
165                .open(&self.path)?;
166            self.file = Some(file);
167        }
168        Ok(self.file.as_mut().unwrap())
169    }
170
171    /// Logs an event record.
172    pub fn log(&mut self, record: &EventRecord) -> std::io::Result<()> {
173        let file = self.ensure_open()?;
174        let json = serde_json::to_string(record)?;
175        writeln!(file, "{}", json)?;
176        file.flush()?;
177        debug!(topic = %record.topic, iteration = record.iteration, "Event logged");
178        Ok(())
179    }
180
181    /// Convenience method to log an event directly.
182    pub fn log_event(
183        &mut self,
184        iteration: u32,
185        hat: &str,
186        event: &Event,
187        triggered: Option<&HatId>,
188    ) -> std::io::Result<()> {
189        let record = EventRecord::new(iteration, hat, event, triggered);
190        self.log(&record)
191    }
192
193    /// Returns the path to the log file.
194    pub fn path(&self) -> &Path {
195        &self.path
196    }
197}
198
199/// Reader for event history files.
200pub struct EventHistory {
201    path: PathBuf,
202}
203
204impl EventHistory {
205    /// Creates a new history reader.
206    pub fn new(path: impl Into<PathBuf>) -> Self {
207        Self { path: path.into() }
208    }
209
210    /// Creates a reader for the default path.
211    pub fn default_path() -> Self {
212        Self::new(EventLogger::DEFAULT_PATH)
213    }
214
215    /// Returns true if the history file exists.
216    pub fn exists(&self) -> bool {
217        self.path.exists()
218    }
219
220    /// Reads all event records from the file.
221    pub fn read_all(&self) -> std::io::Result<Vec<EventRecord>> {
222        if !self.exists() {
223            return Ok(Vec::new());
224        }
225
226        let file = File::open(&self.path)?;
227        let reader = BufReader::new(file);
228        let mut records = Vec::new();
229
230        for (line_num, line) in reader.lines().enumerate() {
231            let line = line?;
232            if line.trim().is_empty() {
233                continue;
234            }
235            match serde_json::from_str(&line) {
236                Ok(record) => records.push(record),
237                Err(e) => {
238                    warn!(line = line_num + 1, error = %e, "Failed to parse event record");
239                }
240            }
241        }
242
243        Ok(records)
244    }
245
246    /// Reads the last N event records.
247    pub fn read_last(&self, n: usize) -> std::io::Result<Vec<EventRecord>> {
248        let all = self.read_all()?;
249        let start = all.len().saturating_sub(n);
250        Ok(all[start..].to_vec())
251    }
252
253    /// Reads events filtered by topic.
254    pub fn filter_by_topic(&self, topic: &str) -> std::io::Result<Vec<EventRecord>> {
255        let all = self.read_all()?;
256        Ok(all.into_iter().filter(|r| r.topic == topic).collect())
257    }
258
259    /// Reads events filtered by iteration.
260    pub fn filter_by_iteration(&self, iteration: u32) -> std::io::Result<Vec<EventRecord>> {
261        let all = self.read_all()?;
262        Ok(all
263            .into_iter()
264            .filter(|r| r.iteration == iteration)
265            .collect())
266    }
267
268    /// Clears the event history file.
269    pub fn clear(&self) -> std::io::Result<()> {
270        if self.exists() {
271            fs::remove_file(&self.path)?;
272        }
273        Ok(())
274    }
275}
276
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds an event with the given topic and payload.
    fn make_event(topic: &str, payload: &str) -> Event {
        Event::new(topic, payload)
    }

    /// Records written by the logger come back unchanged through the reader.
    #[test]
    fn test_log_and_read() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut logger = EventLogger::new(&path);

        // Log some events
        let event1 = make_event("task.start", "Starting task");
        let event2 = make_event("build.done", "Build complete");

        logger
            .log_event(1, "loop", &event1, Some(&HatId::new("planner")))
            .unwrap();
        logger
            .log_event(2, "builder", &event2, Some(&HatId::new("planner")))
            .unwrap();

        // Read them back
        let history = EventHistory::new(&path);
        let records = history.read_all().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].topic, "task.start");
        assert_eq!(records[0].iteration, 1);
        assert_eq!(records[0].hat, "loop");
        assert_eq!(records[0].triggered, Some("planner".to_string()));
        assert_eq!(records[1].topic, "build.done");
    }

    /// `read_last` returns the tail of the log in original order.
    #[test]
    fn test_read_last() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut logger = EventLogger::new(&path);

        for i in 1..=10 {
            let event = make_event("test", &format!("Event {}", i));
            logger.log_event(i, "hat", &event, None).unwrap();
        }

        let history = EventHistory::new(&path);
        let last_3 = history.read_last(3).unwrap();

        assert_eq!(last_3.len(), 3);
        assert_eq!(last_3[0].iteration, 8);
        assert_eq!(last_3[2].iteration, 10);
    }

    /// Asking for more records than exist returns everything, without panicking.
    #[test]
    fn test_read_last_more_than_available() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut logger = EventLogger::new(&path);
        for i in 1..=2 {
            let event = make_event("test", "payload");
            logger.log_event(i, "hat", &event, None).unwrap();
        }

        let history = EventHistory::new(&path);
        let records = history.read_last(5).unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].iteration, 1);
        assert_eq!(records[1].iteration, 2);
    }

    /// Topic filtering matches the topic exactly.
    #[test]
    fn test_filter_by_topic() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut logger = EventLogger::new(&path);

        logger
            .log_event(1, "hat", &make_event("build.done", "a"), None)
            .unwrap();
        logger
            .log_event(2, "hat", &make_event("build.blocked", "b"), None)
            .unwrap();
        logger
            .log_event(3, "hat", &make_event("build.done", "c"), None)
            .unwrap();

        let history = EventHistory::new(&path);
        let blocked = history.filter_by_topic("build.blocked").unwrap();

        assert_eq!(blocked.len(), 1);
        assert_eq!(blocked[0].iteration, 2);
    }

    /// An over-long ASCII payload is cut at exactly the limit and annotated.
    #[test]
    fn test_payload_truncation() {
        let long_payload = "x".repeat(1000);
        let event = make_event("test", &long_payload);
        let record = EventRecord::new(1, "hat", &event, None);

        let expected = format!(
            "{}... [truncated, 1000 chars total]",
            "x".repeat(EventRecord::MAX_PAYLOAD_LEN)
        );
        assert_eq!(record.payload, expected);
    }

    /// A payload of exactly the limit is stored untouched.
    #[test]
    fn test_payload_at_limit_is_not_truncated() {
        let payload = "x".repeat(EventRecord::MAX_PAYLOAD_LEN);
        let event = make_event("test", &payload);
        let record = EventRecord::new(1, "hat", &event, None);

        assert_eq!(record.payload, payload);
    }

    /// Truncation must never split a multi-byte character.
    #[test]
    fn test_payload_truncation_with_multibyte_chars() {
        // ✅ is 3 bytes in UTF-8. With 498 ASCII bytes in front, the first
        // emoji occupies bytes 498-500, so the 500-byte limit falls inside it
        // and the cut has to back off to byte 498.
        let mut payload = "x".repeat(498);
        payload.push_str("✅✅✅"); // 3 emojis at bytes 498-506
        payload.push_str(&"y".repeat(500));

        let event = make_event("test", &payload);
        // This should NOT panic
        let record = EventRecord::new(1, "hat", &event, None);

        // 498 + 3 + 500 = 1001 characters in the original payload.
        let expected = format!("{}... [truncated, 1001 chars total]", "x".repeat(498));
        assert_eq!(record.payload, expected);
    }

    /// Missing parent directories are created on first write.
    #[test]
    fn test_creates_parent_directory() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("nested/dir/events.jsonl");

        let mut logger = EventLogger::new(&path);
        let event = make_event("test", "payload");
        logger.log_event(1, "hat", &event, None).unwrap();

        assert!(path.exists());
    }

    /// A history file that does not exist reads as empty rather than erroring.
    #[test]
    fn test_empty_history() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("nonexistent.jsonl");

        let history = EventHistory::new(&path);
        assert!(!history.exists());

        let records = history.read_all().unwrap();
        assert!(records.is_empty());
    }

    /// `clear` deletes the file and is a no-op when it is already gone.
    #[test]
    fn test_clear_removes_file_and_is_idempotent() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut logger = EventLogger::new(&path);
        logger
            .log_event(1, "hat", &make_event("test", "payload"), None)
            .unwrap();
        // Release the handle before deleting the file.
        drop(logger);

        let history = EventHistory::new(&path);
        assert!(history.exists());

        history.clear().unwrap();
        assert!(!history.exists());

        // Clearing again must still succeed.
        history.clear().unwrap();
        assert!(history.read_all().unwrap().is_empty());
    }

    #[test]
    fn test_agent_written_events_without_iteration() {
        // Agent events use simple format: {"topic":"...","payload":"...","ts":"..."}
        // They don't include iteration, hat, or triggered fields
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        // Write agent-style events (without iteration field)
        let mut file = File::create(&path).unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.task","payload":"Implement auth","ts":"2024-01-15T10:00:00Z"}}"#
        )
        .unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.done","ts":"2024-01-15T10:30:00Z"}}"#
        )
        .unwrap();

        // Should read without warnings (iteration defaults to 0)
        let history = EventHistory::new(&path);
        let records = history.read_all().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].topic, "build.task");
        assert_eq!(records[0].payload, "Implement auth");
        assert_eq!(records[0].iteration, 0); // Defaults to 0
        assert_eq!(records[0].hat, ""); // Defaults to empty string
        assert_eq!(records[1].topic, "build.done");
        assert_eq!(records[1].payload, ""); // Defaults to empty when not provided
    }

    /// An explicit `null` payload is read as an empty string.
    #[test]
    fn test_null_payload_defaults_to_empty() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut file = File::create(&path).unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.done","payload":null,"ts":"2024-01-15T10:30:00Z"}}"#
        )
        .unwrap();

        let history = EventHistory::new(&path);
        let records = history.read_all().unwrap();

        assert_eq!(records.len(), 1);
        assert_eq!(records[0].topic, "build.done");
        assert_eq!(records[0].payload, "");
    }

    #[test]
    fn test_mixed_event_formats() {
        // Test that both agent-written and Ralph-logged events can coexist
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        // Write a Ralph-logged event (full format)
        let mut logger = EventLogger::new(&path);
        let event = make_event("task.start", "Initial task");
        logger.log_event(1, "loop", &event, Some(&HatId::new("planner"))).unwrap();

        // Write an agent-style event (simple format)
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.task","payload":"Agent wrote this","ts":"2024-01-15T10:05:00Z"}}"#
        )
        .unwrap();

        // Should read both without warnings
        let history = EventHistory::new(&path);
        let records = history.read_all().unwrap();

        assert_eq!(records.len(), 2);
        // First is Ralph's full-format event
        assert_eq!(records[0].topic, "task.start");
        assert_eq!(records[0].iteration, 1);
        assert_eq!(records[0].hat, "loop");
        // Second is agent's simple format
        assert_eq!(records[1].topic, "build.task");
        assert_eq!(records[1].iteration, 0); // Defaulted
        assert_eq!(records[1].hat, ""); // Defaulted
    }

    #[test]
    fn test_object_payload_from_ralph_emit_json() {
        // Test that `ralph emit --json` object payloads are parsed correctly
        // This was the root cause of "invalid type: map, expected a string" errors
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("events.jsonl");

        let mut file = File::create(&path).unwrap();

        // String payload (normal case)
        writeln!(
            file,
            r#"{{"ts":"2024-01-15T10:00:00Z","topic":"task.start","payload":"implement feature"}}"#
        )
        .unwrap();

        // Object payload (from `ralph emit --json`)
        writeln!(
            file,
            r#"{{"topic":"task.complete","payload":{{"status":"verified","tasks":["auth","api"]}},"ts":"2024-01-15T10:30:00Z"}}"#
        )
        .unwrap();

        // Nested object payload
        writeln!(
            file,
            r#"{{"topic":"loop.recovery","payload":{{"status":"recovered","evidence":{{"tests":"pass"}}}},"ts":"2024-01-15T10:45:00Z"}}"#
        )
        .unwrap();

        let history = EventHistory::new(&path);
        let records = history.read_all().unwrap();

        assert_eq!(records.len(), 3);

        // String payload unchanged
        assert_eq!(records[0].topic, "task.start");
        assert_eq!(records[0].payload, "implement feature");

        // Object payload converted to JSON string
        assert_eq!(records[1].topic, "task.complete");
        assert!(records[1].payload.contains("\"status\""));
        assert!(records[1].payload.contains("\"verified\""));
        // Should be valid JSON
        let parsed: serde_json::Value = serde_json::from_str(&records[1].payload).unwrap();
        assert_eq!(parsed["status"], "verified");

        // Nested object also works
        assert_eq!(records[2].topic, "loop.recovery");
        let parsed: serde_json::Value = serde_json::from_str(&records[2].payload).unwrap();
        assert_eq!(parsed["evidence"]["tests"], "pass");
    }
}