Skip to main content

turbomcp_telemetry/
init.rs

1//! Telemetry initialization
2//!
3//! Provides the [`TelemetryGuard`] for managing telemetry lifecycle.
4
5use crate::{TelemetryConfig, TelemetryError};
6use tracing::info;
7use tracing_subscriber::{
8    Registry, filter::EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt,
9};
10
11/// Guard that manages telemetry lifecycle
12///
13/// When dropped, ensures proper cleanup of telemetry resources including
14/// flushing any pending trace/metric data to exporters.
15///
16/// # Critical: Drop Behavior
17///
18/// **The `TelemetryGuard` MUST outlive all traced code in your application.**
19///
20/// When the guard is dropped, its `Drop` implementation:
21/// 1. Flushes all pending traces to configured exporters (OTLP, etc.)
22/// 2. Shuts down the OpenTelemetry tracer provider
23/// 3. Releases telemetry resources
24///
25/// ## Common Pitfall
26///
27/// ```rust,ignore
28/// // ❌ WRONG: Guard dropped too early
29/// {
30///     let _guard = TelemetryConfig::default().init()?;
31/// } // Guard dropped here
32/// my_traced_function().await; // Traces lost!
33/// ```
34///
35/// ```rust,ignore
36/// // ✅ CORRECT: Guard outlives traced code
37/// let _guard = TelemetryConfig::default().init()?;
38/// my_traced_function().await;
39/// // Guard dropped at end of scope, traces flushed
40/// ```
41///
42/// ## Best Practice
43///
44/// Store the guard in your main application struct or as a variable in `main()`:
45///
46/// ```rust,ignore
47/// #[tokio::main]
48/// async fn main() -> Result<()> {
49///     let _telemetry = TelemetryConfig::default().init()?;
50///
51///     // Run your server
52///     run_server().await?;
53///
54///     Ok(())
55///     // Guard dropped here after server shutdown
56/// }
57/// ```
58///
59/// # Example
60///
61/// ```rust,ignore
62/// use turbomcp_telemetry::{TelemetryConfig, TelemetryGuard};
63///
64/// let config = TelemetryConfig::builder()
65///     .service_name("my-server")
66///     .build();
67///
68/// // Initialize telemetry - guard must be kept alive
69/// let _guard = config.init()?;
70///
71/// // Your application code here...
72///
73/// // Telemetry is properly cleaned up when guard is dropped
74/// ```
75pub struct TelemetryGuard {
76    config: TelemetryConfig,
77    #[cfg(feature = "opentelemetry")]
78    tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
79    #[cfg(feature = "prometheus")]
80    _metrics_handle: Option<MetricsHandle>,
81}
82
83impl std::fmt::Debug for TelemetryGuard {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        let mut debug = f.debug_struct("TelemetryGuard");
86        debug.field("config", &self.config);
87        #[cfg(feature = "opentelemetry")]
88        debug.field(
89            "tracer_provider",
90            &self.tracer_provider.as_ref().map(|_| "SdkTracerProvider"),
91        );
92        debug.finish()
93    }
94}
95
/// Owns the Prometheus recorder handle returned by `init_prometheus`.
///
/// Nothing in this file reads the handle; it is stored so that it lives as long as
/// the owning `TelemetryGuard`.
#[cfg(feature = "prometheus")]
struct MetricsHandle {
    _handle: metrics_exporter_prometheus::PrometheusHandle,
}

