Skip to main content

temporalio_common/telemetry/
metrics.rs

1use crate::{dbg_panic, telemetry::TaskQueueLabelStrategy};
2use std::{
3    borrow::Cow,
4    collections::{BTreeMap, HashMap},
5    fmt::{Debug, Display},
6    ops::Deref,
7    sync::{
8        Arc, OnceLock,
9        atomic::{AtomicU64, Ordering},
10    },
11    time::Duration,
12};
13
14#[cfg(feature = "core-based-sdk")]
15pub mod core;
16
17/// The string name (which may be prefixed) for this metric
18pub const WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME: &str = "workflow_endtoend_latency";
19/// The string name (which may be prefixed) for this metric
20pub const WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME: &str =
21    "workflow_task_schedule_to_start_latency";
22/// The string name (which may be prefixed) for this metric
23pub const WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME: &str = "workflow_task_replay_latency";
24/// The string name (which may be prefixed) for this metric
25pub const WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME: &str = "workflow_task_execution_latency";
26/// The string name (which may be prefixed) for this metric
27pub const ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME: &str =
28    "activity_schedule_to_start_latency";
29/// The string name (which may be prefixed) for this metric
30pub const ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME: &str = "activity_execution_latency";
31
32/// Helps define buckets once in terms of millis, but also generates a seconds version
33macro_rules! define_latency_buckets {
34    ($(($metric_name:pat, $name:ident, $sec_name:ident, [$($bucket:expr),*])),*) => {
35        $(
36            pub(super) static $name: &[f64] = &[$($bucket,)*];
37            pub(super) static $sec_name: &[f64] = &[$( $bucket / 1000.0, )*];
38        )*
39
40        /// Returns the default histogram buckets that lang should use for a given metric name if
41        /// they have not been overridden by the user. If `use_seconds` is true, returns buckets
42        /// in terms of seconds rather than milliseconds.
43        ///
44        /// The name must *not* be prefixed with `temporal_`
45        pub fn default_buckets_for(histo_name: &str, use_seconds: bool) -> &'static [f64] {
46            match histo_name {
47                $(
48                    $metric_name => { if use_seconds { &$sec_name } else { &$name } },
49                )*
50            }
51        }
52    };
53}
54
55define_latency_buckets!(
56    (
57        WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME,
58        WF_LATENCY_MS_BUCKETS,
59        WF_LATENCY_S_BUCKETS,
60        [
61            100.,
62            500.,
63            1000.,
64            1500.,
65            2000.,
66            5000.,
67            10_000.,
68            30_000.,
69            60_000.,
70            120_000.,
71            300_000.,
72            600_000.,
73            1_800_000.,  // 30 min
74            3_600_000.,  //  1 hr
75            30_600_000., // 10 hrs
76            8.64e7       // 24 hrs
77        ]
78    ),
79    (
80        WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME
81            | WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME,
82        WF_TASK_MS_BUCKETS,
83        WF_TASK_S_BUCKETS,
84        [1., 10., 20., 50., 100., 200., 500., 1000.]
85    ),
86    (
87        ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME,
88        ACT_EXE_MS_BUCKETS,
89        ACT_EXE_S_BUCKETS,
90        [50., 100., 500., 1000., 5000., 10_000., 60_000.]
91    ),
92    (
93        WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME
94            | ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME,
95        TASK_SCHED_TO_START_MS_BUCKETS,
96        TASK_SCHED_TO_START_S_BUCKETS,
97        [100., 500., 1000., 5000., 10_000., 100_000., 1_000_000.]
98    ),
99    (
100        _,
101        DEFAULT_MS_BUCKETS,
102        DEFAULT_S_BUCKETS,
103        [50., 100., 500., 1000., 2500., 10_000.]
104    )
105);
106
107/// Implementors of this trait are expected to be defined in each language's bridge.
108/// The implementor is responsible for the allocation/instantiation of new metric meters which
109/// Core has requested.
110pub trait CoreMeter: Send + Sync + Debug {
111    /// Given some k/v pairs, create a return a new instantiated instance of metric attributes.
112    /// Only [MetricAttributes] created by this meter can be used when calling record on instruments
113    /// created by this meter.
114    fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes;
115    /// Extend some existing attributes with new values. Implementations should create new instances
116    /// when doing so, rather than mutating whatever is backing the passed in `existing` attributes.
117    /// Ideally that new instance retains a ref to the extended old attribute, promoting re-use.
118    fn extend_attributes(
119        &self,
120        existing: MetricAttributes,
121        attribs: NewAttributes,
122    ) -> MetricAttributes;
123    /// Create a new counter instrument.
124    fn counter(&self, params: MetricParameters) -> Counter;
125
126    /// Create a counter with in-memory tracking for worker heartbeating reporting
127    fn counter_with_in_memory(
128        &self,
129        params: MetricParameters,
130        in_memory_counter: HeartbeatMetricType,
131    ) -> Counter {
132        let primary_counter = self.counter(params);
133
134        Counter::new_with_in_memory(primary_counter.primary.metric.clone(), in_memory_counter)
135    }
136
137    /// Create a new histogram instrument recording `u64` values.
138    fn histogram(&self, params: MetricParameters) -> Histogram;
139    /// Create a new histogram instrument recording `f64` values.
140    fn histogram_f64(&self, params: MetricParameters) -> HistogramF64;
141    /// Create a histogram which records Durations. Implementations should choose to emit in
142    /// either milliseconds or seconds depending on how they have been configured.
143    /// [MetricParameters::unit] should be overwritten by implementations to be `ms` or `s`
144    /// accordingly.
145    fn histogram_duration(&self, params: MetricParameters) -> HistogramDuration;
146
147    /// Create a histogram duration with in-memory tracking for worker heartbeating reporting
148    fn histogram_duration_with_in_memory(
149        &self,
150        params: MetricParameters,
151        in_memory_hist: HeartbeatMetricType,
152    ) -> HistogramDuration {
153        let primary_hist = self.histogram_duration(params);
154
155        HistogramDuration::new_with_in_memory(primary_hist.primary.metric.clone(), in_memory_hist)
156    }
157    /// Create a new gauge instrument recording `u64` values.
158    fn gauge(&self, params: MetricParameters) -> Gauge;
159
160    /// Create a gauge with in-memory tracking for worker heartbeating reporting
161    fn gauge_with_in_memory(
162        &self,
163        params: MetricParameters,
164        in_memory_metrics: HeartbeatMetricType,
165    ) -> Gauge {
166        let primary_gauge = self.gauge(params.clone());
167        Gauge::new_with_in_memory(primary_gauge.primary.metric.clone(), in_memory_metrics)
168    }
169
170    /// Create a new gauge instrument recording `f64` values.
171    fn gauge_f64(&self, params: MetricParameters) -> GaugeF64;
172}
173
174/// Provides a generic way to record metrics in memory.
175/// This can be done either with individual metrics or more fine-grained metrics
176/// that vary by a set of labels for the same metric.
177#[derive(Clone, Debug)]
178pub enum HeartbeatMetricType {
179    /// A single counter shared across all label values.
180    Individual(Arc<AtomicU64>),
181    /// Per-label-value counters, keyed by a specific label.
182    WithLabel {
183        /// The label key to match against metric attributes.
184        label_key: String,
185        /// Map from label value to its atomic counter.
186        metrics: HashMap<String, Arc<AtomicU64>>,
187    },
188}
189
190impl HeartbeatMetricType {
191    fn record_counter(&self, delta: u64) {
192        match self {
193            HeartbeatMetricType::Individual(metric) => {
194                metric.fetch_add(delta, Ordering::Relaxed);
195            }
196            HeartbeatMetricType::WithLabel { .. } => {
197                dbg_panic!("Counter does not support in-memory metric with labels");
198            }
199        }
200    }
201
202    fn record_histogram_observation(&self) {
203        match self {
204            HeartbeatMetricType::Individual(metric) => {
205                metric.fetch_add(1, Ordering::Relaxed);
206            }
207            HeartbeatMetricType::WithLabel { .. } => {
208                dbg_panic!("Histogram does not support in-memory metric with labels");
209            }
210        }
211    }
212
213    fn record_gauge(&self, value: u64, attributes: &MetricAttributes) {
214        match self {
215            HeartbeatMetricType::Individual(metric) => {
216                metric.store(value, Ordering::Relaxed);
217            }
218            HeartbeatMetricType::WithLabel { label_key, metrics } => {
219                if let Some(metric) = label_value_from_attributes(attributes, label_key.as_str())
220                    .and_then(|label_value| metrics.get(label_value.as_str()))
221                {
222                    metric.store(value, Ordering::Relaxed)
223                }
224            }
225        }
226    }
227}
228
229fn label_value_from_attributes(attributes: &MetricAttributes, key: &str) -> Option<String> {
230    match attributes {
231        MetricAttributes::Prometheus { labels } => labels.as_prom_labels().get(key).cloned(),
232        #[cfg(feature = "otel")]
233        MetricAttributes::OTel { kvs } => kvs
234            .iter()
235            .find(|kv| kv.key.as_str() == key)
236            .map(|kv| kv.value.to_string()),
237        MetricAttributes::NoOp(labels) => labels.get(key).cloned(),
238        _ => None,
239    }
240}
241
242/// Parameters used when creating a new metric instrument (name, description, unit).
243#[derive(Debug, Clone, bon::Builder)]
244pub struct MetricParameters {
245    /// The name for the new metric/instrument
246    #[builder(into)]
247    pub name: Cow<'static, str>,
248    /// A description that will appear in metadata if the backend supports it
249    #[builder(into, default = Cow::Borrowed(""))]
250    pub description: Cow<'static, str>,
251    /// Unit information that will appear in metadata if the backend supports it
252    #[builder(into, default = Cow::Borrowed(""))]
253    pub unit: Cow<'static, str>,
254}
255impl From<&'static str> for MetricParameters {
256    fn from(value: &'static str) -> Self {
257        Self {
258            name: value.into(),
259            description: Default::default(),
260            unit: Default::default(),
261        }
262    }
263}
264
265/// Wraps a [CoreMeter] to enable the attaching of default labels to metrics. Cloning is cheap.
266#[derive(Clone, Debug)]
267pub struct TemporalMeter {
268    inner: Arc<dyn CoreMeter>,
269    default_attribs: MetricAttributes,
270    task_queue_label_strategy: TaskQueueLabelStrategy,
271}
272
273impl TemporalMeter {
274    /// Create a new TemporalMeter.
275    pub fn new(
276        meter: Arc<dyn CoreMeter>,
277        default_attribs: NewAttributes,
278        task_queue_label_strategy: TaskQueueLabelStrategy,
279    ) -> Self {
280        Self {
281            default_attribs: meter.new_attributes(default_attribs),
282            inner: meter,
283            task_queue_label_strategy,
284        }
285    }
286
287    /// Creates a TemporalMeter that records nothing
288    pub fn no_op() -> Self {
289        Self {
290            inner: Arc::new(NoOpCoreMeter),
291            default_attribs: MetricAttributes::NoOp(Arc::new(Default::default())),
292            task_queue_label_strategy: TaskQueueLabelStrategy::UseNormal,
293        }
294    }
295
296    /// Returns the default attributes this meter uses.
297    pub fn get_default_attributes(&self) -> &MetricAttributes {
298        &self.default_attribs
299    }
300
301    /// Returns the Task Queue labeling strategy this meter uses.
302    pub fn get_task_queue_label_strategy(&self) -> &TaskQueueLabelStrategy {
303        &self.task_queue_label_strategy
304    }
305
306    /// Add some new attributes to the default set already in this meter.
307    pub fn merge_attributes(&mut self, new_attribs: NewAttributes) {
308        self.default_attribs = self.extend_attributes(self.default_attribs.clone(), new_attribs);
309    }
310}
311
312impl Deref for TemporalMeter {
313    type Target = dyn CoreMeter;
314    fn deref(&self) -> &Self::Target {
315        self.inner.as_ref()
316    }
317}
318
319impl CoreMeter for Arc<dyn CoreMeter> {
320    fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes {
321        self.as_ref().new_attributes(attribs)
322    }
323
324    fn extend_attributes(
325        &self,
326        existing: MetricAttributes,
327        attribs: NewAttributes,
328    ) -> MetricAttributes {
329        self.as_ref().extend_attributes(existing, attribs)
330    }
331
332    fn counter(&self, params: MetricParameters) -> Counter {
333        self.as_ref().counter(params)
334    }
335    fn histogram(&self, params: MetricParameters) -> Histogram {
336        self.as_ref().histogram(params)
337    }
338
339    fn histogram_f64(&self, params: MetricParameters) -> HistogramF64 {
340        self.as_ref().histogram_f64(params)
341    }
342
343    fn histogram_duration(&self, params: MetricParameters) -> HistogramDuration {
344        self.as_ref().histogram_duration(params)
345    }
346
347    fn gauge(&self, params: MetricParameters) -> Gauge {
348        self.as_ref().gauge(params)
349    }
350
351    fn gauge_f64(&self, params: MetricParameters) -> GaugeF64 {
352        self.as_ref().gauge_f64(params)
353    }
354}
355
356/// Attributes which are provided every time a call to record a specific metric is made.
357/// Implementors must be very cheap to clone, as these attributes will be re-used frequently.
358#[derive(Clone, Debug)]
359#[non_exhaustive]
360pub enum MetricAttributes {
361    /// OpenTelemetry-backed attributes with key-value pairs.
362    #[cfg(feature = "otel")]
363    OTel {
364        /// The OpenTelemetry key-value pairs for this attribute set.
365        kvs: Arc<Vec<opentelemetry::KeyValue>>,
366    },
367    /// Prometheus-backed attributes with ordered labels.
368    Prometheus {
369        /// The ordered label set.
370        labels: Arc<OrderedPromLabelSet>,
371    },
372    /// Buffered attributes used by core-based SDKs for deferred metric initialization.
373    #[cfg(feature = "core-based-sdk")]
374    Buffer(core::BufferAttributes),
375    /// Dynamic attributes backed by a lang-side custom implementation.
376    #[cfg(feature = "core-based-sdk")]
377    Dynamic(Arc<dyn core::CustomMetricAttributes>),
378    /// No-op attributes that store labels but do not record.
379    NoOp(Arc<HashMap<String, String>>),
380    /// Empty placeholder attributes.
381    Empty,
382}
383
384/// Options that are attached to metrics on a per-call basis
385#[derive(Clone, Debug, Default, derive_more::Constructor)]
386pub struct NewAttributes {
387    /// The key-value pairs for this attribute set.
388    pub attributes: Vec<MetricKeyValue>,
389}
390impl NewAttributes {
391    /// Append additional key-value pairs to this attribute set.
392    pub fn extend(&mut self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) {
393        self.attributes.extend(new_kvs)
394    }
395}
396impl<I> From<I> for NewAttributes
397where
398    I: IntoIterator<Item = MetricKeyValue>,
399{
400    fn from(value: I) -> Self {
401        Self {
402            attributes: value.into_iter().collect(),
403        }
404    }
405}
406
407impl From<NewAttributes> for HashMap<String, String> {
408    fn from(value: NewAttributes) -> Self {
409        value
410            .attributes
411            .into_iter()
412            .map(|kv| (kv.key, kv.value.to_string()))
413            .collect()
414    }
415}
416
417/// A K/V pair that can be used to label a specific recording of a metric
418#[derive(Clone, Debug, PartialEq)]
419pub struct MetricKeyValue {
420    /// The label key.
421    pub key: String,
422    /// The label value.
423    pub value: MetricValue,
424}
425impl MetricKeyValue {
426    /// Create a new key-value pair.
427    pub fn new(key: impl Into<String>, value: impl Into<MetricValue>) -> Self {
428        Self {
429            key: key.into(),
430            value: value.into(),
431        }
432    }
433}
434
435/// Values metric labels may assume
436#[derive(Clone, Debug, PartialEq, derive_more::From)]
437pub enum MetricValue {
438    /// A string label value.
439    String(String),
440    /// An integer label value.
441    Int(i64),
442    /// A floating-point label value.
443    Float(f64),
444    /// A boolean label value.
445    Bool(bool),
446    // can add array if needed
447}
448impl From<&'static str> for MetricValue {
449    fn from(value: &'static str) -> Self {
450        MetricValue::String(value.to_string())
451    }
452}
453impl Display for MetricValue {
454    fn fmt(&self, f1: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
455        match self {
456            MetricValue::String(s) => write!(f1, "{s}"),
457            MetricValue::Int(i) => write!(f1, "{i}"),
458            MetricValue::Float(f) => write!(f1, "{f}"),
459            MetricValue::Bool(b) => write!(f1, "{b}"),
460        }
461    }
462}
463
464/// Trait for metric instruments that can be bound to a set of attributes.
465pub trait MetricAttributable<Base> {
466    /// Replace any existing attributes on this metric with new ones, and return a new copy
467    /// of the metric, or a base version, which can be used to record values.
468    ///
469    /// Note that this operation is relatively expensive compared to simply recording a value
470    /// without any additional attributes, so users should prefer to save the metric instance
471    /// after calling this, and use the value-only methods afterward.
472    ///
473    /// This operation may fail if the underlying metrics implementation disallows the registration
474    /// of a new metric, or encounters any other issue.
475    fn with_attributes(
476        &self,
477        attributes: &MetricAttributes,
478    ) -> Result<Base, Box<dyn std::error::Error>>;
479}
480
481/// A metric that lazily binds to its attributes on first use, caching the result.
482#[derive(Clone)]
483pub struct LazyBoundMetric<T, B> {
484    metric: T,
485    attributes: MetricAttributes,
486    bound_cache: OnceLock<B>,
487}
488impl<T, B> LazyBoundMetric<T, B> {
489    /// Replace the attributes and invalidate the cached binding.
490    pub fn update_attributes(&mut self, new_attributes: MetricAttributes) {
491        self.attributes = new_attributes;
492        self.bound_cache = OnceLock::new();
493    }
494}
495
496/// Base trait for counter implementations.
497pub trait CounterBase: Send + Sync {
498    /// Increment the counter by `value`.
499    fn adds(&self, value: u64);
500}
501
502/// The lazy-bound inner type for [`Counter`].
503pub type CounterImpl = LazyBoundMetric<
504    Arc<dyn MetricAttributable<Box<dyn CounterBase>> + Send + Sync>,
505    Arc<dyn CounterBase>,
506>;
507
508/// A counter metric instrument that optionally tracks values in memory for heartbeating.
509#[derive(Clone)]
510pub struct Counter {
511    primary: CounterImpl,
512    in_memory: Option<HeartbeatMetricType>,
513}
514impl Counter {
515    /// Create a new counter from an attributable metric source.
516    pub fn new(inner: Arc<dyn MetricAttributable<Box<dyn CounterBase>> + Send + Sync>) -> Self {
517        Self {
518            primary: LazyBoundMetric {
519                metric: inner,
520                attributes: MetricAttributes::Empty,
521                bound_cache: OnceLock::new(),
522            },
523            in_memory: None,
524        }
525    }
526
527    /// Create a new counter with an additional in-memory tracker for heartbeat reporting.
528    pub fn new_with_in_memory(
529        primary: Arc<dyn MetricAttributable<Box<dyn CounterBase>> + Send + Sync>,
530        in_memory: HeartbeatMetricType,
531    ) -> Self {
532        Self {
533            primary: LazyBoundMetric {
534                metric: primary,
535                attributes: MetricAttributes::Empty,
536                bound_cache: OnceLock::new(),
537            },
538            in_memory: Some(in_memory),
539        }
540    }
541
542    /// Increment the counter by `value` with the given attributes.
543    pub fn add(&self, value: u64, attributes: &MetricAttributes) {
544        match self.primary.metric.with_attributes(attributes) {
545            Ok(base) => base.adds(value),
546            Err(e) => {
547                dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}");
548            }
549        }
550
551        if let Some(ref in_mem) = self.in_memory {
552            in_mem.record_counter(value);
553        }
554    }
555
556    /// Replace the attributes on the primary metric.
557    pub fn update_attributes(&mut self, new_attributes: MetricAttributes) {
558        self.primary.update_attributes(new_attributes.clone());
559    }
560}
561impl CounterBase for Counter {
562    fn adds(&self, value: u64) {
563        // TODO: Replace all of these with below when stable
564        //   https://doc.rust-lang.org/std/sync/struct.OnceLock.html#method.get_or_try_init
565        let bound = self.primary.bound_cache.get_or_init(|| {
566            self.primary
567                .metric
568                .with_attributes(&self.primary.attributes)
569                .map(Into::into)
570                .unwrap_or_else(|e| {
571                    dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}");
572                    Arc::new(NoOpInstrument) as Arc<dyn CounterBase>
573                })
574        });
575        bound.adds(value);
576
577        if let Some(ref in_mem) = self.in_memory {
578            in_mem.record_counter(value);
579        }
580    }
581}
582impl MetricAttributable<Counter> for Counter {
583    fn with_attributes(
584        &self,
585        attributes: &MetricAttributes,
586    ) -> Result<Counter, Box<dyn std::error::Error>> {
587        let primary = LazyBoundMetric {
588            metric: self.primary.metric.clone(),
589            attributes: attributes.clone(),
590            bound_cache: OnceLock::new(),
591        };
592
593        Ok(Counter {
594            primary,
595            in_memory: self.in_memory.clone(),
596        })
597    }
598}
599
600/// Base trait for `u64` histogram implementations.
601pub trait HistogramBase: Send + Sync {
602    /// Record a `u64` observation.
603    fn records(&self, value: u64);
604}
605/// A `u64` histogram metric instrument.
606pub type Histogram = LazyBoundMetric<
607    Arc<dyn MetricAttributable<Box<dyn HistogramBase>> + Send + Sync>,
608    Arc<dyn HistogramBase>,
609>;
610impl Histogram {
611    /// Create a new histogram from an attributable metric source.
612    pub fn new(inner: Arc<dyn MetricAttributable<Box<dyn HistogramBase>> + Send + Sync>) -> Self {
613        Self {
614            metric: inner,
615            attributes: MetricAttributes::Empty,
616            bound_cache: OnceLock::new(),
617        }
618    }
619    /// Record a `u64` value with the given attributes.
620    pub fn record(&self, value: u64, attributes: &MetricAttributes) {
621        match self.metric.with_attributes(attributes) {
622            Ok(base) => {
623                base.records(value);
624            }
625            Err(e) => {
626                dbg_panic!("Failed to initialize metric, will drop values: {e:?}",);
627            }
628        }
629    }
630}
631impl HistogramBase for Histogram {
632    fn records(&self, value: u64) {
633        let bound = self.bound_cache.get_or_init(|| {
634            self.metric
635                .with_attributes(&self.attributes)
636                .map(Into::into)
637                .unwrap_or_else(|e| {
638                    dbg_panic!("Failed to initialize metric, will drop values: {e:?}");
639                    Arc::new(NoOpInstrument) as Arc<dyn HistogramBase>
640                })
641        });
642        bound.records(value);
643    }
644}
645impl MetricAttributable<Histogram> for Histogram {
646    fn with_attributes(
647        &self,
648        attributes: &MetricAttributes,
649    ) -> Result<Histogram, Box<dyn std::error::Error>> {
650        Ok(Self {
651            metric: self.metric.clone(),
652            attributes: attributes.clone(),
653            bound_cache: OnceLock::new(),
654        })
655    }
656}
657
658/// Base trait for `f64` histogram implementations.
659pub trait HistogramF64Base: Send + Sync {
660    /// Record an `f64` observation.
661    fn records(&self, value: f64);
662}
663/// An `f64` histogram metric instrument.
664pub type HistogramF64 = LazyBoundMetric<
665    Arc<dyn MetricAttributable<Box<dyn HistogramF64Base>> + Send + Sync>,
666    Arc<dyn HistogramF64Base>,
667>;
668impl HistogramF64 {
669    /// Create a new `f64` histogram from an attributable metric source.
670    pub fn new(
671        inner: Arc<dyn MetricAttributable<Box<dyn HistogramF64Base>> + Send + Sync>,
672    ) -> Self {
673        Self {
674            metric: inner,
675            attributes: MetricAttributes::Empty,
676            bound_cache: OnceLock::new(),
677        }
678    }
679    /// Record an `f64` value with the given attributes.
680    pub fn record(&self, value: f64, attributes: &MetricAttributes) {
681        match self.metric.with_attributes(attributes) {
682            Ok(base) => {
683                base.records(value);
684            }
685            Err(e) => {
686                dbg_panic!("Failed to initialize metric, will drop values: {e:?}",);
687            }
688        }
689    }
690}
691impl HistogramF64Base for HistogramF64 {
692    fn records(&self, value: f64) {
693        let bound = self.bound_cache.get_or_init(|| {
694            self.metric
695                .with_attributes(&self.attributes)
696                .map(Into::into)
697                .unwrap_or_else(|e| {
698                    dbg_panic!("Failed to initialize metric, will drop values: {e:?}");
699                    Arc::new(NoOpInstrument) as Arc<dyn HistogramF64Base>
700                })
701        });
702        bound.records(value);
703    }
704}
705impl MetricAttributable<HistogramF64> for HistogramF64 {
706    fn with_attributes(
707        &self,
708        attributes: &MetricAttributes,
709    ) -> Result<HistogramF64, Box<dyn std::error::Error>> {
710        Ok(Self {
711            metric: self.metric.clone(),
712            attributes: attributes.clone(),
713            bound_cache: OnceLock::new(),
714        })
715    }
716}
717
718/// Base trait for duration histogram implementations.
719pub trait HistogramDurationBase: Send + Sync {
720    /// Record a duration observation.
721    fn records(&self, value: Duration);
722}
723
724/// The lazy-bound inner type for [`HistogramDuration`].
725pub type HistogramDurationImpl = LazyBoundMetric<
726    Arc<dyn MetricAttributable<Box<dyn HistogramDurationBase>> + Send + Sync>,
727    Arc<dyn HistogramDurationBase>,
728>;
729
730/// A histogram that records [`Duration`] values, optionally tracking in memory for heartbeating.
731#[derive(Clone)]
732pub struct HistogramDuration {
733    primary: HistogramDurationImpl,
734    in_memory: Option<HeartbeatMetricType>,
735}
736impl HistogramDuration {
737    /// Create a new duration histogram from an attributable metric source.
738    pub fn new(
739        inner: Arc<dyn MetricAttributable<Box<dyn HistogramDurationBase>> + Send + Sync>,
740    ) -> Self {
741        Self {
742            primary: LazyBoundMetric {
743                metric: inner,
744                attributes: MetricAttributes::Empty,
745                bound_cache: OnceLock::new(),
746            },
747            in_memory: None,
748        }
749    }
750    /// Create a new duration histogram with an additional in-memory tracker for heartbeat reporting.
751    pub fn new_with_in_memory(
752        primary: Arc<dyn MetricAttributable<Box<dyn HistogramDurationBase>> + Send + Sync>,
753        in_memory: HeartbeatMetricType,
754    ) -> Self {
755        Self {
756            primary: LazyBoundMetric {
757                metric: primary,
758                attributes: MetricAttributes::Empty,
759                bound_cache: OnceLock::new(),
760            },
761            in_memory: Some(in_memory),
762        }
763    }
764    /// Record a duration value with the given attributes.
765    pub fn record(&self, value: Duration, attributes: &MetricAttributes) {
766        match self.primary.metric.with_attributes(attributes) {
767            Ok(base) => {
768                base.records(value);
769            }
770            Err(e) => {
771                dbg_panic!("Failed to initialize metric, will drop values: {e:?}",);
772            }
773        }
774
775        if let Some(ref in_mem) = self.in_memory {
776            in_mem.record_histogram_observation();
777        }
778    }
779
780    /// Replace the attributes on the primary metric.
781    pub fn update_attributes(&mut self, new_attributes: MetricAttributes) {
782        self.primary.update_attributes(new_attributes.clone());
783    }
784}
785impl HistogramDurationBase for HistogramDuration {
786    fn records(&self, value: Duration) {
787        let bound = self.primary.bound_cache.get_or_init(|| {
788            self.primary
789                .metric
790                .with_attributes(&self.primary.attributes)
791                .map(Into::into)
792                .unwrap_or_else(|e| {
793                    dbg_panic!("Failed to initialize metric, will drop values: {e:?}");
794                    Arc::new(NoOpInstrument) as Arc<dyn HistogramDurationBase>
795                })
796        });
797        bound.records(value);
798
799        if let Some(ref in_mem) = self.in_memory {
800            in_mem.record_histogram_observation();
801        }
802    }
803}
804impl MetricAttributable<HistogramDuration> for HistogramDuration {
805    fn with_attributes(
806        &self,
807        attributes: &MetricAttributes,
808    ) -> Result<HistogramDuration, Box<dyn std::error::Error>> {
809        let primary = LazyBoundMetric {
810            metric: self.primary.metric.clone(),
811            attributes: attributes.clone(),
812            bound_cache: OnceLock::new(),
813        };
814
815        Ok(HistogramDuration {
816            primary,
817            in_memory: self.in_memory.clone(),
818        })
819    }
820}
821
822/// Base trait for `u64` gauge implementations.
823pub trait GaugeBase: Send + Sync {
824    /// Record a `u64` gauge value.
825    fn records(&self, value: u64);
826}
827
828/// The lazy-bound inner type for [`Gauge`].
829pub type GaugeImpl = LazyBoundMetric<
830    Arc<dyn MetricAttributable<Box<dyn GaugeBase>> + Send + Sync>,
831    Arc<dyn GaugeBase>,
832>;
833
834/// A gauge metric instrument that optionally tracks values in memory for heartbeating.
835#[derive(Clone)]
836pub struct Gauge {
837    primary: GaugeImpl,
838    in_memory: Option<HeartbeatMetricType>,
839}
840impl Gauge {
841    /// Create a new gauge from an attributable metric source.
842    pub fn new(inner: Arc<dyn MetricAttributable<Box<dyn GaugeBase>> + Send + Sync>) -> Self {
843        Self {
844            primary: LazyBoundMetric {
845                metric: inner,
846                attributes: MetricAttributes::Empty,
847                bound_cache: OnceLock::new(),
848            },
849            in_memory: None,
850        }
851    }
852
853    /// Create a new gauge with an additional in-memory tracker for heartbeat reporting.
854    pub fn new_with_in_memory(
855        primary: Arc<dyn MetricAttributable<Box<dyn GaugeBase>> + Send + Sync>,
856        in_memory: HeartbeatMetricType,
857    ) -> Self {
858        Self {
859            primary: LazyBoundMetric {
860                metric: primary,
861                attributes: MetricAttributes::Empty,
862                bound_cache: OnceLock::new(),
863            },
864            in_memory: Some(in_memory),
865        }
866    }
867
868    /// Record a `u64` gauge value with the given attributes.
869    pub fn record(&self, value: u64, attributes: &MetricAttributes) {
870        match self.primary.metric.with_attributes(attributes) {
871            Ok(base) => base.records(value),
872            Err(e) => {
873                dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}");
874            }
875        }
876
877        if let Some(ref in_mem) = self.in_memory {
878            in_mem.record_gauge(value, attributes);
879        }
880    }
881
882    /// Replace the attributes on the primary metric.
883    pub fn update_attributes(&mut self, new_attributes: MetricAttributes) {
884        self.primary.update_attributes(new_attributes.clone());
885    }
886}
887impl GaugeBase for Gauge {
888    fn records(&self, value: u64) {
889        let bound = self.primary.bound_cache.get_or_init(|| {
890            self.primary
891                .metric
892                .with_attributes(&self.primary.attributes)
893                .map(Into::into)
894                .unwrap_or_else(|e| {
895                    dbg_panic!("Failed to initialize primary metric, will drop values: {e:?}");
896                    Arc::new(NoOpInstrument) as Arc<dyn GaugeBase>
897                })
898        });
899        bound.records(value);
900
901        if let Some(ref in_mem) = self.in_memory {
902            in_mem.record_gauge(value, &self.primary.attributes);
903        }
904    }
905}
906impl MetricAttributable<Gauge> for Gauge {
907    fn with_attributes(
908        &self,
909        attributes: &MetricAttributes,
910    ) -> Result<Gauge, Box<dyn std::error::Error>> {
911        let primary = LazyBoundMetric {
912            metric: self.primary.metric.clone(),
913            attributes: attributes.clone(),
914            bound_cache: OnceLock::new(),
915        };
916
917        Ok(Gauge {
918            primary,
919            in_memory: self.in_memory.clone(),
920        })
921    }
922}
923
924/// Base trait for `f64` gauge implementations.
925pub trait GaugeF64Base: Send + Sync {
926    /// Record an `f64` gauge value.
927    fn records(&self, value: f64);
928}
929/// An `f64` gauge metric instrument.
930pub type GaugeF64 = LazyBoundMetric<
931    Arc<dyn MetricAttributable<Box<dyn GaugeF64Base>> + Send + Sync>,
932    Arc<dyn GaugeF64Base>,
933>;
934impl GaugeF64 {
935    /// Create a new `f64` gauge from an attributable metric source.
936    pub fn new(inner: Arc<dyn MetricAttributable<Box<dyn GaugeF64Base>> + Send + Sync>) -> Self {
937        Self {
938            metric: inner,
939            attributes: MetricAttributes::Empty,
940            bound_cache: OnceLock::new(),
941        }
942    }
943    /// Record an `f64` gauge value with the given attributes.
944    pub fn record(&self, value: f64, attributes: &MetricAttributes) {
945        match self.metric.with_attributes(attributes) {
946            Ok(base) => {
947                base.records(value);
948            }
949            Err(e) => {
950                dbg_panic!("Failed to initialize metric, will drop values: {e:?}",);
951            }
952        }
953    }
954}
955impl GaugeF64Base for GaugeF64 {
956    fn records(&self, value: f64) {
957        let bound = self.bound_cache.get_or_init(|| {
958            self.metric
959                .with_attributes(&self.attributes)
960                .map(Into::into)
961                .unwrap_or_else(|e| {
962                    dbg_panic!("Failed to initialize metric, will drop values: {e:?}");
963                    Arc::new(NoOpInstrument) as Arc<dyn GaugeF64Base>
964                })
965        });
966        bound.records(value);
967    }
968}
969impl MetricAttributable<GaugeF64> for GaugeF64 {
970    fn with_attributes(
971        &self,
972        attributes: &MetricAttributes,
973    ) -> Result<GaugeF64, Box<dyn std::error::Error>> {
974        Ok(Self {
975            metric: self.metric.clone(),
976            attributes: attributes.clone(),
977            bound_cache: OnceLock::new(),
978        })
979    }
980}
981
982/// A [`CoreMeter`] implementation that discards all metric recordings.
983#[derive(Debug)]
984pub struct NoOpCoreMeter;
985impl CoreMeter for NoOpCoreMeter {
986    fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes {
987        MetricAttributes::NoOp(Arc::new(attribs.into()))
988    }
989
990    fn extend_attributes(
991        &self,
992        existing: MetricAttributes,
993        attribs: NewAttributes,
994    ) -> MetricAttributes {
995        if let MetricAttributes::NoOp(labels) = existing {
996            let mut labels = (*labels).clone();
997            labels.extend::<HashMap<String, String>>(attribs.into());
998            MetricAttributes::NoOp(Arc::new(labels))
999        } else {
1000            dbg_panic!("Must use NoOp attributes with a NoOp metric implementation");
1001            existing
1002        }
1003    }
1004
1005    fn counter(&self, _: MetricParameters) -> Counter {
1006        Counter::new(Arc::new(NoOpInstrument))
1007    }
1008
1009    fn histogram(&self, _: MetricParameters) -> Histogram {
1010        Histogram::new(Arc::new(NoOpInstrument))
1011    }
1012
1013    fn histogram_f64(&self, _: MetricParameters) -> HistogramF64 {
1014        HistogramF64::new(Arc::new(NoOpInstrument))
1015    }
1016
1017    fn histogram_duration(&self, _: MetricParameters) -> HistogramDuration {
1018        HistogramDuration::new(Arc::new(NoOpInstrument))
1019    }
1020
1021    fn gauge(&self, _: MetricParameters) -> Gauge {
1022        Gauge::new(Arc::new(NoOpInstrument))
1023    }
1024
1025    fn gauge_f64(&self, _: MetricParameters) -> GaugeF64 {
1026        GaugeF64::new(Arc::new(NoOpInstrument))
1027    }
1028}
1029
1030macro_rules! impl_metric_attributable {
1031    ($base_trait:ident, $rt:ty, $init:expr) => {
1032        impl MetricAttributable<Box<dyn $base_trait>> for $rt {
1033            fn with_attributes(
1034                &self,
1035                _: &MetricAttributes,
1036            ) -> Result<Box<dyn $base_trait>, Box<dyn std::error::Error>> {
1037                Ok(Box::new($init))
1038            }
1039        }
1040    };
1041}
1042
1043/// A no-op metric instrument that discards all recordings.
1044pub struct NoOpInstrument;
1045macro_rules! impl_no_op {
1046    ($base_trait:ident, $value_type:ty) => {
1047        impl_metric_attributable!($base_trait, NoOpInstrument, NoOpInstrument);
1048        impl $base_trait for NoOpInstrument {
1049            fn records(&self, _: $value_type) {}
1050        }
1051    };
1052    ($base_trait:ident) => {
1053        impl_metric_attributable!($base_trait, NoOpInstrument, NoOpInstrument);
1054        impl $base_trait for NoOpInstrument {
1055            fn adds(&self, _: u64) {}
1056        }
1057    };
1058}
1059impl_no_op!(CounterBase);
1060impl_no_op!(HistogramBase, u64);
1061impl_no_op!(HistogramF64Base, f64);
1062impl_no_op!(HistogramDurationBase, Duration);
1063impl_no_op!(GaugeBase, u64);
1064impl_no_op!(GaugeF64Base, f64);
1065
1066#[cfg(test)]
1067mod tests {
1068    use super::*;
1069    use std::{
1070        collections::HashMap,
1071        sync::{
1072            Arc,
1073            atomic::{AtomicU64, Ordering},
1074        },
1075    };
1076
1077    #[test]
1078    fn in_memory_attributes_provide_label_values() {
1079        let meter = NoOpCoreMeter;
1080        let base_attrs = meter.new_attributes(NewAttributes::default());
1081        let attrs = meter.extend_attributes(
1082            base_attrs,
1083            NewAttributes::from(vec![MetricKeyValue::new("poller_type", "workflow_task")]),
1084        );
1085
1086        let value = Arc::new(AtomicU64::new(0));
1087        let mut metrics = HashMap::new();
1088        metrics.insert("workflow_task".to_string(), value.clone());
1089        let heartbeat_metric = HeartbeatMetricType::WithLabel {
1090            label_key: "poller_type".to_string(),
1091            metrics,
1092        };
1093
1094        heartbeat_metric.record_gauge(3, &attrs);
1095
1096        assert_eq!(value.load(Ordering::Relaxed), 3);
1097        assert_eq!(
1098            label_value_from_attributes(&attrs, "poller_type").as_deref(),
1099            Some("workflow_task")
1100        );
1101    }
1102}
1103
1104#[cfg(feature = "otel")]
1105mod otel {
1106    use super::*;
1107    use opentelemetry::{KeyValue, metrics};
1108
1109    #[derive(Clone)]
1110    struct InstrumentWithAttributes<I> {
1111        inner: I,
1112        attributes: MetricAttributes,
1113    }
1114
1115    impl From<MetricKeyValue> for KeyValue {
1116        fn from(kv: MetricKeyValue) -> Self {
1117            KeyValue::new(kv.key, kv.value)
1118        }
1119    }
1120
1121    impl From<MetricValue> for opentelemetry::Value {
1122        fn from(mv: MetricValue) -> Self {
1123            match mv {
1124                MetricValue::String(s) => opentelemetry::Value::String(s.into()),
1125                MetricValue::Int(i) => opentelemetry::Value::I64(i),
1126                MetricValue::Float(f) => opentelemetry::Value::F64(f),
1127                MetricValue::Bool(b) => opentelemetry::Value::Bool(b),
1128            }
1129        }
1130    }
1131
1132    impl MetricAttributable<Box<dyn CounterBase>> for metrics::Counter<u64> {
1133        fn with_attributes(
1134            &self,
1135            attributes: &MetricAttributes,
1136        ) -> Result<Box<dyn CounterBase>, Box<dyn std::error::Error>> {
1137            Ok(Box::new(InstrumentWithAttributes {
1138                inner: self.clone(),
1139                attributes: attributes.clone(),
1140            }))
1141        }
1142    }
1143
1144    impl CounterBase for InstrumentWithAttributes<metrics::Counter<u64>> {
1145        fn adds(&self, value: u64) {
1146            if let MetricAttributes::OTel { kvs } = &self.attributes {
1147                self.inner.add(value, kvs);
1148            } else {
1149                dbg_panic!("Must use OTel attributes with an OTel metric implementation");
1150            }
1151        }
1152    }
1153
1154    impl MetricAttributable<Box<dyn GaugeBase>> for metrics::Gauge<u64> {
1155        fn with_attributes(
1156            &self,
1157            attributes: &MetricAttributes,
1158        ) -> Result<Box<dyn GaugeBase>, Box<dyn std::error::Error>> {
1159            Ok(Box::new(InstrumentWithAttributes {
1160                inner: self.clone(),
1161                attributes: attributes.clone(),
1162            }))
1163        }
1164    }
1165
1166    impl GaugeBase for InstrumentWithAttributes<metrics::Gauge<u64>> {
1167        fn records(&self, value: u64) {
1168            if let MetricAttributes::OTel { kvs } = &self.attributes {
1169                self.inner.record(value, kvs);
1170            } else {
1171                dbg_panic!("Must use OTel attributes with an OTel metric implementation");
1172            }
1173        }
1174    }
1175
1176    impl MetricAttributable<Box<dyn GaugeF64Base>> for metrics::Gauge<f64> {
1177        fn with_attributes(
1178            &self,
1179            attributes: &MetricAttributes,
1180        ) -> Result<Box<dyn GaugeF64Base>, Box<dyn std::error::Error>> {
1181            Ok(Box::new(InstrumentWithAttributes {
1182                inner: self.clone(),
1183                attributes: attributes.clone(),
1184            }))
1185        }
1186    }
1187
1188    impl GaugeF64Base for InstrumentWithAttributes<metrics::Gauge<f64>> {
1189        fn records(&self, value: f64) {
1190            if let MetricAttributes::OTel { kvs } = &self.attributes {
1191                self.inner.record(value, kvs);
1192            } else {
1193                dbg_panic!("Must use OTel attributes with an OTel metric implementation");
1194            }
1195        }
1196    }
1197
1198    impl MetricAttributable<Box<dyn HistogramBase>> for metrics::Histogram<u64> {
1199        fn with_attributes(
1200            &self,
1201            attributes: &MetricAttributes,
1202        ) -> Result<Box<dyn HistogramBase>, Box<dyn std::error::Error>> {
1203            Ok(Box::new(InstrumentWithAttributes {
1204                inner: self.clone(),
1205                attributes: attributes.clone(),
1206            }))
1207        }
1208    }
1209
1210    impl HistogramBase for InstrumentWithAttributes<metrics::Histogram<u64>> {
1211        fn records(&self, value: u64) {
1212            if let MetricAttributes::OTel { kvs } = &self.attributes {
1213                self.inner.record(value, kvs);
1214            } else {
1215                dbg_panic!("Must use OTel attributes with an OTel metric implementation");
1216            }
1217        }
1218    }
1219
1220    impl MetricAttributable<Box<dyn HistogramF64Base>> for metrics::Histogram<f64> {
1221        fn with_attributes(
1222            &self,
1223            attributes: &MetricAttributes,
1224        ) -> Result<Box<dyn HistogramF64Base>, Box<dyn std::error::Error>> {
1225            Ok(Box::new(InstrumentWithAttributes {
1226                inner: self.clone(),
1227                attributes: attributes.clone(),
1228            }))
1229        }
1230    }
1231
1232    impl HistogramF64Base for InstrumentWithAttributes<metrics::Histogram<f64>> {
1233        fn records(&self, value: f64) {
1234            if let MetricAttributes::OTel { kvs } = &self.attributes {
1235                self.inner.record(value, kvs);
1236            } else {
1237                dbg_panic!("Must use OTel attributes with an OTel metric implementation");
1238            }
1239        }
1240    }
1241}
1242
1243/// Maintains a mapping of metric labels->values with a defined ordering, used for Prometheus labels
1244#[derive(Debug, Clone, PartialEq, Default)]
1245pub struct OrderedPromLabelSet {
1246    attributes: BTreeMap<String, MetricValue>,
1247}
1248
1249impl OrderedPromLabelSet {
1250    /// Create an empty label set.
1251    pub const fn new() -> Self {
1252        Self {
1253            attributes: BTreeMap::new(),
1254        }
1255    }
1256    /// Iterate over label keys in sorted order.
1257    pub fn keys_ordered(&self) -> impl Iterator<Item = &str> {
1258        self.attributes.keys().map(|s| s.as_str())
1259    }
1260    /// Return a map of label keys to their string values, suitable for Prometheus.
1261    pub fn as_prom_labels(&self) -> HashMap<&str, String> {
1262        let mut labels = HashMap::new();
1263        for (k, v) in self.attributes.iter() {
1264            labels.insert(k.as_str(), v.to_string());
1265        }
1266        labels
1267    }
1268    /// Insert a key-value pair, replacing dashes with underscores per Prometheus conventions.
1269    pub fn add_kv(&mut self, kv: MetricKeyValue) {
1270        // Replace '-' with '_' per Prom naming requirements
1271        self.attributes.insert(kv.key.replace('-', "_"), kv.value);
1272    }
1273}
1274
1275impl From<NewAttributes> for OrderedPromLabelSet {
1276    fn from(n: NewAttributes) -> Self {
1277        let mut me = Self::default();
1278        for kv in n.attributes {
1279            me.add_kv(kv);
1280        }
1281        me
1282    }
1283}
1284
1285#[derive(Debug, derive_more::Constructor)]
1286pub(crate) struct PrefixedMetricsMeter<CM> {
1287    prefix: String,
1288    meter: CM,
1289}
1290impl<CM: CoreMeter> CoreMeter for PrefixedMetricsMeter<CM> {
1291    fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes {
1292        self.meter.new_attributes(attribs)
1293    }
1294
1295    fn extend_attributes(
1296        &self,
1297        existing: MetricAttributes,
1298        attribs: NewAttributes,
1299    ) -> MetricAttributes {
1300        self.meter.extend_attributes(existing, attribs)
1301    }
1302
1303    fn counter(&self, mut params: MetricParameters) -> Counter {
1304        params.name = (self.prefix.clone() + &*params.name).into();
1305        self.meter.counter(params)
1306    }
1307
1308    fn histogram(&self, mut params: MetricParameters) -> Histogram {
1309        params.name = (self.prefix.clone() + &*params.name).into();
1310        self.meter.histogram(params)
1311    }
1312
1313    fn histogram_f64(&self, mut params: MetricParameters) -> HistogramF64 {
1314        params.name = (self.prefix.clone() + &*params.name).into();
1315        self.meter.histogram_f64(params)
1316    }
1317
1318    fn histogram_duration(&self, mut params: MetricParameters) -> HistogramDuration {
1319        params.name = (self.prefix.clone() + &*params.name).into();
1320        self.meter.histogram_duration(params)
1321    }
1322
1323    fn gauge(&self, mut params: MetricParameters) -> Gauge {
1324        params.name = (self.prefix.clone() + &*params.name).into();
1325        self.meter.gauge(params)
1326    }
1327
1328    fn gauge_f64(&self, mut params: MetricParameters) -> GaugeF64 {
1329        params.name = (self.prefix.clone() + &*params.name).into();
1330        self.meter.gauge_f64(params)
1331    }
1332}