1#[cfg(test)]
2use crate::TelemetryInstance;
3use crate::abstractions::dbg_panic;
4
5use std::{
6 collections::HashMap,
7 fmt::{Debug, Display},
8 iter::Iterator,
9 sync::{Arc, atomic::AtomicU64},
10 time::Duration,
11};
12use temporalio_common::{
13 protos::temporal::api::{enums::v1::WorkflowTaskFailedCause, failure::v1::Failure},
14 telemetry::metrics::{core::*, *},
15};
16
17pub(super) const NUM_POLLERS_NAME: &str = "num_pollers";
18pub(super) const TASK_SLOTS_AVAILABLE_NAME: &str = "worker_task_slots_available";
19pub(super) const TASK_SLOTS_USED_NAME: &str = "worker_task_slots_used";
20pub(super) const STICKY_CACHE_SIZE_NAME: &str = "sticky_cache_size";
21
22#[derive(Clone)]
24pub(crate) struct MetricsContext {
25 meter: TemporalMeter,
26 instruments: Arc<Instruments>,
27 in_memory_metrics: Option<Arc<WorkerHeartbeatMetrics>>,
28}
29
30#[derive(Clone)]
31struct Instruments {
32 wf_completed_counter: Counter,
33 wf_canceled_counter: Counter,
34 wf_failed_counter: Counter,
35 wf_cont_counter: Counter,
36 wf_e2e_latency: HistogramDuration,
37 wf_task_queue_poll_empty_counter: Counter,
38 wf_task_queue_poll_succeed_counter: Counter,
39 wf_task_execution_failure_counter: Counter,
40 wf_task_sched_to_start_latency: HistogramDuration,
41 wf_task_replay_latency: HistogramDuration,
42 wf_task_execution_latency: HistogramDuration,
43 act_poll_no_task: Counter,
44 act_task_received_counter: Counter,
45 act_execution_failed: Counter,
46 act_sched_to_start_latency: HistogramDuration,
47 act_exec_latency: HistogramDuration,
48 act_exec_succeeded_latency: HistogramDuration,
49 la_execution_cancelled: Counter,
50 la_execution_failed: Counter,
51 la_exec_latency: HistogramDuration,
52 la_exec_succeeded_latency: HistogramDuration,
53 la_total: Counter,
54 nexus_poll_no_task: Counter,
55 nexus_task_schedule_to_start_latency: HistogramDuration,
56 nexus_task_e2e_latency: HistogramDuration,
57 nexus_task_execution_latency: HistogramDuration,
58 nexus_task_execution_failed: Counter,
59 worker_registered: Counter,
60 num_pollers: Gauge,
61 task_slots_available: Gauge,
62 task_slots_used: Gauge,
63 sticky_cache_hit: Counter,
64 sticky_cache_miss: Counter,
65 sticky_cache_size: Gauge,
66 sticky_cache_forced_evictions: Counter,
67}
68
69impl MetricsContext {
70 pub(crate) fn no_op() -> Self {
71 let meter = TemporalMeter::no_op();
72 let in_memory_metrics = Some(Arc::new(WorkerHeartbeatMetrics::default()));
73 let instruments = Arc::new(Instruments::new(&meter, in_memory_metrics.clone()));
74 Self {
75 instruments,
76 meter,
77 in_memory_metrics,
78 }
79 }
80
81 #[cfg(test)]
82 pub(crate) fn top_level(namespace: String, tq: String, telemetry: &TelemetryInstance) -> Self {
83 MetricsContext::top_level_with_meter(namespace, tq, telemetry.get_temporal_metric_meter())
84 }
85
86 pub(crate) fn top_level_with_meter(
87 namespace: String,
88 tq: String,
89 temporal_meter: Option<TemporalMeter>,
90 ) -> Self {
91 if let Some(mut meter) = temporal_meter {
92 let addtl_attributes = [
93 MetricKeyValue::new(KEY_NAMESPACE, namespace),
94 task_queue(tq),
95 ];
96 meter.merge_attributes(addtl_attributes.into());
97 let in_memory_metrics = Some(Arc::new(WorkerHeartbeatMetrics::default()));
98 let mut instruments = Instruments::new(&meter, in_memory_metrics.clone());
99 instruments.update_attributes(meter.get_default_attributes());
100 Self {
101 instruments: Arc::new(instruments),
102 meter,
103 in_memory_metrics,
104 }
105 } else {
106 Self::no_op()
107 }
108 }
109
110 pub(crate) fn with_new_attrs(
112 &self,
113 new_attrs: impl IntoIterator<Item = MetricKeyValue>,
114 ) -> Self {
115 let mut tm = self.meter.clone();
116 tm.merge_attributes(new_attrs.into());
117 let mut instruments = (*self.instruments).clone();
118 instruments.update_attributes(tm.get_default_attributes());
119 Self {
120 instruments: Arc::new(instruments),
121 meter: self.meter.clone(),
122 in_memory_metrics: self.in_memory_metrics.clone(),
123 }
124 }
125
126 pub(crate) fn in_memory_meter(&self) -> Option<Arc<WorkerHeartbeatMetrics>> {
127 self.in_memory_metrics.clone()
128 }
129
130 pub(crate) fn wf_tq_poll_ok(&self) {
132 self.instruments.wf_task_queue_poll_succeed_counter.adds(1);
133 }
134
135 pub(crate) fn wf_tq_poll_empty(&self) {
137 self.instruments.wf_task_queue_poll_empty_counter.adds(1);
138 }
139
140 pub(crate) fn wf_task_failed(&self) {
142 self.instruments.wf_task_execution_failure_counter.adds(1);
143 }
144
145 pub(crate) fn wf_completed(&self) {
147 self.instruments.wf_completed_counter.adds(1);
148 }
149
150 pub(crate) fn wf_canceled(&self) {
152 self.instruments.wf_canceled_counter.adds(1);
153 }
154
155 pub(crate) fn wf_failed(&self) {
157 self.instruments.wf_failed_counter.adds(1);
158 }
159
160 pub(crate) fn wf_continued_as_new(&self) {
162 self.instruments.wf_cont_counter.adds(1);
163 }
164
165 pub(crate) fn wf_e2e_latency(&self, dur: Duration) {
167 self.instruments.wf_e2e_latency.records(dur);
168 }
169
170 pub(crate) fn wf_task_sched_to_start_latency(&self, dur: Duration) {
172 self.instruments.wf_task_sched_to_start_latency.records(dur);
173 }
174
175 pub(crate) fn wf_task_latency(&self, dur: Duration) {
177 self.instruments.wf_task_execution_latency.records(dur);
178 }
179
180 pub(crate) fn wf_task_replay_latency(&self, dur: Duration) {
182 self.instruments.wf_task_replay_latency.records(dur);
183 }
184
185 pub(crate) fn act_poll_timeout(&self) {
187 self.instruments.act_poll_no_task.adds(1);
188 }
189
190 pub(crate) fn act_task_received(&self) {
192 self.instruments.act_task_received_counter.adds(1);
193 }
194
195 pub(crate) fn act_execution_failed(&self) {
197 self.instruments.act_execution_failed.adds(1);
198 }
199
200 pub(crate) fn act_execution_succeeded(&self, dur: Duration) {
202 self.instruments.act_exec_succeeded_latency.records(dur);
203 }
204
205 pub(crate) fn act_sched_to_start_latency(&self, dur: Duration) {
207 self.instruments.act_sched_to_start_latency.records(dur);
208 }
209
210 pub(crate) fn act_execution_latency(&self, dur: Duration) {
213 self.instruments.act_exec_latency.records(dur);
214 }
215
216 pub(crate) fn la_execution_cancelled(&self) {
217 self.instruments.la_execution_cancelled.adds(1);
218 }
219
220 pub(crate) fn la_execution_failed(&self) {
221 self.instruments.la_execution_failed.adds(1);
222 }
223
224 pub(crate) fn la_exec_latency(&self, dur: Duration) {
225 self.instruments.la_exec_latency.records(dur);
226 }
227
228 pub(crate) fn la_exec_succeeded_latency(&self, dur: Duration) {
229 self.instruments.la_exec_succeeded_latency.records(dur);
230 }
231
232 pub(crate) fn la_executed(&self) {
233 self.instruments.la_total.adds(1);
234 }
235
236 pub(crate) fn nexus_poll_timeout(&self) {
238 self.instruments.nexus_poll_no_task.adds(1);
239 }
240
241 pub(crate) fn nexus_task_sched_to_start_latency(&self, dur: Duration) {
243 self.instruments
244 .nexus_task_schedule_to_start_latency
245 .records(dur);
246 }
247
248 pub(crate) fn nexus_task_e2e_latency(&self, dur: Duration) {
250 self.instruments.nexus_task_e2e_latency.records(dur);
251 }
252
253 pub(crate) fn nexus_task_execution_latency(&self, dur: Duration) {
255 self.instruments.nexus_task_execution_latency.records(dur);
256 }
257
258 pub(crate) fn nexus_task_execution_failed(&self) {
260 self.instruments.nexus_task_execution_failed.adds(1);
261 }
262
263 pub(crate) fn worker_registered(&self) {
265 self.instruments.worker_registered.adds(1);
266 }
267
268 pub(crate) fn available_task_slots(&self, num: usize) {
270 self.instruments.task_slots_available.records(num as u64)
271 }
272
273 pub(crate) fn task_slots_used(&self, num: u64) {
275 self.instruments.task_slots_used.records(num)
276 }
277
278 pub(crate) fn record_num_pollers(&self, num: usize) {
280 self.instruments.num_pollers.records(num as u64);
281 }
282
283 pub(crate) fn sticky_cache_hit(&self) {
285 self.instruments.sticky_cache_hit.adds(1);
286 }
287
288 pub(crate) fn sticky_cache_miss(&self) {
290 self.instruments.sticky_cache_miss.adds(1);
291 }
292
293 pub(crate) fn cache_size(&self, size: u64) {
295 self.instruments.sticky_cache_size.records(size);
296 }
297
298 pub(crate) fn forced_cache_eviction(&self) {
300 self.instruments.sticky_cache_forced_evictions.adds(1);
301 }
302}
303
304impl Instruments {
305 fn new(meter: &TemporalMeter, in_memory: Option<Arc<WorkerHeartbeatMetrics>>) -> Self {
306 let counter_with_in_mem = |params: MetricParameters| -> Counter {
307 in_memory
308 .clone()
309 .and_then(|in_mem| in_mem.get_metric(¶ms.name))
310 .map(|metric| meter.counter_with_in_memory(params.clone(), metric))
311 .unwrap_or_else(|| meter.counter(params))
312 };
313
314 let gauge_with_in_mem = |params: MetricParameters| -> Gauge {
315 in_memory
316 .clone()
317 .and_then(|in_mem| in_mem.get_metric(¶ms.name))
318 .map(|metric| meter.gauge_with_in_memory(params.clone(), metric))
319 .unwrap_or_else(|| meter.gauge(params))
320 };
321
322 let histogram_with_in_mem = |params: MetricParameters| -> HistogramDuration {
323 in_memory
324 .clone()
325 .and_then(|in_mem| in_mem.get_metric(¶ms.name))
326 .map(|metric| meter.histogram_duration_with_in_memory(params.clone(), metric))
327 .unwrap_or_else(|| meter.histogram_duration(params))
328 };
329
330 Self {
331 wf_completed_counter: meter.counter(MetricParameters {
332 name: "workflow_completed".into(),
333 description: "Count of successfully completed workflows".into(),
334 unit: "".into(),
335 }),
336 wf_canceled_counter: meter.counter(MetricParameters {
337 name: "workflow_canceled".into(),
338 description: "Count of canceled workflows".into(),
339 unit: "".into(),
340 }),
341 wf_failed_counter: meter.counter(MetricParameters {
342 name: "workflow_failed".into(),
343 description: "Count of failed workflows".into(),
344 unit: "".into(),
345 }),
346 wf_cont_counter: meter.counter(MetricParameters {
347 name: "workflow_continue_as_new".into(),
348 description: "Count of continued-as-new workflows".into(),
349 unit: "".into(),
350 }),
351 wf_e2e_latency: meter.histogram_duration(MetricParameters {
352 name: WORKFLOW_E2E_LATENCY_HISTOGRAM_NAME.into(),
353 unit: "duration".into(),
354 description: "Histogram of total workflow execution latencies".into(),
355 }),
356 wf_task_queue_poll_empty_counter: meter.counter(MetricParameters {
357 name: "workflow_task_queue_poll_empty".into(),
358 description: "Count of workflow task queue poll timeouts (no new task)".into(),
359 unit: "".into(),
360 }),
361 wf_task_queue_poll_succeed_counter: counter_with_in_mem(MetricParameters {
362 name: "workflow_task_queue_poll_succeed".into(),
363 description: "Count of workflow task queue poll successes".into(),
364 unit: "".into(),
365 }),
366 wf_task_execution_failure_counter: counter_with_in_mem(MetricParameters {
367 name: "workflow_task_execution_failed".into(),
368 description: "Count of workflow task execution failures".into(),
369 unit: "".into(),
370 }),
371 wf_task_sched_to_start_latency: meter.histogram_duration(MetricParameters {
372 name: WORKFLOW_TASK_SCHED_TO_START_LATENCY_HISTOGRAM_NAME.into(),
373 unit: "duration".into(),
374 description: "Histogram of workflow task schedule-to-start latencies".into(),
375 }),
376 wf_task_replay_latency: meter.histogram_duration(MetricParameters {
377 name: WORKFLOW_TASK_REPLAY_LATENCY_HISTOGRAM_NAME.into(),
378 unit: "duration".into(),
379 description: "Histogram of workflow task replay latencies".into(),
380 }),
381 wf_task_execution_latency: histogram_with_in_mem(MetricParameters {
382 name: WORKFLOW_TASK_EXECUTION_LATENCY_HISTOGRAM_NAME.into(),
383 unit: "duration".into(),
384 description: "Histogram of workflow task execution (not replay) latencies".into(),
385 }),
386 act_poll_no_task: meter.counter(MetricParameters {
387 name: "activity_poll_no_task".into(),
388 description: "Count of activity task queue poll timeouts (no new task)".into(),
389 unit: "".into(),
390 }),
391 act_task_received_counter: counter_with_in_mem(MetricParameters {
392 name: "activity_task_received".into(),
393 description: "Count of activity task queue poll successes".into(),
394 unit: "".into(),
395 }),
396 act_execution_failed: counter_with_in_mem(MetricParameters {
397 name: "activity_execution_failed".into(),
398 description: "Count of activity task execution failures".into(),
399 unit: "".into(),
400 }),
401 act_sched_to_start_latency: meter.histogram_duration(MetricParameters {
402 name: ACTIVITY_SCHED_TO_START_LATENCY_HISTOGRAM_NAME.into(),
403 unit: "duration".into(),
404 description: "Histogram of activity schedule-to-start latencies".into(),
405 }),
406 act_exec_latency: histogram_with_in_mem(MetricParameters {
407 name: ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME.into(),
408 unit: "duration".into(),
409 description: "Histogram of activity execution latencies".into(),
410 }),
411 act_exec_succeeded_latency: meter.histogram_duration(MetricParameters {
412 name: "activity_succeed_endtoend_latency".into(),
413 unit: "duration".into(),
414 description: "Histogram of activity execution latencies for successful activities"
415 .into(),
416 }),
417 la_execution_cancelled: meter.counter(MetricParameters {
418 name: "local_activity_execution_cancelled".into(),
419 description: "Count of local activity executions that were cancelled".into(),
420 unit: "".into(),
421 }),
422 la_execution_failed: meter.counter(MetricParameters {
423 name: "local_activity_execution_failed".into(),
424 description: "Count of local activity executions that failed".into(),
425 unit: "".into(),
426 }),
427 la_exec_latency: histogram_with_in_mem(MetricParameters {
428 name: "local_activity_execution_latency".into(),
429 unit: "duration".into(),
430 description: "Histogram of local activity execution latencies".into(),
431 }),
432 la_exec_succeeded_latency: meter.histogram_duration(MetricParameters {
433 name: "local_activity_succeed_endtoend_latency".into(),
434 unit: "duration".into(),
435 description:
436 "Histogram of local activity execution latencies for successful local activities"
437 .into(),
438 }),
439 la_total: counter_with_in_mem(MetricParameters {
440 name: "local_activity_total".into(),
441 description: "Count of local activities executed".into(),
442 unit: "".into(),
443 }),
444 nexus_poll_no_task: meter.counter(MetricParameters {
445 name: "nexus_poll_no_task".into(),
446 description: "Count of nexus task queue poll timeouts (no new task)".into(),
447 unit: "".into(),
448 }),
449 nexus_task_schedule_to_start_latency: meter.histogram_duration(MetricParameters {
450 name: "nexus_task_schedule_to_start_latency".into(),
451 unit: "duration".into(),
452 description: "Histogram of nexus task schedule-to-start latencies".into(),
453 }),
454 nexus_task_e2e_latency: meter.histogram_duration(MetricParameters {
455 name: "nexus_task_endtoend_latency".into(),
456 unit: "duration".into(),
457 description: "Histogram of nexus task end-to-end latencies".into(),
458 }),
459 nexus_task_execution_latency: histogram_with_in_mem(MetricParameters {
460 name: "nexus_task_execution_latency".into(),
461 unit: "duration".into(),
462 description: "Histogram of nexus task execution latencies".into(),
463 }),
464 nexus_task_execution_failed: counter_with_in_mem(MetricParameters {
465 name: "nexus_task_execution_failed".into(),
466 description: "Count of nexus task execution failures".into(),
467 unit: "".into(),
468 }),
469 worker_registered: meter.counter(MetricParameters {
471 name: "worker_start".into(),
472 description: "Count of the number of initialized workers".into(),
473 unit: "".into(),
474 }),
475 num_pollers: gauge_with_in_mem(MetricParameters {
476 name: NUM_POLLERS_NAME.into(),
477 description: "Current number of active pollers per queue type".into(),
478 unit: "".into(),
479 }),
480 task_slots_available: gauge_with_in_mem(MetricParameters {
481 name: TASK_SLOTS_AVAILABLE_NAME.into(),
482 description: "Current number of available slots per task type".into(),
483 unit: "".into(),
484 }),
485 task_slots_used: gauge_with_in_mem(MetricParameters {
486 name: TASK_SLOTS_USED_NAME.into(),
487 description: "Current number of used slots per task type".into(),
488 unit: "".into(),
489 }),
490 sticky_cache_hit: counter_with_in_mem(MetricParameters {
491 name: "sticky_cache_hit".into(),
492 description: "Count of times the workflow cache was used for a new workflow task"
493 .into(),
494 unit: "".into(),
495 }),
496 sticky_cache_miss: counter_with_in_mem(MetricParameters {
497 name: "sticky_cache_miss".into(),
498 description:
499 "Count of times the workflow cache was missing a workflow for a sticky task".into(),
500 unit: "".into(),
501 }),
502 sticky_cache_size: gauge_with_in_mem(MetricParameters {
503 name: STICKY_CACHE_SIZE_NAME.into(),
504 description: "Current number of cached workflows".into(),
505 unit: "".into(),
506 }),
507 sticky_cache_forced_evictions: meter.counter(MetricParameters {
508 name: "sticky_cache_total_forced_eviction".into(),
509 description: "Count of evictions of cached workflows".into(),
510 unit: "".into(),
511 }),
512 }
513 }
514
515 fn update_attributes(&mut self, new_attributes: &MetricAttributes) {
516 self.wf_completed_counter
517 .update_attributes(new_attributes.clone());
518 self.wf_canceled_counter
519 .update_attributes(new_attributes.clone());
520 self.wf_failed_counter
521 .update_attributes(new_attributes.clone());
522 self.wf_cont_counter
523 .update_attributes(new_attributes.clone());
524 self.wf_e2e_latency
525 .update_attributes(new_attributes.clone());
526 self.wf_task_queue_poll_empty_counter
527 .update_attributes(new_attributes.clone());
528 self.wf_task_queue_poll_succeed_counter
529 .update_attributes(new_attributes.clone());
530 self.wf_task_execution_failure_counter
531 .update_attributes(new_attributes.clone());
532 self.wf_task_sched_to_start_latency
533 .update_attributes(new_attributes.clone());
534 self.wf_task_replay_latency
535 .update_attributes(new_attributes.clone());
536 self.wf_task_execution_latency
537 .update_attributes(new_attributes.clone());
538 self.act_poll_no_task
539 .update_attributes(new_attributes.clone());
540 self.act_task_received_counter
541 .update_attributes(new_attributes.clone());
542 self.act_execution_failed
543 .update_attributes(new_attributes.clone());
544 self.act_sched_to_start_latency
545 .update_attributes(new_attributes.clone());
546 self.act_exec_latency
547 .update_attributes(new_attributes.clone());
548 self.act_exec_succeeded_latency
549 .update_attributes(new_attributes.clone());
550 self.la_execution_cancelled
551 .update_attributes(new_attributes.clone());
552 self.la_execution_failed
553 .update_attributes(new_attributes.clone());
554 self.la_exec_latency
555 .update_attributes(new_attributes.clone());
556 self.la_exec_succeeded_latency
557 .update_attributes(new_attributes.clone());
558 self.la_total.update_attributes(new_attributes.clone());
559 self.nexus_poll_no_task
560 .update_attributes(new_attributes.clone());
561 self.nexus_task_schedule_to_start_latency
562 .update_attributes(new_attributes.clone());
563 self.nexus_task_e2e_latency
564 .update_attributes(new_attributes.clone());
565 self.nexus_task_execution_latency
566 .update_attributes(new_attributes.clone());
567 self.nexus_task_execution_failed
568 .update_attributes(new_attributes.clone());
569 self.worker_registered
570 .update_attributes(new_attributes.clone());
571 self.num_pollers.update_attributes(new_attributes.clone());
572 self.task_slots_available
573 .update_attributes(new_attributes.clone());
574 self.task_slots_used
575 .update_attributes(new_attributes.clone());
576 self.sticky_cache_hit
577 .update_attributes(new_attributes.clone());
578 self.sticky_cache_miss
579 .update_attributes(new_attributes.clone());
580 self.sticky_cache_size
581 .update_attributes(new_attributes.clone());
582 self.sticky_cache_forced_evictions
583 .update_attributes(new_attributes.clone());
584 }
585}
586
587#[derive(Default, Debug)]
588pub(crate) struct NumPollersMetric {
589 pub wft_current_pollers: Arc<AtomicU64>,
590 pub sticky_wft_current_pollers: Arc<AtomicU64>,
591 pub activity_current_pollers: Arc<AtomicU64>,
592 pub nexus_current_pollers: Arc<AtomicU64>,
593}
594
595impl NumPollersMetric {
596 pub(crate) fn as_map(&self) -> HashMap<String, Arc<AtomicU64>> {
597 HashMap::from([
598 (
599 "workflow_task".to_string(),
600 self.wft_current_pollers.clone(),
601 ),
602 (
603 "sticky_workflow_task".to_string(),
604 self.sticky_wft_current_pollers.clone(),
605 ),
606 (
607 "activity_task".to_string(),
608 self.activity_current_pollers.clone(),
609 ),
610 ("nexus_task".to_string(), self.nexus_current_pollers.clone()),
611 ])
612 }
613}
614
615#[derive(Default, Debug)]
616pub(crate) struct SlotMetrics {
617 pub workflow_worker: Arc<AtomicU64>,
618 pub activity_worker: Arc<AtomicU64>,
619 pub nexus_worker: Arc<AtomicU64>,
620 pub local_activity_worker: Arc<AtomicU64>,
621}
622
623impl SlotMetrics {
624 pub(crate) fn as_map(&self) -> HashMap<String, Arc<AtomicU64>> {
625 HashMap::from([
626 ("WorkflowWorker".to_string(), self.workflow_worker.clone()),
627 ("ActivityWorker".to_string(), self.activity_worker.clone()),
628 ("NexusWorker".to_string(), self.nexus_worker.clone()),
629 (
630 "LocalActivityWorker".to_string(),
631 self.local_activity_worker.clone(),
632 ),
633 ])
634 }
635}
636
637#[derive(Default, Debug)]
638pub(crate) struct WorkerHeartbeatMetrics {
639 pub sticky_cache_size: Arc<AtomicU64>,
640 pub total_sticky_cache_hit: Arc<AtomicU64>,
641 pub total_sticky_cache_miss: Arc<AtomicU64>,
642 pub num_pollers: NumPollersMetric,
643 pub worker_task_slots_used: SlotMetrics,
644 pub worker_task_slots_available: SlotMetrics,
645 pub workflow_task_execution_failed: Arc<AtomicU64>,
646 pub activity_execution_failed: Arc<AtomicU64>,
647 pub nexus_task_execution_failed: Arc<AtomicU64>,
648 pub local_activity_execution_failed: Arc<AtomicU64>,
649 pub activity_execution_latency: Arc<AtomicU64>,
652 pub local_activity_execution_latency: Arc<AtomicU64>,
653 pub workflow_task_execution_latency: Arc<AtomicU64>,
654 pub nexus_task_execution_latency: Arc<AtomicU64>,
655}
656
657impl WorkerHeartbeatMetrics {
658 pub(crate) fn get_metric(&self, name: &str) -> Option<HeartbeatMetricType> {
659 match name {
660 "sticky_cache_size" => Some(HeartbeatMetricType::Individual(
661 self.sticky_cache_size.clone(),
662 )),
663 "sticky_cache_hit" => Some(HeartbeatMetricType::Individual(
664 self.total_sticky_cache_hit.clone(),
665 )),
666 "sticky_cache_miss" => Some(HeartbeatMetricType::Individual(
667 self.total_sticky_cache_miss.clone(),
668 )),
669 "num_pollers" => Some(HeartbeatMetricType::WithLabel {
670 label_key: "poller_type".to_string(),
671 metrics: self.num_pollers.as_map(),
672 }),
673 "worker_task_slots_used" => Some(HeartbeatMetricType::WithLabel {
674 label_key: "worker_type".to_string(),
675 metrics: self.worker_task_slots_used.as_map(),
676 }),
677 "worker_task_slots_available" => Some(HeartbeatMetricType::WithLabel {
678 label_key: "worker_type".to_string(),
679 metrics: self.worker_task_slots_available.as_map(),
680 }),
681 "workflow_task_execution_failed" => Some(HeartbeatMetricType::Individual(
682 self.workflow_task_execution_failed.clone(),
683 )),
684 "activity_execution_failed" => Some(HeartbeatMetricType::Individual(
685 self.activity_execution_failed.clone(),
686 )),
687 "nexus_task_execution_failed" => Some(HeartbeatMetricType::Individual(
688 self.nexus_task_execution_failed.clone(),
689 )),
690 "local_activity_execution_failed" => Some(HeartbeatMetricType::Individual(
691 self.local_activity_execution_failed.clone(),
692 )),
693 "activity_execution_latency" => Some(HeartbeatMetricType::Individual(
694 self.activity_execution_latency.clone(),
695 )),
696 "local_activity_execution_latency" => Some(HeartbeatMetricType::Individual(
697 self.local_activity_execution_latency.clone(),
698 )),
699 "workflow_task_execution_latency" => Some(HeartbeatMetricType::Individual(
700 self.workflow_task_execution_latency.clone(),
701 )),
702 "nexus_task_execution_latency" => Some(HeartbeatMetricType::Individual(
703 self.nexus_task_execution_latency.clone(),
704 )),
705 _ => None,
706 }
707 }
708}
709
710const KEY_NAMESPACE: &str = "namespace";
711const KEY_WF_TYPE: &str = "workflow_type";
712const KEY_TASK_QUEUE: &str = "task_queue";
713const KEY_ACT_TYPE: &str = "activity_type";
714const KEY_POLLER_TYPE: &str = "poller_type";
715const KEY_WORKER_TYPE: &str = "worker_type";
716const KEY_EAGER: &str = "eager";
717const KEY_TASK_FAILURE_TYPE: &str = "failure_reason";
718
719pub(crate) fn workflow_poller() -> MetricKeyValue {
720 MetricKeyValue::new(KEY_POLLER_TYPE, "workflow_task")
721}
722pub(crate) fn workflow_sticky_poller() -> MetricKeyValue {
723 MetricKeyValue::new(KEY_POLLER_TYPE, "sticky_workflow_task")
724}
725pub(crate) fn activity_poller() -> MetricKeyValue {
726 MetricKeyValue::new(KEY_POLLER_TYPE, "activity_task")
727}
728pub(crate) fn nexus_poller() -> MetricKeyValue {
729 MetricKeyValue::new(KEY_POLLER_TYPE, "nexus_task")
730}
731pub(crate) fn task_queue(tq: String) -> MetricKeyValue {
732 MetricKeyValue::new(KEY_TASK_QUEUE, tq)
733}
734pub(crate) fn activity_type(ty: String) -> MetricKeyValue {
735 MetricKeyValue::new(KEY_ACT_TYPE, ty)
736}
737pub(crate) fn workflow_type(ty: String) -> MetricKeyValue {
738 MetricKeyValue::new(KEY_WF_TYPE, ty)
739}
740pub(crate) fn workflow_worker_type() -> MetricKeyValue {
741 MetricKeyValue::new(KEY_WORKER_TYPE, "WorkflowWorker")
742}
743pub(crate) fn activity_worker_type() -> MetricKeyValue {
744 MetricKeyValue::new(KEY_WORKER_TYPE, "ActivityWorker")
745}
746pub(crate) fn local_activity_worker_type() -> MetricKeyValue {
747 MetricKeyValue::new(KEY_WORKER_TYPE, "LocalActivityWorker")
748}
749pub(crate) fn nexus_worker_type() -> MetricKeyValue {
750 MetricKeyValue::new(KEY_WORKER_TYPE, "NexusWorker")
751}
752pub(crate) fn eager(is_eager: bool) -> MetricKeyValue {
753 MetricKeyValue::new(KEY_EAGER, is_eager)
754}
755pub(crate) enum FailureReason {
756 Nondeterminism,
757 Workflow,
758 Timeout,
759 NexusOperation(String),
760 NexusHandlerError(String),
761 GrpcMessageTooLarge,
762}
763impl Display for FailureReason {
764 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
765 let str = match self {
766 FailureReason::Nondeterminism => "NonDeterminismError".to_owned(),
767 FailureReason::Workflow => "WorkflowError".to_owned(),
768 FailureReason::Timeout => "timeout".to_owned(),
769 FailureReason::NexusOperation(op) => format!("operation_{op}"),
770 FailureReason::NexusHandlerError(op) => format!("handler_error_{op}"),
771 FailureReason::GrpcMessageTooLarge => "GrpcMessageTooLarge".to_owned(),
772 };
773 write!(f, "{str}")
774 }
775}
776impl From<WorkflowTaskFailedCause> for FailureReason {
777 fn from(v: WorkflowTaskFailedCause) -> Self {
778 match v {
779 WorkflowTaskFailedCause::NonDeterministicError => FailureReason::Nondeterminism,
780 _ => FailureReason::Workflow,
781 }
782 }
783}
784pub(crate) fn failure_reason(reason: FailureReason) -> MetricKeyValue {
785 MetricKeyValue::new(KEY_TASK_FAILURE_TYPE, reason.to_string())
786}
787
788pub(crate) fn should_record_failure_metric(failure: &Option<Failure>) -> bool {
790 !failure
791 .as_ref()
792 .is_some_and(|f| f.is_benign_application_failure())
793}
794
795#[derive(Debug)]
797pub struct MetricsCallBuffer<I>
798where
799 I: BufferInstrumentRef,
800{
801 calls_rx: crossbeam_channel::Receiver<MetricEvent<I>>,
802 calls_tx: LogErrOnFullSender<MetricEvent<I>>,
803}
804#[derive(Clone, Debug)]
805struct LogErrOnFullSender<I>(crossbeam_channel::Sender<I>);
806impl<I> LogErrOnFullSender<I> {
807 fn send(&self, v: I) {
808 if let Err(crossbeam_channel::TrySendError::Full(_)) = self.0.try_send(v) {
809 error!(
810 "Core's metrics buffer is full! Dropping call to record metrics. \
811 Make sure you drain the metric buffer often!"
812 );
813 }
814 }
815}
816
817impl<I> MetricsCallBuffer<I>
818where
819 I: Clone + BufferInstrumentRef,
820{
821 pub fn new(buffer_size: usize) -> Self {
823 let (calls_tx, calls_rx) = crossbeam_channel::bounded(buffer_size);
824 MetricsCallBuffer {
825 calls_rx,
826 calls_tx: LogErrOnFullSender(calls_tx),
827 }
828 }
829 fn new_instrument(&self, params: MetricParameters, kind: MetricKind) -> BufferInstrument<I> {
830 let hole = LazyBufferInstrument::hole();
831 self.calls_tx.send(MetricEvent::Create {
832 params,
833 kind,
834 populate_into: hole.clone(),
835 });
836 BufferInstrument {
837 instrument_ref: hole,
838 tx: self.calls_tx.clone(),
839 }
840 }
841}
842
843impl<I> CoreMeter for MetricsCallBuffer<I>
844where
845 I: BufferInstrumentRef + Debug + Send + Sync + Clone + 'static,
846{
847 fn new_attributes(&self, opts: NewAttributes) -> MetricAttributes {
848 let ba = BufferAttributes::hole();
849 self.calls_tx.send(MetricEvent::CreateAttributes {
850 populate_into: ba.clone(),
851 append_from: None,
852 attributes: opts.attributes,
853 });
854 MetricAttributes::Buffer(ba)
855 }
856
857 fn extend_attributes(
858 &self,
859 existing: MetricAttributes,
860 attribs: NewAttributes,
861 ) -> MetricAttributes {
862 if let MetricAttributes::Buffer(ol) = existing {
863 let ba = BufferAttributes::hole();
864 self.calls_tx.send(MetricEvent::CreateAttributes {
865 populate_into: ba.clone(),
866 append_from: Some(ol),
867 attributes: attribs.attributes,
868 });
869 MetricAttributes::Buffer(ba)
870 } else {
871 dbg_panic!("Must use buffer attributes with a buffer metric implementation");
872 existing
873 }
874 }
875
876 fn counter(&self, params: MetricParameters) -> Counter {
877 Counter::new(Arc::new(self.new_instrument(params, MetricKind::Counter)))
878 }
879
880 fn histogram(&self, params: MetricParameters) -> Histogram {
881 Histogram::new(Arc::new(self.new_instrument(params, MetricKind::Histogram)))
882 }
883
884 fn histogram_f64(&self, params: MetricParameters) -> HistogramF64 {
885 HistogramF64::new(Arc::new(
886 self.new_instrument(params, MetricKind::HistogramF64),
887 ))
888 }
889
890 fn histogram_duration(&self, params: MetricParameters) -> HistogramDuration {
891 HistogramDuration::new(Arc::new(
892 self.new_instrument(params, MetricKind::HistogramDuration),
893 ))
894 }
895
896 fn gauge(&self, params: MetricParameters) -> Gauge {
897 Gauge::new(Arc::new(self.new_instrument(params, MetricKind::Gauge)))
898 }
899
900 fn gauge_f64(&self, params: MetricParameters) -> GaugeF64 {
901 GaugeF64::new(Arc::new(self.new_instrument(params, MetricKind::GaugeF64)))
902 }
903
904 fn up_down_counter(&self, params: MetricParameters) -> UpDownCounter {
905 UpDownCounter::new(Arc::new(
906 self.new_instrument(params, MetricKind::UpDownCounter),
907 ))
908 }
909}
910impl<I> MetricCallBufferer<I> for MetricsCallBuffer<I>
911where
912 I: Send + Sync + BufferInstrumentRef,
913{
914 fn retrieve(&self) -> Vec<MetricEvent<I>> {
915 self.calls_rx.try_iter().collect()
916 }
917}
918
919#[derive(Clone)]
920struct BufferInstrument<I: BufferInstrumentRef> {
921 instrument_ref: LazyBufferInstrument<I>,
922 tx: LogErrOnFullSender<MetricEvent<I>>,
923}
924impl<I> BufferInstrument<I>
925where
926 I: Clone + BufferInstrumentRef,
927{
928 fn send(&self, value: MetricUpdateVal, attributes: &MetricAttributes) {
929 let attributes = match attributes {
930 MetricAttributes::Buffer(l) => l.clone(),
931 e => panic!(
932 "MetricsCallBuffer only works with MetricAttributes::Buffer, but used: {:?}",
933 e
934 ),
935 };
936 self.tx.send(MetricEvent::Update {
937 instrument: self.instrument_ref.clone(),
938 update: value,
939 attributes: attributes.clone(),
940 });
941 }
942}
943
944#[derive(Clone)]
945struct InstrumentWithAttributes<I> {
946 inner: I,
947 attributes: MetricAttributes,
948}
949
950impl<I> MetricAttributable<Box<dyn CounterBase>> for BufferInstrument<I>
951where
952 I: BufferInstrumentRef + Send + Sync + Clone + 'static,
953{
954 fn with_attributes(
955 &self,
956 attributes: &MetricAttributes,
957 ) -> Result<Box<dyn CounterBase>, Box<dyn std::error::Error>> {
958 Ok(Box::new(InstrumentWithAttributes {
959 inner: self.clone(),
960 attributes: attributes.clone(),
961 }))
962 }
963}
964
965impl<I> MetricAttributable<Box<dyn HistogramBase>> for BufferInstrument<I>
966where
967 I: BufferInstrumentRef + Send + Sync + Clone + 'static,
968{
969 fn with_attributes(
970 &self,
971 attributes: &MetricAttributes,
972 ) -> Result<Box<dyn HistogramBase>, Box<dyn std::error::Error>> {
973 Ok(Box::new(InstrumentWithAttributes {
974 inner: self.clone(),
975 attributes: attributes.clone(),
976 }))
977 }
978}
979
980impl<I> MetricAttributable<Box<dyn HistogramF64Base>> for BufferInstrument<I>
981where
982 I: BufferInstrumentRef + Send + Sync + Clone + 'static,
983{
984 fn with_attributes(
985 &self,
986 attributes: &MetricAttributes,
987 ) -> Result<Box<dyn HistogramF64Base>, Box<dyn std::error::Error>> {
988 Ok(Box::new(InstrumentWithAttributes {
989 inner: self.clone(),
990 attributes: attributes.clone(),
991 }))
992 }
993}
994
995impl<I> MetricAttributable<Box<dyn HistogramDurationBase>> for BufferInstrument<I>
996where
997 I: BufferInstrumentRef + Send + Sync + Clone + 'static,
998{
999 fn with_attributes(
1000 &self,
1001 attributes: &MetricAttributes,
1002 ) -> Result<Box<dyn HistogramDurationBase>, Box<dyn std::error::Error>> {
1003 Ok(Box::new(InstrumentWithAttributes {
1004 inner: self.clone(),
1005 attributes: attributes.clone(),
1006 }))
1007 }
1008}
1009
1010impl<I> MetricAttributable<Box<dyn GaugeBase>> for BufferInstrument<I>
1011where
1012 I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1013{
1014 fn with_attributes(
1015 &self,
1016 attributes: &MetricAttributes,
1017 ) -> Result<Box<dyn GaugeBase>, Box<dyn std::error::Error>> {
1018 Ok(Box::new(InstrumentWithAttributes {
1019 inner: self.clone(),
1020 attributes: attributes.clone(),
1021 }))
1022 }
1023}
1024
1025impl<I> MetricAttributable<Box<dyn GaugeF64Base>> for BufferInstrument<I>
1026where
1027 I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1028{
1029 fn with_attributes(
1030 &self,
1031 attributes: &MetricAttributes,
1032 ) -> Result<Box<dyn GaugeF64Base>, Box<dyn std::error::Error>> {
1033 Ok(Box::new(InstrumentWithAttributes {
1034 inner: self.clone(),
1035 attributes: attributes.clone(),
1036 }))
1037 }
1038}
1039
1040impl<I> MetricAttributable<Box<dyn UpDownCounterBase>> for BufferInstrument<I>
1041where
1042 I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1043{
1044 fn with_attributes(
1045 &self,
1046 attributes: &MetricAttributes,
1047 ) -> Result<Box<dyn UpDownCounterBase>, Box<dyn std::error::Error>> {
1048 Ok(Box::new(InstrumentWithAttributes {
1049 inner: self.clone(),
1050 attributes: attributes.clone(),
1051 }))
1052 }
1053}
1054
1055impl<I> CounterBase for InstrumentWithAttributes<BufferInstrument<I>>
1056where
1057 I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1058{
1059 fn adds(&self, value: u64) {
1060 self.inner
1061 .send(MetricUpdateVal::Delta(value), &self.attributes)
1062 }
1063}
1064impl<I> GaugeBase for InstrumentWithAttributes<BufferInstrument<I>>
1065where
1066 I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1067{
1068 fn records(&self, value: u64) {
1069 self.inner
1070 .send(MetricUpdateVal::Value(value), &self.attributes)
1071 }
1072}
1073impl<I> GaugeF64Base for InstrumentWithAttributes<BufferInstrument<I>>
1074where
1075 I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1076{
1077 fn records(&self, value: f64) {
1078 self.inner
1079 .send(MetricUpdateVal::ValueF64(value), &self.attributes)
1080 }
1081}
1082impl<I> HistogramBase for InstrumentWithAttributes<BufferInstrument<I>>
1083where
1084 I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1085{
1086 fn records(&self, value: u64) {
1087 self.inner
1088 .send(MetricUpdateVal::Value(value), &self.attributes)
1089 }
1090}
1091impl<I> HistogramF64Base for InstrumentWithAttributes<BufferInstrument<I>>
1092where
1093 I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1094{
1095 fn records(&self, value: f64) {
1096 self.inner
1097 .send(MetricUpdateVal::ValueF64(value), &self.attributes)
1098 }
1099}
1100impl<I> HistogramDurationBase for InstrumentWithAttributes<BufferInstrument<I>>
1101where
1102 I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1103{
1104 fn records(&self, value: Duration) {
1105 self.inner
1106 .send(MetricUpdateVal::Duration(value), &self.attributes)
1107 }
1108}
1109
1110impl<I> UpDownCounterBase for InstrumentWithAttributes<BufferInstrument<I>>
1111where
1112 I: BufferInstrumentRef + Send + Sync + Clone + 'static,
1113{
1114 fn adds(&self, value: i64) {
1115 self.inner
1116 .send(MetricUpdateVal::SignedDelta(value), &self.attributes)
1117 }
1118}
1119
1120#[cfg(test)]
1121mod tests {
1122 use super::*;
1123 use std::any::Any;
1124 use temporalio_common::telemetry::{
1125 TelemetryOptions,
1126 metrics::core::{BufferInstrumentRef, CustomMetricAttributes},
1127 telemetry_init,
1128 };
1129
1130 #[derive(Debug)]
1131 struct DummyCustomAttrs(usize);
1132 impl CustomMetricAttributes for DummyCustomAttrs {
1133 fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
1134 self as Arc<dyn Any + Send + Sync>
1135 }
1136 }
1137 impl DummyCustomAttrs {
1138 fn as_id(ba: &BufferAttributes) -> usize {
1139 let as_dum = ba
1140 .get()
1141 .clone()
1142 .as_any()
1143 .downcast::<DummyCustomAttrs>()
1144 .unwrap();
1145 as_dum.0
1146 }
1147 }
1148
1149 #[derive(Debug, Clone)]
1150 struct DummyInstrumentRef(usize);
1151 impl BufferInstrumentRef for DummyInstrumentRef {}
1152
1153 #[test]
1154 fn test_buffered_core_context() {
1155 let call_buffer = Arc::new(MetricsCallBuffer::new(100));
1156 let telem_instance = telemetry_init(
1157 TelemetryOptions::builder()
1158 .metrics(call_buffer.clone() as Arc<dyn CoreMeter>)
1159 .build(),
1160 )
1161 .unwrap();
1162 let mc = MetricsContext::top_level("foo".to_string(), "q".to_string(), &telem_instance);
1163 mc.forced_cache_eviction();
1164 let events = call_buffer.retrieve();
1165 let a1 = assert_matches!(
1166 &events[0],
1167 MetricEvent::CreateAttributes {
1168 populate_into,
1169 append_from: None,
1170 attributes,
1171 }
1172 if attributes[0].key == "service_name"
1173 => populate_into
1174 );
1175 let a2 = assert_matches!(
1176 &events[1],
1177 MetricEvent::CreateAttributes {
1178 populate_into,
1179 append_from: Some(_),
1180 attributes,
1181 }
1182 if attributes[0].key == "namespace" &&
1183 attributes[1].key == "task_queue"
1184 => populate_into
1185 );
1186 a1.set(Arc::new(DummyCustomAttrs(1))).unwrap();
1187 a2.set(Arc::new(DummyCustomAttrs(2))).unwrap();
1188 let num_metrics = 35;
1191 #[allow(clippy::needless_range_loop)] for metric_num in 2..=num_metrics + 1 {
1193 let hole = assert_matches!(&events[metric_num],
1194 MetricEvent::Create { populate_into, .. }
1195 => populate_into
1196 );
1197 hole.set(Arc::new(DummyInstrumentRef(metric_num))).unwrap();
1198 }
1199 assert_matches!(
1200 &events[num_metrics + 2], MetricEvent::Update {
1202 instrument,
1203 attributes,
1204 update: MetricUpdateVal::Delta(1)
1205 }
1206 if DummyCustomAttrs::as_id(attributes) == 2 && instrument.get().0 == num_metrics + 1
1207 );
1208 let mc2 = mc.with_new_attrs([MetricKeyValue::new("gotta", "go fast")]);
1210 mc2.wf_task_latency(Duration::from_secs(1));
1211 let events = call_buffer.retrieve();
1212 dbg!(&events);
1213 let a2 = assert_matches!(
1214 &events[0],
1215 MetricEvent::CreateAttributes {
1216 populate_into,
1217 append_from: Some(eh),
1218 attributes
1219 }
1220 if attributes[0].key == "gotta" && DummyCustomAttrs::as_id(eh) == 2
1221 => populate_into
1222 );
1223 a2.set(Arc::new(DummyCustomAttrs(3))).unwrap();
1224 assert_matches!(
1225 &events[1],
1226 MetricEvent::Update {
1227 instrument,
1228 attributes,
1229 update: MetricUpdateVal::Duration(d)
1230 }
1231 if DummyCustomAttrs::as_id(attributes) == 3 && instrument.get().0 == 12
1232 && d == &Duration::from_secs(1)
1233 );
1234 }
1235
1236 #[test]
1237 fn metric_buffer() {
1238 let call_buffer = MetricsCallBuffer::new(12);
1239 let ctr = call_buffer.counter(MetricParameters {
1240 name: "ctr".into(),
1241 description: "a counter".into(),
1242 unit: "grognaks".into(),
1243 });
1244 let histo = call_buffer.histogram(MetricParameters {
1245 name: "histo".into(),
1246 description: "a histogram".into(),
1247 unit: "flubarbs".into(),
1248 });
1249 let gauge = call_buffer.gauge(MetricParameters {
1250 name: "gauge".into(),
1251 description: "a counter".into(),
1252 unit: "bleezles".into(),
1253 });
1254 let histo_dur = call_buffer.histogram_duration(MetricParameters {
1255 name: "histo_dur".into(),
1256 description: "a duration histogram".into(),
1257 unit: "seconds".into(),
1258 });
1259 let up_down_ctr = call_buffer.up_down_counter(MetricParameters {
1260 name: "up_down_ctr".into(),
1261 description: "an up down counter".into(),
1262 unit: "widgets".into(),
1263 });
1264 let attrs_1 = call_buffer.new_attributes(NewAttributes {
1265 attributes: vec![MetricKeyValue::new("hi", "yo")],
1266 });
1267 let attrs_2 = call_buffer.new_attributes(NewAttributes {
1268 attributes: vec![MetricKeyValue::new("run", "fast")],
1269 });
1270 ctr.add(1, &attrs_1);
1271 histo.record(2, &attrs_1);
1272 gauge.record(3, &attrs_2);
1273 histo_dur.record(Duration::from_secs_f64(1.2), &attrs_1);
1274 up_down_ctr.add(-3, &attrs_2);
1275
1276 let mut calls = call_buffer.retrieve();
1277 calls.reverse();
1278 let ctr_1 = assert_matches!(
1279 calls.pop(),
1280 Some(MetricEvent::Create {
1281 params,
1282 populate_into,
1283 kind: MetricKind::Counter
1284 })
1285 if params.name == "ctr"
1286 => populate_into
1287 );
1288 ctr_1.set(Arc::new(DummyInstrumentRef(1))).unwrap();
1289 let hist_2 = assert_matches!(
1290 calls.pop(),
1291 Some(MetricEvent::Create {
1292 params,
1293 populate_into,
1294 kind: MetricKind::Histogram
1295 })
1296 if params.name == "histo"
1297 => populate_into
1298 );
1299 hist_2.set(Arc::new(DummyInstrumentRef(2))).unwrap();
1300 let gauge_3 = assert_matches!(
1301 calls.pop(),
1302 Some(MetricEvent::Create {
1303 params,
1304 populate_into,
1305 kind: MetricKind::Gauge
1306 })
1307 if params.name == "gauge"
1308 => populate_into
1309 );
1310 gauge_3.set(Arc::new(DummyInstrumentRef(3))).unwrap();
1311 let hist_4 = assert_matches!(
1312 calls.pop(),
1313 Some(MetricEvent::Create {
1314 params,
1315 populate_into,
1316 kind: MetricKind::HistogramDuration
1317 })
1318 if params.name == "histo_dur"
1319 => populate_into
1320 );
1321 hist_4.set(Arc::new(DummyInstrumentRef(4))).unwrap();
1322 let up_down_5 = assert_matches!(
1323 calls.pop(),
1324 Some(MetricEvent::Create {
1325 params,
1326 populate_into,
1327 kind: MetricKind::UpDownCounter
1328 })
1329 if params.name == "up_down_ctr"
1330 => populate_into
1331 );
1332 up_down_5.set(Arc::new(DummyInstrumentRef(5))).unwrap();
1333 let a1 = assert_matches!(
1334 calls.pop(),
1335 Some(MetricEvent::CreateAttributes {
1336 populate_into,
1337 append_from: None,
1338 attributes
1339 })
1340 if attributes[0].key == "hi"
1341 => populate_into
1342 );
1343 a1.set(Arc::new(DummyCustomAttrs(1))).unwrap();
1344 let a2 = assert_matches!(
1345 calls.pop(),
1346 Some(MetricEvent::CreateAttributes {
1347 populate_into,
1348 append_from: None,
1349 attributes
1350 })
1351 if attributes[0].key == "run"
1352 => populate_into
1353 );
1354 a2.set(Arc::new(DummyCustomAttrs(2))).unwrap();
1355 assert_matches!(
1356 calls.pop(),
1357 Some(MetricEvent::Update{
1358 instrument,
1359 attributes,
1360 update: MetricUpdateVal::Delta(1)
1361 })
1362 if DummyCustomAttrs::as_id(&attributes) == 1 && instrument.get().0 == 1
1363 );
1364 assert_matches!(
1365 calls.pop(),
1366 Some(MetricEvent::Update{
1367 instrument,
1368 attributes,
1369 update: MetricUpdateVal::Value(2)
1370 })
1371 if DummyCustomAttrs::as_id(&attributes) == 1 && instrument.get().0 == 2
1372 );
1373 assert_matches!(
1374 calls.pop(),
1375 Some(MetricEvent::Update{
1376 instrument,
1377 attributes,
1378 update: MetricUpdateVal::Value(3)
1379 })
1380 if DummyCustomAttrs::as_id(&attributes) == 2 && instrument.get().0 == 3
1381 );
1382 assert_matches!(
1383 calls.pop(),
1384 Some(MetricEvent::Update{
1385 instrument,
1386 attributes,
1387 update: MetricUpdateVal::Duration(d)
1388 })
1389 if DummyCustomAttrs::as_id(&attributes) == 1 && instrument.get().0 == 4
1390 && d == Duration::from_secs_f64(1.2)
1391 );
1392 assert_matches!(
1393 calls.pop(),
1394 Some(MetricEvent::Update{
1395 instrument,
1396 attributes,
1397 update: MetricUpdateVal::SignedDelta(-3)
1398 })
1399 if DummyCustomAttrs::as_id(&attributes) == 2 && instrument.get().0 == 5
1400 );
1401 }
1402}