Skip to main content

dynamo_runtime/
logging.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Dynamo Distributed Logging Module.
5//!
6//! - Configuration loaded from:
7//!   1. Environment variables (highest priority).
8//!   2. Optional TOML file pointed to by the `DYN_LOGGING_CONFIG_PATH` environment variable.
9//!   3. `/opt/dynamo/etc/logging.toml`.
10//!
11//! Logging can take two forms: `READABLE` or `JSONL`. The default is `READABLE`. `JSONL`
12//! can be enabled by setting the `DYN_LOGGING_JSONL` environment variable to `1`.
13//!
14//! To use local timezone for logging timestamps, set the `DYN_LOG_USE_LOCAL_TZ` environment variable to `1`.
15//!
//! Filters can be configured using the `DYN_LOG` environment variable or by setting the
//! `log_filters` table in the TOML configuration file. Filters are comma-separated key-value
//! pairs where the key is the crate or module name and the value is the log level. The default
//! log level is `info`.
19//!
20//! Example:
21//! ```toml
22//! log_level = "error"
23//!
24//! [log_filters]
25//! "test_logging" = "info"
26//! "test_logging::api" = "trace"
27//! ```
28
29use std::collections::{BTreeMap, HashMap};
30use std::sync::Once;
31
32use figment::{
33    Figment,
34    providers::{Format, Serialized, Toml},
35};
36use serde::{Deserialize, Serialize};
37use tracing::level_filters::LevelFilter;
38use tracing::{Event, Subscriber};
39use tracing_subscriber::EnvFilter;
40use tracing_subscriber::fmt::time::FormatTime;
41use tracing_subscriber::fmt::time::LocalTime;
42use tracing_subscriber::fmt::time::SystemTime;
43use tracing_subscriber::fmt::time::UtcTime;
44use tracing_subscriber::fmt::{FmtContext, FormatFields};
45use tracing_subscriber::fmt::{FormattedFields, format::Writer};
46use tracing_subscriber::prelude::*;
47use tracing_subscriber::registry::LookupSpan;
48use tracing_subscriber::{filter::Directive, fmt};
49
50use crate::config::{disable_ansi_logging, jsonl_logging_enabled, span_events_enabled};
51use async_nats::{HeaderMap, HeaderValue};
52use axum::extract::FromRequestParts;
53use axum::http;
54use axum::http::Request;
55use axum::http::request::Parts;
56use serde_json::Value;
57use std::convert::Infallible;
58use std::time::Instant;
59use tower_http::trace::{DefaultMakeSpan, TraceLayer};
60use tracing::Id;
61use tracing::Span;
62use tracing::field::Field;
63use tracing::span;
64use tracing_subscriber::Layer;
65use tracing_subscriber::Registry;
66use tracing_subscriber::field::Visit;
67use tracing_subscriber::fmt::format::FmtSpan;
68use tracing_subscriber::layer::Context;
69use tracing_subscriber::registry::SpanData;
70use uuid::Uuid;
71
72use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
73use opentelemetry::trace::TraceContextExt;
74use opentelemetry::{global, trace::Tracer};
75use opentelemetry_otlp::WithExportConfig;
76
77use opentelemetry::trace::TracerProvider as _;
78use opentelemetry::{Key, KeyValue};
79use opentelemetry_sdk::Resource;
80use opentelemetry_sdk::trace::SdkTracerProvider;
81use tracing::error;
82use tracing_subscriber::layer::SubscriberExt;
83// use tracing_subscriber::Registry;
84
85use std::time::Duration;
86use tracing::{info, instrument};
87use tracing_opentelemetry::OpenTelemetrySpanExt;
88use tracing_subscriber::util::SubscriberInitExt;
89
90use crate::config::environment_names::logging as env_logging;
91
92use dynamo_config::env_is_truthy;
93
/// Default log level; used as the `log_level` of `LoggingConfig::default()`.
const DEFAULT_FILTER_LEVEL: &str = "info";

/// Default OTLP endpoint
const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";

/// Default service name; returned by `get_service_name` when the service-name
/// environment variable cannot be read.
const DEFAULT_OTEL_SERVICE_NAME: &str = "dynamo";

