oxigdal_observability/telemetry/
mod.rs1use crate::error::{ObservabilityError, Result};
4use opentelemetry::KeyValue;
5use opentelemetry_sdk::Resource;
6use opentelemetry_sdk::metrics::SdkMeterProvider;
7use parking_lot::RwLock;
8use std::sync::Arc;
9
10pub mod logs;
11pub mod metrics;
12pub mod traces;
13
/// Settings used to build a [`TelemetryProvider`]: service identity, which signals to
/// enable, exporter endpoints, the sampling rate and extra resource attributes.
///
/// Usually built with [`TelemetryConfig::new`] (or `Default`) plus the `with_*` methods.
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// Service name, reported as the `service.name` resource attribute.
    pub service_name: String,

    /// Service version, reported as the `service.version` resource attribute.
    pub service_version: String,

    /// Optional namespace, reported as `service.namespace` when set.
    pub service_namespace: Option<String>,

    /// Optional instance identifier, reported as `service.instance.id` when set.
    pub service_instance_id: Option<String>,

    /// When `true`, `TelemetryProvider::init` sets up tracing.
    pub enable_traces: bool,

    /// When `true`, `TelemetryProvider::init` sets up metrics.
    pub enable_metrics: bool,

    /// When `true`, `TelemetryProvider::init` sets up logging.
    pub enable_logs: bool,

    /// Jaeger endpoint. NOTE(review): presumably read by the `traces` module; confirm there.
    pub jaeger_endpoint: Option<String>,

    /// Prometheus endpoint. NOTE(review): presumably read by the `metrics` module; confirm there.
    pub prometheus_endpoint: Option<String>,

    /// OTLP endpoint. NOTE(review): consumed by the signal modules; confirm which ones.
    pub otlp_endpoint: Option<String>,

    /// Sampling rate. `with_sampling_rate` clamps it into `[0.0, 1.0]`; assigning the
    /// public field directly bypasses that clamping.
    pub sampling_rate: f64,

    /// Extra `(key, value)` resource attributes, added after the `service.*` attributes.
    pub resource_attributes: Vec<(String, String)>,
}
53
54impl Default for TelemetryConfig {
55 fn default() -> Self {
56 Self {
57 service_name: "oxigdal".to_string(),
58 service_version: env!("CARGO_PKG_VERSION").to_string(),
59 service_namespace: None,
60 service_instance_id: None,
61 enable_traces: true,
62 enable_metrics: true,
63 enable_logs: true,
64 jaeger_endpoint: None,
65 prometheus_endpoint: None,
66 otlp_endpoint: None,
67 sampling_rate: 0.1,
68 resource_attributes: Vec::new(),
69 }
70 }
71}
72
73impl TelemetryConfig {
74 pub fn new(service_name: impl Into<String>) -> Self {
76 Self {
77 service_name: service_name.into(),
78 ..Default::default()
79 }
80 }
81
82 pub fn with_service_version(mut self, version: impl Into<String>) -> Self {
84 self.service_version = version.into();
85 self
86 }
87
88 pub fn with_service_namespace(mut self, namespace: impl Into<String>) -> Self {
90 self.service_namespace = Some(namespace.into());
91 self
92 }
93
94 pub fn with_service_instance_id(mut self, instance_id: impl Into<String>) -> Self {
96 self.service_instance_id = Some(instance_id.into());
97 self
98 }
99
100 pub fn with_traces(mut self, enable: bool) -> Self {
102 self.enable_traces = enable;
103 self
104 }
105
106 pub fn with_metrics(mut self, enable: bool) -> Self {
108 self.enable_metrics = enable;
109 self
110 }
111
112 pub fn with_logs(mut self, enable: bool) -> Self {
114 self.enable_logs = enable;
115 self
116 }
117
118 pub fn with_jaeger_endpoint(mut self, endpoint: impl Into<String>) -> Self {
120 self.jaeger_endpoint = Some(endpoint.into());
121 self
122 }
123
124 pub fn with_prometheus_endpoint(mut self, endpoint: impl Into<String>) -> Self {
126 self.prometheus_endpoint = Some(endpoint.into());
127 self
128 }
129
130 pub fn with_otlp_endpoint(mut self, endpoint: impl Into<String>) -> Self {
132 self.otlp_endpoint = Some(endpoint.into());
133 self
134 }
135
136 pub fn with_sampling_rate(mut self, rate: f64) -> Self {
138 self.sampling_rate = rate.clamp(0.0, 1.0);
139 self
140 }
141
142 pub fn with_resource_attribute(
144 mut self,
145 key: impl Into<String>,
146 value: impl Into<String>,
147 ) -> Self {
148 self.resource_attributes.push((key.into(), value.into()));
149 self
150 }
151}
152
/// Owns a [`TelemetryConfig`] and the telemetry state created from it.
///
/// Constructed uninitialized by `TelemetryProvider::new`. `init` sets up the enabled signals.
pub struct TelemetryProvider {
    /// Configuration that `init` builds the resource and signals from.
    config: TelemetryConfig,
    /// Meter provider stored by `init` when metrics are enabled; `None` otherwise.
    meter_provider: Option<Arc<RwLock<SdkMeterProvider>>>,
    /// Set to `true` once `init` completes successfully.
    /// Checked by `init` to reject repeated initialization, and by `shutdown`.
    is_initialized: Arc<RwLock<bool>>,
}
159
160impl TelemetryProvider {
161 pub fn new(config: TelemetryConfig) -> Self {
163 Self {
164 config,
165 meter_provider: None,
166 is_initialized: Arc::new(RwLock::new(false)),
167 }
168 }
169
170 pub async fn init(&mut self) -> Result<()> {
172 {
173 let initialized = self.is_initialized.write();
174 if *initialized {
175 return Err(ObservabilityError::TelemetryInit(
176 "Telemetry already initialized".to_string(),
177 ));
178 }
179 }
180
181 let resource = self.create_resource()?;
183
184 if self.config.enable_traces {
186 self.init_traces(&resource).await?;
187 }
188
189 if self.config.enable_metrics {
191 self.init_metrics(&resource)?;
192 }
193
194 if self.config.enable_logs {
196 self.init_logs()?;
197 }
198
199 *self.is_initialized.write() = true;
200 Ok(())
201 }
202 fn create_resource(&self) -> Result<Resource> {
204 let mut attributes = vec![
205 KeyValue::new("service.name", self.config.service_name.clone()),
206 KeyValue::new("service.version", self.config.service_version.clone()),
207 ];
208
209 if let Some(ref namespace) = self.config.service_namespace {
210 attributes.push(KeyValue::new("service.namespace", namespace.clone()));
211 }
212
213 if let Some(ref instance_id) = self.config.service_instance_id {
214 attributes.push(KeyValue::new("service.instance.id", instance_id.clone()));
215 }
216
217 for (key, value) in &self.config.resource_attributes {
219 attributes.push(KeyValue::new(key.clone(), value.clone()));
220 }
221
222 Ok(Resource::builder_empty()
223 .with_attributes(attributes)
224 .build())
225 }
226
227 async fn init_traces(&self, resource: &Resource) -> Result<()> {
229 traces::init_tracing(&self.config, resource.clone()).await
230 }
231
232 fn init_metrics(&mut self, resource: &Resource) -> Result<()> {
234 let meter_provider = metrics::init_metrics(&self.config, resource.clone())?;
235 self.meter_provider = Some(Arc::new(RwLock::new(meter_provider)));
236 Ok(())
237 }
238
239 fn init_logs(&self) -> Result<()> {
241 logs::init_logging(&self.config)
242 }
243
244 pub fn is_initialized(&self) -> bool {
246 *self.is_initialized.read()
247 }
248
249 pub async fn shutdown(&self) -> Result<()> {
251 if !self.is_initialized() {
252 return Ok(());
253 }
254
255 if let Some(ref provider) = self.meter_provider {
260 let provider = provider.read();
261 provider.shutdown().map_err(|e| {
262 ObservabilityError::Other(format!("Failed to shutdown meter provider: {}", e))
263 })?;
264 }
265
266 Ok(())
267 }
268
269 pub fn meter_provider(&self) -> Option<Arc<RwLock<SdkMeterProvider>>> {
271 self.meter_provider.clone()
272 }
273}
274
275pub async fn init_default() -> Result<TelemetryProvider> {
277 let config = TelemetryConfig::default();
278 let mut provider = TelemetryProvider::new(config);
279 provider.init().await?;
280 Ok(provider)
281}
282
283pub async fn init_with_config(config: TelemetryConfig) -> Result<TelemetryProvider> {
285 let mut provider = TelemetryProvider::new(config);
286 provider.init().await?;
287 Ok(provider)
288}
289
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder methods store exactly what they are given.
    #[test]
    fn test_telemetry_config_builder() {
        let built = TelemetryConfig::new("test-service")
            .with_service_version("1.0.0")
            .with_service_namespace("testing")
            .with_sampling_rate(0.5)
            .with_resource_attribute("env", "dev");

        assert_eq!(built.service_name, "test-service");
        assert_eq!(built.service_version, "1.0.0");
        assert_eq!(built.service_namespace.as_deref(), Some("testing"));
        assert_eq!(built.sampling_rate, 0.5);
        assert_eq!(built.resource_attributes.len(), 1);
    }

    /// Out-of-range sampling rates are pinned to the nearest bound.
    #[test]
    fn test_sampling_rate_clamping() {
        let cases = [(1.5, 1.0), (-0.5, 0.0)];
        for (requested, expected) in cases {
            let cfg = TelemetryConfig::default().with_sampling_rate(requested);
            assert_eq!(
                cfg.sampling_rate, expected,
                "rate {requested} should clamp to {expected}"
            );
        }
    }
}