rs-zero 0.2.8

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use std::{collections::BTreeMap, time::Duration};

use crate::observability::{ObservabilityError, ObservabilityResult};

/// OTLP transport protocol.
///
/// `install_otlp_tracing` matches on this value to decide which span
/// exporter to build.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OtlpProtocol {
    /// OTLP over gRPC; the exporter is built by `build_grpc_exporter`.
    Grpc,
    /// OTLP over HTTP with a binary protobuf payload.
    HttpProtobuf,
}

/// OTLP traces exporter configuration.
///
/// Pass a value through `build_otlp_trace_config` to validate it; the
/// `Default` implementation supplies a starting point that can be adjusted
/// with struct-update syntax.
#[derive(Debug, Clone, PartialEq)]
pub struct OtlpTraceConfig {
    /// Collector endpoint, e.g. `http://127.0.0.1:4317`. A value that is
    /// empty or only whitespace is rejected by `build_otlp_trace_config`.
    pub endpoint: String,
    /// Transport protocol used to reach the collector.
    pub protocol: OtlpProtocol,
    /// Request headers; sent as gRPC metadata or as HTTP headers depending
    /// on `protocol`.
    pub headers: BTreeMap<String, String>,
    /// Resource attributes, attached to the tracer provider's resource as
    /// key/value pairs.
    pub resource: BTreeMap<String, String>,
    /// Sampling ratio in `[0.0, 1.0]`. Values outside that range are clamped
    /// by `build_otlp_trace_config`.
    pub sampling_ratio: f64,
    /// Export timeout. `install_otlp_tracing` also hands it to the returned
    /// shutdown handle, which uses it as the shutdown timeout.
    pub timeout: Duration,
}

impl Default for OtlpTraceConfig {
    fn default() -> Self {
        Self {
            endpoint: "http://127.0.0.1:4317".to_string(),
            protocol: OtlpProtocol::Grpc,
            headers: BTreeMap::new(),
            resource: BTreeMap::new(),
            sampling_ratio: 1.0,
            timeout: Duration::from_secs(5),
        }
    }
}

/// Handle used to flush and shut down tracing exporters.
///
/// Without the `otlp` feature the handle only records whether an exporter
/// was installed. With the feature enabled it may additionally hold the
/// tracer provider so that `flush` and `shutdown` can act on it.
#[derive(Debug, Clone, Default)]
pub struct TraceShutdownHandle {
    /// Whether this handle represents an installed exporter.
    installed: bool,
    /// Tracer provider to flush and shut down; `None` when the handle was
    /// created without one.
    #[cfg(feature = "otlp")]
    provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
    /// Timeout handed to the provider when shutting down.
    #[cfg(feature = "otlp")]
    timeout: Duration,
}

impl TraceShutdownHandle {
    /// Returns a handle that reports no installed exporter and holds no
    /// tracer provider.
    pub fn disabled() -> Self {
        Self::without_provider(false)
    }

    /// Returns a handle that reports an installed exporter but holds no
    /// tracer provider, so flushing and shutting down are no-ops.
    pub fn installed() -> Self {
        Self::without_provider(true)
    }

    /// Reports whether the handle was created for an installed exporter.
    pub fn is_installed(&self) -> bool {
        self.installed
    }

    /// Forces the held tracer provider, if any, to flush pending spans.
    ///
    /// # Errors
    ///
    /// A flush failure is returned as `ObservabilityError::ExporterInstall`
    /// carrying the provider's error message.
    pub fn flush(&self) -> ObservabilityResult<()> {
        #[cfg(feature = "otlp")]
        if let Some(provider) = self.provider.as_ref() {
            provider.force_flush().map_err(Self::exporter_error)?;
        }
        Ok(())
    }

    /// Consumes the handle and shuts down the held tracer provider, if any,
    /// waiting at most the handle's timeout.
    ///
    /// # Errors
    ///
    /// A shutdown failure is returned as `ObservabilityError::ExporterInstall`
    /// carrying the provider's error message.
    pub fn shutdown(self) -> ObservabilityResult<()> {
        #[cfg(feature = "otlp")]
        {
            let Self {
                provider, timeout, ..
            } = self;
            if let Some(provider) = provider {
                provider
                    .shutdown_with_timeout(timeout)
                    .map_err(Self::exporter_error)?;
            }
        }
        Ok(())
    }

