Skip to main content

prosa_utils/config/
observability.rs

//! Definition of the OpenTelemetry configuration
2
3use opentelemetry::{KeyValue, trace::TracerProvider as _};
4use opentelemetry_otlp::{
5    ExportConfig, ExporterBuildError, Protocol, WithExportConfig, WithHttpConfig,
6};
7use opentelemetry_sdk::{
8    logs::SdkLoggerProvider,
9    metrics::SdkMeterProvider,
10    trace::{SdkTracerProvider, Tracer},
11};
12use serde::{Deserialize, Serialize};
13use std::{collections::HashMap, time::Duration};
14use tracing_subscriber::{filter, prelude::*};
15use tracing_subscriber::{layer::SubscriberExt, util::TryInitError};
16use url::Url;
17
18use crate::config::url_authentication;
19
20use super::tracing::{TelemetryFilter, TelemetryLevel};
21
22/// Configuration struct of an **O**pen **T**e**l**emetry **P**rotocol Exporter
23#[derive(Debug, Deserialize, Serialize, Clone)]
24pub(crate) struct OTLPExporterCfg {
25    pub(crate) level: Option<TelemetryLevel>,
26    endpoint: Url,
27    #[serde(skip_serializing)]
28    timeout_sec: Option<u64>,
29}
30
31impl OTLPExporterCfg {
32    pub(crate) fn get_protocol(&self) -> Protocol {
33        match self.endpoint.scheme().to_lowercase().as_str() {
34            "grpc" => Protocol::Grpc,
35            "http/json" => Protocol::HttpJson,
36            _ => Protocol::HttpBinary,
37        }
38    }
39
40    pub(crate) fn get_header(&self) -> HashMap<String, String> {
41        let mut headers = HashMap::with_capacity(1);
42        if let Some(authorization) = url_authentication(&self.endpoint) {
43            headers.insert("Authorization".to_string(), authorization);
44        }
45        headers
46    }
47
48    pub(crate) fn get_resource(
49        &self,
50        attr: Vec<KeyValue>,
51    ) -> opentelemetry_sdk::resource::Resource {
52        opentelemetry_sdk::resource::Resource::builder()
53            .with_attributes(attr)
54            .with_attribute(opentelemetry::KeyValue::new(
55                "process.creation.time",
56                chrono::Utc::now().to_rfc3339(),
57            ))
58            .with_attribute(opentelemetry::KeyValue::new(
59                "process.pid",
60                opentelemetry::Value::I64(std::process::id() as i64),
61            ))
62            .build()
63    }
64}
65
66impl From<OTLPExporterCfg> for ExportConfig {
67    fn from(value: OTLPExporterCfg) -> Self {
68        let protocol = value.get_protocol();
69        let mut endpoint = value.endpoint;
70        if !endpoint.username().is_empty() {
71            let _ = endpoint.set_username("");
72        }
73        if endpoint.password().is_some() {
74            let _ = endpoint.set_password(None);
75        }
76
77        ExportConfig {
78            endpoint: Some(endpoint.to_string()),
79            timeout: value.timeout_sec.map(Duration::from_secs),
80            protocol,
81        }
82    }
83}
84
85impl Default for OTLPExporterCfg {
86    fn default() -> Self {
87        Self {
88            level: None,
89            endpoint: Url::parse("grpc://localhost:4317").unwrap(),
90            timeout_sec: None,
91        }
92    }
93}
94
#[cfg(feature = "config-observability-prometheus")]
/// Configuration of a Prometheus metric exporter
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct PrometheusExporterCfg {
    /// Address the metrics HTTP server binds to; no server is started when it is absent
    endpoint: Option<String>,
}
101
#[cfg(feature = "config-observability-prometheus")]
impl PrometheusExporterCfg {
    /// Start an HTTP server to expose the metrics if needed
    ///
    /// Nothing happens when no `endpoint` is configured. Otherwise a background Tokio task binds
    /// the endpoint and serves every incoming HTTP/1 connection with the text encoding of the
    /// metrics gathered from `registry`, whatever the request is.
    ///
    /// The function itself always returns `Ok(())`: bind, accept and connection errors occur in
    /// the background task and are only logged.
    ///
    /// # Panics
    ///
    /// The task is started with `tokio::task::spawn`, which panics when called outside of a
    /// Tokio runtime context.
    pub(crate) fn init_prometheus_server(
        &self,
        registry: &prometheus::Registry,
    ) -> Result<(), ExporterBuildError> {
        if let Some(endpoint) = self.endpoint.clone() {
            let registry = registry.clone();
            tokio::task::spawn(async move {
                match tokio::net::TcpListener::bind(endpoint).await {
                    Ok(listener) => loop {
                        match listener.accept().await {
                            Ok((stream, _)) => {
                                let io = hyper_util::rt::TokioIo::new(stream);
                                let registry = registry.clone();
                                // One task per connection so a slow client never blocks `accept`
                                tokio::task::spawn(async move {
                                    if let Err(err) = hyper::server::conn::http1::Builder::new()
                                        .serve_connection(
                                            io,
                                            hyper::service::service_fn(|_req| {
                                                let registry = registry.clone();
                                                async move {
                                                    let metric_families = registry.gather();
                                                    let encoder = prometheus::TextEncoder::new();
                                                    if let Ok(metric_data) =
                                                        encoder.encode_to_string(&metric_families)
                                                    {
                                                        let mut response = hyper::Response::new(
                                                            http_body_util::Full::new(
                                                                bytes::Bytes::from(metric_data),
                                                            ),
                                                        );
                                                        // Advertise the text exposition format:
                                                        // scrapers rely on the Content-Type to
                                                        // pick the parser and may refuse a
                                                        // response that does not declare one.
                                                        response.headers_mut().insert(
                                                            hyper::header::CONTENT_TYPE,
                                                            hyper::header::HeaderValue::from_static(
                                                                prometheus::TEXT_FORMAT,
                                                            ),
                                                        );
                                                        Ok(response)
                                                    } else {
                                                        Err("Can't serialize metrics")
                                                    }
                                                }
                                            }),
                                        )
                                        .await
                                    {
                                        log::debug!(target: "prosa::observability::prometheus_server", "Error serving prometheus connection: {err:?}");
                                    }
                                });
                            }
                            Err(err) => {
                                // Keep serving, but do not hide the failure
                                log::debug!(target: "prosa::observability::prometheus_server", "Failed to accept prometheus connection: {err}");
                            }
                        }
                    },
                    Err(e) => {
                        log::error!(target: "prosa::observability::prometheus_server", "Failed to bind Prometheus metrics server: {e}");
                    }
                }
            });
        }

        Ok(())
    }