/// Once instance to ensure the logger is only initialized once
static INIT: Once = Once::new();
105
106#[derive(Serialize, Deserialize, Debug)]
107struct LoggingConfig {
108    log_level: String,
109    log_filters: HashMap<String, String>,
110}
111impl Default for LoggingConfig {
112    fn default() -> Self {
113        LoggingConfig {
114            log_level: DEFAULT_FILTER_LEVEL.to_string(),
115            log_filters: HashMap::from([
116                ("h2".to_string(), "error".to_string()),
117                ("tower".to_string(), "error".to_string()),
118                ("hyper_util".to_string(), "error".to_string()),
119                ("neli".to_string(), "error".to_string()),
120                ("async_nats".to_string(), "error".to_string()),
121                ("rustls".to_string(), "error".to_string()),
122                ("tokenizers".to_string(), "error".to_string()),
123                ("axum".to_string(), "error".to_string()),
124                ("tonic".to_string(), "error".to_string()),
125                ("hf_hub".to_string(), "error".to_string()),
126                ("opentelemetry".to_string(), "error".to_string()),
127                ("opentelemetry-otlp".to_string(), "error".to_string()),
128                ("opentelemetry_sdk".to_string(), "error".to_string()),
129            ]),
130        }
131    }
132}
133
134/// Check if OTLP trace exporting is enabled (accepts: "1", "true", "on", "yes" - case insensitive)
135fn otlp_exporter_enabled() -> bool {
136    env_is_truthy(env_logging::otlp::OTEL_EXPORT_ENABLED)
137}
138
139/// Get the service name from environment or use default
140fn get_service_name() -> String {
141    std::env::var(env_logging::otlp::OTEL_SERVICE_NAME)
142        .unwrap_or_else(|_| DEFAULT_OTEL_SERVICE_NAME.to_string())
143}
144
/// Validate a given trace ID according to W3C Trace Context specifications.
///
/// A valid trace ID is exactly 32 lowercase hexadecimal characters (`0-9`, `a-f`)
/// and is not all zeros. Uppercase hex digits and the all-zero ID are rejected,
/// because the W3C `traceparent` format forbids both.
///
/// Returns `true` only when `trace_id` satisfies all of the above.
pub fn is_valid_trace_id(trace_id: &str) -> bool {
    // Every accepted character is a single ASCII byte, so the byte length is
    // also the character count.
    trace_id.len() == 32
        && trace_id
            .bytes()
            .all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
        && trace_id.bytes().any(|b| b != b'0')
}
150
/// Validate a given span ID according to W3C Trace Context specifications.
///
/// A valid span ID is exactly 16 lowercase hexadecimal characters (`0-9`, `a-f`)
/// and is not all zeros. Uppercase hex digits and the all-zero ID are rejected,
/// because the W3C `traceparent` format forbids both.
///
/// Returns `true` only when `span_id` satisfies all of the above.
pub fn is_valid_span_id(span_id: &str) -> bool {
    // Every accepted character is a single ASCII byte, so the byte length is
    // also the character count.
    span_id.len() == 16
        && span_id
            .bytes()
            .all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
        && span_id.bytes().any(|b| b != b'0')
}
156
/// Tracing layer that stores a [`DistributedTraceContext`] in the extensions of
/// each span it sees (see its `Layer` implementation in this file).
pub struct DistributedTraceIdLayer;
158
/// Trace identity stored in a span's extensions by [`DistributedTraceIdLayer`].
///
/// When serialized, optional fields that are `None` are omitted and the timing
/// fields (`start`, `end`) are never included.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedTraceContext {
    /// Trace ID this span belongs to.
    pub trace_id: String,
    /// ID of this span.
    pub span_id: String,
    /// ID of the parent span, when one is known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<String>,
    /// `tracestate` value carried alongside the trace, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tracestate: Option<String>,
    /// Set when the context is created on first span entry.
    #[serde(skip)]
    start: Option<Instant>,
    /// Set when the span is closed.
    #[serde(skip)]
    end: Option<Instant>,
    /// Value of the `x_request_id` span field, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_request_id: Option<String>,
    /// Value of the `x_dynamo_request_id` span field, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_dynamo_request_id: Option<String>,
}
176
/// Pending context data collected in on_new_span, to be finalized in on_enter
///
/// Unlike [`DistributedTraceContext`], `trace_id` and `span_id` are optional here:
/// they may still be missing after `on_new_span` and are filled in during `on_enter`.
#[derive(Debug, Clone)]
struct PendingDistributedTraceContext {
    trace_id: Option<String>,
    span_id: Option<String>,
    parent_id: Option<String>,
    tracestate: Option<String>,
    x_request_id: Option<String>,
    x_dynamo_request_id: Option<String>,
}
187
/// Macro to emit a tracing event at a dynamic level with a custom target.
///
/// - `$level`: an expression matched against `&tracing::Level::…` patterns, i.e. a
///   reference to a `tracing::Level`.
/// - `$target`: the event target passed to `tracing::event!`.
/// - remaining tokens: forwarded unchanged as the fields/message of `tracing::event!`.
macro_rules! emit_at_level {
    ($level:expr, target: $target:expr, $($arg:tt)*) => {
        // tracing::event! requires a compile-time constant level, so we must match
        // on the runtime level and use a literal Level constant in each arm.
        // See: https://github.com/tokio-rs/tracing/issues/2730
        match $level {
            &tracing::Level::ERROR => tracing::event!(target: $target, tracing::Level::ERROR, $($arg)*),
            &tracing::Level::WARN => tracing::event!(target: $target, tracing::Level::WARN, $($arg)*),
            &tracing::Level::INFO => tracing::event!(target: $target, tracing::Level::INFO, $($arg)*),
            &tracing::Level::DEBUG => tracing::event!(target: $target, tracing::Level::DEBUG, $($arg)*),
            &tracing::Level::TRACE => tracing::event!(target: $target, tracing::Level::TRACE, $($arg)*),
        }
    };
}
203
204impl DistributedTraceContext {
205    /// Create a traceparent string from the context
206    pub fn create_traceparent(&self) -> String {
207        format!("00-{}-{}-01", self.trace_id, self.span_id)
208    }
209}
210
211/// Parse a traceparent string into its components
212pub fn parse_traceparent(traceparent: &str) -> (Option<String>, Option<String>) {
213    let pieces: Vec<_> = traceparent.split('-').collect();
214    if pieces.len() != 4 {
215        return (None, None);
216    }
217    let trace_id = pieces[1];
218    let parent_id = pieces[2];
219
220    if !is_valid_trace_id(trace_id) || !is_valid_span_id(parent_id) {
221        return (None, None);
222    }
223
224    (Some(trace_id.to_string()), Some(parent_id.to_string()))
225}
226
/// Trace-related values read from request headers by [`TraceParent::from_headers`].
/// Every field is `None` when the corresponding header is absent or rejected.
#[derive(Debug, Clone, Default)]
pub struct TraceParent {
    /// Trace ID parsed from the `traceparent` header.
    pub trace_id: Option<String>,
    /// Parent span ID parsed from the `traceparent` header.
    pub parent_id: Option<String>,
    /// Raw `tracestate` header value.
    pub tracestate: Option<String>,
    /// Raw `x-request-id` header value.
    pub x_request_id: Option<String>,
    /// `x-dynamo-request-id` header value; kept only when it parses as a UUID.
    pub x_dynamo_request_id: Option<String>,
}
235
/// Minimal read-only view over a header collection, so that
/// [`TraceParent::from_headers`] can work with different header map types.
pub trait GenericHeaders {
    /// Returns the value stored under `key` as a string slice, or `None` when
    /// no usable value is available.
    fn get(&self, key: &str) -> Option<&str>;
}
239
240impl GenericHeaders for async_nats::HeaderMap {
241    fn get(&self, key: &str) -> Option<&str> {
242        async_nats::HeaderMap::get(self, key).map(|value| value.as_str())
243    }
244}
245
246impl GenericHeaders for http::HeaderMap {
247    fn get(&self, key: &str) -> Option<&str> {
248        http::HeaderMap::get(self, key).and_then(|value| value.to_str().ok())
249    }
250}
251
252impl TraceParent {
253    pub fn from_headers<H: GenericHeaders>(headers: &H) -> TraceParent {
254        let mut trace_id = None;
255        let mut parent_id = None;
256        let mut tracestate = None;
257        let mut x_request_id = None;
258        let mut x_dynamo_request_id = None;
259
260        if let Some(header_value) = headers.get("traceparent") {
261            (trace_id, parent_id) = parse_traceparent(header_value);
262        }
263
264        if let Some(header_value) = headers.get("x-request-id") {
265            x_request_id = Some(header_value.to_string());
266        }
267
268        if let Some(header_value) = headers.get("tracestate") {
269            tracestate = Some(header_value.to_string());
270        }
271
272        if let Some(header_value) = headers.get("x-dynamo-request-id") {
273            x_dynamo_request_id = Some(header_value.to_string());
274        }
275
276        // Validate UUID format
277        let x_dynamo_request_id =
278            x_dynamo_request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
279        TraceParent {
280            trace_id,
281            parent_id,
282            tracestate,
283            x_request_id,
284            x_dynamo_request_id,
285        }
286    }
287}
288
289// Takes Axum request and returning a span
290pub fn make_request_span<B>(req: &Request<B>) -> Span {
291    let method = req.method();
292    let uri = req.uri();
293    let version = format!("{:?}", req.version());
294    let trace_parent = TraceParent::from_headers(req.headers());
295
296    let otel_context = extract_otel_context_from_http_headers(req.headers());
297
298    let span = tracing::info_span!(
299        "http-request",
300        method = %method,
301        uri = %uri,
302        version = %version,
303        trace_id = trace_parent.trace_id,
304        parent_id = trace_parent.parent_id,
305        x_request_id = trace_parent.x_request_id,
306        x_dynamo_request_id = trace_parent.x_dynamo_request_id,
307    );
308
309    if let Some(context) = otel_context {
310        let _ = span.set_parent(context);
311    }
312
313    span
314}
315
316/// Extract OpenTelemetry context from HTTP headers for distributed tracing
317fn extract_otel_context_from_http_headers(
318    headers: &http::HeaderMap,
319) -> Option<opentelemetry::Context> {
320    let traceparent_value = headers.get("traceparent")?.to_str().ok()?;
321
322    struct HttpHeaderExtractor<'a>(&'a http::HeaderMap);
323
324    impl<'a> Extractor for HttpHeaderExtractor<'a> {
325        fn get(&self, key: &str) -> Option<&str> {
326            self.0.get(key).and_then(|v| v.to_str().ok())
327        }
328
329        fn keys(&self) -> Vec<&str> {
330            vec!["traceparent", "tracestate"]
331                .into_iter()
332                .filter(|&key| self.0.get(key).is_some())
333                .collect()
334        }
335    }
336
337    // Early return if traceparent is empty
338    if traceparent_value.is_empty() {
339        return None;
340    }
341
342    let extractor = HttpHeaderExtractor(headers);
343    let otel_context = TRACE_PROPAGATOR.extract(&extractor);
344
345    if otel_context.span().span_context().is_valid() {
346        Some(otel_context)
347    } else {
348        None
349    }
350}
351
352/// Create a handle_payload span from NATS headers with component context
353pub fn make_handle_payload_span(
354    headers: &async_nats::HeaderMap,
355    component: &str,
356    endpoint: &str,
357    namespace: &str,
358    instance_id: u64,
359) -> Span {
360    let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_nats_headers(headers);
361    let trace_parent = TraceParent::from_headers(headers);
362
363    if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
364        let span = tracing::info_span!(
365            "handle_payload",
366            trace_id = trace_id.as_str(),
367            parent_id = parent_id.as_str(),
368            x_request_id = trace_parent.x_request_id,
369            x_dynamo_request_id = trace_parent.x_dynamo_request_id,
370            tracestate = trace_parent.tracestate,
371            component = component,
372            endpoint = endpoint,
373            namespace = namespace,
374            instance_id = instance_id,
375        );
376
377        if let Some(context) = otel_context {
378            let _ = span.set_parent(context);
379        }
380        span
381    } else {
382        tracing::info_span!(
383            "handle_payload",
384            x_request_id = trace_parent.x_request_id,
385            x_dynamo_request_id = trace_parent.x_dynamo_request_id,
386            tracestate = trace_parent.tracestate,
387            component = component,
388            endpoint = endpoint,
389            namespace = namespace,
390            instance_id = instance_id,
391        )
392    }
393}
394
395/// Create a handle_payload span from TCP/HashMap headers with component context
396pub fn make_handle_payload_span_from_tcp_headers(
397    headers: &std::collections::HashMap<String, String>,
398    component: &str,
399    endpoint: &str,
400    namespace: &str,
401    instance_id: u64,
402) -> Span {
403    let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_tcp_headers(headers);
404    let x_request_id = headers.get("x-request-id").cloned();
405    let x_dynamo_request_id = headers.get("x-dynamo-request-id").cloned();
406    let tracestate = headers.get("tracestate").cloned();
407
408    if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
409        let span = tracing::info_span!(
410            "handle_payload",
411            trace_id = trace_id.as_str(),
412            parent_id = parent_id.as_str(),
413            x_request_id = x_request_id,
414            x_dynamo_request_id = x_dynamo_request_id,
415            tracestate = tracestate,
416            component = component,
417            endpoint = endpoint,
418            namespace = namespace,
419            instance_id = instance_id,
420        );
421
422        if let Some(context) = otel_context {
423            let _ = span.set_parent(context);
424        }
425        span
426    } else {
427        tracing::info_span!(
428            "handle_payload",
429            x_request_id = x_request_id,
430            x_dynamo_request_id = x_dynamo_request_id,
431            tracestate = tracestate,
432            component = component,
433            endpoint = endpoint,
434            namespace = namespace,
435            instance_id = instance_id,
436        )
437    }
438}
439
440/// Extract OpenTelemetry trace context from TCP/HashMap headers for distributed tracing
441fn extract_otel_context_from_tcp_headers(
442    headers: &std::collections::HashMap<String, String>,
443) -> (
444    Option<opentelemetry::Context>,
445    Option<String>,
446    Option<String>,
447) {
448    let traceparent_value = match headers.get("traceparent") {
449        Some(value) => value.as_str(),
450        None => return (None, None, None),
451    };
452
453    let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
454
455    struct TcpHeaderExtractor<'a>(&'a std::collections::HashMap<String, String>);
456
457    impl<'a> Extractor for TcpHeaderExtractor<'a> {
458        fn get(&self, key: &str) -> Option<&str> {
459            self.0.get(key).map(|s| s.as_str())
460        }
461
462        fn keys(&self) -> Vec<&str> {
463            vec!["traceparent", "tracestate"]
464                .into_iter()
465                .filter(|&key| self.0.get(key).is_some())
466                .collect()
467        }
468    }
469
470    let extractor = TcpHeaderExtractor(headers);
471    let otel_context = TRACE_PROPAGATOR.extract(&extractor);
472
473    let context_with_trace = if otel_context.span().span_context().is_valid() {
474        Some(otel_context)
475    } else {
476        None
477    };
478
479    (context_with_trace, trace_id, parent_span_id)
480}
481
482/// Extract OpenTelemetry trace context from NATS headers for distributed tracing
483pub fn extract_otel_context_from_nats_headers(
484    headers: &async_nats::HeaderMap,
485) -> (
486    Option<opentelemetry::Context>,
487    Option<String>,
488    Option<String>,
489) {
490    let traceparent_value = match headers.get("traceparent") {
491        Some(value) => value.as_str(),
492        None => return (None, None, None),
493    };
494
495    let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
496
497    struct NatsHeaderExtractor<'a>(&'a async_nats::HeaderMap);
498
499    impl<'a> Extractor for NatsHeaderExtractor<'a> {
500        fn get(&self, key: &str) -> Option<&str> {
501            self.0.get(key).map(|value| value.as_str())
502        }
503
504        fn keys(&self) -> Vec<&str> {
505            vec!["traceparent", "tracestate"]
506                .into_iter()
507                .filter(|&key| self.0.get(key).is_some())
508                .collect()
509        }
510    }
511
512    let extractor = NatsHeaderExtractor(headers);
513    let otel_context = TRACE_PROPAGATOR.extract(&extractor);
514
515    let context_with_trace = if otel_context.span().span_context().is_valid() {
516        Some(otel_context)
517    } else {
518        None
519    };
520
521    (context_with_trace, trace_id, parent_span_id)
522}
523
524/// Inject OpenTelemetry trace context into NATS headers using W3C Trace Context propagation
525pub fn inject_otel_context_into_nats_headers(
526    headers: &mut async_nats::HeaderMap,
527    context: Option<opentelemetry::Context>,
528) {
529    let otel_context = context.unwrap_or_else(|| Span::current().context());
530
531    struct NatsHeaderInjector<'a>(&'a mut async_nats::HeaderMap);
532
533    impl<'a> Injector for NatsHeaderInjector<'a> {
534        fn set(&mut self, key: &str, value: String) {
535            self.0.insert(key, value);
536        }
537    }
538
539    let mut injector = NatsHeaderInjector(headers);
540    TRACE_PROPAGATOR.inject_context(&otel_context, &mut injector);
541}
542
/// Inject trace context from current span into NATS headers
///
/// Convenience wrapper around [`inject_otel_context_into_nats_headers`] that passes
/// no explicit context, so the current span's context is used.
pub fn inject_current_trace_into_nats_headers(headers: &mut async_nats::HeaderMap) {
    inject_otel_context_into_nats_headers(headers, None);
}
547
548// Inject trace headers into a generic HashMap for HTTP/TCP transports
549pub fn inject_trace_headers_into_map(headers: &mut std::collections::HashMap<String, String>) {
550    if let Some(trace_context) = get_distributed_tracing_context() {
551        // Inject W3C traceparent header
552        headers.insert(
553            "traceparent".to_string(),
554            trace_context.create_traceparent(),
555        );
556
557        // Inject optional tracestate
558        if let Some(tracestate) = trace_context.tracestate {
559            headers.insert("tracestate".to_string(), tracestate);
560        }
561
562        // Inject custom request IDs
563        if let Some(x_request_id) = trace_context.x_request_id {
564            headers.insert("x-request-id".to_string(), x_request_id);
565        }
566        if let Some(x_dynamo_request_id) = trace_context.x_dynamo_request_id {
567            headers.insert("x-dynamo-request-id".to_string(), x_dynamo_request_id);
568        }
569    }
570}
571
572/// Create a client_request span linked to the parent trace context
573pub fn make_client_request_span(
574    operation: &str,
575    request_id: &str,
576    trace_context: Option<&DistributedTraceContext>,
577    instance_id: Option<&str>,
578) -> Span {
579    if let Some(ctx) = trace_context {
580        let mut headers = async_nats::HeaderMap::new();
581        headers.insert("traceparent", ctx.create_traceparent());
582
583        if let Some(ref tracestate) = ctx.tracestate {
584            headers.insert("tracestate", tracestate.as_str());
585        }
586
587        let (otel_context, _extracted_trace_id, _extracted_parent_span_id) =
588            extract_otel_context_from_nats_headers(&headers);
589
590        let span = if let Some(inst_id) = instance_id {
591            tracing::info_span!(
592                "client_request",
593                operation = operation,
594                request_id = request_id,
595                instance_id = inst_id,
596                trace_id = ctx.trace_id.as_str(),
597                parent_id = ctx.span_id.as_str(),
598                x_request_id = ctx.x_request_id.as_deref(),
599                x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
600                // tracestate = ctx.tracestate.as_deref(),
601            )
602        } else {
603            tracing::info_span!(
604                "client_request",
605                operation = operation,
606                request_id = request_id,
607                trace_id = ctx.trace_id.as_str(),
608                parent_id = ctx.span_id.as_str(),
609                x_request_id = ctx.x_request_id.as_deref(),
610                x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
611                // tracestate = ctx.tracestate.as_deref(),
612            )
613        };
614
615        if let Some(context) = otel_context {
616            let _ = span.set_parent(context);
617        }
618
619        span
620    } else if let Some(inst_id) = instance_id {
621        tracing::info_span!(
622            "client_request",
623            operation = operation,
624            request_id = request_id,
625            instance_id = inst_id,
626        )
627    } else {
628        tracing::info_span!(
629            "client_request",
630            operation = operation,
631            request_id = request_id,
632        )
633    }
634}
635
/// Field visitor that collects recorded span/event fields into a map of
/// field name to string value.
#[derive(Debug, Default)]
pub struct FieldVisitor {
    /// Collected fields, keyed by field name.
    pub fields: HashMap<String, String>,
}
640
641impl Visit for FieldVisitor {
642    fn record_str(&mut self, field: &Field, value: &str) {
643        self.fields
644            .insert(field.name().to_string(), value.to_string());
645    }
646
647    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
648        self.fields
649            .insert(field.name().to_string(), format!("{:?}", value).to_string());
650    }
651}
652
653impl<S> Layer<S> for DistributedTraceIdLayer
654where
655    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
656{
657    // Capture close span time
658    // Currently not used but added for future use in timing
659    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
660        if let Some(span) = ctx.span(&id) {
661            let mut extensions = span.extensions_mut();
662            if let Some(distributed_tracing_context) =
663                extensions.get_mut::<DistributedTraceContext>()
664            {
665                distributed_tracing_context.end = Some(Instant::now());
666            }
667        }
668    }
669
670    // Collects span attributes and metadata in on_new_span
671    // Final initialization deferred to on_enter when OtelData is available
672    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
673        if let Some(span) = ctx.span(id) {
674            let mut trace_id: Option<String> = None;
675            let mut parent_id: Option<String> = None;
676            let mut span_id: Option<String> = None;
677            let mut x_request_id: Option<String> = None;
678            let mut x_dynamo_request_id: Option<String> = None;
679            let mut tracestate: Option<String> = None;
680            let mut visitor = FieldVisitor::default();
681            attrs.record(&mut visitor);
682
683            // Extract trace_id from span attributes
684            if let Some(trace_id_input) = visitor.fields.get("trace_id") {
685                if !is_valid_trace_id(trace_id_input) {
686                    tracing::trace!("trace id  '{}' is not valid! Ignoring.", trace_id_input);
687                } else {
688                    trace_id = Some(trace_id_input.to_string());
689                }
690            }
691
692            // Extract span_id from span attributes
693            if let Some(span_id_input) = visitor.fields.get("span_id") {
694                if !is_valid_span_id(span_id_input) {
695                    tracing::trace!("span id  '{}' is not valid! Ignoring.", span_id_input);
696                } else {
697                    span_id = Some(span_id_input.to_string());
698                }
699            }
700
701            // Extract parent_id from span attributes
702            if let Some(parent_id_input) = visitor.fields.get("parent_id") {
703                if !is_valid_span_id(parent_id_input) {
704                    tracing::trace!("parent id  '{}' is not valid! Ignoring.", parent_id_input);
705                } else {
706                    parent_id = Some(parent_id_input.to_string());
707                }
708            }
709
710            // Extract tracestate
711            if let Some(tracestate_input) = visitor.fields.get("tracestate") {
712                tracestate = Some(tracestate_input.to_string());
713            }
714
715            // Extract x_request_id
716            if let Some(x_request_id_input) = visitor.fields.get("x_request_id") {
717                x_request_id = Some(x_request_id_input.to_string());
718            }
719
720            // Extract x_dynamo_request_id
721            if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
722                x_dynamo_request_id = Some(x_request_id_input.to_string());
723            }
724
725            // Inherit trace context from parent span if available
726            if parent_id.is_none()
727                && let Some(parent_span_id) = ctx.current_span().id()
728                && let Some(parent_span) = ctx.span(parent_span_id)
729            {
730                let parent_ext = parent_span.extensions();
731                if let Some(parent_tracing_context) = parent_ext.get::<DistributedTraceContext>() {
732                    trace_id = Some(parent_tracing_context.trace_id.clone());
733                    parent_id = Some(parent_tracing_context.span_id.clone());
734                    tracestate = parent_tracing_context.tracestate.clone();
735                }
736            }
737
738            // Validate consistency
739            if (parent_id.is_some() || span_id.is_some()) && trace_id.is_none() {
740                tracing::error!("parent id or span id are set but trace id is not set!");
741                // Clear inconsistent IDs to maintain trace integrity
742                parent_id = None;
743                span_id = None;
744            }
745
746            // Store pending context - will be finalized in on_enter
747            let mut extensions = span.extensions_mut();
748            extensions.insert(PendingDistributedTraceContext {
749                trace_id,
750                span_id,
751                parent_id,
752                tracestate,
753                x_request_id,
754                x_dynamo_request_id,
755            });
756        }
757    }
758
759    // Finalizes the DistributedTraceContext when span is entered
760    // At this point, OtelData should have valid trace_id and span_id
761    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
762        if let Some(span) = ctx.span(id) {
763            // Check if already initialized (e.g., span re-entered)
764            {
765                let extensions = span.extensions();
766                if extensions.get::<DistributedTraceContext>().is_some() {
767                    return;
768                }
769            }
770
771            // Get the pending context and extract OtelData IDs
772            let mut extensions = span.extensions_mut();
773            let pending = match extensions.remove::<PendingDistributedTraceContext>() {
774                Some(p) => p,
775                None => {
776                    // This shouldn't happen - on_new_span should have created it
777                    tracing::error!("PendingDistributedTraceContext not found in on_enter");
778                    return;
779                }
780            };
781
782            let mut trace_id = pending.trace_id;
783            let mut span_id = pending.span_id;
784            let parent_id = pending.parent_id;
785            let tracestate = pending.tracestate;
786            let x_request_id = pending.x_request_id;
787            let x_dynamo_request_id = pending.x_dynamo_request_id;
788
789            // Try to extract from OtelData if not already set
790            // Need to drop extensions_mut to get immutable borrow for OtelData
791            drop(extensions);
792
793            if trace_id.is_none() || span_id.is_none() {
794                let extensions = span.extensions();
795                if let Some(otel_data) = extensions.get::<tracing_opentelemetry::OtelData>() {
796                    // Extract trace_id from OTEL data if not already set
797                    if trace_id.is_none()
798                        && let Some(otel_trace_id) = otel_data.trace_id()
799                    {
800                        let trace_id_str = format!("{}", otel_trace_id);
801                        if is_valid_trace_id(&trace_id_str) {
802                            trace_id = Some(trace_id_str);
803                        }
804                    }
805
806                    // Extract span_id from OTEL data if not already set
807                    if span_id.is_none()
808                        && let Some(otel_span_id) = otel_data.span_id()
809                    {
810                        let span_id_str = format!("{}", otel_span_id);
811                        if is_valid_span_id(&span_id_str) {
812                            span_id = Some(span_id_str);
813                        }
814                    }
815                }
816            }
817
818            // Panic if we still don't have required IDs
819            if trace_id.is_none() {
820                panic!(
821                    "trace_id is not set in on_enter - OtelData may not be properly initialized"
822                );
823            }
824
825            if span_id.is_none() {
826                panic!("span_id is not set in on_enter - OtelData may not be properly initialized");
827            }
828
829            let span_level = span.metadata().level();
830            let mut extensions = span.extensions_mut();
831            extensions.insert(DistributedTraceContext {
832                trace_id: trace_id.expect("Trace ID must be set"),
833                span_id: span_id.expect("Span ID must be set"),
834                parent_id,
835                tracestate,
836                start: Some(Instant::now()),
837                end: None,
838                x_request_id,
839                x_dynamo_request_id,
840            });
841
842            drop(extensions);
843
844            // Emit SPAN_FIRST_ENTRY event. This only runs if the span passed the layer's filter
845            // (on_enter is not called for filtered-out spans), so no additional check needed.
846            if span_events_enabled() {
847                emit_at_level!(span_level, target: "span_event", message = "SPAN_FIRST_ENTRY");
848            }
849        }
850    }
851}
852
853// Enables functions to retreive their current
854// context for adding to distributed headers
855pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
856    Span::current()
857        .with_subscriber(|(id, subscriber)| {
858            subscriber
859                .downcast_ref::<Registry>()
860                .and_then(|registry| registry.span_data(id))
861                .and_then(|span_data| {
862                    let extensions = span_data.extensions();
863                    extensions.get::<DistributedTraceContext>().cloned()
864                })
865        })
866        .flatten()
867}
868
869/// Initialize the logger - must be called when Tokio runtime is available
870pub fn init() {
871    INIT.call_once(|| {
872        if let Err(e) = setup_logging() {
873            eprintln!("Failed to initialize logging: {}", e);
874            std::process::exit(1);
875        }
876    });
877}
878
/// Installs the global subscriber for builds with the `tokio-console` feature.
///
/// Two layers are registered:
/// - a compact, human-readable `fmt` layer writing to stderr, filtered by the
///   configured log filters;
/// - a `console_subscriber` layer that only sees the `runtime` and `tokio`
///   targets at `TRACE` (everything else is limited to `ERROR` for that layer).
///
/// The console server listens on `0.0.0.0` at the default console port.
///
/// Returns `Result` so that the signature matches the non-console variant;
/// `init()` inspects the returned `Err` and would not compile against a
/// unit-returning function. This variant itself never returns an error.
#[cfg(feature = "tokio-console")]
fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
    let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
        .with_default_env()
        .server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
        .spawn();
    let tokio_console_target = tracing_subscriber::filter::Targets::new()
        .with_default(LevelFilter::ERROR)
        .with_target("runtime", LevelFilter::TRACE)
        .with_target("tokio", LevelFilter::TRACE);
    let l = fmt::layer()
        .with_ansi(!disable_ansi_logging())
        .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
        .with_writer(std::io::stderr)
        .with_filter(filters(load_config()));
    tracing_subscriber::registry()
        .with(l)
        .with(tokio_console_layer.with_filter(tokio_console_target))
        .init();

    Ok(())
}
899
900#[cfg(not(feature = "tokio-console"))]
901fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
902    let fmt_filter_layer = filters(load_config());
903    let trace_filter_layer = filters(load_config());
904    let otel_filter_layer = filters(load_config());
905
906    if jsonl_logging_enabled() {
907        let span_events = if span_events_enabled() {
908            FmtSpan::CLOSE
909        } else {
910            FmtSpan::NONE
911        };
912        let l = fmt::layer()
913            .with_ansi(false)
914            .with_span_events(span_events)
915            .event_format(CustomJsonFormatter::new())
916            .with_writer(std::io::stderr)
917            .with_filter(fmt_filter_layer);
918
919        // Create OpenTelemetry tracer - conditionally export to OTLP based on env var
920        let service_name = get_service_name();
921
922        // Build tracer provider - with or without OTLP export
923        let (tracer_provider, endpoint_opt) = if otlp_exporter_enabled() {
924            // Export enabled: create OTLP exporter with batch processor
925            let endpoint = std::env::var(env_logging::otlp::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
926                .unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string());
927
928            // Initialize OTLP exporter using gRPC (Tonic)
929            let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
930                .with_tonic()
931                .with_endpoint(&endpoint)
932                .build()?;
933
934            // Create tracer provider with batch exporter and service name
935            let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
936                .with_batch_exporter(otlp_exporter)
937                .with_resource(
938                    opentelemetry_sdk::Resource::builder_empty()
939                        .with_service_name(service_name.clone())
940                        .build(),
941                )
942                .build();
943
944            (provider, Some(endpoint))
945        } else {
946            // No export - traces generated locally only (for logging/trace IDs)
947            let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
948                .with_resource(
949                    opentelemetry_sdk::Resource::builder_empty()
950                        .with_service_name(service_name.clone())
951                        .build(),
952                )
953                .build();
954
955            (provider, None)
956        };
957
958        // Get a tracer from the provider
959        let tracer = tracer_provider.tracer(service_name.clone());
960
961        tracing_subscriber::registry()
962            .with(
963                tracing_opentelemetry::layer()
964                    .with_tracer(tracer)
965                    .with_filter(otel_filter_layer),
966            )
967            .with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
968            .with(l)
969            .init();
970
971        // Log initialization status after subscriber is ready
972        if let Some(endpoint) = endpoint_opt {
973            tracing::info!(
974                endpoint = %endpoint,
975                service = %service_name,
976                "OpenTelemetry OTLP export enabled"
977            );
978        } else {
979            tracing::info!(
980                service = %service_name,
981                "OpenTelemetry OTLP export disabled, traces local only"
982            );
983        }
984    } else {
985        let l = fmt::layer()
986            .with_ansi(!disable_ansi_logging())
987            .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
988            .with_writer(std::io::stderr)
989            .with_filter(fmt_filter_layer);
990
991        tracing_subscriber::registry().with(l).init();
992    }
993
994    Ok(())
995}
996
997fn filters(config: LoggingConfig) -> EnvFilter {
998    let mut filter_layer = EnvFilter::builder()
999        .with_default_directive(config.log_level.parse().unwrap())
1000        .with_env_var(env_logging::DYN_LOG)
1001        .from_env_lossy();
1002
1003    for (module, level) in config.log_filters {
1004        match format!("{module}={level}").parse::<Directive>() {
1005            Ok(d) => {
1006                filter_layer = filter_layer.add_directive(d);
1007            }
1008            Err(e) => {
1009                eprintln!("Failed parsing filter '{level}' for module '{module}': {e}");
1010            }
1011        }
1012    }
1013
1014    // When span events are enabled, allow "span_event" target at all levels
1015    // This ensures SPAN_FIRST_ENTRY events pass the filter when emitted from on_enter
1016    if span_events_enabled() {
1017        filter_layer = filter_layer.add_directive("span_event=trace".parse().unwrap());
1018    }
1019
1020    filter_layer
1021}
1022
1023/// Log a message with file and line info
1024/// Used by Python wrapper
1025pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) {
1026    let level = match level {
1027        "debug" => log::Level::Debug,
1028        "info" => log::Level::Info,
1029        "warn" => log::Level::Warn,
1030        "error" => log::Level::Error,
1031        "warning" => log::Level::Warn,
1032        _ => log::Level::Info,
1033    };
1034    log::logger().log(
1035        &log::Record::builder()
1036            .args(format_args!("{}", message))
1037            .level(level)
1038            .target(module)
1039            .file(Some(file))
1040            .line(Some(line))
1041            .build(),
1042    );
1043}
1044
1045fn load_config() -> LoggingConfig {
1046    let config_path =
1047        std::env::var(env_logging::DYN_LOGGING_CONFIG_PATH).unwrap_or_else(|_| "".to_string());
1048    let figment = Figment::new()
1049        .merge(Serialized::defaults(LoggingConfig::default()))
1050        .merge(Toml::file("/opt/dynamo/etc/logging.toml"))
1051        .merge(Toml::file(config_path));
1052
1053    figment.extract().unwrap()
1054}
1055
/// One log record as serialized by `CustomJsonFormatter` (one JSON object per line).
///
/// Field declaration order is the order in which the keys are serialized.
#[derive(Serialize)]
struct JsonLog<'a> {
    /// Timestamp string produced by the formatter's `TimeFormatter`.
    time: String,
    /// Event level rendered as text.
    level: String,
    /// Source file of the event; the key is omitted when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    file: Option<&'a str>,
    /// Source line of the event; the key is omitted when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    line: Option<u32>,
    /// Event target, or the owning span's target for span lifecycle records.
    target: String,
    /// The event's `message` field (an empty string when the event has none).
    message: serde_json::Value,
    /// All remaining event/span fields, flattened into the top-level object.
    #[serde(flatten)]
    fields: BTreeMap<String, serde_json::Value>,
}
1069
1070struct TimeFormatter {
1071    use_local_tz: bool,
1072}
1073
1074impl TimeFormatter {
1075    fn new() -> Self {
1076        Self {
1077            use_local_tz: crate::config::use_local_timezone(),
1078        }
1079    }
1080
1081    fn format_now(&self) -> String {
1082        if self.use_local_tz {
1083            chrono::Local::now()
1084                .format("%Y-%m-%dT%H:%M:%S%.6f%:z")
1085                .to_string()
1086        } else {
1087            chrono::Utc::now()
1088                .format("%Y-%m-%dT%H:%M:%S%.6fZ")
1089                .to_string()
1090        }
1091    }
1092}
1093
1094impl FormatTime for TimeFormatter {
1095    fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
1096        write!(w, "{}", self.format_now())
1097    }
1098}
1099
1100struct CustomJsonFormatter {
1101    time_formatter: TimeFormatter,
1102}
1103
1104impl CustomJsonFormatter {
1105    fn new() -> Self {
1106        Self {
1107            time_formatter: TimeFormatter::new(),
1108        }
1109    }
1110}
1111
1112use once_cell::sync::Lazy;
1113use regex::Regex;
1114
/// Static W3C Trace Context propagator instance to avoid repeated allocations.
///
/// Wrapped in `Lazy`, so the propagator is constructed on first access and
/// the same instance is reused afterwards.
static TRACE_PROPAGATOR: Lazy<opentelemetry_sdk::propagation::TraceContextPropagator> =
    Lazy::new(opentelemetry_sdk::propagation::TraceContextPropagator::new);
