Skip to main content

libdd_trace_utils/send_data/
mod.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4pub mod send_data_result;
5
6use crate::msgpack_encoder;
7use crate::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryResult};
8use crate::trace_utils::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use anyhow::{anyhow, Context};
11use futures::stream::FuturesUnordered;
12use futures::StreamExt;
13use http::header::CONTENT_TYPE;
14use libdd_common::{
15    header::{
16        APPLICATION_MSGPACK_STR, APPLICATION_PROTOBUF_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR,
17        DATADOG_TRACE_COUNT_STR,
18    },
19    Connect, Endpoint, GenericHttpClient,
20};
21use libdd_trace_protobuf::pb::{AgentPayload, TracerPayload};
22use send_data_result::SendDataResult;
23use std::collections::HashMap;
24#[cfg(feature = "compression")]
25use std::io::Write;
26#[cfg(feature = "compression")]
27use zstd::stream::write::Encoder;
28
29#[derive(Debug)]
30/// `SendData` is a structure that holds the data to be sent to a target endpoint.
31/// It includes the payloads to be sent, the size of the data, the target endpoint,
32/// headers for the request, and a retry strategy for sending the data.
33///
34/// # Example
35///
36/// ```rust
37/// use libdd_trace_protobuf::pb::TracerPayload;
38/// use libdd_trace_utils::send_data::{
39///     SendData,
40/// };
41/// use libdd_common::Endpoint;
42/// use libdd_common::http_common::new_default_client;
43/// use libdd_trace_utils::send_with_retry::{RetryBackoffType, RetryStrategy};
44/// use libdd_trace_utils::trace_utils::TracerHeaderTags;
45/// use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
46///
47/// #[cfg_attr(miri, ignore)]
48/// async fn update_send_results_example() {
49///     let size = 100;
50///     let tracer_payload = TracerPayloadCollection::V07(
51///         vec![TracerPayload::default()]); // Replace with actual payload
52///     let tracer_header_tags = TracerHeaderTags::default(); // Replace with actual header tags
53///     let target = Endpoint::default(); // Replace with actual endpoint
54///
55///     let mut send_data = SendData::new(size, tracer_payload, tracer_header_tags, &target);
56///
57///     // Set a custom retry strategy
58///     let retry_strategy = RetryStrategy::new(3, 10, RetryBackoffType::Exponential, Some(5));
59///
60///     send_data.set_retry_strategy(retry_strategy);
61///
62///     let client = new_default_client();
63///     // Send the data
64///     let result = send_data.send(&client).await;
65/// }
66/// ```
67pub struct SendData {
68    pub(crate) tracer_payloads: TracerPayloadCollection,
69    pub(crate) size: usize, // have a rough size estimate to force flushing if it's large
70    target: Endpoint,
71    headers: HashMap<&'static str, String>,
72    retry_strategy: RetryStrategy,
73    #[cfg(feature = "compression")]
74    compression: Compression,
75}
76
/// Compression scheme applied to a serialized payload before it is sent.
#[cfg(feature = "compression")]
#[derive(Clone, Debug)]
pub enum Compression {
    /// Compress with zstd; the wrapped value is the level passed to the zstd encoder.
    Zstd(i32),
    /// Send the payload uncompressed.
    None,
}
83
84pub struct SendDataBuilder {
85    pub(crate) tracer_payloads: TracerPayloadCollection,
86    pub(crate) size: usize,
87    target: Endpoint,
88    headers: HashMap<&'static str, String>,
89    retry_strategy: RetryStrategy,
90    #[cfg(feature = "compression")]
91    compression: Compression,
92}
93
94impl SendDataBuilder {
95    pub fn new(
96        size: usize,
97        tracer_payload: TracerPayloadCollection,
98        tracer_header_tags: TracerHeaderTags,
99        target: &Endpoint,
100    ) -> SendDataBuilder {
101        let mut headers: HashMap<&'static str, String> = tracer_header_tags.into();
102        headers.insert(DATADOG_SEND_REAL_HTTP_STATUS_STR, "1".to_string());
103        SendDataBuilder {
104            tracer_payloads: tracer_payload,
105            size,
106            target: target.clone(),
107            headers,
108            retry_strategy: RetryStrategy::default(),
109            #[cfg(feature = "compression")]
110            compression: Compression::None,
111        }
112    }
113
114    #[cfg(feature = "compression")]
115    pub fn with_compression(mut self, compression: Compression) -> SendDataBuilder {
116        self.compression = compression;
117        self
118    }
119
120    pub fn with_api_key(mut self, api_key: &str) -> SendDataBuilder {
121        self.target.api_key = Some(api_key.to_string().into());
122        self
123    }
124
125    pub fn with_retry_strategy(mut self, retry_strategy: RetryStrategy) -> SendDataBuilder {
126        self.retry_strategy = retry_strategy;
127        self
128    }
129
130    pub fn build(self) -> SendData {
131        SendData {
132            tracer_payloads: self.tracer_payloads,
133            size: self.size,
134            target: self.target,
135            headers: self.headers,
136            retry_strategy: self.retry_strategy,
137            #[cfg(feature = "compression")]
138            compression: self.compression,
139        }
140    }
141}
142
143impl SendData {
144    /// Creates a new instance of `SendData`.
145    ///
146    /// # Arguments
147    ///
148    /// * `size`: Approximate size of the data to be sent in bytes.
149    /// * `tracer_payload`: The payload to be sent.
150    /// * `tracer_header_tags`: The header tags for the tracer.
151    /// * `target`: The endpoint to which the data will be sent.
152    ///
153    /// # Returns
154    ///
155    /// A new `SendData` instance.
156    #[allow(unused_variables)]
157    pub fn new(
158        size: usize,
159        tracer_payload: TracerPayloadCollection,
160        tracer_header_tags: TracerHeaderTags,
161        target: &Endpoint,
162    ) -> SendData {
163        let mut headers: HashMap<&'static str, String> = tracer_header_tags.into();
164        headers.insert(DATADOG_SEND_REAL_HTTP_STATUS_STR, "1".to_string());
165        SendData {
166            tracer_payloads: tracer_payload,
167            size,
168            target: target.clone(),
169            headers,
170            retry_strategy: RetryStrategy::default(),
171            #[cfg(feature = "compression")]
172            compression: Compression::None,
173        }
174    }
175
176    /// Returns the user defined approximate size of the data to be sent in bytes.
177    ///
178    /// # Returns
179    ///
180    /// The size of the data.
181    pub fn len(&self) -> usize {
182        self.size
183    }
184
185    /// Checks if the user defined approximate size of the data to be sent is zero.
186    ///
187    /// # Returns
188    ///
189    /// `true` if size is 0, `false` otherwise.
190    pub fn is_empty(&self) -> bool {
191        self.size == 0
192    }
193
194    /// Returns the target endpoint.
195    ///
196    /// # Returns
197    ///
198    /// A reference to the target endpoint.
199    pub fn get_target(&self) -> &Endpoint {
200        &self.target
201    }
202
203    /// Returns the payloads to be sent.
204    ///
205    /// # Returns
206    ///
207    /// A reference to the vector of payloads.
208    pub fn get_payloads(&self) -> &TracerPayloadCollection {
209        &self.tracer_payloads
210    }
211
212    /// Overrides the default RetryStrategy with user-defined values.
213    ///
214    /// # Arguments
215    ///
216    /// * `retry_strategy`: The new retry strategy to be used.
217    pub fn set_retry_strategy(&mut self, retry_strategy: RetryStrategy) {
218        self.retry_strategy = retry_strategy;
219    }
220
221    /// Sends the data to the target endpoint.
222    ///
223    /// # Returns
224    ///
225    /// A `SendDataResult` instance containing the result of the operation.
226    pub async fn send<C: Connect>(&self, http_client: &GenericHttpClient<C>) -> SendDataResult {
227        self.send_internal(http_client, None).await
228    }
229
230    async fn send_internal<C: Connect>(
231        &self,
232        http_client: &GenericHttpClient<C>,
233        endpoint: Option<Endpoint>,
234    ) -> SendDataResult {
235        if self.use_protobuf() {
236            self.send_with_protobuf(http_client, endpoint).await
237        } else {
238            self.send_with_msgpack(http_client, endpoint).await
239        }
240    }
241
242    async fn send_payload<C: Connect>(
243        &self,
244        chunks: u64,
245        payload: Vec<u8>,
246        headers: HashMap<&'static str, String>,
247        http_client: &GenericHttpClient<C>,
248        endpoint: Option<&Endpoint>,
249    ) -> (SendWithRetryResult, u64, u64) {
250        #[allow(clippy::unwrap_used)]
251        let payload_len = u64::try_from(payload.len()).unwrap();
252        (
253            send_with_retry(
254                http_client,
255                endpoint.unwrap_or(&self.target),
256                payload,
257                &headers,
258                &self.retry_strategy,
259            )
260            .await,
261            payload_len,
262            chunks,
263        )
264    }
265
266    fn use_protobuf(&self) -> bool {
267        self.target.api_key.is_some()
268    }
269
270    #[cfg(feature = "compression")]
271    fn compress_payload(&self, payload: Vec<u8>, headers: &mut HashMap<&str, String>) -> Vec<u8> {
272        match self.compression {
273            Compression::Zstd(level) => {
274                let result = (|| -> std::io::Result<Vec<u8>> {
275                    let mut encoder = Encoder::new(Vec::new(), level)?;
276                    encoder.write_all(&payload)?;
277                    encoder.finish()
278                })();
279
280                match result {
281                    Ok(compressed_payload) => {
282                        headers.insert("Content-Encoding", "zstd".to_string());
283                        compressed_payload
284                    }
285                    Err(_) => payload,
286                }
287            }
288            _ => payload,
289        }
290    }
291
292    async fn send_with_protobuf<C: Connect>(
293        &self,
294        http_client: &GenericHttpClient<C>,
295        endpoint: Option<Endpoint>,
296    ) -> SendDataResult {
297        let mut result = SendDataResult::default();
298
299        #[allow(clippy::unwrap_used)]
300        let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
301
302        match &self.tracer_payloads {
303            TracerPayloadCollection::V07(payloads) => {
304                let agent_payload = construct_agent_payload(payloads.to_vec());
305                let serialized_trace_payload = match serialize_proto_payload(&agent_payload)
306                    .context("Failed to serialize trace agent payload, dropping traces")
307                {
308                    Ok(p) => p,
309                    Err(e) => return result.error(e),
310                };
311                let mut request_headers = self.headers.clone();
312
313                #[cfg(feature = "compression")]
314                let final_payload =
315                    self.compress_payload(serialized_trace_payload, &mut request_headers);
316
317                #[cfg(not(feature = "compression"))]
318                let final_payload = serialized_trace_payload;
319
320                request_headers.insert(CONTENT_TYPE.as_str(), APPLICATION_PROTOBUF_STR.to_string());
321
322                let (response, bytes_sent, chunks) = self
323                    .send_payload(
324                        chunks,
325                        final_payload,
326                        request_headers,
327                        http_client,
328                        endpoint.as_ref(),
329                    )
330                    .await;
331
332                result.update(response, bytes_sent, chunks);
333
334                result
335            }
336            _ => result,
337        }
338    }
339
340    async fn send_with_msgpack<C: Connect>(
341        &self,
342        http_client: &GenericHttpClient<C>,
343        endpoint: Option<Endpoint>,
344    ) -> SendDataResult {
345        let mut result = SendDataResult::default();
346        let mut futures = FuturesUnordered::new();
347
348        match &self.tracer_payloads {
349            TracerPayloadCollection::V07(payloads) => {
350                for tracer_payload in payloads {
351                    #[allow(clippy::unwrap_used)]
352                    let chunks = u64::try_from(tracer_payload.chunks.len()).unwrap();
353                    let mut headers = self.headers.clone();
354                    headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string());
355                    headers.insert(CONTENT_TYPE.as_str(), APPLICATION_MSGPACK_STR.to_string());
356
357                    let payload = match rmp_serde::to_vec_named(tracer_payload) {
358                        Ok(p) => p,
359                        Err(e) => return result.error(anyhow!(e)),
360                    };
361
362                    futures.push(self.send_payload(
363                        chunks,
364                        payload,
365                        headers,
366                        http_client,
367                        endpoint.as_ref(),
368                    ));
369                }
370            }
371            TracerPayloadCollection::V04(payload) => {
372                #[allow(clippy::unwrap_used)]
373                let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
374                let mut headers = self.headers.clone();
375                headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string());
376                headers.insert(CONTENT_TYPE.as_str(), APPLICATION_MSGPACK_STR.to_string());
377
378                let payload = msgpack_encoder::v04::to_vec(payload);
379
380                futures.push(self.send_payload(
381                    chunks,
382                    payload,
383                    headers,
384                    http_client,
385                    endpoint.as_ref(),
386                ));
387            }
388            TracerPayloadCollection::V05(payload) => {
389                #[allow(clippy::unwrap_used)]
390                let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
391                let mut headers = self.headers.clone();
392                headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string());
393                headers.insert(CONTENT_TYPE.as_str(), APPLICATION_MSGPACK_STR.to_string());
394
395                let payload = match rmp_serde::to_vec(payload) {
396                    Ok(p) => p,
397                    Err(e) => return result.error(anyhow!(e)),
398                };
399
400                futures.push(self.send_payload(
401                    chunks,
402                    payload,
403                    headers,
404                    http_client,
405                    endpoint.as_ref(),
406                ));
407            }
408        }
409
410        loop {
411            match futures.next().await {
412                Some((response, payload_len, chunks)) => {
413                    result.update(response, payload_len, chunks);
414                    if result.last_result.is_err() {
415                        return result;
416                    }
417                }
418                None => return result,
419            }
420        }
421    }
422}
423
424fn construct_agent_payload(tracer_payloads: Vec<TracerPayload>) -> AgentPayload {
425    AgentPayload {
426        host_name: "".to_string(),
427        env: "".to_string(),
428        agent_version: "".to_string(),
429        error_tps: 60.0,
430        target_tps: 60.0,
431        tags: HashMap::new(),
432        tracer_payloads,
433        rare_sampler_enabled: false,
434        idx_tracer_payloads: Vec::new(),
435    }
436}
437
438fn serialize_proto_payload<T>(payload: &T) -> anyhow::Result<Vec<u8>>
439where
440    T: prost::Message,
441{
442    let mut buf = Vec::with_capacity(payload.encoded_len());
443    payload.encode(&mut buf)?;
444    Ok(buf)
445}
446
447#[cfg(test)]
448mod tests {
449    use super::*;
450    use crate::send_with_retry::{RetryBackoffType, RetryStrategy};
451    use crate::test_utils::create_test_no_alloc_span;
452    use crate::trace_utils::{construct_trace_chunk, construct_tracer_payload, RootSpanTags};
453    use crate::tracer_header_tags::TracerHeaderTags;
454    use httpmock::prelude::*;
455    use httpmock::MockServer;
456    use libdd_common::Endpoint;
457    use libdd_trace_protobuf::pb::Span;
458    use std::collections::HashMap;
459    use std::time::Duration;
460
461    const ONE_SECOND: u64 = 1_000;
462    const HEADER_TAGS: TracerHeaderTags = TracerHeaderTags {
463        lang: "test-lang",
464        lang_version: "2.0",
465        lang_interpreter: "interpreter",
466        lang_vendor: "vendor",
467        tracer_version: "1.0",
468        container_id: "id",
469        client_computed_top_level: false,
470        client_computed_stats: false,
471        dropped_p0_traces: 0,
472        dropped_p0_spans: 0,
473    };
474
475    fn setup_payload(header_tags: &TracerHeaderTags) -> TracerPayload {
476        let root_tags = RootSpanTags {
477            env: "TEST",
478            app_version: "1.0",
479            hostname: "test_bench",
480            runtime_id: "id",
481        };
482
483        let chunk = construct_trace_chunk(vec![Span {
484            service: "test-service".to_string(),
485            name: "test-service-name".to_string(),
486            resource: "test-service-resource".to_string(),
487            trace_id: 111,
488            span_id: 222,
489            parent_id: 333,
490            start: 1,
491            duration: 5,
492            error: 0,
493            meta: HashMap::new(),
494            metrics: HashMap::new(),
495            meta_struct: HashMap::new(),
496            r#type: "".to_string(),
497            span_links: vec![],
498            span_events: vec![],
499        }]);
500
501        construct_tracer_payload(vec![chunk], header_tags, root_tags)
502    }
503
504    fn compute_payload_len(collection: &TracerPayloadCollection) -> usize {
505        match collection {
506            TracerPayloadCollection::V07(payloads) => {
507                let agent_payload = construct_agent_payload(payloads.to_vec());
508                let serialized_trace_payload = serialize_proto_payload(&agent_payload).unwrap();
509                serialized_trace_payload.len()
510            }
511            _ => 0,
512        }
513    }
514
515    fn rmp_compute_payload_len(collection: &TracerPayloadCollection) -> usize {
516        match collection {
517            TracerPayloadCollection::V07(payloads) => {
518                let mut total: usize = 0;
519                for payload in payloads {
520                    total += rmp_serde::to_vec_named(payload).unwrap().len();
521                }
522                total
523            }
524            TracerPayloadCollection::V04(payloads) => {
525                msgpack_encoder::v04::to_len(payloads) as usize
526            }
527            TracerPayloadCollection::V05(payloads) => rmp_serde::to_vec(payloads).unwrap().len(),
528        }
529    }
530
531    #[test]
532    fn send_data_new_api_key() {
533        let header_tags = TracerHeaderTags::default();
534
535        let payload = setup_payload(&header_tags);
536        let data = SendData::new(
537            100,
538            TracerPayloadCollection::V07(vec![payload]),
539            HEADER_TAGS,
540            &Endpoint {
541                api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
542                url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
543                timeout_ms: ONE_SECOND,
544                ..Endpoint::default()
545            },
546        );
547
548        assert_eq!(data.size, 100);
549
550        assert_eq!(data.target.api_key.unwrap(), "TEST-KEY");
551        assert_eq!(data.target.url.path(), "/foo/bar");
552    }
553
554    #[test]
555    fn send_data_new_no_api_key() {
556        let header_tags = TracerHeaderTags::default();
557
558        let payload = setup_payload(&header_tags);
559        let data = SendData::new(
560            100,
561            TracerPayloadCollection::V07(vec![payload]),
562            header_tags.clone(),
563            &Endpoint {
564                api_key: None,
565                url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
566                timeout_ms: ONE_SECOND,
567                ..Endpoint::default()
568            },
569        );
570
571        assert_eq!(data.size, 100);
572
573        assert_eq!(data.target.api_key, None);
574        assert_eq!(data.target.url.path(), "/foo/bar");
575
576        for (key, value) in HashMap::from(header_tags) {
577            assert_eq!(data.headers.get(key).unwrap(), &value);
578        }
579    }
580
581    #[cfg_attr(miri, ignore)]
582    #[tokio::test]
583    async fn request_protobuf() {
584        let server = MockServer::start_async().await;
585
586        let mock = server
587            .mock_async(|when, then| {
588                when.method(POST)
589                    .header("Content-type", "application/x-protobuf")
590                    .header("DD-API-KEY", "TEST-KEY")
591                    .path("/");
592                then.status(202).body("");
593            })
594            .await;
595
596        let header_tags = TracerHeaderTags::default();
597
598        let payload = setup_payload(&header_tags);
599        let data = SendData::new(
600            100,
601            TracerPayloadCollection::V07(vec![payload.clone()]),
602            header_tags,
603            &Endpoint {
604                api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
605                url: server.url("/").parse::<hyper::Uri>().unwrap(),
606                timeout_ms: ONE_SECOND,
607                ..Endpoint::default()
608            },
609        );
610
611        let data_payload_len = compute_payload_len(&data.tracer_payloads);
612        let client = libdd_common::http_common::new_default_client();
613        let res = data.send(&client).await;
614
615        mock.assert_async().await;
616
617        assert_eq!(res.last_result.unwrap().status(), 202);
618        assert_eq!(res.errors_timeout, 0);
619        assert_eq!(res.errors_network, 0);
620        assert_eq!(res.errors_status_code, 0);
621        assert_eq!(res.requests_count, 1);
622        assert_eq!(res.chunks_sent, 1);
623        assert_eq!(res.bytes_sent, data_payload_len as u64);
624        assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
625    }
626
627    #[cfg_attr(miri, ignore)]
628    #[tokio::test]
629    async fn request_protobuf_several_payloads() {
630        let server = MockServer::start_async().await;
631
632        let mock = server
633            .mock_async(|when, then| {
634                when.method(POST)
635                    .header("Content-type", "application/x-protobuf")
636                    .header("DD-API-KEY", "TEST-KEY")
637                    .path("/");
638                then.status(202).body("");
639            })
640            .await;
641
642        let header_tags = TracerHeaderTags::default();
643
644        let payload = setup_payload(&header_tags);
645        let data = SendData::new(
646            100,
647            TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
648            header_tags,
649            &Endpoint {
650                api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
651                url: server.url("/").parse::<hyper::Uri>().unwrap(),
652                timeout_ms: ONE_SECOND,
653                ..Endpoint::default()
654            },
655        );
656
657        let data_payload_len = compute_payload_len(&data.tracer_payloads);
658        let client = libdd_common::http_common::new_default_client();
659        let res = data.send(&client).await;
660
661        mock.assert_async().await;
662
663        assert_eq!(res.last_result.unwrap().status(), 202);
664        assert_eq!(res.errors_timeout, 0);
665        assert_eq!(res.errors_network, 0);
666        assert_eq!(res.errors_status_code, 0);
667        assert_eq!(res.requests_count, 1);
668        assert_eq!(res.chunks_sent, 2);
669        assert_eq!(res.bytes_sent, data_payload_len as u64);
670        assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
671    }
672
673    #[cfg_attr(miri, ignore)]
674    #[tokio::test]
675    async fn request_msgpack_v07() {
676        let server = MockServer::start_async().await;
677
678        let header_tags = HEADER_TAGS;
679        let mock = server
680            .mock_async(|when, then| {
681                when.method(POST)
682                    .header(DATADOG_TRACE_COUNT_STR, "1")
683                    .header("Content-type", "application/msgpack")
684                    .header("datadog-meta-lang", header_tags.lang)
685                    .header(
686                        "datadog-meta-lang-interpreter",
687                        header_tags.lang_interpreter,
688                    )
689                    .header("datadog-meta-lang-version", header_tags.lang_version)
690                    .header(
691                        "datadog-meta-lang-interpreter-vendor",
692                        header_tags.lang_vendor,
693                    )
694                    .header("datadog-meta-tracer-version", header_tags.tracer_version)
695                    .header("datadog-container-id", header_tags.container_id)
696                    .header("Datadog-Send-Real-Http-Status", "1")
697                    .path("/");
698                then.status(200).body("");
699            })
700            .await;
701
702        let header_tags = HEADER_TAGS;
703
704        let payload = setup_payload(&header_tags);
705        let data = SendData::new(
706            100,
707            TracerPayloadCollection::V07(vec![payload.clone()]),
708            header_tags,
709            &Endpoint {
710                api_key: None,
711                url: server.url("/").parse::<hyper::Uri>().unwrap(),
712                timeout_ms: ONE_SECOND,
713                ..Endpoint::default()
714            },
715        );
716
717        let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
718        let client = libdd_common::http_common::new_default_client();
719        let res = data.send(&client).await;
720
721        mock.assert_async().await;
722
723        assert_eq!(res.last_result.unwrap().status(), 200);
724        assert_eq!(res.errors_timeout, 0);
725        assert_eq!(res.errors_network, 0);
726        assert_eq!(res.errors_status_code, 0);
727        assert_eq!(res.requests_count, 1);
728        assert_eq!(res.chunks_sent, 1);
729        assert_eq!(res.bytes_sent, data_payload_len as u64);
730        assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
731    }
732
733    #[cfg_attr(miri, ignore)]
734    #[tokio::test]
735    async fn request_msgpack_v04() {
736        let server = MockServer::start_async().await;
737
738        let header_tags = HEADER_TAGS;
739        let mock = server
740            .mock_async(|when, then| {
741                when.method(POST)
742                    .header(DATADOG_TRACE_COUNT_STR, "1")
743                    .header("Content-type", "application/msgpack")
744                    .header("datadog-meta-lang", header_tags.lang)
745                    .header(
746                        "datadog-meta-lang-interpreter",
747                        header_tags.lang_interpreter,
748                    )
749                    .header("datadog-meta-lang-version", header_tags.lang_version)
750                    .header(
751                        "datadog-meta-lang-interpreter-vendor",
752                        header_tags.lang_vendor,
753                    )
754                    .header("datadog-meta-tracer-version", header_tags.tracer_version)
755                    .header("datadog-container-id", header_tags.container_id)
756                    .path("/");
757                then.status(200).body("");
758            })
759            .await;
760
761        let header_tags = HEADER_TAGS;
762
763        let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
764        let data = SendData::new(
765            100,
766            TracerPayloadCollection::V04(vec![trace.clone()]),
767            header_tags,
768            &Endpoint {
769                api_key: None,
770                url: server.url("/").parse::<hyper::Uri>().unwrap(),
771                timeout_ms: ONE_SECOND,
772                ..Endpoint::default()
773            },
774        );
775
776        let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
777        let client = libdd_common::http_common::new_default_client();
778        let res = data.send(&client).await;
779
780        mock.assert_async().await;
781
782        assert_eq!(res.last_result.unwrap().status(), 200);
783        assert_eq!(res.errors_timeout, 0);
784        assert_eq!(res.errors_network, 0);
785        assert_eq!(res.errors_status_code, 0);
786        assert_eq!(res.requests_count, 1);
787        assert_eq!(res.chunks_sent, 1);
788        assert_eq!(res.bytes_sent, data_payload_len as u64);
789        assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
790    }
791
792    #[cfg_attr(miri, ignore)]
793    #[tokio::test]
794    async fn request_msgpack_several_payloads() {
795        let server = MockServer::start_async().await;
796
797        let mock = server
798            .mock_async(|when, then| {
799                when.method(POST)
800                    .header("Content-type", "application/msgpack")
801                    .path("/");
802                then.status(200).body("");
803            })
804            .await;
805
806        let header_tags = TracerHeaderTags::default();
807
808        let payload = setup_payload(&header_tags);
809        let data = SendData::new(
810            100,
811            TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
812            header_tags,
813            &Endpoint {
814                api_key: None,
815                url: server.url("/").parse::<hyper::Uri>().unwrap(),
816                timeout_ms: ONE_SECOND,
817                ..Endpoint::default()
818            },
819        );
820
821        let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
822        let client = libdd_common::http_common::new_default_client();
823        let res = data.send(&client).await;
824
825        mock.assert_calls_async(2).await;
826
827        assert_eq!(res.last_result.unwrap().status(), 200);
828        assert_eq!(res.errors_timeout, 0);
829        assert_eq!(res.errors_network, 0);
830        assert_eq!(res.errors_status_code, 0);
831        assert_eq!(res.requests_count, 2);
832        assert_eq!(res.chunks_sent, 2);
833        assert_eq!(res.bytes_sent, data_payload_len as u64);
834        assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 2_u64);
835    }
836
837    #[cfg_attr(miri, ignore)]
838    #[tokio::test]
839    async fn request_error_status_code() {
840        let server = MockServer::start_async().await;
841
842        let mock = server
843            .mock_async(|when, then| {
844                when.method(POST)
845                    .header("Content-type", "application/msgpack")
846                    .path("/");
847                then.status(500).body("");
848            })
849            .await;
850
851        let payload = setup_payload(&HEADER_TAGS);
852        let data = SendData::new(
853            100,
854            TracerPayloadCollection::V07(vec![payload]),
855            HEADER_TAGS,
856            &Endpoint {
857                api_key: None,
858                url: server.url("/").parse::<hyper::Uri>().unwrap(),
859                timeout_ms: ONE_SECOND,
860                ..Endpoint::default()
861            },
862        );
863
864        let client = libdd_common::http_common::new_default_client();
865        let res = data.send(&client).await;
866
867        mock.assert_calls_async(5).await;
868
869        assert!(res.last_result.is_ok());
870        assert_eq!(res.last_result.unwrap().status(), 500);
871        assert_eq!(res.errors_timeout, 0);
872        assert_eq!(res.errors_network, 0);
873        assert_eq!(res.errors_status_code, 1);
874        assert_eq!(res.requests_count, 5);
875        assert_eq!(res.chunks_sent, 0);
876        assert_eq!(res.bytes_sent, 0);
877        assert_eq!(*res.responses_count_per_code.get(&500).unwrap(), 1_u64);
878    }
879
880    #[cfg_attr(miri, ignore)]
881    #[tokio::test]
882    async fn request_error_network() {
883        // Server not created in order to return a 'connection refused' error.
884        let payload = setup_payload(&HEADER_TAGS);
885        let data = SendData::new(
886            100,
887            TracerPayloadCollection::V07(vec![payload]),
888            HEADER_TAGS,
889            &Endpoint {
890                api_key: None,
891                url: "http://127.0.0.1:4321/".parse::<hyper::Uri>().unwrap(),
892                timeout_ms: ONE_SECOND,
893                ..Endpoint::default()
894            },
895        );
896
897        let client = libdd_common::http_common::new_default_client();
898        let res = data.send(&client).await;
899
900        assert!(res.last_result.is_err());
901        match std::env::consts::OS {
902            "windows" => {
903                // On windows the TCP/IP stack returns a timeout error (at hyper level) rather
904                // than a connection refused error despite not having a listening socket on the
905                // port.
906                assert_eq!(res.errors_timeout, 1);
907                assert_eq!(res.errors_network, 0);
908            }
909            _ => {
910                assert_eq!(res.errors_timeout, 0);
911                assert_eq!(res.errors_network, 1);
912            }
913        }
914        assert_eq!(res.errors_status_code, 0);
915        assert_eq!(res.requests_count, 5);
916        assert_eq!(res.errors_status_code, 0);
917        assert_eq!(res.chunks_sent, 0);
918        assert_eq!(res.bytes_sent, 0);
919        assert_eq!(res.responses_count_per_code.len(), 0);
920    }
921
922    #[cfg_attr(miri, ignore)]
923    #[tokio::test]
924    async fn request_error_timeout_v04() {
925        let server = MockServer::start_async().await;
926
927        let header_tags = HEADER_TAGS;
928        let mock = server
929            .mock_async(|when, then| {
930                when.method(POST)
931                    .header(DATADOG_TRACE_COUNT_STR, "2")
932                    .header("Content-type", "application/msgpack")
933                    .header("datadog-meta-lang", header_tags.lang)
934                    .header(
935                        "datadog-meta-lang-interpreter",
936                        header_tags.lang_interpreter,
937                    )
938                    .header("datadog-meta-lang-version", header_tags.lang_version)
939                    .header(
940                        "datadog-meta-lang-interpreter-vendor",
941                        header_tags.lang_vendor,
942                    )
943                    .header("datadog-meta-tracer-version", header_tags.tracer_version)
944                    .header("datadog-container-id", header_tags.container_id)
945                    .path("/");
946                then.status(200).body("").delay(Duration::from_millis(500));
947            })
948            .await;
949
950        let header_tags = HEADER_TAGS;
951
952        let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
953        let data = SendData::new(
954            100,
955            TracerPayloadCollection::V04(vec![trace.clone(), trace.clone()]),
956            header_tags,
957            &Endpoint {
958                api_key: None,
959                url: server.url("/").parse::<hyper::Uri>().unwrap(),
960                timeout_ms: 200,
961                ..Endpoint::default()
962            },
963        );
964
965        let client = libdd_common::http_common::new_default_client();
966        let res = data.send(&client).await;
967
968        mock.assert_calls_async(5).await;
969
970        assert_eq!(res.errors_timeout, 1);
971        assert_eq!(res.errors_network, 0);
972        assert_eq!(res.errors_status_code, 0);
973        assert_eq!(res.requests_count, 5);
974        assert_eq!(res.chunks_sent, 0);
975        assert_eq!(res.bytes_sent, 0);
976        assert_eq!(res.responses_count_per_code.len(), 0);
977    }
978
979    #[cfg_attr(miri, ignore)]
980    #[tokio::test]
981    async fn request_error_timeout_v07() {
982        let server = MockServer::start_async().await;
983
984        let mock = server
985            .mock_async(|when, then| {
986                when.method(POST)
987                    .header("Content-type", "application/msgpack")
988                    .path("/");
989                then.status(200).body("").delay(Duration::from_millis(500));
990            })
991            .await;
992
993        let header_tags = TracerHeaderTags::default();
994
995        let payload = setup_payload(&header_tags);
996        let data = SendData::new(
997            100,
998            TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
999            header_tags,
1000            &Endpoint {
1001                api_key: None,
1002                url: server.url("/").parse::<hyper::Uri>().unwrap(),
1003                timeout_ms: 200,
1004                ..Endpoint::default()
1005            },
1006        );
1007
1008        let client = libdd_common::http_common::new_default_client();
1009        let res = data.send(&client).await;
1010
1011        mock.assert_calls_async(10).await;
1012
1013        assert_eq!(res.errors_timeout, 1);
1014        assert_eq!(res.errors_network, 0);
1015        assert_eq!(res.errors_status_code, 0);
1016        assert_eq!(res.requests_count, 5);
1017        assert_eq!(res.chunks_sent, 0);
1018        assert_eq!(res.bytes_sent, 0);
1019        assert_eq!(res.responses_count_per_code.len(), 0);
1020    }
1021
/// Checks that `SendDataBuilder` carries its optional settings into the built value.
///
/// An API key and a custom retry strategy are set through the builder; the test
/// expects both to be readable back from the resulting `SendData`.
#[test]
fn test_builder() {
    let header_tags = HEADER_TAGS;
    let payload = setup_payload(&header_tags);
    let expected_strategy = RetryStrategy::new(5, 100, RetryBackoffType::Constant, None);
    let endpoint = Endpoint::default();

    let built = SendDataBuilder::new(
        100,
        TracerPayloadCollection::V07(vec![payload]),
        header_tags,
        &endpoint,
    )
    // Exercise with_api_key()
    .with_api_key("TEST-KEY")
    // Exercise with_retry_strategy()
    .with_retry_strategy(expected_strategy.clone())
    .build();

    assert_eq!(built.target.api_key.as_deref(), Some("TEST-KEY"));
    assert_eq!(built.retry_strategy, expected_strategy);
}
1046}