1pub mod send_data_result;
5
6use crate::msgpack_encoder;
7use crate::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryResult};
8use crate::trace_utils::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use anyhow::{anyhow, Context};
11use futures::stream::FuturesUnordered;
12use futures::StreamExt;
13use http::header::CONTENT_TYPE;
14use libdd_common::{
15 header::{
16 APPLICATION_MSGPACK_STR, APPLICATION_PROTOBUF_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR,
17 DATADOG_TRACE_COUNT_STR,
18 },
19 Connect, Endpoint, GenericHttpClient,
20};
21use libdd_trace_protobuf::pb::{AgentPayload, TracerPayload};
22use send_data_result::SendDataResult;
23use std::collections::HashMap;
24#[cfg(feature = "compression")]
25use std::io::Write;
26#[cfg(feature = "compression")]
27use zstd::stream::write::Encoder;
28
/// A batch of tracer payloads bundled with everything needed to deliver it:
/// the destination endpoint, the request headers and the retry policy.
///
/// Instances are created through [`SendData::new`] or [`SendDataBuilder`].
#[derive(Debug)]
pub struct SendData {
    /// The trace payloads that will be serialized and sent.
    pub(crate) tracer_payloads: TracerPayloadCollection,
    /// Caller-supplied size of the data; this is the value reported by
    /// `len()` and tested by `is_empty()`.
    pub(crate) size: usize,
    /// Endpoint requests are sent to when no override endpoint is given.
    target: Endpoint,
    /// Base headers cloned into every outgoing request.
    headers: HashMap<&'static str, String>,
    /// Retry policy handed to `send_with_retry` for every request.
    retry_strategy: RetryStrategy,
    /// Compression applied to protobuf payloads before sending.
    #[cfg(feature = "compression")]
    compression: Compression,
}
76
/// Payload compression choices, available with the `compression` feature.
#[cfg(feature = "compression")]
#[derive(Debug, Clone)]
pub enum Compression {
    /// Compress with zstd; the value is the level passed to the zstd encoder.
    Zstd(i32),
    /// Send the payload uncompressed.
    None,
}
83
84pub struct SendDataBuilder {
85 pub(crate) tracer_payloads: TracerPayloadCollection,
86 pub(crate) size: usize,
87 target: Endpoint,
88 headers: HashMap<&'static str, String>,
89 retry_strategy: RetryStrategy,
90 #[cfg(feature = "compression")]
91 compression: Compression,
92}
93
94impl SendDataBuilder {
95 pub fn new(
96 size: usize,
97 tracer_payload: TracerPayloadCollection,
98 tracer_header_tags: TracerHeaderTags,
99 target: &Endpoint,
100 ) -> SendDataBuilder {
101 let mut headers: HashMap<&'static str, String> = tracer_header_tags.into();
102 headers.insert(DATADOG_SEND_REAL_HTTP_STATUS_STR, "1".to_string());
103 SendDataBuilder {
104 tracer_payloads: tracer_payload,
105 size,
106 target: target.clone(),
107 headers,
108 retry_strategy: RetryStrategy::default(),
109 #[cfg(feature = "compression")]
110 compression: Compression::None,
111 }
112 }
113
114 #[cfg(feature = "compression")]
115 pub fn with_compression(mut self, compression: Compression) -> SendDataBuilder {
116 self.compression = compression;
117 self
118 }
119
120 pub fn with_api_key(mut self, api_key: &str) -> SendDataBuilder {
121 self.target.api_key = Some(api_key.to_string().into());
122 self
123 }
124
125 pub fn with_retry_strategy(mut self, retry_strategy: RetryStrategy) -> SendDataBuilder {
126 self.retry_strategy = retry_strategy;
127 self
128 }
129
130 pub fn build(self) -> SendData {
131 SendData {
132 tracer_payloads: self.tracer_payloads,
133 size: self.size,
134 target: self.target,
135 headers: self.headers,
136 retry_strategy: self.retry_strategy,
137 #[cfg(feature = "compression")]
138 compression: self.compression,
139 }
140 }
141}
142
143impl SendData {
144 #[allow(unused_variables)]
157 pub fn new(
158 size: usize,
159 tracer_payload: TracerPayloadCollection,
160 tracer_header_tags: TracerHeaderTags,
161 target: &Endpoint,
162 ) -> SendData {
163 let mut headers: HashMap<&'static str, String> = tracer_header_tags.into();
164 headers.insert(DATADOG_SEND_REAL_HTTP_STATUS_STR, "1".to_string());
165 SendData {
166 tracer_payloads: tracer_payload,
167 size,
168 target: target.clone(),
169 headers,
170 retry_strategy: RetryStrategy::default(),
171 #[cfg(feature = "compression")]
172 compression: Compression::None,
173 }
174 }
175
    /// Returns the size value this `SendData` was constructed with.
    pub fn len(&self) -> usize {
        self.size
    }
184
185 pub fn is_empty(&self) -> bool {
191 self.size == 0
192 }
193
    /// Returns the endpoint this data is sent to.
    pub fn get_target(&self) -> &Endpoint {
        &self.target
    }
202
    /// Returns the payload collection held by this `SendData`.
    pub fn get_payloads(&self) -> &TracerPayloadCollection {
        &self.tracer_payloads
    }
211
    /// Replaces the retry strategy used by subsequent sends.
    pub fn set_retry_strategy(&mut self, retry_strategy: RetryStrategy) {
        self.retry_strategy = retry_strategy;
    }
220
    /// Sends the payloads to the configured target using `http_client`.
    ///
    /// The wire format is chosen internally: protobuf when the target has an
    /// API key, msgpack otherwise. Returns the aggregated [`SendDataResult`].
    pub async fn send<C: Connect>(&self, http_client: &GenericHttpClient<C>) -> SendDataResult {
        // `None`: no endpoint override, so the configured target is used.
        self.send_internal(http_client, None).await
    }
229
230 async fn send_internal<C: Connect>(
231 &self,
232 http_client: &GenericHttpClient<C>,
233 endpoint: Option<Endpoint>,
234 ) -> SendDataResult {
235 if self.use_protobuf() {
236 self.send_with_protobuf(http_client, endpoint).await
237 } else {
238 self.send_with_msgpack(http_client, endpoint).await
239 }
240 }
241
242 async fn send_payload<C: Connect>(
243 &self,
244 chunks: u64,
245 payload: Vec<u8>,
246 headers: HashMap<&'static str, String>,
247 http_client: &GenericHttpClient<C>,
248 endpoint: Option<&Endpoint>,
249 ) -> (SendWithRetryResult, u64, u64) {
250 #[allow(clippy::unwrap_used)]
251 let payload_len = u64::try_from(payload.len()).unwrap();
252 (
253 send_with_retry(
254 http_client,
255 endpoint.unwrap_or(&self.target),
256 payload,
257 &headers,
258 &self.retry_strategy,
259 )
260 .await,
261 payload_len,
262 chunks,
263 )
264 }
265
    /// Returns `true` when the target endpoint has an API key, which selects
    /// the protobuf send path over msgpack.
    // NOTE(review): presumably an API key means the intake is addressed
    // directly rather than through an agent — confirm.
    fn use_protobuf(&self) -> bool {
        self.target.api_key.is_some()
    }
269
270 #[cfg(feature = "compression")]
271 fn compress_payload(&self, payload: Vec<u8>, headers: &mut HashMap<&str, String>) -> Vec<u8> {
272 match self.compression {
273 Compression::Zstd(level) => {
274 let result = (|| -> std::io::Result<Vec<u8>> {
275 let mut encoder = Encoder::new(Vec::new(), level)?;
276 encoder.write_all(&payload)?;
277 encoder.finish()
278 })();
279
280 match result {
281 Ok(compressed_payload) => {
282 headers.insert("Content-Encoding", "zstd".to_string());
283 compressed_payload
284 }
285 Err(_) => payload,
286 }
287 }
288 _ => payload,
289 }
290 }
291
292 async fn send_with_protobuf<C: Connect>(
293 &self,
294 http_client: &GenericHttpClient<C>,
295 endpoint: Option<Endpoint>,
296 ) -> SendDataResult {
297 let mut result = SendDataResult::default();
298
299 #[allow(clippy::unwrap_used)]
300 let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
301
302 match &self.tracer_payloads {
303 TracerPayloadCollection::V07(payloads) => {
304 let agent_payload = construct_agent_payload(payloads.to_vec());
305 let serialized_trace_payload = match serialize_proto_payload(&agent_payload)
306 .context("Failed to serialize trace agent payload, dropping traces")
307 {
308 Ok(p) => p,
309 Err(e) => return result.error(e),
310 };
311 let mut request_headers = self.headers.clone();
312
313 #[cfg(feature = "compression")]
314 let final_payload =
315 self.compress_payload(serialized_trace_payload, &mut request_headers);
316
317 #[cfg(not(feature = "compression"))]
318 let final_payload = serialized_trace_payload;
319
320 request_headers.insert(CONTENT_TYPE.as_str(), APPLICATION_PROTOBUF_STR.to_string());
321
322 let (response, bytes_sent, chunks) = self
323 .send_payload(
324 chunks,
325 final_payload,
326 request_headers,
327 http_client,
328 endpoint.as_ref(),
329 )
330 .await;
331
332 result.update(response, bytes_sent, chunks);
333
334 result
335 }
336 _ => result,
337 }
338 }
339
340 async fn send_with_msgpack<C: Connect>(
341 &self,
342 http_client: &GenericHttpClient<C>,
343 endpoint: Option<Endpoint>,
344 ) -> SendDataResult {
345 let mut result = SendDataResult::default();
346 let mut futures = FuturesUnordered::new();
347
348 match &self.tracer_payloads {
349 TracerPayloadCollection::V07(payloads) => {
350 for tracer_payload in payloads {
351 #[allow(clippy::unwrap_used)]
352 let chunks = u64::try_from(tracer_payload.chunks.len()).unwrap();
353 let mut headers = self.headers.clone();
354 headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string());
355 headers.insert(CONTENT_TYPE.as_str(), APPLICATION_MSGPACK_STR.to_string());
356
357 let payload = match rmp_serde::to_vec_named(tracer_payload) {
358 Ok(p) => p,
359 Err(e) => return result.error(anyhow!(e)),
360 };
361
362 futures.push(self.send_payload(
363 chunks,
364 payload,
365 headers,
366 http_client,
367 endpoint.as_ref(),
368 ));
369 }
370 }
371 TracerPayloadCollection::V04(payload) => {
372 #[allow(clippy::unwrap_used)]
373 let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
374 let mut headers = self.headers.clone();
375 headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string());
376 headers.insert(CONTENT_TYPE.as_str(), APPLICATION_MSGPACK_STR.to_string());
377
378 let payload = msgpack_encoder::v04::to_vec(payload);
379
380 futures.push(self.send_payload(
381 chunks,
382 payload,
383 headers,
384 http_client,
385 endpoint.as_ref(),
386 ));
387 }
388 TracerPayloadCollection::V05(payload) => {
389 #[allow(clippy::unwrap_used)]
390 let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
391 let mut headers = self.headers.clone();
392 headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string());
393 headers.insert(CONTENT_TYPE.as_str(), APPLICATION_MSGPACK_STR.to_string());
394
395 let payload = match rmp_serde::to_vec(payload) {
396 Ok(p) => p,
397 Err(e) => return result.error(anyhow!(e)),
398 };
399
400 futures.push(self.send_payload(
401 chunks,
402 payload,
403 headers,
404 http_client,
405 endpoint.as_ref(),
406 ));
407 }
408 }
409
410 loop {
411 match futures.next().await {
412 Some((response, payload_len, chunks)) => {
413 result.update(response, payload_len, chunks);
414 if result.last_result.is_err() {
415 return result;
416 }
417 }
418 None => return result,
419 }
420 }
421 }
422}
423
424fn construct_agent_payload(tracer_payloads: Vec<TracerPayload>) -> AgentPayload {
425 AgentPayload {
426 host_name: "".to_string(),
427 env: "".to_string(),
428 agent_version: "".to_string(),
429 error_tps: 60.0,
430 target_tps: 60.0,
431 tags: HashMap::new(),
432 tracer_payloads,
433 rare_sampler_enabled: false,
434 idx_tracer_payloads: Vec::new(),
435 }
436}
437
438fn serialize_proto_payload<T>(payload: &T) -> anyhow::Result<Vec<u8>>
439where
440 T: prost::Message,
441{
442 let mut buf = Vec::with_capacity(payload.encoded_len());
443 payload.encode(&mut buf)?;
444 Ok(buf)
445}
446
447#[cfg(test)]
448mod tests {
449 use super::*;
450 use crate::send_with_retry::{RetryBackoffType, RetryStrategy};
451 use crate::test_utils::create_test_no_alloc_span;
452 use crate::trace_utils::{construct_trace_chunk, construct_tracer_payload, RootSpanTags};
453 use crate::tracer_header_tags::TracerHeaderTags;
454 use httpmock::prelude::*;
455 use httpmock::MockServer;
456 use libdd_common::Endpoint;
457 use libdd_trace_protobuf::pb::Span;
458 use std::collections::HashMap;
459 use std::time::Duration;
460
    /// One second expressed in milliseconds; used as the endpoint `timeout_ms`.
    const ONE_SECOND: u64 = 1_000;
    /// Fixed header tags shared by the tests so mocks can match exact header
    /// values.
    const HEADER_TAGS: TracerHeaderTags = TracerHeaderTags {
        lang: "test-lang",
        lang_version: "2.0",
        lang_interpreter: "interpreter",
        lang_vendor: "vendor",
        tracer_version: "1.0",
        container_id: "id",
        client_computed_top_level: false,
        client_computed_stats: false,
        dropped_p0_traces: 0,
        dropped_p0_spans: 0,
    };
474
475 fn setup_payload(header_tags: &TracerHeaderTags) -> TracerPayload {
476 let root_tags = RootSpanTags {
477 env: "TEST",
478 app_version: "1.0",
479 hostname: "test_bench",
480 runtime_id: "id",
481 };
482
483 let chunk = construct_trace_chunk(vec![Span {
484 service: "test-service".to_string(),
485 name: "test-service-name".to_string(),
486 resource: "test-service-resource".to_string(),
487 trace_id: 111,
488 span_id: 222,
489 parent_id: 333,
490 start: 1,
491 duration: 5,
492 error: 0,
493 meta: HashMap::new(),
494 metrics: HashMap::new(),
495 meta_struct: HashMap::new(),
496 r#type: "".to_string(),
497 span_links: vec![],
498 span_events: vec![],
499 }]);
500
501 construct_tracer_payload(vec![chunk], header_tags, root_tags)
502 }
503
504 fn compute_payload_len(collection: &TracerPayloadCollection) -> usize {
505 match collection {
506 TracerPayloadCollection::V07(payloads) => {
507 let agent_payload = construct_agent_payload(payloads.to_vec());
508 let serialized_trace_payload = serialize_proto_payload(&agent_payload).unwrap();
509 serialized_trace_payload.len()
510 }
511 _ => 0,
512 }
513 }
514
515 fn rmp_compute_payload_len(collection: &TracerPayloadCollection) -> usize {
516 match collection {
517 TracerPayloadCollection::V07(payloads) => {
518 let mut total: usize = 0;
519 for payload in payloads {
520 total += rmp_serde::to_vec_named(payload).unwrap().len();
521 }
522 total
523 }
524 TracerPayloadCollection::V04(payloads) => {
525 msgpack_encoder::v04::to_len(payloads) as usize
526 }
527 TracerPayloadCollection::V05(payloads) => rmp_serde::to_vec(payloads).unwrap().len(),
528 }
529 }
530
531 #[test]
532 fn send_data_new_api_key() {
533 let header_tags = TracerHeaderTags::default();
534
535 let payload = setup_payload(&header_tags);
536 let data = SendData::new(
537 100,
538 TracerPayloadCollection::V07(vec![payload]),
539 HEADER_TAGS,
540 &Endpoint {
541 api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
542 url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
543 timeout_ms: ONE_SECOND,
544 ..Endpoint::default()
545 },
546 );
547
548 assert_eq!(data.size, 100);
549
550 assert_eq!(data.target.api_key.unwrap(), "TEST-KEY");
551 assert_eq!(data.target.url.path(), "/foo/bar");
552 }
553
554 #[test]
555 fn send_data_new_no_api_key() {
556 let header_tags = TracerHeaderTags::default();
557
558 let payload = setup_payload(&header_tags);
559 let data = SendData::new(
560 100,
561 TracerPayloadCollection::V07(vec![payload]),
562 header_tags.clone(),
563 &Endpoint {
564 api_key: None,
565 url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
566 timeout_ms: ONE_SECOND,
567 ..Endpoint::default()
568 },
569 );
570
571 assert_eq!(data.size, 100);
572
573 assert_eq!(data.target.api_key, None);
574 assert_eq!(data.target.url.path(), "/foo/bar");
575
576 for (key, value) in HashMap::from(header_tags) {
577 assert_eq!(data.headers.get(key).unwrap(), &value);
578 }
579 }
580
    /// With an API key set, a single V07 payload is sent as one protobuf
    /// request carrying the `DD-API-KEY` header, and the result counters
    /// reflect one successful request.
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn request_protobuf() {
        let server = MockServer::start_async().await;

        // The mock only matches protobuf requests that carry the API key.
        let mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .header("Content-type", "application/x-protobuf")
                    .header("DD-API-KEY", "TEST-KEY")
                    .path("/");
                then.status(202).body("");
            })
            .await;

        let header_tags = TracerHeaderTags::default();

        let payload = setup_payload(&header_tags);
        let data = SendData::new(
            100,
            TracerPayloadCollection::V07(vec![payload.clone()]),
            header_tags,
            &Endpoint {
                api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
                url: server.url("/").parse::<hyper::Uri>().unwrap(),
                timeout_ms: ONE_SECOND,
                ..Endpoint::default()
            },
        );

        // Expected byte count: size of the protobuf-serialized agent payload.
        let data_payload_len = compute_payload_len(&data.tracer_payloads);
        let client = libdd_common::http_common::new_default_client();
        let res = data.send(&client).await;

        mock.assert_async().await;

        assert_eq!(res.last_result.unwrap().status(), 202);
        assert_eq!(res.errors_timeout, 0);
        assert_eq!(res.errors_network, 0);
        assert_eq!(res.errors_status_code, 0);
        assert_eq!(res.requests_count, 1);
        assert_eq!(res.chunks_sent, 1);
        assert_eq!(res.bytes_sent, data_payload_len as u64);
        assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
    }
