1use std::collections::{BTreeMap, HashMap};
30use std::sync::Once;
31
32use figment::{
33 Figment,
34 providers::{Format, Serialized, Toml},
35};
36use serde::{Deserialize, Serialize};
37use tracing::level_filters::LevelFilter;
38use tracing::{Event, Subscriber};
39use tracing_subscriber::EnvFilter;
40use tracing_subscriber::fmt::time::FormatTime;
41use tracing_subscriber::fmt::time::LocalTime;
42use tracing_subscriber::fmt::time::SystemTime;
43use tracing_subscriber::fmt::time::UtcTime;
44use tracing_subscriber::fmt::{FmtContext, FormatFields};
45use tracing_subscriber::fmt::{FormattedFields, format::Writer};
46use tracing_subscriber::prelude::*;
47use tracing_subscriber::registry::LookupSpan;
48use tracing_subscriber::{filter::Directive, fmt};
49
50use crate::config::{disable_ansi_logging, jsonl_logging_enabled, span_events_enabled};
51use async_nats::{HeaderMap, HeaderValue};
52use axum::extract::FromRequestParts;
53use axum::http;
54use axum::http::Request;
55use axum::http::request::Parts;
56use serde_json::Value;
57use std::convert::Infallible;
58use std::time::Instant;
59use tower_http::trace::{DefaultMakeSpan, TraceLayer};
60use tracing::Id;
61use tracing::Span;
62use tracing::field::Field;
63use tracing::span;
64use tracing_subscriber::Layer;
65use tracing_subscriber::Registry;
66use tracing_subscriber::field::Visit;
67use tracing_subscriber::fmt::format::FmtSpan;
68use tracing_subscriber::layer::Context;
69use tracing_subscriber::registry::SpanData;
70use uuid::Uuid;
71
72use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
73use opentelemetry::trace::TraceContextExt;
74use opentelemetry::{global, trace::Tracer};
75use opentelemetry_otlp::WithExportConfig;
76
77use opentelemetry::trace::TracerProvider as _;
78use opentelemetry::{Key, KeyValue};
79use opentelemetry_sdk::Resource;
80use opentelemetry_sdk::trace::SdkTracerProvider;
81use tracing::error;
82use tracing_subscriber::layer::SubscriberExt;
83use std::time::Duration;
86use tracing::{info, instrument};
87use tracing_opentelemetry::OpenTelemetrySpanExt;
88use tracing_subscriber::util::SubscriberInitExt;
89
90use crate::config::environment_names::logging as env_logging;
91
92use dynamo_config::env_is_truthy;
93
/// Level used as the `EnvFilter` default directive when the logging config file does not
/// set `log_level` (see `LoggingConfig::default` and `filters`).
const DEFAULT_FILTER_LEVEL: &str = "info";

/// OTLP traces endpoint used when the traces-endpoint environment variable is unset.
/// The exporter built in `setup_logging` uses the tonic (gRPC) transport.
const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";

/// Service name reported to OpenTelemetry when no service-name env var is provided.
const DEFAULT_OTEL_SERVICE_NAME: &str = "dynamo";

/// Guards `init` so the global subscriber is installed at most once per process.
static INIT: Once = Once::new();