    /// Wraps a live tracer provider together with the timeout to apply when
    /// it is shut down.
    #[cfg(feature = "otlp")]
    fn from_provider(
        provider: opentelemetry_sdk::trace::SdkTracerProvider,
        timeout: Duration,
    ) -> Self {
        let provider = Some(provider);
        Self {
            installed: true,
            provider,
            timeout,
        }
    }

    /// Shared constructor for handles that do not hold a tracer provider.
    fn without_provider(installed: bool) -> Self {
        Self {
            installed,
            #[cfg(feature = "otlp")]
            provider: None,
            #[cfg(feature = "otlp")]
            timeout: Duration::from_secs(5),
        }
    }

    /// Converts a provider error into the crate's error type by message.
    #[cfg(feature = "otlp")]
    fn exporter_error(error: impl ToString) -> ObservabilityError {
        ObservabilityError::ExporterInstall(error.to_string())
    }
}

/// Validates and normalizes OTLP trace config.
///
/// Normalization performed:
///
/// * `endpoint` has surrounding whitespace removed, so the stored value is
///   exactly the one that passed validation.
/// * `sampling_ratio` is clamped into `[0.0, 1.0]`. A NaN ratio is replaced
///   by `0.0`, because `f64::clamp` would otherwise pass NaN through and the
///   result would violate the documented range.
///
/// All other fields are returned unchanged.
///
/// # Errors
///
/// Returns `ObservabilityError::MissingOtlpEndpoint` when the endpoint is
/// empty or consists only of whitespace.
pub fn build_otlp_trace_config(config: OtlpTraceConfig) -> ObservabilityResult<OtlpTraceConfig> {
    let endpoint = config.endpoint.trim();
    if endpoint.is_empty() {
        return Err(ObservabilityError::MissingOtlpEndpoint);
    }
    let endpoint = endpoint.to_string();

    // NaN compares false against everything, so it has to be handled before
    // clamping rather than by it.
    let sampling_ratio = if config.sampling_ratio.is_nan() {
        0.0
    } else {
        config.sampling_ratio.clamp(0.0, 1.0)
    };

    Ok(OtlpTraceConfig {
        endpoint,
        sampling_ratio,
        ..config
    })
}

/// Installs a real OTLP tracing pipeline.
///
/// The steps, in order:
///
/// 1. validate `config` with `build_otlp_trace_config`;
/// 2. build a span exporter for the configured protocol;
/// 3. build a tracer provider with a batch exporter, a parent-based
///    trace-id-ratio sampler and the configured resource attributes;
/// 4. initialise a `tracing_subscriber` registry with an env filter, a fmt
///    layer that logs span close events, and the OpenTelemetry layer;
/// 5. register the provider and a trace-context propagator globally.
///
/// An unparsable `filter` falls back to `"info"`.
///
/// # Errors
///
/// * whatever `build_otlp_trace_config` reports for an invalid config;
/// * `ObservabilityError::ExporterInstall` when the exporter cannot be built;
/// * `ObservabilityError::SubscriberAlreadyInitialized` when initialising
///   the subscriber fails.
#[cfg(feature = "otlp")]
pub fn install_otlp_tracing(
    config: OtlpTraceConfig,
    filter: String,
) -> ObservabilityResult<TraceShutdownHandle> {
    use opentelemetry::trace::TracerProvider as _;
    use opentelemetry::{KeyValue, global};
    use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithHttpConfig};
    use opentelemetry_sdk::{Resource, propagation::TraceContextPropagator, trace as sdktrace};
    use tracing_subscriber::{
        EnvFilter, fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt,
    };

    let config = build_otlp_trace_config(config)?;

    let exporter = match config.protocol {
        OtlpProtocol::Grpc => build_grpc_exporter(&config)?,
        OtlpProtocol::HttpProtobuf => {
            let built = SpanExporter::builder()
                .with_http()
                .with_endpoint(config.endpoint.clone())
                .with_protocol(Protocol::HttpBinary)
                .with_timeout(config.timeout)
                .with_headers(config.headers.clone().into_iter().collect())
                .build();
            built.map_err(|error| ObservabilityError::ExporterInstall(error.to_string()))?
        }
    };

    // Resource attributes are copied out of the config as key/value pairs.
    let attributes = config
        .resource
        .iter()
        .map(|(name, value)| KeyValue::new(name.clone(), value.clone()));
    let resource = Resource::builder_empty()
        .with_attributes(attributes)
        .build();

    let ratio_sampler = sdktrace::Sampler::TraceIdRatioBased(config.sampling_ratio);
    let sampler = sdktrace::Sampler::ParentBased(Box::new(ratio_sampler));
    let provider = sdktrace::SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .with_sampler(sampler)
        .with_resource(resource)
        .build();

    let tracer = provider.tracer("rs-zero");
    let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
    let env_filter = match EnvFilter::try_new(filter) {
        Ok(parsed) => parsed,
        Err(_) => EnvFilter::new("info"),
    };
    let fmt_layer = tracing_subscriber::fmt::layer().with_span_events(FmtSpan::CLOSE);

    // The subscriber is initialised before the globals are touched, so a
    // failed init returns without registering the provider.
    tracing_subscriber::registry()
        .with(env_filter)
        .with(fmt_layer)
        .with(otel_layer)
        .try_init()
        .map_err(|_| ObservabilityError::SubscriberAlreadyInitialized)?;

    global::set_tracer_provider(provider.clone());
    global::set_text_map_propagator(TraceContextPropagator::new());

    let handle = TraceShutdownHandle::from_provider(provider, config.timeout);
    Ok(handle)
}