626
    /// Two V07 payloads on the protobuf path are combined into a single
    /// request: one request is counted, two chunks are reported as sent.
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn request_protobuf_several_payloads() {
        let server = MockServer::start_async().await;

        let mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .header("Content-type", "application/x-protobuf")
                    .header("DD-API-KEY", "TEST-KEY")
                    .path("/");
                then.status(202).body("");
            })
            .await;

        let header_tags = TracerHeaderTags::default();

        let payload = setup_payload(&header_tags);
        let data = SendData::new(
            100,
            TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
            header_tags,
            &Endpoint {
                api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
                url: server.url("/").parse::<hyper::Uri>().unwrap(),
                timeout_ms: ONE_SECOND,
                ..Endpoint::default()
            },
        );

        let data_payload_len = compute_payload_len(&data.tracer_payloads);
        let client = libdd_common::http_common::new_default_client();
        let res = data.send(&client).await;

        // `assert_async` checks the mock was hit exactly once.
        mock.assert_async().await;

        assert_eq!(res.last_result.unwrap().status(), 202);
        assert_eq!(res.errors_timeout, 0);
        assert_eq!(res.errors_network, 0);
        assert_eq!(res.errors_status_code, 0);
        assert_eq!(res.requests_count, 1);
        assert_eq!(res.chunks_sent, 2);
        assert_eq!(res.bytes_sent, data_payload_len as u64);
        assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
    }
