use serde_json::{json, Value};
use crate::hub::HubStatsSnapshot;
/// Renders a hub statistics snapshot in the Prometheus text exposition format.
///
/// Every metric is written as a `# HELP` line, a `# TYPE` line and a sample
/// line, in a fixed order. In addition to the raw snapshot fields, two derived
/// gauges are reported:
///
/// * `bext_hub_delivery_ratio` — delivered events divided by published events.
/// * `bext_hub_messages_per_second` — published events divided by uptime.
///
/// Each derived gauge is reported as zero when its denominator is not greater
/// than zero. The returned text always ends with a newline.
pub fn format_prometheus(stats: &HubStatsSnapshot) -> String {
    // Derived values are computed up front; the guards avoid dividing by zero.
    let delivery_ratio = if stats.total_published > 0 {
        stats.total_delivered as f64 / stats.total_published as f64
    } else {
        0.0
    };
    let msgs_per_sec = if stats.uptime_secs > 0.0 {
        stats.total_published as f64 / stats.uptime_secs
    } else {
        0.0
    };

    // One row per metric: (name, help text, metric type, rendered sample value).
    let metrics: [(&str, &str, &str, String); 8] = [
        (
            "bext_hub_active_connections",
            "Number of active subscriber connections",
            "gauge",
            format!("{}", stats.active_connections),
        ),
        (
            "bext_hub_total_published",
            "Total events published since startup",
            "counter",
            format!("{}", stats.total_published),
        ),
        (
            "bext_hub_total_delivered",
            "Total events delivered to subscribers",
            "counter",
            format!("{}", stats.total_delivered),
        ),
        (
            "bext_hub_topic_count",
            "Number of distinct topic patterns with subscribers",
            "gauge",
            format!("{}", stats.topic_count),
        ),
        (
            "bext_hub_subscriber_count",
            "Number of registered subscribers",
            "gauge",
            format!("{}", stats.subscriber_count),
        ),
        (
            "bext_hub_uptime_seconds",
            "Time since hub was created",
            "gauge",
            format!("{:.3}", stats.uptime_secs),
        ),
        (
            "bext_hub_delivery_ratio",
            "Ratio of delivered to published events",
            "gauge",
            format!("{:.4}", delivery_ratio),
        ),
        (
            "bext_hub_messages_per_second",
            "Average message throughput",
            "gauge",
            format!("{:.4}", msgs_per_sec),
        ),
    ];

    let mut out = String::new();
    for (name, help, kind, value) in metrics.iter() {
        out.push_str(&format!("# HELP {} {}\n", name, help));
        out.push_str(&format!("# TYPE {} {}\n", name, kind));
        out.push_str(&format!("{} {}\n", name, value));
    }
    out
}
/// Builds a JSON object describing a hub statistics snapshot.
///
/// The object carries the raw snapshot fields plus two derived values,
/// `delivery_ratio` (delivered / published) and `messages_per_second`
/// (published / uptime). Each derived value is zero when its denominator is
/// not greater than zero.
pub fn format_json(stats: &HubStatsSnapshot) -> Value {
    let published = stats.total_published as f64;
    let has_published = stats.total_published > 0;
    let has_uptime = stats.uptime_secs > 0.0;

    // Guarded divisions: fall back to zero instead of dividing by zero.
    let delivery_ratio = if has_published {
        stats.total_delivered as f64 / published
    } else {
        0.0
    };
    let msgs_per_sec = if has_uptime {
        published / stats.uptime_secs
    } else {
        0.0
    };

    json!({
        "active_connections": stats.active_connections,
        "total_published": stats.total_published,
        "total_delivered": stats.total_delivered,
        "topic_count": stats.topic_count,
        "subscriber_count": stats.subscriber_count,
        "uptime_secs": stats.uptime_secs,
        "delivery_ratio": delivery_ratio,
        "messages_per_second": msgs_per_sec,
    })
}
/// Per-topic message counters.
///
/// Counts are kept in a `DashMap`, so every method works through a shared
/// reference (`&self`).
#[derive(Debug, Default)]
pub struct TopicStats {
    /// Number of recorded messages, keyed by topic name.
    counts: dashmap::DashMap<String, u64>,
}

