Skip to main content

ralph_core/
event_reader.rs

1//! Event reader for consuming events from `.ralph/events.jsonl`.
2
3use serde::{Deserialize, Deserializer, Serialize};
4use std::fs::File;
5use std::io::{BufRead, BufReader, Seek, SeekFrom};
6use std::path::PathBuf;
7use tracing::warn;
8
9/// Result of parsing events from a JSONL file.
10///
11/// Contains both successfully parsed events and information about lines
12/// that failed to parse. This supports backpressure validation by allowing
13/// the caller to respond to malformed events.
14#[derive(Debug, Clone, Default)]
15pub struct ParseResult {
16    /// Successfully parsed events.
17    pub events: Vec<Event>,
18    /// Lines that failed to parse.
19    pub malformed: Vec<MalformedLine>,
20}
21
22/// Information about a malformed JSONL line.
23///
24/// Used for backpressure feedback - when agents write invalid JSONL,
25/// this provides details for the `event.malformed` system event.
26#[derive(Debug, Clone, Serialize)]
27pub struct MalformedLine {
28    /// Line number in the file (1-indexed).
29    pub line_number: u64,
30    /// The raw content that failed to parse (truncated if very long).
31    pub content: String,
32    /// The parse error message.
33    pub error: String,
34}
35
36impl MalformedLine {
37    /// Maximum content length before truncation.
38    const MAX_CONTENT_LEN: usize = 100;
39
40    /// Creates a new MalformedLine, truncating content if needed.
41    pub fn new(line_number: u64, content: &str, error: String) -> Self {
42        let content = if content.len() > Self::MAX_CONTENT_LEN {
43            format!("{}...", &content[..Self::MAX_CONTENT_LEN])
44        } else {
45            content.to_string()
46        };
47        Self {
48            line_number,
49            content,
50            error,
51        }
52    }
53}
54
55/// Custom deserializer that accepts both String and structured JSON payloads.
56///
57/// Agents sometimes write structured data as JSON objects instead of strings.
58/// This deserializer accepts both formats:
59/// - `"payload": "string"` → `Some("string")`
60/// - `"payload": {...}` → `Some("{...}")` (serialized to JSON string)
61/// - `"payload": null` → `None`
62/// - missing field → `None`
63fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
64where
65    D: Deserializer<'de>,
66{
67    #[derive(Deserialize)]
68    #[serde(untagged)]
69    enum FlexiblePayload {
70        String(String),
71        Object(serde_json::Value),
72    }
73
74    let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
75    Ok(opt.map(|flex| match flex {
76        FlexiblePayload::String(s) => s,
77        FlexiblePayload::Object(obj) => {
78            // Serialize the object back to a JSON string
79            serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
80        }
81    }))
82}
83
84/// A simplified event for reading from JSONL.
85#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
86pub struct Event {
87    pub topic: String,
88    #[serde(
89        default,
90        skip_serializing_if = "Option::is_none",
91        deserialize_with = "deserialize_flexible_payload"
92    )]
93    pub payload: Option<String>,
94    pub ts: String,
95}
96
97/// Reads new events from `.ralph/events.jsonl` since last read.
98pub struct EventReader {
99    path: PathBuf,
100    position: u64,
101}
102
103impl EventReader {
104    /// Creates a new event reader for the given path.
105    pub fn new(path: impl Into<PathBuf>) -> Self {
106        Self {
107            path: path.into(),
108            position: 0,
109        }
110    }
111
112    /// Reads new events since the last read.
113    ///
114    /// Returns a `ParseResult` containing both successfully parsed events
115    /// and information about malformed lines. This enables backpressure
116    /// validation - the caller can emit `event.malformed` events and
117    /// track consecutive failures.
118    ///
119    /// # Errors
120    ///
121    /// Returns an error if the file cannot be opened or read.
122    pub fn read_new_events(&mut self) -> std::io::Result<ParseResult> {
123        if !self.path.exists() {
124            return Ok(ParseResult::default());
125        }
126
127        let mut file = File::open(&self.path)?;
128        file.seek(SeekFrom::Start(self.position))?;
129
130        let reader = BufReader::new(file);
131        let mut result = ParseResult::default();
132        let mut current_pos = self.position;
133        let mut line_number = self.count_lines_before_position();
134
135        for line in reader.lines() {
136            let line = line?;
137            let line_bytes = line.len() as u64 + 1; // +1 for newline
138            line_number += 1;
139
140            if line.trim().is_empty() {
141                current_pos += line_bytes;
142                continue;
143            }
144
145            match serde_json::from_str::<Event>(&line) {
146                Ok(event) => result.events.push(event),
147                Err(e) => {
148                    warn!(error = %e, line_number = line_number, "Malformed JSON line");
149                    result
150                        .malformed
151                        .push(MalformedLine::new(line_number, &line, e.to_string()));
152                }
153            }
154
155            current_pos += line_bytes;
156        }
157
158        self.position = current_pos;
159        Ok(result)
160    }
161
162    /// Counts lines before the current position (for line numbering).
163    fn count_lines_before_position(&self) -> u64 {
164        if self.position == 0 || !self.path.exists() {
165            return 0;
166        }
167        // Read file up to position and count newlines
168        if let Ok(file) = File::open(&self.path) {
169            let reader = BufReader::new(file);
170            let mut count = 0u64;
171            let mut bytes_read = 0u64;
172            for line in reader.lines() {
173                if let Ok(line) = line {
174                    bytes_read += line.len() as u64 + 1;
175                    if bytes_read > self.position {
176                        break;
177                    }
178                    count += 1;
179                } else {
180                    break;
181                }
182            }
183            count
184        } else {
185            0
186        }
187    }
188
189    /// Returns the current file position.
190    pub fn position(&self) -> u64 {
191        self.position
192    }
193
194    /// Resets the position to the start of the file.
195    pub fn reset(&mut self) {
196        self.position = 0;
197    }
198}
199
#[cfg(test)]
mod tests {
    // Unit tests for `EventReader`, covering parsing, incremental reads,
    // malformed-line reporting and payload normalisation.
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    #[test]
    fn test_read_new_events() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(
            file,
            r#"{{"topic":"test","payload":"hello","ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();
        writeln!(file, r#"{{"topic":"test2","ts":"2024-01-01T00:00:01Z"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].topic, "test");
        assert_eq!(result.events[0].payload, Some("hello".to_string()));
        assert_eq!(result.events[1].topic, "test2");
        assert_eq!(result.events[1].payload, None);
        assert!(result.malformed.is_empty());
    }

    #[test]
    fn test_tracks_position() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"first","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();
        assert_eq!(result.events.len(), 1);

        // Add more events
        writeln!(file, r#"{{"topic":"second","ts":"2024-01-01T00:00:01Z"}}"#).unwrap();
        file.flush().unwrap();

        // Should only read new events
        let result = reader.read_new_events().unwrap();
        assert_eq!(result.events.len(), 1);
        assert_eq!(result.events[0].topic, "second");
    }

    #[test]
    fn test_missing_file() {
        let mut reader = EventReader::new("/nonexistent/path.jsonl");
        let result = reader.read_new_events().unwrap();
        assert!(result.events.is_empty());
        assert!(result.malformed.is_empty());
    }

    #[test]
    fn test_captures_malformed_lines() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"good","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        writeln!(file, r"{{corrupt json}}").unwrap();
        writeln!(
            file,
            r#"{{"topic":"also_good","ts":"2024-01-01T00:00:01Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        // Good events should be parsed
        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].topic, "good");
        assert_eq!(result.events[1].topic, "also_good");

