Skip to main content

temporalio_sdk_core/telemetry/
metrics.rs

1#[cfg(test)]
2use crate::TelemetryInstance;
3use crate::abstractions::dbg_panic;
4
5use std::{
6    collections::HashMap,
7    fmt::{Debug, Display},
8    iter::Iterator,
9    sync::{Arc, atomic::AtomicU64},
10    time::Duration,
11};
12use temporalio_common::{
13    protos::temporal::api::{enums::v1::WorkflowTaskFailedCause, failure::v1::Failure},
14    telemetry::metrics::{core::*, *},
15};
16
17pub(super) const NUM_POLLERS_NAME: &str = "num_pollers";
18pub(super) const TASK_SLOTS_AVAILABLE_NAME: &str = "worker_task_slots_available";
19pub(super) const TASK_SLOTS_USED_NAME: &str = "worker_task_slots_used";
20pub(super) const STICKY_CACHE_SIZE_NAME: &str = "sticky_cache_size";
21
22/// Used to track context associated with metrics, and record/update them
23#[derive(Clone)]
24pub(crate) struct MetricsContext {
25    meter: TemporalMeter,
26    instruments: Arc<Instruments>,
27    in_memory_metrics: Option<Arc<WorkerHeartbeatMetrics>>,
28}
29
30#[derive(Clone)]
31struct Instruments {
32    wf_completed_counter: Counter,
33    wf_canceled_counter: Counter,
34    wf_failed_counter: Counter,
35    wf_cont_counter: Counter,
36    wf_e2e_latency: HistogramDuration,
37    wf_task_queue_poll_empty_counter: Counter,
38    wf_task_queue_poll_succeed_counter: Counter,
39    wf_task_execution_failure_counter: Counter,
40    wf_task_sched_to_start_latency: HistogramDuration,
41    wf_task_replay_latency: HistogramDuration,
42    wf_task_execution_latency: HistogramDuration,
43    act_poll_no_task: Counter,
44    act_task_received_counter: Counter,
45    act_execution_failed: Counter,
46    act_sched_to_start_latency: HistogramDuration,
47    act_exec_latency: HistogramDuration,
48    act_exec_succeeded_latency: HistogramDuration,
49    la_execution_cancelled: Counter,
50    la_execution_failed: Counter,
51    la_exec_latency: HistogramDuration,
52    la_exec_succeeded_latency: HistogramDuration,
53    la_total: Counter,
54    nexus_poll_no_task: Counter,
55    nexus_task_schedule_to_start_latency: HistogramDuration,
56    nexus_task_e2e_latency: HistogramDuration,
57    nexus_task_execution_latency: HistogramDuration,
58    nexus_task_execution_failed: Counter,
59    worker_registered: Counter,
60    num_pollers: Gauge,
61    task_slots_available: Gauge,
62    task_slots_used: Gauge,
63    sticky_cache_hit: Counter,
64    sticky_cache_miss: Counter,
65    sticky_cache_size: Gauge,
66    sticky_cache_forced_evictions: Counter,
67}
68
69impl MetricsContext {
70    pub(crate) fn no_op() -> Self {
71        let meter = TemporalMeter::no_op();
72        let in_memory_metrics = Some(Arc::new(WorkerHeartbeatMetrics::default()));
73        let instruments = Arc::new(Instruments::new(&meter, in_memory_metrics.clone()));
74        Self {
75            instruments,
76            meter,
77            in_memory_metrics,
78        }
79    }
80
81    #[cfg(test)]
82    pub(crate) fn top_level(namespace: String, tq: String, telemetry: &TelemetryInstance) -> Self {
83        MetricsContext::top_level_with_meter(namespace, tq, telemetry.get_temporal_metric_meter())
84    }
85
86    pub(crate) fn top_level_with_meter(
87        namespace: String,
88        tq: String,
89        temporal_meter: Option<TemporalMeter>,
90    ) -> Self {
91        if let Some(mut meter) = temporal_meter {
92            let addtl_attributes = [
93                MetricKeyValue::new(KEY_NAMESPACE, namespace),
94                task_queue(tq),
95            ];
96            meter.merge_attributes(addtl_attributes.into());
97            let in_memory_metrics = Some(Arc::new(WorkerHeartbeatMetrics::default()));
98            let mut instruments = Instruments::new(&meter, in_memory_metrics.clone());
99            instruments.update_attributes(meter.get_default_attributes());
100            Self {
101                instruments: Arc::new(instruments),
102                meter,
103                in_memory_metrics,
104            }
105        } else {
106            Self::no_op()
107        }
108    }
109
110    /// Extend an existing metrics context with new attributes
111    pub(crate) fn with_new_attrs(
112        &self,
113        new_attrs: impl IntoIterator<Item = MetricKeyValue>,
114    ) -> Self {
115        let mut tm = self.meter.clone();
116        tm.merge_attributes(new_attrs.into());
117        let mut instruments = (*self.instruments).clone();
118        instruments.update_attributes(tm.get_default_attributes());
119        Self {
120            instruments: Arc::new(instruments),
121            meter: self.meter.clone(),
122            in_memory_metrics: self.in_memory_metrics.clone(),
123        }
124    }
125
126    pub(crate) fn in_memory_meter(&self) -> Option<Arc<WorkerHeartbeatMetrics>> {
127        self.in_memory_metrics.clone()
128    }
129
130    /// A workflow task queue poll succeeded
131    pub(crate) fn wf_tq_poll_ok(&self) {
132        self.instruments.wf_task_queue_poll_succeed_counter.adds(1);
133    }
134
135    /// A workflow task queue poll timed out / had empty response
136    pub(crate) fn wf_tq_poll_empty(&self) {
137        self.instruments.wf_task_queue_poll_empty_counter.adds(1);
138    }
139
140    /// A workflow task execution failed
141    pub(crate) fn wf_task_failed(&self) {
142        self.instruments.wf_task_execution_failure_counter.adds(1);
143    }
144
145    /// A workflow completed successfully
146    pub(crate) fn wf_completed(&self) {
147        self.instruments.wf_completed_counter.adds(1);
148    }
149
150    /// A workflow ended cancelled
151    pub(crate) fn wf_canceled(&self) {
152        self.instruments.wf_canceled_counter.adds(1);
153    }
154
155    /// A workflow ended failed
156    pub(crate) fn wf_failed(&self) {
157        self.instruments.wf_failed_counter.adds(1);
158    }
159
160    /// A workflow continued as new
161    pub(crate) fn wf_continued_as_new(&self) {
162        self.instruments.wf_cont_counter.adds(1);
163    }
164
165    /// Record workflow total execution time in milliseconds
166    pub(crate) fn wf_e2e_latency(&self, dur: Duration) {
167        self.instruments.wf_e2e_latency.records(dur);
168    }
169
170    /// Record workflow task schedule to start time in millis
171    pub(crate) fn wf_task_sched_to_start_latency(&self, dur: Duration) {
172        self.instruments.wf_task_sched_to_start_latency.records(dur);
173    }
174
175    /// Record workflow task execution time in milliseconds
176    pub(crate) fn wf_task_latency(&self, dur: Duration) {
177        self.instruments.wf_task_execution_latency.records(dur);
178    }
179
180    /// Record time it takes to catch up on replaying a WFT
181    pub(crate) fn wf_task_replay_latency(&self, dur: Duration) {
182        self.instruments.wf_task_replay_latency.records(dur);
183    }
184
185    /// An activity long poll timed out
186    pub(crate) fn act_poll_timeout(&self) {
187        self.instruments.act_poll_no_task.adds(1);
188    }
189
190    /// A count of activity tasks received
191    pub(crate) fn act_task_received(&self) {
192        self.instruments.act_task_received_counter.adds(1);
193    }
194
195    /// An activity execution failed
196    pub(crate) fn act_execution_failed(&self) {
197        self.instruments.act_execution_failed.adds(1);
198    }
199
200    /// Record end-to-end (sched-to-complete) time for successful activity executions
201    pub(crate) fn act_execution_succeeded(&self, dur: Duration) {
202        self.instruments.act_exec_succeeded_latency.records(dur);
203    }
204
205    /// Record activity task schedule to start time in millis
206    pub(crate) fn act_sched_to_start_latency(&self, dur: Duration) {
207        self.instruments.act_sched_to_start_latency.records(dur);
208    }
209
210    /// Record time it took to complete activity execution, from the time core generated the
211    /// activity task, to the time lang responded with a completion (failure or success).
212    pub(crate) fn act_execution_latency(&self, dur: Duration) {
213        self.instruments.act_exec_latency.records(dur);
214    }
215
216    pub(crate) fn la_execution_cancelled(&self) {
217        self.instruments.la_execution_cancelled.adds(1);
218    }
219
220    pub(crate) fn la_execution_failed(&self) {
221        self.instruments.la_execution_failed.adds(1);
222    }
223
224    pub(crate) fn la_exec_latency(&self, dur: Duration) {
225        self.instruments.la_exec_latency.records(dur);
226    }
227
228    pub(crate) fn la_exec_succeeded_latency(&self, dur: Duration) {
229        self.instruments.la_exec_succeeded_latency.records(dur);
230    }
231
232    pub(crate) fn la_executed(&self) {
233        self.instruments.la_total.adds(1);
234    }
235
236    /// A nexus long poll timed out
237    pub(crate) fn nexus_poll_timeout(&self) {
238        self.instruments.nexus_poll_no_task.adds(1);
239    }
240
241    /// Record nexus task schedule to start time
242    pub(crate) fn nexus_task_sched_to_start_latency(&self, dur: Duration) {
243        self.instruments
244            .nexus_task_schedule_to_start_latency
245            .records(dur);
246    }
247
248    /// Record nexus task end-to-end time
249    pub(crate) fn nexus_task_e2e_latency(&self, dur: Duration) {
250        self.instruments.nexus_task_e2e_latency.records(dur);
251    }
252
253    /// Record nexus task execution time
254    pub(crate) fn nexus_task_execution_latency(&self, dur: Duration) {
255        self.instruments.nexus_task_execution_latency.records(dur);
256    }
257
258    /// Record a nexus task execution failure
259    pub(crate) fn nexus_task_execution_failed(&self) {
260        self.instruments.nexus_task_execution_failed.adds(1);
261    }
262
263    /// A worker was registered
264    pub(crate) fn worker_registered(&self) {
265        self.instruments.worker_registered.adds(1);
266    }
267
268    /// Record current number of available task slots. Context should have worker type set.
269    pub(crate) fn available_task_slots(&self, num: usize) {
270        self.instruments.task_slots_available.records(num as u64)
271    }
272
273    /// Record current number of used task slots. Context should have worker type set.
274    pub(crate) fn task_slots_used(&self, num: u64) {
275        self.instruments.task_slots_used.records(num)
276    }
277
278    /// Record current number of pollers. Context should include poller type / task queue tag.
279    pub(crate) fn record_num_pollers(&self, num: usize) {
280        self.instruments.num_pollers.records(num as u64);
281    }
282
283    /// A workflow task found a cached workflow to run against
284    pub(crate) fn sticky_cache_hit(&self) {
285        self.instruments.sticky_cache_hit.adds(1);
286    }
287
288    /// A workflow task did not find a cached workflow
289    pub(crate) fn sticky_cache_miss(&self) {
290        self.instruments.sticky_cache_miss.adds(1);
291    }
292
293    /// Record current cache size (in number of wfs, not bytes)
294    pub(crate) fn cache_size(&self, size: u64) {
295        self.instruments.sticky_cache_size.records(size);
296    }
297
298    /// Count a workflow being evicted from the cache
299    pub(crate) fn forced_cache_eviction(&self) {
300        self.instruments.sticky_cache_forced_evictions.adds(1);
301    }
302}
303
304impl Instruments {
305    fn new(meter: &TemporalMeter, in_memory: Option<Arc<WorkerHeartbeatMetrics>>) -> Self {
306        let counter_with_in_mem = |params: MetricParameters| -> Counter {
307            in_memory
308                .clone()
309                .and_then(|in_mem| in_mem.get_metric(&params.name))
310                .map(|metric| meter.counter_with_in_memory(params.clone(), metric))
311                .unwrap_or_else(|| meter.counter(params))
312        };
313
314        let gauge_with_in_mem = |params: MetricParameters| -> Gauge {
315            in_memory
316                .clone()
317                .and_then(|in_mem| in_mem.get_metric(&params.name))
318                .map(|metric| meter.gauge_with_in_memory(params.clone(), metric))
319                .unwrap_or_else(|| meter.gauge(params))
320        };
321
322        let histogram_with_in_mem = |params: MetricParameters| -> HistogramDuration {
323            in_memory
324                .clone()
325                .and_then(|in_mem| in_mem.get_metric(&params.name))
326                .map(|metric| meter.histogram_duration_with_in_memory(params.clone(), metric))
327                .unwrap_or_else(|| meter.histogram_duration(params))
328        };
329
330        Self {
331            wf_completed_counter: meter.counter(MetricParameters {
332                name: "workflow_completed".into(),
333                description: "Count of successfully completed workflows".into(),
334                unit: "".into(),
335            }),
336            wf_canceled_counter: meter.counter(MetricParameters {
337                name: "workflow_canceled".into(),
338                description: "Count of canceled workflows".into(),
339                unit: "".into(),
340            }),
341            wf_failed_counter: meter.counter(MetricParameters {
342                name: "workflow_failed".into(),
343                description: "Count of failed workflows".into(),
344                unit: "".into(),
345            }),
346            wf_cont_counter: meter.counter(MetricParameters {
347                name: "workflow_continue_as_new".into(),
348                description: "Count of continued-as-new workflows".into(),
349                unit: "".into(),
350            }),
351            wf_e2e_latency: meter.histogram_duration(MetricParameters {
352                name: WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME.into(),
353                unit: "duration".into(),
354                description: "Histogram of total workflow execution latencies".into(),
355            }),
356            wf_task_queue_poll_empty_counter: meter.counter(MetricParameters {
357                name: "workflow_task_queue_poll_empty".into(),
358                description: "Count of workflow task queue poll timeouts (no new task)".into(),
359                unit: "".into(),
360            }),
361            wf_task_queue_poll_succeed_counter: counter_with_in_mem(MetricParameters {
362                name: "workflow_task_queue_poll_succeed".into(),
363                description: "Count of workflow task queue poll successes".into(),
364                unit: "".into(),
365            }),
366            wf_task_execution_failure_counter: counter_with_in_mem(MetricParameters {
367                name: "workflow_task_execution_failed".into(),
368                description: "Count of workflow task execution failures".into(),
369                unit: "".into(),
370            }),
371            wf_task_sched_to_start_latency: meter.histogram_duration(MetricParameters {
372                name: WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME.into(),
373                unit: "duration".into(),
374                description: "Histogram of workflow task schedule-to-start latencies".into(),
375            }),
376            wf_task_replay_latency: meter.histogram_duration(MetricParameters {
377                name: WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME.into(),
378                unit: "duration".into(),
379                description: "Histogram of workflow task replay latencies".into(),
380            }),
381            wf_task_execution_latency: histogram_with_in_mem(MetricParameters {
382                name: WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME.into(),
383                unit: "duration".into(),
384                description: "Histogram of workflow task execution (not replay) latencies".into(),
385            }),
386            act_poll_no_task: meter.counter(MetricParameters {
387                name: "activity_poll_no_task".into(),
388                description: "Count of activity task queue poll timeouts (no new task)".into(),
389                unit: "".into(),
390            }),
391            act_task_received_counter: counter_with_in_mem(MetricParameters {
392                name: "activity_task_received".into(),
393                description: "Count of activity task queue poll successes".into(),
394                unit: "".into(),
395            }),
396            act_execution_failed: counter_with_in_mem(MetricParameters {
397                name: "activity_execution_failed".into(),
398                description: "Count of activity task execution failures".into(),
399                unit: "".into(),
400            }),
401            act_sched_to_start_latency: meter.histogram_duration(MetricParameters {
402                name: ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME.into(),
403                unit: "duration".into(),
404                description: "Histogram of activity schedule-to-start latencies".into(),
405            }),
406            act_exec_latency: histogram_with_in_mem(MetricParameters {
407                name: ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME.into(),
408                unit: "duration".into(),
409                description: "Histogram of activity execution latencies".into(),
410            }),
411            act_exec_succeeded_latency: meter.histogram_duration(MetricParameters {
412                name: "activity_succeed_endtoend_latency".into(),
413                unit: "duration".into(),
414                description: "Histogram of activity execution latencies for successful activities"
415                    .into(),
416            }),
417            la_execution_cancelled: meter.counter(MetricParameters {
418                name: "local_activity_execution_cancelled".into(),
419                description: "Count of local activity executions that were cancelled".into(),
420                unit: "".into(),
421            }),
422            la_execution_failed: meter.counter(MetricParameters {
423                name: "local_activity_execution_failed".into(),
424                description: "Count of local activity executions that failed".into(),
425                unit: "".into(),
426            }),
427            la_exec_latency: histogram_with_in_mem(MetricParameters {
428                name: "local_activity_execution_latency".into(),
429                unit: "duration".into(),
430                description: "Histogram of local activity execution latencies".into(),
431            }),
432            la_exec_succeeded_latency: meter.histogram_duration(MetricParameters {
433                name: "local_activity_succeed_endtoend_latency".into(),
434                unit: "duration".into(),
435                description:
436                    "Histogram of local activity execution latencies for successful local activities"
437                        .into(),
438            }),
439            la_total: counter_with_in_mem(MetricParameters {
440                name: "local_activity_total".into(),
441                description: "Count of local activities executed".into(),
442                unit: "".into(),
443            }),
444            nexus_poll_no_task: meter.counter(MetricParameters {
445                name: "nexus_poll_no_task".into(),
446                description: "Count of nexus task queue poll timeouts (no new task)".into(),
447                unit: "".into(),
448            }),
449            nexus_task_schedule_to_start_latency: meter.histogram_duration(MetricParameters {
450                name: "nexus_task_schedule_to_start_latency".into(),
451                unit: "duration".into(),
452                description: "Histogram of nexus task schedule-to-start latencies".into(),
453            }),
454            nexus_task_e2e_latency: meter.histogram_duration(MetricParameters {
455                name: "nexus_task_endtoend_latency".into(),
456                unit: "duration".into(),
457                description: "Histogram of nexus task end-to-end latencies".into(),
458            }),
459            nexus_task_execution_latency: histogram_with_in_mem(MetricParameters {
460                name: "nexus_task_execution_latency".into(),
461                unit: "duration".into(),
462                description: "Histogram of nexus task execution latencies".into(),
463            }),
464            nexus_task_execution_failed: counter_with_in_mem(MetricParameters {
465                name: "nexus_task_execution_failed".into(),
466                description: "Count of nexus task execution failures".into(),
467                unit: "".into(),
468            }),
469            // name kept as worker start for compat with old sdk / what users expect
470            worker_registered: meter.counter(MetricParameters {
471                name: "worker_start".into(),
472                description: "Count of the number of initialized workers".into(),
473                unit: "".into(),
474            }),
475            num_pollers: gauge_with_in_mem(MetricParameters {
476                name: NUM_POLLERS_NAME.into(),
477                description: "Current number of active pollers per queue type".into(),
478                unit: "".into(),
479            }),
480            task_slots_available: gauge_with_in_mem(MetricParameters {
481                name: TASK_SLOTS_AVAILABLE_NAME.into(),
482                description: "Current number of available slots per task type".into(),
483                unit: "".into(),
484            }),
485            task_slots_used: gauge_with_in_mem(MetricParameters {
486                name: TASK_SLOTS_USED_NAME.into(),
487                description: "Current number of used slots per task type".into(),
488                unit: "".into(),
489            }),
490            sticky_cache_hit: counter_with_in_mem(MetricParameters {
491                name: "sticky_cache_hit".into(),
492                description: "Count of times the workflow cache was used for a new workflow task"
493                    .into(),
494                unit: "".into(),
495            }),
496            sticky_cache_miss: counter_with_in_mem(MetricParameters {
497                name: "sticky_cache_miss".into(),
498                description:
499                "Count of times the workflow cache was missing a workflow for a sticky task".into(),
500                unit: "".into(),
501            }),
502            sticky_cache_size: gauge_with_in_mem(MetricParameters {
503                name: STICKY_CACHE_SIZE_NAME.into(),
504                description: "Current number of cached workflows".into(),
505                unit: "".into(),
506            }),
507            sticky_cache_forced_evictions: meter.counter(MetricParameters {
508                name: "sticky_cache_total_forced_eviction".into(),
509                description: "Count of evictions of cached workflows".into(),
510                unit: "".into(),
511            }),
512        }
513    }
514
515    fn update_attributes(&mut self, new_attributes: &MetricAttributes) {
516        self.wf_completed_counter
517            .update_attributes(new_attributes.clone());
518        self.wf_canceled_counter
519            .update_attributes(new_attributes.clone());
520        self.wf_failed_counter
521            .update_attributes(new_attributes.clone());
522        self.wf_cont_counter
523            .update_attributes(new_attributes.clone());
524        self.wf_e2e_latency
525            .update_attributes(new_attributes.clone());
526        self.wf_task_queue_poll_empty_counter
527            .update_attributes(new_attributes.clone());
528        self.wf_task_queue_poll_succeed_counter
529            .update_attributes(new_attributes.clone());
530        self.wf_task_execution_failure_counter
531            .update_attributes(new_attributes.clone());
532        self.wf_task_sched_to_start_latency
533            .update_attributes(new_attributes.clone());
534        self.wf_task_replay_latency
535            .update_attributes(new_attributes.clone());
536        self.wf_task_execution_latency
537            .update_attributes(new_attributes.clone());
538        self.act_poll_no_task
539            .update_attributes(new_attributes.clone());
540        self.act_task_received_counter
541            .update_attributes(new_attributes.clone());
542        self.act_execution_failed
543            .update_attributes(new_attributes.clone());
544        self.act_sched_to_start_latency
545            .update_attributes(new_attributes.clone());
546        self.act_exec_latency
547            .update_attributes(new_attributes.clone());
548        self.act_exec_succeeded_latency
549            .update_attributes(new_attributes.clone());
550        self.la_execution_cancelled
551            .update_attributes(new_attributes.clone());
552        self.la_execution_failed
553            .update_attributes(new_attributes.clone());
554        self.la_exec_latency
555            .update_attributes(new_attributes.clone());
556        self.la_exec_succeeded_latency
557            .update_attributes(new_attributes.clone());
558        self.la_total.update_attributes(new_attributes.clone());
559        self.nexus_poll_no_task
560            .update_attributes(new_attributes.clone());
561        self.nexus_task_schedule_to_start_latency
562            .update_attributes(new_attributes.clone());
563        self.nexus_task_e2e_latency
564            .update_attributes(new_attributes.clone());
565        self.nexus_task_execution_latency
566            .update_attributes(new_attributes.clone());
567        self.nexus_task_execution_failed
568            .update_attributes(new_attributes.clone());
569        self.worker_registered
570            .update_attributes(new_attributes.clone());
571        self.num_pollers.update_attributes(new_attributes.clone());
572        self.task_slots_available
573            .update_attributes(new_attributes.clone());
574        self.task_slots_used
575            .update_attributes(new_attributes.clone());
576        self.sticky_cache_hit
577            .update_attributes(new_attributes.clone());
578        self.sticky_cache_miss
579            .update_attributes(new_attributes.clone());
580        self.sticky_cache_size
581            .update_attributes(new_attributes.clone());
582        self.sticky_cache_forced_evictions
583            .update_attributes(new_attributes.clone());
584    }
585}
586
/// In-memory storage for the `num_pollers` gauge, one atomic slot per poller type.
#[derive(Debug, Default)]
pub(crate) struct NumPollersMetric {
    /// Slot for the `workflow_task` poller type.
    pub wft_current_pollers: Arc<AtomicU64>,
    /// Slot for the `sticky_workflow_task` poller type.
    pub sticky_wft_current_pollers: Arc<AtomicU64>,
    /// Slot for the `activity_task` poller type.
    pub activity_current_pollers: Arc<AtomicU64>,
    /// Slot for the `nexus_task` poller type.
    pub nexus_current_pollers: Arc<AtomicU64>,
}

