1use std::collections::{BTreeMap, HashMap};
30use std::sync::Once;
31
32use figment::{
33 Figment,
34 providers::{Format, Serialized, Toml},
35};
36use serde::{Deserialize, Serialize};
37use tracing::level_filters::LevelFilter;
38use tracing::{Event, Subscriber};
39use tracing_subscriber::EnvFilter;
40use tracing_subscriber::fmt::time::FormatTime;
41use tracing_subscriber::fmt::time::LocalTime;
42use tracing_subscriber::fmt::time::SystemTime;
43use tracing_subscriber::fmt::time::UtcTime;
44use tracing_subscriber::fmt::{FmtContext, FormatFields};
45use tracing_subscriber::fmt::{FormattedFields, format::Writer};
46use tracing_subscriber::prelude::*;
47use tracing_subscriber::registry::LookupSpan;
48use tracing_subscriber::{filter::Directive, fmt};
49
50use crate::config::{disable_ansi_logging, jsonl_logging_enabled, span_events_enabled};
51use async_nats::{HeaderMap, HeaderValue};
52use axum::extract::FromRequestParts;
53use axum::http;
54use axum::http::Request;
55use axum::http::request::Parts;
56use serde_json::Value;
57use std::convert::Infallible;
58use std::time::Instant;
59use tower_http::trace::{DefaultMakeSpan, TraceLayer};
60use tracing::Id;
61use tracing::Span;
62use tracing::field::Field;
63use tracing::span;
64use tracing_subscriber::Layer;
65use tracing_subscriber::Registry;
66use tracing_subscriber::field::Visit;
67use tracing_subscriber::fmt::format::FmtSpan;
68use tracing_subscriber::layer::Context;
69use tracing_subscriber::registry::SpanData;
70use uuid::Uuid;
71
72use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
73use opentelemetry::trace::TraceContextExt;
74use opentelemetry::{global, trace::Tracer};
75use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
76use opentelemetry_otlp::WithExportConfig;
77
78use opentelemetry::trace::TracerProvider as _;
79use opentelemetry::{Key, KeyValue};
80use opentelemetry_sdk::Resource;
81use opentelemetry_sdk::logs::SdkLoggerProvider;
82use opentelemetry_sdk::trace::SdkTracerProvider;
83use tracing::error;
84use tracing_subscriber::layer::SubscriberExt;
85use std::time::Duration;
88use tracing::{info, instrument};
89use tracing_opentelemetry::OpenTelemetrySpanExt;
90use tracing_subscriber::util::SubscriberInitExt;
91
92use crate::config::environment_names::logging as env_logging;
93
94use dynamo_config::env_is_truthy;
95
/// Log level used by [`LoggingConfig::default`] when nothing overrides `log_level`.
const DEFAULT_FILTER_LEVEL: &str = "info";

/// OTLP endpoint used for traces when the traces-endpoint environment variable is unset.
const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";

/// Service name reported when the service-name environment variable is unset.
const DEFAULT_OTEL_SERVICE_NAME: &str = "dynamo";

