Skip to main content

oxigdal_observability/telemetry/
mod.rs

1//! OpenTelemetry integration and telemetry setup.
2
3use crate::error::{ObservabilityError, Result};
4use opentelemetry::KeyValue;
5use opentelemetry_sdk::Resource;
6use opentelemetry_sdk::metrics::SdkMeterProvider;
7use parking_lot::RwLock;
8use std::sync::Arc;
9
10pub mod logs;
11pub mod metrics;
12pub mod traces;
13
/// Settings that describe a service and select which telemetry signals to set up.
///
/// Every field is public; the `with_*` builder methods offer a chained way to
/// fill them in.
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// Name of the service, reported as the `service.name` resource attribute.
    pub service_name: String,

    /// Version of the service, reported as the `service.version` resource attribute.
    pub service_version: String,

    /// Optional namespace, reported as `service.namespace` when present.
    pub service_namespace: Option<String>,

    /// Optional instance identifier, reported as `service.instance.id` when present.
    pub service_instance_id: Option<String>,

    /// Whether tracing is initialized.
    pub enable_traces: bool,

    /// Whether metrics collection is initialized.
    pub enable_metrics: bool,

    /// Whether logging is initialized.
    pub enable_logs: bool,

    /// Optional Jaeger endpoint used for traces.
    pub jaeger_endpoint: Option<String>,

    /// Optional Prometheus endpoint used for metrics.
    pub prometheus_endpoint: Option<String>,

    /// Optional OTLP endpoint (supports all signals).
    pub otlp_endpoint: Option<String>,

    /// Sampling rate, expected to lie between 0.0 and 1.0 inclusive.
    pub sampling_rate: f64,

    /// Extra `(key, value)` resource attributes, kept in insertion order.
    pub resource_attributes: Vec<(String, String)>,
}
53
54impl Default for TelemetryConfig {
55    fn default() -> Self {
56        Self {
57            service_name: "oxigdal".to_string(),
58            service_version: env!("CARGO_PKG_VERSION").to_string(),
59            service_namespace: None,
60            service_instance_id: None,
61            enable_traces: true,
62            enable_metrics: true,
63            enable_logs: true,
64            jaeger_endpoint: None,
65            prometheus_endpoint: None,
66            otlp_endpoint: None,
67            sampling_rate: 0.1,
68            resource_attributes: Vec::new(),
69        }
70    }
71}
72
73impl TelemetryConfig {
74    /// Create a new telemetry configuration.
75    pub fn new(service_name: impl Into<String>) -> Self {
76        Self {
77            service_name: service_name.into(),
78            ..Default::default()
79        }
80    }
81
82    /// Set service version.
83    pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
84        self.service_version = version.into();
85        self
86    }
87
88    /// Set service namespace.
89    pub fn with_service_namespace(mut self, namespace: impl Into<String>) -> Self {
90        self.service_namespace = Some(namespace.into());
91        self
92    }
93
94    /// Set service instance ID.
95    pub fn with_service_instance_id(mut self, instance_id: impl Into<String>) -> Self {
96        self.service_instance_id = Some(instance_id.into());
97        self
98    }
99
100    /// Enable or disable traces.
101    pub fn with_traces(mut self, enable: bool) -> Self {
102        self.enable_traces = enable;
103        self
104    }
105
106    /// Enable or disable metrics.
107    pub fn with_metrics(mut self, enable: bool) -> Self {
108        self.enable_metrics = enable;
109        self
110    }
111
112    /// Enable or disable logs.
113    pub fn with_logs(mut self, enable: bool) -> Self {
114        self.enable_logs = enable;
115        self
116    }
117
118    /// Set Jaeger endpoint.
119    pub fn with_jaeger_endpoint(mut self, endpoint: impl Into<String>) -> Self {
120        self.jaeger_endpoint = Some(endpoint.into());
121        self
122    }
123
124    /// Set Prometheus endpoint.
125    pub fn with_prometheus_endpoint(mut self, endpoint: impl Into<String>) -> Self {
126        self.prometheus_endpoint = Some(endpoint.into());
127        self
128    }
129
130    /// Set OTLP endpoint.
131    pub fn with_otlp_endpoint(mut self, endpoint: impl Into<String>) -> Self {
132        self.otlp_endpoint = Some(endpoint.into());
133        self
134    }
135
136    /// Set sampling rate.
137    pub fn with_sampling_rate(mut self, rate: f64) -> Self {
138        self.sampling_rate = rate.clamp(0.0, 1.0);
139        self
140    }
141
142    /// Add a resource attribute.
143    pub fn with_resource_attribute(
144        mut self,
145        key: impl Into<String>,
146        value: impl Into<String>,
147    ) -> Self {
148        self.resource_attributes.push((key.into(), value.into()));
149        self
150    }
151}
152
153/// Telemetry provider managing all observability components.
154pub struct TelemetryProvider {
155    config: TelemetryConfig,
156    meter_provider: Option<Arc<RwLock<SdkMeterProvider>>>,
157    is_initialized: Arc<RwLock<bool>>,
158}
159
160impl TelemetryProvider {
161    /// Create a new telemetry provider.
162    pub fn new(config: TelemetryConfig) -> Self {
163        Self {
164            config,
165            meter_provider: None,
166            is_initialized: Arc::new(RwLock::new(false)),
167        }
168    }
169
170    /// Initialize telemetry with the given configuration.
171    pub async fn init(&mut self) -> Result<()> {
172        {
173            let initialized = self.is_initialized.write();
174            if *initialized {
175                return Err(ObservabilityError::TelemetryInit(
176                    "Telemetry already initialized".to_string(),
177                ));
178            }
179        }
180
181        // Create resource with service information
182        let resource = self.create_resource()?;
183
184        // Initialize traces if enabled
185        if self.config.enable_traces {
186            self.init_traces(&resource).await?;
187        }
188
189        // Initialize metrics if enabled
190        if self.config.enable_metrics {
191            self.init_metrics(&resource)?;
192        }
193
194        // Initialize logs if enabled
195        if self.config.enable_logs {
196            self.init_logs()?;
197        }
198
199        *self.is_initialized.write() = true;
200        Ok(())
201    }
202    /// Create OpenTelemetry resource with service information.
203    fn create_resource(&self) -> Result<Resource> {
204        let mut attributes = vec![
205            KeyValue::new("service.name", self.config.service_name.clone()),
206            KeyValue::new("service.version", self.config.service_version.clone()),
207        ];
208
209        if let Some(ref namespace) = self.config.service_namespace {
210            attributes.push(KeyValue::new("service.namespace", namespace.clone()));
211        }
212
213        if let Some(ref instance_id) = self.config.service_instance_id {
214            attributes.push(KeyValue::new("service.instance.id", instance_id.clone()));
215        }
216
217        // Add custom resource attributes
218        for (key, value) in &self.config.resource_attributes {
219            attributes.push(KeyValue::new(key.clone(), value.clone()));
220        }
221
222        Ok(Resource::builder_empty()
223            .with_attributes(attributes)
224            .build())
225    }
226
227    /// Initialize distributed tracing.
228    async fn init_traces(&self, resource: &Resource) -> Result<()> {
229        traces::init_tracing(&self.config, resource.clone()).await
230    }
231
232    /// Initialize metrics collection.
233    fn init_metrics(&mut self, resource: &Resource) -> Result<()> {
234        let meter_provider = metrics::init_metrics(&self.config, resource.clone())?;
235        self.meter_provider = Some(Arc::new(RwLock::new(meter_provider)));
236        Ok(())
237    }
238
239    /// Initialize structured logging.
240    fn init_logs(&self) -> Result<()> {
241        logs::init_logging(&self.config)
242    }
243
244    /// Check if telemetry is initialized.
245    pub fn is_initialized(&self) -> bool {
246        *self.is_initialized.read()
247    }
248
249    /// Shutdown telemetry gracefully.
250    pub async fn shutdown(&self) -> Result<()> {
251        if !self.is_initialized() {
252            return Ok(());
253        }
254
255        // Note: In OpenTelemetry 0.31+, tracer provider shuts down automatically when dropped.
256        // The global tracer provider is managed by the global API and does not expose a shutdown fn.
257
258        // Shutdown metrics
259        if let Some(ref provider) = self.meter_provider {
260            let provider = provider.read();
261            provider.shutdown().map_err(|e| {
262                ObservabilityError::Other(format!("Failed to shutdown meter provider: {}", e))
263            })?;
264        }
265
266        Ok(())
267    }
268
269    /// Get the meter provider.
270    pub fn meter_provider(&self) -> Option<Arc<RwLock<SdkMeterProvider>>> {
271        self.meter_provider.clone()
272    }
273}
274
275/// Initialize global telemetry with default configuration.
276pub async fn init_default() -> Result<TelemetryProvider> {
277    let config = TelemetryConfig::default();
278    let mut provider = TelemetryProvider::new(config);
279    provider.init().await?;
280    Ok(provider)
281}
282
283/// Initialize global telemetry with custom configuration.
284pub async fn init_with_config(config: TelemetryConfig) -> Result<TelemetryProvider> {
285    let mut provider = TelemetryProvider::new(config);
286    provider.init().await?;
287    Ok(provider)
288}
289
#[cfg(test)]
mod tests {
    use super::*;

    /// Chained builder calls must land in the matching fields.
    #[test]
    fn test_telemetry_config_builder() {
        let built = TelemetryConfig::new("test-service")
            .with_service_version("1.0.0")
            .with_service_namespace("testing")
            .with_sampling_rate(0.5)
            .with_resource_attribute("env", "dev");

        assert_eq!(built.service_name, "test-service");
        assert_eq!(built.service_version, "1.0.0");
        assert_eq!(built.service_namespace, Some(String::from("testing")));
        assert_eq!(built.sampling_rate, 0.5);
        assert_eq!(built.resource_attributes.len(), 1);
    }

    /// Out-of-range sampling rates saturate at the nearest bound.
    #[test]
    fn test_sampling_rate_clamping() {
        let above = TelemetryConfig::default().with_sampling_rate(1.5);
        assert_eq!(above.sampling_rate, 1.0);

        let below = TelemetryConfig::default().with_sampling_rate(-0.5);
        assert_eq!(below.sampling_rate, 0.0);
    }
}