use crate::core::configuration::Config;
use crate::core::utils::{ShutdownSignaler, WorkerHandle};
use anyhow::Result;
use core::fmt;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread::{self};
use std::time::{Duration, Instant};
use http_body_util::{BodyExt, Full};
use hyper::{body::Bytes, Method, Request};
use hyper_util::client::legacy::{connect::HttpConnector, Client};
use hyper_util::rt::TokioExecutor;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(3);
#[derive(Debug, Clone)]
struct ClientCapabilities(u64);
impl ClientCapabilities {
const APM_TRACING_SAMPLE_RULES: u64 = 1 << 29;
fn new() -> Self {
Self(Self::APM_TRACING_SAMPLE_RULES)
}
fn encode(&self) -> String {
use base64::Engine;
let bytes = self.0.to_be_bytes();
let start = bytes
.iter()
.position(|&b| b != 0)
.unwrap_or(bytes.len() - 1);
base64::engine::general_purpose::STANDARD.encode(&bytes[start..])
}
}
#[derive(Debug, Clone, Serialize)]
struct ClientState {
root_version: u64,
targets_version: u64,
config_states: Vec<ConfigState>,
has_error: bool,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
backend_client_state: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
struct ConfigState {
id: String,
version: u64,
product: String,
apply_state: u64,
apply_error: Option<String>,
}
#[derive(Debug, Serialize)]
struct ConfigRequest {
client: ClientInfo,
cached_target_files: Vec<CachedTargetFile>,
}
#[derive(Debug, Serialize)]
struct ClientInfo {
#[serde(skip_serializing_if = "Option::is_none")]
state: Option<ClientState>,
id: String,
products: Vec<String>,
is_tracer: bool,
#[serde(skip_serializing_if = "Option::is_none")]
client_tracer: Option<ClientTracer>,
capabilities: String,
}
#[derive(Debug, Serialize)]
struct ClientTracer {
runtime_id: String,
language: String,
tracer_version: String,
service: String,
#[serde(default)]
extra_services: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
env: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
app_version: Option<String>,
tags: Vec<String>,
}
#[derive(Debug, Clone, Serialize)]
struct CachedTargetFile {
path: String,
length: u64,
hashes: Vec<Hash>,
}
#[derive(Debug, Clone, Serialize)]
struct Hash {
algorithm: String,
hash: String,
}
#[derive(Debug, Deserialize)]
struct ConfigResponse {
#[serde(default)]
#[allow(dead_code)] roots: Option<Vec<String>>,
#[serde(default)]
targets: Option<String>,
#[serde(default)]
target_files: Option<Vec<TargetFile>>,
#[serde(default)]
client_configs: Option<Vec<String>>,
}
#[derive(Debug, Deserialize)]
struct TargetFile {
path: String,
raw: String,
}
fn missing_field_and_null_value<'de, D>(
deserializer: D,
) -> Result<Option<serde_json::Value>, D::Error>
where
D: serde::Deserializer<'de>,
{
Ok(Some(serde_json::Value::deserialize(deserializer)?))
}
#[derive(Debug, Clone, Deserialize)]
struct ApmTracingConfig {
id: String,
lib_config: LibConfig, }
#[derive(Debug, Clone, Deserialize)]
struct LibConfig {
#[serde(
deserialize_with = "missing_field_and_null_value",
default,
rename = "tracing_sampling_rules"
)]
tracing_sampling_rules: Option<serde_json::Value>,
}
type TargetsMetadata = SignedTargets;
#[derive(Debug, Deserialize, Serialize)]
struct TargetDesc {
length: u64,
hashes: HashMap<String, String>,
custom: Option<serde_json::Value>,
}
#[derive(Debug, Deserialize)]
struct Targets {
#[serde(rename = "_type")]
#[allow(dead_code)] target_type: String,
custom: Option<serde_json::Value>,
#[allow(dead_code)] expires: String,
#[allow(dead_code)] spec_version: String,
targets: HashMap<String, TargetDesc>,
version: u64,
}
#[derive(Debug, Deserialize)]
struct SignedTargets {
#[allow(dead_code)] signatures: Option<Vec<serde_json::Value>>,
signed: Targets,
#[allow(dead_code)] version: Option<u64>,
}
#[derive(Debug, Clone)]
pub enum RemoteConfigClientError {
InvalidAgentUri,
HandleMutexPoisoned,
WorkerPanicked(String),
ShutdownTimedOut,
}
impl fmt::Display for RemoteConfigClientError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::InvalidAgentUri => write!(f, "invalid agent URI"),
Self::HandleMutexPoisoned => write!(f, "handle mutex poisoned"),
Self::WorkerPanicked(msg) => write!(f, "remote config worker panicked: {}", msg),
Self::ShutdownTimedOut => write!(f, "shutdown timed out"),
}
}
}
pub struct RemoteConfigClientHandle {
cancel_token: tokio_util::sync::CancellationToken,
worker_handle: WorkerHandle,
}
impl Drop for RemoteConfigClientHandle {
fn drop(&mut self) {
self.trigger_shutdown();
}
}
impl RemoteConfigClientHandle {
pub fn trigger_shutdown(&self) {
self.cancel_token.cancel();
}
pub fn wait_for_shutdown(&self, timeout: Duration) -> Result<(), RemoteConfigClientError> {
use crate::core::utils::WorkerError::*;
if let Err(e) = self.worker_handle.wait_for_shutdown(timeout) {
Err(match e {
ShutdownTimedOut => RemoteConfigClientError::ShutdownTimedOut,
HandleMutexPoisoned => RemoteConfigClientError::HandleMutexPoisoned,
WorkerPanicked(p) => RemoteConfigClientError::WorkerPanicked(p),
})
} else {
Ok(())
}
}
}
struct RemoteConfigClientShutdownReceiver {
cancel_token: tokio_util::sync::CancellationToken,
shutdown_finished: Arc<ShutdownSignaler>,
}
impl Drop for RemoteConfigClientShutdownReceiver {
fn drop(&mut self) {
self.shutdown_finished.signal_shutdown();
}
}
pub struct RemoteConfigClientWorker {
client: RemoteConfigClient,
shutdown_receiver: RemoteConfigClientShutdownReceiver,
}
impl RemoteConfigClientWorker {
pub fn start(config: Arc<Config>) -> Result<RemoteConfigClientHandle, RemoteConfigClientError> {
let cancel_token = tokio_util::sync::CancellationToken::new();
let shutdown_finished = ShutdownSignaler::new();
let shutdown_receiver = RemoteConfigClientShutdownReceiver {
cancel_token: cancel_token.clone(),
shutdown_finished: shutdown_finished.clone(),
};
let worker = Self {
client: RemoteConfigClient::new(config)?,
shutdown_receiver,
};
let join_handle = thread::spawn(move || worker.run());
Ok(RemoteConfigClientHandle {
cancel_token,
worker_handle: WorkerHandle::new(shutdown_finished, join_handle),
})
}
fn run(mut self) {
crate::dd_debug!("RemoteConfigClient: started client worker");
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
crate::dd_debug!("RemoteConfigClient: Failed to create Tokio runtime: {}", e);
return;
}
};
let run_loop = async {
let mut last_poll = Instant::now();
loop {
match self.client.fetch_and_apply_config().await {
Ok(()) => {
crate::dd_debug!(
"RemoteConfigClient: Successfully fetched and applied config"
);
if let Ok(mut state) = self.client.state.lock() {
state.has_error = false;
state.error = None;
}
}
Err(e) => {
crate::dd_debug!("RemoteConfigClient: Failed to fetch config: {}", e);
if let Ok(mut state) = self.client.state.lock() {
state.has_error = true;
state.error = Some(format!("{e}"));
}
}
}
let elapsed = last_poll.elapsed();
if elapsed < self.client.poll_interval {
tokio::time::sleep(self.client.poll_interval - elapsed).await
}
last_poll = Instant::now();
}
};
rt.block_on(async {
tokio::select! {
_ = self.shutdown_receiver.cancel_token.cancelled() => {},
_ = run_loop => {},
}
});
}
}
struct RemoteConfigClient {
client_id: String,
config: Arc<Config>,
agent_url: hyper::Uri,
client_timeout: Duration,
state: Arc<Mutex<ClientState>>,
capabilities: ClientCapabilities,
poll_interval: Duration,
cached_target_files: Vec<CachedTargetFile>,
product_registry: ProductRegistry,
}
impl RemoteConfigClient {
pub fn new(config: Arc<Config>) -> Result<Self, RemoteConfigClientError> {
let agent_url = hyper::Uri::from_maybe_shared(config.trace_agent_url().to_string())
.map_err(|_| RemoteConfigClientError::InvalidAgentUri)?;
let mut parts = agent_url.into_parts();
parts.path_and_query = Some(
"/v0.7/config"
.parse()
.map_err(|_| RemoteConfigClientError::InvalidAgentUri)?,
);
let agent_url =
hyper::Uri::from_parts(parts).map_err(|_| RemoteConfigClientError::InvalidAgentUri)?;
let state = Arc::new(Mutex::new(ClientState {
root_version: 1, targets_version: 0,
config_states: Vec::new(),
has_error: false,
error: None,
backend_client_state: None,
}));
let poll_interval = Duration::from_secs_f64(config.remote_config_poll_interval());
Ok(Self {
client_id: uuid::Uuid::new_v4().to_string(),
config,
agent_url,
client_timeout: DEFAULT_TIMEOUT,
state,
capabilities: ClientCapabilities::new(),
poll_interval,
cached_target_files: Vec::new(),
product_registry: ProductRegistry::new(),
})
}
async fn fetch_and_apply_config(&mut self) -> Result<()> {
let mut connector = HttpConnector::new();
connector.set_connect_timeout(Some(self.client_timeout));
let client = Client::builder(TokioExecutor::new()).build(connector);
let request_payload = self.build_request()?;
let json_body = serde_json::to_string(&request_payload)
.map_err(|e| anyhow::anyhow!("Failed to serialize request: {}", e))?;
let req = Request::builder()
.method(Method::POST)
.uri(self.agent_url.clone())
.header("content-type", "application/json")
.header("user-agent", "dd-trace-rs")
.body(Full::new(Bytes::from(json_body)))
.map_err(|e| anyhow::anyhow!("Failed to build request: {}", e))?;
let response = client
.request(req)
.await
.map_err(|e| anyhow::anyhow!("Failed to send request: {}", e))?;
if !response.status().is_success() {
return Err(anyhow::anyhow!(
"Agent returned error status: {}",
response.status()
));
}
let body_bytes = response
.into_body()
.collect()
.await
.map_err(|e| anyhow::anyhow!("Failed to read response body: {}", e))?
.to_bytes();
let config_response: ConfigResponse = serde_json::from_slice(&body_bytes)
.map_err(|e| anyhow::anyhow!("Failed to parse response: {}", e))?;
self.process_response(config_response)?;
Ok(())
}
fn build_request(&self) -> Result<ConfigRequest> {
let state = self
.state
.lock()
.map_err(|_| anyhow::anyhow!("Failed to lock state"))?;
let config = &self.config;
let client_info = ClientInfo {
state: Some(state.clone()),
id: self.client_id.clone(),
products: vec!["APM_TRACING".to_string()],
is_tracer: true,
client_tracer: Some(ClientTracer {
runtime_id: config.runtime_id().to_string(),
language: "rust".to_string(),
tracer_version: config.tracer_version().to_string(),
service: config.service().to_string(),
extra_services: config.get_extra_services(),
env: config.env().map(|s| s.to_string()),
app_version: config.version().map(|s| s.to_string()),
tags: config
.global_tags()
.map(|(key, value)| format!("{key}:{value}"))
.collect(),
}),
capabilities: self.capabilities.encode(),
};
let cached_files = self.cached_target_files.clone();
Ok(ConfigRequest {
client: client_info,
cached_target_files: cached_files,
})
}
fn process_response(&mut self, response: ConfigResponse) -> Result<()> {
let mut path_to_custom: HashMap<String, (Option<String>, Option<u64>)> = HashMap::new();
let mut signed_targets: Option<serde_json::Value> = None;
if let Some(targets_str) = response.targets {
use base64::Engine;
let decoded = base64::engine::general_purpose::STANDARD
.decode(&targets_str)
.map_err(|e| anyhow::anyhow!("Failed to decode targets: {}", e))?;
let targets_json = String::from_utf8(decoded)
.map_err(|e| anyhow::anyhow!("Invalid UTF-8 in targets: {}", e))?;
let targets: TargetsMetadata = serde_json::from_str(&targets_json)
.map_err(|e| anyhow::anyhow!("Failed to parse targets metadata: {}", e))?;
let targets_map = targets
.signed
.targets
.iter()
.map(|(k, v)| (k.clone(), serde_json::to_value(v).unwrap()))
.collect();
signed_targets = Some(serde_json::Value::Object(targets_map));
for (path, desc) in &targets.signed.targets {
let custom = &desc.custom;
let id = custom
.as_ref()
.and_then(|c| Some(c.get("id")?.as_str()?.to_owned()));
let version: Option<u64> = custom
.as_ref()
.and_then(|c| c.get("v").or_else(|| c.get("version"))?.as_u64());
path_to_custom.insert(path.clone(), (id, version));
}
if let Ok(mut state) = self.state.lock() {
state.targets_version = targets.signed.version;
if let Some(custom) = &targets.signed.custom {
if let Some(backend_state) =
custom.get("opaque_backend_state").and_then(|v| v.as_str())
{
state.backend_client_state = Some(backend_state.to_string());
}
}
}
}
if let Some(target_files) = &response.target_files {
self.validate_signed_target_files(
target_files,
&signed_targets,
&response.client_configs,
)?;
}
if let Some(target_files) = response.target_files {
let mut new_cache = Vec::new();
let mut any_failure = false;
let mut config_states_cleared = false;
for file in target_files {
let Some((product, config_id)) = extract_product_and_id_from_path(&file.path)
else {
crate::dd_debug!(
"RemoteConfigClient: Failed to extract product from path: {}",
file.path
);
continue;
};
let handler = match self.product_registry.get_handler(&product) {
Some(h) => h,
None => {
continue;
}
};
use base64::Engine;
let decoded = base64::engine::general_purpose::STANDARD
.decode(&file.raw)
.map_err(|e| anyhow::anyhow!("Failed to decode config: {}", e))?;
let config_str = String::from_utf8(decoded)
.map_err(|e| anyhow::anyhow!("Invalid UTF-8 in config: {}", e))?;
let (_, meta_version) = path_to_custom
.get(&file.path)
.cloned()
.unwrap_or((None, None));
let config_version = meta_version.unwrap_or(1);
match handler.process_config(&config_str, &self.config) {
Ok(_) => {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(&file.raw);
let hash_result = hasher.finalize();
let hash_hex = format!("{hash_result:x}");
new_cache.push(CachedTargetFile {
path: file.path.clone(),
length: file.raw.len() as u64,
hashes: vec![Hash {
algorithm: "sha256".to_string(),
hash: hash_hex,
}],
});
if let Ok(mut state) = self.state.lock() {
if !config_states_cleared {
state.config_states.clear();
config_states_cleared = true;
}
state.config_states.push(ConfigState {
id: config_id,
version: config_version,
product: product.clone(),
apply_state: 2, apply_error: None,
});
}
}
Err(e) => {
any_failure = true;
crate::dd_debug!(
"RemoteConfigClient: Failed to apply {} config {}: {}",
product,
config_id,
e
);
if let Ok(mut state) = self.state.lock() {
if !config_states_cleared {
state.config_states.clear();
config_states_cleared = true;
}
state.config_states.push(ConfigState {
id: config_id,
version: config_version,
product,
apply_state: 3, apply_error: Some(format!("{e}")),
});
}
continue;
}
}
}
if !any_failure {
self.cached_target_files = new_cache;
}
}
Ok(())
}
fn validate_signed_target_files(
&self,
payload_target_files: &[TargetFile],
payload_targets_signed: &Option<serde_json::Value>,
client_configs: &Option<Vec<String>>,
) -> Result<()> {
for target in payload_target_files {
let exists_in_signed_targets = payload_targets_signed
.as_ref()
.and_then(|targets| targets.get(&target.path))
.is_some();
let exists_in_client_configs = client_configs
.as_ref()
.map(|configs| configs.contains(&target.path))
.unwrap_or(false);
if !exists_in_signed_targets && !exists_in_client_configs {
return Err(anyhow::anyhow!(
"target file {} not exists in client_config and signed targets",
target.path
));
}
}
Ok(())
}
}
trait ProductHandler {
fn process_config(&self, config_json: &str, config: &Arc<Config>) -> Result<()>;
fn product_name(&self) -> &'static str;
}
struct ApmTracingHandler;
impl ProductHandler for ApmTracingHandler {
fn process_config(&self, config_json: &str, config: &Arc<Config>) -> Result<()> {
let tracing_config: ApmTracingConfig = serde_json::from_str(config_json)
.map_err(|e| anyhow::anyhow!("Failed to parse APM tracing config: {}", e))?;
if let Some(rules_value) = tracing_config.lib_config.tracing_sampling_rules {
if !rules_value.is_null() {
let rules_json = serde_json::to_string(&rules_value)
.map_err(|e| anyhow::anyhow!("Failed to serialize sampling rules: {}", e))?;
match config.update_sampling_rules_from_remote(&rules_json, Some(tracing_config.id))
{
Ok(()) => {
crate::dd_debug!(
"RemoteConfigClient: Applied sampling rules from remote config"
);
}
Err(e) => {
crate::dd_debug!(
"RemoteConfigClient: Failed to update sampling rules: {}",
e
);
}
}
} else {
crate::dd_debug!(
"RemoteConfigClient: APM tracing config received but tracing_sampling_rules is null"
);
config.clear_remote_sampling_rules(Some(tracing_config.id));
}
} else {
crate::dd_debug!(
"RemoteConfigClient: APM tracing config received but no tracing_sampling_rules present"
);
}
Ok(())
}
fn product_name(&self) -> &'static str {
"APM_TRACING"
}
}
struct ProductRegistry {
handlers: HashMap<String, Box<dyn ProductHandler + Send + Sync>>,
}
impl ProductRegistry {
fn new() -> Self {
let mut registry = Self {
handlers: HashMap::new(),
};
registry.register(Box::new(ApmTracingHandler));
registry
}
fn register(&mut self, handler: Box<dyn ProductHandler + Send + Sync>) {
self.handlers
.insert(handler.product_name().to_string(), handler);
}
fn get_handler(&self, product: &str) -> Option<&(dyn ProductHandler + Send + Sync)> {
self.handlers.get(product).map(|handler| handler.as_ref())
}
}
fn extract_product_and_id_from_path(path: &str) -> Option<(String, String)> {
let mut components = path
.strip_prefix("datadog/2/")?
.strip_suffix("/config")?
.split("/");
let (product, config_id) = (
components.next()?.to_string(),
components.next()?.to_string(),
);
if components.next().is_some() {
return None;
}
Some((product, config_id))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_client_capabilities() {
let caps = ClientCapabilities::new();
let encoded = caps.encode();
assert!(!encoded.is_empty());
use base64::Engine;
let decoded = base64::engine::general_purpose::STANDARD
.decode(&encoded)
.unwrap();
let mut bytes = [0u8; 8];
let offset = 8 - decoded.len();
bytes[offset..].copy_from_slice(&decoded);
let value = u64::from_be_bytes(bytes);
assert_eq!(value, ClientCapabilities::APM_TRACING_SAMPLE_RULES);
assert_eq!(
value & ClientCapabilities::APM_TRACING_SAMPLE_RULES,
ClientCapabilities::APM_TRACING_SAMPLE_RULES
);
}
#[test]
fn test_request_serialization() {
let state = ClientState {
root_version: 1,
targets_version: 122282776,
config_states: vec![],
has_error: false,
error: None,
backend_client_state: Some("test_backend_state".to_string()),
};
let client_info = ClientInfo {
state: Some(state),
id: "test-client-id".to_string(),
products: vec!["APM_TRACING".to_string()],
is_tracer: true,
client_tracer: Some(ClientTracer {
runtime_id: "test-runtime-id".to_string(),
language: "rust".to_string(),
tracer_version: "0.0.1".to_string(),
service: "test-service".to_string(),
extra_services: vec![],
env: Some("test-env".to_string()),
app_version: Some("1.0.0".to_string()),
tags: vec![],
}),
capabilities: ClientCapabilities::new().encode(),
};
let request = ConfigRequest {
client: client_info,
cached_target_files: Vec::new(),
};
let json = serde_json::to_value(&request).unwrap();
assert!(json.get("client").is_some());
assert_eq!(
json.get("cached_target_files"),
Some(&serde_json::json!([]))
);
let client = &json["client"];
assert_eq!(client["id"], "test-client-id");
assert_eq!(client["products"], serde_json::json!(["APM_TRACING"]));
assert_eq!(client["is_tracer"], true);
let client_tracer = &client["client_tracer"];
assert_eq!(client_tracer["runtime_id"], "test-runtime-id");
assert_eq!(client_tracer["language"], "rust");
assert_eq!(client_tracer["service"], "test-service");
assert_eq!(client_tracer["extra_services"], serde_json::json!([]));
assert_eq!(client_tracer["env"], "test-env");
assert_eq!(client_tracer["app_version"], "1.0.0");
let state = &client["state"];
assert_eq!(state["root_version"], 1);
assert_eq!(state["targets_version"], 122282776);
assert_eq!(state["has_error"], false);
assert_eq!(state["backend_client_state"], "test_backend_state");
let capabilities = &client["capabilities"];
assert!(capabilities.is_string());
assert!(!capabilities.as_str().unwrap().is_empty());
}
#[test]
fn test_request_serialization_with_error() {
let state = ClientState {
root_version: 1,
targets_version: 1,
config_states: vec![],
has_error: true,
error: Some("Test error message".to_string()),
backend_client_state: None,
};
let client_info = ClientInfo {
state: Some(state),
id: "test-client-id".to_string(),
products: vec!["APM_TRACING".to_string()],
is_tracer: true,
client_tracer: Some(ClientTracer {
runtime_id: "test-runtime-id".to_string(),
language: "rust".to_string(),
tracer_version: "0.0.1".to_string(),
service: "test-service".to_string(),
extra_services: vec!["service1".to_string(), "service2".to_string()],
env: None,
app_version: None,
tags: vec![],
}),
capabilities: ClientCapabilities::new().encode(),
};
let request = ConfigRequest {
client: client_info,
cached_target_files: Vec::new(),
};
let json = serde_json::to_value(&request).unwrap();
let state = &json["client"]["state"];
assert_eq!(state["has_error"], true);
assert_eq!(state["error"], "Test error message");
let client_tracer = &json["client"]["client_tracer"];
assert_eq!(
client_tracer["extra_services"],
serde_json::json!(["service1", "service2"])
);
assert!(client_tracer.get("env").is_none());
assert!(client_tracer.get("app_version").is_none());
assert!(state.get("backend_client_state").is_none());
}
#[test]
fn test_apm_tracing_config_parsing() {
let json = r#"{
"id": "42",
"lib_config": {
"tracing_sampling_rules": [
{
"sample_rate": 0.5,
"service": "test-service",
"provenance": "dynamic"
}
]
}
}"#;
let config: ApmTracingConfig = serde_json::from_str(json).unwrap();
assert!(config.lib_config.tracing_sampling_rules.is_some());
let rules_value = config.lib_config.tracing_sampling_rules.unwrap();
let rules: Vec<serde_json::Value> = serde_json::from_value(rules_value).unwrap();
assert_eq!(rules.len(), 1);
assert_eq!(rules[0]["sample_rate"], 0.5);
assert_eq!(rules[0]["service"], "test-service");
assert_eq!(rules[0]["provenance"], "dynamic");
}
#[test]
fn test_apm_tracing_config_full_schema() {
let json = r#"{
"id": "42",
"lib_config": {
"tracing_sampling_rules": [
{
"sample_rate": 0.3,
"service": "web-api",
"name": "GET /users/*",
"resource": "UserController.list",
"tags": {
"environment": "production",
"region": "us-east-1"
},
"provenance": "customer"
},
{
"sample_rate": 1.0,
"service": "auth-service",
"provenance": "dynamic"
}
]
}
}"#;
let config: ApmTracingConfig = serde_json::from_str(json).unwrap();
assert!(config.lib_config.tracing_sampling_rules.is_some());
let rules_value = config.lib_config.tracing_sampling_rules.unwrap();
let rules: Vec<serde_json::Value> = serde_json::from_value(rules_value).unwrap();
assert_eq!(rules.len(), 2);
assert_eq!(rules[0]["sample_rate"], 0.3);
assert_eq!(rules[0]["service"], "web-api");
assert_eq!(rules[0]["name"], "GET /users/*");
assert_eq!(rules[0]["resource"], "UserController.list");
assert_eq!(rules[0]["tags"].as_object().unwrap().len(), 2);
assert_eq!(rules[0]["tags"]["environment"], "production");
assert_eq!(rules[0]["tags"]["region"], "us-east-1");
assert_eq!(rules[0]["provenance"], "customer");
assert_eq!(rules[1]["sample_rate"], 1.0);
assert_eq!(rules[1]["service"], "auth-service");
assert_eq!(rules[1]["provenance"], "dynamic");
}
#[test]
fn test_apm_tracing_config_empty() {
let json = r#"{}"#;
let config: LibConfig = serde_json::from_str(json).unwrap();
assert!(config.tracing_sampling_rules.is_none());
}
#[test]
fn test_cached_target_files() {
let cached_file = CachedTargetFile {
path: "datadog/2/APM_TRACING/config123/config".to_string(),
length: 256,
hashes: vec![Hash {
algorithm: "sha256".to_string(),
hash: "abc123def456".to_string(),
}],
};
let request = ConfigRequest {
client: ClientInfo {
state: None,
id: "test-id".to_string(),
products: vec!["APM_TRACING".to_string()],
is_tracer: true,
client_tracer: None,
capabilities: ClientCapabilities::new().encode(),
},
cached_target_files: vec![cached_file.clone()],
};
let json = serde_json::to_value(&request).unwrap();
let cached = &json["cached_target_files"][0];
assert_eq!(cached["path"], "datadog/2/APM_TRACING/config123/config");
assert_eq!(cached["length"], 256);
assert_eq!(cached["hashes"][0]["algorithm"], "sha256");
assert_eq!(cached["hashes"][0]["hash"], "abc123def456");
}
#[test]
fn test_validate_signed_target_files() {
let config = Arc::new(Config::builder().build());
let client = RemoteConfigClient::new(config).unwrap();
let target_files = vec![TargetFile {
path: "datadog/2/APM_TRACING/config123/config".to_string(),
raw: "base64_encoded_content".to_string(),
}];
let signed_targets = serde_json::json!({
"datadog/2/APM_TRACING/config123/config": {
"custom": {"id": "config123", "v": 1}
}
});
let client_configs = None;
assert!(client
.validate_signed_target_files(&target_files, &Some(signed_targets), &client_configs)
.is_ok());
let target_files = vec![TargetFile {
path: "datadog/2/APM_TRACING/config456/config".to_string(),
raw: "base64_encoded_content".to_string(),
}];
let signed_targets = None;
let client_configs = Some(vec!["datadog/2/APM_TRACING/config456/config".to_string()]);
assert!(client
.validate_signed_target_files(&target_files, &signed_targets, &client_configs)
.is_ok());
let target_files = vec![TargetFile {
path: "datadog/2/APM_TRACING/config789/config".to_string(),
raw: "base64_encoded_content".to_string(),
}];
let signed_targets = serde_json::json!({
"datadog/2/APM_TRACING/config789/config": {
"custom": {"id": "config789", "v": 1}
}
});
let client_configs = Some(vec!["datadog/2/APM_TRACING/config789/config".to_string()]);
assert!(client
.validate_signed_target_files(&target_files, &Some(signed_targets), &client_configs)
.is_ok());
let target_files = vec![TargetFile {
path: "datadog/2/APM_TRACING/invalid_config/config".to_string(),
raw: "base64_encoded_content".to_string(),
}];
let signed_targets = serde_json::json!({
"datadog/2/APM_TRACING/other_config/config": {
"custom": {"id": "other_config", "v": 1}
}
});
let client_configs = Some(vec![
"datadog/2/APM_TRACING/another_config/config".to_string()
]);
let result = client.validate_signed_target_files(
&target_files,
&Some(signed_targets),
&client_configs,
);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("target file datadog/2/APM_TRACING/invalid_config/config not exists in client_config and signed targets"));
let target_files = vec![];
let signed_targets = None;
let client_configs = None;
assert!(client
.validate_signed_target_files(&target_files, &signed_targets, &client_configs)
.is_ok());
}
#[test]
fn test_parse_example_response() {
let config_response = ConfigResponse {
roots: None,
targets: Some("eyJzaWduZWQiOiB7Il90eXBlIjogInRhcmdldHMiLCAiY3VzdG9tIjogeyJvcGFxdWVfYmFja2VuZF9zdGF0ZSI6ICJleUpmb29JT2lBaVltRm9JbjA9In0sICJleHBpcmVzIjogIjIwMjQtMTItMzFUMjM6NTk6NTlaIiwgInNwZWNfdmVyc2lvbiI6ICIxLjAuMCIsICJ0YXJnZXRzIjoge30sICJ2ZXJzaW9uIjogM319Cg==".to_string()), target_files: Some(vec![
TargetFile {
path: "datadog/2/APM_TRACING/apm-tracing-sampling/config".to_string(),
raw: "eyJpZCI6ICI0MiIsICJsaWJfY29uZmlnIjogeyJ0cmFjaW5nX3NhbXBsaW5nX3J1bGVzIjogW3sic2FtcGxlX3JhdGUiOiAwLjUsICJzZXJ2aWNlIjogInRlc3Qtc2VydmljZSJ9XX19".to_string(), },
]),
client_configs: Some(vec![
"datadog/2/APM_TRACING/apm-tracing-sampling/config".to_string(),
]),
};
let config = Arc::new(Config::builder().build());
let mut client = RemoteConfigClient::new(config).unwrap();
let result = client.process_response(config_response);
assert!(result.is_ok(), "process_response should succeed");
let state = client.state.lock().unwrap();
assert_eq!(state.targets_version, 3);
assert_eq!(
state.backend_client_state,
Some("eyJfooIOiAiYmFoIn0=".to_string())
);
assert!(!state.has_error);
assert_eq!(state.config_states.len(), 1);
let config_state = &state.config_states[0];
assert_eq!(config_state.product, "APM_TRACING");
assert_eq!(config_state.apply_state, 2);
let cached_files = client.cached_target_files;
assert_eq!(cached_files.len(), 1);
assert_eq!(
cached_files[0].path,
"datadog/2/APM_TRACING/apm-tracing-sampling/config"
);
assert_eq!(cached_files[0].length, 140);
assert_eq!(cached_files[0].hashes.len(), 1);
assert_eq!(cached_files[0].hashes[0].algorithm, "sha256");
let config = client.config;
let rules = config.trace_sampling_rules();
assert_eq!(rules.len(), 1);
assert_eq!(rules[0].sample_rate, 0.5);
assert_eq!(rules[0].service, Some("test-service".to_string()));
}
#[test]
fn test_parse_multi_product_response() {
let config_response = ConfigResponse {
roots: None,
targets: Some("eyJzaWduZWQiOiB7Il90eXBlIjogInRhcmdldHMiLCAiY3VzdG9tIjogeyJvcGFxdWVfYmFja2VuZF9zdGF0ZSI6ICJleUpmb29JT2lBaVltRm9JbjA9In0sICJleHBpcmVzIjogIjIwMjQtMTItMzFUMjM6NTk6NTlaIiwgInNwZWNfdmVyc2lvbiI6ICIxLjAuMCIsICJ0YXJnZXRzIjoge30sICJ2ZXJzaW9uIjogMn19Cg==".to_string()), target_files: Some(vec![
TargetFile {
path: "datadog/2/ASM_FEATURES/ASM_FEATURES-base/config".to_string(),
raw: "eyJhc20tZmVhdHVyZXMiOiB7ImVuYWJsZWQiOiB0cnVlfX0=".to_string(), },
TargetFile {
path: "datadog/2/LIVE_DEBUGGING/LIVE_DEBUGGING-base/config".to_string(),
raw: "eyJsaXZlLWRlYnVnZ2luZyI6IHsiZW5hYmxlZCI6IGZhbHNlfX0=".to_string(), },
]),
client_configs: Some(vec![
"datadog/2/ASM_FEATURES/ASM_FEATURES-base/config".to_string(),
"datadog/2/LIVE_DEBUGGING/LIVE_DEBUGGING-base/config".to_string(),
]),
};
let config = Arc::new(Config::builder().build());
let mut client = RemoteConfigClient::new(config).unwrap();
let result = client.process_response(config_response);
assert!(result.is_ok(), "process_response should succeed");
let state = client.state.lock().unwrap();
assert_eq!(state.targets_version, 2);
assert_eq!(
state.backend_client_state,
Some("eyJfooIOiAiYmFoIn0=".to_string())
);
assert!(!state.has_error);
assert_eq!(state.config_states.len(), 0);
let cached_files = client.cached_target_files;
assert_eq!(cached_files.len(), 0);
}
#[test]
fn test_config_update_from_remote() {
let config = Arc::new(Config::builder().build());
let mut client = RemoteConfigClient::new(config).unwrap();
let config_response = ConfigResponse {
roots: None,
targets: Some("eyJzaWduZWQiOiB7Il90eXBlIjogInRhcmdldHMiLCAiY3VzdG9tIjogeyJvcGFxdWVfYmFja2VuZF9zdGF0ZSI6ICJleUpmb29JT2lBaVltRm9JbjA9In0sICJleHBpcmVzIjogIjIwMjQtMTItMzFUMjM6NTk6NTlaIiwgInNwZWNfdmVyc2lvbiI6ICIxLjAuMCIsICJ0YXJnZXRzIjoge30sICJ2ZXJzaW9uIjogM319Cg==".to_string()),
target_files: Some(vec![
TargetFile {
path: "datadog/2/APM_TRACING/test-config/config".to_string(),
raw: "eyJpZCI6ICI0MiIsICJsaWJfY29uZmlnIjogeyJ0cmFjaW5nX3NhbXBsaW5nX3J1bGVzIjogW3sic2FtcGxlX3JhdGUiOiAwLjUsICJzZXJ2aWNlIjogInRlc3Qtc2VydmljZSJ9XX19".to_string(),
},
]),
client_configs: Some(vec![
"datadog/2/APM_TRACING/test-config/config".to_string(),
]),
};
let result = client.process_response(config_response);
assert!(result.is_ok(), "process_response should succeed");
let config = client.config;
let rules = config.trace_sampling_rules();
assert_eq!(rules.len(), 1);
assert_eq!(rules[0].sample_rate, 0.5);
assert_eq!(rules[0].service, Some("test-service".to_string()));
}
#[test]
fn test_tuf_targets_parsing() {
let tuf_targets_json = r#"{
"signatures": [
{
"keyid": "5c4ece41241a1bb513f6e3e5df74ab7d5183dfffbd71bfd43127920d880569fd",
"sig": "4dd483db8b4aff81a9afd2ed4eaeb23fe3aca9a148a7a8942e24e8c5ef911e2692f94492b882727b257dacfbf6bcea09d6e26ea28ac145fcb4254ea046be3b03"
}
],
"signed": {
"_type": "targets",
"custom": {
"opaque_backend_state": "eyJ2ZXJzaW9uIjoxLCJzdGF0ZSI6eyJmaWxlX2hhc2hlcyI6WyJGZXJOT1FyMStmTThKWk9TY0crZllucnhXMWpKN0w0ZlB5aGtxUWVCT3dJPSIsInd1aW9BVm1Qcy9oNEpXMDh1dnI1bi9meERLQ3lKdG1sQmRjaDNOcFdLZDg9IiwiOGFDYVJFc3hIV3R3SFNFWm5SV0pJYmtENXVBNUtETENoZG8vZ0RNdnJJMD0iXX19"
},
"expires": "2022-09-22T09:01:04Z",
"spec_version": "1.0.0",
"targets": {
"datadog/2/APM_SAMPLING/dynamic_rates/config": {
"custom": {
"v": 27423
},
"hashes": {
"sha256": "c2e8a801598fb3f878256d3cbafaf99ff7f10ca0b226d9a505d721dcda5629df"
},
"length": 58409
},
"employee/ASM_DD/1.recommended.json/config": {
"custom": {
"v": 1
},
"hashes": {
"sha256": "15eacd390af5f9f33c259392706f9f627af15b58c9ecbe1f3f2864a907813b02"
},
"length": 235228
},
"employee/CWS_DD/4.default.policy/config": {
"custom": {
"v": 1
},
"hashes": {
"sha256": "f1a09a444b311d6b701d21199d158921b903e6e0392832c285da3f80332fac8d"
},
"length": 34777
}
},
"version": 23755701
}
}"#;
let targets: SignedTargets = serde_json::from_str(tuf_targets_json)
.expect("Should successfully parse TUF targets JSON");
assert!(targets.signatures.is_some());
let signatures = targets.signatures.unwrap();
assert_eq!(signatures.len(), 1);
assert_eq!(targets.signed.target_type, "targets");
assert_eq!(targets.signed.expires, "2022-09-22T09:01:04Z");
assert_eq!(targets.signed.spec_version, "1.0.0");
assert_eq!(targets.signed.version, 23755701);
assert!(targets.signed.custom.is_some());
let custom = targets.signed.custom.unwrap();
let backend_state = custom
.get("opaque_backend_state")
.and_then(|v| v.as_str())
.expect("Should have opaque_backend_state");
assert_eq!(backend_state, "eyJ2ZXJzaW9uIjoxLCJzdGF0ZSI6eyJmaWxlX2hhc2hlcyI6WyJGZXJOT1FyMStmTThKWk9TY0crZllucnhXMWpKN0w0ZlB5aGtxUWVCT3dJPSIsInd1aW9BVm1Qcy9oNEpXMDh1dnI1bi9meERLQ3lKdG1sQmRjaDNOcFdLZDg9IiwiOGFDYVJFc3hIV3R3SFNFWm5SV0pJYmtENXVBNUtETENoZG8vZ0RNdnJJMD0iXX19");
assert_eq!(targets.signed.targets.len(), 3);
let apm_sampling = targets
.signed
.targets
.get("datadog/2/APM_SAMPLING/dynamic_rates/config")
.expect("Should have APM_SAMPLING target");
assert_eq!(apm_sampling.length, 58409);
assert_eq!(
apm_sampling.hashes.get("sha256").unwrap(),
"c2e8a801598fb3f878256d3cbafaf99ff7f10ca0b226d9a505d721dcda5629df"
);
let apm_custom = apm_sampling.custom.as_ref().unwrap();
assert_eq!(apm_custom.get("v").unwrap().as_u64().unwrap(), 27423);
let asm_dd = targets
.signed
.targets
.get("employee/ASM_DD/1.recommended.json/config")
.expect("Should have ASM_DD target");
assert_eq!(asm_dd.length, 235228);
assert_eq!(
asm_dd.hashes.get("sha256").unwrap(),
"15eacd390af5f9f33c259392706f9f627af15b58c9ecbe1f3f2864a907813b02"
);
let asm_custom = asm_dd.custom.as_ref().unwrap();
assert_eq!(asm_custom.get("v").unwrap().as_u64().unwrap(), 1);
let cws_dd = targets
.signed
.targets
.get("employee/CWS_DD/4.default.policy/config")
.expect("Should have CWS_DD target");
assert_eq!(cws_dd.length, 34777);
assert_eq!(
cws_dd.hashes.get("sha256").unwrap(),
"f1a09a444b311d6b701d21199d158921b903e6e0392832c285da3f80332fac8d"
);
let cws_custom = cws_dd.custom.as_ref().unwrap();
assert_eq!(cws_custom.get("v").unwrap().as_u64().unwrap(), 1);
}
#[test]
fn test_extract_product_from_path() {
assert_eq!(
extract_product_and_id_from_path("datadog/2/APM_TRACING/config123/config")
.map(|(p, _)| p),
Some("APM_TRACING".to_string())
);
assert_eq!(
extract_product_and_id_from_path("datadog/2/LIVE_DEBUGGING/LIVE_DEBUGGING-base/config")
.map(|(p, _)| p),
Some("LIVE_DEBUGGING".to_string())
);
assert_eq!(
extract_product_and_id_from_path("datadog/2/AGENT_CONFIG/dynamic_rates/config")
.map(|(p, _)| p),
Some("AGENT_CONFIG".to_string())
);
assert_eq!(
extract_product_and_id_from_path("invalid/path").map(|(p, _)| p),
None
);
assert_eq!(
extract_product_and_id_from_path("datadog/1/APM_TRACING/config").map(|(p, _)| p),
None
);
assert_eq!(
extract_product_and_id_from_path("datadog/APM_TRACING/config").map(|(p, _)| p),
None
);
assert_eq!(extract_product_and_id_from_path("").map(|(p, _)| p), None);
}
#[test]
fn test_extract_config_id_from_path() {
assert_eq!(
extract_product_and_id_from_path("datadog/2/APM_TRACING/config123/config")
.map(|(_, id)| id),
Some("config123".to_string())
);
assert_eq!(
extract_product_and_id_from_path("datadog/2/ASM_FEATURES/ASM_FEATURES-base/config")
.map(|(_, id)| id),
Some("ASM_FEATURES-base".to_string())
);
assert_eq!(
extract_product_and_id_from_path("invalid/path").map(|(_, id)| id),
None
);
assert_eq!(
extract_product_and_id_from_path("datadog/2/APM_TRACING/config").map(|(_, id)| id),
None
); assert_eq!(
extract_product_and_id_from_path("datadog/2/APM_TRACING").map(|(_, id)| id),
None
); assert_eq!(extract_product_and_id_from_path("").map(|(_, id)| id), None);
}
#[test]
fn test_product_registry() {
let registry = ProductRegistry::new();
assert!(registry.get_handler("APM_TRACING").is_some());
assert!(registry.get_handler("UNKNOWN_PRODUCT").is_none());
}
#[test]
fn test_apm_tracing_handler() {
let handler = ApmTracingHandler;
assert_eq!(handler.product_name(), "APM_TRACING");
let config = Arc::new(Config::builder().build());
let config_json = r#"{"id": "42", "lib_config": {"tracing_sampling_rules": [{"sample_rate": 0.5, "service": "test"}]}}"#;
let result = handler.process_config(config_json, &config);
assert!(result.is_ok());
let invalid_json = "invalid json";
let result = handler.process_config(invalid_json, &config);
assert!(result.is_err());
}
#[test]
fn test_config_states_cleared_between_processing_cycles() {
let config = Arc::new(Config::builder().build());
let mut client = RemoteConfigClient::new(config).unwrap();
let config_response_1 = ConfigResponse {
roots: None,
targets: Some("eyJzaWduZWQiOiB7Il90eXBlIjogInRhcmdldHMiLCAiY3VzdG9tIjogeyJvcGFxdWVfYmFja2VuZF9zdGF0ZSI6ICJleUpmb29JT2lBaVltRm9JbjA9In0sICJleHBpcmVzIjogIjIwMjQtMTItMzFUMjM6NTk6NTlaIiwgInNwZWNfdmVyc2lvbiI6ICIxLjAuMCIsICJ0YXJnZXRzIjoge30sICJ2ZXJzaW9uIjogMX19Cg==".to_string()),
target_files: Some(vec![
TargetFile {
path: "datadog/2/APM_TRACING/config1/config".to_string(),
raw: "eyJpZCI6ICI0MiIsICJsaWJfY29uZmlnIjogeyJ0cmFjaW5nX3NhbXBsaW5nX3J1bGVzIjogW3sic2FtcGxlX3JhdGUiOiAwLjUsICJzZXJ2aWNlIjogInRlc3Qtc2VydmljZS0xIn1dfX0=".to_string(),
},
]),
client_configs: Some(vec![
"datadog/2/APM_TRACING/config1/config".to_string(),
]),
};
let result = client.process_response(config_response_1);
assert!(result.is_ok(), "First process_response should succeed");
{
let state = client.state.lock().unwrap();
assert_eq!(state.config_states.len(), 1);
assert_eq!(state.config_states[0].id, "config1");
assert_eq!(state.config_states[0].apply_state, 2); }
let config_response_2 = ConfigResponse {
roots: None,
targets: Some("eyJzaWduZWQiOiB7Il90eXBlIjogInRhcmdldHMiLCAiY3VzdG9tIjogeyJvcGFxdWVfYmFja2VuZF9zdGF0ZSI6ICJleUpmb29JT2lBaVltRm9JbjA9In0sICJleHBpcmVzIjogIjIwMjQtMTItMzFUMjM6NTk6NTlaIiwgInNwZWNfdmVyc2lvbiI6ICIxLjAuMCIsICJ0YXJnZXRzIjoge30sICJ2ZXJzaW9uIjogMn19Cg==".to_string()),
target_files: Some(vec![
TargetFile {
path: "datadog/2/APM_TRACING/config2/config".to_string(),
raw: "eyJpZCI6ICI0MiIsICJsaWJfY29uZmlnIjogeyJpZCI6IjQyIiwgInRyYWNpbmdfc2FtcGxpbmdfcnVsZXMiOiBbeyJzYW1wbGVfcmF0ZSI6IDAuNzUsICJzZXJ2aWNlIjogInRlc3Qtc2VydmljZS0yIn1dfX0=".to_string(),
},
TargetFile {
path: "datadog/2/APM_TRACING/config3/config".to_string(),
raw: "eyJpZCI6ICI0MiIsICJsaWJfY29uZmlnIjogeyJpZCI6IjQyIiwgInRyYWNpbmdfc2FtcGxpbmdfcnVsZXMiOiBbeyJzYW1wbGVfcmF0ZSI6IDAuMjUsICJzZXJ2aWNlIjogInRlc3Qtc2VydmljZS0yIn1dfX0=".to_string(),
},
]),
client_configs: Some(vec![
"datadog/2/APM_TRACING/config2/config".to_string(),
"datadog/2/APM_TRACING/config3/config".to_string(),
]),
};
let result = client.process_response(config_response_2);
assert!(result.is_ok(), "Second process_response should succeed");
{
let state = client.state.lock().unwrap();
assert_eq!(state.config_states.len(), 2);
let config_ids: Vec<String> =
state.config_states.iter().map(|cs| cs.id.clone()).collect();
assert!(config_ids.contains(&"config2".to_string()));
assert!(config_ids.contains(&"config3".to_string()));
assert!(!config_ids.contains(&"config1".to_string()));
for config_state in &state.config_states {
assert_eq!(config_state.apply_state, 2); assert_eq!(config_state.product, "APM_TRACING");
}
}
let config_response_3 = ConfigResponse {
roots: None,
targets: Some("eyJzaWduZWQiOiB7Il90eXBlIjogInRhcmdldHMiLCAiY3VzdG9tIjogeyJvcGFxdWVfYmFja2VuZF9zdGF0ZSI6ICJleUpmb29JT2lBaVltRm9JbjA9In0sICJleHBpcmVzIjogIjIwMjQtMTItMzFUMjM6NTk6NTlaIiwgInNwZWNfdmVyc2lvbiI6ICIxLjAuMCIsICJ0YXJnZXRzIjoge30sICJ2ZXJzaW9uIjogM319Cg==".to_string()),
target_files: Some(vec![]), client_configs: Some(vec![]),
};
let result = client.process_response(config_response_3);
assert!(result.is_ok(), "Third process_response should succeed");
{
let state = client.state.lock().unwrap();
assert_eq!(state.config_states.len(), 2);
let config_ids: Vec<String> =
state.config_states.iter().map(|cs| cs.id.clone()).collect();
assert!(config_ids.contains(&"config2".to_string()));
assert!(config_ids.contains(&"config3".to_string()));
}
}
#[test]
fn test_config_states_cleared_on_error_configs() {
let config = Arc::new(Config::builder().build());
let mut client = RemoteConfigClient::new(config).unwrap();
let config_response_1 = ConfigResponse {
roots: None,
targets: Some("eyJzaWduZWQiOiB7Il90eXBlIjogInRhcmdldHMiLCAiY3VzdG9tIjogeyJvcGFxdWVfYmFja2VuZF9zdGF0ZSI6ICJleUpmb29JT2lBaVltRm9JbjA9In0sICJleHBpcmVzIjogIjIwMjQtMTItMzFUMjM6NTk6NTlaIiwgInNwZWNfdmVyc2lvbiI6ICIxLjAuMCIsICJ0YXJnZXRzIjoge30sICJ2ZXJzaW9uIjogMX19Cg==".to_string()),
target_files: Some(vec![
TargetFile {
path: "datadog/2/APM_TRACING/good_config/config".to_string(),
raw: "eyJpZCI6ICI0MiIsICJsaWJfY29uZmlnIjogeyJ0cmFjaW5nX3NhbXBsaW5nX3J1bGVzIjogW3sic2FtcGxlX3JhdGUiOiAwLjUsICJzZXJ2aWNlIjogInRlc3Qtc2VydmljZSJ9XX19".to_string(),
},
]),
client_configs: Some(vec![
"datadog/2/APM_TRACING/good_config/config".to_string(),
]),
};
let result = client.process_response(config_response_1);
assert!(result.is_ok(), "First process_response should succeed");
{
let state = client.state.lock().unwrap();
assert_eq!(state.config_states.len(), 1);
assert_eq!(state.config_states[0].id, "good_config");
assert_eq!(state.config_states[0].apply_state, 2); }
let config_response_2 = ConfigResponse {
roots: None,
targets: Some("eyJzaWduZWQiOiB7Il90eXBlIjogInRhcmdldHMiLCAiY3VzdG9tIjogeyJvcGFxdWVfYmFja2VuZF9zdGF0ZSI6ICJleUpmb29JT2lBaVltRm9JbjA9In0sICJleHBpcmVzIjogIjIwMjQtMTItMzFUMjM6NTk6NTlaIiwgInNwZWNfdmVyc2lvbiI6ICIxLjAuMCIsICJ0YXJnZXRzIjoge30sICJ2ZXJzaW9uIjogMn19Cg==".to_string()),
target_files: Some(vec![
TargetFile {
path: "datadog/2/APM_TRACING/bad_config/config".to_string(),
raw: "aW52YWxpZCBqc29u".to_string(), },
]),
client_configs: Some(vec![
"datadog/2/APM_TRACING/bad_config/config".to_string(),
]),
};
let result = client.process_response(config_response_2);
assert!(
result.is_ok(),
"Second process_response should succeed (even with config errors)"
);
{
let state = client.state.lock().unwrap();
assert_eq!(state.config_states.len(), 1); assert_eq!(state.config_states[0].id, "bad_config");
assert_eq!(state.config_states[0].apply_state, 3); assert!(state.config_states[0].apply_error.is_some());
assert_ne!(state.config_states[0].id, "good_config");
}
}
#[test]
fn test_tuf_targets_integration_with_remote_config() {
let config = Arc::new(Config::builder().build());
let mut client = RemoteConfigClient::new(config).unwrap();
let tuf_targets_json = r#"{
"signatures": [
{
"keyid": "5c4ece41241a1bb513f6e3e5df74ab7d5183dfffbd71bfd43127920d880569fd",
"sig": "4dd483db8b4aff81a9afd2ed4eaeb23fe3aca9a148a7a8942e24e8c5ef911e2692f94492b882727b257dacfbf6bcea09d6e26ea28ac145fcb4254ea046be3b03"
}
],
"signed": {
"_type": "targets",
"custom": {
"opaque_backend_state": "eyJ2ZXJzaW9uIjoxLCJzdGF0ZSI6eyJmaWxlX2hhc2hlcyI6WyJGZXJOT1FyMStmTThKWk9TY0crZllucnhXMWpKN0w0ZlB5aGtxUWVCT3dJPSIsInd1aW9BVm1Qcy9oNEpXMDh1dnI1bi9meERLQ3lKdG1sQmRjaDNOcFdLZDg9IiwiOGFDYVJFc3hIV3R3SFNFWm5SV0pJYmtENXVBNUtETENoZG8vZ0RNdnJJMD0iXX19"
},
"expires": "2022-09-22T09:01:04Z",
"spec_version": "1.0.0",
"targets": {
"datadog/2/APM_TRACING/test-sampling/config": {
"custom": {
"v": 100
},
"hashes": {
"sha256": "c2e8a801598fb3f878256d3cbafaf99ff7f10ca0b226d9a505d721dcda5629df"
},
"length": 58409
}
},
"version": 23755701
}
}"#;
use base64::Engine;
let encoded_targets =
base64::engine::general_purpose::STANDARD.encode(tuf_targets_json.as_bytes());
let config_response = ConfigResponse {
roots: None,
targets: Some(encoded_targets),
target_files: Some(vec![
TargetFile {
path: "datadog/2/APM_TRACING/test-sampling/config".to_string(),
raw: "eyJpZCI6ICI0MiIsICJsaWJfY29uZmlnIjogeyJ0cmFjaW5nX3NhbXBsaW5nX3J1bGVzIjogW3sic2FtcGxlX3JhdGUiOiAwLjc1LCAic2VydmljZSI6ICJ0ZXN0LWFwcC1zZXJ2aWNlIn1dfX0=".to_string(), },
]),
client_configs: Some(vec![
"datadog/2/APM_TRACING/test-sampling/config".to_string()
]),
};
let result = client.process_response(config_response);
assert!(
result.is_ok(),
"process_response should succeed: {result:?}"
);
let state = client.state.lock().unwrap();
assert_eq!(state.targets_version, 23755701);
assert_eq!(
state.backend_client_state,
Some("eyJ2ZXJzaW9uIjoxLCJzdGF0ZSI6eyJmaWxlX2hhc2hlcyI6WyJGZXJOT1FyMStmTThKWk9TY0crZllucnhXMWpKN0w0ZlB5aGtxUWVCT3dJPSIsInd1aW9BVm1Qcy9oNEpXMDh1dnI1bi9meERLQ3lKdG1sQmRjaDNOcFdLZDg9IiwiOGFDYVJFc3hIV3R3SFNFWm5SV0pJYmtENXVBNUtETENoZG8vZ0RNdnJJMD0iXX19".to_string())
);
assert_eq!(state.config_states.len(), 1);
let config_state = &state.config_states[0];
assert!(config_state.id == "test-sampling" || config_state.id == "apm-tracing-sampling");
assert_eq!(config_state.version, 100); assert_eq!(config_state.product, "APM_TRACING");
let config = client.config;
let rules = config.trace_sampling_rules();
assert_eq!(rules.len(), 1);
assert_eq!(rules[0].sample_rate, 0.75);
assert_eq!(rules[0].service, Some("test-app-service".to_string()));
}
#[test]
fn test_deserialize_tracing_sampling_rules_null() {
let config_json = r#"{"id": "42", "lib_config": {"tracing_sampling_rules": null}}"#;
let tracing_config: ApmTracingConfig =
serde_json::from_str(config_json).expect("Json should be parsed");
assert!(tracing_config.lib_config.tracing_sampling_rules.is_some());
assert!(tracing_config
.lib_config
.tracing_sampling_rules
.unwrap()
.is_null());
}
#[test]
fn test_deserialize_tracing_sampling_rules_missing() {
let config_json = r#"{"id": "42", "lib_config": {}}"#;
let tracing_config: ApmTracingConfig =
serde_json::from_str(config_json).expect("Json should be parsed");
assert!(tracing_config.lib_config.tracing_sampling_rules.is_none());
}
}