1use opentelemetry::{KeyValue, trace::TracerProvider as _};
4use opentelemetry_otlp::{
5 ExportConfig, ExporterBuildError, Protocol, WithExportConfig, WithHttpConfig,
6};
7use opentelemetry_sdk::{
8 logs::SdkLoggerProvider,
9 metrics::SdkMeterProvider,
10 trace::{SdkTracerProvider, Tracer},
11};
12use serde::{Deserialize, Serialize};
13use std::{collections::HashMap, time::Duration};
14use tracing_subscriber::{filter, prelude::*};
15use tracing_subscriber::{layer::SubscriberExt, util::TryInitError};
16use url::Url;
17
18use crate::config::url_authentication;
19
20use super::tracing::{TelemetryFilter, TelemetryLevel};
21
/// Configuration of an OTLP (OpenTelemetry Protocol) exporter.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub(crate) struct OTLPExporterCfg {
    /// Telemetry level applied to this exporter; consumers fall back to the
    /// default level when it is not set.
    pub(crate) level: Option<TelemetryLevel>,
    /// Collector endpoint. Its scheme selects the export protocol (see
    /// `get_protocol`), and it is handed to `url_authentication` to derive an
    /// optional `Authorization` header.
    endpoint: Url,
    /// Export timeout in seconds (converted with `Duration::from_secs`).
    /// Read from the configuration but never serialized back.
    #[serde(skip_serializing)]
    timeout_sec: Option<u64>,
}
30
impl OTLPExporterCfg {
    /// Selects the OTLP transport protocol from the scheme of the endpoint URL.
    ///
    /// `grpc` maps to [`Protocol::Grpc`]; any other scheme falls back to
    /// [`Protocol::HttpBinary`].
    pub(crate) fn get_protocol(&self) -> Protocol {
        // `Url::scheme` is documented to return the scheme lower-cased, so it can
        // be matched directly without allocating a normalised copy.
        match self.endpoint.scheme() {
            "grpc" => Protocol::Grpc,
            // NOTE(review): a URL scheme cannot contain '/', so this arm looks
            // unreachable for a successfully parsed `Url` — confirm the spelling
            // that is expected to select the HTTP/JSON protocol.
            "http/json" => Protocol::HttpJson,
            _ => Protocol::HttpBinary,
        }
    }

    /// Builds the HTTP headers sent along with every export request.
    ///
    /// The map contains a single `Authorization` entry when
    /// `url_authentication` yields a value for the endpoint, and is empty
    /// otherwise.
    pub(crate) fn get_header(&self) -> HashMap<String, String> {
        url_authentication(&self.endpoint)
            .into_iter()
            .map(|authorization| ("Authorization".to_string(), authorization))
            .collect()
    }

