use serde::{Deserialize, Deserializer, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::PathBuf;
use tracing::warn;
/// Outcome of parsing a batch of event-log lines.
///
/// Lines that deserialize into an [`Event`] are collected in `events`; lines
/// that fail to deserialize are collected in `malformed`, so a single bad
/// line does not discard the rest of the batch.
#[derive(Debug, Clone, Default)]
pub struct ParseResult {
    /// Events that were parsed successfully, in file order.
    pub events: Vec<Event>,
    /// Lines that could not be parsed, in file order.
    pub malformed: Vec<MalformedLine>,
}
/// Record of one log line that could not be parsed as an event.
#[derive(Debug, Clone, Serialize)]
pub struct MalformedLine {
    /// Line number reported by the reader for the offending line.
    pub line_number: u64,
    /// Text of the offending line (shortened by [`MalformedLine::new`] when long).
    pub content: String,
    /// Human-readable description of the parse failure.
    pub error: String,
}
impl MalformedLine {
    /// Longest `content`, in bytes, that is stored without being shortened.
    const MAX_CONTENT_LEN: usize = 100;

    /// Builds a record for a line that failed to parse.
    ///
    /// `content` is stored verbatim when it is at most
    /// [`Self::MAX_CONTENT_LEN`] bytes long. Longer content is cut and `...`
    /// is appended to mark the omission.
    pub fn new(line_number: u64, content: &str, error: String) -> Self {
        let limit = Self::MAX_CONTENT_LEN;
        let content = if content.len() <= limit {
            content.to_string()
        } else {
            // NOTE(review): `floor_char_boundary` is a project helper not visible
            // here; it presumably returns the largest char boundary <= `limit`,
            // which is what makes the slice below safe — confirm.
            let cut = crate::text::floor_char_boundary(content, limit);
            let mut shortened = String::with_capacity(cut + 3);
            shortened.push_str(&content[..cut]);
            shortened.push_str("...");
            shortened
        };

        Self {
            line_number,
            content,
            error,
        }
    }
}
/// Deserializes an optional payload that may be a plain string or any other
/// JSON value.
///
/// * A string is returned unchanged.
/// * Any other value (object, array, number, ...) is returned as its JSON
///   text, so callers always see a `String`.
/// * A value the deserializer reports as absent (for JSON, `null`) yields
///   `None`.
fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<Option<String>, D::Error>
where
    D: Deserializer<'de>,
{
    // Untagged: serde tries the variants in order, so strings are taken
    // verbatim before the catch-all JSON value is considered.
    #[derive(Deserialize)]
    #[serde(untagged)]
    enum FlexiblePayload {
        Text(String),
        Json(serde_json::Value),
    }

    let payload = match Option::<FlexiblePayload>::deserialize(deserializer)? {
        None => return Ok(None),
        Some(FlexiblePayload::Text(text)) => text,
        Some(FlexiblePayload::Json(value)) => match serde_json::to_string(&value) {
            Ok(encoded) => encoded,
            // Fall back to the value's `Display` rendering if encoding fails.
            Err(_) => value.to_string(),
        },
    };
    Ok(Some(payload))
}
/// One event as stored on a single line of the JSONL event log.
///
/// Only `topic` and `ts` are required when deserializing; every other field
/// defaults to `None` when missing and is omitted again when serializing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Event {
    /// Topic the event was published under.
    pub topic: String,
    /// Event payload. Non-string JSON payloads are kept as their JSON text
    /// (see `deserialize_flexible_payload`).
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        deserialize_with = "deserialize_flexible_payload"
    )]
    pub payload: Option<String>,
    /// Timestamp exactly as written in the log; it is not parsed here.
    pub ts: String,
    /// Wave identifier; its presence marks the event as a wave event.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wave_id: Option<String>,
    /// Index within the wave.
    // NOTE(review): presumably zero-based position of this event in its wave — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wave_index: Option<u32>,
    /// Total reported for the wave.
    // NOTE(review): presumably the number of events in the wave — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wave_total: Option<u32>,
}
impl Event {
    /// Returns `true` when the event carries a `wave_id`.
    ///
    /// Only `wave_id` is consulted; `wave_index` and `wave_total` do not
    /// affect the result.
    pub fn is_wave_event(&self) -> bool {
        let Self { wave_id, .. } = self;
        wave_id.is_some()
    }
}
impl From<Event> for ralph_proto::Event {
fn from(e: Event) -> Self {
let mut pe = ralph_proto::Event::new(e.topic.as_str(), e.payload.unwrap_or_default());
if let Some(wave_id) = e.wave_id {
let index = e.wave_index.unwrap_or(0);
let total = e.wave_total.unwrap_or(1);
pe = pe.with_wave(wave_id, index, total);
}
pe
}
}
/// Incremental reader for a JSONL event log.
///
/// The reader remembers how far into the file it has read so that each call
/// to `read_new_events` only returns lines added since the previous call.
pub struct EventReader {
    /// Location of the event log on disk.
    path: PathBuf,
    /// Byte offset into the file at which the next read starts.
    position: u64,
}
impl EventReader {
    /// Creates a reader for the event log at `path`, positioned at byte 0.
    ///
    /// The file is not opened here, so it does not have to exist yet.
    pub fn new(path: impl Into<PathBuf>) -> Self {
        Self {
            path: path.into(),
            position: 0,
        }
    }