impl NumPollersMetric {
    /// Map each poller-type label value to its atomic slot.
    ///
    /// The returned `Arc`s alias this struct's fields, so writes through the map are visible here.
    pub(crate) fn as_map(&self) -> HashMap<String, Arc<AtomicU64>> {
        let entries = [
            ("workflow_task", &self.wft_current_pollers),
            ("sticky_workflow_task", &self.sticky_wft_current_pollers),
            ("activity_task", &self.activity_current_pollers),
            ("nexus_task", &self.nexus_current_pollers),
        ];
        entries
            .into_iter()
            .map(|(label, slot)| (label.to_owned(), Arc::clone(slot)))
            .collect()
    }
}
614
/// In-memory storage for a per-worker-type slot gauge, one atomic slot per worker type.
#[derive(Debug, Default)]
pub(crate) struct SlotMetrics {
    /// Slot for the `WorkflowWorker` worker type.
    pub workflow_worker: Arc<AtomicU64>,
    /// Slot for the `ActivityWorker` worker type.
    pub activity_worker: Arc<AtomicU64>,
    /// Slot for the `NexusWorker` worker type.
    pub nexus_worker: Arc<AtomicU64>,
    /// Slot for the `LocalActivityWorker` worker type.
    pub local_activity_worker: Arc<AtomicU64>,
}

