1use std::net::UdpSocket;
2use std::panic::RefUnwindSafe;
3use std::sync::Arc;
4
5use cadence::{
6 BufferedUdpMetricSink, MetricSink, QueuingMetricSink, StatsdClient, StatsdClientBuilder,
7};
8use metrics::SetRecorderError;
9
10use crate::recorder::StatsdRecorder;
11use crate::types::HistogramType;
12use thiserror::Error;
13
14const DEFAULT_HOST: &str = "127.0.0.1";
15const DEFAULT_PORT: u16 = 8125;
16const DEFAULT_QUEUE_SIZE: usize = 5000;
17const DEFAULT_BUFFER_SIZE: usize = 256;
18const CLIENT_UDP_HOST: &str = "0.0.0.0";
19
20#[derive(Error, Debug)]
21pub enum StatsdError {
22 #[error("Empty hostname is not allowed")]
24 InvalidHost,
25
26 #[error("Port number must be nonzero")]
29 InvalidPortZero,
30
31 #[error("Metrics reporting error")]
34 MetricError {
35 #[from]
36 source: cadence::MetricError,
37 },
38
39 #[error(transparent)]
41 IoError(#[from] std::io::Error),
42
43 #[error("Could not register the metrics recorder")]
46 RecorderError {
47 #[from]
48 source: SetRecorderError<StatsdRecorder>,
49 },
50}
51
52type BoxedSinkClosure = Box<dyn FnOnce(&str) -> StatsdClientBuilder>;
58
59pub struct StatsdBuilder {
61 host: String,
62 port: u16,
63 queue_size: Option<usize>,
64 buffer_size: Option<usize>,
65 default_histogram: HistogramType,
66 client_udp_host: String,
67 default_tags: Vec<(String, String)>,
68 sink: Option<BoxedSinkClosure>,
69}
70
71impl StatsdBuilder {
72 pub fn from<S: Into<String>>(host: S, port: u16) -> Self {
77 StatsdBuilder {
78 host: host.into(),
79 port,
80 queue_size: None,
81 buffer_size: None,
82 default_histogram: HistogramType::Histogram,
83 client_udp_host: CLIENT_UDP_HOST.to_string(),
84 default_tags: Vec::new(),
85 sink: None,
86 }
87 }
88
89 pub fn with_queue_size(mut self, queue_size: usize) -> Self {
94 self.queue_size = Some(queue_size);
95 self
96 }
97
98 pub fn with_buffer_size(mut self, buffer_size: usize) -> Self {
102 self.buffer_size = Some(buffer_size);
103 self
104 }
105
106 pub fn with_client_udp_host<S: Into<String>>(mut self, client_udp_host: S) -> Self {
110 self.client_udp_host = client_udp_host.into();
111 self
112 }
113
114 pub fn histogram_is_distribution(mut self) -> Self {
118 self.default_histogram = HistogramType::Distribution;
119 self
120 }
121
122 pub fn histogram_is_timer(mut self) -> Self {
126 self.default_histogram = HistogramType::Timer;
127 self
128 }
129
130 pub fn with_default_tag<K, V>(mut self, key: K, value: V) -> Self
132 where
133 K: ToString,
134 V: ToString,
135 {
136 self.default_tags.push((key.to_string(), value.to_string()));
137 self
138 }
139
140 pub fn with_sink<T>(mut self, sink: T) -> Self
172 where
173 T: MetricSink + Sync + Send + RefUnwindSafe + 'static,
174 {
175 self.sink = Some(Box::new(move |prefix: &str| {
176 StatsdClient::builder(prefix, sink)
177 }));
178 self
179 }
180
181 pub fn build(self, prefix: Option<&str>) -> Result<StatsdRecorder, StatsdError> {
198 self.is_valid()?;
199
200 let prefix = prefix.unwrap_or("");
201 let mut builder = match self.sink {
202 Some(sink_fn) => sink_fn(prefix),
203 None => {
204 let socket = UdpSocket::bind(format!("{}:{}", self.client_udp_host, 0))?;
208 socket.set_nonblocking(true)?;
209 let host = (self.host, self.port);
212
213 let udp_sink = BufferedUdpMetricSink::with_capacity(
217 host,
218 socket,
219 self.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
220 )?;
221 let sink = QueuingMetricSink::with_capacity(
224 udp_sink,
225 self.queue_size.unwrap_or(DEFAULT_BUFFER_SIZE),
226 );
227 StatsdClient::builder(prefix, sink)
228 }
229 };
230
231 for (key, value) in self.default_tags {
232 builder = builder.with_tag(key, value);
233 }
234
235 Ok(StatsdRecorder {
236 statsd: Arc::new(builder.build()),
237 default_histogram: self.default_histogram,
238 })
239 }
240
241 fn is_valid(&self) -> Result<(), StatsdError> {
242 if self.sink.is_none() {
244 if self.host.trim().is_empty() {
245 return Err(StatsdError::InvalidHost);
246 }
247 if self.port == 0 {
248 return Err(StatsdError::InvalidPortZero);
249 }
250 }
251 Ok(())
252 }
253}
254
255impl Default for StatsdBuilder {
256 fn default() -> Self {
257 StatsdBuilder {
258 host: DEFAULT_HOST.to_string(),
259 port: DEFAULT_PORT,
260 queue_size: Some(DEFAULT_QUEUE_SIZE),
261 buffer_size: Some(DEFAULT_BUFFER_SIZE),
262 default_histogram: HistogramType::Histogram,
263 client_udp_host: CLIENT_UDP_HOST.to_string(),
264 default_tags: Vec::new(),
265 sink: None,
266 }
267 }
268}
269
270#[cfg(test)]
271mod tests {
272 use std::io;
273 use std::net::UdpSocket;
274 use std::sync::{Arc, Mutex};
275 use std::time::Duration;
276
277 use metrics::{Key, Label, Recorder};
278
279 use super::*;
280
281 pub struct Environ {
282 server_socket: UdpSocket,
283 recorder: StatsdRecorder,
284 }
285
286 impl Environ {
287 fn setup() -> (UdpSocket, StatsdBuilder) {
288 let server_socket = UdpSocket::bind("127.0.0.1:0")
289 .expect("localhost should always be a valid socket address");
290 server_socket
291 .set_read_timeout(Some(Duration::from_secs(2)))
292 .expect("failed to set the read timeout on our localhost socket");
293 let port = server_socket
294 .local_addr()
295 .expect("socket should have a local addr")
296 .port();
297
298 let builder = StatsdBuilder::from("127.0.0.1", port)
299 .with_queue_size(1)
300 .with_buffer_size(10);
301 (server_socket, builder)
302 }
303
304 pub fn new(prefix: Option<&str>) -> Self {
305 let (server_socket, builder) = Environ::setup();
306 let recorder = builder
307 .build(prefix)
308 .expect("test env should build a valid recorder");
309 Environ {
310 server_socket,
311 recorder,
312 }
313 }
314
315 pub fn new_histogram_is_distribution() -> Self {
316 let (server_socket, builder) = Environ::setup();
317 let recorder = builder
318 .histogram_is_distribution()
319 .build(None)
320 .expect("test env should build a valid recorder");
321 Environ {
322 server_socket,
323 recorder,
324 }
325 }
326
327 pub fn new_histogram_is_timer() -> Self {
328 let (server_socket, builder) = Environ::setup();
329 let recorder = builder
330 .histogram_is_timer()
331 .build(None)
332 .expect("test env should build a valid recorder");
333 Environ {
334 server_socket,
335 recorder,
336 }
337 }
338
339 fn receive_on_server(&self) -> String {
340 let mut buff = [0; 100];
341
342 let size = self
343 .server_socket
344 .recv(&mut buff)
345 .expect("could not receive on server socket");
346 let data = &buff[..size];
347 let request = std::str::from_utf8(data).expect("request is no a valid UTF-8 string");
348 String::from(request)
349 }
350 }
351
352 static METADATA: metrics::Metadata =
353 metrics::Metadata::new(module_path!(), metrics::Level::INFO, Some(module_path!()));
354
355 #[test]
356 #[should_panic]
357 fn bad_host_name() {
358 StatsdBuilder::from("", 10)
359 .build(None)
360 .expect("this should panic");
361 }
362
363 #[test]
364 #[should_panic]
365 fn bad_port() {
366 StatsdBuilder::from("127.0.0.1", 0)
367 .build(None)
368 .expect("this should panic");
369 }
370
371 #[test]
372 fn counter() {
373 let env = Environ::new(None);
374 let key = Key::from_name("counter.name");
375 let counter = env.recorder.register_counter(&key, &METADATA);
376 counter.increment(1);
377 assert_eq!("counter.name:1|c", env.receive_on_server());
378 }
379
380 #[test]
381 fn counter_with_tags() {
382 let env = Environ::new(None);
383 let tags = vec![Label::new("t1", "v1"), Label::new("t2", "v2")];
384 let key = Key::from(("counter.name", tags));
385
386 let coutner = env.recorder.register_counter(&key, &METADATA);
387 coutner.increment(10);
388 assert_eq!("counter.name:10|c|#t1:v1,t2:v2", env.receive_on_server());
389 }
390
391 #[test]
392 fn gauge() {
393 let env = Environ::new(None);
394 let key = Key::from_name("gauge.name");
395 let gauge = env.recorder.register_gauge(&key, &METADATA);
396 gauge.set(50.25);
397 assert_eq!("gauge.name:50.25|g", env.receive_on_server());
398 }
399
400 #[test]
401 fn gauge_with_tags() {
402 let env = Environ::new(None);
403 let tags = vec![Label::new("t1", "v1"), Label::new("t2", "v2")];
404 let key = Key::from(("gauge.name", tags));
405 let gauge = env.recorder.register_gauge(&key, &METADATA);
406 gauge.set(50.25);
407 assert_eq!("gauge.name:50.25|g|#t1:v1,t2:v2", env.receive_on_server());
408 }
409
410 #[test]
411 fn histogram() {
412 let env = Environ::new(None);
413 let key = Key::from_name("histogram.name");
414 let histogram = env.recorder.register_histogram(&key, &METADATA);
415 histogram.record(100.00);
416 assert_eq!("histogram.name:100|h", env.receive_on_server());
417 }
418
419 #[test]
420 fn histogram_with_decimals() {
421 let env = Environ::new(None);
422 let key = Key::from_name("histogram.name");
423 let histogram = env.recorder.register_histogram(&key, &METADATA);
424 histogram.record(100.52);
425 assert_eq!("histogram.name:100.52|h", env.receive_on_server());
426 }
427
428 #[test]
429 fn distribution_with_decimals() {
430 let env = Environ::new_histogram_is_distribution();
431 let key = Key::from_name("distribution.name");
432
433 let histogram = env.recorder.register_histogram(&key, &METADATA);
434 histogram.record(100.52);
435 assert_eq!("distribution.name:100.52|d", env.receive_on_server());
436 }
437
438 #[test]
439 fn histogram_with_tags() {
440 let env = Environ::new(None);
441 let tags = vec![Label::new("t1", "v1"), Label::new("t2", "v2")];
442 let key = Key::from(("histogram.name", tags));
443
444 let histogram = env.recorder.register_histogram(&key, &METADATA);
445 histogram.record(100.00);
446 assert_eq!("histogram.name:100|h|#t1:v1,t2:v2", env.receive_on_server());
447 }
448
449 #[test]
450 fn histogram_as_distribution() {
451 let env = Environ::new(None);
452 let tags = vec![
453 Label::new("t1", "v1"),
454 Label::new("t2", "v2"),
455 Label::new("histogram", "distribution"),
456 ];
457 let key = Key::from(("distribution.name", tags));
458
459 let histogram = env.recorder.register_histogram(&key, &METADATA);
460 histogram.record(100.00);
461 assert_eq!(
462 "distribution.name:100|d|#t1:v1,t2:v2",
463 env.receive_on_server()
464 );
465 }
466
467 #[test]
468 fn distribution_with_prefix() {
469 let env = Environ::new(Some("blackbird"));
470 let tags = vec![
471 Label::new("t1", "v1"),
472 Label::new("t2", "v2"),
473 Label::new("histogram", "distribution"),
474 ];
475 let key = Key::from(("distribution.name", tags));
476
477 let histogram = env.recorder.register_histogram(&key, &METADATA);
478 histogram.record(100.00);
479 assert_eq!(
480 "blackbird.distribution.name:100|d|#t1:v1,t2:v2",
481 env.receive_on_server()
482 );
483 }
484
485 #[test]
486 fn histogram_with_prefix() {
487 let env = Environ::new(Some("blackbird"));
488 let tags = vec![Label::new("t1", "v1"), Label::new("t2", "v2")];
489 let key = Key::from(("histogram.name", tags));
490
491 let histogram = env.recorder.register_histogram(&key, &METADATA);
492 histogram.record(100.00);
493 assert_eq!(
494 "blackbird.histogram.name:100|h|#t1:v1,t2:v2",
495 env.receive_on_server()
496 );
497 }
498
499 #[test]
500 fn histogram_as_timer() {
501 let env = Environ::new(None);
502 let tags = vec![
503 Label::new("t1", "v1"),
504 Label::new("t2", "v2"),
505 Label::new("histogram", "timer"),
506 ];
507 let key = Key::from(("histogram.name", tags));
508
509 let histogram = env.recorder.register_histogram(&key, &METADATA);
510 histogram.record(100.00);
511 assert_eq!(
513 "histogram.name:100000|ms|#t1:v1,t2:v2",
514 env.receive_on_server()
515 );
516 }
517
518 #[test]
519 fn default_histogram_to_distribution() {
520 let env = Environ::new_histogram_is_distribution();
521 let tags = vec![Label::new("t1", "v1"), Label::new("t2", "v2")];
522 let key = Key::from(("histogram.name", tags));
523
524 let histogram = env.recorder.register_histogram(&key, &METADATA);
525 histogram.record(100.00);
526 assert_eq!("histogram.name:100|d|#t1:v1,t2:v2", env.receive_on_server());
527 }
528
529 #[test]
530 fn default_histogram_to_timer() {
531 let env = Environ::new_histogram_is_timer();
532 let tags = vec![Label::new("t1", "v1"), Label::new("t2", "v2")];
533 let key = Key::from(("histogram.name", tags));
534
535 let histogram = env.recorder.register_histogram(&key, &METADATA);
536 histogram.record(100.00);
537 assert_eq!(
539 "histogram.name:100000|ms|#t1:v1,t2:v2",
540 env.receive_on_server()
541 );
542 }
543
544 #[test]
545 fn prefix() {
546 let env = Environ::new(Some("koelbird"));
547 let key = Key::from_name("counter.name");
548 let counter = env.recorder.register_counter(&key, &METADATA);
549 counter.increment(1);
550 assert_eq!("koelbird.counter.name:1|c", env.receive_on_server());
551 }
552
553 #[test]
554 fn test_default_tags() {
555 let (server_socket, builder) = Environ::setup();
556 let recorder = builder
557 .with_default_tag("app_name", "test")
558 .with_default_tag("blackbird_cluster", "magenta")
559 .build(None)
560 .expect("test env should build a valid recorder");
561 let env = Environ {
562 server_socket,
563 recorder,
564 };
565
566 let key = Key::from_name("counter.name");
567 let counter = env.recorder.register_counter(&key, &METADATA);
568
569 counter.increment(1);
570 assert_eq!(
571 "counter.name:1|c|#app_name:test,blackbird_cluster:magenta",
572 env.receive_on_server()
573 );
574 }
575
576 #[test]
577 fn test_custom_sink() {
578 struct BadSink {
579 data: Arc<Mutex<String>>,
580 }
581
582 impl MetricSink for BadSink {
583 fn emit(&self, metric: &str) -> io::Result<usize> {
584 let mut writer = self.data.lock().unwrap();
585 *writer += metric;
586 writer.push('\n');
587 Ok(metric.len())
588 }
589 }
590
591 let s = Arc::new(Mutex::new(String::new()));
592 let recorder = StatsdBuilder::from("", 0)
593 .with_sink(BadSink {
594 data: Arc::clone(&s),
595 })
596 .build(Some("example_app"))
597 .expect("should build a recorder with custom sink");
598
599 let key = Key::from_name("counter.name");
600 let counter = recorder.register_counter(&key, &METADATA);
601 counter.increment(1);
602
603 let guard = s.lock().unwrap();
604 assert_eq!(guard.as_str(), "example_app.counter.name:1|c\n");
605 }
606}