opentelemetry_datadog/exporter/
mod.rs

1mod intern;
2mod model;
3
4pub use model::ApiVersion;
5pub use model::Error;
6pub use model::FieldMappingFn;
7
8use crate::exporter::model::FieldMapping;
9use http::{Method, Request, Uri};
10use opentelemetry::{Key, KeyValue};
11use opentelemetry_http::{HttpClient, ResponseExt};
12use opentelemetry_sdk::{
13    error::{OTelSdkError, OTelSdkResult},
14    resource::{ResourceDetector, SdkProvidedResourceDetector},
15    trace::{Config, SdkTracerProvider, TraceError},
16    trace::{SpanData, SpanExporter},
17    Resource,
18};
19use opentelemetry_semantic_conventions as semcov;
20use std::borrow::Cow;
21use std::fmt::{Debug, Formatter};
22use std::sync::Arc;
23use url::Url;
24
25use self::model::unified_tags::UnifiedTags;
26
27/// Default Datadog collector endpoint
28const DEFAULT_AGENT_ENDPOINT: &str = "http://127.0.0.1:8126";
29
30/// Header name used to inform the Datadog agent of the number of traces in the payload
31const DATADOG_TRACE_COUNT_HEADER: &str = "X-Datadog-Trace-Count";
32
33/// Header name use to inform datadog as to what version
34const DATADOG_META_LANG_HEADER: &str = "Datadog-Meta-Lang";
35const DATADOG_META_TRACER_VERSION_HEADER: &str = "Datadog-Meta-Tracer-Version";
36
37// Struct to hold the mapping between Opentelemetry spans and datadog spans.
38pub struct Mapping {
39    resource: Option<FieldMapping>,
40    name: Option<FieldMapping>,
41    service_name: Option<FieldMapping>,
42}
43
44impl Mapping {
45    pub fn new(
46        resource: Option<FieldMapping>,
47        name: Option<FieldMapping>,
48        service_name: Option<FieldMapping>,
49    ) -> Self {
50        Mapping {
51            resource,
52            name,
53            service_name,
54        }
55    }
56    pub fn empty() -> Self {
57        Self::new(None, None, None)
58    }
59}
60
61/// Datadog span exporter
62pub struct DatadogExporter {
63    client: Arc<dyn HttpClient>,
64    request_url: Uri,
65    model_config: ModelConfig,
66    api_version: ApiVersion,
67    mapping: Mapping,
68    unified_tags: UnifiedTags,
69    resource: Option<Resource>,
70}
71
72impl DatadogExporter {
73    fn new(
74        model_config: ModelConfig,
75        request_url: Uri,
76        api_version: ApiVersion,
77        client: Arc<dyn HttpClient>,
78        mapping: Mapping,
79        unified_tags: UnifiedTags,
80    ) -> Self {
81        DatadogExporter {
82            client,
83            request_url,
84            model_config,
85            api_version,
86            mapping,
87            unified_tags,
88            resource: None,
89        }
90    }
91
92    fn build_request(
93        &self,
94        mut batch: Vec<SpanData>,
95    ) -> Result<http::Request<Vec<u8>>, OTelSdkError> {
96        let traces: Vec<&[SpanData]> = group_into_traces(&mut batch);
97        let trace_count = traces.len();
98        let data = self
99            .api_version
100            .encode(
101                &self.model_config,
102                traces,
103                &self.mapping,
104                &self.unified_tags,
105                self.resource.as_ref(),
106            )
107            .map_err(|e| OTelSdkError::InternalFailure(format!("{:?}", e)))?;
108        let req = Request::builder()
109            .method(Method::POST)
110            .uri(self.request_url.clone())
111            .header(http::header::CONTENT_TYPE, self.api_version.content_type())
112            .header(DATADOG_TRACE_COUNT_HEADER, trace_count)
113            .header(DATADOG_META_LANG_HEADER, "rust")
114            .header(
115                DATADOG_META_TRACER_VERSION_HEADER,
116                env!("CARGO_PKG_VERSION"),
117            )
118            .body(data)
119            .map_err(|e| OTelSdkError::InternalFailure(format!("{:?}", e)));
120        Ok(req)?
121    }
122}
123
124impl Debug for DatadogExporter {
125    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
126        f.debug_struct("DatadogExporter")
127            .field("model_config", &self.model_config)
128            .field("request_url", &self.request_url)
129            .field("api_version", &self.api_version)
130            .field("client", &self.client)
131            .field("resource_mapping", &mapping_debug(&self.mapping.resource))
132            .field("name_mapping", &mapping_debug(&self.mapping.name))
133            .field(
134                "service_name_mapping",
135                &mapping_debug(&self.mapping.service_name),
136            )
137            .finish()
138    }
139}
140
141/// Create a new Datadog exporter pipeline builder.
142pub fn new_pipeline() -> DatadogPipelineBuilder {
143    DatadogPipelineBuilder::default()
144}
145
146/// Builder for `ExporterConfig` struct.
147pub struct DatadogPipelineBuilder {
148    agent_endpoint: String,
149    trace_config: Option<Config>,
150    api_version: ApiVersion,
151    client: Option<Arc<dyn HttpClient>>,
152    mapping: Mapping,
153    unified_tags: UnifiedTags,
154}
155
156impl Default for DatadogPipelineBuilder {
157    fn default() -> Self {
158        DatadogPipelineBuilder {
159            agent_endpoint: DEFAULT_AGENT_ENDPOINT.to_string(),
160            trace_config: None,
161            mapping: Mapping::empty(),
162            api_version: ApiVersion::Version05,
163            unified_tags: UnifiedTags::new(),
164            #[cfg(all(
165                not(feature = "reqwest-client"),
166                not(feature = "reqwest-blocking-client"),
167                not(feature = "surf-client"),
168            ))]
169            client: None,
170            #[cfg(all(
171                not(feature = "reqwest-client"),
172                not(feature = "reqwest-blocking-client"),
173                feature = "surf-client"
174            ))]
175            client: Some(Arc::new(surf::Client::new())),
176            #[cfg(all(
177                not(feature = "surf-client"),
178                not(feature = "reqwest-blocking-client"),
179                feature = "reqwest-client"
180            ))]
181            client: Some(Arc::new(reqwest::Client::new())),
182            #[cfg(feature = "reqwest-blocking-client")]
183            client: Some(Arc::new(reqwest::blocking::Client::new())),
184        }
185    }
186}
187
188impl Debug for DatadogPipelineBuilder {
189    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
190        f.debug_struct("DatadogExporter")
191            .field("agent_endpoint", &self.agent_endpoint)
192            .field("trace_config", &self.trace_config)
193            .field("client", &self.client)
194            .field("resource_mapping", &mapping_debug(&self.mapping.resource))
195            .field("name_mapping", &mapping_debug(&self.mapping.name))
196            .field(
197                "service_name_mapping",
198                &mapping_debug(&self.mapping.service_name),
199            )
200            .finish()
201    }
202}
203
204impl DatadogPipelineBuilder {
205    /// Building a new exporter.
206    ///
207    /// This is useful if you are manually constructing a pipeline.
208    pub fn build_exporter(mut self) -> Result<DatadogExporter, TraceError> {
209        let (_, service_name) = self.build_config_and_service_name();
210        self.build_exporter_with_service_name(service_name)
211    }
212
213    fn build_config_and_service_name(&mut self) -> (Config, String) {
214        let service_name = self.unified_tags.service();
215        if let Some(service_name) = service_name {
216            let config = if let Some(mut cfg) = self.trace_config.take() {
217                cfg.resource = Cow::Owned(
218                    Resource::builder()
219                        .with_attributes(
220                            cfg.resource
221                                .iter()
222                                .filter(|(k, _v)| k.as_str() != semcov::resource::SERVICE_NAME)
223                                .map(|(k, v)| KeyValue::new(k.clone(), v.clone())),
224                        )
225                        .build(),
226                );
227                cfg
228            } else {
229                let mut cfg = Config::default();
230                cfg.resource = Cow::Owned(Resource::builder_empty().build());
231                cfg
232            };
233            (config, service_name)
234        } else {
235            let service_name = SdkProvidedResourceDetector
236                .detect()
237                .get(&Key::new(semcov::resource::SERVICE_NAME))
238                .unwrap()
239                .to_string();
240            let mut cfg = Config::default();
241            // use a empty resource to prevent TracerProvider to assign a service name.
242            cfg.resource = Cow::Owned(Resource::builder_empty().build());
243            (cfg, service_name)
244        }
245    }
246
247    // parse the endpoint and append the path based on versions.
248    // keep the query and host the same.
249    fn build_endpoint(agent_endpoint: &str, version: &str) -> Result<Uri, TraceError> {
250        // build agent endpoint based on version
251        let mut endpoint = agent_endpoint
252            .parse::<Url>()
253            .map_err::<Error, _>(Into::into)?;
254        let mut paths = endpoint
255            .path_segments()
256            .map(|c| c.filter(|s| !s.is_empty()).collect::<Vec<_>>())
257            .unwrap_or_default();
258        paths.push(version);
259
260        let path_str = paths.join("/");
261        endpoint.set_path(path_str.as_str());
262
263        Ok(endpoint.as_str().parse().map_err::<Error, _>(Into::into)?)
264    }
265
266    fn build_exporter_with_service_name(
267        self,
268        service_name: String,
269    ) -> Result<DatadogExporter, TraceError> {
270        if let Some(client) = self.client {
271            let model_config = ModelConfig { service_name };
272
273            let exporter = DatadogExporter::new(
274                model_config,
275                Self::build_endpoint(&self.agent_endpoint, self.api_version.path())?,
276                self.api_version,
277                client,
278                self.mapping,
279                self.unified_tags,
280            );
281            Ok(exporter)
282        } else {
283            Err(Error::NoHttpClient.into())
284        }
285    }
286
287    /// Install the Datadog trace exporter pipeline using a simple span processor.
288    pub fn install_simple(mut self) -> Result<SdkTracerProvider, TraceError> {
289        let (config, service_name) = self.build_config_and_service_name();
290        let exporter = self.build_exporter_with_service_name(service_name)?;
291        Ok(SdkTracerProvider::builder()
292            .with_simple_exporter(exporter)
293            .with_resource(config.resource.into_owned())
294            .build())
295    }
296
297    /// Install the Datadog trace exporter pipeline using a batch span processor with the specified
298    /// runtime.
299    pub fn install_batch(mut self) -> Result<SdkTracerProvider, TraceError> {
300        let (config, service_name) = self.build_config_and_service_name();
301        let exporter = self.build_exporter_with_service_name(service_name)?;
302        Ok(SdkTracerProvider::builder()
303            .with_batch_exporter(exporter)
304            .with_resource(config.resource.into_owned())
305            .build())
306    }
307
308    /// Assign the service name under which to group traces
309    pub fn with_service_name<T: Into<String>>(mut self, service_name: T) -> Self {
310        self.unified_tags.set_service(Some(service_name.into()));
311        self
312    }
313
314    /// Assign the version under which to group traces
315    pub fn with_version<T: Into<String>>(mut self, version: T) -> Self {
316        self.unified_tags.set_version(Some(version.into()));
317        self
318    }
319
320    /// Assign the env under which to group traces
321    pub fn with_env<T: Into<String>>(mut self, env: T) -> Self {
322        self.unified_tags.set_env(Some(env.into()));
323        self
324    }
325
326    /// Assign the Datadog collector endpoint.
327    ///
328    /// The endpoint of the datadog agent, by default it is `http://127.0.0.1:8126`.
329    pub fn with_agent_endpoint<T: Into<String>>(mut self, endpoint: T) -> Self {
330        self.agent_endpoint = endpoint.into();
331        self
332    }
333
334    /// Choose the http client used by uploader
335    pub fn with_http_client<T: HttpClient + 'static>(mut self, client: T) -> Self {
336        self.client = Some(Arc::new(client));
337        self
338    }
339
340    /// Assign the SDK trace configuration
341    pub fn with_trace_config(mut self, config: Config) -> Self {
342        self.trace_config = Some(config);
343        self
344    }
345
346    /// Set version of Datadog trace ingestion API
347    pub fn with_api_version(mut self, api_version: ApiVersion) -> Self {
348        self.api_version = api_version;
349        self
350    }
351
352    /// Custom the value used for `resource` field in datadog spans.
353    /// See [`FieldMappingFn`] for details.
354    pub fn with_resource_mapping<F>(mut self, f: F) -> Self
355    where
356        F: for<'a> Fn(&'a SpanData, &'a ModelConfig) -> &'a str + Send + Sync + 'static,
357    {
358        self.mapping.resource = Some(Arc::new(f));
359        self
360    }
361
362    /// Custom the value used for `name` field in datadog spans.
363    /// See [`FieldMappingFn`] for details.
364    pub fn with_name_mapping<F>(mut self, f: F) -> Self
365    where
366        F: for<'a> Fn(&'a SpanData, &'a ModelConfig) -> &'a str + Send + Sync + 'static,
367    {
368        self.mapping.name = Some(Arc::new(f));
369        self
370    }
371
372    /// Custom the value used for `service_name` field in datadog spans.
373    /// See [`FieldMappingFn`] for details.
374    pub fn with_service_name_mapping<F>(mut self, f: F) -> Self
375    where
376        F: for<'a> Fn(&'a SpanData, &'a ModelConfig) -> &'a str + Send + Sync + 'static,
377    {
378        self.mapping.service_name = Some(Arc::new(f));
379        self
380    }
381}
382
383fn group_into_traces(spans: &mut [SpanData]) -> Vec<&[SpanData]> {
384    if spans.is_empty() {
385        return vec![];
386    }
387
388    spans.sort_unstable_by_key(|x| x.span_context.trace_id().to_bytes());
389
390    let mut traces: Vec<&[SpanData]> = Vec::with_capacity(spans.len());
391
392    let mut start = 0;
393    let mut start_trace_id = spans[start].span_context.trace_id();
394    for (idx, span) in spans.iter().enumerate() {
395        let current_trace_id = span.span_context.trace_id();
396        if start_trace_id != current_trace_id {
397            traces.push(&spans[start..idx]);
398            start = idx;
399            start_trace_id = current_trace_id;
400        }
401    }
402    traces.push(&spans[start..]);
403    traces
404}
405
406async fn send_request(
407    client: Arc<dyn HttpClient>,
408    request: http::Request<Vec<u8>>,
409) -> OTelSdkResult {
410    #[allow(deprecated)]
411    let response = client
412        .send(request)
413        .await
414        .map_err(|e| OTelSdkError::InternalFailure(format!("HTTP request failed: {}", e)))?;
415
416    response
417        .error_for_status()
418        .map_err(|e| OTelSdkError::InternalFailure(format!("HTTP response error: {}", e)))?;
419
420    Ok(())
421}
422
423impl SpanExporter for DatadogExporter {
424    /// Export spans to datadog-agent
425    async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
426        let request = match self.build_request(batch) {
427            Ok(req) => req,
428            Err(err) => return Err(err),
429        };
430
431        let client = self.client.clone();
432        send_request(client, request).await
433    }
434    fn set_resource(&mut self, resource: &Resource) {
435        self.resource = Some(resource.clone());
436    }
437}
438
439/// Helper struct to custom the mapping between Opentelemetry spans and datadog spans.
440///
441/// This struct will be passed to [`FieldMappingFn`]
442#[derive(Default, Debug)]
443#[non_exhaustive]
444pub struct ModelConfig {
445    pub service_name: String,
446}
447
448fn mapping_debug(f: &Option<FieldMapping>) -> String {
449    if f.is_some() {
450        "custom mapping"
451    } else {
452        "default mapping"
453    }
454    .to_string()
455}
456
457#[cfg(test)]
458mod tests {
459    use super::*;
460    use crate::ApiVersion::Version05;
461
462    use crate::exporter::model::tests::get_span;
463    use bytes::Bytes;
464
465    #[test]
466    fn test_out_of_order_group() {
467        let mut batch = vec![get_span(1, 1, 1), get_span(2, 2, 2), get_span(1, 1, 3)];
468        let expected = vec![
469            vec![get_span(1, 1, 1), get_span(1, 1, 3)],
470            vec![get_span(2, 2, 2)],
471        ];
472
473        let mut traces = group_into_traces(&mut batch);
474        // We need to sort the output in order to compare, but this is not required by the Datadog agent
475        traces.sort_by_key(|t| u128::from_be_bytes(t[0].span_context.trace_id().to_bytes()));
476
477        assert_eq!(traces, expected);
478    }
479
480    #[test]
481    fn test_agent_endpoint_with_api_version() {
482        let with_tail_slash =
483            DatadogPipelineBuilder::build_endpoint("http://localhost:8126/", Version05.path());
484        let without_tail_slash =
485            DatadogPipelineBuilder::build_endpoint("http://localhost:8126", Version05.path());
486        let with_query = DatadogPipelineBuilder::build_endpoint(
487            "http://localhost:8126?api_key=123",
488            Version05.path(),
489        );
490        let invalid = DatadogPipelineBuilder::build_endpoint(
491            "http://localhost:klsajfjksfh",
492            Version05.path(),
493        );
494
495        assert_eq!(
496            with_tail_slash.unwrap().to_string(),
497            "http://localhost:8126/v0.5/traces"
498        );
499        assert_eq!(
500            without_tail_slash.unwrap().to_string(),
501            "http://localhost:8126/v0.5/traces"
502        );
503        assert_eq!(
504            with_query.unwrap().to_string(),
505            "http://localhost:8126/v0.5/traces?api_key=123"
506        );
507        assert!(invalid.is_err())
508    }
509
510    #[derive(Debug)]
511    struct DummyClient;
512
513    #[async_trait::async_trait]
514    impl HttpClient for DummyClient {
515        async fn send(
516            &self,
517            _request: Request<Vec<u8>>,
518        ) -> Result<http::Response<bytes::Bytes>, opentelemetry_http::HttpError> {
519            Ok(http::Response::new("dummy response".into()))
520        }
521        async fn send_bytes(
522            &self,
523            request: Request<Bytes>,
524        ) -> Result<http::Response<Bytes>, opentelemetry_http::HttpError> {
525            Ok(http::Response::builder()
526                .status(200)
527                .body(request.into_body())
528                .unwrap())
529        }
530    }
531
532    #[test]
533    fn test_custom_http_client() {
534        new_pipeline()
535            .with_http_client(DummyClient)
536            .build_exporter()
537            .unwrap();
538    }
539
540    #[test]
541    fn test_install_simple() {
542        new_pipeline()
543            .with_service_name("test_service")
544            .with_http_client(DummyClient)
545            .install_simple()
546            .unwrap();
547    }
548
549    #[test]
550    fn test_install_batch() {
551        new_pipeline()
552            .with_service_name("test_service")
553            .with_http_client(DummyClient)
554            .install_batch()
555            .unwrap();
556    }
557}