// tracing_datadog/layer.rs

1use crate::export::ApiVersion;
2use crate::{
3    log::{FieldVisitor, Log},
4    span::{Span, SpanAttributeVisitor, SpanLink},
5};
6#[cfg(feature = "ahash")]
7use ahash::AHashMap as HashMap;
8use jiff::Zoned;
9use reqwest::header::HeaderValue;
10#[cfg(not(feature = "ahash"))]
11use std::collections::HashMap;
12use std::{
13    borrow::Cow,
14    fmt::{Display, Formatter},
15    marker::PhantomData,
16    sync::{Arc, Mutex, mpsc},
17    thread::spawn,
18    time::{SystemTime, UNIX_EPOCH},
19};
20use tracing_core::{
21    Event, Subscriber,
22    span::{Attributes, Id, Record},
23};
24use tracing_subscriber::{
25    Layer,
26    layer::Context,
27    registry::{LookupSpan, Scope},
28};
29
/// A [`Layer`] that sends traces to Datadog.
///
/// Construct one with [`DatadogTraceLayer::builder`]. Building the layer spawns a background
/// exporter thread that drains finished spans from a shared buffer; dropping the layer sends
/// that thread a shutdown signal. When logging is enabled on the builder, events are also
/// printed to stdout as JSON log lines.
///
/// ```
/// # use tracing_subscriber::prelude::*;
/// # use tracing_datadog::DatadogTraceLayer;
/// tracing_subscriber::registry()
///    .with(
///        DatadogTraceLayer::builder()
///            .service("my-service")
///            .env("production")
///            .version("git sha")
///            .agent_address("localhost:8126")
///            .build()
///            .expect("failed to build DatadogTraceLayer"),
///    )
///    .init();
/// ```
#[derive(Debug)]
pub struct DatadogTraceLayer<S> {
    /// Closed spans waiting for export. `on_close` pushes into it; a clone of this `Arc` is
    /// handed to the exporter thread when the layer is built.
    buffer: Arc<Mutex<Vec<Span>>>,
    /// Service name copied into every span created by `on_new_span`.
    service: String,
    /// Tags (`env`, `version`, `span.kind`, and user-supplied defaults) cloned into each new
    /// span's `meta`.
    default_tags: HashMap<Cow<'static, str>, String>,
    /// Whether `on_event` emits JSON log lines to stdout.
    logging_enabled: bool,
    /// Callback exposed through `downcast_raw` so other code can reach a span's Datadog data.
    with_context: crate::context::WithContext,
    /// Sender half of the exporter's shutdown channel; signalled from `Drop`.
    shutdown: mpsc::Sender<()>,
    /// Ties the layer to the subscriber type it downcasts to, without storing one.
    _registry: PhantomData<S>,
}
57
58impl<S> DatadogTraceLayer<S>
59where
60    S: Subscriber + for<'a> LookupSpan<'a>,
61{
62    /// Creates a builder to construct a [`DatadogTraceLayer`].
63    pub fn builder() -> DatadogTraceLayerBuilder<S> {
64        DatadogTraceLayerBuilder {
65            service: None,
66            default_tags: HashMap::from_iter([("span.kind".into(), "internal".to_string())]),
67            agent_address: None,
68            api_version: ApiVersion::V04,
69            container_id: None,
70            logging_enabled: false,
71            phantom_data: Default::default(),
72        }
73    }
74
75    fn get_context(dispatch: &tracing_core::Dispatch, id: &Id, f: &mut dyn FnMut(&mut Span)) {
76        let subscriber = dispatch
77            .downcast_ref::<S>()
78            .expect("Subscriber did not downcast to expected type, this is a bug");
79        let span = subscriber.span(id).expect("Span not found, this is a bug");
80
81        let mut extensions = span.extensions_mut();
82        if let Some(dd_span) = extensions.get_mut::<Span>() {
83            f(dd_span);
84        }
85    }
86}
87
88impl<S> Drop for DatadogTraceLayer<S> {
89    fn drop(&mut self) {
90        let _ = self.shutdown.send(());
91    }
92}
93
94impl<S> Layer<S> for DatadogTraceLayer<S>
95where
96    S: Subscriber + for<'a> LookupSpan<'a>,
97{
98    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
99        let span = ctx.span(id).expect("Span not found, this is a bug");
100        let mut extensions = span.extensions_mut();
101
102        let trace_id = span
103            .parent()
104            .map(|parent| {
105                parent
106                    .extensions()
107                    .get::<Span>()
108                    .expect("Parent span didn't have a DatadogSpan extension, this is a bug")
109                    .trace_id
110            })
111            .unwrap_or(rand::random_range(1..=u128::MAX));
112
113        debug_assert!(trace_id != 0, "Trace ID is zero, this is a bug");
114
115        let mut dd_span = Span {
116            name: span.name().to_string(),
117            service: self.service.clone(),
118            r#type: "custom".into(),
119            span_id: span.id().into_u64(),
120            start: epoch_ns(),
121            parent_id: span
122                .parent()
123                .map(|parent| parent.id().into_u64())
124                .unwrap_or_default(),
125            trace_id,
126            meta: self.default_tags.clone(),
127            metrics: {
128                let mut m = HashMap::new();
129                if span.parent().is_none() {
130                    // Special tag to mark the service entry span.
131                    m.insert("_dd.top_level".into(), 1.0);
132                    m.insert("_dd.agent_psr".into(), 1.0);
133                    m.insert("_dd.rule_psr".into(), 1.0);
134                    m.insert("_dd.limit_psr".into(), 1.0);
135                    m.insert("_sample_rate".into(), 1.0);
136                    m.insert("_dd.tracer_kr".into(), 1.0);
137                }
138                m.insert("_sampling_priority_v1".into(), 2.0);
139                m.insert("process_id".into(), std::process::id() as f64);
140                m
141            },
142            ..Default::default()
143        };
144
145        attrs.record(&mut SpanAttributeVisitor::new(&mut dd_span));
146
147        extensions.insert(dd_span);
148    }
149
150    fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
151        let span = ctx.span(id).expect("Span not found, this is a bug");
152        let mut extensions = span.extensions_mut();
153
154        if let Some(dd_span) = extensions.get_mut::<Span>() {
155            values.record(&mut SpanAttributeVisitor::new(dd_span));
156        }
157    }
158
159    fn on_follows_from(&self, id: &Id, follows: &Id, ctx: Context<'_, S>) {
160        let span = ctx.span(id).expect("Span not found, this is a bug");
161        let mut extensions = span.extensions_mut();
162
163        let Some(other_span) = ctx.span(follows) else {
164            // The other span might be filtered or closed, so we can't access it.
165            return;
166        };
167
168        if let Some(dd_span) = extensions.get_mut::<Span>()
169            && let Some(other_dd_span) = other_span.extensions().get::<Span>()
170        {
171            dd_span.span_links.push(SpanLink {
172                trace_id: other_dd_span.trace_id,
173                span_id: other_dd_span.span_id,
174            })
175        }
176    }
177
178    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
179        if !self.logging_enabled {
180            return;
181        }
182
183        let mut fields = {
184            let mut visitor = FieldVisitor::default();
185            event.record(&mut visitor);
186            visitor.finish()
187        };
188
189        fields.extend(
190            ctx.event_scope(event)
191                .into_iter()
192                .flat_map(Scope::from_root)
193                .flat_map(|span| match span.extensions().get::<Span>() {
194                    Some(dd_span) => dd_span.meta.clone(),
195                    None => panic!("Datadog Span extension not found, this is a bug"),
196                }),
197        );
198
199        let message = fields.remove("message").unwrap_or_default();
200
201        let trace_context = ctx.lookup_current().and_then(|span| {
202            span.extensions()
203                .get::<Span>()
204                .map(|dd_span| (dd_span.trace_id, dd_span.span_id))
205        });
206
207        let log = Log {
208            timestamp: Zoned::now().timestamp(),
209            level: event.metadata().level().to_owned(),
210            message,
211            trace_context,
212            fields,
213        };
214
215        let serialized = serde_json::to_string(&log).expect("Failed to serialize log");
216
217        println!("{serialized}");
218    }
219
220    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
221        let span = ctx.span(id).expect("Span not found, this is a bug");
222        let mut extensions = span.extensions_mut();
223
224        let now = epoch_ns();
225
226        match extensions.get_mut::<Span>() {
227            Some(dd_span) if dd_span.start == 0 => dd_span.start = now,
228            _ => {}
229        }
230    }
231
232    fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
233        let span = ctx.span(id).expect("Span not found, this is a bug");
234        let mut extensions = span.extensions_mut();
235
236        let now = epoch_ns();
237
238        if let Some(dd_span) = extensions.get_mut::<Span>() {
239            dd_span.duration = now - dd_span.start
240        }
241    }
242
243    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
244        let span = ctx.span(&id).expect("Span not found, this is a bug");
245        let mut extensions = span.extensions_mut();
246
247        if let Some(mut dd_span) = extensions.remove::<Span>() {
248            // Enable trace metrics for select span kinds.
249            if let Some("server" | "client" | "consumer" | "producer") =
250                dd_span.meta.get("span.kind").map(String::as_str)
251            {
252                dd_span.metrics.insert("_dd.measured".into(), 1.0);
253                dd_span.metrics.insert("_dd1.sr.eausr".into(), 1.0);
254            }
255
256            self.buffer.lock().unwrap().push(dd_span);
257        }
258    }
259
260    // SAFETY: This is safe because the `WithContext` function pointer is valid
261    // for the lifetime of `&self`.
262    unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> {
263        match id {
264            id if id == std::any::TypeId::of::<Self>() => Some(self as *const _ as *const ()),
265            id if id == std::any::TypeId::of::<crate::context::WithContext>() => {
266                Some(&self.with_context as *const _ as *const ())
267            }
268            _ => None,
269        }
270    }
271}
272
/// A builder for [`DatadogTraceLayer`].
///
/// Created with [`DatadogTraceLayer::builder`]. `service`, `env`, `version` and
/// `agent_address` must be set before [`DatadogTraceLayerBuilder::build`] succeeds.
pub struct DatadogTraceLayerBuilder<S> {
    /// Service name copied into every span; required.
    service: Option<String>,
    /// Tags copied into every span's `meta`. Starts with `span.kind`; `env` and `version`
    /// (both required) and user-supplied default tags are added here.
    default_tags: HashMap<Cow<'static, str>, String>,
    /// Datadog agent address handed to the exporter; required.
    agent_address: Option<String>,
    /// Agent trace API version handed to the exporter.
    api_version: ApiVersion,
    /// Optional container ID, validated as an HTTP header value in `build`.
    container_id: Option<String>,
    /// Whether the built layer prints JSON log lines for events.
    logging_enabled: bool,
    /// Carries the subscriber type parameter without storing a value of it.
    phantom_data: PhantomData<S>,
}
283
/// The reason a [`DatadogTraceLayer`] could not be built.
///
/// Wraps a static, human-readable message that is returned verbatim by `Display`.
#[derive(Debug)]
pub struct BuilderError(&'static str);

impl Display for BuilderError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let BuilderError(reason) = self;
        f.write_str(reason)
    }
}

