dynamo_runtime/
logging.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Dynamo Distributed Logging Module.
5//!
6//! - Configuration loaded from:
7//!   1. Environment variables (highest priority).
8//!   2. Optional TOML file pointed to by the `DYN_LOGGING_CONFIG_PATH` environment variable.
9//!   3. `/opt/dynamo/etc/logging.toml`.
10//!
11//! Logging can take two forms: `READABLE` or `JSONL`. The default is `READABLE`. `JSONL`
12//! can be enabled by setting the `DYN_LOGGING_JSONL` environment variable to `1`.
13//!
14//! To use local timezone for logging timestamps, set the `DYN_LOG_USE_LOCAL_TZ` environment variable to `1`.
15//!
//! Filters can be configured using the `DYN_LOG` environment variable or by setting the
//! `log_level` key and the `[log_filters]` table in the TOML configuration file. In `DYN_LOG`,
//! filters are comma-separated `target=level` pairs, where the target is the crate or module
//! name and the value is the log level. The default log level is `info`.
19//!
20//! Example:
21//! ```toml
22//! log_level = "error"
23//!
24//! [log_filters]
25//! "test_logging" = "info"
26//! "test_logging::api" = "trace"
27//! ```
28
29use std::collections::{BTreeMap, HashMap};
30use std::sync::Once;
31
32use figment::{
33    Figment,
34    providers::{Format, Serialized, Toml},
35};
36use serde::{Deserialize, Serialize};
37use tracing::level_filters::LevelFilter;
38use tracing::{Event, Subscriber};
39use tracing_subscriber::EnvFilter;
40use tracing_subscriber::fmt::time::FormatTime;
41use tracing_subscriber::fmt::time::LocalTime;
42use tracing_subscriber::fmt::time::SystemTime;
43use tracing_subscriber::fmt::time::UtcTime;
44use tracing_subscriber::fmt::{FmtContext, FormatFields};
45use tracing_subscriber::fmt::{FormattedFields, format::Writer};
46use tracing_subscriber::prelude::*;
47use tracing_subscriber::registry::LookupSpan;
48use tracing_subscriber::{filter::Directive, fmt};
49
50use crate::config::{disable_ansi_logging, jsonl_logging_enabled};
51use async_nats::{HeaderMap, HeaderValue};
52use axum::extract::FromRequestParts;
53use axum::http;
54use axum::http::Request;
55use axum::http::request::Parts;
56use serde_json::Value;
57use std::convert::Infallible;
58use std::time::Instant;
59use tower_http::trace::{DefaultMakeSpan, TraceLayer};
60use tracing::Id;
61use tracing::Span;
62use tracing::field::Field;
63use tracing::span;
64use tracing_subscriber::Layer;
65use tracing_subscriber::Registry;
66use tracing_subscriber::field::Visit;
67use tracing_subscriber::fmt::format::FmtSpan;
68use tracing_subscriber::layer::Context;
69use tracing_subscriber::registry::SpanData;
70use uuid::Uuid;
71
72use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
73use opentelemetry::trace::TraceContextExt;
74use opentelemetry::{global, trace::Tracer};
75use opentelemetry_otlp::WithExportConfig;
76
77use opentelemetry::trace::TracerProvider as _;
78use opentelemetry::{Key, KeyValue};
79use opentelemetry_sdk::Resource;
80use opentelemetry_sdk::trace::SdkTracerProvider;
81use tracing::error;
82use tracing_subscriber::layer::SubscriberExt;
83// use tracing_subscriber::Registry;
84
85use std::time::Duration;
86use tracing::{info, instrument};
87use tracing_opentelemetry::OpenTelemetrySpanExt;
88use tracing_subscriber::util::SubscriberInitExt;
89
/// Environment variable holding the log filter directives (e.g. `info,my_crate=debug`).
const FILTER_ENV: &str = "DYN_LOG";

/// Default log level used when no level is configured.
const DEFAULT_FILTER_LEVEL: &str = "info";

/// Environment variable holding the path to the TOML logging configuration file.
const CONFIG_PATH_ENV: &str = "DYN_LOGGING_CONFIG_PATH";

/// Environment variable that enables OTLP trace export when set to a truthy value.
const OTEL_EXPORT_ENABLED_ENV: &str = "OTEL_EXPORT_ENABLED";

/// Environment variable holding the OTLP traces endpoint.
///
/// Name taken from the OTLP exporter environment-variable specification:
/// <https://opentelemetry.io/docs/specs/otel/protocol/exporter/>
const OTEL_EXPORT_ENDPOINT_ENV: &str = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT";

/// OTLP endpoint used when `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` is not set.
const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";

/// Environment variable holding the OpenTelemetry service name.
const OTEL_SERVICE_NAME_ENV: &str = "OTEL_SERVICE_NAME";

/// Service name used when `OTEL_SERVICE_NAME` is not set.
const DEFAULT_OTEL_SERVICE_NAME: &str = "dynamo";

/// Guards logger initialization so it runs at most once per process.
static INIT: Once = Once::new();
117
118#[derive(Serialize, Deserialize, Debug)]
119struct LoggingConfig {
120    log_level: String,
121    log_filters: HashMap<String, String>,
122}
123impl Default for LoggingConfig {
124    fn default() -> Self {
125        LoggingConfig {
126            log_level: DEFAULT_FILTER_LEVEL.to_string(),
127            log_filters: HashMap::from([
128                ("h2".to_string(), "error".to_string()),
129                ("tower".to_string(), "error".to_string()),
130                ("hyper_util".to_string(), "error".to_string()),
131                ("neli".to_string(), "error".to_string()),
132                ("async_nats".to_string(), "error".to_string()),
133                ("rustls".to_string(), "error".to_string()),
134                ("tokenizers".to_string(), "error".to_string()),
135                ("axum".to_string(), "error".to_string()),
136                ("tonic".to_string(), "error".to_string()),
137                ("mistralrs_core".to_string(), "error".to_string()),
138                ("hf_hub".to_string(), "error".to_string()),
139                ("opentelemetry".to_string(), "error".to_string()),
140                ("opentelemetry-otlp".to_string(), "error".to_string()),
141                ("opentelemetry_sdk".to_string(), "error".to_string()),
142            ]),
143        }
144    }
145}
146
147/// Check if OTLP trace exporting is enabled (set OTEL_EXPORT_ENABLED to a truthy value: 1, true, on, yes)
148fn otlp_exporter_enabled() -> bool {
149    crate::config::env_is_truthy(OTEL_EXPORT_ENABLED_ENV)
150}
151
152/// Get the service name from environment or use default
153fn get_service_name() -> String {
154    std::env::var(OTEL_SERVICE_NAME_ENV).unwrap_or_else(|_| DEFAULT_OTEL_SERVICE_NAME.to_string())
155}
156
/// Return `true` when `id` is exactly `len` lowercase hex digits and not all zeros.
///
/// W3C Trace Context encodes trace and parent IDs as lowercase hex (`HEXDIGLC`) and
/// defines the all-zero value as invalid. Keeping to these rules keeps IDs accepted
/// here in agreement with what the OpenTelemetry `TraceContextPropagator` accepts.
fn is_valid_w3c_id(id: &str, len: usize) -> bool {
    id.len() == len
        && id.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
        && id.bytes().any(|b| b != b'0')
}

