1mod intern;
2mod model;
3
4pub use model::ApiVersion;
5pub use model::Error;
6pub use model::FieldMappingFn;
7
8use crate::exporter::model::FieldMapping;
9use http::{Method, Request, Uri};
10use opentelemetry::{Key, KeyValue};
11use opentelemetry_http::{HttpClient, ResponseExt};
12use opentelemetry_sdk::{
13 error::{OTelSdkError, OTelSdkResult},
14 resource::{ResourceDetector, SdkProvidedResourceDetector},
15 trace::{Config, SdkTracerProvider, TraceError},
16 trace::{SpanData, SpanExporter},
17 Resource,
18};
19use opentelemetry_semantic_conventions as semcov;
20use std::borrow::Cow;
21use std::fmt::{Debug, Formatter};
22use std::sync::Arc;
23use url::Url;
24
25use self::model::unified_tags::UnifiedTags;
26
27const DEFAULT_AGENT_ENDPOINT: &str = "http://127.0.0.1:8126";
29
30const DATADOG_TRACE_COUNT_HEADER: &str = "X-Datadog-Trace-Count";
32
33const DATADOG_META_LANG_HEADER: &str = "Datadog-Meta-Lang";
35const DATADOG_META_TRACER_VERSION_HEADER: &str = "Datadog-Meta-Tracer-Version";
36
37pub struct Mapping {
39 resource: Option<FieldMapping>,
40 name: Option<FieldMapping>,
41 service_name: Option<FieldMapping>,
42}
43
44impl Mapping {
45 pub fn new(
46 resource: Option<FieldMapping>,
47 name: Option<FieldMapping>,
48 service_name: Option<FieldMapping>,
49 ) -> Self {
50 Mapping {
51 resource,
52 name,
53 service_name,
54 }
55 }
56 pub fn empty() -> Self {
57 Self::new(None, None, None)
58 }
59}
60
61pub struct DatadogExporter {
63 client: Arc<dyn HttpClient>,
64 request_url: Uri,
65 model_config: ModelConfig,
66 api_version: ApiVersion,
67 mapping: Mapping,
68 unified_tags: UnifiedTags,
69 resource: Option<Resource>,
70}
71
72impl DatadogExporter {
73 fn new(
74 model_config: ModelConfig,
75 request_url: Uri,
76 api_version: ApiVersion,
77 client: Arc<dyn HttpClient>,
78 mapping: Mapping,
79 unified_tags: UnifiedTags,
80 ) -> Self {
81 DatadogExporter {
82 client,
83 request_url,
84 model_config,
85 api_version,
86 mapping,
87 unified_tags,
88 resource: None,
89 }
90 }
91
92 fn build_request(
93 &self,
94 mut batch: Vec<SpanData>,
95 ) -> Result<http::Request<Vec<u8>>, OTelSdkError> {
96 let traces: Vec<&[SpanData]> = group_into_traces(&mut batch);
97 let trace_count = traces.len();
98 let data = self
99 .api_version
100 .encode(
101 &self.model_config,
102 traces,
103 &self.mapping,
104 &self.unified_tags,
105 self.resource.as_ref(),
106 )
107 .map_err(|e| OTelSdkError::InternalFailure(format!("{:?}", e)))?;
108 let req = Request::builder()
109 .method(Method::POST)
110 .uri(self.request_url.clone())
111 .header(http::header::CONTENT_TYPE, self.api_version.content_type())
112 .header(DATADOG_TRACE_COUNT_HEADER, trace_count)
113 .header(DATADOG_META_LANG_HEADER, "rust")
114 .header(
115 DATADOG_META_TRACER_VERSION_HEADER,
116 env!("CARGO_PKG_VERSION"),
117 )
118 .body(data)
119 .map_err(|e| OTelSdkError::InternalFailure(format!("{:?}", e)));
120 Ok(req)?
121 }
122}
123
124impl Debug for DatadogExporter {
125 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
126 f.debug_struct("DatadogExporter")
127 .field("model_config", &self.model_config)
128 .field("request_url", &self.request_url)
129 .field("api_version", &self.api_version)
130 .field("client", &self.client)
131 .field("resource_mapping", &mapping_debug(&self.mapping.resource))
132 .field("name_mapping", &mapping_debug(&self.mapping.name))
133 .field(
134 "service_name_mapping",
135 &mapping_debug(&self.mapping.service_name),
136 )
137 .finish()
138 }
139}
140
141pub fn new_pipeline() -> DatadogPipelineBuilder {
143 DatadogPipelineBuilder::default()
144}
145
146pub struct DatadogPipelineBuilder {
148 agent_endpoint: String,
149 trace_config: Option<Config>,
150 api_version: ApiVersion,
151 client: Option<Arc<dyn HttpClient>>,
152 mapping: Mapping,
153 unified_tags: UnifiedTags,
154}
155
156impl Default for DatadogPipelineBuilder {
157 fn default() -> Self {
158 DatadogPipelineBuilder {
159 agent_endpoint: DEFAULT_AGENT_ENDPOINT.to_string(),
160 trace_config: None,
161 mapping: Mapping::empty(),
162 api_version: ApiVersion::Version05,
163 unified_tags: UnifiedTags::new(),
164 #[cfg(all(
165 not(feature = "reqwest-client"),
166 not(feature = "reqwest-blocking-client"),
167 not(feature = "surf-client"),
168 ))]
169 client: None,
170 #[cfg(all(
171 not(feature = "reqwest-client"),
172 not(feature = "reqwest-blocking-client"),
173 feature = "surf-client"
174 ))]
175 client: Some(Arc::new(surf::Client::new())),
176 #[cfg(all(
177 not(feature = "surf-client"),
178 not(feature = "reqwest-blocking-client"),
179 feature = "reqwest-client"
180 ))]
181 client: Some(Arc::new(reqwest::Client::new())),
182 #[cfg(feature = "reqwest-blocking-client")]
183 client: Some(Arc::new(reqwest::blocking::Client::new())),
184 }
185 }
186}
187
188impl Debug for DatadogPipelineBuilder {
189 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
190 f.debug_struct("DatadogExporter")
191 .field("agent_endpoint", &self.agent_endpoint)
192 .field("trace_config", &self.trace_config)
193 .field("client", &self.client)
194 .field("resource_mapping", &mapping_debug(&self.mapping.resource))
195 .field("name_mapping", &mapping_debug(&self.mapping.name))
196 .field(
197 "service_name_mapping",
198 &mapping_debug(&self.mapping.service_name),
199 )
200 .finish()
201 }
202}
203
204impl DatadogPipelineBuilder {
205 pub fn build_exporter(mut self) -> Result<DatadogExporter, TraceError> {
209 let (_, service_name) = self.build_config_and_service_name();
210 self.build_exporter_with_service_name(service_name)
211 }
212
213 fn build_config_and_service_name(&mut self) -> (Config, String) {
214 let service_name = self.unified_tags.service();
215 if let Some(service_name) = service_name {
216 let config = if let Some(mut cfg) = self.trace_config.take() {
217 cfg.resource = Cow::Owned(
218 Resource::builder()
219 .with_attributes(
220 cfg.resource
221 .iter()
222 .filter(|(k, _v)| k.as_str() != semcov::resource::SERVICE_NAME)
223 .map(|(k, v)| KeyValue::new(k.clone(), v.clone())),
224 )
225 .build(),
226 );
227 cfg
228 } else {
229 let mut cfg = Config::default();
230 cfg.resource = Cow::Owned(Resource::builder_empty().build());
231 cfg
232 };
233 (config, service_name)
234 } else {
235 let service_name = SdkProvidedResourceDetector
236 .detect()
237 .get(&Key::new(semcov::resource::SERVICE_NAME))
238 .unwrap()
239 .to_string();
240 let mut cfg = Config::default();
241 cfg.resource = Cow::Owned(Resource::builder_empty().build());
243 (cfg, service_name)
244 }
245 }
246
247 fn build_endpoint(agent_endpoint: &str, version: &str) -> Result<Uri, TraceError> {
250 let mut endpoint = agent_endpoint
252 .parse::<Url>()
253 .map_err::<Error, _>(Into::into)?;
254 let mut paths = endpoint
255 .path_segments()
256 .map(|c| c.filter(|s| !s.is_empty()).collect::<Vec<_>>())
257 .unwrap_or_default();
258 paths.push(version);
259
260 let path_str = paths.join("/");
261 endpoint.set_path(path_str.as_str());
262
263 Ok(endpoint.as_str().parse().map_err::<Error, _>(Into::into)?)
264 }
265
266 fn build_exporter_with_service_name(
267 self,
268 service_name: String,
269 ) -> Result<DatadogExporter, TraceError> {
270 if let Some(client) = self.client {
271 let model_config = ModelConfig { service_name };
272
273 let exporter = DatadogExporter::new(
274 model_config,
275 Self::build_endpoint(&self.agent_endpoint, self.api_version.path())?,
276 self.api_version,
277 client,
278 self.mapping,
279 self.unified_tags,
280 );
281 Ok(exporter)
282 } else {
283 Err(Error::NoHttpClient.into())
284 }
285 }
286
287 pub fn install_simple(mut self) -> Result<SdkTracerProvider, TraceError> {
289 let (config, service_name) = self.build_config_and_service_name();
290 let exporter = self.build_exporter_with_service_name(service_name)?;
291 Ok(SdkTracerProvider::builder()
292 .with_simple_exporter(exporter)
293 .with_resource(config.resource.into_owned())
294 .build())
295 }
296
297 pub fn install_batch(mut self) -> Result<SdkTracerProvider, TraceError> {
300 let (config, service_name) = self.build_config_and_service_name();
301 let exporter = self.build_exporter_with_service_name(service_name)?;
302 Ok(SdkTracerProvider::builder()
303 .with_batch_exporter(exporter)
304 .with_resource(config.resource.into_owned())
305 .build())
306 }
307
308 pub fn with_service_name<T: Into<String>>(mut self, service_name: T) -> Self {
310 self.unified_tags.set_service(Some(service_name.into()));
311 self
312 }
313
314 pub fn with_version<T: Into<String>>(mut self, version: T) -> Self {
316 self.unified_tags.set_version(Some(version.into()));
317 self
318 }
319
320 pub fn with_env<T: Into<String>>(mut self, env: T) -> Self {
322 self.unified_tags.set_env(Some(env.into()));
323 self
324 }
325
326 pub fn with_agent_endpoint<T: Into<String>>(mut self, endpoint: T) -> Self {
330 self.agent_endpoint = endpoint.into();
331 self
332 }
333
334 pub fn with_http_client<T: HttpClient + 'static>(mut self, client: T) -> Self {
336 self.client = Some(Arc::new(client));
337 self
338 }
339
340 pub fn with_trace_config(mut self, config: Config) -> Self {
342 self.trace_config = Some(config);
343 self
344 }
345
346 pub fn with_api_version(mut self, api_version: ApiVersion) -> Self {
348 self.api_version = api_version;
349 self
350 }
351
352 pub fn with_resource_mapping<F>(mut self, f: F) -> Self
355 where
356 F: for<'a> Fn(&'a SpanData, &'a ModelConfig) -> &'a str + Send + Sync + 'static,
357 {
358 self.mapping.resource = Some(Arc::new(f));
359 self
360 }
361
362 pub fn with_name_mapping<F>(mut self, f: F) -> Self
365 where
366 F: for<'a> Fn(&'a SpanData, &'a ModelConfig) -> &'a str + Send + Sync + 'static,
367 {
368 self.mapping.name = Some(Arc::new(f));
369 self
370 }
371
372 pub fn with_service_name_mapping<F>(mut self, f: F) -> Self
375 where
376 F: for<'a> Fn(&'a SpanData, &'a ModelConfig) -> &'a str + Send + Sync + 'static,
377 {
378 self.mapping.service_name = Some(Arc::new(f));
379 self
380 }
381}
382
383fn group_into_traces(spans: &mut [SpanData]) -> Vec<&[SpanData]> {
384 if spans.is_empty() {
385 return vec![];
386 }
387
388 spans.sort_unstable_by_key(|x| x.span_context.trace_id().to_bytes());
389
390 let mut traces: Vec<&[SpanData]> = Vec::with_capacity(spans.len());
391
392 let mut start = 0;
393 let mut start_trace_id = spans[start].span_context.trace_id();
394 for (idx, span) in spans.iter().enumerate() {
395 let current_trace_id = span.span_context.trace_id();
396 if start_trace_id != current_trace_id {
397 traces.push(&spans[start..idx]);
398 start = idx;
399 start_trace_id = current_trace_id;
400 }
401 }
402 traces.push(&spans[start..]);
403 traces
404}
405
406async fn send_request(
407 client: Arc<dyn HttpClient>,
408 request: http::Request<Vec<u8>>,
409) -> OTelSdkResult {
410 #[allow(deprecated)]
411 let response = client
412 .send(request)
413 .await
414 .map_err(|e| OTelSdkError::InternalFailure(format!("HTTP request failed: {}", e)))?;
415
416 response
417 .error_for_status()
418 .map_err(|e| OTelSdkError::InternalFailure(format!("HTTP response error: {}", e)))?;
419
420 Ok(())
421}
422
423impl SpanExporter for DatadogExporter {
424 async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
426 let request = match self.build_request(batch) {
427 Ok(req) => req,
428 Err(err) => return Err(err),
429 };
430
431 let client = self.client.clone();
432 send_request(client, request).await
433 }
434 fn set_resource(&mut self, resource: &Resource) {
435 self.resource = Some(resource.clone());
436 }
437}
438
439#[derive(Default, Debug)]
443#[non_exhaustive]
444pub struct ModelConfig {
445 pub service_name: String,
446}
447
448fn mapping_debug(f: &Option<FieldMapping>) -> String {
449 if f.is_some() {
450 "custom mapping"
451 } else {
452 "default mapping"
453 }
454 .to_string()
455}
456
457#[cfg(test)]
458mod tests {
459 use super::*;
460 use crate::ApiVersion::Version05;
461
462 use crate::exporter::model::tests::get_span;
463 use bytes::Bytes;
464
465 #[test]
466 fn test_out_of_order_group() {
467 let mut batch = vec![get_span(1, 1, 1), get_span(2, 2, 2), get_span(1, 1, 3)];
468 let expected = vec![
469 vec![get_span(1, 1, 1), get_span(1, 1, 3)],
470 vec![get_span(2, 2, 2)],
471 ];
472
473 let mut traces = group_into_traces(&mut batch);
474 traces.sort_by_key(|t| u128::from_be_bytes(t[0].span_context.trace_id().to_bytes()));
476
477 assert_eq!(traces, expected);
478 }
479
480 #[test]
481 fn test_agent_endpoint_with_api_version() {
482 let with_tail_slash =
483 DatadogPipelineBuilder::build_endpoint("http://localhost:8126/", Version05.path());
484 let without_tail_slash =
485 DatadogPipelineBuilder::build_endpoint("http://localhost:8126", Version05.path());
486 let with_query = DatadogPipelineBuilder::build_endpoint(
487 "http://localhost:8126?api_key=123",
488 Version05.path(),
489 );
490 let invalid = DatadogPipelineBuilder::build_endpoint(
491 "http://localhost:klsajfjksfh",
492 Version05.path(),
493 );
494
495 assert_eq!(
496 with_tail_slash.unwrap().to_string(),
497 "http://localhost:8126/v0.5/traces"
498 );
499 assert_eq!(
500 without_tail_slash.unwrap().to_string(),
501 "http://localhost:8126/v0.5/traces"
502 );
503 assert_eq!(
504 with_query.unwrap().to_string(),
505 "http://localhost:8126/v0.5/traces?api_key=123"
506 );
507 assert!(invalid.is_err())
508 }
509
510 #[derive(Debug)]
511 struct DummyClient;
512
513 #[async_trait::async_trait]
514 impl HttpClient for DummyClient {
515 async fn send(
516 &self,
517 _request: Request<Vec<u8>>,
518 ) -> Result<http::Response<bytes::Bytes>, opentelemetry_http::HttpError> {
519 Ok(http::Response::new("dummy response".into()))
520 }
521 async fn send_bytes(
522 &self,
523 request: Request<Bytes>,
524 ) -> Result<http::Response<Bytes>, opentelemetry_http::HttpError> {
525 Ok(http::Response::builder()
526 .status(200)
527 .body(request.into_body())
528 .unwrap())
529 }
530 }
531
532 #[test]
533 fn test_custom_http_client() {
534 new_pipeline()
535 .with_http_client(DummyClient)
536 .build_exporter()
537 .unwrap();
538 }
539
540 #[test]
541 fn test_install_simple() {
542 new_pipeline()
543 .with_service_name("test_service")
544 .with_http_client(DummyClient)
545 .install_simple()
546 .unwrap();
547 }
548
549 #[test]
550 fn test_install_batch() {
551 new_pipeline()
552 .with_service_name("test_service")
553 .with_http_client(DummyClient)
554 .install_batch()
555 .unwrap();
556 }
557}