use std::time::Duration;
/// Settings for the periodic DogStatsD UDP exporter.
///
/// Build one with [`DogStatsDConfig::new`] or [`Default::default`] and adjust
/// it with the `with_*` methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DogStatsDConfig {
    /// Address the exporter's UDP socket is connected to, in `host:port` form.
    pub endpoint: String,
    /// Time between two consecutive export cycles.
    pub interval: Duration,
    /// Upper bound, in bytes, on the payload of a single datagram. Metric
    /// lines are packed into datagrams up to this size; a single line longer
    /// than this is dropped by the exporter.
    pub max_packet_size: usize,
}
impl Default for DogStatsDConfig {
    /// Returns a configuration that targets `127.0.0.1:8125`, exports every
    /// ten seconds and caps datagrams at 8000 bytes.
    fn default() -> Self {
        let endpoint = String::from("127.0.0.1:8125");
        let interval = Duration::from_secs(10);
        let max_packet_size = 8000;

        Self {
            endpoint,
            interval,
            max_packet_size,
        }
    }
}
impl DogStatsDConfig {
    /// Creates a configuration for `endpoint`, taking the export interval and
    /// the maximum packet size from [`Default::default`].
    pub fn new(endpoint: impl Into<String>) -> Self {
        let endpoint = endpoint.into();
        let Self {
            interval,
            max_packet_size,
            ..
        } = Self::default();

        Self {
            endpoint,
            interval,
            max_packet_size,
        }
    }

    /// Returns the configuration with its export interval replaced by
    /// `interval`.
    pub fn with_interval(self, interval: Duration) -> Self {
        Self { interval, ..self }
    }

