Skip to main content

libdd_data_pipeline/trace_exporter/
mod.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3pub mod agent_response;
4pub mod builder;
5pub mod error;
6pub mod metrics;
7pub mod stats;
8mod trace_serializer;
9
10// Re-export the builder
11pub use builder::TraceExporterBuilder;
12
13use self::agent_response::AgentResponse;
14use self::metrics::MetricsEmitter;
15use self::stats::StatsComputationStatus;
16use self::trace_serializer::TraceSerializer;
17use crate::agent_info::{AgentInfoFetcher, ResponseObserver};
18use crate::pausable_worker::PausableWorker;
19use crate::stats_exporter::StatsExporter;
20use crate::telemetry::{SendPayloadTelemetry, TelemetryClient};
21use crate::trace_exporter::agent_response::{
22    AgentResponsePayloadVersion, DATADOG_RATES_PAYLOAD_VERSION_HEADER,
23};
24use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError};
25use crate::{
26    agent_info::{self, schema::AgentInfo},
27    health_metrics,
28    health_metrics::{HealthMetric, SendResult, TransportErrorType},
29};
30use arc_swap::{ArcSwap, ArcSwapOption};
31use http::uri::PathAndQuery;
32use http::Uri;
33use http_body_util::BodyExt;
34use libdd_common::tag::Tag;
35use libdd_common::{http_common, Endpoint};
36use libdd_common::{HttpClient, MutexExt};
37use libdd_dogstatsd_client::Client;
38use libdd_telemetry::worker::TelemetryWorker;
39use libdd_trace_utils::msgpack_decoder;
40use libdd_trace_utils::send_with_retry::{
41    send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult,
42};
43use libdd_trace_utils::span::{v04::Span, TraceData};
44use libdd_trace_utils::trace_utils::TracerHeaderTags;
45use std::io;
46use std::sync::{Arc, Mutex};
47use std::time::Duration;
48use std::{borrow::Borrow, collections::HashMap, str::FromStr};
49use tokio::runtime::Runtime;
50use tracing::{debug, error, warn};
51
/// Path of the agent `/info` endpoint.
// NOTE(review): not referenced in the visible part of this module; presumably used by the
// test module below — confirm.
const INFO_ENDPOINT: &str = "/info";
53
/// TraceExporterInputFormat represents the format of the input traces.
/// The input format can be either V0.4 or V0.5, where V0.4 is the default.
#[derive(Copy, Clone, Debug, Default, PartialEq)]
#[repr(C)]
pub enum TraceExporterInputFormat {
    /// Payloads are decoded with the v0.4 msgpack decoder (default).
    #[allow(missing_docs)]
    #[default]
    V04,
    /// Payloads are decoded with the v0.5 msgpack decoder.
    V05,
}
64
/// TraceExporterOutputFormat represents the format of the output traces.
/// The output format can be either V0.4 or V0.5, where V0.4 is the default.
#[derive(Copy, Clone, Debug, Default, PartialEq)]
#[repr(C)]
pub enum TraceExporterOutputFormat {
    /// Traces are sent to the agent's `/v0.4/traces` path (default).
    #[allow(missing_docs)]
    #[default]
    V04,
    /// Traces are sent to the agent's `/v0.5/traces` path.
    V05,
}
75
76impl TraceExporterOutputFormat {
77    /// Add the agent trace endpoint path to the URL.
78    fn add_path(&self, url: &Uri) -> Uri {
79        add_path(
80            url,
81            match self {
82                TraceExporterOutputFormat::V04 => "/v0.4/traces",
83                TraceExporterOutputFormat::V05 => "/v0.5/traces",
84            },
85        )
86    }
87}
88
89/// Add a path to the URL.
90///
91/// # Arguments
92///
93/// * `url` - The URL to which the path is to be added.
94/// * `path` - The path to be added to the URL.
95fn add_path(url: &Uri, path: &str) -> Uri {
96    let p_and_q = url.path_and_query();
97
98    #[allow(clippy::unwrap_used)]
99    let new_p_and_q = match p_and_q {
100        Some(pq) => {
101            let p = pq.path();
102            let mut p = p.strip_suffix('/').unwrap_or(p).to_owned();
103            p.push_str(path);
104
105            PathAndQuery::from_str(p.as_str())
106        }
107        None => PathAndQuery::from_str(path),
108    }
109    // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
110    .unwrap();
111    let mut parts = url.clone().into_parts();
112    parts.path_and_query = Some(new_p_and_q);
113    // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
114    #[allow(clippy::unwrap_used)]
115    Uri::from_parts(parts).unwrap()
116}
117
/// Metadata describing the tracer that owns the exporter.
///
/// A subset of these fields is converted into `TracerHeaderTags` (see the `From` impl in this
/// module); the remaining fields are not read in this file.
#[derive(Clone, Default, Debug)]
pub struct TracerMetadata {
    pub hostname: String,
    pub env: String,
    pub app_version: String,
    pub runtime_id: String,
    pub service: String,
    /// Forwarded as `tracer_version` in `TracerHeaderTags`.
    pub tracer_version: String,
    /// Forwarded as `lang` in `TracerHeaderTags`.
    pub language: String,
    /// Forwarded as `lang_version` in `TracerHeaderTags`.
    pub language_version: String,
    /// Forwarded as `lang_interpreter` in `TracerHeaderTags`.
    pub language_interpreter: String,
    /// Forwarded as `lang_vendor` in `TracerHeaderTags`.
    pub language_interpreter_vendor: String,
    pub git_commit_sha: String,
    pub process_tags: String,
    /// Forwarded as `client_computed_stats` in `TracerHeaderTags`.
    pub client_computed_stats: bool,
    /// Forwarded as `client_computed_top_level` in `TracerHeaderTags`.
    pub client_computed_top_level: bool,
}
135
136impl<'a> From<&'a TracerMetadata> for TracerHeaderTags<'a> {
137    fn from(tags: &'a TracerMetadata) -> TracerHeaderTags<'a> {
138        TracerHeaderTags::<'_> {
139            lang: &tags.language,
140            lang_version: &tags.language_version,
141            tracer_version: &tags.tracer_version,
142            lang_interpreter: &tags.language_interpreter,
143            lang_vendor: &tags.language_interpreter_vendor,
144            client_computed_stats: tags.client_computed_stats,
145            client_computed_top_level: tags.client_computed_top_level,
146            ..Default::default()
147        }
148    }
149}
150
151impl<'a> From<&'a TracerMetadata> for HashMap<&'static str, String> {
152    fn from(tags: &'a TracerMetadata) -> HashMap<&'static str, String> {
153        TracerHeaderTags::from(tags).into()
154    }
155}
156
/// Background workers owned by the `TraceExporter`.
///
/// They are started together on the exporter's runtime and paused together by
/// `TraceExporter::stop_worker`.
#[derive(Debug)]
pub(crate) struct TraceExporterWorkers {
    /// Agent info fetcher worker; always present.
    pub info: PausableWorker<AgentInfoFetcher>,
    /// Stats exporter worker; `None` when there is no stats worker.
    pub stats: Option<PausableWorker<StatsExporter>>,
    /// Telemetry worker; `None` when there is no telemetry worker.
    pub telemetry: Option<PausableWorker<TelemetryWorker>>,
}
163
164/// The TraceExporter ingest traces from the tracers serialized as messagepack and forward them to
165/// the agent while applying some transformation.
166///
167/// # Proxy
168/// If the input format is set as `Proxy`, the exporter will forward traces to the agent without
169/// deserializing them.
170///
171/// # Features
172/// When the input format is set to `V04` the TraceExporter will deserialize the traces and perform
173/// some operation before sending them to the agent. The available operations are described below.
174///
175/// ## V07 Serialization
176/// The Trace exporter can serialize the traces to V07 before sending them to the agent.
177///
178/// ## Stats computation
179/// The Trace Exporter can compute stats on traces. In this case the trace exporter will start
180/// another task to send stats when a time bucket expire. When this feature is enabled the
181/// TraceExporter drops all spans that may not be sampled by the agent.
182#[allow(missing_docs)]
183enum DeserInputFormat {
184    V04,
185    V05,
186}
187
188#[derive(Debug)]
189pub struct TraceExporter {
190    endpoint: Endpoint,
191    metadata: TracerMetadata,
192    input_format: TraceExporterInputFormat,
193    output_format: TraceExporterOutputFormat,
194    // TODO - do something with the response callback - https://datadoghq.atlassian.net/browse/APMSP-1019
195    runtime: Arc<Mutex<Option<Arc<Runtime>>>>,
196    /// None if dogstatsd is disabled
197    dogstatsd: Option<Client>,
198    common_stats_tags: Vec<Tag>,
199    client_computed_top_level: bool,
200    client_side_stats: ArcSwap<StatsComputationStatus>,
201    previous_info_state: ArcSwapOption<String>,
202    info_response_observer: ResponseObserver,
203    telemetry: Option<TelemetryClient>,
204    health_metrics_enabled: bool,
205    workers: Arc<Mutex<TraceExporterWorkers>>,
206    agent_payload_response_version: Option<AgentResponsePayloadVersion>,
207    http_client: HttpClient,
208}
209
210impl TraceExporter {
211    #[allow(missing_docs)]
212    pub fn builder() -> TraceExporterBuilder {
213        TraceExporterBuilder::default()
214    }
215
216    /// Return the existing runtime or create a new one and start all workers
217    fn runtime(&self) -> Result<Arc<Runtime>, TraceExporterError> {
218        let mut runtime_guard = self.runtime.lock_or_panic();
219        match runtime_guard.as_ref() {
220            Some(runtime) => {
221                // Runtime already running
222                Ok(runtime.clone())
223            }
224            None => {
225                // Create a new current thread runtime with all features enabled
226                let runtime = Arc::new(
227                    tokio::runtime::Builder::new_multi_thread()
228                        .worker_threads(1)
229                        .enable_all()
230                        .build()?,
231                );
232                *runtime_guard = Some(runtime.clone());
233                self.start_all_workers(&runtime)?;
234                Ok(runtime)
235            }
236        }
237    }
238
239    /// Manually start all workers
240    pub fn run_worker(&self) -> Result<(), TraceExporterError> {
241        self.runtime()?;
242        Ok(())
243    }
244
245    /// Start all workers with the given runtime
246    fn start_all_workers(&self, runtime: &Arc<Runtime>) -> Result<(), TraceExporterError> {
247        let mut workers = self.workers.lock_or_panic();
248
249        self.start_info_worker(&mut workers, runtime)?;
250        self.start_stats_worker(&mut workers, runtime)?;
251        self.start_telemetry_worker(&mut workers, runtime)?;
252
253        Ok(())
254    }
255
256    /// Start the info worker
257    fn start_info_worker(
258        &self,
259        workers: &mut TraceExporterWorkers,
260        runtime: &Arc<Runtime>,
261    ) -> Result<(), TraceExporterError> {
262        workers.info.start(runtime).map_err(|e| {
263            TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string()))
264        })
265    }
266
267    /// Start the stats worker if present
268    fn start_stats_worker(
269        &self,
270        workers: &mut TraceExporterWorkers,
271        runtime: &Arc<Runtime>,
272    ) -> Result<(), TraceExporterError> {
273        if let Some(stats_worker) = &mut workers.stats {
274            stats_worker.start(runtime).map_err(|e| {
275                TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string()))
276            })?;
277        }
278        Ok(())
279    }
280
281    /// Start the telemetry worker if present
282    fn start_telemetry_worker(
283        &self,
284        workers: &mut TraceExporterWorkers,
285        runtime: &Arc<Runtime>,
286    ) -> Result<(), TraceExporterError> {
287        if let Some(telemetry_worker) = &mut workers.telemetry {
288            telemetry_worker.start(runtime).map_err(|e| {
289                TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string()))
290            })?;
291            if let Some(client) = &self.telemetry {
292                runtime.block_on(client.start());
293            }
294        }
295        Ok(())
296    }
297
298    pub fn stop_worker(&self) {
299        let runtime = self.runtime.lock_or_panic().take();
300        if let Some(ref rt) = runtime {
301            // Stop workers to save their state
302            let mut workers = self.workers.lock_or_panic();
303            rt.block_on(async {
304                let _ = workers.info.pause().await;
305                if let Some(stats_worker) = &mut workers.stats {
306                    let _ = stats_worker.pause().await;
307                };
308                if let Some(telemetry_worker) = &mut workers.telemetry {
309                    let _ = telemetry_worker.pause().await;
310                };
311            });
312        }
313        // When the info fetcher is paused, the trigger channel keeps a reference to the runtime's
314        // IoStack as a waker. This prevents the IoStack from being dropped when shutting
315        // down runtime. By manually sending a message to the trigger channel we trigger the
316        // waker releasing the reference to the IoStack. Finally we drain the channel to
317        // avoid triggering a fetch when the info fetcher is restarted.
318        if let PausableWorker::Paused { worker } = &mut self.workers.lock_or_panic().info {
319            self.info_response_observer.manual_trigger();
320            worker.drain();
321        }
322        drop(runtime);
323    }
324
325    /// Send msgpack serialized traces to the agent
326    ///
327    /// # Arguments
328    ///
329    /// * data: A slice containing the serialized traces. This slice should be encoded following the
330    ///   input_format passed to the TraceExporter on creating.
331    ///
332    /// # Returns
333    /// * Ok(AgentResponse): The response from the agent
334    /// * Err(TraceExporterError): An error detailing what went wrong in the process
335    pub fn send(&self, data: &[u8]) -> Result<AgentResponse, TraceExporterError> {
336        self.check_agent_info();
337
338        let res = match self.input_format {
339            TraceExporterInputFormat::V04 => self.send_deser(data, DeserInputFormat::V04),
340            TraceExporterInputFormat::V05 => self.send_deser(data, DeserInputFormat::V05),
341        }?;
342        if matches!(&res, AgentResponse::Changed { body } if body.is_empty()) {
343            return Err(TraceExporterError::Agent(
344                error::AgentErrorKind::EmptyResponse,
345            ));
346        }
347
348        Ok(res)
349    }
350
351    /// Safely shutdown the TraceExporter and all related tasks
352    pub fn shutdown(mut self, timeout: Option<Duration>) -> Result<(), TraceExporterError> {
353        let runtime = tokio::runtime::Builder::new_current_thread()
354            .enable_all()
355            .build()?;
356
357        if let Some(timeout) = timeout {
358            match runtime
359                .block_on(async { tokio::time::timeout(timeout, self.shutdown_async()).await })
360            {
361                Ok(()) => Ok(()),
362                Err(_e) => Err(TraceExporterError::Shutdown(
363                    error::ShutdownError::TimedOut(timeout),
364                )),
365            }
366        } else {
367            runtime.block_on(self.shutdown_async());
368            Ok(())
369        }
370    }
371
372    /// Future used inside `Self::shutdown`.
373    ///
374    /// This function should not take ownership of the trace exporter as it will cause the runtime
375    /// stored in the trace exporter to be dropped in a non-blocking context causing a panic.
376    async fn shutdown_async(&mut self) {
377        let stats_status = self.client_side_stats.load();
378        if let StatsComputationStatus::Enabled {
379            cancellation_token, ..
380        } = stats_status.as_ref()
381        {
382            cancellation_token.cancel();
383
384            let stats_worker = self.workers.lock_or_panic().stats.take();
385
386            if let Some(stats_worker) = stats_worker {
387                let _ = stats_worker.join().await;
388            }
389        }
390        if let Some(telemetry) = self.telemetry.take() {
391            telemetry.shutdown().await;
392            let telemetry_worker = self.workers.lock_or_panic().telemetry.take();
393
394            if let Some(telemetry_worker) = telemetry_worker {
395                let _ = telemetry_worker.join().await;
396            }
397        }
398    }
399
400    /// Check if agent info state has changed
401    fn has_agent_info_state_changed(&self, agent_info: &Arc<AgentInfo>) -> bool {
402        Some(agent_info.state_hash.as_str())
403            != self
404                .previous_info_state
405                .load()
406                .as_deref()
407                .map(|s| s.as_str())
408    }
409
410    fn check_agent_info(&self) {
411        if let Some(agent_info) = agent_info::get_agent_info() {
412            if self.has_agent_info_state_changed(&agent_info) {
413                match &**self.client_side_stats.load() {
414                    StatsComputationStatus::Disabled => {}
415                    StatsComputationStatus::DisabledByAgent { .. } => {
416                        let ctx = stats::StatsContext {
417                            metadata: &self.metadata,
418                            endpoint_url: &self.endpoint.url,
419                            runtime: &self.runtime,
420                        };
421                        stats::handle_stats_disabled_by_agent(
422                            &ctx,
423                            &agent_info,
424                            &self.client_side_stats,
425                            &self.workers,
426                            self.http_client.clone(),
427                        );
428                    }
429                    StatsComputationStatus::Enabled {
430                        stats_concentrator, ..
431                    } => {
432                        let ctx = stats::StatsContext {
433                            metadata: &self.metadata,
434                            endpoint_url: &self.endpoint.url,
435                            runtime: &self.runtime,
436                        };
437                        stats::handle_stats_enabled(
438                            &ctx,
439                            &agent_info,
440                            stats_concentrator,
441                            &self.client_side_stats,
442                            &self.workers,
443                        );
444                    }
445                }
446                self.previous_info_state
447                    .store(Some(agent_info.state_hash.clone().into()))
448            }
449        }
450    }
451
452    /// !!! This function is only for testing purposes !!!
453    ///
454    /// Waits the agent info to be ready by checking the agent_info state.
455    /// It will only return Ok after the agent info has been fetched at least once or Err if timeout
456    /// has been reached
457    ///
458    /// In production:
459    /// 1) We should not synchronously wait for this to be ready before sending traces
460    /// 2) It's not guaranteed to not block forever, since the /info endpoint might not be
461    ///    available.
462    ///
463    /// The `send` function will check agent_info when running, which will only be available if the
464    /// fetcher had time to reach to the agent.
465    /// Since agent_info can enable CSS computation, waiting for this during testing can make
466    /// snapshots non-deterministic.
467    #[cfg(feature = "test-utils")]
468    pub fn wait_agent_info_ready(&self, timeout: Duration) -> anyhow::Result<()> {
469        let start = std::time::Instant::now();
470        loop {
471            if std::time::Instant::now().duration_since(start) > timeout {
472                anyhow::bail!("Timeout waiting for agent info to be ready",);
473            }
474            if agent_info::get_agent_info().is_some() {
475                return Ok(());
476            }
477            std::thread::sleep(Duration::from_millis(10));
478        }
479    }
480
481    /// Emit a health metric to dogstatsd
482    fn emit_metric(&self, metric: HealthMetric, custom_tags: Option<Vec<&Tag>>) {
483        if self.health_metrics_enabled {
484            let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags);
485            emitter.emit(metric, custom_tags);
486        }
487    }
488
489    /// Emit all health metrics from a SendResult
490    fn emit_send_result(&self, result: &SendResult) {
491        if self.health_metrics_enabled {
492            let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags);
493            emitter.emit_from_send_result(result);
494        }
495    }
496
497    /// Send a list of trace chunks to the agent
498    ///
499    /// # Arguments
500    /// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans.
501    ///
502    /// # Returns
503    /// * Ok(String): The response from the agent
504    /// * Err(TraceExporterError): An error detailing what went wrong in the process
505    pub fn send_trace_chunks<T: TraceData>(
506        &self,
507        trace_chunks: Vec<Vec<Span<T>>>,
508    ) -> Result<AgentResponse, TraceExporterError> {
509        self.check_agent_info();
510        self.runtime()?
511            .block_on(async { self.send_trace_chunks_inner(trace_chunks).await })
512    }
513
514    /// Send a list of trace chunks to the agent, asynchronously
515    ///
516    /// # Arguments
517    /// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans.
518    ///
519    /// # Returns
520    /// * Ok(String): The response from the agent
521    /// * Err(TraceExporterError): An error detailing what went wrong in the process
522    pub async fn send_trace_chunks_async<T: TraceData>(
523        &self,
524        trace_chunks: Vec<Vec<Span<T>>>,
525    ) -> Result<AgentResponse, TraceExporterError> {
526        self.check_agent_info();
527        self.send_trace_chunks_inner(trace_chunks).await
528    }
529
530    /// Deserializes, processes and sends trace chunks to the agent
531    fn send_deser(
532        &self,
533        data: &[u8],
534        format: DeserInputFormat,
535    ) -> Result<AgentResponse, TraceExporterError> {
536        let (traces, _) = match format {
537            DeserInputFormat::V04 => msgpack_decoder::v04::from_slice(data),
538            DeserInputFormat::V05 => msgpack_decoder::v05::from_slice(data),
539        }
540        .map_err(|e| {
541            error!("Error deserializing trace from request body: {e}");
542            self.emit_metric(
543                HealthMetric::Count(health_metrics::DESERIALIZE_TRACES_ERRORS, 1),
544                None,
545            );
546            TraceExporterError::Deserialization(e)
547        })?;
548        debug!(
549            trace_count = traces.len(),
550            "Trace deserialization completed successfully"
551        );
552        self.emit_metric(
553            HealthMetric::Count(health_metrics::DESERIALIZE_TRACES, traces.len() as i64),
554            None,
555        );
556
557        self.runtime()?
558            .block_on(async { self.send_trace_chunks_inner(traces).await })
559    }
560
561    /// Send traces payload to agent with retry and telemetry reporting
562    async fn send_traces_with_telemetry(
563        &self,
564        endpoint: &Endpoint,
565        mp_payload: Vec<u8>,
566        headers: HashMap<&'static str, String>,
567        chunks: usize,
568        chunks_dropped_p0: usize,
569    ) -> Result<AgentResponse, TraceExporterError> {
570        let strategy = RetryStrategy::default();
571        let payload_len = mp_payload.len();
572
573        // Send traces to the agent
574        let result =
575            send_with_retry(&self.http_client, endpoint, mp_payload, &headers, &strategy).await;
576
577        // Send telemetry for the payload sending
578        if let Some(telemetry) = &self.telemetry {
579            if let Err(e) = telemetry.send(&SendPayloadTelemetry::from_retry_result(
580                &result,
581                payload_len as u64,
582                chunks as u64,
583                chunks_dropped_p0 as u64,
584            )) {
585                error!(?e, "Error sending telemetry");
586            }
587        }
588
589        self.handle_send_result(result, chunks, payload_len).await
590    }
591
592    async fn send_trace_chunks_inner<T: TraceData>(
593        &self,
594        mut traces: Vec<Vec<Span<T>>>,
595    ) -> Result<AgentResponse, TraceExporterError> {
596        let mut header_tags: TracerHeaderTags = self.metadata.borrow().into();
597
598        // Process stats computation
599        let dropped_p0_stats = stats::process_traces_for_stats(
600            &mut traces,
601            &mut header_tags,
602            &self.client_side_stats,
603            self.client_computed_top_level,
604        );
605
606        let serializer = TraceSerializer::new(
607            self.output_format,
608            self.agent_payload_response_version.as_ref(),
609        );
610        let prepared = match serializer.prepare_traces_payload(traces, header_tags) {
611            Ok(p) => p,
612            Err(e) => {
613                error!("Error serializing traces: {e}");
614                self.emit_metric(
615                    HealthMetric::Count(health_metrics::SERIALIZE_TRACES_ERRORS, 1),
616                    None,
617                );
618                return Err(e);
619            }
620        };
621
622        let endpoint = Endpoint {
623            url: self.get_agent_url(),
624            ..self.endpoint.clone()
625        };
626
627        self.send_traces_with_telemetry(
628            &endpoint,
629            prepared.data,
630            prepared.headers,
631            prepared.chunk_count,
632            dropped_p0_stats.dropped_p0_traces,
633        )
634        .await
635    }
636
637    /// Handle the result of sending traces to the agent
638    async fn handle_send_result(
639        &self,
640        result: SendWithRetryResult,
641        chunks: usize,
642        payload_len: usize,
643    ) -> Result<AgentResponse, TraceExporterError> {
644        match result {
645            Ok((response, attempts)) => {
646                self.handle_agent_response(chunks, response, payload_len, attempts)
647                    .await
648            }
649            Err(err) => self.handle_send_error(err, payload_len, chunks).await,
650        }
651    }
652
653    /// Handle errors from send with retry operation
654    async fn handle_send_error(
655        &self,
656        err: SendWithRetryError,
657        payload_len: usize,
658        chunks: usize,
659    ) -> Result<AgentResponse, TraceExporterError> {
660        error!(?err, "Error sending traces");
661
662        match err {
663            SendWithRetryError::Http(response, attempts) => {
664                self.handle_http_send_error(response, payload_len, chunks, attempts)
665                    .await
666            }
667            SendWithRetryError::Timeout(attempts) => {
668                let send_result =
669                    SendResult::failure(TransportErrorType::Timeout, payload_len, chunks, attempts);
670                self.emit_send_result(&send_result);
671                Err(TraceExporterError::from(io::Error::from(
672                    io::ErrorKind::TimedOut,
673                )))
674            }
675            SendWithRetryError::Network(err, attempts) => {
676                let send_result =
677                    SendResult::failure(TransportErrorType::Network, payload_len, chunks, attempts);
678                self.emit_send_result(&send_result);
679                Err(TraceExporterError::from(err))
680            }
681            SendWithRetryError::Build(attempts) => {
682                let send_result =
683                    SendResult::failure(TransportErrorType::Build, payload_len, chunks, attempts);
684                self.emit_send_result(&send_result);
685                Err(TraceExporterError::from(io::Error::from(
686                    io::ErrorKind::Other,
687                )))
688            }
689        }
690    }
691
692    /// Handle HTTP error responses from send with retry
693    async fn handle_http_send_error(
694        &self,
695        response: http_common::HttpResponse,
696        payload_len: usize,
697        chunks: usize,
698        attempts: u32,
699    ) -> Result<AgentResponse, TraceExporterError> {
700        let status = response.status();
701
702        // Check if the agent state has changed for error responses
703        self.info_response_observer.check_response(&response);
704
705        // Emit health metrics using SendResult
706        let send_result = SendResult::failure(
707            TransportErrorType::Http(status.as_u16()),
708            payload_len,
709            chunks,
710            attempts,
711        );
712        self.emit_send_result(&send_result);
713
714        let body = self.read_error_response_body(response).await?;
715        Err(TraceExporterError::Request(RequestError::new(
716            status,
717            &String::from_utf8_lossy(&body),
718        )))
719    }
720
721    /// Read response body from error response
722    async fn read_error_response_body(
723        &self,
724        response: http_common::HttpResponse,
725    ) -> Result<bytes::Bytes, TraceExporterError> {
726        match response.into_body().collect().await {
727            Ok(body) => Ok(body.to_bytes()),
728            Err(err) => {
729                error!(?err, "Error reading agent response body");
730                Err(TraceExporterError::from(err))
731            }
732        }
733    }
734
735    /// Check if the agent's payload version has changed based on response headers
736    fn check_payload_version_changed(&self, response: &http_common::HttpResponse) -> bool {
737        let status = response.status();
738        match (
739            status.is_success(),
740            self.agent_payload_response_version.as_ref(),
741            response.headers().get(DATADOG_RATES_PAYLOAD_VERSION_HEADER),
742        ) {
743            (false, _, _) => {
744                // If the status is not success, the rates are considered unchanged
745                false
746            }
747            (true, None, _) => {
748                // if the agent_payload_response_version fingerprint is not enabled we always
749                // return the new rates
750                true
751            }
752            (true, Some(agent_payload_response_version), Some(new_payload_version)) => {
753                if let Ok(new_payload_version_str) = new_payload_version.to_str() {
754                    agent_payload_response_version.check_and_update(new_payload_version_str)
755                } else {
756                    false
757                }
758            }
759            _ => false,
760        }
761    }
762
763    /// Read response body and handle potential errors
764    async fn read_response_body(
765        response: http_common::HttpResponse,
766    ) -> Result<String, http_common::Error> {
767        let body = http_common::collect_response_bytes(response).await?;
768        Ok(String::from_utf8_lossy(&body).to_string())
769    }
770
771    /// Handle successful trace sending response
772    fn handle_successful_trace_response(
773        &self,
774        chunks: usize,
775        payload_len: usize,
776        attempts: u32,
777        body: String,
778        payload_version_changed: bool,
779    ) -> Result<AgentResponse, TraceExporterError> {
780        debug!(chunks = chunks, "Trace chunks sent successfully to agent");
781        let send_result = SendResult::success(payload_len, chunks, attempts);
782        self.emit_send_result(&send_result);
783
784        Ok(if payload_version_changed {
785            AgentResponse::Changed { body }
786        } else {
787            AgentResponse::Unchanged
788        })
789    }
790
791    async fn handle_agent_response(
792        &self,
793        chunks: usize,
794        response: http_common::HttpResponse,
795        payload_len: usize,
796        attempts: u32,
797    ) -> Result<AgentResponse, TraceExporterError> {
798        // Check if the agent state has changed
799        self.info_response_observer.check_response(&response);
800
801        let status = response.status();
802        let payload_version_changed = self.check_payload_version_changed(&response);
803
804        match Self::read_response_body(response).await {
805            Ok(body) => {
806                if !status.is_success() {
807                    warn!(
808                        status = %status,
809                        "Agent returned non-success status for trace send"
810                    );
811                    let send_result = SendResult::failure(
812                        TransportErrorType::Http(status.as_u16()),
813                        payload_len,
814                        chunks,
815                        attempts,
816                    );
817                    self.emit_send_result(&send_result);
818                    return Err(TraceExporterError::Request(RequestError::new(
819                        status, &body,
820                    )));
821                }
822
823                self.handle_successful_trace_response(
824                    chunks,
825                    payload_len,
826                    attempts,
827                    body,
828                    payload_version_changed,
829                )
830            }
831            Err(err) => {
832                error!(?err, "Error reading agent response body");
833                let send_result = SendResult::failure(
834                    TransportErrorType::ResponseBody,
835                    payload_len,
836                    chunks,
837                    attempts,
838                );
839                self.emit_send_result(&send_result);
840                Err(TraceExporterError::from(err))
841            }
842        }
843    }
844
845    fn get_agent_url(&self) -> Uri {
846        self.output_format.add_path(&self.endpoint.url)
847    }
848
849    #[cfg(test)]
850    /// Test only function to check if the stats computation is active and the worker is running
851    pub fn is_stats_worker_active(&self) -> bool {
852        stats::is_stats_worker_active(&self.client_side_stats, &self.workers)
853    }
854}
855
/// Telemetry settings; the tests in this file pass it to
/// `TraceExporterBuilder::enable_telemetry`.
///
/// The derived `Default` yields a heartbeat of `0`, no runtime id and debug disabled.
#[derive(Debug, Default, Clone)]
pub struct TelemetryConfig {
    /// Telemetry heartbeat value.
    /// NOTE(review): presumably an interval in milliseconds — the unit is not visible here,
    /// confirm against the telemetry worker.
    pub heartbeat: u64,
    /// Optional runtime identifier.
    /// NOTE(review): presumably attached to emitted telemetry — confirm.
    pub runtime_id: Option<String>,
    /// Debug flag for telemetry.
    /// NOTE(review): exact effect is not visible in this section — confirm.
    pub debug_enabled: bool,
}
862
/// Callback interface that receives a response as text.
///
/// Implementors get shared access to themselves (`&self`), so any state they update needs
/// interior mutability.
///
/// NOTE(review): the call sites are not visible in this section; presumably this is invoked
/// with the agent's response body — confirm against the callers.
pub trait ResponseCallback {
    /// Invoked with the `response` text; the borrow only lasts for the duration of the call.
    fn call(&self, response: &str);
}
868
869#[cfg(test)]
870mod tests {
871    use self::error::AgentErrorKind;
872    use super::*;
873    use httpmock::prelude::*;
874    use httpmock::MockServer;
875    use libdd_tinybytes::BytesString;
876    use libdd_trace_utils::msgpack_encoder;
877    use libdd_trace_utils::span::v04::SpanBytes;
878    use libdd_trace_utils::span::v05;
879    use std::collections::HashMap;
880    use std::net;
881    use std::time::Duration;
882    use tokio::time::sleep;
883
    // MessagePack encoding of an empty v0.5 payload, i.e. `[[""], []]`:
    //   0x92 -> array of 2 elements
    //   0x91 -> array of 1 element (first element)
    //   0xA0 -> empty string (its only entry)
    //   0x90 -> empty array (second element)
    const V5_EMPTY: [u8; 4] = [0x92, 0x91, 0xA0, 0x90];
886
887    #[test]
888    fn test_from_tracer_tags_to_tracer_header_tags() {
889        let tracer_tags = TracerMetadata {
890            tracer_version: "v0.1".to_string(),
891            language: "rust".to_string(),
892            language_version: "1.52.1".to_string(),
893            language_interpreter: "rustc".to_string(),
894            language_interpreter_vendor: "rust-lang".to_string(),
895            client_computed_stats: true,
896            client_computed_top_level: true,
897            ..Default::default()
898        };
899
900        let tracer_header_tags: TracerHeaderTags = (&tracer_tags).into();
901
902        assert_eq!(tracer_header_tags.tracer_version, "v0.1");
903        assert_eq!(tracer_header_tags.lang, "rust");
904        assert_eq!(tracer_header_tags.lang_version, "1.52.1");
905        assert_eq!(tracer_header_tags.lang_interpreter, "rustc");
906        assert_eq!(tracer_header_tags.lang_vendor, "rust-lang");
907        assert!(tracer_header_tags.client_computed_stats);
908        assert!(tracer_header_tags.client_computed_top_level);
909    }
910
911    #[test]
912    fn test_from_tracer_tags_to_hashmap() {
913        let tracer_tags = TracerMetadata {
914            tracer_version: "v0.1".to_string(),
915            language: "rust".to_string(),
916            language_version: "1.52.1".to_string(),
917            language_interpreter: "rustc".to_string(),
918            client_computed_stats: true,
919            client_computed_top_level: true,
920            ..Default::default()
921        };
922
923        let hashmap: HashMap<&'static str, String> = (&tracer_tags).into();
924
925        assert_eq!(hashmap.get("datadog-meta-tracer-version").unwrap(), "v0.1");
926        assert_eq!(hashmap.get("datadog-meta-lang").unwrap(), "rust");
927        assert_eq!(hashmap.get("datadog-meta-lang-version").unwrap(), "1.52.1");
928        assert_eq!(
929            hashmap.get("datadog-meta-lang-interpreter").unwrap(),
930            "rustc"
931        );
932        assert!(hashmap.contains_key("datadog-client-computed-stats"));
933        assert!(hashmap.contains_key("datadog-client-computed-top-level"));
934    }
935
936    fn read(socket: &net::UdpSocket) -> String {
937        let mut buf = [0; 1_000];
938        socket.recv(&mut buf).expect("No data");
939        let datagram = String::from_utf8_lossy(buf.as_ref());
940        datagram.trim_matches(char::from(0)).to_string()
941    }
942
943    fn build_test_exporter(
944        url: String,
945        dogstatsd_url: Option<String>,
946        input: TraceExporterInputFormat,
947        output: TraceExporterOutputFormat,
948        enable_telemetry: bool,
949        enable_health_metrics: bool,
950    ) -> TraceExporter {
951        let mut builder = TraceExporterBuilder::default();
952        builder
953            .set_url(&url)
954            .set_service("test")
955            .set_env("staging")
956            .set_tracer_version("v0.1")
957            .set_language("nodejs")
958            .set_language_version("1.0")
959            .set_language_interpreter("v8")
960            .set_input_format(input)
961            .set_output_format(output);
962
963        if enable_health_metrics {
964            builder.enable_health_metrics();
965        }
966
967        if let Some(url) = dogstatsd_url {
968            builder.set_dogstatsd_url(&url);
969        };
970
971        if enable_telemetry {
972            builder.enable_telemetry(TelemetryConfig {
973                heartbeat: 100,
974                ..Default::default()
975            });
976        }
977
978        builder.build().unwrap()
979    }
980
981    #[test]
982    #[cfg_attr(miri, ignore)]
983    fn test_health_metrics() {
984        let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
985        let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
986
987        let fake_agent = MockServer::start();
988        let _mock_traces = fake_agent.mock(|_, then| {
989            then.status(200)
990                .header("content-type", "application/json")
991                .body(r#"{ "rate_by_service": { "service:test,env:staging": 1.0, "service:test,env:prod": 0.3 } }"#);
992        });
993
994        let exporter = build_test_exporter(
995            fake_agent.url("/v0.4/traces"),
996            Some(stats_socket.local_addr().unwrap().to_string()),
997            TraceExporterInputFormat::V04,
998            TraceExporterOutputFormat::V04,
999            false,
1000            true,
1001        );
1002
1003        let traces: Vec<Vec<SpanBytes>> = vec![
1004            vec![SpanBytes {
1005                name: BytesString::from_slice(b"test").unwrap(),
1006                ..Default::default()
1007            }],
1008            vec![SpanBytes {
1009                name: BytesString::from_slice(b"test2").unwrap(),
1010                ..Default::default()
1011            }],
1012        ];
1013        let data = msgpack_encoder::v04::to_vec(&traces);
1014
1015        let _result = exporter.send(data.as_ref()).expect("failed to send trace");
1016
1017        // Collect all metrics
1018        let mut received_metrics = Vec::new();
1019        for _ in 0..5 {
1020            received_metrics.push(read(&stats_socket));
1021        }
1022
1023        // Check that all expected metrics are present
1024        let expected_metrics = vec![
1025            format!(
1026                "datadog.tracer.exporter.deserialize.traces:2|c|#libdatadog_version:{}",
1027                env!("CARGO_PKG_VERSION")
1028            ),
1029            format!(
1030                "datadog.tracer.exporter.transport.traces.successful:2|c|#libdatadog_version:{}",
1031                env!("CARGO_PKG_VERSION")
1032            ),
1033            format!(
1034                "datadog.tracer.exporter.transport.sent.bytes:{}|d|#libdatadog_version:{}",
1035                data.len(),
1036                env!("CARGO_PKG_VERSION")
1037            ),
1038            format!(
1039                "datadog.tracer.exporter.transport.traces.sent:2|d|#libdatadog_version:{}",
1040                env!("CARGO_PKG_VERSION")
1041            ),
1042            format!(
1043                "datadog.tracer.exporter.transport.requests:1|d|#libdatadog_version:{}",
1044                env!("CARGO_PKG_VERSION")
1045            ),
1046        ];
1047
1048        for expected in expected_metrics {
1049            assert!(
1050                received_metrics.contains(&expected),
1051                "Expected metric '{expected}' not found in received metrics: {received_metrics:?}"
1052            );
1053        }
1054    }
1055
1056    #[test]
1057    #[cfg_attr(miri, ignore)]
1058    fn test_invalid_traces() {
1059        let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1060        let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1061
1062        let fake_agent = MockServer::start();
1063
1064        let exporter = build_test_exporter(
1065            fake_agent.url("/v0.4/traces"),
1066            Some(stats_socket.local_addr().unwrap().to_string()),
1067            TraceExporterInputFormat::V04,
1068            TraceExporterOutputFormat::V04,
1069            false,
1070            true,
1071        );
1072
1073        let bad_payload = b"some_bad_payload".as_ref();
1074        let result = exporter.send(bad_payload);
1075
1076        assert!(result.is_err());
1077
1078        assert_eq!(
1079            &format!(
1080                "datadog.tracer.exporter.deserialize.errors:1|c|#libdatadog_version:{}",
1081                env!("CARGO_PKG_VERSION")
1082            ),
1083            &read(&stats_socket)
1084        );
1085    }
1086
1087    #[test]
1088    #[cfg_attr(miri, ignore)]
1089    fn test_health_metrics_error() {
1090        let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1091        let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1092
1093        let fake_agent = MockServer::start();
1094        let _mock_traces = fake_agent.mock(|_, then| {
1095            then.status(400)
1096                .header("content-type", "application/json")
1097                .body("{}");
1098        });
1099
1100        let exporter = build_test_exporter(
1101            fake_agent.url("/v0.4/traces"),
1102            Some(stats_socket.local_addr().unwrap().to_string()),
1103            TraceExporterInputFormat::V04,
1104            TraceExporterOutputFormat::V04,
1105            false,
1106            true,
1107        );
1108
1109        let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1110            name: BytesString::from_slice(b"test").unwrap(),
1111            ..Default::default()
1112        }]];
1113        let data = msgpack_encoder::v04::to_vec(&traces);
1114        let result = exporter.send(data.as_ref());
1115
1116        assert!(result.is_err());
1117
1118        // Collect all metrics
1119        let mut metrics = Vec::new();
1120        loop {
1121            let mut buf = [0; 1_000];
1122            match stats_socket.recv(&mut buf) {
1123                Ok(size) => {
1124                    let datagram = String::from_utf8_lossy(&buf[..size]);
1125                    metrics.push(datagram.to_string());
1126                }
1127                Err(_) => break, // Timeout, no more metrics
1128            }
1129        }
1130
1131        // Expected metrics
1132        let expected_deser = format!(
1133            "datadog.tracer.exporter.deserialize.traces:1|c|#libdatadog_version:{}",
1134            env!("CARGO_PKG_VERSION")
1135        );
1136        let expected_error = format!(
1137            "datadog.tracer.exporter.transport.traces.failed:1|c|#libdatadog_version:{},type:400",
1138            env!("CARGO_PKG_VERSION")
1139        );
1140        let expected_dropped = format!(
1141            "datadog.tracer.exporter.transport.dropped.bytes:{}|d|#libdatadog_version:{}",
1142            data.len(),
1143            env!("CARGO_PKG_VERSION")
1144        );
1145        let expected_sent_bytes = format!(
1146            "datadog.tracer.exporter.transport.sent.bytes:{}|d|#libdatadog_version:{}",
1147            data.len(),
1148            env!("CARGO_PKG_VERSION")
1149        );
1150        let expected_sent_traces = format!(
1151            "datadog.tracer.exporter.transport.traces.sent:1|d|#libdatadog_version:{}",
1152            env!("CARGO_PKG_VERSION")
1153        );
1154        let expected_dropped_traces = format!(
1155            "datadog.tracer.exporter.transport.traces.dropped:1|d|#libdatadog_version:{}",
1156            env!("CARGO_PKG_VERSION")
1157        );
1158        let expected_requests = format!(
1159            "datadog.tracer.exporter.transport.requests:5|d|#libdatadog_version:{}",
1160            env!("CARGO_PKG_VERSION")
1161        );
1162
1163        // Verify all expected metrics are present
1164        assert!(
1165            metrics.contains(&expected_deser),
1166            "Missing deser_traces metric. Got: {metrics:?}"
1167        );
1168        assert!(
1169            metrics.contains(&expected_error),
1170            "Missing send.traces.errors metric. Got: {metrics:?}"
1171        );
1172        assert!(
1173            metrics.contains(&expected_dropped),
1174            "Missing http.dropped.bytes metric. Got: {metrics:?}"
1175        );
1176        assert!(
1177            metrics.contains(&expected_dropped_traces),
1178            "Missing http.dropped.traces metric. Got: {metrics:?}"
1179        );
1180        assert!(
1181            metrics.contains(&expected_sent_bytes),
1182            "Missing http.sent.bytes metric. Got: {metrics:?}"
1183        );
1184        assert!(
1185            metrics.contains(&expected_sent_traces),
1186            "Missing http.sent.traces metric. Got: {metrics:?}"
1187        );
1188        assert!(
1189            metrics.contains(&expected_requests),
1190            "Missing http.requests metric. Got: {metrics:?}"
1191        );
1192    }
1193
1194    #[test]
1195    #[cfg_attr(miri, ignore)]
1196    fn test_health_metrics_dropped_bytes_exclusions() {
1197        let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1198        let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1199
1200        // Test 404 - should NOT emit http.dropped.bytes
1201        let fake_agent = MockServer::start();
1202        let _mock_traces = fake_agent.mock(|_, then| {
1203            then.status(404)
1204                .header("content-type", "application/json")
1205                .body("{}");
1206        });
1207
1208        let exporter = build_test_exporter(
1209            fake_agent.url("/v0.4/traces"),
1210            Some(stats_socket.local_addr().unwrap().to_string()),
1211            TraceExporterInputFormat::V04,
1212            TraceExporterOutputFormat::V04,
1213            false,
1214            true,
1215        );
1216
1217        let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1218            name: BytesString::from_slice(b"test").unwrap(),
1219            ..Default::default()
1220        }]];
1221        let data = msgpack_encoder::v04::to_vec(&traces);
1222        let result = exporter.send(data.as_ref());
1223
1224        assert!(result.is_err());
1225
1226        // Collect all metrics
1227        let mut received_metrics = Vec::new();
1228        for _ in 0..5 {
1229            received_metrics.push(read(&stats_socket));
1230        }
1231
1232        // Expected metrics for 404 error
1233        let expected_deser = format!(
1234            "datadog.tracer.exporter.deserialize.traces:1|c|#libdatadog_version:{}",
1235            env!("CARGO_PKG_VERSION")
1236        );
1237        let expected_error = format!(
1238            "datadog.tracer.exporter.transport.traces.failed:1|c|#libdatadog_version:{},type:404",
1239            env!("CARGO_PKG_VERSION")
1240        );
1241        let expected_sent_bytes = format!(
1242            "datadog.tracer.exporter.transport.sent.bytes:{}|d|#libdatadog_version:{}",
1243            data.len(),
1244            env!("CARGO_PKG_VERSION")
1245        );
1246        let expected_sent_traces = format!(
1247            "datadog.tracer.exporter.transport.traces.sent:1|d|#libdatadog_version:{}",
1248            env!("CARGO_PKG_VERSION")
1249        );
1250        let expected_requests = format!(
1251            "datadog.tracer.exporter.transport.requests:5|d|#libdatadog_version:{}",
1252            env!("CARGO_PKG_VERSION")
1253        );
1254
1255        // Should emit these metrics
1256        assert!(
1257            received_metrics.contains(&expected_deser),
1258            "Missing deser_traces metric. Got: {received_metrics:?}"
1259        );
1260        assert!(
1261            received_metrics.contains(&expected_error),
1262            "Missing send.traces.errors metric. Got: {received_metrics:?}"
1263        );
1264        assert!(
1265            received_metrics.contains(&expected_sent_bytes),
1266            "Missing http.sent.bytes metric. Got: {received_metrics:?}"
1267        );
1268        assert!(
1269            received_metrics.contains(&expected_sent_traces),
1270            "Missing http.sent.traces metric. Got: {received_metrics:?}"
1271        );
1272        assert!(
1273            received_metrics.contains(&expected_requests),
1274            "Missing http.requests metric. Got: {received_metrics:?}"
1275        );
1276
1277        // Should NOT emit http.dropped.bytes for 404
1278        let dropped_bytes_metric = format!(
1279            "datadog.tracer.exporter.transport.dropped.bytes:{}|d|#libdatadog_version:{}",
1280            data.len(),
1281            env!("CARGO_PKG_VERSION")
1282        );
1283        assert!(
1284            !received_metrics.contains(&dropped_bytes_metric),
1285            "Should not emit http.dropped.bytes for 404. Got: {received_metrics:?}"
1286        );
1287
1288        // Should NOT emit http.dropped.traces for 404
1289        let dropped_traces_metric = format!(
1290            "datadog.tracer.exporter.transport.traces.dropped:1|d|#libdatadog_version:{}",
1291            env!("CARGO_PKG_VERSION")
1292        );
1293        assert!(
1294            !received_metrics.contains(&dropped_traces_metric),
1295            "Should not emit http.dropped.traces for 404. Got: {received_metrics:?}"
1296        );
1297    }
1298
1299    #[test]
1300    #[cfg_attr(miri, ignore)]
1301    fn test_health_metrics_disabled() {
1302        let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1303        let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1304
1305        let fake_agent = MockServer::start();
1306        let _mock_traces = fake_agent.mock(|_, then| {
1307            then.status(200)
1308                .header("content-type", "application/json")
1309                .body(r#"{ "rate_by_service": { "service:test,env:staging": 1.0 } }"#);
1310        });
1311
1312        let exporter = build_test_exporter(
1313            fake_agent.url("/v0.4/traces"),
1314            Some(stats_socket.local_addr().unwrap().to_string()),
1315            TraceExporterInputFormat::V04,
1316            TraceExporterOutputFormat::V04,
1317            false,
1318            false, // Health metrics disabled
1319        );
1320
1321        let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1322            name: BytesString::from_slice(b"test").unwrap(),
1323            ..Default::default()
1324        }]];
1325        let data = msgpack_encoder::v04::to_vec(&traces);
1326
1327        let _result = exporter.send(data.as_ref()).expect("failed to send trace");
1328
1329        // Try to read metrics - should timeout since none are sent
1330        let mut buf = [0; 1_000];
1331        match stats_socket.recv(&mut buf) {
1332            Ok(_) => {
1333                let datagram = String::from_utf8_lossy(buf.as_ref());
1334                let received = datagram.trim_matches(char::from(0)).to_string();
1335                panic!(
1336                    "Expected no metrics when health metrics disabled, but received: {received}"
1337                );
1338            }
1339            Err(e)
1340                if e.kind() == std::io::ErrorKind::WouldBlock
1341                    || e.kind() == std::io::ErrorKind::TimedOut
1342                    || e.kind() == std::io::ErrorKind::Interrupted =>
1343            {
1344                // This is expected - no metrics should be sent when disabled.
1345                // WouldBlock on Unix, TimedOut on Windows.
1346                // Interrupted can occur when signals interrupt the blocking
1347                // recvfrom() syscall before the timeout expires.
1348            }
1349            Err(e) => panic!("Unexpected error reading from socket: {e}"),
1350        }
1351    }
1352
1353    #[test]
1354    #[cfg_attr(miri, ignore)]
1355    fn test_agent_response_parse_default() {
1356        let server = MockServer::start();
1357        let _agent = server.mock(|_, then| {
1358            then.status(200)
1359                .header("content-type", "application/json")
1360                .body(
1361                    r#"{
1362                    "rate_by_service": {
1363                        "service:foo,env:staging": 1.0,
1364                        "service:,env:": 0.8
1365                    }
1366                }"#,
1367                );
1368        });
1369
1370        let mut builder = TraceExporterBuilder::default();
1371        builder
1372            .set_url(&server.url("/"))
1373            .set_service("foo")
1374            .set_env("foo-env")
1375            .set_tracer_version("v0.1")
1376            .set_language("nodejs")
1377            .set_language_version("1.0")
1378            .set_language_interpreter("v8");
1379        let exporter = builder.build().unwrap();
1380
1381        let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1382            name: BytesString::from_slice(b"test").unwrap(),
1383            ..Default::default()
1384        }]];
1385        let data = msgpack_encoder::v04::to_vec(&traces);
1386        let result = exporter.send(data.as_ref()).unwrap();
1387
1388        assert_eq!(
1389            result,
1390            AgentResponse::Changed {
1391                body: r#"{
1392                    "rate_by_service": {
1393                        "service:foo,env:staging": 1.0,
1394                        "service:,env:": 0.8
1395                    }
1396                }"#
1397                .to_string()
1398            }
1399        );
1400    }
1401
1402    #[test]
1403    #[cfg_attr(miri, ignore)]
1404    fn test_agent_response_error() {
1405        let server = MockServer::start();
1406        let _agent = server.mock(|_, then| {
1407            then.status(500)
1408                .header("content-type", "application/json")
1409                .body(r#"{ "error": "Unavailable" }"#);
1410        });
1411
1412        let mut builder = TraceExporterBuilder::default();
1413        builder
1414            .set_url(&server.url("/"))
1415            .set_service("foo")
1416            .set_env("foo-env")
1417            .set_tracer_version("v0.1")
1418            .set_language("nodejs")
1419            .set_language_version("1.0")
1420            .set_language_interpreter("v8");
1421        let exporter = builder.build().unwrap();
1422
1423        let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1424            name: BytesString::from_slice(b"test").unwrap(),
1425            ..Default::default()
1426        }]];
1427        let data = msgpack_encoder::v04::to_vec(&traces);
1428        let code = match exporter.send(data.as_ref()).unwrap_err() {
1429            TraceExporterError::Request(e) => Some(e.status()),
1430            _ => None,
1431        }
1432        .unwrap();
1433
1434        assert_eq!(code, 500);
1435    }
1436
1437    #[test]
1438    #[cfg_attr(miri, ignore)]
1439    fn test_agent_empty_response_error() {
1440        let server = MockServer::start();
1441        let _agent = server.mock(|_, then| {
1442            then.status(200)
1443                .header("content-type", "application/json")
1444                .body("");
1445        });
1446
1447        let mut builder = TraceExporterBuilder::default();
1448        builder
1449            .set_url(&server.url("/"))
1450            .set_service("foo")
1451            .set_env("foo-env")
1452            .set_tracer_version("v0.1")
1453            .set_language("nodejs")
1454            .set_language_version("1.0")
1455            .set_language_interpreter("v8");
1456        let exporter = builder.build().unwrap();
1457
1458        let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1459            name: BytesString::from_slice(b"test").unwrap(),
1460            ..Default::default()
1461        }]];
1462        let data = msgpack_encoder::v04::to_vec(&traces);
1463        let err = exporter.send(data.as_ref());
1464
1465        assert!(err.is_err());
1466        assert_eq!(
1467            match err.unwrap_err() {
1468                TraceExporterError::Agent(e) => Some(e),
1469                _ => None,
1470            },
1471            Some(AgentErrorKind::EmptyResponse)
1472        );
1473    }
1474
1475    #[test]
1476    #[cfg_attr(miri, ignore)]
1477    fn test_exporter_metrics_v4() {
1478        let server = MockServer::start();
1479        let response_body = r#"{
1480                        "rate_by_service": {
1481                            "service:foo,env:staging": 1.0,
1482                            "service:,env:": 0.8
1483                        }
1484                    }"#;
1485        let traces_endpoint = server.mock(|when, then| {
1486            when.method(POST).path("/v0.4/traces");
1487            then.status(200)
1488                .header("content-type", "application/json")
1489                .body(response_body);
1490        });
1491
1492        let metrics_endpoint = server.mock(|when, then| {
1493            when.method(POST)
1494                .body_includes("\"metric\":\"trace_api.bytes\"")
1495                .path("/telemetry/proxy/api/v2/apmtelemetry");
1496            then.status(200)
1497                .header("content-type", "application/json")
1498                .body("");
1499        });
1500
1501        let mut builder = TraceExporterBuilder::default();
1502        builder
1503            .set_url(&server.url("/"))
1504            .set_service("foo")
1505            .set_env("foo-env")
1506            .set_tracer_version("v0.1")
1507            .set_language("nodejs")
1508            .set_language_version("1.0")
1509            .set_language_interpreter("v8")
1510            .enable_telemetry(TelemetryConfig {
1511                heartbeat: 100,
1512                ..Default::default()
1513            });
1514        let exporter = builder.build().unwrap();
1515
1516        let traces = vec![0x90];
1517        let result = exporter.send(traces.as_ref()).unwrap();
1518        let AgentResponse::Changed { body } = result else {
1519            panic!("Expected Changed response");
1520        };
1521        assert_eq!(body, response_body);
1522
1523        traces_endpoint.assert_calls(1);
1524        while metrics_endpoint.calls() == 0 {
1525            exporter
1526                .runtime
1527                .lock()
1528                .unwrap()
1529                .as_ref()
1530                .unwrap()
1531                .block_on(async {
1532                    sleep(Duration::from_millis(100)).await;
1533                })
1534        }
1535        metrics_endpoint.assert_calls(1);
1536    }
1537
1538    #[test]
1539    #[cfg_attr(miri, ignore)]
1540    fn test_exporter_metrics_v5() {
1541        let server = MockServer::start();
1542        let response_body = r#"{
1543                        "rate_by_service": {
1544                            "service:foo,env:staging": 1.0,
1545                            "service:,env:": 0.8
1546                        }
1547                    }"#;
1548        let traces_endpoint = server.mock(|when, then| {
1549            when.method(POST).path("/v0.5/traces");
1550            then.status(200)
1551                .header("content-type", "application/json")
1552                .body(response_body);
1553        });
1554
1555        let metrics_endpoint = server.mock(|when, then| {
1556            when.method(POST)
1557                .body_includes("\"metric\":\"trace_api.bytes\"")
1558                .path("/telemetry/proxy/api/v2/apmtelemetry");
1559            then.status(200)
1560                .header("content-type", "application/json")
1561                .body("");
1562        });
1563
1564        let exporter = build_test_exporter(
1565            server.url("/"),
1566            None,
1567            TraceExporterInputFormat::V05,
1568            TraceExporterOutputFormat::V05,
1569            true,
1570            true,
1571        );
1572
1573        let v5: (Vec<BytesString>, Vec<Vec<v05::Span>>) = (vec![], vec![]);
1574        let traces = rmp_serde::to_vec(&v5).unwrap();
1575        let result = exporter.send(traces.as_ref()).unwrap();
1576        let AgentResponse::Changed { body } = result else {
1577            panic!("Expected Changed response");
1578        };
1579        assert_eq!(body, response_body);
1580
1581        traces_endpoint.assert_calls(1);
1582        while metrics_endpoint.calls() == 0 {
1583            exporter
1584                .runtime
1585                .lock()
1586                .unwrap()
1587                .as_ref()
1588                .unwrap()
1589                .block_on(async {
1590                    sleep(Duration::from_millis(100)).await;
1591                })
1592        }
1593        metrics_endpoint.assert_calls(1);
1594    }
1595
1596    #[test]
1597    #[cfg_attr(miri, ignore)]
1598    fn test_exporter_metrics_v4_to_v5() {
1599        let server = MockServer::start();
1600        let response_body = r#"{
1601                        "rate_by_service": {
1602                            "service:foo,env:staging": 1.0,
1603                            "service:,env:": 0.8
1604                        }
1605                    }"#;
1606        let traces_endpoint = server.mock(|when, then| {
1607            when.method(POST).path("/v0.5/traces").is_true(|req| {
1608                let bytes = libdd_tinybytes::Bytes::copy_from_slice(req.body_ref());
1609                bytes.to_vec() == V5_EMPTY
1610            });
1611            then.status(200)
1612                .header("content-type", "application/json")
1613                .body(response_body);
1614        });
1615
1616        let metrics_endpoint = server.mock(|when, then| {
1617            when.method(POST)
1618                .body_includes("\"metric\":\"trace_api.bytes\"")
1619                .path("/telemetry/proxy/api/v2/apmtelemetry");
1620            then.status(200)
1621                .header("content-type", "application/json")
1622                .body("");
1623        });
1624
1625        let mut builder = TraceExporterBuilder::default();
1626        builder
1627            .set_url(&server.url("/"))
1628            .set_service("foo")
1629            .set_env("foo-env")
1630            .set_tracer_version("v0.1")
1631            .set_language("nodejs")
1632            .set_language_version("1.0")
1633            .set_language_interpreter("v8")
1634            .enable_telemetry(TelemetryConfig {
1635                heartbeat: 100,
1636                ..Default::default()
1637            })
1638            .set_input_format(TraceExporterInputFormat::V04)
1639            .set_output_format(TraceExporterOutputFormat::V05);
1640
1641        let exporter = builder.build().unwrap();
1642
1643        let traces = vec![0x90];
1644        let result = exporter.send(traces.as_ref()).unwrap();
1645        let AgentResponse::Changed { body } = result else {
1646            panic!("Expected Changed response");
1647        };
1648        assert_eq!(body, response_body);
1649
1650        traces_endpoint.assert_calls(1);
1651        while metrics_endpoint.calls() == 0 {
1652            exporter
1653                .runtime
1654                .lock()
1655                .unwrap()
1656                .as_ref()
1657                .unwrap()
1658                .block_on(async {
1659                    sleep(Duration::from_millis(100)).await;
1660                })
1661        }
1662        metrics_endpoint.assert_calls(1);
1663    }
1664
1665    #[test]
1666    #[cfg_attr(miri, ignore)]
1667    /// Tests that if agent_response_payload_version is not enabled
1668    /// the exporter always returns the response body
1669    fn test_agent_response_payload_version_disabled() {
1670        let server = MockServer::start();
1671        let response_body = r#"{
1672                        "rate_by_service": {
1673                            "service:foo,env:staging": 1.0,
1674                            "service:,env:": 0.8
1675                        }
1676                    }"#;
1677        let traces_endpoint = server.mock(|when, then| {
1678            when.method(POST).path("/v0.4/traces");
1679            then.status(200)
1680                .header("content-type", "application/json")
1681                .header("datadog-rates-payload-version", "abc")
1682                .body(response_body);
1683        });
1684
1685        let mut builder = TraceExporterBuilder::default();
1686        builder.set_url(&server.url("/"));
1687        let exporter = builder.build().unwrap();
1688        let traces = vec![0x90];
1689        for _ in 0..2 {
1690            let result = exporter.send(traces.as_ref()).unwrap();
1691            let AgentResponse::Changed { body } = result else {
1692                panic!("Expected Changed response");
1693            };
1694            assert_eq!(body, response_body);
1695        }
1696        traces_endpoint.assert_calls(2);
1697    }
1698
1699    #[test]
1700    #[cfg_attr(miri, ignore)]
1701    /// Tests that if agent_response_payload_version is enabled
1702    /// the exporter returns the response body only once
1703    /// and then returns Unchanged response until the payload version header changes
1704    fn test_agent_response_payload_version() {
1705        let server = MockServer::start();
1706        let response_body = r#"{
1707                        "rate_by_service": {
1708                            "service:foo,env:staging": 1.0,
1709                            "service:,env:": 0.8
1710                        }
1711                    }"#;
1712        let mut traces_endpoint = server.mock(|when, then| {
1713            when.method(POST).path("/v0.4/traces");
1714            then.status(200)
1715                .header("content-type", "application/json")
1716                .header("datadog-rates-payload-version", "abc")
1717                .body(response_body);
1718        });
1719
1720        let mut builder = TraceExporterBuilder::default();
1721        builder
1722            .set_url(&server.url("/"))
1723            .enable_agent_rates_payload_version();
1724        let exporter = builder.build().unwrap();
1725        let traces = vec![0x90];
1726        let result = exporter.send(traces.as_ref()).unwrap();
1727        let AgentResponse::Changed { body } = result else {
1728            panic!("Expected Changed response");
1729        };
1730        assert_eq!(body, response_body);
1731
1732        let result = exporter.send(traces.as_ref()).unwrap();
1733        let AgentResponse::Unchanged = result else {
1734            panic!("Expected Unchanged response");
1735        };
1736        traces_endpoint.assert_calls(2);
1737        traces_endpoint.delete();
1738
1739        let traces_endpoint = server.mock(|when, then| {
1740            when.method(POST).path("/v0.4/traces");
1741            then.status(200)
1742                .header("content-type", "application/json")
1743                .header("datadog-rates-payload-version", "def")
1744                .body(response_body);
1745        });
1746        let result = exporter.send(traces.as_ref()).unwrap();
1747        let AgentResponse::Changed { body } = result else {
1748            panic!("Expected Changed response");
1749        };
1750        assert_eq!(body, response_body);
1751
1752        let result = exporter.send(traces.as_ref()).unwrap();
1753        let AgentResponse::Unchanged = result else {
1754            panic!("Expected Unchanged response");
1755        };
1756        traces_endpoint.assert_calls(2);
1757    }
1758
1759    #[test]
1760    #[cfg_attr(miri, ignore)]
1761    fn test_agent_malfunction_info_4xx() {
1762        test_agent_malfunction_info(404, r#"{"error":"Not Found"}"#, Duration::from_secs(0));
1763    }
1764
1765    #[test]
1766    #[cfg_attr(miri, ignore)]
1767    fn test_agent_malfunction_info_5xx() {
1768        test_agent_malfunction_info(
1769            500,
1770            r#"{"error":"Internal Server Error"}"#,
1771            Duration::from_secs(0),
1772        );
1773    }
1774
1775    #[test]
1776    #[cfg_attr(miri, ignore)]
1777    fn test_agent_malfunction_info_timeout() {
1778        test_agent_malfunction_info(
1779            408,
1780            r#"{"error":"Internal Server Error"}"#,
1781            Duration::from_secs(600),
1782        );
1783    }
1784
1785    #[test]
1786    #[cfg_attr(miri, ignore)]
1787    fn test_agent_malfunction_info_wrong_answer() {
1788        test_agent_malfunction_info(200, "WRONG_ANSWER", Duration::from_secs(0));
1789    }
1790
1791    fn test_agent_malfunction_info(status: u16, response: &str, delay: Duration) {
1792        let server = MockServer::start();
1793
1794        let mock_traces = server.mock(|when, then| {
1795            when.method(POST)
1796                .header("Content-type", "application/msgpack")
1797                .path("/v0.4/traces");
1798            then.status(200).body(
1799                r#"{
1800                    "rate_by_service": {
1801                        "service:test,env:staging": 1.0,
1802                    }
1803                }"#,
1804            );
1805        });
1806
1807        let mock_info = server.mock(|when, then| {
1808            when.method(GET).path("/info");
1809            then.delay(delay).status(status).body(response);
1810        });
1811
1812        let mut builder = TraceExporterBuilder::default();
1813        builder
1814            .set_url(&server.url("/"))
1815            .set_service("test")
1816            .set_env("staging")
1817            .set_tracer_version("v0.1")
1818            .set_language("nodejs")
1819            .set_language_version("1.0")
1820            .set_language_interpreter("v8")
1821            .set_input_format(TraceExporterInputFormat::V04)
1822            .set_output_format(TraceExporterOutputFormat::V04)
1823            .enable_stats(Duration::from_secs(10));
1824        let exporter = builder.build().unwrap();
1825
1826        let trace_chunk = vec![SpanBytes {
1827            duration: 10,
1828            ..Default::default()
1829        }];
1830
1831        let data = msgpack_encoder::v04::to_vec(&[trace_chunk]);
1832
1833        // Wait for the info fetcher to get the config
1834        while mock_info.calls() == 0 {
1835            exporter
1836                .runtime
1837                .lock()
1838                .unwrap()
1839                .as_ref()
1840                .unwrap()
1841                .block_on(async {
1842                    sleep(Duration::from_millis(100)).await;
1843                })
1844        }
1845
1846        let _ = exporter.send(data.as_ref()).unwrap();
1847
1848        exporter.shutdown(None).unwrap();
1849
1850        mock_traces.assert();
1851    }
1852
1853    #[test]
1854    #[cfg_attr(miri, ignore)]
1855    fn test_connection_timeout() {
1856        let exporter = TraceExporterBuilder::default().build().unwrap();
1857
1858        assert_eq!(exporter.endpoint.timeout_ms, Endpoint::default().timeout_ms);
1859
1860        let timeout = Some(42);
1861        let mut builder = TraceExporterBuilder::default();
1862        builder.set_connection_timeout(timeout);
1863
1864        let exporter = builder.build().unwrap();
1865
1866        assert_eq!(exporter.endpoint.timeout_ms, 42);
1867    }
1868
1869    #[test]
1870    #[cfg_attr(miri, ignore)]
1871    fn stop_and_start_runtime() {
1872        let builder = TraceExporterBuilder::default();
1873        let exporter = builder.build().unwrap();
1874        exporter.stop_worker();
1875        exporter.run_worker().unwrap();
1876    }
1877}
1878
// Shutdown tests that read the agent info cache through `agent_info::get_agent_info`
// and reset it with `agent_info::clear_cache_for_test`.
// NOTE(review): the module name suggests these are meant to run single-threaded
// because of that shared cache; confirm against the test runner configuration.
#[cfg(test)]
mod single_threaded_tests {
    use super::*;
    use crate::agent_info;
    use httpmock::prelude::*;
    use libdd_trace_utils::msgpack_encoder;
    use libdd_trace_utils::span::v04::SpanBytes;
    use std::time::{Duration, Instant};
    use tokio::time::sleep;

