1use crate::{dbg_panic, telemetry::TaskQueueLabelStrategy};
2use std::{
3 borrow::Cow,
4 collections::{BTreeMap, HashMap},
5 fmt::{Debug, Display},
6 ops::Deref,
7 sync::{
8 Arc, OnceLock,
9 atomic::{AtomicU64, Ordering},
10 },
11 time::Duration,
12};
13
14#[cfg(feature = "core-based-sdk")]
15pub mod core;
16
/// Metric name of the workflow end-to-end latency histogram.
pub const WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME: &str = "workflow_endtoend_latency";
/// Metric name of the workflow task schedule-to-start latency histogram.
pub const WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME: &str =
    "workflow_task_schedule_to_start_latency";
/// Metric name of the workflow task replay latency histogram.
pub const WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME: &str = "workflow_task_replay_latency";
/// Metric name of the workflow task execution latency histogram.
pub const WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME: &str = "workflow_task_execution_latency";
/// Metric name of the activity schedule-to-start latency histogram.
pub const ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME: &str =
    "activity_schedule_to_start_latency";
/// Metric name of the activity execution latency histogram.
pub const ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME: &str = "activity_execution_latency";
31
/// Declares bucket tables and generates `default_buckets_for`.
///
/// Every entry has the shape `(name pattern, TABLE, SCALED_TABLE, [bucket, ...])`.
/// Two statics are emitted per entry: `TABLE` holds the buckets as written and
/// `SCALED_TABLE` holds each bucket divided by `1000.0`. The generated function
/// matches a histogram name against the patterns in declaration order, so the
/// entry list must finish with a catch-all pattern for the `match` to be
/// exhaustive.
macro_rules! define_latency_buckets {
    ($(($pattern:pat, $table:ident, $scaled_table:ident, [$($boundary:expr),*])),*) => {
        $(
            pub(super) static $table: &[f64] = &[$($boundary),*];
            pub(super) static $scaled_table: &[f64] = &[$($boundary / 1000.0),*];
        )*

        /// Returns the default buckets for the histogram called `histo_name`.
        ///
        /// When `use_seconds` is true the table whose values were divided by
        /// `1000.0` is returned, otherwise the table as declared.
        pub fn default_buckets_for(histo_name: &str, use_seconds: bool) -> &'static [f64] {
            match histo_name {
                $(
                    $pattern => match use_seconds {
                        true => $scaled_table,
                        false => $table,
                    },
                )*
            }
        }
    };
}
54
// Default bucket tables for the known latency histograms. Each entry lists the
// buckets once; the macro derives a second table with every value divided by
// 1000 (the `*_S_BUCKETS` statics), so the values written here are the
// millisecond form.
define_latency_buckets!(
    (
        WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME,
        WF_LATENCY_MS_BUCKETS,
        WF_LATENCY_S_BUCKETS,
        [
            100.,
            500.,
            1000.,
            1500.,
            2000.,
            5000.,
            10_000.,
            30_000.,
            60_000.,    // 1 minute
            120_000.,   // 2 minutes
            300_000.,   // 5 minutes
            600_000.,   // 10 minutes
            1_800_000., // 30 minutes
            3_600_000., // 1 hour
            // 8.5 hours. NOTE(review): not a round value; confirm whether
            // 36_000_000 (10 hours) was intended.
            30_600_000.,
            8.64e7 // 24 hours
        ]
    ),
    (
        WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME
            | WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME,
        WF_TASK_MS_BUCKETS,
        WF_TASK_S_BUCKETS,
        [1., 10., 20., 50., 100., 200., 500., 1000.]
    ),
    (
        ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME,
        ACT_EXE_MS_BUCKETS,
        ACT_EXE_S_BUCKETS,
        [50., 100., 500., 1000., 5000., 10_000., 60_000.]
    ),
    (
        WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME
            | ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME,
        TASK_SCHED_TO_START_MS_BUCKETS,
        TASK_SCHED_TO_START_S_BUCKETS,
        [100., 500., 1000., 5000., 10_000., 100_000., 1_000_000.]
    ),
    (
        // Fallback for any histogram name not listed above.
        _,
        DEFAULT_MS_BUCKETS,
        DEFAULT_S_BUCKETS,
        [50., 100., 500., 1000., 2500., 10_000.]
    )
);
106
107pub trait CoreMeter: Send + Sync + Debug {
111 fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes;
115 fn extend_attributes(
119 &self,
120 existing: MetricAttributes,
121 attribs: NewAttributes,
122 ) -> MetricAttributes;
123 fn counter(&self, params: MetricParameters) -> Counter;
125
126 fn counter_with_in_memory(
128 &self,
129 params: MetricParameters,
130 in_memory_counter: HeartbeatMetricType,
131 ) -> Counter {
132 let primary_counter = self.counter(params);
133
134 Counter::new_with_in_memory(primary_counter.primary.metric.clone(), in_memory_counter)
135 }
136
137 fn histogram(&self, params: MetricParameters) -> Histogram;
139 fn histogram_f64(&self, params: MetricParameters) -> HistogramF64;
141 fn histogram_duration(&self, params: MetricParameters) -> HistogramDuration;
146
147 fn histogram_duration_with_in_memory(
149 &self,
150 params: MetricParameters,
151 in_memory_hist: HeartbeatMetricType,
152 ) -> HistogramDuration {
153 let primary_hist = self.histogram_duration(params);
154
155 HistogramDuration::new_with_in_memory(primary_hist.primary.metric.clone(), in_memory_hist)
156 }
157 fn gauge(&self, params: MetricParameters) -> Gauge;
159
160 fn gauge_with_in_memory(
162 &self,
163 params: MetricParameters,
164 in_memory_metrics: HeartbeatMetricType,
165 ) -> Gauge {
166 let primary_gauge = self.gauge(params.clone());
167 Gauge::new_with_in_memory(primary_gauge.primary.metric.clone(), in_memory_metrics)
168 }
169
170 fn gauge_f64(&self, params: MetricParameters) -> GaugeF64;
172}
173
174#[derive(Clone, Debug)]
178pub enum HeartbeatMetricType {
179 Individual(Arc<AtomicU64>),
181 WithLabel {
183 label_key: String,
185 metrics: HashMap<String, Arc<AtomicU64>>,
187 },
188}
189
190impl HeartbeatMetricType {
191 fn record_counter(&self, delta: u64) {
192 match self {
193 HeartbeatMetricType::Individual(metric) => {
194 metric.fetch_add(delta, Ordering::Relaxed);
195 }
196 HeartbeatMetricType::WithLabel { .. } => {
197 dbg_panic!("Counter does not support in-memory metric with labels");
198 }
199 }
200 }
201
202 fn record_histogram_observation(&self) {
203 match self {
204 HeartbeatMetricType::Individual(metric) => {
205 metric.fetch_add(1, Ordering::Relaxed);
206 }
207 HeartbeatMetricType::WithLabel { .. } => {
208 dbg_panic!("Histogram does not support in-memory metric with labels");
209 }
210 }
211 }
212
213 fn record_gauge(&self, value: u64, attributes: &MetricAttributes) {
214 match self {
215 HeartbeatMetricType::Individual(metric) => {
216 metric.store(value, Ordering::Relaxed);
217 }
218 HeartbeatMetricType::WithLabel { label_key, metrics } => {
219 if let Some(metric) = label_value_from_attributes(attributes, label_key.as_str())
220 .and_then(|label_value| metrics.get(label_value.as_str()))
221 {
222 metric.store(value, Ordering::Relaxed)
223 }
224 }
225 }
226 }
227}
228
229fn label_value_from_attributes(attributes: &MetricAttributes, key: &str) -> Option<String> {
230 match attributes {
231 MetricAttributes::Prometheus { labels } => labels.as_prom_labels().get(key).cloned(),
232 #[cfg(feature = "otel")]
233 MetricAttributes::OTel { kvs } => kvs
234 .iter()
235 .find(|kv| kv.key.as_str() == key)
236 .map(|kv| kv.value.to_string()),
237 MetricAttributes::NoOp(labels) => labels.get(key).cloned(),
238 _ => None,
239 }
240}
241
242#[derive(Debug, Clone, bon::Builder)]
244pub struct MetricParameters {
245 #[builder(into)]
247 pub name: Cow<'static, str>,
248 #[builder(into, default = Cow::Borrowed(""))]
250 pub description: Cow<'static, str>,
251 #[builder(into, default = Cow::Borrowed(""))]
253 pub unit: Cow<'static, str>,
254}
255impl From<&'static str> for MetricParameters {
256 fn from(value: &'static str) -> Self {
257 Self {
258 name: value.into(),
259 description: Default::default(),
260 unit: Default::default(),
261 }
262 }
263}
264
265#[derive(Clone, Debug)]
267pub struct TemporalMeter {
268 inner: Arc<dyn CoreMeter>,
269 default_attribs: MetricAttributes,
270 task_queue_label_strategy: TaskQueueLabelStrategy,
271}
272
273impl TemporalMeter {
274 pub fn new(
276 meter: Arc<dyn CoreMeter>,
277 default_attribs: NewAttributes,
278 task_queue_label_strategy: TaskQueueLabelStrategy,
279 ) -> Self {
280 Self {
281 default_attribs: meter.new_attributes(default_attribs),
282 inner: meter,
283 task_queue_label_strategy,
284 }
285 }
286
287 pub fn no_op() -> Self {
289 Self {
290 inner: Arc::new(NoOpCoreMeter),
291 default_attribs: MetricAttributes::NoOp(Arc::new(Default::default())),
292 task_queue_label_strategy: TaskQueueLabelStrategy::UseNormal,
293 }
294 }
295
296 pub fn get_default_attributes(&self) -> &MetricAttributes {
298 &self.default_attribs
299 }
300
301 pub fn get_task_queue_label_strategy(&self) -> &TaskQueueLabelStrategy {
303 &self.task_queue_label_strategy
304 }
305
306 pub fn merge_attributes(&mut self, new_attribs: NewAttributes) {
308 self.default_attribs = self.extend_attributes(self.default_attribs.clone(), new_attribs);
309 }
310}
311
312impl Deref for TemporalMeter {
313 type Target = dyn CoreMeter;
314 fn deref(&self) -> &Self::Target {
315 self.inner.as_ref()
316 }
317}
318
319impl CoreMeter for Arc<dyn CoreMeter> {
320 fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes {
321 self.as_ref().new_attributes(attribs)
322 }
323
324 fn extend_attributes(
325 &self,
326 existing: MetricAttributes,
327 attribs: NewAttributes,
328 ) -> MetricAttributes {
329 self.as_ref().extend_attributes(existing, attribs)
330 }
331
332 fn counter(&self, params: MetricParameters) -> Counter {
333 self.as_ref().counter(params)
334 }
335 fn histogram(&self, params: MetricParameters) -> Histogram {
336 self.as_ref().histogram(params)
337 }
338
339 fn histogram_f64(&self, params: MetricParameters) -> HistogramF64 {
340 self.as_ref().histogram_f64(params)
341 }
342
343 fn histogram_duration(&self, params: MetricParameters) -> HistogramDuration {
344 self.as_ref().histogram_duration(params)
345 }
346
347 fn gauge(&self, params: MetricParameters) -> Gauge {
348 self.as_ref().gauge(params)
349 }
350
351 fn gauge_f64(&self, params: MetricParameters) -> GaugeF64 {
352 self.as_ref().gauge_f64(params)
353 }
354}
355
/// A set of metric attributes (labels) in the representation used by a
/// particular meter implementation.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum MetricAttributes {
    /// Attributes for OpenTelemetry instruments.
    #[cfg(feature = "otel")]
    OTel {
        /// The attribute key/value pairs.
        kvs: Arc<Vec<opentelemetry::KeyValue>>,
    },
    /// Attributes held as an ordered Prometheus label set.
    Prometheus {
        /// The label set.
        labels: Arc<OrderedPromLabelSet>,
    },
    /// Attributes in the `core` module's buffer representation.
    #[cfg(feature = "core-based-sdk")]
    Buffer(core::BufferAttributes),
    /// Attributes supplied through a custom implementation from the `core`
    /// module.
    #[cfg(feature = "core-based-sdk")]
    Dynamic(Arc<dyn core::CustomMetricAttributes>),
    /// String key/value attributes used by the no-op meter.
    NoOp(Arc<HashMap<String, String>>),
    /// No attributes; metric handles in this module start out with this
    /// variant.
    Empty,
}
383
384#[derive(Clone, Debug, Default, derive_more::Constructor)]
386pub struct NewAttributes {
387 pub attributes: Vec<MetricKeyValue>,
389}
390impl NewAttributes {
391 pub fn extend(&mut self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) {
393 self.attributes.extend(new_kvs)
394 }
395}
396impl<I> From<I> for NewAttributes
397where
398 I: IntoIterator<Item = MetricKeyValue>,
399{
400 fn from(value: I) -> Self {
401 Self {
402 attributes: value.into_iter().collect(),
403 }
404 }
405}
406
407impl From<NewAttributes> for HashMap<String, String> {
408 fn from(value: NewAttributes) -> Self {
409 value
410 .attributes
411 .into_iter()
412 .map(|kv| (kv.key, kv.value.to_string()))
413 .collect()
414 }
415}
416
417#[derive(Clone, Debug, PartialEq)]
419pub struct MetricKeyValue {
420 pub key: String,
422 pub value: MetricValue,
424}
425impl MetricKeyValue {
426 pub fn new(key: impl Into<String>, value: impl Into<MetricValue>) -> Self {
428 Self {
429 key: key.into(),
430 value: value.into(),
431 }
432 }
433}
434
435#[derive(Clone, Debug, PartialEq, derive_more::From)]
437pub enum MetricValue {
438 String(String),
440 Int(i64),
442 Float(f64),
444 Bool(bool),
446 }
448impl From<&'static str> for MetricValue {
449 fn from(value: &'static str) -> Self {
450 MetricValue::String(value.to_string())
451 }
452}
453impl Display for MetricValue {
454 fn fmt(&self, f1: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
455 match self {
456 MetricValue::String(s) => write!(f1, "{s}"),
457 MetricValue::Int(i) => write!(f1, "{i}"),
458 MetricValue::Float(f) => write!(f1, "{f}"),
459 MetricValue::Bool(b) => write!(f1, "{b}"),
460 }
461 }
462}
463
/// Implemented by instruments that can produce a `Base` handle bound to a
/// given attribute set.
pub trait MetricAttributable<Base> {
    /// Returns a `Base` bound to `attributes`, or an error if the bound
    /// instrument could not be created.
    fn with_attributes(
        &self,
        attributes: &MetricAttributes,
    ) -> Result<Base, Box<dyn std::error::Error>>;
}
480
481#[derive(Clone)]
483pub struct LazyBoundMetric<T, B> {
484 metric: T,
485 attributes: MetricAttributes,
486 bound_cache: OnceLock<B>,
487}
488impl<T, B> LazyBoundMetric<T, B> {
489 pub fn update_attributes(&mut self, new_attributes: MetricAttributes) {
491 self.attributes = new_attributes;
492 self.bound_cache = OnceLock::new();
493 }
494}
495
496pub trait CounterBase: Send + Sync {
498 fn adds(&self, value: u64);
500}
501
502pub type CounterImpl = LazyBoundMetric<
504 Arc<dyn MetricAttributable<Box<dyn CounterBase>> + Send + Sync>,
505 Arc<dyn CounterBase>,
506>;
507
508#[derive(Clone)]
510pub struct Counter {
511 primary: CounterImpl,
512 in_memory: Option<HeartbeatMetricType>,
513}
514impl Counter {
515 pub fn new(inner: Arc<dyn MetricAttributable<Box<dyn CounterBase>> + Send + Sync>) -> Self {
517 Self {
518 primary: LazyBoundMetric {
519 metric: inner,
520 attributes: MetricAttributes::Empty,
521 bound_cache: OnceLock::new(),
522 },
523 in_memory: None,
524 }
525 }
526
527 pub fn new_with_in_memory(
529 primary: Arc<dyn MetricAttributable<Box<dyn CounterBase>> + Send + Sync>,
530 in_memory: HeartbeatMetricType,
531 ) -> Self {
532 Self {
533 primary: LazyBoundMetric {
534 metric: primary,
535 attributes: MetricAttributes::Empty,
536 bound_cache: OnceLock::new(),
537 },
538 in_memory: Some(in_memory),
539 }
540 }
541
542 pub fn add(&self, value: u64, attributes: &MetricAttributes) {
544 match self.primary.metric.with_attributes(attributes) {
545 Ok(base) => base.adds(value),
546 Err(e) => {
547 dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}");
548 }
549 }
550
551 if let Some(ref in_mem) = self.in_memory {
552 in_mem.record_counter(value);
553 }
554 }
555
556 pub fn update_attributes(&mut self, new_attributes: MetricAttributes) {
558 self.primary.update_attributes(new_attributes.clone());
559 }
560}
561impl CounterBase for Counter {
562 fn adds(&self, value: u64) {
563 let bound = self.primary.bound_cache.get_or_init(|| {
566 self.primary
567 .metric
568 .with_attributes(&self.primary.attributes)
569 .map(Into::into)
570 .unwrap_or_else(|e| {
571 dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}");
572 Arc::new(NoOpInstrument) as Arc<dyn CounterBase>
573 })
574 });
575 bound.adds(value);
576
577 if let Some(ref in_mem) = self.in_memory {
578 in_mem.record_counter(value);
579 }
580 }
581}
582impl MetricAttributable<Counter> for Counter {
583 fn with_attributes(
584 &self,
585 attributes: &MetricAttributes,
586 ) -> Result<Counter, Box<dyn std::error::Error>> {
587 let primary = LazyBoundMetric {
588 metric: self.primary.metric.clone(),
589 attributes: attributes.clone(),
590 bound_cache: OnceLock::new(),
591 };
592
593 Ok(Counter {
594 primary,
595 in_memory: self.in_memory.clone(),
596 })
597 }
598}
599
600pub trait HistogramBase: Send + Sync {
602 fn records(&self, value: u64);
604}
605pub type Histogram = LazyBoundMetric<
607 Arc<dyn MetricAttributable<Box<dyn HistogramBase>> + Send + Sync>,
608 Arc<dyn HistogramBase>,
609>;
610impl Histogram {
611 pub fn new(inner: Arc<dyn MetricAttributable<Box<dyn HistogramBase>> + Send + Sync>) -> Self {
613 Self {
614 metric: inner,
615 attributes: MetricAttributes::Empty,
616 bound_cache: OnceLock::new(),
617 }
618 }
619 pub fn record(&self, value: u64, attributes: &MetricAttributes) {
621 match self.metric.with_attributes(attributes) {
622 Ok(base) => {
623 base.records(value);
624 }
625 Err(e) => {
626 dbg_panic!("Failed to initialize metric, will drop values: {e:?}",);
627 }
628 }
629 }
630}
631impl HistogramBase for Histogram {
632 fn records(&self, value: u64) {
633 let bound = self.bound_cache.get_or_init(|| {
634 self.metric
635 .with_attributes(&self.attributes)
636 .map(Into::into)
637 .unwrap_or_else(|e| {
638 dbg_panic!("Failed to initialize metric, will drop values: {e:?}");
639 Arc::new(NoOpInstrument) as Arc<dyn HistogramBase>
640 })
641 });
642 bound.records(value);
643 }
644}
645impl MetricAttributable<Histogram> for Histogram {
646 fn with_attributes(
647 &self,
648 attributes: &MetricAttributes,
649 ) -> Result<Histogram, Box<dyn std::error::Error>> {
650 Ok(Self {
651 metric: self.metric.clone(),
652 attributes: attributes.clone(),
653 bound_cache: OnceLock::new(),
654 })
655 }
656}
657
658pub trait HistogramF64Base: Send + Sync {
660 fn records(&self, value: f64);
662}
663pub type HistogramF64 = LazyBoundMetric<
665 Arc<dyn MetricAttributable<Box<dyn HistogramF64Base>> + Send + Sync>,
666 Arc<dyn HistogramF64Base>,
667>;
668impl HistogramF64 {
669 pub fn new(
671 inner: Arc<dyn MetricAttributable<Box<dyn HistogramF64Base>> + Send + Sync>,
672 ) -> Self {
673 Self {
674 metric: inner,
675 attributes: MetricAttributes::Empty,
676 bound_cache: OnceLock::new(),
677 }
678 }
679 pub fn record(&self, value: f64, attributes: &MetricAttributes) {
681 match self.metric.with_attributes(attributes) {
682 Ok(base) => {
683 base.records(value);
684 }
685 Err(e) => {
686 dbg_panic!("Failed to initialize metric, will drop values: {e:?}",);
687 }
688 }
689 }
690}
691impl HistogramF64Base for HistogramF64 {
692 fn records(&self, value: f64) {
693 let bound = self.bound_cache.get_or_init(|| {
694 self.metric
695 .with_attributes(&self.attributes)
696 .map(Into::into)
697 .unwrap_or_else(|e| {
698 dbg_panic!("Failed to initialize metric, will drop values: {e:?}");
699 Arc::new(NoOpInstrument) as Arc<dyn HistogramF64Base>
700 })
701 });
702 bound.records(value);
703 }
704}
705impl MetricAttributable<HistogramF64> for HistogramF64 {
706 fn with_attributes(
707 &self,
708 attributes: &MetricAttributes,
709 ) -> Result<HistogramF64, Box<dyn std::error::Error>> {
710 Ok(Self {
711 metric: self.metric.clone(),
712 attributes: attributes.clone(),
713 bound_cache: OnceLock::new(),
714 })
715 }
716}
717
718pub trait HistogramDurationBase: Send + Sync {
720 fn records(&self, value: Duration);
722}
723
724pub type HistogramDurationImpl = LazyBoundMetric<
726 Arc<dyn MetricAttributable<Box<dyn HistogramDurationBase>> + Send + Sync>,
727 Arc<dyn HistogramDurationBase>,
728>;
729
730#[derive(Clone)]
732pub struct HistogramDuration {
733 primary: HistogramDurationImpl,
734 in_memory: Option<HeartbeatMetricType>,
735}
736impl HistogramDuration {
737 pub fn new(
739 inner: Arc<dyn MetricAttributable<Box<dyn HistogramDurationBase>> + Send + Sync>,
740 ) -> Self {
741 Self {
742 primary: LazyBoundMetric {
743 metric: inner,
744 attributes: MetricAttributes::Empty,
745 bound_cache: OnceLock::new(),
746 },
747 in_memory: None,
748 }
749 }
750 pub fn new_with_in_memory(
752 primary: Arc<dyn MetricAttributable<Box<dyn HistogramDurationBase>> + Send + Sync>,
753 in_memory: HeartbeatMetricType,
754 ) -> Self {
755 Self {
756 primary: LazyBoundMetric {
757 metric: primary,
758 attributes: MetricAttributes::Empty,
759 bound_cache: OnceLock::new(),
760 },
761 in_memory: Some(in_memory),
762 }
763 }
764 pub fn record(&self, value: Duration, attributes: &MetricAttributes) {
766 match self.primary.metric.with_attributes(attributes) {
767 Ok(base) => {
768 base.records(value);
769 }
770 Err(e) => {
771 dbg_panic!("Failed to initialize metric, will drop values: {e:?}",);
772 }
773 }
774
775 if let Some(ref in_mem) = self.in_memory {
776 in_mem.record_histogram_observation();
777 }
778 }
779
780 pub fn update_attributes(&mut self, new_attributes: MetricAttributes) {
782 self.primary.update_attributes(new_attributes.clone());
783 }
784}
785impl HistogramDurationBase for HistogramDuration {
786 fn records(&self, value: Duration) {
787 let bound = self.primary.bound_cache.get_or_init(|| {
788 self.primary
789 .metric
790 .with_attributes(&self.primary.attributes)
791 .map(Into::into)
792 .unwrap_or_else(|e| {
793 dbg_panic!("Failed to initialize metric, will drop values: {e:?}");
794 Arc::new(NoOpInstrument) as Arc<dyn HistogramDurationBase>
795 })
796 });
797 bound.records(value);
798
799 if let Some(ref in_mem) = self.in_memory {
800 in_mem.record_histogram_observation();
801 }
802 }
803}
804impl MetricAttributable<HistogramDuration> for HistogramDuration {
805 fn with_attributes(
806 &self,
807 attributes: &MetricAttributes,
808 ) -> Result<HistogramDuration, Box<dyn std::error::Error>> {
809 let primary = LazyBoundMetric {
810 metric: self.primary.metric.clone(),
811 attributes: attributes.clone(),
812 bound_cache: OnceLock::new(),
813 };
814
815 Ok(HistogramDuration {
816 primary,
817 in_memory: self.in_memory.clone(),
818 })
819 }
820}
821
822pub trait GaugeBase: Send + Sync {
824 fn records(&self, value: u64);
826}
827
828pub type GaugeImpl = LazyBoundMetric<
830 Arc<dyn MetricAttributable<Box<dyn GaugeBase>> + Send + Sync>,
831 Arc<dyn GaugeBase>,
832>;
833
834#[derive(Clone)]
836pub struct Gauge {
837 primary: GaugeImpl,
838 in_memory: Option<HeartbeatMetricType>,
839}
840impl Gauge {
841 pub fn new(inner: Arc<dyn MetricAttributable<Box<dyn GaugeBase>> + Send + Sync>) -> Self {
843 Self {
844 primary: LazyBoundMetric {
845 metric: inner,
846 attributes: MetricAttributes::Empty,
847 bound_cache: OnceLock::new(),
848 },
849 in_memory: None,
850 }
851 }
852
853 pub fn new_with_in_memory(
855 primary: Arc<dyn MetricAttributable<Box<dyn GaugeBase>> + Send + Sync>,
856 in_memory: HeartbeatMetricType,
857 ) -> Self {
858 Self {
859 primary: LazyBoundMetric {
860 metric: primary,
861 attributes: MetricAttributes::Empty,
862 bound_cache: OnceLock::new(),
863 },
864 in_memory: Some(in_memory),
865 }
866 }
867
868 pub fn record(&self, value: u64, attributes: &MetricAttributes) {
870 match self.primary.metric.with_attributes(attributes) {
871 Ok(base) => base.records(value),
872 Err(e) => {
873 dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}");
874 }
875 }
876
877 if let Some(ref in_mem) = self.in_memory {
878 in_mem.record_gauge(value, attributes);
879 }
880 }
881
882 pub fn update_attributes(&mut self, new_attributes: MetricAttributes) {
884 self.primary.update_attributes(new_attributes.clone());
885 }
886}
887impl GaugeBase for Gauge {
888 fn records(&self, value: u64) {
889 let bound = self.primary.bound_cache.get_or_init(|| {
890 self.primary
891 .metric
892 .with_attributes(&self.primary.attributes)
893 .map(Into::into)
894 .unwrap_or_else(|e| {
895 dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}");
896 Arc::new(NoOpInstrument) as Arc<dyn GaugeBase>
897 })
898 });
899 bound.records(value);
900
901 if let Some(ref in_mem) = self.in_memory {
902 in_mem.record_gauge(value, &self.primary.attributes);
903 }
904 }
905}
906impl MetricAttributable<Gauge> for Gauge {
907 fn with_attributes(
908 &self,
909 attributes: &MetricAttributes,
910 ) -> Result<Gauge, Box<dyn std::error::Error>> {
911 let primary = LazyBoundMetric {
912 metric: self.primary.metric.clone(),
913 attributes: attributes.clone(),
914 bound_cache: OnceLock::new(),
915 };
916
917 Ok(Gauge {
918 primary,
919 in_memory: self.in_memory.clone(),
920 })
921 }
922}
923
924pub trait GaugeF64Base: Send + Sync {
926 fn records(&self, value: f64);
928}
929pub type GaugeF64 = LazyBoundMetric<
931 Arc<dyn MetricAttributable<Box<dyn GaugeF64Base>> + Send + Sync>,
932 Arc<dyn GaugeF64Base>,
933>;
934impl GaugeF64 {
935 pub fn new(inner: Arc<dyn MetricAttributable<Box<dyn GaugeF64Base>> + Send + Sync>) -> Self {
937 Self {
938 metric: inner,
939 attributes: MetricAttributes::Empty,
940 bound_cache: OnceLock::new(),
941 }
942 }
943 pub fn record(&self, value: f64, attributes: &MetricAttributes) {
945 match self.metric.with_attributes(attributes) {
946 Ok(base) => {
947 base.records(value);
948 }
949 Err(e) => {
950 dbg_panic!("Failed to initialize metric, will drop values: {e:?}",);
951 }
952 }
953 }
954}
955impl GaugeF64Base for GaugeF64 {
956 fn records(&self, value: f64) {
957 let bound = self.bound_cache.get_or_init(|| {
958 self.metric
959 .with_attributes(&self.attributes)
960 .map(Into::into)
961 .unwrap_or_else(|e| {
962 dbg_panic!("Failed to initialize metric, will drop values: {e:?}");
963 Arc::new(NoOpInstrument) as Arc<dyn GaugeF64Base>
964 })
965 });
966 bound.records(value);
967 }
968}
969impl MetricAttributable<GaugeF64> for GaugeF64 {
970 fn with_attributes(
971 &self,
972 attributes: &MetricAttributes,
973 ) -> Result<GaugeF64, Box<dyn std::error::Error>> {
974 Ok(Self {
975 metric: self.metric.clone(),
976 attributes: attributes.clone(),
977 bound_cache: OnceLock::new(),
978 })
979 }
980}
981
982#[derive(Debug)]
984pub struct NoOpCoreMeter;
985impl CoreMeter for NoOpCoreMeter {
986 fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes {
987 MetricAttributes::NoOp(Arc::new(attribs.into()))
988 }
989
990 fn extend_attributes(
991 &self,
992 existing: MetricAttributes,
993 attribs: NewAttributes,
994 ) -> MetricAttributes {
995 if let MetricAttributes::NoOp(labels) = existing {
996 let mut labels = (*labels).clone();
997 labels.extend::<HashMap<String, String>>(attribs.into());
998 MetricAttributes::NoOp(Arc::new(labels))
999 } else {
1000 dbg_panic!("Must use NoOp attributes with a NoOp metric implementation");
1001 existing
1002 }
1003 }
1004
1005 fn counter(&self, _: MetricParameters) -> Counter {
1006 Counter::new(Arc::new(NoOpInstrument))
1007 }
1008
1009 fn histogram(&self, _: MetricParameters) -> Histogram {
1010 Histogram::new(Arc::new(NoOpInstrument))
1011 }
1012
1013 fn histogram_f64(&self, _: MetricParameters) -> HistogramF64 {
1014 HistogramF64::new(Arc::new(NoOpInstrument))
1015 }
1016
1017 fn histogram_duration(&self, _: MetricParameters) -> HistogramDuration {
1018 HistogramDuration::new(Arc::new(NoOpInstrument))
1019 }
1020
1021 fn gauge(&self, _: MetricParameters) -> Gauge {
1022 Gauge::new(Arc::new(NoOpInstrument))
1023 }
1024
1025 fn gauge_f64(&self, _: MetricParameters) -> GaugeF64 {
1026 GaugeF64::new(Arc::new(NoOpInstrument))
1027 }
1028}
1029
/// Implements `MetricAttributable<Box<dyn $base_trait>>` for `$rt` by ignoring
/// the supplied attributes and boxing the expression `$init` on every call.
macro_rules! impl_metric_attributable {
    ($base_trait:ident, $rt:ty, $init:expr) => {
        impl MetricAttributable<Box<dyn $base_trait>> for $rt {
            fn with_attributes(
                &self,
                _attributes: &MetricAttributes,
            ) -> Result<Box<dyn $base_trait>, Box<dyn std::error::Error>> {
                let instrument: Box<dyn $base_trait> = Box::new($init);
                Ok(instrument)
            }
        }
    };
}
1042
1043pub struct NoOpInstrument;
1045macro_rules! impl_no_op {
1046 ($base_trait:ident, $value_type:ty) => {
1047 impl_metric_attributable!($base_trait, NoOpInstrument, NoOpInstrument);
1048 impl $base_trait for NoOpInstrument {
1049 fn records(&self, _: $value_type) {}
1050 }
1051 };
1052 ($base_trait:ident) => {
1053 impl_metric_attributable!($base_trait, NoOpInstrument, NoOpInstrument);
1054 impl $base_trait for NoOpInstrument {
1055 fn adds(&self, _: u64) {}
1056 }
1057 };
1058}
1059impl_no_op!(CounterBase);
1060impl_no_op!(HistogramBase, u64);
1061impl_no_op!(HistogramF64Base, f64);
1062impl_no_op!(HistogramDurationBase, Duration);
1063impl_no_op!(GaugeBase, u64);
1064impl_no_op!(GaugeF64Base, f64);
1065
#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        collections::HashMap,
        sync::{
            Arc,
            atomic::{AtomicU64, Ordering},
        },
    };

    /// A labelled in-memory gauge picks its slot from the label value found in
    /// the attributes passed when recording.
    #[test]
    fn in_memory_attributes_provide_label_values() {
        let meter = NoOpCoreMeter;
        let labelled = meter.extend_attributes(
            meter.new_attributes(NewAttributes::default()),
            NewAttributes::from(vec![MetricKeyValue::new("poller_type", "workflow_task")]),
        );

        let slot = Arc::new(AtomicU64::new(0));
        let sink = HeartbeatMetricType::WithLabel {
            label_key: "poller_type".to_string(),
            metrics: HashMap::from([("workflow_task".to_string(), Arc::clone(&slot))]),
        };

        sink.record_gauge(3, &labelled);

        assert_eq!(slot.load(Ordering::Relaxed), 3);
        assert_eq!(
            label_value_from_attributes(&labelled, "poller_type").as_deref(),
            Some("workflow_task")
        );
    }
}
1103
/// Glue that lets OpenTelemetry instruments be used as this module's metric
/// instruments.
#[cfg(feature = "otel")]
mod otel {
    use super::*;
    use opentelemetry::{KeyValue, metrics};

