Skip to main content

tracing_datadog/
export.rs

1mod span;
2
3use crate::span::Span as InternalSpan;
4#[cfg(feature = "ahash")]
5use ahash::AHashMap as HashMap;
6use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};
7use rmp_serde::Serializer as MpSerializer;
8use serde::Serialize;
9use span::Span as ExportSpan;
10#[cfg(not(feature = "ahash"))]
11use std::collections::HashMap;
12use std::{
13    ops::DerefMut,
14    sync::{Arc, Mutex, mpsc},
15    thread::sleep,
16    time::Duration,
17};
18
19const DATADOG_LANGUAGE_HEADER: HeaderName = HeaderName::from_static("datadog-meta-lang");
20const DATADOG_TRACER_VERSION_HEADER: HeaderName =
21    HeaderName::from_static("datadog-meta-tracer-version");
22const DATADOG_TRACE_COUNT_HEADER: HeaderName = HeaderName::from_static("x-datadog-trace-count");
23const DATADOG_CONTAINER_ID_HEADER: HeaderName = HeaderName::from_static("datadog-container-id");
24
25/// The different versions of the Datadog trace API.
26///
27/// This maps to <https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/api/version.go>.
28#[derive(Copy, Clone)]
29#[non_exhaustive]
30pub enum ApiVersion {
31    /// v0.4 sends all trace chunks as-is, as a big array.
32    ///
33    /// This is the default.
34    V04,
35}
36
37impl ApiVersion {
38    /// Returns the URL path for the given API version.
39    fn url_path(&self) -> &'static str {
40        match self {
41            Self::V04 => "/v0.4/traces",
42        }
43    }
44
45    /// Returns a function that produces a payload for the given API version.
46    fn serializer(&self) -> SerializerFn {
47        match self {
48            Self::V04 => v04_trace_api_payload,
49        }
50    }
51}
52
53pub(crate) fn exporter(
54    agent_address: String,
55    api_version: ApiVersion,
56    buffer: Arc<Mutex<Vec<InternalSpan>>>,
57    container_id: Option<HeaderValue>,
58    shutdown_signal: mpsc::Receiver<()>,
59) -> impl FnOnce() {
60    move || {
61        let url = format!("http://{}{}", agent_address, api_version.url_path());
62        let client = {
63            let mut default_headers = HeaderMap::new();
64
65            if let Some(container_id) = container_id {
66                default_headers.insert(DATADOG_CONTAINER_ID_HEADER, container_id);
67            };
68
69            reqwest::blocking::Client::builder()
70                .default_headers(default_headers)
71                .retry(reqwest::retry::for_host(agent_address).max_retries_per_request(2))
72                .build()
73                .expect("Failed to build reqwest client")
74        };
75        let mut spans = Vec::new();
76
77        while let Err(mpsc::TryRecvError::Empty) = shutdown_signal.try_recv() {
78            sleep(Duration::from_secs(1));
79
80            std::mem::swap(&mut spans, buffer.lock().unwrap().deref_mut());
81
82            if spans.is_empty() {
83                continue;
84            }
85
86            let (body, trace_count) = api_version.serializer()(&mut spans);
87
88            let _ = client
89                .post(&url)
90                .header(DATADOG_TRACER_VERSION_HEADER, env!("CARGO_PKG_VERSION"))
91                .header(DATADOG_LANGUAGE_HEADER, "rust")
92                .header(DATADOG_TRACE_COUNT_HEADER, trace_count)
93                .header(header::CONTENT_TYPE, "application/msgpack")
94                .body(body)
95                .send()
96                .inspect_err(|error| tracing::error!(?error, "Error exporting spans"));
97        }
98    }
99}
100
101/// Groups spans into trace chunks.
102fn group_traces(
103    spans: impl Iterator<Item = InternalSpan>,
104) -> impl Iterator<Item = Vec<ExportSpan>> {
105    let mut traces = HashMap::new();
106    spans.for_each(|span| {
107        traces
108            .entry(span.trace_id)
109            .or_insert_with(Vec::new)
110            .push(ExportSpan::from(span));
111    });
112    traces.into_values()
113}
114
115/// The type of a function that produces a payload for a given API version.
116type SerializerFn = fn(&mut Vec<InternalSpan>) -> (Vec<u8>, usize);
117
118/// Produces the payload for the v0.4 Datadog trace API.
119///
120/// Also returns the number of traces serialized.
121fn v04_trace_api_payload(spans: &mut Vec<InternalSpan>) -> (Vec<u8>, usize) {
122    let mut payload = vec![];
123
124    let trace_chunks = group_traces(spans.drain(..)).collect::<Vec<_>>();
125
126    let _ = trace_chunks
127        .serialize(&mut MpSerializer::new(&mut payload).with_struct_map())
128        .inspect_err(|error| tracing::error!(?error, "Error serializing spans"));
129
130    (payload, trace_chunks.len())
131}