dynamo_runtime/
logging.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Dynamo Distributed Logging Module.
5//!
6//! - Configuration loaded from:
7//!   1. Environment variables (highest priority).
8//!   2. Optional TOML file pointed to by the `DYN_LOGGING_CONFIG_PATH` environment variable.
9//!   3. `/opt/dynamo/etc/logging.toml`.
10//!
11//! Logging can take two forms: `READABLE` or `JSONL`. The default is `READABLE`. `JSONL`
12//! can be enabled by setting the `DYN_LOGGING_JSONL` environment variable to `1`.
13//!
14//! To use local timezone for logging timestamps, set the `DYN_LOG_USE_LOCAL_TZ` environment variable to `1`.
15//!
//! Filters can be configured using the `DYN_LOG` environment variable or by setting the
//! `log_filters` table in the TOML configuration file. In `DYN_LOG`, filters are comma-separated
//! key-value pairs where the key is the crate or module name and the value is the log level.
//! The default log level is `info`.
19//!
20//! Example:
21//! ```toml
22//! log_level = "error"
23//!
24//! [log_filters]
25//! "test_logging" = "info"
26//! "test_logging::api" = "trace"
27//! ```
28
29use std::collections::{BTreeMap, HashMap};
30use std::sync::Once;
31
32use figment::{
33    Figment,
34    providers::{Format, Serialized, Toml},
35};
36use serde::{Deserialize, Serialize};
37use tracing::level_filters::LevelFilter;
38use tracing::{Event, Subscriber};
39use tracing_subscriber::EnvFilter;
40use tracing_subscriber::fmt::time::FormatTime;
41use tracing_subscriber::fmt::time::LocalTime;
42use tracing_subscriber::fmt::time::SystemTime;
43use tracing_subscriber::fmt::time::UtcTime;
44use tracing_subscriber::fmt::{FmtContext, FormatFields};
45use tracing_subscriber::fmt::{FormattedFields, format::Writer};
46use tracing_subscriber::prelude::*;
47use tracing_subscriber::registry::LookupSpan;
48use tracing_subscriber::{filter::Directive, fmt};
49
50use crate::config::{disable_ansi_logging, jsonl_logging_enabled};
51use async_nats::{HeaderMap, HeaderValue};
52use axum::extract::FromRequestParts;
53use axum::http;
54use axum::http::Request;
55use axum::http::request::Parts;
56use serde_json::Value;
57use std::convert::Infallible;
58use std::time::Instant;
59use tower_http::trace::{DefaultMakeSpan, TraceLayer};
60use tracing::Id;
61use tracing::Span;
62use tracing::field::Field;
63use tracing::span;
64use tracing_subscriber::Layer;
65use tracing_subscriber::Registry;
66use tracing_subscriber::field::Visit;
67use tracing_subscriber::fmt::format::FmtSpan;
68use tracing_subscriber::layer::Context;
69use tracing_subscriber::registry::SpanData;
70use uuid::Uuid;
71
/// Name of the environment variable from which log filter directives are read.
const FILTER_ENV: &str = "DYN_LOG";

/// Log level used by the default [`LoggingConfig`].
const DEFAULT_FILTER_LEVEL: &str = "info";

/// Name of the environment variable holding the path to an optional TOML
/// logging configuration file.
const CONFIG_PATH_ENV: &str = "DYN_LOGGING_CONFIG_PATH";

