use std::cell::Cell;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Once, OnceLock};
use anyhow::{Context, Result};
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::{LogExporter, SpanExporter, WithExportConfig};
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::trace::{Sampler, SdkTracerProvider};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer, fmt, registry};
use crate::config::{LogFormat, ObservabilityConfig, redact_secret};
pub mod metrics;
pub mod source_ctx;
pub mod trace_context;
pub use metrics::{NodeCtx, NodeKind, ObsHandle, init_metrics};
pub use source_ctx::{SendStopped, SourceCtx};
/// Guards the one-time initialization closure run by `init_from_config`.
static INIT: Once = Once::new();
/// Outcome of the one-time initialization: `true` once the subscriber has been
/// installed. Written with `Release` inside the `INIT` closure and read with
/// `Acquire` by `init_from_config`.
static INIT_OK: AtomicBool = AtomicBool::new(false);
/// Tracer provider handed to `install`, kept so `force_flush_traces` and
/// `shutdown_traces` can reach it later. Stays empty when no provider was built.
static TRACER_PROVIDER: OnceLock<SdkTracerProvider> = OnceLock::new();
/// Logger provider handed to `install`, kept so `force_flush_logs` and
/// `shutdown_logs` can reach it later. Stays empty when no provider was built.
static LOGGER_PROVIDER: OnceLock<SdkLoggerProvider> = OnceLock::new();
pub fn init_default_logging(default_directive: &str) -> Result<()> {
init_from_config(None, default_directive)
}
/// Builds the optional OTLP providers and installs the global tracing
/// subscriber, at most once per process.
///
/// Only the first call performs initialization; later calls report the
/// outcome recorded in `INIT_OK` without doing any work.
///
/// * `config` - observability settings; `None` means the default log format
///   and no configured level.
/// * `default_directive` - filter directive forwarded to `install` for use
///   when no level is configured.
///
/// # Errors
///
/// * The provider construction error from `init_traces` / `init_logs`, returned
///   to the caller whose invocation actually ran the initialization.
/// * A generic error when `install` reported failure, or when an earlier
///   initialization attempt already failed.
pub fn init_from_config(
    config: Option<&ObservabilityConfig>,
    default_directive: &str,
) -> Result<()> {
    // Fast path: initialization already ran, just report how it went.
    if INIT.is_completed() {
        return if INIT_OK.load(Ordering::Acquire) {
            Ok(())
        } else {
            Err(anyhow::anyhow!(
                "tracing subscriber initialization failed on a previous attempt"
            ))
        };
    }

    let format = config.map(|c| c.log_format).unwrap_or_default();
    let configured_level = config.and_then(|c| c.log_level.clone());
    // `call_once` takes a closure returning `()`, so a provider error is
    // carried out of it through this cell.
    let provider_err: Cell<Option<anyhow::Error>> = Cell::new(None);

    INIT.call_once(|| {
        // Traces first, then logs; the log provider is not attempted when the
        // tracer provider fails.
        let providers = init_traces(config)
            .and_then(|tracer| init_logs(config).map(|logger| (tracer, logger)));

        let installed = match providers {
            Ok((tracer_provider, logger_provider)) => install(
                format,
                configured_level.as_deref(),
                default_directive,
                tracer_provider,
                logger_provider,
            ),
            Err(e) => {
                provider_err.set(Some(e));
                false
            }
        };
        INIT_OK.store(installed, Ordering::Release);
    });

    match provider_err.take() {
        Some(err) => Err(err),
        None if INIT_OK.load(Ordering::Acquire) => Ok(()),
        None => Err(anyhow::anyhow!("failed to install tracing subscriber")),
    }
}
/// Assembles the subscriber stack (filter, optional OTLP trace and log layers,
/// stderr formatter) and installs it as the global default.
///
/// The filter comes from `EnvFilter::try_from_default_env()` when that
/// succeeds; otherwise from `configured_level`, falling back to
/// `default_directive`.
///
/// * `format` - selects the text or JSON stderr formatter.
/// * `configured_level` - optional filter directive from configuration.
/// * `default_directive` - filter directive used when no level is configured.
/// * `tracer_provider` / `logger_provider` - optional OTLP providers; each one
///   present adds its layer and, after a successful install, is stored in
///   `TRACER_PROVIDER` / `LOGGER_PROVIDER`.
///
/// Returns `true` when the subscriber was installed. Returns `false` when the
/// fallback filter directive does not parse or when `try_init` fails; in both
/// cases the providers are dropped instead of being stored globally.
fn install(
    format: LogFormat,
    configured_level: Option<&str>,
    default_directive: &str,
    tracer_provider: Option<SdkTracerProvider>,
    logger_provider: Option<SdkLoggerProvider>,
) -> bool {
    let filter = match EnvFilter::try_from_default_env() {
        Ok(filter) => filter,
        Err(_) => {
            let directive = configured_level.unwrap_or(default_directive);
            match EnvFilter::try_new(directive) {
                Ok(filter) => filter,
                Err(err) => {
                    // This function runs inside `Once::call_once`; panicking
                    // here would poison the `Once` and make every later
                    // initialization call panic as well. No subscriber exists
                    // yet, so the reason goes straight to stderr.
                    eprintln!("invalid log filter directive {directive:?}: {err}");
                    return false;
                }
            }
        }
    };

    let fmt_layer = match format {
        LogFormat::Text => fmt::layer().with_writer(std::io::stderr).boxed(),
        LogFormat::Json => fmt::layer().json().with_writer(std::io::stderr).boxed(),
    };

    // The layers are built from borrowed providers so ownership can be handed
    // to the statics only once installation is known to have succeeded.
    let telemetry_layer = tracer_provider.as_ref().map(|provider| {
        tracing_opentelemetry::layer()
            .with_tracer(provider.tracer("courier"))
            .boxed()
    });
    let otlp_log_layer = logger_provider
        .as_ref()
        .map(|provider| OpenTelemetryTracingBridge::new(provider).boxed());

    let installed = registry()
        .with(filter)
        .with(telemetry_layer)
        .with(otlp_log_layer)
        .with(fmt_layer)
        .try_init()
        .is_ok();

    if installed {
        // `set` can only fail if a value is already present; nothing else in
        // this file writes these cells, so the result is ignored.
        if let Some(provider) = tracer_provider {
            let _ = TRACER_PROVIDER.set(provider);
        }
        if let Some(provider) = logger_provider {
            let _ = LOGGER_PROVIDER.set(provider);
        }
    }
    installed
}
/// Normalizes an optional endpoint string.
///
/// Returns the endpoint with surrounding whitespace trimmed, or `None` when
/// the input is absent or contains only whitespace.
fn configured_endpoint(endpoint: Option<&str>) -> Option<&str> {
    endpoint
        .map(str::trim)
        .filter(|trimmed| !trimmed.is_empty())
}
/// Builds the OTLP tracer provider when a trace endpoint is configured.
///
/// Returns `Ok(None)` when `config` is `None` or when the trace endpoint is
/// absent or blank (see `configured_endpoint`).
///
/// # Errors
///
/// Returns the span exporter build error, with context naming the endpoint
/// after it has been passed through `redact_secret`.
fn init_traces(config: Option<&ObservabilityConfig>) -> Result<Option<SdkTracerProvider>> {
    let target = config.and_then(|obs| {
        configured_endpoint(obs.tracing.otlp_endpoint.as_deref()).map(|endpoint| (obs, endpoint))
    });
    let (obs, endpoint) = match target {
        Some(pair) => pair,
        None => return Ok(None),
    };

    let build_result = SpanExporter::builder()
        .with_tonic()
        .with_endpoint(endpoint)
        .build();
    let exporter = build_result.with_context(|| {
        format!(
            "failed to build OTLP span exporter for {}",
            redact_secret(endpoint)
        )
    })?;

    // Parent-based sampler wrapping a trace-ID-ratio sampler configured with
    // the ratio from the tracing settings.
    let ratio_sampler = Sampler::TraceIdRatioBased(obs.tracing.sample_ratio);
    let sampler = Sampler::ParentBased(Box::new(ratio_sampler));
    let resource = Resource::builder()
        .with_service_name(obs.service_name.clone())
        .build();

    Ok(Some(
        SdkTracerProvider::builder()
            .with_batch_exporter(exporter)
            .with_sampler(sampler)
            .with_resource(resource)
            .build(),
    ))
}
/// Builds the OTLP logger provider when a log endpoint is configured.
///
/// Returns `Ok(None)` when `config` is `None` or when the log endpoint is
/// absent or blank (see `configured_endpoint`).
///
/// # Errors
///
/// Returns the log exporter build error, with context naming the endpoint
/// after it has been passed through `redact_secret`.
fn init_logs(config: Option<&ObservabilityConfig>) -> Result<Option<SdkLoggerProvider>> {
    let target = config.and_then(|obs| {
        configured_endpoint(obs.logs.otlp_endpoint.as_deref()).map(|endpoint| (obs, endpoint))
    });
    let (obs, endpoint) = match target {
        Some(pair) => pair,
        None => return Ok(None),
    };

    let build_result = LogExporter::builder()
        .with_tonic()
        .with_endpoint(endpoint)
        .build();
    let exporter = build_result.with_context(|| {
        format!(
            "failed to build OTLP log exporter for {}",
            redact_secret(endpoint)
        )
    })?;

    let resource = Resource::builder()
        .with_service_name(obs.service_name.clone())
        .build();

    Ok(Some(
        SdkLoggerProvider::builder()
            .with_batch_exporter(exporter)
            .with_resource(resource)
            .build(),
    ))
}
/// Asks the stored tracer provider, if any, to flush.
///
/// Does nothing when `TRACER_PROVIDER` is empty. The flush result is
/// discarded.
pub(crate) fn force_flush_traces() {
    let Some(provider) = TRACER_PROVIDER.get() else {
        return;
    };
    let _ = provider.force_flush();
}
/// Shuts down the stored tracer provider, if any.
///
/// Does nothing when `TRACER_PROVIDER` is empty. The shutdown result is
/// discarded.
pub(crate) fn shutdown_traces() {
    let Some(provider) = TRACER_PROVIDER.get() else {
        return;
    };
    let _ = provider.shutdown();
}
/// Asks the stored logger provider, if any, to flush.
///
/// Does nothing when `LOGGER_PROVIDER` is empty. The flush result is
/// discarded.
pub(crate) fn force_flush_logs() {
    let Some(provider) = LOGGER_PROVIDER.get() else {
        return;
    };
    let _ = provider.force_flush();
}
/// Shuts down the stored logger provider, if any.
///
/// Does nothing when `LOGGER_PROVIDER` is empty. The shutdown result is
/// discarded.
pub(crate) fn shutdown_logs() {
    let Some(provider) = LOGGER_PROVIDER.get() else {
        return;
    };
    let _ = provider.shutdown();
}
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::{configured_endpoint, init_from_config};
    use tracing::Subscriber;
    use tracing::subscriber::with_default;
    use tracing_log::LogTracer;
    use tracing_subscriber::{EnvFilter, layer::SubscriberExt, registry::Registry};

    /// Test layer that records the metadata target of every event it sees.
    #[derive(Clone, Default)]
    struct CapturingLayer {
        events: Arc<Mutex<Vec<String>>>,
    }

    impl<S: Subscriber> tracing_subscriber::Layer<S> for CapturingLayer {
        fn on_event(
            &self,
            event: &tracing::Event<'_>,
            _ctx: tracing_subscriber::layer::Context<'_, S>,
        ) {
            let target = event.metadata().target().to_string();
            self.events.lock().unwrap().push(target);
        }
    }

    /// A multi-directive filter string keeps both directives when rendered.
    #[test]
    fn rust_log_directive_parses_through_env_filter() {
        let rendered = EnvFilter::new("courier=debug,hyper=warn").to_string();
        for directive in ["courier=debug", "hyper=warn"] {
            assert!(rendered.contains(directive), "got: {rendered}");
        }
    }

    /// Missing or whitespace-only endpoints are treated as unconfigured, and
    /// real endpoints come back trimmed.
    #[test]
    fn empty_otlp_endpoints_are_disabled() {
        for disabled in [None, Some(""), Some(" \t\n")] {
            assert_eq!(configured_endpoint(disabled), None);
        }
        assert_eq!(
            configured_endpoint(Some(" http://collector:4317 ")),
            Some("http://collector:4317")
        );
    }

    /// Two consecutive initializations must agree on success or failure.
    #[test]
    fn init_from_config_second_call_is_noop() {
        let config = crate::config::ObservabilityConfig::default();
        let attempt = || init_from_config(Some(&config), "off");
        let first = attempt();
        let second = attempt();
        assert_eq!(
            first.is_ok(),
            second.is_ok(),
            "init_from_config should be idempotent: first={first:?}, second={second:?}"
        );
    }

    /// A `log::` record emitted under a scoped subscriber shows up as a
    /// tracing event whose target is `"log"`.
    #[test]
    fn log_macros_flow_through_tracing_subscriber() {
        // Result ignored: presumably the logger may already have been
        // installed by another test in the same process.
        let _ = LogTracer::init();

        let capture = CapturingLayer::default();
        let seen = Arc::clone(&capture.events);
        with_default(Registry::default().with(capture), || {
            log::warn!(target: "courier_pr1_bridge_check", "bridge ok");
        });

        let captured = seen.lock().unwrap().clone();
        assert!(
            captured.iter().any(|t| t == "log"),
            "expected log:: record to reach tracing subscriber, got: {captured:?}"
        );
    }
}