use crate::core::configuration::SamplingRuleConfig;
use crate::core::constants::{
RL_EFFECTIVE_RATE, SAMPLING_AGENT_RATE_TAG_KEY, SAMPLING_DECISION_MAKER_TAG_KEY,
SAMPLING_KNUTH_RATE_TAG_KEY, SAMPLING_PRIORITY_TAG_KEY, SAMPLING_RULE_RATE_TAG_KEY,
};
use crate::core::sampling::{mechanism, priority, SamplingMechanism, SamplingPriority};
/// Boxed callback that receives a slice of [`SamplingRuleConfig`] values.
///
/// The closure must be `Send + Sync`. `DatadogSampler::on_rules_update` returns a callback of
/// this type.
pub type SamplingRulesCallback = Box<dyn for<'a> Fn(&'a [SamplingRuleConfig]) + Send + Sync>;
use crate::sampling::{AttributeLike, SamplingData, SpanProperties, TraceIdLike, ValueLike};
use std::collections::HashMap;
use super::agent_service_sampler::{AgentRates, ServicesSampler};
use super::constants::pattern::NO_RULE;
use super::glob_matcher::GlobMatcher;
use super::rate_limiter::RateLimiter;
use super::rate_sampler::RateSampler;
use super::rules_sampler::RulesSampler;
/// Builds a glob matcher for `rule`.
///
/// Returns `None` when `rule` equals the `NO_RULE` sentinel pattern, in which case the
/// corresponding field is left unconstrained by the sampling rule.
fn matcher_from_rule(rule: &str) -> Option<GlobMatcher> {
    if rule == NO_RULE {
        None
    } else {
        Some(GlobMatcher::new(rule))
    }
}
/// A single sampling rule: a sample rate plus optional glob matchers selecting the spans it
/// applies to.
#[derive(Clone, Debug)]
pub struct SamplingRule {
/// Sample rate the rule was created with.
sample_rate: f64,
/// Origin label of the rule; `SamplingRule::new` falls back to `"default"` when none is given.
provenance: String,
/// Sampler built from `sample_rate`; used by `SamplingRule::sample`.
rate_sampler: RateSampler,
/// Matcher for the span operation name; `None` means the name is not checked.
name_matcher: Option<GlobMatcher>,
/// Matcher for the span service; `None` means the service is not checked.
service_matcher: Option<GlobMatcher>,
/// Matcher for the span resource; `None` means the resource is not checked.
resource_matcher: Option<GlobMatcher>,
/// Matchers keyed by tag (attribute) name; every entry must match for the rule to apply.
tag_matchers: HashMap<String, GlobMatcher>,
}
impl SamplingRule {
/// Converts rule configurations into [`SamplingRule`]s, preserving their order.
///
/// Each configuration's tags and provenance are always forwarded as `Some(..)`.
pub fn from_configs(configs: Vec<SamplingRuleConfig>) -> Vec<Self> {
    let mut rules = Vec::with_capacity(configs.len());
    for config in configs {
        let rule = Self::new(
            config.sample_rate,
            config.service,
            config.name,
            config.resource,
            Some(config.tags),
            Some(config.provenance),
        );
        rules.push(rule);
    }
    rules
}
pub fn new(
sample_rate: f64,
service: Option<String>,
name: Option<String>,
resource: Option<String>,
tags: Option<HashMap<String, String>>,
provenance: Option<String>,
) -> Self {
let name_matcher = name.as_deref().and_then(matcher_from_rule);
let service_matcher = service.as_deref().and_then(matcher_from_rule);
let resource_matcher = resource.as_deref().and_then(matcher_from_rule);
let tag_map = tags.clone().unwrap_or_default();
let mut tag_matchers = HashMap::with_capacity(tag_map.len());
for (key, value) in &tag_map {
if let Some(matcher) = matcher_from_rule(value) {
tag_matchers.insert(key.clone(), matcher);
}
}
SamplingRule {
sample_rate,
provenance: provenance.unwrap_or_else(|| "default".to_string()),
rate_sampler: RateSampler::new(sample_rate),
name_matcher,
service_matcher,
resource_matcher,
tag_matchers,
}
}
/// Returns `true` when every criterion configured on this rule matches `span`.
///
/// Criteria without a matcher are skipped. Span fields (operation name, service, resource)
/// are only fetched when the rule actually has a matcher for them, so rules that do not
/// constrain a field do not pay for computing it.
///
/// Tag criteria are evaluated as follows:
/// * `http.status_code` and the OpenTelemetry `http.response.status_code` key are matched
///   against the span's status code; a span without a status code does not match.
/// * Any other key is first matched against the span attribute with that exact key.
/// * If that fails and the key starts with `http.`, the rule also matches when some span
///   attribute whose alternate key equals the rule key has a matching value.
fn matches(&self, span: &impl SpanProperties) -> bool {
    if let Some(matcher) = &self.name_matcher {
        let name = span.operation_name();
        if !matcher.matches(name.as_ref()) {
            return false;
        }
    }

    if let Some(matcher) = &self.service_matcher {
        let service = span.service();
        if !matcher.matches(&service) {
            return false;
        }
    }

    if let Some(matcher) = &self.resource_matcher {
        let resource = span.resource();
        if !matcher.matches(resource.as_ref()) {
            return false;
        }
    }

    for (key, matcher) in &self.tag_matchers {
        let rule_tag_key = key.as_str();

        // Status-code rules are checked against the span's status code rather than a raw
        // attribute lookup.
        if rule_tag_key == "http.status_code"
            || rule_tag_key == opentelemetry_semantic_conventions::trace::HTTP_RESPONSE_STATUS_CODE
        {
            if self.match_http_status_code_rule(matcher, span) != Some(true) {
                return false;
            }
            continue;
        }

        // Exact key match on the span attributes.
        let direct_match = span
            .attributes()
            .find(|attr| attr.key() == rule_tag_key)
            .and_then(|attr| self.match_attribute_value(attr.value(), matcher))
            .unwrap_or(false);
        if direct_match {
            continue;
        }

        // Only `http.*` rule keys get a second chance through alternate attribute keys.
        if !rule_tag_key.starts_with("http.") {
            return false;
        }

        let alternate_match = span.attributes().any(|attr| {
            match span.get_alternate_key(attr.key()) {
                Some(alternate_key) if alternate_key == rule_tag_key => self
                    .match_attribute_value(attr.value(), matcher)
                    .unwrap_or(false),
                _ => false,
            }
        });
        if !alternate_match {
            return false;
        }
    }

    true
}
/// Matches `matcher` against the span's status code.
///
/// Returns `None` when the span has no status code; otherwise the status code is wrapped as an
/// integer value and delegated to `match_attribute_value`.
fn match_http_status_code_rule(
    &self,
    matcher: &GlobMatcher,
    span: &impl SpanProperties,
) -> Option<bool> {
    let status_code = span.status_code()?;
    let as_value = opentelemetry::Value::I64(i64::from(status_code));
    self.match_attribute_value(&as_value, matcher)
}
/// Matches an attribute value against `matcher`.
///
/// * Values that yield a float with a fractional part only match patterns made up entirely of
///   `*` characters.
/// * Values that yield a whole-number float are matched through their string form.
/// * Otherwise the value's string form (if any) is matched; `None` is returned when the value
///   yields neither a float nor a string.
fn match_attribute_value(&self, value: &impl ValueLike, matcher: &GlobMatcher) -> Option<bool> {
    match value.extract_float() {
        Some(number) => {
            // Round-tripping through i64 tells whole numbers apart from fractional ones.
            let is_whole = number == (number as i64) as f64;
            let matched = if is_whole {
                matcher.matches(&number.to_string())
            } else {
                matcher.pattern().chars().all(|c| c == '*')
            };
            Some(matched)
        }
        None => value
            .extract_string()
            .map(|text| matcher.matches(&text)),
    }
}
/// Runs this rule's rate sampler on `trace_id` and returns its decision.
pub fn sample(&self, trace_id: &impl TraceIdLike) -> bool {
self.rate_sampler.sample(trace_id)
}
}
/// Origin of a sampling rule.
///
/// The derived ordering follows the declared discriminants:
/// `Customer < Dynamic < Default`.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RuleProvenance {
    /// Parsed from the string `"customer"`.
    Customer = 0,
    /// Parsed from the string `"dynamic"`.
    Dynamic = 1,
    /// Fallback for every other string.
    Default = 2,
}

