fast_telemetry_export/
dogstatsd.rs1use std::time::Duration;
11
/// Settings for the periodic DogStatsD UDP exporter.
///
/// `Debug` is derived alongside `Clone` so a configuration can be logged or
/// inspected with `{:?}`.
#[derive(Clone, Debug)]
pub struct DogStatsDConfig {
    /// Address of the DogStatsD agent; the exporter connects its UDP socket to it.
    pub endpoint: String,
    /// Period between successive export rounds.
    pub interval: Duration,
    /// Largest datagram, in bytes, the exporter will send. Metric lines are
    /// batched up to this size; a single line longer than this is dropped.
    pub max_packet_size: usize,
}
22
23impl Default for DogStatsDConfig {
24 fn default() -> Self {
25 Self {
26 endpoint: "127.0.0.1:8125".to_string(),
27 interval: Duration::from_secs(10),
28 max_packet_size: 8000,
29 }
30 }
31}
32
33impl DogStatsDConfig {
34 pub fn new(endpoint: impl Into<String>) -> Self {
35 Self {
36 endpoint: endpoint.into(),
37 ..Default::default()
38 }
39 }
40
41 pub fn with_interval(mut self, interval: Duration) -> Self {
42 self.interval = interval;
43 self
44 }
45
46 pub fn with_max_packet_size(mut self, size: usize) -> Self {
47 self.max_packet_size = size;
48 self
49 }
50}
51
52pub async fn run<F>(
85 config: DogStatsDConfig,
86 cancel: tokio_util::sync::CancellationToken,
87 mut export_fn: F,
88) where
89 F: FnMut(&mut String),
90{
91 use tokio::net::UdpSocket;
92 use tokio::time::MissedTickBehavior;
93
94 log::info!("Starting DogStatsD exporter, endpoint={}", config.endpoint);
95
96 let socket = match UdpSocket::bind("0.0.0.0:0").await {
97 Ok(s) => s,
98 Err(e) => {
99 log::error!("Failed to bind UDP socket for DogStatsD export: {e}");
100 return;
101 }
102 };
103
104 if let Err(e) = socket.connect(&config.endpoint).await {
105 log::error!("Failed to connect UDP socket to {}: {e}", config.endpoint);
106 return;
107 }
108
109 let max_packet_size = config.max_packet_size;
110 let mut output = String::with_capacity(16384);
111 let mut batch = Vec::<u8>::with_capacity(max_packet_size);
112
113 let mut interval = tokio::time::interval(config.interval);
114 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
115 interval.tick().await;
116
117 loop {
118 tokio::select! {
119 _ = interval.tick() => {}
120 _ = cancel.cancelled() => {
121 log::info!("DogStatsD exporter shutting down");
122 return;
123 }
124 }
125
126 output.clear();
127 export_fn(&mut output);
128
129 if output.is_empty() {
130 continue;
131 }
132
133 let output_bytes = output.as_bytes();
134 batch.clear();
135
136 let mut total_sent = 0usize;
137 let mut batch_count = 0usize;
138 let mut metric_count = 0usize;
139 let mut start = 0usize;
140
141 for nl in memchr::memchr_iter(b'\n', output_bytes) {
142 let end = nl + 1;
143 let line = &output_bytes[start..end];
144 let line_len = line.len();
145 metric_count += 1;
146
147 if line_len > max_packet_size {
148 log::warn!(
149 "Dropping oversized metric line ({line_len} bytes, max {max_packet_size})"
150 );
151 start = end;
152 continue;
153 }
154
155 if !batch.is_empty() && batch.len() + line_len > max_packet_size {
156 match socket.send(&batch).await {
157 Ok(n) => {
158 total_sent += n;
159 batch_count += 1;
160 }
161 Err(e) => log::warn!("Failed to send DogStatsD batch: {e}"),
162 }
163 batch.clear();
164 }
165
166 batch.extend_from_slice(line);
167 start = end;
168 }
169
170 if start < output_bytes.len() {
172 let line = &output_bytes[start..];
173 let line_len = line.len();
174 metric_count += 1;
175
176 if line_len <= max_packet_size {
177 if !batch.is_empty() && batch.len() + line_len > max_packet_size {
178 match socket.send(&batch).await {
179 Ok(n) => {
180 total_sent += n;
181 batch_count += 1;
182 }
183 Err(e) => log::warn!("Failed to send DogStatsD batch: {e}"),
184 }
185 batch.clear();
186 }
187 batch.extend_from_slice(line);
188 } else {
189 log::warn!("Dropping oversized trailing metric ({line_len} bytes)");
190 }
191 }
192
193 if !batch.is_empty() {
194 match socket.send(&batch).await {
195 Ok(n) => {
196 total_sent += n;
197 batch_count += 1;
198 }
199 Err(e) => log::warn!("Failed to send final DogStatsD batch: {e}"),
200 }
201 }
202
203 log::debug!(
204 "DogStatsD export: {metric_count} metrics, {batch_count} batches, {total_sent} bytes"
205 );
206 }
207}
208
209#[cfg(feature = "monoio")]
215pub async fn run_monoio<F>(
216 config: DogStatsDConfig,
217 cancel: tokio_util::sync::CancellationToken,
218 mut export_fn: F,
219) where
220 F: FnMut(&mut String),
221{
222 use std::net::{SocketAddr, ToSocketAddrs};
223
224 use monoio::net::udp::UdpSocket;
225 use monoio::time::MissedTickBehavior;
226
227 log::info!(
228 "Starting monoio DogStatsD exporter, endpoint={}",
229 config.endpoint
230 );
231
232 let endpoint = match config.endpoint.to_socket_addrs() {
233 Ok(mut addrs) => match addrs.next() {
234 Some(addr) => addr,
235 None => {
236 log::error!("DogStatsD endpoint resolved to no addresses");
237 return;
238 }
239 },
240 Err(e) => {
241 log::error!(
242 "Failed to resolve DogStatsD endpoint {}: {e}",
243 config.endpoint
244 );
245 return;
246 }
247 };
248
249 let bind_addr: SocketAddr = if endpoint.is_ipv4() {
250 "0.0.0.0:0"
251 } else {
252 "[::]:0"
253 }
254 .parse()
255 .expect("valid UDP bind address");
256
257 let socket = match UdpSocket::bind(bind_addr) {
258 Ok(s) => s,
259 Err(e) => {
260 log::error!("Failed to bind monoio UDP socket for DogStatsD export: {e}");
261 return;
262 }
263 };
264
265 if let Err(e) = socket.connect(endpoint).await {
266 log::error!("Failed to connect monoio UDP socket to {endpoint}: {e}");
267 return;
268 }
269
270 let max_packet_size = config.max_packet_size;
271 let mut output = String::with_capacity(16384);
272 let mut batch = Vec::<u8>::with_capacity(max_packet_size);
273
274 let mut interval = monoio::time::interval(config.interval);
275 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
276 interval.tick().await;
277
278 loop {
279 monoio::select! {
280 _ = interval.tick() => {}
281 _ = cancel.cancelled() => {
282 log::info!("monoio DogStatsD exporter shutting down");
283 return;
284 }
285 }
286
287 output.clear();
288 export_fn(&mut output);
289
290 if output.is_empty() {
291 continue;
292 }
293
294 let output_bytes = output.as_bytes();
295 batch.clear();
296
297 let mut total_sent = 0usize;
298 let mut batch_count = 0usize;
299 let mut metric_count = 0usize;
300 let mut start = 0usize;
301
302 for nl in memchr::memchr_iter(b'\n', output_bytes) {
303 let end = nl + 1;
304 let line = &output_bytes[start..end];
305 let line_len = line.len();
306 metric_count += 1;
307
308 if line_len > max_packet_size {
309 log::warn!(
310 "Dropping oversized metric line ({line_len} bytes, max {max_packet_size})"
311 );
312 start = end;
313 continue;
314 }
315
316 if !batch.is_empty() && batch.len() + line_len > max_packet_size {
317 if let Some(n) = send_monoio_batch(&socket, &mut batch, "DogStatsD batch").await {
318 total_sent += n;
319 batch_count += 1;
320 }
321 }
322
323 batch.extend_from_slice(line);
324 start = end;
325 }
326
327 if start < output_bytes.len() {
328 let line = &output_bytes[start..];
329 let line_len = line.len();
330 metric_count += 1;
331
332 if line_len <= max_packet_size {
333 if !batch.is_empty() && batch.len() + line_len > max_packet_size {
334 if let Some(n) = send_monoio_batch(&socket, &mut batch, "DogStatsD batch").await
335 {
336 total_sent += n;
337 batch_count += 1;
338 }
339 }
340 batch.extend_from_slice(line);
341 } else {
342 log::warn!("Dropping oversized trailing metric ({line_len} bytes)");
343 }
344 }
345
346 if !batch.is_empty()
347 && let Some(n) = send_monoio_batch(&socket, &mut batch, "final DogStatsD batch").await
348 {
349 total_sent += n;
350 batch_count += 1;
351 }
352
353 log::debug!(
354 "monoio DogStatsD export: {metric_count} metrics, {batch_count} batches, {total_sent} bytes"
355 );
356 }
357}
358
#[cfg(feature = "monoio")]
/// Sends the contents of `batch` as one datagram over `socket`.
///
/// Returns the number of bytes sent, or `None` after logging a warning that
/// names `context` when the send fails. In both cases `batch` ends up empty
/// and keeps the buffer the send handed back.
async fn send_monoio_batch(
    socket: &monoio::net::udp::UdpSocket,
    batch: &mut Vec<u8>,
    context: &str,
) -> Option<usize> {
    // The send call takes the buffer by value and returns it with the result,
    // so the vector is moved out of `batch` and put back afterwards.
    let (outcome, mut returned) = socket.send(std::mem::take(batch)).await;
    returned.clear();
    *batch = returned;

    outcome
        .inspect_err(|e| log::warn!("Failed to send monoio {context}: {e}"))
        .ok()
}