/// Crate-internal module whose contents are pulled in verbatim from the
/// `dd_trace.rs` file located in the build's `OUT_DIR`.
///
/// NOTE(review): the file is presumably generated by a build script (the
/// module name suggests protobuf definitions) — confirm against `build.rs`.
pub(crate) mod dd_proto {
// `OUT_DIR` is resolved at compile time via `env!`, so the build fails early
// if the variable is not set.
include!(concat!(env!("OUT_DIR"), "/dd_trace.rs"));
}
mod exporter;
mod propagator {
use opentelemetry::{
propagation::{text_map_propagator::FieldIter, Extractor, Injector, TextMapPropagator},
trace::{SpanContext, SpanId, TraceContextExt, TraceFlags, TraceId, TraceState},
Context,
};
use crate::exporter::u128_to_u64s;
/// Header carrying the trace id; extraction parses it as a decimal `u64` and
/// injection writes a decimal `u64`.
const DATADOG_TRACE_ID_HEADER: &str = "x-datadog-trace-id";
/// Header carrying the parent span id; extraction parses it as a decimal `u64`
/// and injection writes the span id as a decimal `u64`.
const DATADOG_PARENT_ID_HEADER: &str = "x-datadog-parent-id";
/// Header carrying the sampling priority as an integer (`-1`, `0`, `1` or `2`).
const DATADOG_SAMPLING_PRIORITY_HEADER: &str = "x-datadog-sampling-priority";
/// Trace-flag bit (`0x02`) set on extracted contexts whose sampling priority
/// header is missing or invalid. When this bit is set on a span context being
/// injected, no sampling priority header is written.
const TRACE_FLAG_DEFERRED: TraceFlags = TraceFlags::new(0x02);
lazy_static::lazy_static! {
    /// Owned copies of the three Datadog header names, in the order
    /// trace id, parent id, sampling priority. Returned (by reference) from
    /// the propagator's `fields()` implementation.
    static ref DATADOG_HEADER_FIELDS: [String; 3] = [
        String::from(DATADOG_TRACE_ID_HEADER),
        String::from(DATADOG_PARENT_ID_HEADER),
        String::from(DATADOG_SAMPLING_PRIORITY_HEADER),
    ];
}
/// Sampling decision carried by the `x-datadog-sampling-priority` header.
///
/// Each variant's explicit discriminant is the integer that appears on the
/// wire, so `variant as i32` yields the header value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SamplingPriority {
    /// Wire value `-1`; extracted contexts with this priority are not sampled.
    UserReject = -1,
    /// Wire value `0`; extracted contexts with this priority are not sampled.
    AutoReject = 0,
    /// Wire value `1`; extracted contexts with this priority are sampled.
    AutoKeep = 1,
    /// Wire value `2`; extracted contexts with this priority are sampled.
    UserKeep = 2,
}
/// Reasons a Datadog header value could not be converted while extracting a
/// span context.
#[derive(Debug)]
enum ExtractError {
/// The trace id value did not parse as a `u64`.
TraceId,
/// The parent (span) id value did not parse as a `u64`.
SpanId,
/// The sampling priority value did not parse as an `i32`, or was not one of
/// `-1`, `0`, `1`, `2`.
SamplingPriority,
}
/// Text-map propagator that reads and writes span context using the
/// `x-datadog-trace-id`, `x-datadog-parent-id` and
/// `x-datadog-sampling-priority` headers.
///
/// Construct it with [`DatadogPropagator::new`] or `Default::default()`.
#[derive(Clone, Debug, Default)]
// The type name repeats the enclosing module's name (`propagator`), which the
// pedantic `module_name_repetitions` lint would otherwise flag.
#[allow(clippy::module_name_repetitions)]
pub struct DatadogPropagator {
// Private zero-sized field: keeps the struct from being built with a struct
// literal outside this module.
_private: (),
}
impl DatadogPropagator {
#[must_use]
pub fn new() -> Self {
DatadogPropagator::default()
}
fn extract_trace_id(trace_id: &str) -> Result<TraceId, ExtractError> {
trace_id
.parse::<u64>()
.map(|id| TraceId::from(u128::from(id).to_be_bytes()))
.map_err(|_| ExtractError::TraceId)
}
fn extract_span_id(span_id: &str) -> Result<SpanId, ExtractError> {
span_id
.parse::<u64>()
.map(|id| SpanId::from(id.to_be_bytes()))
.map_err(|_| ExtractError::SpanId)
}
fn extract_sampling_priority(
sampling_priority: &str,
) -> Result<SamplingPriority, ExtractError> {
let i = sampling_priority
.parse::<i32>()
.map_err(|_| ExtractError::SamplingPriority)?;
match i {
-1 => Ok(SamplingPriority::UserReject),
0 => Ok(SamplingPriority::AutoReject),
1 => Ok(SamplingPriority::AutoKeep),
2 => Ok(SamplingPriority::UserKeep),
_ => Err(ExtractError::SamplingPriority),
}
}
fn extract_span_context(extractor: &dyn Extractor) -> Result<SpanContext, ExtractError> {
let trace_id =
Self::extract_trace_id(extractor.get(DATADOG_TRACE_ID_HEADER).unwrap_or(""))?;
let span_id =
Self::extract_span_id(extractor.get(DATADOG_PARENT_ID_HEADER).unwrap_or(""))
.unwrap_or(SpanId::INVALID);
let sampling_priority = Self::extract_sampling_priority(
extractor
.get(DATADOG_SAMPLING_PRIORITY_HEADER)
.unwrap_or(""),
);
let sampled = match sampling_priority {
Ok(SamplingPriority::UserReject | SamplingPriority::AutoReject) => {
TraceFlags::default()
}
Ok(SamplingPriority::UserKeep | SamplingPriority::AutoKeep) => TraceFlags::SAMPLED,
Err(_) => TRACE_FLAG_DEFERRED,
};
let trace_state = TraceState::default();
Ok(SpanContext::new(
trace_id,
span_id,
sampled,
true,
trace_state,
))
}
}
impl TextMapPropagator for DatadogPropagator {
    /// Writes the Datadog headers for the span held by `cx` into `injector`.
    ///
    /// Nothing is written when the span context is not valid. Otherwise the
    /// trace id and parent id headers are always set; the sampling priority
    /// header is set only when the deferred flag is absent, as `AutoKeep`
    /// for sampled contexts and `AutoReject` otherwise.
    fn inject_context(&self, cx: &Context, injector: &mut dyn Injector) {
        let span = cx.span();
        let span_context = span.span_context();
        if !span_context.is_valid() {
            return;
        }

        // The header carries the first `u64` returned by `u128_to_u64s`.
        // NOTE(review): this module's tests expect that to be the low 64 bits
        // of the trace id — confirm against the helper's definition.
        let trace_id_bits = u128::from_be_bytes(span_context.trace_id().to_bytes());
        let [t0, _] = u128_to_u64s(trace_id_bits);
        injector.set(DATADOG_TRACE_ID_HEADER, t0.to_string());

        let parent_id = u64::from_be_bytes(span_context.span_id().to_bytes());
        injector.set(DATADOG_PARENT_ID_HEADER, parent_id.to_string());

        // A deferred sampling decision is propagated by omitting the header.
        let deferred = span_context.trace_flags() & TRACE_FLAG_DEFERRED == TRACE_FLAG_DEFERRED;
        if deferred {
            return;
        }

        let priority = if span_context.is_sampled() {
            SamplingPriority::AutoKeep
        } else {
            SamplingPriority::AutoReject
        };
        injector.set(
            DATADOG_SAMPLING_PRIORITY_HEADER,
            (priority as i32).to_string(),
        );
    }

