Skip to main content

dynamo_runtime/
logging.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Dynamo Distributed Logging Module.
5//!
6//! - Configuration loaded from:
7//!   1. Environment variables (highest priority).
8//!   2. Optional TOML file pointed to by the `DYN_LOGGING_CONFIG_PATH` environment variable.
9//!   3. `/opt/dynamo/etc/logging.toml`.
10//!
11//! Logging can take two forms: `READABLE` or `JSONL`. The default is `READABLE`. `JSONL`
12//! can be enabled by setting the `DYN_LOGGING_JSONL` environment variable to `1`.
13//!
14//! To use local timezone for logging timestamps, set the `DYN_LOG_USE_LOCAL_TZ` environment variable to `1`.
15//!
//! Filters can be configured using the `DYN_LOG` environment variable or by setting the
//! `log_filters` table in the TOML configuration file. Filters are key-value pairs (comma-separated
//! in `DYN_LOG`) where the key is the crate or module name and the value is the log level.
//! The default log level is `info`.
19//!
20//! Example:
21//! ```toml
22//! log_level = "error"
23//!
24//! [log_filters]
25//! "test_logging" = "info"
26//! "test_logging::api" = "trace"
27//! ```
28
29use std::collections::{BTreeMap, HashMap};
30use std::sync::Once;
31
32use figment::{
33    Figment,
34    providers::{Format, Serialized, Toml},
35};
36use serde::{Deserialize, Serialize};
37use tracing::level_filters::LevelFilter;
38use tracing::{Event, Subscriber};
39use tracing_subscriber::EnvFilter;
40use tracing_subscriber::fmt::time::FormatTime;
41use tracing_subscriber::fmt::time::LocalTime;
42use tracing_subscriber::fmt::time::SystemTime;
43use tracing_subscriber::fmt::time::UtcTime;
44use tracing_subscriber::fmt::{FmtContext, FormatFields};
45use tracing_subscriber::fmt::{FormattedFields, format::Writer};
46use tracing_subscriber::prelude::*;
47use tracing_subscriber::registry::LookupSpan;
48use tracing_subscriber::{filter::Directive, fmt};
49
50use crate::config::{disable_ansi_logging, jsonl_logging_enabled, span_events_enabled};
51use async_nats::{HeaderMap, HeaderValue};
52use axum::extract::FromRequestParts;
53use axum::http;
54use axum::http::Request;
55use axum::http::request::Parts;
56use serde_json::Value;
57use std::convert::Infallible;
58use std::time::Instant;
59use tower_http::trace::{DefaultMakeSpan, TraceLayer};
60use tracing::Id;
61use tracing::Span;
62use tracing::field::Field;
63use tracing::span;
64use tracing_subscriber::Layer;
65use tracing_subscriber::Registry;
66use tracing_subscriber::field::Visit;
67use tracing_subscriber::fmt::format::FmtSpan;
68use tracing_subscriber::layer::Context;
69use tracing_subscriber::registry::SpanData;
70use uuid::Uuid;
71
72use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
73use opentelemetry::trace::TraceContextExt;
74use opentelemetry::{global, trace::Tracer};
75use opentelemetry_otlp::WithExportConfig;
76
77use opentelemetry::trace::TracerProvider as _;
78use opentelemetry::{Key, KeyValue};
79use opentelemetry_sdk::Resource;
80use opentelemetry_sdk::trace::SdkTracerProvider;
81use tracing::error;
82use tracing_subscriber::layer::SubscriberExt;
83// use tracing_subscriber::Registry;
84
85use std::time::Duration;
86use tracing::{info, instrument};
87use tracing_opentelemetry::OpenTelemetrySpanExt;
88use tracing_subscriber::util::SubscriberInitExt;
89
90use crate::config::environment_names::logging as env_logging;
91
92use dynamo_config::env_is_truthy;
93
94/// Default log level
95const DEFAULT_FILTER_LEVEL: &str = "info";
96
97/// Default OTLP endpoint
98const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
99
100/// Default service name
101const DEFAULT_OTEL_SERVICE_NAME: &str = "dynamo";
102
103/// Once instance to ensure the logger is only initialized once
104static INIT: Once = Once::new();
105
106#[derive(Serialize, Deserialize, Debug)]
107struct LoggingConfig {
108    log_level: String,
109    log_filters: HashMap<String, String>,
110}
111impl Default for LoggingConfig {
112    fn default() -> Self {
113        LoggingConfig {
114            log_level: DEFAULT_FILTER_LEVEL.to_string(),
115            log_filters: HashMap::from([
116                ("h2".to_string(), "error".to_string()),
117                ("tower".to_string(), "error".to_string()),
118                ("hyper_util".to_string(), "error".to_string()),
119                ("neli".to_string(), "error".to_string()),
120                ("async_nats".to_string(), "error".to_string()),
121                ("rustls".to_string(), "error".to_string()),
122                ("tokenizers".to_string(), "error".to_string()),
123                ("axum".to_string(), "error".to_string()),
124                ("tonic".to_string(), "error".to_string()),
125                ("mistralrs_core".to_string(), "error".to_string()),
126                ("hf_hub".to_string(), "error".to_string()),
127                ("opentelemetry".to_string(), "error".to_string()),
128                ("opentelemetry-otlp".to_string(), "error".to_string()),
129                ("opentelemetry_sdk".to_string(), "error".to_string()),
130            ]),
131        }
132    }
133}
134
135/// Check if OTLP trace exporting is enabled (accepts: "1", "true", "on", "yes" - case insensitive)
136fn otlp_exporter_enabled() -> bool {
137    env_is_truthy(env_logging::otlp::OTEL_EXPORT_ENABLED)
138}
139
140/// Get the service name from environment or use default
141fn get_service_name() -> String {
142    std::env::var(env_logging::otlp::OTEL_SERVICE_NAME)
143        .unwrap_or_else(|_| DEFAULT_OTEL_SERVICE_NAME.to_string())
144}
145
/// Validate a given trace ID according to W3C Trace Context specifications.
///
/// A valid trace ID is exactly 32 characters, each a lowercase hexadecimal digit
/// (`0-9`, `a-f`). Uppercase hex digits are rejected, as the specification requires
/// lowercase.
///
/// Note: the all-zero ID is not rejected by this function.
pub fn is_valid_trace_id(trace_id: &str) -> bool {
    // Byte-wise check: any non-ASCII input fails the digit test, so `len()` in bytes is safe.
    trace_id.len() == 32
        && trace_id
            .bytes()
            .all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}
151
/// Validate a given span ID according to W3C Trace Context specifications.
///
/// A valid span ID is exactly 16 characters, each a lowercase hexadecimal digit
/// (`0-9`, `a-f`). Uppercase hex digits are rejected, as the specification requires
/// lowercase.
///
/// Note: the all-zero ID is not rejected by this function.
pub fn is_valid_span_id(span_id: &str) -> bool {
    // Byte-wise check: any non-ASCII input fails the digit test, so `len()` in bytes is safe.
    span_id.len() == 16
        && span_id
            .bytes()
            .all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}
