tracing_datadog/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use jiff::{Timestamp, Zoned};
4use rmp_serde::Serializer as MpSerializer;
5use serde::{Serialize, Serializer};
6use std::{
7    collections::HashMap,
8    fmt::{Debug, Display, Formatter, Write},
9    marker::PhantomData,
10    sync::{Arc, Mutex, mpsc},
11    thread::{sleep, spawn},
12    time::{Duration, SystemTime, UNIX_EPOCH},
13};
14use tracing_core::{
15    Event, Field, Level, Subscriber,
16    field::Visit,
17    span::{Attributes, Id, Record},
18};
19use tracing_subscriber::{
20    Layer,
21    layer::Context,
22    registry::{LookupSpan, Scope},
23};
24
25/// A [`Layer`] that sends traces to DataDog.
26///
27/// ```
28/// # use tracing_subscriber::prelude::*;
29/// # use tracing_datadog::DataDogTraceLayer;
30/// tracing_subscriber::registry()
31///    .with(
32///        DataDogTraceLayer::builder()
33///            .service("my-service")
34///            .env("production")
35///            .version("git sha")
36///            .agent_address("localhost:8126")
37///            .build()
38///            .expect("failed to build DataDogTraceLayer"),
39///    )
40///    .init();
41/// ```
42pub struct DataDogTraceLayer<S> {
43    buffer: Arc<Mutex<Vec<DataDogSpan>>>,
44    service: String,
45    env: String,
46    version: String,
47    logging_enabled: bool,
48    #[cfg(feature = "http")]
49    with_context: http::WithContext,
50    shutdown: mpsc::Sender<()>,
51    _registry: PhantomData<S>,
52}
53
54impl<S> DataDogTraceLayer<S>
55where
56    S: Subscriber + for<'a> LookupSpan<'a>,
57{
58    /// Creates a builder to construct a [`DataDogTraceLayer`].
59    pub fn builder() -> DataDogTraceLayerBuilder<S> {
60        DataDogTraceLayerBuilder {
61            service: None,
62            env: None,
63            version: None,
64            agent_address: None,
65            container_id: None,
66            logging_enabled: false,
67            phantom_data: Default::default(),
68        }
69    }
70
71    #[cfg(feature = "http")]
72    fn get_context(
73        dispatch: &tracing_core::Dispatch,
74        id: &Id,
75        f: &mut dyn FnMut(&mut DataDogSpan),
76    ) {
77        let subscriber = dispatch
78            .downcast_ref::<S>()
79            .expect("Subscriber did not downcast to expected type, this is a bug");
80        let span = subscriber.span(id).expect("Span not found, this is a bug");
81
82        let mut extensions = span.extensions_mut();
83        if let Some(dd_span) = extensions.get_mut::<DataDogSpan>() {
84            f(dd_span);
85        }
86    }
87}
88
89impl<S> Drop for DataDogTraceLayer<S> {
90    fn drop(&mut self) {
91        let _ = self.shutdown.send(());
92    }
93}
94
95impl<S> Layer<S> for DataDogTraceLayer<S>
96where
97    S: Subscriber + for<'a> LookupSpan<'a>,
98{
99    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
100        let span = ctx.span(id).expect("Span not found, this is a bug");
101        let mut extensions = span.extensions_mut();
102
103        let trace_id = span
104            .parent()
105            .and_then(|parent| {
106                parent
107                    .extensions()
108                    .get::<DataDogSpan>()
109                    .map(|dd_span| dd_span.trace_id)
110            })
111            .unwrap_or(rand::random());
112
113        let mut dd_span = DataDogSpan {
114            name: span.name().to_string(),
115            service: self.service.clone(),
116            r#type: "internal".into(),
117            span_id: span.id().into_u64(),
118            start: epoch_ns(),
119            parent_id: span
120                .parent()
121                .map(|parent| parent.id().into_u64())
122                .unwrap_or_default(),
123            trace_id,
124            meta: HashMap::from_iter([
125                ("env".into(), self.env.clone()),
126                ("version".into(), self.version.clone()),
127            ]),
128            ..Default::default()
129        };
130
131        attrs.record(&mut SpanAttributeVisitor::new(&mut dd_span));
132
133        extensions.insert(dd_span);
134    }
135
136    fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
137        let span = ctx.span(id).expect("Span not found, this is a bug");
138        let mut extensions = span.extensions_mut();
139
140        if let Some(dd_span) = extensions.get_mut::<DataDogSpan>() {
141            values.record(&mut SpanAttributeVisitor::new(dd_span));
142        }
143    }
144
145    fn on_follows_from(&self, id: &Id, follows: &Id, ctx: Context<'_, S>) {
146        let span = ctx.span(id).expect("Span not found, this is a bug");
147        let mut extensions = span.extensions_mut();
148
149        if let Some(dd_span) = extensions.get_mut::<DataDogSpan>() {
150            dd_span.parent_id = follows.into_u64();
151        }
152    }
153
154    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
155        if !self.logging_enabled {
156            return;
157        }
158
159        let mut fields = {
160            let mut visitor = FieldVisitor::default();
161            event.record(&mut visitor);
162            visitor.fields
163        };
164
165        let mut message = fields.remove("message").unwrap_or_default();
166
167        fields.extend(
168            ctx.event_scope(event)
169                .into_iter()
170                .flat_map(Scope::from_root)
171                .flat_map(|span| match span.extensions().get::<DataDogSpan>() {
172                    Some(dd_span) => dd_span.meta.clone(),
173                    None => panic!("Span not found, this is a bug"),
174                }),
175        );
176
177        fields
178            .into_iter()
179            .try_for_each(|(k, v)| write!(&mut message, " {k}={v}"))
180            .expect("Failed to write message");
181
182        let (trace_id, span_id) = ctx
183            .lookup_current()
184            .and_then(|span| {
185                span.extensions()
186                    .get::<DataDogSpan>()
187                    .map(|dd_span| (Some(dd_span.trace_id), Some(dd_span.span_id)))
188            })
189            .unwrap_or_default();
190
191        let log = DataDogLog {
192            timestamp: Zoned::now().timestamp(),
193            level: event.metadata().level().to_owned(),
194            message,
195            trace_id,
196            span_id,
197        };
198
199        let serialized = serde_json::to_string(&log).expect("Failed to serialize log");
200
201        println!("{serialized}");
202    }
203
204    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
205        let span = ctx.span(id).expect("Span not found, this is a bug");
206        let mut extensions = span.extensions_mut();
207
208        let now = epoch_ns();
209
210        match extensions.get_mut::<DataDogSpan>() {
211            Some(dd_span) if dd_span.start == 0 => dd_span.start = now,
212            _ => {}
213        }
214    }
215
216    fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
217        let span = ctx.span(id).expect("Span not found, this is a bug");
218        let mut extensions = span.extensions_mut();
219
220        let now = epoch_ns();
221
222        if let Some(dd_span) = extensions.get_mut::<DataDogSpan>() {
223            dd_span.duration = now - dd_span.start
224        }
225    }
226
227    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
228        let span = ctx.span(&id).expect("Span not found, this is a bug");
229        let mut extensions = span.extensions_mut();
230
231        if let Some(dd_span) = extensions.remove::<DataDogSpan>() {
232            self.buffer.lock().unwrap().push(dd_span);
233        }
234    }
235
236    // SAFETY: This is safe because the `WithContext` function pointer is valid
237    // for the lifetime of `&self`.
238    #[cfg(feature = "http")]
239    unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> {
240        match id {
241            id if id == std::any::TypeId::of::<Self>() => Some(self as *const _ as *const ()),
242            id if id == std::any::TypeId::of::<http::WithContext>() => {
243                Some(&self.with_context as *const _ as *const ())
244            }
245            _ => None,
246        }
247    }
248}
249
/// Step-by-step configuration for a [`DataDogTraceLayer`], obtained from
/// [`DataDogTraceLayer::builder`] and finished with [`DataDogTraceLayerBuilder::build`].
pub struct DataDogTraceLayerBuilder<S> {
    /// Required service name.
    service: Option<String>,
    /// Required environment name.
    env: Option<String>,
    /// Required version string.
    version: Option<String>,
    /// Required `host:port` of the DataDog agent.
    agent_address: Option<String>,
    /// Optional container ID, sent to the agent as a header when present.
    container_id: Option<String>,
    /// Whether JSON logging to stdout is turned on.
    logging_enabled: bool,
    phantom_data: PhantomData<S>,
}
260
/// The error returned by [`DataDogTraceLayerBuilder::build`] when the configuration is
/// incomplete or invalid.
///
/// Its `Display` output is a short human-readable reason.
#[derive(Debug)]
pub struct BuilderError(&'static str);

impl Display for BuilderError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self(reason) = self;
        f.write_str(reason)
    }
}

