1use opentelemetry::{KeyValue, trace::TracerProvider as _};
4use opentelemetry_otlp::{
5 ExportConfig, ExporterBuildError, Protocol, WithExportConfig, WithHttpConfig,
6};
7use opentelemetry_sdk::{
8 logs::SdkLoggerProvider,
9 metrics::SdkMeterProvider,
10 trace::{SdkTracerProvider, Tracer},
11};
12use serde::{Deserialize, Serialize};
13use std::{collections::HashMap, time::Duration};
14use tracing_subscriber::{filter, prelude::*};
15use tracing_subscriber::{layer::SubscriberExt, util::TryInitError};
16use url::Url;
17
18use crate::config::url_authentication;
19
20use super::tracing::{TelemetryFilter, TelemetryLevel};
21
22#[derive(Debug, Deserialize, Serialize, Clone)]
24pub(crate) struct OTLPExporterCfg {
25 pub(crate) level: Option<TelemetryLevel>,
26 endpoint: Url,
27 #[serde(skip_serializing)]
28 timeout_sec: Option<u64>,
29}
30
31impl OTLPExporterCfg {
32 pub(crate) fn get_protocol(&self) -> Protocol {
33 match self.endpoint.scheme().to_lowercase().as_str() {
34 "grpc" => Protocol::Grpc,
35 "http/json" => Protocol::HttpJson,
36 _ => Protocol::HttpBinary,
37 }
38 }
39
40 pub(crate) fn get_header(&self) -> HashMap<String, String> {
41 let mut headers = HashMap::with_capacity(1);
42 if let Some(authorization) = url_authentication(&self.endpoint) {
43 headers.insert("Authorization".to_string(), authorization);
44 }
45 headers
46 }
47
48 pub(crate) fn get_resource(
49 &self,
50 attr: Vec<KeyValue>,
51 ) -> opentelemetry_sdk::resource::Resource {
52 opentelemetry_sdk::resource::Resource::builder()
53 .with_attributes(attr)
54 .with_attribute(opentelemetry::KeyValue::new(
55 "process.creation.time",
56 chrono::Utc::now().to_rfc3339(),
57 ))
58 .with_attribute(opentelemetry::KeyValue::new(
59 "process.pid",
60 opentelemetry::Value::I64(std::process::id() as i64),
61 ))
62 .build()
63 }
64}
65
66impl From<OTLPExporterCfg> for ExportConfig {
67 fn from(value: OTLPExporterCfg) -> Self {
68 let protocol = value.get_protocol();
69 let mut endpoint = value.endpoint;
70 if !endpoint.username().is_empty() {
71 let _ = endpoint.set_username("");
72 }
73 if endpoint.password().is_some() {
74 let _ = endpoint.set_password(None);
75 }
76
77 ExportConfig {
78 endpoint: Some(endpoint.to_string()),
79 timeout: value.timeout_sec.map(Duration::from_secs),
80 protocol,
81 }
82 }
83}
84
85impl Default for OTLPExporterCfg {
86 fn default() -> Self {
87 Self {
88 level: None,
89 endpoint: Url::parse("grpc://localhost:4317").unwrap(),
90 timeout_sec: None,
91 }
92 }
93}
94
/// Configuration of the Prometheus metrics exporter.
#[cfg(feature = "config-observability-prometheus")]
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct PrometheusExporterCfg {
    /// Address the metrics HTTP server listens on; no server is started when it is `None`.
    endpoint: Option<String>,
}
101
102#[cfg(feature = "config-observability-prometheus")]
103impl PrometheusExporterCfg {
104 pub(crate) fn init_prometheus_server(
106 &self,
107 registry: &prometheus::Registry,
108 ) -> Result<(), ExporterBuildError> {
109 if let Some(endpoint) = self.endpoint.clone() {
110 let registry = registry.clone();
111 tokio::task::spawn(async move {
112 match tokio::net::TcpListener::bind(endpoint).await {
113 Ok(listener) => {
114 loop {
115 if let Ok((stream, _)) = listener.accept().await {
116 let io = hyper_util::rt::TokioIo::new(stream);
117 let registry = registry.clone();
118 tokio::task::spawn(async move {
119 if let Err(err) = hyper::server::conn::http1::Builder::new()
120 .serve_connection(
121 io,
122 #[allow(unused)]
123 hyper::service::service_fn(|req| {
124 let registry = registry.clone();
125 async move {
126 let metric_families = registry.gather();
127 let encoder = prometheus::TextEncoder::new();
128 if let Ok(metric_data) =
129 encoder.encode_to_string(&metric_families)
130 {
131 let response = hyper::Response::builder()
132 .header(hyper::header::SERVER, concat!("ProSA/", env!("CARGO_PKG_VERSION")))
133 .header(
134 hyper::header::CONTENT_TYPE,
135 "text/plain; version=1.0.0",
136 );
137
138 #[cfg(feature = "config-observability-gzip")]
139 if req.headers().get(hyper::header::ACCEPT_ENCODING).is_some_and(|a| a.to_str().is_ok_and(|v| v.contains("gzip"))) {
140 let mut gz_encoder = flate2::write::GzEncoder::new(Vec::with_capacity(2048), flate2::Compression::fast());
141 if std::io::Write::write_all(&mut gz_encoder, metric_data.as_bytes()).is_ok()
142 && let Ok(compressed_data) = gz_encoder.finish()
143 {
144 return response
145 .header(hyper::header::CONTENT_ENCODING, "gzip")
146 .body(http_body_util::Full::new(
147 bytes::Bytes::from(compressed_data)),
148 )
149 .map_err(|e| e.to_string());
150 }
151 }
152
153 response
154 .body(http_body_util::Full::new(
155 bytes::Bytes::from(metric_data),
156 ))
157 .map_err(|e| e.to_string())
158 } else {
159 Err("Can't serialize metrics".to_string())
160 }
161 }
162 }),
163 )
164 .await
165 {
166 log::debug!(target: "prosa::observability::prometheus_server", "Error serving prometheus connection: {err:?}");
167 }
168 });
169 }
170 }
171 }
172 Err(e) => {
173 log::error!(target: "prosa::observability::prometheus_server", "Failed to bind Prometheus metrics server: {e}");
174 }
175 }
176 });
177 }
178
179 Ok(())
180 }
181
182 pub(crate) fn get_resource(
183 &self,
184 attr: Vec<KeyValue>,
185 ) -> opentelemetry_sdk::resource::Resource {
186 opentelemetry_sdk::resource::Resource::builder()
187 .with_attributes(attr)
188 .build()
189 }
190}
191
192#[derive(Default, Debug, Deserialize, Serialize, Copy, Clone)]
194pub(crate) struct StdoutExporterCfg {
195 #[serde(default)]
196 pub(crate) level: Option<TelemetryLevel>,
197}
198
199#[derive(Default, Debug, Deserialize, Serialize, Clone)]
201pub struct TelemetryMetrics {
202 otlp: Option<OTLPExporterCfg>,
203 #[cfg(feature = "config-observability-prometheus")]
204 prometheus: Option<PrometheusExporterCfg>,
205 stdout: Option<StdoutExporterCfg>,
206}
207
208impl TelemetryMetrics {
209 fn build_provider(
211 &self,
212 #[cfg(feature = "config-observability-prometheus")] resource_attr: Vec<KeyValue>,
213 #[cfg(feature = "config-observability-prometheus")] registry: &prometheus::Registry,
214 ) -> Result<SdkMeterProvider, ExporterBuildError> {
215 let mut meter_provider = SdkMeterProvider::builder();
216 if let Some(s) = &self.otlp {
217 let exporter = if s.get_protocol() == Protocol::Grpc {
218 opentelemetry_otlp::MetricExporter::builder()
219 .with_tonic()
220 .with_export_config(s.clone().into())
221 .build()
222 } else {
223 opentelemetry_otlp::MetricExporter::builder()
224 .with_http()
225 .with_headers(s.get_header())
226 .with_export_config(s.clone().into())
227 .build()
228 }?;
229 meter_provider = meter_provider.with_periodic_exporter(exporter);
230 }
231
232 #[cfg(feature = "config-observability-prometheus")]
233 if let Some(prom) = &self.prometheus {
234 let exporter = opentelemetry_prometheus::exporter()
236 .with_registry(registry.clone())
237 .with_resource_selector(opentelemetry_prometheus::ResourceSelector::All)
238 .without_target_info()
239 .build()
240 .map_err(|e| ExporterBuildError::InternalFailure(e.to_string()))?;
241 meter_provider = meter_provider
242 .with_resource(prom.get_resource(resource_attr))
243 .with_reader(exporter);
244
245 prom.init_prometheus_server(registry)?;
247 }
248
249 if self.stdout.is_some() {
250 let exporter = opentelemetry_stdout::MetricExporter::default();
251 meter_provider = meter_provider.with_periodic_exporter(exporter);
252 }
253
254 Ok(meter_provider.build())
255 }
256}
257
258#[derive(Debug, Deserialize, Serialize, Clone)]
260pub struct TelemetryData {
261 otlp: Option<OTLPExporterCfg>,
262 stdout: Option<StdoutExporterCfg>,
263}
264
265impl TelemetryData {
266 fn get_max_level(&self) -> TelemetryLevel {
268 if let Some(otlp_level) = self.otlp.as_ref().and_then(|o| o.level) {
269 if let Some(stdout_level) = self.stdout.as_ref().and_then(|l| l.level) {
270 if otlp_level > stdout_level {
271 otlp_level
272 } else {
273 stdout_level
274 }
275 } else {
276 otlp_level
277 }
278 } else if let Some(stdout_level) = self.stdout.as_ref().and_then(|l| l.level) {
279 stdout_level
280 } else {
281 TelemetryLevel::TRACE
282 }
283 }
284
285 fn build_logger_provider(
287 &self,
288 resource_attr: Vec<KeyValue>,
289 ) -> Result<(SdkLoggerProvider, TelemetryLevel), ExporterBuildError> {
290 let logs_provider = SdkLoggerProvider::builder();
291 if let Some(s) = &self.otlp {
292 let exporter = if s.get_protocol() == Protocol::Grpc {
293 opentelemetry_otlp::LogExporter::builder()
294 .with_tonic()
295 .with_export_config(s.clone().into())
296 .build()
297 } else {
298 opentelemetry_otlp::LogExporter::builder()
299 .with_http()
300 .with_headers(s.get_header())
301 .with_export_config(s.clone().into())
302 .build()
303 }?;
304 Ok((
305 logs_provider
306 .with_resource(s.get_resource(resource_attr))
307 .with_batch_exporter(exporter)
308 .build(),
309 s.level.unwrap_or_default(),
310 ))
311 } else if let Some(stdout) = &self.stdout {
312 Ok((
313 logs_provider
314 .with_simple_exporter(opentelemetry_stdout::LogExporter::default())
315 .build(),
316 stdout.level.unwrap_or_default(),
317 ))
318 } else {
319 Ok((logs_provider.build(), TelemetryLevel::OFF))
320 }
321 }
322
323 fn build_tracer_provider(
325 &self,
326 resource_attr: Vec<KeyValue>,
327 ) -> Result<SdkTracerProvider, ExporterBuildError> {
328 let mut trace_provider = SdkTracerProvider::builder();
329 if let Some(s) = &self.otlp {
330 let exporter = if s.get_protocol() == Protocol::Grpc {
331 opentelemetry_otlp::SpanExporter::builder()
332 .with_tonic()
333 .with_export_config(s.clone().into())
334 .build()
335 } else {
336 opentelemetry_otlp::SpanExporter::builder()
337 .with_http()
338 .with_headers(s.get_header())
339 .with_export_config(s.clone().into())
340 .build()
341 }?;
342
343 trace_provider = trace_provider
344 .with_resource(s.get_resource(resource_attr))
345 .with_batch_exporter(exporter);
346 }
347
348 Ok(trace_provider.build())
349 }
350
351 fn build_tracer(
353 &self,
354 name: &str,
355 resource_attr: Vec<KeyValue>,
356 ) -> Result<Tracer, ExporterBuildError> {
357 self.build_tracer_provider(resource_attr)
358 .map(|p| p.tracer(name.to_string()))
359 }
360}
361
362impl Default for TelemetryData {
363 fn default() -> Self {
364 TelemetryData {
365 otlp: None,
366 stdout: Some(StdoutExporterCfg::default()),
367 }
368 }
369}
370
371#[derive(Debug, Deserialize, Serialize, Clone)]
390pub struct Observability {
391 #[serde(default)]
393 attributes: HashMap<String, String>,
394 #[serde(default)]
396 level: TelemetryLevel,
397 metrics: Option<TelemetryMetrics>,
399 logs: Option<TelemetryData>,
401 traces: Option<TelemetryData>,
403}
404
405impl Observability {
406 pub(crate) fn common_scope_attributes(service_name: String, capacity: usize) -> Vec<KeyValue> {
407 let mut scope_attributes = Vec::with_capacity(capacity + 3);
408 scope_attributes.push(KeyValue::new("service.name", service_name));
409
410 match std::env::consts::ARCH {
411 "x86_64" => scope_attributes.push(KeyValue::new("host.arch", "amd64")),
412 "aarch64" => scope_attributes.push(KeyValue::new("host.arch", "arm64")),
413 "arm" => scope_attributes.push(KeyValue::new("host.arch", "arm32")),
414 _ => {}
415 }
416
417 match std::env::consts::OS {
418 "linux" => scope_attributes.push(KeyValue::new("os.type", "linux")),
419 "macos" => scope_attributes.push(KeyValue::new("os.type", "darwin")),
420 "freebsd" => scope_attributes.push(KeyValue::new("os.type", "freebsd")),
421 "openbsd" => scope_attributes.push(KeyValue::new("os.type", "openbsd")),
422 "netbsd" => scope_attributes.push(KeyValue::new("os.type", "netbsd")),
423 "windows" => scope_attributes.push(KeyValue::new("os.type", "windows")),
424 _ => {}
425 }
426
427 scope_attributes
428 }
429
430 pub fn new(level: TelemetryLevel) -> Observability {
432 Observability {
433 attributes: HashMap::new(),
434 level,
435 metrics: Some(TelemetryMetrics::default()),
436 logs: Some(TelemetryData::default()),
437 traces: Some(TelemetryData::default()),
438 }
439 }
440
441 pub fn get_service_name(&self) -> &str {
443 self.attributes
444 .get("service.name")
445 .map(|s| s.as_ref())
446 .unwrap_or("prosa")
447 }
448
449 pub fn set_prosa_name(&mut self, name: &str) {
451 self.attributes
452 .entry("service.name".to_string())
453 .or_insert_with(|| name.to_string());
454 }
455
456 pub fn get_scope_attributes(&self) -> Vec<KeyValue> {
458 let mut scope_attr = Self::common_scope_attributes(
460 self.get_service_name().to_string(),
461 self.attributes.len() + 2,
462 );
463
464 if !self.attributes.contains_key("host.name")
465 && let Some(hostname) = super::hostname()
466 {
467 scope_attr.push(KeyValue::new("host.name", hostname));
468 }
469
470 if !self.attributes.contains_key("service.version") {
471 scope_attr.push(KeyValue::new("service.version", env!("CARGO_PKG_VERSION")));
472 }
473
474 scope_attr.append(
476 self.attributes
477 .iter()
478 .map(|(k, v)| {
479 KeyValue::new(k.clone(), opentelemetry::Value::String(v.clone().into()))
480 })
481 .collect::<Vec<KeyValue>>()
482 .as_mut(),
483 );
484
485 scope_attr
486 }
487
488 pub fn get_logger_level(&self) -> TelemetryLevel {
490 if let Some(logs) = &self.logs {
491 let logs_level = logs.get_max_level();
492 if logs_level > self.level {
493 logs_level
494 } else {
495 self.level
496 }
497 } else {
498 self.level
499 }
500 }
501
502 #[cfg(feature = "config-observability-prometheus")]
504 pub fn build_meter_provider(&self, registry: &prometheus::Registry) -> SdkMeterProvider {
505 if let Some(settings) = &self.metrics {
506 settings
507 .build_provider(self.get_scope_attributes(), registry)
508 .unwrap_or_default()
509 } else {
510 SdkMeterProvider::default()
511 }
512 }
513
514 #[cfg(not(feature = "config-observability-prometheus"))]
516 pub fn build_meter_provider(&self) -> SdkMeterProvider {
517 if let Some(settings) = &self.metrics {
518 settings.build_provider().unwrap_or_default()
519 } else {
520 SdkMeterProvider::default()
521 }
522 }
523
524 pub fn build_logger_provider(&self) -> (SdkLoggerProvider, TelemetryLevel) {
526 if let Some(settings) = &self.logs {
527 match settings.build_logger_provider(self.get_scope_attributes()) {
528 Ok(m) => m,
529 Err(_) => (
530 SdkLoggerProvider::builder().build(),
531 TelemetryLevel::default(),
532 ),
533 }
534 } else {
535 (
536 SdkLoggerProvider::builder().build(),
537 TelemetryLevel::default(),
538 )
539 }
540 }
541
542 pub fn build_tracer_provider(&self) -> SdkTracerProvider {
554 if let Some(settings) = &self.traces {
555 settings
556 .build_tracer_provider(self.get_scope_attributes())
557 .unwrap_or_default()
558 } else {
559 SdkTracerProvider::default()
560 }
561 }
562
563 pub fn build_tracer(&self) -> Tracer {
574 if let Some(settings) = &self.traces {
575 match settings.build_tracer(self.get_service_name(), self.get_scope_attributes()) {
576 Ok(m) => m,
577 Err(_) => SdkTracerProvider::default().tracer(self.get_service_name().to_string()),
578 }
579 } else {
580 SdkTracerProvider::default().tracer(self.get_service_name().to_string())
581 }
582 }
583
584 pub fn tracing_init(&self, filter: &TelemetryFilter) -> Result<(), TryInitError> {
586 let global_level: filter::LevelFilter = self.level.into();
587 let subscriber = tracing_subscriber::registry().with(global_level);
588
589 if let Some(traces) = &self.traces {
590 if let Some(otlp) = &traces.otlp {
591 let tracer = self.build_tracer();
592 let subscriber_filter = filter.clone_with_level(otlp.level.unwrap_or_default());
593 let subscriber = subscriber.with(
594 tracing_opentelemetry::layer()
595 .with_tracer(tracer)
596 .with_filter(subscriber_filter),
597 );
598
599 if let Some(stdout) = traces.stdout {
600 let subscriber_filter =
601 filter.clone_with_level(stdout.level.unwrap_or_default());
602 subscriber
603 .with(tracing_subscriber::fmt::Layer::new().with_filter(subscriber_filter))
604 .try_init()
605 } else {
606 subscriber.try_init()
607 }
608 } else if let Some(stdout) = traces.stdout {
609 let subscriber_filter = filter.clone_with_level(stdout.level.unwrap_or_default());
610 subscriber
611 .with(tracing_subscriber::fmt::Layer::new().with_filter(subscriber_filter))
612 .try_init()
613 } else {
614 subscriber.try_init()
615 }
616 } else if let Some(logs) = &self.logs
617 && let Ok((logger_provider, level)) =
618 logs.build_logger_provider(self.get_scope_attributes())
619 && level > TelemetryLevel::OFF
620 {
621 let logger_filter = filter.clone_with_level(level);
622 subscriber
623 .with(
624 opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
625 &logger_provider,
626 )
627 .with_filter(logger_filter),
628 )
629 .try_init()
630 } else {
631 subscriber.try_init()
632 }
633 }
634}
635
636impl Default for Observability {
637 fn default() -> Self {
638 Self {
639 attributes: HashMap::new(),
640 level: TelemetryLevel::default(),
641 metrics: Some(TelemetryMetrics::default()),
642 logs: Some(TelemetryData {
643 otlp: None,
644 stdout: Some(StdoutExporterCfg {
645 level: Some(TelemetryLevel::DEBUG),
646 }),
647 }),
648 traces: Some(TelemetryData {
649 otlp: None,
650 stdout: Some(StdoutExporterCfg {
651 level: Some(TelemetryLevel::DEBUG),
652 }),
653 }),
654 }
655 }
656}