use std::io::Write;
use std::path::Path;
use std::sync::{Arc, Mutex};
use super::action::ActionEvent;
pub trait TraceSubscriber: Send + Sync {
fn on_event(&self, event: &ActionEvent);
fn finish(&self) {}
}
#[derive(Debug, Default)]
pub struct NoOpTraceSubscriber;
impl NoOpTraceSubscriber {
pub fn new() -> Self {
Self
}
}
impl TraceSubscriber for NoOpTraceSubscriber {
fn on_event(&self, _event: &ActionEvent) {
}
}
#[derive(Debug, Default)]
pub struct InMemoryTraceSubscriber {
events: Mutex<Vec<TraceEvent>>,
}
impl InMemoryTraceSubscriber {
pub fn new() -> Self {
Self {
events: Mutex::new(Vec::new()),
}
}
pub fn len(&self) -> usize {
self.events.lock().unwrap().len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn clear(&self) {
self.events.lock().unwrap().clear();
}
pub fn dump_to_file(&self, path: impl AsRef<Path>) -> std::io::Result<()> {
let events = self.events.lock().unwrap();
let file = std::fs::File::create(path)?;
let mut writer = std::io::BufWriter::new(file);
for event in events.iter() {
let json = serde_json::to_string(event)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
writeln!(writer, "{}", json)?;
}
writer.flush()?;
Ok(())
}
pub fn events(&self) -> Vec<TraceEvent> {
self.events.lock().unwrap().clone()
}
}
impl TraceSubscriber for InMemoryTraceSubscriber {
fn on_event(&self, event: &ActionEvent) {
let trace_event = TraceEvent::from(event);
self.events.lock().unwrap().push(trace_event);
}
}
pub struct JsonlTraceSubscriber {
writer: Mutex<std::io::BufWriter<std::fs::File>>,
}
impl JsonlTraceSubscriber {
pub fn new(path: impl AsRef<Path>) -> std::io::Result<Self> {
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
let writer = std::io::BufWriter::new(file);
Ok(Self {
writer: Mutex::new(writer),
})
}
}
impl TraceSubscriber for JsonlTraceSubscriber {
fn on_event(&self, event: &ActionEvent) {
let trace_event = TraceEvent::from(event);
if let Ok(json) = serde_json::to_string(&trace_event) {
if let Ok(mut writer) = self.writer.lock() {
let _ = writeln!(writer, "{}", json);
}
}
}
fn finish(&self) {
if let Ok(mut writer) = self.writer.lock() {
let _ = writer.flush();
}
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TraceEvent {
pub tick: u64,
pub worker_id: usize,
pub action: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub target: Option<String>,
pub success: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub output: Option<String>,
pub duration_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub selection_logic: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub previous_action: Option<String>,
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub from_guidance: bool,
}
impl From<&ActionEvent> for TraceEvent {
fn from(e: &ActionEvent) -> Self {
Self {
tick: e.tick,
worker_id: e.worker_id.0,
action: e.action.clone(),
target: e.target.clone(),
success: e.result.success,
error: e.result.error.clone(),
output: e.result.output.clone(),
duration_ms: e.duration.as_millis() as u64,
selection_logic: e.context.selection_logic.clone(),
previous_action: e.context.previous_action.clone(),
from_guidance: e.context.from_guidance,
}
}
}
impl<T: TraceSubscriber> TraceSubscriber for Arc<T> {
fn on_event(&self, event: &ActionEvent) {
(**self).on_event(event);
}
fn finish(&self) {
(**self).finish();
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
use crate::events::action::{ActionEventBuilder, ActionEventResult};
use crate::types::WorkerId;
fn make_event(tick: u64, action: &str, success: bool) -> ActionEvent {
let result = if success {
ActionEventResult::success_with_output("ok")
} else {
ActionEventResult::failure("error")
};
ActionEventBuilder::new(tick, WorkerId(0), action)
.result(result)
.duration(Duration::from_millis(50))
.build()
}
#[test]
fn test_noop_subscriber() {
let sub = NoOpTraceSubscriber::new();
sub.on_event(&make_event(1, "Test", true));
}
#[test]
fn test_in_memory_subscriber() {
let sub = InMemoryTraceSubscriber::new();
assert!(sub.is_empty());
sub.on_event(&make_event(1, "CheckStatus", true));
sub.on_event(&make_event(2, "ReadLogs", false));
assert_eq!(sub.len(), 2);
let events = sub.events();
assert_eq!(events[0].tick, 1);
assert_eq!(events[0].action, "CheckStatus");
assert!(events[0].success);
assert_eq!(events[1].tick, 2);
assert_eq!(events[1].action, "ReadLogs");
assert!(!events[1].success);
}
#[test]
fn test_jsonl_subscriber() {
let temp_dir = std::env::temp_dir();
let path = temp_dir.join(format!("test_trace_{}.jsonl", std::process::id()));
{
let sub = JsonlTraceSubscriber::new(&path).unwrap();
sub.on_event(&make_event(1, "CheckStatus", true));
sub.on_event(&make_event(2, "ReadLogs", false));
sub.finish();
}
let content = std::fs::read_to_string(&path).unwrap();
let lines: Vec<&str> = content.lines().collect();
assert_eq!(lines.len(), 2);
let first: TraceEvent = serde_json::from_str(lines[0]).unwrap();
assert_eq!(first.tick, 1);
assert_eq!(first.action, "CheckStatus");
std::fs::remove_file(&path).ok();
}
}