impl std::error::Error for BuilderError {}
272
273impl<S> DataDogTraceLayerBuilder<S>
274where
275    S: Subscriber + for<'a> LookupSpan<'a>,
276{
277    /// Sets the `service`. This is required.
278    pub fn service(mut self, service: impl Into<String>) -> Self {
279        self.service = Some(service.into());
280        self
281    }
282
283    /// Sets the `env`. This is required.
284    pub fn env(mut self, env: impl Into<String>) -> Self {
285        self.env = Some(env.into());
286        self
287    }
288
289    /// Sets the `version`. This is required.
290    pub fn version(mut self, version: impl Into<String>) -> Self {
291        self.version = Some(version.into());
292        self
293    }
294
295    /// Sets the `agent_address`. This is required.
296    pub fn agent_address(mut self, agent_address: impl Into<String>) -> Self {
297        self.agent_address = Some(agent_address.into());
298        self
299    }
300
301    /// Sets the container ID. This enables infrastructure metrics in APM for supported platforms.
302    pub fn container_id(mut self, container_id: impl Into<String>) -> Self {
303        self.container_id = Some(container_id.into());
304        self
305    }
306
307    /// Enables or disables structured logging with trace correlation to stdout.
308    /// Disabled by default.
309    pub fn enable_logs(mut self, enable_logs: bool) -> Self {
310        self.logging_enabled = enable_logs;
311        self
312    }
313
314    /// Consumes the builder to construct the tracing layer.
315    pub fn build(self) -> Result<DataDogTraceLayer<S>, BuilderError> {
316        let Some(service) = self.service else {
317            return Err(BuilderError("service is required"));
318        };
319        let Some(env) = self.env else {
320            return Err(BuilderError("env is required"));
321        };
322        let Some(version) = self.version else {
323            return Err(BuilderError("version is required"));
324        };
325        let Some(agent_address) = self.agent_address else {
326            return Err(BuilderError("agent_address is required"));
327        };
328        let container_id = match self.container_id {
329            Some(s) => Some(
330                s.parse::<reqwest::header::HeaderValue>()
331                    .map_err(|_| BuilderError("Failed to parse container ID into header"))?,
332            ),
333            _ => None,
334        };
335
336        let buffer = Arc::new(Mutex::new(Vec::new()));
337        let exporter_buffer = buffer.clone();
338        let url = format!("http://{}/v0.4/traces", agent_address);
339        let (tx, rx) = mpsc::channel();
340
341        spawn(move || {
342            let client = {
343                let mut builder = reqwest::blocking::Client::builder();
344                if let Some(container_id) = container_id {
345                    builder = builder.default_headers(reqwest::header::HeaderMap::from_iter([(
346                        reqwest::header::HeaderName::from_static("Datadog-Container-ID"),
347                        container_id,
348                    )]));
349                };
350                builder.build().expect("Failed to build reqwest client")
351            };
352
353            loop {
354                if rx.try_recv().is_ok() {
355                    break;
356                }
357
358                sleep(Duration::from_secs(5));
359
360                let spans = exporter_buffer
361                    .lock()
362                    .unwrap()
363                    .drain(..)
364                    .collect::<Vec<_>>();
365                if spans.is_empty() {
366                    continue;
367                }
368
369                let mut body = vec![0b10010001];
370                let _ = spans
371                    .serialize(&mut MpSerializer::new(&mut body).with_struct_map())
372                    .inspect_err(|error| println!("Error serializing spans: {error:?}"));
373
374                let _ = client
375                    .post(&url)
376                    .header("Datadog-Meta-Tracer-Version", "v1.27.0")
377                    .header("Content-Type", "application/msgpack")
378                    .body(body)
379                    .send()
380                    .inspect_err(|error| println!("Error exporting spans: {error:?}"));
381            }
382        });
383
384        Ok(DataDogTraceLayer {
385            buffer,
386            service,
387            env,
388            version,
389            logging_enabled: self.logging_enabled,
390            #[cfg(feature = "http")]
391            with_context: http::WithContext(DataDogTraceLayer::<S>::get_context),
392            shutdown: tx,
393            _registry: PhantomData,
394        })
395    }
396}
397
/// Returns the current wall-clock time in whole nanoseconds since the Unix epoch
/// (1970-01-01T00:00:00Z). This value is used for span start times and durations.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn epoch_ns() -> i64 {
    let now = SystemTime::now();
    let elapsed = now
        .duration_since(UNIX_EPOCH)
        .expect("SystemTime is before UNIX epoch");
    let nanos: u128 = elapsed.as_nanos();
    nanos as i64
}
405
406/// The v0.4 DataDog trace API format for spans. This is what we write to MessagePack.
407#[derive(Default, Debug, Serialize)]
408struct DataDogSpan {
409    name: String,
410    service: String,
411    r#type: String,
412    resource: String,
413    start: i64,
414    duration: i64,
415    meta: HashMap<String, String>,
416    error_code: i32,
417    span_id: u64,
418    trace_id: u64,
419    parent_id: u64,
420}
421
422/// A visitor that converts tracing span attributes to a [`DataDogSpan`].
423struct SpanAttributeVisitor<'a> {
424    dd_span: &'a mut DataDogSpan,
425}
426
427impl<'a> SpanAttributeVisitor<'a> {
428    fn new(dd_span: &'a mut DataDogSpan) -> Self {
429        Self { dd_span }
430    }
431}
432
433impl<'a> Visit for SpanAttributeVisitor<'a> {
434    fn record_str(&mut self, field: &Field, value: &str) {
435        // Strings are broken out because their debug representation includes quotation marks.
436        match field.name() {
437            "service" => self.dd_span.service = value.to_string(),
438            "span.type" => self.dd_span.r#type = value.to_string(),
439            "operation" => self.dd_span.name = value.to_string(),
440            "resource" => self.dd_span.resource = value.to_string(),
441            name => {
442                self.dd_span
443                    .meta
444                    .insert(name.to_string(), value.to_string());
445            }
446        };
447    }
448
449    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
450        match field.name() {
451            "service" => self.dd_span.service = format!("{value:?}"),
452            "span.type" => self.dd_span.r#type = format!("{value:?}"),
453            "operation" => self.dd_span.name = format!("{value:?}"),
454            "resource" => self.dd_span.resource = format!("{value:?}"),
455            name => {
456                self.dd_span
457                    .meta
458                    .insert(name.to_string(), format!("{value:?}"));
459            }
460        };
461    }
462}
463
464/// The DataDog structure log format. This is what we write to JSON.
465#[derive(Serialize)]
466struct DataDogLog {
467    timestamp: Timestamp,
468    #[serde(serialize_with = "serialize_level")]
469    level: Level,
470    message: String,
471    #[serde(rename = "dd.trace_id", skip_serializing_if = "Option::is_none")]
472    trace_id: Option<u64>,
473    #[serde(rename = "dd.span_id", skip_serializing_if = "Option::is_none")]
474    span_id: Option<u64>,
475}
476
477/// Serializes a `Level` to a string, e.g. `"INFO"`.
478fn serialize_level<S: Serializer>(level: &Level, serializer: S) -> Result<S::Ok, S::Error> {
479    serializer.serialize_str(level.as_str())
480}
481
482/// A visitor that collects tracing attributes into a map.
483#[derive(Default)]
484struct FieldVisitor {
485    fields: HashMap<String, String>,
486}
487
488impl Visit for FieldVisitor {
489    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
490        self.fields
491            .insert(field.name().to_string(), format!("{value:?}"));
492    }
493}
494
#[cfg(feature = "http")]
#[doc = "Functionality for working with distributed tracing HTTP headers"]
pub mod http {
    use crate::DataDogSpan;
    use http::{HeaderMap, HeaderName};
    use tracing_core::{Dispatch, span::Id};

