1use crate::{dbg_panic, telemetry::TaskQueueLabelStrategy};
2use std::{
3 borrow::Cow,
4 collections::{BTreeMap, HashMap},
5 fmt::{Debug, Display},
6 ops::Deref,
7 sync::{
8 Arc, OnceLock,
9 atomic::{AtomicU64, Ordering},
10 },
11 time::Duration,
12};
13
14#[cfg(feature = "core-based-sdk")]
15pub mod core;
16
/// Histogram name `workflow_endtoend_latency`; has a dedicated default bucket table below.
pub const WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME: &str = "workflow_endtoend_latency";
/// Histogram name `workflow_task_schedule_to_start_latency`; shares a default bucket table with
/// the activity schedule-to-start histogram.
pub const WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME: &str =
    "workflow_task_schedule_to_start_latency";
/// Histogram name `workflow_task_replay_latency`; shares a default bucket table with the
/// workflow task execution histogram.
pub const WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME: &str = "workflow_task_replay_latency";
/// Histogram name `workflow_task_execution_latency`.
pub const WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME: &str = "workflow_task_execution_latency";
/// Histogram name `activity_schedule_to_start_latency`.
pub const ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME: &str =
    "activity_schedule_to_start_latency";
/// Histogram name `activity_execution_latency`; has a dedicated default bucket table below.
pub const ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME: &str = "activity_execution_latency";
31
/// Generates the default latency bucket tables and the lookup function over them.
///
/// Each entry has the shape `(name pattern, MS_STATIC, SECONDS_STATIC, [bucket bounds])`. Two
/// `pub(super)` statics are emitted per entry: the bounds exactly as written (treated as
/// milliseconds) and the same bounds divided by 1000 (seconds). The generated
/// `default_buckets_for` matches a histogram name against the entry patterns in the order they
/// are listed, so the final entry must be a catch-all pattern for the match to be exhaustive.
macro_rules! define_latency_buckets {
    ($(($pattern:pat, $ms_table:ident, $sec_table:ident, [$($bound:expr),*])),*) => {
        $(
            pub(super) static $ms_table: &[f64] = &[$($bound,)*];
            pub(super) static $sec_table: &[f64] = &[$( $bound / 1000.0, )*];
        )*

        /// Returns the default bucket boundaries for the histogram named `histo_name`. When
        /// `use_seconds` is true the divided-by-1000 table is returned instead of the table as
        /// written in the macro invocation.
        pub fn default_buckets_for(histo_name: &str, use_seconds: bool) -> &'static [f64] {
            // Select both tables for the matching entry first, then pick the requested unit.
            let (ms_buckets, sec_buckets): (&'static [f64], &'static [f64]) = match histo_name {
                $(
                    $pattern => ($ms_table, $sec_table),
                )*
            };
            if use_seconds { sec_buckets } else { ms_buckets }
        }
    };
}
54
// Default bucket tables. Bounds are written in milliseconds; the macro derives the seconds
// variant of each table. The last entry is the catch-all used for any other histogram name.
define_latency_buckets!(
    (
        WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME,
        WF_LATENCY_MS_BUCKETS,
        WF_LATENCY_S_BUCKETS,
        [
            100.,
            500.,
            1000.,
            1500.,
            2000.,
            5000.,
            10_000.,
            30_000.,
            60_000.,
            120_000.,
            300_000.,
            600_000.,
            // 30 minutes, 1 hour, 8.5 hours, 24 hours
            1_800_000., 3_600_000., 30_600_000., 8.64e7 ]
    ),
    (
        WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME
            | WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME,
        WF_TASK_MS_BUCKETS,
        WF_TASK_S_BUCKETS,
        [1., 10., 20., 50., 100., 200., 500., 1000.]
    ),
    (
        ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME,
        ACT_EXE_MS_BUCKETS,
        ACT_EXE_S_BUCKETS,
        [50., 100., 500., 1000., 5000., 10_000., 60_000.]
    ),
    (
        WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME
            | ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME,
        TASK_SCHED_TO_START_MS_BUCKETS,
        TASK_SCHED_TO_START_S_BUCKETS,
        [100., 500., 1000., 5000., 10_000., 100_000., 1_000_000.]
    ),
    (
        _,
        DEFAULT_MS_BUCKETS,
        DEFAULT_S_BUCKETS,
        [50., 100., 500., 1000., 2500., 10_000.]
    )
);
106
107pub trait CoreMeter: Send + Sync + Debug {
111 fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes;
115 fn extend_attributes(
119 &self,
120 existing: MetricAttributes,
121 attribs: NewAttributes,
122 ) -> MetricAttributes;
123 fn counter(&self, params: MetricParameters) -> Counter;
125
126 fn counter_with_in_memory(
128 &self,
129 params: MetricParameters,
130 in_memory_counter: HeartbeatMetricType,
131 ) -> Counter {
132 let primary_counter = self.counter(params);
133
134 Counter::new_with_in_memory(primary_counter.primary.metric.clone(), in_memory_counter)
135 }
136
137 fn histogram(&self, params: MetricParameters) -> Histogram;
139 fn histogram_f64(&self, params: MetricParameters) -> HistogramF64;
141 fn histogram_duration(&self, params: MetricParameters) -> HistogramDuration;
146
147 fn histogram_duration_with_in_memory(
149 &self,
150 params: MetricParameters,
151 in_memory_hist: HeartbeatMetricType,
152 ) -> HistogramDuration {
153 let primary_hist = self.histogram_duration(params);
154
155 HistogramDuration::new_with_in_memory(primary_hist.primary.metric.clone(), in_memory_hist)
156 }
157 fn gauge(&self, params: MetricParameters) -> Gauge;
159
160 fn gauge_with_in_memory(
162 &self,
163 params: MetricParameters,
164 in_memory_metrics: HeartbeatMetricType,
165 ) -> Gauge {
166 let primary_gauge = self.gauge(params.clone());
167 Gauge::new_with_in_memory(primary_gauge.primary.metric.clone(), in_memory_metrics)
168 }
169
170 fn gauge_f64(&self, params: MetricParameters) -> GaugeF64;
172
173 fn up_down_counter(&self, params: MetricParameters) -> UpDownCounter;
175}
176
/// In-memory mirror of a metric, held as shared atomics that are updated alongside the primary
/// instrument.
#[derive(Clone, Debug)]
pub enum HeartbeatMetricType {
    /// A single shared value.
    Individual(Arc<AtomicU64>),
    /// One shared value per label value.
    WithLabel {
        /// Attribute key whose value selects the entry in `metrics`.
        label_key: String,
        /// Shared values keyed by the label value.
        metrics: HashMap<String, Arc<AtomicU64>>,
    },
}
192
193impl HeartbeatMetricType {
194 fn record_counter(&self, delta: u64) {
195 match self {
196 HeartbeatMetricType::Individual(metric) => {
197 metric.fetch_add(delta, Ordering::Relaxed);
198 }
199 HeartbeatMetricType::WithLabel { .. } => {
200 dbg_panic!("Counter does not support in-memory metric with labels");
201 }
202 }
203 }
204
205 fn record_histogram_observation(&self) {
206 match self {
207 HeartbeatMetricType::Individual(metric) => {
208 metric.fetch_add(1, Ordering::Relaxed);
209 }
210 HeartbeatMetricType::WithLabel { .. } => {
211 dbg_panic!("Histogram does not support in-memory metric with labels");
212 }
213 }
214 }
215
216 fn record_gauge(&self, value: u64, attributes: &MetricAttributes) {
217 match self {
218 HeartbeatMetricType::Individual(metric) => {
219 metric.store(value, Ordering::Relaxed);
220 }
221 HeartbeatMetricType::WithLabel { label_key, metrics } => {
222 if let Some(metric) = label_value_from_attributes(attributes, label_key.as_str())
223 .and_then(|label_value| metrics.get(label_value.as_str()))
224 {
225 metric.store(value, Ordering::Relaxed)
226 }
227 }
228 }
229 }
230}
231
232fn label_value_from_attributes(attributes: &MetricAttributes, key: &str) -> Option<String> {
233 match attributes {
234 MetricAttributes::Prometheus { labels } => labels.as_prom_labels().get(key).cloned(),
235 #[cfg(feature = "otel")]
236 MetricAttributes::OTel { kvs } => kvs
237 .iter()
238 .find(|kv| kv.key.as_str() == key)
239 .map(|kv| kv.value.to_string()),
240 MetricAttributes::NoOp(labels) => labels.get(key).cloned(),
241 _ => None,
242 }
243}
244
/// Parameters used when creating an instrument: its name plus an optional description and unit.
#[derive(Debug, Clone, bon::Builder)]
pub struct MetricParameters {
    /// Name of the metric.
    #[builder(into)]
    pub name: Cow<'static, str>,
    /// Description of the metric; the builder defaults it to the empty string.
    #[builder(into, default = Cow::Borrowed(""))]
    pub description: Cow<'static, str>,
    /// Unit of the metric; the builder defaults it to the empty string.
    #[builder(into, default = Cow::Borrowed(""))]
    pub unit: Cow<'static, str>,
}
258impl From<&'static str> for MetricParameters {
259 fn from(value: &'static str) -> Self {
260 Self {
261 name: value.into(),
262 description: Default::default(),
263 unit: Default::default(),
264 }
265 }
266}
267
/// A [`CoreMeter`] bundled with a set of default attributes and the task queue label strategy.
/// Dereferences to the wrapped meter.
#[derive(Clone, Debug)]
pub struct TemporalMeter {
    // The wrapped meter implementation.
    inner: Arc<dyn CoreMeter>,
    // Attributes built from the defaults given at construction, extended by `merge_attributes`.
    default_attribs: MetricAttributes,
    // Strategy value handed out by `get_task_queue_label_strategy`.
    task_queue_label_strategy: TaskQueueLabelStrategy,
}
275
276impl TemporalMeter {
277 pub fn new(
279 meter: Arc<dyn CoreMeter>,
280 default_attribs: NewAttributes,
281 task_queue_label_strategy: TaskQueueLabelStrategy,
282 ) -> Self {
283 Self {
284 default_attribs: meter.new_attributes(default_attribs),
285 inner: meter,
286 task_queue_label_strategy,
287 }
288 }
289
290 pub fn no_op() -> Self {
292 Self {
293 inner: Arc::new(NoOpCoreMeter),
294 default_attribs: MetricAttributes::NoOp(Arc::new(Default::default())),
295 task_queue_label_strategy: TaskQueueLabelStrategy::UseNormal,
296 }
297 }
298
299 pub fn get_default_attributes(&self) -> &MetricAttributes {
301 &self.default_attribs
302 }
303
304 pub fn get_task_queue_label_strategy(&self) -> &TaskQueueLabelStrategy {
306 &self.task_queue_label_strategy
307 }
308
309 pub fn merge_attributes(&mut self, new_attribs: NewAttributes) {
311 self.default_attribs = self.extend_attributes(self.default_attribs.clone(), new_attribs);
312 }
313}
314
315impl Deref for TemporalMeter {
316 type Target = dyn CoreMeter;
317 fn deref(&self) -> &Self::Target {
318 self.inner.as_ref()
319 }
320}
321
322impl CoreMeter for Arc<dyn CoreMeter> {
323 fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes {
324 self.as_ref().new_attributes(attribs)
325 }
326
327 fn extend_attributes(
328 &self,
329 existing: MetricAttributes,
330 attribs: NewAttributes,
331 ) -> MetricAttributes {
332 self.as_ref().extend_attributes(existing, attribs)
333 }
334
335 fn counter(&self, params: MetricParameters) -> Counter {
336 self.as_ref().counter(params)
337 }
338 fn histogram(&self, params: MetricParameters) -> Histogram {
339 self.as_ref().histogram(params)
340 }
341
342 fn histogram_f64(&self, params: MetricParameters) -> HistogramF64 {
343 self.as_ref().histogram_f64(params)
344 }
345
346 fn histogram_duration(&self, params: MetricParameters) -> HistogramDuration {
347 self.as_ref().histogram_duration(params)
348 }
349
350 fn gauge(&self, params: MetricParameters) -> Gauge {
351 self.as_ref().gauge(params)
352 }
353
354 fn gauge_f64(&self, params: MetricParameters) -> GaugeF64 {
355 self.as_ref().gauge_f64(params)
356 }
357
358 fn up_down_counter(&self, params: MetricParameters) -> UpDownCounter {
359 self.as_ref().up_down_counter(params)
360 }
361}
362
/// A backend-specific set of metric attributes. Each variant corresponds to one kind of meter
/// implementation; the enum is `#[non_exhaustive]`.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum MetricAttributes {
    /// Attributes for OpenTelemetry instruments (only with the `otel` feature).
    #[cfg(feature = "otel")]
    OTel {
        /// Shared list of OpenTelemetry key/value pairs.
        kvs: Arc<Vec<opentelemetry::KeyValue>>,
    },
    /// Attributes in Prometheus label form.
    Prometheus {
        /// Shared label set.
        labels: Arc<OrderedPromLabelSet>,
    },
    /// Attribute form defined by the `core` submodule (only with the `core-based-sdk` feature).
    #[cfg(feature = "core-based-sdk")]
    Buffer(core::BufferAttributes),
    /// Custom attribute form defined by the `core` submodule (only with the `core-based-sdk`
    /// feature).
    #[cfg(feature = "core-based-sdk")]
    Dynamic(Arc<dyn core::CustomMetricAttributes>),
    /// Plain string label map, as produced by [`NoOpCoreMeter`].
    NoOp(Arc<HashMap<String, String>>),
    /// No attributes; the initial state of a newly created instrument.
    Empty,
}
390
/// Backend-independent list of key/value pairs from which a meter builds or extends
/// [`MetricAttributes`].
#[derive(Clone, Debug, Default, derive_more::Constructor)]
pub struct NewAttributes {
    /// The key/value pairs, in insertion order.
    pub attributes: Vec<MetricKeyValue>,
}
397impl NewAttributes {
398 pub fn extend(&mut self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) {
400 self.attributes.extend(new_kvs)
401 }
402}
403impl<I> From<I> for NewAttributes
404where
405 I: IntoIterator<Item = MetricKeyValue>,
406{
407 fn from(value: I) -> Self {
408 Self {
409 attributes: value.into_iter().collect(),
410 }
411 }
412}
413
414impl From<NewAttributes> for HashMap<String, String> {
415 fn from(value: NewAttributes) -> Self {
416 value
417 .attributes
418 .into_iter()
419 .map(|kv| (kv.key, kv.value.to_string()))
420 .collect()
421 }
422}
423
/// A single metric attribute: a string key paired with a typed value.
#[derive(Clone, Debug, PartialEq)]
pub struct MetricKeyValue {
    /// Attribute key.
    pub key: String,
    /// Attribute value.
    pub value: MetricValue,
}
432impl MetricKeyValue {
433 pub fn new(key: impl Into<String>, value: impl Into<MetricValue>) -> Self {
435 Self {
436 key: key.into(),
437 value: value.into(),
438 }
439 }
440}
441
/// The value of a metric attribute. `From` conversions for each payload type are derived.
#[derive(Clone, Debug, PartialEq, derive_more::From)]
pub enum MetricValue {
    /// An owned string value.
    String(String),
    /// A signed 64-bit integer value.
    Int(i64),
    /// A 64-bit floating point value.
    Float(f64),
    /// A boolean value.
    Bool(bool),
}
455impl From<&'static str> for MetricValue {
456 fn from(value: &'static str) -> Self {
457 MetricValue::String(value.to_string())
458 }
459}
460impl Display for MetricValue {
461 fn fmt(&self, f1: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
462 match self {
463 MetricValue::String(s) => write!(f1, "{s}"),
464 MetricValue::Int(i) => write!(f1, "{i}"),
465 MetricValue::Float(f) => write!(f1, "{f}"),
466 MetricValue::Bool(b) => write!(f1, "{b}"),
467 }
468 }
469}
470
/// Something that can produce a `Base` instrument bound to a specific set of attributes.
pub trait MetricAttributable<Base> {
    /// Returns a `Base` that records with `attributes`.
    ///
    /// # Errors
    /// Returns an error when the implementation cannot create the bound instrument.
    fn with_attributes(
        &self,
        attributes: &MetricAttributes,
    ) -> Result<Base, Box<dyn std::error::Error>>;
}
487
/// A metric `T` paired with attributes, whose attribute-bound form `B` is created on first use
/// and then cached.
#[derive(Clone)]
pub struct LazyBoundMetric<T, B> {
    // The unbound metric that bound instruments are created from.
    metric: T,
    // Attributes used when the bound instrument is created.
    attributes: MetricAttributes,
    // Cache for the bound instrument; empty until first use.
    bound_cache: OnceLock<B>,
}
495impl<T, B> LazyBoundMetric<T, B> {
496 pub fn update_attributes(&mut self, new_attributes: MetricAttributes) {
498 self.attributes = new_attributes;
499 self.bound_cache = OnceLock::new();
500 }
501}
502
/// Backend operation of a counter that is already bound to its attributes.
pub trait CounterBase: Send + Sync {
    /// Adds `value` to the counter.
    fn adds(&self, value: u64);
}

