rs-zero 0.2.6

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use std::{future::Future, time::Instant};

use tonic::{Code, Status};
use tracing::Instrument;

use crate::observability::{CorrelationContext, MetricsRegistry, RpcMetricLabels};

/// Observes one gRPC unary call without depending on generated service types.
///
/// Convenience entry point for callers that have no `traceparent` value to pass:
/// every argument is forwarded to [`observe_rpc_unary_with_context`] with the
/// trace context set to `None`, and that call's result is returned unchanged.
pub async fn observe_rpc_unary<T, Fut>(
    metrics: Option<&MetricsRegistry>,
    service: &str,
    method: &str,
    request_id: Option<&str>,
    future: Fut,
) -> Result<T, Status>
where
    Fut: Future<Output = Result<T, Status>>,
{
    // No trace context is available through this entry point.
    let no_traceparent: Option<&str> = None;
    let observed = observe_rpc_unary_with_context(
        metrics,
        service,
        method,
        request_id,
        no_traceparent,
        future,
    );
    observed.await
}

/// Observes one gRPC unary call with explicit request and trace context.
pub async fn observe_rpc_unary_with_context<T, Fut>(
    metrics: Option<&MetricsRegistry>,
    service: &str,
    method: &str,
    request_id: Option<&str>,
    traceparent: Option<&str>,
    future: Fut,
) -> Result<T, Status>
where
    Fut: Future<Output = Result<T, Status>>,
{
    let started = Instant::now();
    let correlation = CorrelationContext::from_rpc_parts(service, method, request_id, traceparent);
    let span = tracing::info_span!(
        "rs_zero.rpc.unary",
        rpc.service = service,
        rpc.method = method,
        service = service,
        transport = "grpc",
        route = method,
        method = method,
        request_id = correlation.request_id().unwrap_or(""),
        traceparent = correlation.traceparent().unwrap_or(""),
        code = tracing::field::Empty,
        trace_id = tracing::field::Empty,
        span_id = tracing::field::Empty
    );
    #[cfg(feature = "otlp")]
    if let Some(context) =
        traceparent.and_then(crate::observability::opentelemetry_context_from_traceparent)
    {
        use tracing_opentelemetry::OpenTelemetrySpanExt;
        let _ = span.set_parent(context);
    }
    let result = future.instrument(span.clone()).await;
    let code = result.as_ref().err().map(Status::code).unwrap_or(Code::Ok);
    let code_label = format!("{code:?}");
    span.record("code", tracing::field::display(&code_label));
    let correlation = correlation.with_status(code_label.clone());
    if let Some(trace_id) = correlation.trace_id() {
        span.record("trace_id", tracing::field::display(trace_id));
    }
    if let Some(span_id) = correlation.span_id() {
        span.record("span_id", tracing::field::display(span_id));
    }
    let duration = started.elapsed();
    tracing::info!(
        rpc.service = service,
        rpc.method = method,
        service = service,
        transport = "grpc",
        route = method,
        method = method,
        request_id = correlation.request_id().unwrap_or(""),
        traceparent = correlation.traceparent().unwrap_or(""),
        trace_id = correlation.trace_id().unwrap_or(""),
        span_id = correlation.span_id().unwrap_or(""),
        code = %code_label,
        duration_ms = duration.as_secs_f64() * 1000.0,
        "rpc unary observed"
    );
    if let Some(metrics) = metrics {
        metrics.record_rpc_request(RpcMetricLabels::new(service, method, code_label), duration);
    }
    result
}

/// Observes one gRPC unary call and derives correlation fields from tonic metadata.
///
/// The request id and traceparent are read from `metadata` through the crate's
/// `request_id_from_metadata` and `traceparent_from_metadata` helpers, then handed,
/// together with the remaining arguments, to [`observe_rpc_unary_with_context`].
/// The result of that call is returned unchanged.
///
/// Only compiled when the `rpc` feature is enabled.
#[cfg(feature = "rpc")]
pub async fn observe_rpc_unary_with_metadata<T, Fut>(
    metrics: Option<&MetricsRegistry>,
    service: &str,
    method: &str,
    metadata: &tonic::metadata::MetadataMap,
    future: Fut,
) -> Result<T, Status>
where
    Fut: Future<Output = Result<T, Status>>,
{
    // Keep the extracted values alive for the whole call; only borrowed views are
    // passed on.
    let extracted_request_id = crate::observability::request_id_from_metadata(metadata);
    let extracted_traceparent = crate::observability::traceparent_from_metadata(metadata);
    let request_id = extracted_request_id.as_deref();
    let traceparent = extracted_traceparent.as_deref();

    observe_rpc_unary_with_context(metrics, service, method, request_id, traceparent, future).await
}

/// Records metrics and a log event for one completed gRPC stream snapshot.
///
/// The status-code label is the `Debug` form of `snapshot.code`, falling back to
/// [`Code::Ok`] when the snapshot carries no code. An `info` event is always emitted
/// with the service, method, code label and the snapshot's sent/received message
/// counts. When `metrics` is `Some`, the request is also reported via
/// `record_rpc_request` using `snapshot.duration`.
///
/// Only compiled when the `rpc` feature is enabled.
#[cfg(feature = "rpc")]
pub fn record_rpc_streaming_snapshot(
    metrics: Option<&MetricsRegistry>,
    service: &str,
    method: &str,
    snapshot: &crate::rpc::streaming::RpcStreamingSnapshot,
) {
    let code_label = format!("{:?}", snapshot.code.unwrap_or(Code::Ok));

    tracing::info!(
        rpc.service = service,
        rpc.method = method,
        code = %code_label,
        sent_messages = snapshot.sent_messages,
        received_messages = snapshot.received_messages,
        "rpc stream observed"
    );

    // Without a registry there is nothing further to record.
    let Some(registry) = metrics else {
        return;
    };
    let labels = RpcMetricLabels::new(service, method, code_label);
    registry.record_rpc_request(labels, snapshot.duration);
}