/// Validate a given trace ID according to W3C Trace Context specifications.
///
/// A valid trace ID is a 32-character lowercase hexadecimal string that is not all zeros.
pub fn is_valid_trace_id(trace_id: &str) -> bool {
    is_valid_w3c_id(trace_id, 32)
}

/// Validate a given span ID according to W3C Trace Context specifications.
///
/// A valid span ID is a 16-character lowercase hexadecimal string that is not all zeros.
pub fn is_valid_span_id(span_id: &str) -> bool {
    is_valid_w3c_id(span_id, 16)
}
168
/// `tracing_subscriber` layer that attaches a [`DistributedTraceContext`] to the spans it sees.
pub struct DistributedTraceIdLayer;

/// Trace identity of a span plus request-correlation IDs, stored in the span's extensions.
///
/// `start` / `end` are in-process timestamps (set when the layer finalizes the context on
/// enter and when the span closes) and are never serialized; the optional string fields are
/// omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedTraceContext {
    /// Trace ID (checked with `is_valid_trace_id` before the layer stores it).
    pub trace_id: String,
    /// ID of the span this context belongs to (checked with `is_valid_span_id`).
    pub span_id: String,
    /// ID of the parent span, when one was supplied or inherited.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<String>,
    /// Raw W3C `tracestate` value, propagated unchanged.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tracestate: Option<String>,
    #[serde(skip)]
    start: Option<Instant>,
    #[serde(skip)]
    end: Option<Instant>,
    /// Value of the `x-request-id` header / span field, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_request_id: Option<String>,
    /// Value of the `x-dynamo-request-id` header / span field, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_dynamo_request_id: Option<String>,
}

/// Pending context data collected in on_new_span, to be finalized in on_enter.
///
/// Any ID may still be `None` here; missing trace/span IDs are filled from the
/// OpenTelemetry layer's data when the span is first entered.
#[derive(Debug, Clone)]
struct PendingDistributedTraceContext {
    trace_id: Option<String>,
    span_id: Option<String>,
    parent_id: Option<String>,
    tracestate: Option<String>,
    x_request_id: Option<String>,
    x_dynamo_request_id: Option<String>,
}
199
200impl DistributedTraceContext {
201    /// Create a traceparent string from the context
202    pub fn create_traceparent(&self) -> String {
203        format!("00-{}-{}-01", self.trace_id, self.span_id)
204    }
205}
206
207/// Parse a traceparent string into its components
208pub fn parse_traceparent(traceparent: &str) -> (Option<String>, Option<String>) {
209    let pieces: Vec<_> = traceparent.split('-').collect();
210    if pieces.len() != 4 {
211        return (None, None);
212    }
213    let trace_id = pieces[1];
214    let parent_id = pieces[2];
215
216    if !is_valid_trace_id(trace_id) || !is_valid_span_id(parent_id) {
217        return (None, None);
218    }
219
220    (Some(trace_id.to_string()), Some(parent_id.to_string()))
221}
222
/// Trace and request-correlation values read from incoming request headers.
///
/// `trace_id` / `parent_id` come from a successfully parsed `traceparent` header (both or
/// neither); `tracestate` and `x_request_id` are copied verbatim from their headers, and
/// `x_dynamo_request_id` is kept only when it parses as a UUID.
#[derive(Debug, Clone, Default)]
pub struct TraceParent {
    pub trace_id: Option<String>,
    pub parent_id: Option<String>,
    pub tracestate: Option<String>,
    pub x_request_id: Option<String>,
    pub x_dynamo_request_id: Option<String>,
}
231
/// Minimal read-only header lookup shared by NATS and HTTP header maps.
pub trait GenericHeaders {
    /// Return the value of header `key` as a string slice, if present.
    fn get(&self, key: &str) -> Option<&str>;
}

impl GenericHeaders for async_nats::HeaderMap {
    fn get(&self, key: &str) -> Option<&str> {
        async_nats::HeaderMap::get(self, key).map(|value| value.as_str())
    }
}

