use libdd_telemetry::data::Configuration;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{borrow::Cow, fmt::Display, str::FromStr, sync::OnceLock};
use rustc_version_runtime::version;
use crate::core::configuration::sources::{
CompositeConfigSourceResult, CompositeSource, ConfigKey, ConfigSourceOrigin,
};
use crate::core::configuration::supported_configurations::SupportedConfigurations;
use crate::core::log::LevelFilter;
use crate::core::telemetry;
use crate::{dd_error, dd_warn};
#[derive(Debug, Clone)]
pub enum RemoteConfigUpdate {
SamplingRules(Vec<SamplingRuleConfig>),
}
type RemoteConfigCallback = Box<dyn Fn(&RemoteConfigUpdate) + Send + Sync>;
pub struct RemoteConfigCallbacks {
pub sampling_rules_update: Option<RemoteConfigCallback>,
}
impl std::fmt::Debug for RemoteConfigCallbacks {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RemoteConfigCallbacks")
.field(
"sampling_rules_update",
&self.sampling_rules_update.as_ref().map(|_| "<callback>"),
)
.finish()
}
}
impl RemoteConfigCallbacks {
pub fn new() -> Self {
Self {
sampling_rules_update: None,
}
}
pub fn set_sampling_rules_callback<F>(&mut self, callback: F)
where
F: Fn(&RemoteConfigUpdate) + Send + Sync + 'static,
{
self.sampling_rules_update = Some(Box::new(callback));
}
pub fn notify_update(&self, update: &RemoteConfigUpdate) {
match update {
RemoteConfigUpdate::SamplingRules(_) => {
if let Some(ref callback) = self.sampling_rules_update {
callback(update);
}
} }
}
}
impl Default for RemoteConfigCallbacks {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct SamplingRuleConfig {
pub sample_rate: f64,
#[serde(default)]
pub service: Option<String>,
#[serde(default)]
pub name: Option<String>,
#[serde(default)]
pub resource: Option<String>,
#[serde(default)]
pub tags: HashMap<String, String>,
#[serde(default = "default_provenance")]
pub provenance: String,
}
impl Display for SamplingRuleConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", serde_json::json!(self))
}
}
fn default_provenance() -> String {
"default".to_string()
}
pub const TRACER_VERSION: &str = env!("CARGO_PKG_VERSION");
const DATADOG_TAGS_MAX_LENGTH: usize = 512;
const RC_DEFAULT_POLL_INTERVAL: f64 = 5.0;
#[derive(Debug, Default, Clone, PartialEq)]
struct ParsedSamplingRules {
rules: Vec<SamplingRuleConfig>,
}
impl Deref for ParsedSamplingRules {
type Target = [SamplingRuleConfig];
fn deref(&self) -> &Self::Target {
&self.rules
}
}
impl From<ParsedSamplingRules> for Vec<SamplingRuleConfig> {
fn from(parsed: ParsedSamplingRules) -> Self {
parsed.rules
}
}
impl FromStr for ParsedSamplingRules {
type Err = serde_json::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.trim().is_empty() {
return Ok(ParsedSamplingRules::default());
}
let rules_vec: Vec<SamplingRuleConfig> = serde_json::from_str(s)?;
Ok(ParsedSamplingRules { rules: rules_vec })
}
}
impl Display for ParsedSamplingRules {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
serde_json::to_string(&self.rules).unwrap_or_default()
)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum OtlpProtocol {
Grpc,
HttpProtobuf,
HttpJson,
}
impl FromStr for OtlpProtocol {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let s = s.trim();
if s.eq_ignore_ascii_case("grpc") {
Ok(OtlpProtocol::Grpc)
} else if s.eq_ignore_ascii_case("http/protobuf") {
Ok(OtlpProtocol::HttpProtobuf)
} else if s.eq_ignore_ascii_case("http/json") {
Ok(OtlpProtocol::HttpJson)
} else {
Err(format!("Invalid OTLP protocol: {}", s))
}
}
}
impl OtlpProtocol {
pub(crate) fn parse_optional(s: String) -> Option<Self> {
if s.trim().is_empty() {
None
} else {
s.parse().ok()
}
}
}
fn parse_temporality(s: String) -> Option<opentelemetry_sdk::metrics::Temporality> {
let s = s.trim().to_lowercase();
if s == "cumulative" {
Some(opentelemetry_sdk::metrics::Temporality::Cumulative)
} else if s == "delta" || s.is_empty() {
Some(opentelemetry_sdk::metrics::Temporality::Delta)
} else {
None
}
}
enum ConfigItemRef<'a, T> {
Ref(&'a T),
ArcRef(arc_swap::Guard<Option<Arc<T>>>),
}
impl<T: Deref> Deref for ConfigItemRef<'_, T> {
type Target = T::Target;
fn deref(&self) -> &Self::Target {
match self {
ConfigItemRef::Ref(t) => t,
ConfigItemRef::ArcRef(guard) => guard.as_ref().unwrap(),
}
}
}
impl<T: ConfigurationValueProvider> ConfigurationValueProvider for ConfigItemRef<'_, T> {
fn get_configuration_value(&self) -> String {
match self {
ConfigItemRef::Ref(t) => t.get_configuration_value(),
ConfigItemRef::ArcRef(guard) => guard.as_ref().unwrap().get_configuration_value(),
}
}
}
pub trait ConfigurationProvider {
fn get_all_configurations(&self) -> Vec<Configuration>;
}
trait ConfigurationValueProvider {
fn get_configuration_value(&self) -> String;
}
trait ValueSourceUpdater<T> {
fn name(&self) -> SupportedConfigurations;
fn set_value_source(&mut self, value: T, source: ConfigSourceOrigin);
}
#[derive(Debug)]
struct ConfigItem<T: ConfigurationValueProvider> {
name: SupportedConfigurations,
default_value: T,
env_value: Option<T>,
code_value: Option<T>,
config_id: Option<String>,
}
impl<T: Clone + ConfigurationValueProvider> Clone for ConfigItem<T> {
fn clone(&self) -> Self {
Self {
name: self.name,
default_value: self.default_value.clone(),
env_value: self.env_value.clone(),
code_value: self.code_value.clone(),
config_id: self.config_id.clone(),
}
}
}
impl<T: Clone + ConfigurationValueProvider> ConfigItem<T> {
fn new(name: SupportedConfigurations, default: T) -> Self {
Self {
name,
default_value: default,
env_value: None,
code_value: None,
config_id: None,
}
}
fn set_code(&mut self, value: T) {
self.code_value = Some(value);
}
fn value(&self) -> &T {
self.code_value
.as_ref()
.or(self.env_value.as_ref())
.unwrap_or(&self.default_value)
}
#[allow(dead_code)] fn source(&self) -> ConfigSourceOrigin {
if self.code_value.is_some() {
ConfigSourceOrigin::Code
} else if self.env_value.is_some() {
ConfigSourceOrigin::EnvVar
} else {
ConfigSourceOrigin::Default
}
}
fn build_configurations_list(&self, calculated_value: Option<String>) -> Vec<Configuration> {
let mut configurations = Vec::new();
configurations.push(Configuration {
name: self.name.as_str().to_string(),
value: self.default_value.get_configuration_value(),
origin: ConfigSourceOrigin::Default.into(),
config_id: self.config_id.clone(),
seq_id: Some(ConfigSourceOrigin::Default as u64),
});
if let Some(calculated_value) = calculated_value {
configurations.push(Configuration {
name: self.name.as_str().to_string(),
value: calculated_value,
origin: ConfigSourceOrigin::Calculated.into(),
config_id: self.config_id.clone(),
seq_id: Some(ConfigSourceOrigin::Calculated as u64),
});
}
if let Some(ref env_value) = self.env_value {
configurations.push(Configuration {
name: self.name.as_str().to_string(),
value: env_value.get_configuration_value(),
origin: ConfigSourceOrigin::EnvVar.into(),
config_id: self.config_id.clone(),
seq_id: Some(ConfigSourceOrigin::EnvVar as u64),
});
}
if let Some(ref code_value) = self.code_value {
configurations.push(Configuration {
name: self.name.as_str().to_string(),
value: code_value.get_configuration_value(),
origin: ConfigSourceOrigin::Code.into(),
config_id: self.config_id.clone(),
seq_id: Some(ConfigSourceOrigin::Code as u64),
});
}
configurations
}
}
impl<T: Clone + ConfigurationValueProvider> ConfigurationProvider for ConfigItem<T> {
fn get_all_configurations(&self) -> Vec<Configuration> {
self.build_configurations_list(None)
}
}
impl<T: ConfigurationValueProvider> ValueSourceUpdater<T> for ConfigItem<T> {
fn name(&self) -> SupportedConfigurations {
self.name
}
fn set_value_source(&mut self, value: T, source: ConfigSourceOrigin) {
match source {
ConfigSourceOrigin::Code => self.code_value = Some(value),
ConfigSourceOrigin::EnvVar => self.env_value = Some(value),
ConfigSourceOrigin::Calculated => {
dd_warn!("Cannot set a calculated value");
}
ConfigSourceOrigin::RemoteConfig => {
dd_warn!("Cannot set a value from RC");
}
ConfigSourceOrigin::Default => {
dd_warn!("Cannot set default value after initialization");
}
}
}
}
#[derive(Debug)]
struct ConfigItemWithOverride<T: ConfigurationValueProvider + Deref> {
config_item: ConfigItem<T>,
override_value: arc_swap::ArcSwapOption<T>,
override_origin: ConfigSourceOrigin,
config_id: arc_swap::ArcSwapOption<String>,
}
impl<T: Clone + ConfigurationValueProvider + Deref> Clone for ConfigItemWithOverride<T> {
fn clone(&self) -> Self {
Self {
config_item: self.config_item.clone(),
override_value: arc_swap::ArcSwapOption::new(self.override_value.load_full()),
override_origin: self.override_origin,
config_id: arc_swap::ArcSwapOption::new(self.config_id.load_full()),
}
}
}
impl<T: ConfigurationValueProvider + Clone + Deref> ConfigItemWithOverride<T> {
fn new_calculated(name: SupportedConfigurations, default: T) -> Self {
Self {
config_item: ConfigItem::new(name, default),
override_value: arc_swap::ArcSwapOption::const_empty(),
override_origin: ConfigSourceOrigin::Calculated,
config_id: arc_swap::ArcSwapOption::const_empty(),
}
}
fn new_rc(name: SupportedConfigurations, default: T) -> Self {
Self {
config_item: ConfigItem::new(name, default),
override_value: arc_swap::ArcSwapOption::const_empty(),
override_origin: ConfigSourceOrigin::RemoteConfig,
config_id: arc_swap::ArcSwapOption::const_empty(),
}
}
fn source(&self) -> ConfigSourceOrigin {
let config_item_source = self.config_item.source();
if self.override_value.load().is_none() {
config_item_source
} else {
config_item_source.max(self.override_origin)
}
}
fn set_override_value(&self, value: T, source: ConfigSourceOrigin) {
if source == self.override_origin {
self.override_value.store(Some(Arc::new(value)));
}
}
fn set_config_id(&self, config_id: Option<String>) {
match config_id {
Some(id) => self.config_id.store(Some(Arc::new(id))),
None => self.config_id.store(None),
}
}
#[cfg(test)]
fn get_config_id(&self) -> Option<String> {
self.config_id.load().as_ref().map(|id| (**id).clone())
}
fn unset_override_value(&self) {
self.override_value.store(None);
}
fn set_code(&mut self, value: T) {
self.set_value_source(value, ConfigSourceOrigin::Code);
}
fn set_calculated(&mut self, value: T) {
self.set_value_source(value, ConfigSourceOrigin::Calculated);
}
fn value(&self) -> ConfigItemRef<'_, T> {
let override_value = self.override_value.load();
if override_value.is_some() && self.source() == self.override_origin {
ConfigItemRef::ArcRef(override_value)
} else {
ConfigItemRef::Ref(self.config_item.value())
}
}
}
impl<T: Clone + ConfigurationValueProvider + Deref> ConfigurationProvider
for ConfigItemWithOverride<T>
{
fn get_all_configurations(&self) -> Vec<Configuration> {
let override_value = self.override_value.load();
let calculated_option = if self.source() == ConfigSourceOrigin::Calculated {
Some(override_value.as_ref().unwrap().get_configuration_value())
} else {
None
};
let mut configurations = self
.config_item
.build_configurations_list(calculated_option);
if override_value.is_some() && self.source() != ConfigSourceOrigin::Calculated {
let config_id = self.config_id.load().as_ref().map(|id| (**id).clone());
configurations.push(Configuration {
name: self.config_item.name.as_str().to_string(),
value: self.value().get_configuration_value(),
origin: self.source().into(),
config_id,
seq_id: Some(self.source() as u64),
});
}
configurations
}
}
impl<T: Clone + ConfigurationValueProvider + Deref> ValueSourceUpdater<T>
for ConfigItemWithOverride<T>
{
fn name(&self) -> SupportedConfigurations {
self.config_item.name()
}
fn set_value_source(&mut self, value: T, source: ConfigSourceOrigin) {
if source == self.override_origin {
self.set_override_value(value, source);
} else {
self.config_item.set_value_source(value, source);
}
}
}
struct ConfigItemSourceUpdater<'a> {
sources: &'a CompositeSource,
}
impl ConfigItemSourceUpdater<'_> {
fn apply_result<ParsedConfig, RawConfig, ConfigItemType, F>(
&self,
mut item: ConfigItemType,
result: CompositeConfigSourceResult<RawConfig>,
transform: F,
) -> ConfigItemType
where
ParsedConfig: Clone + ConfigurationValueProvider,
ConfigItemType: ValueSourceUpdater<ParsedConfig>,
F: FnOnce(RawConfig) -> ParsedConfig,
{
if !result.errors.is_empty() {
dd_error!(
"Configuration: Error parsing property {} - {:?}",
item.name().as_str(),
result.errors
);
}
if let Some(ConfigKey { value, origin }) = result.value {
item.set_value_source(transform(value), origin);
}
item
}
fn update_parsed<ParsedConfig, ConfigItemType>(&self, default: ConfigItemType) -> ConfigItemType
where
ParsedConfig: Clone + FromStr + ConfigurationValueProvider,
ParsedConfig::Err: std::fmt::Display,
ConfigItemType: ValueSourceUpdater<ParsedConfig>,
{
let result = self.sources.get_parse::<ParsedConfig>(default.name());
self.apply_result(default, result, |value| value)
}
pub fn update_string<ParsedConfig, ConfigItemType, F>(
&self,
default: ConfigItemType,
transform: F,
) -> ConfigItemType
where
ParsedConfig: Clone + ConfigurationValueProvider,
ConfigItemType: ValueSourceUpdater<ParsedConfig>,
F: FnOnce(String) -> ParsedConfig,
{
let result = self.sources.get(default.name());
self.apply_result(default, result, transform)
}
pub fn update_parsed_with_transform<ParsedConfig, RawConfig, ConfigItemType, F>(
&self,
default: ConfigItemType,
transform: F,
) -> ConfigItemType
where
ParsedConfig: Clone + ConfigurationValueProvider,
RawConfig: FromStr,
RawConfig::Err: std::fmt::Display,
ConfigItemType: ValueSourceUpdater<ParsedConfig>,
F: FnOnce(RawConfig) -> ParsedConfig,
{
let result = self.sources.get_parse::<RawConfig>(default.name());
self.apply_result(default, result, transform)
}
}
macro_rules! impl_config_value_provider {
(option: $($type:ty),* $(,)?) => {
$(
impl ConfigurationValueProvider for Option<$type> {
fn get_configuration_value(&self) -> String {
match self {
Some(value) => value.to_string(),
None => String::new(),
}
}
}
)*
};
(simple: $($type:ty),* $(,)?) => {
$(
impl ConfigurationValueProvider for $type {
fn get_configuration_value(&self) -> String {
self.to_string()
}
}
)*
};
}
type SamplingRulesConfigItem = ConfigItemWithOverride<ParsedSamplingRules>;
#[derive(Debug, Clone)]
struct ExtraServicesTracker {
extra_services: Arc<Mutex<HashSet<String>>>,
extra_services_sent: Arc<Mutex<HashSet<String>>>,
extra_services_queue: Arc<Mutex<Option<VecDeque<String>>>>,
}
impl ExtraServicesTracker {
fn new() -> Self {
Self {
extra_services: Arc::new(Mutex::new(HashSet::new())),
extra_services_sent: Arc::new(Mutex::new(HashSet::new())),
extra_services_queue: Arc::new(Mutex::new(Some(VecDeque::new()))),
}
}
fn add_extra_services(
&self,
services: impl Iterator<Item = impl Deref<Target = str>>,
main_service: &str,
) {
let mut services = services.filter(|s| s.deref() != main_service).peekable();
if services.peek().is_none() {
return;
}
let mut sent = match self.extra_services_sent.lock() {
Ok(s) => s,
Err(_) => return,
};
let mut queue = match self.extra_services_queue.lock() {
Ok(q) => q,
Err(_) => return,
};
for service_name in services {
let service_name = service_name.deref();
if sent.contains(service_name) {
continue;
}
if let Some(ref mut q) = *queue {
q.push_back(service_name.to_string());
}
sent.insert(service_name.to_string());
}
}
fn get_extra_services(&self) -> Vec<String> {
let mut queue = match self.extra_services_queue.lock() {
Ok(q) => q,
Err(_) => return Vec::new(),
};
let mut services = match self.extra_services.lock() {
Ok(s) => s,
Err(_) => return Vec::new(),
};
if let Some(ref mut q) = *queue {
while let Some(service) = q.pop_front() {
services.insert(service);
if services.len() > 64 {
if let Some(to_remove) = services.iter().next().cloned() {
dd_warn!("ExtraServicesTracker:RemoteConfig: Exceeded 64 service limit, removing service: {}", to_remove);
services.remove(&to_remove);
}
}
}
}
services.iter().cloned().collect()
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TracePropagationStyle {
Datadog,
TraceContext,
None,
}
impl TracePropagationStyle {
fn from_tags(tags: Option<Vec<String>>) -> Option<Vec<TracePropagationStyle>> {
match tags {
Some(tags) if !tags.is_empty() => Some(
tags.iter()
.filter_map(|value| match TracePropagationStyle::from_str(value) {
Ok(style) => Some(style),
Err(err) => {
dd_warn!("Error parsing: {err}");
None
}
})
.collect::<Vec<TracePropagationStyle>>(),
),
Some(_) => None,
None => None,
}
}
}
impl FromStr for TracePropagationStyle {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.trim().to_lowercase().as_str() {
"datadog" => Ok(TracePropagationStyle::Datadog),
"tracecontext" => Ok(TracePropagationStyle::TraceContext),
"none" => Ok(TracePropagationStyle::None),
_ => Err(format!("Unknown trace propagation style: '{s}'")),
}
}
}
impl Display for TracePropagationStyle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let style = match self {
TracePropagationStyle::Datadog => "datadog",
TracePropagationStyle::TraceContext => "tracecontext",
TracePropagationStyle::None => "none",
};
write!(f, "{style}")
}
}
#[derive(Debug, Clone)]
enum ServiceName {
Default,
Configured(String),
}
impl ServiceName {
fn is_default(&self) -> bool {
matches!(self, ServiceName::Default)
}
fn as_str(&self) -> &str {
match self {
ServiceName::Default => "unnamed-rust-service",
ServiceName::Configured(name) => name,
}
}
}
impl std::ops::Deref for ServiceName {
type Target = str;
fn deref(&self) -> &Self::Target {
self.as_str()
}
}
impl Display for ServiceName {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}
impl ConfigurationValueProvider for Vec<(String, String)> {
fn get_configuration_value(&self) -> String {
self.iter()
.map(|(key, value)| format!("{key}:{value}"))
.collect::<Vec<_>>()
.join(",")
}
}
impl ConfigurationValueProvider for Option<Vec<TracePropagationStyle>> {
fn get_configuration_value(&self) -> String {
match &self {
Some(styles) => styles
.iter()
.map(|style| style.to_string())
.collect::<Vec<_>>()
.join(","),
None => "".to_string(),
}
}
}
impl ConfigurationValueProvider for OtlpProtocol {
fn get_configuration_value(&self) -> String {
match self {
OtlpProtocol::Grpc => "grpc",
OtlpProtocol::HttpProtobuf => "http/protobuf",
OtlpProtocol::HttpJson => "http/json",
}
.to_string()
}
}
impl ConfigurationValueProvider for Option<OtlpProtocol> {
fn get_configuration_value(&self) -> String {
self.as_ref()
.map(|p| p.get_configuration_value())
.unwrap_or_default()
}
}
impl ConfigurationValueProvider for opentelemetry_sdk::metrics::Temporality {
fn get_configuration_value(&self) -> String {
match self {
opentelemetry_sdk::metrics::Temporality::Cumulative => "cumulative",
opentelemetry_sdk::metrics::Temporality::Delta => "delta",
_ => "delta",
}
.to_string()
}
}
impl ConfigurationValueProvider for Option<opentelemetry_sdk::metrics::Temporality> {
fn get_configuration_value(&self) -> String {
self.as_ref()
.map(|t| t.get_configuration_value())
.unwrap_or_else(|| "delta".to_string())
}
}
impl_config_value_provider!(simple: Cow<'static, str>, bool, u32, usize, i32, f64, ServiceName, LevelFilter, ParsedSamplingRules);
impl_config_value_provider!(option: String);
#[derive(Clone)]
pub struct Config {
runtime_id: &'static str,
tracer_version: &'static str,
language_version: String,
language: &'static str,
service: ConfigItemWithOverride<ServiceName>,
env: ConfigItem<Option<String>>,
version: ConfigItem<Option<String>>,
global_tags: ConfigItem<Vec<(String, String)>>,
otel_resource_attributes: ConfigItem<Vec<(String, String)>>,
otel_metrics_exporter: ConfigItem<Cow<'static, str>>,
otel_metrics_temporality_preference:
ConfigItem<Option<opentelemetry_sdk::metrics::Temporality>>,
agent_host: ConfigItem<Cow<'static, str>>,
trace_agent_port: ConfigItem<u32>,
trace_agent_url: ConfigItemWithOverride<Cow<'static, str>>,
dogstatsd_agent_host: ConfigItem<Cow<'static, str>>,
dogstatsd_agent_port: ConfigItem<u32>,
dogstatsd_agent_url: ConfigItemWithOverride<Cow<'static, str>>,
trace_sampling_rules: SamplingRulesConfigItem,
trace_rate_limit: ConfigItem<i32>,
enabled: ConfigItem<bool>,
log_level_filter: ConfigItem<LevelFilter>,
trace_stats_computation_enabled: ConfigItem<bool>,
trace_writer_synchronous_write: bool,
trace_writer_synchronous_timeout: Duration,
trace_writer_max_flush_interval: Duration,
#[cfg(feature = "test-utils")]
wait_agent_info_ready: bool,
telemetry_enabled: ConfigItem<bool>,
telemetry_log_collection_enabled: ConfigItem<bool>,
telemetry_heartbeat_interval: ConfigItem<f64>,
trace_partial_flush_enabled: ConfigItem<bool>,
trace_partial_flush_min_spans: ConfigItem<usize>,
trace_propagation_style: ConfigItem<Option<Vec<TracePropagationStyle>>>,
trace_propagation_style_extract: ConfigItem<Option<Vec<TracePropagationStyle>>>,
trace_propagation_style_inject: ConfigItem<Option<Vec<TracePropagationStyle>>>,
trace_propagation_extract_first: ConfigItem<bool>,
remote_config_enabled: ConfigItem<bool>,
remote_config_poll_interval: ConfigItem<f64>,
extra_services_tracker: ExtraServicesTracker,
remote_config_callbacks: Arc<Mutex<RemoteConfigCallbacks>>,
datadog_tags_max_length: ConfigItem<usize>,
metrics_otel_enabled: ConfigItem<bool>,
otlp_metrics_endpoint: ConfigItem<Cow<'static, str>>,
otlp_endpoint: ConfigItem<Cow<'static, str>>,
otlp_headers: ConfigItem<Cow<'static, str>>,
otlp_metrics_protocol: ConfigItem<Option<OtlpProtocol>>,
otlp_metrics_headers: ConfigItem<Cow<'static, str>>,
otlp_protocol: ConfigItem<Option<OtlpProtocol>>,
otlp_metrics_timeout: ConfigItem<u32>,
otlp_timeout: ConfigItem<u32>,
metric_export_interval: ConfigItem<u32>,
metric_export_timeout: ConfigItem<u32>,
logs_otel_enabled: ConfigItem<bool>,
otel_logs_exporter: ConfigItem<Cow<'static, str>>,
otlp_logs_endpoint: ConfigItem<Cow<'static, str>>,
otlp_logs_headers: ConfigItem<Cow<'static, str>>,
otlp_logs_protocol: ConfigItem<Option<OtlpProtocol>>,
otlp_logs_timeout: ConfigItem<u32>,
}
impl Config {
fn from_sources(sources: &CompositeSource) -> Self {
let default = default_config();
struct DdTags(Vec<String>);
impl FromStr for DdTags {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(DdTags(
s.split(',').map(|s| s.to_string()).collect::<Vec<String>>(),
))
}
}
struct DdKeyValueTags(Vec<(String, String)>);
impl FromStr for DdKeyValueTags {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(DdKeyValueTags(
s.split(',')
.filter_map(|s| {
s.split_once(':')
.map(|(k, v)| (k.trim().to_string(), v.trim().to_string()))
})
.collect(),
))
}
}
struct OtelResourceAttributes(Vec<(String, String)>);
impl FromStr for OtelResourceAttributes {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(OtelResourceAttributes(
s.split(',')
.filter_map(|s| {
s.split_once('=')
.map(|(k, v)| (k.trim().to_string(), v.trim().to_string()))
})
.collect(),
))
}
}
let parsed_sampling_rules_config = sources
.get_parse::<ParsedSamplingRules>(SupportedConfigurations::DD_TRACE_SAMPLING_RULES);
let mut sampling_rules_item = ConfigItemWithOverride::new_rc(
parsed_sampling_rules_config.name,
ParsedSamplingRules::default(), );
if let Some(rules) = parsed_sampling_rules_config.value {
sampling_rules_item.set_value_source(rules.value, rules.origin);
}
let cisu = ConfigItemSourceUpdater { sources };
Self {
runtime_id: default.runtime_id,
tracer_version: default.tracer_version,
language_version: default.language_version,
language: default.language,
service: cisu.update_string(default.service, ServiceName::Configured),
env: cisu.update_string(default.env, Some),
version: cisu.update_string(default.version, Some),
global_tags: cisu
.update_parsed_with_transform(default.global_tags, |DdKeyValueTags(tags)| tags),
otel_resource_attributes: cisu.update_parsed_with_transform(
default.otel_resource_attributes,
|OtelResourceAttributes(attrs)| attrs,
),
otel_metrics_exporter: cisu.update_string(default.otel_metrics_exporter, Cow::Owned),
otel_metrics_temporality_preference: cisu.update_string(
default.otel_metrics_temporality_preference,
parse_temporality,
),
agent_host: cisu.update_string(default.agent_host, Cow::Owned),
trace_agent_port: cisu.update_parsed(default.trace_agent_port),
trace_agent_url: cisu.update_string(default.trace_agent_url, Cow::Owned),
dogstatsd_agent_host: cisu.update_string(default.dogstatsd_agent_host, Cow::Owned),
dogstatsd_agent_port: cisu.update_parsed(default.dogstatsd_agent_port),
dogstatsd_agent_url: cisu.update_string(default.dogstatsd_agent_url, Cow::Owned),
trace_partial_flush_enabled: cisu.update_parsed(default.trace_partial_flush_enabled),
trace_partial_flush_min_spans: cisu
.update_parsed(default.trace_partial_flush_min_spans),
trace_sampling_rules: sampling_rules_item,
trace_rate_limit: cisu.update_parsed(default.trace_rate_limit),
enabled: cisu.update_parsed(default.enabled),
log_level_filter: cisu.update_parsed(default.log_level_filter),
trace_stats_computation_enabled: cisu
.update_parsed(default.trace_stats_computation_enabled),
telemetry_enabled: cisu.update_parsed(default.telemetry_enabled),
telemetry_log_collection_enabled: cisu
.update_parsed(default.telemetry_log_collection_enabled),
telemetry_heartbeat_interval: cisu.update_parsed_with_transform(
default.telemetry_heartbeat_interval,
|interval: f64| interval.abs(),
),
trace_propagation_style: cisu
.update_parsed_with_transform(default.trace_propagation_style, |DdTags(tags)| {
TracePropagationStyle::from_tags(Some(tags))
}),
trace_propagation_style_extract: cisu.update_parsed_with_transform(
default.trace_propagation_style_extract,
|DdTags(tags)| TracePropagationStyle::from_tags(Some(tags)),
),
trace_propagation_style_inject: cisu.update_parsed_with_transform(
default.trace_propagation_style_inject,
|DdTags(tags)| TracePropagationStyle::from_tags(Some(tags)),
),
trace_propagation_extract_first: cisu
.update_parsed(default.trace_propagation_extract_first),
trace_writer_synchronous_write: default.trace_writer_synchronous_write,
trace_writer_synchronous_timeout: default.trace_writer_synchronous_timeout,
trace_writer_max_flush_interval: default.trace_writer_max_flush_interval,
#[cfg(feature = "test-utils")]
wait_agent_info_ready: default.wait_agent_info_ready,
extra_services_tracker: ExtraServicesTracker::new(),
remote_config_enabled: cisu.update_parsed(default.remote_config_enabled),
remote_config_poll_interval: cisu.update_parsed_with_transform(
default.remote_config_poll_interval,
|interval: f64| interval.abs().min(RC_DEFAULT_POLL_INTERVAL),
),
remote_config_callbacks: Arc::new(Mutex::new(RemoteConfigCallbacks::new())),
datadog_tags_max_length: cisu
.update_parsed_with_transform(default.datadog_tags_max_length, |max: usize| {
max.min(DATADOG_TAGS_MAX_LENGTH)
}),
metrics_otel_enabled: cisu.update_parsed(default.metrics_otel_enabled),
otlp_metrics_endpoint: cisu.update_string(default.otlp_metrics_endpoint, Cow::Owned),
otlp_endpoint: cisu.update_string(default.otlp_endpoint, Cow::Owned),
otlp_headers: cisu.update_string(default.otlp_headers, Cow::Owned),
otlp_metrics_protocol: cisu
.update_string(default.otlp_metrics_protocol, OtlpProtocol::parse_optional),
otlp_metrics_headers: cisu.update_string(default.otlp_metrics_headers, Cow::Owned),
otlp_protocol: cisu.update_string(default.otlp_protocol, OtlpProtocol::parse_optional),
otlp_metrics_timeout: cisu.update_parsed(default.otlp_metrics_timeout),
otlp_timeout: cisu.update_parsed(default.otlp_timeout),
metric_export_interval: cisu.update_parsed(default.metric_export_interval),
metric_export_timeout: cisu.update_parsed(default.metric_export_timeout),
logs_otel_enabled: cisu.update_parsed(default.logs_otel_enabled),
otel_logs_exporter: cisu.update_string(default.otel_logs_exporter, Cow::Owned),
otlp_logs_endpoint: cisu.update_string(default.otlp_logs_endpoint, Cow::Owned),
otlp_logs_headers: cisu.update_string(default.otlp_logs_headers, Cow::Owned),
otlp_logs_protocol: cisu
.update_string(default.otlp_logs_protocol, OtlpProtocol::parse_optional),
otlp_logs_timeout: cisu.update_parsed(default.otlp_logs_timeout),
}
}
fn builder_with_sources(sources: &CompositeSource) -> ConfigBuilder {
ConfigBuilder {
config: Config::from_sources(sources),
}
}
pub fn builder() -> ConfigBuilder {
Self::builder_with_sources(&CompositeSource::default_sources())
}
pub(crate) fn get_telemetry_configuration(&self) -> Vec<&dyn ConfigurationProvider> {
vec![
&self.service,
&self.env,
&self.version,
&self.global_tags,
&self.agent_host,
&self.trace_agent_port,
&self.trace_agent_url,
&self.dogstatsd_agent_host,
&self.dogstatsd_agent_port,
&self.dogstatsd_agent_url,
&self.trace_sampling_rules,
&self.trace_rate_limit,
&self.enabled,
&self.log_level_filter,
&self.trace_stats_computation_enabled,
&self.telemetry_enabled,
&self.telemetry_log_collection_enabled,
&self.telemetry_heartbeat_interval,
&self.trace_partial_flush_enabled,
&self.trace_partial_flush_min_spans,
&self.trace_propagation_style,
&self.trace_propagation_style_extract,
&self.trace_propagation_style_inject,
&self.trace_propagation_extract_first,
&self.remote_config_enabled,
&self.remote_config_poll_interval,
&self.datadog_tags_max_length,
&self.otlp_endpoint,
&self.otlp_timeout,
&self.otlp_headers,
&self.otlp_protocol,
&self.otlp_metrics_endpoint,
&self.otlp_metrics_timeout,
&self.otlp_metrics_headers,
&self.otlp_metrics_protocol,
&self.metric_export_interval,
&self.metric_export_timeout,
&self.logs_otel_enabled,
&self.otel_logs_exporter,
&self.otlp_logs_endpoint,
&self.otlp_logs_headers,
&self.otlp_logs_protocol,
&self.otlp_logs_timeout,
]
}
pub fn runtime_id(&self) -> &str {
self.runtime_id
}
pub fn tracer_version(&self) -> &str {
self.tracer_version
}
pub fn language(&self) -> &str {
self.language
}
pub fn language_version(&self) -> &str {
self.language_version.as_str()
}
pub fn service(&self) -> impl Deref<Target = str> + use<'_> {
self.service.value()
}
pub fn service_is_default(&self) -> bool {
match self.service.value() {
ConfigItemRef::Ref(t) => t.is_default(),
ConfigItemRef::ArcRef(guard) => guard.as_ref().unwrap().is_default(),
}
}
pub fn env(&self) -> Option<&str> {
self.env.value().as_deref()
}
pub fn version(&self) -> Option<&str> {
self.version.value().as_deref()
}
pub fn global_tags(&self) -> impl Iterator<Item = (&str, &str)> {
self.global_tags
.value()
.iter()
.map(|tag| (tag.0.as_str(), tag.1.as_str()))
}
pub fn otel_resource_attributes(&self) -> impl Iterator<Item = (&str, &str)> {
self.otel_resource_attributes
.value()
.iter()
.map(|attr| (attr.0.as_str(), attr.1.as_str()))
}
pub fn otel_metrics_exporter(&self) -> &str {
self.otel_metrics_exporter.value().as_ref()
}
pub fn otel_metrics_temporality_preference(
&self,
) -> Option<opentelemetry_sdk::metrics::Temporality> {
*self.otel_metrics_temporality_preference.value()
}
pub fn trace_agent_url(&self) -> impl Deref<Target = str> + use<'_> {
self.trace_agent_url.value()
}
pub fn dogstatsd_agent_host(&self) -> &Cow<'static, str> {
self.dogstatsd_agent_host.value()
}
pub fn dogstatsd_agent_port(&self) -> &u32 {
self.dogstatsd_agent_port.value()
}
pub fn dogstatsd_agent_url(&self) -> impl Deref<Target = str> + use<'_> {
self.dogstatsd_agent_url.value()
}
pub fn trace_sampling_rules(&self) -> impl Deref<Target = [SamplingRuleConfig]> + use<'_> {
self.trace_sampling_rules.value()
}
pub fn trace_rate_limit(&self) -> i32 {
*self.trace_rate_limit.value()
}
pub fn enabled(&self) -> bool {
*self.enabled.value()
}
pub fn log_level_filter(&self) -> &LevelFilter {
self.log_level_filter.value()
}
pub fn trace_stats_computation_enabled(&self) -> bool {
*self.trace_stats_computation_enabled.value()
}
pub(crate) fn trace_writer_synchronous_write(&self) -> bool {
self.trace_writer_synchronous_write
}
pub(crate) fn trace_writer_synchronous_timeout(&self) -> Duration {
self.trace_writer_synchronous_timeout
}
pub(crate) fn trace_writer_max_flush_interval(&self) -> Duration {
self.trace_writer_max_flush_interval
}
#[cfg(feature = "test-utils")]
pub(crate) fn __internal_wait_agent_info_ready(&self) -> bool {
self.wait_agent_info_ready
}
fn process_runtime_id() -> &'static str {
static RUNTIME_ID: OnceLock<String> = OnceLock::new();
RUNTIME_ID.get_or_init(|| uuid::Uuid::new_v4().to_string())
}
pub fn telemetry_enabled(&self) -> bool {
*self.telemetry_enabled.value()
}
pub fn telemetry_log_collection_enabled(&self) -> bool {
*self.telemetry_log_collection_enabled.value()
}
pub fn telemetry_heartbeat_interval(&self) -> f64 {
*self.telemetry_heartbeat_interval.value()
}
pub fn metrics_otel_enabled(&self) -> bool {
*self.metrics_otel_enabled.value()
}
pub fn otlp_metrics_endpoint(&self) -> &str {
self.otlp_metrics_endpoint.value().as_ref()
}
pub fn otlp_endpoint(&self) -> &str {
self.otlp_endpoint.value().as_ref()
}
pub fn otlp_headers(&self) -> &str {
self.otlp_headers.value().as_ref()
}
pub fn otlp_metrics_protocol(&self) -> Option<OtlpProtocol> {
*self.otlp_metrics_protocol.value()
}
pub fn otlp_metrics_headers(&self) -> &str {
self.otlp_metrics_headers.value().as_ref()
}
pub fn otlp_protocol(&self) -> Option<OtlpProtocol> {
*self.otlp_protocol.value()
}
pub fn otlp_metrics_timeout(&self) -> u32 {
*self.otlp_metrics_timeout.value()
}
pub fn otlp_timeout(&self) -> u32 {
*self.otlp_timeout.value()
}
pub fn metric_export_interval(&self) -> u32 {
*self.metric_export_interval.value()
}
pub fn metric_export_timeout(&self) -> u32 {
*self.metric_export_timeout.value()
}
pub fn logs_otel_enabled(&self) -> bool {
*self.logs_otel_enabled.value()
}
pub fn otel_logs_exporter(&self) -> &str {
self.otel_logs_exporter.value().as_ref()
}
pub fn otlp_logs_endpoint(&self) -> &str {
self.otlp_logs_endpoint.value().as_ref()
}
pub fn otlp_logs_headers(&self) -> &str {
self.otlp_logs_headers.value().as_ref()
}
pub fn otlp_logs_protocol(&self) -> Option<OtlpProtocol> {
*self.otlp_logs_protocol.value()
}
pub fn otlp_logs_timeout(&self) -> u32 {
*self.otlp_logs_timeout.value()
}
pub fn trace_partial_flush_enabled(&self) -> bool {
*self.trace_partial_flush_enabled.value()
}
pub fn trace_partial_flush_min_spans(&self) -> usize {
*self.trace_partial_flush_min_spans.value()
}
pub fn trace_propagation_style(&self) -> Option<&[TracePropagationStyle]> {
self.trace_propagation_style.value().as_deref()
}
pub fn trace_propagation_style_extract(&self) -> Option<&[TracePropagationStyle]> {
self.trace_propagation_style_extract.value().as_deref()
}
pub fn trace_propagation_style_inject(&self) -> Option<&[TracePropagationStyle]> {
self.trace_propagation_style_inject.value().as_deref()
}
pub fn trace_propagation_extract_first(&self) -> bool {
*self.trace_propagation_extract_first.value()
}
pub(crate) fn update_sampling_rules_from_remote(
&self,
rules_json: &str,
config_id: Option<String>,
) -> Result<(), String> {
let rules: Vec<SamplingRuleConfig> = serde_json::from_str(rules_json)
.map_err(|e| format!("Failed to parse sampling rules JSON: {e}"))?;
if rules.is_empty() {
self.clear_remote_sampling_rules(config_id);
} else {
self.trace_sampling_rules.set_override_value(
ParsedSamplingRules { rules },
ConfigSourceOrigin::RemoteConfig,
);
self.trace_sampling_rules.set_config_id(config_id);
self.remote_config_callbacks.lock().unwrap().notify_update(
&RemoteConfigUpdate::SamplingRules(self.trace_sampling_rules().to_vec()),
);
telemetry::notify_configuration_update(&self.trace_sampling_rules);
}
Ok(())
}
#[allow(dead_code)]
pub(crate) fn update_service_name(&self, service_name: Option<String>) {
if let Some(service_name) = service_name {
self.service.set_override_value(
ServiceName::Configured(service_name),
ConfigSourceOrigin::Code,
);
}
}
pub(crate) fn set_calculated_service_name(&self, service_name: Option<String>) {
if let Some(service_name) = service_name {
self.service.set_override_value(
ServiceName::Configured(service_name),
ConfigSourceOrigin::Calculated,
);
}
}
pub(crate) fn clear_remote_sampling_rules(&self, config_id: Option<String>) {
self.trace_sampling_rules.unset_override_value();
self.trace_sampling_rules.set_config_id(config_id);
self.remote_config_callbacks.lock().unwrap().notify_update(
&RemoteConfigUpdate::SamplingRules(self.trace_sampling_rules().to_vec()),
);
telemetry::notify_configuration_update(&self.trace_sampling_rules);
}
pub(crate) fn set_sampling_rules_callback<F>(&self, callback: F)
where
F: Fn(&RemoteConfigUpdate) + Send + Sync + 'static,
{
self.remote_config_callbacks
.lock()
.unwrap()
.set_sampling_rules_callback(callback);
}
pub(crate) fn add_extra_services(
&self,
service_names: impl Iterator<Item = impl Deref<Target = str>>,
) {
if !self.remote_config_enabled() {
return;
}
self.extra_services_tracker
.add_extra_services(service_names, self.service().deref());
}
pub(crate) fn get_extra_services(&self) -> Vec<String> {
if !self.remote_config_enabled() {
return Vec::new();
}
self.extra_services_tracker.get_extra_services()
}
pub fn remote_config_enabled(&self) -> bool {
*self.remote_config_enabled.value()
}
pub fn remote_config_poll_interval(&self) -> f64 {
*self.remote_config_poll_interval.value()
}
pub fn datadog_tags_max_length(&self) -> usize {
*self.datadog_tags_max_length.value()
}
}
impl std::fmt::Debug for Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Config")
.field("runtime_id", &self.runtime_id)
.field("tracer_version", &self.tracer_version)
.field("language_version", &self.language_version)
.field("service", &self.service)
.field("env", &self.env)
.field("version", &self.version)
.field("global_tags", &self.global_tags)
.field("trace_agent_url", &self.trace_agent_url)
.field("dogstatsd_agent_url", &self.dogstatsd_agent_url)
.field("trace_sampling_rules", &self.trace_sampling_rules)
.field("trace_rate_limit", &self.trace_rate_limit)
.field("enabled", &self.enabled)
.field("log_level_filter", &self.log_level_filter)
.field(
"trace_stats_computation_enabled",
&self.trace_stats_computation_enabled,
)
.field("trace_propagation_style", &self.trace_propagation_style)
.field(
"trace_propagation_style_extract",
&self.trace_propagation_style_extract,
)
.field(
"trace_propagation_style_inject",
&self.trace_propagation_style_inject,
)
.field(
"trace_propagation_extract_first",
&self.trace_propagation_extract_first,
)
.field("extra_services_tracker", &self.extra_services_tracker)
.field("remote_config_enabled", &self.remote_config_enabled)
.field(
"remote_config_poll_interval",
&self.remote_config_poll_interval,
)
.field("remote_config_callbacks", &self.remote_config_callbacks)
.finish()
}
}
fn default_config() -> Config {
Config {
runtime_id: Config::process_runtime_id(),
env: ConfigItem::new(SupportedConfigurations::DD_ENV, None),
service: ConfigItemWithOverride::new_calculated(
SupportedConfigurations::DD_SERVICE,
ServiceName::Default,
),
version: ConfigItem::new(SupportedConfigurations::DD_VERSION, None),
global_tags: ConfigItem::new(SupportedConfigurations::DD_TAGS, Vec::new()),
otel_resource_attributes: ConfigItem::new(
SupportedConfigurations::OTEL_RESOURCE_ATTRIBUTES,
Vec::new(),
),
otel_metrics_exporter: ConfigItem::new(
SupportedConfigurations::OTEL_METRICS_EXPORTER,
Cow::Borrowed("otlp"),
),
otel_metrics_temporality_preference: ConfigItem::new(
SupportedConfigurations::OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE,
Some(opentelemetry_sdk::metrics::Temporality::Delta),
),
agent_host: ConfigItem::new(
SupportedConfigurations::DD_AGENT_HOST,
Cow::Borrowed("localhost"),
),
trace_agent_port: ConfigItem::new(SupportedConfigurations::DD_TRACE_AGENT_PORT, 8126),
trace_agent_url: ConfigItemWithOverride::new_calculated(
SupportedConfigurations::DD_TRACE_AGENT_URL,
Cow::Borrowed(""),
),
dogstatsd_agent_host: ConfigItem::new(
SupportedConfigurations::DD_DOGSTATSD_HOST,
Cow::Borrowed("localhost"),
),
dogstatsd_agent_port: ConfigItem::new(SupportedConfigurations::DD_DOGSTATSD_PORT, 8125),
dogstatsd_agent_url: ConfigItemWithOverride::new_calculated(
SupportedConfigurations::DD_DOGSTATSD_URL,
Cow::Borrowed(""),
),
trace_sampling_rules: ConfigItemWithOverride::new_rc(
SupportedConfigurations::DD_TRACE_SAMPLING_RULES,
ParsedSamplingRules::default(), ),
trace_rate_limit: ConfigItem::new(SupportedConfigurations::DD_TRACE_RATE_LIMIT, 100),
enabled: ConfigItem::new(SupportedConfigurations::DD_TRACE_ENABLED, true),
log_level_filter: ConfigItem::new(
SupportedConfigurations::DD_LOG_LEVEL,
LevelFilter::default(),
),
tracer_version: TRACER_VERSION,
language: "rust",
language_version: version().to_string(),
trace_stats_computation_enabled: ConfigItem::new(
SupportedConfigurations::DD_TRACE_STATS_COMPUTATION_ENABLED,
true,
),
trace_writer_synchronous_write: false,
trace_writer_synchronous_timeout: Duration::from_secs(2),
trace_writer_max_flush_interval: Duration::from_secs(1),
#[cfg(feature = "test-utils")]
wait_agent_info_ready: false,
telemetry_enabled: ConfigItem::new(
SupportedConfigurations::DD_INSTRUMENTATION_TELEMETRY_ENABLED,
true,
),
telemetry_log_collection_enabled: ConfigItem::new(
SupportedConfigurations::DD_TELEMETRY_LOG_COLLECTION_ENABLED,
true,
),
telemetry_heartbeat_interval: ConfigItem::new(
SupportedConfigurations::DD_TELEMETRY_HEARTBEAT_INTERVAL,
60.0,
),
trace_partial_flush_enabled: ConfigItem::new(
SupportedConfigurations::DD_TRACE_PARTIAL_FLUSH_ENABLED,
false,
),
trace_partial_flush_min_spans: ConfigItem::new(
SupportedConfigurations::DD_TRACE_PARTIAL_FLUSH_MIN_SPANS,
300,
),
trace_propagation_style: ConfigItem::new(
SupportedConfigurations::DD_TRACE_PROPAGATION_STYLE,
Some(vec![
TracePropagationStyle::Datadog,
TracePropagationStyle::TraceContext,
]),
),
trace_propagation_style_extract: ConfigItem::new(
SupportedConfigurations::DD_TRACE_PROPAGATION_STYLE_EXTRACT,
None,
),
trace_propagation_style_inject: ConfigItem::new(
SupportedConfigurations::DD_TRACE_PROPAGATION_STYLE_INJECT,
None,
),
trace_propagation_extract_first: ConfigItem::new(
SupportedConfigurations::DD_TRACE_PROPAGATION_EXTRACT_FIRST,
false,
),
extra_services_tracker: ExtraServicesTracker::new(),
remote_config_enabled: ConfigItem::new(
SupportedConfigurations::DD_REMOTE_CONFIGURATION_ENABLED,
true,
),
remote_config_poll_interval: ConfigItem::new(
SupportedConfigurations::DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS,
RC_DEFAULT_POLL_INTERVAL,
),
remote_config_callbacks: Arc::new(Mutex::new(RemoteConfigCallbacks::new())),
datadog_tags_max_length: ConfigItem::new(
SupportedConfigurations::DD_TRACE_X_DATADOG_TAGS_MAX_LENGTH,
DATADOG_TAGS_MAX_LENGTH,
),
metrics_otel_enabled: ConfigItem::new(
SupportedConfigurations::DD_METRICS_OTEL_ENABLED,
true,
),
otlp_metrics_endpoint: ConfigItem::new(
SupportedConfigurations::OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
Cow::Borrowed(""),
),
otlp_endpoint: ConfigItem::new(
SupportedConfigurations::OTEL_EXPORTER_OTLP_ENDPOINT,
Cow::Borrowed(""),
),
otlp_headers: ConfigItem::new(
SupportedConfigurations::OTEL_EXPORTER_OTLP_HEADERS,
Cow::Borrowed(""),
),
otlp_metrics_protocol: ConfigItem::new(
SupportedConfigurations::OTEL_EXPORTER_OTLP_METRICS_PROTOCOL,
None,
),
otlp_metrics_headers: ConfigItem::new(
SupportedConfigurations::OTEL_EXPORTER_OTLP_METRICS_HEADERS,
Cow::Borrowed(""),
),
otlp_protocol: ConfigItem::new(SupportedConfigurations::OTEL_EXPORTER_OTLP_PROTOCOL, None),
otlp_metrics_timeout: ConfigItem::new(
SupportedConfigurations::OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
10000u32,
),
otlp_timeout: ConfigItem::new(
SupportedConfigurations::OTEL_EXPORTER_OTLP_TIMEOUT,
10000u32,
),
metric_export_interval: ConfigItem::new(
SupportedConfigurations::OTEL_METRIC_EXPORT_INTERVAL,
10000u32,
),
metric_export_timeout: ConfigItem::new(
SupportedConfigurations::OTEL_METRIC_EXPORT_TIMEOUT,
7500u32,
),
logs_otel_enabled: ConfigItem::new(SupportedConfigurations::DD_LOGS_OTEL_ENABLED, true),
otel_logs_exporter: ConfigItem::new(
SupportedConfigurations::OTEL_LOGS_EXPORTER,
Cow::Borrowed("otlp"),
),
otlp_logs_endpoint: ConfigItem::new(
SupportedConfigurations::OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
Cow::Borrowed(""),
),
otlp_logs_headers: ConfigItem::new(
SupportedConfigurations::OTEL_EXPORTER_OTLP_LOGS_HEADERS,
Cow::Borrowed(""),
),
otlp_logs_protocol: ConfigItem::new(
SupportedConfigurations::OTEL_EXPORTER_OTLP_LOGS_PROTOCOL,
None,
),
otlp_logs_timeout: ConfigItem::new(
SupportedConfigurations::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
10000u32,
),
}
}
pub struct ConfigBuilder {
config: Config,
}
impl ConfigBuilder {
pub fn build(&self) -> Config {
crate::core::log::set_max_level(*self.config.log_level_filter.value());
let mut config = self.config.clone();
if config.trace_agent_url.value().is_empty() {
let host = &config.agent_host.value();
let port = *config.trace_agent_port.value();
config
.trace_agent_url
.set_calculated(Cow::Owned(format!("http://{host}:{port}")));
}
if config.dogstatsd_agent_url.value().is_empty() {
let host = &config.dogstatsd_agent_host.value();
let port = *config.dogstatsd_agent_port.value();
config
.dogstatsd_agent_url
.set_calculated(Cow::Owned(format!("http://{host}:{port}")));
}
config
}
pub fn set_service(&mut self, service: String) -> &mut Self {
self.config
.service
.set_code(ServiceName::Configured(service));
self
}
pub fn set_env(&mut self, env: String) -> &mut Self {
self.config.env.set_code(Some(env));
self
}
pub fn set_version(&mut self, version: String) -> &mut Self {
self.config.version.set_code(Some(version));
self
}
pub fn set_global_tags(&mut self, tags: Vec<(String, String)>) -> &mut Self {
self.config.global_tags.set_code(tags);
self
}
pub fn add_global_tag(&mut self, tag: (String, String)) -> &mut Self {
let mut current_tags = self.config.global_tags.value().clone();
current_tags.push(tag);
self.config.global_tags.set_code(current_tags);
self
}
pub fn set_otel_resource_attributes(&mut self, attributes: Vec<(String, String)>) -> &mut Self {
self.config.otel_resource_attributes.set_code(attributes);
self
}
pub fn set_telemetry_enabled(&mut self, enabled: bool) -> &mut Self {
self.config.telemetry_enabled.set_code(enabled);
self
}
pub fn set_telemetry_log_collection_enabled(&mut self, enabled: bool) -> &mut Self {
self.config
.telemetry_log_collection_enabled
.set_code(enabled);
self
}
pub fn set_telemetry_heartbeat_interval(&mut self, seconds: f64) -> &mut Self {
self.config
.telemetry_heartbeat_interval
.set_code(seconds.abs());
self
}
pub fn set_agent_host(&mut self, host: String) -> &mut Self {
self.config
.agent_host
.set_code(Cow::Owned(host.to_string()));
self
}
pub fn set_trace_agent_port(&mut self, port: u32) -> &mut Self {
self.config.trace_agent_port.set_code(port);
self
}
pub fn set_trace_agent_url(&mut self, url: String) -> &mut Self {
self.config
.trace_agent_url
.set_code(Cow::Owned(url.to_string()));
self
}
pub fn set_metrics_otel_enabled(&mut self, enabled: bool) -> &mut Self {
self.config.metrics_otel_enabled.set_code(enabled);
self
}
pub fn set_otlp_metrics_endpoint(&mut self, endpoint: String) -> &mut Self {
self.config
.otlp_metrics_endpoint
.set_code(Cow::Owned(endpoint));
self
}
pub fn set_otlp_endpoint(&mut self, endpoint: String) -> &mut Self {
self.config.otlp_endpoint.set_code(Cow::Owned(endpoint));
self
}
pub fn set_otlp_metrics_protocol(&mut self, protocol: String) -> &mut Self {
self.config
.otlp_metrics_protocol
.set_code(OtlpProtocol::parse_optional(protocol));
self
}
pub fn set_otlp_protocol(&mut self, protocol: String) -> &mut Self {
self.config
.otlp_protocol
.set_code(OtlpProtocol::parse_optional(protocol));
self
}
pub fn set_otlp_metrics_timeout(&mut self, timeout: u32) -> &mut Self {
self.config.otlp_metrics_timeout.set_code(timeout);
self
}
pub fn set_otlp_timeout(&mut self, timeout: u32) -> &mut Self {
self.config.otlp_timeout.set_code(timeout);
self
}
pub fn set_logs_otel_enabled(&mut self, enabled: bool) -> &mut Self {
self.config.logs_otel_enabled.set_code(enabled);
self
}
pub fn set_otlp_logs_endpoint(&mut self, endpoint: String) -> &mut Self {
self.config
.otlp_logs_endpoint
.set_code(Cow::Owned(endpoint));
self
}
pub fn set_otlp_logs_protocol(&mut self, protocol: String) -> &mut Self {
self.config
.otlp_logs_protocol
.set_code(OtlpProtocol::parse_optional(protocol));
self
}
pub fn set_otlp_logs_timeout(&mut self, timeout: u32) -> &mut Self {
self.config.otlp_logs_timeout.set_code(timeout);
self
}
pub fn set_otel_metrics_temporality_preference(
&mut self,
temporality: opentelemetry_sdk::metrics::Temporality,
) -> &mut Self {
self.config
.otel_metrics_temporality_preference
.set_code(Some(temporality));
self
}
pub fn set_metric_export_interval(&mut self, interval: u32) -> &mut Self {
self.config.metric_export_interval.set_code(interval);
self
}
pub fn set_metric_export_timeout(&mut self, timeout: u32) -> &mut Self {
self.config.metric_export_timeout.set_code(timeout);
self
}
pub fn set_dogstatsd_agent_host(&mut self, host: String) -> &mut Self {
self.config
.dogstatsd_agent_host
.set_code(Cow::Owned(host.to_string()));
self
}
pub fn set_dogstatsd_agent_port(&mut self, port: u32) -> &mut Self {
self.config.dogstatsd_agent_port.set_code(port);
self
}
pub fn set_trace_partial_flush_enabled(&mut self, enabled: bool) -> &mut Self {
self.config.trace_partial_flush_enabled.set_code(enabled);
self
}
pub fn set_trace_partial_flush_min_spans(&mut self, min_spans: usize) -> &mut Self {
self.config
.trace_partial_flush_min_spans
.set_code(min_spans);
self
}
pub fn set_trace_sampling_rules(&mut self, rules: Vec<SamplingRuleConfig>) -> &mut Self {
self.config
.trace_sampling_rules
.set_code(ParsedSamplingRules { rules });
self
}
pub fn set_trace_rate_limit(&mut self, rate_limit: i32) -> &mut Self {
self.config.trace_rate_limit.set_code(rate_limit);
self
}
pub fn set_trace_propagation_style(&mut self, styles: Vec<TracePropagationStyle>) -> &mut Self {
self.config.trace_propagation_style.set_code(Some(styles));
self
}
pub fn set_trace_propagation_style_extract(
&mut self,
styles: Vec<TracePropagationStyle>,
) -> &mut Self {
self.config
.trace_propagation_style_extract
.set_code(Some(styles));
self
}
pub fn set_trace_propagation_style_inject(
&mut self,
styles: Vec<TracePropagationStyle>,
) -> &mut Self {
self.config
.trace_propagation_style_inject
.set_code(Some(styles));
self
}
pub fn set_trace_propagation_extract_first(&mut self, first: bool) -> &mut Self {
self.config.trace_propagation_extract_first.set_code(first);
self
}
pub fn set_enabled(&mut self, enabled: bool) -> &mut Self {
self.config.enabled.set_code(enabled);
self
}
pub fn set_log_level_filter(&mut self, filter: LevelFilter) -> &mut Self {
self.config.log_level_filter.set_code(filter);
self
}
pub fn set_trace_stats_computation_enabled(
&mut self,
trace_stats_computation_enabled: bool,
) -> &mut Self {
self.config
.trace_stats_computation_enabled
.set_code(trace_stats_computation_enabled);
self
}
pub fn set_remote_config_enabled(&mut self, enabled: bool) -> &mut Self {
self.config.remote_config_enabled.set_code(enabled);
self
}
pub fn set_remote_config_poll_interval(&mut self, seconds: f64) -> &mut Self {
self.config
.remote_config_poll_interval
.set_code(seconds.abs().min(RC_DEFAULT_POLL_INTERVAL));
self
}
pub fn set_datadog_tags_max_length(&mut self, length: usize) -> &mut Self {
self.config
.datadog_tags_max_length
.set_code(length.min(DATADOG_TAGS_MAX_LENGTH));
self
}
#[cfg(feature = "test-utils")]
#[allow(missing_docs)]
pub fn set_datadog_tags_max_length_with_no_limit(&mut self, length: usize) -> &mut Self {
self.config.datadog_tags_max_length.set_code(length);
self
}
#[cfg(feature = "test-utils")]
#[allow(missing_docs)]
pub fn set_trace_writer_synchronous_write(
&mut self,
trace_writer_synchronous_write: bool,
) -> &mut Self {
self.config.trace_writer_synchronous_write = trace_writer_synchronous_write;
self
}
#[cfg(feature = "test-utils")]
#[allow(missing_docs)]
pub fn set_trace_writer_max_flush_interval(
&mut self,
trace_writer_max_flush_interval: Duration,
) -> &mut Self {
self.config.trace_writer_max_flush_interval = trace_writer_max_flush_interval;
self
}
#[cfg(feature = "test-utils")]
pub(crate) fn __internal_set_wait_agent_info_ready(
&mut self,
wait_agent_info_ready: bool,
) -> &mut Self {
self.config.wait_agent_info_ready = wait_agent_info_ready;
self
}
}
#[cfg(test)]
mod tests {
use libdd_telemetry::data::ConfigurationOrigin;
use super::Config;
use super::*;
use crate::core::configuration::sources::{CompositeSource, ConfigSourceOrigin, HashMapSource};
#[test]
fn test_config_from_source() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[
("DD_SERVICE", "test-service"),
("DD_ENV", "test-env"),
("DD_TRACE_SAMPLING_RULES",
r#"[{"sample_rate":0.5,"service":"web-api","name":null,"resource":null,"tags":{},"provenance":"customer"}]"#),
("DD_TRACE_RATE_LIMIT", "123"),
("DD_TRACE_ENABLED", "true"),
("DD_LOG_LEVEL", "DEBUG"),
],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert_eq!(&*config.service(), "test-service");
assert_eq!(config.env(), Some("test-env"));
assert_eq!(config.trace_rate_limit(), 123);
let rules = config.trace_sampling_rules();
assert_eq!(rules.len(), 1, "Should have one rule");
assert_eq!(
&rules[0],
&SamplingRuleConfig {
sample_rate: 0.5,
service: Some("web-api".to_string()),
provenance: "customer".to_string(),
..SamplingRuleConfig::default()
}
);
assert!(config.enabled());
assert_eq!(*config.log_level_filter(), super::LevelFilter::Debug);
}
#[test]
fn test_sampling_rules() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[(
"DD_TRACE_SAMPLING_RULES",
r#"[{"sample_rate":0.5,"service":"test-service","provenance":"customer"}]"#,
)],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert_eq!(config.trace_sampling_rules().len(), 1);
assert_eq!(
&config.trace_sampling_rules()[0],
&SamplingRuleConfig {
sample_rate: 0.5,
service: Some("test-service".to_string()),
provenance: "customer".to_string(),
..SamplingRuleConfig::default()
}
);
}
#[test]
fn test_config_from_source_manual_override() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[
("DD_SERVICE", "test-service"),
("DD_TRACE_RATE_LIMIT", "50"),
],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources)
.set_trace_sampling_rules(vec![SamplingRuleConfig {
sample_rate: 0.8,
service: Some("manual-service".to_string()),
name: None,
resource: None,
tags: HashMap::new(),
provenance: "manual".to_string(),
}])
.set_trace_rate_limit(200)
.set_service("manual-service".to_string())
.set_env("manual-env".to_string())
.set_log_level_filter(super::LevelFilter::Warn)
.build();
assert_eq!(config.trace_rate_limit(), 200);
let rules = config.trace_sampling_rules();
assert_eq!(rules.len(), 1);
assert_eq!(
&config.trace_sampling_rules()[0],
&SamplingRuleConfig {
sample_rate: 0.8,
service: Some("manual-service".to_string()),
provenance: "manual".to_string(),
..SamplingRuleConfig::default()
}
);
assert!(config.enabled());
assert_eq!(*config.log_level_filter(), super::LevelFilter::Warn);
}
#[test]
fn test_propagation_config_from_source() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[
("DD_TRACE_PROPAGATION_STYLE", ""),
(
"DD_TRACE_PROPAGATION_STYLE_EXTRACT",
"datadog, tracecontext, invalid",
),
("DD_TRACE_PROPAGATION_STYLE_INJECT", "tracecontext"),
("DD_TRACE_PROPAGATION_EXTRACT_FIRST", "true"),
],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert_eq!(config.trace_propagation_style(), Some(vec![]).as_deref());
assert_eq!(
config.trace_propagation_style_extract(),
Some(vec![
TracePropagationStyle::Datadog,
TracePropagationStyle::TraceContext
])
.as_deref()
);
assert_eq!(
config.trace_propagation_style_inject(),
Some(vec![TracePropagationStyle::TraceContext]).as_deref()
);
assert!(config.trace_propagation_extract_first())
}
#[test]
fn test_propagation_config_from_source_override() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[
("DD_TRACE_PROPAGATION_STYLE", ""),
(
"DD_TRACE_PROPAGATION_STYLE_EXTRACT",
"datadog, tracecontext",
),
("DD_TRACE_PROPAGATION_STYLE_INJECT", "tracecontext"),
("DD_TRACE_PROPAGATION_EXTRACT_FIRST", "true"),
],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources)
.set_trace_propagation_style(vec![
TracePropagationStyle::TraceContext,
TracePropagationStyle::Datadog,
])
.set_trace_propagation_style_extract(vec![TracePropagationStyle::TraceContext])
.set_trace_propagation_style_inject(vec![TracePropagationStyle::Datadog])
.set_trace_propagation_extract_first(false)
.build();
assert_eq!(
config.trace_propagation_style(),
Some(vec![
TracePropagationStyle::TraceContext,
TracePropagationStyle::Datadog
])
.as_deref()
);
assert_eq!(
config.trace_propagation_style_extract(),
Some(vec![TracePropagationStyle::TraceContext]).as_deref()
);
assert_eq!(
config.trace_propagation_style_inject(),
Some(vec![TracePropagationStyle::Datadog]).as_deref()
);
assert!(!config.trace_propagation_extract_first());
}
#[test]
fn test_propagation_config_incorrect_extract() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[
("DD_TRACE_PROPAGATION_STYLE", "datadog, tracecontext"),
("DD_TRACE_PROPAGATION_STYLE_EXTRACT", "incorrect,"),
("DD_TRACE_PROPAGATION_STYLE_INJECT", "tracecontext"),
("DD_TRACE_PROPAGATION_EXTRACT_FIRST", "true"),
],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert_eq!(
config.trace_propagation_style(),
Some(vec![
TracePropagationStyle::Datadog,
TracePropagationStyle::TraceContext,
])
.as_deref()
);
assert_eq!(
config.trace_propagation_style_extract(),
Some(vec![]).as_deref()
);
assert_eq!(
config.trace_propagation_style_inject(),
Some(vec![TracePropagationStyle::TraceContext]).as_deref()
);
assert!(config.trace_propagation_extract_first());
}
#[test]
fn test_propagation_config_empty_extract() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[
("DD_TRACE_PROPAGATION_STYLE", ""),
("DD_TRACE_PROPAGATION_STYLE_EXTRACT", ""),
("DD_TRACE_PROPAGATION_STYLE_INJECT", "tracecontext"),
("DD_TRACE_PROPAGATION_EXTRACT_FIRST", "true"),
],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert_eq!(config.trace_propagation_style(), Some(vec![]).as_deref());
assert_eq!(
config.trace_propagation_style_extract(),
Some(vec![]).as_deref()
);
assert_eq!(
config.trace_propagation_style_inject(),
Some(vec![TracePropagationStyle::TraceContext]).as_deref()
);
assert!(config.trace_propagation_extract_first());
}
#[test]
fn test_propagation_config_not_present_extract() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[
("DD_TRACE_PROPAGATION_STYLE_INJECT", "tracecontext"),
("DD_TRACE_PROPAGATION_EXTRACT_FIRST", "true"),
],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert_eq!(
config.trace_propagation_style(),
Some(vec![
TracePropagationStyle::Datadog,
TracePropagationStyle::TraceContext,
])
.as_deref()
);
assert_eq!(config.trace_propagation_style_extract(), None);
assert_eq!(
config.trace_propagation_style_inject(),
Some(vec![TracePropagationStyle::TraceContext]).as_deref()
);
assert!(config.trace_propagation_extract_first());
}
#[test]
fn test_stats_computation_enabled_config() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[("DD_TRACE_STATS_COMPUTATION_ENABLED", "false")],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert!(!config.trace_stats_computation_enabled());
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[("DD_TRACE_STATS_COMPUTATION_ENABLED", "true")],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert!(config.trace_stats_computation_enabled());
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[("DD_TRACE_STATS_COMPUTATION_ENABLED", "a")],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert!(config.trace_stats_computation_enabled());
let config = Config::builder()
.set_trace_stats_computation_enabled(false)
.build();
assert!(!config.trace_stats_computation_enabled());
}
#[test]
fn test_extra_services_tracking() {
let config = Config::builder()
.set_service("main-service".to_string())
.build();
assert_eq!(config.get_extra_services().len(), 0);
config.add_extra_services(
[
"service-1",
"service-2",
"service-3",
"main-service",
"service-1",
]
.into_iter(),
);
let services = config.get_extra_services();
assert_eq!(services.len(), 3);
assert!(services.contains(&"service-1".to_string()));
assert!(services.contains(&"service-2".to_string()));
assert!(services.contains(&"service-3".to_string()));
assert!(!services.contains(&"main-service".to_string()));
}
#[test]
fn test_extra_services_disabled_when_remote_config_disabled() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[("DD_REMOTE_CONFIGURATION_ENABLED", "false")],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources)
.set_service("main-service".to_string())
.build();
config.add_extra_services(["service-1", "service-2"].into_iter());
let services = config.get_extra_services();
assert_eq!(services.len(), 0);
}
#[test]
fn test_extra_services_limit() {
let config = Config::builder()
.set_service("main-service".to_string())
.build();
config.add_extra_services((0..70).map(|i| format!("service-{i}")));
let services = config.get_extra_services();
assert_eq!(services.len(), 64);
}
#[test]
fn test_remote_config_enabled_from_env() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[("DD_REMOTE_CONFIGURATION_ENABLED", "true")],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert!(config.remote_config_enabled());
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[("DD_REMOTE_CONFIGURATION_ENABLED", "false")],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert!(!config.remote_config_enabled());
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[("DD_REMOTE_CONFIGURATION_ENABLED", "invalid")],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert!(config.remote_config_enabled());
let config = Config::builder().build();
assert!(config.remote_config_enabled()); }
#[test]
fn test_sampling_rules_update_callbacks() {
let config = Config::builder().build();
let callback_called = Arc::new(Mutex::new(false));
let callback_rules = Arc::new(Mutex::new(Vec::<SamplingRuleConfig>::new()));
let callback_called_clone = callback_called.clone();
let callback_rules_clone = callback_rules.clone();
config.set_sampling_rules_callback(move |update| {
*callback_called_clone.lock().unwrap() = true;
let RemoteConfigUpdate::SamplingRules(rules) = update;
*callback_rules_clone.lock().unwrap() = rules.clone();
});
assert!(!*callback_called.lock().unwrap());
assert!(callback_rules.lock().unwrap().is_empty());
let new_rules = vec![SamplingRuleConfig {
sample_rate: 0.5,
service: Some("test-service".to_string()),
provenance: "remote".to_string(),
..SamplingRuleConfig::default()
}];
let rules_json = serde_json::to_string(&new_rules).unwrap();
config
.update_sampling_rules_from_remote(&rules_json, None)
.unwrap();
assert!(*callback_called.lock().unwrap());
assert_eq!(*callback_rules.lock().unwrap(), new_rules);
*callback_called.lock().unwrap() = false;
callback_rules.lock().unwrap().clear();
config.clear_remote_sampling_rules(None);
assert!(*callback_called.lock().unwrap());
assert!(callback_rules.lock().unwrap().is_empty());
}
#[test]
fn test_config_item_priority() {
let mut config_item = ConfigItemWithOverride::new_rc(
SupportedConfigurations::DD_TRACE_SAMPLING_RULES,
ParsedSamplingRules::default(),
);
assert_eq!(config_item.source(), ConfigSourceOrigin::Default);
assert_eq!(config_item.value().len(), 0);
config_item.set_value_source(
ParsedSamplingRules {
rules: vec![SamplingRuleConfig {
sample_rate: 0.3,
..SamplingRuleConfig::default()
}],
},
ConfigSourceOrigin::EnvVar,
);
assert_eq!(config_item.source(), ConfigSourceOrigin::EnvVar);
assert_eq!(config_item.value()[0].sample_rate, 0.3);
config_item.set_code(ParsedSamplingRules {
rules: vec![SamplingRuleConfig {
sample_rate: 0.5,
..SamplingRuleConfig::default()
}],
});
assert_eq!(config_item.source(), ConfigSourceOrigin::Code);
assert_eq!(config_item.value()[0].sample_rate, 0.5);
config_item.set_value_source(
ParsedSamplingRules {
rules: vec![SamplingRuleConfig {
sample_rate: 0.8,
..SamplingRuleConfig::default()
}],
},
ConfigSourceOrigin::RemoteConfig,
);
assert_eq!(config_item.source(), ConfigSourceOrigin::RemoteConfig);
assert_eq!(config_item.value()[0].sample_rate, 0.8);
config_item.unset_override_value();
assert_eq!(config_item.source(), ConfigSourceOrigin::Code);
assert_eq!(config_item.value()[0].sample_rate, 0.5);
}
#[test]
fn test_sampling_rules_with_config_item() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[(
"DD_TRACE_SAMPLING_RULES",
r#"[{"sample_rate":0.25,"service":"env-service"}]"#,
)],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert_eq!(config.trace_sampling_rules().len(), 1);
assert_eq!(config.trace_sampling_rules()[0].sample_rate, 0.25);
let config = Config::builder_with_sources(&sources)
.set_trace_sampling_rules(vec![SamplingRuleConfig {
sample_rate: 0.75,
service: Some("code-service".to_string()),
..SamplingRuleConfig::default()
}])
.build();
assert_eq!(config.trace_sampling_rules()[0].sample_rate, 0.75);
assert_eq!(
config.trace_sampling_rules()[0].service.as_ref().unwrap(),
"code-service"
);
}
#[test]
fn test_empty_remote_rules_fallback_behavior() {
let mut config = Config::builder().build();
let local_rules = ParsedSamplingRules {
rules: vec![SamplingRuleConfig {
sample_rate: 0.3,
service: Some("local-service".to_string()),
provenance: "local".to_string(),
..SamplingRuleConfig::default()
}],
};
config
.trace_sampling_rules
.set_value_source(local_rules.clone(), ConfigSourceOrigin::EnvVar);
assert_eq!(config.trace_sampling_rules().len(), 1);
assert_eq!(config.trace_sampling_rules()[0].sample_rate, 0.3);
assert_eq!(
config.trace_sampling_rules.source(),
ConfigSourceOrigin::EnvVar
);
let remote_rules_json =
r#"[{"sample_rate": 0.8, "service": "remote-service", "provenance": "remote"}]"#;
config
.update_sampling_rules_from_remote(remote_rules_json, None)
.unwrap();
assert_eq!(config.trace_sampling_rules().len(), 1);
assert_eq!(config.trace_sampling_rules()[0].sample_rate, 0.8);
assert_eq!(
config.trace_sampling_rules.source(),
ConfigSourceOrigin::RemoteConfig
);
let empty_remote_rules_json = "[]";
config
.update_sampling_rules_from_remote(empty_remote_rules_json, None)
.unwrap();
assert_eq!(config.trace_sampling_rules().len(), 1); assert_eq!(config.trace_sampling_rules()[0].sample_rate, 0.3); assert_eq!(
config.trace_sampling_rules.source(),
ConfigSourceOrigin::EnvVar
);
config.clear_remote_sampling_rules(None);
assert_eq!(config.trace_sampling_rules().len(), 1);
assert_eq!(config.trace_sampling_rules()[0].sample_rate, 0.3);
assert_eq!(
config.trace_sampling_rules.source(),
ConfigSourceOrigin::EnvVar
);
}
#[test]
fn test_update_sampling_rules_from_remote_config_id() {
let config = Config::builder().build();
let new_rules = vec![SamplingRuleConfig {
sample_rate: 0.5,
service: Some("test-service".to_string()),
provenance: "remote".to_string(),
..SamplingRuleConfig::default()
}];
let rules_json = serde_json::to_string(&new_rules).unwrap();
config
.update_sampling_rules_from_remote(&rules_json, Some("config_id_1".to_string()))
.unwrap();
assert_eq!(
config.trace_sampling_rules.get_config_id(),
Some("config_id_1".to_string())
);
config
.update_sampling_rules_from_remote(&rules_json, Some("config_id_2".to_string()))
.unwrap();
assert_eq!(
config.trace_sampling_rules.get_config_id(),
Some("config_id_2".to_string())
);
config
.update_sampling_rules_from_remote("[]", None)
.unwrap();
assert_eq!(config.trace_sampling_rules.get_config_id(), None);
}
#[test]
fn test_telemetry_config_from_sources() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[
("DD_INSTRUMENTATION_TELEMETRY_ENABLED", "false"),
("DD_TELEMETRY_LOG_COLLECTION_ENABLED", "false"),
("DD_TELEMETRY_HEARTBEAT_INTERVAL", "42"),
],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert!(!config.telemetry_enabled());
assert!(!config.telemetry_log_collection_enabled());
assert_eq!(config.telemetry_heartbeat_interval(), 42.0);
}
#[test]
fn test_telemetry_config() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[
("DD_INSTRUMENTATION_TELEMETRY_ENABLED", "false"),
("DD_TELEMETRY_LOG_COLLECTION_ENABLED", "false"),
("DD_TELEMETRY_HEARTBEAT_INTERVAL", "42"),
],
ConfigSourceOrigin::EnvVar,
));
let mut builder = Config::builder_with_sources(&sources);
builder
.set_telemetry_enabled(true)
.set_telemetry_log_collection_enabled(true)
.set_telemetry_heartbeat_interval(0.1);
let config = builder.build();
assert!(config.telemetry_enabled());
assert!(config.telemetry_log_collection_enabled());
assert_eq!(config.telemetry_heartbeat_interval(), 0.1);
}
#[test]
fn test_dd_tags() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[("DD_TAGS", "key1 :value1 , key2:,key3")],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
let tags: Vec<(&str, &str)> = config.global_tags().collect();
assert_eq!(tags.len(), 2);
assert_eq!(tags, vec![("key1", "value1"), ("key2", "")]);
}
#[test]
fn test_dd_agent_url_default() {
let config = Config::builder().build();
assert_eq!(&*config.trace_agent_url(), "http://localhost:8126");
}
#[test]
fn test_dd_agent_url_from_host_and_port() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[
("DD_AGENT_HOST", "agent-host"),
("DD_TRACE_AGENT_PORT", "4242"),
],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert_eq!(&*config.trace_agent_url(), "http://agent-host:4242");
}
#[test]
fn test_dd_agent_url_from_url() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[
("DD_TRACE_AGENT_URL", "https://test-host"),
("DD_AGENT_HOST", "agent-host"),
("DD_TRACE_AGENT_PORT", "4242"),
],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert_eq!(&*config.trace_agent_url(), "https://test-host");
}
#[test]
fn test_dd_agent_url_from_url_empty() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[
("DD_AGENT_HOST", "agent-host"),
("DD_TRACE_AGENT_PORT", "4242"),
],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert_eq!(&*config.trace_agent_url(), "http://agent-host:4242");
}
#[test]
fn test_dd_agent_url_from_host_and_port_using_builder() {
let config = Config::builder()
.set_agent_host("agent-host".into())
.set_trace_agent_port(4242)
.build();
assert_eq!(&*config.trace_agent_url(), "http://agent-host:4242");
}
#[test]
fn test_dd_agent_url_from_url_using_builder() {
let config = Config::builder()
.set_agent_host("agent-host".into())
.set_trace_agent_port(4242)
.set_trace_agent_url("https://test-host".into())
.build();
assert_eq!(&*config.trace_agent_url(), "https://test-host");
}
#[test]
fn test_dogstatsd_agent_url_default() {
let config = Config::builder().build();
assert_eq!(&*config.dogstatsd_agent_url(), "http://localhost:8125");
}
#[test]
fn test_dogstatsd_agent_url_from_host_and_port() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[
("DD_DOGSTATSD_HOST", "dogstatsd-host"),
("DD_DOGSTATSD_PORT", "4242"),
],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert_eq!(&*config.dogstatsd_agent_url(), "http://dogstatsd-host:4242");
}
#[test]
fn test_dogstatsd_agent_url_from_url_using_builder() {
let config = Config::builder()
.set_dogstatsd_agent_host("dogstatsd-host".into())
.set_dogstatsd_agent_port(4242)
.build();
assert_eq!(&*config.dogstatsd_agent_url(), "http://dogstatsd-host:4242");
}
#[test]
fn test_config_source_updater() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[("DD_ENV", "test-env")],
ConfigSourceOrigin::EnvVar,
));
sources.add_source(HashMapSource::from_iter(
[("DD_ENABLED", "false")],
ConfigSourceOrigin::RemoteConfig,
));
sources.add_source(HashMapSource::from_iter(
[("DD_TAGS", "v1,v2")],
ConfigSourceOrigin::Code,
));
let default = default_config();
let cisu = ConfigItemSourceUpdater { sources: &sources };
assert_eq!(default.env(), None);
assert!(default.enabled());
assert_eq!(default.global_tags().collect::<Vec<_>>(), vec![]);
let env = cisu.update_string(default.env, Some);
assert_eq!(env.default_value, None);
assert_eq!(env.env_value, Some(Some("test-env".to_string())));
assert_eq!(env.code_value, None);
let enabled = cisu.update_parsed(default.enabled);
assert!(enabled.default_value);
assert_eq!(enabled.env_value, None);
assert_eq!(enabled.code_value, None);
struct Tags(Vec<(String, String)>);
impl FromStr for Tags {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Tags(
s.split(',')
.enumerate()
.map(|(index, s)| (index.to_string(), s.to_string()))
.collect(),
))
}
}
let tags = cisu.update_parsed_with_transform(default.global_tags, |Tags(tags)| tags);
assert_eq!(tags.default_value, vec![]);
assert_eq!(tags.env_value, None);
assert_eq!(
tags.code_value,
Some(vec![
("0".to_string(), "v1".to_string()),
("1".to_string(), "v2".to_string())
])
);
}
#[test]
fn test_get_configuration_config_item_rc() {
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[
("DD_TRACE_SAMPLING_RULES",
r#"[{"sample_rate":0.5,"service":"web-api","name":null,"resource":null,"tags":{},"provenance":"customer"}]"#),
],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
let expected = ParsedSamplingRules::from_str(
r#"[{"sample_rate":0.5,"service":"web-api","name":null,"resource":null,"tags":{},"provenance":"customer"}]"#
).unwrap();
let configurations = &config.trace_sampling_rules.get_all_configurations();
let active_configuration = configurations.iter().max_by_key(|c| c.seq_id).unwrap();
assert_eq!(active_configuration.origin, ConfigurationOrigin::EnvVar);
assert_eq!(
ParsedSamplingRules::from_str(&active_configuration.value).unwrap(),
expected.clone()
);
let expected_rc = ParsedSamplingRules::from_str(r#"[{"sample_rate":1,"service":"web-api","name":null,"resource":null,"tags":{},"provenance":"customer"}]"#).unwrap();
config
.trace_sampling_rules
.set_override_value(expected_rc.clone(), ConfigSourceOrigin::RemoteConfig);
let configurations_after_rc = &config.trace_sampling_rules.get_all_configurations();
let active_configuration_after_rc = configurations_after_rc
.iter()
.max_by_key(|c| c.seq_id)
.unwrap();
assert_eq!(
active_configuration_after_rc.origin,
ConfigurationOrigin::RemoteConfig
);
assert_eq!(
ParsedSamplingRules::from_str(&active_configuration_after_rc.value).unwrap(),
expected_rc
);
config.trace_sampling_rules.unset_override_value();
let configurations = &config.trace_sampling_rules.get_all_configurations();
let active_configuration = configurations.iter().max_by_key(|c| c.seq_id).unwrap();
assert_eq!(active_configuration.origin, ConfigurationOrigin::EnvVar);
assert_eq!(
ParsedSamplingRules::from_str(&active_configuration.value).unwrap(),
expected
);
}
#[test]
fn test_datadog_tags_max_length() {
let config = Config::builder().set_datadog_tags_max_length(4242).build();
assert_eq!(config.datadog_tags_max_length(), DATADOG_TAGS_MAX_LENGTH);
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[("DD_TRACE_X_DATADOG_TAGS_MAX_LENGTH", "4242")],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert_eq!(config.datadog_tags_max_length(), DATADOG_TAGS_MAX_LENGTH);
let mut sources = CompositeSource::new();
sources.add_source(HashMapSource::from_iter(
[("DD_TRACE_X_DATADOG_TAGS_MAX_LENGTH", "42")],
ConfigSourceOrigin::EnvVar,
));
let config = Config::builder_with_sources(&sources).build();
assert_eq!(config.datadog_tags_max_length(), 42);
}
#[test]
fn test_remote_config_poll_interval() {
let config = Config::builder()
.set_remote_config_poll_interval(42.0)
.build();
assert_eq!(config.remote_config_poll_interval(), 5.0);
let config = Config::builder()
.set_remote_config_poll_interval(-0.2)
.build();
assert_eq!(config.remote_config_poll_interval(), 0.2);
}
}