1use ralph_proto::{Event, HatId};
7use serde::{Deserialize, Deserializer, Serialize};
8use std::fs::{self, File, OpenOptions};
9use std::io::{BufRead, BufReader, Write};
10use std::path::{Path, PathBuf};
11use tracing::{debug, warn};
12
13fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<String, D::Error>
21where
22 D: Deserializer<'de>,
23{
24 #[derive(Deserialize)]
25 #[serde(untagged)]
26 enum FlexiblePayload {
27 String(String),
28 Object(serde_json::Value),
29 }
30
31 let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
32 Ok(opt
33 .map(|flex| match flex {
34 FlexiblePayload::String(s) => s,
35 FlexiblePayload::Object(serde_json::Value::Null) => String::new(),
36 FlexiblePayload::Object(obj) => {
37 serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
39 }
40 })
41 .unwrap_or_default())
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct EventRecord {
55 pub ts: String,
57
58 #[serde(default)]
60 pub iteration: u32,
61
62 #[serde(default)]
64 pub hat: String,
65
66 pub topic: String,
68
69 #[serde(skip_serializing_if = "Option::is_none")]
71 pub triggered: Option<String>,
72
73 #[serde(default, deserialize_with = "deserialize_flexible_payload")]
76 pub payload: String,
77
78 #[serde(skip_serializing_if = "Option::is_none")]
80 pub blocked_count: Option<u32>,
81}
82
83impl EventRecord {
84 const MAX_PAYLOAD_LEN: usize = 500;
86
87 pub fn new(
89 iteration: u32,
90 hat: impl Into<String>,
91 event: &Event,
92 triggered: Option<&HatId>,
93 ) -> Self {
94 let payload = if event.payload.len() > Self::MAX_PAYLOAD_LEN {
95 let mut truncate_at = Self::MAX_PAYLOAD_LEN;
98 while truncate_at > 0 && !event.payload.is_char_boundary(truncate_at) {
99 truncate_at -= 1;
100 }
101 format!(
102 "{}... [truncated, {} chars total]",
103 &event.payload[..truncate_at],
104 event.payload.chars().count()
105 )
106 } else {
107 event.payload.clone()
108 };
109
110 Self {
111 ts: chrono::Utc::now().to_rfc3339(),
112 iteration,
113 hat: hat.into(),
114 topic: event.topic.to_string(),
115 triggered: triggered.map(|h| h.to_string()),
116 payload,
117 blocked_count: None,
118 }
119 }
120
121 pub fn with_blocked_count(mut self, count: u32) -> Self {
123 self.blocked_count = Some(count);
124 self
125 }
126}
127
/// Appends [`EventRecord`]s to a JSONL file, one JSON document per line.
#[derive(Debug)]
pub struct EventLogger {
    /// Location of the log file.
    path: PathBuf,

    /// Handle to the log file; `None` until a write first opens it.
    file: Option<File>,
}
136
137impl EventLogger {
138 pub const DEFAULT_PATH: &'static str = ".agent/events.jsonl";
140
141 pub fn new(path: impl Into<PathBuf>) -> Self {
145 Self {
146 path: path.into(),
147 file: None,
148 }
149 }
150
151 pub fn default_path() -> Self {
153 Self::new(Self::DEFAULT_PATH)
154 }
155
156 fn ensure_open(&mut self) -> std::io::Result<&mut File> {
158 if self.file.is_none() {
159 if let Some(parent) = self.path.parent() {
160 fs::create_dir_all(parent)?;
161 }
162 let file = OpenOptions::new()
163 .create(true)
164 .append(true)
165 .open(&self.path)?;
166 self.file = Some(file);
167 }
168 Ok(self.file.as_mut().unwrap())
169 }
170
171 pub fn log(&mut self, record: &EventRecord) -> std::io::Result<()> {
173 let file = self.ensure_open()?;
174 let json = serde_json::to_string(record)?;
175 writeln!(file, "{}", json)?;
176 file.flush()?;
177 debug!(topic = %record.topic, iteration = record.iteration, "Event logged");
178 Ok(())
179 }
180
181 pub fn log_event(
183 &mut self,
184 iteration: u32,
185 hat: &str,
186 event: &Event,
187 triggered: Option<&HatId>,
188 ) -> std::io::Result<()> {
189 let record = EventRecord::new(iteration, hat, event, triggered);
190 self.log(&record)
191 }
192
193 pub fn path(&self) -> &Path {
195 &self.path
196 }
197}
198
/// Read-side view of a JSONL event log file.
#[derive(Debug, Clone)]
pub struct EventHistory {
    /// Location of the log file to read.
    path: PathBuf,
}
203
204impl EventHistory {
205 pub fn new(path: impl Into<PathBuf>) -> Self {
207 Self { path: path.into() }
208 }
209
210 pub fn default_path() -> Self {
212 Self::new(EventLogger::DEFAULT_PATH)
213 }
214
215 pub fn exists(&self) -> bool {
217 self.path.exists()
218 }
219
220 pub fn read_all(&self) -> std::io::Result<Vec<EventRecord>> {
222 if !self.exists() {
223 return Ok(Vec::new());
224 }
225
226 let file = File::open(&self.path)?;
227 let reader = BufReader::new(file);
228 let mut records = Vec::new();
229
230 for (line_num, line) in reader.lines().enumerate() {
231 let line = line?;
232 if line.trim().is_empty() {
233 continue;
234 }
235 match serde_json::from_str(&line) {
236 Ok(record) => records.push(record),
237 Err(e) => {
238 warn!(line = line_num + 1, error = %e, "Failed to parse event record");
239 }
240 }
241 }
242
243 Ok(records)
244 }
245
246 pub fn read_last(&self, n: usize) -> std::io::Result<Vec<EventRecord>> {
248 let all = self.read_all()?;
249 let start = all.len().saturating_sub(n);
250 Ok(all[start..].to_vec())
251 }
252
253 pub fn filter_by_topic(&self, topic: &str) -> std::io::Result<Vec<EventRecord>> {
255 let all = self.read_all()?;
256 Ok(all.into_iter().filter(|r| r.topic == topic).collect())
257 }
258
259 pub fn filter_by_iteration(&self, iteration: u32) -> std::io::Result<Vec<EventRecord>> {
261 let all = self.read_all()?;
262 Ok(all
263 .into_iter()
264 .filter(|r| r.iteration == iteration)
265 .collect())
266 }
267
268 pub fn clear(&self) -> std::io::Result<()> {
270 if self.exists() {
271 fs::remove_file(&self.path)?;
272 }
273 Ok(())
274 }
275}
276
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds an event with the given topic and payload.
    fn make_event(topic: &str, payload: &str) -> Event {
        Event::new(topic, payload)
    }

    /// Creates a temporary directory and a path for `name` inside it.
    ///
    /// The returned `TempDir` guard must stay bound for as long as the path
    /// is in use.
    fn scratch(name: &str) -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join(name);
        (dir, path)
    }

    #[test]
    fn test_log_and_read() {
        let (_tmp, path) = scratch("events.jsonl");
        let mut logger = EventLogger::new(&path);

        let planner = HatId::new("planner");
        let first = make_event("task.start", "Starting task");
        let second = make_event("build.done", "Build complete");

        logger.log_event(1, "loop", &first, Some(&planner)).unwrap();
        logger
            .log_event(2, "builder", &second, Some(&planner))
            .unwrap();

        let records = EventHistory::new(&path).read_all().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].topic, "task.start");
        assert_eq!(records[0].iteration, 1);
        assert_eq!(records[0].hat, "loop");
        assert_eq!(records[0].triggered, Some("planner".to_string()));
        assert_eq!(records[1].topic, "build.done");
    }

    #[test]
    fn test_read_last() {
        let (_tmp, path) = scratch("events.jsonl");
        let mut logger = EventLogger::new(&path);

        for i in 1..=10 {
            let event = make_event("test", &format!("Event {}", i));
            logger.log_event(i, "hat", &event, None).unwrap();
        }

        let last_3 = EventHistory::new(&path).read_last(3).unwrap();

        assert_eq!(last_3.len(), 3);
        assert_eq!(last_3[0].iteration, 8);
        assert_eq!(last_3[2].iteration, 10);
    }

    #[test]
    fn test_filter_by_topic() {
        let (_tmp, path) = scratch("events.jsonl");
        let mut logger = EventLogger::new(&path);

        let entries = [(1, "build.done", "a"), (2, "build.blocked", "b"), (3, "build.done", "c")];
        for (iteration, topic, payload) in entries {
            logger
                .log_event(iteration, "hat", &make_event(topic, payload), None)
                .unwrap();
        }

        let blocked = EventHistory::new(&path)
            .filter_by_topic("build.blocked")
            .unwrap();

        assert_eq!(blocked.len(), 1);
        assert_eq!(blocked[0].iteration, 2);
    }

    #[test]
    fn test_payload_truncation() {
        let long_payload = "x".repeat(1000);
        let event = make_event("test", &long_payload);
        let record = EventRecord::new(1, "hat", &event, None);

        assert!(record.payload.len() < 1000);
        assert!(record.payload.contains("[truncated"));
    }

    #[test]
    fn test_payload_truncation_with_multibyte_chars() {
        // 498 ASCII bytes followed by three 3-byte characters: byte 500 lands
        // in the middle of the first one, so the cut must back off to 498.
        let mut payload = "x".repeat(498);
        payload.push_str("✅✅✅");
        payload.push_str(&"y".repeat(500));

        let event = make_event("test", &payload);
        let record = EventRecord::new(1, "hat", &event, None);

        // 498 + 3 + 500 characters in the original payload.
        let expected = format!("{}... [truncated, 1001 chars total]", "x".repeat(498));
        assert_eq!(record.payload, expected);
    }

    #[test]
    fn test_creates_parent_directory() {
        let (_tmp, path) = scratch("nested/dir/events.jsonl");

        let mut logger = EventLogger::new(&path);
        let event = make_event("test", "payload");
        logger.log_event(1, "hat", &event, None).unwrap();

        assert!(path.exists());
    }

    #[test]
    fn test_empty_history() {
        let (_tmp, path) = scratch("nonexistent.jsonl");

        let history = EventHistory::new(&path);
        assert!(!history.exists());

        let records = history.read_all().unwrap();
        assert!(records.is_empty());
    }

    #[test]
    fn test_agent_written_events_without_iteration() {
        let (_tmp, path) = scratch("events.jsonl");

        // Lines without `iteration`/`hat`, and one without `payload`.
        let mut file = File::create(&path).unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.task","payload":"Implement auth","ts":"2024-01-15T10:00:00Z"}}"#
        )
        .unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.done","ts":"2024-01-15T10:30:00Z"}}"#
        )
        .unwrap();

        let records = EventHistory::new(&path).read_all().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].topic, "build.task");
        assert_eq!(records[0].payload, "Implement auth");
        assert_eq!(records[0].iteration, 0);
        assert_eq!(records[0].hat, "");
        assert_eq!(records[1].topic, "build.done");
        assert_eq!(records[1].payload, "");
    }

    #[test]
    fn test_mixed_event_formats() {
        let (_tmp, path) = scratch("events.jsonl");

        // First line comes from the logger itself.
        let mut logger = EventLogger::new(&path);
        let event = make_event("task.start", "Initial task");
        logger
            .log_event(1, "loop", &event, Some(&HatId::new("planner")))
            .unwrap();

        // Second line is appended by hand in the reduced format.
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.task","payload":"Agent wrote this","ts":"2024-01-15T10:05:00Z"}}"#
        )
        .unwrap();

        let records = EventHistory::new(&path).read_all().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].topic, "task.start");
        assert_eq!(records[0].iteration, 1);
        assert_eq!(records[0].hat, "loop");
        assert_eq!(records[1].topic, "build.task");
        assert_eq!(records[1].iteration, 0);
        assert_eq!(records[1].hat, "");
    }

    #[test]
    fn test_object_payload_from_ralph_emit_json() {
        let (_tmp, path) = scratch("events.jsonl");

        let mut file = File::create(&path).unwrap();

        // String payload.
        writeln!(
            file,
            r#"{{"ts":"2024-01-15T10:00:00Z","topic":"task.start","payload":"implement feature"}}"#
        )
        .unwrap();

        // Object payload.
        writeln!(
            file,
            r#"{{"topic":"task.complete","payload":{{"status":"verified","tasks":["auth","api"]}},"ts":"2024-01-15T10:30:00Z"}}"#
        )
        .unwrap();

        // Nested object payload.
        writeln!(
            file,
            r#"{{"topic":"loop.recovery","payload":{{"status":"recovered","evidence":{{"tests":"pass"}}}},"ts":"2024-01-15T10:45:00Z"}}"#
        )
        .unwrap();

        let records = EventHistory::new(&path).read_all().unwrap();

        assert_eq!(records.len(), 3);

        assert_eq!(records[0].topic, "task.start");
        assert_eq!(records[0].payload, "implement feature");

        assert_eq!(records[1].topic, "task.complete");
        assert!(records[1].payload.contains("\"status\""));
        assert!(records[1].payload.contains("\"verified\""));
        let parsed: serde_json::Value = serde_json::from_str(&records[1].payload).unwrap();
        assert_eq!(parsed["status"], "verified");

        assert_eq!(records[2].topic, "loop.recovery");
        let parsed: serde_json::Value = serde_json::from_str(&records[2].payload).unwrap();
        assert_eq!(parsed["evidence"]["tests"], "pass");
    }
}