    /// Reads and parses every line from the current position to the end of
    /// the file, then advances the position past what was read.
    ///
    /// Blank lines are skipped. Lines that are not valid [`Event`] JSON are
    /// logged with `warn!` and returned in [`ParseResult::malformed`] instead
    /// of aborting the read. A final line without a terminating newline is
    /// parsed like any other line.
    ///
    /// The position is advanced by the number of bytes actually consumed, so
    /// `\r\n` line endings and an unterminated final line do not shift later
    /// reads off a line boundary.
    ///
    /// A missing file is not an error: an empty result is returned and the
    /// position is left untouched.
    ///
    /// # Errors
    ///
    /// Returns any I/O error from opening, seeking or reading the file
    /// (including invalid UTF-8). The position is not advanced in that case.
    pub fn read_new_events(&mut self) -> std::io::Result<ParseResult> {
        if !self.path.exists() {
            return Ok(ParseResult::default());
        }

        let mut file = File::open(&self.path)?;
        file.seek(SeekFrom::Start(self.position))?;
        let mut reader = BufReader::new(file);

        let mut result = ParseResult::default();
        let mut current_pos = self.position;
        let mut line_number = self.count_lines_before_position();
        let mut buf = String::new();

        loop {
            buf.clear();
            let consumed = reader.read_line(&mut buf)?;
            if consumed == 0 {
                break;
            }
            // `read_line` reports the exact byte count including the line
            // terminator, whatever it was (`\n`, `\r\n`, or nothing at EOF).
            // usize -> u64 is lossless on supported targets.
            current_pos += consumed as u64;
            line_number += 1;

            // Drop the terminator the same way `BufRead::lines` would.
            let line = match buf.strip_suffix('\n') {
                Some(rest) => rest.strip_suffix('\r').unwrap_or(rest),
                None => buf.as_str(),
            };
            if line.trim().is_empty() {
                continue;
            }

            match serde_json::from_str::<Event>(line) {
                Ok(event) => result.events.push(event),
                Err(e) => {
                    warn!(error = %e, line_number = line_number, "Malformed JSON line");
                    result
                        .malformed
                        .push(MalformedLine::new(line_number, line, e.to_string()));
                }
            }
        }

        // Commit the new offset only after the whole batch was read successfully.
        self.position = current_pos;
        Ok(result)
    }

    /// Returns what [`Self::read_new_events`] would return right now without
    /// advancing this reader's position.
    ///
    /// # Errors
    ///
    /// Same as [`Self::read_new_events`].
    pub fn peek_new_events(&self) -> std::io::Result<ParseResult> {
        // Run the real read on a throwaway copy so `self` stays untouched.
        let mut reader = Self {
            path: self.path.clone(),
            position: self.position,
        };
        reader.read_new_events()
    }