    /// The trace context for distributed tracing. This is a subset of the W3C trace context
    /// which allows stitching together traces with spans from different services.
    ///
    /// The default value has all-zero IDs, which W3C trace context defines as invalid.
    /// [`DataDogContext::from_w3c_headers`] returns it when no usable header is present, and
    /// [`DistributedTracingContext::set_context`] ignores it.
    #[derive(Copy, Clone, Default)]
    pub struct DataDogContext {
        trace_id: u128,
        parent_id: u64,
    }

    impl DataDogContext {
        /// Parses a context for distributed tracing from W3C trace context headers.
        ///
        /// Returns the default (invalid) context if the `traceparent` header is missing or
        /// malformed.
        ///
        /// This would be useful in HTTP server middleware.
        ///
        /// ```
        /// # let request = http::Request::builder().body(()).unwrap();
        /// use tracing_datadog::http::{DataDogContext, DistributedTracingContext};
        ///
        /// // Construct a new span.
        /// let span = tracing::info_span!("http.request");
        ///
        /// // Set the context on the span based on request headers.
        /// span.set_context(DataDogContext::from_w3c_headers(request.headers()));
        /// ```
        ///
        /// An alternative use case is setting the context on the current span, for example
        /// within `#[instrument]`ed functions.
        ///
        /// ```
        /// # let request = http::Request::builder().body(()).unwrap();
        /// use tracing_datadog::http::{DataDogContext, DistributedTracingContext};
        ///
        /// tracing::Span::current().set_context(DataDogContext::from_w3c_headers(request.headers()));
        /// ```
        pub fn from_w3c_headers(headers: &HeaderMap) -> Self {
            Self::parse_w3c_headers(headers).unwrap_or_default()
        }

