1pub mod agent_response;
4pub mod builder;
5pub mod error;
6pub mod metrics;
7pub mod stats;
8mod trace_serializer;
9
10pub use builder::TraceExporterBuilder;
12
13use self::agent_response::AgentResponse;
14use self::metrics::MetricsEmitter;
15use self::stats::StatsComputationStatus;
16use self::trace_serializer::TraceSerializer;
17use crate::agent_info::{AgentInfoFetcher, ResponseObserver};
18use crate::pausable_worker::PausableWorker;
19use crate::stats_exporter::StatsExporter;
20use crate::telemetry::{SendPayloadTelemetry, TelemetryClient};
21use crate::trace_exporter::agent_response::{
22 AgentResponsePayloadVersion, DATADOG_RATES_PAYLOAD_VERSION_HEADER,
23};
24use crate::trace_exporter::error::{InternalErrorKind, RequestError, TraceExporterError};
25use crate::{
26 agent_info::{self, schema::AgentInfo},
27 health_metrics,
28 health_metrics::{HealthMetric, SendResult, TransportErrorType},
29};
30use arc_swap::{ArcSwap, ArcSwapOption};
31use http::uri::PathAndQuery;
32use http::Uri;
33use http_body_util::BodyExt;
34use libdd_common::tag::Tag;
35use libdd_common::{http_common, Endpoint};
36use libdd_common::{HttpClient, MutexExt};
37use libdd_dogstatsd_client::Client;
38use libdd_telemetry::worker::TelemetryWorker;
39use libdd_trace_utils::msgpack_decoder;
40use libdd_trace_utils::send_with_retry::{
41 send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult,
42};
43use libdd_trace_utils::span::{v04::Span, TraceData};
44use libdd_trace_utils::trace_utils::TracerHeaderTags;
45use std::io;
46use std::sync::{Arc, Mutex};
47use std::time::Duration;
48use std::{borrow::Borrow, collections::HashMap, str::FromStr};
49use tokio::runtime::Runtime;
50use tracing::{debug, error, warn};
51
52const INFO_ENDPOINT: &str = "/info";
53
54#[derive(Copy, Clone, Debug, Default, PartialEq)]
57#[repr(C)]
58pub enum TraceExporterInputFormat {
59 #[allow(missing_docs)]
60 #[default]
61 V04,
62 V05,
63}
64
65#[derive(Copy, Clone, Debug, Default, PartialEq)]
68#[repr(C)]
69pub enum TraceExporterOutputFormat {
70 #[allow(missing_docs)]
71 #[default]
72 V04,
73 V05,
74}
75
76impl TraceExporterOutputFormat {
77 fn add_path(&self, url: &Uri) -> Uri {
79 add_path(
80 url,
81 match self {
82 TraceExporterOutputFormat::V04 => "/v0.4/traces",
83 TraceExporterOutputFormat::V05 => "/v0.5/traces",
84 },
85 )
86 }
87}
88
89fn add_path(url: &Uri, path: &str) -> Uri {
96 let p_and_q = url.path_and_query();
97
98 #[allow(clippy::unwrap_used)]
99 let new_p_and_q = match p_and_q {
100 Some(pq) => {
101 let p = pq.path();
102 let mut p = p.strip_suffix('/').unwrap_or(p).to_owned();
103 p.push_str(path);
104
105 PathAndQuery::from_str(p.as_str())
106 }
107 None => PathAndQuery::from_str(path),
108 }
109 .unwrap();
111 let mut parts = url.clone().into_parts();
112 parts.path_and_query = Some(new_p_and_q);
113 #[allow(clippy::unwrap_used)]
115 Uri::from_parts(parts).unwrap()
116}
117
118#[derive(Clone, Default, Debug)]
119pub struct TracerMetadata {
120 pub hostname: String,
121 pub env: String,
122 pub app_version: String,
123 pub runtime_id: String,
124 pub service: String,
125 pub tracer_version: String,
126 pub language: String,
127 pub language_version: String,
128 pub language_interpreter: String,
129 pub language_interpreter_vendor: String,
130 pub git_commit_sha: String,
131 pub client_computed_stats: bool,
132 pub client_computed_top_level: bool,
133}
134
135impl<'a> From<&'a TracerMetadata> for TracerHeaderTags<'a> {
136 fn from(tags: &'a TracerMetadata) -> TracerHeaderTags<'a> {
137 TracerHeaderTags::<'_> {
138 lang: &tags.language,
139 lang_version: &tags.language_version,
140 tracer_version: &tags.tracer_version,
141 lang_interpreter: &tags.language_interpreter,
142 lang_vendor: &tags.language_interpreter_vendor,
143 client_computed_stats: tags.client_computed_stats,
144 client_computed_top_level: tags.client_computed_top_level,
145 ..Default::default()
146 }
147 }
148}
149
150impl<'a> From<&'a TracerMetadata> for HashMap<&'static str, String> {
151 fn from(tags: &'a TracerMetadata) -> HashMap<&'static str, String> {
152 TracerHeaderTags::from(tags).into()
153 }
154}
155
156#[derive(Debug)]
157pub(crate) struct TraceExporterWorkers {
158 pub info: PausableWorker<AgentInfoFetcher>,
159 pub stats: Option<PausableWorker<StatsExporter>>,
160 pub telemetry: Option<PausableWorker<TelemetryWorker>>,
161}
162
163#[allow(missing_docs)]
182enum DeserInputFormat {
183 V04,
184 V05,
185}
186
187#[derive(Debug)]
188pub struct TraceExporter {
189 endpoint: Endpoint,
190 metadata: TracerMetadata,
191 input_format: TraceExporterInputFormat,
192 output_format: TraceExporterOutputFormat,
193 runtime: Arc<Mutex<Option<Arc<Runtime>>>>,
195 dogstatsd: Option<Client>,
197 common_stats_tags: Vec<Tag>,
198 client_computed_top_level: bool,
199 client_side_stats: ArcSwap<StatsComputationStatus>,
200 previous_info_state: ArcSwapOption<String>,
201 info_response_observer: ResponseObserver,
202 telemetry: Option<TelemetryClient>,
203 health_metrics_enabled: bool,
204 workers: Arc<Mutex<TraceExporterWorkers>>,
205 agent_payload_response_version: Option<AgentResponsePayloadVersion>,
206 http_client: HttpClient,
207}
208
209impl TraceExporter {
210 #[allow(missing_docs)]
211 pub fn builder() -> TraceExporterBuilder {
212 TraceExporterBuilder::default()
213 }
214
215 fn runtime(&self) -> Result<Arc<Runtime>, TraceExporterError> {
217 let mut runtime_guard = self.runtime.lock_or_panic();
218 match runtime_guard.as_ref() {
219 Some(runtime) => {
220 Ok(runtime.clone())
222 }
223 None => {
224 let runtime = Arc::new(
226 tokio::runtime::Builder::new_multi_thread()
227 .worker_threads(1)
228 .enable_all()
229 .build()?,
230 );
231 *runtime_guard = Some(runtime.clone());
232 self.start_all_workers(&runtime)?;
233 Ok(runtime)
234 }
235 }
236 }
237
238 pub fn run_worker(&self) -> Result<(), TraceExporterError> {
240 self.runtime()?;
241 Ok(())
242 }
243
244 fn start_all_workers(&self, runtime: &Arc<Runtime>) -> Result<(), TraceExporterError> {
246 let mut workers = self.workers.lock_or_panic();
247
248 self.start_info_worker(&mut workers, runtime)?;
249 self.start_stats_worker(&mut workers, runtime)?;
250 self.start_telemetry_worker(&mut workers, runtime)?;
251
252 Ok(())
253 }
254
255 fn start_info_worker(
257 &self,
258 workers: &mut TraceExporterWorkers,
259 runtime: &Arc<Runtime>,
260 ) -> Result<(), TraceExporterError> {
261 workers.info.start(runtime).map_err(|e| {
262 TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string()))
263 })
264 }
265
266 fn start_stats_worker(
268 &self,
269 workers: &mut TraceExporterWorkers,
270 runtime: &Arc<Runtime>,
271 ) -> Result<(), TraceExporterError> {
272 if let Some(stats_worker) = &mut workers.stats {
273 stats_worker.start(runtime).map_err(|e| {
274 TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string()))
275 })?;
276 }
277 Ok(())
278 }
279
280 fn start_telemetry_worker(
282 &self,
283 workers: &mut TraceExporterWorkers,
284 runtime: &Arc<Runtime>,
285 ) -> Result<(), TraceExporterError> {
286 if let Some(telemetry_worker) = &mut workers.telemetry {
287 telemetry_worker.start(runtime).map_err(|e| {
288 TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(e.to_string()))
289 })?;
290 if let Some(client) = &self.telemetry {
291 runtime.block_on(client.start());
292 }
293 }
294 Ok(())
295 }
296
297 pub fn stop_worker(&self) {
298 let runtime = self.runtime.lock_or_panic().take();
299 if let Some(ref rt) = runtime {
300 let mut workers = self.workers.lock_or_panic();
302 rt.block_on(async {
303 let _ = workers.info.pause().await;
304 if let Some(stats_worker) = &mut workers.stats {
305 let _ = stats_worker.pause().await;
306 };
307 if let Some(telemetry_worker) = &mut workers.telemetry {
308 let _ = telemetry_worker.pause().await;
309 };
310 });
311 }
312 if let PausableWorker::Paused { worker } = &mut self.workers.lock_or_panic().info {
318 self.info_response_observer.manual_trigger();
319 worker.drain();
320 }
321 drop(runtime);
322 }
323
324 pub fn send(&self, data: &[u8]) -> Result<AgentResponse, TraceExporterError> {
335 self.check_agent_info();
336
337 let res = match self.input_format {
338 TraceExporterInputFormat::V04 => self.send_deser(data, DeserInputFormat::V04),
339 TraceExporterInputFormat::V05 => self.send_deser(data, DeserInputFormat::V05),
340 }?;
341 if matches!(&res, AgentResponse::Changed { body } if body.is_empty()) {
342 return Err(TraceExporterError::Agent(
343 error::AgentErrorKind::EmptyResponse,
344 ));
345 }
346
347 Ok(res)
348 }
349
350 pub fn shutdown(mut self, timeout: Option<Duration>) -> Result<(), TraceExporterError> {
352 let runtime = tokio::runtime::Builder::new_current_thread()
353 .enable_all()
354 .build()?;
355
356 if let Some(timeout) = timeout {
357 match runtime
358 .block_on(async { tokio::time::timeout(timeout, self.shutdown_async()).await })
359 {
360 Ok(()) => Ok(()),
361 Err(_e) => Err(TraceExporterError::Shutdown(
362 error::ShutdownError::TimedOut(timeout),
363 )),
364 }
365 } else {
366 runtime.block_on(self.shutdown_async());
367 Ok(())
368 }
369 }
370
371 async fn shutdown_async(&mut self) {
376 let stats_status = self.client_side_stats.load();
377 if let StatsComputationStatus::Enabled {
378 cancellation_token, ..
379 } = stats_status.as_ref()
380 {
381 cancellation_token.cancel();
382
383 let stats_worker = self.workers.lock_or_panic().stats.take();
384
385 if let Some(stats_worker) = stats_worker {
386 let _ = stats_worker.join().await;
387 }
388 }
389 if let Some(telemetry) = self.telemetry.take() {
390 telemetry.shutdown().await;
391 let telemetry_worker = self.workers.lock_or_panic().telemetry.take();
392
393 if let Some(telemetry_worker) = telemetry_worker {
394 let _ = telemetry_worker.join().await;
395 }
396 }
397 }
398
399 fn has_agent_info_state_changed(&self, agent_info: &Arc<AgentInfo>) -> bool {
401 Some(agent_info.state_hash.as_str())
402 != self
403 .previous_info_state
404 .load()
405 .as_deref()
406 .map(|s| s.as_str())
407 }
408
409 fn check_agent_info(&self) {
410 if let Some(agent_info) = agent_info::get_agent_info() {
411 if self.has_agent_info_state_changed(&agent_info) {
412 match &**self.client_side_stats.load() {
413 StatsComputationStatus::Disabled => {}
414 StatsComputationStatus::DisabledByAgent { .. } => {
415 let ctx = stats::StatsContext {
416 metadata: &self.metadata,
417 endpoint_url: &self.endpoint.url,
418 runtime: &self.runtime,
419 };
420 stats::handle_stats_disabled_by_agent(
421 &ctx,
422 &agent_info,
423 &self.client_side_stats,
424 &self.workers,
425 self.http_client.clone(),
426 );
427 }
428 StatsComputationStatus::Enabled {
429 stats_concentrator, ..
430 } => {
431 let ctx = stats::StatsContext {
432 metadata: &self.metadata,
433 endpoint_url: &self.endpoint.url,
434 runtime: &self.runtime,
435 };
436 stats::handle_stats_enabled(
437 &ctx,
438 &agent_info,
439 stats_concentrator,
440 &self.client_side_stats,
441 &self.workers,
442 );
443 }
444 }
445 self.previous_info_state
446 .store(Some(agent_info.state_hash.clone().into()))
447 }
448 }
449 }
450
451 #[cfg(feature = "test-utils")]
467 pub fn wait_agent_info_ready(&self, timeout: Duration) -> anyhow::Result<()> {
468 let start = std::time::Instant::now();
469 loop {
470 if std::time::Instant::now().duration_since(start) > timeout {
471 anyhow::bail!("Timeout waiting for agent info to be ready",);
472 }
473 if agent_info::get_agent_info().is_some() {
474 return Ok(());
475 }
476 std::thread::sleep(Duration::from_millis(10));
477 }
478 }
479
480 fn emit_metric(&self, metric: HealthMetric, custom_tags: Option<Vec<&Tag>>) {
482 if self.health_metrics_enabled {
483 let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags);
484 emitter.emit(metric, custom_tags);
485 }
486 }
487
488 fn emit_send_result(&self, result: &SendResult) {
490 if self.health_metrics_enabled {
491 let emitter = MetricsEmitter::new(self.dogstatsd.as_ref(), &self.common_stats_tags);
492 emitter.emit_from_send_result(result);
493 }
494 }
495
496 pub fn send_trace_chunks<T: TraceData>(
505 &self,
506 trace_chunks: Vec<Vec<Span<T>>>,
507 ) -> Result<AgentResponse, TraceExporterError> {
508 self.check_agent_info();
509 self.runtime()?
510 .block_on(async { self.send_trace_chunks_inner(trace_chunks).await })
511 }
512
513 pub async fn send_trace_chunks_async<T: TraceData>(
522 &self,
523 trace_chunks: Vec<Vec<Span<T>>>,
524 ) -> Result<AgentResponse, TraceExporterError> {
525 self.check_agent_info();
526 self.send_trace_chunks_inner(trace_chunks).await
527 }
528
529 fn send_deser(
531 &self,
532 data: &[u8],
533 format: DeserInputFormat,
534 ) -> Result<AgentResponse, TraceExporterError> {
535 let (traces, _) = match format {
536 DeserInputFormat::V04 => msgpack_decoder::v04::from_slice(data),
537 DeserInputFormat::V05 => msgpack_decoder::v05::from_slice(data),
538 }
539 .map_err(|e| {
540 error!("Error deserializing trace from request body: {e}");
541 self.emit_metric(
542 HealthMetric::Count(health_metrics::DESERIALIZE_TRACES_ERRORS, 1),
543 None,
544 );
545 TraceExporterError::Deserialization(e)
546 })?;
547 debug!(
548 trace_count = traces.len(),
549 "Trace deserialization completed successfully"
550 );
551 self.emit_metric(
552 HealthMetric::Count(health_metrics::DESERIALIZE_TRACES, traces.len() as i64),
553 None,
554 );
555
556 self.runtime()?
557 .block_on(async { self.send_trace_chunks_inner(traces).await })
558 }
559
560 async fn send_traces_with_telemetry(
562 &self,
563 endpoint: &Endpoint,
564 mp_payload: Vec<u8>,
565 headers: HashMap<&'static str, String>,
566 chunks: usize,
567 chunks_dropped_p0: usize,
568 ) -> Result<AgentResponse, TraceExporterError> {
569 let strategy = RetryStrategy::default();
570 let payload_len = mp_payload.len();
571
572 let result =
574 send_with_retry(&self.http_client, endpoint, mp_payload, &headers, &strategy).await;
575
576 if let Some(telemetry) = &self.telemetry {
578 if let Err(e) = telemetry.send(&SendPayloadTelemetry::from_retry_result(
579 &result,
580 payload_len as u64,
581 chunks as u64,
582 chunks_dropped_p0 as u64,
583 )) {
584 error!(?e, "Error sending telemetry");
585 }
586 }
587
588 self.handle_send_result(result, chunks, payload_len).await
589 }
590
591 async fn send_trace_chunks_inner<T: TraceData>(
592 &self,
593 mut traces: Vec<Vec<Span<T>>>,
594 ) -> Result<AgentResponse, TraceExporterError> {
595 let mut header_tags: TracerHeaderTags = self.metadata.borrow().into();
596
597 let dropped_p0_stats = stats::process_traces_for_stats(
599 &mut traces,
600 &mut header_tags,
601 &self.client_side_stats,
602 self.client_computed_top_level,
603 );
604
605 let serializer = TraceSerializer::new(
606 self.output_format,
607 self.agent_payload_response_version.as_ref(),
608 );
609 let prepared = match serializer.prepare_traces_payload(traces, header_tags) {
610 Ok(p) => p,
611 Err(e) => {
612 error!("Error serializing traces: {e}");
613 self.emit_metric(
614 HealthMetric::Count(health_metrics::SERIALIZE_TRACES_ERRORS, 1),
615 None,
616 );
617 return Err(e);
618 }
619 };
620
621 let endpoint = Endpoint {
622 url: self.get_agent_url(),
623 ..self.endpoint.clone()
624 };
625
626 self.send_traces_with_telemetry(
627 &endpoint,
628 prepared.data,
629 prepared.headers,
630 prepared.chunk_count,
631 dropped_p0_stats.dropped_p0_traces,
632 )
633 .await
634 }
635
636 async fn handle_send_result(
638 &self,
639 result: SendWithRetryResult,
640 chunks: usize,
641 payload_len: usize,
642 ) -> Result<AgentResponse, TraceExporterError> {
643 match result {
644 Ok((response, attempts)) => {
645 self.handle_agent_response(chunks, response, payload_len, attempts)
646 .await
647 }
648 Err(err) => self.handle_send_error(err, payload_len, chunks).await,
649 }
650 }
651
652 async fn handle_send_error(
654 &self,
655 err: SendWithRetryError,
656 payload_len: usize,
657 chunks: usize,
658 ) -> Result<AgentResponse, TraceExporterError> {
659 error!(?err, "Error sending traces");
660
661 match err {
662 SendWithRetryError::Http(response, attempts) => {
663 self.handle_http_send_error(response, payload_len, chunks, attempts)
664 .await
665 }
666 SendWithRetryError::Timeout(attempts) => {
667 let send_result =
668 SendResult::failure(TransportErrorType::Timeout, payload_len, chunks, attempts);
669 self.emit_send_result(&send_result);
670 Err(TraceExporterError::from(io::Error::from(
671 io::ErrorKind::TimedOut,
672 )))
673 }
674 SendWithRetryError::Network(err, attempts) => {
675 let send_result =
676 SendResult::failure(TransportErrorType::Network, payload_len, chunks, attempts);
677 self.emit_send_result(&send_result);
678 Err(TraceExporterError::from(err))
679 }
680 SendWithRetryError::Build(attempts) => {
681 let send_result =
682 SendResult::failure(TransportErrorType::Build, payload_len, chunks, attempts);
683 self.emit_send_result(&send_result);
684 Err(TraceExporterError::from(io::Error::from(
685 io::ErrorKind::Other,
686 )))
687 }
688 }
689 }
690
691 async fn handle_http_send_error(
693 &self,
694 response: http_common::HttpResponse,
695 payload_len: usize,
696 chunks: usize,
697 attempts: u32,
698 ) -> Result<AgentResponse, TraceExporterError> {
699 let status = response.status();
700
701 self.info_response_observer.check_response(&response);
703
704 let send_result = SendResult::failure(
706 TransportErrorType::Http(status.as_u16()),
707 payload_len,
708 chunks,
709 attempts,
710 );
711 self.emit_send_result(&send_result);
712
713 let body = self.read_error_response_body(response).await?;
714 Err(TraceExporterError::Request(RequestError::new(
715 status,
716 &String::from_utf8_lossy(&body),
717 )))
718 }
719
720 async fn read_error_response_body(
722 &self,
723 response: http_common::HttpResponse,
724 ) -> Result<bytes::Bytes, TraceExporterError> {
725 match response.into_body().collect().await {
726 Ok(body) => Ok(body.to_bytes()),
727 Err(err) => {
728 error!(?err, "Error reading agent response body");
729 Err(TraceExporterError::from(err))
730 }
731 }
732 }
733
734 fn check_payload_version_changed(&self, response: &http_common::HttpResponse) -> bool {
736 let status = response.status();
737 match (
738 status.is_success(),
739 self.agent_payload_response_version.as_ref(),
740 response.headers().get(DATADOG_RATES_PAYLOAD_VERSION_HEADER),
741 ) {
742 (false, _, _) => {
743 false
745 }
746 (true, None, _) => {
747 true
750 }
751 (true, Some(agent_payload_response_version), Some(new_payload_version)) => {
752 if let Ok(new_payload_version_str) = new_payload_version.to_str() {
753 agent_payload_response_version.check_and_update(new_payload_version_str)
754 } else {
755 false
756 }
757 }
758 _ => false,
759 }
760 }
761
762 async fn read_response_body(
764 response: http_common::HttpResponse,
765 ) -> Result<String, http_common::Error> {
766 let body = http_common::collect_response_bytes(response).await?;
767 Ok(String::from_utf8_lossy(&body).to_string())
768 }
769
770 fn handle_successful_trace_response(
772 &self,
773 chunks: usize,
774 payload_len: usize,
775 attempts: u32,
776 body: String,
777 payload_version_changed: bool,
778 ) -> Result<AgentResponse, TraceExporterError> {
779 debug!(chunks = chunks, "Trace chunks sent successfully to agent");
780 let send_result = SendResult::success(payload_len, chunks, attempts);
781 self.emit_send_result(&send_result);
782
783 Ok(if payload_version_changed {
784 AgentResponse::Changed { body }
785 } else {
786 AgentResponse::Unchanged
787 })
788 }
789
790 async fn handle_agent_response(
791 &self,
792 chunks: usize,
793 response: http_common::HttpResponse,
794 payload_len: usize,
795 attempts: u32,
796 ) -> Result<AgentResponse, TraceExporterError> {
797 self.info_response_observer.check_response(&response);
799
800 let status = response.status();
801 let payload_version_changed = self.check_payload_version_changed(&response);
802
803 match Self::read_response_body(response).await {
804 Ok(body) => {
805 if !status.is_success() {
806 warn!(
807 status = %status,
808 "Agent returned non-success status for trace send"
809 );
810 let send_result = SendResult::failure(
811 TransportErrorType::Http(status.as_u16()),
812 payload_len,
813 chunks,
814 attempts,
815 );
816 self.emit_send_result(&send_result);
817 return Err(TraceExporterError::Request(RequestError::new(
818 status, &body,
819 )));
820 }
821
822 self.handle_successful_trace_response(
823 chunks,
824 payload_len,
825 attempts,
826 body,
827 payload_version_changed,
828 )
829 }
830 Err(err) => {
831 error!(?err, "Error reading agent response body");
832 let send_result = SendResult::failure(
833 TransportErrorType::ResponseBody,
834 payload_len,
835 chunks,
836 attempts,
837 );
838 self.emit_send_result(&send_result);
839 Err(TraceExporterError::from(err))
840 }
841 }
842 }
843
844 fn get_agent_url(&self) -> Uri {
845 self.output_format.add_path(&self.endpoint.url)
846 }
847
848 #[cfg(test)]
849 pub fn is_stats_worker_active(&self) -> bool {
851 stats::is_stats_worker_active(&self.client_side_stats, &self.workers)
852 }
853}
854
855#[derive(Debug, Default, Clone)]
856pub struct TelemetryConfig {
857 pub heartbeat: u64,
858 pub runtime_id: Option<String>,
859 pub debug_enabled: bool,
860}
861
862#[allow(missing_docs)]
863pub trait ResponseCallback {
864 #[allow(missing_docs)]
865 fn call(&self, response: &str);
866}
867
868#[cfg(test)]
869mod tests {
870 use self::error::AgentErrorKind;
871 use super::*;
872 use httpmock::prelude::*;
873 use httpmock::MockServer;
874 use libdd_tinybytes::BytesString;
875 use libdd_trace_utils::msgpack_encoder;
876 use libdd_trace_utils::span::v04::SpanBytes;
877 use libdd_trace_utils::span::v05;
878 use std::collections::HashMap;
879 use std::net;
880 use std::time::Duration;
881 use tokio::time::sleep;
882
883 const V5_EMPTY: [u8; 4] = [0x92, 0x91, 0xA0, 0x90];
885
886 #[test]
887 fn test_from_tracer_tags_to_tracer_header_tags() {
888 let tracer_tags = TracerMetadata {
889 tracer_version: "v0.1".to_string(),
890 language: "rust".to_string(),
891 language_version: "1.52.1".to_string(),
892 language_interpreter: "rustc".to_string(),
893 language_interpreter_vendor: "rust-lang".to_string(),
894 client_computed_stats: true,
895 client_computed_top_level: true,
896 ..Default::default()
897 };
898
899 let tracer_header_tags: TracerHeaderTags = (&tracer_tags).into();
900
901 assert_eq!(tracer_header_tags.tracer_version, "v0.1");
902 assert_eq!(tracer_header_tags.lang, "rust");
903 assert_eq!(tracer_header_tags.lang_version, "1.52.1");
904 assert_eq!(tracer_header_tags.lang_interpreter, "rustc");
905 assert_eq!(tracer_header_tags.lang_vendor, "rust-lang");
906 assert!(tracer_header_tags.client_computed_stats);
907 assert!(tracer_header_tags.client_computed_top_level);
908 }
909
910 #[test]
911 fn test_from_tracer_tags_to_hashmap() {
912 let tracer_tags = TracerMetadata {
913 tracer_version: "v0.1".to_string(),
914 language: "rust".to_string(),
915 language_version: "1.52.1".to_string(),
916 language_interpreter: "rustc".to_string(),
917 client_computed_stats: true,
918 client_computed_top_level: true,
919 ..Default::default()
920 };
921
922 let hashmap: HashMap<&'static str, String> = (&tracer_tags).into();
923
924 assert_eq!(hashmap.get("datadog-meta-tracer-version").unwrap(), "v0.1");
925 assert_eq!(hashmap.get("datadog-meta-lang").unwrap(), "rust");
926 assert_eq!(hashmap.get("datadog-meta-lang-version").unwrap(), "1.52.1");
927 assert_eq!(
928 hashmap.get("datadog-meta-lang-interpreter").unwrap(),
929 "rustc"
930 );
931 assert!(hashmap.contains_key("datadog-client-computed-stats"));
932 assert!(hashmap.contains_key("datadog-client-computed-top-level"));
933 }
934
935 fn read(socket: &net::UdpSocket) -> String {
936 let mut buf = [0; 1_000];
937 socket.recv(&mut buf).expect("No data");
938 let datagram = String::from_utf8_lossy(buf.as_ref());
939 datagram.trim_matches(char::from(0)).to_string()
940 }
941
942 fn build_test_exporter(
943 url: String,
944 dogstatsd_url: Option<String>,
945 input: TraceExporterInputFormat,
946 output: TraceExporterOutputFormat,
947 enable_telemetry: bool,
948 enable_health_metrics: bool,
949 ) -> TraceExporter {
950 let mut builder = TraceExporterBuilder::default();
951 builder
952 .set_url(&url)
953 .set_service("test")
954 .set_env("staging")
955 .set_tracer_version("v0.1")
956 .set_language("nodejs")
957 .set_language_version("1.0")
958 .set_language_interpreter("v8")
959 .set_input_format(input)
960 .set_output_format(output);
961
962 if enable_health_metrics {
963 builder.enable_health_metrics();
964 }
965
966 if let Some(url) = dogstatsd_url {
967 builder.set_dogstatsd_url(&url);
968 };
969
970 if enable_telemetry {
971 builder.enable_telemetry(TelemetryConfig {
972 heartbeat: 100,
973 ..Default::default()
974 });
975 }
976
977 builder.build().unwrap()
978 }
979
980 #[test]
981 #[cfg_attr(miri, ignore)]
982 fn test_health_metrics() {
983 let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
984 let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
985
986 let fake_agent = MockServer::start();
987 let _mock_traces = fake_agent.mock(|_, then| {
988 then.status(200)
989 .header("content-type", "application/json")
990 .body(r#"{ "rate_by_service": { "service:test,env:staging": 1.0, "service:test,env:prod": 0.3 } }"#);
991 });
992
993 let exporter = build_test_exporter(
994 fake_agent.url("/v0.4/traces"),
995 Some(stats_socket.local_addr().unwrap().to_string()),
996 TraceExporterInputFormat::V04,
997 TraceExporterOutputFormat::V04,
998 false,
999 true,
1000 );
1001
1002 let traces: Vec<Vec<SpanBytes>> = vec![
1003 vec![SpanBytes {
1004 name: BytesString::from_slice(b"test").unwrap(),
1005 ..Default::default()
1006 }],
1007 vec![SpanBytes {
1008 name: BytesString::from_slice(b"test2").unwrap(),
1009 ..Default::default()
1010 }],
1011 ];
1012 let data = msgpack_encoder::v04::to_vec(&traces);
1013
1014 let _result = exporter.send(data.as_ref()).expect("failed to send trace");
1015
1016 let mut received_metrics = Vec::new();
1018 for _ in 0..5 {
1019 received_metrics.push(read(&stats_socket));
1020 }
1021
1022 let expected_metrics = vec![
1024 format!(
1025 "datadog.tracer.exporter.deserialize.traces:2|c|#libdatadog_version:{}",
1026 env!("CARGO_PKG_VERSION")
1027 ),
1028 format!(
1029 "datadog.tracer.exporter.transport.traces.successful:2|c|#libdatadog_version:{}",
1030 env!("CARGO_PKG_VERSION")
1031 ),
1032 format!(
1033 "datadog.tracer.exporter.transport.sent.bytes:{}|d|#libdatadog_version:{}",
1034 data.len(),
1035 env!("CARGO_PKG_VERSION")
1036 ),
1037 format!(
1038 "datadog.tracer.exporter.transport.traces.sent:2|d|#libdatadog_version:{}",
1039 env!("CARGO_PKG_VERSION")
1040 ),
1041 format!(
1042 "datadog.tracer.exporter.transport.requests:1|d|#libdatadog_version:{}",
1043 env!("CARGO_PKG_VERSION")
1044 ),
1045 ];
1046
1047 for expected in expected_metrics {
1048 assert!(
1049 received_metrics.contains(&expected),
1050 "Expected metric '{expected}' not found in received metrics: {received_metrics:?}"
1051 );
1052 }
1053 }
1054
1055 #[test]
1056 #[cfg_attr(miri, ignore)]
1057 fn test_invalid_traces() {
1058 let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1059 let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1060
1061 let fake_agent = MockServer::start();
1062
1063 let exporter = build_test_exporter(
1064 fake_agent.url("/v0.4/traces"),
1065 Some(stats_socket.local_addr().unwrap().to_string()),
1066 TraceExporterInputFormat::V04,
1067 TraceExporterOutputFormat::V04,
1068 false,
1069 true,
1070 );
1071
1072 let bad_payload = b"some_bad_payload".as_ref();
1073 let result = exporter.send(bad_payload);
1074
1075 assert!(result.is_err());
1076
1077 assert_eq!(
1078 &format!(
1079 "datadog.tracer.exporter.deserialize.errors:1|c|#libdatadog_version:{}",
1080 env!("CARGO_PKG_VERSION")
1081 ),
1082 &read(&stats_socket)
1083 );
1084 }
1085
1086 #[test]
1087 #[cfg_attr(miri, ignore)]
1088 fn test_health_metrics_error() {
1089 let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1090 let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1091
1092 let fake_agent = MockServer::start();
1093 let _mock_traces = fake_agent.mock(|_, then| {
1094 then.status(400)
1095 .header("content-type", "application/json")
1096 .body("{}");
1097 });
1098
1099 let exporter = build_test_exporter(
1100 fake_agent.url("/v0.4/traces"),
1101 Some(stats_socket.local_addr().unwrap().to_string()),
1102 TraceExporterInputFormat::V04,
1103 TraceExporterOutputFormat::V04,
1104 false,
1105 true,
1106 );
1107
1108 let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1109 name: BytesString::from_slice(b"test").unwrap(),
1110 ..Default::default()
1111 }]];
1112 let data = msgpack_encoder::v04::to_vec(&traces);
1113 let result = exporter.send(data.as_ref());
1114
1115 assert!(result.is_err());
1116
1117 let mut metrics = Vec::new();
1119 loop {
1120 let mut buf = [0; 1_000];
1121 match stats_socket.recv(&mut buf) {
1122 Ok(size) => {
1123 let datagram = String::from_utf8_lossy(&buf[..size]);
1124 metrics.push(datagram.to_string());
1125 }
1126 Err(_) => break, }
1128 }
1129
1130 let expected_deser = format!(
1132 "datadog.tracer.exporter.deserialize.traces:1|c|#libdatadog_version:{}",
1133 env!("CARGO_PKG_VERSION")
1134 );
1135 let expected_error = format!(
1136 "datadog.tracer.exporter.transport.traces.failed:1|c|#libdatadog_version:{},type:400",
1137 env!("CARGO_PKG_VERSION")
1138 );
1139 let expected_dropped = format!(
1140 "datadog.tracer.exporter.transport.dropped.bytes:{}|d|#libdatadog_version:{}",
1141 data.len(),
1142 env!("CARGO_PKG_VERSION")
1143 );
1144 let expected_sent_bytes = format!(
1145 "datadog.tracer.exporter.transport.sent.bytes:{}|d|#libdatadog_version:{}",
1146 data.len(),
1147 env!("CARGO_PKG_VERSION")
1148 );
1149 let expected_sent_traces = format!(
1150 "datadog.tracer.exporter.transport.traces.sent:1|d|#libdatadog_version:{}",
1151 env!("CARGO_PKG_VERSION")
1152 );
1153 let expected_dropped_traces = format!(
1154 "datadog.tracer.exporter.transport.traces.dropped:1|d|#libdatadog_version:{}",
1155 env!("CARGO_PKG_VERSION")
1156 );
1157 let expected_requests = format!(
1158 "datadog.tracer.exporter.transport.requests:5|d|#libdatadog_version:{}",
1159 env!("CARGO_PKG_VERSION")
1160 );
1161
1162 assert!(
1164 metrics.contains(&expected_deser),
1165 "Missing deser_traces metric. Got: {metrics:?}"
1166 );
1167 assert!(
1168 metrics.contains(&expected_error),
1169 "Missing send.traces.errors metric. Got: {metrics:?}"
1170 );
1171 assert!(
1172 metrics.contains(&expected_dropped),
1173 "Missing http.dropped.bytes metric. Got: {metrics:?}"
1174 );
1175 assert!(
1176 metrics.contains(&expected_dropped_traces),
1177 "Missing http.dropped.traces metric. Got: {metrics:?}"
1178 );
1179 assert!(
1180 metrics.contains(&expected_sent_bytes),
1181 "Missing http.sent.bytes metric. Got: {metrics:?}"
1182 );
1183 assert!(
1184 metrics.contains(&expected_sent_traces),
1185 "Missing http.sent.traces metric. Got: {metrics:?}"
1186 );
1187 assert!(
1188 metrics.contains(&expected_requests),
1189 "Missing http.requests metric. Got: {metrics:?}"
1190 );
1191 }
1192
1193 #[test]
1194 #[cfg_attr(miri, ignore)]
1195 fn test_health_metrics_dropped_bytes_exclusions() {
1196 let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1197 let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1198
1199 let fake_agent = MockServer::start();
1201 let _mock_traces = fake_agent.mock(|_, then| {
1202 then.status(404)
1203 .header("content-type", "application/json")
1204 .body("{}");
1205 });
1206
1207 let exporter = build_test_exporter(
1208 fake_agent.url("/v0.4/traces"),
1209 Some(stats_socket.local_addr().unwrap().to_string()),
1210 TraceExporterInputFormat::V04,
1211 TraceExporterOutputFormat::V04,
1212 false,
1213 true,
1214 );
1215
1216 let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1217 name: BytesString::from_slice(b"test").unwrap(),
1218 ..Default::default()
1219 }]];
1220 let data = msgpack_encoder::v04::to_vec(&traces);
1221 let result = exporter.send(data.as_ref());
1222
1223 assert!(result.is_err());
1224
1225 let mut received_metrics = Vec::new();
1227 for _ in 0..5 {
1228 received_metrics.push(read(&stats_socket));
1229 }
1230
1231 let expected_deser = format!(
1233 "datadog.tracer.exporter.deserialize.traces:1|c|#libdatadog_version:{}",
1234 env!("CARGO_PKG_VERSION")
1235 );
1236 let expected_error = format!(
1237 "datadog.tracer.exporter.transport.traces.failed:1|c|#libdatadog_version:{},type:404",
1238 env!("CARGO_PKG_VERSION")
1239 );
1240 let expected_sent_bytes = format!(
1241 "datadog.tracer.exporter.transport.sent.bytes:{}|d|#libdatadog_version:{}",
1242 data.len(),
1243 env!("CARGO_PKG_VERSION")
1244 );
1245 let expected_sent_traces = format!(
1246 "datadog.tracer.exporter.transport.traces.sent:1|d|#libdatadog_version:{}",
1247 env!("CARGO_PKG_VERSION")
1248 );
1249 let expected_requests = format!(
1250 "datadog.tracer.exporter.transport.requests:5|d|#libdatadog_version:{}",
1251 env!("CARGO_PKG_VERSION")
1252 );
1253
1254 assert!(
1256 received_metrics.contains(&expected_deser),
1257 "Missing deser_traces metric. Got: {received_metrics:?}"
1258 );
1259 assert!(
1260 received_metrics.contains(&expected_error),
1261 "Missing send.traces.errors metric. Got: {received_metrics:?}"
1262 );
1263 assert!(
1264 received_metrics.contains(&expected_sent_bytes),
1265 "Missing http.sent.bytes metric. Got: {received_metrics:?}"
1266 );
1267 assert!(
1268 received_metrics.contains(&expected_sent_traces),
1269 "Missing http.sent.traces metric. Got: {received_metrics:?}"
1270 );
1271 assert!(
1272 received_metrics.contains(&expected_requests),
1273 "Missing http.requests metric. Got: {received_metrics:?}"
1274 );
1275
1276 let dropped_bytes_metric = format!(
1278 "datadog.tracer.exporter.transport.dropped.bytes:{}|d|#libdatadog_version:{}",
1279 data.len(),
1280 env!("CARGO_PKG_VERSION")
1281 );
1282 assert!(
1283 !received_metrics.contains(&dropped_bytes_metric),
1284 "Should not emit http.dropped.bytes for 404. Got: {received_metrics:?}"
1285 );
1286
1287 let dropped_traces_metric = format!(
1289 "datadog.tracer.exporter.transport.traces.dropped:1|d|#libdatadog_version:{}",
1290 env!("CARGO_PKG_VERSION")
1291 );
1292 assert!(
1293 !received_metrics.contains(&dropped_traces_metric),
1294 "Should not emit http.dropped.traces for 404. Got: {received_metrics:?}"
1295 );
1296 }
1297
1298 #[test]
1299 #[cfg_attr(miri, ignore)]
1300 fn test_health_metrics_disabled() {
1301 let stats_socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
1302 let _ = stats_socket.set_read_timeout(Some(Duration::from_millis(500)));
1303
1304 let fake_agent = MockServer::start();
1305 let _mock_traces = fake_agent.mock(|_, then| {
1306 then.status(200)
1307 .header("content-type", "application/json")
1308 .body(r#"{ "rate_by_service": { "service:test,env:staging": 1.0 } }"#);
1309 });
1310
1311 let exporter = build_test_exporter(
1312 fake_agent.url("/v0.4/traces"),
1313 Some(stats_socket.local_addr().unwrap().to_string()),
1314 TraceExporterInputFormat::V04,
1315 TraceExporterOutputFormat::V04,
1316 false,
1317 false, );
1319
1320 let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1321 name: BytesString::from_slice(b"test").unwrap(),
1322 ..Default::default()
1323 }]];
1324 let data = msgpack_encoder::v04::to_vec(&traces);
1325
1326 let _result = exporter.send(data.as_ref()).expect("failed to send trace");
1327
1328 let mut buf = [0; 1_000];
1330 match stats_socket.recv(&mut buf) {
1331 Ok(_) => {
1332 let datagram = String::from_utf8_lossy(buf.as_ref());
1333 let received = datagram.trim_matches(char::from(0)).to_string();
1334 panic!(
1335 "Expected no metrics when health metrics disabled, but received: {received}"
1336 );
1337 }
1338 Err(e)
1339 if e.kind() == std::io::ErrorKind::WouldBlock
1340 || e.kind() == std::io::ErrorKind::TimedOut
1341 || e.kind() == std::io::ErrorKind::Interrupted =>
1342 {
1343 }
1348 Err(e) => panic!("Unexpected error reading from socket: {e}"),
1349 }
1350 }
1351
1352 #[test]
1353 #[cfg_attr(miri, ignore)]
1354 fn test_agent_response_parse_default() {
1355 let server = MockServer::start();
1356 let _agent = server.mock(|_, then| {
1357 then.status(200)
1358 .header("content-type", "application/json")
1359 .body(
1360 r#"{
1361 "rate_by_service": {
1362 "service:foo,env:staging": 1.0,
1363 "service:,env:": 0.8
1364 }
1365 }"#,
1366 );
1367 });
1368
1369 let mut builder = TraceExporterBuilder::default();
1370 builder
1371 .set_url(&server.url("/"))
1372 .set_service("foo")
1373 .set_env("foo-env")
1374 .set_tracer_version("v0.1")
1375 .set_language("nodejs")
1376 .set_language_version("1.0")
1377 .set_language_interpreter("v8");
1378 let exporter = builder.build().unwrap();
1379
1380 let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1381 name: BytesString::from_slice(b"test").unwrap(),
1382 ..Default::default()
1383 }]];
1384 let data = msgpack_encoder::v04::to_vec(&traces);
1385 let result = exporter.send(data.as_ref()).unwrap();
1386
1387 assert_eq!(
1388 result,
1389 AgentResponse::Changed {
1390 body: r#"{
1391 "rate_by_service": {
1392 "service:foo,env:staging": 1.0,
1393 "service:,env:": 0.8
1394 }
1395 }"#
1396 .to_string()
1397 }
1398 );
1399 }
1400
1401 #[test]
1402 #[cfg_attr(miri, ignore)]
1403 fn test_agent_response_error() {
1404 let server = MockServer::start();
1405 let _agent = server.mock(|_, then| {
1406 then.status(500)
1407 .header("content-type", "application/json")
1408 .body(r#"{ "error": "Unavailable" }"#);
1409 });
1410
1411 let mut builder = TraceExporterBuilder::default();
1412 builder
1413 .set_url(&server.url("/"))
1414 .set_service("foo")
1415 .set_env("foo-env")
1416 .set_tracer_version("v0.1")
1417 .set_language("nodejs")
1418 .set_language_version("1.0")
1419 .set_language_interpreter("v8");
1420 let exporter = builder.build().unwrap();
1421
1422 let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1423 name: BytesString::from_slice(b"test").unwrap(),
1424 ..Default::default()
1425 }]];
1426 let data = msgpack_encoder::v04::to_vec(&traces);
1427 let code = match exporter.send(data.as_ref()).unwrap_err() {
1428 TraceExporterError::Request(e) => Some(e.status()),
1429 _ => None,
1430 }
1431 .unwrap();
1432
1433 assert_eq!(code, 500);
1434 }
1435
1436 #[test]
1437 #[cfg_attr(miri, ignore)]
1438 fn test_agent_empty_response_error() {
1439 let server = MockServer::start();
1440 let _agent = server.mock(|_, then| {
1441 then.status(200)
1442 .header("content-type", "application/json")
1443 .body("");
1444 });
1445
1446 let mut builder = TraceExporterBuilder::default();
1447 builder
1448 .set_url(&server.url("/"))
1449 .set_service("foo")
1450 .set_env("foo-env")
1451 .set_tracer_version("v0.1")
1452 .set_language("nodejs")
1453 .set_language_version("1.0")
1454 .set_language_interpreter("v8");
1455 let exporter = builder.build().unwrap();
1456
1457 let traces: Vec<Vec<SpanBytes>> = vec![vec![SpanBytes {
1458 name: BytesString::from_slice(b"test").unwrap(),
1459 ..Default::default()
1460 }]];
1461 let data = msgpack_encoder::v04::to_vec(&traces);
1462 let err = exporter.send(data.as_ref());
1463
1464 assert!(err.is_err());
1465 assert_eq!(
1466 match err.unwrap_err() {
1467 TraceExporterError::Agent(e) => Some(e),
1468 _ => None,
1469 },
1470 Some(AgentErrorKind::EmptyResponse)
1471 );
1472 }
1473
1474 #[test]
1475 #[cfg_attr(miri, ignore)]
1476 fn test_exporter_metrics_v4() {
1477 let server = MockServer::start();
1478 let response_body = r#"{
1479 "rate_by_service": {
1480 "service:foo,env:staging": 1.0,
1481 "service:,env:": 0.8
1482 }
1483 }"#;
1484 let traces_endpoint = server.mock(|when, then| {
1485 when.method(POST).path("/v0.4/traces");
1486 then.status(200)
1487 .header("content-type", "application/json")
1488 .body(response_body);
1489 });
1490
1491 let metrics_endpoint = server.mock(|when, then| {
1492 when.method(POST)
1493 .body_includes("\"metric\":\"trace_api.bytes\"")
1494 .path("/telemetry/proxy/api/v2/apmtelemetry");
1495 then.status(200)
1496 .header("content-type", "application/json")
1497 .body("");
1498 });
1499
1500 let mut builder = TraceExporterBuilder::default();
1501 builder
1502 .set_url(&server.url("/"))
1503 .set_service("foo")
1504 .set_env("foo-env")
1505 .set_tracer_version("v0.1")
1506 .set_language("nodejs")
1507 .set_language_version("1.0")
1508 .set_language_interpreter("v8")
1509 .enable_telemetry(TelemetryConfig {
1510 heartbeat: 100,
1511 ..Default::default()
1512 });
1513 let exporter = builder.build().unwrap();
1514
1515 let traces = vec![0x90];
1516 let result = exporter.send(traces.as_ref()).unwrap();
1517 let AgentResponse::Changed { body } = result else {
1518 panic!("Expected Changed response");
1519 };
1520 assert_eq!(body, response_body);
1521
1522 traces_endpoint.assert_calls(1);
1523 while metrics_endpoint.calls() == 0 {
1524 exporter
1525 .runtime
1526 .lock()
1527 .unwrap()
1528 .as_ref()
1529 .unwrap()
1530 .block_on(async {
1531 sleep(Duration::from_millis(100)).await;
1532 })
1533 }
1534 metrics_endpoint.assert_calls(1);
1535 }
1536
1537 #[test]
1538 #[cfg_attr(miri, ignore)]
1539 fn test_exporter_metrics_v5() {
1540 let server = MockServer::start();
1541 let response_body = r#"{
1542 "rate_by_service": {
1543 "service:foo,env:staging": 1.0,
1544 "service:,env:": 0.8
1545 }
1546 }"#;
1547 let traces_endpoint = server.mock(|when, then| {
1548 when.method(POST).path("/v0.5/traces");
1549 then.status(200)
1550 .header("content-type", "application/json")
1551 .body(response_body);
1552 });
1553
1554 let metrics_endpoint = server.mock(|when, then| {
1555 when.method(POST)
1556 .body_includes("\"metric\":\"trace_api.bytes\"")
1557 .path("/telemetry/proxy/api/v2/apmtelemetry");
1558 then.status(200)
1559 .header("content-type", "application/json")
1560 .body("");
1561 });
1562
1563 let exporter = build_test_exporter(
1564 server.url("/"),
1565 None,
1566 TraceExporterInputFormat::V05,
1567 TraceExporterOutputFormat::V05,
1568 true,
1569 true,
1570 );
1571
1572 let v5: (Vec<BytesString>, Vec<Vec<v05::Span>>) = (vec![], vec![]);
1573 let traces = rmp_serde::to_vec(&v5).unwrap();
1574 let result = exporter.send(traces.as_ref()).unwrap();
1575 let AgentResponse::Changed { body } = result else {
1576 panic!("Expected Changed response");
1577 };
1578 assert_eq!(body, response_body);
1579
1580 traces_endpoint.assert_calls(1);
1581 while metrics_endpoint.calls() == 0 {
1582 exporter
1583 .runtime
1584 .lock()
1585 .unwrap()
1586 .as_ref()
1587 .unwrap()
1588 .block_on(async {
1589 sleep(Duration::from_millis(100)).await;
1590 })
1591 }
1592 metrics_endpoint.assert_calls(1);
1593 }
1594
1595 #[test]
1596 #[cfg_attr(miri, ignore)]
1597 fn test_exporter_metrics_v4_to_v5() {
1598 let server = MockServer::start();
1599 let response_body = r#"{
1600 "rate_by_service": {
1601 "service:foo,env:staging": 1.0,
1602 "service:,env:": 0.8
1603 }
1604 }"#;
1605 let traces_endpoint = server.mock(|when, then| {
1606 when.method(POST).path("/v0.5/traces").is_true(|req| {
1607 let bytes = libdd_tinybytes::Bytes::copy_from_slice(req.body_ref());
1608 bytes.to_vec() == V5_EMPTY
1609 });
1610 then.status(200)
1611 .header("content-type", "application/json")
1612 .body(response_body);
1613 });
1614
1615 let metrics_endpoint = server.mock(|when, then| {
1616 when.method(POST)
1617 .body_includes("\"metric\":\"trace_api.bytes\"")
1618 .path("/telemetry/proxy/api/v2/apmtelemetry");
1619 then.status(200)
1620 .header("content-type", "application/json")
1621 .body("");
1622 });
1623
1624 let mut builder = TraceExporterBuilder::default();
1625 builder
1626 .set_url(&server.url("/"))
1627 .set_service("foo")
1628 .set_env("foo-env")
1629 .set_tracer_version("v0.1")
1630 .set_language("nodejs")
1631 .set_language_version("1.0")
1632 .set_language_interpreter("v8")
1633 .enable_telemetry(TelemetryConfig {
1634 heartbeat: 100,
1635 ..Default::default()
1636 })
1637 .set_input_format(TraceExporterInputFormat::V04)
1638 .set_output_format(TraceExporterOutputFormat::V05);
1639
1640 let exporter = builder.build().unwrap();
1641
1642 let traces = vec![0x90];
1643 let result = exporter.send(traces.as_ref()).unwrap();
1644 let AgentResponse::Changed { body } = result else {
1645 panic!("Expected Changed response");
1646 };
1647 assert_eq!(body, response_body);
1648
1649 traces_endpoint.assert_calls(1);
1650 while metrics_endpoint.calls() == 0 {
1651 exporter
1652 .runtime
1653 .lock()
1654 .unwrap()
1655 .as_ref()
1656 .unwrap()
1657 .block_on(async {
1658 sleep(Duration::from_millis(100)).await;
1659 })
1660 }
1661 metrics_endpoint.assert_calls(1);
1662 }
1663
1664 #[test]
1665 #[cfg_attr(miri, ignore)]
1666 fn test_agent_response_payload_version_disabled() {
1669 let server = MockServer::start();
1670 let response_body = r#"{
1671 "rate_by_service": {
1672 "service:foo,env:staging": 1.0,
1673 "service:,env:": 0.8
1674 }
1675 }"#;
1676 let traces_endpoint = server.mock(|when, then| {
1677 when.method(POST).path("/v0.4/traces");
1678 then.status(200)
1679 .header("content-type", "application/json")
1680 .header("datadog-rates-payload-version", "abc")
1681 .body(response_body);
1682 });
1683
1684 let mut builder = TraceExporterBuilder::default();
1685 builder.set_url(&server.url("/"));
1686 let exporter = builder.build().unwrap();
1687 let traces = vec![0x90];
1688 for _ in 0..2 {
1689 let result = exporter.send(traces.as_ref()).unwrap();
1690 let AgentResponse::Changed { body } = result else {
1691 panic!("Expected Changed response");
1692 };
1693 assert_eq!(body, response_body);
1694 }
1695 traces_endpoint.assert_calls(2);
1696 }
1697
1698 #[test]
1699 #[cfg_attr(miri, ignore)]
1700 fn test_agent_response_payload_version() {
1704 let server = MockServer::start();
1705 let response_body = r#"{
1706 "rate_by_service": {
1707 "service:foo,env:staging": 1.0,
1708 "service:,env:": 0.8
1709 }
1710 }"#;
1711 let mut traces_endpoint = server.mock(|when, then| {
1712 when.method(POST).path("/v0.4/traces");
1713 then.status(200)
1714 .header("content-type", "application/json")
1715 .header("datadog-rates-payload-version", "abc")
1716 .body(response_body);
1717 });
1718
1719 let mut builder = TraceExporterBuilder::default();
1720 builder
1721 .set_url(&server.url("/"))
1722 .enable_agent_rates_payload_version();
1723 let exporter = builder.build().unwrap();
1724 let traces = vec![0x90];
1725 let result = exporter.send(traces.as_ref()).unwrap();
1726 let AgentResponse::Changed { body } = result else {
1727 panic!("Expected Changed response");
1728 };
1729 assert_eq!(body, response_body);
1730
1731 let result = exporter.send(traces.as_ref()).unwrap();
1732 let AgentResponse::Unchanged = result else {
1733 panic!("Expected Unchanged response");
1734 };
1735 traces_endpoint.assert_calls(2);
1736 traces_endpoint.delete();
1737
1738 let traces_endpoint = server.mock(|when, then| {
1739 when.method(POST).path("/v0.4/traces");
1740 then.status(200)
1741 .header("content-type", "application/json")
1742 .header("datadog-rates-payload-version", "def")
1743 .body(response_body);
1744 });
1745 let result = exporter.send(traces.as_ref()).unwrap();
1746 let AgentResponse::Changed { body } = result else {
1747 panic!("Expected Changed response");
1748 };
1749 assert_eq!(body, response_body);
1750
1751 let result = exporter.send(traces.as_ref()).unwrap();
1752 let AgentResponse::Unchanged = result else {
1753 panic!("Expected Unchanged response");
1754 };
1755 traces_endpoint.assert_calls(2);
1756 }
1757
1758 #[test]
1759 #[cfg_attr(miri, ignore)]
1760 fn test_agent_malfunction_info_4xx() {
1761 test_agent_malfunction_info(404, r#"{"error":"Not Found"}"#, Duration::from_secs(0));
1762 }
1763
1764 #[test]
1765 #[cfg_attr(miri, ignore)]
1766 fn test_agent_malfunction_info_5xx() {
1767 test_agent_malfunction_info(
1768 500,
1769 r#"{"error":"Internal Server Error"}"#,
1770 Duration::from_secs(0),
1771 );
1772 }
1773
1774 #[test]
1775 #[cfg_attr(miri, ignore)]
1776 fn test_agent_malfunction_info_timeout() {
1777 test_agent_malfunction_info(
1778 408,
1779 r#"{"error":"Internal Server Error"}"#,
1780 Duration::from_secs(600),
1781 );
1782 }
1783
1784 #[test]
1785 #[cfg_attr(miri, ignore)]
1786 fn test_agent_malfunction_info_wrong_answer() {
1787 test_agent_malfunction_info(200, "WRONG_ANSWER", Duration::from_secs(0));
1788 }
1789
1790 fn test_agent_malfunction_info(status: u16, response: &str, delay: Duration) {
1791 let server = MockServer::start();
1792
1793 let mock_traces = server.mock(|when, then| {
1794 when.method(POST)
1795 .header("Content-type", "application/msgpack")
1796 .path("/v0.4/traces");
1797 then.status(200).body(
1798 r#"{
1799 "rate_by_service": {
1800 "service:test,env:staging": 1.0,
1801 }
1802 }"#,
1803 );
1804 });
1805
1806 let mock_info = server.mock(|when, then| {
1807 when.method(GET).path("/info");
1808 then.delay(delay).status(status).body(response);
1809 });
1810
1811 let mut builder = TraceExporterBuilder::default();
1812 builder
1813 .set_url(&server.url("/"))
1814 .set_service("test")
1815 .set_env("staging")
1816 .set_tracer_version("v0.1")
1817 .set_language("nodejs")
1818 .set_language_version("1.0")
1819 .set_language_interpreter("v8")
1820 .set_input_format(TraceExporterInputFormat::V04)
1821 .set_output_format(TraceExporterOutputFormat::V04)
1822 .enable_stats(Duration::from_secs(10));
1823 let exporter = builder.build().unwrap();
1824
1825 let trace_chunk = vec![SpanBytes {
1826 duration: 10,
1827 ..Default::default()
1828 }];
1829
1830 let data = msgpack_encoder::v04::to_vec(&[trace_chunk]);
1831
1832 while mock_info.calls() == 0 {
1834 exporter
1835 .runtime
1836 .lock()
1837 .unwrap()
1838 .as_ref()
1839 .unwrap()
1840 .block_on(async {
1841 sleep(Duration::from_millis(100)).await;
1842 })
1843 }
1844
1845 let _ = exporter.send(data.as_ref()).unwrap();
1846
1847 exporter.shutdown(None).unwrap();
1848
1849 mock_traces.assert();
1850 }
1851
1852 #[test]
1853 #[cfg_attr(miri, ignore)]
1854 fn test_connection_timeout() {
1855 let exporter = TraceExporterBuilder::default().build().unwrap();
1856
1857 assert_eq!(exporter.endpoint.timeout_ms, Endpoint::default().timeout_ms);
1858
1859 let timeout = Some(42);
1860 let mut builder = TraceExporterBuilder::default();
1861 builder.set_connection_timeout(timeout);
1862
1863 let exporter = builder.build().unwrap();
1864
1865 assert_eq!(exporter.endpoint.timeout_ms, 42);
1866 }
1867
1868 #[test]
1869 #[cfg_attr(miri, ignore)]
1870 fn stop_and_start_runtime() {
1871 let builder = TraceExporterBuilder::default();
1872 let exporter = builder.build().unwrap();
1873 exporter.stop_worker();
1874 exporter.run_worker().unwrap();
1875 }
1876}
1877
1878#[cfg(test)]
1879mod single_threaded_tests {
1880 use super::*;
1881 use crate::agent_info;
1882 use httpmock::prelude::*;
1883 use libdd_trace_utils::msgpack_encoder;
1884 use libdd_trace_utils::span::v04::SpanBytes;
1885 use std::time::Duration;
1886 use tokio::time::sleep;
1887
1888 #[cfg_attr(miri, ignore)]
1889 #[test]
1890 fn test_shutdown() {
1891 agent_info::clear_cache_for_test();
1893
1894 let server = MockServer::start();
1895
1896 let mock_traces = server.mock(|when, then| {
1897 when.method(POST)
1898 .header("Content-type", "application/msgpack")
1899 .path("/v0.4/traces");
1900 then.status(200).body("");
1901 });
1902
1903 let mock_stats = server.mock(|when, then| {
1904 when.method(POST)
1905 .header("Content-type", "application/msgpack")
1906 .path("/v0.6/stats");
1907 then.status(200).body("");
1908 });
1909
1910 let _mock_info = server.mock(|when, then| {
1911 when.method(GET).path("/info");
1912 then.status(200)
1913 .header("content-type", "application/json")
1914 .header("datadog-agent-state", "1")
1915 .body(r#"{"version":"1","client_drop_p0s":true,"endpoints":["/v0.4/traces","/v0.6/stats"]}"#);
1916 });
1917
1918 let mut builder = TraceExporterBuilder::default();
1919 builder
1920 .set_url(&server.url("/"))
1921 .set_service("test")
1922 .set_env("staging")
1923 .set_tracer_version("v0.1")
1924 .set_language("nodejs")
1925 .set_language_version("1.0")
1926 .set_language_interpreter("v8")
1927 .set_input_format(TraceExporterInputFormat::V04)
1928 .set_output_format(TraceExporterOutputFormat::V04)
1929 .enable_stats(Duration::from_secs(10));
1930 let exporter = builder.build().unwrap();
1931
1932 let trace_chunk = vec![SpanBytes {
1933 duration: 10,
1934 ..Default::default()
1935 }];
1936
1937 let data = msgpack_encoder::v04::to_vec(&[trace_chunk]);
1938
1939 while agent_info::get_agent_info().is_none() {
1941 exporter
1942 .runtime
1943 .lock()
1944 .unwrap()
1945 .as_ref()
1946 .unwrap()
1947 .block_on(async {
1948 sleep(Duration::from_millis(100)).await;
1949 })
1950 }
1951
1952 let result = exporter.send(data.as_ref());
1953 assert!(result.is_err());
1955
1956 let start_time = std::time::Instant::now();
1959 while !exporter.is_stats_worker_active() {
1960 if start_time.elapsed() > Duration::from_secs(10) {
1961 panic!("Timeout waiting for stats worker to become active");
1962 }
1963 std::thread::sleep(Duration::from_millis(10));
1964 }
1965
1966 exporter.shutdown(None).unwrap();
1967
1968 for _ in 0..1000 {
1970 if mock_traces.calls() > 0 && mock_stats.calls() > 0 {
1971 break;
1972 } else {
1973 std::thread::sleep(Duration::from_millis(10));
1974 }
1975 }
1976
1977 mock_traces.assert();
1978 mock_stats.assert();
1979 }
1980
1981 #[cfg_attr(miri, ignore)]
1982 #[test]
1983 fn test_shutdown_with_timeout() {
1984 agent_info::clear_cache_for_test();
1986
1987 let server = MockServer::start();
1988
1989 let mock_traces = server.mock(|when, then| {
1990 when.method(POST)
1991 .header("Content-type", "application/msgpack")
1992 .path("/v0.4/traces");
1993 then.status(200).body(
1994 r#"{
1995 "rate_by_service": {
1996 "service:foo,env:staging": 1.0,
1997 "service:,env:": 0.8
1998 }
1999 }"#,
2000 );
2001 });
2002
2003 let _mock_stats = server.mock(|when, then| {
2004 when.method(POST)
2005 .header("Content-type", "application/msgpack")
2006 .path("/v0.6/stats");
2007 then.delay(Duration::from_secs(10)).status(200).body("");
2008 });
2009
2010 let _mock_info = server.mock(|when, then| {
2011 when.method(GET).path("/info");
2012 then.status(200)
2013 .header("content-type", "application/json")
2014 .header("datadog-agent-state", "1")
2015 .body(r#"{"version":"1","client_drop_p0s":true,"endpoints":["/v0.4/traces","/v0.6/stats"]}"#);
2016 });
2017
2018 let mut builder = TraceExporterBuilder::default();
2019 builder
2020 .set_url(&server.url("/"))
2021 .set_service("test")
2022 .set_env("staging")
2023 .set_tracer_version("v0.1")
2024 .set_language("nodejs")
2025 .set_language_version("1.0")
2026 .set_language_interpreter("v8")
2027 .set_input_format(TraceExporterInputFormat::V04)
2028 .set_output_format(TraceExporterOutputFormat::V04)
2029 .enable_stats(Duration::from_secs(10));
2030 let exporter = builder.build().unwrap();
2031
2032 let trace_chunk = vec![SpanBytes {
2033 service: "test".into(),
2034 name: "test".into(),
2035 resource: "test".into(),
2036 r#type: "test".into(),
2037 duration: 10,
2038 ..Default::default()
2039 }];
2040
2041 let data = msgpack_encoder::v04::to_vec(&[trace_chunk]);
2042
2043 while agent_info::get_agent_info().is_none() {
2046 exporter
2047 .runtime
2048 .lock()
2049 .unwrap()
2050 .as_ref()
2051 .unwrap()
2052 .block_on(async {
2053 sleep(Duration::from_millis(100)).await;
2054 })
2055 }
2056
2057 exporter.send(data.as_ref()).unwrap();
2058
2059 let start_time = std::time::Instant::now();
2062 while !exporter.is_stats_worker_active() {
2063 if start_time.elapsed() > Duration::from_secs(10) {
2064 panic!("Timeout waiting for stats worker to become active");
2065 }
2066 std::thread::sleep(Duration::from_millis(10));
2067 }
2068
2069 exporter
2070 .shutdown(Some(Duration::from_millis(5)))
2071 .unwrap_err(); mock_traces.assert();
2074 }
2075}