1mod exporter;
153
154pub use exporter::{
155 new_pipeline, ApiVersion, DatadogExporter, DatadogPipelineBuilder, Error, FieldMappingFn,
156 ModelConfig,
157};
158pub use propagator::{DatadogPropagator, DatadogTraceState, DatadogTraceStateBuilder};
159
160mod propagator {
161 use opentelemetry::{
162 propagation::{text_map_propagator::FieldIter, Extractor, Injector, TextMapPropagator},
163 trace::{SpanContext, SpanId, TraceContextExt, TraceFlags, TraceId, TraceState},
164 Context,
165 };
166 use std::sync::OnceLock;
167
168 const DATADOG_TRACE_ID_HEADER: &str = "x-datadog-trace-id";
169 const DATADOG_PARENT_ID_HEADER: &str = "x-datadog-parent-id";
170 const DATADOG_SAMPLING_PRIORITY_HEADER: &str = "x-datadog-sampling-priority";
171
172 const TRACE_FLAG_DEFERRED: TraceFlags = TraceFlags::new(0x02);
173 #[cfg(feature = "agent-sampling")]
174 const TRACE_STATE_PRIORITY_SAMPLING: &str = "psr";
175 const TRACE_STATE_MEASURE: &str = "m";
176 const TRACE_STATE_TRUE_VALUE: &str = "1";
177 const TRACE_STATE_FALSE_VALUE: &str = "0";
178
179 static TRACE_CONTEXT_HEADER_FIELDS: OnceLock<[String; 3]> = OnceLock::new();
181
182 fn trace_context_header_fields() -> &'static [String; 3] {
183 TRACE_CONTEXT_HEADER_FIELDS.get_or_init(|| {
184 [
185 DATADOG_TRACE_ID_HEADER.to_owned(),
186 DATADOG_PARENT_ID_HEADER.to_owned(),
187 DATADOG_SAMPLING_PRIORITY_HEADER.to_owned(),
188 ]
189 })
190 }
191
192 #[derive(Default)]
193 pub struct DatadogTraceStateBuilder {
194 #[cfg(feature = "agent-sampling")]
195 priority_sampling: bool,
196 measuring: bool,
197 }
198
199 fn boolean_to_trace_state_flag(value: bool) -> &'static str {
200 if value {
201 TRACE_STATE_TRUE_VALUE
202 } else {
203 TRACE_STATE_FALSE_VALUE
204 }
205 }
206
207 fn trace_flag_to_boolean(value: &str) -> bool {
208 value == TRACE_STATE_TRUE_VALUE
209 }
210
211 #[allow(clippy::needless_update)]
212 impl DatadogTraceStateBuilder {
213 #[cfg(feature = "agent-sampling")]
214 pub fn with_priority_sampling(self, enabled: bool) -> Self {
215 Self {
216 priority_sampling: enabled,
217 ..self
218 }
219 }
220
221 pub fn with_measuring(self, enabled: bool) -> Self {
222 Self {
223 measuring: enabled,
224 ..self
225 }
226 }
227
228 pub fn build(self) -> TraceState {
229 #[cfg(not(feature = "agent-sampling"))]
230 let values = [(
231 TRACE_STATE_MEASURE,
232 boolean_to_trace_state_flag(self.measuring),
233 )];
234 #[cfg(feature = "agent-sampling")]
235 let values = [
236 (
237 TRACE_STATE_MEASURE,
238 boolean_to_trace_state_flag(self.measuring),
239 ),
240 (
241 TRACE_STATE_PRIORITY_SAMPLING,
242 boolean_to_trace_state_flag(self.priority_sampling),
243 ),
244 ];
245
246 TraceState::from_key_value(values).unwrap_or_default()
247 }
248 }
249
250 pub trait DatadogTraceState {
251 fn with_measuring(&self, enabled: bool) -> TraceState;
252
253 fn measuring_enabled(&self) -> bool;
254
255 #[cfg(feature = "agent-sampling")]
256 fn with_priority_sampling(&self, enabled: bool) -> TraceState;
257
258 #[cfg(feature = "agent-sampling")]
259 fn priority_sampling_enabled(&self) -> bool;
260 }
261
262 impl DatadogTraceState for TraceState {
263 fn with_measuring(&self, enabled: bool) -> TraceState {
264 self.insert(TRACE_STATE_MEASURE, boolean_to_trace_state_flag(enabled))
265 .unwrap_or_else(|_err| self.clone())
266 }
267
268 fn measuring_enabled(&self) -> bool {
269 self.get(TRACE_STATE_MEASURE)
270 .map(trace_flag_to_boolean)
271 .unwrap_or_default()
272 }
273
274 #[cfg(feature = "agent-sampling")]
275 fn with_priority_sampling(&self, enabled: bool) -> TraceState {
276 self.insert(
277 TRACE_STATE_PRIORITY_SAMPLING,
278 boolean_to_trace_state_flag(enabled),
279 )
280 .unwrap_or_else(|_err| self.clone())
281 }
282
283 #[cfg(feature = "agent-sampling")]
284 fn priority_sampling_enabled(&self) -> bool {
285 self.get(TRACE_STATE_PRIORITY_SAMPLING)
286 .map(trace_flag_to_boolean)
287 .unwrap_or_default()
288 }
289 }
290
291 enum SamplingPriority {
292 UserReject = -1,
293 AutoReject = 0,
294 AutoKeep = 1,
295 UserKeep = 2,
296 }
297
298 #[derive(Debug)]
299 enum ExtractError {
300 TraceId,
301 SpanId,
302 SamplingPriority,
303 }
304
305 #[derive(Clone, Debug, Default)]
321 pub struct DatadogPropagator {
322 _private: (),
323 }
324
325 #[cfg(not(feature = "agent-sampling"))]
326 fn create_trace_state_and_flags(trace_flags: TraceFlags) -> (TraceState, TraceFlags) {
327 (TraceState::default(), trace_flags)
328 }
329
330 #[cfg(feature = "agent-sampling")]
331 fn create_trace_state_and_flags(trace_flags: TraceFlags) -> (TraceState, TraceFlags) {
332 if trace_flags & TRACE_FLAG_DEFERRED == TRACE_FLAG_DEFERRED {
333 (TraceState::default(), trace_flags)
334 } else {
335 (
336 DatadogTraceStateBuilder::default()
337 .with_priority_sampling(trace_flags.is_sampled())
338 .build(),
339 TraceFlags::SAMPLED,
340 )
341 }
342 }
343
344 impl DatadogPropagator {
345 pub fn new() -> Self {
347 DatadogPropagator::default()
348 }
349
350 fn extract_trace_id(&self, trace_id: &str) -> Result<TraceId, ExtractError> {
351 trace_id
352 .parse::<u64>()
353 .map(|id| TraceId::from(id as u128))
354 .map_err(|_| ExtractError::TraceId)
355 }
356
357 fn extract_span_id(&self, span_id: &str) -> Result<SpanId, ExtractError> {
358 span_id
359 .parse::<u64>()
360 .map(SpanId::from)
361 .map_err(|_| ExtractError::SpanId)
362 }
363
364 fn extract_sampling_priority(
365 &self,
366 sampling_priority: &str,
367 ) -> Result<SamplingPriority, ExtractError> {
368 let i = sampling_priority
369 .parse::<i32>()
370 .map_err(|_| ExtractError::SamplingPriority)?;
371
372 match i {
373 -1 => Ok(SamplingPriority::UserReject),
374 0 => Ok(SamplingPriority::AutoReject),
375 1 => Ok(SamplingPriority::AutoKeep),
376 2 => Ok(SamplingPriority::UserKeep),
377 _ => Err(ExtractError::SamplingPriority),
378 }
379 }
380
381 fn extract_span_context(
382 &self,
383 extractor: &dyn Extractor,
384 ) -> Result<SpanContext, ExtractError> {
385 let trace_id =
386 self.extract_trace_id(extractor.get(DATADOG_TRACE_ID_HEADER).unwrap_or(""))?;
387 let span_id = self
390 .extract_span_id(extractor.get(DATADOG_PARENT_ID_HEADER).unwrap_or(""))
391 .unwrap_or(SpanId::INVALID);
392 let sampling_priority = self.extract_sampling_priority(
393 extractor
394 .get(DATADOG_SAMPLING_PRIORITY_HEADER)
395 .unwrap_or(""),
396 );
397 let sampled = match sampling_priority {
398 Ok(SamplingPriority::UserReject) | Ok(SamplingPriority::AutoReject) => {
399 TraceFlags::default()
400 }
401 Ok(SamplingPriority::UserKeep) | Ok(SamplingPriority::AutoKeep) => {
402 TraceFlags::SAMPLED
403 }
404 Err(_) => TRACE_FLAG_DEFERRED,
406 };
407
408 let (trace_state, trace_flags) = create_trace_state_and_flags(sampled);
409
410 Ok(SpanContext::new(
411 trace_id,
412 span_id,
413 trace_flags,
414 true,
415 trace_state,
416 ))
417 }
418 }
419
420 #[cfg(not(feature = "agent-sampling"))]
421 fn get_sampling_priority(span_context: &SpanContext) -> SamplingPriority {
422 if span_context.is_sampled() {
423 SamplingPriority::AutoKeep
424 } else {
425 SamplingPriority::AutoReject
426 }
427 }
428
429 #[cfg(feature = "agent-sampling")]
430 fn get_sampling_priority(span_context: &SpanContext) -> SamplingPriority {
431 if span_context.trace_state().priority_sampling_enabled() {
432 SamplingPriority::AutoKeep
433 } else {
434 SamplingPriority::AutoReject
435 }
436 }
437
438 impl TextMapPropagator for DatadogPropagator {
439 fn inject_context(&self, cx: &Context, injector: &mut dyn Injector) {
440 let span = cx.span();
441 let span_context = span.span_context();
442 if span_context.is_valid() {
443 injector.set(
444 DATADOG_TRACE_ID_HEADER,
445 (u128::from_be_bytes(span_context.trace_id().to_bytes()) as u64).to_string(),
446 );
447 injector.set(
448 DATADOG_PARENT_ID_HEADER,
449 u64::from_be_bytes(span_context.span_id().to_bytes()).to_string(),
450 );
451
452 if span_context.trace_flags() & TRACE_FLAG_DEFERRED != TRACE_FLAG_DEFERRED {
453 let sampling_priority = get_sampling_priority(span_context);
454
455 injector.set(
456 DATADOG_SAMPLING_PRIORITY_HEADER,
457 (sampling_priority as i32).to_string(),
458 );
459 }
460 }
461 }
462
463 fn extract_with_context(&self, cx: &Context, extractor: &dyn Extractor) -> Context {
464 self.extract_span_context(extractor)
465 .map(|sc| cx.with_remote_span_context(sc))
466 .unwrap_or_else(|_| cx.clone())
467 }
468
469 fn fields(&self) -> FieldIter<'_> {
470 FieldIter::new(trace_context_header_fields())
471 }
472 }
473
474 #[cfg(test)]
475 mod tests {
476 use super::*;
477 use opentelemetry::trace::TraceState;
478 use opentelemetry_sdk::testing::trace::TestSpan;
479 use std::collections::HashMap;
480
481 #[rustfmt::skip]
482 fn extract_test_data() -> Vec<(Vec<(&'static str, &'static str)>, SpanContext)> {
483 #[cfg(feature = "agent-sampling")]
484 return vec![
485 (vec![], SpanContext::empty_context()),
486 (vec![(DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::empty_context()),
487 (vec![(DATADOG_TRACE_ID_HEADER, "garbage")], SpanContext::empty_context()),
488 (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "garbage")], SpanContext::new(TraceId::from_u128(1234), SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())),
489 (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())),
490 (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(false).build())),
491 (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(true).build())),
492 ];
493 #[cfg(not(feature = "agent-sampling"))]
494 return vec![
495 (vec![], SpanContext::empty_context()),
496 (vec![(DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::empty_context()),
497 (vec![(DATADOG_TRACE_ID_HEADER, "garbage")], SpanContext::empty_context()),
498 (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "garbage")], SpanContext::new(TraceId::from_u128(1234), SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())),
499 (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())),
500 (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::default(), true, TraceState::default())),
501 (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, TraceState::default())),
502 ];
503 }
504
505 #[rustfmt::skip]
506 fn inject_test_data() -> Vec<(Vec<(&'static str, &'static str)>, SpanContext)> {
507 #[cfg(feature = "agent-sampling")]
508 return vec![
509 (vec![], SpanContext::empty_context()),
510 (vec![], SpanContext::new(TraceId::INVALID, SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())),
511 (vec![], SpanContext::new(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())),
512 (vec![], SpanContext::new(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TraceFlags::SAMPLED, true, TraceState::default())),
513 (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())),
514 (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(false).build())),
515 (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(true).build())),
516 ];
517 #[cfg(not(feature = "agent-sampling"))]
518 return vec![
519 (vec![], SpanContext::empty_context()),
520 (vec![], SpanContext::new(TraceId::INVALID, SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())),
521 (vec![], SpanContext::new(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())),
522 (vec![], SpanContext::new(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TraceFlags::SAMPLED, true, TraceState::default())),
523 (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())),
524 (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::default(), true, TraceState::default())),
525 (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, TraceState::default())),
526 ];
527 }
528
529 #[test]
530 fn test_extract() {
531 for (header_list, expected) in extract_test_data() {
532 let map: HashMap<String, String> = header_list
533 .into_iter()
534 .map(|(k, v)| (k.to_string(), v.to_string()))
535 .collect();
536
537 let propagator = DatadogPropagator::default();
538 let context = propagator.extract(&map);
539 assert_eq!(context.span().span_context(), &expected);
540 }
541 }
542
543 #[test]
544 fn test_extract_empty() {
545 let map: HashMap<String, String> = HashMap::new();
546 let propagator = DatadogPropagator::default();
547 let context = propagator.extract(&map);
548 assert_eq!(context.span().span_context(), &SpanContext::empty_context())
549 }
550
551 #[test]
552 fn test_extract_with_empty_remote_context() {
553 let map: HashMap<String, String> = HashMap::new();
554 let propagator = DatadogPropagator::default();
555 let context = propagator.extract_with_context(&Context::new(), &map);
556 assert!(!context.has_active_span())
557 }
558
559 #[test]
560 fn test_inject() {
561 let propagator = DatadogPropagator::default();
562 for (header_values, span_context) in inject_test_data() {
563 let mut injector: HashMap<String, String> = HashMap::new();
564 propagator.inject_context(
565 &Context::current_with_span(TestSpan(span_context)),
566 &mut injector,
567 );
568
569 if !header_values.is_empty() {
570 for (k, v) in header_values.into_iter() {
571 let injected_value: Option<&String> = injector.get(k);
572 assert_eq!(injected_value, Some(&v.to_string()));
573 injector.remove(k);
574 }
575 }
576 assert!(injector.is_empty());
577 }
578 }
579 }
580}