pg_exporter 0.4.2

PostgreSQL metric exporter for Prometheus
Documentation
use anyhow::{Result, anyhow};
use base64::{Engine, engine::general_purpose};
use opentelemetry::propagation::TextMapCompositePropagator;
use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
use opentelemetry_otlp::{Compression, WithExportConfig, WithHttpConfig, WithTonicConfig};
use opentelemetry_sdk::{
    Resource,
    propagation::{BaggagePropagator, TraceContextPropagator},
    trace::{SdkTracerProvider, Tracer},
};
use std::{collections::HashMap, env::var, time::Duration};
use tonic::{metadata::*, transport::ClientTlsConfig};
use tracing::Level;
use tracing::debug;
use tracing_subscriber::{EnvFilter, Registry, fmt, layer::SubscriberExt};
use ulid::Ulid;

use once_cell::sync::OnceCell;
static TRACER_PROVIDER: OnceCell<SdkTracerProvider> = OnceCell::new();

fn parse_headers_env(headers_str: &str) -> HashMap<String, String> {
    headers_str
        .split(',')
        .filter_map(|pair| {
            let mut parts = pair.splitn(2, '=');
            let key = parts.next()?.trim().to_string();
            let value = parts.next()?.trim().to_string();
            Some((key, value))
        })
        .collect()
}

// Convert HashMap<String, String> into tonic::MetadataMap
// - Supports ASCII metadata (normal keys)
// - Supports binary metadata keys (ending with "-bin"), values must be base64-encoded
fn headers_to_metadata(headers: &HashMap<String, String>) -> Result<MetadataMap> {
    let mut meta = MetadataMap::with_capacity(headers.len());

    for (k, v) in headers {
        // gRPC metadata keys must be lowercase ASCII. Normalize to be safe.
        let key_str = k.to_ascii_lowercase();

        if key_str.ends_with("-bin") {
            // Binary metadata: value must be bytes. Expect base64 in env, decode here.
            let bytes = general_purpose::STANDARD
                .decode(v.as_bytes())
                .map_err(|e| anyhow!("failed to base64-decode value for key {}: {}", key_str, e))?;

            // Build an owned binary metadata key (no 'static required)
            let key = MetadataKey::<Binary>::from_bytes(key_str.as_bytes())
                .map_err(|e| anyhow!("invalid binary metadata key {}: {}", key_str, e))?;

            let val = MetadataValue::from_bytes(&bytes);
            meta.insert_bin(key, val);
        } else {
            // ASCII metadata
            let key = MetadataKey::<Ascii>::from_bytes(key_str.as_bytes())
                .map_err(|e| anyhow!("invalid ASCII metadata key {}: {}", key_str, e))?;

            let val: MetadataValue<_> = v
                .parse()
                .map_err(|e| anyhow!("invalid ASCII metadata value for key {}: {}", key_str, e))?;

            meta.insert(key, val);
        }
    }

    Ok(meta)
}

fn normalize_endpoint(ep: String) -> String {
    if ep.starts_with("http://") || ep.starts_with("https://") {
        ep
    } else {
        format!("https://{}", ep.trim_end_matches('/'))
    }
}

