use opentelemetry::{
global::{self, BoxedTracer},
propagation::TextMapCompositePropagator,
trace::{TraceContextExt, Tracer, TracerProvider},
InstrumentationScope, KeyValue,
};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::{
tonic_types::metadata::MetadataMap, LogExporter, MetricExporter, Protocol, SpanExporter,
};
use opentelemetry_otlp::{WithExportConfig, WithTonicConfig};
use opentelemetry_sdk::{
logs::SdkLoggerProvider,
metrics::SdkMeterProvider,
propagation::{BaggagePropagator, TraceContextPropagator},
trace::{Config, SdkTracerProvider},
};
use opentelemetry_sdk::{metrics::Temporality, Resource};
use parking_lot::RwLock;
use std::{
error::Error,
str::FromStr,
sync::{Arc, OnceLock},
time::Duration,
};
use tracing::{info, Subscriber};
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::{
filter::{LevelFilter, Targets},
reload,
};
use tracing_subscriber::{prelude::*, Registry};
lazy_static! {
    /// Process-wide reload handle for the `EnvFilter` layer.
    ///
    /// Starts out as `None`. `OpenTelemetryMgr::subscriber` stores the handle it gets from
    /// `reload::Layer::new`, `reload_trace_level` uses it to swap the filter, and
    /// `init` / `is_init` treat a stored handle as the "already initialised" marker.
    static ref TRACE_RELOAD_HANDLE: Arc<
        RwLock<Option<reload::Handle<EnvFilter, tracing_subscriber::Registry>>>,
    > = Arc::new(RwLock::new(None));
}
/// Holds the OpenTelemetry providers created for one service together with the
/// identity and endpoint they were configured with.
///
/// All three providers start as `None` (see [`OpenTelemetryMgr::new`]) and are filled in
/// by the matching `init_*` method.
pub struct OpenTelemetryMgr {
    /// Tracer provider; `None` until [`OpenTelemetryMgr::init_tracing`] has run.
    pub tracer: Option<SdkTracerProvider>,
    /// Meter provider; `None` until [`OpenTelemetryMgr::init_metrics`] has run.
    pub meter_provider: Option<SdkMeterProvider>,
    /// Logger provider; `None` until [`OpenTelemetryMgr::init_logs`] has run.
    pub logger_provider: Option<SdkLoggerProvider>,
    /// Service name placed on the resource attached to every provider.
    pub server_name: String,
    /// Value reported as the `service.version` resource attribute.
    pub version: String,
    /// Base URL of the collector; `/v1/logs`, `/v1/traces` and `/v1/metrics` are
    /// appended to it for the respective exporters.
    pub endpoint: String,
}
impl OpenTelemetryMgr {
    /// Creates a manager that remembers the service identity and collector endpoint.
    ///
    /// No exporter or provider is created here; call the `init_*` methods for that.
    pub fn new(server_name: &str, version: &str, endpoint: &str) -> Self {
        Self {
            tracer: None,
            meter_provider: None,
            logger_provider: None,
            server_name: server_name.to_owned(),
            version: version.to_owned(),
            endpoint: endpoint.to_owned(),
        }
    }

    /// Builds the URL for one signal (`logs`, `traces` or `metrics`).
    ///
    /// Trailing slashes on the configured endpoint are stripped first so that a base
    /// such as `http://host:4318/` does not turn into `http://host:4318//v1/...`.
    fn signal_url(&self, signal: &str) -> String {
        format!("{}/v1/{}", self.endpoint.trim_end_matches('/'), signal)
    }

    /// Creates the HTTP/binary log exporter and a batching logger provider, and stores
    /// the provider in `self.logger_provider`.
    ///
    /// # Panics
    ///
    /// Panics if the exporter cannot be built.
    pub fn init_logs(&mut self) {
        let exporter = LogExporter::builder()
            .with_http()
            .with_protocol(Protocol::HttpBinary)
            .with_endpoint(self.signal_url("logs"))
            .build()
            .expect("Failed to create log exporter");

        let provider = SdkLoggerProvider::builder()
            .with_batch_exporter(exporter)
            .with_resource(get_resource(&self.server_name, &self.version))
            .build();

        self.logger_provider = Some(provider);
    }

