tracing_datadog/
layer.rs

1use crate::{
2    log::{DatadogLog, FieldVisitor},
3    span::{DatadogSpan, SpanAttributeVisitor, SpanLink},
4};
5use jiff::Zoned;
6use reqwest::header::HeaderValue;
7use std::{
8    borrow::Cow,
9    collections::HashMap,
10    fmt::{Display, Formatter},
11    marker::PhantomData,
12    sync::{Arc, Mutex, mpsc},
13    thread::spawn,
14    time::{SystemTime, UNIX_EPOCH},
15};
16use tracing_core::{
17    Event, Subscriber,
18    span::{Attributes, Id, Record},
19};
20use tracing_subscriber::{
21    Layer,
22    layer::Context,
23    registry::{LookupSpan, Scope},
24};
25
/// A [`Layer`] that sends traces to Datadog.
///
/// ```
/// # use tracing_subscriber::prelude::*;
/// # use tracing_datadog::DatadogTraceLayer;
/// tracing_subscriber::registry()
///    .with(
///        DatadogTraceLayer::builder()
///            .service("my-service")
///            .env("production")
///            .version("git sha")
///            .agent_address("localhost:8126")
///            .build()
///            .expect("failed to build DatadogTraceLayer"),
///    )
///    .init();
/// ```
#[derive(Debug)]
pub struct DatadogTraceLayer<S> {
    /// Closed spans waiting to be exported. The same `Arc` is handed to the
    /// exporter that is spawned when the layer is built.
    buffer: Arc<Mutex<Vec<DatadogSpan>>>,
    /// Service name copied onto every span created by this layer.
    service: String,
    /// Tags every span starts out with (used as the span's initial `meta`).
    default_tags: HashMap<Cow<'static, str>, String>,
    /// When `true`, events are serialized as JSON logs and printed to stdout.
    logging_enabled: bool,
    /// Handle returned from `downcast_raw` when the `http` feature is enabled.
    // NOTE(review): presumably lets the http module reach a span's `DatadogSpan`
    // through the dispatcher; confirm against `crate::http`.
    #[cfg(feature = "http")]
    with_context: crate::http::WithContext,
    /// Signalled once when the layer is dropped; the receiving end is given to
    /// the exporter at build time.
    shutdown: mpsc::Sender<()>,
    /// Ties the layer to the subscriber type `S` without storing a value of it.
    _registry: PhantomData<S>,
}
54
55impl<S> DatadogTraceLayer<S>
56where
57    S: Subscriber + for<'a> LookupSpan<'a>,
58{
59    /// Creates a builder to construct a [`DatadogTraceLayer`].
60    pub fn builder() -> DatadogTraceLayerBuilder<S> {
61        DatadogTraceLayerBuilder {
62            service: None,
63            default_tags: HashMap::from_iter([("span.kind".into(), "internal".to_string())]),
64            agent_address: None,
65            container_id: None,
66            logging_enabled: false,
67            phantom_data: Default::default(),
68        }
69    }
70
71    #[cfg(feature = "http")]
72    fn get_context(
73        dispatch: &tracing_core::Dispatch,
74        id: &Id,
75        f: &mut dyn FnMut(&mut DatadogSpan),
76    ) {
77        let subscriber = dispatch
78            .downcast_ref::<S>()
79            .expect("Subscriber did not downcast to expected type, this is a bug");
80        let span = subscriber.span(id).expect("Span not found, this is a bug");
81
82        let mut extensions = span.extensions_mut();
83        if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
84            f(dd_span);
85        }
86    }
87}
88
89impl<S> Drop for DatadogTraceLayer<S> {
90    fn drop(&mut self) {
91        let _ = self.shutdown.send(());
92    }
93}
94
95impl<S> Layer<S> for DatadogTraceLayer<S>
96where
97    S: Subscriber + for<'a> LookupSpan<'a>,
98{
99    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
100        let span = ctx.span(id).expect("Span not found, this is a bug");
101        let mut extensions = span.extensions_mut();
102
103        let trace_id = span
104            .parent()
105            .map(|parent| {
106                parent
107                    .extensions()
108                    .get::<DatadogSpan>()
109                    .expect("Parent span didn't have a DatadogSpan extension, this is a bug")
110                    .trace_id
111            })
112            .unwrap_or(rand::random_range(1..=u64::MAX));
113
114        debug_assert!(trace_id != 0, "Trace ID is zero, this is a bug");
115
116        let mut dd_span = DatadogSpan {
117            name: span.name().to_string(),
118            service: self.service.clone(),
119            r#type: "custom".into(),
120            span_id: span.id().into_u64(),
121            start: epoch_ns(),
122            parent_id: span
123                .parent()
124                .map(|parent| parent.id().into_u64())
125                .unwrap_or_default(),
126            trace_id,
127            meta: self.default_tags.clone(),
128            metrics: {
129                let mut m = HashMap::new();
130                if span.parent().is_none() {
131                    // Special tag to mark the service entry span.
132                    m.insert("_dd.top_level", 1.0);
133                    m.insert("_dd.agent_psr", 1.0);
134                    m.insert("_dd.rule_psr", 1.0);
135                    m.insert("_dd.limit_psr", 1.0);
136                    m.insert("_sample_rate", 1.0);
137                    m.insert("_dd.tracer_kr", 1.0);
138                }
139                m.insert("_sampling_priority_v1", 2.0);
140                m.insert("process_id", std::process::id() as f64);
141                m
142            },
143            ..Default::default()
144        };
145
146        attrs.record(&mut SpanAttributeVisitor::new(&mut dd_span));
147
148        extensions.insert(dd_span);
149    }
150
151    fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
152        let span = ctx.span(id).expect("Span not found, this is a bug");
153        let mut extensions = span.extensions_mut();
154
155        if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
156            values.record(&mut SpanAttributeVisitor::new(dd_span));
157        }
158    }
159
160    fn on_follows_from(&self, id: &Id, follows: &Id, ctx: Context<'_, S>) {
161        let span = ctx.span(id).expect("Span not found, this is a bug");
162        let mut extensions = span.extensions_mut();
163
164        let Some(other_span) = ctx.span(follows) else {
165            // The other span might be filtered or closed, so we can't access it.
166            return;
167        };
168
169        if let Some(dd_span) = extensions.get_mut::<DatadogSpan>()
170            && let Some(other_dd_span) = other_span.extensions().get::<DatadogSpan>()
171        {
172            dd_span.span_links.push(SpanLink {
173                trace_id: other_dd_span.trace_id,
174                span_id: other_dd_span.span_id,
175            })
176        }
177    }
178
179    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
180        if !self.logging_enabled {
181            return;
182        }
183
184        let mut fields = {
185            let mut visitor = FieldVisitor::default();
186            event.record(&mut visitor);
187            visitor.finish()
188        };
189
190        fields.extend(
191            ctx.event_scope(event)
192                .into_iter()
193                .flat_map(Scope::from_root)
194                .flat_map(|span| match span.extensions().get::<DatadogSpan>() {
195                    Some(dd_span) => dd_span.meta.clone(),
196                    None => panic!("DatadogSpan extension not found, this is a bug"),
197                }),
198        );
199
200        let message = fields.remove("message").unwrap_or_default();
201
202        let (trace_id, span_id) = ctx
203            .lookup_current()
204            .and_then(|span| {
205                span.extensions()
206                    .get::<DatadogSpan>()
207                    .map(|dd_span| (Some(dd_span.trace_id), Some(dd_span.span_id)))
208            })
209            .unwrap_or_default();
210
211        let log = DatadogLog {
212            timestamp: Zoned::now().timestamp(),
213            level: event.metadata().level().to_owned(),
214            message,
215            trace_id,
216            span_id,
217            fields,
218        };
219
220        let serialized = serde_json::to_string(&log).expect("Failed to serialize log");
221
222        println!("{serialized}");
223    }
224
225    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
226        let span = ctx.span(id).expect("Span not found, this is a bug");
227        let mut extensions = span.extensions_mut();
228
229        let now = epoch_ns();
230
231        match extensions.get_mut::<DatadogSpan>() {
232            Some(dd_span) if dd_span.start == 0 => dd_span.start = now,
233            _ => {}
234        }
235    }
236
237    fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
238        let span = ctx.span(id).expect("Span not found, this is a bug");
239        let mut extensions = span.extensions_mut();
240
241        let now = epoch_ns();
242
243        if let Some(dd_span) = extensions.get_mut::<DatadogSpan>() {
244            dd_span.duration = now - dd_span.start
245        }
246    }
247
248    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
249        let span = ctx.span(&id).expect("Span not found, this is a bug");
250        let mut extensions = span.extensions_mut();
251
252        if let Some(mut dd_span) = extensions.remove::<DatadogSpan>() {
253            // Enable trace metrics for select span kinds.
254            if let Some("server" | "client" | "consumer" | "producer") =
255                dd_span.meta.get("span.kind").map(String::as_str)
256            {
257                dd_span.metrics.insert("_dd.measured", 1.0);
258                dd_span.metrics.insert("_dd1.sr.eausr", 1.0);
259            }
260
261            self.buffer.lock().unwrap().push(dd_span);
262        }
263    }
264
265    // SAFETY: This is safe because the `WithContext` function pointer is valid
266    // for the lifetime of `&self`.
267    #[cfg(feature = "http")]
268    unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> {
269        match id {
270            id if id == std::any::TypeId::of::<Self>() => Some(self as *const _ as *const ()),
271            id if id == std::any::TypeId::of::<crate::http::WithContext>() => {
272                Some(&self.with_context as *const _ as *const ())
273            }
274            _ => None,
275        }
276    }
277}
278
/// A builder for [`DatadogTraceLayer`].
///
/// Obtained from `DatadogTraceLayer::builder()`. Each setter consumes and
/// returns the builder, and `build()` validates the required settings.
#[derive(Debug)]
pub struct DatadogTraceLayerBuilder<S> {
    /// Service name; required by `build()`.
    service: Option<String>,
    /// Tags applied to every span. `env` and `version` are stored here and
    /// must both be present when `build()` is called.
    default_tags: HashMap<Cow<'static, str>, String>,
    /// Address of the Datadog agent; required by `build()`.
    agent_address: Option<String>,
    /// Optional container ID, converted to a header value by `build()`.
    container_id: Option<String>,
    /// Whether events are printed to stdout as structured logs.
    logging_enabled: bool,
    /// Carries the subscriber type `S` through to the built layer.
    phantom_data: PhantomData<S>,
}
288
/// The reason a [`DatadogTraceLayer`] could not be built.
///
/// Wraps a static, human-readable message that is used verbatim as the
/// [`Display`] output.
#[derive(Debug)]
pub struct BuilderError(&'static str);

impl Display for BuilderError {
    /// Writes the wrapped message as-is, ignoring width and fill options.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        f.write_str(message)
    }
}