    /// An OTel instrument paired with the attributes it records under.
    #[derive(Clone)]
    struct InstrumentWithAttributes<I> {
        inner: I,
        attributes: MetricAttributes,
    }

    impl From<MetricKeyValue> for KeyValue {
        fn from(kv: MetricKeyValue) -> Self {
            let MetricKeyValue { key, value } = kv;
            KeyValue::new(key, value)
        }
    }

    impl From<MetricValue> for opentelemetry::Value {
        fn from(mv: MetricValue) -> Self {
            match mv {
                MetricValue::Bool(flag) => Self::Bool(flag),
                MetricValue::Float(number) => Self::F64(number),
                MetricValue::Int(number) => Self::I64(number),
                MetricValue::String(text) => Self::String(text.into()),
            }
        }
    }

    /// Generates, for one OTel instrument type:
    /// * `MetricAttributable<Box<dyn $base>>`, which clones the instrument and
    ///   the attributes into an [`InstrumentWithAttributes`];
    /// * `$base` for that wrapper, whose `$base_fn` forwards to the
    ///   instrument's `$otel_fn` when the attributes are the `OTel` variant
    ///   and reports any other variant via `dbg_panic!`.
    macro_rules! bind_otel_instrument {
        ($base:ident, $base_fn:ident, $instrument:ty, $value:ty, $otel_fn:ident) => {
            impl MetricAttributable<Box<dyn $base>> for $instrument {
                fn with_attributes(
                    &self,
                    attributes: &MetricAttributes,
                ) -> Result<Box<dyn $base>, Box<dyn std::error::Error>> {
                    let bound = InstrumentWithAttributes {
                        inner: self.clone(),
                        attributes: attributes.clone(),
                    };
                    Ok(Box::new(bound))
                }
            }

            impl $base for InstrumentWithAttributes<$instrument> {
                fn $base_fn(&self, value: $value) {
                    let MetricAttributes::OTel { kvs } = &self.attributes else {
                        dbg_panic!("Must use OTel attributes with an OTel metric implementation");
                        return;
                    };
                    self.inner.$otel_fn(value, kvs);
                }
            }
        };
    }

