// fast_telemetry_export/dogstatsd.rs
use std::time::Duration;
11
/// Settings for the DogStatsD exporter loop.
///
/// Build one with [`DogStatsDConfig::new`] or [`Default::default`] and adjust it
/// with the `with_*` methods.
#[derive(Debug, Clone)]
pub struct DogStatsDConfig {
    /// Address the exporter's UDP socket is connected to, e.g. `"127.0.0.1:8125"`.
    pub endpoint: String,
    /// Time between two consecutive export cycles.
    pub interval: Duration,
    /// Maximum size, in bytes, of a single UDP datagram. Metric lines are packed
    /// into datagrams up to this size; a single line longer than this is dropped.
    pub max_packet_size: usize,
}
22
23impl Default for DogStatsDConfig {
24 fn default() -> Self {
25 Self {
26 endpoint: "127.0.0.1:8125".to_string(),
27 interval: Duration::from_secs(10),
28 max_packet_size: 8000,
29 }
30 }
31}
32
33impl DogStatsDConfig {
34 pub fn new(endpoint: impl Into<String>) -> Self {
35 Self {
36 endpoint: endpoint.into(),
37 ..Default::default()
38 }
39 }
40
41 pub fn with_interval(mut self, interval: Duration) -> Self {
42 self.interval = interval;
43 self
44 }
45
46 pub fn with_max_packet_size(mut self, size: usize) -> Self {
47 self.max_packet_size = size;
48 self
49 }
50}
51
52pub async fn run<F>(
85 config: DogStatsDConfig,
86 cancel: tokio_util::sync::CancellationToken,
87 mut export_fn: F,
88) where
89 F: FnMut(&mut String),
90{
91 use tokio::net::UdpSocket;
92 use tokio::time::MissedTickBehavior;
93
94 log::info!("Starting DogStatsD exporter, endpoint={}", config.endpoint);
95
96 let socket = match UdpSocket::bind("0.0.0.0:0").await {
97 Ok(s) => s,
98 Err(e) => {
99 log::error!("Failed to bind UDP socket for DogStatsD export: {e}");
100 return;
101 }
102 };
103
104 if let Err(e) = socket.connect(&config.endpoint).await {
105 log::error!("Failed to connect UDP socket to {}: {e}", config.endpoint);
106 return;
107 }
108
109 let max_packet_size = config.max_packet_size;
110 let mut output = String::with_capacity(16384);
111 let mut batch = Vec::<u8>::with_capacity(max_packet_size);
112
113 let mut interval = tokio::time::interval(config.interval);
114 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
115 interval.tick().await;
116
117 loop {
118 tokio::select! {
119 _ = interval.tick() => {}
120 _ = cancel.cancelled() => {
121 log::info!("DogStatsD exporter shutting down");
122 return;
123 }
124 }
125
126 output.clear();
127 export_fn(&mut output);
128
129 if output.is_empty() {
130 continue;
131 }
132
133 let output_bytes = output.as_bytes();
134 batch.clear();
135
136 let mut total_sent = 0usize;
137 let mut batch_count = 0usize;
138 let mut metric_count = 0usize;
139 let mut start = 0usize;
140
141 for nl in memchr::memchr_iter(b'\n', output_bytes) {
142 let end = nl + 1;
143 let line = &output_bytes[start..end];
144 let line_len = line.len();
145 metric_count += 1;
146
147 if line_len > max_packet_size {
148 log::warn!(
149 "Dropping oversized metric line ({line_len} bytes, max {max_packet_size})"
150 );
151 start = end;
152 continue;
153 }
154
155 if !batch.is_empty() && batch.len() + line_len > max_packet_size {
156 match socket.send(&batch).await {
157 Ok(n) => {
158 total_sent += n;
159 batch_count += 1;
160 }
161 Err(e) => log::warn!("Failed to send DogStatsD batch: {e}"),
162 }
163 batch.clear();
164 }
165
166 batch.extend_from_slice(line);
167 start = end;
168 }
169
170 if start < output_bytes.len() {
172 let line = &output_bytes[start..];
173 let line_len = line.len();
174 metric_count += 1;
175
176 if line_len <= max_packet_size {
177 if !batch.is_empty() && batch.len() + line_len > max_packet_size {
178 match socket.send(&batch).await {
179 Ok(n) => {
180 total_sent += n;
181 batch_count += 1;
182 }
183 Err(e) => log::warn!("Failed to send DogStatsD batch: {e}"),
184 }
185 batch.clear();
186 }
187 batch.extend_from_slice(line);
188 } else {
189 log::warn!("Dropping oversized trailing metric ({line_len} bytes)");
190 }
191 }
192
193 if !batch.is_empty() {
194 match socket.send(&batch).await {
195 Ok(n) => {
196 total_sent += n;
197 batch_count += 1;
198 }
199 Err(e) => log::warn!("Failed to send final DogStatsD batch: {e}"),
200 }
201 }
202
203 log::debug!(
204 "DogStatsD export: {metric_count} metrics, {batch_count} batches, {total_sent} bytes"
205 );
206 }
207}