1use std::collections::VecDeque;
2use std::error;
3use std::fmt;
4use std::io::Error;
5use std::net::AddrParseError;
6use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
7use std::time;
8
9extern crate rand;
10
/// Errors reported while setting up a statsd client.
#[derive(Debug)]
pub enum StatsdError {
    /// An underlying I/O operation failed; wraps the original `std::io::Error`.
    IoError(Error),
    /// The server address could not be turned into a usable socket address.
    /// Carries a human-readable description.
    AddrParseError(String),
}

impl From<AddrParseError> for StatsdError {
    /// Converts an address-parse failure into [`StatsdError::AddrParseError`].
    ///
    /// The details of the original error are dropped; a fixed message is stored.
    fn from(_: AddrParseError) -> StatsdError {
        StatsdError::AddrParseError(String::from("Address parsing error"))
    }
}

impl From<Error> for StatsdError {
    /// Wraps an I/O error so that `?` can be used on I/O results.
    fn from(err: Error) -> StatsdError {
        StatsdError::IoError(err)
    }
}

impl fmt::Display for StatsdError {
    /// Prints the wrapped I/O error or the stored description, with no extra text.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            StatsdError::IoError(io_err) => write!(f, "{}", io_err),
            StatsdError::AddrParseError(description) => write!(f, "{}", description),
        }
    }
}

impl error::Error for StatsdError {}
39
40pub struct Client {
56 socket: UdpSocket,
57 server_address: SocketAddr,
58 prefix: String,
59 constant_tags: Vec<String>,
60}
61
62impl Client {
63 pub fn new<T: ToSocketAddrs>(
65 host: T,
66 prefix: &str,
67 constant_tags: Option<Vec<&str>>,
68 ) -> Result<Client, StatsdError> {
69 let server_address = host
70 .to_socket_addrs()?
71 .next()
72 .ok_or_else(|| StatsdError::AddrParseError("Address parsing error".to_string()))?;
73
74 let socket = if server_address.is_ipv4() {
77 UdpSocket::bind("0.0.0.0:0")?
78 } else {
79 UdpSocket::bind("[::]:0")?
80 };
81 Ok(Client {
82 socket,
83 prefix: prefix.to_string(),
84 server_address,
85 constant_tags: match constant_tags {
86 Some(tags) => tags.iter().map(|x| x.to_string()).collect(),
87 None => vec![],
88 },
89 })
90 }
91
92 pub fn incr(&self, metric: &str, tags: &Option<Vec<&str>>) {
102 self.count(metric, 1.0, tags);
103 }
104
105 pub fn decr(&self, metric: &str, tags: &Option<Vec<&str>>) {
115 self.count(metric, -1.0, tags);
116 }
117
118 pub fn count(&self, metric: &str, value: f64, tags: &Option<Vec<&str>>) {
128 let data = self.prepare_with_tags(format!("{}:{}|c", metric, value), tags);
129 self.send(data);
130 }
131
132 pub fn sampled_count(&self, metric: &str, value: f64, rate: f64, tags: &Option<Vec<&str>>) {
143 if rand::random::<f64>() >= rate {
144 return;
145 }
146 let data = self.prepare_with_tags(format!("{}:{}|c|@{}", metric, value, rate), tags);
147 self.send(data);
148 }
149
150 pub fn gauge(&self, metric: &str, value: f64, tags: &Option<Vec<&str>>) {
157 let data = self.prepare_with_tags(format!("{}:{}|g", metric, value), tags);
158 self.send(data);
159 }
160
161 pub fn timer(&self, metric: &str, value: f64, tags: &Option<Vec<&str>>) {
170 let data = self.prepare_with_tags(format!("{}:{}|ms", metric, value), tags);
171 self.send(data);
172 }
173
174 pub fn time<F, R>(&self, metric: &str, tags: &Option<Vec<&str>>, callable: F) -> R
186 where
187 F: FnOnce() -> R,
188 {
189 let start = time::Instant::now();
190 let return_val = callable();
191 let used = start.elapsed();
192 let data = self.prepare_with_tags(format!("{}:{}|ms", metric, used.as_millis()), tags);
193 self.send(data);
194 return_val
195 }
196
197 fn prepare<T: AsRef<str>>(&self, data: T) -> String {
198 if self.prefix.is_empty() {
199 data.as_ref().to_string()
200 } else {
201 format!("{}.{}", self.prefix, data.as_ref())
202 }
203 }
204
205 fn prepare_with_tags<T: AsRef<str>>(&self, data: T, tags: &Option<Vec<&str>>) -> String {
206 self.append_tags(self.prepare(data), tags)
207 }
208
209 fn append_tags<T: AsRef<str>>(&self, data: T, tags: &Option<Vec<&str>>) -> String {
210 if self.constant_tags.is_empty() && tags.is_none() {
211 data.as_ref().to_string()
212 } else {
213 let mut all_tags = self.constant_tags.clone();
214 match tags {
215 Some(v) => {
216 for tag in v {
217 all_tags.push(tag.to_string());
218 }
219 }
220 None => {
221 }
223 }
224 format!("{}|#{}", data.as_ref(), all_tags.join(","))
225 }
226 }
227
228 fn send(&self, data: String) {
230 let _ = self.socket.send_to(data.as_bytes(), self.server_address);
231 }
232
233 pub fn pipeline(&self) -> Pipeline {
243 Pipeline::new()
244 }
245
246 pub fn histogram(&self, metric: &str, value: f64, tags: &Option<Vec<&str>>) {
253 let data = self.prepare_with_tags(format!("{}:{}|h", metric, value), tags);
254 self.send(data);
255 }
256
257 pub fn event(&self, title: &str, text: &str, alert_type: AlertType, tags: &Option<Vec<&str>>) {
264 let mut d = vec![];
265 d.push(format!("_e{{{},{}}}:{}", title.len(), text.len(), title));
266 d.push(text.to_string());
267 if alert_type != AlertType::Info {
268 d.push(format!("t:{}", alert_type.to_string().to_lowercase()))
269 }
270 let event_with_tags = self.append_tags(d.join("|"), tags);
271 self.send(event_with_tags)
272 }
273
274 pub fn service_check(
281 &self,
282 service_check_name: &str,
283 status: ServiceCheckStatus,
284 tags: &Option<Vec<&str>>,
285 ) {
286 let mut d = vec![];
287 let status_code = (status as u32).to_string();
288 d.push("_sc");
289 d.push(service_check_name);
290 d.push(&status_code);
291 let sc_with_tags = self.append_tags(d.join("|"), tags);
292 self.send(sc_with_tags)
293 }
294}
295
/// Alert level attached to an event.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AlertType {
    Info,
    Error,
    Warning,
    Success,
}

