Skip to main content

libdd_dogstatsd_client/
lib.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3#![deny(missing_docs)]
4#![cfg_attr(not(test), deny(clippy::panic))]
5#![cfg_attr(not(test), deny(clippy::unwrap_used))]
6#![cfg_attr(not(test), deny(clippy::expect_used))]
7#![cfg_attr(not(test), deny(clippy::todo))]
8#![cfg_attr(not(test), deny(clippy::unimplemented))]
9
10//! dogstatsd-client implements a client to emit metrics to a dogstatsd server.
11//! This is made use of in at least the data-pipeline and sidecar crates.
12
13use libdd_common::tag::Tag;
14use libdd_common::Endpoint;
15use serde::{Deserialize, Serialize};
16use std::fmt::Debug;
17use tracing::error;
18
19use anyhow::anyhow;
20use cadence::prelude::*;
21#[cfg(unix)]
22use cadence::UnixMetricSink;
23use cadence::{Metric, MetricBuilder, QueuingMetricSink, StatsdClient, UdpMetricSink};
24#[cfg(unix)]
25use libdd_common::connector::uds::socket_path_from_uri;
26use std::net::{ToSocketAddrs, UdpSocket};
27#[cfg(unix)]
28use std::os::unix::net::UnixDatagram;
29use std::sync::{Arc, Mutex};
30
// Capacity handed to the queuing metric sinks: 32 Ki (32 * 1024) queued elements at most.
const QUEUE_SIZE: usize = 32 << 10;
33
34/// The `DogStatsDActionOwned` enum gathers the metric types that can be sent to the DogStatsD
35/// server. This type takes ownership of the relevant data to support the sidecar better.
36/// For documentation on the dogstatsd metric types: https://docs.datadoghq.com/metrics/types/?tab=count#metric-types
37///
38/// Originally I attempted to combine this type with `DogStatsDAction` but this GREATLY complicates
39/// the types to the point of insanity. I was unable to come up with a satisfactory approach that
40/// allows both the data-pipeline and sidecar crates to use the same type. If a future rustacean
41/// wants to take a stab and open a PR please do so!
42#[derive(Debug, Serialize, Deserialize)]
43pub enum DogStatsDActionOwned {
44    #[allow(missing_docs)]
45    Count(String, i64, Vec<Tag>),
46    #[allow(missing_docs)]
47    Distribution(String, f64, Vec<Tag>),
48    #[allow(missing_docs)]
49    Gauge(String, f64, Vec<Tag>),
50    #[allow(missing_docs)]
51    Histogram(String, f64, Vec<Tag>),
52    /// Cadence only support i64 type as value
53    /// but Golang implementation uses string (https://github.com/DataDog/datadog-go/blob/331d24832f7eac97b091efd696278fe2c4192b29/statsd/statsd.go#L230)
54    /// and PHP implementation uses float or string (https://github.com/DataDog/php-datadogstatsd/blob/0efdd1c38f6d3dd407efbb899ad1fd2e5cd18085/src/DogStatsd.php#L251)
55    Set(String, i64, Vec<Tag>),
56}
57
58/// The `DogStatsDAction` enum gathers the metric types that can be sent to the DogStatsD server.
59#[derive(Debug, Serialize, Deserialize)]
60pub enum DogStatsDAction<'a, T: AsRef<str>, V: IntoIterator<Item = &'a Tag>> {
61    // TODO: instead of AsRef<str> we can accept a marker Trait that users of this crate implement
62    #[allow(missing_docs)]
63    Count(T, i64, V),
64    #[allow(missing_docs)]
65    Distribution(T, f64, V),
66    #[allow(missing_docs)]
67    Gauge(T, f64, V),
68    #[allow(missing_docs)]
69    Histogram(T, f64, V),
70    /// Cadence only support i64 type as value
71    /// but Golang implementation uses string (https://github.com/DataDog/datadog-go/blob/331d24832f7eac97b091efd696278fe2c4192b29/statsd/statsd.go#L230)
72    /// and PHP implementation uses float or string (https://github.com/DataDog/php-datadogstatsd/blob/0efdd1c38f6d3dd407efbb899ad1fd2e5cd18085/src/DogStatsd.php#L251)
73    Set(T, i64, V),
74}
75
76/// A dogstatsd-client that flushes stats to a given endpoint.
77#[derive(Debug, Default)]
78pub struct Client {
79    client: Mutex<Arc<Option<StatsdClient>>>,
80    endpoint: Option<Endpoint>,
81}
82
83/// Build a new flusher instance pointed at the provided endpoint.
84/// Returns error if the provided endpoint is not valid.
85pub fn new(endpoint: Endpoint) -> anyhow::Result<Client> {
86    // defer initialization of the client until the first metric is sent and we definitely know the
87    // client is going to be used to communicate with the endpoint.
88    Ok(Client {
89        endpoint: Some(endpoint),
90        ..Default::default()
91    })
92}
93
94impl Client {
95    /// Send a vector of DogStatsDActionOwned, this is the same as `send` except it uses the "owned"
96    /// version of DogStatsDAction. See the docs for DogStatsDActionOwned for details.
97    pub fn send_owned(&self, actions: Vec<DogStatsDActionOwned>) {
98        let client_opt = match self.get_or_init_client() {
99            Ok(client) => client,
100            Err(e) => {
101                error!(?e, "Failed to get client");
102                return;
103            }
104        };
105
106        if let Some(client) = &*client_opt {
107            for action in actions {
108                if let Err(err) = match action {
109                    DogStatsDActionOwned::Count(metric, value, tags) => {
110                        do_send(client.count_with_tags(metric.as_ref(), value), &tags)
111                    }
112                    DogStatsDActionOwned::Distribution(metric, value, tags) => {
113                        do_send(client.distribution_with_tags(metric.as_ref(), value), &tags)
114                    }
115                    DogStatsDActionOwned::Gauge(metric, value, tags) => {
116                        do_send(client.gauge_with_tags(metric.as_ref(), value), &tags)
117                    }
118                    DogStatsDActionOwned::Histogram(metric, value, tags) => {
119                        do_send(client.histogram_with_tags(metric.as_ref(), value), &tags)
120                    }
121                    DogStatsDActionOwned::Set(metric, value, tags) => {
122                        do_send(client.set_with_tags(metric.as_ref(), value), &tags)
123                    }
124                } {
125                    error!(?err, "Error while sending metric");
126                }
127            }
128        }
129    }
130
131    /// Send a vector of DogStatsDAction, this is the same as `send_owned` except it only borrows
132    /// the provided values.See the docs for DogStatsDActionOwned for details.
133    pub fn send<'a, T: AsRef<str>, V: IntoIterator<Item = &'a Tag>>(
134        &self,
135        actions: Vec<DogStatsDAction<'a, T, V>>,
136    ) {
137        let client_opt = match self.get_or_init_client() {
138            Ok(client) => client,
139            Err(e) => {
140                error!(?e, "Failed to get client");
141                return;
142            }
143        };
144        if let Some(client) = &*client_opt {
145            for action in actions {
146                if let Err(err) = match action {
147                    DogStatsDAction::Count(metric, value, tags) => {
148                        let metric_builder = client.count_with_tags(metric.as_ref(), value);
149                        do_send(metric_builder, tags)
150                    }
151                    DogStatsDAction::Distribution(metric, value, tags) => {
152                        do_send(client.distribution_with_tags(metric.as_ref(), value), tags)
153                    }
154                    DogStatsDAction::Gauge(metric, value, tags) => {
155                        do_send(client.gauge_with_tags(metric.as_ref(), value), tags)
156                    }
157                    DogStatsDAction::Histogram(metric, value, tags) => {
158                        do_send(client.histogram_with_tags(metric.as_ref(), value), tags)
159                    }
160                    DogStatsDAction::Set(metric, value, tags) => {
161                        do_send(client.set_with_tags(metric.as_ref(), value), tags)
162                    }
163                } {
164                    error!(?err, "Error while sending metric");
165                }
166            }
167        }
168    }
169
170    fn get_or_init_client(&self) -> anyhow::Result<Arc<Option<StatsdClient>>> {
171        if let Some(endpoint) = &self.endpoint {
172            let mut client_guard = self
173                .client
174                .lock()
175                .map_err(|e| anyhow!("Failed to acquire dogstatsd client lock: {e}"))?;
176            return if client_guard.is_some() {
177                Ok(client_guard.clone())
178            } else {
179                let client = Arc::new(Some(create_client(endpoint)?));
180                *client_guard = client.clone();
181                Ok(client)
182            };
183        }
184
185        Ok(None.into())
186    }
187}
188
189fn do_send<'m, 't, T, V: IntoIterator<Item = &'t Tag>>(
190    mut builder: MetricBuilder<'m, '_, T>,
191    tags: V,
192) -> anyhow::Result<()>
193where
194    T: Metric + From<String>,
195    't: 'm,
196{
197    let mut tags_iter = tags.into_iter();
198    let mut tag_opt = tags_iter.next();
199    #[allow(clippy::unwrap_used)]
200    while tag_opt.is_some() {
201        builder = builder.with_tag_value(tag_opt.unwrap().as_ref());
202        tag_opt = tags_iter.next();
203    }
204    builder.try_send()?;
205    Ok(())
206}
207
208fn create_client(endpoint: &Endpoint) -> anyhow::Result<StatsdClient> {
209    match endpoint.url.scheme_str() {
210        #[cfg(unix)]
211        Some("unix") => {
212            let socket = UnixDatagram::unbound()
213                .map_err(|e| anyhow!("failed to make unbound unix port: {}", e))?;
214            socket
215                .set_nonblocking(true)
216                .map_err(|e| anyhow!("failed to set socket to nonblocking: {}", e))?;
217            let sink = QueuingMetricSink::with_capacity(
218                UnixMetricSink::from(
219                    socket_path_from_uri(&endpoint.url)
220                        .map_err(|e| anyhow!("failed to build socket path from uri: {}", e))?,
221                    socket,
222                ),
223                QUEUE_SIZE,
224            );
225
226            Ok(StatsdClient::from_sink("", sink))
227        }
228        _ => {
229            let host = endpoint.url.host().ok_or(anyhow!("invalid host"))?;
230            let port = endpoint.url.port().ok_or(anyhow!("invalid port"))?.as_u16();
231
232            let server_address = (host, port)
233                .to_socket_addrs()?
234                .next()
235                .ok_or(anyhow!("invalid address"))?;
236
237            let socket = if server_address.is_ipv4() {
238                UdpSocket::bind("0.0.0.0:0")
239                    .map_err(|e| anyhow!("failed to bind to 0.0.0.0:0: {}", e))?
240            } else {
241                UdpSocket::bind("[::]:0").map_err(|e| anyhow!("failed to bind to [::]:0: {}", e))?
242            };
243            socket.set_nonblocking(true)?;
244
245            let sink = QueuingMetricSink::with_capacity(
246                UdpMetricSink::from((host, port), socket)
247                    .map_err(|e| anyhow!("failed to build UdpMetricSink: {}", e))?,
248                QUEUE_SIZE,
249            );
250
251            Ok(StatsdClient::from_sink("", sink))
252        }
253    }
254}
255
#[cfg(test)]
mod test {
    use crate::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set};
    use crate::{create_client, new, DogStatsDActionOwned};
    #[cfg(unix)]
    use http::Uri;
    #[cfg(unix)]
    use libdd_common::connector::uds::socket_path_to_uri;
    use libdd_common::{tag, Endpoint};
    use std::net;
    use std::sync::Arc;
    use std::time::Duration;

    // Sends one metric of every kind to a local UDP socket and checks the datagrams received.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_flusher() {
        let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
        let _ = socket.set_read_timeout(Some(Duration::from_millis(500)));

        let flusher = new(Endpoint::from_slice(
            socket.local_addr().unwrap().to_string().as_str(),
        ))
        .unwrap();
        flusher.send(vec![
            Count("test_count", 3, &vec![tag!("foo", "bar")]),
            Count("test_neg_count", -2, &vec![]),
            Distribution("test_distribution", 4.2, &vec![]),
            Gauge("test_gauge", 7.6, &vec![]),
            Histogram("test_histogram", 8.0, &vec![]),
            Set("test_set", 9, &vec![tag!("the", "end")]),
            Set("test_neg_set", -1, &vec![]),
        ]);

        // Reads one datagram and returns exactly the bytes that were received, so the result
        // does not depend on the buffer's zero padding.
        fn read(socket: &net::UdpSocket) -> String {
            let mut buf = [0u8; 100];
            let len = socket.recv(&mut buf).expect("No data");
            String::from_utf8_lossy(&buf[..len]).into_owned()
        }

        assert_eq!("test_count:3|c|#foo:bar", read(&socket));
        assert_eq!("test_neg_count:-2|c", read(&socket));
        assert_eq!("test_distribution:4.2|d", read(&socket));
        assert_eq!("test_gauge:7.6|g", read(&socket));
        assert_eq!("test_histogram:8|h", read(&socket));
        assert_eq!("test_set:9|s|#the:end", read(&socket));
        assert_eq!("test_neg_set:-1|s", read(&socket));
    }

    // Checks the error messages for unusable endpoints and that valid UDP endpoints succeed.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_create_client_udp() {
        let res = create_client(&Endpoint::default());
        assert!(res.is_err());
        assert_eq!("invalid host", res.unwrap_err().to_string().as_str());

        let res = create_client(&Endpoint::from_slice("localhost:99999"));
        assert!(res.is_err());
        assert_eq!("invalid port", res.unwrap_err().to_string().as_str());

        let res = create_client(&Endpoint::from_slice("localhost:80"));
        assert!(res.is_ok());

        let res = create_client(&Endpoint::from_slice("http://localhost:80"));
        assert!(res.is_ok());
    }

    // Checks the unix-socket branch: an invalid `unix` URL fails, a socket-path URI succeeds.
    #[test]
    #[cfg(unix)]
    #[cfg_attr(miri, ignore)]
    fn test_create_client_unix_domain_socket() {
        let res = create_client(&Endpoint::from_url(
            "unix://localhost:80".parse::<Uri>().unwrap(),
        ));
        assert!(res.is_err());
        assert_eq!(
            "failed to build socket path from uri: invalid url",
            res.unwrap_err().to_string().as_str()
        );

        let res = create_client(&Endpoint::from_url(
            socket_path_to_uri("/path/to/a/socket.sock".as_ref()).unwrap(),
        ));
        assert!(res.is_ok());
    }

    #[test]
    fn test_owned_sync() {
        // This test ensures that if a new variant is added to either `DogStatsDActionOwned` or
        // `DogStatsDAction` this test will NOT COMPILE to act as a reminder that BOTH locations
        // must be updated.
        let owned_act = DogStatsDActionOwned::Count("test".to_string(), 1, vec![]);
        match owned_act {
            DogStatsDActionOwned::Count(_, _, _) => {}
            DogStatsDActionOwned::Distribution(_, _, _) => {}
            DogStatsDActionOwned::Gauge(_, _, _) => {}
            DogStatsDActionOwned::Histogram(_, _, _) => {}
            DogStatsDActionOwned::Set(_, _, _) => {}
        }

        let act = Count("test".to_string(), 1, vec![]);
        match act {
            Count(_, _, _) => {}
            Distribution(_, _, _) => {}
            Gauge(_, _, _) => {}
            Histogram(_, _, _) => {}
            Set(_, _, _) => {}
        }
        // TODO: when std::mem::variant_count is in stable we can do this instead
        // assert_eq!(
        //     std::mem::variant_count::<DogStatsDActionOwned>(),
        //     std::mem::variant_count::<DogStatsDAction<String, Vec<&Tag>>>(),
        //     "DogStatsDActionOwned and DogStatsDAction should have the same number of variants,
        // did you forget to update one?", );
    }

    // Sends from several tasks sharing one flusher: the client starts out uninitialized and is
    // initialized once any task has sent.
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn test_thread_safety() {
        let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
        let _ = socket.set_read_timeout(Some(Duration::from_millis(500)));
        let endpoint = Endpoint::from_slice(socket.local_addr().unwrap().to_string().as_str());
        let flusher = Arc::new(new(endpoint.clone()).unwrap());

        {
            let client = flusher
                .client
                .lock()
                .expect("failed to obtain lock on client");
            assert!(client.is_none());
        }

        let tasks: Vec<_> = (0..10)
            .map(|_| {
                let flusher_clone = Arc::clone(&flusher);
                tokio::spawn(async move {
                    flusher_clone.send(vec![
                        Count("test_count", 3, &vec![tag!("foo", "bar")]),
                        Count("test_neg_count", -2, &vec![]),
                        Distribution("test_distribution", 4.2, &vec![]),
                        Gauge("test_gauge", 7.6, &vec![]),
                        Histogram("test_histogram", 8.0, &vec![]),
                        Set("test_set", 9, &vec![tag!("the", "end")]),
                        Set("test_neg_set", -1, &vec![]),
                    ]);

                    let client = flusher_clone
                        .client
                        .lock()
                        .expect("failed to obtain lock on client within send thread");
                    assert!(client.is_some());
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }
    }
}
415}