turbomcp_server/
observability.rs

//! Production-grade observability integration built on `tracing`
//! (OpenTelemetry/OTLP export is a planned enhancement)
//!
//! This module provides structured tracing, structured logging,
//! and observability configuration for TurboMCP server applications.
//!
//! # Features
//!
//! - **Structured Tracing**: Rich span and event attributes, including the acting user's ID
//! - **Security Audit Logging**: Structured events for security-relevant actions
//! - **Performance Monitoring**: Request timing and tool execution metrics
//! - **Guarded Initialization**: `ObservabilityConfig::init` installs the global subscriber
//!   and returns a guard that logs a shutdown event when dropped
//!
//! # Example
//!
//! ```rust,no_run
//! use turbomcp_server::observability::{ObservabilityConfig, ObservabilityGuard};
//! use turbomcp_server::ServerBuilder;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     // Initialize observability
//!     let config = ObservabilityConfig::default()
//!         .with_service_name("my-mcp-server")
//!         .enable_security_auditing()
//!         .enable_performance_monitoring();
//!
//!     let _guard = config.init()?;
//!
//!     // Build server with observability
//!     let server = ServerBuilder::new().build();
//!     server.run_stdio().await?;
//!     Ok(())
//! }
//! ```
35
36use std::time::Duration;
37use tracing::{Instrument, error, info, warn};
38use tracing_subscriber::{
39    Registry, filter::EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt,
40};
41
/// OpenTelemetry observability configuration.
///
/// Start from `ObservabilityConfig::default()` or `ObservabilityConfig::new(..)`
/// and refine it with the `with_*` / `enable_*` builder methods before calling
/// `init`.
#[derive(Debug, Clone)]
pub struct ObservabilityConfig {
    /// Name reported for this service in log/trace output.
    pub service_name: String,
    /// Version string reported alongside the service name.
    pub service_version: String,
    /// Whether security audit events are emitted.
    pub security_auditing: bool,
    /// Whether performance spans and measurements are emitted.
    pub performance_monitoring: bool,
    /// `tracing_subscriber` filter directive string (e.g. `"info,turbomcp=debug"`);
    /// used when `RUST_LOG` is unset or fails to parse.
    pub log_level: String,
}
56
57impl Default for ObservabilityConfig {
58    fn default() -> Self {
59        Self {
60            service_name: "turbomcp-server".to_string(),
61            service_version: env!("CARGO_PKG_VERSION").to_string(),
62            security_auditing: true,
63            performance_monitoring: true,
64            log_level: "info,turbomcp=debug".to_string(),
65        }
66    }
67}
68
69impl ObservabilityConfig {
70    /// Create new observability configuration
71    pub fn new(service_name: impl Into<String>) -> Self {
72        Self {
73            service_name: service_name.into(),
74            ..Default::default()
75        }
76    }
77
78    /// Set service name
79    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
80        self.service_name = name.into();
81        self
82    }
83
84    /// Set service version
85    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
86        self.service_version = version.into();
87        self
88    }
89
90    /// Set log level filter
91    pub fn with_log_level(mut self, level: impl Into<String>) -> Self {
92        self.log_level = level.into();
93        self
94    }
95
96    /// Enable security audit logging
97    pub fn enable_security_auditing(mut self) -> Self {
98        self.security_auditing = true;
99        self
100    }
101
102    /// Enable performance monitoring
103    pub fn enable_performance_monitoring(mut self) -> Self {
104        self.performance_monitoring = true;
105        self
106    }
107
108    /// Initialize observability with this configuration
109    pub fn init(self) -> Result<ObservabilityGuard, ObservabilityError> {
110        ObservabilityGuard::init(self)
111    }
112}
113
114/// Observability initialization guard
115///
116/// Ensures proper cleanup on drop.
117#[derive(Debug)]
118pub struct ObservabilityGuard {
119    config: ObservabilityConfig,
120}
121
122impl ObservabilityGuard {
123    /// Initialize structured logging with the provided configuration
124    pub fn init(config: ObservabilityConfig) -> Result<Self, ObservabilityError> {
125        info!("Initializing TurboMCP observability");
126
127        // Create environment filter
128        let env_filter = EnvFilter::try_from_default_env()
129            .or_else(|_| EnvFilter::try_new(&config.log_level))
130            .map_err(|e| {
131                ObservabilityError::InitializationFailed(format!("Invalid log level: {}", e))
132            })?;
133
134        // Initialize tracing subscriber with structured JSON logging
135        Registry::default()
136            .with(env_filter)
137            .with(
138                fmt::layer()
139                    .with_target(true)
140                    .with_thread_ids(true)
141                    .with_file(true)
142                    .with_line_number(true)
143                    .json(),
144            )
145            .try_init()
146            .map_err(|e| {
147                ObservabilityError::InitializationFailed(format!("Tracing subscriber: {}", e))
148            })?;
149
150        // Initialize global observability components
151        let security_logger = SecurityAuditLogger::new(config.security_auditing);
152        let performance_monitor = PerformanceMonitor::new(config.performance_monitoring);
153
154        // Set global instances
155        futures::executor::block_on(async {
156            global_observability()
157                .set_security_audit_logger(security_logger)
158                .await;
159            global_observability()
160                .set_performance_monitor(performance_monitor)
161                .await;
162        });
163
164        info!(
165            service_name = %config.service_name,
166            service_version = %config.service_version,
167            security_auditing = config.security_auditing,
168            performance_monitoring = config.performance_monitoring,
169            "TurboMCP observability initialized successfully"
170        );
171
172        Ok(Self { config })
173    }
174
175    /// Get the service name
176    pub fn service_name(&self) -> &str {
177        &self.config.service_name
178    }
179
180    /// Get the configuration
181    pub fn config(&self) -> &ObservabilityConfig {
182        &self.config
183    }
184}
185
186impl Drop for ObservabilityGuard {
187    fn drop(&mut self) {
188        info!("Shutting down TurboMCP observability");
189    }
190}
191
192/// Security audit logger using structured tracing events
193#[derive(Debug, Clone)]
194pub struct SecurityAuditLogger {
195    enabled: bool,
196}
197
198impl SecurityAuditLogger {
199    /// Create new security audit logger
200    pub fn new(enabled: bool) -> Self {
201        Self { enabled }
202    }
203
204    /// Log authentication event
205    pub fn log_authentication(&self, user_id: &str, success: bool, details: Option<&str>) {
206        if !self.enabled {
207            return;
208        }
209
210        if success {
211            info!(
212                event = "authentication_success",
213                user_id = user_id,
214                details = details.unwrap_or(""),
215                "User authentication successful"
216            );
217        } else {
218            warn!(
219                event = "authentication_failure",
220                user_id = user_id,
221                details = details.unwrap_or(""),
222                "User authentication failed"
223            );
224        }
225    }
226
227    /// Log authorization event
228    pub fn log_authorization(&self, user_id: &str, resource: &str, action: &str, granted: bool) {
229        if !self.enabled {
230            return;
231        }
232
233        if granted {
234            info!(
235                event = "authorization_granted",
236                user_id = user_id,
237                resource = resource,
238                action = action,
239                "Authorization granted"
240            );
241        } else {
242            warn!(
243                event = "authorization_denied",
244                user_id = user_id,
245                resource = resource,
246                action = action,
247                "Authorization denied"
248            );
249        }
250    }
251
252    /// Log tool execution
253    pub fn log_tool_execution(
254        &self,
255        user_id: &str,
256        tool_name: &str,
257        success: bool,
258        execution_time_ms: u64,
259    ) {
260        if !self.enabled {
261            return;
262        }
263
264        if success {
265            info!(
266                event = "tool_execution_success",
267                user_id = user_id,
268                tool_name = tool_name,
269                execution_time_ms = execution_time_ms,
270                "Tool execution completed successfully"
271            );
272        } else {
273            warn!(
274                event = "tool_execution_failure",
275                user_id = user_id,
276                tool_name = tool_name,
277                execution_time_ms = execution_time_ms,
278                "Tool execution failed"
279            );
280        }
281    }
282
283    /// Log security violation
284    pub fn log_security_violation(&self, violation_type: &str, details: &str, severity: &str) {
285        if !self.enabled {
286            return;
287        }
288
289        error!(
290            event = "security_violation",
291            violation_type = violation_type,
292            details = details,
293            severity = severity,
294            "Security violation detected"
295        );
296    }
297}
298
299/// Performance monitoring utilities
300#[derive(Debug, Clone)]
301pub struct PerformanceMonitor {
302    enabled: bool,
303}
304
305impl PerformanceMonitor {
306    /// Create new performance monitor
307    pub fn new(enabled: bool) -> Self {
308        Self { enabled }
309    }
310
311    /// Start performance span
312    pub fn start_span(&self, operation: &str) -> PerformanceSpan {
313        if !self.enabled {
314            return PerformanceSpan::disabled();
315        }
316
317        PerformanceSpan::new(operation.to_string())
318    }
319
320    /// Create an instrumented future for performance monitoring
321    pub fn instrument_async<F>(
322        &self,
323        future: F,
324        operation: &str,
325    ) -> Box<dyn std::future::Future<Output = F::Output> + Send>
326    where
327        F: std::future::Future + Send + 'static,
328    {
329        if self.enabled {
330            let span = tracing::info_span!(
331                "performance_operation",
332                operation = operation,
333                performance_monitoring = true
334            );
335            Box::new(future.instrument(span))
336        } else {
337            // Return the future as-is without instrumentation
338            Box::new(future)
339        }
340    }
341}
342
343/// Performance tracking span
344#[derive(Debug)]
345pub struct PerformanceSpan {
346    enabled: bool,
347    operation: String,
348    start_time: std::time::Instant,
349}
350
351impl PerformanceSpan {
352    fn new(operation: String) -> Self {
353        Self {
354            enabled: true,
355            operation,
356            start_time: std::time::Instant::now(),
357        }
358    }
359
360    fn disabled() -> Self {
361        Self {
362            enabled: false,
363            operation: String::new(),
364            start_time: std::time::Instant::now(),
365        }
366    }
367
368    /// Record execution time and finish span
369    pub fn finish(self) -> Duration {
370        let duration = self.start_time.elapsed();
371
372        if self.enabled {
373            info!(
374                event = "performance_measurement",
375                operation = self.operation,
376                duration_ms = duration.as_millis(),
377                "Operation completed"
378            );
379        }
380
381        duration
382    }
383}
384
385/// Observability errors
386#[derive(Debug, thiserror::Error)]
387pub enum ObservabilityError {
388    /// Failed to initialize observability system
389    #[error("Failed to initialize observability: {0}")]
390    InitializationFailed(String),
391
392    /// Configuration error in observability setup
393    #[error("Configuration error: {0}")]
394    ConfigurationError(String),
395}
396
397/// Global observability state for server integration
398#[derive(Debug)]
399pub struct GlobalObservability {
400    security_audit_logger: tokio::sync::RwLock<Option<SecurityAuditLogger>>,
401    performance_monitor: tokio::sync::RwLock<Option<PerformanceMonitor>>,
402}
403
404impl Default for GlobalObservability {
405    fn default() -> Self {
406        Self::new()
407    }
408}
409
410impl GlobalObservability {
411    /// Initialize global observability
412    pub fn new() -> Self {
413        Self {
414            security_audit_logger: tokio::sync::RwLock::new(None),
415            performance_monitor: tokio::sync::RwLock::new(None),
416        }
417    }
418
419    /// Set security audit logger
420    pub async fn set_security_audit_logger(&self, logger: SecurityAuditLogger) {
421        *self.security_audit_logger.write().await = Some(logger);
422    }
423
424    /// Set performance monitor
425    pub async fn set_performance_monitor(&self, monitor: PerformanceMonitor) {
426        *self.performance_monitor.write().await = Some(monitor);
427    }
428
429    /// Get security audit logger
430    pub async fn security_audit_logger(&self) -> Option<SecurityAuditLogger> {
431        self.security_audit_logger.read().await.clone()
432    }
433
434    /// Get performance monitor
435    pub async fn performance_monitor(&self) -> Option<PerformanceMonitor> {
436        self.performance_monitor.read().await.clone()
437    }
438}
439
440/// Global observability instance
441static GLOBAL_OBSERVABILITY: once_cell::sync::Lazy<GlobalObservability> =
442    once_cell::sync::Lazy::new(GlobalObservability::new);
443
444/// Get global observability instance
445pub fn global_observability() -> &'static GlobalObservability {
446    &GLOBAL_OBSERVABILITY
447}
448
/// Await `$future`, wrapped in a `performance_operation` span when a global
/// `PerformanceMonitor` has been registered.
///
/// Must be invoked inside an `async` context. `$future` must be `Send + 'static`,
/// because the monitored branch passes it to `PerformanceMonitor::instrument_async`.
#[macro_export]
macro_rules! instrument_async {
    ($operation:expr, $future:expr) => {{
        let monitor = $crate::observability::global_observability()
            .performance_monitor()
            .await;

        // Expand `$future` exactly once instead of duplicating it into both
        // branches; it is still evaluated after the monitor lookup.
        let future = $future;

        match monitor {
            Some(monitor) => monitor.instrument_async(future, $operation).await,
            None => future.await,
        }
    }};
}
464
/// Run `$code` and, when a global `PerformanceMonitor` is registered, time it
/// with a performance span named `$operation`.
///
/// Must be invoked inside an `async` context. The block's value is returned
/// unchanged. If `$code` exits early (e.g. via `?` or `return`), no
/// measurement is logged.
#[macro_export]
macro_rules! measure_performance {
    ($operation:expr, $code:block) => {{
        let span = match $crate::observability::global_observability()
            .performance_monitor()
            .await
        {
            Some(monitor) => Some(monitor.start_span($operation)),
            None => None,
        };

        let result = $code;

        if let Some(span) = span {
            span.finish();
        }

        result
    }};
}
488
/// Wire protocol for an OTLP exporter (placeholder for future enhancement;
/// nothing in this module consumes it yet).
#[derive(Debug, Clone, PartialEq)]
pub enum OtlpProtocol {
    /// OTLP over gRPC, conventionally on port 4317 (default).
    Grpc,
    /// OTLP over HTTP with binary protobuf payloads, conventionally on port 4318.
    Http,
}
497
/// Trace sampling settings (placeholder for future enhancement; nothing in
/// this module consumes it yet).
#[derive(Debug, Clone)]
pub struct SamplingConfig {
    /// Fraction of traces to keep, expected to lie in `0.0..=1.0`.
    pub sample_rate: f64,
    /// Whether to honour the parent span's sampling decision.
    pub parent_based: bool,
}

