Skip to main content

libdd_data_pipeline/trace_exporter/
mod.rs

// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
3pub mod agent_response;
4pub mod builder;
5pub mod error;
6pub mod metrics;
7pub mod stats;
8mod trace_serializer;
9
10// Re-export the builder
11pub use builder::TraceExporterBuilder;
12
13use self::agent_response::AgentResponse;
14use self::metrics::MetricsEmitter;
15use self::stats::StatsComputationStatus;
16use self::trace_serializer::TraceSerializer;
17use crate::agent_info::{AgentInfoFetcher, ResponseObserver};
18use crate::pausable_worker::PausableWorker;
19use crate::stats_exporter::StatsExporter;
20use crate::telemetry::{SendPayloadTelemetry, TelemetryClient};
21use crate::trace_exporter::agent_response::{
22    AgentResponsePayloadVersion, DATADOG_RATES_PAYLOAD_VERSION_HEADER,
23};
24use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError};
25use crate::{
26    agent_info::{self, schema::AgentInfo},
27    health_metrics,
28    health_metrics::{HealthMetric, SendResult, TransportErrorType},
29};
30use arc_swap::{ArcSwap, ArcSwapOption};
31use http::uri::PathAndQuery;
32use http::Uri;
33use http_body_util::BodyExt;
34use libdd_common::tag::Tag;
35use libdd_common::{http_common, Endpoint};
36use libdd_common::{HttpClient, MutexExt};
37use libdd_dogstatsd_client::Client;
38use libdd_telemetry::worker::TelemetryWorker;
39use libdd_trace_utils::msgpack_decoder;
40use libdd_trace_utils::send_with_retry::{
41    send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult,
42};
43use libdd_trace_utils::span::{v04::Span, TraceData};
44use libdd_trace_utils::trace_utils::TracerHeaderTags;
45use std::io;
46use std::sync::{Arc, Mutex};
47use std::time::Duration;
48use std::{borrow::Borrow, collections::HashMap, str::FromStr};
49use tokio::runtime::Runtime;
50use tracing::{debug, error, warn};
51
/// Path of the agent's `/info` endpoint, relative to the agent base URL.
const INFO_ENDPOINT: &str = "/info";
53
/// TraceExporterInputFormat represents the format of the input traces.
///
/// The input format can be either V0.4 or V0.5, where V0.4 is the default.
#[derive(Copy, Clone, Debug, Default, PartialEq)]
#[repr(C)]
pub enum TraceExporterInputFormat {
    /// Traces serialized as msgpack following the v0.4 trace format.
    #[default]
    V04,
    /// Traces serialized as msgpack following the v0.5 trace format.
    V05,
}
64
/// Wire format used when forwarding traces to the agent.
///
/// Either V0.4 or V0.5; V0.4 is the default.
#[derive(Copy, Clone, Debug, Default, PartialEq)]
#[repr(C)]
pub enum TraceExporterOutputFormat {
    /// Serialize traces as v0.4 and send them to `/v0.4/traces`.
    #[default]
    V04,
    /// Serialize traces as v0.5 and send them to `/v0.5/traces`.
    V05,
}
75
76impl TraceExporterOutputFormat {
77    /// Add the agent trace endpoint path to the URL.
78    fn add_path(&self, url: &Uri) -> Uri {
79        add_path(
80            url,
81            match self {
82                TraceExporterOutputFormat::V04 => "/v0.4/traces",
83                TraceExporterOutputFormat::V05 => "/v0.5/traces",
84            },
85        )
86    }
87}
88
89/// Add a path to the URL.
90///
91/// # Arguments
92///
93/// * `url` - The URL to which the path is to be added.
94/// * `path` - The path to be added to the URL.
95fn add_path(url: &Uri, path: &str) -> Uri {
96    let p_and_q = url.path_and_query();
97
98    #[allow(clippy::unwrap_used)]
99    let new_p_and_q = match p_and_q {
100        Some(pq) => {
101            let p = pq.path();
102            let mut p = p.strip_suffix('/').unwrap_or(p).to_owned();
103            p.push_str(path);
104
105            PathAndQuery::from_str(p.as_str())
106        }
107        None => PathAndQuery::from_str(path),
108    }
109    // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
110    .unwrap();
111    let mut parts = url.clone().into_parts();
112    parts.path_and_query = Some(new_p_and_q);
113    // TODO: Properly handle non-OK states to prevent possible panics (APMSP-18190).
114    #[allow(clippy::unwrap_used)]
115    Uri::from_parts(parts).unwrap()
116}
117
/// Metadata provided by the tracer.
///
/// The language, tracer version, interpreter and client-computed flags are forwarded as request
/// header tags through the `From<&TracerMetadata> for TracerHeaderTags` conversion; the other
/// fields are not part of that conversion.
#[derive(Clone, Default, Debug)]
pub struct TracerMetadata {
    pub hostname: String,
    pub env: String,
    pub app_version: String,
    pub runtime_id: String,
    pub service: String,
    /// Forwarded as the `tracer_version` header tag.
    pub tracer_version: String,
    /// Forwarded as the `lang` header tag.
    pub language: String,
    /// Forwarded as the `lang_version` header tag.
    pub language_version: String,
    /// Forwarded as the `lang_interpreter` header tag.
    pub language_interpreter: String,
    /// Forwarded as the `lang_vendor` header tag.
    pub language_interpreter_vendor: String,
    pub git_commit_sha: String,
    /// Forwarded as the `client_computed_stats` header tag.
    pub client_computed_stats: bool,
    /// Forwarded as the `client_computed_top_level` header tag.
    pub client_computed_top_level: bool,
}
134
135impl<'a> From<&'a TracerMetadata> for TracerHeaderTags<'a> {
136    fn from(tags: &'a TracerMetadata) -> TracerHeaderTags<'a> {
137        TracerHeaderTags::<'_> {
138            lang: &tags.language,
139            lang_version: &tags.language_version,
140            tracer_version: &tags.tracer_version,
141            lang_interpreter: &tags.language_interpreter,
142            lang_vendor: &tags.language_interpreter_vendor,
143            client_computed_stats: tags.client_computed_stats,
144            client_computed_top_level: tags.client_computed_top_level,
145            ..Default::default()
146        }
147    }
148}
149
150impl<'a> From<&'a TracerMetadata> for HashMap<&'static str, String> {
151    fn from(tags: &'a TracerMetadata) -> HashMap<&'static str, String> {
152        TracerHeaderTags::from(tags).into()
153    }
154}
155
156#[derive(Debug)]
157pub(crate) struct TraceExporterWorkers {
158    pub info: PausableWorker<AgentInfoFetcher>,
159    pub stats: Option<PausableWorker<StatsExporter>>,
160    pub telemetry: Option<PausableWorker<TelemetryWorker>>,
161}
162
163/// The TraceExporter ingest traces from the tracers serialized as messagepack and forward them to
164/// the agent while applying some transformation.
165///
166/// # Proxy
167/// If the input format is set as `Proxy`, the exporter will forward traces to the agent without
168/// deserializing them.
169///
170/// # Features
171/// When the input format is set to `V04` the TraceExporter will deserialize the traces and perform
172/// some operation before sending them to the agent. The available operations are described below.
173///
174/// ## V07 Serialization
175/// The Trace exporter can serialize the traces to V07 before sending them to the agent.
176///
177/// ## Stats computation
178/// The Trace Exporter can compute stats on traces. In this case the trace exporter will start
179/// another task to send stats when a time bucket expire. When this feature is enabled the
180/// TraceExporter drops all spans that may not be sampled by the agent.
181#[allow(missing_docs)]
182enum DeserInputFormat {
183    V04,
184    V05,
185}
186
187#[derive(Debug)]
188pub struct TraceExporter {
189    endpoint: Endpoint,
190    metadata: TracerMetadata,
191    input_format: TraceExporterInputFormat,
192    output_format: TraceExporterOutputFormat,
193    // TODO - do something with the response callback - https://datadoghq.atlassian.net/browse/APMSP-1019
194    runtime: Arc<Mutex<Option<Arc<Runtime>>>>,
195    /// None if dogstatsd is disabled
196    dogstatsd: Option<Client>,
197    common_stats_tags: Vec<Tag>,
198    client_computed_top_level: bool,
199    client_side_stats: ArcSwap<StatsComputationStatus>,
200    previous_info_state: ArcSwapOption<String>,
201    info_response_observer: ResponseObserver,
202    telemetry: Option<TelemetryClient>,
203    health_metrics_enabled: bool,
204    workers: Arc<Mutex<TraceExporterWorkers>>,
205    agent_payload_response_version: Option<AgentResponsePayloadVersion>,
206    http_client: HttpClient,
207}
208
209impl TraceExporter {
210    #[allow(missing_docs)]
211    pub fn builder() -> TraceExporterBuilder {
212        TraceExporterBuilder::default()
213    }
214
215    /// Return the existing runtime or create a new one and start all workers
216    fn runtime(&self) -> Result<Arc<Runtime>, TraceExporterError> {
217        let mut runtime_guard = self.runtime.lock_or_panic();
218        match runtime_guard.as_ref() {
219            Some(runtime) => {
220                // Runtime already running
221                Ok(runtime.clone())
222            }
223            None => {
224                // Create a new current thread runtime with all features enabled
225                let runtime = Arc::new(
226                    tokio::runtime::Builder::new_multi_thread()
227                        .worker_threads(1)
228                        .enable_all()
229                        .build()?,
230                );
231                *runtime_guard = Some(runtime.clone());
232                self.start_all_workers(&runtime)?;
233                Ok(runtime)
234            }
235        }
236    }
237
238    /// Manually start all workers
239    pub fn run_worker(&self) -> Result<(), TraceExporterError> {
240        self.runtime()?;
241        Ok(())
242    }
243
244    /// Start all workers with the given runtime
245    fn start_all_workers(&self, runtime: &Arc<Runtime>) -> Result<(), TraceExporterError> {
246        let mut workers = self.workers.lock_or_panic();
247
248        self.start_info_worker(&mut workers, runtime)?;
249        self.start_stats_worker(&mut workers, runtime)?;
250        self.start_telemetry_worker(&mut workers, runtime)?;
251
252        Ok(())
253    }
254
255    /// Start the info worker
256    fn start_info_worker(
257        &self,
258        workers: &mut TraceExporterWorkers,
259        runtime: &Arc<Runtime>,
260    ) -> Result<(), TraceExporterError> {
261        workers.info.start(runtime).map_err(|e| {
262            TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string()))
263        })
264    }
265
266    /// Start the stats worker if present
267    fn start_stats_worker(
268        &self,
269        workers: &mut TraceExporterWorkers,
270        runtime: &Arc<Runtime>,
271    ) -> Result<(), TraceExporterError> {
272        if let Some(stats_worker) = &mut workers.stats {
273            stats_worker.start(runtime).map_err(|e| {
274                TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string()))
275            })?;
276        }
277        Ok(())
278    }
279
280    /// Start the telemetry worker if present
281    fn start_telemetry_worker(
282        &self,
283        workers: &mut TraceExporterWorkers,
284        runtime: &Arc<Runtime>,
285    ) -> Result<(), TraceExporterError> {
286        if let Some(telemetry_worker) = &mut workers.telemetry {
287            telemetry_worker.start(runtime).map_err(|e| {
288                TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string()))
289            })?;
290            if let Some(client) = &self.telemetry {
291                runtime.block_on(client.start());
292            }
293        }
294        Ok(())
295    }
296
297    pub fn stop_worker(&self) {
298        let runtime = self.runtime.lock_or_panic().take();
299        if let Some(ref rt) = runtime {
300            // Stop workers to save their state
301            let mut workers = self.workers.lock_or_panic();
302            rt.block_on(async {
303                let _ = workers.info.pause().await;
304                if let Some(stats_worker) = &mut workers.stats {
305                    let _ = stats_worker.pause().await;
306                };
307                if let Some(telemetry_worker) = &mut workers.telemetry {
308                    let _ = telemetry_worker.pause().await;
309                };
310            });
311        }
312        // When the info fetcher is paused, the trigger channel keeps a reference to the runtime's
313        // IoStack as a waker. This prevents the IoStack from being dropped when shutting
314        // down runtime. By manually sending a message to the trigger channel we trigger the
315        // waker releasing the reference to the IoStack. Finally we drain the channel to
316        // avoid triggering a fetch when the info fetcher is restarted.
317        if let PausableWorker::Paused { worker } = &mut self.workers.lock_or_panic().info {
318            self.info_response_observer.manual_trigger();
319            worker.drain();
320        }
321        drop(runtime);
322    }
323
324    /// Send msgpack serialized traces to the agent
325    ///
326    /// # Arguments
327    ///
328    /// * data: A slice containing the serialized traces. This slice should be encoded following the
329    ///   input_format passed to the TraceExporter on creating.
330    ///
331    /// # Returns
332    /// * Ok(AgentResponse): The response from the agent
333    /// * Err(TraceExporterError): An error detailing what went wrong in the process
334    pub fn send(&self, data: &[u8]) -> Result<AgentResponse, TraceExporterError> {
335        self.check_agent_info();
336
337        let res = match self.input_format {
338            TraceExporterInputFormat::V04 => self.send_deser(data, DeserInputFormat::V04),
339            TraceExporterInputFormat::V05 => self.send_deser(data, DeserInputFormat::V05),
340        }?;
341        if matches!(&res, AgentResponse::Changed { body } if body.is_empty()) {
342            return Err(TraceExporterError::Agent(
343                error::AgentErrorKind::EmptyResponse,
344            ));
345        }
346
347        Ok(res)
348    }
349
350    /// Safely shutdown the TraceExporter and all related tasks
351    pub fn shutdown(mut self, timeout: Option<Duration>) -> Result<(), TraceExporterError> {
352        let runtime = tokio::runtime::Builder::new_current_thread()
353            .enable_all()
354            .build()?;
355
356        if let Some(timeout) = timeout {
357            match runtime
358                .block_on(async { tokio::time::timeout(timeout, self.shutdown_async()).await })
359            {
360                Ok(()) => Ok(()),
361                Err(_e) => Err(TraceExporterError::Shutdown(
362                    error::ShutdownError::TimedOut(timeout),
363                )),
364            }
365        } else {
366            runtime.block_on(self.shutdown_async());
367            Ok(())
368        }
369    }
370
371    /// Future used inside `Self::shutdown`.
372    ///
373    /// This function should not take ownership of the trace exporter as it will cause the runtime
374    /// stored in the trace exporter to be dropped in a non-blocking context causing a panic.
375    async fn shutdown_async(&mut self) {
376        let stats_status = self.client_side_stats.load();
377        if let StatsComputationStatus::Enabled {
378            cancellation_token, ..
379        } = stats_status.as_ref()
380        {
381            cancellation_token.cancel();
382
383            let stats_worker = self.workers.lock_or_panic().stats.take();
384
385            if let Some(stats_worker) = stats_worker {
386                let _ = stats_worker.join().await;
387            }
388        }
389        if let Some(telemetry) = self.telemetry.take() {
390            telemetry.shutdown().await;
391            let telemetry_worker = self.workers.lock_or_panic().telemetry.take();
392
393            if let Some(telemetry_worker) = telemetry_worker {
394                let _ = telemetry_worker.join().await;
395            }
396        }
397    }
398
399    /// Check if agent info state has changed
400    fn has_agent_info_state_changed(&self, agent_info: &Arc<AgentInfo>) -> bool {
401        Some(agent_info.state_hash.as_str())
402            != self
403                .previous_info_state
404                .load()
405                .as_deref()
406                .map(|s| s.as_str())
407    }
408
409    fn check_agent_info(&self) {
410        if let Some(agent_info) = agent_info::get_agent_info() {
411            if self.has_agent_info_state_changed(&agent_info) {
412                match &**self.client_side_stats.load() {
413                    StatsComputationStatus::Disabled => {}
414                    StatsComputationStatus::DisabledByAgent { .. } => {
415                        let ctx = stats::StatsContext {
416                            metadata: &self.metadata,
417                            endpoint_url: &self.endpoint.url,
418                            runtime: &self.runtime,
419                        };
420                        stats::handle_stats_disabled_by_agent(
421                            &ctx,
422                            &agent_info,
423                            &self.client_side_stats,
424                            &self.workers,
425                            self.http_client.clone(),
426                        );
427                    }
428                    StatsComputationStatus::Enabled {
429                        stats_concentrator, ..
430                    } => {
431                        let ctx = stats::StatsContext {
432                            metadata: &self.metadata,
433                            endpoint_url: &self.endpoint.url,
434                            runtime: &self.runtime,
435                        };
436                        stats::handle_stats_enabled(
437                            &ctx,
438                            &agent_info,
439                            stats_concentrator,
440                            &self.client_side_stats,
441                            &self.workers,
442                        );
443                    }
444                }
445                self.previous_info_state
446                    .store(Some(agent_info.state_hash.clone().into()))
447            }
448        }
449    }
450
451    /// !!! This function is only for testing purposes !!!
452    ///
453    /// Waits the agent info to be ready by checking the agent_info state.
454    /// It will only return Ok after the agent info has been fetched at least once or Err if timeout
455    /// has been reached
456    ///
457    /// In production:
458    /// 1) We should not synchronously wait for this to be ready before sending traces
459    /// 2) It's not guaranteed to not block forever, since the /info endpoint might not be
460    ///    available.
461    ///
462    /// The `send` function will check agent_info when running, which will only be available if the
463    /// fetcher had time to reach to the agent.
464    /// Since agent_info can enable CSS computation, waiting for this during testing can make
465    /// snapshots non-deterministic.
466    #[cfg(feature = "test-utils")]
467    pub fn wait_agent_info_ready(&self, timeout: Duration) -> anyhow::Result<()> {
468        let start = std::time::Instant::now();
469        loop {
470            if std::time::Instant::now().duration_since(start) > timeout {
471                anyhow::bail!("Timeout waiting for agent info to be ready",);
472            }
473            if agent_info::get_agent_info().is_some() {
474                return Ok(());
475            }
476            std::thread::sleep(Duration::from_millis(10));
477        }
478    }
479
480    /// Emit a health metric to dogstatsd
481    fn emit_metric(&self, metric: HealthMetric, custom_tags: Option<Vec<&Tag>>) {
482        if self.health_metrics_enabled {
483            let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags);
484            emitter.emit(metric, custom_tags);
485        }
486    }
487
488    /// Emit all health metrics from a SendResult
489    fn emit_send_result(&self, result: &SendResult) {
490        if self.health_metrics_enabled {
491            let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags);
492            emitter.emit_from_send_result(result);
493        }
494    }
495
496    /// Send a list of trace chunks to the agent
497    ///
498    /// # Arguments
499    /// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans.
500    ///
501    /// # Returns
502    /// * Ok(String): The response from the agent
503    /// * Err(TraceExporterError): An error detailing what went wrong in the process
504    pub fn send_trace_chunks<T: TraceData>(
505        &self,
506        trace_chunks: Vec<Vec<Span<T>>>,
507    ) -> Result<AgentResponse, TraceExporterError> {
508        self.check_agent_info();
509        self.runtime()?
510            .block_on(async { self.send_trace_chunks_inner(trace_chunks).await })
511    }
512
513    /// Send a list of trace chunks to the agent, asynchronously
514    ///
515    /// # Arguments
516    /// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans.
517    ///
518    /// # Returns
519    /// * Ok(String): The response from the agent
520    /// * Err(TraceExporterError): An error detailing what went wrong in the process
521    pub async fn send_trace_chunks_async<T: TraceData>(
522        &self,
523        trace_chunks: Vec<Vec<Span<T>>>,
524    ) -> Result<AgentResponse, TraceExporterError> {
525        self.check_agent_info();
526        self.send_trace_chunks_inner(trace_chunks).await
527    }
528
529    /// Deserializes, processes and sends trace chunks to the agent
530    fn send_deser(
531        &self,
532        data: &[u8],
533        format: DeserInputFormat,
534    ) -> Result<AgentResponse, TraceExporterError> {
535        let (traces, _) = match format {
536            DeserInputFormat::V04 => msgpack_decoder::v04::from_slice(data),
537            DeserInputFormat::V05 => msgpack_decoder::v05::from_slice(data),
538        }
539        .map_err(|e| {
540            error!("Error deserializing trace from request body: {e}");
541            self.emit_metric(
542                HealthMetric::Count(health_metrics::DESERIALIZE_TRACES_ERRORS, 1),
543                None,
544            );
545            TraceExporterError::Deserialization(e)
546        })?;
547        debug!(
548            trace_count = traces.len(),
549            "Trace deserialization completed successfully"
550        );
551        self.emit_metric(
552            HealthMetric::Count(health_metrics::DESERIALIZE_TRACES, traces.len() as i64),
553            None,
554        );
555
556        self.runtime()?
557            .block_on(async { self.send_trace_chunks_inner(traces).await })
558    }
559
560    /// Send traces payload to agent with retry and telemetry reporting
561    async fn send_traces_with_telemetry(
562        &self,
563        endpoint: &Endpoint,
564        mp_payload: Vec<u8>,
565        headers: HashMap<&'static str, String>,
566        chunks: usize,
567        chunks_dropped_p0: usize,
568    ) -> Result<AgentResponse, TraceExporterError> {
569        let strategy = RetryStrategy::default();
570        let payload_len = mp_payload.len();
571
572        // Send traces to the agent
573        let result =
574            send_with_retry(&self.http_client, endpoint, mp_payload, &headers, &strategy).await;
575
576        // Send telemetry for the payload sending
577        if let Some(telemetry) = &self.telemetry {
578            if let Err(e) = telemetry.send(&SendPayloadTelemetry::from_retry_result(
579                &result,
580                payload_len as u64,
581                chunks as u64,
582                chunks_dropped_p0 as u64,
583            )) {
584                error!(?e, "Error sending telemetry");
585            }
586        }
587
588        self.handle_send_result(result, chunks, payload_len).await
589    }
590
591    async fn send_trace_chunks_inner<T: TraceData>(
592        &self,
593        mut traces: Vec<Vec<Span<T>>>,
594    ) -> Result<AgentResponse, TraceExporterError> {
595        let mut header_tags: TracerHeaderTags = self.metadata.borrow().into();
596
597        // Process stats computation
598        let dropped_p0_stats = stats::process_traces_for_stats(
599            &mut traces,
600            &mut header_tags,
601            &self.client_side_stats,
602            self.client_computed_top_level,
603        );
604
605        let serializer = TraceSerializer::new(
606            self.output_format,
607            self.agent_payload_response_version.as_ref(),
608        );
609        let prepared = match serializer.prepare_traces_payload(traces, header_tags) {
610            Ok(p) => p,
611            Err(e) => {
612                error!("Error serializing traces: {e}");
613                self.emit_metric(
614                    HealthMetric::Count(health_metrics::SERIALIZE_TRACES_ERRORS, 1),
615                    None,
616                );
617                return Err(e);
618            }
619        };
620
621        let endpoint = Endpoint {
622            url: self.get_agent_url(),
623            ..self.endpoint.clone()
624        };
625
626        self.send_traces_with_telemetry(
627            &endpoint,
628            prepared.data,
629            prepared.headers,
630            prepared.chunk_count,
631            dropped_p0_stats.dropped_p0_traces,
632        )
633        .await
634    }
635
636    /// Handle the result of sending traces to the agent
637    async fn handle_send_result(
638        &self,
639        result: SendWithRetryResult,
640        chunks: usize,
641        payload_len: usize,
642    ) -> Result<AgentResponse, TraceExporterError> {
643        match result {
644            Ok((response, attempts)) => {
645                self.handle_agent_response(chunks, response, payload_len, attempts)
646                    .await
647            }
648            Err(err) => self.handle_send_error(err, payload_len, chunks).await,
649        }
650    }
651
652    /// Handle errors from send with retry operation
653    async fn handle_send_error(
654        &self,
655        err: SendWithRetryError,
656        payload_len: usize,
657        chunks: usize,
658    ) -> Result<AgentResponse, TraceExporterError> {
659        error!(?err, "Error sending traces");
660
661        match err {
662            SendWithRetryError::Http(response, attempts) => {
663                self.handle_http_send_error(response, payload_len, chunks, attempts)
664                    .await
665            }
666            SendWithRetryError::Timeout(attempts) => {
667                let send_result =
668                    SendResult::failure(TransportErrorType::Timeout, payload_len, chunks, attempts);
669                self.emit_send_result(&send_result);
670                Err(TraceExporterError::from(io::Error::from(
671                    io::ErrorKind::TimedOut,
672                )))
673            }
674            SendWithRetryError::Network(err, attempts) => {
675                let send_result =
676                    SendResult::failure(TransportErrorType::Network, payload_len, chunks, attempts);
677                self.emit_send_result(&send_result);
678                Err(TraceExporterError::from(err))
679            }
680            SendWithRetryError::Build(attempts) => {
681                let send_result =
682                    SendResult::failure(TransportErrorType::Build, payload_len, chunks, attempts);
683                self.emit_send_result(&send_result);
684                Err(TraceExporterError::from(io::Error::from(
685                    io::ErrorKind::Other,
686                )))
687            }
688        }
689    }
690
691    /// Handle HTTP error responses from send with retry
692    async fn handle_http_send_error(
693        &self,
694        response: http_common::HttpResponse,
695        payload_len: usize,
696        chunks: usize,
697        attempts: u32,
698    ) -> Result<AgentResponse, TraceExporterError> {
699        let status = response.status();
700
701        // Check if the agent state has changed for error responses
702        self.info_response_observer.check_response(&response);
703
704        // Emit health metrics using SendResult
705        let send_result = SendResult::failure(
706            TransportErrorType::Http(status.as_u16()),
707            payload_len,
708            chunks,
709            attempts,
710        );
711        self.emit_send_result(&send_result);
712
713        let body = self.read_error_response_body(response).await?;
714        Err(TraceExporterError::Request(RequestError::new(
715            status,
716            &String::from_utf8_lossy(&body),
717        )))
718    }
719
720    /// Read response body from error response
721    async fn read_error_response_body(
722        &self,
723        response: http_common::HttpResponse,
724    ) -> Result<bytes::Bytes, TraceExporterError> {
725        match response.into_body().collect().await {
726            Ok(body) => Ok(body.to_bytes()),
727            Err(err) => {
728                error!(?err, "Error reading agent response body");
729                Err(TraceExporterError::from(err))
730            }
731        }
732    }
733
734    /// Check if the agent's payload version has changed based on response headers
735    fn check_payload_version_changed(&self, response: &http_common::HttpResponse) -> bool {
736        let status = response.status();
737        match (
738            status.is_success(),
739            self.agent_payload_response_version.as_ref(),
740            response.headers().get(DATADOG_RATES_PAYLOAD_VERSION_HEADER),
741        ) {
742            (false, _, _) => {
743                // If the status is not success, the rates are considered unchanged
744                false
745            }
746            (true, None, _) => {
747                // if the agent_payload_response_version fingerprint is not enabled we always
748                // return the new rates
749                true
750            }
751            (true, Some(agent_payload_response_version), Some(new_payload_version)) => {
752                if let Ok(new_payload_version_str) = new_payload_version.to_str() {
753                    agent_payload_response_version.check_and_update(new_payload_version_str)
754                } else {
755                    false
756                }
757            }
758            _ => false,
759        }
760    }
761
762    /// Read response body and handle potential errors
763    async fn read_response_body(
764        response: http_common::HttpResponse,
765    ) -> Result<String, http_common::Error> {
766        let body = http_common::collect_response_bytes(response).await?;
767        Ok(String::from_utf8_lossy(&body).to_string())
768    }
769
770    /// Handle successful trace sending response
771    fn handle_successful_trace_response(
772        &self,
773        chunks: usize,
774        payload_len: usize,
775        attempts: u32,
776        body: String,
777        payload_version_changed: bool,
778    ) -> Result<AgentResponse, TraceExporterError> {
779        debug!(chunks = chunks, "Trace chunks sent successfully to agent");
780        let send_result = SendResult::success(payload_len, chunks, attempts);
781        self.emit_send_result(&send_result);
782
783        Ok(if payload_version_changed {
784            AgentResponse::Changed { body }
785        } else {
786            AgentResponse::Unchanged
787        })
788    }
789
790    async fn handle_agent_response(
791        &self,
792        chunks: usize,
793        response: http_common::HttpResponse,
794        payload_len: usize,
795        attempts: u32,
796    ) -> Result<AgentResponse, TraceExporterError> {
797        // Check if the agent state has changed
798        self.info_response_observer.check_response(&response);
799
800        let status = response.status();
801        let payload_version_changed = self.check_payload_version_changed(&response);
802
803        match Self::read_response_body(response).await {
804            Ok(body) => {
805                if !status.is_success() {
806                    warn!(
807                        status = %status,
808                        "Agent returned non-success status for trace send"
809                    );
810                    let send_result = SendResult::failure(
811                        TransportErrorType::Http(status.as_u16()),
812                        payload_len,
813                        chunks,
814                        attempts,
815                    );
816                    self.emit_send_result(&send_result);
817                    return Err(TraceExporterError::Request(RequestError::new(
818                        status, &body,
819                    )));
820                }
821
822                self.handle_successful_trace_response(
823                    chunks,
824                    payload_len,
825                    attempts,
826                    body,
827                    payload_version_changed,
828                )
829            }
830            Err(err) => {
831                error!(?err, "Error reading agent response body");
832                let send_result = SendResult::failure(
833                    TransportErrorType::ResponseBody,
834                    payload_len,
835                    chunks,
836                    attempts,
837                );
838                self.emit_send_result(&send_result);
839                Err(TraceExporterError::from(err))
840            }
841        }
842    }
843
844    fn get_agent_url(&self) -> Uri {
845        self.output_format.add_path(&self.endpoint.url)
846    }
847
848    #[cfg(test)]
849    /// Test only function to check if the stats computation is active and the worker is running
850    pub fn is_stats_worker_active(&self) -> bool {
851        stats::is_stats_worker_active(&self.client_side_stats, &self.workers)
852    }
853}
854
/// Telemetry settings passed to `TraceExporterBuilder::enable_telemetry`.
///
/// All fields have their `Default` values unless set: `0`, `None` and `false`.
#[derive(Debug, Default, Clone)]
pub struct TelemetryConfig {
    /// Telemetry heartbeat setting (`0` by default).
    // NOTE(review): the unit is not visible in this module (tests pass `100`); confirm
    // whether this is milliseconds or seconds in the builder before documenting it.
    pub heartbeat: u64,
    /// Optional runtime identifier for telemetry (`None` by default).
    pub runtime_id: Option<String>,
    /// Telemetry debug flag (`false` by default).
    pub debug_enabled: bool,
}
861
/// Callback that receives a response as a string slice.
///
/// The only method takes `&self` and is not generic, so the trait can be used as a trait
/// object (`&dyn ResponseCallback`, `Box<dyn ResponseCallback>`).
// NOTE(review): no implementor or call site is visible here; presumably `response` is the
// agent's response body — confirm before documenting it as such.
pub trait ResponseCallback {
    /// Handles `response`.
    fn call(&self, response: &str);
}
867
868#[cfg(test)]
869mod tests {
870    use self::error::AgentErrorKind;
871    use super::*;
872    use httpmock::prelude::*;
873    use httpmock::MockServer;
874    use libdd_tinybytes::BytesString;
875    use libdd_trace_utils::msgpack_encoder;
876    use libdd_trace_utils::span::v04::SpanBytes;
877    use libdd_trace_utils::span::v05;
878    use std::collections::HashMap;
879    use std::net;
880    use std::time::Duration;
881    use tokio::time::sleep;
882
883    // v05 messagepack empty payload -> [[""], []]
884    const V5_EMPTY: [u8; 4] = [0x92, 0x91, 0xA0, 0x90];
885
886    #[test]
887    fn test_from_tracer_tags_to_tracer_header_tags() {
888        let tracer_tags = TracerMetadata {
889            tracer_version: "v0.1".to_string(),
890            language: "rust".to_string(),
891            language_version: "1.52.1".to_string(),
892            language_interpreter: "rustc".to_string(),
893            language_interpreter_vendor: "rust-lang".to_string(),
894            client_computed_stats: true,
895            client_computed_top_level: true,
896            ..Default::default()
897        };
898
899        let tracer_header_tags: TracerHeaderTags = (&tracer_tags).into();
900
901        assert_eq!(tracer_header_tags.tracer_version, "v0.1");
902        assert_eq!(tracer_header_tags.lang, "rust");
903        assert_eq!(tracer_header_tags.lang_version, "1.52.1");
904        assert_eq!(tracer_header_tags.lang_interpreter, "rustc");
905        assert_eq!(tracer_header_tags.lang_vendor, "rust-lang");
906        assert!(tracer_header_tags.client_computed_stats);
907        assert!(tracer_header_tags.client_computed_top_level);
908    }
909
910    #[test]
911    fn test_from_tracer_tags_to_hashmap() {
912        let tracer_tags = TracerMetadata {
913            tracer_version: "v0.1".to_string(),
914            language: "rust".to_string(),
915            language_version: "1.52.1".to_string(),
916            language_interpreter: "rustc".to_string(),
917            client_computed_stats: true,
918            client_computed_top_level: true,
919            ..Default::default()
920        };
921
922        let hashmap: HashMap<&'static str, String> = (&tracer_tags).into();
923
924        assert_eq!(hashmap.get("datadog-meta-tracer-version").unwrap(), "v0.1");
925        assert_eq!(hashmap.get("datadog-meta-lang").unwrap(), "rust");
926        assert_eq!(hashmap.get("datadog-meta-lang-version").unwrap(), "1.52.1");
927        assert_eq!(
928            hashmap.get("datadog-meta-lang-interpreter").unwrap(),
929            "rustc"
930        );
931        assert!(hashmap.contains_key("datadog-client-computed-stats"));
932        assert!(hashmap.contains_key("datadog-client-computed-top-level"));
933    }
934
935    fn read(socket: &net::UdpSocket) -> String {
936        let mut buf = [0; 1_000];
937        socket.recv(&mut buf).expect("No data");
938        let datagram = String::from_utf8_lossy(buf.as_ref());
939        datagram.trim_matches(char::from(0)).to_string()
940    }
941
942    fn build_test_exporter(
943        url: String,
944        dogstatsd_url: Option<String>,
945        input: TraceExporterInputFormat,
946        output: TraceExporterOutputFormat,
947        enable_telemetry: bool,
948        enable_health_metrics: bool,
949    ) -> TraceExporter {
950        let mut builder = TraceExporterBuilder::default();
951        builder
952            .set_url(&url)
953            .set_service("test")
954            .set_env("staging")
955            .set_tracer_version("v0.1")
956            .set_language("nodejs")
957            .set_language_version("1.0")
958            .set_language_interpreter("v8")
959            .set_input_format(input)
960            .set_output_format(output);
961
962        if enable_health_metrics {
963            builder.enable_health_metrics();
964        }
965
966        if let Some(url) = dogstatsd_url {
967            builder.set_dogstatsd_url(&url);
968        };
969
970        if enable_telemetry {
971            builder.enable_telemetry(TelemetryConfig {
972                heartbeat: 100,
973                ..Default::default()
974            });
975        }
976
977        builder.build().unwrap()
978    }
979
980    #[test]
981    #[cfg_attr(miri, ignore)]
982    fn test_health_metrics() {
983        let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
984        let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
985
986        let fake_agent = MockServer::start();
987        let _mock_traces = fake_agent.mock(|_, then| {
988            then.status(200)
989                .header("content-type", "application/json")
990                .body(r#"{ "rate_by_service": { "service:test,env:staging": 1.0, "service:test,env:prod": 0.3 } }"#);
991        });
992
993        let exporter = build_test_exporter(
994            fake_agent.url("/v0.4/traces"),
995            Some(stats_socket.local_addr().unwrap().to_string()),
996            TraceExporterInputFormat::V04,
997            TraceExporterOutputFormat::V04,
998            false,
999            true,
1000        );
1001
1002        let traces: Vec<Vec<SpanBytes>> = vec![
1003            vec![SpanBytes {
1004                name: BytesString::from_slice(b"test").unwrap(),
1005                ..Default::default()
1006            }],
1007            vec![SpanBytes {
1008                name: BytesString::from_slice(b"test2").unwrap(),
1009                ..Default::default()
1010            }],
1011        ];
1012        let data = msgpack_encoder::v04::to_vec(&traces);
1013
1014        let _result = exporter.send(data.as_ref()).expect("failed to send trace");
1015
1016        // Collect all metrics
1017        let mut received_metrics = Vec::new();
1018        for _ in 0..5 {
1019            received_metrics.push(read(&stats_socket));
1020        }
1021
1022        // Check that all expected metrics are present
1023        let expected_metrics = vec![
1024            format!(
1025                "datadog.tracer.exporter.deserialize.traces:2|c|#libdatadog_version:{}",
1026                env!("CARGO_PKG_VERSION")
1027            ),
1028            format!(
1029                "datadog.tracer.exporter.transport.traces.successful:2|c|#libdatadog_version:{}",
1030                env!("CARGO_PKG_VERSION")
1031            ),
1032            format!(
1033                "datadog.tracer.exporter.transport.sent.bytes:{}|d|#libdatadog_version:{}",
1034                data.len(),
1035                env!("CARGO_PKG_VERSION")
1036            ),
1037            format!(
1038                "datadog.tracer.exporter.transport.traces.sent:2|d|#libdatadog_version:{}",
1039                env!("CARGO_PKG_VERSION")
1040            ),
1041            format!(
1042                "datadog.tracer.exporter.transport.requests:1|d|#libdatadog_version:{}",
1043                env!("CARGO_PKG_VERSION")
1044            ),
1045        ];
1046
1047        for expected in expected_metrics {
1048            assert!(
1049                received_metrics.contains(&expected),
1050                "Expected metric '{expected}' not found in received metrics: {received_metrics:?}"
1051            );
1052        }
1053    }
1054
1055    #[test]
1056    #[cfg_attr(miri, ignore)]
1057    fn test_invalid_traces() {
1058        let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1059        let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1060
1061        let fake_agent = MockServer::start();
1062
1063        let exporter = build_test_exporter(
1064            fake_agent.url("/v0.4/traces"),
1065            Some(stats_socket.local_addr().unwrap().to_string()),
1066            TraceExporterInputFormat::V04,
1067            TraceExporterOutputFormat::V04,
1068            false,
1069            true,
1070        );
1071
1072        let bad_payload = b"some_bad_payload".as_ref();
1073        let result = exporter.send(bad_payload);
1074
1075        assert!(result.is_err());
1076
1077        assert_eq!(
1078            &format!(
1079                "datadog.tracer.exporter.deserialize.errors:1|c|#libdatadog_version:{}",
1080                env!("CARGO_PKG_VERSION")
1081            ),
1082            &read(&stats_socket)
1083        );
1084    }
1085
1086    #[test]
1087    #[cfg_attr(miri, ignore)]
1088    fn test_health_metrics_error() {
1089        let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1090        let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1091
1092        let fake_agent = MockServer::start();
1093        let _mock_traces = fake_agent.mock(|_, then| {
1094            then.status(400)
1095                .header("content-type", "application/json")
1096                .body("{}");
1097        });
1098
1099        let exporter = build_test_exporter(
1100            fake_agent.url("/v0.4/traces"),
1101            Some(stats_socket.local_addr().unwrap().to_string()),
1102            TraceExporterInputFormat::V04,
1103            TraceExporterOutputFormat::V04,
1104            false,
1105            true,
1106        );
1107
1108        let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1109            name: BytesString::from_slice(b"test").unwrap(),
1110            ..Default::default()
1111        }]];
1112        let data = msgpack_encoder::v04::to_vec(&traces);
1113        let result = exporter.send(data.as_ref());
1114
1115        assert!(result.is_err());
1116
1117        // Collect all metrics
1118        let mut metrics = Vec::new();
1119        loop {
1120            let mut buf = [0; 1_000];
1121            match stats_socket.recv(&mut buf) {
1122                Ok(size) => {
1123                    let datagram = String::from_utf8_lossy(&buf[..size]);
1124                    metrics.push(datagram.to_string());
1125                }
1126                Err(_) => break, // Timeout, no more metrics
1127            }
1128        }
1129
1130        // Expected metrics
1131        let expected_deser = format!(
1132            "datadog.tracer.exporter.deserialize.traces:1|c|#libdatadog_version:{}",
1133            env!("CARGO_PKG_VERSION")
1134        );
1135        let expected_error = format!(
1136            "datadog.tracer.exporter.transport.traces.failed:1|c|#libdatadog_version:{},type:400",
1137            env!("CARGO_PKG_VERSION")
1138        );
1139        let expected_dropped = format!(
1140            "datadog.tracer.exporter.transport.dropped.bytes:{}|d|#libdatadog_version:{}",
1141            data.len(),
1142            env!("CARGO_PKG_VERSION")
1143        );
1144        let expected_sent_bytes = format!(
1145            "datadog.tracer.exporter.transport.sent.bytes:{}|d|#libdatadog_version:{}",
1146            data.len(),
1147            env!("CARGO_PKG_VERSION")
1148        );
1149        let expected_sent_traces = format!(
1150            "datadog.tracer.exporter.transport.traces.sent:1|d|#libdatadog_version:{}",
1151            env!("CARGO_PKG_VERSION")
1152        );
1153        let expected_dropped_traces = format!(
1154            "datadog.tracer.exporter.transport.traces.dropped:1|d|#libdatadog_version:{}",
1155            env!("CARGO_PKG_VERSION")
1156        );
1157        let expected_requests = format!(
1158            "datadog.tracer.exporter.transport.requests:5|d|#libdatadog_version:{}",
1159            env!("CARGO_PKG_VERSION")
1160        );
1161
1162        // Verify all expected metrics are present
1163        assert!(
1164            metrics.contains(&expected_deser),
1165            "Missing deser_traces metric. Got: {metrics:?}"
1166        );
1167        assert!(
1168            metrics.contains(&expected_error),
1169            "Missing send.traces.errors metric. Got: {metrics:?}"
1170        );
1171        assert!(
1172            metrics.contains(&expected_dropped),
1173            "Missing http.dropped.bytes metric. Got: {metrics:?}"
1174        );
1175        assert!(
1176            metrics.contains(&expected_dropped_traces),
1177            "Missing http.dropped.traces metric. Got: {metrics:?}"
1178        );
1179        assert!(
1180            metrics.contains(&expected_sent_bytes),
1181            "Missing http.sent.bytes metric. Got: {metrics:?}"
1182        );
1183        assert!(
1184            metrics.contains(&expected_sent_traces),
1185            "Missing http.sent.traces metric. Got: {metrics:?}"
1186        );
1187        assert!(
1188            metrics.contains(&expected_requests),
1189            "Missing http.requests metric. Got: {metrics:?}"
1190        );
1191    }
1192
1193    #[test]
1194    #[cfg_attr(miri, ignore)]
1195    fn test_health_metrics_dropped_bytes_exclusions() {
1196        let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1197        let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1198
1199        // Test 404 - should NOT emit http.dropped.bytes
1200        let fake_agent = MockServer::start();
1201        let _mock_traces = fake_agent.mock(|_, then| {
1202            then.status(404)
1203                .header("content-type", "application/json")
1204                .body("{}");
1205        });
1206
1207        let exporter = build_test_exporter(
1208            fake_agent.url("/v0.4/traces"),
1209            Some(stats_socket.local_addr().unwrap().to_string()),
1210            TraceExporterInputFormat::V04,
1211            TraceExporterOutputFormat::V04,
1212            false,
1213            true,
1214        );
1215
1216        let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1217            name: BytesString::from_slice(b"test").unwrap(),
1218            ..Default::default()
1219        }]];
1220        let data = msgpack_encoder::v04::to_vec(&traces);
1221        let result = exporter.send(data.as_ref());
1222
1223        assert!(result.is_err());
1224
1225        // Collect all metrics
1226        let mut received_metrics = Vec::new();
1227        for _ in 0..5 {
1228            received_metrics.push(read(&stats_socket));
1229        }
1230
1231        // Expected metrics for 404 error
1232        let expected_deser = format!(
1233            "datadog.tracer.exporter.deserialize.traces:1|c|#libdatadog_version:{}",
1234            env!("CARGO_PKG_VERSION")
1235        );
1236        let expected_error = format!(
1237            "datadog.tracer.exporter.transport.traces.failed:1|c|#libdatadog_version:{},type:404",
1238            env!("CARGO_PKG_VERSION")
1239        );
1240        let expected_sent_bytes = format!(
1241            "datadog.tracer.exporter.transport.sent.bytes:{}|d|#libdatadog_version:{}",
1242            data.len(),
1243            env!("CARGO_PKG_VERSION")
1244        );
1245        let expected_sent_traces = format!(
1246            "datadog.tracer.exporter.transport.traces.sent:1|d|#libdatadog_version:{}",
1247            env!("CARGO_PKG_VERSION")
1248        );
1249        let expected_requests = format!(
1250            "datadog.tracer.exporter.transport.requests:5|d|#libdatadog_version:{}",
1251            env!("CARGO_PKG_VERSION")
1252        );
1253
1254        // Should emit these metrics
1255        assert!(
1256            received_metrics.contains(&expected_deser),
1257            "Missing deser_traces metric. Got: {received_metrics:?}"
1258        );
1259        assert!(
1260            received_metrics.contains(&expected_error),
1261            "Missing send.traces.errors metric. Got: {received_metrics:?}"
1262        );
1263        assert!(
1264            received_metrics.contains(&expected_sent_bytes),
1265            "Missing http.sent.bytes metric. Got: {received_metrics:?}"
1266        );
1267        assert!(
1268            received_metrics.contains(&expected_sent_traces),
1269            "Missing http.sent.traces metric. Got: {received_metrics:?}"
1270        );
1271        assert!(
1272            received_metrics.contains(&expected_requests),
1273            "Missing http.requests metric. Got: {received_metrics:?}"
1274        );
1275
1276        // Should NOT emit http.dropped.bytes for 404
1277        let dropped_bytes_metric = format!(
1278            "datadog.tracer.exporter.transport.dropped.bytes:{}|d|#libdatadog_version:{}",
1279            data.len(),
1280            env!("CARGO_PKG_VERSION")
1281        );
1282        assert!(
1283            !received_metrics.contains(&dropped_bytes_metric),
1284            "Should not emit http.dropped.bytes for 404. Got: {received_metrics:?}"
1285        );
1286
1287        // Should NOT emit http.dropped.traces for 404
1288        let dropped_traces_metric = format!(
1289            "datadog.tracer.exporter.transport.traces.dropped:1|d|#libdatadog_version:{}",
1290            env!("CARGO_PKG_VERSION")
1291        );
1292        assert!(
1293            !received_metrics.contains(&dropped_traces_metric),
1294            "Should not emit http.dropped.traces for 404. Got: {received_metrics:?}"
1295        );
1296    }
1297
1298    #[test]
1299    #[cfg_attr(miri, ignore)]
1300    fn test_health_metrics_disabled() {
1301        let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1302        let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1303
1304        let fake_agent = MockServer::start();
1305        let _mock_traces = fake_agent.mock(|_, then| {
1306            then.status(200)
1307                .header("content-type", "application/json")
1308                .body(r#"{ "rate_by_service": { "service:test,env:staging": 1.0 } }"#);
1309        });
1310
1311        let exporter = build_test_exporter(
1312            fake_agent.url("/v0.4/traces"),
1313            Some(stats_socket.local_addr().unwrap().to_string()),
1314            TraceExporterInputFormat::V04,
1315            TraceExporterOutputFormat::V04,
1316            false,
1317            false, // Health metrics disabled
1318        );
1319
1320        let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1321            name: BytesString::from_slice(b"test").unwrap(),
1322            ..Default::default()
1323        }]];
1324        let data = msgpack_encoder::v04::to_vec(&traces);
1325
1326        let _result = exporter.send(data.as_ref()).expect("failed to send trace");
1327
1328        // Try to read metrics - should timeout since none are sent
1329        let mut buf = [0; 1_000];
1330        match stats_socket.recv(&mut buf) {
1331            Ok(_) => {
1332                let datagram = String::from_utf8_lossy(buf.as_ref());
1333                let received = datagram.trim_matches(char::from(0)).to_string();
1334                panic!(
1335                    "Expected no metrics when health metrics disabled, but received: {received}"
1336                );
1337            }
1338            Err(e)
1339                if e.kind() == std::io::ErrorKind::WouldBlock
1340                    || e.kind() == std::io::ErrorKind::TimedOut
1341                    || e.kind() == std::io::ErrorKind::Interrupted =>
1342            {
1343                // This is expected - no metrics should be sent when disabled.
1344                // WouldBlock on Unix, TimedOut on Windows.
1345                // Interrupted can occur when signals interrupt the blocking
1346                // recvfrom() syscall before the timeout expires.
1347            }
1348            Err(e) => panic!("Unexpected error reading from socket: {e}"),
1349        }
1350    }
1351
1352    #[test]
1353    #[cfg_attr(miri, ignore)]
1354    fn test_agent_response_parse_default() {
1355        let server = MockServer::start();
1356        let _agent = server.mock(|_, then| {
1357            then.status(200)
1358                .header("content-type", "application/json")
1359                .body(
1360                    r#"{
1361                    "rate_by_service": {
1362                        "service:foo,env:staging": 1.0,
1363                        "service:,env:": 0.8
1364                    }
1365                }"#,
1366                );
1367        });
1368
1369        let mut builder = TraceExporterBuilder::default();
1370        builder
1371            .set_url(&server.url("/"))
1372            .set_service("foo")
1373            .set_env("foo-env")
1374            .set_tracer_version("v0.1")
1375            .set_language("nodejs")
1376            .set_language_version("1.0")
1377            .set_language_interpreter("v8");
1378        let exporter = builder.build().unwrap();
1379
1380        let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1381            name: BytesString::from_slice(b"test").unwrap(),
1382            ..Default::default()
1383        }]];
1384        let data = msgpack_encoder::v04::to_vec(&traces);
1385        let result = exporter.send(data.as_ref()).unwrap();
1386
1387        assert_eq!(
1388            result,
1389            AgentResponse::Changed {
1390                body: r#"{
1391                    "rate_by_service": {
1392                        "service:foo,env:staging": 1.0,
1393                        "service:,env:": 0.8
1394                    }
1395                }"#
1396                .to_string()
1397            }
1398        );
1399    }
1400
1401    #[test]
1402    #[cfg_attr(miri, ignore)]
1403    fn test_agent_response_error() {
1404        let server = MockServer::start();
1405        let _agent = server.mock(|_, then| {
1406            then.status(500)
1407                .header("content-type", "application/json")
1408                .body(r#"{ "error": "Unavailable" }"#);
1409        });
1410
1411        let mut builder = TraceExporterBuilder::default();
1412        builder
1413            .set_url(&server.url("/"))
1414            .set_service("foo")
1415            .set_env("foo-env")
1416            .set_tracer_version("v0.1")
1417            .set_language("nodejs")
1418            .set_language_version("1.0")
1419            .set_language_interpreter("v8");
1420        let exporter = builder.build().unwrap();
1421
1422        let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1423            name: BytesString::from_slice(b"test").unwrap(),
1424            ..Default::default()
1425        }]];
1426        let data = msgpack_encoder::v04::to_vec(&traces);
1427        let code = match exporter.send(data.as_ref()).unwrap_err() {
1428            TraceExporterError::Request(e) => Some(e.status()),
1429            _ => None,
1430        }
1431        .unwrap();
1432
1433        assert_eq!(code, 500);
1434    }
1435
1436    #[test]
1437    #[cfg_attr(miri, ignore)]
1438    fn test_agent_empty_response_error() {
1439        let server = MockServer::start();
1440        let _agent = server.mock(|_, then| {
1441            then.status(200)
1442                .header("content-type", "application/json")
1443                .body("");
1444        });
1445
1446        let mut builder = TraceExporterBuilder::default();
1447        builder
1448            .set_url(&server.url("/"))
1449            .set_service("foo")
1450            .set_env("foo-env")
1451            .set_tracer_version("v0.1")
1452            .set_language("nodejs")
1453            .set_language_version("1.0")
1454            .set_language_interpreter("v8");
1455        let exporter = builder.build().unwrap();
1456
1457        let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1458            name: BytesString::from_slice(b"test").unwrap(),
1459            ..Default::default()
1460        }]];
1461        let data = msgpack_encoder::v04::to_vec(&traces);
1462        let err = exporter.send(data.as_ref());
1463
1464        assert!(err.is_err());
1465        assert_eq!(
1466            match err.unwrap_err() {
1467                TraceExporterError::Agent(e) => Some(e),
1468                _ => None,
1469            },
1470            Some(AgentErrorKind::EmptyResponse)
1471        );
1472    }
1473
1474    #[test]
1475    #[cfg_attr(miri, ignore)]
1476    fn test_exporter_metrics_v4() {
1477        let server = MockServer::start();
1478        let response_body = r#"{
1479                        "rate_by_service": {
1480                            "service:foo,env:staging": 1.0,
1481                            "service:,env:": 0.8
1482                        }
1483                    }"#;
1484        let traces_endpoint = server.mock(|when, then| {
1485            when.method(POST).path("/v0.4/traces");
1486            then.status(200)
1487                .header("content-type", "application/json")
1488                .body(response_body);
1489        });
1490
1491        let metrics_endpoint = server.mock(|when, then| {
1492            when.method(POST)
1493                .body_includes("\"metric\":\"trace_api.bytes\"")
1494                .path("/telemetry/proxy/api/v2/apmtelemetry");
1495            then.status(200)
1496                .header("content-type", "application/json")
1497                .body("");
1498        });
1499
1500        let mut builder = TraceExporterBuilder::default();
1501        builder
1502            .set_url(&server.url("/"))
1503            .set_service("foo")
1504            .set_env("foo-env")
1505            .set_tracer_version("v0.1")
1506            .set_language("nodejs")
1507            .set_language_version("1.0")
1508            .set_language_interpreter("v8")
1509            .enable_telemetry(TelemetryConfig {
1510                heartbeat: 100,
1511                ..Default::default()
1512            });
1513        let exporter = builder.build().unwrap();
1514
1515        let traces = vec![0x90];
1516        let result = exporter.send(traces.as_ref()).unwrap();
1517        let AgentResponse::Changed { body } = result else {
1518            panic!("Expected Changed response");
1519        };
1520        assert_eq!(body, response_body);
1521
1522        traces_endpoint.assert_calls(1);
1523        while metrics_endpoint.calls() == 0 {
1524            exporter
1525                .runtime
1526                .lock()
1527                .unwrap()
1528                .as_ref()
1529                .unwrap()
1530                .block_on(async {
1531                    sleep(Duration::from_millis(100)).await;
1532                })
1533        }
1534        metrics_endpoint.assert_calls(1);
1535    }
1536
1537    #[test]
1538    #[cfg_attr(miri, ignore)]
1539    fn test_exporter_metrics_v5() {
1540        let server = MockServer::start();
1541        let response_body = r#"{
1542                        "rate_by_service": {
1543                            "service:foo,env:staging": 1.0,
1544                            "service:,env:": 0.8
1545                        }
1546                    }"#;
1547        let traces_endpoint = server.mock(|when, then| {
1548            when.method(POST).path("/v0.5/traces");
1549            then.status(200)
1550                .header("content-type", "application/json")
1551                .body(response_body);
1552        });
1553
1554        let metrics_endpoint = server.mock(|when, then| {
1555            when.method(POST)
1556                .body_includes("\"metric\":\"trace_api.bytes\"")
1557                .path("/telemetry/proxy/api/v2/apmtelemetry");
1558            then.status(200)
1559                .header("content-type", "application/json")
1560                .body("");
1561        });
1562
1563        let exporter = build_test_exporter(
1564            server.url("/"),
1565            None,
1566            TraceExporterInputFormat::V05,
1567            TraceExporterOutputFormat::V05,
1568            true,
1569            true,
1570        );
1571
1572        let v5: (Vec<BytesString>, Vec<Vec<v05::Span>>) = (vec![], vec![]);
1573        let traces = rmp_serde::to_vec(&v5).unwrap();
1574        let result = exporter.send(traces.as_ref()).unwrap();
1575        let AgentResponse::Changed { body } = result else {
1576            panic!("Expected Changed response");
1577        };
1578        assert_eq!(body, response_body);
1579
1580        traces_endpoint.assert_calls(1);
1581        while metrics_endpoint.calls() == 0 {
1582            exporter
1583                .runtime
1584                .lock()
1585                .unwrap()
1586                .as_ref()
1587                .unwrap()
1588                .block_on(async {
1589                    sleep(Duration::from_millis(100)).await;
1590                })
1591        }
1592        metrics_endpoint.assert_calls(1);
1593    }
1594
1595    #[test]
1596    #[cfg_attr(miri, ignore)]
1597    fn test_exporter_metrics_v4_to_v5() {
1598        let server = MockServer::start();
1599        let response_body = r#"{
1600                        "rate_by_service": {
1601                            "service:foo,env:staging": 1.0,
1602                            "service:,env:": 0.8
1603                        }
1604                    }"#;
1605        let traces_endpoint = server.mock(|when, then| {
1606            when.method(POST).path("/v0.5/traces").is_true(|req| {
1607                let bytes = libdd_tinybytes::Bytes::copy_from_slice(req.body_ref());
1608                bytes.to_vec() == V5_EMPTY
1609            });
1610            then.status(200)
1611                .header("content-type", "application/json")
1612                .body(response_body);
1613        });
1614
1615        let metrics_endpoint = server.mock(|when, then| {
1616            when.method(POST)
1617                .body_includes("\"metric\":\"trace_api.bytes\"")
1618                .path("/telemetry/proxy/api/v2/apmtelemetry");
1619            then.status(200)
1620                .header("content-type", "application/json")
1621                .body("");
1622        });
1623
1624        let mut builder = TraceExporterBuilder::default();
1625        builder
1626            .set_url(&server.url("/"))
1627            .set_service("foo")
1628            .set_env("foo-env")
1629            .set_tracer_version("v0.1")
1630            .set_language("nodejs")
1631            .set_language_version("1.0")
1632            .set_language_interpreter("v8")
1633            .enable_telemetry(TelemetryConfig {
1634                heartbeat: 100,
1635                ..Default::default()
1636            })
1637            .set_input_format(TraceExporterInputFormat::V04)
1638            .set_output_format(TraceExporterOutputFormat::V05);
1639
1640        let exporter = builder.build().unwrap();
1641
1642        let traces = vec![0x90];
1643        let result = exporter.send(traces.as_ref()).unwrap();
1644        let AgentResponse::Changed { body } = result else {
1645            panic!("Expected Changed response");
1646        };
1647        assert_eq!(body, response_body);
1648
1649        traces_endpoint.assert_calls(1);
1650        while metrics_endpoint.calls() == 0 {
1651            exporter
1652                .runtime
1653                .lock()
1654                .unwrap()
1655                .as_ref()
1656                .unwrap()
1657                .block_on(async {
1658                    sleep(Duration::from_millis(100)).await;
1659                })
1660        }
1661        metrics_endpoint.assert_calls(1);
1662    }
1663
1664    #[test]
1665    #[cfg_attr(miri, ignore)]
1666    /// Tests that if agent_response_payload_version is not enabled
1667    /// the exporter always returns the response body
1668    fn test_agent_response_payload_version_disabled() {
1669        let server = MockServer::start();
1670        let response_body = r#"{
1671                        "rate_by_service": {
1672                            "service:foo,env:staging": 1.0,
1673                            "service:,env:": 0.8
1674                        }
1675                    }"#;
1676        let traces_endpoint = server.mock(|when, then| {
1677            when.method(POST).path("/v0.4/traces");
1678            then.status(200)
1679                .header("content-type", "application/json")
1680                .header("datadog-rates-payload-version", "abc")
1681                .body(response_body);
1682        });
1683
1684        let mut builder = TraceExporterBuilder::default();
1685        builder.set_url(&server.url("/"));
1686        let exporter = builder.build().unwrap();
1687        let traces = vec![0x90];
1688        for _ in 0..2 {
1689            let result = exporter.send(traces.as_ref()).unwrap();
1690            let AgentResponse::Changed { body } = result else {
1691                panic!("Expected Changed response");
1692            };
1693            assert_eq!(body, response_body);
1694        }
1695        traces_endpoint.assert_calls(2);
1696    }
1697
1698    #[test]
1699    #[cfg_attr(miri, ignore)]
1700    /// Tests that if agent_response_payload_version is enabled
1701    /// the exporter returns the response body only once
1702    /// and then returns Unchanged response until the payload version header changes
1703    fn test_agent_response_payload_version() {
1704        let server = MockServer::start();
1705        let response_body = r#"{
1706                        "rate_by_service": {
1707                            "service:foo,env:staging": 1.0,
1708                            "service:,env:": 0.8
1709                        }
1710                    }"#;
1711        let mut traces_endpoint = server.mock(|when, then| {
1712            when.method(POST).path("/v0.4/traces");
1713            then.status(200)
1714                .header("content-type", "application/json")
1715                .header("datadog-rates-payload-version", "abc")
1716                .body(response_body);
1717        });
1718
1719        let mut builder = TraceExporterBuilder::default();
1720        builder
1721            .set_url(&server.url("/"))
1722            .enable_agent_rates_payload_version();
1723        let exporter = builder.build().unwrap();
1724        let traces = vec![0x90];
1725        let result = exporter.send(traces.as_ref()).unwrap();
1726        let AgentResponse::Changed { body } = result else {
1727            panic!("Expected Changed response");
1728        };
1729        assert_eq!(body, response_body);
1730
1731        let result = exporter.send(traces.as_ref()).unwrap();
1732        let AgentResponse::Unchanged = result else {
1733            panic!("Expected Unchanged response");
1734        };
1735        traces_endpoint.assert_calls(2);
1736        traces_endpoint.delete();
1737
1738        let traces_endpoint = server.mock(|when, then| {
1739            when.method(POST).path("/v0.4/traces");
1740            then.status(200)
1741                .header("content-type", "application/json")
1742                .header("datadog-rates-payload-version", "def")
1743                .body(response_body);
1744        });
1745        let result = exporter.send(traces.as_ref()).unwrap();
1746        let AgentResponse::Changed { body } = result else {
1747            panic!("Expected Changed response");
1748        };
1749        assert_eq!(body, response_body);
1750
1751        let result = exporter.send(traces.as_ref()).unwrap();
1752        let AgentResponse::Unchanged = result else {
1753            panic!("Expected Unchanged response");
1754        };
1755        traces_endpoint.assert_calls(2);
1756    }
1757
1758    #[test]
1759    #[cfg_attr(miri, ignore)]
1760    fn test_agent_malfunction_info_4xx() {
1761        test_agent_malfunction_info(404, r#"{"error":"Not Found"}"#, Duration::from_secs(0));
1762    }
1763
1764    #[test]
1765    #[cfg_attr(miri, ignore)]
1766    fn test_agent_malfunction_info_5xx() {
1767        test_agent_malfunction_info(
1768            500,
1769            r#"{"error":"Internal Server Error"}"#,
1770            Duration::from_secs(0),
1771        );
1772    }
1773
1774    #[test]
1775    #[cfg_attr(miri, ignore)]
1776    fn test_agent_malfunction_info_timeout() {
1777        test_agent_malfunction_info(
1778            408,
1779            r#"{"error":"Internal Server Error"}"#,
1780            Duration::from_secs(600),
1781        );
1782    }
1783
1784    #[test]
1785    #[cfg_attr(miri, ignore)]
1786    fn test_agent_malfunction_info_wrong_answer() {
1787        test_agent_malfunction_info(200, "WRONG_ANSWER", Duration::from_secs(0));
1788    }
1789
1790    fn test_agent_malfunction_info(status: u16, response: &str, delay: Duration) {
1791        let server = MockServer::start();
1792
1793        let mock_traces = server.mock(|when, then| {
1794            when.method(POST)
1795                .header("Content-type", "application/msgpack")
1796                .path("/v0.4/traces");
1797            then.status(200).body(
1798                r#"{
1799                    "rate_by_service": {
1800                        "service:test,env:staging": 1.0,
1801                    }
1802                }"#,
1803            );
1804        });
1805
1806        let mock_info = server.mock(|when, then| {
1807            when.method(GET).path("/info");
1808            then.delay(delay).status(status).body(response);
1809        });
1810
1811        let mut builder = TraceExporterBuilder::default();
1812        builder
1813            .set_url(&server.url("/"))
1814            .set_service("test")
1815            .set_env("staging")
1816            .set_tracer_version("v0.1")
1817            .set_language("nodejs")
1818            .set_language_version("1.0")
1819            .set_language_interpreter("v8")
1820            .set_input_format(TraceExporterInputFormat::V04)
1821            .set_output_format(TraceExporterOutputFormat::V04)
1822            .enable_stats(Duration::from_secs(10));
1823        let exporter = builder.build().unwrap();
1824
1825        let trace_chunk = vec![SpanBytes {
1826            duration: 10,
1827            ..Default::default()
1828        }];
1829
1830        let data = msgpack_encoder::v04::to_vec(&[trace_chunk]);
1831
1832        // Wait for the info fetcher to get the config
1833        while mock_info.calls() == 0 {
1834            exporter
1835                .runtime
1836                .lock()
1837                .unwrap()
1838                .as_ref()
1839                .unwrap()
1840                .block_on(async {
1841                    sleep(Duration::from_millis(100)).await;
1842                })
1843        }
1844
1845        let _ = exporter.send(data.as_ref()).unwrap();
1846
1847        exporter.shutdown(None).unwrap();
1848
1849        mock_traces.assert();
1850    }
1851
1852    #[test]
1853    #[cfg_attr(miri, ignore)]
1854    fn test_connection_timeout() {
1855        let exporter = TraceExporterBuilder::default().build().unwrap();
1856
1857        assert_eq!(exporter.endpoint.timeout_ms, Endpoint::default().timeout_ms);
1858
1859        let timeout = Some(42);
1860        let mut builder = TraceExporterBuilder::default();
1861        builder.set_connection_timeout(timeout);
1862
1863        let exporter = builder.build().unwrap();
1864
1865        assert_eq!(exporter.endpoint.timeout_ms, 42);
1866    }
1867
1868    #[test]
1869    #[cfg_attr(miri, ignore)]
1870    fn stop_and_start_runtime() {
1871        let builder = TraceExporterBuilder::default();
1872        let exporter = builder.build().unwrap();
1873        exporter.stop_worker();
1874        exporter.run_worker().unwrap();
1875    }
1876}
1877
#[cfg(test)]
// These tests reset and read the agent info cache (`agent_info::clear_cache_for_test`,
// `agent_info::get_agent_info`). The module name suggests they must not run
// concurrently with other tests touching that cache. NOTE(review): confirm how the
// test runner enforces this.
mod single_threaded_tests {
    use super::*;
    use crate::agent_info;
    use httpmock::prelude::*;
    use libdd_trace_utils::msgpack_encoder;
    use libdd_trace_utils::span::v04::SpanBytes;
    use std::time::Duration;
    use tokio::time::sleep;

