opentelemetry_tracing_utils/
lib.rs1use anyhow::Result;
6use std::str::FromStr;
7use tracing_opentelemetry::OpenTelemetryLayer;
8
9use opentelemetry::{global, propagation::TextMapCompositePropagator, trace::TracerProvider};
11use opentelemetry_sdk::{
12 propagation::{BaggagePropagator, TraceContextPropagator},
13 trace::SdkTracerProvider,
14};
15pub use opentelemetry_semantic_conventions as semcov;
16use tonic::{metadata::MetadataKey, service::Interceptor};
17use tracing::Span;
18pub use tracing_opentelemetry::OpenTelemetrySpanExt;
19use tracing_subscriber::{
20 fmt::{self, format::FmtSpan, TestWriter},
21 layer::SubscriberExt,
22 util::SubscriberInitExt,
23 EnvFilter, Layer,
24};
25
26use self::trace_output_fmt::JsonWithTraceId;
27
28pub mod trace_output_fmt;
29
30pub fn set_up_logging() -> Result<LoggingSetupBuildResult> {
33 LoggingSetupBuilder::new().build()
34}
35
/// Name of the environment variable that selects the span exporter.
///
/// `LoggingSetupBuilder::default` recognises the values `"otlp"`, `"none"`
/// and `"stdout"`; any other value (or an unset variable) selects OTLP.
pub const OTEL_TRACES_EXPORTER: &str = "OTEL_TRACES_EXPORTER";

/// Name of the environment variable that switches span exporting off.
///
/// `LoggingSetupBuilder::default` treats any value other than `"0"` as
/// "disable exporting"; when it is set that way, [`OTEL_TRACES_EXPORTER`] is
/// not consulted.
pub const NO_OTLP: &str = "NO_OTLP";
41
/// Configuration used by `LoggingSetupBuilder::build` to install the global
/// tracing subscriber and OpenTelemetry tracer provider.
///
/// All fields are public so that a value obtained from `Default` can be
/// adjusted before building.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LoggingSetupBuilder {
    /// Which span exporter the tracer provider is built with.
    pub otel_traces_exporter: OtelTracesExporterOption,
    /// When `true`, log events use the human-readable "pretty" formatter;
    /// otherwise they are emitted as JSON.
    pub pretty_logs: bool,
    /// When `true`, the formatting layer writes through the test writer
    /// (`with_test_writer`) instead of its default writer.
    pub use_test_writer: bool,
}

