use std::io::Write;
use std::path::PathBuf;
use serde::Serialize;
use tokio::sync::broadcast;
use super::action::ActionEvent;
/// Flat, serializable projection of an `ActionEvent`.
///
/// One value of this type is serialized per line of the JSONL output. Fields
/// are emitted in declaration order, so reordering them changes the key order
/// of the generated JSON.
#[derive(Debug, Serialize)]
struct JsonlEvent {
/// Copied from `ActionEvent::tick`.
tick: u64,
/// Inner value of the event's worker id newtype.
worker_id: usize,
/// Copied from `ActionEvent::action`.
action: String,
/// Copied from `ActionEvent::target`; has no `skip_serializing_if`, so the
/// key is always present in the output.
target: Option<String>,
/// Copied from the event result's `success` flag.
success: bool,
/// Copied from the event result's `error`; has no `skip_serializing_if`, so
/// the key is always present in the output.
error: Option<String>,
/// Event duration expressed in whole milliseconds.
duration_ms: u64,
/// Copied from the event context; the key is omitted entirely when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
selection_logic: Option<String>,
/// Copied from the event context; the key is omitted entirely when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
previous_action: Option<String>,
}
impl From<&ActionEvent> for JsonlEvent {
    /// Flattens a borrowed [`ActionEvent`] into the record written to the
    /// JSONL file.
    ///
    /// The event is only borrowed, so owned fields (`action`, `target`,
    /// `error`, and the context strings) are cloned into the new record.
    fn from(e: &ActionEvent) -> Self {
        // `as_millis` yields a `u128`. Clamp before narrowing so an absurdly
        // large duration saturates at `u64::MAX` instead of silently wrapping
        // the way a bare `as u64` cast would.
        let duration_ms = e.duration.as_millis().min(u128::from(u64::MAX)) as u64;

        Self {
            tick: e.tick,
            worker_id: e.worker_id.0,
            action: e.action.clone(),
            target: e.target.clone(),
            success: e.result.success,
            error: e.result.error.clone(),
            duration_ms,
            selection_logic: e.context.selection_logic.clone(),
            previous_action: e.context.previous_action.clone(),
        }
    }
}
/// Sink that receives `ActionEvent`s from a broadcast channel and appends
/// them, one JSON object per line, to a file.
///
/// Build it with `JsonlWriter::new`, optionally adjust flushing with
/// `with_buffer`, then drive it with `run`.
pub struct JsonlWriter {
/// Receiving half of the broadcast channel the events arrive on.
rx: broadcast::Receiver<ActionEvent>,
/// Destination file; opened in create + append mode when `run` starts.
path: PathBuf,
/// Number of written lines after which the output is flushed.
/// `0` means flush after every line.
buffer_lines: usize,
}
impl JsonlWriter {
pub fn new(rx: broadcast::Receiver<ActionEvent>, path: impl Into<PathBuf>) -> Self {
Self {
rx,
path: path.into(),
buffer_lines: 0,
}
}
pub fn with_buffer(mut self, lines: usize) -> Self {
self.buffer_lines = lines;
self
}
pub async fn run(mut self) -> std::io::Result<()> {
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)?;
let mut writer = std::io::BufWriter::new(file);
let mut line_count = 0;
while let Ok(event) = self.rx.recv().await {
let jsonl_event = JsonlEvent::from(&event);
if let Ok(json) = serde_json::to_string(&jsonl_event) {
writeln!(writer, "{}", json)?;
line_count += 1;
if self.buffer_lines == 0 || line_count >= self.buffer_lines {
writer.flush()?;
line_count = 0;
}
}
}
writer.flush()?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::events::action::{ActionEventBuilder, ActionEventResult};
    use crate::types::WorkerId;

    /// Builds an event for worker 0 with a fixed 50 ms duration and a success
    /// or failure result depending on `success`.
    fn make_event(tick: u64, action: &str, success: bool) -> ActionEvent {
        let result = if success {
            ActionEventResult::success()
        } else {
            ActionEventResult::failure("error")
        };
        ActionEventBuilder::new(tick, WorkerId(0), action)
            .result(result)
            .duration(Duration::from_millis(50))
            .build()
    }

    /// Sends two events through the channel and checks that exactly two JSON
    /// lines with the expected fields end up in the file.
    #[tokio::test]
    async fn test_jsonl_writer() {
        let path = std::env::temp_dir().join(format!("test_events_{}.jsonl", std::process::id()));
        // The writer opens the file in append mode, so a file left behind by
        // an earlier run that happened to have the same PID would inflate the
        // line count.
        std::fs::remove_file(&path).ok();

        let (tx, rx) = broadcast::channel::<ActionEvent>(16);
        let writer = JsonlWriter::new(rx, &path);
        let handle = tokio::spawn(async move { writer.run().await });

        tx.send(make_event(1, "CheckStatus", true)).unwrap();
        tx.send(make_event(2, "ReadLogs", false)).unwrap();
        // Dropping the only sender closes the channel. The receiver still
        // gets every retained event before it observes the close, so no
        // sleep is needed to let the writer catch up.
        drop(tx);
        let run_result = handle.await;

        // Read and clean up before asserting so a failing assertion does not
        // leave the temp file behind.
        let content = std::fs::read_to_string(&path);
        std::fs::remove_file(&path).ok();

        run_result
            .expect("writer task panicked")
            .expect("writer returned an I/O error");
        let content = content.expect("output file should be readable");

        let lines: Vec<&str> = content.lines().collect();
        assert_eq!(lines.len(), 2);

        let first: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first["tick"], 1);
        assert_eq!(first["action"], "CheckStatus");
        assert!(first["success"].as_bool().unwrap());

        let second: serde_json::Value = serde_json::from_str(lines[1]).unwrap();
        assert_eq!(second["tick"], 2);
        assert_eq!(second["action"], "ReadLogs");
    }
}