impl GenericHeaders for http::HeaderMap {
    // Values for which `to_str()` fails are reported as absent rather than as an error.
    fn get(&self, key: &str) -> Option<&str> {
        http::HeaderMap::get(self, key).and_then(|value| value.to_str().ok())
    }
}
247
248impl TraceParent {
249    pub fn from_headers<H: GenericHeaders>(headers: &H) -> TraceParent {
250        let mut trace_id = None;
251        let mut parent_id = None;
252        let mut tracestate = None;
253        let mut x_request_id = None;
254        let mut x_dynamo_request_id = None;
255
256        if let Some(header_value) = headers.get("traceparent") {
257            (trace_id, parent_id) = parse_traceparent(header_value);
258        }
259
260        if let Some(header_value) = headers.get("x-request-id") {
261            x_request_id = Some(header_value.to_string());
262        }
263
264        if let Some(header_value) = headers.get("tracestate") {
265            tracestate = Some(header_value.to_string());
266        }
267
268        if let Some(header_value) = headers.get("x-dynamo-request-id") {
269            x_dynamo_request_id = Some(header_value.to_string());
270        }
271
272        // Validate UUID format
273        let x_dynamo_request_id =
274            x_dynamo_request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
275        TraceParent {
276            trace_id,
277            parent_id,
278            tracestate,
279            x_request_id,
280            x_dynamo_request_id,
281        }
282    }
283}
284
285// Takes Axum request and returning a span
286pub fn make_request_span<B>(req: &Request<B>) -> Span {
287    let method = req.method();
288    let uri = req.uri();
289    let version = format!("{:?}", req.version());
290    let trace_parent = TraceParent::from_headers(req.headers());
291
292    let span = tracing::info_span!(
293        "http-request",
294        method = %method,
295        uri = %uri,
296        version = %version,
297        trace_id = trace_parent.trace_id,
298        parent_id = trace_parent.parent_id,
299        x_request_id = trace_parent.x_request_id,
300    x_dynamo_request_id = trace_parent.x_dynamo_request_id,
301    );
302
303    span
304}
305
306/// Create a handle_payload span from NATS headers with component context
307pub fn make_handle_payload_span(
308    headers: &async_nats::HeaderMap,
309    component: &str,
310    endpoint: &str,
311    namespace: &str,
312    instance_id: u64,
313) -> Span {
314    let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_nats_headers(headers);
315    let trace_parent = TraceParent::from_headers(headers);
316
317    if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
318        let span = tracing::info_span!(
319            "handle_payload",
320            trace_id = trace_id.as_str(),
321            parent_id = parent_id.as_str(),
322            x_request_id = trace_parent.x_request_id,
323            x_dynamo_request_id = trace_parent.x_dynamo_request_id,
324            tracestate = trace_parent.tracestate,
325            component = component,
326            endpoint = endpoint,
327            namespace = namespace,
328            instance_id = instance_id,
329        );
330
331        if let Some(context) = otel_context {
332            let _ = span.set_parent(context);
333        }
334        span
335    } else {
336        tracing::info_span!(
337            "handle_payload",
338            x_request_id = trace_parent.x_request_id,
339            x_dynamo_request_id = trace_parent.x_dynamo_request_id,
340            tracestate = trace_parent.tracestate,
341            component = component,
342            endpoint = endpoint,
343            namespace = namespace,
344            instance_id = instance_id,
345        )
346    }
347}
348
349/// Extract OpenTelemetry trace context from NATS headers for distributed tracing
350pub fn extract_otel_context_from_nats_headers(
351    headers: &async_nats::HeaderMap,
352) -> (
353    Option<opentelemetry::Context>,
354    Option<String>,
355    Option<String>,
356) {
357    let traceparent_value = match headers.get("traceparent") {
358        Some(value) => value.as_str(),
359        None => return (None, None, None),
360    };
361
362    let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
363
364    struct NatsHeaderExtractor<'a>(&'a async_nats::HeaderMap);
365
366    impl<'a> Extractor for NatsHeaderExtractor<'a> {
367        fn get(&self, key: &str) -> Option<&str> {
368            self.0.get(key).map(|value| value.as_str())
369        }
370
371        fn keys(&self) -> Vec<&str> {
372            vec!["traceparent", "tracestate"]
373                .into_iter()
374                .filter(|&key| self.0.get(key).is_some())
375                .collect()
376        }
377    }
378
379    let extractor = NatsHeaderExtractor(headers);
380    let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
381    let otel_context = propagator.extract(&extractor);
382
383    let context_with_trace = if otel_context.span().span_context().is_valid() {
384        Some(otel_context)
385    } else {
386        None
387    };
388
389    (context_with_trace, trace_id, parent_span_id)
390}
391
392/// Inject OpenTelemetry trace context into NATS headers using W3C Trace Context propagation
393pub fn inject_otel_context_into_nats_headers(
394    headers: &mut async_nats::HeaderMap,
395    context: Option<opentelemetry::Context>,
396) {
397    let otel_context = context.unwrap_or_else(|| Span::current().context());
398
399    struct NatsHeaderInjector<'a>(&'a mut async_nats::HeaderMap);
400
401    impl<'a> Injector for NatsHeaderInjector<'a> {
402        fn set(&mut self, key: &str, value: String) {
403            self.0.insert(key, value);
404        }
405    }
406
407    let mut injector = NatsHeaderInjector(headers);
408    let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
409    propagator.inject_context(&otel_context, &mut injector);
410}
411
412/// Inject trace context from current span into NATS headers
413pub fn inject_current_trace_into_nats_headers(headers: &mut async_nats::HeaderMap) {
414    inject_otel_context_into_nats_headers(headers, None);
415}
416
417// Inject trace headers into a generic HashMap for HTTP/TCP transports
418pub fn inject_trace_headers_into_map(headers: &mut std::collections::HashMap<String, String>) {
419    if let Some(trace_context) = get_distributed_tracing_context() {
420        // Inject W3C traceparent header
421        headers.insert(
422            "traceparent".to_string(),
423            trace_context.create_traceparent(),
424        );
425
426        // Inject optional tracestate
427        if let Some(tracestate) = trace_context.tracestate {
428            headers.insert("tracestate".to_string(), tracestate);
429        }
430
431        // Inject custom request IDs
432        if let Some(x_request_id) = trace_context.x_request_id {
433            headers.insert("x-request-id".to_string(), x_request_id);
434        }
435        if let Some(x_dynamo_request_id) = trace_context.x_dynamo_request_id {
436            headers.insert("x-dynamo-request-id".to_string(), x_dynamo_request_id);
437        }
438    }
439}
440
441/// Create a client_request span linked to the parent trace context
442pub fn make_client_request_span(
443    operation: &str,
444    request_id: &str,
445    trace_context: Option<&DistributedTraceContext>,
446    instance_id: Option<&str>,
447) -> Span {
448    if let Some(ctx) = trace_context {
449        let mut headers = async_nats::HeaderMap::new();
450        headers.insert("traceparent", ctx.create_traceparent());
451
452        if let Some(ref tracestate) = ctx.tracestate {
453            headers.insert("tracestate", tracestate.as_str());
454        }
455
456        let (otel_context, _extracted_trace_id, _extracted_parent_span_id) =
457            extract_otel_context_from_nats_headers(&headers);
458
459        let span = if let Some(inst_id) = instance_id {
460            tracing::info_span!(
461                "client_request",
462                operation = operation,
463                request_id = request_id,
464                instance_id = inst_id,
465                trace_id = ctx.trace_id.as_str(),
466                parent_id = ctx.span_id.as_str(),
467                x_request_id = ctx.x_request_id.as_deref(),
468                x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
469                // tracestate = ctx.tracestate.as_deref(),
470            )
471        } else {
472            tracing::info_span!(
473                "client_request",
474                operation = operation,
475                request_id = request_id,
476                trace_id = ctx.trace_id.as_str(),
477                parent_id = ctx.span_id.as_str(),
478                x_request_id = ctx.x_request_id.as_deref(),
479                x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
480                // tracestate = ctx.tracestate.as_deref(),
481            )
482        };
483
484        if let Some(context) = otel_context {
485            let _ = span.set_parent(context);
486        }
487
488        span
489    } else if let Some(inst_id) = instance_id {
490        tracing::info_span!(
491            "client_request",
492            operation = operation,
493            request_id = request_id,
494            instance_id = inst_id,
495        )
496    } else {
497        tracing::info_span!(
498            "client_request",
499            operation = operation,
500            request_id = request_id,
501        )
502    }
503}
504
505#[derive(Debug, Default)]
506pub struct FieldVisitor {
507    pub fields: HashMap<String, String>,
508}
509
510impl Visit for FieldVisitor {
511    fn record_str(&mut self, field: &Field, value: &str) {
512        self.fields
513            .insert(field.name().to_string(), value.to_string());
514    }
515
516    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
517        self.fields
518            .insert(field.name().to_string(), format!("{:?}", value).to_string());
519    }
520}
521
522impl<S> Layer<S> for DistributedTraceIdLayer
523where
524    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
525{
526    // Capture close span time
527    // Currently not used but added for future use in timing
528    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
529        if let Some(span) = ctx.span(&id) {
530            let mut extensions = span.extensions_mut();
531            if let Some(distributed_tracing_context) =
532                extensions.get_mut::<DistributedTraceContext>()
533            {
534                distributed_tracing_context.end = Some(Instant::now());
535            }
536        }
537    }
538
539    // Collects span attributes and metadata in on_new_span
540    // Final initialization deferred to on_enter when OtelData is available
541    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
542        if let Some(span) = ctx.span(id) {
543            let mut trace_id: Option<String> = None;
544            let mut parent_id: Option<String> = None;
545            let mut span_id: Option<String> = None;
546            let mut x_request_id: Option<String> = None;
547            let mut x_dynamo_request_id: Option<String> = None;
548            let mut tracestate: Option<String> = None;
549            let mut visitor = FieldVisitor::default();
550            attrs.record(&mut visitor);
551
552            // Extract trace_id from span attributes
553            if let Some(trace_id_input) = visitor.fields.get("trace_id") {
554                if !is_valid_trace_id(trace_id_input) {
555                    tracing::trace!("trace id  '{}' is not valid! Ignoring.", trace_id_input);
556                } else {
557                    trace_id = Some(trace_id_input.to_string());
558                }
559            }
560
561            // Extract span_id from span attributes
562            if let Some(span_id_input) = visitor.fields.get("span_id") {
563                if !is_valid_span_id(span_id_input) {
564                    tracing::trace!("span id  '{}' is not valid! Ignoring.", span_id_input);
565                } else {
566                    span_id = Some(span_id_input.to_string());
567                }
568            }
569
570            // Extract parent_id from span attributes
571            if let Some(parent_id_input) = visitor.fields.get("parent_id") {
572                if !is_valid_span_id(parent_id_input) {
573                    tracing::trace!("parent id  '{}' is not valid! Ignoring.", parent_id_input);
574                } else {
575                    parent_id = Some(parent_id_input.to_string());
576                }
577            }
578
579            // Extract tracestate
580            if let Some(tracestate_input) = visitor.fields.get("tracestate") {
581                tracestate = Some(tracestate_input.to_string());
582            }
583
584            // Extract x_request_id
585            if let Some(x_request_id_input) = visitor.fields.get("x_request_id") {
586                x_request_id = Some(x_request_id_input.to_string());
587            }
588
589            // Extract x_dynamo_request_id
590            if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
591                x_dynamo_request_id = Some(x_request_id_input.to_string());
592            }
593
594            // Inherit trace context from parent span if available
595            if parent_id.is_none()
596                && let Some(parent_span_id) = ctx.current_span().id()
597                && let Some(parent_span) = ctx.span(parent_span_id)
598            {
599                let parent_ext = parent_span.extensions();
600                if let Some(parent_tracing_context) = parent_ext.get::<DistributedTraceContext>() {
601                    trace_id = Some(parent_tracing_context.trace_id.clone());
602                    parent_id = Some(parent_tracing_context.span_id.clone());
603                    tracestate = parent_tracing_context.tracestate.clone();
604                }
605            }
606
607            // Validate consistency
608            if (parent_id.is_some() || span_id.is_some()) && trace_id.is_none() {
609                tracing::error!("parent id or span id are set but trace id is not set!");
610                // Clear inconsistent IDs to maintain trace integrity
611                parent_id = None;
612                span_id = None;
613            }
614
615            // Store pending context - will be finalized in on_enter
616            let mut extensions = span.extensions_mut();
617            extensions.insert(PendingDistributedTraceContext {
618                trace_id,
619                span_id,
620                parent_id,
621                tracestate,
622                x_request_id,
623                x_dynamo_request_id,
624            });
625        }
626    }
627
628    // Finalizes the DistributedTraceContext when span is entered
629    // At this point, OtelData should have valid trace_id and span_id
630    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
631        if let Some(span) = ctx.span(id) {
632            // Check if already initialized (e.g., span re-entered)
633            {
634                let extensions = span.extensions();
635                if extensions.get::<DistributedTraceContext>().is_some() {
636                    return;
637                }
638            }
639
640            // Get the pending context and extract OtelData IDs
641            let mut extensions = span.extensions_mut();
642            let pending = match extensions.remove::<PendingDistributedTraceContext>() {
643                Some(p) => p,
644                None => {
645                    // This shouldn't happen - on_new_span should have created it
646                    tracing::error!("PendingDistributedTraceContext not found in on_enter");
647                    return;
648                }
649            };
650
651            let mut trace_id = pending.trace_id;
652            let mut span_id = pending.span_id;
653            let parent_id = pending.parent_id;
654            let tracestate = pending.tracestate;
655            let x_request_id = pending.x_request_id;
656            let x_dynamo_request_id = pending.x_dynamo_request_id;
657
658            // Try to extract from OtelData if not already set
659            // Need to drop extensions_mut to get immutable borrow for OtelData
660            drop(extensions);
661
662            if trace_id.is_none() || span_id.is_none() {
663                let extensions = span.extensions();
664                if let Some(otel_data) = extensions.get::<tracing_opentelemetry::OtelData>() {
665                    // Extract trace_id from OTEL data if not already set
666                    if trace_id.is_none()
667                        && let Some(otel_trace_id) = otel_data.trace_id()
668                    {
669                        let trace_id_str = format!("{}", otel_trace_id);
670                        if is_valid_trace_id(&trace_id_str) {
671                            trace_id = Some(trace_id_str);
672                        }
673                    }
674
675                    // Extract span_id from OTEL data if not already set
676                    if span_id.is_none()
677                        && let Some(otel_span_id) = otel_data.span_id()
678                    {
679                        let span_id_str = format!("{}", otel_span_id);
680                        if is_valid_span_id(&span_id_str) {
681                            span_id = Some(span_id_str);
682                        }
683                    }
684                }
685            }
686
687            // Panic if we still don't have required IDs
688            if trace_id.is_none() {
689                panic!(
690                    "trace_id is not set in on_enter - OtelData may not be properly initialized"
691                );
692            }
693
694            if span_id.is_none() {
695                panic!("span_id is not set in on_enter - OtelData may not be properly initialized");
696            }
697
698            // Re-acquire mutable borrow to insert the finalized context
699            let mut extensions = span.extensions_mut();
700            extensions.insert(DistributedTraceContext {
701                trace_id: trace_id.expect("Trace ID must be set"),
702                span_id: span_id.expect("Span ID must be set"),
703                parent_id,
704                tracestate,
705                start: Some(Instant::now()),
706                end: None,
707                x_request_id,
708                x_dynamo_request_id,
709            });
710        }
711    }
712}
713
714// Enables functions to retreive their current
715// context for adding to distributed headers
716pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
717    Span::current()
718        .with_subscriber(|(id, subscriber)| {
719            subscriber
720                .downcast_ref::<Registry>()
721                .and_then(|registry| registry.span_data(id))
722                .and_then(|span_data| {
723                    let extensions = span_data.extensions();
724                    extensions.get::<DistributedTraceContext>().cloned()
725                })
726        })
727        .flatten()
728}
729
730/// Initialize the logger - must be called when Tokio runtime is available
731pub fn init() {
732    INIT.call_once(|| {
733        if let Err(e) = setup_logging() {
734            eprintln!("Failed to initialize logging: {}", e);
735            std::process::exit(1);
736        }
737    });
738}
739
/// Install the global subscriber when the `tokio-console` feature is enabled.
///
/// Combines the compact stderr formatter, filtered by the logging config, with a
/// `console_subscriber` layer. That layer listens on all interfaces at the default console
/// port and passes `runtime` / `tokio` targets at TRACE and everything else at ERROR.
///
/// Returns a `Result` with the same signature as the default-build variant, because
/// `init` matches on `Err(..)`. Previously this returned `()`, and builds with the
/// feature enabled did not compile.
#[cfg(feature = "tokio-console")]
fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
    let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
        .with_default_env()
        .server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
        .spawn();
    let tokio_console_target = tracing_subscriber::filter::Targets::new()
        .with_default(LevelFilter::ERROR)
        .with_target("runtime", LevelFilter::TRACE)
        .with_target("tokio", LevelFilter::TRACE);
    let l = fmt::layer()
        .with_ansi(!disable_ansi_logging())
        .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
        .with_writer(std::io::stderr)
        .with_filter(filters(load_config()));
    tracing_subscriber::registry()
        .with(l)
        .with(tokio_console_layer.with_filter(tokio_console_target))
        .init();

    Ok(())
}
760
761#[cfg(not(feature = "tokio-console"))]
762fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
763    let fmt_filter_layer = filters(load_config());
764    let trace_filter_layer = filters(load_config());
765    let otel_filter_layer = filters(load_config());
766
767    if jsonl_logging_enabled() {
768        let l = fmt::layer()
769            .with_ansi(false)
770            .event_format(CustomJsonFormatter::new())
771            .with_writer(std::io::stderr)
772            .with_filter(fmt_filter_layer);
773
774        // Create OpenTelemetry tracer - conditionally export to OTLP based on env var
775        let service_name = get_service_name();
776
777        // Build tracer provider - with or without OTLP export
778        let (tracer_provider, endpoint_opt) = if otlp_exporter_enabled() {
779            // Export enabled: create OTLP exporter with batch processor
780            let endpoint = std::env::var(OTEL_EXPORT_ENDPOINT_ENV)
781                .unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string());
782
783            // Initialize OTLP exporter using gRPC (Tonic)
784            let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
785                .with_tonic()
786                .with_endpoint(&endpoint)
787                .build()?;
788
789            // Create tracer provider with batch exporter and service name
790            let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
791                .with_batch_exporter(otlp_exporter)
792                .with_resource(
793                    opentelemetry_sdk::Resource::builder_empty()
794                        .with_service_name(service_name.clone())
795                        .build(),
796                )
797                .build();
798
799            (provider, Some(endpoint))
800        } else {
801            // No export - traces generated locally only (for logging/trace IDs)
802            let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
803                .with_resource(
804                    opentelemetry_sdk::Resource::builder_empty()
805                        .with_service_name(service_name.clone())
806                        .build(),
807                )
808                .build();
809
810            (provider, None)
811        };
812
813        // Get a tracer from the provider
814        let tracer = tracer_provider.tracer(service_name.clone());
815
816        tracing_subscriber::registry()
817            .with(
818                tracing_opentelemetry::layer()
819                    .with_tracer(tracer)
820                    .with_filter(otel_filter_layer),
821            )
822            .with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
823            .with(l)
824            .init();
825
826        // Log initialization status after subscriber is ready
827        if let Some(endpoint) = endpoint_opt {
828            tracing::info!(
829                endpoint = %endpoint,
830                service = %service_name,
831                "OpenTelemetry OTLP export enabled"
832            );
833        } else {
834            tracing::info!(
835                service = %service_name,
836                "OpenTelemetry OTLP export disabled, traces local only"
837            );
838        }
839    } else {
840        let l = fmt::layer()
841            .with_ansi(!disable_ansi_logging())
842            .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
843            .with_writer(std::io::stderr)
844            .with_filter(fmt_filter_layer);
845
846        tracing_subscriber::registry().with(l).init();
847    }
848
849    Ok(())
850}
851
852fn filters(config: LoggingConfig) -> EnvFilter {
853    let mut filter_layer = EnvFilter::builder()
854        .with_default_directive(config.log_level.parse().unwrap())
855        .with_env_var(FILTER_ENV)
856        .from_env_lossy();
857
858    for (module, level) in config.log_filters {
859        match format!("{module}={level}").parse::<Directive>() {
860            Ok(d) => {
861                filter_layer = filter_layer.add_directive(d);
862            }
863            Err(e) => {
864                eprintln!("Failed parsing filter '{level}' for module '{module}': {e}");
865            }
866        }
867    }
868    filter_layer
869}
870
871/// Log a message with file and line info
872/// Used by Python wrapper
873pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) {
874    let level = match level {
875        "debug" => log::Level::Debug,
876        "info" => log::Level::Info,
877        "warn" => log::Level::Warn,
878        "error" => log::Level::Error,
879        "warning" => log::Level::Warn,
880        _ => log::Level::Info,
881    };
882    log::logger().log(
883        &log::Record::builder()
884            .args(format_args!("{}", message))
885            .level(level)
886            .target(module)
887            .file(Some(file))
888            .line(Some(line))
889            .build(),
890    );
891}
892
893fn load_config() -> LoggingConfig {
894    let config_path = std::env::var(CONFIG_PATH_ENV).unwrap_or_else(|_| "".to_string());
895    let figment = Figment::new()
896        .merge(Serialized::defaults(LoggingConfig::default()))
897        .merge(Toml::file("/opt/dynamo/etc/logging.toml"))
898        .merge(Toml::file(config_path));
899
900    figment.extract().unwrap()
901}
902
/// One JSONL log record, as built by `CustomJsonFormatter::format_event`.
///
/// serde writes the named fields in declaration order. `file` and `line` are omitted when the
/// event metadata does not provide them. The entries of `fields` are flattened into the
/// top-level JSON object after the named fields.
#[derive(Serialize)]
struct JsonLog<'a> {
    /// Timestamp string from `TimeFormatter::format_now`.
    time: String,
    /// Event level rendered with `Display` (e.g. `INFO`).
    level: String,
    /// Source file of the callsite, if the metadata has one.
    #[serde(skip_serializing_if = "Option::is_none")]
    file: Option<&'a str>,
    /// Source line of the callsite, if the metadata has one.
    #[serde(skip_serializing_if = "Option::is_none")]
    line: Option<u32>,
    /// Event target from the callsite metadata.
    target: &'a str,
    /// The event's `message` field (`""` when absent).
    /// Span lifecycle messages `new`/`close` are renamed to `SPAN_CREATED`/`SPAN_CLOSED`.
    message: serde_json::Value,
    /// Remaining event fields plus span-derived fields (span fields, span name, trace ids,
    /// timings), emitted as top-level keys in the `BTreeMap`'s sorted key order.
    #[serde(flatten)]
    fields: BTreeMap<String, serde_json::Value>,
}
916
917struct TimeFormatter {
918    use_local_tz: bool,
919}
920
921impl TimeFormatter {
922    fn new() -> Self {
923        Self {
924            use_local_tz: crate::config::use_local_timezone(),
925        }
926    }
927
928    fn format_now(&self) -> String {
929        if self.use_local_tz {
930            chrono::Local::now()
931                .format("%Y-%m-%dT%H:%M:%S%.6f%:z")
932                .to_string()
933        } else {
934            chrono::Utc::now()
935                .format("%Y-%m-%dT%H:%M:%S%.6fZ")
936                .to_string()
937        }
938    }
939}
940
941impl FormatTime for TimeFormatter {
942    fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
943        write!(w, "{}", self.format_now())
944    }
945}
946
947struct CustomJsonFormatter {
948    time_formatter: TimeFormatter,
949}
950
951impl CustomJsonFormatter {
952    fn new() -> Self {
953        Self {
954            time_formatter: TimeFormatter::new(),
955        }
956    }
957}
958
959use once_cell::sync::Lazy;
960use regex::Regex;
961fn parse_tracing_duration(s: &str) -> Option<u64> {
962    static RE: Lazy<Regex> =
963        Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
964    let captures = RE.captures(s)?;
965    let value: f64 = captures[1].parse().ok()?;
966    let unit = &captures[2];
967    match unit {
968        "ns" => Some((value / 1000.0) as u64),
969        "µs" | "us" => Some(value as u64),
970        "ms" => Some((value * 1000.0) as u64),
971        "s" => Some((value * 1_000_000.0) as u64),
972        _ => None,
973    }
974}
975
976impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
977where
978    S: Subscriber + for<'a> LookupSpan<'a>,
979    N: for<'a> FormatFields<'a> + 'static,
980{
981    fn format_event(
982        &self,
983        ctx: &FmtContext<'_, S, N>,
984        mut writer: Writer<'_>,
985        event: &Event<'_>,
986    ) -> std::fmt::Result {
987        let mut visitor = JsonVisitor::default();
988        let time = self.time_formatter.format_now();
989        event.record(&mut visitor);
990        let mut message = visitor
991            .fields
992            .remove("message")
993            .unwrap_or(serde_json::Value::String("".to_string()));
994
995        let current_span = event
996            .parent()
997            .and_then(|id| ctx.span(id))
998            .or_else(|| ctx.lookup_current());
999        if let Some(span) = current_span {
1000            let ext = span.extensions();
1001            let data = ext.get::<FormattedFields<N>>().unwrap();
1002            let span_fields: Vec<(&str, &str)> = data
1003                .fields
1004                .split(' ')
1005                .filter_map(|entry| entry.split_once('='))
1006                .collect();
1007            for (name, value) in span_fields {
1008                visitor.fields.insert(
1009                    name.to_string(),
1010                    serde_json::Value::String(value.trim_matches('"').to_string()),
1011                );
1012            }
1013
1014            let busy_us = visitor
1015                .fields
1016                .remove("time.busy")
1017                .and_then(|v| parse_tracing_duration(&v.to_string()));
1018            let idle_us = visitor
1019                .fields
1020                .remove("time.idle")
1021                .and_then(|v| parse_tracing_duration(&v.to_string()));
1022
1023            if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
1024                visitor.fields.insert(
1025                    "time.busy_us".to_string(),
1026                    serde_json::Value::Number(busy_us.into()),
1027                );
1028                visitor.fields.insert(
1029                    "time.idle_us".to_string(),
1030                    serde_json::Value::Number(idle_us.into()),
1031                );
1032                visitor.fields.insert(
1033                    "time.duration_us".to_string(),
1034                    serde_json::Value::Number((busy_us + idle_us).into()),
1035                );
1036            }
1037
1038            message = match message.as_str() {
1039                Some("new") => serde_json::Value::String("SPAN_CREATED".to_string()),
1040                Some("close") => serde_json::Value::String("SPAN_CLOSED".to_string()),
1041                _ => message.clone(),
1042            };
1043
1044            visitor.fields.insert(
1045                "span_name".to_string(),
1046                serde_json::Value::String(span.name().to_string()),
1047            );
1048
1049            if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
1050                visitor.fields.insert(
1051                    "span_id".to_string(),
1052                    serde_json::Value::String(tracing_context.span_id.clone()),
1053                );
1054                visitor.fields.insert(
1055                    "trace_id".to_string(),
1056                    serde_json::Value::String(tracing_context.trace_id.clone()),
1057                );
1058                if let Some(parent_id) = tracing_context.parent_id.clone() {
1059                    visitor.fields.insert(
1060                        "parent_id".to_string(),
1061                        serde_json::Value::String(parent_id),
1062                    );
1063                } else {
1064                    visitor.fields.remove("parent_id");
1065                }
1066                if let Some(tracestate) = tracing_context.tracestate.clone() {
1067                    visitor.fields.insert(
1068                        "tracestate".to_string(),
1069                        serde_json::Value::String(tracestate),
1070                    );
1071                } else {
1072                    visitor.fields.remove("tracestate");
1073                }
1074                if let Some(x_request_id) = tracing_context.x_request_id.clone() {
1075                    visitor.fields.insert(
1076                        "x_request_id".to_string(),
1077                        serde_json::Value::String(x_request_id),
1078                    );
1079                } else {
1080                    visitor.fields.remove("x_request_id");
1081                }
1082
1083                if let Some(x_dynamo_request_id) = tracing_context.x_dynamo_request_id.clone() {
1084                    visitor.fields.insert(
1085                        "x_dynamo_request_id".to_string(),
1086                        serde_json::Value::String(x_dynamo_request_id),
1087                    );
1088                } else {
1089                    visitor.fields.remove("x_dynamo_request_id");
1090                }
1091            } else {
1092                tracing::error!(
1093                    "Distributed Trace Context not found, falling back to internal ids"
1094                );
1095                visitor.fields.insert(
1096                    "span_id".to_string(),
1097                    serde_json::Value::String(span.id().into_u64().to_string()),
1098                );
1099                if let Some(parent) = span.parent() {
1100                    visitor.fields.insert(
1101                        "parent_id".to_string(),
1102                        serde_json::Value::String(parent.id().into_u64().to_string()),
1103                    );
1104                }
1105            }
1106        } else {
1107            let reserved_fields = [
1108                "trace_id",
1109                "span_id",
1110                "parent_id",
1111                "span_name",
1112                "tracestate",
1113            ];
1114            for reserved_field in reserved_fields {
1115                visitor.fields.remove(reserved_field);
1116            }
1117        }
1118        let metadata = event.metadata();
1119        let log = JsonLog {
1120            level: metadata.level().to_string(),
1121            time,
1122            file: metadata.file(),
1123            line: metadata.line(),
1124            target: metadata.target(),
1125            message,
1126            fields: visitor.fields,
1127        };
1128        let json = serde_json::to_string(&log).unwrap();
1129        writeln!(writer, "{json}")
1130    }
1131}
1132
1133#[derive(Default)]
1134struct JsonVisitor {
1135    fields: BTreeMap<String, serde_json::Value>,
1136}
1137
1138impl tracing::field::Visit for JsonVisitor {
1139    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1140        self.fields.insert(
1141            field.name().to_string(),
1142            serde_json::Value::String(format!("{value:?}")),
1143        );
1144    }
1145
1146    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1147        if field.name() != "message" {
1148            match serde_json::from_str::<Value>(value) {
1149                Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
1150                Err(_) => self.fields.insert(field.name().to_string(), value.into()),
1151            };
1152        } else {
1153            self.fields.insert(field.name().to_string(), value.into());
1154        }
1155    }
1156
1157    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
1158        self.fields
1159            .insert(field.name().to_string(), serde_json::Value::Bool(value));
1160    }
1161
1162    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
1163        self.fields.insert(
1164            field.name().to_string(),
1165            serde_json::Value::Number(value.into()),
1166        );
1167    }
1168
1169    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
1170        self.fields.insert(
1171            field.name().to_string(),
1172            serde_json::Value::Number(value.into()),
1173        );
1174    }
1175
1176    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
1177        use serde_json::value::Number;
1178        self.fields.insert(
1179            field.name().to_string(),
1180            serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
1181        );
1182    }
1183}
1184
#[cfg(test)]
pub mod tests {
    use super::*;
    use anyhow::{Result, anyhow};
    use chrono::{DateTime, Utc};
    use jsonschema::{Draft, JSONSchema};
    use serde_json::Value;
    use std::fs::File;
    use std::io::{BufRead, BufReader};
    use stdio_override::*;
    use tempfile::NamedTempFile;

