1use std::collections::{BTreeMap, HashMap};
30use std::sync::Once;
31
32use figment::{
33 Figment,
34 providers::{Format, Serialized, Toml},
35};
36use serde::{Deserialize, Serialize};
37use tracing::level_filters::LevelFilter;
38use tracing::{Event, Subscriber};
39use tracing_subscriber::EnvFilter;
40use tracing_subscriber::fmt::time::FormatTime;
41use tracing_subscriber::fmt::time::LocalTime;
42use tracing_subscriber::fmt::time::SystemTime;
43use tracing_subscriber::fmt::time::UtcTime;
44use tracing_subscriber::fmt::{FmtContext, FormatFields};
45use tracing_subscriber::fmt::{FormattedFields, format::Writer};
46use tracing_subscriber::prelude::*;
47use tracing_subscriber::registry::LookupSpan;
48use tracing_subscriber::{filter::Directive, fmt};
49
50use crate::config::{
51 disable_ansi_logging, env_is_truthy, jsonl_logging_enabled, span_events_enabled,
52};
53use async_nats::{HeaderMap, HeaderValue};
54use axum::extract::FromRequestParts;
55use axum::http;
56use axum::http::Request;
57use axum::http::request::Parts;
58use serde_json::Value;
59use std::convert::Infallible;
60use std::time::Instant;
61use tower_http::trace::{DefaultMakeSpan, TraceLayer};
62use tracing::Id;
63use tracing::Span;
64use tracing::field::Field;
65use tracing::span;
66use tracing_subscriber::Layer;
67use tracing_subscriber::Registry;
68use tracing_subscriber::field::Visit;
69use tracing_subscriber::fmt::format::FmtSpan;
70use tracing_subscriber::layer::Context;
71use tracing_subscriber::registry::SpanData;
72use uuid::Uuid;
73
74use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
75use opentelemetry::trace::TraceContextExt;
76use opentelemetry::{global, trace::Tracer};
77use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
78use opentelemetry_otlp::WithExportConfig;
79
80use opentelemetry::trace::TracerProvider as _;
81use opentelemetry::{Key, KeyValue};
82use opentelemetry_sdk::Resource;
83use opentelemetry_sdk::logs::SdkLoggerProvider;
84use opentelemetry_sdk::trace::SdkTracerProvider;
85use tracing::error;
86use tracing_subscriber::layer::SubscriberExt;
87use std::time::Duration;
90use tracing::{info, instrument};
91use tracing_opentelemetry::OpenTelemetrySpanExt;
92use tracing_subscriber::util::SubscriberInitExt;
93
94use crate::config::environment_names::logging as env_logging;
95
96const DEFAULT_FILTER_LEVEL: &str = "info";
98
99const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
101
102const DEFAULT_OTEL_SERVICE_NAME: &str = "dynamo";
104
105static INIT: Once = Once::new();
107
108#[derive(Serialize, Deserialize, Debug)]
109struct LoggingConfig {
110 log_level: String,
111 log_filters: HashMap<String, String>,
112}
113impl Default for LoggingConfig {
114 fn default() -> Self {
115 LoggingConfig {
116 log_level: DEFAULT_FILTER_LEVEL.to_string(),
117 log_filters: HashMap::from([
118 ("h2".to_string(), "error".to_string()),
119 ("tower".to_string(), "error".to_string()),
120 ("hyper_util".to_string(), "error".to_string()),
121 ("neli".to_string(), "error".to_string()),
122 ("async_nats".to_string(), "error".to_string()),
123 ("rustls".to_string(), "error".to_string()),
124 ("tokenizers".to_string(), "error".to_string()),
125 ("axum".to_string(), "error".to_string()),
126 ("tonic".to_string(), "error".to_string()),
127 ("hf_hub".to_string(), "error".to_string()),
128 ("opentelemetry".to_string(), "error".to_string()),
129 ("opentelemetry-otlp".to_string(), "error".to_string()),
130 ("opentelemetry_sdk".to_string(), "error".to_string()),
131 ]),
132 }
133 }
134}
135
136fn otlp_exporter_enabled() -> bool {
138 env_is_truthy(env_logging::otlp::OTEL_EXPORT_ENABLED)
139}
140
141fn get_service_name() -> String {
143 std::env::var(env_logging::otlp::OTEL_SERVICE_NAME)
144 .unwrap_or_else(|_| DEFAULT_OTEL_SERVICE_NAME.to_string())
145}
146
/// Returns `true` if `trace_id` is exactly 32 ASCII hex digits (either letter case).
pub fn is_valid_trace_id(trace_id: &str) -> bool {
    const TRACE_ID_HEX_LEN: usize = 32;
    trace_id.len() == TRACE_ID_HEX_LEN && trace_id.bytes().all(|b| b.is_ascii_hexdigit())
}

/// Returns `true` if `span_id` is exactly 16 ASCII hex digits (either letter case).
pub fn is_valid_span_id(span_id: &str) -> bool {
    const SPAN_ID_HEX_LEN: usize = 16;
    span_id.len() == SPAN_ID_HEX_LEN && span_id.bytes().all(|b| b.is_ascii_hexdigit())
}
158
/// `tracing_subscriber` layer that attaches a [`DistributedTraceContext`] to spans.
///
/// It captures trace fields when a span is created, then resolves them into a
/// `DistributedTraceContext` on the span's first enter (see its `Layer` impl).
pub struct DistributedTraceIdLayer;

/// Trace identity stored in a span's extensions by [`DistributedTraceIdLayer`].
///
/// Derives serde (de)serialization; the local timing fields `start`/`end` are skipped, and
/// `None` optional ids are omitted when serializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedTraceContext {
    /// Trace id (the layer only stores values passing `is_valid_trace_id`: 32 hex digits).
    pub trace_id: String,
    /// This span's id (the layer only stores values passing `is_valid_span_id`: 16 hex digits).
    pub span_id: String,
    /// Span id of the parent, from the span's `parent_id` field or inherited from a parent span.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<String>,
    /// W3C `tracestate` value, if one was recorded or inherited.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tracestate: Option<String>,
    /// Set when the span is first entered.
    #[serde(skip)]
    start: Option<Instant>,
    /// Set when the span closes.
    #[serde(skip)]
    end: Option<Instant>,
    /// Value of the span's `x_request_id` field, or inherited from a parent span.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub x_request_id: Option<String>,
    /// Value of the span's `request_id` (or `x_dynamo_request_id`) field, or inherited.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
}

/// Trace fields captured in `on_new_span` and stored until the span's first `on_enter`.
///
/// On that first enter, missing trace/span ids are filled from OpenTelemetry data and the
/// result becomes a [`DistributedTraceContext`].
#[derive(Debug, Clone)]
struct PendingDistributedTraceContext {
    trace_id: Option<String>,
    span_id: Option<String>,
    parent_id: Option<String>,
    tracestate: Option<String>,
    x_request_id: Option<String>,
    request_id: Option<String>,
}
189
/// Emits a `tracing` event at a level that is only known at runtime.
///
/// `tracing::event!` takes its level as a constant that is baked into the callsite (NOTE(review):
/// per the `tracing` macro docs). This macro therefore expands to one `event!` callsite per level
/// and picks one with a `match`.
///
/// `$level` must be a `&tracing::Level` (the arms match reference patterns). `$target` and the
/// remaining tokens are forwarded unchanged to `tracing::event!`.
macro_rules! emit_at_level {
    ($level:expr, target: $target:expr, $($arg:tt)*) => {
        match $level {
            &tracing::Level::ERROR => tracing::event!(target: $target, tracing::Level::ERROR, $($arg)*),
            &tracing::Level::WARN => tracing::event!(target: $target, tracing::Level::WARN, $($arg)*),
            &tracing::Level::INFO => tracing::event!(target: $target, tracing::Level::INFO, $($arg)*),
            &tracing::Level::DEBUG => tracing::event!(target: $target, tracing::Level::DEBUG, $($arg)*),
            &tracing::Level::TRACE => tracing::event!(target: $target, tracing::Level::TRACE, $($arg)*),
        }
    };
}
205
206impl DistributedTraceContext {
207 pub fn create_traceparent(&self) -> String {
209 format!("00-{}-{}-01", self.trace_id, self.span_id)
210 }
211}
212
213pub fn parse_traceparent(traceparent: &str) -> (Option<String>, Option<String>) {
215 let pieces: Vec<_> = traceparent.split('-').collect();
216 if pieces.len() != 4 {
217 return (None, None);
218 }
219 let trace_id = pieces[1];
220 let parent_id = pieces[2];
221
222 if !is_valid_trace_id(trace_id) || !is_valid_span_id(parent_id) {
223 return (None, None);
224 }
225
226 (Some(trace_id.to_string()), Some(parent_id.to_string()))
227}
228
/// Trace-related values read from incoming request headers by [`TraceParent::from_headers`].
#[derive(Debug, Clone, Default)]
pub struct TraceParent {
    /// Trace id parsed from the `traceparent` header.
    pub trace_id: Option<String>,
    /// Parent span id parsed from the `traceparent` header.
    pub parent_id: Option<String>,
    /// Raw `tracestate` header value.
    pub tracestate: Option<String>,
    /// Raw `x-request-id` header value.
    pub x_request_id: Option<String>,
    /// `request-id` (or, if absent, `x-dynamo-request-id`) header value.
    /// Kept only when it parses as a UUID.
    pub request_id: Option<String>,
}

