Skip to main content

temporalio_client/
metrics.rs

1use crate::{AttachMetricLabels, CallType, callback_based, dbg_panic};
2use futures_util::{
3    FutureExt, TryFutureExt,
4    future::{BoxFuture, Either},
5};
6use std::{
7    fmt,
8    task::{Context, Poll},
9    time::{Duration, Instant},
10};
11use temporalio_common::telemetry::{
12    TaskQueueLabelStrategy,
13    metrics::{
14        Counter, CounterBase, HistogramDuration, HistogramDurationBase, MetricAttributable,
15        MetricAttributes, MetricKeyValue, MetricParameters, TemporalMeter,
16    },
17};
18use tonic::{Code, body::Body, transport::Channel};
19use tower::Service;
20
/// Base name of the client request latency histogram (the exported name may carry a prefix).
pub static REQUEST_LATENCY_HISTOGRAM_NAME: &str = "request_latency";
/// Base name of the client long-poll latency histogram (the exported name may carry a prefix).
pub static LONG_REQUEST_LATENCY_HISTOGRAM_NAME: &str = "long_request_latency";
25
26/// Used to track context associated with metrics, and record/update them
27#[derive(Clone, derive_more::Debug)]
28#[debug("MetricsContext {{ poll_is_long: {poll_is_long} }}")]
29pub(crate) struct MetricsContext {
30    meter: TemporalMeter,
31    poll_is_long: bool,
32    instruments: Instruments,
33}
34#[derive(Clone)]
35struct Instruments {
36    svc_request: Counter,
37    svc_request_failed: Counter,
38    long_svc_request: Counter,
39    long_svc_request_failed: Counter,
40
41    svc_request_latency: HistogramDuration,
42    long_svc_request_latency: HistogramDuration,
43}
44
45impl MetricsContext {
46    pub(crate) fn new(tm: TemporalMeter) -> Self {
47        let instruments = Instruments {
48            svc_request: tm.counter(MetricParameters {
49                name: "request".into(),
50                description: "Count of client request successes by rpc name".into(),
51                unit: "".into(),
52            }),
53            svc_request_failed: tm.counter(MetricParameters {
54                name: "request_failure".into(),
55                description: "Count of client request failures by rpc name".into(),
56                unit: "".into(),
57            }),
58            long_svc_request: tm.counter(MetricParameters {
59                name: "long_request".into(),
60                description: "Count of long-poll request successes by rpc name".into(),
61                unit: "".into(),
62            }),
63            long_svc_request_failed: tm.counter(MetricParameters {
64                name: "long_request_failure".into(),
65                description: "Count of long-poll request failures by rpc name".into(),
66                unit: "".into(),
67            }),
68            svc_request_latency: tm.histogram_duration(MetricParameters {
69                name: REQUEST_LATENCY_HISTOGRAM_NAME.into(),
70                unit: "duration".into(),
71                description: "Histogram of client request latencies".into(),
72            }),
73            long_svc_request_latency: tm.histogram_duration(MetricParameters {
74                name: LONG_REQUEST_LATENCY_HISTOGRAM_NAME.into(),
75                unit: "duration".into(),
76                description: "Histogram of client long-poll request latencies".into(),
77            }),
78        };
79        Self {
80            poll_is_long: false,
81            instruments,
82            meter: tm,
83        }
84    }
85
86    /// Mutate this metrics context with new attributes
87    pub(crate) fn with_new_attrs(&mut self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) {
88        self.meter.merge_attributes(new_kvs.into());
89
90        let _ = self
91            .instruments
92            .svc_request
93            .with_attributes(self.meter.get_default_attributes())
94            .and_then(|v| {
95                self.instruments.svc_request = v;
96                self.instruments
97                    .long_svc_request
98                    .with_attributes(self.meter.get_default_attributes())
99            })
100            .and_then(|v| {
101                self.instruments.long_svc_request = v;
102                self.instruments
103                    .svc_request_latency
104                    .with_attributes(self.meter.get_default_attributes())
105            })
106            .and_then(|v| {
107                self.instruments.svc_request_latency = v;
108                self.instruments
109                    .long_svc_request_latency
110                    .with_attributes(self.meter.get_default_attributes())
111            })
112            .map(|v| {
113                self.instruments.long_svc_request_latency = v;
114            })
115            .inspect_err(|e| {
116                dbg_panic!("Failed to extend client metrics attributes: {:?}", e);
117            });
118    }
119
120    pub(crate) fn set_is_long_poll(&mut self) {
121        self.poll_is_long = true;
122    }
123
124    /// A request to the temporal service was made
125    pub(crate) fn svc_request(&self) {
126        if self.poll_is_long {
127            self.instruments.long_svc_request.adds(1);
128        } else {
129            self.instruments.svc_request.adds(1);
130        }
131    }
132
133    /// A request to the temporal service failed
134    pub(crate) fn svc_request_failed(&self, code: Option<Code>) {
135        let refme: MetricAttributes;
136        let kvs = if let Some(c) = code {
137            refme = self.meter.extend_attributes(
138                self.meter.get_default_attributes().clone(),
139                [status_code_kv(c)].into(),
140            );
141            &refme
142        } else {
143            self.meter.get_default_attributes()
144        };
145        if self.poll_is_long {
146            self.instruments.long_svc_request_failed.add(1, kvs);
147        } else {
148            self.instruments.svc_request_failed.add(1, kvs);
149        }
150    }
151
152    /// Record service request latency
153    pub(crate) fn record_svc_req_latency(&self, dur: Duration) {
154        if self.poll_is_long {
155            self.instruments.long_svc_request_latency.records(dur);
156        } else {
157            self.instruments.svc_request_latency.records(dur);
158        }
159    }
160}
161
162const KEY_NAMESPACE: &str = "namespace";
163const KEY_SVC_METHOD: &str = "operation";
164const KEY_TASK_QUEUE: &str = "task_queue";
165const KEY_STATUS_CODE: &str = "status_code";
166
167pub(crate) fn namespace_kv(ns: String) -> MetricKeyValue {
168    MetricKeyValue::new(KEY_NAMESPACE, ns)
169}
170
171pub(crate) fn task_queue_kv(tq: String) -> MetricKeyValue {
172    MetricKeyValue::new(KEY_TASK_QUEUE, tq)
173}
174
175pub(crate) fn svc_operation(op: String) -> MetricKeyValue {
176    MetricKeyValue::new(KEY_SVC_METHOD, op)
177}
178
179pub(crate) fn status_code_kv(code: Code) -> MetricKeyValue {
180    MetricKeyValue::new(KEY_STATUS_CODE, code_as_screaming_snake(&code))
181}
182
183/// This is done to match the way Java sdk labels these codes (and also matches gRPC spec)
184fn code_as_screaming_snake(code: &Code) -> &'static str {
185    match code {
186        Code::Ok => "OK",
187        Code::Cancelled => "CANCELLED",
188        Code::Unknown => "UNKNOWN",
189        Code::InvalidArgument => "INVALID_ARGUMENT",
190        Code::DeadlineExceeded => "DEADLINE_EXCEEDED",
191        Code::NotFound => "NOT_FOUND",
192        Code::AlreadyExists => "ALREADY_EXISTS",
193        Code::PermissionDenied => "PERMISSION_DENIED",
194        Code::ResourceExhausted => "RESOURCE_EXHAUSTED",
195        Code::FailedPrecondition => "FAILED_PRECONDITION",
196        Code::Aborted => "ABORTED",
197        Code::OutOfRange => "OUT_OF_RANGE",
198        Code::Unimplemented => "UNIMPLEMENTED",
199        Code::Internal => "INTERNAL",
200        Code::Unavailable => "UNAVAILABLE",
201        Code::DataLoss => "DATA_LOSS",
202        Code::Unauthenticated => "UNAUTHENTICATED",
203    }
204}
205
206/// Implements metrics functionality for gRPC (really, any http) calls
207#[derive(Debug, Clone)]
208pub(crate) struct GrpcMetricSvc {
209    pub(crate) inner: ChannelOrGrpcOverride,
210    // If set to none, metrics are a no-op
211    pub(crate) metrics: Option<MetricsContext>,
212    pub(crate) disable_errcode_label: bool,
213}
214
215#[derive(Clone)]
216pub(crate) enum ChannelOrGrpcOverride {
217    Channel(Channel),
218    GrpcOverride(callback_based::CallbackBasedGrpcService),
219}
220
221impl fmt::Debug for ChannelOrGrpcOverride {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        match self {
224            ChannelOrGrpcOverride::Channel(inner) => fmt::Debug::fmt(inner, f),
225            ChannelOrGrpcOverride::GrpcOverride(_) => f.write_str("<callback-based-grpc-service>"),
226        }
227    }
228}
229
230// TODO: Rewrite as a RawGrpcCaller implementation
231impl Service<http::Request<Body>> for GrpcMetricSvc {
232    type Response = http::Response<Body>;
233    type Error = Box<dyn std::error::Error + Send + Sync>;
234    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
235
236    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
237        match &mut self.inner {
238            ChannelOrGrpcOverride::Channel(inner) => inner.poll_ready(cx).map_err(Into::into),
239            ChannelOrGrpcOverride::GrpcOverride(inner) => inner.poll_ready(cx).map_err(Into::into),
240        }
241    }
242
243    fn call(&mut self, mut req: http::Request<Body>) -> Self::Future {
244        let metrics = self
245            .metrics
246            .clone()
247            .map(|mut m| {
248                // Attach labels from client wrapper
249                if let Some(other_labels) = req.extensions_mut().remove::<AttachMetricLabels>() {
250                    m.with_new_attrs(other_labels.labels);
251
252                    if other_labels.normal_task_queue.is_some()
253                        || other_labels.sticky_task_queue.is_some()
254                    {
255                        let task_queue_name = match m.meter.get_task_queue_label_strategy() {
256                            TaskQueueLabelStrategy::UseNormal => other_labels.normal_task_queue,
257                            TaskQueueLabelStrategy::UseNormalAndSticky => other_labels
258                                .sticky_task_queue
259                                .or(other_labels.normal_task_queue),
260                            _ => other_labels.normal_task_queue,
261                        };
262
263                        if let Some(tq_name) = task_queue_name {
264                            m.with_new_attrs([task_queue_kv(tq_name)]);
265                        }
266                    }
267                }
268                if let Some(ct) = req.extensions().get::<CallType>()
269                    && ct.is_long()
270                {
271                    m.set_is_long_poll();
272                }
273                m
274            })
275            .and_then(|mut metrics| {
276                // Attach method name label if possible
277                req.uri().to_string().rsplit_once('/').map(|split_tup| {
278                    let method_name = split_tup.1;
279                    metrics.with_new_attrs([svc_operation(method_name.to_string())]);
280                    metrics.svc_request();
281                    metrics
282                })
283            });
284        let callfut = match &mut self.inner {
285            ChannelOrGrpcOverride::Channel(inner) => {
286                Either::Left(inner.call(req).map_err(Into::into))
287            }
288            ChannelOrGrpcOverride::GrpcOverride(inner) => {
289                Either::Right(inner.call(req).map_err(Into::into))
290            }
291        };
292        let errcode_label_disabled = self.disable_errcode_label;
293        async move {
294            let started = Instant::now();
295            let res = callfut.await;
296            if let Some(metrics) = metrics {
297                metrics.record_svc_req_latency(started.elapsed());
298                if let Ok(ref ok_res) = res
299                    && let Some(number) = ok_res
300                        .headers()
301                        .get("grpc-status")
302                        .and_then(|s| s.to_str().ok())
303                        .and_then(|s| s.parse::<i32>().ok())
304                {
305                    let code = Code::from(number);
306                    if code != Code::Ok {
307                        let code = if errcode_label_disabled {
308                            None
309                        } else {
310                            Some(code)
311                        };
312                        metrics.svc_request_failed(code);
313                    }
314                }
315            }
316            res
317        }
318        .boxed()
319    }
320}