    /// JSON Schema (draft-07) that every emitted JSONL log line must satisfy.
    static LOG_LINE_SCHEMA: &str = r#"
    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "Runtime Log Line",
      "type": "object",
      "required": [
        "file",
        "level",
        "line",
        "message",
        "target",
        "time"
      ],
      "properties": {
        "file":      { "type": "string" },
        "level":     { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
        "line":      { "type": "integer" },
        "message":   { "type": "string" },
        "target":    { "type": "string" },
        "time":      { "type": "string", "format": "date-time" },
        "span_id":   { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "trace_id":  { "type": "string", "pattern": "^[a-f0-9]{32}$" },
        "span_name": { "type": "string" },
        "time.busy_us":     { "type": "integer" },
        "time.duration_us": { "type": "integer" },
        "time.idle_us":     { "type": "integer" },
        "tracestate": { "type": "string" }
      },
      "additionalProperties": true
    }
    "#;

    /// Outermost instrumented span; logs its trace id and calls `child`.
    #[tracing::instrument(skip_all)]
    async fn parent() {
        tracing::trace!(message = "parent!");
        if let Some(my_ctx) = get_distributed_tracing_context() {
            tracing::info!(my_trace_id = my_ctx.trace_id);
        }
        child().await;
    }

    /// Middle instrumented span; logs its trace id and calls `grandchild`.
    #[tracing::instrument(skip_all)]
    async fn child() {
        tracing::trace!(message = "child");
        if let Some(my_ctx) = get_distributed_tracing_context() {
            tracing::info!(my_trace_id = my_ctx.trace_id);
        }
        grandchild().await;
    }

    /// Innermost instrumented span; logs its trace id.
    #[tracing::instrument(skip_all)]
    async fn grandchild() {
        tracing::trace!(message = "grandchild");
        if let Some(my_ctx) = get_distributed_tracing_context() {
            tracing::info!(my_trace_id = my_ctx.trace_id);
        }
    }

    /// Reads `file_name` as JSONL.
    ///
    /// Each line is parsed and validated against [`LOG_LINE_SCHEMA`]. Returns the parsed lines,
    /// or an error naming the first line that is invalid JSON or fails the schema.
    pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
        let schema_json: Value =
            serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
        let compiled_schema = JSONSchema::options()
            .with_draft(Draft::Draft7)
            .compile(&schema_json)
            .expect("Invalid schema");

        let f = File::open(file_name)?;
        let reader = BufReader::new(f);
        let mut result = Vec::new();

        for (line_num, line) in reader.lines().enumerate() {
            let line = line?;
            let val: Value = serde_json::from_str(&line)
                .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_num + 1, e))?;