impl std::error::Error for BuilderError {}
300
301impl<S> DatadogTraceLayerBuilder<S>
302where
303    S: Subscriber + for<'a> LookupSpan<'a>,
304{
305    /// Sets the `service`. This is required.
306    pub fn service(mut self, service: impl Into<String>) -> Self {
307        self.service = Some(service.into());
308        self
309    }
310
311    /// Sets the `env`. This is required.
312    pub fn env(mut self, env: impl Into<String>) -> Self {
313        self.default_tags.insert("env".into(), env.into());
314        self
315    }
316
317    /// Sets the `version`. This is required.
318    pub fn version(mut self, version: impl Into<String>) -> Self {
319        self.default_tags.insert("version".into(), version.into());
320        self
321    }
322
323    /// Sets the `agent_address`. This is required.
324    pub fn agent_address(mut self, agent_address: impl Into<String>) -> Self {
325        self.agent_address = Some(agent_address.into());
326        self
327    }
328
329    /// Adds a fixed default tag to all spans.
330    ///
331    /// This can be used multiple times for several tags.
332    ///
333    /// Default tags are overridden by tags set explicitly on a span.
334    pub fn default_tag(
335        mut self,
336        key: impl Into<Cow<'static, str>>,
337        value: impl Into<String>,
338    ) -> Self {
339        let _ = self.default_tags.insert(key.into(), value.into());
340        self
341    }
342
343    /// Sets the container ID. This enables infrastructure metrics in APM for supported platforms.
344    pub fn container_id(mut self, container_id: impl Into<String>) -> Self {
345        self.container_id = Some(container_id.into());
346        self
347    }
348
349    /// Enables or disables structured logging with trace correlation to stdout.
350    /// Disabled by default.
351    pub fn enable_logs(mut self, enable_logs: bool) -> Self {
352        self.logging_enabled = enable_logs;
353        self
354    }
355
356    /// Consumes the builder to construct the tracing layer.
357    pub fn build(self) -> Result<DatadogTraceLayer<S>, BuilderError> {
358        let Some(service) = self.service else {
359            return Err(BuilderError("service is required"));
360        };
361        if !self.default_tags.contains_key("env") {
362            return Err(BuilderError("env is required"));
363        };
364        if !self.default_tags.contains_key("version") {
365            return Err(BuilderError("version is required"));
366        };
367        let Some(agent_address) = self.agent_address else {
368            return Err(BuilderError("agent_address is required"));
369        };
370        let container_id = match self.container_id {
371            Some(s) => Some(
372                s.parse::<HeaderValue>()
373                    .map_err(|_| BuilderError("Failed to parse container ID into header"))?,
374            ),
375            _ => None,
376        };
377
378        let buffer = Arc::new(Mutex::new(Vec::new()));
379        let (shutdown, shutdown_rx) = mpsc::channel();
380
381        spawn(crate::export::exporter(
382            agent_address,
383            buffer.clone(),
384            container_id,
385            shutdown_rx,
386        ));
387
388        Ok(DatadogTraceLayer {
389            buffer,
390            service,
391            default_tags: self.default_tags,
392            logging_enabled: self.logging_enabled,
393            #[cfg(feature = "http")]
394            with_context: crate::http::WithContext(DatadogTraceLayer::<S>::get_context),
395            shutdown,
396            _registry: PhantomData,
397        })
398    }
399}
400
/// Returns the current wall-clock time as nanoseconds since the Unix epoch
/// (1970-01-01T00:00:00Z).
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn epoch_ns() -> i64 {
    let now = SystemTime::now();
    let since_epoch = now
        .duration_since(UNIX_EPOCH)
        .expect("SystemTime is before UNIX epoch");

    // `as_nanos` yields a `u128`; the value fits in an `i64` until the year 2262.
    since_epoch.as_nanos() as i64
}
408
#[cfg(test)]
mod tests {
    use super::*;

