use std::sync::atomic::{AtomicU64, Ordering};
/// Memory ordering used for every counter update and read in this module.
///
/// `Relaxed` gives per-counter atomicity only: no ordering is established
/// *between* counters, so a reader may observe e.g. an execution counted
/// before its duration has been added to the sum.
const ORDER: Ordering = Ordering::Relaxed;

/// Process-wide, lock-free counters for the agent.
///
/// Each `record_*` method bumps one or two counters; gauge-style values are
/// not stored here but are supplied by the caller at render time.
#[derive(Default)]
pub struct Metrics {
    // Terminal results of code executions.
    exec_success: AtomicU64,
    exec_error: AtomicU64,
    exec_timeout: AtomicU64,
    // Running total of wall-clock milliseconds across all terminal results.
    exec_duration_ms_sum: AtomicU64,
    // Executions whose captured output hit the output cap.
    output_truncated: AtomicU64,
    // Requests turned away before doing any work, one counter per reason.
    rejected_concurrency: AtomicU64,
    rejected_payload: AtomicU64,
    rejected_unauthorized: AtomicU64,
    rejected_rate: AtomicU64,
    // Sessions created since startup.
    sessions_created: AtomicU64,
}
/// Point-in-time gauge readings supplied by the caller when rendering metrics.
///
/// Unlike the counters in `Metrics`, these values are not accumulated here;
/// they are passed in by reference to each render call.
pub struct Gauges {
    /// Currently active (non-expired) sessions.
    pub sessions_active: u64,
    /// All tracked sessions, including expired-but-not-cleaned ones.
    pub sessions_total: u64,
    /// Total on-disk footprint across active sessions, in bytes.
    pub sessions_disk_bytes: u64,
    /// Exec workers currently running.
    pub exec_in_flight: u64,
}
/// Per-session resource row, attached under the `sessions` key of the JSON
/// metrics output when the caller passes per-session data to `render_json`.
///
/// `#[derive(Serialize)]` emits fields in declaration order, so reordering
/// these fields changes the serialized key order.
#[derive(serde::Serialize)]
pub struct SessionResourceRow {
    /// Session identifier.
    pub id: String,
    /// On-disk footprint of this session, in bytes.
    pub disk_bytes: u64,
    /// Memory cap in pages, or `None` (serialized as `null`) when absent.
    /// NOTE(review): page unit presumably Wasm linear-memory pages — confirm with the producer.
    pub memory_cap_pages: Option<u32>,
}
impl Metrics {
pub fn new() -> Self {
Self::default()
}
pub fn record_exec_success(&self, dur_ms: u64) {
self.exec_success.fetch_add(1, ORDER);
self.exec_duration_ms_sum.fetch_add(dur_ms, ORDER);
}
pub fn record_exec_error(&self, dur_ms: u64) {
self.exec_error.fetch_add(1, ORDER);
self.exec_duration_ms_sum.fetch_add(dur_ms, ORDER);
}
pub fn record_exec_timeout(&self, dur_ms: u64) {
self.exec_timeout.fetch_add(1, ORDER);
self.exec_duration_ms_sum.fetch_add(dur_ms, ORDER);
}
pub fn record_output_truncated(&self) {
self.output_truncated.fetch_add(1, ORDER);
}
pub fn record_session_created(&self) {
self.sessions_created.fetch_add(1, ORDER);
}
pub fn record_rejected_concurrency(&self) {
self.rejected_concurrency.fetch_add(1, ORDER);
}
pub fn record_rejected_payload(&self) {
self.rejected_payload.fetch_add(1, ORDER);
}
pub fn record_rejected_unauthorized(&self) {
self.rejected_unauthorized.fetch_add(1, ORDER);
}
pub fn record_rejected_rate(&self) {
self.rejected_rate.fetch_add(1, ORDER);
}
fn snapshot(&self) -> Snapshot {
Snapshot {
exec_success: self.exec_success.load(ORDER),
exec_error: self.exec_error.load(ORDER),
exec_timeout: self.exec_timeout.load(ORDER),
exec_duration_ms_sum: self.exec_duration_ms_sum.load(ORDER),
output_truncated: self.output_truncated.load(ORDER),
sessions_created: self.sessions_created.load(ORDER),
rejected_concurrency: self.rejected_concurrency.load(ORDER),
rejected_payload: self.rejected_payload.load(ORDER),
rejected_unauthorized: self.rejected_unauthorized.load(ORDER),
rejected_rate: self.rejected_rate.load(ORDER),
}
}
pub fn render_prometheus(&self, g: &Gauges) -> String {
let s = self.snapshot();
let exec_count = s.exec_success + s.exec_error + s.exec_timeout;
let mut out = String::with_capacity(2048);
metric(
&mut out,
"wasmrun_agent_exec_total",
"Total code executions by terminal result.",
"counter",
&[
(&[("result", "success")], s.exec_success),
(&[("result", "error")], s.exec_error),
(&[("result", "timeout")], s.exec_timeout),
],
);
metric(
&mut out,
"wasmrun_agent_exec_duration_ms_sum",
"Sum of execution wall-clock durations in milliseconds.",
"counter",
&[(&[], s.exec_duration_ms_sum)],
);
metric(
&mut out,
"wasmrun_agent_exec_duration_ms_count",
"Count of executions contributing to the duration sum.",
"counter",
&[(&[], exec_count)],
);
metric(
&mut out,
"wasmrun_agent_output_truncated_total",
"Executions whose captured output hit the output cap.",
"counter",
&[(&[], s.output_truncated)],
);
metric(
&mut out,
"wasmrun_agent_sessions_created_total",
"Total sessions created since startup.",
"counter",
&[(&[], s.sessions_created)],
);
metric(
&mut out,
"wasmrun_agent_exec_rejected_total",
"Executions/requests rejected before doing work, by reason.",
"counter",
&[
(&[("reason", "concurrency")], s.rejected_concurrency),
(&[("reason", "payload")], s.rejected_payload),
(&[("reason", "unauthorized")], s.rejected_unauthorized),
(&[("reason", "rate")], s.rejected_rate),
],
);
metric(
&mut out,
"wasmrun_agent_sessions_active",
"Currently active (non-expired) sessions.",
"gauge",
&[(&[], g.sessions_active)],
);
metric(
&mut out,
"wasmrun_agent_sessions_total",
"Total sessions tracked, including expired-but-not-cleaned.",
"gauge",
&[(&[], g.sessions_total)],
);
metric(
&mut out,
"wasmrun_agent_exec_in_flight",
"Exec workers currently running.",
"gauge",
&[(&[], g.exec_in_flight)],
);
metric(
&mut out,
"wasmrun_agent_sessions_disk_bytes",
"Total on-disk footprint across active sessions in bytes.",
"gauge",
&[(&[], g.sessions_disk_bytes)],
);
out
}
pub fn render_json(
&self,
g: &Gauges,
per_session: Option<Vec<SessionResourceRow>>,
) -> serde_json::Value {
let s = self.snapshot();
let exec_count = s.exec_success + s.exec_error + s.exec_timeout;
let mut obj = serde_json::json!({
"exec_total": {
"success": s.exec_success,
"error": s.exec_error,
"timeout": s.exec_timeout,
},
"exec_duration_ms_sum": s.exec_duration_ms_sum,
"exec_duration_ms_count": exec_count,
"output_truncated_total": s.output_truncated,
"sessions_created_total": s.sessions_created,
"exec_rejected_total": {
"concurrency": s.rejected_concurrency,
"payload": s.rejected_payload,
"unauthorized": s.rejected_unauthorized,
"rate": s.rejected_rate,
},
"sessions_active": g.sessions_active,
"sessions_total": g.sessions_total,
"exec_in_flight": g.exec_in_flight,
"sessions_disk_bytes": g.sessions_disk_bytes,
});
if let Some(rows) = per_session {
obj["sessions"] = serde_json::to_value(rows).unwrap_or(serde_json::Value::Null);
}
obj
}
}
/// Plain-value copy of every `Metrics` counter, used by the renderers.
///
/// Fields are filled by independent relaxed loads, so under concurrent
/// updates they need not be mutually consistent.
struct Snapshot {
    // Terminal-result counts and their combined duration.
    exec_success: u64,
    exec_error: u64,
    exec_timeout: u64,
    exec_duration_ms_sum: u64,
    output_truncated: u64,
    // Pre-work rejections, by reason.
    rejected_concurrency: u64,
    rejected_payload: u64,
    rejected_unauthorized: u64,
    rejected_rate: u64,
    // Session lifecycle.
    sessions_created: u64,
}
/// Appends one metric family to `out` in the Prometheus text exposition format.
///
/// Writes a `# HELP` line, a `# TYPE` line, then one sample line per entry in
/// `samples`. Each sample is `(labels, value)`. An empty label slice renders the
/// bare metric name; otherwise labels render as `{k="v",...}` in the order given.
///
/// HELP text and label values are escaped as the format requires:
/// - `\` becomes `\\` and a line feed becomes `\n` in both;
/// - in label values, `"` also becomes `\"`.
///
/// This keeps arbitrary strings from breaking the line-oriented output.
/// `name`, `kind`, and label names are written verbatim, so callers must pass
/// valid identifiers.
fn metric(
    out: &mut String,
    name: &str,
    help: &str,
    kind: &str,
    samples: &[(&[(&str, &str)], u64)],
) {
    /// Appends `raw` to `out`, escaping `\` and line feed, plus `"` when `quote`.
    fn push_escaped(out: &mut String, raw: &str, quote: bool) {
        for ch in raw.chars() {
            match ch {
                '\\' => out.push_str("\\\\"),
                '\n' => out.push_str("\\n"),
                '"' if quote => out.push_str("\\\""),
                other => out.push(other),
            }
        }
    }

    out.push_str("# HELP ");
    out.push_str(name);
    out.push(' ');
    // HELP docstrings escape backslash and line feed, but not double quotes.
    push_escaped(out, help, false);
    out.push('\n');
    out.push_str("# TYPE ");
    out.push_str(name);
    out.push(' ');
    out.push_str(kind);
    out.push('\n');
    for (labels, value) in samples {
        out.push_str(name);
        if !labels.is_empty() {
            out.push('{');
            for (i, (k, v)) in labels.iter().enumerate() {
                if i > 0 {
                    out.push(',');
                }
                out.push_str(k);
                out.push_str("=\"");
                // Label values are quoted, so a raw `"` must be escaped too.
                push_escaped(out, v, true);
                out.push('"');
            }
            out.push('}');
        }
        out.push(' ');
        out.push_str(&value.to_string());
        out.push('\n');
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed gauge readings shared by every rendering test.
    fn gauges() -> Gauges {
        Gauges {
            sessions_active: 2,
            sessions_total: 3,
            exec_in_flight: 1,
            sessions_disk_bytes: 4096,
        }
    }

    #[test]
    fn counters_increment() {
        let metrics = Metrics::new();
        for dur_ms in [10, 20] {
            metrics.record_exec_success(dur_ms);
        }
        metrics.record_exec_error(5);
        metrics.record_exec_timeout(30);
        metrics.record_output_truncated();
        metrics.record_session_created();
        metrics.record_rejected_concurrency();
        metrics.record_rejected_payload();
        metrics.record_rejected_unauthorized();
        for _ in 0..2 {
            metrics.record_rejected_rate();
        }

        let snap = metrics.snapshot();
        let expectations: [(&str, u64, u64); 10] = [
            ("exec_success", snap.exec_success, 2),
            ("exec_error", snap.exec_error, 1),
            ("exec_timeout", snap.exec_timeout, 1),
            // 10 + 20 + 5 + 30: every terminal result contributes its duration.
            ("exec_duration_ms_sum", snap.exec_duration_ms_sum, 65),
            ("output_truncated", snap.output_truncated, 1),
            ("sessions_created", snap.sessions_created, 1),
            ("rejected_concurrency", snap.rejected_concurrency, 1),
            ("rejected_payload", snap.rejected_payload, 1),
            ("rejected_unauthorized", snap.rejected_unauthorized, 1),
            ("rejected_rate", snap.rejected_rate, 2),
        ];
        for &(field, actual, expected) in &expectations {
            assert_eq!(actual, expected, "snapshot.{field}");
        }

        let g = gauges();
        let prom = metrics.render_prometheus(&g);
        assert!(prom.contains("wasmrun_agent_exec_rejected_total{reason=\"rate\"} 2"));
        let json = metrics.render_json(&g, None);
        assert_eq!(json["exec_rejected_total"]["rate"], 2);
    }

    #[test]
    fn prometheus_render_is_well_formed() {
        let metrics = Metrics::new();
        metrics.record_exec_success(40);
        metrics.record_exec_error(10);
        let text = metrics.render_prometheus(&gauges());

        // Every family must carry both a HELP and a TYPE header.
        let help_count = text.matches("# HELP ").count();
        assert_eq!(help_count, text.matches("# TYPE ").count());
        assert!(help_count >= 10);

        let required = [
            "wasmrun_agent_exec_total{result=\"success\"} 1",
            "wasmrun_agent_exec_total{result=\"error\"} 1",
            "wasmrun_agent_exec_duration_ms_sum 50",
            "wasmrun_agent_exec_duration_ms_count 2",
            "wasmrun_agent_sessions_active 2",
            "wasmrun_agent_exec_in_flight 1",
            "wasmrun_agent_sessions_disk_bytes 4096",
        ];
        for sample in required {
            assert!(text.contains(sample), "missing sample: {sample}");
        }

        // Every non-comment, non-blank line must end in an integer value.
        let sample_lines = text
            .lines()
            .filter(|line| !line.is_empty() && !line.starts_with('#'));
        for line in sample_lines {
            let value = line.rsplit_once(' ').map_or(line, |(_, v)| v);
            assert!(value.parse::<u64>().is_ok(), "bad sample line: {line}");
        }
    }

    #[test]
    fn json_render_shape() {
        let metrics = Metrics::new();
        metrics.record_exec_success(40);
        metrics.record_exec_timeout(60);
        let doc = metrics.render_json(&gauges(), None);

        assert_eq!(doc["exec_total"]["success"], 1);
        assert_eq!(doc["exec_total"]["timeout"], 1);
        assert_eq!(doc["exec_duration_ms_sum"], 100);
        assert_eq!(doc["exec_duration_ms_count"], 2);
        assert_eq!(doc["sessions_active"], 2);
        assert_eq!(doc["sessions_disk_bytes"], 4096);
        // Without per-session rows the key is omitted entirely, not null.
        assert!(doc.get("sessions").is_none());
    }

    #[test]
    fn json_render_includes_per_session_when_present() {
        let row = SessionResourceRow {
            id: "abc".into(),
            disk_bytes: 1024,
            memory_cap_pages: Some(4096),
        };
        let doc = Metrics::new().render_json(&gauges(), Some(vec![row]));
        let first = &doc["sessions"][0];
        assert_eq!(first["id"], "abc");
        assert_eq!(first["disk_bytes"], 1024);
        assert_eq!(first["memory_cap_pages"], 4096);
    }
}