1pub mod send_data_result;
5
6use crate::msgpack_encoder;
7use crate::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryResult};
8use crate::trace_utils::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use anyhow::{anyhow, Context};
11use futures::stream::FuturesUnordered;
12use futures::StreamExt;
13use http::{header::CONTENT_TYPE, HeaderMap, HeaderValue};
14use libdd_common::{
15 header::{
16 APPLICATION_MSGPACK, APPLICATION_PROTOBUF, DATADOG_SEND_REAL_HTTP_STATUS,
17 DATADOG_TRACE_COUNT,
18 },
19 Connect, Endpoint, GenericHttpClient,
20};
21use libdd_trace_protobuf::pb::{AgentPayload, TracerPayload};
22use send_data_result::SendDataResult;
23use std::collections::HashMap;
24#[cfg(feature = "compression")]
25use std::io::Write;
26#[cfg(feature = "compression")]
27use zstd::stream::write::Encoder;
28
29#[derive(Debug)]
30pub struct SendData {
68 pub(crate) tracer_payloads: TracerPayloadCollection,
69 pub(crate) size: usize, target: Endpoint,
71 headers: HeaderMap,
72 retry_strategy: RetryStrategy,
73 #[cfg(feature = "compression")]
74 compression: Compression,
75}
76
/// Compression scheme that can be applied to an outgoing payload.
#[cfg(feature = "compression")]
#[derive(Debug, Clone)]
pub enum Compression {
    /// Compress with zstd; the wrapped value is passed to the encoder as its level.
    Zstd(i32),
    /// Send the payload as-is.
    None,
}
83
84pub struct SendDataBuilder {
85 pub(crate) tracer_payloads: TracerPayloadCollection,
86 pub(crate) size: usize,
87 target: Endpoint,
88 headers: HeaderMap,
89 retry_strategy: RetryStrategy,
90 #[cfg(feature = "compression")]
91 compression: Compression,
92}
93
94impl SendDataBuilder {
95 pub fn new(
96 size: usize,
97 tracer_payload: TracerPayloadCollection,
98 tracer_header_tags: TracerHeaderTags,
99 target: &Endpoint,
100 ) -> SendDataBuilder {
101 let mut headers: HeaderMap = tracer_header_tags.into();
102 headers.insert(DATADOG_SEND_REAL_HTTP_STATUS, HeaderValue::from_static("1"));
103 SendDataBuilder {
104 tracer_payloads: tracer_payload,
105 size,
106 target: target.clone(),
107 headers,
108 retry_strategy: RetryStrategy::default(),
109 #[cfg(feature = "compression")]
110 compression: Compression::None,
111 }
112 }
113
114 #[cfg(feature = "compression")]
115 pub fn with_compression(mut self, compression: Compression) -> SendDataBuilder {
116 self.compression = compression;
117 self
118 }
119
120 pub fn with_api_key(mut self, api_key: &str) -> SendDataBuilder {
121 self.target.api_key = Some(api_key.to_string().into());
122 self
123 }
124
125 pub fn with_retry_strategy(mut self, retry_strategy: RetryStrategy) -> SendDataBuilder {
126 self.retry_strategy = retry_strategy;
127 self
128 }
129
130 pub fn build(self) -> SendData {
131 SendData {
132 tracer_payloads: self.tracer_payloads,
133 size: self.size,
134 target: self.target,
135 headers: self.headers,
136 retry_strategy: self.retry_strategy,
137 #[cfg(feature = "compression")]
138 compression: self.compression,
139 }
140 }
141}
142
143impl SendData {
144 #[allow(unused_variables)]
157 pub fn new(
158 size: usize,
159 tracer_payload: TracerPayloadCollection,
160 tracer_header_tags: TracerHeaderTags,
161 target: &Endpoint,
162 ) -> SendData {
163 let mut headers: HeaderMap = tracer_header_tags.into();
164 headers.insert(DATADOG_SEND_REAL_HTTP_STATUS, HeaderValue::from_static("1"));
165 SendData {
166 tracer_payloads: tracer_payload,
167 size,
168 target: target.clone(),
169 headers,
170 retry_strategy: RetryStrategy::default(),
171 #[cfg(feature = "compression")]
172 compression: Compression::None,
173 }
174 }
175
176 pub fn len(&self) -> usize {
182 self.size
183 }
184
185 pub fn is_empty(&self) -> bool {
191 self.size == 0
192 }
193
194 pub fn get_target(&self) -> &Endpoint {
200 &self.target
201 }
202
203 pub fn get_payloads(&self) -> &TracerPayloadCollection {
209 &self.tracer_payloads
210 }
211
212 pub fn set_retry_strategy(&mut self, retry_strategy: RetryStrategy) {
218 self.retry_strategy = retry_strategy;
219 }
220
221 pub async fn send<C: Connect>(&self, http_client: &GenericHttpClient<C>) -> SendDataResult {
227 self.send_internal(http_client, None).await
228 }
229
230 async fn send_internal<C: Connect>(
231 &self,
232 http_client: &GenericHttpClient<C>,
233 endpoint: Option<Endpoint>,
234 ) -> SendDataResult {
235 if self.use_protobuf() {
236 self.send_with_protobuf(http_client, endpoint).await
237 } else {
238 self.send_with_msgpack(http_client, endpoint).await
239 }
240 }
241
242 async fn send_payload<C: Connect>(
243 &self,
244 chunks: u64,
245 payload: Vec<u8>,
246 headers: HeaderMap,
247 http_client: &GenericHttpClient<C>,
248 endpoint: Option<&Endpoint>,
249 ) -> (SendWithRetryResult, u64, u64) {
250 #[allow(clippy::unwrap_used)]
251 let payload_len = u64::try_from(payload.len()).unwrap();
252 (
253 send_with_retry(
254 http_client,
255 endpoint.unwrap_or(&self.target),
256 payload,
257 &headers,
258 &self.retry_strategy,
259 )
260 .await,
261 payload_len,
262 chunks,
263 )
264 }
265
266 fn use_protobuf(&self) -> bool {
267 self.target.api_key.is_some()
268 }
269
270 #[cfg(feature = "compression")]
271 fn compress_payload(&self, payload: Vec<u8>, headers: &mut HeaderMap) -> Vec<u8> {
272 match self.compression {
273 Compression::Zstd(level) => {
274 let result = (|| -> std::io::Result<Vec<u8>> {
275 let mut encoder = Encoder::new(Vec::new(), level)?;
276 encoder.write_all(&payload)?;
277 encoder.finish()
278 })();
279
280 match result {
281 Ok(compressed_payload) => {
282 headers.insert(
283 http::header::CONTENT_ENCODING,
284 HeaderValue::from_static("zstd"),
285 );
286 compressed_payload
287 }
288 Err(_) => payload,
289 }
290 }
291 _ => payload,
292 }
293 }
294
295 async fn send_with_protobuf<C: Connect>(
296 &self,
297 http_client: &GenericHttpClient<C>,
298 endpoint: Option<Endpoint>,
299 ) -> SendDataResult {
300 let mut result = SendDataResult::default();
301
302 #[allow(clippy::unwrap_used)]
303 let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
304
305 match &self.tracer_payloads {
306 TracerPayloadCollection::V07(payloads) => {
307 let agent_payload = construct_agent_payload(payloads.to_vec());
308 let serialized_trace_payload = match serialize_proto_payload(&agent_payload)
309 .context("Failed to serialize trace agent payload, dropping traces")
310 {
311 Ok(p) => p,
312 Err(e) => return result.error(e),
313 };
314 let mut request_headers = self.headers.clone();
315
316 #[cfg(feature = "compression")]
317 let final_payload =
318 self.compress_payload(serialized_trace_payload, &mut request_headers);
319
320 #[cfg(not(feature = "compression"))]
321 let final_payload = serialized_trace_payload;
322
323 request_headers.insert(CONTENT_TYPE, APPLICATION_PROTOBUF);
324
325 let (response, bytes_sent, chunks) = self
326 .send_payload(
327 chunks,
328 final_payload,
329 request_headers,
330 http_client,
331 endpoint.as_ref(),
332 )
333 .await;
334
335 result.update(response, bytes_sent, chunks);
336
337 result
338 }
339 _ => result,
340 }
341 }
342
343 async fn send_with_msgpack<C: Connect>(
344 &self,
345 http_client: &GenericHttpClient<C>,
346 endpoint: Option<Endpoint>,
347 ) -> SendDataResult {
348 let mut result = SendDataResult::default();
349 let mut futures = FuturesUnordered::new();
350
351 match &self.tracer_payloads {
352 TracerPayloadCollection::V07(payloads) => {
353 for tracer_payload in payloads {
354 #[allow(clippy::unwrap_used)]
355 let chunks = u64::try_from(tracer_payload.chunks.len()).unwrap();
356 let mut headers = self.headers.clone();
357 headers.reserve(2);
358 headers.insert(DATADOG_TRACE_COUNT, chunks.into());
359 headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
360
361 let payload = match rmp_serde::to_vec_named(tracer_payload) {
362 Ok(p) => p,
363 Err(e) => return result.error(anyhow!(e)),
364 };
365
366 futures.push(self.send_payload(
367 chunks,
368 payload,
369 headers,
370 http_client,
371 endpoint.as_ref(),
372 ));
373 }
374 }
375 TracerPayloadCollection::V04(payload) => {
376 #[allow(clippy::unwrap_used)]
377 let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
378 let mut headers = self.headers.clone();
379 headers.reserve(2);
380 headers.insert(DATADOG_TRACE_COUNT, chunks.into());
381 headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
382
383 let payload = msgpack_encoder::v04::to_vec(payload);
384
385 futures.push(self.send_payload(
386 chunks,
387 payload,
388 headers,
389 http_client,
390 endpoint.as_ref(),
391 ));
392 }
393 TracerPayloadCollection::V05(payload) => {
394 #[allow(clippy::unwrap_used)]
395 let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
396 let mut headers = self.headers.clone();
397 headers.reserve(2);
398 headers.insert(DATADOG_TRACE_COUNT, chunks.into());
399 headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
400
401 let payload = match rmp_serde::to_vec(payload) {
402 Ok(p) => p,
403 Err(e) => return result.error(anyhow!(e)),
404 };
405
406 futures.push(self.send_payload(
407 chunks,
408 payload,
409 headers,
410 http_client,
411 endpoint.as_ref(),
412 ));
413 }
414 }
415
416 loop {
417 match futures.next().await {
418 Some((response, payload_len, chunks)) => {
419 result.update(response, payload_len, chunks);
420 if result.last_result.is_err() {
421 return result;
422 }
423 }
424 None => return result,
425 }
426 }
427 }
428}
429
430fn construct_agent_payload(tracer_payloads: Vec<TracerPayload>) -> AgentPayload {
431 AgentPayload {
432 host_name: "".to_string(),
433 env: "".to_string(),
434 agent_version: "".to_string(),
435 error_tps: 60.0,
436 target_tps: 60.0,
437 tags: HashMap::new(),
438 tracer_payloads,
439 rare_sampler_enabled: false,
440 idx_tracer_payloads: Vec::new(),
441 }
442}
443
444fn serialize_proto_payload<T>(payload: &T) -> anyhow::Result<Vec<u8>>
445where
446 T: prost::Message,
447{
448 let mut buf = Vec::with_capacity(payload.encoded_len());
449 payload.encode(&mut buf)?;
450 Ok(buf)
451}
452
453#[cfg(test)]
454mod tests {
455 use super::*;
456 use crate::send_with_retry::{RetryBackoffType, RetryStrategy};
457 use crate::test_utils::create_test_no_alloc_span;
458 use crate::trace_utils::{construct_trace_chunk, construct_tracer_payload, RootSpanTags};
459 use crate::tracer_header_tags::TracerHeaderTags;
460 use httpmock::prelude::*;
461 use httpmock::MockServer;
462 use libdd_common::Endpoint;
463 use libdd_trace_protobuf::pb::Span;
464 use std::collections::HashMap;
465 use std::time::Duration;
466
467 const ONE_SECOND: u64 = 1_000;
468 const HEADER_TAGS: TracerHeaderTags = TracerHeaderTags {
469 lang: "test-lang",
470 lang_version: "2.0",
471 lang_interpreter: "interpreter",
472 lang_vendor: "vendor",
473 tracer_version: "1.0",
474 container_id: "id",
475 client_computed_top_level: false,
476 client_computed_stats: false,
477 dropped_p0_traces: 0,
478 dropped_p0_spans: 0,
479 };
480
481 fn setup_payload(header_tags: &TracerHeaderTags) -> TracerPayload {
482 let root_tags = RootSpanTags {
483 env: "TEST",
484 app_version: "1.0",
485 hostname: "test_bench",
486 runtime_id: "id",
487 };
488
489 let chunk = construct_trace_chunk(vec![Span {
490 service: "test-service".to_string(),
491 name: "test-service-name".to_string(),
492 resource: "test-service-resource".to_string(),
493 trace_id: 111,
494 span_id: 222,
495 parent_id: 333,
496 start: 1,
497 duration: 5,
498 error: 0,
499 meta: HashMap::new(),
500 metrics: HashMap::new(),
501 meta_struct: HashMap::new(),
502 r#type: "".to_string(),
503 span_links: vec![],
504 span_events: vec![],
505 }]);
506
507 construct_tracer_payload(vec![chunk], header_tags, root_tags)
508 }
509
510 fn compute_payload_len(collection: &TracerPayloadCollection) -> usize {
511 match collection {
512 TracerPayloadCollection::V07(payloads) => {
513 let agent_payload = construct_agent_payload(payloads.to_vec());
514 let serialized_trace_payload = serialize_proto_payload(&agent_payload).unwrap();
515 serialized_trace_payload.len()
516 }
517 _ => 0,
518 }
519 }
520
521 fn rmp_compute_payload_len(collection: &TracerPayloadCollection) -> usize {
522 match collection {
523 TracerPayloadCollection::V07(payloads) => {
524 let mut total: usize = 0;
525 for payload in payloads {
526 total += rmp_serde::to_vec_named(payload).unwrap().len();
527 }
528 total
529 }
530 TracerPayloadCollection::V04(payloads) => {
531 msgpack_encoder::v04::to_len(payloads) as usize
532 }
533 TracerPayloadCollection::V05(payloads) => rmp_serde::to_vec(payloads).unwrap().len(),
534 }
535 }
536
537 #[test]
538 fn send_data_new_api_key() {
539 let header_tags = TracerHeaderTags::default();
540
541 let payload = setup_payload(&header_tags);
542 let data = SendData::new(
543 100,
544 TracerPayloadCollection::V07(vec![payload]),
545 HEADER_TAGS,
546 &Endpoint {
547 api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
548 url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
549 timeout_ms: ONE_SECOND,
550 ..Endpoint::default()
551 },
552 );
553
554 assert_eq!(data.size, 100);
555
556 assert_eq!(data.target.api_key.unwrap(), "TEST-KEY");
557 assert_eq!(data.target.url.path(), "/foo/bar");
558 }
559
560 #[test]
561 fn send_data_new_no_api_key() {
562 let header_tags = TracerHeaderTags::default();
563
564 let payload = setup_payload(&header_tags);
565 let data = SendData::new(
566 100,
567 TracerPayloadCollection::V07(vec![payload]),
568 header_tags.clone(),
569 &Endpoint {
570 api_key: None,
571 url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
572 timeout_ms: ONE_SECOND,
573 ..Endpoint::default()
574 },
575 );
576
577 assert_eq!(data.size, 100);
578
579 assert_eq!(data.target.api_key, None);
580 assert_eq!(data.target.url.path(), "/foo/bar");
581
582 for (key, value) in &HeaderMap::from(header_tags) {
583 assert_eq!(data.headers.get(key).unwrap(), value);
584 }
585 }
586
587 #[cfg_attr(miri, ignore)]
588 #[tokio::test]
589 async fn request_protobuf() {
590 let server = MockServer::start_async().await;
591
592 let mock = server
593 .mock_async(|when, then| {
594 when.method(POST)
595 .header("Content-type", "application/x-protobuf")
596 .header("DD-API-KEY", "TEST-KEY")
597 .path("/");
598 then.status(202).body("");
599 })
600 .await;
601
602 let header_tags = TracerHeaderTags::default();
603
604 let payload = setup_payload(&header_tags);
605 let data = SendData::new(
606 100,
607 TracerPayloadCollection::V07(vec![payload.clone()]),
608 header_tags,
609 &Endpoint {
610 api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
611 url: server.url("/").parse::<hyper::Uri>().unwrap(),
612 timeout_ms: ONE_SECOND,
613 ..Endpoint::default()
614 },
615 );
616
617 let data_payload_len = compute_payload_len(&data.tracer_payloads);
618 let client = libdd_common::http_common::new_default_client();
619 let res = data.send(&client).await;
620
621 mock.assert_async().await;
622
623 assert_eq!(res.last_result.unwrap().status(), 202);
624 assert_eq!(res.errors_timeout, 0);
625 assert_eq!(res.errors_network, 0);
626 assert_eq!(res.errors_status_code, 0);
627 assert_eq!(res.requests_count, 1);
628 assert_eq!(res.chunks_sent, 1);
629 assert_eq!(res.bytes_sent, data_payload_len as u64);
630 assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
631 }
632
633 #[cfg_attr(miri, ignore)]
634 #[tokio::test]
635 async fn request_protobuf_several_payloads() {
636 let server = MockServer::start_async().await;
637
638 let mock = server
639 .mock_async(|when, then| {
640 when.method(POST)
641 .header("Content-type", "application/x-protobuf")
642 .header("DD-API-KEY", "TEST-KEY")
643 .path("/");
644 then.status(202).body("");
645 })
646 .await;
647
648 let header_tags = TracerHeaderTags::default();
649
650 let payload = setup_payload(&header_tags);
651 let data = SendData::new(
652 100,
653 TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
654 header_tags,
655 &Endpoint {
656 api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
657 url: server.url("/").parse::<hyper::Uri>().unwrap(),
658 timeout_ms: ONE_SECOND,
659 ..Endpoint::default()
660 },
661 );
662
663 let data_payload_len = compute_payload_len(&data.tracer_payloads);
664 let client = libdd_common::http_common::new_default_client();
665 let res = data.send(&client).await;
666
667 mock.assert_async().await;
668
669 assert_eq!(res.last_result.unwrap().status(), 202);
670 assert_eq!(res.errors_timeout, 0);
671 assert_eq!(res.errors_network, 0);
672 assert_eq!(res.errors_status_code, 0);
673 assert_eq!(res.requests_count, 1);
674 assert_eq!(res.chunks_sent, 2);
675 assert_eq!(res.bytes_sent, data_payload_len as u64);
676 assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
677 }
678
679 #[cfg_attr(miri, ignore)]
680 #[tokio::test]
681 async fn request_msgpack_v07() {
682 let server = MockServer::start_async().await;
683
684 let header_tags = HEADER_TAGS;
685 let mock = server
686 .mock_async(|when, then| {
687 when.method(POST)
688 .header(DATADOG_TRACE_COUNT.as_str(), "1")
689 .header("Content-type", "application/msgpack")
690 .header("datadog-meta-lang", header_tags.lang)
691 .header(
692 "datadog-meta-lang-interpreter",
693 header_tags.lang_interpreter,
694 )
695 .header("datadog-meta-lang-version", header_tags.lang_version)
696 .header(
697 "datadog-meta-lang-interpreter-vendor",
698 header_tags.lang_vendor,
699 )
700 .header("datadog-meta-tracer-version", header_tags.tracer_version)
701 .header("datadog-container-id", header_tags.container_id)
702 .header("Datadog-Send-Real-Http-Status", "1")
703 .path("/");
704 then.status(200).body("");
705 })
706 .await;
707
708 let header_tags = HEADER_TAGS;
709
710 let payload = setup_payload(&header_tags);
711 let data = SendData::new(
712 100,
713 TracerPayloadCollection::V07(vec![payload.clone()]),
714 header_tags,
715 &Endpoint {
716 api_key: None,
717 url: server.url("/").parse::<hyper::Uri>().unwrap(),
718 timeout_ms: ONE_SECOND,
719 ..Endpoint::default()
720 },
721 );
722
723 let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
724 let client = libdd_common::http_common::new_default_client();
725 let res = data.send(&client).await;
726
727 mock.assert_async().await;
728
729 assert_eq!(res.last_result.unwrap().status(), 200);
730 assert_eq!(res.errors_timeout, 0);
731 assert_eq!(res.errors_network, 0);
732 assert_eq!(res.errors_status_code, 0);
733 assert_eq!(res.requests_count, 1);
734 assert_eq!(res.chunks_sent, 1);
735 assert_eq!(res.bytes_sent, data_payload_len as u64);
736 assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
737 }
738
739 #[cfg_attr(miri, ignore)]
740 #[tokio::test]
741 async fn request_msgpack_v04() {
742 let server = MockServer::start_async().await;
743
744 let header_tags = HEADER_TAGS;
745 let mock = server
746 .mock_async(|when, then| {
747 when.method(POST)
748 .header(DATADOG_TRACE_COUNT.as_str(), "1")
749 .header("Content-type", "application/msgpack")
750 .header("datadog-meta-lang", header_tags.lang)
751 .header(
752 "datadog-meta-lang-interpreter",
753 header_tags.lang_interpreter,
754 )
755 .header("datadog-meta-lang-version", header_tags.lang_version)
756 .header(
757 "datadog-meta-lang-interpreter-vendor",
758 header_tags.lang_vendor,
759 )
760 .header("datadog-meta-tracer-version", header_tags.tracer_version)
761 .header("datadog-container-id", header_tags.container_id)
762 .path("/");
763 then.status(200).body("");
764 })
765 .await;
766
767 let header_tags = HEADER_TAGS;
768
769 let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
770 let data = SendData::new(
771 100,
772 TracerPayloadCollection::V04(vec![trace.clone()]),
773 header_tags,
774 &Endpoint {
775 api_key: None,
776 url: server.url("/").parse::<hyper::Uri>().unwrap(),
777 timeout_ms: ONE_SECOND,
778 ..Endpoint::default()
779 },
780 );
781
782 let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
783 let client = libdd_common::http_common::new_default_client();
784 let res = data.send(&client).await;
785
786 mock.assert_async().await;
787
788 assert_eq!(res.last_result.unwrap().status(), 200);
789 assert_eq!(res.errors_timeout, 0);
790 assert_eq!(res.errors_network, 0);
791 assert_eq!(res.errors_status_code, 0);
792 assert_eq!(res.requests_count, 1);
793 assert_eq!(res.chunks_sent, 1);
794 assert_eq!(res.bytes_sent, data_payload_len as u64);
795 assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
796 }
797
798 #[cfg_attr(miri, ignore)]
799 #[tokio::test]
800 async fn request_msgpack_several_payloads() {
801 let server = MockServer::start_async().await;
802
803 let mock = server
804 .mock_async(|when, then| {
805 when.method(POST)
806 .header("Content-type", "application/msgpack")
807 .path("/");
808 then.status(200).body("");
809 })
810 .await;
811
812 let header_tags = TracerHeaderTags::default();
813
814 let payload = setup_payload(&header_tags);
815 let data = SendData::new(
816 100,
817 TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
818 header_tags,
819 &Endpoint {
820 api_key: None,
821 url: server.url("/").parse::<hyper::Uri>().unwrap(),
822 timeout_ms: ONE_SECOND,
823 ..Endpoint::default()
824 },
825 );
826
827 let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
828 let client = libdd_common::http_common::new_default_client();
829 let res = data.send(&client).await;
830
831 mock.assert_calls_async(2).await;
832
833 assert_eq!(res.last_result.unwrap().status(), 200);
834 assert_eq!(res.errors_timeout, 0);
835 assert_eq!(res.errors_network, 0);
836 assert_eq!(res.errors_status_code, 0);
837 assert_eq!(res.requests_count, 2);
838 assert_eq!(res.chunks_sent, 2);
839 assert_eq!(res.bytes_sent, data_payload_len as u64);
840 assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 2_u64);
841 }
842
843 #[cfg_attr(miri, ignore)]
844 #[tokio::test]
845 async fn request_error_status_code() {
846 let server = MockServer::start_async().await;
847
848 let mock = server
849 .mock_async(|when, then| {
850 when.method(POST)
851 .header("Content-type", "application/msgpack")
852 .path("/");
853 then.status(500).body("");
854 })
855 .await;
856
857 let payload = setup_payload(&HEADER_TAGS);
858 let data = SendData::new(
859 100,
860 TracerPayloadCollection::V07(vec![payload]),
861 HEADER_TAGS,
862 &Endpoint {
863 api_key: None,
864 url: server.url("/").parse::<hyper::Uri>().unwrap(),
865 timeout_ms: ONE_SECOND,
866 ..Endpoint::default()
867 },
868 );
869
870 let client = libdd_common::http_common::new_default_client();
871 let res = data.send(&client).await;
872
873 mock.assert_calls_async(5).await;
874
875 assert!(res.last_result.is_ok());
876 assert_eq!(res.last_result.unwrap().status(), 500);
877 assert_eq!(res.errors_timeout, 0);
878 assert_eq!(res.errors_network, 0);
879 assert_eq!(res.errors_status_code, 1);
880 assert_eq!(res.requests_count, 5);
881 assert_eq!(res.chunks_sent, 0);
882 assert_eq!(res.bytes_sent, 0);
883 assert_eq!(*res.responses_count_per_code.get(&500).unwrap(), 1_u64);
884 }
885
886 #[cfg_attr(miri, ignore)]
887 #[tokio::test]
888 async fn request_error_network() {
889 let payload = setup_payload(&HEADER_TAGS);
891 let data = SendData::new(
892 100,
893 TracerPayloadCollection::V07(vec![payload]),
894 HEADER_TAGS,
895 &Endpoint {
896 api_key: None,
897 url: "http://127.0.0.1:4321/".parse::<hyper::Uri>().unwrap(),
898 timeout_ms: ONE_SECOND,
899 ..Endpoint::default()
900 },
901 );
902
903 let client = libdd_common::http_common::new_default_client();
904 let res = data.send(&client).await;
905
906 assert!(res.last_result.is_err());
907 match std::env::consts::OS {
908 "windows" => {
909 assert_eq!(res.errors_timeout, 1);
913 assert_eq!(res.errors_network, 0);
914 }
915 _ => {
916 assert_eq!(res.errors_timeout, 0);
917 assert_eq!(res.errors_network, 1);
918 }
919 }
920 assert_eq!(res.errors_status_code, 0);
921 assert_eq!(res.requests_count, 5);
922 assert_eq!(res.errors_status_code, 0);
923 assert_eq!(res.chunks_sent, 0);
924 assert_eq!(res.bytes_sent, 0);
925 assert_eq!(res.responses_count_per_code.len(), 0);
926 }
927
928 #[cfg_attr(miri, ignore)]
929 #[tokio::test]
930 async fn request_error_timeout_v04() {
931 let server = MockServer::start_async().await;
932
933 let header_tags = HEADER_TAGS;
934 let mock = server
935 .mock_async(|when, then| {
936 when.method(POST)
937 .header(DATADOG_TRACE_COUNT.as_str(), "2")
938 .header("Content-type", "application/msgpack")
939 .header("datadog-meta-lang", header_tags.lang)
940 .header(
941 "datadog-meta-lang-interpreter",
942 header_tags.lang_interpreter,
943 )
944 .header("datadog-meta-lang-version", header_tags.lang_version)
945 .header(
946 "datadog-meta-lang-interpreter-vendor",
947 header_tags.lang_vendor,
948 )
949 .header("datadog-meta-tracer-version", header_tags.tracer_version)
950 .header("datadog-container-id", header_tags.container_id)
951 .path("/");
952 then.status(200).body("").delay(Duration::from_millis(500));
953 })
954 .await;
955
956 let header_tags = HEADER_TAGS;
957
958 let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
959 let data = SendData::new(
960 100,
961 TracerPayloadCollection::V04(vec![trace.clone(), trace.clone()]),
962 header_tags,
963 &Endpoint {
964 api_key: None,
965 url: server.url("/").parse::<hyper::Uri>().unwrap(),
966 timeout_ms: 200,
967 ..Endpoint::default()
968 },
969 );
970
971 let client = libdd_common::http_common::new_default_client();
972 let res = data.send(&client).await;
973
974 mock.assert_calls_async(5).await;
975
976 assert_eq!(res.errors_timeout, 1);
977 assert_eq!(res.errors_network, 0);
978 assert_eq!(res.errors_status_code, 0);
979 assert_eq!(res.requests_count, 5);
980 assert_eq!(res.chunks_sent, 0);
981 assert_eq!(res.bytes_sent, 0);
982 assert_eq!(res.responses_count_per_code.len(), 0);
983 }
984
985 #[cfg_attr(miri, ignore)]
986 #[tokio::test]
987 async fn request_error_timeout_v07() {
988 let server = MockServer::start_async().await;
989
990 let mock = server
991 .mock_async(|when, then| {
992 when.method(POST)
993 .header("Content-type", "application/msgpack")
994 .path("/");
995 then.status(200).body("").delay(Duration::from_millis(500));
996 })
997 .await;
998
999 let header_tags = TracerHeaderTags::default();
1000
1001 let payload = setup_payload(&header_tags);
1002 let data = SendData::new(
1003 100,
1004 TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
1005 header_tags,
1006 &Endpoint {
1007 api_key: None,
1008 url: server.url("/").parse::<hyper::Uri>().unwrap(),
1009 timeout_ms: 200,
1010 ..Endpoint::default()
1011 },
1012 );
1013
1014 let client = libdd_common::http_common::new_default_client();
1015 let res = data.send(&client).await;
1016
1017 mock.assert_calls_async(10).await;
1018
1019 assert_eq!(res.errors_timeout, 1);
1020 assert_eq!(res.errors_network, 0);
1021 assert_eq!(res.errors_status_code, 0);
1022 assert_eq!(res.requests_count, 5);
1023 assert_eq!(res.chunks_sent, 0);
1024 assert_eq!(res.bytes_sent, 0);
1025 assert_eq!(res.responses_count_per_code.len(), 0);
1026 }
1027
1028 #[test]
1029 fn test_builder() {
1030 let header_tags = HEADER_TAGS;
1031 let payload = setup_payload(&header_tags);
1032 let retry_strategy = RetryStrategy::new(5, 100, RetryBackoffType::Constant, None);
1033
1034 let send_data = SendDataBuilder::new(
1035 100,
1036 TracerPayloadCollection::V07(vec![payload]),
1037 header_tags,
1038 &Endpoint::default(),
1039 )
1040 .with_api_key("TEST-KEY")
1042 .with_retry_strategy(retry_strategy.clone())
1044 .build();
1045
1046 assert_eq!(
1047 send_data.target.api_key,
1048 Some(std::borrow::Cow::Borrowed("TEST-KEY"))
1049 );
1050 assert_eq!(send_data.retry_strategy, retry_strategy);
1051 }
1052}