swarm_engine_core/debug/
mod.rs1use serde::Serialize;
43use std::collections::HashMap;
44use std::sync::atomic::{AtomicBool, Ordering};
45use std::sync::Arc;
46use tokio::sync::broadcast;
47
48use crate::util::epoch_millis;
49
50pub trait Dumpable: Send + Sync {
58 fn name(&self) -> &'static str;
60
61 fn snapshot(&self, tick: u64) -> Option<serde_json::Value>;
66}
67
68#[derive(Debug, Clone, Serialize)]
74pub struct DebugSnapshot {
75 pub tick: u64,
77 pub timestamp_ms: u64,
79 pub data: HashMap<&'static str, serde_json::Value>,
81}
82
83pub struct TickDumper {
91 observers: Vec<Arc<dyn Dumpable>>,
93 tx: broadcast::Sender<DebugSnapshot>,
95 enabled: AtomicBool,
97 dump_on_error: AtomicBool,
99}
100
101impl TickDumper {
102 pub fn new(capacity: usize) -> Self {
106 let (tx, _) = broadcast::channel(capacity);
107 Self {
108 observers: Vec::new(),
109 tx,
110 enabled: AtomicBool::new(false),
111 dump_on_error: AtomicBool::new(true),
112 }
113 }
114
115 pub fn register(&mut self, obj: Arc<dyn Dumpable>) {
117 self.observers.push(obj);
118 }
119
120 pub fn register_all(&mut self, objs: impl IntoIterator<Item = Arc<dyn Dumpable>>) {
122 for obj in objs {
123 self.observers.push(obj);
124 }
125 }
126
127 pub fn enable(&self) {
129 self.enabled.store(true, Ordering::Relaxed);
130 }
131
132 pub fn disable(&self) {
134 self.enabled.store(false, Ordering::Relaxed);
135 }
136
137 pub fn is_enabled(&self) -> bool {
139 self.enabled.load(Ordering::Relaxed)
140 }
141
142 pub fn set_dump_on_error(&self, enabled: bool) {
144 self.dump_on_error.store(enabled, Ordering::Relaxed);
145 }
146
147 pub fn dump(&self, tick: u64) {
151 if !self.enabled.load(Ordering::Relaxed) {
152 return;
153 }
154
155 if let Some(snapshot) = self.collect_snapshot(tick) {
156 let _ = self.tx.send(snapshot);
157 }
158 }
159
160 pub fn dump_error(&self, tick: u64, error: &str) {
162 if !self.dump_on_error.load(Ordering::Relaxed) {
163 return;
164 }
165
166 let mut snapshot = self
167 .collect_snapshot(tick)
168 .unwrap_or_else(|| DebugSnapshot {
169 tick,
170 timestamp_ms: epoch_millis(),
171 data: HashMap::new(),
172 });
173
174 snapshot
175 .data
176 .insert("_error", serde_json::json!(error.to_string()));
177
178 let _ = self.tx.send(snapshot);
179 }
180
181 pub fn subscribe(&self) -> broadcast::Receiver<DebugSnapshot> {
183 self.tx.subscribe()
184 }
185
186 pub fn observer_count(&self) -> usize {
188 self.observers.len()
189 }
190
191 fn collect_snapshot(&self, tick: u64) -> Option<DebugSnapshot> {
193 let mut data = HashMap::new();
194
195 for obs in &self.observers {
196 if let Some(value) = obs.snapshot(tick) {
197 data.insert(obs.name(), value);
198 }
199 }
200
201 if data.is_empty() {
202 return None;
203 }
204
205 Some(DebugSnapshot {
206 tick,
207 timestamp_ms: epoch_millis(),
208 data,
209 })
210 }
211}
212
213impl Default for TickDumper {
214 fn default() -> Self {
215 Self::new(256)
216 }
217}
218
219pub struct StderrDumpSubscriber {
225 rx: broadcast::Receiver<DebugSnapshot>,
226 pretty: bool,
228}
229
230impl StderrDumpSubscriber {
231 pub fn new(rx: broadcast::Receiver<DebugSnapshot>, pretty: bool) -> Self {
232 Self { rx, pretty }
233 }
234
235 pub async fn run(mut self) {
237 while let Ok(snapshot) = self.rx.recv().await {
238 self.print_snapshot(&snapshot);
239 }
240 }
241
242 fn print_snapshot(&self, snapshot: &DebugSnapshot) {
243 eprintln!("=== Tick {} ===", snapshot.tick);
244 if self.pretty {
245 if let Ok(json) = serde_json::to_string_pretty(&snapshot.data) {
246 eprintln!("{}", json);
247 }
248 } else if let Ok(json) = serde_json::to_string(&snapshot.data) {
249 eprintln!("{}", json);
250 }
251 eprintln!();
252 }
253}
254
255#[cfg(test)]
260mod tests {
261 use super::*;
262
263 struct MockDumpable {
264 name: &'static str,
265 value: i32,
266 }
267
268 impl Dumpable for MockDumpable {
269 fn name(&self) -> &'static str {
270 self.name
271 }
272
273 fn snapshot(&self, _tick: u64) -> Option<serde_json::Value> {
274 Some(serde_json::json!({ "value": self.value }))
275 }
276 }
277
278 struct EmptyDumpable;
279
280 impl Dumpable for EmptyDumpable {
281 fn name(&self) -> &'static str {
282 "empty"
283 }
284
285 fn snapshot(&self, _tick: u64) -> Option<serde_json::Value> {
286 None }
288 }
289
290 #[test]
291 fn test_tick_dumper_disabled_by_default() {
292 let dumper = TickDumper::new(16);
293 assert!(!dumper.is_enabled());
294 }
295
296 #[test]
297 fn test_tick_dumper_enable_disable() {
298 let dumper = TickDumper::new(16);
299 dumper.enable();
300 assert!(dumper.is_enabled());
301 dumper.disable();
302 assert!(!dumper.is_enabled());
303 }
304
305 #[test]
306 fn test_tick_dumper_register() {
307 let mut dumper = TickDumper::new(16);
308 assert_eq!(dumper.observer_count(), 0);
309
310 dumper.register(Arc::new(MockDumpable {
311 name: "test",
312 value: 42,
313 }));
314 assert_eq!(dumper.observer_count(), 1);
315 }
316
317 #[tokio::test]
318 async fn test_tick_dumper_dump() {
319 let mut dumper = TickDumper::new(16);
320 dumper.register(Arc::new(MockDumpable {
321 name: "test",
322 value: 42,
323 }));
324 dumper.enable();
325
326 let mut rx = dumper.subscribe();
327
328 dumper.dump(1);
329
330 let snapshot = rx.recv().await.unwrap();
331 assert_eq!(snapshot.tick, 1);
332 assert!(snapshot.data.contains_key("test"));
333 assert_eq!(snapshot.data["test"]["value"], 42);
334 }
335
336 #[tokio::test]
337 async fn test_tick_dumper_empty_snapshot_not_sent() {
338 let mut dumper = TickDumper::new(16);
339 dumper.register(Arc::new(EmptyDumpable));
340 dumper.enable();
341
342 let mut rx = dumper.subscribe();
343
344 dumper.dump(1);
345
346 let result = tokio::time::timeout(std::time::Duration::from_millis(10), rx.recv()).await;
348 assert!(result.is_err()); }
350
351 #[tokio::test]
352 async fn test_tick_dumper_dump_error() {
353 let mut dumper = TickDumper::new(16);
354 dumper.register(Arc::new(MockDumpable {
355 name: "test",
356 value: 42,
357 }));
358 let mut rx = dumper.subscribe();
361
362 dumper.dump_error(1, "test error");
363
364 let snapshot = rx.recv().await.unwrap();
365 assert_eq!(snapshot.tick, 1);
366 assert!(snapshot.data.contains_key("_error"));
367 assert_eq!(snapshot.data["_error"], "test error");
368 }
369}