Skip to main content

libdd_trace_utils/
stats_utils.rs

1// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4#[cfg(feature = "mini_agent")]
5pub use mini_agent::*;
6
7#[cfg(feature = "mini_agent")]
8mod mini_agent {
9    use bytes::{Buf, Bytes};
10    use http_body_util::BodyExt;
11    use libdd_capabilities::HttpClientCapability;
12    use libdd_common::http_common;
13    use libdd_common::Endpoint;
14    use libdd_trace_protobuf::pb;
15    use std::io::Write;
16    use tracing::debug;
17
18    pub async fn get_stats_from_request_body(
19        body: http_common::Body,
20    ) -> anyhow::Result<pb::ClientStatsPayload> {
21        let buffer = BodyExt::collect(body).await?.aggregate();
22
23        let client_stats_payload: pb::ClientStatsPayload =
24            match rmp_serde::from_read(buffer.reader()) {
25                Ok(res) => res,
26                Err(err) => {
27                    anyhow::bail!("Error deserializing stats from request body: {err}")
28                }
29            };
30
31        if client_stats_payload.stats.is_empty() {
32            debug!("Empty trace stats payload received, but this is okay");
33        }
34        Ok(client_stats_payload)
35    }
36
37    pub fn construct_stats_payload(stats: Vec<pb::ClientStatsPayload>) -> pb::StatsPayload {
38        // set hostname on stats from tracer to empty string for serverless
39        let stats = stats
40            .into_iter()
41            .map(|mut stat| {
42                stat.hostname = "".to_string();
43                stat
44            })
45            .collect();
46        pb::StatsPayload {
47            agent_hostname: "".to_string(),
48            agent_env: "".to_string(),
49            stats,
50            agent_version: "".to_string(),
51            client_computed: true,
52            split_payload: false,
53        }
54    }
55
56    pub fn serialize_stats_payload(payload: pb::StatsPayload) -> anyhow::Result<Vec<u8>> {
57        let msgpack = rmp_serde::to_vec_named(&payload)?;
58        let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::fast());
59        encoder.write_all(&msgpack)?;
60        match encoder.finish() {
61            Ok(res) => Ok(res),
62            Err(e) => anyhow::bail!("Error serializing stats payload: {e}"),
63        }
64    }
65
66    pub async fn send_stats_payload<H: HttpClientCapability>(
67        data: Vec<u8>,
68        target: &Endpoint,
69        api_key: &str,
70    ) -> anyhow::Result<()> {
71        let client = H::new_client();
72        let req = http::Request::builder()
73            .method(http::Method::POST)
74            .uri(target.url.clone())
75            .header("Content-Type", "application/msgpack")
76            .header("Content-Encoding", "gzip")
77            .header("DD-API-KEY", api_key)
78            .body(Bytes::from(data))?;
79
80        let response = client
81            .request(req)
82            .await
83            .map_err(|e| anyhow::anyhow!("Failed to send trace stats: {e}"))?;
84
85        if response.status() != http::StatusCode::ACCEPTED {
86            let response_body =
87                String::from_utf8(response.into_body().to_vec()).unwrap_or_default();
88            anyhow::bail!("Server did not accept trace stats: {response_body}");
89        }
90        Ok(())
91    }
92}
93
94#[cfg(test)]
95#[cfg(feature = "mini_agent")]
96mod mini_agent_tests {
97    use crate::stats_utils;
98    use http::Request;
99    use libdd_common::http_common;
100    use libdd_trace_protobuf::pb::{
101        ClientGroupedStats, ClientStatsBucket, ClientStatsPayload, Trilean::NotSet,
102    };
103    use serde_json::Value;
104
105    #[tokio::test]
106    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
107    async fn test_get_stats_from_request_body() {
108        let stats_json = r#"{
109            "Hostname": "TestHost",
110            "Env": "test",
111            "Version": "1.0.0",
112            "Stats": [
113                {
114                    "Start": 0,
115                    "Duration": 10000000000,
116                    "Stats": [
117                        {
118                            "Name": "test-span",
119                            "Service": "test-service",
120                            "Resource": "test-span",
121                            "Type": "",
122                            "HTTPStatusCode": 0,
123                            "Synthetics": false,
124                            "Hits": 1,
125                            "TopLevelHits": 1,
126                            "Errors": 0,
127                            "Duration": 10000000,
128                            "OkSummary": [
129                                0,
130                                0,
131                                0
132                            ],
133                            "ErrorSummary": [
134                                0,
135                                0,
136                                0
137                            ],
138                            "GRPCStatusCode": "0",
139                            "HTTPMethod": "GET",
140                            "HTTPEndpoint": "/test"
141                        }
142                    ]
143                }
144            ],
145            "Lang": "javascript",
146            "TracerVersion": "1.0.0",
147            "RuntimeID": "00000000-0000-0000-0000-000000000000",
148            "Sequence": 1
149        }"#;
150
151        let v: Value = match serde_json::from_str(stats_json) {
152            Ok(value) => value,
153            Err(err) => {
154                panic!("Failed to parse stats JSON: {err}");
155            }
156        };
157
158        let bytes = rmp_serde::to_vec(&v).unwrap();
159        let request = Request::builder()
160            .body(http_common::Body::from(bytes))
161            .unwrap();
162
163        let res = stats_utils::get_stats_from_request_body(request.into_body()).await;
164
165        let client_stats_payload = ClientStatsPayload {
166            hostname: "TestHost".to_string(),
167            env: "test".to_string(),
168            version: "1.0.0".to_string(),
169            stats: vec![ClientStatsBucket {
170                start: 0,
171                duration: 10000000000,
172                stats: vec![ClientGroupedStats {
173                    service: "test-service".to_string(),
174                    name: "test-span".to_string(),
175                    resource: "test-span".to_string(),
176                    http_status_code: 0,
177                    r#type: "".to_string(),
178                    db_type: "".to_string(),
179                    hits: 1,
180                    errors: 0,
181                    duration: 10000000,
182                    ok_summary: vec![0, 0, 0],
183                    error_summary: vec![0, 0, 0],
184                    synthetics: false,
185                    top_level_hits: 1,
186                    span_kind: "".to_string(),
187                    peer_tags: vec![],
188                    is_trace_root: NotSet.into(),
189                    grpc_status_code: "0".to_string(),
190                    http_endpoint: "/test".to_string(),
191                    http_method: "GET".to_string(),
192                    service_source: "".to_string(),
193                    span_derived_primary_tags: vec![],
194                }],
195                agent_time_shift: 0,
196            }],
197            lang: "javascript".to_string(),
198            tracer_version: "1.0.0".to_string(),
199            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
200            sequence: 1,
201            agent_aggregation: "".to_string(),
202            service: "".to_string(),
203            container_id: "".to_string(),
204            tags: vec![],
205            git_commit_sha: "".to_string(),
206            image_tag: "".to_string(),
207            process_tags_hash: 0,
208            process_tags: "".to_string(),
209        };
210
211        assert!(
212            res.is_ok(),
213            "Expected Ok result, but got Err: {}",
214            res.unwrap_err()
215        );
216        assert_eq!(res.unwrap(), client_stats_payload)
217    }
218
219    #[tokio::test]
220    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
221    async fn test_get_stats_from_request_body_without_stats() {
222        let stats_json = r#"{
223            "Hostname": "TestHost",
224            "Env": "test",
225            "Version": "1.0.0",
226            "Lang": "javascript",
227            "TracerVersion": "1.0.0",
228            "RuntimeID": "00000000-0000-0000-0000-000000000000",
229            "Sequence": 1
230        }"#;
231
232        let v: Value = match serde_json::from_str(stats_json) {
233            Ok(value) => value,
234            Err(err) => {
235                panic!("Failed to parse stats JSON: {err}");
236            }
237        };
238
239        let bytes = rmp_serde::to_vec(&v).unwrap();
240        let request = Request::builder()
241            .body(http_common::Body::from(bytes))
242            .unwrap();
243
244        let res = stats_utils::get_stats_from_request_body(request.into_body()).await;
245
246        let client_stats_payload = ClientStatsPayload {
247            hostname: "TestHost".to_string(),
248            env: "test".to_string(),
249            version: "1.0.0".to_string(),
250            stats: vec![],
251            lang: "javascript".to_string(),
252            tracer_version: "1.0.0".to_string(),
253            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
254            sequence: 1,
255            agent_aggregation: "".to_string(),
256            service: "".to_string(),
257            container_id: "".to_string(),
258            tags: vec![],
259            git_commit_sha: "".to_string(),
260            image_tag: "".to_string(),
261            process_tags_hash: 0,
262            process_tags: "".to_string(),
263        };
264
265        assert!(
266            res.is_ok(),
267            "Expected Ok result, but got Err: {}",
268            res.unwrap_err()
269        );
270        assert_eq!(res.unwrap(), client_stats_payload)
271    }
272
273    #[tokio::test]
274    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
275    async fn test_serialize_client_stats_payload_without_stats() {
276        let client_stats_payload_without_stats = ClientStatsPayload {
277            hostname: "TestHost".to_string(),
278            env: "test".to_string(),
279            version: "1.0.0".to_string(),
280            stats: vec![],
281            lang: "javascript".to_string(),
282            tracer_version: "1.0.0".to_string(),
283            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
284            sequence: 1,
285            agent_aggregation: "".to_string(),
286            service: "".to_string(),
287            container_id: "".to_string(),
288            tags: vec![],
289            git_commit_sha: "".to_string(),
290            image_tag: "".to_string(),
291            process_tags_hash: 0,
292            process_tags: "".to_string(),
293        };
294
295        let client_stats_payload_without_inner_stats = ClientStatsPayload {
296            hostname: "TestHost".to_string(),
297            env: "test".to_string(),
298            version: "1.0.0".to_string(),
299            stats: vec![ClientStatsBucket {
300                start: 0,
301                duration: 10000000000,
302                stats: vec![],
303                agent_time_shift: 0,
304            }],
305            lang: "javascript".to_string(),
306            tracer_version: "1.0.0".to_string(),
307            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
308            sequence: 1,
309            agent_aggregation: "".to_string(),
310            service: "".to_string(),
311            container_id: "".to_string(),
312            tags: vec![],
313            git_commit_sha: "".to_string(),
314            image_tag: "".to_string(),
315            process_tags_hash: 0,
316            process_tags: "".to_string(),
317        };
318
319        let res = stats_utils::serialize_stats_payload(stats_utils::construct_stats_payload(vec![
320            client_stats_payload_without_stats,
321            client_stats_payload_without_inner_stats,
322        ]));
323
324        assert!(
325            res.is_ok(),
326            "Expected Ok result, but got Err: {}",
327            res.unwrap_err()
328        );
329    }
330}