pxs 0.6.3

pxs (Parallel X-Sync) - Integrity-first Rust sync/clone for large mutable datasets.
Documentation
use anyhow::Result;
use opentelemetry::{KeyValue, global, trace::TracerProvider as _};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
    Resource,
    trace::{SdkTracerProvider, Tracer},
};
use std::sync::OnceLock;
use std::time::Duration;
use tracing::Level;
use tracing_subscriber::{EnvFilter, Registry, fmt, layer::SubscriberExt};

static TRACER_PROVIDER: OnceLock<SdkTracerProvider> = OnceLock::new();

fn init_tracer() -> Result<Tracer> {
    // Only initialize OTLP if an endpoint is explicitly provided in the environment
    if std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").is_err() {
        return Err(anyhow::anyhow!("No OTLP endpoint configured"));
    }

    let tracer_provider = SdkTracerProvider::builder()
        .with_batch_exporter(
            opentelemetry_otlp::SpanExporter::builder()
                .with_tonic()
                .with_timeout(Duration::from_secs(3))
                .build()?,
        )
        .with_resource(
            Resource::builder_empty()
                .with_attributes(vec![
                    KeyValue::new("service.name", env!("CARGO_PKG_NAME")),
                    KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
                ])
                .build(),
        )
        .build();

    global::set_tracer_provider(tracer_provider.clone());
    let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME"));

    // Store the provider for later shutdown
    let _ = TRACER_PROVIDER.set(tracer_provider);

    Ok(tracer)
}

/// Start the telemetry layer
/// # Errors
/// Will return an error if the telemetry layer fails to start
pub fn init(verbosity_level: Option<Level>) -> Result<()> {
    let verbosity_level = verbosity_level.unwrap_or(Level::INFO);

    let fmt_layer = fmt::layer()
        .with_writer(std::io::stderr)
        .with_file(false)
        .with_line_number(false)
        .with_thread_ids(false)
        .with_thread_names(false)
        .with_target(false)
        .with_level(false) // Hide "INFO" prefix
        .without_time()
        .compact();

    let filter = EnvFilter::builder()
        .with_default_directive(verbosity_level.into())
        .from_env_lossy()
        .add_directive("hyper=error".parse()?)
        .add_directive("tokio=error".parse()?)
        .add_directive("reqwest=error".parse()?);

    let registry = Registry::default().with(fmt_layer).with(filter);

    if let Ok(tracer) = init_tracer() {
        let otel_tracer_layer = tracing_opentelemetry::layer().with_tracer(tracer);
        tracing::subscriber::set_global_default(registry.with(otel_tracer_layer))?;
    } else {
        tracing::subscriber::set_global_default(registry)?;
    }

    Ok(())
}

/// Shutdown the telemetry layer
pub fn shutdown() {
    if let Some(provider) = TRACER_PROVIDER.get() {
        let _ = provider.shutdown();
    }
}