use crate::config::{LogConfig, LogFormat, TelemetryConfig, TelemetryProtocol};
use http::Uri;
use thiserror::Error;
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
#[cfg(feature = "telemetry-otlp")]
use opentelemetry::{KeyValue, trace::TracerProvider as _};
#[cfg(feature = "telemetry-otlp")]
use opentelemetry_otlp::WithExportConfig as _;
#[cfg(feature = "telemetry-otlp")]
use opentelemetry_sdk::{Resource, propagation::TraceContextPropagator, trace::SdkTracerProvider};
/// Concrete log output format chosen once `LogFormat::Auto` has been resolved.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ResolvedLogFormat {
    /// Log lines are rendered with the `pretty()` formatter layer.
    Pretty,
    /// Log lines are rendered with the `json()` formatter layer.
    Json,
}
/// Fully resolved logging/telemetry plan derived from configuration.
///
/// Built by `TelemetryRuntime::from_config` and consumed by `init`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TelemetryRuntime {
    /// Log output format after `Auto` resolution.
    pub log_format: ResolvedLogFormat,
    /// Whether traces are exported, and with which settings.
    pub trace_export: TraceExport,
    /// Message describing a configuration problem that was downgraded to a
    /// warning (non-strict mode); `None` when nothing was downgraded.
    pub warning: Option<String>,
}
/// Destination for trace data.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TraceExport {
    /// No traces are exported; only logging is installed.
    Disabled,
    /// Traces are exported over OTLP with the contained settings.
    Otlp(OtlpTraceRuntime),
}
/// Settings needed to build an OTLP span exporter.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OtlpTraceRuntime {
    /// Collector endpoint URI. When produced by `TelemetryRuntime::from_config`
    /// it has been trimmed and checked for a scheme and an authority.
    pub endpoint: String,
    /// Wire protocol used to talk to the collector.
    pub protocol: TelemetryProtocol,
    /// Resource attributes attached to the exported telemetry.
    pub resource: TelemetryResource,
}
/// Identity of the service emitting telemetry.
///
/// The fields are turned into resource attributes when the tracer provider is
/// built.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TelemetryResource {
    /// Value used as the resource's service name.
    pub service_name: String,
    /// Optional value for the `service.namespace` attribute.
    pub service_namespace: Option<String>,
    /// Value for the `service.version` attribute.
    pub service_version: String,
    /// Value for the `deployment.environment` attribute.
    pub environment: String,
}
/// Errors produced while resolving configuration or installing the logging
/// and telemetry pipeline.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum TelemetryInitError {
    /// Telemetry is enabled but the OTLP endpoint is absent or blank.
    #[error("telemetry is enabled but no OTLP endpoint was configured")]
    MissingEndpoint,
    /// The configured service name is empty or whitespace-only.
    #[error("telemetry service_name must not be empty")]
    EmptyServiceName,
    /// The OTLP endpoint could not be parsed, or lacks a scheme or authority.
    #[error("invalid OTLP endpoint {endpoint:?}: {reason}")]
    InvalidEndpoint {
        /// The endpoint string that was rejected.
        endpoint: String,
        /// Human-readable explanation of why it was rejected.
        reason: String,
    },
    /// OTLP export was requested but the `telemetry-otlp` cargo feature is off.
    // In this file the variant is only constructed by the
    // `cfg(not(feature = "telemetry-otlp"))` code path, so it is unused in
    // feature-enabled builds.
    #[error("telemetry-otlp cargo feature is not enabled")]
    #[allow(dead_code)]
    FeatureDisabled,
    /// Building the OTLP span exporter failed; carries the rendered cause.
    // In this file the variant is only constructed by the
    // `cfg(feature = "telemetry-otlp")` code path, so it is unused in builds
    // without the feature.
    #[error("failed to initialize OTLP exporter: {0}")]
    #[allow(dead_code)]
    ExporterInit(String),
    /// Installing the global tracing subscriber failed; carries the rendered
    /// cause.
    #[error("failed to initialize tracing subscriber: {0}")]
    SubscriberInit(String),
}
/// Handle returned by telemetry initialization.
///
/// When the `telemetry-otlp` feature is enabled the guard may own the tracer
/// provider, which is shut down when the guard is dropped.
#[derive(Debug)]
#[must_use]
pub struct TelemetryGuard {
    /// Tracer provider to shut down on drop; `None` for logging-only setups.
    #[cfg(feature = "telemetry-otlp")]
    provider: Option<SdkTracerProvider>,
}
impl TelemetryGuard {
pub const fn disabled() -> Self {
Self {
#[cfg(feature = "telemetry-otlp")]
provider: None,
}
}
#[cfg(feature = "telemetry-otlp")]
const fn with_provider(provider: SdkTracerProvider) -> Self {
Self {
provider: Some(provider),
}
}
}
impl Drop for TelemetryGuard {
    /// Shuts down the owned tracer provider, if there is one.
    ///
    /// A failed shutdown is reported on stderr rather than silently
    /// discarded, so that problems at exit are visible to the operator.
    /// Without the `telemetry-otlp` feature this is a no-op.
    fn drop(&mut self) {
        #[cfg(feature = "telemetry-otlp")]
        if let Some(provider) = self.provider.take() {
            if let Err(error) = provider.shutdown() {
                use std::io::Write as _;
                // `writeln!` with an ignored result instead of `eprintln!`:
                // a destructor can run during unwinding and must not panic
                // just because stderr is unavailable.
                let _ = writeln!(
                    std::io::stderr(),
                    "Warning: failed to shut down OTLP tracer provider: {error}"
                );
            }
        }
    }
}
impl TelemetryRuntime {
pub fn from_config(
log: &LogConfig,
telemetry: &TelemetryConfig,
profile: Option<&str>,
) -> Result<Self, TelemetryInitError> {
let log_format = resolve_log_format(log, profile);
if !telemetry.enabled {
return Ok(Self {
log_format,
trace_export: TraceExport::Disabled,
warning: None,
});
}
if telemetry.service_name.trim().is_empty() {
return strict_or_fallback(
log_format,
telemetry.strict,
TelemetryInitError::EmptyServiceName,
);
}
let Some(endpoint) = telemetry
.otlp_endpoint
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
else {
return strict_or_fallback(
log_format,
telemetry.strict,
TelemetryInitError::MissingEndpoint,
);
};
if let Err(error) = validate_otlp_endpoint(endpoint) {
return strict_or_fallback(log_format, telemetry.strict, error);
}
Ok(Self {
log_format,
trace_export: TraceExport::Otlp(OtlpTraceRuntime {
endpoint: endpoint.to_owned(),
protocol: telemetry.protocol,
resource: TelemetryResource {
service_name: telemetry.service_name.clone(),
service_namespace: telemetry.service_namespace.clone(),
service_version: telemetry.service_version.clone(),
environment: telemetry.environment.clone(),
},
}),
warning: None,
})
}
}
pub fn init(
log: &LogConfig,
telemetry: &TelemetryConfig,
profile: Option<&str>,
) -> Result<TelemetryGuard, TelemetryInitError> {
let runtime = TelemetryRuntime::from_config(log, telemetry, profile)?;
if let Some(warning) = runtime.warning.as_deref() {
eprintln!("Warning: {warning}");
}
match &runtime.trace_export {
TraceExport::Disabled => init_logging_only(log, runtime.log_format),
TraceExport::Otlp(otlp) => {
init_otlp_runtime(log, runtime.log_format, telemetry.strict, otlp)
}
}
}
/// Applies the strict/lenient policy to a telemetry configuration error.
///
/// With `strict` set the error is returned as-is. Otherwise the result is a
/// runtime with trace export disabled whose `warning` holds the error text.
fn strict_or_fallback(
    log_format: ResolvedLogFormat,
    strict: bool,
    error: TelemetryInitError,
) -> Result<TelemetryRuntime, TelemetryInitError> {
    if strict {
        return Err(error);
    }

    let warning = Some(error.to_string());
    Ok(TelemetryRuntime {
        log_format,
        trace_export: TraceExport::Disabled,
        warning,
    })
}
/// Maps the configured log format to a concrete one.
///
/// Explicit `Pretty`/`Json` settings are honoured directly. `Auto` resolves
/// to JSON when either the profile or the environment indicates production,
/// and to pretty output otherwise.
fn resolve_log_format(log: &LogConfig, profile: Option<&str>) -> ResolvedLogFormat {
    match log.format {
        LogFormat::Pretty => ResolvedLogFormat::Pretty,
        LogFormat::Json => ResolvedLogFormat::Json,
        // The environment is only consulted when the profile does not already
        // indicate production.
        LogFormat::Auto if is_production_profile(profile) || is_production_env() => {
            ResolvedLogFormat::Json
        }
        LogFormat::Auto => ResolvedLogFormat::Pretty,
    }
}
/// Returns `true` when `profile` is `"prod"` or `"production"`, compared
/// ASCII case-insensitively. A missing profile is not production.
fn is_production_profile(profile: Option<&str>) -> bool {
    match profile {
        Some(name) => ["prod", "production"]
            .iter()
            .any(|alias| name.eq_ignore_ascii_case(alias)),
        None => false,
    }
}
/// Returns `true` when the `AUTUMN_ENV` environment variable is set to
/// `"production"` (ASCII case-insensitive). An unset or unreadable variable
/// counts as not production.
fn is_production_env() -> bool {
    match std::env::var("AUTUMN_ENV") {
        Ok(value) => value.eq_ignore_ascii_case("production"),
        Err(_) => false,
    }
}
/// Checks that `endpoint` parses as a URI with both a scheme and an authority.
///
/// # Errors
///
/// Returns [`TelemetryInitError::InvalidEndpoint`] carrying the offending
/// endpoint and a reason: the parser's message, `"missing URI scheme"` or
/// `"missing URI authority"`.
fn validate_otlp_endpoint(endpoint: &str) -> Result<(), TelemetryInitError> {
    // All failures share the same error shape; only the reason differs.
    let invalid = |reason: String| TelemetryInitError::InvalidEndpoint {
        endpoint: endpoint.to_owned(),
        reason,
    };

    let uri = endpoint
        .parse::<Uri>()
        .map_err(|error| invalid(error.to_string()))?;

    if uri.scheme().is_none() {
        return Err(invalid("missing URI scheme".to_owned()));
    }
    if uri.authority().is_none() {
        return Err(invalid("missing URI authority".to_owned()));
    }
    Ok(())
}
/// Builds the log filter from `log.level`.
///
/// If the level string is not a valid filter, a warning is printed to stderr
/// and the `"info"` filter is used instead.
fn build_filter(log: &LogConfig) -> EnvFilter {
    match EnvFilter::try_new(&log.level) {
        Ok(filter) => filter,
        Err(error) => {
            eprintln!(
                "Warning: invalid log filter {:?}: {error}, falling back to \"info\"",
                log.level
            );
            EnvFilter::new("info")
        }
    }
}
fn init_logging_only(
log: &LogConfig,
log_format: ResolvedLogFormat,
) -> Result<TelemetryGuard, TelemetryInitError> {
let filter = build_filter(log);
match log_format {
ResolvedLogFormat::Json => tracing_subscriber::registry()
.with(filter)
.with(fmt::layer().json())
.try_init()
.map_err(|error| TelemetryInitError::SubscriberInit(error.to_string()))?,
ResolvedLogFormat::Pretty => tracing_subscriber::registry()
.with(filter)
.with(fmt::layer().pretty())
.try_init()
.map_err(|error| TelemetryInitError::SubscriberInit(error.to_string()))?,
}
Ok(TelemetryGuard::disabled())
}
/// Installs logging plus an OpenTelemetry layer that exports via OTLP.
///
/// If the tracer provider cannot be built, strict mode returns the error;
/// otherwise a warning is printed to stderr and logging-only initialization
/// is used instead.
///
/// # Errors
///
/// Returns the provider build error in strict mode, or
/// [`TelemetryInitError::SubscriberInit`] when the subscriber cannot be
/// installed.
#[cfg(feature = "telemetry-otlp")]
fn init_otlp_runtime(
    log: &LogConfig,
    log_format: ResolvedLogFormat,
    strict: bool,
    otlp: &OtlpTraceRuntime,
) -> Result<TelemetryGuard, TelemetryInitError> {
    let provider = match build_tracer_provider(otlp) {
        Ok(built) => built,
        Err(error) if strict => return Err(error),
        Err(error) => {
            eprintln!("Warning: {error}");
            return init_logging_only(log, log_format);
        }
    };

    let tracer = provider.tracer("autumn-web");
    let level_filter = build_filter(log);
    let base = tracing_subscriber::registry().with(level_filter);

    // Each arm builds its own OpenTelemetry layer because the layer's
    // subscriber type differs with the formatter beneath it.
    let installed = match log_format {
        ResolvedLogFormat::Json => base
            .with(fmt::layer().json())
            .with(tracing_opentelemetry::layer().with_tracer(tracer))
            .try_init(),
        ResolvedLogFormat::Pretty => base
            .with(fmt::layer().pretty())
            .with(tracing_opentelemetry::layer().with_tracer(tracer))
            .try_init(),
    };

    installed
        .map(|()| TelemetryGuard::with_provider(provider))
        .map_err(|error| TelemetryInitError::SubscriberInit(error.to_string()))
}
/// Stand-in used when the `telemetry-otlp` feature is not compiled in.
///
/// Non-strict mode prints a warning to stderr and installs logging only.
///
/// # Errors
///
/// Returns [`TelemetryInitError::FeatureDisabled`] in strict mode, or
/// whatever logging-only initialization returns otherwise.
#[cfg(not(feature = "telemetry-otlp"))]
fn init_otlp_runtime(
    log: &LogConfig,
    log_format: ResolvedLogFormat,
    strict: bool,
    _otlp: &OtlpTraceRuntime,
) -> Result<TelemetryGuard, TelemetryInitError> {
    if !strict {
        eprintln!("Warning: {}", TelemetryInitError::FeatureDisabled);
        return init_logging_only(log, log_format);
    }
    Err(TelemetryInitError::FeatureDisabled)
}
/// Builds a tracer provider that batches spans to an OTLP exporter.
///
/// The exporter transport follows `otlp.protocol`. After the exporter has
/// been built successfully, the W3C trace-context propagator is installed as
/// the global text-map propagator.
///
/// # Errors
///
/// Returns [`TelemetryInitError::ExporterInit`] when the exporter cannot be
/// built; in that case the global propagator is left untouched.
#[cfg(feature = "telemetry-otlp")]
fn build_tracer_provider(otlp: &OtlpTraceRuntime) -> Result<SdkTracerProvider, TelemetryInitError> {
    let service_resource = Resource::builder()
        .with_service_name(otlp.resource.service_name.clone())
        .with_attributes(build_resource_attributes(&otlp.resource))
        .build();

    let exporter_builder = opentelemetry_otlp::SpanExporter::builder();
    let build_outcome = match otlp.protocol {
        TelemetryProtocol::Grpc => exporter_builder
            .with_tonic()
            .with_endpoint(otlp.endpoint.clone())
            .build(),
        TelemetryProtocol::HttpProtobuf => exporter_builder
            .with_http()
            .with_endpoint(otlp.endpoint.clone())
            .build(),
    };
    let span_exporter =
        build_outcome.map_err(|error| TelemetryInitError::ExporterInit(error.to_string()))?;

    opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());

    let provider = SdkTracerProvider::builder()
        .with_resource(service_resource)
        .with_batch_exporter(span_exporter)
        .build();
    Ok(provider)
}
/// Converts the resource description into the `service.namespace`,
/// `service.version` and `deployment.environment` attributes, in that order.
#[cfg(feature = "telemetry-otlp")]
fn build_resource_attributes(resource: &TelemetryResource) -> [KeyValue; 3] {
    // NOTE(review): a missing namespace is exported as an empty string rather
    // than being omitted; the fixed-size return type requires three entries.
    let namespace = resource
        .service_namespace
        .as_deref()
        .map_or_else(String::new, str::to_owned);
    let version = resource.service_version.clone();
    let environment = resource.environment.clone();

    [
        KeyValue::new("service.namespace", namespace),
        KeyValue::new("service.version", version),
        KeyValue::new("deployment.environment", environment),
    ]
}
/// Abstraction over how logging and telemetry are initialized, so that an
/// alternative implementation can be substituted for the default one.
pub trait TelemetryProvider: Send + Sync + 'static {
    /// Initializes logging/telemetry from the given configuration and
    /// optional profile name, returning a guard for the installed pipeline.
    ///
    /// # Errors
    ///
    /// Returns a [`TelemetryInitError`] when initialization fails.
    fn init(
        &self,
        log: &LogConfig,
        telemetry: &TelemetryConfig,
        profile: Option<&str>,
    ) -> Result<TelemetryGuard, TelemetryInitError>;
}
/// Default [`TelemetryProvider`], backed by this module's `init` function.
#[derive(Clone, Copy, Debug, Default)]
pub struct TracingOtlpTelemetryProvider;
impl TracingOtlpTelemetryProvider {
#[must_use]
pub const fn new() -> Self {
Self
}
}
impl TelemetryProvider for TracingOtlpTelemetryProvider {
fn init(
&self,
log: &LogConfig,
telemetry: &TelemetryConfig,
profile: Option<&str>,
) -> Result<TelemetryGuard, TelemetryInitError> {
init(log, telemetry, profile)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Provider double that always succeeds with a disabled guard.
    struct NoOpTelemetryProvider;

    impl TelemetryProvider for NoOpTelemetryProvider {
        fn init(
            &self,
            _log: &LogConfig,
            _telemetry: &TelemetryConfig,
            _profile: Option<&str>,
        ) -> Result<TelemetryGuard, TelemetryInitError> {
            Ok(TelemetryGuard::disabled())
        }
    }

    /// A custom provider can be driven through the trait and its guard dropped.
    #[test]
    fn telemetry_provider_trait_returns_supplied_guard() {
        let log_config = LogConfig::default();
        let telemetry_config = TelemetryConfig::default();

        let guard = NoOpTelemetryProvider
            .init(&log_config, &telemetry_config, Some("test"))
            .expect("no-op provider should succeed");
        drop(guard);
    }

    /// An unparsable level string falls back to the `info` filter.
    #[test]
    fn build_filter_falls_back_to_info_on_invalid_level() {
        let invalid_level =
            "this_is_not_a_valid_directive_it_lacks_an_equal_sign_and_is_not_a_level,foo=bar=baz=invalid";
        let log_config = LogConfig {
            level: invalid_level.to_owned(),
            ..Default::default()
        };

        assert_eq!(build_filter(&log_config).to_string(), "info");
    }

    /// Resource fields end up under the expected attribute keys.
    #[cfg(feature = "telemetry-otlp")]
    #[test]
    fn build_resource_attributes_populates_otel_semantic_keys() {
        let attributes = build_resource_attributes(&TelemetryResource {
            service_name: "svc".to_owned(),
            service_namespace: Some("team".to_owned()),
            service_version: "1.2.3".to_owned(),
            environment: "staging".to_owned(),
        });
        let by_key: std::collections::HashMap<String, String> = attributes
            .iter()
            .map(|kv| (kv.key.as_str().to_owned(), kv.value.to_string()))
            .collect();

        for (key, expected) in [
            ("service.namespace", "team"),
            ("service.version", "1.2.3"),
            ("deployment.environment", "staging"),
        ] {
            assert_eq!(by_key.get(key).map(String::as_str), Some(expected));
        }
    }

    /// Read-only carrier over a header map, used to exercise the global propagator.
    #[cfg(feature = "telemetry-otlp")]
    struct MapExtractor<'a>(&'a std::collections::HashMap<&'static str, &'static str>);

