//! Client for emitting DogStatsD metrics (count, distribution, gauge, histogram, set) to an
//! endpoint over UDP or, on Unix platforms, a Unix datagram socket.

// Every public item in this crate must carry documentation.
#![deny(missing_docs)]
// Outside of test builds, reject constructs that can panic at runtime so that sending metrics
// never aborts the host process.
#![cfg_attr(not(test), deny(clippy::panic))]
#![cfg_attr(not(test), deny(clippy::unwrap_used))]
#![cfg_attr(not(test), deny(clippy::expect_used))]
#![cfg_attr(not(test), deny(clippy::todo))]
#![cfg_attr(not(test), deny(clippy::unimplemented))]
9
10use libdd_common::tag::Tag;
14use libdd_common::Endpoint;
15use serde::{Deserialize, Serialize};
16use std::fmt::Debug;
17use tracing::error;
18
19use anyhow::anyhow;
20use cadence::prelude::*;
21#[cfg(unix)]
22use cadence::UnixMetricSink;
23use cadence::{Metric, MetricBuilder, QueuingMetricSink, StatsdClient, UdpMetricSink};
24#[cfg(unix)]
25use libdd_common::connector::uds::socket_path_from_uri;
26use std::net::{ToSocketAddrs, UdpSocket};
27#[cfg(unix)]
28use std::os::unix::net::UnixDatagram;
29use std::sync::{Arc, Mutex};
30
// Capacity handed to `QueuingMetricSink::with_capacity` for both the Unix datagram sink and the
// UDP sink built in `create_client`.
const QUEUE_SIZE: usize = 32 * 1024;
33
/// A single metric submission that owns all of its data.
///
/// Every variant is a tuple of `(metric name, value, tags)`. The variant selects which kind of
/// metric is emitted when the action is passed to [`Client::send_owned`].
#[derive(Debug, Serialize, Deserialize)]
pub enum DogStatsDActionOwned {
    #[allow(missing_docs)]
    Count(String, i64, Vec<Tag>),
    #[allow(missing_docs)]
    Distribution(String, f64, Vec<Tag>),
    #[allow(missing_docs)]
    Gauge(String, f64, Vec<Tag>),
    #[allow(missing_docs)]
    Histogram(String, f64, Vec<Tag>),
    /// A set metric. The value is an `i64`, which is what gets handed to the underlying statsd
    /// client's `set_with_tags`.
    Set(String, i64, Vec<Tag>),
}
57
/// A single metric submission that can borrow its data.
///
/// Every variant is a tuple of `(metric name, value, tags)`, where the name is anything that can
/// be viewed as a `&str` (`T`) and the tags are anything that iterates over `&Tag` (`V`). The
/// variant selects which kind of metric is emitted when the action is passed to
/// [`Client::send`].
#[derive(Debug, Serialize, Deserialize)]
pub enum DogStatsDAction<'a, T: AsRef<str>, V: IntoIterator<Item = &'a Tag>> {
    #[allow(missing_docs)]
    Count(T, i64, V),
    #[allow(missing_docs)]
    Distribution(T, f64, V),
    #[allow(missing_docs)]
    Gauge(T, f64, V),
    #[allow(missing_docs)]
    Histogram(T, f64, V),
    /// A set metric. The value is an `i64`, which is what gets handed to the underlying statsd
    /// client's `set_with_tags`.
    Set(T, i64, V),
}
75
/// Handle used to send metrics to a DogStatsD endpoint.
///
/// The underlying statsd client is not built when this value is constructed; it is created the
/// first time metrics are sent and then reused. A `Client` obtained through `Default` has no
/// endpoint, in which case sending is a no-op.
#[derive(Debug, Default)]
pub struct Client {
    // Lazily created statsd client. Holds `None` until the first successful initialisation; the
    // `Arc` lets senders keep using the client after the lock has been released.
    client: Mutex<Arc<Option<StatsdClient>>>,
    // Where metrics are sent. `None` means no client is ever created.
    endpoint: Option<Endpoint>,
}
82
83pub fn new(endpoint: Endpoint) -> anyhow::Result<Client> {
86 Ok(Client {
89 endpoint: Some(endpoint),
90 ..Default::default()
91 })
92}
93
94impl Client {
95 pub fn send_owned(&self, actions: Vec<DogStatsDActionOwned>) {
98 let client_opt = match self.get_or_init_client() {
99 Ok(client) => client,
100 Err(e) => {
101 error!(?e, "Failed to get client");
102 return;
103 }
104 };
105
106 if let Some(client) = &*client_opt {
107 for action in actions {
108 if let Err(err) = match action {
109 DogStatsDActionOwned::Count(metric, value, tags) => {
110 do_send(client.count_with_tags(metric.as_ref(), value), &tags)
111 }
112 DogStatsDActionOwned::Distribution(metric, value, tags) => {
113 do_send(client.distribution_with_tags(metric.as_ref(), value), &tags)
114 }
115 DogStatsDActionOwned::Gauge(metric, value, tags) => {
116 do_send(client.gauge_with_tags(metric.as_ref(), value), &tags)
117 }
118 DogStatsDActionOwned::Histogram(metric, value, tags) => {
119 do_send(client.histogram_with_tags(metric.as_ref(), value), &tags)
120 }
121 DogStatsDActionOwned::Set(metric, value, tags) => {
122 do_send(client.set_with_tags(metric.as_ref(), value), &tags)
123 }
124 } {
125 error!(?err, "Error while sending metric");
126 }
127 }
128 }
129 }
130
131 pub fn send<'a, T: AsRef<str>, V: IntoIterator<Item = &'a Tag>>(
134 &self,
135 actions: Vec<DogStatsDAction<'a, T, V>>,
136 ) {
137 let client_opt = match self.get_or_init_client() {
138 Ok(client) => client,
139 Err(e) => {
140 error!(?e, "Failed to get client");
141 return;
142 }
143 };
144 if let Some(client) = &*client_opt {
145 for action in actions {
146 if let Err(err) = match action {
147 DogStatsDAction::Count(metric, value, tags) => {
148 let metric_builder = client.count_with_tags(metric.as_ref(), value);
149 do_send(metric_builder, tags)
150 }
151 DogStatsDAction::Distribution(metric, value, tags) => {
152 do_send(client.distribution_with_tags(metric.as_ref(), value), tags)
153 }
154 DogStatsDAction::Gauge(metric, value, tags) => {
155 do_send(client.gauge_with_tags(metric.as_ref(), value), tags)
156 }
157 DogStatsDAction::Histogram(metric, value, tags) => {
158 do_send(client.histogram_with_tags(metric.as_ref(), value), tags)
159 }
160 DogStatsDAction::Set(metric, value, tags) => {
161 do_send(client.set_with_tags(metric.as_ref(), value), tags)
162 }
163 } {
164 error!(?err, "Error while sending metric");
165 }
166 }
167 }
168 }
169
170 fn get_or_init_client(&self) -> anyhow::Result<Arc<Option<StatsdClient>>> {
171 if let Some(endpoint) = &self.endpoint {
172 let mut client_guard = self
173 .client
174 .lock()
175 .map_err(|e| anyhow!("Failed to acquire dogstatsd client lock: {e}"))?;
176 return if client_guard.is_some() {
177 Ok(client_guard.clone())
178 } else {
179 let client = Arc::new(Some(create_client(endpoint)?));
180 *client_guard = client.clone();
181 Ok(client)
182 };
183 }
184
185 Ok(None.into())
186 }
187}
188
189fn do_send<'m, 't, T, V: IntoIterator<Item = &'t Tag>>(
190 mut builder: MetricBuilder<'m, '_, T>,
191 tags: V,
192) -> anyhow::Result<()>
193where
194 T: Metric + From<String>,
195 't: 'm,
196{
197 let mut tags_iter = tags.into_iter();
198 let mut tag_opt = tags_iter.next();
199 #[allow(clippy::unwrap_used)]
200 while tag_opt.is_some() {
201 builder = builder.with_tag_value(tag_opt.unwrap().as_ref());
202 tag_opt = tags_iter.next();
203 }
204 builder.try_send()?;
205 Ok(())
206}
207
208fn create_client(endpoint: &Endpoint) -> anyhow::Result<StatsdClient> {
209 match endpoint.url.scheme_str() {
210 #[cfg(unix)]
211 Some("unix") => {
212 let socket = UnixDatagram::unbound()
213 .map_err(|e| anyhow!("failed to make unbound unix port: {}", e))?;
214 socket
215 .set_nonblocking(true)
216 .map_err(|e| anyhow!("failed to set socket to nonblocking: {}", e))?;
217 let sink = QueuingMetricSink::with_capacity(
218 UnixMetricSink::from(
219 socket_path_from_uri(&endpoint.url)
220 .map_err(|e| anyhow!("failed to build socket path from uri: {}", e))?,
221 socket,
222 ),
223 QUEUE_SIZE,
224 );
225
226 Ok(StatsdClient::from_sink("", sink))
227 }
228 _ => {
229 let host = endpoint.url.host().ok_or(anyhow!("invalid host"))?;
230 let port = endpoint.url.port().ok_or(anyhow!("invalid port"))?.as_u16();
231
232 let server_address = (host, port)
233 .to_socket_addrs()?
234 .next()
235 .ok_or(anyhow!("invalid address"))?;
236
237 let socket = if server_address.is_ipv4() {
238 UdpSocket::bind("0.0.0.0:0")
239 .map_err(|e| anyhow!("failed to bind to 0.0.0.0:0: {}", e))?
240 } else {
241 UdpSocket::bind("[::]:0").map_err(|e| anyhow!("failed to bind to [::]:0: {}", e))?
242 };
243 socket.set_nonblocking(true)?;
244
245 let sink = QueuingMetricSink::with_capacity(
246 UdpMetricSink::from((host, port), socket)
247 .map_err(|e| anyhow!("failed to build UdpMetricSink: {}", e))?,
248 QUEUE_SIZE,
249 );
250
251 Ok(StatsdClient::from_sink("", sink))
252 }
253 }
254}
255
// Tests for client creation and for the datagrams produced by `Client::send`.
#[cfg(test)]
mod test {
    use crate::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set};
    use crate::{create_client, new, DogStatsDActionOwned};
    #[cfg(unix)]
    use http::Uri;
    #[cfg(unix)]
    use libdd_common::connector::uds::socket_path_to_uri;
    use libdd_common::{tag, Endpoint};
    use std::net;
    use std::sync::Arc;
    use std::time::Duration;

    // Sends one metric of each kind to a local UDP socket and checks the text of every
    // datagram received, in the order the metrics were submitted.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_flusher() {
        // Listening side: a loopback socket on an OS-assigned port with a 500 ms read timeout.
        let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
        let _ = socket.set_read_timeout(Some(Duration::from_millis(500)));

        let flusher = new(Endpoint::from_slice(
            socket.local_addr().unwrap().to_string().as_str(),
        ))
        .unwrap();
        flusher.send(vec![
            Count("test_count", 3, &vec![tag!("foo", "bar")]),
            Count("test_neg_count", -2, &vec![]),
            Distribution("test_distribution", 4.2, &vec![]),
            Gauge("test_gauge", 7.6, &vec![]),
            Histogram("test_histogram", 8.0, &vec![]),
            Set("test_set", 9, &vec![tag!("the", "end")]),
            Set("test_neg_set", -1, &vec![]),
        ]);

        // Receives one datagram into a zero-filled 100-byte buffer and returns it as a string
        // with the unused NUL padding removed.
        fn read(socket: &net::UdpSocket) -> String {
            let mut buf = [0; 100];
            socket.recv(&mut buf).expect("No data");
            let datagram = String::from_utf8_lossy(buf.strip_suffix(&[0]).unwrap());
            datagram.trim_matches(char::from(0)).to_string()
        }

        assert_eq!("test_count:3|c|#foo:bar", read(&socket));
        assert_eq!("test_neg_count:-2|c", read(&socket));
        assert_eq!("test_distribution:4.2|d", read(&socket));
        assert_eq!("test_gauge:7.6|g", read(&socket));
        assert_eq!("test_histogram:8|h", read(&socket));
        assert_eq!("test_set:9|s|#the:end", read(&socket));
        assert_eq!("test_neg_set:-1|s", read(&socket));
    }

    // Checks the error messages for endpoints without a usable host or port, and that
    // host:port endpoints (with or without an `http` scheme) produce a client.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_create_client_udp() {
        let res = create_client(&Endpoint::default());
        assert!(res.is_err());
        assert_eq!("invalid host", res.unwrap_err().to_string().as_str());

        let res = create_client(&Endpoint::from_slice("localhost:99999"));
        assert!(res.is_err());
        assert_eq!("invalid port", res.unwrap_err().to_string().as_str());

        let res = create_client(&Endpoint::from_slice("localhost:80"));
        assert!(res.is_ok());

        let res = create_client(&Endpoint::from_slice("http://localhost:80"));
        assert!(res.is_ok());
    }

    // Checks that a `unix` URL which does not encode a socket path is rejected with the
    // expected message, and that a URL built from a socket path is accepted.
    #[test]
    #[cfg(unix)]
    #[cfg_attr(miri, ignore)]
    fn test_create_client_unix_domain_socket() {
        let res = create_client(&Endpoint::from_url(
            "unix://localhost:80".parse::<Uri>().unwrap(),
        ));
        assert!(res.is_err());
        assert_eq!(
            "failed to build socket path from uri: invalid url",
            res.unwrap_err().to_string().as_str()
        );

        let res = create_client(&Endpoint::from_url(
            socket_path_to_uri("/path/to/a/socket.sock".as_ref()).unwrap(),
        ));
        assert!(res.is_ok());
    }

    // Compile-time guard: both matches below list every variant without a wildcard arm, so
    // adding a variant to either enum stops this test from compiling until it is updated.
    // NOTE(review): presumably meant to keep the owned and borrowed enums in step - confirm.
    #[test]
    fn test_owned_sync() {
        let owned_act = DogStatsDActionOwned::Count("test".to_string(), 1, vec![]);
        match owned_act {
            DogStatsDActionOwned::Count(_, _, _) => {}
            DogStatsDActionOwned::Distribution(_, _, _) => {}
            DogStatsDActionOwned::Gauge(_, _, _) => {}
            DogStatsDActionOwned::Histogram(_, _, _) => {}
            DogStatsDActionOwned::Set(_, _, _) => {}
        }

        let act = Count("test".to_string(), 1, vec![]);
        match act {
            Count(_, _, _) => {}
            Distribution(_, _, _) => {}
            Gauge(_, _, _) => {}
            Histogram(_, _, _) => {}
            Set(_, _, _) => {}
        }
    }

    // Shares one client between ten spawned tasks; each task sends a batch of metrics and
    // then checks that the lazily created statsd client is present.
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn test_thread_safety() {
        let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
        let _ = socket.set_read_timeout(Some(Duration::from_millis(500)));
        let endpoint = Endpoint::from_slice(socket.local_addr().unwrap().to_string().as_str());
        let flusher = Arc::new(new(endpoint.clone()).unwrap());

        // Before anything has been sent, no statsd client has been created yet.
        {
            let client = flusher
                .client
                .lock()
                .expect("failed to obtain lock on client");
            assert!(client.is_none());
        }

        let tasks: Vec<_> = (0..10)
            .map(|_| {
                let flusher_clone = Arc::clone(&flusher);
                tokio::spawn(async move {
                    flusher_clone.send(vec![
                        Count("test_count", 3, &vec![tag!("foo", "bar")]),
                        Count("test_neg_count", -2, &vec![]),
                        Distribution("test_distribution", 4.2, &vec![]),
                        Gauge("test_gauge", 7.6, &vec![]),
                        Histogram("test_histogram", 8.0, &vec![]),
                        Set("test_set", 9, &vec![tag!("the", "end")]),
                        Set("test_neg_set", -1, &vec![]),
                    ]);

                    let client = flusher_clone
                        .client
                        .lock()
                        .expect("failed to obtain lock on client within send thread");
                    assert!(client.is_some());
                })
            })
            .collect();

        // A panic inside any task (e.g. a failed assertion) surfaces here as a join error.
        for task in tasks {
            task.await.unwrap();
        }
    }
}