    /// Deadline applied to every "wait until ready" poll in this module, so that a
    /// condition that never becomes true fails the test instead of hanging it.
    const WAIT_TIMEOUT: Duration = Duration::from_secs(10);

    /// A plain shutdown (no timeout) must succeed, and by the end of the test both
    /// the traces and the stats endpoints must have been hit exactly once.
    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_shutdown() {
        // Clear the agent info cache to ensure test isolation
        agent_info::clear_cache_for_test();

        let server = MockServer::start();

        let mock_traces = server.mock(|when, then| {
            when.method(POST)
                .header("Content-type", "application/msgpack")
                .path("/v0.4/traces");
            then.status(200).body("");
        });

        let mock_stats = server.mock(|when, then| {
            when.method(POST)
                .header("Content-type", "application/msgpack")
                .path("/v0.6/stats");
            then.status(200).body("");
        });

        let _mock_info = server.mock(|when, then| {
            when.method(GET).path("/info");
            then.status(200)
                .header("content-type", "application/json")
                .header("datadog-agent-state", "1")
                .body(r#"{"version":"1","client_drop_p0s":true,"endpoints":["/v0.4/traces","/v0.6/stats"]}"#);
        });

        let mut builder = TraceExporterBuilder::default();
        builder
            .set_url(&server.url("/"))
            .set_service("test")
            .set_env("staging")
            .set_tracer_version("v0.1")
            .set_language("nodejs")
            .set_language_version("1.0")
            .set_language_interpreter("v8")
            .set_input_format(TraceExporterInputFormat::V04)
            .set_output_format(TraceExporterOutputFormat::V04)
            .enable_stats(Duration::from_secs(10));
        let exporter = builder.build().unwrap();

        let trace_chunk = vec![SpanBytes {
            duration: 10,
            ..Default::default()
        }];

        let data = msgpack_encoder::v04::to_vec(&[trace_chunk]);

        // Wait for the info fetcher to get the config, sleeping on the exporter's
        // runtime between polls and giving up after WAIT_TIMEOUT.
        let info_wait_start = Instant::now();
        while agent_info::get_agent_info().is_none() {
            if info_wait_start.elapsed() > WAIT_TIMEOUT {
                panic!("Timeout waiting for agent info to be fetched");
            }
            exporter
                .runtime
                .lock()
                .unwrap()
                .as_ref()
                .unwrap()
                .block_on(async {
                    sleep(Duration::from_millis(100)).await;
                })
        }

        let result = exporter.send(data.as_ref());
        // Error received because server is returning an empty body.
        assert!(result.is_err());

        // Wait for the stats worker to be active before shutting down to avoid potential flaky
        // tests on CI where we shutdown before the stats worker had time to start
        let start_time = Instant::now();
        while !exporter.is_stats_worker_active() {
            if start_time.elapsed() > WAIT_TIMEOUT {
                panic!("Timeout waiting for stats worker to become active");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        exporter.shutdown(None).unwrap();

        // Wait for the mock server to process the stats
        for _ in 0..1000 {
            if mock_traces.calls() > 0 && mock_stats.calls() > 0 {
                break;
            } else {
                std::thread::sleep(Duration::from_millis(10));
            }
        }

        mock_traces.assert();
        mock_stats.assert();
    }

    /// A shutdown with a 5 ms timeout must return an error when the stats endpoint
    /// is configured with a 10 s response delay; the trace sent earlier must still
    /// have reached the agent.
    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_shutdown_with_timeout() {
        // Clear the agent info cache to ensure test isolation
        agent_info::clear_cache_for_test();

        let server = MockServer::start();

        let mock_traces = server.mock(|when, then| {
            when.method(POST)
                .header("Content-type", "application/msgpack")
                .path("/v0.4/traces");
            then.status(200).body(
                r#"{
                    "rate_by_service": {
                        "service:foo,env:staging": 1.0,
                        "service:,env:": 0.8
                    }
                }"#,
            );
        });

        // The stats endpoint answers far later than the shutdown timeout allows.
        let _mock_stats = server.mock(|when, then| {
            when.method(POST)
                .header("Content-type", "application/msgpack")
                .path("/v0.6/stats");
            then.delay(Duration::from_secs(10)).status(200).body("");
        });

        let _mock_info = server.mock(|when, then| {
            when.method(GET).path("/info");
            then.status(200)
                .header("content-type", "application/json")
                .header("datadog-agent-state", "1")
                .body(r#"{"version":"1","client_drop_p0s":true,"endpoints":["/v0.4/traces","/v0.6/stats"]}"#);
        });

        let mut builder = TraceExporterBuilder::default();
        builder
            .set_url(&server.url("/"))
            .set_service("test")
            .set_env("staging")
            .set_tracer_version("v0.1")
            .set_language("nodejs")
            .set_language_version("1.0")
            .set_language_interpreter("v8")
            .set_input_format(TraceExporterInputFormat::V04)
            .set_output_format(TraceExporterOutputFormat::V04)
            .enable_stats(Duration::from_secs(10));
        let exporter = builder.build().unwrap();

        let trace_chunk = vec![SpanBytes {
            service: "test".into(),
            name: "test".into(),
            resource: "test".into(),
            r#type: "test".into(),
            duration: 10,
            ..Default::default()
        }];

        let data = msgpack_encoder::v04::to_vec(&[trace_chunk]);

        // Wait for agent_info to be present so that sending a trace will trigger the stats worker
        // to start; give up after WAIT_TIMEOUT instead of polling forever.
        let info_wait_start = Instant::now();
        while agent_info::get_agent_info().is_none() {
            if info_wait_start.elapsed() > WAIT_TIMEOUT {
                panic!("Timeout waiting for agent info to be fetched");
            }
            exporter
                .runtime
                .lock()
                .unwrap()
                .as_ref()
                .unwrap()
                .block_on(async {
                    sleep(Duration::from_millis(100)).await;
                })
        }

        exporter.send(data.as_ref()).unwrap();

        // Wait for the stats worker to be active before shutting down to avoid potential flaky
        // tests on CI where we shutdown before the stats worker had time to start
        let start_time = Instant::now();
        while !exporter.is_stats_worker_active() {
            if start_time.elapsed() > WAIT_TIMEOUT {
                panic!("Timeout waiting for stats worker to become active");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        exporter
            .shutdown(Some(Duration::from_millis(5)))
            .unwrap_err(); // The shutdown should timeout

        mock_traces.assert();
    }
}