/// Guard used by [`init`] so the logging setup runs at most once per process.
static INIT: Once = Once::new();
107
108#[derive(Serialize, Deserialize, Debug)]
109struct LoggingConfig {
110 log_level: String,
111 log_filters: HashMap<String, String>,
112}
113impl Default for LoggingConfig {
114 fn default() -> Self {
115 LoggingConfig {
116 log_level: DEFAULT_FILTER_LEVEL.to_string(),
117 log_filters: HashMap::from([
118 ("h2".to_string(), "error".to_string()),
119 ("tower".to_string(), "error".to_string()),
120 ("hyper_util".to_string(), "error".to_string()),
121 ("neli".to_string(), "error".to_string()),
122 ("async_nats".to_string(), "error".to_string()),
123 ("rustls".to_string(), "error".to_string()),
124 ("tokenizers".to_string(), "error".to_string()),
125 ("axum".to_string(), "error".to_string()),
126 ("tonic".to_string(), "error".to_string()),
127 ("hf_hub".to_string(), "error".to_string()),
128 ("opentelemetry".to_string(), "error".to_string()),
129 ("opentelemetry-otlp".to_string(), "error".to_string()),
130 ("opentelemetry_sdk".to_string(), "error".to_string()),
131 ]),
132 }
133 }
134}
135
136fn otlp_exporter_enabled() -> bool {
138 env_is_truthy(env_logging::otlp::OTEL_EXPORT_ENABLED)
139}
140
141fn get_service_name() -> String {
143 std::env::var(env_logging::otlp::OTEL_SERVICE_NAME)
144 .unwrap_or_else(|_| DEFAULT_OTEL_SERVICE_NAME.to_string())
145}
146
/// Returns `true` when `trace_id` is exactly 32 ASCII hexadecimal characters
/// (either case).
pub fn is_valid_trace_id(trace_id: &str) -> bool {
    const TRACE_ID_HEX_LEN: usize = 32;

    if trace_id.len() != TRACE_ID_HEX_LEN {
        return false;
    }
    // Any non-ASCII byte fails the hex-digit check, so byte iteration is sufficient.
    trace_id.bytes().all(|byte| byte.is_ascii_hexdigit())
}
152
/// Returns `true` when `span_id` is exactly 16 ASCII hexadecimal characters
/// (either case).
pub fn is_valid_span_id(span_id: &str) -> bool {
    const SPAN_ID_HEX_LEN: usize = 16;

    if span_id.len() != SPAN_ID_HEX_LEN {
        return false;
    }
    // Any non-ASCII byte fails the hex-digit check, so byte iteration is sufficient.
    span_id.bytes().all(|byte| byte.is_ascii_hexdigit())
}
158
/// `tracing_subscriber` layer that stores a [`DistributedTraceContext`] in each
/// span's extensions (see its `Layer` implementation).
pub struct DistributedTraceIdLayer;
160
/// Trace identifiers and request metadata attached to a span.
///
/// Optional fields are omitted from serialized output when `None`; the timing
/// fields are never serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedTraceContext {
    /// Trace identifier (validated elsewhere as 32 hex characters).
    pub trace_id: String,
    /// Identifier of this span (validated elsewhere as 16 hex characters).
    pub span_id: String,
    /// Identifier of the parent span, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<String>,
    /// Raw `tracestate` value carried along with the trace, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tracestate: Option<String>,
    /// Instant recorded when the context is created (first span entry).
    #[serde(skip)]
    start: Option<Instant>,
    /// Instant recorded when the span closes.
    #[serde(skip)]
    end: Option<Instant>,
    /// Value of the `x-request-id` header / `x_request_id` span field, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_request_id: Option<String>,
    /// Request identifier associated with the span, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
}
178
/// Partially resolved trace context stored in a span's extensions by
/// `on_new_span` and consumed by `on_enter`, where missing ids are filled in
/// and the final [`DistributedTraceContext`] is built.
#[derive(Debug, Clone)]
struct PendingDistributedTraceContext {
    /// Trace id taken from span fields or the parent span, if known yet.
    trace_id: Option<String>,
    /// Span id taken from span fields, if provided.
    span_id: Option<String>,
    /// Parent span id taken from span fields or the parent span, if known.
    parent_id: Option<String>,
    /// `tracestate` value, if any.
    tracestate: Option<String>,
    /// `x_request_id` value, if any.
    x_request_id: Option<String>,
    /// Request id, if any.
    request_id: Option<String>,
}
189
/// Emits a `tracing::event!` at a level chosen at run time.
///
/// `$level` is matched as a `&tracing::Level`; each arm forwards the target and
/// the remaining tokens to `tracing::event!` with the level written as a
/// constant path.
// NOTE(review): presumably needed because `tracing::event!` requires the level
// to be known at compile time — confirm against the tracing docs.
macro_rules! emit_at_level {
    ($level:expr, target: $target:expr, $($arg:tt)*) => {
        match $level {
            &tracing::Level::ERROR => tracing::event!(target: $target, tracing::Level::ERROR, $($arg)*),
            &tracing::Level::WARN => tracing::event!(target: $target, tracing::Level::WARN, $($arg)*),
            &tracing::Level::INFO => tracing::event!(target: $target, tracing::Level::INFO, $($arg)*),
            &tracing::Level::DEBUG => tracing::event!(target: $target, tracing::Level::DEBUG, $($arg)*),
            &tracing::Level::TRACE => tracing::event!(target: $target, tracing::Level::TRACE, $($arg)*),
        }
    };
}
205
206impl DistributedTraceContext {
207 pub fn create_traceparent(&self) -> String {
209 format!("00-{}-{}-01", self.trace_id, self.span_id)
210 }
211}
212
213pub fn parse_traceparent(traceparent: &str) -> (Option<String>, Option<String>) {
215 let pieces: Vec<_> = traceparent.split('-').collect();
216 if pieces.len() != 4 {
217 return (None, None);
218 }
219 let trace_id = pieces[1];
220 let parent_id = pieces[2];
221
222 if !is_valid_trace_id(trace_id) || !is_valid_span_id(parent_id) {
223 return (None, None);
224 }
225
226 (Some(trace_id.to_string()), Some(parent_id.to_string()))
227}
228
/// Trace-related values read from request headers by [`TraceParent::from_headers`].
#[derive(Debug, Clone, Default)]
pub struct TraceParent {
    /// Trace id parsed from the `traceparent` header, if present and valid.
    pub trace_id: Option<String>,
    /// Parent span id parsed from the `traceparent` header, if present and valid.
    pub parent_id: Option<String>,
    /// Raw `tracestate` header value, if present.
    pub tracestate: Option<String>,
    /// Raw `x-request-id` header value, if present.
    pub x_request_id: Option<String>,
    /// Request id from `request-id` (or `x-dynamo-request-id`), kept only when
    /// it parses as a UUID.
    pub request_id: Option<String>,
}
237
/// Minimal read-only view over a header collection, so the same parsing code
/// can work with different header map types.
pub trait GenericHeaders {
    /// Returns the value stored under `key` as a string slice, or `None` when
    /// it is absent or cannot be represented as `&str`.
    fn get(&self, key: &str) -> Option<&str>;
}
241
242impl GenericHeaders for async_nats::HeaderMap {
243 fn get(&self, key: &str) -> Option<&str> {
244 async_nats::HeaderMap::get(self, key).map(|value| value.as_str())
245 }
246}
247
248impl GenericHeaders for http::HeaderMap {
249 fn get(&self, key: &str) -> Option<&str> {
250 http::HeaderMap::get(self, key).and_then(|value| value.to_str().ok())
251 }
252}
253
254impl TraceParent {
255 pub fn from_headers<H: GenericHeaders>(headers: &H) -> TraceParent {
256 let mut trace_id = None;
257 let mut parent_id = None;
258 let mut tracestate = None;
259 let mut x_request_id = None;
260 let mut request_id = None;
261
262 if let Some(header_value) = headers.get("traceparent") {
263 (trace_id, parent_id) = parse_traceparent(header_value);
264 }
265
266 if let Some(header_value) = headers.get("x-request-id") {
267 x_request_id = Some(header_value.to_string());
268 }
269
270 if let Some(header_value) = headers.get("tracestate") {
271 tracestate = Some(header_value.to_string());
272 }
273
274 if let Some(header_value) = headers.get("request-id") {
276 request_id = Some(header_value.to_string());
277 } else if let Some(header_value) = headers.get("x-dynamo-request-id") {
278 request_id = Some(header_value.to_string());
279 }
280
281 let request_id = request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
282 TraceParent {
283 trace_id,
284 parent_id,
285 tracestate,
286 x_request_id,
287 request_id,
288 }
289 }
290}
291
292pub fn make_inference_request_span<B>(req: &Request<B>) -> Span {
298 let method = req.method();
299 let uri = req.uri();
300 let version = format!("{:?}", req.version());
301 let trace_parent = TraceParent::from_headers(req.headers());
302
303 let otel_context = extract_otel_context_from_http_headers(req.headers());
304
305 let request_id = trace_parent
309 .request_id
310 .unwrap_or_else(|| Uuid::new_v4().to_string());
311
312 let span = tracing::info_span!(
313 target: "request_span",
314 "http-request",
315 method = %method,
316 uri = %uri,
317 version = %version,
318 trace_id = trace_parent.trace_id,
319 parent_id = trace_parent.parent_id,
320 x_request_id = trace_parent.x_request_id,
321 request_id = %request_id,
322 model = tracing::field::Empty,
323 input_tokens = tracing::field::Empty,
324 output_tokens = tracing::field::Empty,
325 ttft_ms = tracing::field::Empty,
326 avg_itl_ms = tracing::field::Empty,
327 prefill_worker_id = tracing::field::Empty,
328 decode_worker_id = tracing::field::Empty,
329 );
330
331 if let Some(context) = otel_context {
332 let _ = span.set_parent(context);
333 }
334
335 span
336}
337
338pub fn make_system_request_span<B>(req: &Request<B>) -> Span {
345 let method = req.method();
346 let uri = req.uri();
347 let version = format!("{:?}", req.version());
348 let trace_parent = TraceParent::from_headers(req.headers());
349 let otel_context = extract_otel_context_from_http_headers(req.headers());
350
351 let request_id = trace_parent
353 .request_id
354 .unwrap_or_else(|| Uuid::new_v4().to_string());
355
356 let span = tracing::debug_span!(
357 target: "system_span",
358 "http-request",
359 method = %method,
360 uri = %uri,
361 version = %version,
362 trace_id = trace_parent.trace_id,
363 parent_id = trace_parent.parent_id,
364 x_request_id = trace_parent.x_request_id,
365 request_id = %request_id,
366 model = tracing::field::Empty,
367 input_tokens = tracing::field::Empty,
368 output_tokens = tracing::field::Empty,
369 ttft_ms = tracing::field::Empty,
370 avg_itl_ms = tracing::field::Empty,
371 prefill_worker_id = tracing::field::Empty,
372 decode_worker_id = tracing::field::Empty,
373 );
374
375 if let Some(context) = otel_context {
376 let _ = span.set_parent(context);
377 }
378
379 span
380}
381
382fn extract_otel_context_from_http_headers(
384 headers: &http::HeaderMap,
385) -> Option<opentelemetry::Context> {
386 let traceparent_value = headers.get("traceparent")?.to_str().ok()?;
387
388 struct HttpHeaderExtractor<'a>(&'a http::HeaderMap);
389
390 impl<'a> Extractor for HttpHeaderExtractor<'a> {
391 fn get(&self, key: &str) -> Option<&str> {
392 self.0.get(key).and_then(|v| v.to_str().ok())
393 }
394
395 fn keys(&self) -> Vec<&str> {
396 vec!["traceparent", "tracestate"]
397 .into_iter()
398 .filter(|&key| self.0.get(key).is_some())
399 .collect()
400 }
401 }
402
403 if traceparent_value.is_empty() {
405 return None;
406 }
407
408 let extractor = HttpHeaderExtractor(headers);
409 let otel_context = TRACE_PROPAGATOR.extract(&extractor);
410
411 if otel_context.span().span_context().is_valid() {
412 Some(otel_context)
413 } else {
414 None
415 }
416}
417
418pub fn make_handle_payload_span(
420 headers: &async_nats::HeaderMap,
421 component: &str,
422 endpoint: &str,
423 namespace: &str,
424 instance_id: u64,
425) -> Span {
426 let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_nats_headers(headers);
427 let trace_parent = TraceParent::from_headers(headers);
428
429 if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
430 let span = tracing::info_span!(
431 target: "request_span",
432 "handle_payload",
433 trace_id = trace_id.as_str(),
434 parent_id = parent_id.as_str(),
435 x_request_id = trace_parent.x_request_id,
436 request_id = trace_parent.request_id,
437 tracestate = trace_parent.tracestate,
438 component = component,
439 endpoint = endpoint,
440 namespace = namespace,
441 instance_id = instance_id,
442 );
443
444 if let Some(context) = otel_context {
445 let _ = span.set_parent(context);
446 }
447 span
448 } else {
449 tracing::info_span!(
450 target: "request_span",
451 "handle_payload",
452 x_request_id = trace_parent.x_request_id,
453 request_id = trace_parent.request_id,
454 tracestate = trace_parent.tracestate,
455 component = component,
456 endpoint = endpoint,
457 namespace = namespace,
458 instance_id = instance_id,
459 )
460 }
461}
462
463pub fn make_handle_payload_span_from_tcp_headers(
465 headers: &std::collections::HashMap<String, String>,
466 component: &str,
467 endpoint: &str,
468 namespace: &str,
469 instance_id: u64,
470) -> Span {
471 let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_tcp_headers(headers);
472 let x_request_id = headers.get("x-request-id").cloned();
473 let request_id = headers
474 .get("request-id")
475 .or_else(|| headers.get("x-dynamo-request-id"))
476 .filter(|id| uuid::Uuid::parse_str(id).is_ok())
477 .cloned();
478 let tracestate = headers.get("tracestate").cloned();
479
480 if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
481 let span = tracing::info_span!(
482 target: "request_span",
483 "handle_payload",
484 trace_id = trace_id.as_str(),
485 parent_id = parent_id.as_str(),
486 x_request_id = x_request_id,
487 request_id = request_id,
488 tracestate = tracestate,
489 component = component,
490 endpoint = endpoint,
491 namespace = namespace,
492 instance_id = instance_id,
493 );
494
495 if let Some(context) = otel_context {
496 let _ = span.set_parent(context);
497 }
498 span
499 } else {
500 tracing::info_span!(
501 target: "request_span",
502 "handle_payload",
503 x_request_id = x_request_id,
504 request_id = request_id,
505 tracestate = tracestate,
506 component = component,
507 endpoint = endpoint,
508 namespace = namespace,
509 instance_id = instance_id,
510 )
511 }
512}
513
514fn extract_otel_context_from_tcp_headers(
516 headers: &std::collections::HashMap<String, String>,
517) -> (
518 Option<opentelemetry::Context>,
519 Option<String>,
520 Option<String>,
521) {
522 let traceparent_value = match headers.get("traceparent") {
523 Some(value) => value.as_str(),
524 None => return (None, None, None),
525 };
526
527 let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
528
529 struct TcpHeaderExtractor<'a>(&'a std::collections::HashMap<String, String>);
530
531 impl<'a> Extractor for TcpHeaderExtractor<'a> {
532 fn get(&self, key: &str) -> Option<&str> {
533 self.0.get(key).map(|s| s.as_str())
534 }
535
536 fn keys(&self) -> Vec<&str> {
537 vec!["traceparent", "tracestate"]
538 .into_iter()
539 .filter(|&key| self.0.get(key).is_some())
540 .collect()
541 }
542 }
543
544 let extractor = TcpHeaderExtractor(headers);
545 let otel_context = TRACE_PROPAGATOR.extract(&extractor);
546
547 let context_with_trace = if otel_context.span().span_context().is_valid() {
548 Some(otel_context)
549 } else {
550 None
551 };
552
553 (context_with_trace, trace_id, parent_span_id)
554}
555
556pub fn extract_otel_context_from_nats_headers(
558 headers: &async_nats::HeaderMap,
559) -> (
560 Option<opentelemetry::Context>,
561 Option<String>,
562 Option<String>,
563) {
564 let traceparent_value = match headers.get("traceparent") {
565 Some(value) => value.as_str(),
566 None => return (None, None, None),
567 };
568
569 let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
570
571 struct NatsHeaderExtractor<'a>(&'a async_nats::HeaderMap);
572
573 impl<'a> Extractor for NatsHeaderExtractor<'a> {
574 fn get(&self, key: &str) -> Option<&str> {
575 self.0.get(key).map(|value| value.as_str())
576 }
577
578 fn keys(&self) -> Vec<&str> {
579 vec!["traceparent", "tracestate"]
580 .into_iter()
581 .filter(|&key| self.0.get(key).is_some())
582 .collect()
583 }
584 }
585
586 let extractor = NatsHeaderExtractor(headers);
587 let otel_context = TRACE_PROPAGATOR.extract(&extractor);
588
589 let context_with_trace = if otel_context.span().span_context().is_valid() {
590 Some(otel_context)
591 } else {
592 None
593 };
594
595 (context_with_trace, trace_id, parent_span_id)
596}
597
598pub fn inject_otel_context_into_nats_headers(
600 headers: &mut async_nats::HeaderMap,
601 context: Option<opentelemetry::Context>,
602) {
603 let otel_context = context.unwrap_or_else(|| Span::current().context());
604
605 struct NatsHeaderInjector<'a>(&'a mut async_nats::HeaderMap);
606
607 impl<'a> Injector for NatsHeaderInjector<'a> {
608 fn set(&mut self, key: &str, value: String) {
609 self.0.insert(key, value);
610 }
611 }
612
613 let mut injector = NatsHeaderInjector(headers);
614 TRACE_PROPAGATOR.inject_context(&otel_context, &mut injector);
615}
616
617pub fn inject_current_trace_into_nats_headers(headers: &mut async_nats::HeaderMap) {
619 inject_otel_context_into_nats_headers(headers, None);
620}
621
622pub fn inject_trace_headers_into_map(headers: &mut std::collections::HashMap<String, String>) {
624 if let Some(trace_context) = get_distributed_tracing_context() {
625 headers.insert(
627 "traceparent".to_string(),
628 trace_context.create_traceparent(),
629 );
630
631 if let Some(tracestate) = trace_context.tracestate {
633 headers.insert("tracestate".to_string(), tracestate);
634 }
635
636 if let Some(x_request_id) = trace_context.x_request_id {
638 headers.insert("x-request-id".to_string(), x_request_id);
639 }
640 if let Some(request_id) = trace_context.request_id {
641 headers.insert("request-id".to_string(), request_id);
642 }
643 }
644}
645
646pub fn make_client_request_span(
648 operation: &str,
649 request_id: &str,
650 trace_context: Option<&DistributedTraceContext>,
651 instance_id: Option<&str>,
652) -> Span {
653 if let Some(ctx) = trace_context {
654 let mut headers = async_nats::HeaderMap::new();
655 headers.insert("traceparent", ctx.create_traceparent());
656
657 if let Some(ref tracestate) = ctx.tracestate {
658 headers.insert("tracestate", tracestate.as_str());
659 }
660
661 let (otel_context, _extracted_trace_id, _extracted_parent_span_id) =
662 extract_otel_context_from_nats_headers(&headers);
663
664 let span = if let Some(inst_id) = instance_id {
665 tracing::info_span!(
666 "client_request",
667 operation = operation,
668 request_id = request_id,
669 instance_id = inst_id,
670 trace_id = ctx.trace_id.as_str(),
671 parent_id = ctx.span_id.as_str(),
672 x_request_id = ctx.x_request_id.as_deref(),
673 )
674 } else {
675 tracing::info_span!(
676 "client_request",
677 operation = operation,
678 request_id = request_id,
679 trace_id = ctx.trace_id.as_str(),
680 parent_id = ctx.span_id.as_str(),
681 x_request_id = ctx.x_request_id.as_deref(),
682 )
683 };
684
685 if let Some(context) = otel_context {
686 let _ = span.set_parent(context);
687 }
688
689 span
690 } else if let Some(inst_id) = instance_id {
691 tracing::info_span!(
692 "client_request",
693 operation = operation,
694 request_id = request_id,
695 instance_id = inst_id,
696 )
697 } else {
698 tracing::info_span!(
699 "client_request",
700 operation = operation,
701 request_id = request_id,
702 )
703 }
704}
705
/// Field visitor that collects span/event fields into a name → string map.
#[derive(Debug, Default)]
pub struct FieldVisitor {
    /// Recorded fields keyed by field name; later records with the same name
    /// overwrite earlier ones.
    pub fields: HashMap<String, String>,
}
710
711impl Visit for FieldVisitor {
712 fn record_str(&mut self, field: &Field, value: &str) {
713 self.fields
714 .insert(field.name().to_string(), value.to_string());
715 }
716
717 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
718 self.fields
719 .insert(field.name().to_string(), format!("{:?}", value).to_string());
720 }
721}
722
/// Layer hooks that maintain a [`DistributedTraceContext`] in each span's extensions.
///
/// * `on_new_span` collects ids from span fields (and the current parent span)
///   into a [`PendingDistributedTraceContext`].
/// * `on_enter` (first entry only) completes missing ids from
///   `tracing_opentelemetry::OtelData` and stores the final context.
/// * `on_close` stamps the end time.
impl<S> Layer<S> for DistributedTraceIdLayer
where
    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
    /// Records the close time on the span's trace context, if it has one.
    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
        if let Some(span) = ctx.span(&id) {
            let mut extensions = span.extensions_mut();
            if let Some(distributed_tracing_context) =
                extensions.get_mut::<DistributedTraceContext>()
            {
                distributed_tracing_context.end = Some(Instant::now());
            }
        }
    }

    /// Gathers trace-related fields for a new span and stores them as a
    /// pending context in the span's extensions.
    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
        if let Some(span) = ctx.span(id) {
            let mut trace_id: Option<String> = None;
            let mut parent_id: Option<String> = None;
            let mut span_id: Option<String> = None;
            let mut x_request_id: Option<String> = None;
            let mut request_id: Option<String> = None;
            let mut tracestate: Option<String> = None;
            let mut visitor = FieldVisitor::default();
            attrs.record(&mut visitor);

            // Ids supplied as span fields are accepted only if well-formed.
            if let Some(trace_id_input) = visitor.fields.get("trace_id") {
                if !is_valid_trace_id(trace_id_input) {
                    tracing::trace!("trace id '{trace_id_input}' is not valid! Ignoring.");
                } else {
                    trace_id = Some(trace_id_input.to_string());
                }
            }

            if let Some(span_id_input) = visitor.fields.get("span_id") {
                if !is_valid_span_id(span_id_input) {
                    tracing::trace!("span id '{span_id_input}' is not valid! Ignoring.");
                } else {
                    span_id = Some(span_id_input.to_string());
                }
            }

            if let Some(parent_id_input) = visitor.fields.get("parent_id") {
                if !is_valid_span_id(parent_id_input) {
                    tracing::trace!("parent id '{parent_id_input}' is not valid! Ignoring.");
                } else {
                    parent_id = Some(parent_id_input.to_string());
                }
            }

            if let Some(tracestate_input) = visitor.fields.get("tracestate") {
                tracestate = Some(tracestate_input.to_string());
            }

            if let Some(x_request_id_input) = visitor.fields.get("x_request_id") {
                x_request_id = Some(x_request_id_input.to_string());
            }

            // `request_id` wins over `x_dynamo_request_id` when both are present.
            if let Some(request_id_input) = visitor.fields.get("request_id") {
                request_id = Some(request_id_input.to_string());
            } else if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
                request_id = Some(x_request_id_input.to_string());
            }

            // Without an explicit parent id, inherit from the currently active
            // span: its trace id and tracestate replace ours, its span id becomes
            // our parent id, and request ids are inherited only if not set here.
            if parent_id.is_none()
                && let Some(parent_span_id) = ctx.current_span().id()
                && let Some(parent_span) = ctx.span(parent_span_id)
            {
                let parent_ext = parent_span.extensions();
                if let Some(parent_tracing_context) = parent_ext.get::<DistributedTraceContext>() {
                    trace_id = Some(parent_tracing_context.trace_id.clone());
                    parent_id = Some(parent_tracing_context.span_id.clone());
                    tracestate = parent_tracing_context.tracestate.clone();
                    if x_request_id.is_none() {
                        x_request_id = parent_tracing_context.x_request_id.clone();
                    }
                    if request_id.is_none() {
                        request_id = parent_tracing_context.request_id.clone();
                    }
                }
            }

            // A parent/span id without a trace id is inconsistent: discard both.
            if (parent_id.is_some() || span_id.is_some()) && trace_id.is_none() {
                tracing::error!("parent id or span id are set but trace id is not set!");
                parent_id = None;
                span_id = None;
            }

            let mut extensions = span.extensions_mut();
            extensions.insert(PendingDistributedTraceContext {
                trace_id,
                span_id,
                parent_id,
                tracestate,
                x_request_id,
                request_id,
            });
        }
    }

    /// On the first entry of a span, turns the pending context into a final
    /// [`DistributedTraceContext`].
    ///
    /// # Panics
    ///
    /// Panics if, after consulting `OtelData`, the trace id or span id is
    /// still unknown.
    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
        if let Some(span) = ctx.span(id) {
            {
                // Already finalized on an earlier entry: nothing to do.
                let extensions = span.extensions();
                if extensions.get::<DistributedTraceContext>().is_some() {
                    return;
                }
            }

            let mut extensions = span.extensions_mut();
            let pending = match extensions.remove::<PendingDistributedTraceContext>() {
                Some(p) => p,
                None => {
                    tracing::error!("PendingDistributedTraceContext not found in on_enter");
                    return;
                }
            };

            let mut trace_id = pending.trace_id;
            let mut span_id = pending.span_id;
            let parent_id = pending.parent_id;
            let tracestate = pending.tracestate;
            let x_request_id = pending.x_request_id;
            let request_id = pending.request_id;

            // Release the mutable extensions guard before taking the shared
            // guard below.
            drop(extensions);

            // Fill in whichever ids are still missing from the OpenTelemetry
            // data stored on the span, accepting only well-formed values.
            if trace_id.is_none() || span_id.is_none() {
                let extensions = span.extensions();
                if let Some(otel_data) = extensions.get::<tracing_opentelemetry::OtelData>() {
                    if trace_id.is_none()
                        && let Some(otel_trace_id) = otel_data.trace_id()
                    {
                        let trace_id_str = format!("{}", otel_trace_id);
                        if is_valid_trace_id(&trace_id_str) {
                            trace_id = Some(trace_id_str);
                        }
                    }

                    if span_id.is_none()
                        && let Some(otel_span_id) = otel_data.span_id()
                    {
                        let span_id_str = format!("{}", otel_span_id);
                        if is_valid_span_id(&span_id_str) {
                            span_id = Some(span_id_str);
                        }
                    }
                }
            }

            if trace_id.is_none() {
                panic!(
                    "trace_id is not set in on_enter - OtelData may not be properly initialized"
                );
            }

            if span_id.is_none() {
                panic!("span_id is not set in on_enter - OtelData may not be properly initialized");
            }

            let span_level = span.metadata().level();
            let mut extensions = span.extensions_mut();
            extensions.insert(DistributedTraceContext {
                trace_id: trace_id.expect("Trace ID must be set"),
                span_id: span_id.expect("Span ID must be set"),
                parent_id,
                tracestate,
                start: Some(Instant::now()),
                end: None,
                x_request_id,
                request_id,
            });

            // Drop the guard before emitting the event below.
            drop(extensions);

            if span_events_enabled() {
                // Emitted at the span's own level under the `span_event` target.
                emit_at_level!(span_level, target: "span_event", message = "SPAN_FIRST_ENTRY");
            }
        }
    }
}
930
931pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
934 Span::current()
935 .with_subscriber(|(id, subscriber)| {
936 subscriber
937 .downcast_ref::<Registry>()
938 .and_then(|registry| registry.span_data(id))
939 .and_then(|span_data| {
940 let extensions = span_data.extensions();
941 extensions.get::<DistributedTraceContext>().cloned()
942 })
943 })
944 .flatten()
945}
946
947pub fn init() {
949 INIT.call_once(|| {
950 if let Err(e) = setup_logging() {
951 eprintln!("Failed to initialize logging: {}", e);
952 std::process::exit(1);
953 }
954 });
955}
956
/// Logging setup used when the `tokio-console` feature is enabled.
///
/// Installs a compact stderr formatter filtered by the configured filters, plus
/// a tokio-console layer that sees `runtime` and `tokio` targets at TRACE and
/// everything else at ERROR.
#[cfg(feature = "tokio-console")]
fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
    // NOTE(review): the console server listens on 0.0.0.0 (all interfaces) —
    // confirm this exposure is intended.
    let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
        .with_default_env()
        .server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
        .spawn();
    let tokio_console_target = tracing_subscriber::filter::Targets::new()
        .with_default(LevelFilter::ERROR)
        .with_target("runtime", LevelFilter::TRACE)
        .with_target("tokio", LevelFilter::TRACE);
    let l = fmt::layer()
        .with_ansi(!disable_ansi_logging())
        .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
        .with_writer(std::io::stderr)
        .with_filter(filters(load_config()));
    tracing_subscriber::registry()
        .with(l)
        .with(tokio_console_layer.with_filter(tokio_console_target))
        .init();
    Ok(())
}
978
/// Logging setup used when the `tokio-console` feature is disabled.
///
/// * JSONL mode: installs the OpenTelemetry tracing layer, an optional OTLP
///   logs bridge, [`DistributedTraceIdLayer`], and a JSON stderr formatter.
///   Traces and logs are exported over OTLP only when the exporter is enabled.
/// * Otherwise: installs only a compact stderr formatter.
///
/// # Errors
///
/// Returns an error when building an OTLP span or log exporter fails.
#[cfg(not(feature = "tokio-console"))]
fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
    // Each layer takes ownership of its filter, so one filter is built per
    // layer. Note that the configuration is loaded once for each.
    let fmt_filter_layer = filters(load_config());
    let trace_filter_layer = filters(load_config());
    let otel_filter_layer = filters(load_config());
    let otel_logs_filter_layer = filters(load_config());

    if jsonl_logging_enabled() {
        let span_events = if span_events_enabled() {
            FmtSpan::CLOSE
        } else {
            FmtSpan::NONE
        };
        let l = fmt::layer()
            .with_ansi(false)
            .with_span_events(span_events)
            .event_format(CustomJsonFormatter::new())
            .with_writer(std::io::stderr)
            .with_filter(fmt_filter_layer);

        let service_name = get_service_name();

        let (tracer_provider, logger_provider_opt, endpoint_opt) = if otlp_exporter_enabled() {
            // The logs endpoint defaults to the traces endpoint when unset.
            let traces_endpoint =
                std::env::var(env_logging::otlp::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
                    .unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string());
            let logs_endpoint = std::env::var(env_logging::otlp::OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)
                .unwrap_or_else(|_| traces_endpoint.clone());

            let resource = opentelemetry_sdk::Resource::builder_empty()
                .with_service_name(service_name.clone())
                .build();

            let span_exporter = opentelemetry_otlp::SpanExporter::builder()
                .with_tonic()
                .with_endpoint(&traces_endpoint)
                .build()?;

            let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
                .with_batch_exporter(span_exporter)
                .with_resource(resource.clone())
                .build();

            let log_exporter = opentelemetry_otlp::LogExporter::builder()
                .with_tonic()
                .with_endpoint(&logs_endpoint)
                .build()?;

            let logger_provider = SdkLoggerProvider::builder()
                .with_batch_exporter(log_exporter)
                .with_resource(resource)
                .build();

            (
                tracer_provider,
                Some(logger_provider),
                Some(traces_endpoint),
            )
        } else {
            // No exporter is attached: a tracer provider is still built, with
            // only the service-name resource.
            let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
                .with_resource(
                    opentelemetry_sdk::Resource::builder_empty()
                        .with_service_name(service_name.clone())
                        .build(),
                )
                .build();

            (provider, None, None)
        };

        let tracer = tracer_provider.tracer(service_name.clone());

        // The logs bridge exists only when a logger provider was created.
        let otel_logs_layer = logger_provider_opt
            .as_ref()
            .map(|lp| OpenTelemetryTracingBridge::new(lp).with_filter(otel_logs_filter_layer));

        tracing_subscriber::registry()
            .with(
                tracing_opentelemetry::layer()
                    .with_tracer(tracer)
                    .with_filter(otel_filter_layer),
            )
            .with(otel_logs_layer)
            .with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
            .with(l)
            .init();

        if let Some(endpoint) = endpoint_opt {
            tracing::info!(
                endpoint = %endpoint,
                service = %service_name,
                "OpenTelemetry OTLP export enabled (traces and logs)"
            );
        } else {
            tracing::info!(
                service = %service_name,
                "OpenTelemetry OTLP export disabled, traces local only"
            );
        }
    } else {
        let l = fmt::layer()
            .with_ansi(!disable_ansi_logging())
            .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
            .with_writer(std::io::stderr)
            .with_filter(fmt_filter_layer);

        tracing_subscriber::registry().with(l).init();
    }

    Ok(())
}
1099
1100fn filters(config: LoggingConfig) -> EnvFilter {
1101 let mut filter_layer = EnvFilter::builder()
1102 .with_default_directive(config.log_level.parse().unwrap())
1103 .with_env_var(env_logging::DYN_LOG)
1104 .from_env_lossy();
1105
1106 for (module, level) in config.log_filters {
1107 match format!("{module}={level}").parse::<Directive>() {
1108 Ok(d) => {
1109 filter_layer = filter_layer.add_directive(d);
1110 }
1111 Err(e) => {
1112 eprintln!("Failed parsing filter '{level}' for module '{module}': {e}");
1113 }
1114 }
1115 }
1116
1117 if span_events_enabled() {
1120 filter_layer = filter_layer.add_directive("span_event=trace".parse().unwrap());
1121 }
1122
1123 filter_layer = filter_layer.add_directive("request_span=trace".parse().unwrap());
1128
1129 filter_layer
1130}
1131
1132pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) {
1135 let level = match level {
1136 "debug" => log::Level::Debug,
1137 "info" => log::Level::Info,
1138 "warn" => log::Level::Warn,
1139 "error" => log::Level::Error,
1140 "warning" => log::Level::Warn,
1141 _ => log::Level::Info,
1142 };
1143 log::logger().log(
1144 &log::Record::builder()
1145 .args(format_args!("{}", message))
1146 .level(level)
1147 .target(module)
1148 .file(Some(file))
1149 .line(Some(line))
1150 .build(),
1151 );
1152}
1153
1154fn load_config() -> LoggingConfig {
1155 let config_path =
1156 std::env::var(env_logging::DYN_LOGGING_CONFIG_PATH).unwrap_or_else(|_| "".to_string());
1157 let figment = Figment::new()
1158 .merge(Serialized::defaults(LoggingConfig::default()))
1159 .merge(Toml::file("/opt/dynamo/etc/logging.toml"))
1160 .merge(Toml::file(config_path));
1161
1162 figment.extract().unwrap()
1163}
1164
/// Serializable shape of a single JSON log line.
#[derive(Serialize)]
struct JsonLog<'a> {
    /// Formatted timestamp of the entry.
    time: String,
    /// Log level as text.
    level: String,
    /// Source file, omitted from the output when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    file: Option<&'a str>,
    /// Source line, omitted from the output when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    line: Option<u32>,
    /// Target of the event.
    target: String,
    /// Message payload as a JSON value.
    message: serde_json::Value,
    /// Additional fields, flattened into the top-level JSON object.
    #[serde(flatten)]
    fields: BTreeMap<String, serde_json::Value>,
}
1178
1179struct TimeFormatter {
1180 use_local_tz: bool,
1181}
1182
1183impl TimeFormatter {
1184 fn new() -> Self {
1185 Self {
1186 use_local_tz: crate::config::use_local_timezone(),
1187 }
1188 }
1189
1190 fn format_now(&self) -> String {
1191 if self.use_local_tz {
1192 chrono::Local::now()
1193 .format("%Y-%m-%dT%H:%M:%S%.6f%:z")
1194 .to_string()
1195 } else {
1196 chrono::Utc::now()
1197 .format("%Y-%m-%dT%H:%M:%S%.6fZ")
1198 .to_string()
1199 }
1200 }
1201}
1202
1203impl FormatTime for TimeFormatter {
1204 fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
1205 write!(w, "{}", self.format_now())
1206 }
1207}
1208
1209struct CustomJsonFormatter {
1210 time_formatter: TimeFormatter,
1211}
1212
1213impl CustomJsonFormatter {
1214 fn new() -> Self {
1215 Self {
1216 time_formatter: TimeFormatter::new(),
1217 }
1218 }
1219}
1220
1221use once_cell::sync::Lazy;
1222use regex::Regex;
1223
/// Shared `TraceContextPropagator`, constructed on first access.
///
/// NOTE(review): its users live outside this section; presumably it injects/extracts
/// trace context headers for outgoing and incoming requests — confirm against callers.
static TRACE_PROPAGATOR: Lazy<opentelemetry_sdk::propagation::TraceContextPropagator> =
    Lazy::new(opentelemetry_sdk::propagation::TraceContextPropagator::new);
