Skip to main content

libdd_trace_utils/
stats_utils.rs

1// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4#[cfg(feature = "mini_agent")]
5pub use mini_agent::*;
6
7#[cfg(feature = "mini_agent")]
8mod mini_agent {
9    use bytes::Buf;
10    use http::{Method, Request, StatusCode};
11    use http_body_util::BodyExt;
12    use libdd_common::http_common;
13    use libdd_common::Connect;
14    use libdd_common::Endpoint;
15    use libdd_common::GenericHttpClient;
16    use libdd_trace_protobuf::pb;
17    use std::io::Write;
18    use tracing::debug;
19
20    pub async fn get_stats_from_request_body(
21        body: http_common::Body,
22    ) -> anyhow::Result<pb::ClientStatsPayload> {
23        let buffer = body.collect().await?.aggregate();
24
25        let client_stats_payload: pb::ClientStatsPayload =
26            match rmp_serde::from_read(buffer.reader()) {
27                Ok(res) => res,
28                Err(err) => {
29                    anyhow::bail!("Error deserializing stats from request body: {err}")
30                }
31            };
32
33        if client_stats_payload.stats.is_empty() {
34            debug!("Empty trace stats payload received, but this is okay");
35        }
36        Ok(client_stats_payload)
37    }
38
39    pub fn construct_stats_payload(stats: Vec<pb::ClientStatsPayload>) -> pb::StatsPayload {
40        // set hostname on stats from tracer to empty string for serverless
41        let stats = stats
42            .into_iter()
43            .map(|mut stat| {
44                stat.hostname = "".to_string();
45                stat
46            })
47            .collect();
48        pb::StatsPayload {
49            agent_hostname: "".to_string(),
50            agent_env: "".to_string(),
51            stats,
52            agent_version: "".to_string(),
53            client_computed: true,
54            split_payload: false,
55        }
56    }
57
58    pub fn serialize_stats_payload(payload: pb::StatsPayload) -> anyhow::Result<Vec<u8>> {
59        let msgpack = rmp_serde::to_vec_named(&payload)?;
60        let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::fast());
61        encoder.write_all(&msgpack)?;
62        match encoder.finish() {
63            Ok(res) => Ok(res),
64            Err(e) => anyhow::bail!("Error serializing stats payload: {e}"),
65        }
66    }
67
68    pub async fn send_stats_payload(
69        data: Vec<u8>,
70        target: &Endpoint,
71        api_key: &str,
72    ) -> anyhow::Result<()> {
73        send_stats_payload_with_client::<libdd_common::connector::Connector>(
74            data, target, api_key, None,
75        )
76        .await
77    }
78
79    pub async fn send_stats_payload_with_client<C: Connect>(
80        data: Vec<u8>,
81        target: &Endpoint,
82        api_key: &str,
83        client: Option<&GenericHttpClient<C>>,
84    ) -> anyhow::Result<()> {
85        let req = Request::builder()
86            .method(Method::POST)
87            .uri(target.url.clone())
88            .header("Content-Type", "application/msgpack")
89            .header("Content-Encoding", "gzip")
90            .header("DD-API-KEY", api_key)
91            .body(http_common::Body::from(data.clone()))?;
92
93        let response = if let Some(client) = client {
94            client.request(req).await
95        } else {
96            let default_client = http_common::new_default_client();
97            default_client.request(req).await
98        };
99
100        match response {
101            Ok(response) => {
102                if response.status() != StatusCode::ACCEPTED {
103                    let body_bytes = response.into_body().collect().await?.to_bytes();
104                    let response_body = String::from_utf8(body_bytes.to_vec()).unwrap_or_default();
105                    anyhow::bail!("Server did not accept trace stats: {response_body}");
106                }
107                Ok(())
108            }
109            Err(e) => anyhow::bail!("Failed to send trace stats: {e}"),
110        }
111    }
112}
113
114#[cfg(test)]
115#[cfg(feature = "mini_agent")]
116mod mini_agent_tests {
117    use crate::stats_utils;
118    use http::Request;
119    use libdd_common::http_common;
120    use libdd_trace_protobuf::pb::{
121        ClientGroupedStats, ClientStatsBucket, ClientStatsPayload, Trilean::NotSet,
122    };
123    use serde_json::Value;
124
125    #[tokio::test]
126    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
127    async fn test_get_stats_from_request_body() {
128        let stats_json = r#"{
129            "Hostname": "TestHost",
130            "Env": "test",
131            "Version": "1.0.0",
132            "Stats": [
133                {
134                    "Start": 0,
135                    "Duration": 10000000000,
136                    "Stats": [
137                        {
138                            "Name": "test-span",
139                            "Service": "test-service",
140                            "Resource": "test-span",
141                            "Type": "",
142                            "HTTPStatusCode": 0,
143                            "Synthetics": false,
144                            "Hits": 1,
145                            "TopLevelHits": 1,
146                            "Errors": 0,
147                            "Duration": 10000000,
148                            "OkSummary": [
149                                0,
150                                0,
151                                0
152                            ],
153                            "ErrorSummary": [
154                                0,
155                                0,
156                                0
157                            ],
158                            "GrpcStatusCode": "0",
159                            "HTTPMethod": "GET",
160                            "HTTPEndpoint": "/test"
161                        }
162                    ]
163                }
164            ],
165            "Lang": "javascript",
166            "TracerVersion": "1.0.0",
167            "RuntimeID": "00000000-0000-0000-0000-000000000000",
168            "Sequence": 1
169        }"#;
170
171        let v: Value = match serde_json::from_str(stats_json) {
172            Ok(value) => value,
173            Err(err) => {
174                panic!("Failed to parse stats JSON: {err}");
175            }
176        };
177
178        let bytes = rmp_serde::to_vec(&v).unwrap();
179        let request = Request::builder()
180            .body(http_common::Body::from(bytes))
181            .unwrap();
182
183        let res = stats_utils::get_stats_from_request_body(request.into_body()).await;
184
185        let client_stats_payload = ClientStatsPayload {
186            hostname: "TestHost".to_string(),
187            env: "test".to_string(),
188            version: "1.0.0".to_string(),
189            stats: vec![ClientStatsBucket {
190                start: 0,
191                duration: 10000000000,
192                stats: vec![ClientGroupedStats {
193                    service: "test-service".to_string(),
194                    name: "test-span".to_string(),
195                    resource: "test-span".to_string(),
196                    http_status_code: 0,
197                    r#type: "".to_string(),
198                    db_type: "".to_string(),
199                    hits: 1,
200                    errors: 0,
201                    duration: 10000000,
202                    ok_summary: vec![0, 0, 0],
203                    error_summary: vec![0, 0, 0],
204                    synthetics: false,
205                    top_level_hits: 1,
206                    span_kind: "".to_string(),
207                    peer_tags: vec![],
208                    is_trace_root: NotSet.into(),
209                    grpc_status_code: "0".to_string(),
210                    http_endpoint: "/test".to_string(),
211                    http_method: "GET".to_string(),
212                    service_source: "".to_string(),
213                    span_derived_primary_tags: vec![],
214                }],
215                agent_time_shift: 0,
216            }],
217            lang: "javascript".to_string(),
218            tracer_version: "1.0.0".to_string(),
219            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
220            sequence: 1,
221            agent_aggregation: "".to_string(),
222            service: "".to_string(),
223            container_id: "".to_string(),
224            tags: vec![],
225            git_commit_sha: "".to_string(),
226            image_tag: "".to_string(),
227            process_tags_hash: 0,
228            process_tags: "".to_string(),
229        };
230
231        assert!(
232            res.is_ok(),
233            "Expected Ok result, but got Err: {}",
234            res.unwrap_err()
235        );
236        assert_eq!(res.unwrap(), client_stats_payload)
237    }
238
239    #[tokio::test]
240    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
241    async fn test_get_stats_from_request_body_without_stats() {
242        let stats_json = r#"{
243            "Hostname": "TestHost",
244            "Env": "test",
245            "Version": "1.0.0",
246            "Lang": "javascript",
247            "TracerVersion": "1.0.0",
248            "RuntimeID": "00000000-0000-0000-0000-000000000000",
249            "Sequence": 1
250        }"#;
251
252        let v: Value = match serde_json::from_str(stats_json) {
253            Ok(value) => value,
254            Err(err) => {
255                panic!("Failed to parse stats JSON: {err}");
256            }
257        };
258
259        let bytes = rmp_serde::to_vec(&v).unwrap();
260        let request = Request::builder()
261            .body(http_common::Body::from(bytes))
262            .unwrap();
263
264        let res = stats_utils::get_stats_from_request_body(request.into_body()).await;
265
266        let client_stats_payload = ClientStatsPayload {
267            hostname: "TestHost".to_string(),
268            env: "test".to_string(),
269            version: "1.0.0".to_string(),
270            stats: vec![],
271            lang: "javascript".to_string(),
272            tracer_version: "1.0.0".to_string(),
273            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
274            sequence: 1,
275            agent_aggregation: "".to_string(),
276            service: "".to_string(),
277            container_id: "".to_string(),
278            tags: vec![],
279            git_commit_sha: "".to_string(),
280            image_tag: "".to_string(),
281            process_tags_hash: 0,
282            process_tags: "".to_string(),
283        };
284
285        assert!(
286            res.is_ok(),
287            "Expected Ok result, but got Err: {}",
288            res.unwrap_err()
289        );
290        assert_eq!(res.unwrap(), client_stats_payload)
291    }
292
293    #[tokio::test]
294    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
295    async fn test_serialize_client_stats_payload_without_stats() {
296        let client_stats_payload_without_stats = ClientStatsPayload {
297            hostname: "TestHost".to_string(),
298            env: "test".to_string(),
299            version: "1.0.0".to_string(),
300            stats: vec![],
301            lang: "javascript".to_string(),
302            tracer_version: "1.0.0".to_string(),
303            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
304            sequence: 1,
305            agent_aggregation: "".to_string(),
306            service: "".to_string(),
307            container_id: "".to_string(),
308            tags: vec![],
309            git_commit_sha: "".to_string(),
310            image_tag: "".to_string(),
311            process_tags_hash: 0,
312            process_tags: "".to_string(),
313        };
314
315        let client_stats_payload_without_inner_stats = ClientStatsPayload {
316            hostname: "TestHost".to_string(),
317            env: "test".to_string(),
318            version: "1.0.0".to_string(),
319            stats: vec![ClientStatsBucket {
320                start: 0,
321                duration: 10000000000,
322                stats: vec![],
323                agent_time_shift: 0,
324            }],
325            lang: "javascript".to_string(),
326            tracer_version: "1.0.0".to_string(),
327            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
328            sequence: 1,
329            agent_aggregation: "".to_string(),
330            service: "".to_string(),
331            container_id: "".to_string(),
332            tags: vec![],
333            git_commit_sha: "".to_string(),
334            image_tag: "".to_string(),
335            process_tags_hash: 0,
336            process_tags: "".to_string(),
337        };
338
339        let res = stats_utils::serialize_stats_payload(stats_utils::construct_stats_payload(vec![
340            client_stats_payload_without_stats,
341            client_stats_payload_without_inner_stats,
342        ]));
343
344        assert!(
345            res.is_ok(),
346            "Expected Ok result, but got Err: {}",
347            res.unwrap_err()
348        );
349    }
350}