impl SlotMetrics {
    /// Map each worker-type label value to its atomic slot.
    ///
    /// The returned `Arc`s alias this struct's fields, so writes through the map are visible here.
    pub(crate) fn as_map(&self) -> HashMap<String, Arc<AtomicU64>> {
        let entries = [
            ("WorkflowWorker", &self.workflow_worker),
            ("ActivityWorker", &self.activity_worker),
            ("NexusWorker", &self.nexus_worker),
            ("LocalActivityWorker", &self.local_activity_worker),
        ];
        entries
            .into_iter()
            .map(|(label, slot)| (label.to_owned(), Arc::clone(slot)))
            .collect()
    }
}
636
637#[derive(Default, Debug)]
638pub(crate) struct WorkerHeartbeatMetrics {
639    pub sticky_cache_size: Arc<AtomicU64>,
640    pub total_sticky_cache_hit: Arc<AtomicU64>,
641    pub total_sticky_cache_miss: Arc<AtomicU64>,
642    pub num_pollers: NumPollersMetric,
643    pub worker_task_slots_used: SlotMetrics,
644    pub worker_task_slots_available: SlotMetrics,
645    pub workflow_task_execution_failed: Arc<AtomicU64>,
646    pub activity_execution_failed: Arc<AtomicU64>,
647    pub nexus_task_execution_failed: Arc<AtomicU64>,
648    pub local_activity_execution_failed: Arc<AtomicU64>,
649    // Although latency metrics here are histograms, we are using the number of times they're called
650    // to represent the `total_processed_tasks` heartbeat field
651    pub activity_execution_latency: Arc<AtomicU64>,
652    pub local_activity_execution_latency: Arc<AtomicU64>,
653    pub workflow_task_execution_latency: Arc<AtomicU64>,
654    pub nexus_task_execution_latency: Arc<AtomicU64>,
655}
656
657impl WorkerHeartbeatMetrics {
658    pub(crate) fn get_metric(&self, name: &str) -> Option<HeartbeatMetricType> {
659        match name {
660            "sticky_cache_size" => Some(HeartbeatMetricType::Individual(
661                self.sticky_cache_size.clone(),
662            )),
663            "sticky_cache_hit" => Some(HeartbeatMetricType::Individual(
664                self.total_sticky_cache_hit.clone(),
665            )),
666            "sticky_cache_miss" => Some(HeartbeatMetricType::Individual(
667                self.total_sticky_cache_miss.clone(),
668            )),
669            "num_pollers" => Some(HeartbeatMetricType::WithLabel {
670                label_key: "poller_type".to_string(),
671                metrics: self.num_pollers.as_map(),
672            }),
673            "worker_task_slots_used" => Some(HeartbeatMetricType::WithLabel {
674                label_key: "worker_type".to_string(),
675                metrics: self.worker_task_slots_used.as_map(),
676            }),
677            "worker_task_slots_available" => Some(HeartbeatMetricType::WithLabel {
678                label_key: "worker_type".to_string(),
679                metrics: self.worker_task_slots_available.as_map(),
680            }),
681            "workflow_task_execution_failed" => Some(HeartbeatMetricType::Individual(
682                self.workflow_task_execution_failed.clone(),
683            )),
684            "activity_execution_failed" => Some(HeartbeatMetricType::Individual(
685                self.activity_execution_failed.clone(),
686            )),
687            "nexus_task_execution_failed" => Some(HeartbeatMetricType::Individual(
688                self.nexus_task_execution_failed.clone(),
689            )),
690            "local_activity_execution_failed" => Some(HeartbeatMetricType::Individual(
691                self.local_activity_execution_failed.clone(),
692            )),
693            "activity_execution_latency" => Some(HeartbeatMetricType::Individual(
694                self.activity_execution_latency.clone(),
695            )),
696            "local_activity_execution_latency" => Some(HeartbeatMetricType::Individual(
697                self.local_activity_execution_latency.clone(),
698            )),
699            "workflow_task_execution_latency" => Some(HeartbeatMetricType::Individual(
700                self.workflow_task_execution_latency.clone(),
701            )),
702            "nexus_task_execution_latency" => Some(HeartbeatMetricType::Individual(
703                self.nexus_task_execution_latency.clone(),
704            )),
705            _ => None,
706        }
707    }
708}
709
// Attribute keys attached to emitted metrics.