/// `BuilderError` has no underlying source; the default `Error` methods suffice.
impl std::error::Error for BuilderError {}
295
296impl<S> DatadogTraceLayerBuilder<S>
297where
298    S: Subscriber + for<'a> LookupSpan<'a>,
299{
300    /// Sets the `service`. This is required.
301    pub fn service(mut self, service: impl Into<String>) -> Self {
302        self.service = Some(service.into());
303        self
304    }
305
306    /// Sets the `env`. This is required.
307    pub fn env(mut self, env: impl Into<String>) -> Self {
308        self.default_tags.insert("env".into(), env.into());
309        self
310    }
311
312    /// Sets the `version`. This is required.
313    pub fn version(mut self, version: impl Into<String>) -> Self {
314        self.default_tags.insert("version".into(), version.into());
315        self
316    }
317
318    /// Sets the `agent_address`. This is required.
319    pub fn agent_address(mut self, agent_address: impl Into<String>) -> Self {
320        self.agent_address = Some(agent_address.into());
321        self
322    }
323
324    /// Sets the Datadog trace agent API version. Defaults to [`ApiVersion::V04`].
325    pub fn api_version(mut self, api_version: ApiVersion) -> Self {
326        self.api_version = api_version;
327        self
328    }
329
330    /// Adds a fixed default tag to all spans.
331    ///
332    /// This can be used multiple times for several tags.
333    ///
334    /// Default tags are overridden by tags set explicitly on a span.
335    pub fn default_tag(
336        mut self,
337        key: impl Into<Cow<'static, str>>,
338        value: impl Into<String>,
339    ) -> Self {
340        let _ = self.default_tags.insert(key.into(), value.into());
341        self
342    }
343
344    /// Sets the container ID. This enables infrastructure metrics in APM for supported platforms.
345    pub fn container_id(mut self, container_id: impl Into<String>) -> Self {
346        self.container_id = Some(container_id.into());
347        self
348    }
349
350    /// Enables or disables structured logging with trace correlation to stdout.
351    /// Disabled by default.
352    pub fn enable_logs(mut self, enable_logs: bool) -> Self {
353        self.logging_enabled = enable_logs;
354        self
355    }
356
357    /// Consumes the builder to construct the tracing layer.
358    pub fn build(self) -> Result<DatadogTraceLayer<S>, BuilderError> {
359        let Some(service) = self.service else {
360            return Err(BuilderError("service is required"));
361        };
362        if !self.default_tags.contains_key("env") {
363            return Err(BuilderError("env is required"));
364        };
365        if !self.default_tags.contains_key("version") {
366            return Err(BuilderError("version is required"));
367        };
368        let Some(agent_address) = self.agent_address else {
369            return Err(BuilderError("agent_address is required"));
370        };
371        let container_id = match self.container_id {
372            Some(s) => Some(
373                s.parse::<HeaderValue>()
374                    .map_err(|_| BuilderError("Failed to parse container ID into header"))?,
375            ),
376            _ => None,
377        };
378
379        let buffer = Arc::new(Mutex::new(Vec::new()));
380        let (shutdown, shutdown_rx) = mpsc::channel();
381
382        spawn(crate::export::exporter(
383            agent_address,
384            self.api_version,
385            buffer.clone(),
386            container_id,
387            shutdown_rx,
388        ));
389
390        Ok(DatadogTraceLayer {
391            buffer,
392            service,
393            default_tags: self.default_tags,
394            logging_enabled: self.logging_enabled,
395            with_context: crate::context::WithContext(DatadogTraceLayer::<S>::get_context),
396            shutdown,
397            _registry: PhantomData,
398        })
399    }
400}
401
/// Returns the current system time as nanoseconds since the Unix epoch (1970-01-01 UTC).
///
/// The value saturates at `i64::MAX` instead of wrapping around to negative numbers once it
/// no longer fits in an `i64` (around the year 2262).
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn epoch_ns() -> i64 {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("SystemTime is before UNIX epoch")
        .as_nanos();
    // `as i64` would silently truncate the `u128`; saturate instead.
    i64::try_from(nanos).unwrap_or(i64::MAX)
}
409
#[cfg(test)]
mod tests {
    use super::*;

