1use std::time::Duration;
37use tracing::{Instrument, error, info, warn};
38use tracing_subscriber::{
39 Registry, filter::EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt,
40};
41
/// Settings used to bring up logging, security auditing, and performance
/// monitoring for a service.
///
/// Start from [`ObservabilityConfig::new`] or [`Default::default`], adjust the
/// value through the `with_*` / `enable_*` methods, then call
/// [`ObservabilityConfig::init`].
#[derive(Debug, Clone)]
pub struct ObservabilityConfig {
    /// Service name; recorded on the "initialized successfully" log event.
    pub service_name: String,
    /// Service version; recorded next to `service_name` on that same event.
    pub service_version: String,
    /// Handed to `SecurityAuditLogger::new` to switch audit events on or off.
    pub security_auditing: bool,
    /// Handed to `PerformanceMonitor::new` to switch measurements on or off.
    pub performance_monitoring: bool,
    /// Filter directives given to `EnvFilter` when no filter can be built from
    /// the environment.
    pub log_level: String,
}
56
57impl Default for ObservabilityConfig {
58 fn default() -> Self {
59 Self {
60 service_name: "turbomcp-server".to_string(),
61 service_version: env!("CARGO_PKG_VERSION").to_string(),
62 security_auditing: true,
63 performance_monitoring: true,
64 log_level: "info,turbomcp=debug".to_string(),
65 }
66 }
67}
68
69impl ObservabilityConfig {
70 pub fn new(service_name: impl Into<String>) -> Self {
72 Self {
73 service_name: service_name.into(),
74 ..Default::default()
75 }
76 }
77
78 pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
80 self.service_name = name.into();
81 self
82 }
83
84 pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
86 self.service_version = version.into();
87 self
88 }
89
90 pub fn with_log_level(mut self, level: impl Into<String>) -> Self {
92 self.log_level = level.into();
93 self
94 }
95
96 pub fn enable_security_auditing(mut self) -> Self {
98 self.security_auditing = true;
99 self
100 }
101
102 pub fn enable_performance_monitoring(mut self) -> Self {
104 self.performance_monitoring = true;
105 self
106 }
107
108 pub fn init(self) -> Result<ObservabilityGuard, ObservabilityError> {
110 ObservabilityGuard::init(self)
111 }
112}
113
114#[derive(Debug)]
118pub struct ObservabilityGuard {
119 config: ObservabilityConfig,
120}
121
122impl ObservabilityGuard {
123 pub fn init(config: ObservabilityConfig) -> Result<Self, ObservabilityError> {
125 info!("Initializing TurboMCP observability");
126
127 let env_filter = EnvFilter::try_from_default_env()
129 .or_else(|_| EnvFilter::try_new(&config.log_level))
130 .map_err(|e| {
131 ObservabilityError::InitializationFailed(format!("Invalid log level: {}", e))
132 })?;
133
134 Registry::default()
137 .with(env_filter)
138 .with(
139 fmt::layer()
140 .with_writer(std::io::stderr)
141 .with_target(true)
142 .with_thread_ids(true)
143 .with_file(true)
144 .with_line_number(true)
145 .json(),
146 )
147 .try_init()
148 .map_err(|e| {
149 ObservabilityError::InitializationFailed(format!("Tracing subscriber: {}", e))
150 })?;
151
152 let security_logger = SecurityAuditLogger::new(config.security_auditing);
154 let performance_monitor = PerformanceMonitor::new(config.performance_monitoring);
155
156 futures::executor::block_on(async {
158 global_observability()
159 .set_security_audit_logger(security_logger)
160 .await;
161 global_observability()
162 .set_performance_monitor(performance_monitor)
163 .await;
164 });
165
166 info!(
167 service_name = %config.service_name,
168 service_version = %config.service_version,
169 security_auditing = config.security_auditing,
170 performance_monitoring = config.performance_monitoring,
171 "TurboMCP observability initialized successfully"
172 );
173
174 Ok(Self { config })
175 }
176
177 pub fn service_name(&self) -> &str {
179 &self.config.service_name
180 }
181
182 pub fn config(&self) -> &ObservabilityConfig {
184 &self.config
185 }
186}
187
188impl Drop for ObservabilityGuard {
189 fn drop(&mut self) {
190 info!("Shutting down TurboMCP observability");
191 }
192}
193
/// Emits structured security audit events through `tracing`.
///
/// Cloning is cheap: the only state is the on/off switch.
#[derive(Debug, Clone)]
pub struct SecurityAuditLogger {
    /// When `false`, every `log_*` method returns without emitting anything.
    enabled: bool,
}
199
200impl SecurityAuditLogger {
201 pub fn new(enabled: bool) -> Self {
203 Self { enabled }
204 }
205
206 pub fn log_authentication(&self, user_id: &str, success: bool, details: Option<&str>) {
208 if !self.enabled {
209 return;
210 }
211
212 if success {
213 info!(
214 event = "authentication_success",
215 user_id = user_id,
216 details = details.unwrap_or(""),
217 "User authentication successful"
218 );
219 } else {
220 warn!(
221 event = "authentication_failure",
222 user_id = user_id,
223 details = details.unwrap_or(""),
224 "User authentication failed"
225 );
226 }
227 }
228
229 pub fn log_authorization(&self, user_id: &str, resource: &str, action: &str, granted: bool) {
231 if !self.enabled {
232 return;
233 }
234
235 if granted {
236 info!(
237 event = "authorization_granted",
238 user_id = user_id,
239 resource = resource,
240 action = action,
241 "Authorization granted"
242 );
243 } else {
244 warn!(
245 event = "authorization_denied",
246 user_id = user_id,
247 resource = resource,
248 action = action,
249 "Authorization denied"
250 );
251 }
252 }
253
254 pub fn log_tool_execution(
256 &self,
257 user_id: &str,
258 tool_name: &str,
259 success: bool,
260 execution_time_ms: u64,
261 ) {
262 if !self.enabled {
263 return;
264 }
265
266 if success {
267 info!(
268 event = "tool_execution_success",
269 user_id = user_id,
270 tool_name = tool_name,
271 execution_time_ms = execution_time_ms,
272 "Tool execution completed successfully"
273 );
274 } else {
275 warn!(
276 event = "tool_execution_failure",
277 user_id = user_id,
278 tool_name = tool_name,
279 execution_time_ms = execution_time_ms,
280 "Tool execution failed"
281 );
282 }
283 }
284
285 pub fn log_security_violation(&self, violation_type: &str, details: &str, severity: &str) {
287 if !self.enabled {
288 return;
289 }
290
291 error!(
292 event = "security_violation",
293 violation_type = violation_type,
294 details = details,
295 severity = severity,
296 "Security violation detected"
297 );
298 }
299}
300
/// Entry point for timing operations and attaching tracing spans to futures.
///
/// Cloning is cheap: the only state is the on/off switch.
#[derive(Debug, Clone)]
pub struct PerformanceMonitor {
    /// When `false`, spans are created disabled and futures are left
    /// uninstrumented.
    enabled: bool,
}
306
307impl PerformanceMonitor {
308 pub fn new(enabled: bool) -> Self {
310 Self { enabled }
311 }
312
313 pub fn start_span(&self, operation: &str) -> PerformanceSpan {
315 if !self.enabled {
316 return PerformanceSpan::disabled();
317 }
318
319 PerformanceSpan::new(operation.to_string())
320 }
321
322 pub fn instrument_async<F>(
324 &self,
325 future: F,
326 operation: &str,
327 ) -> Box<dyn std::future::Future<Output = F::Output> + Send>
328 where
329 F: std::future::Future + Send + 'static,
330 {
331 if self.enabled {
332 let span = tracing::info_span!(
333 "performance_operation",
334 operation = operation,
335 performance_monitoring = true
336 );
337 Box::new(future.instrument(span))
338 } else {
339 Box::new(future)
341 }
342 }
343}
344
/// A running measurement of a single named operation.
#[derive(Debug)]
pub struct PerformanceSpan {
    /// Whether finishing the span emits a log event.
    enabled: bool,
    /// Operation name recorded on the log event; empty for disabled spans.
    operation: String,
    /// Instant at which the span was created.
    start_time: std::time::Instant,
}
352
353impl PerformanceSpan {
354 fn new(operation: String) -> Self {
355 Self {
356 enabled: true,
357 operation,
358 start_time: std::time::Instant::now(),
359 }
360 }
361
362 fn disabled() -> Self {
363 Self {
364 enabled: false,
365 operation: String::new(),
366 start_time: std::time::Instant::now(),
367 }
368 }
369
370 pub fn finish(self) -> Duration {
372 let duration = self.start_time.elapsed();
373
374 if self.enabled {
375 info!(
376 event = "performance_measurement",
377 operation = self.operation,
378 duration_ms = duration.as_millis(),
379 "Operation completed"
380 );
381 }
382
383 duration
384 }
385}
386
387#[derive(Debug, thiserror::Error)]
389pub enum ObservabilityError {
390 #[error("Failed to initialize observability: {0}")]
392 InitializationFailed(String),
393
394 #[error("Configuration error: {0}")]
396 ConfigurationError(String),
397}
398
399#[derive(Debug)]
401pub struct GlobalObservability {
402 security_audit_logger: tokio::sync::RwLock<Option<SecurityAuditLogger>>,
403 performance_monitor: tokio::sync::RwLock<Option<PerformanceMonitor>>,
404}
405
406impl Default for GlobalObservability {
407 fn default() -> Self {
408 Self::new()
409 }
410}
411
412impl GlobalObservability {
413 pub fn new() -> Self {
415 Self {
416 security_audit_logger: tokio::sync::RwLock::new(None),
417 performance_monitor: tokio::sync::RwLock::new(None),
418 }
419 }
420
421 pub async fn set_security_audit_logger(&self, logger: SecurityAuditLogger) {
423 *self.security_audit_logger.write().await = Some(logger);
424 }
425
426 pub async fn set_performance_monitor(&self, monitor: PerformanceMonitor) {
428 *self.performance_monitor.write().await = Some(monitor);
429 }
430
431 pub async fn security_audit_logger(&self) -> Option<SecurityAuditLogger> {
433 self.security_audit_logger.read().await.clone()
434 }
435
436 pub async fn performance_monitor(&self) -> Option<PerformanceMonitor> {
438 self.performance_monitor.read().await.clone()
439 }
440}
441
442static GLOBAL_OBSERVABILITY: once_cell::sync::Lazy<GlobalObservability> =
444 once_cell::sync::Lazy::new(GlobalObservability::new);
445
446pub fn global_observability() -> &'static GlobalObservability {
448 &GLOBAL_OBSERVABILITY
449}
450
/// Awaits `$future`, running it inside a performance span when a monitor is
/// registered in the global registry.
///
/// Usage: `instrument_async!(operation, future)`. The macro awaits internally,
/// so it must be expanded inside an async context. The future has to satisfy
/// the bounds of `PerformanceMonitor::instrument_async` (`Send + 'static`).
/// Without a registered monitor the future is awaited unchanged.
#[macro_export]
macro_rules! instrument_async {
    ($operation:expr, $future:expr) => {{
        let monitor = $crate::observability::global_observability()
            .performance_monitor()
            .await;

        if let Some(monitor) = monitor {
            // `instrument_async` returns `Box<dyn Future + Send>`. A boxed
            // trait object is not `Unpin`, so the box itself is not a `Future`
            // and awaiting it directly does not compile; pin it first.
            ::std::boxed::Box::into_pin(monitor.instrument_async($future, $operation)).await
        } else {
            $future.await
        }
    }};
}
466
/// Runs `$code` and, when a monitor is registered in the global registry,
/// times it with a span named `$operation`.
///
/// Usage: `measure_performance!(operation, { ... })`. The macro awaits the
/// registry lookup, so it must be expanded inside an async context. It
/// evaluates to the value of the block. If the block exits early (`return`,
/// `?`), the span is never finished and no measurement is logged.
#[macro_export]
macro_rules! measure_performance {
    ($operation:expr, $code:block) => {{
        let registered = $crate::observability::global_observability()
            .performance_monitor()
            .await;

        // `$operation` is evaluated only when a monitor is present.
        let timer = match &registered {
            Some(active) => Some(active.start_span($operation)),
            None => None,
        };

        let outcome = $code;

        if let Some(timer) = timer {
            let _ = timer.finish();
        }

        outcome
    }};
}
490
/// Wire protocol selection for OTLP export.
///
/// The enum has no payload, so it is `Copy` and usable as a set or map key.
///
/// NOTE(review): nothing in the visible part of this file consumes this type.
/// Confirm how each variant is interpreted by the exporter setup.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OtlpProtocol {
    /// OTLP carried over gRPC.
    Grpc,
    /// OTLP carried over HTTP.
    Http,
}
499
/// Trace sampling settings.
///
/// NOTE(review): nothing in the visible part of this file consumes this type;
/// the field descriptions below are inferred from names and defaults.
#[derive(Debug, Clone)]
pub struct SamplingConfig {
    /// Sampling rate; defaults to `1.0`. Presumably a fraction in `0.0..=1.0`
    /// (confirm against the consumer).
    pub sample_rate: f64,
    /// Defaults to `true`. Presumably makes the sampling decision follow the
    /// parent span's (confirm against the consumer).
    pub parent_based: bool,
}

