Skip to main content

ralph_core/
event_reader.rs

1//! Event reader for consuming events from `.agent/events.jsonl`.
2
3use serde::{Deserialize, Deserializer, Serialize};
4use std::fs::File;
5use std::io::{BufRead, BufReader, Seek, SeekFrom};
6use std::path::PathBuf;
7use tracing::warn;
8
9/// Result of parsing events from a JSONL file.
10///
11/// Contains both successfully parsed events and information about lines
12/// that failed to parse. This supports backpressure validation by allowing
13/// the caller to respond to malformed events.
14#[derive(Debug, Clone, Default)]
15pub struct ParseResult {
16    /// Successfully parsed events.
17    pub events: Vec<Event>,
18    /// Lines that failed to parse.
19    pub malformed: Vec<MalformedLine>,
20}
21
22/// Information about a malformed JSONL line.
23///
24/// Used for backpressure feedback - when agents write invalid JSONL,
25/// this provides details for the `event.malformed` system event.
26#[derive(Debug, Clone, Serialize)]
27pub struct MalformedLine {
28    /// Line number in the file (1-indexed).
29    pub line_number: u64,
30    /// The raw content that failed to parse (truncated if very long).
31    pub content: String,
32    /// The parse error message.
33    pub error: String,
34}
35
36impl MalformedLine {
37    /// Maximum content length before truncation.
38    const MAX_CONTENT_LEN: usize = 100;
39
40    /// Creates a new MalformedLine, truncating content if needed.
41    pub fn new(line_number: u64, content: &str, error: String) -> Self {
42        let content = if content.len() > Self::MAX_CONTENT_LEN {
43            format!("{}...", &content[..Self::MAX_CONTENT_LEN])
44        } else {
45            content.to_string()
46        };
47        Self {
48            line_number,
49            content,
50            error,
51        }
52    }
53}
54
55/// Custom deserializer that accepts both String and structured JSON payloads.
56///
57/// Agents sometimes write structured data as JSON objects instead of strings.
58/// This deserializer accepts both formats:
59/// - `"payload": "string"` → `Some("string")`
60/// - `"payload": {...}` → `Some("{...}")` (serialized to JSON string)
61/// - `"payload": null` → `None`
62/// - missing field → `None`
63fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
64where
65    D: Deserializer<'de>,
66{
67    #[derive(Deserialize)]
68    #[serde(untagged)]
69    enum FlexiblePayload {
70        String(String),
71        Object(serde_json::Value),
72    }
73
74    let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
75    Ok(opt.map(|flex| match flex {
76        FlexiblePayload::String(s) => s,
77        FlexiblePayload::Object(obj) => {
78            // Serialize the object back to a JSON string
79            serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
80        }
81    }))
82}
83
84/// A simplified event for reading from JSONL.
85#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
86pub struct Event {
87    pub topic: String,
88    #[serde(
89        default,
90        skip_serializing_if = "Option::is_none",
91        deserialize_with = "deserialize_flexible_payload"
92    )]
93    pub payload: Option<String>,
94    pub ts: String,
95}
96
97/// Reads new events from `.agent/events.jsonl` since last read.
98pub struct EventReader {
99    path: PathBuf,
100    position: u64,
101}
102
103impl EventReader {
104    /// Creates a new event reader for the given path.
105    pub fn new(path: impl Into<PathBuf>) -> Self {
106        Self {
107            path: path.into(),
108            position: 0,
109        }
110    }
111
112    /// Reads new events since the last read.
113    ///
114    /// Returns a `ParseResult` containing both successfully parsed events
115    /// and information about malformed lines. This enables backpressure
116    /// validation - the caller can emit `event.malformed` events and
117    /// track consecutive failures.
118    ///
119    /// # Errors
120    ///
121    /// Returns an error if the file cannot be opened or read.
122    pub fn read_new_events(&mut self) -> std::io::Result<ParseResult> {
123        if !self.path.exists() {
124            return Ok(ParseResult::default());
125        }
126
127        let mut file = File::open(&self.path)?;
128        file.seek(SeekFrom::Start(self.position))?;
129
130        let reader = BufReader::new(file);
131        let mut result = ParseResult::default();
132        let mut current_pos = self.position;
133        let mut line_number = self.count_lines_before_position();
134
135        for line in reader.lines() {
136            let line = line?;
137            let line_bytes = line.len() as u64 + 1; // +1 for newline
138            line_number += 1;
139
140            if line.trim().is_empty() {
141                current_pos += line_bytes;
142                continue;
143            }
144
145            match serde_json::from_str::<Event>(&line) {
146                Ok(event) => result.events.push(event),
147                Err(e) => {
148                    warn!(error = %e, line_number = line_number, "Malformed JSON line");
149                    result.malformed.push(MalformedLine::new(
150                        line_number,
151                        &line,
152                        e.to_string(),
153                    ));
154                }
155            }
156
157            current_pos += line_bytes;
158        }
159
160        self.position = current_pos;
161        Ok(result)
162    }
163
164    /// Counts lines before the current position (for line numbering).
165    fn count_lines_before_position(&self) -> u64 {
166        if self.position == 0 || !self.path.exists() {
167            return 0;
168        }
169        // Read file up to position and count newlines
170        if let Ok(file) = File::open(&self.path) {
171            let reader = BufReader::new(file);
172            let mut count = 0u64;
173            let mut bytes_read = 0u64;
174            for line in reader.lines() {
175                if let Ok(line) = line {
176                    bytes_read += line.len() as u64 + 1;
177                    if bytes_read > self.position {
178                        break;
179                    }
180                    count += 1;
181                } else {
182                    break;
183                }
184            }
185            count
186        } else {
187            0
188        }
189    }
190
191    /// Returns the current file position.
192    pub fn position(&self) -> u64 {
193        self.position
194    }
195
196    /// Resets the position to the start of the file.
197    pub fn reset(&mut self) {
198        self.position = 0;
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Creates a temp file with each entry of `lines` written as one
    /// newline-terminated line.
    fn jsonl_file(lines: &[&str]) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        for line in lines {
            writeln!(file, "{line}").unwrap();
        }
        file.flush().unwrap();
        file
    }

    /// Reads everything currently in `file` using a brand-new reader.
    fn read_all(file: &NamedTempFile) -> ParseResult {
        EventReader::new(file.path()).read_new_events().unwrap()
    }

    #[test]
    fn test_read_new_events() {
        let file = jsonl_file(&[
            r#"{"topic":"test","payload":"hello","ts":"2024-01-01T00:00:00Z"}"#,
            r#"{"topic":"test2","ts":"2024-01-01T00:00:01Z"}"#,
        ]);

        let ParseResult { events, malformed } = read_all(&file);

        assert_eq!(events.len(), 2);
        assert_eq!(events[0].topic, "test");
        assert_eq!(events[0].payload.as_deref(), Some("hello"));
        assert_eq!(events[1].topic, "test2");
        assert!(events[1].payload.is_none());
        assert!(malformed.is_empty());
    }

    #[test]
    fn test_tracks_position() {
        let mut file = jsonl_file(&[r#"{"topic":"first","ts":"2024-01-01T00:00:00Z"}"#]);
        let mut reader = EventReader::new(file.path());

        let first_batch = reader.read_new_events().unwrap();
        assert_eq!(first_batch.events.len(), 1);

        // Append one more event after the first read.
        writeln!(file, r#"{{"topic":"second","ts":"2024-01-01T00:00:01Z"}}"#).unwrap();
        file.flush().unwrap();

        // Only the newly appended event should come back.
        let second_batch = reader.read_new_events().unwrap();
        assert_eq!(second_batch.events.len(), 1);
        assert_eq!(second_batch.events[0].topic, "second");
    }

    #[test]
    fn test_missing_file() {
        let result = EventReader::new("/nonexistent/path.jsonl")
            .read_new_events()
            .unwrap();
        assert!(result.events.is_empty());
        assert!(result.malformed.is_empty());
    }

    #[test]
    fn test_captures_malformed_lines() {
        let file = jsonl_file(&[
            r#"{"topic":"good","ts":"2024-01-01T00:00:00Z"}"#,
            "{corrupt json}",
            r#"{"topic":"also_good","ts":"2024-01-01T00:00:01Z"}"#,
        ]);

        let result = read_all(&file);

        // Both well-formed lines survive the bad one in between.
        let topics: Vec<&str> = result.events.iter().map(|e| e.topic.as_str()).collect();
        assert_eq!(topics, ["good", "also_good"]);

        // The bad line is reported with its 1-indexed position.
        assert_eq!(result.malformed.len(), 1);
        let bad = &result.malformed[0];
        assert_eq!(bad.line_number, 2);
        assert!(bad.content.contains("corrupt json"));
        assert!(!bad.error.is_empty());
    }

    #[test]
    fn test_empty_file() {
        let file = NamedTempFile::new().unwrap();
        let result = read_all(&file);
        assert!(result.events.is_empty());
        assert!(result.malformed.is_empty());
    }

    #[test]
    fn test_reset_position() {
        let file = jsonl_file(&[r#"{"topic":"test","ts":"2024-01-01T00:00:00Z"}"#]);
        let mut reader = EventReader::new(file.path());

        reader.read_new_events().unwrap();
        assert!(reader.position() > 0);

        reader.reset();
        assert_eq!(reader.position(), 0);

        // After a reset the same event is delivered again.
        assert_eq!(reader.read_new_events().unwrap().events.len(), 1);
    }

    #[test]
    fn test_structured_payload_as_object() {
        // A JSON object in `payload` is stored as its JSON text.
        let file = jsonl_file(&[
            r#"{"topic":"review.done","payload":{"status":"approved","files":["a.rs","b.rs"]},"ts":"2024-01-01T00:00:00Z"}"#,
        ]);

        let result = read_all(&file);
        assert_eq!(result.events.len(), 1);
        let event = &result.events[0];
        assert_eq!(event.topic, "review.done");

        let payload = event.payload.as_deref().unwrap();
        for key in ["\"status\"", "\"approved\"", "\"files\""] {
            assert!(payload.contains(key), "payload missing {key}: {payload}");
        }

        // The stored text round-trips as JSON.
        let parsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed["status"], "approved");
    }

    #[test]
    fn test_mixed_payload_formats() {
        let file = jsonl_file(&[
            // String payload
            r#"{"topic":"task.start","payload":"Start work","ts":"2024-01-01T00:00:00Z"}"#,
            // Object payload
            r#"{"topic":"task.done","payload":{"result":"success"},"ts":"2024-01-01T00:00:01Z"}"#,
            // No payload
            r#"{"topic":"heartbeat","ts":"2024-01-01T00:00:02Z"}"#,
        ]);

        let events = read_all(&file).events;
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].payload.as_deref(), Some("Start work"));
        assert!(events[1].payload.as_deref().unwrap().contains("\"result\""));
        assert_eq!(events[2].payload, None);
    }

    #[test]
    fn test_nested_object_payload() {
        let file = jsonl_file(&[
            r#"{"topic":"analysis","payload":{"issues":[{"file":"test.rs","line":42,"severity":"major"}],"approval":"conditional"},"ts":"2024-01-01T00:00:00Z"}"#,
        ]);

        let result = read_all(&file);
        assert_eq!(result.events.len(), 1);

        // Nested arrays and objects survive the round trip through a string.
        let payload = result.events[0].payload.as_deref().unwrap();
        let parsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed["issues"][0]["file"], "test.rs");
        assert_eq!(parsed["issues"][0]["line"], 42);
        assert_eq!(parsed["approval"], "conditional");
    }

    #[test]
    fn test_mixed_valid_invalid_handling() {
        let file = jsonl_file(&[
            r#"{"topic":"valid1","ts":"2024-01-01T00:00:00Z"}"#,
            "not valid json at all",
            r#"{"topic":"valid2","ts":"2024-01-01T00:00:01Z"}"#,
        ]);

        let result = read_all(&file);
        assert_eq!(result.events.len(), 2);
        assert_eq!(result.malformed.len(), 1);
        let topics: Vec<&str> = result.events.iter().map(|e| e.topic.as_str()).collect();
        assert_eq!(topics, ["valid1", "valid2"]);
    }
}