swarm_engine_core/events/
trace.rs1use std::io::Write;
21use std::path::Path;
22use std::sync::{Arc, Mutex};
23
24use super::action::ActionEvent;
25
26pub trait TraceSubscriber: Send + Sync {
30 fn on_event(&self, event: &ActionEvent);
32
33 fn finish(&self) {}
35}
36
/// Stateless subscriber: a unit struct carrying no data.
#[derive(Debug, Default)]
pub struct NoOpTraceSubscriber;

impl NoOpTraceSubscriber {
    /// Returns the no-op subscriber; equivalent to `NoOpTraceSubscriber::default()`.
    pub fn new() -> Self {
        NoOpTraceSubscriber
    }
}
52
53impl TraceSubscriber for NoOpTraceSubscriber {
54 fn on_event(&self, _event: &ActionEvent) {
55 }
57}
58
59#[derive(Debug, Default)]
67pub struct InMemoryTraceSubscriber {
68 events: Mutex<Vec<TraceEvent>>,
69}
70
71impl InMemoryTraceSubscriber {
72 pub fn new() -> Self {
73 Self {
74 events: Mutex::new(Vec::new()),
75 }
76 }
77
78 pub fn len(&self) -> usize {
80 self.events.lock().unwrap().len()
81 }
82
83 pub fn is_empty(&self) -> bool {
85 self.len() == 0
86 }
87
88 pub fn clear(&self) {
90 self.events.lock().unwrap().clear();
91 }
92
93 pub fn dump_to_file(&self, path: impl AsRef<Path>) -> std::io::Result<()> {
95 let events = self.events.lock().unwrap();
96 let file = std::fs::File::create(path)?;
97 let mut writer = std::io::BufWriter::new(file);
98
99 for event in events.iter() {
100 let json = serde_json::to_string(event)
101 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
102 writeln!(writer, "{}", json)?;
103 }
104
105 writer.flush()?;
106 Ok(())
107 }
108
109 pub fn events(&self) -> Vec<TraceEvent> {
111 self.events.lock().unwrap().clone()
112 }
113}
114
115impl TraceSubscriber for InMemoryTraceSubscriber {
116 fn on_event(&self, event: &ActionEvent) {
117 let trace_event = TraceEvent::from(event);
118 self.events.lock().unwrap().push(trace_event);
119 }
120}
121
/// Subscriber backed by a buffered, append-mode file handle.
///
/// Derives `Debug` like the other subscribers in this module.
#[derive(Debug)]
pub struct JsonlTraceSubscriber {
    /// Buffered output file; the mutex allows writing through `&self`.
    writer: Mutex<std::io::BufWriter<std::fs::File>>,
}

