1use std::collections::{BTreeMap, HashMap};
30use std::sync::Once;
31
32use figment::{
33 Figment,
34 providers::{Format, Serialized, Toml},
35};
36use serde::{Deserialize, Serialize};
37use tracing::level_filters::LevelFilter;
38use tracing::{Event, Subscriber};
39use tracing_subscriber::EnvFilter;
40use tracing_subscriber::fmt::time::FormatTime;
41use tracing_subscriber::fmt::time::LocalTime;
42use tracing_subscriber::fmt::time::SystemTime;
43use tracing_subscriber::fmt::time::UtcTime;
44use tracing_subscriber::fmt::{FmtContext, FormatFields};
45use tracing_subscriber::fmt::{FormattedFields, format::Writer};
46use tracing_subscriber::prelude::*;
47use tracing_subscriber::registry::LookupSpan;
48use tracing_subscriber::{filter::Directive, fmt};
49
50use crate::config::{disable_ansi_logging, jsonl_logging_enabled};
51use async_nats::{HeaderMap, HeaderValue};
52use axum::extract::FromRequestParts;
53use axum::http;
54use axum::http::Request;
55use axum::http::request::Parts;
56use serde_json::Value;
57use std::convert::Infallible;
58use std::time::Instant;
59use tower_http::trace::{DefaultMakeSpan, TraceLayer};
60use tracing::Id;
61use tracing::Span;
62use tracing::field::Field;
63use tracing::span;
64use tracing_subscriber::Layer;
65use tracing_subscriber::Registry;
66use tracing_subscriber::field::Visit;
67use tracing_subscriber::fmt::format::FmtSpan;
68use tracing_subscriber::layer::Context;
69use tracing_subscriber::registry::SpanData;
70use uuid::Uuid;
71
72use opentelemetry::propagation::{Extractor, Injector, TextMapPropagator};
73use opentelemetry::trace::TraceContextExt;
74use opentelemetry::{global, trace::Tracer};
75use opentelemetry_otlp::WithExportConfig;
76
77use opentelemetry::trace::TracerProvider as _;
78use opentelemetry::{Key, KeyValue};
79use opentelemetry_sdk::Resource;
80use opentelemetry_sdk::trace::SdkTracerProvider;
81use tracing::error;
82use tracing_subscriber::layer::SubscriberExt;
83use std::time::Duration;
86use tracing::{info, instrument};
87use tracing_opentelemetry::OpenTelemetrySpanExt;
88use tracing_subscriber::util::SubscriberInitExt;
89
90use crate::config::environment_names::logging as env_logging;
91
92use dynamo_config::env_is_truthy;
93
94const DEFAULT_FILTER_LEVEL: &str = "info";
96
97const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317";
99
100const DEFAULT_OTEL_SERVICE_NAME: &str = "dynamo";
102
103static INIT: Once = Once::new();
105
106#[derive(Serialize, Deserialize, Debug)]
107struct LoggingConfig {
108 log_level: String,
109 log_filters: HashMap<String, String>,
110}
111impl Default for LoggingConfig {
112 fn default() -> Self {
113 LoggingConfig {
114 log_level: DEFAULT_FILTER_LEVEL.to_string(),
115 log_filters: HashMap::from([
116 ("h2".to_string(), "error".to_string()),
117 ("tower".to_string(), "error".to_string()),
118 ("hyper_util".to_string(), "error".to_string()),
119 ("neli".to_string(), "error".to_string()),
120 ("async_nats".to_string(), "error".to_string()),
121 ("rustls".to_string(), "error".to_string()),
122 ("tokenizers".to_string(), "error".to_string()),
123 ("axum".to_string(), "error".to_string()),
124 ("tonic".to_string(), "error".to_string()),
125 ("mistralrs_core".to_string(), "error".to_string()),
126 ("hf_hub".to_string(), "error".to_string()),
127 ("opentelemetry".to_string(), "error".to_string()),
128 ("opentelemetry-otlp".to_string(), "error".to_string()),
129 ("opentelemetry_sdk".to_string(), "error".to_string()),
130 ]),
131 }
132 }
133}
134
135fn otlp_exporter_enabled() -> bool {
137 env_is_truthy(env_logging::otlp::OTEL_EXPORT_ENABLED)
138}
139
140fn get_service_name() -> String {
142 std::env::var(env_logging::otlp::OTEL_SERVICE_NAME)
143 .unwrap_or_else(|_| DEFAULT_OTEL_SERVICE_NAME.to_string())
144}
145
/// Returns `true` when `trace_id` is exactly 32 ASCII hexadecimal characters (either case).
pub fn is_valid_trace_id(trace_id: &str) -> bool {
    const TRACE_ID_HEX_LEN: usize = 32;
    if trace_id.len() != TRACE_ID_HEX_LEN {
        return false;
    }
    // Any non-ASCII byte fails the hex-digit test, so a byte scan is sufficient.
    trace_id.bytes().all(|b| b.is_ascii_hexdigit())
}
151
/// Returns `true` when `span_id` is exactly 16 ASCII hexadecimal characters (either case).
pub fn is_valid_span_id(span_id: &str) -> bool {
    const SPAN_ID_HEX_LEN: usize = 16;
    if span_id.len() != SPAN_ID_HEX_LEN {
        return false;
    }
    // Any non-ASCII byte fails the hex-digit test, so a byte scan is sufficient.
    span_id.bytes().all(|b| b.is_ascii_hexdigit())
}
157
/// Subscriber layer that attaches a [`DistributedTraceContext`] to each span's extensions.
pub struct DistributedTraceIdLayer;
159
160#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct DistributedTraceContext {
162 pub trace_id: String,
163 pub span_id: String,
164 #[serde(skip_serializing_if = "Option::is_none")]
165 pub parent_id: Option<String>,
166 #[serde(skip_serializing_if = "Option::is_none")]
167 pub tracestate: Option<String>,
168 #[serde(skip)]
169 start: Option<Instant>,
170 #[serde(skip)]
171 end: Option<Instant>,
172 #[serde(skip_serializing_if = "Option::is_none")]
173 pub x_request_id: Option<String>,
174 #[serde(skip_serializing_if = "Option::is_none")]
175 pub x_dynamo_request_id: Option<String>,
176}
177
/// Partially resolved trace context, stored on a span at creation time.
///
/// It is replaced by a full [`DistributedTraceContext`] the first time the span is entered.
#[derive(Debug, Clone)]
struct PendingDistributedTraceContext {
    /// Trace id taken from span fields or inherited from the parent span, if any.
    trace_id: Option<String>,
    /// Span id taken from the `span_id` span field, if any.
    span_id: Option<String>,
    /// Parent span id taken from span fields or inherited from the parent span, if any.
    parent_id: Option<String>,
    /// `tracestate` value, if any.
    tracestate: Option<String>,
    /// `x_request_id` span field, if any.
    x_request_id: Option<String>,
    /// `x_dynamo_request_id` span field, if any.
    x_dynamo_request_id: Option<String>,
}
188
189impl DistributedTraceContext {
190 pub fn create_traceparent(&self) -> String {
192 format!("00-{}-{}-01", self.trace_id, self.span_id)
193 }
194}
195
196pub fn parse_traceparent(traceparent: &str) -> (Option<String>, Option<String>) {
198 let pieces: Vec<_> = traceparent.split('-').collect();
199 if pieces.len() != 4 {
200 return (None, None);
201 }
202 let trace_id = pieces[1];
203 let parent_id = pieces[2];
204
205 if !is_valid_trace_id(trace_id) || !is_valid_span_id(parent_id) {
206 return (None, None);
207 }
208
209 (Some(trace_id.to_string()), Some(parent_id.to_string()))
210}
211
/// Trace-related values read from incoming request headers; every field is optional.
#[derive(Debug, Clone, Default)]
pub struct TraceParent {
    /// Trace id parsed from the `traceparent` header.
    pub trace_id: Option<String>,
    /// Parent span id parsed from the `traceparent` header.
    pub parent_id: Option<String>,
    /// Raw `tracestate` header value.
    pub tracestate: Option<String>,
    /// Raw `x-request-id` header value.
    pub x_request_id: Option<String>,
    /// `x-dynamo-request-id` header value.
    pub x_dynamo_request_id: Option<String>,
}
220
221pub trait GenericHeaders {
222 fn get(&self, key: &str) -> Option<&str>;
223}
224
225impl GenericHeaders for async_nats::HeaderMap {
226 fn get(&self, key: &str) -> Option<&str> {
227 async_nats::HeaderMap::get(self, key).map(|value| value.as_str())
228 }
229}
230
231impl GenericHeaders for http::HeaderMap {
232 fn get(&self, key: &str) -> Option<&str> {
233 http::HeaderMap::get(self, key).and_then(|value| value.to_str().ok())
234 }
235}
236
237impl TraceParent {
238 pub fn from_headers<H: GenericHeaders>(headers: &H) -> TraceParent {
239 let mut trace_id = None;
240 let mut parent_id = None;
241 let mut tracestate = None;
242 let mut x_request_id = None;
243 let mut x_dynamo_request_id = None;
244
245 if let Some(header_value) = headers.get("traceparent") {
246 (trace_id, parent_id) = parse_traceparent(header_value);
247 }
248
249 if let Some(header_value) = headers.get("x-request-id") {
250 x_request_id = Some(header_value.to_string());
251 }
252
253 if let Some(header_value) = headers.get("tracestate") {
254 tracestate = Some(header_value.to_string());
255 }
256
257 if let Some(header_value) = headers.get("x-dynamo-request-id") {
258 x_dynamo_request_id = Some(header_value.to_string());
259 }
260
261 let x_dynamo_request_id =
263 x_dynamo_request_id.filter(|id| uuid::Uuid::parse_str(id).is_ok());
264 TraceParent {
265 trace_id,
266 parent_id,
267 tracestate,
268 x_request_id,
269 x_dynamo_request_id,
270 }
271 }
272}
273
274pub fn make_request_span<B>(req: &Request<B>) -> Span {
276 let method = req.method();
277 let uri = req.uri();
278 let version = format!("{:?}", req.version());
279 let trace_parent = TraceParent::from_headers(req.headers());
280
281 let otel_context = extract_otel_context_from_http_headers(req.headers());
282
283 let span = tracing::info_span!(
284 "http-request",
285 method = %method,
286 uri = %uri,
287 version = %version,
288 trace_id = trace_parent.trace_id,
289 parent_id = trace_parent.parent_id,
290 x_request_id = trace_parent.x_request_id,
291 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
292 );
293
294 if let Some(context) = otel_context {
295 let _ = span.set_parent(context);
296 }
297
298 span
299}
300
301fn extract_otel_context_from_http_headers(
303 headers: &http::HeaderMap,
304) -> Option<opentelemetry::Context> {
305 let traceparent_value = headers.get("traceparent")?.to_str().ok()?;
306
307 struct HttpHeaderExtractor<'a>(&'a http::HeaderMap);
308
309 impl<'a> Extractor for HttpHeaderExtractor<'a> {
310 fn get(&self, key: &str) -> Option<&str> {
311 self.0.get(key).and_then(|v| v.to_str().ok())
312 }
313
314 fn keys(&self) -> Vec<&str> {
315 vec!["traceparent", "tracestate"]
316 .into_iter()
317 .filter(|&key| self.0.get(key).is_some())
318 .collect()
319 }
320 }
321
322 if traceparent_value.is_empty() {
324 return None;
325 }
326
327 let extractor = HttpHeaderExtractor(headers);
328 let otel_context = TRACE_PROPAGATOR.extract(&extractor);
329
330 if otel_context.span().span_context().is_valid() {
331 Some(otel_context)
332 } else {
333 None
334 }
335}
336
337pub fn make_handle_payload_span(
339 headers: &async_nats::HeaderMap,
340 component: &str,
341 endpoint: &str,
342 namespace: &str,
343 instance_id: u64,
344) -> Span {
345 let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_nats_headers(headers);
346 let trace_parent = TraceParent::from_headers(headers);
347
348 if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
349 let span = tracing::info_span!(
350 "handle_payload",
351 trace_id = trace_id.as_str(),
352 parent_id = parent_id.as_str(),
353 x_request_id = trace_parent.x_request_id,
354 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
355 tracestate = trace_parent.tracestate,
356 component = component,
357 endpoint = endpoint,
358 namespace = namespace,
359 instance_id = instance_id,
360 );
361
362 if let Some(context) = otel_context {
363 let _ = span.set_parent(context);
364 }
365 span
366 } else {
367 tracing::info_span!(
368 "handle_payload",
369 x_request_id = trace_parent.x_request_id,
370 x_dynamo_request_id = trace_parent.x_dynamo_request_id,
371 tracestate = trace_parent.tracestate,
372 component = component,
373 endpoint = endpoint,
374 namespace = namespace,
375 instance_id = instance_id,
376 )
377 }
378}
379
380pub fn make_handle_payload_span_from_tcp_headers(
382 headers: &std::collections::HashMap<String, String>,
383 component: &str,
384 endpoint: &str,
385 namespace: &str,
386 instance_id: u64,
387) -> Span {
388 let (otel_context, trace_id, parent_span_id) = extract_otel_context_from_tcp_headers(headers);
389 let x_request_id = headers.get("x-request-id").cloned();
390 let x_dynamo_request_id = headers.get("x-dynamo-request-id").cloned();
391 let tracestate = headers.get("tracestate").cloned();
392
393 if let (Some(trace_id), Some(parent_id)) = (trace_id.as_ref(), parent_span_id.as_ref()) {
394 let span = tracing::info_span!(
395 "handle_payload",
396 trace_id = trace_id.as_str(),
397 parent_id = parent_id.as_str(),
398 x_request_id = x_request_id,
399 x_dynamo_request_id = x_dynamo_request_id,
400 tracestate = tracestate,
401 component = component,
402 endpoint = endpoint,
403 namespace = namespace,
404 instance_id = instance_id,
405 );
406
407 if let Some(context) = otel_context {
408 let _ = span.set_parent(context);
409 }
410 span
411 } else {
412 tracing::info_span!(
413 "handle_payload",
414 x_request_id = x_request_id,
415 x_dynamo_request_id = x_dynamo_request_id,
416 tracestate = tracestate,
417 component = component,
418 endpoint = endpoint,
419 namespace = namespace,
420 instance_id = instance_id,
421 )
422 }
423}
424
425fn extract_otel_context_from_tcp_headers(
427 headers: &std::collections::HashMap<String, String>,
428) -> (
429 Option<opentelemetry::Context>,
430 Option<String>,
431 Option<String>,
432) {
433 let traceparent_value = match headers.get("traceparent") {
434 Some(value) => value.as_str(),
435 None => return (None, None, None),
436 };
437
438 let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
439
440 struct TcpHeaderExtractor<'a>(&'a std::collections::HashMap<String, String>);
441
442 impl<'a> Extractor for TcpHeaderExtractor<'a> {
443 fn get(&self, key: &str) -> Option<&str> {
444 self.0.get(key).map(|s| s.as_str())
445 }
446
447 fn keys(&self) -> Vec<&str> {
448 vec!["traceparent", "tracestate"]
449 .into_iter()
450 .filter(|&key| self.0.get(key).is_some())
451 .collect()
452 }
453 }
454
455 let extractor = TcpHeaderExtractor(headers);
456 let otel_context = TRACE_PROPAGATOR.extract(&extractor);
457
458 let context_with_trace = if otel_context.span().span_context().is_valid() {
459 Some(otel_context)
460 } else {
461 None
462 };
463
464 (context_with_trace, trace_id, parent_span_id)
465}
466
467pub fn extract_otel_context_from_nats_headers(
469 headers: &async_nats::HeaderMap,
470) -> (
471 Option<opentelemetry::Context>,
472 Option<String>,
473 Option<String>,
474) {
475 let traceparent_value = match headers.get("traceparent") {
476 Some(value) => value.as_str(),
477 None => return (None, None, None),
478 };
479
480 let (trace_id, parent_span_id) = parse_traceparent(traceparent_value);
481
482 struct NatsHeaderExtractor<'a>(&'a async_nats::HeaderMap);
483
484 impl<'a> Extractor for NatsHeaderExtractor<'a> {
485 fn get(&self, key: &str) -> Option<&str> {
486 self.0.get(key).map(|value| value.as_str())
487 }
488
489 fn keys(&self) -> Vec<&str> {
490 vec!["traceparent", "tracestate"]
491 .into_iter()
492 .filter(|&key| self.0.get(key).is_some())
493 .collect()
494 }
495 }
496
497 let extractor = NatsHeaderExtractor(headers);
498 let otel_context = TRACE_PROPAGATOR.extract(&extractor);
499
500 let context_with_trace = if otel_context.span().span_context().is_valid() {
501 Some(otel_context)
502 } else {
503 None
504 };
505
506 (context_with_trace, trace_id, parent_span_id)
507}
508
509pub fn inject_otel_context_into_nats_headers(
511 headers: &mut async_nats::HeaderMap,
512 context: Option<opentelemetry::Context>,
513) {
514 let otel_context = context.unwrap_or_else(|| Span::current().context());
515
516 struct NatsHeaderInjector<'a>(&'a mut async_nats::HeaderMap);
517
518 impl<'a> Injector for NatsHeaderInjector<'a> {
519 fn set(&mut self, key: &str, value: String) {
520 self.0.insert(key, value);
521 }
522 }
523
524 let mut injector = NatsHeaderInjector(headers);
525 TRACE_PROPAGATOR.inject_context(&otel_context, &mut injector);
526}
527
528pub fn inject_current_trace_into_nats_headers(headers: &mut async_nats::HeaderMap) {
530 inject_otel_context_into_nats_headers(headers, None);
531}
532
533pub fn inject_trace_headers_into_map(headers: &mut std::collections::HashMap<String, String>) {
535 if let Some(trace_context) = get_distributed_tracing_context() {
536 headers.insert(
538 "traceparent".to_string(),
539 trace_context.create_traceparent(),
540 );
541
542 if let Some(tracestate) = trace_context.tracestate {
544 headers.insert("tracestate".to_string(), tracestate);
545 }
546
547 if let Some(x_request_id) = trace_context.x_request_id {
549 headers.insert("x-request-id".to_string(), x_request_id);
550 }
551 if let Some(x_dynamo_request_id) = trace_context.x_dynamo_request_id {
552 headers.insert("x-dynamo-request-id".to_string(), x_dynamo_request_id);
553 }
554 }
555}
556
557pub fn make_client_request_span(
559 operation: &str,
560 request_id: &str,
561 trace_context: Option<&DistributedTraceContext>,
562 instance_id: Option<&str>,
563) -> Span {
564 if let Some(ctx) = trace_context {
565 let mut headers = async_nats::HeaderMap::new();
566 headers.insert("traceparent", ctx.create_traceparent());
567
568 if let Some(ref tracestate) = ctx.tracestate {
569 headers.insert("tracestate", tracestate.as_str());
570 }
571
572 let (otel_context, _extracted_trace_id, _extracted_parent_span_id) =
573 extract_otel_context_from_nats_headers(&headers);
574
575 let span = if let Some(inst_id) = instance_id {
576 tracing::info_span!(
577 "client_request",
578 operation = operation,
579 request_id = request_id,
580 instance_id = inst_id,
581 trace_id = ctx.trace_id.as_str(),
582 parent_id = ctx.span_id.as_str(),
583 x_request_id = ctx.x_request_id.as_deref(),
584 x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
585 )
587 } else {
588 tracing::info_span!(
589 "client_request",
590 operation = operation,
591 request_id = request_id,
592 trace_id = ctx.trace_id.as_str(),
593 parent_id = ctx.span_id.as_str(),
594 x_request_id = ctx.x_request_id.as_deref(),
595 x_dynamo_request_id = ctx.x_dynamo_request_id.as_deref(),
596 )
598 };
599
600 if let Some(context) = otel_context {
601 let _ = span.set_parent(context);
602 }
603
604 span
605 } else if let Some(inst_id) = instance_id {
606 tracing::info_span!(
607 "client_request",
608 operation = operation,
609 request_id = request_id,
610 instance_id = inst_id,
611 )
612 } else {
613 tracing::info_span!(
614 "client_request",
615 operation = operation,
616 request_id = request_id,
617 )
618 }
619}
620
621#[derive(Debug, Default)]
622pub struct FieldVisitor {
623 pub fields: HashMap<String, String>,
624}
625
626impl Visit for FieldVisitor {
627 fn record_str(&mut self, field: &Field, value: &str) {
628 self.fields
629 .insert(field.name().to_string(), value.to_string());
630 }
631
632 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
633 self.fields
634 .insert(field.name().to_string(), format!("{:?}", value).to_string());
635 }
636}
637
638impl<S> Layer<S> for DistributedTraceIdLayer
639where
640 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
641{
642 fn on_close(&self, id: Id, ctx: Context<'_, S>) {
645 if let Some(span) = ctx.span(&id) {
646 let mut extensions = span.extensions_mut();
647 if let Some(distributed_tracing_context) =
648 extensions.get_mut::<DistributedTraceContext>()
649 {
650 distributed_tracing_context.end = Some(Instant::now());
651 }
652 }
653 }
654
655 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
658 if let Some(span) = ctx.span(id) {
659 let mut trace_id: Option<String> = None;
660 let mut parent_id: Option<String> = None;
661 let mut span_id: Option<String> = None;
662 let mut x_request_id: Option<String> = None;
663 let mut x_dynamo_request_id: Option<String> = None;
664 let mut tracestate: Option<String> = None;
665 let mut visitor = FieldVisitor::default();
666 attrs.record(&mut visitor);
667
668 if let Some(trace_id_input) = visitor.fields.get("trace_id") {
670 if !is_valid_trace_id(trace_id_input) {
671 tracing::trace!("trace id '{}' is not valid! Ignoring.", trace_id_input);
672 } else {
673 trace_id = Some(trace_id_input.to_string());
674 }
675 }
676
677 if let Some(span_id_input) = visitor.fields.get("span_id") {
679 if !is_valid_span_id(span_id_input) {
680 tracing::trace!("span id '{}' is not valid! Ignoring.", span_id_input);
681 } else {
682 span_id = Some(span_id_input.to_string());
683 }
684 }
685
686 if let Some(parent_id_input) = visitor.fields.get("parent_id") {
688 if !is_valid_span_id(parent_id_input) {
689 tracing::trace!("parent id '{}' is not valid! Ignoring.", parent_id_input);
690 } else {
691 parent_id = Some(parent_id_input.to_string());
692 }
693 }
694
695 if let Some(tracestate_input) = visitor.fields.get("tracestate") {
697 tracestate = Some(tracestate_input.to_string());
698 }
699
700 if let Some(x_request_id_input) = visitor.fields.get("x_request_id") {
702 x_request_id = Some(x_request_id_input.to_string());
703 }
704
705 if let Some(x_request_id_input) = visitor.fields.get("x_dynamo_request_id") {
707 x_dynamo_request_id = Some(x_request_id_input.to_string());
708 }
709
710 if parent_id.is_none()
712 && let Some(parent_span_id) = ctx.current_span().id()
713 && let Some(parent_span) = ctx.span(parent_span_id)
714 {
715 let parent_ext = parent_span.extensions();
716 if let Some(parent_tracing_context) = parent_ext.get::<DistributedTraceContext>() {
717 trace_id = Some(parent_tracing_context.trace_id.clone());
718 parent_id = Some(parent_tracing_context.span_id.clone());
719 tracestate = parent_tracing_context.tracestate.clone();
720 }
721 }
722
723 if (parent_id.is_some() || span_id.is_some()) && trace_id.is_none() {
725 tracing::error!("parent id or span id are set but trace id is not set!");
726 parent_id = None;
728 span_id = None;
729 }
730
731 let mut extensions = span.extensions_mut();
733 extensions.insert(PendingDistributedTraceContext {
734 trace_id,
735 span_id,
736 parent_id,
737 tracestate,
738 x_request_id,
739 x_dynamo_request_id,
740 });
741 }
742 }
743
744 fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
747 if let Some(span) = ctx.span(id) {
748 {
750 let extensions = span.extensions();
751 if extensions.get::<DistributedTraceContext>().is_some() {
752 return;
753 }
754 }
755
756 let mut extensions = span.extensions_mut();
758 let pending = match extensions.remove::<PendingDistributedTraceContext>() {
759 Some(p) => p,
760 None => {
761 tracing::error!("PendingDistributedTraceContext not found in on_enter");
763 return;
764 }
765 };
766
767 let mut trace_id = pending.trace_id;
768 let mut span_id = pending.span_id;
769 let parent_id = pending.parent_id;
770 let tracestate = pending.tracestate;
771 let x_request_id = pending.x_request_id;
772 let x_dynamo_request_id = pending.x_dynamo_request_id;
773
774 drop(extensions);
777
778 if trace_id.is_none() || span_id.is_none() {
779 let extensions = span.extensions();
780 if let Some(otel_data) = extensions.get::<tracing_opentelemetry::OtelData>() {
781 if trace_id.is_none()
783 && let Some(otel_trace_id) = otel_data.trace_id()
784 {
785 let trace_id_str = format!("{}", otel_trace_id);
786 if is_valid_trace_id(&trace_id_str) {
787 trace_id = Some(trace_id_str);
788 }
789 }
790
791 if span_id.is_none()
793 && let Some(otel_span_id) = otel_data.span_id()
794 {
795 let span_id_str = format!("{}", otel_span_id);
796 if is_valid_span_id(&span_id_str) {
797 span_id = Some(span_id_str);
798 }
799 }
800 }
801 }
802
803 if trace_id.is_none() {
805 panic!(
806 "trace_id is not set in on_enter - OtelData may not be properly initialized"
807 );
808 }
809
810 if span_id.is_none() {
811 panic!("span_id is not set in on_enter - OtelData may not be properly initialized");
812 }
813
814 let mut extensions = span.extensions_mut();
816 extensions.insert(DistributedTraceContext {
817 trace_id: trace_id.expect("Trace ID must be set"),
818 span_id: span_id.expect("Span ID must be set"),
819 parent_id,
820 tracestate,
821 start: Some(Instant::now()),
822 end: None,
823 x_request_id,
824 x_dynamo_request_id,
825 });
826 }
827 }
828}
829
830pub fn get_distributed_tracing_context() -> Option<DistributedTraceContext> {
833 Span::current()
834 .with_subscriber(|(id, subscriber)| {
835 subscriber
836 .downcast_ref::<Registry>()
837 .and_then(|registry| registry.span_data(id))
838 .and_then(|span_data| {
839 let extensions = span_data.extensions();
840 extensions.get::<DistributedTraceContext>().cloned()
841 })
842 })
843 .flatten()
844}
845
846pub fn init() {
848 INIT.call_once(|| {
849 if let Err(e) = setup_logging() {
850 eprintln!("Failed to initialize logging: {}", e);
851 std::process::exit(1);
852 }
853 });
854}
855
/// Installs the global subscriber when the `tokio-console` feature is enabled.
///
/// Combines a compact stderr formatter (filtered by the logging config) with a tokio-console
/// layer that receives only `runtime`/`tokio` targets at TRACE and everything else at ERROR.
///
/// Returns `Result` so the signature matches the non-console variant that `init` matches on;
/// this variant currently always returns `Ok(())`.
#[cfg(feature = "tokio-console")]
fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
    let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
        .with_default_env()
        .server_addr(([0, 0, 0, 0], console_subscriber::Server::DEFAULT_PORT))
        .spawn();
    let tokio_console_target = tracing_subscriber::filter::Targets::new()
        .with_default(LevelFilter::ERROR)
        .with_target("runtime", LevelFilter::TRACE)
        .with_target("tokio", LevelFilter::TRACE);
    let l = fmt::layer()
        .with_ansi(!disable_ansi_logging())
        .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
        .with_writer(std::io::stderr)
        .with_filter(filters(load_config()));
    tracing_subscriber::registry()
        .with(l)
        .with(tokio_console_layer.with_filter(tokio_console_target))
        .init();

    Ok(())
}
876
877#[cfg(not(feature = "tokio-console"))]
878fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
879 let fmt_filter_layer = filters(load_config());
880 let trace_filter_layer = filters(load_config());
881 let otel_filter_layer = filters(load_config());
882
883 if jsonl_logging_enabled() {
884 let l = fmt::layer()
885 .with_ansi(false)
886 .event_format(CustomJsonFormatter::new())
887 .with_writer(std::io::stderr)
888 .with_filter(fmt_filter_layer);
889
890 let service_name = get_service_name();
892
893 let (tracer_provider, endpoint_opt) = if otlp_exporter_enabled() {
895 let endpoint = std::env::var(env_logging::otlp::OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
897 .unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string());
898
899 let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
901 .with_tonic()
902 .with_endpoint(&endpoint)
903 .build()?;
904
905 let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
907 .with_batch_exporter(otlp_exporter)
908 .with_resource(
909 opentelemetry_sdk::Resource::builder_empty()
910 .with_service_name(service_name.clone())
911 .build(),
912 )
913 .build();
914
915 (provider, Some(endpoint))
916 } else {
917 let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
919 .with_resource(
920 opentelemetry_sdk::Resource::builder_empty()
921 .with_service_name(service_name.clone())
922 .build(),
923 )
924 .build();
925
926 (provider, None)
927 };
928
929 let tracer = tracer_provider.tracer(service_name.clone());
931
932 tracing_subscriber::registry()
933 .with(
934 tracing_opentelemetry::layer()
935 .with_tracer(tracer)
936 .with_filter(otel_filter_layer),
937 )
938 .with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
939 .with(l)
940 .init();
941
942 if let Some(endpoint) = endpoint_opt {
944 tracing::info!(
945 endpoint = %endpoint,
946 service = %service_name,
947 "OpenTelemetry OTLP export enabled"
948 );
949 } else {
950 tracing::info!(
951 service = %service_name,
952 "OpenTelemetry OTLP export disabled, traces local only"
953 );
954 }
955 } else {
956 let l = fmt::layer()
957 .with_ansi(!disable_ansi_logging())
958 .event_format(fmt::format().compact().with_timer(TimeFormatter::new()))
959 .with_writer(std::io::stderr)
960 .with_filter(fmt_filter_layer);
961
962 tracing_subscriber::registry().with(l).init();
963 }
964
965 Ok(())
966}
967
968fn filters(config: LoggingConfig) -> EnvFilter {
969 let mut filter_layer = EnvFilter::builder()
970 .with_default_directive(config.log_level.parse().unwrap())
971 .with_env_var(env_logging::DYN_LOG)
972 .from_env_lossy();
973
974 for (module, level) in config.log_filters {
975 match format!("{module}={level}").parse::<Directive>() {
976 Ok(d) => {
977 filter_layer = filter_layer.add_directive(d);
978 }
979 Err(e) => {
980 eprintln!("Failed parsing filter '{level}' for module '{module}': {e}");
981 }
982 }
983 }
984 filter_layer
985}
986
987pub fn log_message(level: &str, message: &str, module: &str, file: &str, line: u32) {
990 let level = match level {
991 "debug" => log::Level::Debug,
992 "info" => log::Level::Info,
993 "warn" => log::Level::Warn,
994 "error" => log::Level::Error,
995 "warning" => log::Level::Warn,
996 _ => log::Level::Info,
997 };
998 log::logger().log(
999 &log::Record::builder()
1000 .args(format_args!("{}", message))
1001 .level(level)
1002 .target(module)
1003 .file(Some(file))
1004 .line(Some(line))
1005 .build(),
1006 );
1007}
1008
1009fn load_config() -> LoggingConfig {
1010 let config_path =
1011 std::env::var(env_logging::DYN_LOGGING_CONFIG_PATH).unwrap_or_else(|_| "".to_string());
1012 let figment = Figment::new()
1013 .merge(Serialized::defaults(LoggingConfig::default()))
1014 .merge(Toml::file("/opt/dynamo/etc/logging.toml"))
1015 .merge(Toml::file(config_path));
1016
1017 figment.extract().unwrap()
1018}
1019
1020#[derive(Serialize)]
1021struct JsonLog<'a> {
1022 time: String,
1023 level: String,
1024 #[serde(skip_serializing_if = "Option::is_none")]
1025 file: Option<&'a str>,
1026 #[serde(skip_serializing_if = "Option::is_none")]
1027 line: Option<u32>,
1028 target: &'a str,
1029 message: serde_json::Value,
1030 #[serde(flatten)]
1031 fields: BTreeMap<String, serde_json::Value>,
1032}
1033
1034struct TimeFormatter {
1035 use_local_tz: bool,
1036}
1037
1038impl TimeFormatter {
1039 fn new() -> Self {
1040 Self {
1041 use_local_tz: crate::config::use_local_timezone(),
1042 }
1043 }
1044
1045 fn format_now(&self) -> String {
1046 if self.use_local_tz {
1047 chrono::Local::now()
1048 .format("%Y-%m-%dT%H:%M:%S%.6f%:z")
1049 .to_string()
1050 } else {
1051 chrono::Utc::now()
1052 .format("%Y-%m-%dT%H:%M:%S%.6fZ")
1053 .to_string()
1054 }
1055 }
1056}
1057
1058impl FormatTime for TimeFormatter {
1059 fn format_time(&self, w: &mut fmt::format::Writer<'_>) -> std::fmt::Result {
1060 write!(w, "{}", self.format_now())
1061 }
1062}
1063
1064struct CustomJsonFormatter {
1065 time_formatter: TimeFormatter,
1066}
1067
1068impl CustomJsonFormatter {
1069 fn new() -> Self {
1070 Self {
1071 time_formatter: TimeFormatter::new(),
1072 }
1073 }
1074}
1075
1076use once_cell::sync::Lazy;
1077use regex::Regex;
1078
1079static TRACE_PROPAGATOR: Lazy<opentelemetry_sdk::propagation::TraceContextPropagator> =
1081 Lazy::new(opentelemetry_sdk::propagation::TraceContextPropagator::new);
1082
1083fn parse_tracing_duration(s: &str) -> Option<u64> {
1084 static RE: Lazy<Regex> =
1085 Lazy::new(|| Regex::new(r#"^["']?\s*([0-9.]+)\s*(µs|us|ns|ms|s)\s*["']?$"#).unwrap());
1086 let captures = RE.captures(s)?;
1087 let value: f64 = captures[1].parse().ok()?;
1088 let unit = &captures[2];
1089 match unit {
1090 "ns" => Some((value / 1000.0) as u64),
1091 "µs" | "us" => Some(value as u64),
1092 "ms" => Some((value * 1000.0) as u64),
1093 "s" => Some((value * 1_000_000.0) as u64),
1094 _ => None,
1095 }
1096}
1097
1098impl<S, N> tracing_subscriber::fmt::FormatEvent<S, N> for CustomJsonFormatter
1099where
1100 S: Subscriber + for<'a> LookupSpan<'a>,
1101 N: for<'a> FormatFields<'a> + 'static,
1102{
1103 fn format_event(
1104 &self,
1105 ctx: &FmtContext<'_, S, N>,
1106 mut writer: Writer<'_>,
1107 event: &Event<'_>,
1108 ) -> std::fmt::Result {
1109 let mut visitor = JsonVisitor::default();
1110 let time = self.time_formatter.format_now();
1111 event.record(&mut visitor);
1112 let mut message = visitor
1113 .fields
1114 .remove("message")
1115 .unwrap_or(serde_json::Value::String("".to_string()));
1116
1117 let current_span = event
1118 .parent()
1119 .and_then(|id| ctx.span(id))
1120 .or_else(|| ctx.lookup_current());
1121 if let Some(span) = current_span {
1122 let ext = span.extensions();
1123 let data = ext.get::<FormattedFields<N>>().unwrap();
1124 let span_fields: Vec<(&str, &str)> = data
1125 .fields
1126 .split(' ')
1127 .filter_map(|entry| entry.split_once('='))
1128 .collect();
1129 for (name, value) in span_fields {
1130 visitor.fields.insert(
1131 name.to_string(),
1132 serde_json::Value::String(value.trim_matches('"').to_string()),
1133 );
1134 }
1135
1136 let busy_us = visitor
1137 .fields
1138 .remove("time.busy")
1139 .and_then(|v| parse_tracing_duration(&v.to_string()));
1140 let idle_us = visitor
1141 .fields
1142 .remove("time.idle")
1143 .and_then(|v| parse_tracing_duration(&v.to_string()));
1144
1145 if let (Some(busy_us), Some(idle_us)) = (busy_us, idle_us) {
1146 visitor.fields.insert(
1147 "time.busy_us".to_string(),
1148 serde_json::Value::Number(busy_us.into()),
1149 );
1150 visitor.fields.insert(
1151 "time.idle_us".to_string(),
1152 serde_json::Value::Number(idle_us.into()),
1153 );
1154 visitor.fields.insert(
1155 "time.duration_us".to_string(),
1156 serde_json::Value::Number((busy_us + idle_us).into()),
1157 );
1158 }
1159
1160 message = match message.as_str() {
1161 Some("new") => serde_json::Value::String("SPAN_CREATED".to_string()),
1162 Some("close") => serde_json::Value::String("SPAN_CLOSED".to_string()),
1163 _ => message.clone(),
1164 };
1165
1166 visitor.fields.insert(
1167 "span_name".to_string(),
1168 serde_json::Value::String(span.name().to_string()),
1169 );
1170
1171 if let Some(tracing_context) = ext.get::<DistributedTraceContext>() {
1172 visitor.fields.insert(
1173 "span_id".to_string(),
1174 serde_json::Value::String(tracing_context.span_id.clone()),
1175 );
1176 visitor.fields.insert(
1177 "trace_id".to_string(),
1178 serde_json::Value::String(tracing_context.trace_id.clone()),
1179 );
1180 if let Some(parent_id) = tracing_context.parent_id.clone() {
1181 visitor.fields.insert(
1182 "parent_id".to_string(),
1183 serde_json::Value::String(parent_id),
1184 );
1185 } else {
1186 visitor.fields.remove("parent_id");
1187 }
1188 if let Some(tracestate) = tracing_context.tracestate.clone() {
1189 visitor.fields.insert(
1190 "tracestate".to_string(),
1191 serde_json::Value::String(tracestate),
1192 );
1193 } else {
1194 visitor.fields.remove("tracestate");
1195 }
1196 if let Some(x_request_id) = tracing_context.x_request_id.clone() {
1197 visitor.fields.insert(
1198 "x_request_id".to_string(),
1199 serde_json::Value::String(x_request_id),
1200 );
1201 } else {
1202 visitor.fields.remove("x_request_id");
1203 }
1204
1205 if let Some(x_dynamo_request_id) = tracing_context.x_dynamo_request_id.clone() {
1206 visitor.fields.insert(
1207 "x_dynamo_request_id".to_string(),
1208 serde_json::Value::String(x_dynamo_request_id),
1209 );
1210 } else {
1211 visitor.fields.remove("x_dynamo_request_id");
1212 }
1213 } else {
1214 tracing::error!(
1215 "Distributed Trace Context not found, falling back to internal ids"
1216 );
1217 visitor.fields.insert(
1218 "span_id".to_string(),
1219 serde_json::Value::String(span.id().into_u64().to_string()),
1220 );
1221 if let Some(parent) = span.parent() {
1222 visitor.fields.insert(
1223 "parent_id".to_string(),
1224 serde_json::Value::String(parent.id().into_u64().to_string()),
1225 );
1226 }
1227 }
1228 } else {
1229 let reserved_fields = [
1230 "trace_id",
1231 "span_id",
1232 "parent_id",
1233 "span_name",
1234 "tracestate",
1235 ];
1236 for reserved_field in reserved_fields {
1237 visitor.fields.remove(reserved_field);
1238 }
1239 }
1240 let metadata = event.metadata();
1241 let log = JsonLog {
1242 level: metadata.level().to_string(),
1243 time,
1244 file: metadata.file(),
1245 line: metadata.line(),
1246 target: metadata.target(),
1247 message,
1248 fields: visitor.fields,
1249 };
1250 let json = serde_json::to_string(&log).unwrap();
1251 writeln!(writer, "{json}")
1252 }
1253}
1254
1255#[derive(Default)]
1256struct JsonVisitor {
1257 fields: BTreeMap<String, serde_json::Value>,
1258}
1259
1260impl tracing::field::Visit for JsonVisitor {
1261 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
1262 self.fields.insert(
1263 field.name().to_string(),
1264 serde_json::Value::String(format!("{value:?}")),
1265 );
1266 }
1267
1268 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
1269 if field.name() != "message" {
1270 match serde_json::from_str::<Value>(value) {
1271 Ok(json_val) => self.fields.insert(field.name().to_string(), json_val),
1272 Err(_) => self.fields.insert(field.name().to_string(), value.into()),
1273 };
1274 } else {
1275 self.fields.insert(field.name().to_string(), value.into());
1276 }
1277 }
1278
1279 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
1280 self.fields
1281 .insert(field.name().to_string(), serde_json::Value::Bool(value));
1282 }
1283
1284 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
1285 self.fields.insert(
1286 field.name().to_string(),
1287 serde_json::Value::Number(value.into()),
1288 );
1289 }
1290
1291 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
1292 self.fields.insert(
1293 field.name().to_string(),
1294 serde_json::Value::Number(value.into()),
1295 );
1296 }
1297
1298 fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
1299 use serde_json::value::Number;
1300 self.fields.insert(
1301 field.name().to_string(),
1302 serde_json::Value::Number(Number::from_f64(value).unwrap_or(0.into())),
1303 );
1304 }
1305}
1306
1307#[cfg(test)]
1308pub mod tests {
1309 use super::*;
1310 use anyhow::{Result, anyhow};
1311 use chrono::{DateTime, Utc};
1312 use jsonschema::{Draft, JSONSchema};
1313 use serde_json::Value;
1314 use std::fs::File;
1315 use std::io::{BufRead, BufReader};
1316 use stdio_override::*;
1317 use tempfile::NamedTempFile;
1318
1319 static LOG_LINE_SCHEMA: &str = r#"
1320 {
1321 "$schema": "http://json-schema.org/draft-07/schema#",
1322 "title": "Runtime Log Line",
1323 "type": "object",
1324 "required": [
1325 "file",
1326 "level",
1327 "line",
1328 "message",
1329 "target",
1330 "time"
1331 ],
1332 "properties": {
1333 "file": { "type": "string" },
1334 "level": { "type": "string", "enum": ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"] },
1335 "line": { "type": "integer" },
1336 "message": { "type": "string" },
1337 "target": { "type": "string" },
1338 "time": { "type": "string", "format": "date-time" },
1339 "span_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
1340 "parent_id": { "type": "string", "pattern": "^[a-f0-9]{16}$" },
1341 "trace_id": { "type": "string", "pattern": "^[a-f0-9]{32}$" },
1342 "span_name": { "type": "string" },
1343 "time.busy_us": { "type": "integer" },
1344 "time.duration_us": { "type": "integer" },
1345 "time.idle_us": { "type": "integer" },
1346 "tracestate": { "type": "string" }
1347 },
1348 "additionalProperties": true
1349 }
1350 "#;
1351
1352 #[tracing::instrument(skip_all)]
1353 async fn parent() {
1354 tracing::trace!(message = "parent!");
1355 if let Some(my_ctx) = get_distributed_tracing_context() {
1356 tracing::info!(my_trace_id = my_ctx.trace_id);
1357 }
1358 child().await;
1359 }
1360
1361 #[tracing::instrument(skip_all)]
1362 async fn child() {
1363 tracing::trace!(message = "child");
1364 if let Some(my_ctx) = get_distributed_tracing_context() {
1365 tracing::info!(my_trace_id = my_ctx.trace_id);
1366 }
1367 grandchild().await;
1368 }
1369
1370 #[tracing::instrument(skip_all)]
1371 async fn grandchild() {
1372 tracing::trace!(message = "grandchild");
1373 if let Some(my_ctx) = get_distributed_tracing_context() {
1374 tracing::info!(my_trace_id = my_ctx.trace_id);
1375 }
1376 }
1377
1378 pub fn load_log(file_name: &str) -> Result<Vec<serde_json::Value>> {
1379 let schema_json: Value =
1380 serde_json::from_str(LOG_LINE_SCHEMA).expect("schema parse failure");
1381 let compiled_schema = JSONSchema::options()
1382 .with_draft(Draft::Draft7)
1383 .compile(&schema_json)
1384 .expect("Invalid schema");
1385
1386 let f = File::open(file_name)?;
1387 let reader = BufReader::new(f);
1388 let mut result = Vec::new();
1389
1390 for (line_num, line) in reader.lines().enumerate() {
1391 let line = line?;
1392 let val: Value = serde_json::from_str(&line)
1393 .map_err(|e| anyhow!("Line {}: invalid JSON: {}", line_num + 1, e))?;
1394
1395 if let Err(errors) = compiled_schema.validate(&val) {
1396 let errs = errors.map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
1397 return Err(anyhow!(
1398 "Line {}: JSON Schema Validation errors: {}",
1399 line_num + 1,
1400 errs
1401 ));
1402 }
1403 println!("{}", val);
1404 result.push(val);
1405 }
1406 Ok(result)
1407 }
1408
1409 #[tokio::test]
1410 async fn test_json_log_capture() -> Result<()> {
1411 #[allow(clippy::redundant_closure_call)]
1412 let _ = temp_env::async_with_vars(
1413 [(env_logging::DYN_LOGGING_JSONL, Some("1"))],
1414 (async || {
1415 let tmp_file = NamedTempFile::new().unwrap();
1416 let file_name = tmp_file.path().to_str().unwrap();
1417 let guard = StderrOverride::from_file(file_name)?;
1418 init();
1419 parent().await;
1420 drop(guard);
1421
1422 let lines = load_log(file_name)?;
1423
1424 let Some(trace_id) = lines
1432 .iter()
1433 .find_map(|log_line| log_line.get("trace_id").and_then(|v| v.as_str()))
1434 .map(|s| s.to_string())
1435 else {
1436 return Ok(());
1438 };
1439
1440 assert_ne!(
1442 trace_id, "00000000000000000000000000000000",
1443 "trace_id should not be a zero/invalid ID"
1444 );
1445 assert!(
1446 !trace_id.chars().all(|c| c == '0'),
1447 "trace_id should not be all zeros"
1448 );
1449
1450 for log_line in &lines {
1452 if let Some(line_trace_id) = log_line.get("trace_id") {
1453 assert_eq!(
1454 line_trace_id.as_str().unwrap(),
1455 &trace_id,
1456 "All logs should have the same trace_id"
1457 );
1458 }
1459 }
1460
1461 for log_line in &lines {
1463 if let Some(my_trace_id) = log_line.get("my_trace_id") {
1464 assert_eq!(
1465 my_trace_id,
1466 &serde_json::Value::String(trace_id.clone()),
1467 "my_trace_id should match the trace_id from distributed tracing context"
1468 );
1469 }
1470 }
1471
1472 let mut span_ids_seen: std::collections::HashSet<String> = std::collections::HashSet::new();
1474 let mut span_timestamps: std::collections::HashMap<String, DateTime<Utc>> = std::collections::HashMap::new();
1475
1476 for log_line in &lines {
1477 if let Some(span_id) = log_line.get("span_id") {
1478 let span_id_str = span_id.as_str().unwrap();
1479 assert!(
1480 is_valid_span_id(span_id_str),
1481 "Invalid span_id format: {}",
1482 span_id_str
1483 );
1484 span_ids_seen.insert(span_id_str.to_string());
1485 }
1486
1487 if let Some(time_str) = log_line.get("time").and_then(|v| v.as_str()) {
1489 let timestamp = DateTime::parse_from_rfc3339(time_str)
1490 .expect("All timestamps should be valid RFC3339 format")
1491 .with_timezone(&Utc);
1492
1493 if let Some(span_name) = log_line.get("span_name").and_then(|v| v.as_str()) {
1495 span_timestamps.insert(span_name.to_string(), timestamp);
1496 }
1497 }
1498 }
1499
1500 let parent_span_id = lines
1503 .iter()
1504 .find(|log_line| {
1505 log_line.get("span_name")
1506 .and_then(|v| v.as_str()) == Some("parent")
1507 })
1508 .and_then(|log_line| {
1509 log_line.get("span_id")
1510 .and_then(|v| v.as_str())
1511 .map(|s| s.to_string())
1512 })
1513 .expect("Should find parent span with span_id");
1514
1515 let child_span_id = lines
1516 .iter()
1517 .find(|log_line| {
1518 log_line.get("span_name")
1519 .and_then(|v| v.as_str()) == Some("child")
1520 })
1521 .and_then(|log_line| {
1522 log_line.get("span_id")
1523 .and_then(|v| v.as_str())
1524 .map(|s| s.to_string())
1525 })
1526 .expect("Should find child span with span_id");
1527
1528 let grandchild_span_id = lines
1529 .iter()
1530 .find(|log_line| {
1531 log_line.get("span_name")
1532 .and_then(|v| v.as_str()) == Some("grandchild")
1533 })
1534 .and_then(|log_line| {
1535 log_line.get("span_id")
1536 .and_then(|v| v.as_str())
1537 .map(|s| s.to_string())
1538 })
1539 .expect("Should find grandchild span with span_id");
1540
1541 assert_ne!(parent_span_id, child_span_id, "Parent and child should have different span IDs");
1543 assert_ne!(child_span_id, grandchild_span_id, "Child and grandchild should have different span IDs");
1544 assert_ne!(parent_span_id, grandchild_span_id, "Parent and grandchild should have different span IDs");
1545
1546 for log_line in &lines {
1548 if let Some(span_name) = log_line.get("span_name")
1549 && let Some(span_name_str) = span_name.as_str()
1550 && span_name_str == "parent"
1551 {
1552 assert!(
1553 log_line.get("parent_id").is_none(),
1554 "Parent span should not have a parent_id"
1555 );
1556 }
1557 }
1558
1559 for log_line in &lines {
1561 if let Some(span_name) = log_line.get("span_name")
1562 && let Some(span_name_str) = span_name.as_str()
1563 && span_name_str == "child"
1564 {
1565 let parent_id = log_line.get("parent_id")
1566 .and_then(|v| v.as_str())
1567 .expect("Child span should have a parent_id");
1568 assert_eq!(
1569 parent_id,
1570 parent_span_id,
1571 "Child's parent_id should match parent's span_id"
1572 );
1573 }
1574 }
1575
1576 for log_line in &lines {
1578 if let Some(span_name) = log_line.get("span_name")
1579 && let Some(span_name_str) = span_name.as_str()
1580 && span_name_str == "grandchild"
1581 {
1582 let parent_id = log_line.get("parent_id")
1583 .and_then(|v| v.as_str())
1584 .expect("Grandchild span should have a parent_id");
1585 assert_eq!(
1586 parent_id,
1587 child_span_id,
1588 "Grandchild's parent_id should match child's span_id"
1589 );
1590 }
1591 }
1592
1593 let parent_time = span_timestamps.get("parent")
1595 .expect("Should have timestamp for parent span");
1596 let child_time = span_timestamps.get("child")
1597 .expect("Should have timestamp for child span");
1598 let grandchild_time = span_timestamps.get("grandchild")
1599 .expect("Should have timestamp for grandchild span");
1600
1601 assert!(
1603 parent_time <= child_time,
1604 "Parent span should log before or at same time as child span (parent: {}, child: {})",
1605 parent_time,
1606 child_time
1607 );
1608 assert!(
1609 child_time <= grandchild_time,
1610 "Child span should log before or at same time as grandchild span (child: {}, grandchild: {})",
1611 child_time,
1612 grandchild_time
1613 );
1614
1615 Ok::<(), anyhow::Error>(())
1616 })(),
1617 )
1618 .await;
1619 Ok(())
1620 }
1621}