fn init_tracer() -> Result<Tracer> {
    // Read protocol from environment (default to grpc if not set)
    let protocol = var("OTEL_EXPORTER_OTLP_PROTOCOL").unwrap_or_else(|_| "grpc".to_string());

    // Protocol-specific sensible default
    let default_ep = match protocol.as_str() {
        "http/protobuf" => "http://localhost:4318",
        _ => "http://localhost:4317",
    };

    let endpoint = var("OTEL_EXPORTER_OTLP_ENDPOINT").unwrap_or_else(|_| default_ep.to_string());
    let endpoint = normalize_endpoint(endpoint);

    let headers = var("OTEL_EXPORTER_OTLP_HEADERS")
        .ok()
        .map(|s| parse_headers_env(&s))
        .unwrap_or_default();

    let exporter = match protocol.as_str() {
        "http/protobuf" => {
            let http_client = reqwest::Client::builder()
                .timeout(Duration::from_secs(5))
                .build()?;

            let mut builder = opentelemetry_otlp::SpanExporter::builder()
                .with_http()
                .with_compression(Compression::Gzip)
                .with_http_client(http_client)
                .with_endpoint(endpoint);

            if !headers.is_empty() {
                builder = builder.with_headers(headers);
            }

            builder.with_timeout(Duration::from_secs(3)).build()?
        }
        _ => {
            // gRPC mode (default)
            let mut builder = opentelemetry_otlp::SpanExporter::builder()
                .with_tonic()
                .with_endpoint(&endpoint);

            if let Some(host) = &endpoint
                .strip_prefix("https://")
                .and_then(|s| s.split('/').next())
                .and_then(|h| h.split(':').next())
            {
                let tls = ClientTlsConfig::new()
                    .domain_name(host.to_string())
                    .with_native_roots();

                builder = builder.with_tls_config(tls);
            }

            builder = builder
                .with_compression(Compression::Gzip)
                .with_timeout(Duration::from_secs(3));

            if !headers.is_empty() {
                let metadata = headers_to_metadata(&headers)?;
                builder = builder.with_metadata(metadata);
            }

            builder.build()?
        }
    };

    // Generate service.instance.id (override via env if you want stability).
    let instance_id = var("OTEL_SERVICE_INSTANCE_ID").unwrap_or_else(|_| Ulid::new().to_string());

    let trace_provider = SdkTracerProvider::builder()
        .with_batch_exporter(exporter)
        .with_resource(
            Resource::builder_empty()
                .with_attributes(vec![
                    KeyValue::new("service.name", env!("CARGO_PKG_NAME")),
                    KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
                    KeyValue::new("service.instance.id", instance_id),
                ])
                .build(),
        )
        .build();

    let stored = trace_provider.clone();

    let _ = TRACER_PROVIDER.set(stored);

    global::set_tracer_provider(trace_provider.clone());

    global::set_text_map_propagator(TextMapCompositePropagator::new(vec![
        Box::new(TraceContextPropagator::new()),
        Box::new(BaggagePropagator::new()),
    ]));

    let tracer = trace_provider.tracer(env!("CARGO_PKG_NAME"));

    Ok(tracer)
}

/// Start the telemetry layer
/// # Errors
/// Public telemetry initialization
pub fn init(verbosity_level: Option<Level>) -> Result<()> {
    let verbosity_level = verbosity_level.unwrap_or(Level::ERROR);

    let fmt_layer = fmt::layer()
        .with_file(false)
        .with_line_number(false)
        .with_thread_ids(false)
        .with_thread_names(false)
        .with_target(false)
        .pretty();

    let filter = EnvFilter::builder()
        .with_default_directive(verbosity_level.into())
        .from_env_lossy()
        .add_directive("hyper=error".parse()?)
        .add_directive("tokio=error".parse()?)
        .add_directive("opentelemetry_sdk=warn".parse()?);

    if var("OTEL_EXPORTER_OTLP_ENDPOINT").is_ok() {
        let tracer = init_tracer()?;
        let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
        let subscriber = Registry::default()
            .with(fmt_layer)
            .with(otel_layer)
            .with(filter);
        tracing::subscriber::set_global_default(subscriber)?;
    } else {
        let subscriber = Registry::default().with(fmt_layer).with(filter);
        tracing::subscriber::set_global_default(subscriber)?;
    }
    Ok(())
}

/// Gracefully shut down tracer provider
pub fn shutdown_tracer() {
    if let Some(tp) = TRACER_PROVIDER.get() {
        debug!("shutting down tracer provider");
        let _ = tp.shutdown();
        debug!("tracer provider shutdown complete");
    }
}