106#[derive(Serialize, Deserialize, Debug)]
107struct LoggingConfig {
108 log_level: String,
109 log_filters: HashMap<String, String>,
110}
111impl Default for LoggingConfig {
112 fn default() -> Self {
113 LoggingConfig {
114 log_level: DEFAULT_FILTER_LEVEL.to_string(),
115 log_filters: HashMap::from([
116 ("h2".to_string(), "error".to_string()),
117 ("tower".to_string(), "error".to_string()),
118 ("hyper_util".to_string(), "error".to_string()),
119 ("neli".to_string(), "error".to_string()),
120 ("async_nats".to_string(), "error".to_string()),
121 ("rustls".to_string(), "error".to_string()),
122 ("tokenizers".to_string(), "error".to_string()),
123 ("axum".to_string(), "error".to_string()),
124 ("tonic".to_string(), "error".to_string()),
125 ("hf_hub".to_string(), "error".to_string()),
126 ("opentelemetry".to_string(), "error".to_string()),
127 ("opentelemetry-otlp".to_string(), "error".to_string()),
128 ("opentelemetry_sdk".to_string(), "error".to_string()),
129 ]),
130 }
131 }
132}
133
134fn otlp_exporter_enabled() -> bool {
136 env_is_truthy(env_logging::otlp::OTEL_EXPORT_ENABLED)
137}
138
139fn get_service_name() -> String {
141 std::env::var(env_logging::otlp::OTEL_SERVICE_NAME)
142 .unwrap_or_else(|_| DEFAULT_OTEL_SERVICE_NAME.to_string())
143}
144
/// Returns `true` when `trace_id` is exactly 32 ASCII hex digits (either case).
///
/// Only the shape is checked; an all-zero id is still reported as valid here.
pub fn is_valid_trace_id(trace_id: &str) -> bool {
    const TRACE_ID_HEX_LEN: usize = 32;
    trace_id.len() == TRACE_ID_HEX_LEN && trace_id.bytes().all(|b| b.is_ascii_hexdigit())
}
150
/// Returns `true` when `span_id` is exactly 16 ASCII hex digits (either case).
///
/// Only the shape is checked; an all-zero id is still reported as valid here.
pub fn is_valid_span_id(span_id: &str) -> bool {
    const SPAN_ID_HEX_LEN: usize = 16;
    span_id.len() == SPAN_ID_HEX_LEN && span_id.bytes().all(|b| b.is_ascii_hexdigit())
}
156
/// `tracing_subscriber` layer that assigns trace/span/parent ids to spans.
///
/// In `on_new_span` it collects ids from span fields (or inherits them from the parent
/// span). In `on_enter` it fills gaps from the OpenTelemetry `OtelData` and stores the
/// result as a [`DistributedTraceContext`] span extension.
pub struct DistributedTraceIdLayer;
158
/// Distributed-tracing identity attached to a span when it is first entered.
///
/// `DistributedTraceIdLayer::on_enter` inserts it as a span extension. It is read back
/// by `get_distributed_tracing_context` and by the JSON log formatter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedTraceContext {
    /// Trace id; the layer only stores values that pass `is_valid_trace_id`.
    pub trace_id: String,
    /// Id of the span this context belongs to; the layer only stores values that pass
    /// `is_valid_span_id`.
    pub span_id: String,
    /// Span id of the parent span, when known. Omitted from serialized output if `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<String>,
    /// Raw `tracestate` value carried along with the trace. Omitted if `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tracestate: Option<String>,
    /// Instant the span was first entered; never serialized.
    #[serde(skip)]
    start: Option<Instant>,
    /// Instant the span was closed (set in `on_close`); never serialized.
    #[serde(skip)]
    end: Option<Instant>,
    /// Value of the span's `x_request_id` field, if any. Omitted if `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_request_id: Option<String>,
    /// Value of the span's `x_dynamo_request_id` field, if any. Omitted if `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_dynamo_request_id: Option<String>,
}
176
/// Ids collected by `DistributedTraceIdLayer::on_new_span`, parked as a span extension
/// until the span is first entered.
///
/// At that point `on_enter` removes it, fills any missing `trace_id`/`span_id` from the
/// OpenTelemetry span data, and promotes it to a [`DistributedTraceContext`].
#[derive(Debug, Clone)]
struct PendingDistributedTraceContext {
    trace_id: Option<String>,
    span_id: Option<String>,
    parent_id: Option<String>,
    tracestate: Option<String>,
    x_request_id: Option<String>,
    x_dynamo_request_id: Option<String>,
}
187
/// Emits a `tracing` event whose level is only known at runtime.
///
/// `tracing::event!` bakes its level into the callsite's static metadata, so this macro
/// dispatches on `$level` (a `&tracing::Level`) and expands one `event!` call per level.
/// `target:` and all remaining tokens are forwarded unchanged to every arm.
macro_rules! emit_at_level {
    ($level:expr, target: $target:expr, $($arg:tt)*) => {
        match $level {
            &tracing::Level::ERROR => tracing::event!(target: $target, tracing::Level::ERROR, $($arg)*),
            &tracing::Level::WARN => tracing::event!(target: $target, tracing::Level::WARN, $($arg)*),
            &tracing::Level::INFO => tracing::event!(target: $target, tracing::Level::INFO, $($arg)*),
            &tracing::Level::DEBUG => tracing::event!(target: $target, tracing::Level::DEBUG, $($arg)*),
            &tracing::Level::TRACE => tracing::event!(target: $target, tracing::Level::TRACE, $($arg)*),
        }
    };
}
203
204impl DistributedTraceContext {
205 pub fn create_traceparent(&self) -> String {
207 format!("00-{}-{}-01", self.trace_id, self.span_id)
208 }
209}
210
211pub fn parse_traceparent(traceparent: &str) -> (Option<String>, Option<String>) {
213 let pieces: Vec<_> = traceparent.split('-').collect();
214 if pieces.len() != 4 {
215 return (None, None);
216 }
217 let trace_id = pieces[1];
218 let parent_id = pieces[2];
219
220 if !is_valid_trace_id(trace_id) || !is_valid_span_id(parent_id) {
221 return (None, None);
222 }
223
224 (Some(trace_id.to_string()), Some(parent_id.to_string()))
225}
226
/// Trace-context and request-id values pulled from incoming headers by
/// [`TraceParent::from_headers`].
#[derive(Debug, Clone, Default)]
pub struct TraceParent {
    /// Trace id parsed from `traceparent` (set together with `parent_id`).
    pub trace_id: Option<String>,
    /// Parent span id parsed from `traceparent` (set together with `trace_id`).
    pub parent_id: Option<String>,
    /// Raw `tracestate` header value.
    pub tracestate: Option<String>,
    /// Raw `x-request-id` header value.
    pub x_request_id: Option<String>,
    /// `x-dynamo-request-id` header value, kept only if it parses as a UUID.
    pub x_dynamo_request_id: Option<String>,
}
235
/// Minimal read-only header lookup, so [`TraceParent::from_headers`] can work over both
/// NATS and HTTP header maps.
pub trait GenericHeaders {
    /// Returns the value of header `key` as a string slice, or `None` if it is absent or
    /// (for HTTP) cannot be represented as `&str`.
    fn get(&self, key: &str) -> Option<&str>;
}
239
240impl GenericHeaders for async_nats::HeaderMap {
241 fn get(&self, key: &str) -> Option<&str> {
242 async_nats::HeaderMap::get(self, key).map(|value| value.as_str())
243 }
244}
245
246impl GenericHeaders for http::HeaderMap {
247 fn get(&self, key: &str) -> Option<&str> {
248 http::HeaderMap::get(self, key).and_then(|value| value.to_str().ok())
249 }
250}
251
252impl TraceParent {
253 pub fn from_headers<H: GenericHeaders>(headers: &H) -> TraceParent {
254 let mut trace_id = None;
255 let mut parent_id = None;
256 let mut tracestate = None;
257 let mut x_request_id = None;
258 let mut x_dynamo_request_id = None;
259
260 if let Some(header_value) = headers.get("traceparent") {
261 (trace_id, parent_id) = parse_traceparent(header_value);
262 }
263
264 if let Some(header_value) = headers.get("x-request-id") {
265 x_request_id = Some(header_value.to_string());
266 }
267
268 if let Some(header_value) = headers.get("tracestate") {
269 tracestate = Some(header_value.to_string());
270 }
271
272 if let Some(header_value) = headers.get("x-dynamo-request-id") {
273 x_dynamo_request_id = Some(header_value.to_string());
274 }
275
276 let x_dynamo_request_id =
278 x_dynamo_request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
279 TraceParent {
280 trace_id,
281 parent_id,
282 tracestate,
283 x_request_id,
284 x_dynamo_request_id,
285 }
286 }
287}
288
289pub fn make_request_span<B>(req: &Request<B>) -> Span {
291 let method = req.method();
292 let uri = req.uri();
293 let version = format!("{:?}", req.version());
294 let trace_parent = TraceParent::from_headers(req.headers());
295
296 let otel_context = extract_otel_context_from_http_headers(req.headers());
297
298 let span = tracing::info_span!(
299 "http-request",
300 method = %method,
301 uri = %uri,
302 version = %version,
303 trace_id = trace_parent.trace_id,
304 parent_id = trace_parent.parent_id,
305 x_request_id = trace_parent.x_request_id,
306 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
307 );
308
309 if let Some(context) = otel_context {
310 let _ = span.set_parent(context);
311 }
312
313 span
314}
315
316fn extract_otel_context_from_http_headers(
318 headers: &http::HeaderMap,
319) -> Option<opentelemetry::Context> {
320 let traceparent_value = headers.get("traceparent")?.to_str().ok()?;
321
322 struct HttpHeaderExtractor<'a>(&'a http::HeaderMap);
323
324 impl<'a> Extractor for HttpHeaderExtractor<'a> {
325 fn get(&self, key: &str) -> Option<&str> {
326 self.0.get(key).and_then(|v| v.to_str().ok())
327 }
328
329 fn keys(&self) -> Vec<&str> {
330 vec!["traceparent", "tracestate"]
331 .into_iter()
332 .filter(|&key| self.0.get(key).is_some())
333 .collect()
334 }
335 }
336
337 if traceparent_value.is_empty() {
339 return None;
340 }
341
342 let extractor = HttpHeaderExtractor(headers);
343 let otel_context = TRACE_PROPAGATOR.extract(&extractor);
344
345 if otel_context.span().span_context().is_valid() {
346 Some(otel_context)
347 } else {
348 None
349 }
350}
351
352pub fn make_handle_payload_span(
354 headers: &async_nats::HeaderMap,
355 component: &str,
356 endpoint: &str,
357 namespace: &str,
358 instance_id: u64,
359) -> Span {
360 let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_nats_headers(headers);
361 let trace_parent = TraceParent::from_headers(headers);
362
363 if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
364 let span = tracing::info_span!(
365 "handle_payload",
366 trace_id = trace_id.as_str(),
367 parent_id = parent_id.as_str(),
368 x_request_id = trace_parent.x_request_id,
369 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
370 tracestate = trace_parent.tracestate,
371 component = component,
372 endpoint = endpoint,
373 namespace = namespace,
374 instance_id = instance_id,
375 );
376
377 if let Some(context) = otel_context {
378 let _ = span.set_parent(context);
379 }
380 span
381 } else {
382 tracing::info_span!(
383 "handle_payload",
384 x_request_id = trace_parent.x_request_id,
385 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
386 tracestate = trace_parent.tracestate,
387 component = component,
388 endpoint = endpoint,
389 namespace = namespace,
390 instance_id = instance_id,
391 )
392 }
393}
394
395pub fn make_handle_payload_span_from_tcp_headers(
397 headers: &std::collections::HashMap<String, String>,
398 component: &str,
399 endpoint: &str,
400 namespace: &str,
401 instance_id: u64,
402) -> Span {
403 let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_tcp_headers(headers);
404 let x_request_id = headers.get("x-request-id").cloned();
405 let x_dynamo_request_id = headers.get("x-dynamo-request-id").cloned();
406 let tracestate = headers.get("tracestate").cloned();
407
408 if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
409 let span = tracing::info_span!(
410 "handle_payload",
411 trace_id = trace_id.as_str(),
412 parent_id = parent_id.as_str(),
413 x_request_id = x_request_id,
414 x_dynamo_request_id = x_dynamo_request_id,
415 tracestate = tracestate,
416 component = component,
417 endpoint = endpoint,
418 namespace = namespace,
419 instance_id = instance_id,
420 );
421
422 if let Some(context) = otel_context {
423 let _ = span.set_parent(context);
424 }
425 span
426 } else {
427 tracing::info_span!(
428 "handle_payload",
429 x_request_id = x_request_id,
430 x_dynamo_request_id = x_dynamo_request_id,
431 tracestate = tracestate,
432 component = component,
433 endpoint = endpoint,
434 namespace = namespace,
435 instance_id = instance_id,
436 )
437 }
438}
439
440fn extract_otel_context_from_tcp_headers(
442 headers: &std::collections::HashMap<String, String>,
443) -> (
444 Option<opentelemetry::Context>,
445 Option<String>,
446 Option<String>,
447) {
448 let traceparent_value = match headers.get("traceparent") {
449 Some(value) => value.as_str(),
450 None => return (None, None, None),
451 };
452
453 let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
454
455 struct TcpHeaderExtractor<'a>(&'a std::collections::HashMap<String, String>);
456
457 impl<'a> Extractor for TcpHeaderExtractor<'a> {
458 fn get(&self, key: &str) -> Option<&str> {
459 self.0.get(key).map(|s| s.as_str())
460 }
461
462 fn keys(&self) -> Vec<&str> {
463 vec!["traceparent", "tracestate"]
464 .into_iter()
465 .filter(|&key| self.0.get(key).is_some())
466 .collect()
467 }
468 }
469
470 let extractor = TcpHeaderExtractor(headers);
471 let otel_context = TRACE_PROPAGATOR.extract(&extractor);
472
473 let context_with_trace = if otel_context.span().span_context().is_valid() {
474 Some(otel_context)
475 } else {
476 None
477 };
478
479 (context_with_trace, trace_id, parent_span_id)
480}
481
482pub fn extract_otel_context_from_nats_headers(
484 headers: &async_nats::HeaderMap,
485) -> (
486 Option<opentelemetry::Context>,
487 Option<String>,
488 Option<String>,
489) {
490 let traceparent_value = match headers.get("traceparent") {
491 Some(value) => value.as_str(),
492 None => return (None, None, None),
493 };
494
495 let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
496
497 struct NatsHeaderExtractor<'a>(&'a async_nats::HeaderMap);
498
499 impl<'a> Extractor for NatsHeaderExtractor<'a> {
500 fn get(&self, key: &str) -> Option<&str> {
501 self.0.get(key).map(|value| value.as_str())
502 }
503
504 fn keys(&self) -> Vec<&str> {
505 vec!["traceparent", "tracestate"]
506 .into_iter()
507 .filter(|&key| self.0.get(key).is_some())
508 .collect()
509 }
510 }
511
512 let extractor = NatsHeaderExtractor(headers);
513 let otel_context = TRACE_PROPAGATOR.extract(&extractor);
514
515 let context_with_trace = if otel_context.span().span_context().is_valid() {
516 Some(otel_context)
517 } else {
518 None
519 };
520
521 (context_with_trace, trace_id, parent_span_id)
522}
523
524pub fn inject_otel_context_into_nats_headers(
526 headers: &mut async_nats::HeaderMap,
527 context: Option<opentelemetry::Context>,
528) {
529 let otel_context = context.unwrap_or_else(|| Span::current().context());
530
531 struct NatsHeaderInjector<'a>(&'a mut async_nats::HeaderMap);
532
533 impl<'a> Injector for NatsHeaderInjector<'a> {
534 fn set(&mut self, key: &str, value: String) {
535 self.0.insert(key, value);
536 }
537 }
538
539 let mut injector = NatsHeaderInjector(headers);
540 TRACE_PROPAGATOR.inject_context(&otel_context, &mut injector);
541}
542
543pub fn inject_current_trace_into_nats_headers(headers: &mut async_nats::HeaderMap) {
545 inject_otel_context_into_nats_headers(headers, None);
546}
547
548pub fn inject_trace_headers_into_map(headers: &mut std::collections::HashMap<String, String>) {
550 if let Some(trace_context) = get_distributed_tracing_context() {
551 headers.insert(
553 "traceparent".to_string(),
554 trace_context.create_traceparent(),
555 );
556
557 if let Some(tracestate) = trace_context.tracestate {
559 headers.insert("tracestate".to_string(), tracestate);
560 }
561
562 if let Some(x_request_id) = trace_context.x_request_id {
564 headers.insert("x-request-id".to_string(), x_request_id);
565 }
566 if let Some(x_dynamo_request_id) = trace_context.x_dynamo_request_id {
567 headers.insert("x-dynamo-request-id".to_string(), x_dynamo_request_id);
568 }
569 }
570}
571
572pub fn make_client_request_span(
574 operation: &str,
575 request_id: &str,
576 trace_context: Option<&DistributedTraceContext>,
577 instance_id: Option<&str>,
578) -> Span {
579 if let Some(ctx) = trace_context {
580 let mut headers = async_nats::HeaderMap::new();
581 headers.insert("traceparent", ctx.create_traceparent());
582
583 if let Some(ref tracestate) = ctx.tracestate {
584 headers.insert("tracestate", tracestate.as_str());
585 }
586
587 let (otel_context, _extracted_trace_id, _extracted_parent_span_id) =
588 extract_otel_context_from_nats_headers(&headers);
589
590 let span = if let Some(inst_id) = instance_id {
591 tracing::info_span!(
592 "client_request",
593 operation = operation,
594 request_id = request_id,
595 instance_id = inst_id,
596 trace_id = ctx.trace_id.as_str(),
597 parent_id = ctx.span_id.as_str(),
598 x_request_id = ctx.x_request_id.as_deref(),
599 x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
600 )
602 } else {
603 tracing::info_span!(
604 "client_request",
605 operation = operation,
606 request_id = request_id,
607 trace_id = ctx.trace_id.as_str(),
608 parent_id = ctx.span_id.as_str(),
609 x_request_id = ctx.x_request_id.as_deref(),
610 x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
611 )
613 };
614
615 if let Some(context) = otel_context {
616 let _ = span.set_parent(context);
617 }
618
619 span
620 } else if let Some(inst_id) = instance_id {
621 tracing::info_span!(
622 "client_request",
623 operation = operation,
624 request_id = request_id,
625 instance_id = inst_id,
626 )
627 } else {
628 tracing::info_span!(
629 "client_request",
630 operation = operation,
631 request_id = request_id,
632 )
633 }
634}
635
636#[derive(Debug, Default)]
637pub struct FieldVisitor {
638 pub fields: HashMap<String, String>,
639}
640
641impl Visit for FieldVisitor {
642 fn record_str(&mut self, field: &Field, value: &str) {
643 self.fields
644 .insert(field.name().to_string(), value.to_string());
645 }
646
647 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
648 self.fields
649 .insert(field.name().to_string(), format!("{:?}", value).to_string());
650 }
651}
652
653impl<S> Layer<S> for DistributedTraceIdLayer
654where
655 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
656{
657 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
660 if let Some(span) = ctx.span(&id) {
661 let mut extensions = span.extensions_mut();
662 if let Some(distributed_tracing_context) =
663 extensions.get_mut::<DistributedTraceContext>()
664 {
665 distributed_tracing_context.end = Some(Instant::now());
666 }
667 }
668 }
669
670 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
673 if let Some(span) = ctx.span(id) {
674 let mut trace_id: Option<String> = None;
675 let mut parent_id: Option<String> = None;
676 let mut span_id: Option<String> = None;
677 let mut x_request_id: Option<String> = None;
678 let mut x_dynamo_request_id: Option<String> = None;
679 let mut tracestate: Option<String> = None;
680 let mut visitor = FieldVisitor::default();
681 attrs.record(&mut visitor);
682
683 if let Some(trace_id_input) = visitor.fields.get("trace_id") {
685 if !is_valid_trace_id(trace_id_input) {
686 tracing::trace!("trace id '{}' is not valid! Ignoring.", trace_id_input);
687 } else {
688 trace_id = Some(trace_id_input.to_string());
689 }
690 }
691
692 if let Some(span_id_input) = visitor.fields.get("span_id") {
694 if !is_valid_span_id(span_id_input) {
695 tracing::trace!("span id '{}' is not valid! Ignoring.", span_id_input);
696 } else {
697 span_id = Some(span_id_input.to_string());
698 }
699 }
700
701 if let Some(parent_id_input) = visitor.fields.get("parent_id") {
703 if !is_valid_span_id(parent_id_input) {
704 tracing::trace!("parent id '{}' is not valid! Ignoring.", parent_id_input);
705 } else {
706 parent_id = Some(parent_id_input.to_string());
707 }
708 }
709
710 if let Some(tracestate_input) = visitor.fields.get("tracestate") {
712 tracestate = Some(tracestate_input.to_string());
713 }
714
715 if let Some(x_request_id_input) = visitor.fields.get("x_request_id") {
717 x_request_id = Some(x_request_id_input.to_string());
718 }
719
720 if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
722 x_dynamo_request_id = Some(x_request_id_input.to_string());
723 }
724
725 if parent_id.is_none()
727 && let Some(parent_span_id) = ctx.current_span().id()
728 && let Some(parent_span) = ctx.span(parent_span_id)
729 {
730 let parent_ext = parent_span.extensions();
731 if let Some(parent_tracing_context) = parent_ext.get::<DistributedTraceContext>() {
732 trace_id = Some(parent_tracing_context.trace_id.clone());
733 parent_id = Some(parent_tracing_context.span_id.clone());
734 tracestate = parent_tracing_context.tracestate.clone();
735 }
736 }
737
738 if (parent_id.is_some() || span_id.is_some()) && trace_id.is_none() {
740 tracing::error!("parent id or span id are set but trace id is not set!");
741 parent_id = None;
743 span_id = None;
744 }
745
746 let mut extensions = span.extensions_mut();
748 extensions.insert(PendingDistributedTraceContext {
749 trace_id,
750 span_id,
751 parent_id,
752 tracestate,
753 x_request_id,
754 x_dynamo_request_id,
755 });
756 }
757 }
758
759 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
762 if let Some(span) = ctx.span(id) {
763 {
765 let extensions = span.extensions();
766 if extensions.get::<DistributedTraceContext>().is_some() {
767 return;
768 }
769 }
770
771 let mut extensions = span.extensions_mut();
773 let pending = match extensions.remove::<PendingDistributedTraceContext>() {
774 Some(p) => p,
775 None => {
776 tracing::error!("PendingDistributedTraceContext not found in on_enter");
778 return;
779 }
780 };
781
782 let mut trace_id = pending.trace_id;
783 let mut span_id = pending.span_id;
784 let parent_id = pending.parent_id;
785 let tracestate = pending.tracestate;
786 let x_request_id = pending.x_request_id;
787 let x_dynamo_request_id = pending.x_dynamo_request_id;
788
789 drop(extensions);
792
793 if trace_id.is_none() || span_id.is_none() {
794 let extensions = span.extensions();
795 if let Some(otel_data) = extensions.get::<tracing_opentelemetry::OtelData>() {
796 if trace_id.is_none()
798 && let Some(otel_trace_id) = otel_data.trace_id()
799 {
800 let trace_id_str = format!("{}", otel_trace_id);
801 if is_valid_trace_id(&trace_id_str) {
802 trace_id = Some(trace_id_str);
803 }
804 }
805
806 if span_id.is_none()
808 && let Some(otel_span_id) = otel_data.span_id()
809 {
810 let span_id_str = format!("{}", otel_span_id);
811 if is_valid_span_id(&span_id_str) {
812 span_id = Some(span_id_str);
813 }
814 }
815 }
816 }
817
818 if trace_id.is_none() {
820 panic!(
821 "trace_id is not set in on_enter - OtelData may not be properly initialized"
822 );
823 }
824
825 if span_id.is_none() {
826 panic!("span_id is not set in on_enter - OtelData may not be properly initialized");
827 }
828
829 let span_level = span.metadata().level();
830 let mut extensions = span.extensions_mut();
831 extensions.insert(DistributedTraceContext {
832 trace_id: trace_id.expect("Trace ID must be set"),
833 span_id: span_id.expect("Span ID must be set"),
834 parent_id,
835 tracestate,
836 start: Some(Instant::now()),
837 end: None,
838 x_request_id,
839 x_dynamo_request_id,
840 });
841
842 drop(extensions);
843
844 if span_events_enabled() {
847 emit_at_level!(span_level, target: "span_event", message = "SPAN_FIRST_ENTRY");
848 }
849 }
850 }
851}
852
853pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
856 Span::current()
857 .with_subscriber(|(id, subscriber)| {
858 subscriber
859 .downcast_ref::<Registry>()
860 .and_then(|registry| registry.span_data(id))
861 .and_then(|span_data| {
862 let extensions = span_data.extensions();
863 extensions.get::<DistributedTraceContext>().cloned()
864 })
865 })
866 .flatten()
867}
868
869pub fn init() {
871 INIT.call_once(|| {
872 if let Err(e) = setup_logging() {
873 eprintln!("Failed to initialize logging: {}", e);
874 std::process::exit(1);
875 }
876 });
877}
878
/// Installs a compact stderr formatter plus the tokio-console layer.
///
/// Returns `Result` to match the non-console variant: `init` matches on
/// `Err(e) = setup_logging()`, and the previous `()` return type made the
/// `tokio-console` build fail to compile.
#[cfg(feature = "tokio-console")]
fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
    let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
        .with_default_env()
        .server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
        .spawn();
    // The console only needs runtime/tokio instrumentation; everything else is ERROR.
    let tokio_console_target = tracing_subscriber::filter::Targets::new()
        .with_default(LevelFilter::ERROR)
        .with_target("runtime", LevelFilter::TRACE)
        .with_target("tokio", LevelFilter::TRACE);
    let l = fmt::layer()
        .with_ansi(!disable_ansi_logging())
        .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
        .with_writer(std::io::stderr)
        .with_filter(filters(load_config()));
    tracing_subscriber::registry()
        .with(l)
        .with(tokio_console_layer.with_filter(tokio_console_target))
        .init();
    Ok(())
}
899
900#[cfg(not(feature = "tokio-console"))]
901fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
902 let fmt_filter_layer = filters(load_config());
903 let trace_filter_layer = filters(load_config());
904 let otel_filter_layer = filters(load_config());
905
906 if jsonl_logging_enabled() {
907 let span_events = if span_events_enabled() {
908 FmtSpan::CLOSE
909 } else {
910 FmtSpan::NONE
911 };
912 let l = fmt::layer()
913 .with_ansi(false)
914 .with_span_events(span_events)
915 .event_format(CustomJsonFormatter::new())
916 .with_writer(std::io::stderr)
917 .with_filter(fmt_filter_layer);
918
919 let service_name = get_service_name();
921
922 let (tracer_provider, endpoint_opt) = if otlp_exporter_enabled() {
924 let endpoint = std::env::var(env_logging::otlp::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
926 .unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string());
927
928 let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
930 .with_tonic()
931 .with_endpoint(&endpoint)
932 .build()?;
933
934 let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
936 .with_batch_exporter(otlp_exporter)
937 .with_resource(
938 opentelemetry_sdk::Resource::builder_empty()
939 .with_service_name(service_name.clone())
940 .build(),
941 )
942 .build();
943
944 (provider, Some(endpoint))
945 } else {
946 let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
948 .with_resource(
949 opentelemetry_sdk::Resource::builder_empty()
950 .with_service_name(service_name.clone())
951 .build(),
952 )
953 .build();
954
955 (provider, None)
956 };
957
958 let tracer = tracer_provider.tracer(service_name.clone());
960
961 tracing_subscriber::registry()
962 .with(
963 tracing_opentelemetry::layer()
964 .with_tracer(tracer)
965 .with_filter(otel_filter_layer),
966 )
967 .with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
968 .with(l)
969 .init();
970
971 if let Some(endpoint) = endpoint_opt {
973 tracing::info!(
974 endpoint = %endpoint,
975 service = %service_name,
976 "OpenTelemetry OTLP export enabled"
977 );
978 } else {
979 tracing::info!(
980 service = %service_name,
981 "OpenTelemetry OTLP export disabled, traces local only"
982 );
983 }
984 } else {
985 let l = fmt::layer()
986 .with_ansi(!disable_ansi_logging())
987 .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
988 .with_writer(std::io::stderr)
989 .with_filter(fmt_filter_layer);
990
991 tracing_subscriber::registry().with(l).init();
992 }
993
994 Ok(())
995}
996
997fn filters(config: LoggingConfig) -> EnvFilter {
998 let mut filter_layer = EnvFilter::builder()
999 .with_default_directive(config.log_level.parse().unwrap())
1000 .with_env_var(env_logging::DYN_LOG)
1001 .from_env_lossy();
1002
1003 for (module, level) in config.log_filters {
1004 match format!("{module}={level}").parse::<Directive>() {
1005 Ok(d) => {
1006 filter_layer = filter_layer.add_directive(d);
1007 }
1008 Err(e) => {
1009 eprintln!("Failed parsing filter '{level}' for module '{module}': {e}");
1010 }
1011 }
1012 }
1013
1014 if span_events_enabled() {
1017 filter_layer = filter_layer.add_directive("span_event=trace".parse().unwrap());
1018 }
1019
1020 filter_layer
1021}
1022
1023pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) {
1026 let level = match level {
1027 "debug" => log::Level::Debug,
1028 "info" => log::Level::Info,
1029 "warn" => log::Level::Warn,
1030 "error" => log::Level::Error,
1031 "warning" => log::Level::Warn,
1032 _ => log::Level::Info,
1033 };
1034 log::logger().log(
1035 &log::Record::builder()
1036 .args(format_args!("{}", message))
1037 .level(level)
1038 .target(module)
1039 .file(Some(file))
1040 .line(Some(line))
1041 .build(),
1042 );
1043}
1044
1045fn load_config() -> LoggingConfig {
1046 let config_path =
1047 std::env::var(env_logging::DYN_LOGGING_CONFIG_PATH).unwrap_or_else(|_| "".to_string());
1048 let figment = Figment::new()
1049 .merge(Serialized::defaults(LoggingConfig::default()))
1050 .merge(Toml::file("/opt/dynamo/etc/logging.toml"))
1051 .merge(Toml::file(config_path));
1052
1053 figment.extract().unwrap()
1054}
1055
/// Shape of one JSON log line.
///
/// NOTE(review): presumably serialized by `CustomJsonFormatter`; the use site is outside
/// this view, so confirm.
#[derive(Serialize)]
struct JsonLog<'a> {
    /// Formatted timestamp string.
    time: String,
    /// Event level as a string.
    level: String,
    /// Source file, omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    file: Option<&'a str>,
    /// Source line, omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    line: Option<u32>,
    /// Event (or overridden span) target.
    target: String,
    /// Event message as a JSON value.
    message: serde_json::Value,
    /// Remaining fields, flattened into the top-level JSON object; `BTreeMap` keeps key
    /// order deterministic.
    #[serde(flatten)]
    fields: BTreeMap<String, serde_json::Value>,
}
1069
1070struct TimeFormatter {
1071 use_local_tz: bool,
1072}
1073
1074impl TimeFormatter {
1075 fn new() -> Self {
1076 Self {
1077 use_local_tz: crate::config::use_local_timezone(),
1078 }
1079 }
1080
1081 fn format_now(&self) -> String {
1082 if self.use_local_tz {
1083 chrono::Local::now()
1084 .format("%Y-%m-%dT%H:%M:%S%.6f%:z")
1085 .to_string()
1086 } else {
1087 chrono::Utc::now()
1088 .format("%Y-%m-%dT%H:%M:%S%.6fZ")
1089 .to_string()
1090 }
1091 }
1092}
1093
1094impl FormatTime for TimeFormatter {
1095 fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
1096 write!(w, "{}", self.format_now())
1097 }
1098}
1099
1100struct CustomJsonFormatter {
1101 time_formatter: TimeFormatter,
1102}
1103
1104impl CustomJsonFormatter {
1105 fn new() -> Self {
1106 Self {
1107 time_formatter: TimeFormatter::new(),
1108 }
1109 }
1110}
1111
1112use once_cell::sync::Lazy;
1113use regex::Regex;
1114
/// Shared W3C Trace Context propagator, built on first use. The HTTP, TCP, and NATS
/// header extract/inject helpers in this module all use it.
static TRACE_PROPAGATOR: Lazy<opentelemetry_sdk::propagation::TraceContextPropagator> =
    Lazy::new(opentelemetry_sdk::propagation::TraceContextPropagator::new);
