libdd_trace_utils/
stats_utils.rs

1// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4#[cfg(feature = "mini_agent")]
5pub use mini_agent::*;
6
7#[cfg(feature = "mini_agent")]
8mod mini_agent {
9    use http_body_util::BodyExt;
10    use hyper::{body::Buf, Method, Request, StatusCode};
11    use libdd_common::hyper_migration;
12    use libdd_common::Endpoint;
13    use libdd_trace_protobuf::pb;
14    use std::io::Write;
15    use tracing::debug;
16
17    pub async fn get_stats_from_request_body(
18        body: hyper_migration::Body,
19    ) -> anyhow::Result<pb::ClientStatsPayload> {
20        let buffer = body.collect().await?.aggregate();
21
22        let client_stats_payload: pb::ClientStatsPayload =
23            match rmp_serde::from_read(buffer.reader()) {
24                Ok(res) => res,
25                Err(err) => {
26                    anyhow::bail!("Error deserializing stats from request body: {err}")
27                }
28            };
29
30        if client_stats_payload.stats.is_empty() {
31            debug!("Empty trace stats payload received, but this is okay");
32        }
33        Ok(client_stats_payload)
34    }
35
36    pub fn construct_stats_payload(stats: Vec<pb::ClientStatsPayload>) -> pb::StatsPayload {
37        pb::StatsPayload {
38            agent_hostname: "".to_string(),
39            agent_env: "".to_string(),
40            stats,
41            agent_version: "".to_string(),
42            client_computed: true,
43            split_payload: false,
44        }
45    }
46
47    pub fn serialize_stats_payload(payload: pb::StatsPayload) -> anyhow::Result<Vec<u8>> {
48        let msgpack = rmp_serde::to_vec_named(&payload)?;
49        let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::fast());
50        encoder.write_all(&msgpack)?;
51        match encoder.finish() {
52            Ok(res) => Ok(res),
53            Err(e) => anyhow::bail!("Error serializing stats payload: {e}"),
54        }
55    }
56
57    pub async fn send_stats_payload(
58        data: Vec<u8>,
59        target: &Endpoint,
60        api_key: &str,
61    ) -> anyhow::Result<()> {
62        let req = Request::builder()
63            .method(Method::POST)
64            .uri(target.url.clone())
65            .header("Content-Type", "application/msgpack")
66            .header("Content-Encoding", "gzip")
67            .header("DD-API-KEY", api_key)
68            .body(hyper_migration::Body::from(data.clone()))?;
69
70        let client = hyper_migration::new_default_client();
71        match client.request(req).await {
72            Ok(response) => {
73                if response.status() != StatusCode::ACCEPTED {
74                    let body_bytes = response.into_body().collect().await?.to_bytes();
75                    let response_body = String::from_utf8(body_bytes.to_vec()).unwrap_or_default();
76                    anyhow::bail!("Server did not accept trace stats: {response_body}");
77                }
78                Ok(())
79            }
80            Err(e) => anyhow::bail!("Failed to send trace stats: {e}"),
81        }
82    }
83}
84
85#[cfg(test)]
86#[cfg(feature = "mini_agent")]
87mod mini_agent_tests {
88    use crate::stats_utils;
89    use hyper::Request;
90    use libdd_common::hyper_migration;
91    use libdd_trace_protobuf::pb::{
92        ClientGroupedStats, ClientStatsBucket, ClientStatsPayload, Trilean::NotSet,
93    };
94    use serde_json::Value;
95
96    #[tokio::test]
97    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
98    async fn test_get_stats_from_request_body() {
99        let stats_json = r#"{
100            "Hostname": "TestHost",
101            "Env": "test",
102            "Version": "1.0.0",
103            "Stats": [
104                {
105                    "Start": 0,
106                    "Duration": 10000000000,
107                    "Stats": [
108                        {
109                            "Name": "test-span",
110                            "Service": "test-service",
111                            "Resource": "test-span",
112                            "Type": "",
113                            "HTTPStatusCode": 0,
114                            "Synthetics": false,
115                            "Hits": 1,
116                            "TopLevelHits": 1,
117                            "Errors": 0,
118                            "Duration": 10000000,
119                            "OkSummary": [
120                                0,
121                                0,
122                                0
123                            ],
124                            "ErrorSummary": [
125                                0,
126                                0,
127                                0
128                            ],
129                            "GrpcStatusCode": "0",
130                            "HTTPMethod": "GET",
131                            "HTTPEndpoint": "/test"
132                        }
133                    ]
134                }
135            ],
136            "Lang": "javascript",
137            "TracerVersion": "1.0.0",
138            "RuntimeID": "00000000-0000-0000-0000-000000000000",
139            "Sequence": 1
140        }"#;
141
142        let v: Value = match serde_json::from_str(stats_json) {
143            Ok(value) => value,
144            Err(err) => {
145                panic!("Failed to parse stats JSON: {err}");
146            }
147        };
148
149        let bytes = rmp_serde::to_vec(&v).unwrap();
150        let request = Request::builder()
151            .body(hyper_migration::Body::from(bytes))
152            .unwrap();
153
154        let res = stats_utils::get_stats_from_request_body(request.into_body()).await;
155
156        let client_stats_payload = ClientStatsPayload {
157            hostname: "TestHost".to_string(),
158            env: "test".to_string(),
159            version: "1.0.0".to_string(),
160            stats: vec![ClientStatsBucket {
161                start: 0,
162                duration: 10000000000,
163                stats: vec![ClientGroupedStats {
164                    service: "test-service".to_string(),
165                    name: "test-span".to_string(),
166                    resource: "test-span".to_string(),
167                    http_status_code: 0,
168                    r#type: "".to_string(),
169                    db_type: "".to_string(),
170                    hits: 1,
171                    errors: 0,
172                    duration: 10000000,
173                    ok_summary: vec![0, 0, 0],
174                    error_summary: vec![0, 0, 0],
175                    synthetics: false,
176                    top_level_hits: 1,
177                    span_kind: "".to_string(),
178                    peer_tags: vec![],
179                    is_trace_root: NotSet.into(),
180                    grpc_status_code: "0".to_string(),
181                    http_endpoint: "/test".to_string(),
182                    http_method: "GET".to_string(),
183                }],
184                agent_time_shift: 0,
185            }],
186            lang: "javascript".to_string(),
187            tracer_version: "1.0.0".to_string(),
188            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
189            sequence: 1,
190            agent_aggregation: "".to_string(),
191            service: "".to_string(),
192            container_id: "".to_string(),
193            tags: vec![],
194            git_commit_sha: "".to_string(),
195            image_tag: "".to_string(),
196            process_tags_hash: 0,
197            process_tags: "".to_string(),
198        };
199
200        assert!(
201            res.is_ok(),
202            "Expected Ok result, but got Err: {}",
203            res.unwrap_err()
204        );
205        assert_eq!(res.unwrap(), client_stats_payload)
206    }
207
208    #[tokio::test]
209    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
210    async fn test_get_stats_from_request_body_without_stats() {
211        let stats_json = r#"{
212            "Hostname": "TestHost",
213            "Env": "test",
214            "Version": "1.0.0",
215            "Lang": "javascript",
216            "TracerVersion": "1.0.0",
217            "RuntimeID": "00000000-0000-0000-0000-000000000000",
218            "Sequence": 1
219        }"#;
220
221        let v: Value = match serde_json::from_str(stats_json) {
222            Ok(value) => value,
223            Err(err) => {
224                panic!("Failed to parse stats JSON: {err}");
225            }
226        };
227
228        let bytes = rmp_serde::to_vec(&v).unwrap();
229        let request = Request::builder()
230            .body(hyper_migration::Body::from(bytes))
231            .unwrap();
232
233        let res = stats_utils::get_stats_from_request_body(request.into_body()).await;
234
235        let client_stats_payload = ClientStatsPayload {
236            hostname: "TestHost".to_string(),
237            env: "test".to_string(),
238            version: "1.0.0".to_string(),
239            stats: vec![],
240            lang: "javascript".to_string(),
241            tracer_version: "1.0.0".to_string(),
242            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
243            sequence: 1,
244            agent_aggregation: "".to_string(),
245            service: "".to_string(),
246            container_id: "".to_string(),
247            tags: vec![],
248            git_commit_sha: "".to_string(),
249            image_tag: "".to_string(),
250            process_tags_hash: 0,
251            process_tags: "".to_string(),
252        };
253
254        assert!(
255            res.is_ok(),
256            "Expected Ok result, but got Err: {}",
257            res.unwrap_err()
258        );
259        assert_eq!(res.unwrap(), client_stats_payload)
260    }
261
262    #[tokio::test]
263    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
264    async fn test_serialize_client_stats_payload_without_stats() {
265        let client_stats_payload_without_stats = ClientStatsPayload {
266            hostname: "TestHost".to_string(),
267            env: "test".to_string(),
268            version: "1.0.0".to_string(),
269            stats: vec![],
270            lang: "javascript".to_string(),
271            tracer_version: "1.0.0".to_string(),
272            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
273            sequence: 1,
274            agent_aggregation: "".to_string(),
275            service: "".to_string(),
276            container_id: "".to_string(),
277            tags: vec![],
278            git_commit_sha: "".to_string(),
279            image_tag: "".to_string(),
280            process_tags_hash: 0,
281            process_tags: "".to_string(),
282        };
283
284        let client_stats_payload_without_inner_stats = ClientStatsPayload {
285            hostname: "TestHost".to_string(),
286            env: "test".to_string(),
287            version: "1.0.0".to_string(),
288            stats: vec![ClientStatsBucket {
289                start: 0,
290                duration: 10000000000,
291                stats: vec![],
292                agent_time_shift: 0,
293            }],
294            lang: "javascript".to_string(),
295            tracer_version: "1.0.0".to_string(),
296            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
297            sequence: 1,
298            agent_aggregation: "".to_string(),
299            service: "".to_string(),
300            container_id: "".to_string(),
301            tags: vec![],
302            git_commit_sha: "".to_string(),
303            image_tag: "".to_string(),
304            process_tags_hash: 0,
305            process_tags: "".to_string(),
306        };
307
308        let res = stats_utils::serialize_stats_payload(stats_utils::construct_stats_payload(vec![
309            client_stats_payload_without_stats,
310            client_stats_payload_without_inner_stats,
311        ]));
312
313        assert!(
314            res.is_ok(),
315            "Expected Ok result, but got Err: {}",
316            res.unwrap_err()
317        );
318    }
319}