/// Minimal header-lookup abstraction.
///
/// Lets [`TraceParent::from_headers`] work over both NATS and HTTP header maps.
/// Implementations return `None` when the header is absent or its value cannot be
/// represented as `&str`.
pub trait GenericHeaders {
    fn get(&self, key: &str) -> Option<&str>;
}
241
242impl GenericHeaders for async_nats::HeaderMap {
243 fn get(&self, key: &str) -> Option<&str> {
244 async_nats::HeaderMap::get(self, key).map(|value| value.as_str())
245 }
246}
247
248impl GenericHeaders for http::HeaderMap {
249 fn get(&self, key: &str) -> Option<&str> {
250 http::HeaderMap::get(self, key).and_then(|value| value.to_str().ok())
251 }
252}
253
254impl TraceParent {
255 pub fn from_headers<H: GenericHeaders>(headers: &H) -> TraceParent {
256 let mut trace_id = None;
257 let mut parent_id = None;
258 let mut tracestate = None;
259 let mut x_request_id = None;
260 let mut request_id = None;
261
262 if let Some(header_value) = headers.get("traceparent") {
263 (trace_id, parent_id) = parse_traceparent(header_value);
264 }
265
266 if let Some(header_value) = headers.get("x-request-id") {
267 x_request_id = Some(header_value.to_string());
268 }
269
270 if let Some(header_value) = headers.get("tracestate") {
271 tracestate = Some(header_value.to_string());
272 }
273
274 if let Some(header_value) = headers.get("request-id") {
276 request_id = Some(header_value.to_string());
277 } else if let Some(header_value) = headers.get("x-dynamo-request-id") {
278 request_id = Some(header_value.to_string());
279 }
280
281 let request_id = request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
282 TraceParent {
283 trace_id,
284 parent_id,
285 tracestate,
286 x_request_id,
287 request_id,
288 }
289 }
290}
291
292pub fn make_inference_request_span<B>(req: &Request<B>) -> Span {
298 let method = req.method();
299 let uri = req.uri();
300 let version = format!("{:?}", req.version());
301 let trace_parent = TraceParent::from_headers(req.headers());
302
303 let otel_context = extract_otel_context_from_http_headers(req.headers());
304
305 let request_id = trace_parent
309 .request_id
310 .unwrap_or_else(|| Uuid::new_v4().to_string());
311
312 let span = tracing::info_span!(
313 target: "request_span",
314 "http-request",
315 method = %method,
316 uri = %uri,
317 version = %version,
318 trace_id = trace_parent.trace_id,
319 parent_id = trace_parent.parent_id,
320 x_request_id = trace_parent.x_request_id,
321 request_id = %request_id,
322 model = tracing::field::Empty,
323 input_tokens = tracing::field::Empty,
324 output_tokens = tracing::field::Empty,
325 ttft_ms = tracing::field::Empty,
326 avg_itl_ms = tracing::field::Empty,
327 prefill_worker_id = tracing::field::Empty,
328 decode_worker_id = tracing::field::Empty,
329 );
330
331 if let Some(context) = otel_context {
332 let _ = span.set_parent(context);
333 }
334
335 span
336}
337
338pub fn make_system_request_span<B>(req: &Request<B>) -> Span {
345 let method = req.method();
346 let uri = req.uri();
347 let version = format!("{:?}", req.version());
348 let trace_parent = TraceParent::from_headers(req.headers());
349 let otel_context = extract_otel_context_from_http_headers(req.headers());
350
351 let request_id = trace_parent
353 .request_id
354 .unwrap_or_else(|| Uuid::new_v4().to_string());
355
356 let span = tracing::debug_span!(
357 target: "system_span",
358 "http-request",
359 method = %method,
360 uri = %uri,
361 version = %version,
362 trace_id = trace_parent.trace_id,
363 parent_id = trace_parent.parent_id,
364 x_request_id = trace_parent.x_request_id,
365 request_id = %request_id,
366 model = tracing::field::Empty,
367 input_tokens = tracing::field::Empty,
368 output_tokens = tracing::field::Empty,
369 ttft_ms = tracing::field::Empty,
370 avg_itl_ms = tracing::field::Empty,
371 prefill_worker_id = tracing::field::Empty,
372 decode_worker_id = tracing::field::Empty,
373 );
374
375 if let Some(context) = otel_context {
376 let _ = span.set_parent(context);
377 }
378
379 span
380}
381
382fn extract_otel_context_from_http_headers(
384 headers: &http::HeaderMap,
385) -> Option<opentelemetry::Context> {
386 let traceparent_value = headers.get("traceparent")?.to_str().ok()?;
387
388 struct HttpHeaderExtractor<'a>(&'a http::HeaderMap);
389
390 impl<'a> Extractor for HttpHeaderExtractor<'a> {
391 fn get(&self, key: &str) -> Option<&str> {
392 self.0.get(key).and_then(|v| v.to_str().ok())
393 }
394
395 fn keys(&self) -> Vec<&str> {
396 vec!["traceparent", "tracestate"]
397 .into_iter()
398 .filter(|&key| self.0.get(key).is_some())
399 .collect()
400 }
401 }
402
403 if traceparent_value.is_empty() {
405 return None;
406 }
407
408 let extractor = HttpHeaderExtractor(headers);
409 let otel_context = TRACE_PROPAGATOR.extract(&extractor);
410
411 if otel_context.span().span_context().is_valid() {
412 Some(otel_context)
413 } else {
414 None
415 }
416}
417
418pub fn make_handle_payload_span(
420 headers: &async_nats::HeaderMap,
421 component: &str,
422 endpoint: &str,
423 namespace: &str,
424 instance_id: u64,
425) -> Span {
426 let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_nats_headers(headers);
427 let trace_parent = TraceParent::from_headers(headers);
428
429 if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
430 let span = tracing::info_span!(
431 target: "request_span",
432 "handle_payload",
433 trace_id = trace_id.as_str(),
434 parent_id = parent_id.as_str(),
435 x_request_id = trace_parent.x_request_id,
436 request_id = trace_parent.request_id,
437 tracestate = trace_parent.tracestate,
438 component = component,
439 endpoint = endpoint,
440 namespace = namespace,
441 instance_id = instance_id,
442 );
443
444 if let Some(context) = otel_context {
445 let _ = span.set_parent(context);
446 }
447 span
448 } else {
449 tracing::info_span!(
450 target: "request_span",
451 "handle_payload",
452 x_request_id = trace_parent.x_request_id,
453 request_id = trace_parent.request_id,
454 tracestate = trace_parent.tracestate,
455 component = component,
456 endpoint = endpoint,
457 namespace = namespace,
458 instance_id = instance_id,
459 )
460 }
461}
462
463pub fn make_handle_payload_span_from_tcp_headers(
465 headers: &std::collections::HashMap<String, String>,
466 component: &str,
467 endpoint: &str,
468 namespace: &str,
469 instance_id: u64,
470) -> Span {
471 let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_tcp_headers(headers);
472 let x_request_id = headers.get("x-request-id").cloned();
473 let request_id = headers
474 .get("request-id")
475 .or_else(|| headers.get("x-dynamo-request-id"))
476 .filter(|id| uuid::Uuid::parse_str(id).is_ok())
477 .cloned();
478 let tracestate = headers.get("tracestate").cloned();
479
480 if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
481 let span = tracing::info_span!(
482 target: "request_span",
483 "handle_payload",
484 trace_id = trace_id.as_str(),
485 parent_id = parent_id.as_str(),
486 x_request_id = x_request_id,
487 request_id = request_id,
488 tracestate = tracestate,
489 component = component,
490 endpoint = endpoint,
491 namespace = namespace,
492 instance_id = instance_id,
493 );
494
495 if let Some(context) = otel_context {
496 let _ = span.set_parent(context);
497 }
498 span
499 } else {
500 tracing::info_span!(
501 target: "request_span",
502 "handle_payload",
503 x_request_id = x_request_id,
504 request_id = request_id,
505 tracestate = tracestate,
506 component = component,
507 endpoint = endpoint,
508 namespace = namespace,
509 instance_id = instance_id,
510 )
511 }
512}
513
514fn extract_otel_context_from_tcp_headers(
516 headers: &std::collections::HashMap<String, String>,
517) -> (
518 Option<opentelemetry::Context>,
519 Option<String>,
520 Option<String>,
521) {
522 let traceparent_value = match headers.get("traceparent") {
523 Some(value) => value.as_str(),
524 None => return (None, None, None),
525 };
526
527 let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
528
529 struct TcpHeaderExtractor<'a>(&'a std::collections::HashMap<String, String>);
530
531 impl<'a> Extractor for TcpHeaderExtractor<'a> {
532 fn get(&self, key: &str) -> Option<&str> {
533 self.0.get(key).map(|s| s.as_str())
534 }
535
536 fn keys(&self) -> Vec<&str> {
537 vec!["traceparent", "tracestate"]
538 .into_iter()
539 .filter(|&key| self.0.get(key).is_some())
540 .collect()
541 }
542 }
543
544 let extractor = TcpHeaderExtractor(headers);
545 let otel_context = TRACE_PROPAGATOR.extract(&extractor);
546
547 let context_with_trace = if otel_context.span().span_context().is_valid() {
548 Some(otel_context)
549 } else {
550 None
551 };
552
553 (context_with_trace, trace_id, parent_span_id)
554}
555
556pub fn extract_otel_context_from_nats_headers(
558 headers: &async_nats::HeaderMap,
559) -> (
560 Option<opentelemetry::Context>,
561 Option<String>,
562 Option<String>,
563) {
564 let traceparent_value = match headers.get("traceparent") {
565 Some(value) => value.as_str(),
566 None => return (None, None, None),
567 };
568
569 let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
570
571 struct NatsHeaderExtractor<'a>(&'a async_nats::HeaderMap);
572
573 impl<'a> Extractor for NatsHeaderExtractor<'a> {
574 fn get(&self, key: &str) -> Option<&str> {
575 self.0.get(key).map(|value| value.as_str())
576 }
577
578 fn keys(&self) -> Vec<&str> {
579 vec!["traceparent", "tracestate"]
580 .into_iter()
581 .filter(|&key| self.0.get(key).is_some())
582 .collect()
583 }
584 }
585
586 let extractor = NatsHeaderExtractor(headers);
587 let otel_context = TRACE_PROPAGATOR.extract(&extractor);
588
589 let context_with_trace = if otel_context.span().span_context().is_valid() {
590 Some(otel_context)
591 } else {
592 None
593 };
594
595 (context_with_trace, trace_id, parent_span_id)
596}
597
598pub fn inject_otel_context_into_nats_headers(
600 headers: &mut async_nats::HeaderMap,
601 context: Option<opentelemetry::Context>,
602) {
603 let otel_context = context.unwrap_or_else(|| Span::current().context());
604
605 struct NatsHeaderInjector<'a>(&'a mut async_nats::HeaderMap);
606
607 impl<'a> Injector for NatsHeaderInjector<'a> {
608 fn set(&mut self, key: &str, value: String) {
609 self.0.insert(key, value);
610 }
611 }
612
613 let mut injector = NatsHeaderInjector(headers);
614 TRACE_PROPAGATOR.inject_context(&otel_context, &mut injector);
615}
616
617pub fn inject_current_trace_into_nats_headers(headers: &mut async_nats::HeaderMap) {
619 inject_otel_context_into_nats_headers(headers, None);
620}
621
622pub fn inject_trace_headers_into_map(headers: &mut std::collections::HashMap<String, String>) {
624 if let Some(trace_context) = get_distributed_tracing_context() {
625 headers.insert(
627 "traceparent".to_string(),
628 trace_context.create_traceparent(),
629 );
630
631 if let Some(tracestate) = trace_context.tracestate {
633 headers.insert("tracestate".to_string(), tracestate);
634 }
635
636 if let Some(x_request_id) = trace_context.x_request_id {
638 headers.insert("x-request-id".to_string(), x_request_id);
639 }
640 if let Some(request_id) = trace_context.request_id {
641 headers.insert("request-id".to_string(), request_id);
642 }
643 }
644}
645
646pub fn make_client_request_span(
648 operation: &str,
649 request_id: &str,
650 trace_context: Option<&DistributedTraceContext>,
651 instance_id: Option<&str>,
652) -> Span {
653 if let Some(ctx) = trace_context {
654 let mut headers = async_nats::HeaderMap::new();
655 headers.insert("traceparent", ctx.create_traceparent());
656
657 if let Some(ref tracestate) = ctx.tracestate {
658 headers.insert("tracestate", tracestate.as_str());
659 }
660
661 let (otel_context, _extracted_trace_id, _extracted_parent_span_id) =
662 extract_otel_context_from_nats_headers(&headers);
663
664 let span = if let Some(inst_id) = instance_id {
665 tracing::info_span!(
666 "client_request",
667 operation = operation,
668 request_id = request_id,
669 instance_id = inst_id,
670 trace_id = ctx.trace_id.as_str(),
671 parent_id = ctx.span_id.as_str(),
672 x_request_id = ctx.x_request_id.as_deref(),
673 )
674 } else {
675 tracing::info_span!(
676 "client_request",
677 operation = operation,
678 request_id = request_id,
679 trace_id = ctx.trace_id.as_str(),
680 parent_id = ctx.span_id.as_str(),
681 x_request_id = ctx.x_request_id.as_deref(),
682 )
683 };
684
685 if let Some(context) = otel_context {
686 let _ = span.set_parent(context);
687 }
688
689 span
690 } else if let Some(inst_id) = instance_id {
691 tracing::info_span!(
692 "client_request",
693 operation = operation,
694 request_id = request_id,
695 instance_id = inst_id,
696 )
697 } else {
698 tracing::info_span!(
699 "client_request",
700 operation = operation,
701 request_id = request_id,
702 )
703 }
704}
705
706#[derive(Debug, Default)]
707pub struct FieldVisitor {
708 pub fields: HashMap<String, String>,
709}
710
711impl Visit for FieldVisitor {
712 fn record_str(&mut self, field: &Field, value: &str) {
713 self.fields
714 .insert(field.name().to_string(), value.to_string());
715 }
716
717 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
718 self.fields
719 .insert(field.name().to_string(), format!("{:?}", value).to_string());
720 }
721}
722
723impl<S> Layer<S> for DistributedTraceIdLayer
724where
725 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
726{
727 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
730 if let Some(span) = ctx.span(&id) {
731 let mut extensions = span.extensions_mut();
732 if let Some(distributed_tracing_context) =
733 extensions.get_mut::<DistributedTraceContext>()
734 {
735 distributed_tracing_context.end = Some(Instant::now());
736 }
737 }
738 }
739
740 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
743 if let Some(span) = ctx.span(id) {
744 let mut trace_id: Option<String> = None;
745 let mut parent_id: Option<String> = None;
746 let mut span_id: Option<String> = None;
747 let mut x_request_id: Option<String> = None;
748 let mut request_id: Option<String> = None;
749 let mut tracestate: Option<String> = None;
750 let mut visitor = FieldVisitor::default();
751 attrs.record(&mut visitor);
752
753 if let Some(trace_id_input) = visitor.fields.get("trace_id") {
755 if !is_valid_trace_id(trace_id_input) {
756 tracing::trace!("trace id '{trace_id_input}' is not valid! Ignoring.");
757 } else {
758 trace_id = Some(trace_id_input.to_string());
759 }
760 }
761
762 if let Some(span_id_input) = visitor.fields.get("span_id") {
764 if !is_valid_span_id(span_id_input) {
765 tracing::trace!("span id '{span_id_input}' is not valid! Ignoring.");
766 } else {
767 span_id = Some(span_id_input.to_string());
768 }
769 }
770
771 if let Some(parent_id_input) = visitor.fields.get("parent_id") {
773 if !is_valid_span_id(parent_id_input) {
774 tracing::trace!("parent id '{parent_id_input}' is not valid! Ignoring.");
775 } else {
776 parent_id = Some(parent_id_input.to_string());
777 }
778 }
779
780 if let Some(tracestate_input) = visitor.fields.get("tracestate") {
782 tracestate = Some(tracestate_input.to_string());
783 }
784
785 if let Some(x_request_id_input) = visitor.fields.get("x_request_id") {
787 x_request_id = Some(x_request_id_input.to_string());
788 }
789
790 if let Some(request_id_input) = visitor.fields.get("request_id") {
792 request_id = Some(request_id_input.to_string());
793 } else if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
794 request_id = Some(x_request_id_input.to_string());
795 }
796
797 if parent_id.is_none()
799 && let Some(parent_span_id) = ctx.current_span().id()
800 && let Some(parent_span) = ctx.span(parent_span_id)
801 {
802 let parent_ext = parent_span.extensions();
803 if let Some(parent_tracing_context) = parent_ext.get::<DistributedTraceContext>() {
804 trace_id = Some(parent_tracing_context.trace_id.clone());
805 parent_id = Some(parent_tracing_context.span_id.clone());
806 tracestate = parent_tracing_context.tracestate.clone();
807 if x_request_id.is_none() {
808 x_request_id = parent_tracing_context.x_request_id.clone();
809 }
810 if request_id.is_none() {
811 request_id = parent_tracing_context.request_id.clone();
812 }
813 }
814 }
815
816 if (parent_id.is_some() || span_id.is_some()) && trace_id.is_none() {
818 tracing::error!("parent id or span id are set but trace id is not set!");
819 parent_id = None;
821 span_id = None;
822 }
823
824 let mut extensions = span.extensions_mut();
826 extensions.insert(PendingDistributedTraceContext {
827 trace_id,
828 span_id,
829 parent_id,
830 tracestate,
831 x_request_id,
832 request_id,
833 });
834 }
835 }
836
837 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
840 if let Some(span) = ctx.span(id) {
841 {
843 let extensions = span.extensions();
844 if extensions.get::<DistributedTraceContext>().is_some() {
845 return;
846 }
847 }
848
849 let mut extensions = span.extensions_mut();
851 let pending = match extensions.remove::<PendingDistributedTraceContext>() {
852 Some(p) => p,
853 None => {
854 tracing::error!("PendingDistributedTraceContext not found in on_enter");
856 return;
857 }
858 };
859
860 let mut trace_id = pending.trace_id;
861 let mut span_id = pending.span_id;
862 let parent_id = pending.parent_id;
863 let tracestate = pending.tracestate;
864 let x_request_id = pending.x_request_id;
865 let request_id = pending.request_id;
866
867 drop(extensions);
870
871 if trace_id.is_none() || span_id.is_none() {
872 let extensions = span.extensions();
873 if let Some(otel_data) = extensions.get::<tracing_opentelemetry::OtelData>() {
874 if trace_id.is_none()
876 && let Some(otel_trace_id) = otel_data.trace_id()
877 {
878 let trace_id_str = format!("{}", otel_trace_id);
879 if is_valid_trace_id(&trace_id_str) {
880 trace_id = Some(trace_id_str);
881 }
882 }
883
884 if span_id.is_none()
886 && let Some(otel_span_id) = otel_data.span_id()
887 {
888 let span_id_str = format!("{}", otel_span_id);
889 if is_valid_span_id(&span_id_str) {
890 span_id = Some(span_id_str);
891 }
892 }
893 }
894 }
895
896 if trace_id.is_none() {
898 panic!(
899 "trace_id is not set in on_enter - OtelData may not be properly initialized"
900 );
901 }
902
903 if span_id.is_none() {
904 panic!("span_id is not set in on_enter - OtelData may not be properly initialized");
905 }
906
907 let span_level = span.metadata().level();
908 let mut extensions = span.extensions_mut();
909 extensions.insert(DistributedTraceContext {
910 trace_id: trace_id.expect("Trace ID must be set"),
911 span_id: span_id.expect("Span ID must be set"),
912 parent_id,
913 tracestate,
914 start: Some(Instant::now()),
915 end: None,
916 x_request_id,
917 request_id,
918 });
919
920 drop(extensions);
921
922 if span_events_enabled() {
925 emit_at_level!(span_level, target: "span_event", message = "SPAN_FIRST_ENTRY");
926 }
927 }
928 }
929}
930
931pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
934 Span::current()
935 .with_subscriber(|(id, subscriber)| {
936 subscriber
937 .downcast_ref::<Registry>()
938 .and_then(|registry| registry.span_data(id))
939 .and_then(|span_data| {
940 let extensions = span_data.extensions();
941 extensions.get::<DistributedTraceContext>().cloned()
942 })
943 })
944 .flatten()
945}
946
947pub fn init() {
949 INIT.call_once(|| {
950 if let Err(e) = setup_logging() {
951 eprintln!("Failed to initialize logging: {}", e);
952 std::process::exit(1);
953 }
954 });
955}
956
/// Installs the global subscriber when the `tokio-console` feature is enabled.
///
/// Two layers are installed:
/// - a compact stderr formatter filtered by the logging configuration;
/// - a tokio-console layer listening on all interfaces at the console-subscriber default port,
///   which sees `runtime`/`tokio` targets at TRACE and everything else at ERROR.
#[cfg(feature = "tokio-console")]
fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
    // Builder order matters: the explicit address is set after the env-derived defaults.
    let console_layer = console_subscriber::ConsoleLayer::builder()
        .with_default_env()
        .server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
        .spawn()
        .with_filter(
            tracing_subscriber::filter::Targets::new()
                .with_default(LevelFilter::ERROR)
                .with_target("runtime", LevelFilter::TRACE)
                .with_target("tokio", LevelFilter::TRACE),
        );

    let stderr_layer = fmt::layer()
        .with_ansi(!disable_ansi_logging())
        .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
        .with_writer(std::io::stderr)
        .with_filter(filters(load_config()));

    tracing_subscriber::registry()
        .with(stderr_layer)
        .with(console_layer)
        .init();
    Ok(())
}
978
979#[cfg(not(feature = "tokio-console"))]
980fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
981 let fmt_filter_layer = filters(load_config());
982 let trace_filter_layer = filters(load_config());
983 let otel_filter_layer = filters(load_config());
984 let otel_logs_filter_layer = filters(load_config());
985
986 if jsonl_logging_enabled() {
987 let span_events = if span_events_enabled() {
988 FmtSpan::CLOSE
989 } else {
990 FmtSpan::NONE
991 };
992 let l = fmt::layer()
993 .with_ansi(false)
994 .with_span_events(span_events)
995 .event_format(CustomJsonFormatter::new())
996 .with_writer(std::io::stderr)
997 .with_filter(fmt_filter_layer);
998
999 let service_name = get_service_name();
1001
1002 let (tracer_provider, logger_provider_opt, endpoint_opt) = if otlp_exporter_enabled() {
1004 let traces_endpoint =
1006 std::env::var(env_logging::otlp::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
1007 .unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string());
1008 let logs_endpoint = std::env::var(env_logging::otlp::OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)
1009 .unwrap_or_else(|_| traces_endpoint.clone());
1010
1011 let resource = opentelemetry_sdk::Resource::builder_empty()
1012 .with_service_name(service_name.clone())
1013 .build();
1014
1015 let span_exporter = opentelemetry_otlp::SpanExporter::builder()
1017 .with_tonic()
1018 .with_endpoint(&traces_endpoint)
1019 .build()?;
1020
1021 let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
1022 .with_batch_exporter(span_exporter)
1023 .with_resource(resource.clone())
1024 .build();
1025
1026 let log_exporter = opentelemetry_otlp::LogExporter::builder()
1028 .with_tonic()
1029 .with_endpoint(&logs_endpoint)
1030 .build()?;
1031
1032 let logger_provider = SdkLoggerProvider::builder()
1033 .with_batch_exporter(log_exporter)
1034 .with_resource(resource)
1035 .build();
1036
1037 (
1038 tracer_provider,
1039 Some(logger_provider),
1040 Some(traces_endpoint),
1041 )
1042 } else {
1043 let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
1045 .with_resource(
1046 opentelemetry_sdk::Resource::builder_empty()
1047 .with_service_name(service_name.clone())
1048 .build(),
1049 )
1050 .build();
1051
1052 (provider, None, None)
1053 };
1054
1055 opentelemetry::global::set_tracer_provider(tracer_provider.clone());
1062
1063 let tracer = tracer_provider.tracer(service_name.clone());
1065
1066 let otel_logs_layer = logger_provider_opt
1068 .as_ref()
1069 .map(|lp| OpenTelemetryTracingBridge::new(lp).with_filter(otel_logs_filter_layer));
1070
1071 tracing_subscriber::registry()
1072 .with(
1073 tracing_opentelemetry::layer()
1074 .with_tracer(tracer)
1075 .with_filter(otel_filter_layer),
1076 )
1077 .with(otel_logs_layer)
1078 .with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
1079 .with(l)
1080 .init();
1081
1082 if let Some(endpoint) = endpoint_opt {
1084 tracing::info!(
1085 endpoint = %endpoint,
1086 service = %service_name,
1087 "OpenTelemetry OTLP export enabled (traces and logs)"
1088 );
1089 } else {
1090 tracing::info!(
1091 service = %service_name,
1092 "OpenTelemetry OTLP export disabled, traces local only"
1093 );
1094 }
1095 } else {
1096 if otlp_exporter_enabled() {
1100 eprintln!(
1101 "WARNING: OTEL_EXPORT_ENABLED=1 has no effect without DYN_LOGGING_JSONL=1. \
1102 OTel layers and OTLP exporter are not installed. Set DYN_LOGGING_JSONL=1 \
1103 to enable trace/log export."
1104 );
1105 }
1106 let l = fmt::layer()
1107 .with_ansi(!disable_ansi_logging())
1108 .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
1109 .with_writer(std::io::stderr)
1110 .with_filter(fmt_filter_layer);
1111
1112 tracing_subscriber::registry().with(l).init();
1113 }
1114
1115 Ok(())
1116}
1117
1118fn filters(config: LoggingConfig) -> EnvFilter {
1119 let mut filter_layer = EnvFilter::builder()
1120 .with_default_directive(config.log_level.parse().unwrap())
1121 .with_env_var(env_logging::DYN_LOG)
1122 .from_env_lossy();
1123
1124 for (module, level) in config.log_filters {
1125 match format!("{module}={level}").parse::<Directive>() {
1126 Ok(d) => {
1127 filter_layer = filter_layer.add_directive(d);
1128 }
1129 Err(e) => {
1130 eprintln!("Failed parsing filter '{level}' for module '{module}': {e}");
1131 }
1132 }
1133 }
1134
1135 if span_events_enabled() {
1138 filter_layer = filter_layer.add_directive("span_event=trace".parse().unwrap());
1139 }
1140
1141 filter_layer = filter_layer.add_directive("request_span=trace".parse().unwrap());
1146
1147 filter_layer
1148}
1149
1150pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) {
1153 let level = match level {
1154 "debug" => log::Level::Debug,
1155 "info" => log::Level::Info,
1156 "warn" => log::Level::Warn,
1157 "error" => log::Level::Error,
1158 "warning" => log::Level::Warn,
1159 _ => log::Level::Info,
1160 };
1161 log::logger().log(
1162 &log::Record::builder()
1163 .args(format_args!("{}", message))
1164 .level(level)
1165 .target(module)
1166 .file(Some(file))
1167 .line(Some(line))
1168 .build(),
1169 );
1170}
1171
1172fn load_config() -> LoggingConfig {
1173 let config_path =
1174 std::env::var(env_logging::DYN_LOGGING_CONFIG_PATH).unwrap_or_else(|_| "".to_string());
1175 let figment = Figment::new()
1176 .merge(Serialized::defaults(LoggingConfig::default()))
1177 .merge(Toml::file("/opt/dynamo/etc/logging.toml"))
1178 .merge(Toml::file(config_path));
1179
1180 figment.extract().unwrap()
1181}
1182
/// One JSONL log record, as serialized by `CustomJsonFormatter::format_event`.
///
/// Keys are written in declaration order (`time`, `level`, `file`, `line`,
/// `target`, `message`), followed by the flattened extra `fields`.
#[derive(Serialize)]
struct JsonLog<'a> {
    /// Timestamp string produced by `TimeFormatter::format_now`.
    time: String,
    /// Event level rendered with `to_string()` (e.g. `INFO`).
    level: String,
    /// Callsite source file; the key is omitted when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    file: Option<&'a str>,
    /// Callsite source line; the key is omitted when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    line: Option<u32>,
    /// Event target, or the span's target for span lifecycle records.
    target: String,
    /// The event's `message` field (an empty string when the event has none).
    message: serde_json::Value,
    /// Remaining event and span fields, merged into the top-level object.
    #[serde(flatten)]
    fields: BTreeMap<String, serde_json::Value>,
}
1196
1197struct TimeFormatter {
1198 use_local_tz: bool,
1199}
1200
1201impl TimeFormatter {
1202 fn new() -> Self {
1203 Self {
1204 use_local_tz: crate::config::use_local_timezone(),
1205 }
1206 }
1207
1208 fn format_now(&self) -> String {
1209 if self.use_local_tz {
1210 chrono::Local::now()
1211 .format("%Y-%m-%dT%H:%M:%S%.6f%:z")
1212 .to_string()
1213 } else {
1214 chrono::Utc::now()
1215 .format("%Y-%m-%dT%H:%M:%S%.6fZ")
1216 .to_string()
1217 }
1218 }
1219}
1220
1221impl FormatTime for TimeFormatter {
1222 fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
1223 write!(w, "{}", self.format_now())
1224 }
1225}
1226
1227struct CustomJsonFormatter {
1228 time_formatter: TimeFormatter,
1229}
1230
1231impl CustomJsonFormatter {
1232 fn new() -> Self {
1233 Self {
1234 time_formatter: TimeFormatter::new(),
1235 }
1236 }
1237}
1238
1239use once_cell::sync::Lazy;
1240use regex::Regex;
1241
/// Shared W3C Trace Context propagator (`traceparent` / `tracestate`),
/// constructed lazily on first access.
static TRACE_PROPAGATOR: Lazy<opentelemetry_sdk::propagation::TraceContextPropagator> =
    Lazy::new(opentelemetry_sdk::propagation::TraceContextPropagator::new);
