Skip to main content

libdd_trace_utils/send_data/
mod.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4pub mod send_data_result;
5
6use crate::msgpack_encoder;
7use crate::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryResult};
8use crate::trace_utils::TracerHeaderTags;
9use crate::tracer_payload::TracerPayloadCollection;
10use anyhow::{anyhow, Context};
11use futures::stream::FuturesUnordered;
12use futures::StreamExt;
13use http::{header::CONTENT_TYPE, HeaderMap, HeaderValue};
14use libdd_capabilities::{HttpClientCapability, SleepCapability};
15use libdd_common::{
16    header::{
17        APPLICATION_MSGPACK, APPLICATION_PROTOBUF, DATADOG_SEND_REAL_HTTP_STATUS,
18        DATADOG_TRACE_COUNT,
19    },
20    Endpoint,
21};
22use libdd_trace_protobuf::pb::{AgentPayload, TracerPayload};
23use send_data_result::SendDataResult;
24use std::collections::HashMap;
25#[cfg(feature = "compression")]
26use std::io::Write;
27#[cfg(feature = "compression")]
28use zstd::stream::write::Encoder;
29
30#[derive(Debug)]
31/// `SendData` is a structure that holds the data to be sent to a target endpoint.
32/// It includes the payloads to be sent, the size of the data, the target endpoint,
33/// headers for the request, and a retry strategy for sending the data.
34///
35/// # Example
36///
37/// ```rust
38/// use libdd_trace_protobuf::pb::TracerPayload;
39/// use libdd_trace_utils::send_data::{
40///     SendData,
41/// };
42/// use libdd_common::Endpoint;
43/// use libdd_trace_utils::send_with_retry::{RetryBackoffType, RetryStrategy};
44/// use libdd_trace_utils::trace_utils::TracerHeaderTags;
45/// use libdd_trace_utils::tracer_payload::TracerPayloadCollection;
46///
47/// #[cfg_attr(miri, ignore)]
48/// async fn update_send_results_example() {
49///     let size = 100;
50///     let tracer_payload = TracerPayloadCollection::V07(
51///         vec![TracerPayload::default()]); // Replace with actual payload
52///     let tracer_header_tags = TracerHeaderTags::default(); // Replace with actual header tags
53///     let target = Endpoint::default(); // Replace with actual endpoint
54///
55///     let mut send_data = SendData::new(size, tracer_payload, tracer_header_tags, &target);
56///
57///     // Set a custom retry strategy
58///     let retry_strategy = RetryStrategy::new(3, 10, RetryBackoffType::Exponential, Some(5));
59///
60///     send_data.set_retry_strategy(retry_strategy);
61///
62///     // Send the data (caller picks the capabilities implementation)
63///     use libdd_capabilities::HttpClientCapability;
64///     let capabilities = libdd_capabilities_impl::NativeCapabilities::new_client();
65///     let result = send_data.send(&capabilities).await;
66/// }
67/// ```
68pub struct SendData {
69    pub(crate) tracer_payloads: TracerPayloadCollection,
70    pub(crate) size: usize, // have a rough size estimate to force flushing if it's large
71    target: Endpoint,
72    headers: HeaderMap,
73    retry_strategy: RetryStrategy,
74    #[cfg(feature = "compression")]
75    compression: Compression,
76}
77
78#[cfg(feature = "compression")]
79#[derive(Debug, Clone)]
80pub enum Compression {
81    Zstd(i32),
82    None,
83}
84
85pub struct SendDataBuilder {
86    pub(crate) tracer_payloads: TracerPayloadCollection,
87    pub(crate) size: usize,
88    target: Endpoint,
89    headers: HeaderMap,
90    retry_strategy: RetryStrategy,
91    #[cfg(feature = "compression")]
92    compression: Compression,
93}
94
95impl SendDataBuilder {
96    pub fn new(
97        size: usize,
98        tracer_payload: TracerPayloadCollection,
99        tracer_header_tags: TracerHeaderTags,
100        target: &Endpoint,
101    ) -> SendDataBuilder {
102        let mut headers: HeaderMap = tracer_header_tags.into();
103        headers.insert(DATADOG_SEND_REAL_HTTP_STATUS, HeaderValue::from_static("1"));
104        SendDataBuilder {
105            tracer_payloads: tracer_payload,
106            size,
107            target: target.clone(),
108            headers,
109            retry_strategy: RetryStrategy::default(),
110            #[cfg(feature = "compression")]
111            compression: Compression::None,
112        }
113    }
114
115    #[cfg(feature = "compression")]
116    pub fn with_compression(mut self, compression: Compression) -> SendDataBuilder {
117        self.compression = compression;
118        self
119    }
120
121    pub fn with_api_key(mut self, api_key: &str) -> SendDataBuilder {
122        self.target.api_key = Some(api_key.to_string().into());
123        self
124    }
125
126    pub fn with_retry_strategy(mut self, retry_strategy: RetryStrategy) -> SendDataBuilder {
127        self.retry_strategy = retry_strategy;
128        self
129    }
130
131    pub fn build(self) -> SendData {
132        SendData {
133            tracer_payloads: self.tracer_payloads,
134            size: self.size,
135            target: self.target,
136            headers: self.headers,
137            retry_strategy: self.retry_strategy,
138            #[cfg(feature = "compression")]
139            compression: self.compression,
140        }
141    }
142}
143
144impl SendData {
145    /// Creates a new instance of `SendData`.
146    ///
147    /// # Arguments
148    ///
149    /// * `size`: Approximate size of the data to be sent in bytes.
150    /// * `tracer_payload`: The payload to be sent.
151    /// * `tracer_header_tags`: The header tags for the tracer.
152    /// * `target`: The endpoint to which the data will be sent.
153    ///
154    /// # Returns
155    ///
156    /// A new `SendData` instance.
157    #[allow(unused_variables)]
158    pub fn new(
159        size: usize,
160        tracer_payload: TracerPayloadCollection,
161        tracer_header_tags: TracerHeaderTags,
162        target: &Endpoint,
163    ) -> SendData {
164        let mut headers: HeaderMap = tracer_header_tags.into();
165        headers.insert(DATADOG_SEND_REAL_HTTP_STATUS, HeaderValue::from_static("1"));
166        SendData {
167            tracer_payloads: tracer_payload,
168            size,
169            target: target.clone(),
170            headers,
171            retry_strategy: RetryStrategy::default(),
172            #[cfg(feature = "compression")]
173            compression: Compression::None,
174        }
175    }
176
177    /// Returns the user defined approximate size of the data to be sent in bytes.
178    ///
179    /// # Returns
180    ///
181    /// The size of the data.
182    pub fn len(&self) -> usize {
183        self.size
184    }
185
186    /// Checks if the user defined approximate size of the data to be sent is zero.
187    ///
188    /// # Returns
189    ///
190    /// `true` if size is 0, `false` otherwise.
191    pub fn is_empty(&self) -> bool {
192        self.size == 0
193    }
194
195    /// Returns the target endpoint.
196    ///
197    /// # Returns
198    ///
199    /// A reference to the target endpoint.
200    pub fn get_target(&self) -> &Endpoint {
201        &self.target
202    }
203
204    /// Returns the payloads to be sent.
205    ///
206    /// # Returns
207    ///
208    /// A reference to the vector of payloads.
209    pub fn get_payloads(&self) -> &TracerPayloadCollection {
210        &self.tracer_payloads
211    }
212
213    /// Overrides the default RetryStrategy with user-defined values.
214    ///
215    /// # Arguments
216    ///
217    /// * `retry_strategy`: The new retry strategy to be used.
218    pub fn set_retry_strategy(&mut self, retry_strategy: RetryStrategy) {
219        self.retry_strategy = retry_strategy;
220    }
221
222    /// Sends the data to the target endpoint.
223    ///
224    /// # Returns
225    ///
226    /// A `SendDataResult` instance containing the result of the operation.
227    pub async fn send<C: HttpClientCapability + SleepCapability>(
228        &self,
229        capabilities: &C,
230    ) -> SendDataResult {
231        self.send_internal(capabilities, None).await
232    }
233
234    async fn send_internal<C: HttpClientCapability + SleepCapability>(
235        &self,
236        capabilities: &C,
237        endpoint: Option<Endpoint>,
238    ) -> SendDataResult {
239        if self.use_protobuf() {
240            self.send_with_protobuf(capabilities, endpoint).await
241        } else {
242            self.send_with_msgpack(capabilities, endpoint).await
243        }
244    }
245
246    async fn send_payload<C: HttpClientCapability + SleepCapability>(
247        &self,
248        capabilities: &C,
249        chunks: u64,
250        payload: Vec<u8>,
251        headers: HeaderMap,
252        endpoint: Option<&Endpoint>,
253    ) -> (SendWithRetryResult, u64, u64) {
254        #[allow(clippy::unwrap_used)]
255        let payload_len = u64::try_from(payload.len()).unwrap();
256        (
257            send_with_retry(
258                capabilities,
259                endpoint.unwrap_or(&self.target),
260                payload,
261                &headers,
262                &self.retry_strategy,
263            )
264            .await,
265            payload_len,
266            chunks,
267        )
268    }
269
270    fn use_protobuf(&self) -> bool {
271        self.target.api_key.is_some()
272    }
273
274    #[cfg(feature = "compression")]
275    fn compress_payload(&self, payload: Vec<u8>, headers: &mut HeaderMap) -> Vec<u8> {
276        match self.compression {
277            Compression::Zstd(level) => {
278                let result = (|| -> std::io::Result<Vec<u8>> {
279                    let mut encoder = Encoder::new(Vec::new(), level)?;
280                    encoder.write_all(&payload)?;
281                    encoder.finish()
282                })();
283
284                match result {
285                    Ok(compressed_payload) => {
286                        headers.insert(
287                            http::header::CONTENT_ENCODING,
288                            HeaderValue::from_static("zstd"),
289                        );
290                        compressed_payload
291                    }
292                    Err(_) => payload,
293                }
294            }
295            _ => payload,
296        }
297    }
298
299    async fn send_with_protobuf<C: HttpClientCapability + SleepCapability>(
300        &self,
301        capabilities: &C,
302        endpoint: Option<Endpoint>,
303    ) -> SendDataResult {
304        let mut result = SendDataResult::default();
305
306        #[allow(clippy::unwrap_used)]
307        let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
308
309        match &self.tracer_payloads {
310            TracerPayloadCollection::V07(payloads) => {
311                let agent_payload = construct_agent_payload(payloads.to_vec());
312                let serialized_trace_payload = match serialize_proto_payload(&agent_payload)
313                    .context("Failed to serialize trace agent payload, dropping traces")
314                {
315                    Ok(p) => p,
316                    Err(e) => return result.error(e),
317                };
318                let mut request_headers = self.headers.clone();
319
320                #[cfg(feature = "compression")]
321                let final_payload =
322                    self.compress_payload(serialized_trace_payload, &mut request_headers);
323
324                #[cfg(not(feature = "compression"))]
325                let final_payload = serialized_trace_payload;
326
327                request_headers.insert(CONTENT_TYPE, APPLICATION_PROTOBUF);
328
329                let (response, bytes_sent, chunks) = self
330                    .send_payload(
331                        capabilities,
332                        chunks,
333                        final_payload,
334                        request_headers,
335                        endpoint.as_ref(),
336                    )
337                    .await;
338
339                result.update(response, bytes_sent, chunks);
340
341                result
342            }
343            _ => result,
344        }
345    }
346
347    async fn send_with_msgpack<C: HttpClientCapability + SleepCapability>(
348        &self,
349        capabilities: &C,
350        endpoint: Option<Endpoint>,
351    ) -> SendDataResult {
352        let mut result = SendDataResult::default();
353        let mut futures = FuturesUnordered::new();
354
355        match &self.tracer_payloads {
356            TracerPayloadCollection::V07(payloads) => {
357                for tracer_payload in payloads {
358                    #[allow(clippy::unwrap_used)]
359                    let chunks = u64::try_from(tracer_payload.chunks.len()).unwrap();
360                    let mut headers = self.headers.clone();
361                    headers.reserve(2);
362                    headers.insert(DATADOG_TRACE_COUNT, chunks.into());
363                    headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
364
365                    let payload = match rmp_serde::to_vec_named(tracer_payload) {
366                        Ok(p) => p,
367                        Err(e) => return result.error(anyhow!(e)),
368                    };
369
370                    futures.push(self.send_payload(
371                        capabilities,
372                        chunks,
373                        payload,
374                        headers,
375                        endpoint.as_ref(),
376                    ));
377                }
378            }
379            TracerPayloadCollection::V04(payload) => {
380                #[allow(clippy::unwrap_used)]
381                let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
382                let mut headers = self.headers.clone();
383                headers.reserve(2);
384                headers.insert(DATADOG_TRACE_COUNT, chunks.into());
385                headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
386
387                let payload = msgpack_encoder::v04::to_vec(payload);
388
389                futures.push(self.send_payload(
390                    capabilities,
391                    chunks,
392                    payload,
393                    headers,
394                    endpoint.as_ref(),
395                ));
396            }
397            TracerPayloadCollection::V05(payload) => {
398                #[allow(clippy::unwrap_used)]
399                let chunks = u64::try_from(self.tracer_payloads.size()).unwrap();
400                let mut headers = self.headers.clone();
401                headers.reserve(2);
402                headers.insert(DATADOG_TRACE_COUNT, chunks.into());
403                headers.insert(CONTENT_TYPE, APPLICATION_MSGPACK);
404
405                let payload = match rmp_serde::to_vec(payload) {
406                    Ok(p) => p,
407                    Err(e) => return result.error(anyhow!(e)),
408                };
409
410                futures.push(self.send_payload(
411                    capabilities,
412                    chunks,
413                    payload,
414                    headers,
415                    endpoint.as_ref(),
416                ));
417            }
418        }
419
420        loop {
421            match futures.next().await {
422                Some((response, payload_len, chunks)) => {
423                    result.update(response, payload_len, chunks);
424                    if result.last_result.is_err() {
425                        return result;
426                    }
427                }
428                None => return result,
429            }
430        }
431    }
432}
433
434fn construct_agent_payload(tracer_payloads: Vec<TracerPayload>) -> AgentPayload {
435    AgentPayload {
436        host_name: "".to_string(),
437        env: "".to_string(),
438        agent_version: "".to_string(),
439        error_tps: 60.0,
440        target_tps: 60.0,
441        tags: HashMap::new(),
442        tracer_payloads,
443        rare_sampler_enabled: false,
444        idx_tracer_payloads: Vec::new(),
445    }
446}
447
448fn serialize_proto_payload<T>(payload: &T) -> anyhow::Result<Vec<u8>>
449where
450    T: prost::Message,
451{
452    let mut buf = Vec::with_capacity(payload.encoded_len());
453    payload.encode(&mut buf)?;
454    Ok(buf)
455}
456
457#[cfg(test)]
458mod tests {
459    use super::*;
460    use crate::send_with_retry::{RetryBackoffType, RetryStrategy};
461    use crate::test_utils::create_test_no_alloc_span;
462    use crate::trace_utils::{construct_trace_chunk, construct_tracer_payload, TracerPayloadTags};
463    use crate::tracer_header_tags::TracerHeaderTags;
464    use httpmock::prelude::*;
465    use httpmock::MockServer;
466    use libdd_capabilities::HttpClientCapability;
467    use libdd_capabilities_impl::NativeCapabilities;
468    use libdd_common::Endpoint;
469    use libdd_trace_protobuf::pb::Span;
470    use std::collections::HashMap;
471    use std::time::Duration;
472
473    const ONE_SECOND: u64 = 1_000;
474    const HEADER_TAGS: TracerHeaderTags = TracerHeaderTags {
475        lang: "test-lang",
476        lang_version: "2.0",
477        lang_interpreter: "interpreter",
478        lang_vendor: "vendor",
479        tracer_version: "1.0",
480        container_id: "id",
481        client_computed_top_level: false,
482        client_computed_stats: false,
483        dropped_p0_traces: 0,
484        dropped_p0_spans: 0,
485    };
486
487    fn setup_payload(header_tags: &TracerHeaderTags) -> TracerPayload {
488        let tracer_payload_tags = TracerPayloadTags {
489            env: "TEST".to_string(),
490            app_version: "1.0".to_string(),
491            hostname: "test_bench".to_string(),
492            runtime_id: "id".to_string(),
493        };
494
495        let chunk = construct_trace_chunk(vec![Span {
496            service: "test-service".to_string(),
497            name: "test-service-name".to_string(),
498            resource: "test-service-resource".to_string(),
499            trace_id: 111,
500            span_id: 222,
501            parent_id: 333,
502            start: 1,
503            duration: 5,
504            error: 0,
505            meta: HashMap::new(),
506            metrics: HashMap::new(),
507            meta_struct: HashMap::new(),
508            r#type: "".to_string(),
509            span_links: vec![],
510            span_events: vec![],
511        }]);
512
513        construct_tracer_payload(vec![chunk], header_tags, tracer_payload_tags)
514    }
515
516    fn compute_payload_len(collection: &TracerPayloadCollection) -> usize {
517        match collection {
518            TracerPayloadCollection::V07(payloads) => {
519                let agent_payload = construct_agent_payload(payloads.to_vec());
520                let serialized_trace_payload = serialize_proto_payload(&agent_payload).unwrap();
521                serialized_trace_payload.len()
522            }
523            _ => 0,
524        }
525    }
526
527    fn rmp_compute_payload_len(collection: &TracerPayloadCollection) -> usize {
528        match collection {
529            TracerPayloadCollection::V07(payloads) => {
530                let mut total: usize = 0;
531                for payload in payloads {
532                    total += rmp_serde::to_vec_named(payload).unwrap().len();
533                }
534                total
535            }
536            TracerPayloadCollection::V04(payloads) => {
537                msgpack_encoder::v04::to_encoded_byte_len(payloads) as usize
538            }
539            TracerPayloadCollection::V05(payloads) => rmp_serde::to_vec(payloads).unwrap().len(),
540        }
541    }
542
543    #[test]
544    fn send_data_new_api_key() {
545        let header_tags = TracerHeaderTags::default();
546
547        let payload = setup_payload(&header_tags);
548        let data = SendData::new(
549            100,
550            TracerPayloadCollection::V07(vec![payload]),
551            HEADER_TAGS,
552            &Endpoint {
553                api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
554                url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
555                timeout_ms: ONE_SECOND,
556                ..Endpoint::default()
557            },
558        );
559
560        assert_eq!(data.size, 100);
561
562        assert_eq!(data.target.api_key.unwrap(), "TEST-KEY");
563        assert_eq!(data.target.url.path(), "/foo/bar");
564    }
565
566    #[test]
567    fn send_data_new_no_api_key() {
568        let header_tags = TracerHeaderTags::default();
569
570        let payload = setup_payload(&header_tags);
571        let data = SendData::new(
572            100,
573            TracerPayloadCollection::V07(vec![payload]),
574            header_tags.clone(),
575            &Endpoint {
576                api_key: None,
577                url: "/foo/bar?baz".parse::<hyper::Uri>().unwrap(),
578                timeout_ms: ONE_SECOND,
579                ..Endpoint::default()
580            },
581        );
582
583        assert_eq!(data.size, 100);
584
585        assert_eq!(data.target.api_key, None);
586        assert_eq!(data.target.url.path(), "/foo/bar");
587
588        for (key, value) in &HeaderMap::from(header_tags) {
589            assert_eq!(data.headers.get(key), Some(value));
590        }
591    }
592
593    #[cfg_attr(miri, ignore)]
594    #[tokio::test]
595    async fn request_protobuf() {
596        let server = MockServer::start_async().await;
597
598        let mock = server
599            .mock_async(|when, then| {
600                when.method(POST)
601                    .header("Content-type", "application/x-protobuf")
602                    .header("DD-API-KEY", "TEST-KEY")
603                    .path("/");
604                then.status(202).body("");
605            })
606            .await;
607
608        let header_tags = TracerHeaderTags::default();
609
610        let payload = setup_payload(&header_tags);
611        let data = SendData::new(
612            100,
613            TracerPayloadCollection::V07(vec![payload.clone()]),
614            header_tags,
615            &Endpoint {
616                api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
617                url: server.url("/").parse::<hyper::Uri>().unwrap(),
618                timeout_ms: ONE_SECOND,
619                ..Endpoint::default()
620            },
621        );
622
623        let data_payload_len = compute_payload_len(&data.tracer_payloads);
624        let res = data.send(&NativeCapabilities::new_client()).await;
625
626        mock.assert_async().await;
627
628        assert_eq!(
629            res.last_result.unwrap().status(),
630            http::StatusCode::ACCEPTED
631        );
632        assert_eq!(res.errors_timeout, 0);
633        assert_eq!(res.errors_network, 0);
634        assert_eq!(res.errors_status_code, 0);
635        assert_eq!(res.requests_count, 1);
636        assert_eq!(res.chunks_sent, 1);
637        assert_eq!(res.bytes_sent, data_payload_len as u64);
638        assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
639    }
640
641    #[cfg_attr(miri, ignore)]
642    #[tokio::test]
643    async fn request_protobuf_several_payloads() {
644        let server = MockServer::start_async().await;
645
646        let mock = server
647            .mock_async(|when, then| {
648                when.method(POST)
649                    .header("Content-type", "application/x-protobuf")
650                    .header("DD-API-KEY", "TEST-KEY")
651                    .path("/");
652                then.status(202).body("");
653            })
654            .await;
655
656        let header_tags = TracerHeaderTags::default();
657
658        let payload = setup_payload(&header_tags);
659        let data = SendData::new(
660            100,
661            TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
662            header_tags,
663            &Endpoint {
664                api_key: Some(std::borrow::Cow::Borrowed("TEST-KEY")),
665                url: server.url("/").parse::<hyper::Uri>().unwrap(),
666                timeout_ms: ONE_SECOND,
667                ..Endpoint::default()
668            },
669        );
670
671        let data_payload_len = compute_payload_len(&data.tracer_payloads);
672        let res = data.send(&NativeCapabilities::new_client()).await;
673
674        mock.assert_async().await;
675
676        assert_eq!(
677            res.last_result.unwrap().status(),
678            http::StatusCode::ACCEPTED
679        );
680        assert_eq!(res.errors_timeout, 0);
681        assert_eq!(res.errors_network, 0);
682        assert_eq!(res.errors_status_code, 0);
683        assert_eq!(res.requests_count, 1);
684        assert_eq!(res.chunks_sent, 2);
685        assert_eq!(res.bytes_sent, data_payload_len as u64);
686        assert_eq!(*res.responses_count_per_code.get(&202).unwrap(), 1_u64);
687    }
688
689    #[cfg_attr(miri, ignore)]
690    #[tokio::test]
691    async fn request_msgpack_v07() {
692        let server = MockServer::start_async().await;
693
694        let header_tags = HEADER_TAGS;
695        let mock = server
696            .mock_async(|when, then| {
697                when.method(POST)
698                    .header(DATADOG_TRACE_COUNT.as_str(), "1")
699                    .header("Content-type", "application/msgpack")
700                    .header("datadog-meta-lang", header_tags.lang)
701                    .header(
702                        "datadog-meta-lang-interpreter",
703                        header_tags.lang_interpreter,
704                    )
705                    .header("datadog-meta-lang-version", header_tags.lang_version)
706                    .header(
707                        "datadog-meta-lang-interpreter-vendor",
708                        header_tags.lang_vendor,
709                    )
710                    .header("datadog-meta-tracer-version", header_tags.tracer_version)
711                    .header("datadog-container-id", header_tags.container_id)
712                    .header("Datadog-Send-Real-Http-Status", "1")
713                    .path("/");
714                then.status(200).body("");
715            })
716            .await;
717
718        let header_tags = HEADER_TAGS;
719
720        let payload = setup_payload(&header_tags);
721        let data = SendData::new(
722            100,
723            TracerPayloadCollection::V07(vec![payload.clone()]),
724            header_tags,
725            &Endpoint {
726                api_key: None,
727                url: server.url("/").parse::<hyper::Uri>().unwrap(),
728                timeout_ms: ONE_SECOND,
729                ..Endpoint::default()
730            },
731        );
732
733        let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
734        let res = data.send(&NativeCapabilities::new_client()).await;
735
736        mock.assert_async().await;
737
738        assert_eq!(res.last_result.unwrap().status(), http::StatusCode::OK);
739        assert_eq!(res.errors_timeout, 0);
740        assert_eq!(res.errors_network, 0);
741        assert_eq!(res.errors_status_code, 0);
742        assert_eq!(res.requests_count, 1);
743        assert_eq!(res.chunks_sent, 1);
744        assert_eq!(res.bytes_sent, data_payload_len as u64);
745        assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
746    }
747
748    #[cfg_attr(miri, ignore)]
749    #[tokio::test]
750    async fn request_msgpack_v04() {
751        let server = MockServer::start_async().await;
752
753        let header_tags = HEADER_TAGS;
754        let mock = server
755            .mock_async(|when, then| {
756                when.method(POST)
757                    .header(DATADOG_TRACE_COUNT.as_str(), "1")
758                    .header("Content-type", "application/msgpack")
759                    .header("datadog-meta-lang", header_tags.lang)
760                    .header(
761                        "datadog-meta-lang-interpreter",
762                        header_tags.lang_interpreter,
763                    )
764                    .header("datadog-meta-lang-version", header_tags.lang_version)
765                    .header(
766                        "datadog-meta-lang-interpreter-vendor",
767                        header_tags.lang_vendor,
768                    )
769                    .header("datadog-meta-tracer-version", header_tags.tracer_version)
770                    .header("datadog-container-id", header_tags.container_id)
771                    .path("/");
772                then.status(200).body("");
773            })
774            .await;
775
776        let header_tags = HEADER_TAGS;
777
778        let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
779        let data = SendData::new(
780            100,
781            TracerPayloadCollection::V04(vec![trace.clone()]),
782            header_tags,
783            &Endpoint {
784                api_key: None,
785                url: server.url("/").parse::<hyper::Uri>().unwrap(),
786                timeout_ms: ONE_SECOND,
787                ..Endpoint::default()
788            },
789        );
790
791        let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
792        let res = data.send(&NativeCapabilities::new_client()).await;
793
794        mock.assert_async().await;
795
796        assert_eq!(res.last_result.unwrap().status(), http::StatusCode::OK);
797        assert_eq!(res.errors_timeout, 0);
798        assert_eq!(res.errors_network, 0);
799        assert_eq!(res.errors_status_code, 0);
800        assert_eq!(res.requests_count, 1);
801        assert_eq!(res.chunks_sent, 1);
802        assert_eq!(res.bytes_sent, data_payload_len as u64);
803        assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 1_u64);
804    }
805
806    #[cfg_attr(miri, ignore)]
807    #[tokio::test]
808    async fn request_msgpack_several_payloads() {
809        let server = MockServer::start_async().await;
810
811        let mock = server
812            .mock_async(|when, then| {
813                when.method(POST)
814                    .header("Content-type", "application/msgpack")
815                    .path("/");
816                then.status(200).body("");
817            })
818            .await;
819
820        let header_tags = TracerHeaderTags::default();
821
822        let payload = setup_payload(&header_tags);
823        let data = SendData::new(
824            100,
825            TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
826            header_tags,
827            &Endpoint {
828                api_key: None,
829                url: server.url("/").parse::<hyper::Uri>().unwrap(),
830                timeout_ms: ONE_SECOND,
831                ..Endpoint::default()
832            },
833        );
834
835        let data_payload_len = rmp_compute_payload_len(&data.tracer_payloads);
836        let res = data.send(&NativeCapabilities::new_client()).await;
837
838        mock.assert_calls_async(2).await;
839
840        assert_eq!(res.last_result.unwrap().status(), http::StatusCode::OK);
841        assert_eq!(res.errors_timeout, 0);
842        assert_eq!(res.errors_network, 0);
843        assert_eq!(res.errors_status_code, 0);
844        assert_eq!(res.requests_count, 2);
845        assert_eq!(res.chunks_sent, 2);
846        assert_eq!(res.bytes_sent, data_payload_len as u64);
847        assert_eq!(*res.responses_count_per_code.get(&200).unwrap(), 2_u64);
848    }
849
850    #[cfg_attr(miri, ignore)]
851    #[tokio::test]
852    async fn request_error_status_code() {
853        let server = MockServer::start_async().await;
854
855        let mock = server
856            .mock_async(|when, then| {
857                when.method(POST)
858                    .header("Content-type", "application/msgpack")
859                    .path("/");
860                then.status(500).body("");
861            })
862            .await;
863
864        let payload = setup_payload(&HEADER_TAGS);
865        let data = SendData::new(
866            100,
867            TracerPayloadCollection::V07(vec![payload]),
868            HEADER_TAGS,
869            &Endpoint {
870                api_key: None,
871                url: server.url("/").parse::<hyper::Uri>().unwrap(),
872                timeout_ms: ONE_SECOND,
873                ..Endpoint::default()
874            },
875        );
876
877        let res = data.send(&NativeCapabilities::new_client()).await;
878
879        mock.assert_calls_async(5).await;
880
881        assert!(res.last_result.is_ok());
882        assert_eq!(
883            res.last_result.unwrap().status(),
884            http::StatusCode::INTERNAL_SERVER_ERROR
885        );
886        assert_eq!(res.errors_timeout, 0);
887        assert_eq!(res.errors_network, 0);
888        assert_eq!(res.errors_status_code, 1);
889        assert_eq!(res.requests_count, 5);
890        assert_eq!(res.chunks_sent, 0);
891        assert_eq!(res.bytes_sent, 0);
892        assert_eq!(*res.responses_count_per_code.get(&500).unwrap(), 1_u64);
893    }
894
895    #[cfg_attr(miri, ignore)]
896    #[tokio::test]
897    async fn request_error_network() {
898        // Server not created in order to return a 'connection refused' error.
899        let payload = setup_payload(&HEADER_TAGS);
900        let data = SendData::new(
901            100,
902            TracerPayloadCollection::V07(vec![payload]),
903            HEADER_TAGS,
904            &Endpoint {
905                api_key: None,
906                url: "http://127.0.0.1:4321/".parse::<hyper::Uri>().unwrap(),
907                timeout_ms: ONE_SECOND,
908                ..Endpoint::default()
909            },
910        );
911
912        let res = data.send(&NativeCapabilities::new_client()).await;
913
914        assert!(res.last_result.is_err());
915        match std::env::consts::OS {
916            "windows" => {
917                // On windows the TCP/IP stack returns a timeout error (at hyper level) rather
918                // than a connection refused error despite not having a listening socket on the
919                // port.
920                assert_eq!(res.errors_timeout, 1);
921                assert_eq!(res.errors_network, 0);
922            }
923            _ => {
924                assert_eq!(res.errors_timeout, 0);
925                assert_eq!(res.errors_network, 1);
926            }
927        }
928        assert_eq!(res.errors_status_code, 0);
929        assert_eq!(res.requests_count, 5);
930        assert_eq!(res.errors_status_code, 0);
931        assert_eq!(res.chunks_sent, 0);
932        assert_eq!(res.bytes_sent, 0);
933        assert_eq!(res.responses_count_per_code.len(), 0);
934    }
935
936    #[cfg_attr(miri, ignore)]
937    #[tokio::test]
938    async fn request_error_timeout_v04() {
939        let server = MockServer::start_async().await;
940
941        let header_tags = HEADER_TAGS;
942        let mock = server
943            .mock_async(|when, then| {
944                when.method(POST)
945                    .header(DATADOG_TRACE_COUNT.as_str(), "2")
946                    .header("Content-type", "application/msgpack")
947                    .header("datadog-meta-lang", header_tags.lang)
948                    .header(
949                        "datadog-meta-lang-interpreter",
950                        header_tags.lang_interpreter,
951                    )
952                    .header("datadog-meta-lang-version", header_tags.lang_version)
953                    .header(
954                        "datadog-meta-lang-interpreter-vendor",
955                        header_tags.lang_vendor,
956                    )
957                    .header("datadog-meta-tracer-version", header_tags.tracer_version)
958                    .header("datadog-container-id", header_tags.container_id)
959                    .path("/");
960                then.status(200).body("").delay(Duration::from_millis(500));
961            })
962            .await;
963
964        let header_tags = HEADER_TAGS;
965
966        let trace = vec![create_test_no_alloc_span(1234, 12342, 12341, 1, false)];
967        let data = SendData::new(
968            100,
969            TracerPayloadCollection::V04(vec![trace.clone(), trace.clone()]),
970            header_tags,
971            &Endpoint {
972                api_key: None,
973                url: server.url("/").parse::<hyper::Uri>().unwrap(),
974                timeout_ms: 200,
975                ..Endpoint::default()
976            },
977        );
978
979        let res = data.send(&NativeCapabilities::new_client()).await;
980
981        mock.assert_calls_async(5).await;
982
983        assert_eq!(res.errors_timeout, 1);
984        assert_eq!(res.errors_network, 0);
985        assert_eq!(res.errors_status_code, 0);
986        assert_eq!(res.requests_count, 5);
987        assert_eq!(res.chunks_sent, 0);
988        assert_eq!(res.bytes_sent, 0);
989        assert_eq!(res.responses_count_per_code.len(), 0);
990    }
991
992    #[cfg_attr(miri, ignore)]
993    #[tokio::test]
994    async fn request_error_timeout_v07() {
995        let server = MockServer::start_async().await;
996
997        let mock = server
998            .mock_async(|when, then| {
999                when.method(POST)
1000                    .header("Content-type", "application/msgpack")
1001                    .path("/");
1002                then.status(200).body("").delay(Duration::from_millis(500));
1003            })
1004            .await;
1005
1006        let header_tags = TracerHeaderTags::default();
1007
1008        let payload = setup_payload(&header_tags);
1009        let data = SendData::new(
1010            100,
1011            TracerPayloadCollection::V07(vec![payload.clone(), payload.clone()]),
1012            header_tags,
1013            &Endpoint {
1014                api_key: None,
1015                url: server.url("/").parse::<hyper::Uri>().unwrap(),
1016                timeout_ms: 200,
1017                ..Endpoint::default()
1018            },
1019        );
1020
1021        let res = data.send(&NativeCapabilities::new_client()).await;
1022
1023        mock.assert_calls_async(10).await;
1024
1025        assert_eq!(res.errors_timeout, 1);
1026        assert_eq!(res.errors_network, 0);
1027        assert_eq!(res.errors_status_code, 0);
1028        assert_eq!(res.requests_count, 5);
1029        assert_eq!(res.chunks_sent, 0);
1030        assert_eq!(res.bytes_sent, 0);
1031        assert_eq!(res.responses_count_per_code.len(), 0);
1032    }
1033
1034    #[test]
1035    fn test_builder() {
1036        let header_tags = HEADER_TAGS;
1037        let payload = setup_payload(&header_tags);
1038        let retry_strategy = RetryStrategy::new(5, 100, RetryBackoffType::Constant, None);
1039
1040        let send_data = SendDataBuilder::new(
1041            100,
1042            TracerPayloadCollection::V07(vec![payload]),
1043            header_tags,
1044            &Endpoint::default(),
1045        )
1046        // Test with_api_key()
1047        .with_api_key("TEST-KEY")
1048        // Test with_retry_strategy()
1049        .with_retry_strategy(retry_strategy.clone())
1050        .build();
1051
1052        assert_eq!(
1053            send_data.target.api_key,
1054            Some(std::borrow::Cow::Borrowed("TEST-KEY"))
1055        );
1056        assert_eq!(send_data.retry_strategy, retry_strategy);
1057    }
1058}