1use opentelemetry::{KeyValue, trace::TracerProvider as _};
4use opentelemetry_otlp::{
5 ExportConfig, ExporterBuildError, Protocol, WithExportConfig, WithHttpConfig,
6};
7use opentelemetry_sdk::{
8 logs::SdkLoggerProvider,
9 metrics::SdkMeterProvider,
10 trace::{SdkTracerProvider, Tracer},
11};
12use serde::{Deserialize, Serialize};
13use std::{collections::HashMap, time::Duration};
14use tracing_subscriber::{filter, prelude::*};
15use tracing_subscriber::{layer::SubscriberExt, util::TryInitError};
16use url::Url;
17
18use crate::config::url_authentication;
19
20use super::tracing::{TelemetryFilter, TelemetryLevel};
21
/// Configuration of an OTLP exporter.
///
/// The scheme of `endpoint` selects the export protocol (see `get_protocol`) and the URL is
/// handed to `url_authentication` to build an optional `Authorization` header
/// (see `get_header`).
#[derive(Debug, Deserialize, Serialize, Clone)]
pub(crate) struct OTLPExporterCfg {
    /// Optional telemetry level for this exporter. Consumers in this file fall back to
    /// `TelemetryLevel::default()` when it is `None`.
    pub(crate) level: Option<TelemetryLevel>,
    /// Endpoint URL given to the exporter (its username and password are stripped when
    /// converting into an `ExportConfig`).
    endpoint: Url,
    /// Export timeout in seconds, converted with `Duration::from_secs`. Never serialized.
    #[serde(skip_serializing)]
    timeout_sec: Option<u64>,
}
30
31impl OTLPExporterCfg {
32 pub(crate) fn get_protocol(&self) -> Protocol {
33 match self.endpoint.scheme().to_lowercase().as_str() {
34 "grpc" => Protocol::Grpc,
35 "http/json" => Protocol::HttpJson,
36 _ => Protocol::HttpBinary,
37 }
38 }
39
40 pub(crate) fn get_header(&self) -> HashMap<String, String> {
41 let mut headers = HashMap::with_capacity(1);
42 if let Some(authorization) = url_authentication(&self.endpoint) {
43 headers.insert("Authorization".to_string(), authorization);
44 }
45 headers
46 }
47
48 pub(crate) fn get_resource(
49 &self,
50 attr: Vec<KeyValue>,
51 ) -> opentelemetry_sdk::resource::Resource {
52 opentelemetry_sdk::resource::Resource::builder()
53 .with_attributes(attr)
54 .with_attribute(opentelemetry::KeyValue::new(
55 "process.creation.time",
56 chrono::Utc::now().to_rfc3339(),
57 ))
58 .with_attribute(opentelemetry::KeyValue::new(
59 "process.pid",
60 opentelemetry::Value::I64(std::process::id() as i64),
61 ))
62 .build()
63 }
64}
65
66impl From<OTLPExporterCfg> for ExportConfig {
67 fn from(value: OTLPExporterCfg) -> Self {
68 let protocol = value.get_protocol();
69 let mut endpoint = value.endpoint;
70 if !endpoint.username().is_empty() {
71 let _ = endpoint.set_username("");
72 }
73 if endpoint.password().is_some() {
74 let _ = endpoint.set_password(None);
75 }
76
77 ExportConfig {
78 endpoint: Some(endpoint.to_string()),
79 timeout: value.timeout_sec.map(Duration::from_secs),
80 protocol,
81 }
82 }
83}
84
85impl Default for OTLPExporterCfg {
86 fn default() -> Self {
87 Self {
88 level: None,
89 endpoint: Url::parse("grpc://localhost:4317").unwrap(),
90 timeout_sec: None,
91 }
92 }
93}
94
/// Configuration of the Prometheus metrics exporter.
#[cfg(feature = "config-observability-prometheus")]
#[derive(Default, Debug, Deserialize, Serialize, Clone)]
pub struct PrometheusExporterCfg {
    /// Address the metrics HTTP server binds to. No server is started when it is `None`.
    endpoint: Option<String>,
}
101
#[cfg(feature = "config-observability-prometheus")]
impl PrometheusExporterCfg {
    /// Encode the metrics currently held by `registry` into an HTTP response.
    ///
    /// The body uses the Prometheus text exposition format and the `Content-Type` header is
    /// set accordingly, so scrapers that require an explicit content type accept it.
    ///
    /// # Errors
    ///
    /// Returns a static message when the metrics cannot be serialized.
    fn metrics_response(
        registry: &prometheus::Registry,
    ) -> Result<hyper::Response<http_body_util::Full<bytes::Bytes>>, &'static str> {
        let metric_families = registry.gather();
        let metric_data = prometheus::TextEncoder::new()
            .encode_to_string(&metric_families)
            .map_err(|_| "Can't serialize metrics")?;