#[cfg(feature = "prometheus")]
impl std::fmt::Debug for MetricsHandle {
    /// Prints `MetricsHandle { .. }`; the inner handle is intentionally omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MetricsHandle").finish_non_exhaustive()
    }
}
108
109impl TelemetryGuard {
110    /// Initialize telemetry with the provided configuration
111    pub fn init(config: TelemetryConfig) -> Result<Self, TelemetryError> {
112        // Initialize OpenTelemetry provider if configured
113        #[cfg(feature = "opentelemetry")]
114        let tracer_provider = if config.otlp_endpoint.is_some() {
115            Some(init_tracer_provider(&config)?)
116        } else {
117            None
118        };
119
120        // Build and initialize the subscriber based on configuration
121        init_subscriber(
122            &config,
123            #[cfg(feature = "opentelemetry")]
124            tracer_provider.as_ref(),
125        )?;
126
127        // Initialize Prometheus metrics if configured
128        #[cfg(feature = "prometheus")]
129        let metrics_handle = if let Some(port) = config.prometheus_port {
130            Some(init_prometheus(&config, port)?)
131        } else {
132            None
133        };
134
135        info!(
136            service_name = %config.service_name,
137            service_version = %config.service_version,
138            json_logs = config.json_logs,
139            stderr_output = config.stderr_output,
140            "TurboMCP telemetry initialized"
141        );
142
143        Ok(Self {
144            config,
145            #[cfg(feature = "opentelemetry")]
146            tracer_provider,
147            #[cfg(feature = "prometheus")]
148            _metrics_handle: metrics_handle,
149        })
150    }
151
152    /// Get the service name
153    #[must_use]
154    pub fn service_name(&self) -> &str {
155        &self.config.service_name
156    }
157
158    /// Get the service version
159    #[must_use]
160    pub fn service_version(&self) -> &str {
161        &self.config.service_version
162    }
163
164    /// Get the configuration
165    #[must_use]
166    pub fn config(&self) -> &TelemetryConfig {
167        &self.config
168    }
169}
170
171impl Drop for TelemetryGuard {
172    fn drop(&mut self) {
173        info!(
174            service_name = %self.config.service_name,
175            "Shutting down TurboMCP telemetry"
176        );
177
178        // Shutdown OpenTelemetry provider if it was initialized
179        #[cfg(feature = "opentelemetry")]
180        if let Some(ref provider) = self.tracer_provider
181            && let Err(e) = provider.shutdown()
182        {
183            tracing::error!("Error shutting down tracer provider: {e}");
184        }
185    }
186}
187
188/// Initialize the tracing subscriber with all configured layers
189///
190/// Due to Rust's type system and tracing's layered architecture, each configuration
191/// combination requires its own complete initialization path. The OpenTelemetry layer
192/// must be created fresh for each subscriber type.
193fn init_subscriber(
194    config: &TelemetryConfig,
195    #[cfg(feature = "opentelemetry")] tracer_provider: Option<
196        &opentelemetry_sdk::trace::SdkTracerProvider,
197    >,
198) -> Result<(), TelemetryError> {
199    let env_filter = EnvFilter::try_from_default_env()
200        .or_else(|_| EnvFilter::try_new(&config.log_level))
201        .map_err(|e| TelemetryError::InvalidConfiguration(format!("Invalid log level: {e}")))?;
202
203    // Handle all configuration combinations
204    // Note: We need completely separate initialization paths because the layer types differ
205
206    #[cfg(feature = "opentelemetry")]
207    if let Some(provider) = tracer_provider {
208        return init_with_otel(config, env_filter, provider);
209    }
210
211    // No OpenTelemetry - just fmt layer
212    init_without_otel(config, env_filter)
213}
214
/// Installs a global subscriber made of `env_filter`, an OpenTelemetry layer backed
/// by `provider`, and a `fmt` layer.
///
/// The `fmt` layer depends on `(config.json_logs, config.stderr_output)`:
/// - JSON output enables target, thread ids, file and line number;
/// - pretty output enables target and disables thread ids;
/// - `stderr_output` routes the log writer to stderr, otherwise the `fmt` default
///   writer is kept.
///
/// Every arm writes out the full stack because each combination has its own
/// concrete subscriber type, and the OpenTelemetry layer is generic over the
/// subscriber it is attached to.
///
/// # Errors
///
/// [`TelemetryError::TracingError`] if `try_init` fails (for example when a global
/// subscriber is already installed).
#[cfg(feature = "opentelemetry")]
fn init_with_otel(
    config: &TelemetryConfig,
    env_filter: EnvFilter,
    provider: &opentelemetry_sdk::trace::SdkTracerProvider,
) -> Result<(), TelemetryError> {
    use opentelemetry::trace::TracerProvider;

    let tracer = provider.tracer("turbomcp-telemetry");

    match (config.json_logs, config.stderr_output) {
        // JSON to stderr.
        (true, true) => Registry::default()
            .with(env_filter)
            .with(tracing_opentelemetry::layer().with_tracer(tracer))
            .with(
                fmt::layer()
                    .with_writer(std::io::stderr)
                    .with_target(true)
                    .with_thread_ids(true)
                    .with_file(true)
                    .with_line_number(true)
                    .json(),
            )
            .try_init()
            .map_err(|e| TelemetryError::TracingError(e.to_string())),
        // JSON to the default writer.
        (true, false) => Registry::default()
            .with(env_filter)
            .with(tracing_opentelemetry::layer().with_tracer(tracer))
            .with(
                fmt::layer()
                    .with_target(true)
                    .with_thread_ids(true)
                    .with_file(true)
                    .with_line_number(true)
                    .json(),
            )
            .try_init()
            .map_err(|e| TelemetryError::TracingError(e.to_string())),
        // Pretty output to stderr.
        (false, true) => Registry::default()
            .with(env_filter)
            .with(tracing_opentelemetry::layer().with_tracer(tracer))
            .with(
                fmt::layer()
                    .with_writer(std::io::stderr)
                    .with_target(true)
                    .with_thread_ids(false)
                    .pretty(),
            )
            .try_init()
            .map_err(|e| TelemetryError::TracingError(e.to_string())),
        // Pretty output to the default writer.
        (false, false) => Registry::default()
            .with(env_filter)
            .with(tracing_opentelemetry::layer().with_tracer(tracer))
            .with(
                fmt::layer()
                    .with_target(true)
                    .with_thread_ids(false)
                    .pretty(),
            )
            .try_init()
            .map_err(|e| TelemetryError::TracingError(e.to_string())),
    }
}
292
293/// Initialize subscriber without OpenTelemetry
294fn init_without_otel(
295    config: &TelemetryConfig,
296    env_filter: EnvFilter,
297) -> Result<(), TelemetryError> {
298    if config.json_logs && config.stderr_output {
299        let fmt_layer = fmt::layer()
300            .with_writer(std::io::stderr)
301            .with_target(true)
302            .with_thread_ids(true)
303            .with_file(true)
304            .with_line_number(true)
305            .json();
306
307        Registry::default()
308            .with(env_filter)
309            .with(fmt_layer)
310            .try_init()
311            .map_err(|e| TelemetryError::TracingError(e.to_string()))
312    } else if config.json_logs {
313        let fmt_layer = fmt::layer()
314            .with_target(true)
315            .with_thread_ids(true)
316            .with_file(true)
317            .with_line_number(true)
318            .json();
319
320        Registry::default()
321            .with(env_filter)
322            .with(fmt_layer)
323            .try_init()
324            .map_err(|e| TelemetryError::TracingError(e.to_string()))
325    } else if config.stderr_output {
326        let fmt_layer = fmt::layer()
327            .with_writer(std::io::stderr)
328            .with_target(true)
329            .with_thread_ids(false)
330            .pretty();
331
332        Registry::default()
333            .with(env_filter)
334            .with(fmt_layer)
335            .try_init()
336            .map_err(|e| TelemetryError::TracingError(e.to_string()))
337    } else {
338        let fmt_layer = fmt::layer()
339            .with_target(true)
340            .with_thread_ids(false)
341            .pretty();
342
343        Registry::default()
344            .with(env_filter)
345            .with(fmt_layer)
346            .try_init()
347            .map_err(|e| TelemetryError::TracingError(e.to_string()))
348    }
349}
350
/// Builds the OpenTelemetry tracer provider that exports spans over OTLP/HTTP.
///
/// The provider carries:
/// - a resource with `service.name`, `service.version`, and every entry from
///   `config.resource_attributes`;
/// - a sampler derived from `config.sampling_ratio` (see `sampler_for_ratio`);
/// - a random trace/span id generator;
/// - a batch span processor feeding the OTLP exporter.
///
/// # Errors
///
/// - [`TelemetryError::InvalidConfiguration`] if `config.otlp_endpoint` is `None`.
/// - [`TelemetryError::OpenTelemetryError`] if the OTLP exporter cannot be built.
#[cfg(feature = "opentelemetry")]
fn init_tracer_provider(
    config: &TelemetryConfig,
) -> Result<opentelemetry_sdk::trace::SdkTracerProvider, TelemetryError> {
    use opentelemetry::KeyValue;
    use opentelemetry_otlp::WithExportConfig;
    use opentelemetry_sdk::{
        Resource,
        trace::{RandomIdGenerator, SdkTracerProvider},
    };

    let endpoint = config.otlp_endpoint.as_ref().ok_or_else(|| {
        TelemetryError::InvalidConfiguration("OTLP endpoint not configured".into())
    })?;

    // Service identity first, then any user-supplied resource attributes.
    let mut resource_attrs = vec![
        KeyValue::new("service.name", config.service_name.clone()),
        KeyValue::new("service.version", config.service_version.clone()),
    ];
    for (key, value) in &config.resource_attributes {
        resource_attrs.push(KeyValue::new(key.clone(), value.clone()));
    }
    let resource = Resource::builder().with_attributes(resource_attrs).build();

    // NOTE(review): for the HTTP transport, an endpoint passed to `with_endpoint` is
    // presumably used verbatim (no `/v1/traces` suffix appended, unlike the env-var
    // path) — confirm callers configure the full traces URL.
    let exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_http()
        .with_endpoint(endpoint)
        .with_timeout(config.export_timeout)
        .build()
        .map_err(|e| TelemetryError::OpenTelemetryError(e.to_string()))?;

    // 0.31 API: the batch exporter needs no runtime argument.
    let provider = SdkTracerProvider::builder()
        .with_sampler(sampler_for_ratio(config.sampling_ratio))
        .with_id_generator(RandomIdGenerator::default())
        .with_resource(resource)
        .with_batch_exporter(exporter)
        .build();

    Ok(provider)
}