    /// Installs the global text-map propagator (baggage + trace context), creates the
    /// HTTP/binary span exporter with a 3 second timeout, registers the resulting
    /// tracer provider globally and stores it in `self.tracer`.
    ///
    /// # Panics
    ///
    /// Panics if the exporter cannot be built.
    pub fn init_tracing(&mut self) {
        global::set_text_map_propagator(TextMapCompositePropagator::new(vec![
            Box::new(BaggagePropagator::new()),
            Box::new(TraceContextPropagator::new()),
        ]));

        let exporter = SpanExporter::builder()
            .with_http()
            .with_protocol(Protocol::HttpBinary)
            .with_endpoint(self.signal_url("traces"))
            .with_timeout(Duration::from_secs(3))
            .build()
            .expect("Failed to create trace exporter");

        let provider = SdkTracerProvider::builder()
            .with_batch_exporter(exporter)
            .with_resource(get_resource(&self.server_name, &self.version))
            .build();

        global::set_tracer_provider(provider.clone());
        self.tracer = Some(provider);
    }

    /// Creates the HTTP/binary metric exporter (low-memory temporality, 3 second
    /// timeout), registers a periodic-exporting meter provider globally and stores it
    /// in `self.meter_provider`.
    ///
    /// # Panics
    ///
    /// Panics if the exporter cannot be built.
    pub fn init_metrics(&mut self) {
        let exporter = MetricExporter::builder()
            .with_http()
            .with_protocol(Protocol::HttpBinary)
            .with_endpoint(self.signal_url("metrics"))
            .with_temporality(Temporality::LowMemory)
            .with_timeout(Duration::from_secs(3))
            .build()
            .expect("Failed to create metric exporter");

        let provider = SdkMeterProvider::builder()
            .with_periodic_exporter(exporter)
            .with_resource(get_resource(&self.server_name, &self.version))
            .build();

        global::set_meter_provider(provider.clone());
        self.meter_provider = Some(provider);
    }

    /// Builds the `tracing` subscriber (reloadable `EnvFilter` + OpenTelemetry trace
    /// layer) and tries to install it as the global default.
    ///
    /// The filter is seeded from `RUST_LOG` (falling back to `info`) and always turns
    /// off the `hyper`, `tonic`, `h2` and `reqwest` targets. The reload handle is stored
    /// in `TRACE_RELOAD_HANDLE` whether or not installation succeeds; an installation
    /// failure is reported on stderr.
    ///
    /// # Panics
    ///
    /// Panics if `init_tracing` has not been called yet, because the OpenTelemetry layer
    /// needs the tracer provider.
    pub fn subscriber(&mut self) {
        // Fail early, before any global state is touched, if the call order is wrong.
        let tracer = self
            .tracer
            .as_ref()
            .expect("init_tracing must be called before subscriber")
            .tracer("pi_logger");

        let directives = std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string());
        let mut filter_otel = EnvFilter::new(directives);
        // NOTE(review): presumably these targets are silenced so the exporter's own HTTP
        // stack does not generate telemetry about itself — confirm.
        for directive in ["hyper=off", "tonic=off", "h2=off", "reqwest=off"] {
            filter_otel = filter_otel.add_directive(
                directive
                    .parse()
                    .expect("hard-coded filter directive must be valid"),
            );
        }

        let (filter, reload_handle) = reload::Layer::new(filter_otel);
        *TRACE_RELOAD_HANDLE.write() = Some(reload_handle);

        // NOTE(review): only the filter and the trace layer are attached here; the logger
        // provider created by `init_logs` is not wired into this subscriber — confirm
        // whether that is intended.
        let subscriber = Registry::default()
            .with(filter)
            .with(tracing_opentelemetry::layer().with_tracer(tracer));

