1use std::{
5 borrow::Borrow,
6 collections::HashMap,
7 sync::{
8 atomic::{AtomicU64, Ordering},
9 Arc, Mutex,
10 },
11 time,
12};
13
14use crate::trace_exporter::TracerMetadata;
15use libdd_common::{worker::Worker, Endpoint, HttpClient};
16use libdd_trace_protobuf::pb;
17use libdd_trace_stats::span_concentrator::SpanConcentrator;
18use libdd_trace_utils::send_with_retry::{send_with_retry, RetryStrategy};
19use tokio::select;
20use tokio_util::sync::CancellationToken;
21use tracing::error;
22
23const STATS_ENDPOINT_PATH: &str = "/v0.6/stats";
24
25#[derive(Debug)]
27pub struct StatsExporter {
28 flush_interval: time::Duration,
29 concentrator: Arc<Mutex<SpanConcentrator>>,
30 endpoint: Endpoint,
31 meta: TracerMetadata,
32 sequence_id: AtomicU64,
33 cancellation_token: CancellationToken,
34 client: HttpClient,
35}
36
37impl StatsExporter {
38 pub fn new(
47 flush_interval: time::Duration,
48 concentrator: Arc<Mutex<SpanConcentrator>>,
49 meta: TracerMetadata,
50 endpoint: Endpoint,
51 cancellation_token: CancellationToken,
52 client: HttpClient,
53 ) -> Self {
54 Self {
55 flush_interval,
56 concentrator,
57 endpoint,
58 meta,
59 sequence_id: AtomicU64::new(0),
60 cancellation_token,
61 client,
62 }
63 }
64
65 pub async fn send(&self, force_flush: bool) -> anyhow::Result<()> {
81 let payload = self.flush(force_flush);
82 if payload.stats.is_empty() {
83 return Ok(());
84 }
85 let body = rmp_serde::encode::to_vec_named(&payload)?;
86
87 let mut headers: HashMap<&'static str, String> = self.meta.borrow().into();
88
89 headers.insert(
90 http::header::CONTENT_TYPE.as_str(),
91 libdd_common::header::APPLICATION_MSGPACK_STR.to_string(),
92 );
93
94 let result = send_with_retry(
95 &self.client,
96 &self.endpoint,
97 body,
98 &headers,
99 &RetryStrategy::default(),
100 )
101 .await;
102
103 match result {
104 Ok(_) => Ok(()),
105 Err(err) => {
106 error!(?err, "Error with the StateExporter when sending stats");
107 anyhow::bail!("Failed to send stats: {err}");
108 }
109 }
110 }
111
112 fn flush(&self, force_flush: bool) -> pb::ClientStatsPayload {
122 let sequence = self.sequence_id.fetch_add(1, Ordering::Relaxed);
123 encode_stats_payload(
124 self.meta.borrow(),
125 sequence,
126 #[allow(clippy::unwrap_used)]
127 self.concentrator
128 .lock()
129 .unwrap()
130 .flush(time::SystemTime::now(), force_flush),
131 )
132 }
133}
134
135impl Worker for StatsExporter {
136 async fn run(&mut self) {
142 loop {
143 select! {
144 _ = self.cancellation_token.cancelled() => {
145 let _ = self.send(true).await;
146 break;
147 },
148 _ = tokio::time::sleep(self.flush_interval) => {
149 let _ = self.send(false).await;
150 },
151 };
152 }
153 }
154}
155
156fn encode_stats_payload(
157 meta: &TracerMetadata,
158 sequence: u64,
159 buckets: Vec<pb::ClientStatsBucket>,
160) -> pb::ClientStatsPayload {
161 pb::ClientStatsPayload {
162 hostname: meta.hostname.clone(),
163 env: meta.env.clone(),
164 lang: meta.language.clone(),
165 version: meta.app_version.clone(),
166 runtime_id: meta.runtime_id.clone(),
167 tracer_version: meta.tracer_version.clone(),
168 sequence,
169 stats: buckets,
170 git_commit_sha: meta.git_commit_sha.clone(),
171 process_tags: meta.process_tags.clone(),
172 service: String::new(),
174 container_id: String::new(),
175 tags: Vec::new(),
176 agent_aggregation: String::new(),
177 image_tag: String::new(),
178 process_tags_hash: 0,
179 }
180}
181
182pub fn stats_url_from_agent_url(agent_url: &str) -> anyhow::Result<http::Uri> {
184 let mut parts = agent_url.parse::<http::Uri>()?.into_parts();
185 parts.path_and_query = Some(http::uri::PathAndQuery::from_static(STATS_ENDPOINT_PATH));
186 Ok(http::Uri::from_parts(parts)?)
187}
188
189#[cfg(test)]
190mod tests {
191 use super::*;
192 use httpmock::prelude::*;
193 use httpmock::MockServer;
194 use libdd_common::http_common::new_default_client;
195 use libdd_trace_utils::span::{trace_utils, v04::SpanSlice};
196 use libdd_trace_utils::test_utils::poll_for_mock_hit;
197 use time::Duration;
198 use time::SystemTime;
199
200 fn is_send<T: Send>() {}
201 fn is_sync<T: Sync>() {}
202
203 const BUCKETS_DURATION: Duration = Duration::from_secs(10);
204
205 #[test]
207 fn test_stats_exporter_sync_send() {
208 let _ = is_send::<StatsExporter>;
209 let _ = is_sync::<StatsExporter>;
210 }
211
212 fn get_test_metadata() -> TracerMetadata {
213 TracerMetadata {
214 hostname: "libdatadog-test".into(),
215 env: "test".into(),
216 app_version: "0.0.0".into(),
217 language: "rust".into(),
218 tracer_version: "0.0.0".into(),
219 runtime_id: "e39d6d12-0752-489f-b488-cf80006c0378".into(),
220 process_tags: "key1:value1,key2:value2".into(),
221 ..Default::default()
222 }
223 }
224
225 fn get_test_concentrator() -> SpanConcentrator {
226 let mut concentrator = SpanConcentrator::new(
227 BUCKETS_DURATION,
228 SystemTime::now() - BUCKETS_DURATION * 3,
230 vec![],
231 vec![],
232 );
233 let mut trace = vec![];
234
235 for i in 1..100 {
236 trace.push(SpanSlice {
237 service: "libdatadog-test",
238 duration: i,
239 ..Default::default()
240 })
241 }
242
243 trace_utils::compute_top_level_span(trace.as_mut_slice());
244
245 for span in trace.iter() {
246 concentrator.add_span(span);
247 }
248 concentrator
249 }
250
251 #[cfg_attr(miri, ignore)]
252 #[tokio::test]
253 async fn test_send_stats() {
254 let server = MockServer::start_async().await;
255
256 let mock = server
257 .mock_async(|when, then| {
258 when.method(POST)
259 .header("Content-type", "application/msgpack")
260 .path("/v0.6/stats")
261 .body_includes("libdatadog-test")
262 .body_includes("key1:value1,key2:value2");
263 then.status(200).body("");
264 })
265 .await;
266
267 let stats_exporter = StatsExporter::new(
268 BUCKETS_DURATION,
269 Arc::new(Mutex::new(get_test_concentrator())),
270 get_test_metadata(),
271 Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
272 CancellationToken::new(),
273 new_default_client(),
274 );
275
276 let send_status = stats_exporter.send(true).await;
277 send_status.unwrap();
278
279 mock.assert_async().await;
280 }
281
282 #[cfg_attr(miri, ignore)]
283 #[tokio::test]
284 async fn test_send_stats_fail() {
285 let server = MockServer::start_async().await;
286
287 let mut mock = server
288 .mock_async(|_when, then| {
289 then.status(503)
290 .header("content-type", "application/json")
291 .body(r#"{"status":"error"}"#);
292 })
293 .await;
294
295 let stats_exporter = StatsExporter::new(
296 BUCKETS_DURATION,
297 Arc::new(Mutex::new(get_test_concentrator())),
298 get_test_metadata(),
299 Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
300 CancellationToken::new(),
301 new_default_client(),
302 );
303
304 let send_status = stats_exporter.send(true).await;
305 send_status.unwrap_err();
306
307 assert!(
308 poll_for_mock_hit(&mut mock, 10, 100, 5, true).await,
309 "Expected max retry attempts"
310 );
311 }
312
313 #[cfg_attr(miri, ignore)]
314 #[tokio::test]
315 async fn test_run() {
316 let server = MockServer::start_async().await;
317
318 let mut mock = server
319 .mock_async(|when, then| {
320 when.method(POST)
321 .header("Content-type", "application/msgpack")
322 .path("/v0.6/stats")
323 .body_includes("libdatadog-test")
324 .body_includes("key1:value1,key2:value2");
325 then.status(200).body("");
326 })
327 .await;
328
329 let mut stats_exporter = StatsExporter::new(
330 BUCKETS_DURATION,
331 Arc::new(Mutex::new(get_test_concentrator())),
332 get_test_metadata(),
333 Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
334 CancellationToken::new(),
335 new_default_client(),
336 );
337
338 tokio::time::pause();
339 tokio::spawn(async move {
340 stats_exporter.run().await;
341 });
342 tokio::time::sleep(BUCKETS_DURATION + Duration::from_secs(1)).await;
344 tokio::time::resume();
346 assert!(
347 poll_for_mock_hit(&mut mock, 10, 100, 1, false).await,
348 "Expected max retry attempts"
349 );
350 }
351
352 #[cfg_attr(miri, ignore)]
353 #[tokio::test]
354 async fn test_cancellation_token() {
355 let server = MockServer::start_async().await;
356
357 let mut mock = server
358 .mock_async(|when, then| {
359 when.method(POST)
360 .header("Content-type", "application/msgpack")
361 .path("/v0.6/stats")
362 .body_includes("libdatadog-test")
363 .body_includes("key1:value1,key2:value2");
364 then.status(200).body("");
365 })
366 .await;
367
368 let buckets_duration = Duration::from_secs(10);
369 let cancellation_token = CancellationToken::new();
370
371 let mut stats_exporter = StatsExporter::new(
372 buckets_duration,
373 Arc::new(Mutex::new(get_test_concentrator())),
374 get_test_metadata(),
375 Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
376 cancellation_token.clone(),
377 new_default_client(),
378 );
379
380 tokio::spawn(async move {
381 stats_exporter.run().await;
382 });
383 cancellation_token.cancel();
385
386 assert!(
387 poll_for_mock_hit(&mut mock, 10, 100, 1, false).await,
388 "Expected max retry attempts"
389 );
390 }
391}