use serde::Serialize;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::broadcast;
use crate::util::epoch_millis;
/// A component whose state can be captured as JSON for per-tick debugging.
///
/// The `Send + Sync` supertraits allow implementors to be held as
/// `Arc<dyn Dumpable>` and shared across threads.
pub trait Dumpable: Send + Sync {
/// Returns the static identifier of this component; `TickDumper` uses it as
/// the key under which the component's payload is stored in a snapshot.
fn name(&self) -> &'static str;
/// Produces this component's JSON payload for `tick`.
///
/// Returning `None` causes `TickDumper` to leave the component out of the
/// snapshot it assembles.
fn snapshot(&self, tick: u64) -> Option<serde_json::Value>;
}
/// The combined debug output gathered for a single tick.
///
/// `Clone` is derived because values are sent over a `tokio::sync::broadcast`
/// channel, which requires a cloneable payload.
#[derive(Debug, Clone, Serialize)]
pub struct DebugSnapshot {
/// Tick number the snapshot was collected for.
pub tick: u64,
/// Value returned by `epoch_millis()` when the snapshot was built
/// (presumably milliseconds since the Unix epoch — confirm against `crate::util`).
pub timestamp_ms: u64,
/// Per-component JSON payloads keyed by `Dumpable::name`.
/// `TickDumper::dump_error` additionally stores the error text under `"_error"`.
pub data: HashMap<&'static str, serde_json::Value>,
}
/// Collects snapshots from registered [`Dumpable`] observers and publishes
/// them on a broadcast channel.
pub struct TickDumper {
/// Components queried, in registration order, whenever a snapshot is collected.
observers: Vec<Arc<dyn Dumpable>>,
/// Sending half of the broadcast channel that snapshots are published on.
tx: broadcast::Sender<DebugSnapshot>,
/// Gates `dump`; initialised to `false` by `new`.
enabled: AtomicBool,
/// Gates `dump_error`; initialised to `true` by `new`.
dump_on_error: AtomicBool,
}
impl TickDumper {
pub fn new(capacity: usize) -> Self {
let (tx, _) = broadcast::channel(capacity);
Self {
observers: Vec::new(),
tx,
enabled: AtomicBool::new(false),
dump_on_error: AtomicBool::new(true),
}
}
pub fn register(&mut self, obj: Arc<dyn Dumpable>) {
self.observers.push(obj);
}
pub fn register_all(&mut self, objs: impl IntoIterator<Item = Arc<dyn Dumpable>>) {
for obj in objs {
self.observers.push(obj);
}
}
pub fn enable(&self) {
self.enabled.store(true, Ordering::Relaxed);
}
pub fn disable(&self) {
self.enabled.store(false, Ordering::Relaxed);
}
pub fn is_enabled(&self) -> bool {
self.enabled.load(Ordering::Relaxed)
}
pub fn set_dump_on_error(&self, enabled: bool) {
self.dump_on_error.store(enabled, Ordering::Relaxed);
}
pub fn dump(&self, tick: u64) {
if !self.enabled.load(Ordering::Relaxed) {
return;
}
if let Some(snapshot) = self.collect_snapshot(tick) {
let _ = self.tx.send(snapshot);
}
}
pub fn dump_error(&self, tick: u64, error: &str) {
if !self.dump_on_error.load(Ordering::Relaxed) {
return;
}
let mut snapshot = self
.collect_snapshot(tick)
.unwrap_or_else(|| DebugSnapshot {
tick,
timestamp_ms: epoch_millis(),
data: HashMap::new(),
});
snapshot
.data
.insert("_error", serde_json::json!(error.to_string()));
let _ = self.tx.send(snapshot);
}
pub fn subscribe(&self) -> broadcast::Receiver<DebugSnapshot> {
self.tx.subscribe()
}
pub fn observer_count(&self) -> usize {
self.observers.len()
}
fn collect_snapshot(&self, tick: u64) -> Option<DebugSnapshot> {
let mut data = HashMap::new();
for obs in &self.observers {
if let Some(value) = obs.snapshot(tick) {
data.insert(obs.name(), value);
}
}
if data.is_empty() {
return None;
}
Some(DebugSnapshot {
tick,
timestamp_ms: epoch_millis(),
data,
})
}
}
impl Default for TickDumper {
fn default() -> Self {
Self::new(256)
}
}
/// Consumer that prints each [`DebugSnapshot`] it receives to stderr.
pub struct StderrDumpSubscriber {
/// Receiving half of the snapshot broadcast channel.
rx: broadcast::Receiver<DebugSnapshot>,
/// When `true`, snapshot data is printed as pretty-printed JSON; otherwise as compact JSON.
pretty: bool,
}
impl StderrDumpSubscriber {
pub fn new(rx: broadcast::Receiver<DebugSnapshot>, pretty: bool) -> Self {
Self { rx, pretty }
}
pub async fn run(mut self) {
while let Ok(snapshot) = self.rx.recv().await {
self.print_snapshot(&snapshot);
}
}
fn print_snapshot(&self, snapshot: &DebugSnapshot) {
eprintln!("=== Tick {} ===", snapshot.tick);
if self.pretty {
if let Ok(json) = serde_json::to_string_pretty(&snapshot.data) {
eprintln!("{}", json);
}
} else if let Ok(json) = serde_json::to_string(&snapshot.data) {
eprintln!("{}", json);
}
eprintln!();
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Observer that always reports a fixed `value` under a configurable name.
    struct MockDumpable {
        name: &'static str,
        value: i32,
    }

    impl Dumpable for MockDumpable {
        fn name(&self) -> &'static str {
            self.name
        }

        fn snapshot(&self, _tick: u64) -> Option<serde_json::Value> {
            Some(serde_json::json!({ "value": self.value }))
        }
    }