    #[cfg(feature = "telemetry-otlp")]
    impl opentelemetry::propagation::Extractor for MapExtractor<'_> {
        fn get(&self, key: &str) -> Option<&str> {
            self.0.get(key).copied()
        }

        fn keys(&self) -> Vec<&str> {
            self.0.keys().copied().collect()
        }
    }

    /// OTLP settings shared by the exporter tests; only endpoint and protocol vary.
    #[cfg(feature = "telemetry-otlp")]
    fn unit_test_runtime(endpoint: &str, protocol: TelemetryProtocol) -> OtlpTraceRuntime {
        OtlpTraceRuntime {
            endpoint: endpoint.to_owned(),
            protocol,
            resource: TelemetryResource {
                service_name: "unit-test".to_owned(),
                service_namespace: None,
                service_version: "0.0.0".to_owned(),
                environment: "test".to_owned(),
            },
        }
    }

    /// Building a gRPC provider installs a propagator that understands `traceparent`.
    #[cfg(feature = "telemetry-otlp")]
    #[tokio::test]
    async fn build_tracer_provider_installs_w3c_propagator_and_returns_provider() {
        use opentelemetry::trace::TraceContextExt as _;

        let settings = unit_test_runtime("http://127.0.0.1:65530", TelemetryProtocol::Grpc);
        let provider = build_tracer_provider(&settings)
            .expect("tonic exporter + provider build should succeed lazily");

        let carrier = std::collections::HashMap::from([(
            "traceparent",
            "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
        )]);
        let extracted = opentelemetry::global::get_text_map_propagator(|propagator| {
            propagator.extract(&MapExtractor(&carrier))
        });

        assert!(
            extracted.span().span_context().is_valid(),
            "global propagator should have been installed"
        );
        let _ = provider.shutdown();
    }

    /// The HTTP/protobuf transport also builds without a reachable collector.
    #[cfg(feature = "telemetry-otlp")]
    #[tokio::test]
    async fn build_tracer_provider_supports_http_protobuf_protocol() {
        let settings =
            unit_test_runtime("http://127.0.0.1:65531", TelemetryProtocol::HttpProtobuf);
        let provider =
            build_tracer_provider(&settings).expect("http-protobuf exporter should build lazily");
        let _ = provider.shutdown();
    }
}