actr_hyper/
observability.rs1use actr_config::ObservabilityConfig;
9use actr_protocol::ActorResult;
10#[cfg(feature = "opentelemetry")]
11use opentelemetry::{KeyValue, trace::TracerProvider as _};
12#[cfg(feature = "opentelemetry")]
13use opentelemetry_otlp::WithExportConfig;
14#[cfg(feature = "opentelemetry")]
15use opentelemetry_sdk::{
16 propagation::TraceContextPropagator, resource::Resource, trace::SdkTracerProvider,
17};
18#[cfg(feature = "opentelemetry")]
19use tracing_subscriber::filter::Targets;
20use tracing_subscriber::{
21 Layer, filter::EnvFilter, fmt, layer::SubscriberExt, prelude::*, registry::LookupSpan,
22};
23
24type BoxedLayer<S> = Box<dyn Layer<S> + Send + Sync + 'static>;
29
30#[derive(Default)]
32pub struct ObservabilityGuard {
33 #[cfg(feature = "opentelemetry")]
34 tracer_provider: Option<SdkTracerProvider>,
35}
36
37impl Drop for ObservabilityGuard {
38 fn drop(&mut self) {
39 #[cfg(feature = "opentelemetry")]
40 if let Some(provider) = self.tracer_provider.take() {
41 if let Err(err) = provider.shutdown() {
42 tracing::warn!("Failed to shutdown tracer provider: {err:?}");
43 }
44 }
45 }
46}
47
48pub fn init_observability(
58 cfg: &actr_config::ObservabilityConfig,
59) -> ActorResult<ObservabilityGuard> {
60 init_observability_with_layer(cfg, None::<BoxedLayer<tracing_subscriber::Registry>>)
61}
62
63pub fn init_observability_with_layer<L>(
83 cfg: &ObservabilityConfig,
84 platform_layer: Option<L>,
85) -> ActorResult<ObservabilityGuard>
86where
87 L: Layer<tracing_subscriber::Registry> + Send + Sync + 'static,
88{
89 let level_directive = std::env::var("RUST_LOG")
90 .ok()
91 .filter(|s| !s.is_empty())
92 .unwrap_or_else(|| cfg.filter_level.clone());
93 let env_filter =
94 EnvFilter::try_new(level_directive.clone()).unwrap_or_else(|_| EnvFilter::new("info"));
95
96 init_subscriber_internal(cfg, env_filter, platform_layer)
97}
98
99#[cfg(not(feature = "opentelemetry"))]
104fn init_subscriber_internal<L>(
105 _cfg: &ObservabilityConfig,
106 env_filter: EnvFilter,
107 platform_layer: Option<L>,
108) -> ActorResult<ObservabilityGuard>
109where
110 L: Layer<tracing_subscriber::Registry> + Send + Sync + 'static,
111{
112 let filtered_layer = if let Some(layer) = platform_layer {
115 layer.with_filter(env_filter).boxed()
116 } else {
117 create_default_fmt_layer().with_filter(env_filter).boxed()
118 };
119
120 let _ = tracing_subscriber::registry()
121 .with(filtered_layer)
122 .try_init();
123
124 Ok(ObservabilityGuard::default())
125}
126
127#[cfg(feature = "opentelemetry")]
128fn init_subscriber_internal<L>(
129 cfg: &ObservabilityConfig,
130 env_filter: EnvFilter,
131 platform_layer: Option<L>,
132) -> ActorResult<ObservabilityGuard>
133where
134 L: Layer<tracing_subscriber::Registry> + Send + Sync + 'static,
135{
136 let filtered_output_layer = if let Some(layer) = platform_layer {
140 layer.with_filter(env_filter).boxed()
141 } else {
142 create_default_fmt_layer().with_filter(env_filter).boxed()
143 };
144
145 let mut tracer_provider = None;
147 if cfg.tracing_enabled {
148 let provider = build_otel_provider(cfg)?;
149 let tracer = provider.tracer("actr-runtime");
150 let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
151
152 let otel_default_level = cfg
154 .filter_level
155 .parse::<tracing::Level>()
156 .unwrap_or(tracing::Level::INFO);
157 let otel_filter = Targets::new()
158 .with_default(otel_default_level)
159 .with_target("tungstenite", tracing::Level::ERROR) .with_target("tokio_tungstenite", tracing::Level::ERROR) .with_target("wasmtime", tracing::Level::WARN)
162 .with_target("webrtc_mdns::conn", tracing::Level::WARN)
163 .with_target("webrtc_ice::agent::agent_internal", tracing::Level::WARN)
164 .with_target("webrtc_sctp", tracing::Level::WARN);
165
166 let _ = tracing_subscriber::registry()
167 .with(filtered_output_layer)
168 .with(otel_layer.with_filter(otel_filter))
169 .try_init();
170 tracer_provider = Some(provider);
171 } else {
172 let _ = tracing_subscriber::registry()
173 .with(filtered_output_layer)
174 .try_init();
175 }
176
177 Ok(ObservabilityGuard { tracer_provider })
178}
179
180fn create_default_fmt_layer<S>() -> impl Layer<S>
182where
183 S: tracing::Subscriber + for<'a> LookupSpan<'a>,
184{
185 let enable_ansi = cfg!(all(
188 unix,
189 not(target_os = "ios"),
190 not(target_os = "android")
191 ));
192
193 fmt::layer()
194 .with_writer(std::io::stderr)
195 .with_target(true)
196 .with_level(true)
197 .with_line_number(true)
198 .with_file(true)
199 .with_ansi(enable_ansi)
200}
201
202#[cfg(feature = "opentelemetry")]
203fn build_otel_provider(config: &ObservabilityConfig) -> ActorResult<SdkTracerProvider> {
204 let exporter = opentelemetry_otlp::SpanExporter::builder()
205 .with_tonic()
206 .with_endpoint(config.tracing_endpoint.clone())
207 .build()
208 .map_err(|e| {
209 actr_protocol::ActrError::Internal(format!("OTLP exporter build failed: {e}"))
210 })?;
211
212 let resource = Resource::builder()
213 .with_service_name(config.tracing_service_name.clone())
214 .with_attributes([KeyValue::new("telemetry.sdk.language", "rust")])
215 .build();
216
217 let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
218 .with_resource(resource)
219 .with_batch_exporter(exporter)
220 .build();
221
222 opentelemetry::global::set_tracer_provider(tracer_provider.clone());
223 opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
224
225 Ok(tracer_provider)
226}