    /// Builds the OpenTelemetry resource describing this process.
    ///
    /// The resource carries the caller supplied `attr` attributes plus
    /// `process.creation.time` (RFC 3339, UTC) and `process.pid`.
    pub(crate) fn get_resource(
        &self,
        attr: Vec<KeyValue>,
    ) -> opentelemetry_sdk::resource::Resource {
        // The timestamp is captured once so that every resource built by this
        // process (e.g. for logs and for traces) reports the same creation time
        // instead of the instant at which each resource happened to be built.
        static PROCESS_CREATION_TIME: std::sync::OnceLock<String> = std::sync::OnceLock::new();
        let creation_time = PROCESS_CREATION_TIME
            .get_or_init(|| chrono::Utc::now().to_rfc3339())
            .clone();

        opentelemetry_sdk::resource::Resource::builder()
            .with_attributes(attr)
            .with_attribute(KeyValue::new("process.creation.time", creation_time))
            .with_attribute(KeyValue::new(
                "process.pid",
                // `u32` always fits in `i64`; `From` makes the lossless widening explicit.
                opentelemetry::Value::I64(i64::from(std::process::id())),
            ))
            .build()
    }
}
65
impl From<OTLPExporterCfg> for ExportConfig {
    /// Converts the exporter configuration into the `ExportConfig` expected by
    /// the OTLP exporter builders.
    ///
    /// The user name and password embedded in the endpoint URL are removed
    /// before the URL is turned into the endpoint string.
    fn from(value: OTLPExporterCfg) -> Self {
        // Derive everything that needs the untouched configuration first.
        let protocol = value.get_protocol();
        let timeout = value.timeout_sec.map(Duration::from_secs);

        let mut sanitized = value.endpoint;
        if !sanitized.username().is_empty() {
            // A failure to clear the user name is deliberately ignored.
            let _ = sanitized.set_username("");
        }
        if sanitized.password().is_some() {
            // Same best-effort policy for the password.
            let _ = sanitized.set_password(None);
        }

        Self {
            endpoint: Some(sanitized.to_string()),
            timeout,
            protocol,
        }
    }
}
84
impl Default for OTLPExporterCfg {
    /// Default exporter: `grpc://localhost:4317`, no explicit level, no timeout.
    fn default() -> Self {
        // The literal is a constant, so a parse failure would be a programming error.
        let endpoint =
            Url::parse("grpc://localhost:4317").expect("default OTLP address is invalid");

        Self {
            level: None,
            endpoint,
            timeout_sec: None,
        }
    }
}
94
/// Configuration of the Prometheus metrics exporter.
#[cfg(feature = "config-observability-prometheus")]
#[derive(Default, Debug, Deserialize, Serialize, Clone)]
pub struct PrometheusExporterCfg {
    /// Address handed to `TcpListener::bind` for the metrics HTTP server.
    /// When unset, no server is started.
    endpoint: Option<String>,
}
101
#[cfg(feature = "config-observability-prometheus")]
impl PrometheusExporterCfg {
    /// Renders the content of `registry` as the HTTP response of one scrape.
    ///
    /// The metrics are encoded with the Prometheus text encoder. When the
    /// `config-observability-gzip` feature is enabled and the request's
    /// `Accept-Encoding` header mentions `gzip`, the body is gzip-compressed;
    /// if compression fails the plain body is sent instead.
    ///
    /// # Errors
    /// Returns a message when the metrics cannot be encoded or the response
    /// cannot be built.
    fn metrics_response(
        registry: &prometheus::Registry,
        // The request is only inspected for content negotiation (gzip feature).
        #[cfg_attr(
            not(feature = "config-observability-gzip"),
            allow(unused_variables)
        )]
        req: &hyper::Request<hyper::body::Incoming>,
    ) -> Result<hyper::Response<http_body_util::Full<bytes::Bytes>>, String> {
        let metric_families = registry.gather();
        let Ok(metric_data) = prometheus::TextEncoder::new().encode_to_string(&metric_families)
        else {
            return Err("Can't serialize metrics".to_string());
        };

        let response = hyper::Response::builder()
            .header(
                hyper::header::SERVER,
                concat!("ProSA/", env!("CARGO_PKG_VERSION")),
            )
            .header(hyper::header::CONTENT_TYPE, "text/plain; version=1.0.0");

        #[cfg(feature = "config-observability-gzip")]
        if req
            .headers()
            .get(hyper::header::ACCEPT_ENCODING)
            .is_some_and(|a| a.to_str().is_ok_and(|v| v.contains("gzip")))
        {
            let mut gz_encoder = flate2::write::GzEncoder::new(
                Vec::with_capacity(2048),
                flate2::Compression::fast(),
            );
            // Any compression failure falls through to the uncompressed response.
            if std::io::Write::write_all(&mut gz_encoder, metric_data.as_bytes()).is_ok()
                && let Ok(compressed_data) = gz_encoder.finish()
            {
                return response
                    .header(hyper::header::CONTENT_ENCODING, "gzip")
                    .body(http_body_util::Full::new(bytes::Bytes::from(
                        compressed_data,
                    )))
                    .map_err(|e| e.to_string());
            }
        }

        response
            .body(http_body_util::Full::new(bytes::Bytes::from(metric_data)))
            .map_err(|e| e.to_string())
    }

    /// Starts the HTTP server that exposes `registry` to Prometheus scrapers.
    ///
    /// Nothing is started when no endpoint is configured. Otherwise a Tokio
    /// task is spawned that binds the endpoint and serves every accepted
    /// connection over HTTP/1 from its own task; every request is answered
    /// with the current metrics. The call uses `tokio::task::spawn`, so it has
    /// to be made from within a Tokio runtime.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`: a bind failure happens inside the
    /// spawned task and is only logged.
    pub(crate) fn init_prometheus_server(
        &self,
        registry: &prometheus::Registry,
    ) -> Result<(), ExporterBuildError> {
        let Some(endpoint) = self.endpoint.clone() else {
            return Ok(());
        };

        let registry = registry.clone();
        tokio::task::spawn(async move {
            let listener = match tokio::net::TcpListener::bind(endpoint).await {
                Ok(listener) => listener,
                Err(e) => {
                    log::error!(target: "prosa::observability::prometheus_server", "Failed to bind Prometheus metrics server: {e}");
                    return;
                }
            };

            loop {
                // NOTE(review): accept errors are silently skipped and retried
                // immediately — consider logging and backing off on persistent
                // failures.
                let Ok((stream, _)) = listener.accept().await else {
                    continue;
                };

                let io = hyper_util::rt::TokioIo::new(stream);
                let registry = registry.clone();
                tokio::task::spawn(async move {
                    let service = hyper::service::service_fn(
                        |req: hyper::Request<hyper::body::Incoming>| {
                            // Each request future owns its own handle on the registry.
                            let registry = registry.clone();
                            async move { Self::metrics_response(&registry, &req) }
                        },
                    );

                    if let Err(err) = hyper::server::conn::http1::Builder::new()
                        .serve_connection(io, service)
                        .await
                    {
                        log::debug!(target: "prosa::observability::prometheus_server", "Error serving prometheus connection: {err:?}");
                    }
                });
            }
        });

        Ok(())
    }

    /// Builds the OpenTelemetry resource attached to the Prometheus meter
    /// provider; it only carries the caller supplied `attr` attributes.
    pub(crate) fn get_resource(
        &self,
        attr: Vec<KeyValue>,
    ) -> opentelemetry_sdk::resource::Resource {
        opentelemetry_sdk::resource::Resource::builder()
            .with_attributes(attr)
            .build()
    }
}
191
/// Configuration of the standard output exporter.
#[derive(Default, Debug, Deserialize, Serialize, Copy, Clone)]
pub(crate) struct StdoutExporterCfg {
    /// Telemetry level applied to the stdout output; optional in the
    /// configuration, consumers fall back to the default level when unset.
    #[serde(default)]
    pub(crate) level: Option<TelemetryLevel>,
}
198
/// Metrics telemetry configuration: which exporters receive the metrics.
#[derive(Default, Debug, Deserialize, Serialize, Clone)]
pub struct TelemetryMetrics {
    /// OTLP exporter, registered as a periodic exporter when set.
    otlp: Option<OTLPExporterCfg>,
    /// Prometheus exporter, registered as a reader when set.
    #[cfg(feature = "config-observability-prometheus")]
    prometheus: Option<PrometheusExporterCfg>,
    /// Stdout exporter, registered as a periodic exporter when set.
    stdout: Option<StdoutExporterCfg>,
}
207
impl TelemetryMetrics {
    /// Assembles the meter provider from every configured metrics exporter.
    ///
    /// - OTLP: periodic exporter, over gRPC or HTTP depending on the endpoint
    ///   protocol (the HTTP variant also sends the authentication headers).
    /// - Prometheus (feature gated): reader bound to `registry`, with a
    ///   resource built from `resource_attr`; the scrape server is started too.
    /// - Stdout: periodic exporter.
    ///
    /// # Errors
    /// Fails when the OTLP or the Prometheus exporter cannot be built.
    fn build_provider(
        &self,
        #[cfg(feature = "config-observability-prometheus")] resource_attr: Vec<KeyValue>,
        #[cfg(feature = "config-observability-prometheus")] registry: &prometheus::Registry,
    ) -> Result<SdkMeterProvider, ExporterBuildError> {
        let mut builder = SdkMeterProvider::builder();

        if let Some(otlp) = self.otlp.as_ref() {
            let otlp_exporter = match otlp.get_protocol() {
                Protocol::Grpc => opentelemetry_otlp::MetricExporter::builder()
                    .with_tonic()
                    .with_export_config(otlp.clone().into())
                    .build(),
                _ => opentelemetry_otlp::MetricExporter::builder()
                    .with_http()
                    .with_headers(otlp.get_header())
                    .with_export_config(otlp.clone().into())
                    .build(),
            }?;
            builder = builder.with_periodic_exporter(otlp_exporter);
        }

        #[cfg(feature = "config-observability-prometheus")]
        if let Some(prometheus_cfg) = self.prometheus.as_ref() {
            let prometheus_reader = opentelemetry_prometheus::exporter()
                .with_registry(registry.clone())
                .with_resource_selector(opentelemetry_prometheus::ResourceSelector::All)
                .without_target_info()
                .build()
                .map_err(|e| ExporterBuildError::InternalFailure(e.to_string()))?;

            let resource = prometheus_cfg.get_resource(resource_attr);
            builder = builder
                .with_resource(resource)
                .with_reader(prometheus_reader);

            // Expose the registry over HTTP if an endpoint is configured.
            prometheus_cfg.init_prometheus_server(registry)?;
        }

        if self.stdout.is_some() {
            builder =
                builder.with_periodic_exporter(opentelemetry_stdout::MetricExporter::default());
        }

        Ok(builder.build())
    }
}
257
/// Telemetry configuration shared by the logs and the traces signals.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TelemetryData {
    /// OTLP exporter configuration, if the signal is sent to a collector.
    otlp: Option<OTLPExporterCfg>,
    /// Stdout exporter configuration, if the signal is written to stdout.
    stdout: Option<StdoutExporterCfg>,
}
264
impl TelemetryData {
    /// Returns the greater of the levels configured on the OTLP and stdout
    /// exporters, or `TelemetryLevel::TRACE` when neither declares one.
    fn get_max_level(&self) -> TelemetryLevel {
        let otlp_level = self.otlp.as_ref().and_then(|cfg| cfg.level);
        let stdout_level = self.stdout.as_ref().and_then(|cfg| cfg.level);

        match (otlp_level, stdout_level) {
            (Some(otlp), Some(stdout)) => {
                if otlp > stdout {
                    otlp
                } else {
                    stdout
                }
            }
            (Some(level), None) | (None, Some(level)) => level,
            (None, None) => TelemetryLevel::TRACE,
        }
    }