        /// Parses a `traceparent` header of the form `00-<trace-id>-<parent-id>-<flags>`.
        ///
        /// Returns `None` unless all of the following hold:
        /// - the header is present and uses version `00`;
        /// - the trace ID is exactly 32 hex digits, the parent ID 16 and the flags 2;
        /// - both IDs are non-zero.
        fn parse_w3c_headers(headers: &HeaderMap) -> Option<Self> {
            let header = headers.get("traceparent")?.to_str().ok()?;

            let parts: Vec<&str> = header.split('-').collect();
            let &[version, trace_id, parent_id, flags] = parts.as_slice() else {
                return None;
            };

            // Exact-length hex checks also reject inputs that `from_str_radix` would accept,
            // such as a leading `+` or short fields.
            if version != "00"
                || !is_hex_field(trace_id, 32)
                || !is_hex_field(parent_id, 16)
                || !is_hex_field(flags, 2)
            {
                return None;
            }

            let context = Self {
                trace_id: u128::from_str_radix(trace_id, 16).ok()?,
                parent_id: u64::from_str_radix(parent_id, 16).ok()?,
            };

            // All-zero trace or parent IDs are explicitly invalid.
            context.is_valid().then_some(context)
        }

        /// Serializes a context for distributed tracing to W3C trace context headers.
        ///
        /// An invalid (all-zero) context produces an empty header map instead of a
        /// `traceparent` that receivers would have to reject.
        ///
        /// ```
        /// # use http::Request;
        /// use tracing_datadog::http::DistributedTracingContext;
        ///
        /// // Build the request.
        /// let mut request = Request::builder().body(()).unwrap();
        ///
        /// // Inject distributed tracing headers.
        /// request.headers_mut().extend(tracing::Span::current().get_context().to_w3c_headers());
        ///
        /// // Execute the request.
        /// // ..
        /// ```
        pub fn to_w3c_headers(&self) -> HeaderMap {
            if !self.is_valid() {
                return HeaderMap::new();
            }

            let header = format!(
                "{version:02x}-{trace_id:032x}-{parent_id:016x}-{trace_flags:02x}",
                version = 0,
                trace_id = self.trace_id,
                parent_id = self.parent_id,
                trace_flags = 1,
            );

            HeaderMap::from_iter([(
                HeaderName::from_static("traceparent"),
                header
                    .parse()
                    .expect("a formatted traceparent is always a valid header value"),
            )])
        }

