libdd_trace_utils/send_data/
mod.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4pub mod send_data_result;
5
6use crate::msgpack_encoder;
7use crate::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryResult};
8use crate::trace_utils::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use anyhow::{anyhow, Context};
11use futures::stream::FuturesUnordered;
12use futures::StreamExt;
13use hyper::header::CONTENT_TYPE;
14use libdd_common::{
15    header::{
16        APPLICATION_MSGPACK_STR, APPLICATION_PROTOBUF_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR,
17        DATADOG_TRACE_COUNT_STR,
18    },
19    Connect, Endpoint, GenericHttpClient,
20};
21use libdd_trace_protobuf::pb::{AgentPayload, TracerPayload};
22use send_data_result::SendDataResult;
23use std::collections::HashMap;
24#[cfg(feature = "compression")]
25use std::io::Write;
26#[cfg(feature = "compression")]
27use zstd::stream::write::Encoder;
28
#[derive(Debug, Clone)]
/// `SendData` is a structure that holds the data to be sent to a target endpoint.
/// It includes the payloads to be sent, the size of the data, the target endpoint,
/// headers for the request, and a retry strategy for sending the data.
///
/// # Example
///
/// ```rust
/// use libdd_trace_protobuf::pb::TracerPayload;
/// use libdd_trace_utils::send_data::{
///     SendData,
/// };
/// use libdd_common::Endpoint;
/// use libdd_common::hyper_migration::new_default_client;
/// use libdd_trace_utils::send_with_retry::{RetryBackoffType, RetryStrategy};
/// use libdd_trace_utils::trace_utils::TracerHeaderTags;
/// use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
///
/// #[cfg_attr(miri, ignore)]
/// async fn update_send_results_example() {
///     let size = 100;
///     let tracer_payload = TracerPayloadCollection::V07(
///         vec![TracerPayload::default()]); // Replace with actual payload
///     let tracer_header_tags = TracerHeaderTags::default(); // Replace with actual header tags
///     let target = Endpoint::default(); // Replace with actual endpoint
///
///     let mut send_data = SendData::new(size, tracer_payload, tracer_header_tags, &target);
///
///     // Set a custom retry strategy
///     let retry_strategy = RetryStrategy::new(3, 10, RetryBackoffType::Exponential, Some(5));
///
///     send_data.set_retry_strategy(retry_strategy);
///
///     let client = new_default_client();
///     // Send the data
///     let result = send_data.send(&client).await;
/// }
/// ```
pub struct SendData {
    /// The trace payloads that `send` serializes and transmits.
    pub(crate) tracer_payloads: TracerPayloadCollection,
    /// Caller-supplied approximate size of the data in bytes; it is stored as given and never
    /// recomputed from the payloads.
    pub(crate) size: usize, // have a rough size estimate to force flushing if it's large
    /// Destination of the requests. When it carries an API key the payloads are sent as
    /// protobuf, otherwise as msgpack.
    target: Endpoint,
    /// Base request headers: the converted tracer header tags plus
    /// `DATADOG_SEND_REAL_HTTP_STATUS_STR`. Each request adds its own content type on top.
    headers: HashMap<&'static str, String>,
    /// Retry policy handed to `send_with_retry` for every request.
    retry_strategy: RetryStrategy,
    /// Compression setting; it is only consulted on the protobuf path.
    #[cfg(feature = "compression")]
    compression: Compression,
}
76
#[cfg(feature = "compression")]
#[derive(Debug, Clone)]
/// Compression applied to a serialized payload before it is sent.
pub enum Compression {
    /// Compress with zstd; the wrapped value is the level passed to the zstd encoder.
    Zstd(i32),
    /// Send the payload uncompressed.
    None,
}
83
84#[derive(Clone)]
85pub struct SendDataBuilder {
86    pub(crate) tracer_payloads: TracerPayloadCollection,
87    pub(crate) size: usize,
88    target: Endpoint,
89    headers: HashMap<&'static str, String>,
90    retry_strategy: RetryStrategy,
91    #[cfg(feature = "compression")]
92    compression: Compression,
93}
94
95impl SendDataBuilder {
96    pub fn new(
97        size: usize,
98        tracer_payload: TracerPayloadCollection,
99        tracer_header_tags: TracerHeaderTags,
100        target: &Endpoint,
101    ) -> SendDataBuilder {
102        let mut headers: HashMap<&'static str, String> = tracer_header_tags.into();
103        headers.insert(DATADOG_SEND_REAL_HTTP_STATUS_STR, "1".to_string());
104        SendDataBuilder {
105            tracer_payloads: tracer_payload,
106            size,
107            target: target.clone(),
108            headers,
109            retry_strategy: RetryStrategy::default(),
110            #[cfg(feature = "compression")]
111            compression: Compression::None,
112        }
113    }
114
115    #[cfg(feature = "compression")]
116    pub fn with_compression(mut self, compression: Compression) -> SendDataBuilder {
117        self.compression = compression;
118        self
119    }
120
121    pub fn with_api_key(mut self, api_key: &str) -> SendDataBuilder {
122        self.target.api_key = Some(api_key.to_string().into());
123        self
124    }
125
126    pub fn with_retry_strategy(mut self, retry_strategy: RetryStrategy) -> SendDataBuilder {
127        self.retry_strategy = retry_strategy;
128        self
129    }
130
131    pub fn build(self) -> SendData {
132        SendData {
133            tracer_payloads: self.tracer_payloads,
134            size: self.size,
135            target: self.target,
136            headers: self.headers,
137            retry_strategy: self.retry_strategy,
138            #[cfg(feature = "compression")]
139            compression: self.compression,
140        }
141    }
142}
143
144impl SendData {
145    /// Creates a new instance of `SendData`.
146    ///
147    /// # Arguments
148    ///
149    /// * `size`: Approximate size of the data to be sent in bytes.
150    /// * `tracer_payload`: The payload to be sent.
151    /// * `tracer_header_tags`: The header tags for the tracer.
152    /// * `target`: The endpoint to which the data will be sent.
153    ///
154    /// # Returns
155    ///
156    /// A new `SendData` instance.
157    #[allow(unused_variables)]
158    pub fn new(
159        size: usize,
160        tracer_payload: TracerPayloadCollection,
161        tracer_header_tags: TracerHeaderTags,
162        target: &Endpoint,
163    ) -> SendData {
164        let mut headers: HashMap<&'static str, String> = tracer_header_tags.into();
165        headers.insert(DATADOG_SEND_REAL_HTTP_STATUS_STR, "1".to_string());
166        SendData {
167            tracer_payloads: tracer_payload,
168            size,
169            target: target.clone(),
170            headers,
171            retry_strategy: RetryStrategy::default(),
172            #[cfg(feature = "compression")]
173            compression: Compression::None,
174        }
175    }
176
177    /// Returns the user defined approximate size of the data to be sent in bytes.
178    ///
179    /// # Returns
180    ///
181    /// The size of the data.
182    pub fn len(&self) -> usize {
183        self.size
184    }
185
186    /// Checks if the user defined approximate size of the data to be sent is zero.
187    ///
188    /// # Returns
189    ///
190    /// `true` if size is 0, `false` otherwise.
191    pub fn is_empty(&self) -> bool {
192        self.size == 0
193    }
194
    /// Returns the target endpoint the data will be sent to.
    ///
    /// # Returns
    ///
    /// A reference to the target endpoint.
    pub fn get_target(&self) -> &Endpoint {
        &self.target
    }

    /// Returns the payloads to be sent.
    ///
    /// # Returns
    ///
    /// A reference to the payload collection.
    pub fn get_payloads(&self) -> &TracerPayloadCollection {
        &self.tracer_payloads
    }
212
    /// Overrides the default RetryStrategy with user-defined values.
    ///
    /// The new strategy is used by every subsequent call to `send`.
    ///
    /// # Arguments
    ///
    /// * `retry_strategy`: The new retry strategy to be used.
    pub fn set_retry_strategy(&mut self, retry_strategy: RetryStrategy) {
        self.retry_strategy = retry_strategy;
    }
221
222    /// Returns a clone of the SendData with the user-defined endpoint.
223    ///
224    /// # Arguments
225    ///
226    /// * `endpoint`: The new endpoint to be used.
227    pub fn with_endpoint(&self, endpoint: Endpoint) -> SendData {
228        SendData {
229            target: endpoint,
230            ..self.clone()
231        }
232    }
233
    /// Sends the data to the target endpoint.
    ///
    /// The payloads are sent as protobuf when the target endpoint has an API key and as
    /// msgpack otherwise.
    ///
    /// # Arguments
    ///
    /// * `http_client`: The HTTP client used to perform the requests.
    ///
    /// # Returns
    ///
    /// A `SendDataResult` instance containing the result of the operation.
    pub async fn send<C: Connect>(&self, http_client: &GenericHttpClient<C>) -> SendDataResult {
        self.send_internal(http_client).await
    }
242
243    async fn send_internal<C: Connect>(
244        &self,
245        http_client: &GenericHttpClient<C>,
246    ) -> SendDataResult {
247        if self.use_protobuf() {
248            self.send_with_protobuf(http_client).await
249        } else {
250            self.send_with_msgpack(http_client).await
251        }
252    }
253
254    async fn send_payload<C: Connect>(
255        &self,
256        chunks: u64,
257        payload: Vec<u8>,
258        headers: HashMap<&'static str, String>,
259        http_client: &GenericHttpClient<C>,
260    ) -> (SendWithRetryResult, u64, u64) {
261        #[allow(clippy::unwrap_used)]
262        let payload_len = u64::try_from(payload.len()).unwrap();
263        (
264            send_with_retry(
265                http_client,
266                &self.target,
267                payload,
268                &headers,
269                &self.retry_strategy,
270            )
271            .await,
272            payload_len,
273            chunks,
274        )
275    }
276
    /// Returns `true` when the payloads should be sent as protobuf, which is the case exactly
    /// when the target endpoint has an API key set.
    fn use_protobuf(&self) -> bool {
        self.target.api_key.is_some()
    }
280
281    #[cfg(feature = "compression")]
282    fn compress_payload(&self, payload: Vec<u8>, headers: &mut HashMap<&str, String>) -> Vec<u8> {
283        match self.compression {
284            Compression::Zstd(level) => {
285                let result = (|| -> std::io::Result<Vec<u8>> {
286                    let mut encoder = Encoder::new(Vec::new(), level)?;
287                    encoder.write_all(&payload)?;
288                    encoder.finish()
289                })();
290
291                match result {
292                    Ok(compressed_payload) => {
293                        headers.insert("Content-Encoding", "zstd".to_string());
294                        compressed_payload
295                    }
296                    Err(_) => payload,
297                }
298            }
299            _ => payload,
300        }
301    }
302
303    async fn send_with_protobuf<C: Connect>(
304        &self,
305        http_client: &GenericHttpClient<C>,
306    ) -> SendDataResult {
307        let mut result = SendDataResult::default();
308
309        #[allow(clippy::unwrap_used)]
310        let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
311
312        match &self.tracer_payloads {
313            TracerPayloadCollection::V07(payloads) => {
314                let agent_payload = construct_agent_payload(payloads.to_vec());
315                let serialized_trace_payload = match serialize_proto_payload(&agent_payload)
316                    .context("Failed to serialize trace agent payload, dropping traces")
317                {
318                    Ok(p) => p,
319                    Err(e) => return result.error(e),
320                };
321                let mut request_headers = self.headers.clone();
322
323                #[cfg(feature = "compression")]
324                let final_payload =
325                    self.compress_payload(serialized_trace_payload, &mut request_headers);
326
327                #[cfg(not(feature = "compression"))]
328                let final_payload = serialized_trace_payload;
329
330                request_headers.insert(CONTENT_TYPE.as_str(), APPLICATION_PROTOBUF_STR.to_string());
331
332                let (response, bytes_sent, chunks) = self
333                    .send_payload(chunks, final_payload, request_headers, http_client)
334                    .await;
335
336                result.update(response, bytes_sent, chunks);
337
338                result
339            }
340            _ => result,
341        }
342    }
343
344    async fn send_with_msgpack<C: Connect>(
345        &self,
346        http_client: &GenericHttpClient<C>,
347    ) -> SendDataResult {
348        let mut result = SendDataResult::default();
349        let mut futures = FuturesUnordered::new();
350
351        match &self.tracer_payloads {
352            TracerPayloadCollection::V07(payloads) => {
353                for tracer_payload in payloads {
354                    #[allow(clippy::unwrap_used)]
355                    let chunks = u64::try_from(tracer_payload.chunks.len()).unwrap();
356                    let mut headers = self.headers.clone();
357                    headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string());
358                    headers.insert(CONTENT_TYPE.as_str(), APPLICATION_MSGPACK_STR.to_string());
359
360                    let payload = match rmp_serde::to_vec_named(tracer_payload) {
361                        Ok(p) => p,
362                        Err(e) => return result.error(anyhow!(e)),
363                    };
364
365                    futures.push(self.send_payload(chunks, payload, headers, http_client));
366                }
367            }
368            TracerPayloadCollection::V04(payload) => {
369                #[allow(clippy::unwrap_used)]
370                let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
371                let mut headers = self.headers.clone();
372                headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string());
373                headers.insert(CONTENT_TYPE.as_str(), APPLICATION_MSGPACK_STR.to_string());
374
375                let payload = msgpack_encoder::v04::to_vec(payload);
376
377                futures.push(self.send_payload(chunks, payload, headers, http_client));
378            }
379            TracerPayloadCollection::V05(payload) => {
380                #[allow(clippy::unwrap_used)]
381                let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
382                let mut headers = self.headers.clone();
383                headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string());
384                headers.insert(CONTENT_TYPE.as_str(), APPLICATION_MSGPACK_STR.to_string());
385
386                let payload = match rmp_serde::to_vec(payload) {
387                    Ok(p) => p,
388                    Err(e) => return result.error(anyhow!(e)),
389                };
390
391                futures.push(self.send_payload(chunks, payload, headers, http_client));
392            }
393        }
394
395        loop {
396            match futures.next().await {
397                Some((response, payload_len, chunks)) => {
398                    result.update(response, payload_len, chunks);
399                    if result.last_result.is_err() {
400                        return result;
401                    }
402                }
403                None => return result,
404            }
405        }
406    }
407}
408
409fn construct_agent_payload(tracer_payloads: Vec<TracerPayload>) -> AgentPayload {
410    AgentPayload {
411        host_name: "".to_string(),
412        env: "".to_string(),
413        agent_version: "".to_string(),
414        error_tps: 60.0,
415        target_tps: 60.0,
416        tags: HashMap::new(),
417        tracer_payloads,
418        rare_sampler_enabled: false,
419        idx_tracer_payloads: Vec::new(),
420    }
421}
422
423fn serialize_proto_payload<T>(payload: &T) -> anyhow::Result<Vec<u8>>
424where
425    T: prost::Message,
426{
427    let mut buf = Vec::with_capacity(payload.encoded_len());
428    payload.encode(&mut buf)?;
429    Ok(buf)
430}
431
432#[cfg(test)]
433mod tests {
434    use super::*;
435    use crate::send_with_retry::{RetryBackoffType, RetryStrategy};
436    use crate::test_utils::create_test_no_alloc_span;
437    use crate::trace_utils::{construct_trace_chunk, construct_tracer_payload, RootSpanTags};
438    use crate::tracer_header_tags::TracerHeaderTags;
439    use httpmock::prelude::*;
440    use httpmock::MockServer;
441    use libdd_common::Endpoint;
442    use libdd_trace_protobuf::pb::Span;
443    use std::collections::HashMap;
444    use std::time::Duration;
445
    /// One second expressed in milliseconds; used as the endpoint `timeout_ms` in these tests.
    const ONE_SECOND: u64 = 1_000;
    /// Fixed, non-default tracer header tags so tests can match the resulting request headers.
    const HEADER_TAGS: TracerHeaderTags = TracerHeaderTags {
        lang: "test-lang",
        lang_version: "2.0",
        lang_interpreter: "interpreter",
        lang_vendor: "vendor",
        tracer_version: "1.0",
        container_id: "id",
        client_computed_top_level: false,
        client_computed_stats: false,
        dropped_p0_traces: 0,
        dropped_p0_spans: 0,
    };
459
460    fn setup_payload(header_tags: &TracerHeaderTags) -> TracerPayload {
461        let root_tags = RootSpanTags {
462            env: "TEST",
463            app_version: "1.0",
464            hostname: "test_bench",
465            runtime_id: "id",
466        };
467
468        let chunk = construct_trace_chunk(vec![Span {
469            service: "test-service".to_string(),
470            name: "test-service-name".to_string(),
471            resource: "test-service-resource".to_string(),
472            trace_id: 111,
473            span_id: 222,
474            parent_id: 333,
475            start: 1,
476            duration: 5,
477            error: 0,
478            meta: HashMap::new(),
479            metrics: HashMap::new(),
480            meta_struct: HashMap::new(),
481            r#type: "".to_string(),
482            span_links: vec![],
483            span_events: vec![],
484        }]);
485
486        construct_tracer_payload(vec![chunk], header_tags, root_tags)
487    }
488
489    fn compute_payload_len(collection: &TracerPayloadCollection) -> usize {
490        match collection {
491            TracerPayloadCollection::V07(payloads) => {
492                let agent_payload = construct_agent_payload(payloads.to_vec());
493                let serialized_trace_payload = serialize_proto_payload(&agent_payload).unwrap();
494                serialized_trace_payload.len()
495            }
496            _ => 0,
497        }
498    }
499
500    fn rmp_compute_payload_len(collection: &TracerPayloadCollection) -> usize {
501        match collection {
502            TracerPayloadCollection::V07(payloads) => {
503                let mut total: usize = 0;
504                for payload in payloads {
505                    total += rmp_serde::to_vec_named(payload).unwrap().len();
506                }
507                total
508            }
509            TracerPayloadCollection::V04(payloads) => {
510                msgpack_encoder::v04::to_len(payloads) as usize
511            }
512            TracerPayloadCollection::V05(payloads) => rmp_serde::to_vec(payloads).unwrap().len(),
513        }
514    }
515
516    #[test]
517    fn send_data_new_api_key() {
518        let header_tags = TracerHeaderTags::default();
519
520        let payload = setup_payload(&header_tags);
521        let data = SendData::new(
522            100,
523            TracerPayloadCollection::V07(vec![payload]),
524            HEADER_TAGS,
525            &Endpoint {
526                api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
527                url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
528                timeout_ms: ONE_SECOND,
529                ..Endpoint::default()
530            },
531        );
532
533        assert_eq!(data.size, 100);
534
535        assert_eq!(data.target.api_key.unwrap(), "TEST-KEY");
536        assert_eq!(data.target.url.path(), "/foo/bar");
537    }
538
539    #[test]
540    fn send_data_new_no_api_key() {
541        let header_tags = TracerHeaderTags::default();
542
543        let payload = setup_payload(&header_tags);
544        let data = SendData::new(
545            100,
546            TracerPayloadCollection::V07(vec![payload]),
547            header_tags.clone(),
548            &Endpoint {
549                api_key: None,
550                url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
551                timeout_ms: ONE_SECOND,
552                ..Endpoint::default()
553            },
554        );
555
556        assert_eq!(data.size, 100);
557
558        assert_eq!(data.target.api_key, None);
559        assert_eq!(data.target.url.path(), "/foo/bar");
560
561        for (key, value) in HashMap::from(header_tags) {
562            assert_eq!(data.headers.get(key).unwrap(), &value);
563        }
564    }
565
566    #[cfg_attr(miri, ignore)]
567    #[tokio::test]
568    async fn request_protobuf() {
569        let server = MockServer::start_async().await;
570
571        let mock = server
572            .mock_async(|when, then| {
573                when.method(POST)
574                    .header("Content-type", "application/x-protobuf")
575                    .header("DD-API-KEY", "TEST-KEY")
576                    .path("/");
577                then.status(202).body("");
578            })
579            .await;
580
581        let header_tags = TracerHeaderTags::default();
582
583        let payload = setup_payload(&header_tags);
584        let data = SendData::new(
585            100,
586            TracerPayloadCollection::V07(vec![payload.clone()]),
587            header_tags,
588            &Endpoint {
589                api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
590                url: server.url("/").parse::<hyper::Uri>().unwrap(),
591                timeout_ms: ONE_SECOND,
592                ..Endpoint::default()
593            },
594        );
595
596        let data_payload_len = compute_payload_len(&data.tracer_payloads);
597        let client = libdd_common::hyper_migration::new_default_client();
598        let res = data.send(&client).await;
599
600        mock.assert_async().await;
601
602        assert_eq!(res.last_result.unwrap().status(), 202);
603        assert_eq!(res.errors_timeout, 0);
604        assert_eq!(res.errors_network, 0);
605        assert_eq!(res.errors_status_code, 0);
606        assert_eq!(res.requests_count, 1);
607        assert_eq!(res.chunks_sent, 1);
608        assert_eq!(res.bytes_sent, data_payload_len as u64);
609        assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
610    }
611
612    #[cfg_attr(miri, ignore)]
613    #[tokio::test]
614    async fn request_protobuf_several_payloads() {
615        let server = MockServer::start_async().await;
616
617        let mock = server
618            .mock_async(|when, then| {
619                when.method(POST)
620                    .header("Content-type", "application/x-protobuf")
621                    .header("DD-API-KEY", "TEST-KEY")
622                    .path("/");
623                then.status(202).body("");
624            })
625            .await;
626
627        let header_tags = TracerHeaderTags::default();
628
629        let payload = setup_payload(&header_tags);
630        let data = SendData::new(
631            100,
632            TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
633            header_tags,
634            &Endpoint {
635                api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
636                url: server.url("/").parse::<hyper::Uri>().unwrap(),
637                timeout_ms: ONE_SECOND,
638                ..Endpoint::default()
639            },
640        );
641
642        let data_payload_len = compute_payload_len(&data.tracer_payloads);
643        let client = libdd_common::hyper_migration::new_default_client();
644        let res = data.send(&client).await;
645
646        mock.assert_async().await;
647
648        assert_eq!(res.last_result.unwrap().status(), 202);
649        assert_eq!(res.errors_timeout, 0);
650        assert_eq!(res.errors_network, 0);
651        assert_eq!(res.errors_status_code, 0);
652        assert_eq!(res.requests_count, 1);
653        assert_eq!(res.chunks_sent, 2);
654        assert_eq!(res.bytes_sent, data_payload_len as u64);
655        assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
656    }
657
658    #[cfg_attr(miri, ignore)]
659    #[tokio::test]
660    async fn request_msgpack_v07() {
661        let server = MockServer::start_async().await;
662
663        let header_tags = HEADER_TAGS;
664        let mock = server
665            .mock_async(|when, then| {
666                when.method(POST)
667                    .header(DATADOG_TRACE_COUNT_STR, "1")
668                    .header("Content-type", "application/msgpack")
669                    .header("datadog-meta-lang", header_tags.lang)
670                    .header(
671                        "datadog-meta-lang-interpreter",
672                        header_tags.lang_interpreter,
673                    )
674                    .header("datadog-meta-lang-version", header_tags.lang_version)
675                    .header(
676                        "datadog-meta-lang-interpreter-vendor",
677                        header_tags.lang_vendor,
678                    )
679                    .header("datadog-meta-tracer-version", header_tags.tracer_version)
680                    .header("datadog-container-id", header_tags.container_id)
681                    .header("Datadog-Send-Real-Http-Status", "1")
682                    .path("/");
683                then.status(200).body("");
684            })
685            .await;
686
687        let header_tags = HEADER_TAGS;
688
689        let payload = setup_payload(&header_tags);
690        let data = SendData::new(
691            100,
692            TracerPayloadCollection::V07(vec![payload.clone()]),
693            header_tags,
694            &Endpoint {
695                api_key: None,
696                url: server.url("/").parse::<hyper::Uri>().unwrap(),
697                timeout_ms: ONE_SECOND,
698                ..Endpoint::default()
699            },
700        );
701
702        let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
703        let client = libdd_common::hyper_migration::new_default_client();
704        let res = data.send(&client).await;
705
706        mock.assert_async().await;
707
708        assert_eq!(res.last_result.unwrap().status(), 200);
709        assert_eq!(res.errors_timeout, 0);
710        assert_eq!(res.errors_network, 0);
711        assert_eq!(res.errors_status_code, 0);
712        assert_eq!(res.requests_count, 1);
713        assert_eq!(res.chunks_sent, 1);
714        assert_eq!(res.bytes_sent, data_payload_len as u64);
715        assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
716    }
717
718    #[cfg_attr(miri, ignore)]
719    #[tokio::test]
720    async fn request_msgpack_v04() {
721        let server = MockServer::start_async().await;
722
723        let header_tags = HEADER_TAGS;
724        let mock = server
725            .mock_async(|when, then| {
726                when.method(POST)
727                    .header(DATADOG_TRACE_COUNT_STR, "1")
728                    .header("Content-type", "application/msgpack")
729                    .header("datadog-meta-lang", header_tags.lang)
730                    .header(
731                        "datadog-meta-lang-interpreter",
732                        header_tags.lang_interpreter,
733                    )
734                    .header("datadog-meta-lang-version", header_tags.lang_version)
735                    .header(
736                        "datadog-meta-lang-interpreter-vendor",
737                        header_tags.lang_vendor,
738                    )
739                    .header("datadog-meta-tracer-version", header_tags.tracer_version)
740                    .header("datadog-container-id", header_tags.container_id)
741                    .path("/");
742                then.status(200).body("");
743            })
744            .await;
745
746        let header_tags = HEADER_TAGS;
747
748        let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
749        let data = SendData::new(
750            100,
751            TracerPayloadCollection::V04(vec![trace.clone()]),
752            header_tags,
753            &Endpoint {
754                api_key: None,
755                url: server.url("/").parse::<hyper::Uri>().unwrap(),
756                timeout_ms: ONE_SECOND,
757                ..Endpoint::default()
758            },
759        );
760
761        let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
762        let client = libdd_common::hyper_migration::new_default_client();
763        let res = data.send(&client).await;
764
765        mock.assert_async().await;
766
767        assert_eq!(res.last_result.unwrap().status(), 200);
768        assert_eq!(res.errors_timeout, 0);
769        assert_eq!(res.errors_network, 0);
770        assert_eq!(res.errors_status_code, 0);
771        assert_eq!(res.requests_count, 1);
772        assert_eq!(res.chunks_sent, 1);
773        assert_eq!(res.bytes_sent, data_payload_len as u64);
774        assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
775    }
776
777    #[cfg_attr(miri, ignore)]
778    #[tokio::test]
779    async fn request_msgpack_several_payloads() {
780        let server = MockServer::start_async().await;
781
782        let mock = server
783            .mock_async(|when, then| {
784                when.method(POST)
785                    .header("Content-type", "application/msgpack")
786                    .path("/");
787                then.status(200).body("");
788            })
789            .await;
790
791        let header_tags = TracerHeaderTags::default();
792
793        let payload = setup_payload(&header_tags);
794        let data = SendData::new(
795            100,
796            TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
797            header_tags,
798            &Endpoint {
799                api_key: None,
800                url: server.url("/").parse::<hyper::Uri>().unwrap(),
801                timeout_ms: ONE_SECOND,
802                ..Endpoint::default()
803            },
804        );
805
806        let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
807        let client = libdd_common::hyper_migration::new_default_client();
808        let res = data.send(&client).await;
809
810        mock.assert_calls_async(2).await;
811
812        assert_eq!(res.last_result.unwrap().status(), 200);
813        assert_eq!(res.errors_timeout, 0);
814        assert_eq!(res.errors_network, 0);
815        assert_eq!(res.errors_status_code, 0);
816        assert_eq!(res.requests_count, 2);
817        assert_eq!(res.chunks_sent, 2);
818        assert_eq!(res.bytes_sent, data_payload_len as u64);
819        assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 2_u64);
820    }
821
822    #[cfg_attr(miri, ignore)]
823    #[tokio::test]
824    async fn request_error_status_code() {
825        let server = MockServer::start_async().await;
826
827        let mock = server
828            .mock_async(|when, then| {
829                when.method(POST)
830                    .header("Content-type", "application/msgpack")
831                    .path("/");
832                then.status(500).body("");
833            })
834            .await;
835
836        let payload = setup_payload(&HEADER_TAGS);
837        let data = SendData::new(
838            100,
839            TracerPayloadCollection::V07(vec![payload]),
840            HEADER_TAGS,
841            &Endpoint {
842                api_key: None,
843                url: server.url("/").parse::<hyper::Uri>().unwrap(),
844                timeout_ms: ONE_SECOND,
845                ..Endpoint::default()
846            },
847        );
848
849        let client = libdd_common::hyper_migration::new_default_client();
850        let res = data.send(&client).await;
851
852        mock.assert_calls_async(5).await;
853
854        assert!(res.last_result.is_ok());
855        assert_eq!(res.last_result.unwrap().status(), 500);
856        assert_eq!(res.errors_timeout, 0);
857        assert_eq!(res.errors_network, 0);
858        assert_eq!(res.errors_status_code, 1);
859        assert_eq!(res.requests_count, 5);
860        assert_eq!(res.chunks_sent, 0);
861        assert_eq!(res.bytes_sent, 0);
862        assert_eq!(*res.responses_count_per_code.get(&500).unwrap(), 1_u64);
863    }
864
865    #[cfg_attr(miri, ignore)]
866    #[tokio::test]
867    async fn request_error_network() {
868        // Server not created in order to return a 'connection refused' error.
869        let payload = setup_payload(&HEADER_TAGS);
870        let data = SendData::new(
871            100,
872            TracerPayloadCollection::V07(vec![payload]),
873            HEADER_TAGS,
874            &Endpoint {
875                api_key: None,
876                url: "http://127.0.0.1:4321/".parse::<hyper::Uri>().unwrap(),
877                timeout_ms: ONE_SECOND,
878                ..Endpoint::default()
879            },
880        );
881
882        let client = libdd_common::hyper_migration::new_default_client();
883        let res = data.send(&client).await;
884
885        assert!(res.last_result.is_err());
886        match std::env::consts::OS {
887            "windows" => {
888                // On windows the TCP/IP stack returns a timeout error (at hyper level) rather
889                // than a connection refused error despite not having a listening socket on the
890                // port.
891                assert_eq!(res.errors_timeout, 1);
892                assert_eq!(res.errors_network, 0);
893            }
894            _ => {
895                assert_eq!(res.errors_timeout, 0);
896                assert_eq!(res.errors_network, 1);
897            }
898        }
899        assert_eq!(res.errors_status_code, 0);
900        assert_eq!(res.requests_count, 5);
901        assert_eq!(res.errors_status_code, 0);
902        assert_eq!(res.chunks_sent, 0);
903        assert_eq!(res.bytes_sent, 0);
904        assert_eq!(res.responses_count_per_code.len(), 0);
905    }
906
907    #[cfg_attr(miri, ignore)]
908    #[tokio::test]
909    async fn request_error_timeout_v04() {
910        let server = MockServer::start_async().await;
911
912        let header_tags = HEADER_TAGS;
913        let mock = server
914            .mock_async(|when, then| {
915                when.method(POST)
916                    .header(DATADOG_TRACE_COUNT_STR, "2")
917                    .header("Content-type", "application/msgpack")
918                    .header("datadog-meta-lang", header_tags.lang)
919                    .header(
920                        "datadog-meta-lang-interpreter",
921                        header_tags.lang_interpreter,
922                    )
923                    .header("datadog-meta-lang-version", header_tags.lang_version)
924                    .header(
925                        "datadog-meta-lang-interpreter-vendor",
926                        header_tags.lang_vendor,
927                    )
928                    .header("datadog-meta-tracer-version", header_tags.tracer_version)
929                    .header("datadog-container-id", header_tags.container_id)
930                    .path("/");
931                then.status(200).body("").delay(Duration::from_millis(500));
932            })
933            .await;
934
935        let header_tags = HEADER_TAGS;
936
937        let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
938        let data = SendData::new(
939            100,
940            TracerPayloadCollection::V04(vec![trace.clone(), trace.clone()]),
941            header_tags,
942            &Endpoint {
943                api_key: None,
944                url: server.url("/").parse::<hyper::Uri>().unwrap(),
945                timeout_ms: 200,
946                ..Endpoint::default()
947            },
948        );
949
950        let client = libdd_common::hyper_migration::new_default_client();
951        let res = data.send(&client).await;
952
953        mock.assert_calls_async(5).await;
954
955        assert_eq!(res.errors_timeout, 1);
956        assert_eq!(res.errors_network, 0);
957        assert_eq!(res.errors_status_code, 0);
958        assert_eq!(res.requests_count, 5);
959        assert_eq!(res.chunks_sent, 0);
960        assert_eq!(res.bytes_sent, 0);
961        assert_eq!(res.responses_count_per_code.len(), 0);
962    }
963
964    #[cfg_attr(miri, ignore)]
965    #[tokio::test]
966    async fn request_error_timeout_v07() {
967        let server = MockServer::start_async().await;
968
969        let mock = server
970            .mock_async(|when, then| {
971                when.method(POST)
972                    .header("Content-type", "application/msgpack")
973                    .path("/");
974                then.status(200).body("").delay(Duration::from_millis(500));
975            })
976            .await;
977
978        let header_tags = TracerHeaderTags::default();
979
980        let payload = setup_payload(&header_tags);
981        let data = SendData::new(
982            100,
983            TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
984            header_tags,
985            &Endpoint {
986                api_key: None,
987                url: server.url("/").parse::<hyper::Uri>().unwrap(),
988                timeout_ms: 200,
989                ..Endpoint::default()
990            },
991        );
992
993        let client = libdd_common::hyper_migration::new_default_client();
994        let res = data.send(&client).await;
995
996        mock.assert_calls_async(10).await;
997
998        assert_eq!(res.errors_timeout, 1);
999        assert_eq!(res.errors_network, 0);
1000        assert_eq!(res.errors_status_code, 0);
1001        assert_eq!(res.requests_count, 5);
1002        assert_eq!(res.chunks_sent, 0);
1003        assert_eq!(res.bytes_sent, 0);
1004        assert_eq!(res.responses_count_per_code.len(), 0);
1005    }
1006
1007    #[test]
1008    fn test_with_endpoint() {
1009        let header_tags = HEADER_TAGS;
1010        let payload = setup_payload(&header_tags);
1011        let original_endpoint = Endpoint {
1012            api_key: Some(std::borrow::Cow::Borrowed("original-key")),
1013            url: "http://originalexample.com/".parse::<hyper::Uri>().unwrap(),
1014            timeout_ms: 1000,
1015            ..Endpoint::default()
1016        };
1017
1018        let original_data = SendData::new(
1019            100,
1020            TracerPayloadCollection::V07(vec![payload]),
1021            header_tags,
1022            &original_endpoint,
1023        );
1024
1025        let new_endpoint = Endpoint {
1026            api_key: Some(std::borrow::Cow::Borrowed("new-key")),
1027            url: "http://newexample.com/".parse::<hyper::Uri>().unwrap(),
1028            timeout_ms: 2000,
1029            ..Endpoint::default()
1030        };
1031
1032        let new_data = original_data.with_endpoint(new_endpoint.clone());
1033
1034        assert_eq!(new_data.target.api_key, new_endpoint.api_key);
1035        assert_eq!(new_data.target.url, new_endpoint.url);
1036        assert_eq!(new_data.target.timeout_ms, new_endpoint.timeout_ms);
1037
1038        assert_eq!(new_data.size, original_data.size);
1039        assert_eq!(new_data.headers, original_data.headers);
1040        assert_eq!(new_data.retry_strategy, original_data.retry_strategy);
1041        assert_eq!(
1042            new_data.tracer_payloads.size(),
1043            original_data.tracer_payloads.size()
1044        );
1045
1046        assert_eq!(original_data.target.api_key, original_endpoint.api_key);
1047        assert_eq!(original_data.target.url, original_endpoint.url);
1048        assert_eq!(
1049            original_data.target.timeout_ms,
1050            original_endpoint.timeout_ms
1051        );
1052
1053        #[cfg(feature = "compression")]
1054        assert!(matches!(new_data.compression, Compression::None));
1055    }
1056
1057    #[test]
1058    fn test_builder() {
1059        let header_tags = HEADER_TAGS;
1060        let payload = setup_payload(&header_tags);
1061        let retry_strategy = RetryStrategy::new(5, 100, RetryBackoffType::Constant, None);
1062
1063        let send_data = SendDataBuilder::new(
1064            100,
1065            TracerPayloadCollection::V07(vec![payload]),
1066            header_tags,
1067            &Endpoint::default(),
1068        )
1069        // Test with_api_key()
1070        .with_api_key("TEST-KEY")
1071        // Test with_retry_strategy()
1072        .with_retry_strategy(retry_strategy.clone())
1073        .build();
1074
1075        assert_eq!(
1076            send_data.target.api_key,
1077            Some(std::borrow::Cow::Borrowed("TEST-KEY"))
1078        );
1079        assert_eq!(send_data.retry_strategy, retry_strategy);
1080    }
1081}