Skip to main content

prosa_utils/config/
observability.rs

//! Definition of the OpenTelemetry observability configuration (metrics, logs and traces)
2
3use opentelemetry::{KeyValue, trace::TracerProvider as _};
4use opentelemetry_otlp::{
5    ExportConfig, ExporterBuildError, Protocol, WithExportConfig, WithHttpConfig,
6};
7use opentelemetry_sdk::{
8    logs::SdkLoggerProvider,
9    metrics::SdkMeterProvider,
10    trace::{SdkTracerProvider, Tracer},
11};
12use serde::{Deserialize, Serialize};
13use std::{collections::HashMap, time::Duration};
14use tracing_subscriber::{filter, prelude::*};
15use tracing_subscriber::{layer::SubscriberExt, util::TryInitError};
16use url::Url;
17
18use crate::config::url_authentication;
19
20use super::tracing::{TelemetryFilter, TelemetryLevel};
21
/// Configuration struct of an **O**pen **T**e**l**emetry **P**rotocol Exporter
///
/// The transport protocol is derived from the scheme of `endpoint` (see `get_protocol`).
/// When the exporter is converted into an `ExportConfig`, the username/password of the URL are
/// removed; the `Authorization` header is instead built by `url_authentication` (see `get_header`).
#[derive(Debug, Deserialize, Serialize, Clone)]
pub(crate) struct OTLPExporterCfg {
    /// Telemetry level of this exporter; callers fall back to `TelemetryLevel::default()` when unset
    pub(crate) level: Option<TelemetryLevel>,
    /// Collector URL; its scheme selects the protocol and its userinfo (if any) the credentials
    endpoint: Url,
    /// Export timeout in seconds; `None` leaves `ExportConfig::timeout` unset. Never serialized.
    #[serde(skip_serializing)]
    timeout_sec: Option<u64>,
}
30
31impl OTLPExporterCfg {
32    pub(crate) fn get_protocol(&self) -> Protocol {
33        match self.endpoint.scheme().to_lowercase().as_str() {
34            "grpc" => Protocol::Grpc,
35            "http/json" => Protocol::HttpJson,
36            _ => Protocol::HttpBinary,
37        }
38    }
39
40    pub(crate) fn get_header(&self) -> HashMap<String, String> {
41        let mut headers = HashMap::with_capacity(1);
42        if let Some(authorization) = url_authentication(&self.endpoint) {
43            headers.insert("Authorization".to_string(), authorization);
44        }
45        headers
46    }
47
48    pub(crate) fn get_resource(
49        &self,
50        attr: Vec<KeyValue>,
51    ) -> opentelemetry_sdk::resource::Resource {
52        opentelemetry_sdk::resource::Resource::builder()
53            .with_attributes(attr)
54            .with_attribute(opentelemetry::KeyValue::new(
55                "process.creation.time",
56                chrono::Utc::now().to_rfc3339(),
57            ))
58            .with_attribute(opentelemetry::KeyValue::new(
59                "process.pid",
60                opentelemetry::Value::I64(std::process::id() as i64),
61            ))
62            .build()
63    }
64}
65
66impl From<OTLPExporterCfg> for ExportConfig {
67    fn from(value: OTLPExporterCfg) -> Self {
68        let protocol = value.get_protocol();
69        let mut endpoint = value.endpoint;
70        if !endpoint.username().is_empty() {
71            let _ = endpoint.set_username("");
72        }
73        if endpoint.password().is_some() {
74            let _ = endpoint.set_password(None);
75        }
76
77        ExportConfig {
78            endpoint: Some(endpoint.to_string()),
79            timeout: value.timeout_sec.map(Duration::from_secs),
80            protocol,
81        }
82    }
83}
84
85impl Default for OTLPExporterCfg {
86    fn default() -> Self {
87        Self {
88            level: None,
89            endpoint: Url::parse("grpc://localhost:4317").unwrap(),
90            timeout_sec: None,
91        }
92    }
93}
94
#[cfg(feature = "config-observability-prometheus")]
/// Configuration struct of a prometheus metric exporter
#[derive(Default, Debug, Deserialize, Serialize, Clone)]
pub struct PrometheusExporterCfg {
    /// Address given to `tokio::net::TcpListener::bind` for the scrape HTTP server;
    /// `None` disables the server (metrics are still registered in the Prometheus registry)
    endpoint: Option<String>,
}
101
102#[cfg(feature = "config-observability-prometheus")]
103impl PrometheusExporterCfg {
104    /// Start an HTTP server to expose the metrics if needed
105    pub(crate) fn init_prometheus_server(
106        &self,
107        registry: &prometheus::Registry,
108    ) -> Result<(), ExporterBuildError> {
109        if let Some(endpoint) = self.endpoint.clone() {
110            let registry = registry.clone();
111            tokio::task::spawn(async move {
112                match tokio::net::TcpListener::bind(endpoint).await {
113                    Ok(listener) => {
114                        loop {
115                            if let Ok((stream, _)) = listener.accept().await {
116                                let io = hyper_util::rt::TokioIo::new(stream);
117                                let registry = registry.clone();
118                                tokio::task::spawn(async move {
119                                    if let Err(err) = hyper::server::conn::http1::Builder::new()
120                                    .serve_connection(
121                                        io,
122                                        #[allow(unused)]
123                                        hyper::service::service_fn(|req| {
124                                            let registry = registry.clone();
125                                            async move {
126                                                let metric_families = registry.gather();
127                                                let encoder = prometheus::TextEncoder::new();
128                                                if let Ok(metric_data) =
129                                                    encoder.encode_to_string(&metric_families)
130                                                {
131                                                    let response = hyper::Response::builder()
132                                                        .header(hyper::header::SERVER, concat!("ProSA/", env!("CARGO_PKG_VERSION")))
133                                                        .header(
134                                                            hyper::header::CONTENT_TYPE,
135                                                            "text/plain; version=1.0.0",
136                                                        );
137
138                                                    #[cfg(feature = "config-observability-gzip")]
139                                                    if req.headers().get(hyper::header::ACCEPT_ENCODING).is_some_and(|a| a.to_str().is_ok_and(|v| v.contains("gzip"))) {
140                                                        let mut gz_encoder = flate2::write::GzEncoder::new(Vec::with_capacity(2048), flate2::Compression::fast());
141                                                        if std::io::Write::write_all(&mut gz_encoder, metric_data.as_bytes()).is_ok()
142                                                            && let Ok(compressed_data) = gz_encoder.finish()
143                                                        {
144                                                            return response
145                                                                .header(hyper::header::CONTENT_ENCODING, "gzip")
146                                                                .body(http_body_util::Full::new(
147                                                                    bytes::Bytes::from(compressed_data)),
148                                                                )
149                                                                .map_err(|e| e.to_string());
150                                                        }
151                                                    }
152
153                                                    response
154                                                        .body(http_body_util::Full::new(
155                                                            bytes::Bytes::from(metric_data),
156                                                        ))
157                                                        .map_err(|e| e.to_string())
158                                                } else {
159                                                    Err("Can't serialize metrics".to_string())
160                                                }
161                                            }
162                                        }),
163                                    )
164                                    .await
165                                {
166                                    log::debug!(target: "prosa::observability::prometheus_server", "Error serving prometheus connection: {err:?}");
167                                }
168                                });
169                            }
170                        }
171                    }
172                    Err(e) => {
173                        log::error!(target: "prosa::observability::prometheus_server", "Failed to bind Prometheus metrics server: {e}");
174                    }
175                }
176            });
177        }
178
179        Ok(())
180    }
181
182    pub(crate) fn get_resource(
183        &self,
184        attr: Vec<KeyValue>,
185    ) -> opentelemetry_sdk::resource::Resource {
186        opentelemetry_sdk::resource::Resource::builder()
187            .with_attributes(attr)
188            .build()
189    }
190}
191
/// Configuration struct of a stdout exporter
#[derive(Default, Debug, Deserialize, Serialize, Copy, Clone)]
pub(crate) struct StdoutExporterCfg {
    /// Telemetry level of the stdout output; callers fall back to `TelemetryLevel::default()` when unset
    #[serde(default)]
    pub(crate) level: Option<TelemetryLevel>,
}
198
/// Telemetry settings for metrics
///
/// Every exporter that is present (`Some`) is added to the same meter provider.
#[derive(Default, Debug, Deserialize, Serialize, Clone)]
pub struct TelemetryMetrics {
    /// OTLP exporter (periodic push to a collector)
    otlp: Option<OTLPExporterCfg>,
    /// Prometheus exporter (pull, optionally served over HTTP)
    #[cfg(feature = "config-observability-prometheus")]
    prometheus: Option<PrometheusExporterCfg>,
    /// Stdout exporter; only its presence matters for metrics, its level is not read
    stdout: Option<StdoutExporterCfg>,
}
207
208impl TelemetryMetrics {
209    /// Build a meter provider based on the self configuration
210    fn build_provider(
211        &self,
212        #[cfg(feature = "config-observability-prometheus")] resource_attr: Vec<KeyValue>,
213        #[cfg(feature = "config-observability-prometheus")] registry: &prometheus::Registry,
214    ) -> Result<SdkMeterProvider, ExporterBuildError> {
215        let mut meter_provider = SdkMeterProvider::builder();
216        if let Some(s) = &self.otlp {
217            let exporter = if s.get_protocol() == Protocol::Grpc {
218                opentelemetry_otlp::MetricExporter::builder()
219                    .with_tonic()
220                    .with_export_config(s.clone().into())
221                    .build()
222            } else {
223                opentelemetry_otlp::MetricExporter::builder()
224                    .with_http()
225                    .with_headers(s.get_header())
226                    .with_export_config(s.clone().into())
227                    .build()
228            }?;
229            meter_provider = meter_provider.with_periodic_exporter(exporter);
230        }
231
232        #[cfg(feature = "config-observability-prometheus")]
233        if let Some(prom) = &self.prometheus {
234            // configure OpenTelemetry to use this registry
235            let exporter = opentelemetry_prometheus::exporter()
236                .with_registry(registry.clone())
237                .with_resource_selector(opentelemetry_prometheus::ResourceSelector::All)
238                .without_target_info()
239                .build()
240                .map_err(|e| ExporterBuildError::InternalFailure(e.to_string()))?;
241            meter_provider = meter_provider
242                .with_resource(prom.get_resource(resource_attr))
243                .with_reader(exporter);
244
245            // Initialize the Prometheus server if needed
246            prom.init_prometheus_server(registry)?;
247        }
248
249        if self.stdout.is_some() {
250            let exporter = opentelemetry_stdout::MetricExporter::default();
251            meter_provider = meter_provider.with_periodic_exporter(exporter);
252        }
253
254        Ok(meter_provider.build())
255    }
256}
257
/// Telemetry settings for a logs or traces signal
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TelemetryData {
    /// OTLP exporter (push to a collector)
    otlp: Option<OTLPExporterCfg>,
    /// Stdout exporter
    stdout: Option<StdoutExporterCfg>,
}
264
265impl TelemetryData {
266    /// Get the greater log level of the configuration (log level that include both OpenTelemetry and stdout)
267    fn get_max_level(&self) -> TelemetryLevel {
268        if let Some(otlp_level) = self.otlp.as_ref().and_then(|o| o.level) {
269            if let Some(stdout_level) = self.stdout.as_ref().and_then(|l| l.level) {
270                if otlp_level > stdout_level {
271                    otlp_level
272                } else {
273                    stdout_level
274                }
275            } else {
276                otlp_level
277            }
278        } else if let Some(stdout_level) = self.stdout.as_ref().and_then(|l| l.level) {
279            stdout_level
280        } else {
281            TelemetryLevel::TRACE
282        }
283    }
284
285    /// Build a logger provider based on the self configuration
286    fn build_logger_provider(
287        &self,
288        resource_attr: Vec<KeyValue>,
289    ) -> Result<(SdkLoggerProvider, TelemetryLevel), ExporterBuildError> {
290        let logs_provider = SdkLoggerProvider::builder();
291        if let Some(s) = &self.otlp {
292            let exporter = if s.get_protocol() == Protocol::Grpc {
293                opentelemetry_otlp::LogExporter::builder()
294                    .with_tonic()
295                    .with_export_config(s.clone().into())
296                    .build()
297            } else {
298                opentelemetry_otlp::LogExporter::builder()
299                    .with_http()
300                    .with_headers(s.get_header())
301                    .with_export_config(s.clone().into())
302                    .build()
303            }?;
304            Ok((
305                logs_provider
306                    .with_resource(s.get_resource(resource_attr))
307                    .with_batch_exporter(exporter)
308                    .build(),
309                s.level.unwrap_or_default(),
310            ))
311        } else if let Some(stdout) = &self.stdout {
312            Ok((
313                logs_provider
314                    .with_simple_exporter(opentelemetry_stdout::LogExporter::default())
315                    .build(),
316                stdout.level.unwrap_or_default(),
317            ))
318        } else {
319            Ok((logs_provider.build(), TelemetryLevel::OFF))
320        }
321    }
322
323    /// Build a tracer provider based on the self configuration
324    fn build_tracer_provider(
325        &self,
326        resource_attr: Vec<KeyValue>,
327    ) -> Result<SdkTracerProvider, ExporterBuildError> {
328        let mut trace_provider = SdkTracerProvider::builder();
329        if let Some(s) = &self.otlp {
330            let exporter = if s.get_protocol() == Protocol::Grpc {
331                opentelemetry_otlp::SpanExporter::builder()
332                    .with_tonic()
333                    .with_export_config(s.clone().into())
334                    .build()
335            } else {
336                opentelemetry_otlp::SpanExporter::builder()
337                    .with_http()
338                    .with_headers(s.get_header())
339                    .with_export_config(s.clone().into())
340                    .build()
341            }?;
342
343            trace_provider = trace_provider
344                .with_resource(s.get_resource(resource_attr))
345                .with_batch_exporter(exporter);
346        }
347
348        Ok(trace_provider.build())
349    }
350
351    /// Build a tracer provider based on the self configuration
352    fn build_tracer(
353        &self,
354        name: &str,
355        resource_attr: Vec<KeyValue>,
356    ) -> Result<Tracer, ExporterBuildError> {
357        self.build_tracer_provider(resource_attr)
358            .map(|p| p.tracer(name.to_string()))
359    }
360}
361
362impl Default for TelemetryData {
363    fn default() -> Self {
364        TelemetryData {
365            otlp: None,
366            stdout: Some(StdoutExporterCfg::default()),
367        }
368    }
369}
370
/// OpenTelemetry settings of a ProSA
///
/// See [`TelemetryFilter`] to configure a specific filter for ProSA processors.
///
/// ```
/// use prosa_utils::config::observability::Observability;
/// use prosa_utils::config::tracing::TelemetryFilter;
///
/// #[tokio::main]
/// async fn main() {
///     let observability_settings = Observability::default();
///
///     // trace
///     let filter = TelemetryFilter::default();
///     if let Err(e) = observability_settings.tracing_init(&filter) {
///         eprintln!("Failed to initialize tracing: {e}");
///     }
/// }
/// ```
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Observability {
    /// Additional resource attributes for all telemetry data (e.g. `service.name`, `host.name`)
    #[serde(default)]
    attributes: HashMap<String, String>,
    /// Global level for observability (defaults to `TelemetryLevel::default()` when absent)
    #[serde(default)]
    level: TelemetryLevel,
    /// Metrics settings of a ProSA
    metrics: Option<TelemetryMetrics>,
    /// Logs settings of a ProSA
    logs: Option<TelemetryData>,
    /// Traces settings of a ProSA
    traces: Option<TelemetryData>,
}
404
405impl Observability {
406    pub(crate) fn common_scope_attributes(service_name: String, capacity: usize) -> Vec<KeyValue> {
407        let mut scope_attributes = Vec::with_capacity(capacity + 3);
408        scope_attributes.push(KeyValue::new("service.name", service_name));
409
410        match std::env::consts::ARCH {
411            "x86_64" => scope_attributes.push(KeyValue::new("host.arch", "amd64")),
412            "aarch64" => scope_attributes.push(KeyValue::new("host.arch", "arm64")),
413            "arm" => scope_attributes.push(KeyValue::new("host.arch", "arm32")),
414            _ => {}
415        }
416
417        match std::env::consts::OS {
418            "linux" => scope_attributes.push(KeyValue::new("os.type", "linux")),
419            "macos" => scope_attributes.push(KeyValue::new("os.type", "darwin")),
420            "freebsd" => scope_attributes.push(KeyValue::new("os.type", "freebsd")),
421            "openbsd" => scope_attributes.push(KeyValue::new("os.type", "openbsd")),
422            "netbsd" => scope_attributes.push(KeyValue::new("os.type", "netbsd")),
423            "windows" => scope_attributes.push(KeyValue::new("os.type", "windows")),
424            _ => {}
425        }
426
427        scope_attributes
428    }
429
430    /// Create an observability object with inline parameter instead of getting it from an external configuration
431    pub fn new(level: TelemetryLevel) -> Observability {
432        Observability {
433            attributes: HashMap::new(),
434            level,
435            metrics: Some(TelemetryMetrics::default()),
436            logs: Some(TelemetryData::default()),
437            traces: Some(TelemetryData::default()),
438        }
439    }
440
441    /// Getter of the observability `service.name` attributes
442    pub fn get_service_name(&self) -> &str {
443        self.attributes
444            .get("service.name")
445            .map(|s| s.as_ref())
446            .unwrap_or("prosa")
447    }
448
449    /// Setter of the ProSA name for all observability `service.name` attributes
450    pub fn set_prosa_name(&mut self, name: &str) {
451        self.attributes
452            .entry("service.name".to_string())
453            .or_insert_with(|| name.to_string());
454    }
455
456    /// Getter of the common scope attributes
457    pub fn get_scope_attributes(&self) -> Vec<KeyValue> {
458        // start with common attributes
459        let mut scope_attr = Self::common_scope_attributes(
460            self.get_service_name().to_string(),
461            self.attributes.len() + 2,
462        );
463
464        if !self.attributes.contains_key("host.name")
465            && let Some(hostname) = super::hostname()
466        {
467            scope_attr.push(KeyValue::new("host.name", hostname));
468        }
469
470        if !self.attributes.contains_key("service.version") {
471            scope_attr.push(KeyValue::new("service.version", env!("CARGO_PKG_VERSION")));
472        }
473
474        // append custom attributes from configuration
475        scope_attr.append(
476            self.attributes
477                .iter()
478                .map(|(k, v)| {
479                    KeyValue::new(k.clone(), opentelemetry::Value::String(v.clone().into()))
480                })
481                .collect::<Vec<KeyValue>>()
482                .as_mut(),
483        );
484
485        scope_attr
486    }
487
488    /// Getter of the log level (max value)
489    pub fn get_logger_level(&self) -> TelemetryLevel {
490        if let Some(logs) = &self.logs {
491            let logs_level = logs.get_max_level();
492            if logs_level > self.level {
493                logs_level
494            } else {
495                self.level
496            }
497        } else {
498            self.level
499        }
500    }
501
502    /// Meter provider builder
503    #[cfg(feature = "config-observability-prometheus")]
504    pub fn build_meter_provider(&self, registry: &prometheus::Registry) -> SdkMeterProvider {
505        if let Some(settings) = &self.metrics {
506            settings
507                .build_provider(self.get_scope_attributes(), registry)
508                .unwrap_or_default()
509        } else {
510            SdkMeterProvider::default()
511        }
512    }
513
514    /// Meter provider builder
515    #[cfg(not(feature = "config-observability-prometheus"))]
516    pub fn build_meter_provider(&self) -> SdkMeterProvider {
517        if let Some(settings) = &self.metrics {
518            settings.build_provider().unwrap_or_default()
519        } else {
520            SdkMeterProvider::default()
521        }
522    }
523
524    /// Logger provider builder
525    pub fn build_logger_provider(&self) -> (SdkLoggerProvider, TelemetryLevel) {
526        if let Some(settings) = &self.logs {
527            match settings.build_logger_provider(self.get_scope_attributes()) {
528                Ok(m) => m,
529                Err(_) => (
530                    SdkLoggerProvider::builder().build(),
531                    TelemetryLevel::default(),
532                ),
533            }
534        } else {
535            (
536                SdkLoggerProvider::builder().build(),
537                TelemetryLevel::default(),
538            )
539        }
540    }
541
542    /// Tracer provider builder
543    ///
544    /// ```
545    /// use opentelemetry::{global, trace::TracerProvider};
546    /// use prosa_utils::config::observability::Observability;
547    ///
548    /// let otel_settings = Observability::default();
549    /// let tracer = otel_settings
550    ///     .build_tracer_provider()
551    ///     .tracer("prosa_proc_example");
552    /// ```
553    pub fn build_tracer_provider(&self) -> SdkTracerProvider {
554        if let Some(settings) = &self.traces {
555            settings
556                .build_tracer_provider(self.get_scope_attributes())
557                .unwrap_or_default()
558        } else {
559            SdkTracerProvider::default()
560        }
561    }
562
563    /// Tracer builder
564    ///
565    /// ```
566    /// use opentelemetry::{global, trace::Tracer};
567    /// use prosa_utils::config::observability::Observability;
568    ///
569    /// let otel_settings = Observability::default();
570    /// let tracer = otel_settings
571    ///     .build_tracer();
572    /// ```
573    pub fn build_tracer(&self) -> Tracer {
574        if let Some(settings) = &self.traces {
575            match settings.build_tracer(self.get_service_name(), self.get_scope_attributes()) {
576                Ok(m) => m,
577                Err(_) => SdkTracerProvider::default().tracer(self.get_service_name().to_string()),
578            }
579        } else {
580            SdkTracerProvider::default().tracer(self.get_service_name().to_string())
581        }
582    }
583
584    /// Method to init `tracing`
585    pub fn tracing_init(&self, filter: &TelemetryFilter) -> Result<(), TryInitError> {
586        let global_level: filter::LevelFilter = self.level.into();
587        let subscriber = tracing_subscriber::registry().with(global_level);
588
589        if let Some(traces) = &self.traces {
590            if let Some(otlp) = &traces.otlp {
591                let tracer = self.build_tracer();
592                let subscriber_filter = filter.clone_with_level(otlp.level.unwrap_or_default());
593                let subscriber = subscriber.with(
594                    tracing_opentelemetry::layer()
595                        .with_tracer(tracer)
596                        .with_filter(subscriber_filter),
597                );
598
599                if let Some(stdout) = traces.stdout {
600                    let subscriber_filter =
601                        filter.clone_with_level(stdout.level.unwrap_or_default());
602                    subscriber
603                        .with(tracing_subscriber::fmt::Layer::new().with_filter(subscriber_filter))
604                        .try_init()
605                } else {
606                    subscriber.try_init()
607                }
608            } else if let Some(stdout) = traces.stdout {
609                let subscriber_filter = filter.clone_with_level(stdout.level.unwrap_or_default());
610                subscriber
611                    .with(tracing_subscriber::fmt::Layer::new().with_filter(subscriber_filter))
612                    .try_init()
613            } else {
614                subscriber.try_init()
615            }
616        } else if let Some(logs) = &self.logs
617            && let Ok((logger_provider, level)) =
618                logs.build_logger_provider(self.get_scope_attributes())
619            && level > TelemetryLevel::OFF
620        {
621            let logger_filter = filter.clone_with_level(level);
622            subscriber
623                .with(
624                    opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
625                        &logger_provider,
626                    )
627                    .with_filter(logger_filter),
628                )
629                .try_init()
630        } else {
631            subscriber.try_init()
632        }
633    }
634}
635
636impl Default for Observability {
637    fn default() -> Self {
638        Self {
639            attributes: HashMap::new(),
640            level: TelemetryLevel::default(),
641            metrics: Some(TelemetryMetrics::default()),
642            logs: Some(TelemetryData {
643                otlp: None,
644                stdout: Some(StdoutExporterCfg {
645                    level: Some(TelemetryLevel::DEBUG),
646                }),
647            }),
648            traces: Some(TelemetryData {
649                otlp: None,
650                stdout: Some(StdoutExporterCfg {
651                    level: Some(TelemetryLevel::DEBUG),
652                }),
653            }),
654        }
655    }
656}