/// Lazily bound counter: an attributable counter source plus the cached bound counter.
pub type CounterImpl = LazyBoundMetric<
    Arc<dyn MetricAttributable<Box<dyn CounterBase>> + Send + Sync>,
    Arc<dyn CounterBase>,
>;

/// A counter of `u64` increments, optionally mirrored into an in-memory value.
#[derive(Clone)]
pub struct Counter {
    // The backend counter.
    primary: CounterImpl,
    // Optional in-memory mirror that receives the same increments.
    in_memory: Option<HeartbeatMetricType>,
}
521impl Counter {
522 pub fn new(inner: Arc<dyn MetricAttributable<Box<dyn CounterBase>> + Send + Sync>) -> Self {
524 Self {
525 primary: LazyBoundMetric {
526 metric: inner,
527 attributes: MetricAttributes::Empty,
528 bound_cache: OnceLock::new(),
529 },
530 in_memory: None,
531 }
532 }
533
534 pub fn new_with_in_memory(
536 primary: Arc<dyn MetricAttributable<Box<dyn CounterBase>> + Send + Sync>,
537 in_memory: HeartbeatMetricType,
538 ) -> Self {
539 Self {
540 primary: LazyBoundMetric {
541 metric: primary,
542 attributes: MetricAttributes::Empty,
543 bound_cache: OnceLock::new(),
544 },
545 in_memory: Some(in_memory),
546 }
547 }
548
549 pub fn add(&self, value: u64, attributes: &MetricAttributes) {
551 match self.primary.metric.with_attributes(attributes) {
552 Ok(base) => base.adds(value),
553 Err(e) => {
554 dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}");
555 }
556 }
557
558 if let Some(ref in_mem) = self.in_memory {
559 in_mem.record_counter(value);
560 }
561 }
562
563 pub fn update_attributes(&mut self, new_attributes: MetricAttributes) {
565 self.primary.update_attributes(new_attributes.clone());
566 }
567}
568impl CounterBase for Counter {
569 fn adds(&self, value: u64) {
570 let bound = self.primary.bound_cache.get_or_init(|| {
573 self.primary
574 .metric
575 .with_attributes(&self.primary.attributes)
576 .map(Into::into)
577 .unwrap_or_else(|e| {
578 dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}");
579 Arc::new(NoOpInstrument) as Arc<dyn CounterBase>
580 })
581 });
582 bound.adds(value);
583
584 if let Some(ref in_mem) = self.in_memory {
585 in_mem.record_counter(value);
586 }
587 }
588}
589impl MetricAttributable<Counter> for Counter {
590 fn with_attributes(
591 &self,
592 attributes: &MetricAttributes,
593 ) -> Result<Counter, Box<dyn std::error::Error>> {
594 let primary = LazyBoundMetric {
595 metric: self.primary.metric.clone(),
596 attributes: attributes.clone(),
597 bound_cache: OnceLock::new(),
598 };
599
600 Ok(Counter {
601 primary,
602 in_memory: self.in_memory.clone(),
603 })
604 }
605}
606
/// Backend operation of a `u64` histogram that is already bound to its attributes.
pub trait HistogramBase: Send + Sync {
    /// Records one observation of `value`.
    fn records(&self, value: u64);
}
/// A histogram of `u64` observations, lazily bound to its attributes.
pub type Histogram = LazyBoundMetric<
    Arc<dyn MetricAttributable<Box<dyn HistogramBase>> + Send + Sync>,
    Arc<dyn HistogramBase>,
