1use std::collections::{BTreeMap, HashMap};
30use std::sync::Once;
31
32use figment::{
33 Figment,
34 providers::{Format, Serialized, Toml},
35};
36use serde::{Deserialize, Serialize};
37use tracing::level_filters::LevelFilter;
38use tracing::{Event, Subscriber};
39use tracing_subscriber::EnvFilter;
40use tracing_subscriber::fmt::time::FormatTime;
41use tracing_subscriber::fmt::time::LocalTime;
42use tracing_subscriber::fmt::time::SystemTime;
43use tracing_subscriber::fmt::time::UtcTime;
44use tracing_subscriber::fmt::{FmtContext, FormatFields};
45use tracing_subscriber::fmt::{FormattedFields, format::Writer};
46use tracing_subscriber::prelude::*;
47use tracing_subscriber::registry::LookupSpan;
48use tracing_subscriber::{filter::Directive, fmt};
49
50use crate::config::{disable_ansi_logging, jsonl_logging_enabled};
51use async_nats::{HeaderMap, HeaderValue};
52use axum::extract::FromRequestParts;
53use axum::http;
54use axum::http::Request;
55use axum::http::request::Parts;
56use serde_json::Value;
57use std::convert::Infallible;
58use std::time::Instant;
59use tower_http::trace::{DefaultMakeSpan, TraceLayer};
60use tracing::Id;
61use tracing::Span;
62use tracing::field::Field;
63use tracing::span;
64use tracing_subscriber::Layer;
65use tracing_subscriber::Registry;
66use tracing_subscriber::field::Visit;
67use tracing_subscriber::fmt::format::FmtSpan;
68use tracing_subscriber::layer::Context;
69use tracing_subscriber::registry::SpanData;
70use uuid::Uuid;
71
72use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
73use opentelemetry::trace::TraceContextExt;
74use opentelemetry::{global, trace::Tracer};
75use opentelemetry_otlp::WithExportConfig;
76
77use opentelemetry::trace::TracerProvider as _;
78use opentelemetry::{Key, KeyValue};
79use opentelemetry_sdk::Resource;
80use opentelemetry_sdk::trace::SdkTracerProvider;
81use tracing::error;
82use tracing_subscriber::layer::SubscriberExt;
83use std::time::Duration;
86use tracing::{info, instrument};
87use tracing_opentelemetry::OpenTelemetrySpanExt;
88use tracing_subscriber::util::SubscriberInitExt;
89
/// Environment variable holding `EnvFilter` directives (passed to `EnvFilter::with_env_var`).
const FILTER_ENV: &str = "DYN_LOG";

/// Default value of `LoggingConfig::log_level`.
const DEFAULT_FILTER_LEVEL: &str = "info";

/// Environment variable naming an optional TOML logging-config file merged by `load_config`.
const CONFIG_PATH_ENV: &str = "DYN_LOGGING_CONFIG_PATH";

/// OTLP span export is enabled only when this variable is exactly `"1"`.
const OTEL_EXPORT_ENABLED_ENV: &str = "OTEL_EXPORT_ENABLED";

/// Environment variable overriding the OTLP exporter endpoint.
const OTEL_EXPORT_ENDPOINT_ENV: &str = "OTEL_EXPORT_ENDPOINT";

/// OTLP endpoint used when `OTEL_EXPORT_ENDPOINT` is not set.
const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";

/// Environment variable overriding the OpenTelemetry service name.
const OTEL_SERVICE_NAME_ENV: &str = "OTEL_SERVICE_NAME";

/// Service name used when `OTEL_SERVICE_NAME` is not set.
const DEFAULT_OTEL_SERVICE_NAME: &str = "dynamo";

