1use std::collections::{BTreeMap, HashMap};
30use std::sync::Once;
31
32use figment::{
33 Figment,
34 providers::{Format, Serialized, Toml},
35};
36use serde::{Deserialize, Serialize};
37use tracing::level_filters::LevelFilter;
38use tracing::{Event, Subscriber};
39use tracing_subscriber::EnvFilter;
40use tracing_subscriber::fmt::time::FormatTime;
41use tracing_subscriber::fmt::time::LocalTime;
42use tracing_subscriber::fmt::time::SystemTime;
43use tracing_subscriber::fmt::time::UtcTime;
44use tracing_subscriber::fmt::{FmtContext, FormatFields};
45use tracing_subscriber::fmt::{FormattedFields, format::Writer};
46use tracing_subscriber::prelude::*;
47use tracing_subscriber::registry::LookupSpan;
48use tracing_subscriber::{filter::Directive, fmt};
49
50use crate::config::{disable_ansi_logging, jsonl_logging_enabled};
51use async_nats::{HeaderMap, HeaderValue};
52use axum::extract::FromRequestParts;
53use axum::http;
54use axum::http::Request;
55use axum::http::request::Parts;
56use serde_json::Value;
57use std::convert::Infallible;
58use std::time::Instant;
59use tower_http::trace::{DefaultMakeSpan, TraceLayer};
60use tracing::Id;
61use tracing::Span;
62use tracing::field::Field;
63use tracing::span;
64use tracing_subscriber::Layer;
65use tracing_subscriber::Registry;
66use tracing_subscriber::field::Visit;
67use tracing_subscriber::fmt::format::FmtSpan;
68use tracing_subscriber::layer::Context;
69use tracing_subscriber::registry::SpanData;
70use uuid::Uuid;
71
72use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
73use opentelemetry::trace::TraceContextExt;
74use opentelemetry::{global, trace::Tracer};
75use opentelemetry_otlp::WithExportConfig;
76
77use opentelemetry::trace::TracerProvider as _;
78use opentelemetry::{Key, KeyValue};
79use opentelemetry_sdk::Resource;
80use opentelemetry_sdk::trace::SdkTracerProvider;
81use tracing::error;
82use tracing_subscriber::layer::SubscriberExt;
83use std::time::Duration;
86use tracing::{info, instrument};
87use tracing_opentelemetry::OpenTelemetrySpanExt;
88use tracing_subscriber::util::SubscriberInitExt;
89
/// Name of the environment variable that `filters` hands to `EnvFilter` as
/// the source of filter directives.
const FILTER_ENV: &str = "DYN_LOG";

/// Log level used by `LoggingConfig::default()`.
const DEFAULT_FILTER_LEVEL: &str = "info";

/// Environment variable read by `load_config` for the path of an extra TOML
/// configuration file.
const CONFIG_PATH_ENV: &str = "DYN_LOGGING_CONFIG_PATH";

/// Environment variable checked by `otlp_exporter_enabled` to decide whether
/// an OTLP span exporter is built.
const OTEL_EXPORT_ENABLED_ENV: &str = "OTEL_EXPORT_ENABLED";

/// Environment variable read by `setup_logging` for the OTLP endpoint.
const OTEL_EXPORT_ENDPOINT_ENV: &str = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT";

/// Endpoint used when `OTEL_EXPORT_ENDPOINT_ENV` is not set.
const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";

/// Environment variable read by `get_service_name`.
const OTEL_SERVICE_NAME_ENV: &str = "OTEL_SERVICE_NAME";

/// Service name used when `OTEL_SERVICE_NAME_ENV` is not set.
const DEFAULT_OTEL_SERVICE_NAME: &str = "dynamo";

