use std::time::Duration;
#[derive(Clone)]
pub struct DogStatsDConfig {
pub endpoint: String,
pub interval: Duration,
pub max_packet_size: usize,
}
impl Default for DogStatsDConfig {
fn default() -> Self {
Self {
endpoint: "127.0.0.1:8125".to_string(),
interval: Duration::from_secs(10),
max_packet_size: 8000,
}
}
}
impl DogStatsDConfig {
pub fn new(endpoint: impl Into<String>) -> Self {
Self {
endpoint: endpoint.into(),
..Default::default()
}
}
pub fn with_interval(mut self, interval: Duration) -> Self {
self.interval = interval;
self
}
pub fn with_max_packet_size(mut self, size: usize) -> Self {
self.max_packet_size = size;
self
}
}
pub async fn run<F>(
config: DogStatsDConfig,
cancel: tokio_util::sync::CancellationToken,
mut export_fn: F,
) where
F: FnMut(&mut String),
{
use tokio::net::UdpSocket;
use tokio::time::MissedTickBehavior;
log::info!("Starting DogStatsD exporter, endpoint={}", config.endpoint);
let socket = match UdpSocket::bind("0.0.0.0:0").await {
Ok(s) => s,
Err(e) => {
log::error!("Failed to bind UDP socket for DogStatsD export: {e}");
return;
}
};
if let Err(e) = socket.connect(&config.endpoint).await {
log::error!("Failed to connect UDP socket to {}: {e}", config.endpoint);
return;
}
let max_packet_size = config.max_packet_size;
let mut output = String::with_capacity(16384);
let mut batch = Vec::<u8>::with_capacity(max_packet_size);
let mut interval = tokio::time::interval(config.interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
interval.tick().await;
loop {
tokio::select! {
_ = interval.tick() => {}
_ = cancel.cancelled() => {
log::info!("DogStatsD exporter shutting down");
return;
}
}
output.clear();
export_fn(&mut output);
if output.is_empty() {
continue;
}
let output_bytes = output.as_bytes();
batch.clear();
let mut total_sent = 0usize;
let mut batch_count = 0usize;
let mut metric_count = 0usize;
let mut start = 0usize;
for nl in memchr::memchr_iter(b'\n', output_bytes) {
let end = nl + 1;
let line = &output_bytes[start..end];
let line_len = line.len();
metric_count += 1;
if line_len > max_packet_size {
log::warn!(
"Dropping oversized metric line ({line_len} bytes, max {max_packet_size})"
);
start = end;
continue;
}
if !batch.is_empty() && batch.len() + line_len > max_packet_size {
match socket.send(&batch).await {
Ok(n) => {
total_sent += n;
batch_count += 1;
}
Err(e) => log::warn!("Failed to send DogStatsD batch: {e}"),
}
batch.clear();
}
batch.extend_from_slice(line);
start = end;
}
if start < output_bytes.len() {
let line = &output_bytes[start..];
let line_len = line.len();
metric_count += 1;
if line_len <= max_packet_size {
if !batch.is_empty() && batch.len() + line_len > max_packet_size {
match socket.send(&batch).await {
Ok(n) => {
total_sent += n;
batch_count += 1;
}
Err(e) => log::warn!("Failed to send DogStatsD batch: {e}"),
}
batch.clear();
}
batch.extend_from_slice(line);
} else {
log::warn!("Dropping oversized trailing metric ({line_len} bytes)");
}
}
if !batch.is_empty() {
match socket.send(&batch).await {
Ok(n) => {
total_sent += n;
batch_count += 1;
}
Err(e) => log::warn!("Failed to send final DogStatsD batch: {e}"),
}
}
log::debug!(
"DogStatsD export: {metric_count} metrics, {batch_count} batches, {total_sent} bytes"
);
}
}