/// Guards `init` so the global subscriber is installed at most once per process.
static INIT: Once = Once::new();
116
/// Logging configuration, assembled by `load_config` from built-in defaults and TOML files.
#[derive(Serialize, Deserialize, Debug)]
struct LoggingConfig {
    /// Level (e.g. `info`) installed as the `EnvFilter` default directive.
    log_level: String,
    /// Per-target levels; each entry is added as a `target=level` filter directive.
    log_filters: HashMap<String, String>,
}
122impl Default for LoggingConfig {
123 fn default() -> Self {
124 LoggingConfig {
125 log_level: DEFAULT_FILTER_LEVEL.to_string(),
126 log_filters: HashMap::from([
127 ("h2".to_string(), "error".to_string()),
128 ("tower".to_string(), "error".to_string()),
129 ("hyper_util".to_string(), "error".to_string()),
130 ("neli".to_string(), "error".to_string()),
131 ("async_nats".to_string(), "error".to_string()),
132 ("rustls".to_string(), "error".to_string()),
133 ("tokenizers".to_string(), "error".to_string()),
134 ("axum".to_string(), "error".to_string()),
135 ("tonic".to_string(), "error".to_string()),
136 ("mistralrs_core".to_string(), "error".to_string()),
137 ("hf_hub".to_string(), "error".to_string()),
138 ("opentelemetry".to_string(), "error".to_string()),
139 ("opentelemetry-otlp".to_string(), "error".to_string()),
140 ("opentelemetry_sdk".to_string(), "error".to_string()),
141 ]),
142 }
143 }
144}
145
146fn otlp_exporter_enabled() -> bool {
148 std::env::var(OTEL_EXPORT_ENABLED_ENV)
149 .map(|v| v == "1")
150 .unwrap_or(false)
151}
152
153fn get_service_name() -> String {
155 std::env::var(OTEL_SERVICE_NAME_ENV).unwrap_or_else(|_| DEFAULT_OTEL_SERVICE_NAME.to_string())
156}
157
/// Returns `true` if `trace_id` is exactly 32 ASCII hex digits (either case).
pub fn is_valid_trace_id(trace_id: &str) -> bool {
    const TRACE_ID_HEX_LEN: usize = 32;
    // Byte-wise check: any non-ASCII character contains a non-hex byte, so the result
    // is identical to checking characters.
    trace_id.len() == TRACE_ID_HEX_LEN && trace_id.bytes().all(|b| b.is_ascii_hexdigit())
}
163
/// Returns `true` if `span_id` is exactly 16 ASCII hex digits (either case).
pub fn is_valid_span_id(span_id: &str) -> bool {
    const SPAN_ID_HEX_LEN: usize = 16;
    // Byte-wise check: any non-ASCII character contains a non-hex byte, so the result
    // is identical to checking characters.
    span_id.len() == SPAN_ID_HEX_LEN && span_id.bytes().all(|b| b.is_ascii_hexdigit())
}
169
/// `tracing_subscriber` layer that attaches a [`DistributedTraceContext`] to spans.
///
/// Ids come from span fields (`trace_id`, `span_id`, `parent_id`, ...) or, when those are
/// missing, from `tracing_opentelemetry::OtelData`.
pub struct DistributedTraceIdLayer;
171
/// W3C-style trace identifiers stored in a span's extensions by `DistributedTraceIdLayer`
/// (inserted when the span is first entered).
///
/// Serializes with serde: `start`/`end` are never serialized, and `None` optionals are omitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedTraceContext {
    /// Trace id; the layer only stores values that pass `is_valid_trace_id`.
    pub trace_id: String,
    /// Id of this span; the layer only stores values that pass `is_valid_span_id`.
    pub span_id: String,
    /// Span id of the parent (remote or local), if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<String>,
    /// Raw W3C `tracestate` value carried along with the trace.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tracestate: Option<String>,
    /// Set when the layer first enters the span.
    #[serde(skip)]
    start: Option<Instant>,
    /// Set by the layer's `on_close`.
    #[serde(skip)]
    end: Option<Instant>,
    /// Value of the span's `x_request_id` field, if recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_request_id: Option<String>,
    /// Value of the span's `x_dynamo_request_id` field, if recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_dynamo_request_id: Option<String>,
}
189
/// Ids captured by `DistributedTraceIdLayer::on_new_span`.
///
/// Converted into a [`DistributedTraceContext`] on the span's first `on_enter`, where missing
/// ids may still be filled in from OpenTelemetry data.
#[derive(Debug, Clone)]
struct PendingDistributedTraceContext {
    trace_id: Option<String>,
    span_id: Option<String>,
    parent_id: Option<String>,
    tracestate: Option<String>,
    x_request_id: Option<String>,
    x_dynamo_request_id: Option<String>,
}
200
201impl DistributedTraceContext {
202 pub fn create_traceparent(&self) -> String {
204 format!("00-{}-{}-01", self.trace_id, self.span_id)
205 }
206}
207
208pub fn parse_traceparent(traceparent: &str) -> (Option<String>, Option<String>) {
210 let pieces: Vec<_> = traceparent.split('-').collect();
211 if pieces.len() != 4 {
212 return (None, None);
213 }
214 let trace_id = pieces[1];
215 let parent_id = pieces[2];
216
217 if !is_valid_trace_id(trace_id) || !is_valid_span_id(parent_id) {
218 return (None, None);
219 }
220
221 (Some(trace_id.to_string()), Some(parent_id.to_string()))
222}
223
/// Trace-related values read from request headers by [`TraceParent::from_headers`].
#[derive(Debug, Clone, Default)]
pub struct TraceParent {
    /// Trace id from a valid `traceparent` header.
    pub trace_id: Option<String>,
    /// Parent span id from a valid `traceparent` header.
    pub parent_id: Option<String>,
    /// Raw `tracestate` header value.
    pub tracestate: Option<String>,
    /// Raw `x-request-id` header value.
    pub x_request_id: Option<String>,
    /// `x-dynamo-request-id` header value, kept only if it parses as a UUID.
    pub x_dynamo_request_id: Option<String>,
}
232
/// Minimal read-only header lookup, implemented for NATS and HTTP header maps.
pub trait GenericHeaders {
    /// Returns header `key` as text, or `None` when absent (or, for HTTP, not representable as `&str`).
    fn get(&self, key: &str) -> Option<&str>;
}
236
237impl GenericHeaders for async_nats::HeaderMap {
238 fn get(&self, key: &str) -> Option<&str> {
239 async_nats::HeaderMap::get(self, key).map(|value| value.as_str())
240 }
241}
242
243impl GenericHeaders for http::HeaderMap {
244 fn get(&self, key: &str) -> Option<&str> {
245 http::HeaderMap::get(self, key).and_then(|value| value.to_str().ok())
246 }
247}
248
249impl TraceParent {
250 pub fn from_headers<H: GenericHeaders>(headers: &H) -> TraceParent {
251 let mut trace_id = None;
252 let mut parent_id = None;
253 let mut tracestate = None;
254 let mut x_request_id = None;
255 let mut x_dynamo_request_id = None;
256
257 if let Some(header_value) = headers.get("traceparent") {
258 (trace_id, parent_id) = parse_traceparent(header_value);
259 }
260
261 if let Some(header_value) = headers.get("x-request-id") {
262 x_request_id = Some(header_value.to_string());
263 }
264
265 if let Some(header_value) = headers.get("tracestate") {
266 tracestate = Some(header_value.to_string());
267 }
268
269 if let Some(header_value) = headers.get("x-dynamo-request-id") {
270 x_dynamo_request_id = Some(header_value.to_string());
271 }
272
273 let x_dynamo_request_id =
275 x_dynamo_request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
276 TraceParent {
277 trace_id,
278 parent_id,
279 tracestate,
280 x_request_id,
281 x_dynamo_request_id,
282 }
283 }
284}
285
286pub fn make_request_span<B>(req: &Request<B>) -> Span {
288 let method = req.method();
289 let uri = req.uri();
290 let version = format!("{:?}", req.version());
291 let trace_parent = TraceParent::from_headers(req.headers());
292
293 let span = tracing::info_span!(
294 "http-request",
295 method = %method,
296 uri = %uri,
297 version = %version,
298 trace_id = trace_parent.trace_id,
299 parent_id = trace_parent.parent_id,
300 x_request_id = trace_parent.x_request_id,
301 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
302 );
303
304 span
305}
306
307pub fn make_handle_payload_span(
309 headers: &async_nats::HeaderMap,
310 component: &str,
311 endpoint: &str,
312 namespace: &str,
313 instance_id: i64,
314) -> Span {
315 let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_nats_headers(headers);
316 let trace_parent = TraceParent::from_headers(headers);
317
318 if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
319 let span = tracing::info_span!(
320 "handle_payload",
321 trace_id = trace_id.as_str(),
322 parent_id = parent_id.as_str(),
323 x_request_id = trace_parent.x_request_id,
324 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
325 tracestate = trace_parent.tracestate,
326 component = component,
327 endpoint = endpoint,
328 namespace = namespace,
329 instance_id = instance_id,
330 );
331
332 if let Some(context) = otel_context {
333 let _ = span.set_parent(context);
334 }
335 span
336 } else {
337 tracing::info_span!(
338 "handle_payload",
339 x_request_id = trace_parent.x_request_id,
340 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
341 tracestate = trace_parent.tracestate,
342 component = component,
343 endpoint = endpoint,
344 namespace = namespace,
345 instance_id = instance_id,
346 )
347 }
348}
349
350pub fn extract_otel_context_from_nats_headers(
352 headers: &async_nats::HeaderMap,
353) -> (
354 Option<opentelemetry::Context>,
355 Option<String>,
356 Option<String>,
357) {
358 let traceparent_value = match headers.get("traceparent") {
359 Some(value) => value.as_str(),
360 None => return (None, None, None),
361 };
362
363 let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
364
365 struct NatsHeaderExtractor<'a>(&'a async_nats::HeaderMap);
366
367 impl<'a> Extractor for NatsHeaderExtractor<'a> {
368 fn get(&self, key: &str) -> Option<&str> {
369 self.0.get(key).map(|value| value.as_str())
370 }
371
372 fn keys(&self) -> Vec<&str> {
373 vec!["traceparent", "tracestate"]
374 .into_iter()
375 .filter(|&key| self.0.get(key).is_some())
376 .collect()
377 }
378 }
379
380 let extractor = NatsHeaderExtractor(headers);
381 let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
382 let otel_context = propagator.extract(&extractor);
383
384 let context_with_trace = if otel_context.span().span_context().is_valid() {
385 Some(otel_context)
386 } else {
387 None
388 };
389
390 (context_with_trace, trace_id, parent_span_id)
391}
392
393pub fn inject_otel_context_into_nats_headers(
395 headers: &mut async_nats::HeaderMap,
396 context: Option<opentelemetry::Context>,
397) {
398 let otel_context = context.unwrap_or_else(|| Span::current().context());
399
400 struct NatsHeaderInjector<'a>(&'a mut async_nats::HeaderMap);
401
402 impl<'a> Injector for NatsHeaderInjector<'a> {
403 fn set(&mut self, key: &str, value: String) {
404 self.0.insert(key, value);
405 }
406 }
407
408 let mut injector = NatsHeaderInjector(headers);
409 let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
410 propagator.inject_context(&otel_context, &mut injector);
411}
412
/// Injects the current span's OpenTelemetry context into `headers` as W3C trace-context headers.
pub fn inject_current_trace_into_nats_headers(headers: &mut async_nats::HeaderMap) {
    inject_otel_context_into_nats_headers(headers, None);
}
417
418pub fn make_client_request_span(
420 operation: &str,
421 request_id: &str,
422 trace_context: Option<&DistributedTraceContext>,
423 instance_id: Option<&str>,
424) -> Span {
425 if let Some(ctx) = trace_context {
426 let mut headers = async_nats::HeaderMap::new();
427 headers.insert("traceparent", ctx.create_traceparent());
428
429 if let Some(ref tracestate) = ctx.tracestate {
430 headers.insert("tracestate", tracestate.as_str());
431 }
432
433 let (otel_context, _extracted_trace_id, _extracted_parent_span_id) =
434 extract_otel_context_from_nats_headers(&headers);
435
436 let span = if let Some(inst_id) = instance_id {
437 tracing::info_span!(
438 "client_request",
439 operation = operation,
440 request_id = request_id,
441 instance_id = inst_id,
442 trace_id = ctx.trace_id.as_str(),
443 parent_id = ctx.span_id.as_str(),
444 x_request_id = ctx.x_request_id.as_deref(),
445 x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
446 )
448 } else {
449 tracing::info_span!(
450 "client_request",
451 operation = operation,
452 request_id = request_id,
453 trace_id = ctx.trace_id.as_str(),
454 parent_id = ctx.span_id.as_str(),
455 x_request_id = ctx.x_request_id.as_deref(),
456 x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
457 )
459 };
460
461 if let Some(context) = otel_context {
462 let _ = span.set_parent(context);
463 }
464
465 span
466 } else if let Some(inst_id) = instance_id {
467 tracing::info_span!(
468 "client_request",
469 operation = operation,
470 request_id = request_id,
471 instance_id = inst_id,
472 )
473 } else {
474 tracing::info_span!(
475 "client_request",
476 operation = operation,
477 request_id = request_id,
478 )
479 }
480}
481
482#[derive(Debug, Default)]
483pub struct FieldVisitor {
484 pub fields: HashMap<String, String>,
485}
486
487impl Visit for FieldVisitor {
488 fn record_str(&mut self, field: &Field, value: &str) {
489 self.fields
490 .insert(field.name().to_string(), value.to_string());
491 }
492
493 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
494 self.fields
495 .insert(field.name().to_string(), format!("{:?}", value).to_string());
496 }
497}
498
/// Maintains a [`DistributedTraceContext`] in each span's extensions.
///
/// Lifecycle:
/// - `on_new_span` records ids into a `PendingDistributedTraceContext`;
/// - the first `on_enter` promotes it to a `DistributedTraceContext`, filling missing ids
///   from OpenTelemetry's `OtelData`;
/// - `on_close` stamps the end time.
impl<S> Layer<S> for DistributedTraceIdLayer
where
    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
    /// Records the close time on the span's context, if one was ever created.
    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
        if let Some(span) = ctx.span(&id) {
            let mut extensions = span.extensions_mut();
            if let Some(distributed_tracing_context) =
                extensions.get_mut::<DistributedTraceContext>()
            {
                distributed_tracing_context.end = Some(Instant::now());
            }
        }
    }

    /// Captures trace-related fields of the new span into a pending context.
    ///
    /// Fields read: `trace_id`, `span_id`, `parent_id`, `tracestate`, `x_request_id`,
    /// `x_dynamo_request_id`.
    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
        if let Some(span) = ctx.span(id) {
            let mut trace_id: Option<String> = None;
            let mut parent_id: Option<String> = None;
            let mut span_id: Option<String> = None;
            let mut x_request_id: Option<String> = None;
            let mut x_dynamo_request_id: Option<String> = None;
            let mut tracestate: Option<String> = None;
            let mut visitor = FieldVisitor::default();
            attrs.record(&mut visitor);

            // Malformed ids are ignored (logged at trace level) rather than propagated.
            if let Some(trace_id_input) = visitor.fields.get("trace_id") {
                if !is_valid_trace_id(trace_id_input) {
                    tracing::trace!("trace id '{}' is not valid! Ignoring.", trace_id_input);
                } else {
                    trace_id = Some(trace_id_input.to_string());
                }
            }

            if let Some(span_id_input) = visitor.fields.get("span_id") {
                if !is_valid_span_id(span_id_input) {
                    tracing::trace!("span id '{}' is not valid! Ignoring.", span_id_input);
                } else {
                    span_id = Some(span_id_input.to_string());
                }
            }

            if let Some(parent_id_input) = visitor.fields.get("parent_id") {
                if !is_valid_span_id(parent_id_input) {
                    tracing::trace!("parent id '{}' is not valid! Ignoring.", parent_id_input);
                } else {
                    parent_id = Some(parent_id_input.to_string());
                }
            }

            if let Some(tracestate_input) = visitor.fields.get("tracestate") {
                tracestate = Some(tracestate_input.to_string());
            }

            if let Some(x_request_id_input) = visitor.fields.get("x_request_id") {
                x_request_id = Some(x_request_id_input.to_string());
            }

            if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
                x_dynamo_request_id = Some(x_request_id_input.to_string());
            }

            // Without an explicit parent id, link to the *current* span. When that span
            // already has a DistributedTraceContext, its trace id replaces any `trace_id`
            // field recorded above, and its span id becomes our parent id.
            // NOTE(review): this uses the current span rather than the new span's recorded
            // parent (`span.parent()`). A span created with an explicit `parent:` may link
            // to the wrong span; confirm this is intended.
            if parent_id.is_none()
                && let Some(parent_span_id) = ctx.current_span().id()
                && let Some(parent_span) = ctx.span(parent_span_id)
            {
                let parent_ext = parent_span.extensions();
                if let Some(parent_tracing_context) = parent_ext.get::<DistributedTraceContext>() {
                    trace_id = Some(parent_tracing_context.trace_id.clone());
                    parent_id = Some(parent_tracing_context.span_id.clone());
                    tracestate = parent_tracing_context.tracestate.clone();
                }
            }

            // A parent or span id without a trace id cannot form a valid trace; drop them.
            if (parent_id.is_some() || span_id.is_some()) && trace_id.is_none() {
                tracing::error!("parent id or span id are set but trace id is not set!");
                parent_id = None;
                span_id = None;
            }

            let mut extensions = span.extensions_mut();
            extensions.insert(PendingDistributedTraceContext {
                trace_id,
                span_id,
                parent_id,
                tracestate,
                x_request_id,
                x_dynamo_request_id,
            });
        }
    }

    /// On first entry, turns the pending context into a `DistributedTraceContext`.
    ///
    /// Later entries return early because the context already exists.
    ///
    /// # Panics
    /// Panics if neither the span's fields nor `OtelData` yield a valid trace id and span id.
    /// NOTE(review): a panic inside a subscriber layer aborts the instrumented code path;
    /// confirm that this is preferred over logging and skipping.
    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
        if let Some(span) = ctx.span(id) {
            {
                let extensions = span.extensions();
                if extensions.get::<DistributedTraceContext>().is_some() {
                    return;
                }
            }

            let mut extensions = span.extensions_mut();
            let pending = match extensions.remove::<PendingDistributedTraceContext>() {
                Some(p) => p,
                None => {
                    tracing::error!("PendingDistributedTraceContext not found in on_enter");
                    return;
                }
            };

            let mut trace_id = pending.trace_id;
            let mut span_id = pending.span_id;
            let parent_id = pending.parent_id;
            let tracestate = pending.tracestate;
            let x_request_id = pending.x_request_id;
            let x_dynamo_request_id = pending.x_dynamo_request_id;

            // Release the mutable borrow before taking a shared one to read OtelData.
            drop(extensions);

            // Fill missing ids from the OpenTelemetry layer's per-span data.
            if trace_id.is_none() || span_id.is_none() {
                let extensions = span.extensions();
                if let Some(otel_data) = extensions.get::<tracing_opentelemetry::OtelData>() {
                    if trace_id.is_none()
                        && let Some(otel_trace_id) = otel_data.trace_id()
                    {
                        let trace_id_str = format!("{}", otel_trace_id);
                        if is_valid_trace_id(&trace_id_str) {
                            trace_id = Some(trace_id_str);
                        }
                    }

                    if span_id.is_none()
                        && let Some(otel_span_id) = otel_data.span_id()
                    {
                        let span_id_str = format!("{}", otel_span_id);
                        if is_valid_span_id(&span_id_str) {
                            span_id = Some(span_id_str);
                        }
                    }
                }
            }

            if trace_id.is_none() {
                panic!(
                    "trace_id is not set in on_enter - OtelData may not be properly initialized"
                );
            }

            if span_id.is_none() {
                panic!("span_id is not set in on_enter - OtelData may not be properly initialized");
            }

            let mut extensions = span.extensions_mut();
            extensions.insert(DistributedTraceContext {
                trace_id: trace_id.expect("Trace ID must be set"),
                span_id: span_id.expect("Span ID must be set"),
                parent_id,
                tracestate,
                start: Some(Instant::now()),
                end: None,
                x_request_id,
                x_dynamo_request_id,
            });
        }
    }
}
690
691pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
694 Span::current()
695 .with_subscriber(|(id, subscriber)| {
696 subscriber
697 .downcast_ref::<Registry>()
698 .and_then(|registry| registry.span_data(id))
699 .and_then(|span_data| {
700 let extensions = span_data.extensions();
701 extensions.get::<DistributedTraceContext>().cloned()
702 })
703 })
704 .flatten()
705}
706
707pub fn init() {
709 INIT.call_once(|| {
710 if let Err(e) = setup_logging() {
711 eprintln!("Failed to initialize logging: {}", e);
712 std::process::exit(1);
713 }
714 });
715}
716
/// Installs the global subscriber for `tokio-console` builds.
///
/// Sets up compact stderr logging filtered by the logging config, plus a console layer
/// serving on `0.0.0.0` at the console's default port.
///
/// Returns `Result` to match the non-console variant: `init` matches on `Err` from this
/// function, so returning `()` would not compile with the feature enabled.
#[cfg(feature = "tokio-console")]
fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
    let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
        .with_default_env()
        .server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
        .spawn();
    // The console sees `runtime`/`tokio` targets at every level, and other targets only at ERROR.
    let tokio_console_target = tracing_subscriber::filter::Targets::new()
        .with_default(LevelFilter::ERROR)
        .with_target("runtime", LevelFilter::TRACE)
        .with_target("tokio", LevelFilter::TRACE);
    let l = fmt::layer()
        .with_ansi(!disable_ansi_logging())
        .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
        .with_writer(std::io::stderr)
        .with_filter(filters(load_config()));
    tracing_subscriber::registry()
        .with(l)
        .with(tokio_console_layer.with_filter(tokio_console_target))
        .init();
    Ok(())
}
737
738#[cfg(not(feature = "tokio-console"))]
739fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
740 let fmt_filter_layer = filters(load_config());
741 let trace_filter_layer = filters(load_config());
742 let otel_filter_layer = filters(load_config());
743
744 if jsonl_logging_enabled() {
745 let l = fmt::layer()
746 .with_ansi(false)
747 .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
748 .event_format(CustomJsonFormatter::new())
749 .with_writer(std::io::stderr)
750 .with_filter(fmt_filter_layer);
751
752 let service_name = get_service_name();
754
755 let tracer_provider = if otlp_exporter_enabled() {
757 let endpoint = std::env::var(OTEL_EXPORT_ENDPOINT_ENV)
759 .unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string());
760
761 tracing::info!(
762 "OpenTelemetry OTLP export enabled, endpoint: {}, service: {}",
763 endpoint,
764 service_name
765 );
766
767 let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
769 .with_tonic()
770 .with_endpoint(endpoint)
771 .build()?;
772
773 opentelemetry_sdk::trace::SdkTracerProvider::builder()
775 .with_batch_exporter(otlp_exporter)
776 .with_resource(
777 opentelemetry_sdk::Resource::builder_empty()
778 .with_service_name(service_name.clone())
779 .build(),
780 )
781 .build()
782 } else {
783 tracing::info!(
785 "OpenTelemetry OTLP export disabled, traces local only, service: {}",
786 service_name
787 );
788
789 opentelemetry_sdk::trace::SdkTracerProvider::builder()
790 .with_resource(
791 opentelemetry_sdk::Resource::builder_empty()
792 .with_service_name(service_name.clone())
793 .build(),
794 )
795 .build()
796 };
797
798 let tracer = tracer_provider.tracer(service_name);
800
801 tracing_subscriber::registry()
802 .with(
803 tracing_opentelemetry::layer()
804 .with_tracer(tracer)
805 .with_filter(otel_filter_layer),
806 )
807 .with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
808 .with(l)
809 .init();
810 } else {
811 let l = fmt::layer()
812 .with_ansi(!disable_ansi_logging())
813 .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
814 .with_writer(std::io::stderr)
815 .with_filter(fmt_filter_layer);
816
817 tracing_subscriber::registry().with(l).init();
818 }
819
820 Ok(())
821}
822
823fn filters(config: LoggingConfig) -> EnvFilter {
824 let mut filter_layer = EnvFilter::builder()
825 .with_default_directive(config.log_level.parse().unwrap())
826 .with_env_var(FILTER_ENV)
827 .from_env_lossy();
828
829 for (module, level) in config.log_filters {
830 match format!("{module}={level}").parse::<Directive>() {
831 Ok(d) => {
832 filter_layer = filter_layer.add_directive(d);
833 }
834 Err(e) => {
835 eprintln!("Failed parsing filter '{level}' for module '{module}': {e}");
836 }
837 }
838 }
839 filter_layer
840}
841
842pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) {
845 let level = match level {
846 "debug" => log::Level::Debug,
847 "info" => log::Level::Info,
848 "warn" => log::Level::Warn,
849 "error" => log::Level::Error,
850 "warning" => log::Level::Warn,
851 _ => log::Level::Info,
852 };
853 log::logger().log(
854 &log::Record::builder()
855 .args(format_args!("{}", message))
856 .level(level)
857 .target(module)
858 .file(Some(file))
859 .line(Some(line))
860 .build(),
861 );
862}
863
864fn load_config() -> LoggingConfig {
865 let config_path = std::env::var(CONFIG_PATH_ENV).unwrap_or_else(|_| "".to_string());
866 let figment = Figment::new()
867 .merge(Serialized::defaults(LoggingConfig::default()))
868 .merge(Toml::file("/opt/dynamo/etc/logging.toml"))
869 .merge(Toml::file(config_path));
870
871 figment.extract().unwrap()
872}
873
/// One JSONL log record, as written by `CustomJsonFormatter`.
///
/// Fixed fields serialize in declaration order. `fields` is flattened into the same top-level
/// object (in `BTreeMap` key order).
#[derive(Serialize)]
struct JsonLog<'a> {
    /// Timestamp string from `TimeFormatter::format_now`.
    time: String,
    /// Level name from the event metadata.
    level: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    file: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    line: Option<u32>,
    target: &'a str,
    /// The event's `message` field (possibly rewritten for span lifecycle events).
    message: serde_json::Value,
    /// Remaining event fields plus merged span/trace fields.
    #[serde(flatten)]
    fields: BTreeMap<String, serde_json::Value>,
}
887
888struct TimeFormatter {
889 use_local_tz: bool,
890}
891
892impl TimeFormatter {
893 fn new() -> Self {
894 Self {
895 use_local_tz: crate::config::use_local_timezone(),
896 }
897 }
898
899 fn format_now(&self) -> String {
900 if self.use_local_tz {
901 chrono::Local::now()
902 .format("%Y-%m-%dT%H:%M:%S%.6f%:z")
903 .to_string()
904 } else {
905 chrono::Utc::now()
906 .format("%Y-%m-%dT%H:%M:%S%.6fZ")
907 .to_string()
908 }
909 }
910}
911
912impl FormatTime for TimeFormatter {
913 fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
914 write!(w, "{}", self.format_now())
915 }
916}
917
918struct CustomJsonFormatter {
919 time_formatter: TimeFormatter,
920}
921
922impl CustomJsonFormatter {
923 fn new() -> Self {
924 Self {
925 time_formatter: TimeFormatter::new(),
926 }
927 }
928}
929
930use once_cell::sync::Lazy;
931use regex::Regex;
932fn parse_tracing_duration(s: &str) -> Option<u64> {
933 static RE: Lazy<Regex> =
934 Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
935 let captures = RE.captures(s)?;
936 let value: f64 = captures[1].parse().ok()?;
937 let unit = &captures[2];
938 match unit {
939 "ns" => Some((value / 1000.0) as u64),
940 "µs" | "us" => Some(value as u64),
941 "ms" => Some((value * 1000.0) as u64),
942 "s" => Some((value * 1_000_000.0) as u64),
943 _ => None,
944 }
945}
946
/// Renders each event as a single [`JsonLog`] line.
///
/// Event fields become top-level keys. When the event belongs to a span (its explicit parent,
/// else the current span), that span's formatted fields, name and distributed-trace ids are
/// merged in, and span timing fields are converted to integer microseconds.
impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
where
    S: Subscriber + for<'a> LookupSpan<'a>,
    N: for<'a> FormatFields<'a> + 'static,
{
    fn format_event(
        &self,
        ctx: &FmtContext<'_, S, N>,
        mut writer: Writer<'_>,
        event: &Event<'_>,
    ) -> std::fmt::Result {
        let mut visitor = JsonVisitor::default();
        let time = self.time_formatter.format_now();
        event.record(&mut visitor);
        // `message` is hoisted into its own JSON field (empty string when absent).
        let mut message = visitor
            .fields
            .remove("message")
            .unwrap_or(serde_json::Value::String("".to_string()));

        let current_span = event
            .parent()
            .and_then(|id| ctx.span(id))
            .or_else(|| ctx.lookup_current());
        if let Some(span) = current_span {
            let ext = span.extensions();
            // NOTE(review): panics if the fmt layer has not stored FormattedFields for this span.
            let data = ext.get::<FormattedFields<N>>().unwrap();
            // Recover span fields by splitting the fmt layer's `k=v k=v` text.
            // NOTE(review): values that contain spaces are split into bogus entries.
            // Span fields are inserted after event fields, so a span field overwrites an
            // event field of the same name.
            let span_fields: Vec<(&str, &str)> = data
                .fields
                .split(' ')
                .filter_map(|entry| entry.split_once('='))
                .collect();
            for (name, value) in span_fields {
                visitor.fields.insert(
                    name.to_string(),
                    serde_json::Value::String(value.trim_matches('"').to_string()),
                );
            }

            // Span-close events (FmtSpan::CLOSE) carry `time.busy`/`time.idle` strings.
            // Replace them with integer microsecond fields when both parse.
            let busy_us = visitor
                .fields
                .remove("time.busy")
                .and_then(|v| parse_tracing_duration(&v.to_string()));
            let idle_us = visitor
                .fields
                .remove("time.idle")
                .and_then(|v| parse_tracing_duration(&v.to_string()));

            if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
                visitor.fields.insert(
                    "time.busy_us".to_string(),
                    serde_json::Value::Number(busy_us.into()),
                );
                visitor.fields.insert(
                    "time.idle_us".to_string(),
                    serde_json::Value::Number(idle_us.into()),
                );
                visitor.fields.insert(
                    "time.duration_us".to_string(),
                    serde_json::Value::Number((busy_us + idle_us).into()),
                );
            }

            // Normalize the fmt layer's span lifecycle messages.
            message = match message.as_str() {
                Some("new") => serde_json::Value::String("SPAN_CREATED".to_string()),
                Some("close") => serde_json::Value::String("SPAN_CLOSED".to_string()),
                _ => message.clone(),
            };

            visitor.fields.insert(
                "span_name".to_string(),
                serde_json::Value::String(span.name().to_string()),
            );

            // Prefer the layer-maintained W3C ids. Remove optional fields (copied from the
            // span's own fields above) that the trace context does not have.
            if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
                visitor.fields.insert(
                    "span_id".to_string(),
                    serde_json::Value::String(tracing_context.span_id.clone()),
                );
                visitor.fields.insert(
                    "trace_id".to_string(),
                    serde_json::Value::String(tracing_context.trace_id.clone()),
                );
                if let Some(parent_id) = tracing_context.parent_id.clone() {
                    visitor.fields.insert(
                        "parent_id".to_string(),
                        serde_json::Value::String(parent_id),
                    );
                } else {
                    visitor.fields.remove("parent_id");
                }
                if let Some(tracestate) = tracing_context.tracestate.clone() {
                    visitor.fields.insert(
                        "tracestate".to_string(),
                        serde_json::Value::String(tracestate),
                    );
                } else {
                    visitor.fields.remove("tracestate");
                }
                if let Some(x_request_id) = tracing_context.x_request_id.clone() {
                    visitor.fields.insert(
                        "x_request_id".to_string(),
                        serde_json::Value::String(x_request_id),
                    );
                } else {
                    visitor.fields.remove("x_request_id");
                }

                if let Some(x_dynamo_request_id) = tracing_context.x_dynamo_request_id.clone() {
                    visitor.fields.insert(
                        "x_dynamo_request_id".to_string(),
                        serde_json::Value::String(x_dynamo_request_id),
                    );
                } else {
                    visitor.fields.remove("x_dynamo_request_id");
                }
            } else {
                // No DistributedTraceContext on this span. It is only attached on first
                // enter, so span-creation (`new`) events presumably land here. Fall back to
                // registry-internal numeric ids, which do not match the 16-hex span_id format.
                // NOTE(review): this emits an event from inside the formatter, which re-enters
                // the subscriber; confirm this cannot recurse or flood the log with errors.
                tracing::error!(
                    "Distributed Trace Context not found, falling back to internal ids"
                );
                visitor.fields.insert(
                    "span_id".to_string(),
                    serde_json::Value::String(span.id().into_u64().to_string()),
                );
                if let Some(parent) = span.parent() {
                    visitor.fields.insert(
                        "parent_id".to_string(),
                        serde_json::Value::String(parent.id().into_u64().to_string()),
                    );
                }
            }
        } else {
            // Outside any span, drop event fields that could be mistaken for trace fields.
            let reserved_fields = [
                "trace_id",
                "span_id",
                "parent_id",
                "span_name",
                "tracestate",
            ];
            for reserved_field in reserved_fields {
                visitor.fields.remove(reserved_field);
            }
        }
        let metadata = event.metadata();
        let log = JsonLog {
            level: metadata.level().to_string(),
            time,
            file: metadata.file(),
            line: metadata.line(),
            target: metadata.target(),
            message,
            fields: visitor.fields,
        };
        let json = serde_json::to_string(&log).unwrap();
        writeln!(writer, "{json}")
    }
}
1103
1104#[derive(Default)]
1105struct JsonVisitor {
1106 fields: BTreeMap<String, serde_json::Value>,
1107}
1108
1109impl tracing::field::Visit for JsonVisitor {
1110 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1111 self.fields.insert(
1112 field.name().to_string(),
1113 serde_json::Value::String(format!("{value:?}")),
1114 );
1115 }
1116
1117 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1118 if field.name() != "message" {
1119 match serde_json::from_str::<Value>(value) {
1120 Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
1121 Err(_) => self.fields.insert(field.name().to_string(), value.into()),
1122 };
1123 } else {
1124 self.fields.insert(field.name().to_string(), value.into());
1125 }
1126 }
1127
1128 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
1129 self.fields
1130 .insert(field.name().to_string(), serde_json::Value::Bool(value));
1131 }
1132
1133 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
1134 self.fields.insert(
1135 field.name().to_string(),
1136 serde_json::Value::Number(value.into()),
1137 );
1138 }
1139
1140 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
1141 self.fields.insert(
1142 field.name().to_string(),
1143 serde_json::Value::Number(value.into()),
1144 );
1145 }
1146
1147 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
1148 use serde_json::value::Number;
1149 self.fields.insert(
1150 field.name().to_string(),
1151 serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
1152 );
1153 }
1154}
1155
#[cfg(test)]
pub mod tests {
    use super::*;
    use anyhow::{Result, anyhow};
    use chrono::{DateTime, Utc};
    use jsonschema::{Draft, JSONSchema};
    use serde_json::Value;
    use std::collections::HashSet;
    use std::fs::File;
    use std::io::{BufRead, BufReader};
    use stdio_override::*;
    use tempfile::NamedTempFile;

