libdd_data_pipeline/stats_exporter.rs

1// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{
5    borrow::Borrow,
6    collections::HashMap,
7    sync::{
8        atomic::{AtomicU64, Ordering},
9        Arc, Mutex,
10    },
11    time,
12};
13
14use crate::trace_exporter::TracerMetadata;
15use libdd_common::{worker::Worker, Endpoint, HttpClient};
16use libdd_trace_protobuf::pb;
17use libdd_trace_stats::span_concentrator::SpanConcentrator;
18use libdd_trace_utils::send_with_retry::{send_with_retry, RetryStrategy};
19use tokio::select;
20use tokio_util::sync::CancellationToken;
21use tracing::error;
22
23const STATS_ENDPOINT_PATH: &str = "/v0.6/stats";
24
/// An exporter that concentrates and sends stats to the agent
#[derive(Debug)]
pub struct StatsExporter {
    /// Interval between two automatic flushes in the run loop
    flush_interval: time::Duration,
    /// Shared concentrator buffering span stats until they are flushed
    concentrator: Arc<Mutex<SpanConcentrator>>,
    /// Agent endpoint the stats payloads are sent to
    endpoint: Endpoint,
    /// Tracer metadata, used both in the payload and as request headers
    meta: TracerMetadata,
    /// Monotonically increasing id attached to each flushed payload
    sequence_id: AtomicU64,
    /// Token used to trigger a final force flush and stop the run loop
    cancellation_token: CancellationToken,
    /// HTTP client used to send the stats requests
    client: HttpClient,
}
36
37impl StatsExporter {
38    /// Return a new StatsExporter
39    ///
40    /// - `flush_interval` the interval on which the concentrator is flushed
41    /// - `concentrator` SpanConcentrator storing the stats to be sent to the agent
42    /// - `meta` metadata used in ClientStatsPayload and as headers to send stats to the agent
43    /// - `endpoint` the Endpoint used to send stats to the agent
44    /// - `cancellation_token` Token used to safely shutdown the exporter by force flushing the
45    ///   concentrator
46    pub fn new(
47        flush_interval: time::Duration,
48        concentrator: Arc<Mutex<SpanConcentrator>>,
49        meta: TracerMetadata,
50        endpoint: Endpoint,
51        cancellation_token: CancellationToken,
52        client: HttpClient,
53    ) -> Self {
54        Self {
55            flush_interval,
56            concentrator,
57            endpoint,
58            meta,
59            sequence_id: AtomicU64::new(0),
60            cancellation_token,
61            client,
62        }
63    }
64
65    /// Flush the stats stored in the concentrator and send them
66    ///
67    /// If the stats flushed from the concentrator contain at least one time bucket the stats are
68    /// sent to `self.endpoint`. The stats are serialized as msgpack.
69    ///
70    /// # Errors
71    /// The function will return an error in the following case:
72    /// - The endpoint failed to build
73    /// - The stats payload cannot be serialized as a valid http body
74    /// - The http client failed while sending the request
75    /// - The http status of the response is not 2xx
76    ///
77    /// # Panic
78    /// Will panic if another thread panicked while holding the concentrator lock in which
79    /// case stats cannot be flushed since the concentrator might be corrupted.
80    pub async fn send(&self, force_flush: bool) -> anyhow::Result<()> {
81        let payload = self.flush(force_flush);
82        if payload.stats.is_empty() {
83            return Ok(());
84        }
85        let body = rmp_serde::encode::to_vec_named(&payload)?;
86
87        let mut headers: HashMap<&'static str, String> = self.meta.borrow().into();
88
89        headers.insert(
90            http::header::CONTENT_TYPE.as_str(),
91            libdd_common::header::APPLICATION_MSGPACK_STR.to_string(),
92        );
93
94        let result = send_with_retry(
95            &self.client,
96            &self.endpoint,
97            body,
98            &headers,
99            &RetryStrategy::default(),
100        )
101        .await;
102
103        match result {
104            Ok(_) => Ok(()),
105            Err(err) => {
106                error!(?err, "Error with the StateExporter when sending stats");
107                anyhow::bail!("Failed to send stats: {err}");
108            }
109        }
110    }
111
112    /// Flush stats from the concentrator into a payload
113    ///
114    /// # Arguments
115    /// - `force_flush` if true, triggers a force flush on the concentrator causing all buckets to
116    ///   be flushed regardless of their age.
117    ///
118    /// # Panic
119    /// Will panic if another thread panicked while holding the concentrator lock in which
120    /// case stats cannot be flushed since the concentrator might be corrupted.
121    fn flush(&self, force_flush: bool) -> pb::ClientStatsPayload {
122        let sequence = self.sequence_id.fetch_add(1, Ordering::Relaxed);
123        encode_stats_payload(
124            self.meta.borrow(),
125            sequence,
126            #[allow(clippy::unwrap_used)]
127            self.concentrator
128                .lock()
129                .unwrap()
130                .flush(time::SystemTime::now(), force_flush),
131        )
132    }
133}
134
135impl Worker for StatsExporter {
136    /// Run loop of the stats exporter
137    ///
138    /// Once started, the stats exporter will flush and send stats on every `self.flush_interval`.
139    /// If the `self.cancellation_token` is cancelled, the exporter will force flush all stats and
140    /// return.
141    async fn run(&mut self) {
142        loop {
143            select! {
144                _ = self.cancellation_token.cancelled() => {
145                    let _ = self.send(true).await;
146                    break;
147                },
148                _ = tokio::time::sleep(self.flush_interval) => {
149                        let _ = self.send(false).await;
150                },
151            };
152        }
153    }
154}
155
156fn encode_stats_payload(
157    meta: &TracerMetadata,
158    sequence: u64,
159    buckets: Vec<pb::ClientStatsBucket>,
160) -> pb::ClientStatsPayload {
161    pb::ClientStatsPayload {
162        hostname: meta.hostname.clone(),
163        env: meta.env.clone(),
164        lang: meta.language.clone(),
165        version: meta.app_version.clone(),
166        runtime_id: meta.runtime_id.clone(),
167        tracer_version: meta.tracer_version.clone(),
168        sequence,
169        stats: buckets,
170        git_commit_sha: meta.git_commit_sha.clone(),
171        // These fields are unused or will be set by the Agent
172        service: String::new(),
173        container_id: String::new(),
174        tags: Vec::new(),
175        agent_aggregation: String::new(),
176        image_tag: String::new(),
177        process_tags: String::new(),
178        process_tags_hash: 0,
179    }
180}
181
182/// Return the stats endpoint url to send stats to the agent at `agent_url`
183pub fn stats_url_from_agent_url(agent_url: &str) -> anyhow::Result<http::Uri> {
184    let mut parts = agent_url.parse::<http::Uri>()?.into_parts();
185    parts.path_and_query = Some(http::uri::PathAndQuery::from_static(STATS_ENDPOINT_PATH));
186    Ok(http::Uri::from_parts(parts)?)
187}
188
#[cfg(test)]
mod tests {
    use super::*;
    use httpmock::prelude::*;
    use httpmock::MockServer;
    use libdd_common::http_common::new_default_client;
    use libdd_trace_utils::span::{trace_utils, v04::SpanSlice};
    use libdd_trace_utils::test_utils::poll_for_mock_hit;
    use time::Duration;
    use time::SystemTime;