    /// Returns `cx` extended with the remote span context read from
    /// `extractor`, or with an empty span context when extraction fails.
    fn extract_with_context(&self, cx: &Context, extractor: &dyn Extractor) -> Context {
        let remote = match Self::extract_span_context(extractor) {
            Ok(span_context) => span_context,
            Err(_) => SpanContext::empty_context(),
        };
        cx.with_remote_span_context(remote)
    }

    /// Iterates over the header names this propagator reads and writes.
    fn fields(&self) -> FieldIter<'_> {
        FieldIter::new(DATADOG_HEADER_FIELDS.as_ref())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry::testing::trace::TestSpan;
    use opentelemetry::trace::TraceState;
    use std::collections::HashMap;

    /// A list of header name/value pairs together with the span context they
    /// correspond to.
    type Case = (Vec<(&'static str, &'static str)>, SpanContext);

    /// Builds a remote span context with an empty trace state.
    fn remote(trace_id: TraceId, span_id: SpanId, flags: TraceFlags) -> SpanContext {
        SpanContext::new(trace_id, span_id, flags, true, TraceState::default())
    }

    /// Headers to extract from, paired with the expected span context.
    #[rustfmt::skip]
    fn extract_test_data() -> Vec<Case> {
        vec![
            // No usable trace id: extraction yields the empty context.
            (vec![], SpanContext::empty_context()),
            (vec![(DATADOG_SAMPLING_PRIORITY_HEADER, "0")], SpanContext::empty_context()),
            (vec![(DATADOG_TRACE_ID_HEADER, "garbage")], SpanContext::empty_context()),
            // Valid trace id, no sampling priority: the decision is deferred.
            (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "garbage")], remote(TraceId::from_u128(1234), SpanId::INVALID, TRACE_FLAG_DEFERRED)),
            (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], remote(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED)),
            // Explicit sampling priority.
            (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], remote(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::default())),
            (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], remote(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED)),
        ]
    }

    /// Span contexts to inject, paired with the exact headers expected.
    #[rustfmt::skip]
    fn inject_test_data() -> Vec<Case> {
        vec![
            // Invalid span contexts: nothing is injected.
            (vec![], SpanContext::empty_context()),
            (vec![], remote(TraceId::INVALID, SpanId::INVALID, TRACE_FLAG_DEFERRED)),
            (vec![], remote(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TRACE_FLAG_DEFERRED)),
            (vec![], remote(TraceId::from_hex("1234").unwrap(), SpanId::INVALID, TraceFlags::SAMPLED)),
            // Deferred decision: ids only, no sampling priority header.
            (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12")], remote(TraceId::from_u128(1234), SpanId::from_u64(12), TRACE_FLAG_DEFERRED)),
            // Explicit decision: ids plus sampling priority header.
            (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "0")], remote(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::default())),
            (vec![(DATADOG_TRACE_ID_HEADER, "1234"), (DATADOG_PARENT_ID_HEADER, "12"), (DATADOG_SAMPLING_PRIORITY_HEADER, "1")], remote(TraceId::from_u128(1234), SpanId::from_u64(12), TraceFlags::SAMPLED)),
        ]
    }

    #[test]
    fn test_extract() {
        let propagator = DatadogPropagator::default();
        for (headers, expected) in extract_test_data() {
            let carrier: HashMap<String, String> = headers
                .iter()
                .map(|&(name, value)| (name.to_owned(), value.to_owned()))
                .collect();
            let context = propagator.extract(&carrier);
            assert_eq!(context.span().span_context(), &expected);
        }
    }

    #[test]
    fn test_extract_empty() {
        let carrier: HashMap<String, String> = HashMap::new();
        let context = DatadogPropagator::default().extract(&carrier);
        assert_eq!(context.span().span_context(), &SpanContext::empty_context());
    }

    #[test]
    fn test_inject() {
        let propagator = DatadogPropagator::default();
        for (expected_headers, span_context) in inject_test_data() {
            let mut carrier: HashMap<String, String> = HashMap::new();
            let cx = Context::current_with_span(TestSpan(span_context));
            propagator.inject_context(&cx, &mut carrier);

            // Take out every expected header, checking its value on the way...
            for (name, value) in expected_headers {
                assert_eq!(carrier.remove(name), Some(value.to_string()));
            }
            // ...so that anything left over is an unexpected header.
            assert!(carrier.is_empty());
        }
    }
}
}
pub use exporter::{new_pipeline, DatadogExporter, DatadogPipelineBuilder, Error};
pub use propagator::DatadogPropagator;