pulseengine_mcp_logging/
telemetry.rs

1//! Simplified OpenTelemetry integration for distributed tracing
2//!
3//! This module provides basic distributed tracing capabilities for MCP servers.
4//! The full OpenTelemetry integration is complex and requires careful API matching.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use tracing::info;
9
10/// Telemetry configuration for OpenTelemetry
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct TelemetryConfig {
13    /// Enable telemetry
14    pub enabled: bool,
15
16    /// Service name for traces
17    pub service_name: String,
18
19    /// Service version
20    pub service_version: String,
21
22    /// Service namespace (e.g., "mcp", "loxone")
23    pub service_namespace: Option<String>,
24
25    /// Deployment environment (dev, staging, prod)
26    pub environment: Option<String>,
27
28    /// OTLP exporter configuration
29    pub otlp: OtlpConfig,
30
31    /// Jaeger exporter configuration
32    pub jaeger: Option<JaegerConfig>,
33
34    /// Zipkin exporter configuration
35    pub zipkin: Option<ZipkinConfig>,
36
37    /// Sampling configuration
38    pub sampling: SamplingConfig,
39
40    /// Batch processing configuration
41    pub batch: BatchProcessingConfig,
42
43    /// Custom resource attributes
44    pub resource_attributes: HashMap<String, String>,
45
46    /// Enable console exporter for development
47    pub console_exporter: bool,
48}
49
50/// OTLP (OpenTelemetry Protocol) exporter configuration
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct OtlpConfig {
53    /// Enable OTLP exporter
54    pub enabled: bool,
55
56    /// OTLP endpoint URL
57    pub endpoint: String,
58
59    /// Optional headers for authentication
60    pub headers: HashMap<String, String>,
61
62    /// Timeout for exports
63    pub timeout_secs: u64,
64
65    /// Use TLS
66    pub tls_enabled: bool,
67}
68
69/// Jaeger exporter configuration
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct JaegerConfig {
72    /// Jaeger agent endpoint
73    pub agent_endpoint: String,
74
75    /// Jaeger collector endpoint
76    pub collector_endpoint: Option<String>,
77
78    /// Authentication username
79    pub username: Option<String>,
80
81    /// Authentication password
82    pub password: Option<String>,
83}
84
85/// Zipkin exporter configuration
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct ZipkinConfig {
88    /// Zipkin endpoint URL
89    pub endpoint: String,
90
91    /// Timeout for exports
92    pub timeout_secs: u64,
93}
94
95/// Sampling configuration
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct SamplingConfig {
98    /// Sampling strategy
99    pub strategy: SamplingStrategy,
100
101    /// Sampling rate (0.0 to 1.0) for ratio-based sampling
102    pub rate: f64,
103
104    /// Parent-based sampling configuration
105    pub parent_based: bool,
106}
107
108/// Sampling strategies
109#[derive(Debug, Clone, Serialize, Deserialize)]
110#[serde(rename_all = "snake_case")]
111pub enum SamplingStrategy {
112    /// Always sample
113    Always,
114    /// Never sample
115    Never,
116    /// Sample based on ratio
117    Ratio,
118    /// Parent-based sampling
119    ParentBased,
120}
121
122/// Batch processing configuration
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct BatchProcessingConfig {
125    /// Maximum batch size
126    pub max_batch_size: usize,
127
128    /// Batch timeout in milliseconds
129    pub batch_timeout_ms: u64,
130
131    /// Maximum queue size
132    pub max_queue_size: usize,
133
134    /// Export timeout in milliseconds
135    pub export_timeout_ms: u64,
136}
137
138impl Default for TelemetryConfig {
139    fn default() -> Self {
140        Self {
141            enabled: true,
142            service_name: "mcp-server".to_string(),
143            service_version: "1.0.0".to_string(),
144            service_namespace: Some("mcp".to_string()),
145            environment: Some("development".to_string()),
146            otlp: OtlpConfig {
147                enabled: true,
148                endpoint: "http://localhost:4317".to_string(),
149                headers: HashMap::new(),
150                timeout_secs: 10,
151                tls_enabled: false,
152            },
153            jaeger: None,
154            zipkin: None,
155            sampling: SamplingConfig {
156                strategy: SamplingStrategy::Ratio,
157                rate: 0.1, // 10% sampling by default
158                parent_based: true,
159            },
160            batch: BatchProcessingConfig {
161                max_batch_size: 512,
162                batch_timeout_ms: 1000,
163                max_queue_size: 2048,
164                export_timeout_ms: 30000,
165            },
166            resource_attributes: HashMap::new(),
167            console_exporter: false,
168        }
169    }
170}
171
172/// Telemetry manager for OpenTelemetry integration
173pub struct TelemetryManager {
174    config: TelemetryConfig,
175}
176
177impl TelemetryManager {
178    /// Initialize telemetry with the given configuration
179    pub async fn new(config: TelemetryConfig) -> Result<Self, TelemetryError> {
180        let manager = Self { config };
181
182        if manager.config.enabled {
183            info!(
184                "Telemetry enabled for service: {} v{}",
185                manager.config.service_name, manager.config.service_version
186            );
187            // Note: Full OpenTelemetry integration requires matching API versions
188            // This is a simplified version that logs configuration
189        }
190
191        Ok(manager)
192    }
193
194    /// Shutdown telemetry
195    pub async fn shutdown(&self) -> Result<(), TelemetryError> {
196        if self.config.enabled {
197            info!("Shutting down telemetry");
198        }
199        Ok(())
200    }
201}
202
203/// Telemetry error types
204#[derive(Debug, thiserror::Error)]
205pub enum TelemetryError {
206    #[error("Initialization error: {0}")]
207    Initialization(String),
208
209    #[error("Configuration error: {0}")]
210    Configuration(String),
211}
212
213/// Span utilities for common MCP operations
214pub mod spans {
215    use tracing::Span;
216
217    /// Create a span for MCP request handling
218    pub fn mcp_request_span(method: &str, request_id: &str) -> Span {
219        tracing::info_span!(
220            "mcp_request",
221            mcp.method = method,
222            mcp.request_id = request_id,
223            otel.kind = "server"
224        )
225    }
226
227    /// Create a span for backend operations
228    pub fn backend_operation_span(operation: &str, resource: Option<&str>) -> Span {
229        let span = tracing::info_span!(
230            "backend_operation",
231            backend.operation = operation,
232            otel.kind = "internal"
233        );
234
235        if let Some(res) = resource {
236            span.record("backend.resource", res);
237        }
238
239        span
240    }
241
242    /// Create a span for authentication operations
243    pub fn auth_operation_span(operation: &str, user_id: Option<&str>) -> Span {
244        let span = tracing::info_span!(
245            "auth_operation",
246            auth.operation = operation,
247            otel.kind = "internal"
248        );
249
250        if let Some(user) = user_id {
251            span.record("auth.user_id", user);
252        }
253
254        span
255    }
256
257    /// Create a span for external API calls
258    pub fn external_api_span(service: &str, endpoint: &str, method: &str) -> Span {
259        tracing::info_span!(
260            "external_api_call",
261            http.method = method,
262            http.url = endpoint,
263            service.name = service,
264            otel.kind = "client"
265        )
266    }
267
268    /// Create a span for database operations
269    pub fn database_operation_span(operation: &str, table: Option<&str>) -> Span {
270        let span = tracing::info_span!(
271            "database_operation",
272            db.operation = operation,
273            otel.kind = "client"
274        );
275
276        if let Some(tbl) = table {
277            span.record("db.table", tbl);
278        }
279
280        span
281    }
282}
283
/// Helpers for carrying trace context across process boundaries.
///
/// Both functions are placeholders in this simplified module: they accept
/// the header map and do nothing with it.
pub mod propagation {
    use std::collections::HashMap;