1118
1119fn parse_tracing_duration(s: &str) -> Option<u64> {
1120    static RE: Lazy<Regex> =
1121        Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
1122    let captures = RE.captures(s)?;
1123    let value: f64 = captures[1].parse().ok()?;
1124    let unit = &captures[2];
1125    match unit {
1126        "ns" => Some((value / 1000.0) as u64),
1127        "µs" | "us" => Some(value as u64),
1128        "ms" => Some((value * 1000.0) as u64),
1129        "s" => Some((value * 1_000_000.0) as u64),
1130        _ => None,
1131    }
1132}
1133
1134impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
1135where
1136    S: Subscriber + for<'a> LookupSpan<'a>,
1137    N: for<'a> FormatFields<'a> + 'static,
1138{
1139    fn format_event(
1140        &self,
1141        ctx: &FmtContext<'_, S, N>,
1142        mut writer: Writer<'_>,
1143        event: &Event<'_>,
1144    ) -> std::fmt::Result {
1145        let mut visitor = JsonVisitor::default();
1146        let time = self.time_formatter.format_now();
1147        event.record(&mut visitor);
1148        let mut message = visitor
1149            .fields
1150            .remove("message")
1151            .unwrap_or(serde_json::Value::String("".to_string()));
1152
1153        let mut target_override: Option<String> = None;
1154
1155        let current_span = event
1156            .parent()
1157            .and_then(|id| ctx.span(id))
1158            .or_else(|| ctx.lookup_current());
1159        if let Some(span) = current_span {
1160            let ext = span.extensions();
1161            let data = ext.get::<FormattedFields<N>>().unwrap();
1162            let span_fields: Vec<(&str, &str)> = data
1163                .fields
1164                .split(' ')
1165                .filter_map(|entry| entry.split_once('='))
1166                .collect();
1167            for (name, value) in span_fields {
1168                visitor.fields.insert(
1169                    name.to_string(),
1170                    serde_json::Value::String(value.trim_matches('"').to_string()),
1171                );
1172            }
1173
1174            let busy_us = visitor
1175                .fields
1176                .remove("time.busy")
1177                .and_then(|v| parse_tracing_duration(&v.to_string()));
1178            let idle_us = visitor
1179                .fields
1180                .remove("time.idle")
1181                .and_then(|v| parse_tracing_duration(&v.to_string()));
1182
1183            if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
1184                visitor.fields.insert(
1185                    "time.busy_us".to_string(),
1186                    serde_json::Value::Number(busy_us.into()),
1187                );
1188                visitor.fields.insert(
1189                    "time.idle_us".to_string(),
1190                    serde_json::Value::Number(idle_us.into()),
1191                );
1192                visitor.fields.insert(
1193                    "time.duration_us".to_string(),
1194                    serde_json::Value::Number((busy_us + idle_us).into()),
1195                );
1196            }
1197
1198            let is_span_created = message.as_str() == Some("SPAN_FIRST_ENTRY");
1199            let is_span_closed = message.as_str() == Some("close");
1200            if is_span_created || is_span_closed {
1201                target_override = Some(span.metadata().target().to_string());
1202                if is_span_closed {
1203                    message = serde_json::Value::String("SPAN_CLOSED".to_string());
1204                }
1205            }
1206
1207            visitor.fields.insert(
1208                "span_name".to_string(),
1209                serde_json::Value::String(span.name().to_string()),
1210            );
1211
1212            if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
1213                visitor.fields.insert(
1214                    "span_id".to_string(),
1215                    serde_json::Value::String(tracing_context.span_id.clone()),
1216                );
1217                visitor.fields.insert(
1218                    "trace_id".to_string(),
1219                    serde_json::Value::String(tracing_context.trace_id.clone()),
1220                );
1221                if let Some(parent_id) = tracing_context.parent_id.clone() {
1222                    visitor.fields.insert(
1223                        "parent_id".to_string(),
1224                        serde_json::Value::String(parent_id),
1225                    );
1226                } else {
1227                    visitor.fields.remove("parent_id");
1228                }
1229                if let Some(tracestate) = tracing_context.tracestate.clone() {
1230                    visitor.fields.insert(
1231                        "tracestate".to_string(),
1232                        serde_json::Value::String(tracestate),
1233                    );
1234                } else {
1235                    visitor.fields.remove("tracestate");
1236                }
1237                if let Some(x_request_id) = tracing_context.x_request_id.clone() {
1238                    visitor.fields.insert(
1239                        "x_request_id".to_string(),
1240                        serde_json::Value::String(x_request_id),
1241                    );
1242                } else {
1243                    visitor.fields.remove("x_request_id");
1244                }
1245
1246                if let Some(x_dynamo_request_id) = tracing_context.x_dynamo_request_id.clone() {
1247                    visitor.fields.insert(
1248                        "x_dynamo_request_id".to_string(),
1249                        serde_json::Value::String(x_dynamo_request_id),
1250                    );
1251                } else {
1252                    visitor.fields.remove("x_dynamo_request_id");
1253                }
1254            } else {
1255                tracing::error!(
1256                    "Distributed Trace Context not found, falling back to internal ids"
1257                );
1258                visitor.fields.insert(
1259                    "span_id".to_string(),
1260                    serde_json::Value::String(span.id().into_u64().to_string()),
1261                );
1262                if let Some(parent) = span.parent() {
1263                    visitor.fields.insert(
1264                        "parent_id".to_string(),
1265                        serde_json::Value::String(parent.id().into_u64().to_string()),
1266                    );
1267                }
1268            }
1269        } else {
1270            let reserved_fields = [
1271                "trace_id",
1272                "span_id",
1273                "parent_id",
1274                "span_name",
1275                "tracestate",
1276            ];
1277            for reserved_field in reserved_fields {
1278                visitor.fields.remove(reserved_field);
1279            }
1280        }
1281        let metadata = event.metadata();
1282        let log = JsonLog {
1283            level: metadata.level().to_string(),
1284            time,
1285            file: metadata.file(),
1286            line: metadata.line(),
1287            target: target_override.unwrap_or_else(|| metadata.target().to_string()),
1288            message,
1289            fields: visitor.fields,
1290        };
1291        let json = serde_json::to_string(&log).unwrap();
1292        writeln!(writer, "{json}")
1293    }
1294}
1295
1296#[derive(Default)]
1297struct JsonVisitor {
1298    fields: BTreeMap<String, serde_json::Value>,
1299}
1300
1301impl tracing::field::Visit for JsonVisitor {
1302    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1303        self.fields.insert(
1304            field.name().to_string(),
1305            serde_json::Value::String(format!("{value:?}")),
1306        );
1307    }
1308
1309    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1310        if field.name() != "message" {
1311            match serde_json::from_str::<Value>(value) {
1312                Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
1313                Err(_) => self.fields.insert(field.name().to_string(), value.into()),
1314            };
1315        } else {
1316            self.fields.insert(field.name().to_string(), value.into());
1317        }
1318    }
1319
1320    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
1321        self.fields
1322            .insert(field.name().to_string(), serde_json::Value::Bool(value));
1323    }
1324
1325    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
1326        self.fields.insert(
1327            field.name().to_string(),
1328            serde_json::Value::Number(value.into()),
1329        );
1330    }
1331
1332    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
1333        self.fields.insert(
1334            field.name().to_string(),
1335            serde_json::Value::Number(value.into()),
1336        );
1337    }
1338
1339    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
1340        use serde_json::value::Number;
1341        self.fields.insert(
1342            field.name().to_string(),
1343            serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
1344        );
1345    }
1346}
1347
1348#[cfg(test)]
1349pub mod tests {
1350    use super::*;
1351    use anyhow::{Result, anyhow};
1352    use chrono::{DateTime, Utc};
1353    use jsonschema::{Draft, JSONSchema};
1354    use serde_json::Value;
1355    use std::fs::File;
1356    use std::io::{BufRead, BufReader};
1357    use stdio_override::*;
1358    use tempfile::NamedTempFile;
1359
1360    static LOG_LINE_SCHEMA: &str = r#"
1361    {
1362      "$schema": "http://json-schema.org/draft-07/schema#",
1363      "title": "Runtime Log Line",
1364      "type": "object",
1365      "required": [
1366        "file",
1367        "level",
1368        "line",
1369        "message",
1370        "target",
1371        "time"
1372      ],
1373      "properties": {
1374        "file":      { "type": "string" },
1375        "level":     { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
1376        "line":      { "type": "integer" },
1377        "message":   { "type": "string" },
1378        "target":    { "type": "string" },
1379        "time":      { "type": "string", "format": "date-time" },
1380        "span_id":   { "type": "string", "pattern": "^[a-f0-9]{16}$" },
1381        "parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
1382        "trace_id":  { "type": "string", "pattern": "^[a-f0-9]{32}$" },
1383        "span_name": { "type": "string" },
1384        "time.busy_us":     { "type": "integer" },
1385        "time.duration_us": { "type": "integer" },
1386        "time.idle_us":     { "type": "integer" },
1387        "tracestate": { "type": "string" }
1388      },
1389      "additionalProperties": true
1390    }
1391    "#;
1392
1393    #[tracing::instrument(skip_all)]
1394    async fn parent() {
1395        tracing::trace!(message = "parent!");
1396        if let Some(my_ctx) = get_distributed_tracing_context() {
1397            tracing::info!(my_trace_id = my_ctx.trace_id);
1398        }
1399        child().await;
1400    }
1401
1402    #[tracing::instrument(skip_all)]
1403    async fn child() {
1404        tracing::trace!(message = "child");
1405        if let Some(my_ctx) = get_distributed_tracing_context() {
1406            tracing::info!(my_trace_id = my_ctx.trace_id);
1407        }
1408        grandchild().await;
1409    }
1410
1411    #[tracing::instrument(skip_all)]
1412    async fn grandchild() {
1413        tracing::trace!(message = "grandchild");
1414        if let Some(my_ctx) = get_distributed_tracing_context() {
1415            tracing::info!(my_trace_id = my_ctx.trace_id);
1416        }
1417    }
1418
1419    pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
1420        let schema_json: Value =
1421            serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
1422        let compiled_schema = JSONSchema::options()
1423            .with_draft(Draft::Draft7)
1424            .compile(&schema_json)
1425            .expect("Invalid schema");
1426
1427        let f = File::open(file_name)?;
1428        let reader = BufReader::new(f);
1429        let mut result = Vec::new();
1430
1431        for (line_num, line) in reader.lines().enumerate() {
1432            let line = line?;
1433            let val: Value = serde_json::from_str(&line)
1434                .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_num + 1, e))?;
1435
1436            if let Err(errors) = compiled_schema.validate(&val) {
1437                let errs = errors.map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
1438                return Err(anyhow!(
1439                    "Line {}: JSON Schema Validation errors: {}",
1440                    line_num + 1,
1441                    errs
1442                ));
1443            }
1444            println!("{}", val);
1445            result.push(val);
1446        }
1447        Ok(result)
1448    }
1449
1450    #[tokio::test]
1451    async fn test_json_log_capture() -> Result<()> {
1452        #[allow(clippy::redundant_closure_call)]
1453        let _ = temp_env::async_with_vars(
1454            [(env_logging::DYN_LOGGING_JSONL, Some("1"))],
1455            (async || {
1456                let tmp_file = NamedTempFile::new().unwrap();
1457                let file_name = tmp_file.path().to_str().unwrap();
1458                let guard = StderrOverride::from_file(file_name)?;
1459                init();
1460                parent().await;
1461                drop(guard);
1462
1463                let lines = load_log(file_name)?;
1464
1465                // 1. Extract the dynamically generated trace ID and validate consistency
1466                // All logs should have the same trace_id since they're part of the same trace
1467                // Skip any initialization logs that don't have trace_id (e.g., OTLP setup messages)
1468                //
1469                // Note: This test can fail if logging was already initialized by another test running
1470                // in parallel. Logging initialization is global (Once) and can only happen once per process.
1471                // If no trace_id is found, skip validation gracefully.
1472                let Some(trace_id) = lines
1473                    .iter()
1474                    .find_map(|log_line| log_line.get("trace_id").and_then(|v| v.as_str()))
1475                    .map(|s| s.to_string())
1476                else {
1477                    // Skip test if logging was already initialized - we can't control the output format
1478                    return Ok(());
1479                };
1480
1481                // Verify trace_id is not a zero/invalid ID
1482                assert_ne!(
1483                    trace_id, "00000000000000000000000000000000",
1484                    "trace_id should not be a zero/invalid ID"
1485                );
1486                assert!(
1487                    !trace_id.chars().all(|c| c == '0'),
1488                    "trace_id should not be all zeros"
1489                );
1490
1491                // Verify all logs have the same trace_id
1492                for log_line in &lines {
1493                    if let Some(line_trace_id) = log_line.get("trace_id") {
1494                        assert_eq!(
1495                            line_trace_id.as_str().unwrap(),
1496                            &trace_id,
1497                            "All logs should have the same trace_id"
1498                        );
1499                    }
1500                }
1501
1502                // Validate my_trace_id matches the actual trace ID
1503                for log_line in &lines {
1504                    if let Some(my_trace_id) = log_line.get("my_trace_id") {
1505                        assert_eq!(
1506                            my_trace_id,
1507                            &serde_json::Value::String(trace_id.clone()),
1508                            "my_trace_id should match the trace_id from distributed tracing context"
1509                        );
1510                    }
1511                }
1512
1513                // 2. Validate span IDs exist and are properly formatted
1514                let mut span_ids_seen: std::collections::HashSet<String> = std::collections::HashSet::new();
1515                let mut span_timestamps: std::collections::HashMap<String, DateTime<Utc>> = std::collections::HashMap::new();
1516
1517                for log_line in &lines {
1518                    if let Some(span_id) = log_line.get("span_id") {
1519                        let span_id_str = span_id.as_str().unwrap();
1520                        assert!(
1521                            is_valid_span_id(span_id_str),
1522                            "Invalid span_id format: {}",
1523                            span_id_str
1524                        );
1525                        span_ids_seen.insert(span_id_str.to_string());
1526                    }
1527
1528                    // Validate timestamp format and track span timestamps
1529                    if let Some(time_str) = log_line.get("time").and_then(|v| v.as_str()) {
1530                        let timestamp = DateTime::parse_from_rfc3339(time_str)
1531                            .expect("All timestamps should be valid RFC3339 format")
1532                            .with_timezone(&Utc);
1533
1534                        // Track timestamp for each span_name
1535                        if let Some(span_name) = log_line.get("span_name").and_then(|v| v.as_str()) {
1536                            span_timestamps.insert(span_name.to_string(), timestamp);
1537                        }
1538                    }
1539                }
1540
1541                // 3. Validate parent-child span relationships
1542                // Extract span IDs for each span by looking at their log messages
1543                let parent_span_id = lines
1544                    .iter()
1545                    .find(|log_line| {
1546                        log_line.get("span_name")
1547                            .and_then(|v| v.as_str()) == Some("parent")
1548                    })
1549                    .and_then(|log_line| {
1550                        log_line.get("span_id")
1551                            .and_then(|v| v.as_str())
1552                            .map(|s| s.to_string())
1553                    })
1554                    .expect("Should find parent span with span_id");
1555
1556                let child_span_id = lines
1557                    .iter()
1558                    .find(|log_line| {
1559                        log_line.get("span_name")
1560                            .and_then(|v| v.as_str()) == Some("child")
1561                    })
1562                    .and_then(|log_line| {
1563                        log_line.get("span_id")
1564                            .and_then(|v| v.as_str())
1565                            .map(|s| s.to_string())
1566                    })
1567                    .expect("Should find child span with span_id");
1568
1569                let grandchild_span_id = lines
1570                    .iter()
1571                    .find(|log_line| {
1572                        log_line.get("span_name")
1573                            .and_then(|v| v.as_str()) == Some("grandchild")
1574                    })
1575                    .and_then(|log_line| {
1576                        log_line.get("span_id")
1577                            .and_then(|v| v.as_str())
1578                            .map(|s| s.to_string())
1579                    })
1580                    .expect("Should find grandchild span with span_id");
1581
1582                // Verify span IDs are unique
1583                assert_ne!(parent_span_id, child_span_id, "Parent and child should have different span IDs");
1584                assert_ne!(child_span_id, grandchild_span_id, "Child and grandchild should have different span IDs");
1585                assert_ne!(parent_span_id, grandchild_span_id, "Parent and grandchild should have different span IDs");
1586
1587                // Verify parent span has no parent_id
1588                for log_line in &lines {
1589                    if let Some(span_name) = log_line.get("span_name")
1590                        && let Some(span_name_str) = span_name.as_str()
1591                        && span_name_str == "parent"
1592                    {
1593                        assert!(
1594                            log_line.get("parent_id").is_none(),
1595                            "Parent span should not have a parent_id"
1596                        );
1597                    }
1598                }
1599
1600                // Verify child span's parent_id is parent_span_id
1601                for log_line in &lines {
1602                    if let Some(span_name) = log_line.get("span_name")
1603                        && let Some(span_name_str) = span_name.as_str()
1604                        && span_name_str == "child"
1605                    {
1606                        let parent_id = log_line.get("parent_id")
1607                            .and_then(|v| v.as_str())
1608                            .expect("Child span should have a parent_id");
1609                        assert_eq!(
1610                            parent_id,
1611                            parent_span_id,
1612                            "Child's parent_id should match parent's span_id"
1613                        );
1614                    }
1615                }
1616
1617                // Verify grandchild span's parent_id is child_span_id
1618                for log_line in &lines {
1619                    if let Some(span_name) = log_line.get("span_name")
1620                        && let Some(span_name_str) = span_name.as_str()
1621                        && span_name_str == "grandchild"
1622                    {
1623                        let parent_id = log_line.get("parent_id")
1624                            .and_then(|v| v.as_str())
1625                            .expect("Grandchild span should have a parent_id");
1626                        assert_eq!(
1627                            parent_id,
1628                            child_span_id,
1629                            "Grandchild's parent_id should match child's span_id"
1630                        );
1631                    }
1632                }
1633
1634                // 4. Validate timestamp ordering - spans should log in execution order
1635                let parent_time = span_timestamps.get("parent")
1636                    .expect("Should have timestamp for parent span");
1637                let child_time = span_timestamps.get("child")
1638                    .expect("Should have timestamp for child span");
1639                let grandchild_time = span_timestamps.get("grandchild")
1640                    .expect("Should have timestamp for grandchild span");
1641
1642                // Parent logs first (or at same time), then child, then grandchild
1643                assert!(
1644                    parent_time <= child_time,
1645                    "Parent span should log before or at same time as child span (parent: {}, child: {})",
1646                    parent_time,
1647                    child_time
1648                );
1649                assert!(
1650                    child_time <= grandchild_time,
1651                    "Child span should log before or at same time as grandchild span (child: {}, grandchild: {})",
1652                    child_time,
1653                    grandchild_time
1654                );
1655
1656                Ok::<(), anyhow::Error>(())
1657            })(),
1658        )
1659        .await;
1660        Ok(())
1661    }
1662
1663    // Test functions at different log levels for filtering tests
1664    #[tracing::instrument(level = "debug", skip_all)]
1665    async fn debug_level_span() {
1666        tracing::debug!("inside debug span");
1667    }
1668
1669    #[tracing::instrument(level = "info", skip_all)]
1670    async fn info_level_span() {
1671        tracing::info!("inside info span");
1672    }
1673
1674    #[tracing::instrument(level = "warn", skip_all)]
1675    async fn warn_level_span() {
1676        tracing::warn!("inside warn span");
1677    }
1678
1679    // Span from a different target - should be FILTERED OUT at info level
1680    // because the filter is warn,dynamo_runtime::logging::tests=debug
1681    #[tracing::instrument(level = "info", target = "other_module", skip_all)]
1682    async fn other_target_info_span() {
1683        tracing::info!(target: "other_module", "inside other target span");
1684    }
1685
/// Comprehensive test for span events covering:
/// - SPAN_FIRST_ENTRY and SPAN_CLOSED event emission
/// - Trace context (trace_id, span_id) in span events
/// - Timing information in SPAN_CLOSED events
/// - Level-based filtering (positive: allowed levels pass, negative: filtered levels blocked)
/// - Target-based filtering (spans from allowed targets pass even at lower levels)
///
/// This test runs in a subprocess to ensure logging is initialized with our specific
/// filter settings (DYN_LOG=warn,dynamo_runtime::logging::tests=debug), avoiding
/// interference from other tests that may have initialized logging first.
///
/// The subprocess is selected with libtest's `--exact` flag, which compares the
/// filter against the test's *full* name (module path without the crate name),
/// so the name is derived from `module_path!()` rather than hard-coding the bare
/// function name. The wrapper also checks that one test was actually reported as
/// passed, so a filter that matches nothing cannot make this test pass vacuously.
///
/// # Panics
/// Panics if `cargo` cannot be spawned, if the subprocess exits unsuccessfully,
/// or if the subprocess output does not report exactly one passed test.
#[test]
fn test_span_events() {
    use std::process::Command;

    // libtest names a unit test `<module path without crate>::<fn name>`;
    // with `--exact`, a bare function name would match no test at all.
    let subprocess_test = match module_path!().split_once("::") {
        Some((_crate_name, module)) => format!("{module}::test_span_events_subprocess"),
        None => String::from("test_span_events_subprocess"),
    };

    // Run cargo test for the subprocess test with specific env vars
    let output = Command::new("cargo")
        .args([
            "test",
            "-p",
            "dynamo-runtime",
            subprocess_test.as_str(),
            "--",
            "--exact",
            "--nocapture",
        ])
        .env("DYN_LOGGING_JSONL", "1")
        .env("DYN_LOGGING_SPAN_EVENTS", "1")
        .env("DYN_LOG", "warn,dynamo_runtime::logging::tests=debug")
        .output()
        .expect("Failed to execute subprocess test");

    let stdout = String::from_utf8_lossy(&output.stdout);
    let stderr = String::from_utf8_lossy(&output.stderr);

    // A run that filtered out every test still exits 0, so success alone is
    // not enough: require the libtest summary line for exactly one passed test.
    let ran_subprocess_test = stdout.contains("test result: ok. 1 passed");

    // Print output for debugging
    if !output.status.success() || !ran_subprocess_test {
        eprintln!("=== STDOUT ===\n{stdout}");
        eprintln!("=== STDERR ===\n{stderr}");
    }

    assert!(
        output.status.success(),
        "Subprocess test failed with exit code: {:?}",
        output.status.code()
    );
    assert!(
        ran_subprocess_test,
        "Subprocess run did not execute `{subprocess_test}` (no test matched the --exact filter)"
    );
}
1735
1736    /// Subprocess test that performs the actual span event validation.
1737    /// This is called by test_span_events in a separate process with controlled env vars.
1738    #[tokio::test]
1739    async fn test_span_events_subprocess() -> Result<()> {
1740        // Skip if not running as subprocess (env vars not set)
1741        if std::env::var("DYN_LOGGING_SPAN_EVENTS").is_err() {
1742            return Ok(());
1743        }
1744
1745        let tmp_file = NamedTempFile::new().unwrap();
1746        let file_name = tmp_file.path().to_str().unwrap();
1747        let guard = StderrOverride::from_file(file_name)?;
1748        init();
1749
1750        // Run parent/child/grandchild spans (all INFO level by default)
1751        parent().await;
1752
1753        // Run spans at explicit levels from our test module
1754        debug_level_span().await;
1755        info_level_span().await;
1756        warn_level_span().await;
1757
1758        // Run span from different target (should be filtered out)
1759        other_target_info_span().await;
1760
1761        drop(guard);
1762
1763        let lines = load_log(file_name)?;
1764
1765        // Helper to check if a span event exists
1766        let has_span_event = |msg: &str, span_name: &str| {
1767            lines.iter().any(|log| {
1768                log.get("message").and_then(|v| v.as_str()) == Some(msg)
1769                    && log.get("span_name").and_then(|v| v.as_str()) == Some(span_name)
1770            })
1771        };
1772
1773        // Helper to get span events
1774        let get_span_events = |msg: &str| -> Vec<&serde_json::Value> {
1775            lines
1776                .iter()
1777                .filter(|log| log.get("message").and_then(|v| v.as_str()) == Some(msg))
1778                .collect()
1779        };
1780
1781        // === Test 1: SPAN_FIRST_ENTRY events have required fields ===
1782        let span_created_events = get_span_events("SPAN_FIRST_ENTRY");
1783        for event in &span_created_events {
1784            // Must have span_name
1785            assert!(
1786                event.get("span_name").is_some(),
1787                "SPAN_FIRST_ENTRY must have span_name"
1788            );
1789            // Must have valid trace_id (format check)
1790            let trace_id = event
1791                .get("trace_id")
1792                .and_then(|v| v.as_str())
1793                .expect("SPAN_FIRST_ENTRY must have trace_id");
1794            assert!(
1795                trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
1796                "SPAN_FIRST_ENTRY must have valid trace_id format"
1797            );
1798            // Must have valid span_id
1799            let span_id = event
1800                .get("span_id")
1801                .and_then(|v| v.as_str())
1802                .expect("SPAN_FIRST_ENTRY must have span_id");
1803            assert!(
1804                is_valid_span_id(span_id),
1805                "SPAN_FIRST_ENTRY must have valid span_id"
1806            );
1807        }
1808
1809        // === Test 2: SPAN_CLOSED events have timing info ===
1810        let span_closed_events = get_span_events("SPAN_CLOSED");
1811        for event in &span_closed_events {
1812            assert!(
1813                event.get("span_name").is_some(),
1814                "SPAN_CLOSED must have span_name"
1815            );
1816            assert!(
1817                event.get("time.busy_us").is_some()
1818                    || event.get("time.idle_us").is_some()
1819                    || event.get("time.duration_us").is_some(),
1820                "SPAN_CLOSED must have timing information"
1821            );
1822            // Must have valid trace_id
1823            let trace_id = event
1824                .get("trace_id")
1825                .and_then(|v| v.as_str())
1826                .expect("SPAN_CLOSED must have trace_id");
1827            assert!(
1828                trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
1829                "SPAN_CLOSED must have valid trace_id format"
1830            );
1831        }
1832
1833        // === Test 3: Target-based filtering (positive) ===
1834        // Spans from dynamo_runtime::logging::tests should pass at ALL levels
1835        // because the target is allowed at debug level
1836        assert!(
1837            has_span_event("SPAN_FIRST_ENTRY", "debug_level_span"),
1838            "DEBUG span from allowed target MUST pass (target=debug filter)"
1839        );
1840        assert!(
1841            has_span_event("SPAN_FIRST_ENTRY", "info_level_span"),
1842            "INFO span from allowed target MUST pass (target=debug filter)"
1843        );
1844        assert!(
1845            has_span_event("SPAN_FIRST_ENTRY", "warn_level_span"),
1846            "WARN span from allowed target MUST pass (target=debug filter)"
1847        );
1848
1849        // parent/child/grandchild are INFO level from allowed target - should pass
1850        assert!(
1851            has_span_event("SPAN_FIRST_ENTRY", "parent"),
1852            "parent span (INFO) from allowed target MUST pass"
1853        );
1854        assert!(
1855            has_span_event("SPAN_FIRST_ENTRY", "child"),
1856            "child span (INFO) from allowed target MUST pass"
1857        );
1858        assert!(
1859            has_span_event("SPAN_FIRST_ENTRY", "grandchild"),
1860            "grandchild span (INFO) from allowed target MUST pass"
1861        );
1862
1863        // === Test 4: Level-based filtering (negative) ===
1864        // Verify spans from OTHER targets at debug/info level are filtered out
1865        assert!(
1866            !has_span_event("SPAN_FIRST_ENTRY", "other_target_info_span"),
1867            "INFO span from non-allowed target (other_module) MUST be filtered out"
1868        );
1869
1870        // Also verify no spans from other targets appear at debug/info level
1871        for event in &span_created_events {
1872            let target = event.get("target").and_then(|v| v.as_str()).unwrap_or("");
1873            let level = event.get("level").and_then(|v| v.as_str()).unwrap_or("");
1874
1875            // If level is DEBUG or INFO, target must be our test module
1876            if level == "DEBUG" || level == "INFO" {
1877                assert!(
1878                    target.contains("dynamo_runtime::logging::tests"),
1879                    "DEBUG/INFO span must be from allowed target, got target={target}"
1880                );
1881            }
1882        }
1883
1884        Ok(())
1885    }
1886}