>;
617impl Histogram {
618 pub fn new(inner: Arc<dyn MetricAttributable<Box<dyn HistogramBase>> + Send + Sync>) -> Self {
620 Self {
621 metric: inner,
622 attributes: MetricAttributes::Empty,
623 bound_cache: OnceLock::new(),
624 }
625 }
626 pub fn record(&self, value: u64, attributes: &MetricAttributes) {
628 match self.metric.with_attributes(attributes) {
629 Ok(base) => {
630 base.records(value);
631 }
632 Err(e) => {
633 dbg_panic!("Failed to initialize metric, will drop values: {e:?}",);
634 }
635 }
636 }
637}
638impl HistogramBase for Histogram {
639 fn records(&self, value: u64) {
640 let bound = self.bound_cache.get_or_init(|| {
641 self.metric
642 .with_attributes(&self.attributes)
643 .map(Into::into)
644 .unwrap_or_else(|e| {
645 dbg_panic!("Failed to initialize metric, will drop values: {e:?}");
646 Arc::new(NoOpInstrument) as Arc<dyn HistogramBase>
647 })
648 });
649 bound.records(value);
650 }
651}
652impl MetricAttributable<Histogram> for Histogram {
653 fn with_attributes(
654 &self,
655 attributes: &MetricAttributes,
656 ) -> Result<Histogram, Box<dyn std::error::Error>> {
657 Ok(Self {
658 metric: self.metric.clone(),
659 attributes: attributes.clone(),
660 bound_cache: OnceLock::new(),
661 })
662 }
663}
664
/// Backend operation of an `f64` histogram that is already bound to its attributes.
pub trait HistogramF64Base: Send + Sync {
    /// Records one observation of `value`.
    fn records(&self, value: f64);
}
/// A histogram of `f64` observations, lazily bound to its attributes.
pub type HistogramF64 = LazyBoundMetric<
    Arc<dyn MetricAttributable<Box<dyn HistogramF64Base>> + Send + Sync>,
    Arc<dyn HistogramF64Base>,
