1use std::time::Duration;
37use tracing::{Instrument, error, info, warn};
38use tracing_subscriber::{
39 Registry, filter::EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt,
40};
41
/// Settings that describe how observability is set up for a service.
#[derive(Clone, Debug)]
pub struct ObservabilityConfig {
    /// Service name; reported in the event logged once initialization succeeds.
    pub service_name: String,
    /// Service version; reported in the event logged once initialization succeeds.
    pub service_version: String,
    /// Whether the security audit logger is created in the enabled state.
    pub security_auditing: bool,
    /// Whether the performance monitor is created in the enabled state.
    pub performance_monitoring: bool,
    /// Filter directives used when no filter can be built from the default
    /// environment variable.
    pub log_level: String,
}
56
57impl Default for ObservabilityConfig {
58 fn default() -> Self {
59 Self {
60 service_name: "turbomcp-server".to_string(),
61 service_version: env!("CARGO_PKG_VERSION").to_string(),
62 security_auditing: true,
63 performance_monitoring: true,
64 log_level: "info,turbomcp=debug".to_string(),
65 }
66 }
67}
68
69impl ObservabilityConfig {
70 pub fn new(service_name: impl Into<String>) -> Self {
72 Self {
73 service_name: service_name.into(),
74 ..Default::default()
75 }
76 }
77
78 pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
80 self.service_name = name.into();
81 self
82 }
83
84 pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
86 self.service_version = version.into();
87 self
88 }
89
90 pub fn with_log_level(mut self, level: impl Into<String>) -> Self {
92 self.log_level = level.into();
93 self
94 }
95
96 pub fn enable_security_auditing(mut self) -> Self {
98 self.security_auditing = true;
99 self
100 }
101
102 pub fn enable_performance_monitoring(mut self) -> Self {
104 self.performance_monitoring = true;
105 self
106 }
107
108 pub fn init(self) -> Result<ObservabilityGuard, ObservabilityError> {
110 ObservabilityGuard::init(self)
111 }
112}
113
114#[derive(Debug)]
118pub struct ObservabilityGuard {
119 config: ObservabilityConfig,
120}
121
122impl ObservabilityGuard {
123 pub fn init(config: ObservabilityConfig) -> Result<Self, ObservabilityError> {
125 info!("Initializing TurboMCP observability");
126
127 let env_filter = EnvFilter::try_from_default_env()
129 .or_else(|_| EnvFilter::try_new(&config.log_level))
130 .map_err(|e| {
131 ObservabilityError::InitializationFailed(format!("Invalid log level: {}", e))
132 })?;
133
134 Registry::default()
136 .with(env_filter)
137 .with(
138 fmt::layer()
139 .with_target(true)
140 .with_thread_ids(true)
141 .with_file(true)
142 .with_line_number(true)
143 .json(),
144 )
145 .try_init()
146 .map_err(|e| {
147 ObservabilityError::InitializationFailed(format!("Tracing subscriber: {}", e))
148 })?;
149
150 let security_logger = SecurityAuditLogger::new(config.security_auditing);
152 let performance_monitor = PerformanceMonitor::new(config.performance_monitoring);
153
154 futures::executor::block_on(async {
156 global_observability()
157 .set_security_audit_logger(security_logger)
158 .await;
159 global_observability()
160 .set_performance_monitor(performance_monitor)
161 .await;
162 });
163
164 info!(
165 service_name = %config.service_name,
166 service_version = %config.service_version,
167 security_auditing = config.security_auditing,
168 performance_monitoring = config.performance_monitoring,
169 "TurboMCP observability initialized successfully"
170 );
171
172 Ok(Self { config })
173 }
174
175 pub fn service_name(&self) -> &str {
177 &self.config.service_name
178 }
179
180 pub fn config(&self) -> &ObservabilityConfig {
182 &self.config
183 }
184}
185
186impl Drop for ObservabilityGuard {
187 fn drop(&mut self) {
188 info!("Shutting down TurboMCP observability");
189 }
190}
191
/// Emits structured security audit events through `tracing` when enabled.
#[derive(Clone, Debug)]
pub struct SecurityAuditLogger {
    /// When `false`, every logging method returns without emitting anything.
    enabled: bool,
}
197
198impl SecurityAuditLogger {
199 pub fn new(enabled: bool) -> Self {
201 Self { enabled }
202 }
203
204 pub fn log_authentication(&self, user_id: &str, success: bool, details: Option<&str>) {
206 if !self.enabled {
207 return;
208 }
209
210 if success {
211 info!(
212 event = "authentication_success",
213 user_id = user_id,
214 details = details.unwrap_or(""),
215 "User authentication successful"
216 );
217 } else {
218 warn!(
219 event = "authentication_failure",
220 user_id = user_id,
221 details = details.unwrap_or(""),
222 "User authentication failed"
223 );
224 }
225 }
226
227 pub fn log_authorization(&self, user_id: &str, resource: &str, action: &str, granted: bool) {
229 if !self.enabled {
230 return;
231 }
232
233 if granted {
234 info!(
235 event = "authorization_granted",
236 user_id = user_id,
237 resource = resource,
238 action = action,
239 "Authorization granted"
240 );
241 } else {
242 warn!(
243 event = "authorization_denied",
244 user_id = user_id,
245 resource = resource,
246 action = action,
247 "Authorization denied"
248 );
249 }
250 }
251
252 pub fn log_tool_execution(
254 &self,
255 user_id: &str,
256 tool_name: &str,
257 success: bool,
258 execution_time_ms: u64,
259 ) {
260 if !self.enabled {
261 return;
262 }
263
264 if success {
265 info!(
266 event = "tool_execution_success",
267 user_id = user_id,
268 tool_name = tool_name,
269 execution_time_ms = execution_time_ms,
270 "Tool execution completed successfully"
271 );
272 } else {
273 warn!(
274 event = "tool_execution_failure",
275 user_id = user_id,
276 tool_name = tool_name,
277 execution_time_ms = execution_time_ms,
278 "Tool execution failed"
279 );
280 }
281 }
282
283 pub fn log_security_violation(&self, violation_type: &str, details: &str, severity: &str) {
285 if !self.enabled {
286 return;
287 }
288
289 error!(
290 event = "security_violation",
291 violation_type = violation_type,
292 details = details,
293 severity = severity,
294 "Security violation detected"
295 );
296 }
297}
298
/// Hands out performance spans and instruments futures when enabled.
#[derive(Clone, Debug)]
pub struct PerformanceMonitor {
    /// When `false`, spans are created disabled and futures are not instrumented.
    enabled: bool,
}
304
305impl PerformanceMonitor {
306 pub fn new(enabled: bool) -> Self {
308 Self { enabled }
309 }
310
311 pub fn start_span(&self, operation: &str) -> PerformanceSpan {
313 if !self.enabled {
314 return PerformanceSpan::disabled();
315 }
316
317 PerformanceSpan::new(operation.to_string())
318 }
319
320 pub fn instrument_async<F>(
322 &self,
323 future: F,
324 operation: &str,
325 ) -> Box<dyn std::future::Future<Output = F::Output> + Send>
326 where
327 F: std::future::Future + Send + 'static,
328 {
329 if self.enabled {
330 let span = tracing::info_span!(
331 "performance_operation",
332 operation = operation,
333 performance_monitoring = true
334 );
335 Box::new(future.instrument(span))
336 } else {
337 Box::new(future)
339 }
340 }
341}
342
/// A running measurement of a single named operation.
#[derive(Debug)]
pub struct PerformanceSpan {
    /// Whether finishing the span emits a log event.
    enabled: bool,
    /// Name of the operation being measured; empty for disabled spans.
    operation: String,
    /// Instant captured when the span was created.
    start_time: std::time::Instant,
}
350
351impl PerformanceSpan {
352 fn new(operation: String) -> Self {
353 Self {
354 enabled: true,
355 operation,
356 start_time: std::time::Instant::now(),
357 }
358 }
359
360 fn disabled() -> Self {
361 Self {
362 enabled: false,
363 operation: String::new(),
364 start_time: std::time::Instant::now(),
365 }
366 }
367
368 pub fn finish(self) -> Duration {
370 let duration = self.start_time.elapsed();
371
372 if self.enabled {
373 info!(
374 event = "performance_measurement",
375 operation = self.operation,
376 duration_ms = duration.as_millis(),
377 "Operation completed"
378 );
379 }
380
381 duration
382 }
383}
384
385#[derive(Debug, thiserror::Error)]
387pub enum ObservabilityError {
388 #[error("Failed to initialize observability: {0}")]
390 InitializationFailed(String),
391
392 #[error("Configuration error: {0}")]
394 ConfigurationError(String),
395}
396
397#[derive(Debug)]
399pub struct GlobalObservability {
400 security_audit_logger: tokio::sync::RwLock<Option<SecurityAuditLogger>>,
401 performance_monitor: tokio::sync::RwLock<Option<PerformanceMonitor>>,
402}
403
404impl Default for GlobalObservability {
405 fn default() -> Self {
406 Self::new()
407 }
408}
409
410impl GlobalObservability {
411 pub fn new() -> Self {
413 Self {
414 security_audit_logger: tokio::sync::RwLock::new(None),
415 performance_monitor: tokio::sync::RwLock::new(None),
416 }
417 }
418
419 pub async fn set_security_audit_logger(&self, logger: SecurityAuditLogger) {
421 *self.security_audit_logger.write().await = Some(logger);
422 }
423
424 pub async fn set_performance_monitor(&self, monitor: PerformanceMonitor) {
426 *self.performance_monitor.write().await = Some(monitor);
427 }
428
429 pub async fn security_audit_logger(&self) -> Option<SecurityAuditLogger> {
431 self.security_audit_logger.read().await.clone()
432 }
433
434 pub async fn performance_monitor(&self) -> Option<PerformanceMonitor> {
436 self.performance_monitor.read().await.clone()
437 }
438}
439
440static GLOBAL_OBSERVABILITY: once_cell::sync::Lazy<GlobalObservability> =
442 once_cell::sync::Lazy::new(GlobalObservability::new);
443
444pub fn global_observability() -> &'static GlobalObservability {
446 &GLOBAL_OBSERVABILITY
447}
448
/// Awaits `$future`, instrumenting it through the globally registered
/// performance monitor when one is present.
///
/// Must be invoked inside an async context because the expansion uses
/// `.await`. `$future` has to satisfy the bounds of
/// `PerformanceMonitor::instrument_async` (`Send + 'static`).
#[macro_export]
macro_rules! instrument_async {
    ($operation:expr, $future:expr) => {{
        let monitor = $crate::observability::global_observability()
            .performance_monitor()
            .await;

        if let Some(monitor) = monitor {
            // `instrument_async` returns an unpinned `Box<dyn Future>`, which
            // does not implement `Future` itself; pin it before awaiting.
            ::std::boxed::Box::into_pin(monitor.instrument_async($future, $operation)).await
        } else {
            $future.await
        }
    }};
}
464
/// Runs `$code` and, when a global performance monitor is registered, times it
/// with a span named `$operation`.
///
/// Must be invoked inside an async context because the expansion uses
/// `.await`. The span is finished only when `$code` runs to completion; an
/// early exit from inside the block skips the measurement.
#[macro_export]
macro_rules! measure_performance {
    ($operation:expr, $code:block) => {{
        let registered = $crate::observability::global_observability()
            .performance_monitor()
            .await;

        // `$operation` is evaluated only when a monitor is actually registered.
        let pending = match registered {
            Some(ref perf) => Some(perf.start_span($operation)),
            None => None,
        };

        let outcome = $code;

        if let Some(active) = pending {
            let _ = active.finish();
        }

        outcome
    }};
}
488
/// OTLP transport selection.
///
/// NOTE(review): not referenced elsewhere in the visible part of this file;
/// confirm how the exporter consumes it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OtlpProtocol {
    /// gRPC transport.
    Grpc,
    /// HTTP transport.
    Http,
}
497
/// Trace sampling settings.
#[derive(Clone, Debug)]
pub struct SamplingConfig {
    /// Sampling rate; presumably a fraction in `0.0..=1.0`, but it is not
    /// validated here (TODO confirm with the consumer).
    pub sample_rate: f64,
    /// Parent-based sampling flag; presumably makes the decision follow the
    /// parent span (TODO confirm with the consumer).
    pub parent_based: bool,
}