impl TopicStats {
    /// Creates an empty set of counters.
    pub fn new() -> Self {
        Self {
            counts: dashmap::DashMap::new(),
        }
    }

    /// Increments the counter for `topic`, starting it at 1 on first use.
    pub fn record(&self, topic: &str) {
        self.counts
            .entry(topic.to_string())
            .and_modify(|c| *c += 1)
            .or_insert(1);
    }

    /// Returns the number of messages recorded for `topic`, or 0 if the topic
    /// has never been recorded.
    pub fn get(&self, topic: &str) -> u64 {
        self.counts.get(topic).map(|v| *v).unwrap_or(0)
    }

    /// Returns a copy of all counters, busiest topic first.
    ///
    /// Topics with equal counts are ordered by name so that the result (and
    /// everything rendered from it) is deterministic rather than following
    /// the map's iteration order.
    pub fn snapshot(&self) -> Vec<(String, u64)> {
        let mut entries: Vec<(String, u64)> = self
            .counts
            .iter()
            .map(|e| (e.key().clone(), *e.value()))
            .collect();
        entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        entries
    }

    /// Escapes a string for use as a Prometheus label value.
    ///
    /// The text exposition format requires backslash, double quote and line
    /// feed to be written as `\\`, `\"` and `\n`. The backslash must be
    /// handled first so the escapes added afterwards are not escaped again.
    fn escape_label_value(raw: &str) -> String {
        raw.replace('\\', "\\\\")
            .replace('"', "\\\"")
            .replace('\n', "\\n")
    }

    /// Renders the counters as a Prometheus `bext_hub_topic_messages` counter
    /// family with one sample per topic, labelled `topic="<name>"`.
    ///
    /// Returns an empty string when nothing has been recorded, so no
    /// `# HELP` / `# TYPE` header is emitted without samples.
    pub fn format_prometheus(&self) -> String {
        let snapshot = self.snapshot();
        if snapshot.is_empty() {
            return String::new();
        }
        let mut lines = Vec::with_capacity(snapshot.len() + 2);
        lines.push("# HELP bext_hub_topic_messages Total messages per topic".to_string());
        lines.push("# TYPE bext_hub_topic_messages counter".to_string());
        for (topic, count) in &snapshot {
            lines.push(format!(
                "bext_hub_topic_messages{{topic=\"{}\"}} {}",
                Self::escape_label_value(topic),
                count
            ));
        }
        lines.join("\n") + "\n"
    }

    /// Renders the counters as a JSON object mapping topic name to count.
    pub fn format_json(&self) -> Value {
        let obj: serde_json::Map<String, Value> = self
            .snapshot()
            .into_iter()
            .map(|(k, v)| (k, Value::Number(v.into())))
            .collect();
        Value::Object(obj)
    }

    /// Removes every counter.
    pub fn reset(&self) {
        self.counts.clear();
    }

    /// Returns the number of distinct topics that have been recorded.
    pub fn len(&self) -> usize {
        self.counts.len()
    }

    /// Returns `true` when no topic has been recorded.
    pub fn is_empty(&self) -> bool {
        self.counts.is_empty()
    }
}
/// Cumulative histogram of durations, in seconds, over fixed bucket boundaries.
///
/// Each bucket counts the observations that are less than or equal to its
/// upper bound, which is the layout the Prometheus histogram type expects.
#[derive(Debug)]
pub struct DurationHistogram {
    /// Finite bucket upper bounds, ascending and without duplicates.
    buckets: Vec<f64>,
    /// Cumulative observation counts; `counts[i]` belongs to `buckets[i]`.
    counts: Vec<std::sync::atomic::AtomicU64>,
    /// Total number of observations (the implicit `+Inf` bucket).
    total_count: std::sync::atomic::AtomicU64,
    /// Sum of all observed durations.
    total_sum: parking_lot::Mutex<f64>,
}

impl DurationHistogram {
    /// Creates a histogram with default boundaries from 1 second to 1 hour.
    pub fn new() -> Self {
        Self::with_buckets(vec![
            1.0, 5.0, 10.0, 30.0, 60.0, 300.0, 600.0, 1800.0, 3600.0,
        ])
    }