    /// Counts the lines that end at or before the current position, so that
    /// line numbers reported for new lines are relative to the whole file.
    ///
    /// Best effort: if the file cannot be opened or read, the lines counted
    /// so far (possibly 0) are returned rather than an error.
    fn count_lines_before_position(&self) -> u64 {
        if self.position == 0 {
            return 0;
        }
        let file = match File::open(&self.path) {
            Ok(file) => file,
            Err(_) => return 0,
        };

        let mut reader = BufReader::new(file);
        let mut buf = String::new();
        let mut count = 0u64;
        let mut bytes_seen = 0u64;

        while bytes_seen < self.position {
            buf.clear();
            match reader.read_line(&mut buf) {
                Ok(0) | Err(_) => break,
                Ok(consumed) => {
                    // Same byte accounting as `read_new_events`.
                    bytes_seen += consumed as u64;
                    if bytes_seen > self.position {
                        // This line straddles the position; it is not "before" it.
                        break;
                    }
                    count += 1;
                }
            }
        }
        count
    }

    /// Path of the event log this reader was created for.
    pub fn path(&self) -> &std::path::Path {
        &self.path
    }

    /// Byte offset at which the next read will start.
    pub fn position(&self) -> u64 {
        self.position
    }

    /// Sets the byte offset at which the next read will start.
    ///
    /// The value is stored as given; it is not checked against the file.
    pub fn set_position(&mut self, position: u64) {
        self.position = position;
    }

    /// Moves the reader back to the start of the file.
    pub fn reset(&mut self) {
        self.position = 0;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes each entry of `lines` to `file` on its own line, then flushes.
    fn append_lines(file: &mut NamedTempFile, lines: &[&str]) {
        for line in lines {
            writeln!(file, "{line}").unwrap();
        }
        file.flush().unwrap();
    }

    /// Creates a temporary event log containing `lines`.
    fn log_with(lines: &[&str]) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        append_lines(&mut file, lines);
        file
    }

    /// Parses the whole of `file` with a fresh reader.
    fn read_all(file: &NamedTempFile) -> ParseResult {
        EventReader::new(file.path()).read_new_events().unwrap()
    }

    /// Builds an in-memory event without wave metadata.
    fn plain_event(topic: &str, payload: Option<&str>) -> Event {
        Event {
            topic: topic.to_string(),
            payload: payload.map(str::to_string),
            ts: "2024-01-01T00:00:00Z".to_string(),
            wave_id: None,
            wave_index: None,
            wave_total: None,
        }
    }

