tracing_datadog/
layer.rs

1use crate::{
2    log::{FieldVisitor, Log},
3    span::{Span, SpanAttributeVisitor, SpanLink},
4};
5#[cfg(feature = "ahash")]
6use ahash::AHashMap as HashMap;
7use jiff::Zoned;
8use reqwest::header::HeaderValue;
9#[cfg(not(feature = "ahash"))]
10use std::collections::HashMap;
11use std::{
12    borrow::Cow,
13    fmt::{Display, Formatter},
14    marker::PhantomData,
15    sync::{Arc, Mutex, mpsc},
16    thread::spawn,
17    time::{SystemTime, UNIX_EPOCH},
18};
19use tracing_core::{
20    Event, Subscriber,
21    span::{Attributes, Id, Record},
22};
23use tracing_subscriber::{
24    Layer,
25    layer::Context,
26    registry::{LookupSpan, Scope},
27};
28
29/// A [`Layer`] that sends traces to Datadog.
30///
31/// ```
32/// # use tracing_subscriber::prelude::*;
33/// # use tracing_datadog::DatadogTraceLayer;
34/// tracing_subscriber::registry()
35///    .with(
36///        DatadogTraceLayer::builder()
37///            .service("my-service")
38///            .env("production")
39///            .version("git sha")
40///            .agent_address("localhost:8126")
41///            .build()
42///            .expect("failed to build DatadogTraceLayer"),
43///    )
44///    .init();
45/// ```
46#[derive(Debug)]
47pub struct DatadogTraceLayer<S> {
48    buffer: Arc<Mutex<Vec<Span>>>,
49    service: String,
50    default_tags: HashMap<Cow<'static, str>, String>,
51    logging_enabled: bool,
52    with_context: crate::context::WithContext,
53    shutdown: mpsc::Sender<()>,
54    _registry: PhantomData<S>,
55}
56
57impl<S> DatadogTraceLayer<S>
58where
59    S: Subscriber + for<'a> LookupSpan<'a>,
60{
61    /// Creates a builder to construct a [`DatadogTraceLayer`].
62    pub fn builder() -> DatadogTraceLayerBuilder<S> {
63        DatadogTraceLayerBuilder {
64            service: None,
65            default_tags: HashMap::from_iter([("span.kind".into(), "internal".to_string())]),
66            agent_address: None,
67            container_id: None,
68            logging_enabled: false,
69            phantom_data: Default::default(),
70        }
71    }
72
73    fn get_context(dispatch: &tracing_core::Dispatch, id: &Id, f: &mut dyn FnMut(&mut Span)) {
74        let subscriber = dispatch
75            .downcast_ref::<S>()
76            .expect("Subscriber did not downcast to expected type, this is a bug");
77        let span = subscriber.span(id).expect("Span not found, this is a bug");
78
79        let mut extensions = span.extensions_mut();
80        if let Some(dd_span) = extensions.get_mut::<Span>() {
81            f(dd_span);
82        }
83    }
84}
85
86impl<S> Drop for DatadogTraceLayer<S> {
87    fn drop(&mut self) {
88        let _ = self.shutdown.send(());
89    }
90}
91
92impl<S> Layer<S> for DatadogTraceLayer<S>
93where
94    S: Subscriber + for<'a> LookupSpan<'a>,
95{
96    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
97        let span = ctx.span(id).expect("Span not found, this is a bug");
98        let mut extensions = span.extensions_mut();
99
100        let trace_id = span
101            .parent()
102            .map(|parent| {
103                parent
104                    .extensions()
105                    .get::<Span>()
106                    .expect("Parent span didn't have a DatadogSpan extension, this is a bug")
107                    .trace_id
108            })
109            .unwrap_or(rand::random_range(1..=u128::MAX));
110
111        debug_assert!(trace_id != 0, "Trace ID is zero, this is a bug");
112
113        let mut dd_span = Span {
114            name: span.name().to_string(),
115            service: self.service.clone(),
116            r#type: "custom".into(),
117            span_id: span.id().into_u64(),
118            start: epoch_ns(),
119            parent_id: span
120                .parent()
121                .map(|parent| parent.id().into_u64())
122                .unwrap_or_default(),
123            trace_id,
124            meta: self.default_tags.clone(),
125            metrics: {
126                let mut m = HashMap::new();
127                if span.parent().is_none() {
128                    // Special tag to mark the service entry span.
129                    m.insert("_dd.top_level", 1.0);
130                    m.insert("_dd.agent_psr", 1.0);
131                    m.insert("_dd.rule_psr", 1.0);
132                    m.insert("_dd.limit_psr", 1.0);
133                    m.insert("_sample_rate", 1.0);
134                    m.insert("_dd.tracer_kr", 1.0);
135                }
136                m.insert("_sampling_priority_v1", 2.0);
137                m.insert("process_id", std::process::id() as f64);
138                m
139            },
140            ..Default::default()
141        };
142
143        attrs.record(&mut SpanAttributeVisitor::new(&mut dd_span));
144
145        extensions.insert(dd_span);
146    }
147
148    fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
149        let span = ctx.span(id).expect("Span not found, this is a bug");
150        let mut extensions = span.extensions_mut();
151
152        if let Some(dd_span) = extensions.get_mut::<Span>() {
153            values.record(&mut SpanAttributeVisitor::new(dd_span));
154        }
155    }
156
157    fn on_follows_from(&self, id: &Id, follows: &Id, ctx: Context<'_, S>) {
158        let span = ctx.span(id).expect("Span not found, this is a bug");
159        let mut extensions = span.extensions_mut();
160
161        let Some(other_span) = ctx.span(follows) else {
162            // The other span might be filtered or closed, so we can't access it.
163            return;
164        };
165
166        if let Some(dd_span) = extensions.get_mut::<Span>()
167            && let Some(other_dd_span) = other_span.extensions().get::<Span>()
168        {
169            dd_span.span_links.push(SpanLink {
170                trace_id: other_dd_span.trace_id,
171                span_id: other_dd_span.span_id,
172            })
173        }
174    }
175
176    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
177        if !self.logging_enabled {
178            return;
179        }
180
181        let mut fields = {
182            let mut visitor = FieldVisitor::default();
183            event.record(&mut visitor);
184            visitor.finish()
185        };
186
187        fields.extend(
188            ctx.event_scope(event)
189                .into_iter()
190                .flat_map(Scope::from_root)
191                .flat_map(|span| match span.extensions().get::<Span>() {
192                    Some(dd_span) => dd_span.meta.clone(),
193                    None => panic!("Datadog Span extension not found, this is a bug"),
194                }),
195        );
196
197        let message = fields.remove("message").unwrap_or_default();
198
199        let trace_context = ctx.lookup_current().and_then(|span| {
200            span.extensions()
201                .get::<Span>()
202                .map(|dd_span| (dd_span.trace_id, dd_span.span_id))
203        });
204
205        let log = Log {
206            timestamp: Zoned::now().timestamp(),
207            level: event.metadata().level().to_owned(),
208            message,
209            trace_context,
210            fields,
211        };
212
213        let serialized = serde_json::to_string(&log).expect("Failed to serialize log");
214
215        println!("{serialized}");
216    }
217
218    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
219        let span = ctx.span(id).expect("Span not found, this is a bug");
220        let mut extensions = span.extensions_mut();
221
222        let now = epoch_ns();
223
224        match extensions.get_mut::<Span>() {
225            Some(dd_span) if dd_span.start == 0 => dd_span.start = now,
226            _ => {}
227        }
228    }
229
230    fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
231        let span = ctx.span(id).expect("Span not found, this is a bug");
232        let mut extensions = span.extensions_mut();
233
234        let now = epoch_ns();
235
236        if let Some(dd_span) = extensions.get_mut::<Span>() {
237            dd_span.duration = now - dd_span.start
238        }
239    }
240
241    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
242        let span = ctx.span(&id).expect("Span not found, this is a bug");
243        let mut extensions = span.extensions_mut();
244
245        if let Some(mut dd_span) = extensions.remove::<Span>() {
246            // Enable trace metrics for select span kinds.
247            if let Some("server" | "client" | "consumer" | "producer") =
248                dd_span.meta.get("span.kind").map(String::as_str)
249            {
250                dd_span.metrics.insert("_dd.measured", 1.0);
251                dd_span.metrics.insert("_dd1.sr.eausr", 1.0);
252            }
253
254            self.buffer.lock().unwrap().push(dd_span);
255        }
256    }
257
258    // SAFETY: This is safe because the `WithContext` function pointer is valid
259    // for the lifetime of `&self`.
260    unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> {
261        match id {
262            id if id == std::any::TypeId::of::<Self>() => Some(self as *const _ as *const ()),
263            id if id == std::any::TypeId::of::<crate::context::WithContext>() => {
264                Some(&self.with_context as *const _ as *const ())
265            }
266            _ => None,
267        }
268    }
269}
270
/// A builder for [`DatadogTraceLayer`].
#[derive(Debug)]
pub struct DatadogTraceLayerBuilder<S> {
    /// Service name; required by `build`.
    service: Option<String>,
    /// Tags cloned onto every span, including `env`, `version` and `span.kind`.
    default_tags: HashMap<Cow<'static, str>, String>,
    /// Address handed to the exporter thread; required by `build`.
    agent_address: Option<String>,
    /// Optional container ID, validated as an HTTP header value in `build`.
    container_id: Option<String>,
    /// Whether the built layer emits JSON logs for events.
    logging_enabled: bool,
    /// Ties the builder to its subscriber type without affecting `Send`/`Sync`.
    phantom_data: PhantomData<fn(S)>,
}
280
/// An error that can occur when building a [`DatadogTraceLayer`].
///
/// The wrapped message names the missing or invalid builder setting and is
/// exactly what `Display` prints.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BuilderError(&'static str);