        /// Returns `true` if both IDs are non-zero, as W3C trace context requires.
        fn is_valid(&self) -> bool {
            self.trace_id != 0 && self.parent_id != 0
        }
    }

    /// Returns `true` if `field` consists of exactly `len` ASCII hexadecimal digits.
    fn is_hex_field(field: &str, len: usize) -> bool {
        field.len() == len && field.bytes().all(|byte| byte.is_ascii_hexdigit())
    }

    // This function "remembers" the types of the subscriber so that we can downcast to something
    // aware of them without knowing those types at the call site. Adapted from tracing-error.
    pub(crate) struct WithContext(
        #[allow(clippy::type_complexity)] // A bare fn pointer; an alias would not clarify it.
        pub(crate)  fn(&Dispatch, &Id, f: &mut dyn FnMut(&mut DataDogSpan)),
    );

    impl WithContext {
        /// Calls `f` with the [`DataDogSpan`] of span `id`, if it has one.
        pub(crate) fn with_context(
            &self,
            dispatch: &Dispatch,
            id: &Id,
            f: &mut dyn FnMut(&mut DataDogSpan),
        ) {
            (self.0)(dispatch, id, f);
        }
    }

    /// Reads and writes the distributed tracing context of a [`tracing::Span`].
    ///
    /// If the span is disabled or its subscriber has no `DataDogTraceLayer`, `get_context`
    /// returns the default (invalid) context and `set_context` does nothing.
    pub trait DistributedTracingContext {
        /// Gets the context to propagate to downstream services.
        ///
        /// The context holds this span's trace ID and, as the parent ID, this span's own ID.
        fn get_context(&self) -> DataDogContext;