    /// Creates a histogram with the given bucket upper bounds.
    ///
    /// The boundaries are normalized before use: non-finite values are
    /// dropped (the `+Inf` bucket is always emitted implicitly and NaN cannot
    /// be a boundary), the rest are sorted ascending and duplicates removed.
    /// This keeps the rendered bucket lines ordered and unique.
    pub fn with_buckets(mut buckets: Vec<f64>) -> Self {
        buckets.retain(|b| b.is_finite());
        // Only finite values remain, so `partial_cmp` never actually fails.
        buckets.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
        buckets.dedup();

        let counts = buckets
            .iter()
            .map(|_| std::sync::atomic::AtomicU64::new(0))
            .collect();
        Self {
            buckets,
            counts,
            total_count: std::sync::atomic::AtomicU64::new(0),
            total_sum: parking_lot::Mutex::new(0.0),
        }
    }

    /// Records one observation of `duration_secs`.
    ///
    /// NaN observations are ignored: they fit no bucket and would turn the
    /// running sum into NaN for the rest of the histogram's lifetime.
    pub fn observe(&self, duration_secs: f64) {
        if duration_secs.is_nan() {
            return;
        }
        self.total_count
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        // The lock guard is a temporary and is released at the end of this statement.
        *self.total_sum.lock() += duration_secs;
        // Buckets are cumulative: bump every bucket whose bound covers the value.
        for (boundary, count) in self.buckets.iter().zip(&self.counts) {
            if duration_secs <= *boundary {
                count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            }
        }
    }

    /// Renders the histogram in the Prometheus text exposition format under
    /// the metric family `name`.
    ///
    /// Emits one `<name>_bucket{le="…"}` line per boundary, the `+Inf`
    /// bucket, then `<name>_count` and `<name>_sum`. The returned text ends
    /// with a newline.
    pub fn format_prometheus(&self, name: &str) -> String {
        let mut lines = Vec::with_capacity(self.buckets.len() + 5);
        lines.push(format!("# HELP {} Connection duration histogram", name));
        lines.push(format!("# TYPE {} histogram", name));
        for (boundary, count) in self.buckets.iter().zip(&self.counts) {
            let count = count.load(std::sync::atomic::Ordering::Relaxed);
            // `{:?}` prints the shortest representation that round-trips and
            // always keeps a fractional part ("1.0", "0.25"). A fixed `{:.1}`
            // would round 0.25 and collapse 0.01 and 0.05 into the same label.
            lines.push(format!(
                "{}_bucket{{le=\"{:?}\"}} {}",
                name, boundary, count
            ));
        }
        let total = self.total_count.load(std::sync::atomic::Ordering::Relaxed);
        lines.push(format!("{}_bucket{{le=\"+Inf\"}} {}", name, total));
        lines.push(format!("{}_count {}", name, total));
        let sum = *self.total_sum.lock();
        lines.push(format!("{}_sum {:.3}", name, sum));
        lines.join("\n") + "\n"
    }

