tracing_datadog/
export.rs1mod span;
2
3use crate::span::Span as InternalSpan;
4#[cfg(feature = "ahash")]
5use ahash::AHashMap as HashMap;
6use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};
7use rmp_serde::Serializer as MpSerializer;
8use serde::Serialize;
9use span::Span as ExportSpan;
10#[cfg(not(feature = "ahash"))]
11use std::collections::HashMap;
12use std::{
13 ops::DerefMut,
14 sync::{Arc, Mutex, mpsc},
15 thread::sleep,
16 time::Duration,
17};
18
19const DATADOG_LANGUAGE_HEADER: HeaderName = HeaderName::from_static("datadog-meta-lang");
20const DATADOG_TRACER_VERSION_HEADER: HeaderName =
21 HeaderName::from_static("datadog-meta-tracer-version");
22const DATADOG_TRACE_COUNT_HEADER: HeaderName = HeaderName::from_static("x-datadog-trace-count");
23const DATADOG_CONTAINER_ID_HEADER: HeaderName = HeaderName::from_static("datadog-container-id");
24
/// Version of the Datadog agent trace API that the exporter talks to.
///
/// The enum is `#[non_exhaustive]`, so further versions can be added without
/// breaking downstream `match` expressions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ApiVersion {
    /// The `v0.4` trace API, served by the agent under `/v0.4/traces`.
    V04,
}
36
37impl ApiVersion {
38 fn url_path(&self) -> &'static str {
40 match self {
41 Self::V04 => "/v0.4/traces",
42 }
43 }
44
45 fn serializer(&self) -> SerializerFn {
47 match self {
48 Self::V04 => v04_trace_api_payload,
49 }
50 }
51}
52
53pub(crate) fn exporter(
54 agent_address: String,
55 api_version: ApiVersion,
56 buffer: Arc<Mutex<Vec<InternalSpan>>>,
57 container_id: Option<HeaderValue>,
58 shutdown_signal: mpsc::Receiver<()>,
59) -> impl FnOnce() {
60 move || {
61 let url = format!("http://{}{}", agent_address, api_version.url_path());
62 let client = {
63 let mut default_headers = HeaderMap::new();
64
65 if let Some(container_id) = container_id {
66 default_headers.insert(DATADOG_CONTAINER_ID_HEADER, container_id);
67 };
68
69 reqwest::blocking::Client::builder()
70 .default_headers(default_headers)
71 .retry(reqwest::retry::for_host(agent_address).max_retries_per_request(2))
72 .build()
73 .expect("Failed to build reqwest client")
74 };
75 let mut spans = Vec::new();
76
77 while let Err(mpsc::TryRecvError::Empty) = shutdown_signal.try_recv() {
78 sleep(Duration::from_secs(1));
79
80 std::mem::swap(&mut spans, buffer.lock().unwrap().deref_mut());
81
82 if spans.is_empty() {
83 continue;
84 }
85
86 let (body, trace_count) = api_version.serializer()(&mut spans);
87
88 let _ = client
89 .post(&url)
90 .header(DATADOG_TRACER_VERSION_HEADER, env!("CARGO_PKG_VERSION"))
91 .header(DATADOG_LANGUAGE_HEADER, "rust")
92 .header(DATADOG_TRACE_COUNT_HEADER, trace_count)
93 .header(header::CONTENT_TYPE, "application/msgpack")
94 .body(body)
95 .send()
96 .inspect_err(|error| tracing::error!(?error, "Error exporting spans"));
97 }
98 }
99}
100
101fn group_traces(
103 spans: impl Iterator<Item = InternalSpan>,
104) -> impl Iterator<Item = Vec<ExportSpan>> {
105 let mut traces = HashMap::new();
106 spans.for_each(|span| {
107 traces
108 .entry(span.trace_id)
109 .or_insert_with(Vec::new)
110 .push(ExportSpan::from(span));
111 });
112 traces.into_values()
113}
114
115type SerializerFn = fn(&mut Vec<InternalSpan>) -> (Vec<u8>, usize);
117
118fn v04_trace_api_payload(spans: &mut Vec<InternalSpan>) -> (Vec<u8>, usize) {
122 let mut payload = vec![];
123
124 let trace_chunks = group_traces(spans.drain(..)).collect::<Vec<_>>();
125
126 let _ = trace_chunks
127 .serialize(&mut MpSerializer::new(&mut payload).with_struct_map())
128 .inspect_err(|error| tracing::error!(?error, "Error serializing spans"));
129
130 (payload, trace_chunks.len())
131}