impl Default for SamplingConfig {
    /// Keep every trace (`sample_rate = 1.0`) and respect parent decisions.
    fn default() -> Self {
        let sample_rate = 1.0;
        let parent_based = true;
        SamplingConfig {
            sample_rate,
            parent_based,
        }
    }
}
515
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_observability_config_defaults() {
        let ObservabilityConfig {
            service_name,
            security_auditing,
            performance_monitoring,
            ..
        } = ObservabilityConfig::default();

        assert_eq!(service_name, "turbomcp-server");
        assert!(security_auditing);
        assert!(performance_monitoring);
    }

    #[test]
    fn test_observability_config_builder() {
        let built = ObservabilityConfig::new("test-service")
            .with_service_version("1.0.0")
            .with_log_level("debug")
            .enable_security_auditing()
            .enable_performance_monitoring();

        assert_eq!(built.service_name, "test-service");
        assert_eq!(built.service_version, "1.0.0");
        assert_eq!(built.log_level, "debug");
        assert!(built.security_auditing);
        assert!(built.performance_monitoring);
    }

    #[tokio::test]
    async fn test_security_audit_logger() {
        let audit = SecurityAuditLogger::new(true);

        // Each call only emits a tracing event; none of them may panic.
        audit.log_authentication("user123", true, Some("JWT token"));
        audit.log_authorization("user123", "/api/tools", "execute", true);
        audit.log_tool_execution("user123", "file_reader", true, 150);
        audit.log_security_violation("rate_limit_exceeded", "Too many requests", "warning");
    }

    #[test]
    fn test_performance_monitor() {
        let span = PerformanceMonitor::new(true).start_span("test_operation");

        // Sleep briefly so the measured duration cannot be zero.
        std::thread::sleep(std::time::Duration::from_nanos(100));

        assert!(span.finish().as_nanos() > 0);
    }

    #[tokio::test]
    async fn test_global_observability() {
        let registry = global_observability();

        registry
            .set_security_audit_logger(SecurityAuditLogger::new(true))
            .await;
        registry
            .set_performance_monitor(PerformanceMonitor::new(true))
            .await;

        assert!(registry.security_audit_logger().await.is_some());
        assert!(registry.performance_monitor().await.is_some());
    }
}