Skip to main content

libdd_trace_utils/send_data/
mod.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4pub mod send_data_result;
5
6use crate::msgpack_encoder;
7use crate::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryResult};
8use crate::trace_utils::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use anyhow::{anyhow, Context};
11use futures::stream::FuturesUnordered;
12use futures::StreamExt;
13use http::{header::CONTENT_TYPE, HeaderMap, HeaderValue};
14use libdd_common::{
15    header::{
16        APPLICATION_MSGPACK, APPLICATION_PROTOBUF, DATADOG_SEND_REAL_HTTP_STATUS,
17        DATADOG_TRACE_COUNT,
18    },
19    Connect, Endpoint, GenericHttpClient,
20};
21use libdd_trace_protobuf::pb::{AgentPayload, TracerPayload};
22use send_data_result::SendDataResult;
23use std::collections::HashMap;
24#[cfg(feature = "compression")]
25use std::io::Write;
26#[cfg(feature = "compression")]
27use zstd::stream::write::Encoder;
28
29#[derive(Debug)]
30/// `SendData` is a structure that holds the data to be sent to a target endpoint.
31/// It includes the payloads to be sent, the size of the data, the target endpoint,
32/// headers for the request, and a retry strategy for sending the data.
33///
34/// # Example
35///
36/// ```rust
37/// use libdd_trace_protobuf::pb::TracerPayload;
38/// use libdd_trace_utils::send_data::{
39///     SendData,
40/// };
41/// use libdd_common::Endpoint;
42/// use libdd_common::http_common::new_default_client;
43/// use libdd_trace_utils::send_with_retry::{RetryBackoffType, RetryStrategy};
44/// use libdd_trace_utils::trace_utils::TracerHeaderTags;
45/// use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
46///
47/// #[cfg_attr(miri, ignore)]
48/// async fn update_send_results_example() {
49///     let size = 100;
50///     let tracer_payload = TracerPayloadCollection::V07(
51///         vec![TracerPayload::default()]); // Replace with actual payload
52///     let tracer_header_tags = TracerHeaderTags::default(); // Replace with actual header tags
53///     let target = Endpoint::default(); // Replace with actual endpoint
54///
55///     let mut send_data = SendData::new(size, tracer_payload, tracer_header_tags, &target);
56///
57///     // Set a custom retry strategy
58///     let retry_strategy = RetryStrategy::new(3, 10, RetryBackoffType::Exponential, Some(5));
59///
60///     send_data.set_retry_strategy(retry_strategy);
61///
62///     let client = new_default_client();
63///     // Send the data
64///     let result = send_data.send(&client).await;
65/// }
66/// ```
67pub struct SendData {
68    pub(crate) tracer_payloads: TracerPayloadCollection,
69    pub(crate) size: usize, // have a rough size estimate to force flushing if it's large
70    target: Endpoint,
71    headers: HeaderMap,
72    retry_strategy: RetryStrategy,
73    #[cfg(feature = "compression")]
74    compression: Compression,
75}
76
77#[cfg(feature = "compression")]
78#[derive(Debug, Clone)]
79pub enum Compression {
80    Zstd(i32),
81    None,
82}
83
84pub struct SendDataBuilder {
85    pub(crate) tracer_payloads: TracerPayloadCollection,
86    pub(crate) size: usize,
87    target: Endpoint,
88    headers: HeaderMap,
89    retry_strategy: RetryStrategy,
90    #[cfg(feature = "compression")]
91    compression: Compression,
92}
93
94impl SendDataBuilder {
95    pub fn new(
96        size: usize,
97        tracer_payload: TracerPayloadCollection,
98        tracer_header_tags: TracerHeaderTags,
99        target: &Endpoint,
100    ) -> SendDataBuilder {
101        let mut headers: HeaderMap = tracer_header_tags.into();
102        headers.insert(DATADOG_SEND_REAL_HTTP_STATUS, HeaderValue::from_static("1"));
103        SendDataBuilder {
104            tracer_payloads: tracer_payload,
105            size,
106            target: target.clone(),
107            headers,
108            retry_strategy: RetryStrategy::default(),
109            #[cfg(feature = "compression")]
110            compression: Compression::None,
111        }
112    }
113
114    #[cfg(feature = "compression")]
115    pub fn with_compression(mut self, compression: Compression) -> SendDataBuilder {
116        self.compression = compression;
117        self
118    }
119
120    pub fn with_api_key(mut self, api_key: &str) -> SendDataBuilder {
121        self.target.api_key = Some(api_key.to_string().into());
122        self
123    }
124
125    pub fn with_retry_strategy(mut self, retry_strategy: RetryStrategy) -> SendDataBuilder {
126        self.retry_strategy = retry_strategy;
127        self
128    }
129
130    pub fn build(self) -> SendData {
131        SendData {
132            tracer_payloads: self.tracer_payloads,
133            size: self.size,
134            target: self.target,
135            headers: self.headers,
136            retry_strategy: self.retry_strategy,
137            #[cfg(feature = "compression")]
138            compression: self.compression,
139        }
140    }
141}
142
143impl SendData {
144    /// Creates a new instance of `SendData`.
145    ///
146    /// # Arguments
147    ///
148    /// * `size`: Approximate size of the data to be sent in bytes.
149    /// * `tracer_payload`: The payload to be sent.
150    /// * `tracer_header_tags`: The header tags for the tracer.
151    /// * `target`: The endpoint to which the data will be sent.
152    ///
153    /// # Returns
154    ///
155    /// A new `SendData` instance.
156    #[allow(unused_variables)]
157    pub fn new(
158        size: usize,
159        tracer_payload: TracerPayloadCollection,
160        tracer_header_tags: TracerHeaderTags,
161        target: &Endpoint,
162    ) -> SendData {
163        let mut headers: HeaderMap = tracer_header_tags.into();
164        headers.insert(DATADOG_SEND_REAL_HTTP_STATUS, HeaderValue::from_static("1"));
165        SendData {
166            tracer_payloads: tracer_payload,
167            size,
168            target: target.clone(),
169            headers,
170            retry_strategy: RetryStrategy::default(),
171            #[cfg(feature = "compression")]
172            compression: Compression::None,
173        }
174    }
175
176    /// Returns the user defined approximate size of the data to be sent in bytes.
177    ///
178    /// # Returns
179    ///
180    /// The size of the data.
181    pub fn len(&self) -> usize {
182        self.size
183    }
184
185    /// Checks if the user defined approximate size of the data to be sent is zero.
186    ///
187    /// # Returns
188    ///
189    /// `true` if size is 0, `false` otherwise.
190    pub fn is_empty(&self) -> bool {
191        self.size == 0
192    }
193
194    /// Returns the target endpoint.
195    ///
196    /// # Returns
197    ///
198    /// A reference to the target endpoint.
199    pub fn get_target(&self) -> &Endpoint {
200        &self.target
201    }
202
203    /// Returns the payloads to be sent.
204    ///
205    /// # Returns
206    ///
207    /// A reference to the vector of payloads.
208    pub fn get_payloads(&self) -> &TracerPayloadCollection {
209        &self.tracer_payloads
210    }
211
212    /// Overrides the default RetryStrategy with user-defined values.
213    ///
214    /// # Arguments
215    ///
216    /// * `retry_strategy`: The new retry strategy to be used.
217    pub fn set_retry_strategy(&mut self, retry_strategy: RetryStrategy) {
218        self.retry_strategy = retry_strategy;
219    }
220
221    /// Sends the data to the target endpoint.
222    ///
223    /// # Returns
224    ///
225    /// A `SendDataResult` instance containing the result of the operation.
226    pub async fn send<C: Connect>(&self, http_client: &GenericHttpClient<C>) -> SendDataResult {
227        self.send_internal(http_client, None).await
228    }
229
230    async fn send_internal<C: Connect>(
231        &self,
232        http_client: &GenericHttpClient<C>,
233        endpoint: Option<Endpoint>,
234    ) -> SendDataResult {
235        if self.use_protobuf() {
236            self.send_with_protobuf(http_client, endpoint).await
237        } else {
238            self.send_with_msgpack(http_client, endpoint).await
239        }
240    }
241
242    async fn send_payload<C: Connect>(
243        &self,
244        chunks: u64,
245        payload: Vec<u8>,
246        headers: HeaderMap,
247        http_client: &GenericHttpClient<C>,
248        endpoint: Option<&Endpoint>,
249    ) -> (SendWithRetryResult, u64, u64) {
250        #[allow(clippy::unwrap_used)]
251        let payload_len = u64::try_from(payload.len()).unwrap();
252        (
253            send_with_retry(
254                http_client,
255                endpoint.unwrap_or(&self.target),
256                payload,
257                &headers,
258                &self.retry_strategy,
259            )
260            .await,
261            payload_len,
262            chunks,
263        )
264    }
265
266    fn use_protobuf(&self) -> bool {
267        self.target.api_key.is_some()
268    }
269
270    #[cfg(feature = "compression")]
271    fn compress_payload(&self, payload: Vec<u8>, headers: &mut HeaderMap) -> Vec<u8> {
272        match self.compression {
273            Compression::Zstd(level) => {
274                let result = (|| -> std::io::Result<Vec<u8>> {
275                    let mut encoder = Encoder::new(Vec::new(), level)?;
276                    encoder.write_all(&payload)?;
277                    encoder.finish()
278                })();
279
280                match result {
281                    Ok(compressed_payload) => {
282                        headers.insert(
283                            http::header::CONTENT_ENCODING,
284                            HeaderValue::from_static("zstd"),
285                        );
286                        compressed_payload
287                    }
288                    Err(_) => payload,
289                }
290            }
291            _ => payload,
292        }
293    }
294
295    async fn send_with_protobuf<C: Connect>(
296        &self,
297        http_client: &GenericHttpClient<C>,
298        endpoint: Option<Endpoint>,
299    ) -> SendDataResult {
300        let mut result = SendDataResult::default();
301
302        #[allow(clippy::unwrap_used)]
303        let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
304
305        match &self.tracer_payloads {
306            TracerPayloadCollection::V07(payloads) => {
307                let agent_payload = construct_agent_payload(payloads.to_vec());
308                let serialized_trace_payload = match serialize_proto_payload(&agent_payload)
309                    .context("Failed to serialize trace agent payload, dropping traces")
310                {
311                    Ok(p) => p,
312                    Err(e) => return result.error(e),
313                };
314                let mut request_headers = self.headers.clone();
315
316                #[cfg(feature = "compression")]
317                let final_payload =
318                    self.compress_payload(serialized_trace_payload, &mut request_headers);
319
320                #[cfg(not(feature = "compression"))]
321                let final_payload = serialized_trace_payload;
322
323                request_headers.insert(CONTENT_TYPE, APPLICATION_PROTOBUF);
324
325                let (response, bytes_sent, chunks) = self
326                    .send_payload(
327                        chunks,
328                        final_payload,
329                        request_headers,
330                        http_client,
331                        endpoint.as_ref(),
332                    )
333                    .await;
334
335                result.update(response, bytes_sent, chunks);
336
337                result
338            }
339            _ => result,
340        }
341    }
342
343    async fn send_with_msgpack<C: Connect>(
344        &self,
345        http_client: &GenericHttpClient<C>,
346        endpoint: Option<Endpoint>,
347    ) -> SendDataResult {
348        let mut result = SendDataResult::default();
349        let mut futures = FuturesUnordered::new();
350
351        match &self.tracer_payloads {
352            TracerPayloadCollection::V07(payloads) => {
353                for tracer_payload in payloads {
354                    #[allow(clippy::unwrap_used)]
355                    let chunks = u64::try_from(tracer_payload.chunks.len()).unwrap();
356                    let mut headers = self.headers.clone();
357                    headers.reserve(2);
358                    headers.insert(DATADOG_TRACE_COUNT, chunks.into());
359                    headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
360
361                    let payload = match rmp_serde::to_vec_named(tracer_payload) {
362                        Ok(p) => p,
363                        Err(e) => return result.error(anyhow!(e)),
364                    };
365
366                    futures.push(self.send_payload(
367                        chunks,
368                        payload,
369                        headers,
370                        http_client,
371                        endpoint.as_ref(),
372                    ));
373                }
374            }
375            TracerPayloadCollection::V04(payload) => {
376                #[allow(clippy::unwrap_used)]
377                let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
378                let mut headers = self.headers.clone();
379                headers.reserve(2);
380                headers.insert(DATADOG_TRACE_COUNT, chunks.into());
381                headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
382
383                let payload = msgpack_encoder::v04::to_vec(payload);
384
385                futures.push(self.send_payload(
386                    chunks,
387                    payload,
388                    headers,
389                    http_client,
390                    endpoint.as_ref(),
391                ));
392            }
393            TracerPayloadCollection::V05(payload) => {
394                #[allow(clippy::unwrap_used)]
395                let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
396                let mut headers = self.headers.clone();
397                headers.reserve(2);
398                headers.insert(DATADOG_TRACE_COUNT, chunks.into());
399                headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
400
401                let payload = match rmp_serde::to_vec(payload) {
402                    Ok(p) => p,
403                    Err(e) => return result.error(anyhow!(e)),
404                };
405
406                futures.push(self.send_payload(
407                    chunks,
408                    payload,
409                    headers,
410                    http_client,
411                    endpoint.as_ref(),
412                ));
413            }
414        }
415
416        loop {
417            match futures.next().await {
418                Some((response, payload_len, chunks)) => {
419                    result.update(response, payload_len, chunks);
420                    if result.last_result.is_err() {
421                        return result;
422                    }
423                }
424                None => return result,
425            }
426        }
427    }
428}
429
430fn construct_agent_payload(tracer_payloads: Vec<TracerPayload>) -> AgentPayload {
431    AgentPayload {
432        host_name: "".to_string(),
433        env: "".to_string(),
434        agent_version: "".to_string(),
435        error_tps: 60.0,
436        target_tps: 60.0,
437        tags: HashMap::new(),
438        tracer_payloads,
439        rare_sampler_enabled: false,
440        idx_tracer_payloads: Vec::new(),
441    }
442}
443
444fn serialize_proto_payload<T>(payload: &T) -> anyhow::Result<Vec<u8>>
445where
446    T: prost::Message,
447{
448    let mut buf = Vec::with_capacity(payload.encoded_len());
449    payload.encode(&mut buf)?;
450    Ok(buf)
451}
452
453#[cfg(test)]
454mod tests {
455    use super::*;
456    use crate::send_with_retry::{RetryBackoffType, RetryStrategy};
457    use crate::test_utils::create_test_no_alloc_span;
458    use crate::trace_utils::{construct_trace_chunk, construct_tracer_payload, RootSpanTags};
459    use crate::tracer_header_tags::TracerHeaderTags;
460    use httpmock::prelude::*;
461    use httpmock::MockServer;
462    use libdd_common::Endpoint;
463    use libdd_trace_protobuf::pb::Span;
464    use std::collections::HashMap;
465    use std::time::Duration;
466
467    const ONE_SECOND: u64 = 1_000;
468    const HEADER_TAGS: TracerHeaderTags = TracerHeaderTags {
469        lang: "test-lang",
470        lang_version: "2.0",
471        lang_interpreter: "interpreter",
472        lang_vendor: "vendor",
473        tracer_version: "1.0",
474        container_id: "id",
475        client_computed_top_level: false,
476        client_computed_stats: false,
477        dropped_p0_traces: 0,
478        dropped_p0_spans: 0,
479    };
480
481    fn setup_payload(header_tags: &TracerHeaderTags) -> TracerPayload {
482        let root_tags = RootSpanTags {
483            env: "TEST",
484            app_version: "1.0",
485            hostname: "test_bench",
486            runtime_id: "id",
487        };
488
489        let chunk = construct_trace_chunk(vec![Span {
490            service: "test-service".to_string(),
491            name: "test-service-name".to_string(),
492            resource: "test-service-resource".to_string(),
493            trace_id: 111,
494            span_id: 222,
495            parent_id: 333,
496            start: 1,
497            duration: 5,
498            error: 0,
499            meta: HashMap::new(),
500            metrics: HashMap::new(),
501            meta_struct: HashMap::new(),
502            r#type: "".to_string(),
503            span_links: vec![],
504            span_events: vec![],
505        }]);
506
507        construct_tracer_payload(vec![chunk], header_tags, root_tags)
508    }
509
510    fn compute_payload_len(collection: &TracerPayloadCollection) -> usize {
511        match collection {
512            TracerPayloadCollection::V07(payloads) => {
513                let agent_payload = construct_agent_payload(payloads.to_vec());
514                let serialized_trace_payload = serialize_proto_payload(&agent_payload).unwrap();
515                serialized_trace_payload.len()
516            }
517            _ => 0,
518        }
519    }
520
521    fn rmp_compute_payload_len(collection: &TracerPayloadCollection) -> usize {
522        match collection {
523            TracerPayloadCollection::V07(payloads) => {
524                let mut total: usize = 0;
525                for payload in payloads {
526                    total += rmp_serde::to_vec_named(payload).unwrap().len();
527                }
528                total
529            }
530            TracerPayloadCollection::V04(payloads) => {
531                msgpack_encoder::v04::to_len(payloads) as usize
532            }
533            TracerPayloadCollection::V05(payloads) => rmp_serde::to_vec(payloads).unwrap().len(),
534        }
535    }
536
537    #[test]
538    fn send_data_new_api_key() {
539        let header_tags = TracerHeaderTags::default();
540
541        let payload = setup_payload(&header_tags);
542        let data = SendData::new(
543            100,
544            TracerPayloadCollection::V07(vec![payload]),
545            HEADER_TAGS,
546            &Endpoint {
547                api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
548                url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
549                timeout_ms: ONE_SECOND,
550                ..Endpoint::default()
551            },
552        );
553
554        assert_eq!(data.size, 100);
555
556        assert_eq!(data.target.api_key.unwrap(), "TEST-KEY");
557        assert_eq!(data.target.url.path(), "/foo/bar");
558    }
559
560    #[test]
561    fn send_data_new_no_api_key() {
562        let header_tags = TracerHeaderTags::default();
563
564        let payload = setup_payload(&header_tags);
565        let data = SendData::new(
566            100,
567            TracerPayloadCollection::V07(vec![payload]),
568            header_tags.clone(),
569            &Endpoint {
570                api_key: None,
571                url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
572                timeout_ms: ONE_SECOND,
573                ..Endpoint::default()
574            },
575        );
576
577        assert_eq!(data.size, 100);
578
579        assert_eq!(data.target.api_key, None);
580        assert_eq!(data.target.url.path(), "/foo/bar");
581
582        for (key, value) in &HeaderMap::from(header_tags) {
583            assert_eq!(data.headers.get(key).unwrap(), value);
584        }
585    }
586
587    #[cfg_attr(miri, ignore)]
588    #[tokio::test]
589    async fn request_protobuf() {
590        let server = MockServer::start_async().await;
591
592        let mock = server
593            .mock_async(|when, then| {
594                when.method(POST)
595                    .header("Content-type", "application/x-protobuf")
596                    .header("DD-API-KEY", "TEST-KEY")
597                    .path("/");
598                then.status(202).body("");
599            })
600            .await;
601
602        let header_tags = TracerHeaderTags::default();
603
604        let payload = setup_payload(&header_tags);
605        let data = SendData::new(
606            100,
607            TracerPayloadCollection::V07(vec![payload.clone()]),
608            header_tags,
609            &Endpoint {
610                api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
611                url: server.url("/").parse::<hyper::Uri>().unwrap(),
612                timeout_ms: ONE_SECOND,
613                ..Endpoint::default()
614            },
615        );
616
617        let data_payload_len = compute_payload_len(&data.tracer_payloads);
618        let client = libdd_common::http_common::new_default_client();
619        let res = data.send(&client).await;
620
621        mock.assert_async().await;
622
623        assert_eq!(res.last_result.unwrap().status(), 202);
624        assert_eq!(res.errors_timeout, 0);
625        assert_eq!(res.errors_network, 0);
626        assert_eq!(res.errors_status_code, 0);
627        assert_eq!(res.requests_count, 1);
628        assert_eq!(res.chunks_sent, 1);
629        assert_eq!(res.bytes_sent, data_payload_len as u64);
630        assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
631    }
632
633    #[cfg_attr(miri, ignore)]
634    #[tokio::test]
635    async fn request_protobuf_several_payloads() {
636        let server = MockServer::start_async().await;
637
638        let mock = server
639            .mock_async(|when, then| {
640                when.method(POST)
641                    .header("Content-type", "application/x-protobuf")
642                    .header("DD-API-KEY", "TEST-KEY")
643                    .path("/");
644                then.status(202).body("");
645            })
646            .await;
647
648        let header_tags = TracerHeaderTags::default();
649
650        let payload = setup_payload(&header_tags);
651        let data = SendData::new(
652            100,
653            TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
654            header_tags,
655            &Endpoint {
656                api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
657                url: server.url("/").parse::<hyper::Uri>().unwrap(),
658                timeout_ms: ONE_SECOND,
659                ..Endpoint::default()
660            },
661        );
662
663        let data_payload_len = compute_payload_len(&data.tracer_payloads);
664        let client = libdd_common::http_common::new_default_client();
665        let res = data.send(&client).await;
666
667        mock.assert_async().await;
668
669        assert_eq!(res.last_result.unwrap().status(), 202);
670        assert_eq!(res.errors_timeout, 0);
671        assert_eq!(res.errors_network, 0);
672        assert_eq!(res.errors_status_code, 0);
673        assert_eq!(res.requests_count, 1);
674        assert_eq!(res.chunks_sent, 2);
675        assert_eq!(res.bytes_sent, data_payload_len as u64);
676        assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
677    }
678
679    #[cfg_attr(miri, ignore)]
680    #[tokio::test]
681    async fn request_msgpack_v07() {
682        let server = MockServer::start_async().await;
683
684        let header_tags = HEADER_TAGS;
685        let mock = server
686            .mock_async(|when, then| {
687                when.method(POST)
688                    .header(DATADOG_TRACE_COUNT.as_str(), "1")
689                    .header("Content-type", "application/msgpack")
690                    .header("datadog-meta-lang", header_tags.lang)
691                    .header(
692                        "datadog-meta-lang-interpreter",
693                        header_tags.lang_interpreter,
694                    )
695                    .header("datadog-meta-lang-version", header_tags.lang_version)
696                    .header(
697                        "datadog-meta-lang-interpreter-vendor",
698                        header_tags.lang_vendor,
699                    )
700                    .header("datadog-meta-tracer-version", header_tags.tracer_version)
701                    .header("datadog-container-id", header_tags.container_id)
702                    .header("Datadog-Send-Real-Http-Status", "1")
703                    .path("/");
704                then.status(200).body("");
705            })
706            .await;
707
708        let header_tags = HEADER_TAGS;
709
710        let payload = setup_payload(&header_tags);
711        let data = SendData::new(
712            100,
713            TracerPayloadCollection::V07(vec![payload.clone()]),
714            header_tags,
715            &Endpoint {
716                api_key: None,
717                url: server.url("/").parse::<hyper::Uri>().unwrap(),
718                timeout_ms: ONE_SECOND,
719                ..Endpoint::default()
720            },
721        );
722
723        let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
724        let client = libdd_common::http_common::new_default_client();
725        let res = data.send(&client).await;
726
727        mock.assert_async().await;
728
729        assert_eq!(res.last_result.unwrap().status(), 200);
730        assert_eq!(res.errors_timeout, 0);
731        assert_eq!(res.errors_network, 0);
732        assert_eq!(res.errors_status_code, 0);
733        assert_eq!(res.requests_count, 1);
734        assert_eq!(res.chunks_sent, 1);
735        assert_eq!(res.bytes_sent, data_payload_len as u64);
736        assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
737    }
738
739    #[cfg_attr(miri, ignore)]
740    #[tokio::test]
741    async fn request_msgpack_v04() {
742        let server = MockServer::start_async().await;
743
744        let header_tags = HEADER_TAGS;
745        let mock = server
746            .mock_async(|when, then| {
747                when.method(POST)
748                    .header(DATADOG_TRACE_COUNT.as_str(), "1")
749                    .header("Content-type", "application/msgpack")
750                    .header("datadog-meta-lang", header_tags.lang)
751                    .header(
752                        "datadog-meta-lang-interpreter",
753                        header_tags.lang_interpreter,
754                    )
755                    .header("datadog-meta-lang-version", header_tags.lang_version)
756                    .header(
757                        "datadog-meta-lang-interpreter-vendor",
758                        header_tags.lang_vendor,
759                    )
760                    .header("datadog-meta-tracer-version", header_tags.tracer_version)
761                    .header("datadog-container-id", header_tags.container_id)
762                    .path("/");
763                then.status(200).body("");
764            })
765            .await;
766
767        let header_tags = HEADER_TAGS;
768
769        let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
770        let data = SendData::new(
771            100,
772            TracerPayloadCollection::V04(vec![trace.clone()]),
773            header_tags,
774            &Endpoint {
775                api_key: None,
776                url: server.url("/").parse::<hyper::Uri>().unwrap(),
777                timeout_ms: ONE_SECOND,
778                ..Endpoint::default()
779            },
780        );
781
782        let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
783        let client = libdd_common::http_common::new_default_client();
784        let res = data.send(&client).await;
785
786        mock.assert_async().await;
787
788        assert_eq!(res.last_result.unwrap().status(), 200);
789        assert_eq!(res.errors_timeout, 0);
790        assert_eq!(res.errors_network, 0);
791        assert_eq!(res.errors_status_code, 0);
792        assert_eq!(res.requests_count, 1);
793        assert_eq!(res.chunks_sent, 1);
794        assert_eq!(res.bytes_sent, data_payload_len as u64);
795        assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
796    }
797
798    #[cfg_attr(miri, ignore)]
799    #[tokio::test]
800    async fn request_msgpack_several_payloads() {
801        let server = MockServer::start_async().await;
802
803        let mock = server
804            .mock_async(|when, then| {
805                when.method(POST)
806                    .header("Content-type", "application/msgpack")
807                    .path("/");
808                then.status(200).body("");
809            })
810            .await;
811
812        let header_tags = TracerHeaderTags::default();
813
814        let payload = setup_payload(&header_tags);
815        let data = SendData::new(
816            100,
817            TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
818            header_tags,
819            &Endpoint {
820                api_key: None,
821                url: server.url("/").parse::<hyper::Uri>().unwrap(),
822                timeout_ms: ONE_SECOND,
823                ..Endpoint::default()
824            },
825        );
826
827        let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
828        let client = libdd_common::http_common::new_default_client();
829        let res = data.send(&client).await;
830
831        mock.assert_calls_async(2).await;
832
833        assert_eq!(res.last_result.unwrap().status(), 200);
834        assert_eq!(res.errors_timeout, 0);
835        assert_eq!(res.errors_network, 0);
836        assert_eq!(res.errors_status_code, 0);
837        assert_eq!(res.requests_count, 2);
838        assert_eq!(res.chunks_sent, 2);
839        assert_eq!(res.bytes_sent, data_payload_len as u64);
840        assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 2_u64);
841    }
842
843    #[cfg_attr(miri, ignore)]
844    #[tokio::test]
845    async fn request_error_status_code() {
846        let server = MockServer::start_async().await;
847
848        let mock = server
849            .mock_async(|when, then| {
850                when.method(POST)
851                    .header("Content-type", "application/msgpack")
852                    .path("/");
853                then.status(500).body("");
854            })
855            .await;
856
857        let payload = setup_payload(&HEADER_TAGS);
858        let data = SendData::new(
859            100,
860            TracerPayloadCollection::V07(vec![payload]),
861            HEADER_TAGS,
862            &Endpoint {
863                api_key: None,
864                url: server.url("/").parse::<hyper::Uri>().unwrap(),
865                timeout_ms: ONE_SECOND,
866                ..Endpoint::default()
867            },
868        );
869
870        let client = libdd_common::http_common::new_default_client();
871        let res = data.send(&client).await;
872
873        mock.assert_calls_async(5).await;
874
875        assert!(res.last_result.is_ok());
876        assert_eq!(res.last_result.unwrap().status(), 500);
877        assert_eq!(res.errors_timeout, 0);
878        assert_eq!(res.errors_network, 0);
879        assert_eq!(res.errors_status_code, 1);
880        assert_eq!(res.requests_count, 5);
881        assert_eq!(res.chunks_sent, 0);
882        assert_eq!(res.bytes_sent, 0);
883        assert_eq!(*res.responses_count_per_code.get(&500).unwrap(), 1_u64);
884    }
885
886    #[cfg_attr(miri, ignore)]
887    #[tokio::test]
888    async fn request_error_network() {
889        // Server not created in order to return a 'connection refused' error.
890        let payload = setup_payload(&HEADER_TAGS);
891        let data = SendData::new(
892            100,
893            TracerPayloadCollection::V07(vec![payload]),
894            HEADER_TAGS,
895            &Endpoint {
896                api_key: None,
897                url: "http://127.0.0.1:4321/".parse::<hyper::Uri>().unwrap(),
898                timeout_ms: ONE_SECOND,
899                ..Endpoint::default()
900            },
901        );
902
903        let client = libdd_common::http_common::new_default_client();
904        let res = data.send(&client).await;
905
906        assert!(res.last_result.is_err());
907        match std::env::consts::OS {
908            "windows" => {
909                // On windows the TCP/IP stack returns a timeout error (at hyper level) rather
910                // than a connection refused error despite not having a listening socket on the
911                // port.
912                assert_eq!(res.errors_timeout, 1);
913                assert_eq!(res.errors_network, 0);
914            }
915            _ => {
916                assert_eq!(res.errors_timeout, 0);
917                assert_eq!(res.errors_network, 1);
918            }
919        }
920        assert_eq!(res.errors_status_code, 0);
921        assert_eq!(res.requests_count, 5);
922        assert_eq!(res.errors_status_code, 0);
923        assert_eq!(res.chunks_sent, 0);
924        assert_eq!(res.bytes_sent, 0);
925        assert_eq!(res.responses_count_per_code.len(), 0);
926    }
927
928    #[cfg_attr(miri, ignore)]
929    #[tokio::test]
930    async fn request_error_timeout_v04() {
931        let server = MockServer::start_async().await;
932
933        let header_tags = HEADER_TAGS;
934        let mock = server
935            .mock_async(|when, then| {
936                when.method(POST)
937                    .header(DATADOG_TRACE_COUNT.as_str(), "2")
938                    .header("Content-type", "application/msgpack")
939                    .header("datadog-meta-lang", header_tags.lang)
940                    .header(
941                        "datadog-meta-lang-interpreter",
942                        header_tags.lang_interpreter,
943                    )
944                    .header("datadog-meta-lang-version", header_tags.lang_version)
945                    .header(
946                        "datadog-meta-lang-interpreter-vendor",
947                        header_tags.lang_vendor,
948                    )
949                    .header("datadog-meta-tracer-version", header_tags.tracer_version)
950                    .header("datadog-container-id", header_tags.container_id)
951                    .path("/");
952                then.status(200).body("").delay(Duration::from_millis(500));
953            })
954            .await;
955
956        let header_tags = HEADER_TAGS;
957
958        let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
959        let data = SendData::new(
960            100,
961            TracerPayloadCollection::V04(vec![trace.clone(), trace.clone()]),
962            header_tags,
963            &Endpoint {
964                api_key: None,
965                url: server.url("/").parse::<hyper::Uri>().unwrap(),
966                timeout_ms: 200,
967                ..Endpoint::default()
968            },
969        );
970
971        let client = libdd_common::http_common::new_default_client();
972        let res = data.send(&client).await;
973
974        mock.assert_calls_async(5).await;
975
976        assert_eq!(res.errors_timeout, 1);
977        assert_eq!(res.errors_network, 0);
978        assert_eq!(res.errors_status_code, 0);
979        assert_eq!(res.requests_count, 5);
980        assert_eq!(res.chunks_sent, 0);
981        assert_eq!(res.bytes_sent, 0);
982        assert_eq!(res.responses_count_per_code.len(), 0);
983    }
984
985    #[cfg_attr(miri, ignore)]
986    #[tokio::test]
987    async fn request_error_timeout_v07() {
988        let server = MockServer::start_async().await;
989
990        let mock = server
991            .mock_async(|when, then| {
992                when.method(POST)
993                    .header("Content-type", "application/msgpack")
994                    .path("/");
995                then.status(200).body("").delay(Duration::from_millis(500));
996            })
997            .await;
998
999        let header_tags = TracerHeaderTags::default();
1000
1001        let payload = setup_payload(&header_tags);
1002        let data = SendData::new(
1003            100,
1004            TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
1005            header_tags,
1006            &Endpoint {
1007                api_key: None,
1008                url: server.url("/").parse::<hyper::Uri>().unwrap(),
1009                timeout_ms: 200,
1010                ..Endpoint::default()
1011            },
1012        );
1013
1014        let client = libdd_common::http_common::new_default_client();
1015        let res = data.send(&client).await;
1016
1017        mock.assert_calls_async(10).await;
1018
1019        assert_eq!(res.errors_timeout, 1);
1020        assert_eq!(res.errors_network, 0);
1021        assert_eq!(res.errors_status_code, 0);
1022        assert_eq!(res.requests_count, 5);
1023        assert_eq!(res.chunks_sent, 0);
1024        assert_eq!(res.bytes_sent, 0);
1025        assert_eq!(res.responses_count_per_code.len(), 0);
1026    }
1027
1028    #[test]
1029    fn test_builder() {
1030        let header_tags = HEADER_TAGS;
1031        let payload = setup_payload(&header_tags);
1032        let retry_strategy = RetryStrategy::new(5, 100, RetryBackoffType::Constant, None);
1033
1034        let send_data = SendDataBuilder::new(
1035            100,
1036            TracerPayloadCollection::V07(vec![payload]),
1037            header_tags,
1038            &Endpoint::default(),
1039        )
1040        // Test with_api_key()
1041        .with_api_key("TEST-KEY")
1042        // Test with_retry_strategy()
1043        .with_retry_strategy(retry_strategy.clone())
1044        .build();
1045
1046        assert_eq!(
1047            send_data.target.api_key,
1048            Some(std::borrow::Cow::Borrowed("TEST-KEY"))
1049        );
1050        assert_eq!(send_data.retry_strategy, retry_strategy);
1051    }
1052}