dynamo_runtime/
logging.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Dynamo Distributed Logging Module.
5//!
6//! - Configuration loaded from:
7//!   1. Environment variables (highest priority).
8//!   2. Optional TOML file pointed to by the `DYN_LOGGING_CONFIG_PATH` environment variable.
9//!   3. `/opt/dynamo/etc/logging.toml`.
10//!
11//! Logging can take two forms: `READABLE` or `JSONL`. The default is `READABLE`. `JSONL`
12//! can be enabled by setting the `DYN_LOGGING_JSONL` environment variable to `1`.
13//!
//! To use the local timezone for logging timestamps, set the `DYN_LOG_USE_LOCAL_TZ` environment variable to `1`.
15//!
//! Filters can be configured using the `DYN_LOG` environment variable or the `[log_filters]`
//! table in the TOML configuration file. In `DYN_LOG`, filters are comma-separated key-value
//! pairs where the key is the crate or module name and the value is the log level; in TOML,
//! `[log_filters]` maps crate or module names to levels. The default log level is `info`.
19//!
20//! Example:
21//! ```toml
22//! log_level = "error"
23//!
24//! [log_filters]
25//! "test_logging" = "info"
26//! "test_logging::api" = "trace"
27//! ```
28
29use std::collections::{BTreeMap, HashMap};
30use std::sync::Once;
31
32use figment::{
33    Figment,
34    providers::{Format, Serialized, Toml},
35};
36use serde::{Deserialize, Serialize};
37use tracing::level_filters::LevelFilter;
38use tracing::{Event, Subscriber};
39use tracing_subscriber::EnvFilter;
40use tracing_subscriber::fmt::time::FormatTime;
41use tracing_subscriber::fmt::time::LocalTime;
42use tracing_subscriber::fmt::time::SystemTime;
43use tracing_subscriber::fmt::time::UtcTime;
44use tracing_subscriber::fmt::{FmtContext, FormatFields};
45use tracing_subscriber::fmt::{FormattedFields, format::Writer};
46use tracing_subscriber::prelude::*;
47use tracing_subscriber::registry::LookupSpan;
48use tracing_subscriber::{filter::Directive, fmt};
49
50use crate::config::{disable_ansi_logging, jsonl_logging_enabled};
51use async_nats::{HeaderMap, HeaderValue};
52use axum::extract::FromRequestParts;
53use axum::http;
54use axum::http::Request;
55use axum::http::request::Parts;
56use serde_json::Value;
57use std::convert::Infallible;
58use std::time::Instant;
59use tower_http::trace::{DefaultMakeSpan, TraceLayer};
60use tracing::Id;
61use tracing::Span;
62use tracing::field::Field;
63use tracing::span;
64use tracing_subscriber::Layer;
65use tracing_subscriber::Registry;
66use tracing_subscriber::field::Visit;
67use tracing_subscriber::fmt::format::FmtSpan;
68use tracing_subscriber::layer::Context;
69use tracing_subscriber::registry::SpanData;
70use uuid::Uuid;
71
72use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
73use opentelemetry::trace::TraceContextExt;
74use opentelemetry::{global, trace::Tracer};
75use opentelemetry_otlp::WithExportConfig;
76
77use opentelemetry::trace::TracerProvider as _;
78use opentelemetry::{Key, KeyValue};
79use opentelemetry_sdk::Resource;
80use opentelemetry_sdk::trace::SdkTracerProvider;
81use tracing::error;
82use tracing_subscriber::layer::SubscriberExt;
83// use tracing_subscriber::Registry;
84
85use std::time::Duration;
86use tracing::{info, instrument};
87use tracing_opentelemetry::OpenTelemetrySpanExt;
88use tracing_subscriber::util::SubscriberInitExt;
89
90use crate::config::environment_names::logging as env_logging;
91
92use dynamo_config::env_is_truthy;
93
/// Guards one-time logger initialization performed by [`init`].
static INIT: Once = Once::new();

/// Log level used when no other level is configured.
const DEFAULT_FILTER_LEVEL: &str = "info";

/// Fallback OTLP exporter endpoint.
const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";

/// Fallback OpenTelemetry service name, used when the service-name variable is unset.
const DEFAULT_OTEL_SERVICE_NAME: &str = "dynamo";
105
106#[derive(Serialize, Deserialize, Debug)]
107struct LoggingConfig {
108    log_level: String,
109    log_filters: HashMap<String, String>,
110}
111impl Default for LoggingConfig {
112    fn default() -> Self {
113        LoggingConfig {
114            log_level: DEFAULT_FILTER_LEVEL.to_string(),
115            log_filters: HashMap::from([
116                ("h2".to_string(), "error".to_string()),
117                ("tower".to_string(), "error".to_string()),
118                ("hyper_util".to_string(), "error".to_string()),
119                ("neli".to_string(), "error".to_string()),
120                ("async_nats".to_string(), "error".to_string()),
121                ("rustls".to_string(), "error".to_string()),
122                ("tokenizers".to_string(), "error".to_string()),
123                ("axum".to_string(), "error".to_string()),
124                ("tonic".to_string(), "error".to_string()),
125                ("mistralrs_core".to_string(), "error".to_string()),
126                ("hf_hub".to_string(), "error".to_string()),
127                ("opentelemetry".to_string(), "error".to_string()),
128                ("opentelemetry-otlp".to_string(), "error".to_string()),
129                ("opentelemetry_sdk".to_string(), "error".to_string()),
130            ]),
131        }
132    }
133}
134
135/// Check if OTLP trace exporting is enabled (accepts: "1", "true", "on", "yes" - case insensitive)
136fn otlp_exporter_enabled() -> bool {
137    env_is_truthy(env_logging::otlp::OTEL_EXPORT_ENABLED)
138}
139
140/// Get the service name from environment or use default
141fn get_service_name() -> String {
142    std::env::var(env_logging::otlp::OTEL_SERVICE_NAME)
143        .unwrap_or_else(|_| DEFAULT_OTEL_SERVICE_NAME.to_string())
144}
145
/// Returns `true` when `value` is exactly `len` bytes of lowercase hexadecimal (`0-9`, `a-f`).
fn is_lowercase_hex_of_len(value: &str, len: usize) -> bool {
    value.len() == len
        && value
            .bytes()
            .all(|byte| matches!(byte, b'0'..=b'9' | b'a'..=b'f'))
}

/// Validate a given trace ID according to W3C Trace Context specifications.
///
/// A valid trace ID is a 32-character **lowercase** hexadecimal string. Uppercase hex digits
/// are rejected, because the spec defines `trace-id` as lowercase hex only.
///
/// Note: the all-zero trace ID (invalid per the spec) is *not* rejected by this function.
pub fn is_valid_trace_id(trace_id: &str) -> bool {
    is_lowercase_hex_of_len(trace_id, 32)
}