impl From<&str> for RuleProvenance {
    /// Parses a provenance label (case-sensitive); unknown labels map to `Default`.
    fn from(s: &str) -> Self {
        if s == "customer" {
            Self::Customer
        } else if s == "dynamic" {
            Self::Dynamic
        } else {
            Self::Default
        }
    }
}
/// Sampler combining sampling rules, per-service samplers and a rate limiter.
#[derive(Clone, Debug)]
pub struct DatadogSampler {
/// Rule set consulted first when sampling a root span.
rules: RulesSampler,
/// Per-service samplers, looked up with a `service:<service>,env:<env>` key when no rule matches.
service_samplers: ServicesSampler,
/// Limiter consulted after a matching rule's own sampler has accepted the trace.
rate_limiter: RateLimiter,
}
impl DatadogSampler {
pub fn new(rules: Vec<SamplingRule>, rate_limit: i32) -> Self {
let limiter = RateLimiter::new(rate_limit, None);
DatadogSampler {
rules: RulesSampler::new(rules),
service_samplers: ServicesSampler::default(),
rate_limiter: limiter,
}
}
/// Forwards `rates` (pairs of service key and sample rate) to the per-service samplers.
#[allow(dead_code)]
pub(crate) fn update_service_rates(&self, rates: impl IntoIterator<Item = (String, f64)>) {
self.service_samplers.update_rates(rates);
}
/// Returns a callback that applies the service rates found in an agent response body.
///
/// The callback parses its argument as JSON into `AgentRates`. Payloads that fail to parse, or
/// that carry no `rates_by_service`, are ignored; otherwise the rates are pushed to a clone of
/// this sampler's per-service samplers.
pub(crate) fn on_agent_response(&self) -> Box<dyn for<'a> Fn(&'a str) + Send + Sync> {
    let samplers = self.service_samplers.clone();
    Box::new(move |payload: &str| {
        let rates_by_service = match serde_json::de::from_str::<AgentRates>(payload) {
            Ok(parsed) => parsed.rates_by_service,
            Err(_) => return,
        };
        if let Some(rates) = rates_by_service {
            samplers.update_rates(
                rates
                    .into_iter()
                    .map(|(service, rate)| (service.to_string(), rate)),
            );
        }
    })
}
/// Returns a callback that replaces the sampling rules with ones built from the given
/// configurations.
///
/// The callback operates on a clone of this sampler's rule set handle.
pub fn on_rules_update(&self) -> SamplingRulesCallback {
    let sampler = self.rules.clone();
    Box::new(move |configs: &[SamplingRuleConfig]| {
        sampler.update_rules(SamplingRule::from_configs(configs.to_vec()));
    })
}
/// Builds the lookup key for the per-service samplers: `service:<service>,env:<env>`.
fn service_key(&self, span: &impl SpanProperties) -> String {
    // Format the service value directly; converting it to an owned `String` first would only
    // add an allocation on top of the one `format!` already performs.
    let service = span.service();
    let env = span.env();
    format!("service:{service},env:{env}")
}
/// Asks the rules sampler for a rule whose `matches` check accepts `span`.
///
/// Returns `None` when the rules sampler finds no such rule.
fn find_matching_rule(&self, span: &impl SpanProperties) -> Option<SamplingRule> {
self.rules.find_matching_rule(|rule| rule.matches(span))
}
fn get_sampling_mechanism(
&self,
rule: Option<&SamplingRule>,
used_agent_sampler: bool,
) -> SamplingMechanism {
if let Some(rule) = rule {
match rule.provenance.as_str() {
"customer" => mechanism::REMOTE_USER_TRACE_SAMPLING_RULE,
"dynamic" => mechanism::REMOTE_DYNAMIC_TRACE_SAMPLING_RULE,
_ => mechanism::LOCAL_USER_TRACE_SAMPLING_RULE,
}
} else if used_agent_sampler {
mechanism::AGENT_RATE_BY_SERVICE
} else {
mechanism::DEFAULT
}
}
/// Makes a sampling decision for the span described by `data`.
///
/// When the parent already carries a decision it is inherited (`AUTO_KEEP` / `AUTO_REJECT`)
/// and no trace-root information is attached. Otherwise the span is treated as a trace root
/// and handed to `sample_root`.
pub fn sample(&self, data: &impl SamplingData) -> DdSamplingResult {
    match data.is_parent_sampled() {
        Some(parent_sampled) => DdSamplingResult {
            priority: if parent_sampled {
                priority::AUTO_KEEP
            } else {
                priority::AUTO_REJECT
            },
            trace_root_info: None,
        },
        None => data.with_span_properties(self, |sampler, span| sampler.sample_root(data, span)),
    }
}
/// Makes the sampling decision for a trace root.
///
/// Precedence:
/// 1. A matching sampling rule: its rate sampler decides first, and only traces it accepts
///    are submitted to the rate limiter. When the limiter rejects, its effective rate is
///    recorded.
/// 2. Otherwise, the per-service sampler registered under this span's service/env key.
/// 3. Otherwise, keep the trace with a rate of `1.0`.
fn sample_root(
    &self,
    data: &impl SamplingData,
    span: &impl SpanProperties,
) -> DdSamplingResult {
    let trace_id = data.trace_id();
    let matching_rule = self.find_matching_rule(span);

    // (keep?, agent sampler used?, applied rate, rate limiter effective rate)
    let (is_keep, used_agent_sampler, sample_rate, rl_effective_rate): (
        bool,
        bool,
        f64,
        Option<f64>,
    ) = match &matching_rule {
        Some(rule) => {
            let rate = rule.sample_rate;
            if !rule.sample(trace_id) {
                (false, false, rate, None)
            } else if !self.rate_limiter.is_allowed() {
                (false, false, rate, Some(self.rate_limiter.effective_rate()))
            } else {
                (true, false, rate, None)
            }
        }
        None => {
            let service_key = self.service_key(span);
            match self.service_samplers.get(&service_key) {
                Some(sampler) => {
                    let rate = sampler.sample_rate();
                    let keep = sampler.sample(trace_id);
                    (keep, true, rate, None)
                }
                None => (true, false, 1.0, None),
            }
        }
    };

    let mechanism = self.get_sampling_mechanism(matching_rule.as_ref(), used_agent_sampler);
    DdSamplingResult {
        priority: mechanism.to_priority(is_keep),
        trace_root_info: Some(TraceRootSamplingInfo {
            mechanism,
            rate: sample_rate,
            rl_effective_rate,
        }),
    }
}
}
/// Formats a sampling rate with at most six significant digits, in plain decimal notation and
/// without trailing zeros (for example `0.5`, `0.765432`, `1`, `0`).
///
/// Returns `None` when `rate` is NaN or outside `[0, 1]`.
fn format_sampling_rate(rate: f64) -> Option<String> {
    const SIGNIFICANT_DIGITS: i32 = 6;

    if rate.is_nan() || !(0.0..=1.0).contains(&rate) {
        return None;
    }
    if rate == 0.0 {
        return Some("0".to_string());
    }

    // Position of the leading significant digit (0 for 1.0, -1 for 0.5, -3 for 0.001, ...).
    let magnitude = rate.abs().log10().floor() as i32;
    // Number of fractional digits needed to show SIGNIFICANT_DIGITS significant digits.
    let exponent = SIGNIFICANT_DIGITS - 1 - magnitude;
    let scale = 10f64.powi(exponent);

    // For extremely small rates `scale` overflows to infinity, and rounding through it would
    // yield NaN (and the literal string "NaN"). In that case skip the manual rounding and let
    // the precision-limited formatting below do the rounding.
    let rounded = if scale.is_finite() {
        (rate * scale).round() / scale
    } else {
        rate
    };

    // A non-positive exponent means no fractional digits are required.
    let decimal_places = usize::try_from(exponent).unwrap_or(0);
    let formatted = format!("{:.prec$}", rounded, prec = decimal_places);

    Some(if formatted.contains('.') {
        formatted
            .trim_end_matches('0')
            .trim_end_matches('.')
            .to_string()
    } else {
        formatted
    })
}
/// Details of a sampling decision taken at the root of a trace.
pub struct TraceRootSamplingInfo {
/// Mechanism that produced the decision.
mechanism: SamplingMechanism,
/// Sample rate that was applied.
rate: f64,
/// Effective rate reported by the rate limiter; `sample_root` only sets it when the limiter
/// rejected the trace.
rl_effective_rate: Option<f64>,
}
impl TraceRootSamplingInfo {
/// Returns the mechanism that produced the decision.
pub fn mechanism(&self) -> SamplingMechanism {
self.mechanism
}
/// Returns the sample rate that was applied.
pub fn rate(&self) -> f64 {
self.rate
}
/// Returns the rate limiter's effective rate, if one was recorded.
pub fn rl_effective_rate(&self) -> Option<f64> {
self.rl_effective_rate
}
}
/// Outcome of a sampling decision.
pub struct DdSamplingResult {
/// Sampling priority assigned to the trace.
priority: SamplingPriority,
/// Root-level details; `None` when the decision was inherited from the parent span.
trace_root_info: Option<TraceRootSamplingInfo>,
}
impl DdSamplingResult {
/// Returns the sampling priority of this result.
#[inline(always)]
pub fn get_priority(&self) -> SamplingPriority {
self.priority
}
/// Returns the trace-root sampling details, or `&None` when the result carries none.
pub fn get_trace_root_sampling_info(&self) -> &Option<TraceRootSamplingInfo> {
&self.trace_root_info
}
/// Builds the sampling tags describing this result, using `factory` to create each attribute.
///
/// Returns `None` when the result carries no trace-root information. Otherwise the tags are
/// emitted in this order:
/// 1. the rate limiter effective rate (only when one was recorded),
/// 2. the decision maker (sampling mechanism),
/// 3. for agent and rule mechanisms: the agent or rule rate, followed by the formatted Knuth
///    sampling rate when the rate can be formatted,
/// 4. the sampling priority.
pub fn to_dd_sampling_tags<F>(&self, factory: &F) -> Option<Vec<F::Attribute>>
where
    F: crate::sampling::AttributeFactory,
{
    let root_info = self.trace_root_info.as_ref()?;
    let rl_effective_rate = root_info.rl_effective_rate();

    // Up to four tags (decision maker, rate, Knuth rate, priority) plus the optional
    // rate-limiter tag; sizing for the maximum avoids a reallocation while pushing.
    let capacity = if rl_effective_rate.is_some() { 5 } else { 4 };
    let mut result: Vec<F::Attribute> = Vec::with_capacity(capacity);

    if let Some(limit) = rl_effective_rate {
        result.push(factory.create_f64(RL_EFFECTIVE_RATE, limit));
    }

    let mechanism = root_info.mechanism();
    result.push(factory.create_string(SAMPLING_DECISION_MAKER_TAG_KEY, mechanism.to_cow()));

    // Agent and rule mechanisms report the applied rate under different keys; every other
    // mechanism reports no rate at all.
    let rate_tag_key = match mechanism {
        mechanism::AGENT_RATE_BY_SERVICE => Some(SAMPLING_AGENT_RATE_TAG_KEY),
        mechanism::REMOTE_USER_TRACE_SAMPLING_RULE
        | mechanism::REMOTE_DYNAMIC_TRACE_SAMPLING_RULE
        | mechanism::LOCAL_USER_TRACE_SAMPLING_RULE => Some(SAMPLING_RULE_RATE_TAG_KEY),
        _ => None,
    };
    if let Some(key) = rate_tag_key {
        let rate = root_info.rate();
        result.push(factory.create_f64(key, rate));
        if let Some(rate_str) = format_sampling_rate(rate) {
            result.push(factory.create_string(
                SAMPLING_KNUTH_RATE_TAG_KEY,
                std::borrow::Cow::Owned(rate_str),
            ));
        }
    }

    let priority = self.priority;
    result.push(factory.create_i64(SAMPLING_PRIORITY_TAG_KEY, priority.into_i8() as i64));
    Some(result)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mappings::get_otel_operation_name_v2;
use crate::sampling::constants::{
attr::{ENV_TAG, RESOURCE_TAG},
pattern,
};
use crate::sampling::otel_mappings::{OtelSamplingData, PreSampledSpan};
use opentelemetry::trace::{SpanKind, TraceId};
use opentelemetry::{Key, KeyValue, Value};
use opentelemetry_sdk::Resource as SdkResource;
use opentelemetry_semantic_conventions::{
attribute::{
DB_SYSTEM_NAME, HTTP_REQUEST_METHOD, MESSAGING_OPERATION_TYPE, MESSAGING_SYSTEM,
},
resource::SERVICE_NAME,
trace::{HTTP_RESPONSE_STATUS_CODE, NETWORK_PROTOCOL_NAME},
};
use std::sync::{Arc, RwLock};
fn create_empty_resource() -> opentelemetry_sdk::Resource {
opentelemetry_sdk::Resource::builder_empty().build()
}
fn create_empty_resource_arc() -> Arc<RwLock<opentelemetry_sdk::Resource>> {
Arc::new(RwLock::new(
opentelemetry_sdk::Resource::builder_empty().build(),
))
}
/// Returns a shared SDK resource whose only attribute is `SERVICE_NAME = res`.
fn create_resource(res: String) -> Arc<RwLock<SdkResource>> {
    let service_name = KeyValue::new(SERVICE_NAME, res);
    let resource: SdkResource = SdkResource::builder_empty()
        .with_attributes(vec![service_name])
        .build();
    Arc::new(RwLock::new(resource))
}
/// Returns a fixed trace id made of the bytes `1..=16`, so tests are deterministic.
fn create_trace_id() -> TraceId {
    TraceId::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16])
}
// Builds the two span attributes most tests need: the resource tag and the `datadog.env` tag.
fn create_attributes(resource: &'static str, env: &'static str) -> Vec<KeyValue> {
vec![
KeyValue::new(RESOURCE_TAG, resource),
KeyValue::new("datadog.env", env),
]
}
// Thin wrapper around `OtelSamplingData::new`; forwards every argument unchanged.
fn create_sampling_data<'a>(
is_parent_sampled: Option<bool>,
trace_id: &'a TraceId,
name: &'a str,
span_kind: SpanKind,
attributes: &'a [KeyValue],
resource: &'a RwLock<opentelemetry_sdk::Resource>,
) -> OtelSamplingData<'a> {
OtelSamplingData::new(
is_parent_sampled,
trace_id,
name,
span_kind,
attributes,
resource,
)
}
// A rule built with every criterion set keeps the rate, the patterns and the provenance.
#[test]
fn test_sampling_rule_creation() {
let rule = SamplingRule::new(
0.5,
Some("test-service".to_string()),
Some("test-name".to_string()),
Some("test-resource".to_string()),
Some(HashMap::from([(
"custom-tag".to_string(),
"tag-value".to_string(),
)])),
Some("customer".to_string()),
);
// Each matcher must carry the pattern it was created from.
assert_eq!(rule.sample_rate, 0.5);
assert_eq!(rule.service_matcher.unwrap().pattern(), "test-service");
assert_eq!(rule.name_matcher.unwrap().pattern(), "test-name");
assert_eq!(
rule.resource_matcher.unwrap().pattern(),
"test-resource".to_string()
);
assert_eq!(
rule.tag_matchers.get("custom-tag").unwrap().pattern(),
"tag-value"
);
assert_eq!(rule.provenance, "customer");
}
// Rules without criteria (either `None` or the `NO_RULE` sentinel) have no matchers and
// therefore match any span.
#[test]
fn test_sampling_rule_with_no_rule() {
    // No criteria at all: no matchers, and the provenance falls back to "default".
    let rule = SamplingRule::new(0.5, None, None, None, None, None);
    assert_eq!(rule.sample_rate, 0.5);
    assert!(rule.service_matcher.is_none());
    assert!(rule.name_matcher.is_none());
    assert!(rule.resource_matcher.is_none());
    assert!(rule.tag_matchers.is_empty());
    assert_eq!(rule.provenance, "default");

    // The `NO_RULE` sentinel behaves like an absent criterion, including for tag patterns.
    let rule_with_empty_strings = SamplingRule::new(
        0.5,
        Some(pattern::NO_RULE.to_string()),
        Some(pattern::NO_RULE.to_string()),
        Some(pattern::NO_RULE.to_string()),
        Some(HashMap::from([(
            pattern::NO_RULE.to_string(),
            pattern::NO_RULE.to_string(),
        )])),
        None,
    );
    assert!(rule_with_empty_strings.service_matcher.is_none());
    assert!(rule_with_empty_strings.name_matcher.is_none());
    assert!(rule_with_empty_strings.resource_matcher.is_none());
    assert!(rule_with_empty_strings.tag_matchers.is_empty());

    // Both unconstrained rules must match an arbitrary span.
    let attributes = create_attributes("some-resource", "some-env");
    let empty_resource = create_empty_resource();
    let span = PreSampledSpan::new("", SpanKind::Client, &attributes, &empty_resource);
    assert!(rule.matches(&span));
    assert!(rule_with_empty_strings.matches(&span));
}
// NOTE(review): this test only constructs a rule and asserts nothing — it never calls
// `matches`. It looks like a placeholder; confirm and add span-based assertions.
#[test]
fn test_sampling_rule_matches() {
let _rule = SamplingRule::new(
0.5,
Some("web-*".to_string()),
Some("http.*".to_string()),
None,
Some(HashMap::from([(
"custom_key".to_string(),
"custom_value".to_string(),
)])),
None,
);
}
// A rate of 1.0 must always sample and a rate of 0.0 must never sample.
#[test]
fn test_sample_method() {
let rule_always = SamplingRule::new(1.0, None, None, None, None, None);
let rule_never = SamplingRule::new(0.0, None, None, None, None, None);
let trace_id = create_trace_id();
assert!(rule_always.sample(&trace_id));
assert!(!rule_never.sample(&trace_id));
}
// A new sampler holds exactly the rules it was given and starts without service samplers.
#[test]
fn test_datadog_sampler_creation() {
let sampler = DatadogSampler::new(vec![], 100);
assert!(sampler.rules.is_empty());
assert!(sampler.service_samplers.is_empty());
// With a single rule supplied, the rule set reports one entry.
let rule = SamplingRule::new(0.5, None, None, None, None, None);
let sampler_with_rules = DatadogSampler::new(vec![rule], 200);
assert_eq!(sampler_with_rules.rules.len(), 1);
}
// The service key has the form `service:<service>,env:<env>`.
#[test]
fn test_service_key_generation() {
let test_service_name = "test-service".to_string();
let resource = create_resource(test_service_name.clone());
let sampler = DatadogSampler::new(vec![], 100);
// Case 1: the span carries an env attribute.
let attrs = create_attributes("resource", "production");
let res = &resource.read().unwrap();
let span = PreSampledSpan::new("test-span", SpanKind::Internal, attrs.as_slice(), res);
assert_eq!(
sampler.service_key(&span),
format!("service:{test_service_name},env:production")
);
// Case 2: without an env attribute the env part of the key is expected to be empty.
let attrs_no_env = vec![KeyValue::new(RESOURCE_TAG, "resource")];
let span = PreSampledSpan::new(
"test-span",
SpanKind::Internal,
attrs_no_env.as_slice(),
res,
);
assert_eq!(
sampler.service_key(&span),
format!("service:{test_service_name},env:")
);
}
// Updating the rates creates one service sampler per key, each with the given rate.
#[test]
fn test_update_service_rates() {
let sampler = DatadogSampler::new(vec![], 100);
let mut rates = HashMap::new();
rates.insert("service:web,env:prod".to_string(), 0.5);
rates.insert("service:api,env:prod".to_string(), 0.75);
sampler.service_samplers.update_rates(rates);
// Both keys must now be present.
assert_eq!(sampler.service_samplers.len(), 2);
assert!(sampler
.service_samplers
.contains_key("service:web,env:prod"));
assert!(sampler
.service_samplers
.contains_key("service:api,env:prod"));
// Each sampler must report the rate it was registered with.
if let Some(web_sampler) = sampler.service_samplers.get("service:web,env:prod") {
assert_eq!(web_sampler.sample_rate(), 0.5);
} else {
panic!("Web service sampler not found");
}
if let Some(api_sampler) = sampler.service_samplers.get("service:api,env:prod") {
assert_eq!(api_sampler.sample_rate(), 0.75);
} else {
panic!("API service sampler not found");
}
}
// Rule lookup by service: exact patterns, a wildcard pattern, and a service no rule covers.
#[test]
fn test_find_matching_rule() {
// Two exact service rules and one wildcard rule, each with a distinct rate and provenance.
let rule1 = SamplingRule::new(
0.1,
Some("service1".to_string()),
None,
None,
None,
Some("customer".to_string()), );
let rule2 = SamplingRule::new(
0.2,
Some("service2".to_string()),
None,
None,
None,
Some("dynamic".to_string()), );
let rule3 = SamplingRule::new(
0.3,
Some("service*".to_string()), None,
None,
None,
Some("default".to_string()), );
let sampler = DatadogSampler::new(vec![rule1.clone(), rule2.clone(), rule3.clone()], 100);
// `service1` is expected to resolve to rule1, even though the wildcard rule3 also fits.
{
let resource = create_resource("service1".to_string());
let attrs1 = create_attributes("resource_val_for_attr1", "prod");
let res = resource.read().unwrap();
let span = PreSampledSpan::new("test-span", SpanKind::Client, attrs1.as_slice(), &res);
let matching_rule_for_attrs1 = sampler.find_matching_rule(&span);
assert!(
matching_rule_for_attrs1.is_some(),
"Expected rule1 to match for service1"
);
let rule = matching_rule_for_attrs1.unwrap();
assert_eq!(rule.sample_rate, 0.1, "Expected rule1 sample rate");
assert_eq!(rule.provenance, "customer", "Expected rule1 provenance");
}
// `service2` is expected to resolve to rule2.
{
let resource = create_resource("service2".to_string());
let attrs2 = create_attributes("resource_val_for_attr2", "prod");
let res = resource.read().unwrap();
let span = PreSampledSpan::new("test-span", SpanKind::Client, attrs2.as_slice(), &res);
let matching_rule_for_attrs2 = sampler.find_matching_rule(&span);
assert!(
matching_rule_for_attrs2.is_some(),
"Expected rule2 to match for service2"
);
let rule = matching_rule_for_attrs2.unwrap();
assert_eq!(rule.sample_rate, 0.2, "Expected rule2 sample rate");
assert_eq!(rule.provenance, "dynamic", "Expected rule2 provenance");
}
// `service3` only fits the wildcard pattern of rule3.
{
let resource = create_resource("service3".to_string());
let attrs3 = create_attributes("resource_val_for_attr3", "prod");
let res = resource.read().unwrap();
let span = PreSampledSpan::new("test-span", SpanKind::Client, attrs3.as_slice(), &res);
let matching_rule_for_attrs3 = sampler.find_matching_rule(&span);
assert!(
matching_rule_for_attrs3.is_some(),
"Expected rule3 to match for service3"
);
let rule = matching_rule_for_attrs3.unwrap();
assert_eq!(rule.sample_rate, 0.3, "Expected rule3 sample rate");
assert_eq!(rule.provenance, "default", "Expected rule3 provenance");
}
// A service that fits none of the patterns yields no rule.
{
let resource = create_resource("other_sampler_service".to_string());
let attrs4 = create_attributes("resource_val_for_attr4", "prod");
let res = resource.read().unwrap();
let span = PreSampledSpan::new("test-span", SpanKind::Client, attrs4.as_slice(), &res);
let matching_rule_for_attrs4 = sampler.find_matching_rule(&span);
assert!(
matching_rule_for_attrs4.is_none(),
"Expected no rule to match for service 'other_sampler_service'"
);
}
}
// Mechanism selection: by rule provenance first, then agent sampler, then the default.
#[test]
fn test_get_sampling_mechanism() {
let sampler = DatadogSampler::new(vec![], 100);
let rule_customer =
SamplingRule::new(0.1, None, None, None, None, Some("customer".to_string()));
let rule_dynamic =
SamplingRule::new(0.2, None, None, None, None, Some("dynamic".to_string()));
let rule_default =
SamplingRule::new(0.3, None, None, None, None, Some("default".to_string()));
// With a rule, the provenance decides the mechanism.
let mechanism1 = sampler.get_sampling_mechanism(Some(&rule_customer), false);
assert_eq!(mechanism1, mechanism::REMOTE_USER_TRACE_SAMPLING_RULE);
let mechanism2 = sampler.get_sampling_mechanism(Some(&rule_dynamic), false);
assert_eq!(mechanism2, mechanism::REMOTE_DYNAMIC_TRACE_SAMPLING_RULE);
let mechanism3 = sampler.get_sampling_mechanism(Some(&rule_default), false);
assert_eq!(mechanism3, mechanism::LOCAL_USER_TRACE_SAMPLING_RULE);
// Without a rule, the agent-sampler flag decides.
let mechanism4 = sampler.get_sampling_mechanism(None, true);
assert_eq!(mechanism4, mechanism::AGENT_RATE_BY_SERVICE);
let mechanism5 = sampler.get_sampling_mechanism(None, false);
assert_eq!(mechanism5, mechanism::DEFAULT);
}
// Checks the tags produced by `to_dd_sampling_tags` in three scenarios: a rule decision,
// a rule decision with a rate-limiter rate, and an agent-rate decision.
#[test]
fn test_add_dd_sampling_tags() {
// Scenario 1: local rule mechanism, kept, no rate-limiter rate -> four tags.
let sample_rate = 0.5;
let is_sampled = true;
let mechanism = mechanism::LOCAL_USER_TRACE_SAMPLING_RULE;
let sampling_result = DdSamplingResult {
priority: mechanism.to_priority(is_sampled),
trace_root_info: Some(TraceRootSamplingInfo {
mechanism,
rate: 0.5,
rl_effective_rate: None,
}),
};
let attrs = sampling_result
.to_dd_sampling_tags(&crate::sampling::OtelAttributeFactory)
.unwrap_or_default();
assert_eq!(attrs.len(), 4);
// Verify the type and value of each expected tag, and track which ones were seen.
let mut found_decision_maker = false;
let mut found_priority = false;
let mut found_rule_rate = false;
let mut found_ksr = false;
for attr in &attrs {
match attr.key.as_str() {
SAMPLING_DECISION_MAKER_TAG_KEY => {
let value_str = match &attr.value {
opentelemetry::Value::String(s) => s.to_string(),
_ => panic!("Expected string value for decision maker tag"),
};
assert_eq!(value_str, mechanism.to_cow());
found_decision_maker = true;
}
SAMPLING_PRIORITY_TAG_KEY => {
let expected_priority = mechanism.to_priority(true).into_i8() as i64;
let value_int = match attr.value {
opentelemetry::Value::I64(i) => i,
_ => panic!("Expected integer value for priority tag"),
};
assert_eq!(value_int, expected_priority);
found_priority = true;
}
SAMPLING_RULE_RATE_TAG_KEY => {
let value_float = match attr.value {
opentelemetry::Value::F64(f) => f,
_ => panic!("Expected float value for rule rate tag"),
};
assert_eq!(value_float, sample_rate);
found_rule_rate = true;
}
SAMPLING_KNUTH_RATE_TAG_KEY => {
let value_str = match &attr.value {
opentelemetry::Value::String(s) => s.to_string(),
_ => panic!("Expected string value for ksr tag"),
};
assert_eq!(value_str, "0.5");
found_ksr = true;
}
_ => {}
}
}
assert!(found_decision_maker, "Missing decision maker tag");
assert!(found_priority, "Missing priority tag");
assert!(found_rule_rate, "Missing rule rate tag");
assert!(found_ksr, "Missing knuth sampling rate tag");
// Scenario 2: same mechanism, rejected, with a rate-limiter rate -> one extra tag.
let rate_limit = 0.5;
let is_sampled = false;
let mechanism = mechanism::LOCAL_USER_TRACE_SAMPLING_RULE;
let sampling_result = DdSamplingResult {
priority: mechanism.to_priority(is_sampled),
trace_root_info: Some(TraceRootSamplingInfo {
mechanism,
rate: 0.5,
rl_effective_rate: Some(rate_limit),
}),
};
let attrs_with_limit = sampling_result
.to_dd_sampling_tags(&crate::sampling::OtelAttributeFactory)
.unwrap_or_default();
assert_eq!(attrs_with_limit.len(), 5);
let mut found_limit = false;
for attr in &attrs_with_limit {
if attr.key.as_str() == RL_EFFECTIVE_RATE {
let value_float = match attr.value {
opentelemetry::Value::F64(f) => f,
_ => panic!("Expected float value for rate limit tag"),
};
assert_eq!(value_float, rate_limit);
found_limit = true;
break;
}
}
assert!(found_limit, "Missing rate limit tag");
// Scenario 3: agent-rate mechanism -> agent rate tag instead of the rule rate tag.
let agent_rate = 0.75;
let is_sampled = false;
let mechanism = mechanism::AGENT_RATE_BY_SERVICE;
let sampling_result = DdSamplingResult {
priority: mechanism.to_priority(is_sampled),
trace_root_info: Some(TraceRootSamplingInfo {
mechanism,
rate: agent_rate,
rl_effective_rate: None,
}),
};
let agent_attrs = sampling_result
.to_dd_sampling_tags(&crate::sampling::OtelAttributeFactory)
.unwrap_or_default();
assert_eq!(agent_attrs.len(), 4);
let mut found_agent_rate = false;
let mut found_ksr = false;
for attr in &agent_attrs {
match attr.key.as_str() {
SAMPLING_AGENT_RATE_TAG_KEY => {
let value_float = match attr.value {
opentelemetry::Value::F64(f) => f,
_ => panic!("Expected float value for agent rate tag"),
};
assert_eq!(value_float, agent_rate);
found_agent_rate = true;
}
SAMPLING_KNUTH_RATE_TAG_KEY => {
let value_str = match &attr.value {
opentelemetry::Value::String(s) => s.to_string(),
_ => panic!("Expected string value for ksr tag"),
};
assert_eq!(value_str, "0.75");
found_ksr = true;
}
_ => {}
}
}
assert!(found_agent_rate, "Missing agent rate tag");
assert!(
found_ksr,
"Missing knuth sampling rate tag for agent mechanism"
);
// The rule rate tag must never appear for the agent mechanism.
for attr in &agent_attrs {
assert_ne!(
attr.key.as_str(),
SAMPLING_RULE_RATE_TAG_KEY,
"Rule rate tag should not be present for agent mechanism"
);
}
}
// Formatting of sampling rates: trailing zeros removed, six significant digits, invalid
// inputs rejected.
#[test]
fn test_format_sampling_rate() {
// Exact values are printed without trailing zeros.
assert_eq!(format_sampling_rate(1.0), Some("1".to_string()));
assert_eq!(format_sampling_rate(0.5), Some("0.5".to_string()));
assert_eq!(format_sampling_rate(0.1), Some("0.1".to_string()));
assert_eq!(format_sampling_rate(0.0), Some("0".to_string()));
assert_eq!(format_sampling_rate(0.100000), Some("0.1".to_string()));
assert_eq!(format_sampling_rate(0.500000), Some("0.5".to_string()));
// Longer values are rounded to six significant digits.
assert_eq!(
format_sampling_rate(0.7654321),
Some("0.765432".to_string())
);
assert_eq!(
format_sampling_rate(0.123456789),
Some("0.123457".to_string())
);
assert_eq!(format_sampling_rate(0.001), Some("0.001".to_string()));
assert_eq!(format_sampling_rate(0.75), Some("0.75".to_string()));
assert_eq!(format_sampling_rate(0.999999), Some("0.999999".to_string()));
// Values outside [0, 1], NaN and infinities are rejected.
assert_eq!(format_sampling_rate(-0.1), None);
assert_eq!(format_sampling_rate(1.1), None);
assert_eq!(format_sampling_rate(f64::NAN), None);
assert_eq!(format_sampling_rate(f64::INFINITY), None);
assert_eq!(format_sampling_rate(f64::NEG_INFINITY), None);
}
// With a parent decision present, the sampler inherits it and emits no sampling tags.
#[test]
fn test_should_sample_parent_context() {
let sampler = DatadogSampler::new(vec![], 100);
let empty_attrs: &[KeyValue] = &[];
let trace_id = create_trace_id();
let span_kind = SpanKind::Client;
let resource = create_empty_resource_arc();
// Parent sampled -> keep, no tags.
let data_sampled = create_sampling_data(
Some(true),
&trace_id,
"span",
span_kind.clone(),
empty_attrs,
resource.as_ref(),
);
let result_sampled = sampler.sample(&data_sampled);
assert!(result_sampled.get_priority().is_keep());
assert!(result_sampled
.to_dd_sampling_tags(&crate::sampling::OtelAttributeFactory)
.is_none());
// Parent not sampled -> drop, no tags.
let data_not_sampled = create_sampling_data(
Some(false),
&trace_id,
"span",
span_kind,
empty_attrs,
resource.as_ref(),
);
let result_not_sampled = sampler.sample(&data_not_sampled);
assert!(!result_not_sampled.get_priority().is_keep());
assert!(result_not_sampled
.to_dd_sampling_tags(&crate::sampling::OtelAttributeFactory)
.is_none());
}
// Root spans sampled by a sampler configured with a single rate-1.0 rule for `test-service`.
// NOTE(review): the resource used here is empty, so whether the rule actually matches depends
// on how the span's service is derived — both cases only assert a keep decision and that
// sampling tags are produced. Confirm the intended coverage.
#[test]
fn test_should_sample_with_rule() {
let rule = SamplingRule::new(
1.0,
Some("test-service".to_string()),
None,
None,
None,
None,
);
let sampler = DatadogSampler::new(vec![rule], 100);
let trace_id = create_trace_id();
let span_kind = SpanKind::Client;
let resource = create_empty_resource_arc();
// First span: resource attribute "resource".
let attrs = create_attributes("resource", "prod");
let data = create_sampling_data(
None,
&trace_id,
"span",
span_kind.clone(),
attrs.as_slice(),
resource.as_ref(),
);
let result = sampler.sample(&data);
assert!(result.get_priority().is_keep());
assert!(result
.to_dd_sampling_tags(&crate::sampling::OtelAttributeFactory)
.is_some());
// Second span: a different resource attribute; still expected to be kept with tags.
let attrs_no_match = create_attributes("other-resource", "prod");
let data_no_match = create_sampling_data(
None,
&trace_id,
"span",
span_kind,
attrs_no_match.as_slice(),
resource.as_ref(),
);
let result_no_match = sampler.sample(&data_no_match);
assert!(result_no_match.get_priority().is_keep());
assert!(result_no_match
.to_dd_sampling_tags(&crate::sampling::OtelAttributeFactory)
.is_some());
}
// Without rules, the per-service rates decide: rate 1.0 keeps, rate 0.0 drops.
#[test]
fn test_should_sample_with_service_rates() {
let sampler = DatadogSampler::new(vec![], 100);
let mut rates = HashMap::new();
rates.insert("service:test-service,env:prod".to_string(), 1.0); rates.insert("service:other-service,env:prod".to_string(), 0.0);
sampler.update_service_rates(rates);
let trace_id = create_trace_id();
let span_kind = SpanKind::Client;
// Service registered with rate 1.0 -> kept.
let resource_test_service = create_resource("test-service".to_string());
let attrs_sample = create_attributes("any_resource_name_matching_env", "prod");
let data_sample = create_sampling_data(
None,
&trace_id,
"span_for_test_service",
span_kind.clone(),
attrs_sample.as_slice(),
resource_test_service.as_ref(),
);
let result_sample = sampler.sample(&data_sample);
assert!(
result_sample.get_priority().is_keep(),
"Span for test-service/prod should be sampled"
);
// Service registered with rate 0.0 -> dropped.
let resource_other_service = create_resource("other-service".to_string());
let attrs_no_sample = create_attributes("any_resource_name_matching_env", "prod");
let data_no_sample = create_sampling_data(
None,
&trace_id,
"span_for_other_service",
span_kind,
attrs_no_sample.as_slice(),
resource_other_service.as_ref(),
);
let result_no_sample = sampler.sample(&data_no_sample);
assert!(
!result_no_sample.get_priority().is_keep(),
"Span for other-service/prod should be dropped"
);
}
// Tag matching on float attributes: whole-number floats match through their string form,
// fractional floats only match all-wildcard patterns.
#[test]
fn test_sampling_rule_matches_float_attributes() {
use opentelemetry::Value;
// Builds resource/env attributes plus one float attribute under `tag_key`.
fn create_attributes_with_float(tag_key: &'static str, float_value: f64) -> Vec<KeyValue> {
vec![
KeyValue::new(RESOURCE_TAG, "resource"),
KeyValue::new(ENV_TAG, "prod"),
KeyValue::new(tag_key, Value::F64(float_value)),
]
}
// Pattern "42" matches the whole-number float 42.0.
let rule_integer = SamplingRule::new(
0.5,
None,
None,
None,
Some(HashMap::from([("float_tag".to_string(), "42".to_string())])),
None,
);
let integer_float_attrs = create_attributes_with_float("float_tag", 42.0);
let resource = create_empty_resource();
let span = PreSampledSpan::new(
"test-span",
SpanKind::Client,
integer_float_attrs.as_slice(),
&resource,
);
assert!(rule_integer.matches(&span));
// Pattern "*" matches the fractional float 42.5.
let rule_wildcard = SamplingRule::new(
0.5,
None,
None,
None,
Some(HashMap::from([("float_tag".to_string(), "*".to_string())])),
None,
);
let decimal_float_attrs = create_attributes_with_float("float_tag", 42.5);
let resource = create_empty_resource();
let span = PreSampledSpan::new(
"test-span",
SpanKind::Client,
decimal_float_attrs.as_slice(),
&resource,
);
assert!(rule_wildcard.matches(&span));
// The exact pattern "42.5" does not match the fractional float 42.5.
let rule_specific = SamplingRule::new(
0.5,
None,
None,
None,
Some(HashMap::from([(
"float_tag".to_string(),
"42.5".to_string(),
)])),
None,
);
let decimal_float_attrs = create_attributes_with_float("float_tag", 42.5);
let resource = create_empty_resource();
let span = PreSampledSpan::new(
"test-span",
SpanKind::Client,
decimal_float_attrs.as_slice(),
&resource,
);
assert!(!rule_specific.matches(&span));
// A partial wildcard such as "42.*" does not match a fractional float either.
let rule_prefix = SamplingRule::new(
0.5,
None,
None,
None,
Some(HashMap::from([(
"float_tag".to_string(),
"42.*".to_string(),
)])),
None,
);
let resource = create_empty_resource();
let span = PreSampledSpan::new(
"test-span",
SpanKind::Client,
decimal_float_attrs.as_slice(),
&resource,
);
assert!(!rule_prefix.matches(&span));
}
// A rule on `http.response.status_code` with pattern "5*" is checked against spans that carry
// a 500 status, a 200 status, and no status attribute at all.
#[test]
fn test_otel_to_datadog_attribute_mapping() {
let rule = SamplingRule::new(
1.0,
None,
None,
None,
Some(HashMap::from([(
"http.response.status_code".to_string(),
"5*".to_string(),
)])),
None,
);
// Status 500 matches "5*".
let otel_attrs = vec![KeyValue::new("http.response.status_code", 500)];
let resource = create_empty_resource();
let span = PreSampledSpan::new(
"test-span",
SpanKind::Client,
otel_attrs.as_slice(),
&resource,
);
assert!(rule.matches(&span));
// Status 200 does not match.
let non_matching_attrs = vec![KeyValue::new("http.response.status_code", 200)];
let resource = create_empty_resource();
let span = PreSampledSpan::new(
"test-span",
SpanKind::Client,
non_matching_attrs.as_slice(),
&resource,
);
assert!(!rule.matches(&span));
// A span without any status attribute does not match.
let unrelated_attrs = vec![KeyValue::new("unrelated.attribute", "value")];
let resource = create_empty_resource();
let span = PreSampledSpan::new(
"test-span",
SpanKind::Client,
unrelated_attrs.as_slice(),
&resource,
);
assert!(!rule.matches(&span));
}
// A rule written with `http.status_code`, `http.method` and `http.url` keys is expected to
// match spans that carry `http.response.status_code`, `http.request.method` and `url.full`,
// and every tag criterion must be satisfied.
#[test]
fn test_multiple_otel_attribute_mappings() {
let mut tags = HashMap::new();
tags.insert("http.status_code".to_string(), "5*".to_string());
tags.insert("http.method".to_string(), "POST".to_string());
tags.insert("http.url".to_string(), "*api*".to_string());
let rule = SamplingRule::new(1.0, None, None, None, Some(tags), None);
// All three criteria satisfied.
let mixed_attrs = vec![
KeyValue::new("http.response.status_code", 503),
KeyValue::new("http.request.method", "POST"),
KeyValue::new("url.full", "https://example.com/api/v1/resource"),
];
let resource = create_empty_resource();
let span = PreSampledSpan::new("test-span", SpanKind::Client, &mixed_attrs, &resource);
assert!(rule.matches(&span));
// Method attribute missing -> no match.
let missing_method = vec![
KeyValue::new("http.response.status_code", 503),
KeyValue::new("url.full", "https://example.com/api/v1/resource"),
];
let resource = create_empty_resource();
let span = PreSampledSpan::new("test-span", SpanKind::Client, &missing_method, &resource);
assert!(!rule.matches(&span));
// Method present but different -> no match.
let wrong_method = vec![
KeyValue::new("http.response.status_code", 503),
KeyValue::new("http.request.method", "GET"), KeyValue::new("url.full", "https://example.com/api/v1/resource"),
];
let resource = create_empty_resource();
let span = PreSampledSpan::new("test-span", SpanKind::Client, &wrong_method, &resource);
assert!(!rule.matches(&span));
}
#[test]
fn test_direct_and_mapped_mixed_attributes() {
    // Both key aliases are bound to the same constant in this test.
    let dd_key = HTTP_RESPONSE_STATUS_CODE;
    let otel_key = HTTP_RESPONSE_STATUS_CODE;
    let tag_key = "custom.tag";
    let tag_value = "value";
    let resource = create_empty_resource();

    // Evaluates `rule` against a client span built from `attrs` over the shared empty resource.
    let span_matches = |rule: &SamplingRule, attrs: &[KeyValue]| {
        let span = PreSampledSpan::new("test-span", SpanKind::Client, attrs, &resource);
        rule.matches(&span)
    };

    // Rule 1: status code must match `5*` and `custom.tag` must equal `value`.
    let status_5xx_rule = SamplingRule::new(
        1.0,
        None,
        None,
        None,
        Some(HashMap::from([
            (dd_key.to_string(), "5*".to_string()),
            (tag_key.to_string(), tag_value.to_string()),
        ])),
        None,
    );

    // Status 503 plus the custom tag, status written through the `otel_key` alias.
    let attrs = [
        KeyValue::new(otel_key, 503),
        KeyValue::new(tag_key, tag_value),
    ];
    assert!(span_matches(&status_5xx_rule, &attrs), "Rule with dd_status_key (5*) and custom.tag should match span with otel_response_status_key (503) and custom.tag");

    // Same attributes, status written through the `dd_key` alias.
    let attrs = [
        KeyValue::new(dd_key, 503),
        KeyValue::new(tag_key, tag_value),
    ];
    assert!(span_matches(&status_5xx_rule, &attrs), "Rule with dd_status_key (5*) and custom.tag should match span with dd_status_key (503) and custom.tag");

    // Status matches but the custom tag is missing.
    let attrs = [KeyValue::new(otel_key, 503)];
    assert!(
        !span_matches(&status_5xx_rule, &attrs),
        "Rule with dd_status_key (5*) and custom.tag should NOT match span missing custom.tag"
    );

    // Custom tag present but the status (200) fails the `5*` glob.
    let attrs = [
        KeyValue::new(otel_key, 200),
        KeyValue::new(tag_key, tag_value),
    ];
    assert!(!span_matches(&status_5xx_rule, &attrs), "Rule with dd_status_key (5*) and custom.tag should NOT match span with non-matching otel_response_status_key (200)");

    // Custom tag present but no status code attribute at all.
    let attrs = [
        KeyValue::new("another.tag", "irrelevant"),
        KeyValue::new(tag_key, tag_value),
    ];
    assert!(!span_matches(&status_5xx_rule, &attrs), "Rule with dd_status_key (5*) and custom.tag should NOT match span with no status code attribute");

    // Rule 2: exact status `200` plus the custom tag.
    let status_200_rule = SamplingRule::new(
        1.0,
        None,
        None,
        None,
        Some(HashMap::from([
            (otel_key.to_string(), "200".to_string()),
            (tag_key.to_string(), tag_value.to_string()),
        ])),
        None,
    );
    let attrs = [
        KeyValue::new(otel_key, 200),
        KeyValue::new(tag_key, tag_value),
    ];
    assert!(span_matches(&status_200_rule, &attrs), "Rule with otel_response_status_key (200) and custom.tag should match span with otel_response_status_key (200) and custom.tag");
}
#[test]
fn test_operation_name_integration() {
    // Builds a rate-1.0 rule that constrains only the operation name, with provenance "default".
    let name_rule = |pattern: &str| {
        SamplingRule::new(
            1.0,
            None,
            Some(pattern.to_string()),
            None,
            None,
            Some("default".to_string()),
        )
    };
    let sampler = DatadogSampler::new(
        vec![
            name_rule("http.*.request"),
            name_rule("postgresql.query"),
            name_rule("kafka.process"),
        ],
        100,
    );

    let trace_id = create_trace_id();
    // Resource handed to the sampler through `create_sampling_data`.
    let sampling_resource = create_empty_resource_arc();
    // Resource used when building spans to derive the operation name.
    let span_resource: SdkResource = create_empty_resource();

    // --- HTTP client: a request-method attribute on a client span ---
    let client_attrs = vec![KeyValue::new(
        Key::from_static_str(HTTP_REQUEST_METHOD),
        Value::String("GET".into()),
    )];
    let client_span = PreSampledSpan::new("", SpanKind::Client, &client_attrs, &span_resource);
    assert_eq!(
        get_otel_operation_name_v2(&client_span),
        "http.client.request",
        "HTTP client operation name should be correct"
    );
    let client_data = create_sampling_data(
        None,
        &trace_id,
        "test-span",
        SpanKind::Client,
        &client_attrs,
        sampling_resource.as_ref(),
    );
    assert!(sampler.sample(&client_data).get_priority().is_keep());

    // --- HTTP server: a request-method attribute on a server span ---
    let server_attrs = vec![KeyValue::new(
        Key::from_static_str(HTTP_REQUEST_METHOD),
        Value::String("POST".into()),
    )];
    let server_span = PreSampledSpan::new("", SpanKind::Server, &server_attrs, &span_resource);
    assert_eq!(
        get_otel_operation_name_v2(&server_span),
        "http.server.request",
        "HTTP server operation name should be correct"
    );
    let server_data = create_sampling_data(
        None,
        &trace_id,
        "test-span",
        SpanKind::Server,
        &server_attrs,
        sampling_resource.as_ref(),
    );
    assert!(sampler.sample(&server_data).get_priority().is_keep());

    // --- Database: a db-system attribute on a client span ---
    let database_attrs = vec![KeyValue::new(
        Key::from_static_str(DB_SYSTEM_NAME),
        Value::String("postgresql".into()),
    )];
    let database_span =
        PreSampledSpan::new("", SpanKind::Client, &database_attrs, &span_resource);
    assert_eq!(
        get_otel_operation_name_v2(&database_span),
        "postgresql.query",
        "Database operation name should be correct"
    );
    let database_data = create_sampling_data(
        None,
        &trace_id,
        "test-span",
        SpanKind::Client,
        &database_attrs,
        sampling_resource.as_ref(),
    );
    assert!(sampler.sample(&database_data).get_priority().is_keep());

    // --- Messaging: system + operation type on a consumer span ---
    let kafka_attrs = vec![
        KeyValue::new(
            Key::from_static_str(MESSAGING_SYSTEM),
            Value::String("kafka".into()),
        ),
        KeyValue::new(
            Key::from_static_str(MESSAGING_OPERATION_TYPE),
            Value::String("process".into()),
        ),
    ];
    let kafka_span = PreSampledSpan::new("", SpanKind::Consumer, &kafka_attrs, &span_resource);
    assert_eq!(
        get_otel_operation_name_v2(&kafka_span),
        "kafka.process",
        "Messaging operation name should be correct"
    );
    let kafka_data = create_sampling_data(
        None,
        &trace_id,
        "test-span",
        SpanKind::Consumer,
        &kafka_attrs,
        sampling_resource.as_ref(),
    );
    assert!(sampler.sample(&kafka_data).get_priority().is_keep());

    // --- Internal span: only a custom tag, so the name is expected to be the span kind.
    // None of the three rule patterns is written for `internal`; a keep decision is
    // still expected here.
    let custom_attrs = vec![KeyValue::new("custom.tag", "value")];
    let custom_span = PreSampledSpan::new("", SpanKind::Internal, &custom_attrs, &span_resource);
    assert_eq!(
        get_otel_operation_name_v2(&custom_span),
        "internal",
        "Internal operation name should be the span kind"
    );
    let custom_data = create_sampling_data(
        None,
        &trace_id,
        "test-span",
        SpanKind::Internal,
        &custom_attrs,
        sampling_resource.as_ref(),
    );
    assert!(sampler.sample(&custom_data).get_priority().is_keep());

    // --- Server span described only by its network protocol ---
    let protocol_attrs = vec![KeyValue::new(
        Key::from_static_str(NETWORK_PROTOCOL_NAME),
        Value::String("http".into()),
    )];
    let protocol_span =
        PreSampledSpan::new("", SpanKind::Server, &protocol_attrs, &span_resource);
    assert_eq!(
        get_otel_operation_name_v2(&protocol_span),
        "http.server.request",
        "Server with protocol operation name should use protocol"
    );
    let protocol_data = create_sampling_data(
        None,
        &trace_id,
        "test-span",
        SpanKind::Server,
        &protocol_attrs,
        sampling_resource.as_ref(),
    );
    assert!(sampler.sample(&protocol_data).get_priority().is_keep());
}
#[test]
fn test_on_rules_update_callback() {
    // Start with a single rule for `initial-service`.
    let seed_rule = SamplingRule::new(
        0.1,
        Some("initial-service".to_string()),
        None,
        None,
        None,
        Some("default".to_string()),
    );
    let sampler = DatadogSampler::new(vec![seed_rule], 100);
    assert_eq!(sampler.rules.len(), 1);

    // Resource whose SERVICE_NAME attribute is `web-frontend`.
    let shared_resource = Arc::new(RwLock::new(
        opentelemetry_sdk::Resource::builder_empty()
            .with_attributes(vec![KeyValue::new(SERVICE_NAME, "web-frontend")])
            .build(),
    ));

    // Push two replacement rules through the update callback.
    let update_rules = sampler.on_rules_update();
    let web_rule = SamplingRuleConfig {
        sample_rate: 0.5,
        service: Some("web-*".to_string()),
        name: Some("http.*".to_string()),
        resource: None,
        tags: std::collections::HashMap::new(),
        provenance: "customer".to_string(),
    };
    let api_rule = SamplingRuleConfig {
        sample_rate: 0.2,
        service: Some("api-*".to_string()),
        name: None,
        resource: Some("/api/*".to_string()),
        tags: std::collections::HashMap::from([("env".to_string(), "prod".to_string())]),
        provenance: "dynamic".to_string(),
    };
    let replacement = vec![web_rule, api_rule];
    update_rules(&replacement);
    assert_eq!(sampler.rules.len(), 2);

    // A client span with an HTTP request method on the `web-frontend` resource is
    // expected to resolve to the first replacement rule (rate 0.5, provenance "customer").
    let request_attrs = vec![KeyValue::new(HTTP_REQUEST_METHOD, "GET")];
    let resource_view = shared_resource.read().unwrap();
    let span = PreSampledSpan::new(
        "test-span",
        SpanKind::Client,
        request_attrs.as_slice(),
        &resource_view,
    );
    let matched = sampler.find_matching_rule(&span).expect("Expected to find a matching rule for service 'web-frontend' and name 'http.client.request'");
    assert_eq!(matched.sample_rate, 0.5);
    assert_eq!(matched.provenance, "customer");

    // An empty update leaves the sampler with no rules.
    update_rules(&[]);
    assert_eq!(sampler.rules.len(), 0);
}
}