use crate::core::configuration::SamplingRuleConfig;
use crate::core::constants::{
RL_EFFECTIVE_RATE, SAMPLING_AGENT_RATE_TAG_KEY, SAMPLING_DECISION_MAKER_TAG_KEY,
SAMPLING_PRIORITY_TAG_KEY, SAMPLING_RULE_RATE_TAG_KEY,
};
use crate::core::sampling::{mechanism, SamplingMechanism, SamplingPriority};
/// Callback that receives a new set of sampling-rule configurations;
/// `DatadogSampler::on_rules_update` returns one that rebuilds and installs the rules.
pub type SamplingRulesCallback = Box<dyn Fn(&[SamplingRuleConfig]) + Send + Sync>;
use crate::mappings::{
get_dd_key_for_otlp_attribute, get_otel_env, get_otel_operation_name_v2, get_otel_resource_v2,
get_otel_service, get_otel_status_code, OtelSpan,
};
use opentelemetry::trace::SamplingDecision;
use opentelemetry::trace::TraceId;
use opentelemetry::KeyValue;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use super::agent_service_sampler::{AgentRates, ServicesSampler};
use super::constants::pattern::NO_RULE;
use super::glob_matcher::GlobMatcher;
use super::otel_mappings::PreSampledSpan;
use super::rate_limiter::RateLimiter;
use super::rate_sampler::RateSampler;
use super::rules_sampler::RulesSampler;
use super::utils;
/// Builds a glob matcher for `rule`.
///
/// Returns `None` when `rule` is the `NO_RULE` sentinel, meaning the field is
/// left unconstrained.
fn matcher_from_rule(rule: &str) -> Option<GlobMatcher> {
    if rule == NO_RULE {
        return None;
    }
    Some(GlobMatcher::new(rule))
}
/// A trace-sampling rule: a span matches when every configured glob matcher
/// (operation name, service, resource, tags) accepts it; matching traces are
/// then sampled at `sample_rate`.
#[derive(Clone, Debug)]
pub struct SamplingRule {
/// Rate handed to `rate_sampler`; also reported as the rule sample rate.
sample_rate: f64,
/// Origin label of the rule (`"default"` when not provided); selects the
/// reported sampling mechanism.
provenance: String,
/// Sampler built from `sample_rate`; `SamplingRule::sample` delegates to it.
rate_sampler: RateSampler,
/// Glob on the span's operation name; `None` leaves it unconstrained.
name_matcher: Option<GlobMatcher>,
/// Glob on the span's service name; `None` leaves it unconstrained.
service_matcher: Option<GlobMatcher>,
/// Glob on the span's resource name; `None` leaves it unconstrained.
resource_matcher: Option<GlobMatcher>,
/// Per-attribute globs keyed by tag name; all of them must match.
tag_matchers: HashMap<String, GlobMatcher>,
}
impl SamplingRule {
pub fn from_configs(configs: Vec<SamplingRuleConfig>) -> Vec<Self> {
configs
.into_iter()
.map(|config| {
Self::new(
config.sample_rate,
config.service,
config.name,
config.resource,
Some(config.tags),
Some(config.provenance),
)
})
.collect()
}
pub fn new(
sample_rate: f64,
service: Option<String>,
name: Option<String>,
resource: Option<String>,
tags: Option<HashMap<String, String>>,
provenance: Option<String>,
) -> Self {
let name_matcher = name.as_deref().and_then(matcher_from_rule);
let service_matcher = service.as_deref().and_then(matcher_from_rule);
let resource_matcher = resource.as_deref().and_then(matcher_from_rule);
let tag_map = tags.clone().unwrap_or_default();
let mut tag_matchers = HashMap::with_capacity(tag_map.len());
for (key, value) in &tag_map {
if let Some(matcher) = matcher_from_rule(value) {
tag_matchers.insert(key.clone(), matcher);
}
}
SamplingRule {
sample_rate,
provenance: provenance.unwrap_or_else(|| "default".to_string()),
rate_sampler: RateSampler::new(sample_rate),
name_matcher,
service_matcher,
resource_matcher,
tag_matchers,
}
}
fn matches(&self, span: &PreSampledSpan) -> bool {
let name: std::borrow::Cow<'_, str> = get_otel_operation_name_v2(span);
if let Some(ref matcher) = self.name_matcher {
if !matcher.matches(name.as_ref()) {
return false;
}
}
if let Some(ref matcher) = self.service_matcher {
let service_from_resource = get_otel_service(span);
if !matcher.matches(&service_from_resource) {
return false;
}
}
let resource_str: std::borrow::Cow<'_, str> = get_otel_resource_v2(span);
if let Some(ref matcher) = self.resource_matcher {
if !matcher.matches(resource_str.as_ref()) {
return false;
}
}
for (key, matcher) in &self.tag_matchers {
let rule_tag_key_str = key.as_str();
if rule_tag_key_str == "http.status_code"
|| rule_tag_key_str
== opentelemetry_semantic_conventions::trace::HTTP_RESPONSE_STATUS_CODE
{
match self.match_http_status_code_rule(matcher, span) {
Some(true) => continue, Some(false) | None => return false, }
} else {
let direct_match = span
.attributes
.iter()
.find(|kv| kv.key.as_str() == rule_tag_key_str)
.and_then(|kv| self.match_attribute_value(&kv.value, matcher));
if direct_match.unwrap_or(false) {
continue;
}
if rule_tag_key_str.starts_with("http.") {
let tag_match = span.attributes.iter().any(|kv| {
let dd_key_from_otel_attr = get_dd_key_for_otlp_attribute(kv.key.as_str());
if dd_key_from_otel_attr == rule_tag_key_str {
return self
.match_attribute_value(&kv.value, matcher)
.unwrap_or(false);
}
false
});
if !tag_match {
return false; }
} else {
return false;
}
}
}
true
}
fn match_http_status_code_rule(
&self,
matcher: &GlobMatcher,
span: &PreSampledSpan,
) -> Option<bool> {
let status_code_u32 = get_otel_status_code(span);
if status_code_u32 != 0 {
let status_value = opentelemetry::Value::I64(i64::from(status_code_u32));
self.match_attribute_value(&status_value, matcher)
} else {
None }
}
fn match_attribute_value(
&self,
value: &opentelemetry::Value,
matcher: &GlobMatcher,
) -> Option<bool> {
if let Some(float_val) = utils::extract_float_value(value) {
let has_decimal = float_val != (float_val as i64) as f64;
if has_decimal {
return Some(matcher.pattern().chars().all(|c| c == '*'));
}
return Some(matcher.matches(&float_val.to_string()));
}
utils::extract_string_value(value).map(|string_value| matcher.matches(&string_value))
}
pub fn sample(&self, trace_id: TraceId) -> bool {
self.rate_sampler.sample(trace_id)
}
}
/// Origin of a sampling rule. Variants are ordered `Customer < Dynamic < Default`.
// NOTE(review): not referenced elsewhere in the visible file; the allow silences
// dead-code warnings — confirm whether it is still needed.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RuleProvenance {
    Customer = 0,
    Dynamic = 1,
    Default = 2,
}

