opentelemetry_tracing_utils/
lib.rsuse anyhow::Result;
use std::str::FromStr;
use tracing_opentelemetry::OpenTelemetryLayer;
use opentelemetry::{global, propagation::TextMapCompositePropagator, trace::TracerProvider as _};
use opentelemetry_sdk::{
propagation::{BaggagePropagator, TraceContextPropagator},
trace::TracerProvider,
};
pub use opentelemetry_semantic_conventions as semcov;
use tonic::{metadata::MetadataKey, service::Interceptor};
use tracing::Span;
pub use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::{
fmt::{self, format::FmtSpan, TestWriter},
layer::SubscriberExt,
util::SubscriberInitExt,
EnvFilter, Layer,
};
use self::trace_output_fmt::JsonWithTraceId;
pub mod trace_output_fmt;
pub use opentelemetry::global::shutdown_tracer_provider;
pub fn set_up_logging() -> Result<()> {
LoggingSetupBuilder::new().build()
}
#[derive(Debug)]
pub struct LoggingSetupBuilder {
pub otlp_output_enabled: bool,
pub pretty_logs: bool,
pub use_test_writer: bool,
}
impl Default for LoggingSetupBuilder {
fn default() -> Self {
let otlp_enabled = std::env::var("NO_OTLP")
.unwrap_or_else(|_| "0".to_owned())
.as_str()
== "0";
let pretty_logs = std::env::var("PRETTY_LOGS")
.map(|e| &e == "1")
.unwrap_or_else(|_| !otlp_enabled);
Self {
otlp_output_enabled: otlp_enabled,
pretty_logs,
use_test_writer: false,
}
}
}
impl LoggingSetupBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn build(&self) -> Result<()> {
let otlp_enabled = self.otlp_output_enabled;
let baggage_propagator = BaggagePropagator::new();
let trace_context_propagator = TraceContextPropagator::new();
let composite_propagator = TextMapCompositePropagator::new(vec![
Box::new(baggage_propagator),
Box::new(trace_context_propagator),
]);
global::set_text_map_propagator(composite_propagator);
let basic_no_otlp_tracer_provider = TracerProvider::builder()
.with_simple_exporter(opentelemetry_stdout::SpanExporter::default())
.build();
let otlp_tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(opentelemetry_sdk::trace::Config::default())
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
.install_batch(opentelemetry_sdk::runtime::TokioCurrentThread)?;
let tracer = match otlp_enabled {
true => otlp_tracer,
false => basic_no_otlp_tracer_provider,
}
.tracer(env!("CARGO_PKG_NAME"));
let opentelemetry: OpenTelemetryLayer<_, _> = tracing_opentelemetry::layer()
.with_error_fields_to_exceptions(true)
.with_error_records_to_exceptions(true)
.with_tracer(tracer);
let use_test_writer = self.use_test_writer;
let pretty_logs = self.pretty_logs;
#[derive(Debug)]
enum MaybeTestWriterLayer<N, E> {
WithTestWriter(fmt::Layer<tracing_subscriber::Registry, N, E, TestWriter>),
NoTestWriter(fmt::Layer<tracing_subscriber::Registry>),
}
let base_layer = fmt::Layer::default();
let base_layer: MaybeTestWriterLayer<_, _> = match use_test_writer {
false => MaybeTestWriterLayer::NoTestWriter(base_layer),
true => MaybeTestWriterLayer::WithTestWriter(base_layer.with_test_writer()),
};
let format_layers = match pretty_logs {
false => match base_layer {
MaybeTestWriterLayer::NoTestWriter(layer) => {
layer.json().event_format(JsonWithTraceId).boxed()
}
MaybeTestWriterLayer::WithTestWriter(layer) => {
layer.json().event_format(JsonWithTraceId).boxed()
}
},
true => match base_layer {
MaybeTestWriterLayer::NoTestWriter(layer) => {
layer.pretty().with_span_events(FmtSpan::NONE).boxed()
}
MaybeTestWriterLayer::WithTestWriter(layer) => {
layer.pretty().with_span_events(FmtSpan::NONE).boxed()
}
},
};
let layers = opentelemetry.and_then(format_layers);
let tracing_registry = tracing_subscriber::registry()
.with(layers.with_filter(
EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::try_new("info")
.expect("hard-coded default directive should be valid")
}),
));
#[cfg(feature = "tokio-console")]
let tracing_registry = tracing_registry.with(console_subscriber::spawn());
tracing_registry.try_init()?;
Ok(())
}
}
#[derive(Clone)]
pub struct GrpcInterceptor;
impl Interceptor for GrpcInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
let context = Span::current().context();
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&context, &mut MetadataInjector(req.metadata_mut()));
});
Ok(req)
}
}
pub struct MetadataInjector<'a>(&'a mut tonic::metadata::MetadataMap);
impl<'a> opentelemetry::propagation::Injector for MetadataInjector<'a> {
fn set(&mut self, key: &str, value: String) {
if let Ok(key) = MetadataKey::from_str(key) {
if let Ok(val) = value.parse() {
self.0.insert(key, val);
}
}
}
}
pub type InterceptedGrpcService =
tonic::codegen::InterceptedService<tonic::transport::Channel, GrpcInterceptor>;
#[cfg(feature = "tower")]
pub use tower_tracing::*;
#[cfg(feature = "tower")]
pub mod tower_tracing {
use std::task::{Context, Poll};
use http::Request;
use opentelemetry::global;
use opentelemetry_http::HeaderExtractor;
use tower::{Layer, Service};
use tower_http::classify::{ServerErrorsAsFailures, SharedClassifier};
use tower_http::trace::TraceLayer;
use tracing::trace;
use tracing_opentelemetry::OpenTelemetrySpanExt;
pub struct TracingLayer;
impl<S> Layer<S> for TracingLayer {
type Service = TracingService<S>;
fn layer(&self, service: S) -> Self::Service {
TracingService { service }
}
}
#[derive(Clone, Debug)]
pub struct TracingService<S> {
service: S,
}
impl<S, BodyType> Service<http::Request<BodyType>> for TracingService<S>
where
S: Service<http::Request<BodyType>>,
BodyType: std::fmt::Debug,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, mut request: Request<BodyType>) -> Self::Future {
let old_headers = request.headers().clone();
let context = tracing::Span::current().context();
global::get_text_map_propagator(|propagator| {
propagator.inject_context(
&context,
&mut opentelemetry_http::HeaderInjector(request.headers_mut()),
)
});
trace!(
"
--------------------------------------------------------------
old headers:
{:#?}
new headers:
{:#?}
-----------------------------------------------",
old_headers,
request.headers()
);
self.service.call(request)
}
}
pub fn make_tower_http_otel_trace_layer<BodyType>() -> TraceLayer<
SharedClassifier<ServerErrorsAsFailures>,
impl (Fn(&Request<BodyType>) -> tracing::Span) + Clone,
> {
tower_http::trace::TraceLayer::new_for_http().make_span_with(
|request: &http::Request<BodyType>| {
let context = get_otel_context_from_request(request);
let span = tracing::debug_span!(
"request",
method = %request.method(),
uri = %request.uri(),
version = ?request.version(),
headers = ?request.headers());
span.set_parent(context);
span
},
)
}
pub fn get_otel_context_from_request<BodyType>(
request: &Request<BodyType>,
) -> opentelemetry::Context {
let parent_context = global::get_text_map_propagator(|propagator| {
let extracted = propagator.extract(&HeaderExtractor(request.headers()));
trace!("extracted: {:#?}", &extracted);
extracted
});
trace!("parent context (extraction): {:#?}", parent_context);
parent_context
}
#[cfg(test)]
mod tests {
use opentelemetry::{baggage::BaggageExt, trace::TraceContextExt};
use crate::tower_tracing::get_otel_context_from_request;
#[tokio::test]
async fn test_trace_context_extractor() {
crate::set_up_logging().unwrap_or(());
let request: http::Request<String> = http::Request::builder()
.uri("/")
.header(
"traceparent",
"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
)
.header("tracestate", "asdf=123456")
.header(
"baggage",
"userId=alice,serverNode=DF%2028,isProduction=false",
)
.body("".to_string())
.unwrap();
let context = get_otel_context_from_request(&request);
dbg!(&context);
dbg!(&context.has_active_span());
assert!(context.has_active_span());
let baggage = context.baggage();
assert_eq!(baggage.get("userId"), Some(&"alice".into()));
assert_eq!(baggage.get("serverNode"), Some(&"DF 28".into()));
assert_eq!(baggage.get("isProduction"), Some(&"false".into()));
}
}
}