>;
675impl HistogramF64 {
676 pub fn new(
678 inner: Arc<dyn MetricAttributable<Box<dyn HistogramF64Base>> + Send + Sync>,
679 ) -> Self {
680 Self {
681 metric: inner,
682 attributes: MetricAttributes::Empty,
683 bound_cache: OnceLock::new(),
684 }
685 }
686 pub fn record(&self, value: f64, attributes: &MetricAttributes) {
688 match self.metric.with_attributes(attributes) {
689 Ok(base) => {
690 base.records(value);
691 }
692 Err(e) => {
693 dbg_panic!("Failed to initialize metric, will drop values: {e:?}",);
694 }
695 }
696 }
697}
698impl HistogramF64Base for HistogramF64 {
699 fn records(&self, value: f64) {
700 let bound = self.bound_cache.get_or_init(|| {
701 self.metric
702 .with_attributes(&self.attributes)
703 .map(Into::into)
704 .unwrap_or_else(|e| {
705 dbg_panic!("Failed to initialize metric, will drop values: {e:?}");
706 Arc::new(NoOpInstrument) as Arc<dyn HistogramF64Base>
707 })
708 });
709 bound.records(value);
710 }
711}
712impl MetricAttributable<HistogramF64> for HistogramF64 {
713 fn with_attributes(
714 &self,
715 attributes: &MetricAttributes,
716 ) -> Result<HistogramF64, Box<dyn std::error::Error>> {
717 Ok(Self {
718 metric: self.metric.clone(),
719 attributes: attributes.clone(),
720 bound_cache: OnceLock::new(),
721 })
722 }
723}
724
/// Backend operation of a duration histogram that is already bound to its attributes.
pub trait HistogramDurationBase: Send + Sync {
    /// Records one observation of `value`.
    fn records(&self, value: Duration);
}

/// Lazily bound duration histogram: an attributable source plus the cached bound histogram.
pub type HistogramDurationImpl = LazyBoundMetric<
    Arc<dyn MetricAttributable<Box<dyn HistogramDurationBase>> + Send + Sync>,
    Arc<dyn HistogramDurationBase>,
>;