    /// Observer that never has anything to report.
    struct EmptyDumpable;

    impl Dumpable for EmptyDumpable {
        fn name(&self) -> &'static str {
            "empty"
        }

        fn snapshot(&self, _tick: u64) -> Option<serde_json::Value> {
            None
        }
    }

    /// Builds a capacity-16 dumper holding one `MockDumpable` named "test" with value 42.
    fn dumper_with_mock() -> TickDumper {
        let mut dumper = TickDumper::new(16);
        dumper.register(Arc::new(MockDumpable {
            name: "test",
            value: 42,
        }));
        dumper
    }

    #[test]
    fn test_tick_dumper_disabled_by_default() {
        let fresh = TickDumper::new(16);
        assert!(!fresh.is_enabled());
    }

    #[test]
    fn test_tick_dumper_enable_disable() {
        let dumper = TickDumper::new(16);

        dumper.enable();
        assert!(dumper.is_enabled());

        dumper.disable();
        assert!(!dumper.is_enabled());
    }

    #[test]
    fn test_tick_dumper_register() {
        let mut dumper = TickDumper::new(16);
        assert_eq!(dumper.observer_count(), 0);

        let observer = Arc::new(MockDumpable {
            name: "test",
            value: 42,
        });
        dumper.register(observer);
        assert_eq!(dumper.observer_count(), 1);
    }

    #[tokio::test]
    async fn test_tick_dumper_dump() {
        let dumper = dumper_with_mock();
        dumper.enable();
        let mut rx = dumper.subscribe();

        dumper.dump(1);

        let snapshot = rx.recv().await.unwrap();
        assert_eq!(snapshot.tick, 1);
        assert!(snapshot.data.contains_key("test"));
        assert_eq!(snapshot.data["test"]["value"], 42);
    }

    #[tokio::test]
    async fn test_tick_dumper_empty_snapshot_not_sent() {
        let mut dumper = TickDumper::new(16);
        dumper.register(Arc::new(EmptyDumpable));
        dumper.enable();
        let mut rx = dumper.subscribe();

        dumper.dump(1);

        // The timeout elapsing (an `Err`) means nothing was broadcast.
        let wait = std::time::Duration::from_millis(10);
        let outcome = tokio::time::timeout(wait, rx.recv()).await;
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_tick_dumper_dump_error() {
        // `enable` is intentionally not called: error dumps are on by default.
        let dumper = dumper_with_mock();
        let mut rx = dumper.subscribe();

        dumper.dump_error(1, "test error");

        let snapshot = rx.recv().await.unwrap();
        assert_eq!(snapshot.tick, 1);
        assert!(snapshot.data.contains_key("_error"));
        assert_eq!(snapshot.data["_error"], "test error");
    }
}