    /// Returns the total number of recorded observations.
    pub fn count(&self) -> u64 {
        self.total_count.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Returns the sum of all recorded observations.
    pub fn sum(&self) -> f64 {
        *self.total_sum.lock()
    }
}

impl Default for DurationHistogram {
    /// Equivalent to [`DurationHistogram::new`].
    fn default() -> Self {
        Self::new()
    }
}
// Unit tests for the hub snapshot formatters, `TopicStats` and
// `DurationHistogram`.
#[cfg(test)]
mod tests {
use super::*;
use crate::hub::HubStatsSnapshot;
// Shared fixture: 1000 published and 4500 delivered events (ratio 4.5)
// with 3600.5 seconds of uptime.
fn sample_stats() -> HubStatsSnapshot {
HubStatsSnapshot {
active_connections: 42,
total_published: 1000,
total_delivered: 4500,
topic_count: 15,
subscriber_count: 42,
uptime_secs: 3600.5,
}
}
// Every raw snapshot field shows up as a sample line; uptime is rendered
// with three decimals.
#[test]
fn prometheus_contains_all_metrics() {
let output = format_prometheus(&sample_stats());
assert!(output.contains("bext_hub_active_connections 42"));
assert!(output.contains("bext_hub_total_published 1000"));
assert!(output.contains("bext_hub_total_delivered 4500"));
assert!(output.contains("bext_hub_topic_count 15"));
assert!(output.contains("bext_hub_subscriber_count 42"));
assert!(output.contains("bext_hub_uptime_seconds 3600.500"));
}
// Connection count is a gauge; published/delivered totals are counters.
#[test]
fn prometheus_contains_type_declarations() {
let output = format_prometheus(&sample_stats());
assert!(output.contains("# TYPE bext_hub_active_connections gauge"));
assert!(output.contains("# TYPE bext_hub_total_published counter"));
assert!(output.contains("# TYPE bext_hub_total_delivered counter"));
}
// HELP lines are present for the metrics.
#[test]
fn prometheus_contains_help_text() {
let output = format_prometheus(&sample_stats());
assert!(output.contains("# HELP bext_hub_active_connections"));
assert!(output.contains("# HELP bext_hub_total_published"));
}
// 4500 delivered / 1000 published, rendered with four decimals.
#[test]
fn prometheus_delivery_ratio() {
let output = format_prometheus(&sample_stats());
assert!(output.contains("bext_hub_delivery_ratio 4.5000"));
}
// With nothing published the ratio is reported as zero instead of
// dividing by zero.
#[test]
fn prometheus_delivery_ratio_zero_published() {
let stats = HubStatsSnapshot {
total_published: 0,
total_delivered: 0,
..sample_stats()
};
let output = format_prometheus(&stats);
assert!(output.contains("bext_hub_delivery_ratio 0.0000"));
}
// Only checks that the throughput metric is present, not its value.
#[test]
fn prometheus_messages_per_second() {
let output = format_prometheus(&sample_stats());
assert!(output.contains("bext_hub_messages_per_second"));
}
// The exposition text must end with a trailing newline.
#[test]
fn prometheus_ends_with_newline() {
let output = format_prometheus(&sample_stats());
assert!(output.ends_with('\n'));
}
// The JSON object carries the raw snapshot fields unchanged.
#[test]
fn json_contains_all_fields() {
let output = format_json(&sample_stats());
assert_eq!(output["active_connections"], 42);
assert_eq!(output["total_published"], 1000);
assert_eq!(output["total_delivered"], 4500);
assert_eq!(output["topic_count"], 15);
assert_eq!(output["subscriber_count"], 42);
}
// Derived ratio in JSON: 4500 / 1000 = 4.5 (compared with a tolerance).
#[test]
fn json_delivery_ratio() {
let output = format_json(&sample_stats());
let ratio = output["delivery_ratio"].as_f64().unwrap();
assert!((ratio - 4.5).abs() < 0.001);
}
// Throughput is positive when both published count and uptime are positive.
#[test]
fn json_messages_per_second() {
let output = format_json(&sample_stats());
let mps = output["messages_per_second"].as_f64().unwrap();
assert!(mps > 0.0);
}
// Zero uptime yields a throughput of exactly zero rather than a division
// by zero.
#[test]
fn json_zero_uptime() {
let stats = HubStatsSnapshot {
uptime_secs: 0.0,
..sample_stats()
};
let output = format_json(&stats);
let mps = output["messages_per_second"].as_f64().unwrap();
assert_eq!(mps, 0.0);
}
// Counters accumulate per topic; unknown topics read as zero.
#[test]
fn topic_stats_record_and_get() {
let ts = TopicStats::new();
ts.record("app/events");
ts.record("app/events");
ts.record("system/deploy");
assert_eq!(ts.get("app/events"), 2);
assert_eq!(ts.get("system/deploy"), 1);
assert_eq!(ts.get("nonexistent"), 0);
}
// Snapshot entries are ordered by count, highest first.
#[test]
fn topic_stats_snapshot_sorted() {
let ts = TopicStats::new();
ts.record("a");
ts.record("b");
ts.record("b");
ts.record("c");
ts.record("c");
ts.record("c");
let snap = ts.snapshot();
assert_eq!(snap[0], ("c".to_string(), 3));
assert_eq!(snap[1], ("b".to_string(), 2));
assert_eq!(snap[2], ("a".to_string(), 1));
}
// `len` / `is_empty` track the number of distinct topics.
#[test]
fn topic_stats_len_and_empty() {
let ts = TopicStats::new();
assert!(ts.is_empty());
assert_eq!(ts.len(), 0);
ts.record("a");
assert!(!ts.is_empty());
assert_eq!(ts.len(), 1);
}
// `reset` drops every counter, so reads fall back to zero.
#[test]
fn topic_stats_reset() {
let ts = TopicStats::new();
ts.record("a");
ts.record("b");
ts.reset();
assert!(ts.is_empty());
assert_eq!(ts.get("a"), 0);
}
// Each topic becomes a labelled sample of the counter family.
#[test]
fn topic_stats_prometheus_format() {
let ts = TopicStats::new();
ts.record("app/events");
ts.record("app/events");
let output = ts.format_prometheus();
assert!(output.contains("bext_hub_topic_messages{topic=\"app/events\"} 2"));
assert!(output.contains("# TYPE bext_hub_topic_messages counter"));
}
// With no recorded topics nothing is emitted, not even the header lines.
#[test]
fn topic_stats_prometheus_empty() {
let ts = TopicStats::new();
let output = ts.format_prometheus();
assert!(output.is_empty());
}
// JSON output maps topic name to its count.
#[test]
fn topic_stats_json_format() {
let ts = TopicStats::new();
ts.record("a");
ts.record("b");
ts.record("b");
let json = ts.format_json();
assert_eq!(json["a"], 1);
assert_eq!(json["b"], 2);
}
// Each observation increments the total count.
#[test]
fn histogram_observe_updates_count() {
let h = DurationHistogram::new();
h.observe(5.0);
h.observe(10.0);
assert_eq!(h.count(), 2);
}
// The running sum adds up the observed values (compared with a tolerance).
#[test]
fn histogram_observe_updates_sum() {
let h = DurationHistogram::new();
h.observe(5.0);
h.observe(10.5);
assert!((h.sum() - 15.5).abs() < 0.001);
}
// Buckets are cumulative: each one counts all observations at or below
// its boundary, and `+Inf` equals the total count.
#[test]
fn histogram_buckets_cumulative() {
let h = DurationHistogram::with_buckets(vec![1.0, 5.0, 10.0]);
h.observe(0.5); h.observe(3.0); h.observe(7.0);
let output = h.format_prometheus("test_hist");
assert!(output.contains("test_hist_bucket{le=\"1.0\"} 1"));
assert!(output.contains("test_hist_bucket{le=\"5.0\"} 2"));
assert!(output.contains("test_hist_bucket{le=\"10.0\"} 3"));
assert!(output.contains("test_hist_bucket{le=\"+Inf\"} 3"));
assert!(output.contains("test_hist_count 3"));
}
// The TYPE header, `_count` and `_sum` (three decimals) lines are emitted
// under the caller-supplied metric name.
#[test]
fn histogram_prometheus_format() {
let h = DurationHistogram::new();
h.observe(30.0);
let output = h.format_prometheus("bext_connection_duration");
assert!(output.contains("# TYPE bext_connection_duration histogram"));
assert!(output.contains("bext_connection_duration_count 1"));
assert!(output.contains("bext_connection_duration_sum 30.000"));
}
// A fresh histogram has no observations and a zero sum.
#[test]
fn histogram_empty() {
let h = DurationHistogram::new();
assert_eq!(h.count(), 0);
assert!((h.sum()).abs() < 0.001);
}
// A value larger than every boundary lands only in the `+Inf` bucket.
#[test]
fn histogram_value_exceeds_all_buckets() {
let h = DurationHistogram::with_buckets(vec![1.0, 5.0]);
h.observe(100.0);
let output = h.format_prometheus("test");
assert!(output.contains("test_bucket{le=\"1.0\"} 0"));
assert!(output.contains("test_bucket{le=\"5.0\"} 0"));
assert!(output.contains("test_bucket{le=\"+Inf\"} 1"));
}
}