Skip to main content

fast_telemetry_export/
dogstatsd.rs

1//! DogStatsD metrics exporter.
2//!
3//! Exports metrics to a DogStatsD-compatible agent (Datadog, StatsD) over UDP.
4//! The exporter handles UDP socket management, packet batching (respecting MTU),
5//! and newline-delimited packet splitting via `memchr`.
6//!
7//! The actual metric serialization is provided by the caller via a closure,
8//! making this exporter work with any metrics struct.
9
10use std::time::Duration;
11
12/// Configuration for the DogStatsD exporter.
13#[derive(Clone)]
14pub struct DogStatsDConfig {
15    /// DogStatsD endpoint (host:port), e.g. "127.0.0.1:8125"
16    pub endpoint: String,
17    /// Export interval
18    pub interval: Duration,
19    /// Maximum UDP packet size (default: 8000 bytes)
20    pub max_packet_size: usize,
21}
22
23impl Default for DogStatsDConfig {
24    fn default() -> Self {
25        Self {
26            endpoint: "127.0.0.1:8125".to_string(),
27            interval: Duration::from_secs(10),
28            max_packet_size: 8000,
29        }
30    }
31}
32
33impl DogStatsDConfig {
34    pub fn new(endpoint: impl Into<String>) -> Self {
35        Self {
36            endpoint: endpoint.into(),
37            ..Default::default()
38        }
39    }
40
41    pub fn with_interval(mut self, interval: Duration) -> Self {
42        self.interval = interval;
43        self
44    }
45
46    pub fn with_max_packet_size(mut self, size: usize) -> Self {
47        self.max_packet_size = size;
48        self
49    }
50}
51
52/// Run the DogStatsD export loop.
53///
54/// `export_fn` is called each cycle with a `&mut String` buffer. The closure
55/// should append newline-delimited DogStatsD metric lines (typically via
56/// `ExportMetrics::export_dogstatsd_delta`). The exporter handles UDP batching
57/// and sending.
58///
59/// Runs until `cancel` is triggered. On cancellation, returns immediately
60/// without a final export (DogStatsD is fire-and-forget).
61///
62/// # Example
63///
64/// ```ignore
65/// use std::sync::Arc;
66///
67/// use fast_telemetry_export::dogstatsd::{DogStatsDConfig, run};
68/// use tokio_util::sync::CancellationToken;
69///
70/// let metrics = Arc::new(MyMetrics::new());
71/// let mut state = MyMetricsExportState::new();
72/// let tags: Vec<(&str, &str)> = vec![("service", "myapp")];
73/// let cancel = CancellationToken::new();
74/// let config = DogStatsDConfig::new("127.0.0.1:8125");
75///
76/// let m = metrics.clone();
77/// tokio::spawn(run(config, cancel, move |output| {
78///     m.export_dogstatsd_delta(output, &tags, &mut state);
79/// }));
80/// ```
81///
82/// `MyMetricsExportState` is the derive-generated state type for delta
83/// DogStatsD export. Keep one state instance per export sink.
84pub async fn run<F>(
85    config: DogStatsDConfig,
86    cancel: tokio_util::sync::CancellationToken,
87    mut export_fn: F,
88) where
89    F: FnMut(&mut String),
90{
91    use tokio::net::UdpSocket;
92    use tokio::time::MissedTickBehavior;
93
94    log::info!("Starting DogStatsD exporter, endpoint={}", config.endpoint);
95
96    let socket = match UdpSocket::bind("0.0.0.0:0").await {
97        Ok(s) => s,
98        Err(e) => {
99            log::error!("Failed to bind UDP socket for DogStatsD export: {e}");
100            return;
101        }
102    };
103
104    if let Err(e) = socket.connect(&config.endpoint).await {
105        log::error!("Failed to connect UDP socket to {}: {e}", config.endpoint);
106        return;
107    }
108
109    let max_packet_size = config.max_packet_size;
110    let mut output = String::with_capacity(16384);
111    let mut batch = Vec::<u8>::with_capacity(max_packet_size);
112
113    let mut interval = tokio::time::interval(config.interval);
114    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
115    interval.tick().await;
116
117    loop {
118        tokio::select! {
119            _ = interval.tick() => {}
120            _ = cancel.cancelled() => {
121                log::info!("DogStatsD exporter shutting down");
122                return;
123            }
124        }
125
126        output.clear();
127        export_fn(&mut output);
128
129        if output.is_empty() {
130            continue;
131        }
132
133        let output_bytes = output.as_bytes();
134        batch.clear();
135
136        let mut total_sent = 0usize;
137        let mut batch_count = 0usize;
138        let mut metric_count = 0usize;
139        let mut start = 0usize;
140
141        for nl in memchr::memchr_iter(b'\n', output_bytes) {
142            let end = nl + 1;
143            let line = &output_bytes[start..end];
144            let line_len = line.len();
145            metric_count += 1;
146
147            if line_len > max_packet_size {
148                log::warn!(
149                    "Dropping oversized metric line ({line_len} bytes, max {max_packet_size})"
150                );
151                start = end;
152                continue;
153            }
154
155            if !batch.is_empty() && batch.len() + line_len > max_packet_size {
156                match socket.send(&batch).await {
157                    Ok(n) => {
158                        total_sent += n;
159                        batch_count += 1;
160                    }
161                    Err(e) => log::warn!("Failed to send DogStatsD batch: {e}"),
162                }
163                batch.clear();
164            }
165
166            batch.extend_from_slice(line);
167            start = end;
168        }
169
170        // Handle trailing bytes if output didn't end with '\n'
171        if start < output_bytes.len() {
172            let line = &output_bytes[start..];
173            let line_len = line.len();
174            metric_count += 1;
175
176            if line_len <= max_packet_size {
177                if !batch.is_empty() && batch.len() + line_len > max_packet_size {
178                    match socket.send(&batch).await {
179                        Ok(n) => {
180                            total_sent += n;
181                            batch_count += 1;
182                        }
183                        Err(e) => log::warn!("Failed to send DogStatsD batch: {e}"),
184                    }
185                    batch.clear();
186                }
187                batch.extend_from_slice(line);
188            } else {
189                log::warn!("Dropping oversized trailing metric ({line_len} bytes)");
190            }
191        }
192
193        if !batch.is_empty() {
194            match socket.send(&batch).await {
195                Ok(n) => {
196                    total_sent += n;
197                    batch_count += 1;
198                }
199                Err(e) => log::warn!("Failed to send final DogStatsD batch: {e}"),
200            }
201        }
202
203        log::debug!(
204            "DogStatsD export: {metric_count} metrics, {batch_count} batches, {total_sent} bytes"
205        );
206    }
207}