use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
/// Transfer metrics kept as independent atomic counters.
///
/// Every update and read uses `Ordering::Relaxed`, so each counter is individually consistent
/// but no ordering is established between different counters: a [`Metrics::render`] call that
/// races with updates may observe some counters already bumped and others not yet.
#[derive(Debug, Default)]
pub struct Metrics {
    /// Bumped by [`Metrics::inc_files_received`]; exported as `aerosync_files_received_total`.
    pub files_received_total: AtomicU64,
    /// Grown by [`Metrics::add_bytes_received`]; exported as `aerosync_bytes_received_total`.
    pub bytes_received_total: AtomicU64,
    /// Bumped by [`Metrics::inc_upload_errors`]; exported as `aerosync_upload_errors_total`.
    pub upload_errors_total: AtomicU64,
    /// Bumped by [`Metrics::inc_ws_connections`] and never decremented; exported as
    /// `aerosync_ws_connections_total`.
    pub ws_connections_total: AtomicU64,
    /// Gauge raised by `inc_ws_connections` and lowered (saturating at zero) by
    /// `dec_ws_connections`; exported as `aerosync_ws_active_connections`.
    active_ws_connections: AtomicU64,
    /// Gauge raised by `inc_active_transfers` and lowered (saturating at zero) by
    /// `dec_active_transfers`; exported as `aerosync_active_transfers`.
    active_transfers: AtomicU64,
    /// Gauge overwritten by `set_queue_depth`; exported as `aerosync_queue_depth`.
    queue_depth: AtomicU64,
    /// Bumped on every [`Metrics::inc_files_sent`] call; exported as `aerosync_files_sent_total`.
    pub files_sent_total: AtomicU64,
    /// Grown by [`Metrics::add_bytes_sent`]; exported as `aerosync_bytes_sent_total`.
    pub bytes_sent_total: AtomicU64,
    /// Bumped by [`Metrics::inc_send_errors`]; exported as `aerosync_send_errors_total`.
    pub send_errors_total: AtomicU64,
    /// Files sent with any protocol string other than `"quic"`; exported with
    /// `protocol="http"`.
    pub files_sent_http: AtomicU64,
    /// Files sent with protocol `"quic"`; exported with `protocol="quic"`.
    pub files_sent_quic: AtomicU64,
    /// Observed sizes with `bytes < 1024`.
    pub hist_lt_1kb: AtomicU64,
    /// Observed sizes with `1024 <= bytes < 64 * 1024`.
    pub hist_1kb_64kb: AtomicU64,
    /// Observed sizes with `64 * 1024 <= bytes < 1024 * 1024`.
    pub hist_64kb_1mb: AtomicU64,
    /// Observed sizes with `1024 * 1024 <= bytes < 100 * 1024 * 1024`.
    pub hist_1mb_100mb: AtomicU64,
    /// Observed sizes with `bytes >= 100 * 1024 * 1024`.
    pub hist_gt_100mb: AtomicU64,
}