impl Display for BuilderError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.0)
    }
}

/// `BuilderError` has no underlying cause, so the default `source()` (`None`) is kept.
impl std::error::Error for BuilderError {}
292
293impl<S> DatadogTraceLayerBuilder<S>
294where
295    S: Subscriber + for<'a> LookupSpan<'a>,
296{
297    /// Sets the `service`. This is required.
298    pub fn service(mut self, service: impl Into<String>) -> Self {
299        self.service = Some(service.into());
300        self
301    }
302
303    /// Sets the `env`. This is required.
304    pub fn env(mut self, env: impl Into<String>) -> Self {
305        self.default_tags.insert("env".into(), env.into());
306        self
307    }
308
309    /// Sets the `version`. This is required.
310    pub fn version(mut self, version: impl Into<String>) -> Self {
311        self.default_tags.insert("version".into(), version.into());
312        self
313    }
314
315    /// Sets the `agent_address`. This is required.
316    pub fn agent_address(mut self, agent_address: impl Into<String>) -> Self {
317        self.agent_address = Some(agent_address.into());
318        self
319    }
320
321    /// Adds a fixed default tag to all spans.
322    ///
323    /// This can be used multiple times for several tags.
324    ///
325    /// Default tags are overridden by tags set explicitly on a span.
326    pub fn default_tag(
327        mut self,
328        key: impl Into<Cow<'static, str>>,
329        value: impl Into<String>,
330    ) -> Self {
331        let _ = self.default_tags.insert(key.into(), value.into());
332        self
333    }
334
335    /// Sets the container ID. This enables infrastructure metrics in APM for supported platforms.
336    pub fn container_id(mut self, container_id: impl Into<String>) -> Self {
337        self.container_id = Some(container_id.into());
338        self
339    }
340
341    /// Enables or disables structured logging with trace correlation to stdout.
342    /// Disabled by default.
343    pub fn enable_logs(mut self, enable_logs: bool) -> Self {
344        self.logging_enabled = enable_logs;
345        self
346    }
347
348    /// Consumes the builder to construct the tracing layer.
349    pub fn build(self) -> Result<DatadogTraceLayer<S>, BuilderError> {
350        let Some(service) = self.service else {
351            return Err(BuilderError("service is required"));
352        };
353        if !self.default_tags.contains_key("env") {
354            return Err(BuilderError("env is required"));
355        };
356        if !self.default_tags.contains_key("version") {
357            return Err(BuilderError("version is required"));
358        };
359        let Some(agent_address) = self.agent_address else {
360            return Err(BuilderError("agent_address is required"));
361        };
362        let container_id = match self.container_id {
363            Some(s) => Some(
364                s.parse::<HeaderValue>()
365                    .map_err(|_| BuilderError("Failed to parse container ID into header"))?,
366            ),
367            _ => None,
368        };
369
370        let buffer = Arc::new(Mutex::new(Vec::new()));
371        let (shutdown, shutdown_rx) = mpsc::channel();
372
373        spawn(crate::export::exporter(
374            agent_address,
375            buffer.clone(),
376            container_id,
377            shutdown_rx,
378        ));
379
380        Ok(DatadogTraceLayer {
381            buffer,
382            service,
383            default_tags: self.default_tags,
384            logging_enabled: self.logging_enabled,
385            with_context: crate::context::WithContext(DatadogTraceLayer::<S>::get_context),
386            shutdown,
387            _registry: PhantomData,
388        })
389    }
390}
391
/// Returns the current system time as nanoseconds since the Unix epoch (1970-01-01 UTC).
///
/// Times too large for an `i64` (roughly after the year 2262) saturate to
/// `i64::MAX` instead of silently wrapping to a negative timestamp.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn epoch_ns() -> i64 {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("SystemTime is before UNIX epoch")
        .as_nanos();
    i64::try_from(nanos).unwrap_or(i64::MAX)
}
399
#[cfg(test)]
mod tests {
    use super::*;