1118
1119fn parse_tracing_duration(s: &str) -> Option<u64> {
1120 static RE: Lazy<Regex> =
1121 Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
1122 let captures = RE.captures(s)?;
1123 let value: f64 = captures[1].parse().ok()?;
1124 let unit = &captures[2];
1125 match unit {
1126 "ns" => Some((value / 1000.0) as u64),
1127 "µs" | "us" => Some(value as u64),
1128 "ms" => Some((value * 1000.0) as u64),
1129 "s" => Some((value * 1_000_000.0) as u64),
1130 _ => None,
1131 }
1132}
1133
1134impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
1135where
1136 S: Subscriber + for<'a> LookupSpan<'a>,
1137 N: for<'a> FormatFields<'a> + 'static,
1138{
1139 fn format_event(
1140 &self,
1141 ctx: &FmtContext<'_, S, N>,
1142 mut writer: Writer<'_>,
1143 event: &Event<'_>,
1144 ) -> std::fmt::Result {
1145 let mut visitor = JsonVisitor::default();
1146 let time = self.time_formatter.format_now();
1147 event.record(&mut visitor);
1148 let mut message = visitor
1149 .fields
1150 .remove("message")
1151 .unwrap_or(serde_json::Value::String("".to_string()));
1152
1153 let mut target_override: Option<String> = None;
1154
1155 let current_span = event
1156 .parent()
1157 .and_then(|id| ctx.span(id))
1158 .or_else(|| ctx.lookup_current());
1159 if let Some(span) = current_span {
1160 let ext = span.extensions();
1161 let data = ext.get::<FormattedFields<N>>().unwrap();
1162 let span_fields: Vec<(&str, &str)> = data
1163 .fields
1164 .split(' ')
1165 .filter_map(|entry| entry.split_once('='))
1166 .collect();
1167 for (name, value) in span_fields {
1168 visitor.fields.insert(
1169 name.to_string(),
1170 serde_json::Value::String(value.trim_matches('"').to_string()),
1171 );
1172 }
1173
1174 let busy_us = visitor
1175 .fields
1176 .remove("time.busy")
1177 .and_then(|v| parse_tracing_duration(&v.to_string()));
1178 let idle_us = visitor
1179 .fields
1180 .remove("time.idle")
1181 .and_then(|v| parse_tracing_duration(&v.to_string()));
1182
1183 if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
1184 visitor.fields.insert(
1185 "time.busy_us".to_string(),
1186 serde_json::Value::Number(busy_us.into()),
1187 );
1188 visitor.fields.insert(
1189 "time.idle_us".to_string(),
1190 serde_json::Value::Number(idle_us.into()),
1191 );
1192 visitor.fields.insert(
1193 "time.duration_us".to_string(),
1194 serde_json::Value::Number((busy_us + idle_us).into()),
1195 );
1196 }
1197
1198 let is_span_created = message.as_str() == Some("SPAN_FIRST_ENTRY");
1199 let is_span_closed = message.as_str() == Some("close");
1200 if is_span_created || is_span_closed {
1201 target_override = Some(span.metadata().target().to_string());
1202 if is_span_closed {
1203 message = serde_json::Value::String("SPAN_CLOSED".to_string());
1204 }
1205 }
1206
1207 visitor.fields.insert(
1208 "span_name".to_string(),
1209 serde_json::Value::String(span.name().to_string()),
1210 );
1211
1212 if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
1213 visitor.fields.insert(
1214 "span_id".to_string(),
1215 serde_json::Value::String(tracing_context.span_id.clone()),
1216 );
1217 visitor.fields.insert(
1218 "trace_id".to_string(),
1219 serde_json::Value::String(tracing_context.trace_id.clone()),
1220 );
1221 if let Some(parent_id) = tracing_context.parent_id.clone() {
1222 visitor.fields.insert(
1223 "parent_id".to_string(),
1224 serde_json::Value::String(parent_id),
1225 );
1226 } else {
1227 visitor.fields.remove("parent_id");
1228 }
1229 if let Some(tracestate) = tracing_context.tracestate.clone() {
1230 visitor.fields.insert(
1231 "tracestate".to_string(),
1232 serde_json::Value::String(tracestate),
1233 );
1234 } else {
1235 visitor.fields.remove("tracestate");
1236 }
1237 if let Some(x_request_id) = tracing_context.x_request_id.clone() {
1238 visitor.fields.insert(
1239 "x_request_id".to_string(),
1240 serde_json::Value::String(x_request_id),
1241 );
1242 } else {
1243 visitor.fields.remove("x_request_id");
1244 }
1245
1246 if let Some(x_dynamo_request_id) = tracing_context.x_dynamo_request_id.clone() {
1247 visitor.fields.insert(
1248 "x_dynamo_request_id".to_string(),
1249 serde_json::Value::String(x_dynamo_request_id),
1250 );
1251 } else {
1252 visitor.fields.remove("x_dynamo_request_id");
1253 }
1254 } else {
1255 tracing::error!(
1256 "Distributed Trace Context not found, falling back to internal ids"
1257 );
1258 visitor.fields.insert(
1259 "span_id".to_string(),
1260 serde_json::Value::String(span.id().into_u64().to_string()),
1261 );
1262 if let Some(parent) = span.parent() {
1263 visitor.fields.insert(
1264 "parent_id".to_string(),
1265 serde_json::Value::String(parent.id().into_u64().to_string()),
1266 );
1267 }
1268 }
1269 } else {
1270 let reserved_fields = [
1271 "trace_id",
1272 "span_id",
1273 "parent_id",
1274 "span_name",
1275 "tracestate",
1276 ];
1277 for reserved_field in reserved_fields {
1278 visitor.fields.remove(reserved_field);
1279 }
1280 }
1281 let metadata = event.metadata();
1282 let log = JsonLog {
1283 level: metadata.level().to_string(),
1284 time,
1285 file: metadata.file(),
1286 line: metadata.line(),
1287 target: target_override.unwrap_or_else(|| metadata.target().to_string()),
1288 message,
1289 fields: visitor.fields,
1290 };
1291 let json = serde_json::to_string(&log).unwrap();
1292 writeln!(writer, "{json}")
1293 }
1294}
1295
1296#[derive(Default)]
1297struct JsonVisitor {
1298 fields: BTreeMap<String, serde_json::Value>,
1299}
1300
1301impl tracing::field::Visit for JsonVisitor {
1302 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1303 self.fields.insert(
1304 field.name().to_string(),
1305 serde_json::Value::String(format!("{value:?}")),
1306 );
1307 }
1308
1309 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1310 if field.name() != "message" {
1311 match serde_json::from_str::<Value>(value) {
1312 Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
1313 Err(_) => self.fields.insert(field.name().to_string(), value.into()),
1314 };
1315 } else {
1316 self.fields.insert(field.name().to_string(), value.into());
1317 }
1318 }
1319
1320 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
1321 self.fields
1322 .insert(field.name().to_string(), serde_json::Value::Bool(value));
1323 }
1324
1325 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
1326 self.fields.insert(
1327 field.name().to_string(),
1328 serde_json::Value::Number(value.into()),
1329 );
1330 }
1331
1332 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
1333 self.fields.insert(
1334 field.name().to_string(),
1335 serde_json::Value::Number(value.into()),
1336 );
1337 }
1338
1339 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
1340 use serde_json::value::Number;
1341 self.fields.insert(
1342 field.name().to_string(),
1343 serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
1344 );
1345 }
1346}
1347
1348#[cfg(test)]
1349pub mod tests {
1350 use super::*;
1351 use anyhow::{Result, anyhow};
1352 use chrono::{DateTime, Utc};
1353 use jsonschema::{Draft, JSONSchema};
1354 use serde_json::Value;
1355 use std::fs::File;
1356 use std::io::{BufRead, BufReader};
1357 use stdio_override::*;
1358 use tempfile::NamedTempFile;
1359
    /// JSON Schema (Draft 7) that `load_log` checks every captured log line against.
    ///
    /// Requires the `file`, `level`, `line`, `message`, `target` and `time` keys. It also
    /// constrains the optional trace keys: 16-hex-digit `span_id`/`parent_id`, a
    /// 32-hex-digit `trace_id`, and integer `time.*_us` timings. Additional keys are allowed.
    static LOG_LINE_SCHEMA: &str = r#"
    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "Runtime Log Line",
      "type": "object",
      "required": [
        "file",
        "level",
        "line",
        "message",
        "target",
        "time"
      ],
      "properties": {
        "file": { "type": "string" },
        "level": { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
        "line": { "type": "integer" },
        "message": { "type": "string" },
        "target": { "type": "string" },
        "time": { "type": "string", "format": "date-time" },
        "span_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "trace_id": { "type": "string", "pattern": "^[a-f0-9]{32}$" },
        "span_name": { "type": "string" },
        "time.busy_us": { "type": "integer" },
        "time.duration_us": { "type": "integer" },
        "time.idle_us": { "type": "integer" },
        "tracestate": { "type": "string" }
      },
      "additionalProperties": true
    }
    "#;
1392
1393 #[tracing::instrument(skip_all)]
1394 async fn parent() {
1395 tracing::trace!(message = "parent!");
1396 if let Some(my_ctx) = get_distributed_tracing_context() {
1397 tracing::info!(my_trace_id = my_ctx.trace_id);
1398 }
1399 child().await;
1400 }
1401
1402 #[tracing::instrument(skip_all)]
1403 async fn child() {
1404 tracing::trace!(message = "child");
1405 if let Some(my_ctx) = get_distributed_tracing_context() {
1406 tracing::info!(my_trace_id = my_ctx.trace_id);
1407 }
1408 grandchild().await;
1409 }
1410
1411 #[tracing::instrument(skip_all)]
1412 async fn grandchild() {
1413 tracing::trace!(message = "grandchild");
1414 if let Some(my_ctx) = get_distributed_tracing_context() {
1415 tracing::info!(my_trace_id = my_ctx.trace_id);
1416 }
1417 }
1418
1419 pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
1420 let schema_json: Value =
1421 serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
1422 let compiled_schema = JSONSchema::options()
1423 .with_draft(Draft::Draft7)
1424 .compile(&schema_json)
1425 .expect("Invalid schema");
1426
1427 let f = File::open(file_name)?;
1428 let reader = BufReader::new(f);
1429 let mut result = Vec::new();
1430
1431 for (line_num, line) in reader.lines().enumerate() {
1432 let line = line?;
1433 let val: Value = serde_json::from_str(&line)
1434 .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_num + 1, e))?;
1435
1436 if let Err(errors) = compiled_schema.validate(&val) {
1437 let errs = errors.map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
1438 return Err(anyhow!(
1439 "Line {}: JSON Schema Validation errors: {}",
1440 line_num + 1,
1441 errs
1442 ));
1443 }
1444 println!("{}", val);
1445 result.push(val);
1446 }
1447 Ok(result)
1448 }
1449
1450 #[tokio::test]
1451 async fn test_json_log_capture() -> Result<()> {
1452 #[allow(clippy::redundant_closure_call)]
1453 let _ = temp_env::async_with_vars(
1454 [(env_logging::DYN_LOGGING_JSONL, Some("1"))],
1455 (async || {
1456 let tmp_file = NamedTempFile::new().unwrap();
1457 let file_name = tmp_file.path().to_str().unwrap();
1458 let guard = StderrOverride::from_file(file_name)?;
1459 init();
1460 parent().await;
1461 drop(guard);
1462
1463 let lines = load_log(file_name)?;
1464
1465 let Some(trace_id) = lines
1473 .iter()
1474 .find_map(|log_line| log_line.get("trace_id").and_then(|v| v.as_str()))
1475 .map(|s| s.to_string())
1476 else {
1477 return Ok(());
1479 };
1480
1481 assert_ne!(
1483 trace_id, "00000000000000000000000000000000",
1484 "trace_id should not be a zero/invalid ID"
1485 );
1486 assert!(
1487 !trace_id.chars().all(|c| c == '0'),
1488 "trace_id should not be all zeros"
1489 );
1490
1491 for log_line in &lines {
1493 if let Some(line_trace_id) = log_line.get("trace_id") {
1494 assert_eq!(
1495 line_trace_id.as_str().unwrap(),
1496 &trace_id,
1497 "All logs should have the same trace_id"
1498 );
1499 }
1500 }
1501
1502 for log_line in &lines {
1504 if let Some(my_trace_id) = log_line.get("my_trace_id") {
1505 assert_eq!(
1506 my_trace_id,
1507 &serde_json::Value::String(trace_id.clone()),
1508 "my_trace_id should match the trace_id from distributed tracing context"
1509 );
1510 }
1511 }
1512
1513 let mut span_ids_seen: std::collections::HashSet<String> = std::collections::HashSet::new();
1515 let mut span_timestamps: std::collections::HashMap<String, DateTime<Utc>> = std::collections::HashMap::new();
1516
1517 for log_line in &lines {
1518 if let Some(span_id) = log_line.get("span_id") {
1519 let span_id_str = span_id.as_str().unwrap();
1520 assert!(
1521 is_valid_span_id(span_id_str),
1522 "Invalid span_id format: {}",
1523 span_id_str
1524 );
1525 span_ids_seen.insert(span_id_str.to_string());
1526 }
1527
1528 if let Some(time_str) = log_line.get("time").and_then(|v| v.as_str()) {
1530 let timestamp = DateTime::parse_from_rfc3339(time_str)
1531 .expect("All timestamps should be valid RFC3339 format")
1532 .with_timezone(&Utc);
1533
1534 if let Some(span_name) = log_line.get("span_name").and_then(|v| v.as_str()) {
1536 span_timestamps.insert(span_name.to_string(), timestamp);
1537 }
1538 }
1539 }
1540
1541 let parent_span_id = lines
1544 .iter()
1545 .find(|log_line| {
1546 log_line.get("span_name")
1547 .and_then(|v| v.as_str()) == Some("parent")
1548 })
1549 .and_then(|log_line| {
1550 log_line.get("span_id")
1551 .and_then(|v| v.as_str())
1552 .map(|s| s.to_string())
1553 })
1554 .expect("Should find parent span with span_id");
1555
1556 let child_span_id = lines
1557 .iter()
1558 .find(|log_line| {
1559 log_line.get("span_name")
1560 .and_then(|v| v.as_str()) == Some("child")
1561 })
1562 .and_then(|log_line| {
1563 log_line.get("span_id")
1564 .and_then(|v| v.as_str())
1565 .map(|s| s.to_string())
1566 })
1567 .expect("Should find child span with span_id");
1568
1569 let grandchild_span_id = lines
1570 .iter()
1571 .find(|log_line| {
1572 log_line.get("span_name")
1573 .and_then(|v| v.as_str()) == Some("grandchild")
1574 })
1575 .and_then(|log_line| {
1576 log_line.get("span_id")
1577 .and_then(|v| v.as_str())
1578 .map(|s| s.to_string())
1579 })
1580 .expect("Should find grandchild span with span_id");
1581
1582 assert_ne!(parent_span_id, child_span_id, "Parent and child should have different span IDs");
1584 assert_ne!(child_span_id, grandchild_span_id, "Child and grandchild should have different span IDs");
1585 assert_ne!(parent_span_id, grandchild_span_id, "Parent and grandchild should have different span IDs");
1586
1587 for log_line in &lines {
1589 if let Some(span_name) = log_line.get("span_name")
1590 && let Some(span_name_str) = span_name.as_str()
1591 && span_name_str == "parent"
1592 {
1593 assert!(
1594 log_line.get("parent_id").is_none(),
1595 "Parent span should not have a parent_id"
1596 );
1597 }
1598 }
1599
1600 for log_line in &lines {
1602 if let Some(span_name) = log_line.get("span_name")
1603 && let Some(span_name_str) = span_name.as_str()
1604 && span_name_str == "child"
1605 {
1606 let parent_id = log_line.get("parent_id")
1607 .and_then(|v| v.as_str())
1608 .expect("Child span should have a parent_id");
1609 assert_eq!(
1610 parent_id,
1611 parent_span_id,
1612 "Child's parent_id should match parent's span_id"
1613 );
1614 }
1615 }
1616
1617 for log_line in &lines {
1619 if let Some(span_name) = log_line.get("span_name")
1620 && let Some(span_name_str) = span_name.as_str()
1621 && span_name_str == "grandchild"
1622 {
1623 let parent_id = log_line.get("parent_id")
1624 .and_then(|v| v.as_str())
1625 .expect("Grandchild span should have a parent_id");
1626 assert_eq!(
1627 parent_id,
1628 child_span_id,
1629 "Grandchild's parent_id should match child's span_id"
1630 );
1631 }
1632 }
1633
1634 let parent_time = span_timestamps.get("parent")
1636 .expect("Should have timestamp for parent span");
1637 let child_time = span_timestamps.get("child")
1638 .expect("Should have timestamp for child span");
1639 let grandchild_time = span_timestamps.get("grandchild")
1640 .expect("Should have timestamp for grandchild span");
1641
1642 assert!(
1644 parent_time <= child_time,
1645 "Parent span should log before or at same time as child span (parent: {}, child: {})",
1646 parent_time,
1647 child_time
1648 );
1649 assert!(
1650 child_time <= grandchild_time,
1651 "Child span should log before or at same time as grandchild span (child: {}, grandchild: {})",
1652 child_time,
1653 grandchild_time
1654 );
1655
1656 Ok::<(), anyhow::Error>(())
1657 })(),
1658 )
1659 .await;
1660 Ok(())
1661 }
1662
1663 #[tracing::instrument(level = "debug", skip_all)]
1665 async fn debug_level_span() {
1666 tracing::debug!("inside debug span");
1667 }
1668
1669 #[tracing::instrument(level = "info", skip_all)]
1670 async fn info_level_span() {
1671 tracing::info!("inside info span");
1672 }
1673
1674 #[tracing::instrument(level = "warn", skip_all)]
1675 async fn warn_level_span() {
1676 tracing::warn!("inside warn span");
1677 }
1678
1679 #[tracing::instrument(level = "info", target = "other_module", skip_all)]
1682 async fn other_target_info_span() {
1683 tracing::info!(target: "other_module", "inside other target span");
1684 }
1685
1686 #[test]
1697 fn test_span_events() {
1698 use std::process::Command;
1699
1700 let output = Command::new("cargo")
1702 .args([
1703 "test",
1704 "-p",
1705 "dynamo-runtime",
1706 "test_span_events_subprocess",
1707 "--",
1708 "--exact",
1709 "--nocapture",
1710 ])
1711 .env("DYN_LOGGING_JSONL", "1")
1712 .env("DYN_LOGGING_SPAN_EVENTS", "1")
1713 .env("DYN_LOG", "warn,dynamo_runtime::logging::tests=debug")
1714 .output()
1715 .expect("Failed to execute subprocess test");
1716
1717 if !output.status.success() {
1719 eprintln!(
1720 "=== STDOUT ===\n{}",
1721 String::from_utf8_lossy(&output.stdout)
1722 );
1723 eprintln!(
1724 "=== STDERR ===\n{}",
1725 String::from_utf8_lossy(&output.stderr)
1726 );
1727 }
1728
1729 assert!(
1730 output.status.success(),
1731 "Subprocess test failed with exit code: {:?}",
1732 output.status.code()
1733 );
1734 }
1735
1736 #[tokio::test]
1739 async fn test_span_events_subprocess() -> Result<()> {
1740 if std::env::var("DYN_LOGGING_SPAN_EVENTS").is_err() {
1742 return Ok(());
1743 }
1744
1745 let tmp_file = NamedTempFile::new().unwrap();
1746 let file_name = tmp_file.path().to_str().unwrap();
1747 let guard = StderrOverride::from_file(file_name)?;
1748 init();
1749
1750 parent().await;
1752
1753 debug_level_span().await;
1755 info_level_span().await;
1756 warn_level_span().await;
1757
1758 other_target_info_span().await;
1760
1761 drop(guard);
1762
1763 let lines = load_log(file_name)?;
1764
1765 let has_span_event = |msg: &str, span_name: &str| {
1767 lines.iter().any(|log| {
1768 log.get("message").and_then(|v| v.as_str()) == Some(msg)
1769 && log.get("span_name").and_then(|v| v.as_str()) == Some(span_name)
1770 })
1771 };
1772
1773 let get_span_events = |msg: &str| -> Vec<&serde_json::Value> {
1775 lines
1776 .iter()
1777 .filter(|log| log.get("message").and_then(|v| v.as_str()) == Some(msg))
1778 .collect()
1779 };
1780
1781 let span_created_events = get_span_events("SPAN_FIRST_ENTRY");
1783 for event in &span_created_events {
1784 assert!(
1786 event.get("span_name").is_some(),
1787 "SPAN_FIRST_ENTRY must have span_name"
1788 );
1789 let trace_id = event
1791 .get("trace_id")
1792 .and_then(|v| v.as_str())
1793 .expect("SPAN_FIRST_ENTRY must have trace_id");
1794 assert!(
1795 trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
1796 "SPAN_FIRST_ENTRY must have valid trace_id format"
1797 );
1798 let span_id = event
1800 .get("span_id")
1801 .and_then(|v| v.as_str())
1802 .expect("SPAN_FIRST_ENTRY must have span_id");
1803 assert!(
1804 is_valid_span_id(span_id),
1805 "SPAN_FIRST_ENTRY must have valid span_id"
1806 );
1807 }
1808
1809 let span_closed_events = get_span_events("SPAN_CLOSED");
1811 for event in &span_closed_events {
1812 assert!(
1813 event.get("span_name").is_some(),
1814 "SPAN_CLOSED must have span_name"
1815 );
1816 assert!(
1817 event.get("time.busy_us").is_some()
1818 || event.get("time.idle_us").is_some()
1819 || event.get("time.duration_us").is_some(),
1820 "SPAN_CLOSED must have timing information"
1821 );
1822 let trace_id = event
1824 .get("trace_id")
1825 .and_then(|v| v.as_str())
1826 .expect("SPAN_CLOSED must have trace_id");
1827 assert!(
1828 trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
1829 "SPAN_CLOSED must have valid trace_id format"
1830 );
1831 }
1832
1833 assert!(
1837 has_span_event("SPAN_FIRST_ENTRY", "debug_level_span"),
1838 "DEBUG span from allowed target MUST pass (target=debug filter)"
1839 );
1840 assert!(
1841 has_span_event("SPAN_FIRST_ENTRY", "info_level_span"),
1842 "INFO span from allowed target MUST pass (target=debug filter)"
1843 );
1844 assert!(
1845 has_span_event("SPAN_FIRST_ENTRY", "warn_level_span"),
1846 "WARN span from allowed target MUST pass (target=debug filter)"
1847 );
1848
1849 assert!(
1851 has_span_event("SPAN_FIRST_ENTRY", "parent"),
1852 "parent span (INFO) from allowed target MUST pass"
1853 );
1854 assert!(
1855 has_span_event("SPAN_FIRST_ENTRY", "child"),
1856 "child span (INFO) from allowed target MUST pass"
1857 );
1858 assert!(
1859 has_span_event("SPAN_FIRST_ENTRY", "grandchild"),
1860 "grandchild span (INFO) from allowed target MUST pass"
1861 );
1862
1863 assert!(
1866 !has_span_event("SPAN_FIRST_ENTRY", "other_target_info_span"),
1867 "INFO span from non-allowed target (other_module) MUST be filtered out"
1868 );
1869
1870 for event in &span_created_events {
1872 let target = event.get("target").and_then(|v| v.as_str()).unwrap_or("");
1873 let level = event.get("level").and_then(|v| v.as_str()).unwrap_or("");
1874
1875 if level == "DEBUG" || level == "INFO" {
1877 assert!(
1878 target.contains("dynamo_runtime::logging::tests"),
1879 "DEBUG/INFO span must be from allowed target, got target={target}"
1880 );
1881 }
1882 }
1883
1884 Ok(())
1885 }
1886}