/// A histogram of [`Duration`] observations, optionally mirrored into an in-memory value.
#[derive(Clone)]
pub struct HistogramDuration {
    // The backend histogram.
    primary: HistogramDurationImpl,
    // Optional in-memory mirror that is bumped once per observation.
    in_memory: Option<HeartbeatMetricType>,
}
743impl HistogramDuration {
744 pub fn new(
746 inner: Arc<dyn MetricAttributable<Box<dyn HistogramDurationBase>> + Send + Sync>,
747 ) -> Self {
748 Self {
749 primary: LazyBoundMetric {
750 metric: inner,
751 attributes: MetricAttributes::Empty,
752 bound_cache: OnceLock::new(),
753 },
754 in_memory: None,
755 }
756 }
757 pub fn new_with_in_memory(
759 primary: Arc<dyn MetricAttributable<Box<dyn HistogramDurationBase>> + Send + Sync>,
760 in_memory: HeartbeatMetricType,
761 ) -> Self {
762 Self {
763 primary: LazyBoundMetric {
764 metric: primary,
765 attributes: MetricAttributes::Empty,
766 bound_cache: OnceLock::new(),
767 },
768 in_memory: Some(in_memory),
769 }
770 }
771 pub fn record(&self, value: Duration, attributes: &MetricAttributes) {
773 match self.primary.metric.with_attributes(attributes) {
774 Ok(base) => {
775 base.records(value);
776 }
777 Err(e) => {
778 dbg_panic!("Failed to initialize metric, will drop values: {e:?}",);
779 }
780 }
781
782 if let Some(ref in_mem) = self.in_memory {
783 in_mem.record_histogram_observation();
784 }
785 }
786
787 pub fn update_attributes(&mut self, new_attributes: MetricAttributes) {
789 self.primary.update_attributes(new_attributes.clone());
790 }
791}
792impl HistogramDurationBase for HistogramDuration {
793 fn records(&self, value: Duration) {
794 let bound = self.primary.bound_cache.get_or_init(|| {
795 self.primary
796 .metric
797 .with_attributes(&self.primary.attributes)
798 .map(Into::into)
799 .unwrap_or_else(|e| {
800 dbg_panic!("Failed to initialize metric, will drop values: {e:?}");
801 Arc::new(NoOpInstrument) as Arc<dyn HistogramDurationBase>
802 })
803 });
804 bound.records(value);
805
806 if let Some(ref in_mem) = self.in_memory {
807 in_mem.record_histogram_observation();
808 }
809 }
810}
811impl MetricAttributable<HistogramDuration> for HistogramDuration {
812 fn with_attributes(
813 &self,
814 attributes: &MetricAttributes,
815 ) -> Result<HistogramDuration, Box<dyn std::error::Error>> {
816 let primary = LazyBoundMetric {
817 metric: self.primary.metric.clone(),
818 attributes: attributes.clone(),
819 bound_cache: OnceLock::new(),
820 };
821
822 Ok(HistogramDuration {
823 primary,
824 in_memory: self.in_memory.clone(),
825 })
826 }
827}
828
/// Backend operation of a `u64` gauge that is already bound to its attributes.
pub trait GaugeBase: Send + Sync {
    /// Records `value` as the gauge reading.
    fn records(&self, value: u64);
}

/// Lazily bound gauge: an attributable gauge source plus the cached bound gauge.
pub type GaugeImpl = LazyBoundMetric<
    Arc<dyn MetricAttributable<Box<dyn GaugeBase>> + Send + Sync>,
    Arc<dyn GaugeBase>,
>;

/// A gauge of `u64` values, optionally mirrored into in-memory value(s).
#[derive(Clone)]
pub struct Gauge {
    // The backend gauge.
    primary: GaugeImpl,
    // Optional in-memory mirror that receives the same readings.
    in_memory: Option<HeartbeatMetricType>,
}
847impl Gauge {
848 pub fn new(inner: Arc<dyn MetricAttributable<Box<dyn GaugeBase>> + Send + Sync>) -> Self {
850 Self {
851 primary: LazyBoundMetric {
852 metric: inner,
853 attributes: MetricAttributes::Empty,
854 bound_cache: OnceLock::new(),
855 },
856 in_memory: None,
857 }
858 }
859
860 pub fn new_with_in_memory(
862 primary: Arc<dyn MetricAttributable<Box<dyn GaugeBase>> + Send + Sync>,
863 in_memory: HeartbeatMetricType,
864 ) -> Self {
865 Self {
866 primary: LazyBoundMetric {
867 metric: primary,
868 attributes: MetricAttributes::Empty,
869 bound_cache: OnceLock::new(),
870 },
871 in_memory: Some(in_memory),
872 }
873 }
874
875 pub fn record(&self, value: u64, attributes: &MetricAttributes) {
877 match self.primary.metric.with_attributes(attributes) {
878 Ok(base) => base.records(value),
879 Err(e) => {
880 dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}");
881 }
882 }
883
884 if let Some(ref in_mem) = self.in_memory {
885 in_mem.record_gauge(value, attributes);
886 }
887 }
888
889 pub fn update_attributes(&mut self, new_attributes: MetricAttributes) {
891 self.primary.update_attributes(new_attributes.clone());
892 }
893}
894impl GaugeBase for Gauge {
895 fn records(&self, value: u64) {
896 let bound = self.primary.bound_cache.get_or_init(|| {
897 self.primary
898 .metric
899 .with_attributes(&self.primary.attributes)
900 .map(Into::into)
901 .unwrap_or_else(|e| {
902 dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}");
903 Arc::new(NoOpInstrument) as Arc<dyn GaugeBase>
904 })
905 });
906 bound.records(value);
907
908 if let Some(ref in_mem) = self.in_memory {
909 in_mem.record_gauge(value, &self.primary.attributes);
910 }
911 }
912}
913impl MetricAttributable<Gauge> for Gauge {
914 fn with_attributes(
915 &self,
916 attributes: &MetricAttributes,
917 ) -> Result<Gauge, Box<dyn std::error::Error>> {
918 let primary = LazyBoundMetric {
919 metric: self.primary.metric.clone(),
920 attributes: attributes.clone(),
921 bound_cache: OnceLock::new(),
922 };
923
924 Ok(Gauge {
925 primary,
926 in_memory: self.in_memory.clone(),
927 })
928 }
929}
930
/// Backend operation of an `f64` gauge that is already bound to its attributes.
pub trait GaugeF64Base: Send + Sync {
    /// Records `value` as the gauge reading.
    fn records(&self, value: f64);
}
/// A gauge of `f64` values, lazily bound to its attributes.
pub type GaugeF64 = LazyBoundMetric<
    Arc<dyn MetricAttributable<Box<dyn GaugeF64Base>> + Send + Sync>,
    Arc<dyn GaugeF64Base>,
