1use crate::client::{MetricBackend, StatsdClient};
13use crate::types::{Metric, MetricError, MetricResult};
14use std::fmt::{self, Write};
15use std::marker::PhantomData;
16
/// The kind of metric being emitted.
///
/// Its `Display` output is the short type code that follows the `|`
/// separator in a formatted metric line.
#[derive(Debug, Clone, Copy)]
enum MetricType {
    Counter,
    Timer,
    Gauge,
    Meter,
    Histogram,
    Set,
    Distribution,
}

impl fmt::Display for MetricType {
    /// Writes the type code for this kind of metric (for example `c` for a
    /// counter or `ms` for a timer).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let code = match self {
            Self::Counter => "c",
            Self::Timer => "ms",
            Self::Gauge => "g",
            Self::Meter => "m",
            Self::Histogram => "h",
            Self::Set => "s",
            Self::Distribution => "d",
        };

        // Delegate to the string's own `Display` so that any width or
        // alignment requested by the caller is still honored.
        fmt::Display::fmt(code, f)
    }
}
42
/// The value (or values) carried by a single metric.
///
/// Each numeric kind comes in a scalar form and a "packed" form that holds
/// several values in one metric.
#[derive(Debug, Clone)]
pub enum MetricValue {
    Signed(i64),
    PackedSigned(Vec<i64>),
    Unsigned(u64),
    PackedUnsigned(Vec<u64>),
    Float(f64),
    PackedFloat(Vec<f64>),
}