        /// Sets the context for distributed tracing on this span.
        ///
        /// This makes the span part of the upstream trace and a child of the upstream span.
        /// Invalid (all-zero) contexts are ignored.
        fn set_context(&self, context: DataDogContext);
    }

    impl DistributedTracingContext for tracing::Span {
        fn get_context(&self) -> DataDogContext {
            let mut ctx = None;

            self.with_subscriber(|(id, subscriber)| {
                let Some(get_context) = subscriber.downcast_ref::<WithContext>() else {
                    return;
                };
                get_context.with_context(subscriber, id, &mut |dd_span| {
                    ctx = Some(DataDogContext {
                        // NB Trace IDs can be 128-bit nowadays, but the 0.4 API still uses 64-bit.
                        trace_id: u128::from(dd_span.trace_id),
                        // Downstream spans are children of *this* span. The propagated parent
                        // ID is therefore this span's own ID, not the ID of its parent.
                        parent_id: dd_span.span_id,
                    })
                });
            });

            ctx.unwrap_or_default()
        }

        fn set_context(&self, context: DataDogContext) {
            // An invalid context, e.g. one parsed from a request without a `traceparent`
            // header, must not overwrite the span's own trace ID with zero.
            if !context.is_valid() {
                return;
            }

            self.with_subscriber(move |(id, subscriber)| {
                let Some(get_context) = subscriber.downcast_ref::<WithContext>() else {
                    return;
                };
                get_context.with_context(subscriber, id, &mut |dd_span| {
                    // NB Trace IDs can be 128-bit nowadays, but the 0.4 API still uses 64-bit.
                    dd_span.trace_id = context.trace_id as u64;
                    dd_span.parent_id = context.parent_id;
                })
            });
        }
    }