    type TestLayer = DatadogTraceLayer<tracing_subscriber::Registry>;
    type TestBuilder = DatadogTraceLayerBuilder<tracing_subscriber::Registry>;

    /// A builder with every required setting filled in.
    fn complete_builder() -> TestBuilder {
        TestLayer::builder()
            .service("test-service")
            .env("test")
            .version("test-version")
            .agent_address("localhost:8126")
    }

    /// Builds `builder`, which is expected to fail, and returns the error text.
    fn build_error(builder: TestBuilder) -> String {
        builder.build().unwrap_err().to_string()
    }

    #[test]
    fn builder_builds_successfully() {
        assert!(complete_builder().build().is_ok());
    }

    #[test]
    fn service_is_required() {
        let builder = TestLayer::builder()
            .env("test")
            .version("test-version")
            .agent_address("localhost:8126");
        assert!(build_error(builder).contains("service"));
    }

    #[test]
    fn env_is_required() {
        let builder = TestLayer::builder()
            .service("test-service")
            .version("test-version")
            .agent_address("localhost:8126");
        assert!(build_error(builder).contains("env"));
    }

    #[test]
    fn version_is_required() {
        let builder = TestLayer::builder()
            .service("test-service")
            .env("test")
            .agent_address("localhost:8126");
        assert!(build_error(builder).contains("version"));
    }

    #[test]
    fn agent_address_is_required() {
        let builder = TestLayer::builder()
            .service("test-service")
            .env("test")
            .version("test-version");
        assert!(build_error(builder).contains("agent_address"));
    }

    #[test]
    fn default_default_tags_include_env_and_version() {
        let layer = complete_builder().build().unwrap();
        let tags = &layer.default_tags;
        assert_eq!(tags["env"], "test");
        assert_eq!(tags["version"], "test-version");
    }

    #[test]
    fn default_tags_can_be_added() {
        let layer = complete_builder()
            .default_tag("static", "bar")
            .default_tag(String::from("dynamic"), "qux")
            .build()
            .unwrap();
        let tags = &layer.default_tags;
        assert_eq!(tags["static"], "bar");
        assert_eq!(tags["dynamic"], "qux");
    }
}
495}