/// Maps a sampling ratio onto a sampler.
///
/// - `ratio >= 1.0` gives `AlwaysOn`, so anything at or above one samples every trace.
/// - `ratio <= 0.0` or NaN gives `AlwaysOff`; NaN is treated as "sample nothing".
/// - Anything strictly between 0 and 1 gives `TraceIdRatioBased(ratio)`.
///
/// This replaces an epsilon equality test against `1.0` with explicit ranges, so
/// every input has an obvious destination.
#[cfg(feature = "opentelemetry")]
fn sampler_for_ratio(ratio: f64) -> opentelemetry_sdk::trace::Sampler {
    use opentelemetry_sdk::trace::Sampler;

    if ratio.is_nan() || ratio <= 0.0 {
        Sampler::AlwaysOff
    } else if ratio >= 1.0 {
        Sampler::AlwaysOn
    } else {
        Sampler::TraceIdRatioBased(ratio)
    }
}
405
/// Installs the Prometheus metrics recorder globally.
///
/// The builder is configured with an HTTP listener on `port` across all IPv4
/// interfaces (`0.0.0.0`). `config.prometheus_path` is only logged here; it is
/// not passed to the builder.
///
/// # Errors
///
/// [`TelemetryError::MetricsError`] if `install_recorder` fails.
#[cfg(feature = "prometheus")]
fn init_prometheus(config: &TelemetryConfig, port: u16) -> Result<MetricsHandle, TelemetryError> {
    use metrics_exporter_prometheus::PrometheusBuilder;
    use std::net::{Ipv4Addr, SocketAddr};

    // Build the address directly. Formatting "0.0.0.0:{port}" and parsing it back
    // added an error path that could never trigger for a `u16` port.
    let addr = SocketAddr::from((Ipv4Addr::UNSPECIFIED, port));

    // NOTE(review): per the metrics-exporter-prometheus docs, `install_recorder` only
    // installs the recorder and returns a handle. The HTTP listener configured by
    // `with_http_listener` appears to be spawned only by `install()` (or by running the
    // exporter future from `build()`). If so, nothing serves `addr`, and the
    // "endpoint started" log below is misleading — confirm and switch APIs if needed.
    let handle = PrometheusBuilder::new()
        .with_http_listener(addr)
        .install_recorder()
        .map_err(|e| TelemetryError::MetricsError(e.to_string()))?;

    info!(
        port = port,
        path = %config.prometheus_path,
        "Prometheus metrics endpoint started"
    );

    Ok(MetricsHandle { _handle: handle })
}
429
#[cfg(test)]
mod tests {
    use super::*;

    /// Every field set explicitly on the builder must show up on the built config.
    #[test]
    fn test_telemetry_config_builder() {
        let cfg = TelemetryConfig::builder()
            .service_name("test-service")
            .service_version("1.0.0")
            .log_level("debug")
            .build();

        assert_eq!(cfg.service_name, "test-service", "service_name not applied");
        assert_eq!(cfg.service_version, "1.0.0", "service_version not applied");
        assert_eq!(cfg.log_level, "debug", "log_level not applied");
    }

    // End-to-end `TelemetryGuard::init` coverage lives in the integration tests:
    // installing the global tracing subscriber here would clash with other tests
    // that share the same process.
}