1use std::collections::{BTreeMap, HashMap};
30use std::sync::Once;
31
32use figment::{
33 Figment,
34 providers::{Format, Serialized, Toml},
35};
36use serde::{Deserialize, Serialize};
37use tracing::level_filters::LevelFilter;
38use tracing::{Event, Subscriber};
39use tracing_subscriber::EnvFilter;
40use tracing_subscriber::fmt::time::FormatTime;
41use tracing_subscriber::fmt::time::LocalTime;
42use tracing_subscriber::fmt::time::SystemTime;
43use tracing_subscriber::fmt::time::UtcTime;
44use tracing_subscriber::fmt::{FmtContext, FormatFields};
45use tracing_subscriber::fmt::{FormattedFields, format::Writer};
46use tracing_subscriber::prelude::*;
47use tracing_subscriber::registry::LookupSpan;
48use tracing_subscriber::{filter::Directive, fmt};
49
50use crate::config::{disable_ansi_logging, jsonl_logging_enabled, span_events_enabled};
51use async_nats::{HeaderMap, HeaderValue};
52use axum::extract::FromRequestParts;
53use axum::http;
54use axum::http::Request;
55use axum::http::request::Parts;
56use serde_json::Value;
57use std::convert::Infallible;
58use std::time::Instant;
59use tower_http::trace::{DefaultMakeSpan, TraceLayer};
60use tracing::Id;
61use tracing::Span;
62use tracing::field::Field;
63use tracing::span;
64use tracing_subscriber::Layer;
65use tracing_subscriber::Registry;
66use tracing_subscriber::field::Visit;
67use tracing_subscriber::fmt::format::FmtSpan;
68use tracing_subscriber::layer::Context;
69use tracing_subscriber::registry::SpanData;
70use uuid::Uuid;
71
72use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
73use opentelemetry::trace::TraceContextExt;
74use opentelemetry::{global, trace::Tracer};
75use opentelemetry_otlp::WithExportConfig;
76
77use opentelemetry::trace::TracerProvider as _;
78use opentelemetry::{Key, KeyValue};
79use opentelemetry_sdk::Resource;
80use opentelemetry_sdk::trace::SdkTracerProvider;
81use tracing::error;
82use tracing_subscriber::layer::SubscriberExt;
83use std::time::Duration;
86use tracing::{info, instrument};
87use tracing_opentelemetry::OpenTelemetrySpanExt;
88use tracing_subscriber::util::SubscriberInitExt;
89
90use crate::config::environment_names::logging as env_logging;
91
92use dynamo_config::env_is_truthy;
93
/// Default log level for [`LoggingConfig`]; a logging config file or the environment
/// filter may override it.
const DEFAULT_FILTER_LEVEL: &str = "info";

/// OTLP traces endpoint used when `env_logging::otlp::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`
/// is not set.
const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";

/// OpenTelemetry service name used when `env_logging::otlp::OTEL_SERVICE_NAME` is not set.
const DEFAULT_OTEL_SERVICE_NAME: &str = "dynamo";

