use std::{
sync::{
OnceLock,
atomic::{AtomicU64, Ordering},
},
thread,
time::Duration,
};
use dashmap::DashMap;
use myko::wire::MykoMessage;
const WINDOW_MS: u64 = 250;
fn in_counts() -> &'static DashMap<&'static str, AtomicU64> {
static C: OnceLock<DashMap<&'static str, AtomicU64>> = OnceLock::new();
C.get_or_init(DashMap::new)
}
fn out_counts() -> &'static DashMap<&'static str, AtomicU64> {
static C: OnceLock<DashMap<&'static str, AtomicU64>> = OnceLock::new();
C.get_or_init(DashMap::new)
}
pub fn record_inbound(kind: &'static str) {
in_counts()
.entry(kind)
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
}
pub fn record_outbound(kind: &'static str) {
out_counts()
.entry(kind)
.or_insert_with(|| AtomicU64::new(0))
.fetch_add(1, Ordering::Relaxed);
}
pub fn message_kind(msg: &MykoMessage) -> &'static str {
match msg {
MykoMessage::Query(_) => "Query",
MykoMessage::QueryResponse(_) => "QueryResponse",
MykoMessage::QueryCancel(_) => "QueryCancel",
MykoMessage::QueryWindow(_) => "QueryWindow",
MykoMessage::QueryError(_) => "QueryError",
MykoMessage::View(_) => "View",
MykoMessage::ViewResponse(_) => "ViewResponse",
MykoMessage::ViewCancel(_) => "ViewCancel",
MykoMessage::ViewWindow(_) => "ViewWindow",
MykoMessage::ViewError(_) => "ViewError",
MykoMessage::Report(_) => "Report",
MykoMessage::ReportResponse(_) => "ReportResponse",
MykoMessage::ReportCancel(_) => "ReportCancel",
MykoMessage::ReportError(_) => "ReportError",
MykoMessage::Event(_) => "Event",
MykoMessage::EventBatch(_) => "EventBatch",
MykoMessage::Command(_) => "Command",
MykoMessage::CommandResponse(_) => "CommandResponse",
MykoMessage::CommandError(_) => "CommandError",
MykoMessage::Ping(_) => "Ping",
MykoMessage::Benchmark(_) => "Benchmark",
}
}
pub fn start_periodic_logger() {
static STARTED: OnceLock<()> = OnceLock::new();
if STARTED.set(()).is_err() {
return;
}
let _ = thread::Builder::new()
.name("myko-ws-timing".to_string())
.spawn(run_logger_loop)
.map_err(|e| {
log::warn!(
target: "myko_server::ws_timing",
"Failed to spawn ws_timing thread: {}", e
)
});
}
fn run_logger_loop() {
loop {
thread::sleep(Duration::from_millis(WINDOW_MS));
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(emit_window));
}
}
fn emit_window() {
let in_snap = drain_counts(in_counts());
let out_snap = drain_counts(out_counts());
if in_snap.is_empty() && out_snap.is_empty() {
return;
}
let in_total: u64 = in_snap.iter().map(|(_, n)| *n).sum();
let out_total: u64 = out_snap.iter().map(|(_, n)| *n).sum();
log::info!(
target: "myko_server::ws_timing",
"[ws_timing window={}ms] in={} [{}] out={} [{}]",
WINDOW_MS,
in_total,
format_kinds(&in_snap),
out_total,
format_kinds(&out_snap),
);
}
fn drain_counts(counts: &DashMap<&'static str, AtomicU64>) -> Vec<(&'static str, u64)> {
let mut out: Vec<(&'static str, u64)> = counts
.iter()
.filter_map(|e| {
let n = e.value().swap(0, Ordering::Relaxed);
if n == 0 { None } else { Some((*e.key(), n)) }
})
.collect();
out.sort_by_key(|b| std::cmp::Reverse(b.1));
out
}
fn format_kinds(snap: &[(&'static str, u64)]) -> String {
snap.iter()
.map(|(k, n)| format!("{}={}", k, n))
.collect::<Vec<_>>()
.join(", ")
}