    /// Upper bound for the polling loops below. A missing agent response or an
    /// idle worker fails the test instead of hanging it.
    const WAIT_TIMEOUT: Duration = Duration::from_secs(10);

    /// Sends a trace with stats enabled, then shuts down without a timeout.
    /// Expects both the traces and the stats endpoints to have been called.
    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_shutdown() {
        // Clear the agent info cache to ensure test isolation
        agent_info::clear_cache_for_test();

        let server = MockServer::start();

        let mock_traces = server.mock(|when, then| {
            when.method(POST)
                .header("Content-type", "application/msgpack")
                .path("/v0.4/traces");
            then.status(200).body("");
        });

        let mock_stats = server.mock(|when, then| {
            when.method(POST)
                .header("Content-type", "application/msgpack")
                .path("/v0.6/stats");
            then.status(200).body("");
        });

        let _mock_info = server.mock(|when, then| {
            when.method(GET).path("/info");
            then.status(200)
                .header("content-type", "application/json")
                .header("datadog-agent-state", "1")
                .body(r#"{"version":"1","client_drop_p0s":true,"endpoints":["/v0.4/traces","/v0.6/stats"]}"#);
        });

        let mut builder = TraceExporterBuilder::default();
        builder
            .set_url(&server.url("/"))
            .set_service("test")
            .set_env("staging")
            .set_tracer_version("v0.1")
            .set_language("nodejs")
            .set_language_version("1.0")
            .set_language_interpreter("v8")
            .set_input_format(TraceExporterInputFormat::V04)
            .set_output_format(TraceExporterOutputFormat::V04)
            .enable_stats(Duration::from_secs(10));
        let exporter = builder.build().unwrap();

        let trace_chunk = vec![SpanBytes {
            duration: 10,
            ..Default::default()
        }];

        let data = msgpack_encoder::v04::to_vec(&[trace_chunk]);

        // Wait for the info fetcher to get the config, but not forever.
        let info_wait_start = std::time::Instant::now();
        while agent_info::get_agent_info().is_none() {
            if info_wait_start.elapsed() > WAIT_TIMEOUT {
                panic!("Timeout waiting for agent info to be fetched");
            }
            exporter
                .runtime
                .lock()
                .unwrap()
                .as_ref()
                .unwrap()
                .block_on(async {
                    sleep(Duration::from_millis(100)).await;
                })
        }

        let result = exporter.send(data.as_ref());
        // Error received because server is returning an empty body.
        assert!(result.is_err());

        // Wait for the stats worker to be active before shutting down to avoid potential flaky
        // tests on CI where we shutdown before the stats worker had time to start
        let worker_wait_start = std::time::Instant::now();
        while !exporter.is_stats_worker_active() {
            if worker_wait_start.elapsed() > WAIT_TIMEOUT {
                panic!("Timeout waiting for stats worker to become active");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        exporter.shutdown(None).unwrap();

        // Wait for the mock server to process the stats
        for _ in 0..1000 {
            if mock_traces.calls() > 0 && mock_stats.calls() > 0 {
                break;
            } else {
                std::thread::sleep(Duration::from_millis(10));
            }
        }

        mock_traces.assert();
        mock_stats.assert();
    }

    /// Stats flushing is slowed down by a 10 s delay on `/v0.6/stats`, so a
    /// shutdown with a 5 ms timeout must return an error. The trace must still
    /// have been delivered.
    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_shutdown_with_timeout() {
        // Clear the agent info cache to ensure test isolation
        agent_info::clear_cache_for_test();

        let server = MockServer::start();

        let mock_traces = server.mock(|when, then| {
            when.method(POST)
                .header("Content-type", "application/msgpack")
                .path("/v0.4/traces");
            then.status(200).body(
                r#"{
                    "rate_by_service": {
                        "service:foo,env:staging": 1.0,
                        "service:,env:": 0.8
                    }
                }"#,
            );
        });

        let _mock_stats = server.mock(|when, then| {
            when.method(POST)
                .header("Content-type", "application/msgpack")
                .path("/v0.6/stats");
            then.delay(Duration::from_secs(10)).status(200).body("");
        });

        let _mock_info = server.mock(|when, then| {
            when.method(GET).path("/info");
            then.status(200)
                .header("content-type", "application/json")
                .header("datadog-agent-state", "1")
                .body(r#"{"version":"1","client_drop_p0s":true,"endpoints":["/v0.4/traces","/v0.6/stats"]}"#);
        });

        let mut builder = TraceExporterBuilder::default();
        builder
            .set_url(&server.url("/"))
            .set_service("test")
            .set_env("staging")
            .set_tracer_version("v0.1")
            .set_language("nodejs")
            .set_language_version("1.0")
            .set_language_interpreter("v8")
            .set_input_format(TraceExporterInputFormat::V04)
            .set_output_format(TraceExporterOutputFormat::V04)
            .enable_stats(Duration::from_secs(10));
        let exporter = builder.build().unwrap();

        let trace_chunk = vec![SpanBytes {
            service: "test".into(),
            name: "test".into(),
            resource: "test".into(),
            r#type: "test".into(),
            duration: 10,
            ..Default::default()
        }];

        let data = msgpack_encoder::v04::to_vec(&[trace_chunk]);

        // Wait for agent_info to be present so that sending a trace will trigger the stats worker
        // to start. Bound the wait so a broken fetcher fails instead of hanging.
        let info_wait_start = std::time::Instant::now();
        while agent_info::get_agent_info().is_none() {
            if info_wait_start.elapsed() > WAIT_TIMEOUT {
                panic!("Timeout waiting for agent info to be fetched");
            }
            exporter
                .runtime
                .lock()
                .unwrap()
                .as_ref()
                .unwrap()
                .block_on(async {
                    sleep(Duration::from_millis(100)).await;
                })
        }

        exporter.send(data.as_ref()).unwrap();

        // Wait for the stats worker to be active before shutting down to avoid potential flaky
        // tests on CI where we shutdown before the stats worker had time to start
        let worker_wait_start = std::time::Instant::now();
        while !exporter.is_stats_worker_active() {
            if worker_wait_start.elapsed() > WAIT_TIMEOUT {
                panic!("Timeout waiting for stats worker to become active");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        exporter
            .shutdown(Some(Duration::from_millis(5)))
            .unwrap_err(); // The shutdown should timeout

        mock_traces.assert();
    }
}