    /// Builds the logger provider together with the level to filter logs with.
    ///
    /// The OTLP exporter takes precedence (batch exporter, resource built from
    /// `resource_attr`); otherwise the stdout exporter is used (simple
    /// exporter); with neither, an exporter-less provider is returned along
    /// with `TelemetryLevel::OFF`.
    ///
    /// # Errors
    /// Fails when the OTLP exporter cannot be built.
    fn build_logger_provider(
        &self,
        resource_attr: Vec<KeyValue>,
    ) -> Result<(SdkLoggerProvider, TelemetryLevel), ExporterBuildError> {
        let builder = SdkLoggerProvider::builder();

        if let Some(otlp) = self.otlp.as_ref() {
            let exporter = match otlp.get_protocol() {
                Protocol::Grpc => opentelemetry_otlp::LogExporter::builder()
                    .with_tonic()
                    .with_export_config(otlp.clone().into())
                    .build(),
                _ => opentelemetry_otlp::LogExporter::builder()
                    .with_http()
                    .with_headers(otlp.get_header())
                    .with_export_config(otlp.clone().into())
                    .build(),
            }?;

            let provider = builder
                .with_resource(otlp.get_resource(resource_attr))
                .with_batch_exporter(exporter)
                .build();
            return Ok((provider, otlp.level.unwrap_or_default()));
        }

        if let Some(stdout) = self.stdout.as_ref() {
            let provider = builder
                .with_simple_exporter(opentelemetry_stdout::LogExporter::default())
                .build();
            return Ok((provider, stdout.level.unwrap_or_default()));
        }

        Ok((builder.build(), TelemetryLevel::OFF))
    }

