swarm_engine_core/events/
persistence.rs1use std::io::Write;
6use std::path::PathBuf;
7
8use serde::Serialize;
9use tokio::sync::broadcast;
10
11use super::action::ActionEvent;
12
13#[derive(Debug, Serialize)]
15struct JsonlEvent {
16 tick: u64,
17 worker_id: usize,
18 action: String,
19 target: Option<String>,
20 success: bool,
21 error: Option<String>,
22 duration_ms: u64,
23 #[serde(skip_serializing_if = "Option::is_none")]
24 selection_logic: Option<String>,
25 #[serde(skip_serializing_if = "Option::is_none")]
26 previous_action: Option<String>,
27}
28
29impl From<&ActionEvent> for JsonlEvent {
30 fn from(e: &ActionEvent) -> Self {
31 Self {
32 tick: e.tick,
33 worker_id: e.worker_id.0,
34 action: e.action.clone(),
35 target: e.target.clone(),
36 success: e.result.success,
37 error: e.result.error.clone(),
38 duration_ms: e.duration.as_millis() as u64,
39 selection_logic: e.context.selection_logic.clone(),
40 previous_action: e.context.previous_action.clone(),
41 }
42 }
43}
44
45pub struct JsonlWriter {
50 rx: broadcast::Receiver<ActionEvent>,
51 path: PathBuf,
52 buffer_lines: usize,
53}
54
55impl JsonlWriter {
56 pub fn new(rx: broadcast::Receiver<ActionEvent>, path: impl Into<PathBuf>) -> Self {
57 Self {
58 rx,
59 path: path.into(),
60 buffer_lines: 0,
61 }
62 }
63
64 pub fn with_buffer(mut self, lines: usize) -> Self {
66 self.buffer_lines = lines;
67 self
68 }
69
70 pub async fn run(mut self) -> std::io::Result<()> {
72 let file = std::fs::OpenOptions::new()
73 .create(true)
74 .append(true)
75 .open(&self.path)?;
76 let mut writer = std::io::BufWriter::new(file);
77 let mut line_count = 0;
78
79 while let Ok(event) = self.rx.recv().await {
80 let jsonl_event = JsonlEvent::from(&event);
81 if let Ok(json) = serde_json::to_string(&jsonl_event) {
82 writeln!(writer, "{}", json)?;
83 line_count += 1;
84
85 if self.buffer_lines == 0 || line_count >= self.buffer_lines {
86 writer.flush()?;
87 line_count = 0;
88 }
89 }
90 }
91
92 writer.flush()?;
93 Ok(())
94 }
95}
96
97#[cfg(test)]
98mod tests {
99 use std::time::Duration;
100
101 use super::*;
102 use crate::events::action::{ActionEventBuilder, ActionEventResult};
103 use crate::types::WorkerId;
104
105 fn make_event(tick: u64, action: &str, success: bool) -> ActionEvent {
106 let result = if success {
107 ActionEventResult::success()
108 } else {
109 ActionEventResult::failure("error")
110 };
111
112 ActionEventBuilder::new(tick, WorkerId(0), action)
113 .result(result)
114 .duration(Duration::from_millis(50))
115 .build()
116 }
117
118 #[tokio::test]
119 async fn test_jsonl_writer() {
120 let temp_dir = std::env::temp_dir();
121 let path = temp_dir.join(format!("test_events_{}.jsonl", std::process::id()));
122
123 let (tx, rx) = broadcast::channel::<ActionEvent>(16);
124 let writer = JsonlWriter::new(rx, &path);
125
126 let handle = tokio::spawn(async move {
127 writer.run().await.unwrap();
128 });
129
130 tx.send(make_event(1, "CheckStatus", true)).unwrap();
131 tx.send(make_event(2, "ReadLogs", false)).unwrap();
132
133 tokio::time::sleep(Duration::from_millis(10)).await;
134
135 drop(tx);
136 let _ = handle.await;
137
138 let content = std::fs::read_to_string(&path).unwrap();
139 let lines: Vec<&str> = content.lines().collect();
140 assert_eq!(lines.len(), 2);
141
142 let first: serde_json::Value = serde_json::from_str(lines[0]).unwrap();
143 assert_eq!(first["tick"], 1);
144 assert_eq!(first["action"], "CheckStatus");
145 assert!(first["success"].as_bool().unwrap());
146
147 std::fs::remove_file(&path).ok();
148 }
149}