/// Guard ensuring that logger setup runs at most once per process.
static INIT: Once = Once::new();
83
84#[derive(Serialize, Deserialize, Debug)]
85struct LoggingConfig {
86    log_level: String,
87    log_filters: HashMap<String, String>,
88}
89impl Default for LoggingConfig {
90    fn default() -> Self {
91        LoggingConfig {
92            log_level: DEFAULT_FILTER_LEVEL.to_string(),
93            log_filters: HashMap::from([
94                ("h2".to_string(), "error".to_string()),
95                ("tower".to_string(), "error".to_string()),
96                ("hyper_util".to_string(), "error".to_string()),
97                ("neli".to_string(), "error".to_string()),
98                ("async_nats".to_string(), "error".to_string()),
99                ("rustls".to_string(), "error".to_string()),
100                ("tokenizers".to_string(), "error".to_string()),
101                ("axum".to_string(), "error".to_string()),
102                ("tonic".to_string(), "error".to_string()),
103                ("mistralrs_core".to_string(), "error".to_string()),
104                ("hf_hub".to_string(), "error".to_string()),
105            ]),
106        }
107    }
108}
109
110/// Generate a 32-character, lowercase hex trace ID (W3C-compliant)
111fn generate_trace_id() -> String {
112    Uuid::new_v4().simple().to_string()
113}
114
115/// Generate a 16-character, lowercase hex span ID (W3C-compliant)
116fn generate_span_id() -> String {
117    // Use the first 8 bytes (16 hex chars) of a UUID v4
118    let uuid = Uuid::new_v4();
119    let bytes = uuid.as_bytes();
120    bytes[..8].iter().map(|b| format!("{:02x}", b)).collect()
121}
122
/// Validate a trace ID according to the W3C Trace Context specification.
///
/// A valid trace ID is exactly 32 lowercase hexadecimal characters
/// (`0-9`, `a-f`) and is not all zeros.
///
/// Returns `true` when `trace_id` satisfies all of these rules.
pub fn is_valid_trace_id(trace_id: &str) -> bool {
    let bytes = trace_id.as_bytes();
    bytes.len() == 32
        // Uppercase hex digits are rejected: the spec requires lowercase.
        && bytes.iter().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
        // The all-zero ID is explicitly invalid.
        && bytes.iter().any(|&b| b != b'0')
}
128
/// Validate a span ID according to the W3C Trace Context specification.
///
/// A valid span ID is exactly 16 lowercase hexadecimal characters
/// (`0-9`, `a-f`) and is not all zeros.
///
/// Returns `true` when `span_id` satisfies all of these rules.
pub fn is_valid_span_id(span_id: &str) -> bool {
    let bytes = span_id.as_bytes();
    bytes.len() == 16
        // Uppercase hex digits are rejected: the spec requires lowercase.
        && bytes.iter().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
        // The all-zero ID is explicitly invalid.
        && bytes.iter().any(|&b| b != b'0')
}
134
/// `tracing_subscriber` layer that attaches a [`DistributedTraceContext`]
/// (trace, span and parent IDs) to each new span it observes.
pub struct DistributedTraceIdLayer;
136
/// Distributed tracing identifiers stored in a span's extensions by
/// [`DistributedTraceIdLayer`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedTraceContext {
    /// Trace ID shared by all spans of one trace.
    pub trace_id: String,
    /// ID of this span.
    pub span_id: String,
    /// ID of the parent span, if any; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<String>,
    /// `tracestate` value, if any; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tracestate: Option<String>,
    /// Instant recorded when the span was created; never (de)serialized.
    #[serde(skip)]
    start: Option<Instant>,
    /// Instant recorded when the span was closed; never (de)serialized.
    #[serde(skip)]
    end: Option<Instant>,
    /// Value of the span's `x_request_id` field, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_request_id: Option<String>,
    /// Value of the span's `x_dynamo_request_id` field, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_dynamo_request_id: Option<String>,
}
154
155impl DistributedTraceContext {
156    /// Create a traceparent string from the context
157    pub fn create_traceparent(&self) -> String {
158        format!("00-{}-{}-01", self.trace_id, self.span_id)
159    }
160}
161
162/// Parse a traceparent string into its components
163pub fn parse_traceparent(traceparent: &str) -> (Option<String>, Option<String>) {
164    let pieces: Vec<_> = traceparent.split('-').collect();
165    if pieces.len() != 4 {
166        return (None, None);
167    }
168    let trace_id = pieces[1];
169    let parent_id = pieces[2];
170
171    if !is_valid_trace_id(trace_id) || !is_valid_span_id(parent_id) {
172        return (None, None);
173    }
174
175    (Some(trace_id.to_string()), Some(parent_id.to_string()))
176}
177
/// Tracing-related values extracted from incoming request headers
/// (see [`TraceParent::from_headers`]).
#[derive(Debug, Clone, Default)]
pub struct TraceParent {
    /// Trace ID parsed from the `traceparent` header, if present and valid.
    pub trace_id: Option<String>,
    /// Parent span ID parsed from the `traceparent` header, if present and valid.
    pub parent_id: Option<String>,
    /// Raw value of the `tracestate` header, if present.
    pub tracestate: Option<String>,
    /// Raw value of the `x-request-id` header, if present.
    pub x_request_id: Option<String>,
    /// Value of the `x-dynamo-request-id` header, kept only if it parses as a UUID.
    pub x_dynamo_request_id: Option<String>,
}
186
/// Minimal read-only view over a header collection, so the same extraction
/// code can work with different header map types.
pub trait GenericHeaders {
    /// Return the value stored under `key` as a string slice, or `None` if
    /// no usable value is available.
    fn get(&self, key: &str) -> Option<&str>;
}
190
191impl GenericHeaders for async_nats::HeaderMap {
192    fn get(&self, key: &str) -> Option<&str> {
193        async_nats::HeaderMap::get(self, key).map(|value| value.as_str())
194    }
195}
196
197impl GenericHeaders for http::HeaderMap {
198    fn get(&self, key: &str) -> Option<&str> {
199        http::HeaderMap::get(self, key).and_then(|value| value.to_str().ok())
200    }
201}
202
203impl TraceParent {
204    pub fn from_headers<H: GenericHeaders>(headers: &H) -> TraceParent {
205        let mut trace_id = None;
206        let mut parent_id = None;
207        let mut tracestate = None;
208        let mut x_request_id = None;
209        let mut x_dynamo_request_id = None;
210
211        if let Some(header_value) = headers.get("traceparent") {
212            (trace_id, parent_id) = parse_traceparent(header_value);
213        }
214
215        if let Some(header_value) = headers.get("x-request-id") {
216            x_request_id = Some(header_value.to_string());
217        }
218
219        if let Some(header_value) = headers.get("tracestate") {
220            tracestate = Some(header_value.to_string());
221        }
222
223        if let Some(header_value) = headers.get("x-dynamo-request-id") {
224            x_dynamo_request_id = Some(header_value.to_string());
225        }
226
227        // Validate UUID format
228        let x_dynamo_request_id =
229            x_dynamo_request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
230        TraceParent {
231            trace_id,
232            parent_id,
233            tracestate,
234            x_request_id,
235            x_dynamo_request_id,
236        }
237    }
238}
239
240// Takes Axum request and returning a span
241pub fn make_request_span<B>(req: &Request<B>) -> Span {
242    let method = req.method();
243    let uri = req.uri();
244    let version = format!("{:?}", req.version());
245
246    let trace_parent = TraceParent::from_headers(req.headers());
247
248    tracing::info_span!(
249        "http-request",
250        method = %method,
251        uri = %uri,
252        version = %version,
253        trace_id = trace_parent.trace_id,
254        parent_id = trace_parent.parent_id,
255        x_request_id = trace_parent.x_request_id,
256    x_dynamo_request_id = trace_parent.x_dynamo_request_id,
257
258    )
259}
260
261#[derive(Debug, Default)]
262pub struct FieldVisitor {
263    pub fields: HashMap<String, String>,
264}
265
266impl Visit for FieldVisitor {
267    fn record_str(&mut self, field: &Field, value: &str) {
268        self.fields
269            .insert(field.name().to_string(), value.to_string());
270    }
271
272    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
273        self.fields
274            .insert(field.name().to_string(), format!("{:?}", value).to_string());
275    }
276}
277
278impl<S> Layer<S> for DistributedTraceIdLayer
279where
280    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
281{
282    // Capture close span time
283    // Currently not used but added for future use in timing
284    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
285        if let Some(span) = ctx.span(&id) {
286            let mut extensions = span.extensions_mut();
287            if let Some(distributed_tracing_context) =
288                extensions.get_mut::<DistributedTraceContext>()
289            {
290                distributed_tracing_context.end = Some(Instant::now());
291            }
292        }
293    }
294
295    // Adds W3C compliant span_id, trace_id, and parent_id if not already present
296    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
297        if let Some(span) = ctx.span(id) {
298            let mut trace_id: Option<String> = None;
299            let mut parent_id: Option<String> = None;
300            let mut span_id: Option<String> = None;
301            let mut x_request_id: Option<String> = None;
302            let mut x_dynamo_request_id: Option<String> = None;
303            let mut tracestate: Option<String> = None;
304            let mut visitor = FieldVisitor::default();
305            attrs.record(&mut visitor);
306
307            if let Some(trace_id_input) = visitor.fields.get("trace_id") {
308                if !is_valid_trace_id(trace_id_input) {
309                    tracing::trace!("trace id  '{}' is not valid! Ignoring.", trace_id_input);
310                } else {
311                    trace_id = Some(trace_id_input.to_string());
312                }
313            }
314
315            if let Some(span_id_input) = visitor.fields.get("span_id") {
316                if !is_valid_span_id(span_id_input) {
317                    tracing::trace!("span id  '{}' is not valid! Ignoring.", span_id_input);
318                } else {
319                    span_id = Some(span_id_input.to_string());
320                }
321            }
322
323            if let Some(parent_id_input) = visitor.fields.get("parent_id") {
324                if !is_valid_span_id(parent_id_input) {
325                    tracing::trace!("parent id  '{}' is not valid! Ignoring.", parent_id_input);
326                } else {
327                    parent_id = Some(parent_id_input.to_string());
328                }
329            }
330
331            if let Some(tracestate_input) = visitor.fields.get("tracestate") {
332                tracestate = Some(tracestate_input.to_string());
333            }
334
335            if let Some(x_request_id_input) = visitor.fields.get("x_request_id") {
336                x_request_id = Some(x_request_id_input.to_string());
337            }
338
339            if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
340                x_dynamo_request_id = Some(x_request_id_input.to_string());
341            }
342
343            if parent_id.is_none()
344                && let Some(parent_span_id) = ctx.current_span().id()
345                && let Some(parent_span) = ctx.span(parent_span_id)
346            {
347                let parent_ext = parent_span.extensions();
348                if let Some(parent_tracing_context) = parent_ext.get::<DistributedTraceContext>() {
349                    trace_id = Some(parent_tracing_context.trace_id.clone());
350                    parent_id = Some(parent_tracing_context.span_id.clone());
351                    tracestate = parent_tracing_context.tracestate.clone();
352                }
353            }
354
355            if (parent_id.is_some() || span_id.is_some()) && trace_id.is_none() {
356                tracing::error!("parent id or span id are set but trace id is not set!");
357                // Clear inconsistent IDs to maintain trace integrity
358                parent_id = None;
359                span_id = None;
360            }
361
362            if trace_id.is_none() {
363                trace_id = Some(generate_trace_id());
364            }
365            if span_id.is_none() {
366                span_id = Some(generate_span_id());
367            }
368
369            let mut extensions = span.extensions_mut();
370            extensions.insert(DistributedTraceContext {
371                trace_id: trace_id.expect("Trace ID must be set"),
372                span_id: span_id.expect("Span ID must be set"),
373                parent_id,
374                tracestate,
375                start: Some(Instant::now()),
376                end: None,
377                x_request_id,
378                x_dynamo_request_id,
379            });
380        }
381    }
382}
383
384// Enables functions to retreive their current
385// context for adding to distributed headers
386pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
387    Span::current()
388        .with_subscriber(|(id, subscriber)| {
389            subscriber
390                .downcast_ref::<Registry>()
391                .and_then(|registry| registry.span_data(id))
392                .and_then(|span_data| {
393                    let extensions = span_data.extensions();
394                    extensions.get::<DistributedTraceContext>().cloned()
395                })
396        })
397        .flatten()
398}
399
/// Initialize the logger.
///
/// Safe to call repeatedly: `setup_logging` runs only on the first call,
/// guarded by the `INIT` [`Once`].
pub fn init() {
    INIT.call_once(setup_logging);
}
404
/// Install the global subscriber when the `tokio-console` feature is enabled.
///
/// Combines a compact stderr `fmt` layer (filtered by the loaded logging
/// configuration) with a `console_subscriber` layer.
///
/// NOTE(review): unlike the non-console variant, this one never consults
/// `jsonl_logging_enabled()` and does not install `DistributedTraceIdLayer` —
/// confirm that this is intended.
#[cfg(feature = "tokio-console")]
fn setup_logging() {
    // The console server address is 0.0.0.0 on the default console port.
    let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
        .with_default_env()
        .server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
        .spawn();
    // The console layer gets TRACE for the `runtime` and `tokio` targets and
    // ERROR for everything else.
    let tokio_console_target = tracing_subscriber::filter::Targets::new()
        .with_default(LevelFilter::ERROR)
        .with_target("runtime", LevelFilter::TRACE)
        .with_target("tokio", LevelFilter::TRACE);
    let l = fmt::layer()
        .with_ansi(!disable_ansi_logging())
        .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
        .with_writer(std::io::stderr)
        .with_filter(filters(load_config()));
    tracing_subscriber::registry()
        .with(l)
        .with(tokio_console_layer.with_filter(tokio_console_target))
        .init();
}
425
426#[cfg(not(feature = "tokio-console"))]
427fn setup_logging() {
428    let fmt_filter_layer = filters(load_config());
429    let trace_filter_layer = filters(load_config());
430    if jsonl_logging_enabled() {
431        let l = fmt::layer()
432            .with_ansi(false)
433            .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
434            .event_format(CustomJsonFormatter::new())
435            .with_writer(std::io::stderr)
436            .with_filter(fmt_filter_layer);
437        tracing_subscriber::registry()
438            .with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
439            .with(l)
440            .init();
441    } else {
442        let l = fmt::layer()
443            .with_ansi(!disable_ansi_logging())
444            .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
445            .with_writer(std::io::stderr)
446            .with_filter(fmt_filter_layer);
447        tracing_subscriber::registry().with(l).init();
448    }
449}
450
451fn filters(config: LoggingConfig) -> EnvFilter {
452    let mut filter_layer = EnvFilter::builder()
453        .with_default_directive(config.log_level.parse().unwrap())
454        .with_env_var(FILTER_ENV)
455        .from_env_lossy();
456
457    for (module, level) in config.log_filters {
458        match format!("{module}={level}").parse::<Directive>() {
459            Ok(d) => {
460                filter_layer = filter_layer.add_directive(d);
461            }
462            Err(e) => {
463                eprintln!("Failed parsing filter '{level}' for module '{module}': {e}");
464            }
465        }
466    }
467    filter_layer
468}
469
470/// Log a message with file and line info
471/// Used by Python wrapper
472pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) {
473    let level = match level {
474        "debug" => log::Level::Debug,
475        "info" => log::Level::Info,
476        "warn" => log::Level::Warn,
477        "error" => log::Level::Error,
478        "warning" => log::Level::Warn,
479        _ => log::Level::Info,
480    };
481    log::logger().log(
482        &log::Record::builder()
483            .args(format_args!("{}", message))
484            .level(level)
485            .target(module)
486            .file(Some(file))
487            .line(Some(line))
488            .build(),
489    );
490}
491
492fn load_config() -> LoggingConfig {
493    let config_path = std::env::var(CONFIG_PATH_ENV).unwrap_or_else(|_| "".to_string());
494    let figment = Figment::new()
495        .merge(Serialized::defaults(LoggingConfig::default()))
496        .merge(Toml::file("/opt/dynamo/etc/logging.toml"))
497        .merge(Toml::file(config_path));
498
499    figment.extract().unwrap()
500}
501
/// One JSON log line as emitted by `CustomJsonFormatter`.
#[derive(Serialize)]
struct JsonLog<'a> {
    /// Formatted timestamp of the event.
    time: String,
    /// Event level rendered as a string.
    level: String,
    /// Source file from the event metadata; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    file: Option<&'a str>,
    /// Source line from the event metadata; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    line: Option<u32>,
    /// Target from the event metadata.
    target: &'a str,
    /// The event's message.
    message: serde_json::Value,
    /// All remaining fields, flattened into the top-level JSON object.
    #[serde(flatten)]
    fields: BTreeMap<String, serde_json::Value>,
}
515
516struct TimeFormatter {
517    use_local_tz: bool,
518}
519
520impl TimeFormatter {
521    fn new() -> Self {
522        Self {
523            use_local_tz: crate::config::use_local_timezone(),
524        }
525    }
526
527    fn format_now(&self) -> String {
528        if self.use_local_tz {
529            chrono::Local::now()
530                .format("%Y-%m-%dT%H:%M:%S%.6f%:z")
531                .to_string()
532        } else {
533            chrono::Utc::now()
534                .format("%Y-%m-%dT%H:%M:%S%.6fZ")
535                .to_string()
536        }
537    }
538}
539
540impl FormatTime for TimeFormatter {
541    fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
542        write!(w, "{}", self.format_now())
543    }
544}
545
546struct CustomJsonFormatter {
547    time_formatter: TimeFormatter,
548}
549
550impl CustomJsonFormatter {
551    fn new() -> Self {
552        Self {
553            time_formatter: TimeFormatter::new(),
554        }
555    }
556}
557
558use once_cell::sync::Lazy;
559use regex::Regex;
560fn parse_tracing_duration(s: &str) -> Option<u64> {
561    static RE: Lazy<Regex> =
562        Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
563    let captures = RE.captures(s)?;
564    let value: f64 = captures[1].parse().ok()?;
565    let unit = &captures[2];
566    match unit {
567        "ns" => Some((value / 1000.0) as u64),
568        "µs" | "us" => Some(value as u64),
569        "ms" => Some((value * 1000.0) as u64),
570        "s" => Some((value * 1_000_000.0) as u64),
571        _ => None,
572    }
573}
574
575impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
576where
577    S: Subscriber + for<'a> LookupSpan<'a>,
578    N: for<'a> FormatFields<'a> + 'static,
579{
580    fn format_event(
581        &self,
582        ctx: &FmtContext<'_, S, N>,
583        mut writer: Writer<'_>,
584        event: &Event<'_>,
585    ) -> std::fmt::Result {
586        let mut visitor = JsonVisitor::default();
587        let time = self.time_formatter.format_now();
588        event.record(&mut visitor);
589        let mut message = visitor
590            .fields
591            .remove("message")
592            .unwrap_or(serde_json::Value::String("".to_string()));
593
594        let current_span = event
595            .parent()
596            .and_then(|id| ctx.span(id))
597            .or_else(|| ctx.lookup_current());
598        if let Some(span) = current_span {
599            let ext = span.extensions();
600            let data = ext.get::<FormattedFields<N>>().unwrap();
601            let span_fields: Vec<(&str, &str)> = data
602                .fields
603                .split(' ')
604                .filter_map(|entry| entry.split_once('='))
605                .collect();
606            for (name, value) in span_fields {
607                visitor.fields.insert(
608                    name.to_string(),
609                    serde_json::Value::String(value.trim_matches('"').to_string()),
610                );
611            }
612
613            let busy_us = visitor
614                .fields
615                .remove("time.busy")
616                .and_then(|v| parse_tracing_duration(&v.to_string()));
617            let idle_us = visitor
618                .fields
619                .remove("time.idle")
620                .and_then(|v| parse_tracing_duration(&v.to_string()));
621
622            if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
623                visitor.fields.insert(
624                    "time.busy_us".to_string(),
625                    serde_json::Value::Number(busy_us.into()),
626                );
627                visitor.fields.insert(
628                    "time.idle_us".to_string(),
629                    serde_json::Value::Number(idle_us.into()),
630                );
631                visitor.fields.insert(
632                    "time.duration_us".to_string(),
633                    serde_json::Value::Number((busy_us + idle_us).into()),
634                );
635            }
636
637            message = match message.as_str() {
638                Some("new") => serde_json::Value::String("SPAN_CREATED".to_string()),
639                Some("close") => serde_json::Value::String("SPAN_CLOSED".to_string()),
640                _ => message.clone(),
641            };
642
643            visitor.fields.insert(
644                "span_name".to_string(),
645                serde_json::Value::String(span.name().to_string()),
646            );
647
648            if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
649                visitor.fields.insert(
650                    "span_id".to_string(),
651                    serde_json::Value::String(tracing_context.span_id.clone()),
652                );
653                visitor.fields.insert(
654                    "trace_id".to_string(),
655                    serde_json::Value::String(tracing_context.trace_id.clone()),
656                );
657                if let Some(parent_id) = tracing_context.parent_id.clone() {
658                    visitor.fields.insert(
659                        "parent_id".to_string(),
660                        serde_json::Value::String(parent_id),
661                    );
662                } else {
663                    visitor.fields.remove("parent_id");
664                }
665                if let Some(tracestate) = tracing_context.tracestate.clone() {
666                    visitor.fields.insert(
667                        "tracestate".to_string(),
668                        serde_json::Value::String(tracestate),
669                    );
670                } else {
671                    visitor.fields.remove("tracestate");
672                }
673                if let Some(x_request_id) = tracing_context.x_request_id.clone() {
674                    visitor.fields.insert(
675                        "x_request_id".to_string(),
676                        serde_json::Value::String(x_request_id),
677                    );
678                } else {
679                    visitor.fields.remove("x_request_id");
680                }
681
682                if let Some(x_dynamo_request_id) = tracing_context.x_dynamo_request_id.clone() {
683                    visitor.fields.insert(
684                        "x_dynamo_request_id".to_string(),
685                        serde_json::Value::String(x_dynamo_request_id),
686                    );
687                } else {
688                    visitor.fields.remove("x_dynamo_request_id");
689                }
690            } else {
691                tracing::error!(
692                    "Distributed Trace Context not found, falling back to internal ids"
693                );
694                visitor.fields.insert(
695                    "span_id".to_string(),
696                    serde_json::Value::String(span.id().into_u64().to_string()),
697                );
698                if let Some(parent) = span.parent() {
699                    visitor.fields.insert(
700                        "parent_id".to_string(),
701                        serde_json::Value::String(parent.id().into_u64().to_string()),
702                    );
703                }
704            }
705        } else {
706            let reserved_fields = [
707                "trace_id",
708                "span_id",
709                "parent_id",
710                "span_name",
711                "tracestate",
712            ];
713            for reserved_field in reserved_fields {
714                visitor.fields.remove(reserved_field);
715            }
716        }
717        let metadata = event.metadata();
718        let log = JsonLog {
719            level: metadata.level().to_string(),
720            time,
721            file: metadata.file(),
722            line: metadata.line(),
723            target: metadata.target(),
724            message,
725            fields: visitor.fields,
726        };
727        let json = serde_json::to_string(&log).unwrap();
728        writeln!(writer, "{json}")
729    }
730}
731
732#[derive(Default)]
733struct JsonVisitor {
734    fields: BTreeMap<String, serde_json::Value>,
735}
736
737impl tracing::field::Visit for JsonVisitor {
738    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
739        self.fields.insert(
740            field.name().to_string(),
741            serde_json::Value::String(format!("{value:?}")),
742        );
743    }
744
745    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
746        if field.name() != "message" {
747            match serde_json::from_str::<Value>(value) {
748                Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
749                Err(_) => self.fields.insert(field.name().to_string(), value.into()),
750            };
751        } else {
752            self.fields.insert(field.name().to_string(), value.into());
753        }
754    }
755
756    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
757        self.fields
758            .insert(field.name().to_string(), serde_json::Value::Bool(value));
759    }
760
761    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
762        self.fields.insert(
763            field.name().to_string(),
764            serde_json::Value::Number(value.into()),
765        );
766    }
767
768    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
769        self.fields.insert(
770            field.name().to_string(),
771            serde_json::Value::Number(value.into()),
772        );
773    }
774
775    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
776        use serde_json::value::Number;
777        self.fields.insert(
778            field.name().to_string(),
779            serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
780        );
781    }
782}
783
784#[cfg(test)]
785pub mod tests {
786    use super::*;
787    use anyhow::{Result, anyhow};
788    use chrono::{DateTime, Utc};
789    use jsonschema::{Draft, JSONSchema};
790    use serde_json::Value;
791    use std::fs::File;
792    use std::io::{BufRead, BufReader};
793    use stdio_override::*;
794    use tempfile::NamedTempFile;
795
796    static LOG_LINE_SCHEMA: &str = r#"
797    {
798      "$schema": "http://json-schema.org/draft-07/schema#",
799      "title": "Runtime Log Line",
800      "type": "object",
801      "required": [
802        "file",
803        "level",
804        "line",
805        "message",
806        "target",
807        "time"
808      ],
809      "properties": {
810        "file":      { "type": "string" },
811        "level":     { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
812        "line":      { "type": "integer" },
813        "message":   { "type": "string" },
814        "target":    { "type": "string" },
815        "time":      { "type": "string", "format": "date-time" },
816        "span_id":   { "type": "string", "pattern": "^[a-f0-9]{16}$" },
817        "parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
818        "trace_id":  { "type": "string", "pattern": "^[a-f0-9]{32}$" },
819        "span_name": { "type": "string" },
820        "time.busy_us":     { "type": "integer" },
821        "time.duration_us": { "type": "integer" },
822        "time.idle_us":     { "type": "integer" },
823        "tracestate": { "type": "string" }
824      },
825      "additionalProperties": true
826    }
827    "#;
828
    /// Test fixture: outermost function of a three-level instrumented call
    /// chain. Its span is created with explicit `span_id` and `trace_id`
    /// fields.
    #[tracing::instrument(
        skip_all,
        fields(
            span_id = "abd16e319329445f",
            trace_id = "2adfd24468724599bb9a4990dc342288"
        )
    )]
    async fn parent() {
        // Record the literal "invalid" for the ID fields after the span exists.
        // NOTE(review): presumably verifies that late records do not change
        // the IDs captured at span creation — confirm against the assertions.
        tracing::Span::current().record("trace_id", "invalid");
        tracing::Span::current().record("span_id", "invalid");
        tracing::Span::current().record("span_name", "invalid");
        tracing::trace!(message = "parent!");
        // Log the trace ID visible from inside this span, if a context exists.
        if let Some(my_ctx) = get_distributed_tracing_context() {
            tracing::info!(my_trace_id = my_ctx.trace_id);
        }
        child().await;
    }
