1pub mod send_data_result;
5
6use crate::msgpack_encoder;
7use crate::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryResult};
8use crate::trace_utils::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use anyhow::{anyhow, Context};
11use futures::stream::FuturesUnordered;
12use futures::StreamExt;
13use http::{header::CONTENT_TYPE, HeaderMap, HeaderValue};
14use libdd_capabilities::{HttpClientCapability, SleepCapability};
15use libdd_common::{
16 header::{
17 APPLICATION_MSGPACK, APPLICATION_PROTOBUF, DATADOG_SEND_REAL_HTTP_STATUS,
18 DATADOG_TRACE_COUNT,
19 },
20 Endpoint,
21};
22use libdd_trace_protobuf::pb::{AgentPayload, TracerPayload};
23use send_data_result::SendDataResult;
24use std::collections::HashMap;
25#[cfg(feature = "compression")]
26use std::io::Write;
27#[cfg(feature = "compression")]
28use zstd::stream::write::Encoder;
29
30#[derive(Debug)]
31pub struct SendData {
69 pub(crate) tracer_payloads: TracerPayloadCollection,
70 pub(crate) size: usize, target: Endpoint,
72 headers: HeaderMap,
73 retry_strategy: RetryStrategy,
74 #[cfg(feature = "compression")]
75 compression: Compression,
76}
77
78#[cfg(feature = "compression")]
79#[derive(Debug, Clone)]
80pub enum Compression {
81 Zstd(i32),
82 None,
83}
84
85pub struct SendDataBuilder {
86 pub(crate) tracer_payloads: TracerPayloadCollection,
87 pub(crate) size: usize,
88 target: Endpoint,
89 headers: HeaderMap,
90 retry_strategy: RetryStrategy,
91 #[cfg(feature = "compression")]
92 compression: Compression,
93}
94
95impl SendDataBuilder {
96 pub fn new(
97 size: usize,
98 tracer_payload: TracerPayloadCollection,
99 tracer_header_tags: TracerHeaderTags,
100 target: &Endpoint,
101 ) -> SendDataBuilder {
102 let mut headers: HeaderMap = tracer_header_tags.into();
103 headers.insert(DATADOG_SEND_REAL_HTTP_STATUS, HeaderValue::from_static("1"));
104 SendDataBuilder {
105 tracer_payloads: tracer_payload,
106 size,
107 target: target.clone(),
108 headers,
109 retry_strategy: RetryStrategy::default(),
110 #[cfg(feature = "compression")]
111 compression: Compression::None,
112 }
113 }
114
115 #[cfg(feature = "compression")]
116 pub fn with_compression(mut self, compression: Compression) -> SendDataBuilder {
117 self.compression = compression;
118 self
119 }
120
121 pub fn with_api_key(mut self, api_key: &str) -> SendDataBuilder {
122 self.target.api_key = Some(api_key.to_string().into());
123 self
124 }
125
126 pub fn with_retry_strategy(mut self, retry_strategy: RetryStrategy) -> SendDataBuilder {
127 self.retry_strategy = retry_strategy;
128 self
129 }
130
131 pub fn build(self) -> SendData {
132 SendData {
133 tracer_payloads: self.tracer_payloads,
134 size: self.size,
135 target: self.target,
136 headers: self.headers,
137 retry_strategy: self.retry_strategy,
138 #[cfg(feature = "compression")]
139 compression: self.compression,
140 }
141 }
142}
143
144impl SendData {
145 #[allow(unused_variables)]
158 pub fn new(
159 size: usize,
160 tracer_payload: TracerPayloadCollection,
161 tracer_header_tags: TracerHeaderTags,
162 target: &Endpoint,
163 ) -> SendData {
164 let mut headers: HeaderMap = tracer_header_tags.into();
165 headers.insert(DATADOG_SEND_REAL_HTTP_STATUS, HeaderValue::from_static("1"));
166 SendData {
167 tracer_payloads: tracer_payload,
168 size,
169 target: target.clone(),
170 headers,
171 retry_strategy: RetryStrategy::default(),
172 #[cfg(feature = "compression")]
173 compression: Compression::None,
174 }
175 }
176
177 pub fn len(&self) -> usize {
183 self.size
184 }
185
186 pub fn is_empty(&self) -> bool {
192 self.size == 0
193 }
194
195 pub fn get_target(&self) -> &Endpoint {
201 &self.target
202 }
203
204 pub fn get_payloads(&self) -> &TracerPayloadCollection {
210 &self.tracer_payloads
211 }
212
213 pub fn set_retry_strategy(&mut self, retry_strategy: RetryStrategy) {
219 self.retry_strategy = retry_strategy;
220 }
221
222 pub async fn send<C: HttpClientCapability + SleepCapability>(
228 &self,
229 capabilities: &C,
230 ) -> SendDataResult {
231 self.send_internal(capabilities, None).await
232 }
233
234 async fn send_internal<C: HttpClientCapability + SleepCapability>(
235 &self,
236 capabilities: &C,
237 endpoint: Option<Endpoint>,
238 ) -> SendDataResult {
239 if self.use_protobuf() {
240 self.send_with_protobuf(capabilities, endpoint).await
241 } else {
242 self.send_with_msgpack(capabilities, endpoint).await
243 }
244 }
245
246 async fn send_payload<C: HttpClientCapability + SleepCapability>(
247 &self,
248 capabilities: &C,
249 chunks: u64,
250 payload: Vec<u8>,
251 headers: HeaderMap,
252 endpoint: Option<&Endpoint>,
253 ) -> (SendWithRetryResult, u64, u64) {
254 #[allow(clippy::unwrap_used)]
255 let payload_len = u64::try_from(payload.len()).unwrap();
256 (
257 send_with_retry(
258 capabilities,
259 endpoint.unwrap_or(&self.target),
260 payload,
261 &headers,
262 &self.retry_strategy,
263 )
264 .await,
265 payload_len,
266 chunks,
267 )
268 }
269
270 fn use_protobuf(&self) -> bool {
271 self.target.api_key.is_some()
272 }
273
274 #[cfg(feature = "compression")]
275 fn compress_payload(&self, payload: Vec<u8>, headers: &mut HeaderMap) -> Vec<u8> {
276 match self.compression {
277 Compression::Zstd(level) => {
278 let result = (|| -> std::io::Result<Vec<u8>> {
279 let mut encoder = Encoder::new(Vec::new(), level)?;
280 encoder.write_all(&payload)?;
281 encoder.finish()
282 })();
283
284 match result {
285 Ok(compressed_payload) => {
286 headers.insert(
287 http::header::CONTENT_ENCODING,
288 HeaderValue::from_static("zstd"),
289 );
290 compressed_payload
291 }
292 Err(_) => payload,
293 }
294 }
295 _ => payload,
296 }
297 }
298
299 async fn send_with_protobuf<C: HttpClientCapability + SleepCapability>(
300 &self,
301 capabilities: &C,
302 endpoint: Option<Endpoint>,
303 ) -> SendDataResult {
304 let mut result = SendDataResult::default();
305
306 #[allow(clippy::unwrap_used)]
307 let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
308
309 match &self.tracer_payloads {
310 TracerPayloadCollection::V07(payloads) => {
311 let agent_payload = construct_agent_payload(payloads.to_vec());
312 let serialized_trace_payload = match serialize_proto_payload(&agent_payload)
313 .context("Failed to serialize trace agent payload, dropping traces")
314 {
315 Ok(p) => p,
316 Err(e) => return result.error(e),
317 };
318 let mut request_headers = self.headers.clone();
319
320 #[cfg(feature = "compression")]
321 let final_payload =
322 self.compress_payload(serialized_trace_payload, &mut request_headers);
323
324 #[cfg(not(feature = "compression"))]
325 let final_payload = serialized_trace_payload;
326
327 request_headers.insert(CONTENT_TYPE, APPLICATION_PROTOBUF);
328
329 let (response, bytes_sent, chunks) = self
330 .send_payload(
331 capabilities,
332 chunks,
333 final_payload,
334 request_headers,
335 endpoint.as_ref(),
336 )
337 .await;
338
339 result.update(response, bytes_sent, chunks);
340
341 result
342 }
343 _ => result,
344 }
345 }
346
347 async fn send_with_msgpack<C: HttpClientCapability + SleepCapability>(
348 &self,
349 capabilities: &C,
350 endpoint: Option<Endpoint>,
351 ) -> SendDataResult {
352 let mut result = SendDataResult::default();
353 let mut futures = FuturesUnordered::new();
354
355 match &self.tracer_payloads {
356 TracerPayloadCollection::V07(payloads) => {
357 for tracer_payload in payloads {
358 #[allow(clippy::unwrap_used)]
359 let chunks = u64::try_from(tracer_payload.chunks.len()).unwrap();
360 let mut headers = self.headers.clone();
361 headers.reserve(2);
362 headers.insert(DATADOG_TRACE_COUNT, chunks.into());
363 headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
364
365 let payload = match rmp_serde::to_vec_named(tracer_payload) {
366 Ok(p) => p,
367 Err(e) => return result.error(anyhow!(e)),
368 };
369
370 futures.push(self.send_payload(
371 capabilities,
372 chunks,
373 payload,
374 headers,
375 endpoint.as_ref(),
376 ));
377 }
378 }
379 TracerPayloadCollection::V04(payload) => {
380 #[allow(clippy::unwrap_used)]
381 let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
382 let mut headers = self.headers.clone();
383 headers.reserve(2);
384 headers.insert(DATADOG_TRACE_COUNT, chunks.into());
385 headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
386
387 let payload = msgpack_encoder::v04::to_vec(payload);
388
389 futures.push(self.send_payload(
390 capabilities,
391 chunks,
392 payload,
393 headers,
394 endpoint.as_ref(),
395 ));
396 }
397 TracerPayloadCollection::V05(payload) => {
398 #[allow(clippy::unwrap_used)]
399 let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
400 let mut headers = self.headers.clone();
401 headers.reserve(2);
402 headers.insert(DATADOG_TRACE_COUNT, chunks.into());
403 headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
404
405 let payload = match rmp_serde::to_vec(payload) {
406 Ok(p) => p,
407 Err(e) => return result.error(anyhow!(e)),
408 };
409
410 futures.push(self.send_payload(
411 capabilities,
412 chunks,
413 payload,
414 headers,
415 endpoint.as_ref(),
416 ));
417 }
418 }
419
420 loop {
421 match futures.next().await {
422 Some((response, payload_len, chunks)) => {
423 result.update(response, payload_len, chunks);
424 if result.last_result.is_err() {
425 return result;
426 }
427 }
428 None => return result,
429 }
430 }
431 }
432}
433
434fn construct_agent_payload(tracer_payloads: Vec<TracerPayload>) -> AgentPayload {
435 AgentPayload {
436 host_name: "".to_string(),
437 env: "".to_string(),
438 agent_version: "".to_string(),
439 error_tps: 60.0,
440 target_tps: 60.0,
441 tags: HashMap::new(),
442 tracer_payloads,
443 rare_sampler_enabled: false,
444 idx_tracer_payloads: Vec::new(),
445 }
446}
447
448fn serialize_proto_payload<T>(payload: &T) -> anyhow::Result<Vec<u8>>
449where
450 T: prost::Message,
451{
452 let mut buf = Vec::with_capacity(payload.encoded_len());
453 payload.encode(&mut buf)?;
454 Ok(buf)
455}
456
457#[cfg(test)]
458mod tests {
459 use super::*;
460 use crate::send_with_retry::{RetryBackoffType, RetryStrategy};
461 use crate::test_utils::create_test_no_alloc_span;
462 use crate::trace_utils::{construct_trace_chunk, construct_tracer_payload, TracerPayloadTags};
463 use crate::tracer_header_tags::TracerHeaderTags;
464 use httpmock::prelude::*;
465 use httpmock::MockServer;
466 use libdd_capabilities::HttpClientCapability;
467 use libdd_capabilities_impl::NativeCapabilities;
468 use libdd_common::Endpoint;
469 use libdd_trace_protobuf::pb::Span;
470 use std::collections::HashMap;
471 use std::time::Duration;
472
473 const ONE_SECOND: u64 = 1_000;
474 const HEADER_TAGS: TracerHeaderTags = TracerHeaderTags {
475 lang: "test-lang",
476 lang_version: "2.0",
477 lang_interpreter: "interpreter",
478 lang_vendor: "vendor",
479 tracer_version: "1.0",
480 container_id: "id",
481 client_computed_top_level: false,
482 client_computed_stats: false,
483 dropped_p0_traces: 0,
484 dropped_p0_spans: 0,
485 };
486
487 fn setup_payload(header_tags: &TracerHeaderTags) -> TracerPayload {
488 let tracer_payload_tags = TracerPayloadTags {
489 env: "TEST".to_string(),
490 app_version: "1.0".to_string(),
491 hostname: "test_bench".to_string(),
492 runtime_id: "id".to_string(),
493 };
494
495 let chunk = construct_trace_chunk(vec![Span {
496 service: "test-service".to_string(),
497 name: "test-service-name".to_string(),
498 resource: "test-service-resource".to_string(),
499 trace_id: 111,
500 span_id: 222,
501 parent_id: 333,
502 start: 1,
503 duration: 5,
504 error: 0,
505 meta: HashMap::new(),
506 metrics: HashMap::new(),
507 meta_struct: HashMap::new(),
508 r#type: "".to_string(),
509 span_links: vec![],
510 span_events: vec![],
511 }]);
512
513 construct_tracer_payload(vec![chunk], header_tags, tracer_payload_tags)
514 }
515
516 fn compute_payload_len(collection: &TracerPayloadCollection) -> usize {
517 match collection {
518 TracerPayloadCollection::V07(payloads) => {
519 let agent_payload = construct_agent_payload(payloads.to_vec());
520 let serialized_trace_payload = serialize_proto_payload(&agent_payload).unwrap();
521 serialized_trace_payload.len()
522 }
523 _ => 0,
524 }
525 }
526
527 fn rmp_compute_payload_len(collection: &TracerPayloadCollection) -> usize {
528 match collection {
529 TracerPayloadCollection::V07(payloads) => {
530 let mut total: usize = 0;
531 for payload in payloads {
532 total += rmp_serde::to_vec_named(payload).unwrap().len();
533 }
534 total
535 }
536 TracerPayloadCollection::V04(payloads) => {
537 msgpack_encoder::v04::to_encoded_byte_len(payloads) as usize
538 }
539 TracerPayloadCollection::V05(payloads) => rmp_serde::to_vec(payloads).unwrap().len(),
540 }
541 }
542
543 #[test]
544 fn send_data_new_api_key() {
545 let header_tags = TracerHeaderTags::default();
546
547 let payload = setup_payload(&header_tags);
548 let data = SendData::new(
549 100,
550 TracerPayloadCollection::V07(vec![payload]),
551 HEADER_TAGS,
552 &Endpoint {
553 api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
554 url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
555 timeout_ms: ONE_SECOND,
556 ..Endpoint::default()
557 },
558 );
559
560 assert_eq!(data.size, 100);
561
562 assert_eq!(data.target.api_key.unwrap(), "TEST-KEY");
563 assert_eq!(data.target.url.path(), "/foo/bar");
564 }
565
566 #[test]
567 fn send_data_new_no_api_key() {
568 let header_tags = TracerHeaderTags::default();
569
570 let payload = setup_payload(&header_tags);
571 let data = SendData::new(
572 100,
573 TracerPayloadCollection::V07(vec![payload]),
574 header_tags.clone(),
575 &Endpoint {
576 api_key: None,
577 url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
578 timeout_ms: ONE_SECOND,
579 ..Endpoint::default()
580 },
581 );
582
583 assert_eq!(data.size, 100);
584
585 assert_eq!(data.target.api_key, None);
586 assert_eq!(data.target.url.path(), "/foo/bar");
587
588 for (key, value) in &HeaderMap::from(header_tags) {
589 assert_eq!(data.headers.get(key), Some(value));
590 }
591 }
592
593 #[cfg_attr(miri, ignore)]
594 #[tokio::test]
595 async fn request_protobuf() {
596 let server = MockServer::start_async().await;
597
598 let mock = server
599 .mock_async(|when, then| {
600 when.method(POST)
601 .header("Content-type", "application/x-protobuf")
602 .header("DD-API-KEY", "TEST-KEY")
603 .path("/");
604 then.status(202).body("");
605 })
606 .await;
607
608 let header_tags = TracerHeaderTags::default();
609
610 let payload = setup_payload(&header_tags);
611 let data = SendData::new(
612 100,
613 TracerPayloadCollection::V07(vec![payload.clone()]),
614 header_tags,
615 &Endpoint {
616 api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
617 url: server.url("/").parse::<hyper::Uri>().unwrap(),
618 timeout_ms: ONE_SECOND,
619 ..Endpoint::default()
620 },
621 );
622
623 let data_payload_len = compute_payload_len(&data.tracer_payloads);
624 let res = data.send(&NativeCapabilities::new_client()).await;
625
626 mock.assert_async().await;
627
628 assert_eq!(
629 res.last_result.unwrap().status(),
630 http::StatusCode::ACCEPTED
631 );
632 assert_eq!(res.errors_timeout, 0);
633 assert_eq!(res.errors_network, 0);
634 assert_eq!(res.errors_status_code, 0);
635 assert_eq!(res.requests_count, 1);
636 assert_eq!(res.chunks_sent, 1);
637 assert_eq!(res.bytes_sent, data_payload_len as u64);
638 assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
639 }
640
641 #[cfg_attr(miri, ignore)]
642 #[tokio::test]
643 async fn request_protobuf_several_payloads() {
644 let server = MockServer::start_async().await;
645
646 let mock = server
647 .mock_async(|when, then| {
648 when.method(POST)
649 .header("Content-type", "application/x-protobuf")
650 .header("DD-API-KEY", "TEST-KEY")
651 .path("/");
652 then.status(202).body("");
653 })
654 .await;
655
656 let header_tags = TracerHeaderTags::default();
657
658 let payload = setup_payload(&header_tags);
659 let data = SendData::new(
660 100,
661 TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
662 header_tags,
663 &Endpoint {
664 api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
665 url: server.url("/").parse::<hyper::Uri>().unwrap(),
666 timeout_ms: ONE_SECOND,
667 ..Endpoint::default()
668 },
669 );
670
671 let data_payload_len = compute_payload_len(&data.tracer_payloads);
672 let res = data.send(&NativeCapabilities::new_client()).await;
673
674 mock.assert_async().await;
675
676 assert_eq!(
677 res.last_result.unwrap().status(),
678 http::StatusCode::ACCEPTED
679 );
680 assert_eq!(res.errors_timeout, 0);
681 assert_eq!(res.errors_network, 0);
682 assert_eq!(res.errors_status_code, 0);
683 assert_eq!(res.requests_count, 1);
684 assert_eq!(res.chunks_sent, 2);
685 assert_eq!(res.bytes_sent, data_payload_len as u64);
686 assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
687 }
688
689 #[cfg_attr(miri, ignore)]
690 #[tokio::test]
691 async fn request_msgpack_v07() {
692 let server = MockServer::start_async().await;
693
694 let header_tags = HEADER_TAGS;
695 let mock = server
696 .mock_async(|when, then| {
697 when.method(POST)
698 .header(DATADOG_TRACE_COUNT.as_str(), "1")
699 .header("Content-type", "application/msgpack")
700 .header("datadog-meta-lang", header_tags.lang)
701 .header(
702 "datadog-meta-lang-interpreter",
703 header_tags.lang_interpreter,
704 )
705 .header("datadog-meta-lang-version", header_tags.lang_version)
706 .header(
707 "datadog-meta-lang-interpreter-vendor",
708 header_tags.lang_vendor,
709 )
710 .header("datadog-meta-tracer-version", header_tags.tracer_version)
711 .header("datadog-container-id", header_tags.container_id)
712 .header("Datadog-Send-Real-Http-Status", "1")
713 .path("/");
714 then.status(200).body("");
715 })
716 .await;
717
718 let header_tags = HEADER_TAGS;
719
720 let payload = setup_payload(&header_tags);
721 let data = SendData::new(
722 100,
723 TracerPayloadCollection::V07(vec![payload.clone()]),
724 header_tags,
725 &Endpoint {
726 api_key: None,
727 url: server.url("/").parse::<hyper::Uri>().unwrap(),
728 timeout_ms: ONE_SECOND,
729 ..Endpoint::default()
730 },
731 );
732
733 let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
734 let res = data.send(&NativeCapabilities::new_client()).await;
735
736 mock.assert_async().await;
737
738 assert_eq!(res.last_result.unwrap().status(), http::StatusCode::OK);
739 assert_eq!(res.errors_timeout, 0);
740 assert_eq!(res.errors_network, 0);
741 assert_eq!(res.errors_status_code, 0);
742 assert_eq!(res.requests_count, 1);
743 assert_eq!(res.chunks_sent, 1);
744 assert_eq!(res.bytes_sent, data_payload_len as u64);
745 assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
746 }
747
748 #[cfg_attr(miri, ignore)]
749 #[tokio::test]
750 async fn request_msgpack_v04() {
751 let server = MockServer::start_async().await;
752
753 let header_tags = HEADER_TAGS;
754 let mock = server
755 .mock_async(|when, then| {
756 when.method(POST)
757 .header(DATADOG_TRACE_COUNT.as_str(), "1")
758 .header("Content-type", "application/msgpack")
759 .header("datadog-meta-lang", header_tags.lang)
760 .header(
761 "datadog-meta-lang-interpreter",
762 header_tags.lang_interpreter,
763 )
764 .header("datadog-meta-lang-version", header_tags.lang_version)
765 .header(
766 "datadog-meta-lang-interpreter-vendor",
767 header_tags.lang_vendor,
768 )
769 .header("datadog-meta-tracer-version", header_tags.tracer_version)
770 .header("datadog-container-id", header_tags.container_id)
771 .path("/");
772 then.status(200).body("");
773 })
774 .await;
775
776 let header_tags = HEADER_TAGS;
777
778 let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
779 let data = SendData::new(
780 100,
781 TracerPayloadCollection::V04(vec![trace.clone()]),
782 header_tags,
783 &Endpoint {
784 api_key: None,
785 url: server.url("/").parse::<hyper::Uri>().unwrap(),
786 timeout_ms: ONE_SECOND,
787 ..Endpoint::default()
788 },
789 );
790
791 let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
792 let res = data.send(&NativeCapabilities::new_client()).await;
793
794 mock.assert_async().await;
795
796 assert_eq!(res.last_result.unwrap().status(), http::StatusCode::OK);
797 assert_eq!(res.errors_timeout, 0);
798 assert_eq!(res.errors_network, 0);
799 assert_eq!(res.errors_status_code, 0);
800 assert_eq!(res.requests_count, 1);
801 assert_eq!(res.chunks_sent, 1);
802 assert_eq!(res.bytes_sent, data_payload_len as u64);
803 assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
804 }
805
806 #[cfg_attr(miri, ignore)]
807 #[tokio::test]
808 async fn request_msgpack_several_payloads() {
809 let server = MockServer::start_async().await;
810
811 let mock = server
812 .mock_async(|when, then| {
813 when.method(POST)
814 .header("Content-type", "application/msgpack")
815 .path("/");
816 then.status(200).body("");
817 })
818 .await;
819
820 let header_tags = TracerHeaderTags::default();
821
822 let payload = setup_payload(&header_tags);
823 let data = SendData::new(
824 100,
825 TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
826 header_tags,
827 &Endpoint {
828 api_key: None,
829 url: server.url("/").parse::<hyper::Uri>().unwrap(),
830 timeout_ms: ONE_SECOND,
831 ..Endpoint::default()
832 },
833 );
834
835 let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
836 let res = data.send(&NativeCapabilities::new_client()).await;
837
838 mock.assert_calls_async(2).await;
839
840 assert_eq!(res.last_result.unwrap().status(), http::StatusCode::OK);
841 assert_eq!(res.errors_timeout, 0);
842 assert_eq!(res.errors_network, 0);
843 assert_eq!(res.errors_status_code, 0);
844 assert_eq!(res.requests_count, 2);
845 assert_eq!(res.chunks_sent, 2);
846 assert_eq!(res.bytes_sent, data_payload_len as u64);
847 assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 2_u64);
848 }
849
850 #[cfg_attr(miri, ignore)]
851 #[tokio::test]
852 async fn request_error_status_code() {
853 let server = MockServer::start_async().await;
854
855 let mock = server
856 .mock_async(|when, then| {
857 when.method(POST)
858 .header("Content-type", "application/msgpack")
859 .path("/");
860 then.status(500).body("");
861 })
862 .await;
863
864 let payload = setup_payload(&HEADER_TAGS);
865 let data = SendData::new(
866 100,
867 TracerPayloadCollection::V07(vec![payload]),
868 HEADER_TAGS,
869 &Endpoint {
870 api_key: None,
871 url: server.url("/").parse::<hyper::Uri>().unwrap(),
872 timeout_ms: ONE_SECOND,
873 ..Endpoint::default()
874 },
875 );
876
877 let res = data.send(&NativeCapabilities::new_client()).await;
878
879 mock.assert_calls_async(5).await;
880
881 assert!(res.last_result.is_ok());
882 assert_eq!(
883 res.last_result.unwrap().status(),
884 http::StatusCode::INTERNAL_SERVER_ERROR
885 );
886 assert_eq!(res.errors_timeout, 0);
887 assert_eq!(res.errors_network, 0);
888 assert_eq!(res.errors_status_code, 1);
889 assert_eq!(res.requests_count, 5);
890 assert_eq!(res.chunks_sent, 0);
891 assert_eq!(res.bytes_sent, 0);
892 assert_eq!(*res.responses_count_per_code.get(&500).unwrap(), 1_u64);
893 }
894
895 #[cfg_attr(miri, ignore)]
896 #[tokio::test]
897 async fn request_error_network() {
898 let payload = setup_payload(&HEADER_TAGS);
900 let data = SendData::new(
901 100,
902 TracerPayloadCollection::V07(vec![payload]),
903 HEADER_TAGS,
904 &Endpoint {
905 api_key: None,
906 url: "http://127.0.0.1:4321/".parse::<hyper::Uri>().unwrap(),
907 timeout_ms: ONE_SECOND,
908 ..Endpoint::default()
909 },
910 );
911
912 let res = data.send(&NativeCapabilities::new_client()).await;
913
914 assert!(res.last_result.is_err());
915 match std::env::consts::OS {
916 "windows" => {
917 assert_eq!(res.errors_timeout, 1);
921 assert_eq!(res.errors_network, 0);
922 }
923 _ => {
924 assert_eq!(res.errors_timeout, 0);
925 assert_eq!(res.errors_network, 1);
926 }
927 }
928 assert_eq!(res.errors_status_code, 0);
929 assert_eq!(res.requests_count, 5);
930 assert_eq!(res.errors_status_code, 0);
931 assert_eq!(res.chunks_sent, 0);
932 assert_eq!(res.bytes_sent, 0);
933 assert_eq!(res.responses_count_per_code.len(), 0);
934 }
935
936 #[cfg_attr(miri, ignore)]
937 #[tokio::test]
938 async fn request_error_timeout_v04() {
939 let server = MockServer::start_async().await;
940
941 let header_tags = HEADER_TAGS;
942 let mock = server
943 .mock_async(|when, then| {
944 when.method(POST)
945 .header(DATADOG_TRACE_COUNT.as_str(), "2")
946 .header("Content-type", "application/msgpack")
947 .header("datadog-meta-lang", header_tags.lang)
948 .header(
949 "datadog-meta-lang-interpreter",
950 header_tags.lang_interpreter,
951 )
952 .header("datadog-meta-lang-version", header_tags.lang_version)
953 .header(
954 "datadog-meta-lang-interpreter-vendor",
955 header_tags.lang_vendor,
956 )
957 .header("datadog-meta-tracer-version", header_tags.tracer_version)
958 .header("datadog-container-id", header_tags.container_id)
959 .path("/");
960 then.status(200).body("").delay(Duration::from_millis(500));
961 })
962 .await;
963
964 let header_tags = HEADER_TAGS;
965
966 let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
967 let data = SendData::new(
968 100,
969 TracerPayloadCollection::V04(vec![trace.clone(), trace.clone()]),
970 header_tags,
971 &Endpoint {
972 api_key: None,
973 url: server.url("/").parse::<hyper::Uri>().unwrap(),
974 timeout_ms: 200,
975 ..Endpoint::default()
976 },
977 );
978
979 let res = data.send(&NativeCapabilities::new_client()).await;
980
981 mock.assert_calls_async(5).await;
982
983 assert_eq!(res.errors_timeout, 1);
984 assert_eq!(res.errors_network, 0);
985 assert_eq!(res.errors_status_code, 0);
986 assert_eq!(res.requests_count, 5);
987 assert_eq!(res.chunks_sent, 0);
988 assert_eq!(res.bytes_sent, 0);
989 assert_eq!(res.responses_count_per_code.len(), 0);
990 }
991
992 #[cfg_attr(miri, ignore)]
993 #[tokio::test]
994 async fn request_error_timeout_v07() {
995 let server = MockServer::start_async().await;
996
997 let mock = server
998 .mock_async(|when, then| {
999 when.method(POST)
1000 .header("Content-type", "application/msgpack")
1001 .path("/");
1002 then.status(200).body("").delay(Duration::from_millis(500));
1003 })
1004 .await;
1005
1006 let header_tags = TracerHeaderTags::default();
1007
1008 let payload = setup_payload(&header_tags);
1009 let data = SendData::new(
1010 100,
1011 TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
1012 header_tags,
1013 &Endpoint {
1014 api_key: None,
1015 url: server.url("/").parse::<hyper::Uri>().unwrap(),
1016 timeout_ms: 200,
1017 ..Endpoint::default()
1018 },
1019 );
1020
1021 let res = data.send(&NativeCapabilities::new_client()).await;
1022
1023 mock.assert_calls_async(10).await;
1024
1025 assert_eq!(res.errors_timeout, 1);
1026 assert_eq!(res.errors_network, 0);
1027 assert_eq!(res.errors_status_code, 0);
1028 assert_eq!(res.requests_count, 5);
1029 assert_eq!(res.chunks_sent, 0);
1030 assert_eq!(res.bytes_sent, 0);
1031 assert_eq!(res.responses_count_per_code.len(), 0);
1032 }
1033
1034 #[test]
1035 fn test_builder() {
1036 let header_tags = HEADER_TAGS;
1037 let payload = setup_payload(&header_tags);
1038 let retry_strategy = RetryStrategy::new(5, 100, RetryBackoffType::Constant, None);
1039
1040 let send_data = SendDataBuilder::new(
1041 100,
1042 TracerPayloadCollection::V07(vec![payload]),
1043 header_tags,
1044 &Endpoint::default(),
1045 )
1046 .with_api_key("TEST-KEY")
1048 .with_retry_strategy(retry_strategy.clone())
1050 .build();
1051
1052 assert_eq!(
1053 send_data.target.api_key,
1054 Some(std::borrow::Cow::Borrowed("TEST-KEY"))
1055 );
1056 assert_eq!(send_data.retry_strategy, retry_strategy);
1057 }
1058}