1227
1228fn parse_tracing_duration(s: &str) -> Option<u64> {
1229 static RE: Lazy<Regex> =
1230 Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
1231 let captures = RE.captures(s)?;
1232 let value: f64 = captures[1].parse().ok()?;
1233 let unit = &captures[2];
1234 match unit {
1235 "ns" => Some((value / 1000.0) as u64),
1236 "µs" | "us" => Some(value as u64),
1237 "ms" => Some((value * 1000.0) as u64),
1238 "s" => Some((value * 1_000_000.0) as u64),
1239 _ => None,
1240 }
1241}
1242
1243impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
1244where
1245 S: Subscriber + for<'a> LookupSpan<'a>,
1246 N: for<'a> FormatFields<'a> + 'static,
1247{
1248 fn format_event(
1249 &self,
1250 ctx: &FmtContext<'_, S, N>,
1251 mut writer: Writer<'_>,
1252 event: &Event<'_>,
1253 ) -> std::fmt::Result {
1254 let mut visitor = JsonVisitor::default();
1255 let time = self.time_formatter.format_now();
1256 event.record(&mut visitor);
1257 let mut message = visitor
1258 .fields
1259 .remove("message")
1260 .unwrap_or(serde_json::Value::String("".to_string()));
1261
1262 let mut target_override: Option<String> = None;
1263
1264 let current_span = event
1265 .parent()
1266 .and_then(|id| ctx.span(id))
1267 .or_else(|| ctx.lookup_current());
1268 if let Some(span) = current_span {
1269 let ext = span.extensions();
1270 let data = ext.get::<FormattedFields<N>>().unwrap();
1271 let span_fields: Vec<(&str, &str)> = data
1272 .fields
1273 .split(' ')
1274 .filter_map(|entry| entry.split_once('='))
1275 .collect();
1276 for (name, value) in span_fields {
1277 visitor.fields.insert(
1278 name.to_string(),
1279 serde_json::Value::String(value.trim_matches('"').to_string()),
1280 );
1281 }
1282
1283 let busy_us = visitor
1284 .fields
1285 .remove("time.busy")
1286 .and_then(|v| parse_tracing_duration(&v.to_string()));
1287 let idle_us = visitor
1288 .fields
1289 .remove("time.idle")
1290 .and_then(|v| parse_tracing_duration(&v.to_string()));
1291
1292 if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
1293 visitor.fields.insert(
1294 "time.busy_us".to_string(),
1295 serde_json::Value::Number(busy_us.into()),
1296 );
1297 visitor.fields.insert(
1298 "time.idle_us".to_string(),
1299 serde_json::Value::Number(idle_us.into()),
1300 );
1301 visitor.fields.insert(
1302 "time.duration_us".to_string(),
1303 serde_json::Value::Number((busy_us + idle_us).into()),
1304 );
1305 }
1306
1307 let is_span_created = message.as_str() == Some("SPAN_FIRST_ENTRY");
1308 let is_span_closed = message.as_str() == Some("close");
1309 if is_span_created || is_span_closed {
1310 target_override = Some(span.metadata().target().to_string());
1311 if is_span_closed {
1312 message = serde_json::Value::String("SPAN_CLOSED".to_string());
1313 }
1314 }
1315
1316 visitor.fields.insert(
1317 "span_name".to_string(),
1318 serde_json::Value::String(span.name().to_string()),
1319 );
1320
1321 if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
1322 visitor.fields.insert(
1323 "span_id".to_string(),
1324 serde_json::Value::String(tracing_context.span_id.clone()),
1325 );
1326 visitor.fields.insert(
1327 "trace_id".to_string(),
1328 serde_json::Value::String(tracing_context.trace_id.clone()),
1329 );
1330 if let Some(parent_id) = tracing_context.parent_id.clone() {
1331 visitor.fields.insert(
1332 "parent_id".to_string(),
1333 serde_json::Value::String(parent_id),
1334 );
1335 } else {
1336 visitor.fields.remove("parent_id");
1337 }
1338 if let Some(tracestate) = tracing_context.tracestate.clone() {
1339 visitor.fields.insert(
1340 "tracestate".to_string(),
1341 serde_json::Value::String(tracestate),
1342 );
1343 } else {
1344 visitor.fields.remove("tracestate");
1345 }
1346 if let Some(x_request_id) = tracing_context.x_request_id.clone() {
1347 visitor.fields.insert(
1348 "x_request_id".to_string(),
1349 serde_json::Value::String(x_request_id),
1350 );
1351 } else {
1352 visitor.fields.remove("x_request_id");
1353 }
1354
1355 if let Some(request_id) = tracing_context.request_id.clone() {
1356 visitor.fields.insert(
1357 "request_id".to_string(),
1358 serde_json::Value::String(request_id),
1359 );
1360 } else {
1361 visitor.fields.remove("request_id");
1362 }
1363 visitor.fields.remove("x_dynamo_request_id");
1365 } else {
1366 tracing::error!(
1367 "Distributed Trace Context not found, falling back to internal ids"
1368 );
1369 visitor.fields.insert(
1370 "span_id".to_string(),
1371 serde_json::Value::String(span.id().into_u64().to_string()),
1372 );
1373 if let Some(parent) = span.parent() {
1374 visitor.fields.insert(
1375 "parent_id".to_string(),
1376 serde_json::Value::String(parent.id().into_u64().to_string()),
1377 );
1378 }
1379 }
1380 } else {
1381 let reserved_fields = [
1382 "trace_id",
1383 "span_id",
1384 "parent_id",
1385 "span_name",
1386 "tracestate",
1387 ];
1388 for reserved_field in reserved_fields {
1389 visitor.fields.remove(reserved_field);
1390 }
1391 }
1392 let metadata = event.metadata();
1393 let log = JsonLog {
1394 level: metadata.level().to_string(),
1395 time,
1396 file: metadata.file(),
1397 line: metadata.line(),
1398 target: target_override.unwrap_or_else(|| metadata.target().to_string()),
1399 message,
1400 fields: visitor.fields,
1401 };
1402 let json = serde_json::to_string(&log).unwrap();
1403 writeln!(writer, "{json}")
1404 }
1405}
1406
1407#[derive(Default)]
1408struct JsonVisitor {
1409 fields: BTreeMap<String, serde_json::Value>,
1410}
1411
1412impl tracing::field::Visit for JsonVisitor {
1413 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1414 self.fields.insert(
1415 field.name().to_string(),
1416 serde_json::Value::String(format!("{value:?}")),
1417 );
1418 }
1419
1420 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1421 if field.name() != "message" {
1422 match serde_json::from_str::<Value>(value) {
1423 Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
1424 Err(_) => self.fields.insert(field.name().to_string(), value.into()),
1425 };
1426 } else {
1427 self.fields.insert(field.name().to_string(), value.into());
1428 }
1429 }
1430
1431 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
1432 self.fields
1433 .insert(field.name().to_string(), serde_json::Value::Bool(value));
1434 }
1435
1436 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
1437 self.fields.insert(
1438 field.name().to_string(),
1439 serde_json::Value::Number(value.into()),
1440 );
1441 }
1442
1443 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
1444 self.fields.insert(
1445 field.name().to_string(),
1446 serde_json::Value::Number(value.into()),
1447 );
1448 }
1449
1450 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
1451 use serde_json::value::Number;
1452 self.fields.insert(
1453 field.name().to_string(),
1454 serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
1455 );
1456 }
1457}
1458
1459#[cfg(test)]
1460pub mod tests {
1461 use super::*;
1462 use anyhow::{Result, anyhow};
1463 use chrono::{DateTime, Utc};
1464 use jsonschema::{Draft, JSONSchema};
1465 use serde_json::Value;
1466 use std::fs::File;
1467 use std::io::{BufRead, BufReader};
1468 use stdio_override::*;
1469 use tempfile::NamedTempFile;
1470
    /// JSON Schema (draft-07) that `load_log` validates every captured log line against.
    ///
    /// `file`, `level`, `line`, `message`, `target` and `time` are required; the tracing
    /// identifiers and timing fields are optional but constrained when present. Other
    /// properties are allowed.
    static LOG_LINE_SCHEMA: &str = r#"
    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "Runtime Log Line",
      "type": "object",
      "required": [
        "file",
        "level",
        "line",
        "message",
        "target",
        "time"
      ],
      "properties": {
        "file": { "type": "string" },
        "level": { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
        "line": { "type": "integer" },
        "message": { "type": "string" },
        "target": { "type": "string" },
        "time": { "type": "string", "format": "date-time" },
        "span_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "trace_id": { "type": "string", "pattern": "^[a-f0-9]{32}$" },
        "span_name": { "type": "string" },
        "time.busy_us": { "type": "integer" },
        "time.duration_us": { "type": "integer" },
        "time.idle_us": { "type": "integer" },
        "tracestate": { "type": "string" }
      },
      "additionalProperties": true
    }
    "#;
1503
1504 #[tracing::instrument(skip_all)]
1505 async fn parent() {
1506 tracing::trace!(message = "parent!");
1507 if let Some(my_ctx) = get_distributed_tracing_context() {
1508 tracing::info!(my_trace_id = my_ctx.trace_id);
1509 }
1510 child().await;
1511 }
1512
1513 #[tracing::instrument(skip_all)]
1514 async fn child() {
1515 tracing::trace!(message = "child");
1516 if let Some(my_ctx) = get_distributed_tracing_context() {
1517 tracing::info!(my_trace_id = my_ctx.trace_id);
1518 }
1519 grandchild().await;
1520 }
1521
1522 #[tracing::instrument(skip_all)]
1523 async fn grandchild() {
1524 tracing::trace!(message = "grandchild");
1525 if let Some(my_ctx) = get_distributed_tracing_context() {
1526 tracing::info!(my_trace_id = my_ctx.trace_id);
1527 }
1528 }
1529
1530 pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
1531 let schema_json: Value =
1532 serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
1533 let compiled_schema = JSONSchema::options()
1534 .with_draft(Draft::Draft7)
1535 .compile(&schema_json)
1536 .expect("Invalid schema");
1537
1538 let f = File::open(file_name)?;
1539 let reader = BufReader::new(f);
1540 let mut result = Vec::new();
1541
1542 for (line_num, line) in reader.lines().enumerate() {
1543 let line = line?;
1544 let val: Value = serde_json::from_str(&line)
1545 .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_num + 1, e))?;
1546
1547 if let Err(errors) = compiled_schema.validate(&val) {
1548 let errs = errors.map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
1549 return Err(anyhow!(
1550 "Line {}: JSON Schema Validation errors: {}",
1551 line_num + 1,
1552 errs
1553 ));
1554 }
1555 println!("{}", val);
1556 result.push(val);
1557 }
1558 Ok(result)
1559 }
1560
1561 #[tokio::test]
1562 async fn test_json_log_capture() -> Result<()> {
1563 #[allow(clippy::redundant_closure_call)]
1564 let _ = temp_env::async_with_vars(
1565 [(env_logging::DYN_LOGGING_JSONL, Some("1"))],
1566 (async || {
1567 let tmp_file = NamedTempFile::new().unwrap();
1568 let file_name = tmp_file.path().to_str().unwrap();
1569 let guard = StderrOverride::from_file(file_name)?;
1570 init();
1571 parent().await;
1572 drop(guard);
1573
1574 let lines = load_log(file_name)?;
1575
1576 let Some(trace_id) = lines
1584 .iter()
1585 .find_map(|log_line| log_line.get("trace_id").and_then(|v| v.as_str()))
1586 .map(|s| s.to_string())
1587 else {
1588 return Ok(());
1590 };
1591
1592 assert_ne!(
1594 trace_id, "00000000000000000000000000000000",
1595 "trace_id should not be a zero/invalid ID"
1596 );
1597 assert!(
1598 !trace_id.chars().all(|c| c == '0'),
1599 "trace_id should not be all zeros"
1600 );
1601
1602 for log_line in &lines {
1604 if let Some(line_trace_id) = log_line.get("trace_id") {
1605 assert_eq!(
1606 line_trace_id.as_str().unwrap(),
1607 &trace_id,
1608 "All logs should have the same trace_id"
1609 );
1610 }
1611 }
1612
1613 for log_line in &lines {
1615 if let Some(my_trace_id) = log_line.get("my_trace_id") {
1616 assert_eq!(
1617 my_trace_id,
1618 &serde_json::Value::String(trace_id.clone()),
1619 "my_trace_id should match the trace_id from distributed tracing context"
1620 );
1621 }
1622 }
1623
1624 let mut span_ids_seen: std::collections::HashSet<String> = std::collections::HashSet::new();
1626 let mut span_timestamps: std::collections::HashMap<String, DateTime<Utc>> = std::collections::HashMap::new();
1627
1628 for log_line in &lines {
1629 if let Some(span_id) = log_line.get("span_id") {
1630 let span_id_str = span_id.as_str().unwrap();
1631 assert!(
1632 is_valid_span_id(span_id_str),
1633 "Invalid span_id format: {}",
1634 span_id_str
1635 );
1636 span_ids_seen.insert(span_id_str.to_string());
1637 }
1638
1639 if let Some(time_str) = log_line.get("time").and_then(|v| v.as_str()) {
1641 let timestamp = DateTime::parse_from_rfc3339(time_str)
1642 .expect("All timestamps should be valid RFC3339 format")
1643 .with_timezone(&Utc);
1644
1645 if let Some(span_name) = log_line.get("span_name").and_then(|v| v.as_str()) {
1647 span_timestamps.insert(span_name.to_string(), timestamp);
1648 }
1649 }
1650 }
1651
1652 let parent_span_id = lines
1655 .iter()
1656 .find(|log_line| {
1657 log_line.get("span_name")
1658 .and_then(|v| v.as_str()) == Some("parent")
1659 })
1660 .and_then(|log_line| {
1661 log_line.get("span_id")
1662 .and_then(|v| v.as_str())
1663 .map(|s| s.to_string())
1664 })
1665 .expect("Should find parent span with span_id");
1666
1667 let child_span_id = lines
1668 .iter()
1669 .find(|log_line| {
1670 log_line.get("span_name")
1671 .and_then(|v| v.as_str()) == Some("child")
1672 })
1673 .and_then(|log_line| {
1674 log_line.get("span_id")
1675 .and_then(|v| v.as_str())
1676 .map(|s| s.to_string())
1677 })
1678 .expect("Should find child span with span_id");
1679
1680 let grandchild_span_id = lines
1681 .iter()
1682 .find(|log_line| {
1683 log_line.get("span_name")
1684 .and_then(|v| v.as_str()) == Some("grandchild")
1685 })
1686 .and_then(|log_line| {
1687 log_line.get("span_id")
1688 .and_then(|v| v.as_str())
1689 .map(|s| s.to_string())
1690 })
1691 .expect("Should find grandchild span with span_id");
1692
1693 assert_ne!(parent_span_id, child_span_id, "Parent and child should have different span IDs");
1695 assert_ne!(child_span_id, grandchild_span_id, "Child and grandchild should have different span IDs");
1696 assert_ne!(parent_span_id, grandchild_span_id, "Parent and grandchild should have different span IDs");
1697
1698 for log_line in &lines {
1700 if let Some(span_name) = log_line.get("span_name")
1701 && let Some(span_name_str) = span_name.as_str()
1702 && span_name_str == "parent"
1703 {
1704 assert!(
1705 log_line.get("parent_id").is_none(),
1706 "Parent span should not have a parent_id"
1707 );
1708 }
1709 }
1710
1711 for log_line in &lines {
1713 if let Some(span_name) = log_line.get("span_name")
1714 && let Some(span_name_str) = span_name.as_str()
1715 && span_name_str == "child"
1716 {
1717 let parent_id = log_line.get("parent_id")
1718 .and_then(|v| v.as_str())
1719 .expect("Child span should have a parent_id");
1720 assert_eq!(
1721 parent_id,
1722 parent_span_id,
1723 "Child's parent_id should match parent's span_id"
1724 );
1725 }
1726 }
1727
1728 for log_line in &lines {
1730 if let Some(span_name) = log_line.get("span_name")
1731 && let Some(span_name_str) = span_name.as_str()
1732 && span_name_str == "grandchild"
1733 {
1734 let parent_id = log_line.get("parent_id")
1735 .and_then(|v| v.as_str())
1736 .expect("Grandchild span should have a parent_id");
1737 assert_eq!(
1738 parent_id,
1739 child_span_id,
1740 "Grandchild's parent_id should match child's span_id"
1741 );
1742 }
1743 }
1744
1745 let parent_time = span_timestamps.get("parent")
1747 .expect("Should have timestamp for parent span");
1748 let child_time = span_timestamps.get("child")
1749 .expect("Should have timestamp for child span");
1750 let grandchild_time = span_timestamps.get("grandchild")
1751 .expect("Should have timestamp for grandchild span");
1752
1753 assert!(
1755 parent_time <= child_time,
1756 "Parent span should log before or at same time as child span (parent: {}, child: {})",
1757 parent_time,
1758 child_time
1759 );
1760 assert!(
1761 child_time <= grandchild_time,
1762 "Child span should log before or at same time as grandchild span (child: {}, grandchild: {})",
1763 child_time,
1764 grandchild_time
1765 );
1766
1767 Ok::<(), anyhow::Error>(())
1768 })(),
1769 )
1770 .await;
1771 Ok(())
1772 }
1773
    /// Fixture whose span is declared at `debug` level; emits a single debug event.
    #[tracing::instrument(level = "debug", skip_all)]
    async fn debug_level_span() {
        tracing::debug!("inside debug span");
    }
