use ralph_proto::{Event, UxEvent};
use serde::{Deserialize, Serialize};
use std::io::{self, Write};
use std::sync::Mutex;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Record {
pub ts: u64,
pub event: String,
pub data: serde_json::Value,
}
impl Record {
pub fn new(event: impl Into<String>, data: impl Serialize) -> Self {
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
Self {
ts,
event: event.into(),
data: serde_json::to_value(data).unwrap_or(serde_json::Value::Null),
}
}
pub fn from_bus_event(event: &Event) -> Self {
Self::new("bus.publish", event)
}
pub fn from_ux_event(ux_event: &UxEvent) -> Self {
let event_type = match ux_event {
UxEvent::TerminalWrite(_) => "ux.terminal.write",
UxEvent::TerminalResize(_) => "ux.terminal.resize",
UxEvent::TerminalColorMode(_) => "ux.terminal.color_mode",
UxEvent::TuiFrame(_) => "ux.tui.frame",
};
Self::new(event_type, ux_event)
}
pub fn meta_loop_start(prompt_file: &str, max_iterations: u32, ux_mode: Option<&str>) -> Self {
Self::new(
"_meta.loop_start",
serde_json::json!({
"prompt_file": prompt_file,
"max_iterations": max_iterations,
"ux_mode": ux_mode.unwrap_or("cli"),
}),
)
}
pub fn meta_iteration(iteration: u32, elapsed_ms: u64, hat: &str) -> Self {
Self::new(
"_meta.iteration",
serde_json::json!({
"n": iteration,
"elapsed_ms": elapsed_ms,
"hat": hat,
}),
)
}
pub fn meta_termination(
reason: &str,
iterations: u32,
elapsed_secs: f64,
ux_writes: u32,
) -> Self {
Self::new(
"_meta.termination",
serde_json::json!({
"reason": reason,
"iterations": iterations,
"elapsed_secs": elapsed_secs,
"ux_writes": ux_writes,
}),
)
}
}
pub struct SessionRecorder<W> {
writer: Mutex<W>,
start_time: Instant,
ux_write_count: Mutex<u32>,
}
impl<W: Write> SessionRecorder<W> {
pub fn new(writer: W) -> Self {
Self {
writer: Mutex::new(writer),
start_time: Instant::now(),
ux_write_count: Mutex::new(0),
}
}
pub fn record_bus_event(&self, event: &Event) {
let record = Record::from_bus_event(event);
self.write_record(&record);
}
pub fn record_ux_event(&self, ux_event: &UxEvent) {
if matches!(ux_event, UxEvent::TerminalWrite(_))
&& let Ok(mut count) = self.ux_write_count.lock()
{
*count += 1;
}
let record = Record::from_ux_event(ux_event);
self.write_record(&record);
}
pub fn record_ux_events(&self, events: &[UxEvent]) {
for event in events {
self.record_ux_event(event);
}
}
pub fn record_meta(&self, record: Record) {
self.write_record(&record);
}
pub fn ux_write_count(&self) -> u32 {
self.ux_write_count.lock().map(|g| *g).unwrap_or(0)
}
pub fn elapsed(&self) -> std::time::Duration {
self.start_time.elapsed()
}
fn write_record(&self, record: &Record) {
if let Ok(mut writer) = self.writer.lock() {
if let Ok(json) = serde_json::to_string(record) {
let _ = writeln!(writer, "{}", json);
let _ = writer.flush();
}
}
}
pub fn flush(&self) -> io::Result<()> {
self.writer
.lock()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Failed to acquire writer lock"))?
.flush()
}
}
impl<W: Write + Send + 'static> SessionRecorder<W> {
pub fn make_observer(recorder: std::sync::Arc<Self>) -> impl Fn(&Event) + Send + 'static {
move |event| {
recorder.record_bus_event(event);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_record_bus_event() {
let mut output = Vec::new();
{
let recorder = SessionRecorder::new(&mut output);
let event = Event::new("task.start", "Begin work");
recorder.record_bus_event(&event);
}
let output_str = String::from_utf8_lossy(&output);
assert!(output_str.contains("bus.publish"));
assert!(output_str.contains("task.start"));
assert!(output_str.contains("Begin work"));
}
#[test]
fn test_record_ux_event() {
use ralph_proto::TerminalWrite;
let mut output = Vec::new();
{
let recorder = SessionRecorder::new(&mut output);
let ux_event = UxEvent::TerminalWrite(TerminalWrite::new(b"Hello", true, 100));
recorder.record_ux_event(&ux_event);
}
let output_str = String::from_utf8_lossy(&output);
assert!(output_str.contains("ux.terminal.write"));
assert!(output_str.contains("SGVsbG8=")); }
#[test]
fn test_record_metadata() {
let mut output = Vec::new();
{
let recorder = SessionRecorder::new(&mut output);
recorder.record_meta(Record::meta_loop_start("PROMPT.md", 100, Some("cli")));
recorder.record_meta(Record::meta_iteration(1, 5000, "default"));
recorder.record_meta(Record::meta_termination("CompletionPromise", 3, 25.5, 42));
}
let output_str = String::from_utf8_lossy(&output);
assert!(output_str.contains("_meta.loop_start"));
assert!(output_str.contains("_meta.iteration"));
assert!(output_str.contains("_meta.termination"));
assert!(output_str.contains("PROMPT.md"));
assert!(output_str.contains("CompletionPromise"));
}
#[test]
fn test_jsonl_format() {
let mut output = Vec::new();
{
let recorder = SessionRecorder::new(&mut output);
recorder.record_bus_event(&Event::new("test.1", "First"));
recorder.record_bus_event(&Event::new("test.2", "Second"));
}
let output_str = String::from_utf8_lossy(&output);
let lines: Vec<&str> = output_str.lines().collect();
assert_eq!(lines.len(), 2);
for line in lines {
let parsed: Result<serde_json::Value, _> = serde_json::from_str(line);
assert!(parsed.is_ok(), "Line should be valid JSON: {}", line);
}
}
#[test]
fn test_ux_write_count() {
use ralph_proto::{TerminalResize, TerminalWrite};
let output = Vec::new();
let recorder = SessionRecorder::new(output);
recorder.record_ux_event(&UxEvent::TerminalWrite(TerminalWrite::new(b"a", true, 0)));
recorder.record_ux_event(&UxEvent::TerminalResize(TerminalResize::new(80, 24, 10)));
recorder.record_ux_event(&UxEvent::TerminalWrite(TerminalWrite::new(b"b", true, 20)));
assert_eq!(recorder.ux_write_count(), 2);
}
#[test]
fn test_record_roundtrip() {
let event = Event::new("task.done", "Finished");
let record = Record::from_bus_event(&event);
let json = serde_json::to_string(&record).unwrap();
let parsed: Record = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.event, "bus.publish");
assert!(parsed.ts > 0);
}
#[test]
fn test_record_bus_event_flushes_buffered_writer_immediately() {
use std::io::{BufWriter, Write};
use std::sync::{Arc, Mutex};
#[derive(Clone)]
struct SharedWriter(Arc<Mutex<Vec<u8>>>);
impl Write for SharedWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0
.lock()
.expect("shared writer lock")
.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
let shared = Arc::new(Mutex::new(Vec::new()));
let writer = BufWriter::new(SharedWriter(Arc::clone(&shared)));
let recorder = SessionRecorder::new(writer);
recorder.record_bus_event(&Event::new("task.start", "Begin work"));
let output = shared.lock().expect("shared bytes lock").clone();
let output_str = String::from_utf8(output).expect("utf8 output");
assert!(
output_str.contains("task.start"),
"session records should be flushed before normal drop/teardown"
);
}
}