impl MetricValue {
    /// Returns how many individual values this metric carries: the length of
    /// the vector for packed variants and `1` for scalar variants.
    fn count(&self) -> usize {
        match self {
            Self::Signed(_) | Self::Unsigned(_) | Self::Float(_) => 1,
            Self::PackedSigned(values) => values.len(),
            Self::PackedUnsigned(values) => values.len(),
            Self::PackedFloat(values) => values.len(),
        }
    }
}
70
/// Writes every element of `vals` to `f` in order, placing a `:` between
/// consecutive elements. Nothing is written for an empty slice.
///
/// Returns the first error reported by the formatter, if any.
fn write_value<T>(f: &mut fmt::Formatter<'_>, vals: &[T]) -> fmt::Result
where
    T: fmt::Display,
{
    let mut remaining = vals.iter();

    // The first element has no leading separator; every later one does.
    if let Some(first) = remaining.next() {
        fmt::Display::fmt(first, f)?;
        for value in remaining {
            f.write_char(':')?;
            fmt::Display::fmt(value, f)?;
        }
    }

    Ok(())
}
84
85impl fmt::Display for MetricValue {
86 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87 match self {
88 MetricValue::Signed(v) => v.fmt(f),
89 MetricValue::PackedSigned(v) => write_value(f, v),
90 MetricValue::Unsigned(v) => v.fmt(f),
91 MetricValue::PackedUnsigned(v) => write_value(f, v),
92 MetricValue::Float(v) => v.fmt(f),
93 MetricValue::PackedFloat(v) => write_value(f, v),
94 }
95 }
96}
97
98#[derive(Debug, Clone)]
99pub(crate) struct MetricFormatter<'a> {
100 prefix: &'a str,
101 key: &'a str,
102 val: MetricValue,
103 type_: MetricType,
104 tags: Vec<(Option<&'a str>, &'a str)>,
105 timestamp: Option<u64>,
108 sampling_rate: Option<f64>,
109 container_id: Option<&'a str>,
110 base_size: usize,
111 kv_size: usize,
112}
113
114impl<'a> MetricFormatter<'a> {
115 const TAG_PREFIX: &'static str = "|#";
116
117 pub(crate) fn counter(prefix: &'a str, key: &'a str, val: MetricValue) -> Self {
118 Self::from_val(prefix, key, val, MetricType::Counter)
119 }
120
121 pub(crate) fn timer(prefix: &'a str, key: &'a str, val: MetricValue) -> Self {
122 Self::from_val(prefix, key, val, MetricType::Timer)
123 }
124
125 pub(crate) fn gauge(prefix: &'a str, key: &'a str, val: MetricValue) -> Self {
126 Self::from_val(prefix, key, val, MetricType::Gauge)
127 }
128
129 pub(crate) fn meter(prefix: &'a str, key: &'a str, val: MetricValue) -> Self {
130 Self::from_val(prefix, key, val, MetricType::Meter)
131 }
132
133 pub(crate) fn histogram(prefix: &'a str, key: &'a str, val: MetricValue) -> Self {
134 Self::from_val(prefix, key, val, MetricType::Histogram)
135 }
136
137 pub(crate) fn distribution(prefix: &'a str, key: &'a str, val: MetricValue) -> Self {
138 Self::from_val(prefix, key, val, MetricType::Distribution)
139 }
140
141 pub(crate) fn set(prefix: &'a str, key: &'a str, val: MetricValue) -> Self {
142 Self::from_val(prefix, key, val, MetricType::Set)
143 }
144
145 #[rustfmt::skip]
146 fn from_val(prefix: &'a str, key: &'a str, val: MetricValue, type_: MetricType) -> Self {
147 let value_count = val.count();
148 MetricFormatter {
149 prefix,
150 key,
151 type_,
152 val,
153 tags: Vec::new(),
154 kv_size: 0,
160 base_size: prefix.len() + key.len() + 1 + 10 * value_count + 1 + 2, timestamp: None,
162 sampling_rate: None,
163 container_id: None,
164 }
165 }
166
167 fn with_tag(&mut self, key: &'a str, value: &'a str) {
168 self.tags.push((Some(key), value));
169 self.kv_size += key.len() + 1 + value.len();
170 }
171
172 fn with_tag_value(&mut self, value: &'a str) {
173 self.tags.push((None, value));
174 self.kv_size += value.len();
175 }
176
177 fn with_timestamp(&mut self, timestamp: u64) {
178 self.timestamp = Some(timestamp);
179 }
180
181 fn with_container_id(&mut self, container_id: &'a str) {
182 self.container_id = Some(container_id);
183 }
184
185 fn with_sampling_rate(&mut self, rate: f64) {
186 self.sampling_rate = Some(rate);
187 }
188
189 fn write_base_metric(&self, out: &mut String) {
190 let _ = write!(out, "{}{}:{}|{}", self.prefix, self.key, self.val, self.type_);
191 }
192
193 fn write_sampling_rate(&self, out: &mut String) {
194 if let Some(rate) = self.sampling_rate {
195 let _ = write!(out, "|@{}", rate);
197 }
198 }
199
200 fn write_tags(&self, out: &mut String) {
201 if !self.tags.is_empty() {
202 out.push_str(Self::TAG_PREFIX);
203 for (i, &(key, value)) in self.tags.iter().enumerate() {
204 if i > 0 {
205 out.push(',');
206 }
207 if let Some(key) = key {
208 out.push_str(key);
209 out.push(':');
210 }
211 out.push_str(value);
212 }
213 }
214 }
215
216 fn write_timestamp(&self, out: &mut String) {
217 if let Some(timestamp) = self.timestamp {
218 let _ = write!(out, "|T{}", timestamp);
220 }
221 }
222
223 fn write_container_id(&self, out: &mut String) {
224 if let Some(container_id) = self.container_id {
225 let _ = write!(out, "|c:{}", container_id);
227 }
228 }
229
230 fn tag_size_hint(&self) -> usize {
231 if self.tags.is_empty() {
232 return 0;
233 }
234
235 Self::TAG_PREFIX.len() + self.kv_size + self.tags.len() - 1
237 }
238
239 fn timestamp_size_hint(&self) -> usize {
240 if let Some(_timestamp) = self.timestamp {
241 2 + 10
243 } else {
244 0
245 }
246 }
247
248 fn sampling_rate_size_hint(&self) -> usize {
249 if let Some(_rate) = self.sampling_rate {
250 2 + 17 } else {
253 0
254 }
255 }
256
257 fn container_id_size_hint(&self) -> usize {
258 if let Some(container_id) = self.container_id {
259 2 + container_id.len()
261 } else {
262 0
263 }
264 }
265
266 fn size_hint(&self) -> usize {
267 self.base_size
268 + self.sampling_rate_size_hint()
269 + self.tag_size_hint()
270 + self.timestamp_size_hint()
271 + self.container_id_size_hint()
272 }
273
274 pub(crate) fn format(&self) -> String {
275 let size_hint = self.size_hint();
276 let mut metric_string = String::with_capacity(size_hint);
277 self.write_base_metric(&mut metric_string);
278 self.write_sampling_rate(&mut metric_string);
279 self.write_tags(&mut metric_string);
280 self.write_container_id(&mut metric_string);
281 self.write_timestamp(&mut metric_string);
282 metric_string
283 }
284}
285
286#[derive(Debug)]
292enum BuilderRepr<'m, 'c> {
293 Success(MetricFormatter<'m>, &'c StatsdClient),
294 Error(MetricError, &'c StatsdClient),
295}
296
297#[must_use = "Did you forget to call .send() after adding tags?"]
370#[derive(Debug)]
371pub struct MetricBuilder<'m, 'c, T>
372where
373 T: Metric + From<String>,
374{
375 repr: BuilderRepr<'m, 'c>,
376 type_: PhantomData<T>,
377}
378
379impl<'m, 'c, T> MetricBuilder<'m, 'c, T>
380where
381 T: Metric + From<String>,
382{
383 pub(crate) fn from_fmt(formatter: MetricFormatter<'m>, client: &'c StatsdClient) -> Self {
384 MetricBuilder {
385 repr: BuilderRepr::Success(formatter, client),
386 type_: PhantomData,
387 }
388 }
389
390 pub(crate) fn from_error(err: MetricError, client: &'c StatsdClient) -> Self {
391 MetricBuilder {
392 repr: BuilderRepr::Error(err, client),
393 type_: PhantomData,
394 }
395 }
396
397 pub fn with_tag(mut self, key: &'m str, value: &'m str) -> Self {
416 if let BuilderRepr::Success(ref mut formatter, _) = self.repr {
417 formatter.with_tag(key, value);
418 }
419 self
420 }
421
422 pub fn with_tag_value(mut self, value: &'m str) -> Self {
441 if let BuilderRepr::Success(ref mut formatter, _) = self.repr {
442 formatter.with_tag_value(value);
443 }
444 self
445 }
446
447 pub(crate) fn with_tags<V>(mut self, tags: V) -> Self
449 where
450 V: IntoIterator<Item = (Option<&'m str>, &'m str)>,
451 {
452 if let BuilderRepr::Success(ref mut formatter, _) = self.repr {
453 for tag in tags.into_iter() {
454 match tag {
455 (Some(key), value) => formatter.with_tag(key, value),
456 (None, value) => formatter.with_tag_value(value),
457 }
458 }
459 }
460
461 self
462 }
463
464 pub fn with_container_id(mut self, container_id: &'m str) -> Self {
466 if let BuilderRepr::Success(ref mut formatter, _) = self.repr {
467 formatter.with_container_id(container_id);
468 }
469 self
470 }
471
472 pub(crate) fn with_container_id_opt(mut self, container_id: Option<&'m str>) -> Self {
473 if let BuilderRepr::Success(ref mut formatter, _) = self.repr {
474 if let Some(container_id) = container_id {
475 formatter.with_container_id(container_id);
476 }
477 }
478 self
479 }
480
481 pub fn with_timestamp(mut self, timestamp: u64) -> Self {
501 if let BuilderRepr::Success(ref mut formatter, _) = self.repr {
502 formatter.with_timestamp(timestamp);
503 }
504
505 self
506 }
507
508 pub fn with_sampling_rate(mut self, rate: f64) -> Self {
530 if let BuilderRepr::Success(ref mut formatter, _) = self.repr {
531 formatter.with_sampling_rate(rate);
532 }
533
534 self
535 }
536
537 pub fn try_send(self) -> MetricResult<T> {
559 match self.repr {
560 BuilderRepr::Error(err, _) => Err(err),
561 BuilderRepr::Success(ref formatter, client) => {
562 let metric = T::from(formatter.format());
563 client.send_metric(&metric)?;
564 Ok(metric)
565 }
566 }
567 }
568
569 pub fn send(self) {
598 match self.repr {
599 BuilderRepr::Error(err, client) => client.consume_error(err),
600 BuilderRepr::Success(_, client) => {
601 if let Err(e) = self.try_send() {
602 client.consume_error(e);
603 }
604 }
605 }
606 }
607}
608
#[cfg(test)]
mod tests {
    use super::{MetricBuilder, MetricFormatter, MetricValue};
    use crate::client::StatsdClient;
    use crate::sinks::NopMetricSink;
    use crate::test::ErrorMetricSink;
    use crate::types::Counter;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    /// Counter on `prefix.some.key` holding a single signed value.
    fn key_counter(val: i64) -> MetricFormatter<'static> {
        MetricFormatter::counter("prefix.", "some.key", MetricValue::Signed(val))
    }

    /// Counter on `prefix.some.counter` used by the builder tests.
    fn builder_counter() -> MetricFormatter<'static> {
        MetricFormatter::counter("prefix.", "some.counter", MetricValue::Signed(11))
    }

    /// Packed unsigned value holding a copy of `vals`.
    fn packed(vals: &[u64]) -> MetricValue {
        MetricValue::PackedUnsigned(vals.to_vec())
    }

    // Size hints.

    #[test]
    fn test_metric_formatter_tag_size_hint_no_tags() {
        let formatter = key_counter(1);
        assert_eq!(formatter.tag_size_hint(), 0);
    }

    #[test]
    fn test_metric_formatter_tag_size_hint_value() {
        let mut formatter = key_counter(1);
        formatter.with_tag_value("test");

        assert_eq!(formatter.tag_size_hint(), 6);
    }

    #[test]
    fn test_metric_formatter_tag_size_hint_key_value() {
        let mut formatter = key_counter(1);
        formatter.with_tag("host", "web");
        formatter.with_tag("user", "123");

        assert_eq!(formatter.tag_size_hint(), 19);
    }

    // Optional sections.

    #[test]
    fn test_metric_formatter_container_id() {
        let mut formatter = key_counter(1);
        formatter.with_container_id("1234");

        assert_eq!(formatter.format(), "prefix.some.key:1|c|c:1234");
        assert_eq!(formatter.size_hint(), 35);
    }

    #[test]
    fn test_metric_formatter_timestamp() {
        let mut formatter = key_counter(1);
        formatter.with_timestamp(1234567890);

        assert_eq!(formatter.format(), "prefix.some.key:1|c|T1234567890");
        assert_eq!(formatter.size_hint(), 41);
    }

    // Counters.

    #[test]
    fn test_metric_formatter_counter_no_tags() {
        let formatter = key_counter(4);
        assert_eq!(formatter.format(), "prefix.some.key:4|c");
    }

    #[test]
    fn test_metric_formatter_counter_with_tags() {
        let mut formatter = key_counter(4);
        formatter.with_tag("host", "app03.example.com");
        formatter.with_tag("bucket", "2");
        formatter.with_tag_value("beta");

        assert_eq!(
            formatter.format(),
            "prefix.some.key:4|c|#host:app03.example.com,bucket:2,beta"
        );
    }

    // Timers.

    #[test]
    fn test_metric_formatter_timer_no_tags() {
        let value = MetricValue::Unsigned(21);
        let formatter = MetricFormatter::timer("prefix.", "some.method", value);

        assert_eq!(formatter.format(), "prefix.some.method:21|ms");
    }

    #[test]
    fn test_metric_formatter_timer_no_tags_multiple_values() {
        let value = packed(&[21, 22, 23]);
        let formatter = MetricFormatter::timer("prefix.", "some.method", value);

        assert_eq!(formatter.format(), "prefix.some.method:21:22:23|ms");
    }

    #[test]
    fn test_metric_formatter_timer_with_tags() {
        let value = MetricValue::Unsigned(21);
        let mut formatter = MetricFormatter::timer("prefix.", "some.method", value);
        formatter.with_tag("app", "metrics");
        formatter.with_tag_value("async");

        assert_eq!(formatter.format(), "prefix.some.method:21|ms|#app:metrics,async");
    }

    #[test]
    fn test_metric_formatter_timer_with_tags_multiple_values() {
        let value = packed(&[21, 22, 23]);
        let mut formatter = MetricFormatter::timer("prefix.", "some.method", value);
        formatter.with_tag("app", "metrics");
        formatter.with_tag_value("async");

        assert_eq!(
            formatter.format(),
            "prefix.some.method:21:22:23|ms|#app:metrics,async"
        );
    }

    // Gauges.

    #[test]
    fn test_metric_formatter_gauge_no_tags() {
        let value = MetricValue::Unsigned(7);
        let formatter = MetricFormatter::gauge("prefix.", "num.failures", value);

        assert_eq!(formatter.format(), "prefix.num.failures:7|g");
    }

    #[test]
    fn test_metric_formatter_gauge_with_tags() {
        let value = MetricValue::Unsigned(7);
        let mut formatter = MetricFormatter::gauge("prefix.", "num.failures", value);
        formatter.with_tag("window", "300");
        formatter.with_tag_value("best-effort");

        assert_eq!(
            formatter.format(),
            "prefix.num.failures:7|g|#window:300,best-effort"
        );
    }

    // Meters.

    #[test]
    fn test_metric_formatter_meter_no_tags() {
        let value = MetricValue::Unsigned(3);
        let formatter = MetricFormatter::meter("prefix.", "user.logins", value);

        assert_eq!(formatter.format(), "prefix.user.logins:3|m");
    }

    #[test]
    fn test_metric_formatter_meter_with_tags() {
        let value = MetricValue::Unsigned(3);
        let mut formatter = MetricFormatter::meter("prefix.", "user.logins", value);
        formatter.with_tag("user-type", "verified");
        formatter.with_tag_value("bucket1");

        assert_eq!(
            formatter.format(),
            "prefix.user.logins:3|m|#user-type:verified,bucket1"
        );
    }

    // Histograms.

    #[test]
    fn test_metric_formatter_histogram_no_tags() {
        let value = MetricValue::Unsigned(44);
        let formatter = MetricFormatter::histogram("prefix.", "num.results", value);

        assert_eq!(formatter.format(), "prefix.num.results:44|h");
    }

    #[test]
    fn test_metric_formatter_histogram_no_tags_multiple_values() {
        let value = packed(&[44, 45, 46]);
        let formatter = MetricFormatter::histogram("prefix.", "num.results", value);

        assert_eq!(formatter.format(), "prefix.num.results:44:45:46|h");
    }

    #[test]
    fn test_metric_formatter_histogram_with_tags() {
        let value = MetricValue::Unsigned(44);
        let mut formatter = MetricFormatter::histogram("prefix.", "num.results", value);
        formatter.with_tag("user-type", "authenticated");
        formatter.with_tag_value("source=search");

        assert_eq!(
            formatter.format(),
            "prefix.num.results:44|h|#user-type:authenticated,source=search"
        );
    }

    #[test]
    fn test_metric_formatter_histogram_with_tags_multiple_values() {
        let value = packed(&[44, 45, 46]);
        let mut formatter = MetricFormatter::histogram("prefix.", "num.results", value);
        formatter.with_tag("user-type", "authenticated");
        formatter.with_tag_value("source=search");

        assert_eq!(
            formatter.format(),
            "prefix.num.results:44:45:46|h|#user-type:authenticated,source=search"
        );
    }

    // Distributions.

    #[test]
    fn test_metric_formatter_distribution_no_tags() {
        let value = MetricValue::Unsigned(44);
        let formatter = MetricFormatter::distribution("prefix.", "latency.milliseconds", value);

        assert_eq!(formatter.format(), "prefix.latency.milliseconds:44|d");
    }

    #[test]
    fn test_metric_formatter_distribution_no_tags_multiple_values() {
        let value = packed(&[44, 45, 46]);
        let formatter = MetricFormatter::distribution("prefix.", "latency.milliseconds", value);

        assert_eq!(formatter.format(), "prefix.latency.milliseconds:44:45:46|d");
    }

    #[test]
    fn test_metric_formatter_sampling_rate() {
        let value = packed(&[44, 45, 46]);
        let mut formatter = MetricFormatter::distribution("prefix.", "some.key", value);
        formatter.with_sampling_rate(0.5);

        assert_eq!(formatter.format(), "prefix.some.key:44:45:46|d|@0.5");
        assert_eq!(formatter.size_hint(), 68);
    }

    #[test]
    fn test_metric_formatter_sampling_rate_small() {
        let value = packed(&[44, 45, 46]);
        let mut formatter = MetricFormatter::distribution("prefix.", "some.key", value);
        formatter.with_sampling_rate(0.000000000000000000001);

        assert_eq!(
            formatter.format(),
            "prefix.some.key:44:45:46|d|@0.000000000000000000001"
        );
        assert_eq!(formatter.size_hint(), 68);
    }

    #[test]
    fn test_metric_formatter_distribution_with_tags() {
        let value = MetricValue::Unsigned(44);
        let mut formatter =
            MetricFormatter::distribution("prefix.", "latency.milliseconds", value);
        formatter.with_tag("user-type", "authenticated");
        formatter.with_tag_value("source=search");

        assert_eq!(
            formatter.format(),
            "prefix.latency.milliseconds:44|d|#user-type:authenticated,source=search"
        );
    }

    #[test]
    fn test_metric_formatter_distribution_with_tags_multiple_values() {
        let value = packed(&[44, 45, 46]);
        let mut formatter =
            MetricFormatter::distribution("prefix.", "latency.milliseconds", value);
        formatter.with_tag("user-type", "authenticated");
        formatter.with_tag_value("source=search");

        assert_eq!(
            formatter.format(),
            "prefix.latency.milliseconds:44:45:46|d|#user-type:authenticated,source=search"
        );
    }

    // Sets.

    #[test]
    fn test_metric_formatter_set_no_tags() {
        let value = MetricValue::Signed(44);
        let formatter = MetricFormatter::set("prefix.", "users.uniques", value);

        assert_eq!(formatter.format(), "prefix.users.uniques:44|s");
    }

    #[test]
    fn test_metric_formatter_set_with_tags() {
        let value = MetricValue::Signed(44);
        let mut formatter = MetricFormatter::set("prefix.", "users.uniques", value);
        formatter.with_tag("user-type", "authenticated");
        formatter.with_tag_value("source=search");

        assert_eq!(
            formatter.format(),
            "prefix.users.uniques:44|s|#user-type:authenticated,source=search"
        );
    }

    // Builder.

    #[test]
    fn test_metric_builder_send_success() {
        let client = StatsdClient::builder("prefix.", NopMetricSink)
            .with_error_handler(|e| {
                panic!("unexpected error sending metric: {}", e);
            })
            .build();

        // The error handler panics, so simply returning means success.
        MetricBuilder::<Counter>::from_fmt(builder_counter(), &client).send();
    }

    #[test]
    fn test_metric_builder_send_error() {
        let errors = Arc::new(AtomicU64::new(0));
        let handler_errors = Arc::clone(&errors);

        let client = StatsdClient::builder("prefix.", ErrorMetricSink::always())
            .with_error_handler(move |_e| {
                handler_errors.fetch_add(1, Ordering::Release);
            })
            .build();

        MetricBuilder::<Counter>::from_fmt(builder_counter(), &client).send();

        assert_eq!(errors.load(Ordering::Acquire), 1);
    }

    #[test]
    fn test_metric_builder_try_send_success() {
        let client = StatsdClient::from_sink("prefix.", NopMetricSink);

        let outcome = MetricBuilder::<Counter>::from_fmt(builder_counter(), &client).try_send();

        assert!(outcome.is_ok(), "expected Ok result from try_send");
    }

    #[test]
    fn test_metric_builder_try_send_error() {
        let client = StatsdClient::from_sink("prefix.", ErrorMetricSink::always());

        let outcome = MetricBuilder::<Counter>::from_fmt(builder_counter(), &client).try_send();

        assert!(outcome.is_err(), "expected Err result from try_send");
    }
}