1use serde::{Deserialize, Deserializer, Serialize};
4use std::fs::File;
5use std::io::{BufRead, BufReader, Seek, SeekFrom};
6use std::path::PathBuf;
7use tracing::warn;
8
/// Outcome of a single pass over the events file.
///
/// Lines that deserialize into an [`Event`] end up in `events`; lines that do
/// not are kept in `malformed` instead of aborting the read.
#[derive(Debug, Clone, Default)]
pub struct ParseResult {
    /// Successfully parsed events, in the order they were read.
    pub events: Vec<Event>,
    /// Lines that could not be parsed as an [`Event`].
    pub malformed: Vec<MalformedLine>,
}
21
/// Description of one line that failed to parse as an event.
#[derive(Debug, Clone, Serialize)]
pub struct MalformedLine {
    /// Line number assigned by the reader (the first line of the file is 1).
    pub line_number: u64,
    /// The offending line's text; [`MalformedLine::new`] truncates long lines.
    pub content: String,
    /// Human-readable description of why parsing failed.
    pub error: String,
}
35
36impl MalformedLine {
37 const MAX_CONTENT_LEN: usize = 100;
39
40 pub fn new(line_number: u64, content: &str, error: String) -> Self {
42 let content = if content.len() > Self::MAX_CONTENT_LEN {
43 let truncate_at = crate::text::floor_char_boundary(content, Self::MAX_CONTENT_LEN);
46 format!("{}...", &content[..truncate_at])
47 } else {
48 content.to_string()
49 };
50 Self {
51 line_number,
52 content,
53 error,
54 }
55 }
56}
57
58fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
67where
68 D: Deserializer<'de>,
69{
70 #[derive(Deserialize)]
71 #[serde(untagged)]
72 enum FlexiblePayload {
73 String(String),
74 Object(serde_json::Value),
75 }
76
77 let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
78 Ok(opt.map(|flex| match flex {
79 FlexiblePayload::String(s) => s,
80 FlexiblePayload::Object(obj) => {
81 serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
83 }
84 }))
85}
86
/// One event record as stored on a single line of the events file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Event {
    /// Topic the event was published under.
    pub topic: String,
    /// Optional payload. On input this may be a JSON string or any other JSON
    /// value; non-string values are stored as their JSON text (see
    /// `deserialize_flexible_payload`). Omitted from output when `None`.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        deserialize_with = "deserialize_flexible_payload"
    )]
    pub payload: Option<String>,
    /// Timestamp, kept as the raw string from the file (not validated here).
    pub ts: String,

    /// Identifier of the wave this event belongs to, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wave_id: Option<String>,

    /// Index of this event within its wave, if recorded.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wave_index: Option<u32>,

    /// Total number of events in the wave, if recorded.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wave_total: Option<u32>,
}
111
impl Event {
    /// Returns `true` when the event carries a `wave_id`.
    ///
    /// Only `wave_id` is consulted; `wave_index` and `wave_total` are ignored.
    pub fn is_wave_event(&self) -> bool {
        self.wave_id.is_some()
    }
}
118
119impl From<Event> for ralph_proto::Event {
120 fn from(e: Event) -> Self {
121 let mut pe = ralph_proto::Event::new(e.topic.as_str(), e.payload.unwrap_or_default());
123 if let Some(wave_id) = e.wave_id {
124 let index = e.wave_index.unwrap_or(0);
127 let total = e.wave_total.unwrap_or(1);
128 pe = pe.with_wave(wave_id, index, total);
129 }
130 pe
131 }
132}
133
/// Incremental reader for a line-delimited JSON events file.
///
/// Remembers how far into the file it has read so that repeated calls only
/// return lines appended since the previous call.
pub struct EventReader {
    /// Location of the events file.
    path: PathBuf,
    /// Byte offset into the file at which the next read starts.
    position: u64,
}
139
140impl EventReader {
141 pub fn new(path: impl Into<PathBuf>) -> Self {
143 Self {
144 path: path.into(),
145 position: 0,
146 }
147 }
148
149 pub fn read_new_events(&mut self) -> std::io::Result<ParseResult> {
160 if !self.path.exists() {
161 return Ok(ParseResult::default());
162 }
163
164 let mut file = File::open(&self.path)?;
165 file.seek(SeekFrom::Start(self.position))?;
166
167 let reader = BufReader::new(file);
168 let mut result = ParseResult::default();
169 let mut current_pos = self.position;
170 let mut line_number = self.count_lines_before_position();
171
172 for line in reader.lines() {
173 let line = line?;
174 let line_bytes = line.len() as u64 + 1; line_number += 1;
176
177 if line.trim().is_empty() {
178 current_pos += line_bytes;
179 continue;
180 }
181
182 match serde_json::from_str::<Event>(&line) {
183 Ok(event) => result.events.push(event),
184 Err(e) => {
185 warn!(error = %e, line_number = line_number, "Malformed JSON line");
186 result
187 .malformed
188 .push(MalformedLine::new(line_number, &line, e.to_string()));
189 }
190 }
191
192 current_pos += line_bytes;
193 }
194
195 self.position = current_pos;
196 Ok(result)
197 }
198
199 pub fn peek_new_events(&self) -> std::io::Result<ParseResult> {
204 let mut reader = Self {
205 path: self.path.clone(),
206 position: self.position,
207 };
208 reader.read_new_events()
209 }
210
211 fn count_lines_before_position(&self) -> u64 {
213 if self.position == 0 || !self.path.exists() {
214 return 0;
215 }
216 if let Ok(file) = File::open(&self.path) {
218 let reader = BufReader::new(file);
219 let mut count = 0u64;
220 let mut bytes_read = 0u64;
221 for line in reader.lines() {
222 if let Ok(line) = line {
223 bytes_read += line.len() as u64 + 1;
224 if bytes_read > self.position {
225 break;
226 }
227 count += 1;
228 } else {
229 break;
230 }
231 }
232 count
233 } else {
234 0
235 }
236 }
237
238 pub fn path(&self) -> &std::path::Path {
240 &self.path
241 }
242
243 pub fn position(&self) -> u64 {
245 self.position
246 }
247
248 pub fn set_position(&mut self, position: u64) {
253 self.position = position;
254 }
255
256 pub fn reset(&mut self) {
258 self.position = 0;
259 }
260}
261
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    // Two well-formed lines, one with a payload and one without.
    #[test]
    fn test_read_new_events() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(
            file,
            r#"{{"topic":"test","payload":"hello","ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();
        writeln!(file, r#"{{"topic":"test2","ts":"2024-01-01T00:00:01Z"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].topic, "test");
        assert_eq!(result.events[0].payload, Some("hello".to_string()));
        assert_eq!(result.events[1].topic, "test2");
        assert_eq!(result.events[1].payload, None);
        assert!(result.malformed.is_empty());
    }

    // A second read must return only what was appended after the first read.
    #[test]
    fn test_tracks_position() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"first","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();
        assert_eq!(result.events.len(), 1);

        // Append another event after the first read.
        writeln!(file, r#"{{"topic":"second","ts":"2024-01-01T00:00:01Z"}}"#).unwrap();
        file.flush().unwrap();

        // Only the newly appended event should come back.
        let result = reader.read_new_events().unwrap();
        assert_eq!(result.events.len(), 1);
        assert_eq!(result.events[0].topic, "second");
    }

    // Peeking returns the pending events but leaves them to be read again.
    #[test]
    fn test_peek_new_events_does_not_advance_position() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"first","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let peeked = reader.peek_new_events().unwrap();
        assert_eq!(peeked.events.len(), 1);
        assert_eq!(peeked.events[0].topic, "first");

        // The position must be untouched by the peek.
        assert_eq!(reader.position(), 0);

        let consumed = reader.read_new_events().unwrap();
        assert_eq!(consumed.events.len(), 1);
        assert_eq!(consumed.events[0].topic, "first");
    }

    // A missing file is not an error; it simply yields nothing.
    #[test]
    fn test_missing_file() {
        let mut reader = EventReader::new("/nonexistent/path.jsonl");
        let result = reader.read_new_events().unwrap();
        assert!(result.events.is_empty());
        assert!(result.malformed.is_empty());
    }

    // A corrupt line is reported with its line number and does not stop the
    // surrounding valid lines from being parsed.
    #[test]
    fn test_captures_malformed_lines() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"good","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        writeln!(file, r"{{corrupt json}}").unwrap();
        writeln!(
            file,
            r#"{{"topic":"also_good","ts":"2024-01-01T00:00:01Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        // Both valid events are returned.
        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].topic, "good");
        assert_eq!(result.events[1].topic, "also_good");

        // The corrupt line is captured as line 2.
        assert_eq!(result.malformed.len(), 1);
        assert_eq!(result.malformed[0].line_number, 2);
        assert!(result.malformed[0].content.contains("corrupt json"));
        assert!(!result.malformed[0].error.is_empty());
    }

    // An existing but empty file yields nothing.
    #[test]
    fn test_empty_file() {
        let file = NamedTempFile::new().unwrap();
        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();
        assert!(result.events.is_empty());
        assert!(result.malformed.is_empty());
    }

    // After `reset`, the same events are returned again.
    #[test]
    fn test_reset_position() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"test","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        reader.read_new_events().unwrap();
        assert!(reader.position() > 0);

        reader.reset();
        assert_eq!(reader.position(), 0);

        let result = reader.read_new_events().unwrap();
        assert_eq!(result.events.len(), 1);
    }

    // A payload written as a JSON object is accepted and stored as JSON text.
    #[test]
    fn test_structured_payload_as_object() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(
            file,
            r#"{{"topic":"review.done","payload":{{"status":"approved","files":["a.rs","b.rs"]}},"ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 1);
        assert_eq!(result.events[0].topic, "review.done");

        // The payload string contains the object's keys and values.
        let payload = result.events[0].payload.as_ref().unwrap();
        assert!(payload.contains("\"status\""));
        assert!(payload.contains("\"approved\""));
        assert!(payload.contains("\"files\""));

        // It must also be valid JSON when parsed back.
        let parsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed["status"], "approved");
    }

    // String payloads, object payloads and absent payloads can be mixed.
    #[test]
    fn test_mixed_payload_formats() {
        let mut file = NamedTempFile::new().unwrap();

        // String payload.
        writeln!(
            file,
            r#"{{"topic":"task.start","payload":"Start work","ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();

        // Object payload.
        writeln!(
            file,
            r#"{{"topic":"task.done","payload":{{"result":"success"}},"ts":"2024-01-01T00:00:01Z"}}"#
        )
        .unwrap();

        // No payload at all.
        writeln!(
            file,
            r#"{{"topic":"heartbeat","ts":"2024-01-01T00:00:02Z"}}"#
        )
        .unwrap();

        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 3);

        // The string payload is kept verbatim.
        assert_eq!(result.events[0].payload, Some("Start work".to_string()));

        // The object payload is rendered as JSON text.
        let payload2 = result.events[1].payload.as_ref().unwrap();
        assert!(payload2.contains("\"result\""));

        // The absent payload stays `None`.
        assert_eq!(result.events[2].payload, None);
    }

    // Nested arrays and objects inside a payload survive the round trip.
    #[test]
    fn test_nested_object_payload() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(
            file,
            r#"{{"topic":"analysis","payload":{{"issues":[{{"file":"test.rs","line":42,"severity":"major"}}],"approval":"conditional"}},"ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 1);

        // Parse the stored payload text back and inspect the nested values.
        let payload = result.events[0].payload.as_ref().unwrap();
        let parsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed["issues"][0]["file"], "test.rs");
        assert_eq!(parsed["issues"][0]["line"], 42);
        assert_eq!(parsed["approval"], "conditional");
    }

    // Wave fields present on the line are exposed on the parsed event.
    #[test]
    fn test_event_reader_parses_wave_metadata() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(
            file,
            r#"{{"topic":"review.file","payload":"src/main.rs","ts":"2024-01-01T00:00:00Z","wave_id":"w-1a2b3c4d","wave_index":0,"wave_total":3}}"#
        )
        .unwrap();
        writeln!(
            file,
            r#"{{"topic":"review.file","payload":"src/lib.rs","ts":"2024-01-01T00:00:00Z","wave_id":"w-1a2b3c4d","wave_index":1,"wave_total":3}}"#
        )
        .unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 2);
        assert!(result.events[0].is_wave_event());
        assert_eq!(result.events[0].wave_id.as_deref(), Some("w-1a2b3c4d"));
        assert_eq!(result.events[0].wave_index, Some(0));
        assert_eq!(result.events[0].wave_total, Some(3));
        assert_eq!(result.events[1].wave_index, Some(1));
    }

    // Lines without any wave fields still parse, with all wave fields `None`.
    #[test]
    fn test_event_reader_backwards_compat_no_wave_fields() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.done","payload":"ok","ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 1);
        assert!(!result.events[0].is_wave_event());
        assert!(result.events[0].wave_id.is_none());
        assert!(result.events[0].wave_index.is_none());
        assert!(result.events[0].wave_total.is_none());
    }

    // Wave and non-wave events can be interleaved in one file.
    #[test]
    fn test_event_reader_mixed_wave_and_non_wave() {
        let mut file = NamedTempFile::new().unwrap();
        // Non-wave event.
        writeln!(
            file,
            r#"{{"topic":"task.start","payload":"begin","ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();
        // Wave event.
        writeln!(
            file,
            r#"{{"topic":"review.file","payload":"src/main.rs","ts":"2024-01-01T00:00:01Z","wave_id":"w-abc","wave_index":0,"wave_total":2}}"#
        )
        .unwrap();
        // Another non-wave event.
        writeln!(
            file,
            r#"{{"topic":"build.done","ts":"2024-01-01T00:00:02Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 3);
        assert!(!result.events[0].is_wave_event());
        assert!(result.events[1].is_wave_event());
        assert_eq!(result.events[1].wave_id.as_deref(), Some("w-abc"));
        assert!(!result.events[2].is_wave_event());
    }

    // Conversion to the proto event without wave metadata.
    #[test]
    fn test_from_event_reader_to_proto_without_wave() {
        let event = Event {
            topic: "build.done".to_string(),
            payload: Some("success".to_string()),
            ts: "2024-01-01T00:00:00Z".to_string(),
            wave_id: None,
            wave_index: None,
            wave_total: None,
        };
        let proto: ralph_proto::Event = event.into();
        assert_eq!(proto.topic.as_str(), "build.done");
        assert_eq!(proto.payload, "success");
        assert!(!proto.is_wave_event());
    }

    // Conversion to the proto event carries wave metadata across.
    #[test]
    fn test_from_event_reader_to_proto_with_wave() {
        let event = Event {
            topic: "review.file".to_string(),
            payload: Some("src/main.rs".to_string()),
            ts: "2024-01-01T00:00:00Z".to_string(),
            wave_id: Some("w-abc".to_string()),
            wave_index: Some(2),
            wave_total: Some(5),
        };
        let proto: ralph_proto::Event = event.into();
        assert_eq!(proto.topic.as_str(), "review.file");
        assert_eq!(proto.payload, "src/main.rs");
        assert!(proto.is_wave_event());
        assert_eq!(proto.wave_id.as_deref(), Some("w-abc"));
        assert_eq!(proto.wave_index, Some(2));
        assert_eq!(proto.wave_total, Some(5));
    }

    // A `None` payload converts to an empty proto payload.
    #[test]
    fn test_from_event_reader_to_proto_none_payload() {
        let event = Event {
            topic: "empty.event".to_string(),
            payload: None,
            ts: "2024-01-01T00:00:00Z".to_string(),
            wave_id: None,
            wave_index: None,
            wave_total: None,
        };
        let proto: ralph_proto::Event = event.into();
        assert_eq!(proto.payload, "");
    }

    // A non-JSON line between valid lines is reported without losing them.
    #[test]
    fn test_mixed_valid_invalid_handling() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"valid1","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        writeln!(file, "not valid json at all").unwrap();
        writeln!(file, r#"{{"topic":"valid2","ts":"2024-01-01T00:00:01Z"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.malformed.len(), 1);
        assert_eq!(result.events[0].topic, "valid1");
        assert_eq!(result.events[1].topic, "valid2");
    }
}