/// Ensures [`init`] runs the logging setup at most once per process.
static INIT: Once = Once::new();
105
106#[derive(Serialize, Deserialize, Debug)]
107struct LoggingConfig {
108 log_level: String,
109 log_filters: HashMap<String, String>,
110}
111impl Default for LoggingConfig {
112 fn default() -> Self {
113 LoggingConfig {
114 log_level: DEFAULT_FILTER_LEVEL.to_string(),
115 log_filters: HashMap::from([
116 ("h2".to_string(), "error".to_string()),
117 ("tower".to_string(), "error".to_string()),
118 ("hyper_util".to_string(), "error".to_string()),
119 ("neli".to_string(), "error".to_string()),
120 ("async_nats".to_string(), "error".to_string()),
121 ("rustls".to_string(), "error".to_string()),
122 ("tokenizers".to_string(), "error".to_string()),
123 ("axum".to_string(), "error".to_string()),
124 ("tonic".to_string(), "error".to_string()),
125 ("mistralrs_core".to_string(), "error".to_string()),
126 ("hf_hub".to_string(), "error".to_string()),
127 ("opentelemetry".to_string(), "error".to_string()),
128 ("opentelemetry-otlp".to_string(), "error".to_string()),
129 ("opentelemetry_sdk".to_string(), "error".to_string()),
130 ]),
131 }
132 }
133}
134
135fn otlp_exporter_enabled() -> bool {
137 env_is_truthy(env_logging::otlp::OTEL_EXPORT_ENABLED)
138}
139
140fn get_service_name() -> String {
142 std::env::var(env_logging::otlp::OTEL_SERVICE_NAME)
143 .unwrap_or_else(|_| DEFAULT_OTEL_SERVICE_NAME.to_string())
144}
145
/// Returns `true` if `trace_id` is a well-formed W3C trace-context trace ID.
///
/// A valid ID is exactly 32 hexadecimal characters and not all zeros. The W3C Trace
/// Context spec reserves the all-zero value as invalid, and the OpenTelemetry propagator
/// rejects it, so accepting it would let log IDs diverge from exported traces.
pub fn is_valid_trace_id(trace_id: &str) -> bool {
    trace_id.len() == 32
        && trace_id.chars().all(|c| c.is_ascii_hexdigit())
        && trace_id.chars().any(|c| c != '0')
}
151
/// Returns `true` if `span_id` is a well-formed W3C trace-context span (parent) ID.
///
/// A valid ID is exactly 16 hexadecimal characters and not all zeros. The W3C Trace
/// Context spec reserves the all-zero value as invalid, and the OpenTelemetry propagator
/// rejects it.
pub fn is_valid_span_id(span_id: &str) -> bool {
    span_id.len() == 16
        && span_id.chars().all(|c| c.is_ascii_hexdigit())
        && span_id.chars().any(|c| c != '0')
}
157
/// `tracing` layer that attaches a [`DistributedTraceContext`] (trace ID, span ID,
/// parent ID, `tracestate` and request IDs) to the extensions of each span it is enabled
/// for, where formatters and header injectors can read it back.
pub struct DistributedTraceIdLayer;
159
/// Trace identifiers attached to a span by [`DistributedTraceIdLayer`] when the span is
/// first entered. The local timing fields are skipped by serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedTraceContext {
    /// 32-hex-character trace ID.
    pub trace_id: String,
    /// 16-hex-character ID of the span that owns this context.
    pub span_id: String,
    /// Span ID of this span's parent, when one is known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<String>,
    /// W3C `tracestate` value carried along with the trace.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tracestate: Option<String>,
    /// Time of the span's first entry.
    #[serde(skip)]
    start: Option<Instant>,
    /// Time the span was closed.
    #[serde(skip)]
    end: Option<Instant>,
    /// Caller-supplied `x-request-id`, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_request_id: Option<String>,
    /// Dynamo request ID (`x-dynamo-request-id`), if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_dynamo_request_id: Option<String>,
}
177
/// Trace identifiers collected for a span in `on_new_span`, stored in the span's
/// extensions until its first entry. `on_enter` then fills in any missing trace/span ID
/// and replaces this with a [`DistributedTraceContext`].
#[derive(Debug, Clone)]
struct PendingDistributedTraceContext {
    trace_id: Option<String>,
    span_id: Option<String>,
    parent_id: Option<String>,
    tracestate: Option<String>,
    x_request_id: Option<String>,
    x_dynamo_request_id: Option<String>,
}
188
/// Emits a `tracing` event at a level chosen at runtime from a `&tracing::Level`.
///
/// `tracing::event!` bakes its level into the callsite's static metadata, so the level
/// must be a constant. This macro therefore expands to one `event!` per level and
/// dispatches on the dereferenced value.
macro_rules! emit_at_level {
    ($level:expr, target: $target:expr, $($arg:tt)*) => {
        match *$level {
            tracing::Level::TRACE => tracing::event!(target: $target, tracing::Level::TRACE, $($arg)*),
            tracing::Level::DEBUG => tracing::event!(target: $target, tracing::Level::DEBUG, $($arg)*),
            tracing::Level::INFO => tracing::event!(target: $target, tracing::Level::INFO, $($arg)*),
            tracing::Level::WARN => tracing::event!(target: $target, tracing::Level::WARN, $($arg)*),
            tracing::Level::ERROR => tracing::event!(target: $target, tracing::Level::ERROR, $($arg)*),
        }
    };
}
204
205impl DistributedTraceContext {
206 pub fn create_traceparent(&self) -> String {
208 format!("00-{}-{}-01", self.trace_id, self.span_id)
209 }
210}
211
212pub fn parse_traceparent(traceparent: &str) -> (Option<String>, Option<String>) {
214 let pieces: Vec<_> = traceparent.split('-').collect();
215 if pieces.len() != 4 {
216 return (None, None);
217 }
218 let trace_id = pieces[1];
219 let parent_id = pieces[2];
220
221 if !is_valid_trace_id(trace_id) || !is_valid_span_id(parent_id) {
222 return (None, None);
223 }
224
225 (Some(trace_id.to_string()), Some(parent_id.to_string()))
226}
227
/// Trace-related values read from incoming request headers by
/// [`TraceParent::from_headers`].
#[derive(Debug, Clone, Default)]
pub struct TraceParent {
    /// Trace ID from a `traceparent` header accepted by `parse_traceparent`.
    pub trace_id: Option<String>,
    /// Parent span ID from a `traceparent` header accepted by `parse_traceparent`.
    pub parent_id: Option<String>,
    /// Raw `tracestate` header value.
    pub tracestate: Option<String>,
    /// Raw `x-request-id` header value.
    pub x_request_id: Option<String>,
    /// `x-dynamo-request-id` header value, kept only if it parses as a UUID.
    pub x_dynamo_request_id: Option<String>,
}
236
/// Minimal read-only view over a header map, so [`TraceParent::from_headers`] can work
/// with both HTTP and NATS headers.
pub trait GenericHeaders {
    /// Returns the value of header `key` as a string slice, or `None` if it is absent or
    /// cannot be read as a string.
    fn get(&self, key: &str) -> Option<&str>;
}
240
241impl GenericHeaders for async_nats::HeaderMap {
242 fn get(&self, key: &str) -> Option<&str> {
243 async_nats::HeaderMap::get(self, key).map(|value| value.as_str())
244 }
245}
246
247impl GenericHeaders for http::HeaderMap {
248 fn get(&self, key: &str) -> Option<&str> {
249 http::HeaderMap::get(self, key).and_then(|value| value.to_str().ok())
250 }
251}
252
253impl TraceParent {
254 pub fn from_headers<H: GenericHeaders>(headers: &H) -> TraceParent {
255 let mut trace_id = None;
256 let mut parent_id = None;
257 let mut tracestate = None;
258 let mut x_request_id = None;
259 let mut x_dynamo_request_id = None;
260
261 if let Some(header_value) = headers.get("traceparent") {
262 (trace_id, parent_id) = parse_traceparent(header_value);
263 }
264
265 if let Some(header_value) = headers.get("x-request-id") {
266 x_request_id = Some(header_value.to_string());
267 }
268
269 if let Some(header_value) = headers.get("tracestate") {
270 tracestate = Some(header_value.to_string());
271 }
272
273 if let Some(header_value) = headers.get("x-dynamo-request-id") {
274 x_dynamo_request_id = Some(header_value.to_string());
275 }
276
277 let x_dynamo_request_id =
279 x_dynamo_request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
280 TraceParent {
281 trace_id,
282 parent_id,
283 tracestate,
284 x_request_id,
285 x_dynamo_request_id,
286 }
287 }
288}
289
290pub fn make_request_span<B>(req: &Request<B>) -> Span {
292 let method = req.method();
293 let uri = req.uri();
294 let version = format!("{:?}", req.version());
295 let trace_parent = TraceParent::from_headers(req.headers());
296
297 let otel_context = extract_otel_context_from_http_headers(req.headers());
298
299 let span = tracing::info_span!(
300 "http-request",
301 method = %method,
302 uri = %uri,
303 version = %version,
304 trace_id = trace_parent.trace_id,
305 parent_id = trace_parent.parent_id,
306 x_request_id = trace_parent.x_request_id,
307 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
308 );
309
310 if let Some(context) = otel_context {
311 let _ = span.set_parent(context);
312 }
313
314 span
315}
316
317fn extract_otel_context_from_http_headers(
319 headers: &http::HeaderMap,
320) -> Option<opentelemetry::Context> {
321 let traceparent_value = headers.get("traceparent")?.to_str().ok()?;
322
323 struct HttpHeaderExtractor<'a>(&'a http::HeaderMap);
324
325 impl<'a> Extractor for HttpHeaderExtractor<'a> {
326 fn get(&self, key: &str) -> Option<&str> {
327 self.0.get(key).and_then(|v| v.to_str().ok())
328 }
329
330 fn keys(&self) -> Vec<&str> {
331 vec!["traceparent", "tracestate"]
332 .into_iter()
333 .filter(|&key| self.0.get(key).is_some())
334 .collect()
335 }
336 }
337
338 if traceparent_value.is_empty() {
340 return None;
341 }
342
343 let extractor = HttpHeaderExtractor(headers);
344 let otel_context = TRACE_PROPAGATOR.extract(&extractor);
345
346 if otel_context.span().span_context().is_valid() {
347 Some(otel_context)
348 } else {
349 None
350 }
351}
352
353pub fn make_handle_payload_span(
355 headers: &async_nats::HeaderMap,
356 component: &str,
357 endpoint: &str,
358 namespace: &str,
359 instance_id: u64,
360) -> Span {
361 let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_nats_headers(headers);
362 let trace_parent = TraceParent::from_headers(headers);
363
364 if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
365 let span = tracing::info_span!(
366 "handle_payload",
367 trace_id = trace_id.as_str(),
368 parent_id = parent_id.as_str(),
369 x_request_id = trace_parent.x_request_id,
370 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
371 tracestate = trace_parent.tracestate,
372 component = component,
373 endpoint = endpoint,
374 namespace = namespace,
375 instance_id = instance_id,
376 );
377
378 if let Some(context) = otel_context {
379 let _ = span.set_parent(context);
380 }
381 span
382 } else {
383 tracing::info_span!(
384 "handle_payload",
385 x_request_id = trace_parent.x_request_id,
386 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
387 tracestate = trace_parent.tracestate,
388 component = component,
389 endpoint = endpoint,
390 namespace = namespace,
391 instance_id = instance_id,
392 )
393 }
394}
395
396pub fn make_handle_payload_span_from_tcp_headers(
398 headers: &std::collections::HashMap<String, String>,
399 component: &str,
400 endpoint: &str,
401 namespace: &str,
402 instance_id: u64,
403) -> Span {
404 let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_tcp_headers(headers);
405 let x_request_id = headers.get("x-request-id").cloned();
406 let x_dynamo_request_id = headers.get("x-dynamo-request-id").cloned();
407 let tracestate = headers.get("tracestate").cloned();
408
409 if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
410 let span = tracing::info_span!(
411 "handle_payload",
412 trace_id = trace_id.as_str(),
413 parent_id = parent_id.as_str(),
414 x_request_id = x_request_id,
415 x_dynamo_request_id = x_dynamo_request_id,
416 tracestate = tracestate,
417 component = component,
418 endpoint = endpoint,
419 namespace = namespace,
420 instance_id = instance_id,
421 );
422
423 if let Some(context) = otel_context {
424 let _ = span.set_parent(context);
425 }
426 span
427 } else {
428 tracing::info_span!(
429 "handle_payload",
430 x_request_id = x_request_id,
431 x_dynamo_request_id = x_dynamo_request_id,
432 tracestate = tracestate,
433 component = component,
434 endpoint = endpoint,
435 namespace = namespace,
436 instance_id = instance_id,
437 )
438 }
439}
440
441fn extract_otel_context_from_tcp_headers(
443 headers: &std::collections::HashMap<String, String>,
444) -> (
445 Option<opentelemetry::Context>,
446 Option<String>,
447 Option<String>,
448) {
449 let traceparent_value = match headers.get("traceparent") {
450 Some(value) => value.as_str(),
451 None => return (None, None, None),
452 };
453
454 let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
455
456 struct TcpHeaderExtractor<'a>(&'a std::collections::HashMap<String, String>);
457
458 impl<'a> Extractor for TcpHeaderExtractor<'a> {
459 fn get(&self, key: &str) -> Option<&str> {
460 self.0.get(key).map(|s| s.as_str())
461 }
462
463 fn keys(&self) -> Vec<&str> {
464 vec!["traceparent", "tracestate"]
465 .into_iter()
466 .filter(|&key| self.0.get(key).is_some())
467 .collect()
468 }
469 }
470
471 let extractor = TcpHeaderExtractor(headers);
472 let otel_context = TRACE_PROPAGATOR.extract(&extractor);
473
474 let context_with_trace = if otel_context.span().span_context().is_valid() {
475 Some(otel_context)
476 } else {
477 None
478 };
479
480 (context_with_trace, trace_id, parent_span_id)
481}
482
483pub fn extract_otel_context_from_nats_headers(
485 headers: &async_nats::HeaderMap,
486) -> (
487 Option<opentelemetry::Context>,
488 Option<String>,
489 Option<String>,
490) {
491 let traceparent_value = match headers.get("traceparent") {
492 Some(value) => value.as_str(),
493 None => return (None, None, None),
494 };
495
496 let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
497
498 struct NatsHeaderExtractor<'a>(&'a async_nats::HeaderMap);
499
500 impl<'a> Extractor for NatsHeaderExtractor<'a> {
501 fn get(&self, key: &str) -> Option<&str> {
502 self.0.get(key).map(|value| value.as_str())
503 }
504
505 fn keys(&self) -> Vec<&str> {
506 vec!["traceparent", "tracestate"]
507 .into_iter()
508 .filter(|&key| self.0.get(key).is_some())
509 .collect()
510 }
511 }
512
513 let extractor = NatsHeaderExtractor(headers);
514 let otel_context = TRACE_PROPAGATOR.extract(&extractor);
515
516 let context_with_trace = if otel_context.span().span_context().is_valid() {
517 Some(otel_context)
518 } else {
519 None
520 };
521
522 (context_with_trace, trace_id, parent_span_id)
523}
524
525pub fn inject_otel_context_into_nats_headers(
527 headers: &mut async_nats::HeaderMap,
528 context: Option<opentelemetry::Context>,
529) {
530 let otel_context = context.unwrap_or_else(|| Span::current().context());
531
532 struct NatsHeaderInjector<'a>(&'a mut async_nats::HeaderMap);
533
534 impl<'a> Injector for NatsHeaderInjector<'a> {
535 fn set(&mut self, key: &str, value: String) {
536 self.0.insert(key, value);
537 }
538 }
539
540 let mut injector = NatsHeaderInjector(headers);
541 TRACE_PROPAGATOR.inject_context(&otel_context, &mut injector);
542}
543
544pub fn inject_current_trace_into_nats_headers(headers: &mut async_nats::HeaderMap) {
546 inject_otel_context_into_nats_headers(headers, None);
547}
548
549pub fn inject_trace_headers_into_map(headers: &mut std::collections::HashMap<String, String>) {
551 if let Some(trace_context) = get_distributed_tracing_context() {
552 headers.insert(
554 "traceparent".to_string(),
555 trace_context.create_traceparent(),
556 );
557
558 if let Some(tracestate) = trace_context.tracestate {
560 headers.insert("tracestate".to_string(), tracestate);
561 }
562
563 if let Some(x_request_id) = trace_context.x_request_id {
565 headers.insert("x-request-id".to_string(), x_request_id);
566 }
567 if let Some(x_dynamo_request_id) = trace_context.x_dynamo_request_id {
568 headers.insert("x-dynamo-request-id".to_string(), x_dynamo_request_id);
569 }
570 }
571}
572
573pub fn make_client_request_span(
575 operation: &str,
576 request_id: &str,
577 trace_context: Option<&DistributedTraceContext>,
578 instance_id: Option<&str>,
579) -> Span {
580 if let Some(ctx) = trace_context {
581 let mut headers = async_nats::HeaderMap::new();
582 headers.insert("traceparent", ctx.create_traceparent());
583
584 if let Some(ref tracestate) = ctx.tracestate {
585 headers.insert("tracestate", tracestate.as_str());
586 }
587
588 let (otel_context, _extracted_trace_id, _extracted_parent_span_id) =
589 extract_otel_context_from_nats_headers(&headers);
590
591 let span = if let Some(inst_id) = instance_id {
592 tracing::info_span!(
593 "client_request",
594 operation = operation,
595 request_id = request_id,
596 instance_id = inst_id,
597 trace_id = ctx.trace_id.as_str(),
598 parent_id = ctx.span_id.as_str(),
599 x_request_id = ctx.x_request_id.as_deref(),
600 x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
601 )
603 } else {
604 tracing::info_span!(
605 "client_request",
606 operation = operation,
607 request_id = request_id,
608 trace_id = ctx.trace_id.as_str(),
609 parent_id = ctx.span_id.as_str(),
610 x_request_id = ctx.x_request_id.as_deref(),
611 x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
612 )
614 };
615
616 if let Some(context) = otel_context {
617 let _ = span.set_parent(context);
618 }
619
620 span
621 } else if let Some(inst_id) = instance_id {
622 tracing::info_span!(
623 "client_request",
624 operation = operation,
625 request_id = request_id,
626 instance_id = inst_id,
627 )
628 } else {
629 tracing::info_span!(
630 "client_request",
631 operation = operation,
632 request_id = request_id,
633 )
634 }
635}
636
637#[derive(Debug, Default)]
638pub struct FieldVisitor {
639 pub fields: HashMap<String, String>,
640}
641
642impl Visit for FieldVisitor {
643 fn record_str(&mut self, field: &Field, value: &str) {
644 self.fields
645 .insert(field.name().to_string(), value.to_string());
646 }
647
648 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
649 self.fields
650 .insert(field.name().to_string(), format!("{:?}", value).to_string());
651 }
652}
653
654impl<S> Layer<S> for DistributedTraceIdLayer
655where
656 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
657{
658 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
661 if let Some(span) = ctx.span(&id) {
662 let mut extensions = span.extensions_mut();
663 if let Some(distributed_tracing_context) =
664 extensions.get_mut::<DistributedTraceContext>()
665 {
666 distributed_tracing_context.end = Some(Instant::now());
667 }
668 }
669 }
670
671 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
674 if let Some(span) = ctx.span(id) {
675 let mut trace_id: Option<String> = None;
676 let mut parent_id: Option<String> = None;
677 let mut span_id: Option<String> = None;
678 let mut x_request_id: Option<String> = None;
679 let mut x_dynamo_request_id: Option<String> = None;
680 let mut tracestate: Option<String> = None;
681 let mut visitor = FieldVisitor::default();
682 attrs.record(&mut visitor);
683
684 if let Some(trace_id_input) = visitor.fields.get("trace_id") {
686 if !is_valid_trace_id(trace_id_input) {
687 tracing::trace!("trace id '{}' is not valid! Ignoring.", trace_id_input);
688 } else {
689 trace_id = Some(trace_id_input.to_string());
690 }
691 }
692
693 if let Some(span_id_input) = visitor.fields.get("span_id") {
695 if !is_valid_span_id(span_id_input) {
696 tracing::trace!("span id '{}' is not valid! Ignoring.", span_id_input);
697 } else {
698 span_id = Some(span_id_input.to_string());
699 }
700 }
701
702 if let Some(parent_id_input) = visitor.fields.get("parent_id") {
704 if !is_valid_span_id(parent_id_input) {
705 tracing::trace!("parent id '{}' is not valid! Ignoring.", parent_id_input);
706 } else {
707 parent_id = Some(parent_id_input.to_string());
708 }
709 }
710
711 if let Some(tracestate_input) = visitor.fields.get("tracestate") {
713 tracestate = Some(tracestate_input.to_string());
714 }
715
716 if let Some(x_request_id_input) = visitor.fields.get("x_request_id") {
718 x_request_id = Some(x_request_id_input.to_string());
719 }
720
721 if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
723 x_dynamo_request_id = Some(x_request_id_input.to_string());
724 }
725
726 if parent_id.is_none()
728 && let Some(parent_span_id) = ctx.current_span().id()
729 && let Some(parent_span) = ctx.span(parent_span_id)
730 {
731 let parent_ext = parent_span.extensions();
732 if let Some(parent_tracing_context) = parent_ext.get::<DistributedTraceContext>() {
733 trace_id = Some(parent_tracing_context.trace_id.clone());
734 parent_id = Some(parent_tracing_context.span_id.clone());
735 tracestate = parent_tracing_context.tracestate.clone();
736 }
737 }
738
739 if (parent_id.is_some() || span_id.is_some()) && trace_id.is_none() {
741 tracing::error!("parent id or span id are set but trace id is not set!");
742 parent_id = None;
744 span_id = None;
745 }
746
747 let mut extensions = span.extensions_mut();
749 extensions.insert(PendingDistributedTraceContext {
750 trace_id,
751 span_id,
752 parent_id,
753 tracestate,
754 x_request_id,
755 x_dynamo_request_id,
756 });
757 }
758 }
759
760 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
763 if let Some(span) = ctx.span(id) {
764 {
766 let extensions = span.extensions();
767 if extensions.get::<DistributedTraceContext>().is_some() {
768 return;
769 }
770 }
771
772 let mut extensions = span.extensions_mut();
774 let pending = match extensions.remove::<PendingDistributedTraceContext>() {
775 Some(p) => p,
776 None => {
777 tracing::error!("PendingDistributedTraceContext not found in on_enter");
779 return;
780 }
781 };
782
783 let mut trace_id = pending.trace_id;
784 let mut span_id = pending.span_id;
785 let parent_id = pending.parent_id;
786 let tracestate = pending.tracestate;
787 let x_request_id = pending.x_request_id;
788 let x_dynamo_request_id = pending.x_dynamo_request_id;
789
790 drop(extensions);
793
794 if trace_id.is_none() || span_id.is_none() {
795 let extensions = span.extensions();
796 if let Some(otel_data) = extensions.get::<tracing_opentelemetry::OtelData>() {
797 if trace_id.is_none()
799 && let Some(otel_trace_id) = otel_data.trace_id()
800 {
801 let trace_id_str = format!("{}", otel_trace_id);
802 if is_valid_trace_id(&trace_id_str) {
803 trace_id = Some(trace_id_str);
804 }
805 }
806
807 if span_id.is_none()
809 && let Some(otel_span_id) = otel_data.span_id()
810 {
811 let span_id_str = format!("{}", otel_span_id);
812 if is_valid_span_id(&span_id_str) {
813 span_id = Some(span_id_str);
814 }
815 }
816 }
817 }
818
819 if trace_id.is_none() {
821 panic!(
822 "trace_id is not set in on_enter - OtelData may not be properly initialized"
823 );
824 }
825
826 if span_id.is_none() {
827 panic!("span_id is not set in on_enter - OtelData may not be properly initialized");
828 }
829
830 let span_level = span.metadata().level();
831 let mut extensions = span.extensions_mut();
832 extensions.insert(DistributedTraceContext {
833 trace_id: trace_id.expect("Trace ID must be set"),
834 span_id: span_id.expect("Span ID must be set"),
835 parent_id,
836 tracestate,
837 start: Some(Instant::now()),
838 end: None,
839 x_request_id,
840 x_dynamo_request_id,
841 });
842
843 drop(extensions);
844
845 if span_events_enabled() {
848 emit_at_level!(span_level, target: "span_event", message = "SPAN_FIRST_ENTRY");
849 }
850 }
851 }
852}
853
854pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
857 Span::current()
858 .with_subscriber(|(id, subscriber)| {
859 subscriber
860 .downcast_ref::<Registry>()
861 .and_then(|registry| registry.span_data(id))
862 .and_then(|span_data| {
863 let extensions = span_data.extensions();
864 extensions.get::<DistributedTraceContext>().cloned()
865 })
866 })
867 .flatten()
868}
869
870pub fn init() {
872 INIT.call_once(|| {
873 if let Err(e) = setup_logging() {
874 eprintln!("Failed to initialize logging: {}", e);
875 std::process::exit(1);
876 }
877 });
878}
879
/// Installs the global subscriber with a `tokio-console` layer and a compact stderr
/// formatter.
///
/// Returns a `Result` so its signature matches the non-console variant. `init` matches
/// on `Err`, which would not compile against a `()` return type when the `tokio-console`
/// feature is enabled.
#[cfg(feature = "tokio-console")]
fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
    let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
        .with_default_env()
        .server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
        .spawn();
    // The console only needs runtime/tokio instrumentation; everything else at ERROR.
    let tokio_console_target = tracing_subscriber::filter::Targets::new()
        .with_default(LevelFilter::ERROR)
        .with_target("runtime", LevelFilter::TRACE)
        .with_target("tokio", LevelFilter::TRACE);
    let l = fmt::layer()
        .with_ansi(!disable_ansi_logging())
        .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
        .with_writer(std::io::stderr)
        .with_filter(filters(load_config()));
    tracing_subscriber::registry()
        .with(l)
        .with(tokio_console_layer.with_filter(tokio_console_target))
        .init();
    Ok(())
}
900
901#[cfg(not(feature = "tokio-console"))]
902fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
903 let fmt_filter_layer = filters(load_config());
904 let trace_filter_layer = filters(load_config());
905 let otel_filter_layer = filters(load_config());
906
907 if jsonl_logging_enabled() {
908 let span_events = if span_events_enabled() {
909 FmtSpan::CLOSE
910 } else {
911 FmtSpan::NONE
912 };
913 let l = fmt::layer()
914 .with_ansi(false)
915 .with_span_events(span_events)
916 .event_format(CustomJsonFormatter::new())
917 .with_writer(std::io::stderr)
918 .with_filter(fmt_filter_layer);
919
920 let service_name = get_service_name();
922
923 let (tracer_provider, endpoint_opt) = if otlp_exporter_enabled() {
925 let endpoint = std::env::var(env_logging::otlp::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
927 .unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string());
928
929 let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
931 .with_tonic()
932 .with_endpoint(&endpoint)
933 .build()?;
934
935 let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
937 .with_batch_exporter(otlp_exporter)
938 .with_resource(
939 opentelemetry_sdk::Resource::builder_empty()
940 .with_service_name(service_name.clone())
941 .build(),
942 )
943 .build();
944
945 (provider, Some(endpoint))
946 } else {
947 let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
949 .with_resource(
950 opentelemetry_sdk::Resource::builder_empty()
951 .with_service_name(service_name.clone())
952 .build(),
953 )
954 .build();
955
956 (provider, None)
957 };
958
959 let tracer = tracer_provider.tracer(service_name.clone());
961
962 tracing_subscriber::registry()
963 .with(
964 tracing_opentelemetry::layer()
965 .with_tracer(tracer)
966 .with_filter(otel_filter_layer),
967 )
968 .with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
969 .with(l)
970 .init();
971
972 if let Some(endpoint) = endpoint_opt {
974 tracing::info!(
975 endpoint = %endpoint,
976 service = %service_name,
977 "OpenTelemetry OTLP export enabled"
978 );
979 } else {
980 tracing::info!(
981 service = %service_name,
982 "OpenTelemetry OTLP export disabled, traces local only"
983 );
984 }
985 } else {
986 let l = fmt::layer()
987 .with_ansi(!disable_ansi_logging())
988 .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
989 .with_writer(std::io::stderr)
990 .with_filter(fmt_filter_layer);
991
992 tracing_subscriber::registry().with(l).init();
993 }
994
995 Ok(())
996}
997
998fn filters(config: LoggingConfig) -> EnvFilter {
999 let mut filter_layer = EnvFilter::builder()
1000 .with_default_directive(config.log_level.parse().unwrap())
1001 .with_env_var(env_logging::DYN_LOG)
1002 .from_env_lossy();
1003
1004 for (module, level) in config.log_filters {
1005 match format!("{module}={level}").parse::<Directive>() {
1006 Ok(d) => {
1007 filter_layer = filter_layer.add_directive(d);
1008 }
1009 Err(e) => {
1010 eprintln!("Failed parsing filter '{level}' for module '{module}': {e}");
1011 }
1012 }
1013 }
1014
1015 if span_events_enabled() {
1018 filter_layer = filter_layer.add_directive("span_event=trace".parse().unwrap());
1019 }
1020
1021 filter_layer
1022}
1023
1024pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) {
1027 let level = match level {
1028 "debug" => log::Level::Debug,
1029 "info" => log::Level::Info,
1030 "warn" => log::Level::Warn,
1031 "error" => log::Level::Error,
1032 "warning" => log::Level::Warn,
1033 _ => log::Level::Info,
1034 };
1035 log::logger().log(
1036 &log::Record::builder()
1037 .args(format_args!("{}", message))
1038 .level(level)
1039 .target(module)
1040 .file(Some(file))
1041 .line(Some(line))
1042 .build(),
1043 );
1044}
1045
1046fn load_config() -> LoggingConfig {
1047 let config_path =
1048 std::env::var(env_logging::DYN_LOGGING_CONFIG_PATH).unwrap_or_else(|_| "".to_string());
1049 let figment = Figment::new()
1050 .merge(Serialized::defaults(LoggingConfig::default()))
1051 .merge(Toml::file("/opt/dynamo/etc/logging.toml"))
1052 .merge(Toml::file(config_path));
1053
1054 figment.extract().unwrap()
1055}
1056
/// Serialized shape of a single JSONL log record.
///
/// `file` and `line` are omitted from the output when `None`. The entries of `fields`
/// are flattened into the top-level JSON object rather than nested; being a `BTreeMap`,
/// they serialize in sorted key order.
#[derive(Serialize)]
struct JsonLog<'a> {
    time: String,
    level: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    file: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    line: Option<u32>,
    target: String,
    message: serde_json::Value,
    #[serde(flatten)]
    fields: BTreeMap<String, serde_json::Value>,
}
1070
1071struct TimeFormatter {
1072 use_local_tz: bool,
1073}
1074
1075impl TimeFormatter {
1076 fn new() -> Self {
1077 Self {
1078 use_local_tz: crate::config::use_local_timezone(),
1079 }
1080 }
1081
1082 fn format_now(&self) -> String {
1083 if self.use_local_tz {
1084 chrono::Local::now()
1085 .format("%Y-%m-%dT%H:%M:%S%.6f%:z")
1086 .to_string()
1087 } else {
1088 chrono::Utc::now()
1089 .format("%Y-%m-%dT%H:%M:%S%.6fZ")
1090 .to_string()
1091 }
1092 }
1093}
1094
1095impl FormatTime for TimeFormatter {
1096 fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
1097 write!(w, "{}", self.format_now())
1098 }
1099}
1100
1101struct CustomJsonFormatter {
1102 time_formatter: TimeFormatter,
1103}
1104
1105impl CustomJsonFormatter {
1106 fn new() -> Self {
1107 Self {
1108 time_formatter: TimeFormatter::new(),
1109 }
1110 }
1111}
1112
1113use once_cell::sync::Lazy;
1114use regex::Regex;
1115
/// Shared W3C Trace Context propagator, used to extract OpenTelemetry contexts from
/// HTTP, NATS and TCP headers and to inject them into NATS headers.
static TRACE_PROPAGATOR: Lazy<opentelemetry_sdk::propagation::TraceContextPropagator> =
    Lazy::new(opentelemetry_sdk::propagation::TraceContextPropagator::new);
