1use std::collections::{BTreeMap, HashMap};
30use std::sync::Once;
31
32use figment::{
33 Figment,
34 providers::{Format, Serialized, Toml},
35};
36use serde::{Deserialize, Serialize};
37use tracing::level_filters::LevelFilter;
38use tracing::{Event, Subscriber};
39use tracing_subscriber::EnvFilter;
40use tracing_subscriber::fmt::time::FormatTime;
41use tracing_subscriber::fmt::time::LocalTime;
42use tracing_subscriber::fmt::time::SystemTime;
43use tracing_subscriber::fmt::time::UtcTime;
44use tracing_subscriber::fmt::{FmtContext, FormatFields};
45use tracing_subscriber::fmt::{FormattedFields, format::Writer};
46use tracing_subscriber::prelude::*;
47use tracing_subscriber::registry::LookupSpan;
48use tracing_subscriber::{filter::Directive, fmt};
49
50use crate::config::{disable_ansi_logging, jsonl_logging_enabled};
51use async_nats::{HeaderMap, HeaderValue};
52use axum::extract::FromRequestParts;
53use axum::http;
54use axum::http::Request;
55use axum::http::request::Parts;
56use serde_json::Value;
57use std::convert::Infallible;
58use std::time::Instant;
59use tower_http::trace::{DefaultMakeSpan, TraceLayer};
60use tracing::Id;
61use tracing::Span;
62use tracing::field::Field;
63use tracing::span;
64use tracing_subscriber::Layer;
65use tracing_subscriber::Registry;
66use tracing_subscriber::field::Visit;
67use tracing_subscriber::fmt::format::FmtSpan;
68use tracing_subscriber::layer::Context;
69use tracing_subscriber::registry::SpanData;
70use uuid::Uuid;
71
72const FILTER_ENV: &str = "DYN_LOG";
74
75const DEFAULT_FILTER_LEVEL: &str = "info";
77
78const CONFIG_PATH_ENV: &str = "DYN_LOGGING_CONFIG_PATH";
80
81static INIT: Once = Once::new();
83
84#[derive(Serialize, Deserialize, Debug)]
85struct LoggingConfig {
86 log_level: String,
87 log_filters: HashMap<String, String>,
88}
89impl Default for LoggingConfig {
90 fn default() -> Self {
91 LoggingConfig {
92 log_level: DEFAULT_FILTER_LEVEL.to_string(),
93 log_filters: HashMap::from([
94 ("h2".to_string(), "error".to_string()),
95 ("tower".to_string(), "error".to_string()),
96 ("hyper_util".to_string(), "error".to_string()),
97 ("neli".to_string(), "error".to_string()),
98 ("async_nats".to_string(), "error".to_string()),
99 ("rustls".to_string(), "error".to_string()),
100 ("tokenizers".to_string(), "error".to_string()),
101 ("axum".to_string(), "error".to_string()),
102 ("tonic".to_string(), "error".to_string()),
103 ("mistralrs_core".to_string(), "error".to_string()),
104 ("hf_hub".to_string(), "error".to_string()),
105 ]),
106 }
107 }
108}
109
110fn generate_trace_id() -> String {
112 Uuid::new_v4().simple().to_string()
113}
114
115fn generate_span_id() -> String {
117 let uuid = Uuid::new_v4();
119 let bytes = uuid.as_bytes();
120 bytes[..8].iter().map(|b| format!("{:02x}", b)).collect()
121}
122
123pub fn is_valid_trace_id(trace_id: &str) -> bool {
126 trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit())
127}
128
129pub fn is_valid_span_id(span_id: &str) -> bool {
132 span_id.len() == 16 && span_id.chars().all(|c| c.is_ascii_hexdigit())
133}
134
135pub struct DistributedTraceIdLayer;
136
137#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct DistributedTraceContext {
139 pub trace_id: String,
140 pub span_id: String,
141 #[serde(skip_serializing_if = "Option::is_none")]
142 pub parent_id: Option<String>,
143 #[serde(skip_serializing_if = "Option::is_none")]
144 pub tracestate: Option<String>,
145 #[serde(skip)]
146 start: Option<Instant>,
147 #[serde(skip)]
148 end: Option<Instant>,
149 #[serde(skip_serializing_if = "Option::is_none")]
150 pub x_request_id: Option<String>,
151 #[serde(skip_serializing_if = "Option::is_none")]
152 pub x_dynamo_request_id: Option<String>,
153}
154
155impl DistributedTraceContext {
156 pub fn create_traceparent(&self) -> String {
158 format!("00-{}-{}-01", self.trace_id, self.span_id)
159 }
160}
161
162pub fn parse_traceparent(traceparent: &str) -> (Option<String>, Option<String>) {
164 let pieces: Vec<_> = traceparent.split('-').collect();
165 if pieces.len() != 4 {
166 return (None, None);
167 }
168 let trace_id = pieces[1];
169 let parent_id = pieces[2];
170
171 if !is_valid_trace_id(trace_id) || !is_valid_span_id(parent_id) {
172 return (None, None);
173 }
174
175 (Some(trace_id.to_string()), Some(parent_id.to_string()))
176}
177
178#[derive(Debug, Clone, Default)]
179pub struct TraceParent {
180 pub trace_id: Option<String>,
181 pub parent_id: Option<String>,
182 pub tracestate: Option<String>,
183 pub x_request_id: Option<String>,
184 pub x_dynamo_request_id: Option<String>,
185}
186
187pub trait GenericHeaders {
188 fn get(&self, key: &str) -> Option<&str>;
189}
190
191impl GenericHeaders for async_nats::HeaderMap {
192 fn get(&self, key: &str) -> Option<&str> {
193 async_nats::HeaderMap::get(self, key).map(|value| value.as_str())
194 }
195}
196
197impl GenericHeaders for http::HeaderMap {
198 fn get(&self, key: &str) -> Option<&str> {
199 http::HeaderMap::get(self, key).and_then(|value| value.to_str().ok())
200 }
201}
202
203impl TraceParent {
204 pub fn from_headers<H: GenericHeaders>(headers: &H) -> TraceParent {
205 let mut trace_id = None;
206 let mut parent_id = None;
207 let mut tracestate = None;
208 let mut x_request_id = None;
209 let mut x_dynamo_request_id = None;
210
211 if let Some(header_value) = headers.get("traceparent") {
212 (trace_id, parent_id) = parse_traceparent(header_value);
213 }
214
215 if let Some(header_value) = headers.get("x-request-id") {
216 x_request_id = Some(header_value.to_string());
217 }
218
219 if let Some(header_value) = headers.get("tracestate") {
220 tracestate = Some(header_value.to_string());
221 }
222
223 if let Some(header_value) = headers.get("x-dynamo-request-id") {
224 x_dynamo_request_id = Some(header_value.to_string());
225 }
226
227 let x_dynamo_request_id =
229 x_dynamo_request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
230 TraceParent {
231 trace_id,
232 parent_id,
233 tracestate,
234 x_request_id,
235 x_dynamo_request_id,
236 }
237 }
238}
239
240pub fn make_request_span<B>(req: &Request<B>) -> Span {
242 let method = req.method();
243 let uri = req.uri();
244 let version = format!("{:?}", req.version());
245
246 let trace_parent = TraceParent::from_headers(req.headers());
247
248 tracing::info_span!(
249 "http-request",
250 method = %method,
251 uri = %uri,
252 version = %version,
253 trace_id = trace_parent.trace_id,
254 parent_id = trace_parent.parent_id,
255 x_request_id = trace_parent.x_request_id,
256 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
257
258 )
259}
260
261#[derive(Debug, Default)]
262pub struct FieldVisitor {
263 pub fields: HashMap<String, String>,
264}
265
266impl Visit for FieldVisitor {
267 fn record_str(&mut self, field: &Field, value: &str) {
268 self.fields
269 .insert(field.name().to_string(), value.to_string());
270 }
271
272 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
273 self.fields
274 .insert(field.name().to_string(), format!("{:?}", value).to_string());
275 }
276}
277
278impl<S> Layer<S> for DistributedTraceIdLayer
279where
280 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
281{
282 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
285 if let Some(span) = ctx.span(&id) {
286 let mut extensions = span.extensions_mut();
287 if let Some(distributed_tracing_context) =
288 extensions.get_mut::<DistributedTraceContext>()
289 {
290 distributed_tracing_context.end = Some(Instant::now());
291 }
292 }
293 }
294
295 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
297 if let Some(span) = ctx.span(id) {
298 let mut trace_id: Option<String> = None;
299 let mut parent_id: Option<String> = None;
300 let mut span_id: Option<String> = None;
301 let mut x_request_id: Option<String> = None;
302 let mut x_dynamo_request_id: Option<String> = None;
303 let mut tracestate: Option<String> = None;
304 let mut visitor = FieldVisitor::default();
305 attrs.record(&mut visitor);
306
307 if let Some(trace_id_input) = visitor.fields.get("trace_id") {
308 if !is_valid_trace_id(trace_id_input) {
309 tracing::trace!("trace id '{}' is not valid! Ignoring.", trace_id_input);
310 } else {
311 trace_id = Some(trace_id_input.to_string());
312 }
313 }
314
315 if let Some(span_id_input) = visitor.fields.get("span_id") {
316 if !is_valid_span_id(span_id_input) {
317 tracing::trace!("span id '{}' is not valid! Ignoring.", span_id_input);
318 } else {
319 span_id = Some(span_id_input.to_string());
320 }
321 }
322
323 if let Some(parent_id_input) = visitor.fields.get("parent_id") {
324 if !is_valid_span_id(parent_id_input) {
325 tracing::trace!("parent id '{}' is not valid! Ignoring.", parent_id_input);
326 } else {
327 parent_id = Some(parent_id_input.to_string());
328 }
329 }
330
331 if let Some(tracestate_input) = visitor.fields.get("tracestate") {
332 tracestate = Some(tracestate_input.to_string());
333 }
334
335 if let Some(x_request_id_input) = visitor.fields.get("x_request_id") {
336 x_request_id = Some(x_request_id_input.to_string());
337 }
338
339 if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
340 x_dynamo_request_id = Some(x_request_id_input.to_string());
341 }
342
343 if parent_id.is_none()
344 && let Some(parent_span_id) = ctx.current_span().id()
345 && let Some(parent_span) = ctx.span(parent_span_id)
346 {
347 let parent_ext = parent_span.extensions();
348 if let Some(parent_tracing_context) = parent_ext.get::<DistributedTraceContext>() {
349 trace_id = Some(parent_tracing_context.trace_id.clone());
350 parent_id = Some(parent_tracing_context.span_id.clone());
351 tracestate = parent_tracing_context.tracestate.clone();
352 }
353 }
354
355 if (parent_id.is_some() || span_id.is_some()) && trace_id.is_none() {
356 tracing::error!("parent id or span id are set but trace id is not set!");
357 parent_id = None;
359 span_id = None;
360 }
361
362 if trace_id.is_none() {
363 trace_id = Some(generate_trace_id());
364 }
365 if span_id.is_none() {
366 span_id = Some(generate_span_id());
367 }
368
369 let mut extensions = span.extensions_mut();
370 extensions.insert(DistributedTraceContext {
371 trace_id: trace_id.expect("Trace ID must be set"),
372 span_id: span_id.expect("Span ID must be set"),
373 parent_id,
374 tracestate,
375 start: Some(Instant::now()),
376 end: None,
377 x_request_id,
378 x_dynamo_request_id,
379 });
380 }
381 }
382}
383
384pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
387 Span::current()
388 .with_subscriber(|(id, subscriber)| {
389 subscriber
390 .downcast_ref::<Registry>()
391 .and_then(|registry| registry.span_data(id))
392 .and_then(|span_data| {
393 let extensions = span_data.extensions();
394 extensions.get::<DistributedTraceContext>().cloned()
395 })
396 })
397 .flatten()
398}
399
400pub fn init() {
402 INIT.call_once(setup_logging);
403}
404
405#[cfg(feature = "tokio-console")]
406fn setup_logging() {
407 let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
408 .with_default_env()
409 .server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
410 .spawn();
411 let tokio_console_target = tracing_subscriber::filter::Targets::new()
412 .with_default(LevelFilter::ERROR)
413 .with_target("runtime", LevelFilter::TRACE)
414 .with_target("tokio", LevelFilter::TRACE);
415 let l = fmt::layer()
416 .with_ansi(!disable_ansi_logging())
417 .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
418 .with_writer(std::io::stderr)
419 .with_filter(filters(load_config()));
420 tracing_subscriber::registry()
421 .with(l)
422 .with(tokio_console_layer.with_filter(tokio_console_target))
423 .init();
424}
425
426#[cfg(not(feature = "tokio-console"))]
427fn setup_logging() {
428 let fmt_filter_layer = filters(load_config());
429 let trace_filter_layer = filters(load_config());
430 if jsonl_logging_enabled() {
431 let l = fmt::layer()
432 .with_ansi(false)
433 .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
434 .event_format(CustomJsonFormatter::new())
435 .with_writer(std::io::stderr)
436 .with_filter(fmt_filter_layer);
437 tracing_subscriber::registry()
438 .with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
439 .with(l)
440 .init();
441 } else {
442 let l = fmt::layer()
443 .with_ansi(!disable_ansi_logging())
444 .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
445 .with_writer(std::io::stderr)
446 .with_filter(fmt_filter_layer);
447 tracing_subscriber::registry().with(l).init();
448 }
449}
450
451fn filters(config: LoggingConfig) -> EnvFilter {
452 let mut filter_layer = EnvFilter::builder()
453 .with_default_directive(config.log_level.parse().unwrap())
454 .with_env_var(FILTER_ENV)
455 .from_env_lossy();
456
457 for (module, level) in config.log_filters {
458 match format!("{module}={level}").parse::<Directive>() {
459 Ok(d) => {
460 filter_layer = filter_layer.add_directive(d);
461 }
462 Err(e) => {
463 eprintln!("Failed parsing filter '{level}' for module '{module}': {e}");
464 }
465 }
466 }
467 filter_layer
468}
469
470pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) {
473 let level = match level {
474 "debug" => log::Level::Debug,
475 "info" => log::Level::Info,
476 "warn" => log::Level::Warn,
477 "error" => log::Level::Error,
478 "warning" => log::Level::Warn,
479 _ => log::Level::Info,
480 };
481 log::logger().log(
482 &log::Record::builder()
483 .args(format_args!("{}", message))
484 .level(level)
485 .target(module)
486 .file(Some(file))
487 .line(Some(line))
488 .build(),
489 );
490}
491
492fn load_config() -> LoggingConfig {
493 let config_path = std::env::var(CONFIG_PATH_ENV).unwrap_or_else(|_| "".to_string());
494 let figment = Figment::new()
495 .merge(Serialized::defaults(LoggingConfig::default()))
496 .merge(Toml::file("/opt/dynamo/etc/logging.toml"))
497 .merge(Toml::file(config_path));
498
499 figment.extract().unwrap()
500}
501
502#[derive(Serialize)]
503struct JsonLog<'a> {
504 time: String,
505 level: String,
506 #[serde(skip_serializing_if = "Option::is_none")]
507 file: Option<&'a str>,
508 #[serde(skip_serializing_if = "Option::is_none")]
509 line: Option<u32>,
510 target: &'a str,
511 message: serde_json::Value,
512 #[serde(flatten)]
513 fields: BTreeMap<String, serde_json::Value>,
514}
515
516struct TimeFormatter {
517 use_local_tz: bool,
518}
519
520impl TimeFormatter {
521 fn new() -> Self {
522 Self {
523 use_local_tz: crate::config::use_local_timezone(),
524 }
525 }
526
527 fn format_now(&self) -> String {
528 if self.use_local_tz {
529 chrono::Local::now()
530 .format("%Y-%m-%dT%H:%M:%S%.6f%:z")
531 .to_string()
532 } else {
533 chrono::Utc::now()
534 .format("%Y-%m-%dT%H:%M:%S%.6fZ")
535 .to_string()
536 }
537 }
538}
539
540impl FormatTime for TimeFormatter {
541 fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
542 write!(w, "{}", self.format_now())
543 }
544}
545
546struct CustomJsonFormatter {
547 time_formatter: TimeFormatter,
548}
549
550impl CustomJsonFormatter {
551 fn new() -> Self {
552 Self {
553 time_formatter: TimeFormatter::new(),
554 }
555 }
556}
557
558use once_cell::sync::Lazy;
559use regex::Regex;
560fn parse_tracing_duration(s: &str) -> Option<u64> {
561 static RE: Lazy<Regex> =
562 Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
563 let captures = RE.captures(s)?;
564 let value: f64 = captures[1].parse().ok()?;
565 let unit = &captures[2];
566 match unit {
567 "ns" => Some((value / 1000.0) as u64),
568 "µs" | "us" => Some(value as u64),
569 "ms" => Some((value * 1000.0) as u64),
570 "s" => Some((value * 1_000_000.0) as u64),
571 _ => None,
572 }
573}
574
575impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
576where
577 S: Subscriber + for<'a> LookupSpan<'a>,
578 N: for<'a> FormatFields<'a> + 'static,
579{
580 fn format_event(
581 &self,
582 ctx: &FmtContext<'_, S, N>,
583 mut writer: Writer<'_>,
584 event: &Event<'_>,
585 ) -> std::fmt::Result {
586 let mut visitor = JsonVisitor::default();
587 let time = self.time_formatter.format_now();
588 event.record(&mut visitor);
589 let mut message = visitor
590 .fields
591 .remove("message")
592 .unwrap_or(serde_json::Value::String("".to_string()));
593
594 let current_span = event
595 .parent()
596 .and_then(|id| ctx.span(id))
597 .or_else(|| ctx.lookup_current());
598 if let Some(span) = current_span {
599 let ext = span.extensions();
600 let data = ext.get::<FormattedFields<N>>().unwrap();
601 let span_fields: Vec<(&str, &str)> = data
602 .fields
603 .split(' ')
604 .filter_map(|entry| entry.split_once('='))
605 .collect();
606 for (name, value) in span_fields {
607 visitor.fields.insert(
608 name.to_string(),
609 serde_json::Value::String(value.trim_matches('"').to_string()),
610 );
611 }
612
613 let busy_us = visitor
614 .fields
615 .remove("time.busy")
616 .and_then(|v| parse_tracing_duration(&v.to_string()));
617 let idle_us = visitor
618 .fields
619 .remove("time.idle")
620 .and_then(|v| parse_tracing_duration(&v.to_string()));
621
622 if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
623 visitor.fields.insert(
624 "time.busy_us".to_string(),
625 serde_json::Value::Number(busy_us.into()),
626 );
627 visitor.fields.insert(
628 "time.idle_us".to_string(),
629 serde_json::Value::Number(idle_us.into()),
630 );
631 visitor.fields.insert(
632 "time.duration_us".to_string(),
633 serde_json::Value::Number((busy_us + idle_us).into()),
634 );
635 }
636
637 message = match message.as_str() {
638 Some("new") => serde_json::Value::String("SPAN_CREATED".to_string()),
639 Some("close") => serde_json::Value::String("SPAN_CLOSED".to_string()),
640 _ => message.clone(),
641 };
642
643 visitor.fields.insert(
644 "span_name".to_string(),
645 serde_json::Value::String(span.name().to_string()),
646 );
647
648 if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
649 visitor.fields.insert(
650 "span_id".to_string(),
651 serde_json::Value::String(tracing_context.span_id.clone()),
652 );
653 visitor.fields.insert(
654 "trace_id".to_string(),
655 serde_json::Value::String(tracing_context.trace_id.clone()),
656 );
657 if let Some(parent_id) = tracing_context.parent_id.clone() {
658 visitor.fields.insert(
659 "parent_id".to_string(),
660 serde_json::Value::String(parent_id),
661 );
662 } else {
663 visitor.fields.remove("parent_id");
664 }
665 if let Some(tracestate) = tracing_context.tracestate.clone() {
666 visitor.fields.insert(
667 "tracestate".to_string(),
668 serde_json::Value::String(tracestate),
669 );
670 } else {
671 visitor.fields.remove("tracestate");
672 }
673 if let Some(x_request_id) = tracing_context.x_request_id.clone() {
674 visitor.fields.insert(
675 "x_request_id".to_string(),
676 serde_json::Value::String(x_request_id),
677 );
678 } else {
679 visitor.fields.remove("x_request_id");
680 }
681
682 if let Some(x_dynamo_request_id) = tracing_context.x_dynamo_request_id.clone() {
683 visitor.fields.insert(
684 "x_dynamo_request_id".to_string(),
685 serde_json::Value::String(x_dynamo_request_id),
686 );
687 } else {
688 visitor.fields.remove("x_dynamo_request_id");
689 }
690 } else {
691 tracing::error!(
692 "Distributed Trace Context not found, falling back to internal ids"
693 );
694 visitor.fields.insert(
695 "span_id".to_string(),
696 serde_json::Value::String(span.id().into_u64().to_string()),
697 );
698 if let Some(parent) = span.parent() {
699 visitor.fields.insert(
700 "parent_id".to_string(),
701 serde_json::Value::String(parent.id().into_u64().to_string()),
702 );
703 }
704 }
705 } else {
706 let reserved_fields = [
707 "trace_id",
708 "span_id",
709 "parent_id",
710 "span_name",
711 "tracestate",
712 ];
713 for reserved_field in reserved_fields {
714 visitor.fields.remove(reserved_field);
715 }
716 }
717 let metadata = event.metadata();
718 let log = JsonLog {
719 level: metadata.level().to_string(),
720 time,
721 file: metadata.file(),
722 line: metadata.line(),
723 target: metadata.target(),
724 message,
725 fields: visitor.fields,
726 };
727 let json = serde_json::to_string(&log).unwrap();
728 writeln!(writer, "{json}")
729 }
730}
731
732#[derive(Default)]
733struct JsonVisitor {
734 fields: BTreeMap<String, serde_json::Value>,
735}
736
737impl tracing::field::Visit for JsonVisitor {
738 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
739 self.fields.insert(
740 field.name().to_string(),
741 serde_json::Value::String(format!("{value:?}")),
742 );
743 }
744
745 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
746 if field.name() != "message" {
747 match serde_json::from_str::<Value>(value) {
748 Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
749 Err(_) => self.fields.insert(field.name().to_string(), value.into()),
750 };
751 } else {
752 self.fields.insert(field.name().to_string(), value.into());
753 }
754 }
755
756 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
757 self.fields
758 .insert(field.name().to_string(), serde_json::Value::Bool(value));
759 }
760
761 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
762 self.fields.insert(
763 field.name().to_string(),
764 serde_json::Value::Number(value.into()),
765 );
766 }
767
768 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
769 self.fields.insert(
770 field.name().to_string(),
771 serde_json::Value::Number(value.into()),
772 );
773 }
774
775 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
776 use serde_json::value::Number;
777 self.fields.insert(
778 field.name().to_string(),
779 serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
780 );
781 }
782}
783
784#[cfg(test)]
785pub mod tests {
786 use super::*;
787 use anyhow::{Result, anyhow};
788 use chrono::{DateTime, Utc};
789 use jsonschema::{Draft, JSONSchema};
790 use serde_json::Value;
791 use std::fs::File;
792 use std::io::{BufRead, BufReader};
793 use stdio_override::*;
794 use tempfile::NamedTempFile;
795
796 static LOG_LINE_SCHEMA: &str = r#"
797 {
798 "$schema": "http://json-schema.org/draft-07/schema#",
799 "title": "Runtime Log Line",
800 "type": "object",
801 "required": [
802 "file",
803 "level",
804 "line",
805 "message",
806 "target",
807 "time"
808 ],
809 "properties": {
810 "file": { "type": "string" },
811 "level": { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
812 "line": { "type": "integer" },
813 "message": { "type": "string" },
814 "target": { "type": "string" },
815 "time": { "type": "string", "format": "date-time" },
816 "span_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
817 "parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
818 "trace_id": { "type": "string", "pattern": "^[a-f0-9]{32}$" },
819 "span_name": { "type": "string" },
820 "time.busy_us": { "type": "integer" },
821 "time.duration_us": { "type": "integer" },
822 "time.idle_us": { "type": "integer" },
823 "tracestate": { "type": "string" }
824 },
825 "additionalProperties": true
826 }
827 "#;
828
829 #[tracing::instrument(
830 skip_all,
831 fields(
832 span_id = "abd16e319329445f",
833 trace_id = "2adfd24468724599bb9a4990dc342288"
834 )
835 )]
836 async fn parent() {
837 tracing::Span::current().record("trace_id", "invalid");
838 tracing::Span::current().record("span_id", "invalid");
839 tracing::Span::current().record("span_name", "invalid");
840 tracing::trace!(message = "parent!");
841 if let Some(my_ctx) = get_distributed_tracing_context() {
842 tracing::info!(my_trace_id = my_ctx.trace_id);
843 }
844 child().await;
845 }
846
847 #[tracing::instrument(skip_all)]
848 async fn child() {
849 tracing::trace!(message = "child");
850 if let Some(my_ctx) = get_distributed_tracing_context() {
851 tracing::info!(my_trace_id = my_ctx.trace_id);
852 }
853 grandchild().await;
854 }
855
856 #[tracing::instrument(skip_all)]
857 async fn grandchild() {
858 tracing::trace!(message = "grandchild");
859 if let Some(my_ctx) = get_distributed_tracing_context() {
860 tracing::info!(my_trace_id = my_ctx.trace_id);
861 }
862 }
863
864 pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
865 let schema_json: Value =
866 serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
867 let compiled_schema = JSONSchema::options()
868 .with_draft(Draft::Draft7)
869 .compile(&schema_json)
870 .expect("Invalid schema");
871
872 let f = File::open(file_name)?;
873 let reader = BufReader::new(f);
874 let mut result = Vec::new();
875
876 for (line_num, line) in reader.lines().enumerate() {
877 let line = line?;
878 let val: Value = serde_json::from_str(&line)
879 .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_num + 1, e))?;
880
881 if let Err(errors) = compiled_schema.validate(&val) {
882 let errs = errors.map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
883 return Err(anyhow!(
884 "Line {}: JSON Schema Validation errors: {}",
885 line_num + 1,
886 errs
887 ));
888 }
889 println!("{}", val);
890 result.push(val);
891 }
892 Ok(result)
893 }
894
895 #[tokio::test]
896 async fn test_json_log_capture() -> Result<()> {
897 #[allow(clippy::redundant_closure_call)]
898 let _ = temp_env::async_with_vars(
899 [("DYN_LOGGING_JSONL", Some("1"))],
900 (async || {
901 let tmp_file = NamedTempFile::new().unwrap();
902 let file_name = tmp_file.path().to_str().unwrap();
903 let guard = StderrOverride::from_file(file_name)?;
904 init();
905 parent().await;
906 drop(guard);
907
908 let lines = load_log(file_name)?;
909
910 let parent_trace_id = Uuid::parse_str("2adfd24468724599bb9a4990dc342288")
912 .unwrap()
913 .simple()
914 .to_string();
915 for log_line in &lines {
916 if let Some(my_trace_id) = log_line.get("my_trace_id") {
917 assert_eq!(
918 my_trace_id,
919 &serde_json::Value::String(parent_trace_id.clone())
920 );
921 }
922 }
923
924 let mut created_span_ids: Vec<String> = Vec::new();
926 let mut closed_span_ids: Vec<String> = Vec::new();
927
928 for log_line in &lines {
929 if let Some(message) = log_line.get("message") {
930 match message.as_str().unwrap() {
931 "SPAN_CREATED" => {
932 if let Some(span_id) = log_line.get("span_id") {
933 let span_id_str = span_id.as_str().unwrap();
934 assert!(
935 created_span_ids.iter().all(|id| id != span_id_str),
936 "Duplicate span ID found in SPAN_CREATED: {}",
937 span_id_str
938 );
939 created_span_ids.push(span_id_str.to_string());
940 }
941 }
942 "SPAN_CLOSED" => {
943 if let Some(span_id) = log_line.get("span_id") {
944 let span_id_str = span_id.as_str().unwrap();
945 assert!(
946 closed_span_ids.iter().all(|id| id != span_id_str),
947 "Duplicate span ID found in SPAN_CLOSED: {}",
948 span_id_str
949 );
950 closed_span_ids.push(span_id_str.to_string());
951 }
952 }
953 _ => {}
954 }
955 }
956 }
957
958 for closed_span_id in &closed_span_ids {
960 assert!(
961 created_span_ids.contains(closed_span_id),
962 "SPAN_CLOSED without corresponding SPAN_CREATED: {}",
963 closed_span_id
964 );
965 }
966
967 let parent_span_id = lines
969 .iter()
970 .find(|log_line| {
971 log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CREATED"
972 && log_line.get("span_name").unwrap().as_str().unwrap() == "parent"
973 })
974 .and_then(|log_line| {
975 log_line
976 .get("span_id")
977 .map(|s| s.as_str().unwrap().to_string())
978 })
979 .unwrap();
980
981 let child_span_id = lines
982 .iter()
983 .find(|log_line| {
984 log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CREATED"
985 && log_line.get("span_name").unwrap().as_str().unwrap() == "child"
986 })
987 .and_then(|log_line| {
988 log_line
989 .get("span_id")
990 .map(|s| s.as_str().unwrap().to_string())
991 })
992 .unwrap();
993
994 let _grandchild_span_id = lines
995 .iter()
996 .find(|log_line| {
997 log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CREATED"
998 && log_line.get("span_name").unwrap().as_str().unwrap() == "grandchild"
999 })
1000 .and_then(|log_line| {
1001 log_line
1002 .get("span_id")
1003 .map(|s| s.as_str().unwrap().to_string())
1004 })
1005 .unwrap();
1006
1007 for log_line in &lines {
1009 if let Some(span_name) = log_line.get("span_name")
1010 && let Some(span_name_str) = span_name.as_str()
1011 && span_name_str == "parent"
1012 {
1013 assert!(log_line.get("parent_id").is_none());
1014 }
1015 }
1016
1017 for log_line in &lines {
1019 if let Some(span_name) = log_line.get("span_name")
1020 && let Some(span_name_str) = span_name.as_str()
1021 && span_name_str == "child"
1022 {
1023 assert_eq!(
1024 log_line.get("parent_id").unwrap().as_str().unwrap(),
1025 &parent_span_id
1026 );
1027 }
1028 }
1029
1030 for log_line in &lines {
1032 if let Some(span_name) = log_line.get("span_name")
1033 && let Some(span_name_str) = span_name.as_str()
1034 && span_name_str == "grandchild"
1035 {
1036 assert_eq!(
1037 log_line.get("parent_id").unwrap().as_str().unwrap(),
1038 &child_span_id
1039 );
1040 }
1041 }
1042
1043 let parent_duration = lines
1045 .iter()
1046 .find(|log_line| {
1047 log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CLOSED"
1048 && log_line.get("span_name").unwrap().as_str().unwrap() == "parent"
1049 })
1050 .and_then(|log_line| {
1051 log_line
1052 .get("time.duration_us")
1053 .map(|d| d.as_u64().unwrap())
1054 })
1055 .unwrap();
1056
1057 let child_duration = lines
1058 .iter()
1059 .find(|log_line| {
1060 log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CLOSED"
1061 && log_line.get("span_name").unwrap().as_str().unwrap() == "child"
1062 })
1063 .and_then(|log_line| {
1064 log_line
1065 .get("time.duration_us")
1066 .map(|d| d.as_u64().unwrap())
1067 })
1068 .unwrap();
1069
1070 let grandchild_duration = lines
1071 .iter()
1072 .find(|log_line| {
1073 log_line.get("message").unwrap().as_str().unwrap() == "SPAN_CLOSED"
1074 && log_line.get("span_name").unwrap().as_str().unwrap() == "grandchild"
1075 })
1076 .and_then(|log_line| {
1077 log_line
1078 .get("time.duration_us")
1079 .map(|d| d.as_u64().unwrap())
1080 })
1081 .unwrap();
1082
1083 assert!(
1084 parent_duration > child_duration + grandchild_duration,
1085 "Parent duration is not greater than the sum of child and grandchild durations"
1086 );
1087 assert!(
1088 child_duration > grandchild_duration,
1089 "Child duration is not greater than grandchild duration"
1090 );
1091
1092 Ok::<(), anyhow::Error>(())
1093 })(),
1094 )
1095 .await;
1096 Ok(())
1097 }
1098}