impl From<&str> for RuleProvenance {
    /// Parses a provenance label. Only the exact labels `"customer"` and
    /// `"dynamic"` are recognised; every other string, including differently
    /// cased ones, maps to [`RuleProvenance::Default`].
    fn from(label: &str) -> Self {
        if label == "customer" {
            Self::Customer
        } else if label == "dynamic" {
            Self::Dynamic
        } else {
            Self::Default
        }
    }
}
/// Root-span sampler combining sampling rules, agent-provided per-service
/// rates, and a rate limiter applied to rule-sampled traces.
#[derive(Clone, Debug)]
pub struct DatadogSampler {
/// Configured sampling rules (presumably first match wins — confirm in `RulesSampler`).
rules: RulesSampler,
/// Per-service samplers keyed by `"service:{service},env:{env}"`, updated from agent responses.
service_samplers: ServicesSampler,
/// Limits the traces kept by a matching rule.
rate_limiter: RateLimiter,
/// Shared OpenTelemetry resource, read on each root sampling decision.
resource: Arc<RwLock<opentelemetry_sdk::Resource>>,
}
impl DatadogSampler {
pub fn new(
rules: Vec<SamplingRule>,
rate_limit: i32,
resource: Arc<RwLock<opentelemetry_sdk::Resource>>,
) -> Self {
let limiter = RateLimiter::new(rate_limit, None);
DatadogSampler {
rules: RulesSampler::new(rules),
service_samplers: ServicesSampler::default(),
rate_limiter: limiter,
resource,
}
}
#[allow(dead_code)]
pub(crate) fn update_service_rates(&self, rates: impl IntoIterator<Item = (String, f64)>) {
self.service_samplers.update_rates(rates);
}
pub fn on_agent_response(&self) -> Box<dyn for<'a> Fn(&'a str) + Send + Sync> {
let service_samplers = self.service_samplers.clone();
Box::new(move |s: &str| {
let Ok(new_rates) = serde_json::de::from_str::<AgentRates>(s) else {
return;
};
let Some(new_rates) = new_rates.rates_by_service else {
return;
};
service_samplers.update_rates(new_rates.into_iter().map(|(k, v)| (k.to_string(), v)));
})
}
pub fn on_rules_update(&self) -> SamplingRulesCallback {
let rules_sampler = self.rules.clone();
Box::new(move |rule_configs: &[SamplingRuleConfig]| {
let new_rules = SamplingRule::from_configs(rule_configs.to_vec());
rules_sampler.update_rules(new_rules);
})
}
fn service_key<'a>(&self, span: &impl OtelSpan<'a>) -> String {
let service = get_otel_service(span).into_owned();
let env = get_otel_env(span);
format!("service:{service},env:{env}")
}
fn find_matching_rule(&self, span: &PreSampledSpan) -> Option<SamplingRule> {
self.rules.find_matching_rule(|rule| rule.matches(span))
}
fn get_sampling_mechanism(
&self,
rule: Option<&SamplingRule>,
used_agent_sampler: bool,
) -> SamplingMechanism {
if let Some(rule) = rule {
match rule.provenance.as_str() {
"customer" => mechanism::REMOTE_USER_TRACE_SAMPLING_RULE,
"dynamic" => mechanism::REMOTE_DYNAMIC_TRACE_SAMPLING_RULE,
_ => mechanism::LOCAL_USER_TRACE_SAMPLING_RULE,
}
} else if used_agent_sampler {
mechanism::AGENT_RATE_BY_SERVICE
} else {
mechanism::DEFAULT
}
}
pub fn sample(
&self,
is_parent_sampled: Option<bool>,
trace_id: TraceId,
_name: &str,
span_kind: &opentelemetry::trace::SpanKind,
attributes: &[KeyValue],
) -> DdSamplingResult {
if let Some(is_parent_sampled) = is_parent_sampled {
return DdSamplingResult {
is_keep: is_parent_sampled,
trace_root_info: None,
};
}
self.sample_root(trace_id, _name, span_kind, attributes)
}
fn sample_root(
&self,
trace_id: TraceId,
name: &str,
span_kind: &opentelemetry::trace::SpanKind,
attributes: &[KeyValue],
) -> DdSamplingResult {
let mut is_keep = true;
let mut used_agent_sampler = false;
let sample_rate;
let mut rl_effective_rate: Option<i32> = None;
let resource_guard = self.resource.read().unwrap();
let span = PreSampledSpan::new(name, span_kind.clone(), attributes, &resource_guard);
let matching_rule = self.find_matching_rule(&span);
if let Some(rule) = &matching_rule {
sample_rate = rule.sample_rate;
if !rule.sample(trace_id) {
is_keep = false;
} else if !self.rate_limiter.is_allowed() {
is_keep = false;
rl_effective_rate = Some(self.rate_limiter.effective_rate() as i32);
}
} else {
let service_key = self.service_key(&span);
if let Some(sampler) = self.service_samplers.get(&service_key) {
used_agent_sampler = true;
sample_rate = sampler.sample_rate();
if !sampler.sample(trace_id) {
is_keep = false;
}
} else {
sample_rate = 1.0;
}
}
let mechanism = self.get_sampling_mechanism(matching_rule.as_ref(), used_agent_sampler);
DdSamplingResult {
is_keep,
trace_root_info: Some(TraceRootSamplingInfo {
mechanism,
priority: mechanism.to_priority(is_keep),
rate: sample_rate,
rl_effective_rate,
}),
}
}
}
/// Outcome of a sampling decision.
pub struct DdSamplingResult {
/// Whether the trace is kept (`RecordAndSample`) rather than only recorded (`RecordOnly`).
pub is_keep: bool,
/// Root sampling details; `None` when the decision was inherited from the parent.
pub trace_root_info: Option<TraceRootSamplingInfo>,
}
/// Sampling details recorded for a trace-root decision and emitted as Datadog tags.
pub struct TraceRootSamplingInfo {
/// Priority derived from `mechanism` and the keep decision.
pub priority: SamplingPriority,
/// Which sampler made the decision (a rule, the agent rate, or the default).
pub mechanism: SamplingMechanism,
/// Sample rate applied by the deciding sampler (1.0 when none applied).
pub rate: f64,
/// Rate limiter's effective rate; set only when the limiter rejected a rule-kept trace.
pub rl_effective_rate: Option<i32>,
}
impl DdSamplingResult {
    /// Renders the Datadog sampling tags for this decision.
    ///
    /// Returns an empty list when there is no root sampling info. Otherwise it
    /// emits, in order: the rate limiter's effective rate (only when set), the
    /// decision maker, the agent or rule sample rate (depending on the
    /// mechanism), and the sampling priority.
    pub fn to_dd_sampling_tags(&self) -> Vec<KeyValue> {
        let info = match &self.trace_root_info {
            Some(info) => info,
            None => return Vec::new(),
        };

        let mut tags: Vec<KeyValue> = Vec::with_capacity(4);
        tags.extend(
            info.rl_effective_rate
                .map(|limit| KeyValue::new(RL_EFFECTIVE_RATE, i64::from(limit))),
        );

        let decision_maker = info.mechanism;
        tags.push(KeyValue::new(
            SAMPLING_DECISION_MAKER_TAG_KEY,
            decision_maker.to_cow(),
        ));
        match decision_maker {
            mechanism::AGENT_RATE_BY_SERVICE => {
                tags.push(KeyValue::new(SAMPLING_AGENT_RATE_TAG_KEY, info.rate))
            }
            mechanism::REMOTE_USER_TRACE_SAMPLING_RULE
            | mechanism::REMOTE_DYNAMIC_TRACE_SAMPLING_RULE
            | mechanism::LOCAL_USER_TRACE_SAMPLING_RULE => {
                tags.push(KeyValue::new(SAMPLING_RULE_RATE_TAG_KEY, info.rate))
            }
            // Any other mechanism carries no rate tag.
            _ => {}
        }

        tags.push(KeyValue::new(
            SAMPLING_PRIORITY_TAG_KEY,
            i64::from(info.priority.into_i8()),
        ));
        tags
    }

