use crate::{LabelSet, Registry};
use std::fmt::Write as _;
use std::io;
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
const DEFAULT_PACKET_SIZE: usize = 1432;
pub struct StatsdSink {
socket: UdpSocket,
target: SocketAddr,
prefix: Option<String>,
packet_size: usize,
}
impl StatsdSink {
pub fn new(addr: impl ToSocketAddrs) -> io::Result<Self> {
let target = addr
.to_socket_addrs()?
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "empty SocketAddr list"))?;
let bind = if target.is_ipv6() {
"[::]:0"
} else {
"0.0.0.0:0"
};
let socket = UdpSocket::bind(bind)?;
Ok(Self {
socket,
target,
prefix: None,
packet_size: DEFAULT_PACKET_SIZE,
})
}
pub fn with_socket(socket: UdpSocket, target: SocketAddr) -> Self {
Self {
socket,
target,
prefix: None,
packet_size: DEFAULT_PACKET_SIZE,
}
}
#[must_use]
pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
self.prefix = Some(prefix.into());
self
}
#[must_use]
pub fn with_packet_size(mut self, size: usize) -> Self {
self.packet_size = size.max(64);
self
}
pub fn send(&self, registry: &Registry) -> io::Result<usize> {
let body = self.render(registry);
self.flush_lines(&body)
}
#[must_use]
pub fn render(&self, registry: &Registry) -> String {
let mut out = String::with_capacity(2048);
let prefix = self.prefix.as_deref().unwrap_or("");
#[cfg(feature = "count")]
for (name, labels, c) in registry.counter_entries() {
emit_gauge_line(&mut out, prefix, &name, &labels, c.get() as f64);
}
#[cfg(feature = "gauge")]
for (name, labels, g) in registry.gauge_entries() {
emit_gauge_line(&mut out, prefix, &name, &labels, g.get());
}
#[cfg(feature = "timer")]
for (name, labels, t) in registry.timer_entries() {
let count = t.count();
emit_gauge_line(
&mut out,
prefix,
&format!("{name}.count"),
&labels,
count as f64,
);
emit_gauge_line(
&mut out,
prefix,
&format!("{name}.sum_seconds"),
&labels,
t.total().as_secs_f64(),
);
let min_s = if count == 0 {
0.0
} else {
t.min().as_secs_f64()
};
let max_s = if count == 0 {
0.0
} else {
t.max().as_secs_f64()
};
emit_gauge_line(
&mut out,
prefix,
&format!("{name}.min_seconds"),
&labels,
min_s,
);
emit_gauge_line(
&mut out,
prefix,
&format!("{name}.max_seconds"),
&labels,
max_s,
);
}
#[cfg(feature = "meter")]
for (name, labels, r) in registry.rate_meter_entries() {
emit_gauge_line(
&mut out,
prefix,
&format!("{name}.total"),
&labels,
r.total() as f64,
);
emit_gauge_line(
&mut out,
prefix,
&format!("{name}.per_second"),
&labels,
r.rate(),
);
}
#[cfg(feature = "histogram")]
for (name, labels, h) in registry.histogram_entries() {
emit_gauge_line(
&mut out,
prefix,
&format!("{name}.count"),
&labels,
h.count() as f64,
);
emit_gauge_line(&mut out, prefix, &format!("{name}.sum"), &labels, h.sum());
emit_gauge_line(
&mut out,
prefix,
&format!("{name}.p50"),
&labels,
h.quantile(0.50),
);
emit_gauge_line(
&mut out,
prefix,
&format!("{name}.p95"),
&labels,
h.quantile(0.95),
);
emit_gauge_line(
&mut out,
prefix,
&format!("{name}.p99"),
&labels,
h.quantile(0.99),
);
}
out
}
fn flush_lines(&self, body: &str) -> io::Result<usize> {
let mut total_sent = 0usize;
let mut buf = String::with_capacity(self.packet_size);
for line in body.split('\n').filter(|l| !l.is_empty()) {
let needed = line.len() + if buf.is_empty() { 0 } else { 1 };
if !buf.is_empty() && buf.len() + needed > self.packet_size {
total_sent += self.socket.send_to(buf.as_bytes(), self.target)?;
buf.clear();
}
if !buf.is_empty() {
buf.push('\n');
}
buf.push_str(line);
}
if !buf.is_empty() {
total_sent += self.socket.send_to(buf.as_bytes(), self.target)?;
}
Ok(total_sent)
}
}
fn emit_gauge_line(out: &mut String, prefix: &str, name: &str, labels: &LabelSet, value: f64) {
let value_str = if value.is_nan() {
"0".to_string()
} else if value == f64::INFINITY {
f64::MAX.to_string()
} else if value == f64::NEG_INFINITY {
f64::MIN.to_string()
} else {
format!("{value}")
};
write!(out, "{prefix}{name}:{value_str}|g").unwrap();
if !labels.is_empty() {
out.push_str(&labels.to_statsd());
}
out.push('\n');
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Registry;
fn loopback_sink() -> (StatsdSink, std::net::SocketAddr) {
let recv = UdpSocket::bind("127.0.0.1:0").unwrap();
let target = recv.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").unwrap();
(StatsdSink::with_socket(send, target), target)
}
#[test]
fn render_emits_gauge_lines_for_counters_and_gauges() {
let r = Registry::new();
#[cfg(feature = "count")]
r.get_or_create_counter("hits").add(5);
#[cfg(feature = "gauge")]
r.get_or_create_gauge("temp_c").set(21.5);
let send = UdpSocket::bind("127.0.0.1:0").unwrap();
let sink = StatsdSink::with_socket(send, "127.0.0.1:0".parse().unwrap());
let body = sink.render(&r);
#[cfg(feature = "count")]
assert!(body.contains("hits:5|g\n"), "{body}");
#[cfg(feature = "gauge")]
assert!(body.contains("temp_c:21.5|g\n"), "{body}");
}
#[test]
#[cfg(feature = "count")]
fn labels_render_as_tags() {
let r = Registry::new();
let labels = LabelSet::from([("region", "us"), ("env", "prod")]);
r.get_or_create_counter_with("requests", &labels).inc();
let send = UdpSocket::bind("127.0.0.1:0").unwrap();
let sink = StatsdSink::with_socket(send, "127.0.0.1:0".parse().unwrap());
let body = sink.render(&r);
assert!(
body.contains("requests:1|g|#env:prod,region:us\n"),
"{body}"
);
}
#[test]
fn prefix_is_prepended() {
let r = Registry::new();
#[cfg(feature = "gauge")]
r.get_or_create_gauge("temp").set(42.0);
let send = UdpSocket::bind("127.0.0.1:0").unwrap();
let sink =
StatsdSink::with_socket(send, "127.0.0.1:0".parse().unwrap()).with_prefix("svc.");
let body = sink.render(&r);
#[cfg(feature = "gauge")]
assert!(body.contains("svc.temp:42|g\n"), "{body}");
let _ = body;
}
#[test]
fn send_writes_at_least_one_datagram() {
let r = Registry::new();
#[cfg(feature = "count")]
r.get_or_create_counter("hits").inc();
let (sink, _target) = loopback_sink();
let sent = sink.send(&r).unwrap();
#[cfg(feature = "count")]
assert!(sent > 0);
let _ = sent;
}
#[test]
#[cfg(feature = "timer")]
fn timer_renders_four_gauge_lines() {
let r = Registry::new();
let t = r.get_or_create_timer("rpc");
t.record(std::time::Duration::from_millis(2));
let send = UdpSocket::bind("127.0.0.1:0").unwrap();
let sink = StatsdSink::with_socket(send, "127.0.0.1:0".parse().unwrap());
let body = sink.render(&r);
assert!(body.contains("rpc.count:1|g\n"), "{body}");
assert!(body.contains("rpc.sum_seconds:"), "{body}");
assert!(body.contains("rpc.min_seconds:"), "{body}");
assert!(body.contains("rpc.max_seconds:"), "{body}");
}
#[test]
#[cfg(feature = "meter")]
fn rate_meter_renders_two_gauge_lines() {
let r = Registry::new();
r.get_or_create_rate_meter("qps").tick_n(3);
let send = UdpSocket::bind("127.0.0.1:0").unwrap();
let sink = StatsdSink::with_socket(send, "127.0.0.1:0".parse().unwrap());
let body = sink.render(&r);
assert!(body.contains("qps.total:3|g\n"), "{body}");
assert!(body.contains("qps.per_second:"), "{body}");
}
#[test]
#[cfg(feature = "histogram")]
fn histogram_renders_count_sum_and_three_quantiles() {
let r = Registry::new();
let h = r.get_or_create_histogram("rtt");
for v in &[0.001, 0.002, 0.005, 0.01, 0.02] {
h.observe(*v);
}
let send = UdpSocket::bind("127.0.0.1:0").unwrap();
let sink = StatsdSink::with_socket(send, "127.0.0.1:0".parse().unwrap());
let body = sink.render(&r);
assert!(body.contains("rtt.count:5|g\n"), "{body}");
assert!(body.contains("rtt.sum:"), "{body}");
assert!(body.contains("rtt.p50:"), "{body}");
assert!(body.contains("rtt.p95:"), "{body}");
assert!(body.contains("rtt.p99:"), "{body}");
}
#[test]
#[cfg(feature = "gauge")]
fn non_finite_gauge_value_renders_safely() {
let r = Registry::new();
let g = r.get_or_create_gauge("temp");
g.set(f64::INFINITY);
let send = UdpSocket::bind("127.0.0.1:0").unwrap();
let sink = StatsdSink::with_socket(send, "127.0.0.1:0".parse().unwrap());
let body = sink.render(&r);
assert!(body.starts_with("temp:"), "{body}");
assert!(!body.contains("Inf"), "{body}");
assert!(!body.contains("NaN"), "{body}");
}
#[test]
fn with_packet_size_floor_is_enforced() {
let send = UdpSocket::bind("127.0.0.1:0").unwrap();
let sink =
StatsdSink::with_socket(send, "127.0.0.1:0".parse().unwrap()).with_packet_size(8);
let r = Registry::new();
#[cfg(feature = "count")]
r.get_or_create_counter("x").inc();
let _ = sink.send(&r);
}
#[test]
fn send_packetises_long_bodies() {
let r = Registry::new();
#[cfg(feature = "count")]
{
for i in 0..200 {
r.get_or_create_counter(&format!("metric_{i}"))
.add(i as u64);
}
}
let (sink, _) = loopback_sink();
let sink = sink.with_packet_size(256); let sent = sink.send(&r).unwrap();
let _ = sent;
}
}