libdd_data_pipeline/stats_exporter.rs

// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Borrow,
6    collections::HashMap,
7    sync::{
8        atomic::{AtomicU64, Ordering},
9        Arc, Mutex,
10    },
11    time,
12};
13
14use crate::trace_exporter::TracerMetadata;
15use libdd_common::{worker::Worker, Endpoint, HttpClient};
16use libdd_trace_protobuf::pb;
17use libdd_trace_stats::span_concentrator::SpanConcentrator;
18use libdd_trace_utils::send_with_retry::{send_with_retry, RetryStrategy};
19use tokio::select;
20use tokio_util::sync::CancellationToken;
21use tracing::error;
22
/// Path of the Agent endpoint that receives client stats payloads.
const STATS_ENDPOINT_PATH: &str = "/v0.6/stats";
24
/// An exporter that concentrates and sends stats to the agent
#[derive(Debug)]
pub struct StatsExporter {
    /// Interval between two periodic flushes of the concentrator.
    flush_interval: time::Duration,
    /// Shared concentrator aggregating span stats into time buckets.
    concentrator: Arc<Mutex<SpanConcentrator>>,
    /// Agent endpoint the stats payloads are sent to.
    endpoint: Endpoint,
    /// Tracer metadata embedded in each payload and sent as request headers.
    meta: TracerMetadata,
    /// Monotonically increasing id attached to each payload sent.
    sequence_id: AtomicU64,
    /// Token used to trigger a final force flush and stop the run loop.
    cancellation_token: CancellationToken,
    /// Http client used to issue the stats requests.
    client: HttpClient,
}
36
37impl StatsExporter {
38    /// Return a new StatsExporter
39    ///
40    /// - `flush_interval` the interval on which the concentrator is flushed
41    /// - `concentrator` SpanConcentrator storing the stats to be sent to the agent
42    /// - `meta` metadata used in ClientStatsPayload and as headers to send stats to the agent
43    /// - `endpoint` the Endpoint used to send stats to the agent
44    /// - `cancellation_token` Token used to safely shutdown the exporter by force flushing the
45    ///   concentrator
46    pub fn new(
47        flush_interval: time::Duration,
48        concentrator: Arc<Mutex<SpanConcentrator>>,
49        meta: TracerMetadata,
50        endpoint: Endpoint,
51        cancellation_token: CancellationToken,
52        client: HttpClient,
53    ) -> Self {
54        Self {
55            flush_interval,
56            concentrator,
57            endpoint,
58            meta,
59            sequence_id: AtomicU64::new(0),
60            cancellation_token,
61            client,
62        }
63    }
64
65    /// Flush the stats stored in the concentrator and send them
66    ///
67    /// If the stats flushed from the concentrator contain at least one time bucket the stats are
68    /// sent to `self.endpoint`. The stats are serialized as msgpack.
69    ///
70    /// # Errors
71    /// The function will return an error in the following case:
72    /// - The endpoint failed to build
73    /// - The stats payload cannot be serialized as a valid http body
74    /// - The http client failed while sending the request
75    /// - The http status of the response is not 2xx
76    ///
77    /// # Panic
78    /// Will panic if another thread panicked while holding the concentrator lock in which
79    /// case stats cannot be flushed since the concentrator might be corrupted.
80    pub async fn send(&self, force_flush: bool) -> anyhow::Result<()> {
81        let payload = self.flush(force_flush);
82        if payload.stats.is_empty() {
83            return Ok(());
84        }
85        let body = rmp_serde::encode::to_vec_named(&payload)?;
86
87        let mut headers: HashMap<&'static str, String> = self.meta.borrow().into();
88
89        headers.insert(
90            http::header::CONTENT_TYPE.as_str(),
91            libdd_common::header::APPLICATION_MSGPACK_STR.to_string(),
92        );
93
94        let result = send_with_retry(
95            &self.client,
96            &self.endpoint,
97            body,
98            &headers,
99            &RetryStrategy::default(),
100        )
101        .await;
102
103        match result {
104            Ok(_) => Ok(()),
105            Err(err) => {
106                error!(?err, "Error with the StateExporter when sending stats");
107                anyhow::bail!("Failed to send stats: {err}");
108            }
109        }
110    }
111
112    /// Flush stats from the concentrator into a payload
113    ///
114    /// # Arguments
115    /// - `force_flush` if true, triggers a force flush on the concentrator causing all buckets to
116    ///   be flushed regardless of their age.
117    ///
118    /// # Panic
119    /// Will panic if another thread panicked while holding the concentrator lock in which
120    /// case stats cannot be flushed since the concentrator might be corrupted.
121    fn flush(&self, force_flush: bool) -> pb::ClientStatsPayload {
122        let sequence = self.sequence_id.fetch_add(1, Ordering::Relaxed);
123        encode_stats_payload(
124            self.meta.borrow(),
125            sequence,
126            #[allow(clippy::unwrap_used)]
127            self.concentrator
128                .lock()
129                .unwrap()
130                .flush(time::SystemTime::now(), force_flush),
131        )
132    }
133}
134
impl Worker for StatsExporter {
    /// Run loop of the stats exporter
    ///
    /// Once started, the stats exporter will flush and send stats on every `self.flush_interval`.
    /// If the `self.cancellation_token` is cancelled, the exporter will force flush all stats and
    /// return.
    async fn run(&mut self) {
        loop {
            select! {
                // Shutdown requested: force flush everything (send errors are
                // deliberately ignored, best effort on exit) and leave the loop.
                _ = self.cancellation_token.cancelled() => {
                    let _ = self.send(true).await;
                    break;
                },
                // Periodic flush: without force, only buckets old enough are
                // flushed and sent; errors are ignored and retried next tick.
                _ = tokio::time::sleep(self.flush_interval) => {
                        let _ = self.send(false).await;
                },
            };
        }
    }
}
155
156fn encode_stats_payload(
157    meta: &TracerMetadata,
158    sequence: u64,
159    buckets: Vec<pb::ClientStatsBucket>,
160) -> pb::ClientStatsPayload {
161    pb::ClientStatsPayload {
162        hostname: meta.hostname.clone(),
163        env: meta.env.clone(),
164        lang: meta.language.clone(),
165        version: meta.app_version.clone(),
166        runtime_id: meta.runtime_id.clone(),
167        tracer_version: meta.tracer_version.clone(),
168        sequence,
169        stats: buckets,
170        git_commit_sha: meta.git_commit_sha.clone(),
171        process_tags: meta.process_tags.clone(),
172        // These fields are unused or will be set by the Agent
173        service: String::new(),
174        container_id: String::new(),
175        tags: Vec::new(),
176        agent_aggregation: String::new(),
177        image_tag: String::new(),
178        process_tags_hash: 0,
179    }
180}
181
182/// Return the stats endpoint url to send stats to the agent at `agent_url`
183pub fn stats_url_from_agent_url(agent_url: &str) -> anyhow::Result<http::Uri> {
184    let mut parts = agent_url.parse::<http::Uri>()?.into_parts();
185    parts.path_and_query = Some(http::uri::PathAndQuery::from_static(STATS_ENDPOINT_PATH));
186    Ok(http::Uri::from_parts(parts)?)
187}
188
#[cfg(test)]
mod tests {
    use super::*;
    use httpmock::prelude::*;
    use httpmock::MockServer;
    use libdd_common::http_common::new_default_client;
    use libdd_trace_utils::span::{trace_utils, v04::SpanSlice};
    use libdd_trace_utils::test_utils::poll_for_mock_hit;
    use time::Duration;
    use time::SystemTime;

