1pub mod agent_response;
4pub mod builder;
5pub mod error;
6pub mod metrics;
7pub mod stats;
8mod trace_serializer;
9
10pub use builder::TraceExporterBuilder;
12
13use self::agent_response::AgentResponse;
14use self::metrics::MetricsEmitter;
15use self::stats::StatsComputationStatus;
16use self::trace_serializer::TraceSerializer;
17use crate::agent_info::{AgentInfoFetcher, ResponseObserver};
18use crate::pausable_worker::PausableWorker;
19use crate::stats_exporter::StatsExporter;
20use crate::telemetry::{SendPayloadTelemetry, TelemetryClient};
21use crate::trace_exporter::agent_response::{
22 AgentResponsePayloadVersion, DATADOG_RATES_PAYLOAD_VERSION_HEADER,
23};
24use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError};
25use crate::{
26 agent_info::{self, schema::AgentInfo},
27 health_metrics,
28 health_metrics::{HealthMetric, SendResult, TransportErrorType},
29};
30use arc_swap::{ArcSwap, ArcSwapOption};
31use http::uri::PathAndQuery;
32use http::Uri;
33use http_body_util::BodyExt;
34use libdd_common::tag::Tag;
35use libdd_common::{http_common, Endpoint};
36use libdd_common::{HttpClient, MutexExt};
37use libdd_dogstatsd_client::Client;
38use libdd_telemetry::worker::TelemetryWorker;
39use libdd_trace_utils::msgpack_decoder;
40use libdd_trace_utils::send_with_retry::{
41 send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult,
42};
43use libdd_trace_utils::span::{v04::Span, TraceData};
44use libdd_trace_utils::trace_utils::TracerHeaderTags;
45use std::io;
46use std::sync::{Arc, Mutex};
47use std::time::Duration;
48use std::{borrow::Borrow, collections::HashMap, str::FromStr};
49use tokio::runtime::Runtime;
50use tracing::{debug, error, warn};
51
52const INFO_ENDPOINT: &str = "/info";
53
54#[derive(Copy, Clone, Debug, Default, PartialEq)]
57#[repr(C)]
58pub enum TraceExporterInputFormat {
59 #[allow(missing_docs)]
60 #[default]
61 V04,
62 V05,
63}
64
65#[derive(Copy, Clone, Debug, Default, PartialEq)]
68#[repr(C)]
69pub enum TraceExporterOutputFormat {
70 #[allow(missing_docs)]
71 #[default]
72 V04,
73 V05,
74}
75
76impl TraceExporterOutputFormat {
77 fn add_path(&self, url: &Uri) -> Uri {
79 add_path(
80 url,
81 match self {
82 TraceExporterOutputFormat::V04 => "/v0.4/traces",
83 TraceExporterOutputFormat::V05 => "/v0.5/traces",
84 },
85 )
86 }
87}
88
89fn add_path(url: &Uri, path: &str) -> Uri {
96 let p_and_q = url.path_and_query();
97
98 #[allow(clippy::unwrap_used)]
99 let new_p_and_q = match p_and_q {
100 Some(pq) => {
101 let p = pq.path();
102 let mut p = p.strip_suffix('/').unwrap_or(p).to_owned();
103 p.push_str(path);
104
105 PathAndQuery::from_str(p.as_str())
106 }
107 None => PathAndQuery::from_str(path),
108 }
109 .unwrap();
111 let mut parts = url.clone().into_parts();
112 parts.path_and_query = Some(new_p_and_q);
113 #[allow(clippy::unwrap_used)]
115 Uri::from_parts(parts).unwrap()
116}
117
118#[derive(Clone, Default, Debug)]
119pub struct TracerMetadata {
120 pub hostname: String,
121 pub env: String,
122 pub app_version: String,
123 pub runtime_id: String,
124 pub service: String,
125 pub tracer_version: String,
126 pub language: String,
127 pub language_version: String,
128 pub language_interpreter: String,
129 pub language_interpreter_vendor: String,
130 pub git_commit_sha: String,
131 pub process_tags: String,
132 pub client_computed_stats: bool,
133 pub client_computed_top_level: bool,
134}
135
136impl<'a> From<&'a TracerMetadata> for TracerHeaderTags<'a> {
137 fn from(tags: &'a TracerMetadata) -> TracerHeaderTags<'a> {
138 TracerHeaderTags::<'_> {
139 lang: &tags.language,
140 lang_version: &tags.language_version,
141 tracer_version: &tags.tracer_version,
142 lang_interpreter: &tags.language_interpreter,
143 lang_vendor: &tags.language_interpreter_vendor,
144 client_computed_stats: tags.client_computed_stats,
145 client_computed_top_level: tags.client_computed_top_level,
146 ..Default::default()
147 }
148 }
149}
150
151impl<'a> From<&'a TracerMetadata> for HashMap<&'static str, String> {
152 fn from(tags: &'a TracerMetadata) -> HashMap<&'static str, String> {
153 TracerHeaderTags::from(tags).into()
154 }
155}
156
157#[derive(Debug)]
158pub(crate) struct TraceExporterWorkers {
159 pub info: PausableWorker<AgentInfoFetcher>,
160 pub stats: Option<PausableWorker<StatsExporter>>,
161 pub telemetry: Option<PausableWorker<TelemetryWorker>>,
162}
163
164#[allow(missing_docs)]
183enum DeserInputFormat {
184 V04,
185 V05,
186}
187
188#[derive(Debug)]
189pub struct TraceExporter {
190 endpoint: Endpoint,
191 metadata: TracerMetadata,
192 input_format: TraceExporterInputFormat,
193 output_format: TraceExporterOutputFormat,
194 runtime: Arc<Mutex<Option<Arc<Runtime>>>>,
196 dogstatsd: Option<Client>,
198 common_stats_tags: Vec<Tag>,
199 client_computed_top_level: bool,
200 client_side_stats: ArcSwap<StatsComputationStatus>,
201 previous_info_state: ArcSwapOption<String>,
202 info_response_observer: ResponseObserver,
203 telemetry: Option<TelemetryClient>,
204 health_metrics_enabled: bool,
205 workers: Arc<Mutex<TraceExporterWorkers>>,
206 agent_payload_response_version: Option<AgentResponsePayloadVersion>,
207 http_client: HttpClient,
208}
209
210impl TraceExporter {
211 #[allow(missing_docs)]
212 pub fn builder() -> TraceExporterBuilder {
213 TraceExporterBuilder::default()
214 }
215
216 fn runtime(&self) -> Result<Arc<Runtime>, TraceExporterError> {
218 let mut runtime_guard = self.runtime.lock_or_panic();
219 match runtime_guard.as_ref() {
220 Some(runtime) => {
221 Ok(runtime.clone())
223 }
224 None => {
225 let runtime = Arc::new(
227 tokio::runtime::Builder::new_multi_thread()
228 .worker_threads(1)
229 .enable_all()
230 .build()?,
231 );
232 *runtime_guard = Some(runtime.clone());
233 self.start_all_workers(&runtime)?;
234 Ok(runtime)
235 }
236 }
237 }
238
239 pub fn run_worker(&self) -> Result<(), TraceExporterError> {
241 self.runtime()?;
242 Ok(())
243 }
244
245 fn start_all_workers(&self, runtime: &Arc<Runtime>) -> Result<(), TraceExporterError> {
247 let mut workers = self.workers.lock_or_panic();
248
249 self.start_info_worker(&mut workers, runtime)?;
250 self.start_stats_worker(&mut workers, runtime)?;
251 self.start_telemetry_worker(&mut workers, runtime)?;
252
253 Ok(())
254 }
255
256 fn start_info_worker(
258 &self,
259 workers: &mut TraceExporterWorkers,
260 runtime: &Arc<Runtime>,
261 ) -> Result<(), TraceExporterError> {
262 workers.info.start(runtime).map_err(|e| {
263 TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string()))
264 })
265 }
266
267 fn start_stats_worker(
269 &self,
270 workers: &mut TraceExporterWorkers,
271 runtime: &Arc<Runtime>,
272 ) -> Result<(), TraceExporterError> {
273 if let Some(stats_worker) = &mut workers.stats {
274 stats_worker.start(runtime).map_err(|e| {
275 TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string()))
276 })?;
277 }
278 Ok(())
279 }
280
281 fn start_telemetry_worker(
283 &self,
284 workers: &mut TraceExporterWorkers,
285 runtime: &Arc<Runtime>,
286 ) -> Result<(), TraceExporterError> {
287 if let Some(telemetry_worker) = &mut workers.telemetry {
288 telemetry_worker.start(runtime).map_err(|e| {
289 TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string()))
290 })?;
291 if let Some(client) = &self.telemetry {
292 runtime.block_on(client.start());
293 }
294 }
295 Ok(())
296 }
297
298 pub fn stop_worker(&self) {
299 let runtime = self.runtime.lock_or_panic().take();
300 if let Some(ref rt) = runtime {
301 let mut workers = self.workers.lock_or_panic();
303 rt.block_on(async {
304 let _ = workers.info.pause().await;
305 if let Some(stats_worker) = &mut workers.stats {
306 let _ = stats_worker.pause().await;
307 };
308 if let Some(telemetry_worker) = &mut workers.telemetry {
309 let _ = telemetry_worker.pause().await;
310 };
311 });
312 }
313 if let PausableWorker::Paused { worker } = &mut self.workers.lock_or_panic().info {
319 self.info_response_observer.manual_trigger();
320 worker.drain();
321 }
322 drop(runtime);
323 }
324
325 pub fn send(&self, data: &[u8]) -> Result<AgentResponse, TraceExporterError> {
336 self.check_agent_info();
337
338 let res = match self.input_format {
339 TraceExporterInputFormat::V04 => self.send_deser(data, DeserInputFormat::V04),
340 TraceExporterInputFormat::V05 => self.send_deser(data, DeserInputFormat::V05),
341 }?;
342 if matches!(&res, AgentResponse::Changed { body } if body.is_empty()) {
343 return Err(TraceExporterError::Agent(
344 error::AgentErrorKind::EmptyResponse,
345 ));
346 }
347
348 Ok(res)
349 }
350
351 pub fn shutdown(mut self, timeout: Option<Duration>) -> Result<(), TraceExporterError> {
353 let runtime = tokio::runtime::Builder::new_current_thread()
354 .enable_all()
355 .build()?;
356
357 if let Some(timeout) = timeout {
358 match runtime
359 .block_on(async { tokio::time::timeout(timeout, self.shutdown_async()).await })
360 {
361 Ok(()) => Ok(()),
362 Err(_e) => Err(TraceExporterError::Shutdown(
363 error::ShutdownError::TimedOut(timeout),
364 )),
365 }
366 } else {
367 runtime.block_on(self.shutdown_async());
368 Ok(())
369 }
370 }
371
372 async fn shutdown_async(&mut self) {
377 let stats_status = self.client_side_stats.load();
378 if let StatsComputationStatus::Enabled {
379 cancellation_token, ..
380 } = stats_status.as_ref()
381 {
382 cancellation_token.cancel();
383
384 let stats_worker = self.workers.lock_or_panic().stats.take();
385
386 if let Some(stats_worker) = stats_worker {
387 let _ = stats_worker.join().await;
388 }
389 }
390 if let Some(telemetry) = self.telemetry.take() {
391 telemetry.shutdown().await;
392 let telemetry_worker = self.workers.lock_or_panic().telemetry.take();
393
394 if let Some(telemetry_worker) = telemetry_worker {
395 let _ = telemetry_worker.join().await;
396 }
397 }
398 }
399
400 fn has_agent_info_state_changed(&self, agent_info: &Arc<AgentInfo>) -> bool {
402 Some(agent_info.state_hash.as_str())
403 != self
404 .previous_info_state
405 .load()
406 .as_deref()
407 .map(|s| s.as_str())
408 }
409
410 fn check_agent_info(&self) {
411 if let Some(agent_info) = agent_info::get_agent_info() {
412 if self.has_agent_info_state_changed(&agent_info) {
413 match &**self.client_side_stats.load() {
414 StatsComputationStatus::Disabled => {}
415 StatsComputationStatus::DisabledByAgent { .. } => {
416 let ctx = stats::StatsContext {
417 metadata: &self.metadata,
418 endpoint_url: &self.endpoint.url,
419 runtime: &self.runtime,
420 };
421 stats::handle_stats_disabled_by_agent(
422 &ctx,
423 &agent_info,
424 &self.client_side_stats,
425 &self.workers,
426 self.http_client.clone(),
427 );
428 }
429 StatsComputationStatus::Enabled {
430 stats_concentrator, ..
431 } => {
432 let ctx = stats::StatsContext {
433 metadata: &self.metadata,
434 endpoint_url: &self.endpoint.url,
435 runtime: &self.runtime,
436 };
437 stats::handle_stats_enabled(
438 &ctx,
439 &agent_info,
440 stats_concentrator,
441 &self.client_side_stats,
442 &self.workers,
443 );
444 }
445 }
446 self.previous_info_state
447 .store(Some(agent_info.state_hash.clone().into()))
448 }
449 }
450 }
451
452 #[cfg(feature = "test-utils")]
468 pub fn wait_agent_info_ready(&self, timeout: Duration) -> anyhow::Result<()> {
469 let start = std::time::Instant::now();
470 loop {
471 if std::time::Instant::now().duration_since(start) > timeout {
472 anyhow::bail!("Timeout waiting for agent info to be ready",);
473 }
474 if agent_info::get_agent_info().is_some() {
475 return Ok(());
476 }
477 std::thread::sleep(Duration::from_millis(10));
478 }
479 }
480
481 fn emit_metric(&self, metric: HealthMetric, custom_tags: Option<Vec<&Tag>>) {
483 if self.health_metrics_enabled {
484 let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags);
485 emitter.emit(metric, custom_tags);
486 }
487 }
488
489 fn emit_send_result(&self, result: &SendResult) {
491 if self.health_metrics_enabled {
492 let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags);
493 emitter.emit_from_send_result(result);
494 }
495 }
496
497 pub fn send_trace_chunks<T: TraceData>(
506 &self,
507 trace_chunks: Vec<Vec<Span<T>>>,
508 ) -> Result<AgentResponse, TraceExporterError> {
509 self.check_agent_info();
510 self.runtime()?
511 .block_on(async { self.send_trace_chunks_inner(trace_chunks).await })
512 }
513
514 pub async fn send_trace_chunks_async<T: TraceData>(
523 &self,
524 trace_chunks: Vec<Vec<Span<T>>>,
525 ) -> Result<AgentResponse, TraceExporterError> {
526 self.check_agent_info();
527 self.send_trace_chunks_inner(trace_chunks).await
528 }
529
530 fn send_deser(
532 &self,
533 data: &[u8],
534 format: DeserInputFormat,
535 ) -> Result<AgentResponse, TraceExporterError> {
536 let (traces, _) = match format {
537 DeserInputFormat::V04 => msgpack_decoder::v04::from_slice(data),
538 DeserInputFormat::V05 => msgpack_decoder::v05::from_slice(data),
539 }
540 .map_err(|e| {
541 error!("Error deserializing trace from request body: {e}");
542 self.emit_metric(
543 HealthMetric::Count(health_metrics::DESERIALIZE_TRACES_ERRORS, 1),
544 None,
545 );
546 TraceExporterError::Deserialization(e)
547 })?;
548 debug!(
549 trace_count = traces.len(),
550 "Trace deserialization completed successfully"
551 );
552 self.emit_metric(
553 HealthMetric::Count(health_metrics::DESERIALIZE_TRACES, traces.len() as i64),
554 None,
555 );
556
557 self.runtime()?
558 .block_on(async { self.send_trace_chunks_inner(traces).await })
559 }
560
561 async fn send_traces_with_telemetry(
563 &self,
564 endpoint: &Endpoint,
565 mp_payload: Vec<u8>,
566 headers: HashMap<&'static str, String>,
567 chunks: usize,
568 chunks_dropped_p0: usize,
569 ) -> Result<AgentResponse, TraceExporterError> {
570 let strategy = RetryStrategy::default();
571 let payload_len = mp_payload.len();
572
573 let result =
575 send_with_retry(&self.http_client, endpoint, mp_payload, &headers, &strategy).await;
576
577 if let Some(telemetry) = &self.telemetry {
579 if let Err(e) = telemetry.send(&SendPayloadTelemetry::from_retry_result(
580 &result,
581 payload_len as u64,
582 chunks as u64,
583 chunks_dropped_p0 as u64,
584 )) {
585 error!(?e, "Error sending telemetry");
586 }
587 }
588
589 self.handle_send_result(result, chunks, payload_len).await
590 }
591
592 async fn send_trace_chunks_inner<T: TraceData>(
593 &self,
594 mut traces: Vec<Vec<Span<T>>>,
595 ) -> Result<AgentResponse, TraceExporterError> {
596 let mut header_tags: TracerHeaderTags = self.metadata.borrow().into();
597
598 let dropped_p0_stats = stats::process_traces_for_stats(
600 &mut traces,
601 &mut header_tags,
602 &self.client_side_stats,
603 self.client_computed_top_level,
604 );
605
606 let serializer = TraceSerializer::new(
607 self.output_format,
608 self.agent_payload_response_version.as_ref(),
609 );
610 let prepared = match serializer.prepare_traces_payload(traces, header_tags) {
611 Ok(p) => p,
612 Err(e) => {
613 error!("Error serializing traces: {e}");
614 self.emit_metric(
615 HealthMetric::Count(health_metrics::SERIALIZE_TRACES_ERRORS, 1),
616 None,
617 );
618 return Err(e);
619 }
620 };
621
622 let endpoint = Endpoint {
623 url: self.get_agent_url(),
624 ..self.endpoint.clone()
625 };
626
627 self.send_traces_with_telemetry(
628 &endpoint,
629 prepared.data,
630 prepared.headers,
631 prepared.chunk_count,
632 dropped_p0_stats.dropped_p0_traces,
633 )
634 .await
635 }
636
637 async fn handle_send_result(
639 &self,
640 result: SendWithRetryResult,
641 chunks: usize,
642 payload_len: usize,
643 ) -> Result<AgentResponse, TraceExporterError> {
644 match result {
645 Ok((response, attempts)) => {
646 self.handle_agent_response(chunks, response, payload_len, attempts)
647 .await
648 }
649 Err(err) => self.handle_send_error(err, payload_len, chunks).await,
650 }
651 }
652
653 async fn handle_send_error(
655 &self,
656 err: SendWithRetryError,
657 payload_len: usize,
658 chunks: usize,
659 ) -> Result<AgentResponse, TraceExporterError> {
660 error!(?err, "Error sending traces");
661
662 match err {
663 SendWithRetryError::Http(response, attempts) => {
664 self.handle_http_send_error(response, payload_len, chunks, attempts)
665 .await
666 }
667 SendWithRetryError::Timeout(attempts) => {
668 let send_result =
669 SendResult::failure(TransportErrorType::Timeout, payload_len, chunks, attempts);
670 self.emit_send_result(&send_result);
671 Err(TraceExporterError::from(io::Error::from(
672 io::ErrorKind::TimedOut,
673 )))
674 }
675 SendWithRetryError::Network(err, attempts) => {
676 let send_result =
677 SendResult::failure(TransportErrorType::Network, payload_len, chunks, attempts);
678 self.emit_send_result(&send_result);
679 Err(TraceExporterError::from(err))
680 }
681 SendWithRetryError::Build(attempts) => {
682 let send_result =
683 SendResult::failure(TransportErrorType::Build, payload_len, chunks, attempts);
684 self.emit_send_result(&send_result);
685 Err(TraceExporterError::from(io::Error::from(
686 io::ErrorKind::Other,
687 )))
688 }
689 }
690 }
691
692 async fn handle_http_send_error(
694 &self,
695 response: http_common::HttpResponse,
696 payload_len: usize,
697 chunks: usize,
698 attempts: u32,
699 ) -> Result<AgentResponse, TraceExporterError> {
700 let status = response.status();
701
702 self.info_response_observer.check_response(&response);
704
705 let send_result = SendResult::failure(
707 TransportErrorType::Http(status.as_u16()),
708 payload_len,
709 chunks,
710 attempts,
711 );
712 self.emit_send_result(&send_result);
713
714 let body = self.read_error_response_body(response).await?;
715 Err(TraceExporterError::Request(RequestError::new(
716 status,
717 &String::from_utf8_lossy(&body),
718 )))
719 }
720
721 async fn read_error_response_body(
723 &self,
724 response: http_common::HttpResponse,
725 ) -> Result<bytes::Bytes, TraceExporterError> {
726 match response.into_body().collect().await {
727 Ok(body) => Ok(body.to_bytes()),
728 Err(err) => {
729 error!(?err, "Error reading agent response body");
730 Err(TraceExporterError::from(err))
731 }
732 }
733 }
734
735 fn check_payload_version_changed(&self, response: &http_common::HttpResponse) -> bool {
737 let status = response.status();
738 match (
739 status.is_success(),
740 self.agent_payload_response_version.as_ref(),
741 response.headers().get(DATADOG_RATES_PAYLOAD_VERSION_HEADER),
742 ) {
743 (false, _, _) => {
744 false
746 }
747 (true, None, _) => {
748 true
751 }
752 (true, Some(agent_payload_response_version), Some(new_payload_version)) => {
753 if let Ok(new_payload_version_str) = new_payload_version.to_str() {
754 agent_payload_response_version.check_and_update(new_payload_version_str)
755 } else {
756 false
757 }
758 }
759 _ => false,
760 }
761 }
762
763 async fn read_response_body(
765 response: http_common::HttpResponse,
766 ) -> Result<String, http_common::Error> {
767 let body = http_common::collect_response_bytes(response).await?;
768 Ok(String::from_utf8_lossy(&body).to_string())
769 }
770
771 fn handle_successful_trace_response(
773 &self,
774 chunks: usize,
775 payload_len: usize,
776 attempts: u32,
777 body: String,
778 payload_version_changed: bool,
779 ) -> Result<AgentResponse, TraceExporterError> {
780 debug!(chunks = chunks, "Trace chunks sent successfully to agent");
781 let send_result = SendResult::success(payload_len, chunks, attempts);
782 self.emit_send_result(&send_result);
783
784 Ok(if payload_version_changed {
785 AgentResponse::Changed { body }
786 } else {
787 AgentResponse::Unchanged
788 })
789 }
790
791 async fn handle_agent_response(
792 &self,
793 chunks: usize,
794 response: http_common::HttpResponse,
795 payload_len: usize,
796 attempts: u32,
797 ) -> Result<AgentResponse, TraceExporterError> {
798 self.info_response_observer.check_response(&response);
800
801 let status = response.status();
802 let payload_version_changed = self.check_payload_version_changed(&response);
803
804 match Self::read_response_body(response).await {
805 Ok(body) => {
806 if !status.is_success() {
807 warn!(
808 status = %status,
809 "Agent returned non-success status for trace send"
810 );
811 let send_result = SendResult::failure(
812 TransportErrorType::Http(status.as_u16()),
813 payload_len,
814 chunks,
815 attempts,
816 );
817 self.emit_send_result(&send_result);
818 return Err(TraceExporterError::Request(RequestError::new(
819 status, &body,
820 )));
821 }
822
823 self.handle_successful_trace_response(
824 chunks,
825 payload_len,
826 attempts,
827 body,
828 payload_version_changed,
829 )
830 }
831 Err(err) => {
832 error!(?err, "Error reading agent response body");
833 let send_result = SendResult::failure(
834 TransportErrorType::ResponseBody,
835 payload_len,
836 chunks,
837 attempts,
838 );
839 self.emit_send_result(&send_result);
840 Err(TraceExporterError::from(err))
841 }
842 }
843 }
844
845 fn get_agent_url(&self) -> Uri {
846 self.output_format.add_path(&self.endpoint.url)
847 }
848
849 #[cfg(test)]
850 pub fn is_stats_worker_active(&self) -> bool {
852 stats::is_stats_worker_active(&self.client_side_stats, &self.workers)
853 }
854}
855
856#[derive(Debug, Default, Clone)]
857pub struct TelemetryConfig {
858 pub heartbeat: u64,
859 pub runtime_id: Option<String>,
860 pub debug_enabled: bool,
861}
862
863#[allow(missing_docs)]
864pub trait ResponseCallback {
865 #[allow(missing_docs)]
866 fn call(&self, response: &str);
867}
868
869#[cfg(test)]
870mod tests {
871 use self::error::AgentErrorKind;
872 use super::*;
873 use httpmock::prelude::*;
874 use httpmock::MockServer;
875 use libdd_tinybytes::BytesString;
876 use libdd_trace_utils::msgpack_encoder;
877 use libdd_trace_utils::span::v04::SpanBytes;
878 use libdd_trace_utils::span::v05;
879 use std::collections::HashMap;
880 use std::net;
881 use std::time::Duration;
882 use tokio::time::sleep;
883
884 const V5_EMPTY: [u8; 4] = [0x92, 0x91, 0xA0, 0x90];
886
887 #[test]
888 fn test_from_tracer_tags_to_tracer_header_tags() {
889 let tracer_tags = TracerMetadata {
890 tracer_version: "v0.1".to_string(),
891 language: "rust".to_string(),
892 language_version: "1.52.1".to_string(),
893 language_interpreter: "rustc".to_string(),
894 language_interpreter_vendor: "rust-lang".to_string(),
895 client_computed_stats: true,
896 client_computed_top_level: true,
897 ..Default::default()
898 };
899
900 let tracer_header_tags: TracerHeaderTags = (&tracer_tags).into();
901
902 assert_eq!(tracer_header_tags.tracer_version, "v0.1");
903 assert_eq!(tracer_header_tags.lang, "rust");
904 assert_eq!(tracer_header_tags.lang_version, "1.52.1");
905 assert_eq!(tracer_header_tags.lang_interpreter, "rustc");
906 assert_eq!(tracer_header_tags.lang_vendor, "rust-lang");
907 assert!(tracer_header_tags.client_computed_stats);
908 assert!(tracer_header_tags.client_computed_top_level);
909 }
910
911 #[test]
912 fn test_from_tracer_tags_to_hashmap() {
913 let tracer_tags = TracerMetadata {
914 tracer_version: "v0.1".to_string(),
915 language: "rust".to_string(),
916 language_version: "1.52.1".to_string(),
917 language_interpreter: "rustc".to_string(),
918 client_computed_stats: true,
919 client_computed_top_level: true,
920 ..Default::default()
921 };
922
923 let hashmap: HashMap<&'static str, String> = (&tracer_tags).into();
924
925 assert_eq!(hashmap.get("datadog-meta-tracer-version").unwrap(), "v0.1");
926 assert_eq!(hashmap.get("datadog-meta-lang").unwrap(), "rust");
927 assert_eq!(hashmap.get("datadog-meta-lang-version").unwrap(), "1.52.1");
928 assert_eq!(
929 hashmap.get("datadog-meta-lang-interpreter").unwrap(),
930 "rustc"
931 );
932 assert!(hashmap.contains_key("datadog-client-computed-stats"));
933 assert!(hashmap.contains_key("datadog-client-computed-top-level"));
934 }
935
936 fn read(socket: &net::UdpSocket) -> String {
937 let mut buf = [0; 1_000];
938 socket.recv(&mut buf).expect("No data");
939 let datagram = String::from_utf8_lossy(buf.as_ref());
940 datagram.trim_matches(char::from(0)).to_string()
941 }
942
943 fn build_test_exporter(
944 url: String,
945 dogstatsd_url: Option<String>,
946 input: TraceExporterInputFormat,
947 output: TraceExporterOutputFormat,
948 enable_telemetry: bool,
949 enable_health_metrics: bool,
950 ) -> TraceExporter {
951 let mut builder = TraceExporterBuilder::default();
952 builder
953 .set_url(&url)
954 .set_service("test")
955 .set_env("staging")
956 .set_tracer_version("v0.1")
957 .set_language("nodejs")
958 .set_language_version("1.0")
959 .set_language_interpreter("v8")
960 .set_input_format(input)
961 .set_output_format(output);
962
963 if enable_health_metrics {
964 builder.enable_health_metrics();
965 }
966
967 if let Some(url) = dogstatsd_url {
968 builder.set_dogstatsd_url(&url);
969 };
970
971 if enable_telemetry {
972 builder.enable_telemetry(TelemetryConfig {
973 heartbeat: 100,
974 ..Default::default()
975 });
976 }
977
978 builder.build().unwrap()
979 }
980
981 #[test]
982 #[cfg_attr(miri, ignore)]
983 fn test_health_metrics() {
984 let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
985 let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
986
987 let fake_agent = MockServer::start();
988 let _mock_traces = fake_agent.mock(|_, then| {
989 then.status(200)
990 .header("content-type", "application/json")
991 .body(r#"{ "rate_by_service": { "service:test,env:staging": 1.0, "service:test,env:prod": 0.3 } }"#);
992 });
993
994 let exporter = build_test_exporter(
995 fake_agent.url("/v0.4/traces"),
996 Some(stats_socket.local_addr().unwrap().to_string()),
997 TraceExporterInputFormat::V04,
998 TraceExporterOutputFormat::V04,
999 false,
1000 true,
1001 );
1002
1003 let traces: Vec<Vec<SpanBytes>> = vec![
1004 vec![SpanBytes {
1005 name: BytesString::from_slice(b"test").unwrap(),
1006 ..Default::default()
1007 }],
1008 vec![SpanBytes {
1009 name: BytesString::from_slice(b"test2").unwrap(),
1010 ..Default::default()
1011 }],
1012 ];
1013 let data = msgpack_encoder::v04::to_vec(&traces);
1014
1015 let _result = exporter.send(data.as_ref()).expect("failed to send trace");
1016
1017 let mut received_metrics = Vec::new();
1019 for _ in 0..5 {
1020 received_metrics.push(read(&stats_socket));
1021 }
1022
1023 let expected_metrics = vec![
1025 format!(
1026 "datadog.tracer.exporter.deserialize.traces:2|c|#libdatadog_version:{}",
1027 env!("CARGO_PKG_VERSION")
1028 ),
1029 format!(
1030 "datadog.tracer.exporter.transport.traces.successful:2|c|#libdatadog_version:{}",
1031 env!("CARGO_PKG_VERSION")
1032 ),
1033 format!(
1034 "datadog.tracer.exporter.transport.sent.bytes:{}|d|#libdatadog_version:{}",
1035 data.len(),
1036 env!("CARGO_PKG_VERSION")
1037 ),
1038 format!(
1039 "datadog.tracer.exporter.transport.traces.sent:2|d|#libdatadog_version:{}",
1040 env!("CARGO_PKG_VERSION")
1041 ),
1042 format!(
1043 "datadog.tracer.exporter.transport.requests:1|d|#libdatadog_version:{}",
1044 env!("CARGO_PKG_VERSION")
1045 ),
1046 ];
1047
1048 for expected in expected_metrics {
1049 assert!(
1050 received_metrics.contains(&expected),
1051 "Expected metric '{expected}' not found in received metrics: {received_metrics:?}"
1052 );
1053 }
1054 }
1055
1056 #[test]
1057 #[cfg_attr(miri, ignore)]
1058 fn test_invalid_traces() {
1059 let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1060 let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1061
1062 let fake_agent = MockServer::start();
1063
1064 let exporter = build_test_exporter(
1065 fake_agent.url("/v0.4/traces"),
1066 Some(stats_socket.local_addr().unwrap().to_string()),
1067 TraceExporterInputFormat::V04,
1068 TraceExporterOutputFormat::V04,
1069 false,
1070 true,
1071 );
1072
1073 let bad_payload = b"some_bad_payload".as_ref();
1074 let result = exporter.send(bad_payload);
1075
1076 assert!(result.is_err());
1077
1078 assert_eq!(
1079 &format!(
1080 "datadog.tracer.exporter.deserialize.errors:1|c|#libdatadog_version:{}",
1081 env!("CARGO_PKG_VERSION")
1082 ),
1083 &read(&stats_socket)
1084 );
1085 }
1086
1087 #[test]
1088 #[cfg_attr(miri, ignore)]
1089 fn test_health_metrics_error() {
1090 let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1091 let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1092
1093 let fake_agent = MockServer::start();
1094 let _mock_traces = fake_agent.mock(|_, then| {
1095 then.status(400)
1096 .header("content-type", "application/json")
1097 .body("{}");
1098 });
1099
1100 let exporter = build_test_exporter(
1101 fake_agent.url("/v0.4/traces"),
1102 Some(stats_socket.local_addr().unwrap().to_string()),
1103 TraceExporterInputFormat::V04,
1104 TraceExporterOutputFormat::V04,
1105 false,
1106 true,
1107 );
1108
1109 let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1110 name: BytesString::from_slice(b"test").unwrap(),
1111 ..Default::default()
1112 }]];
1113 let data = msgpack_encoder::v04::to_vec(&traces);
1114 let result = exporter.send(data.as_ref());
1115
1116 assert!(result.is_err());
1117
1118 let mut metrics = Vec::new();
1120 loop {
1121 let mut buf = [0; 1_000];
1122 match stats_socket.recv(&mut buf) {
1123 Ok(size) => {
1124 let datagram = String::from_utf8_lossy(&buf[..size]);
1125 metrics.push(datagram.to_string());
1126 }
1127 Err(_) => break, }
1129 }
1130
1131 let expected_deser = format!(
1133 "datadog.tracer.exporter.deserialize.traces:1|c|#libdatadog_version:{}",
1134 env!("CARGO_PKG_VERSION")
1135 );
1136 let expected_error = format!(
1137 "datadog.tracer.exporter.transport.traces.failed:1|c|#libdatadog_version:{},type:400",
1138 env!("CARGO_PKG_VERSION")
1139 );
1140 let expected_dropped = format!(
1141 "datadog.tracer.exporter.transport.dropped.bytes:{}|d|#libdatadog_version:{}",
1142 data.len(),
1143 env!("CARGO_PKG_VERSION")
1144 );
1145 let expected_sent_bytes = format!(
1146 "datadog.tracer.exporter.transport.sent.bytes:{}|d|#libdatadog_version:{}",
1147 data.len(),
1148 env!("CARGO_PKG_VERSION")
1149 );
1150 let expected_sent_traces = format!(
1151 "datadog.tracer.exporter.transport.traces.sent:1|d|#libdatadog_version:{}",
1152 env!("CARGO_PKG_VERSION")
1153 );
1154 let expected_dropped_traces = format!(
1155 "datadog.tracer.exporter.transport.traces.dropped:1|d|#libdatadog_version:{}",
1156 env!("CARGO_PKG_VERSION")
1157 );
1158 let expected_requests = format!(
1159 "datadog.tracer.exporter.transport.requests:5|d|#libdatadog_version:{}",
1160 env!("CARGO_PKG_VERSION")
1161 );
1162
1163 assert!(
1165 metrics.contains(&expected_deser),
1166 "Missing deser_traces metric. Got: {metrics:?}"
1167 );
1168 assert!(
1169 metrics.contains(&expected_error),
1170 "Missing send.traces.errors metric. Got: {metrics:?}"
1171 );
1172 assert!(
1173 metrics.contains(&expected_dropped),
1174 "Missing http.dropped.bytes metric. Got: {metrics:?}"
1175 );
1176 assert!(
1177 metrics.contains(&expected_dropped_traces),
1178 "Missing http.dropped.traces metric. Got: {metrics:?}"
1179 );
1180 assert!(
1181 metrics.contains(&expected_sent_bytes),
1182 "Missing http.sent.bytes metric. Got: {metrics:?}"
1183 );
1184 assert!(
1185 metrics.contains(&expected_sent_traces),
1186 "Missing http.sent.traces metric. Got: {metrics:?}"
1187 );
1188 assert!(
1189 metrics.contains(&expected_requests),
1190 "Missing http.requests metric. Got: {metrics:?}"
1191 );
1192 }
1193
1194 #[test]
1195 #[cfg_attr(miri, ignore)]
1196 fn test_health_metrics_dropped_bytes_exclusions() {
1197 let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1198 let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1199
1200 let fake_agent = MockServer::start();
1202 let _mock_traces = fake_agent.mock(|_, then| {
1203 then.status(404)
1204 .header("content-type", "application/json")
1205 .body("{}");
1206 });
1207
1208 let exporter = build_test_exporter(
1209 fake_agent.url("/v0.4/traces"),
1210 Some(stats_socket.local_addr().unwrap().to_string()),
1211 TraceExporterInputFormat::V04,
1212 TraceExporterOutputFormat::V04,
1213 false,
1214 true,
1215 );
1216
1217 let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1218 name: BytesString::from_slice(b"test").unwrap(),
1219 ..Default::default()
1220 }]];
1221 let data = msgpack_encoder::v04::to_vec(&traces);
1222 let result = exporter.send(data.as_ref());
1223
1224 assert!(result.is_err());
1225
1226 let mut received_metrics = Vec::new();
1228 for _ in 0..5 {
1229 received_metrics.push(read(&stats_socket));
1230 }
1231
1232 let expected_deser = format!(
1234 "datadog.tracer.exporter.deserialize.traces:1|c|#libdatadog_version:{}",
1235 env!("CARGO_PKG_VERSION")
1236 );
1237 let expected_error = format!(
1238 "datadog.tracer.exporter.transport.traces.failed:1|c|#libdatadog_version:{},type:404",
1239 env!("CARGO_PKG_VERSION")
1240 );
1241 let expected_sent_bytes = format!(
1242 "datadog.tracer.exporter.transport.sent.bytes:{}|d|#libdatadog_version:{}",
1243 data.len(),
1244 env!("CARGO_PKG_VERSION")
1245 );
1246 let expected_sent_traces = format!(
1247 "datadog.tracer.exporter.transport.traces.sent:1|d|#libdatadog_version:{}",
1248 env!("CARGO_PKG_VERSION")
1249 );
1250 let expected_requests = format!(
1251 "datadog.tracer.exporter.transport.requests:5|d|#libdatadog_version:{}",
1252 env!("CARGO_PKG_VERSION")
1253 );
1254
1255 assert!(
1257 received_metrics.contains(&expected_deser),
1258 "Missing deser_traces metric. Got: {received_metrics:?}"
1259 );
1260 assert!(
1261 received_metrics.contains(&expected_error),
1262 "Missing send.traces.errors metric. Got: {received_metrics:?}"
1263 );
1264 assert!(
1265 received_metrics.contains(&expected_sent_bytes),
1266 "Missing http.sent.bytes metric. Got: {received_metrics:?}"
1267 );
1268 assert!(
1269 received_metrics.contains(&expected_sent_traces),
1270 "Missing http.sent.traces metric. Got: {received_metrics:?}"
1271 );
1272 assert!(
1273 received_metrics.contains(&expected_requests),
1274 "Missing http.requests metric. Got: {received_metrics:?}"
1275 );
1276
1277 let dropped_bytes_metric = format!(
1279 "datadog.tracer.exporter.transport.dropped.bytes:{}|d|#libdatadog_version:{}",
1280 data.len(),
1281 env!("CARGO_PKG_VERSION")
1282 );
1283 assert!(
1284 !received_metrics.contains(&dropped_bytes_metric),
1285 "Should not emit http.dropped.bytes for 404. Got: {received_metrics:?}"
1286 );
1287
1288 let dropped_traces_metric = format!(
1290 "datadog.tracer.exporter.transport.traces.dropped:1|d|#libdatadog_version:{}",
1291 env!("CARGO_PKG_VERSION")
1292 );
1293 assert!(
1294 !received_metrics.contains(&dropped_traces_metric),
1295 "Should not emit http.dropped.traces for 404. Got: {received_metrics:?}"
1296 );
1297 }
1298
1299 #[test]
1300 #[cfg_attr(miri, ignore)]
1301 fn test_health_metrics_disabled() {
1302 let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1303 let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1304
1305 let fake_agent = MockServer::start();
1306 let _mock_traces = fake_agent.mock(|_, then| {
1307 then.status(200)
1308 .header("content-type", "application/json")
1309 .body(r#"{ "rate_by_service": { "service:test,env:staging": 1.0 } }"#);
1310 });
1311
1312 let exporter = build_test_exporter(
1313 fake_agent.url("/v0.4/traces"),
1314 Some(stats_socket.local_addr().unwrap().to_string()),
1315 TraceExporterInputFormat::V04,
1316 TraceExporterOutputFormat::V04,
1317 false,
1318 false, );
1320
1321 let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1322 name: BytesString::from_slice(b"test").unwrap(),
1323 ..Default::default()
1324 }]];
1325 let data = msgpack_encoder::v04::to_vec(&traces);
1326
1327 let _result = exporter.send(data.as_ref()).expect("failed to send trace");
1328
1329 let mut buf = [0; 1_000];
1331 match stats_socket.recv(&mut buf) {
1332 Ok(_) => {
1333 let datagram = String::from_utf8_lossy(buf.as_ref());
1334 let received = datagram.trim_matches(char::from(0)).to_string();
1335 panic!(
1336 "Expected no metrics when health metrics disabled, but received: {received}"
1337 );
1338 }
1339 Err(e)
1340 if e.kind() == std::io::ErrorKind::WouldBlock
1341 || e.kind() == std::io::ErrorKind::TimedOut
1342 || e.kind() == std::io::ErrorKind::Interrupted =>
1343 {
1344 }
1349 Err(e) => panic!("Unexpected error reading from socket: {e}"),
1350 }
1351 }
1352
1353 #[test]
1354 #[cfg_attr(miri, ignore)]
1355 fn test_agent_response_parse_default() {
1356 let server = MockServer::start();
1357 let _agent = server.mock(|_, then| {
1358 then.status(200)
1359 .header("content-type", "application/json")
1360 .body(
1361 r#"{
1362 "rate_by_service": {
1363 "service:foo,env:staging": 1.0,
1364 "service:,env:": 0.8
1365 }
1366 }"#,
1367 );
1368 });
1369
1370 let mut builder = TraceExporterBuilder::default();
1371 builder
1372 .set_url(&server.url("/"))
1373 .set_service("foo")
1374 .set_env("foo-env")
1375 .set_tracer_version("v0.1")
1376 .set_language("nodejs")
1377 .set_language_version("1.0")
1378 .set_language_interpreter("v8");
1379 let exporter = builder.build().unwrap();
1380
1381 let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1382 name: BytesString::from_slice(b"test").unwrap(),
1383 ..Default::default()
1384 }]];
1385 let data = msgpack_encoder::v04::to_vec(&traces);
1386 let result = exporter.send(data.as_ref()).unwrap();
1387
1388 assert_eq!(
1389 result,
1390 AgentResponse::Changed {
1391 body: r#"{
1392 "rate_by_service": {
1393 "service:foo,env:staging": 1.0,
1394 "service:,env:": 0.8
1395 }
1396 }"#
1397 .to_string()
1398 }
1399 );
1400 }
1401
1402 #[test]
1403 #[cfg_attr(miri, ignore)]
1404 fn test_agent_response_error() {
1405 let server = MockServer::start();
1406 let _agent = server.mock(|_, then| {
1407 then.status(500)
1408 .header("content-type", "application/json")
1409 .body(r#"{ "error": "Unavailable" }"#);
1410 });
1411
1412 let mut builder = TraceExporterBuilder::default();
1413 builder
1414 .set_url(&server.url("/"))
1415 .set_service("foo")
1416 .set_env("foo-env")
1417 .set_tracer_version("v0.1")
1418 .set_language("nodejs")
1419 .set_language_version("1.0")
1420 .set_language_interpreter("v8");
1421 let exporter = builder.build().unwrap();
1422
1423 let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1424 name: BytesString::from_slice(b"test").unwrap(),
1425 ..Default::default()
1426 }]];
1427 let data = msgpack_encoder::v04::to_vec(&traces);
1428 let code = match exporter.send(data.as_ref()).unwrap_err() {
1429 TraceExporterError::Request(e) => Some(e.status()),
1430 _ => None,
1431 }
1432 .unwrap();
1433
1434 assert_eq!(code, 500);
1435 }
1436
1437 #[test]
1438 #[cfg_attr(miri, ignore)]
1439 fn test_agent_empty_response_error() {
1440 let server = MockServer::start();
1441 let _agent = server.mock(|_, then| {
1442 then.status(200)
1443 .header("content-type", "application/json")
1444 .body("");
1445 });
1446
1447 let mut builder = TraceExporterBuilder::default();
1448 builder
1449 .set_url(&server.url("/"))
1450 .set_service("foo")
1451 .set_env("foo-env")
1452 .set_tracer_version("v0.1")
1453 .set_language("nodejs")
1454 .set_language_version("1.0")
1455 .set_language_interpreter("v8");
1456 let exporter = builder.build().unwrap();
1457
1458 let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1459 name: BytesString::from_slice(b"test").unwrap(),
1460 ..Default::default()
1461 }]];
1462 let data = msgpack_encoder::v04::to_vec(&traces);
1463 let err = exporter.send(data.as_ref());
1464
1465 assert!(err.is_err());
1466 assert_eq!(
1467 match err.unwrap_err() {
1468 TraceExporterError::Agent(e) => Some(e),
1469 _ => None,
1470 },
1471 Some(AgentErrorKind::EmptyResponse)
1472 );
1473 }
1474
1475 #[test]
1476 #[cfg_attr(miri, ignore)]
1477 fn test_exporter_metrics_v4() {
1478 let server = MockServer::start();
1479 let response_body = r#"{
1480 "rate_by_service": {
1481 "service:foo,env:staging": 1.0,
1482 "service:,env:": 0.8
1483 }
1484 }"#;
1485 let traces_endpoint = server.mock(|when, then| {
1486 when.method(POST).path("/v0.4/traces");
1487 then.status(200)
1488 .header("content-type", "application/json")
1489 .body(response_body);
1490 });
1491
1492 let metrics_endpoint = server.mock(|when, then| {
1493 when.method(POST)
1494 .body_includes("\"metric\":\"trace_api.bytes\"")
1495 .path("/telemetry/proxy/api/v2/apmtelemetry");
1496 then.status(200)
1497 .header("content-type", "application/json")
1498 .body("");
1499 });
1500
1501 let mut builder = TraceExporterBuilder::default();
1502 builder
1503 .set_url(&server.url("/"))
1504 .set_service("foo")
1505 .set_env("foo-env")
1506 .set_tracer_version("v0.1")
1507 .set_language("nodejs")
1508 .set_language_version("1.0")
1509 .set_language_interpreter("v8")
1510 .enable_telemetry(TelemetryConfig {
1511 heartbeat: 100,
1512 ..Default::default()
1513 });
1514 let exporter = builder.build().unwrap();
1515
1516 let traces = vec![0x90];
1517 let result = exporter.send(traces.as_ref()).unwrap();
1518 let AgentResponse::Changed { body } = result else {
1519 panic!("Expected Changed response");
1520 };
1521 assert_eq!(body, response_body);
1522
1523 traces_endpoint.assert_calls(1);
1524 while metrics_endpoint.calls() == 0 {
1525 exporter
1526 .runtime
1527 .lock()
1528 .unwrap()
1529 .as_ref()
1530 .unwrap()
1531 .block_on(async {
1532 sleep(Duration::from_millis(100)).await;
1533 })
1534 }
1535 metrics_endpoint.assert_calls(1);
1536 }
1537
1538 #[test]
1539 #[cfg_attr(miri, ignore)]
1540 fn test_exporter_metrics_v5() {
1541 let server = MockServer::start();
1542 let response_body = r#"{
1543 "rate_by_service": {
1544 "service:foo,env:staging": 1.0,
1545 "service:,env:": 0.8
1546 }
1547 }"#;
1548 let traces_endpoint = server.mock(|when, then| {
1549 when.method(POST).path("/v0.5/traces");
1550 then.status(200)
1551 .header("content-type", "application/json")
1552 .body(response_body);
1553 });
1554
1555 let metrics_endpoint = server.mock(|when, then| {
1556 when.method(POST)
1557 .body_includes("\"metric\":\"trace_api.bytes\"")
1558 .path("/telemetry/proxy/api/v2/apmtelemetry");
1559 then.status(200)
1560 .header("content-type", "application/json")
1561 .body("");
1562 });
1563
1564 let exporter = build_test_exporter(
1565 server.url("/"),
1566 None,
1567 TraceExporterInputFormat::V05,
1568 TraceExporterOutputFormat::V05,
1569 true,
1570 true,
1571 );
1572
1573 let v5: (Vec<BytesString>, Vec<Vec<v05::Span>>) = (vec![], vec![]);
1574 let traces = rmp_serde::to_vec(&v5).unwrap();
1575 let result = exporter.send(traces.as_ref()).unwrap();
1576 let AgentResponse::Changed { body } = result else {
1577 panic!("Expected Changed response");
1578 };
1579 assert_eq!(body, response_body);
1580
1581 traces_endpoint.assert_calls(1);
1582 while metrics_endpoint.calls() == 0 {
1583 exporter
1584 .runtime
1585 .lock()
1586 .unwrap()
1587 .as_ref()
1588 .unwrap()
1589 .block_on(async {
1590 sleep(Duration::from_millis(100)).await;
1591 })
1592 }
1593 metrics_endpoint.assert_calls(1);
1594 }
1595
1596 #[test]
1597 #[cfg_attr(miri, ignore)]
1598 fn test_exporter_metrics_v4_to_v5() {
1599 let server = MockServer::start();
1600 let response_body = r#"{
1601 "rate_by_service": {
1602 "service:foo,env:staging": 1.0,
1603 "service:,env:": 0.8
1604 }
1605 }"#;
1606 let traces_endpoint = server.mock(|when, then| {
1607 when.method(POST).path("/v0.5/traces").is_true(|req| {
1608 let bytes = libdd_tinybytes::Bytes::copy_from_slice(req.body_ref());
1609 bytes.to_vec() == V5_EMPTY
1610 });
1611 then.status(200)
1612 .header("content-type", "application/json")
1613 .body(response_body);
1614 });
1615
1616 let metrics_endpoint = server.mock(|when, then| {
1617 when.method(POST)
1618 .body_includes("\"metric\":\"trace_api.bytes\"")
1619 .path("/telemetry/proxy/api/v2/apmtelemetry");
1620 then.status(200)
1621 .header("content-type", "application/json")
1622 .body("");
1623 });
1624
1625 let mut builder = TraceExporterBuilder::default();
1626 builder
1627 .set_url(&server.url("/"))
1628 .set_service("foo")
1629 .set_env("foo-env")
1630 .set_tracer_version("v0.1")
1631 .set_language("nodejs")
1632 .set_language_version("1.0")
1633 .set_language_interpreter("v8")
1634 .enable_telemetry(TelemetryConfig {
1635 heartbeat: 100,
1636 ..Default::default()
1637 })
1638 .set_input_format(TraceExporterInputFormat::V04)
1639 .set_output_format(TraceExporterOutputFormat::V05);
1640
1641 let exporter = builder.build().unwrap();
1642
1643 let traces = vec![0x90];
1644 let result = exporter.send(traces.as_ref()).unwrap();
1645 let AgentResponse::Changed { body } = result else {
1646 panic!("Expected Changed response");
1647 };
1648 assert_eq!(body, response_body);
1649
1650 traces_endpoint.assert_calls(1);
1651 while metrics_endpoint.calls() == 0 {
1652 exporter
1653 .runtime
1654 .lock()
1655 .unwrap()
1656 .as_ref()
1657 .unwrap()
1658 .block_on(async {
1659 sleep(Duration::from_millis(100)).await;
1660 })
1661 }
1662 metrics_endpoint.assert_calls(1);
1663 }
1664
1665 #[test]
1666 #[cfg_attr(miri, ignore)]
1667 fn test_agent_response_payload_version_disabled() {
1670 let server = MockServer::start();
1671 let response_body = r#"{
1672 "rate_by_service": {
1673 "service:foo,env:staging": 1.0,
1674 "service:,env:": 0.8
1675 }
1676 }"#;
1677 let traces_endpoint = server.mock(|when, then| {
1678 when.method(POST).path("/v0.4/traces");
1679 then.status(200)
1680 .header("content-type", "application/json")
1681 .header("datadog-rates-payload-version", "abc")
1682 .body(response_body);
1683 });
1684
1685 let mut builder = TraceExporterBuilder::default();
1686 builder.set_url(&server.url("/"));
1687 let exporter = builder.build().unwrap();
1688 let traces = vec![0x90];
1689 for _ in 0..2 {
1690 let result = exporter.send(traces.as_ref()).unwrap();
1691 let AgentResponse::Changed { body } = result else {
1692 panic!("Expected Changed response");
1693 };
1694 assert_eq!(body, response_body);
1695 }
1696 traces_endpoint.assert_calls(2);
1697 }
1698
1699 #[test]
1700 #[cfg_attr(miri, ignore)]
1701 fn test_agent_response_payload_version() {
1705 let server = MockServer::start();
1706 let response_body = r#"{
1707 "rate_by_service": {
1708 "service:foo,env:staging": 1.0,
1709 "service:,env:": 0.8
1710 }
1711 }"#;
1712 let mut traces_endpoint = server.mock(|when, then| {
1713 when.method(POST).path("/v0.4/traces");
1714 then.status(200)
1715 .header("content-type", "application/json")
1716 .header("datadog-rates-payload-version", "abc")
1717 .body(response_body);
1718 });
1719
1720 let mut builder = TraceExporterBuilder::default();
1721 builder
1722 .set_url(&server.url("/"))
1723 .enable_agent_rates_payload_version();
1724 let exporter = builder.build().unwrap();
1725 let traces = vec![0x90];
1726 let result = exporter.send(traces.as_ref()).unwrap();
1727 let AgentResponse::Changed { body } = result else {
1728 panic!("Expected Changed response");
1729 };
1730 assert_eq!(body, response_body);
1731
1732 let result = exporter.send(traces.as_ref()).unwrap();
1733 let AgentResponse::Unchanged = result else {
1734 panic!("Expected Unchanged response");
1735 };
1736 traces_endpoint.assert_calls(2);
1737 traces_endpoint.delete();
1738
1739 let traces_endpoint = server.mock(|when, then| {
1740 when.method(POST).path("/v0.4/traces");
1741 then.status(200)
1742 .header("content-type", "application/json")
1743 .header("datadog-rates-payload-version", "def")
1744 .body(response_body);
1745 });
1746 let result = exporter.send(traces.as_ref()).unwrap();
1747 let AgentResponse::Changed { body } = result else {
1748 panic!("Expected Changed response");
1749 };
1750 assert_eq!(body, response_body);
1751
1752 let result = exporter.send(traces.as_ref()).unwrap();
1753 let AgentResponse::Unchanged = result else {
1754 panic!("Expected Unchanged response");
1755 };
1756 traces_endpoint.assert_calls(2);
1757 }
1758
1759 #[test]
1760 #[cfg_attr(miri, ignore)]
1761 fn test_agent_malfunction_info_4xx() {
1762 test_agent_malfunction_info(404, r#"{"error":"Not Found"}"#, Duration::from_secs(0));
1763 }
1764
1765 #[test]
1766 #[cfg_attr(miri, ignore)]
1767 fn test_agent_malfunction_info_5xx() {
1768 test_agent_malfunction_info(
1769 500,
1770 r#"{"error":"Internal Server Error"}"#,
1771 Duration::from_secs(0),
1772 );
1773 }
1774
1775 #[test]
1776 #[cfg_attr(miri, ignore)]
1777 fn test_agent_malfunction_info_timeout() {
1778 test_agent_malfunction_info(
1779 408,
1780 r#"{"error":"Internal Server Error"}"#,
1781 Duration::from_secs(600),
1782 );
1783 }
1784
1785 #[test]
1786 #[cfg_attr(miri, ignore)]
1787 fn test_agent_malfunction_info_wrong_answer() {
1788 test_agent_malfunction_info(200, "WRONG_ANSWER", Duration::from_secs(0));
1789 }
1790
1791 fn test_agent_malfunction_info(status: u16, response: &str, delay: Duration) {
1792 let server = MockServer::start();
1793
1794 let mock_traces = server.mock(|when, then| {
1795 when.method(POST)
1796 .header("Content-type", "application/msgpack")
1797 .path("/v0.4/traces");
1798 then.status(200).body(
1799 r#"{
1800 "rate_by_service": {
1801 "service:test,env:staging": 1.0,
1802 }
1803 }"#,
1804 );
1805 });
1806
1807 let mock_info = server.mock(|when, then| {
1808 when.method(GET).path("/info");
1809 then.delay(delay).status(status).body(response);
1810 });
1811
1812 let mut builder = TraceExporterBuilder::default();
1813 builder
1814 .set_url(&server.url("/"))
1815 .set_service("test")
1816 .set_env("staging")
1817 .set_tracer_version("v0.1")
1818 .set_language("nodejs")
1819 .set_language_version("1.0")
1820 .set_language_interpreter("v8")
1821 .set_input_format(TraceExporterInputFormat::V04)
1822 .set_output_format(TraceExporterOutputFormat::V04)
1823 .enable_stats(Duration::from_secs(10));
1824 let exporter = builder.build().unwrap();
1825
1826 let trace_chunk = vec![SpanBytes {
1827 duration: 10,
1828 ..Default::default()
1829 }];
1830
1831 let data = msgpack_encoder::v04::to_vec(&[trace_chunk]);
1832
1833 while mock_info.calls() == 0 {
1835 exporter
1836 .runtime
1837 .lock()
1838 .unwrap()
1839 .as_ref()
1840 .unwrap()
1841 .block_on(async {
1842 sleep(Duration::from_millis(100)).await;
1843 })
1844 }
1845
1846 let _ = exporter.send(data.as_ref()).unwrap();
1847
1848 exporter.shutdown(None).unwrap();
1849
1850 mock_traces.assert();
1851 }
1852
1853 #[test]
1854 #[cfg_attr(miri, ignore)]
1855 fn test_connection_timeout() {
1856 let exporter = TraceExporterBuilder::default().build().unwrap();
1857
1858 assert_eq!(exporter.endpoint.timeout_ms, Endpoint::default().timeout_ms);
1859
1860 let timeout = Some(42);
1861 let mut builder = TraceExporterBuilder::default();
1862 builder.set_connection_timeout(timeout);
1863
1864 let exporter = builder.build().unwrap();
1865
1866 assert_eq!(exporter.endpoint.timeout_ms, 42);
1867 }
1868
1869 #[test]
1870 #[cfg_attr(miri, ignore)]
1871 fn stop_and_start_runtime() {
1872 let builder = TraceExporterBuilder::default();
1873 let exporter = builder.build().unwrap();
1874 exporter.stop_worker();
1875 exporter.run_worker().unwrap();
1876 }
1877}
1878
1879#[cfg(test)]
1880mod single_threaded_tests {
1881 use super::*;
1882 use crate::agent_info;
1883 use httpmock::prelude::*;
1884 use libdd_trace_utils::msgpack_encoder;
1885 use libdd_trace_utils::span::v04::SpanBytes;
1886 use std::time::Duration;
1887 use tokio::time::sleep;
1888
1889 #[cfg_attr(miri, ignore)]
1890 #[test]
1891 fn test_shutdown() {
1892 agent_info::clear_cache_for_test();
1894
1895 let server = MockServer::start();
1896
1897 let mock_traces = server.mock(|when, then| {
1898 when.method(POST)
1899 .header("Content-type", "application/msgpack")
1900 .path("/v0.4/traces");
1901 then.status(200).body("");
1902 });
1903
1904 let mock_stats = server.mock(|when, then| {
1905 when.method(POST)
1906 .header("Content-type", "application/msgpack")
1907 .path("/v0.6/stats");
1908 then.status(200).body("");
1909 });
1910
1911 let _mock_info = server.mock(|when, then| {
1912 when.method(GET).path("/info");
1913 then.status(200)
1914 .header("content-type", "application/json")
1915 .header("datadog-agent-state", "1")
1916 .body(r#"{"version":"1","client_drop_p0s":true,"endpoints":["/v0.4/traces","/v0.6/stats"]}"#);
1917 });
1918
1919 let mut builder = TraceExporterBuilder::default();
1920 builder
1921 .set_url(&server.url("/"))
1922 .set_service("test")
1923 .set_env("staging")
1924 .set_tracer_version("v0.1")
1925 .set_language("nodejs")
1926 .set_language_version("1.0")
1927 .set_language_interpreter("v8")
1928 .set_input_format(TraceExporterInputFormat::V04)
1929 .set_output_format(TraceExporterOutputFormat::V04)
1930 .enable_stats(Duration::from_secs(10));
1931 let exporter = builder.build().unwrap();
1932
1933 let trace_chunk = vec![SpanBytes {
1934 duration: 10,
1935 ..Default::default()
1936 }];
1937
1938 let data = msgpack_encoder::v04::to_vec(&[trace_chunk]);
1939
1940 while agent_info::get_agent_info().is_none() {
1942 exporter
1943 .runtime
1944 .lock()
1945 .unwrap()
1946 .as_ref()
1947 .unwrap()
1948 .block_on(async {
1949 sleep(Duration::from_millis(100)).await;
1950 })
1951 }
1952
1953 let result = exporter.send(data.as_ref());
1954 assert!(result.is_err());
1956
1957 let start_time = std::time::Instant::now();
1960 while !exporter.is_stats_worker_active() {
1961 if start_time.elapsed() > Duration::from_secs(10) {
1962 panic!("Timeout waiting for stats worker to become active");
1963 }
1964 std::thread::sleep(Duration::from_millis(10));
1965 }
1966
1967 exporter.shutdown(None).unwrap();
1968
1969 for _ in 0..1000 {
1971 if mock_traces.calls() > 0 && mock_stats.calls() > 0 {
1972 break;
1973 } else {
1974 std::thread::sleep(Duration::from_millis(10));
1975 }
1976 }
1977
1978 mock_traces.assert();
1979 mock_stats.assert();
1980 }
1981
1982 #[cfg_attr(miri, ignore)]
1983 #[test]
1984 fn test_shutdown_with_timeout() {
1985 agent_info::clear_cache_for_test();
1987
1988 let server = MockServer::start();
1989
1990 let mock_traces = server.mock(|when, then| {
1991 when.method(POST)
1992 .header("Content-type", "application/msgpack")
1993 .path("/v0.4/traces");
1994 then.status(200).body(
1995 r#"{
1996 "rate_by_service": {
1997 "service:foo,env:staging": 1.0,
1998 "service:,env:": 0.8
1999 }
2000 }"#,
2001 );
2002 });
2003
2004 let _mock_stats = server.mock(|when, then| {
2005 when.method(POST)
2006 .header("Content-type", "application/msgpack")
2007 .path("/v0.6/stats");
2008 then.delay(Duration::from_secs(10)).status(200).body("");
2009 });
2010
2011 let _mock_info = server.mock(|when, then| {
2012 when.method(GET).path("/info");
2013 then.status(200)
2014 .header("content-type", "application/json")
2015 .header("datadog-agent-state", "1")
2016 .body(r#"{"version":"1","client_drop_p0s":true,"endpoints":["/v0.4/traces","/v0.6/stats"]}"#);
2017 });
2018
2019 let mut builder = TraceExporterBuilder::default();
2020 builder
2021 .set_url(&server.url("/"))
2022 .set_service("test")
2023 .set_env("staging")
2024 .set_tracer_version("v0.1")
2025 .set_language("nodejs")
2026 .set_language_version("1.0")
2027 .set_language_interpreter("v8")
2028 .set_input_format(TraceExporterInputFormat::V04)
2029 .set_output_format(TraceExporterOutputFormat::V04)
2030 .enable_stats(Duration::from_secs(10));
2031 let exporter = builder.build().unwrap();
2032
2033 let trace_chunk = vec![SpanBytes {
2034 service: "test".into(),
2035 name: "test".into(),
2036 resource: "test".into(),
2037 r#type: "test".into(),
2038 duration: 10,
2039 ..Default::default()
2040 }];
2041
2042 let data = msgpack_encoder::v04::to_vec(&[trace_chunk]);
2043
2044 while agent_info::get_agent_info().is_none() {
2047 exporter
2048 .runtime
2049 .lock()
2050 .unwrap()
2051 .as_ref()
2052 .unwrap()
2053 .block_on(async {
2054 sleep(Duration::from_millis(100)).await;
2055 })
2056 }
2057
2058 exporter.send(data.as_ref()).unwrap();
2059
2060 let start_time = std::time::Instant::now();
2063 while !exporter.is_stats_worker_active() {
2064 if start_time.elapsed() > Duration::from_secs(10) {
2065 panic!("Timeout waiting for stats worker to become active");
2066 }
2067 std::thread::sleep(Duration::from_millis(10));
2068 }
2069
2070 exporter
2071 .shutdown(Some(Duration::from_millis(5)))
2072 .unwrap_err(); mock_traces.assert();
2075 }
2076}