/// Selects where finished spans are sent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OtelTracesExporterOption {
    /// Export spans with the OTLP exporter.
    Otlp,
    /// Export spans with the stdout exporter.
    Stdout,
    /// Build the tracer provider without any exporter.
    None,
}
57
58impl Default for LoggingSetupBuilder {
59 fn default() -> Self {
60 let no_otlp = match std::env::var(NO_OTLP).as_deref() {
61 Ok("0") => false,
62 Ok(_) => true,
63 Err(_) => false,
64 };
65
66 let otel_traces_exporter = match no_otlp {
67 true => OtelTracesExporterOption::None,
68 false => match std::env::var(OTEL_TRACES_EXPORTER).as_deref() {
69 Ok("otlp") => OtelTracesExporterOption::Otlp,
70 Ok("none") => OtelTracesExporterOption::None,
71 Ok("stdout") => OtelTracesExporterOption::Stdout,
72 _ => OtelTracesExporterOption::Otlp,
73 },
74 };
75
76 let pretty_logs = std::env::var("PRETTY_LOGS")
78 .map(|e| &e == "1")
79 .unwrap_or_else(|_| matches!(otel_traces_exporter, OtelTracesExporterOption::None));
82
83 Self {
84 otel_traces_exporter,
85 pretty_logs,
86 use_test_writer: false,
87 }
88 }
89}
90
91impl LoggingSetupBuilder {
92 pub fn new() -> Self {
93 Self::default()
94 }
95 pub fn build(&self) -> Result<LoggingSetupBuildResult> {
96 let otlp_enabled = &self.otel_traces_exporter;
97
98 let baggage_propagator = BaggagePropagator::new();
100 let trace_context_propagator = TraceContextPropagator::new();
101
102 let composite_propagator = TextMapCompositePropagator::new(vec![
104 Box::new(baggage_propagator),
105 Box::new(trace_context_propagator),
106 ]);
107
108 global::set_text_map_propagator(composite_propagator);
109
110 let otel_tracer_provider_no_output = SdkTracerProvider::builder().build();
112 let otel_tracer_provider_stdout = SdkTracerProvider::builder()
114 .with_simple_exporter(opentelemetry_stdout::SpanExporter::default())
115 .build();
116
117 let otlp_tracer_exporter = opentelemetry_otlp::SpanExporter::builder()
120 .with_tonic()
121 .build()?;
122 let otlp_tracer: SdkTracerProvider = SdkTracerProvider::builder()
124 .with_batch_exporter(otlp_tracer_exporter)
125 .build();
126
127 let tracer_provider = match otlp_enabled {
128 OtelTracesExporterOption::Otlp => otlp_tracer,
129 OtelTracesExporterOption::Stdout => otel_tracer_provider_stdout,
130 OtelTracesExporterOption::None => otel_tracer_provider_no_output,
132 };
133 let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME"));
134
135 let opentelemetry: OpenTelemetryLayer<_, _> = tracing_opentelemetry::layer()
137 .with_error_fields_to_exceptions(true)
138 .with_error_records_to_exceptions(true)
139 .with_tracer(tracer);
140
141 let use_test_writer = self.use_test_writer;
142 let pretty_logs = self.pretty_logs;
143
144 #[derive(Debug)]
145 enum MaybeTestWriterLayer<N, E> {
146 WithTestWriter(fmt::Layer<tracing_subscriber::Registry, N, E, TestWriter>),
147 NoTestWriter(fmt::Layer<tracing_subscriber::Registry>),
148 }
149
150 let base_layer = fmt::Layer::default();
151 let base_layer: MaybeTestWriterLayer<_, _> = match use_test_writer {
152 false => MaybeTestWriterLayer::NoTestWriter(base_layer),
153 true => MaybeTestWriterLayer::WithTestWriter(base_layer.with_test_writer()),
154 };
155
156 let format_layers = match pretty_logs {
159 false => match base_layer {
161 MaybeTestWriterLayer::NoTestWriter(layer) => {
162 layer.json().event_format(JsonWithTraceId).boxed()
163 }
164 MaybeTestWriterLayer::WithTestWriter(layer) => {
165 layer.json().event_format(JsonWithTraceId).boxed()
166 }
167 },
168 true => match base_layer {
170 MaybeTestWriterLayer::NoTestWriter(layer) => {
171 layer.pretty().with_span_events(FmtSpan::NONE).boxed()
172 }
173 MaybeTestWriterLayer::WithTestWriter(layer) => {
174 layer.pretty().with_span_events(FmtSpan::NONE).boxed()
175 }
176 },
177 };
178
179 let layers = opentelemetry.and_then(format_layers);
180
181 let tracing_registry = tracing_subscriber::registry()
182 .with(layers.with_filter(
184 EnvFilter::try_from_default_env().unwrap_or_else(|_| {
187 EnvFilter::try_new("info")
189 .expect("hard-coded default directive should be valid")
190 }),
191 ));
192
193 #[cfg(feature = "tokio-console")]
194 let tracing_registry = tracing_registry.with(console_subscriber::spawn());
195
196 tracing_registry.try_init()?;
197
198 global::set_tracer_provider(tracer_provider.clone());
205
206 Ok(LoggingSetupBuildResult { tracer_provider })
207 }
208}
209
210pub struct LoggingSetupBuildResult {
212 pub tracer_provider: SdkTracerProvider,
214}
215
216#[derive(Clone)]
219pub struct GrpcInterceptor;
220impl Interceptor for GrpcInterceptor {
221 fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
222 let context = Span::current().context();
224
225 opentelemetry::global::get_text_map_propagator(|propagator| {
226 propagator.inject_context(&context, &mut MetadataInjector(req.metadata_mut()));
227 });
228
229 Ok(req)
230 }
231}
232
233pub struct MetadataInjector<'a>(&'a mut tonic::metadata::MetadataMap);
234impl<'a> opentelemetry::propagation::Injector for MetadataInjector<'a> {
235 fn set(&mut self, key: &str, value: String) {
236 if let Ok(key) = MetadataKey::from_str(key) {
237 if let Ok(val) = value.parse() {
238 self.0.insert(key, val);
239 }
240 }
241 }
242}
243
244pub type InterceptedGrpcService =
246 tonic::codegen::InterceptedService<tonic::transport::Channel, GrpcInterceptor>;
247
248#[cfg(feature = "tower")]
249pub use tower_tracing::*;
250
#[cfg(feature = "tower")]
/// Tower/HTTP helpers for propagating OpenTelemetry context: a client-side
/// layer that injects the context into outgoing request headers, and
/// server-side helpers that extract it from incoming request headers.
pub mod tower_tracing {
    use std::task::{Context, Poll};

