use actr_config::ObservabilityConfig;
use actr_protocol::ActorResult;
#[cfg(feature = "opentelemetry")]
use opentelemetry::{KeyValue, trace::TracerProvider as _};
#[cfg(feature = "opentelemetry")]
use opentelemetry_otlp::WithExportConfig;
#[cfg(feature = "opentelemetry")]
use opentelemetry_sdk::{
propagation::TraceContextPropagator, resource::Resource, trace::SdkTracerProvider,
};
#[cfg(feature = "opentelemetry")]
use tracing_subscriber::filter::Targets;
use tracing_subscriber::{
Layer, filter::EnvFilter, fmt, layer::SubscriberExt, prelude::*, registry::LookupSpan,
};
/// Type-erased tracing layer for a subscriber `S`: a boxed `dyn Layer<S>` that is
/// also `Send + Sync + 'static`.
///
/// Used by `init_observability` to give the "no platform layer" `None` a concrete type.
type BoxedLayer<S> = Box<dyn Layer<S> + Send + Sync + 'static>;
/// Guard returned by the observability initializers.
///
/// With the `opentelemetry` feature enabled, the guard owns the tracer provider
/// created during initialization (if tracing was enabled) and shuts it down when
/// dropped. Without the feature the guard carries no state.
///
/// Bind the guard to a variable that lives as long as tracing should stay active;
/// discarding it right away (for example `init_observability(cfg)?;`) shuts the
/// provider down immediately. The `must_use` hint is only applied when the
/// `opentelemetry` feature is on, because the guard is a no-op otherwise.
#[cfg_attr(
    feature = "opentelemetry",
    must_use = "dropping the guard shuts down the OpenTelemetry tracer provider; bind it to a long-lived variable"
)]
#[derive(Default)]
pub struct ObservabilityGuard {
    /// Tracer provider to shut down on drop; `None` when no provider was created.
    #[cfg(feature = "opentelemetry")]
    tracer_provider: Option<SdkTracerProvider>,
}
/// Shuts down the owned tracer provider (if any) when the guard goes out of scope.
impl Drop for ObservabilityGuard {
    fn drop(&mut self) {
        // Without the `opentelemetry` feature there is nothing to tear down.
        #[cfg(feature = "opentelemetry")]
        {
            // `take()` leaves `None` behind, so the provider is shut down at most once.
            let Some(provider) = self.tracer_provider.take() else {
                return;
            };
            // A failing shutdown is only reported; `drop` cannot propagate errors.
            if let Err(err) = provider.shutdown() {
                tracing::warn!("Failed to shutdown tracer provider: {err:?}");
            }
        }
    }
}
/// Initializes observability without a platform-specific output layer.
///
/// Delegates to [`init_observability_with_layer`] with `None`, so the default
/// formatting layer is used for log output.
///
/// # Errors
///
/// Returns whatever error [`init_observability_with_layer`] returns.
pub fn init_observability(
    cfg: &actr_config::ObservabilityConfig,
) -> ActorResult<ObservabilityGuard> {
    // The annotation only pins the otherwise unconstrained generic layer type.
    let no_platform_layer: Option<BoxedLayer<tracing_subscriber::Registry>> = None;
    init_observability_with_layer(cfg, no_platform_layer)
}
/// Initializes observability, optionally routing log output through a
/// caller-supplied `platform_layer` instead of the default formatting layer.
///
/// The log filter is taken from the `RUST_LOG` environment variable when it is set
/// and non-empty; otherwise `cfg.filter_level` is used. If the chosen directive
/// string cannot be parsed as an [`EnvFilter`], the filter falls back to `"info"`.
///
/// # Errors
///
/// Returns the error produced by the internal subscriber setup, if any.
pub fn init_observability_with_layer<L>(
    cfg: &ObservabilityConfig,
    platform_layer: Option<L>,
) -> ActorResult<ObservabilityGuard>
where
    L: Layer<tracing_subscriber::Registry> + Send + Sync + 'static,
{
    // A non-empty `RUST_LOG` takes precedence over the configured level.
    let level_directive = std::env::var("RUST_LOG")
        .ok()
        .filter(|s| !s.is_empty())
        .unwrap_or_else(|| cfg.filter_level.clone());

    // `try_new` accepts any `AsRef<str>` and the directive is not needed afterwards,
    // so it is moved in rather than cloned.
    let env_filter =
        EnvFilter::try_new(level_directive).unwrap_or_else(|_| EnvFilter::new("info"));

    init_subscriber_internal(cfg, env_filter, platform_layer)
}
/// Installs the global tracing subscriber when the `opentelemetry` feature is off.
///
/// The output layer is `platform_layer` when provided, otherwise the default
/// formatting layer; either way it is filtered by `env_filter`. The configuration
/// argument is unused in this build variant.
///
/// The result of `try_init` is deliberately discarded, so a failure to install the
/// subscriber does not turn into an error here. Always returns a default guard.
#[cfg(not(feature = "opentelemetry"))]
fn init_subscriber_internal<L>(
    _cfg: &ObservabilityConfig,
    env_filter: EnvFilter,
    platform_layer: Option<L>,
) -> ActorResult<ObservabilityGuard>
where
    L: Layer<tracing_subscriber::Registry> + Send + Sync + 'static,
{
    // Both arms are boxed so they share one concrete type.
    let output_layer = match platform_layer {
        Some(custom) => custom.with_filter(env_filter).boxed(),
        None => create_default_fmt_layer().with_filter(env_filter).boxed(),
    };

    let subscriber = tracing_subscriber::registry().with(output_layer);
    // Best effort: the outcome of installing the subscriber is ignored.
    let _ = subscriber.try_init();

    Ok(ObservabilityGuard::default())
}
/// Installs the global tracing subscriber when the `opentelemetry` feature is on.
///
/// The output layer is `platform_layer` when provided, otherwise the default
/// formatting layer; either way it is filtered by `env_filter`.
///
/// When `cfg.tracing_enabled` is set, an OpenTelemetry layer is added on top. That
/// layer has its own target filter: the default level is `cfg.filter_level` parsed
/// as a plain `tracing::Level` (falling back to `INFO` when it does not parse), and
/// a handful of noisy targets are capped at `ERROR` or `WARN`.
///
/// The result of `try_init` is deliberately discarded in both cases.
///
/// # Errors
///
/// Returns the error from `build_otel_provider` when tracing is enabled and the
/// provider cannot be built. Nothing is installed in that case.
#[cfg(feature = "opentelemetry")]
fn init_subscriber_internal<L>(
    cfg: &ObservabilityConfig,
    env_filter: EnvFilter,
    platform_layer: Option<L>,
) -> ActorResult<ObservabilityGuard>
where
    L: Layer<tracing_subscriber::Registry> + Send + Sync + 'static,
{
    // Both arms are boxed so they share one concrete type.
    let output_layer = match platform_layer {
        Some(custom) => custom.with_filter(env_filter).boxed(),
        None => create_default_fmt_layer().with_filter(env_filter).boxed(),
    };

    // Tracing disabled: install only the output layer and hand back an empty guard.
    if !cfg.tracing_enabled {
        let _ = tracing_subscriber::registry().with(output_layer).try_init();
        return Ok(ObservabilityGuard {
            tracer_provider: None,
        });
    }

    let provider = build_otel_provider(cfg)?;
    let otel_layer = tracing_opentelemetry::layer().with_tracer(provider.tracer("actr-runtime"));

    // The span export filter is derived from the configured level only.
    let default_level = cfg
        .filter_level
        .parse::<tracing::Level>()
        .unwrap_or(tracing::Level::INFO);
    let span_filter = Targets::new()
        .with_default(default_level)
        .with_target("tungstenite", tracing::Level::ERROR)
        .with_target("tokio_tungstenite", tracing::Level::ERROR)
        .with_target("wasmtime", tracing::Level::WARN)
        .with_target("webrtc_mdns::conn", tracing::Level::WARN)
        .with_target("webrtc_ice::agent::agent_internal", tracing::Level::WARN)
        .with_target("webrtc_sctp", tracing::Level::WARN);

    // Best effort: the outcome of installing the subscriber is ignored.
    let _ = tracing_subscriber::registry()
        .with(output_layer)
        .with(otel_layer.with_filter(span_filter))
        .try_init();

    Ok(ObservabilityGuard {
        tracer_provider: Some(provider),
    })
}
/// Builds the default formatting layer used when no platform layer is supplied.
///
/// Events are written to stderr and include the target, level, source file and
/// line number. ANSI colouring is switched on only for `unix` targets other than
/// iOS and Android.
fn create_default_fmt_layer<S>() -> impl Layer<S>
where
    S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
    // Compile-time decision: unix, excluding the two mobile targets.
    let use_colors = cfg!(all(
        unix,
        not(any(target_os = "ios", target_os = "android"))
    ));

    fmt::layer()
        .with_writer(std::io::stderr)
        .with_ansi(use_colors)
        .with_file(true)
        .with_line_number(true)
        .with_level(true)
        .with_target(true)
}
/// Builds the OpenTelemetry tracer provider and registers it globally.
///
/// An OTLP span exporter using the tonic transport is created for
/// `config.tracing_endpoint` and attached to the provider as a batch exporter.
/// The provider's resource carries `config.tracing_service_name` as the service
/// name plus a `telemetry.sdk.language = "rust"` attribute.
///
/// Side effects: a clone of the provider is installed as the global tracer
/// provider, and a `TraceContextPropagator` is installed as the global text-map
/// propagator.
///
/// # Errors
///
/// Returns `ActrError::Internal` when the OTLP exporter cannot be built.
#[cfg(feature = "opentelemetry")]
fn build_otel_provider(config: &ObservabilityConfig) -> ActorResult<SdkTracerProvider> {
    let endpoint = config.tracing_endpoint.clone();
    let exporter_result = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .with_endpoint(endpoint)
        .build();
    let span_exporter = exporter_result.map_err(|build_err| {
        actr_protocol::ActrError::Internal(format!("OTLP exporter build failed: {build_err}"))
    })?;

    let language_attr = KeyValue::new("telemetry.sdk.language", "rust");
    let service_resource = Resource::builder()
        .with_service_name(config.tracing_service_name.clone())
        .with_attributes([language_attr])
        .build();

    let provider = SdkTracerProvider::builder()
        .with_resource(service_resource)
        .with_batch_exporter(span_exporter)
        .build();

    // Register a clone globally; the returned provider is kept by the caller.
    opentelemetry::global::set_tracer_provider(provider.clone());
    opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());

    Ok(provider)
}