    bind_otel_instrument!(CounterBase, adds, metrics::Counter<u64>, u64, add);
    bind_otel_instrument!(GaugeBase, records, metrics::Gauge<u64>, u64, record);
    bind_otel_instrument!(GaugeF64Base, records, metrics::Gauge<f64>, f64, record);
    bind_otel_instrument!(HistogramBase, records, metrics::Histogram<u64>, u64, record);
    bind_otel_instrument!(HistogramF64Base, records, metrics::Histogram<f64>, f64, record);
}
1242
1243#[derive(Debug, Clone, PartialEq, Default)]
1245pub struct OrderedPromLabelSet {
1246 attributes: BTreeMap<String, MetricValue>,
1247}
1248
1249impl OrderedPromLabelSet {
1250 pub const fn new() -> Self {
1252 Self {
1253 attributes: BTreeMap::new(),
1254 }
1255 }
1256 pub fn keys_ordered(&self) -> impl Iterator<Item = &str> {
1258 self.attributes.keys().map(|s| s.as_str())
1259 }
1260 pub fn as_prom_labels(&self) -> HashMap<&str, String> {
1262 let mut labels = HashMap::new();
1263 for (k, v) in self.attributes.iter() {
1264 labels.insert(k.as_str(), v.to_string());
1265 }
1266 labels
1267 }
1268 pub fn add_kv(&mut self, kv: MetricKeyValue) {
1270 self.attributes.insert(kv.key.replace('-', "_"), kv.value);
1272 }
1273}
1274
1275impl From<NewAttributes> for OrderedPromLabelSet {
1276 fn from(n: NewAttributes) -> Self {
1277 let mut me = Self::default();
1278 for kv in n.attributes {
1279 me.add_kv(kv);
1280 }
1281 me
1282 }
1283}
1284
1285#[derive(Debug, derive_more::Constructor)]
1286pub(crate) struct PrefixedMetricsMeter<CM> {
1287 prefix: String,
1288 meter: CM,
1289}
1290impl<CM: CoreMeter> CoreMeter for PrefixedMetricsMeter<CM> {
1291 fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes {
1292 self.meter.new_attributes(attribs)
1293 }
1294
1295 fn extend_attributes(
1296 &self,
1297 existing: MetricAttributes,
1298 attribs: NewAttributes,
1299 ) -> MetricAttributes {
1300 self.meter.extend_attributes(existing, attribs)
1301 }
1302
1303 fn counter(&self, mut params: MetricParameters) -> Counter {
1304 params.name = (self.prefix.clone() + &*params.name).into();
1305 self.meter.counter(params)
1306 }
1307
1308 fn histogram(&self, mut params: MetricParameters) -> Histogram {
1309 params.name = (self.prefix.clone() + &*params.name).into();
1310 self.meter.histogram(params)
1311 }
1312
1313 fn histogram_f64(&self, mut params: MetricParameters) -> HistogramF64 {
1314 params.name = (self.prefix.clone() + &*params.name).into();
1315 self.meter.histogram_f64(params)
1316 }
1317
1318 fn histogram_duration(&self, mut params: MetricParameters) -> HistogramDuration {
1319 params.name = (self.prefix.clone() + &*params.name).into();
1320 self.meter.histogram_duration(params)
1321 }
1322
1323 fn gauge(&self, mut params: MetricParameters) -> Gauge {
1324 params.name = (self.prefix.clone() + &*params.name).into();
1325 self.meter.gauge(params)
1326 }
1327
1328 fn gauge_f64(&self, mut params: MetricParameters) -> GaugeF64 {
1329 params.name = (self.prefix.clone() + &*params.name).into();
1330 self.meter.gauge_f64(params)
1331 }
1332}