1779
    /// Fixture whose span is declared at `info` level; emits a single info event.
    #[tracing::instrument(level = "info", skip_all)]
    async fn info_level_span() {
        tracing::info!("inside info span");
    }
1784
    /// Fixture whose span is declared at `warn` level; emits a single warn event.
    #[tracing::instrument(level = "warn", skip_all)]
    async fn warn_level_span() {
        tracing::warn!("inside warn span");
    }
1789
    /// Fixture whose `info` span and event both use the explicit target `other_module`
    /// instead of this module's path, so target-based filtering can be exercised.
    #[tracing::instrument(level = "info", target = "other_module", skip_all)]
    async fn other_target_info_span() {
        tracing::info!(target: "other_module", "inside other target span");
    }
1796
1797 #[test]
1808 fn test_span_events() {
1809 use std::process::Command;
1810
1811 let output = Command::new("cargo")
1813 .args([
1814 "test",
1815 "-p",
1816 "dynamo-runtime",
1817 "test_span_events_subprocess",
1818 "--",
1819 "--exact",
1820 "--nocapture",
1821 ])
1822 .env("DYN_LOGGING_JSONL", "1")
1823 .env("DYN_LOGGING_SPAN_EVENTS", "1")
1824 .env("DYN_LOG", "warn,dynamo_runtime::logging::tests=debug")
1825 .output()
1826 .expect("Failed to execute subprocess test");
1827
1828 if !output.status.success() {
1830 eprintln!(
1831 "=== STDOUT ===\n{}",
1832 String::from_utf8_lossy(&output.stdout)
1833 );
1834 eprintln!(
1835 "=== STDERR ===\n{}",
1836 String::from_utf8_lossy(&output.stderr)
1837 );
1838 }
1839
1840 assert!(
1841 output.status.success(),
1842 "Subprocess test failed with exit code: {:?}",
1843 output.status.code()
1844 );
1845 }
1846
1847 #[tokio::test]
1850 async fn test_span_events_subprocess() -> Result<()> {
1851 if std::env::var("DYN_LOGGING_SPAN_EVENTS").is_err() {
1853 return Ok(());
1854 }
1855
1856 let tmp_file = NamedTempFile::new().unwrap();
1857 let file_name = tmp_file.path().to_str().unwrap();
1858 let guard = StderrOverride::from_file(file_name)?;
1859 init();
1860
1861 parent().await;
1863
1864 debug_level_span().await;
1866 info_level_span().await;
1867 warn_level_span().await;
1868
1869 other_target_info_span().await;
1871
1872 drop(guard);
1873
1874 let lines = load_log(file_name)?;
1875
1876 let has_span_event = |msg: &str, span_name: &str| {
1878 lines.iter().any(|log| {
1879 log.get("message").and_then(|v| v.as_str()) == Some(msg)
1880 && log.get("span_name").and_then(|v| v.as_str()) == Some(span_name)
1881 })
1882 };
1883
1884 let get_span_events = |msg: &str| -> Vec<&serde_json::Value> {
1886 lines
1887 .iter()
1888 .filter(|log| log.get("message").and_then(|v| v.as_str()) == Some(msg))
1889 .collect()
1890 };
1891
1892 let span_created_events = get_span_events("SPAN_FIRST_ENTRY");
1894 for event in &span_created_events {
1895 assert!(
1897 event.get("span_name").is_some(),
1898 "SPAN_FIRST_ENTRY must have span_name"
1899 );
1900 let trace_id = event
1902 .get("trace_id")
1903 .and_then(|v| v.as_str())
1904 .expect("SPAN_FIRST_ENTRY must have trace_id");
1905 assert!(
1906 trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
1907 "SPAN_FIRST_ENTRY must have valid trace_id format"
1908 );
1909 let span_id = event
1911 .get("span_id")
1912 .and_then(|v| v.as_str())
1913 .expect("SPAN_FIRST_ENTRY must have span_id");
1914 assert!(
1915 is_valid_span_id(span_id),
1916 "SPAN_FIRST_ENTRY must have valid span_id"
1917 );
1918 }
1919
1920 let span_closed_events = get_span_events("SPAN_CLOSED");
1922 for event in &span_closed_events {
1923 assert!(
1924 event.get("span_name").is_some(),
1925 "SPAN_CLOSED must have span_name"
1926 );
1927 assert!(
1928 event.get("time.busy_us").is_some()
1929 || event.get("time.idle_us").is_some()
1930 || event.get("time.duration_us").is_some(),
1931 "SPAN_CLOSED must have timing information"
1932 );
1933 let trace_id = event
1935 .get("trace_id")
1936 .and_then(|v| v.as_str())
1937 .expect("SPAN_CLOSED must have trace_id");
1938 assert!(
1939 trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
1940 "SPAN_CLOSED must have valid trace_id format"
1941 );
1942 }
1943
1944 assert!(
1948 has_span_event("SPAN_FIRST_ENTRY", "debug_level_span"),
1949 "DEBUG span from allowed target MUST pass (target=debug filter)"
1950 );
1951 assert!(
1952 has_span_event("SPAN_FIRST_ENTRY", "info_level_span"),
1953 "INFO span from allowed target MUST pass (target=debug filter)"
1954 );
1955 assert!(
1956 has_span_event("SPAN_FIRST_ENTRY", "warn_level_span"),
1957 "WARN span from allowed target MUST pass (target=debug filter)"
1958 );
1959
1960 assert!(
1962 has_span_event("SPAN_FIRST_ENTRY", "parent"),
1963 "parent span (INFO) from allowed target MUST pass"
1964 );
1965 assert!(
1966 has_span_event("SPAN_FIRST_ENTRY", "child"),
1967 "child span (INFO) from allowed target MUST pass"
1968 );
1969 assert!(
1970 has_span_event("SPAN_FIRST_ENTRY", "grandchild"),
1971 "grandchild span (INFO) from allowed target MUST pass"
1972 );
1973
1974 assert!(
1977 !has_span_event("SPAN_FIRST_ENTRY", "other_target_info_span"),
1978 "INFO span from non-allowed target (other_module) MUST be filtered out"
1979 );
1980
1981 for event in &span_created_events {
1983 let target = event.get("target").and_then(|v| v.as_str()).unwrap_or("");
1984 let level = event.get("level").and_then(|v| v.as_str()).unwrap_or("");
1985
1986 if level == "DEBUG" || level == "INFO" {
1988 assert!(
1989 target.contains("dynamo_runtime::logging::tests"),
1990 "DEBUG/INFO span must be from allowed target, got target={target}"
1991 );
1992 }
1993 }
1994
1995 Ok(())
1996 }
1997}