        // Malformed line should be captured
        assert_eq!(result.malformed.len(), 1);
        assert_eq!(result.malformed[0].line_number, 2);
        assert!(result.malformed[0].content.contains("corrupt json"));
        assert!(!result.malformed[0].error.is_empty());
    }

    #[test]
    fn test_empty_file() {
        let file = NamedTempFile::new().unwrap();
        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();
        assert!(result.events.is_empty());
        assert!(result.malformed.is_empty());
    }

    #[test]
    fn test_reset_position() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"test","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        reader.read_new_events().unwrap();
        assert!(reader.position() > 0);

        reader.reset();
        assert_eq!(reader.position(), 0);

        let result = reader.read_new_events().unwrap();
        assert_eq!(result.events.len(), 1);
    }

    #[test]
    fn test_structured_payload_as_object() {
        // Test that JSON objects in payload field are converted to strings
        let mut file = NamedTempFile::new().unwrap();
        writeln!(
            file,
            r#"{{"topic":"review.done","payload":{{"status":"approved","files":["a.rs","b.rs"]}},"ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 1);
        assert_eq!(result.events[0].topic, "review.done");

        // Payload should be stringified JSON
        let payload = result.events[0].payload.as_ref().unwrap();
        assert!(payload.contains("\"status\""));
        assert!(payload.contains("\"approved\""));
        assert!(payload.contains("\"files\""));

        // Verify it can be parsed back as JSON
        let parsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed["status"], "approved");
    }

    #[test]
    fn test_mixed_payload_formats() {
        // Test mixing string and object payloads in same file
        let mut file = NamedTempFile::new().unwrap();

        // String payload
        writeln!(
            file,
            r#"{{"topic":"task.start","payload":"Start work","ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();

        // Object payload
        writeln!(
            file,
            r#"{{"topic":"task.done","payload":{{"result":"success"}},"ts":"2024-01-01T00:00:01Z"}}"#
        )
        .unwrap();

        // No payload
        writeln!(
            file,
            r#"{{"topic":"heartbeat","ts":"2024-01-01T00:00:02Z"}}"#
        )
        .unwrap();

        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 3);

        // First event: string payload
        assert_eq!(result.events[0].payload, Some("Start work".to_string()));

        // Second event: object payload converted to string
        let payload2 = result.events[1].payload.as_ref().unwrap();
        assert!(payload2.contains("\"result\""));

        // Third event: no payload
        assert_eq!(result.events[2].payload, None);
    }

    #[test]
    fn test_nested_object_payload() {
        // Test deeply nested objects are handled correctly
        let mut file = NamedTempFile::new().unwrap();
        writeln!(
            file,
            r#"{{"topic":"analysis","payload":{{"issues":[{{"file":"test.rs","line":42,"severity":"major"}}],"approval":"conditional"}},"ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 1);

        // Should serialize nested structure
        let payload = result.events[0].payload.as_ref().unwrap();
        let parsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed["issues"][0]["file"], "test.rs");
        assert_eq!(parsed["issues"][0]["line"], 42);
        assert_eq!(parsed["approval"], "conditional");
    }

    #[test]
    fn test_mixed_valid_invalid_handling() {
        // Test that valid events are captured alongside malformed ones
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"valid1","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        writeln!(file, "not valid json at all").unwrap();
        writeln!(file, r#"{{"topic":"valid2","ts":"2024-01-01T00:00:01Z"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.malformed.len(), 1);
        assert_eq!(result.events[0].topic, "valid1");
        assert_eq!(result.events[1].topic, "valid2");
    }

    #[test]
    fn test_line_numbers_continue_across_reads() {
        // Malformed-line numbers must be absolute within the file, not
        // relative to the batch returned by the current read.
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"a","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        writeln!(file, r#"{{"topic":"b","ts":"2024-01-01T00:00:01Z"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let first = reader.read_new_events().unwrap();
        assert_eq!(first.events.len(), 2);

        writeln!(file, "not json").unwrap();
        file.flush().unwrap();

        let second = reader.read_new_events().unwrap();
        assert!(second.events.is_empty());
        assert_eq!(second.malformed.len(), 1);
        assert_eq!(second.malformed[0].line_number, 3);
    }

    #[test]
    fn test_blank_lines_count_toward_line_numbers() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"a","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        writeln!(file).unwrap();
        writeln!(file, "garbage").unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 1);
        assert_eq!(result.malformed.len(), 1);
        assert_eq!(result.malformed[0].line_number, 3);
    }

    #[test]
    fn test_long_malformed_line_is_truncated() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, "{}", "x".repeat(300)).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.malformed.len(), 1);
        let content = &result.malformed[0].content;
        assert_eq!(content.len(), 103);
        assert!(content.ends_with("..."));
    }

    #[test]
    fn test_scalar_and_array_payloads_are_stringified() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(
            file,
            r#"{{"topic":"n","payload":42,"ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();
        writeln!(
            file,
            r#"{{"topic":"a","payload":[1,2],"ts":"2024-01-01T00:00:01Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].payload, Some("42".to_string()));
        assert_eq!(result.events[1].payload, Some("[1,2]".to_string()));
    }

    #[test]
    fn test_null_payload_is_none() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(
            file,
            r#"{{"topic":"t","payload":null,"ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 1);
        assert_eq!(result.events[0].payload, None);
    }

    #[test]
    fn test_missing_required_field_is_malformed() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"no_ts"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert!(result.events.is_empty());
        assert_eq!(result.malformed.len(), 1);
        assert_eq!(result.malformed[0].line_number, 1);
    }
}