mod exporter;
pub use exporter::{
new_pipeline, ApiVersion, DatadogExporter, DatadogPipelineBuilder, Error, FieldMappingFn,
ModelConfig,
};
pub use propagator::{DatadogPropagator, DatadogTraceState, DatadogTraceStateBuilder};
mod propagator {
use once_cell::sync::Lazy;
use opentelemetry::{
propagation::{text_map_propagator::FieldIter, Extractor, Injector, TextMapPropagator},
trace::{SpanContext, SpanId, TraceContextExt, TraceFlags, TraceId, TraceState},
Context,
};
const DATADOG_TRACE_ID_HEADER: &str = "x-datadog-trace-id";
const DATADOG_PARENT_ID_HEADER: &str = "x-datadog-parent-id";
const DATADOG_SAMPLING_PRIORITY_HEADER: &str = "x-datadog-sampling-priority";
const TRACE_FLAG_DEFERRED: TraceFlags = TraceFlags::new(0x02);
#[cfg(feature = "agent-sampling")]
const TRACE_STATE_PRIORITY_SAMPLING: &str = "psr";
const TRACE_STATE_MEASURE: &str = "m";
const TRACE_STATE_TRUE_VALUE: &str = "1";
const TRACE_STATE_FALSE_VALUE: &str = "0";
static DATADOG_HEADER_FIELDS: Lazy<[String; 3]> = Lazy::new(|| {
[
DATADOG_TRACE_ID_HEADER.to_string(),
DATADOG_PARENT_ID_HEADER.to_string(),
DATADOG_SAMPLING_PRIORITY_HEADER.to_string(),
]
});
#[derive(Default)]
pub struct DatadogTraceStateBuilder {
#[cfg(feature = "agent-sampling")]
priority_sampling: bool,
measuring: bool,
}
fn boolean_to_trace_state_flag(value: bool) -> &'static str {
if value {
TRACE_STATE_TRUE_VALUE
} else {
TRACE_STATE_FALSE_VALUE
}
}
fn trace_flag_to_boolean(value: &str) -> bool {
value == TRACE_STATE_TRUE_VALUE
}
#[allow(clippy::needless_update)]
impl DatadogTraceStateBuilder {
#[cfg(feature = "agent-sampling")]
pub fn with_priority_sampling(self, enabled: bool) -> Self {
Self {
priority_sampling: enabled,
..self
}
}
pub fn with_measuring(self, enabled: bool) -> Self {
Self {
measuring: enabled,
..self
}
}
pub fn build(self) -> TraceState {
#[cfg(not(feature = "agent-sampling"))]
let values = [(
TRACE_STATE_MEASURE,
boolean_to_trace_state_flag(self.measuring),
)];
#[cfg(feature = "agent-sampling")]
let values = [
(
TRACE_STATE_MEASURE,
boolean_to_trace_state_flag(self.measuring),
),
(
TRACE_STATE_PRIORITY_SAMPLING,
boolean_to_trace_state_flag(self.priority_sampling),
),
];
TraceState::from_key_value(values).unwrap_or_default()
}
}
pub trait DatadogTraceState {
fn with_measuring(&self, enabled: bool) -> TraceState;
fn measuring_enabled(&self) -> bool;
#[cfg(feature = "agent-sampling")]
fn with_priority_sampling(&self, enabled: bool) -> TraceState;
#[cfg(feature = "agent-sampling")]
fn priority_sampling_enabled(&self) -> bool;
}
impl DatadogTraceState for TraceState {
fn with_measuring(&self, enabled: bool) -> TraceState {
self.insert(TRACE_STATE_MEASURE, boolean_to_trace_state_flag(enabled))
.unwrap_or_else(|_err| self.clone())
}
fn measuring_enabled(&self) -> bool {
self.get(TRACE_STATE_MEASURE)
.map(trace_flag_to_boolean)
.unwrap_or_default()
}
#[cfg(feature = "agent-sampling")]
fn with_priority_sampling(&self, enabled: bool) -> TraceState {
self.insert(
TRACE_STATE_PRIORITY_SAMPLING,
boolean_to_trace_state_flag(enabled),
)
.unwrap_or_else(|_err| self.clone())
}
#[cfg(feature = "agent-sampling")]
fn priority_sampling_enabled(&self) -> bool {
self.get(TRACE_STATE_PRIORITY_SAMPLING)
.map(trace_flag_to_boolean)
.unwrap_or_default()
}
}
enum SamplingPriority {
UserReject = -1,
AutoReject = 0,
AutoKeep = 1,
UserKeep = 2,
}
#[derive(Debug)]
enum ExtractError {
TraceId,
SpanId,
SamplingPriority,
}
#[derive(Clone, Debug, Default)]
pub struct DatadogPropagator {
_private: (),
}
#[cfg(not(feature = "agent-sampling"))]
fn create_trace_state_and_flags(trace_flags: TraceFlags) -> (TraceState, TraceFlags) {
(TraceState::default(), trace_flags)
}
#[cfg(feature = "agent-sampling")]
fn create_trace_state_and_flags(trace_flags: TraceFlags) -> (TraceState, TraceFlags) {
if trace_flags & TRACE_FLAG_DEFERRED == TRACE_FLAG_DEFERRED {
(TraceState::default(), trace_flags)
} else {
(
DatadogTraceStateBuilder::default()
.with_priority_sampling(trace_flags.is_sampled())
.build(),
TraceFlags::SAMPLED,
)
}
}
impl DatadogPropagator {
pub fn new() -> Self {
DatadogPropagator::default()
}
fn extract_trace_id(&self, trace_id: &str) -> Result<TraceId, ExtractError> {
trace_id
.parse::<u64>()
.map(|id| TraceId::from(id as u128))
.map_err(|_| ExtractError::TraceId)
}
fn extract_span_id(&self, span_id: &str) -> Result<SpanId, ExtractError> {
span_id
.parse::<u64>()
.map(SpanId::from)
.map_err(|_| ExtractError::SpanId)
}
fn extract_sampling_priority(
&self,
sampling_priority: &str,
) -> Result<SamplingPriority, ExtractError> {
let i = sampling_priority
.parse::<i32>()
.map_err(|_| ExtractError::SamplingPriority)?;
match i {
-1 => Ok(SamplingPriority::UserReject),
0 => Ok(SamplingPriority::AutoReject),
1 => Ok(SamplingPriority::AutoKeep),
2 => Ok(SamplingPriority::UserKeep),
_ => Err(ExtractError::SamplingPriority),
}
}
fn extract_span_context(
&self,
extractor: &dyn Extractor,
) -> Result<SpanContext, ExtractError> {
let trace_id =
self.extract_trace_id(extractor.get(DATADOG_TRACE_ID_HEADER).unwrap_or(""))?;
let span_id = self
.extract_span_id(extractor.get(DATADOG_PARENT_ID_HEADER).unwrap_or(""))
.unwrap_or(SpanId::INVALID);
let sampling_priority = self.extract_sampling_priority(
extractor
.get(DATADOG_SAMPLING_PRIORITY_HEADER)
.unwrap_or(""),
);
let sampled = match sampling_priority {
Ok(SamplingPriority::UserReject) | Ok(SamplingPriority::AutoReject) => {
TraceFlags::default()
}
Ok(SamplingPriority::UserKeep) | Ok(SamplingPriority::AutoKeep) => {
TraceFlags::SAMPLED
}
Err(_) => TRACE_FLAG_DEFERRED,
};
let (trace_state, trace_flags) = create_trace_state_and_flags(sampled);
Ok(SpanContext::new(
trace_id,
span_id,
trace_flags,
true,
trace_state,
))
}
}
#[cfg(not(feature = "agent-sampling"))]
fn get_sampling_priority(span_context: &SpanContext) -> SamplingPriority {
if span_context.is_sampled() {
SamplingPriority::AutoKeep
} else {
SamplingPriority::AutoReject
}
}
#[cfg(feature = "agent-sampling")]
fn get_sampling_priority(span_context: &SpanContext) -> SamplingPriority {
if span_context.trace_state().priority_sampling_enabled() {
SamplingPriority::AutoKeep
} else {
SamplingPriority::AutoReject
}
}
impl TextMapPropagator for DatadogPropagator {
fn inject_context(&self, cx: &Context, injector: &mut dyn Injector) {
let span = cx.span();
let span_context = span.span_context();
if span_context.is_valid() {
injector.set(
DATADOG_TRACE_ID_HEADER,
(u128::from_be_bytes(span_context.trace_id().to_bytes()) as u64).to_string(),
);
injector.set(
DATADOG_PARENT_ID_HEADER,
u64::from_be_bytes(span_context.span_id().to_bytes()).to_string(),
);
if span_context.trace_flags() & TRACE_FLAG_DEFERRED != TRACE_FLAG_DEFERRED {
let sampling_priority = get_sampling_priority(span_context);
injector.set(
DATADOG_SAMPLING_PRIORITY_HEADER,
(sampling_priority as i32).to_string(),
);
}
}
}
fn extract_with_context(&self, cx: &Context, extractor: &dyn Extractor) -> Context {
self.extract_span_context(extractor)
.map(|sc| cx.with_remote_span_context(sc))
.unwrap_or_else(|_| cx.clone())
}
fn fields(&self) -> FieldIter<'_> {
FieldIter::new(DATADOG_HEADER_FIELDS.as_ref())
}
}
#[cfg(test)]
mod tests {
use super::*;
use opentelemetry::trace::TraceState;
use opentelemetry_sdk::testing::trace::TestSpan;
use std::collections::HashMap;
#[rustfmt::skip]
fn extract_test_data() -> Vec<(Vec<(&'static str, &'static str)>, SpanContext)> {
#[cfg(feature = "agent-sampling")]
return vec![
(vec![], SpanContext::empty_context()),
(vec![(DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::empty_context()),
(vec![(DATADOG_TRACE_ID_HEADER, "garbage")], SpanContext::empty_context()),
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "garbage")], SpanContext::new(TraceId::from_u128(1234), SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())),
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())),
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(false).build())),
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(true).build())),
];
#[cfg(not(feature = "agent-sampling"))]
return vec![
(vec![], SpanContext::empty_context()),
(vec![(DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::empty_context()),
(vec![(DATADOG_TRACE_ID_HEADER, "garbage")], SpanContext::empty_context()),
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "garbage")], SpanContext::new(TraceId::from_u128(1234), SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())),
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())),
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::default(), true, TraceState::default())),
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, TraceState::default())),
];
}
#[rustfmt::skip]
fn inject_test_data() -> Vec<(Vec<(&'static str, &'static str)>, SpanContext)> {
#[cfg(feature = "agent-sampling")]
return vec![
(vec![], SpanContext::empty_context()),
(vec![], SpanContext::new(TraceId::INVALID, SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())),
(vec![], SpanContext::new(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())),
(vec![], SpanContext::new(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TraceFlags::SAMPLED, true, TraceState::default())),
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())),
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(false).build())),
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, DatadogTraceStateBuilder::default().with_priority_sampling(true).build())),
];
#[cfg(not(feature = "agent-sampling"))]
return vec![
(vec![], SpanContext::empty_context()),
(vec![], SpanContext::new(TraceId::INVALID, SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())),
(vec![], SpanContext::new(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TRACE_FLAG_DEFERRED, true, TraceState::default())),
(vec![], SpanContext::new(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TraceFlags::SAMPLED, true, TraceState::default())),
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED, true, TraceState::default())),
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::default(), true, TraceState::default())),
(vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], SpanContext::new(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED, true, TraceState::default())),
];
}
#[test]
fn test_extract() {
for (header_list, expected) in extract_test_data() {
let map: HashMap<String, String> = header_list
.into_iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
let propagator = DatadogPropagator::default();
let context = propagator.extract(&map);
assert_eq!(context.span().span_context(), &expected);
}
}
#[test]
fn test_extract_empty() {
let map: HashMap<String, String> = HashMap::new();
let propagator = DatadogPropagator::default();
let context = propagator.extract(&map);
assert_eq!(context.span().span_context(), &SpanContext::empty_context())
}
#[test]
fn test_extract_with_empty_remote_context() {
let map: HashMap<String, String> = HashMap::new();
let propagator = DatadogPropagator::default();
let context = propagator.extract_with_context(&Context::new(), &map);
assert!(!context.has_active_span())
}
#[test]
fn test_inject() {
let propagator = DatadogPropagator::default();
for (header_values, span_context) in inject_test_data() {
let mut injector: HashMap<String, String> = HashMap::new();
propagator.inject_context(
&Context::current_with_span(TestSpan(span_context)),
&mut injector,
);
if !header_values.is_empty() {
for (k, v) in header_values.into_iter() {
let injected_value: Option<&String> = injector.get(k);
assert_eq!(injected_value, Some(&v.to_string()));
injector.remove(k);
}
}
assert!(injector.is_empty());
}
}
}
}