672
    /// Without an API key, a V07 payload is sent as msgpack with the trace
    /// count, the tracer metadata headers and `Datadog-Send-Real-Http-Status`.
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn request_msgpack_v07() {
        let server = MockServer::start_async().await;

        let header_tags = HEADER_TAGS;
        // The mock requires every metadata header to match the fixed tags.
        let mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .header(DATADOG_TRACE_COUNT_STR, "1")
                    .header("Content-type", "application/msgpack")
                    .header("datadog-meta-lang", header_tags.lang)
                    .header(
                        "datadog-meta-lang-interpreter",
                        header_tags.lang_interpreter,
                    )
                    .header("datadog-meta-lang-version", header_tags.lang_version)
                    .header(
                        "datadog-meta-lang-interpreter-vendor",
                        header_tags.lang_vendor,
                    )
                    .header("datadog-meta-tracer-version", header_tags.tracer_version)
                    .header("datadog-container-id", header_tags.container_id)
                    .header("Datadog-Send-Real-Http-Status", "1")
                    .path("/");
                then.status(200).body("");
            })
            .await;

        let header_tags = HEADER_TAGS;

        let payload = setup_payload(&header_tags);
        let data = SendData::new(
            100,
            TracerPayloadCollection::V07(vec![payload.clone()]),
            header_tags,
            &Endpoint {
                api_key: None,
                url: server.url("/").parse::<hyper::Uri>().unwrap(),
                timeout_ms: ONE_SECOND,
                ..Endpoint::default()
            },
        );

        // Expected byte count: msgpack-encoded size of the payload.
        let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
        let client = libdd_common::http_common::new_default_client();
        let res = data.send(&client).await;

        mock.assert_async().await;

        assert_eq!(res.last_result.unwrap().status(), 200);
        assert_eq!(res.errors_timeout, 0);
        assert_eq!(res.errors_network, 0);
        assert_eq!(res.errors_status_code, 0);
        assert_eq!(res.requests_count, 1);
        assert_eq!(res.chunks_sent, 1);
        assert_eq!(res.bytes_sent, data_payload_len as u64);
        assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
    }