    #[cfg(test)]
    mod tests {
        use super::*;
        use crate::DataDogTraceLayer;
        use rand::random;
        use tracing::info_span;
        use tracing_subscriber::layer::SubscriberExt;

        /// Builds a header map containing only the given `traceparent` value.
        fn traceparent(value: &str) -> HeaderMap {
            HeaderMap::from_iter([(
                HeaderName::from_static("traceparent"),
                value.parse().unwrap(),
            )])
        }

        #[test]
        fn w3c_trace_header_round_trip() {
            // `| 1` keeps the IDs non-zero, since all-zero IDs are invalid.
            let context = DataDogContext {
                trace_id: random::<u128>() | 1,
                parent_id: random::<u64>() | 1,
            };

            let headers = context.to_w3c_headers();
            let parsed = DataDogContext::parse_w3c_headers(&headers).unwrap();

            assert_eq!(context.trace_id, parsed.trace_id);
            assert_eq!(context.parent_id, parsed.parent_id);
        }

        #[test]
        fn default_context_produces_no_headers() {
            assert!(DataDogContext::default().to_w3c_headers().is_empty());
        }

        #[test]
        fn invalid_traceparent_headers_are_rejected() {
            let valid = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
            let parsed = DataDogContext::parse_w3c_headers(&traceparent(valid)).unwrap();
            assert_eq!(parsed.trace_id, 0x4bf92f3577b34da6a3ce929d0e0e4736);
            assert_eq!(parsed.parent_id, 0x00f067aa0ba902b7);

            for invalid in [
                // All-zero trace ID.
                "00-00000000000000000000000000000000-00f067aa0ba902b7-01",
                // All-zero parent ID.
                "00-4bf92f3577b34da6a3ce929d0e0e4736-0000000000000000-01",
                // Unsupported version.
                "01-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
                // Trace ID one digit short.
                "00-4bf92f3577b34da6a3ce929d0e0e473-00f067aa0ba902b7-01",
                // A sign is not a hex digit.
                "00-+bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
                // Missing flags.
                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7",
            ] {
                assert!(
                    DataDogContext::parse_w3c_headers(&traceparent(invalid)).is_none(),
                    "{invalid}"
                );
            }
        }

        #[test]
        fn span_context_round_trip() {
            tracing::subscriber::with_default(
                tracing_subscriber::registry().with(
                    DataDogTraceLayer::builder()
                        .service("test-service")
                        .env("test")
                        .version("test-version")
                        .agent_address("localhost:8126")
                        .build()
                        .unwrap(),
                ),
                || {
                    let context = DataDogContext {
                        // Need to limit the size here as we only track 64-bit trace IDs.
                        trace_id: u128::from(random::<u64>() | 1),
                        parent_id: random::<u64>() | 1,
                    };

                    let span = info_span!("test");

                    span.set_context(context);
                    let result = span.get_context();

                    assert_eq!(context.trace_id, result.trace_id);
                    // Downstream services must see this span as their parent.
                    let span_id = span.id().expect("span should be enabled").into_u64();
                    assert_eq!(span_id, result.parent_id);
                },
            );
        }

        #[test]
        fn invalid_context_does_not_clobber_span() {
            tracing::subscriber::with_default(
                tracing_subscriber::registry().with(
                    DataDogTraceLayer::builder()
                        .service("test-service")
                        .env("test")
                        .version("test-version")
                        .agent_address("localhost:8126")
                        .build()
                        .unwrap(),
                ),
                || {
                    let span = info_span!("test");
                    let before = span.get_context();

                    span.set_context(DataDogContext::from_w3c_headers(&HeaderMap::new()));
                    let after = span.get_context();

                    assert_ne!(after.trace_id, 0);
                    assert_eq!(before.trace_id, after.trace_id);
                },
            );
        }
    }
}