157
/// Marker type whose `Layer` implementation keeps a [`DistributedTraceContext`] in the
/// extensions of every span it observes.
///
/// The type carries no state, so it is cheap to copy and can be created with
/// `DistributedTraceIdLayer` or `DistributedTraceIdLayer::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct DistributedTraceIdLayer;
159
160#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct DistributedTraceContext {
162    pub trace_id: String,
163    pub span_id: String,
164    #[serde(skip_serializing_if = "Option::is_none")]
165    pub parent_id: Option<String>,
166    #[serde(skip_serializing_if = "Option::is_none")]
167    pub tracestate: Option<String>,
168    #[serde(skip)]
169    start: Option<Instant>,
170    #[serde(skip)]
171    end: Option<Instant>,
172    #[serde(skip_serializing_if = "Option::is_none")]
173    pub x_request_id: Option<String>,
174    #[serde(skip_serializing_if = "Option::is_none")]
175    pub x_dynamo_request_id: Option<String>,
176}
177
/// Partially resolved trace data gathered in `on_new_span`.
///
/// It is stored in the span's extensions and turned into a `DistributedTraceContext` in
/// `on_enter`; every field is optional because any of them may still be unknown.
#[derive(Debug, Clone)]
struct PendingDistributedTraceContext {
    /// Trace ID taken from the span's fields or its parent, if any.
    trace_id: Option<String>,
    /// Span ID taken from the span's fields, if any.
    span_id: Option<String>,
    /// Parent span ID taken from the span's fields or its parent, if any.
    parent_id: Option<String>,
    /// `tracestate` value, if any.
    tracestate: Option<String>,
    /// `x_request_id` field value, if any.
    x_request_id: Option<String>,
    /// `x_dynamo_request_id` field value, if any.
    x_dynamo_request_id: Option<String>,
}
188
/// Emits a `tracing` event under a custom target at a level that is only known at runtime.
///
/// The first argument must evaluate to a `&tracing::Level`; everything after
/// `target: <expr>,` is forwarded unchanged to `tracing::event!`.
macro_rules! emit_at_level {
    ($lvl:expr, target: $tgt:expr, $($fields:tt)*) => {
        // `tracing::event!` needs its level as a compile-time constant, so dispatch on the
        // runtime value and spell out the literal `Level` constant in each arm.
        // See: https://github.com/tokio-rs/tracing/issues/2730
        match *$lvl {
            tracing::Level::ERROR => tracing::event!(target: $tgt, tracing::Level::ERROR, $($fields)*),
            tracing::Level::WARN => tracing::event!(target: $tgt, tracing::Level::WARN, $($fields)*),
            tracing::Level::INFO => tracing::event!(target: $tgt, tracing::Level::INFO, $($fields)*),
            tracing::Level::DEBUG => tracing::event!(target: $tgt, tracing::Level::DEBUG, $($fields)*),
            tracing::Level::TRACE => tracing::event!(target: $tgt, tracing::Level::TRACE, $($fields)*),
        }
    };
}
204
205impl DistributedTraceContext {
206    /// Create a traceparent string from the context
207    pub fn create_traceparent(&self) -> String {
208        format!("00-{}-{}-01", self.trace_id, self.span_id)
209    }
210}
211
212/// Parse a traceparent string into its components
213pub fn parse_traceparent(traceparent: &str) -> (Option<String>, Option<String>) {
214    let pieces: Vec<_> = traceparent.split('-').collect();
215    if pieces.len() != 4 {
216        return (None, None);
217    }
218    let trace_id = pieces[1];
219    let parent_id = pieces[2];
220
221    if !is_valid_trace_id(trace_id) || !is_valid_span_id(parent_id) {
222        return (None, None);
223    }
224
225    (Some(trace_id.to_string()), Some(parent_id.to_string()))
226}
227
/// Trace-related values pulled out of a set of request headers.
///
/// Every field is `None` when the corresponding header was missing or rejected.
#[derive(Debug, Clone, Default)]
pub struct TraceParent {
    /// Trace ID parsed from the `traceparent` header.
    pub trace_id: Option<String>,
    /// Parent span ID parsed from the `traceparent` header.
    pub parent_id: Option<String>,
    /// Raw `tracestate` header value.
    pub tracestate: Option<String>,
    /// Raw `x-request-id` header value.
    pub x_request_id: Option<String>,
    /// `x-dynamo-request-id` header value.
    pub x_dynamo_request_id: Option<String>,
}
236
/// Minimal read-only view over a header collection, so the same parsing code can work with
/// different header map types.
pub trait GenericHeaders {
    /// Returns the value stored under `key` as a string slice, or `None` if there is none.
    fn get(&self, key: &str) -> Option<&str>;
}
240
241impl GenericHeaders for async_nats::HeaderMap {
242    fn get(&self, key: &str) -> Option<&str> {
243        async_nats::HeaderMap::get(self, key).map(|value| value.as_str())
244    }
245}
246
247impl GenericHeaders for http::HeaderMap {
248    fn get(&self, key: &str) -> Option<&str> {
249        http::HeaderMap::get(self, key).and_then(|value| value.to_str().ok())
250    }
251}
252
253impl TraceParent {
254    pub fn from_headers<H: GenericHeaders>(headers: &H) -> TraceParent {
255        let mut trace_id = None;
256        let mut parent_id = None;
257        let mut tracestate = None;
258        let mut x_request_id = None;
259        let mut x_dynamo_request_id = None;
260
261        if let Some(header_value) = headers.get("traceparent") {
262            (trace_id, parent_id) = parse_traceparent(header_value);
263        }
264
265        if let Some(header_value) = headers.get("x-request-id") {
266            x_request_id = Some(header_value.to_string());
267        }
268
269        if let Some(header_value) = headers.get("tracestate") {
270            tracestate = Some(header_value.to_string());
271        }
272
273        if let Some(header_value) = headers.get("x-dynamo-request-id") {
274            x_dynamo_request_id = Some(header_value.to_string());
275        }
276
277        // Validate UUID format
278        let x_dynamo_request_id =
279            x_dynamo_request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
280        TraceParent {
281            trace_id,
282            parent_id,
283            tracestate,
284            x_request_id,
285            x_dynamo_request_id,
286        }
287    }
288}
289
290// Takes Axum request and returning a span
291pub fn make_request_span<B>(req: &Request<B>) -> Span {
292    let method = req.method();
293    let uri = req.uri();
294    let version = format!("{:?}", req.version());
295    let trace_parent = TraceParent::from_headers(req.headers());
296
297    let otel_context = extract_otel_context_from_http_headers(req.headers());
298
299    let span = tracing::info_span!(
300        "http-request",
301        method = %method,
302        uri = %uri,
303        version = %version,
304        trace_id = trace_parent.trace_id,
305        parent_id = trace_parent.parent_id,
306        x_request_id = trace_parent.x_request_id,
307        x_dynamo_request_id = trace_parent.x_dynamo_request_id,
308    );
309
310    if let Some(context) = otel_context {
311        let _ = span.set_parent(context);
312    }
313
314    span
315}
316
317/// Extract OpenTelemetry context from HTTP headers for distributed tracing
318fn extract_otel_context_from_http_headers(
319    headers: &http::HeaderMap,
320) -> Option<opentelemetry::Context> {
321    let traceparent_value = headers.get("traceparent")?.to_str().ok()?;
322
323    struct HttpHeaderExtractor<'a>(&'a http::HeaderMap);
324
325    impl<'a> Extractor for HttpHeaderExtractor<'a> {
326        fn get(&self, key: &str) -> Option<&str> {
327            self.0.get(key).and_then(|v| v.to_str().ok())
328        }
329
330        fn keys(&self) -> Vec<&str> {
331            vec!["traceparent", "tracestate"]
332                .into_iter()
333                .filter(|&key| self.0.get(key).is_some())
334                .collect()
335        }
336    }
337
338    // Early return if traceparent is empty
339    if traceparent_value.is_empty() {
340        return None;
341    }
342
343    let extractor = HttpHeaderExtractor(headers);
344    let otel_context = TRACE_PROPAGATOR.extract(&extractor);
345
346    if otel_context.span().span_context().is_valid() {
347        Some(otel_context)
348    } else {
349        None
350    }
351}
352
353/// Create a handle_payload span from NATS headers with component context
354pub fn make_handle_payload_span(
355    headers: &async_nats::HeaderMap,
356    component: &str,
357    endpoint: &str,
358    namespace: &str,
359    instance_id: u64,
360) -> Span {
361    let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_nats_headers(headers);
362    let trace_parent = TraceParent::from_headers(headers);
363
364    if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
365        let span = tracing::info_span!(
366            "handle_payload",
367            trace_id = trace_id.as_str(),
368            parent_id = parent_id.as_str(),
369            x_request_id = trace_parent.x_request_id,
370            x_dynamo_request_id = trace_parent.x_dynamo_request_id,
371            tracestate = trace_parent.tracestate,
372            component = component,
373            endpoint = endpoint,
374            namespace = namespace,
375            instance_id = instance_id,
376        );
377
378        if let Some(context) = otel_context {
379            let _ = span.set_parent(context);
380        }
381        span
382    } else {
383        tracing::info_span!(
384            "handle_payload",
385            x_request_id = trace_parent.x_request_id,
386            x_dynamo_request_id = trace_parent.x_dynamo_request_id,
387            tracestate = trace_parent.tracestate,
388            component = component,
389            endpoint = endpoint,
390            namespace = namespace,
391            instance_id = instance_id,
392        )
393    }
394}
395
396/// Create a handle_payload span from TCP/HashMap headers with component context
397pub fn make_handle_payload_span_from_tcp_headers(
398    headers: &std::collections::HashMap<String, String>,
399    component: &str,
400    endpoint: &str,
401    namespace: &str,
402    instance_id: u64,
403) -> Span {
404    let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_tcp_headers(headers);
405    let x_request_id = headers.get("x-request-id").cloned();
406    let x_dynamo_request_id = headers.get("x-dynamo-request-id").cloned();
407    let tracestate = headers.get("tracestate").cloned();
408
409    if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
410        let span = tracing::info_span!(
411            "handle_payload",
412            trace_id = trace_id.as_str(),
413            parent_id = parent_id.as_str(),
414            x_request_id = x_request_id,
415            x_dynamo_request_id = x_dynamo_request_id,
416            tracestate = tracestate,
417            component = component,
418            endpoint = endpoint,
419            namespace = namespace,
420            instance_id = instance_id,
421        );
422
423        if let Some(context) = otel_context {
424            let _ = span.set_parent(context);
425        }
426        span
427    } else {
428        tracing::info_span!(
429            "handle_payload",
430            x_request_id = x_request_id,
431            x_dynamo_request_id = x_dynamo_request_id,
432            tracestate = tracestate,
433            component = component,
434            endpoint = endpoint,
435            namespace = namespace,
436            instance_id = instance_id,
437        )
438    }
439}
440
441/// Extract OpenTelemetry trace context from TCP/HashMap headers for distributed tracing
442fn extract_otel_context_from_tcp_headers(
443    headers: &std::collections::HashMap<String, String>,
444) -> (
445    Option<opentelemetry::Context>,
446    Option<String>,
447    Option<String>,
448) {
449    let traceparent_value = match headers.get("traceparent") {
450        Some(value) => value.as_str(),
451        None => return (None, None, None),
452    };
453
454    let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
455
456    struct TcpHeaderExtractor<'a>(&'a std::collections::HashMap<String, String>);
457
458    impl<'a> Extractor for TcpHeaderExtractor<'a> {
459        fn get(&self, key: &str) -> Option<&str> {
460            self.0.get(key).map(|s| s.as_str())
461        }
462
463        fn keys(&self) -> Vec<&str> {
464            vec!["traceparent", "tracestate"]
465                .into_iter()
466                .filter(|&key| self.0.get(key).is_some())
467                .collect()
468        }
469    }
470
471    let extractor = TcpHeaderExtractor(headers);
472    let otel_context = TRACE_PROPAGATOR.extract(&extractor);
473
474    let context_with_trace = if otel_context.span().span_context().is_valid() {
475        Some(otel_context)
476    } else {
477        None
478    };
479
480    (context_with_trace, trace_id, parent_span_id)
481}
482
483/// Extract OpenTelemetry trace context from NATS headers for distributed tracing
484pub fn extract_otel_context_from_nats_headers(
485    headers: &async_nats::HeaderMap,
486) -> (
487    Option<opentelemetry::Context>,
488    Option<String>,
489    Option<String>,
490) {
491    let traceparent_value = match headers.get("traceparent") {
492        Some(value) => value.as_str(),
493        None => return (None, None, None),
494    };
495
496    let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
497
498    struct NatsHeaderExtractor<'a>(&'a async_nats::HeaderMap);
499
500    impl<'a> Extractor for NatsHeaderExtractor<'a> {
501        fn get(&self, key: &str) -> Option<&str> {
502            self.0.get(key).map(|value| value.as_str())
503        }
504
505        fn keys(&self) -> Vec<&str> {
506            vec!["traceparent", "tracestate"]
507                .into_iter()
508                .filter(|&key| self.0.get(key).is_some())
509                .collect()
510        }
511    }
512
513    let extractor = NatsHeaderExtractor(headers);
514    let otel_context = TRACE_PROPAGATOR.extract(&extractor);
515
516    let context_with_trace = if otel_context.span().span_context().is_valid() {
517        Some(otel_context)
518    } else {
519        None
520    };
521
522    (context_with_trace, trace_id, parent_span_id)
523}
524
525/// Inject OpenTelemetry trace context into NATS headers using W3C Trace Context propagation
526pub fn inject_otel_context_into_nats_headers(
527    headers: &mut async_nats::HeaderMap,
528    context: Option<opentelemetry::Context>,
529) {
530    let otel_context = context.unwrap_or_else(|| Span::current().context());
531
532    struct NatsHeaderInjector<'a>(&'a mut async_nats::HeaderMap);
533
534    impl<'a> Injector for NatsHeaderInjector<'a> {
535        fn set(&mut self, key: &str, value: String) {
536            self.0.insert(key, value);
537        }
538    }
539
540    let mut injector = NatsHeaderInjector(headers);
541    TRACE_PROPAGATOR.inject_context(&otel_context, &mut injector);
542}
543
544/// Inject trace context from current span into NATS headers
545pub fn inject_current_trace_into_nats_headers(headers: &mut async_nats::HeaderMap) {
546    inject_otel_context_into_nats_headers(headers, None);
547}
548
549// Inject trace headers into a generic HashMap for HTTP/TCP transports
550pub fn inject_trace_headers_into_map(headers: &mut std::collections::HashMap<String, String>) {
551    if let Some(trace_context) = get_distributed_tracing_context() {
552        // Inject W3C traceparent header
553        headers.insert(
554            "traceparent".to_string(),
555            trace_context.create_traceparent(),
556        );
557
558        // Inject optional tracestate
559        if let Some(tracestate) = trace_context.tracestate {
560            headers.insert("tracestate".to_string(), tracestate);
561        }
562
563        // Inject custom request IDs
564        if let Some(x_request_id) = trace_context.x_request_id {
565            headers.insert("x-request-id".to_string(), x_request_id);
566        }
567        if let Some(x_dynamo_request_id) = trace_context.x_dynamo_request_id {
568            headers.insert("x-dynamo-request-id".to_string(), x_dynamo_request_id);
569        }
570    }
571}
572
573/// Create a client_request span linked to the parent trace context
574pub fn make_client_request_span(
575    operation: &str,
576    request_id: &str,
577    trace_context: Option<&DistributedTraceContext>,
578    instance_id: Option<&str>,
579) -> Span {
580    if let Some(ctx) = trace_context {
581        let mut headers = async_nats::HeaderMap::new();
582        headers.insert("traceparent", ctx.create_traceparent());
583
584        if let Some(ref tracestate) = ctx.tracestate {
585            headers.insert("tracestate", tracestate.as_str());
586        }
587
588        let (otel_context, _extracted_trace_id, _extracted_parent_span_id) =
589            extract_otel_context_from_nats_headers(&headers);
590
591        let span = if let Some(inst_id) = instance_id {
592            tracing::info_span!(
593                "client_request",
594                operation = operation,
595                request_id = request_id,
596                instance_id = inst_id,
597                trace_id = ctx.trace_id.as_str(),
598                parent_id = ctx.span_id.as_str(),
599                x_request_id = ctx.x_request_id.as_deref(),
600                x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
601                // tracestate = ctx.tracestate.as_deref(),
602            )
603        } else {
604            tracing::info_span!(
605                "client_request",
606                operation = operation,
607                request_id = request_id,
608                trace_id = ctx.trace_id.as_str(),
609                parent_id = ctx.span_id.as_str(),
610                x_request_id = ctx.x_request_id.as_deref(),
611                x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
612                // tracestate = ctx.tracestate.as_deref(),
613            )
614        };
615
616        if let Some(context) = otel_context {
617            let _ = span.set_parent(context);
618        }
619
620        span
621    } else if let Some(inst_id) = instance_id {
622        tracing::info_span!(
623            "client_request",
624            operation = operation,
625            request_id = request_id,
626            instance_id = inst_id,
627        )
628    } else {
629        tracing::info_span!(
630            "client_request",
631            operation = operation,
632            request_id = request_id,
633        )
634    }
635}
636
637#[derive(Debug, Default)]
638pub struct FieldVisitor {
639    pub fields: HashMap<String, String>,
640}
641
642impl Visit for FieldVisitor {
643    fn record_str(&mut self, field: &Field, value: &str) {
644        self.fields
645            .insert(field.name().to_string(), value.to_string());
646    }
647
648    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
649        self.fields
650            .insert(field.name().to_string(), format!("{:?}", value).to_string());
651    }
652}
653
654impl<S> Layer<S> for DistributedTraceIdLayer
655where
656    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
657{
658    // Capture close span time
659    // Currently not used but added for future use in timing
660    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
661        if let Some(span) = ctx.span(&id) {
662            let mut extensions = span.extensions_mut();
663            if let Some(distributed_tracing_context) =
664                extensions.get_mut::<DistributedTraceContext>()
665            {
666                distributed_tracing_context.end = Some(Instant::now());
667            }
668        }
669    }
670
671    // Collects span attributes and metadata in on_new_span
672    // Final initialization deferred to on_enter when OtelData is available
673    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
674        if let Some(span) = ctx.span(id) {
675            let mut trace_id: Option<String> = None;
676            let mut parent_id: Option<String> = None;
677            let mut span_id: Option<String> = None;
678            let mut x_request_id: Option<String> = None;
679            let mut x_dynamo_request_id: Option<String> = None;
680            let mut tracestate: Option<String> = None;
681            let mut visitor = FieldVisitor::default();
682            attrs.record(&mut visitor);
683
684            // Extract trace_id from span attributes
685            if let Some(trace_id_input) = visitor.fields.get("trace_id") {
686                if !is_valid_trace_id(trace_id_input) {
687                    tracing::trace!("trace id  '{}' is not valid! Ignoring.", trace_id_input);
688                } else {
689                    trace_id = Some(trace_id_input.to_string());
690                }
691            }
692
693            // Extract span_id from span attributes
694            if let Some(span_id_input) = visitor.fields.get("span_id") {
695                if !is_valid_span_id(span_id_input) {
696                    tracing::trace!("span id  '{}' is not valid! Ignoring.", span_id_input);
697                } else {
698                    span_id = Some(span_id_input.to_string());
699                }
700            }
701
702            // Extract parent_id from span attributes
703            if let Some(parent_id_input) = visitor.fields.get("parent_id") {
704                if !is_valid_span_id(parent_id_input) {
705                    tracing::trace!("parent id  '{}' is not valid! Ignoring.", parent_id_input);
706                } else {
707                    parent_id = Some(parent_id_input.to_string());
708                }
709            }
710
711            // Extract tracestate
712            if let Some(tracestate_input) = visitor.fields.get("tracestate") {
713                tracestate = Some(tracestate_input.to_string());
714            }
715
716            // Extract x_request_id
717            if let Some(x_request_id_input) = visitor.fields.get("x_request_id") {
718                x_request_id = Some(x_request_id_input.to_string());
719            }
720
721            // Extract x_dynamo_request_id
722            if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
723                x_dynamo_request_id = Some(x_request_id_input.to_string());
724            }
725
726            // Inherit trace context from parent span if available
727            if parent_id.is_none()
728                && let Some(parent_span_id) = ctx.current_span().id()
729                && let Some(parent_span) = ctx.span(parent_span_id)
730            {
731                let parent_ext = parent_span.extensions();
732                if let Some(parent_tracing_context) = parent_ext.get::<DistributedTraceContext>() {
733                    trace_id = Some(parent_tracing_context.trace_id.clone());
734                    parent_id = Some(parent_tracing_context.span_id.clone());
735                    tracestate = parent_tracing_context.tracestate.clone();
736                }
737            }
738
739            // Validate consistency
740            if (parent_id.is_some() || span_id.is_some()) && trace_id.is_none() {
741                tracing::error!("parent id or span id are set but trace id is not set!");
742                // Clear inconsistent IDs to maintain trace integrity
743                parent_id = None;
744                span_id = None;
745            }
746
747            // Store pending context - will be finalized in on_enter
748            let mut extensions = span.extensions_mut();
749            extensions.insert(PendingDistributedTraceContext {
750                trace_id,
751                span_id,
752                parent_id,
753                tracestate,
754                x_request_id,
755                x_dynamo_request_id,
756            });
757        }
758    }
759
760    // Finalizes the DistributedTraceContext when span is entered
761    // At this point, OtelData should have valid trace_id and span_id
762    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
763        if let Some(span) = ctx.span(id) {
764            // Check if already initialized (e.g., span re-entered)
765            {
766                let extensions = span.extensions();
767                if extensions.get::<DistributedTraceContext>().is_some() {
768                    return;
769                }
770            }
771
772            // Get the pending context and extract OtelData IDs
773            let mut extensions = span.extensions_mut();
774            let pending = match extensions.remove::<PendingDistributedTraceContext>() {
775                Some(p) => p,
776                None => {
777                    // This shouldn't happen - on_new_span should have created it
778                    tracing::error!("PendingDistributedTraceContext not found in on_enter");
779                    return;
780                }
781            };
782
783            let mut trace_id = pending.trace_id;
784            let mut span_id = pending.span_id;
785            let parent_id = pending.parent_id;
786            let tracestate = pending.tracestate;
787            let x_request_id = pending.x_request_id;
788            let x_dynamo_request_id = pending.x_dynamo_request_id;
789
790            // Try to extract from OtelData if not already set
791            // Need to drop extensions_mut to get immutable borrow for OtelData
792            drop(extensions);
793
794            if trace_id.is_none() || span_id.is_none() {
795                let extensions = span.extensions();
796                if let Some(otel_data) = extensions.get::<tracing_opentelemetry::OtelData>() {
797                    // Extract trace_id from OTEL data if not already set
798                    if trace_id.is_none()
799                        && let Some(otel_trace_id) = otel_data.trace_id()
800                    {
801                        let trace_id_str = format!("{}", otel_trace_id);
802                        if is_valid_trace_id(&trace_id_str) {
803                            trace_id = Some(trace_id_str);
804                        }
805                    }
806
807                    // Extract span_id from OTEL data if not already set
808                    if span_id.is_none()
809                        && let Some(otel_span_id) = otel_data.span_id()
810                    {
811                        let span_id_str = format!("{}", otel_span_id);
812                        if is_valid_span_id(&span_id_str) {
813                            span_id = Some(span_id_str);
814                        }
815                    }
816                }
817            }
818
819            // Panic if we still don't have required IDs
820            if trace_id.is_none() {
821                panic!(
822                    "trace_id is not set in on_enter - OtelData may not be properly initialized"
823                );
824            }
825
826            if span_id.is_none() {
827                panic!("span_id is not set in on_enter - OtelData may not be properly initialized");
828            }
829
830            let span_level = span.metadata().level();
831            let mut extensions = span.extensions_mut();
832            extensions.insert(DistributedTraceContext {
833                trace_id: trace_id.expect("Trace ID must be set"),
834                span_id: span_id.expect("Span ID must be set"),
835                parent_id,
836                tracestate,
837                start: Some(Instant::now()),
838                end: None,
839                x_request_id,
840                x_dynamo_request_id,
841            });
842
843            drop(extensions);
844
845            // Emit SPAN_FIRST_ENTRY event. This only runs if the span passed the layer's filter
846            // (on_enter is not called for filtered-out spans), so no additional check needed.
847            if span_events_enabled() {
848                emit_at_level!(span_level, target: "span_event", message = "SPAN_FIRST_ENTRY");
849            }
850        }
851    }
852}
853
854// Enables functions to retreive their current
855// context for adding to distributed headers
856pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
857    Span::current()
858        .with_subscriber(|(id, subscriber)| {
859            subscriber
860                .downcast_ref::<Registry>()
861                .and_then(|registry| registry.span_data(id))
862                .and_then(|span_data| {
863                    let extensions = span_data.extensions();
864                    extensions.get::<DistributedTraceContext>().cloned()
865                })
866        })
867        .flatten()
868}
869
870/// Initialize the logger - must be called when Tokio runtime is available
871pub fn init() {
872    INIT.call_once(|| {
873        if let Err(e) = setup_logging() {
874            eprintln!("Failed to initialize logging: {}", e);
875            std::process::exit(1);
876        }
877    });
878}
879
/// Installs the global subscriber for builds with the `tokio-console` feature.
///
/// Two layers are registered:
/// - a compact, human-readable `fmt` layer writing to stderr, filtered by the
///   configured log filters;
/// - a `console_subscriber` layer whose server listens on `0.0.0.0` at the
///   default console port, restricted to the `runtime` and `tokio` targets at
///   `TRACE` (everything else at `ERROR`).
///
/// # Errors
///
/// Currently always returns `Ok(())`. The `Result` return type mirrors the
/// non-`tokio-console` variant so that `init()` can handle both uniformly.
#[cfg(feature = "tokio-console")]
fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
    let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
        .with_default_env()
        .server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
        .spawn();
    let tokio_console_target = tracing_subscriber::filter::Targets::new()
        .with_default(LevelFilter::ERROR)
        .with_target("runtime", LevelFilter::TRACE)
        .with_target("tokio", LevelFilter::TRACE);
    let l = fmt::layer()
        .with_ansi(!disable_ansi_logging())
        .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
        .with_writer(std::io::stderr)
        .with_filter(filters(load_config()));
    tracing_subscriber::registry()
        .with(l)
        .with(tokio_console_layer.with_filter(tokio_console_target))
        .init();

    Ok(())
}
900
901#[cfg(not(feature = "tokio-console"))]
902fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
903    let fmt_filter_layer = filters(load_config());
904    let trace_filter_layer = filters(load_config());
905    let otel_filter_layer = filters(load_config());
906
907    if jsonl_logging_enabled() {
908        let span_events = if span_events_enabled() {
909            FmtSpan::CLOSE
910        } else {
911            FmtSpan::NONE
912        };
913        let l = fmt::layer()
914            .with_ansi(false)
915            .with_span_events(span_events)
916            .event_format(CustomJsonFormatter::new())
917            .with_writer(std::io::stderr)
918            .with_filter(fmt_filter_layer);
919
920        // Create OpenTelemetry tracer - conditionally export to OTLP based on env var
921        let service_name = get_service_name();
922
923        // Build tracer provider - with or without OTLP export
924        let (tracer_provider, endpoint_opt) = if otlp_exporter_enabled() {
925            // Export enabled: create OTLP exporter with batch processor
926            let endpoint = std::env::var(env_logging::otlp::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
927                .unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string());
928
929            // Initialize OTLP exporter using gRPC (Tonic)
930            let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
931                .with_tonic()
932                .with_endpoint(&endpoint)
933                .build()?;
934
935            // Create tracer provider with batch exporter and service name
936            let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
937                .with_batch_exporter(otlp_exporter)
938                .with_resource(
939                    opentelemetry_sdk::Resource::builder_empty()
940                        .with_service_name(service_name.clone())
941                        .build(),
942                )
943                .build();
944
945            (provider, Some(endpoint))
946        } else {
947            // No export - traces generated locally only (for logging/trace IDs)
948            let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
949                .with_resource(
950                    opentelemetry_sdk::Resource::builder_empty()
951                        .with_service_name(service_name.clone())
952                        .build(),
953                )
954                .build();
955
956            (provider, None)
957        };
958
959        // Get a tracer from the provider
960        let tracer = tracer_provider.tracer(service_name.clone());
961
962        tracing_subscriber::registry()
963            .with(
964                tracing_opentelemetry::layer()
965                    .with_tracer(tracer)
966                    .with_filter(otel_filter_layer),
967            )
968            .with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
969            .with(l)
970            .init();
971
972        // Log initialization status after subscriber is ready
973        if let Some(endpoint) = endpoint_opt {
974            tracing::info!(
975                endpoint = %endpoint,
976                service = %service_name,
977                "OpenTelemetry OTLP export enabled"
978            );
979        } else {
980            tracing::info!(
981                service = %service_name,
982                "OpenTelemetry OTLP export disabled, traces local only"
983            );
984        }
985    } else {
986        let l = fmt::layer()
987            .with_ansi(!disable_ansi_logging())
988            .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
989            .with_writer(std::io::stderr)
990            .with_filter(fmt_filter_layer);
991
992        tracing_subscriber::registry().with(l).init();
993    }
994
995    Ok(())
996}
997
998fn filters(config: LoggingConfig) -> EnvFilter {
999    let mut filter_layer = EnvFilter::builder()
1000        .with_default_directive(config.log_level.parse().unwrap())
1001        .with_env_var(env_logging::DYN_LOG)
1002        .from_env_lossy();
1003
1004    for (module, level) in config.log_filters {
1005        match format!("{module}={level}").parse::<Directive>() {
1006            Ok(d) => {
1007                filter_layer = filter_layer.add_directive(d);
1008            }
1009            Err(e) => {
1010                eprintln!("Failed parsing filter '{level}' for module '{module}': {e}");
1011            }
1012        }
1013    }
1014
1015    // When span events are enabled, allow "span_event" target at all levels
1016    // This ensures SPAN_FIRST_ENTRY events pass the filter when emitted from on_enter
1017    if span_events_enabled() {
1018        filter_layer = filter_layer.add_directive("span_event=trace".parse().unwrap());
1019    }
1020
1021    filter_layer
1022}
1023
1024/// Log a message with file and line info
1025/// Used by Python wrapper
1026pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) {
1027    let level = match level {
1028        "debug" => log::Level::Debug,
1029        "info" => log::Level::Info,
1030        "warn" => log::Level::Warn,
1031        "error" => log::Level::Error,
1032        "warning" => log::Level::Warn,
1033        _ => log::Level::Info,
1034    };
1035    log::logger().log(
1036        &log::Record::builder()
1037            .args(format_args!("{}", message))
1038            .level(level)
1039            .target(module)
1040            .file(Some(file))
1041            .line(Some(line))
1042            .build(),
1043    );
1044}
1045
1046fn load_config() -> LoggingConfig {
1047    let config_path =
1048        std::env::var(env_logging::DYN_LOGGING_CONFIG_PATH).unwrap_or_else(|_| "".to_string());
1049    let figment = Figment::new()
1050        .merge(Serialized::defaults(LoggingConfig::default()))
1051        .merge(Toml::file("/opt/dynamo/etc/logging.toml"))
1052        .merge(Toml::file(config_path));
1053
1054    figment.extract().unwrap()
1055}
1056
/// One log line in JSONL output, serialized by `CustomJsonFormatter`.
#[derive(Serialize)]
struct JsonLog<'a> {
    /// Timestamp string produced by the formatter's `TimeFormatter`.
    time: String,
    /// Event level rendered as text.
    level: String,
    /// Source file of the event; omitted from the output when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    file: Option<&'a str>,
    /// Source line of the event; omitted from the output when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    line: Option<u32>,
    /// Event target, or the span's target for span lifecycle messages.
    target: String,
    /// The event's `message` field (an empty string when the event has none).
    message: serde_json::Value,
    /// All remaining event/span fields, flattened into the top-level object.
    #[serde(flatten)]
    fields: BTreeMap<String, serde_json::Value>,
}
1070
1071struct TimeFormatter {
1072    use_local_tz: bool,
1073}
1074
1075impl TimeFormatter {
1076    fn new() -> Self {
1077        Self {
1078            use_local_tz: crate::config::use_local_timezone(),
1079        }
1080    }
1081
1082    fn format_now(&self) -> String {
1083        if self.use_local_tz {
1084            chrono::Local::now()
1085                .format("%Y-%m-%dT%H:%M:%S%.6f%:z")
1086                .to_string()
1087        } else {
1088            chrono::Utc::now()
1089                .format("%Y-%m-%dT%H:%M:%S%.6fZ")
1090                .to_string()
1091        }
1092    }
1093}
1094
1095impl FormatTime for TimeFormatter {
1096    fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
1097        write!(w, "{}", self.format_now())
1098    }
1099}
1100
1101struct CustomJsonFormatter {
1102    time_formatter: TimeFormatter,
1103}
1104
1105impl CustomJsonFormatter {
1106    fn new() -> Self {
1107        Self {
1108            time_formatter: TimeFormatter::new(),
1109        }
1110    }
1111}
1112
1113use once_cell::sync::Lazy;
1114use regex::Regex;
1115
/// Shared W3C Trace Context propagator.
///
/// Constructed lazily on first access and reused afterwards, so callers do not
/// allocate a new propagator each time they need one.
static TRACE_PROPAGATOR: Lazy<opentelemetry_sdk::propagation::TraceContextPropagator> =
    Lazy::new(opentelemetry_sdk::propagation::TraceContextPropagator::new);