732
    /// Without an API key, a V04 collection with one trace is sent as a
    /// single msgpack request with the trace count and metadata headers.
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn request_msgpack_v04() {
        let server = MockServer::start_async().await;

        let header_tags = HEADER_TAGS;
        let mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .header(DATADOG_TRACE_COUNT_STR, "1")
                    .header("Content-type", "application/msgpack")
                    .header("datadog-meta-lang", header_tags.lang)
                    .header(
                        "datadog-meta-lang-interpreter",
                        header_tags.lang_interpreter,
                    )
                    .header("datadog-meta-lang-version", header_tags.lang_version)
                    .header(
                        "datadog-meta-lang-interpreter-vendor",
                        header_tags.lang_vendor,
                    )
                    .header("datadog-meta-tracer-version", header_tags.tracer_version)
                    .header("datadog-container-id", header_tags.container_id)
                    .path("/");
                then.status(200).body("");
            })
            .await;

        let header_tags = HEADER_TAGS;

        // A V04 collection is a list of traces; here one trace with one span.
        let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
        let data = SendData::new(
            100,
            TracerPayloadCollection::V04(vec![trace.clone()]),
            header_tags,
            &Endpoint {
                api_key: None,
                url: server.url("/").parse::<hyper::Uri>().unwrap(),
                timeout_ms: ONE_SECOND,
                ..Endpoint::default()
            },
        );

        let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
        let client = libdd_common::http_common::new_default_client();
        let res = data.send(&client).await;

        mock.assert_async().await;

        assert_eq!(res.last_result.unwrap().status(), 200);
        assert_eq!(res.errors_timeout, 0);
        assert_eq!(res.errors_network, 0);
        assert_eq!(res.errors_status_code, 0);
        assert_eq!(res.requests_count, 1);
        assert_eq!(res.chunks_sent, 1);
        assert_eq!(res.bytes_sent, data_payload_len as u64);
        assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
    }