    /// Maps the keep decision onto OpenTelemetry: kept traces are recorded and
    /// sampled; dropped traces are recorded only.
    pub fn to_otel_decision(&self) -> SamplingDecision {
        match self.is_keep {
            true => SamplingDecision::RecordAndSample,
            false => SamplingDecision::RecordOnly,
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sampling::constants::{
attr::{ENV_TAG, RESOURCE_TAG},
pattern,
};
use opentelemetry::{trace::SpanKind, Key, KeyValue, Value};
use opentelemetry_sdk::Resource as SdkResource;
use opentelemetry_semantic_conventions::{
attribute::{
DB_SYSTEM_NAME, HTTP_REQUEST_METHOD, MESSAGING_OPERATION_TYPE, MESSAGING_SYSTEM,
},
resource::SERVICE_NAME,
trace::{HTTP_RESPONSE_STATUS_CODE, NETWORK_PROTOCOL_NAME},
};
fn create_empty_resource() -> opentelemetry_sdk::Resource {
opentelemetry_sdk::Resource::builder_empty().build()
}
fn create_empty_resource_arc() -> Arc<RwLock<opentelemetry_sdk::Resource>> {
Arc::new(RwLock::new(
opentelemetry_sdk::Resource::builder_empty().build(),
))
}
fn create_resource(res: String) -> Arc<RwLock<SdkResource>> {
let attributes = vec![
KeyValue::new(SERVICE_NAME, res), ];
let resource: SdkResource = SdkResource::builder_empty()
.with_attributes(attributes)
.build();
Arc::new(RwLock::new(resource))
}
fn create_trace_id() -> TraceId {
let bytes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
TraceId::from_bytes(bytes)
}
fn create_attributes(resource: &'static str, env: &'static str) -> Vec<KeyValue> {
vec![
KeyValue::new(RESOURCE_TAG, resource),
KeyValue::new("datadog.env", env),
]
}
#[test]
fn test_sampling_rule_creation() {
let rule = SamplingRule::new(
0.5,
Some("test-service".to_string()),
Some("test-name".to_string()),
Some("test-resource".to_string()),
Some(HashMap::from([(
"custom-tag".to_string(),
"tag-value".to_string(),
)])),
Some("customer".to_string()),
);
assert_eq!(rule.sample_rate, 0.5);
assert_eq!(rule.service_matcher.unwrap().pattern(), "test-service");
assert_eq!(rule.name_matcher.unwrap().pattern(), "test-name");
assert_eq!(
rule.resource_matcher.unwrap().pattern(),
"test-resource".to_string()
);
assert_eq!(
rule.tag_matchers.get("custom-tag").unwrap().pattern(),
"tag-value"
);
assert_eq!(rule.provenance, "customer");
}
#[test]
fn test_sampling_rule_with_no_rule() {
let rule = SamplingRule::new(
0.5, None, None, None, None, None, );
assert_eq!(rule.sample_rate, 0.5);
assert!(rule.service_matcher.is_none());
assert!(rule.name_matcher.is_none());
assert!(rule.resource_matcher.is_none());
assert!(rule.tag_matchers.is_empty());
assert_eq!(rule.provenance, "default");
assert!(rule.service_matcher.is_none());
assert!(rule.name_matcher.is_none());
assert!(rule.resource_matcher.is_none());
assert!(rule.tag_matchers.is_empty());
let rule_with_empty_strings = SamplingRule::new(
0.5,
Some(pattern::NO_RULE.to_string()), Some(pattern::NO_RULE.to_string()), Some(pattern::NO_RULE.to_string()), Some(HashMap::from([(
pattern::NO_RULE.to_string(),
pattern::NO_RULE.to_string(),
)])), None,
);
assert!(rule_with_empty_strings.service_matcher.is_none());
assert!(rule_with_empty_strings.name_matcher.is_none());
assert!(rule_with_empty_strings.resource_matcher.is_none());
assert!(rule_with_empty_strings.tag_matchers.is_empty());
let attributes = create_attributes("some-resource", "some-env");
let empty_resource = create_empty_resource();
let span = PreSampledSpan::new("", SpanKind::Client, &attributes, &empty_resource);
assert!(rule.matches(&span));
assert!(rule_with_empty_strings.matches(&span));
}
#[test]
fn test_sampling_rule_matches() {
let _rule = SamplingRule::new(
0.5,
Some("web-*".to_string()),
Some("http.*".to_string()),
None,
Some(HashMap::from([(
"custom_key".to_string(),
"custom_value".to_string(),
)])),
None,
);
}
#[test]
fn test_sample_method() {
let rule_always = SamplingRule::new(1.0, None, None, None, None, None);
let rule_never = SamplingRule::new(0.0, None, None, None, None, None);
let trace_id = create_trace_id();
assert!(rule_always.sample(trace_id));
assert!(!rule_never.sample(trace_id));
}
#[test]
fn test_datadog_sampler_creation() {
let sampler = DatadogSampler::new(vec![], 100, create_empty_resource_arc());
assert!(sampler.rules.is_empty());
assert!(sampler.service_samplers.is_empty());
let rule = SamplingRule::new(0.5, None, None, None, None, None);
let sampler_with_rules = DatadogSampler::new(vec![rule], 200, create_empty_resource_arc());
assert_eq!(sampler_with_rules.rules.len(), 1);
}
#[test]
fn test_service_key_generation() {
let test_service_name = "test-service".to_string();
let sampler_resource = create_resource(test_service_name.clone());
let sampler = DatadogSampler::new(vec![], 100, sampler_resource);
let attrs = create_attributes("resource", "production");
let res = &sampler.resource.read().unwrap();
let span = PreSampledSpan::new("test-span", SpanKind::Internal, attrs.as_slice(), res);
assert_eq!(
sampler.service_key(&span),
format!("service:{test_service_name},env:production")
);
let attrs_no_env = vec![KeyValue::new(RESOURCE_TAG, "resource")];
let span = PreSampledSpan::new(
"test-span",
SpanKind::Internal,
attrs_no_env.as_slice(),
res,
);
assert_eq!(
sampler.service_key(&span),
format!("service:{test_service_name},env:")
);
}
#[test]
fn test_update_service_rates() {
let sampler = DatadogSampler::new(vec![], 100, create_empty_resource_arc());
let mut rates = HashMap::new();
rates.insert("service:web,env:prod".to_string(), 0.5);
rates.insert("service:api,env:prod".to_string(), 0.75);
sampler.service_samplers.update_rates(rates);
assert_eq!(sampler.service_samplers.len(), 2);
assert!(sampler
.service_samplers
.contains_key("service:web,env:prod"));
assert!(sampler
.service_samplers
.contains_key("service:api,env:prod"));
if let Some(web_sampler) = sampler.service_samplers.get("service:web,env:prod") {
assert_eq!(web_sampler.sample_rate(), 0.5);
} else {
panic!("Web service sampler not found");
}
if let Some(api_sampler) = sampler.service_samplers.get("service:api,env:prod") {
assert_eq!(api_sampler.sample_rate(), 0.75);
} else {
panic!("API service sampler not found");
}
}
#[test]
fn test_find_matching_rule() {
let rule1 = SamplingRule::new(
0.1,
Some("service1".to_string()),
None,
None,
None,
Some("customer".to_string()), );
let rule2 = SamplingRule::new(
0.2,
Some("service2".to_string()),
None,
None,
None,
Some("dynamic".to_string()), );
let rule3 = SamplingRule::new(
0.3,
Some("service*".to_string()), None,
None,
None,
Some("default".to_string()), );
let mut sampler = DatadogSampler::new(
vec![rule1.clone(), rule2.clone(), rule3.clone()],
100,
create_empty_resource_arc(), );
{
sampler.resource = create_resource("service1".to_string());
let attrs1 = create_attributes("resource_val_for_attr1", "prod");
let res = sampler.resource.read().unwrap();
let span = PreSampledSpan::new("test-span", SpanKind::Client, attrs1.as_slice(), &res);
let matching_rule_for_attrs1 = sampler.find_matching_rule(&span);
assert!(
matching_rule_for_attrs1.is_some(),
"Expected rule1 to match for service1"
);
let rule = matching_rule_for_attrs1.unwrap();
assert_eq!(rule.sample_rate, 0.1, "Expected rule1 sample rate");
assert_eq!(rule.provenance, "customer", "Expected rule1 provenance");
}
{
sampler.resource = create_resource("service2".to_string());
let attrs2 = create_attributes("resource_val_for_attr2", "prod");
let res = sampler.resource.read().unwrap();
let span = PreSampledSpan::new("test-span", SpanKind::Client, attrs2.as_slice(), &res);
let matching_rule_for_attrs2 = sampler.find_matching_rule(&span);
assert!(
matching_rule_for_attrs2.is_some(),
"Expected rule2 to match for service2"
);
let rule = matching_rule_for_attrs2.unwrap();
assert_eq!(rule.sample_rate, 0.2, "Expected rule2 sample rate");
assert_eq!(rule.provenance, "dynamic", "Expected rule2 provenance");
}
{
sampler.resource = create_resource("service3".to_string());
let attrs3 = create_attributes("resource_val_for_attr3", "prod");
let res = sampler.resource.read().unwrap();
let span = PreSampledSpan::new("test-span", SpanKind::Client, attrs3.as_slice(), &res);
let matching_rule_for_attrs3 = sampler.find_matching_rule(&span);
assert!(
matching_rule_for_attrs3.is_some(),
"Expected rule3 to match for service3"
);
let rule = matching_rule_for_attrs3.unwrap();
assert_eq!(rule.sample_rate, 0.3, "Expected rule3 sample rate");
assert_eq!(rule.provenance, "default", "Expected rule3 provenance");
}
{
sampler.resource = create_resource("other_sampler_service".to_string());
let attrs4 = create_attributes("resource_val_for_attr4", "prod");
let res = sampler.resource.read().unwrap();
let span = PreSampledSpan::new("test-span", SpanKind::Client, attrs4.as_slice(), &res);
let matching_rule_for_attrs4 = sampler.find_matching_rule(&span);
assert!(
matching_rule_for_attrs4.is_none(),
"Expected no rule to match for service 'other_sampler_service'"
);
}
}
#[test]
fn test_get_sampling_mechanism() {
let sampler = DatadogSampler::new(vec![], 100, create_empty_resource_arc());
let rule_customer =
SamplingRule::new(0.1, None, None, None, None, Some("customer".to_string()));
let rule_dynamic =
SamplingRule::new(0.2, None, None, None, None, Some("dynamic".to_string()));
let rule_default =
SamplingRule::new(0.3, None, None, None, None, Some("default".to_string()));
let mechanism1 = sampler.get_sampling_mechanism(Some(&rule_customer), false);
assert_eq!(mechanism1, mechanism::REMOTE_USER_TRACE_SAMPLING_RULE);
let mechanism2 = sampler.get_sampling_mechanism(Some(&rule_dynamic), false);
assert_eq!(mechanism2, mechanism::REMOTE_DYNAMIC_TRACE_SAMPLING_RULE);
let mechanism3 = sampler.get_sampling_mechanism(Some(&rule_default), false);
assert_eq!(mechanism3, mechanism::LOCAL_USER_TRACE_SAMPLING_RULE);
let mechanism4 = sampler.get_sampling_mechanism(None, true);
assert_eq!(mechanism4, mechanism::AGENT_RATE_BY_SERVICE);
let mechanism5 = sampler.get_sampling_mechanism(None, false);
assert_eq!(mechanism5, mechanism::DEFAULT);
}
#[test]
fn test_add_dd_sampling_tags() {
let sample_rate = 0.5;
let is_sampled = true;
let mechanism = mechanism::LOCAL_USER_TRACE_SAMPLING_RULE;
let sampling_result = DdSamplingResult {
is_keep: true,
trace_root_info: Some(TraceRootSamplingInfo {
priority: mechanism.to_priority(is_sampled),
mechanism,
rate: 0.5,
rl_effective_rate: None,
}),
};
let attrs = sampling_result.to_dd_sampling_tags();
assert_eq!(attrs.len(), 3);
let mut found_decision_maker = false;
let mut found_priority = false;
let mut found_rule_rate = false;
for attr in &attrs {
match attr.key.as_str() {
SAMPLING_DECISION_MAKER_TAG_KEY => {
let value_str = match &attr.value {
opentelemetry::Value::String(s) => s.to_string(),
_ => panic!("Expected string value for decision maker tag"),
};
assert_eq!(value_str, mechanism.to_cow());
found_decision_maker = true;
}
SAMPLING_PRIORITY_TAG_KEY => {
let expected_priority = mechanism.to_priority(true).into_i8() as i64;
let value_int = match attr.value {
opentelemetry::Value::I64(i) => i,
_ => panic!("Expected integer value for priority tag"),
};
assert_eq!(value_int, expected_priority);
found_priority = true;
}
SAMPLING_RULE_RATE_TAG_KEY => {
let value_float = match attr.value {
opentelemetry::Value::F64(f) => f,
_ => panic!("Expected float value for rule rate tag"),
};
assert_eq!(value_float, sample_rate);
found_rule_rate = true;
}
_ => {}
}
}
assert!(found_decision_maker, "Missing decision maker tag");
assert!(found_priority, "Missing priority tag");
assert!(found_rule_rate, "Missing rule rate tag");
let rate_limit = 100;
let is_sampled = false;
let mechanism = mechanism::LOCAL_USER_TRACE_SAMPLING_RULE;
let sampling_result = DdSamplingResult {
is_keep: false,
trace_root_info: Some(TraceRootSamplingInfo {
priority: mechanism.to_priority(is_sampled),
mechanism,
rate: 0.5,
rl_effective_rate: Some(rate_limit),
}),
};
let attrs_with_limit = sampling_result.to_dd_sampling_tags();
assert_eq!(attrs_with_limit.len(), 4);
let mut found_limit = false;
for attr in &attrs_with_limit {
if attr.key.as_str() == RL_EFFECTIVE_RATE {
let value_int = match attr.value {
opentelemetry::Value::I64(i) => i,
_ => panic!("Expected integer value for rate limit tag"),
};
assert_eq!(value_int, rate_limit as i64);
found_limit = true;
break;
}
}
assert!(found_limit, "Missing rate limit tag");
let agent_rate = 0.75;
let is_sampled = false;
let mechanism = mechanism::AGENT_RATE_BY_SERVICE;
let sampling_result = DdSamplingResult {
is_keep: false,
trace_root_info: Some(TraceRootSamplingInfo {
priority: mechanism.to_priority(is_sampled),
mechanism,
rate: agent_rate,
rl_effective_rate: None,
}),
};
let agent_attrs = sampling_result.to_dd_sampling_tags();
assert_eq!(agent_attrs.len(), 3);
let mut found_agent_rate = false;
for attr in &agent_attrs {
if attr.key.as_str() == SAMPLING_AGENT_RATE_TAG_KEY {
let value_float = match attr.value {
opentelemetry::Value::F64(f) => f,
_ => panic!("Expected float value for agent rate tag"),
};
assert_eq!(value_float, agent_rate);
found_agent_rate = true;
break;
}
}
assert!(found_agent_rate, "Missing agent rate tag");
for attr in &agent_attrs {
assert_ne!(
attr.key.as_str(),
SAMPLING_RULE_RATE_TAG_KEY,
"Rule rate tag should not be present for agent mechanism"
);
}
}
#[test]
fn test_should_sample_parent_context() {
let sampler = DatadogSampler::new(vec![], 100, create_empty_resource_arc());
let empty_attrs: &[KeyValue] = &[];
let result_sampled = sampler.sample(
Some(true),
create_trace_id(),
"span",
&SpanKind::Client,
empty_attrs,
);
assert_eq!(
result_sampled.to_otel_decision(),
SamplingDecision::RecordAndSample
);
assert!(result_sampled.to_dd_sampling_tags().is_empty());
let result_not_sampled = sampler.sample(
Some(false),
create_trace_id(),
"span",
&SpanKind::Client,
empty_attrs,
);
assert_eq!(
result_not_sampled.to_otel_decision(),
SamplingDecision::RecordOnly
);
assert!(result_not_sampled.to_dd_sampling_tags().is_empty());
}
#[test]
fn test_should_sample_with_rule() {
let rule = SamplingRule::new(
1.0,
Some("test-service".to_string()),
None,
None,
None,
None,
);
let sampler = DatadogSampler::new(vec![rule], 100, create_empty_resource_arc());
let attrs = create_attributes("resource", "prod");
let result = sampler.sample(
None,
create_trace_id(),
"span",
&SpanKind::Client,
attrs.as_slice(),
);
assert_eq!(result.to_otel_decision(), SamplingDecision::RecordAndSample);
assert!(!result.to_dd_sampling_tags().is_empty());
let attrs_no_match = create_attributes("other-resource", "prod");
let result_no_match = sampler.sample(
None,
create_trace_id(),
"span",
&SpanKind::Client,
attrs_no_match.as_slice(),
);
assert_eq!(
result_no_match.to_otel_decision(),
SamplingDecision::RecordAndSample
);
assert!(!result_no_match.to_dd_sampling_tags().is_empty());
}
#[test]
fn test_should_sample_with_service_rates() {
let mut sampler =
DatadogSampler::new(vec![], 100, create_resource("test-service".to_string()));
let mut rates = HashMap::new();
rates.insert("service:test-service,env:prod".to_string(), 1.0); rates.insert("service:other-service,env:prod".to_string(), 0.0);
sampler.update_service_rates(rates);
let attrs_sample = create_attributes("any_resource_name_matching_env", "prod");
let result_sample = sampler.sample(
None,
create_trace_id(),
"span_for_test_service",
&SpanKind::Client,
attrs_sample.as_slice(),
);
assert_eq!(
result_sample.to_otel_decision(),
SamplingDecision::RecordAndSample,
"Span for test-service/prod should be sampled"
);
sampler.resource = create_resource("other-service".to_string());
let attrs_no_sample = create_attributes("any_resource_name_matching_env", "prod");
let result_no_sample = sampler.sample(
None,
create_trace_id(),
"span_for_other_service",
&SpanKind::Client,
attrs_no_sample.as_slice(),
);
assert_eq!(
result_no_sample.to_otel_decision(),
SamplingDecision::RecordOnly,
"Span for other-service/prod should be dropped"
);
}
#[test]
fn test_sampling_rule_matches_float_attributes() {
use opentelemetry::Value;
fn create_attributes_with_float(tag_key: &'static str, float_value: f64) -> Vec<KeyValue> {
vec![
KeyValue::new(RESOURCE_TAG, "resource"),
KeyValue::new(ENV_TAG, "prod"),
KeyValue::new(tag_key, Value::F64(float_value)),
]
}
let rule_integer = SamplingRule::new(
0.5,
None,
None,
None,
Some(HashMap::from([("float_tag".to_string(), "42".to_string())])),
None,
);
let integer_float_attrs = create_attributes_with_float("float_tag", 42.0);
assert!(rule_integer.matches(&PreSampledSpan::new(
"test-span",
SpanKind::Client,
integer_float_attrs.as_slice(),
&create_empty_resource()
)));
let rule_wildcard = SamplingRule::new(
0.5,
None,
None,
None,
Some(HashMap::from([("float_tag".to_string(), "*".to_string())])),
None,
);
let decimal_float_attrs = create_attributes_with_float("float_tag", 42.5);
assert!(rule_wildcard.matches(&PreSampledSpan::new(
"test-span",
SpanKind::Client,
decimal_float_attrs.as_slice(),
&create_empty_resource()
)));
let rule_specific = SamplingRule::new(
0.5,
None,
None,
None,
Some(HashMap::from([(
"float_tag".to_string(),
"42.5".to_string(),
)])),
None,
);
let decimal_float_attrs = create_attributes_with_float("float_tag", 42.5);
assert!(!rule_specific.matches(&PreSampledSpan::new(
"test-span",
SpanKind::Client,
decimal_float_attrs.as_slice(),
&create_empty_resource()
)));
let rule_prefix = SamplingRule::new(
0.5,
None,
None,
None,
Some(HashMap::from([(
"float_tag".to_string(),
"42.*".to_string(),
)])),
None,
);
assert!(!rule_prefix.matches(&PreSampledSpan::new(
"test-span",
SpanKind::Client,
decimal_float_attrs.as_slice(),
&create_empty_resource()
)));
}
#[test]
fn test_otel_to_datadog_attribute_mapping() {
let rule = SamplingRule::new(
1.0,
None,
None,
None,
Some(HashMap::from([(
"http.response.status_code".to_string(),
"5*".to_string(),
)])),
None,
);
let otel_attrs = vec![KeyValue::new("http.response.status_code", 500)];
assert!(rule.matches(&PreSampledSpan::new(
"test-span",
SpanKind::Client,
otel_attrs.as_slice(),
&create_empty_resource()
)));
let non_matching_attrs = vec![KeyValue::new("http.response.status_code", 200)];
assert!(!rule.matches(&PreSampledSpan::new(
"test-span",
SpanKind::Client,
non_matching_attrs.as_slice(),
&create_empty_resource()
)));
let unrelated_attrs = vec![KeyValue::new("unrelated.attribute", "value")];
assert!(!rule.matches(&PreSampledSpan::new(
"test-span",
SpanKind::Client,
unrelated_attrs.as_slice(),
&create_empty_resource()
)));
}
#[test]
fn test_multiple_otel_attribute_mappings() {
let mut tags = HashMap::new();
tags.insert("http.status_code".to_string(), "5*".to_string());
tags.insert("http.method".to_string(), "POST".to_string());
tags.insert("http.url".to_string(), "*api*".to_string());
let rule = SamplingRule::new(1.0, None, None, None, Some(tags), None);
let mixed_attrs = vec![
KeyValue::new("http.response.status_code", 503),
KeyValue::new("http.request.method", "POST"),
KeyValue::new("url.full", "https://example.com/api/v1/resource"),
];
assert!(rule.matches(&PreSampledSpan::new(
"test-span",
SpanKind::Client,
&mixed_attrs,
&create_empty_resource()
),));
let missing_method = vec![
KeyValue::new("http.response.status_code", 503),
KeyValue::new("url.full", "https://example.com/api/v1/resource"),
];
assert!(!rule.matches(&PreSampledSpan::new(
"test-span",
SpanKind::Client,
&missing_method,
&create_empty_resource()
),));
let wrong_method = vec![
KeyValue::new("http.response.status_code", 503),
KeyValue::new("http.request.method", "GET"), KeyValue::new("url.full", "https://example.com/api/v1/resource"),
];
assert!(!rule.matches(&PreSampledSpan::new(
"test-span",
SpanKind::Client,
&wrong_method,
&create_empty_resource()
),));
}
#[test]
fn test_direct_and_mapped_mixed_attributes() {
let dd_status_key_str = HTTP_RESPONSE_STATUS_CODE;
let otel_response_status_key_str = HTTP_RESPONSE_STATUS_CODE;
let custom_tag_key = "custom.tag";
let custom_tag_value = "value";
let empty_resource = create_empty_resource();
let span_kind_client = SpanKind::Client;
let mut tags_rule1 = HashMap::new();
tags_rule1.insert(dd_status_key_str.to_string(), "5*".to_string());
tags_rule1.insert(custom_tag_key.to_string(), custom_tag_value.to_string());
let rule1 = SamplingRule::new(1.0, None, None, None, Some(tags_rule1), None);
let mixed_attrs_match = vec![
KeyValue::new(otel_response_status_key_str, 503),
KeyValue::new(custom_tag_key, custom_tag_value),
];
assert!(rule1.matches(&PreSampledSpan::new(
"test-span",
span_kind_client,
&mixed_attrs_match,
&empty_resource
)), "Rule with dd_status_key (5*) and custom.tag should match span with otel_response_status_key (503) and custom.tag");
let dd_attrs_match = vec![
KeyValue::new(dd_status_key_str, 503),
KeyValue::new(custom_tag_key, custom_tag_value),
];
assert!(rule1.matches(&PreSampledSpan::new(
"test-span",
SpanKind::Client,
&dd_attrs_match,
&empty_resource
)), "Rule with dd_status_key (5*) and custom.tag should match span with dd_status_key (503) and custom.tag");
let missing_custom_tag_attrs = vec![KeyValue::new(otel_response_status_key_str, 503)];
assert!(
!rule1.matches(&PreSampledSpan::new(
"test-span",
SpanKind::Client,
&missing_custom_tag_attrs,
&empty_resource
)),
"Rule with dd_status_key (5*) and custom.tag should NOT match span missing custom.tag"
);
let non_matching_otel_status_attrs = vec![
KeyValue::new(otel_response_status_key_str, 200),
KeyValue::new(custom_tag_key, custom_tag_value),
];
assert!(!rule1.matches(&PreSampledSpan::new(
"test-span",
SpanKind::Client,
&non_matching_otel_status_attrs,
&empty_resource
)), "Rule with dd_status_key (5*) and custom.tag should NOT match span with non-matching otel_response_status_key (200)");
let no_status_code_attrs = vec![
KeyValue::new("another.tag", "irrelevant"),
KeyValue::new(custom_tag_key, custom_tag_value),
];
assert!(!rule1.matches(&PreSampledSpan::new(
"test-span",
SpanKind::Client,
&no_status_code_attrs,
&empty_resource
)), "Rule with dd_status_key (5*) and custom.tag should NOT match span with no status code attribute");
let mut tags_rule2 = HashMap::new();
tags_rule2.insert(otel_response_status_key_str.to_string(), "200".to_string());
tags_rule2.insert(custom_tag_key.to_string(), custom_tag_value.to_string());
let rule2 = SamplingRule::new(1.0, None, None, None, Some(tags_rule2), None);
let otel_key_rule_match_attrs = vec![
KeyValue::new(otel_response_status_key_str, 200),
KeyValue::new(custom_tag_key, custom_tag_value),
];
assert!(rule2.matches(&PreSampledSpan::new(
"test-span",
SpanKind::Client,
&otel_key_rule_match_attrs,
&empty_resource
)), "Rule with otel_response_status_key (200) and custom.tag should match span with otel_response_status_key (200) and custom.tag");
}
/// Checks that the OTel → Datadog operation-name mapping produces the expected
/// names for several span kinds / attribute sets, and that the sampler samples
/// spans carrying those attributes (every configured rule has rate 1.0).
#[test]
fn test_operation_name_integration() {
    // Build one rate-1.0 rule per operation-name pattern; only the name differs.
    let rule_for = |pattern: &str| {
        SamplingRule::new(
            1.0,
            None,
            Some(pattern.to_string()),
            None,
            None,
            Some("default".to_string()),
        )
    };
    let sampler = DatadogSampler::new(
        vec![
            rule_for("http.*.request"),
            rule_for("postgresql.query"),
            rule_for("kafka.process"),
        ],
        100,
        create_empty_resource_arc(),
    );
    let trace_id = create_trace_id();
    let empty_resource: SdkResource = create_empty_resource();

    // Each case: (span kind, span attributes, expected operation name, name-check message).
    let cases: Vec<(SpanKind, Vec<KeyValue>, &str, &str)> = vec![
        (
            SpanKind::Client,
            vec![KeyValue::new(
                Key::from_static_str(HTTP_REQUEST_METHOD),
                Value::String("GET".into()),
            )],
            "http.client.request",
            "HTTP client operation name should be correct",
        ),
        (
            SpanKind::Server,
            vec![KeyValue::new(
                Key::from_static_str(HTTP_REQUEST_METHOD),
                Value::String("POST".into()),
            )],
            "http.server.request",
            "HTTP server operation name should be correct",
        ),
        (
            SpanKind::Client,
            vec![KeyValue::new(
                Key::from_static_str(DB_SYSTEM_NAME),
                Value::String("postgresql".into()),
            )],
            "postgresql.query",
            "Database operation name should be correct",
        ),
        (
            SpanKind::Consumer,
            vec![
                KeyValue::new(
                    Key::from_static_str(MESSAGING_SYSTEM),
                    Value::String("kafka".into()),
                ),
                KeyValue::new(
                    Key::from_static_str(MESSAGING_OPERATION_TYPE),
                    Value::String("process".into()),
                ),
            ],
            "kafka.process",
            "Messaging operation name should be correct",
        ),
        (
            SpanKind::Internal,
            vec![KeyValue::new("custom.tag", "value")],
            "internal",
            "Internal operation name should be the span kind",
        ),
        (
            SpanKind::Server,
            vec![KeyValue::new(
                Key::from_static_str(NETWORK_PROTOCOL_NAME),
                Value::String("http".into()),
            )],
            "http.server.request",
            "Server with protocol operation name should use protocol",
        ),
    ];

    for (kind, attrs, expected_op_name, message) in &cases {
        // The mapping is computed from an unnamed span so only kind/attributes matter.
        let op_name = get_otel_operation_name_v2(&PreSampledSpan::new(
            "",
            kind.clone(),
            attrs,
            &empty_resource,
        ));
        assert_eq!(op_name, *expected_op_name, "{}", message);

        let result = sampler.sample(None, trace_id, "test-span", kind, attrs);
        assert_eq!(result.to_otel_decision(), SamplingDecision::RecordAndSample);
    }
}
/// Verifies that the callback returned by `on_rules_update` replaces the
/// sampler's rule set. It checks that a replaced rule is found by
/// `find_matching_rule` for a matching span. It also checks that an empty
/// update clears all rules.
#[test]
fn test_on_rules_update_callback() {
    let initial_rule = SamplingRule::new(
        0.1,
        Some("initial-service".to_string()),
        None,
        None,
        None,
        Some("default".to_string()),
    );
    let test_resource = Arc::new(RwLock::new(
        opentelemetry_sdk::Resource::builder_empty()
            .with_attributes(vec![KeyValue::new(SERVICE_NAME, "web-frontend")])
            .build(),
    ));
    let sampler = DatadogSampler::new(vec![initial_rule], 100, test_resource);
    assert_eq!(sampler.rules.len(), 1);

    let callback = sampler.on_rules_update();
    let new_rules = vec![
        SamplingRuleConfig {
            sample_rate: 0.5,
            service: Some("web-*".to_string()),
            name: Some("http.*".to_string()),
            resource: None,
            tags: std::collections::HashMap::new(),
            provenance: "customer".to_string(),
        },
        SamplingRuleConfig {
            sample_rate: 0.2,
            service: Some("api-*".to_string()),
            name: None,
            resource: Some("/api/*".to_string()),
            tags: [("env".to_string(), "prod".to_string())].into(),
            provenance: "dynamic".to_string(),
        },
    ];
    callback(&new_rules);
    assert_eq!(sampler.rules.len(), 2);

    // Keep the resource read guard (and the span borrowing it) in a narrow scope.
    // Otherwise the lock stays held for the rest of the test, including across the
    // next rules update below.
    {
        let attrs = [KeyValue::new(HTTP_REQUEST_METHOD, "GET")];
        let resource_guard = sampler.resource.read().unwrap();
        let span = PreSampledSpan::new(
            "test-span",
            SpanKind::Client,
            attrs.as_slice(),
            &resource_guard,
        );
        let rule = sampler.find_matching_rule(&span).expect(
            "Expected to find a matching rule for service 'web-frontend' and name 'http.client.request'",
        );
        assert_eq!(rule.sample_rate, 0.5);
        assert_eq!(rule.provenance, "customer");
    }

    // An empty update must clear every rule.
    callback(&[]);
    assert_eq!(sampler.rules.len(), 0);
}
}