Skip to main content

libdd_trace_utils/
stats_utils.rs

1// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4#[cfg(feature = "mini_agent")]
5pub use mini_agent::*;
6
7#[cfg(feature = "mini_agent")]
8mod mini_agent {
9    use bytes::Buf;
10    use http::{Method, Request, StatusCode};
11    use http_body_util::BodyExt;
12    use libdd_common::http_common;
13    use libdd_common::Connect;
14    use libdd_common::Endpoint;
15    use libdd_common::GenericHttpClient;
16    use libdd_trace_protobuf::pb;
17    use std::io::Write;
18    use tracing::debug;
19
20    pub async fn get_stats_from_request_body(
21        body: http_common::Body,
22    ) -> anyhow::Result<pb::ClientStatsPayload> {
23        let buffer = body.collect().await?.aggregate();
24
25        let client_stats_payload: pb::ClientStatsPayload =
26            match rmp_serde::from_read(buffer.reader()) {
27                Ok(res) => res,
28                Err(err) => {
29                    anyhow::bail!("Error deserializing stats from request body: {err}")
30                }
31            };
32
33        if client_stats_payload.stats.is_empty() {
34            debug!("Empty trace stats payload received, but this is okay");
35        }
36        Ok(client_stats_payload)
37    }
38
39    pub fn construct_stats_payload(stats: Vec<pb::ClientStatsPayload>) -> pb::StatsPayload {
40        // set hostname on stats from tracer to empty string for serverless
41        let stats = stats
42            .into_iter()
43            .map(|mut stat| {
44                stat.hostname = "".to_string();
45                stat
46            })
47            .collect();
48        pb::StatsPayload {
49            agent_hostname: "".to_string(),
50            agent_env: "".to_string(),
51            stats,
52            agent_version: "".to_string(),
53            client_computed: true,
54            split_payload: false,
55        }
56    }
57
58    pub fn serialize_stats_payload(payload: pb::StatsPayload) -> anyhow::Result<Vec<u8>> {
59        let msgpack = rmp_serde::to_vec_named(&payload)?;
60        let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::fast());
61        encoder.write_all(&msgpack)?;
62        match encoder.finish() {
63            Ok(res) => Ok(res),
64            Err(e) => anyhow::bail!("Error serializing stats payload: {e}"),
65        }
66    }
67
68    pub async fn send_stats_payload(
69        data: Vec<u8>,
70        target: &Endpoint,
71        api_key: &str,
72    ) -> anyhow::Result<()> {
73        send_stats_payload_with_client::<libdd_common::connector::Connector>(
74            data, target, api_key, None,
75        )
76        .await
77    }
78
79    pub async fn send_stats_payload_with_client<C: Connect>(
80        data: Vec<u8>,
81        target: &Endpoint,
82        api_key: &str,
83        client: Option<&GenericHttpClient<C>>,
84    ) -> anyhow::Result<()> {
85        let req = Request::builder()
86            .method(Method::POST)
87            .uri(target.url.clone())
88            .header("Content-Type", "application/msgpack")
89            .header("Content-Encoding", "gzip")
90            .header("DD-API-KEY", api_key)
91            .body(http_common::Body::from(data.clone()))?;
92
93        let response = if let Some(client) = client {
94            client.request(req).await
95        } else {
96            let default_client = http_common::new_default_client();
97            default_client.request(req).await
98        };
99
100        match response {
101            Ok(response) => {
102                if response.status() != StatusCode::ACCEPTED {
103                    let body_bytes = response.into_body().collect().await?.to_bytes();
104                    let response_body = String::from_utf8(body_bytes.to_vec()).unwrap_or_default();
105                    anyhow::bail!("Server did not accept trace stats: {response_body}");
106                }
107                Ok(())
108            }
109            Err(e) => anyhow::bail!("Failed to send trace stats: {e}"),
110        }
111    }
112}
113
114#[cfg(test)]
115#[cfg(feature = "mini_agent")]
116mod mini_agent_tests {
117    use crate::stats_utils;
118    use http::Request;
119    use libdd_common::http_common;
120    use libdd_trace_protobuf::pb::{
121        ClientGroupedStats, ClientStatsBucket, ClientStatsPayload, Trilean::NotSet,
122    };
123    use serde_json::Value;
124
125    #[tokio::test]
126    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
127    async fn test_get_stats_from_request_body() {
128        let stats_json = r#"{
129            "Hostname": "TestHost",
130            "Env": "test",
131            "Version": "1.0.0",
132            "Stats": [
133                {
134                    "Start": 0,
135                    "Duration": 10000000000,
136                    "Stats": [
137                        {
138                            "Name": "test-span",
139                            "Service": "test-service",
140                            "Resource": "test-span",
141                            "Type": "",
142                            "HTTPStatusCode": 0,
143                            "Synthetics": false,
144                            "Hits": 1,
145                            "TopLevelHits": 1,
146                            "Errors": 0,
147                            "Duration": 10000000,
148                            "OkSummary": [
149                                0,
150                                0,
151                                0
152                            ],
153                            "ErrorSummary": [
154                                0,
155                                0,
156                                0
157                            ],
158                            "GrpcStatusCode": "0",
159                            "HTTPMethod": "GET",
160                            "HTTPEndpoint": "/test"
161                        }
162                    ]
163                }
164            ],
165            "Lang": "javascript",
166            "TracerVersion": "1.0.0",
167            "RuntimeID": "00000000-0000-0000-0000-000000000000",
168            "Sequence": 1
169        }"#;
170
171        let v: Value = match serde_json::from_str(stats_json) {
172            Ok(value) => value,
173            Err(err) => {
174                panic!("Failed to parse stats JSON: {err}");
175            }
176        };
177
178        let bytes = rmp_serde::to_vec(&v).unwrap();
179        let request = Request::builder()
180            .body(http_common::Body::from(bytes))
181            .unwrap();
182
183        let res = stats_utils::get_stats_from_request_body(request.into_body()).await;
184
185        let client_stats_payload = ClientStatsPayload {
186            hostname: "TestHost".to_string(),
187            env: "test".to_string(),
188            version: "1.0.0".to_string(),
189            stats: vec![ClientStatsBucket {
190                start: 0,
191                duration: 10000000000,
192                stats: vec![ClientGroupedStats {
193                    service: "test-service".to_string(),
194                    name: "test-span".to_string(),
195                    resource: "test-span".to_string(),
196                    http_status_code: 0,
197                    r#type: "".to_string(),
198                    db_type: "".to_string(),
199                    hits: 1,
200                    errors: 0,
201                    duration: 10000000,
202                    ok_summary: vec![0, 0, 0],
203                    error_summary: vec![0, 0, 0],
204                    synthetics: false,
205                    top_level_hits: 1,
206                    span_kind: "".to_string(),
207                    peer_tags: vec![],
208                    is_trace_root: NotSet.into(),
209                    grpc_status_code: "0".to_string(),
210                    http_endpoint: "/test".to_string(),
211                    http_method: "GET".to_string(),
212                }],
213                agent_time_shift: 0,
214            }],
215            lang: "javascript".to_string(),
216            tracer_version: "1.0.0".to_string(),
217            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
218            sequence: 1,
219            agent_aggregation: "".to_string(),
220            service: "".to_string(),
221            container_id: "".to_string(),
222            tags: vec![],
223            git_commit_sha: "".to_string(),
224            image_tag: "".to_string(),
225            process_tags_hash: 0,
226            process_tags: "".to_string(),
227        };
228
229        assert!(
230            res.is_ok(),
231            "Expected Ok result, but got Err: {}",
232            res.unwrap_err()
233        );
234        assert_eq!(res.unwrap(), client_stats_payload)
235    }
236
237    #[tokio::test]
238    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
239    async fn test_get_stats_from_request_body_without_stats() {
240        let stats_json = r#"{
241            "Hostname": "TestHost",
242            "Env": "test",
243            "Version": "1.0.0",
244            "Lang": "javascript",
245            "TracerVersion": "1.0.0",
246            "RuntimeID": "00000000-0000-0000-0000-000000000000",
247            "Sequence": 1
248        }"#;
249
250        let v: Value = match serde_json::from_str(stats_json) {
251            Ok(value) => value,
252            Err(err) => {
253                panic!("Failed to parse stats JSON: {err}");
254            }
255        };
256
257        let bytes = rmp_serde::to_vec(&v).unwrap();
258        let request = Request::builder()
259            .body(http_common::Body::from(bytes))
260            .unwrap();
261
262        let res = stats_utils::get_stats_from_request_body(request.into_body()).await;
263
264        let client_stats_payload = ClientStatsPayload {
265            hostname: "TestHost".to_string(),
266            env: "test".to_string(),
267            version: "1.0.0".to_string(),
268            stats: vec![],
269            lang: "javascript".to_string(),
270            tracer_version: "1.0.0".to_string(),
271            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
272            sequence: 1,
273            agent_aggregation: "".to_string(),
274            service: "".to_string(),
275            container_id: "".to_string(),
276            tags: vec![],
277            git_commit_sha: "".to_string(),
278            image_tag: "".to_string(),
279            process_tags_hash: 0,
280            process_tags: "".to_string(),
281        };
282
283        assert!(
284            res.is_ok(),
285            "Expected Ok result, but got Err: {}",
286            res.unwrap_err()
287        );
288        assert_eq!(res.unwrap(), client_stats_payload)
289    }
290
291    #[tokio::test]
292    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
293    async fn test_serialize_client_stats_payload_without_stats() {
294        let client_stats_payload_without_stats = ClientStatsPayload {
295            hostname: "TestHost".to_string(),
296            env: "test".to_string(),
297            version: "1.0.0".to_string(),
298            stats: vec![],
299            lang: "javascript".to_string(),
300            tracer_version: "1.0.0".to_string(),
301            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
302            sequence: 1,
303            agent_aggregation: "".to_string(),
304            service: "".to_string(),
305            container_id: "".to_string(),
306            tags: vec![],
307            git_commit_sha: "".to_string(),
308            image_tag: "".to_string(),
309            process_tags_hash: 0,
310            process_tags: "".to_string(),
311        };
312
313        let client_stats_payload_without_inner_stats = ClientStatsPayload {
314            hostname: "TestHost".to_string(),
315            env: "test".to_string(),
316            version: "1.0.0".to_string(),
317            stats: vec![ClientStatsBucket {
318                start: 0,
319                duration: 10000000000,
320                stats: vec![],
321                agent_time_shift: 0,
322            }],
323            lang: "javascript".to_string(),
324            tracer_version: "1.0.0".to_string(),
325            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
326            sequence: 1,
327            agent_aggregation: "".to_string(),
328            service: "".to_string(),
329            container_id: "".to_string(),
330            tags: vec![],
331            git_commit_sha: "".to_string(),
332            image_tag: "".to_string(),
333            process_tags_hash: 0,
334            process_tags: "".to_string(),
335        };
336
337        let res = stats_utils::serialize_stats_payload(stats_utils::construct_stats_payload(vec![
338            client_stats_payload_without_stats,
339            client_stats_payload_without_inner_stats,
340        ]));
341
342        assert!(
343            res.is_ok(),
344            "Expected Ok result, but got Err: {}",
345            res.unwrap_err()
346        );
347    }
348}