Skip to main content

dynamo_runtime/
logging.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Dynamo Distributed Logging Module.
5//!
6//! - Configuration loaded from:
7//!   1. Environment variables (highest priority).
8//!   2. Optional TOML file pointed to by the `DYN_LOGGING_CONFIG_PATH` environment variable.
9//!   3. `/opt/dynamo/etc/logging.toml`.
10//!
11//! Logging can take two forms: `READABLE` or `JSONL`. The default is `READABLE`. `JSONL`
12//! can be enabled by setting the `DYN_LOGGING_JSONL` environment variable to `1`.
13//!
14//! To use local timezone for logging timestamps, set the `DYN_LOG_USE_LOCAL_TZ` environment variable to `1`.
15//!
16//! Filters can be configured using the `DYN_LOG` environment variable or by setting the `log_filters`
17//! table in the TOML configuration file. Filters are comma-separated key-value pairs where the key
18//! is the crate or module name and the value is the log level. The default log level is `info`.
19//!
20//! Example:
21//! ```toml
22//! log_level = "error"
23//!
24//! [log_filters]
25//! "test_logging" = "info"
26//! "test_logging::api" = "trace"
27//! ```
28
29use std::collections::{BTreeMap, HashMap};
30use std::sync::Once;
31
32use figment::{
33    Figment,
34    providers::{Format, Serialized, Toml},
35};
36use serde::{Deserialize, Serialize};
37use tracing::level_filters::LevelFilter;
38use tracing::{Event, Subscriber};
39use tracing_subscriber::EnvFilter;
40use tracing_subscriber::fmt::time::FormatTime;
41use tracing_subscriber::fmt::time::LocalTime;
42use tracing_subscriber::fmt::time::SystemTime;
43use tracing_subscriber::fmt::time::UtcTime;
44use tracing_subscriber::fmt::{FmtContext, FormatFields};
45use tracing_subscriber::fmt::{FormattedFields, format::Writer};
46use tracing_subscriber::prelude::*;
47use tracing_subscriber::registry::LookupSpan;
48use tracing_subscriber::{filter::Directive, fmt};
49
50use crate::config::{disable_ansi_logging, jsonl_logging_enabled, span_events_enabled};
51use async_nats::{HeaderMap, HeaderValue};
52use axum::extract::FromRequestParts;
53use axum::http;
54use axum::http::Request;
55use axum::http::request::Parts;
56use serde_json::Value;
57use std::convert::Infallible;
58use std::time::Instant;
59use tower_http::trace::{DefaultMakeSpan, TraceLayer};
60use tracing::Id;
61use tracing::Span;
62use tracing::field::Field;
63use tracing::span;
64use tracing_subscriber::Layer;
65use tracing_subscriber::Registry;
66use tracing_subscriber::field::Visit;
67use tracing_subscriber::fmt::format::FmtSpan;
68use tracing_subscriber::layer::Context;
69use tracing_subscriber::registry::SpanData;
70use uuid::Uuid;
71
72use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
73use opentelemetry::trace::TraceContextExt;
74use opentelemetry::{global, trace::Tracer};
75use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
76use opentelemetry_otlp::WithExportConfig;
77
78use opentelemetry::trace::TracerProvider as _;
79use opentelemetry::{Key, KeyValue};
80use opentelemetry_sdk::Resource;
81use opentelemetry_sdk::logs::SdkLoggerProvider;
82use opentelemetry_sdk::trace::SdkTracerProvider;
83use tracing::error;
84use tracing_subscriber::layer::SubscriberExt;
85// use tracing_subscriber::Registry;
86
87use std::time::Duration;
88use tracing::{info, instrument};
89use tracing_opentelemetry::OpenTelemetrySpanExt;
90use tracing_subscriber::util::SubscriberInitExt;
91
92use crate::config::environment_names::logging as env_logging;
93
94use dynamo_config::env_is_truthy;
95
/// Default log level; used as the `log_level` of `LoggingConfig::default()`.
const DEFAULT_FILTER_LEVEL: &str = "info";

/// Default OTLP endpoint
const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";

/// Default service name; returned by `get_service_name()` when the service-name
/// environment variable cannot be read.
const DEFAULT_OTEL_SERVICE_NAME: &str = "dynamo";

