Skip to main content

rs_zero/observability/
rpc.rs

1use std::{future::Future, time::Instant};
2
3use tonic::{Code, Status};
4use tracing::Instrument;
5
6use crate::observability::{CorrelationContext, MetricsRegistry, RpcMetricLabels};
7
8/// Observes one gRPC unary call without depending on generated service types.
9pub async fn observe_rpc_unary<T, Fut>(
10    metrics: Option<&MetricsRegistry>,
11    service: &str,
12    method: &str,
13    request_id: Option<&str>,
14    future: Fut,
15) -> Result<T, Status>
16where
17    Fut: Future<Output = Result<T, Status>>,
18{
19    observe_rpc_unary_with_context(metrics, service, method, request_id, None, future).await
20}
21
22/// Observes one gRPC unary call with explicit request and trace context.
23pub async fn observe_rpc_unary_with_context<T, Fut>(
24    metrics: Option<&MetricsRegistry>,
25    service: &str,
26    method: &str,
27    request_id: Option<&str>,
28    traceparent: Option<&str>,
29    future: Fut,
30) -> Result<T, Status>
31where
32    Fut: Future<Output = Result<T, Status>>,
33{
34    let started = Instant::now();
35    let correlation = CorrelationContext::from_rpc_parts(service, method, request_id, traceparent);
36    let span = tracing::info_span!(
37        "rs_zero.rpc.unary",
38        rpc.service = service,
39        rpc.method = method,
40        service = service,
41        transport = "grpc",
42        route = method,
43        method = method,
44        request_id = correlation.request_id().unwrap_or(""),
45        traceparent = correlation.traceparent().unwrap_or(""),
46        code = tracing::field::Empty,
47        trace_id = tracing::field::Empty,
48        span_id = tracing::field::Empty
49    );
50    #[cfg(feature = "otlp")]
51    if let Some(context) =
52        traceparent.and_then(crate::observability::opentelemetry_context_from_traceparent)
53    {
54        use tracing_opentelemetry::OpenTelemetrySpanExt;
55        let _ = span.set_parent(context);
56    }
57    let result = future.instrument(span.clone()).await;
58    let code = result.as_ref().err().map(Status::code).unwrap_or(Code::Ok);
59    let code_label = format!("{code:?}");
60    span.record("code", tracing::field::display(&code_label));
61    let correlation = correlation.with_status(code_label.clone());
62    if let Some(trace_id) = correlation.trace_id() {
63        span.record("trace_id", tracing::field::display(trace_id));
64    }
65    if let Some(span_id) = correlation.span_id() {
66        span.record("span_id", tracing::field::display(span_id));
67    }
68    let duration = started.elapsed();
69    tracing::info!(
70        rpc.service = service,
71        rpc.method = method,
72        service = service,
73        transport = "grpc",
74        route = method,
75        method = method,
76        request_id = correlation.request_id().unwrap_or(""),
77        traceparent = correlation.traceparent().unwrap_or(""),
78        trace_id = correlation.trace_id().unwrap_or(""),
79        span_id = correlation.span_id().unwrap_or(""),
80        code = %code_label,
81        duration_ms = duration.as_secs_f64() * 1000.0,
82        "rpc unary observed"
83    );
84    if let Some(metrics) = metrics {
85        metrics.record_rpc_request(RpcMetricLabels::new(service, method, code_label), duration);
86    }
87    result
88}
89
/// Observes a single unary gRPC call, taking correlation fields from tonic metadata.
///
/// The request id and traceparent are extracted from `metadata` via
/// `request_id_from_metadata` and `traceparent_from_metadata`, then handed to
/// [`observe_rpc_unary_with_context`] together with the remaining arguments.
///
/// Returns whatever `future` resolves to.
#[cfg(feature = "rpc")]
pub async fn observe_rpc_unary_with_metadata<T, Fut>(
    metrics: Option<&MetricsRegistry>,
    service: &str,
    method: &str,
    metadata: &tonic::metadata::MetadataMap,
    future: Fut,
) -> Result<T, Status>
where
    Fut: Future<Output = Result<T, Status>>,
{
    let extracted_request_id = crate::observability::request_id_from_metadata(metadata);
    let extracted_traceparent = crate::observability::traceparent_from_metadata(metadata);

    // The extracted values are kept alive in locals and lent out as `&str`
    // for the duration of the observed call.
    let observed = observe_rpc_unary_with_context(
        metrics,
        service,
        method,
        extracted_request_id.as_deref(),
        extracted_traceparent.as_deref(),
        future,
    );
    observed.await
}
114
/// Emits a log event and, optionally, a metric for one completed gRPC stream snapshot.
///
/// The snapshot's status code is used when present; a snapshot without a code
/// is reported as `Code::Ok`. An `info` event ("rpc stream observed") is
/// written with the service, method, code and sent/received message counts.
/// When `metrics` is `Some`, the request is recorded on the registry using
/// the snapshot's `duration`.
#[cfg(feature = "rpc")]
pub fn record_rpc_streaming_snapshot(
    metrics: Option<&MetricsRegistry>,
    service: &str,
    method: &str,
    snapshot: &crate::rpc::streaming::RpcStreamingSnapshot,
) {
    let status_code = match snapshot.code {
        Some(code) => code,
        None => Code::Ok,
    };
    let code_label = format!("{status_code:?}");

    tracing::info!(
        rpc.service = service,
        rpc.method = method,
        code = %code_label,
        sent_messages = snapshot.sent_messages,
        received_messages = snapshot.received_messages,
        "rpc stream observed"
    );

    if let Some(registry) = metrics {
        let labels = RpcMetricLabels::new(service, method, code_label);
        registry.record_rpc_request(labels, snapshot.duration);
    }
}