use std::{future::Future, time::Instant};
use tonic::{Code, Status};
use tracing::Instrument;
use crate::observability::{CorrelationContext, MetricsRegistry, RpcMetricLabels};
/// Observes a unary RPC call that carries no inbound `traceparent`.
///
/// This is a convenience entry point: it forwards every argument to
/// [`observe_rpc_unary_with_context`] and supplies `None` for the
/// traceparent, then returns whatever that call resolves to.
///
/// * `metrics` - optional registry forwarded unchanged.
/// * `service` / `method` - RPC identifiers forwarded unchanged.
/// * `request_id` - optional request identifier forwarded unchanged.
/// * `future` - the RPC handler future to await.
pub async fn observe_rpc_unary<T, Fut>(
    metrics: Option<&MetricsRegistry>,
    service: &str,
    method: &str,
    request_id: Option<&str>,
    future: Fut,
) -> Result<T, Status>
where
    Fut: Future<Output = Result<T, Status>>,
{
    // This entry point has no trace context to propagate.
    let traceparent: Option<&str> = None;
    let observed =
        observe_rpc_unary_with_context(metrics, service, method, request_id, traceparent, future);
    observed.await
}
pub async fn observe_rpc_unary_with_context<T, Fut>(
metrics: Option<&MetricsRegistry>,
service: &str,
method: &str,
request_id: Option<&str>,
traceparent: Option<&str>,
future: Fut,
) -> Result<T, Status>
where
Fut: Future<Output = Result<T, Status>>,
{
let started = Instant::now();
let correlation = CorrelationContext::from_rpc_parts(service, method, request_id, traceparent);
let span = tracing::info_span!(
"rs_zero.rpc.unary",
rpc.service = service,
rpc.method = method,
service = service,
transport = "grpc",
route = method,
method = method,
request_id = correlation.request_id().unwrap_or(""),
traceparent = correlation.traceparent().unwrap_or(""),
code = tracing::field::Empty,
trace_id = tracing::field::Empty,
span_id = tracing::field::Empty
);
#[cfg(feature = "otlp")]
if let Some(context) =
traceparent.and_then(crate::observability::opentelemetry_context_from_traceparent)
{
use tracing_opentelemetry::OpenTelemetrySpanExt;
let _ = span.set_parent(context);
}
let result = future.instrument(span.clone()).await;
let code = result.as_ref().err().map(Status::code).unwrap_or(Code::Ok);
let code_label = format!("{code:?}");
span.record("code", tracing::field::display(&code_label));
let correlation = correlation.with_status(code_label.clone());
if let Some(trace_id) = correlation.trace_id() {
span.record("trace_id", tracing::field::display(trace_id));
}
if let Some(span_id) = correlation.span_id() {
span.record("span_id", tracing::field::display(span_id));
}
let duration = started.elapsed();
tracing::info!(
rpc.service = service,
rpc.method = method,
service = service,
transport = "grpc",
route = method,
method = method,
request_id = correlation.request_id().unwrap_or(""),
traceparent = correlation.traceparent().unwrap_or(""),
trace_id = correlation.trace_id().unwrap_or(""),
span_id = correlation.span_id().unwrap_or(""),
code = %code_label,
duration_ms = duration.as_secs_f64() * 1000.0,
"rpc unary observed"
);
if let Some(metrics) = metrics {
metrics.record_rpc_request(RpcMetricLabels::new(service, method, code_label), duration);
}
result
}
/// Observes a unary RPC call, taking the request id and traceparent from
/// the request's gRPC metadata.
///
/// Both values are extracted with the `crate::observability` metadata
/// helpers and then handed, borrowed, to
/// [`observe_rpc_unary_with_context`] together with the remaining
/// arguments. The result of that call is returned as-is.
#[cfg(feature = "rpc")]
pub async fn observe_rpc_unary_with_metadata<T, Fut>(
    metrics: Option<&MetricsRegistry>,
    service: &str,
    method: &str,
    metadata: &tonic::metadata::MetadataMap,
    future: Fut,
) -> Result<T, Status>
where
    Fut: Future<Output = Result<T, Status>>,
{
    // The extracted values are kept alive in locals so they can be lent
    // out as borrowed options for the duration of the observed call.
    let extracted_request_id = crate::observability::request_id_from_metadata(metadata);
    let extracted_traceparent = crate::observability::traceparent_from_metadata(metadata);

    let request_id = extracted_request_id.as_deref();
    let traceparent = extracted_traceparent.as_deref();

    observe_rpc_unary_with_context(metrics, service, method, request_id, traceparent, future).await
}
/// Reports a finished streaming RPC from its snapshot.
///
/// Emits an info-level `"rpc stream observed"` event carrying the service,
/// method, status code label and the sent/received message counts, then
/// records the request on `metrics` (when a registry is supplied) using
/// the snapshot's duration.
///
/// A snapshot without a status code is reported as `Code::Ok`.
#[cfg(feature = "rpc")]
pub fn record_rpc_streaming_snapshot(
    metrics: Option<&MetricsRegistry>,
    service: &str,
    method: &str,
    snapshot: &crate::rpc::streaming::RpcStreamingSnapshot,
) {
    let status_code = match snapshot.code {
        Some(code) => code,
        None => Code::Ok,
    };
    // The label is the `Debug` rendering of the status code.
    let status_label = format!("{status_code:?}");

    tracing::info!(
        rpc.service = service,
        rpc.method = method,
        code = %status_label,
        sent_messages = snapshot.sent_messages,
        received_messages = snapshot.received_messages,
        "rpc stream observed"
    );

    if let Some(registry) = metrics {
        let labels = RpcMetricLabels::new(service, method, status_label);
        registry.record_rpc_request(labels, snapshot.duration);
    }
}