Skip to main content

fast_telemetry_export/
dogstatsd.rs

1//! DogStatsD metrics exporter.
2//!
3//! Exports metrics to a DogStatsD-compatible agent (Datadog, StatsD) over UDP.
4//! The exporter handles UDP socket management, packet batching (respecting MTU),
5//! and newline-delimited packet splitting via `memchr`.
6//!
7//! The actual metric serialization is provided by the caller via a closure,
8//! making this exporter work with any metrics struct.
9
10use std::time::Duration;
11
12/// Configuration for the DogStatsD exporter.
13#[derive(Clone)]
14pub struct DogStatsDConfig {
15    /// DogStatsD endpoint (host:port), e.g. "127.0.0.1:8125"
16    pub endpoint: String,
17    /// Export interval
18    pub interval: Duration,
19    /// Maximum UDP packet size (default: 8000 bytes)
20    pub max_packet_size: usize,
21}
22
23impl Default for DogStatsDConfig {
24    fn default() -> Self {
25        Self {
26            endpoint: "127.0.0.1:8125".to_string(),
27            interval: Duration::from_secs(10),
28            max_packet_size: 8000,
29        }
30    }
31}
32
33impl DogStatsDConfig {
34    pub fn new(endpoint: impl Into<String>) -> Self {
35        Self {
36            endpoint: endpoint.into(),
37            ..Default::default()
38        }
39    }
40
41    pub fn with_interval(mut self, interval: Duration) -> Self {
42        self.interval = interval;
43        self
44    }
45
46    pub fn with_max_packet_size(mut self, size: usize) -> Self {
47        self.max_packet_size = size;
48        self
49    }
50}
51
52/// Run the DogStatsD export loop.
53///
54/// `export_fn` is called each cycle with a `&mut String` buffer. The closure
55/// should append newline-delimited DogStatsD metric lines (typically via
56/// `ExportMetrics::export_dogstatsd_delta`). The exporter handles UDP batching
57/// and sending.
58///
59/// Runs until `cancel` is triggered. On cancellation, returns immediately
60/// without a final export (DogStatsD is fire-and-forget).
61///
62/// # Example
63///
64/// ```ignore
65/// use std::sync::Arc;
66///
67/// use fast_telemetry_export::dogstatsd::{DogStatsDConfig, run};
68/// use tokio_util::sync::CancellationToken;
69///
70/// let metrics = Arc::new(MyMetrics::new());
71/// let mut state = MyMetricsExportState::new();
72/// let tags: Vec<(&str, &str)> = vec![("service", "myapp")];
73/// let cancel = CancellationToken::new();
74/// let config = DogStatsDConfig::new("127.0.0.1:8125");
75///
76/// let m = metrics.clone();
77/// tokio::spawn(run(config, cancel, move |output| {
78///     m.export_dogstatsd_delta(output, &tags, &mut state);
79/// }));
80/// ```
81///
82/// `MyMetricsExportState` is the derive-generated state type for delta
83/// DogStatsD export. Keep one state instance per export sink.
84pub async fn run<F>(
85    config: DogStatsDConfig,
86    cancel: tokio_util::sync::CancellationToken,
87    mut export_fn: F,
88) where
89    F: FnMut(&mut String),
90{
91    use tokio::net::UdpSocket;
92    use tokio::time::MissedTickBehavior;
93
94    log::info!("Starting DogStatsD exporter, endpoint={}", config.endpoint);
95
96    let socket = match UdpSocket::bind("0.0.0.0:0").await {
97        Ok(s) => s,
98        Err(e) => {
99            log::error!("Failed to bind UDP socket for DogStatsD export: {e}");
100            return;
101        }
102    };
103
104    if let Err(e) = socket.connect(&config.endpoint).await {
105        log::error!("Failed to connect UDP socket to {}: {e}", config.endpoint);
106        return;
107    }
108
109    let max_packet_size = config.max_packet_size;
110    let mut output = String::with_capacity(16384);
111    let mut batch = Vec::<u8>::with_capacity(max_packet_size);
112
113    let mut interval = tokio::time::interval(config.interval);
114    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
115    interval.tick().await;
116
117    loop {
118        tokio::select! {
119            _ = interval.tick() => {}
120            _ = cancel.cancelled() => {
121                log::info!("DogStatsD exporter shutting down");
122                return;
123            }
124        }
125
126        output.clear();
127        export_fn(&mut output);
128
129        if output.is_empty() {
130            continue;
131        }
132
133        let output_bytes = output.as_bytes();
134        batch.clear();
135
136        let mut total_sent = 0usize;
137        let mut batch_count = 0usize;
138        let mut metric_count = 0usize;
139        let mut start = 0usize;
140
141        for nl in memchr::memchr_iter(b'\n', output_bytes) {
142            let end = nl + 1;
143            let line = &output_bytes[start..end];
144            let line_len = line.len();
145            metric_count += 1;
146
147            if line_len > max_packet_size {
148                log::warn!(
149                    "Dropping oversized metric line ({line_len} bytes, max {max_packet_size})"
150                );
151                start = end;
152                continue;
153            }
154
155            if !batch.is_empty() && batch.len() + line_len > max_packet_size {
156                match socket.send(&batch).await {
157                    Ok(n) => {
158                        total_sent += n;
159                        batch_count += 1;
160                    }
161                    Err(e) => log::warn!("Failed to send DogStatsD batch: {e}"),
162                }
163                batch.clear();
164            }
165
166            batch.extend_from_slice(line);
167            start = end;
168        }
169
170        // Handle trailing bytes if output didn't end with '\n'
171        if start < output_bytes.len() {
172            let line = &output_bytes[start..];
173            let line_len = line.len();
174            metric_count += 1;
175
176            if line_len <= max_packet_size {
177                if !batch.is_empty() && batch.len() + line_len > max_packet_size {
178                    match socket.send(&batch).await {
179                        Ok(n) => {
180                            total_sent += n;
181                            batch_count += 1;
182                        }
183                        Err(e) => log::warn!("Failed to send DogStatsD batch: {e}"),
184                    }
185                    batch.clear();
186                }
187                batch.extend_from_slice(line);
188            } else {
189                log::warn!("Dropping oversized trailing metric ({line_len} bytes)");
190            }
191        }
192
193        if !batch.is_empty() {
194            match socket.send(&batch).await {
195                Ok(n) => {
196                    total_sent += n;
197                    batch_count += 1;
198                }
199                Err(e) => log::warn!("Failed to send final DogStatsD batch: {e}"),
200            }
201        }
202
203        log::debug!(
204            "DogStatsD export: {metric_count} metrics, {batch_count} batches, {total_sent} bytes"
205        );
206    }
207}
208
209/// Run the DogStatsD export loop on a monoio runtime.
210///
211/// This is the monoio-native counterpart to [`run`]. It uses
212/// [`monoio::net::udp::UdpSocket`] and [`monoio::time::interval`], so the
213/// caller must run it inside a monoio runtime with timers enabled.
214#[cfg(feature = "monoio")]
215pub async fn run_monoio<F>(
216    config: DogStatsDConfig,
217    cancel: tokio_util::sync::CancellationToken,
218    mut export_fn: F,
219) where
220    F: FnMut(&mut String),
221{
222    use std::net::{SocketAddr, ToSocketAddrs};
223
224    use monoio::net::udp::UdpSocket;
225    use monoio::time::MissedTickBehavior;
226
227    log::info!(
228        "Starting monoio DogStatsD exporter, endpoint={}",
229        config.endpoint
230    );
231
232    let endpoint = match config.endpoint.to_socket_addrs() {
233        Ok(mut addrs) => match addrs.next() {
234            Some(addr) => addr,
235            None => {
236                log::error!("DogStatsD endpoint resolved to no addresses");
237                return;
238            }
239        },
240        Err(e) => {
241            log::error!(
242                "Failed to resolve DogStatsD endpoint {}: {e}",
243                config.endpoint
244            );
245            return;
246        }
247    };
248
249    let bind_addr: SocketAddr = if endpoint.is_ipv4() {
250        "0.0.0.0:0"
251    } else {
252        "[::]:0"
253    }
254    .parse()
255    .expect("valid UDP bind address");
256
257    let socket = match UdpSocket::bind(bind_addr) {
258        Ok(s) => s,
259        Err(e) => {
260            log::error!("Failed to bind monoio UDP socket for DogStatsD export: {e}");
261            return;
262        }
263    };
264
265    if let Err(e) = socket.connect(endpoint).await {
266        log::error!("Failed to connect monoio UDP socket to {endpoint}: {e}");
267        return;
268    }
269
270    let max_packet_size = config.max_packet_size;
271    let mut output = String::with_capacity(16384);
272    let mut batch = Vec::<u8>::with_capacity(max_packet_size);
273
274    let mut interval = monoio::time::interval(config.interval);
275    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
276    interval.tick().await;
277
278    loop {
279        monoio::select! {
280            _ = interval.tick() => {}
281            _ = cancel.cancelled() => {
282                log::info!("monoio DogStatsD exporter shutting down");
283                return;
284            }
285        }
286
287        output.clear();
288        export_fn(&mut output);
289
290        if output.is_empty() {
291            continue;
292        }
293
294        let output_bytes = output.as_bytes();
295        batch.clear();
296
297        let mut total_sent = 0usize;
298        let mut batch_count = 0usize;
299        let mut metric_count = 0usize;
300        let mut start = 0usize;
301
302        for nl in memchr::memchr_iter(b'\n', output_bytes) {
303            let end = nl + 1;
304            let line = &output_bytes[start..end];
305            let line_len = line.len();
306            metric_count += 1;
307
308            if line_len > max_packet_size {
309                log::warn!(
310                    "Dropping oversized metric line ({line_len} bytes, max {max_packet_size})"
311                );
312                start = end;
313                continue;
314            }
315
316            if !batch.is_empty() && batch.len() + line_len > max_packet_size {
317                if let Some(n) = send_monoio_batch(&socket, &mut batch, "DogStatsD batch").await {
318                    total_sent += n;
319                    batch_count += 1;
320                }
321            }
322
323            batch.extend_from_slice(line);
324            start = end;
325        }
326
327        if start < output_bytes.len() {
328            let line = &output_bytes[start..];
329            let line_len = line.len();
330            metric_count += 1;
331
332            if line_len <= max_packet_size {
333                if !batch.is_empty() && batch.len() + line_len > max_packet_size {
334                    if let Some(n) = send_monoio_batch(&socket, &mut batch, "DogStatsD batch").await
335                    {
336                        total_sent += n;
337                        batch_count += 1;
338                    }
339                }
340                batch.extend_from_slice(line);
341            } else {
342                log::warn!("Dropping oversized trailing metric ({line_len} bytes)");
343            }
344        }
345
346        if !batch.is_empty()
347            && let Some(n) = send_monoio_batch(&socket, &mut batch, "final DogStatsD batch").await
348        {
349            total_sent += n;
350            batch_count += 1;
351        }
352
353        log::debug!(
354            "monoio DogStatsD export: {metric_count} metrics, {batch_count} batches, {total_sent} bytes"
355        );
356    }
357}
358
359#[cfg(feature = "monoio")]
360async fn send_monoio_batch(
361    socket: &monoio::net::udp::UdpSocket,
362    batch: &mut Vec<u8>,
363    context: &str,
364) -> Option<usize> {
365    let send_buf = std::mem::take(batch);
366    let (result, mut send_buf) = socket.send(send_buf).await;
367    send_buf.clear();
368    *batch = send_buf;
369
370    match result {
371        Ok(n) => Some(n),
372        Err(e) => {
373            log::warn!("Failed to send monoio {context}: {e}");
374            None
375        }
376    }
377}