>;
941impl GaugeF64 {
942 pub fn new(inner: Arc<dyn MetricAttributable<Box<dyn GaugeF64Base>> + Send + Sync>) -> Self {
944 Self {
945 metric: inner,
946 attributes: MetricAttributes::Empty,
947 bound_cache: OnceLock::new(),
948 }
949 }
950 pub fn record(&self, value: f64, attributes: &MetricAttributes) {
952 match self.metric.with_attributes(attributes) {
953 Ok(base) => {
954 base.records(value);
955 }
956 Err(e) => {
957 dbg_panic!("Failed to initialize metric, will drop values: {e:?}",);
958 }
959 }
960 }
961}
962impl GaugeF64Base for GaugeF64 {
963 fn records(&self, value: f64) {
964 let bound = self.bound_cache.get_or_init(|| {
965 self.metric
966 .with_attributes(&self.attributes)
967 .map(Into::into)
968 .unwrap_or_else(|e| {
969 dbg_panic!("Failed to initialize metric, will drop values: {e:?}");
970 Arc::new(NoOpInstrument) as Arc<dyn GaugeF64Base>
971 })
972 });
973 bound.records(value);
974 }
975}
976impl MetricAttributable<GaugeF64> for GaugeF64 {
977 fn with_attributes(
978 &self,
979 attributes: &MetricAttributes,
980 ) -> Result<GaugeF64, Box<dyn std::error::Error>> {
981 Ok(Self {
982 metric: self.metric.clone(),
983 attributes: attributes.clone(),
984 bound_cache: OnceLock::new(),
985 })
986 }
987}
988
/// Backend operation of an up/down counter that is already bound to its attributes.
pub trait UpDownCounterBase: Send + Sync {
    /// Adds the signed `value` to the counter.
    fn adds(&self, value: i64);
}

/// Lazily bound up/down counter: an attributable source plus the cached bound counter.
pub type UpDownCounterImpl = LazyBoundMetric<
    Arc<dyn MetricAttributable<Box<dyn UpDownCounterBase>> + Send + Sync>,
    Arc<dyn UpDownCounterBase>,
>;

