Skip to main content

ralph_core/
event_reader.rs

1//! Event reader for consuming events from `.ralph/events.jsonl`.
2
3use serde::{Deserialize, Deserializer, Serialize};
4use std::fs::File;
5use std::io::{BufRead, BufReader, Seek, SeekFrom};
6use std::path::PathBuf;
7use tracing::warn;
8
9/// Result of parsing events from a JSONL file.
10///
11/// Contains both successfully parsed events and information about lines
12/// that failed to parse. This supports backpressure validation by allowing
13/// the caller to respond to malformed events.
14#[derive(Debug, Clone, Default)]
15pub struct ParseResult {
16    /// Successfully parsed events.
17    pub events: Vec<Event>,
18    /// Lines that failed to parse.
19    pub malformed: Vec<MalformedLine>,
20}
21
22/// Information about a malformed JSONL line.
23///
24/// Used for backpressure feedback - when agents write invalid JSONL,
25/// this provides details for the `event.malformed` system event.
26#[derive(Debug, Clone, Serialize)]
27pub struct MalformedLine {
28    /// Line number in the file (1-indexed).
29    pub line_number: u64,
30    /// The raw content that failed to parse (truncated if very long).
31    pub content: String,
32    /// The parse error message.
33    pub error: String,
34}
35
36impl MalformedLine {
37    /// Maximum content length before truncation.
38    const MAX_CONTENT_LEN: usize = 100;
39
40    /// Creates a new MalformedLine, truncating content if needed.
41    pub fn new(line_number: u64, content: &str, error: String) -> Self {
42        let content = if content.len() > Self::MAX_CONTENT_LEN {
43            // Truncate at a valid UTF-8 character boundary to avoid panics
44            // on multi-byte content.
45            let truncate_at = crate::text::floor_char_boundary(content, Self::MAX_CONTENT_LEN);
46            format!("{}...", &content[..truncate_at])
47        } else {
48            content.to_string()
49        };
50        Self {
51            line_number,
52            content,
53            error,
54        }
55    }
56}
57
58/// Custom deserializer that accepts both String and structured JSON payloads.
59///
60/// Agents sometimes write structured data as JSON objects instead of strings.
61/// This deserializer accepts both formats:
62/// - `"payload": "string"` → `Some("string")`
63/// - `"payload": {...}` → `Some("{...}")` (serialized to JSON string)
64/// - `"payload": null` → `None`
65/// - missing field → `None`
66fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
67where
68    D: Deserializer<'de>,
69{
70    #[derive(Deserialize)]
71    #[serde(untagged)]
72    enum FlexiblePayload {
73        String(String),
74        Object(serde_json::Value),
75    }
76
77    let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
78    Ok(opt.map(|flex| match flex {
79        FlexiblePayload::String(s) => s,
80        FlexiblePayload::Object(obj) => {
81            // Serialize the object back to a JSON string
82            serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
83        }
84    }))
85}
86
87/// A simplified event for reading from JSONL.
88#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
89pub struct Event {
90    pub topic: String,
91    #[serde(
92        default,
93        skip_serializing_if = "Option::is_none",
94        deserialize_with = "deserialize_flexible_payload"
95    )]
96    pub payload: Option<String>,
97    pub ts: String,
98
99    /// Wave correlation ID.
100    #[serde(default, skip_serializing_if = "Option::is_none")]
101    pub wave_id: Option<String>,
102
103    /// Index of this event within the wave (0-based).
104    #[serde(default, skip_serializing_if = "Option::is_none")]
105    pub wave_index: Option<u32>,
106
107    /// Total number of events in the wave.
108    #[serde(default, skip_serializing_if = "Option::is_none")]
109    pub wave_total: Option<u32>,
110}
111
112impl Event {
113    /// Returns true if this event has wave correlation metadata.
114    pub fn is_wave_event(&self) -> bool {
115        self.wave_id.is_some()
116    }
117}
118
119impl From<Event> for ralph_proto::Event {
120    fn from(e: Event) -> Self {
121        // ts is a JSONL serialization concern, not carried to bus events.
122        let mut pe = ralph_proto::Event::new(e.topic.as_str(), e.payload.unwrap_or_default());
123        if let Some(wave_id) = e.wave_id {
124            // wave_index is required when wave_id is present; default to 0
125            // only as a last resort (should not happen with well-formed events).
126            let index = e.wave_index.unwrap_or(0);
127            let total = e.wave_total.unwrap_or(1);
128            pe = pe.with_wave(wave_id, index, total);
129        }
130        pe
131    }
132}
133
/// Incremental reader for `.ralph/events.jsonl`.
///
/// Remembers the byte offset reached by the previous read so that each call
/// only returns events appended since then.
pub struct EventReader {
    /// Location of the JSONL events file.
    path: PathBuf,
    /// Byte offset where the next read begins.
    position: u64,
}
139
140impl EventReader {
141    /// Creates a new event reader for the given path.
142    pub fn new(path: impl Into<PathBuf>) -> Self {
143        Self {
144            path: path.into(),
145            position: 0,
146        }
147    }
148
149    /// Reads new events since the last read.
150    ///
151    /// Returns a `ParseResult` containing both successfully parsed events
152    /// and information about malformed lines. This enables backpressure
153    /// validation - the caller can emit `event.malformed` events and
154    /// track consecutive failures.
155    ///
156    /// # Errors
157    ///
158    /// Returns an error if the file cannot be opened or read.
159    pub fn read_new_events(&mut self) -> std::io::Result<ParseResult> {
160        if !self.path.exists() {
161            return Ok(ParseResult::default());
162        }
163
164        let mut file = File::open(&self.path)?;
165        file.seek(SeekFrom::Start(self.position))?;
166
167        let reader = BufReader::new(file);
168        let mut result = ParseResult::default();
169        let mut current_pos = self.position;
170        let mut line_number = self.count_lines_before_position();
171
172        for line in reader.lines() {
173            let line = line?;
174            let line_bytes = line.len() as u64 + 1; // +1 for newline
175            line_number += 1;
176
177            if line.trim().is_empty() {
178                current_pos += line_bytes;
179                continue;
180            }
181
182            match serde_json::from_str::<Event>(&line) {
183                Ok(event) => result.events.push(event),
184                Err(e) => {
185                    warn!(error = %e, line_number = line_number, "Malformed JSON line");
186                    result
187                        .malformed
188                        .push(MalformedLine::new(line_number, &line, e.to_string()));
189                }
190            }
191
192            current_pos += line_bytes;
193        }
194
195        self.position = current_pos;
196        Ok(result)
197    }
198
199    /// Reads new events without advancing the internal file position.
200    ///
201    /// This is used by callers that need to inspect unread events before
202    /// deciding whether to process them.
203    pub fn peek_new_events(&self) -> std::io::Result<ParseResult> {
204        let mut reader = Self {
205            path: self.path.clone(),
206            position: self.position,
207        };
208        reader.read_new_events()
209    }
210
211    /// Counts lines before the current position (for line numbering).
212    fn count_lines_before_position(&self) -> u64 {
213        if self.position == 0 || !self.path.exists() {
214            return 0;
215        }
216        // Read file up to position and count newlines
217        if let Ok(file) = File::open(&self.path) {
218            let reader = BufReader::new(file);
219            let mut count = 0u64;
220            let mut bytes_read = 0u64;
221            for line in reader.lines() {
222                if let Ok(line) = line {
223                    bytes_read += line.len() as u64 + 1;
224                    if bytes_read > self.position {
225                        break;
226                    }
227                    count += 1;
228                } else {
229                    break;
230                }
231            }
232            count
233        } else {
234            0
235        }
236    }
237
238    /// Returns the path to the events file.
239    pub fn path(&self) -> &std::path::Path {
240        &self.path
241    }
242
243    /// Returns the current file position.
244    pub fn position(&self) -> u64 {
245        self.position
246    }
247
248    /// Sets the file position to a specific byte offset.
249    ///
250    /// Use this to skip past entries written by the EventLogger so they
251    /// are not re-read by `process_events_from_jsonl`.
252    pub fn set_position(&mut self, position: u64) {
253        self.position = position;
254    }
255
256    /// Resets the position to the start of the file.
257    pub fn reset(&mut self) {
258        self.position = 0;
259    }
260}
261
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Creates a temporary JSONL file holding `lines`, each newline-terminated.
    fn jsonl_file(lines: &[&str]) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        for line in lines {
            writeln!(file, "{line}").unwrap();
        }
        file.flush().unwrap();
        file
    }

    /// Appends one newline-terminated line to an existing temporary file.
    fn append_line(file: &mut NamedTempFile, line: &str) {
        writeln!(file, "{line}").unwrap();
        file.flush().unwrap();
    }

    /// Reads every event in `file` using a fresh reader.
    fn read_all(file: &NamedTempFile) -> ParseResult {
        EventReader::new(file.path()).read_new_events().unwrap()
    }

    /// Builds a reader-side event; `wave` is `(id, index, total)`.
    fn make_event(topic: &str, payload: Option<&str>, wave: Option<(&str, u32, u32)>) -> Event {
        Event {
            topic: topic.to_string(),
            payload: payload.map(str::to_string),
            ts: "2024-01-01T00:00:00Z".to_string(),
            wave_id: wave.map(|(id, _, _)| id.to_string()),
            wave_index: wave.map(|(_, index, _)| index),
            wave_total: wave.map(|(_, _, total)| total),
        }
    }

    #[test]
    fn test_read_new_events() {
        let file = jsonl_file(&[
            r#"{"topic":"test","payload":"hello","ts":"2024-01-01T00:00:00Z"}"#,
            r#"{"topic":"test2","ts":"2024-01-01T00:00:01Z"}"#,
        ]);
        let parsed = read_all(&file);

        assert_eq!(parsed.events.len(), 2);
        assert_eq!(parsed.events[0].topic, "test");
        assert_eq!(parsed.events[0].payload, Some("hello".to_string()));
        assert_eq!(parsed.events[1].topic, "test2");
        assert_eq!(parsed.events[1].payload, None);
        assert!(parsed.malformed.is_empty());
    }

    #[test]
    fn test_tracks_position() {
        let mut file = jsonl_file(&[r#"{"topic":"first","ts":"2024-01-01T00:00:00Z"}"#]);
        let mut reader = EventReader::new(file.path());
        assert_eq!(reader.read_new_events().unwrap().events.len(), 1);

        append_line(&mut file, r#"{"topic":"second","ts":"2024-01-01T00:00:01Z"}"#);

        // Only the appended event should come back.
        let fresh = reader.read_new_events().unwrap();
        assert_eq!(fresh.events.len(), 1);
        assert_eq!(fresh.events[0].topic, "second");
    }

    #[test]
    fn test_peek_new_events_does_not_advance_position() {
        let file = jsonl_file(&[r#"{"topic":"first","ts":"2024-01-01T00:00:00Z"}"#]);
        let mut reader = EventReader::new(file.path());

        let peeked = reader.peek_new_events().unwrap();
        assert_eq!(peeked.events.len(), 1);
        assert_eq!(peeked.events[0].topic, "first");
        assert_eq!(reader.position(), 0);

        let consumed = reader.read_new_events().unwrap();
        assert_eq!(consumed.events.len(), 1);
        assert_eq!(consumed.events[0].topic, "first");
    }

    #[test]
    fn test_missing_file() {
        let mut reader = EventReader::new("/nonexistent/path.jsonl");
        let parsed = reader.read_new_events().unwrap();
        assert!(parsed.events.is_empty());
        assert!(parsed.malformed.is_empty());
    }

    #[test]
    fn test_captures_malformed_lines() {
        let file = jsonl_file(&[
            r#"{"topic":"good","ts":"2024-01-01T00:00:00Z"}"#,
            "{corrupt json}",
            r#"{"topic":"also_good","ts":"2024-01-01T00:00:01Z"}"#,
        ]);
        let parsed = read_all(&file);

        // Valid lines around the bad one still parse.
        assert_eq!(parsed.events.len(), 2);
        assert_eq!(parsed.events[0].topic, "good");
        assert_eq!(parsed.events[1].topic, "also_good");

        // The bad line is recorded with its 1-based line number.
        assert_eq!(parsed.malformed.len(), 1);
        let bad = &parsed.malformed[0];
        assert_eq!(bad.line_number, 2);
        assert!(bad.content.contains("corrupt json"));
        assert!(!bad.error.is_empty());
    }

    #[test]
    fn test_empty_file() {
        let file = NamedTempFile::new().unwrap();
        let parsed = read_all(&file);
        assert!(parsed.events.is_empty());
        assert!(parsed.malformed.is_empty());
    }

    #[test]
    fn test_reset_position() {
        let file = jsonl_file(&[r#"{"topic":"test","ts":"2024-01-01T00:00:00Z"}"#]);
        let mut reader = EventReader::new(file.path());
        reader.read_new_events().unwrap();
        assert!(reader.position() > 0);

        reader.reset();
        assert_eq!(reader.position(), 0);
        assert_eq!(reader.read_new_events().unwrap().events.len(), 1);
    }

    #[test]
    fn test_structured_payload_as_object() {
        // An object payload is stored as its JSON string form.
        let file = jsonl_file(&[
            r#"{"topic":"review.done","payload":{"status":"approved","files":["a.rs","b.rs"]},"ts":"2024-01-01T00:00:00Z"}"#,
        ]);
        let parsed = read_all(&file);

        assert_eq!(parsed.events.len(), 1);
        assert_eq!(parsed.events[0].topic, "review.done");

        let payload = parsed.events[0].payload.as_ref().unwrap();
        for needle in ["\"status\"", "\"approved\"", "\"files\""] {
            assert!(payload.contains(needle));
        }

        // The stored string must round-trip back into JSON.
        let reparsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        assert_eq!(reparsed["status"], "approved");
    }

    #[test]
    fn test_mixed_payload_formats() {
        let file = jsonl_file(&[
            // String payload
            r#"{"topic":"task.start","payload":"Start work","ts":"2024-01-01T00:00:00Z"}"#,
            // Object payload
            r#"{"topic":"task.done","payload":{"result":"success"},"ts":"2024-01-01T00:00:01Z"}"#,
            // No payload
            r#"{"topic":"heartbeat","ts":"2024-01-01T00:00:02Z"}"#,
        ]);
        let parsed = read_all(&file);

        assert_eq!(parsed.events.len(), 3);
        assert_eq!(parsed.events[0].payload, Some("Start work".to_string()));
        assert!(parsed.events[1]
            .payload
            .as_ref()
            .unwrap()
            .contains("\"result\""));
        assert_eq!(parsed.events[2].payload, None);
    }

    #[test]
    fn test_nested_object_payload() {
        let file = jsonl_file(&[
            r#"{"topic":"analysis","payload":{"issues":[{"file":"test.rs","line":42,"severity":"major"}],"approval":"conditional"},"ts":"2024-01-01T00:00:00Z"}"#,
        ]);
        let parsed = read_all(&file);

        assert_eq!(parsed.events.len(), 1);

        // Nested structure survives serialization to a string.
        let payload = parsed.events[0].payload.as_ref().unwrap();
        let reparsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        assert_eq!(reparsed["issues"][0]["file"], "test.rs");
        assert_eq!(reparsed["issues"][0]["line"], 42);
        assert_eq!(reparsed["approval"], "conditional");
    }

    #[test]
    fn test_event_reader_parses_wave_metadata() {
        let file = jsonl_file(&[
            r#"{"topic":"review.file","payload":"src/main.rs","ts":"2024-01-01T00:00:00Z","wave_id":"w-1a2b3c4d","wave_index":0,"wave_total":3}"#,
            r#"{"topic":"review.file","payload":"src/lib.rs","ts":"2024-01-01T00:00:00Z","wave_id":"w-1a2b3c4d","wave_index":1,"wave_total":3}"#,
        ]);
        let parsed = read_all(&file);

        assert_eq!(parsed.events.len(), 2);
        let first = &parsed.events[0];
        assert!(first.is_wave_event());
        assert_eq!(first.wave_id.as_deref(), Some("w-1a2b3c4d"));
        assert_eq!(first.wave_index, Some(0));
        assert_eq!(first.wave_total, Some(3));
        assert_eq!(parsed.events[1].wave_index, Some(1));
    }

    #[test]
    fn test_event_reader_backwards_compat_no_wave_fields() {
        // Events written before wave support must still parse.
        let file = jsonl_file(&[r#"{"topic":"build.done","payload":"ok","ts":"2024-01-01T00:00:00Z"}"#]);
        let parsed = read_all(&file);

        assert_eq!(parsed.events.len(), 1);
        let only = &parsed.events[0];
        assert!(!only.is_wave_event());
        assert!(only.wave_id.is_none());
        assert!(only.wave_index.is_none());
        assert!(only.wave_total.is_none());
    }

    #[test]
    fn test_event_reader_mixed_wave_and_non_wave() {
        let file = jsonl_file(&[
            // Non-wave event
            r#"{"topic":"task.start","payload":"begin","ts":"2024-01-01T00:00:00Z"}"#,
            // Wave event
            r#"{"topic":"review.file","payload":"src/main.rs","ts":"2024-01-01T00:00:01Z","wave_id":"w-abc","wave_index":0,"wave_total":2}"#,
            // Another non-wave event
            r#"{"topic":"build.done","ts":"2024-01-01T00:00:02Z"}"#,
        ]);
        let parsed = read_all(&file);

        assert_eq!(parsed.events.len(), 3);
        assert!(!parsed.events[0].is_wave_event());
        assert!(parsed.events[1].is_wave_event());
        assert_eq!(parsed.events[1].wave_id.as_deref(), Some("w-abc"));
        assert!(!parsed.events[2].is_wave_event());
    }

    #[test]
    fn test_from_event_reader_to_proto_without_wave() {
        let proto: ralph_proto::Event = make_event("build.done", Some("success"), None).into();
        assert_eq!(proto.topic.as_str(), "build.done");
        assert_eq!(proto.payload, "success");
        assert!(!proto.is_wave_event());
    }

    #[test]
    fn test_from_event_reader_to_proto_with_wave() {
        let proto: ralph_proto::Event =
            make_event("review.file", Some("src/main.rs"), Some(("w-abc", 2, 5))).into();
        assert_eq!(proto.topic.as_str(), "review.file");
        assert_eq!(proto.payload, "src/main.rs");
        assert!(proto.is_wave_event());
        assert_eq!(proto.wave_id.as_deref(), Some("w-abc"));
        assert_eq!(proto.wave_index, Some(2));
        assert_eq!(proto.wave_total, Some(5));
    }

    #[test]
    fn test_from_event_reader_to_proto_none_payload() {
        let proto: ralph_proto::Event = make_event("empty.event", None, None).into();
        assert_eq!(proto.payload, "");
    }

    #[test]
    fn test_mixed_valid_invalid_handling() {
        // Valid events are captured alongside malformed ones.
        let file = jsonl_file(&[
            r#"{"topic":"valid1","ts":"2024-01-01T00:00:00Z"}"#,
            "not valid json at all",
            r#"{"topic":"valid2","ts":"2024-01-01T00:00:01Z"}"#,
        ]);
        let parsed = read_all(&file);

        assert_eq!(parsed.events.len(), 2);
        assert_eq!(parsed.malformed.len(), 1);
        assert_eq!(parsed.events[0].topic, "valid1");
        assert_eq!(parsed.events[1].topic, "valid2");
    }
}