/// Guard used by `init` so that logging setup runs at most once.
static INIT: Once = Once::new();
117
118#[derive(Serialize, Deserialize, Debug)]
119struct LoggingConfig {
120 log_level: String,
121 log_filters: HashMap<String, String>,
122}
123impl Default for LoggingConfig {
124 fn default() -> Self {
125 LoggingConfig {
126 log_level: DEFAULT_FILTER_LEVEL.to_string(),
127 log_filters: HashMap::from([
128 ("h2".to_string(), "error".to_string()),
129 ("tower".to_string(), "error".to_string()),
130 ("hyper_util".to_string(), "error".to_string()),
131 ("neli".to_string(), "error".to_string()),
132 ("async_nats".to_string(), "error".to_string()),
133 ("rustls".to_string(), "error".to_string()),
134 ("tokenizers".to_string(), "error".to_string()),
135 ("axum".to_string(), "error".to_string()),
136 ("tonic".to_string(), "error".to_string()),
137 ("mistralrs_core".to_string(), "error".to_string()),
138 ("hf_hub".to_string(), "error".to_string()),
139 ("opentelemetry".to_string(), "error".to_string()),
140 ("opentelemetry-otlp".to_string(), "error".to_string()),
141 ("opentelemetry_sdk".to_string(), "error".to_string()),
142 ]),
143 }
144 }
145}
146
147fn otlp_exporter_enabled() -> bool {
149 crate::config::env_is_truthy(OTEL_EXPORT_ENABLED_ENV)
150}
151
152fn get_service_name() -> String {
154 std::env::var(OTEL_SERVICE_NAME_ENV).unwrap_or_else(|_| DEFAULT_OTEL_SERVICE_NAME.to_string())
155}
156
/// Returns `true` when `trace_id` is exactly 32 bytes long and every
/// character is an ASCII hexadecimal digit (either case).
pub fn is_valid_trace_id(trace_id: &str) -> bool {
    const TRACE_ID_HEX_LEN: usize = 32;

    let raw = trace_id.as_bytes();
    raw.len() == TRACE_ID_HEX_LEN && raw.iter().all(u8::is_ascii_hexdigit)
}
162
/// Returns `true` when `span_id` is exactly 16 bytes long and every
/// character is an ASCII hexadecimal digit (either case).
pub fn is_valid_span_id(span_id: &str) -> bool {
    const SPAN_ID_HEX_LEN: usize = 16;

    let raw = span_id.as_bytes();
    raw.len() == SPAN_ID_HEX_LEN && raw.iter().all(u8::is_ascii_hexdigit)
}
168
/// Subscriber layer whose `Layer` implementation in this file stores a
/// `DistributedTraceContext` in the extensions of each span it sees entered.
pub struct DistributedTraceIdLayer;
170
/// Trace identifiers and request metadata attached to a span.
///
/// Instances are inserted into span extensions by `DistributedTraceIdLayer`
/// and read back by `get_distributed_tracing_context` and the JSON formatter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedTraceContext {
    /// Trace identifier; used as the second segment of the `traceparent` value.
    pub trace_id: String,
    /// Identifier of this span; used as the third segment of the `traceparent` value.
    pub span_id: String,
    /// Identifier of the parent span, when one is known. Omitted from
    /// serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<String>,
    /// `tracestate` value carried along with the trace, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tracestate: Option<String>,
    /// Set to `Instant::now()` when the context is created in `on_enter`; never serialized.
    #[serde(skip)]
    start: Option<Instant>,
    /// Set to `Instant::now()` in `on_close`; never serialized.
    #[serde(skip)]
    end: Option<Instant>,
    /// Value of the `x_request_id` span field, if recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_request_id: Option<String>,
    /// Value of the `x_dynamo_request_id` span field, if recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_dynamo_request_id: Option<String>,
}
188
/// Partially resolved trace context stored by `on_new_span`.
///
/// `on_enter` removes it from the span extensions, fills in any missing
/// `trace_id` / `span_id`, and replaces it with a `DistributedTraceContext`.
#[derive(Debug, Clone)]
struct PendingDistributedTraceContext {
    /// Trace id taken from the span fields or the parent span, if any.
    trace_id: Option<String>,
    /// Span id taken from the `span_id` span field, if valid.
    span_id: Option<String>,
    /// Parent span id taken from the span fields or the parent span, if any.
    parent_id: Option<String>,
    /// `tracestate` taken from the span fields or the parent span, if any.
    tracestate: Option<String>,
    /// Value of the `x_request_id` span field, if recorded.
    x_request_id: Option<String>,
    /// Value of the `x_dynamo_request_id` span field, if recorded.
    x_dynamo_request_id: Option<String>,
}
199
200impl DistributedTraceContext {
201 pub fn create_traceparent(&self) -> String {
203 format!("00-{}-{}-01", self.trace_id, self.span_id)
204 }
205}
206
207pub fn parse_traceparent(traceparent: &str) -> (Option<String>, Option<String>) {
209 let pieces: Vec<_> = traceparent.split('-').collect();
210 if pieces.len() != 4 {
211 return (None, None);
212 }
213 let trace_id = pieces[1];
214 let parent_id = pieces[2];
215
216 if !is_valid_trace_id(trace_id) || !is_valid_span_id(parent_id) {
217 return (None, None);
218 }
219
220 (Some(trace_id.to_string()), Some(parent_id.to_string()))
221}
222
/// Trace-related values read from request headers by `TraceParent::from_headers`.
#[derive(Debug, Clone, Default)]
pub struct TraceParent {
    /// Trace id parsed from the `traceparent` header, if present and valid.
    pub trace_id: Option<String>,
    /// Parent span id parsed from the `traceparent` header, if present and valid.
    pub parent_id: Option<String>,
    /// Raw value of the `tracestate` header, if present.
    pub tracestate: Option<String>,
    /// Raw value of the `x-request-id` header, if present.
    pub x_request_id: Option<String>,
    /// Value of the `x-dynamo-request-id` header, kept only when it parses as a UUID.
    pub x_dynamo_request_id: Option<String>,
}
231
/// Minimal header lookup abstraction, implemented in this file for
/// `async_nats::HeaderMap` and `http::HeaderMap`.
pub trait GenericHeaders {
    /// Returns the value stored under `key` as a string slice, or `None`
    /// when no usable value is available.
    fn get(&self, key: &str) -> Option<&str>;
}
235
236impl GenericHeaders for async_nats::HeaderMap {
237 fn get(&self, key: &str) -> Option<&str> {
238 async_nats::HeaderMap::get(self, key).map(|value| value.as_str())
239 }
240}
241
242impl GenericHeaders for http::HeaderMap {
243 fn get(&self, key: &str) -> Option<&str> {
244 http::HeaderMap::get(self, key).and_then(|value| value.to_str().ok())
245 }
246}
247
248impl TraceParent {
249 pub fn from_headers<H: GenericHeaders>(headers: &H) -> TraceParent {
250 let mut trace_id = None;
251 let mut parent_id = None;
252 let mut tracestate = None;
253 let mut x_request_id = None;
254 let mut x_dynamo_request_id = None;
255
256 if let Some(header_value) = headers.get("traceparent") {
257 (trace_id, parent_id) = parse_traceparent(header_value);
258 }
259
260 if let Some(header_value) = headers.get("x-request-id") {
261 x_request_id = Some(header_value.to_string());
262 }
263
264 if let Some(header_value) = headers.get("tracestate") {
265 tracestate = Some(header_value.to_string());
266 }
267
268 if let Some(header_value) = headers.get("x-dynamo-request-id") {
269 x_dynamo_request_id = Some(header_value.to_string());
270 }
271
272 let x_dynamo_request_id =
274 x_dynamo_request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
275 TraceParent {
276 trace_id,
277 parent_id,
278 tracestate,
279 x_request_id,
280 x_dynamo_request_id,
281 }
282 }
283}
284
285pub fn make_request_span<B>(req: &Request<B>) -> Span {
287 let method = req.method();
288 let uri = req.uri();
289 let version = format!("{:?}", req.version());
290 let trace_parent = TraceParent::from_headers(req.headers());
291
292 let span = tracing::info_span!(
293 "http-request",
294 method = %method,
295 uri = %uri,
296 version = %version,
297 trace_id = trace_parent.trace_id,
298 parent_id = trace_parent.parent_id,
299 x_request_id = trace_parent.x_request_id,
300 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
301 );
302
303 span
304}
305
306pub fn make_handle_payload_span(
308 headers: &async_nats::HeaderMap,
309 component: &str,
310 endpoint: &str,
311 namespace: &str,
312 instance_id: u64,
313) -> Span {
314 let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_nats_headers(headers);
315 let trace_parent = TraceParent::from_headers(headers);
316
317 if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
318 let span = tracing::info_span!(
319 "handle_payload",
320 trace_id = trace_id.as_str(),
321 parent_id = parent_id.as_str(),
322 x_request_id = trace_parent.x_request_id,
323 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
324 tracestate = trace_parent.tracestate,
325 component = component,
326 endpoint = endpoint,
327 namespace = namespace,
328 instance_id = instance_id,
329 );
330
331 if let Some(context) = otel_context {
332 let _ = span.set_parent(context);
333 }
334 span
335 } else {
336 tracing::info_span!(
337 "handle_payload",
338 x_request_id = trace_parent.x_request_id,
339 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
340 tracestate = trace_parent.tracestate,
341 component = component,
342 endpoint = endpoint,
343 namespace = namespace,
344 instance_id = instance_id,
345 )
346 }
347}
348
349pub fn extract_otel_context_from_nats_headers(
351 headers: &async_nats::HeaderMap,
352) -> (
353 Option<opentelemetry::Context>,
354 Option<String>,
355 Option<String>,
356) {
357 let traceparent_value = match headers.get("traceparent") {
358 Some(value) => value.as_str(),
359 None => return (None, None, None),
360 };
361
362 let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
363
364 struct NatsHeaderExtractor<'a>(&'a async_nats::HeaderMap);
365
366 impl<'a> Extractor for NatsHeaderExtractor<'a> {
367 fn get(&self, key: &str) -> Option<&str> {
368 self.0.get(key).map(|value| value.as_str())
369 }
370
371 fn keys(&self) -> Vec<&str> {
372 vec!["traceparent", "tracestate"]
373 .into_iter()
374 .filter(|&key| self.0.get(key).is_some())
375 .collect()
376 }
377 }
378
379 let extractor = NatsHeaderExtractor(headers);
380 let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
381 let otel_context = propagator.extract(&extractor);
382
383 let context_with_trace = if otel_context.span().span_context().is_valid() {
384 Some(otel_context)
385 } else {
386 None
387 };
388
389 (context_with_trace, trace_id, parent_span_id)
390}
391
392pub fn inject_otel_context_into_nats_headers(
394 headers: &mut async_nats::HeaderMap,
395 context: Option<opentelemetry::Context>,
396) {
397 let otel_context = context.unwrap_or_else(|| Span::current().context());
398
399 struct NatsHeaderInjector<'a>(&'a mut async_nats::HeaderMap);
400
401 impl<'a> Injector for NatsHeaderInjector<'a> {
402 fn set(&mut self, key: &str, value: String) {
403 self.0.insert(key, value);
404 }
405 }
406
407 let mut injector = NatsHeaderInjector(headers);
408 let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
409 propagator.inject_context(&otel_context, &mut injector);
410}
411
412pub fn inject_current_trace_into_nats_headers(headers: &mut async_nats::HeaderMap) {
414 inject_otel_context_into_nats_headers(headers, None);
415}
416
417pub fn inject_trace_headers_into_map(headers: &mut std::collections::HashMap<String, String>) {
419 if let Some(trace_context) = get_distributed_tracing_context() {
420 headers.insert(
422 "traceparent".to_string(),
423 trace_context.create_traceparent(),
424 );
425
426 if let Some(tracestate) = trace_context.tracestate {
428 headers.insert("tracestate".to_string(), tracestate);
429 }
430
431 if let Some(x_request_id) = trace_context.x_request_id {
433 headers.insert("x-request-id".to_string(), x_request_id);
434 }
435 if let Some(x_dynamo_request_id) = trace_context.x_dynamo_request_id {
436 headers.insert("x-dynamo-request-id".to_string(), x_dynamo_request_id);
437 }
438 }
439}
440
441pub fn make_client_request_span(
443 operation: &str,
444 request_id: &str,
445 trace_context: Option<&DistributedTraceContext>,
446 instance_id: Option<&str>,
447) -> Span {
448 if let Some(ctx) = trace_context {
449 let mut headers = async_nats::HeaderMap::new();
450 headers.insert("traceparent", ctx.create_traceparent());
451
452 if let Some(ref tracestate) = ctx.tracestate {
453 headers.insert("tracestate", tracestate.as_str());
454 }
455
456 let (otel_context, _extracted_trace_id, _extracted_parent_span_id) =
457 extract_otel_context_from_nats_headers(&headers);
458
459 let span = if let Some(inst_id) = instance_id {
460 tracing::info_span!(
461 "client_request",
462 operation = operation,
463 request_id = request_id,
464 instance_id = inst_id,
465 trace_id = ctx.trace_id.as_str(),
466 parent_id = ctx.span_id.as_str(),
467 x_request_id = ctx.x_request_id.as_deref(),
468 x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
469 )
471 } else {
472 tracing::info_span!(
473 "client_request",
474 operation = operation,
475 request_id = request_id,
476 trace_id = ctx.trace_id.as_str(),
477 parent_id = ctx.span_id.as_str(),
478 x_request_id = ctx.x_request_id.as_deref(),
479 x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
480 )
482 };
483
484 if let Some(context) = otel_context {
485 let _ = span.set_parent(context);
486 }
487
488 span
489 } else if let Some(inst_id) = instance_id {
490 tracing::info_span!(
491 "client_request",
492 operation = operation,
493 request_id = request_id,
494 instance_id = inst_id,
495 )
496 } else {
497 tracing::info_span!(
498 "client_request",
499 operation = operation,
500 request_id = request_id,
501 )
502 }
503}
504
505#[derive(Debug, Default)]
506pub struct FieldVisitor {
507 pub fields: HashMap<String, String>,
508}
509
510impl Visit for FieldVisitor {
511 fn record_str(&mut self, field: &Field, value: &str) {
512 self.fields
513 .insert(field.name().to_string(), value.to_string());
514 }
515
516 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
517 self.fields
518 .insert(field.name().to_string(), format!("{:?}", value).to_string());
519 }
520}
521
522impl<S> Layer<S> for DistributedTraceIdLayer
523where
524 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
525{
526 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
529 if let Some(span) = ctx.span(&id) {
530 let mut extensions = span.extensions_mut();
531 if let Some(distributed_tracing_context) =
532 extensions.get_mut::<DistributedTraceContext>()
533 {
534 distributed_tracing_context.end = Some(Instant::now());
535 }
536 }
537 }
538
539 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
542 if let Some(span) = ctx.span(id) {
543 let mut trace_id: Option<String> = None;
544 let mut parent_id: Option<String> = None;
545 let mut span_id: Option<String> = None;
546 let mut x_request_id: Option<String> = None;
547 let mut x_dynamo_request_id: Option<String> = None;
548 let mut tracestate: Option<String> = None;
549 let mut visitor = FieldVisitor::default();
550 attrs.record(&mut visitor);
551
552 if let Some(trace_id_input) = visitor.fields.get("trace_id") {
554 if !is_valid_trace_id(trace_id_input) {
555 tracing::trace!("trace id '{}' is not valid! Ignoring.", trace_id_input);
556 } else {
557 trace_id = Some(trace_id_input.to_string());
558 }
559 }
560
561 if let Some(span_id_input) = visitor.fields.get("span_id") {
563 if !is_valid_span_id(span_id_input) {
564 tracing::trace!("span id '{}' is not valid! Ignoring.", span_id_input);
565 } else {
566 span_id = Some(span_id_input.to_string());
567 }
568 }
569
570 if let Some(parent_id_input) = visitor.fields.get("parent_id") {
572 if !is_valid_span_id(parent_id_input) {
573 tracing::trace!("parent id '{}' is not valid! Ignoring.", parent_id_input);
574 } else {
575 parent_id = Some(parent_id_input.to_string());
576 }
577 }
578
579 if let Some(tracestate_input) = visitor.fields.get("tracestate") {
581 tracestate = Some(tracestate_input.to_string());
582 }
583
584 if let Some(x_request_id_input) = visitor.fields.get("x_request_id") {
586 x_request_id = Some(x_request_id_input.to_string());
587 }
588
589 if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
591 x_dynamo_request_id = Some(x_request_id_input.to_string());
592 }
593
594 if parent_id.is_none()
596 && let Some(parent_span_id) = ctx.current_span().id()
597 && let Some(parent_span) = ctx.span(parent_span_id)
598 {
599 let parent_ext = parent_span.extensions();
600 if let Some(parent_tracing_context) = parent_ext.get::<DistributedTraceContext>() {
601 trace_id = Some(parent_tracing_context.trace_id.clone());
602 parent_id = Some(parent_tracing_context.span_id.clone());
603 tracestate = parent_tracing_context.tracestate.clone();
604 }
605 }
606
607 if (parent_id.is_some() || span_id.is_some()) && trace_id.is_none() {
609 tracing::error!("parent id or span id are set but trace id is not set!");
610 parent_id = None;
612 span_id = None;
613 }
614
615 let mut extensions = span.extensions_mut();
617 extensions.insert(PendingDistributedTraceContext {
618 trace_id,
619 span_id,
620 parent_id,
621 tracestate,
622 x_request_id,
623 x_dynamo_request_id,
624 });
625 }
626 }
627
628 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
631 if let Some(span) = ctx.span(id) {
632 {
634 let extensions = span.extensions();
635 if extensions.get::<DistributedTraceContext>().is_some() {
636 return;
637 }
638 }
639
640 let mut extensions = span.extensions_mut();
642 let pending = match extensions.remove::<PendingDistributedTraceContext>() {
643 Some(p) => p,
644 None => {
645 tracing::error!("PendingDistributedTraceContext not found in on_enter");
647 return;
648 }
649 };
650
651 let mut trace_id = pending.trace_id;
652 let mut span_id = pending.span_id;
653 let parent_id = pending.parent_id;
654 let tracestate = pending.tracestate;
655 let x_request_id = pending.x_request_id;
656 let x_dynamo_request_id = pending.x_dynamo_request_id;
657
658 drop(extensions);
661
662 if trace_id.is_none() || span_id.is_none() {
663 let extensions = span.extensions();
664 if let Some(otel_data) = extensions.get::<tracing_opentelemetry::OtelData>() {
665 if trace_id.is_none()
667 && let Some(otel_trace_id) = otel_data.trace_id()
668 {
669 let trace_id_str = format!("{}", otel_trace_id);
670 if is_valid_trace_id(&trace_id_str) {
671 trace_id = Some(trace_id_str);
672 }
673 }
674
675 if span_id.is_none()
677 && let Some(otel_span_id) = otel_data.span_id()
678 {
679 let span_id_str = format!("{}", otel_span_id);
680 if is_valid_span_id(&span_id_str) {
681 span_id = Some(span_id_str);
682 }
683 }
684 }
685 }
686
687 if trace_id.is_none() {
689 panic!(
690 "trace_id is not set in on_enter - OtelData may not be properly initialized"
691 );
692 }
693
694 if span_id.is_none() {
695 panic!("span_id is not set in on_enter - OtelData may not be properly initialized");
696 }
697
698 let mut extensions = span.extensions_mut();
700 extensions.insert(DistributedTraceContext {
701 trace_id: trace_id.expect("Trace ID must be set"),
702 span_id: span_id.expect("Span ID must be set"),
703 parent_id,
704 tracestate,
705 start: Some(Instant::now()),
706 end: None,
707 x_request_id,
708 x_dynamo_request_id,
709 });
710 }
711 }
712}
713
714pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
717 Span::current()
718 .with_subscriber(|(id, subscriber)| {
719 subscriber
720 .downcast_ref::<Registry>()
721 .and_then(|registry| registry.span_data(id))
722 .and_then(|span_data| {
723 let extensions = span_data.extensions();
724 extensions.get::<DistributedTraceContext>().cloned()
725 })
726 })
727 .flatten()
728}
729
730pub fn init() {
732 INIT.call_once(|| {
733 if let Err(e) = setup_logging() {
734 eprintln!("Failed to initialize logging: {}", e);
735 std::process::exit(1);
736 }
737 });
738}
739
/// Installs the global subscriber when the `tokio-console` feature is on.
///
/// Two layers are registered: a compact stderr formatter filtered by the
/// configured log filters, and a `console_subscriber` layer restricted to the
/// `runtime` and `tokio` targets.
///
/// Returns the same `Result` type as the non-console variant so that `init`
/// can handle both uniformly. Previously this variant returned `()`, which
/// did not type-check against `init`'s `if let Err(e) = setup_logging()`.
#[cfg(feature = "tokio-console")]
fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
    // NOTE(review): the console server is bound to 0.0.0.0, i.e. all
    // interfaces — confirm this exposure is intended.
    let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
        .with_default_env()
        .server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
        .spawn();
    let tokio_console_target = tracing_subscriber::filter::Targets::new()
        .with_default(LevelFilter::ERROR)
        .with_target("runtime", LevelFilter::TRACE)
        .with_target("tokio", LevelFilter::TRACE);
    let l = fmt::layer()
        .with_ansi(!disable_ansi_logging())
        .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
        .with_writer(std::io::stderr)
        .with_filter(filters(load_config()));
    tracing_subscriber::registry()
        .with(l)
        .with(tokio_console_layer.with_filter(tokio_console_target))
        .init();

    Ok(())
}
760
761#[cfg(not(feature = "tokio-console"))]
762fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
763 let fmt_filter_layer = filters(load_config());
764 let trace_filter_layer = filters(load_config());
765 let otel_filter_layer = filters(load_config());
766
767 if jsonl_logging_enabled() {
768 let l = fmt::layer()
769 .with_ansi(false)
770 .event_format(CustomJsonFormatter::new())
771 .with_writer(std::io::stderr)
772 .with_filter(fmt_filter_layer);
773
774 let service_name = get_service_name();
776
777 let (tracer_provider, endpoint_opt) = if otlp_exporter_enabled() {
779 let endpoint = std::env::var(OTEL_EXPORT_ENDPOINT_ENV)
781 .unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string());
782
783 let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
785 .with_tonic()
786 .with_endpoint(&endpoint)
787 .build()?;
788
789 let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
791 .with_batch_exporter(otlp_exporter)
792 .with_resource(
793 opentelemetry_sdk::Resource::builder_empty()
794 .with_service_name(service_name.clone())
795 .build(),
796 )
797 .build();
798
799 (provider, Some(endpoint))
800 } else {
801 let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
803 .with_resource(
804 opentelemetry_sdk::Resource::builder_empty()
805 .with_service_name(service_name.clone())
806 .build(),
807 )
808 .build();
809
810 (provider, None)
811 };
812
813 let tracer = tracer_provider.tracer(service_name.clone());
815
816 tracing_subscriber::registry()
817 .with(
818 tracing_opentelemetry::layer()
819 .with_tracer(tracer)
820 .with_filter(otel_filter_layer),
821 )
822 .with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
823 .with(l)
824 .init();
825
826 if let Some(endpoint) = endpoint_opt {
828 tracing::info!(
829 endpoint = %endpoint,
830 service = %service_name,
831 "OpenTelemetry OTLP export enabled"
832 );
833 } else {
834 tracing::info!(
835 service = %service_name,
836 "OpenTelemetry OTLP export disabled, traces local only"
837 );
838 }
839 } else {
840 let l = fmt::layer()
841 .with_ansi(!disable_ansi_logging())
842 .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
843 .with_writer(std::io::stderr)
844 .with_filter(fmt_filter_layer);
845
846 tracing_subscriber::registry().with(l).init();
847 }
848
849 Ok(())
850}
851
852fn filters(config: LoggingConfig) -> EnvFilter {
853 let mut filter_layer = EnvFilter::builder()
854 .with_default_directive(config.log_level.parse().unwrap())
855 .with_env_var(FILTER_ENV)
856 .from_env_lossy();
857
858 for (module, level) in config.log_filters {
859 match format!("{module}={level}").parse::<Directive>() {
860 Ok(d) => {
861 filter_layer = filter_layer.add_directive(d);
862 }
863 Err(e) => {
864 eprintln!("Failed parsing filter '{level}' for module '{module}': {e}");
865 }
866 }
867 }
868 filter_layer
869}
870
871pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) {
874 let level = match level {
875 "debug" => log::Level::Debug,
876 "info" => log::Level::Info,
877 "warn" => log::Level::Warn,
878 "error" => log::Level::Error,
879 "warning" => log::Level::Warn,
880 _ => log::Level::Info,
881 };
882 log::logger().log(
883 &log::Record::builder()
884 .args(format_args!("{}", message))
885 .level(level)
886 .target(module)
887 .file(Some(file))
888 .line(Some(line))
889 .build(),
890 );
891}
892
893fn load_config() -> LoggingConfig {
894 let config_path = std::env::var(CONFIG_PATH_ENV).unwrap_or_else(|_| "".to_string());
895 let figment = Figment::new()
896 .merge(Serialized::defaults(LoggingConfig::default()))
897 .merge(Toml::file("/opt/dynamo/etc/logging.toml"))
898 .merge(Toml::file(config_path));
899
900 figment.extract().unwrap()
901}
902
/// One JSON log line as serialized by `CustomJsonFormatter`.
#[derive(Serialize)]
struct JsonLog<'a> {
    /// Timestamp string produced by `TimeFormatter::format_now`.
    time: String,
    /// Event level rendered as a string.
    level: String,
    /// Source file from the event metadata; omitted when unavailable.
    #[serde(skip_serializing_if = "Option::is_none")]
    file: Option<&'a str>,
    /// Source line from the event metadata; omitted when unavailable.
    #[serde(skip_serializing_if = "Option::is_none")]
    line: Option<u32>,
    /// Target from the event metadata.
    target: &'a str,
    /// The event's `message` field (empty string when the event has none).
    message: serde_json::Value,
    /// All remaining event and span fields, flattened into the top-level object.
    #[serde(flatten)]
    fields: BTreeMap<String, serde_json::Value>,
}
916
917struct TimeFormatter {
918 use_local_tz: bool,
919}
920
921impl TimeFormatter {
922 fn new() -> Self {
923 Self {
924 use_local_tz: crate::config::use_local_timezone(),
925 }
926 }
927
928 fn format_now(&self) -> String {
929 if self.use_local_tz {
930 chrono::Local::now()
931 .format("%Y-%m-%dT%H:%M:%S%.6f%:z")
932 .to_string()
933 } else {
934 chrono::Utc::now()
935 .format("%Y-%m-%dT%H:%M:%S%.6fZ")
936 .to_string()
937 }
938 }
939}
940
941impl FormatTime for TimeFormatter {
942 fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
943 write!(w, "{}", self.format_now())
944 }
945}
946
947struct CustomJsonFormatter {
948 time_formatter: TimeFormatter,
949}
950
951impl CustomJsonFormatter {
952 fn new() -> Self {
953 Self {
954 time_formatter: TimeFormatter::new(),
955 }
956 }
957}
958
959use once_cell::sync::Lazy;
960use regex::Regex;
961fn parse_tracing_duration(s: &str) -> Option<u64> {
962 static RE: Lazy<Regex> =
963 Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
964 let captures = RE.captures(s)?;
965 let value: f64 = captures[1].parse().ok()?;
966 let unit = &captures[2];
967 match unit {
968 "ns" => Some((value / 1000.0) as u64),
969 "µs" | "us" => Some(value as u64),
970 "ms" => Some((value * 1000.0) as u64),
971 "s" => Some((value * 1_000_000.0) as u64),
972 _ => None,
973 }
974}
975
976impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
977where
978 S: Subscriber + for<'a> LookupSpan<'a>,
979 N: for<'a> FormatFields<'a> + 'static,
980{
981 fn format_event(
982 &self,
983 ctx: &FmtContext<'_, S, N>,
984 mut writer: Writer<'_>,
985 event: &Event<'_>,
986 ) -> std::fmt::Result {
987 let mut visitor = JsonVisitor::default();
988 let time = self.time_formatter.format_now();
989 event.record(&mut visitor);
990 let mut message = visitor
991 .fields
992 .remove("message")
993 .unwrap_or(serde_json::Value::String("".to_string()));
994
995 let current_span = event
996 .parent()
997 .and_then(|id| ctx.span(id))
998 .or_else(|| ctx.lookup_current());
999 if let Some(span) = current_span {
1000 let ext = span.extensions();
1001 let data = ext.get::<FormattedFields<N>>().unwrap();
1002 let span_fields: Vec<(&str, &str)> = data
1003 .fields
1004 .split(' ')
1005 .filter_map(|entry| entry.split_once('='))
1006 .collect();
1007 for (name, value) in span_fields {
1008 visitor.fields.insert(
1009 name.to_string(),
1010 serde_json::Value::String(value.trim_matches('"').to_string()),
1011 );
1012 }
1013
1014 let busy_us = visitor
1015 .fields
1016 .remove("time.busy")
1017 .and_then(|v| parse_tracing_duration(&v.to_string()));
1018 let idle_us = visitor
1019 .fields
1020 .remove("time.idle")
1021 .and_then(|v| parse_tracing_duration(&v.to_string()));
1022
1023 if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
1024 visitor.fields.insert(
1025 "time.busy_us".to_string(),
1026 serde_json::Value::Number(busy_us.into()),
1027 );
1028 visitor.fields.insert(
1029 "time.idle_us".to_string(),
1030 serde_json::Value::Number(idle_us.into()),
1031 );
1032 visitor.fields.insert(
1033 "time.duration_us".to_string(),
1034 serde_json::Value::Number((busy_us + idle_us).into()),
1035 );
1036 }
1037
1038 message = match message.as_str() {
1039 Some("new") => serde_json::Value::String("SPAN_CREATED".to_string()),
1040 Some("close") => serde_json::Value::String("SPAN_CLOSED".to_string()),
1041 _ => message.clone(),
1042 };
1043
1044 visitor.fields.insert(
1045 "span_name".to_string(),
1046 serde_json::Value::String(span.name().to_string()),
1047 );
1048
1049 if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
1050 visitor.fields.insert(
1051 "span_id".to_string(),
1052 serde_json::Value::String(tracing_context.span_id.clone()),
1053 );
1054 visitor.fields.insert(
1055 "trace_id".to_string(),
1056 serde_json::Value::String(tracing_context.trace_id.clone()),
1057 );
1058 if let Some(parent_id) = tracing_context.parent_id.clone() {
1059 visitor.fields.insert(
1060 "parent_id".to_string(),
1061 serde_json::Value::String(parent_id),
1062 );
1063 } else {
1064 visitor.fields.remove("parent_id");
1065 }
1066 if let Some(tracestate) = tracing_context.tracestate.clone() {
1067 visitor.fields.insert(
1068 "tracestate".to_string(),
1069 serde_json::Value::String(tracestate),
1070 );
1071 } else {
1072 visitor.fields.remove("tracestate");
1073 }
1074 if let Some(x_request_id) = tracing_context.x_request_id.clone() {
1075 visitor.fields.insert(
1076 "x_request_id".to_string(),
1077 serde_json::Value::String(x_request_id),
1078 );
1079 } else {
1080 visitor.fields.remove("x_request_id");
1081 }
1082
1083 if let Some(x_dynamo_request_id) = tracing_context.x_dynamo_request_id.clone() {
1084 visitor.fields.insert(
1085 "x_dynamo_request_id".to_string(),
1086 serde_json::Value::String(x_dynamo_request_id),
1087 );
1088 } else {
1089 visitor.fields.remove("x_dynamo_request_id");
1090 }
1091 } else {
1092 tracing::error!(
1093 "Distributed Trace Context not found, falling back to internal ids"
1094 );
1095 visitor.fields.insert(
1096 "span_id".to_string(),
1097 serde_json::Value::String(span.id().into_u64().to_string()),
1098 );
1099 if let Some(parent) = span.parent() {
1100 visitor.fields.insert(
1101 "parent_id".to_string(),
1102 serde_json::Value::String(parent.id().into_u64().to_string()),
1103 );
1104 }
1105 }
1106 } else {
1107 let reserved_fields = [
1108 "trace_id",
1109 "span_id",
1110 "parent_id",
1111 "span_name",
1112 "tracestate",
1113 ];
1114 for reserved_field in reserved_fields {
1115 visitor.fields.remove(reserved_field);
1116 }
1117 }
1118 let metadata = event.metadata();
1119 let log = JsonLog {
1120 level: metadata.level().to_string(),
1121 time,
1122 file: metadata.file(),
1123 line: metadata.line(),
1124 target: metadata.target(),
1125 message,
1126 fields: visitor.fields,
1127 };
1128 let json = serde_json::to_string(&log).unwrap();
1129 writeln!(writer, "{json}")
1130 }
1131}
1132
1133#[derive(Default)]
1134struct JsonVisitor {
1135 fields: BTreeMap<String, serde_json::Value>,
1136}
1137
1138impl tracing::field::Visit for JsonVisitor {
1139 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1140 self.fields.insert(
1141 field.name().to_string(),
1142 serde_json::Value::String(format!("{value:?}")),
1143 );
1144 }
1145
1146 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1147 if field.name() != "message" {
1148 match serde_json::from_str::<Value>(value) {
1149 Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
1150 Err(_) => self.fields.insert(field.name().to_string(), value.into()),
1151 };
1152 } else {
1153 self.fields.insert(field.name().to_string(), value.into());
1154 }
1155 }
1156
1157 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
1158 self.fields
1159 .insert(field.name().to_string(), serde_json::Value::Bool(value));
1160 }
1161
1162 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
1163 self.fields.insert(
1164 field.name().to_string(),
1165 serde_json::Value::Number(value.into()),
1166 );
1167 }
1168
1169 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
1170 self.fields.insert(
1171 field.name().to_string(),
1172 serde_json::Value::Number(value.into()),
1173 );
1174 }
1175
1176 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
1177 use serde_json::value::Number;
1178 self.fields.insert(
1179 field.name().to_string(),
1180 serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
1181 );
1182 }
1183}
1184
1185#[cfg(test)]
1186pub mod tests {
1187 use super::*;
1188 use anyhow::{Result, anyhow};
1189 use chrono::{DateTime, Utc};
1190 use jsonschema::{Draft, JSONSchema};
1191 use serde_json::Value;
1192 use std::fs::File;
1193 use std::io::{BufRead, BufReader};
1194 use stdio_override::*;
1195 use tempfile::NamedTempFile;
1196
/// JSON Schema (draft-07) that every emitted JSONL log line must satisfy.
///
/// `file`, `level`, `line`, `message`, `target` and `time` are mandatory.
/// The tracing-related properties are optional but, when present, must have
/// the listed type and pattern. Unknown properties are allowed.
static LOG_LINE_SCHEMA: &str = r#"
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Runtime Log Line",
  "type": "object",
  "required": [
    "file",
    "level",
    "line",
    "message",
    "target",
    "time"
  ],
  "properties": {
    "file": { "type": "string" },
    "level": { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
    "line": { "type": "integer" },
    "message": { "type": "string" },
    "target": { "type": "string" },
    "time": { "type": "string", "format": "date-time" },
    "span_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
    "parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
    "trace_id": { "type": "string", "pattern": "^[a-f0-9]{32}$" },
    "span_name": { "type": "string" },
    "time.busy_us": { "type": "integer" },
    "time.duration_us": { "type": "integer" },
    "time.idle_us": { "type": "integer" },
    "tracestate": { "type": "string" }
  },
  "additionalProperties": true
}
"#;
1229
1230 #[tracing::instrument(skip_all)]
1231 async fn parent() {
1232 tracing::trace!(message = "parent!");
1233 if let Some(my_ctx) = get_distributed_tracing_context() {
1234 tracing::info!(my_trace_id = my_ctx.trace_id);
1235 }
1236 child().await;
1237 }
1238
1239 #[tracing::instrument(skip_all)]
1240 async fn child() {
1241 tracing::trace!(message = "child");
1242 if let Some(my_ctx) = get_distributed_tracing_context() {
1243 tracing::info!(my_trace_id = my_ctx.trace_id);
1244 }
1245 grandchild().await;
1246 }
1247
1248 #[tracing::instrument(skip_all)]
1249 async fn grandchild() {
1250 tracing::trace!(message = "grandchild");
1251 if let Some(my_ctx) = get_distributed_tracing_context() {
1252 tracing::info!(my_trace_id = my_ctx.trace_id);
1253 }
1254 }
1255
1256 pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
1257 let schema_json: Value =
1258 serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
1259 let compiled_schema = JSONSchema::options()
1260 .with_draft(Draft::Draft7)
1261 .compile(&schema_json)
1262 .expect("Invalid schema");
1263
1264 let f = File::open(file_name)?;
1265 let reader = BufReader::new(f);
1266 let mut result = Vec::new();
1267
1268 for (line_num, line) in reader.lines().enumerate() {
1269 let line = line?;
1270 let val: Value = serde_json::from_str(&line)
1271 .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_num + 1, e))?;
1272
1273 if let Err(errors) = compiled_schema.validate(&val) {
1274 let errs = errors.map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
1275 return Err(anyhow!(
1276 "Line {}: JSON Schema Validation errors: {}",
1277 line_num + 1,
1278 errs
1279 ));
1280 }
1281 println!("{}", val);
1282 result.push(val);
1283 }
1284 Ok(result)
1285 }
1286
1287 #[tokio::test]
1288 async fn test_json_log_capture() -> Result<()> {
1289 #[allow(clippy::redundant_closure_call)]
1290 let _ = temp_env::async_with_vars(
1291 [("DYN_LOGGING_JSONL", Some("1"))],
1292 (async || {
1293 let tmp_file = NamedTempFile::new().unwrap();
1294 let file_name = tmp_file.path().to_str().unwrap();
1295 let guard = StderrOverride::from_file(file_name)?;
1296 init();
1297 parent().await;
1298 drop(guard);
1299
1300 let lines = load_log(file_name)?;
1301
1302 let trace_id = lines
1306 .iter()
1307 .find_map(|log_line| log_line.get("trace_id").and_then(|v| v.as_str()))
1308 .expect("At least one log line should have a trace_id")
1309 .to_string();
1310
1311 assert_ne!(
1313 trace_id, "00000000000000000000000000000000",
1314 "trace_id should not be a zero/invalid ID"
1315 );
1316 assert!(
1317 !trace_id.chars().all(|c| c == '0'),
1318 "trace_id should not be all zeros"
1319 );
1320
1321 for log_line in &lines {
1323 if let Some(line_trace_id) = log_line.get("trace_id") {
1324 assert_eq!(
1325 line_trace_id.as_str().unwrap(),
1326 &trace_id,
1327 "All logs should have the same trace_id"
1328 );
1329 }
1330 }
1331
1332 for log_line in &lines {
1334 if let Some(my_trace_id) = log_line.get("my_trace_id") {
1335 assert_eq!(
1336 my_trace_id,
1337 &serde_json::Value::String(trace_id.clone()),
1338 "my_trace_id should match the trace_id from distributed tracing context"
1339 );
1340 }
1341 }
1342
1343 let mut span_ids_seen: std::collections::HashSet<String> = std::collections::HashSet::new();
1345 let mut span_timestamps: std::collections::HashMap<String, DateTime<Utc>> = std::collections::HashMap::new();
1346
1347 for log_line in &lines {
1348 if let Some(span_id) = log_line.get("span_id") {
1349 let span_id_str = span_id.as_str().unwrap();
1350 assert!(
1351 is_valid_span_id(span_id_str),
1352 "Invalid span_id format: {}",
1353 span_id_str
1354 );
1355 span_ids_seen.insert(span_id_str.to_string());
1356 }
1357
1358 if let Some(time_str) = log_line.get("time").and_then(|v| v.as_str()) {
1360 let timestamp = DateTime::parse_from_rfc3339(time_str)
1361 .expect("All timestamps should be valid RFC3339 format")
1362 .with_timezone(&Utc);
1363
1364 if let Some(span_name) = log_line.get("span_name").and_then(|v| v.as_str()) {
1366 span_timestamps.insert(span_name.to_string(), timestamp);
1367 }
1368 }
1369 }
1370
1371 let parent_span_id = lines
1374 .iter()
1375 .find(|log_line| {
1376 log_line.get("span_name")
1377 .and_then(|v| v.as_str()) == Some("parent")
1378 })
1379 .and_then(|log_line| {
1380 log_line.get("span_id")
1381 .and_then(|v| v.as_str())
1382 .map(|s| s.to_string())
1383 })
1384 .expect("Should find parent span with span_id");
1385
1386 let child_span_id = lines
1387 .iter()
1388 .find(|log_line| {
1389 log_line.get("span_name")
1390 .and_then(|v| v.as_str()) == Some("child")
1391 })
1392 .and_then(|log_line| {
1393 log_line.get("span_id")
1394 .and_then(|v| v.as_str())
1395 .map(|s| s.to_string())
1396 })
1397 .expect("Should find child span with span_id");
1398
1399 let grandchild_span_id = lines
1400 .iter()
1401 .find(|log_line| {
1402 log_line.get("span_name")
1403 .and_then(|v| v.as_str()) == Some("grandchild")
1404 })
1405 .and_then(|log_line| {
1406 log_line.get("span_id")
1407 .and_then(|v| v.as_str())
1408 .map(|s| s.to_string())
1409 })
1410 .expect("Should find grandchild span with span_id");
1411
1412 assert_ne!(parent_span_id, child_span_id, "Parent and child should have different span IDs");
1414 assert_ne!(child_span_id, grandchild_span_id, "Child and grandchild should have different span IDs");
1415 assert_ne!(parent_span_id, grandchild_span_id, "Parent and grandchild should have different span IDs");
1416
1417 for log_line in &lines {
1419 if let Some(span_name) = log_line.get("span_name")
1420 && let Some(span_name_str) = span_name.as_str()
1421 && span_name_str == "parent"
1422 {
1423 assert!(
1424 log_line.get("parent_id").is_none(),
1425 "Parent span should not have a parent_id"
1426 );
1427 }
1428 }
1429
1430 for log_line in &lines {
1432 if let Some(span_name) = log_line.get("span_name")
1433 && let Some(span_name_str) = span_name.as_str()
1434 && span_name_str == "child"
1435 {
1436 let parent_id = log_line.get("parent_id")
1437 .and_then(|v| v.as_str())
1438 .expect("Child span should have a parent_id");
1439 assert_eq!(
1440 parent_id,
1441 parent_span_id,
1442 "Child's parent_id should match parent's span_id"
1443 );
1444 }
1445 }
1446
1447 for log_line in &lines {
1449 if let Some(span_name) = log_line.get("span_name")
1450 && let Some(span_name_str) = span_name.as_str()
1451 && span_name_str == "grandchild"
1452 {
1453 let parent_id = log_line.get("parent_id")
1454 .and_then(|v| v.as_str())
1455 .expect("Grandchild span should have a parent_id");
1456 assert_eq!(
1457 parent_id,
1458 child_span_id,
1459 "Grandchild's parent_id should match child's span_id"
1460 );
1461 }
1462 }
1463
1464 let parent_time = span_timestamps.get("parent")
1466 .expect("Should have timestamp for parent span");
1467 let child_time = span_timestamps.get("child")
1468 .expect("Should have timestamp for child span");
1469 let grandchild_time = span_timestamps.get("grandchild")
1470 .expect("Should have timestamp for grandchild span");
1471
1472 assert!(
1474 parent_time <= child_time,
1475 "Parent span should log before or at same time as child span (parent: {}, child: {})",
1476 parent_time,
1477 child_time
1478 );
1479 assert!(
1480 child_time <= grandchild_time,
1481 "Child span should log before or at same time as grandchild span (child: {}, grandchild: {})",
1482 child_time,
1483 grandchild_time
1484 );
1485
1486 Ok::<(), anyhow::Error>(())
1487 })(),
1488 )
1489 .await;
1490 Ok(())
1491 }
1492}