1119
1120fn parse_tracing_duration(s: &str) -> Option<u64> {
1121 static RE: Lazy<Regex> =
1122 Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
1123 let captures = RE.captures(s)?;
1124 let value: f64 = captures[1].parse().ok()?;
1125 let unit = &captures[2];
1126 match unit {
1127 "ns" => Some((value / 1000.0) as u64),
1128 "µs" | "us" => Some(value as u64),
1129 "ms" => Some((value * 1000.0) as u64),
1130 "s" => Some((value * 1_000_000.0) as u64),
1131 _ => None,
1132 }
1133}
1134
1135impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
1136where
1137 S: Subscriber + for<'a> LookupSpan<'a>,
1138 N: for<'a> FormatFields<'a> + 'static,
1139{
1140 fn format_event(
1141 &self,
1142 ctx: &FmtContext<'_, S, N>,
1143 mut writer: Writer<'_>,
1144 event: &Event<'_>,
1145 ) -> std::fmt::Result {
1146 let mut visitor = JsonVisitor::default();
1147 let time = self.time_formatter.format_now();
1148 event.record(&mut visitor);
1149 let mut message = visitor
1150 .fields
1151 .remove("message")
1152 .unwrap_or(serde_json::Value::String("".to_string()));
1153
1154 let mut target_override: Option<String> = None;
1155
1156 let current_span = event
1157 .parent()
1158 .and_then(|id| ctx.span(id))
1159 .or_else(|| ctx.lookup_current());
1160 if let Some(span) = current_span {
1161 let ext = span.extensions();
1162 let data = ext.get::<FormattedFields<N>>().unwrap();
1163 let span_fields: Vec<(&str, &str)> = data
1164 .fields
1165 .split(' ')
1166 .filter_map(|entry| entry.split_once('='))
1167 .collect();
1168 for (name, value) in span_fields {
1169 visitor.fields.insert(
1170 name.to_string(),
1171 serde_json::Value::String(value.trim_matches('"').to_string()),
1172 );
1173 }
1174
1175 let busy_us = visitor
1176 .fields
1177 .remove("time.busy")
1178 .and_then(|v| parse_tracing_duration(&v.to_string()));
1179 let idle_us = visitor
1180 .fields
1181 .remove("time.idle")
1182 .and_then(|v| parse_tracing_duration(&v.to_string()));
1183
1184 if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
1185 visitor.fields.insert(
1186 "time.busy_us".to_string(),
1187 serde_json::Value::Number(busy_us.into()),
1188 );
1189 visitor.fields.insert(
1190 "time.idle_us".to_string(),
1191 serde_json::Value::Number(idle_us.into()),
1192 );
1193 visitor.fields.insert(
1194 "time.duration_us".to_string(),
1195 serde_json::Value::Number((busy_us + idle_us).into()),
1196 );
1197 }
1198
1199 let is_span_created = message.as_str() == Some("SPAN_FIRST_ENTRY");
1200 let is_span_closed = message.as_str() == Some("close");
1201 if is_span_created || is_span_closed {
1202 target_override = Some(span.metadata().target().to_string());
1203 if is_span_closed {
1204 message = serde_json::Value::String("SPAN_CLOSED".to_string());
1205 }
1206 }
1207
1208 visitor.fields.insert(
1209 "span_name".to_string(),
1210 serde_json::Value::String(span.name().to_string()),
1211 );
1212
1213 if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
1214 visitor.fields.insert(
1215 "span_id".to_string(),
1216 serde_json::Value::String(tracing_context.span_id.clone()),
1217 );
1218 visitor.fields.insert(
1219 "trace_id".to_string(),
1220 serde_json::Value::String(tracing_context.trace_id.clone()),
1221 );
1222 if let Some(parent_id) = tracing_context.parent_id.clone() {
1223 visitor.fields.insert(
1224 "parent_id".to_string(),
1225 serde_json::Value::String(parent_id),
1226 );
1227 } else {
1228 visitor.fields.remove("parent_id");
1229 }
1230 if let Some(tracestate) = tracing_context.tracestate.clone() {
1231 visitor.fields.insert(
1232 "tracestate".to_string(),
1233 serde_json::Value::String(tracestate),
1234 );
1235 } else {
1236 visitor.fields.remove("tracestate");
1237 }
1238 if let Some(x_request_id) = tracing_context.x_request_id.clone() {
1239 visitor.fields.insert(
1240 "x_request_id".to_string(),
1241 serde_json::Value::String(x_request_id),
1242 );
1243 } else {
1244 visitor.fields.remove("x_request_id");
1245 }
1246
1247 if let Some(x_dynamo_request_id) = tracing_context.x_dynamo_request_id.clone() {
1248 visitor.fields.insert(
1249 "x_dynamo_request_id".to_string(),
1250 serde_json::Value::String(x_dynamo_request_id),
1251 );
1252 } else {
1253 visitor.fields.remove("x_dynamo_request_id");
1254 }
1255 } else {
1256 tracing::error!(
1257 "Distributed Trace Context not found, falling back to internal ids"
1258 );
1259 visitor.fields.insert(
1260 "span_id".to_string(),
1261 serde_json::Value::String(span.id().into_u64().to_string()),
1262 );
1263 if let Some(parent) = span.parent() {
1264 visitor.fields.insert(
1265 "parent_id".to_string(),
1266 serde_json::Value::String(parent.id().into_u64().to_string()),
1267 );
1268 }
1269 }
1270 } else {
1271 let reserved_fields = [
1272 "trace_id",
1273 "span_id",
1274 "parent_id",
1275 "span_name",
1276 "tracestate",
1277 ];
1278 for reserved_field in reserved_fields {
1279 visitor.fields.remove(reserved_field);
1280 }
1281 }
1282 let metadata = event.metadata();
1283 let log = JsonLog {
1284 level: metadata.level().to_string(),
1285 time,
1286 file: metadata.file(),
1287 line: metadata.line(),
1288 target: target_override.unwrap_or_else(|| metadata.target().to_string()),
1289 message,
1290 fields: visitor.fields,
1291 };
1292 let json = serde_json::to_string(&log).unwrap();
1293 writeln!(writer, "{json}")
1294 }
1295}
1296
1297#[derive(Default)]
1298struct JsonVisitor {
1299 fields: BTreeMap<String, serde_json::Value>,
1300}
1301
1302impl tracing::field::Visit for JsonVisitor {
1303 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1304 self.fields.insert(
1305 field.name().to_string(),
1306 serde_json::Value::String(format!("{value:?}")),
1307 );
1308 }
1309
1310 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1311 if field.name() != "message" {
1312 match serde_json::from_str::<Value>(value) {
1313 Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
1314 Err(_) => self.fields.insert(field.name().to_string(), value.into()),
1315 };
1316 } else {
1317 self.fields.insert(field.name().to_string(), value.into());
1318 }
1319 }
1320
1321 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
1322 self.fields
1323 .insert(field.name().to_string(), serde_json::Value::Bool(value));
1324 }
1325
1326 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
1327 self.fields.insert(
1328 field.name().to_string(),
1329 serde_json::Value::Number(value.into()),
1330 );
1331 }
1332
1333 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
1334 self.fields.insert(
1335 field.name().to_string(),
1336 serde_json::Value::Number(value.into()),
1337 );
1338 }
1339
1340 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
1341 use serde_json::value::Number;
1342 self.fields.insert(
1343 field.name().to_string(),
1344 serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
1345 );
1346 }
1347}
1348
#[cfg(test)]
pub mod tests {
    use super::*;
    use anyhow::{Result, anyhow};
    use chrono::{DateTime, Utc};
    use jsonschema::{Draft, JSONSchema};
    use serde_json::Value;
    use std::fs::File;
    use std::io::{BufRead, BufReader};
    use stdio_override::*;
    use tempfile::NamedTempFile;