    fn is_send<T: Send>() {}
    fn is_sync<T: Sync>() {}

    const BUCKETS_DURATION: Duration = Duration::from_secs(10);

    /// Fails to compile if stats exporter is not Send and Sync
    #[test]
    fn test_stats_exporter_sync_send() {
        let _ = is_send::<StatsExporter>;
        let _ = is_sync::<StatsExporter>;
    }

    /// Build the metadata used by all tests below.
    fn get_test_metadata() -> TracerMetadata {
        TracerMetadata {
            hostname: "libdatadog-test".into(),
            env: "test".into(),
            app_version: "0.0.0".into(),
            language: "rust".into(),
            tracer_version: "0.0.0".into(),
            runtime_id: "e39d6d12-0752-489f-b488-cf80006c0378".into(),
            ..Default::default()
        }
    }

    /// Build a concentrator pre-filled with 99 spans whose oldest bucket is
    /// already stale, so the next flush always produces stats.
    fn get_test_concentrator() -> SpanConcentrator {
        let mut concentrator = SpanConcentrator::new(
            BUCKETS_DURATION,
            // Make sure the oldest bucket will be flushed on next send
            SystemTime::now() - BUCKETS_DURATION * 3,
            vec![],
            vec![],
        );
        let mut trace = vec![];

        for i in 1..100 {
            trace.push(SpanSlice {
                service: "libdatadog-test",
                duration: i,
                ..Default::default()
            })
        }

        trace_utils::compute_top_level_span(trace.as_mut_slice());

        for span in trace.iter() {
            concentrator.add_span(span);
        }
        concentrator
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_send_stats() {
        let server = MockServer::start_async().await;

        let mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .header("Content-type", "application/msgpack")
                    .path("/v0.6/stats")
                    .body_includes("libdatadog-test");
                then.status(200).body("");
            })
            .await;