791
    /// On the msgpack path each V07 payload is its own request: two payloads
    /// produce two calls, two counted requests and two chunks sent.
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn request_msgpack_several_payloads() {
        let server = MockServer::start_async().await;

        let mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .header("Content-type", "application/msgpack")
                    .path("/");
                then.status(200).body("");
            })
            .await;

        let header_tags = TracerHeaderTags::default();

        let payload = setup_payload(&header_tags);
        let data = SendData::new(
            100,
            TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
            header_tags,
            &Endpoint {
                api_key: None,
                url: server.url("/").parse::<hyper::Uri>().unwrap(),
                timeout_ms: ONE_SECOND,
                ..Endpoint::default()
            },
        );

        // Sum of the msgpack-encoded sizes of both payloads.
        let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
        let client = libdd_common::http_common::new_default_client();
        let res = data.send(&client).await;

        mock.assert_calls_async(2).await;

        assert_eq!(res.last_result.unwrap().status(), 200);
        assert_eq!(res.errors_timeout, 0);
        assert_eq!(res.errors_network, 0);
        assert_eq!(res.errors_status_code, 0);
        assert_eq!(res.requests_count, 2);
        assert_eq!(res.chunks_sent, 2);
        assert_eq!(res.bytes_sent, data_payload_len as u64);
        assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64 + 1_u64);
    }