impl fmt::Display for AlertType {
    /// Writes the variant name exactly as it is spelled in the enum
    /// (for example `Warning`).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let variant_name = match self {
            AlertType::Info => "Info",
            AlertType::Error => "Error",
            AlertType::Warning => "Warning",
            AlertType::Success => "Success",
        };
        f.write_str(variant_name)
    }
}
309
/// Status reported by a service check.
///
/// Each variant has a fixed numeric value, obtainable with `as u32`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ServiceCheckStatus {
    /// Numeric value `0`.
    Ok = 0,
    /// Numeric value `1`.
    Warning = 1,
    /// Numeric value `2`.
    Critical = 2,
    /// Numeric value `3`.
    Unknown = 3,
}
317
318pub struct Pipeline {
319 stats: VecDeque<String>,
320 max_udp_size: usize,
321}
322
323impl Default for Pipeline {
324 fn default() -> Self {
325 Self::new()
326 }
327}
328
329impl Pipeline {
330 pub fn new() -> Pipeline {
331 Pipeline {
332 stats: VecDeque::new(),
333 max_udp_size: 512,
334 }
335 }
336
337 pub fn set_max_udp_size(&mut self, max_udp_size: usize) {
346 self.max_udp_size = max_udp_size;
347 }
348
349 pub fn incr(&mut self, metric: &str) {
362 self.count(metric, 1.0);
363 }
364
365 pub fn decr(&mut self, metric: &str) {
378 self.count(metric, -1.0);
379 }
380
381 pub fn count(&mut self, metric: &str, value: f64) {
394 let data = format!("{}:{}|c", metric, value);
395 self.stats.push_back(data);
396 }
397
398 pub fn sampled_count(&mut self, metric: &str, value: f64, rate: f64) {
411 if rand::random::<f64>() >= rate {
412 return;
413 }
414 let data = format!("{}:{}|c|@{}", metric, value, rate);
415 self.stats.push_back(data);
416 }
417
418 pub fn gauge(&mut self, metric: &str, value: f64) {
428 let data = format!("{}:{}|g", metric, value);
429 self.stats.push_back(data);
430 }
431
432 pub fn timer(&mut self, metric: &str, value: f64) {
444 let data = format!("{}:{}|ms", metric, value);
445 self.stats.push_back(data);
446 }
447
448 pub fn time<F>(&mut self, metric: &str, callable: F)
463 where
464 F: FnOnce(),
465 {
466 let start = time::Instant::now();
467 callable();
468 let used = start.elapsed();
469 let data = format!("{}:{}|ms", metric, used.as_millis());
470 self.stats.push_back(data);
471 }
472
473 pub fn histogram(&mut self, metric: &str, value: f64) {
483 let data = format!("{}:{}|h", metric, value);
484 self.stats.push_back(data);
485 }
486
487 pub fn send(&mut self, client: &Client) {
489 let mut _data = String::new();
490 if let Some(data) = self.stats.pop_front() {
491 _data += client.prepare(&data).as_ref();
492 while !self.stats.is_empty() {
493 let stat = client.prepare(self.stats.pop_front().unwrap());
494 if data.len() + stat.len() + 1 > self.max_udp_size {
495 client.send(_data.clone());
496 _data.clear();
497 _data += &stat;
498 } else {
499 _data += "\n";
500 _data += &stat;
501 }
502 }
503 }
504 if !_data.is_empty() {
505 client.send(_data);
506 }
507 }
508}
509
#[cfg(test)]
mod test {
    use super::*;
    use std::net::UdpSocket;
    use std::str;
    use std::time::Duration;

