Skip to main content

ralph_core/
event_reader.rs

1//! Event reader for consuming events from `.ralph/events.jsonl`.
2
3use serde::{Deserialize, Deserializer, Serialize};
4use std::fs::File;
5use std::io::{BufRead, BufReader, Seek, SeekFrom};
6use std::path::PathBuf;
7use tracing::warn;
8
/// Result of parsing events from a JSONL file.
///
/// Contains both successfully parsed events and information about lines
/// that failed to parse. This supports backpressure validation by allowing
/// the caller to respond to malformed events.
///
/// Blank (whitespace-only) lines are skipped by the reader and show up in
/// neither list. The `Default` value has both lists empty, which is also
/// what the reader returns when the events file does not exist.
#[derive(Debug, Clone, Default)]
pub struct ParseResult {
    /// Successfully parsed events, in the order they were read from the file.
    pub events: Vec<Event>,
    /// Lines that failed to parse, in the order they were read from the file.
    pub malformed: Vec<MalformedLine>,
}
21
/// Information about a malformed JSONL line.
///
/// Used for backpressure feedback - when agents write invalid JSONL,
/// this provides details for the `event.malformed` system event.
///
/// The type is `Serialize` only; it is produced by the reader and never
/// parsed back from JSON in this file.
#[derive(Debug, Clone, Serialize)]
pub struct MalformedLine {
    /// Line number in the file (1-indexed).
    pub line_number: u64,
    /// The raw content that failed to parse. When built through
    /// `MalformedLine::new`, content longer than 100 bytes is cut to at most
    /// 100 bytes and suffixed with `...`.
    pub content: String,
    /// The parse error message (the `Display` text of the parser error).
    pub error: String,
}
35
36impl MalformedLine {
37    /// Maximum content length before truncation.
38    const MAX_CONTENT_LEN: usize = 100;
39
40    /// Creates a new MalformedLine, truncating content if needed.
41    pub fn new(line_number: u64, content: &str, error: String) -> Self {
42        let content = if content.len() > Self::MAX_CONTENT_LEN {
43            format!("{}...", &content[..Self::MAX_CONTENT_LEN])
44        } else {
45            content.to_string()
46        };
47        Self {
48            line_number,
49            content,
50            error,
51        }
52    }
53}
54
55/// Custom deserializer that accepts both String and structured JSON payloads.
56///
57/// Agents sometimes write structured data as JSON objects instead of strings.
58/// This deserializer accepts both formats:
59/// - `"payload": "string"` → `Some("string")`
60/// - `"payload": {...}` → `Some("{...}")` (serialized to JSON string)
61/// - `"payload": null` → `None`
62/// - missing field → `None`
63fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
64where
65    D: Deserializer<'de>,
66{
67    #[derive(Deserialize)]
68    #[serde(untagged)]
69    enum FlexiblePayload {
70        String(String),
71        Object(serde_json::Value),
72    }
73
74    let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
75    Ok(opt.map(|flex| match flex {
76        FlexiblePayload::String(s) => s,
77        FlexiblePayload::Object(obj) => {
78            // Serialize the object back to a JSON string
79            serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
80        }
81    }))
82}
83
/// A simplified event for reading from JSONL.
///
/// Each non-blank line of the events file is expected to be one JSON object
/// of this shape. `topic` and `ts` are required; `payload` is optional.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Event {
    /// Topic of the event (the tests in this file use values such as
    /// `task.start` and `review.done`).
    pub topic: String,
    /// Optional payload, always held as a string.
    ///
    /// On input, a JSON string is taken as-is and any other non-null JSON
    /// value is re-serialized to its JSON text; `null` or a missing field
    /// yields `None`. On output the field is omitted entirely when `None`.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        deserialize_with = "deserialize_flexible_payload"
    )]
    pub payload: Option<String>,
    /// Timestamp as written by the producer. It is kept as an opaque string:
    /// nothing in this file parses or validates it (the tests use
    /// RFC 3339-style values such as `2024-01-01T00:00:00Z`).
    pub ts: String,
}
96
/// Reads new events from `.ralph/events.jsonl` since last read.
///
/// The reader does not keep the file open between calls; it remembers a byte
/// offset and reopens the file on each read.
pub struct EventReader {
    /// Path of the JSONL file to read.
    path: PathBuf,
    /// Byte offset into the file at which the next read starts.
    position: u64,
}
102
103impl EventReader {
104    /// Creates a new event reader for the given path.
105    pub fn new(path: impl Into<PathBuf>) -> Self {
106        Self {
107            path: path.into(),
108            position: 0,
109        }
110    }
111
112    /// Reads new events since the last read.
113    ///
114    /// Returns a `ParseResult` containing both successfully parsed events
115    /// and information about malformed lines. This enables backpressure
116    /// validation - the caller can emit `event.malformed` events and
117    /// track consecutive failures.
118    ///
119    /// # Errors
120    ///
121    /// Returns an error if the file cannot be opened or read.
122    pub fn read_new_events(&mut self) -> std::io::Result<ParseResult> {
123        if !self.path.exists() {
124            return Ok(ParseResult::default());
125        }
126
127        let mut file = File::open(&self.path)?;
128        file.seek(SeekFrom::Start(self.position))?;
129
130        let reader = BufReader::new(file);
131        let mut result = ParseResult::default();
132        let mut current_pos = self.position;
133        let mut line_number = self.count_lines_before_position();
134
135        for line in reader.lines() {
136            let line = line?;
137            let line_bytes = line.len() as u64 + 1; // +1 for newline
138            line_number += 1;
139
140            if line.trim().is_empty() {
141                current_pos += line_bytes;
142                continue;
143            }
144
145            match serde_json::from_str::<Event>(&line) {
146                Ok(event) => result.events.push(event),
147                Err(e) => {
148                    warn!(error = %e, line_number = line_number, "Malformed JSON line");
149                    result
150                        .malformed
151                        .push(MalformedLine::new(line_number, &line, e.to_string()));
152                }
153            }
154
155            current_pos += line_bytes;
156        }
157
158        self.position = current_pos;
159        Ok(result)
160    }
161
162    /// Reads new events without advancing the internal file position.
163    ///
164    /// This is used by callers that need to inspect unread events before
165    /// deciding whether to process them.
166    pub fn peek_new_events(&self) -> std::io::Result<ParseResult> {
167        let mut reader = Self {
168            path: self.path.clone(),
169            position: self.position,
170        };
171        reader.read_new_events()
172    }
173
174    /// Counts lines before the current position (for line numbering).
175    fn count_lines_before_position(&self) -> u64 {
176        if self.position == 0 || !self.path.exists() {
177            return 0;
178        }
179        // Read file up to position and count newlines
180        if let Ok(file) = File::open(&self.path) {
181            let reader = BufReader::new(file);
182            let mut count = 0u64;
183            let mut bytes_read = 0u64;
184            for line in reader.lines() {
185                if let Ok(line) = line {
186                    bytes_read += line.len() as u64 + 1;
187                    if bytes_read > self.position {
188                        break;
189                    }
190                    count += 1;
191                } else {
192                    break;
193                }
194            }
195            count
196        } else {
197            0
198        }
199    }
200
201    /// Returns the current file position.
202    pub fn position(&self) -> u64 {
203        self.position
204    }
205
206    /// Resets the position to the start of the file.
207    pub fn reset(&mut self) {
208        self.position = 0;
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Appends every entry of `lines` to `file` as its own newline-terminated
    /// line, then flushes.
    fn append_lines(file: &mut NamedTempFile, lines: &[&str]) {
        for line in lines {
            writeln!(file, "{}", line).unwrap();
        }
        file.flush().unwrap();
    }

    /// Creates a temporary file pre-populated with `lines`.
    fn jsonl_file(lines: &[&str]) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        append_lines(&mut file, lines);
        file
    }

    /// Reads the whole of `file` through a fresh reader.
    fn read_all(file: &NamedTempFile) -> ParseResult {
        EventReader::new(file.path()).read_new_events().unwrap()
    }

    #[test]
    fn test_read_new_events() {
        let file = jsonl_file(&[
            r#"{"topic":"test","payload":"hello","ts":"2024-01-01T00:00:00Z"}"#,
            r#"{"topic":"test2","ts":"2024-01-01T00:00:01Z"}"#,
        ]);

        let ParseResult { events, malformed } = read_all(&file);

        assert_eq!(events.len(), 2);
        assert_eq!(events[0].topic, "test");
        assert_eq!(events[0].payload.as_deref(), Some("hello"));
        assert_eq!(events[1].topic, "test2");
        assert!(events[1].payload.is_none());
        assert!(malformed.is_empty());
    }

    #[test]
    fn test_tracks_position() {
        let mut file = jsonl_file(&[r#"{"topic":"first","ts":"2024-01-01T00:00:00Z"}"#]);

        let mut reader = EventReader::new(file.path());
        assert_eq!(reader.read_new_events().unwrap().events.len(), 1);

        // Grow the file after the first read.
        append_lines(
            &mut file,
            &[r#"{"topic":"second","ts":"2024-01-01T00:00:01Z"}"#],
        );

        // Only the appended event must come back.
        let fresh = reader.read_new_events().unwrap().events;
        assert_eq!(fresh.len(), 1);
        assert_eq!(fresh[0].topic, "second");
    }

    #[test]
    fn test_peek_new_events_does_not_advance_position() {
        let file = jsonl_file(&[r#"{"topic":"first","ts":"2024-01-01T00:00:00Z"}"#]);
        let mut reader = EventReader::new(file.path());

        let peeked = reader.peek_new_events().unwrap().events;
        assert_eq!(peeked.len(), 1);
        assert_eq!(peeked[0].topic, "first");

        // Peeking must leave the offset where it was.
        assert_eq!(reader.position(), 0);

        let consumed = reader.read_new_events().unwrap().events;
        assert_eq!(consumed.len(), 1);
        assert_eq!(consumed[0].topic, "first");
    }

    #[test]
    fn test_missing_file() {
        let outcome = EventReader::new("/nonexistent/path.jsonl")
            .read_new_events()
            .unwrap();

        assert!(outcome.events.is_empty());
        assert!(outcome.malformed.is_empty());
    }

    #[test]
    fn test_captures_malformed_lines() {
        let file = jsonl_file(&[
            r#"{"topic":"good","ts":"2024-01-01T00:00:00Z"}"#,
            "{corrupt json}",
            r#"{"topic":"also_good","ts":"2024-01-01T00:00:01Z"}"#,
        ]);

        let ParseResult { events, malformed } = read_all(&file);

        // The well-formed lines on either side still parse.
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].topic, "good");
        assert_eq!(events[1].topic, "also_good");

        // The broken line is reported with its position and content.
        assert_eq!(malformed.len(), 1);
        let bad = &malformed[0];
        assert_eq!(bad.line_number, 2);
        assert!(bad.content.contains("corrupt json"));
        assert!(!bad.error.is_empty());
    }

    #[test]
    fn test_empty_file() {
        let file = NamedTempFile::new().unwrap();

        let outcome = read_all(&file);

        assert!(outcome.events.is_empty());
        assert!(outcome.malformed.is_empty());
    }

    #[test]
    fn test_reset_position() {
        let file = jsonl_file(&[r#"{"topic":"test","ts":"2024-01-01T00:00:00Z"}"#]);
        let mut reader = EventReader::new(file.path());

        reader.read_new_events().unwrap();
        assert!(reader.position() > 0);

        reader.reset();
        assert_eq!(reader.position(), 0);

        // After a reset the same event is delivered again.
        assert_eq!(reader.read_new_events().unwrap().events.len(), 1);
    }

    #[test]
    fn test_structured_payload_as_object() {
        // A JSON object in the payload field must come back as a string.
        let file = jsonl_file(&[
            r#"{"topic":"review.done","payload":{"status":"approved","files":["a.rs","b.rs"]},"ts":"2024-01-01T00:00:00Z"}"#,
        ]);

        let events = read_all(&file).events;

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].topic, "review.done");

        // The payload is the stringified JSON...
        let payload = events[0].payload.as_ref().unwrap();
        for needle in ["\"status\"", "\"approved\"", "\"files\""] {
            assert!(payload.contains(needle));
        }

        // ...and it parses back into the same structure.
        let parsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed["status"], "approved");
    }

    #[test]
    fn test_mixed_payload_formats() {
        // String, object and absent payloads side by side in one file.
        let file = jsonl_file(&[
            r#"{"topic":"task.start","payload":"Start work","ts":"2024-01-01T00:00:00Z"}"#,
            r#"{"topic":"task.done","payload":{"result":"success"},"ts":"2024-01-01T00:00:01Z"}"#,
            r#"{"topic":"heartbeat","ts":"2024-01-01T00:00:02Z"}"#,
        ]);

        let events = read_all(&file).events;
        assert_eq!(events.len(), 3);

        // String payload is kept verbatim.
        assert_eq!(events[0].payload.as_deref(), Some("Start work"));

        // Object payload is converted to a JSON string.
        let stringified = events[1].payload.as_ref().unwrap();
        assert!(stringified.contains("\"result\""));

        // Missing payload stays absent.
        assert!(events[2].payload.is_none());
    }

    #[test]
    fn test_nested_object_payload() {
        // Deeply nested structures must survive the string round trip.
        let file = jsonl_file(&[
            r#"{"topic":"analysis","payload":{"issues":[{"file":"test.rs","line":42,"severity":"major"}],"approval":"conditional"},"ts":"2024-01-01T00:00:00Z"}"#,
        ]);

        let events = read_all(&file).events;
        assert_eq!(events.len(), 1);

        let payload = events[0].payload.as_ref().unwrap();
        let parsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        let first_issue = &parsed["issues"][0];
        assert_eq!(first_issue["file"], "test.rs");
        assert_eq!(first_issue["line"], 42);
        assert_eq!(parsed["approval"], "conditional");
    }

    #[test]
    fn test_mixed_valid_invalid_handling() {
        // Valid events are still delivered when a malformed line sits between them.
        let file = jsonl_file(&[
            r#"{"topic":"valid1","ts":"2024-01-01T00:00:00Z"}"#,
            "not valid json at all",
            r#"{"topic":"valid2","ts":"2024-01-01T00:00:01Z"}"#,
        ]);

        let ParseResult { events, malformed } = read_all(&file);

        assert_eq!(events.len(), 2);
        assert_eq!(malformed.len(), 1);
        assert_eq!(events[0].topic, "valid1");
        assert_eq!(events[1].topic, "valid2");
    }
}