/// Validate a given span ID according to W3C Trace Context specifications.
///
/// A valid span ID is a 16-character **lowercase** hexadecimal string. Uppercase hex digits
/// are rejected, because the spec defines `parent-id` as lowercase hex only.
///
/// Note: the all-zero span ID (invalid per the spec) is *not* rejected by this function.
pub fn is_valid_span_id(span_id: &str) -> bool {
    is_lowercase_hex_of_len(span_id, 16)
}
157
158pub struct DistributedTraceIdLayer;
159
160#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct DistributedTraceContext {
162    pub trace_id: String,
163    pub span_id: String,
164    #[serde(skip_serializing_if = "Option::is_none")]
165    pub parent_id: Option<String>,
166    #[serde(skip_serializing_if = "Option::is_none")]
167    pub tracestate: Option<String>,
168    #[serde(skip)]
169    start: Option<Instant>,
170    #[serde(skip)]
171    end: Option<Instant>,
172    #[serde(skip_serializing_if = "Option::is_none")]
173    pub x_request_id: Option<String>,
174    #[serde(skip_serializing_if = "Option::is_none")]
175    pub x_dynamo_request_id: Option<String>,
176}
177
178/// Pending context data collected in on_new_span, to be finalized in on_enter
179#[derive(Debug, Clone)]
180struct PendingDistributedTraceContext {
181    trace_id: Option<String>,
182    span_id: Option<String>,
183    parent_id: Option<String>,
184    tracestate: Option<String>,
185    x_request_id: Option<String>,
186    x_dynamo_request_id: Option<String>,
187}
188
189impl DistributedTraceContext {
190    /// Create a traceparent string from the context
191    pub fn create_traceparent(&self) -> String {
192        format!("00-{}-{}-01", self.trace_id, self.span_id)
193    }
194}
195
196/// Parse a traceparent string into its components
197pub fn parse_traceparent(traceparent: &str) -> (Option<String>, Option<String>) {
198    let pieces: Vec<_> = traceparent.split('-').collect();
199    if pieces.len() != 4 {
200        return (None, None);
201    }
202    let trace_id = pieces[1];
203    let parent_id = pieces[2];
204
205    if !is_valid_trace_id(trace_id) || !is_valid_span_id(parent_id) {
206        return (None, None);
207    }
208
209    (Some(trace_id.to_string()), Some(parent_id.to_string()))
210}
211
212#[derive(Debug, Clone, Default)]
213pub struct TraceParent {
214    pub trace_id: Option<String>,
215    pub parent_id: Option<String>,
216    pub tracestate: Option<String>,
217    pub x_request_id: Option<String>,
218    pub x_dynamo_request_id: Option<String>,
219}
220
221pub trait GenericHeaders {
222    fn get(&self, key: &str) -> Option<&str>;
223}
224
225impl GenericHeaders for async_nats::HeaderMap {
226    fn get(&self, key: &str) -> Option<&str> {
227        async_nats::HeaderMap::get(self, key).map(|value| value.as_str())
228    }
229}
230
231impl GenericHeaders for http::HeaderMap {
232    fn get(&self, key: &str) -> Option<&str> {
233        http::HeaderMap::get(self, key).and_then(|value| value.to_str().ok())
234    }
235}
236
237impl TraceParent {
238    pub fn from_headers<H: GenericHeaders>(headers: &H) -> TraceParent {
239        let mut trace_id = None;
240        let mut parent_id = None;
241        let mut tracestate = None;
242        let mut x_request_id = None;
243        let mut x_dynamo_request_id = None;
244
245        if let Some(header_value) = headers.get("traceparent") {
246            (trace_id, parent_id) = parse_traceparent(header_value);
247        }
248
249        if let Some(header_value) = headers.get("x-request-id") {
250            x_request_id = Some(header_value.to_string());
251        }
252
253        if let Some(header_value) = headers.get("tracestate") {
254            tracestate = Some(header_value.to_string());
255        }
256
257        if let Some(header_value) = headers.get("x-dynamo-request-id") {
258            x_dynamo_request_id = Some(header_value.to_string());
259        }
260
261        // Validate UUID format
262        let x_dynamo_request_id =
263            x_dynamo_request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
264        TraceParent {
265            trace_id,
266            parent_id,
267            tracestate,
268            x_request_id,
269            x_dynamo_request_id,
270        }
271    }
272}
273
274// Takes Axum request and returning a span
275pub fn make_request_span<B>(req: &Request<B>) -> Span {
276    let method = req.method();
277    let uri = req.uri();
278    let version = format!("{:?}", req.version());
279    let trace_parent = TraceParent::from_headers(req.headers());
280
281    let otel_context = extract_otel_context_from_http_headers(req.headers());
282
283    let span = tracing::info_span!(
284        "http-request",
285        method = %method,
286        uri = %uri,
287        version = %version,
288        trace_id = trace_parent.trace_id,
289        parent_id = trace_parent.parent_id,
290        x_request_id = trace_parent.x_request_id,
291        x_dynamo_request_id = trace_parent.x_dynamo_request_id,
292    );
293
294    if let Some(context) = otel_context {
295        let _ = span.set_parent(context);
296    }
297
298    span
299}
300
301/// Extract OpenTelemetry context from HTTP headers for distributed tracing
302fn extract_otel_context_from_http_headers(
303    headers: &http::HeaderMap,
304) -> Option<opentelemetry::Context> {
305    let traceparent_value = headers.get("traceparent")?.to_str().ok()?;
306
307    struct HttpHeaderExtractor<'a>(&'a http::HeaderMap);
308
309    impl<'a> Extractor for HttpHeaderExtractor<'a> {
310        fn get(&self, key: &str) -> Option<&str> {
311            self.0.get(key).and_then(|v| v.to_str().ok())
312        }
313
314        fn keys(&self) -> Vec<&str> {
315            vec!["traceparent", "tracestate"]
316                .into_iter()
317                .filter(|&key| self.0.get(key).is_some())
318                .collect()
319        }
320    }
321
322    // Early return if traceparent is empty
323    if traceparent_value.is_empty() {
324        return None;
325    }
326
327    let extractor = HttpHeaderExtractor(headers);
328    let otel_context = TRACE_PROPAGATOR.extract(&extractor);
329
330    if otel_context.span().span_context().is_valid() {
331        Some(otel_context)
332    } else {
333        None
334    }
335}
336
337/// Create a handle_payload span from NATS headers with component context
338pub fn make_handle_payload_span(
339    headers: &async_nats::HeaderMap,
340    component: &str,
341    endpoint: &str,
342    namespace: &str,
343    instance_id: u64,
344) -> Span {
345    let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_nats_headers(headers);
346    let trace_parent = TraceParent::from_headers(headers);
347
348    if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
349        let span = tracing::info_span!(
350            "handle_payload",
351            trace_id = trace_id.as_str(),
352            parent_id = parent_id.as_str(),
353            x_request_id = trace_parent.x_request_id,
354            x_dynamo_request_id = trace_parent.x_dynamo_request_id,
355            tracestate = trace_parent.tracestate,
356            component = component,
357            endpoint = endpoint,
358            namespace = namespace,
359            instance_id = instance_id,
360        );
361
362        if let Some(context) = otel_context {
363            let _ = span.set_parent(context);
364        }
365        span
366    } else {
367        tracing::info_span!(
368            "handle_payload",
369            x_request_id = trace_parent.x_request_id,
370            x_dynamo_request_id = trace_parent.x_dynamo_request_id,
371            tracestate = trace_parent.tracestate,
372            component = component,
373            endpoint = endpoint,
374            namespace = namespace,
375            instance_id = instance_id,
376        )
377    }
378}
379
380/// Create a handle_payload span from TCP/HashMap headers with component context
381pub fn make_handle_payload_span_from_tcp_headers(
382    headers: &std::collections::HashMap<String, String>,
383    component: &str,
384    endpoint: &str,
385    namespace: &str,
386    instance_id: u64,
387) -> Span {
388    let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_tcp_headers(headers);
389    let x_request_id = headers.get("x-request-id").cloned();
390    let x_dynamo_request_id = headers.get("x-dynamo-request-id").cloned();
391    let tracestate = headers.get("tracestate").cloned();
392
393    if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
394        let span = tracing::info_span!(
395            "handle_payload",
396            trace_id = trace_id.as_str(),
397            parent_id = parent_id.as_str(),
398            x_request_id = x_request_id,
399            x_dynamo_request_id = x_dynamo_request_id,
400            tracestate = tracestate,
401            component = component,
402            endpoint = endpoint,
403            namespace = namespace,
404            instance_id = instance_id,
405        );
406
407        if let Some(context) = otel_context {
408            let _ = span.set_parent(context);
409        }
410        span
411    } else {
412        tracing::info_span!(
413            "handle_payload",
414            x_request_id = x_request_id,
415            x_dynamo_request_id = x_dynamo_request_id,
416            tracestate = tracestate,
417            component = component,
418            endpoint = endpoint,
419            namespace = namespace,
420            instance_id = instance_id,
421        )
422    }
423}
424
425/// Extract OpenTelemetry trace context from TCP/HashMap headers for distributed tracing
426fn extract_otel_context_from_tcp_headers(
427    headers: &std::collections::HashMap<String, String>,
428) -> (
429    Option<opentelemetry::Context>,
430    Option<String>,
431    Option<String>,
432) {
433    let traceparent_value = match headers.get("traceparent") {
434        Some(value) => value.as_str(),
435        None => return (None, None, None),
436    };
437
438    let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
439
440    struct TcpHeaderExtractor<'a>(&'a std::collections::HashMap<String, String>);
441
442    impl<'a> Extractor for TcpHeaderExtractor<'a> {
443        fn get(&self, key: &str) -> Option<&str> {
444            self.0.get(key).map(|s| s.as_str())
445        }
446
447        fn keys(&self) -> Vec<&str> {
448            vec!["traceparent", "tracestate"]
449                .into_iter()
450                .filter(|&key| self.0.get(key).is_some())
451                .collect()
452        }
453    }
454
455    let extractor = TcpHeaderExtractor(headers);
456    let otel_context = TRACE_PROPAGATOR.extract(&extractor);
457
458    let context_with_trace = if otel_context.span().span_context().is_valid() {
459        Some(otel_context)
460    } else {
461        None
462    };
463
464    (context_with_trace, trace_id, parent_span_id)
465}
466
467/// Extract OpenTelemetry trace context from NATS headers for distributed tracing
468pub fn extract_otel_context_from_nats_headers(
469    headers: &async_nats::HeaderMap,
470) -> (
471    Option<opentelemetry::Context>,
472    Option<String>,
473    Option<String>,
474) {
475    let traceparent_value = match headers.get("traceparent") {
476        Some(value) => value.as_str(),
477        None => return (None, None, None),
478    };
479
480    let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
481
482    struct NatsHeaderExtractor<'a>(&'a async_nats::HeaderMap);
483
484    impl<'a> Extractor for NatsHeaderExtractor<'a> {
485        fn get(&self, key: &str) -> Option<&str> {
486            self.0.get(key).map(|value| value.as_str())
487        }
488
489        fn keys(&self) -> Vec<&str> {
490            vec!["traceparent", "tracestate"]
491                .into_iter()
492                .filter(|&key| self.0.get(key).is_some())
493                .collect()
494        }
495    }
496
497    let extractor = NatsHeaderExtractor(headers);
498    let otel_context = TRACE_PROPAGATOR.extract(&extractor);
499
500    let context_with_trace = if otel_context.span().span_context().is_valid() {
501        Some(otel_context)
502    } else {
503        None
504    };
505
506    (context_with_trace, trace_id, parent_span_id)
507}
508
509/// Inject OpenTelemetry trace context into NATS headers using W3C Trace Context propagation
510pub fn inject_otel_context_into_nats_headers(
511    headers: &mut async_nats::HeaderMap,
512    context: Option<opentelemetry::Context>,
513) {
514    let otel_context = context.unwrap_or_else(|| Span::current().context());
515
516    struct NatsHeaderInjector<'a>(&'a mut async_nats::HeaderMap);
517
518    impl<'a> Injector for NatsHeaderInjector<'a> {
519        fn set(&mut self, key: &str, value: String) {
520            self.0.insert(key, value);
521        }
522    }
523
524    let mut injector = NatsHeaderInjector(headers);
525    TRACE_PROPAGATOR.inject_context(&otel_context, &mut injector);
526}
527
528/// Inject trace context from current span into NATS headers
529pub fn inject_current_trace_into_nats_headers(headers: &mut async_nats::HeaderMap) {
530    inject_otel_context_into_nats_headers(headers, None);
531}
532
533// Inject trace headers into a generic HashMap for HTTP/TCP transports
534pub fn inject_trace_headers_into_map(headers: &mut std::collections::HashMap<String, String>) {
535    if let Some(trace_context) = get_distributed_tracing_context() {
536        // Inject W3C traceparent header
537        headers.insert(
538            "traceparent".to_string(),
539            trace_context.create_traceparent(),
540        );
541
542        // Inject optional tracestate
543        if let Some(tracestate) = trace_context.tracestate {
544            headers.insert("tracestate".to_string(), tracestate);
545        }
546
547        // Inject custom request IDs
548        if let Some(x_request_id) = trace_context.x_request_id {
549            headers.insert("x-request-id".to_string(), x_request_id);
550        }
551        if let Some(x_dynamo_request_id) = trace_context.x_dynamo_request_id {
552            headers.insert("x-dynamo-request-id".to_string(), x_dynamo_request_id);
553        }
554    }
555}
556
557/// Create a client_request span linked to the parent trace context
558pub fn make_client_request_span(
559    operation: &str,
560    request_id: &str,
561    trace_context: Option<&DistributedTraceContext>,
562    instance_id: Option<&str>,
563) -> Span {
564    if let Some(ctx) = trace_context {
565        let mut headers = async_nats::HeaderMap::new();
566        headers.insert("traceparent", ctx.create_traceparent());
567
568        if let Some(ref tracestate) = ctx.tracestate {
569            headers.insert("tracestate", tracestate.as_str());
570        }
571
572        let (otel_context, _extracted_trace_id, _extracted_parent_span_id) =
573            extract_otel_context_from_nats_headers(&headers);
574
575        let span = if let Some(inst_id) = instance_id {
576            tracing::info_span!(
577                "client_request",
578                operation = operation,
579                request_id = request_id,
580                instance_id = inst_id,
581                trace_id = ctx.trace_id.as_str(),
582                parent_id = ctx.span_id.as_str(),
583                x_request_id = ctx.x_request_id.as_deref(),
584                x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
585                // tracestate = ctx.tracestate.as_deref(),
586            )
587        } else {
588            tracing::info_span!(
589                "client_request",
590                operation = operation,
591                request_id = request_id,
592                trace_id = ctx.trace_id.as_str(),
593                parent_id = ctx.span_id.as_str(),
594                x_request_id = ctx.x_request_id.as_deref(),
595                x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
596                // tracestate = ctx.tracestate.as_deref(),
597            )
598        };
599
600        if let Some(context) = otel_context {
601            let _ = span.set_parent(context);
602        }
603
604        span
605    } else if let Some(inst_id) = instance_id {
606        tracing::info_span!(
607            "client_request",
608            operation = operation,
609            request_id = request_id,
610            instance_id = inst_id,
611        )
612    } else {
613        tracing::info_span!(
614            "client_request",
615            operation = operation,
616            request_id = request_id,
617        )
618    }
619}
620
621#[derive(Debug, Default)]
622pub struct FieldVisitor {
623    pub fields: HashMap<String, String>,
624}
625
626impl Visit for FieldVisitor {
627    fn record_str(&mut self, field: &Field, value: &str) {
628        self.fields
629            .insert(field.name().to_string(), value.to_string());
630    }
631
632    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
633        self.fields
634            .insert(field.name().to_string(), format!("{:?}", value).to_string());
635    }
636}
637
638impl<S> Layer<S> for DistributedTraceIdLayer
639where
640    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
641{
642    // Capture close span time
643    // Currently not used but added for future use in timing
644    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
645        if let Some(span) = ctx.span(&id) {
646            let mut extensions = span.extensions_mut();
647            if let Some(distributed_tracing_context) =
648                extensions.get_mut::<DistributedTraceContext>()
649            {
650                distributed_tracing_context.end = Some(Instant::now());
651            }
652        }
653    }
654
655    // Collects span attributes and metadata in on_new_span
656    // Final initialization deferred to on_enter when OtelData is available
657    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
658        if let Some(span) = ctx.span(id) {
659            let mut trace_id: Option<String> = None;
660            let mut parent_id: Option<String> = None;
661            let mut span_id: Option<String> = None;
662            let mut x_request_id: Option<String> = None;
663            let mut x_dynamo_request_id: Option<String> = None;
664            let mut tracestate: Option<String> = None;
665            let mut visitor = FieldVisitor::default();
666            attrs.record(&mut visitor);
667
668            // Extract trace_id from span attributes
669            if let Some(trace_id_input) = visitor.fields.get("trace_id") {
670                if !is_valid_trace_id(trace_id_input) {
671                    tracing::trace!("trace id  '{}' is not valid! Ignoring.", trace_id_input);
672                } else {
673                    trace_id = Some(trace_id_input.to_string());
674                }
675            }
676
677            // Extract span_id from span attributes
678            if let Some(span_id_input) = visitor.fields.get("span_id") {
679                if !is_valid_span_id(span_id_input) {
680                    tracing::trace!("span id  '{}' is not valid! Ignoring.", span_id_input);
681                } else {
682                    span_id = Some(span_id_input.to_string());
683                }
684            }
685
686            // Extract parent_id from span attributes
687            if let Some(parent_id_input) = visitor.fields.get("parent_id") {
688                if !is_valid_span_id(parent_id_input) {
689                    tracing::trace!("parent id  '{}' is not valid! Ignoring.", parent_id_input);
690                } else {
691                    parent_id = Some(parent_id_input.to_string());
692                }
693            }
694
695            // Extract tracestate
696            if let Some(tracestate_input) = visitor.fields.get("tracestate") {
697                tracestate = Some(tracestate_input.to_string());
698            }
699
700            // Extract x_request_id
701            if let Some(x_request_id_input) = visitor.fields.get("x_request_id") {
702                x_request_id = Some(x_request_id_input.to_string());
703            }
704
705            // Extract x_dynamo_request_id
706            if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
707                x_dynamo_request_id = Some(x_request_id_input.to_string());
708            }
709
710            // Inherit trace context from parent span if available
711            if parent_id.is_none()
712                && let Some(parent_span_id) = ctx.current_span().id()
713                && let Some(parent_span) = ctx.span(parent_span_id)
714            {
715                let parent_ext = parent_span.extensions();
716                if let Some(parent_tracing_context) = parent_ext.get::<DistributedTraceContext>() {
717                    trace_id = Some(parent_tracing_context.trace_id.clone());
718                    parent_id = Some(parent_tracing_context.span_id.clone());
719                    tracestate = parent_tracing_context.tracestate.clone();
720                }
721            }
722
723            // Validate consistency
724            if (parent_id.is_some() || span_id.is_some()) && trace_id.is_none() {
725                tracing::error!("parent id or span id are set but trace id is not set!");
726                // Clear inconsistent IDs to maintain trace integrity
727                parent_id = None;
728                span_id = None;
729            }
730
731            // Store pending context - will be finalized in on_enter
732            let mut extensions = span.extensions_mut();
733            extensions.insert(PendingDistributedTraceContext {
734                trace_id,
735                span_id,
736                parent_id,
737                tracestate,
738                x_request_id,
739                x_dynamo_request_id,
740            });
741        }
742    }
743
744    // Finalizes the DistributedTraceContext when span is entered
745    // At this point, OtelData should have valid trace_id and span_id
746    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
747        if let Some(span) = ctx.span(id) {
748            // Check if already initialized (e.g., span re-entered)
749            {
750                let extensions = span.extensions();
751                if extensions.get::<DistributedTraceContext>().is_some() {
752                    return;
753                }
754            }
755
756            // Get the pending context and extract OtelData IDs
757            let mut extensions = span.extensions_mut();
758            let pending = match extensions.remove::<PendingDistributedTraceContext>() {
759                Some(p) => p,
760                None => {
761                    // This shouldn't happen - on_new_span should have created it
762                    tracing::error!("PendingDistributedTraceContext not found in on_enter");
763                    return;
764                }
765            };
766
767            let mut trace_id = pending.trace_id;
768            let mut span_id = pending.span_id;
769            let parent_id = pending.parent_id;
770            let tracestate = pending.tracestate;
771            let x_request_id = pending.x_request_id;
772            let x_dynamo_request_id = pending.x_dynamo_request_id;
773
774            // Try to extract from OtelData if not already set
775            // Need to drop extensions_mut to get immutable borrow for OtelData
776            drop(extensions);
777
778            if trace_id.is_none() || span_id.is_none() {
779                let extensions = span.extensions();
780                if let Some(otel_data) = extensions.get::<tracing_opentelemetry::OtelData>() {
781                    // Extract trace_id from OTEL data if not already set
782                    if trace_id.is_none()
783                        && let Some(otel_trace_id) = otel_data.trace_id()
784                    {
785                        let trace_id_str = format!("{}", otel_trace_id);
786                        if is_valid_trace_id(&trace_id_str) {
787                            trace_id = Some(trace_id_str);
788                        }
789                    }
790
791                    // Extract span_id from OTEL data if not already set
792                    if span_id.is_none()
793                        && let Some(otel_span_id) = otel_data.span_id()
794                    {
795                        let span_id_str = format!("{}", otel_span_id);
796                        if is_valid_span_id(&span_id_str) {
797                            span_id = Some(span_id_str);
798                        }
799                    }
800                }
801            }
802
803            // Panic if we still don't have required IDs
804            if trace_id.is_none() {
805                panic!(
806                    "trace_id is not set in on_enter - OtelData may not be properly initialized"
807                );
808            }
809
810            if span_id.is_none() {
811                panic!("span_id is not set in on_enter - OtelData may not be properly initialized");
812            }
813
814            // Re-acquire mutable borrow to insert the finalized context
815            let mut extensions = span.extensions_mut();
816            extensions.insert(DistributedTraceContext {
817                trace_id: trace_id.expect("Trace ID must be set"),
818                span_id: span_id.expect("Span ID must be set"),
819                parent_id,
820                tracestate,
821                start: Some(Instant::now()),
822                end: None,
823                x_request_id,
824                x_dynamo_request_id,
825            });
826        }
827    }
828}
829
830// Enables functions to retreive their current
831// context for adding to distributed headers
832pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
833    Span::current()
834        .with_subscriber(|(id, subscriber)| {
835            subscriber
836                .downcast_ref::<Registry>()
837                .and_then(|registry| registry.span_data(id))
838                .and_then(|span_data| {
839                    let extensions = span_data.extensions();
840                    extensions.get::<DistributedTraceContext>().cloned()
841                })
842        })
843        .flatten()
844}
845
846/// Initialize the logger - must be called when Tokio runtime is available
847pub fn init() {
848    INIT.call_once(|| {
849        if let Err(e) = setup_logging() {
850            eprintln!("Failed to initialize logging: {}", e);
851            std::process::exit(1);
852        }
853    });
854}
855
/// Installs the tokio-console variant of the global subscriber.
///
/// Combines a compact fmt layer on stderr (filtered by the logging configuration) with a
/// `console_subscriber` layer listening on `0.0.0.0` at the default console port. That layer
/// only sees `runtime`/`tokio` targets at TRACE, plus ERROR for everything else.
///
/// Returns `Result` so that it has the same signature as the non-console variant. `init()`
/// matches on `Err(..)`, and with a `()` return type the crate would not compile when the
/// `tokio-console` feature is enabled.
#[cfg(feature = "tokio-console")]
fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
    let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
        .with_default_env()
        .server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
        .spawn();
    let tokio_console_target = tracing_subscriber::filter::Targets::new()
        .with_default(LevelFilter::ERROR)
        .with_target("runtime", LevelFilter::TRACE)
        .with_target("tokio", LevelFilter::TRACE);
    let l = fmt::layer()
        .with_ansi(!disable_ansi_logging())
        .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
        .with_writer(std::io::stderr)
        .with_filter(filters(load_config()));
    tracing_subscriber::registry()
        .with(l)
        .with(tokio_console_layer.with_filter(tokio_console_target))
        .init();
    Ok(())
}
876
877#[cfg(not(feature = "tokio-console"))]
878fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
879    let fmt_filter_layer = filters(load_config());
880    let trace_filter_layer = filters(load_config());
881    let otel_filter_layer = filters(load_config());
882
883    if jsonl_logging_enabled() {
884        let l = fmt::layer()
885            .with_ansi(false)
886            .event_format(CustomJsonFormatter::new())
887            .with_writer(std::io::stderr)
888            .with_filter(fmt_filter_layer);
889
890        // Create OpenTelemetry tracer - conditionally export to OTLP based on env var
891        let service_name = get_service_name();
892
893        // Build tracer provider - with or without OTLP export
894        let (tracer_provider, endpoint_opt) = if otlp_exporter_enabled() {
895            // Export enabled: create OTLP exporter with batch processor
896            let endpoint = std::env::var(env_logging::otlp::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
897                .unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string());
898
899            // Initialize OTLP exporter using gRPC (Tonic)
900            let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
901                .with_tonic()
902                .with_endpoint(&endpoint)
903                .build()?;
904
905            // Create tracer provider with batch exporter and service name
906            let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
907                .with_batch_exporter(otlp_exporter)
908                .with_resource(
909                    opentelemetry_sdk::Resource::builder_empty()
910                        .with_service_name(service_name.clone())
911                        .build(),
912                )
913                .build();
914
915            (provider, Some(endpoint))
916        } else {
917            // No export - traces generated locally only (for logging/trace IDs)
918            let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
919                .with_resource(
920                    opentelemetry_sdk::Resource::builder_empty()
921                        .with_service_name(service_name.clone())
922                        .build(),
923                )
924                .build();
925
926            (provider, None)
927        };
928
929        // Get a tracer from the provider
930        let tracer = tracer_provider.tracer(service_name.clone());
931
932        tracing_subscriber::registry()
933            .with(
934                tracing_opentelemetry::layer()
935                    .with_tracer(tracer)
936                    .with_filter(otel_filter_layer),
937            )
938            .with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
939            .with(l)
940            .init();
941
942        // Log initialization status after subscriber is ready
943        if let Some(endpoint) = endpoint_opt {
944            tracing::info!(
945                endpoint = %endpoint,
946                service = %service_name,
947                "OpenTelemetry OTLP export enabled"
948            );
949        } else {
950            tracing::info!(
951                service = %service_name,
952                "OpenTelemetry OTLP export disabled, traces local only"
953            );
954        }
955    } else {
956        let l = fmt::layer()
957            .with_ansi(!disable_ansi_logging())
958            .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
959            .with_writer(std::io::stderr)
960            .with_filter(fmt_filter_layer);
961
962        tracing_subscriber::registry().with(l).init();
963    }
964
965    Ok(())
966}
967
968fn filters(config: LoggingConfig) -> EnvFilter {
969    let mut filter_layer = EnvFilter::builder()
970        .with_default_directive(config.log_level.parse().unwrap())
971        .with_env_var(env_logging::DYN_LOG)
972        .from_env_lossy();
973
974    for (module, level) in config.log_filters {
975        match format!("{module}={level}").parse::<Directive>() {
976            Ok(d) => {
977                filter_layer = filter_layer.add_directive(d);
978            }
979            Err(e) => {
980                eprintln!("Failed parsing filter '{level}' for module '{module}': {e}");
981            }
982        }
983    }
984    filter_layer
985}
986
987/// Log a message with file and line info
988/// Used by Python wrapper
989pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) {
990    let level = match level {
991        "debug" => log::Level::Debug,
992        "info" => log::Level::Info,
993        "warn" => log::Level::Warn,
994        "error" => log::Level::Error,
995        "warning" => log::Level::Warn,
996        _ => log::Level::Info,
997    };
998    log::logger().log(
999        &log::Record::builder()
1000            .args(format_args!("{}", message))
1001            .level(level)
1002            .target(module)
1003            .file(Some(file))
1004            .line(Some(line))
1005            .build(),
1006    );
1007}
1008
1009fn load_config() -> LoggingConfig {
1010    let config_path =
1011        std::env::var(env_logging::DYN_LOGGING_CONFIG_PATH).unwrap_or_else(|_| "".to_string());
1012    let figment = Figment::new()
1013        .merge(Serialized::defaults(LoggingConfig::default()))
1014        .merge(Toml::file("/opt/dynamo/etc/logging.toml"))
1015        .merge(Toml::file(config_path));
1016
1017    figment.extract().unwrap()
1018}
1019
/// One JSONL log record, as built and serialized by `CustomJsonFormatter::format_event`.
///
/// serde serializes struct fields in declaration order, so keys appear as `time`, `level`,
/// `file`, `line`, `target`, `message`. They are followed by the flattened extra `fields`,
/// sorted by key because they live in a `BTreeMap`. Reordering fields changes the output.
#[derive(Serialize)]
struct JsonLog<'a> {
    /// Pre-formatted timestamp string.
    time: String,
    /// Level name from the event metadata (`metadata.level().to_string()`).
    level: String,
    /// Source file of the event; omitted from the JSON when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    file: Option<&'a str>,
    /// Source line of the event; omitted from the JSON when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    line: Option<u32>,
    /// Event target (usually the module path).
    target: &'a str,
    /// The event's `message` field (any JSON value).
    message: serde_json::Value,
    /// All remaining event/span fields, inlined at the top level of the object.
    #[serde(flatten)]
    fields: BTreeMap<String, serde_json::Value>,
}
1033
1034struct TimeFormatter {
1035    use_local_tz: bool,
1036}
1037
1038impl TimeFormatter {
1039    fn new() -> Self {
1040        Self {
1041            use_local_tz: crate::config::use_local_timezone(),
1042        }
1043    }
1044
1045    fn format_now(&self) -> String {
1046        if self.use_local_tz {
1047            chrono::Local::now()
1048                .format("%Y-%m-%dT%H:%M:%S%.6f%:z")
1049                .to_string()
1050        } else {
1051            chrono::Utc::now()
1052                .format("%Y-%m-%dT%H:%M:%S%.6fZ")
1053                .to_string()
1054        }
1055    }
1056}
1057
1058impl FormatTime for TimeFormatter {
1059    fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
1060        write!(w, "{}", self.format_now())
1061    }
1062}
1063
1064struct CustomJsonFormatter {
1065    time_formatter: TimeFormatter,
1066}
1067
1068impl CustomJsonFormatter {
1069    fn new() -> Self {
1070        Self {
1071            time_formatter: TimeFormatter::new(),
1072        }
1073    }
1074}
1075
1076use once_cell::sync::Lazy;
1077use regex::Regex;
1078
/// Static W3C Trace Context propagator instance to avoid repeated allocations.
///
/// Built lazily on first access (`once_cell::sync::Lazy`) and reused for the rest of the
/// process lifetime.
static TRACE_PROPAGATOR: Lazy<opentelemetry_sdk::propagation::TraceContextPropagator> =
    Lazy::new(opentelemetry_sdk::propagation::TraceContextPropagator::new);
