use crate::observability::config::{LogFilterConfig, ObservabilityConfig};
use crate::observability::config_file::{
observability_config_from_rust_log, validate_observability_config,
validate_observability_config_path, ConfigWatchGuard,
};
use crate::observability::filter::SharedOrderedFilter;
use crate::observability::local::{build_console_layer, build_local_layer};
use crate::observability::metrics::build_metrics_provider;
use crate::observability::reload::{FilterReloadHandle, ObservabilityReloadHandle};
use crate::observability::remote::build_remote_log_layer;
use crate::observability::trace::build_trace_layer;
use anyhow::{Context as AnyhowContext, Result};
use opentelemetry::global;
use opentelemetry_sdk::{
logs::SdkLoggerProvider, metrics::SdkMeterProvider, trace::SdkTracerProvider,
};
use std::path::Path;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer, Registry};
/// Guards and OpenTelemetry providers produced while initializing observability.
///
/// The value is returned by `init_observability` and is meant to be kept by the
/// caller; `shutdown` consumes it and tears the parts down in a fixed order.
///
/// NOTE: Rust drops struct fields in declaration order, so reordering the fields
/// below changes the teardown order when the value is dropped without calling
/// `shutdown`.
pub struct ObservabilityGuards {
/// Guard for the config-file watcher. `init_observability` leaves this `None`;
/// `init_observability_from_optional_path` fills it in when a config file exists.
config_watch_guard: Option<ConfigWatchGuard>,
/// Worker guard returned by `build_local_layer`; `None` when the local log
/// layer is disabled.
pub file_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
/// Logger provider returned by `build_remote_log_layer`; `None` when remote
/// logging is disabled.
pub logger_provider: Option<SdkLoggerProvider>,
/// Tracer provider returned by `build_trace_layer`; `None` when tracing is
/// disabled.
pub tracer_provider: Option<SdkTracerProvider>,
/// Meter provider returned by `build_metrics_provider`; `None` when metrics
/// are absent from the config or disabled.
pub meter_provider: Option<SdkMeterProvider>,
}
impl ObservabilityGuards {
    /// Consumes the guards and tears everything down in a fixed order.
    ///
    /// Order: config watcher, logger provider, tracer provider, meter provider,
    /// and finally the local file guard. Errors returned by the individual
    /// provider `shutdown` calls are intentionally discarded (best effort).
    pub fn shutdown(self) {
        let Self {
            config_watch_guard,
            file_guard,
            logger_provider,
            tracer_provider,
            meter_provider,
        } = self;

        // Release the config watcher before the providers are shut down.
        drop(config_watch_guard);

        if let Some(logs) = logger_provider {
            let _ = logs.shutdown();
        }
        if let Some(traces) = tracer_provider {
            let _ = traces.shutdown();
        }
        if let Some(metrics) = meter_provider {
            let _ = metrics.shutdown();
        }

        // The file guard is released last, after every provider has shut down.
        drop(file_guard);
    }
}
/// Builds every configured observability layer and installs the global tracing
/// subscriber.
///
/// Depending on `config`, this sets up the console layer, the local log layer,
/// the remote log layer, the trace layer and the metrics provider. Console and
/// local layers share one filter; the remote layer reuses that same filter
/// unless `config.log.remote.filter` is set; the trace layer uses its own filter
/// (falling back to `default_trace_filter`).
///
/// The global OpenTelemetry tracer/meter providers are registered only after
/// the subscriber has been installed successfully, so a failed call does not
/// replace process-global providers or leave clones of half-initialized
/// providers behind in global state.
///
/// # Returns
///
/// A reload handle covering the local, remote and trace filters, plus the
/// guards/providers the caller must keep (`config_watch_guard` is left `None`).
///
/// # Errors
///
/// Returns an error if config validation fails, a filter cannot be built, a
/// layer or provider builder fails, or the tracing subscriber cannot be
/// initialized.
pub fn init_observability(
    config: ObservabilityConfig,
) -> Result<(ObservabilityReloadHandle, ObservabilityGuards)> {
    validate_observability_config(&config)?;

    let reload_enabled = config.log.dynamic.enabled;
    let remote_enabled = config.log.remote.enabled.unwrap_or(false);
    // Recorded for the reload handle: the remote layer shares the local filter
    // whenever it is enabled without a filter of its own.
    let remote_uses_local_filter = remote_enabled && config.log.remote.filter.is_none();

    let local_filter = SharedOrderedFilter::new(config.log.filter.clone())
        .context("failed to build observability filter")?;
    let local_reload = FilterReloadHandle::new(reload_enabled, "local", local_filter.clone());
    let format = config.log.format.unwrap_or_default();

    let mut layers: Vec<Box<dyn Layer<Registry> + Send + Sync>> = Vec::new();

    // Console output is opt-in.
    let console_enabled = config
        .log
        .console
        .as_ref()
        .and_then(|console| console.enabled)
        .unwrap_or(false);
    if console_enabled {
        layers.push(build_console_layer(format, local_filter.clone()));
    }

    // Local logging is on unless explicitly disabled.
    let file_guard = if config.log.local.enabled.unwrap_or(true) {
        let (local_layer, guard) =
            build_local_layer(&config.log.local, format, local_filter.clone());
        layers.push(local_layer);
        Some(guard)
    } else {
        None
    };

    let (remote_reload, logger_provider) = if remote_enabled {
        let remote_filter = match &config.log.remote.filter {
            Some(remote_filter_config) => SharedOrderedFilter::new(remote_filter_config.clone())
                .context("failed to build remote observability filter")?,
            None => local_filter.clone(),
        };
        let reload = FilterReloadHandle::new(reload_enabled, "remote", remote_filter.clone());
        let (remote_layer, provider) =
            build_remote_log_layer(&config.log.remote, &config.trace, remote_filter)?;
        layers.push(remote_layer);
        (Some(reload), Some(provider))
    } else {
        (None, None)
    };

    let (trace_reload, tracer_provider) = if config.trace.enabled {
        let trace_filter_config = config
            .trace
            .filter
            .clone()
            .unwrap_or_else(default_trace_filter);
        let trace_filter = SharedOrderedFilter::new(trace_filter_config)
            .context("failed to build trace observability filter")?;
        let reload = FilterReloadHandle::new(reload_enabled, "trace", trace_filter.clone());
        let (trace_layer, provider) = build_trace_layer(&config.trace, trace_filter)?;
        layers.push(trace_layer);
        (Some(reload), Some(provider))
    } else {
        (None, None)
    };

    let meter_provider = match &config.metrics {
        Some(metrics) if metrics.enabled => Some(build_metrics_provider(metrics)?),
        _ => None,
    };

    tracing_subscriber::registry()
        .with(layers)
        .try_init()
        .context("failed to initialize tracing subscriber")?;

    // Every fallible step has succeeded: only now touch process-global
    // OpenTelemetry state (tracer first, then meter).
    if let Some(provider) = &tracer_provider {
        global::set_tracer_provider(provider.clone());
    }
    if let Some(provider) = &meter_provider {
        global::set_meter_provider(provider.clone());
    }

    Ok((
        ObservabilityReloadHandle::new(
            local_reload,
            remote_reload,
            trace_reload,
            remote_uses_local_filter,
        ),
        ObservabilityGuards {
            config_watch_guard: None,
            file_guard,
            logger_provider,
            tracer_provider,
            meter_provider,
        },
    ))
}
/// Initializes observability from a config file when one is available,
/// otherwise from the `RUST_LOG` environment variable.
///
/// When `path` is `Some` and the file exists, the config is loaded via
/// `validate_observability_config_path`, observability is initialized, and a
/// `ConfigWatchGuard` is started for that path and stored in the returned
/// guards. When `path` is `None` or does not exist, the config is derived from
/// `RUST_LOG` (treated as absent if the variable cannot be read) and no watcher
/// is started.
///
/// # Errors
///
/// Propagates any error from loading/deriving the config or from
/// `init_observability`.
pub fn init_observability_from_optional_path(
    path: Option<&Path>,
) -> Result<(ObservabilityReloadHandle, ObservabilityGuards)> {
    match path {
        Some(config_path) if config_path.exists() => {
            let config = validate_observability_config_path(config_path)?;
            let (reload_handle, mut guards) = init_observability(config)?;
            let watcher =
                ConfigWatchGuard::start(config_path.to_path_buf(), reload_handle.clone());
            guards.config_watch_guard = Some(watcher);
            Ok((reload_handle, guards))
        }
        _ => {
            // No usable config file: fall back to the environment.
            let env_directive = std::env::var("RUST_LOG").ok();
            let config = observability_config_from_rust_log(env_directive.as_deref())?;
            init_observability(config)
        }
    }
}
/// Filter applied to the trace layer when the config provides no trace filter:
/// default level `"trace"` with no overrides.
fn default_trace_filter() -> LogFilterConfig {
    let default_level = String::from("trace");
    let overrides = Vec::new();
    LogFilterConfig {
        default_level,
        overrides,
    }
}