    type TestLayer = DatadogTraceLayer<tracing_subscriber::Registry>;
    type TestBuilder = DatadogTraceLayerBuilder<tracing_subscriber::Registry>;

    /// A builder with every required option filled in.
    fn complete_builder() -> TestBuilder {
        TestLayer::builder()
            .service("test-service")
            .env("test")
            .version("test-version")
            .agent_address("localhost:8126")
    }

    /// Builds `builder`, expecting failure, and returns the error's message.
    fn build_error(builder: TestBuilder) -> String {
        builder.build().unwrap_err().to_string()
    }

    #[test]
    fn builder_builds_successfully() {
        assert!(complete_builder().build().is_ok());
    }

    #[test]
    fn service_is_required() {
        let message = build_error(
            TestLayer::builder()
                .env("test")
                .version("test-version")
                .agent_address("localhost:8126"),
        );
        assert!(message.contains("service"));
    }

    #[test]
    fn env_is_required() {
        let message = build_error(
            TestLayer::builder()
                .service("test-service")
                .version("test-version")
                .agent_address("localhost:8126"),
        );
        assert!(message.contains("env"));
    }

    #[test]
    fn version_is_required() {
        let message = build_error(
            TestLayer::builder()
                .service("test-service")
                .env("test")
                .agent_address("localhost:8126"),
        );
        assert!(message.contains("version"));
    }

    #[test]
    fn agent_address_is_required() {
        let message = build_error(
            TestLayer::builder()
                .service("test-service")
                .env("test")
                .version("test-version"),
        );
        assert!(message.contains("agent_address"));
    }

    #[test]
    fn default_default_tags_include_env_and_version() {
        let layer = complete_builder().build().unwrap();
        let tags = &layer.default_tags;
        assert_eq!(tags["env"], "test");
        assert_eq!(tags["version"], "test-version");
    }

    #[test]
    fn default_tags_can_be_added() {
        let layer = complete_builder()
            .default_tag("static", "bar")
            .default_tag(String::from("dynamic"), "qux")
            .build()
            .unwrap();
        let tags = &layer.default_tags;
        assert_eq!(tags["static"], "bar");
        assert_eq!(tags["dynamic"], "qux");
    }
}