1082
1083fn parse_tracing_duration(s: &str) -> Option<u64> {
1084    static RE: Lazy<Regex> =
1085        Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
1086    let captures = RE.captures(s)?;
1087    let value: f64 = captures[1].parse().ok()?;
1088    let unit = &captures[2];
1089    match unit {
1090        "ns" => Some((value / 1000.0) as u64),
1091        "µs" | "us" => Some(value as u64),
1092        "ms" => Some((value * 1000.0) as u64),
1093        "s" => Some((value * 1_000_000.0) as u64),
1094        _ => None,
1095    }
1096}
1097
1098impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
1099where
1100    S: Subscriber + for<'a> LookupSpan<'a>,
1101    N: for<'a> FormatFields<'a> + 'static,
1102{
1103    fn format_event(
1104        &self,
1105        ctx: &FmtContext<'_, S, N>,
1106        mut writer: Writer<'_>,
1107        event: &Event<'_>,
1108    ) -> std::fmt::Result {
1109        let mut visitor = JsonVisitor::default();
1110        let time = self.time_formatter.format_now();
1111        event.record(&mut visitor);
1112        let mut message = visitor
1113            .fields
1114            .remove("message")
1115            .unwrap_or(serde_json::Value::String("".to_string()));
1116
1117        let current_span = event
1118            .parent()
1119            .and_then(|id| ctx.span(id))
1120            .or_else(|| ctx.lookup_current());
1121        if let Some(span) = current_span {
1122            let ext = span.extensions();
1123            let data = ext.get::<FormattedFields<N>>().unwrap();
1124            let span_fields: Vec<(&str, &str)> = data
1125                .fields
1126                .split(' ')
1127                .filter_map(|entry| entry.split_once('='))
1128                .collect();
1129            for (name, value) in span_fields {
1130                visitor.fields.insert(
1131                    name.to_string(),
1132                    serde_json::Value::String(value.trim_matches('"').to_string()),
1133                );
1134            }
1135
1136            let busy_us = visitor
1137                .fields
1138                .remove("time.busy")
1139                .and_then(|v| parse_tracing_duration(&v.to_string()));
1140            let idle_us = visitor
1141                .fields
1142                .remove("time.idle")
1143                .and_then(|v| parse_tracing_duration(&v.to_string()));
1144
1145            if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
1146                visitor.fields.insert(
1147                    "time.busy_us".to_string(),
1148                    serde_json::Value::Number(busy_us.into()),
1149                );
1150                visitor.fields.insert(
1151                    "time.idle_us".to_string(),
1152                    serde_json::Value::Number(idle_us.into()),
1153                );
1154                visitor.fields.insert(
1155                    "time.duration_us".to_string(),
1156                    serde_json::Value::Number((busy_us + idle_us).into()),
1157                );
1158            }
1159
1160            message = match message.as_str() {
1161                Some("new") => serde_json::Value::String("SPAN_CREATED".to_string()),
1162                Some("close") => serde_json::Value::String("SPAN_CLOSED".to_string()),
1163                _ => message.clone(),
1164            };
1165
1166            visitor.fields.insert(
1167                "span_name".to_string(),
1168                serde_json::Value::String(span.name().to_string()),
1169            );
1170
1171            if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
1172                visitor.fields.insert(
1173                    "span_id".to_string(),
1174                    serde_json::Value::String(tracing_context.span_id.clone()),
1175                );
1176                visitor.fields.insert(
1177                    "trace_id".to_string(),
1178                    serde_json::Value::String(tracing_context.trace_id.clone()),
1179                );
1180                if let Some(parent_id) = tracing_context.parent_id.clone() {
1181                    visitor.fields.insert(
1182                        "parent_id".to_string(),
1183                        serde_json::Value::String(parent_id),
1184                    );
1185                } else {
1186                    visitor.fields.remove("parent_id");
1187                }
1188                if let Some(tracestate) = tracing_context.tracestate.clone() {
1189                    visitor.fields.insert(
1190                        "tracestate".to_string(),
1191                        serde_json::Value::String(tracestate),
1192                    );
1193                } else {
1194                    visitor.fields.remove("tracestate");
1195                }
1196                if let Some(x_request_id) = tracing_context.x_request_id.clone() {
1197                    visitor.fields.insert(
1198                        "x_request_id".to_string(),
1199                        serde_json::Value::String(x_request_id),
1200                    );
1201                } else {
1202                    visitor.fields.remove("x_request_id");
1203                }
1204
1205                if let Some(x_dynamo_request_id) = tracing_context.x_dynamo_request_id.clone() {
1206                    visitor.fields.insert(
1207                        "x_dynamo_request_id".to_string(),
1208                        serde_json::Value::String(x_dynamo_request_id),
1209                    );
1210                } else {
1211                    visitor.fields.remove("x_dynamo_request_id");
1212                }
1213            } else {
1214                tracing::error!(
1215                    "Distributed Trace Context not found, falling back to internal ids"
1216                );
1217                visitor.fields.insert(
1218                    "span_id".to_string(),
1219                    serde_json::Value::String(span.id().into_u64().to_string()),
1220                );
1221                if let Some(parent) = span.parent() {
1222                    visitor.fields.insert(
1223                        "parent_id".to_string(),
1224                        serde_json::Value::String(parent.id().into_u64().to_string()),
1225                    );
1226                }
1227            }
1228        } else {
1229            let reserved_fields = [
1230                "trace_id",
1231                "span_id",
1232                "parent_id",
1233                "span_name",
1234                "tracestate",
1235            ];
1236            for reserved_field in reserved_fields {
1237                visitor.fields.remove(reserved_field);
1238            }
1239        }
1240        let metadata = event.metadata();
1241        let log = JsonLog {
1242            level: metadata.level().to_string(),
1243            time,
1244            file: metadata.file(),
1245            line: metadata.line(),
1246            target: metadata.target(),
1247            message,
1248            fields: visitor.fields,
1249        };
1250        let json = serde_json::to_string(&log).unwrap();
1251        writeln!(writer, "{json}")
1252    }
1253}
1254
1255#[derive(Default)]
1256struct JsonVisitor {
1257    fields: BTreeMap<String, serde_json::Value>,
1258}
1259
1260impl tracing::field::Visit for JsonVisitor {
1261    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1262        self.fields.insert(
1263            field.name().to_string(),
1264            serde_json::Value::String(format!("{value:?}")),
1265        );
1266    }
1267
1268    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1269        if field.name() != "message" {
1270            match serde_json::from_str::<Value>(value) {
1271                Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
1272                Err(_) => self.fields.insert(field.name().to_string(), value.into()),
1273            };
1274        } else {
1275            self.fields.insert(field.name().to_string(), value.into());
1276        }
1277    }
1278
1279    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
1280        self.fields
1281            .insert(field.name().to_string(), serde_json::Value::Bool(value));
1282    }
1283
1284    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
1285        self.fields.insert(
1286            field.name().to_string(),
1287            serde_json::Value::Number(value.into()),
1288        );
1289    }
1290
1291    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
1292        self.fields.insert(
1293            field.name().to_string(),
1294            serde_json::Value::Number(value.into()),
1295        );
1296    }
1297
1298    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
1299        use serde_json::value::Number;
1300        self.fields.insert(
1301            field.name().to_string(),
1302            serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
1303        );
1304    }
1305}
1306
// Tests for the JSONL formatter and for distributed-trace-context propagation across a
// parent -> child -> grandchild chain of instrumented async functions.
#[cfg(test)]
pub mod tests {
    use super::*;
    use anyhow::{Result, anyhow};
    use chrono::{DateTime, Utc};
    use jsonschema::{Draft, JSONSchema};
    use serde_json::Value;
    use std::fs::File;
    use std::io::{BufRead, BufReader};
    use stdio_override::*;
    use tempfile::NamedTempFile;