            if let Err(errors) = compiled_schema.validate(&val) {
                let errs = errors.map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
                return Err(anyhow!(
                    "Line {}: JSON Schema Validation errors: {}",
                    line_num + 1,
                    errs
                ));
            }
            println!("{}", val);
            result.push(val);
        }
        Ok(result)
    }

    /// Returns the `span_name` of a log line, if it has one.
    fn span_name_of(line: &Value) -> Option<&str> {
        line.get("span_name").and_then(Value::as_str)
    }

    /// Returns the `span_id` of the first log line emitted inside the span named `name`.
    fn first_span_id(lines: &[Value], name: &str) -> Option<String> {
        lines
            .iter()
            .find(|line| span_name_of(line) == Some(name))
            .and_then(|line| line.get("span_id").and_then(Value::as_str))
            .map(str::to_string)
    }

    /// Asserts that every log line emitted inside the span named `name` has `expected` as its
    /// `parent_id`. `None` means the field must be absent.
    fn assert_parent_id(lines: &[Value], name: &str, expected: Option<&str>) {
        let expected = expected.map(Value::from);
        for line in lines.iter().filter(|line| span_name_of(line) == Some(name)) {
            assert_eq!(
                line.get("parent_id"),
                expected.as_ref(),
                "unexpected parent_id for span '{name}' in log line {line}"
            );
        }
    }

    #[tokio::test]
    async fn test_json_log_capture() -> Result<()> {
        // The inner future's `Result` is returned (not discarded), so I/O, JSON-parse and
        // schema-validation errors fail the test.
        temp_env::async_with_vars([("DYN_LOGGING_JSONL", Some("1"))], async {
            let tmp_file = NamedTempFile::new()?;
            let file_name = tmp_file
                .path()
                .to_str()
                .ok_or_else(|| anyhow!("temporary file path is not valid UTF-8"))?;
            let guard = StderrOverride::from_file(file_name)?;
            init();
            parent().await;
            drop(guard);

            let lines = load_log(file_name)?;

            // 1. All lines in the trace share one non-zero trace_id, and the `my_trace_id`
            //    logged from inside each span matches it. Lines without a trace_id
            //    (e.g. initialization messages) are skipped.
            let trace_id = lines
                .iter()
                .find_map(|line| line.get("trace_id").and_then(Value::as_str))
                .expect("At least one log line should have a trace_id")
                .to_string();
            assert!(
                !trace_id.chars().all(|c| c == '0'),
                "trace_id should not be all zeros"
            );
            for line in &lines {
                if let Some(line_trace_id) = line.get("trace_id") {
                    assert_eq!(
                        line_trace_id.as_str(),
                        Some(trace_id.as_str()),
                        "All logs should have the same trace_id"
                    );
                }
                if let Some(my_trace_id) = line.get("my_trace_id") {
                    assert_eq!(
                        my_trace_id.as_str(),
                        Some(trace_id.as_str()),
                        "my_trace_id should match the trace_id from distributed tracing context"
                    );
                }
            }

            // 2. Span ids are well formed and timestamps are valid RFC 3339.
            //    Record when each span *first* logged; later lines of an outer span (after a
            //    child returns) must not overwrite that.
            let mut first_log_time: std::collections::HashMap<String, DateTime<Utc>> =
                std::collections::HashMap::new();
            for line in &lines {
                if let Some(span_id) = line.get("span_id").and_then(Value::as_str) {
                    assert!(is_valid_span_id(span_id), "Invalid span_id format: {span_id}");
                }
                if let Some(time_str) = line.get("time").and_then(Value::as_str) {
                    let timestamp = DateTime::parse_from_rfc3339(time_str)
                        .expect("All timestamps should be valid RFC3339 format")
                        .with_timezone(&Utc);
                    if let Some(name) = span_name_of(line) {
                        first_log_time.entry(name.to_string()).or_insert(timestamp);
                    }
                }
            }

            // 3. Parent-child span relationships.
            let parent_span_id =
                first_span_id(&lines, "parent").expect("Should find parent span with span_id");
            let child_span_id =
                first_span_id(&lines, "child").expect("Should find child span with span_id");
            let grandchild_span_id = first_span_id(&lines, "grandchild")
                .expect("Should find grandchild span with span_id");

            assert_ne!(
                parent_span_id, child_span_id,
                "Parent and child should have different span IDs"
            );
            assert_ne!(
                child_span_id, grandchild_span_id,
                "Child and grandchild should have different span IDs"
            );
            assert_ne!(
                parent_span_id, grandchild_span_id,
                "Parent and grandchild should have different span IDs"
            );

            assert_parent_id(&lines, "parent", None);
            assert_parent_id(&lines, "child", Some(parent_span_id.as_str()));
            assert_parent_id(&lines, "grandchild", Some(child_span_id.as_str()));

            // 4. Each span first logs no earlier than its parent did.
            let parent_time = first_log_time
                .get("parent")
                .expect("Should have timestamp for parent span");
            let child_time = first_log_time
                .get("child")
                .expect("Should have timestamp for child span");
            let grandchild_time = first_log_time
                .get("grandchild")
                .expect("Should have timestamp for grandchild span");

            assert!(
                parent_time <= child_time,
                "Parent span should log before or at same time as child span (parent: {}, child: {})",
                parent_time,
                child_time
            );
            assert!(
                child_time <= grandchild_time,
                "Child span should log before or at same time as grandchild span (child: {}, grandchild: {})",
                child_time,
                grandchild_time
            );

            Ok::<(), anyhow::Error>(())
        })
        .await
    }
}