836
    /// A server that always answers 500 yields an `Ok` last result with
    /// status 500, one status-code error, and nothing counted as sent.
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn request_error_status_code() {
        let server = MockServer::start_async().await;

        let mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .header("Content-type", "application/msgpack")
                    .path("/");
                then.status(500).body("");
            })
            .await;

        let payload = setup_payload(&HEADER_TAGS);
        let data = SendData::new(
            100,
            TracerPayloadCollection::V07(vec![payload]),
            HEADER_TAGS,
            &Endpoint {
                api_key: None,
                url: server.url("/").parse::<hyper::Uri>().unwrap(),
                timeout_ms: ONE_SECOND,
                ..Endpoint::default()
            },
        );

        let client = libdd_common::http_common::new_default_client();
        let res = data.send(&client).await;

        // NOTE(review): five calls presumably correspond to the attempt count
        // of `RetryStrategy::default()` — confirm against its definition.
        mock.assert_calls_async(5).await;

        assert!(res.last_result.is_ok());
        assert_eq!(res.last_result.unwrap().status(), 500);
        assert_eq!(res.errors_timeout, 0);
        assert_eq!(res.errors_network, 0);
        assert_eq!(res.errors_status_code, 1);
        assert_eq!(res.requests_count, 5);
        assert_eq!(res.chunks_sent, 0);
        assert_eq!(res.bytes_sent, 0);
        assert_eq!(*res.responses_count_per_code.get(&500).unwrap(), 1_u64);
    }