    /// Binds a throwaway UDP server on the loopback interface and returns it
    /// together with its `ip:port` address.
    ///
    /// Port 0 lets the operating system pick a free port, so tests running in
    /// parallel never fight over the same port. The read timeout turns a lost
    /// datagram into a test failure instead of a test that hangs forever.
    fn make_server() -> (UdpSocket, String) {
        let server = UdpSocket::bind("127.0.0.1:0").expect("failed to bind the test server");
        server
            .set_read_timeout(Some(Duration::from_secs(5)))
            .expect("failed to set the read timeout of the test server");
        let host = server
            .local_addr()
            .expect("test server has no local address")
            .to_string();
        (server, host)
    }

    /// Receives one datagram from `server` and returns it as text.
    fn server_recv(server: &UdpSocket) -> String {
        let mut buf = [0; 512];
        let (len, _) = server
            .recv_from(&mut buf)
            .expect("No response from test server.");
        str::from_utf8(&buf[..len]).unwrap().to_string()
    }

    #[test]
    fn test_sending_gauge() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();

        client.gauge("metric", 9.1, &None);

        let response = server_recv(&server);
        assert_eq!("myapp.metric:9.1|g", response);
    }

    #[test]
    fn test_sending_gauge_without_prefix() {
        let (server, host) = make_server();
        let client = Client::new(&host, "", None).unwrap();

        client.gauge("metric", 9.1, &None);

        let response = server_recv(&server);
        assert_eq!("metric:9.1|g", response);
    }

    #[test]
    fn test_sending_incr() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();

        client.incr("metric", &None);

        let response = server_recv(&server);
        assert_eq!("myapp.metric:1|c", response);
    }

    #[test]
    fn test_sending_decr() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();

        client.decr("metric", &None);

        let response = server_recv(&server);
        assert_eq!("myapp.metric:-1|c", response);
    }

    #[test]
    fn test_sending_count() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();

        client.count("metric", 12.2, &None);

        let response = server_recv(&server);
        assert_eq!("myapp.metric:12.2|c", response);
    }

    #[test]
    fn test_sending_timer() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();

        client.timer("metric", 21.39, &None);

        let response = server_recv(&server);
        assert_eq!("myapp.metric:21.39|ms", response);
    }

    #[test]
    fn test_sending_timed_block() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        struct TimeTest {
            num: u8,
        }

        let mut t = TimeTest { num: 10 };
        let output = client.time("time_block", &None, || {
            t.num += 2;
            "a string"
        });

        let response = server_recv(&server);
        assert_eq!(output, "a string");
        assert_eq!(t.num, 12);
        assert!(response.contains("myapp.time_block"));
        assert!(response.contains("|ms"));
    }

    #[test]
    fn test_sending_histogram() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();

        // Without tags.
        client.histogram("metric", 9.1, &None);
        let mut response = server_recv(&server);
        assert_eq!("myapp.metric:9.1|h", response);

        // With tags.
        client.histogram("metric", 9.1, &Some(vec!["tag1", "tag2:test"]));
        response = server_recv(&server);
        assert_eq!("myapp.metric:9.1|h|#tag1,tag2:test", response);
    }

    #[test]
    fn test_sending_histogram_with_constant_tags() {
        let (server, host) = make_server();
        let client =
            Client::new(&host, "myapp", Some(vec!["tag1common", "tag2common:test"])).unwrap();

        // Constant tags only.
        client.histogram("metric", 9.1, &None);
        let mut response = server_recv(&server);
        assert_eq!("myapp.metric:9.1|h|#tag1common,tag2common:test", response);

        // Constant tags come first, per-call tags follow.
        let tags = &Some(vec!["tag1", "tag2:test"]);
        client.histogram("metric", 9.1, tags);
        response = server_recv(&server);
        assert_eq!(
            "myapp.metric:9.1|h|#tag1common,tag2common:test,tag1,tag2:test",
            response
        );

        // The same tag list can be reused for another call.
        client.histogram("metric", 19.12, tags);
        response = server_recv(&server);
        assert_eq!(
            "myapp.metric:19.12|h|#tag1common,tag2common:test,tag1,tag2:test",
            response
        );
    }

    #[test]
    fn test_sending_event_with_tags() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();

        client.event(
            "Title Test",
            "Text ABC",
            AlertType::Error,
            &Some(vec!["tag1", "tag2:test"]),
        );

        let response = server_recv(&server);
        assert_eq!(
            "_e{10,8}:Title Test|Text ABC|t:error|#tag1,tag2:test",
            response
        );
    }

    #[test]
    fn test_sending_service_check_with_tags() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();

        client.service_check(
            "Service.check.name",
            ServiceCheckStatus::Critical,
            &Some(vec!["tag1", "tag2:test"]),
        );

        let response = server_recv(&server);
        assert_eq!("_sc|Service.check.name|2|#tag1,tag2:test", response);
    }

    #[test]
    fn test_pipeline_sending_time_block() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        let mut pipeline = client.pipeline();
        pipeline.gauge("metric", 9.1);
        struct TimeTest {
            num: u8,
        }

        let mut t = TimeTest { num: 10 };
        pipeline.time("time_block", || {
            t.num += 2;
        });
        pipeline.send(&client);

        let response = server_recv(&server);
        assert_eq!(t.num, 12);
        // The measured duration depends on the machine, so only the parts
        // around it are checked.
        assert!(response.starts_with("myapp.metric:9.1|g\nmyapp.time_block:"));
        assert!(response.ends_with("|ms"));
    }

    #[test]
    fn test_pipeline_sending_gauge() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        let mut pipeline = client.pipeline();
        pipeline.gauge("metric", 9.1);
        pipeline.send(&client);

        let response = server_recv(&server);
        assert_eq!("myapp.metric:9.1|g", response);
    }

    #[test]
    fn test_pipeline_sending_histogram() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        let mut pipeline = client.pipeline();
        pipeline.histogram("metric", 9.1);
        pipeline.send(&client);

        let response = server_recv(&server);
        assert_eq!("myapp.metric:9.1|h", response);
    }

    #[test]
    fn test_pipeline_sending_multiple_data() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        let mut pipeline = client.pipeline();
        pipeline.gauge("metric", 9.1);
        pipeline.count("metric", 12.2);
        pipeline.send(&client);

        let response = server_recv(&server);
        assert_eq!("myapp.metric:9.1|g\nmyapp.metric:12.2|c", response);
    }

    #[test]
    fn test_pipeline_set_max_udp_size() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        let mut pipeline = client.pipeline();
        pipeline.set_max_udp_size(20);
        pipeline.gauge("metric", 9.1);
        pipeline.count("metric", 12.2);
        pipeline.send(&client);

        // The two metrics do not fit into 20 bytes together, so each one
        // arrives in its own datagram.
        let first = server_recv(&server);
        assert_eq!("myapp.metric:9.1|g", first);
        let second = server_recv(&server);
        assert_eq!("myapp.metric:12.2|c", second);
    }

    #[test]
    fn test_pipeline_send_metric_after_pipeline() {
        let (server, host) = make_server();
        let client = Client::new(&host, "myapp", None).unwrap();
        let mut pipeline = client.pipeline();

        pipeline.gauge("load", 9.0);
        pipeline.count("customers", 7.0);
        pipeline.send(&client);

        // A metric sent directly afterwards must not end up in the
        // pipeline's datagram.
        client.count("customers", 6.0, &None);

        let response = server_recv(&server);
        assert_eq!("myapp.load:9|g\nmyapp.customers:7|c", response);
    }
}