impl Default for SamplingConfig {
    /// Returns a sample rate of `1.0` with `parent_based` turned on.
    fn default() -> Self {
        let sample_rate = 1.0;
        let parent_based = true;
        Self {
            sample_rate,
            parent_based,
        }
    }
}
517
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration names the service and enables both features.
    #[test]
    fn test_observability_config_defaults() {
        let defaults = ObservabilityConfig::default();
        assert_eq!(defaults.service_name, "turbomcp-server");
        assert!(defaults.security_auditing);
        assert!(defaults.performance_monitoring);
    }

    /// Every builder method lands in the field it is named after.
    #[test]
    fn test_observability_config_builder() {
        let built = ObservabilityConfig::new("test-service")
            .with_service_version("1.0.0")
            .with_log_level("debug")
            .enable_security_auditing()
            .enable_performance_monitoring();

        assert_eq!(built.service_name, "test-service");
        assert_eq!(built.service_version, "1.0.0");
        assert_eq!(built.log_level, "debug");
        assert!(built.security_auditing);
        assert!(built.performance_monitoring);
    }

    /// Smoke test: every audit method can be called on an enabled logger.
    #[tokio::test]
    async fn test_security_audit_logger() {
        let audit = SecurityAuditLogger::new(true);

        audit.log_authentication("user123", true, Some("JWT token"));
        audit.log_authorization("user123", "/api/tools", "execute", true);
        audit.log_tool_execution("user123", "file_reader", true, 150);
        audit.log_security_violation("rate_limit_exceeded", "Too many requests", "warning");
    }

    /// A finished span reports a non-zero elapsed time.
    #[test]
    fn test_performance_monitor() {
        let span = PerformanceMonitor::new(true).start_span("test_operation");

        // Let a little time pass so the measurement cannot be zero.
        std::thread::sleep(std::time::Duration::from_nanos(100));

        let elapsed = span.finish();
        assert!(elapsed.as_nanos() > 0);
    }

    /// Components stored in the global registry can be read back.
    #[tokio::test]
    async fn test_global_observability() {
        let registry = global_observability();

        registry
            .set_security_audit_logger(SecurityAuditLogger::new(true))
            .await;
        registry
            .set_performance_monitor(PerformanceMonitor::new(true))
            .await;

        assert!(registry.security_audit_logger().await.is_some());
        assert!(registry.performance_monitor().await.is_some());
    }

    /// Initialization succeeds and logging works afterwards.
    #[tokio::test]
    async fn test_observability_initialization_enables_logging() {
        let config = ObservabilityConfig::default()
            .with_service_name("regression-test")
            .with_log_level("debug");

        assert_eq!(config.service_name, "regression-test");
        assert_eq!(config.log_level, "debug");
        assert!(config.security_auditing);
        assert!(config.performance_monitoring);

        // Keep the outcome bound so the guard lives until the test ends.
        let outcome = config.init();
        assert!(
            outcome.is_ok(),
            "Failed to initialize observability: {:?}",
            outcome
        );

        info!("Test log message to stderr");
    }
}