    // Compile-time helpers: instantiating these forces the Send/Sync bound check.
    fn is_send<T: Send>() {}
    fn is_sync<T: Sync>() {}

    // Bucket duration shared by every test concentrator/exporter below.
    const BUCKETS_DURATION: Duration = Duration::from_secs(10);

    /// Fails to compile if stats exporter is not Send and Sync
    #[test]
    fn test_stats_exporter_sync_send() {
        let _ = is_send::<StatsExporter>;
        let _ = is_sync::<StatsExporter>;
    }

    // Metadata fixture; the hostname and process_tags values are matched
    // against the request bodies in the mock expectations below.
    fn get_test_metadata() -> TracerMetadata {
        TracerMetadata {
            hostname: "libdatadog-test".into(),
            env: "test".into(),
            app_version: "0.0.0".into(),
            language: "rust".into(),
            tracer_version: "0.0.0".into(),
            runtime_id: "e39d6d12-0752-489f-b488-cf80006c0378".into(),
            process_tags: "key1:value1,key2:value2".into(),
            ..Default::default()
        }
    }

    // Build a concentrator pre-filled with 99 spans whose oldest bucket is
    // already stale, so the next flush is guaranteed to produce stats.
    fn get_test_concentrator() -> SpanConcentrator {
        let mut concentrator = SpanConcentrator::new(
            BUCKETS_DURATION,
            // Make sure the oldest bucket will be flushed on next send
            SystemTime::now() - BUCKETS_DURATION * 3,
            vec![],
            vec![],
        );
        let mut trace = vec![];

        for i in 1..100 {
            trace.push(SpanSlice {
                service: "libdatadog-test",
                duration: i,
                ..Default::default()
            })
        }

        // Mark top-level spans so the concentrator counts them in its stats.
        trace_utils::compute_top_level_span(trace.as_mut_slice());

        for span in trace.iter() {
            concentrator.add_span(span);
        }
        concentrator
    }