/// Builds the gRPC span exporter for `config`.
///
/// Every configured header is converted into an ASCII gRPC metadata entry;
/// the endpoint and timeout are taken from `config` as-is.
///
/// # Errors
///
/// Returns `ObservabilityError::ExporterInstall` when a header name or value
/// is not valid metadata, or when the exporter itself fails to build.
#[cfg(feature = "otlp")]
fn build_grpc_exporter(
    config: &OtlpTraceConfig,
) -> ObservabilityResult<opentelemetry_otlp::SpanExporter> {
    use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig, WithTonicConfig};
    use tonic::metadata::{Ascii, MetadataKey, MetadataMap, MetadataValue};

    /// Maps any displayable failure onto the exporter-install error.
    fn install_error(error: impl ToString) -> ObservabilityError {
        ObservabilityError::ExporterInstall(error.to_string())
    }

    let mut metadata = MetadataMap::new();
    for (name, raw_value) in config.headers.iter() {
        // The key is parsed before the value, so an invalid key is reported first.
        let key = name.parse::<MetadataKey<Ascii>>().map_err(install_error)?;
        let value =
            MetadataValue::<Ascii>::try_from(raw_value.as_str()).map_err(install_error)?;
        metadata.insert(key, value);
    }

    let built = SpanExporter::builder()
        .with_tonic()
        .with_endpoint(config.endpoint.clone())
        .with_protocol(Protocol::Grpc)
        .with_timeout(config.timeout)
        .with_metadata(metadata)
        .build();
    built.map_err(install_error)
}

#[cfg(test)]
mod tests {
    use super::{OtlpTraceConfig, TraceShutdownHandle, build_otlp_trace_config};
    use crate::observability::ObservabilityError;

    /// A config with an empty endpoint is rejected with `MissingOtlpEndpoint`.
    #[test]
    fn otlp_config_requires_endpoint() {
        let without_endpoint = OtlpTraceConfig {
            endpoint: String::new(),
            ..OtlpTraceConfig::default()
        };

        let outcome = build_otlp_trace_config(without_endpoint);

        let error = outcome.expect_err("endpoint");
        assert_eq!(error, ObservabilityError::MissingOtlpEndpoint);
    }

    /// A handle created with `installed()` reports itself installed, and
    /// both flushing and shutting it down succeed.
    #[test]
    fn shutdown_handle_flushes_without_collector() {
        let handle = TraceShutdownHandle::installed();
        assert!(handle.is_installed());

        let flushed = handle.flush();
        flushed.expect("flush");

        let stopped = handle.shutdown();
        stopped.expect("shutdown");
    }
}