    /// Subscriber type the layer is instantiated for in every test.
    type TestSubscriber = tracing_subscriber::Registry;

    /// A builder carrying nothing but the built-in defaults.
    fn builder() -> DatadogTraceLayerBuilder<TestSubscriber> {
        DatadogTraceLayer::<TestSubscriber>::builder()
    }

    /// Builds from a builder that is expected to fail and returns the error text.
    fn build_error(builder: DatadogTraceLayerBuilder<TestSubscriber>) -> String {
        builder.build().unwrap_err().to_string()
    }

    #[test]
    fn builder_builds_successfully() {
        let outcome = builder()
            .service("test-service")
            .env("test")
            .version("test-version")
            .agent_address("localhost:8126")
            .build();
        assert!(outcome.is_ok());
    }

    #[test]
    fn service_is_required() {
        let error = build_error(
            builder()
                .env("test")
                .version("test-version")
                .agent_address("localhost:8126"),
        );
        assert!(error.contains("service"));
    }

    #[test]
    fn env_is_required() {
        let error = build_error(
            builder()
                .service("test-service")
                .version("test-version")
                .agent_address("localhost:8126"),
        );
        assert!(error.contains("env"));
    }

    #[test]
    fn version_is_required() {
        let error = build_error(
            builder()
                .service("test-service")
                .env("test")
                .agent_address("localhost:8126"),
        );
        assert!(error.contains("version"));
    }

    #[test]
    fn agent_address_is_required() {
        let error = build_error(
            builder()
                .service("test-service")
                .env("test")
                .version("test-version"),
        );
        assert!(error.contains("agent_address"));
    }

    #[test]
    fn default_default_tags_include_env_and_version() {
        let layer = builder()
            .service("test-service")
            .env("test")
            .version("test-version")
            .agent_address("localhost:8126")
            .build()
            .unwrap();
        let tags = &layer.default_tags;
        assert_eq!(tags["env"], "test");
        assert_eq!(tags["version"], "test-version");
    }

    #[test]
    fn default_tags_can_be_added() {
        let layer = builder()
            .service("test-service")
            .env("test")
            .version("test-version")
            .agent_address("localhost:8126")
            .default_tag("static", "bar")
            .default_tag(String::from("dynamic"), "qux")
            .build()
            .unwrap();
        let tags = &layer.default_tags;
        assert_eq!(tags["static"], "bar");
        assert_eq!(tags["dynamic"], "qux");
    }
}