1use crate::{AttachMetricLabels, CallType, callback_based, dbg_panic};
2use futures_util::{
3 FutureExt, TryFutureExt,
4 future::{BoxFuture, Either},
5};
6use std::{
7 fmt,
8 task::{Context, Poll},
9 time::{Duration, Instant},
10};
11use temporalio_common::telemetry::{
12 TaskQueueLabelStrategy,
13 metrics::{
14 Counter, CounterBase, HistogramDuration, HistogramDurationBase, MetricAttributable,
15 MetricAttributes, MetricKeyValue, MetricParameters, TemporalMeter,
16 },
17};
18use tonic::{Code, body::Body, transport::Channel};
19use tower::Service;
20
/// Metric name of the latency histogram used for regular (non-long-poll) requests.
pub static REQUEST_LATENCY_HISTOGRAM_NAME: &str = "request_latency";
/// Metric name of the latency histogram used for long-poll requests.
pub static LONG_REQUEST_LATENCY_HISTOGRAM_NAME: &str = "long_request_latency";
25
/// State needed to record client request metrics: the meter, the instruments built from it,
/// and a flag choosing between the regular and the long-poll instruments.
///
/// The custom `Debug` output only prints `poll_is_long`.
#[derive(Clone, derive_more::Debug)]
#[debug("MetricsContext {{ poll_is_long: {poll_is_long} }}")]
pub(crate) struct MetricsContext {
    /// Meter the instruments were created from; also the holder of the default attributes.
    meter: TemporalMeter,
    /// When `true`, recordings go to the `long_*` instruments instead of the regular ones.
    poll_is_long: bool,
    /// Counters and histograms created once in `MetricsContext::new`.
    instruments: Instruments,
}
/// The instruments a `MetricsContext` records into.
#[derive(Clone)]
struct Instruments {
    /// Counter named "request"; bumped for regular calls.
    svc_request: Counter,
    /// Counter named "request_failure"; bumped for failed regular calls.
    svc_request_failed: Counter,
    /// Counter named "long_request"; bumped for long-poll calls.
    long_svc_request: Counter,
    /// Counter named "long_request_failure"; bumped for failed long-poll calls.
    long_svc_request_failed: Counter,