1119
1120fn parse_tracing_duration(s: &str) -> Option<u64> {
1121    static RE: Lazy<Regex> =
1122        Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
1123    let captures = RE.captures(s)?;
1124    let value: f64 = captures[1].parse().ok()?;
1125    let unit = &captures[2];
1126    match unit {
1127        "ns" => Some((value / 1000.0) as u64),
1128        "µs" | "us" => Some(value as u64),
1129        "ms" => Some((value * 1000.0) as u64),
1130        "s" => Some((value * 1_000_000.0) as u64),
1131        _ => None,
1132    }
1133}
1134
1135impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
1136where
1137    S: Subscriber + for<'a> LookupSpan<'a>,
1138    N: for<'a> FormatFields<'a> + 'static,
1139{
1140    fn format_event(
1141        &self,
1142        ctx: &FmtContext<'_, S, N>,
1143        mut writer: Writer<'_>,
1144        event: &Event<'_>,
1145    ) -> std::fmt::Result {
1146        let mut visitor = JsonVisitor::default();
1147        let time = self.time_formatter.format_now();
1148        event.record(&mut visitor);
1149        let mut message = visitor
1150            .fields
1151            .remove("message")
1152            .unwrap_or(serde_json::Value::String("".to_string()));
1153
1154        let mut target_override: Option<String> = None;
1155
1156        let current_span = event
1157            .parent()
1158            .and_then(|id| ctx.span(id))
1159            .or_else(|| ctx.lookup_current());
1160        if let Some(span) = current_span {
1161            let ext = span.extensions();
1162            let data = ext.get::<FormattedFields<N>>().unwrap();
1163            let span_fields: Vec<(&str, &str)> = data
1164                .fields
1165                .split(' ')
1166                .filter_map(|entry| entry.split_once('='))
1167                .collect();
1168            for (name, value) in span_fields {
1169                visitor.fields.insert(
1170                    name.to_string(),
1171                    serde_json::Value::String(value.trim_matches('"').to_string()),
1172                );
1173            }
1174
1175            let busy_us = visitor
1176                .fields
1177                .remove("time.busy")
1178                .and_then(|v| parse_tracing_duration(&v.to_string()));
1179            let idle_us = visitor
1180                .fields
1181                .remove("time.idle")
1182                .and_then(|v| parse_tracing_duration(&v.to_string()));
1183
1184            if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
1185                visitor.fields.insert(
1186                    "time.busy_us".to_string(),
1187                    serde_json::Value::Number(busy_us.into()),
1188                );
1189                visitor.fields.insert(
1190                    "time.idle_us".to_string(),
1191                    serde_json::Value::Number(idle_us.into()),
1192                );
1193                visitor.fields.insert(
1194                    "time.duration_us".to_string(),
1195                    serde_json::Value::Number((busy_us + idle_us).into()),
1196                );
1197            }
1198
1199            let is_span_created = message.as_str() == Some("SPAN_FIRST_ENTRY");
1200            let is_span_closed = message.as_str() == Some("close");
1201            if is_span_created || is_span_closed {
1202                target_override = Some(span.metadata().target().to_string());
1203                if is_span_closed {
1204                    message = serde_json::Value::String("SPAN_CLOSED".to_string());
1205                }
1206            }
1207
1208            visitor.fields.insert(
1209                "span_name".to_string(),
1210                serde_json::Value::String(span.name().to_string()),
1211            );
1212
1213            if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
1214                visitor.fields.insert(
1215                    "span_id".to_string(),
1216                    serde_json::Value::String(tracing_context.span_id.clone()),
1217                );
1218                visitor.fields.insert(
1219                    "trace_id".to_string(),
1220                    serde_json::Value::String(tracing_context.trace_id.clone()),
1221                );
1222                if let Some(parent_id) = tracing_context.parent_id.clone() {
1223                    visitor.fields.insert(
1224                        "parent_id".to_string(),
1225                        serde_json::Value::String(parent_id),
1226                    );
1227                } else {
1228                    visitor.fields.remove("parent_id");
1229                }
1230                if let Some(tracestate) = tracing_context.tracestate.clone() {
1231                    visitor.fields.insert(
1232                        "tracestate".to_string(),
1233                        serde_json::Value::String(tracestate),
1234                    );
1235                } else {
1236                    visitor.fields.remove("tracestate");
1237                }
1238                if let Some(x_request_id) = tracing_context.x_request_id.clone() {
1239                    visitor.fields.insert(
1240                        "x_request_id".to_string(),
1241                        serde_json::Value::String(x_request_id),
1242                    );
1243                } else {
1244                    visitor.fields.remove("x_request_id");
1245                }
1246
1247                if let Some(x_dynamo_request_id) = tracing_context.x_dynamo_request_id.clone() {
1248                    visitor.fields.insert(
1249                        "x_dynamo_request_id".to_string(),
1250                        serde_json::Value::String(x_dynamo_request_id),
1251                    );
1252                } else {
1253                    visitor.fields.remove("x_dynamo_request_id");
1254                }
1255            } else {
1256                tracing::error!(
1257                    "Distributed Trace Context not found, falling back to internal ids"
1258                );
1259                visitor.fields.insert(
1260                    "span_id".to_string(),
1261                    serde_json::Value::String(span.id().into_u64().to_string()),
1262                );
1263                if let Some(parent) = span.parent() {
1264                    visitor.fields.insert(
1265                        "parent_id".to_string(),
1266                        serde_json::Value::String(parent.id().into_u64().to_string()),
1267                    );
1268                }
1269            }
1270        } else {
1271            let reserved_fields = [
1272                "trace_id",
1273                "span_id",
1274                "parent_id",
1275                "span_name",
1276                "tracestate",
1277            ];
1278            for reserved_field in reserved_fields {
1279                visitor.fields.remove(reserved_field);
1280            }
1281        }
1282        let metadata = event.metadata();
1283        let log = JsonLog {
1284            level: metadata.level().to_string(),
1285            time,
1286            file: metadata.file(),
1287            line: metadata.line(),
1288            target: target_override.unwrap_or_else(|| metadata.target().to_string()),
1289            message,
1290            fields: visitor.fields,
1291        };
1292        let json = serde_json::to_string(&log).unwrap();
1293        writeln!(writer, "{json}")
1294    }
1295}
1296
1297#[derive(Default)]
1298struct JsonVisitor {
1299    fields: BTreeMap<String, serde_json::Value>,
1300}
1301
1302impl tracing::field::Visit for JsonVisitor {
1303    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1304        self.fields.insert(
1305            field.name().to_string(),
1306            serde_json::Value::String(format!("{value:?}")),
1307        );
1308    }
1309
1310    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1311        if field.name() != "message" {
1312            match serde_json::from_str::<Value>(value) {
1313                Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
1314                Err(_) => self.fields.insert(field.name().to_string(), value.into()),
1315            };
1316        } else {
1317            self.fields.insert(field.name().to_string(), value.into());
1318        }
1319    }
1320
1321    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
1322        self.fields
1323            .insert(field.name().to_string(), serde_json::Value::Bool(value));
1324    }
1325
1326    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
1327        self.fields.insert(
1328            field.name().to_string(),
1329            serde_json::Value::Number(value.into()),
1330        );
1331    }
1332
1333    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
1334        self.fields.insert(
1335            field.name().to_string(),
1336            serde_json::Value::Number(value.into()),
1337        );
1338    }
1339
1340    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
1341        use serde_json::value::Number;
1342        self.fields.insert(
1343            field.name().to_string(),
1344            serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
1345        );
1346    }
1347}
1348
1349#[cfg(test)]
1350pub mod tests {
1351    use super::*;
1352    use anyhow::{Result, anyhow};
1353    use chrono::{DateTime, Utc};
1354    use jsonschema::{Draft, JSONSchema};
1355    use serde_json::Value;
1356    use std::fs::File;
1357    use std::io::{BufRead, BufReader};
1358    use stdio_override::*;
1359    use tempfile::NamedTempFile;
1360
    // JSON Schema (draft-07) that every captured log line is validated against
    // in `load_log`. `file`, `level`, `line`, `message`, `target` and `time` are
    // required; the tracing-related properties are optional but, when present,
    // must match the listed types/patterns. Unknown properties are allowed.
    static LOG_LINE_SCHEMA: &str = r#"
    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "Runtime Log Line",
      "type": "object",
      "required": [
        "file",
        "level",
        "line",
        "message",
        "target",
        "time"
      ],
      "properties": {
        "file":      { "type": "string" },
        "level":     { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
        "line":      { "type": "integer" },
        "message":   { "type": "string" },
        "target":    { "type": "string" },
        "time":      { "type": "string", "format": "date-time" },
        "span_id":   { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "trace_id":  { "type": "string", "pattern": "^[a-f0-9]{32}$" },
        "span_name": { "type": "string" },
        "time.busy_us":     { "type": "integer" },
        "time.duration_us": { "type": "integer" },
        "time.idle_us":     { "type": "integer" },
        "tracestate": { "type": "string" }
      },
      "additionalProperties": true
    }
    "#;
1393
1394    #[tracing::instrument(skip_all)]
1395    async fn parent() {
1396        tracing::trace!(message = "parent!");
1397        if let Some(my_ctx) = get_distributed_tracing_context() {
1398            tracing::info!(my_trace_id = my_ctx.trace_id);
1399        }
1400        child().await;
1401    }
1402
1403    #[tracing::instrument(skip_all)]
1404    async fn child() {
1405        tracing::trace!(message = "child");
1406        if let Some(my_ctx) = get_distributed_tracing_context() {
1407            tracing::info!(my_trace_id = my_ctx.trace_id);
1408        }
1409        grandchild().await;
1410    }
1411
1412    #[tracing::instrument(skip_all)]
1413    async fn grandchild() {
1414        tracing::trace!(message = "grandchild");
1415        if let Some(my_ctx) = get_distributed_tracing_context() {
1416            tracing::info!(my_trace_id = my_ctx.trace_id);
1417        }
1418    }
1419
1420    pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
1421        let schema_json: Value =
1422            serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
1423        let compiled_schema = JSONSchema::options()
1424            .with_draft(Draft::Draft7)
1425            .compile(&schema_json)
1426            .expect("Invalid schema");
1427
1428        let f = File::open(file_name)?;
1429        let reader = BufReader::new(f);
1430        let mut result = Vec::new();
1431
1432        for (line_num, line) in reader.lines().enumerate() {
1433            let line = line?;
1434            let val: Value = serde_json::from_str(&line)
1435                .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_num + 1, e))?;
1436
1437            if let Err(errors) = compiled_schema.validate(&val) {
1438                let errs = errors.map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
1439                return Err(anyhow!(
1440                    "Line {}: JSON Schema Validation errors: {}",
1441                    line_num + 1,
1442                    errs
1443                ));
1444            }
1445            println!("{}", val);
1446            result.push(val);
1447        }
1448        Ok(result)
1449    }
1450
1451    #[tokio::test]
1452    async fn test_json_log_capture() -> Result<()> {
1453        #[allow(clippy::redundant_closure_call)]
1454        let _ = temp_env::async_with_vars(
1455            [(env_logging::DYN_LOGGING_JSONL, Some("1"))],
1456            (async || {
1457                let tmp_file = NamedTempFile::new().unwrap();
1458                let file_name = tmp_file.path().to_str().unwrap();
1459                let guard = StderrOverride::from_file(file_name)?;
1460                init();
1461                parent().await;
1462                drop(guard);
1463
1464                let lines = load_log(file_name)?;
1465
1466                // 1. Extract the dynamically generated trace ID and validate consistency
1467                // All logs should have the same trace_id since they're part of the same trace
1468                // Skip any initialization logs that don't have trace_id (e.g., OTLP setup messages)
1469                //
1470                // Note: This test can fail if logging was already initialized by another test running
1471                // in parallel. Logging initialization is global (Once) and can only happen once per process.
1472                // If no trace_id is found, skip validation gracefully.
1473                let Some(trace_id) = lines
1474                    .iter()
1475                    .find_map(|log_line| log_line.get("trace_id").and_then(|v| v.as_str()))
1476                    .map(|s| s.to_string())
1477                else {
1478                    // Skip test if logging was already initialized - we can't control the output format
1479                    return Ok(());
1480                };
1481
1482                // Verify trace_id is not a zero/invalid ID
1483                assert_ne!(
1484                    trace_id, "00000000000000000000000000000000",
1485                    "trace_id should not be a zero/invalid ID"
1486                );
1487                assert!(
1488                    !trace_id.chars().all(|c| c == '0'),
1489                    "trace_id should not be all zeros"
1490                );
1491
1492                // Verify all logs have the same trace_id
1493                for log_line in &lines {
1494                    if let Some(line_trace_id) = log_line.get("trace_id") {
1495                        assert_eq!(
1496                            line_trace_id.as_str().unwrap(),
1497                            &trace_id,
1498                            "All logs should have the same trace_id"
1499                        );
1500                    }
1501                }
1502
1503                // Validate my_trace_id matches the actual trace ID
1504                for log_line in &lines {
1505                    if let Some(my_trace_id) = log_line.get("my_trace_id") {
1506                        assert_eq!(
1507                            my_trace_id,
1508                            &serde_json::Value::String(trace_id.clone()),
1509                            "my_trace_id should match the trace_id from distributed tracing context"
1510                        );
1511                    }
1512                }
1513
1514                // 2. Validate span IDs exist and are properly formatted
1515                let mut span_ids_seen: std::collections::HashSet<String> = std::collections::HashSet::new();
1516                let mut span_timestamps: std::collections::HashMap<String, DateTime<Utc>> = std::collections::HashMap::new();
1517
1518                for log_line in &lines {
1519                    if let Some(span_id) = log_line.get("span_id") {
1520                        let span_id_str = span_id.as_str().unwrap();
1521                        assert!(
1522                            is_valid_span_id(span_id_str),
1523                            "Invalid span_id format: {}",
1524                            span_id_str
1525                        );
1526                        span_ids_seen.insert(span_id_str.to_string());
1527                    }
1528
1529                    // Validate timestamp format and track span timestamps
1530                    if let Some(time_str) = log_line.get("time").and_then(|v| v.as_str()) {
1531                        let timestamp = DateTime::parse_from_rfc3339(time_str)
1532                            .expect("All timestamps should be valid RFC3339 format")
1533                            .with_timezone(&Utc);
1534
1535                        // Track timestamp for each span_name
1536                        if let Some(span_name) = log_line.get("span_name").and_then(|v| v.as_str()) {
1537                            span_timestamps.insert(span_name.to_string(), timestamp);
1538                        }
1539                    }
1540                }
1541
1542                // 3. Validate parent-child span relationships
1543                // Extract span IDs for each span by looking at their log messages
1544                let parent_span_id = lines
1545                    .iter()
1546                    .find(|log_line| {
1547                        log_line.get("span_name")
1548                            .and_then(|v| v.as_str()) == Some("parent")
1549                    })
1550                    .and_then(|log_line| {
1551                        log_line.get("span_id")
1552                            .and_then(|v| v.as_str())
1553                            .map(|s| s.to_string())
1554                    })
1555                    .expect("Should find parent span with span_id");
1556
1557                let child_span_id = lines
1558                    .iter()
1559                    .find(|log_line| {
1560                        log_line.get("span_name")
1561                            .and_then(|v| v.as_str()) == Some("child")
1562                    })
1563                    .and_then(|log_line| {
1564                        log_line.get("span_id")
1565                            .and_then(|v| v.as_str())
1566                            .map(|s| s.to_string())
1567                    })
1568                    .expect("Should find child span with span_id");
1569
1570                let grandchild_span_id = lines
1571                    .iter()
1572                    .find(|log_line| {
1573                        log_line.get("span_name")
1574                            .and_then(|v| v.as_str()) == Some("grandchild")
1575                    })
1576                    .and_then(|log_line| {
1577                        log_line.get("span_id")
1578                            .and_then(|v| v.as_str())
1579                            .map(|s| s.to_string())
1580                    })
1581                    .expect("Should find grandchild span with span_id");
1582
1583                // Verify span IDs are unique
1584                assert_ne!(parent_span_id, child_span_id, "Parent and child should have different span IDs");
1585                assert_ne!(child_span_id, grandchild_span_id, "Child and grandchild should have different span IDs");
1586                assert_ne!(parent_span_id, grandchild_span_id, "Parent and grandchild should have different span IDs");
1587
1588                // Verify parent span has no parent_id
1589                for log_line in &lines {
1590                    if let Some(span_name) = log_line.get("span_name")
1591                        && let Some(span_name_str) = span_name.as_str()
1592                        && span_name_str == "parent"
1593                    {
1594                        assert!(
1595                            log_line.get("parent_id").is_none(),
1596                            "Parent span should not have a parent_id"
1597                        );
1598                    }
1599                }
1600
1601                // Verify child span's parent_id is parent_span_id
1602                for log_line in &lines {
1603                    if let Some(span_name) = log_line.get("span_name")
1604                        && let Some(span_name_str) = span_name.as_str()
1605                        && span_name_str == "child"
1606                    {
1607                        let parent_id = log_line.get("parent_id")
1608                            .and_then(|v| v.as_str())
1609                            .expect("Child span should have a parent_id");
1610                        assert_eq!(
1611                            parent_id,
1612                            parent_span_id,
1613                            "Child's parent_id should match parent's span_id"
1614                        );
1615                    }
1616                }
1617
1618                // Verify grandchild span's parent_id is child_span_id
1619                for log_line in &lines {
1620                    if let Some(span_name) = log_line.get("span_name")
1621                        && let Some(span_name_str) = span_name.as_str()
1622                        && span_name_str == "grandchild"
1623                    {
1624                        let parent_id = log_line.get("parent_id")
1625                            .and_then(|v| v.as_str())
1626                            .expect("Grandchild span should have a parent_id");
1627                        assert_eq!(
1628                            parent_id,
1629                            child_span_id,
1630                            "Grandchild's parent_id should match child's span_id"
1631                        );
1632                    }
1633                }
1634
1635                // 4. Validate timestamp ordering - spans should log in execution order
1636                let parent_time = span_timestamps.get("parent")
1637                    .expect("Should have timestamp for parent span");
1638                let child_time = span_timestamps.get("child")
1639                    .expect("Should have timestamp for child span");
1640                let grandchild_time = span_timestamps.get("grandchild")
1641                    .expect("Should have timestamp for grandchild span");
1642
1643                // Parent logs first (or at same time), then child, then grandchild
1644                assert!(
1645                    parent_time <= child_time,
1646                    "Parent span should log before or at same time as child span (parent: {}, child: {})",
1647                    parent_time,
1648                    child_time
1649                );
1650                assert!(
1651                    child_time <= grandchild_time,
1652                    "Child span should log before or at same time as grandchild span (child: {}, grandchild: {})",
1653                    child_time,
1654                    grandchild_time
1655                );
1656
1657                Ok::<(), anyhow::Error>(())
1658            })(),
1659        )
1660        .await;
1661        Ok(())
1662    }
1663
1664    // Test functions at different log levels for filtering tests
1665    #[tracing::instrument(level = "debug", skip_all)]
1666    async fn debug_level_span() {
1667        tracing::debug!("inside debug span");
1668    }
1669
1670    #[tracing::instrument(level = "info", skip_all)]
1671    async fn info_level_span() {
1672        tracing::info!("inside info span");
1673    }
1674
1675    #[tracing::instrument(level = "warn", skip_all)]
1676    async fn warn_level_span() {
1677        tracing::warn!("inside warn span");
1678    }
1679
1680    // Span from a different target - should be FILTERED OUT at info level
1681    // because the filter is warn,dynamo_runtime::logging::tests=debug
1682    #[tracing::instrument(level = "info", target = "other_module", skip_all)]
1683    async fn other_target_info_span() {
1684        tracing::info!(target: "other_module", "inside other target span");
1685    }
1686
1687    /// Comprehensive test for span events covering:
1688    /// - SPAN_FIRST_ENTRY and SPAN_CLOSED event emission
1689    /// - Trace context (trace_id, span_id) in span events
1690    /// - Timing information in SPAN_CLOSED events
1691    /// - Level-based filtering (positive: allowed levels pass, negative: filtered levels blocked)
1692    /// - Target-based filtering (spans from allowed targets pass even at lower levels)
1693    ///
1694    /// This test runs in a subprocess to ensure logging is initialized with our specific
1695    /// filter settings (DYN_LOG=warn,dynamo_runtime::logging::tests=debug), avoiding
1696    /// interference from other tests that may have initialized logging first.
1697    #[test]
1698    fn test_span_events() {
1699        use std::process::Command;
1700
1701        // Run cargo test for the subprocess test with specific env vars
1702        let output = Command::new("cargo")
1703            .args([
1704                "test",
1705                "-p",
1706                "dynamo-runtime",
1707                "test_span_events_subprocess",
1708                "--",
1709                "--exact",
1710                "--nocapture",
1711            ])
1712            .env("DYN_LOGGING_JSONL", "1")
1713            .env("DYN_LOGGING_SPAN_EVENTS", "1")
1714            .env("DYN_LOG", "warn,dynamo_runtime::logging::tests=debug")
1715            .output()
1716            .expect("Failed to execute subprocess test");
1717
1718        // Print output for debugging
1719        if !output.status.success() {
1720            eprintln!(
1721                "=== STDOUT ===\n{}",
1722                String::from_utf8_lossy(&output.stdout)
1723            );
1724            eprintln!(
1725                "=== STDERR ===\n{}",
1726                String::from_utf8_lossy(&output.stderr)
1727            );
1728        }
1729
1730        assert!(
1731            output.status.success(),
1732            "Subprocess test failed with exit code: {:?}",
1733            output.status.code()
1734        );
1735    }
1736
1737    /// Subprocess test that performs the actual span event validation.
1738    /// This is called by test_span_events in a separate process with controlled env vars.
1739    #[tokio::test]
1740    async fn test_span_events_subprocess() -> Result<()> {
1741        // Skip if not running as subprocess (env vars not set)
1742        if std::env::var("DYN_LOGGING_SPAN_EVENTS").is_err() {
1743            return Ok(());
1744        }
1745
1746        let tmp_file = NamedTempFile::new().unwrap();
1747        let file_name = tmp_file.path().to_str().unwrap();
1748        let guard = StderrOverride::from_file(file_name)?;
1749        init();
1750
1751        // Run parent/child/grandchild spans (all INFO level by default)
1752        parent().await;
1753
1754        // Run spans at explicit levels from our test module
1755        debug_level_span().await;
1756        info_level_span().await;
1757        warn_level_span().await;
1758
1759        // Run span from different target (should be filtered out)
1760        other_target_info_span().await;
1761
1762        drop(guard);
1763
1764        let lines = load_log(file_name)?;
1765
1766        // Helper to check if a span event exists
1767        let has_span_event = |msg: &str, span_name: &str| {
1768            lines.iter().any(|log| {
1769                log.get("message").and_then(|v| v.as_str()) == Some(msg)
1770                    && log.get("span_name").and_then(|v| v.as_str()) == Some(span_name)
1771            })
1772        };
1773
1774        // Helper to get span events
1775        let get_span_events = |msg: &str| -> Vec<&serde_json::Value> {
1776            lines
1777                .iter()
1778                .filter(|log| log.get("message").and_then(|v| v.as_str()) == Some(msg))
1779                .collect()
1780        };
1781
1782        // === Test 1: SPAN_FIRST_ENTRY events have required fields ===
1783        let span_created_events = get_span_events("SPAN_FIRST_ENTRY");
1784        for event in &span_created_events {
1785            // Must have span_name
1786            assert!(
1787                event.get("span_name").is_some(),
1788                "SPAN_FIRST_ENTRY must have span_name"
1789            );
1790            // Must have valid trace_id (format check)
1791            let trace_id = event
1792                .get("trace_id")
1793                .and_then(|v| v.as_str())
1794                .expect("SPAN_FIRST_ENTRY must have trace_id");
1795            assert!(
1796                trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
1797                "SPAN_FIRST_ENTRY must have valid trace_id format"
1798            );
1799            // Must have valid span_id
1800            let span_id = event
1801                .get("span_id")
1802                .and_then(|v| v.as_str())
1803                .expect("SPAN_FIRST_ENTRY must have span_id");
1804            assert!(
1805                is_valid_span_id(span_id),
1806                "SPAN_FIRST_ENTRY must have valid span_id"
1807            );
1808        }
1809
1810        // === Test 2: SPAN_CLOSED events have timing info ===
1811        let span_closed_events = get_span_events("SPAN_CLOSED");
1812        for event in &span_closed_events {
1813            assert!(
1814                event.get("span_name").is_some(),
1815                "SPAN_CLOSED must have span_name"
1816            );
1817            assert!(
1818                event.get("time.busy_us").is_some()
1819                    || event.get("time.idle_us").is_some()
1820                    || event.get("time.duration_us").is_some(),
1821                "SPAN_CLOSED must have timing information"
1822            );
1823            // Must have valid trace_id
1824            let trace_id = event
1825                .get("trace_id")
1826                .and_then(|v| v.as_str())
1827                .expect("SPAN_CLOSED must have trace_id");
1828            assert!(
1829                trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
1830                "SPAN_CLOSED must have valid trace_id format"
1831            );
1832        }
1833
1834        // === Test 3: Target-based filtering (positive) ===
1835        // Spans from dynamo_runtime::logging::tests should pass at ALL levels
1836        // because the target is allowed at debug level
1837        assert!(
1838            has_span_event("SPAN_FIRST_ENTRY", "debug_level_span"),
1839            "DEBUG span from allowed target MUST pass (target=debug filter)"
1840        );
1841        assert!(
1842            has_span_event("SPAN_FIRST_ENTRY", "info_level_span"),
1843            "INFO span from allowed target MUST pass (target=debug filter)"
1844        );
1845        assert!(
1846            has_span_event("SPAN_FIRST_ENTRY", "warn_level_span"),
1847            "WARN span from allowed target MUST pass (target=debug filter)"
1848        );
1849
1850        // parent/child/grandchild are INFO level from allowed target - should pass
1851        assert!(
1852            has_span_event("SPAN_FIRST_ENTRY", "parent"),
1853            "parent span (INFO) from allowed target MUST pass"
1854        );
1855        assert!(
1856            has_span_event("SPAN_FIRST_ENTRY", "child"),
1857            "child span (INFO) from allowed target MUST pass"
1858        );
1859        assert!(
1860            has_span_event("SPAN_FIRST_ENTRY", "grandchild"),
1861            "grandchild span (INFO) from allowed target MUST pass"
1862        );
1863
1864        // === Test 4: Level-based filtering (negative) ===
1865        // Verify spans from OTHER targets at debug/info level are filtered out
1866        assert!(
1867            !has_span_event("SPAN_FIRST_ENTRY", "other_target_info_span"),
1868            "INFO span from non-allowed target (other_module) MUST be filtered out"
1869        );
1870
1871        // Also verify no spans from other targets appear at debug/info level
1872        for event in &span_created_events {
1873            let target = event.get("target").and_then(|v| v.as_str()).unwrap_or("");
1874            let level = event.get("level").and_then(|v| v.as_str()).unwrap_or("");
1875
1876            // If level is DEBUG or INFO, target must be our test module
1877            if level == "DEBUG" || level == "INFO" {
1878                assert!(
1879                    target.contains("dynamo_runtime::logging::tests"),
1880                    "DEBUG/INFO span must be from allowed target, got target={target}"
1881                );
1882            }
1883        }
1884
1885        Ok(())
1886    }
1887}