use anyhow::Result;
use std::str::FromStr;
use tracing_opentelemetry::OpenTelemetryLayer;
use opentelemetry::{global, trace::TracerProvider as _};
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider};
pub use opentelemetry_semantic_conventions as semcov;
use tonic::{metadata::MetadataKey, service::Interceptor};
use tracing::Span;
pub use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::{
fmt::{self, format::FmtSpan},
layer::SubscriberExt,
util::SubscriberInitExt,
EnvFilter, Layer,
};
use self::trace_output_fmt::JsonWithTraceId;
pub mod trace_output_fmt;
pub use opentelemetry::global::shutdown_tracer_provider;
pub fn set_up_logging() -> Result<()> {
let otlp_enabled = std::env::var("NO_OTLP")
.unwrap_or_else(|_| "0".to_owned())
.as_str()
== "0";
global::set_text_map_propagator(TraceContextPropagator::new());
let provider = TracerProvider::builder()
.with_simple_exporter(opentelemetry_stdout::SpanExporter::default())
.build();
let basic_no_otlp_tracer = provider.tracer(env!("CARGO_PKG_NAME"));
let otlp_tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(opentelemetry_sdk::trace::config())
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
.install_batch(opentelemetry_sdk::runtime::TokioCurrentThread)?;
let tracer = match otlp_enabled {
true => otlp_tracer,
false => basic_no_otlp_tracer,
};
let opentelemetry: OpenTelemetryLayer<_, _> = tracing_opentelemetry::layer()
.with_error_fields_to_exceptions(true)
.with_error_records_to_exceptions(true)
.with_tracer(tracer);
let fmt_layer = fmt::Layer::default().json().event_format(JsonWithTraceId);
let pretty_fmt_layer = fmt::Layer::default()
.pretty()
.with_span_events(FmtSpan::NONE);
let pretty_logs = std::env::var("PRETTY_LOGS")
.map(|e| &e == "1")
.unwrap_or_else(|_| !otlp_enabled);
let layers = match pretty_logs {
false => opentelemetry.and_then(fmt_layer).boxed(),
true => opentelemetry.and_then(pretty_fmt_layer).boxed(),
};
let tracing_registry = tracing_subscriber::registry()
.with(layers.with_filter(
EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::try_new("info").expect("hard-coded default directive should be valid")
}),
));
#[cfg(feature = "tokio-console")]
let tracing_registry = tracing_registry.with(console_subscriber::spawn());
tracing_registry.try_init()?;
Ok(())
}
#[derive(Clone)]
pub struct GrpcInterceptor;
impl Interceptor for GrpcInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
let context = Span::current().context();
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&context, &mut MetadataInjector(req.metadata_mut()));
});
Ok(req)
}
}
pub struct MetadataInjector<'a>(&'a mut tonic::metadata::MetadataMap);
impl<'a> opentelemetry::propagation::Injector for MetadataInjector<'a> {
fn set(&mut self, key: &str, value: String) {
if let Ok(key) = MetadataKey::from_str(key) {
if let Ok(val) = value.parse() {
self.0.insert(key, val);
}
}
}
}
pub type InterceptedGrpcService =
tonic::codegen::InterceptedService<tonic::transport::Channel, GrpcInterceptor>;
#[cfg(feature = "tower")]
pub use tower_tracing::*;
#[cfg(feature = "tower")]
pub mod tower_tracing {
use std::task::{Context, Poll};
use http::Request;
use opentelemetry::{
global,
propagation::{Extractor, Injector},
};
use tower::{Layer, Service};
use tracing::trace;
use tracing_opentelemetry::OpenTelemetrySpanExt;
pub struct TracingLayer;
impl<S> Layer<S> for TracingLayer {
type Service = TracingService<S>;
fn layer(&self, service: S) -> Self::Service {
TracingService { service }
}
}
#[derive(Clone, Debug)]
pub struct TracingService<S> {
service: S,
}
impl<S, BodyType> Service<http::Request<BodyType>> for TracingService<S>
where
S: Service<http::Request<BodyType>>,
BodyType: std::fmt::Debug,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, mut request: Request<BodyType>) -> Self::Future {
let old_headers = request.headers().clone();
let context = tracing::Span::current().context();
global::get_text_map_propagator(|propagator| {
propagator.inject_context(&context, &mut HeaderInjector(request.headers_mut()))
});
trace!(
"
--------------------------------------------------------------
old headers:
{:#?}
new headers:
{:#?}
-----------------------------------------------",
old_headers,
request.headers()
);
self.service.call(request)
}
}
pub fn extract_trace_context<BodyType>(request: Request<BodyType>) -> Request<BodyType>
where
BodyType: std::fmt::Debug,
{
let parent_context = global::get_text_map_propagator(|propagator| {
propagator.extract(&HeaderExtractor(request.headers()))
});
trace!("parent context (extraction): {:#?}", parent_context);
tracing::Span::current().set_parent(parent_context);
request
}
struct HeaderInjector<'a>(pub &'a mut http::HeaderMap);
impl<'a> Injector for HeaderInjector<'a> {
fn set(&mut self, key: &str, value: String) {
println!("In Header Injector set function!!");
trace!("setting key: {}, to value: {}", key, value);
trace!("old self.0: {:?}", self.0);
if let Ok(name) = http::header::HeaderName::from_bytes(key.as_bytes()) {
if let Ok(val) = http::header::HeaderValue::from_str(&value) {
self.0.insert(name, val);
}
}
trace!("new self.0: {:?}", self.0);
}
}
struct HeaderExtractor<'a>(pub &'a http::HeaderMap);
impl<'a> Extractor for HeaderExtractor<'a> {
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|value| value.to_str().ok())
}
fn keys(&self) -> Vec<&str> {
self.0
.keys()
.map(|value| value.as_str())
.collect::<Vec<_>>()
}
}
}