        let mut response =
            hyper::Response::new(http_body_util::Full::new(bytes::Bytes::from(metric_data)));
        response.headers_mut().insert(
            hyper::header::CONTENT_TYPE,
            hyper::header::HeaderValue::from_static(prometheus::TEXT_FORMAT),
        );
        Ok(response)
    }

    /// Start the HTTP server exposing the metrics of `registry`, if an endpoint is configured.
    ///
    /// A background task binds a TCP listener on the configured endpoint and serves every
    /// accepted connection over HTTP/1 from its own task. Every request, whatever its path,
    /// is answered with the current metrics.
    ///
    /// NOTE(review): this relies on `tokio::task::spawn`, so it presumably has to be called
    /// from within a Tokio runtime — confirm against the callers.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`: bind failures happen inside the spawned task and
    /// are only logged.
    pub(crate) fn init_prometheus_server(
        &self,
        registry: &prometheus::Registry,
    ) -> Result<(), ExporterBuildError> {
        if let Some(endpoint) = self.endpoint.clone() {
            let registry = registry.clone();
            tokio::task::spawn(async move {
                match tokio::net::TcpListener::bind(endpoint).await {
                    Ok(listener) => loop {
                        // Accept errors are ignored and the loop simply tries again.
                        if let Ok((stream, _)) = listener.accept().await {
                            let io = hyper_util::rt::TokioIo::new(stream);
                            let registry = registry.clone();
                            tokio::task::spawn(async move {
                                let service = hyper::service::service_fn(|_req| {
                                    let registry = registry.clone();
                                    async move { Self::metrics_response(&registry) }
                                });
                                if let Err(err) = hyper::server::conn::http1::Builder::new()
                                    .serve_connection(io, service)
                                    .await
                                {
                                    log::debug!(target: "prosa::observability::prometheus_server", "Error serving prometheus connection: {err:?}");
                                }
                            });
                        }
                    },
                    Err(e) => {
                        log::error!(target: "prosa::observability::prometheus_server", "Failed to bind Prometheus metrics server: {e}");
                    }
                }
            });
        }

        Ok(())
    }

    /// Build the OpenTelemetry resource for the Prometheus exporter from `attr` only.
    pub(crate) fn get_resource(
        &self,
        attr: Vec<KeyValue>,
    ) -> opentelemetry_sdk::resource::Resource {
        opentelemetry_sdk::resource::Resource::builder()
            .with_attributes(attr)
            .build()
    }
}
166
/// Configuration of the standard output exporter.
#[derive(Default, Debug, Deserialize, Serialize, Copy, Clone)]
pub(crate) struct StdoutExporterCfg {
    /// Optional telemetry level for standard output. Consumers in this file fall back to
    /// `TelemetryLevel::default()` when it is `None`.
    #[serde(default)]
    pub(crate) level: Option<TelemetryLevel>,
}
173
/// Exporters available for metrics. Every configured exporter is registered on the meter
/// provider.
#[derive(Default, Debug, Deserialize, Serialize, Clone)]
pub struct TelemetryMetrics {
    /// Export metrics through OTLP.
    otlp: Option<OTLPExporterCfg>,
    /// Expose metrics through a Prometheus registry and, optionally, an HTTP server.
    #[cfg(feature = "config-observability-prometheus")]
    prometheus: Option<PrometheusExporterCfg>,
    /// Export metrics to standard output. Only the presence of this entry is checked when
    /// building the provider.
    stdout: Option<StdoutExporterCfg>,
}
182
183impl TelemetryMetrics {
184 fn build_provider(
186 &self,
187 #[cfg(feature = "config-observability-prometheus")] resource_attr: Vec<KeyValue>,
188 #[cfg(feature = "config-observability-prometheus")] registry: &prometheus::Registry,
189 ) -> Result<SdkMeterProvider, ExporterBuildError> {
190 let mut meter_provider = SdkMeterProvider::builder();
191 if let Some(s) = &self.otlp {
192 let exporter = if s.get_protocol() == Protocol::Grpc {
193 opentelemetry_otlp::MetricExporter::builder()
194 .with_tonic()
195 .with_export_config(s.clone().into())
196 .build()
197 } else {
198 opentelemetry_otlp::MetricExporter::builder()
199 .with_http()
200 .with_headers(s.get_header())
201 .with_export_config(s.clone().into())
202 .build()
203 }?;
204 meter_provider = meter_provider.with_periodic_exporter(exporter);
205 }
206
207 #[cfg(feature = "config-observability-prometheus")]
208 if let Some(prom) = &self.prometheus {
209 let exporter = opentelemetry_prometheus::exporter()
211 .with_registry(registry.clone())
212 .with_resource_selector(opentelemetry_prometheus::ResourceSelector::All)
213 .without_target_info()
214 .build()
215 .map_err(|e| ExporterBuildError::InternalFailure(e.to_string()))?;
216 meter_provider = meter_provider
217 .with_resource(prom.get_resource(resource_attr))
218 .with_reader(exporter);
219
220 prom.init_prometheus_server(registry)?;
222 }
223
224 if self.stdout.is_some() {
225 let exporter = opentelemetry_stdout::MetricExporter::default();
226 meter_provider = meter_provider.with_periodic_exporter(exporter);
227 }
228
229 Ok(meter_provider.build())
230 }
231}
232
/// Exporters available for a log or trace signal.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct TelemetryData {
    /// Export the signal through OTLP.
    otlp: Option<OTLPExporterCfg>,
    /// Export the signal to standard output.
    stdout: Option<StdoutExporterCfg>,
}
239
240impl TelemetryData {
241 fn get_max_level(&self) -> TelemetryLevel {
243 if let Some(otlp_level) = self.otlp.as_ref().and_then(|o| o.level) {
244 if let Some(stdout_level) = self.stdout.as_ref().and_then(|l| l.level) {
245 if otlp_level > stdout_level {
246 otlp_level
247 } else {
248 stdout_level
249 }
250 } else {
251 otlp_level
252 }
253 } else if let Some(stdout_level) = self.stdout.as_ref().and_then(|l| l.level) {
254 stdout_level
255 } else {
256 TelemetryLevel::TRACE
257 }
258 }
259
260 fn build_logger_provider(
262 &self,
263 resource_attr: Vec<KeyValue>,
264 ) -> Result<(SdkLoggerProvider, TelemetryLevel), ExporterBuildError> {
265 let logs_provider = SdkLoggerProvider::builder();
266 if let Some(s) = &self.otlp {
267 let exporter = if s.get_protocol() == Protocol::Grpc {
268 opentelemetry_otlp::LogExporter::builder()
269 .with_tonic()
270 .with_export_config(s.clone().into())
271 .build()
272 } else {
273 opentelemetry_otlp::LogExporter::builder()
274 .with_http()
275 .with_headers(s.get_header())
276 .with_export_config(s.clone().into())
277 .build()
278 }?;
279 Ok((
280 logs_provider
281 .with_resource(s.get_resource(resource_attr))
282 .with_batch_exporter(exporter)
283 .build(),
284 s.level.unwrap_or_default(),
285 ))
286 } else if let Some(stdout) = &self.stdout {
287 Ok((
288 logs_provider
289 .with_simple_exporter(opentelemetry_stdout::LogExporter::default())
290 .build(),
291 stdout.level.unwrap_or_default(),
292 ))
293 } else {
294 Ok((logs_provider.build(), TelemetryLevel::OFF))
295 }
296 }
297
298 fn build_tracer_provider(
300 &self,
301 resource_attr: Vec<KeyValue>,
302 ) -> Result<SdkTracerProvider, ExporterBuildError> {
303 let mut trace_provider = SdkTracerProvider::builder();
304 if let Some(s) = &self.otlp {
305 let exporter = if s.get_protocol() == Protocol::Grpc {
306 opentelemetry_otlp::SpanExporter::builder()
307 .with_tonic()
308 .with_export_config(s.clone().into())
309 .build()
310 } else {
311 opentelemetry_otlp::SpanExporter::builder()
312 .with_http()
313 .with_headers(s.get_header())
314 .with_export_config(s.clone().into())
315 .build()
316 }?;
317
318 trace_provider = trace_provider
319 .with_resource(s.get_resource(resource_attr))
320 .with_batch_exporter(exporter);
321 }
322
323 Ok(trace_provider.build())
324 }
325
326 fn build_tracer(
328 &self,
329 name: &str,
330 resource_attr: Vec<KeyValue>,
331 ) -> Result<Tracer, ExporterBuildError> {
332 self.build_tracer_provider(resource_attr)
333 .map(|p| p.tracer(name.to_string()))
334 }
335}
336
337impl Default for TelemetryData {
338 fn default() -> Self {
339 TelemetryData {
340 otlp: None,
341 stdout: Some(StdoutExporterCfg::default()),
342 }
343 }
344}
345
/// Observability settings: service identity, global level and the exporters used for
/// metrics, logs and traces.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct Observability {
    /// Service name used for the `service.name` attribute and as tracer name. Falls back to
    /// `"prosa"` when unset.
    service_name: Option<String>,
    /// Additional attributes added to the scope attributes. A `service.name` entry takes
    /// precedence over `service_name`.
    #[serde(default)]
    attributes: HashMap<String, String>,
    /// Global telemetry level.
    #[serde(default)]
    level: TelemetryLevel,
    /// Metrics exporters. A default meter provider is used when `None`.
    metrics: Option<TelemetryMetrics>,
    /// Logs exporters.
    logs: Option<TelemetryData>,
    /// Traces exporters.
    traces: Option<TelemetryData>,
}
381
382impl Observability {
383 pub(crate) fn common_scope_attributes(service_name: String, capacity: usize) -> Vec<KeyValue> {
384 let mut scope_attributes = Vec::with_capacity(capacity + 3);
385 scope_attributes.push(KeyValue::new("service.name", service_name));
386
387 match std::env::consts::ARCH {
388 "x86_64" => scope_attributes.push(KeyValue::new("host.arch", "amd64")),
389 "aarch64" => scope_attributes.push(KeyValue::new("host.arch", "arm64")),
390 "arm" => scope_attributes.push(KeyValue::new("host.arch", "arm32")),
391 _ => {}
392 }
393
394 match std::env::consts::OS {
395 "linux" => scope_attributes.push(KeyValue::new("os.type", "linux")),
396 "macos" => scope_attributes.push(KeyValue::new("os.type", "darwin")),
397 "freebsd" => scope_attributes.push(KeyValue::new("os.type", "freebsd")),
398 "openbsd" => scope_attributes.push(KeyValue::new("os.type", "openbsd")),
399 "netbsd" => scope_attributes.push(KeyValue::new("os.type", "netbsd")),
400 "windows" => scope_attributes.push(KeyValue::new("os.type", "windows")),
401 _ => {}
402 }
403
404 scope_attributes
405 }
406
407 pub fn new(level: TelemetryLevel) -> Observability {
409 Observability {
410 service_name: None,
411 attributes: HashMap::new(),
412 level,
413 metrics: Some(TelemetryMetrics::default()),
414 logs: Some(TelemetryData::default()),
415 traces: Some(TelemetryData::default()),
416 }
417 }
418
419 pub fn set_prosa_name(&mut self, name: &str) {
421 if self.service_name.is_none() {
422 self.service_name = Some(name.to_string());
423 }
424 }
425
426 pub fn get_scope_attributes(&self) -> Vec<KeyValue> {
428 let mut scope_attr = if let Some(service_name) = self.attributes.get("service.name") {
430 Self::common_scope_attributes(service_name.clone(), self.attributes.len() + 2)
431 } else {
432 Self::common_scope_attributes(
433 self.service_name.clone().unwrap_or("prosa".to_string()),
434 self.attributes.len() + 2,
435 )
436 };
437
438 if !self.attributes.contains_key("host.name")
439 && let Some(hostname) = super::hostname()
440 {
441 scope_attr.push(KeyValue::new("host.name", hostname));
442 }
443
444 if !self.attributes.contains_key("service.version") {
445 scope_attr.push(KeyValue::new("service.version", env!("CARGO_PKG_VERSION")));
446 }
447
448 scope_attr.append(
450 self.attributes
451 .iter()
452 .map(|(k, v)| {
453 KeyValue::new(k.clone(), opentelemetry::Value::String(v.clone().into()))
454 })
455 .collect::<Vec<KeyValue>>()
456 .as_mut(),
457 );
458
459 scope_attr
460 }
461
462 pub fn get_logger_level(&self) -> TelemetryLevel {
464 if let Some(logs) = &self.logs {
465 let logs_level = logs.get_max_level();
466 if logs_level > self.level {
467 logs_level
468 } else {
469 self.level
470 }
471 } else {
472 self.level
473 }
474 }
475
476 #[cfg(feature = "config-observability-prometheus")]
478 pub fn build_meter_provider(&self, registry: &prometheus::Registry) -> SdkMeterProvider {
479 if let Some(settings) = &self.metrics {
480 settings
481 .build_provider(self.get_scope_attributes(), registry)
482 .unwrap_or_default()
483 } else {
484 SdkMeterProvider::default()
485 }
486 }
487
488 #[cfg(not(feature = "config-observability-prometheus"))]
490 pub fn build_meter_provider(&self) -> SdkMeterProvider {
491 if let Some(settings) = &self.metrics {
492 settings.build_provider().unwrap_or_default()
493 } else {
494 SdkMeterProvider::default()
495 }
496 }
497
498 pub fn build_logger_provider(&self) -> (SdkLoggerProvider, TelemetryLevel) {
500 if let Some(settings) = &self.logs {
501 match settings.build_logger_provider(self.get_scope_attributes()) {
502 Ok(m) => m,
503 Err(_) => (
504 SdkLoggerProvider::builder().build(),
505 TelemetryLevel::default(),
506 ),
507 }
508 } else {
509 (
510 SdkLoggerProvider::builder().build(),
511 TelemetryLevel::default(),
512 )
513 }
514 }
515
516 pub fn build_tracer_provider(&self) -> SdkTracerProvider {
528 if let Some(settings) = &self.traces {
529 settings
530 .build_tracer_provider(self.get_scope_attributes())
531 .unwrap_or_default()
532 } else {
533 SdkTracerProvider::default()
534 }
535 }
536
537 pub fn build_tracer(&self) -> Tracer {
548 if let Some(settings) = &self.traces {
549 match settings.build_tracer(
550 self.service_name.as_deref().unwrap_or("prosa"),
551 self.get_scope_attributes(),
552 ) {
553 Ok(m) => m,
554 Err(_) => SdkTracerProvider::default()
555 .tracer(self.service_name.clone().unwrap_or("prosa".to_string())),
556 }
557 } else {
558 SdkTracerProvider::default()
559 .tracer(self.service_name.clone().unwrap_or("prosa".to_string()))
560 }
561 }
562
563 pub fn tracing_init(&self, filter: &TelemetryFilter) -> Result<(), TryInitError> {
565 let global_level: filter::LevelFilter = self.level.into();
566 let subscriber = tracing_subscriber::registry().with(global_level);
567
568 if let Some(traces) = &self.traces {
569 if let Some(otlp) = &traces.otlp {
570 let tracer = self.build_tracer();
571 let subscriber_filter = filter.clone_with_level(otlp.level.unwrap_or_default());
572 let subscriber = subscriber.with(
573 tracing_opentelemetry::layer()
574 .with_tracer(tracer)
575 .with_filter(subscriber_filter),
576 );
577
578 if let Some(stdout) = traces.stdout {
579 let subscriber_filter =
580 filter.clone_with_level(stdout.level.unwrap_or_default());
581 subscriber
582 .with(tracing_subscriber::fmt::Layer::new().with_filter(subscriber_filter))
583 .try_init()
584 } else {
585 subscriber.try_init()
586 }
587 } else if let Some(stdout) = traces.stdout {
588 let subscriber_filter = filter.clone_with_level(stdout.level.unwrap_or_default());
589 subscriber
590 .with(tracing_subscriber::fmt::Layer::new().with_filter(subscriber_filter))
591 .try_init()
592 } else {
593 subscriber.try_init()
594 }
595 } else if let Some(logs) = &self.logs
596 && let Ok((logger_provider, level)) =
597 logs.build_logger_provider(self.get_scope_attributes())
598 && level > TelemetryLevel::OFF
599 {
600 let logger_filter = filter.clone_with_level(level);
601 subscriber
602 .with(
603 opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
604 &logger_provider,
605 )
606 .with_filter(logger_filter),
607 )
608 .try_init()
609 } else {
610 subscriber.try_init()
611 }
612 }
613}
614
615impl Default for Observability {
616 fn default() -> Self {
617 Self {
618 service_name: None,
619 attributes: HashMap::new(),
620 level: TelemetryLevel::default(),
621 metrics: Some(TelemetryMetrics::default()),
622 logs: Some(TelemetryData {
623 otlp: None,
624 stdout: Some(StdoutExporterCfg {
625 level: Some(TelemetryLevel::DEBUG),
626 }),
627 }),
628 traces: Some(TelemetryData {
629 otlp: None,
630 stdout: Some(StdoutExporterCfg {
631 level: Some(TelemetryLevel::DEBUG),
632 }),
633 }),
634 }
635 }
636}