avantis_utils/
telemetry.rs

1use gethostname::gethostname;
2use opentelemetry::global::set_text_map_propagator;
3use opentelemetry::sdk::propagation::TraceContextPropagator;
4use opentelemetry::sdk::trace;
5use opentelemetry::sdk::Resource;
6use opentelemetry::KeyValue;
7use opentelemetry_otlp::WithExportConfig;
8use serde::Deserialize;
9use std::collections::HashSet;
10use thiserror::Error;
11use tracing::info;
12use tracing::subscriber::set_global_default;
13use tracing::Subscriber;
14use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
15use tracing_log::LogTracer;
16use tracing_subscriber::filter::FilterFn;
17use tracing_subscriber::layer::SubscriberExt;
18use tracing_subscriber::Layer;
19use tracing_subscriber::{EnvFilter, Registry};
20
21#[derive(Deserialize, Clone, PartialEq, Eq, Debug)]
22pub struct TelemetrySetting {
23    pub otel_collector_endpoint: String,
24    pub disabled_targets: HashSet<String>,
25    pub log_level: String,
26}
27
28impl TelemetrySetting {
29    fn log_level_filter<S>(&self) -> impl Layer<S>
30    where
31        S: Subscriber,
32    {
33        EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(self.log_level.clone()))
34    }
35
36    fn bunyan_formatter<S>(&self, service_name: &'static str) -> impl Layer<S>
37    where
38        S: Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
39    {
40        BunyanFormattingLayer::new(service_name.to_string(), std::io::stdout)
41    }
42
43    fn disable_targets_filter<S>(&self) -> impl Layer<S>
44    where
45        S: Subscriber,
46    {
47        let disabled_targets = self.disabled_targets.clone();
48        FilterFn::new(move |metadata| !disabled_targets.contains(metadata.target()))
49    }
50
51    fn tracer<S>(&self, service_name: &'static str) -> impl Layer<S>
52    where
53        S: Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
54    {
55        let tracer = opentelemetry_otlp::new_pipeline()
56            .tracing()
57            .with_trace_config(trace::config().with_resource(Resource::new(vec![
58                KeyValue::new("service.name", service_name),
59                KeyValue::new("host.name", gethostname().into_string().unwrap()),
60            ])))
61            .with_exporter(
62                opentelemetry_otlp::new_exporter()
63                    .tonic()
64                    .with_endpoint(self.otel_collector_endpoint.clone()),
65            )
66            .install_batch(opentelemetry::runtime::Tokio)
67            .unwrap();
68
69        tracing_opentelemetry::layer().with_tracer(tracer)
70    }
71
72    fn subscriber(&self, service_name: &'static str) -> impl Subscriber {
73        Registry::default()
74            .with(self.log_level_filter())
75            .with(self.disable_targets_filter())
76            .with(JsonStorageLayer)
77            .with(self.bunyan_formatter(service_name))
78            .with(self.tracer(service_name))
79    }
80
81    pub fn init_telemetry(&self, service_name: &'static str) -> Result<(), Error> {
82        LogTracer::init().map_err(|_| Error::TelemetryAlreadyInit)?;
83        set_text_map_propagator(TraceContextPropagator::new());
84        set_global_default(self.subscriber(service_name))
85            .map_err(|_| Error::TelemetryAlreadyInit)?;
86
87        info!(
88            "initializing telemetry with log level [{}]: Done",
89            self.log_level
90        );
91
92        Ok(())
93    }
94}
95
96#[derive(Error, Debug)]
97pub enum Error {
98    #[error("telemetry already initialized")]
99    TelemetryAlreadyInit,
100}