pub mod send_data_result;
use crate::msgpack_encoder;
use crate::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryResult};
use crate::trace_utils::TracerHeaderTags;
use crate::tracer_payload::TracerPayloadCollection;
use anyhow::{anyhow, Context};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use http::{header::CONTENT_TYPE, HeaderMap, HeaderValue};
use libdd_capabilities::{HttpClientCapability, SleepCapability};
use libdd_common::{
header::{
APPLICATION_MSGPACK, APPLICATION_PROTOBUF, DATADOG_SEND_REAL_HTTP_STATUS,
DATADOG_TRACE_COUNT,
},
Endpoint,
};
use libdd_trace_protobuf::pb::{AgentPayload, TracerPayload};
use send_data_result::SendDataResult;
use std::collections::HashMap;
#[cfg(feature = "compression")]
use std::io::Write;
#[cfg(feature = "compression")]
use zstd::stream::write::Encoder;
/// A batch of tracer payloads bundled with everything needed to send it: the
/// destination endpoint, the base request headers and the retry strategy.
///
/// Instances are created with [`SendData::new`] or through [`SendDataBuilder`].
#[derive(Debug)]
pub struct SendData {
    /// The trace payloads that are serialized and sent.
    pub(crate) tracer_payloads: TracerPayloadCollection,
    /// Caller-supplied size of the batch, reported by [`SendData::len`].
    /// NOTE(review): presumably a byte-size estimate of the payloads — confirm with callers.
    pub(crate) size: usize,
    /// Endpoint requests are sent to unless an override is supplied internally.
    target: Endpoint,
    /// Base headers for every request; per-request headers are added to a clone.
    headers: HeaderMap,
    /// Retry policy handed to `send_with_retry` for every request.
    retry_strategy: RetryStrategy,
    /// Compression applied to the serialized protobuf payload before sending.
    #[cfg(feature = "compression")]
    compression: Compression,
}
/// Compression scheme applied to a serialized trace payload before it is sent.
#[cfg(feature = "compression")]
#[derive(Debug, Clone)]
pub enum Compression {
/// Compress with zstd; the wrapped value is the level passed to the zstd encoder.
Zstd(i32),
/// Send the payload uncompressed.
None,
}
/// Builder for [`SendData`].
///
/// Allows the API key, the retry strategy and (with the `compression` feature)
/// the compression scheme to be customised before the `SendData` is created.
#[derive(Debug)]
pub struct SendDataBuilder {
    /// The trace payloads that will be handed to the built `SendData`.
    pub(crate) tracer_payloads: TracerPayloadCollection,
    /// Caller-supplied size of the batch, copied verbatim into the built `SendData`.
    pub(crate) size: usize,
    /// Destination endpoint; `with_api_key` sets its `api_key`.
    target: Endpoint,
    /// Base headers derived from the tracer header tags.
    headers: HeaderMap,
    /// Retry policy; starts as `RetryStrategy::default()`.
    retry_strategy: RetryStrategy,
    /// Compression scheme; starts as `Compression::None`.
    #[cfg(feature = "compression")]
    compression: Compression,
}
impl SendDataBuilder {
pub fn new(
size: usize,
tracer_payload: TracerPayloadCollection,
tracer_header_tags: TracerHeaderTags,
target: &Endpoint,
) -> SendDataBuilder {
let mut headers: HeaderMap = tracer_header_tags.into();
headers.insert(DATADOG_SEND_REAL_HTTP_STATUS, HeaderValue::from_static("1"));
SendDataBuilder {
tracer_payloads: tracer_payload,
size,
target: target.clone(),
headers,
retry_strategy: RetryStrategy::default(),
#[cfg(feature = "compression")]
compression: Compression::None,
}
}
#[cfg(feature = "compression")]
pub fn with_compression(mut self, compression: Compression) -> SendDataBuilder {
self.compression = compression;
self
}
pub fn with_api_key(mut self, api_key: &str) -> SendDataBuilder {
self.target.api_key = Some(api_key.to_string().into());
self
}
pub fn with_retry_strategy(mut self, retry_strategy: RetryStrategy) -> SendDataBuilder {
self.retry_strategy = retry_strategy;
self
}
pub fn build(self) -> SendData {
SendData {
tracer_payloads: self.tracer_payloads,
size: self.size,
target: self.target,
headers: self.headers,
retry_strategy: self.retry_strategy,
#[cfg(feature = "compression")]
compression: self.compression,
}
}
}
impl SendData {
/// Creates a `SendData` with the default retry strategy and no compression.
///
/// * `size` - size recorded for the batch and later reported by `len()`.
/// * `tracer_payload` - the payloads to send.
/// * `tracer_header_tags` - converted into the base request headers.
/// * `target` - destination endpoint; it is cloned.
///
/// The `DATADOG_SEND_REAL_HTTP_STATUS` header is always set to `"1"` on top of
/// the headers derived from `tracer_header_tags`.
pub fn new(
    size: usize,
    tracer_payload: TracerPayloadCollection,
    tracer_header_tags: TracerHeaderTags,
    target: &Endpoint,
) -> SendData {
    let mut headers: HeaderMap = tracer_header_tags.into();
    headers.insert(DATADOG_SEND_REAL_HTTP_STATUS, HeaderValue::from_static("1"));
    SendData {
        tracer_payloads: tracer_payload,
        size,
        target: target.clone(),
        headers,
        retry_strategy: RetryStrategy::default(),
        #[cfg(feature = "compression")]
        compression: Compression::None,
    }
}
pub fn len(&self) -> usize {
self.size
}
/// Returns `true` when the recorded size is zero.
pub fn is_empty(&self) -> bool {
    self.len() == 0
}
pub fn get_target(&self) -> &Endpoint {
&self.target
}
pub fn get_payloads(&self) -> &TracerPayloadCollection {
&self.tracer_payloads
}
/// Overwrites the retry strategy used for subsequent sends.
pub fn set_retry_strategy(&mut self, retry_strategy: RetryStrategy) {
    let slot = &mut self.retry_strategy;
    *slot = retry_strategy;
}
pub async fn send<C: HttpClientCapability + SleepCapability>(
&self,
capabilities: &C,
) -> SendDataResult {
self.send_internal(capabilities, None).await
}
/// Picks the wire format and sends the payloads.
///
/// Protobuf is used when `use_protobuf()` is true, msgpack otherwise. `endpoint`,
/// when present, is forwarded as the destination override.
async fn send_internal<C: HttpClientCapability + SleepCapability>(
    &self,
    capabilities: &C,
    endpoint: Option<Endpoint>,
) -> SendDataResult {
    if !self.use_protobuf() {
        return self.send_with_msgpack(capabilities, endpoint).await;
    }
    self.send_with_protobuf(capabilities, endpoint).await
}
/// Sends one serialized payload through `send_with_retry`.
///
/// The request goes to `endpoint` when given, otherwise to `self.target`.
/// Returns the retry result together with the payload length in bytes and the
/// `chunks` value passed in, so the caller can fold them into its statistics.
async fn send_payload<C: HttpClientCapability + SleepCapability>(
    &self,
    capabilities: &C,
    chunks: u64,
    payload: Vec<u8>,
    headers: HeaderMap,
    endpoint: Option<&Endpoint>,
) -> (SendWithRetryResult, u64, u64) {
    // Measure the body before `payload` is moved into the request.
    #[allow(clippy::unwrap_used)]
    let payload_len = u64::try_from(payload.len()).unwrap();
    let destination = endpoint.unwrap_or(&self.target);
    let outcome = send_with_retry(
        capabilities,
        destination,
        payload,
        &headers,
        &self.retry_strategy,
    )
    .await;
    (outcome, payload_len, chunks)
}
/// Returns `true` when the target endpoint carries an API key, which selects
/// the protobuf send path.
fn use_protobuf(&self) -> bool {
    self.get_target().api_key.is_some()
}
/// Compresses `payload` according to the configured [`Compression`].
///
/// When zstd compression succeeds, the `Content-Encoding: zstd` header is added
/// to `headers` and the compressed bytes are returned. With `Compression::None`,
/// or when the encoder reports an error, `payload` is returned unchanged and
/// `headers` is left untouched.
#[cfg(feature = "compression")]
fn compress_payload(&self, payload: Vec<u8>, headers: &mut HeaderMap) -> Vec<u8> {
    /// Runs `data` through a zstd encoder at the given `level`.
    fn zstd_encode(data: &[u8], level: i32) -> std::io::Result<Vec<u8>> {
        let mut encoder = Encoder::new(Vec::new(), level)?;
        encoder.write_all(data)?;
        encoder.finish()
    }

    // Exhaustive on purpose: adding a `Compression` variant must fail to compile
    // here instead of silently sending the payload uncompressed.
    match self.compression {
        Compression::Zstd(level) => match zstd_encode(&payload, level) {
            Ok(compressed_payload) => {
                headers.insert(
                    http::header::CONTENT_ENCODING,
                    HeaderValue::from_static("zstd"),
                );
                compressed_payload
            }
            // Best effort: a compression failure must not drop the traces, so the
            // original bytes are sent without a content-encoding header.
            Err(_) => payload,
        },
        Compression::None => payload,
    }
}
/// Sends all V07 payloads as a single protobuf-encoded `AgentPayload`.
///
/// The payloads are cloned into one `AgentPayload`, serialized, optionally
/// compressed (with the `compression` feature) and sent in one request with the
/// protobuf content type. A serialization failure is recorded on the result via
/// `SendDataResult::error`.
///
/// NOTE(review): collections other than `V07` are not sent on this path; the
/// default `SendDataResult` is returned for them — confirm this is intended.
async fn send_with_protobuf<C: HttpClientCapability + SleepCapability>(
    &self,
    capabilities: &C,
    endpoint: Option<Endpoint>,
) -> SendDataResult {
    let mut result = SendDataResult::default();

    #[allow(clippy::unwrap_used)]
    let total_chunks = u64::try_from(self.tracer_payloads.size()).unwrap();

    let payloads = match &self.tracer_payloads {
        TracerPayloadCollection::V07(payloads) => payloads,
        _ => return result,
    };

    let agent_payload = construct_agent_payload(payloads.to_vec());
    let serialized = serialize_proto_payload(&agent_payload)
        .context("Failed to serialize trace agent payload, dropping traces");
    let body = match serialized {
        Ok(bytes) => bytes,
        Err(err) => return result.error(err),
    };

    let mut request_headers = self.headers.clone();
    // Compression may add a content-encoding header, so it runs on the cloned map.
    #[cfg(feature = "compression")]
    let body = self.compress_payload(body, &mut request_headers);
    request_headers.insert(CONTENT_TYPE, APPLICATION_PROTOBUF);

    let (response, bytes_sent, chunks_sent) = self
        .send_payload(
            capabilities,
            total_chunks,
            body,
            request_headers,
            endpoint.as_ref(),
        )
        .await;
    result.update(response, bytes_sent, chunks_sent);
    result
}
async fn send_with_msgpack<C: HttpClientCapability + SleepCapability>(
&self,
capabilities: &C,
endpoint: Option<Endpoint>,
) -> SendDataResult {
let mut result = SendDataResult::default();
let mut futures = FuturesUnordered::new();
match &self.tracer_payloads {
TracerPayloadCollection::V07(payloads) => {
for tracer_payload in payloads {
#[allow(clippy::unwrap_used)]
let chunks = u64::try_from(tracer_payload.chunks.len()).unwrap();
let mut headers = self.headers.clone();
headers.reserve(2);
headers.insert(DATADOG_TRACE_COUNT, chunks.into());
headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
let payload = match rmp_serde::to_vec_named(tracer_payload) {
Ok(p) => p,
Err(e) => return result.error(anyhow!(e)),
};
futures.push(self.send_payload(
capabilities,
chunks,
payload,
headers,
endpoint.as_ref(),
));
}
}
TracerPayloadCollection::V04(payload) => {
#[allow(clippy::unwrap_used)]
let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
let mut headers = self.headers.clone();
headers.reserve(2);
headers.insert(DATADOG_TRACE_COUNT, chunks.into());
headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
let payload = msgpack_encoder::v04::to_vec(payload);
futures.push(self.send_payload(
capabilities,
chunks,
payload,
headers,
endpoint.as_ref(),
));
}
TracerPayloadCollection::V05(payload) => {
#[allow(clippy::unwrap_used)]
let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
let mut headers = self.headers.clone();
headers.reserve(2);
headers.insert(DATADOG_TRACE_COUNT, chunks.into());
headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
let payload = match rmp_serde::to_vec(payload) {
Ok(p) => p,
Err(e) => return result.error(anyhow!(e)),
};
futures.push(self.send_payload(
capabilities,
chunks,
payload,
headers,
endpoint.as_ref(),
));
}
}
loop {
match futures.next().await {
Some((response, payload_len, chunks)) => {
result.update(response, payload_len, chunks);
if result.last_result.is_err() {
return result;
}
}
None => return result,
}
}
}
}
/// Wraps `tracer_payloads` in an [`AgentPayload`].
///
/// Host name, env and agent version are left empty, both TPS fields are set to
/// `60.0`, there are no tags or indexed payloads, and the rare sampler is disabled.
fn construct_agent_payload(tracer_payloads: Vec<TracerPayload>) -> AgentPayload {
    let no_tags = HashMap::new();
    let no_idx_payloads = Vec::new();

    AgentPayload {
        host_name: String::new(),
        env: String::new(),
        agent_version: String::new(),
        error_tps: 60.0,
        target_tps: 60.0,
        tags: no_tags,
        tracer_payloads,
        rare_sampler_enabled: false,
        idx_tracer_payloads: no_idx_payloads,
    }
}
fn serialize_proto_payload<T>(payload: &T) -> anyhow::Result<Vec<u8>>
where
T: prost::Message,
{
let mut buf = Vec::with_capacity(payload.encoded_len());
payload.encode(&mut buf)?;
Ok(buf)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::send_with_retry::{RetryBackoffType, RetryStrategy};
use crate::test_utils::create_test_no_alloc_span;
use crate::trace_utils::{construct_trace_chunk, construct_tracer_payload, TracerPayloadTags};
use crate::tracer_header_tags::TracerHeaderTags;
use httpmock::prelude::*;
use httpmock::MockServer;
use libdd_capabilities::HttpClientCapability;
use libdd_capabilities_impl::NativeCapabilities;
use libdd_common::Endpoint;
use libdd_trace_protobuf::pb::Span;
use std::collections::HashMap;
use std::time::Duration;
/// One second expressed in milliseconds; used as `Endpoint::timeout_ms` in these tests.
const ONE_SECOND: u64 = 1_000;
/// Fixed tracer header tags used by the tests that assert on emitted request headers.
const HEADER_TAGS: TracerHeaderTags = TracerHeaderTags {
lang: "test-lang",
lang_version: "2.0",
lang_interpreter: "interpreter",
lang_vendor: "vendor",
tracer_version: "1.0",
container_id: "id",
client_computed_top_level: false,
client_computed_stats: false,
dropped_p0_traces: 0,
dropped_p0_spans: 0,
};
/// Builds a `TracerPayload` holding one chunk with a single fixed test span.
fn setup_payload(header_tags: &TracerHeaderTags) -> TracerPayload {
    let span = Span {
        service: String::from("test-service"),
        name: String::from("test-service-name"),
        resource: String::from("test-service-resource"),
        trace_id: 111,
        span_id: 222,
        parent_id: 333,
        start: 1,
        duration: 5,
        error: 0,
        meta: HashMap::new(),
        metrics: HashMap::new(),
        meta_struct: HashMap::new(),
        r#type: String::new(),
        span_links: Vec::new(),
        span_events: Vec::new(),
    };
    let chunks = vec![construct_trace_chunk(vec![span])];

    let payload_tags = TracerPayloadTags {
        env: String::from("TEST"),
        app_version: String::from("1.0"),
        hostname: String::from("test_bench"),
        runtime_id: String::from("id"),
    };

    construct_tracer_payload(chunks, header_tags, payload_tags)
}
/// Expected protobuf body size: the length of the serialized `AgentPayload` for
/// a `V07` collection, `0` for any other collection version.
fn compute_payload_len(collection: &TracerPayloadCollection) -> usize {
    if let TracerPayloadCollection::V07(payloads) = collection {
        let agent_payload = construct_agent_payload(payloads.to_vec());
        serialize_proto_payload(&agent_payload).unwrap().len()
    } else {
        0
    }
}
/// Expected total msgpack body size for `collection`.
///
/// `V07` sums the named-msgpack encoding of every payload, `V04` asks the v04
/// encoder for its encoded byte length, and `V05` measures the `rmp_serde` encoding.
fn rmp_compute_payload_len(collection: &TracerPayloadCollection) -> usize {
    match collection {
        TracerPayloadCollection::V07(payloads) => payloads
            .iter()
            .map(|payload| rmp_serde::to_vec_named(payload).unwrap().len())
            .sum(),
        TracerPayloadCollection::V04(payloads) => {
            let encoded_len = msgpack_encoder::v04::to_encoded_byte_len(payloads);
            encoded_len as usize
        }
        TracerPayloadCollection::V05(payloads) => {
            let encoded = rmp_serde::to_vec(payloads).unwrap();
            encoded.len()
        }
    }
}
/// `SendData::new` keeps the size, the endpoint API key and the URL path.
#[test]
fn send_data_new_api_key() {
    let payload = setup_payload(&TracerHeaderTags::default());
    let target = Endpoint {
        api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
        url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
        timeout_ms: ONE_SECOND,
        ..Endpoint::default()
    };

    let data = SendData::new(
        100,
        TracerPayloadCollection::V07(vec![payload]),
        HEADER_TAGS,
        &target,
    );

    assert_eq!(data.size, 100);
    assert_eq!(data.target.api_key.unwrap(), "TEST-KEY");
    assert_eq!(data.target.url.path(), "/foo/bar");
}
/// Without an API key, `SendData::new` keeps the size and URL path and carries
/// every header derived from the tracer header tags.
#[test]
fn send_data_new_no_api_key() {
    let header_tags = TracerHeaderTags::default();
    let payload = setup_payload(&header_tags);
    let expected_headers = HeaderMap::from(header_tags.clone());
    let target = Endpoint {
        api_key: None,
        url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
        timeout_ms: ONE_SECOND,
        ..Endpoint::default()
    };

    let data = SendData::new(
        100,
        TracerPayloadCollection::V07(vec![payload]),
        header_tags,
        &target,
    );

    assert_eq!(data.size, 100);
    assert_eq!(data.target.api_key, None);
    assert_eq!(data.target.url.path(), "/foo/bar");
    expected_headers.iter().for_each(|(key, value)| {
        assert_eq!(data.headers.get(key), Some(value));
    });
}
/// With an API key on the endpoint, a single V07 payload is sent as one protobuf
/// request carrying the `DD-API-KEY` header, and the result reflects one accepted
/// request.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn request_protobuf() {
let server = MockServer::start_async().await;
// The mock only matches protobuf requests that carry the API key.
let mock = server
.mock_async(|when, then| {
when.method(POST)
.header("Content-type", "application/x-protobuf")
.header("DD-API-KEY", "TEST-KEY")
.path("/");
then.status(202).body("");
})
.await;
let header_tags = TracerHeaderTags::default();
let payload = setup_payload(&header_tags);
let data = SendData::new(
100,
TracerPayloadCollection::V07(vec![payload.clone()]),
header_tags,
&Endpoint {
api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
url: server.url("/").parse::<hyper::Uri>().unwrap(),
timeout_ms: ONE_SECOND,
..Endpoint::default()
},
);
// Expected body size is the serialized AgentPayload wrapping the payloads.
let data_payload_len = compute_payload_len(&data.tracer_payloads);
let res = data.send(&NativeCapabilities::new_client()).await;
mock.assert_async().await;
assert_eq!(
res.last_result.unwrap().status(),
http::StatusCode::ACCEPTED
);
assert_eq!(res.errors_timeout, 0);
assert_eq!(res.errors_network, 0);
assert_eq!(res.errors_status_code, 0);
assert_eq!(res.requests_count, 1);
assert_eq!(res.chunks_sent, 1);
assert_eq!(res.bytes_sent, data_payload_len as u64);
assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
}
/// On the protobuf path, two V07 payloads are bundled into a single request:
/// one request is counted while two chunks are reported as sent.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn request_protobuf_several_payloads() {
let server = MockServer::start_async().await;
let mock = server
.mock_async(|when, then| {
when.method(POST)
.header("Content-type", "application/x-protobuf")
.header("DD-API-KEY", "TEST-KEY")
.path("/");
then.status(202).body("");
})
.await;
let header_tags = TracerHeaderTags::default();
let payload = setup_payload(&header_tags);
let data = SendData::new(
100,
TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
header_tags,
&Endpoint {
api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
url: server.url("/").parse::<hyper::Uri>().unwrap(),
timeout_ms: ONE_SECOND,
..Endpoint::default()
},
);
let data_payload_len = compute_payload_len(&data.tracer_payloads);
let res = data.send(&NativeCapabilities::new_client()).await;
// `assert_async` checks the mock was hit exactly once for both payloads.
mock.assert_async().await;
assert_eq!(
res.last_result.unwrap().status(),
http::StatusCode::ACCEPTED
);
assert_eq!(res.errors_timeout, 0);
assert_eq!(res.errors_network, 0);
assert_eq!(res.errors_status_code, 0);
assert_eq!(res.requests_count, 1);
assert_eq!(res.chunks_sent, 2);
assert_eq!(res.bytes_sent, data_payload_len as u64);
assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
}
/// Without an API key, a V07 payload is sent as msgpack with the trace-count
/// header, the tracer metadata headers from `HEADER_TAGS` and the
/// `Datadog-Send-Real-Http-Status` header.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn request_msgpack_v07() {
let server = MockServer::start_async().await;
let header_tags = HEADER_TAGS;
// The mock matches only when every expected header is present on the request.
let mock = server
.mock_async(|when, then| {
when.method(POST)
.header(DATADOG_TRACE_COUNT.as_str(), "1")
.header("Content-type", "application/msgpack")
.header("datadog-meta-lang", header_tags.lang)
.header(
"datadog-meta-lang-interpreter",
header_tags.lang_interpreter,
)
.header("datadog-meta-lang-version", header_tags.lang_version)
.header(
"datadog-meta-lang-interpreter-vendor",
header_tags.lang_vendor,
)
.header("datadog-meta-tracer-version", header_tags.tracer_version)
.header("datadog-container-id", header_tags.container_id)
.header("Datadog-Send-Real-Http-Status", "1")
.path("/");
then.status(200).body("");
})
.await;
let header_tags = HEADER_TAGS;
let payload = setup_payload(&header_tags);
let data = SendData::new(
100,
TracerPayloadCollection::V07(vec![payload.clone()]),
header_tags,
&Endpoint {
api_key: None,
url: server.url("/").parse::<hyper::Uri>().unwrap(),
timeout_ms: ONE_SECOND,
..Endpoint::default()
},
);
let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
let res = data.send(&NativeCapabilities::new_client()).await;
mock.assert_async().await;
assert_eq!(res.last_result.unwrap().status(), http::StatusCode::OK);
assert_eq!(res.errors_timeout, 0);
assert_eq!(res.errors_network, 0);
assert_eq!(res.errors_status_code, 0);
assert_eq!(res.requests_count, 1);
assert_eq!(res.chunks_sent, 1);
assert_eq!(res.bytes_sent, data_payload_len as u64);
assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
}
/// Without an API key, a V04 collection holding one trace is sent as a single
/// msgpack request with the trace-count and tracer metadata headers.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn request_msgpack_v04() {
let server = MockServer::start_async().await;
let header_tags = HEADER_TAGS;
let mock = server
.mock_async(|when, then| {
when.method(POST)
.header(DATADOG_TRACE_COUNT.as_str(), "1")
.header("Content-type", "application/msgpack")
.header("datadog-meta-lang", header_tags.lang)
.header(
"datadog-meta-lang-interpreter",
header_tags.lang_interpreter,
)
.header("datadog-meta-lang-version", header_tags.lang_version)
.header(
"datadog-meta-lang-interpreter-vendor",
header_tags.lang_vendor,
)
.header("datadog-meta-tracer-version", header_tags.tracer_version)
.header("datadog-container-id", header_tags.container_id)
.path("/");
then.status(200).body("");
})
.await;
let header_tags = HEADER_TAGS;
let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
let data = SendData::new(
100,
TracerPayloadCollection::V04(vec![trace.clone()]),
header_tags,
&Endpoint {
api_key: None,
url: server.url("/").parse::<hyper::Uri>().unwrap(),
timeout_ms: ONE_SECOND,
..Endpoint::default()
},
);
// Expected body size comes from the v04 encoder's own byte-length computation.
let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
let res = data.send(&NativeCapabilities::new_client()).await;
mock.assert_async().await;
assert_eq!(res.last_result.unwrap().status(), http::StatusCode::OK);
assert_eq!(res.errors_timeout, 0);
assert_eq!(res.errors_network, 0);
assert_eq!(res.errors_status_code, 0);
assert_eq!(res.requests_count, 1);
assert_eq!(res.chunks_sent, 1);
assert_eq!(res.bytes_sent, data_payload_len as u64);
assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
}
/// On the msgpack path, each V07 payload is sent as its own request: two
/// payloads produce two requests, two chunks and two 200 responses.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn request_msgpack_several_payloads() {
let server = MockServer::start_async().await;
let mock = server
.mock_async(|when, then| {
when.method(POST)
.header("Content-type", "application/msgpack")
.path("/");
then.status(200).body("");
})
.await;
let header_tags = TracerHeaderTags::default();
let payload = setup_payload(&header_tags);
let data = SendData::new(
100,
TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
header_tags,
&Endpoint {
api_key: None,
url: server.url("/").parse::<hyper::Uri>().unwrap(),
timeout_ms: ONE_SECOND,
..Endpoint::default()
},
);
// Sum of the msgpack encodings of both payloads.
let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
let res = data.send(&NativeCapabilities::new_client()).await;
mock.assert_calls_async(2).await;
assert_eq!(res.last_result.unwrap().status(), http::StatusCode::OK);
assert_eq!(res.errors_timeout, 0);
assert_eq!(res.errors_network, 0);
assert_eq!(res.errors_status_code, 0);
assert_eq!(res.requests_count, 2);
assert_eq!(res.chunks_sent, 2);
assert_eq!(res.bytes_sent, data_payload_len as u64);
assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 2_u64);
}
/// A server that always answers 500 is expected to be called five times with the
/// default retry strategy; the result keeps the final 500 response, counts one
/// status-code error and reports nothing as sent.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn request_error_status_code() {
let server = MockServer::start_async().await;
let mock = server
.mock_async(|when, then| {
when.method(POST)
.header("Content-type", "application/msgpack")
.path("/");
then.status(500).body("");
})
.await;
let payload = setup_payload(&HEADER_TAGS);
let data = SendData::new(
100,
TracerPayloadCollection::V07(vec![payload]),
HEADER_TAGS,
&Endpoint {
api_key: None,
url: server.url("/").parse::<hyper::Uri>().unwrap(),
timeout_ms: ONE_SECOND,
..Endpoint::default()
},
);
let res = data.send(&NativeCapabilities::new_client()).await;
mock.assert_calls_async(5).await;
// An HTTP error response is still an `Ok` transport result holding the response.
assert!(res.last_result.is_ok());
assert_eq!(
res.last_result.unwrap().status(),
http::StatusCode::INTERNAL_SERVER_ERROR
);
assert_eq!(res.errors_timeout, 0);
assert_eq!(res.errors_network, 0);
assert_eq!(res.errors_status_code, 1);
assert_eq!(res.requests_count, 5);
assert_eq!(res.chunks_sent, 0);
assert_eq!(res.bytes_sent, 0);
assert_eq!(*res.responses_count_per_code.get(&500).unwrap(), 1_u64);
}
/// Sending to a local address with no mock server behind it ends in an error
/// result after five attempts, with nothing reported as sent.
///
/// The failure is expected to be counted as a timeout on Windows and as a
/// network error on every other OS.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn request_error_network() {
    let payload = setup_payload(&HEADER_TAGS);
    let data = SendData::new(
        100,
        TracerPayloadCollection::V07(vec![payload]),
        HEADER_TAGS,
        &Endpoint {
            api_key: None,
            url: "http://127.0.0.1:4321/".parse::<hyper::Uri>().unwrap(),
            timeout_ms: ONE_SECOND,
            ..Endpoint::default()
        },
    );
    let res = data.send(&NativeCapabilities::new_client()).await;
    assert!(res.last_result.is_err());
    match std::env::consts::OS {
        "windows" => {
            assert_eq!(res.errors_timeout, 1);
            assert_eq!(res.errors_network, 0);
        }
        _ => {
            assert_eq!(res.errors_timeout, 0);
            assert_eq!(res.errors_network, 1);
        }
    }
    assert_eq!(res.errors_status_code, 0);
    assert_eq!(res.requests_count, 5);
    assert_eq!(res.chunks_sent, 0);
    assert_eq!(res.bytes_sent, 0);
    assert_eq!(res.responses_count_per_code.len(), 0);
}
/// A V04 request whose response is delayed past the endpoint timeout is expected
/// to be attempted five times and reported as a single timeout error with
/// nothing sent.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn request_error_timeout_v04() {
let server = MockServer::start_async().await;
let header_tags = HEADER_TAGS;
let mock = server
.mock_async(|when, then| {
when.method(POST)
.header(DATADOG_TRACE_COUNT.as_str(), "2")
.header("Content-type", "application/msgpack")
.header("datadog-meta-lang", header_tags.lang)
.header(
"datadog-meta-lang-interpreter",
header_tags.lang_interpreter,
)
.header("datadog-meta-lang-version", header_tags.lang_version)
.header(
"datadog-meta-lang-interpreter-vendor",
header_tags.lang_vendor,
)
.header("datadog-meta-tracer-version", header_tags.tracer_version)
.header("datadog-container-id", header_tags.container_id)
.path("/");
// The 500 ms delay exceeds the 200 ms endpoint timeout configured below.
then.status(200).body("").delay(Duration::from_millis(500));
})
.await;
let header_tags = HEADER_TAGS;
let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
let data = SendData::new(
100,
TracerPayloadCollection::V04(vec![trace.clone(), trace.clone()]),
header_tags,
&Endpoint {
api_key: None,
url: server.url("/").parse::<hyper::Uri>().unwrap(),
timeout_ms: 200,
..Endpoint::default()
},
);
let res = data.send(&NativeCapabilities::new_client()).await;
mock.assert_calls_async(5).await;
assert_eq!(res.errors_timeout, 1);
assert_eq!(res.errors_network, 0);
assert_eq!(res.errors_status_code, 0);
assert_eq!(res.requests_count, 5);
assert_eq!(res.chunks_sent, 0);
assert_eq!(res.bytes_sent, 0);
assert_eq!(res.responses_count_per_code.len(), 0);
}
/// Two V07 payloads whose responses are delayed past the endpoint timeout: the
/// server is expected to see ten calls, while the result only accounts for five
/// requests and one timeout.
///
/// NOTE(review): presumably both per-payload requests retry concurrently and the
/// msgpack path returns as soon as the first one fails, so only that payload's
/// attempts are folded into the result — confirm against `send_with_msgpack`.
#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn request_error_timeout_v07() {
let server = MockServer::start_async().await;
let mock = server
.mock_async(|when, then| {
when.method(POST)
.header("Content-type", "application/msgpack")
.path("/");
// The 500 ms delay exceeds the 200 ms endpoint timeout configured below.
then.status(200).body("").delay(Duration::from_millis(500));
})
.await;
let header_tags = TracerHeaderTags::default();
let payload = setup_payload(&header_tags);
let data = SendData::new(
100,
TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
header_tags,
&Endpoint {
api_key: None,
url: server.url("/").parse::<hyper::Uri>().unwrap(),
timeout_ms: 200,
..Endpoint::default()
},
);
let res = data.send(&NativeCapabilities::new_client()).await;
mock.assert_calls_async(10).await;
assert_eq!(res.errors_timeout, 1);
assert_eq!(res.errors_network, 0);
assert_eq!(res.errors_status_code, 0);
assert_eq!(res.requests_count, 5);
assert_eq!(res.chunks_sent, 0);
assert_eq!(res.bytes_sent, 0);
assert_eq!(res.responses_count_per_code.len(), 0);
}
/// The builder applies the API key and the custom retry strategy to the built `SendData`.
#[test]
fn test_builder() {
    let retry_strategy = RetryStrategy::new(5, 100, RetryBackoffType::Constant, None);
    let header_tags = HEADER_TAGS;
    let payloads = TracerPayloadCollection::V07(vec![setup_payload(&header_tags)]);
    let target = Endpoint::default();

    let builder = SendDataBuilder::new(100, payloads, header_tags, &target)
        .with_api_key("TEST-KEY")
        .with_retry_strategy(retry_strategy.clone());
    let send_data = builder.build();

    let expected_key = Some(std::borrow::Cow::Borrowed("TEST-KEY"));
    assert_eq!(send_data.target.api_key, expected_key);
    assert_eq!(send_data.retry_strategy, retry_strategy);
}
}