1use serde::{Deserialize, Deserializer, Serialize};
4use std::fs::File;
5use std::io::{BufRead, BufReader, Seek, SeekFrom};
6use std::path::PathBuf;
7use tracing::warn;
8
9#[derive(Debug, Clone, Default)]
15pub struct ParseResult {
16 pub events: Vec<Event>,
18 pub malformed: Vec<MalformedLine>,
20}
21
22#[derive(Debug, Clone, Serialize)]
27pub struct MalformedLine {
28 pub line_number: u64,
30 pub content: String,
32 pub error: String,
34}
35
36impl MalformedLine {
37 const MAX_CONTENT_LEN: usize = 100;
39
40 pub fn new(line_number: u64, content: &str, error: String) -> Self {
42 let content = if content.len() > Self::MAX_CONTENT_LEN {
43 format!("{}...", &content[..Self::MAX_CONTENT_LEN])
44 } else {
45 content.to_string()
46 };
47 Self {
48 line_number,
49 content,
50 error,
51 }
52 }
53}
54
55fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
64where
65 D: Deserializer<'de>,
66{
67 #[derive(Deserialize)]
68 #[serde(untagged)]
69 enum FlexiblePayload {
70 String(String),
71 Object(serde_json::Value),
72 }
73
74 let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
75 Ok(opt.map(|flex| match flex {
76 FlexiblePayload::String(s) => s,
77 FlexiblePayload::Object(obj) => {
78 serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
80 }
81 }))
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
86pub struct Event {
87 pub topic: String,
88 #[serde(
89 default,
90 skip_serializing_if = "Option::is_none",
91 deserialize_with = "deserialize_flexible_payload"
92 )]
93 pub payload: Option<String>,
94 pub ts: String,
95}
96
97pub struct EventReader {
99 path: PathBuf,
100 position: u64,
101}
102
103impl EventReader {
104 pub fn new(path: impl Into<PathBuf>) -> Self {
106 Self {
107 path: path.into(),
108 position: 0,
109 }
110 }
111
112 pub fn read_new_events(&mut self) -> std::io::Result<ParseResult> {
123 if !self.path.exists() {
124 return Ok(ParseResult::default());
125 }
126
127 let mut file = File::open(&self.path)?;
128 file.seek(SeekFrom::Start(self.position))?;
129
130 let reader = BufReader::new(file);
131 let mut result = ParseResult::default();
132 let mut current_pos = self.position;
133 let mut line_number = self.count_lines_before_position();
134
135 for line in reader.lines() {
136 let line = line?;
137 let line_bytes = line.len() as u64 + 1; line_number += 1;
139
140 if line.trim().is_empty() {
141 current_pos += line_bytes;
142 continue;
143 }
144
145 match serde_json::from_str::<Event>(&line) {
146 Ok(event) => result.events.push(event),
147 Err(e) => {
148 warn!(error = %e, line_number = line_number, "Malformed JSON line");
149 result
150 .malformed
151 .push(MalformedLine::new(line_number, &line, e.to_string()));
152 }
153 }
154
155 current_pos += line_bytes;
156 }
157
158 self.position = current_pos;
159 Ok(result)
160 }
161
162 fn count_lines_before_position(&self) -> u64 {
164 if self.position == 0 || !self.path.exists() {
165 return 0;
166 }
167 if let Ok(file) = File::open(&self.path) {
169 let reader = BufReader::new(file);
170 let mut count = 0u64;
171 let mut bytes_read = 0u64;
172 for line in reader.lines() {
173 if let Ok(line) = line {
174 bytes_read += line.len() as u64 + 1;
175 if bytes_read > self.position {
176 break;
177 }
178 count += 1;
179 } else {
180 break;
181 }
182 }
183 count
184 } else {
185 0
186 }
187 }
188
189 pub fn position(&self) -> u64 {
191 self.position
192 }
193
194 pub fn reset(&mut self) {
196 self.position = 0;
197 }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Creates a temporary JSONL file holding `lines`, each newline-terminated.
    fn jsonl_file(lines: &[&str]) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        append_lines(&mut file, lines);
        file
    }

    /// Appends `lines` to `file`, one per line, then flushes.
    fn append_lines(file: &mut NamedTempFile, lines: &[&str]) {
        for line in lines {
            writeln!(file, "{}", line).unwrap();
        }
        file.flush().unwrap();
    }

    /// Parses everything currently in `file` with a fresh reader.
    fn read_all(file: &NamedTempFile) -> ParseResult {
        EventReader::new(file.path()).read_new_events().unwrap()
    }

    #[test]
    fn test_read_new_events() {
        let file = jsonl_file(&[
            r#"{"topic":"test","payload":"hello","ts":"2024-01-01T00:00:00Z"}"#,
            r#"{"topic":"test2","ts":"2024-01-01T00:00:01Z"}"#,
        ]);

        let ParseResult { events, malformed } = read_all(&file);

        assert_eq!(events.len(), 2);
        assert_eq!(events[0].topic, "test");
        assert_eq!(events[0].payload.as_deref(), Some("hello"));
        assert_eq!(events[1].topic, "test2");
        assert!(events[1].payload.is_none());
        assert!(malformed.is_empty());
    }

    #[test]
    fn test_tracks_position() {
        let mut file = jsonl_file(&[r#"{"topic":"first","ts":"2024-01-01T00:00:00Z"}"#]);
        let mut reader = EventReader::new(file.path());
        assert_eq!(reader.read_new_events().unwrap().events.len(), 1);

        append_lines(
            &mut file,
            &[r#"{"topic":"second","ts":"2024-01-01T00:00:01Z"}"#],
        );

        let events = reader.read_new_events().unwrap().events;
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].topic, "second");
    }

    #[test]
    fn test_missing_file() {
        let result = EventReader::new("/nonexistent/path.jsonl")
            .read_new_events()
            .unwrap();
        assert!(result.events.is_empty());
        assert!(result.malformed.is_empty());
    }

    #[test]
    fn test_captures_malformed_lines() {
        let file = jsonl_file(&[
            r#"{"topic":"good","ts":"2024-01-01T00:00:00Z"}"#,
            "{corrupt json}",
            r#"{"topic":"also_good","ts":"2024-01-01T00:00:01Z"}"#,
        ]);

        let result = read_all(&file);

        let topics: Vec<&str> = result.events.iter().map(|e| e.topic.as_str()).collect();
        assert_eq!(topics, ["good", "also_good"]);

        assert_eq!(result.malformed.len(), 1);
        let bad = &result.malformed[0];
        assert_eq!(bad.line_number, 2);
        assert!(bad.content.contains("corrupt json"));
        assert!(!bad.error.is_empty());
    }

    #[test]
    fn test_empty_file() {
        let file = jsonl_file(&[]);
        let result = read_all(&file);
        assert!(result.events.is_empty());
        assert!(result.malformed.is_empty());
    }

    #[test]
    fn test_reset_position() {
        let file = jsonl_file(&[r#"{"topic":"test","ts":"2024-01-01T00:00:00Z"}"#]);
        let mut reader = EventReader::new(file.path());

        reader.read_new_events().unwrap();
        assert!(reader.position() > 0);

        reader.reset();
        assert_eq!(reader.position(), 0);
        assert_eq!(reader.read_new_events().unwrap().events.len(), 1);
    }

    #[test]
    fn test_structured_payload_as_object() {
        let file = jsonl_file(&[
            r#"{"topic":"review.done","payload":{"status":"approved","files":["a.rs","b.rs"]},"ts":"2024-01-01T00:00:00Z"}"#,
        ]);

        let result = read_all(&file);
        assert_eq!(result.events.len(), 1);
        let event = &result.events[0];
        assert_eq!(event.topic, "review.done");

        // The object payload is kept as JSON text...
        let payload = event.payload.as_deref().unwrap();
        for key in ["\"status\"", "\"approved\"", "\"files\""] {
            assert!(payload.contains(key));
        }

        // ...that round-trips back into the same structure.
        let reparsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        assert_eq!(reparsed["status"], "approved");
    }

    #[test]
    fn test_mixed_payload_formats() {
        let file = jsonl_file(&[
            // Plain string payload.
            r#"{"topic":"task.start","payload":"Start work","ts":"2024-01-01T00:00:00Z"}"#,
            // Object payload.
            r#"{"topic":"task.done","payload":{"result":"success"},"ts":"2024-01-01T00:00:01Z"}"#,
            // No payload at all.
            r#"{"topic":"heartbeat","ts":"2024-01-01T00:00:02Z"}"#,
        ]);

        let events = read_all(&file).events;
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].payload.as_deref(), Some("Start work"));
        assert!(events[1].payload.as_deref().unwrap().contains("\"result\""));
        assert_eq!(events[2].payload, None);
    }

    #[test]
    fn test_nested_object_payload() {
        let file = jsonl_file(&[
            r#"{"topic":"analysis","payload":{"issues":[{"file":"test.rs","line":42,"severity":"major"}],"approval":"conditional"},"ts":"2024-01-01T00:00:00Z"}"#,
        ]);

        let events = read_all(&file).events;
        assert_eq!(events.len(), 1);

        let payload: serde_json::Value =
            serde_json::from_str(events[0].payload.as_deref().unwrap()).unwrap();
        let issue = &payload["issues"][0];
        assert_eq!(issue["file"], "test.rs");
        assert_eq!(issue["line"], 42);
        assert_eq!(payload["approval"], "conditional");
    }

    #[test]
    fn test_mixed_valid_invalid_handling() {
        let file = jsonl_file(&[
            r#"{"topic":"valid1","ts":"2024-01-01T00:00:00Z"}"#,
            "not valid json at all",
            r#"{"topic":"valid2","ts":"2024-01-01T00:00:01Z"}"#,
        ]);

        let result = read_all(&file);
        assert_eq!(result.events.len(), 2);
        assert_eq!(result.malformed.len(), 1);
        assert_eq!(result.events[0].topic, "valid1");
        assert_eq!(result.events[1].topic, "valid2");
    }
}