/// Once instance to ensure the logger is only initialized once
static INIT: Once = Once::new();
107
/// Logging configuration: a global level plus per-target overrides.
#[derive(Serialize, Deserialize, Debug)]
struct LoggingConfig {
    /// Global log level (e.g. `"info"`).
    log_level: String,
    /// Per-target level overrides: key is the target name, value is its level.
    log_filters: HashMap<String, String>,
}
113impl Default for LoggingConfig {
114    fn default() -> Self {
115        LoggingConfig {
116            log_level: DEFAULT_FILTER_LEVEL.to_string(),
117            log_filters: HashMap::from([
118                ("h2".to_string(), "error".to_string()),
119                ("tower".to_string(), "error".to_string()),
120                ("hyper_util".to_string(), "error".to_string()),
121                ("neli".to_string(), "error".to_string()),
122                ("async_nats".to_string(), "error".to_string()),
123                ("rustls".to_string(), "error".to_string()),
124                ("tokenizers".to_string(), "error".to_string()),
125                ("axum".to_string(), "error".to_string()),
126                ("tonic".to_string(), "error".to_string()),
127                ("hf_hub".to_string(), "error".to_string()),
128                ("opentelemetry".to_string(), "error".to_string()),
129                ("opentelemetry-otlp".to_string(), "error".to_string()),
130                ("opentelemetry_sdk".to_string(), "error".to_string()),
131            ]),
132        }
133    }
134}
135
/// Check if OTLP trace exporting is enabled (accepts: "1", "true", "on", "yes" - case insensitive).
///
/// Delegates to `env_is_truthy` on the `OTEL_EXPORT_ENABLED` environment variable name.
fn otlp_exporter_enabled() -> bool {
    env_is_truthy(env_logging::otlp::OTEL_EXPORT_ENABLED)
}
140
141/// Get the service name from environment or use default
142fn get_service_name() -> String {
143    std::env::var(env_logging::otlp::OTEL_SERVICE_NAME)
144        .unwrap_or_else(|_| DEFAULT_OTEL_SERVICE_NAME.to_string())
145}
146
/// Validate a given trace ID according to W3C Trace Context specifications.
///
/// A valid trace ID is exactly 32 lowercase hexadecimal characters
/// (`0-9`, `a-f`) and is not all zeros. Uppercase hex digits and the
/// all-zero ID are rejected, as the specification defines both as invalid.
///
/// Returns `true` only when `trace_id` satisfies all of the above.
pub fn is_valid_trace_id(trace_id: &str) -> bool {
    let bytes = trace_id.as_bytes();
    bytes.len() == 32
        && bytes
            .iter()
            .all(|&b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
        // The all-zero trace ID is explicitly invalid.
        && bytes.iter().any(|&b| b != b'0')
}
152
/// Validate a given span ID according to W3C Trace Context specifications.
///
/// A valid span ID is exactly 16 lowercase hexadecimal characters
/// (`0-9`, `a-f`) and is not all zeros. Uppercase hex digits and the
/// all-zero ID are rejected, as the specification defines both as invalid.
///
/// Returns `true` only when `span_id` satisfies all of the above.
pub fn is_valid_span_id(span_id: &str) -> bool {
    let bytes = span_id.as_bytes();
    bytes.len() == 16
        && bytes
            .iter()
            .all(|&b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
        // The all-zero span ID is explicitly invalid.
        && bytes.iter().any(|&b| b != b'0')
}
158
/// Tracing layer that attaches a `DistributedTraceContext` to spans
/// (see its `Layer` implementation in this file).
pub struct DistributedTraceIdLayer;
160
/// Trace identifiers and request IDs associated with a span.
///
/// Optional fields are omitted from serialized output when `None`;
/// the timing fields are never serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedTraceContext {
    /// Trace ID (used as the trace-id field of `create_traceparent`).
    pub trace_id: String,
    /// ID of this span (used as the parent-id field of `create_traceparent`).
    pub span_id: String,
    /// ID of the parent span, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<String>,
    /// Value of the `tracestate` header, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tracestate: Option<String>,
    // Span start time; not serialized.
    #[serde(skip)]
    start: Option<Instant>,
    // Span close time, set in the layer's `on_close`; not serialized.
    #[serde(skip)]
    end: Option<Instant>,
    /// Value of the `x-request-id` header, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_request_id: Option<String>,
    /// Request ID, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
}
178
/// Pending context data collected in on_new_span, to be finalized in on_enter.
///
/// Unlike `DistributedTraceContext`, `trace_id` and `span_id` are optional here
/// because they may not be known yet when the span is created.
#[derive(Debug, Clone)]
struct PendingDistributedTraceContext {
    // Trace ID taken from span attributes or inherited from the parent span.
    trace_id: Option<String>,
    // Span ID taken from span attributes, if a valid one was supplied.
    span_id: Option<String>,
    // Parent span ID taken from span attributes or from the parent span.
    parent_id: Option<String>,
    // `tracestate` value taken from span attributes or from the parent span.
    tracestate: Option<String>,
    // `x_request_id` attribute, or the parent span's value when absent.
    x_request_id: Option<String>,
    // `request_id` attribute (or legacy `x_dynamo_request_id`), or the parent's value.
    request_id: Option<String>,
}
189
/// Macro to emit a tracing event at a dynamic level with a custom target.
///
/// Arguments:
/// - `$level`: an expression matched against `&tracing::Level::*` patterns
///   (i.e. a reference to a `tracing::Level`).
/// - `target: $target`: the target passed through to `tracing::event!`.
/// - `$($arg)*`: remaining tokens forwarded verbatim to `tracing::event!`.
macro_rules! emit_at_level {
    ($level:expr, target: $target:expr, $($arg:tt)*) => {
        // tracing::event! requires a compile-time constant level, so we must match
        // on the runtime level and use a literal Level constant in each arm.
        // See: https://github.com/tokio-rs/tracing/issues/2730
        match $level {
            &tracing::Level::ERROR => tracing::event!(target: $target, tracing::Level::ERROR, $($arg)*),
            &tracing::Level::WARN => tracing::event!(target: $target, tracing::Level::WARN, $($arg)*),
            &tracing::Level::INFO => tracing::event!(target: $target, tracing::Level::INFO, $($arg)*),
            &tracing::Level::DEBUG => tracing::event!(target: $target, tracing::Level::DEBUG, $($arg)*),
            &tracing::Level::TRACE => tracing::event!(target: $target, tracing::Level::TRACE, $($arg)*),
        }
    };
}
205
206impl DistributedTraceContext {
207    /// Create a traceparent string from the context
208    pub fn create_traceparent(&self) -> String {
209        format!("00-{}-{}-01", self.trace_id, self.span_id)
210    }
211}
212
213/// Parse a traceparent string into its components
214pub fn parse_traceparent(traceparent: &str) -> (Option<String>, Option<String>) {
215    let pieces: Vec<_> = traceparent.split('-').collect();
216    if pieces.len() != 4 {
217        return (None, None);
218    }
219    let trace_id = pieces[1];
220    let parent_id = pieces[2];
221
222    if !is_valid_trace_id(trace_id) || !is_valid_span_id(parent_id) {
223        return (None, None);
224    }
225
226    (Some(trace_id.to_string()), Some(parent_id.to_string()))
227}
228
/// Trace and request identifiers read from a set of headers
/// (see `TraceParent::from_headers`). Every field is `None` when the
/// corresponding header is missing or rejected.
#[derive(Debug, Clone, Default)]
pub struct TraceParent {
    /// Trace ID parsed from the `traceparent` header.
    pub trace_id: Option<String>,
    /// Parent span ID parsed from the `traceparent` header.
    pub parent_id: Option<String>,
    /// Raw value of the `tracestate` header.
    pub tracestate: Option<String>,
    /// Raw value of the `x-request-id` header.
    pub x_request_id: Option<String>,
    /// Value of the `request-id` header (or the deprecated
    /// `x-dynamo-request-id`), kept only when it parses as a UUID.
    pub request_id: Option<String>,
}
237
/// Minimal read-only view over a header collection, so the same lookup code
/// can be used with different header map types.
pub trait GenericHeaders {
    /// Returns the value stored under `key` as a string slice, or `None`
    /// when no usable value is available.
    fn get(&self, key: &str) -> Option<&str>;
}
241
242impl GenericHeaders for async_nats::HeaderMap {
243    fn get(&self, key: &str) -> Option<&str> {
244        async_nats::HeaderMap::get(self, key).map(|value| value.as_str())
245    }
246}
247
248impl GenericHeaders for http::HeaderMap {
249    fn get(&self, key: &str) -> Option<&str> {
250        http::HeaderMap::get(self, key).and_then(|value| value.to_str().ok())
251    }
252}
253
254impl TraceParent {
255    pub fn from_headers<H: GenericHeaders>(headers: &H) -> TraceParent {
256        let mut trace_id = None;
257        let mut parent_id = None;
258        let mut tracestate = None;
259        let mut x_request_id = None;
260        let mut request_id = None;
261
262        if let Some(header_value) = headers.get("traceparent") {
263            (trace_id, parent_id) = parse_traceparent(header_value);
264        }
265
266        if let Some(header_value) = headers.get("x-request-id") {
267            x_request_id = Some(header_value.to_string());
268        }
269
270        if let Some(header_value) = headers.get("tracestate") {
271            tracestate = Some(header_value.to_string());
272        }
273
274        // Read request-id from internal headers, with fallback to deprecated x-dynamo-request-id
275        if let Some(header_value) = headers.get("request-id") {
276            request_id = Some(header_value.to_string());
277        } else if let Some(header_value) = headers.get("x-dynamo-request-id") {
278            request_id = Some(header_value.to_string());
279        }
280
281        let request_id = request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
282        TraceParent {
283            trace_id,
284            parent_id,
285            tracestate,
286            x_request_id,
287            request_id,
288        }
289    }
290}
291
292/// Create a span for inference request endpoints (completions, chat, embeddings, etc.).
293///
294/// Uses `target: "request_span"` which is always allowed through the DYN_LOG filter
295/// (via `request_span=trace` directive in `filters()`). This ensures request context
296/// (request_id, model, trace_id) is always available on log events.
297pub fn make_inference_request_span<B>(req: &Request<B>) -> Span {
298    let method = req.method();
299    let uri = req.uri();
300    let version = format!("{:?}", req.version());
301    let trace_parent = TraceParent::from_headers(req.headers());
302
303    let otel_context = extract_otel_context_from_http_headers(req.headers());
304
305    // Ensure every inference request has a request_id on the span.
306    // This is the single source of truth — workers and get_or_create_request_id
307    // read it back via DistributedTraceIdLayer.
308    let request_id = trace_parent
309        .request_id
310        .unwrap_or_else(|| Uuid::new_v4().to_string());
311
312    let span = tracing::info_span!(
313            target: "request_span",
314        "http-request",
315        method = %method,
316        uri = %uri,
317        version = %version,
318        trace_id = trace_parent.trace_id,
319        parent_id = trace_parent.parent_id,
320        x_request_id = trace_parent.x_request_id,
321        request_id = %request_id,
322        model = tracing::field::Empty,
323        input_tokens = tracing::field::Empty,
324        output_tokens = tracing::field::Empty,
325        ttft_ms = tracing::field::Empty,
326        avg_itl_ms = tracing::field::Empty,
327        prefill_worker_id = tracing::field::Empty,
328        decode_worker_id = tracing::field::Empty,
329    );
330
331    if let Some(context) = otel_context {
332        let _ = span.set_parent(context);
333    }
334
335    span
336}
337
338/// Create a span for system endpoints (health, metrics, models, engine, loras, etc.).
339///
340/// Same structure as `make_inference_request_span` but uses `target: "system_span"`
341/// which follows normal DYN_LOG filtering (debug level by default). The inference
342/// span target `request_span` is always-on via a `request_span=trace` directive;
343/// system spans are not, keeping high-frequency polling endpoints quiet.
344pub fn make_system_request_span<B>(req: &Request<B>) -> Span {
345    let method = req.method();
346    let uri = req.uri();
347    let version = format!("{:?}", req.version());
348    let trace_parent = TraceParent::from_headers(req.headers());
349    let otel_context = extract_otel_context_from_http_headers(req.headers());
350
351    // Ensure every system request has a request_id on the span.
352    let request_id = trace_parent
353        .request_id
354        .unwrap_or_else(|| Uuid::new_v4().to_string());
355
356    let span = tracing::debug_span!(
357        target: "system_span",
358        "http-request",
359        method = %method,
360        uri = %uri,
361        version = %version,
362        trace_id = trace_parent.trace_id,
363        parent_id = trace_parent.parent_id,
364        x_request_id = trace_parent.x_request_id,
365        request_id = %request_id,
366        model = tracing::field::Empty,
367        input_tokens = tracing::field::Empty,
368        output_tokens = tracing::field::Empty,
369        ttft_ms = tracing::field::Empty,
370        avg_itl_ms = tracing::field::Empty,
371        prefill_worker_id = tracing::field::Empty,
372        decode_worker_id = tracing::field::Empty,
373    );
374
375    if let Some(context) = otel_context {
376        let _ = span.set_parent(context);
377    }
378
379    span
380}
381
382/// Extract OpenTelemetry context from HTTP headers for distributed tracing
383fn extract_otel_context_from_http_headers(
384    headers: &http::HeaderMap,
385) -> Option<opentelemetry::Context> {
386    let traceparent_value = headers.get("traceparent")?.to_str().ok()?;
387
388    struct HttpHeaderExtractor<'a>(&'a http::HeaderMap);
389
390    impl<'a> Extractor for HttpHeaderExtractor<'a> {
391        fn get(&self, key: &str) -> Option<&str> {
392            self.0.get(key).and_then(|v| v.to_str().ok())
393        }
394
395        fn keys(&self) -> Vec<&str> {
396            vec!["traceparent", "tracestate"]
397                .into_iter()
398                .filter(|&key| self.0.get(key).is_some())
399                .collect()
400        }
401    }
402
403    // Early return if traceparent is empty
404    if traceparent_value.is_empty() {
405        return None;
406    }
407
408    let extractor = HttpHeaderExtractor(headers);
409    let otel_context = TRACE_PROPAGATOR.extract(&extractor);
410
411    if otel_context.span().span_context().is_valid() {
412        Some(otel_context)
413    } else {
414        None
415    }
416}
417
418/// Create a handle_payload span from NATS headers with component context
419pub fn make_handle_payload_span(
420    headers: &async_nats::HeaderMap,
421    component: &str,
422    endpoint: &str,
423    namespace: &str,
424    instance_id: u64,
425) -> Span {
426    let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_nats_headers(headers);
427    let trace_parent = TraceParent::from_headers(headers);
428
429    if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
430        let span = tracing::info_span!(
431            target: "request_span",
432            "handle_payload",
433            trace_id = trace_id.as_str(),
434            parent_id = parent_id.as_str(),
435            x_request_id = trace_parent.x_request_id,
436            request_id = trace_parent.request_id,
437            tracestate = trace_parent.tracestate,
438            component = component,
439            endpoint = endpoint,
440            namespace = namespace,
441            instance_id = instance_id,
442        );
443
444        if let Some(context) = otel_context {
445            let _ = span.set_parent(context);
446        }
447        span
448    } else {
449        tracing::info_span!(
450            target: "request_span",
451            "handle_payload",
452            x_request_id = trace_parent.x_request_id,
453            request_id = trace_parent.request_id,
454            tracestate = trace_parent.tracestate,
455            component = component,
456            endpoint = endpoint,
457            namespace = namespace,
458            instance_id = instance_id,
459        )
460    }
461}
462
463/// Create a handle_payload span from TCP/HashMap headers with component context
464pub fn make_handle_payload_span_from_tcp_headers(
465    headers: &std::collections::HashMap<String, String>,
466    component: &str,
467    endpoint: &str,
468    namespace: &str,
469    instance_id: u64,
470) -> Span {
471    let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_tcp_headers(headers);
472    let x_request_id = headers.get("x-request-id").cloned();
473    let request_id = headers
474        .get("request-id")
475        .or_else(|| headers.get("x-dynamo-request-id"))
476        .filter(|id| uuid::Uuid::parse_str(id).is_ok())
477        .cloned();
478    let tracestate = headers.get("tracestate").cloned();
479
480    if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
481        let span = tracing::info_span!(
482            target: "request_span",
483            "handle_payload",
484            trace_id = trace_id.as_str(),
485            parent_id = parent_id.as_str(),
486            x_request_id = x_request_id,
487            request_id = request_id,
488            tracestate = tracestate,
489            component = component,
490            endpoint = endpoint,
491            namespace = namespace,
492            instance_id = instance_id,
493        );
494
495        if let Some(context) = otel_context {
496            let _ = span.set_parent(context);
497        }
498        span
499    } else {
500        tracing::info_span!(
501            target: "request_span",
502            "handle_payload",
503            x_request_id = x_request_id,
504            request_id = request_id,
505            tracestate = tracestate,
506            component = component,
507            endpoint = endpoint,
508            namespace = namespace,
509            instance_id = instance_id,
510        )
511    }
512}
513
514/// Extract OpenTelemetry trace context from TCP/HashMap headers for distributed tracing
515fn extract_otel_context_from_tcp_headers(
516    headers: &std::collections::HashMap<String, String>,
517) -> (
518    Option<opentelemetry::Context>,
519    Option<String>,
520    Option<String>,
521) {
522    let traceparent_value = match headers.get("traceparent") {
523        Some(value) => value.as_str(),
524        None => return (None, None, None),
525    };
526
527    let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
528
529    struct TcpHeaderExtractor<'a>(&'a std::collections::HashMap<String, String>);
530
531    impl<'a> Extractor for TcpHeaderExtractor<'a> {
532        fn get(&self, key: &str) -> Option<&str> {
533            self.0.get(key).map(|s| s.as_str())
534        }
535
536        fn keys(&self) -> Vec<&str> {
537            vec!["traceparent", "tracestate"]
538                .into_iter()
539                .filter(|&key| self.0.get(key).is_some())
540                .collect()
541        }
542    }
543
544    let extractor = TcpHeaderExtractor(headers);
545    let otel_context = TRACE_PROPAGATOR.extract(&extractor);
546
547    let context_with_trace = if otel_context.span().span_context().is_valid() {
548        Some(otel_context)
549    } else {
550        None
551    };
552
553    (context_with_trace, trace_id, parent_span_id)
554}
555
556/// Extract OpenTelemetry trace context from NATS headers for distributed tracing
557pub fn extract_otel_context_from_nats_headers(
558    headers: &async_nats::HeaderMap,
559) -> (
560    Option<opentelemetry::Context>,
561    Option<String>,
562    Option<String>,
563) {
564    let traceparent_value = match headers.get("traceparent") {
565        Some(value) => value.as_str(),
566        None => return (None, None, None),
567    };
568
569    let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
570
571    struct NatsHeaderExtractor<'a>(&'a async_nats::HeaderMap);
572
573    impl<'a> Extractor for NatsHeaderExtractor<'a> {
574        fn get(&self, key: &str) -> Option<&str> {
575            self.0.get(key).map(|value| value.as_str())
576        }
577
578        fn keys(&self) -> Vec<&str> {
579            vec!["traceparent", "tracestate"]
580                .into_iter()
581                .filter(|&key| self.0.get(key).is_some())
582                .collect()
583        }
584    }
585
586    let extractor = NatsHeaderExtractor(headers);
587    let otel_context = TRACE_PROPAGATOR.extract(&extractor);
588
589    let context_with_trace = if otel_context.span().span_context().is_valid() {
590        Some(otel_context)
591    } else {
592        None
593    };
594
595    (context_with_trace, trace_id, parent_span_id)
596}
597
598/// Inject OpenTelemetry trace context into NATS headers using W3C Trace Context propagation
599pub fn inject_otel_context_into_nats_headers(
600    headers: &mut async_nats::HeaderMap,
601    context: Option<opentelemetry::Context>,
602) {
603    let otel_context = context.unwrap_or_else(|| Span::current().context());
604
605    struct NatsHeaderInjector<'a>(&'a mut async_nats::HeaderMap);
606
607    impl<'a> Injector for NatsHeaderInjector<'a> {
608        fn set(&mut self, key: &str, value: String) {
609            self.0.insert(key, value);
610        }
611    }
612
613    let mut injector = NatsHeaderInjector(headers);
614    TRACE_PROPAGATOR.inject_context(&otel_context, &mut injector);
615}
616
/// Inject trace context from current span into NATS headers.
///
/// Convenience wrapper: calls `inject_otel_context_into_nats_headers` with no
/// explicit context, so the current span's context is used.
pub fn inject_current_trace_into_nats_headers(headers: &mut async_nats::HeaderMap) {
    inject_otel_context_into_nats_headers(headers, None);
}
621
622// Inject trace headers into a generic HashMap for HTTP/TCP transports
623pub fn inject_trace_headers_into_map(headers: &mut std::collections::HashMap<String, String>) {
624    if let Some(trace_context) = get_distributed_tracing_context() {
625        // Inject W3C traceparent header
626        headers.insert(
627            "traceparent".to_string(),
628            trace_context.create_traceparent(),
629        );
630
631        // Inject optional tracestate
632        if let Some(tracestate) = trace_context.tracestate {
633            headers.insert("tracestate".to_string(), tracestate);
634        }
635
636        // Inject custom request IDs
637        if let Some(x_request_id) = trace_context.x_request_id {
638            headers.insert("x-request-id".to_string(), x_request_id);
639        }
640        if let Some(request_id) = trace_context.request_id {
641            headers.insert("request-id".to_string(), request_id);
642        }
643    }
644}
645
646/// Create a client_request span linked to the parent trace context
647pub fn make_client_request_span(
648    operation: &str,
649    request_id: &str,
650    trace_context: Option<&DistributedTraceContext>,
651    instance_id: Option<&str>,
652) -> Span {
653    if let Some(ctx) = trace_context {
654        let mut headers = async_nats::HeaderMap::new();
655        headers.insert("traceparent", ctx.create_traceparent());
656
657        if let Some(ref tracestate) = ctx.tracestate {
658            headers.insert("tracestate", tracestate.as_str());
659        }
660
661        let (otel_context, _extracted_trace_id, _extracted_parent_span_id) =
662            extract_otel_context_from_nats_headers(&headers);
663
664        let span = if let Some(inst_id) = instance_id {
665            tracing::info_span!(
666                "client_request",
667                operation = operation,
668                request_id = request_id,
669                instance_id = inst_id,
670                trace_id = ctx.trace_id.as_str(),
671                parent_id = ctx.span_id.as_str(),
672                x_request_id = ctx.x_request_id.as_deref(),
673            )
674        } else {
675            tracing::info_span!(
676                "client_request",
677                operation = operation,
678                request_id = request_id,
679                trace_id = ctx.trace_id.as_str(),
680                parent_id = ctx.span_id.as_str(),
681                x_request_id = ctx.x_request_id.as_deref(),
682            )
683        };
684
685        if let Some(context) = otel_context {
686            let _ = span.set_parent(context);
687        }
688
689        span
690    } else if let Some(inst_id) = instance_id {
691        tracing::info_span!(
692            "client_request",
693            operation = operation,
694            request_id = request_id,
695            instance_id = inst_id,
696        )
697    } else {
698        tracing::info_span!(
699            "client_request",
700            operation = operation,
701            request_id = request_id,
702        )
703    }
704}
705
/// Field visitor that collects recorded fields into a string map.
#[derive(Debug, Default)]
pub struct FieldVisitor {
    /// Recorded fields: field name mapped to its string (or `Debug`-formatted) value.
    pub fields: HashMap<String, String>,
}
710
711impl Visit for FieldVisitor {
712    fn record_str(&mut self, field: &Field, value: &str) {
713        self.fields
714            .insert(field.name().to_string(), value.to_string());
715    }
716
717    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
718        self.fields
719            .insert(field.name().to_string(), format!("{:?}", value).to_string());
720    }
721}
722
723impl<S> Layer<S> for DistributedTraceIdLayer
724where
725    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
726{
727    // Capture close span time
728    // Currently not used but added for future use in timing
729    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
730        if let Some(span) = ctx.span(&id) {
731            let mut extensions = span.extensions_mut();
732            if let Some(distributed_tracing_context) =
733                extensions.get_mut::<DistributedTraceContext>()
734            {
735                distributed_tracing_context.end = Some(Instant::now());
736            }
737        }
738    }
739
740    // Collects span attributes and metadata in on_new_span
741    // Final initialization deferred to on_enter when OtelData is available
742    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
743        if let Some(span) = ctx.span(id) {
744            let mut trace_id: Option<String> = None;
745            let mut parent_id: Option<String> = None;
746            let mut span_id: Option<String> = None;
747            let mut x_request_id: Option<String> = None;
748            let mut request_id: Option<String> = None;
749            let mut tracestate: Option<String> = None;
750            let mut visitor = FieldVisitor::default();
751            attrs.record(&mut visitor);
752
753            // Extract trace_id from span attributes
754            if let Some(trace_id_input) = visitor.fields.get("trace_id") {
755                if !is_valid_trace_id(trace_id_input) {
756                    tracing::trace!("trace id  '{trace_id_input}' is not valid! Ignoring.");
757                } else {
758                    trace_id = Some(trace_id_input.to_string());
759                }
760            }
761
762            // Extract span_id from span attributes
763            if let Some(span_id_input) = visitor.fields.get("span_id") {
764                if !is_valid_span_id(span_id_input) {
765                    tracing::trace!("span id  '{span_id_input}' is not valid! Ignoring.");
766                } else {
767                    span_id = Some(span_id_input.to_string());
768                }
769            }
770
771            // Extract parent_id from span attributes
772            if let Some(parent_id_input) = visitor.fields.get("parent_id") {
773                if !is_valid_span_id(parent_id_input) {
774                    tracing::trace!("parent id  '{parent_id_input}' is not valid! Ignoring.");
775                } else {
776                    parent_id = Some(parent_id_input.to_string());
777                }
778            }
779
780            // Extract tracestate
781            if let Some(tracestate_input) = visitor.fields.get("tracestate") {
782                tracestate = Some(tracestate_input.to_string());
783            }
784
785            // Extract x_request_id
786            if let Some(x_request_id_input) = visitor.fields.get("x_request_id") {
787                x_request_id = Some(x_request_id_input.to_string());
788            }
789
790            // Extract request_id (with backward compat for x_dynamo_request_id)
791            if let Some(request_id_input) = visitor.fields.get("request_id") {
792                request_id = Some(request_id_input.to_string());
793            } else if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
794                request_id = Some(x_request_id_input.to_string());
795            }
796
797            // Inherit trace context from parent span if available
798            if parent_id.is_none()
799                && let Some(parent_span_id) = ctx.current_span().id()
800                && let Some(parent_span) = ctx.span(parent_span_id)
801            {
802                let parent_ext = parent_span.extensions();
803                if let Some(parent_tracing_context) = parent_ext.get::<DistributedTraceContext>() {
804                    trace_id = Some(parent_tracing_context.trace_id.clone());
805                    parent_id = Some(parent_tracing_context.span_id.clone());
806                    tracestate = parent_tracing_context.tracestate.clone();
807                    if x_request_id.is_none() {
808                        x_request_id = parent_tracing_context.x_request_id.clone();
809                    }
810                    if request_id.is_none() {
811                        request_id = parent_tracing_context.request_id.clone();
812                    }
813                }
814            }
815
816            // Validate consistency
817            if (parent_id.is_some() || span_id.is_some()) && trace_id.is_none() {
818                tracing::error!("parent id or span id are set but trace id is not set!");
819                // Clear inconsistent IDs to maintain trace integrity
820                parent_id = None;
821                span_id = None;
822            }
823
824            // Store pending context - will be finalized in on_enter
825            let mut extensions = span.extensions_mut();
826            extensions.insert(PendingDistributedTraceContext {
827                trace_id,
828                span_id,
829                parent_id,
830                tracestate,
831                x_request_id,
832                request_id,
833            });
834        }
835    }
836
837    // Finalizes the DistributedTraceContext when span is entered
838    // At this point, OtelData should have valid trace_id and span_id
839    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
840        if let Some(span) = ctx.span(id) {
841            // Check if already initialized (e.g., span re-entered)
842            {
843                let extensions = span.extensions();
844                if extensions.get::<DistributedTraceContext>().is_some() {
845                    return;
846                }
847            }
848
849            // Get the pending context and extract OtelData IDs
850            let mut extensions = span.extensions_mut();
851            let pending = match extensions.remove::<PendingDistributedTraceContext>() {
852                Some(p) => p,
853                None => {
854                    // This shouldn't happen - on_new_span should have created it
855                    tracing::error!("PendingDistributedTraceContext not found in on_enter");
856                    return;
857                }
858            };
859
860            let mut trace_id = pending.trace_id;
861            let mut span_id = pending.span_id;
862            let parent_id = pending.parent_id;
863            let tracestate = pending.tracestate;
864            let x_request_id = pending.x_request_id;
865            let request_id = pending.request_id;
866
867            // Try to extract from OtelData if not already set
868            // Need to drop extensions_mut to get immutable borrow for OtelData
869            drop(extensions);
870
871            if trace_id.is_none() || span_id.is_none() {
872                let extensions = span.extensions();
873                if let Some(otel_data) = extensions.get::<tracing_opentelemetry::OtelData>() {
874                    // Extract trace_id from OTEL data if not already set
875                    if trace_id.is_none()
876                        && let Some(otel_trace_id) = otel_data.trace_id()
877                    {
878                        let trace_id_str = format!("{}", otel_trace_id);
879                        if is_valid_trace_id(&trace_id_str) {
880                            trace_id = Some(trace_id_str);
881                        }
882                    }
883
884                    // Extract span_id from OTEL data if not already set
885                    if span_id.is_none()
886                        && let Some(otel_span_id) = otel_data.span_id()
887                    {
888                        let span_id_str = format!("{}", otel_span_id);
889                        if is_valid_span_id(&span_id_str) {
890                            span_id = Some(span_id_str);
891                        }
892                    }
893                }
894            }
895
896            // Panic if we still don't have required IDs
897            if trace_id.is_none() {
898                panic!(
899                    "trace_id is not set in on_enter - OtelData may not be properly initialized"
900                );
901            }
902
903            if span_id.is_none() {
904                panic!("span_id is not set in on_enter - OtelData may not be properly initialized");
905            }
906
907            let span_level = span.metadata().level();
908            let mut extensions = span.extensions_mut();
909            extensions.insert(DistributedTraceContext {
910                trace_id: trace_id.expect("Trace ID must be set"),
911                span_id: span_id.expect("Span ID must be set"),
912                parent_id,
913                tracestate,
914                start: Some(Instant::now()),
915                end: None,
916                x_request_id,
917                request_id,
918            });
919
920            drop(extensions);
921
922            // Emit SPAN_FIRST_ENTRY event. This only runs if the span passed the layer's filter
923            // (on_enter is not called for filtered-out spans), so no additional check needed.
924            if span_events_enabled() {
925                emit_at_level!(span_level, target: "span_event", message = "SPAN_FIRST_ENTRY");
926            }
927        }
928    }
929}
930
931// Enables functions to retreive their current
932// context for adding to distributed headers
933pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
934    Span::current()
935        .with_subscriber(|(id, subscriber)| {
936            subscriber
937                .downcast_ref::<Registry>()
938                .and_then(|registry| registry.span_data(id))
939                .and_then(|span_data| {
940                    let extensions = span_data.extensions();
941                    extensions.get::<DistributedTraceContext>().cloned()
942                })
943        })
944        .flatten()
945}
946
947/// Initialize the logger - must be called when Tokio runtime is available
948pub fn init() {
949    INIT.call_once(|| {
950        if let Err(e) = setup_logging() {
951            eprintln!("Failed to initialize logging: {}", e);
952            std::process::exit(1);
953        }
954    });
955}
956
/// Installs the global subscriber for builds with the `tokio-console` feature.
///
/// Two layers are registered on the registry:
/// - a compact, human-readable stderr formatter gated by the configured `EnvFilter`;
/// - the tokio-console layer, which sees `runtime` and `tokio` targets at TRACE and
///   everything else at ERROR.
#[cfg(feature = "tokio-console")]
fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
    // NOTE(review): `server_addr` is set after `with_default_env`, so it presumably wins
    // over an address taken from the environment - confirm this is intended.
    let console_layer = console_subscriber::ConsoleLayer::builder()
        .with_default_env()
        .server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
        .spawn();

    // Only runtime instrumentation is forwarded to the console at full verbosity.
    let console_targets = tracing_subscriber::filter::Targets::new()
        .with_default(LevelFilter::ERROR)
        .with_target("runtime", LevelFilter::TRACE)
        .with_target("tokio", LevelFilter::TRACE);

    let stderr_layer = fmt::layer()
        .with_ansi(!disable_ansi_logging())
        .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
        .with_writer(std::io::stderr)
        .with_filter(filters(load_config()));

    let subscriber = tracing_subscriber::registry()
        .with(stderr_layer)
        .with(console_layer.with_filter(console_targets));
    subscriber.init();

    Ok(())
}
978
979#[cfg(not(feature = "tokio-console"))]
980fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
981    let fmt_filter_layer = filters(load_config());
982    let trace_filter_layer = filters(load_config());
983    let otel_filter_layer = filters(load_config());
984    let otel_logs_filter_layer = filters(load_config());
985
986    if jsonl_logging_enabled() {
987        let span_events = if span_events_enabled() {
988            FmtSpan::CLOSE
989        } else {
990            FmtSpan::NONE
991        };
992        let l = fmt::layer()
993            .with_ansi(false)
994            .with_span_events(span_events)
995            .event_format(CustomJsonFormatter::new())
996            .with_writer(std::io::stderr)
997            .with_filter(fmt_filter_layer);
998
999        // Create OpenTelemetry tracer - conditionally export to OTLP based on env var
1000        let service_name = get_service_name();
1001
1002        // Build tracer and logger providers - with or without OTLP export
1003        let (tracer_provider, logger_provider_opt, endpoint_opt) = if otlp_exporter_enabled() {
1004            // Export enabled: create OTLP exporters with batch processors
1005            let traces_endpoint =
1006                std::env::var(env_logging::otlp::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
1007                    .unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string());
1008            let logs_endpoint = std::env::var(env_logging::otlp::OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)
1009                .unwrap_or_else(|_| traces_endpoint.clone());
1010
1011            let resource = opentelemetry_sdk::Resource::builder_empty()
1012                .with_service_name(service_name.clone())
1013                .build();
1014
1015            // Initialize OTLP span exporter using gRPC (Tonic)
1016            let span_exporter = opentelemetry_otlp::SpanExporter::builder()
1017                .with_tonic()
1018                .with_endpoint(&traces_endpoint)
1019                .build()?;
1020
1021            let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
1022                .with_batch_exporter(span_exporter)
1023                .with_resource(resource.clone())
1024                .build();
1025
1026            // Initialize OTLP log exporter using gRPC (Tonic)
1027            let log_exporter = opentelemetry_otlp::LogExporter::builder()
1028                .with_tonic()
1029                .with_endpoint(&logs_endpoint)
1030                .build()?;
1031
1032            let logger_provider = SdkLoggerProvider::builder()
1033                .with_batch_exporter(log_exporter)
1034                .with_resource(resource)
1035                .build();
1036
1037            (
1038                tracer_provider,
1039                Some(logger_provider),
1040                Some(traces_endpoint),
1041            )
1042        } else {
1043            // No export - traces generated locally only (for logging/trace IDs)
1044            let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
1045                .with_resource(
1046                    opentelemetry_sdk::Resource::builder_empty()
1047                        .with_service_name(service_name.clone())
1048                        .build(),
1049                )
1050                .build();
1051
1052            (provider, None, None)
1053        };
1054
1055        // Get a tracer from the provider
1056        let tracer = tracer_provider.tracer(service_name.clone());
1057
1058        // Build the OTLP logs bridge layer (only when export is enabled)
1059        let otel_logs_layer = logger_provider_opt
1060            .as_ref()
1061            .map(|lp| OpenTelemetryTracingBridge::new(lp).with_filter(otel_logs_filter_layer));
1062
1063        tracing_subscriber::registry()
1064            .with(
1065                tracing_opentelemetry::layer()
1066                    .with_tracer(tracer)
1067                    .with_filter(otel_filter_layer),
1068            )
1069            .with(otel_logs_layer)
1070            .with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
1071            .with(l)
1072            .init();
1073
1074        // Log initialization status after subscriber is ready
1075        if let Some(endpoint) = endpoint_opt {
1076            tracing::info!(
1077                endpoint = %endpoint,
1078                service = %service_name,
1079                "OpenTelemetry OTLP export enabled (traces and logs)"
1080            );
1081        } else {
1082            tracing::info!(
1083                service = %service_name,
1084                "OpenTelemetry OTLP export disabled, traces local only"
1085            );
1086        }
1087    } else {
1088        let l = fmt::layer()
1089            .with_ansi(!disable_ansi_logging())
1090            .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
1091            .with_writer(std::io::stderr)
1092            .with_filter(fmt_filter_layer);
1093
1094        tracing_subscriber::registry().with(l).init();
1095    }
1096
1097    Ok(())
1098}
1099
1100fn filters(config: LoggingConfig) -> EnvFilter {
1101    let mut filter_layer = EnvFilter::builder()
1102        .with_default_directive(config.log_level.parse().unwrap())
1103        .with_env_var(env_logging::DYN_LOG)
1104        .from_env_lossy();
1105
1106    for (module, level) in config.log_filters {
1107        match format!("{module}={level}").parse::<Directive>() {
1108            Ok(d) => {
1109                filter_layer = filter_layer.add_directive(d);
1110            }
1111            Err(e) => {
1112                eprintln!("Failed parsing filter '{level}' for module '{module}': {e}");
1113            }
1114        }
1115    }
1116
1117    // When span events are enabled, allow "span_event" target at all levels
1118    // This ensures SPAN_FIRST_ENTRY events pass the filter when emitted from on_enter
1119    if span_events_enabled() {
1120        filter_layer = filter_layer.add_directive("span_event=trace".parse().unwrap());
1121    }
1122
1123    // Always allow infrastructure request spans regardless of DYN_LOG level.
1124    // This ensures request context (request_id, model, trace_id) is always
1125    // available on log events, even when DYN_LOG=error or DYN_LOG=warn.
1126    // Can be overridden via DYN_LOG=request_span=<level> if needed.
1127    filter_layer = filter_layer.add_directive("request_span=trace".parse().unwrap());
1128
1129    filter_layer
1130}
1131
1132/// Log a message with file and line info
1133/// Used by Python wrapper
1134pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) {
1135    let level = match level {
1136        "debug" => log::Level::Debug,
1137        "info" => log::Level::Info,
1138        "warn" => log::Level::Warn,
1139        "error" => log::Level::Error,
1140        "warning" => log::Level::Warn,
1141        _ => log::Level::Info,
1142    };
1143    log::logger().log(
1144        &log::Record::builder()
1145            .args(format_args!("{}", message))
1146            .level(level)
1147            .target(module)
1148            .file(Some(file))
1149            .line(Some(line))
1150            .build(),
1151    );
1152}
1153
1154fn load_config() -> LoggingConfig {
1155    let config_path =
1156        std::env::var(env_logging::DYN_LOGGING_CONFIG_PATH).unwrap_or_else(|_| "".to_string());
1157    let figment = Figment::new()
1158        .merge(Serialized::defaults(LoggingConfig::default()))
1159        .merge(Toml::file("/opt/dynamo/etc/logging.toml"))
1160        .merge(Toml::file(config_path));
1161
1162    figment.extract().unwrap()
1163}
1164
/// One JSON log line as written by `CustomJsonFormatter`.
///
/// Keys are serialized in field declaration order; `fields` is flattened, so its
/// entries appear as top-level keys of the same JSON object.
#[derive(Serialize)]
struct JsonLog<'a> {
    /// Timestamp string for the log line.
    time: String,
    /// Level of the event, as text.
    level: String,
    /// Source file of the event; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    file: Option<&'a str>,
    /// Source line of the event; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    line: Option<u32>,
    /// Target reported for the log line.
    target: String,
    /// The event's message value.
    message: serde_json::Value,
    /// All remaining fields, merged into the top level of the JSON object.
    #[serde(flatten)]
    fields: BTreeMap<String, serde_json::Value>,
}
1178
1179struct TimeFormatter {
1180    use_local_tz: bool,
1181}
1182
1183impl TimeFormatter {
1184    fn new() -> Self {
1185        Self {
1186            use_local_tz: crate::config::use_local_timezone(),
1187        }
1188    }
1189
1190    fn format_now(&self) -> String {
1191        if self.use_local_tz {
1192            chrono::Local::now()
1193                .format("%Y-%m-%dT%H:%M:%S%.6f%:z")
1194                .to_string()
1195        } else {
1196            chrono::Utc::now()
1197                .format("%Y-%m-%dT%H:%M:%S%.6fZ")
1198                .to_string()
1199        }
1200    }
1201}
1202
1203impl FormatTime for TimeFormatter {
1204    fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
1205        write!(w, "{}", self.format_now())
1206    }
1207}
1208
1209struct CustomJsonFormatter {
1210    time_formatter: TimeFormatter,
1211}
1212
1213impl CustomJsonFormatter {
1214    fn new() -> Self {
1215        Self {
1216            time_formatter: TimeFormatter::new(),
1217        }
1218    }
1219}
1220
1221use once_cell::sync::Lazy;
1222use regex::Regex;
1223
/// Static W3C Trace Context propagator instance to avoid repeated allocations.
///
/// Built lazily on first access via `TraceContextPropagator::new` and shared afterwards.
static TRACE_PROPAGATOR: Lazy<opentelemetry_sdk::propagation::TraceContextPropagator> =
    Lazy::new(opentelemetry_sdk::propagation::TraceContextPropagator::new);