    use http::Request;
    use opentelemetry::global;
    use opentelemetry_http::HeaderExtractor;
    use tower::{Layer, Service};
    use tower_http::classify::{ServerErrorsAsFailures, SharedClassifier};
    use tower_http::trace::TraceLayer;
    use tracing::trace;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    /// Tower layer that wraps a service in [`TracingService`].
    #[derive(Clone, Copy, Debug, Default)]
    pub struct TracingLayer;

    impl<S> Layer<S> for TracingLayer {
        type Service = TracingService<S>;

        fn layer(&self, service: S) -> Self::Service {
            TracingService { service }
        }
    }

    /// Service wrapper that injects the current span's OpenTelemetry context
    /// into the headers of every request before forwarding it.
    #[derive(Clone, Debug)]
    pub struct TracingService<S> {
        service: S,
    }

    // No `Debug` bound on `BodyType`: only the headers are ever formatted.
    impl<S, BodyType> Service<http::Request<BodyType>> for TracingService<S>
    where
        S: Service<http::Request<BodyType>>,
    {
        type Response = S::Response;
        type Error = S::Error;
        type Future = S::Future;

        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            self.service.poll_ready(cx)
        }

        /// Injects the context of the current `tracing` span into the request
        /// headers via the global text-map propagator, then calls the inner
        /// service.
        fn call(&mut self, mut request: Request<BodyType>) -> Self::Future {
            // Snapshot taken only so the trace event below can show the
            // headers before and after injection.
            let old_headers = request.headers().clone();

            let context = tracing::Span::current().context();

            global::get_text_map_propagator(|propagator| {
                propagator.inject_context(
                    &context,
                    &mut opentelemetry_http::HeaderInjector(request.headers_mut()),
                )
            });

            trace!(
                "
--------------------------------------------------------------
old headers:
{:#?}
new headers:
{:#?}
-----------------------------------------------",
                old_headers,
                request.headers()
            );

            self.service.call(request)
        }
    }

    /// Builds a `tower_http` [`TraceLayer`] whose per-request span is parented
    /// to the OpenTelemetry context extracted from the request headers.
    ///
    /// The span is created at debug level and records the request method,
    /// URI, HTTP version and headers.
    pub fn make_tower_http_otel_trace_layer<BodyType>() -> TraceLayer<
        SharedClassifier<ServerErrorsAsFailures>,
        impl (Fn(&Request<BodyType>) -> tracing::Span) + Clone,
    > {
        tower_http::trace::TraceLayer::new_for_http().make_span_with(
            |request: &http::Request<BodyType>| {
                let context = get_otel_context_from_request(request);

                // NOTE(review): every header value is recorded on the span,
                // which may include credentials such as `authorization` or
                // cookies — confirm this is acceptable for the deployment.
                let span = tracing::debug_span!(
                    "request",
                    method = %request.method(),
                    uri = %request.uri(),
                    version = ?request.version(),
                    headers = ?request.headers());

                span.set_parent(context);

                span
            },
        )
    }

    /// Extracts an OpenTelemetry [`Context`](opentelemetry::Context) from the
    /// headers of `request` using the global text-map propagator.
    ///
    /// The extracted context is also logged at trace level.
    pub fn get_otel_context_from_request<BodyType>(
        request: &Request<BodyType>,
    ) -> opentelemetry::Context {
        let parent_context = global::get_text_map_propagator(|propagator| {
            let extracted = propagator.extract(&HeaderExtractor(request.headers()));
            trace!("extracted: {:#?}", &extracted);
            extracted
        });
        trace!("parent context (extraction): {:#?}", parent_context);

        parent_context
    }

    #[cfg(test)]
    mod tests {
        use opentelemetry::{baggage::BaggageExt, trace::TraceContextExt};

        use crate::tower_tracing::get_otel_context_from_request;

        /// Checks that trace context and baggage headers are both extracted.
        #[tokio::test]
        async fn test_trace_context_extractor() {
            // Setup may fail if a subscriber is already installed; the error
            // is printed and otherwise ignored.
            let _ = crate::set_up_logging().map_err(|err| dbg!(err));

            let request: http::Request<String> = http::Request::builder()
                .uri("/")
                .header(
                    "traceparent",
                    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
                )
                .header("tracestate", "asdf=123456")
                .header(
                    "baggage",
                    "userId=alice,serverNode=DF%2028,isProduction=false",
                )
                .body("".to_string())
                .unwrap();

            let context = get_otel_context_from_request(&request);

            dbg!(&context);
            dbg!(&context.has_active_span());
            assert!(context.has_active_span());

            let baggage = context.baggage();
            assert_eq!(baggage.get("userId"), Some(&"alice".into()));
            assert_eq!(baggage.get("serverNode"), Some(&"DF 28".into()));
            assert_eq!(baggage.get("isProduction"), Some(&"false".into()));
        }
    }
}