    static LOG_LINE_SCHEMA: &str = r#"
    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "Runtime Log Line",
      "type": "object",
      "required": [
        "file",
        "level",
        "line",
        "message",
        "target",
        "time"
      ],
      "properties": {
        "file": { "type": "string" },
        "level": { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
        "line": { "type": "integer" },
        "message": { "type": "string" },
        "target": { "type": "string" },
        "time": { "type": "string", "format": "date-time" },
        "span_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "trace_id": { "type": "string", "pattern": "^[a-f0-9]{32}$" },
        "span_name": { "type": "string" },
        "time.busy_us": { "type": "integer" },
        "time.duration_us": { "type": "integer" },
        "time.idle_us": { "type": "integer" },
        "tracestate": { "type": "string" }
      },
      "additionalProperties": true
    }
    "#;

    #[tracing::instrument(skip_all)]
    async fn parent() {
        tracing::trace!(message = "parent!");
        if let Some(my_ctx) = get_distributed_tracing_context() {
            tracing::info!(my_trace_id = my_ctx.trace_id);
        }
        child().await;
    }

    #[tracing::instrument(skip_all)]
    async fn child() {
        tracing::trace!(message = "child");
        if let Some(my_ctx) = get_distributed_tracing_context() {
            tracing::info!(my_trace_id = my_ctx.trace_id);
        }
        grandchild().await;
    }

