datadog_tracing/
formatter.rs

1//! An event formatter to emit events in a way that Datadog can correlate them with traces.
2//!
3//! Datadog's trace ID and span ID format is different from the OpenTelemetry standard.
4//! Using this formatter, the trace ID is converted to the correct format.
5//! It also adds the trace ID to the `dd.trace_id` field and the span ID to the
6//! `dd.span_id` field, which is where Datadog looks for these by default
7//! (although the path to the trace ID can be overridden in Datadog).
8
9use std::io;
10
11use chrono::Utc;
12use opentelemetry::trace::{SpanId, TraceContextExt, TraceId};
13use serde::ser::{SerializeMap, Serializer as _};
14use serde::Serialize;
15use tracing::{Event, Subscriber};
16use tracing_opentelemetry::OtelData;
17
18use tracing_serde::AsSerde;
19use tracing_subscriber::fmt::format::Writer;
20use tracing_subscriber::fmt::{FmtContext, FormatEvent, FormatFields};
21use tracing_subscriber::registry::{LookupSpan, SpanRef};
22
23#[derive(Serialize)]
24struct DatadogId(u64);
25
26struct TraceInfo {
27    trace_id: DatadogId,
28    span_id: DatadogId,
29}
30
31impl From<TraceId> for DatadogId {
32    fn from(value: TraceId) -> Self {
33        let bytes = &value.to_bytes()[std::mem::size_of::<u64>()..std::mem::size_of::<u128>()];
34        Self(u64::from_be_bytes(bytes.try_into().unwrap_or_default()))
35    }
36}
37
38impl From<SpanId> for DatadogId {
39    fn from(value: SpanId) -> Self {
40        Self(u64::from_be_bytes(value.to_bytes()))
41    }
42}
43
44fn lookup_trace_info<S>(span_ref: &SpanRef<S>) -> Option<TraceInfo>
45where
46    S: Subscriber + for<'a> LookupSpan<'a>,
47{
48    span_ref.extensions().get::<OtelData>().map(|o| {
49        let trace_id = if o.parent_cx.has_active_span() {
50            o.parent_cx.span().span_context().trace_id()
51        } else {
52            o.builder.trace_id.unwrap_or(TraceId::INVALID)
53        };
54        TraceInfo {
55            trace_id: trace_id.into(),
56            span_id: o.builder.span_id.unwrap_or(SpanId::INVALID).into(),
57        }
58    })
59}
60
61// mostly stolen from here: https://github.com/tokio-rs/tracing/issues/1531
62pub struct DatadogFormatter;
63
64impl<S, N> FormatEvent<S, N> for DatadogFormatter
65where
66    S: Subscriber + for<'lookup> LookupSpan<'lookup>,
67    N: for<'writer> FormatFields<'writer> + 'static,
68{
69    fn format_event(
70        &self,
71        ctx: &FmtContext<'_, S, N>,
72        mut writer: Writer<'_>,
73        event: &Event<'_>,
74    ) -> std::fmt::Result
75    where
76        S: Subscriber + for<'a> LookupSpan<'a>,
77    {
78        let meta = event.metadata();
79
80        let mut visit = || {
81            let mut serializer = serde_json::Serializer::new(WriteAdaptor::new(&mut writer));
82            let mut serializer = serializer.serialize_map(None)?;
83            serializer.serialize_entry("timestamp", &Utc::now().to_rfc3339())?;
84            serializer.serialize_entry("level", &meta.level().as_serde())?;
85            serializer.serialize_entry("target", meta.target())?;
86
87            // fields -> stolen from https://github.com/tokio-rs/tracing/blob/tracing-subscriber-0.3.17/tracing-subscriber/src/fmt/format/json.rs#L263-L268
88            let mut visitor = tracing_serde::SerdeMapVisitor::new(serializer);
89            event.record(&mut visitor);
90            serializer = visitor.take_serializer()?;
91
92            if let Some(ref span_ref) = ctx.lookup_current() {
93                if let Some(trace_info) = lookup_trace_info(span_ref) {
94                    serializer.serialize_entry("dd.span_id", &trace_info.span_id)?;
95                    serializer.serialize_entry("dd.trace_id", &trace_info.trace_id)?;
96                }
97            }
98
99            serializer.end()
100        };
101
102        visit().map_err(|_| std::fmt::Error)?;
103        writeln!(writer)
104    }
105}
106
/// Adapts a [`std::fmt::Write`] sink so it can be used where an
/// [`io::Write`] is required.
struct WriteAdaptor<'a> {
    /// The wrapped text sink that receives everything written.
    fmt_write: &'a mut dyn std::fmt::Write,
}

impl<'a> WriteAdaptor<'a> {
    /// Wraps `fmt_write` without taking ownership of it.
    fn new(fmt_write: &'a mut dyn std::fmt::Write) -> Self {
        Self { fmt_write }
    }
}

impl<'a> io::Write for WriteAdaptor<'a> {
    /// Forwards `buf` to the wrapped sink as text.
    ///
    /// On success the whole buffer has been written and its length is
    /// returned.
    ///
    /// # Errors
    ///
    /// * `InvalidData` when `buf` is not valid UTF-8 (nothing is written).
    /// * `Other` when the wrapped sink reports a formatting error.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match std::str::from_utf8(buf) {
            Err(utf8_err) => Err(io::Error::new(io::ErrorKind::InvalidData, utf8_err)),
            Ok(text) => match self.fmt_write.write_str(text) {
                // The text is the entire buffer, so its byte length is `buf.len()`.
                Ok(()) => Ok(buf.len()),
                Err(fmt_err) => Err(io::Error::new(io::ErrorKind::Other, fmt_err)),
            },
        }
    }

    /// Does nothing and always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
133
#[cfg(test)]
mod tests {
    use super::DatadogId;
    use opentelemetry::trace::{SpanId, TraceId};

    /// A trace ID maps to the value of its low 64 bits.
    #[test]
    fn test_trace_id_converted_to_datadog_id() {
        let otel_id = TraceId::from_hex("2de7888d8f42abc9c7ba048b78f7a9fb").unwrap();
        let DatadogId(raw) = DatadogId::from(otel_id);

        assert_eq!(raw, 14391820556292303355);
    }

    /// The invalid trace ID maps to zero.
    #[test]
    fn test_invalid_trace_id_converted_to_zero() {
        let DatadogId(raw) = DatadogId::from(TraceId::INVALID);

        assert_eq!(raw, 0);
    }

    /// A span ID maps to its big-endian integer value.
    #[test]
    fn test_span_id_converted_to_datadog_id() {
        let otel_id = SpanId::from_hex("58406520a0066491").unwrap();
        let DatadogId(raw) = DatadogId::from(otel_id);

        assert_eq!(raw, 6359193864645272721);
    }
}