    #[test]
    fn test_read_new_events() {
        let file = log_with(&[
            r#"{"topic":"test","payload":"hello","ts":"2024-01-01T00:00:00Z"}"#,
            r#"{"topic":"test2","ts":"2024-01-01T00:00:01Z"}"#,
        ]);

        let result = read_all(&file);

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].topic, "test");
        assert_eq!(result.events[0].payload, Some("hello".to_string()));
        assert_eq!(result.events[1].topic, "test2");
        assert_eq!(result.events[1].payload, None);
        assert!(result.malformed.is_empty());
    }

    #[test]
    fn test_tracks_position() {
        let mut file = log_with(&[r#"{"topic":"first","ts":"2024-01-01T00:00:00Z"}"#]);
        let mut reader = EventReader::new(file.path());

        let first_batch = reader.read_new_events().unwrap();
        assert_eq!(first_batch.events.len(), 1);

        append_lines(
            &mut file,
            &[r#"{"topic":"second","ts":"2024-01-01T00:00:01Z"}"#],
        );

        let second_batch = reader.read_new_events().unwrap();
        assert_eq!(second_batch.events.len(), 1);
        assert_eq!(second_batch.events[0].topic, "second");
    }

    #[test]
    fn test_peek_new_events_does_not_advance_position() {
        let file = log_with(&[r#"{"topic":"first","ts":"2024-01-01T00:00:00Z"}"#]);
        let mut reader = EventReader::new(file.path());

        let peeked = reader.peek_new_events().unwrap();
        assert_eq!(peeked.events.len(), 1);
        assert_eq!(peeked.events[0].topic, "first");
        assert_eq!(reader.position(), 0);

        let consumed = reader.read_new_events().unwrap();
        assert_eq!(consumed.events.len(), 1);
        assert_eq!(consumed.events[0].topic, "first");
    }

    #[test]
    fn test_missing_file() {
        let mut reader = EventReader::new("/nonexistent/path.jsonl");

        let result = reader.read_new_events().unwrap();

        assert!(result.events.is_empty());
        assert!(result.malformed.is_empty());
    }

    #[test]
    fn test_captures_malformed_lines() {
        let file = log_with(&[
            r#"{"topic":"good","ts":"2024-01-01T00:00:00Z"}"#,
            r"{corrupt json}",
            r#"{"topic":"also_good","ts":"2024-01-01T00:00:01Z"}"#,
        ]);

        let result = read_all(&file);

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].topic, "good");
        assert_eq!(result.events[1].topic, "also_good");

        assert_eq!(result.malformed.len(), 1);
        let bad = &result.malformed[0];
        assert_eq!(bad.line_number, 2);
        assert!(bad.content.contains("corrupt json"));
        assert!(!bad.error.is_empty());
    }

    #[test]
    fn test_empty_file() {
        let file = NamedTempFile::new().unwrap();

        let result = read_all(&file);

        assert!(result.events.is_empty());
        assert!(result.malformed.is_empty());
    }

    #[test]
    fn test_reset_position() {
        let file = log_with(&[r#"{"topic":"test","ts":"2024-01-01T00:00:00Z"}"#]);
        let mut reader = EventReader::new(file.path());

        reader.read_new_events().unwrap();
        assert!(reader.position() > 0);

        reader.reset();
        assert_eq!(reader.position(), 0);

        // After a reset the same event is delivered again.
        let result = reader.read_new_events().unwrap();
        assert_eq!(result.events.len(), 1);
    }

    #[test]
    fn test_structured_payload_as_object() {
        let file = log_with(&[
            r#"{"topic":"review.done","payload":{"status":"approved","files":["a.rs","b.rs"]},"ts":"2024-01-01T00:00:00Z"}"#,
        ]);

        let result = read_all(&file);

        assert_eq!(result.events.len(), 1);
        let event = &result.events[0];
        assert_eq!(event.topic, "review.done");

        let payload = event.payload.as_ref().unwrap();
        assert!(payload.contains("\"status\""));
        assert!(payload.contains("\"approved\""));
        assert!(payload.contains("\"files\""));

        // The stringified payload must itself be valid JSON.
        let parsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed["status"], "approved");
    }

    #[test]
    fn test_mixed_payload_formats() {
        let file = log_with(&[
            r#"{"topic":"task.start","payload":"Start work","ts":"2024-01-01T00:00:00Z"}"#,
            r#"{"topic":"task.done","payload":{"result":"success"},"ts":"2024-01-01T00:00:01Z"}"#,
            r#"{"topic":"heartbeat","ts":"2024-01-01T00:00:02Z"}"#,
        ]);

        let result = read_all(&file);

        assert_eq!(result.events.len(), 3);
        assert_eq!(result.events[0].payload, Some("Start work".to_string()));
        let object_payload = result.events[1].payload.as_ref().unwrap();
        assert!(object_payload.contains("\"result\""));
        assert_eq!(result.events[2].payload, None);
    }

    #[test]
    fn test_nested_object_payload() {
        let file = log_with(&[
            r#"{"topic":"analysis","payload":{"issues":[{"file":"test.rs","line":42,"severity":"major"}],"approval":"conditional"},"ts":"2024-01-01T00:00:00Z"}"#,
        ]);

        let result = read_all(&file);

        assert_eq!(result.events.len(), 1);
        let payload = result.events[0].payload.as_ref().unwrap();
        let parsed: serde_json::Value = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed["issues"][0]["file"], "test.rs");
        assert_eq!(parsed["issues"][0]["line"], 42);
        assert_eq!(parsed["approval"], "conditional");
    }

    #[test]
    fn test_event_reader_parses_wave_metadata() {
        let file = log_with(&[
            r#"{"topic":"review.file","payload":"src/main.rs","ts":"2024-01-01T00:00:00Z","wave_id":"w-1a2b3c4d","wave_index":0,"wave_total":3}"#,
            r#"{"topic":"review.file","payload":"src/lib.rs","ts":"2024-01-01T00:00:00Z","wave_id":"w-1a2b3c4d","wave_index":1,"wave_total":3}"#,
        ]);

        let result = read_all(&file);

        assert_eq!(result.events.len(), 2);
        let first = &result.events[0];
        assert!(first.is_wave_event());
        assert_eq!(first.wave_id.as_deref(), Some("w-1a2b3c4d"));
        assert_eq!(first.wave_index, Some(0));
        assert_eq!(first.wave_total, Some(3));
        assert_eq!(result.events[1].wave_index, Some(1));
    }

    #[test]
    fn test_event_reader_backwards_compat_no_wave_fields() {
        let file = log_with(&[
            r#"{"topic":"build.done","payload":"ok","ts":"2024-01-01T00:00:00Z"}"#,
        ]);

        let result = read_all(&file);

        assert_eq!(result.events.len(), 1);
        let event = &result.events[0];
        assert!(!event.is_wave_event());
        assert!(event.wave_id.is_none());
        assert!(event.wave_index.is_none());
        assert!(event.wave_total.is_none());
    }

    #[test]
    fn test_event_reader_mixed_wave_and_non_wave() {
        let file = log_with(&[
            r#"{"topic":"task.start","payload":"begin","ts":"2024-01-01T00:00:00Z"}"#,
            r#"{"topic":"review.file","payload":"src/main.rs","ts":"2024-01-01T00:00:01Z","wave_id":"w-abc","wave_index":0,"wave_total":2}"#,
            r#"{"topic":"build.done","ts":"2024-01-01T00:00:02Z"}"#,
        ]);

        let result = read_all(&file);

        assert_eq!(result.events.len(), 3);
        assert!(!result.events[0].is_wave_event());
        assert!(result.events[1].is_wave_event());
        assert_eq!(result.events[1].wave_id.as_deref(), Some("w-abc"));
        assert!(!result.events[2].is_wave_event());
    }

    #[test]
    fn test_from_event_reader_to_proto_without_wave() {
        let event = plain_event("build.done", Some("success"));

        let proto = ralph_proto::Event::from(event);

        assert_eq!(proto.topic.as_str(), "build.done");
        assert_eq!(proto.payload, "success");
        assert!(!proto.is_wave_event());
    }

    #[test]
    fn test_from_event_reader_to_proto_with_wave() {
        let event = Event {
            wave_id: Some("w-abc".to_string()),
            wave_index: Some(2),
            wave_total: Some(5),
            ..plain_event("review.file", Some("src/main.rs"))
        };

        let proto = ralph_proto::Event::from(event);

        assert_eq!(proto.topic.as_str(), "review.file");
        assert_eq!(proto.payload, "src/main.rs");
        assert!(proto.is_wave_event());
        assert_eq!(proto.wave_id.as_deref(), Some("w-abc"));
        assert_eq!(proto.wave_index, Some(2));
        assert_eq!(proto.wave_total, Some(5));
    }

    #[test]
    fn test_from_event_reader_to_proto_none_payload() {
        let event = plain_event("empty.event", None);

        let proto = ralph_proto::Event::from(event);

        assert_eq!(proto.payload, "");
    }

    #[test]
    fn test_mixed_valid_invalid_handling() {
        let file = log_with(&[
            r#"{"topic":"valid1","ts":"2024-01-01T00:00:00Z"}"#,
            "not valid json at all",
            r#"{"topic":"valid2","ts":"2024-01-01T00:00:01Z"}"#,
        ]);

        let result = read_all(&file);

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.malformed.len(), 1);
        assert_eq!(result.events[0].topic, "valid1");
        assert_eq!(result.events[1].topic, "valid2");
    }
}