    /// Placeholder for extracting OpenTelemetry context from incoming headers.
    ///
    /// Currently a no-op: the headers are not read and nothing is returned.
    /// Real context propagation requires the OpenTelemetry API.
    pub fn extract_context_from_headers(_headers: &HashMap<String, String>) {
        // Intentionally empty until the OpenTelemetry API is wired in.
    }

    /// Placeholder for injecting the current context into outgoing headers.
    ///
    /// Currently a no-op: the map is left exactly as it was passed in.
    /// Real context injection requires the OpenTelemetry API.
    pub fn inject_context_into_headers(_headers: &mut HashMap<String, String>) {
        // Intentionally empty until the OpenTelemetry API is wired in.
    }
}
300
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration enables telemetry and OTLP and uses the
    /// generic service name.
    #[tokio::test]
    async fn test_telemetry_config_default() {
        let defaults = TelemetryConfig::default();

        assert_eq!(defaults.service_name, "mcp-server");
        assert!(defaults.enabled);
        assert!(defaults.otlp.enabled);
    }

    /// A manager built from a disabled configuration can still be created
    /// and shut down without error.
    #[tokio::test]
    async fn test_telemetry_manager_disabled() {
        let disabled = TelemetryConfig {
            enabled: false,
            ..TelemetryConfig::default()
        };

        let manager = TelemetryManager::new(disabled).await.unwrap();
        let outcome = manager.shutdown().await;

        assert!(outcome.is_ok());
    }

    /// Span helpers produce enabled spans once a subscriber is installed.
    #[test]
    fn test_span_utilities() {
        // A subscriber must be installed or every span is disabled. The
        // result is ignored because another test may already have set one.
        let _init_result = tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .with_test_writer()
            .try_init();

        let request_span = spans::mcp_request_span("tools/list", "req-123");
        assert!(!request_span.is_disabled());

        let backend_span = spans::backend_operation_span("fetch_data", Some("users"));
        assert!(!backend_span.is_disabled());
    }
}