    /// JSON Schema (draft-07) that every captured JSONL log line must satisfy.
    ///
    /// When present, id fields must be lowercase hex: 16 characters for `span_id`/`parent_id`
    /// and 32 for `trace_id`. Extra properties are allowed.
    static LOG_LINE_SCHEMA: &str = r#"
    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "Runtime Log Line",
      "type": "object",
      "required": [
        "file",
        "level",
        "line",
        "message",
        "target",
        "time"
      ],
      "properties": {
        "file":      { "type": "string" },
        "level":     { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
        "line":      { "type": "integer" },
        "message":   { "type": "string" },
        "target":    { "type": "string" },
        "time":      { "type": "string", "format": "date-time" },
        "span_id":   { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "trace_id":  { "type": "string", "pattern": "^[a-f0-9]{32}$" },
        "span_name": { "type": "string" },
        "time.busy_us":     { "type": "integer" },
        "time.duration_us": { "type": "integer" },
        "time.idle_us":     { "type": "integer" },
        "tracestate": { "type": "string" }
      },
      "additionalProperties": true
    }
    "#;

    /// Root span: logs, logs its own trace id (if a context is available), then awaits `child`.
    #[tracing::instrument(skip_all)]
    async fn parent() {
        tracing::trace!(message = "parent!");
        if let Some(my_ctx) = get_distributed_tracing_context() {
            tracing::info!(my_trace_id = my_ctx.trace_id);
        }
        child().await;
    }

    /// Middle span: same pattern as `parent`, then awaits `grandchild`.
    #[tracing::instrument(skip_all)]
    async fn child() {
        tracing::trace!(message = "child");
        if let Some(my_ctx) = get_distributed_tracing_context() {
            tracing::info!(my_trace_id = my_ctx.trace_id);
        }
        grandchild().await;
    }

    /// Leaf span: logs and logs its own trace id.
    #[tracing::instrument(skip_all)]
    async fn grandchild() {
        tracing::trace!(message = "grandchild");
        if let Some(my_ctx) = get_distributed_tracing_context() {
            tracing::info!(my_trace_id = my_ctx.trace_id);
        }
    }

    /// Reads `file_name` line by line and parses each line as JSON.
    ///
    /// Each value is validated against `LOG_LINE_SCHEMA`, echoed to stdout and collected.
    /// Returns an error at the first line that is not valid JSON or fails validation; the
    /// error message includes the 1-based line number.
    pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
        let schema_json: Value =
            serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
        let compiled_schema = JSONSchema::options()
            .with_draft(Draft::Draft7)
            .compile(&schema_json)
            .expect("Invalid schema");

        let f = File::open(file_name)?;
        let reader = BufReader::new(f);
        let mut result = Vec::new();

        for (line_num, line) in reader.lines().enumerate() {
            let line = line?;
            let val: Value = serde_json::from_str(&line)
                .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_num + 1, e))?;