    /// Returns the configuration with its maximum datagram payload replaced
    /// by `size` bytes.
    pub fn with_max_packet_size(self, size: usize) -> Self {
        Self {
            max_packet_size: size,
            ..self
        }
    }
}
pub async fn run<F>(
config: DogStatsDConfig,
cancel: tokio_util::sync::CancellationToken,
mut export_fn: F,
) where
F: FnMut(&mut String),
{
use tokio::net::UdpSocket;
use tokio::time::MissedTickBehavior;
log::info!("Starting DogStatsD exporter, endpoint={}", config.endpoint);
let socket = match UdpSocket::bind("0.0.0.0:0").await {
Ok(s) => s,
Err(e) => {
log::error!("Failed to bind UDP socket for DogStatsD export: {e}");
return;
}
};
if let Err(e) = socket.connect(&config.endpoint).await {
log::error!("Failed to connect UDP socket to {}: {e}", config.endpoint);
return;
}
let max_packet_size = config.max_packet_size;
let mut output = String::with_capacity(16384);
let mut batch = Vec::<u8>::with_capacity(max_packet_size);
let mut interval = tokio::time::interval(config.interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
interval.tick().await;
loop {
tokio::select! {
_ = interval.tick() => {}
_ = cancel.cancelled() => {
log::info!("DogStatsD exporter shutting down");
return;
}
}
output.clear();
export_fn(&mut output);
if output.is_empty() {
continue;
}
let output_bytes = output.as_bytes();
batch.clear();
let mut total_sent = 0usize;
let mut batch_count = 0usize;
let mut metric_count = 0usize;
let mut start = 0usize;
for nl in memchr::memchr_iter(b'\n', output_bytes) {
let end = nl + 1;
let line = &output_bytes[start..end];
let line_len = line.len();
metric_count += 1;
if line_len > max_packet_size {
log::warn!(
"Dropping oversized metric line ({line_len} bytes, max {max_packet_size})"
);
start = end;
continue;
}
if !batch.is_empty() && batch.len() + line_len > max_packet_size {
match socket.send(&batch).await {
Ok(n) => {
total_sent += n;
batch_count += 1;
}
Err(e) => log::warn!("Failed to send DogStatsD batch: {e}"),
}
batch.clear();
}
batch.extend_from_slice(line);
start = end;
}
if start < output_bytes.len() {
let line = &output_bytes[start..];
let line_len = line.len();
metric_count += 1;
if line_len <= max_packet_size {
if !batch.is_empty() && batch.len() + line_len > max_packet_size {
match socket.send(&batch).await {
Ok(n) => {
total_sent += n;
batch_count += 1;
}
Err(e) => log::warn!("Failed to send DogStatsD batch: {e}"),
}
batch.clear();
}
batch.extend_from_slice(line);
} else {
log::warn!("Dropping oversized trailing metric ({line_len} bytes)");
}
}
if !batch.is_empty() {
match socket.send(&batch).await {
Ok(n) => {
total_sent += n;
batch_count += 1;
}
Err(e) => log::warn!("Failed to send final DogStatsD batch: {e}"),
}
}
log::debug!(
"DogStatsD export: {metric_count} metrics, {batch_count} batches, {total_sent} bytes"
);
}
}
/// Runs the DogStatsD exporter on the monoio runtime until `cancel` fires.
///
/// On every tick of `config.interval` the function clears a reusable buffer,
/// lets `export_fn` fill it with newline-separated metric lines, packs those
/// lines into datagrams of at most `config.max_packet_size` bytes and sends
/// them over a connected UDP socket.
///
/// * `config` - endpoint, export interval and datagram size limit.
/// * `cancel` - token whose cancellation makes the loop return. It is only
///   observed while waiting for the next tick, not during an export cycle.
/// * `export_fn` - called once per cycle with an empty `String`; whatever it
///   appends is exported. Leaving the buffer empty skips the cycle.
///
/// The endpoint is resolved once at start-up and the first resolved address
/// is used; the local socket is bound to the wildcard address of the same
/// family. A single line longer than `max_packet_size` is dropped with a
/// warning. Send failures are logged and the loop keeps running.
///
/// The function returns early, after logging an error, when the interval is
/// zero, the endpoint cannot be resolved, or the socket cannot be bound or
/// connected.
#[cfg(feature = "monoio")]
pub async fn run_monoio<F>(
    config: DogStatsDConfig,
    cancel: tokio_util::sync::CancellationToken,
    mut export_fn: F,
) where
    F: FnMut(&mut String),
{
    use monoio::net::udp::UdpSocket;
    use monoio::time::MissedTickBehavior;
    use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs};

    log::info!(
        "Starting monoio DogStatsD exporter, endpoint={}",
        config.endpoint
    );

    // A zero period cannot drive a periodic timer (tokio's `interval` panics
    // on it and monoio's is expected to behave the same); refuse to start
    // rather than risk crashing the task.
    if config.interval.is_zero() {
        log::error!("DogStatsD export interval must be non-zero; monoio exporter not started");
        return;
    }

    // NOTE(review): `to_socket_addrs` is the synchronous std resolver, so a
    // host-name endpoint blocks this thread during start-up.
    let endpoint = match config
        .endpoint
        .to_socket_addrs()
        .map(|mut addrs| addrs.next())
    {
        Ok(Some(addr)) => addr,
        Ok(None) => {
            log::error!("DogStatsD endpoint resolved to no addresses");
            return;
        }
        Err(e) => {
            log::error!(
                "Failed to resolve DogStatsD endpoint {}: {e}",
                config.endpoint
            );
            return;
        }
    };

    // Bind to the wildcard address of the target's family, port chosen by
    // the OS.
    let bind_addr = if endpoint.is_ipv4() {
        SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))
    } else {
        SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0))
    };

    let socket = match UdpSocket::bind(bind_addr) {
        Ok(s) => s,
        Err(e) => {
            log::error!("Failed to bind monoio UDP socket for DogStatsD export: {e}");
            return;
        }
    };
    if let Err(e) = socket.connect(endpoint).await {
        log::error!("Failed to connect monoio UDP socket to {endpoint}: {e}");
        return;
    }

    let max_packet_size = config.max_packet_size;
    let mut output = String::with_capacity(16384);
    let mut batch = Vec::<u8>::with_capacity(max_packet_size);

    let mut interval = monoio::time::interval(config.interval);
    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
    // Consume one tick before entering the loop, as the tokio variant does.
    interval.tick().await;

    loop {
        monoio::select! {
            _ = interval.tick() => {}
            _ = cancel.cancelled() => {
                log::info!("monoio DogStatsD exporter shutting down");
                return;
            }
        }

        output.clear();
        export_fn(&mut output);
        if output.is_empty() {
            continue;
        }

        let output_bytes = output.as_bytes();
        batch.clear();
        let mut total_sent = 0usize;
        let mut batch_count = 0usize;
        let mut metric_count = 0usize;

        // One end offset per newline-terminated line, plus the end of the
        // buffer so that an unterminated trailing line goes through the same
        // code path as every other line.
        let line_ends = memchr::memchr_iter(b'\n', output_bytes)
            .map(|nl| nl + 1)
            .chain(std::iter::once(output_bytes.len()));

        let mut start = 0usize;
        for end in line_ends {
            let line = &output_bytes[start..end];
            start = end;
            if line.is_empty() {
                // Only the end-of-buffer sentinel can be empty, when the
                // output already ended with a newline.
                continue;
            }

            let line_len = line.len();
            metric_count += 1;

            if line_len > max_packet_size {
                if line.ends_with(b"\n") {
                    log::warn!(
                        "Dropping oversized metric line ({line_len} bytes, max {max_packet_size})"
                    );
                } else {
                    log::warn!("Dropping oversized trailing metric ({line_len} bytes)");
                }
                continue;
            }

            // Flush first if appending this line would exceed the limit.
            // `send_monoio_batch` hands back an emptied buffer.
            if !batch.is_empty() && batch.len() + line_len > max_packet_size {
                if let Some(n) = send_monoio_batch(&socket, &mut batch, "DogStatsD batch").await {
                    total_sent += n;
                    batch_count += 1;
                }
            }
            batch.extend_from_slice(line);
        }

        if !batch.is_empty()
            && let Some(n) = send_monoio_batch(&socket, &mut batch, "final DogStatsD batch").await
        {
            total_sent += n;
            batch_count += 1;
        }

        log::debug!(
            "monoio DogStatsD export: {metric_count} metrics, {batch_count} batches, {total_sent} bytes"
        );
    }
}
/// Sends the contents of `batch` as one datagram on `socket` and leaves
/// `batch` empty.
///
/// The buffer is moved into the send call and handed back by it, so it is
/// taken out of `batch` for the duration of the send and stored back
/// afterwards, cleared, for reuse by the caller.
///
/// Returns the number of bytes reported as sent, or `None` after logging a
/// warning that names `context` when the send fails.
#[cfg(feature = "monoio")]
async fn send_monoio_batch(
    socket: &monoio::net::udp::UdpSocket,
    batch: &mut Vec<u8>,
    context: &str,
) -> Option<usize> {
    let (outcome, mut returned) = socket.send(std::mem::take(batch)).await;

    // Give the (now empty) buffer back before reporting the outcome.
    returned.clear();
    *batch = returned;

    outcome
        .map_err(|e| log::warn!("Failed to send monoio {context}: {e}"))
        .ok()
}