    /// Latency histogram for regular calls.
    svc_request_latency: HistogramDuration,
    /// Latency histogram for long-poll calls.
    long_svc_request_latency: HistogramDuration,
}
44
45impl MetricsContext {
46 pub(crate) fn new(tm: TemporalMeter) -> Self {
47 let instruments = Instruments {
48 svc_request: tm.counter(MetricParameters {
49 name: "request".into(),
50 description: "Count of client request successes by rpc name".into(),
51 unit: "".into(),
52 }),
53 svc_request_failed: tm.counter(MetricParameters {
54 name: "request_failure".into(),
55 description: "Count of client request failures by rpc name".into(),
56 unit: "".into(),
57 }),
58 long_svc_request: tm.counter(MetricParameters {
59 name: "long_request".into(),
60 description: "Count of long-poll request successes by rpc name".into(),
61 unit: "".into(),
62 }),
63 long_svc_request_failed: tm.counter(MetricParameters {
64 name: "long_request_failure".into(),
65 description: "Count of long-poll request failures by rpc name".into(),
66 unit: "".into(),
67 }),
68 svc_request_latency: tm.histogram_duration(MetricParameters {
69 name: REQUEST_LATENCY_HISTOGRAM_NAME.into(),
70 unit: "duration".into(),
71 description: "Histogram of client request latencies".into(),
72 }),
73 long_svc_request_latency: tm.histogram_duration(MetricParameters {
74 name: LONG_REQUEST_LATENCY_HISTOGRAM_NAME.into(),
75 unit: "duration".into(),
76 description: "Histogram of client long-poll request latencies".into(),
77 }),
78 };
79 Self {
80 poll_is_long: false,
81 instruments,
82 meter: tm,
83 }
84 }
85
86 pub(crate) fn with_new_attrs(&mut self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) {
88 self.meter.merge_attributes(new_kvs.into());
89
90 let _ = self
91 .instruments
92 .svc_request
93 .with_attributes(self.meter.get_default_attributes())
94 .and_then(|v| {
95 self.instruments.svc_request = v;
96 self.instruments
97 .long_svc_request
98 .with_attributes(self.meter.get_default_attributes())
99 })
100 .and_then(|v| {
101 self.instruments.long_svc_request = v;
102 self.instruments
103 .svc_request_latency
104 .with_attributes(self.meter.get_default_attributes())
105 })
106 .and_then(|v| {
107 self.instruments.svc_request_latency = v;
108 self.instruments
109 .long_svc_request_latency
110 .with_attributes(self.meter.get_default_attributes())
111 })
112 .map(|v| {
113 self.instruments.long_svc_request_latency = v;
114 })
115 .inspect_err(|e| {
116 dbg_panic!("Failed to extend client metrics attributes: {:?}", e);
117 });
118 }
119
120 pub(crate) fn set_is_long_poll(&mut self) {
121 self.poll_is_long = true;
122 }
123
124 pub(crate) fn svc_request(&self) {
126 if self.poll_is_long {
127 self.instruments.long_svc_request.adds(1);
128 } else {
129 self.instruments.svc_request.adds(1);
130 }
131 }
132
133 pub(crate) fn svc_request_failed(&self, code: Option<Code>) {
135 let refme: MetricAttributes;
136 let kvs = if let Some(c) = code {
137 refme = self.meter.extend_attributes(
138 self.meter.get_default_attributes().clone(),
139 [status_code_kv(c)].into(),
140 );
141 &refme
142 } else {
143 self.meter.get_default_attributes()
144 };
145 if self.poll_is_long {
146 self.instruments.long_svc_request_failed.add(1, kvs);
147 } else {
148 self.instruments.svc_request_failed.add(1, kvs);
149 }
150 }
151
152 pub(crate) fn record_svc_req_latency(&self, dur: Duration) {
154 if self.poll_is_long {
155 self.instruments.long_svc_request_latency.records(dur);
156 } else {
157 self.instruments.svc_request_latency.records(dur);
158 }
159 }
160}
161
// Attribute keys attached to client request metrics.
/// Key for the namespace attribute.
const KEY_NAMESPACE: &str = "namespace";
/// Key for the RPC method name attribute.
const KEY_SVC_METHOD: &str = "operation";
/// Key for the task queue attribute.
const KEY_TASK_QUEUE: &str = "task_queue";
/// Key for the gRPC status code attribute recorded on failures.
const KEY_STATUS_CODE: &str = "status_code";
166
/// Builds the `namespace` metric attribute from `ns`.
pub(crate) fn namespace_kv(ns: String) -> MetricKeyValue {
    MetricKeyValue::new(KEY_NAMESPACE, ns)
}
170
/// Builds the `task_queue` metric attribute from `tq`.
pub(crate) fn task_queue_kv(tq: String) -> MetricKeyValue {
    MetricKeyValue::new(KEY_TASK_QUEUE, tq)
}
174
/// Builds the `operation` metric attribute from the RPC method name `op`.
pub(crate) fn svc_operation(op: String) -> MetricKeyValue {
    MetricKeyValue::new(KEY_SVC_METHOD, op)
}
178
179pub(crate) fn status_code_kv(code: Code) -> MetricKeyValue {
180 MetricKeyValue::new(KEY_STATUS_CODE, code_as_screaming_snake(&code))
181}
182
183fn code_as_screaming_snake(code: &Code) -> &'static str {
185 match code {
186 Code::Ok => "OK",
187 Code::Cancelled => "CANCELLED",
188 Code::Unknown => "UNKNOWN",
189 Code::InvalidArgument => "INVALID_ARGUMENT",
190 Code::DeadlineExceeded => "DEADLINE_EXCEEDED",
191 Code::NotFound => "NOT_FOUND",
192 Code::AlreadyExists => "ALREADY_EXISTS",
193 Code::PermissionDenied => "PERMISSION_DENIED",
194 Code::ResourceExhausted => "RESOURCE_EXHAUSTED",
195 Code::FailedPrecondition => "FAILED_PRECONDITION",
196 Code::Aborted => "ABORTED",
197 Code::OutOfRange => "OUT_OF_RANGE",
198 Code::Unimplemented => "UNIMPLEMENTED",
199 Code::Internal => "INTERNAL",
200 Code::Unavailable => "UNAVAILABLE",
201 Code::DataLoss => "DATA_LOSS",
202 Code::Unauthenticated => "UNAUTHENTICATED",
203 }
204}
205
/// Service wrapper that forwards each request to `inner` and records request count,
/// latency, and failure metrics around the call.
#[derive(Debug, Clone)]
pub(crate) struct GrpcMetricSvc {
    /// Transport that actually performs the call.
    pub(crate) inner: ChannelOrGrpcOverride,
    /// Metrics state, cloned for every call; with `None` nothing is recorded.
    pub(crate) metrics: Option<MetricsContext>,
    /// When `true`, failures are counted without the `status_code` attribute.
    pub(crate) disable_errcode_label: bool,
}
214
/// The transport a `GrpcMetricSvc` sends requests through.
#[derive(Clone)]
pub(crate) enum ChannelOrGrpcOverride {
    /// A regular tonic transport channel.
    Channel(Channel),
    /// A callback-based gRPC service used in place of a real channel.
    GrpcOverride(callback_based::CallbackBasedGrpcService),
}
220
221impl fmt::Debug for ChannelOrGrpcOverride {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 match self {
224 ChannelOrGrpcOverride::Channel(inner) => fmt::Debug::fmt(inner, f),
225 ChannelOrGrpcOverride::GrpcOverride(_) => f.write_str("<callback-based-grpc-service>"),
226 }
227 }
228}
229
230impl Service<http::Request<Body>> for GrpcMetricSvc {
232 type Response = http::Response<Body>;
233 type Error = Box<dyn std::error::Error + Send + Sync>;
234 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
235
236 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
237 match &mut self.inner {
238 ChannelOrGrpcOverride::Channel(inner) => inner.poll_ready(cx).map_err(Into::into),
239 ChannelOrGrpcOverride::GrpcOverride(inner) => inner.poll_ready(cx).map_err(Into::into),
240 }
241 }
242
243 fn call(&mut self, mut req: http::Request<Body>) -> Self::Future {
244 let metrics = self
245 .metrics
246 .clone()
247 .map(|mut m| {
248 if let Some(other_labels) = req.extensions_mut().remove::<AttachMetricLabels>() {
250 m.with_new_attrs(other_labels.labels);
251
252 if other_labels.normal_task_queue.is_some()
253 || other_labels.sticky_task_queue.is_some()
254 {
255 let task_queue_name = match m.meter.get_task_queue_label_strategy() {
256 TaskQueueLabelStrategy::UseNormal => other_labels.normal_task_queue,
257 TaskQueueLabelStrategy::UseNormalAndSticky => other_labels
258 .sticky_task_queue
259 .or(other_labels.normal_task_queue),
260 _ => other_labels.normal_task_queue,
261 };
262
263 if let Some(tq_name) = task_queue_name {
264 m.with_new_attrs([task_queue_kv(tq_name)]);
265 }
266 }
267 }
268 if let Some(ct) = req.extensions().get::<CallType>()
269 && ct.is_long()
270 {
271 m.set_is_long_poll();
272 }
273 m
274 })
275 .and_then(|mut metrics| {
276 req.uri().to_string().rsplit_once('/').map(|split_tup| {
278 let method_name = split_tup.1;
279 metrics.with_new_attrs([svc_operation(method_name.to_string())]);
280 metrics.svc_request();
281 metrics
282 })
283 });
284 let callfut = match &mut self.inner {
285 ChannelOrGrpcOverride::Channel(inner) => {
286 Either::Left(inner.call(req).map_err(Into::into))
287 }
288 ChannelOrGrpcOverride::GrpcOverride(inner) => {
289 Either::Right(inner.call(req).map_err(Into::into))
290 }
291 };
292 let errcode_label_disabled = self.disable_errcode_label;
293 async move {
294 let started = Instant::now();
295 let res = callfut.await;
296 if let Some(metrics) = metrics {
297 metrics.record_svc_req_latency(started.elapsed());
298 if let Ok(ref ok_res) = res
299 && let Some(number) = ok_res
300 .headers()
301 .get("grpc-status")
302 .and_then(|s| s.to_str().ok())
303 .and_then(|s| s.parse::<i32>().ok())
304 {
305 let code = Code::from(number);
306 if code != Code::Ok {
307 let code = if errcode_label_disabled {
308 None
309 } else {
310 Some(code)
311 };
312 metrics.svc_request_failed(code);
313 }
314 }
315 }
316 res
317 }
318 .boxed()
319 }
320}