use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_http::{Bytes, HttpClient, HttpError};
use opentelemetry_otlp::{Protocol, WithExportConfig, WithHttpConfig};
use opentelemetry_sdk::logs::log_processor_with_async_runtime::BatchLogProcessor;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::runtime;
use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
use opentelemetry_sdk::trace::SdkTracerProvider;
use opentelemetry_sdk::Resource;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
use crate::config::{ObservabilityConfig, TracingConfig};
use crate::logging::{self, LogGuard};
/// Owns the telemetry pipelines installed by this module and keeps them alive.
///
/// Dropping the guard shuts down any OpenTelemetry providers it holds (see the
/// `Drop` impl). When OTLP could not be set up, the guard may instead carry the
/// console/file logging guard obtained from `logging::init_logging_inner`.
/// `Default` produces an empty guard that owns nothing.
///
/// NOTE(review): all fields are private and no public constructor is visible
/// here, so a `TelemetryProvider` implemented outside this module can only
/// return `OtlpGuard::default()` — confirm that is intended.
#[derive(Default)]
pub struct OtlpGuard {
// OTLP trace provider; `Some` only when the default provider built the pipeline.
tracer_provider: Option<SdkTracerProvider>,
// OTLP log provider; `Some` only when the default provider built the pipeline.
logger_provider: Option<SdkLoggerProvider>,
// Fallback logging guard, held purely for its drop side effects (not read
// outside tests in this file). Fields drop in declaration order after
// `Drop::drop`, so this is released after the two providers.
_fallback: Option<LogGuard>,
}
impl Drop for OtlpGuard {
fn drop(&mut self) {
if let Some(provider) = &self.tracer_provider {
let _ = provider.shutdown();
}
if let Some(provider) = &self.logger_provider {
let _ = provider.shutdown();
}
}
}
/// Pluggable strategy for bringing up telemetry.
///
/// Register an implementation with [`set_telemetry_provider`] to replace
/// [`DefaultOtlpProvider`]. [`init_otlp_in_runtime`] consults it only after
/// OTLP has been determined to be enabled. `Send + Sync` is required because
/// the registered provider lives in a process-wide static.
pub trait TelemetryProvider: Send + Sync {
    /// Initialises telemetry for `config` and returns the guard that keeps it
    /// alive; dropping the guard tears the pipelines down.
    fn init(&self, config: &ObservabilityConfig) -> OtlpGuard;
}
/// Process-wide override for [`DefaultOtlpProvider`]; can be set at most once.
static TELEMETRY_PROVIDER: OnceLock<Box<dyn TelemetryProvider>> = OnceLock::new();
/// Registers a custom [`TelemetryProvider`] for [`init_otlp_in_runtime`].
///
/// Only the first registration in the process takes effect. Returns `true`
/// when `provider` was installed. Returns `false` when a provider was already
/// registered, in which case `provider` is dropped.
pub fn set_telemetry_provider(provider: Box<dyn TelemetryProvider>) -> bool {
    let outcome = TELEMETRY_PROVIDER.set(provider);
    outcome.is_ok()
}
#[must_use]
pub fn otlp_is_enabled(config: &TracingConfig) -> bool {
let env_enabled = std::env::var("OTEL_TRACES_ENABLED").is_ok_and(|v| v == "true" || v == "1");
config.enabled
|| env_enabled
|| config.otlp_endpoint.is_some()
|| std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").is_ok()
}
/// Picks the OTLP collector base URL.
///
/// A non-empty `config.otlp_endpoint` wins. Otherwise a non-empty
/// `OTEL_EXPORTER_OTLP_ENDPOINT` is used. Empty strings are treated as unset,
/// and the environment is only consulted when the config value is absent or
/// empty.
fn resolve_endpoint(config: &TracingConfig) -> Option<String> {
    if let Some(explicit) = config.otlp_endpoint.as_deref().filter(|v| !v.is_empty()) {
        return Some(explicit.to_owned());
    }
    match std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") {
        Ok(from_env) if !from_env.is_empty() => Some(from_env),
        _ => None,
    }
}
/// Resolves the `service.name` resource attribute.
///
/// A non-empty `OTEL_SERVICE_NAME` overrides `config.service_name`. An unset,
/// empty, or non-Unicode variable falls back to the config value.
fn resolve_service_name(config: &TracingConfig) -> String {
    match std::env::var("OTEL_SERVICE_NAME") {
        Ok(from_env) if !from_env.is_empty() => from_env,
        _ => config.service_name.clone(),
    }
}
/// Level filter for the OTLP subscriber.
///
/// Uses the directives from the default environment variable (`RUST_LOG`).
/// If that variable is unset or does not parse, it falls back to `info`.
fn otlp_env_filter() -> EnvFilter {
    match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => EnvFilter::new("info"),
    }
}
/// Initialises OTLP telemetry when [`otlp_is_enabled`] reports it is wanted.
///
/// - When disabled, it returns an empty guard and installs nothing.
/// - When enabled, it delegates to the provider registered via
///   [`set_telemetry_provider`], falling back to [`DefaultOtlpProvider`]
///   when none is registered.
///
/// The default provider builds Tokio-backed batch processors, so call this
/// from within a Tokio runtime. Keep the returned guard alive for as long as
/// telemetry should be exported.
#[must_use]
pub fn init_otlp_in_runtime(config: &ObservabilityConfig) -> OtlpGuard {
    if otlp_is_enabled(&config.tracing) {
        match TELEMETRY_PROVIDER.get() {
            Some(custom) => custom.init(config),
            None => DefaultOtlpProvider.init(config),
        }
    } else {
        OtlpGuard::default()
    }
}
/// Built-in [`TelemetryProvider`] that exports traces and logs over OTLP/HTTP.
///
/// If no endpoint resolves, or the OTLP pipeline fails to build, it falls
/// back to the crate's console/file logging. A build failure is reported on
/// stderr first.
pub struct DefaultOtlpProvider;
impl TelemetryProvider for DefaultOtlpProvider {
    fn init(&self, config: &ObservabilityConfig) -> OtlpGuard {
        let otlp = resolve_endpoint(&config.tracing).and_then(|endpoint| {
            match build_otlp(&endpoint, config) {
                Ok(guard) => Some(guard),
                Err(e) => {
                    eprintln!(
                        "OTLP telemetry init failed: {e}; falling back to console/file logging"
                    );
                    None
                }
            }
        });
        // Both "no endpoint" and "build failed" end up here.
        otlp.unwrap_or_else(|| OtlpGuard {
            tracer_provider: None,
            logger_provider: None,
            _fallback: logging::init_logging_inner(&config.logging).ok(),
        })
    }
}
/// Builds a per-signal OTLP URL from a collector base URL.
///
/// All trailing `/` are stripped from `base`. If the result already ends with
/// `signal_path` (e.g. `/v1/traces`), it is returned as-is; otherwise
/// `signal_path` is appended.
fn signal_endpoint(base: &str, signal_path: &str) -> String {
    let base = base.trim_end_matches('/');
    if base.ends_with(signal_path) {
        return base.to_owned();
    }
    let mut url = String::with_capacity(base.len() + signal_path.len());
    url.push_str(base);
    url.push_str(signal_path);
    url
}
/// `HttpClient` wrapper for the OTLP exporters that prints a single stderr
/// diagnostic the first time an export request fails.
///
/// Clones share the `reported` flag through an `Arc`. The trace and log
/// exporters built from one instance therefore report at most one failure
/// between them.
#[derive(Debug, Clone)]
struct DiagnosticHttpClient {
// Underlying transport; every request is forwarded to it unchanged.
inner: reqwest::Client,
// Becomes `true` once the failure notice has been printed.
reported: Arc<AtomicBool>,
}
#[async_trait::async_trait]
impl HttpClient for DiagnosticHttpClient {
    /// Forwards `request` to the wrapped client. The first time a request
    /// fails, it prints a stderr notice naming the target URI.
    ///
    /// The inner client's result is returned unchanged in every case.
    async fn send_bytes(
        &self,
        request: http::Request<Bytes>,
    ) -> Result<http::Response<Bytes>, HttpError> {
        // Captured up front because `request` is moved into the inner call.
        let uri = request.uri().clone();
        let outcome = self.inner.send_bytes(request).await;
        if let Err(err) = &outcome {
            // `swap` returns the previous value, so only the first failure sees
            // `false`. Relaxed suffices: the flag protects no other data.
            let already_reported = self.reported.swap(true, Ordering::Relaxed);
            if !already_reported {
                eprintln!(
                    "[zlayer-observability] OTLP export to {uri} failed: {err} \
                     — telemetry is being dropped; verify OTEL_EXPORTER_OTLP_ENDPOINT \
                     (expected a collector base like https://host/otel). \
                     This notice prints once."
                );
            }
        }
        outcome
    }
}
fn build_otlp(
endpoint: &str,
config: &ObservabilityConfig,
) -> Result<OtlpGuard, opentelemetry_otlp::ExporterBuildError> {
let service_name = resolve_service_name(&config.tracing);
let resource = Resource::builder().with_service_name(service_name).build();
let traces_endpoint = signal_endpoint(endpoint, "/v1/traces");
let logs_endpoint = signal_endpoint(endpoint, "/v1/logs");
let http_client = DiagnosticHttpClient {
inner: reqwest::Client::builder()
.build()
.unwrap_or_else(|_| reqwest::Client::new()),
reported: Arc::new(AtomicBool::new(false)),
};
let span_exporter = opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_http_client(http_client.clone())
.with_endpoint(&traces_endpoint)
.with_protocol(Protocol::HttpBinary)
.build()?;
let span_processor = BatchSpanProcessor::builder(span_exporter, runtime::Tokio).build();
let tracer_provider = SdkTracerProvider::builder()
.with_resource(resource.clone())
.with_span_processor(span_processor)
.build();
let log_exporter = opentelemetry_otlp::LogExporter::builder()
.with_http()
.with_http_client(http_client)
.with_endpoint(&logs_endpoint)
.with_protocol(Protocol::HttpBinary)
.build()?;
let log_processor = BatchLogProcessor::builder(log_exporter, runtime::Tokio).build();
let logger_provider = SdkLoggerProvider::builder()
.with_resource(resource)
.with_log_processor(log_processor)
.build();
opentelemetry::global::set_tracer_provider(tracer_provider.clone());
let span_layer = tracing_opentelemetry::layer().with_tracer(tracer_provider.tracer("zlayer"));
let log_layer = OpenTelemetryTracingBridge::new(&logger_provider);
tracing_subscriber::registry()
.with(otlp_env_filter())
.with(span_layer)
.with(log_layer)
.init();
Ok(OtlpGuard {
tracer_provider: Some(tracer_provider),
logger_provider: Some(logger_provider),
_fallback: None,
})
}
// Unit tests for enablement checks, endpoint/service-name resolution, and
// provider dispatch. Several tests read process environment variables and skip
// their assertions when those variables are set, so they only assert something
// in a clean environment.
#[cfg(test)]
mod tests {
use super::*;
use crate::config::TracingConfig;
// With no config flag and no endpoint, OTLP must report disabled (only checked
// when the relevant env vars are unset).
#[test]
fn test_disabled_when_unconfigured() {
let config = TracingConfig {
enabled: false,
otlp_endpoint: None,
..Default::default()
};
if std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").is_err()
&& std::env::var("OTEL_TRACES_ENABLED").is_err()
{
assert!(!otlp_is_enabled(&config));
}
}
// A configured endpoint alone enables OTLP and is returned verbatim.
#[test]
fn test_enabled_via_config_endpoint() {
let config = TracingConfig {
enabled: false,
otlp_endpoint: Some("https://logging.blackleafcloud.com/otel".to_string()),
..Default::default()
};
assert!(otlp_is_enabled(&config));
assert_eq!(
resolve_endpoint(&config).as_deref(),
Some("https://logging.blackleafcloud.com/otel")
);
}
// Signal paths are appended to a base URL, trailing slashes are collapsed,
// and an already-suffixed URL is left alone.
#[test]
fn test_signal_endpoint_appends_per_signal_path() {
assert_eq!(
signal_endpoint("https://logging.blackleafcloud.com/otel", "/v1/traces"),
"https://logging.blackleafcloud.com/otel/v1/traces"
);
assert_eq!(
signal_endpoint("https://logging.blackleafcloud.com/otel", "/v1/logs"),
"https://logging.blackleafcloud.com/otel/v1/logs"
);
assert_eq!(
signal_endpoint("https://logging.blackleafcloud.com/otel/", "/v1/logs"),
"https://logging.blackleafcloud.com/otel/v1/logs"
);
assert_eq!(
signal_endpoint(
"https://logging.blackleafcloud.com/otel/v1/traces",
"/v1/traces"
),
"https://logging.blackleafcloud.com/otel/v1/traces"
);
}
// Without OTEL_SERVICE_NAME, the service name comes from the config.
#[test]
fn test_resolve_service_name_falls_back_to_config() {
let config = TracingConfig {
service_name: "zlayer-test".to_string(),
..Default::default()
};
if std::env::var("OTEL_SERVICE_NAME").is_err() {
assert_eq!(resolve_service_name(&config), "zlayer-test");
}
}
// When OTLP is disabled, init returns a completely empty guard (no providers,
// no fallback logging).
#[test]
// The allow is needed because the test reads `guard._fallback` directly.
#[allow(clippy::used_underscore_binding)] fn test_disabled_returns_empty_guard() {
let config = ObservabilityConfig::default();
if std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").is_err()
&& std::env::var("OTEL_TRACES_ENABLED").is_err()
{
let guard = init_otlp_in_runtime(&config);
assert!(guard.tracer_provider.is_none());
assert!(guard.logger_provider.is_none());
assert!(guard._fallback.is_none());
}
}
// A registered provider replaces the default one. This writes the
// process-global `TELEMETRY_PROVIDER`, which can be set only once per process.
// Any other test that registers a provider would make the "first registration
// should win" assertion order-dependent.
#[test]
fn test_registered_provider_is_used() {
use std::sync::atomic::{AtomicBool, Ordering};
static CALLED: AtomicBool = AtomicBool::new(false);
struct DummyProvider;
impl TelemetryProvider for DummyProvider {
fn init(&self, _cfg: &ObservabilityConfig) -> OtlpGuard {
CALLED.store(true, Ordering::SeqCst);
OtlpGuard::default()
}
}
assert!(
set_telemetry_provider(Box::new(DummyProvider)),
"first registration should win"
);
// `enabled: true` makes OTLP enabled regardless of the environment, so the
// registered provider is always reached.
let config = ObservabilityConfig {
tracing: TracingConfig {
enabled: true,
otlp_endpoint: Some("https://example.invalid/otel".to_string()),
..Default::default()
},
..Default::default()
};
let guard = init_otlp_in_runtime(&config);
assert!(
CALLED.load(Ordering::SeqCst),
"registered provider must run"
);
assert!(guard.tracer_provider.is_none());
assert!(guard.logger_provider.is_none());
}
}