metrics_exporter_statsd/
builder.rs

1use std::net::UdpSocket;
2use std::panic::RefUnwindSafe;
3use std::sync::Arc;
4
5use cadence::{
6    BufferedUdpMetricSink, MetricSink, QueuingMetricSink, StatsdClient, StatsdClientBuilder,
7};
8use metrics::SetRecorderError;
9
10use crate::recorder::StatsdRecorder;
11use crate::types::HistogramType;
12use thiserror::Error;
13
/// Statsd server host used by `StatsdBuilder::default`.
const DEFAULT_HOST: &str = "127.0.0.1";
/// Statsd server port used by `StatsdBuilder::default`.
const DEFAULT_PORT: u16 = 8125;
/// Queue capacity (number of queued metrics) configured by `StatsdBuilder::default`.
const DEFAULT_QUEUE_SIZE: usize = 5_000;
/// In-memory buffer capacity, in bytes, for the UDP sink.
const DEFAULT_BUFFER_SIZE: usize = 256;
/// Local address the client-side UDP socket binds to unless overridden.
const CLIENT_UDP_HOST: &str = "0.0.0.0";
19
20#[derive(Error, Debug)]
21pub enum StatsdError {
22    /// This error indicates that the caller has supplied an invalid/empty host name.
23    #[error("Empty hostname is not allowed")]
24    InvalidHost,
25
26    /// The caller specified port 0. In TCP/UDP programming generally, this is sometimes used to
27    /// tell the system "pick a port for me", but we don't support it.
28    #[error("Port number must be nonzero")]
29    InvalidPortZero,
30
31    /// MetricError indicates that there was an error reporting metrics to statsd, this is directly
32    /// mapped from [`cadence::MetricError`].
33    #[error("Metrics reporting error")]
34    MetricError {
35        #[from]
36        source: cadence::MetricError,
37    },
38
39    /// Any I/O-related errors, e.g. UDP connection/bind errors.
40    #[error(transparent)]
41    IoError(#[from] std::io::Error),
42
43    /// An error indicating that there was a problem registering [`StatsdRecorder`] with the
44    /// [`metrics-rs`] system.
45    #[error("Could not register the metrics recorder")]
46    RecorderError {
47        #[from]
48        source: SetRecorderError<StatsdRecorder>,
49    },
50}
51
52/// Type used as a wrapper for a custom sink.
53///
54/// The alternative would be `Box<dyn MetricSink + Sync + Send + ...>` but that would incur `dyn`
55/// overhead each time a metric is written, whereas this incurs `dyn` overhead only once (the one
56/// time this closure is called, from `StatsdBuilder::build`).
57type BoxedSinkClosure = Box<dyn FnOnce(&str) -> StatsdClientBuilder>;
58
59/// [`StatsdBuilder`] is responsible building and configuring a [`StatsdRecorder`].
60pub struct StatsdBuilder {
61    host: String,
62    port: u16,
63    queue_size: Option<usize>,
64    buffer_size: Option<usize>,
65    default_histogram: HistogramType,
66    client_udp_host: String,
67    default_tags: Vec<(String, String)>,
68    sink: Option<BoxedSinkClosure>,
69}
70
71impl StatsdBuilder {
72    /// Configures the [`StatsdBuilder`] with provided host and port number. A [`StatsdError`]
73    /// is returned to the caller if the values supplied for host and/or port are invalid.
74    /// You can further customize other variables like `queue_size` and `buffer_size` by calling
75    /// appropriate `with_*` methods on the builder.
76    pub fn from<S: Into<String>>(host: S, port: u16) -> Self {
77        StatsdBuilder {
78            host: host.into(),
79            port,
80            queue_size: None,
81            buffer_size: None,
82            default_histogram: HistogramType::Histogram,
83            client_udp_host: CLIENT_UDP_HOST.to_string(),
84            default_tags: Vec::new(),
85            sink: None,
86        }
87    }
88
89    /// Configure queue size for this builder, the queue size is eventually passed down to the
90    /// underlying StatsdClient to control how many elements should be allowed to buffer in a queue.
91    /// The default value for the queue size is `5000`, Statsd client will error out and drop the
92    /// new elements being sent to it once it hits this capacity.
93    pub fn with_queue_size(mut self, queue_size: usize) -> Self {
94        self.queue_size = Some(queue_size);
95        self
96    }
97
98    /// Buffer size controls how much should be buffered in StatsdClient's memory before they are
99    /// actually written out over the socket. This value is conservatively set to 256 bytes and
100    /// should be adjusted according to the application needs.
101    pub fn with_buffer_size(mut self, buffer_size: usize) -> Self {
102        self.buffer_size = Some(buffer_size);
103        self
104    }
105
106    /// Host address to which the local udp socket would be bound, this address defaults to
107    /// `0.0.0.0`. Be careful with using `127.0.0.1` as systems like kubernetes might blackhole
108    /// all the traffic routed to that address.
109    pub fn with_client_udp_host<S: Into<String>>(mut self, client_udp_host: S) -> Self {
110        self.client_udp_host = client_udp_host.into();
111        self
112    }
113
114    /// A hint for the metric emitter to determine how the histogram metrics should be emitted,
115    /// all the histogram metrics will be sent as distribution when running in this mode unless
116    /// specified otherwise via a label.
117    pub fn histogram_is_distribution(mut self) -> Self {
118        self.default_histogram = HistogramType::Distribution;
119        self
120    }
121
122    /// A hint for the metric emitter to determine how the histogram metrics should be emitted,
123    /// all the histogram metrics will be sent as timer when running in this mode unless specified
124    /// otherwise via a label.
125    pub fn histogram_is_timer(mut self) -> Self {
126        self.default_histogram = HistogramType::Timer;
127        self
128    }
129
130    /// Add a default tag with key and value to all statsd metrics produced with this recorder.
131    pub fn with_default_tag<K, V>(mut self, key: K, value: V) -> Self
132    where
133        K: ToString,
134        V: ToString,
135    {
136        self.default_tags.push((key.to_string(), value.to_string()));
137        self
138    }
139
140    /// Use a custom `MetricSink`.
141    ///
142    /// This method supersedes all other settings for metrics output, including the hostname and
143    /// port specified in [`StatsdBuilder::from`] and values passed to the `with_queue_size`,
144    /// `with_buffer_size`, and `with_client_udp_host` methods. The specified `sink` is used
145    /// instead.
146    ///
147    /// (When this method is not called, the builder creates a default sink using those settings,
148    /// [`cadence::QueuingMetricSink`], and [`cadence::UdpMetricSink`].)
149    ///
150    /// # Examples
151    ///
152    /// This code replaces the ordinary UDP sink with output to a Unix socket.
153    ///
154    /// ```no_run
155    /// use metrics_exporter_statsd::StatsdBuilder;
156    /// use cadence::BufferedUnixMetricSink;
157    /// use std::os::unix::net::UnixDatagram;
158    ///
159    /// # fn main() -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
160    /// let path = "/path/to/my/metrics/socket";
161    /// let socket = UnixDatagram::bind(path)?;
162    /// let sink = BufferedUnixMetricSink::from(path, socket);
163    ///
164    /// let recorder = StatsdBuilder::from("", 0)
165    ///     .with_sink(sink)
166    ///     .build(Some("my_app"))?;
167    /// metrics::set_global_recorder(recorder);
168    /// #     Ok(())
169    /// # }
170    /// ```
171    pub fn with_sink<T>(mut self, sink: T) -> Self
172    where
173        T: MetricSink + Sync + Send + RefUnwindSafe + 'static,
174    {
175        self.sink = Some(Box::new(move |prefix: &str| {
176            StatsdClient::builder(prefix, sink)
177        }));
178        self
179    }
180
181    /// This method is responsible building the StatsdRecorder. It configures the underlying metrics sink for
182    /// the [`StatsdClient`] with the values provided e.g. `queue_size`, `buffer_size` etc.
183    ///
184    /// All the metrics emitted from the recorder are prefixed with the prefix that's provided here.
185    ///
186    /// # Examples
187    /// ```
188    /// use metrics_exporter_statsd::StatsdBuilder;
189    /// let recorder = StatsdBuilder::from("localhost", 8125)
190    ///                .build(Some("prefix"))
191    ///                .expect("Could not create StatsdRecorder");
192    ///
193    /// metrics::set_global_recorder(recorder);
194    /// metrics::counter!("counter.name").increment(10);
195    /// ```
196    /// will emit a counter metric name as `prefix.counter.name`
197    pub fn build(self, prefix: Option<&str>) -> Result<StatsdRecorder, StatsdError> {
198        self.is_valid()?;
199
200        let prefix = prefix.unwrap_or("");
201        let mut builder = match self.sink {
202            Some(sink_fn) => sink_fn(prefix),
203            None => {
204                // create a local udp socket where the communication needs to happen, the port is set to
205                // 0 so that we can pick any available port on the host. We also want this socket to be
206                // non-blocking
207                let socket = UdpSocket::bind(format!("{}:{}", self.client_udp_host, 0))?;
208                socket.set_nonblocking(true)?;
209                // Initialize the statsd client with metrics sink that will be used to collect and send
210                // the metrics to the remote host.
211                let host = (self.host, self.port);
212
213                // Initialize buffered udp metrics sink with the provided or default capacity, this allows
214                // statsd client (cadence) to buffer metrics upto the configured size in memory before, flushing
215                // to network.
216                let udp_sink = BufferedUdpMetricSink::with_capacity(
217                    host,
218                    socket,
219                    self.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
220                )?;
221                // Initialize a bounded QueuingMetricSink so that we are not buffering unlimited items onto
222                // statsd client's queue, statsd client will error out when the queue is full.
223                let sink = QueuingMetricSink::with_capacity(
224                    udp_sink,
225                    self.queue_size.unwrap_or(DEFAULT_BUFFER_SIZE),
226                );
227                StatsdClient::builder(prefix, sink)
228            }
229        };
230
231        for (key, value) in self.default_tags {
232            builder = builder.with_tag(key, value);
233        }
234
235        Ok(StatsdRecorder {
236            statsd: Arc::new(builder.build()),
237            default_histogram: self.default_histogram,
238        })
239    }
240
241    fn is_valid(&self) -> Result<(), StatsdError> {
242        // Check settings only if we are going to use them.
243        if self.sink.is_none() {
244            if self.host.trim().is_empty() {
245                return Err(StatsdError::InvalidHost);
246            }
247            if self.port == 0 {
248                return Err(StatsdError::InvalidPortZero);
249            }
250        }
251        Ok(())
252    }
253}
254
255impl Default for StatsdBuilder {
256    fn default() -> Self {
257        StatsdBuilder {
258            host: DEFAULT_HOST.to_string(),
259            port: DEFAULT_PORT,
260            queue_size: Some(DEFAULT_QUEUE_SIZE),
261            buffer_size: Some(DEFAULT_BUFFER_SIZE),
262            default_histogram: HistogramType::Histogram,
263            client_udp_host: CLIENT_UDP_HOST.to_string(),
264            default_tags: Vec::new(),
265            sink: None,
266        }
267    }
268}
269
#[cfg(test)]
mod tests {
    use std::io;
    use std::net::UdpSocket;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use metrics::{Key, Label, Recorder};

    use super::*;

    /// A loopback "statsd server" socket paired with a recorder that sends metrics to it.
    pub struct Environ {
        server_socket: UdpSocket,
        recorder: StatsdRecorder,
    }

    impl Environ {
        /// Binds a loopback server socket on an OS-assigned port, with a read timeout, and returns
        /// it with a builder pointed at it (queue size 1, buffer size 10).
        fn setup() -> (UdpSocket, StatsdBuilder) {
            let server_socket = UdpSocket::bind("127.0.0.1:0")
                .expect("localhost should always be a valid socket address");
            server_socket
                .set_read_timeout(Some(Duration::from_secs(2)))
                .expect("failed to set the read timeout on our localhost socket");
            let port = server_socket
                .local_addr()
                .expect("socket should have a local addr")
                .port();

            let builder = StatsdBuilder::from("127.0.0.1", port)
                .with_queue_size(1)
                .with_buffer_size(10);
            (server_socket, builder)
        }

        /// Builds an environment, letting `configure` customize the builder before `build`.
        fn configured<F>(prefix: Option<&str>, configure: F) -> Self
        where
            F: FnOnce(StatsdBuilder) -> StatsdBuilder,
        {
            let (server_socket, builder) = Environ::setup();
            let recorder = configure(builder)
                .build(prefix)
                .expect("test env should build a valid recorder");
            Environ {
                server_socket,
                recorder,
            }
        }

        pub fn new(prefix: Option<&str>) -> Self {
            Environ::configured(prefix, |builder| builder)
        }

        pub fn new_histogram_is_distribution() -> Self {
            Environ::configured(None, StatsdBuilder::histogram_is_distribution)
        }

        pub fn new_histogram_is_timer() -> Self {
            Environ::configured(None, StatsdBuilder::histogram_is_timer)
        }

        /// Receives one datagram on the server socket and decodes it as UTF-8.
        fn receive_on_server(&self) -> String {
            let mut buff = [0; 100];

            let size = self
                .server_socket
                .recv(&mut buff)
                .expect("could not receive on server socket");
            let request =
                std::str::from_utf8(&buff[..size]).expect("request is not a valid UTF-8 string");
            String::from(request)
        }
    }

    static METADATA: metrics::Metadata =
        metrics::Metadata::new(module_path!(), metrics::Level::INFO, Some(module_path!()));

    #[test]
    fn bad_host_name() {
        let result = StatsdBuilder::from("", 10).build(None);
        assert!(matches!(result, Err(StatsdError::InvalidHost)));
    }

    #[test]
    fn blank_host_name() {
        let result = StatsdBuilder::from("   ", 10).build(None);
        assert!(matches!(result, Err(StatsdError::InvalidHost)));
    }

    #[test]
    fn bad_port() {
        let result = StatsdBuilder::from("127.0.0.1", 0).build(None);
        assert!(matches!(result, Err(StatsdError::InvalidPortZero)));
    }

    #[test]
    fn default_builder_is_valid() {
        assert!(StatsdBuilder::default().build(None).is_ok());
    }

    #[test]
    fn counter() {
        let env = Environ::new(None);
        let key = Key::from_name("counter.name");
        let counter = env.recorder.register_counter(&key, &METADATA);
        counter.increment(1);
        assert_eq!("counter.name:1|c", env.receive_on_server());
    }

    #[test]
    fn counter_with_tags() {
        let env = Environ::new(None);
        let tags = vec![Label::new("t1", "v1"), Label::new("t2", "v2")];
        let key = Key::from(("counter.name", tags));

        let counter = env.recorder.register_counter(&key, &METADATA);
        counter.increment(10);
        assert_eq!("counter.name:10|c|#t1:v1,t2:v2", env.receive_on_server());
    }

    #[test]
    fn gauge() {
        let env = Environ::new(None);
        let key = Key::from_name("gauge.name");
        let gauge = env.recorder.register_gauge(&key, &METADATA);
        gauge.set(50.25);
        assert_eq!("gauge.name:50.25|g", env.receive_on_server());
    }

    #[test]
    fn gauge_with_tags() {
        let env = Environ::new(None);
        let tags = vec![Label::new("t1", "v1"), Label::new("t2", "v2")];
        let key = Key::from(("gauge.name", tags));
        let gauge = env.recorder.register_gauge(&key, &METADATA);
        gauge.set(50.25);
        assert_eq!("gauge.name:50.25|g|#t1:v1,t2:v2", env.receive_on_server());
    }

    #[test]
    fn histogram() {
        let env = Environ::new(None);
        let key = Key::from_name("histogram.name");
        let histogram = env.recorder.register_histogram(&key, &METADATA);
        histogram.record(100.00);
        assert_eq!("histogram.name:100|h", env.receive_on_server());
    }

    #[test]
    fn histogram_with_decimals() {
        let env = Environ::new(None);
        let key = Key::from_name("histogram.name");
        let histogram = env.recorder.register_histogram(&key, &METADATA);
        histogram.record(100.52);
        assert_eq!("histogram.name:100.52|h", env.receive_on_server());
    }

    #[test]
    fn distribution_with_decimals() {
        let env = Environ::new_histogram_is_distribution();
        let key = Key::from_name("distribution.name");

        let histogram = env.recorder.register_histogram(&key, &METADATA);
        histogram.record(100.52);
        assert_eq!("distribution.name:100.52|d", env.receive_on_server());
    }

    #[test]
    fn histogram_with_tags() {
        let env = Environ::new(None);
        let tags = vec![Label::new("t1", "v1"), Label::new("t2", "v2")];
        let key = Key::from(("histogram.name", tags));

        let histogram = env.recorder.register_histogram(&key, &METADATA);
        histogram.record(100.00);
        assert_eq!("histogram.name:100|h|#t1:v1,t2:v2", env.receive_on_server());
    }

    #[test]
    fn histogram_as_distribution() {
        let env = Environ::new(None);
        let tags = vec![
            Label::new("t1", "v1"),
            Label::new("t2", "v2"),
            Label::new("histogram", "distribution"),
        ];
        let key = Key::from(("distribution.name", tags));

        let histogram = env.recorder.register_histogram(&key, &METADATA);
        histogram.record(100.00);
        assert_eq!(
            "distribution.name:100|d|#t1:v1,t2:v2",
            env.receive_on_server()
        );
    }

    #[test]
    fn distribution_with_prefix() {
        let env = Environ::new(Some("blackbird"));
        let tags = vec![
            Label::new("t1", "v1"),
            Label::new("t2", "v2"),
            Label::new("histogram", "distribution"),
        ];
        let key = Key::from(("distribution.name", tags));

        let histogram = env.recorder.register_histogram(&key, &METADATA);
        histogram.record(100.00);
        assert_eq!(
            "blackbird.distribution.name:100|d|#t1:v1,t2:v2",
            env.receive_on_server()
        );
    }

    #[test]
    fn histogram_with_prefix() {
        let env = Environ::new(Some("blackbird"));
        let tags = vec![Label::new("t1", "v1"), Label::new("t2", "v2")];
        let key = Key::from(("histogram.name", tags));

        let histogram = env.recorder.register_histogram(&key, &METADATA);
        histogram.record(100.00);
        assert_eq!(
            "blackbird.histogram.name:100|h|#t1:v1,t2:v2",
            env.receive_on_server()
        );
    }

    #[test]
    fn histogram_as_timer() {
        let env = Environ::new(None);
        let tags = vec![
            Label::new("t1", "v1"),
            Label::new("t2", "v2"),
            Label::new("histogram", "timer"),
        ];
        let key = Key::from(("histogram.name", tags));

        let histogram = env.recorder.register_histogram(&key, &METADATA);
        histogram.record(100.00);
        // metrics-rs reports the unit as seconds and we convert it to ms
        assert_eq!(
            "histogram.name:100000|ms|#t1:v1,t2:v2",
            env.receive_on_server()
        );
    }

    #[test]
    fn default_histogram_to_distribution() {
        let env = Environ::new_histogram_is_distribution();
        let tags = vec![Label::new("t1", "v1"), Label::new("t2", "v2")];
        let key = Key::from(("histogram.name", tags));

        let histogram = env.recorder.register_histogram(&key, &METADATA);
        histogram.record(100.00);
        assert_eq!("histogram.name:100|d|#t1:v1,t2:v2", env.receive_on_server());
    }

    #[test]
    fn default_histogram_to_timer() {
        let env = Environ::new_histogram_is_timer();
        let tags = vec![Label::new("t1", "v1"), Label::new("t2", "v2")];
        let key = Key::from(("histogram.name", tags));

        let histogram = env.recorder.register_histogram(&key, &METADATA);
        histogram.record(100.00);
        // metrics-rs reports the unit as seconds and we convert it to ms
        assert_eq!(
            "histogram.name:100000|ms|#t1:v1,t2:v2",
            env.receive_on_server()
        );
    }

    #[test]
    fn prefix() {
        let env = Environ::new(Some("koelbird"));
        let key = Key::from_name("counter.name");
        let counter = env.recorder.register_counter(&key, &METADATA);
        counter.increment(1);
        assert_eq!("koelbird.counter.name:1|c", env.receive_on_server());
    }

    #[test]
    fn test_default_tags() {
        let env = Environ::configured(None, |builder| {
            builder
                .with_default_tag("app_name", "test")
                .with_default_tag("blackbird_cluster", "magenta")
        });

        let key = Key::from_name("counter.name");
        let counter = env.recorder.register_counter(&key, &METADATA);

        counter.increment(1);
        assert_eq!(
            "counter.name:1|c|#app_name:test,blackbird_cluster:magenta",
            env.receive_on_server()
        );
    }

    #[test]
    fn test_custom_sink() {
        struct BadSink {
            data: Arc<Mutex<String>>,
        }

        impl MetricSink for BadSink {
            fn emit(&self, metric: &str) -> io::Result<usize> {
                let mut writer = self.data.lock().unwrap();
                *writer += metric;
                writer.push('\n');
                Ok(metric.len())
            }
        }

        let s = Arc::new(Mutex::new(String::new()));
        // An empty host and port 0 are fine here: they are ignored when a custom sink is used.
        let recorder = StatsdBuilder::from("", 0)
            .with_sink(BadSink {
                data: Arc::clone(&s),
            })
            .build(Some("example_app"))
            .expect("should build a recorder with custom sink");

        let key = Key::from_name("counter.name");
        let counter = recorder.register_counter(&key, &METADATA);
        counter.increment(1);

        let guard = s.lock().unwrap();
        assert_eq!(guard.as_str(), "example_app.counter.name:1|c\n");
    }
}