    #[tracing::instrument(skip_all)]
    async fn grandchild() {
        tracing::trace!(message = "grandchild");
        if let Some(my_ctx) = get_distributed_tracing_context() {
            tracing::info!(my_trace_id = my_ctx.trace_id);
        }
    }

    /// Reads a JSONL log file and validates every line against `LOG_LINE_SCHEMA`.
    ///
    /// Each parsed line is echoed to stdout to aid debugging. Lines are returned
    /// in file order.
    ///
    /// # Errors
    /// Returns an error in three cases:
    /// - the file cannot be opened or read;
    /// - a line is not valid JSON;
    /// - a line violates the schema.
    ///
    /// JSON and schema errors include the 1-based line number.
    ///
    /// # Panics
    /// Panics if the embedded schema itself fails to parse or compile.
    pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
        let schema_json: Value =
            serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
        let compiled_schema = JSONSchema::options()
            .with_draft(Draft::Draft7)
            .compile(&schema_json)
            .expect("Invalid schema");

        let f = File::open(file_name)?;
        let reader = BufReader::new(f);
        let mut result = Vec::new();

        for (line_num, line) in reader.lines().enumerate() {
            let line = line?;
            let val: Value = serde_json::from_str(&line)
                .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_num + 1, e))?;

            if let Err(errors) = compiled_schema.validate(&val) {
                let errs = errors.map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
                return Err(anyhow!(
                    "Line {}: JSON Schema Validation errors: {}",
                    line_num + 1,
                    errs
                ));
            }
            println!("{}", val);
            result.push(val);
        }
        Ok(result)
    }

    /// Returns the first log line whose `message` equals `message` and whose
    /// `span_name` equals `span_name`, if any.
    fn find_span_line<'a>(
        lines: &'a [Value],
        message: &str,
        span_name: &str,
    ) -> Option<&'a Value> {
        lines.iter().find(|line| {
            line.get("message").and_then(Value::as_str) == Some(message)
                && line.get("span_name").and_then(Value::as_str) == Some(span_name)
        })
    }

    /// Returns the `span_id` of the `SPAN_CREATED` line for `span_name`.
    ///
    /// Panics if there is no such line or it has no string `span_id`.
    fn created_span_id(lines: &[Value], span_name: &str) -> String {
        find_span_line(lines, "SPAN_CREATED", span_name)
            .and_then(|line| line.get("span_id"))
            .and_then(Value::as_str)
            .unwrap_or_else(|| panic!("no SPAN_CREATED line with a span_id for `{span_name}`"))
            .to_string()
    }

    /// Returns `time.duration_us` from the `SPAN_CLOSED` line for `span_name`.
    ///
    /// Panics if there is no such line or it has no unsigned `time.duration_us`.
    fn closed_span_duration_us(lines: &[Value], span_name: &str) -> u64 {
        find_span_line(lines, "SPAN_CLOSED", span_name)
            .and_then(|line| line.get("time.duration_us"))
            .and_then(Value::as_u64)
            .unwrap_or_else(|| {
                panic!("no SPAN_CLOSED line with time.duration_us for `{span_name}`")
            })
    }

    /// Asserts that every line attributed to `span_name` has the expected parent.
    ///
    /// Each such line's `parent_id` must equal `expected_parent`. When
    /// `expected_parent` is `None`, the line must carry no `parent_id` at all.
    fn assert_parent_id(lines: &[Value], span_name: &str, expected_parent: Option<&str>) {
        let expected = expected_parent.map(Value::from);
        for line in lines
            .iter()
            .filter(|line| line.get("span_name").and_then(Value::as_str) == Some(span_name))
        {
            assert_eq!(
                line.get("parent_id"),
                expected.as_ref(),
                "unexpected parent_id for span `{span_name}` in line {line}"
            );
        }
    }

    /// Checks the span lifecycle events in the log.
    ///
    /// Asserts that `SPAN_CREATED` and `SPAN_CLOSED` are each emitted at most
    /// once per span id, and that every closed span also has a created event.
    fn assert_span_lifecycle(lines: &[Value]) {
        let mut created: HashSet<&str> = HashSet::new();
        let mut closed: HashSet<&str> = HashSet::new();

        for line in lines {
            let Some(span_id) = line.get("span_id").and_then(Value::as_str) else {
                continue;
            };
            match line.get("message").and_then(Value::as_str) {
                Some("SPAN_CREATED") => assert!(
                    created.insert(span_id),
                    "Duplicate span ID found in SPAN_CREATED: {span_id}"
                ),
                Some("SPAN_CLOSED") => assert!(
                    closed.insert(span_id),
                    "Duplicate span ID found in SPAN_CLOSED: {span_id}"
                ),
                _ => {}
            }
        }

        for span_id in &closed {
            assert!(
                created.contains(span_id),
                "SPAN_CLOSED without corresponding SPAN_CREATED: {span_id}"
            );
        }
    }

    #[tokio::test]
    async fn test_json_log_capture() -> Result<()> {
        // The inner future's `Result` is returned rather than discarded. That way
        // I/O, JSON and schema failures surfaced via `?` actually fail the test.
        temp_env::async_with_vars([("DYN_LOGGING_JSONL", Some("1"))], async {
            let tmp_file = NamedTempFile::new()?;
            let file_name = tmp_file
                .path()
                .to_str()
                .ok_or_else(|| anyhow!("temp file path is not valid UTF-8"))?;
            let guard = StderrOverride::from_file(file_name)?;
            init();
            parent().await;
            drop(guard);

            let lines = load_log(file_name)?;

            // Every event of the instrumented call tree shares one non-zero trace id.
            let trace_id = lines
                .first()
                .and_then(|log_line| log_line.get("trace_id"))
                .and_then(Value::as_str)
                .expect("First log line should have a trace_id")
                .to_string();
            assert!(
                !trace_id.chars().all(|c| c == '0'),
                "trace_id should not be all zeros"
            );
            for line_trace_id in lines.iter().filter_map(|line| line.get("trace_id")) {
                assert_eq!(
                    line_trace_id.as_str(),
                    Some(trace_id.as_str()),
                    "All logs should have the same trace_id"
                );
            }

            // The id exposed through the distributed-tracing context matches the logs.
            let expected_trace_id = Value::String(trace_id.clone());
            for my_trace_id in lines.iter().filter_map(|line| line.get("my_trace_id")) {
                assert_eq!(
                    my_trace_id, &expected_trace_id,
                    "my_trace_id should match the trace_id from distributed tracing context"
                );
            }

            assert_span_lifecycle(&lines);

            let parent_span_id = created_span_id(&lines, "parent");
            let child_span_id = created_span_id(&lines, "child");
            // Only asserts the grandchild span was created; nothing is parented to it.
            let _grandchild_span_id = created_span_id(&lines, "grandchild");

            assert_parent_id(&lines, "parent", None);
            assert_parent_id(&lines, "child", Some(parent_span_id.as_str()));
            assert_parent_id(&lines, "grandchild", Some(child_span_id.as_str()));

            let parent_duration = closed_span_duration_us(&lines, "parent");
            let child_duration = closed_span_duration_us(&lines, "child");
            let grandchild_duration = closed_span_duration_us(&lines, "grandchild");

            // Each span is opened and closed inside its parent's execution, so
            // durations must strictly decrease down the call tree. The child's
            // duration already includes the grandchild's, so the two must not be
            // summed. (Assumes `time.duration_us` measures the span's lifetime.)
            assert!(
                parent_duration > child_duration,
                "Parent duration is not greater than child duration"
            );
            assert!(
                child_duration > grandchild_duration,
                "Child duration is not greater than grandchild duration"
            );

            Ok::<(), anyhow::Error>(())
        })
        .await
    }
}