1245
1246fn parse_tracing_duration(s: &str) -> Option<u64> {
1247 static RE: Lazy<Regex> =
1248 Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
1249 let captures = RE.captures(s)?;
1250 let value: f64 = captures[1].parse().ok()?;
1251 let unit = &captures[2];
1252 match unit {
1253 "ns" => Some((value / 1000.0) as u64),
1254 "µs" | "us" => Some(value as u64),
1255 "ms" => Some((value * 1000.0) as u64),
1256 "s" => Some((value * 1_000_000.0) as u64),
1257 _ => None,
1258 }
1259}
1260
1261impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
1262where
1263 S: Subscriber + for<'a> LookupSpan<'a>,
1264 N: for<'a> FormatFields<'a> + 'static,
1265{
1266 fn format_event(
1267 &self,
1268 ctx: &FmtContext<'_, S, N>,
1269 mut writer: Writer<'_>,
1270 event: &Event<'_>,
1271 ) -> std::fmt::Result {
1272 let mut visitor = JsonVisitor::default();
1273 let time = self.time_formatter.format_now();
1274 event.record(&mut visitor);
1275 let mut message = visitor
1276 .fields
1277 .remove("message")
1278 .unwrap_or(serde_json::Value::String("".to_string()));
1279
1280 let mut target_override: Option<String> = None;
1281
1282 let current_span = event
1283 .parent()
1284 .and_then(|id| ctx.span(id))
1285 .or_else(|| ctx.lookup_current());
1286 if let Some(span) = current_span {
1287 let ext = span.extensions();
1288 let data = ext.get::<FormattedFields<N>>().unwrap();
1289 let span_fields: Vec<(&str, &str)> = data
1290 .fields
1291 .split(' ')
1292 .filter_map(|entry| entry.split_once('='))
1293 .collect();
1294 for (name, value) in span_fields {
1295 visitor.fields.insert(
1296 name.to_string(),
1297 serde_json::Value::String(value.trim_matches('"').to_string()),
1298 );
1299 }
1300
1301 let busy_us = visitor
1302 .fields
1303 .remove("time.busy")
1304 .and_then(|v| parse_tracing_duration(&v.to_string()));
1305 let idle_us = visitor
1306 .fields
1307 .remove("time.idle")
1308 .and_then(|v| parse_tracing_duration(&v.to_string()));
1309
1310 if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
1311 visitor.fields.insert(
1312 "time.busy_us".to_string(),
1313 serde_json::Value::Number(busy_us.into()),
1314 );
1315 visitor.fields.insert(
1316 "time.idle_us".to_string(),
1317 serde_json::Value::Number(idle_us.into()),
1318 );
1319 visitor.fields.insert(
1320 "time.duration_us".to_string(),
1321 serde_json::Value::Number((busy_us + idle_us).into()),
1322 );
1323 }
1324
1325 let is_span_created = message.as_str() == Some("SPAN_FIRST_ENTRY");
1326 let is_span_closed = message.as_str() == Some("close");
1327 if is_span_created || is_span_closed {
1328 target_override = Some(span.metadata().target().to_string());
1329 if is_span_closed {
1330 message = serde_json::Value::String("SPAN_CLOSED".to_string());
1331 }
1332 }
1333
1334 visitor.fields.insert(
1335 "span_name".to_string(),
1336 serde_json::Value::String(span.name().to_string()),
1337 );
1338
1339 if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
1340 visitor.fields.insert(
1341 "span_id".to_string(),
1342 serde_json::Value::String(tracing_context.span_id.clone()),
1343 );
1344 visitor.fields.insert(
1345 "trace_id".to_string(),
1346 serde_json::Value::String(tracing_context.trace_id.clone()),
1347 );
1348 if let Some(parent_id) = tracing_context.parent_id.clone() {
1349 visitor.fields.insert(
1350 "parent_id".to_string(),
1351 serde_json::Value::String(parent_id),
1352 );
1353 } else {
1354 visitor.fields.remove("parent_id");
1355 }
1356 if let Some(tracestate) = tracing_context.tracestate.clone() {
1357 visitor.fields.insert(
1358 "tracestate".to_string(),
1359 serde_json::Value::String(tracestate),
1360 );
1361 } else {
1362 visitor.fields.remove("tracestate");
1363 }
1364 if let Some(x_request_id) = tracing_context.x_request_id.clone() {
1365 visitor.fields.insert(
1366 "x_request_id".to_string(),
1367 serde_json::Value::String(x_request_id),
1368 );
1369 } else {
1370 visitor.fields.remove("x_request_id");
1371 }
1372
1373 if let Some(request_id) = tracing_context.request_id.clone() {
1374 visitor.fields.insert(
1375 "request_id".to_string(),
1376 serde_json::Value::String(request_id),
1377 );
1378 } else {
1379 visitor.fields.remove("request_id");
1380 }
1381 visitor.fields.remove("x_dynamo_request_id");
1383 } else {
1384 tracing::error!(
1385 "Distributed Trace Context not found, falling back to internal ids"
1386 );
1387 visitor.fields.insert(
1388 "span_id".to_string(),
1389 serde_json::Value::String(span.id().into_u64().to_string()),
1390 );
1391 if let Some(parent) = span.parent() {
1392 visitor.fields.insert(
1393 "parent_id".to_string(),
1394 serde_json::Value::String(parent.id().into_u64().to_string()),
1395 );
1396 }
1397 }
1398 } else {
1399 let reserved_fields = [
1400 "trace_id",
1401 "span_id",
1402 "parent_id",
1403 "span_name",
1404 "tracestate",
1405 ];
1406 for reserved_field in reserved_fields {
1407 visitor.fields.remove(reserved_field);
1408 }
1409 }
1410 let metadata = event.metadata();
1411 let log = JsonLog {
1412 level: metadata.level().to_string(),
1413 time,
1414 file: metadata.file(),
1415 line: metadata.line(),
1416 target: target_override.unwrap_or_else(|| metadata.target().to_string()),
1417 message,
1418 fields: visitor.fields,
1419 };
1420 let json = serde_json::to_string(&log).unwrap();
1421 writeln!(writer, "{json}")
1422 }
1423}
1424
1425#[derive(Default)]
1426struct JsonVisitor {
1427 fields: BTreeMap<String, serde_json::Value>,
1428}
1429
1430impl tracing::field::Visit for JsonVisitor {
1431 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1432 self.fields.insert(
1433 field.name().to_string(),
1434 serde_json::Value::String(format!("{value:?}")),
1435 );
1436 }
1437
1438 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1439 if field.name() != "message" {
1440 match serde_json::from_str::<Value>(value) {
1441 Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
1442 Err(_) => self.fields.insert(field.name().to_string(), value.into()),
1443 };
1444 } else {
1445 self.fields.insert(field.name().to_string(), value.into());
1446 }
1447 }
1448
1449 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
1450 self.fields
1451 .insert(field.name().to_string(), serde_json::Value::Bool(value));
1452 }
1453
1454 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
1455 self.fields.insert(
1456 field.name().to_string(),
1457 serde_json::Value::Number(value.into()),
1458 );
1459 }
1460
1461 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
1462 self.fields.insert(
1463 field.name().to_string(),
1464 serde_json::Value::Number(value.into()),
1465 );
1466 }
1467
1468 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
1469 use serde_json::value::Number;
1470 self.fields.insert(
1471 field.name().to_string(),
1472 serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
1473 );
1474 }
1475}
1476
1477#[cfg(test)]
1478pub mod tests {
1479 use super::*;
1480 use anyhow::{Result, anyhow};
1481 use chrono::{DateTime, Utc};
1482 use jsonschema::{Draft, JSONSchema};
1483 use serde_json::Value;
1484 use std::fs::File;
1485 use std::io::{BufRead, BufReader};
1486 use stdio_override::*;
1487 use tempfile::NamedTempFile;
1488
1489 static LOG_LINE_SCHEMA: &str = r#"
1490 {
1491 "$schema": "http://json-schema.org/draft-07/schema#",
1492 "title": "Runtime Log Line",
1493 "type": "object",
1494 "required": [
1495 "file",
1496 "level",
1497 "line",
1498 "message",
1499 "target",
1500 "time"
1501 ],
1502 "properties": {
1503 "file": { "type": "string" },
1504 "level": { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
1505 "line": { "type": "integer" },
1506 "message": { "type": "string" },
1507 "target": { "type": "string" },
1508 "time": { "type": "string", "format": "date-time" },
1509 "span_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
1510 "parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
1511 "trace_id": { "type": "string", "pattern": "^[a-f0-9]{32}$" },
1512 "span_name": { "type": "string" },
1513 "time.busy_us": { "type": "integer" },
1514 "time.duration_us": { "type": "integer" },
1515 "time.idle_us": { "type": "integer" },
1516 "tracestate": { "type": "string" }
1517 },
1518 "additionalProperties": true
1519 }
1520 "#;
1521
1522 #[tracing::instrument(skip_all)]
1523 async fn parent() {
1524 tracing::trace!(message = "parent!");
1525 if let Some(my_ctx) = get_distributed_tracing_context() {
1526 tracing::info!(my_trace_id = my_ctx.trace_id);
1527 }
1528 child().await;
1529 }
1530
1531 #[tracing::instrument(skip_all)]
1532 async fn child() {
1533 tracing::trace!(message = "child");
1534 if let Some(my_ctx) = get_distributed_tracing_context() {
1535 tracing::info!(my_trace_id = my_ctx.trace_id);
1536 }
1537 grandchild().await;
1538 }
1539
1540 #[tracing::instrument(skip_all)]
1541 async fn grandchild() {
1542 tracing::trace!(message = "grandchild");
1543 if let Some(my_ctx) = get_distributed_tracing_context() {
1544 tracing::info!(my_trace_id = my_ctx.trace_id);
1545 }
1546 }
1547
1548 pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
1549 let schema_json: Value =
1550 serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
1551 let compiled_schema = JSONSchema::options()
1552 .with_draft(Draft::Draft7)
1553 .compile(&schema_json)
1554 .expect("Invalid schema");
1555
1556 let f = File::open(file_name)?;
1557 let reader = BufReader::new(f);
1558 let mut result = Vec::new();
1559
1560 for (line_num, line) in reader.lines().enumerate() {
1561 let line = line?;
1562 let val: Value = serde_json::from_str(&line)
1563 .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_num + 1, e))?;
1564
1565 if let Err(errors) = compiled_schema.validate(&val) {
1566 let errs = errors.map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
1567 return Err(anyhow!(
1568 "Line {}: JSON Schema Validation errors: {}",
1569 line_num + 1,
1570 errs
1571 ));
1572 }
1573 println!("{}", val);
1574 result.push(val);
1575 }
1576 Ok(result)
1577 }
1578
1579 #[tokio::test]
1580 async fn test_json_log_capture() -> Result<()> {
1581 #[allow(clippy::redundant_closure_call)]
1582 let _ = temp_env::async_with_vars(
1583 [(env_logging::DYN_LOGGING_JSONL, Some("1"))],
1584 (async || {
1585 let tmp_file = NamedTempFile::new().unwrap();
1586 let file_name = tmp_file.path().to_str().unwrap();
1587 let guard = StderrOverride::from_file(file_name)?;
1588 init();
1589 parent().await;
1590 drop(guard);
1591
1592 let lines = load_log(file_name)?;
1593
1594 let Some(trace_id) = lines
1602 .iter()
1603 .find_map(|log_line| log_line.get("trace_id").and_then(|v| v.as_str()))
1604 .map(|s| s.to_string())
1605 else {
1606 return Ok(());
1608 };
1609
1610 assert_ne!(
1612 trace_id, "00000000000000000000000000000000",
1613 "trace_id should not be a zero/invalid ID"
1614 );
1615 assert!(
1616 !trace_id.chars().all(|c| c == '0'),
1617 "trace_id should not be all zeros"
1618 );
1619
1620 for log_line in &lines {
1622 if let Some(line_trace_id) = log_line.get("trace_id") {
1623 assert_eq!(
1624 line_trace_id.as_str().unwrap(),
1625 &trace_id,
1626 "All logs should have the same trace_id"
1627 );
1628 }
1629 }
1630
1631 for log_line in &lines {
1633 if let Some(my_trace_id) = log_line.get("my_trace_id") {
1634 assert_eq!(
1635 my_trace_id,
1636 &serde_json::Value::String(trace_id.clone()),
1637 "my_trace_id should match the trace_id from distributed tracing context"
1638 );
1639 }
1640 }
1641
1642 let mut span_ids_seen: std::collections::HashSet<String> = std::collections::HashSet::new();
1644 let mut span_timestamps: std::collections::HashMap<String, DateTime<Utc>> = std::collections::HashMap::new();
1645
1646 for log_line in &lines {
1647 if let Some(span_id) = log_line.get("span_id") {
1648 let span_id_str = span_id.as_str().unwrap();
1649 assert!(
1650 is_valid_span_id(span_id_str),
1651 "Invalid span_id format: {}",
1652 span_id_str
1653 );
1654 span_ids_seen.insert(span_id_str.to_string());
1655 }
1656
1657 if let Some(time_str) = log_line.get("time").and_then(|v| v.as_str()) {
1659 let timestamp = DateTime::parse_from_rfc3339(time_str)
1660 .expect("All timestamps should be valid RFC3339 format")
1661 .with_timezone(&Utc);
1662
1663 if let Some(span_name) = log_line.get("span_name").and_then(|v| v.as_str()) {
1665 span_timestamps.insert(span_name.to_string(), timestamp);
1666 }
1667 }
1668 }
1669
1670 let parent_span_id = lines
1673 .iter()
1674 .find(|log_line| {
1675 log_line.get("span_name")
1676 .and_then(|v| v.as_str()) == Some("parent")
1677 })
1678 .and_then(|log_line| {
1679 log_line.get("span_id")
1680 .and_then(|v| v.as_str())
1681 .map(|s| s.to_string())
1682 })
1683 .expect("Should find parent span with span_id");
1684
1685 let child_span_id = lines
1686 .iter()
1687 .find(|log_line| {
1688 log_line.get("span_name")
1689 .and_then(|v| v.as_str()) == Some("child")
1690 })
1691 .and_then(|log_line| {
1692 log_line.get("span_id")
1693 .and_then(|v| v.as_str())
1694 .map(|s| s.to_string())
1695 })
1696 .expect("Should find child span with span_id");
1697
1698 let grandchild_span_id = lines
1699 .iter()
1700 .find(|log_line| {
1701 log_line.get("span_name")
1702 .and_then(|v| v.as_str()) == Some("grandchild")
1703 })
1704 .and_then(|log_line| {
1705 log_line.get("span_id")
1706 .and_then(|v| v.as_str())
1707 .map(|s| s.to_string())
1708 })
1709 .expect("Should find grandchild span with span_id");
1710
1711 assert_ne!(parent_span_id, child_span_id, "Parent and child should have different span IDs");
1713 assert_ne!(child_span_id, grandchild_span_id, "Child and grandchild should have different span IDs");
1714 assert_ne!(parent_span_id, grandchild_span_id, "Parent and grandchild should have different span IDs");
1715
1716 for log_line in &lines {
1718 if let Some(span_name) = log_line.get("span_name")
1719 && let Some(span_name_str) = span_name.as_str()
1720 && span_name_str == "parent"
1721 {
1722 assert!(
1723 log_line.get("parent_id").is_none(),
1724 "Parent span should not have a parent_id"
1725 );
1726 }
1727 }
1728
1729 for log_line in &lines {
1731 if let Some(span_name) = log_line.get("span_name")
1732 && let Some(span_name_str) = span_name.as_str()
1733 && span_name_str == "child"
1734 {
1735 let parent_id = log_line.get("parent_id")
1736 .and_then(|v| v.as_str())
1737 .expect("Child span should have a parent_id");
1738 assert_eq!(
1739 parent_id,
1740 parent_span_id,
1741 "Child's parent_id should match parent's span_id"
1742 );
1743 }
1744 }
1745
1746 for log_line in &lines {
1748 if let Some(span_name) = log_line.get("span_name")
1749 && let Some(span_name_str) = span_name.as_str()
1750 && span_name_str == "grandchild"
1751 {
1752 let parent_id = log_line.get("parent_id")
1753 .and_then(|v| v.as_str())
1754 .expect("Grandchild span should have a parent_id");
1755 assert_eq!(
1756 parent_id,
1757 child_span_id,
1758 "Grandchild's parent_id should match child's span_id"
1759 );
1760 }
1761 }
1762
1763 let parent_time = span_timestamps.get("parent")
1765 .expect("Should have timestamp for parent span");
1766 let child_time = span_timestamps.get("child")
1767 .expect("Should have timestamp for child span");
1768 let grandchild_time = span_timestamps.get("grandchild")
1769 .expect("Should have timestamp for grandchild span");
1770
1771 assert!(
1773 parent_time <= child_time,
1774 "Parent span should log before or at same time as child span (parent: {}, child: {})",
1775 parent_time,
1776 child_time
1777 );
1778 assert!(
1779 child_time <= grandchild_time,
1780 "Child span should log before or at same time as grandchild span (child: {}, grandchild: {})",
1781 child_time,
1782 grandchild_time
1783 );
1784
1785 Ok::<(), anyhow::Error>(())
1786 })(),
1787 )
1788 .await;
1789 Ok(())
1790 }
1791
1792 #[tracing::instrument(level = "debug", skip_all)]
1794 async fn debug_level_span() {
1795 tracing::debug!("inside debug span");
1796 }
1797
1798 #[tracing::instrument(level = "info", skip_all)]
1799 async fn info_level_span() {
1800 tracing::info!("inside info span");
1801 }
1802
1803 #[tracing::instrument(level = "warn", skip_all)]
1804 async fn warn_level_span() {
1805 tracing::warn!("inside warn span");
1806 }
1807
1808 #[tracing::instrument(level = "info", target = "other_module", skip_all)]
1811 async fn other_target_info_span() {
1812 tracing::info!(target: "other_module", "inside other target span");
1813 }
1814
1815 #[test]
1826 fn test_span_events() {
1827 use std::process::Command;
1828
1829 let output = Command::new("cargo")
1831 .args([
1832 "test",
1833 "-p",
1834 "dynamo-runtime",
1835 "test_span_events_subprocess",
1836 "--",
1837 "--exact",
1838 "--nocapture",
1839 ])
1840 .env("DYN_LOGGING_JSONL", "1")
1841 .env("DYN_LOGGING_SPAN_EVENTS", "1")
1842 .env("DYN_LOG", "warn,dynamo_runtime::logging::tests=debug")
1843 .output()
1844 .expect("Failed to execute subprocess test");
1845
1846 if !output.status.success() {
1848 eprintln!(
1849 "=== STDOUT ===\n{}",
1850 String::from_utf8_lossy(&output.stdout)
1851 );
1852 eprintln!(
1853 "=== STDERR ===\n{}",
1854 String::from_utf8_lossy(&output.stderr)
1855 );
1856 }
1857
1858 assert!(
1859 output.status.success(),
1860 "Subprocess test failed with exit code: {:?}",
1861 output.status.code()
1862 );
1863 }
1864
1865 #[tokio::test]
1868 async fn test_span_events_subprocess() -> Result<()> {
1869 if std::env::var("DYN_LOGGING_SPAN_EVENTS").is_err() {
1871 return Ok(());
1872 }
1873
1874 let tmp_file = NamedTempFile::new().unwrap();
1875 let file_name = tmp_file.path().to_str().unwrap();
1876 let guard = StderrOverride::from_file(file_name)?;
1877 init();
1878
1879 parent().await;
1881
1882 debug_level_span().await;
1884 info_level_span().await;
1885 warn_level_span().await;
1886
1887 other_target_info_span().await;
1889
1890 drop(guard);
1891
1892 let lines = load_log(file_name)?;
1893
1894 let has_span_event = |msg: &str, span_name: &str| {
1896 lines.iter().any(|log| {
1897 log.get("message").and_then(|v| v.as_str()) == Some(msg)
1898 && log.get("span_name").and_then(|v| v.as_str()) == Some(span_name)
1899 })
1900 };
1901
1902 let get_span_events = |msg: &str| -> Vec<&serde_json::Value> {
1904 lines
1905 .iter()
1906 .filter(|log| log.get("message").and_then(|v| v.as_str()) == Some(msg))
1907 .collect()
1908 };
1909
1910 let span_created_events = get_span_events("SPAN_FIRST_ENTRY");
1912 for event in &span_created_events {
1913 assert!(
1915 event.get("span_name").is_some(),
1916 "SPAN_FIRST_ENTRY must have span_name"
1917 );
1918 let trace_id = event
1920 .get("trace_id")
1921 .and_then(|v| v.as_str())
1922 .expect("SPAN_FIRST_ENTRY must have trace_id");
1923 assert!(
1924 trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
1925 "SPAN_FIRST_ENTRY must have valid trace_id format"
1926 );
1927 let span_id = event
1929 .get("span_id")
1930 .and_then(|v| v.as_str())
1931 .expect("SPAN_FIRST_ENTRY must have span_id");
1932 assert!(
1933 is_valid_span_id(span_id),
1934 "SPAN_FIRST_ENTRY must have valid span_id"
1935 );
1936 }
1937
1938 let span_closed_events = get_span_events("SPAN_CLOSED");
1940 for event in &span_closed_events {
1941 assert!(
1942 event.get("span_name").is_some(),
1943 "SPAN_CLOSED must have span_name"
1944 );
1945 assert!(
1946 event.get("time.busy_us").is_some()
1947 || event.get("time.idle_us").is_some()
1948 || event.get("time.duration_us").is_some(),
1949 "SPAN_CLOSED must have timing information"
1950 );
1951 let trace_id = event
1953 .get("trace_id")
1954 .and_then(|v| v.as_str())
1955 .expect("SPAN_CLOSED must have trace_id");
1956 assert!(
1957 trace_id.len() == 32 && trace_id.chars().all(|c| c.is_ascii_hexdigit()),
1958 "SPAN_CLOSED must have valid trace_id format"
1959 );
1960 }
1961
1962 assert!(
1966 has_span_event("SPAN_FIRST_ENTRY", "debug_level_span"),
1967 "DEBUG span from allowed target MUST pass (target=debug filter)"
1968 );
1969 assert!(
1970 has_span_event("SPAN_FIRST_ENTRY", "info_level_span"),
1971 "INFO span from allowed target MUST pass (target=debug filter)"
1972 );
1973 assert!(
1974 has_span_event("SPAN_FIRST_ENTRY", "warn_level_span"),
1975 "WARN span from allowed target MUST pass (target=debug filter)"
1976 );
1977
1978 assert!(
1980 has_span_event("SPAN_FIRST_ENTRY", "parent"),
1981 "parent span (INFO) from allowed target MUST pass"
1982 );
1983 assert!(
1984 has_span_event("SPAN_FIRST_ENTRY", "child"),
1985 "child span (INFO) from allowed target MUST pass"
1986 );
1987 assert!(
1988 has_span_event("SPAN_FIRST_ENTRY", "grandchild"),
1989 "grandchild span (INFO) from allowed target MUST pass"
1990 );
1991
1992 assert!(
1995 !has_span_event("SPAN_FIRST_ENTRY", "other_target_info_span"),
1996 "INFO span from non-allowed target (other_module) MUST be filtered out"
1997 );
1998
1999 for event in &span_created_events {
2001 let target = event.get("target").and_then(|v| v.as_str()).unwrap_or("");
2002 let level = event.get("level").and_then(|v| v.as_str()).unwrap_or("");
2003
2004 if level == "DEBUG" || level == "INFO" {
2006 assert!(
2007 target.contains("dynamo_runtime::logging::tests"),
2008 "DEBUG/INFO span must be from allowed target, got target={target}"
2009 );
2010 }
2011 }
2012
2013 Ok(())
2014 }
2015}