dynamo_runtime/
logging.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Dynamo Distributed Logging Module.
5//!
6//! - Configuration loaded from:
7//!   1. Environment variables (highest priority).
8//!   2. Optional TOML file pointed to by the `DYN_LOGGING_CONFIG_PATH` environment variable.
9//!   3. `/opt/dynamo/etc/logging.toml`.
10//!
11//! Logging can take two forms: `READABLE` or `JSONL`. The default is `READABLE`. `JSONL`
12//! can be enabled by setting the `DYN_LOGGING_JSONL` environment variable to `1`.
13//!
14//! To use local timezone for logging timestamps, set the `DYN_LOG_USE_LOCAL_TZ` environment variable to `1`.
15//!
16//! Filters can be configured using the `DYN_LOG` environment variable or by setting the `log_filters`
17//! table in the TOML configuration file. In `DYN_LOG`, filters are comma-separated `key=value` pairs where
18//! the key is the crate or module name and the value is the log level. The default log level is `info`.
19//!
20//! Example:
21//! ```toml
22//! log_level = "error"
23//!
24//! [log_filters]
25//! "test_logging" = "info"
26//! "test_logging::api" = "trace"
27//! ```
28
29use std::collections::{BTreeMap, HashMap};
30use std::sync::Once;
31
32use figment::{
33    Figment,
34    providers::{Format, Serialized, Toml},
35};
36use serde::{Deserialize, Serialize};
37use tracing::level_filters::LevelFilter;
38use tracing::{Event, Subscriber};
39use tracing_subscriber::EnvFilter;
40use tracing_subscriber::fmt::time::FormatTime;
41use tracing_subscriber::fmt::time::LocalTime;
42use tracing_subscriber::fmt::time::SystemTime;
43use tracing_subscriber::fmt::time::UtcTime;
44use tracing_subscriber::fmt::{FmtContext, FormatFields};
45use tracing_subscriber::fmt::{FormattedFields, format::Writer};
46use tracing_subscriber::prelude::*;
47use tracing_subscriber::registry::LookupSpan;
48use tracing_subscriber::{filter::Directive, fmt};
49
50use crate::config::{disable_ansi_logging, jsonl_logging_enabled};
51use async_nats::{HeaderMap, HeaderValue};
52use axum::extract::FromRequestParts;
53use axum::http;
54use axum::http::Request;
55use axum::http::request::Parts;
56use serde_json::Value;
57use std::convert::Infallible;
58use std::time::Instant;
59use tower_http::trace::{DefaultMakeSpan, TraceLayer};
60use tracing::Id;
61use tracing::Span;
62use tracing::field::Field;
63use tracing::span;
64use tracing_subscriber::Layer;
65use tracing_subscriber::Registry;
66use tracing_subscriber::field::Visit;
67use tracing_subscriber::fmt::format::FmtSpan;
68use tracing_subscriber::layer::Context;
69use tracing_subscriber::registry::SpanData;
70use uuid::Uuid;
71
72use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
73use opentelemetry::trace::TraceContextExt;
74use opentelemetry::{global, trace::Tracer};
75use opentelemetry_otlp::WithExportConfig;
76
77use opentelemetry::trace::TracerProvider as _;
78use opentelemetry::{Key, KeyValue};
79use opentelemetry_sdk::Resource;
80use opentelemetry_sdk::trace::SdkTracerProvider;
81use tracing::error;
82use tracing_subscriber::layer::SubscriberExt;
83// use tracing_subscriber::Registry;
84
85use std::time::Duration;
86use tracing::{info, instrument};
87use tracing_opentelemetry::OpenTelemetrySpanExt;
88use tracing_subscriber::util::SubscriberInitExt;
89
90/// Environment variable read by `filters` for log filter directives
91const FILTER_ENV: &str = "DYN_LOG";
92
93/// Log level used by `LoggingConfig::default`
94const DEFAULT_FILTER_LEVEL: &str = "info";
95
96/// Environment variable holding the path to an optional TOML logging configuration file
97const CONFIG_PATH_ENV: &str = "DYN_LOGGING_CONFIG_PATH";
98
99/// Environment variable that enables OTLP trace exporting when set to `1`
100const OTEL_EXPORT_ENABLED_ENV: &str = "OTEL_EXPORT_ENABLED";
101
102/// Environment variable holding the OTLP exporter endpoint
103const OTEL_EXPORT_ENDPOINT_ENV: &str = "OTEL_EXPORT_ENDPOINT";
104
105/// OTLP endpoint used when `OTEL_EXPORT_ENDPOINT` is not set
106const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
107
108/// Environment variable holding the OpenTelemetry service name
109const OTEL_SERVICE_NAME_ENV: &str = "OTEL_SERVICE_NAME";
110
111/// Service name used when `OTEL_SERVICE_NAME` is not set
112const DEFAULT_OTEL_SERVICE_NAME: &str = "dynamo";
113
114/// Once instance to ensure the logger is only initialized once
115static INIT: Once = Once::new();
116
/// Logging configuration assembled by `load_config` and consumed by `filters`.
117#[derive(Serialize, Deserialize, Debug)]
118struct LoggingConfig {
    /// Default directive, parsed by `filters` and used as the filter's default.
119    log_level: String,
    /// Per-target overrides; each entry is turned into a `target=level` directive.
120    log_filters: HashMap<String, String>,
121}
122impl Default for LoggingConfig {
123    fn default() -> Self {
124        LoggingConfig {
125            log_level: DEFAULT_FILTER_LEVEL.to_string(),
126            log_filters: HashMap::from([
127                ("h2".to_string(), "error".to_string()),
128                ("tower".to_string(), "error".to_string()),
129                ("hyper_util".to_string(), "error".to_string()),
130                ("neli".to_string(), "error".to_string()),
131                ("async_nats".to_string(), "error".to_string()),
132                ("rustls".to_string(), "error".to_string()),
133                ("tokenizers".to_string(), "error".to_string()),
134                ("axum".to_string(), "error".to_string()),
135                ("tonic".to_string(), "error".to_string()),
136                ("mistralrs_core".to_string(), "error".to_string()),
137                ("hf_hub".to_string(), "error".to_string()),
138                ("opentelemetry".to_string(), "error".to_string()),
139                ("opentelemetry-otlp".to_string(), "error".to_string()),
140                ("opentelemetry_sdk".to_string(), "error".to_string()),
141            ]),
142        }
143    }
144}
145
146/// Check if OTLP trace exporting is enabled (set OTEL_EXPORT_ENABLED=1 to enable)
147fn otlp_exporter_enabled() -> bool {
148    std::env::var(OTEL_EXPORT_ENABLED_ENV)
149        .map(|v| v == "1")
150        .unwrap_or(false)
151}
152
153/// Get the service name from environment or use default
154fn get_service_name() -> String {
155    std::env::var(OTEL_SERVICE_NAME_ENV).unwrap_or_else(|_| DEFAULT_OTEL_SERVICE_NAME.to_string())
156}
157
/// Validate a trace ID according to the W3C Trace Context specification.
///
/// A valid trace ID is exactly 32 lowercase hexadecimal characters
/// (`0-9`, `a-f`) and is not the all-zero value, which the specification
/// reserves as invalid.
///
/// Returns `true` when `trace_id` satisfies all of these rules.
pub fn is_valid_trace_id(trace_id: &str) -> bool {
    trace_id.len() == 32
        // Only lowercase hex digits are allowed; uppercase is rejected.
        && trace_id
            .bytes()
            .all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
        // At least one non-zero digit: the all-zero ID is invalid.
        && trace_id.bytes().any(|b| b != b'0')
}
163
/// Validate a span ID according to the W3C Trace Context specification.
///
/// A valid span ID is exactly 16 lowercase hexadecimal characters
/// (`0-9`, `a-f`) and is not the all-zero value, which the specification
/// reserves as invalid.
///
/// Returns `true` when `span_id` satisfies all of these rules.
pub fn is_valid_span_id(span_id: &str) -> bool {
    span_id.len() == 16
        // Only lowercase hex digits are allowed; uppercase is rejected.
        && span_id
            .bytes()
            .all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
        // At least one non-zero digit: the all-zero ID is invalid.
        && span_id.bytes().any(|b| b != b'0')
}
169
/// Tracing layer that stores a `DistributedTraceContext` in each span's extensions.
170pub struct DistributedTraceIdLayer;
171
/// Finalized distributed-tracing identifiers for a span.
///
/// Inserted into the span's extensions by `DistributedTraceIdLayer::on_enter`.
172#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct DistributedTraceContext {
    /// Trace ID; used as the second field of the `traceparent` string.