    // Happy path: a force flush produces a msgpack POST to /v0.6/stats
    // containing the metadata values, and the 200 response yields Ok.
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_send_stats() {
        let server = MockServer::start_async().await;

        let mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .header("Content-type", "application/msgpack")
                    .path("/v0.6/stats")
                    .body_includes("libdatadog-test")
                    .body_includes("key1:value1,key2:value2");
                then.status(200).body("");
            })
            .await;

        let stats_exporter = StatsExporter::new(
            BUCKETS_DURATION,
            Arc::new(Mutex::new(get_test_concentrator())),
            get_test_metadata(),
            Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
            CancellationToken::new(),
            new_default_client(),
        );

        let send_status = stats_exporter.send(true).await;
        send_status.unwrap();

        mock.assert_async().await;
    }

    // A 503 from the agent must surface as an Err after the retry budget is
    // exhausted; the mock is hit once per retry attempt.
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_send_stats_fail() {
        let server = MockServer::start_async().await;

        let mut mock = server
            .mock_async(|_when, then| {
                then.status(503)
                    .header("content-type", "application/json")
                    .body(r#"{"status":"error"}"#);
            })
            .await;

        let stats_exporter = StatsExporter::new(
            BUCKETS_DURATION,
            Arc::new(Mutex::new(get_test_concentrator())),
            get_test_metadata(),
            Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
            CancellationToken::new(),
            new_default_client(),
        );

        let send_status = stats_exporter.send(true).await;
        send_status.unwrap_err();

        // NOTE(review): poll_for_mock_hit args presumably are
        // (mock, attempts, delay_ms, expected_hits, delete_after) — confirm
        // against libdd_trace_utils::test_utils.
        assert!(
            poll_for_mock_hit(&mut mock, 10, 100, 5, true).await,
            "Expected max retry attempts"
        );
    }

    // The run loop must send stats on its own once flush_interval elapses.
    // Tokio time is paused so the interval can be skipped deterministically.
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_run() {
        let server = MockServer::start_async().await;

        let mut mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .header("Content-type", "application/msgpack")
                    .path("/v0.6/stats")
                    .body_includes("libdatadog-test")
                    .body_includes("key1:value1,key2:value2");
                then.status(200).body("");
            })
            .await;

        let mut stats_exporter = StatsExporter::new(
            BUCKETS_DURATION,
            Arc::new(Mutex::new(get_test_concentrator())),
            get_test_metadata(),
            Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
            CancellationToken::new(),
            new_default_client(),
        );

        tokio::time::pause();
        tokio::spawn(async move {
            stats_exporter.run().await;
        });
        // Wait for the stats to be flushed
        tokio::time::sleep(BUCKETS_DURATION + Duration::from_secs(1)).await;
        // Resume time to sleep while the stats are being sent
        tokio::time::resume();
        assert!(
            poll_for_mock_hit(&mut mock, 10, 100, 1, false).await,
            "Expected max retry attempts"
        );
    }

    // Cancelling the token must trigger one final force flush before the
    // run loop exits, even though no flush interval has elapsed.
    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_cancellation_token() {
        let server = MockServer::start_async().await;

        let mut mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .header("Content-type", "application/msgpack")
                    .path("/v0.6/stats")
                    .body_includes("libdatadog-test")
                    .body_includes("key1:value1,key2:value2");
                then.status(200).body("");
            })
            .await;

        let buckets_duration = Duration::from_secs(10);
        let cancellation_token = CancellationToken::new();

        let mut stats_exporter = StatsExporter::new(
            buckets_duration,
            Arc::new(Mutex::new(get_test_concentrator())),
            get_test_metadata(),
            Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
            cancellation_token.clone(),
            new_default_client(),
        );

        tokio::spawn(async move {
            stats_exporter.run().await;
        });
        // Cancel token to trigger force flush
        cancellation_token.cancel();

        assert!(
            poll_for_mock_hit(&mut mock, 10, 100, 1, false).await,
            "Expected max retry attempts"
        );
    }
}