near_o11y/
opentelemetry.rs

1use crate::reload::TracingLayer;
2use near_crypto::PublicKey;
3use near_primitives_core::types::AccountId;
4use opentelemetry::KeyValue;
5use opentelemetry_sdk::Resource;
6use opentelemetry_sdk::trace::{self, RandomIdGenerator, Sampler};
7use opentelemetry_semantic_conventions::resource::SERVICE_NAME;
8use tracing::level_filters::LevelFilter;
9use tracing_subscriber::filter::targets::Targets;
10use tracing_subscriber::layer::SubscriberExt;
11use tracing_subscriber::registry::LookupSpan;
12use tracing_subscriber::{Layer, reload};
13
14// Doesn't define WARN and ERROR, because the highest verbosity of spans is INFO.
15#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
16pub enum OpenTelemetryLevel {
17    #[default]
18    OFF,
19    INFO,
20    DEBUG,
21    TRACE,
22}
23
24/// Constructs an OpenTelemetryConfig which sends span data to an external collector.
25//
26// NB: this function is `async` because `install_batch(Tokio)` requires a tokio context to
27// register timers and channels and whatnot.
28pub(crate) async fn add_opentelemetry_layer<S>(
29    opentelemetry_level: OpenTelemetryLevel,
30    chain_id: String,
31    node_public_key: PublicKey,
32    account_id: Option<AccountId>,
33    subscriber: S,
34) -> (TracingLayer<S>, reload::Handle<Targets, S>)
35where
36    S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
37{
38    let filter = get_opentelemetry_filter(opentelemetry_level);
39    let (filter, handle) = reload::Layer::<Targets, S>::new(filter);
40
41    let mut resource = vec![
42        KeyValue::new("chain_id", chain_id),
43        KeyValue::new("node_id", node_public_key.to_string()),
44    ];
45    // Prefer account name as the node name.
46    // Fallback to a node public key if a validator key is unavailable.
47    let service_name = if let Some(account_id) = account_id {
48        resource.push(KeyValue::new("account_id", account_id.to_string()));
49        format!("neard:{}", account_id)
50    } else {
51        format!("neard:{}", node_public_key)
52    };
53    resource.push(KeyValue::new(SERVICE_NAME, service_name));
54
55    let overriding_vars = ["OTEL_BSP_MAX_CONCURRENT_EXPORTS", "OTEL_BSP_MAX_QUEUE_SIZE"];
56    let batch_config = if overriding_vars.iter().any(|v| std::env::var_os(v).is_some()) {
57        opentelemetry_sdk::trace::BatchConfigBuilder::default()
58    } else {
59        opentelemetry_sdk::trace::BatchConfigBuilder::default()
60            .with_max_concurrent_exports(2)
61            .with_max_queue_size(4096)
62    }
63    .build();
64    let tracer = opentelemetry_otlp::new_pipeline()
65        .tracing()
66        .with_exporter(opentelemetry_otlp::new_exporter().tonic())
67        .with_trace_config(
68            trace::config()
69                .with_sampler(Sampler::AlwaysOn)
70                .with_id_generator(RandomIdGenerator::default())
71                .with_resource(Resource::new(resource)),
72        )
73        .with_batch_config(batch_config)
74        .install_batch(opentelemetry_sdk::runtime::Tokio)
75        .unwrap();
76    let layer = tracing_opentelemetry::layer().with_tracer(tracer).with_filter(filter);
77    (subscriber.with(layer), handle)
78}
79
80pub(crate) fn get_opentelemetry_filter(opentelemetry_level: OpenTelemetryLevel) -> Targets {
81    Targets::new().with_default(match opentelemetry_level {
82        OpenTelemetryLevel::OFF => LevelFilter::OFF,
83        OpenTelemetryLevel::INFO => LevelFilter::INFO,
84        OpenTelemetryLevel::DEBUG => LevelFilter::DEBUG,
85        OpenTelemetryLevel::TRACE => LevelFilter::TRACE,
86    })
87}