turbomcp_telemetry/
init.rs1use crate::{TelemetryConfig, TelemetryError};
6use tracing::info;
7use tracing_subscriber::{
8 Registry, filter::EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt,
9};
10
11pub struct TelemetryGuard {
76 config: TelemetryConfig,
77 #[cfg(feature = "opentelemetry")]
78 tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
79 #[cfg(feature = "prometheus")]
80 _metrics_handle: Option<MetricsHandle>,
81}
82
83impl std::fmt::Debug for TelemetryGuard {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 let mut debug = f.debug_struct("TelemetryGuard");
86 debug.field("config", &self.config);
87 #[cfg(feature = "opentelemetry")]
88 debug.field(
89 "tracer_provider",
90 &self.tracer_provider.as_ref().map(|_| "SdkTracerProvider"),
91 );
92 debug.finish()
93 }
94}
95
96#[cfg(feature = "prometheus")]
97struct MetricsHandle {
98 _handle: metrics_exporter_prometheus::PrometheusHandle,
100}
101
102#[cfg(feature = "prometheus")]
103impl std::fmt::Debug for MetricsHandle {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 f.debug_struct("MetricsHandle").finish()
106 }
107}
108
109impl TelemetryGuard {
110 pub fn init(config: TelemetryConfig) -> Result<Self, TelemetryError> {
112 #[cfg(feature = "opentelemetry")]
114 let tracer_provider = if config.otlp_endpoint.is_some() {
115 Some(init_tracer_provider(&config)?)
116 } else {
117 None
118 };
119
120 init_subscriber(
122 &config,
123 #[cfg(feature = "opentelemetry")]
124 tracer_provider.as_ref(),
125 )?;
126
127 #[cfg(feature = "prometheus")]
129 let metrics_handle = if let Some(port) = config.prometheus_port {
130 Some(init_prometheus(&config, port)?)
131 } else {
132 None
133 };
134
135 info!(
136 service_name = %config.service_name,
137 service_version = %config.service_version,
138 json_logs = config.json_logs,
139 stderr_output = config.stderr_output,
140 "TurboMCP telemetry initialized"
141 );
142
143 Ok(Self {
144 config,
145 #[cfg(feature = "opentelemetry")]
146 tracer_provider,
147 #[cfg(feature = "prometheus")]
148 _metrics_handle: metrics_handle,
149 })
150 }
151
152 #[must_use]
154 pub fn service_name(&self) -> &str {
155 &self.config.service_name
156 }
157
158 #[must_use]
160 pub fn service_version(&self) -> &str {
161 &self.config.service_version
162 }
163
164 #[must_use]
166 pub fn config(&self) -> &TelemetryConfig {
167 &self.config
168 }
169}
170
171impl Drop for TelemetryGuard {
172 fn drop(&mut self) {
173 info!(
174 service_name = %self.config.service_name,
175 "Shutting down TurboMCP telemetry"
176 );
177
178 #[cfg(feature = "opentelemetry")]
180 if let Some(ref provider) = self.tracer_provider
181 && let Err(e) = provider.shutdown()
182 {
183 tracing::error!("Error shutting down tracer provider: {e}");
184 }
185 }
186}
187
188fn init_subscriber(
194 config: &TelemetryConfig,
195 #[cfg(feature = "opentelemetry")] tracer_provider: Option<
196 &opentelemetry_sdk::trace::SdkTracerProvider,
197 >,
198) -> Result<(), TelemetryError> {
199 let env_filter = EnvFilter::try_from_default_env()
200 .or_else(|_| EnvFilter::try_new(&config.log_level))
201 .map_err(|e| TelemetryError::InvalidConfiguration(format!("Invalid log level: {e}")))?;
202
203 #[cfg(feature = "opentelemetry")]
207 if let Some(provider) = tracer_provider {
208 return init_with_otel(config, env_filter, provider);
209 }
210
211 init_without_otel(config, env_filter)
213}
214
215#[cfg(feature = "opentelemetry")]
217fn init_with_otel(
218 config: &TelemetryConfig,
219 env_filter: EnvFilter,
220 provider: &opentelemetry_sdk::trace::SdkTracerProvider,
221) -> Result<(), TelemetryError> {
222 use opentelemetry::trace::TracerProvider;
223
224 let tracer = provider.tracer("turbomcp-telemetry");
225
226 if config.json_logs && config.stderr_output {
229 let fmt_layer = fmt::layer()
230 .with_writer(std::io::stderr)
231 .with_target(true)
232 .with_thread_ids(true)
233 .with_file(true)
234 .with_line_number(true)
235 .json();
236
237 let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
238
239 Registry::default()
240 .with(env_filter)
241 .with(otel_layer)
242 .with(fmt_layer)
243 .try_init()
244 .map_err(|e| TelemetryError::TracingError(e.to_string()))
245 } else if config.json_logs {
246 let fmt_layer = fmt::layer()
247 .with_target(true)
248 .with_thread_ids(true)
249 .with_file(true)
250 .with_line_number(true)
251 .json();
252
253 let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
254
255 Registry::default()
256 .with(env_filter)
257 .with(otel_layer)
258 .with(fmt_layer)
259 .try_init()
260 .map_err(|e| TelemetryError::TracingError(e.to_string()))
261 } else if config.stderr_output {
262 let fmt_layer = fmt::layer()
263 .with_writer(std::io::stderr)
264 .with_target(true)
265 .with_thread_ids(false)
266 .pretty();
267
268 let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
269
270 Registry::default()
271 .with(env_filter)
272 .with(otel_layer)
273 .with(fmt_layer)
274 .try_init()
275 .map_err(|e| TelemetryError::TracingError(e.to_string()))
276 } else {
277 let fmt_layer = fmt::layer()
278 .with_target(true)
279 .with_thread_ids(false)
280 .pretty();
281
282 let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
283
284 Registry::default()
285 .with(env_filter)
286 .with(otel_layer)
287 .with(fmt_layer)
288 .try_init()
289 .map_err(|e| TelemetryError::TracingError(e.to_string()))
290 }
291}
292
293fn init_without_otel(
295 config: &TelemetryConfig,
296 env_filter: EnvFilter,
297) -> Result<(), TelemetryError> {
298 if config.json_logs && config.stderr_output {
299 let fmt_layer = fmt::layer()
300 .with_writer(std::io::stderr)
301 .with_target(true)
302 .with_thread_ids(true)
303 .with_file(true)
304 .with_line_number(true)
305 .json();
306
307 Registry::default()
308 .with(env_filter)
309 .with(fmt_layer)
310 .try_init()
311 .map_err(|e| TelemetryError::TracingError(e.to_string()))
312 } else if config.json_logs {
313 let fmt_layer = fmt::layer()
314 .with_target(true)
315 .with_thread_ids(true)
316 .with_file(true)
317 .with_line_number(true)
318 .json();
319
320 Registry::default()
321 .with(env_filter)
322 .with(fmt_layer)
323 .try_init()
324 .map_err(|e| TelemetryError::TracingError(e.to_string()))
325 } else if config.stderr_output {
326 let fmt_layer = fmt::layer()
327 .with_writer(std::io::stderr)
328 .with_target(true)
329 .with_thread_ids(false)
330 .pretty();
331
332 Registry::default()
333 .with(env_filter)
334 .with(fmt_layer)
335 .try_init()
336 .map_err(|e| TelemetryError::TracingError(e.to_string()))
337 } else {
338 let fmt_layer = fmt::layer()
339 .with_target(true)
340 .with_thread_ids(false)
341 .pretty();
342
343 Registry::default()
344 .with(env_filter)
345 .with(fmt_layer)
346 .try_init()
347 .map_err(|e| TelemetryError::TracingError(e.to_string()))
348 }
349}
350
351#[cfg(feature = "opentelemetry")]
353fn init_tracer_provider(
354 config: &TelemetryConfig,
355) -> Result<opentelemetry_sdk::trace::SdkTracerProvider, TelemetryError> {
356 use opentelemetry_otlp::WithExportConfig;
357 use opentelemetry_sdk::{
358 Resource,
359 trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
360 };
361
362 let endpoint = config.otlp_endpoint.as_ref().ok_or_else(|| {
363 TelemetryError::InvalidConfiguration("OTLP endpoint not configured".into())
364 })?;
365
366 let mut resource_attrs = vec![
368 opentelemetry::KeyValue::new("service.name", config.service_name.clone()),
369 opentelemetry::KeyValue::new("service.version", config.service_version.clone()),
370 ];
371
372 for (key, value) in &config.resource_attributes {
373 resource_attrs.push(opentelemetry::KeyValue::new(key.clone(), value.clone()));
374 }
375
376 let resource = Resource::builder().with_attributes(resource_attrs).build();
377
378 let sampler = if (config.sampling_ratio - 1.0).abs() < f64::EPSILON {
380 Sampler::AlwaysOn
381 } else if config.sampling_ratio <= 0.0 {
382 Sampler::AlwaysOff
383 } else {
384 Sampler::TraceIdRatioBased(config.sampling_ratio)
385 };
386
387 let exporter = opentelemetry_otlp::SpanExporter::builder()
389 .with_http()
390 .with_endpoint(endpoint)
391 .with_timeout(config.export_timeout)
392 .build()
393 .map_err(|e| TelemetryError::OpenTelemetryError(e.to_string()))?;
394
395 let provider = SdkTracerProvider::builder()
397 .with_sampler(sampler)
398 .with_id_generator(RandomIdGenerator::default())
399 .with_resource(resource)
400 .with_batch_exporter(exporter)
401 .build();
402
403 Ok(provider)
404}
405
406#[cfg(feature = "prometheus")]
408fn init_prometheus(config: &TelemetryConfig, port: u16) -> Result<MetricsHandle, TelemetryError> {
409 use metrics_exporter_prometheus::PrometheusBuilder;
410 use std::net::SocketAddr;
411
412 let addr: SocketAddr = format!("0.0.0.0:{port}")
413 .parse()
414 .map_err(|e| TelemetryError::InvalidConfiguration(format!("Invalid port: {e}")))?;
415
416 let handle = PrometheusBuilder::new()
417 .with_http_listener(addr)
418 .install_recorder()
419 .map_err(|e| TelemetryError::MetricsError(e.to_string()))?;
420
421 info!(
422 port = port,
423 path = %config.prometheus_path,
424 "Prometheus metrics endpoint started"
425 );
426
427 Ok(MetricsHandle { _handle: handle })
428}
429
430#[cfg(test)]
431mod tests {
432 use super::*;
433
434 #[test]
435 fn test_telemetry_config_builder() {
436 let config = TelemetryConfig::builder()
437 .service_name("test-service")
438 .service_version("1.0.0")
439 .log_level("debug")
440 .build();
441
442 assert_eq!(config.service_name, "test-service");
443 assert_eq!(config.service_version, "1.0.0");
444 assert_eq!(config.log_level, "debug");
445 }
446
447 }