/// Namespace the worker belongs to.
const KEY_NAMESPACE: &str = "namespace";
/// Task queue being served.
const KEY_TASK_QUEUE: &str = "task_queue";
/// Workflow type name.
const KEY_WF_TYPE: &str = "workflow_type";
/// Activity type name.
const KEY_ACT_TYPE: &str = "activity_type";
/// Kind of poller (e.g. `workflow_task`, `activity_task`).
const KEY_POLLER_TYPE: &str = "poller_type";
/// Kind of worker (e.g. `WorkflowWorker`, `ActivityWorker`).
const KEY_WORKER_TYPE: &str = "worker_type";
/// Whether the task was dispatched eagerly.
const KEY_EAGER: &str = "eager";
/// Reason a task failed.
const KEY_TASK_FAILURE_TYPE: &str = "failure_reason";
718
719pub(crate) fn workflow_poller() -> MetricKeyValue {
720    MetricKeyValue::new(KEY_POLLER_TYPE, "workflow_task")
721}
722pub(crate) fn workflow_sticky_poller() -> MetricKeyValue {
723    MetricKeyValue::new(KEY_POLLER_TYPE, "sticky_workflow_task")
724}
725pub(crate) fn activity_poller() -> MetricKeyValue {
726    MetricKeyValue::new(KEY_POLLER_TYPE, "activity_task")
727}
728pub(crate) fn nexus_poller() -> MetricKeyValue {
729    MetricKeyValue::new(KEY_POLLER_TYPE, "nexus_task")
730}
731pub(crate) fn task_queue(tq: String) -> MetricKeyValue {
732    MetricKeyValue::new(KEY_TASK_QUEUE, tq)
733}
734pub(crate) fn activity_type(ty: String) -> MetricKeyValue {
735    MetricKeyValue::new(KEY_ACT_TYPE, ty)
736}
737pub(crate) fn workflow_type(ty: String) -> MetricKeyValue {
738    MetricKeyValue::new(KEY_WF_TYPE, ty)
739}
740pub(crate) fn workflow_worker_type() -> MetricKeyValue {
741    MetricKeyValue::new(KEY_WORKER_TYPE, "WorkflowWorker")
742}
743pub(crate) fn activity_worker_type() -> MetricKeyValue {
744    MetricKeyValue::new(KEY_WORKER_TYPE, "ActivityWorker")
745}
746pub(crate) fn local_activity_worker_type() -> MetricKeyValue {
747    MetricKeyValue::new(KEY_WORKER_TYPE, "LocalActivityWorker")
748}
749pub(crate) fn nexus_worker_type() -> MetricKeyValue {
750    MetricKeyValue::new(KEY_WORKER_TYPE, "NexusWorker")
751}
752pub(crate) fn eager(is_eager: bool) -> MetricKeyValue {
753    MetricKeyValue::new(KEY_EAGER, is_eager)
754}
/// Why a task failed, rendered (via [Display]) as the value of the `failure_reason` label.
pub(crate) enum FailureReason {
    Nondeterminism,
    Workflow,
    Timeout,
    /// Nexus operation failure; rendered as `operation_<payload>`.
    NexusOperation(String),
    /// Nexus handler error; rendered as `handler_error_<payload>`.
    NexusHandlerError(String),
    GrpcMessageTooLarge,
}
impl Display for FailureReason {
    /// Writes the label value straight into the formatter.
    ///
    /// Fixed variants are written as-is. Only the Nexus variants need interpolation, so no
    /// intermediate `String` is allocated. The values (including their mixed casing) are
    /// emitted verbatim as metric labels, so they must not be normalized.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            FailureReason::Nondeterminism => f.write_str("NonDeterminismError"),
            FailureReason::Workflow => f.write_str("WorkflowError"),
            FailureReason::Timeout => f.write_str("timeout"),
            FailureReason::NexusOperation(op) => write!(f, "operation_{op}"),
            FailureReason::NexusHandlerError(op) => write!(f, "handler_error_{op}"),
            FailureReason::GrpcMessageTooLarge => f.write_str("GrpcMessageTooLarge"),
        }
    }
}
776impl From<WorkflowTaskFailedCause> for FailureReason {
777    fn from(v: WorkflowTaskFailedCause) -> Self {
778        match v {
779            WorkflowTaskFailedCause::NonDeterministicError => FailureReason::Nondeterminism,
780            _ => FailureReason::Workflow,
781        }
782    }
783}
784pub(crate) fn failure_reason(reason: FailureReason) -> MetricKeyValue {
785    MetricKeyValue::new(KEY_TASK_FAILURE_TYPE, reason.to_string())
786}
787
788/// Track a failure metric if the failure is not a benign application failure.
789pub(crate) fn should_record_failure_metric(failure: &Option<Failure>) -> bool {
790    !failure
791        .as_ref()
792        .is_some_and(|f| f.is_benign_application_failure())
793}
794
795/// Buffers [MetricEvent]s for periodic consumption by lang
796#[derive(Debug)]
797pub struct MetricsCallBuffer<I>
798where
799    I: BufferInstrumentRef,
800{
801    calls_rx: crossbeam_channel::Receiver<MetricEvent<I>>,
802    calls_tx: LogErrOnFullSender<MetricEvent<I>>,
803}
804#[derive(Clone, Debug)]
805struct LogErrOnFullSender<I>(crossbeam_channel::Sender<I>);
806impl<I> LogErrOnFullSender<I> {
807    fn send(&self, v: I) {
808        if let Err(crossbeam_channel::TrySendError::Full(_)) = self.0.try_send(v) {
809            error!(
810                "Core's metrics buffer is full! Dropping call to record metrics. \
811                 Make sure you drain the metric buffer often!"
812            );
813        }
814    }
815}
816
817impl<I> MetricsCallBuffer<I>
818where
819    I: Clone + BufferInstrumentRef,
820{
821    /// Create a new buffer with the given capacity
822    pub fn new(buffer_size: usize) -> Self {
823        let (calls_tx, calls_rx) = crossbeam_channel::bounded(buffer_size);
824        MetricsCallBuffer {
825            calls_rx,
826            calls_tx: LogErrOnFullSender(calls_tx),
827        }
828    }
829    fn new_instrument(&self, params: MetricParameters, kind: MetricKind) -> BufferInstrument<I> {
830        let hole = LazyBufferInstrument::hole();
831        self.calls_tx.send(MetricEvent::Create {
832            params,
833            kind,
834            populate_into: hole.clone(),
835        });
836        BufferInstrument {
837            instrument_ref: hole,
838            tx: self.calls_tx.clone(),
839        }
840    }
841}
842
843impl<I> CoreMeter for MetricsCallBuffer<I>
844where
845    I: BufferInstrumentRef + Debug + Send + Sync + Clone + 'static,
846{
847    fn new_attributes(&self, opts: NewAttributes) -> MetricAttributes {
848        let ba = BufferAttributes::hole();
849        self.calls_tx.send(MetricEvent::CreateAttributes {
850            populate_into: ba.clone(),
851            append_from: None,
852            attributes: opts.attributes,
853        });
854        MetricAttributes::Buffer(ba)
855    }
856
857    fn extend_attributes(
858        &self,
859        existing: MetricAttributes,
860        attribs: NewAttributes,
861    ) -> MetricAttributes {
862        if let MetricAttributes::Buffer(ol) = existing {
863            let ba = BufferAttributes::hole();
864            self.calls_tx.send(MetricEvent::CreateAttributes {
865                populate_into: ba.clone(),
866                append_from: Some(ol),
867                attributes: attribs.attributes,
868            });
869            MetricAttributes::Buffer(ba)
870        } else {
871            dbg_panic!("Must use buffer attributes with a buffer metric implementation");
872            existing
873        }
874    }
875
876    fn counter(&self, params: MetricParameters) -> Counter {
877        Counter::new(Arc::new(self.new_instrument(params, MetricKind::Counter)))
878    }
879
880    fn histogram(&self, params: MetricParameters) -> Histogram {
881        Histogram::new(Arc::new(self.new_instrument(params, MetricKind::Histogram)))
882    }
883
884    fn histogram_f64(&self, params: MetricParameters) -> HistogramF64 {
885        HistogramF64::new(Arc::new(
886            self.new_instrument(params, MetricKind::HistogramF64),
887        ))
888    }
889
890    fn histogram_duration(&self, params: MetricParameters) -> HistogramDuration {
891        HistogramDuration::new(Arc::new(
892            self.new_instrument(params, MetricKind::HistogramDuration),
893        ))
894    }
895
896    fn gauge(&self, params: MetricParameters) -> Gauge {
897        Gauge::new(Arc::new(self.new_instrument(params, MetricKind::Gauge)))
898    }
899
900    fn gauge_f64(&self, params: MetricParameters) -> GaugeF64 {
901        GaugeF64::new(Arc::new(self.new_instrument(params, MetricKind::GaugeF64)))
902    }
903
904    fn up_down_counter(&self, params: MetricParameters) -> UpDownCounter {
905        UpDownCounter::new(Arc::new(
906            self.new_instrument(params, MetricKind::UpDownCounter),
907        ))
908    }
909}
910impl<I> MetricCallBufferer<I> for MetricsCallBuffer<I>
911where
912    I: Send + Sync + BufferInstrumentRef,
913{
914    fn retrieve(&self) -> Vec<MetricEvent<I>> {
915        self.calls_rx.try_iter().collect()
916    }
917}
918
919#[derive(Clone)]
920struct BufferInstrument<I: BufferInstrumentRef> {
921    instrument_ref: LazyBufferInstrument<I>,
922    tx: LogErrOnFullSender<MetricEvent<I>>,
923}
924impl<I> BufferInstrument<I>
925where
926    I: Clone + BufferInstrumentRef,
927{
928    fn send(&self, value: MetricUpdateVal, attributes: &MetricAttributes) {
929        let attributes = match attributes {
930            MetricAttributes::Buffer(l) => l.clone(),
931            e => panic!(
932                "MetricsCallBuffer only works with MetricAttributes::Buffer, but used: {:?}",
933                e
934            ),
935        };
936        self.tx.send(MetricEvent::Update {
937            instrument: self.instrument_ref.clone(),
938            update: value,
939            attributes: attributes.clone(),
940        });
941    }
942}
943
944#[derive(Clone)]
945struct InstrumentWithAttributes<I> {
946    inner: I,
947    attributes: MetricAttributes,
948}
949
950impl<I> MetricAttributable<Box<dyn CounterBase>> for BufferInstrument<I>
951where
952    I: BufferInstrumentRef + Send + Sync + Clone + 'static,
953{
954    fn with_attributes(
955        &self,
956        attributes: &MetricAttributes,
957    ) -> Result<Box<dyn CounterBase>, Box<dyn std::error::Error>> {
958        Ok(Box::new(InstrumentWithAttributes {
959            inner: self.clone(),
960            attributes: attributes.clone(),
961        }))
962    }
963}
964
965impl<I> MetricAttributable<Box<dyn HistogramBase>> for BufferInstrument<I>
966where
967    I: BufferInstrumentRef + Send + Sync + Clone + 'static,
968{
969    fn with_attributes(
970        &self,
971        attributes: &MetricAttributes,
972    ) -> Result<Box<dyn HistogramBase>, Box<dyn std::error::Error>> {
973        Ok(Box::new(InstrumentWithAttributes {
974            inner: self.clone(),
975            attributes: attributes.clone(),
976        }))
977    }
978}
979
980impl<I> MetricAttributable<Box<dyn HistogramF64Base>> for BufferInstrument<I>
981where
982    I: BufferInstrumentRef + Send + Sync + Clone + 'static,
983{
984    fn with_attributes(
985        &self,
986        attributes: &MetricAttributes,
987    ) -> Result<Box<dyn HistogramF64Base>, Box<dyn std::error::Error>> {
988        Ok(Box::new(InstrumentWithAttributes {
989            inner: self.clone(),
990            attributes: attributes.clone(),
991        }))
992    }
993}
994
995impl<I> MetricAttributable<Box<dyn HistogramDurationBase>> for BufferInstrument<I>
996where
997    I: BufferInstrumentRef + Send + Sync + Clone + 'static,
998{
999    fn with_attributes(
1000        &self,
1001        attributes: &MetricAttributes,
1002    ) -> Result<Box<dyn HistogramDurationBase>, Box<dyn std::error::Error>> {
1003        Ok(Box::new(InstrumentWithAttributes {
1004            inner: self.clone(),
1005            attributes: attributes.clone(),
1006        }))
1007    }
1008}
1009
1010impl<I> MetricAttributable<Box<dyn GaugeBase>> for BufferInstrument<I>
1011where
1012    I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1013{
1014    fn with_attributes(
1015        &self,
1016        attributes: &MetricAttributes,
1017    ) -> Result<Box<dyn GaugeBase>, Box<dyn std::error::Error>> {
1018        Ok(Box::new(InstrumentWithAttributes {
1019            inner: self.clone(),
1020            attributes: attributes.clone(),
1021        }))
1022    }
1023}
1024
1025impl<I> MetricAttributable<Box<dyn GaugeF64Base>> for BufferInstrument<I>
1026where
1027    I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1028{
1029    fn with_attributes(
1030        &self,
1031        attributes: &MetricAttributes,
1032    ) -> Result<Box<dyn GaugeF64Base>, Box<dyn std::error::Error>> {
1033        Ok(Box::new(InstrumentWithAttributes {
1034            inner: self.clone(),
1035            attributes: attributes.clone(),
1036        }))
1037    }
1038}
1039
1040impl<I> MetricAttributable<Box<dyn UpDownCounterBase>> for BufferInstrument<I>
1041where
1042    I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1043{
1044    fn with_attributes(
1045        &self,
1046        attributes: &MetricAttributes,
1047    ) -> Result<Box<dyn UpDownCounterBase>, Box<dyn std::error::Error>> {
1048        Ok(Box::new(InstrumentWithAttributes {
1049            inner: self.clone(),
1050            attributes: attributes.clone(),
1051        }))
1052    }
1053}
1054
1055impl<I> CounterBase for InstrumentWithAttributes<BufferInstrument<I>>
1056where
1057    I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1058{
1059    fn adds(&self, value: u64) {
1060        self.inner
1061            .send(MetricUpdateVal::Delta(value), &self.attributes)
1062    }
1063}
1064impl<I> GaugeBase for InstrumentWithAttributes<BufferInstrument<I>>
1065where
1066    I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1067{
1068    fn records(&self, value: u64) {
1069        self.inner
1070            .send(MetricUpdateVal::Value(value), &self.attributes)
1071    }
1072}
1073impl<I> GaugeF64Base for InstrumentWithAttributes<BufferInstrument<I>>
1074where
1075    I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1076{
1077    fn records(&self, value: f64) {
1078        self.inner
1079            .send(MetricUpdateVal::ValueF64(value), &self.attributes)
1080    }
1081}
1082impl<I> HistogramBase for InstrumentWithAttributes<BufferInstrument<I>>
1083where
1084    I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1085{
1086    fn records(&self, value: u64) {
1087        self.inner
1088            .send(MetricUpdateVal::Value(value), &self.attributes)
1089    }
1090}
1091impl<I> HistogramF64Base for InstrumentWithAttributes<BufferInstrument<I>>
1092where
1093    I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1094{
1095    fn records(&self, value: f64) {
1096        self.inner
1097            .send(MetricUpdateVal::ValueF64(value), &self.attributes)
1098    }
1099}
1100impl<I> HistogramDurationBase for InstrumentWithAttributes<BufferInstrument<I>>
1101where
1102    I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1103{
1104    fn records(&self, value: Duration) {
1105        self.inner
1106            .send(MetricUpdateVal::Duration(value), &self.attributes)
1107    }
1108}
1109
1110impl<I> UpDownCounterBase for InstrumentWithAttributes<BufferInstrument<I>>
1111where
1112    I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1113{
1114    fn adds(&self, value: i64) {
1115        self.inner
1116            .send(MetricUpdateVal::SignedDelta(value), &self.attributes)
1117    }
1118}
1119
#[cfg(test)]
mod tests {
    use super::*;
    use std::any::Any;
    use temporalio_common::telemetry::{
        TelemetryOptions,
        metrics::core::{BufferInstrumentRef, CustomMetricAttributes},
        telemetry_init,
    };

