use anyhow::Context;
use cargo_metadata::MetadataCommand;
use http_body_util::BodyExt;
use hyper::body::Incoming;
use hyper::{Request, Response, Uri};
use libdd_common::http_common;
use std::collections::HashMap;
use std::fmt::Write;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
const TEST_AGENT_IMAGE_NAME: &str = "ghcr.io/datadog/dd-apm-test-agent/ddapm-test-agent";
const TEST_AGENT_IMAGE_TAG: &str = "v1.39.0";
const TEST_AGENT_READY_MSG: &str =
"INFO:ddapm_test_agent.agent:Trace request stall seconds setting set to 0.0.";
const TRACE_AGENT_API_PORT: u16 = 8126;
const OTEL_HTTP_PORT: u16 = 4318;
const OTEL_PROTO_PORT: u16 = 4317;
const SAMPLE_RATE_QUERY_PARAM_KEY: &str = "agent_sample_rate_by_service";
const SESSION_TEST_TOKEN_QUERY_PARAM_KEY: &str = "test_session_token";
const SESSION_START_ENDPOINT: &str = "test/session/start";
const SESSION_ASSERT_SNAPSHOT: &str = "test/session/snapshot";
const SET_REMOTE_CONFIG_RESPONSE_PATH_ENDPOINT: &str = "test/session/responses/config/path";
struct DatadogAgentContainerBuilder {
mounts: Vec<(String, String)>,
env_vars: Vec<(String, String)>,
trace_agent_port: u16,
otlp_http_port: u16,
otlp_proto_port: u16,
}
struct DatadogTestAgentContainer {
container_id: String,
trace_agent_port: u16,
otlp_http_port: u16,
otlp_proto_port: u16,
}
fn run_command(c: &mut std::process::Command) -> anyhow::Result<std::process::Output> {
let output = c.output()?;
if !output.status.success() {
anyhow::bail!(
"Running command failed: {}",
String::from_utf8_lossy(&output.stderr)
)
}
Ok(output)
}
impl DatadogTestAgentContainer {
fn stderr(&self) -> anyhow::Result<String> {
use std::process::*;
let output = run_command(Command::new("docker").arg("logs").arg(&self.container_id))
.context("docker logs")?;
Ok(String::from_utf8(output.stderr)?)
}
fn wait_ready(&self) -> anyhow::Result<()> {
for _ in 0..100 {
if self
.stderr()
.context("reading container logs")?
.contains(TEST_AGENT_READY_MSG)
{
return Ok(());
}
std::thread::sleep(Duration::from_millis(50));
}
anyhow::bail!("waiting for test container timed out")
}
fn host_port(&self, container_port: u16) -> anyhow::Result<String> {
use std::process::*;
let output = run_command(
Command::new("docker")
.args(["inspect", "--format"])
.arg(format!(
r##"{{{{(index (index .NetworkSettings.Ports "{}/tcp") 0).HostPort}}}}"##,
container_port
))
.arg(&self.container_id),
)
.context("docker inspect mapped host port")?;
Ok(String::from_utf8(output.stdout)?.trim().to_owned())
}
fn trace_agent_uri(&self) -> anyhow::Result<String> {
Ok(format!(
"http://localhost:{}",
self.host_port(self.trace_agent_port)?
))
}
}
impl Drop for DatadogTestAgentContainer {
fn drop(&mut self) {
use std::process::*;
if let Err(e) = (|| -> anyhow::Result<()> {
run_command(Command::new("docker").arg("stop").arg(&self.container_id))
.context("docker stop container")?;
Ok(())
})() {
eprintln!("error stopping test agent container: {e}");
};
}
}
impl DatadogAgentContainerBuilder {
fn start(&self) -> anyhow::Result<DatadogTestAgentContainer> {
use std::process::*;
let mounts = self.mounts.iter().flat_map(|(host_path, container_path)| {
["-v".to_owned(), format!("{host_path}:{container_path}")]
});
let envs = self
.env_vars
.iter()
.flat_map(|(e, v)| ["-e".to_owned(), format!("{e}={v}")]);
let output = run_command(
Command::new("docker")
.args(["run", "--rm", "-d"])
.args(mounts)
.args(envs)
.args(["-p".to_owned(), format!("{}", self.trace_agent_port)])
.args(["-p".to_owned(), format!("{}", self.otlp_http_port)])
.args(["-p".to_owned(), format!("{}", self.otlp_proto_port)])
.arg(format!("{TEST_AGENT_IMAGE_NAME}:{TEST_AGENT_IMAGE_TAG}",)),
)
.context("docker run container")?;
let container_id = String::from_utf8(output.stdout)?.trim().to_owned();
let container = DatadogTestAgentContainer {
container_id,
trace_agent_port: self.trace_agent_port,
otlp_http_port: self.otlp_http_port,
otlp_proto_port: self.otlp_proto_port,
};
container.wait_ready()?;
Ok(container)
}
fn new(relative_snapshot_path: Option<&str>, absolute_socket_path: Option<&str>) -> Self {
let mut env_vars = Vec::new();
let mut mounts = Vec::new();
if let Some(absolute_socket_path) = absolute_socket_path {
env_vars.push((
"DD_APM_RECEIVER_SOCKET".to_string(),
"/tmp/ddsockets/apm.socket".to_owned(),
));
mounts.push((absolute_socket_path.to_owned(), "/tmp/ddsockets".to_owned()));
}
if let Some(relative_snapshot_path) = relative_snapshot_path {
mounts.push((
Self::calculate_volume_absolute_path(relative_snapshot_path),
"/snapshots".to_owned(),
));
}
DatadogAgentContainerBuilder {
mounts,
env_vars,
trace_agent_port: TRACE_AGENT_API_PORT,
otlp_http_port: OTEL_HTTP_PORT,
otlp_proto_port: OTEL_PROTO_PORT,
}
}
pub fn with_env(mut self, key: &str, value: &str) -> Self {
self.env_vars.push((key.to_string(), value.to_string()));
self
}
fn calculate_volume_absolute_path(relative_snapshot_path: &str) -> String {
let metadata = MetadataCommand::new()
.exec()
.expect("Failed to fetch metadata");
let project_root_dir = metadata.workspace_root;
let calculated_path = Path::new(&project_root_dir)
.join(relative_snapshot_path)
.as_os_str()
.to_str()
.expect("unable to convert OS string")
.to_owned();
calculated_path
}
}
pub struct DatadogTestAgent {
container: DatadogTestAgentContainer,
socket_path: Option<String>,
}
impl DatadogTestAgent {
pub async fn new(
relative_snapshot_path: Option<&str>,
absolute_socket_path: Option<&str>,
test_agent_extra_env: &[(&str, &str)],
) -> Self {
let mut container =
DatadogAgentContainerBuilder::new(relative_snapshot_path, absolute_socket_path);
for (key, value) in test_agent_extra_env {
container = container.with_env(key, value);
}
DatadogTestAgent {
container: container
.start()
.expect("Unable to start DatadogTestAgent, is the Docker Daemon running?"),
socket_path: absolute_socket_path.map(|p: &str| format!("{}/apm.socket", p)),
}
}
pub async fn get_base_uri(&self) -> http::Uri {
libdd_common::parse_uri(&if let Some(path) = &self.socket_path {
format!("unix://{path}")
} else {
self.container.trace_agent_uri().unwrap()
})
.unwrap()
}
pub async fn get_uri_for_endpoint(&self, endpoint: &str, snapshot_token: Option<&str>) -> Uri {
self.get_uri_for_endpoint_and_params(
endpoint,
snapshot_token.map(|t| ("test_session_token", t)),
)
.await
}
async fn get_uri_for_endpoint_and_params<'a, I: IntoIterator<Item = (&'a str, &'a str)>>(
&self,
endpoint: &str,
query_params: I,
) -> Uri {
let base_uri = self.get_base_uri().await;
let mut parts = base_uri.into_parts();
let mut query_string = String::new();
for (i, (k, v)) in query_params.into_iter().enumerate() {
if i == 0 {
query_string.push('?');
} else {
query_string.push('&');
}
let _ = write!(
&mut query_string,
"{}={}",
urlencoding::encode(k),
urlencoding::encode(v)
);
}
parts.path_and_query = Some(
format!("/{}{}", endpoint.trim_start_matches('/'), query_string)
.parse()
.expect("Invalid path and query"),
);
Uri::from_parts(parts).expect("Invalid URI")
}
pub async fn get_otlp_http_uri(&self) -> Uri {
let host_port = self
.container
.host_port(self.container.otlp_http_port)
.expect("Failed to get OTLP HTTP host port");
let uri_string = format!("http://localhost:{}", host_port);
Uri::from_str(&uri_string).expect("Invalid URI")
}
pub async fn get_otlp_grpc_uri(&self) -> Uri {
let host_port = self
.container
.host_port(self.container.otlp_proto_port)
.expect("Failed to get OTLP gRPC host port");
let uri_string = format!("http://localhost:{}", host_port);
Uri::from_str(&uri_string).expect("Invalid URI")
}
pub async fn assert_snapshot(&self, snapshot_token: &str) {
let uri = self
.get_uri_for_endpoint(SESSION_ASSERT_SNAPSHOT, Some(snapshot_token))
.await;
let req = Request::builder()
.method("GET")
.uri(uri)
.body(http_common::Body::empty())
.expect("Failed to create request");
let res = self
.agent_request_with_retry(req, 5)
.await
.expect("request failed");
let status_code = res.status();
let body_bytes = res
.into_body()
.collect()
.await
.expect("Read failed")
.to_bytes();
let body_string = String::from_utf8(body_bytes.to_vec()).expect("Conversion failed");
assert_eq!(
status_code,
hyper::StatusCode::OK,
"Expected status 200, but got {status_code}. Response body: {body_string}"
);
}
pub async fn get_sent_traces(&self) -> Vec<serde_json::Value> {
let uri = self.get_uri_for_endpoint("test/traces", None).await;
let req = Request::builder()
.method("GET")
.uri(uri)
.body(http_common::Body::empty())
.expect("Failed to create request");
let res = self
.agent_request_with_retry(req, 5)
.await
.expect("request failed");
let body_bytes = res
.into_body()
.collect()
.await
.expect("Read failed")
.to_bytes();
let body_string = String::from_utf8(body_bytes.to_vec()).expect("Conversion failed");
serde_json::from_str(&body_string).expect("Failed to parse JSON response")
}
pub async fn start_session(
&self,
session_token: &str,
agent_sample_rates_by_service: Option<&str>,
) {
let mut query_params_map = HashMap::new();
query_params_map.insert(SESSION_TEST_TOKEN_QUERY_PARAM_KEY, session_token);
query_params_map
.extend(agent_sample_rates_by_service.map(|r| (SAMPLE_RATE_QUERY_PARAM_KEY, r)));
let uri = self
.get_uri_for_endpoint_and_params(SESSION_START_ENDPOINT, query_params_map)
.await;
let req = Request::builder()
.method("GET")
.uri(uri)
.body(http_common::Body::empty())
.expect("Failed to create request");
let res = self
.agent_request_with_retry(req, 5)
.await
.expect("request failed");
assert_eq!(
res.status(),
hyper::StatusCode::OK,
"Expected status 200 for test agent {}, but got {}",
SESSION_START_ENDPOINT,
res.status()
);
}
async fn agent_request_with_retry(
&self,
req: Request<http_common::Body>,
max_attempts: i32,
) -> anyhow::Result<Response<Incoming>> {
let mut attempts = 1;
let mut delay_ms = 100;
let (parts, body) = req.into_parts();
let body_bytes = body
.collect()
.await
.expect("Failed to collect body")
.to_bytes();
let mut last_response;
loop {
let client = http_common::new_default_client();
let req = Request::from_parts(
parts.clone(),
http_common::Body::from_bytes(body_bytes.clone()),
);
let res = client.request(req).await;
match res {
Ok(response) => {
if response.status().is_success() {
return Ok(response);
} else {
println!(
"Request failed with status code: {}. Request attempt {attempts} of {max_attempts}",
response.status()
);
last_response = Ok(response);
}
}
Err(e) => {
println!(
"Request failed with error: {e}. Request attempt {attempts} of {max_attempts}"
);
last_response = Err(e)
}
}
if attempts >= max_attempts {
return Ok(last_response?);
}
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
delay_ms *= 2;
attempts += 1;
}
}
pub async fn set_remote_config_response(&self, data: &str, snapshot_token: Option<&str>) {
let uri = self
.get_uri_for_endpoint(SET_REMOTE_CONFIG_RESPONSE_PATH_ENDPOINT, snapshot_token)
.await;
let req = Request::builder()
.method("POST")
.uri(uri)
.body(http_common::Body::from(data.as_bytes().to_vec()))
.expect("Failed to create request");
let res = self
.agent_request_with_retry(req, 5)
.await
.expect("request failed");
let status = res.status();
let body = res
.into_body()
.collect()
.await
.expect("failed to read request body")
.to_bytes();
assert_eq!(
status,
hyper::StatusCode::ACCEPTED,
"Expected status 202 for test agent {}, but got {}: {:?}",
SET_REMOTE_CONFIG_RESPONSE_PATH_ENDPOINT,
status,
String::from_utf8_lossy(&body)
);
}
}