avantis_utils/
telemetry.rs1use gethostname::gethostname;
2use opentelemetry::global::set_text_map_propagator;
3use opentelemetry::sdk::propagation::TraceContextPropagator;
4use opentelemetry::sdk::trace;
5use opentelemetry::sdk::Resource;
6use opentelemetry::KeyValue;
7use opentelemetry_otlp::WithExportConfig;
8use serde::Deserialize;
9use std::collections::HashSet;
10use thiserror::Error;
11use tracing::info;
12use tracing::subscriber::set_global_default;
13use tracing::Subscriber;
14use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
15use tracing_log::LogTracer;
16use tracing_subscriber::filter::FilterFn;
17use tracing_subscriber::layer::SubscriberExt;
18use tracing_subscriber::Layer;
19use tracing_subscriber::{EnvFilter, Registry};
20
21#[derive(Deserialize, Clone, PartialEq, Eq, Debug)]
22pub struct TelemetrySetting {
23 pub otel_collector_endpoint: String,
24 pub disabled_targets: HashSet<String>,
25 pub log_level: String,
26}
27
28impl TelemetrySetting {
29 fn log_level_filter<S>(&self) -> impl Layer<S>
30 where
31 S: Subscriber,
32 {
33 EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(self.log_level.clone()))
34 }
35
36 fn bunyan_formatter<S>(&self, service_name: &'static str) -> impl Layer<S>
37 where
38 S: Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
39 {
40 BunyanFormattingLayer::new(service_name.to_string(), std::io::stdout)
41 }
42
43 fn disable_targets_filter<S>(&self) -> impl Layer<S>
44 where
45 S: Subscriber,
46 {
47 let disabled_targets = self.disabled_targets.clone();
48 FilterFn::new(move |metadata| !disabled_targets.contains(metadata.target()))
49 }
50
51 fn tracer<S>(&self, service_name: &'static str) -> impl Layer<S>
52 where
53 S: Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>,
54 {
55 let tracer = opentelemetry_otlp::new_pipeline()
56 .tracing()
57 .with_trace_config(trace::config().with_resource(Resource::new(vec![
58 KeyValue::new("service.name", service_name),
59 KeyValue::new("host.name", gethostname().into_string().unwrap()),
60 ])))
61 .with_exporter(
62 opentelemetry_otlp::new_exporter()
63 .tonic()
64 .with_endpoint(self.otel_collector_endpoint.clone()),
65 )
66 .install_batch(opentelemetry::runtime::Tokio)
67 .unwrap();
68
69 tracing_opentelemetry::layer().with_tracer(tracer)
70 }
71
72 fn subscriber(&self, service_name: &'static str) -> impl Subscriber {
73 Registry::default()
74 .with(self.log_level_filter())
75 .with(self.disable_targets_filter())
76 .with(JsonStorageLayer)
77 .with(self.bunyan_formatter(service_name))
78 .with(self.tracer(service_name))
79 }
80
81 pub fn init_telemetry(&self, service_name: &'static str) -> Result<(), Error> {
82 LogTracer::init().map_err(|_| Error::TelemetryAlreadyInit)?;
83 set_text_map_propagator(TraceContextPropagator::new());
84 set_global_default(self.subscriber(service_name))
85 .map_err(|_| Error::TelemetryAlreadyInit)?;
86
87 info!(
88 "initializing telemetry with log level [{}]: Done",
89 self.log_level
90 );
91
92 Ok(())
93 }
94}
95
96#[derive(Error, Debug)]
97pub enum Error {
98 #[error("telemetry already initialized")]
99 TelemetryAlreadyInit,
100}