    /// Builds the tracer provider.
    ///
    /// Only the OTLP exporter is attached here (batch exporter, resource built
    /// from `resource_attr`); without it the provider has no exporter.
    ///
    /// # Errors
    /// Fails when the OTLP exporter cannot be built.
    fn build_tracer_provider(
        &self,
        resource_attr: Vec<KeyValue>,
    ) -> Result<SdkTracerProvider, ExporterBuildError> {
        let mut builder = SdkTracerProvider::builder();

        if let Some(otlp) = self.otlp.as_ref() {
            let exporter = match otlp.get_protocol() {
                Protocol::Grpc => opentelemetry_otlp::SpanExporter::builder()
                    .with_tonic()
                    .with_export_config(otlp.clone().into())
                    .build(),
                _ => opentelemetry_otlp::SpanExporter::builder()
                    .with_http()
                    .with_headers(otlp.get_header())
                    .with_export_config(otlp.clone().into())
                    .build(),
            }?;

            let resource = otlp.get_resource(resource_attr);
            builder = builder
                .with_resource(resource)
                .with_batch_exporter(exporter);
        }

        Ok(builder.build())
    }

    /// Builds a tracer called `name` from a freshly built tracer provider.
    ///
    /// # Errors
    /// Fails when the tracer provider cannot be built.
    fn build_tracer(
        &self,
        name: &str,
        resource_attr: Vec<KeyValue>,
    ) -> Result<Tracer, ExporterBuildError> {
        let provider = self.build_tracer_provider(resource_attr)?;
        Ok(provider.tracer(name.to_string()))
    }
}
361
impl Default for TelemetryData {
    /// Default signal configuration: no OTLP exporter, stdout exporter with its
    /// default settings.
    fn default() -> Self {
        Self {
            stdout: Some(StdoutExporterCfg::default()),
            otlp: None,
        }
    }
}
370
/// Observability configuration: global level, extra attributes and the
/// per-signal (metrics, logs, traces) exporter settings.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Observability {
    /// Additional attributes added to the scope attributes. The `service.name`
    /// key, when present, is also used as the service name.
    #[serde(default)]
    attributes: HashMap<String, String>,
    /// Global telemetry level, used as the level filter of the tracing subscriber.
    #[serde(default)]
    level: TelemetryLevel,
    /// Metrics configuration; a default meter provider is used when unset.
    metrics: Option<TelemetryMetrics>,
    /// Logs configuration.
    logs: Option<TelemetryData>,
    /// Traces configuration.
    traces: Option<TelemetryData>,
}
404
405impl Observability {
406 pub(crate) fn common_scope_attributes(service_name: String, capacity: usize) -> Vec<KeyValue> {
407 let mut scope_attributes = Vec::with_capacity(capacity + 3);
408 scope_attributes.push(KeyValue::new("service.name", service_name));
409
410 match std::env::consts::ARCH {
411 "x86_64" => scope_attributes.push(KeyValue::new("host.arch", "amd64")),
412 "aarch64" => scope_attributes.push(KeyValue::new("host.arch", "arm64")),
413 "arm" => scope_attributes.push(KeyValue::new("host.arch", "arm32")),
414 _ => {}
415 }
416
417 match std::env::consts::OS {
418 "linux" => scope_attributes.push(KeyValue::new("os.type", "linux")),
419 "macos" => scope_attributes.push(KeyValue::new("os.type", "darwin")),
420 "freebsd" => scope_attributes.push(KeyValue::new("os.type", "freebsd")),
421 "openbsd" => scope_attributes.push(KeyValue::new("os.type", "openbsd")),
422 "netbsd" => scope_attributes.push(KeyValue::new("os.type", "netbsd")),
423 "windows" => scope_attributes.push(KeyValue::new("os.type", "windows")),
424 _ => {}
425 }
426
427 scope_attributes
428 }
429
430 pub fn new(level: TelemetryLevel) -> Observability {
432 Observability {
433 attributes: HashMap::new(),
434 level,
435 metrics: Some(TelemetryMetrics::default()),
436 logs: Some(TelemetryData::default()),
437 traces: Some(TelemetryData::default()),
438 }
439 }
440
441 pub fn get_service_name(&self) -> &str {
443 self.attributes
444 .get("service.name")
445 .map(|s| s.as_ref())
446 .unwrap_or("prosa")
447 }
448
449 pub fn set_prosa_name(&mut self, name: &str) {
451 self.attributes
452 .entry("service.name".to_string())
453 .or_insert_with(|| name.to_string());
454 }
455
456 pub fn get_scope_attributes(&self) -> Vec<KeyValue> {
458 let mut scope_attr = Self::common_scope_attributes(
460 self.get_service_name().to_string(),
461 self.attributes.len() + 3,
462 );
463
464 if !self.attributes.contains_key("host.name")
465 && let Some(hostname) = super::hostname()
466 {
467 scope_attr.push(KeyValue::new("host.name", hostname));
468 }
469
470 if !self.attributes.contains_key("service.instance.id") {
471 scope_attr.push(KeyValue::new("service.instance.id", super::hostid()));
472 }
473
474 if !self.attributes.contains_key("service.version") {
475 scope_attr.push(KeyValue::new("service.version", env!("CARGO_PKG_VERSION")));
476 }
477
478 scope_attr.append(
480 self.attributes
481 .iter()
482 .map(|(k, v)| {
483 KeyValue::new(k.clone(), opentelemetry::Value::String(v.clone().into()))
484 })
485 .collect::<Vec<KeyValue>>()
486 .as_mut(),
487 );
488
489 scope_attr
490 }
491
492 pub fn get_logger_level(&self) -> TelemetryLevel {
494 if let Some(logs) = &self.logs {
495 let logs_level = logs.get_max_level();
496 if logs_level > self.level {
497 logs_level
498 } else {
499 self.level
500 }
501 } else {
502 self.level
503 }
504 }
505
506 #[cfg(feature = "config-observability-prometheus")]
508 pub fn build_meter_provider(&self, registry: &prometheus::Registry) -> SdkMeterProvider {
509 if let Some(settings) = &self.metrics {
510 settings
511 .build_provider(self.get_scope_attributes(), registry)
512 .unwrap_or_default()
513 } else {
514 SdkMeterProvider::default()
515 }
516 }
517
518 #[cfg(not(feature = "config-observability-prometheus"))]
520 pub fn build_meter_provider(&self) -> SdkMeterProvider {
521 if let Some(settings) = &self.metrics {
522 settings.build_provider().unwrap_or_default()
523 } else {
524 SdkMeterProvider::default()
525 }
526 }
527
528 pub fn build_logger_provider(&self) -> (SdkLoggerProvider, TelemetryLevel) {
530 if let Some(settings) = &self.logs {
531 match settings.build_logger_provider(self.get_scope_attributes()) {
532 Ok(m) => m,
533 Err(_) => (
534 SdkLoggerProvider::builder().build(),
535 TelemetryLevel::default(),
536 ),
537 }
538 } else {
539 (
540 SdkLoggerProvider::builder().build(),
541 TelemetryLevel::default(),
542 )
543 }
544 }
545
546 pub fn build_tracer_provider(&self) -> SdkTracerProvider {
558 if let Some(settings) = &self.traces {
559 settings
560 .build_tracer_provider(self.get_scope_attributes())
561 .unwrap_or_default()
562 } else {
563 SdkTracerProvider::default()
564 }
565 }
566
567 pub fn build_tracer(&self) -> Tracer {
578 if let Some(settings) = &self.traces {
579 match settings.build_tracer(self.get_service_name(), self.get_scope_attributes()) {
580 Ok(m) => m,
581 Err(_) => SdkTracerProvider::default().tracer(self.get_service_name().to_string()),
582 }
583 } else {
584 SdkTracerProvider::default().tracer(self.get_service_name().to_string())
585 }
586 }
587
588 pub fn tracing_init(&self, filter: &TelemetryFilter) -> Result<(), TryInitError> {
590 let global_level: filter::LevelFilter = self.level.into();
591 let subscriber = tracing_subscriber::registry().with(global_level);
592
593 if let Some(traces) = &self.traces {
594 if let Some(otlp) = &traces.otlp {
595 let tracer = self.build_tracer();
596 let subscriber_filter = filter.clone_with_level(otlp.level.unwrap_or_default());
597 let subscriber = subscriber.with(
598 tracing_opentelemetry::layer()
599 .with_tracer(tracer)
600 .with_filter(subscriber_filter),
601 );
602
603 if let Some(stdout) = traces.stdout {
604 let subscriber_filter =
605 filter.clone_with_level(stdout.level.unwrap_or_default());
606 subscriber
607 .with(tracing_subscriber::fmt::Layer::new().with_filter(subscriber_filter))
608 .try_init()
609 } else {
610 subscriber.try_init()
611 }
612 } else if let Some(stdout) = traces.stdout {
613 let subscriber_filter = filter.clone_with_level(stdout.level.unwrap_or_default());
614 subscriber
615 .with(tracing_subscriber::fmt::Layer::new().with_filter(subscriber_filter))
616 .try_init()
617 } else {
618 subscriber.try_init()
619 }
620 } else if let Some(logs) = &self.logs
621 && let Ok((logger_provider, level)) =
622 logs.build_logger_provider(self.get_scope_attributes())
623 && level > TelemetryLevel::OFF
624 {
625 let logger_filter = filter.clone_with_level(level);
626 subscriber
627 .with(
628 opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
629 &logger_provider,
630 )
631 .with_filter(logger_filter),
632 )
633 .try_init()
634 } else {
635 subscriber.try_init()
636 }
637 }
638}
639
640impl Default for Observability {
641 fn default() -> Self {
642 Self {
643 attributes: HashMap::new(),
644 level: TelemetryLevel::default(),
645 metrics: Some(TelemetryMetrics::default()),
646 logs: Some(TelemetryData {
647 otlp: None,
648 stdout: Some(StdoutExporterCfg {
649 level: Some(TelemetryLevel::DEBUG),
650 }),
651 }),
652 traces: Some(TelemetryData {
653 otlp: None,
654 stdout: Some(StdoutExporterCfg {
655 level: Some(TelemetryLevel::DEBUG),
656 }),
657 }),
658 }
659 }
660}