            if let Err(errors) = compiled_schema.validate(&val) {
                let errs = errors.map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
                return Err(anyhow!(
                    "Line {}: JSON Schema Validation errors: {}",
                    line_num + 1,
                    errs
                ));
            }
            println!("{}", val);
            result.push(val);
        }
        Ok(result)
    }

    /// End-to-end check of JSONL output.
    ///
    /// Sets `DYN_LOGGING_JSONL=1`, redirects stderr to a temp file, initializes logging and
    /// runs the parent -> child -> grandchild chain. It then checks:
    /// - all lines share one non-zero trace id;
    /// - span ids are well-formed and distinct;
    /// - parent links are correct;
    /// - timestamps are ordered.
    #[tokio::test]
    async fn test_json_log_capture() -> Result<()> {
        #[allow(clippy::redundant_closure_call)]
        // NOTE(review): `let _ =` discards the closure's `Result`, so `?` failures inside it
        // (stderr redirection, invalid JSON, schema-validation errors from `load_log`) do NOT
        // fail this test; only panicking assertions do. Confirm this leniency is intended.
        let _ = temp_env::async_with_vars(
            [(env_logging::DYN_LOGGING_JSONL, Some("1"))],
            (async || {
                let tmp_file = NamedTempFile::new().unwrap();
                let file_name = tmp_file.path().to_str().unwrap();
                let guard = StderrOverride::from_file(file_name)?;
                init();
                parent().await;
                drop(guard);

                let lines = load_log(file_name)?;

                // 1. Extract the dynamically generated trace ID and validate consistency
                // All logs should have the same trace_id since they're part of the same trace
                // Skip any initialization logs that don't have trace_id (e.g., OTLP setup messages)
                //
                // Note: This test can fail if logging was already initialized by another test running
                // in parallel. Logging initialization is global (Once) and can only happen once per process.
                // If no trace_id is found, skip validation gracefully.
                let Some(trace_id) = lines
                    .iter()
                    .find_map(|log_line| log_line.get("trace_id").and_then(|v| v.as_str()))
                    .map(|s| s.to_string())
                else {
                    // Skip test if logging was already initialized - we can't control the output format
                    return Ok(());
                };

                // Verify trace_id is not a zero/invalid ID
                assert_ne!(
                    trace_id, "00000000000000000000000000000000",
                    "trace_id should not be a zero/invalid ID"
                );
                assert!(
                    !trace_id.chars().all(|c| c == '0'),
                    "trace_id should not be all zeros"
                );

                // Verify all logs have the same trace_id
                for log_line in &lines {
                    if let Some(line_trace_id) = log_line.get("trace_id") {
                        assert_eq!(
                            line_trace_id.as_str().unwrap(),
                            &trace_id,
                            "All logs should have the same trace_id"
                        );
                    }
                }

                // Validate my_trace_id matches the actual trace ID
                for log_line in &lines {
                    if let Some(my_trace_id) = log_line.get("my_trace_id") {
                        assert_eq!(
                            my_trace_id,
                            &serde_json::Value::String(trace_id.clone()),
                            "my_trace_id should match the trace_id from distributed tracing context"
                        );
                    }
                }

                // 2. Validate span IDs exist and are properly formatted
                let mut span_ids_seen: std::collections::HashSet<String> = std::collections::HashSet::new();
                let mut span_timestamps: std::collections::HashMap<String, DateTime<Utc>> = std::collections::HashMap::new();

                for log_line in &lines {
                    if let Some(span_id) = log_line.get("span_id") {
                        let span_id_str = span_id.as_str().unwrap();
                        assert!(
                            is_valid_span_id(span_id_str),
                            "Invalid span_id format: {}",
                            span_id_str
                        );
                        span_ids_seen.insert(span_id_str.to_string());
                    }

                    // Validate timestamp format and track span timestamps
                    if let Some(time_str) = log_line.get("time").and_then(|v| v.as_str()) {
                        let timestamp = DateTime::parse_from_rfc3339(time_str)
                            .expect("All timestamps should be valid RFC3339 format")
                            .with_timezone(&Utc);

                        // Track timestamp for each span_name
                        // (a later line for the same span overwrites an earlier one)
                        if let Some(span_name) = log_line.get("span_name").and_then(|v| v.as_str()) {
                            span_timestamps.insert(span_name.to_string(), timestamp);
                        }
                    }
                }

                // 3. Validate parent-child span relationships
                // Extract span IDs for each span by looking at their log messages
                let parent_span_id = lines
                    .iter()
                    .find(|log_line| {
                        log_line.get("span_name")
                            .and_then(|v| v.as_str()) == Some("parent")
                    })
                    .and_then(|log_line| {
                        log_line.get("span_id")
                            .and_then(|v| v.as_str())
                            .map(|s| s.to_string())
                    })
                    .expect("Should find parent span with span_id");

                let child_span_id = lines
                    .iter()
                    .find(|log_line| {
                        log_line.get("span_name")
                            .and_then(|v| v.as_str()) == Some("child")
                    })
                    .and_then(|log_line| {
                        log_line.get("span_id")
                            .and_then(|v| v.as_str())
                            .map(|s| s.to_string())
                    })
                    .expect("Should find child span with span_id");

                let grandchild_span_id = lines
                    .iter()
                    .find(|log_line| {
                        log_line.get("span_name")
                            .and_then(|v| v.as_str()) == Some("grandchild")
                    })
                    .and_then(|log_line| {
                        log_line.get("span_id")
                            .and_then(|v| v.as_str())
                            .map(|s| s.to_string())
                    })
                    .expect("Should find grandchild span with span_id");

                // Verify span IDs are unique
                assert_ne!(parent_span_id, child_span_id, "Parent and child should have different span IDs");
                assert_ne!(child_span_id, grandchild_span_id, "Child and grandchild should have different span IDs");
                assert_ne!(parent_span_id, grandchild_span_id, "Parent and grandchild should have different span IDs");

                // Verify parent span has no parent_id
                for log_line in &lines {
                    if let Some(span_name) = log_line.get("span_name")
                        && let Some(span_name_str) = span_name.as_str()
                        && span_name_str == "parent"
                    {
                        assert!(
                            log_line.get("parent_id").is_none(),
                            "Parent span should not have a parent_id"
                        );
                    }
                }

                // Verify child span's parent_id is parent_span_id
                for log_line in &lines {
                    if let Some(span_name) = log_line.get("span_name")
                        && let Some(span_name_str) = span_name.as_str()
                        && span_name_str == "child"
                    {
                        let parent_id = log_line.get("parent_id")
                            .and_then(|v| v.as_str())
                            .expect("Child span should have a parent_id");
                        assert_eq!(
                            parent_id,
                            parent_span_id,
                            "Child's parent_id should match parent's span_id"
                        );
                    }
                }

                // Verify grandchild span's parent_id is child_span_id
                for log_line in &lines {
                    if let Some(span_name) = log_line.get("span_name")
                        && let Some(span_name_str) = span_name.as_str()
                        && span_name_str == "grandchild"
                    {
                        let parent_id = log_line.get("parent_id")
                            .and_then(|v| v.as_str())
                            .expect("Grandchild span should have a parent_id");
                        assert_eq!(
                            parent_id,
                            child_span_id,
                            "Grandchild's parent_id should match child's span_id"
                        );
                    }
                }

                // 4. Validate timestamp ordering - spans should log in execution order
                let parent_time = span_timestamps.get("parent")
                    .expect("Should have timestamp for parent span");
                let child_time = span_timestamps.get("child")
                    .expect("Should have timestamp for child span");
                let grandchild_time = span_timestamps.get("grandchild")
                    .expect("Should have timestamp for grandchild span");

                // Parent logs first (or at same time), then child, then grandchild
                assert!(
                    parent_time <= child_time,
                    "Parent span should log before or at same time as child span (parent: {}, child: {})",
                    parent_time,
                    child_time
                );
                assert!(
                    child_time <= grandchild_time,
                    "Child span should log before or at same time as grandchild span (child: {}, grandchild: {})",
                    child_time,
                    grandchild_time
                );

                Ok::<(), anyhow::Error>(())
            })(),
        )
        .await;
        Ok(())
    }
}