use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
#[derive(Clone)]
pub struct CounterVec {
inner: Arc<RwLock<CounterVecInner>>,
name: Arc<String>,
help: Arc<String>,
}
struct CounterVecInner {
counters: HashMap<Vec<String>, Arc<AtomicU64>>,
}
impl CounterVec {
pub fn new(name: &str, help: &str) -> Self {
Self {
inner: Arc::new(RwLock::new(CounterVecInner {
counters: HashMap::new(),
})),
name: Arc::new(name.to_string()),
help: Arc::new(help.to_string()),
}
}
pub fn inc(&self, labels: &[&str]) {
let label_key: Vec<String> = labels.iter().map(|s| s.to_string()).collect();
if let Ok(map) = self.inner.read()
&& let Some(counter) = map.counters.get(&label_key)
{
counter.fetch_add(1, Ordering::Relaxed);
return;
}
if let Ok(mut map) = self.inner.write() {
let counter = map
.counters
.entry(label_key)
.or_insert_with(|| Arc::new(AtomicU64::new(0)));
counter.fetch_add(1, Ordering::Relaxed);
}
}
pub fn gather(&self) -> Vec<(Vec<String>, u64)> {
let mut result = Vec::new();
if let Ok(map) = self.inner.read() {
for (labels, counter) in &map.counters {
result.push((labels.clone(), counter.load(Ordering::Relaxed)));
}
}
result
}
pub fn name(&self) -> &str {
&self.name
}
pub fn help(&self) -> &str {
&self.help
}
}
#[derive(Clone)]
pub struct IntGauge {
value: Arc<AtomicI64>,
name: Arc<String>,
help: Arc<String>,
}
impl IntGauge {
pub fn new(name: &str, help: &str) -> Self {
Self {
value: Arc::new(AtomicI64::new(0)),
name: Arc::new(name.to_string()),
help: Arc::new(help.to_string()),
}
}
pub fn set(&self, val: i64) {
self.value.store(val, Ordering::Relaxed);
}
pub fn inc(&self) {
self.value.fetch_add(1, Ordering::Relaxed);
}
pub fn dec(&self) {
self.value.fetch_sub(1, Ordering::Relaxed);
}
pub fn get(&self) -> i64 {
self.value.load(Ordering::Relaxed)
}
pub fn name(&self) -> &str {
&self.name
}
pub fn help(&self) -> &str {
&self.help
}
}
#[derive(Clone)]
pub struct MetricsRegistry {
events_total: CounterVec,
subscribers: IntGauge,
monitored_paths: IntGauge,
reader_groups: IntGauge,
pending_paths: IntGauge,
disk_buffer_events: IntGauge,
}
impl Default for MetricsRegistry {
fn default() -> Self {
Self::new()
}
}
impl MetricsRegistry {
pub fn new() -> Self {
Self {
events_total: CounterVec::new(
"fsmon_events_total",
"Total file system events processed by fsmon",
),
subscribers: IntGauge::new(
"fsmon_subscribers",
"Current number of active subscribe connections",
),
monitored_paths: IntGauge::new(
"fsmon_monitored_paths",
"Current number of monitored path entries",
),
reader_groups: IntGauge::new(
"fsmon_reader_groups",
"Current number of fanotify fd groups",
),
pending_paths: IntGauge::new(
"fsmon_pending_paths",
"Current number of paths pending creation",
),
disk_buffer_events: IntGauge::new(
"fsmon_disk_buffer_events",
"Current number of buffered events (disk full)",
),
}
}
pub fn inc_event(&self, event_type: &str, cmd: &str) {
self.events_total.inc(&[event_type, cmd]);
}
pub fn set_subscribers(&self, n: i64) {
self.subscribers.set(n);
}
pub fn inc_subscribers(&self) {
self.subscribers.inc();
}
pub fn dec_subscribers(&self) {
self.subscribers.dec();
}
pub fn subscribers(&self) -> i64 {
self.subscribers.get()
}
pub fn set_monitored_paths(&self, n: i64) {
self.monitored_paths.set(n);
}
pub fn set_reader_groups(&self, n: i64) {
self.reader_groups.set(n);
}
pub fn set_pending_paths(&self, n: i64) {
self.pending_paths.set(n);
}
pub fn set_disk_buffer_events(&self, n: i64) {
self.disk_buffer_events.set(n);
}
pub fn format_prometheus(&self) -> String {
let mut out = String::new();
let entries = self.events_total.gather();
if !entries.is_empty() {
push_help_type(&mut out, self.events_total.name(), self.events_total.help(), "counter");
for (labels, value) in &entries {
push_metric_line(&mut out, self.events_total.name(), &[("event_type", &labels[0]), ("cmd", &labels[1])], *value);
}
}
push_gauge(&mut out, &self.subscribers);
push_gauge(&mut out, &self.monitored_paths);
push_gauge(&mut out, &self.reader_groups);
push_gauge(&mut out, &self.pending_paths);
push_gauge(&mut out, &self.disk_buffer_events);
out
}
}
fn push_help_type(out: &mut String, name: &str, help: &str, typ: &str) {
out.push_str("# HELP ");
out.push_str(name);
out.push(' ');
out.push_str(help);
out.push('\n');
out.push_str("# TYPE ");
out.push_str(name);
out.push(' ');
out.push_str(typ);
out.push('\n');
}
fn push_metric_line(out: &mut String, name: &str, labels: &[(&str, &str)], value: u64) {
out.push_str(name);
if !labels.is_empty() {
out.push('{');
for (i, (k, v)) in labels.iter().enumerate() {
if i > 0 {
out.push(',');
}
out.push_str(k);
out.push_str("=\"");
out.push_str(v);
out.push('"');
}
out.push('}');
}
out.push(' ');
out.push_str(&value.to_string());
out.push('\n');
}
fn push_gauge(out: &mut String, g: &IntGauge) {
push_help_type(out, g.name(), g.help(), "gauge");
push_metric_line(out, g.name(), &[], g.get() as u64);
}
pub async fn handle_metrics_socket(
mut writer: tokio::net::unix::OwnedWriteHalf,
metrics: &MetricsRegistry,
) {
use tokio::io::AsyncWriteExt;
let text = metrics.format_prometheus();
let _ = writer.write_all(text.as_bytes()).await;
}
pub async fn serve_metrics_tcp(
addr: SocketAddr,
metrics: MetricsRegistry,
) -> std::io::Result<bool> {
let listener = match tokio::net::TcpListener::bind(addr).await {
Ok(l) => l,
Err(e) => {
eprintln!(
"[WARNING] Cannot bind metrics TCP address {}: {}",
addr, e
);
return Ok(false);
}
};
eprintln!("[INFO] Metrics TCP endpoint: http://{}/metrics", addr);
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((mut stream, _)) => {
let text = metrics.format_prometheus();
let response = format!(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/plain; version=0.0.4\r\n\
Content-Length: {}\r\n\
Connection: close\r\n\
\r\n\
{}",
text.len(),
text,
);
tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
let _ = stream.write_all(response.as_bytes()).await;
});
}
Err(e) => {
eprintln!("[WARNING] Metrics TCP accept error: {}", e);
}
}
}
});
Ok(true)
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
async fn start_test_metrics_server(metrics: MetricsRegistry) -> std::net::SocketAddr {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((mut stream, _)) => {
let text = metrics.format_prometheus();
let response = format!(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/plain; version=0.0.4\r\n\
Content-Length: {}\r\n\
Connection: close\r\n\
\r\n\
{}",
text.len(),
text,
);
tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
let _ = stream.write_all(response.as_bytes()).await;
});
}
Err(_) => break,
}
}
});
addr
}
async fn read_http_response(stream: &mut TcpStream) -> (String, String) {
let mut buf = vec![0u8; 8192];
let n = stream.read(&mut buf).await.unwrap();
let raw = String::from_utf8_lossy(&buf[..n]).to_string();
if let Some(pos) = raw.find("\r\n\r\n") {
let headers = raw[..pos].to_string();
let body = raw[pos + 4..].to_string();
(headers, body)
} else {
(raw, String::new())
}
}
#[test]
fn test_counter_vec_inc() {
let cv = CounterVec::new("test_total", "Test counter");
cv.inc(&["CREATE", "nginx"]);
cv.inc(&["CREATE", "nginx"]);
cv.inc(&["MODIFY", "global"]);
cv.inc(&["CREATE", "nginx"]);
let entries = cv.gather();
let find = |et: &str, cmd: &str| -> Option<u64> {
entries
.iter()
.find(|(l, _)| l[0] == et && l[1] == cmd)
.map(|(_, v)| *v)
};
assert_eq!(find("CREATE", "nginx"), Some(3));
assert_eq!(find("MODIFY", "global"), Some(1));
assert_eq!(find("DELETE", "nginx"), None);
}
#[test]
fn test_counter_vec_concurrent() {
use std::thread;
let cv = CounterVec::new("test_total", "Test counter");
let cv2 = cv.clone();
let h = thread::spawn(move || {
for _ in 0..1000 {
cv2.inc(&["CREATE", "nginx"]);
}
});
for _ in 0..1000 {
cv.inc(&["CREATE", "nginx"]);
}
h.join().unwrap();
let entries = cv.gather();
let val = entries
.iter()
.find(|(l, _)| l[0] == "CREATE" && l[1] == "nginx")
.map(|(_, v)| *v);
assert_eq!(val, Some(2000));
}
#[test]
fn test_int_gauge() {
let g = IntGauge::new("test_gauge", "Test gauge");
assert_eq!(g.get(), 0);
g.inc();
assert_eq!(g.get(), 1);
g.inc();
g.inc();
assert_eq!(g.get(), 3);
g.dec();
assert_eq!(g.get(), 2);
g.set(42);
assert_eq!(g.get(), 42);
}
#[test]
fn test_format_prometheus() {
let r = MetricsRegistry::new();
r.inc_event("CREATE", "nginx");
r.inc_event("CREATE", "nginx");
r.inc_event("MODIFY", "global");
r.set_subscribers(3);
r.set_monitored_paths(5);
let text = r.format_prometheus();
assert!(text.contains("fsmon_events_total{event_type=\"CREATE\",cmd=\"nginx\"} 2"));
assert!(text.contains("fsmon_events_total{event_type=\"MODIFY\",cmd=\"global\"} 1"));
assert!(text.contains("fsmon_subscribers 3"));
assert!(text.contains("fsmon_monitored_paths 5"));
assert!(text.contains("# HELP fsmon_subscribers"));
assert!(text.contains("# TYPE fsmon_subscribers gauge"));
assert!(text.contains("# HELP fsmon_events_total"));
assert!(text.contains("# TYPE fsmon_events_total counter"));
}
#[test]
fn test_format_prometheus_empty_counters() {
let r = MetricsRegistry::new();
let text = r.format_prometheus();
assert!(text.contains("fsmon_subscribers 0"));
assert!(!text.contains("fsmon_events_total{"));
}
#[tokio::test]
async fn test_tcp_metrics_http_200_content_type() {
let r = MetricsRegistry::new();
r.set_subscribers(7);
r.set_monitored_paths(3);
r.set_reader_groups(2);
r.set_pending_paths(1);
let addr = start_test_metrics_server(r).await;
let mut stream = TcpStream::connect(addr).await.unwrap();
let (headers, body) = read_http_response(&mut stream).await;
assert!(headers.starts_with("HTTP/1.1 200 OK"), "Expected 200 OK, got: {}", headers.lines().next().unwrap_or(""));
assert!(headers.contains("Content-Type: text/plain"), "Expected text/plain Content-Type");
assert!(body.contains("fsmon_subscribers 7"));
assert!(body.contains("fsmon_monitored_paths 3"));
assert!(body.contains("fsmon_reader_groups 2"));
assert!(body.contains("fsmon_pending_paths 1"));
assert!(body.contains("# HELP fsmon_subscribers"));
assert!(body.contains("# TYPE fsmon_subscribers gauge"));
}
#[tokio::test]
async fn test_tcp_metrics_empty_registry() {
let r = MetricsRegistry::new();
let addr = start_test_metrics_server(r).await;
let mut stream = TcpStream::connect(addr).await.unwrap();
let (headers, body) = read_http_response(&mut stream).await;
assert!(headers.starts_with("HTTP/1.1 200 OK"));
assert!(body.contains("fsmon_subscribers 0"));
assert!(body.contains("fsmon_monitored_paths 0"));
assert!(!body.contains("fsmon_events_total{"));
}
#[tokio::test]
async fn test_tcp_metrics_with_events() {
let r = MetricsRegistry::new();
r.inc_event("CREATE", "nginx");
r.inc_event("CREATE", "nginx");
r.inc_event("DELETE", "global");
r.inc_event("MODIFY", "global");
r.inc_event("MODIFY", "global");
r.inc_event("MODIFY", "global");
let addr = start_test_metrics_server(r).await;
let mut stream = TcpStream::connect(addr).await.unwrap();
let (_, body) = read_http_response(&mut stream).await;
assert!(body.contains("fsmon_events_total{event_type=\"CREATE\",cmd=\"nginx\"} 2"));
assert!(body.contains("fsmon_events_total{event_type=\"DELETE\",cmd=\"global\"} 1"));
assert!(body.contains("fsmon_events_total{event_type=\"MODIFY\",cmd=\"global\"} 3"));
assert!(body.contains("# HELP fsmon_events_total"));
assert!(body.contains("# TYPE fsmon_events_total counter"));
}
#[tokio::test]
async fn test_tcp_metrics_connection_close_header() {
let r = MetricsRegistry::new();
let addr = start_test_metrics_server(r).await;
let mut stream = TcpStream::connect(addr).await.unwrap();
let (headers, _) = read_http_response(&mut stream).await;
assert!(headers.contains("Connection: close"),
"Prometheus scrape requires Connection: close, got headers: {}", headers);
}
#[tokio::test]
async fn test_tcp_metrics_content_length_matches_body() {
let r = MetricsRegistry::new();
r.set_subscribers(42);
r.set_monitored_paths(99);
let addr = start_test_metrics_server(r).await;
let mut stream = TcpStream::connect(addr).await.unwrap();
let (headers, body) = read_http_response(&mut stream).await;
let cl = headers
.lines()
.find(|l| l.to_lowercase().starts_with("content-length:"))
.and_then(|l| l.split(':').nth(1))
.and_then(|s| s.trim().parse::<usize>().ok())
.expect("Content-Length header missing or invalid");
assert_eq!(cl, body.len(),
"Content-Length ({}) must match body length ({})", cl, body.len());
}
}