impl Default for SamplingConfig {
    /// Defaults to a sample rate of `1.0` with parent-based sampling enabled.
    fn default() -> Self {
        let sample_rate = 1.0;
        let parent_based = true;

        Self {
            sample_rate,
            parent_based,
        }
    }
}
515
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration carries the documented stock values.
    #[test]
    fn test_observability_config_defaults() {
        let config = ObservabilityConfig::default();
        assert_eq!(config.service_name, "turbomcp-server");
        assert_eq!(config.service_version, env!("CARGO_PKG_VERSION"));
        assert_eq!(config.log_level, "info,turbomcp=debug");
        assert!(config.security_auditing);
        assert!(config.performance_monitoring);
    }

    /// Builder methods override exactly the fields they name.
    #[test]
    fn test_observability_config_builder() {
        let config = ObservabilityConfig::new("test-service")
            .with_service_version("1.0.0")
            .with_log_level("debug")
            .enable_security_auditing()
            .enable_performance_monitoring();

        assert_eq!(config.service_name, "test-service");
        assert_eq!(config.service_version, "1.0.0");
        assert_eq!(config.log_level, "debug");
        assert!(config.security_auditing);
        assert!(config.performance_monitoring);
    }

    /// Smoke test: every audit method can be called without panicking.
    ///
    /// None of these calls is async, so a plain `#[test]` is enough.
    #[test]
    fn test_security_audit_logger() {
        let logger = SecurityAuditLogger::new(true);

        logger.log_authentication("user123", true, Some("JWT token"));
        logger.log_authorization("user123", "/api/tools", "execute", true);
        logger.log_tool_execution("user123", "file_reader", true, 150);
        logger.log_security_violation("rate_limit_exceeded", "Too many requests", "warning");
    }

    /// An enabled span reports a non-zero elapsed time.
    #[test]
    fn test_performance_monitor() {
        let monitor = PerformanceMonitor::new(true);
        let span = monitor.start_span("test_operation");

        // Tiny sleep so the measured duration is strictly positive.
        std::thread::sleep(std::time::Duration::from_nanos(100));

        let duration = span.finish();

        assert!(duration.as_nanos() > 0);
    }

    /// Components registered on the global holder can be read back.
    #[tokio::test]
    async fn test_global_observability() {
        let global = global_observability();

        // The values are moved straight into the setters; no clones are needed.
        global
            .set_security_audit_logger(SecurityAuditLogger::new(true))
            .await;
        global
            .set_performance_monitor(PerformanceMonitor::new(true))
            .await;

        let retrieved_logger = global.security_audit_logger().await;
        let retrieved_monitor = global.performance_monitor().await;

        assert!(retrieved_logger.is_some());
        assert!(retrieved_monitor.is_some());
    }
}