impl JsonlTraceSubscriber {
    /// Opens `path` for appending, creating the file if it does not exist.
    ///
    /// Existing file content is kept; new output is added after it.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced when the file cannot be opened.
    pub fn new(path: impl AsRef<Path>) -> std::io::Result<Self> {
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)?;
        Ok(Self {
            writer: Mutex::new(std::io::BufWriter::new(file)),
        })
    }
}
144
145impl TraceSubscriber for JsonlTraceSubscriber {
146 fn on_event(&self, event: &ActionEvent) {
147 let trace_event = TraceEvent::from(event);
148 if let Ok(json) = serde_json::to_string(&trace_event) {
149 if let Ok(mut writer) = self.writer.lock() {
150 let _ = writeln!(writer, "{}", json);
151 }
152 }
153 }
154
155 fn finish(&self) {
156 if let Ok(mut writer) = self.writer.lock() {
157 let _ = writer.flush();
158 }
159 }
160}
161
162#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
168pub struct TraceEvent {
169 pub tick: u64,
171 pub worker_id: usize,
173 pub action: String,
175 #[serde(skip_serializing_if = "Option::is_none")]
177 pub target: Option<String>,
178 pub success: bool,
180 #[serde(skip_serializing_if = "Option::is_none")]
182 pub error: Option<String>,
183 #[serde(skip_serializing_if = "Option::is_none")]
185 pub output: Option<String>,
186 pub duration_ms: u64,
188 #[serde(skip_serializing_if = "Option::is_none")]
190 pub selection_logic: Option<String>,
191 #[serde(skip_serializing_if = "Option::is_none")]
193 pub previous_action: Option<String>,
194 #[serde(default, skip_serializing_if = "std::ops::Not::not")]
196 pub from_guidance: bool,
197}
198
199impl From<&ActionEvent> for TraceEvent {
200 fn from(e: &ActionEvent) -> Self {
201 Self {
202 tick: e.tick,
203 worker_id: e.worker_id.0,
204 action: e.action.clone(),
205 target: e.target.clone(),
206 success: e.result.success,
207 error: e.result.error.clone(),
208 output: e.result.output.clone(),
209 duration_ms: e.duration.as_millis() as u64,
210 selection_logic: e.context.selection_logic.clone(),
211 previous_action: e.context.previous_action.clone(),
212 from_guidance: e.context.from_guidance,
213 }
214 }
215}
216
217impl<T: TraceSubscriber> TraceSubscriber for Arc<T> {
222 fn on_event(&self, event: &ActionEvent) {
223 (**self).on_event(event);
224 }
225
226 fn finish(&self) {
227 (**self).finish();
228 }
229}
230
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::events::action::{ActionEventBuilder, ActionEventResult};
    use crate::types::WorkerId;

    /// Builds an event for worker 0 with a 50 ms duration and either a
    /// successful or a failed result.
    fn make_event(tick: u64, action: &str, success: bool) -> ActionEvent {
        let result = if success {
            ActionEventResult::success_with_output("ok")
        } else {
            ActionEventResult::failure("error")
        };
        ActionEventBuilder::new(tick, WorkerId(0), action)
            .result(result)
            .duration(Duration::from_millis(50))
            .build()
    }

    #[test]
    fn test_noop_subscriber() {
        // Smoke test: the call only has to complete without panicking.
        let sub = NoOpTraceSubscriber::new();
        sub.on_event(&make_event(1, "Test", true));
    }

    #[test]
    fn test_in_memory_subscriber() {
        let sub = InMemoryTraceSubscriber::new();
        assert!(sub.is_empty());

        sub.on_event(&make_event(1, "CheckStatus", true));
        sub.on_event(&make_event(2, "ReadLogs", false));

        assert_eq!(sub.len(), 2);

        let events = sub.events();
        assert_eq!(events[0].tick, 1);
        assert_eq!(events[0].action, "CheckStatus");
        assert!(events[0].success);
        assert_eq!(events[1].tick, 2);
        assert_eq!(events[1].action, "ReadLogs");
        assert!(!events[1].success);
    }

    #[test]
    fn test_jsonl_subscriber() {
        let temp_dir = std::env::temp_dir();
        let path = temp_dir.join(format!("test_trace_{}.jsonl", std::process::id()));

        // The subscriber opens the file in append mode, so a file left behind by
        // an earlier run with the same pid would inflate the line count.
        std::fs::remove_file(&path).ok();

        {
            let sub = JsonlTraceSubscriber::new(&path).unwrap();
            sub.on_event(&make_event(1, "CheckStatus", true));
            sub.on_event(&make_event(2, "ReadLogs", false));
            sub.finish();
        }

        let content = std::fs::read_to_string(&path).unwrap();
        // Clean up before asserting so a failed assertion leaves no file behind.
        std::fs::remove_file(&path).ok();

        let lines: Vec<&str> = content.lines().collect();
        assert_eq!(lines.len(), 2);

        let first: TraceEvent = serde_json::from_str(lines[0]).unwrap();
        assert_eq!(first.tick, 1);
        assert_eq!(first.action, "CheckStatus");

        let second: TraceEvent = serde_json::from_str(lines[1]).unwrap();
        assert_eq!(second.tick, 2);
        assert_eq!(second.action, "ReadLogs");
        assert!(!second.success);
    }
}