846
    /// Test fixture: middle function of the instrumented call chain. Its span
    /// declares no ID fields.
    #[tracing::instrument(skip_all)]
    async fn child() {
        tracing::trace!(message = "child");
        // Log the trace ID visible from inside this span, if a context exists.
        if let Some(my_ctx) = get_distributed_tracing_context() {
            tracing::info!(my_trace_id = my_ctx.trace_id);
        }
        grandchild().await;
    }
855
856    #[tracing::instrument(skip_all)]
857    async fn grandchild() {
858        tracing::trace!(message = "grandchild");
859        if let Some(my_ctx) = get_distributed_tracing_context() {
860            tracing::info!(my_trace_id = my_ctx.trace_id);
861        }
862    }
863
864    pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
865        let schema_json: Value =
866            serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
867        let compiled_schema = JSONSchema::options()
868            .with_draft(Draft::Draft7)
869            .compile(&schema_json)
870            .expect("Invalid schema");
871
872        let f = File::open(file_name)?;
873        let reader = BufReader::new(f);
874        let mut result = Vec::new();
875
876        for (line_num, line) in reader.lines().enumerate() {
877            let line = line?;
878            let val: Value = serde_json::from_str(&line)
879                .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_num + 1, e))?;
880
881            if let Err(errors) = compiled_schema.validate(&val) {
882                let errs = errors.map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
883                return Err(anyhow!(
884                    "Line {}: JSON Schema Validation errors: {}",
885                    line_num + 1,
886                    errs
887                ));
888            }
889            println!("{}", val);
890            result.push(val);
891        }
892        Ok(result)
893    }
894
895    #[tokio::test]
896    async fn test_json_log_capture() -> Result<()> {
897        #[allow(clippy::redundant_closure_call)]
898        let _ = temp_env::async_with_vars(
899            [("DYN_LOGGING_JSONL", Some("1"))],
900            (async || {
901                let tmp_file = NamedTempFile::new().unwrap();
902                let file_name = tmp_file.path().to_str().unwrap();
903                let guard = StderrOverride::from_file(file_name)?;
904                init();
905                parent().await;
906                drop(guard);
907
908                let lines = load_log(file_name)?;
909
910                // 1. Validate my_trace_id matches parent's trace ID
911                let parent_trace_id = Uuid::parse_str("2adfd24468724599bb9a4990dc342288")
912                    .unwrap()
913                    .simple()
914                    .to_string();
915                for log_line in &lines {
916                    if let Some(my_trace_id) = log_line.get("my_trace_id") {
917                        assert_eq!(
918                            my_trace_id,
919                            &serde_json::Value::String(parent_trace_id.clone())
920                        );
921                    }
922                }
923
924                // 2. Validate span IDs are unique for SPAN_CREATED and SPAN_CLOSED events
925                let mut created_span_ids: Vec<String> = Vec::new();
926                let mut closed_span_ids: Vec<String> = Vec::new();
927
928                for log_line in &lines {
929                    if let Some(message) = log_line.get("message") {
930                        match message.as_str().unwrap() {
931                            "SPAN_CREATED" => {
932                                if let Some(span_id) = log_line.get("span_id") {
933                                    let span_id_str = span_id.as_str().unwrap();
934                                    assert!(
935                                        created_span_ids.iter().all(|id| id != span_id_str),
936                                        "Duplicate span ID found in SPAN_CREATED: {}",
937                                        span_id_str
938                                    );
939                                    created_span_ids.push(span_id_str.to_string());
940                                }
941                            }
942                            "SPAN_CLOSED" => {
943                                if let Some(span_id) = log_line.get("span_id") {
944                                    let span_id_str = span_id.as_str().unwrap();
945                                    assert!(
946                                        closed_span_ids.iter().all(|id| id != span_id_str),
947                                        "Duplicate span ID found in SPAN_CLOSED: {}",
948                                        span_id_str
949                                    );
950                                    closed_span_ids.push(span_id_str.to_string());
951                                }
952                            }
953                            _ => {}
954                        }
955                    }
956                }
957
958                // Additionally, ensure that every SPAN_CLOSED has a corresponding SPAN_CREATED
959                for closed_span_id in &closed_span_ids {
960                    assert!(
961                        created_span_ids.contains(closed_span_id),
962                        "SPAN_CLOSED without corresponding SPAN_CREATED: {}",
963                        closed_span_id
964                    );
965                }
966
967                // 3. Validate parent span relationships
968                let parent_span_id = lines
969                    .iter()
970                    .find(|log_line| {
971                        log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CREATED"
972                            && log_line.get("span_name").unwrap().as_str().unwrap() == "parent"
973                    })
974                    .and_then(|log_line| {
975                        log_line
976                            .get("span_id")
977                            .map(|s| s.as_str().unwrap().to_string())
978                    })
979                    .unwrap();
980
981                let child_span_id = lines
982                    .iter()
983                    .find(|log_line| {
984                        log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CREATED"
985                            && log_line.get("span_name").unwrap().as_str().unwrap() == "child"
986                    })
987                    .and_then(|log_line| {
988                        log_line
989                            .get("span_id")
990                            .map(|s| s.as_str().unwrap().to_string())
991                    })
992                    .unwrap();
993
994                let _grandchild_span_id = lines
995                    .iter()
996                    .find(|log_line| {
997                        log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CREATED"
998                            && log_line.get("span_name").unwrap().as_str().unwrap() == "grandchild"
999                    })
1000                    .and_then(|log_line| {
1001                        log_line
1002                            .get("span_id")
1003                            .map(|s| s.as_str().unwrap().to_string())
1004                    })
1005                    .unwrap();
1006
1007                // Parent span has no parent_id
1008                for log_line in &lines {
1009                    if let Some(span_name) = log_line.get("span_name")
1010                        && let Some(span_name_str) = span_name.as_str()
1011                        && span_name_str == "parent"
1012                    {
1013                        assert!(log_line.get("parent_id").is_none());
1014                    }
1015                }
1016
1017                // Child span's parent_id is parent_span_id
1018                for log_line in &lines {
1019                    if let Some(span_name) = log_line.get("span_name")
1020                        && let Some(span_name_str) = span_name.as_str()
1021                        && span_name_str == "child"
1022                    {
1023                        assert_eq!(
1024                            log_line.get("parent_id").unwrap().as_str().unwrap(),
1025                            &parent_span_id
1026                        );
1027                    }
1028                }
1029
1030                // Grandchild span's parent_id is child_span_id
1031                for log_line in &lines {
1032                    if let Some(span_name) = log_line.get("span_name")
1033                        && let Some(span_name_str) = span_name.as_str()
1034                        && span_name_str == "grandchild"
1035                    {
1036                        assert_eq!(
1037                            log_line.get("parent_id").unwrap().as_str().unwrap(),
1038                            &child_span_id
1039                        );
1040                    }
1041                }
1042
1043                // Validate duration relationships
1044                let parent_duration = lines
1045                    .iter()
1046                    .find(|log_line| {
1047                        log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CLOSED"
1048                            && log_line.get("span_name").unwrap().as_str().unwrap() == "parent"
1049                    })
1050                    .and_then(|log_line| {
1051                        log_line
1052                            .get("time.duration_us")
1053                            .map(|d| d.as_u64().unwrap())
1054                    })
1055                    .unwrap();
1056
1057                let child_duration = lines
1058                    .iter()
1059                    .find(|log_line| {
1060                        log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CLOSED"
1061                            && log_line.get("span_name").unwrap().as_str().unwrap() == "child"
1062                    })
1063                    .and_then(|log_line| {
1064                        log_line
1065                            .get("time.duration_us")
1066                            .map(|d| d.as_u64().unwrap())
1067                    })
1068                    .unwrap();
1069
1070                let grandchild_duration = lines
1071                    .iter()
1072                    .find(|log_line| {
1073                        log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CLOSED"
1074                            && log_line.get("span_name").unwrap().as_str().unwrap() == "grandchild"
1075                    })
1076                    .and_then(|log_line| {
1077                        log_line
1078                            .get("time.duration_us")
1079                            .map(|d| d.as_u64().unwrap())
1080                    })
1081                    .unwrap();
1082
1083                assert!(
1084                    parent_duration > child_duration + grandchild_duration,
1085                    "Parent duration is not greater than the sum of child and grandchild durations"
1086                );
1087                assert!(
1088                    child_duration > grandchild_duration,
1089                    "Child duration is not greater than grandchild duration"
1090                );
1091
1092                Ok::<(), anyhow::Error>(())
1093            })(),
1094        )
1095        .await;
1096        Ok(())
1097    }
1098}