1use serde::{Deserialize, Deserializer, Serialize};
4use std::fs::File;
5use std::io::{BufRead, BufReader, Seek, SeekFrom};
6use std::path::PathBuf;
7use tracing::warn;
8
/// Outcome of one read pass over the event file.
///
/// Lines that parse successfully end up in `events`; lines that fail to parse
/// are kept in `malformed` instead of aborting the read.
#[derive(Debug, Clone, Default)]
pub struct ParseResult {
    /// Events that were parsed successfully, in file order.
    pub events: Vec<Event>,
    /// Lines that could not be parsed as an [`Event`], in file order.
    pub malformed: Vec<MalformedLine>,
}
21
/// Description of a single line that failed to parse as an [`Event`].
#[derive(Debug, Clone, Serialize)]
pub struct MalformedLine {
    /// Line number of the offending line within the file (the first line is 1).
    pub line_number: u64,
    /// Text of the offending line, possibly shortened by [`MalformedLine::new`].
    pub content: String,
    /// Human-readable description of why parsing failed.
    pub error: String,
}
35
36impl MalformedLine {
37 const MAX_CONTENT_LEN: usize = 100;
39
40 pub fn new(line_number: u64, content: &str, error: String) -> Self {
42 let content = if content.len() > Self::MAX_CONTENT_LEN {
43 format!("{}...", &content[..Self::MAX_CONTENT_LEN])
44 } else {
45 content.to_string()
46 };
47 Self {
48 line_number,
49 content,
50 error,
51 }
52 }
53}
54
55fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
64where
65 D: Deserializer<'de>,
66{
67 #[derive(Deserialize)]
68 #[serde(untagged)]
69 enum FlexiblePayload {
70 String(String),
71 Object(serde_json::Value),
72 }
73
74 let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
75 Ok(opt.map(|flex| match flex {
76 FlexiblePayload::String(s) => s,
77 FlexiblePayload::Object(obj) => {
78 serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
80 }
81 }))
82}
83
/// One event record, as stored on a single line of the event file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Event {
    /// Value of the `topic` key.
    pub topic: String,
    /// Optional payload.
    ///
    /// A missing key deserializes to `None` (via `default`). Otherwise the
    /// value goes through `deserialize_flexible_payload`, which keeps JSON
    /// strings as they are and stores any other JSON value as its JSON text.
    /// `None` is omitted again when serializing.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        deserialize_with = "deserialize_flexible_payload"
    )]
    pub payload: Option<String>,
    /// Value of the `ts` key, kept as an unparsed string.
    pub ts: String,
}
96
97pub struct EventReader {
99 path: PathBuf,
100 position: u64,
101}
102
103impl EventReader {
104 pub fn new(path: impl Into<PathBuf>) -> Self {
106 Self {
107 path: path.into(),
108 position: 0,
109 }
110 }
111
112 pub fn read_new_events(&mut self) -> std::io::Result<ParseResult> {
123 if !self.path.exists() {
124 return Ok(ParseResult::default());
125 }
126
127 let mut file = File::open(&self.path)?;
128 file.seek(SeekFrom::Start(self.position))?;
129
130 let reader = BufReader::new(file);
131 let mut result = ParseResult::default();
132 let mut current_pos = self.position;
133 let mut line_number = self.count_lines_before_position();
134
135 for line in reader.lines() {
136 let line = line?;
137 let line_bytes = line.len() as u64 + 1; line_number += 1;
139
140 if line.trim().is_empty() {
141 current_pos += line_bytes;
142 continue;
143 }
144
145 match serde_json::from_str::<Event>(&line) {
146 Ok(event) => result.events.push(event),
147 Err(e) => {
148 warn!(error = %e, line_number = line_number, "Malformed JSON line");
149 result.malformed.push(MalformedLine::new(
150 line_number,
151 &line,
152 e.to_string(),
153 ));
154 }
155 }
156
157 current_pos += line_bytes;
158 }
159
160 self.position = current_pos;
161 Ok(result)
162 }
163
164 fn count_lines_before_position(&self) -> u64 {
166 if self.position == 0 || !self.path.exists() {
167 return 0;
168 }
169 if let Ok(file) = File::open(&self.path) {
171 let reader = BufReader::new(file);
172 let mut count = 0u64;
173 let mut bytes_read = 0u64;
174 for line in reader.lines() {
175 if let Ok(line) = line {
176 bytes_read += line.len() as u64 + 1;
177 if bytes_read > self.position {
178 break;
179 }
180 count += 1;
181 } else {
182 break;
183 }
184 }
185 count
186 } else {
187 0
188 }
189 }
190
191 pub fn position(&self) -> u64 {
193 self.position
194 }
195
196 pub fn reset(&mut self) {
198 self.position = 0;
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Appends every entry of `lines` to `file`, each followed by a newline,
    /// then flushes.
    fn append_lines(file: &mut NamedTempFile, lines: &[&str]) {
        for line in lines {
            writeln!(file, "{}", line).unwrap();
        }
        file.flush().unwrap();
    }

    /// Creates a temporary file that already contains `lines`.
    fn temp_file_with(lines: &[&str]) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        append_lines(&mut file, lines);
        file
    }

    /// Runs a single read with a fresh reader over `file`.
    fn read_all(file: &NamedTempFile) -> ParseResult {
        EventReader::new(file.path()).read_new_events().unwrap()
    }

    /// Collects the topics of all parsed events, in order.
    fn topics(result: &ParseResult) -> Vec<&str> {
        result.events.iter().map(|e| e.topic.as_str()).collect()
    }

    #[test]
    fn test_read_new_events() {
        let file = temp_file_with(&[
            r#"{"topic":"test","payload":"hello","ts":"2024-01-01T00:00:00Z"}"#,
            r#"{"topic":"test2","ts":"2024-01-01T00:00:01Z"}"#,
        ]);

        let result = read_all(&file);

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].topic, "test");
        assert_eq!(result.events[0].payload, Some("hello".to_string()));
        assert_eq!(result.events[1].topic, "test2");
        assert_eq!(result.events[1].payload, None);
        assert!(result.malformed.is_empty());
    }

    #[test]
    fn test_tracks_position() {
        let mut file = temp_file_with(&[r#"{"topic":"first","ts":"2024-01-01T00:00:00Z"}"#]);

        let mut reader = EventReader::new(file.path());
        let first_pass = reader.read_new_events().unwrap();
        assert_eq!(first_pass.events.len(), 1);

        // Only the line appended after the first pass should be returned.
        append_lines(
            &mut file,
            &[r#"{"topic":"second","ts":"2024-01-01T00:00:01Z"}"#],
        );

        let second_pass = reader.read_new_events().unwrap();
        assert_eq!(second_pass.events.len(), 1);
        assert_eq!(second_pass.events[0].topic, "second");
    }

    #[test]
    fn test_missing_file() {
        let mut reader = EventReader::new("/nonexistent/path.jsonl");
        let result = reader.read_new_events().unwrap();
        assert!(result.events.is_empty());
        assert!(result.malformed.is_empty());
    }

    #[test]
    fn test_captures_malformed_lines() {
        let file = temp_file_with(&[
            r#"{"topic":"good","ts":"2024-01-01T00:00:00Z"}"#,
            r"{corrupt json}",
            r#"{"topic":"also_good","ts":"2024-01-01T00:00:01Z"}"#,
        ]);

        let result = read_all(&file);

        // Both valid lines survive the bad one in between.
        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].topic, "good");
        assert_eq!(result.events[1].topic, "also_good");

        // The bad line is reported with its position and text.
        assert_eq!(result.malformed.len(), 1);
        let bad = &result.malformed[0];
        assert_eq!(bad.line_number, 2);
        assert!(bad.content.contains("corrupt json"));
        assert!(!bad.error.is_empty());
    }

    #[test]
    fn test_empty_file() {
        let file = NamedTempFile::new().unwrap();
        let result = read_all(&file);
        assert!(result.events.is_empty());
        assert!(result.malformed.is_empty());
    }

    #[test]
    fn test_reset_position() {
        let file = temp_file_with(&[r#"{"topic":"test","ts":"2024-01-01T00:00:00Z"}"#]);

        let mut reader = EventReader::new(file.path());
        reader.read_new_events().unwrap();
        assert!(reader.position() > 0);

        reader.reset();
        assert_eq!(reader.position(), 0);

        // After a reset the same line is delivered again.
        let replay = reader.read_new_events().unwrap();
        assert_eq!(replay.events.len(), 1);
    }

    #[test]
    fn test_structured_payload_as_object() {
        let file = temp_file_with(&[
            r#"{"topic":"review.done","payload":{"status":"approved","files":["a.rs","b.rs"]},"ts":"2024-01-01T00:00:00Z"}"#,
        ]);

        let result = read_all(&file);

        assert_eq!(result.events.len(), 1);
        assert_eq!(result.events[0].topic, "review.done");

        // The object payload is carried as JSON text.
        let payload = result.events[0].payload.as_ref().unwrap();
        for key in ["\"status\"", "\"approved\"", "\"files\""] {
            assert!(payload.contains(key));
        }

        // ...and that text parses back into the original structure.
        let parsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed["status"], "approved");
    }

    #[test]
    fn test_mixed_payload_formats() {
        let file = temp_file_with(&[
            // String payload.
            r#"{"topic":"task.start","payload":"Start work","ts":"2024-01-01T00:00:00Z"}"#,
            // Object payload.
            r#"{"topic":"task.done","payload":{"result":"success"},"ts":"2024-01-01T00:00:01Z"}"#,
            // No payload at all.
            r#"{"topic":"heartbeat","ts":"2024-01-01T00:00:02Z"}"#,
        ]);

        let result = read_all(&file);

        assert_eq!(result.events.len(), 3);

        assert_eq!(result.events[0].payload, Some("Start work".to_string()));

        let object_payload = result.events[1].payload.as_ref().unwrap();
        assert!(object_payload.contains("\"result\""));

        assert_eq!(result.events[2].payload, None);
    }

    #[test]
    fn test_nested_object_payload() {
        let file = temp_file_with(&[
            r#"{"topic":"analysis","payload":{"issues":[{"file":"test.rs","line":42,"severity":"major"}],"approval":"conditional"},"ts":"2024-01-01T00:00:00Z"}"#,
        ]);

        let result = read_all(&file);

        assert_eq!(result.events.len(), 1);

        let payload = result.events[0].payload.as_ref().unwrap();
        let parsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        let first_issue = &parsed["issues"][0];
        assert_eq!(first_issue["file"], "test.rs");
        assert_eq!(first_issue["line"], 42);
        assert_eq!(parsed["approval"], "conditional");
    }

    #[test]
    fn test_mixed_valid_invalid_handling() {
        let file = temp_file_with(&[
            r#"{"topic":"valid1","ts":"2024-01-01T00:00:00Z"}"#,
            "not valid json at all",
            r#"{"topic":"valid2","ts":"2024-01-01T00:00:01Z"}"#,
        ]);

        let result = read_all(&file);

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.malformed.len(), 1);
        assert_eq!(topics(&result), ["valid1", "valid2"]);
    }
}