1use std::{future::Future, time::Instant};
2
3use tonic::{Code, Status};
4use tracing::Instrument;
5
6use crate::observability::{CorrelationContext, MetricsRegistry, RpcMetricLabels};
7
8pub async fn observe_rpc_unary<T, Fut>(
10 metrics: Option<&MetricsRegistry>,
11 service: &str,
12 method: &str,
13 request_id: Option<&str>,
14 future: Fut,
15) -> Result<T, Status>
16where
17 Fut: Future<Output = Result<T, Status>>,
18{
19 observe_rpc_unary_with_context(metrics, service, method, request_id, None, future).await
20}
21
22pub async fn observe_rpc_unary_with_context<T, Fut>(
24 metrics: Option<&MetricsRegistry>,
25 service: &str,
26 method: &str,
27 request_id: Option<&str>,
28 traceparent: Option<&str>,
29 future: Fut,
30) -> Result<T, Status>
31where
32 Fut: Future<Output = Result<T, Status>>,
33{
34 let started = Instant::now();
35 let correlation = CorrelationContext::from_rpc_parts(service, method, request_id, traceparent);
36 let span = tracing::info_span!(
37 "rs_zero.rpc.unary",
38 rpc.service = service,
39 rpc.method = method,
40 service = service,
41 transport = "grpc",
42 route = method,
43 method = method,
44 request_id = correlation.request_id().unwrap_or(""),
45 traceparent = correlation.traceparent().unwrap_or(""),
46 code = tracing::field::Empty,
47 trace_id = tracing::field::Empty,
48 span_id = tracing::field::Empty
49 );
50 #[cfg(feature = "otlp")]
51 if let Some(context) =
52 traceparent.and_then(crate::observability::opentelemetry_context_from_traceparent)
53 {
54 use tracing_opentelemetry::OpenTelemetrySpanExt;
55 let _ = span.set_parent(context);
56 }
57 let result = future.instrument(span.clone()).await;
58 let code = result.as_ref().err().map(Status::code).unwrap_or(Code::Ok);
59 let code_label = format!("{code:?}");
60 span.record("code", tracing::field::display(&code_label));
61 let correlation = correlation.with_status(code_label.clone());
62 if let Some(trace_id) = correlation.trace_id() {
63 span.record("trace_id", tracing::field::display(trace_id));
64 }
65 if let Some(span_id) = correlation.span_id() {
66 span.record("span_id", tracing::field::display(span_id));
67 }
68 let duration = started.elapsed();
69 tracing::info!(
70 rpc.service = service,
71 rpc.method = method,
72 service = service,
73 transport = "grpc",
74 route = method,
75 method = method,
76 request_id = correlation.request_id().unwrap_or(""),
77 traceparent = correlation.traceparent().unwrap_or(""),
78 trace_id = correlation.trace_id().unwrap_or(""),
79 span_id = correlation.span_id().unwrap_or(""),
80 code = %code_label,
81 duration_ms = duration.as_secs_f64() * 1000.0,
82 "rpc unary observed"
83 );
84 if let Some(metrics) = metrics {
85 metrics.record_rpc_request(RpcMetricLabels::new(service, method, code_label), duration);
86 }
87 result
88}
89
#[cfg(feature = "rpc")]
/// Observes a unary RPC call, taking correlation data from gRPC metadata.
///
/// The request id and traceparent are obtained from `metadata` through
/// `request_id_from_metadata` and `traceparent_from_metadata`, then the call
/// is delegated to [`observe_rpc_unary_with_context`].
///
/// # Errors
///
/// Returns whatever `future` resolves to; an `Err(Status)` is passed through
/// unchanged.
pub async fn observe_rpc_unary_with_metadata<T, Fut>(
    metrics: Option<&MetricsRegistry>,
    service: &str,
    method: &str,
    metadata: &tonic::metadata::MetadataMap,
    future: Fut,
) -> Result<T, Status>
where
    Fut: Future<Output = Result<T, Status>>,
{
    // Tuple elements are evaluated left to right: request id first, then
    // traceparent.
    let (request_id, traceparent) = (
        crate::observability::request_id_from_metadata(metadata),
        crate::observability::traceparent_from_metadata(metadata),
    );
    let observed = observe_rpc_unary_with_context(
        metrics,
        service,
        method,
        request_id.as_deref(),
        traceparent.as_deref(),
        future,
    );
    observed.await
}
114
#[cfg(feature = "rpc")]
/// Logs a streaming RPC snapshot and records it in the metrics registry.
///
/// The status label is the `Debug` rendering of `snapshot.code`, falling back
/// to [`Code::Ok`] when the snapshot carries no code. An `"rpc stream
/// observed"` info event is emitted with the service, method, code and the
/// sent/received message counts. When `metrics` is `Some`, the request is
/// recorded with `snapshot.duration`.
pub fn record_rpc_streaming_snapshot(
    metrics: Option<&MetricsRegistry>,
    service: &str,
    method: &str,
    snapshot: &crate::rpc::streaming::RpcStreamingSnapshot,
) {
    let status_label = format!("{:?}", snapshot.code.unwrap_or(Code::Ok));

    tracing::info!(
        rpc.service = service,
        rpc.method = method,
        code = %status_label,
        sent_messages = snapshot.sent_messages,
        received_messages = snapshot.received_messages,
        "rpc stream observed"
    );

    if let Some(registry) = metrics {
        let labels = RpcMetricLabels::new(service, method, status_label);
        registry.record_rpc_request(labels, snapshot.duration);
    }
}