/// A counter of signed (`i64`) deltas.
#[derive(Clone)]
pub struct UpDownCounter {
    // The backend counter.
    primary: UpDownCounterImpl,
}
1006impl UpDownCounter {
1007 pub fn new(
1009 inner: Arc<dyn MetricAttributable<Box<dyn UpDownCounterBase>> + Send + Sync>,
1010 ) -> Self {
1011 Self {
1012 primary: LazyBoundMetric {
1013 metric: inner,
1014 attributes: MetricAttributes::Empty,
1015 bound_cache: OnceLock::new(),
1016 },
1017 }
1018 }
1019
1020 pub fn add(&self, value: i64, attributes: &MetricAttributes) {
1022 match self.primary.metric.with_attributes(attributes) {
1023 Ok(base) => base.adds(value),
1024 Err(e) => {
1025 dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}");
1026 }
1027 }
1028 }
1029
1030 pub fn update_attributes(&mut self, new_attributes: MetricAttributes) {
1032 self.primary.update_attributes(new_attributes.clone());
1033 }
1034}
1035impl UpDownCounterBase for UpDownCounter {
1036 fn adds(&self, value: i64) {
1037 let bound = self.primary.bound_cache.get_or_init(|| {
1038 self.primary
1039 .metric
1040 .with_attributes(&self.primary.attributes)
1041 .map(Into::into)
1042 .unwrap_or_else(|e| {
1043 dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}");
1044 Arc::new(NoOpInstrument) as Arc<dyn UpDownCounterBase>
1045 })
1046 });
1047 bound.adds(value);
1048 }
1049}
1050impl MetricAttributable<UpDownCounter> for UpDownCounter {
1051 fn with_attributes(
1052 &self,
1053 attributes: &MetricAttributes,
1054 ) -> Result<UpDownCounter, Box<dyn std::error::Error>> {
1055 let primary = LazyBoundMetric {
1056 metric: self.primary.metric.clone(),
1057 attributes: attributes.clone(),
1058 bound_cache: OnceLock::new(),
1059 };
1060 Ok(UpDownCounter { primary })
1061 }
1062}
1063
/// A [`CoreMeter`] whose instruments discard everything recorded to them. Attributes are kept as
/// a plain string map.
#[derive(Debug)]
pub struct NoOpCoreMeter;
1067impl CoreMeter for NoOpCoreMeter {
1068 fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes {
1069 MetricAttributes::NoOp(Arc::new(attribs.into()))
1070 }
1071
1072 fn extend_attributes(
1073 &self,
1074 existing: MetricAttributes,
1075 attribs: NewAttributes,
1076 ) -> MetricAttributes {
1077 if let MetricAttributes::NoOp(labels) = existing {
1078 let mut labels = (*labels).clone();
1079 labels.extend::<HashMap<String, String>>(attribs.into());
1080 MetricAttributes::NoOp(Arc::new(labels))
1081 } else {
1082 dbg_panic!("Must use NoOp attributes with a NoOp metric implementation");
1083 existing
1084 }
1085 }
1086
1087 fn counter(&self, _: MetricParameters) -> Counter {
1088 Counter::new(Arc::new(NoOpInstrument))
1089 }
1090
1091 fn histogram(&self, _: MetricParameters) -> Histogram {
1092 Histogram::new(Arc::new(NoOpInstrument))
1093 }
1094
1095 fn histogram_f64(&self, _: MetricParameters) -> HistogramF64 {
1096 HistogramF64::new(Arc::new(NoOpInstrument))
1097 }
1098
1099 fn histogram_duration(&self, _: MetricParameters) -> HistogramDuration {
1100 HistogramDuration::new(Arc::new(NoOpInstrument))
1101 }
1102
1103 fn gauge(&self, _: MetricParameters) -> Gauge {
1104 Gauge::new(Arc::new(NoOpInstrument))
1105 }
1106
1107 fn gauge_f64(&self, _: MetricParameters) -> GaugeF64 {
1108 GaugeF64::new(Arc::new(NoOpInstrument))
1109 }
1110
1111 fn up_down_counter(&self, _: MetricParameters) -> UpDownCounter {
1112 UpDownCounter::new(Arc::new(NoOpInstrument))
1113 }
1114}
1115
/// Implements `MetricAttributable<Box<dyn $instrument_trait>>` for `$target`. The generated
/// `with_attributes` ignores the attributes and always succeeds, returning a boxed `$make`.
macro_rules! impl_metric_attributable {
    ($instrument_trait:ident, $target:ty, $make:expr) => {
        impl MetricAttributable<Box<dyn $instrument_trait>> for $target {
            fn with_attributes(
                &self,
                _: &MetricAttributes,
            ) -> Result<Box<dyn $instrument_trait>, Box<dyn std::error::Error>> {
                let bound: Box<dyn $instrument_trait> = Box::new($make);
                Ok(bound)
            }
        }
    };
}
1128
/// Instrument that ignores every value; implements all of the `*Base` instrument traits below.
pub struct NoOpInstrument;
/// Implements an instrument trait (and the matching `MetricAttributable`) for `NoOpInstrument`
/// with an empty method body. Three forms:
/// * `(Trait, signed)` generates `adds(&self, i64)`
/// * `(Trait, Type)` generates `records(&self, Type)`
/// * `(Trait)` generates `adds(&self, u64)`
///
/// The `signed` arm must stay first: a later `ty` fragment would also accept the bare word.
macro_rules! impl_no_op {
    ($instrument_trait:ident, signed) => {
        impl_metric_attributable!($instrument_trait, NoOpInstrument, NoOpInstrument);
        impl $instrument_trait for NoOpInstrument {
            fn adds(&self, _: i64) {}
        }
    };
    ($instrument_trait:ident, $recorded:ty) => {
        impl_metric_attributable!($instrument_trait, NoOpInstrument, NoOpInstrument);
        impl $instrument_trait for NoOpInstrument {
            fn records(&self, _: $recorded) {}
        }
    };
    ($instrument_trait:ident) => {
        impl_metric_attributable!($instrument_trait, NoOpInstrument, NoOpInstrument);
        impl $instrument_trait for NoOpInstrument {
            fn adds(&self, _: u64) {}
        }
    };
}
// No-op implementations of every instrument trait for `NoOpInstrument`.
impl_no_op!(CounterBase);
impl_no_op!(HistogramBase, u64);
impl_no_op!(HistogramF64Base, f64);
impl_no_op!(HistogramDurationBase, Duration);
impl_no_op!(GaugeBase, u64);
impl_no_op!(GaugeF64Base, f64);
impl_no_op!(UpDownCounterBase, signed);
1158
#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        collections::HashMap,
        sync::{
            Arc,
            atomic::{AtomicU64, Ordering},
        },
    };

    /// A labelled in-memory gauge stores into the slot selected by the attribute value, and the
    /// label lookup on NoOp attributes returns that value.
    #[test]
    fn in_memory_attributes_provide_label_values() {
        let meter = NoOpCoreMeter;
        let attrs = meter.extend_attributes(
            meter.new_attributes(NewAttributes::default()),
            NewAttributes::from(vec![MetricKeyValue::new("poller_type", "workflow_task")]),
        );

        let value = Arc::new(AtomicU64::new(0));
        let heartbeat_metric = HeartbeatMetricType::WithLabel {
            label_key: "poller_type".to_string(),
            metrics: HashMap::from([("workflow_task".to_string(), value.clone())]),
        };

        heartbeat_metric.record_gauge(3, &attrs);

        assert_eq!(value.load(Ordering::Relaxed), 3);
        let looked_up = label_value_from_attributes(&attrs, "poller_type");
        assert_eq!(looked_up.as_deref(), Some("workflow_task"));
    }
}
1196
1197#[cfg(feature = "otel")]
1198mod otel {
1199 use super::*;
1200 use opentelemetry::{KeyValue, metrics};
1201
    /// An OpenTelemetry instrument paired with the attributes to record with.
    #[derive(Clone)]
    struct InstrumentWithAttributes<I> {
        // The OpenTelemetry instrument.
        inner: I,
        // Attributes passed along on every recording; the impls below expect the OTel variant.
        attributes: MetricAttributes,
    }
1207
1208 impl From<MetricKeyValue> for KeyValue {
1209 fn from(kv: MetricKeyValue) -> Self {
1210 KeyValue::new(kv.key, kv.value)
1211 }
1212 }
1213
1214 impl From<MetricValue> for opentelemetry::Value {
1215 fn from(mv: MetricValue) -> Self {
1216 match mv {
1217 MetricValue::String(s) => opentelemetry::Value::String(s.into()),
1218 MetricValue::Int(i) => opentelemetry::Value::I64(i),
1219 MetricValue::Float(f) => opentelemetry::Value::F64(f),
1220 MetricValue::Bool(b) => opentelemetry::Value::Bool(b),
1221 }
1222 }
1223 }
1224
1225 impl MetricAttributable<Box<dyn CounterBase>> for metrics::Counter<u64> {
1226 fn with_attributes(
1227 &self,
1228 attributes: &MetricAttributes,
1229 ) -> Result<Box<dyn CounterBase>, Box<dyn std::error::Error>> {
1230 Ok(Box::new(InstrumentWithAttributes {
1231 inner: self.clone(),
1232 attributes: attributes.clone(),
1233 }))
1234 }
1235 }
1236
1237 impl CounterBase for InstrumentWithAttributes<metrics::Counter<u64>> {
1238 fn adds(&self, value: u64) {
1239 if let MetricAttributes::OTel { kvs } = &self.attributes {
1240 self.inner.add(value, kvs);
1241 } else {
1242 dbg_panic!("Must use OTel attributes with an OTel metric implementation");
1243 }
1244 }
1245 }
1246
1247 impl MetricAttributable<Box<dyn GaugeBase>> for metrics::Gauge<u64> {
1248 fn with_attributes(
1249 &self,
1250 attributes: &MetricAttributes,
1251 ) -> Result<Box<dyn GaugeBase>, Box<dyn std::error::Error>> {
1252 Ok(Box::new(InstrumentWithAttributes {
1253 inner: self.clone(),
1254 attributes: attributes.clone(),
1255 }))
1256 }
1257 }
1258
1259 impl GaugeBase for InstrumentWithAttributes<metrics::Gauge<u64>> {
1260 fn records(&self, value: u64) {
1261 if let MetricAttributes::OTel { kvs } = &self.attributes {
1262 self.inner.record(value, kvs);
1263 } else {
1264 dbg_panic!("Must use OTel attributes with an OTel metric implementation");
1265 }
1266 }
1267 }
1268
1269 impl MetricAttributable<Box<dyn GaugeF64Base>> for metrics::Gauge<f64> {
1270 fn with_attributes(
1271 &self,
1272 attributes: &MetricAttributes,
1273 ) -> Result<Box<dyn GaugeF64Base>, Box<dyn std::error::Error>> {
1274 Ok(Box::new(InstrumentWithAttributes {
1275 inner: self.clone(),
1276 attributes: attributes.clone(),
1277 }))
1278 }
1279 }
1280
1281 impl GaugeF64Base for InstrumentWithAttributes<metrics::Gauge<f64>> {
1282 fn records(&self, value: f64) {
1283 if let MetricAttributes::OTel { kvs } = &self.attributes {
1284 self.inner.record(value, kvs);
1285 } else {
1286 dbg_panic!("Must use OTel attributes with an OTel metric implementation");
1287 }
1288 }
1289 }
1290
1291 impl MetricAttributable<Box<dyn HistogramBase>> for metrics::Histogram<u64> {
1292 fn with_attributes(
1293 &self,
1294 attributes: &MetricAttributes,
1295 ) -> Result<Box<dyn HistogramBase>, Box<dyn std::error::Error>> {
1296 Ok(Box::new(InstrumentWithAttributes {
1297 inner: self.clone(),
1298 attributes: attributes.clone(),
1299 }))
1300 }
1301 }
1302
1303 impl HistogramBase for InstrumentWithAttributes<metrics::Histogram<u64>> {
1304 fn records(&self, value: u64) {
1305 if let MetricAttributes::OTel { kvs } = &self.attributes {
1306 self.inner.record(value, kvs);
1307 } else {
1308 dbg_panic!("Must use OTel attributes with an OTel metric implementation");
1309 }
1310 }
1311 }
1312
1313 impl MetricAttributable<Box<dyn HistogramF64Base>> for metrics::Histogram<f64> {
1314 fn with_attributes(
1315 &self,
1316 attributes: &MetricAttributes,
1317 ) -> Result<Box<dyn HistogramF64Base>, Box<dyn std::error::Error>> {
1318 Ok(Box::new(InstrumentWithAttributes {
1319 inner: self.clone(),
1320 attributes: attributes.clone(),
1321 }))
1322 }
1323 }
1324
1325 impl HistogramF64Base for InstrumentWithAttributes<metrics::Histogram<f64>> {
1326 fn records(&self, value: f64) {
1327 if let MetricAttributes::OTel { kvs } = &self.attributes {
1328 self.inner.record(value, kvs);
1329 } else {
1330 dbg_panic!("Must use OTel attributes with an OTel metric implementation");
1331 }
1332 }
1333 }
1334
1335 impl MetricAttributable<Box<dyn UpDownCounterBase>> for opentelemetry::metrics::UpDownCounter<i64> {
1336 fn with_attributes(
1337 &self,
1338 attributes: &MetricAttributes,
1339 ) -> Result<Box<dyn UpDownCounterBase>, Box<dyn std::error::Error>> {
1340 Ok(Box::new(InstrumentWithAttributes {
1341 inner: self.clone(),
1342 attributes: attributes.clone(),
1343 }))
1344 }
1345 }
1346
1347 impl UpDownCounterBase for InstrumentWithAttributes<opentelemetry::metrics::UpDownCounter<i64>> {
1348 fn adds(&self, value: i64) {
1349 if let MetricAttributes::OTel { kvs } = &self.attributes {
1350 self.inner.add(value, kvs);
1351 } else {
1352 dbg_panic!("Must use OTel attributes with an OTel metric implementation");
1353 }
1354 }
1355 }
1356}
1357
1358#[derive(Debug, Clone, PartialEq, Default)]
1360pub struct OrderedPromLabelSet {
1361 attributes: BTreeMap<String, MetricValue>,
1362}
1363
1364impl OrderedPromLabelSet {
1365 pub const fn new() -> Self {
1367 Self {
1368 attributes: BTreeMap::new(),
1369 }
1370 }
1371 pub fn keys_ordered(&self) -> impl Iterator<Item = &str> {
1373 self.attributes.keys().map(|s| s.as_str())
1374 }
1375 pub fn as_prom_labels(&self) -> HashMap<&str, String> {
1377 let mut labels = HashMap::new();
1378 for (k, v) in self.attributes.iter() {
1379 labels.insert(k.as_str(), v.to_string());
1380 }
1381 labels
1382 }
1383 pub fn add_kv(&mut self, kv: MetricKeyValue) {
1385 self.attributes.insert(kv.key.replace('-', "_"), kv.value);
1387 }
1388}
1389
1390impl From<NewAttributes> for OrderedPromLabelSet {
1391 fn from(n: NewAttributes) -> Self {
1392 let mut me = Self::default();
1393 for kv in n.attributes {
1394 me.add_kv(kv);
1395 }
1396 me
1397 }
1398}
1399
1400#[derive(Debug, derive_more::Constructor)]
1401pub(crate) struct PrefixedMetricsMeter<CM> {
1402 prefix: String,
1403 meter: CM,
1404}
1405impl<CM: CoreMeter> CoreMeter for PrefixedMetricsMeter<CM> {
1406 fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes {
1407 self.meter.new_attributes(attribs)
1408 }
1409
1410 fn extend_attributes(
1411 &self,
1412 existing: MetricAttributes,
1413 attribs: NewAttributes,
1414 ) -> MetricAttributes {
1415 self.meter.extend_attributes(existing, attribs)
1416 }
1417
1418 fn counter(&self, mut params: MetricParameters) -> Counter {
1419 params.name = (self.prefix.clone() + &*params.name).into();
1420 self.meter.counter(params)
1421 }
1422
1423 fn histogram(&self, mut params: MetricParameters) -> Histogram {
1424 params.name = (self.prefix.clone() + &*params.name).into();
1425 self.meter.histogram(params)
1426 }
1427
1428 fn histogram_f64(&self, mut params: MetricParameters) -> HistogramF64 {
1429 params.name = (self.prefix.clone() + &*params.name).into();
1430 self.meter.histogram_f64(params)
1431 }
1432
1433 fn histogram_duration(&self, mut params: MetricParameters) -> HistogramDuration {
1434 params.name = (self.prefix.clone() + &*params.name).into();
1435 self.meter.histogram_duration(params)
1436 }
1437
1438 fn gauge(&self, mut params: MetricParameters) -> Gauge {
1439 params.name = (self.prefix.clone() + &*params.name).into();
1440 self.meter.gauge(params)
1441 }
1442
1443 fn gauge_f64(&self, mut params: MetricParameters) -> GaugeF64 {
1444 params.name = (self.prefix.clone() + &*params.name).into();
1445 self.meter.gauge_f64(params)
1446 }
1447
1448 fn up_down_counter(&self, mut params: MetricParameters) -> UpDownCounter {
1449 params.name = (self.prefix.clone() + &*params.name).into();
1450 self.meter.up_down_counter(params)
1451 }
1452}