1use core::{fmt, ops::ControlFlow};
6
7use emit_core::{
8 and::And,
9 emitter::Emitter,
10 event::{Event, ToEvent},
11 extent::{Extent, ToExtent},
12 or::Or,
13 path::Path,
14 props::{ErasedProps, Props},
15 str::{Str, ToStr},
16 template::{self, Template},
17 timestamp::Timestamp,
18 value::{ToValue, Value},
19 well_known::{KEY_EVT_KIND, KEY_METRIC_AGG, KEY_METRIC_NAME, KEY_METRIC_VALUE},
20};
21
22use crate::kind::Kind;
23
24pub use self::{sampler::Sampler, source::Source};
25
/// A metric sample: a named, aggregated value belonging to a module, optionally
/// tied to an extent and carrying additional properties of type `P`.
pub struct Metric<'a, P> {
    // Module path; used as the module when the metric is converted into an `Event`.
    mdl: Path<'a>,
    // Emitted under `KEY_METRIC_NAME` by the `Props` implementation.
    name: Str<'a>,
    // Emitted under `KEY_METRIC_AGG` by the `Props` implementation.
    agg: Str<'a>,
    // Result of `ToExtent::to_extent` on the supplied extent; may be `None`.
    extent: Option<Extent>,
    // Optional template override; `tpl()` falls back to the static `TEMPLATE` when unset.
    tpl: Option<Template<'a>>,
    // Emitted under `KEY_METRIC_VALUE` by the `Props` implementation.
    value: Value<'a>,
    // Additional properties, visited after the well-known metric properties.
    props: P,
}
42
43impl<'a, P> Metric<'a, P> {
44 pub fn new(
57 mdl: impl Into<Path<'a>>,
58 name: impl Into<Str<'a>>,
59 agg: impl Into<Str<'a>>,
60 extent: impl ToExtent,
61 value: impl Into<Value<'a>>,
62 props: P,
63 ) -> Self {
64 Metric {
65 mdl: mdl.into(),
66 extent: extent.to_extent(),
67 tpl: None,
68 name: name.into(),
69 agg: agg.into(),
70 value: value.into(),
71 props,
72 }
73 }
74
75 pub fn mdl(&self) -> &Path<'a> {
79 &self.mdl
80 }
81
82 pub fn with_mdl(mut self, mdl: impl Into<Path<'a>>) -> Self {
86 self.mdl = mdl.into();
87 self
88 }
89
90 pub fn name(&self) -> &Str<'a> {
94 &self.name
95 }
96
97 pub fn with_name(mut self, name: impl Into<Str<'a>>) -> Self {
101 self.name = name.into();
102 self
103 }
104
105 pub fn agg(&self) -> &Str<'a> {
111 &self.agg
112 }
113
114 pub fn with_agg(mut self, agg: impl Into<Str<'a>>) -> Self {
120 self.agg = agg.into();
121 self
122 }
123
124 pub fn value(&self) -> &Value<'a> {
128 &self.value
129 }
130
131 pub fn with_value(mut self, value: impl Into<Value<'a>>) -> Self {
135 self.value = value.into();
136 self
137 }
138
139 pub fn extent(&self) -> Option<&Extent> {
143 self.extent.as_ref()
144 }
145
146 pub fn with_extent(mut self, extent: impl ToExtent) -> Self {
150 self.extent = extent.to_extent();
151 self
152 }
153
154 pub fn ts(&self) -> Option<&Timestamp> {
160 self.extent.as_ref().map(|extent| extent.as_point())
161 }
162
163 pub fn ts_start(&self) -> Option<&Timestamp> {
169 self.extent
170 .as_ref()
171 .and_then(|extent| extent.as_range())
172 .map(|span| &span.start)
173 }
174
175 pub fn tpl(&self) -> &Template<'a> {
179 self.tpl.as_ref().unwrap_or(&TEMPLATE)
180 }
181
182 pub fn with_tpl(mut self, tpl: impl Into<Template<'a>>) -> Self {
186 self.tpl = Some(tpl.into());
187 self
188 }
189
190 pub fn props(&self) -> &P {
194 &self.props
195 }
196
197 pub fn props_mut(&mut self) -> &mut P {
201 &mut self.props
202 }
203
204 pub fn with_props<U>(self, props: U) -> Metric<'a, U> {
208 Metric {
209 mdl: self.mdl,
210 extent: self.extent,
211 tpl: self.tpl,
212 name: self.name,
213 agg: self.agg,
214 value: self.value,
215 props,
216 }
217 }
218
219 pub fn map_props<U>(self, map: impl FnOnce(P) -> U) -> Metric<'a, U> {
223 Metric {
224 mdl: self.mdl,
225 extent: self.extent,
226 tpl: self.tpl,
227 name: self.name,
228 agg: self.agg,
229 value: self.value,
230 props: map(self.props),
231 }
232 }
233}
234
235impl<'a, P: Props> fmt::Debug for Metric<'a, P> {
236 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
237 fmt::Debug::fmt(&self.to_event(), f)
238 }
239}
240
241impl<'a, P: Props> ToEvent for Metric<'a, P> {
242 type Props<'b>
243 = &'b Self
244 where
245 Self: 'b;
246
247 fn to_event<'b>(&'b self) -> Event<'b, Self::Props<'b>> {
248 Event::new(
249 self.mdl.by_ref(),
250 self.tpl().by_ref(),
251 self.extent.clone(),
252 self,
253 )
254 }
255}
256
257impl<'a, P: Props> Metric<'a, P> {
258 pub fn by_ref<'b>(&'b self) -> Metric<'b, &'b P> {
262 Metric {
263 mdl: self.mdl.by_ref(),
264 extent: self.extent.clone(),
265 tpl: self.tpl.as_ref().map(|tpl| tpl.by_ref()),
266 name: self.name.by_ref(),
267 agg: self.agg.by_ref(),
268 value: self.value.by_ref(),
269 props: &self.props,
270 }
271 }
272
273 pub fn erase<'b>(&'b self) -> Metric<'b, &'b dyn ErasedProps> {
277 Metric {
278 mdl: self.mdl.by_ref(),
279 extent: self.extent.clone(),
280 tpl: self.tpl.as_ref().map(|tpl| tpl.by_ref()),
281 name: self.name.by_ref(),
282 agg: self.agg.by_ref(),
283 value: self.value.by_ref(),
284 props: &self.props,
285 }
286 }
287}
288
289impl<'a, P> ToExtent for Metric<'a, P> {
290 fn to_extent(&self) -> Option<Extent> {
291 self.extent.clone()
292 }
293}
294
impl<'a, P: Props> Props for Metric<'a, P> {
    /// Visit the metric's properties.
    ///
    /// The well-known properties are visited first, in this order: event kind
    /// (`Kind::Metric`), metric name, metric aggregation, metric value. The
    /// additional properties follow. Visiting stops as soon as the callback
    /// returns `ControlFlow::Break`.
    fn for_each<'kv, F: FnMut(Str<'kv>, Value<'kv>) -> ControlFlow<()>>(
        &'kv self,
        mut for_each: F,
    ) -> ControlFlow<()> {
        // `?` on `ControlFlow` returns early when the callback breaks.
        for_each(KEY_EVT_KIND.to_str(), Kind::Metric.to_value())?;
        for_each(KEY_METRIC_NAME.to_str(), self.name.to_value())?;
        for_each(KEY_METRIC_AGG.to_str(), self.agg.to_value())?;
        for_each(KEY_METRIC_VALUE.to_str(), self.value.by_ref())?;

        // Hand the callback over to the additional properties.
        self.props.for_each(for_each)
    }
}
308
309const TEMPLATE_PARTS: &'static [template::Part<'static>] = &[
311 template::Part::hole("metric_agg"),
312 template::Part::text(" of "),
313 template::Part::hole("metric_name"),
314 template::Part::text(" is "),
315 template::Part::hole("metric_value"),
316];
317
318static TEMPLATE: Template<'static> = Template::new(TEMPLATE_PARTS);
319
320pub mod source {
321 use self::sampler::ErasedSampler;
328
329 use super::*;
330
    /// A source of metrics that can be sampled on demand.
    pub trait Source {
        /// Report the source's metrics to the given `sampler`.
        fn sample_metrics<S: sampler::Sampler>(&self, sampler: S);

        /// Combine this source with `other` by wrapping both in an [`And`].
        ///
        /// The `Source` implementation for `And` samples the left source and
        /// then the right one.
        fn and_sample<U>(self, other: U) -> And<Self, U>
        where
            Self: Sized,
        {
            And::new(self, other)
        }
    }
350
351 impl<'a, T: Source + ?Sized> Source for &'a T {
352 fn sample_metrics<S: sampler::Sampler>(&self, sampler: S) {
353 (**self).sample_metrics(sampler)
354 }
355 }
356
357 impl<T: Source> Source for Option<T> {
358 fn sample_metrics<S: sampler::Sampler>(&self, sampler: S) {
359 if let Some(source) = self {
360 source.sample_metrics(sampler);
361 }
362 }
363 }
364
365 #[cfg(feature = "alloc")]
366 impl<'a, T: Source + ?Sized + 'a> Source for alloc::boxed::Box<T> {
367 fn sample_metrics<S: sampler::Sampler>(&self, sampler: S) {
368 (**self).sample_metrics(sampler)
369 }
370 }
371
372 #[cfg(feature = "alloc")]
373 impl<'a, T: Source + ?Sized + 'a> Source for alloc::sync::Arc<T> {
374 fn sample_metrics<S: sampler::Sampler>(&self, sampler: S) {
375 (**self).sample_metrics(sampler)
376 }
377 }
378
379 impl<T: Source, U: Source> Source for And<T, U> {
380 fn sample_metrics<S: sampler::Sampler>(&self, sampler: S) {
381 self.left().sample_metrics(&sampler);
382 self.right().sample_metrics(&sampler);
383 }
384 }
385
386 impl<T: Source, U: Source> Source for Or<T, U> {
387 fn sample_metrics<S: sampler::Sampler>(&self, sampler: S) {
388 self.left().sample_metrics(&sampler);
389 self.right().sample_metrics(&sampler);
390 }
391 }
392
393 impl<'a, P: Props> Source for Metric<'a, P> {
394 fn sample_metrics<S: Sampler>(&self, sampler: S) {
395 sampler.metric(self.by_ref());
396 }
397 }
398
399 pub struct FromFn<F = fn(&mut dyn ErasedSampler)>(F);
405
406 pub const fn from_fn<F: Fn(&mut dyn ErasedSampler)>(source: F) -> FromFn<F> {
410 FromFn::new(source)
411 }
412
413 impl<F> FromFn<F> {
414 pub const fn new(source: F) -> Self {
418 FromFn(source)
419 }
420 }
421
422 impl<F: Fn(&mut dyn ErasedSampler)> Source for FromFn<F> {
423 fn sample_metrics<S: sampler::Sampler>(&self, mut sampler: S) {
424 (self.0)(&mut sampler)
425 }
426 }
427
    // Private plumbing behind `ErasedSource`. Keeping these traits in a private
    // module means code outside this module cannot name or implement them.
    mod internal {
        use super::*;

        // Object-safe counterpart of `Source::sample_metrics`: takes a
        // type-erased sampler instead of a generic one.
        pub trait DispatchSource {
            fn dispatch_sample_metrics(&self, sampler: &dyn sampler::ErasedSampler);
        }

        // Supertrait of `ErasedSource`; exposes the value as a `DispatchSource`
        // trait object wrapped in `crate::internal::Erased`.
        pub trait SealedSource {
            fn erase_source(&self) -> crate::internal::Erased<&dyn DispatchSource>;
        }
    }
439
440 pub trait ErasedSource: internal::SealedSource {}
446
447 impl<T: Source> ErasedSource for T {}
448
449 impl<T: Source> internal::SealedSource for T {
450 fn erase_source(&self) -> crate::internal::Erased<&dyn internal::DispatchSource> {
451 crate::internal::Erased(self)
452 }
453 }
454
455 impl<T: Source> internal::DispatchSource for T {
456 fn dispatch_sample_metrics(&self, sampler: &dyn sampler::ErasedSampler) {
457 self.sample_metrics(sampler)
458 }
459 }
460
461 impl<'a> Source for dyn ErasedSource + 'a {
462 fn sample_metrics<S: sampler::Sampler>(&self, sampler: S) {
463 self.erase_source().0.dispatch_sample_metrics(&sampler)
464 }
465 }
466
467 impl<'a> Source for dyn ErasedSource + Send + Sync + 'a {
468 fn sample_metrics<S: sampler::Sampler>(&self, sampler: S) {
469 (self as &(dyn ErasedSource + 'a)).sample_metrics(sampler)
470 }
471 }
472
473 #[cfg(test)]
474 mod tests {
475 use super::*;
476 use std::cell::Cell;
477
478 #[test]
479 fn source_sample_emit() {
480 struct MySource;
481
482 impl Source for MySource {
483 fn sample_metrics<S: Sampler>(&self, sampler: S) {
484 sampler.metric(Metric::new(
485 Path::new_raw("test"),
486 "metric 1",
487 "count",
488 crate::Empty,
489 42,
490 crate::Empty,
491 ));
492
493 sampler.metric(Metric::new(
494 Path::new_raw("test"),
495 "metric 2",
496 "count",
497 crate::Empty,
498 42,
499 crate::Empty,
500 ));
501 }
502 }
503
504 let calls = Cell::new(0);
505
506 MySource.sample_metrics(sampler::from_fn(|_| {
507 calls.set(calls.get() + 1);
508 }));
509
510 assert_eq!(2, calls.get());
511 }
512
513 #[test]
514 fn and_sample() {
515 let calls = Cell::new(0);
516
517 from_fn(|sampler| {
518 sampler.metric(Metric::new(
519 Path::new_raw("test"),
520 "metric 1",
521 "count",
522 crate::Empty,
523 42,
524 crate::Empty,
525 ));
526 })
527 .and_sample(from_fn(|sampler| {
528 sampler.metric(Metric::new(
529 Path::new_raw("test"),
530 "metric 2",
531 "count",
532 crate::Empty,
533 42,
534 crate::Empty,
535 ));
536 }))
537 .sample_metrics(sampler::from_fn(|_| {
538 calls.set(calls.get() + 1);
539 }));
540
541 assert_eq!(2, calls.get());
542 }
543
544 #[test]
545 fn from_fn_source() {
546 let calls = Cell::new(0);
547
548 from_fn(|sampler| {
549 sampler.metric(Metric::new(
550 Path::new_raw("test"),
551 "metric 1",
552 "count",
553 crate::Empty,
554 42,
555 crate::Empty,
556 ));
557
558 sampler.metric(Metric::new(
559 Path::new_raw("test"),
560 "metric 2",
561 "count",
562 crate::Empty,
563 42,
564 crate::Empty,
565 ));
566 })
567 .sample_metrics(sampler::from_fn(|_| {
568 calls.set(calls.get() + 1);
569 }));
570
571 assert_eq!(2, calls.get());
572 }
573
574 #[test]
575 fn erased_source() {
576 let source = from_fn(|sampler| {
577 sampler.metric(Metric::new(
578 Path::new_raw("test"),
579 "metric 1",
580 "count",
581 crate::Empty,
582 42,
583 crate::Empty,
584 ));
585
586 sampler.metric(Metric::new(
587 Path::new_raw("test"),
588 "metric 2",
589 "count",
590 crate::Empty,
591 42,
592 crate::Empty,
593 ));
594 });
595
596 let source = &source as &dyn ErasedSource;
597
598 let calls = Cell::new(0);
599
600 source.sample_metrics(sampler::from_fn(|_| {
601 calls.set(calls.get() + 1);
602 }));
603
604 assert_eq!(2, calls.get());
605 }
606
607 #[test]
608 fn metric_as_source() {
609 let sampler = sampler::from_fn(|metric| {
610 assert_eq!("metric", metric.name().to_string());
611 assert_eq!("count", metric.agg().to_string());
612 });
613
614 let metric = Metric::new(
615 Path::new_raw("test"),
616 "metric",
617 "count",
618 crate::Empty,
619 42,
620 crate::Empty,
621 );
622
623 metric.sample_metrics(sampler);
624 }
625 }
626}
627
628#[cfg(feature = "alloc")]
629mod alloc_support {
630 use super::*;
631
632 use alloc::{boxed::Box, vec::Vec};
633 use core::ops::Range;
634
635 use crate::{
636 clock::{Clock, ErasedClock},
637 metric::source::{ErasedSource, Source},
638 };
639
640 pub struct Reporter {
658 sources: Vec<Box<dyn ErasedSource + Send + Sync>>,
659 clock: ReporterClock,
660 }
661
662 impl Reporter {
663 pub const fn new() -> Self {
670 Reporter {
671 sources: Vec::new(),
672 clock: ReporterClock::Default,
673 }
674 }
675
676 pub fn normalize_with_clock(
680 &mut self,
681 clock: impl Clock + Send + Sync + 'static,
682 ) -> &mut Self {
683 self.clock = ReporterClock::Other(Some(Box::new(clock)));
684
685 self
686 }
687
688 pub fn without_normalization(&mut self) -> &mut Self {
692 self.clock = ReporterClock::Other(None);
693
694 self
695 }
696
697 pub fn add_source(&mut self, source: impl Source + Send + Sync + 'static) -> &mut Self {
701 self.sources.push(Box::new(source));
702
703 self
704 }
705
706 pub fn sample_metrics<S: sampler::Sampler>(&self, sampler: S) {
710 let sampler = TimeNormalizer::new(self.clock.now(), sampler);
711
712 for source in &self.sources {
713 source.sample_metrics(&sampler);
714 }
715 }
716
717 pub fn emit_metrics<E: Emitter>(&self, emitter: E) {
721 self.sample_metrics(sampler::from_emitter(emitter).with_sampled_at(self.clock.now()))
722 }
723 }
724
725 impl Source for Reporter {
726 fn sample_metrics<S: sampler::Sampler>(&self, sampler: S) {
727 self.sample_metrics(sampler)
728 }
729 }
730
731 struct TimeNormalizer<S> {
732 now: Option<Timestamp>,
733 inner: S,
734 }
735
736 impl<S> TimeNormalizer<S> {
737 fn new(now: Option<Timestamp>, sampler: S) -> TimeNormalizer<S> {
738 TimeNormalizer {
739 now,
740 inner: sampler,
741 }
742 }
743 }
744
745 impl<S: Sampler> Sampler for TimeNormalizer<S> {
746 fn metric<P: Props>(&self, metric: Metric<P>) {
747 if let Some(now) = self.now {
748 let extent = metric.extent();
749
750 let extent = if let Some(range) = extent.and_then(|extent| extent.as_range()) {
751 normalize_range(now, range)
753 .map(Extent::range)
754 .unwrap_or_else(|| Extent::range(range.clone()))
756 } else {
757 Extent::point(now)
759 };
760
761 self.inner.metric(metric.with_extent(extent))
762 } else {
763 self.inner.metric(metric)
764 }
765 }
766
767 fn sampled_at(&self) -> Option<Timestamp> {
768 self.now
769 }
770 }
771
772 fn normalize_range(now: Timestamp, range: &Range<Timestamp>) -> Option<Range<Timestamp>> {
773 let len = range.end.duration_since(range.start)?;
776 let start = now.checked_sub(len)?;
777
778 Some(start..now)
779 }
780
781 enum ReporterClock {
782 Default,
783 Other(Option<Box<dyn ErasedClock + Send + Sync>>),
784 }
785
786 impl Clock for ReporterClock {
787 fn now(&self) -> Option<Timestamp> {
788 match self {
789 ReporterClock::Default => crate::platform::DefaultClock::new().now(),
790 ReporterClock::Other(clock) => clock.now(),
791 }
792 }
793 }
794
795 #[cfg(test)]
796 mod tests {
797 use super::*;
798 use std::time::Duration;
799
800 #[cfg(all(
801 target_arch = "wasm32",
802 target_vendor = "unknown",
803 target_os = "unknown"
804 ))]
805 use wasm_bindgen_test::*;
806
807 #[test]
808 fn reporter_is_send_sync() {
809 fn check<T: Send + Sync>() {}
810
811 check::<Reporter>();
812 }
813
814 #[test]
815 #[cfg(not(miri))]
816 #[cfg_attr(
817 all(
818 target_arch = "wasm32",
819 target_vendor = "unknown",
820 target_os = "unknown"
821 ),
822 wasm_bindgen_test
823 )]
824 fn reporter_sample() {
825 use std::cell::Cell;
826
827 let mut reporter = Reporter::new();
828
829 reporter
830 .add_source(source::from_fn(|sampler| {
831 sampler.metric(Metric::new(
832 Path::new_raw("test"),
833 "metric 1",
834 "count",
835 crate::Empty,
836 42,
837 crate::Empty,
838 ));
839 }))
840 .add_source(source::from_fn(|sampler| {
841 sampler.metric(Metric::new(
842 Path::new_raw("test"),
843 "metric 2",
844 "count",
845 crate::Empty,
846 42,
847 crate::Empty,
848 ));
849 }));
850
851 let calls = Cell::new(0);
852
853 reporter.sample_metrics(sampler::from_fn(|_| {
854 calls.set(calls.get() + 1);
855 }));
856
857 assert_eq!(2, calls.get());
858 }
859
860 struct TestClock(Option<Timestamp>);
861
862 impl Clock for TestClock {
863 fn now(&self) -> Option<Timestamp> {
864 self.0
865 }
866 }
867
868 #[test]
869 #[cfg(all(feature = "std", not(miri)))]
870 #[cfg_attr(
871 all(
872 target_arch = "wasm32",
873 target_vendor = "unknown",
874 target_os = "unknown"
875 ),
876 wasm_bindgen_test
877 )]
878 fn reporter_normalize_std() {
879 let mut reporter = Reporter::new();
880
881 reporter.add_source(source::from_fn(|sampler| {
882 sampler.metric(Metric::new(
883 Path::new_raw("test"),
884 "metric 1",
885 "count",
886 crate::Empty,
887 42,
888 crate::Empty,
889 ));
890 }));
891
892 reporter.sample_metrics(sampler::from_fn(|metric| {
893 assert!(metric.extent().is_some());
894 }));
895 }
896
897 #[wasm_bindgen_test]
898 #[cfg(all(feature = "web", not(miri)))]
899 #[cfg(all(
900 target_arch = "wasm32",
901 target_vendor = "unknown",
902 target_os = "unknown"
903 ))]
904 fn reporter_normalize_web() {
905 let mut reporter = Reporter::new();
906
907 reporter.add_source(source::from_fn(|sampler| {
908 sampler.metric(Metric::new(
909 Path::new_raw("test"),
910 "metric 1",
911 "count",
912 crate::Empty,
913 42,
914 crate::Empty,
915 ));
916 }));
917
918 reporter.sample_metrics(sampler::from_fn(|metric| {
919 assert!(metric.extent().is_some());
920 }));
921 }
922
923 #[test]
924 #[cfg_attr(
925 all(
926 target_arch = "wasm32",
927 target_vendor = "unknown",
928 target_os = "unknown"
929 ),
930 wasm_bindgen_test
931 )]
932 fn reporter_normalize_empty_extent() {
933 let mut reporter = Reporter::new();
934
935 reporter.normalize_with_clock(TestClock(Some(Timestamp::MIN)));
936
937 reporter.add_source(source::from_fn(|sampler| {
938 sampler.metric(Metric::new(
939 Path::new_raw("test"),
940 "metric 1",
941 "count",
942 crate::Empty,
943 42,
944 crate::Empty,
945 ));
946 }));
947
948 reporter.sample_metrics(sampler::from_fn(|metric| {
949 assert_eq!(Timestamp::MIN, metric.extent().unwrap().as_point());
950 }));
951 }
952
953 #[test]
954 #[cfg_attr(
955 all(
956 target_arch = "wasm32",
957 target_vendor = "unknown",
958 target_os = "unknown"
959 ),
960 wasm_bindgen_test
961 )]
962 fn reporter_normalize_point_extent() {
963 let mut reporter = Reporter::new();
964
965 reporter.normalize_with_clock(TestClock(Some(
966 Timestamp::from_unix(Duration::from_secs(37)).unwrap(),
967 )));
968
969 reporter.add_source(source::from_fn(|sampler| {
970 sampler.metric(Metric::new(
971 Path::new_raw("test"),
972 "metric 1",
973 "count",
974 Timestamp::from_unix(Duration::from_secs(100)).unwrap(),
975 42,
976 crate::Empty,
977 ));
978 }));
979
980 reporter.sample_metrics(sampler::from_fn(|metric| {
981 assert_eq!(
982 Timestamp::from_unix(Duration::from_secs(37)).unwrap(),
983 metric.extent().unwrap().as_point()
984 );
985 }));
986 }
987
988 #[test]
989 #[cfg_attr(
990 all(
991 target_arch = "wasm32",
992 target_vendor = "unknown",
993 target_os = "unknown"
994 ),
995 wasm_bindgen_test
996 )]
997 fn reporter_normalize_range_extent() {
998 let mut reporter = Reporter::new();
999
1000 reporter.normalize_with_clock(TestClock(Some(
1001 Timestamp::from_unix(Duration::from_secs(350)).unwrap(),
1002 )));
1003
1004 reporter.add_source(source::from_fn(|sampler| {
1005 sampler.metric(Metric::new(
1006 Path::new_raw("test"),
1007 "metric 1",
1008 "count",
1009 Timestamp::from_unix(Duration::from_secs(100)).unwrap()
1010 ..Timestamp::from_unix(Duration::from_secs(200)).unwrap(),
1011 42,
1012 crate::Empty,
1013 ));
1014 }));
1015
1016 reporter.sample_metrics(sampler::from_fn(|metric| {
1017 assert_eq!(
1018 Timestamp::from_unix(Duration::from_secs(250)).unwrap()
1019 ..Timestamp::from_unix(Duration::from_secs(350)).unwrap(),
1020 metric.extent().unwrap().as_range().unwrap().clone()
1021 );
1022 }));
1023 }
1024 }
1025}
1026
1027#[cfg(feature = "alloc")]
1028pub use self::alloc_support::*;
1029
1030pub mod sampler {
1031 use emit_core::empty::Empty;
1038
1039 use super::*;
1040
    /// A receiver of [`Metric`]s produced by a [`Source`].
    pub trait Sampler {
        /// Receive a single metric.
        fn metric<P: Props>(&self, metric: Metric<P>);

        /// The timestamp associated with the current sample, if any.
        ///
        /// The default implementation returns `None`.
        fn sampled_at(&self) -> Option<Timestamp> {
            None
        }

        /// Wrap this sampler in a [`WithSampledAt`] that reports `now` from
        /// [`Sampler::sampled_at`].
        fn with_sampled_at(self, now: Option<Timestamp>) -> WithSampledAt<Self>
        where
            Self: Sized,
        {
            WithSampledAt::new(self, now)
        }
    }
1069
1070 impl<'a, T: Sampler + ?Sized> Sampler for &'a T {
1071 fn metric<P: Props>(&self, metric: Metric<P>) {
1072 (**self).metric(metric)
1073 }
1074
1075 fn sampled_at(&self) -> Option<Timestamp> {
1076 (**self).sampled_at()
1077 }
1078 }
1079
1080 impl Sampler for Empty {
1081 fn metric<P: Props>(&self, _: Metric<P>) {}
1082 }
1083
1084 pub struct WithSampledAt<S> {
1088 sampler: S,
1089 now: Option<Timestamp>,
1090 }
1091
1092 impl<S> WithSampledAt<S> {
1093 pub const fn new(sampler: S, now: Option<Timestamp>) -> Self {
1097 WithSampledAt { sampler, now }
1098 }
1099 }
1100
1101 impl<S: Sampler> Sampler for WithSampledAt<S> {
1102 fn metric<P: Props>(&self, metric: Metric<P>) {
1103 self.sampler.metric(metric)
1104 }
1105
1106 fn sampled_at(&self) -> Option<Timestamp> {
1107 self.now
1108 }
1109 }
1110
1111 pub struct FromEmitter<E>(E);
1119
1120 impl<E: Emitter> Sampler for FromEmitter<E> {
1121 fn metric<P: Props>(&self, metric: Metric<P>) {
1122 self.0.emit(metric)
1123 }
1124 }
1125
1126 impl<E> FromEmitter<E> {
1127 pub const fn new(emitter: E) -> Self {
1131 FromEmitter(emitter)
1132 }
1133 }
1134
1135 pub const fn from_emitter<E: Emitter>(emitter: E) -> FromEmitter<E> {
1141 FromEmitter(emitter)
1142 }
1143
1144 pub struct FromFn<F = fn(Metric<&dyn ErasedProps>)>(F);
1150
1151 pub const fn from_fn<F: Fn(Metric<&dyn ErasedProps>)>(f: F) -> FromFn<F> {
1155 FromFn(f)
1156 }
1157
1158 impl<F> FromFn<F> {
1159 pub const fn new(sampler: F) -> FromFn<F> {
1163 FromFn(sampler)
1164 }
1165 }
1166
1167 impl<F: Fn(Metric<&dyn ErasedProps>)> Sampler for FromFn<F> {
1168 fn metric<P: Props>(&self, metric: Metric<P>) {
1169 (self.0)(metric.erase())
1170 }
1171 }
1172
    // Private plumbing behind `ErasedSampler`. Keeping these traits in a
    // private module means code outside this module cannot name or implement
    // them.
    mod internal {
        use super::*;

        // Object-safe counterpart of `Sampler`: takes metrics whose properties
        // are already type-erased instead of being generic over them.
        pub trait DispatchSampler {
            fn dispatch_metric(&self, metric: Metric<&dyn ErasedProps>);

            fn dispatch_sampled_at(&self) -> Option<Timestamp>;
        }

        // Supertrait of `ErasedSampler`; exposes the value as a
        // `DispatchSampler` trait object wrapped in `crate::internal::Erased`.
        pub trait SealedSampler {
            fn erase_sampler(&self) -> crate::internal::Erased<&dyn DispatchSampler>;
        }
    }
1186
1187 pub trait ErasedSampler: internal::SealedSampler {}
1193
1194 impl<T: Sampler> ErasedSampler for T {}
1195
1196 impl<T: Sampler> internal::SealedSampler for T {
1197 fn erase_sampler(&self) -> crate::internal::Erased<&dyn internal::DispatchSampler> {
1198 crate::internal::Erased(self)
1199 }
1200 }
1201
1202 impl<T: Sampler> internal::DispatchSampler for T {
1203 fn dispatch_metric(&self, metric: Metric<&dyn ErasedProps>) {
1204 self.metric(metric)
1205 }
1206
1207 fn dispatch_sampled_at(&self) -> Option<Timestamp> {
1208 self.sampled_at()
1209 }
1210 }
1211
1212 impl<'a> Sampler for dyn ErasedSampler + 'a {
1213 fn metric<P: Props>(&self, metric: Metric<P>) {
1214 self.erase_sampler().0.dispatch_metric(metric.erase())
1215 }
1216
1217 fn sampled_at(&self) -> Option<Timestamp> {
1218 self.erase_sampler().0.dispatch_sampled_at()
1219 }
1220 }
1221
1222 impl<'a> Sampler for dyn ErasedSampler + Send + Sync + 'a {
1223 fn metric<P: Props>(&self, metric: Metric<P>) {
1224 (self as &(dyn ErasedSampler + 'a)).metric(metric)
1225 }
1226
1227 fn sampled_at(&self) -> Option<Timestamp> {
1228 (self as &(dyn ErasedSampler + 'a)).sampled_at()
1229 }
1230 }
1231
1232 #[cfg(test)]
1233 mod tests {
1234 use super::*;
1235 use std::cell::Cell;
1236
1237 #[test]
1238 fn from_fn_sampler() {
1239 let called = Cell::new(false);
1240
1241 let sampler = from_fn(|metric| {
1242 assert_eq!("test", metric.name());
1243
1244 called.set(true);
1245 });
1246
1247 sampler.metric(Metric::new(
1248 Path::new_raw("test"),
1249 "test",
1250 "count",
1251 Empty,
1252 1,
1253 Empty,
1254 ));
1255
1256 assert!(called.get());
1257 }
1258
1259 #[test]
1260 fn erased_sampler() {
1261 let called = Cell::new(false);
1262
1263 let sampler = from_fn(|metric| {
1264 assert_eq!("test", metric.name());
1265
1266 called.set(true);
1267 });
1268
1269 let sampler = &sampler as &dyn ErasedSampler;
1270
1271 sampler.metric(Metric::new(
1272 Path::new_raw("test"),
1273 "test",
1274 "count",
1275 Empty,
1276 1,
1277 Empty,
1278 ));
1279
1280 assert!(called.get());
1281 }
1282 }
1283}
1284
1285pub mod exp {
1286 use crate::{
1291 platform::libm,
1292 value::{FromValue, ToValue, Value},
1293 };
1294
1295 use core::{cmp, fmt, hash};
1296
1297 #[derive(Clone, Copy)]
1305 #[repr(transparent)]
1306 pub struct Point(f64);
1307
1308 impl Point {
1309 pub const fn new(value: f64) -> Self {
1313 Point(value)
1314 }
1315
1316 pub const fn get(&self) -> f64 {
1320 self.0
1321 }
1322
1323 pub const fn is_sign_positive(&self) -> bool {
1327 self.get().is_sign_positive()
1328 }
1329
1330 pub const fn is_sign_negative(&self) -> bool {
1334 self.get().is_sign_negative()
1335 }
1336
1337 pub const fn is_zero_bucket(&self) -> bool {
1343 self.get() == 0.0
1344 }
1345
1346 pub const fn is_positive_bucket(&self) -> bool {
1352 self.is_indexable() && self.is_sign_positive()
1353 }
1354
1355 pub const fn is_negative_bucket(&self) -> bool {
1361 self.is_indexable() && self.is_sign_negative()
1362 }
1363
1364 pub const fn is_indexable(&self) -> bool {
1373 let value = self.get();
1374
1375 value != 0.0 && value.is_finite()
1376 }
1377 }
1378
1379 impl From<f64> for Point {
1380 fn from(value: f64) -> Self {
1381 Point::new(value)
1382 }
1383 }
1384
1385 impl From<Point> for f64 {
1386 fn from(value: Point) -> Self {
1387 value.get()
1388 }
1389 }
1390
1391 impl PartialEq for Point {
1392 fn eq(&self, other: &Self) -> bool {
1393 self.cmp(other) == cmp::Ordering::Equal
1394 }
1395 }
1396
1397 impl Eq for Point {}
1398
1399 impl PartialOrd for Point {
1400 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
1401 Some(self.cmp(other))
1402 }
1403 }
1404
1405 impl Ord for Point {
1406 fn cmp(&self, other: &Self) -> cmp::Ordering {
1407 libm::cmp(self.get()).cmp(&libm::cmp(other.get()))
1408 }
1409 }
1410
1411 impl hash::Hash for Point {
1412 fn hash<H: hash::Hasher>(&self, state: &mut H) {
1413 libm::cmp(self.get()).hash(state)
1414 }
1415 }
1416
1417 impl fmt::Debug for Point {
1418 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1419 fmt::Debug::fmt(&self.get(), f)
1420 }
1421 }
1422
1423 impl fmt::Display for Point {
1424 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1425 fmt::Display::fmt(&self.get(), f)
1426 }
1427 }
1428
1429 #[cfg(feature = "sval")]
1430 impl sval::Value for Point {
1431 fn stream<'sval, S: sval::Stream<'sval> + ?Sized>(
1432 &'sval self,
1433 stream: &mut S,
1434 ) -> sval::Result {
1435 stream.f64(self.get())
1436 }
1437 }
1438
1439 #[cfg(feature = "serde")]
1440 impl serde::Serialize for Point {
1441 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1442 where
1443 S: serde::Serializer,
1444 {
1445 serializer.serialize_f64(self.get())
1446 }
1447 }
1448
1449 impl ToValue for Point {
1450 fn to_value(&self) -> Value<'_> {
1451 Value::capture_display(self)
1452 }
1453 }
1454
1455 impl<'v> FromValue<'v> for Point {
1456 fn from_value(value: Value<'v>) -> Option<Self>
1457 where
1458 Self: Sized,
1459 {
1460 value
1461 .downcast_ref::<Point>()
1462 .copied()
1463 .or_else(|| f64::from_value(value).map(Point::new))
1464 }
1465 }
1466
1467 pub const fn gamma(scale: i32) -> f64 {
1479 libm::pow(2.0, libm::pow(2.0, -(scale as f64)))
1480 }
1481
1482 pub const fn midpoint(value: f64, scale: i32) -> Point {
1502 let sign = value.signum();
1503 let value = value.abs();
1504
1505 if value == 0.0 {
1506 return Point::new(value);
1507 }
1508
1509 let gamma = gamma(scale);
1510
1511 let index = libm::ceil(libm::log(value, gamma));
1512
1513 let lower = libm::pow(gamma, index - 1.0);
1514 let upper = lower * gamma;
1515
1516 Point::new(sign * lower.midpoint(upper))
1517 }
1518
    #[cfg(test)]
    mod tests {
        use core::f64::consts::PI;

        use super::*;

        // Sorting places negative values before positive ones, `-0.0` before
        // `0.0`, and NaN after positive infinity.
        #[test]
        fn point_cmp() {
            let mut values = vec![
                Point::new(1.0),
                Point::new(f64::NAN),
                Point::new(0.0),
                Point::new(f64::NEG_INFINITY),
                Point::new(-1.0),
                Point::new(-0.0),
                Point::new(f64::INFINITY),
            ];

            values.sort();

            assert_eq!(
                vec![
                    Point::new(f64::NEG_INFINITY),
                    Point::new(-1.0),
                    Point::new(-0.0),
                    Point::new(0.0),
                    Point::new(1.0),
                    Point::new(f64::INFINITY),
                    Point::new(f64::NAN)
                ],
                &*values
            );
        }

        // Only finite, non-zero points are indexable.
        #[test]
        fn point_is_indexable() {
            for (case, indexable) in [
                (Point::new(0.0), false),
                (Point::new(-0.0), false),
                (Point::new(f64::INFINITY), false),
                (Point::new(f64::NEG_INFINITY), false),
                (Point::new(f64::NAN), false),
                (Point::new(f64::EPSILON), true),
                (Point::new(-f64::EPSILON), true),
                (Point::new(f64::MIN), true),
                (Point::new(f64::MAX), true),
            ] {
                assert_eq!(indexable, case.is_indexable());
            }
        }

        // Each case lists the expected (zero, negative, positive) bucket flags.
        #[test]
        fn point_is_bucket() {
            for (case, zero, neg, pos) in [
                (Point::new(0.0), true, false, false),
                (Point::new(-0.0), true, false, false),
                (Point::new(f64::INFINITY), false, false, false),
                (Point::new(f64::NEG_INFINITY), false, false, false),
                (Point::new(f64::NAN), false, false, false),
                (Point::new(f64::EPSILON), false, false, true),
                (Point::new(-f64::EPSILON), false, true, false),
                (Point::new(f64::MIN), false, true, false),
                (Point::new(f64::MAX), false, false, true),
            ] {
                assert_eq!(zero, case.is_zero_bucket());
                assert_eq!(neg, case.is_negative_bucket());
                assert_eq!(pos, case.is_positive_bucket());
            }
        }

        #[cfg(feature = "sval")]
        #[test]
        fn point_stream() {
            sval_test::assert_tokens(&Point::new(3.1), &[sval_test::Token::F64(3.1)]);
        }

        #[cfg(feature = "serde")]
        #[test]
        fn point_serialize() {
            serde_test::assert_ser_tokens(&Point::new(3.1), &[serde_test::Token::F64(3.1)]);
        }

        #[test]
        fn point_to_from_value() {
            let point = Point::new(3.1);

            assert_eq!(point, Point::from_value(point.to_value()).unwrap());
        }

        // For each scale, `expected[i]` is the midpoint of `cases[i]`. Results
        // are compared bit-for-bit, and each midpoint must map to itself when
        // fed back through `midpoint`.
        #[test]
        fn compute_midpoints() {
            let cases = [
                0.0f64,
                PI,
                PI * 100.0f64,
                PI * 1000.0f64,
                -0.0f64,
                -PI,
                -(PI * 100.0f64),
                -(PI * 1000.0f64),
                f64::INFINITY,
                f64::NEG_INFINITY,
                f64::NAN,
            ];
            for (scale, expected) in [
                (
                    0i32,
                    [
                        0.0f64,
                        3.0f64,
                        384.0f64,
                        3072.0f64,
                        0.0f64,
                        -3.0f64,
                        -384.0f64,
                        -3072.0f64,
                        f64::INFINITY,
                        f64::NEG_INFINITY,
                        f64::NAN,
                    ],
                ),
                (
                    2i32,
                    [
                        0.0f64,
                        3.0960063928805233f64,
                        333.2378467041041f64,
                        3170.3105463096517f64,
                        0.0f64,
                        -3.0960063928805233f64,
                        -333.2378467041041f64,
                        -3170.3105463096517f64,
                        f64::INFINITY,
                        f64::NEG_INFINITY,
                        f64::NAN,
                    ],
                ),
                (
                    4i32,
                    [
                        0.0f64,
                        3.152701157357188f64,
                        311.17631066575086f64,
                        3091.493858659732f64,
                        0.0f64,
                        -3.152701157357188f64,
                        -311.17631066575086f64,
                        -3091.493858659732f64,
                        f64::INFINITY,
                        f64::NEG_INFINITY,
                        f64::NAN,
                    ],
                ),
                (
                    8i32,
                    [
                        0.0f64,
                        3.1391891212579424f64,
                        314.0658342072582f64,
                        3145.6489181930947f64,
                        0.0f64,
                        -3.1391891212579424f64,
                        -314.0658342072582f64,
                        -3145.6489181930947f64,
                        f64::INFINITY,
                        f64::NEG_INFINITY,
                        f64::NAN,
                    ],
                ),
                (
                    16i32,
                    [
                        0.0f64,
                        3.141594303685526f64,
                        314.1602303152259f64,
                        3141.606302893263f64,
                        0.0f64,
                        -3.141594303685526f64,
                        -314.1602303152259f64,
                        -3141.606302893263f64,
                        f64::INFINITY,
                        f64::NEG_INFINITY,
                        f64::NAN,
                    ],
                ),
            ] {
                for (case, expected) in cases.iter().copied().zip(expected.iter().copied()) {
                    let actual = midpoint(case, scale);
                    let roundtrip = midpoint(actual.get(), scale);

                    // NaN has no single bit pattern to compare against, so it
                    // is only checked for being NaN throughout.
                    if expected.is_nan() && actual.get().is_nan() && roundtrip.get().is_nan() {
                        continue;
                    }

                    assert_eq!(
                        expected.to_bits(),
                        actual.get().to_bits(),
                        "expected midpoint({case}, {scale}) to be {expected}, but got {actual}"
                    );

                    assert_eq!(
                        actual.get().to_bits(),
                        roundtrip.get().to_bits(),
                        "expected midpoint(midpoint({case}, {scale}), {scale}) to roundtrip to {actual}, but got {roundtrip}"
                    );
                }
            }
        }
    }
1728}
1729
1730mod delta {
1731 use super::*;
1732
1733 use core::mem;
1734
1735 use crate::Timestamp;
1736
1737 pub struct Delta<T> {
1752 start: Option<Timestamp>,
1753 value: T,
1754 }
1755
1756 impl<T> Delta<T> {
1757 pub fn new(start: Option<Timestamp>, initial: T) -> Self {
1761 Delta {
1762 start,
1763 value: initial,
1764 }
1765 }
1766
1767 pub fn new_default(start: Option<Timestamp>) -> Self
1771 where
1772 T: Default,
1773 {
1774 Self::new(start, Default::default())
1775 }
1776
1777 pub fn current_start(&self) -> Option<&Timestamp> {
1781 self.start.as_ref()
1782 }
1783
1784 pub fn current_value_mut(&mut self) -> &mut T {
1788 &mut self.value
1789 }
1790
1791 pub fn current_value(&self) -> &T {
1795 &self.value
1796 }
1797
1798 pub fn advance(&mut self, end: Option<Timestamp>) -> (Option<Extent>, &mut T) {
1807 let start = mem::replace(&mut self.start, end);
1808
1809 let extent = (start..end).to_extent();
1810
1811 (extent, &mut self.value)
1812 }
1813
1814 pub fn advance_default(&mut self, end: Option<Timestamp>) -> (Option<Extent>, T)
1823 where
1824 T: Default,
1825 {
1826 let (extent, value) = self.advance(end);
1827
1828 (extent, mem::take(value))
1829 }
1830 }
1831
1832 #[cfg(test)]
1833 mod tests {
1834 use super::*;
1835
1836 use core::time::Duration;
1837
1838 #[test]
1839 fn delta_advance() {
1840 let mut delta = Delta::new(Some(Timestamp::MIN), 0);
1841
1842 *delta.current_value_mut() += 1;
1843
1844 let (extent, value) = delta.advance(Some(Timestamp::MIN + Duration::from_secs(1)));
1845 let extent = extent.unwrap();
1846 let range = extent.as_range().unwrap();
1847
1848 assert_eq!(
1849 Timestamp::MIN..Timestamp::MIN + Duration::from_secs(1),
1850 *range
1851 );
1852 assert_eq!(1, *value);
1853 }
1854 }
1855}
1856
1857pub use self::delta::*;
1858
#[cfg(test)]
mod tests {
    use super::*;

    // `core` rather than `std`, matching the `delta` tests and the rest of this
    // file, so these tests don't depend on `std` being linked.
    use core::time::Duration;

    use crate::Timestamp;

    /// `Metric::new` stores each argument so that the matching accessor returns it.
    #[test]
    fn metric_new() {
        let metric = Metric::new(
            Path::new_raw("test"),
            "my metric",
            "count",
            Timestamp::from_unix(Duration::from_secs(1)),
            42,
            ("metric_prop", true),
        );

        assert_eq!("test", metric.mdl());
        assert_eq!(
            Timestamp::from_unix(Duration::from_secs(1)).unwrap(),
            metric.extent().unwrap().as_point()
        );
        assert_eq!("my metric", metric.name());
        assert_eq!("count", metric.agg());
        assert_eq!(42, metric.value().by_ref().cast::<i32>().unwrap());
        assert!(metric.props().pull::<bool, _>("metric_prop").unwrap());
    }

    /// Converting a metric to an event carries over its module, extent and
    /// properties, and exposes the metric fields under their well-known keys.
    #[test]
    fn metric_to_event() {
        let metric = Metric::new(
            Path::new_raw("test"),
            "my metric",
            "count",
            Timestamp::from_unix(Duration::from_secs(1)),
            42,
            ("metric_prop", true),
        );

        let evt = metric.to_event();

        assert_eq!("test", evt.mdl());
        assert_eq!(
            Timestamp::from_unix(Duration::from_secs(1)).unwrap(),
            evt.extent().unwrap().as_point()
        );

        // No template was set, so the message is the one generated for metrics.
        assert_eq!("count of my metric is 42", evt.msg().to_string());

        assert_eq!("count", evt.props().pull::<Str, _>(KEY_METRIC_AGG).unwrap());
        assert_eq!(42, evt.props().pull::<i32, _>(KEY_METRIC_VALUE).unwrap());
        assert_eq!(
            "my metric",
            evt.props().pull::<Str, _>(KEY_METRIC_NAME).unwrap()
        );
        assert!(evt.props().pull::<bool, _>("metric_prop").unwrap());
        assert_eq!(
            Kind::Metric,
            evt.props().pull::<Kind, _>(KEY_EVT_KIND).unwrap()
        );
    }

    /// A template set through `with_tpl` is used for the event's message.
    #[test]
    fn metric_to_event_uses_tpl() {
        assert_eq!(
            "test",
            Metric::new(
                Path::new_raw("test"),
                "my metric",
                "count",
                Timestamp::from_unix(Duration::from_secs(1)),
                42,
                ("metric_prop", true),
            )
            .with_tpl(Template::literal("test"))
            .to_event()
            .msg()
            .to_string(),
        );
    }

    /// A metric's `ToExtent` result reflects the extent it was built with: a point
    /// for a timestamp, and nothing when no extent was given.
    #[test]
    fn metric_to_extent() {
        for (case, expected) in [
            (
                Some(Timestamp::from_unix(Duration::from_secs(1)).unwrap()),
                Some(Extent::point(
                    Timestamp::from_unix(Duration::from_secs(1)).unwrap(),
                )),
            ),
            (None, None),
        ] {
            let metric = Metric::new(
                Path::new_raw("test"),
                "my metric",
                "count",
                case,
                42,
                ("metric_prop", true),
            );

            let extent = metric.to_extent();

            // Compare the point timestamps directly.
            // NOTE(review): `as_range` looks like it yields `None` for point
            // extents, in which case the range comparison below can't tell two
            // different points apart. Confirm against `Extent::as_range`.
            assert_eq!(
                expected.as_ref().map(|extent| extent.as_point()),
                extent.as_ref().map(|extent| extent.as_point())
            );

            assert_eq!(
                expected.map(|extent| extent.as_range().cloned()),
                extent.map(|extent| extent.as_range().cloned())
            );
        }
    }
}