    /// Build the OpenTelemetry resource of the Prometheus exporter from the given attributes
    pub(crate) fn get_resource(
        &self,
        attr: Vec<KeyValue>,
    ) -> opentelemetry_sdk::resource::Resource {
        opentelemetry_sdk::resource::Resource::builder()
            .with_attributes(attr)
            .build()
    }
}
166
167/// Configuration struct of an stdout exporter
168#[derive(Default, Debug, Deserialize, Serialize, Copy, Clone)]
169pub(crate) struct StdoutExporterCfg {
170    #[serde(default)]
171    pub(crate) level: Option<TelemetryLevel>,
172}
173
174/// Telemetry data define for metrics
175#[derive(Default, Debug, Deserialize, Serialize, Clone)]
176pub struct TelemetryMetrics {
177    otlp: Option<OTLPExporterCfg>,
178    #[cfg(feature = "config-observability-prometheus")]
179    prometheus: Option<PrometheusExporterCfg>,
180    stdout: Option<StdoutExporterCfg>,
181}
182
183impl TelemetryMetrics {
184    /// Build a meter provider based on the self configuration
185    fn build_provider(
186        &self,
187        #[cfg(feature = "config-observability-prometheus")] resource_attr: Vec<KeyValue>,
188        #[cfg(feature = "config-observability-prometheus")] registry: &prometheus::Registry,
189    ) -> Result<SdkMeterProvider, ExporterBuildError> {
190        let mut meter_provider = SdkMeterProvider::builder();
191        if let Some(s) = &self.otlp {
192            let exporter = if s.get_protocol() == Protocol::Grpc {
193                opentelemetry_otlp::MetricExporter::builder()
194                    .with_tonic()
195                    .with_export_config(s.clone().into())
196                    .build()
197            } else {
198                opentelemetry_otlp::MetricExporter::builder()
199                    .with_http()
200                    .with_headers(s.get_header())
201                    .with_export_config(s.clone().into())
202                    .build()
203            }?;
204            meter_provider = meter_provider.with_periodic_exporter(exporter);
205        }
206
207        #[cfg(feature = "config-observability-prometheus")]
208        if let Some(prom) = &self.prometheus {
209            // configure OpenTelemetry to use this registry
210            let exporter = opentelemetry_prometheus::exporter()
211                .with_registry(registry.clone())
212                .with_resource_selector(opentelemetry_prometheus::ResourceSelector::All)
213                .without_target_info()
214                .build()
215                .map_err(|e| ExporterBuildError::InternalFailure(e.to_string()))?;
216            meter_provider = meter_provider
217                .with_resource(prom.get_resource(resource_attr))
218                .with_reader(exporter);
219
220            // Initialize the Prometheus server if needed
221            prom.init_prometheus_server(registry)?;
222        }
223
224        if self.stdout.is_some() {
225            let exporter = opentelemetry_stdout::MetricExporter::default();
226            meter_provider = meter_provider.with_periodic_exporter(exporter);
227        }
228
229        Ok(meter_provider.build())
230    }
231}
232
233/// Telemetry data define for metrics, logs, traces
234#[derive(Debug, Deserialize, Serialize, Clone)]
235pub struct TelemetryData {
236    otlp: Option<OTLPExporterCfg>,
237    stdout: Option<StdoutExporterCfg>,
238}
239
240impl TelemetryData {
241    /// Get the greater log level of the configuration (log level that include both OpenTelemetry and stdout)
242    fn get_max_level(&self) -> TelemetryLevel {
243        if let Some(otlp_level) = self.otlp.as_ref().and_then(|o| o.level) {
244            if let Some(stdout_level) = self.stdout.as_ref().and_then(|l| l.level) {
245                if otlp_level > stdout_level {
246                    otlp_level
247                } else {
248                    stdout_level
249                }
250            } else {
251                otlp_level
252            }
253        } else if let Some(stdout_level) = self.stdout.as_ref().and_then(|l| l.level) {
254            stdout_level
255        } else {
256            TelemetryLevel::TRACE
257        }
258    }
259
260    /// Build a logger provider based on the self configuration
261    fn build_logger_provider(
262        &self,
263        resource_attr: Vec<KeyValue>,
264    ) -> Result<(SdkLoggerProvider, TelemetryLevel), ExporterBuildError> {
265        let logs_provider = SdkLoggerProvider::builder();
266        if let Some(s) = &self.otlp {
267            let exporter = if s.get_protocol() == Protocol::Grpc {
268                opentelemetry_otlp::LogExporter::builder()
269                    .with_tonic()
270                    .with_export_config(s.clone().into())
271                    .build()
272            } else {
273                opentelemetry_otlp::LogExporter::builder()
274                    .with_http()
275                    .with_headers(s.get_header())
276                    .with_export_config(s.clone().into())
277                    .build()
278            }?;
279            Ok((
280                logs_provider
281                    .with_resource(s.get_resource(resource_attr))
282                    .with_batch_exporter(exporter)
283                    .build(),
284                s.level.unwrap_or_default(),
285            ))
286        } else if let Some(stdout) = &self.stdout {
287            Ok((
288                logs_provider
289                    .with_simple_exporter(opentelemetry_stdout::LogExporter::default())
290                    .build(),
291                stdout.level.unwrap_or_default(),
292            ))
293        } else {
294            Ok((logs_provider.build(), TelemetryLevel::OFF))
295        }
296    }
297
298    /// Build a tracer provider based on the self configuration
299    fn build_tracer_provider(
300        &self,
301        resource_attr: Vec<KeyValue>,
302    ) -> Result<SdkTracerProvider, ExporterBuildError> {
303        let mut trace_provider = SdkTracerProvider::builder();
304        if let Some(s) = &self.otlp {
305            let exporter = if s.get_protocol() == Protocol::Grpc {
306                opentelemetry_otlp::SpanExporter::builder()
307                    .with_tonic()
308                    .with_export_config(s.clone().into())
309                    .build()
310            } else {
311                opentelemetry_otlp::SpanExporter::builder()
312                    .with_http()
313                    .with_headers(s.get_header())
314                    .with_export_config(s.clone().into())
315                    .build()
316            }?;
317
318            trace_provider = trace_provider
319                .with_resource(s.get_resource(resource_attr))
320                .with_batch_exporter(exporter);
321        }
322
323        Ok(trace_provider.build())
324    }
325
326    /// Build a tracer provider based on the self configuration
327    fn build_tracer(
328        &self,
329        name: &str,
330        resource_attr: Vec<KeyValue>,
331    ) -> Result<Tracer, ExporterBuildError> {
332        self.build_tracer_provider(resource_attr)
333            .map(|p| p.tracer(name.to_string()))
334    }
335}
336
337impl Default for TelemetryData {
338    fn default() -> Self {
339        TelemetryData {
340            otlp: None,
341            stdout: Some(StdoutExporterCfg::default()),
342        }
343    }
344}
345
346/// Open telemetry settings of a ProSA
347///
348/// See [`TelemetryFilter`] to configure a specific filter for ProSA processors.
349///
350/// ```
351/// use opentelemetry::global;
352/// use prosa_utils::config::observability::Observability;
353/// use prosa_utils::config::tracing::TelemetryFilter;
354///
355/// #[tokio::main]
356/// async fn main() {
357///     let observability_settings = Observability::default();
358///
359///     // trace
360///     let filter = TelemetryFilter::default();
361///     observability_settings.tracing_init(&filter);
362/// }
363/// ```
364#[derive(Debug, Deserialize, Serialize, Clone)]
365pub struct Observability {
366    /// Name of ProSA from observability perspective
367    service_name: Option<String>,
368    /// Additional attributes for all telemetry data
369    #[serde(default)]
370    attributes: HashMap<String, String>,
371    /// Global level for observability
372    #[serde(default)]
373    level: TelemetryLevel,
374    /// Metrics settings of a ProSA
375    metrics: Option<TelemetryMetrics>,
376    /// Logs settings of a ProSA
377    logs: Option<TelemetryData>,
378    /// Traces settings of a ProSA
379    traces: Option<TelemetryData>,
380}
381
382impl Observability {
383    pub(crate) fn common_scope_attributes(service_name: String, capacity: usize) -> Vec<KeyValue> {
384        let mut scope_attributes = Vec::with_capacity(capacity + 3);
385        scope_attributes.push(KeyValue::new("service.name", service_name));
386
387        match std::env::consts::ARCH {
388            "x86_64" => scope_attributes.push(KeyValue::new("host.arch", "amd64")),
389            "aarch64" => scope_attributes.push(KeyValue::new("host.arch", "arm64")),
390            "arm" => scope_attributes.push(KeyValue::new("host.arch", "arm32")),
391            _ => {}
392        }
393
394        match std::env::consts::OS {
395            "linux" => scope_attributes.push(KeyValue::new("os.type", "linux")),
396            "macos" => scope_attributes.push(KeyValue::new("os.type", "darwin")),
397            "freebsd" => scope_attributes.push(KeyValue::new("os.type", "freebsd")),
398            "openbsd" => scope_attributes.push(KeyValue::new("os.type", "openbsd")),
399            "netbsd" => scope_attributes.push(KeyValue::new("os.type", "netbsd")),
400            "windows" => scope_attributes.push(KeyValue::new("os.type", "windows")),
401            _ => {}
402        }
403
404        scope_attributes
405    }
406
407    /// Create an observability object with inline parameter instead of getting it from an external configuration
408    pub fn new(level: TelemetryLevel) -> Observability {
409        Observability {
410            service_name: None,
411            attributes: HashMap::new(),
412            level,
413            metrics: Some(TelemetryMetrics::default()),
414            logs: Some(TelemetryData::default()),
415            traces: Some(TelemetryData::default()),
416        }
417    }
418
419    /// Setter of the ProSA name for all observability `service.name` attributes
420    pub fn set_prosa_name(&mut self, name: &str) {
421        if self.service_name.is_none() {
422            self.service_name = Some(name.to_string());
423        }
424    }
425
426    /// Getter of the common scope attributes
427    pub fn get_scope_attributes(&self) -> Vec<KeyValue> {
428        // start with common attributes
429        let mut scope_attr = if let Some(service_name) = self.attributes.get("service.name") {
430            Self::common_scope_attributes(service_name.clone(), self.attributes.len() + 2)
431        } else {
432            Self::common_scope_attributes(
433                self.service_name.clone().unwrap_or("prosa".to_string()),
434                self.attributes.len() + 2,
435            )
436        };
437
438        if !self.attributes.contains_key("host.name")
439            && let Some(hostname) = super::hostname()
440        {
441            scope_attr.push(KeyValue::new("host.name", hostname));
442        }
443
444        if !self.attributes.contains_key("service.version") {
445            scope_attr.push(KeyValue::new("service.version", env!("CARGO_PKG_VERSION")));
446        }
447
448        // append custom attributes from configuration
449        scope_attr.append(
450            self.attributes
451                .iter()
452                .map(|(k, v)| {
453                    KeyValue::new(k.clone(), opentelemetry::Value::String(v.clone().into()))
454                })
455                .collect::<Vec<KeyValue>>()
456                .as_mut(),
457        );
458
459        scope_attr
460    }
461
462    /// Getter of the log level (max value)
463    pub fn get_logger_level(&self) -> TelemetryLevel {
464        if let Some(logs) = &self.logs {
465            let logs_level = logs.get_max_level();
466            if logs_level > self.level {
467                logs_level
468            } else {
469                self.level
470            }
471        } else {
472            self.level
473        }
474    }
475
476    /// Meter provider builder
477    #[cfg(feature = "config-observability-prometheus")]
478    pub fn build_meter_provider(&self, registry: &prometheus::Registry) -> SdkMeterProvider {
479        if let Some(settings) = &self.metrics {
480            settings
481                .build_provider(self.get_scope_attributes(), registry)
482                .unwrap_or_default()
483        } else {
484            SdkMeterProvider::default()
485        }
486    }
487
488    /// Meter provider builder
489    #[cfg(not(feature = "config-observability-prometheus"))]
490    pub fn build_meter_provider(&self) -> SdkMeterProvider {
491        if let Some(settings) = &self.metrics {
492            settings.build_provider().unwrap_or_default()
493        } else {
494            SdkMeterProvider::default()
495        }
496    }
497
498    /// Logger provider builder
499    pub fn build_logger_provider(&self) -> (SdkLoggerProvider, TelemetryLevel) {
500        if let Some(settings) = &self.logs {
501            match settings.build_logger_provider(self.get_scope_attributes()) {
502                Ok(m) => m,
503                Err(_) => (
504                    SdkLoggerProvider::builder().build(),
505                    TelemetryLevel::default(),
506                ),
507            }
508        } else {
509            (
510                SdkLoggerProvider::builder().build(),
511                TelemetryLevel::default(),
512            )
513        }
514    }
515
516    /// Tracer provider builder
517    ///
518    /// ```
519    /// use opentelemetry::{global, trace::TracerProvider};
520    /// use prosa_utils::config::observability::Observability;
521    ///
522    /// let otel_settings = Observability::default();
523    /// let tracer = otel_settings
524    ///     .build_tracer_provider()
525    ///     .tracer("prosa_proc_example");
526    /// ```
527    pub fn build_tracer_provider(&self) -> SdkTracerProvider {
528        if let Some(settings) = &self.traces {
529            settings
530                .build_tracer_provider(self.get_scope_attributes())
531                .unwrap_or_default()
532        } else {
533            SdkTracerProvider::default()
534        }
535    }
536
537    /// Tracer builder
538    ///
539    /// ```
540    /// use opentelemetry::{global, trace::Tracer};
541    /// use prosa_utils::config::observability::Observability;
542    ///
543    /// let otel_settings = Observability::default();
544    /// let tracer = otel_settings
545    ///     .build_tracer();
546    /// ```
547    pub fn build_tracer(&self) -> Tracer {
548        if let Some(settings) = &self.traces {
549            match settings.build_tracer(
550                self.service_name.as_deref().unwrap_or("prosa"),
551                self.get_scope_attributes(),
552            ) {
553                Ok(m) => m,
554                Err(_) => SdkTracerProvider::default()
555                    .tracer(self.service_name.clone().unwrap_or("prosa".to_string())),
556            }
557        } else {
558            SdkTracerProvider::default()
559                .tracer(self.service_name.clone().unwrap_or("prosa".to_string()))
560        }
561    }
562
563    /// Method to init `tracing`
564    pub fn tracing_init(&self, filter: &TelemetryFilter) -> Result<(), TryInitError> {
565        let global_level: filter::LevelFilter = self.level.into();
566        let subscriber = tracing_subscriber::registry().with(global_level);
567
568        if let Some(traces) = &self.traces {
569            if let Some(otlp) = &traces.otlp {
570                let tracer = self.build_tracer();
571                let subscriber_filter = filter.clone_with_level(otlp.level.unwrap_or_default());
572                let subscriber = subscriber.with(
573                    tracing_opentelemetry::layer()
574                        .with_tracer(tracer)
575                        .with_filter(subscriber_filter),
576                );
577
578                if let Some(stdout) = traces.stdout {
579                    let subscriber_filter =
580                        filter.clone_with_level(stdout.level.unwrap_or_default());
581                    subscriber
582                        .with(tracing_subscriber::fmt::Layer::new().with_filter(subscriber_filter))
583                        .try_init()
584                } else {
585                    subscriber.try_init()
586                }
587            } else if let Some(stdout) = traces.stdout {
588                let subscriber_filter = filter.clone_with_level(stdout.level.unwrap_or_default());
589                subscriber
590                    .with(tracing_subscriber::fmt::Layer::new().with_filter(subscriber_filter))
591                    .try_init()
592            } else {
593                subscriber.try_init()
594            }
595        } else if let Some(logs) = &self.logs
596            && let Ok((logger_provider, level)) =
597                logs.build_logger_provider(self.get_scope_attributes())
598            && level > TelemetryLevel::OFF
599        {
600            let logger_filter = filter.clone_with_level(level);
601            subscriber
602                .with(
603                    opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
604                        &logger_provider,
605                    )
606                    .with_filter(logger_filter),
607                )
608                .try_init()
609        } else {
610            subscriber.try_init()
611        }
612    }
613}
614
615impl Default for Observability {
616    fn default() -> Self {
617        Self {
618            service_name: None,
619            attributes: HashMap::new(),
620            level: TelemetryLevel::default(),
621            metrics: Some(TelemetryMetrics::default()),
622            logs: Some(TelemetryData {
623                otlp: None,
624                stdout: Some(StdoutExporterCfg {
625                    level: Some(TelemetryLevel::DEBUG),
626                }),
627            }),
628            traces: Some(TelemetryData {
629                otlp: None,
630                stdout: Some(StdoutExporterCfg {
631                    level: Some(TelemetryLevel::DEBUG),
632                }),
633            }),
634        }
635    }
636}