174    pub trace_id: String,
    /// ID of this span; used as the third field of the `traceparent` string.
175    pub span_id: String,
    /// ID of the parent span, if one was supplied or inherited.
176    #[serde(skip_serializing_if = "Option::is_none")]
177    pub parent_id: Option<String>,
    /// Raw `tracestate` value, if any.
178    #[serde(skip_serializing_if = "Option::is_none")]
179    pub tracestate: Option<String>,
    /// Set to `Instant::now()` when the context is created in `on_enter`; never serialized.
180    #[serde(skip)]
181    start: Option<Instant>,
    /// Set to `Instant::now()` in `on_close`; never serialized.
182    #[serde(skip)]
183    end: Option<Instant>,
    /// Value of the `x_request_id` span field, if present.
184    #[serde(skip_serializing_if = "Option::is_none")]
185    pub x_request_id: Option<String>,
    /// Value of the `x_dynamo_request_id` span field, if present.
186    #[serde(skip_serializing_if = "Option::is_none")]
187    pub x_dynamo_request_id: Option<String>,
188}
189
190/// Pending context data collected in on_new_span, to be finalized in on_enter
///
/// Unlike `DistributedTraceContext`, `trace_id` and `span_id` are optional here:
/// `on_enter` fills missing ones from `OtelData` before building the final context.
191#[derive(Debug, Clone)]
192struct PendingDistributedTraceContext {
193    trace_id: Option<String>,
194    span_id: Option<String>,
195    parent_id: Option<String>,
196    tracestate: Option<String>,
197    x_request_id: Option<String>,
198    x_dynamo_request_id: Option<String>,
199}
200
201impl DistributedTraceContext {
202    /// Create a traceparent string from the context
203    pub fn create_traceparent(&self) -> String {
204        format!("00-{}-{}-01", self.trace_id, self.span_id)
205    }
206}
207
208/// Parse a traceparent string into its components
209pub fn parse_traceparent(traceparent: &str) -> (Option<String>, Option<String>) {
210    let pieces: Vec<_> = traceparent.split('-').collect();
211    if pieces.len() != 4 {
212        return (None, None);
213    }
214    let trace_id = pieces[1];
215    let parent_id = pieces[2];
216
217    if !is_valid_trace_id(trace_id) || !is_valid_span_id(parent_id) {
218        return (None, None);
219    }
220
221    (Some(trace_id.to_string()), Some(parent_id.to_string()))
222}
223
/// Tracing-related values read from request headers by `TraceParent::from_headers`.
///
/// Every field is `None` when the corresponding header is absent or rejected.
224#[derive(Debug, Clone, Default)]
225pub struct TraceParent {
    /// Trace ID parsed from the `traceparent` header.
226    pub trace_id: Option<String>,
    /// Parent span ID parsed from the `traceparent` header.
227    pub parent_id: Option<String>,
    /// Raw value of the `tracestate` header.
228    pub tracestate: Option<String>,
    /// Raw value of the `x-request-id` header.
229    pub x_request_id: Option<String>,
    /// Value of the `x-dynamo-request-id` header; kept only if it parses as a UUID.
230    pub x_dynamo_request_id: Option<String>,
231}
232
/// Minimal read-only view over a header map, so `TraceParent::from_headers`
/// can accept both NATS and HTTP headers.
233pub trait GenericHeaders {
    /// Return the value stored under `key` as a string slice, if available.
234    fn get(&self, key: &str) -> Option<&str>;
235}
236
237impl GenericHeaders for async_nats::HeaderMap {
238    fn get(&self, key: &str) -> Option<&str> {
239        async_nats::HeaderMap::get(self, key).map(|value| value.as_str())
240    }
241}
242
243impl GenericHeaders for http::HeaderMap {
244    fn get(&self, key: &str) -> Option<&str> {
245        http::HeaderMap::get(self, key).and_then(|value| value.to_str().ok())
246    }
247}
248
249impl TraceParent {
250    pub fn from_headers<H: GenericHeaders>(headers: &H) -> TraceParent {
251        let mut trace_id = None;
252        let mut parent_id = None;
253        let mut tracestate = None;
254        let mut x_request_id = None;
255        let mut x_dynamo_request_id = None;
256
257        if let Some(header_value) = headers.get("traceparent") {
258            (trace_id, parent_id) = parse_traceparent(header_value);
259        }
260
261        if let Some(header_value) = headers.get("x-request-id") {
262            x_request_id = Some(header_value.to_string());
263        }
264
265        if let Some(header_value) = headers.get("tracestate") {
266            tracestate = Some(header_value.to_string());
267        }
268
269        if let Some(header_value) = headers.get("x-dynamo-request-id") {
270            x_dynamo_request_id = Some(header_value.to_string());
271        }
272
273        // Validate UUID format
274        let x_dynamo_request_id =
275            x_dynamo_request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
276        TraceParent {
277            trace_id,
278            parent_id,
279            tracestate,
280            x_request_id,
281            x_dynamo_request_id,
282        }
283    }
284}
285
286// Takes Axum request and returning a span
287pub fn make_request_span<B>(req: &Request<B>) -> Span {
288    let method = req.method();
289    let uri = req.uri();
290    let version = format!("{:?}", req.version());
291    let trace_parent = TraceParent::from_headers(req.headers());
292
293    let span = tracing::info_span!(
294        "http-request",
295        method = %method,
296        uri = %uri,
297        version = %version,
298        trace_id = trace_parent.trace_id,
299        parent_id = trace_parent.parent_id,
300        x_request_id = trace_parent.x_request_id,
301    x_dynamo_request_id = trace_parent.x_dynamo_request_id,
302    );
303
304    span
305}
306
307/// Create a handle_payload span from NATS headers with component context
308pub fn make_handle_payload_span(
309    headers: &async_nats::HeaderMap,
310    component: &str,
311    endpoint: &str,
312    namespace: &str,
313    instance_id: i64,
314) -> Span {
315    let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_nats_headers(headers);
316    let trace_parent = TraceParent::from_headers(headers);
317
318    if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
319        let span = tracing::info_span!(
320            "handle_payload",
321            trace_id = trace_id.as_str(),
322            parent_id = parent_id.as_str(),
323            x_request_id = trace_parent.x_request_id,
324            x_dynamo_request_id = trace_parent.x_dynamo_request_id,
325            tracestate = trace_parent.tracestate,
326            component = component,
327            endpoint = endpoint,
328            namespace = namespace,
329            instance_id = instance_id,
330        );
331
332        if let Some(context) = otel_context {
333            let _ = span.set_parent(context);
334        }
335        span
336    } else {
337        tracing::info_span!(
338            "handle_payload",
339            x_request_id = trace_parent.x_request_id,
340            x_dynamo_request_id = trace_parent.x_dynamo_request_id,
341            tracestate = trace_parent.tracestate,
342            component = component,
343            endpoint = endpoint,
344            namespace = namespace,
345            instance_id = instance_id,
346        )
347    }
348}
349
350/// Extract OpenTelemetry trace context from NATS headers for distributed tracing
351pub fn extract_otel_context_from_nats_headers(
352    headers: &async_nats::HeaderMap,
353) -> (
354    Option<opentelemetry::Context>,
355    Option<String>,
356    Option<String>,
357) {
358    let traceparent_value = match headers.get("traceparent") {
359        Some(value) => value.as_str(),
360        None => return (None, None, None),
361    };
362
363    let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
364
365    struct NatsHeaderExtractor<'a>(&'a async_nats::HeaderMap);
366
367    impl<'a> Extractor for NatsHeaderExtractor<'a> {
368        fn get(&self, key: &str) -> Option<&str> {
369            self.0.get(key).map(|value| value.as_str())
370        }
371
372        fn keys(&self) -> Vec<&str> {
373            vec!["traceparent", "tracestate"]
374                .into_iter()
375                .filter(|&key| self.0.get(key).is_some())
376                .collect()
377        }
378    }
379
380    let extractor = NatsHeaderExtractor(headers);
381    let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
382    let otel_context = propagator.extract(&extractor);
383
384    let context_with_trace = if otel_context.span().span_context().is_valid() {
385        Some(otel_context)
386    } else {
387        None
388    };
389
390    (context_with_trace, trace_id, parent_span_id)
391}
392
393/// Inject OpenTelemetry trace context into NATS headers using W3C Trace Context propagation
394pub fn inject_otel_context_into_nats_headers(
395    headers: &mut async_nats::HeaderMap,
396    context: Option<opentelemetry::Context>,
397) {
398    let otel_context = context.unwrap_or_else(|| Span::current().context());
399
400    struct NatsHeaderInjector<'a>(&'a mut async_nats::HeaderMap);
401
402    impl<'a> Injector for NatsHeaderInjector<'a> {
403        fn set(&mut self, key: &str, value: String) {
404            self.0.insert(key, value);
405        }
406    }
407
408    let mut injector = NatsHeaderInjector(headers);
409    let propagator = opentelemetry_sdk::propagation::TraceContextPropagator::new();
410    propagator.inject_context(&otel_context, &mut injector);
411}
412
413/// Inject trace context from current span into NATS headers
414pub fn inject_current_trace_into_nats_headers(headers: &mut async_nats::HeaderMap) {
415    inject_otel_context_into_nats_headers(headers, None);
416}
417
418/// Create a client_request span linked to the parent trace context
419pub fn make_client_request_span(
420    operation: &str,
421    request_id: &str,
422    trace_context: Option<&DistributedTraceContext>,
423    instance_id: Option<&str>,
424) -> Span {
425    if let Some(ctx) = trace_context {
426        let mut headers = async_nats::HeaderMap::new();
427        headers.insert("traceparent", ctx.create_traceparent());
428
429        if let Some(ref tracestate) = ctx.tracestate {
430            headers.insert("tracestate", tracestate.as_str());
431        }
432
433        let (otel_context, _extracted_trace_id, _extracted_parent_span_id) =
434            extract_otel_context_from_nats_headers(&headers);
435
436        let span = if let Some(inst_id) = instance_id {
437            tracing::info_span!(
438                "client_request",
439                operation = operation,
440                request_id = request_id,
441                instance_id = inst_id,
442                trace_id = ctx.trace_id.as_str(),
443                parent_id = ctx.span_id.as_str(),
444                x_request_id = ctx.x_request_id.as_deref(),
445                x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
446                // tracestate = ctx.tracestate.as_deref(),
447            )
448        } else {
449            tracing::info_span!(
450                "client_request",
451                operation = operation,
452                request_id = request_id,
453                trace_id = ctx.trace_id.as_str(),
454                parent_id = ctx.span_id.as_str(),
455                x_request_id = ctx.x_request_id.as_deref(),
456                x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
457                // tracestate = ctx.tracestate.as_deref(),
458            )
459        };
460
461        if let Some(context) = otel_context {
462            let _ = span.set_parent(context);
463        }
464
465        span
466    } else if let Some(inst_id) = instance_id {
467        tracing::info_span!(
468            "client_request",
469            operation = operation,
470            request_id = request_id,
471            instance_id = inst_id,
472        )
473    } else {
474        tracing::info_span!(
475            "client_request",
476            operation = operation,
477            request_id = request_id,
478        )
479    }
480}
481
/// Visitor that collects recorded span or event fields as strings.
482#[derive(Debug, Default)]
483pub struct FieldVisitor {
    /// Field name mapped to its recorded value, rendered as a string.
484    pub fields: HashMap<String, String>,
485}
486
487impl Visit for FieldVisitor {
488    fn record_str(&mut self, field: &Field, value: &str) {
489        self.fields
490            .insert(field.name().to_string(), value.to_string());
491    }
492
493    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
494        self.fields
495            .insert(field.name().to_string(), format!("{:?}", value).to_string());
496    }
497}
498
/// Layer callbacks that build a `DistributedTraceContext` for each span in two steps:
/// `on_new_span` stores a `PendingDistributedTraceContext` built from the span's
/// attributes (and the current span's context), and `on_enter` turns it into the
/// final `DistributedTraceContext`, filling missing IDs from `OtelData`.
499impl<S> Layer<S> for DistributedTraceIdLayer
500where
501    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
502{
503    // Capture close span time
504    // Currently not used but added for future use in timing
505    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
506        if let Some(span) = ctx.span(&id) {
507            let mut extensions = span.extensions_mut();
508            if let Some(distributed_tracing_context) =
509                extensions.get_mut::<DistributedTraceContext>()
510            {
511                distributed_tracing_context.end = Some(Instant::now());
512            }
513        }
514    }
515
516    // Collects span attributes and metadata in on_new_span
517    // Final initialization deferred to on_enter when OtelData is available
518    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
519        if let Some(span) = ctx.span(id) {
520            let mut trace_id: Option<String> = None;
521            let mut parent_id: Option<String> = None;
522            let mut span_id: Option<String> = None;
523            let mut x_request_id: Option<String> = None;
524            let mut x_dynamo_request_id: Option<String> = None;
525            let mut tracestate: Option<String> = None;
            // Render every recorded attribute to a string so it can be looked up by name.
526            let mut visitor = FieldVisitor::default();
527            attrs.record(&mut visitor);
528
529            // Extract trace_id from span attributes
530            if let Some(trace_id_input) = visitor.fields.get("trace_id") {
531                if !is_valid_trace_id(trace_id_input) {
532                    tracing::trace!("trace id  '{}' is not valid! Ignoring.", trace_id_input);
533                } else {
534                    trace_id = Some(trace_id_input.to_string());
535                }
536            }
537
538            // Extract span_id from span attributes
539            if let Some(span_id_input) = visitor.fields.get("span_id") {
540                if !is_valid_span_id(span_id_input) {
541                    tracing::trace!("span id  '{}' is not valid! Ignoring.", span_id_input);
542                } else {
543                    span_id = Some(span_id_input.to_string());
544                }
545            }
546
547            // Extract parent_id from span attributes
548            if let Some(parent_id_input) = visitor.fields.get("parent_id") {
549                if !is_valid_span_id(parent_id_input) {
550                    tracing::trace!("parent id  '{}' is not valid! Ignoring.", parent_id_input);
551                } else {
552                    parent_id = Some(parent_id_input.to_string());
553                }
554            }
555
556            // Extract tracestate
557            if let Some(tracestate_input) = visitor.fields.get("tracestate") {
558                tracestate = Some(tracestate_input.to_string());
559            }
560
561            // Extract x_request_id
562            if let Some(x_request_id_input) = visitor.fields.get("x_request_id") {
563                x_request_id = Some(x_request_id_input.to_string());
564            }
565
566            // Extract x_dynamo_request_id
567            if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
568                x_dynamo_request_id = Some(x_request_id_input.to_string());
569            }
570
571            // Inherit trace context from parent span if available
            // Only attempted when no valid parent_id attribute was given. When the current
            // span carries a context, trace_id and tracestate from the attributes are
            // overwritten; x_request_id and x_dynamo_request_id are not inherited.