1227
1228fn parse_tracing_duration(s: &str) -> Option<u64> {
1229    static RE: Lazy<Regex> =
1230        Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
1231    let captures = RE.captures(s)?;
1232    let value: f64 = captures[1].parse().ok()?;
1233    let unit = &captures[2];
1234    match unit {
1235        "ns" => Some((value / 1000.0) as u64),
1236        "µs" | "us" => Some(value as u64),
1237        "ms" => Some((value * 1000.0) as u64),
1238        "s" => Some((value * 1_000_000.0) as u64),
1239        _ => None,
1240    }
1241}
1242
1243impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
1244where
1245    S: Subscriber + for<'a> LookupSpan<'a>,
1246    N: for<'a> FormatFields<'a> + 'static,
1247{
1248    fn format_event(
1249        &self,
1250        ctx: &FmtContext<'_, S, N>,
1251        mut writer: Writer<'_>,
1252        event: &Event<'_>,
1253    ) -> std::fmt::Result {
1254        let mut visitor = JsonVisitor::default();
1255        let time = self.time_formatter.format_now();
1256        event.record(&mut visitor);
1257        let mut message = visitor
1258            .fields
1259            .remove("message")
1260            .unwrap_or(serde_json::Value::String("".to_string()));
1261
1262        let mut target_override: Option<String> = None;
1263
1264        let current_span = event
1265            .parent()
1266            .and_then(|id| ctx.span(id))
1267            .or_else(|| ctx.lookup_current());
1268        if let Some(span) = current_span {
1269            let ext = span.extensions();
1270            let data = ext.get::<FormattedFields<N>>().unwrap();
1271            let span_fields: Vec<(&str, &str)> = data
1272                .fields
1273                .split(' ')
1274                .filter_map(|entry| entry.split_once('='))
1275                .collect();
1276            for (name, value) in span_fields {
1277                visitor.fields.insert(
1278                    name.to_string(),
1279                    serde_json::Value::String(value.trim_matches('"').to_string()),
1280                );
1281            }
1282
1283            let busy_us = visitor
1284                .fields
1285                .remove("time.busy")
1286                .and_then(|v| parse_tracing_duration(&v.to_string()));
1287            let idle_us = visitor
1288                .fields
1289                .remove("time.idle")
1290                .and_then(|v| parse_tracing_duration(&v.to_string()));
1291
1292            if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
1293                visitor.fields.insert(
1294                    "time.busy_us".to_string(),
1295                    serde_json::Value::Number(busy_us.into()),
1296                );
1297                visitor.fields.insert(
1298                    "time.idle_us".to_string(),
1299                    serde_json::Value::Number(idle_us.into()),
1300                );
1301                visitor.fields.insert(
1302                    "time.duration_us".to_string(),
1303                    serde_json::Value::Number((busy_us + idle_us).into()),
1304                );
1305            }
1306
1307            let is_span_created = message.as_str() == Some("SPAN_FIRST_ENTRY");
1308            let is_span_closed = message.as_str() == Some("close");
1309            if is_span_created || is_span_closed {
1310                target_override = Some(span.metadata().target().to_string());
1311                if is_span_closed {
1312                    message = serde_json::Value::String("SPAN_CLOSED".to_string());
1313                }
1314            }
1315
1316            visitor.fields.insert(
1317                "span_name".to_string(),
1318                serde_json::Value::String(span.name().to_string()),
1319            );
1320
1321            if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
1322                visitor.fields.insert(
1323                    "span_id".to_string(),
1324                    serde_json::Value::String(tracing_context.span_id.clone()),
1325                );
1326                visitor.fields.insert(
1327                    "trace_id".to_string(),
1328                    serde_json::Value::String(tracing_context.trace_id.clone()),
1329                );
1330                if let Some(parent_id) = tracing_context.parent_id.clone() {
1331                    visitor.fields.insert(
1332                        "parent_id".to_string(),
1333                        serde_json::Value::String(parent_id),
1334                    );
1335                } else {
1336                    visitor.fields.remove("parent_id");
1337                }
1338                if let Some(tracestate) = tracing_context.tracestate.clone() {
1339                    visitor.fields.insert(
1340                        "tracestate".to_string(),
1341                        serde_json::Value::String(tracestate),
1342                    );
1343                } else {
1344                    visitor.fields.remove("tracestate");
1345                }
1346                if let Some(x_request_id) = tracing_context.x_request_id.clone() {
1347                    visitor.fields.insert(
1348                        "x_request_id".to_string(),
1349                        serde_json::Value::String(x_request_id),
1350                    );
1351                } else {
1352                    visitor.fields.remove("x_request_id");
1353                }
1354
1355                if let Some(request_id) = tracing_context.request_id.clone() {
1356                    visitor.fields.insert(
1357                        "request_id".to_string(),
1358                        serde_json::Value::String(request_id),
1359                    );
1360                } else {
1361                    visitor.fields.remove("request_id");
1362                }
1363                // Remove old field name if present
1364                visitor.fields.remove("x_dynamo_request_id");
1365            } else {
1366                tracing::error!(
1367                    "Distributed Trace Context not found, falling back to internal ids"
1368                );
1369                visitor.fields.insert(
1370                    "span_id".to_string(),
1371                    serde_json::Value::String(span.id().into_u64().to_string()),
1372                );
1373                if let Some(parent) = span.parent() {
1374                    visitor.fields.insert(
1375                        "parent_id".to_string(),
1376                        serde_json::Value::String(parent.id().into_u64().to_string()),
1377                    );
1378                }
1379            }
1380        } else {
1381            let reserved_fields = [
1382                "trace_id",
1383                "span_id",
1384                "parent_id",
1385                "span_name",
1386                "tracestate",
1387            ];
1388            for reserved_field in reserved_fields {
1389                visitor.fields.remove(reserved_field);
1390            }
1391        }
1392        let metadata = event.metadata();
1393        let log = JsonLog {
1394            level: metadata.level().to_string(),
1395            time,
1396            file: metadata.file(),
1397            line: metadata.line(),
1398            target: target_override.unwrap_or_else(|| metadata.target().to_string()),
1399            message,
1400            fields: visitor.fields,
1401        };
1402        let json = serde_json::to_string(&log).unwrap();
1403        writeln!(writer, "{json}")
1404    }
1405}
1406
/// Field visitor that collects recorded fields into a JSON map keyed by field name.
#[derive(Default)]
struct JsonVisitor {
    /// Recorded values; recording a field name again replaces the earlier value.
    fields: BTreeMap<String, serde_json::Value>,
}
1411
1412impl tracing::field::Visit for JsonVisitor {
1413    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1414        self.fields.insert(
1415            field.name().to_string(),
1416            serde_json::Value::String(format!("{value:?}")),
1417        );
1418    }
1419
1420    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1421        if field.name() != "message" {
1422            match serde_json::from_str::<Value>(value) {
1423                Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
1424                Err(_) => self.fields.insert(field.name().to_string(), value.into()),
1425            };
1426        } else {
1427            self.fields.insert(field.name().to_string(), value.into());
1428        }
1429    }
1430
1431    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
1432        self.fields
1433            .insert(field.name().to_string(), serde_json::Value::Bool(value));
1434    }
1435
1436    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
1437        self.fields.insert(
1438            field.name().to_string(),
1439            serde_json::Value::Number(value.into()),
1440        );
1441    }
1442
1443    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
1444        self.fields.insert(
1445            field.name().to_string(),
1446            serde_json::Value::Number(value.into()),
1447        );
1448    }
1449
1450    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
1451        use serde_json::value::Number;
1452        self.fields.insert(
1453            field.name().to_string(),
1454            serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
1455        );
1456    }
1457}
1458
1459#[cfg(test)]
1460pub mod tests {
1461    use super::*;
1462    use anyhow::{Result, anyhow};
1463    use chrono::{DateTime, Utc};
1464    use jsonschema::{Draft, JSONSchema};
1465    use serde_json::Value;
1466    use std::fs::File;
1467    use std::io::{BufRead, BufReader};
1468    use stdio_override::*;
1469    use tempfile::NamedTempFile;
1470
    /// JSON Schema (draft-07) that every captured log line must satisfy.
    ///
    /// Only the six keys listed under `required` are mandatory; the tracing keys are
    /// validated when present, and additional keys are allowed.
    static LOG_LINE_SCHEMA: &str = r#"
    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "Runtime Log Line",
      "type": "object",
      "required": [
        "file",
        "level",
        "line",
        "message",
        "target",
        "time"
      ],
      "properties": {
        "file":      { "type": "string" },
        "level":     { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
        "line":      { "type": "integer" },
        "message":   { "type": "string" },
        "target":    { "type": "string" },
        "time":      { "type": "string", "format": "date-time" },
        "span_id":   { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "trace_id":  { "type": "string", "pattern": "^[a-f0-9]{32}$" },
        "span_name": { "type": "string" },
        "time.busy_us":     { "type": "integer" },
        "time.duration_us": { "type": "integer" },
        "time.idle_us":     { "type": "integer" },
        "tracestate": { "type": "string" }
      },
      "additionalProperties": true
    }
    "#;
1503
1504    #[tracing::instrument(skip_all)]
1505    async fn parent() {
1506        tracing::trace!(message = "parent!");
1507        if let Some(my_ctx) = get_distributed_tracing_context() {
1508            tracing::info!(my_trace_id = my_ctx.trace_id);
1509        }
1510        child().await;
1511    }
1512
1513    #[tracing::instrument(skip_all)]
1514    async fn child() {
1515        tracing::trace!(message = "child");
1516        if let Some(my_ctx) = get_distributed_tracing_context() {
1517            tracing::info!(my_trace_id = my_ctx.trace_id);
1518        }
1519        grandchild().await;
1520    }
1521
1522    #[tracing::instrument(skip_all)]
1523    async fn grandchild() {
1524        tracing::trace!(message = "grandchild");
1525        if let Some(my_ctx) = get_distributed_tracing_context() {
1526            tracing::info!(my_trace_id = my_ctx.trace_id);
1527        }
1528    }
1529
1530    pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
1531        let schema_json: Value =
1532            serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
1533        let compiled_schema = JSONSchema::options()
1534            .with_draft(Draft::Draft7)
1535            .compile(&schema_json)
1536            .expect("Invalid schema");
1537
1538        let f = File::open(file_name)?;
1539        let reader = BufReader::new(f);
1540        let mut result = Vec::new();
1541
1542        for (line_num, line) in reader.lines().enumerate() {
1543            let line = line?;
1544            let val: Value = serde_json::from_str(&line)
1545                .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_num + 1, e))?;
1546
1547            if let Err(errors) = compiled_schema.validate(&val) {
1548                let errs = errors.map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
1549                return Err(anyhow!(
1550                    "Line {}: JSON Schema Validation errors: {}",
1551                    line_num + 1,
1552                    errs
1553                ));
1554            }
1555            println!("{}", val);
1556            result.push(val);
1557        }
1558        Ok(result)
1559    }
1560
1561    #[tokio::test]
1562    async fn test_json_log_capture() -> Result<()> {
1563        #[allow(clippy::redundant_closure_call)]
1564        let _ = temp_env::async_with_vars(
1565            [(env_logging::DYN_LOGGING_JSONL, Some("1"))],
1566            (async || {
1567                let tmp_file = NamedTempFile::new().unwrap();
1568                let file_name = tmp_file.path().to_str().unwrap();
1569                let guard = StderrOverride::from_file(file_name)?;
1570                init();
1571                parent().await;
1572                drop(guard);
1573
1574                let lines = load_log(file_name)?;
1575
1576                // 1. Extract the dynamically generated trace ID and validate consistency
1577                // All logs should have the same trace_id since they're part of the same trace
1578                // Skip any initialization logs that don't have trace_id (e.g., OTLP setup messages)
1579                //
1580                // Note: This test can fail if logging was already initialized by another test running
1581                // in parallel. Logging initialization is global (Once) and can only happen once per process.
1582                // If no trace_id is found, skip validation gracefully.
1583                let Some(trace_id) = lines
1584                    .iter()
1585                    .find_map(|log_line| log_line.get("trace_id").and_then(|v| v.as_str()))
1586                    .map(|s| s.to_string())
1587                else {
1588                    // Skip test if logging was already initialized - we can't control the output format
1589                    return Ok(());
1590                };
1591
1592                // Verify trace_id is not a zero/invalid ID
1593                assert_ne!(
1594                    trace_id, "00000000000000000000000000000000",
1595                    "trace_id should not be a zero/invalid ID"
1596                );
1597                assert!(
1598                    !trace_id.chars().all(|c| c == '0'),
1599                    "trace_id should not be all zeros"
1600                );
1601
1602                // Verify all logs have the same trace_id
1603                for log_line in &lines {
1604                    if let Some(line_trace_id) = log_line.get("trace_id") {
1605                        assert_eq!(
1606                            line_trace_id.as_str().unwrap(),
1607                            &trace_id,
1608                            "All logs should have the same trace_id"
1609                        );
1610                    }
1611                }
1612
1613                // Validate my_trace_id matches the actual trace ID
1614                for log_line in &lines {
1615                    if let Some(my_trace_id) = log_line.get("my_trace_id") {
1616                        assert_eq!(
1617                            my_trace_id,
1618                            &serde_json::Value::String(trace_id.clone()),
1619                            "my_trace_id should match the trace_id from distributed tracing context"
1620                        );
1621                    }
1622                }
1623
1624                // 2. Validate span IDs exist and are properly formatted
1625                let mut span_ids_seen: std::collections::HashSet<String> = std::collections::HashSet::new();
1626                let mut span_timestamps: std::collections::HashMap<String, DateTime<Utc>> = std::collections::HashMap::new();
1627
1628                for log_line in &lines {
1629                    if let Some(span_id) = log_line.get("span_id") {
1630                        let span_id_str = span_id.as_str().unwrap();
1631                        assert!(
1632                            is_valid_span_id(span_id_str),
1633                            "Invalid span_id format: {}",
1634                            span_id_str
1635                        );
1636                        span_ids_seen.insert(span_id_str.to_string());
1637                    }
1638
1639                    // Validate timestamp format and track span timestamps
1640                    if let Some(time_str) = log_line.get("time").and_then(|v| v.as_str()) {
1641                        let timestamp = DateTime::parse_from_rfc3339(time_str)
1642                            .expect("All timestamps should be valid RFC3339 format")
1643                            .with_timezone(&Utc);
1644
1645                        // Track timestamp for each span_name
1646                        if let Some(span_name) = log_line.get("span_name").and_then(|v| v.as_str()) {
1647                            span_timestamps.insert(span_name.to_string(), timestamp);
1648                        }
1649                    }
1650                }
1651
1652                // 3. Validate parent-child span relationships
1653                // Extract span IDs for each span by looking at their log messages
1654                let parent_span_id = lines
1655                    .iter()
1656                    .find(|log_line| {
1657                        log_line.get("span_name")
1658                            .and_then(|v| v.as_str()) == Some("parent")
1659                    })
1660                    .and_then(|log_line| {
1661                        log_line.get("span_id")
1662                            .and_then(|v| v.as_str())
1663                            .map(|s| s.to_string())
1664                    })
1665                    .expect("Should find parent span with span_id");
1666
1667                let child_span_id = lines
1668                    .iter()
1669                    .find(|log_line| {
1670                        log_line.get("span_name")
1671                            .and_then(|v| v.as_str()) == Some("child")
1672                    })
1673                    .and_then(|log_line| {
1674                        log_line.get("span_id")
1675                            .and_then(|v| v.as_str())
1676                            .map(|s| s.to_string())
1677                    })
1678                    .expect("Should find child span with span_id");
1679
1680                let grandchild_span_id = lines
1681                    .iter()
1682                    .find(|log_line| {
1683                        log_line.get("span_name")
1684                            .and_then(|v| v.as_str()) == Some("grandchild")
1685                    })
1686                    .and_then(|log_line| {
1687                        log_line.get("span_id")
1688                            .and_then(|v| v.as_str())
1689                            .map(|s| s.to_string())
1690                    })
1691                    .expect("Should find grandchild span with span_id");
1692
1693                // Verify span IDs are unique
1694                assert_ne!(parent_span_id, child_span_id, "Parent and child should have different span IDs");
1695                assert_ne!(child_span_id, grandchild_span_id, "Child and grandchild should have different span IDs");
1696                assert_ne!(parent_span_id, grandchild_span_id, "Parent and grandchild should have different span IDs");
1697
1698                // Verify parent span has no parent_id
1699                for log_line in &lines {
1700                    if let Some(span_name) = log_line.get("span_name")
1701                        && let Some(span_name_str) = span_name.as_str()
1702                        && span_name_str == "parent"
1703                    {
1704                        assert!(
1705                            log_line.get("parent_id").is_none(),
1706                            "Parent span should not have a parent_id"
1707                        );
1708                    }
1709                }
1710
1711                // Verify child span's parent_id is parent_span_id
1712                for log_line in &lines {
1713                    if let Some(span_name) = log_line.get("span_name")
1714                        && let Some(span_name_str) = span_name.as_str()
1715                        && span_name_str == "child"
1716                    {
1717                        let parent_id = log_line.get("parent_id")
1718                            .and_then(|v| v.as_str())
1719                            .expect("Child span should have a parent_id");
1720                        assert_eq!(
1721                            parent_id,
1722                            parent_span_id,
1723                            "Child's parent_id should match parent's span_id"
1724                        );
1725                    }
1726                }
1727
1728                // Verify grandchild span's parent_id is child_span_id
1729                for log_line in &lines {
1730                    if let Some(span_name) = log_line.get("span_name")
1731                        && let Some(span_name_str) = span_name.as_str()
1732                        && span_name_str == "grandchild"
1733                    {
1734                        let parent_id = log_line.get("parent_id")
1735                            .and_then(|v| v.as_str())
1736                            .expect("Grandchild span should have a parent_id");
1737                        assert_eq!(
1738                            parent_id,
1739                            child_span_id,
1740                            "Grandchild's parent_id should match child's span_id"
1741                        );
1742                    }
1743                }
1744
1745                // 4. Validate timestamp ordering - spans should log in execution order
1746                let parent_time = span_timestamps.get("parent")
1747                    .expect("Should have timestamp for parent span");
1748                let child_time = span_timestamps.get("child")
1749                    .expect("Should have timestamp for child span");
1750                let grandchild_time = span_timestamps.get("grandchild")
1751                    .expect("Should have timestamp for grandchild span");
1752
1753                // Parent logs first (or at same time), then child, then grandchild
1754                assert!(
1755                    parent_time <= child_time,
1756                    "Parent span should log before or at same time as child span (parent: {}, child: {})",
1757                    parent_time,
1758                    child_time
1759                );
1760                assert!(
1761                    child_time <= grandchild_time,
1762                    "Child span should log before or at same time as grandchild span (child: {}, grandchild: {})",
1763                    child_time,
1764                    grandchild_time
1765                );
1766
1767                Ok::<(), anyhow::Error>(())
1768            })(),
1769        )
1770        .await;
1771        Ok(())
1772    }
1773
1774    // Test functions at different log levels for filtering tests
1775    #[tracing::instrument(level = "debug", skip_all)]
1776    async fn debug_level_span() {
1777        tracing::debug!("inside debug span");
1778    }
1779
1780    #[tracing::instrument(level = "info", skip_all)]
1781    async fn info_level_span() {
1782        tracing::info!("inside info span");
1783    }
1784
1785    #[tracing::instrument(level = "warn", skip_all)]
1786    async fn warn_level_span() {
1787        tracing::warn!("inside warn span");
1788    }
1789
1790    // Span from a different target - should be FILTERED OUT at info level
1791    // because the filter is warn,dynamo_runtime::logging::tests=debug
1792    #[tracing::instrument(level = "info", target = "other_module", skip_all)]
1793    async fn other_target_info_span() {
1794        tracing::info!(target: "other_module", "inside other target span");
1795    }
1796
/// Comprehensive test for span events covering:
/// - SPAN_FIRST_ENTRY and SPAN_CLOSED event emission
/// - Trace context (trace_id, span_id) in span events
/// - Timing information in SPAN_CLOSED events
/// - Level-based filtering (positive: allowed levels pass, negative: filtered levels blocked)
/// - Target-based filtering (spans from allowed targets pass even at lower levels)
///
/// The assertions themselves live in `test_span_events_subprocess`. This test launches that
/// test in a separate `cargo test` process so logging is initialized with our specific
/// filter settings (DYN_LOG=warn,dynamo_runtime::logging::tests=debug), avoiding
/// interference from other tests that may have initialized logging first.
///
/// # Panics
///
/// Panics if the child process cannot be spawned, exits unsuccessfully, or finishes without
/// having actually executed the subprocess test.
#[test]
fn test_span_events() {
    use std::process::Command;

    // libtest's `--exact` compares the filter with the test's full path *without* the crate
    // name (e.g. `logging::tests::test_span_events_subprocess`). A bare function name matches
    // nothing, and `cargo test` still exits 0 after running zero tests, so the qualified
    // path is derived from this module's own path.
    let qualified_name = concat!(module_path!(), "::test_span_events_subprocess");
    let test_filter = qualified_name
        .split_once("::")
        .map_or(qualified_name, |(_crate_name, test_path)| test_path);

    // Cargo exports `CARGO` to the tests it runs; prefer it so the child uses the same
    // cargo binary as the parent, and fall back to a PATH lookup otherwise.
    let cargo = std::env::var_os("CARGO").unwrap_or_else(|| "cargo".into());

    // Run cargo test for the subprocess test with specific env vars
    let output = Command::new(cargo)
        .args([
            "test",
            "-p",
            "dynamo-runtime",
            test_filter,
            "--",
            "--exact",
            "--nocapture",
        ])
        .env("DYN_LOGGING_JSONL", "1")
        .env("DYN_LOGGING_SPAN_EVENTS", "1")
        .env("DYN_LOG", "warn,dynamo_runtime::logging::tests=debug")
        .output()
        .expect("Failed to execute subprocess test");

    let stdout = String::from_utf8_lossy(&output.stdout);
    let stderr = String::from_utf8_lossy(&output.stderr);

    // With an exact filter only one test can match, so its harness summary must report
    // "1 passed". This guards against the run silently matching zero tests.
    let ran_subprocess_test = stdout.contains("1 passed");

    // Print output for debugging
    if !output.status.success() || !ran_subprocess_test {
        eprintln!("=== STDOUT ===\n{stdout}");
        eprintln!("=== STDERR ===\n{stderr}");
    }

    assert!(
        output.status.success(),
        "Subprocess test failed with exit code: {:?}",
        output.status.code()
    );
    assert!(
        ran_subprocess_test,
        "Subprocess run did not execute `{test_filter}` (expected exactly one passing test)"
    );
}
1846
1847    /// Subprocess test that performs the actual span event validation.
1848    /// This is called by test_span_events in a separate process with controlled env vars.
1849    #[tokio::test]
1850    async fn test_span_events_subprocess() -> Result<()> {
1851        // Skip if not running as subprocess (env vars not set)
1852        if std::env::var("DYN_LOGGING_SPAN_EVENTS").is_err() {
1853            return Ok(());
1854        }
1855
1856        let tmp_file = NamedTempFile::new().unwrap();
1857        let file_name = tmp_file.path().to_str().unwrap();
1858        let guard = StderrOverride::from_file(file_name)?;
1859        init();
1860
1861        // Run parent/child/grandchild spans (all INFO level by default)
1862        parent().await;
1863
1864        // Run spans at explicit levels from our test module
1865        debug_level_span().await;
1866        info_level_span().await;
1867        warn_level_span().await;
1868
1869        // Run span from different target (should be filtered out)
1870        other_target_info_span().await;
1871
1872        drop(guard);
1873
1874        let lines = load_log(file_name)?;
1875
1876        // Helper to check if a span event exists
1877        let has_span_event = |msg: &str, span_name: &str| {
1878            lines.iter().any(|log| {
1879                log.get("message").and_then(|v| v.as_str()) == Some(msg)
1880                    && log.get("span_name").and_then(|v| v.as_str()) == Some(span_name)
1881            })
1882        };
1883
1884        // Helper to get span events
1885        let get_span_events = |msg: &str| -> Vec<&serde_json::Value> {
1886            lines
1887                .iter()
1888                .filter(|log| log.get("message").and_then(|v| v.as_str()) == Some(msg))
1889                .collect()
1890        };
1891
1892        // === Test 1: SPAN_FIRST_ENTRY events have required fields ===
1893        let span_created_events = get_span_events("SPAN_FIRST_ENTRY");
1894        for event in &span_created_events {
1895            // Must have span_name
1896            assert!(
1897                event.get("span_name").is_some(),
1898                "SPAN_FIRST_ENTRY must have span_name"
1899            );
1900            // Must have valid trace_id (format check)
1901            let trace_id = event
1902                .get("trace_id")
1903                .and_then(|v| v.as_str())
1904                .expect("SPAN_FIRST_ENTRY must have trace_id");
1905            assert!(
1906                trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
1907                "SPAN_FIRST_ENTRY must have valid trace_id format"
1908            );
1909            // Must have valid span_id
1910            let span_id = event
1911                .get("span_id")
1912                .and_then(|v| v.as_str())
1913                .expect("SPAN_FIRST_ENTRY must have span_id");
1914            assert!(
1915                is_valid_span_id(span_id),
1916                "SPAN_FIRST_ENTRY must have valid span_id"
1917            );
1918        }
1919
1920        // === Test 2: SPAN_CLOSED events have timing info ===
1921        let span_closed_events = get_span_events("SPAN_CLOSED");
1922        for event in &span_closed_events {
1923            assert!(
1924                event.get("span_name").is_some(),
1925                "SPAN_CLOSED must have span_name"
1926            );
1927            assert!(
1928                event.get("time.busy_us").is_some()
1929                    || event.get("time.idle_us").is_some()
1930                    || event.get("time.duration_us").is_some(),
1931                "SPAN_CLOSED must have timing information"
1932            );
1933            // Must have valid trace_id
1934            let trace_id = event
1935                .get("trace_id")
1936                .and_then(|v| v.as_str())
1937                .expect("SPAN_CLOSED must have trace_id");
1938            assert!(
1939                trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
1940                "SPAN_CLOSED must have valid trace_id format"
1941            );
1942        }
1943
1944        // === Test 3: Target-based filtering (positive) ===
1945        // Spans from dynamo_runtime::logging::tests should pass at ALL levels
1946        // because the target is allowed at debug level
1947        assert!(
1948            has_span_event("SPAN_FIRST_ENTRY", "debug_level_span"),
1949            "DEBUG span from allowed target MUST pass (target=debug filter)"
1950        );
1951        assert!(
1952            has_span_event("SPAN_FIRST_ENTRY", "info_level_span"),
1953            "INFO span from allowed target MUST pass (target=debug filter)"
1954        );
1955        assert!(
1956            has_span_event("SPAN_FIRST_ENTRY", "warn_level_span"),
1957            "WARN span from allowed target MUST pass (target=debug filter)"
1958        );
1959
1960        // parent/child/grandchild are INFO level from allowed target - should pass
1961        assert!(
1962            has_span_event("SPAN_FIRST_ENTRY", "parent"),
1963            "parent span (INFO) from allowed target MUST pass"
1964        );
1965        assert!(
1966            has_span_event("SPAN_FIRST_ENTRY", "child"),
1967            "child span (INFO) from allowed target MUST pass"
1968        );
1969        assert!(
1970            has_span_event("SPAN_FIRST_ENTRY", "grandchild"),
1971            "grandchild span (INFO) from allowed target MUST pass"
1972        );
1973
1974        // === Test 4: Level-based filtering (negative) ===
1975        // Verify spans from OTHER targets at debug/info level are filtered out
1976        assert!(
1977            !has_span_event("SPAN_FIRST_ENTRY", "other_target_info_span"),
1978            "INFO span from non-allowed target (other_module) MUST be filtered out"
1979        );
1980
1981        // Also verify no spans from other targets appear at debug/info level
1982        for event in &span_created_events {
1983            let target = event.get("target").and_then(|v| v.as_str()).unwrap_or("");
1984            let level = event.get("level").and_then(|v| v.as_str()).unwrap_or("");
1985
1986            // If level is DEBUG or INFO, target must be our test module
1987            if level == "DEBUG" || level == "INFO" {
1988                assert!(
1989                    target.contains("dynamo_runtime::logging::tests"),
1990                    "DEBUG/INFO span must be from allowed target, got target={target}"
1991                );
1992            }
1993        }
1994
1995        Ok(())
1996    }
1997}