1use crate::loop_context::LoopContext;
7use crate::text::floor_char_boundary;
8use ralph_proto::{Event, HatId};
9use serde::{Deserialize, Deserializer, Serialize};
10use std::fs::{self, File, OpenOptions};
11use std::io::{BufRead, BufReader, Write};
12use std::path::{Path, PathBuf};
13use tracing::{debug, warn};
14
15fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<String, D::Error>
23where
24 D: Deserializer<'de>,
25{
26 #[derive(Deserialize)]
27 #[serde(untagged)]
28 enum FlexiblePayload {
29 String(String),
30 Object(serde_json::Value),
31 }
32
33 let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
34 Ok(opt
35 .map(|flex| match flex {
36 FlexiblePayload::String(s) => s,
37 FlexiblePayload::Object(serde_json::Value::Null) => String::new(),
38 FlexiblePayload::Object(obj) => {
39 serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
41 }
42 })
43 .unwrap_or_default())
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct EventRecord {
57 pub ts: String,
59
60 #[serde(default)]
62 pub iteration: u32,
63
64 #[serde(default)]
66 pub hat: String,
67
68 pub topic: String,
70
71 #[serde(skip_serializing_if = "Option::is_none")]
73 pub triggered: Option<String>,
74
75 #[serde(default, deserialize_with = "deserialize_flexible_payload")]
78 pub payload: String,
79
80 #[serde(skip_serializing_if = "Option::is_none")]
82 pub blocked_count: Option<u32>,
83
84 #[serde(default, skip_serializing_if = "Option::is_none")]
86 pub wave_id: Option<String>,
87
88 #[serde(default, skip_serializing_if = "Option::is_none")]
90 pub wave_index: Option<u32>,
91
92 #[serde(default, skip_serializing_if = "Option::is_none")]
94 pub wave_total: Option<u32>,
95}
96
97impl EventRecord {
98 const MAX_PAYLOAD_LEN: usize = 500;
100
101 pub fn new(
103 iteration: u32,
104 hat: impl Into<String>,
105 event: &Event,
106 triggered: Option<&HatId>,
107 ) -> Self {
108 let payload = if event.payload.len() > Self::MAX_PAYLOAD_LEN {
109 let truncate_at = floor_char_boundary(&event.payload, Self::MAX_PAYLOAD_LEN);
111 format!(
112 "{}... [truncated, {} chars total]",
113 &event.payload[..truncate_at],
114 event.payload.chars().count()
115 )
116 } else {
117 event.payload.clone()
118 };
119
120 Self {
121 ts: chrono::Utc::now().to_rfc3339(),
122 iteration,
123 hat: hat.into(),
124 topic: event.topic.to_string(),
125 triggered: triggered.map(|h| h.to_string()),
126 payload,
127 blocked_count: None,
128 wave_id: event.wave_id.clone(),
129 wave_index: event.wave_index,
130 wave_total: event.wave_total,
131 }
132 }
133
134 pub fn with_blocked_count(mut self, count: u32) -> Self {
136 self.blocked_count = Some(count);
137 self
138 }
139}
140
141pub struct EventLogger {
143 path: PathBuf,
145
146 file: Option<File>,
148}
149
150impl EventLogger {
151 pub const DEFAULT_PATH: &'static str = ".ralph/events.jsonl";
153
154 pub fn new(path: impl Into<PathBuf>) -> Self {
158 Self {
159 path: path.into(),
160 file: None,
161 }
162 }
163
164 pub fn default_path() -> Self {
166 Self::new(Self::DEFAULT_PATH)
167 }
168
169 pub fn from_context(context: &LoopContext) -> Self {
175 let events_path = std::fs::read_to_string(context.current_events_marker())
179 .map(|s| {
180 let relative = s.trim();
181 context.workspace().join(relative)
182 })
183 .unwrap_or_else(|_| context.events_path());
184 Self::new(events_path)
185 }
186
187 fn ensure_open(&mut self) -> std::io::Result<&mut File> {
189 if self.file.is_none() {
190 if let Some(parent) = self.path.parent() {
191 fs::create_dir_all(parent)?;
192 }
193 let file = OpenOptions::new()
194 .create(true)
195 .append(true)
196 .open(&self.path)?;
197 self.file = Some(file);
198 }
199 Ok(self.file.as_mut().unwrap())
200 }
201
202 pub fn log(&mut self, record: &EventRecord) -> std::io::Result<()> {
208 let file = self.ensure_open()?;
209 let mut json = serde_json::to_string(record)?;
210 json.push('\n');
211 file.write_all(json.as_bytes())?;
213 file.flush()?;
214 debug!(topic = %record.topic, iteration = record.iteration, "Event logged");
215 Ok(())
216 }
217
218 pub fn log_event(
220 &mut self,
221 iteration: u32,
222 hat: &str,
223 event: &Event,
224 triggered: Option<&HatId>,
225 ) -> std::io::Result<()> {
226 let record = EventRecord::new(iteration, hat, event, triggered);
227 self.log(&record)
228 }
229
230 pub fn path(&self) -> &Path {
232 &self.path
233 }
234}
235
236pub struct EventHistory {
238 path: PathBuf,
239}
240
241impl EventHistory {
242 pub fn new(path: impl Into<PathBuf>) -> Self {
244 Self { path: path.into() }
245 }
246
247 pub fn default_path() -> Self {
249 Self::new(EventLogger::DEFAULT_PATH)
250 }
251
252 pub fn from_context(context: &LoopContext) -> Self {
257 Self::new(context.events_path())
258 }
259
260 pub fn exists(&self) -> bool {
262 self.path.exists()
263 }
264
265 pub fn read_all(&self) -> std::io::Result<Vec<EventRecord>> {
267 if !self.exists() {
268 return Ok(Vec::new());
269 }
270
271 let file = File::open(&self.path)?;
272 let reader = BufReader::new(file);
273 let mut records = Vec::new();
274
275 for (line_num, line) in reader.lines().enumerate() {
276 let line = line?;
277 if line.trim().is_empty() {
278 continue;
279 }
280 match serde_json::from_str(&line) {
281 Ok(record) => records.push(record),
282 Err(e) => {
283 warn!(line = line_num + 1, error = %e, "Failed to parse event record");
284 }
285 }
286 }
287
288 Ok(records)
289 }
290
291 pub fn read_last(&self, n: usize) -> std::io::Result<Vec<EventRecord>> {
293 let all = self.read_all()?;
294 let start = all.len().saturating_sub(n);
295 Ok(all[start..].to_vec())
296 }
297
298 pub fn filter_by_topic(&self, topic: &str) -> std::io::Result<Vec<EventRecord>> {
300 let all = self.read_all()?;
301 Ok(all.into_iter().filter(|r| r.topic == topic).collect())
302 }
303
304 pub fn filter_by_iteration(&self, iteration: u32) -> std::io::Result<Vec<EventRecord>> {
306 let all = self.read_all()?;
307 Ok(all
308 .into_iter()
309 .filter(|r| r.iteration == iteration)
310 .collect())
311 }
312
313 pub fn clear(&self) -> std::io::Result<()> {
315 if self.exists() {
316 fs::remove_file(&self.path)?;
317 }
318 Ok(())
319 }
320}
321
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Shorthand for building an `Event` from a topic and a payload.
    fn make_event(topic: &str, payload: &str) -> Event {
        Event::new(topic, payload)
    }

    /// Creates a fresh temporary directory and returns it together with the
    /// path of `file_name` inside it. The returned `TempDir` must stay alive
    /// for as long as the path is used.
    fn temp_file(file_name: &str) -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join(file_name);
        (dir, path)
    }

    #[test]
    fn test_log_and_read() {
        let (_tmp, path) = temp_file("events.jsonl");

        let mut logger = EventLogger::new(&path);

        // Write two events through the logger.
        let first = make_event("task.start", "Starting task");
        let second = make_event("build.done", "Build complete");
        logger
            .log_event(1, "loop", &first, Some(&HatId::new("planner")))
            .unwrap();
        logger
            .log_event(2, "builder", &second, Some(&HatId::new("planner")))
            .unwrap();

        // Read them back through the history reader.
        let records = EventHistory::new(&path).read_all().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].topic, "task.start");
        assert_eq!(records[0].iteration, 1);
        assert_eq!(records[0].hat, "loop");
        assert_eq!(records[0].triggered, Some("planner".to_string()));
        assert_eq!(records[1].topic, "build.done");
    }

    #[test]
    fn test_read_last() {
        let (_tmp, path) = temp_file("events.jsonl");

        let mut logger = EventLogger::new(&path);
        for i in 1..=10 {
            let event = make_event("test", &format!("Event {}", i));
            logger.log_event(i, "hat", &event, None).unwrap();
        }

        let last_3 = EventHistory::new(&path).read_last(3).unwrap();

        assert_eq!(last_3.len(), 3);
        assert_eq!(last_3[0].iteration, 8);
        assert_eq!(last_3[2].iteration, 10);
    }

    #[test]
    fn test_filter_by_topic() {
        let (_tmp, path) = temp_file("events.jsonl");

        let mut logger = EventLogger::new(&path);
        logger
            .log_event(1, "hat", &make_event("build.done", "a"), None)
            .unwrap();
        logger
            .log_event(2, "hat", &make_event("build.blocked", "b"), None)
            .unwrap();
        logger
            .log_event(3, "hat", &make_event("build.done", "c"), None)
            .unwrap();

        let blocked = EventHistory::new(&path)
            .filter_by_topic("build.blocked")
            .unwrap();

        assert_eq!(blocked.len(), 1);
        assert_eq!(blocked[0].iteration, 2);
    }

    #[test]
    fn test_payload_truncation() {
        let long_payload = "x".repeat(1000);
        let record = EventRecord::new(1, "hat", &make_event("test", &long_payload), None);

        assert!(record.payload.len() < 1000);
        assert!(record.payload.contains("[truncated"));
    }

    #[test]
    fn test_payload_truncation_with_multibyte_chars() {
        // "✅" is 3 bytes in UTF-8, so after 498 ASCII bytes the 500-byte
        // limit falls in the middle of the first check mark.
        let payload = format!("{}✅✅✅{}", "x".repeat(498), "y".repeat(500));

        // Must not panic while slicing.
        let record = EventRecord::new(1, "hat", &make_event("test", &payload), None);

        assert!(record.payload.contains("[truncated"));
        // Walk every character of the result.
        for _ in record.payload.chars() {}
    }

    #[test]
    fn test_creates_parent_directory() {
        let (_tmp, path) = temp_file("nested/dir/events.jsonl");

        let mut logger = EventLogger::new(&path);
        logger
            .log_event(1, "hat", &make_event("test", "payload"), None)
            .unwrap();

        assert!(path.exists());
    }

    #[test]
    fn test_empty_history() {
        let (_tmp, path) = temp_file("nonexistent.jsonl");

        let history = EventHistory::new(&path);
        assert!(!history.exists());

        let records = history.read_all().unwrap();
        assert!(records.is_empty());
    }

    #[test]
    fn test_agent_written_events_without_iteration() {
        let (_tmp, path) = temp_file("events.jsonl");

        // Hand-written records: no `iteration`, no `hat`, and the second one
        // has no `payload` either.
        let mut file = File::create(&path).unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.task","payload":"Implement auth","ts":"2024-01-15T10:00:00Z"}}"#
        )
        .unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.done","ts":"2024-01-15T10:30:00Z"}}"#
        )
        .unwrap();

        let records = EventHistory::new(&path).read_all().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].topic, "build.task");
        assert_eq!(records[0].payload, "Implement auth");
        // Missing fields fall back to their defaults.
        assert_eq!(records[0].iteration, 0);
        assert_eq!(records[0].hat, "");
        assert_eq!(records[1].topic, "build.done");
        assert_eq!(records[1].payload, "");
    }

    #[test]
    fn test_mixed_event_formats() {
        let (_tmp, path) = temp_file("events.jsonl");

        // First record: full format, written by the logger.
        let mut logger = EventLogger::new(&path);
        let event = make_event("task.start", "Initial task");
        logger
            .log_event(1, "loop", &event, Some(&HatId::new("planner")))
            .unwrap();

        // Second record: minimal format, appended by hand.
        let mut file = std::fs::OpenOptions::new()
            .append(true)
            .open(&path)
            .unwrap();
        writeln!(
            file,
            r#"{{"topic":"build.task","payload":"Agent wrote this","ts":"2024-01-15T10:05:00Z"}}"#
        )
        .unwrap();

        let records = EventHistory::new(&path).read_all().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].topic, "task.start");
        assert_eq!(records[0].iteration, 1);
        assert_eq!(records[0].hat, "loop");
        assert_eq!(records[1].topic, "build.task");
        assert_eq!(records[1].iteration, 0);
        assert_eq!(records[1].hat, "");
    }

    #[test]
    fn test_event_record_propagates_wave_metadata() {
        let event = make_event("review.file", "src/main.rs").with_wave("w-1a2b3c4d", 1, 3);
        let record = EventRecord::new(1, "dispatcher", &event, None);

        assert_eq!(record.wave_id.as_deref(), Some("w-1a2b3c4d"));
        assert_eq!(record.wave_index, Some(1));
        assert_eq!(record.wave_total, Some(3));
    }

    #[test]
    fn test_event_record_no_wave_metadata() {
        let event = make_event("build.done", "success");
        let record = EventRecord::new(1, "builder", &event, None);

        assert!(record.wave_id.is_none());
        assert!(record.wave_index.is_none());
        assert!(record.wave_total.is_none());
    }

    #[test]
    fn test_event_record_wave_roundtrip_through_jsonl() {
        let (_tmp, path) = temp_file("events.jsonl");

        let mut logger = EventLogger::new(&path);

        // One event with wave metadata...
        let wave_event = make_event("review.file", "src/main.rs").with_wave("w-deadbeef", 0, 5);
        logger.log_event(1, "dispatcher", &wave_event, None).unwrap();

        // ...and one without.
        let plain_event = make_event("build.done", "ok");
        logger.log_event(2, "builder", &plain_event, None).unwrap();

        let records = EventHistory::new(&path).read_all().unwrap();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].wave_id.as_deref(), Some("w-deadbeef"));
        assert_eq!(records[0].wave_index, Some(0));
        assert_eq!(records[0].wave_total, Some(5));
        assert!(records[1].wave_id.is_none());
        assert!(records[1].wave_index.is_none());
        assert!(records[1].wave_total.is_none());
    }

    #[test]
    fn test_event_record_wave_fields_not_serialized_when_none() {
        let record = EventRecord::new(1, "hat", &make_event("test", "payload"), None);
        let json = serde_json::to_string(&record).unwrap();

        assert!(!json.contains("wave_id"));
        assert!(!json.contains("wave_index"));
        assert!(!json.contains("wave_total"));
    }

    #[test]
    fn test_event_record_backwards_compat_no_wave_fields() {
        // A record with no wave fields at all must still parse.
        let json = r#"{"ts":"2024-01-15T10:00:00Z","iteration":1,"hat":"builder","topic":"build.done","payload":"ok"}"#;
        let record: EventRecord = serde_json::from_str(json).unwrap();

        assert!(record.wave_id.is_none());
        assert!(record.wave_index.is_none());
        assert!(record.wave_total.is_none());
        assert_eq!(record.topic, "build.done");
    }

    #[test]
    fn test_object_payload_from_ralph_emit_json() {
        let (_tmp, path) = temp_file("events.jsonl");

        let mut file = File::create(&path).unwrap();

        // Plain string payload.
        writeln!(
            file,
            r#"{{"ts":"2024-01-15T10:00:00Z","topic":"task.start","payload":"implement feature"}}"#
        )
        .unwrap();

        // Object payload.
        writeln!(
            file,
            r#"{{"topic":"task.complete","payload":{{"status":"verified","tasks":["auth","api"]}},"ts":"2024-01-15T10:30:00Z"}}"#
        )
        .unwrap();

        // Nested object payload.
        writeln!(
            file,
            r#"{{"topic":"loop.recovery","payload":{{"status":"recovered","evidence":{{"tests":"pass"}}}},"ts":"2024-01-15T10:45:00Z"}}"#
        )
        .unwrap();

        let records = EventHistory::new(&path).read_all().unwrap();

        assert_eq!(records.len(), 3);

        // String payloads are kept as-is.
        assert_eq!(records[0].topic, "task.start");
        assert_eq!(records[0].payload, "implement feature");

        // Object payloads are stored as JSON text that parses back.
        assert_eq!(records[1].topic, "task.complete");
        assert!(records[1].payload.contains("\"status\""));
        assert!(records[1].payload.contains("\"verified\""));
        let parsed: serde_json::Value = serde_json::from_str(&records[1].payload).unwrap();
        assert_eq!(parsed["status"], "verified");

        // Nesting is preserved as well.
        assert_eq!(records[2].topic, "loop.recovery");
        let parsed: serde_json::Value = serde_json::from_str(&records[2].payload).unwrap();
        assert_eq!(parsed["evidence"]["tests"], "pass");
    }
}