    /// Stand-in for lang-side attributes. The wrapped number identifies which attribute set an
    /// event refers to.
    #[derive(Debug)]
    struct DummyCustomAttrs(usize);
    impl CustomMetricAttributes for DummyCustomAttrs {
        fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
            self as Arc<dyn Any + Send + Sync>
        }
    }
    impl DummyCustomAttrs {
        /// Reads back the id stored in an already-populated attribute slot.
        fn as_id(ba: &BufferAttributes) -> usize {
            let as_dum = ba
                .get()
                .clone()
                .as_any()
                .downcast::<DummyCustomAttrs>()
                .unwrap();
            as_dum.0
        }
    }

    /// Stand-in for a lang-side instrument. The wrapped number identifies the instrument.
    #[derive(Debug, Clone)]
    struct DummyInstrumentRef(usize);
    impl BufferInstrumentRef for DummyInstrumentRef {}

    #[test]
    fn test_buffered_core_context() {
        let call_buffer = Arc::new(MetricsCallBuffer::new(100));
        let telem_instance = telemetry_init(
            TelemetryOptions::builder()
                .metrics(call_buffer.clone() as Arc<dyn CoreMeter>)
                .build(),
        )
        .unwrap();
        let mc = MetricsContext::top_level("foo".to_string(), "q".to_string(), &telem_instance);
        mc.forced_cache_eviction();
        let events = call_buffer.retrieve();
        let a1 = assert_matches!(
            &events[0],
            MetricEvent::CreateAttributes {
                populate_into,
                append_from: None,
                attributes,
            }
            if attributes[0].key == "service_name"
            => populate_into
        );
        let a2 = assert_matches!(
            &events[1],
            MetricEvent::CreateAttributes {
                populate_into,
                append_from: Some(_),
                attributes,
            }
            if attributes[0].key == "namespace" &&
               attributes[1].key == "task_queue"
            => populate_into
        );
        a1.set(Arc::new(DummyCustomAttrs(1))).unwrap();
        a2.set(Arc::new(DummyCustomAttrs(2))).unwrap();
        // Verify all metrics are created. This number will need to get updated any time a metric
        // is added.
        let num_metrics = 35;
        #[allow(clippy::needless_range_loop)] // Sorry clippy, this reads easier.
        for metric_num in 2..=num_metrics + 1 {
            let hole = assert_matches!(&events[metric_num],
                MetricEvent::Create { populate_into, .. }
                => populate_into
            );
            // Each instrument's id is the index of its Create event.
            hole.set(Arc::new(DummyInstrumentRef(metric_num))).unwrap();
        }
        assert_matches!(
            &events[num_metrics + 2], // +2 for attrib creation (at start), then this update
            MetricEvent::Update {
                instrument,
                attributes,
                update: MetricUpdateVal::Delta(1)
            }
            if DummyCustomAttrs::as_id(attributes) == 2 && instrument.get().0 == num_metrics + 1
        );
        // Verify creating a new context with new attributes merges them properly
        let mc2 = mc.with_new_attrs([MetricKeyValue::new("gotta", "go fast")]);
        mc2.wf_task_latency(Duration::from_secs(1));
        let events = call_buffer.retrieve();
        let a2 = assert_matches!(
            &events[0],
            MetricEvent::CreateAttributes {
                populate_into,
                append_from: Some(eh),
                attributes
            }
            if attributes[0].key == "gotta" && DummyCustomAttrs::as_id(eh) == 2
            => populate_into
        );
        a2.set(Arc::new(DummyCustomAttrs(3))).unwrap();
        // Instrument ids were assigned from event indices above, so 12 is the index of the
        // Create event for the instrument `wf_task_latency` records into. This depends on
        // instrument creation order and must be updated if that order changes.
        assert_matches!(
            &events[1],
            MetricEvent::Update {
                instrument,
                attributes,
                update: MetricUpdateVal::Duration(d)
            }
            if DummyCustomAttrs::as_id(attributes) == 3 && instrument.get().0 == 12
               && d == &Duration::from_secs(1)
        );
    }

    #[test]
    fn metric_buffer() {
        // Capacity 12 = 5 instrument creations + 2 attribute sets + 5 updates, so no event
        // below is dropped.
        let call_buffer = MetricsCallBuffer::new(12);
        let ctr = call_buffer.counter(MetricParameters {
            name: "ctr".into(),
            description: "a counter".into(),
            unit: "grognaks".into(),
        });
        let histo = call_buffer.histogram(MetricParameters {
            name: "histo".into(),
            description: "a histogram".into(),
            unit: "flubarbs".into(),
        });
        let gauge = call_buffer.gauge(MetricParameters {
            name: "gauge".into(),
            description: "a gauge".into(),
            unit: "bleezles".into(),
        });
        let histo_dur = call_buffer.histogram_duration(MetricParameters {
            name: "histo_dur".into(),
            description: "a duration histogram".into(),
            unit: "seconds".into(),
        });
        let up_down_ctr = call_buffer.up_down_counter(MetricParameters {
            name: "up_down_ctr".into(),
            description: "an up down counter".into(),
            unit: "widgets".into(),
        });
        let attrs_1 = call_buffer.new_attributes(NewAttributes {
            attributes: vec![MetricKeyValue::new("hi", "yo")],
        });
        let attrs_2 = call_buffer.new_attributes(NewAttributes {
            attributes: vec![MetricKeyValue::new("run", "fast")],
        });
        ctr.add(1, &attrs_1);
        histo.record(2, &attrs_1);
        gauge.record(3, &attrs_2);
        histo_dur.record(Duration::from_secs_f64(1.2), &attrs_1);
        up_down_ctr.add(-3, &attrs_2);

        // Reverse so that `pop` yields events in the order they were sent.
        let mut calls = call_buffer.retrieve();
        calls.reverse();
        let ctr_1 = assert_matches!(
            calls.pop(),
            Some(MetricEvent::Create {
                params,
                populate_into,
                kind: MetricKind::Counter
            })
            if params.name == "ctr"
            => populate_into
        );
        ctr_1.set(Arc::new(DummyInstrumentRef(1))).unwrap();
        let hist_2 = assert_matches!(
            calls.pop(),
            Some(MetricEvent::Create {
                params,
                populate_into,
                kind: MetricKind::Histogram
            })
            if params.name == "histo"
            => populate_into
        );
        hist_2.set(Arc::new(DummyInstrumentRef(2))).unwrap();
        let gauge_3 = assert_matches!(
            calls.pop(),
            Some(MetricEvent::Create {
                params,
                populate_into,
                kind: MetricKind::Gauge
            })
            if params.name == "gauge"
            => populate_into
        );
        gauge_3.set(Arc::new(DummyInstrumentRef(3))).unwrap();
        let hist_4 = assert_matches!(
            calls.pop(),
            Some(MetricEvent::Create {
                params,
                populate_into,
                kind: MetricKind::HistogramDuration
            })
            if params.name == "histo_dur"
            => populate_into
        );
        hist_4.set(Arc::new(DummyInstrumentRef(4))).unwrap();
        let up_down_5 = assert_matches!(
            calls.pop(),
            Some(MetricEvent::Create {
                params,
                populate_into,
                kind: MetricKind::UpDownCounter
            })
            if params.name == "up_down_ctr"
            => populate_into
        );
        up_down_5.set(Arc::new(DummyInstrumentRef(5))).unwrap();
        let a1 = assert_matches!(
            calls.pop(),
            Some(MetricEvent::CreateAttributes {
                populate_into,
                append_from: None,
                attributes
            })
            if attributes[0].key == "hi"
            => populate_into
        );
        a1.set(Arc::new(DummyCustomAttrs(1))).unwrap();
        let a2 = assert_matches!(
            calls.pop(),
            Some(MetricEvent::CreateAttributes {
                populate_into,
                append_from: None,
                attributes
            })
            if attributes[0].key == "run"
            => populate_into
        );
        a2.set(Arc::new(DummyCustomAttrs(2))).unwrap();
        assert_matches!(
            calls.pop(),
            Some(MetricEvent::Update{
                instrument,
                attributes,
                update: MetricUpdateVal::Delta(1)
            })
            if DummyCustomAttrs::as_id(&attributes) == 1 && instrument.get().0 == 1
        );
        assert_matches!(
            calls.pop(),
            Some(MetricEvent::Update{
                instrument,
                attributes,
                update: MetricUpdateVal::Value(2)
            })
            if DummyCustomAttrs::as_id(&attributes) == 1 && instrument.get().0 == 2
        );
        assert_matches!(
            calls.pop(),
            Some(MetricEvent::Update{
                instrument,
                attributes,
                update: MetricUpdateVal::Value(3)
            })
            if DummyCustomAttrs::as_id(&attributes) == 2 && instrument.get().0 == 3
        );
        assert_matches!(
            calls.pop(),
            Some(MetricEvent::Update{
                instrument,
                attributes,
                update: MetricUpdateVal::Duration(d)
            })
            if DummyCustomAttrs::as_id(&attributes) == 1 && instrument.get().0 == 4
               && d == Duration::from_secs_f64(1.2)
        );
        assert_matches!(
            calls.pop(),
            Some(MetricEvent::Update{
                instrument,
                attributes,
                update: MetricUpdateVal::SignedDelta(-3)
            })
            if DummyCustomAttrs::as_id(&attributes) == 2 && instrument.get().0 == 5
        );
    }
}