opentelemetry_tracing_utils/
lib.rs1use anyhow::Result;
6use std::str::FromStr;
7use tracing_opentelemetry::OpenTelemetryLayer;
8
9use opentelemetry::{global, propagation::TextMapCompositePropagator, trace::TracerProvider};
11use opentelemetry_sdk::{
12 propagation::{BaggagePropagator, TraceContextPropagator},
13 trace::SdkTracerProvider,
14};
15pub use opentelemetry_semantic_conventions as semcov;
16use tonic::{metadata::MetadataKey, service::Interceptor};
17use tracing::Span;
18pub use tracing_opentelemetry::OpenTelemetrySpanExt;
19use tracing_subscriber::{
20 fmt::{self, format::FmtSpan, TestWriter},
21 layer::SubscriberExt,
22 util::SubscriberInitExt,
23 EnvFilter, Layer,
24};
25
26use self::trace_output_fmt::JsonWithTraceId;
27
28pub mod trace_output_fmt;
29
30pub fn set_up_logging() -> Result<LoggingSetupBuildResult> {
33 LoggingSetupBuilder::new().build()
34}
35
36#[derive(Debug)]
37pub struct LoggingSetupBuilder {
38 pub otlp_output_enabled: bool,
39 pub pretty_logs: bool,
40 pub use_test_writer: bool,
41}
42impl Default for LoggingSetupBuilder {
43 fn default() -> Self {
44 let otlp_enabled = std::env::var("NO_OTLP")
45 .unwrap_or_else(|_| "0".to_owned())
46 .as_str()
47 == "0";
48
49 let pretty_logs = std::env::var("PRETTY_LOGS")
51 .map(|e| &e == "1")
52 .unwrap_or_else(|_| !otlp_enabled);
53
54 Self {
55 otlp_output_enabled: otlp_enabled,
56 pretty_logs,
57 use_test_writer: false,
58 }
59 }
60}
61
62impl LoggingSetupBuilder {
63 pub fn new() -> Self {
64 Self::default()
65 }
66 pub fn build(&self) -> Result<LoggingSetupBuildResult> {
67 let otlp_enabled = self.otlp_output_enabled;
68
69 let baggage_propagator = BaggagePropagator::new();
71 let trace_context_propagator = TraceContextPropagator::new();
72
73 let composite_propagator = TextMapCompositePropagator::new(vec![
75 Box::new(baggage_propagator),
76 Box::new(trace_context_propagator),
77 ]);
78
79 global::set_text_map_propagator(composite_propagator);
80
81 let basic_no_otlp_tracer_provider = SdkTracerProvider::builder()
82 .with_simple_exporter(opentelemetry_stdout::SpanExporter::default())
83 .build();
84
85 let otlp_tracer_exporter = opentelemetry_otlp::SpanExporter::builder()
88 .with_tonic()
89 .build()?;
90 let otlp_tracer: SdkTracerProvider = SdkTracerProvider::builder()
92 .with_batch_exporter(otlp_tracer_exporter)
93 .build();
94
95 let tracer_provider = match otlp_enabled {
96 true => otlp_tracer,
97 false => basic_no_otlp_tracer_provider,
99 };
100 let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME"));
101
102 let opentelemetry: OpenTelemetryLayer<_, _> = tracing_opentelemetry::layer()
104 .with_error_fields_to_exceptions(true)
105 .with_error_records_to_exceptions(true)
106 .with_tracer(tracer);
107
108 let use_test_writer = self.use_test_writer;
109 let pretty_logs = self.pretty_logs;
110
111 #[derive(Debug)]
112 enum MaybeTestWriterLayer<N, E> {
113 WithTestWriter(fmt::Layer<tracing_subscriber::Registry, N, E, TestWriter>),
114 NoTestWriter(fmt::Layer<tracing_subscriber::Registry>),
115 }
116
117 let base_layer = fmt::Layer::default();
118 let base_layer: MaybeTestWriterLayer<_, _> = match use_test_writer {
119 false => MaybeTestWriterLayer::NoTestWriter(base_layer),
120 true => MaybeTestWriterLayer::WithTestWriter(base_layer.with_test_writer()),
121 };
122
123 let format_layers = match pretty_logs {
126 false => match base_layer {
128 MaybeTestWriterLayer::NoTestWriter(layer) => {
129 layer.json().event_format(JsonWithTraceId).boxed()
130 }
131 MaybeTestWriterLayer::WithTestWriter(layer) => {
132 layer.json().event_format(JsonWithTraceId).boxed()
133 }
134 },
135 true => match base_layer {
137 MaybeTestWriterLayer::NoTestWriter(layer) => {
138 layer.pretty().with_span_events(FmtSpan::NONE).boxed()
139 }
140 MaybeTestWriterLayer::WithTestWriter(layer) => {
141 layer.pretty().with_span_events(FmtSpan::NONE).boxed()
142 }
143 },
144 };
145
146 let layers = opentelemetry.and_then(format_layers);
147
148 let tracing_registry = tracing_subscriber::registry()
149 .with(layers.with_filter(
151 EnvFilter::try_from_default_env().unwrap_or_else(|_| {
154 EnvFilter::try_new("info")
156 .expect("hard-coded default directive should be valid")
157 }),
158 ));
159
160 #[cfg(feature = "tokio-console")]
161 let tracing_registry = tracing_registry.with(console_subscriber::spawn());
162
163 tracing_registry.try_init()?;
164
165 global::set_tracer_provider(tracer_provider.clone());
172
173 Ok(LoggingSetupBuildResult { tracer_provider })
174 }
175}
176
177pub struct LoggingSetupBuildResult {
179 pub tracer_provider: SdkTracerProvider,
181}
182
183#[derive(Clone)]
186pub struct GrpcInterceptor;
187impl Interceptor for GrpcInterceptor {
188 fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
189 let context = Span::current().context();
191
192 opentelemetry::global::get_text_map_propagator(|propagator| {
193 propagator.inject_context(&context, &mut MetadataInjector(req.metadata_mut()));
194 });
195
196 Ok(req)
197 }
198}
199
200pub struct MetadataInjector<'a>(&'a mut tonic::metadata::MetadataMap);
201impl<'a> opentelemetry::propagation::Injector for MetadataInjector<'a> {
202 fn set(&mut self, key: &str, value: String) {
203 if let Ok(key) = MetadataKey::from_str(key) {
204 if let Ok(val) = value.parse() {
205 self.0.insert(key, val);
206 }
207 }
208 }
209}
210
211pub type InterceptedGrpcService =
213 tonic::codegen::InterceptedService<tonic::transport::Channel, GrpcInterceptor>;
214
215#[cfg(feature = "tower")]
216pub use tower_tracing::*;
217
218#[cfg(feature = "tower")]
219pub mod tower_tracing {
220 use std::task::{Context, Poll};
221
222 use http::Request;
223 use opentelemetry::global;
224 use opentelemetry_http::HeaderExtractor;
225 use tower::{Layer, Service};
226 use tower_http::classify::{ServerErrorsAsFailures, SharedClassifier};
227 use tower_http::trace::TraceLayer;
228 use tracing::trace;
229 use tracing_opentelemetry::OpenTelemetrySpanExt;
230
231 pub struct TracingLayer;
232
233 impl<S> Layer<S> for TracingLayer {
234 type Service = TracingService<S>;
235
236 fn layer(&self, service: S) -> Self::Service {
237 TracingService { service }
238 }
239 }
240
241 #[derive(Clone, Debug)]
243 pub struct TracingService<S> {
244 service: S,
245 }
246
247 impl<S, BodyType> Service<http::Request<BodyType>> for TracingService<S>
248 where
249 S: Service<http::Request<BodyType>>,
250 BodyType: std::fmt::Debug,
251 {
252 type Response = S::Response;
253 type Error = S::Error;
254 type Future = S::Future;
255
256 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
257 self.service.poll_ready(cx)
258 }
259
260 fn call(&mut self, mut request: Request<BodyType>) -> Self::Future {
261 let old_headers = request.headers().clone();
262
263 let context = tracing::Span::current().context();
264
265 global::get_text_map_propagator(|propagator| {
266 propagator.inject_context(
267 &context,
268 &mut opentelemetry_http::HeaderInjector(request.headers_mut()),
269 )
270 });
271
272 trace!(
273 "
274--------------------------------------------------------------
275old headers:
276{:#?}
277new headers:
278{:#?}
279-----------------------------------------------",
280 old_headers,
281 request.headers()
282 );
283
284 self.service.call(request)
285 }
286 }
287
288 pub fn make_tower_http_otel_trace_layer<BodyType>() -> TraceLayer<
296 SharedClassifier<ServerErrorsAsFailures>,
297 impl (Fn(&Request<BodyType>) -> tracing::Span) + Clone,
298 > {
299 tower_http::trace::TraceLayer::new_for_http().make_span_with(
300 |request: &http::Request<BodyType>| {
301 let context = get_otel_context_from_request(request);
302
303 let span = tracing::debug_span!(
304 "request",
305 method = %request.method(),
306 uri = %request.uri(),
307 version = ?request.version(),
308 headers = ?request.headers());
309
310 span.set_parent(context);
311
312 span
313 },
314 )
315 }
316
317 pub fn get_otel_context_from_request<BodyType>(
319 request: &Request<BodyType>,
320 ) -> opentelemetry::Context {
321 let parent_context = global::get_text_map_propagator(|propagator| {
323 let extracted = propagator.extract(&HeaderExtractor(request.headers()));
324 trace!("extracted: {:#?}", &extracted);
325 extracted
326 });
327 trace!("parent context (extraction): {:#?}", parent_context);
328
329 parent_context
330 }
331
332 #[cfg(test)]
333 mod tests {
334 use opentelemetry::{baggage::BaggageExt, trace::TraceContextExt};
335
336 use crate::tower_tracing::get_otel_context_from_request;
337
338 #[tokio::test]
340 async fn test_trace_context_extractor() {
341 let _ = crate::set_up_logging().map_err(|err| dbg!(err));
342
343 let request: http::Request<String> = http::Request::builder()
344 .uri("/")
345 .header(
346 "traceparent",
347 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
348 )
349 .header("tracestate", "asdf=123456")
350 .header(
351 "baggage",
352 "userId=alice,serverNode=DF%2028,isProduction=false",
353 )
354 .body("".to_string())
355 .unwrap();
356
357 let context = get_otel_context_from_request(&request);
358
359 dbg!(&context);
360 dbg!(&context.has_active_span());
361 assert!(context.has_active_span());
362
363 let baggage = context.baggage();
364 assert_eq!(baggage.get("userId"), Some(&"alice".into()));
365 assert_eq!(baggage.get("serverNode"), Some(&"DF 28".into()));
366 assert_eq!(baggage.get("isProduction"), Some(&"false".into()));
367 }
368 }
369}