opentelemetry_tracing_utils/
lib.rs1use anyhow::Result;
6use std::str::FromStr;
7use tracing_opentelemetry::OpenTelemetryLayer;
8
9use opentelemetry::{global, propagation::TextMapCompositePropagator, trace::TracerProvider};
11use opentelemetry_sdk::{
12 propagation::{BaggagePropagator, TraceContextPropagator},
13 trace::SdkTracerProvider,
14};
15pub use opentelemetry_semantic_conventions as semcov;
16use tonic::{metadata::MetadataKey, service::Interceptor};
17use tracing::Span;
18pub use tracing_opentelemetry::OpenTelemetrySpanExt;
19use tracing_subscriber::{
20 fmt::{self, format::FmtSpan, TestWriter},
21 layer::SubscriberExt,
22 util::SubscriberInitExt,
23 EnvFilter, Layer,
24};
25
26use self::trace_output_fmt::JsonWithTraceId;
27
28pub mod trace_output_fmt;
29
30pub fn set_up_logging() -> Result<LoggingSetupBuildResult> {
33 LoggingSetupBuilder::new().build()
34}
35
36pub const OTEL_TRACES_EXPORTER: &str = "OTEL_TRACES_EXPORTER";
39pub const NO_OTLP: &str = "NO_OTLP";
41
42#[derive(Debug)]
43pub struct LoggingSetupBuilder {
44 pub otlp_output_enabled: bool,
45 pub pretty_logs: bool,
46 pub use_test_writer: bool,
47}
48impl Default for LoggingSetupBuilder {
49 fn default() -> Self {
50 let no_otlp = match std::env::var(NO_OTLP).as_deref() {
51 Ok("0") => false,
52 Ok(_) => true,
53 Err(_) => false,
54 };
55
56 let otel_traces_exporter = match std::env::var(OTEL_TRACES_EXPORTER).as_deref() {
57 Ok("otlp") => OtelTracesExporterOption::Otlp,
58 Ok("none") => OtelTracesExporterOption::None,
59 _ => OtelTracesExporterOption::Otlp,
60 };
61
62 let otlp_enabled = no_otlp == false
63 && match otel_traces_exporter {
64 OtelTracesExporterOption::Otlp => true,
65 OtelTracesExporterOption::None => false,
66 };
67
68 let pretty_logs = std::env::var("PRETTY_LOGS")
70 .map(|e| &e == "1")
71 .unwrap_or_else(|_| !otlp_enabled);
72
73 Self {
74 otlp_output_enabled: otlp_enabled,
75 pretty_logs,
76 use_test_writer: false,
77 }
78 }
79}
80
81enum OtelTracesExporterOption {
82 Otlp,
83 None,
84}
85
86impl LoggingSetupBuilder {
87 pub fn new() -> Self {
88 Self::default()
89 }
90 pub fn build(&self) -> Result<LoggingSetupBuildResult> {
91 let otlp_enabled = self.otlp_output_enabled;
92
93 let baggage_propagator = BaggagePropagator::new();
95 let trace_context_propagator = TraceContextPropagator::new();
96
97 let composite_propagator = TextMapCompositePropagator::new(vec![
99 Box::new(baggage_propagator),
100 Box::new(trace_context_propagator),
101 ]);
102
103 global::set_text_map_propagator(composite_propagator);
104
105 let basic_no_otlp_tracer_provider = SdkTracerProvider::builder().build();
107
108 let otlp_tracer_exporter = opentelemetry_otlp::SpanExporter::builder()
111 .with_tonic()
112 .build()?;
113 let otlp_tracer: SdkTracerProvider = SdkTracerProvider::builder()
115 .with_batch_exporter(otlp_tracer_exporter)
116 .build();
117
118 let tracer_provider = match otlp_enabled {
119 true => otlp_tracer,
120 false => basic_no_otlp_tracer_provider,
122 };
123 let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME"));
124
125 let opentelemetry: OpenTelemetryLayer<_, _> = tracing_opentelemetry::layer()
127 .with_error_fields_to_exceptions(true)
128 .with_error_records_to_exceptions(true)
129 .with_tracer(tracer);
130
131 let use_test_writer = self.use_test_writer;
132 let pretty_logs = self.pretty_logs;
133
134 #[derive(Debug)]
135 enum MaybeTestWriterLayer<N, E> {
136 WithTestWriter(fmt::Layer<tracing_subscriber::Registry, N, E, TestWriter>),
137 NoTestWriter(fmt::Layer<tracing_subscriber::Registry>),
138 }
139
140 let base_layer = fmt::Layer::default();
141 let base_layer: MaybeTestWriterLayer<_, _> = match use_test_writer {
142 false => MaybeTestWriterLayer::NoTestWriter(base_layer),
143 true => MaybeTestWriterLayer::WithTestWriter(base_layer.with_test_writer()),
144 };
145
146 let format_layers = match pretty_logs {
149 false => match base_layer {
151 MaybeTestWriterLayer::NoTestWriter(layer) => {
152 layer.json().event_format(JsonWithTraceId).boxed()
153 }
154 MaybeTestWriterLayer::WithTestWriter(layer) => {
155 layer.json().event_format(JsonWithTraceId).boxed()
156 }
157 },
158 true => match base_layer {
160 MaybeTestWriterLayer::NoTestWriter(layer) => {
161 layer.pretty().with_span_events(FmtSpan::NONE).boxed()
162 }
163 MaybeTestWriterLayer::WithTestWriter(layer) => {
164 layer.pretty().with_span_events(FmtSpan::NONE).boxed()
165 }
166 },
167 };
168
169 let layers = opentelemetry.and_then(format_layers);
170
171 let tracing_registry = tracing_subscriber::registry()
172 .with(layers.with_filter(
174 EnvFilter::try_from_default_env().unwrap_or_else(|_| {
177 EnvFilter::try_new("info")
179 .expect("hard-coded default directive should be valid")
180 }),
181 ));
182
183 #[cfg(feature = "tokio-console")]
184 let tracing_registry = tracing_registry.with(console_subscriber::spawn());
185
186 tracing_registry.try_init()?;
187
188 global::set_tracer_provider(tracer_provider.clone());
195
196 Ok(LoggingSetupBuildResult { tracer_provider })
197 }
198}
199
200pub struct LoggingSetupBuildResult {
202 pub tracer_provider: SdkTracerProvider,
204}
205
206#[derive(Clone)]
209pub struct GrpcInterceptor;
210impl Interceptor for GrpcInterceptor {
211 fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
212 let context = Span::current().context();
214
215 opentelemetry::global::get_text_map_propagator(|propagator| {
216 propagator.inject_context(&context, &mut MetadataInjector(req.metadata_mut()));
217 });
218
219 Ok(req)
220 }
221}
222
223pub struct MetadataInjector<'a>(&'a mut tonic::metadata::MetadataMap);
224impl<'a> opentelemetry::propagation::Injector for MetadataInjector<'a> {
225 fn set(&mut self, key: &str, value: String) {
226 if let Ok(key) = MetadataKey::from_str(key) {
227 if let Ok(val) = value.parse() {
228 self.0.insert(key, val);
229 }
230 }
231 }
232}
233
234pub type InterceptedGrpcService =
236 tonic::codegen::InterceptedService<tonic::transport::Channel, GrpcInterceptor>;
237
238#[cfg(feature = "tower")]
239pub use tower_tracing::*;
240
241#[cfg(feature = "tower")]
242pub mod tower_tracing {
243 use std::task::{Context, Poll};
244
245 use http::Request;
246 use opentelemetry::global;
247 use opentelemetry_http::HeaderExtractor;
248 use tower::{Layer, Service};
249 use tower_http::classify::{ServerErrorsAsFailures, SharedClassifier};
250 use tower_http::trace::TraceLayer;
251 use tracing::trace;
252 use tracing_opentelemetry::OpenTelemetrySpanExt;
253
254 pub struct TracingLayer;
255
256 impl<S> Layer<S> for TracingLayer {
257 type Service = TracingService<S>;
258
259 fn layer(&self, service: S) -> Self::Service {
260 TracingService { service }
261 }
262 }
263
264 #[derive(Clone, Debug)]
266 pub struct TracingService<S> {
267 service: S,
268 }
269
270 impl<S, BodyType> Service<http::Request<BodyType>> for TracingService<S>
271 where
272 S: Service<http::Request<BodyType>>,
273 BodyType: std::fmt::Debug,
274 {
275 type Response = S::Response;
276 type Error = S::Error;
277 type Future = S::Future;
278
279 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
280 self.service.poll_ready(cx)
281 }
282
283 fn call(&mut self, mut request: Request<BodyType>) -> Self::Future {
284 let old_headers = request.headers().clone();
285
286 let context = tracing::Span::current().context();
287
288 global::get_text_map_propagator(|propagator| {
289 propagator.inject_context(
290 &context,
291 &mut opentelemetry_http::HeaderInjector(request.headers_mut()),
292 )
293 });
294
295 trace!(
296 "
297--------------------------------------------------------------
298old headers:
299{:#?}
300new headers:
301{:#?}
302-----------------------------------------------",
303 old_headers,
304 request.headers()
305 );
306
307 self.service.call(request)
308 }
309 }
310
311 pub fn make_tower_http_otel_trace_layer<BodyType>() -> TraceLayer<
319 SharedClassifier<ServerErrorsAsFailures>,
320 impl (Fn(&Request<BodyType>) -> tracing::Span) + Clone,
321 > {
322 tower_http::trace::TraceLayer::new_for_http().make_span_with(
323 |request: &http::Request<BodyType>| {
324 let context = get_otel_context_from_request(request);
325
326 let span = tracing::debug_span!(
327 "request",
328 method = %request.method(),
329 uri = %request.uri(),
330 version = ?request.version(),
331 headers = ?request.headers());
332
333 span.set_parent(context);
334
335 span
336 },
337 )
338 }
339
340 pub fn get_otel_context_from_request<BodyType>(
342 request: &Request<BodyType>,
343 ) -> opentelemetry::Context {
344 let parent_context = global::get_text_map_propagator(|propagator| {
346 let extracted = propagator.extract(&HeaderExtractor(request.headers()));
347 trace!("extracted: {:#?}", &extracted);
348 extracted
349 });
350 trace!("parent context (extraction): {:#?}", parent_context);
351
352 parent_context
353 }
354
355 #[cfg(test)]
356 mod tests {
357 use opentelemetry::{baggage::BaggageExt, trace::TraceContextExt};
358
359 use crate::tower_tracing::get_otel_context_from_request;
360
361 #[tokio::test]
363 async fn test_trace_context_extractor() {
364 let _ = crate::set_up_logging().map_err(|err| dbg!(err));
365
366 let request: http::Request<String> = http::Request::builder()
367 .uri("/")
368 .header(
369 "traceparent",
370 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
371 )
372 .header("tracestate", "asdf=123456")
373 .header(
374 "baggage",
375 "userId=alice,serverNode=DF%2028,isProduction=false",
376 )
377 .body("".to_string())
378 .unwrap();
379
380 let context = get_otel_context_from_request(&request);
381
382 dbg!(&context);
383 dbg!(&context.has_active_span());
384 assert!(context.has_active_span());
385
386 let baggage = context.baggage();
387 assert_eq!(baggage.get("userId"), Some(&"alice".into()));
388 assert_eq!(baggage.get("serverNode"), Some(&"DF 28".into()));
389 assert_eq!(baggage.get("isProduction"), Some(&"false".into()));
390 }
391 }
392}