        let stats_exporter = StatsExporter::new(
            BUCKETS_DURATION,
            Arc::new(Mutex::new(get_test_concentrator())),
            get_test_metadata(),
            Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
            CancellationToken::new(),
            new_default_client(),
        );

        let send_status = stats_exporter.send(true).await;
        send_status.unwrap();

        mock.assert_async().await;
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_send_stats_fail() {
        let server = MockServer::start_async().await;

        let mut mock = server
            .mock_async(|_when, then| {
                then.status(503)
                    .header("content-type", "application/json")
                    .body(r#"{"status":"error"}"#);
            })
            .await;

        let stats_exporter = StatsExporter::new(
            BUCKETS_DURATION,
            Arc::new(Mutex::new(get_test_concentrator())),
            get_test_metadata(),
            Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
            CancellationToken::new(),
            new_default_client(),
        );

        let send_status = stats_exporter.send(true).await;
        send_status.unwrap_err();

        assert!(
            poll_for_mock_hit(&mut mock, 10, 100, 5, true).await,
            "Expected max retry attempts"
        );
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_run() {
        let server = MockServer::start_async().await;

        let mut mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .header("Content-type", "application/msgpack")
                    .path("/v0.6/stats")
                    .body_includes("libdatadog-test");
                then.status(200).body("");
            })
            .await;

        let mut stats_exporter = StatsExporter::new(
            BUCKETS_DURATION,
            Arc::new(Mutex::new(get_test_concentrator())),
            get_test_metadata(),
            Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
            CancellationToken::new(),
            new_default_client(),
        );

        tokio::time::pause();
        tokio::spawn(async move {
            stats_exporter.run().await;
        });
        // Wait for the stats to be flushed
        tokio::time::sleep(BUCKETS_DURATION + Duration::from_secs(1)).await;
        // Resume time to sleep while the stats are being sent
        tokio::time::resume();
        assert!(
            // Message fixed: this asserts a single successful flush, not retries.
            poll_for_mock_hit(&mut mock, 10, 100, 1, false).await,
            "Expected a stats payload to be flushed and sent by the run loop"
        );
    }

    #[cfg_attr(miri, ignore)]
    #[tokio::test]
    async fn test_cancellation_token() {
        let server = MockServer::start_async().await;

        let mut mock = server
            .mock_async(|when, then| {
                when.method(POST)
                    .header("Content-type", "application/msgpack")
                    .path("/v0.6/stats")
                    .body_includes("libdatadog-test");
                then.status(200).body("");
            })
            .await;

        let cancellation_token = CancellationToken::new();

        // Use the shared BUCKETS_DURATION constant instead of a duplicated local
        // with the same value.
        let mut stats_exporter = StatsExporter::new(
            BUCKETS_DURATION,
            Arc::new(Mutex::new(get_test_concentrator())),
            get_test_metadata(),
            Endpoint::from_url(stats_url_from_agent_url(&server.url("/")).unwrap()),
            cancellation_token.clone(),
            new_default_client(),
        );

        tokio::spawn(async move {
            stats_exporter.run().await;
        });
        // Cancel token to trigger force flush
        cancellation_token.cancel();

        assert!(
            // Message fixed: this asserts a single forced flush, not retries.
            poll_for_mock_hit(&mut mock, 10, 100, 1, false).await,
            "Expected a force-flushed stats payload after cancellation"
        );
    }
}