879
880 #[cfg_attr(miri, ignore)]
881 #[tokio::test]
882 async fn request_error_network() {
883 let payload = setup_payload(&HEADER_TAGS);
885 let data = SendData::new(
886 100,
887 TracerPayloadCollection::V07(vec![payload]),
888 HEADER_TAGS,
889 &Endpoint {
890 api_key: None,
891 url: "http://127.0.0.1:4321/".parse::<hyper::Uri>().unwrap(),
892 timeout_ms: ONE_SECOND,
893 ..Endpoint::default()
894 },
895 );
896
897 let client = libdd_common::http_common::new_default_client();
898 let res = data.send(&client).await;
899
900 assert!(res.last_result.is_err());
901 match std::env::consts::OS {
902 "windows" => {
903 assert_eq!(res.errors_timeout, 1);
907 assert_eq!(res.errors_network, 0);
908 }
909 _ => {
910 assert_eq!(res.errors_timeout, 0);
911 assert_eq!(res.errors_network, 1);
912 }
913 }
914 assert_eq!(res.errors_status_code, 0);
915 assert_eq!(res.requests_count, 5);
916 assert_eq!(res.errors_status_code, 0);
917 assert_eq!(res.chunks_sent, 0);
918 assert_eq!(res.bytes_sent, 0);
919 assert_eq!(res.responses_count_per_code.len(), 0);
920 }
921
    /// A V04 send against a server that answers slower than the endpoint
    /// timeout records one timeout error and nothing counted as sent.
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn request_error_timeout_v04() {
        let server = MockServer::start_async().await;

        let header_tags = HEADER_TAGS;
        let mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .header(DATADOG_TRACE_COUNT_STR, "2")
                    .header("Content-type", "application/msgpack")
                    .header("datadog-meta-lang", header_tags.lang)
                    .header(
                        "datadog-meta-lang-interpreter",
                        header_tags.lang_interpreter,
                    )
                    .header("datadog-meta-lang-version", header_tags.lang_version)
                    .header(
                        "datadog-meta-lang-interpreter-vendor",
                        header_tags.lang_vendor,
                    )
                    .header("datadog-meta-tracer-version", header_tags.tracer_version)
                    .header("datadog-container-id", header_tags.container_id)
                    .path("/");
                // The 500 ms delay exceeds the 200 ms endpoint timeout below.
                then.status(200).body("").delay(Duration::from_millis(500));
            })
            .await;

        let header_tags = HEADER_TAGS;

        let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
        let data = SendData::new(
            100,
            TracerPayloadCollection::V04(vec![trace.clone(), trace.clone()]),
            header_tags,
            &Endpoint {
                api_key: None,
                url: server.url("/").parse::<hyper::Uri>().unwrap(),
                timeout_ms: 200,
                ..Endpoint::default()
            },
        );

        let client = libdd_common::http_common::new_default_client();
        let res = data.send(&client).await;

        // Both traces travel in one V04 request, so the mock sees 5 calls.
        mock.assert_calls_async(5).await;

        assert_eq!(res.errors_timeout, 1);
        assert_eq!(res.errors_network, 0);
        assert_eq!(res.errors_status_code, 0);
        assert_eq!(res.requests_count, 5);
        assert_eq!(res.chunks_sent, 0);
        assert_eq!(res.bytes_sent, 0);
        assert_eq!(res.responses_count_per_code.len(), 0);
    }