impl Metrics {
    /// Creates a registry with every counter at zero, wrapped in an [`Arc`].
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Adds one to the received-files counter.
    pub fn inc_files_received(&self) {
        self.files_received_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `n` to the received-bytes counter.
    pub fn add_bytes_received(&self, n: u64) {
        self.bytes_received_total.fetch_add(n, Ordering::Relaxed);
    }

    /// Adds one to the upload-error counter.
    pub fn inc_upload_errors(&self) {
        self.upload_errors_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a new WebSocket connection: bumps both the lifetime total and the active gauge.
    pub fn inc_ws_connections(&self) {
        self.ws_connections_total.fetch_add(1, Ordering::Relaxed);
        self.active_ws_connections.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds one to the in-progress transfer gauge.
    pub fn inc_active_transfers(&self) {
        self.active_transfers.fetch_add(1, Ordering::Relaxed);
    }

    /// Subtracts one from the in-progress transfer gauge, stopping at zero instead of wrapping.
    pub fn dec_active_transfers(&self) {
        // The closure always returns `Some`, so `fetch_update` cannot fail; the result (the
        // previous value) is not needed.
        let _ = self
            .active_transfers
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
                Some(v.saturating_sub(1))
            });
    }

    /// Returns the current in-progress transfer gauge.
    pub fn active_transfers(&self) -> u64 {
        self.active_transfers.load(Ordering::Relaxed)
    }

    /// Overwrites the queue-depth gauge with `n`.
    pub fn set_queue_depth(&self, n: u64) {
        self.queue_depth.store(n, Ordering::Relaxed);
    }

    /// Returns the current queue-depth gauge.
    pub fn queue_depth(&self) -> u64 {
        self.queue_depth.load(Ordering::Relaxed)
    }

    /// Records one sent file.
    ///
    /// Always bumps the overall sent counter. The per-protocol counter is the QUIC one when
    /// `protocol == "quic"`; every other string (not only `"http"`) is counted as HTTP.
    pub fn inc_files_sent(&self, protocol: &str) {
        self.files_sent_total.fetch_add(1, Ordering::Relaxed);
        let per_protocol = match protocol {
            "quic" => &self.files_sent_quic,
            _ => &self.files_sent_http,
        };
        per_protocol.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `n` to the sent-bytes counter.
    pub fn add_bytes_sent(&self, n: u64) {
        self.bytes_sent_total.fetch_add(n, Ordering::Relaxed);
    }

    /// Adds one to the send-error counter.
    pub fn inc_send_errors(&self) {
        self.send_errors_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a file of `bytes` bytes in exactly one size bucket.
    ///
    /// Upper bounds are exclusive: a file of exactly 1024 bytes lands in the 1 KiB..64 KiB
    /// bucket, not in the "< 1 KiB" one.
    pub fn observe_file_size(&self, bytes: u64) {
        const KIB: u64 = 1_024;
        const MIB: u64 = 1_024 * KIB;

        let bucket = if bytes < KIB {
            &self.hist_lt_1kb
        } else if bytes < 64 * KIB {
            &self.hist_1kb_64kb
        } else if bytes < MIB {
            &self.hist_64kb_1mb
        } else if bytes < 100 * MIB {
            &self.hist_1mb_100mb
        } else {
            &self.hist_gt_100mb
        };
        bucket.fetch_add(1, Ordering::Relaxed);
    }

    /// Subtracts one from the active WebSocket gauge, stopping at zero instead of wrapping.
    pub fn dec_ws_connections(&self) {
        // The closure always returns `Some`, so `fetch_update` cannot fail.
        let _ = self
            .active_ws_connections
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
                Some(v.saturating_sub(1))
            });
    }

    /// Returns the current active WebSocket gauge.
    pub fn active_ws(&self) -> u64 {
        self.active_ws_connections.load(Ordering::Relaxed)
    }

    /// Renders all metrics as text: a `# HELP` line, a `# TYPE` line and one or more
    /// `name[{label="value"}] value` sample lines per metric, each terminated by `\n`.
    ///
    /// `free_bytes` and `total_bytes` are not tracked by this struct; they are emitted verbatim
    /// as `aerosync_disk_free_bytes` and `aerosync_disk_total_bytes`.
    ///
    /// Each counter is loaded separately, so the output is not an atomic snapshot.
    ///
    /// NOTE: `aerosync_file_size_bucket` carries `le` labels but its values are per-bucket
    /// counts (not cumulative) and it is typed `gauge`; combined with the exclusive bounds in
    /// [`Metrics::observe_file_size`], a size equal to a label's value is counted in the next
    /// bucket.
    pub fn render(&self, free_bytes: u64, total_bytes: u64) -> String {
        // Appends the `# HELP` and `# TYPE` lines for `name`.
        fn header(out: &mut String, name: &str, help: &str, kind: &str) {
            out.push_str("# HELP ");
            out.push_str(name);
            out.push(' ');
            out.push_str(help);
            out.push_str("\n# TYPE ");
            out.push_str(name);
            out.push(' ');
            out.push_str(kind);
            out.push('\n');
        }

        // Appends one sample line, optionally with a single `key="value"` label.
        fn sample(out: &mut String, name: &str, label: Option<(&str, &str)>, value: u64) {
            use std::fmt::Write;
            // Formatting straight into the buffer avoids a temporary `String` per line.
            // Writing to a `String` does not fail, so the `fmt::Result` is discarded.
            let _ = match label {
                Some((key, val)) => writeln!(out, "{}{{{}=\"{}\"}} {}", name, key, val, value),
                None => writeln!(out, "{} {}", name, value),
            };
        }

        // Appends header plus a single unlabelled sample.
        fn scalar(out: &mut String, name: &str, help: &str, kind: &str, value: u64) {
            header(out, name, help, kind);
            sample(out, name, None, value);
        }

        const SENT_BY_PROTOCOL: &str = "aerosync_files_sent_by_protocol";
        const SIZE_BUCKET: &str = "aerosync_file_size_bucket";

        let load = |counter: &AtomicU64| counter.load(Ordering::Relaxed);

        // The text is roughly 2.2 KiB with all-zero values and stays below 4 KiB even when all
        // 19 samples print 20-digit numbers, so this capacity avoids any reallocation.
        let mut out = String::with_capacity(4096);

        scalar(
            &mut out,
            "aerosync_files_received_total",
            "Total number of successfully received files",
            "counter",
            load(&self.files_received_total),
        );
        scalar(
            &mut out,
            "aerosync_bytes_received_total",
            "Total bytes received",
            "counter",
            load(&self.bytes_received_total),
        );
        scalar(
            &mut out,
            "aerosync_upload_errors_total",
            "Total number of upload errors",
            "counter",
            load(&self.upload_errors_total),
        );
        scalar(
            &mut out,
            "aerosync_ws_connections_total",
            "Total WebSocket connections accepted",
            "counter",
            load(&self.ws_connections_total),
        );
        scalar(
            &mut out,
            "aerosync_ws_active_connections",
            "Current active WebSocket connections",
            "gauge",
            self.active_ws(),
        );
        scalar(
            &mut out,
            "aerosync_disk_free_bytes",
            "Free disk space in bytes at the receive directory",
            "gauge",
            free_bytes,
        );
        scalar(
            &mut out,
            "aerosync_disk_total_bytes",
            "Total disk capacity in bytes at the receive directory",
            "gauge",
            total_bytes,
        );
        scalar(
            &mut out,
            "aerosync_active_transfers",
            "Current number of in-progress file transfers",
            "gauge",
            self.active_transfers(),
        );
        scalar(
            &mut out,
            "aerosync_queue_depth",
            "Number of transfers waiting in queue",
            "gauge",
            self.queue_depth(),
        );
        scalar(
            &mut out,
            "aerosync_files_sent_total",
            "Total files successfully sent",
            "counter",
            load(&self.files_sent_total),
        );
        scalar(
            &mut out,
            "aerosync_bytes_sent_total",
            "Total bytes successfully sent",
            "counter",
            load(&self.bytes_sent_total),
        );
        scalar(
            &mut out,
            "aerosync_send_errors_total",
            "Total send-side errors",
            "counter",
            load(&self.send_errors_total),
        );

        header(
            &mut out,
            SENT_BY_PROTOCOL,
            "Total files sent per protocol",
            "counter",
        );
        sample(
            &mut out,
            SENT_BY_PROTOCOL,
            Some(("protocol", "http")),
            load(&self.files_sent_http),
        );
        sample(
            &mut out,
            SENT_BY_PROTOCOL,
            Some(("protocol", "quic")),
            load(&self.files_sent_quic),
        );

        header(
            &mut out,
            SIZE_BUCKET,
            "File size distribution (count per bucket)",
            "gauge",
        );
        let size_buckets: [(&str, &AtomicU64); 5] = [
            ("1024", &self.hist_lt_1kb),
            ("65536", &self.hist_1kb_64kb),
            ("1048576", &self.hist_64kb_1mb),
            ("104857600", &self.hist_1mb_100mb),
            ("+Inf", &self.hist_gt_100mb),
        ];
        for &(upper, counter) in size_buckets.iter() {
            sample(&mut out, SIZE_BUCKET, Some(("le", upper)), load(counter));
        }

        out
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh registry renders zeroed counters, the supplied disk gauges, and one `# TYPE`
    /// line for every `# HELP` line.
    #[test]
    fn test_render_format() {
        let metrics = Metrics::new();
        let text = metrics.render(1024, 4096);

        let expected_samples = [
            "aerosync_files_received_total 0",
            "aerosync_bytes_received_total 0",
            "aerosync_upload_errors_total 0",
            "aerosync_disk_free_bytes 1024",
            "aerosync_disk_total_bytes 4096",
        ];
        for needle in expected_samples.iter() {
            assert!(text.contains(*needle));
        }

        let helps = text.matches("# HELP").count();
        let types = text.matches("# TYPE").count();
        assert_eq!(helps, types, "HELP and TYPE counts must match");
        assert!(helps >= 5);
    }

    /// The receive-side helpers accumulate into their public atomics.
    #[test]
    fn test_counters() {
        let metrics = Metrics::new();

        for _ in 0..2 {
            metrics.inc_files_received();
        }
        metrics.add_bytes_received(512);
        metrics.inc_upload_errors();

        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        assert_eq!(read(&metrics.files_received_total), 2);
        assert_eq!(read(&metrics.bytes_received_total), 512);
        assert_eq!(read(&metrics.upload_errors_total), 1);
    }

    /// Counter updates made before `render` show up in its output.
    #[test]
    fn test_render_reflects_counters() {
        let metrics = Metrics::new();
        metrics.inc_files_received();
        metrics.add_bytes_received(1000);

        let text = metrics.render(0, 0);

        assert!(text.contains("aerosync_files_received_total 1"));
        assert!(text.contains("aerosync_bytes_received_total 1000"));
    }

    /// The disk gauges echo the arguments passed to `render`.
    #[test]
    fn test_render_disk_gauges() {
        let free = 10_000_000_000u64;
        let total = 100_000_000_000u64;

        let text = Metrics::new().render(free, total);

        let free_line = format!("aerosync_disk_free_bytes {}", free);
        let total_line = format!("aerosync_disk_total_bytes {}", total);
        assert!(text.contains(&free_line));
        assert!(text.contains(&total_line));
    }

    /// Opening connections raises both the total and the active gauge; closing one lowers
    /// only the active gauge.
    #[test]
    fn test_ws_connection_tracking() {
        let metrics = Metrics::new();
        let lifetime_total = || metrics.ws_connections_total.load(Ordering::Relaxed);

        metrics.inc_ws_connections();
        metrics.inc_ws_connections();
        assert_eq!(metrics.active_ws(), 2);
        assert_eq!(lifetime_total(), 2);

        metrics.dec_ws_connections();
        assert_eq!(metrics.active_ws(), 1);
        assert_eq!(lifetime_total(), 2);
    }

    /// Decrementing an already-zero active gauge leaves it at zero.
    #[test]
    fn test_ws_dec_saturates() {
        let metrics = Metrics::new();
        metrics.dec_ws_connections();
        assert_eq!(metrics.active_ws(), 0);
    }
}