572            if parent_id.is_none()
573                && let Some(parent_span_id) = ctx.current_span().id()
574                && let Some(parent_span) = ctx.span(parent_span_id)
575            {
576                let parent_ext = parent_span.extensions();
577                if let Some(parent_tracing_context) = parent_ext.get::<DistributedTraceContext>() {
578                    trace_id = Some(parent_tracing_context.trace_id.clone());
579                    parent_id = Some(parent_tracing_context.span_id.clone());
580                    tracestate = parent_tracing_context.tracestate.clone();
581                }
582            }
583
584            // Validate consistency
585            if (parent_id.is_some() || span_id.is_some()) && trace_id.is_none() {
586                tracing::error!("parent id or span id are set but trace id is not set!");
587                // Clear inconsistent IDs to maintain trace integrity
588                parent_id = None;
589                span_id = None;
590            }
591
592            // Store pending context - will be finalized in on_enter
593            let mut extensions = span.extensions_mut();
594            extensions.insert(PendingDistributedTraceContext {
595                trace_id,
596                span_id,
597                parent_id,
598                tracestate,
599                x_request_id,
600                x_dynamo_request_id,
601            });
602        }
603    }
604
605    // Finalizes the DistributedTraceContext when span is entered
606    // At this point, OtelData should have valid trace_id and span_id
607    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
608        if let Some(span) = ctx.span(id) {
609            // Check if already initialized (e.g., span re-entered)
            // The read guard is scoped to this block so it is released before
            // `extensions_mut()` is called below.
610            {
611                let extensions = span.extensions();
612                if extensions.get::<DistributedTraceContext>().is_some() {
613                    return;
614                }
615            }
616
617            // Get the pending context and extract OtelData IDs
618            let mut extensions = span.extensions_mut();
619            let pending = match extensions.remove::<PendingDistributedTraceContext>() {
620                Some(p) => p,
621                None => {
622                    // This shouldn't happen - on_new_span should have created it
623                    tracing::error!("PendingDistributedTraceContext not found in on_enter");
624                    return;
625                }
626            };
627
628            let mut trace_id = pending.trace_id;
629            let mut span_id = pending.span_id;
630            let parent_id = pending.parent_id;
631            let tracestate = pending.tracestate;
632            let x_request_id = pending.x_request_id;
633            let x_dynamo_request_id = pending.x_dynamo_request_id;
634
635            // Try to extract from OtelData if not already set
636            // Need to drop extensions_mut to get immutable borrow for OtelData
637            drop(extensions);
638
639            if trace_id.is_none() || span_id.is_none() {
640                let extensions = span.extensions();
641                if let Some(otel_data) = extensions.get::<tracing_opentelemetry::OtelData>() {
642                    // Extract trace_id from OTEL data if not already set
643                    if trace_id.is_none()
644                        && let Some(otel_trace_id) = otel_data.trace_id()
645                    {
646                        let trace_id_str = format!("{}", otel_trace_id);
647                        if is_valid_trace_id(&trace_id_str) {
648                            trace_id = Some(trace_id_str);
649                        }
650                    }
651
652                    // Extract span_id from OTEL data if not already set
653                    if span_id.is_none()
654                        && let Some(otel_span_id) = otel_data.span_id()
655                    {
656                        let span_id_str = format!("{}", otel_span_id);
657                        if is_valid_span_id(&span_id_str) {
658                            span_id = Some(span_id_str);
659                        }
660                    }
661                }
662            }
663
664            // Panic if we still don't have required IDs
            // NOTE(review): the pending context has already been removed at this point, and
            // these panics fire inside a subscriber callback. Confirm that aborting the
            // caller is preferred over logging an error and leaving the span without a
            // context.
665            if trace_id.is_none() {
666                panic!(
667                    "trace_id is not set in on_enter - OtelData may not be properly initialized"
668                );
669            }
670
671            if span_id.is_none() {
672                panic!("span_id is not set in on_enter - OtelData may not be properly initialized");
673            }
674
675            // Re-acquire mutable borrow to insert the finalized context
676            let mut extensions = span.extensions_mut();
677            extensions.insert(DistributedTraceContext {
678                trace_id: trace_id.expect("Trace ID must be set"),
679                span_id: span_id.expect("Span ID must be set"),
680                parent_id,
681                tracestate,
682                start: Some(Instant::now()),
683                end: None,
684                x_request_id,
685                x_dynamo_request_id,
686            });
687        }
688    }
689}
690
691// Enables functions to retreive their current
692// context for adding to distributed headers
693pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
694    Span::current()
695        .with_subscriber(|(id, subscriber)| {
696            subscriber
697                .downcast_ref::<Registry>()
698                .and_then(|registry| registry.span_data(id))
699                .and_then(|span_data| {
700                    let extensions = span_data.extensions();
701                    extensions.get::<DistributedTraceContext>().cloned()
702                })
703        })
704        .flatten()
705}
706
707/// Initialize the logger - must be called when Tokio runtime is available
708pub fn init() {
709    INIT.call_once(|| {
710        if let Err(e) = setup_logging() {
711            eprintln!("Failed to initialize logging: {}", e);
712            std::process::exit(1);
713        }
714    });
715}
716
/// Install the global subscriber when the `tokio-console` feature is enabled.
///
/// Registers a compact stderr formatter filtered by the logging configuration,
/// plus a tokio-console layer that sees `runtime` and `tokio` targets at TRACE
/// and everything else at ERROR.
///
/// Returns `Result` so the signature matches the non-console variant, whose
/// error `init()` matches on; this variant itself always returns `Ok(())`.
#[cfg(feature = "tokio-console")]
fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
    // NOTE(review): the console server listens on 0.0.0.0; confirm that exposing
    // it on all interfaces is intended.
    let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
        .with_default_env()
        .server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
        .spawn();
    let tokio_console_target = tracing_subscriber::filter::Targets::new()
        .with_default(LevelFilter::ERROR)
        .with_target("runtime", LevelFilter::TRACE)
        .with_target("tokio", LevelFilter::TRACE);
    let l = fmt::layer()
        .with_ansi(!disable_ansi_logging())
        .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
        .with_writer(std::io::stderr)
        .with_filter(filters(load_config()));
    tracing_subscriber::registry()
        .with(l)
        .with(tokio_console_layer.with_filter(tokio_console_target))
        .init();

    Ok(())
}
737
738#[cfg(not(feature = "tokio-console"))]
739fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
740    let fmt_filter_layer = filters(load_config());
741    let trace_filter_layer = filters(load_config());
742    let otel_filter_layer = filters(load_config());
743
744    if jsonl_logging_enabled() {
745        let l = fmt::layer()
746            .with_ansi(false)
747            .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
748            .event_format(CustomJsonFormatter::new())
749            .with_writer(std::io::stderr)
750            .with_filter(fmt_filter_layer);
751
752        // Create OpenTelemetry tracer - conditionally export to OTLP based on env var
753        let service_name = get_service_name();
754
755        // Build tracer provider - with or without OTLP export
756        let tracer_provider = if otlp_exporter_enabled() {
757            // Export enabled: create OTLP exporter with batch processor
758            let endpoint = std::env::var(OTEL_EXPORT_ENDPOINT_ENV)
759                .unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string());
760
761            tracing::info!(
762                "OpenTelemetry OTLP export enabled, endpoint: {}, service: {}",
763                endpoint,
764                service_name
765            );
766
767            // Initialize OTLP exporter using gRPC (Tonic)
768            let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
769                .with_tonic()
770                .with_endpoint(endpoint)
771                .build()?;
772
773            // Create tracer provider with batch exporter and service name
774            opentelemetry_sdk::trace::SdkTracerProvider::builder()
775                .with_batch_exporter(otlp_exporter)
776                .with_resource(
777                    opentelemetry_sdk::Resource::builder_empty()
778                        .with_service_name(service_name.clone())
779                        .build(),
780                )
781                .build()
782        } else {
783            // No export - traces generated locally only (for logging/trace IDs)
784            tracing::info!(
785                "OpenTelemetry OTLP export disabled, traces local only, service: {}",
786                service_name
787            );
788
789            opentelemetry_sdk::trace::SdkTracerProvider::builder()
790                .with_resource(
791                    opentelemetry_sdk::Resource::builder_empty()
792                        .with_service_name(service_name.clone())
793                        .build(),
794                )
795                .build()
796        };
797
798        // Get a tracer from the provider
799        let tracer = tracer_provider.tracer(service_name);
800
801        tracing_subscriber::registry()
802            .with(
803                tracing_opentelemetry::layer()
804                    .with_tracer(tracer)
805                    .with_filter(otel_filter_layer),
806            )
807            .with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
808            .with(l)
809            .init();
810    } else {
811        let l = fmt::layer()
812            .with_ansi(!disable_ansi_logging())
813            .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
814            .with_writer(std::io::stderr)
815            .with_filter(fmt_filter_layer);
816
817        tracing_subscriber::registry().with(l).init();
818    }
819
820    Ok(())
821}
822
823fn filters(config: LoggingConfig) -> EnvFilter {
824    let mut filter_layer = EnvFilter::builder()
825        .with_default_directive(config.log_level.parse().unwrap())
826        .with_env_var(FILTER_ENV)
827        .from_env_lossy();
828
829    for (module, level) in config.log_filters {
830        match format!("{module}={level}").parse::<Directive>() {
831            Ok(d) => {
832                filter_layer = filter_layer.add_directive(d);
833            }
834            Err(e) => {
835                eprintln!("Failed parsing filter '{level}' for module '{module}': {e}");
836            }
837        }
838    }
839    filter_layer
840}
841
842/// Log a message with file and line info
843/// Used by Python wrapper
844pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) {
845    let level = match level {
846        "debug" => log::Level::Debug,
847        "info" => log::Level::Info,
848        "warn" => log::Level::Warn,
849        "error" => log::Level::Error,
850        "warning" => log::Level::Warn,
851        _ => log::Level::Info,
852    };
853    log::logger().log(
854        &log::Record::builder()
855            .args(format_args!("{}", message))
856            .level(level)
857            .target(module)
858            .file(Some(file))
859            .line(Some(line))
860            .build(),
861    );
862}
863
864fn load_config() -> LoggingConfig {
865    let config_path = std::env::var(CONFIG_PATH_ENV).unwrap_or_else(|_| "".to_string());
866    let figment = Figment::new()
867        .merge(Serialized::defaults(LoggingConfig::default()))
868        .merge(Toml::file("/opt/dynamo/etc/logging.toml"))
869        .merge(Toml::file(config_path));
870
871    figment.extract().unwrap()
872}
873
/// One serialized log line of the JSONL output.
///
/// Built by `CustomJsonFormatter::format_event` from the event metadata plus
/// the fields collected for the event and its span.
#[derive(Serialize)]
struct JsonLog<'a> {
    // Timestamp string produced by `TimeFormatter::format_now`.
    time: String,
    // Textual form of the event's level.
    level: String,
    // Source file from the event metadata; omitted from the output when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    file: Option<&'a str>,
    // Source line from the event metadata; omitted from the output when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    line: Option<u32>,
    // Target from the event metadata.
    target: &'a str,
    // The event's `message` field (or a span lifecycle marker).
    message: serde_json::Value,
    // Remaining fields, flattened so each one becomes a top-level JSON key.
    #[serde(flatten)]
    fields: BTreeMap<String, serde_json::Value>,
}
887
888struct TimeFormatter {
889    use_local_tz: bool,
890}
891
892impl TimeFormatter {
893    fn new() -> Self {
894        Self {
895            use_local_tz: crate::config::use_local_timezone(),
896        }
897    }
898
899    fn format_now(&self) -> String {
900        if self.use_local_tz {
901            chrono::Local::now()
902                .format("%Y-%m-%dT%H:%M:%S%.6f%:z")
903                .to_string()
904        } else {
905            chrono::Utc::now()
906                .format("%Y-%m-%dT%H:%M:%S%.6fZ")
907                .to_string()
908        }
909    }
910}
911
912impl FormatTime for TimeFormatter {
913    fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
914        write!(w, "{}", self.format_now())
915    }
916}
917
918struct CustomJsonFormatter {
919    time_formatter: TimeFormatter,
920}
921
922impl CustomJsonFormatter {
923    fn new() -> Self {
924        Self {
925            time_formatter: TimeFormatter::new(),
926        }
927    }
928}
929
930use once_cell::sync::Lazy;
931use regex::Regex;
932fn parse_tracing_duration(s: &str) -> Option<u64> {
933    static RE: Lazy<Regex> =
934        Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
935    let captures = RE.captures(s)?;
936    let value: f64 = captures[1].parse().ok()?;
937    let unit = &captures[2];
938    match unit {
939        "ns" => Some((value / 1000.0) as u64),
940        "µs" | "us" => Some(value as u64),
941        "ms" => Some((value * 1000.0) as u64),
942        "s" => Some((value * 1_000_000.0) as u64),
943        _ => None,
944    }
945}
946
947impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
948where
949    S: Subscriber + for<'a> LookupSpan<'a>,
950    N: for<'a> FormatFields<'a> + 'static,
951{
952    fn format_event(
953        &self,
954        ctx: &FmtContext<'_, S, N>,
955        mut writer: Writer<'_>,
956        event: &Event<'_>,
957    ) -> std::fmt::Result {
958        let mut visitor = JsonVisitor::default();
959        let time = self.time_formatter.format_now();
960        event.record(&mut visitor);
961        let mut message = visitor
962            .fields
963            .remove("message")
964            .unwrap_or(serde_json::Value::String("".to_string()));
965
966        let current_span = event
967            .parent()
968            .and_then(|id| ctx.span(id))
969            .or_else(|| ctx.lookup_current());
970        if let Some(span) = current_span {
971            let ext = span.extensions();
972            let data = ext.get::<FormattedFields<N>>().unwrap();
973            let span_fields: Vec<(&str, &str)> = data
974                .fields
975                .split(' ')
976                .filter_map(|entry| entry.split_once('='))
977                .collect();
978            for (name, value) in span_fields {
979                visitor.fields.insert(
980                    name.to_string(),
981                    serde_json::Value::String(value.trim_matches('"').to_string()),
982                );
983            }
984
985            let busy_us = visitor
986                .fields
987                .remove("time.busy")
988                .and_then(|v| parse_tracing_duration(&v.to_string()));
989            let idle_us = visitor
990                .fields
991                .remove("time.idle")
992                .and_then(|v| parse_tracing_duration(&v.to_string()));
993
994            if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
995                visitor.fields.insert(
996                    "time.busy_us".to_string(),
997                    serde_json::Value::Number(busy_us.into()),
998                );
999                visitor.fields.insert(
1000                    "time.idle_us".to_string(),
1001                    serde_json::Value::Number(idle_us.into()),
1002                );
1003                visitor.fields.insert(
1004                    "time.duration_us".to_string(),
1005                    serde_json::Value::Number((busy_us + idle_us).into()),
1006                );
1007            }
1008
1009            message = match message.as_str() {
1010                Some("new") => serde_json::Value::String("SPAN_CREATED".to_string()),
1011                Some("close") => serde_json::Value::String("SPAN_CLOSED".to_string()),
1012                _ => message.clone(),
1013            };
1014
1015            visitor.fields.insert(
1016                "span_name".to_string(),
1017                serde_json::Value::String(span.name().to_string()),
1018            );
1019
1020            if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
1021                visitor.fields.insert(
1022                    "span_id".to_string(),
1023                    serde_json::Value::String(tracing_context.span_id.clone()),
1024                );
1025                visitor.fields.insert(
1026                    "trace_id".to_string(),
1027                    serde_json::Value::String(tracing_context.trace_id.clone()),
1028                );
1029                if let Some(parent_id) = tracing_context.parent_id.clone() {
1030                    visitor.fields.insert(
1031                        "parent_id".to_string(),
1032                        serde_json::Value::String(parent_id),
1033                    );
1034                } else {
1035                    visitor.fields.remove("parent_id");
1036                }
1037                if let Some(tracestate) = tracing_context.tracestate.clone() {
1038                    visitor.fields.insert(
1039                        "tracestate".to_string(),
1040                        serde_json::Value::String(tracestate),
1041                    );
1042                } else {
1043                    visitor.fields.remove("tracestate");
1044                }
1045                if let Some(x_request_id) = tracing_context.x_request_id.clone() {
1046                    visitor.fields.insert(
1047                        "x_request_id".to_string(),
1048                        serde_json::Value::String(x_request_id),
1049                    );
1050                } else {
1051                    visitor.fields.remove("x_request_id");
1052                }
1053
1054                if let Some(x_dynamo_request_id) = tracing_context.x_dynamo_request_id.clone() {
1055                    visitor.fields.insert(
1056                        "x_dynamo_request_id".to_string(),
1057                        serde_json::Value::String(x_dynamo_request_id),
1058                    );
1059                } else {
1060                    visitor.fields.remove("x_dynamo_request_id");
1061                }
1062            } else {
1063                tracing::error!(
1064                    "Distributed Trace Context not found, falling back to internal ids"
1065                );
1066                visitor.fields.insert(
1067                    "span_id".to_string(),
1068                    serde_json::Value::String(span.id().into_u64().to_string()),
1069                );
1070                if let Some(parent) = span.parent() {
1071                    visitor.fields.insert(
1072                        "parent_id".to_string(),
1073                        serde_json::Value::String(parent.id().into_u64().to_string()),
1074                    );
1075                }
1076            }
1077        } else {
1078            let reserved_fields = [
1079                "trace_id",
1080                "span_id",
1081                "parent_id",
1082                "span_name",
1083                "tracestate",
1084            ];
1085            for reserved_field in reserved_fields {
1086                visitor.fields.remove(reserved_field);
1087            }
1088        }
1089        let metadata = event.metadata();
1090        let log = JsonLog {
1091            level: metadata.level().to_string(),
1092            time,
1093            file: metadata.file(),
1094            line: metadata.line(),
1095            target: metadata.target(),
1096            message,
1097            fields: visitor.fields,
1098        };
1099        let json = serde_json::to_string(&log).unwrap();
1100        writeln!(writer, "{json}")
1101    }
1102}
1103
/// Field visitor that collects an event's fields as JSON values.
#[derive(Default)]
struct JsonVisitor {
    // Recorded fields keyed by field name; a later record with the same name
    // replaces the earlier value.
    fields: BTreeMap<String, serde_json::Value>,
}
1108
1109impl tracing::field::Visit for JsonVisitor {
1110    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1111        self.fields.insert(
1112            field.name().to_string(),
1113            serde_json::Value::String(format!("{value:?}")),
1114        );
1115    }
1116
1117    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1118        if field.name() != "message" {
1119            match serde_json::from_str::<Value>(value) {
1120                Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
1121                Err(_) => self.fields.insert(field.name().to_string(), value.into()),
1122            };
1123        } else {
1124            self.fields.insert(field.name().to_string(), value.into());
1125        }
1126    }
1127
1128    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
1129        self.fields
1130            .insert(field.name().to_string(), serde_json::Value::Bool(value));
1131    }
1132
1133    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
1134        self.fields.insert(
1135            field.name().to_string(),
1136            serde_json::Value::Number(value.into()),
1137        );
1138    }
1139
1140    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
1141        self.fields.insert(
1142            field.name().to_string(),
1143            serde_json::Value::Number(value.into()),
1144        );
1145    }
1146
1147    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
1148        use serde_json::value::Number;
1149        self.fields.insert(
1150            field.name().to_string(),
1151            serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
1152        );
1153    }
1154}
1155
1156#[cfg(test)]
1157pub mod tests {
1158    use super::*;
1159    use anyhow::{Result, anyhow};
1160    use chrono::{DateTime, Utc};
1161    use jsonschema::{Draft, JSONSchema};
1162    use serde_json::Value;
1163    use std::fs::File;
1164    use std::io::{BufRead, BufReader};
1165    use stdio_override::*;
1166    use tempfile::NamedTempFile;
1167
    // JSON Schema (draft-07) that every captured JSONL log line must satisfy.
    // `file`, `level`, `line`, `message`, `target` and `time` are mandatory;
    // the tracing identifiers and span timings are optional but, when present,
    // must have the shapes given here. Unknown properties are allowed.
    static LOG_LINE_SCHEMA: &str = r#"
    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "Runtime Log Line",
      "type": "object",
      "required": [
        "file",
        "level",
        "line",
        "message",
        "target",
        "time"
      ],
      "properties": {
        "file":      { "type": "string" },
        "level":     { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
        "line":      { "type": "integer" },
        "message":   { "type": "string" },
        "target":    { "type": "string" },
        "time":      { "type": "string", "format": "date-time" },
        "span_id":   { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "trace_id":  { "type": "string", "pattern": "^[a-f0-9]{32}$" },
        "span_name": { "type": "string" },
        "time.busy_us":     { "type": "integer" },
        "time.duration_us": { "type": "integer" },
        "time.idle_us":     { "type": "integer" },
        "tracestate": { "type": "string" }
      },
      "additionalProperties": true
    }
    "#;
1200
1201    #[tracing::instrument(skip_all)]
1202    async fn parent() {
1203        tracing::trace!(message = "parent!");
1204        if let Some(my_ctx) = get_distributed_tracing_context() {
1205            tracing::info!(my_trace_id = my_ctx.trace_id);
1206        }
1207        child().await;
1208    }
1209
1210    #[tracing::instrument(skip_all)]
1211    async fn child() {
1212        tracing::trace!(message = "child");
1213        if let Some(my_ctx) = get_distributed_tracing_context() {
1214            tracing::info!(my_trace_id = my_ctx.trace_id);
1215        }
1216        grandchild().await;
1217    }
1218
1219    #[tracing::instrument(skip_all)]
1220    async fn grandchild() {
1221        tracing::trace!(message = "grandchild");
1222        if let Some(my_ctx) = get_distributed_tracing_context() {
1223            tracing::info!(my_trace_id = my_ctx.trace_id);
1224        }
1225    }
1226
1227    pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
1228        let schema_json: Value =
1229            serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
1230        let compiled_schema = JSONSchema::options()
1231            .with_draft(Draft::Draft7)
1232            .compile(&schema_json)
1233            .expect("Invalid schema");
1234
1235        let f = File::open(file_name)?;
1236        let reader = BufReader::new(f);
1237        let mut result = Vec::new();
1238
1239        for (line_num, line) in reader.lines().enumerate() {
1240            let line = line?;
1241            let val: Value = serde_json::from_str(&line)
1242                .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_num + 1, e))?;
1243
1244            if let Err(errors) = compiled_schema.validate(&val) {
1245                let errs = errors.map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
1246                return Err(anyhow!(
1247                    "Line {}: JSON Schema Validation errors: {}",
1248                    line_num + 1,
1249                    errs
1250                ));
1251            }
1252            println!("{}", val);
1253            result.push(val);
1254        }
1255        Ok(result)
1256    }
1257
1258    #[tokio::test]
1259    async fn test_json_log_capture() -> Result<()> {
1260        #[allow(clippy::redundant_closure_call)]
1261        let _ = temp_env::async_with_vars(
1262            [("DYN_LOGGING_JSONL", Some("1"))],
1263            (async || {
1264                let tmp_file = NamedTempFile::new().unwrap();
1265                let file_name = tmp_file.path().to_str().unwrap();
1266                let guard = StderrOverride::from_file(file_name)?;
1267                init();
1268                parent().await;
1269                drop(guard);
1270
1271                let lines = load_log(file_name)?;
1272
1273                // 1. Extract the dynamically generated trace ID and validate consistency
1274                // All logs should have the same trace_id since they're part of the same trace
1275                let trace_id = lines
1276                    .first()
1277                    .and_then(|log_line| log_line.get("trace_id"))
1278                    .and_then(|v| v.as_str())
1279                    .expect("First log line should have a trace_id")
1280                    .to_string();
1281
1282                // Verify trace_id is not a zero/invalid ID
1283                assert_ne!(
1284                    trace_id, "00000000000000000000000000000000",
1285                    "trace_id should not be a zero/invalid ID"
1286                );
1287                assert!(
1288                    !trace_id.chars().all(|c| c == '0'),
1289                    "trace_id should not be all zeros"
1290                );
1291
1292                // Verify all logs have the same trace_id
1293                for log_line in &lines {
1294                    if let Some(line_trace_id) = log_line.get("trace_id") {
1295                        assert_eq!(
1296                            line_trace_id.as_str().unwrap(),
1297                            &trace_id,
1298                            "All logs should have the same trace_id"
1299                        );
1300                    }
1301                }
1302
1303                // Validate my_trace_id matches the actual trace ID
1304                for log_line in &lines {
1305                    if let Some(my_trace_id) = log_line.get("my_trace_id") {
1306                        assert_eq!(
1307                            my_trace_id,
1308                            &serde_json::Value::String(trace_id.clone()),
1309                            "my_trace_id should match the trace_id from distributed tracing context"
1310                        );
1311                    }
1312                }
1313
1314                // 2. Validate span IDs are unique for SPAN_CREATED and SPAN_CLOSED events
1315                let mut created_span_ids: Vec<String> = Vec::new();
1316                let mut closed_span_ids: Vec<String> = Vec::new();
1317
1318                for log_line in &lines {
1319                    if let Some(message) = log_line.get("message") {
1320                        match message.as_str().unwrap() {
1321                            "SPAN_CREATED" => {
1322                                if let Some(span_id) = log_line.get("span_id") {
1323                                    let span_id_str = span_id.as_str().unwrap();
1324                                    assert!(
1325                                        created_span_ids.iter().all(|id| id != span_id_str),
1326                                        "Duplicate span ID found in SPAN_CREATED: {}",
1327                                        span_id_str
1328                                    );
1329                                    created_span_ids.push(span_id_str.to_string());
1330                                }
1331                            }
1332                            "SPAN_CLOSED" => {
1333                                if let Some(span_id) = log_line.get("span_id") {
1334                                    let span_id_str = span_id.as_str().unwrap();
1335                                    assert!(
1336                                        closed_span_ids.iter().all(|id| id != span_id_str),
1337                                        "Duplicate span ID found in SPAN_CLOSED: {}",
1338                                        span_id_str
1339                                    );
1340                                    closed_span_ids.push(span_id_str.to_string());
1341                                }
1342                            }
1343                            _ => {}
1344                        }
1345                    }
1346                }
1347
1348                // Additionally, ensure that every SPAN_CLOSED has a corresponding SPAN_CREATED
1349                for closed_span_id in &closed_span_ids {
1350                    assert!(
1351                        created_span_ids.contains(closed_span_id),
1352                        "SPAN_CLOSED without corresponding SPAN_CREATED: {}",
1353                        closed_span_id
1354                    );
1355                }
1356
1357                // 3. Validate parent span relationships
1358                let parent_span_id = lines
1359                    .iter()
1360                    .find(|log_line| {
1361                        log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CREATED"
1362                            && log_line.get("span_name").unwrap().as_str().unwrap() == "parent"
1363                    })
1364                    .and_then(|log_line| {
1365                        log_line
1366                            .get("span_id")
1367                            .map(|s| s.as_str().unwrap().to_string())
1368                    })
1369                    .unwrap();
1370
1371                let child_span_id = lines
1372                    .iter()
1373                    .find(|log_line| {
1374                        log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CREATED"
1375                            && log_line.get("span_name").unwrap().as_str().unwrap() == "child"
1376                    })
1377                    .and_then(|log_line| {
1378                        log_line
1379                            .get("span_id")
1380                            .map(|s| s.as_str().unwrap().to_string())
1381                    })
1382                    .unwrap();
1383
1384                let _grandchild_span_id = lines
1385                    .iter()
1386                    .find(|log_line| {
1387                        log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CREATED"
1388                            && log_line.get("span_name").unwrap().as_str().unwrap() == "grandchild"
1389                    })
1390                    .and_then(|log_line| {
1391                        log_line
1392                            .get("span_id")
1393                            .map(|s| s.as_str().unwrap().to_string())
1394                    })
1395                    .unwrap();
1396
1397                // Parent span has no parent_id
1398                for log_line in &lines {
1399                    if let Some(span_name) = log_line.get("span_name")
1400                        && let Some(span_name_str) = span_name.as_str()
1401                        && span_name_str == "parent"
1402                    {
1403                        assert!(log_line.get("parent_id").is_none());
1404                    }
1405                }
1406
1407                // Child span's parent_id is parent_span_id
1408                for log_line in &lines {
1409                    if let Some(span_name) = log_line.get("span_name")
1410                        && let Some(span_name_str) = span_name.as_str()
1411                        && span_name_str == "child"
1412                    {
1413                        assert_eq!(
1414                            log_line.get("parent_id").unwrap().as_str().unwrap(),
1415                            &parent_span_id
1416                        );
1417                    }
1418                }
1419
1420                // Grandchild span's parent_id is child_span_id
1421                for log_line in &lines {
1422                    if let Some(span_name) = log_line.get("span_name")
1423                        && let Some(span_name_str) = span_name.as_str()
1424                        && span_name_str == "grandchild"
1425                    {
1426                        assert_eq!(
1427                            log_line.get("parent_id").unwrap().as_str().unwrap(),
1428                            &child_span_id
1429                        );
1430                    }
1431                }
1432
1433                // Validate duration relationships
1434                let parent_duration = lines
1435                    .iter()
1436                    .find(|log_line| {
1437                        log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CLOSED"
1438                            && log_line.get("span_name").unwrap().as_str().unwrap() == "parent"
1439                    })
1440                    .and_then(|log_line| {
1441                        log_line
1442                            .get("time.duration_us")
1443                            .map(|d| d.as_u64().unwrap())
1444                    })
1445                    .unwrap();
1446
1447                let child_duration = lines
1448                    .iter()
1449                    .find(|log_line| {
1450                        log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CLOSED"
1451                            && log_line.get("span_name").unwrap().as_str().unwrap() == "child"
1452                    })
1453                    .and_then(|log_line| {
1454                        log_line
1455                            .get("time.duration_us")
1456                            .map(|d| d.as_u64().unwrap())
1457                    })
1458                    .unwrap();
1459
1460                let grandchild_duration = lines
1461                    .iter()
1462                    .find(|log_line| {
1463                        log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CLOSED"
1464                            && log_line.get("span_name").unwrap().as_str().unwrap() == "grandchild"
1465                    })
1466                    .and_then(|log_line| {
1467                        log_line
1468                            .get("time.duration_us")
1469                            .map(|d| d.as_u64().unwrap())
1470                    })
1471                    .unwrap();
1472
1473                assert!(
1474                    parent_duration > child_duration + grandchild_duration,
1475                    "Parent duration is not greater than the sum of child and grandchild durations"
1476                );
1477                assert!(
1478                    child_duration > grandchild_duration,
1479                    "Child duration is not greater than grandchild duration"
1480                );
1481
1482                Ok::<(), anyhow::Error>(())
1483            })(),
1484        )
1485        .await;
1486        Ok(())
1487    }
1488}