1use serde::{Deserialize, Deserializer, Serialize};
4use std::fs::File;
5use std::io::{BufRead, BufReader, Seek, SeekFrom};
6use std::path::PathBuf;
7use tracing::warn;
8
9#[derive(Debug, Clone, Default)]
15pub struct ParseResult {
16 pub events: Vec<Event>,
18 pub malformed: Vec<MalformedLine>,
20}
21
22#[derive(Debug, Clone, Serialize)]
27pub struct MalformedLine {
28 pub line_number: u64,
30 pub content: String,
32 pub error: String,
34}
35
36impl MalformedLine {
37 const MAX_CONTENT_LEN: usize = 100;
39
40 pub fn new(line_number: u64, content: &str, error: String) -> Self {
42 let content = if content.len() > Self::MAX_CONTENT_LEN {
43 format!("{}...", &content[..Self::MAX_CONTENT_LEN])
44 } else {
45 content.to_string()
46 };
47 Self {
48 line_number,
49 content,
50 error,
51 }
52 }
53}
54
55fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
64where
65 D: Deserializer<'de>,
66{
67 #[derive(Deserialize)]
68 #[serde(untagged)]
69 enum FlexiblePayload {
70 String(String),
71 Object(serde_json::Value),
72 }
73
74 let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
75 Ok(opt.map(|flex| match flex {
76 FlexiblePayload::String(s) => s,
77 FlexiblePayload::Object(obj) => {
78 serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
80 }
81 }))
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
86pub struct Event {
87 pub topic: String,
88 #[serde(
89 default,
90 skip_serializing_if = "Option::is_none",
91 deserialize_with = "deserialize_flexible_payload"
92 )]
93 pub payload: Option<String>,
94 pub ts: String,
95}
96
97pub struct EventReader {
99 path: PathBuf,
100 position: u64,
101}
102
103impl EventReader {
104 pub fn new(path: impl Into<PathBuf>) -> Self {
106 Self {
107 path: path.into(),
108 position: 0,
109 }
110 }
111
112 pub fn read_new_events(&mut self) -> std::io::Result<ParseResult> {
123 if !self.path.exists() {
124 return Ok(ParseResult::default());
125 }
126
127 let mut file = File::open(&self.path)?;
128 file.seek(SeekFrom::Start(self.position))?;
129
130 let reader = BufReader::new(file);
131 let mut result = ParseResult::default();
132 let mut current_pos = self.position;
133 let mut line_number = self.count_lines_before_position();
134
135 for line in reader.lines() {
136 let line = line?;
137 let line_bytes = line.len() as u64 + 1; line_number += 1;
139
140 if line.trim().is_empty() {
141 current_pos += line_bytes;
142 continue;
143 }
144
145 match serde_json::from_str::<Event>(&line) {
146 Ok(event) => result.events.push(event),
147 Err(e) => {
148 warn!(error = %e, line_number = line_number, "Malformed JSON line");
149 result
150 .malformed
151 .push(MalformedLine::new(line_number, &line, e.to_string()));
152 }
153 }
154
155 current_pos += line_bytes;
156 }
157
158 self.position = current_pos;
159 Ok(result)
160 }
161
162 pub fn peek_new_events(&self) -> std::io::Result<ParseResult> {
167 let mut reader = Self {
168 path: self.path.clone(),
169 position: self.position,
170 };
171 reader.read_new_events()
172 }
173
174 fn count_lines_before_position(&self) -> u64 {
176 if self.position == 0 || !self.path.exists() {
177 return 0;
178 }
179 if let Ok(file) = File::open(&self.path) {
181 let reader = BufReader::new(file);
182 let mut count = 0u64;
183 let mut bytes_read = 0u64;
184 for line in reader.lines() {
185 if let Ok(line) = line {
186 bytes_read += line.len() as u64 + 1;
187 if bytes_read > self.position {
188 break;
189 }
190 count += 1;
191 } else {
192 break;
193 }
194 }
195 count
196 } else {
197 0
198 }
199 }
200
201 pub fn position(&self) -> u64 {
203 self.position
204 }
205
206 pub fn reset(&mut self) {
208 self.position = 0;
209 }
210}
211
// Tests drive `EventReader` against real temporary files. Fixture lines are written with
// `writeln!`, so `{{` / `}}` in the format strings produce a literal `{` / `}` in the file.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    // Two well-formed lines: one with a string payload, one without a payload field.
    #[test]
    fn test_read_new_events() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(
            file,
            r#"{{"topic":"test","payload":"hello","ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();
        writeln!(file, r#"{{"topic":"test2","ts":"2024-01-01T00:00:01Z"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].topic, "test");
        assert_eq!(result.events[0].payload, Some("hello".to_string()));
        assert_eq!(result.events[1].topic, "test2");
        // A missing `payload` field deserializes to `None`.
        assert_eq!(result.events[1].payload, None);
        assert!(result.malformed.is_empty());
    }

    // A second read returns only the line appended after the first read.
    #[test]
    fn test_tracks_position() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"first","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();
        assert_eq!(result.events.len(), 1);

        writeln!(file, r#"{{"topic":"second","ts":"2024-01-01T00:00:01Z"}}"#).unwrap();
        file.flush().unwrap();

        let result = reader.read_new_events().unwrap();
        assert_eq!(result.events.len(), 1);
        assert_eq!(result.events[0].topic, "second");
    }

    // Peeking returns pending events without moving the cursor, so a later read sees them too.
    #[test]
    fn test_peek_new_events_does_not_advance_position() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"first","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let peeked = reader.peek_new_events().unwrap();
        assert_eq!(peeked.events.len(), 1);
        assert_eq!(peeked.events[0].topic, "first");

        assert_eq!(reader.position(), 0);

        let consumed = reader.read_new_events().unwrap();
        assert_eq!(consumed.events.len(), 1);
        assert_eq!(consumed.events[0].topic, "first");
    }

    // A nonexistent path is not an error; it simply yields nothing.
    #[test]
    fn test_missing_file() {
        let mut reader = EventReader::new("/nonexistent/path.jsonl");
        let result = reader.read_new_events().unwrap();
        assert!(result.events.is_empty());
        assert!(result.malformed.is_empty());
    }

    // A corrupt line between two good lines is reported with its 1-based line number,
    // and the good lines around it are still returned.
    #[test]
    fn test_captures_malformed_lines() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"good","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        writeln!(file, r"{{corrupt json}}").unwrap();
        writeln!(
            file,
            r#"{{"topic":"also_good","ts":"2024-01-01T00:00:01Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].topic, "good");
        assert_eq!(result.events[1].topic, "also_good");

        assert_eq!(result.malformed.len(), 1);
        assert_eq!(result.malformed[0].line_number, 2);
        assert!(result.malformed[0].content.contains("corrupt json"));
        assert!(!result.malformed[0].error.is_empty());
    }

    // An existing but empty file yields nothing and no diagnostics.
    #[test]
    fn test_empty_file() {
        let file = NamedTempFile::new().unwrap();
        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();
        assert!(result.events.is_empty());
        assert!(result.malformed.is_empty());
    }

    // After `reset`, already-consumed lines are returned again.
    #[test]
    fn test_reset_position() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"test","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        reader.read_new_events().unwrap();
        assert!(reader.position() > 0);

        reader.reset();
        assert_eq!(reader.position(), 0);

        let result = reader.read_new_events().unwrap();
        assert_eq!(result.events.len(), 1);
    }

    // An object payload is kept as JSON text that can be parsed back into a `Value`.
    #[test]
    fn test_structured_payload_as_object() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(
            file,
            r#"{{"topic":"review.done","payload":{{"status":"approved","files":["a.rs","b.rs"]}},"ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 1);
        assert_eq!(result.events[0].topic, "review.done");

        let payload = result.events[0].payload.as_ref().unwrap();
        assert!(payload.contains("\"status\""));
        assert!(payload.contains("\"approved\""));
        assert!(payload.contains("\"files\""));

        let parsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed["status"], "approved");
    }

    // String, object and absent payloads can coexist in one file.
    #[test]
    fn test_mixed_payload_formats() {
        let mut file = NamedTempFile::new().unwrap();

        // String payload.
        writeln!(
            file,
            r#"{{"topic":"task.start","payload":"Start work","ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();

        // Object payload.
        writeln!(
            file,
            r#"{{"topic":"task.done","payload":{{"result":"success"}},"ts":"2024-01-01T00:00:01Z"}}"#
        )
        .unwrap();

        // No payload field at all.
        writeln!(
            file,
            r#"{{"topic":"heartbeat","ts":"2024-01-01T00:00:02Z"}}"#
        )
        .unwrap();

        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 3);

        assert_eq!(result.events[0].payload, Some("Start work".to_string()));

        let payload2 = result.events[1].payload.as_ref().unwrap();
        assert!(payload2.contains("\"result\""));

        assert_eq!(result.events[2].payload, None);
    }

    // Nested arrays/objects and numbers inside the payload survive the round trip to text.
    #[test]
    fn test_nested_object_payload() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(
            file,
            r#"{{"topic":"analysis","payload":{{"issues":[{{"file":"test.rs","line":42,"severity":"major"}}],"approval":"conditional"}},"ts":"2024-01-01T00:00:00Z"}}"#
        )
        .unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 1);

        let payload = result.events[0].payload.as_ref().unwrap();
        let parsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed["issues"][0]["file"], "test.rs");
        assert_eq!(parsed["issues"][0]["line"], 42);
        assert_eq!(parsed["approval"], "conditional");
    }

    // Plain non-JSON text is counted as malformed without disturbing neighbouring events.
    #[test]
    fn test_mixed_valid_invalid_handling() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, r#"{{"topic":"valid1","ts":"2024-01-01T00:00:00Z"}}"#).unwrap();
        writeln!(file, "not valid json at all").unwrap();
        writeln!(file, r#"{{"topic":"valid2","ts":"2024-01-01T00:00:01Z"}}"#).unwrap();
        file.flush().unwrap();

        let mut reader = EventReader::new(file.path());
        let result = reader.read_new_events().unwrap();

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.malformed.len(), 1);
        assert_eq!(result.events[0].topic, "valid1");
        assert_eq!(result.events[1].topic, "valid2");
    }
}