978
    /// A V07 send of two payloads against a server that answers slower than
    /// the endpoint timeout: the mock sees ten calls, while the result only
    /// reflects the first payload to fail.
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn request_error_timeout_v07() {
        let server = MockServer::start_async().await;

        let mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .header("Content-type", "application/msgpack")
                    .path("/");
                // The 500 ms delay exceeds the 200 ms endpoint timeout below.
                then.status(200).body("").delay(Duration::from_millis(500));
            })
            .await;

        let header_tags = TracerHeaderTags::default();

        let payload = setup_payload(&header_tags);
        let data = SendData::new(
            100,
            TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
            header_tags,
            &Endpoint {
                api_key: None,
                url: server.url("/").parse::<hyper::Uri>().unwrap(),
                timeout_ms: 200,
                ..Endpoint::default()
            },
        );

        let client = libdd_common::http_common::new_default_client();
        let res = data.send(&client).await;

        // Each V07 payload is its own request, hence 2 x 5 calls at the mock.
        mock.assert_calls_async(10).await;

        // The msgpack sender returns on the first failed outcome, so only
        // one payload's attempts are counted in the result.
        assert_eq!(res.errors_timeout, 1);
        assert_eq!(res.errors_network, 0);
        assert_eq!(res.errors_status_code, 0);
        assert_eq!(res.requests_count, 5);
        assert_eq!(res.chunks_sent, 0);
        assert_eq!(res.bytes_sent, 0);
        assert_eq!(res.responses_count_per_code.len(), 0);
    }
1021
1022 #[test]
1023 fn test_builder() {
1024 let header_tags = HEADER_TAGS;
1025 let payload = setup_payload(&header_tags);
1026 let retry_strategy = RetryStrategy::new(5, 100, RetryBackoffType::Constant, None);
1027
1028 let send_data = SendDataBuilder::new(
1029 100,
1030 TracerPayloadCollection::V07(vec![payload]),
1031 header_tags,
1032 &Endpoint::default(),
1033 )
1034 .with_api_key("TEST-KEY")
1036 .with_retry_strategy(retry_strategy.clone())
1038 .build();
1039
1040 assert_eq!(
1041 send_data.target.api_key,
1042 Some(std::borrow::Cow::Borrowed("TEST-KEY"))
1043 );
1044 assert_eq!(send_data.retry_strategy, retry_strategy);
1045 }
1046}