use crate::loop_context::LoopContext;
use crate::text::floor_char_boundary;
use ralph_proto::{Event, HatId};
use serde::{Deserialize, Deserializer, Serialize};
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use tracing::{debug, warn};
fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<String, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum FlexiblePayload {
String(String),
Object(serde_json::Value),
}
let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
Ok(opt
.map(|flex| match flex {
FlexiblePayload::String(s) => s,
FlexiblePayload::Object(serde_json::Value::Null) => String::new(),
FlexiblePayload::Object(obj) => {
serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
}
})
.unwrap_or_default())
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventRecord {
pub ts: String,
#[serde(default)]
pub iteration: u32,
#[serde(default)]
pub hat: String,
pub topic: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub triggered: Option<String>,
#[serde(default, deserialize_with = "deserialize_flexible_payload")]
pub payload: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub blocked_count: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub wave_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub wave_index: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub wave_total: Option<u32>,
}
impl EventRecord {
const MAX_PAYLOAD_LEN: usize = 500;
pub fn new(
iteration: u32,
hat: impl Into<String>,
event: &Event,
triggered: Option<&HatId>,
) -> Self {
let payload = if event.payload.len() > Self::MAX_PAYLOAD_LEN {
let truncate_at = floor_char_boundary(&event.payload, Self::MAX_PAYLOAD_LEN);
format!(
"{}... [truncated, {} chars total]",
&event.payload[..truncate_at],
event.payload.chars().count()
)
} else {
event.payload.clone()
};
Self {
ts: chrono::Utc::now().to_rfc3339(),
iteration,
hat: hat.into(),
topic: event.topic.to_string(),
triggered: triggered.map(|h| h.to_string()),
payload,
blocked_count: None,
wave_id: event.wave_id.clone(),
wave_index: event.wave_index,
wave_total: event.wave_total,
}
}
pub fn with_blocked_count(mut self, count: u32) -> Self {
self.blocked_count = Some(count);
self
}
}
pub struct EventLogger {
path: PathBuf,
file: Option<File>,
}
impl EventLogger {
pub const DEFAULT_PATH: &'static str = ".ralph/events.jsonl";
pub fn new(path: impl Into<PathBuf>) -> Self {
Self {
path: path.into(),
file: None,
}
}
pub fn default_path() -> Self {
Self::new(Self::DEFAULT_PATH)
}
pub fn from_context(context: &LoopContext) -> Self {
let events_path = std::fs::read_to_string(context.current_events_marker())
.map(|s| {
let relative = s.trim();
context.workspace().join(relative)
})
.unwrap_or_else(|_| context.events_path());
Self::new(events_path)
}
fn ensure_open(&mut self) -> std::io::Result<&mut File> {
if self.file.is_none() {
if let Some(parent) = self.path.parent() {
fs::create_dir_all(parent)?;
}
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)?;
self.file = Some(file);
}
Ok(self.file.as_mut().unwrap())
}
pub fn log(&mut self, record: &EventRecord) -> std::io::Result<()> {
let file = self.ensure_open()?;
let mut json = serde_json::to_string(record)?;
json.push('\n');
file.write_all(json.as_bytes())?;
file.flush()?;
debug!(topic = %record.topic, iteration = record.iteration, "Event logged");
Ok(())
}
pub fn log_event(
&mut self,
iteration: u32,
hat: &str,
event: &Event,
triggered: Option<&HatId>,
) -> std::io::Result<()> {
let record = EventRecord::new(iteration, hat, event, triggered);
self.log(&record)
}
pub fn path(&self) -> &Path {
&self.path
}
}
pub struct EventHistory {
path: PathBuf,
}
impl EventHistory {
pub fn new(path: impl Into<PathBuf>) -> Self {
Self { path: path.into() }
}
pub fn default_path() -> Self {
Self::new(EventLogger::DEFAULT_PATH)
}
pub fn from_context(context: &LoopContext) -> Self {
Self::new(context.events_path())
}
pub fn exists(&self) -> bool {
self.path.exists()
}
pub fn read_all(&self) -> std::io::Result<Vec<EventRecord>> {
if !self.exists() {
return Ok(Vec::new());
}
let file = File::open(&self.path)?;
let reader = BufReader::new(file);
let mut records = Vec::new();
for (line_num, line) in reader.lines().enumerate() {
let line = line?;
if line.trim().is_empty() {
continue;
}
match serde_json::from_str(&line) {
Ok(record) => records.push(record),
Err(e) => {
warn!(line = line_num + 1, error = %e, "Failed to parse event record");
}
}
}
Ok(records)
}
pub fn read_last(&self, n: usize) -> std::io::Result<Vec<EventRecord>> {
let all = self.read_all()?;
let start = all.len().saturating_sub(n);
Ok(all[start..].to_vec())
}
pub fn filter_by_topic(&self, topic: &str) -> std::io::Result<Vec<EventRecord>> {
let all = self.read_all()?;
Ok(all.into_iter().filter(|r| r.topic == topic).collect())
}
pub fn filter_by_iteration(&self, iteration: u32) -> std::io::Result<Vec<EventRecord>> {
let all = self.read_all()?;
Ok(all
.into_iter()
.filter(|r| r.iteration == iteration)
.collect())
}
pub fn clear(&self) -> std::io::Result<()> {
if self.exists() {
fs::remove_file(&self.path)?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn make_event(topic: &str, payload: &str) -> Event {
Event::new(topic, payload)
}
#[test]
fn test_log_and_read() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("events.jsonl");
let mut logger = EventLogger::new(&path);
let event1 = make_event("task.start", "Starting task");
let event2 = make_event("build.done", "Build complete");
logger
.log_event(1, "loop", &event1, Some(&HatId::new("planner")))
.unwrap();
logger
.log_event(2, "builder", &event2, Some(&HatId::new("planner")))
.unwrap();
let history = EventHistory::new(&path);
let records = history.read_all().unwrap();
assert_eq!(records.len(), 2);
assert_eq!(records[0].topic, "task.start");
assert_eq!(records[0].iteration, 1);
assert_eq!(records[0].hat, "loop");
assert_eq!(records[0].triggered, Some("planner".to_string()));
assert_eq!(records[1].topic, "build.done");
}
#[test]
fn test_read_last() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("events.jsonl");
let mut logger = EventLogger::new(&path);
for i in 1..=10 {
let event = make_event("test", &format!("Event {}", i));
logger.log_event(i, "hat", &event, None).unwrap();
}
let history = EventHistory::new(&path);
let last_3 = history.read_last(3).unwrap();
assert_eq!(last_3.len(), 3);
assert_eq!(last_3[0].iteration, 8);
assert_eq!(last_3[2].iteration, 10);
}
#[test]
fn test_filter_by_topic() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("events.jsonl");
let mut logger = EventLogger::new(&path);
logger
.log_event(1, "hat", &make_event("build.done", "a"), None)
.unwrap();
logger
.log_event(2, "hat", &make_event("build.blocked", "b"), None)
.unwrap();
logger
.log_event(3, "hat", &make_event("build.done", "c"), None)
.unwrap();
let history = EventHistory::new(&path);
let blocked = history.filter_by_topic("build.blocked").unwrap();
assert_eq!(blocked.len(), 1);
assert_eq!(blocked[0].iteration, 2);
}
#[test]
fn test_payload_truncation() {
let long_payload = "x".repeat(1000);
let event = make_event("test", &long_payload);
let record = EventRecord::new(1, "hat", &event, None);
assert!(record.payload.len() < 1000);
assert!(record.payload.contains("[truncated"));
}
#[test]
fn test_payload_truncation_with_multibyte_chars() {
let mut payload = "x".repeat(498);
payload.push_str("✅✅✅"); payload.push_str(&"y".repeat(500));
let event = make_event("test", &payload);
let record = EventRecord::new(1, "hat", &event, None);
assert!(record.payload.contains("[truncated"));
for _ in record.payload.chars() {}
}
#[test]
fn test_creates_parent_directory() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("nested/dir/events.jsonl");
let mut logger = EventLogger::new(&path);
let event = make_event("test", "payload");
logger.log_event(1, "hat", &event, None).unwrap();
assert!(path.exists());
}
#[test]
fn test_empty_history() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("nonexistent.jsonl");
let history = EventHistory::new(&path);
assert!(!history.exists());
let records = history.read_all().unwrap();
assert!(records.is_empty());
}
#[test]
fn test_agent_written_events_without_iteration() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("events.jsonl");
let mut file = File::create(&path).unwrap();
writeln!(
file,
r#"{{"topic":"build.task","payload":"Implement auth","ts":"2024-01-15T10:00:00Z"}}"#
)
.unwrap();
writeln!(
file,
r#"{{"topic":"build.done","ts":"2024-01-15T10:30:00Z"}}"#
)
.unwrap();
let history = EventHistory::new(&path);
let records = history.read_all().unwrap();
assert_eq!(records.len(), 2);
assert_eq!(records[0].topic, "build.task");
assert_eq!(records[0].payload, "Implement auth");
assert_eq!(records[0].iteration, 0); assert_eq!(records[0].hat, ""); assert_eq!(records[1].topic, "build.done");
assert_eq!(records[1].payload, ""); }
#[test]
fn test_mixed_event_formats() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("events.jsonl");
let mut logger = EventLogger::new(&path);
let event = make_event("task.start", "Initial task");
logger
.log_event(1, "loop", &event, Some(&HatId::new("planner")))
.unwrap();
let mut file = std::fs::OpenOptions::new()
.append(true)
.open(&path)
.unwrap();
writeln!(
file,
r#"{{"topic":"build.task","payload":"Agent wrote this","ts":"2024-01-15T10:05:00Z"}}"#
)
.unwrap();
let history = EventHistory::new(&path);
let records = history.read_all().unwrap();
assert_eq!(records.len(), 2);
assert_eq!(records[0].topic, "task.start");
assert_eq!(records[0].iteration, 1);
assert_eq!(records[0].hat, "loop");
assert_eq!(records[1].topic, "build.task");
assert_eq!(records[1].iteration, 0); assert_eq!(records[1].hat, ""); }
#[test]
fn test_event_record_propagates_wave_metadata() {
let event = make_event("review.file", "src/main.rs").with_wave("w-1a2b3c4d", 1, 3);
let record = EventRecord::new(1, "dispatcher", &event, None);
assert_eq!(record.wave_id.as_deref(), Some("w-1a2b3c4d"));
assert_eq!(record.wave_index, Some(1));
assert_eq!(record.wave_total, Some(3));
}
#[test]
fn test_event_record_no_wave_metadata() {
let event = make_event("build.done", "success");
let record = EventRecord::new(1, "builder", &event, None);
assert!(record.wave_id.is_none());
assert!(record.wave_index.is_none());
assert!(record.wave_total.is_none());
}
#[test]
fn test_event_record_wave_roundtrip_through_jsonl() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("events.jsonl");
let mut logger = EventLogger::new(&path);
let event = make_event("review.file", "src/main.rs").with_wave("w-deadbeef", 0, 5);
logger.log_event(1, "dispatcher", &event, None).unwrap();
let plain_event = make_event("build.done", "ok");
logger.log_event(2, "builder", &plain_event, None).unwrap();
let history = EventHistory::new(&path);
let records = history.read_all().unwrap();
assert_eq!(records.len(), 2);
assert_eq!(records[0].wave_id.as_deref(), Some("w-deadbeef"));
assert_eq!(records[0].wave_index, Some(0));
assert_eq!(records[0].wave_total, Some(5));
assert!(records[1].wave_id.is_none());
assert!(records[1].wave_index.is_none());
assert!(records[1].wave_total.is_none());
}
#[test]
fn test_event_record_wave_fields_not_serialized_when_none() {
let event = make_event("test", "payload");
let record = EventRecord::new(1, "hat", &event, None);
let json = serde_json::to_string(&record).unwrap();
assert!(!json.contains("wave_id"));
assert!(!json.contains("wave_index"));
assert!(!json.contains("wave_total"));
}
#[test]
fn test_event_record_backwards_compat_no_wave_fields() {
let json = r#"{"ts":"2024-01-15T10:00:00Z","iteration":1,"hat":"builder","topic":"build.done","payload":"ok"}"#;
let record: EventRecord = serde_json::from_str(json).unwrap();
assert!(record.wave_id.is_none());
assert!(record.wave_index.is_none());
assert!(record.wave_total.is_none());
assert_eq!(record.topic, "build.done");
}
#[test]
fn test_object_payload_from_ralph_emit_json() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("events.jsonl");
let mut file = File::create(&path).unwrap();
writeln!(
file,
r#"{{"ts":"2024-01-15T10:00:00Z","topic":"task.start","payload":"implement feature"}}"#
)
.unwrap();
writeln!(
file,
r#"{{"topic":"task.complete","payload":{{"status":"verified","tasks":["auth","api"]}},"ts":"2024-01-15T10:30:00Z"}}"#
)
.unwrap();
writeln!(
file,
r#"{{"topic":"loop.recovery","payload":{{"status":"recovered","evidence":{{"tests":"pass"}}}},"ts":"2024-01-15T10:45:00Z"}}"#
)
.unwrap();
let history = EventHistory::new(&path);
let records = history.read_all().unwrap();
assert_eq!(records.len(), 3);
assert_eq!(records[0].topic, "task.start");
assert_eq!(records[0].payload, "implement feature");
assert_eq!(records[1].topic, "task.complete");
assert!(records[1].payload.contains("\"status\""));
assert!(records[1].payload.contains("\"verified\""));
let parsed: serde_json::Value = serde_json::from_str(&records[1].payload).unwrap();
assert_eq!(parsed["status"], "verified");
assert_eq!(records[2].topic, "loop.recovery");
let parsed: serde_json::Value = serde_json::from_str(&records[2].payload).unwrap();
assert_eq!(parsed["evidence"]["tests"], "pass");
}
}