1use crate::loop_context::LoopContext;
7use ralph_proto::{Event, HatId};
8use serde::{Deserialize, Deserializer, Serialize};
9use std::fs::{self, File, OpenOptions};
10use std::io::{BufRead, BufReader, Write};
11use std::path::{Path, PathBuf};
12use tracing::{debug, warn};
13
14fn deserialize_flexible_payload<'de, D>(deserializer: D) -> Result<String, D::Error>
22where
23 D: Deserializer<'de>,
24{
25 #[derive(Deserialize)]
26 #[serde(untagged)]
27 enum FlexiblePayload {
28 String(String),
29 Object(serde_json::Value),
30 }
31
32 let opt = Option::<FlexiblePayload>::deserialize(deserializer)?;
33 Ok(opt
34 .map(|flex| match flex {
35 FlexiblePayload::String(s) => s,
36 FlexiblePayload::Object(serde_json::Value::Null) => String::new(),
37 FlexiblePayload::Object(obj) => {
38 serde_json::to_string(&obj).unwrap_or_else(|_| obj.to_string())
40 }
41 })
42 .unwrap_or_default())
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct EventRecord {
56 pub ts: String,
58
59 #[serde(default)]
61 pub iteration: u32,
62
63 #[serde(default)]
65 pub hat: String,
66
67 pub topic: String,
69
70 #[serde(skip_serializing_if = "Option::is_none")]
72 pub triggered: Option<String>,
73
74 #[serde(default, deserialize_with = "deserialize_flexible_payload")]
77 pub payload: String,
78
79 #[serde(skip_serializing_if = "Option::is_none")]
81 pub blocked_count: Option<u32>,
82}
83
84impl EventRecord {
85 const MAX_PAYLOAD_LEN: usize = 500;
87
88 pub fn new(
90 iteration: u32,
91 hat: impl Into<String>,
92 event: &Event,
93 triggered: Option<&HatId>,
94 ) -> Self {
95 let payload = if event.payload.len() > Self::MAX_PAYLOAD_LEN {
96 let mut truncate_at = Self::MAX_PAYLOAD_LEN;
99 while truncate_at > 0 && !event.payload.is_char_boundary(truncate_at) {
100 truncate_at -= 1;
101 }
102 format!(
103 "{}... [truncated, {} chars total]",
104 &event.payload[..truncate_at],
105 event.payload.chars().count()
106 )
107 } else {
108 event.payload.clone()
109 };
110
111 Self {
112 ts: chrono::Utc::now().to_rfc3339(),
113 iteration,
114 hat: hat.into(),
115 topic: event.topic.to_string(),
116 triggered: triggered.map(|h| h.to_string()),
117 payload,
118 blocked_count: None,
119 }
120 }
121
122 pub fn with_blocked_count(mut self, count: u32) -> Self {
124 self.blocked_count = Some(count);
125 self
126 }
127}
128
129pub struct EventLogger {
131 path: PathBuf,
133
134 file: Option<File>,
136}
137
138impl EventLogger {
139 pub const DEFAULT_PATH: &'static str = ".ralph/events.jsonl";
141
142 pub fn new(path: impl Into<PathBuf>) -> Self {
146 Self {
147 path: path.into(),
148 file: None,
149 }
150 }
151
152 pub fn default_path() -> Self {
154 Self::new(Self::DEFAULT_PATH)
155 }
156
157 pub fn from_context(context: &LoopContext) -> Self {
163 let events_path = std::fs::read_to_string(context.current_events_marker())
167 .map(|s| {
168 let relative = s.trim();
169 context.workspace().join(relative)
170 })
171 .unwrap_or_else(|_| context.events_path());
172 Self::new(events_path)
173 }
174
175 fn ensure_open(&mut self) -> std::io::Result<&mut File> {
177 if self.file.is_none() {
178 if let Some(parent) = self.path.parent() {
179 fs::create_dir_all(parent)?;
180 }
181 let file = OpenOptions::new()
182 .create(true)
183 .append(true)
184 .open(&self.path)?;
185 self.file = Some(file);
186 }
187 Ok(self.file.as_mut().unwrap())
188 }
189
190 pub fn log(&mut self, record: &EventRecord) -> std::io::Result<()> {
196 let file = self.ensure_open()?;
197 let mut json = serde_json::to_string(record)?;
198 json.push('\n');
199 file.write_all(json.as_bytes())?;
201 file.flush()?;
202 debug!(topic = %record.topic, iteration = record.iteration, "Event logged");
203 Ok(())
204 }
205
206 pub fn log_event(
208 &mut self,
209 iteration: u32,
210 hat: &str,
211 event: &Event,
212 triggered: Option<&HatId>,
213 ) -> std::io::Result<()> {
214 let record = EventRecord::new(iteration, hat, event, triggered);
215 self.log(&record)
216 }
217
218 pub fn path(&self) -> &Path {
220 &self.path
221 }
222}
223
224pub struct EventHistory {
226 path: PathBuf,
227}
228
229impl EventHistory {
230 pub fn new(path: impl Into<PathBuf>) -> Self {
232 Self { path: path.into() }
233 }
234
235 pub fn default_path() -> Self {
237 Self::new(EventLogger::DEFAULT_PATH)
238 }
239
240 pub fn from_context(context: &LoopContext) -> Self {
245 Self::new(context.events_path())
246 }
247
248 pub fn exists(&self) -> bool {
250 self.path.exists()
251 }
252
253 pub fn read_all(&self) -> std::io::Result<Vec<EventRecord>> {
255 if !self.exists() {
256 return Ok(Vec::new());
257 }
258
259 let file = File::open(&self.path)?;
260 let reader = BufReader::new(file);
261 let mut records = Vec::new();
262
263 for (line_num, line) in reader.lines().enumerate() {
264 let line = line?;
265 if line.trim().is_empty() {
266 continue;
267 }
268 match serde_json::from_str(&line) {
269 Ok(record) => records.push(record),
270 Err(e) => {
271 warn!(line = line_num + 1, error = %e, "Failed to parse event record");
272 }
273 }
274 }
275
276 Ok(records)
277 }
278
279 pub fn read_last(&self, n: usize) -> std::io::Result<Vec<EventRecord>> {
281 let all = self.read_all()?;
282 let start = all.len().saturating_sub(n);
283 Ok(all[start..].to_vec())
284 }
285
286 pub fn filter_by_topic(&self, topic: &str) -> std::io::Result<Vec<EventRecord>> {
288 let all = self.read_all()?;
289 Ok(all.into_iter().filter(|r| r.topic == topic).collect())
290 }
291
292 pub fn filter_by_iteration(&self, iteration: u32) -> std::io::Result<Vec<EventRecord>> {
294 let all = self.read_all()?;
295 Ok(all
296 .into_iter()
297 .filter(|r| r.iteration == iteration)
298 .collect())
299 }
300
301 pub fn clear(&self) -> std::io::Result<()> {
303 if self.exists() {
304 fs::remove_file(&self.path)?;
305 }
306 Ok(())
307 }
308}
309
310#[cfg(test)]
311mod tests {
312 use super::*;
313 use tempfile::TempDir;
314
315 fn make_event(topic: &str, payload: &str) -> Event {
316 Event::new(topic, payload)
317 }
318
319 #[test]
320 fn test_log_and_read() {
321 let tmp = TempDir::new().unwrap();
322 let path = tmp.path().join("events.jsonl");
323
324 let mut logger = EventLogger::new(&path);
325
326 let event1 = make_event("task.start", "Starting task");
328 let event2 = make_event("build.done", "Build complete");
329
330 logger
331 .log_event(1, "loop", &event1, Some(&HatId::new("planner")))
332 .unwrap();
333 logger
334 .log_event(2, "builder", &event2, Some(&HatId::new("planner")))
335 .unwrap();
336
337 let history = EventHistory::new(&path);
339 let records = history.read_all().unwrap();
340
341 assert_eq!(records.len(), 2);
342 assert_eq!(records[0].topic, "task.start");
343 assert_eq!(records[0].iteration, 1);
344 assert_eq!(records[0].hat, "loop");
345 assert_eq!(records[0].triggered, Some("planner".to_string()));
346 assert_eq!(records[1].topic, "build.done");
347 }
348
349 #[test]
350 fn test_read_last() {
351 let tmp = TempDir::new().unwrap();
352 let path = tmp.path().join("events.jsonl");
353
354 let mut logger = EventLogger::new(&path);
355
356 for i in 1..=10 {
357 let event = make_event("test", &format!("Event {}", i));
358 logger.log_event(i, "hat", &event, None).unwrap();
359 }
360
361 let history = EventHistory::new(&path);
362 let last_3 = history.read_last(3).unwrap();
363
364 assert_eq!(last_3.len(), 3);
365 assert_eq!(last_3[0].iteration, 8);
366 assert_eq!(last_3[2].iteration, 10);
367 }
368
369 #[test]
370 fn test_filter_by_topic() {
371 let tmp = TempDir::new().unwrap();
372 let path = tmp.path().join("events.jsonl");
373
374 let mut logger = EventLogger::new(&path);
375
376 logger
377 .log_event(1, "hat", &make_event("build.done", "a"), None)
378 .unwrap();
379 logger
380 .log_event(2, "hat", &make_event("build.blocked", "b"), None)
381 .unwrap();
382 logger
383 .log_event(3, "hat", &make_event("build.done", "c"), None)
384 .unwrap();
385
386 let history = EventHistory::new(&path);
387 let blocked = history.filter_by_topic("build.blocked").unwrap();
388
389 assert_eq!(blocked.len(), 1);
390 assert_eq!(blocked[0].iteration, 2);
391 }
392
393 #[test]
394 fn test_payload_truncation() {
395 let long_payload = "x".repeat(1000);
396 let event = make_event("test", &long_payload);
397 let record = EventRecord::new(1, "hat", &event, None);
398
399 assert!(record.payload.len() < 1000);
400 assert!(record.payload.contains("[truncated"));
401 }
402
403 #[test]
404 fn test_payload_truncation_with_multibyte_chars() {
405 let mut payload = "x".repeat(498);
408 payload.push_str("✅✅✅"); payload.push_str(&"y".repeat(500));
410
411 let event = make_event("test", &payload);
412 let record = EventRecord::new(1, "hat", &event, None);
414
415 assert!(record.payload.contains("[truncated"));
416 for _ in record.payload.chars() {}
418 }
419
420 #[test]
421 fn test_creates_parent_directory() {
422 let tmp = TempDir::new().unwrap();
423 let path = tmp.path().join("nested/dir/events.jsonl");
424
425 let mut logger = EventLogger::new(&path);
426 let event = make_event("test", "payload");
427 logger.log_event(1, "hat", &event, None).unwrap();
428
429 assert!(path.exists());
430 }
431
432 #[test]
433 fn test_empty_history() {
434 let tmp = TempDir::new().unwrap();
435 let path = tmp.path().join("nonexistent.jsonl");
436
437 let history = EventHistory::new(&path);
438 assert!(!history.exists());
439
440 let records = history.read_all().unwrap();
441 assert!(records.is_empty());
442 }
443
444 #[test]
445 fn test_agent_written_events_without_iteration() {
446 let tmp = TempDir::new().unwrap();
449 let path = tmp.path().join("events.jsonl");
450
451 let mut file = File::create(&path).unwrap();
453 writeln!(
454 file,
455 r#"{{"topic":"build.task","payload":"Implement auth","ts":"2024-01-15T10:00:00Z"}}"#
456 )
457 .unwrap();
458 writeln!(
459 file,
460 r#"{{"topic":"build.done","ts":"2024-01-15T10:30:00Z"}}"#
461 )
462 .unwrap();
463
464 let history = EventHistory::new(&path);
466 let records = history.read_all().unwrap();
467
468 assert_eq!(records.len(), 2);
469 assert_eq!(records[0].topic, "build.task");
470 assert_eq!(records[0].payload, "Implement auth");
471 assert_eq!(records[0].iteration, 0); assert_eq!(records[0].hat, ""); assert_eq!(records[1].topic, "build.done");
474 assert_eq!(records[1].payload, ""); }
476
477 #[test]
478 fn test_mixed_event_formats() {
479 let tmp = TempDir::new().unwrap();
481 let path = tmp.path().join("events.jsonl");
482
483 let mut logger = EventLogger::new(&path);
485 let event = make_event("task.start", "Initial task");
486 logger
487 .log_event(1, "loop", &event, Some(&HatId::new("planner")))
488 .unwrap();
489
490 let mut file = std::fs::OpenOptions::new()
492 .append(true)
493 .open(&path)
494 .unwrap();
495 writeln!(
496 file,
497 r#"{{"topic":"build.task","payload":"Agent wrote this","ts":"2024-01-15T10:05:00Z"}}"#
498 )
499 .unwrap();
500
501 let history = EventHistory::new(&path);
503 let records = history.read_all().unwrap();
504
505 assert_eq!(records.len(), 2);
506 assert_eq!(records[0].topic, "task.start");
508 assert_eq!(records[0].iteration, 1);
509 assert_eq!(records[0].hat, "loop");
510 assert_eq!(records[1].topic, "build.task");
512 assert_eq!(records[1].iteration, 0); assert_eq!(records[1].hat, ""); }
515
516 #[test]
517 fn test_object_payload_from_ralph_emit_json() {
518 let tmp = TempDir::new().unwrap();
521 let path = tmp.path().join("events.jsonl");
522
523 let mut file = File::create(&path).unwrap();
524
525 writeln!(
527 file,
528 r#"{{"ts":"2024-01-15T10:00:00Z","topic":"task.start","payload":"implement feature"}}"#
529 )
530 .unwrap();
531
532 writeln!(
534 file,
535 r#"{{"topic":"task.complete","payload":{{"status":"verified","tasks":["auth","api"]}},"ts":"2024-01-15T10:30:00Z"}}"#
536 )
537 .unwrap();
538
539 writeln!(
541 file,
542 r#"{{"topic":"loop.recovery","payload":{{"status":"recovered","evidence":{{"tests":"pass"}}}},"ts":"2024-01-15T10:45:00Z"}}"#
543 )
544 .unwrap();
545
546 let history = EventHistory::new(&path);
547 let records = history.read_all().unwrap();
548
549 assert_eq!(records.len(), 3);
550
551 assert_eq!(records[0].topic, "task.start");
553 assert_eq!(records[0].payload, "implement feature");
554
555 assert_eq!(records[1].topic, "task.complete");
557 assert!(records[1].payload.contains("\"status\""));
558 assert!(records[1].payload.contains("\"verified\""));
559 let parsed: serde_json::Value = serde_json::from_str(&records[1].payload).unwrap();
561 assert_eq!(parsed["status"], "verified");
562
563 assert_eq!(records[2].topic, "loop.recovery");
565 let parsed: serde_json::Value = serde_json::from_str(&records[2].payload).unwrap();
566 assert_eq!(parsed["evidence"]["tests"], "pass");
567 }
568}