        if let Err(err) = tracing::subscriber::set_global_default(subscriber) {
            eprintln!("failed to install global tracing subscriber: {}", err);
        }
    }
}
/// Returns the resource describing the service `server_name` at `version`.
///
/// The first resource built is cached together with the name and version it was built
/// for. Later calls with the same pair get a clone of the cached value; calls with a
/// different pair get a freshly built resource instead of the stale cached one.
fn get_resource(server_name: &str, version: &str) -> Resource {
    static CACHE: OnceLock<(String, String, Resource)> = OnceLock::new();

    let build = || {
        Resource::builder()
            .with_service_name(server_name.to_string())
            .with_attribute(KeyValue::new("service.version", version.to_string()))
            .build()
    };

    let (cached_name, cached_version, cached_resource) =
        CACHE.get_or_init(|| (server_name.to_string(), version.to_string(), build()));

    // Only reuse the cached resource when it was built for the same identity.
    if cached_name.as_str() == server_name && cached_version.as_str() == version {
        cached_resource.clone()
    } else {
        build()
    }
}
/// Replaces the active `EnvFilter` with one parsed from `log`.
///
/// If no reload handle has been stored yet (the subscriber was never set up), the
/// directives are still validated but nothing is changed and `Ok(())` is returned.
///
/// # Errors
///
/// Returns an error if `log` is not a valid filter directive string, or if the reload
/// itself fails.
pub fn reload_trace_level(log: &str) -> Result<(), Box<dyn std::error::Error>> {
    println!("reload_trace_level:{:?}", log);
    let filter = EnvFilter::try_new(log)?;
    // A read lock is enough: the stored handle is only borrowed, never replaced.
    if let Some(handle) = TRACE_RELOAD_HANDLE.read().as_ref() {
        handle.reload(filter)?;
    }
    Ok(())
}
/// One-shot setup of logs, tracing, metrics and the global subscriber.
///
/// Returns `None` without doing anything when a reload handle is already stored in
/// `TRACE_RELOAD_HANDLE`; otherwise builds a manager, runs every `init_*` step plus
/// `subscriber`, and returns it.
pub fn init(server_name: &str, version: &str, endpoint: &str) -> Option<OpenTelemetryMgr> {
    if TRACE_RELOAD_HANDLE.read().is_some() {
        return None;
    }

    let mut manager = OpenTelemetryMgr::new(server_name, version, endpoint);
    manager.init_logs();
    manager.init_tracing();
    manager.init_metrics();
    manager.subscriber();
    Some(manager)
}
/// Reports whether a reload handle has been stored in `TRACE_RELOAD_HANDLE`.
pub fn is_init() -> bool {
    let stored_handle = TRACE_RELOAD_HANDLE.read();
    stored_handle.is_some()
}
/// Builds a composite propagator made of a baggage propagator followed by a
/// trace-context propagator.
pub fn create_baggage() -> TextMapCompositePropagator {
    TextMapCompositePropagator::new(vec![
        Box::new(BaggagePropagator::new()),
        Box::new(TraceContextPropagator::new()),
    ])
}
#[cfg(test)]
mod tests {
    use super::*;
    use tracing::info;

    /// Smoke test: initialises the whole pipeline against `http://127.0.0.1:4318`, emits
    /// an event and a log line inside nested spans, then shuts every provider down.
    /// Nothing is asserted beyond `init` returning `Some`.
    #[test]
    fn test_trace() {
        std::env::set_var("RUST_LOG", "info");
        let mgr = init("test", "1.0.0", "http://127.0.0.1:4318").unwrap();

        {
            let main_span = tracing::info_span!("Main operation");
            let _main_guard = main_span.enter();
            tracing::event!(name: "Nice operation!", tracing::Level::INFO, some.key = 100);
            info!(target: "my-target", "hello from {}. My price is {}. I am also inside a Span!", "banana", 2.99);
            let sub_span = tracing::info_span!("Sub span event");
            let _sub_guard = sub_span.enter();
            // Both guards drop here: the sub span exits first, then the main span.
        }

        // The shutdown outcomes are deliberately not checked.
        let _ = mgr.logger_provider.clone().unwrap().shutdown();
        let _ = mgr.meter_provider.clone().unwrap().shutdown();
        let _ = mgr.tracer.clone().unwrap().shutdown();
    }
}