    /// Draft-07 JSON Schema that every captured JSONL log line must satisfy.
    static LOG_LINE_SCHEMA: &str = r#"
    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "Runtime Log Line",
      "type": "object",
      "required": [
        "file",
        "level",
        "line",
        "message",
        "target",
        "time"
      ],
      "properties": {
        "file": { "type": "string" },
        "level": { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
        "line": { "type": "integer" },
        "message": { "type": "string" },
        "target": { "type": "string" },
        "time": { "type": "string", "format": "date-time" },
        "span_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
        "trace_id": { "type": "string", "pattern": "^[a-f0-9]{32}$" },
        "span_name": { "type": "string" },
        "time.busy_us": { "type": "integer" },
        "time.duration_us": { "type": "integer" },
        "time.idle_us": { "type": "integer" },
        "tracestate": { "type": "string" }
      },
      "additionalProperties": true
    }
    "#;

    #[tracing::instrument(skip_all)]
    async fn parent() {
        tracing::trace!(message = "parent!");
        if let Some(my_ctx) = get_distributed_tracing_context() {
            tracing::info!(my_trace_id = my_ctx.trace_id);
        }
        child().await;
    }

    #[tracing::instrument(skip_all)]
    async fn child() {
        tracing::trace!(message = "child");
        if let Some(my_ctx) = get_distributed_tracing_context() {
            tracing::info!(my_trace_id = my_ctx.trace_id);
        }
        grandchild().await;
    }

