1use std::{
5 borrow::Borrow,
6 collections::HashMap,
7 sync::{
8 atomic::{AtomicU64, Ordering},
9 Arc, Mutex,
10 },
11 time,
12};
13
14use crate::trace_exporter::TracerMetadata;
15use libdd_common::{worker::Worker, Endpoint, HttpClient};
16use libdd_trace_protobuf::pb;
17use libdd_trace_stats::span_concentrator::SpanConcentrator;
18use libdd_trace_utils::send_with_retry::{send_with_retry, RetryStrategy};
19use tokio::select;
20use tokio_util::sync::CancellationToken;
21use tracing::error;
22
23const STATS_ENDPOINT_PATH: &str = "/v0.6/stats";
24
25#[derive(Debug)]
27pub struct StatsExporter {
28 flush_interval: time::Duration,
29 concentrator: Arc<Mutex<SpanConcentrator>>,
30 endpoint: Endpoint,
31 meta: TracerMetadata,
32 sequence_id: AtomicU64,
33 cancellation_token: CancellationToken,
34 client: HttpClient,
35}
36
37impl StatsExporter {
38 pub fn new(
47 flush_interval: time::Duration,
48 concentrator: Arc<Mutex<SpanConcentrator>>,
49 meta: TracerMetadata,
50 endpoint: Endpoint,
51 cancellation_token: CancellationToken,
52 client: HttpClient,
53 ) -> Self {
54 Self {
55 flush_interval,
56 concentrator,
57 endpoint,
58 meta,
59 sequence_id: AtomicU64::new(0),
60 cancellation_token,
61 client,
62 }
63 }
64
65 pub async fn send(&self, force_flush: bool) -> anyhow::Result<()> {
81 let payload = self.flush(force_flush);
82 if payload.stats.is_empty() {
83 return Ok(());
84 }
85 let body = rmp_serde::encode::to_vec_named(&payload)?;
86
87 let mut headers: HashMap<&'static str, String> = self.meta.borrow().into();
88
89 headers.insert(
90 http::header::CONTENT_TYPE.as_str(),
91 libdd_common::header::APPLICATION_MSGPACK_STR.to_string(),
92 );
93
94 let result = send_with_retry(
95 &self.client,
96 &self.endpoint,
97 body,
98 &headers,
99 &RetryStrategy::default(),
100 )
101 .await;
102
103 match result {
104 Ok(_) => Ok(()),
105 Err(err) => {
106 error!(?err, "Error with the StateExporter when sending stats");
107 anyhow::bail!("Failed to send stats: {err}");
108 }
109 }
110 }
111
112 fn flush(&self, force_flush: bool) -> pb::ClientStatsPayload {
122 let sequence = self.sequence_id.fetch_add(1, Ordering::Relaxed);
123 encode_stats_payload(
124 self.meta.borrow(),
125 sequence,
126 #[allow(clippy::unwrap_used)]
127 self.concentrator
128 .lock()
129 .unwrap()
130 .flush(time::SystemTime::now(), force_flush),
131 )
132 }
133}
134
135impl Worker for StatsExporter {
136 async fn run(&mut self) {
142 loop {
143 select! {
144 _ = self.cancellation_token.cancelled() => {
145 let _ = self.send(true).await;
146 break;
147 },
148 _ = tokio::time::sleep(self.flush_interval) => {
149 let _ = self.send(false).await;
150 },
151 };
152 }
153 }
154}
155
156fn encode_stats_payload(
157 meta: &TracerMetadata,
158 sequence: u64,
159 buckets: Vec<pb::ClientStatsBucket>,
160) -> pb::ClientStatsPayload {
161 pb::ClientStatsPayload {
162 hostname: meta.hostname.clone(),
163 env: meta.env.clone(),
164 lang: meta.language.clone(),
165 version: meta.app_version.clone(),
166 runtime_id: meta.runtime_id.clone(),
167 tracer_version: meta.tracer_version.clone(),
168 sequence,
169 stats: buckets,
170 git_commit_sha: meta.git_commit_sha.clone(),
171 service: String::new(),
173 container_id: String::new(),
174 tags: Vec::new(),
175 agent_aggregation: String::new(),
176 image_tag: String::new(),
177 process_tags: String::new(),
178 process_tags_hash: 0,
179 }
180}
181
182pub fn stats_url_from_agent_url(agent_url: &str) -> anyhow::Result<http::Uri> {
184 let mut parts = agent_url.parse::<http::Uri>()?.into_parts();
185 parts.path_and_query = Some(http::uri::PathAndQuery::from_static(STATS_ENDPOINT_PATH));
186 Ok(http::Uri::from_parts(parts)?)
187}
188
189#[cfg(test)]
190mod tests {
191 use super::*;
192 use httpmock::prelude::*;
193 use httpmock::MockServer;
194 use libdd_common::http_common::new_default_client;
195 use libdd_trace_utils::span::{trace_utils, v04::SpanSlice};
196 use libdd_trace_utils::test_utils::poll_for_mock_hit;
197 use time::Duration;
198 use time::SystemTime;
199
200 fn is_send<T: Send>() {}
201 fn is_sync<T: Sync>() {}
202
203 const BUCKETS_DURATION: Duration = Duration::from_secs(10);
204
205 #[test]
207 fn test_stats_exporter_sync_send() {
208 let _ = is_send::<StatsExporter>;
209 let _ = is_sync::<StatsExporter>;
210 }
211
212 fn get_test_metadata() -> TracerMetadata {
213 TracerMetadata {
214 hostname: "libdatadog-test".into(),
215 env: "test".into(),
216 app_version: "0.0.0".into(),
217 language: "rust".into(),
218 tracer_version: "0.0.0".into(),
219 runtime_id: "e39d6d12-0752-489f-b488-cf80006c0378".into(),
220 ..Default::default()
221 }
222 }
223
224 fn get_test_concentrator() -> SpanConcentrator {
225 let mut concentrator = SpanConcentrator::new(
226 BUCKETS_DURATION,
227 SystemTime::now() - BUCKETS_DURATION * 3,
229 vec![],
230 vec![],
231 );
232 let mut trace = vec![];
233
234 for i in 1..100 {
235 trace.push(SpanSlice {
236 service: "libdatadog-test",
237 duration: i,
238 ..Default::default()
239 })
240 }
241
242 trace_utils::compute_top_level_span(trace.as_mut_slice());
243
244 for span in trace.iter() {
245 concentrator.add_span(span);
246 }
247 concentrator
248 }
249
250 #[cfg_attr(miri, ignore)]
251 #[tokio::test]
252 async fn test_send_stats() {
253 let server = MockServer::start_async().await;
254
255 let mock = server
256 .mock_async(|when, then| {
257 when.method(POST)
258 .header("Content-type", "application/msgpack")
259 .path("/v0.6/stats")
260 .body_includes("libdatadog-test");
261 then.status(200).body("");
262 })
263 .await;
264
265 let stats_exporter = StatsExporter::new(
266 BUCKETS_DURATION,
267 Arc::new(Mutex::new(get_test_concentrator())),
268 get_test_metadata(),
269 Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
270 CancellationToken::new(),
271 new_default_client(),
272 );
273
274 let send_status = stats_exporter.send(true).await;
275 send_status.unwrap();
276
277 mock.assert_async().await;
278 }
279
280 #[cfg_attr(miri, ignore)]
281 #[tokio::test]
282 async fn test_send_stats_fail() {
283 let server = MockServer::start_async().await;
284
285 let mut mock = server
286 .mock_async(|_when, then| {
287 then.status(503)
288 .header("content-type", "application/json")
289 .body(r#"{"status":"error"}"#);
290 })
291 .await;
292
293 let stats_exporter = StatsExporter::new(
294 BUCKETS_DURATION,
295 Arc::new(Mutex::new(get_test_concentrator())),
296 get_test_metadata(),
297 Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
298 CancellationToken::new(),
299 new_default_client(),
300 );
301
302 let send_status = stats_exporter.send(true).await;
303 send_status.unwrap_err();
304
305 assert!(
306 poll_for_mock_hit(&mut mock, 10, 100, 5, true).await,
307 "Expected max retry attempts"
308 );
309 }
310
311 #[cfg_attr(miri, ignore)]
312 #[tokio::test]
313 async fn test_run() {
314 let server = MockServer::start_async().await;
315
316 let mut mock = server
317 .mock_async(|when, then| {
318 when.method(POST)
319 .header("Content-type", "application/msgpack")
320 .path("/v0.6/stats")
321 .body_includes("libdatadog-test");
322 then.status(200).body("");
323 })
324 .await;
325
326 let mut stats_exporter = StatsExporter::new(
327 BUCKETS_DURATION,
328 Arc::new(Mutex::new(get_test_concentrator())),
329 get_test_metadata(),
330 Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
331 CancellationToken::new(),
332 new_default_client(),
333 );
334
335 tokio::time::pause();
336 tokio::spawn(async move {
337 stats_exporter.run().await;
338 });
339 tokio::time::sleep(BUCKETS_DURATION + Duration::from_secs(1)).await;
341 tokio::time::resume();
343 assert!(
344 poll_for_mock_hit(&mut mock, 10, 100, 1, false).await,
345 "Expected max retry attempts"
346 );
347 }
348
349 #[cfg_attr(miri, ignore)]
350 #[tokio::test]
351 async fn test_cancellation_token() {
352 let server = MockServer::start_async().await;
353
354 let mut mock = server
355 .mock_async(|when, then| {
356 when.method(POST)
357 .header("Content-type", "application/msgpack")
358 .path("/v0.6/stats")
359 .body_includes("libdatadog-test");
360 then.status(200).body("");
361 })
362 .await;
363
364 let buckets_duration = Duration::from_secs(10);
365 let cancellation_token = CancellationToken::new();
366
367 let mut stats_exporter = StatsExporter::new(
368 buckets_duration,
369 Arc::new(Mutex::new(get_test_concentrator())),
370 get_test_metadata(),
371 Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
372 cancellation_token.clone(),
373 new_default_client(),
374 );
375
376 tokio::spawn(async move {
377 stats_exporter.run().await;
378 });
379 cancellation_token.cancel();
381
382 assert!(
383 poll_for_mock_hit(&mut mock, 10, 100, 1, false).await,
384 "Expected max retry attempts"
385 );
386 }
387}