1pub mod send_data_result;
5
6use crate::msgpack_encoder;
7use crate::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryResult};
8use crate::trace_utils::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use anyhow::{anyhow, Context};
11use futures::stream::FuturesUnordered;
12use futures::StreamExt;
13use hyper::header::CONTENT_TYPE;
14use libdd_common::{
15 header::{
16 APPLICATION_MSGPACK_STR, APPLICATION_PROTOBUF_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR,
17 DATADOG_TRACE_COUNT_STR,
18 },
19 Connect, Endpoint, GenericHttpClient,
20};
21use libdd_trace_protobuf::pb::{AgentPayload, TracerPayload};
22use send_data_result::SendDataResult;
23use std::collections::HashMap;
24#[cfg(feature = "compression")]
25use std::io::Write;
26#[cfg(feature = "compression")]
27use zstd::stream::write::Encoder;
28
29#[derive(Debug, Clone)]
30pub struct SendData {
68 pub(crate) tracer_payloads: TracerPayloadCollection,
69 pub(crate) size: usize, target: Endpoint,
71 headers: HashMap<&'static str, String>,
72 retry_strategy: RetryStrategy,
73 #[cfg(feature = "compression")]
74 compression: Compression,
75}
76
/// Compression applied to a serialized payload before it is sent.
#[cfg(feature = "compression")]
#[derive(Clone, Debug)]
pub enum Compression {
    /// Compress with zstd; the value is the level passed to the zstd encoder.
    Zstd(i32),
    /// Send the payload uncompressed.
    None,
}
83
84#[derive(Clone)]
85pub struct SendDataBuilder {
86 pub(crate) tracer_payloads: TracerPayloadCollection,
87 pub(crate) size: usize,
88 target: Endpoint,
89 headers: HashMap<&'static str, String>,
90 retry_strategy: RetryStrategy,
91 #[cfg(feature = "compression")]
92 compression: Compression,
93}
94
95impl SendDataBuilder {
96 pub fn new(
97 size: usize,
98 tracer_payload: TracerPayloadCollection,
99 tracer_header_tags: TracerHeaderTags,
100 target: &Endpoint,
101 ) -> SendDataBuilder {
102 let mut headers: HashMap<&'static str, String> = tracer_header_tags.into();
103 headers.insert(DATADOG_SEND_REAL_HTTP_STATUS_STR, "1".to_string());
104 SendDataBuilder {
105 tracer_payloads: tracer_payload,
106 size,
107 target: target.clone(),
108 headers,
109 retry_strategy: RetryStrategy::default(),
110 #[cfg(feature = "compression")]
111 compression: Compression::None,
112 }
113 }
114
115 #[cfg(feature = "compression")]
116 pub fn with_compression(mut self, compression: Compression) -> SendDataBuilder {
117 self.compression = compression;
118 self
119 }
120
121 pub fn with_api_key(mut self, api_key: &str) -> SendDataBuilder {
122 self.target.api_key = Some(api_key.to_string().into());
123 self
124 }
125
126 pub fn with_retry_strategy(mut self, retry_strategy: RetryStrategy) -> SendDataBuilder {
127 self.retry_strategy = retry_strategy;
128 self
129 }
130
131 pub fn build(self) -> SendData {
132 SendData {
133 tracer_payloads: self.tracer_payloads,
134 size: self.size,
135 target: self.target,
136 headers: self.headers,
137 retry_strategy: self.retry_strategy,
138 #[cfg(feature = "compression")]
139 compression: self.compression,
140 }
141 }
142}
143
144impl SendData {
145 #[allow(unused_variables)]
158 pub fn new(
159 size: usize,
160 tracer_payload: TracerPayloadCollection,
161 tracer_header_tags: TracerHeaderTags,
162 target: &Endpoint,
163 ) -> SendData {
164 let mut headers: HashMap<&'static str, String> = tracer_header_tags.into();
165 headers.insert(DATADOG_SEND_REAL_HTTP_STATUS_STR, "1".to_string());
166 SendData {
167 tracer_payloads: tracer_payload,
168 size,
169 target: target.clone(),
170 headers,
171 retry_strategy: RetryStrategy::default(),
172 #[cfg(feature = "compression")]
173 compression: Compression::None,
174 }
175 }
176
177 pub fn len(&self) -> usize {
183 self.size
184 }
185
186 pub fn is_empty(&self) -> bool {
192 self.size == 0
193 }
194
195 pub fn get_target(&self) -> &Endpoint {
201 &self.target
202 }
203
204 pub fn get_payloads(&self) -> &TracerPayloadCollection {
210 &self.tracer_payloads
211 }
212
213 pub fn set_retry_strategy(&mut self, retry_strategy: RetryStrategy) {
219 self.retry_strategy = retry_strategy;
220 }
221
222 pub fn with_endpoint(&self, endpoint: Endpoint) -> SendData {
228 SendData {
229 target: endpoint,
230 ..self.clone()
231 }
232 }
233
234 pub async fn send<C: Connect>(&self, http_client: &GenericHttpClient<C>) -> SendDataResult {
240 self.send_internal(http_client).await
241 }
242
243 async fn send_internal<C: Connect>(
244 &self,
245 http_client: &GenericHttpClient<C>,
246 ) -> SendDataResult {
247 if self.use_protobuf() {
248 self.send_with_protobuf(http_client).await
249 } else {
250 self.send_with_msgpack(http_client).await
251 }
252 }
253
254 async fn send_payload<C: Connect>(
255 &self,
256 chunks: u64,
257 payload: Vec<u8>,
258 headers: HashMap<&'static str, String>,
259 http_client: &GenericHttpClient<C>,
260 ) -> (SendWithRetryResult, u64, u64) {
261 #[allow(clippy::unwrap_used)]
262 let payload_len = u64::try_from(payload.len()).unwrap();
263 (
264 send_with_retry(
265 http_client,
266 &self.target,
267 payload,
268 &headers,
269 &self.retry_strategy,
270 )
271 .await,
272 payload_len,
273 chunks,
274 )
275 }
276
277 fn use_protobuf(&self) -> bool {
278 self.target.api_key.is_some()
279 }
280
/// Compresses `payload` according to the configured `Compression`.
///
/// With `Compression::Zstd(level)` the payload is zstd-encoded at that level and a
/// `Content-Encoding: zstd` header is added to `headers`. If encoding fails, or the
/// setting is `Compression::None`, the payload is returned unchanged and no header
/// is added.
#[cfg(feature = "compression")]
fn compress_payload(&self, payload: Vec<u8>, headers: &mut HashMap<&str, String>) -> Vec<u8> {
    /// Runs the whole zstd encode so any I/O error surfaces as one `Result`.
    fn zstd_encode(raw: &[u8], level: i32) -> std::io::Result<Vec<u8>> {
        let mut encoder = Encoder::new(Vec::new(), level)?;
        encoder.write_all(raw)?;
        encoder.finish()
    }

    let level = match self.compression {
        Compression::Zstd(level) => level,
        Compression::None => return payload,
    };

    if let Ok(compressed_payload) = zstd_encode(&payload, level) {
        headers.insert("Content-Encoding", "zstd".to_string());
        compressed_payload
    } else {
        // Best effort: fall back to the uncompressed bytes.
        payload
    }
}
302
303 async fn send_with_protobuf<C: Connect>(
304 &self,
305 http_client: &GenericHttpClient<C>,
306 ) -> SendDataResult {
307 let mut result = SendDataResult::default();
308
309 #[allow(clippy::unwrap_used)]
310 let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
311
312 match &self.tracer_payloads {
313 TracerPayloadCollection::V07(payloads) => {
314 let agent_payload = construct_agent_payload(payloads.to_vec());
315 let serialized_trace_payload = match serialize_proto_payload(&agent_payload)
316 .context("Failed to serialize trace agent payload, dropping traces")
317 {
318 Ok(p) => p,
319 Err(e) => return result.error(e),
320 };
321 let mut request_headers = self.headers.clone();
322
323 #[cfg(feature = "compression")]
324 let final_payload =
325 self.compress_payload(serialized_trace_payload, &mut request_headers);
326
327 #[cfg(not(feature = "compression"))]
328 let final_payload = serialized_trace_payload;
329
330 request_headers.insert(CONTENT_TYPE.as_str(), APPLICATION_PROTOBUF_STR.to_string());
331
332 let (response, bytes_sent, chunks) = self
333 .send_payload(chunks, final_payload, request_headers, http_client)
334 .await;
335
336 result.update(response, bytes_sent, chunks);
337
338 result
339 }
340 _ => result,
341 }
342 }
343
344 async fn send_with_msgpack<C: Connect>(
345 &self,
346 http_client: &GenericHttpClient<C>,
347 ) -> SendDataResult {
348 let mut result = SendDataResult::default();
349 let mut futures = FuturesUnordered::new();
350
351 match &self.tracer_payloads {
352 TracerPayloadCollection::V07(payloads) => {
353 for tracer_payload in payloads {
354 #[allow(clippy::unwrap_used)]
355 let chunks = u64::try_from(tracer_payload.chunks.len()).unwrap();
356 let mut headers = self.headers.clone();
357 headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string());
358 headers.insert(CONTENT_TYPE.as_str(), APPLICATION_MSGPACK_STR.to_string());
359
360 let payload = match rmp_serde::to_vec_named(tracer_payload) {
361 Ok(p) => p,
362 Err(e) => return result.error(anyhow!(e)),
363 };
364
365 futures.push(self.send_payload(chunks, payload, headers, http_client));
366 }
367 }
368 TracerPayloadCollection::V04(payload) => {
369 #[allow(clippy::unwrap_used)]
370 let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
371 let mut headers = self.headers.clone();
372 headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string());
373 headers.insert(CONTENT_TYPE.as_str(), APPLICATION_MSGPACK_STR.to_string());
374
375 let payload = msgpack_encoder::v04::to_vec(payload);
376
377 futures.push(self.send_payload(chunks, payload, headers, http_client));
378 }
379 TracerPayloadCollection::V05(payload) => {
380 #[allow(clippy::unwrap_used)]
381 let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
382 let mut headers = self.headers.clone();
383 headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string());
384 headers.insert(CONTENT_TYPE.as_str(), APPLICATION_MSGPACK_STR.to_string());
385
386 let payload = match rmp_serde::to_vec(payload) {
387 Ok(p) => p,
388 Err(e) => return result.error(anyhow!(e)),
389 };
390
391 futures.push(self.send_payload(chunks, payload, headers, http_client));
392 }
393 }
394
395 loop {
396 match futures.next().await {
397 Some((response, payload_len, chunks)) => {
398 result.update(response, payload_len, chunks);
399 if result.last_result.is_err() {
400 return result;
401 }
402 }
403 None => return result,
404 }
405 }
406 }
407}
408
409fn construct_agent_payload(tracer_payloads: Vec<TracerPayload>) -> AgentPayload {
410 AgentPayload {
411 host_name: "".to_string(),
412 env: "".to_string(),
413 agent_version: "".to_string(),
414 error_tps: 60.0,
415 target_tps: 60.0,
416 tags: HashMap::new(),
417 tracer_payloads,
418 rare_sampler_enabled: false,
419 idx_tracer_payloads: Vec::new(),
420 }
421}
422
423fn serialize_proto_payload<T>(payload: &T) -> anyhow::Result<Vec<u8>>
424where
425 T: prost::Message,
426{
427 let mut buf = Vec::with_capacity(payload.encoded_len());
428 payload.encode(&mut buf)?;
429 Ok(buf)
430}
431
432#[cfg(test)]
433mod tests {
434 use super::*;
435 use crate::send_with_retry::{RetryBackoffType, RetryStrategy};
436 use crate::test_utils::create_test_no_alloc_span;
437 use crate::trace_utils::{construct_trace_chunk, construct_tracer_payload, RootSpanTags};
438 use crate::tracer_header_tags::TracerHeaderTags;
439 use httpmock::prelude::*;
440 use httpmock::MockServer;
441 use libdd_common::Endpoint;
442 use libdd_trace_protobuf::pb::Span;
443 use std::collections::HashMap;
444 use std::time::Duration;
445
446 const ONE_SECOND: u64 = 1_000;
447 const HEADER_TAGS: TracerHeaderTags = TracerHeaderTags {
448 lang: "test-lang",
449 lang_version: "2.0",
450 lang_interpreter: "interpreter",
451 lang_vendor: "vendor",
452 tracer_version: "1.0",
453 container_id: "id",
454 client_computed_top_level: false,
455 client_computed_stats: false,
456 dropped_p0_traces: 0,
457 dropped_p0_spans: 0,
458 };
459
460 fn setup_payload(header_tags: &TracerHeaderTags) -> TracerPayload {
461 let root_tags = RootSpanTags {
462 env: "TEST",
463 app_version: "1.0",
464 hostname: "test_bench",
465 runtime_id: "id",
466 };
467
468 let chunk = construct_trace_chunk(vec![Span {
469 service: "test-service".to_string(),
470 name: "test-service-name".to_string(),
471 resource: "test-service-resource".to_string(),
472 trace_id: 111,
473 span_id: 222,
474 parent_id: 333,
475 start: 1,
476 duration: 5,
477 error: 0,
478 meta: HashMap::new(),
479 metrics: HashMap::new(),
480 meta_struct: HashMap::new(),
481 r#type: "".to_string(),
482 span_links: vec![],
483 span_events: vec![],
484 }]);
485
486 construct_tracer_payload(vec![chunk], header_tags, root_tags)
487 }
488
489 fn compute_payload_len(collection: &TracerPayloadCollection) -> usize {
490 match collection {
491 TracerPayloadCollection::V07(payloads) => {
492 let agent_payload = construct_agent_payload(payloads.to_vec());
493 let serialized_trace_payload = serialize_proto_payload(&agent_payload).unwrap();
494 serialized_trace_payload.len()
495 }
496 _ => 0,
497 }
498 }
499
500 fn rmp_compute_payload_len(collection: &TracerPayloadCollection) -> usize {
501 match collection {
502 TracerPayloadCollection::V07(payloads) => {
503 let mut total: usize = 0;
504 for payload in payloads {
505 total += rmp_serde::to_vec_named(payload).unwrap().len();
506 }
507 total
508 }
509 TracerPayloadCollection::V04(payloads) => {
510 msgpack_encoder::v04::to_len(payloads) as usize
511 }
512 TracerPayloadCollection::V05(payloads) => rmp_serde::to_vec(payloads).unwrap().len(),
513 }
514 }
515
/// `SendData::new` keeps the size and the endpoint, including its API key.
#[test]
fn send_data_new_api_key() {
    let payload = setup_payload(&TracerHeaderTags::default());
    let endpoint = Endpoint {
        api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
        url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
        timeout_ms: ONE_SECOND,
        ..Endpoint::default()
    };

    let data = SendData::new(
        100,
        TracerPayloadCollection::V07(vec![payload]),
        HEADER_TAGS,
        &endpoint,
    );

    assert_eq!(data.size, 100);

    assert_eq!(data.target.api_key.unwrap(), "TEST-KEY");
    assert_eq!(data.target.url.path(), "/foo/bar");
}
538
/// Without an API key, `SendData::new` keeps the endpoint as is and copies every
/// header tag into the request headers.
#[test]
fn send_data_new_no_api_key() {
    let header_tags = TracerHeaderTags::default();
    let endpoint = Endpoint {
        api_key: None,
        url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
        timeout_ms: ONE_SECOND,
        ..Endpoint::default()
    };

    let data = SendData::new(
        100,
        TracerPayloadCollection::V07(vec![setup_payload(&header_tags)]),
        header_tags.clone(),
        &endpoint,
    );

    assert_eq!(data.size, 100);

    assert_eq!(data.target.api_key, None);
    assert_eq!(data.target.url.path(), "/foo/bar");

    let expected_headers: HashMap<&'static str, String> = HashMap::from(header_tags);
    for (key, value) in expected_headers {
        assert_eq!(data.headers.get(key).unwrap(), &value);
    }
}
565
566 #[cfg_attr(miri, ignore)]
567 #[tokio::test]
568 async fn request_protobuf() {
569 let server = MockServer::start_async().await;
570
571 let mock = server
572 .mock_async(|when, then| {
573 when.method(POST)
574 .header("Content-type", "application/x-protobuf")
575 .header("DD-API-KEY", "TEST-KEY")
576 .path("/");
577 then.status(202).body("");
578 })
579 .await;
580
581 let header_tags = TracerHeaderTags::default();
582
583 let payload = setup_payload(&header_tags);
584 let data = SendData::new(
585 100,
586 TracerPayloadCollection::V07(vec![payload.clone()]),
587 header_tags,
588 &Endpoint {
589 api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
590 url: server.url("/").parse::<hyper::Uri>().unwrap(),
591 timeout_ms: ONE_SECOND,
592 ..Endpoint::default()
593 },
594 );
595
596 let data_payload_len = compute_payload_len(&data.tracer_payloads);
597 let client = libdd_common::hyper_migration::new_default_client();
598 let res = data.send(&client).await;
599
600 mock.assert_async().await;
601
602 assert_eq!(res.last_result.unwrap().status(), 202);
603 assert_eq!(res.errors_timeout, 0);
604 assert_eq!(res.errors_network, 0);
605 assert_eq!(res.errors_status_code, 0);
606 assert_eq!(res.requests_count, 1);
607 assert_eq!(res.chunks_sent, 1);
608 assert_eq!(res.bytes_sent, data_payload_len as u64);
609 assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
610 }
611
612 #[cfg_attr(miri, ignore)]
613 #[tokio::test]
614 async fn request_protobuf_several_payloads() {
615 let server = MockServer::start_async().await;
616
617 let mock = server
618 .mock_async(|when, then| {
619 when.method(POST)
620 .header("Content-type", "application/x-protobuf")
621 .header("DD-API-KEY", "TEST-KEY")
622 .path("/");
623 then.status(202).body("");
624 })
625 .await;
626
627 let header_tags = TracerHeaderTags::default();
628
629 let payload = setup_payload(&header_tags);
630 let data = SendData::new(
631 100,
632 TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
633 header_tags,
634 &Endpoint {
635 api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
636 url: server.url("/").parse::<hyper::Uri>().unwrap(),
637 timeout_ms: ONE_SECOND,
638 ..Endpoint::default()
639 },
640 );
641
642 let data_payload_len = compute_payload_len(&data.tracer_payloads);
643 let client = libdd_common::hyper_migration::new_default_client();
644 let res = data.send(&client).await;
645
646 mock.assert_async().await;
647
648 assert_eq!(res.last_result.unwrap().status(), 202);
649 assert_eq!(res.errors_timeout, 0);
650 assert_eq!(res.errors_network, 0);
651 assert_eq!(res.errors_status_code, 0);
652 assert_eq!(res.requests_count, 1);
653 assert_eq!(res.chunks_sent, 2);
654 assert_eq!(res.bytes_sent, data_payload_len as u64);
655 assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
656 }
657
658 #[cfg_attr(miri, ignore)]
659 #[tokio::test]
660 async fn request_msgpack_v07() {
661 let server = MockServer::start_async().await;
662
663 let header_tags = HEADER_TAGS;
664 let mock = server
665 .mock_async(|when, then| {
666 when.method(POST)
667 .header(DATADOG_TRACE_COUNT_STR, "1")
668 .header("Content-type", "application/msgpack")
669 .header("datadog-meta-lang", header_tags.lang)
670 .header(
671 "datadog-meta-lang-interpreter",
672 header_tags.lang_interpreter,
673 )
674 .header("datadog-meta-lang-version", header_tags.lang_version)
675 .header(
676 "datadog-meta-lang-interpreter-vendor",
677 header_tags.lang_vendor,
678 )
679 .header("datadog-meta-tracer-version", header_tags.tracer_version)
680 .header("datadog-container-id", header_tags.container_id)
681 .header("Datadog-Send-Real-Http-Status", "1")
682 .path("/");
683 then.status(200).body("");
684 })
685 .await;
686
687 let header_tags = HEADER_TAGS;
688
689 let payload = setup_payload(&header_tags);
690 let data = SendData::new(
691 100,
692 TracerPayloadCollection::V07(vec![payload.clone()]),
693 header_tags,
694 &Endpoint {
695 api_key: None,
696 url: server.url("/").parse::<hyper::Uri>().unwrap(),
697 timeout_ms: ONE_SECOND,
698 ..Endpoint::default()
699 },
700 );
701
702 let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
703 let client = libdd_common::hyper_migration::new_default_client();
704 let res = data.send(&client).await;
705
706 mock.assert_async().await;
707
708 assert_eq!(res.last_result.unwrap().status(), 200);
709 assert_eq!(res.errors_timeout, 0);
710 assert_eq!(res.errors_network, 0);
711 assert_eq!(res.errors_status_code, 0);
712 assert_eq!(res.requests_count, 1);
713 assert_eq!(res.chunks_sent, 1);
714 assert_eq!(res.bytes_sent, data_payload_len as u64);
715 assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
716 }
717
718 #[cfg_attr(miri, ignore)]
719 #[tokio::test]
720 async fn request_msgpack_v04() {
721 let server = MockServer::start_async().await;
722
723 let header_tags = HEADER_TAGS;
724 let mock = server
725 .mock_async(|when, then| {
726 when.method(POST)
727 .header(DATADOG_TRACE_COUNT_STR, "1")
728 .header("Content-type", "application/msgpack")
729 .header("datadog-meta-lang", header_tags.lang)
730 .header(
731 "datadog-meta-lang-interpreter",
732 header_tags.lang_interpreter,
733 )
734 .header("datadog-meta-lang-version", header_tags.lang_version)
735 .header(
736 "datadog-meta-lang-interpreter-vendor",
737 header_tags.lang_vendor,
738 )
739 .header("datadog-meta-tracer-version", header_tags.tracer_version)
740 .header("datadog-container-id", header_tags.container_id)
741 .path("/");
742 then.status(200).body("");
743 })
744 .await;
745
746 let header_tags = HEADER_TAGS;
747
748 let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
749 let data = SendData::new(
750 100,
751 TracerPayloadCollection::V04(vec![trace.clone()]),
752 header_tags,
753 &Endpoint {
754 api_key: None,
755 url: server.url("/").parse::<hyper::Uri>().unwrap(),
756 timeout_ms: ONE_SECOND,
757 ..Endpoint::default()
758 },
759 );
760
761 let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
762 let client = libdd_common::hyper_migration::new_default_client();
763 let res = data.send(&client).await;
764
765 mock.assert_async().await;
766
767 assert_eq!(res.last_result.unwrap().status(), 200);
768 assert_eq!(res.errors_timeout, 0);
769 assert_eq!(res.errors_network, 0);
770 assert_eq!(res.errors_status_code, 0);
771 assert_eq!(res.requests_count, 1);
772 assert_eq!(res.chunks_sent, 1);
773 assert_eq!(res.bytes_sent, data_payload_len as u64);
774 assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
775 }
776
777 #[cfg_attr(miri, ignore)]
778 #[tokio::test]
779 async fn request_msgpack_several_payloads() {
780 let server = MockServer::start_async().await;
781
782 let mock = server
783 .mock_async(|when, then| {
784 when.method(POST)
785 .header("Content-type", "application/msgpack")
786 .path("/");
787 then.status(200).body("");
788 })
789 .await;
790
791 let header_tags = TracerHeaderTags::default();
792
793 let payload = setup_payload(&header_tags);
794 let data = SendData::new(
795 100,
796 TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
797 header_tags,
798 &Endpoint {
799 api_key: None,
800 url: server.url("/").parse::<hyper::Uri>().unwrap(),
801 timeout_ms: ONE_SECOND,
802 ..Endpoint::default()
803 },
804 );
805
806 let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
807 let client = libdd_common::hyper_migration::new_default_client();
808 let res = data.send(&client).await;
809
810 mock.assert_calls_async(2).await;
811
812 assert_eq!(res.last_result.unwrap().status(), 200);
813 assert_eq!(res.errors_timeout, 0);
814 assert_eq!(res.errors_network, 0);
815 assert_eq!(res.errors_status_code, 0);
816 assert_eq!(res.requests_count, 2);
817 assert_eq!(res.chunks_sent, 2);
818 assert_eq!(res.bytes_sent, data_payload_len as u64);
819 assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 2_u64);
820 }
821
822 #[cfg_attr(miri, ignore)]
823 #[tokio::test]
824 async fn request_error_status_code() {
825 let server = MockServer::start_async().await;
826
827 let mock = server
828 .mock_async(|when, then| {
829 when.method(POST)
830 .header("Content-type", "application/msgpack")
831 .path("/");
832 then.status(500).body("");
833 })
834 .await;
835
836 let payload = setup_payload(&HEADER_TAGS);
837 let data = SendData::new(
838 100,
839 TracerPayloadCollection::V07(vec![payload]),
840 HEADER_TAGS,
841 &Endpoint {
842 api_key: None,
843 url: server.url("/").parse::<hyper::Uri>().unwrap(),
844 timeout_ms: ONE_SECOND,
845 ..Endpoint::default()
846 },
847 );
848
849 let client = libdd_common::hyper_migration::new_default_client();
850 let res = data.send(&client).await;
851
852 mock.assert_calls_async(5).await;
853
854 assert!(res.last_result.is_ok());
855 assert_eq!(res.last_result.unwrap().status(), 500);
856 assert_eq!(res.errors_timeout, 0);
857 assert_eq!(res.errors_network, 0);
858 assert_eq!(res.errors_status_code, 1);
859 assert_eq!(res.requests_count, 5);
860 assert_eq!(res.chunks_sent, 0);
861 assert_eq!(res.bytes_sent, 0);
862 assert_eq!(*res.responses_count_per_code.get(&500).unwrap(), 1_u64);
863 }
864
865 #[cfg_attr(miri, ignore)]
866 #[tokio::test]
867 async fn request_error_network() {
868 let payload = setup_payload(&HEADER_TAGS);
870 let data = SendData::new(
871 100,
872 TracerPayloadCollection::V07(vec![payload]),
873 HEADER_TAGS,
874 &Endpoint {
875 api_key: None,
876 url: "http://127.0.0.1:4321/".parse::<hyper::Uri>().unwrap(),
877 timeout_ms: ONE_SECOND,
878 ..Endpoint::default()
879 },
880 );
881
882 let client = libdd_common::hyper_migration::new_default_client();
883 let res = data.send(&client).await;
884
885 assert!(res.last_result.is_err());
886 match std::env::consts::OS {
887 "windows" => {
888 assert_eq!(res.errors_timeout, 1);
892 assert_eq!(res.errors_network, 0);
893 }
894 _ => {
895 assert_eq!(res.errors_timeout, 0);
896 assert_eq!(res.errors_network, 1);
897 }
898 }
899 assert_eq!(res.errors_status_code, 0);
900 assert_eq!(res.requests_count, 5);
901 assert_eq!(res.errors_status_code, 0);
902 assert_eq!(res.chunks_sent, 0);
903 assert_eq!(res.bytes_sent, 0);
904 assert_eq!(res.responses_count_per_code.len(), 0);
905 }
906
907 #[cfg_attr(miri, ignore)]
908 #[tokio::test]
909 async fn request_error_timeout_v04() {
910 let server = MockServer::start_async().await;
911
912 let header_tags = HEADER_TAGS;
913 let mock = server
914 .mock_async(|when, then| {
915 when.method(POST)
916 .header(DATADOG_TRACE_COUNT_STR, "2")
917 .header("Content-type", "application/msgpack")
918 .header("datadog-meta-lang", header_tags.lang)
919 .header(
920 "datadog-meta-lang-interpreter",
921 header_tags.lang_interpreter,
922 )
923 .header("datadog-meta-lang-version", header_tags.lang_version)
924 .header(
925 "datadog-meta-lang-interpreter-vendor",
926 header_tags.lang_vendor,
927 )
928 .header("datadog-meta-tracer-version", header_tags.tracer_version)
929 .header("datadog-container-id", header_tags.container_id)
930 .path("/");
931 then.status(200).body("").delay(Duration::from_millis(500));
932 })
933 .await;
934
935 let header_tags = HEADER_TAGS;
936
937 let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
938 let data = SendData::new(
939 100,
940 TracerPayloadCollection::V04(vec![trace.clone(), trace.clone()]),
941 header_tags,
942 &Endpoint {
943 api_key: None,
944 url: server.url("/").parse::<hyper::Uri>().unwrap(),
945 timeout_ms: 200,
946 ..Endpoint::default()
947 },
948 );
949
950 let client = libdd_common::hyper_migration::new_default_client();
951 let res = data.send(&client).await;
952
953 mock.assert_calls_async(5).await;
954
955 assert_eq!(res.errors_timeout, 1);
956 assert_eq!(res.errors_network, 0);
957 assert_eq!(res.errors_status_code, 0);
958 assert_eq!(res.requests_count, 5);
959 assert_eq!(res.chunks_sent, 0);
960 assert_eq!(res.bytes_sent, 0);
961 assert_eq!(res.responses_count_per_code.len(), 0);
962 }
963
964 #[cfg_attr(miri, ignore)]
965 #[tokio::test]
966 async fn request_error_timeout_v07() {
967 let server = MockServer::start_async().await;
968
969 let mock = server
970 .mock_async(|when, then| {
971 when.method(POST)
972 .header("Content-type", "application/msgpack")
973 .path("/");
974 then.status(200).body("").delay(Duration::from_millis(500));
975 })
976 .await;
977
978 let header_tags = TracerHeaderTags::default();
979
980 let payload = setup_payload(&header_tags);
981 let data = SendData::new(
982 100,
983 TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
984 header_tags,
985 &Endpoint {
986 api_key: None,
987 url: server.url("/").parse::<hyper::Uri>().unwrap(),
988 timeout_ms: 200,
989 ..Endpoint::default()
990 },
991 );
992
993 let client = libdd_common::hyper_migration::new_default_client();
994 let res = data.send(&client).await;
995
996 mock.assert_calls_async(10).await;
997
998 assert_eq!(res.errors_timeout, 1);
999 assert_eq!(res.errors_network, 0);
1000 assert_eq!(res.errors_status_code, 0);
1001 assert_eq!(res.requests_count, 5);
1002 assert_eq!(res.chunks_sent, 0);
1003 assert_eq!(res.bytes_sent, 0);
1004 assert_eq!(res.responses_count_per_code.len(), 0);
1005 }
1006
/// `with_endpoint` swaps only the target: everything else is copied and the original
/// `SendData` is left unchanged.
#[test]
fn test_with_endpoint() {
    let original_endpoint = Endpoint {
        api_key: Some(std::borrow::Cow::Borrowed("original-key")),
        url: "http://originalexample.com/".parse::<hyper::Uri>().unwrap(),
        timeout_ms: 1000,
        ..Endpoint::default()
    };
    let new_endpoint = Endpoint {
        api_key: Some(std::borrow::Cow::Borrowed("new-key")),
        url: "http://newexample.com/".parse::<hyper::Uri>().unwrap(),
        timeout_ms: 2000,
        ..Endpoint::default()
    };

    let original_data = SendData::new(
        100,
        TracerPayloadCollection::V07(vec![setup_payload(&HEADER_TAGS)]),
        HEADER_TAGS,
        &original_endpoint,
    );
    let new_data = original_data.with_endpoint(new_endpoint.clone());

    // The copy points at the new endpoint.
    assert_eq!(new_data.target.api_key, new_endpoint.api_key);
    assert_eq!(new_data.target.url, new_endpoint.url);
    assert_eq!(new_data.target.timeout_ms, new_endpoint.timeout_ms);

    // Every other field is carried over.
    assert_eq!(new_data.size, original_data.size);
    assert_eq!(new_data.headers, original_data.headers);
    assert_eq!(new_data.retry_strategy, original_data.retry_strategy);
    assert_eq!(
        new_data.tracer_payloads.size(),
        original_data.tracer_payloads.size()
    );

    // The original still points at its own endpoint.
    assert_eq!(original_data.target.api_key, original_endpoint.api_key);
    assert_eq!(original_data.target.url, original_endpoint.url);
    assert_eq!(
        original_data.target.timeout_ms,
        original_endpoint.timeout_ms
    );

    #[cfg(feature = "compression")]
    assert!(matches!(new_data.compression, Compression::None));
}
1056
/// The builder applies the API key and the retry strategy to the built `SendData`.
#[test]
fn test_builder() {
    let retry_strategy = RetryStrategy::new(5, 100, RetryBackoffType::Constant, None);
    let payloads = TracerPayloadCollection::V07(vec![setup_payload(&HEADER_TAGS)]);

    let builder = SendDataBuilder::new(100, payloads, HEADER_TAGS, &Endpoint::default());
    let send_data = builder
        .with_api_key("TEST-KEY")
        .with_retry_strategy(retry_strategy.clone())
        .build();

    assert_eq!(
        send_data.target.api_key,
        Some(std::borrow::Cow::Borrowed("TEST-KEY"))
    );
    assert_eq!(send_data.retry_strategy, retry_strategy);
}
1081}