1use crate::loop_context::LoopContext;
7use crate::text::floor_char_boundary;
8use ralph_proto::{Event, HatId};
9use serde::{Deserialize, Deserializer, Serialize};
10use std::fs::{self, File, OpenOptions};
11use std::io::{BufRead, BufReader, Write};
12use std::path::{Path, PathBuf};
13use tracing::{debug, warn};
14
15fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<String, D::Error>
23where
24 D: Deserializer<'de>,
25{
26 #[derive(Deserialize)]
27 #[serde(untagged)]
28 enum FlexiblePayload {
29 String(String),
30 Object(serde_json::Value),
31 }
32
33 let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
34 Ok(opt
35 .map(|flex| match flex {
36 FlexiblePayload::String(s) => s,
37 FlexiblePayload::Object(serde_json::Value::Null) => String::new(),
38 FlexiblePayload::Object(obj) => {
39 serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
41 }
42 })
43 .unwrap_or_default())
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct EventRecord {
57 pub ts: String,
59
60 #[serde(default)]
62 pub iteration: u32,
63
64 #[serde(default)]
66 pub hat: String,
67
68 pub topic: String,
70
71 #[serde(skip_serializing_if = "Option::is_none")]
73 pub triggered: Option<String>,
74
75 #[serde(default, deserialize_with = "deserialize_flexible_payload")]
78 pub payload: String,
79
80 #[serde(skip_serializing_if = "Option::is_none")]
82 pub blocked_count: Option<u32>,
83}
84
85impl EventRecord {
86 const MAX_PAYLOAD_LEN: usize = 500;
88
89 pub fn new(
91 iteration: u32,
92 hat: impl Into<String>,
93 event: &Event,
94 triggered: Option<&HatId>,
95 ) -> Self {
96 let payload = if event.payload.len() > Self::MAX_PAYLOAD_LEN {
97 let truncate_at = floor_char_boundary(&event.payload, Self::MAX_PAYLOAD_LEN);
99 format!(
100 "{}... [truncated, {} chars total]",
101 &event.payload[..truncate_at],
102 event.payload.chars().count()
103 )
104 } else {
105 event.payload.clone()
106 };
107
108 Self {
109 ts: chrono::Utc::now().to_rfc3339(),
110 iteration,
111 hat: hat.into(),
112 topic: event.topic.to_string(),
113 triggered: triggered.map(|h| h.to_string()),
114 payload,
115 blocked_count: None,
116 }
117 }
118
119 pub fn with_blocked_count(mut self, count: u32) -> Self {
121 self.blocked_count = Some(count);
122 self
123 }
124}
125
126pub struct EventLogger {
128 path: PathBuf,
130
131 file: Option<File>,
133}
134
135impl EventLogger {
136 pub const DEFAULT_PATH: &'static str = ".ralph/events.jsonl";
138
139 pub fn new(path: impl Into<PathBuf>) -> Self {
143 Self {
144 path: path.into(),
145 file: None,
146 }
147 }
148
149 pub fn default_path() -> Self {
151 Self::new(Self::DEFAULT_PATH)
152 }
153
154 pub fn from_context(context: &LoopContext) -> Self {
160 let events_path = std::fs::read_to_string(context.current_events_marker())
164 .map(|s| {
165 let relative = s.trim();
166 context.workspace().join(relative)
167 })
168 .unwrap_or_else(|_| context.events_path());
169 Self::new(events_path)
170 }
171
172 fn ensure_open(&mut self) -> std::io::Result<&mut File> {
174 if self.file.is_none() {
175 if let Some(parent) = self.path.parent() {
176 fs::create_dir_all(parent)?;
177 }
178 let file = OpenOptions::new()
179 .create(true)
180 .append(true)
181 .open(&self.path)?;
182 self.file = Some(file);
183 }
184 Ok(self.file.as_mut().unwrap())
185 }
186
187 pub fn log(&mut self, record: &EventRecord) -> std::io::Result<()> {
193 let file = self.ensure_open()?;
194 let mut json = serde_json::to_string(record)?;
195 json.push('\n');
196 file.write_all(json.as_bytes())?;
198 file.flush()?;
199 debug!(topic = %record.topic, iteration = record.iteration, "Event logged");
200 Ok(())
201 }
202
203 pub fn log_event(
205 &mut self,
206 iteration: u32,
207 hat: &str,
208 event: &Event,
209 triggered: Option<&HatId>,
210 ) -> std::io::Result<()> {
211 let record = EventRecord::new(iteration, hat, event, triggered);
212 self.log(&record)
213 }
214
215 pub fn path(&self) -> &Path {
217 &self.path
218 }
219}
220
221pub struct EventHistory {
223 path: PathBuf,
224}
225
226impl EventHistory {
227 pub fn new(path: impl Into<PathBuf>) -> Self {
229 Self { path: path.into() }
230 }
231
232 pub fn default_path() -> Self {
234 Self::new(EventLogger::DEFAULT_PATH)
235 }
236
237 pub fn from_context(context: &LoopContext) -> Self {
242 Self::new(context.events_path())
243 }
244
245 pub fn exists(&self) -> bool {
247 self.path.exists()
248 }
249
250 pub fn read_all(&self) -> std::io::Result<Vec<EventRecord>> {
252 if !self.exists() {
253 return Ok(Vec::new());
254 }
255
256 let file = File::open(&self.path)?;
257 let reader = BufReader::new(file);
258 let mut records = Vec::new();
259
260 for (line_num, line) in reader.lines().enumerate() {
261 let line = line?;
262 if line.trim().is_empty() {
263 continue;
264 }
265 match serde_json::from_str(&line) {
266 Ok(record) => records.push(record),
267 Err(e) => {
268 warn!(line = line_num + 1, error = %e, "Failed to parse event record");
269 }
270 }
271 }
272
273 Ok(records)
274 }
275
276 pub fn read_last(&self, n: usize) -> std::io::Result<Vec<EventRecord>> {
278 let all = self.read_all()?;
279 let start = all.len().saturating_sub(n);
280 Ok(all[start..].to_vec())
281 }
282
283 pub fn filter_by_topic(&self, topic: &str) -> std::io::Result<Vec<EventRecord>> {
285 let all = self.read_all()?;
286 Ok(all.into_iter().filter(|r| r.topic == topic).collect())
287 }
288
289 pub fn filter_by_iteration(&self, iteration: u32) -> std::io::Result<Vec<EventRecord>> {
291 let all = self.read_all()?;
292 Ok(all
293 .into_iter()
294 .filter(|r| r.iteration == iteration)
295 .collect())
296 }
297
298 pub fn clear(&self) -> std::io::Result<()> {
300 if self.exists() {
301 fs::remove_file(&self.path)?;
302 }
303 Ok(())
304 }
305}
306
307#[cfg(test)]
308mod tests {
309 use super::*;
310 use tempfile::TempDir;
311
312 fn make_event(topic: &str, payload: &str) -> Event {
313 Event::new(topic, payload)
314 }
315
316 #[test]
317 fn test_log_and_read() {
318 let tmp = TempDir::new().unwrap();
319 let path = tmp.path().join("events.jsonl");
320
321 let mut logger = EventLogger::new(&path);
322
323 let event1 = make_event("task.start", "Starting task");
325 let event2 = make_event("build.done", "Build complete");
326
327 logger
328 .log_event(1, "loop", &event1, Some(&HatId::new("planner")))
329 .unwrap();
330 logger
331 .log_event(2, "builder", &event2, Some(&HatId::new("planner")))
332 .unwrap();
333
334 let history = EventHistory::new(&path);
336 let records = history.read_all().unwrap();
337
338 assert_eq!(records.len(), 2);
339 assert_eq!(records[0].topic, "task.start");
340 assert_eq!(records[0].iteration, 1);
341 assert_eq!(records[0].hat, "loop");
342 assert_eq!(records[0].triggered, Some("planner".to_string()));
343 assert_eq!(records[1].topic, "build.done");
344 }
345
346 #[test]
347 fn test_read_last() {
348 let tmp = TempDir::new().unwrap();
349 let path = tmp.path().join("events.jsonl");
350
351 let mut logger = EventLogger::new(&path);
352
353 for i in 1..=10 {
354 let event = make_event("test", &format!("Event {}", i));
355 logger.log_event(i, "hat", &event, None).unwrap();
356 }
357
358 let history = EventHistory::new(&path);
359 let last_3 = history.read_last(3).unwrap();
360
361 assert_eq!(last_3.len(), 3);
362 assert_eq!(last_3[0].iteration, 8);
363 assert_eq!(last_3[2].iteration, 10);
364 }
365
366 #[test]
367 fn test_filter_by_topic() {
368 let tmp = TempDir::new().unwrap();
369 let path = tmp.path().join("events.jsonl");
370
371 let mut logger = EventLogger::new(&path);
372
373 logger
374 .log_event(1, "hat", &make_event("build.done", "a"), None)
375 .unwrap();
376 logger
377 .log_event(2, "hat", &make_event("build.blocked", "b"), None)
378 .unwrap();
379 logger
380 .log_event(3, "hat", &make_event("build.done", "c"), None)
381 .unwrap();
382
383 let history = EventHistory::new(&path);
384 let blocked = history.filter_by_topic("build.blocked").unwrap();
385
386 assert_eq!(blocked.len(), 1);
387 assert_eq!(blocked[0].iteration, 2);
388 }
389
390 #[test]
391 fn test_payload_truncation() {
392 let long_payload = "x".repeat(1000);
393 let event = make_event("test", &long_payload);
394 let record = EventRecord::new(1, "hat", &event, None);
395
396 assert!(record.payload.len() < 1000);
397 assert!(record.payload.contains("[truncated"));
398 }
399
400 #[test]
401 fn test_payload_truncation_with_multibyte_chars() {
402 let mut payload = "x".repeat(498);
405 payload.push_str("✅✅✅"); payload.push_str(&"y".repeat(500));
407
408 let event = make_event("test", &payload);
409 let record = EventRecord::new(1, "hat", &event, None);
411
412 assert!(record.payload.contains("[truncated"));
413 for _ in record.payload.chars() {}
415 }
416
417 #[test]
418 fn test_creates_parent_directory() {
419 let tmp = TempDir::new().unwrap();
420 let path = tmp.path().join("nested/dir/events.jsonl");
421
422 let mut logger = EventLogger::new(&path);
423 let event = make_event("test", "payload");
424 logger.log_event(1, "hat", &event, None).unwrap();
425
426 assert!(path.exists());
427 }
428
429 #[test]
430 fn test_empty_history() {
431 let tmp = TempDir::new().unwrap();
432 let path = tmp.path().join("nonexistent.jsonl");
433
434 let history = EventHistory::new(&path);
435 assert!(!history.exists());
436
437 let records = history.read_all().unwrap();
438 assert!(records.is_empty());
439 }
440
441 #[test]
442 fn test_agent_written_events_without_iteration() {
443 let tmp = TempDir::new().unwrap();
446 let path = tmp.path().join("events.jsonl");
447
448 let mut file = File::create(&path).unwrap();
450 writeln!(
451 file,
452 r#"{{"topic":"build.task","payload":"Implement auth","ts":"2024-01-15T10:00:00Z"}}"#
453 )
454 .unwrap();
455 writeln!(
456 file,
457 r#"{{"topic":"build.done","ts":"2024-01-15T10:30:00Z"}}"#
458 )
459 .unwrap();
460
461 let history = EventHistory::new(&path);
463 let records = history.read_all().unwrap();
464
465 assert_eq!(records.len(), 2);
466 assert_eq!(records[0].topic, "build.task");
467 assert_eq!(records[0].payload, "Implement auth");
468 assert_eq!(records[0].iteration, 0); assert_eq!(records[0].hat, ""); assert_eq!(records[1].topic, "build.done");
471 assert_eq!(records[1].payload, ""); }
473
474 #[test]
475 fn test_mixed_event_formats() {
476 let tmp = TempDir::new().unwrap();
478 let path = tmp.path().join("events.jsonl");
479
480 let mut logger = EventLogger::new(&path);
482 let event = make_event("task.start", "Initial task");
483 logger
484 .log_event(1, "loop", &event, Some(&HatId::new("planner")))
485 .unwrap();
486
487 let mut file = std::fs::OpenOptions::new()
489 .append(true)
490 .open(&path)
491 .unwrap();
492 writeln!(
493 file,
494 r#"{{"topic":"build.task","payload":"Agent wrote this","ts":"2024-01-15T10:05:00Z"}}"#
495 )
496 .unwrap();
497
498 let history = EventHistory::new(&path);
500 let records = history.read_all().unwrap();
501
502 assert_eq!(records.len(), 2);
503 assert_eq!(records[0].topic, "task.start");
505 assert_eq!(records[0].iteration, 1);
506 assert_eq!(records[0].hat, "loop");
507 assert_eq!(records[1].topic, "build.task");
509 assert_eq!(records[1].iteration, 0); assert_eq!(records[1].hat, ""); }
512
513 #[test]
514 fn test_object_payload_from_ralph_emit_json() {
515 let tmp = TempDir::new().unwrap();
518 let path = tmp.path().join("events.jsonl");
519
520 let mut file = File::create(&path).unwrap();
521
522 writeln!(
524 file,
525 r#"{{"ts":"2024-01-15T10:00:00Z","topic":"task.start","payload":"implement feature"}}"#
526 )
527 .unwrap();
528
529 writeln!(
531 file,
532 r#"{{"topic":"task.complete","payload":{{"status":"verified","tasks":["auth","api"]}},"ts":"2024-01-15T10:30:00Z"}}"#
533 )
534 .unwrap();
535
536 writeln!(
538 file,
539 r#"{{"topic":"loop.recovery","payload":{{"status":"recovered","evidence":{{"tests":"pass"}}}},"ts":"2024-01-15T10:45:00Z"}}"#
540 )
541 .unwrap();
542
543 let history = EventHistory::new(&path);
544 let records = history.read_all().unwrap();
545
546 assert_eq!(records.len(), 3);
547
548 assert_eq!(records[0].topic, "task.start");
550 assert_eq!(records[0].payload, "implement feature");
551
552 assert_eq!(records[1].topic, "task.complete");
554 assert!(records[1].payload.contains("\"status\""));
555 assert!(records[1].payload.contains("\"verified\""));
556 let parsed: serde_json::Value = serde_json::from_str(&records[1].payload).unwrap();
558 assert_eq!(parsed["status"], "verified");
559
560 assert_eq!(records[2].topic, "loop.recovery");
562 let parsed: serde_json::Value = serde_json::from_str(&records[2].payload).unwrap();
563 assert_eq!(parsed["evidence"]["tests"], "pass");
564 }
565}