    #[tracing::instrument(skip_all)]
    async fn grandchild() {
        tracing::trace!(message = "grandchild");
        if let Some(my_ctx) = get_distributed_tracing_context() {
            tracing::info!(my_trace_id = my_ctx.trace_id);
        }
    }

    /// Reads `file_name` line by line and parses each line as JSON.
    ///
    /// Each line is validated against `LOG_LINE_SCHEMA` and echoed to stdout.
    ///
    /// # Errors
    /// Returns an error on I/O failure, on a line that is not valid JSON, or on the first
    /// line that violates the schema. Each message includes the 1-based line number.
    pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
        let schema_json: Value =
            serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
        let compiled_schema = JSONSchema::options()
            .with_draft(Draft::Draft7)
            .compile(&schema_json)
            .expect("Invalid schema");

        let f = File::open(file_name)?;
        let reader = BufReader::new(f);
        let mut result = Vec::new();

        for (line_num, line) in reader.lines().enumerate() {
            let line = line?;
            let val: Value = serde_json::from_str(&line)
                .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_num + 1, e))?;

            if let Err(errors) = compiled_schema.validate(&val) {
                let errs = errors.map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
                return Err(anyhow!(
                    "Line {}: JSON Schema Validation errors: {}",
                    line_num + 1,
                    errs
                ));
            }
            println!("{}", val);
            result.push(val);
        }
        Ok(result)
    }

    #[tokio::test]
    async fn test_json_log_capture() -> Result<()> {
        // NOTE(review): the closure's `Result` is discarded by `let _`. Errors raised with
        // `?` inside it (e.g. `load_log` JSON/schema failures) therefore do not fail this
        // test; only panicking assertions do.
        #[allow(clippy::redundant_closure_call)]
        let _ = temp_env::async_with_vars(
            [(env_logging::DYN_LOGGING_JSONL, Some("1"))],
            (async || {
                let tmp_file = NamedTempFile::new().unwrap();
                let file_name = tmp_file.path().to_str().unwrap();
                let guard = StderrOverride::from_file(file_name)?;
                init();
                parent().await;
                drop(guard);

                let lines = load_log(file_name)?;

                // If no line carries a trace_id there is nothing further to check.
                let Some(trace_id) = lines
                    .iter()
                    .find_map(|log_line| log_line.get("trace_id").and_then(|v| v.as_str()))
                    .map(|s| s.to_string())
                else {
                    return Ok(());
                };

                assert_ne!(
                    trace_id, "00000000000000000000000000000000",
                    "trace_id should not be a zero/invalid ID"
                );
                assert!(
                    !trace_id.chars().all(|c| c == '0'),
                    "trace_id should not be all zeros"
                );

                for log_line in &lines {
                    if let Some(line_trace_id) = log_line.get("trace_id") {
                        assert_eq!(
                            line_trace_id.as_str().unwrap(),
                            &trace_id,
                            "All logs should have the same trace_id"
                        );
                    }
                }

                for log_line in &lines {
                    if let Some(my_trace_id) = log_line.get("my_trace_id") {
                        assert_eq!(
                            my_trace_id,
                            &serde_json::Value::String(trace_id.clone()),
                            "my_trace_id should match the trace_id from distributed tracing context"
                        );
                    }
                }

                // Validate every span_id and remember when each span FIRST logged.
                // Keeping the last timestamp instead would let a late line (e.g. a
                // span-close event) break the parent-before-child ordering check below.
                let mut span_timestamps: std::collections::HashMap<String, DateTime<Utc>> =
                    std::collections::HashMap::new();
                for log_line in &lines {
                    if let Some(span_id) = log_line.get("span_id") {
                        let span_id_str = span_id.as_str().unwrap();
                        assert!(
                            is_valid_span_id(span_id_str),
                            "Invalid span_id format: {}",
                            span_id_str
                        );
                    }

                    if let Some(time_str) = log_line.get("time").and_then(|v| v.as_str()) {
                        let timestamp = DateTime::parse_from_rfc3339(time_str)
                            .expect("All timestamps should be valid RFC3339 format")
                            .with_timezone(&Utc);

                        if let Some(span_name) = log_line.get("span_name").and_then(|v| v.as_str())
                        {
                            span_timestamps
                                .entry(span_name.to_string())
                                .or_insert(timestamp);
                        }
                    }
                }

                // span_id of the first line attributed to span `name`, if that line has one.
                let span_id_of = |name: &str| -> Option<String> {
                    lines
                        .iter()
                        .find(|log_line| {
                            log_line.get("span_name").and_then(|v| v.as_str()) == Some(name)
                        })
                        .and_then(|log_line| log_line.get("span_id").and_then(|v| v.as_str()))
                        .map(|s| s.to_string())
                };
                let parent_span_id =
                    span_id_of("parent").expect("Should find parent span with span_id");
                let child_span_id =
                    span_id_of("child").expect("Should find child span with span_id");
                let grandchild_span_id =
                    span_id_of("grandchild").expect("Should find grandchild span with span_id");

                assert_ne!(
                    parent_span_id, child_span_id,
                    "Parent and child should have different span IDs"
                );
                assert_ne!(
                    child_span_id, grandchild_span_id,
                    "Child and grandchild should have different span IDs"
                );
                assert_ne!(
                    parent_span_id, grandchild_span_id,
                    "Parent and grandchild should have different span IDs"
                );

                // Every line of a span must point at the expected parent span.
                for log_line in &lines {
                    let parent_id = log_line.get("parent_id").and_then(|v| v.as_str());
                    match log_line.get("span_name").and_then(|v| v.as_str()) {
                        Some("parent") => assert!(
                            log_line.get("parent_id").is_none(),
                            "Parent span should not have a parent_id"
                        ),
                        Some("child") => assert_eq!(
                            parent_id.expect("Child span should have a parent_id"),
                            parent_span_id,
                            "Child's parent_id should match parent's span_id"
                        ),
                        Some("grandchild") => assert_eq!(
                            parent_id.expect("Grandchild span should have a parent_id"),
                            child_span_id,
                            "Grandchild's parent_id should match child's span_id"
                        ),
                        _ => {}
                    }
                }

                let parent_time = span_timestamps
                    .get("parent")
                    .expect("Should have timestamp for parent span");
                let child_time = span_timestamps
                    .get("child")
                    .expect("Should have timestamp for child span");
                let grandchild_time = span_timestamps
                    .get("grandchild")
                    .expect("Should have timestamp for grandchild span");

                assert!(
                    parent_time <= child_time,
                    "Parent span should log before or at same time as child span (parent: {}, child: {})",
                    parent_time,
                    child_time
                );
                assert!(
                    child_time <= grandchild_time,
                    "Child span should log before or at same time as grandchild span (child: {}, grandchild: {})",
                    child_time,
                    grandchild_time
                );

                Ok::<(), anyhow::Error>(())
            })(),
        )
        .await;
        Ok(())
    }

    #[tracing::instrument(level = "debug", skip_all)]
    async fn debug_level_span() {
        tracing::debug!("inside debug span");
    }

    #[tracing::instrument(level = "info", skip_all)]
    async fn info_level_span() {
        tracing::info!("inside info span");
    }

    #[tracing::instrument(level = "warn", skip_all)]
    async fn warn_level_span() {
        tracing::warn!("inside warn span");
    }

    #[tracing::instrument(level = "info", target = "other_module", skip_all)]
    async fn other_target_info_span() {
        tracing::info!(target: "other_module", "inside other target span");
    }

    /// Runs `test_span_events_subprocess` in a child `cargo test` process.
    ///
    /// The child gets JSONL output, span events and a target filter enabled through
    /// environment variables. This test fails if the child fails or does not run exactly
    /// one test.
    #[test]
    fn test_span_events() {
        use std::process::Command;

        // libtest's `--exact` compares the filter with the full test path *without* the
        // crate name (e.g. `logging::tests::test_span_events_subprocess`). Passing only the
        // function name matched zero tests, so the child succeeded without testing anything.
        let this_module = module_path!();
        let test_path_prefix = this_module
            .split_once("::")
            .map_or(this_module, |(_crate_name, rest)| rest);
        let test_name = format!("{test_path_prefix}::test_span_events_subprocess");
        let log_filter = format!("warn,{this_module}=debug");
        // Prefer the cargo binary that is running this test, when cargo exposes it.
        let cargo = std::env::var("CARGO").unwrap_or_else(|_| "cargo".to_string());

        let output = Command::new(cargo)
            .args([
                "test",
                "-p",
                env!("CARGO_PKG_NAME"),
                "--lib",
                test_name.as_str(),
                "--",
                "--exact",
                "--nocapture",
            ])
            .env("DYN_LOGGING_JSONL", "1")
            .env("DYN_LOGGING_SPAN_EVENTS", "1")
            .env("DYN_LOG", log_filter)
            .output()
            .expect("Failed to execute subprocess test");

        let stdout = String::from_utf8_lossy(&output.stdout);
        if !output.status.success() {
            eprintln!("=== STDOUT ===\n{}", stdout);
            eprintln!(
                "=== STDERR ===\n{}",
                String::from_utf8_lossy(&output.stderr)
            );
        }

        assert!(
            output.status.success(),
            "Subprocess test failed with exit code: {:?}",
            output.status.code()
        );
        assert!(
            stdout.contains(" 1 passed;"),
            "Subprocess must run exactly one test ({test_name}); stdout:\n{stdout}"
        );
    }

    #[tokio::test]
    async fn test_span_events_subprocess() -> Result<()> {
        // Only meaningful when launched by `test_span_events` with span events enabled.
        if std::env::var("DYN_LOGGING_SPAN_EVENTS").is_err() {
            return Ok(());
        }

        let tmp_file = NamedTempFile::new().unwrap();
        let file_name = tmp_file.path().to_str().unwrap();
        let guard = StderrOverride::from_file(file_name)?;
        init();

        parent().await;

        debug_level_span().await;
        info_level_span().await;
        warn_level_span().await;

        other_target_info_span().await;

        drop(guard);

        let lines = load_log(file_name)?;

        let has_span_event = |msg: &str, span_name: &str| {
            lines.iter().any(|log| {
                log.get("message").and_then(|v| v.as_str()) == Some(msg)
                    && log.get("span_name").and_then(|v| v.as_str()) == Some(span_name)
            })
        };

        let get_span_events = |msg: &str| -> Vec<&serde_json::Value> {
            lines
                .iter()
                .filter(|log| log.get("message").and_then(|v| v.as_str()) == Some(msg))
                .collect()
        };

        let span_created_events = get_span_events("SPAN_FIRST_ENTRY");
        for event in &span_created_events {
            assert!(
                event.get("span_name").is_some(),
                "SPAN_FIRST_ENTRY must have span_name"
            );
            let trace_id = event
                .get("trace_id")
                .and_then(|v| v.as_str())
                .expect("SPAN_FIRST_ENTRY must have trace_id");
            assert!(
                trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
                "SPAN_FIRST_ENTRY must have valid trace_id format"
            );
            let span_id = event
                .get("span_id")
                .and_then(|v| v.as_str())
                .expect("SPAN_FIRST_ENTRY must have span_id");
            assert!(
                is_valid_span_id(span_id),
                "SPAN_FIRST_ENTRY must have valid span_id"
            );
        }

        let span_closed_events = get_span_events("SPAN_CLOSED");
        for event in &span_closed_events {
            assert!(
                event.get("span_name").is_some(),
                "SPAN_CLOSED must have span_name"
            );
            assert!(
                event.get("time.busy_us").is_some()
                    || event.get("time.idle_us").is_some()
                    || event.get("time.duration_us").is_some(),
                "SPAN_CLOSED must have timing information"
            );
            let trace_id = event
                .get("trace_id")
                .and_then(|v| v.as_str())
                .expect("SPAN_CLOSED must have trace_id");
            assert!(
                trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
                "SPAN_CLOSED must have valid trace_id format"
            );
        }

        assert!(
            has_span_event("SPAN_FIRST_ENTRY", "debug_level_span"),
            "DEBUG span from allowed target MUST pass (target=debug filter)"
        );
        assert!(
            has_span_event("SPAN_FIRST_ENTRY", "info_level_span"),
            "INFO span from allowed target MUST pass (target=debug filter)"
        );
        assert!(
            has_span_event("SPAN_FIRST_ENTRY", "warn_level_span"),
            "WARN span from allowed target MUST pass (target=debug filter)"
        );

        assert!(
            has_span_event("SPAN_FIRST_ENTRY", "parent"),
            "parent span (INFO) from allowed target MUST pass"
        );
        assert!(
            has_span_event("SPAN_FIRST_ENTRY", "child"),
            "child span (INFO) from allowed target MUST pass"
        );
        assert!(
            has_span_event("SPAN_FIRST_ENTRY", "grandchild"),
            "grandchild span (INFO) from allowed target MUST pass"
        );

        assert!(
            !has_span_event("SPAN_FIRST_ENTRY", "other_target_info_span"),
            "INFO span from non-allowed target (other_module) MUST be filtered out"
        );

        // The allowed target is this module, matching the DYN_LOG filter set by
        // `test_span_events`.
        let allowed_target = module_path!();
        for event in &span_created_events {
            let target = event.get("target").and_then(|v| v.as_str()).unwrap_or("");
            let level = event.get("level").and_then(|v| v.as_str()).unwrap_or("");

            if level == "DEBUG" || level == "INFO" {
                assert!(
                    target.contains(allowed_target),
                    "DEBUG/INFO span must be from allowed target, got target={target}"
                );
            }
        }

        Ok(())
    }
}