1#[cfg(feature = "mini_agent")]
5pub use mini_agent::*;
6
#[cfg(feature = "mini_agent")]
mod mini_agent {
    use http_body_util::BodyExt;
    use hyper::{body::Buf, Method, Request, StatusCode};
    use libdd_common::hyper_migration;
    use libdd_common::Endpoint;
    use libdd_trace_protobuf::pb;
    use std::io::Write;
    use tracing::debug;

    /// Reads the whole request `body` and deserializes it from MessagePack into a
    /// [`pb::ClientStatsPayload`].
    ///
    /// A payload whose `stats` list is empty is accepted; it is only reported at
    /// debug level.
    ///
    /// # Errors
    ///
    /// Returns an error if the body cannot be collected or if the collected bytes
    /// do not deserialize into a `ClientStatsPayload`.
    pub async fn get_stats_from_request_body(
        body: hyper_migration::Body,
    ) -> anyhow::Result<pb::ClientStatsPayload> {
        let buffer = body.collect().await?.aggregate();

        let client_stats_payload: pb::ClientStatsPayload = rmp_serde::from_read(buffer.reader())
            .map_err(|err| {
                anyhow::anyhow!("Error deserializing stats from request body: {err}")
            })?;

        if client_stats_payload.stats.is_empty() {
            debug!("Empty trace stats payload received, but this is okay");
        }
        Ok(client_stats_payload)
    }

    /// Wraps the given client stats payloads in a [`pb::StatsPayload`].
    ///
    /// The agent hostname, environment and version are left empty,
    /// `client_computed` is set to `true` and `split_payload` to `false`.
    pub fn construct_stats_payload(stats: Vec<pb::ClientStatsPayload>) -> pb::StatsPayload {
        pb::StatsPayload {
            agent_hostname: String::new(),
            agent_env: String::new(),
            stats,
            agent_version: String::new(),
            client_computed: true,
            split_payload: false,
        }
    }

    /// Serializes `payload` to MessagePack (with named fields) and gzip-compresses
    /// the result using the fast compression level.
    ///
    /// # Errors
    ///
    /// Returns an error if MessagePack encoding fails or if writing to / finishing
    /// the gzip stream fails.
    pub fn serialize_stats_payload(payload: pb::StatsPayload) -> anyhow::Result<Vec<u8>> {
        let msgpack = rmp_serde::to_vec_named(&payload)?;
        let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::fast());
        encoder.write_all(&msgpack)?;
        encoder
            .finish()
            .map_err(|e| anyhow::anyhow!("Error serializing stats payload: {e}"))
    }

    /// Sends an already serialized stats payload to `target` with an HTTP `POST`.
    ///
    /// The request is sent to `target.url` with `Content-Type: application/msgpack`,
    /// `Content-Encoding: gzip` and the `DD-API-KEY` header set to `api_key`.
    /// Only a `202 Accepted` response is treated as success.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be built or sent, if the response
    /// body cannot be read, or if the server answers with any status other than
    /// `202 Accepted`; in the latter case the error carries the status and the
    /// response body.
    pub async fn send_stats_payload(
        data: Vec<u8>,
        target: &Endpoint,
        api_key: &str,
    ) -> anyhow::Result<()> {
        let req = Request::builder()
            .method(Method::POST)
            .uri(target.url.clone())
            .header("Content-Type", "application/msgpack")
            .header("Content-Encoding", "gzip")
            .header("DD-API-KEY", api_key)
            // `data` is owned and not used afterwards, so it is moved into the
            // body instead of being copied.
            .body(hyper_migration::Body::from(data))?;

        let client = hyper_migration::new_default_client();
        let response = client
            .request(req)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to send trace stats: {e}"))?;

        let status = response.status();
        if status == StatusCode::ACCEPTED {
            return Ok(());
        }

        let body_bytes = response.into_body().collect().await?.to_bytes();
        // Lossy conversion keeps whatever is readable of a non-UTF-8 body rather
        // than discarding the whole diagnostic.
        let response_body = String::from_utf8_lossy(&body_bytes);
        anyhow::bail!("Server did not accept trace stats: {status}: {response_body}")
    }
}
84
#[cfg(test)]
#[cfg(feature = "mini_agent")]
mod mini_agent_tests {
    use crate::stats_utils;
    use hyper::Request;
    use libdd_common::hyper_migration;
    use libdd_trace_protobuf::pb::{
        ClientGroupedStats, ClientStatsBucket, ClientStatsPayload, Trilean::NotSet,
    };
    use serde_json::Value;

    /// Builds the `ClientStatsPayload` shared by every test in this module; only
    /// the `stats` buckets vary between tests.
    fn test_payload(stats: Vec<ClientStatsBucket>) -> ClientStatsPayload {
        ClientStatsPayload {
            hostname: "TestHost".to_string(),
            env: "test".to_string(),
            version: "1.0.0".to_string(),
            stats,
            lang: "javascript".to_string(),
            tracer_version: "1.0.0".to_string(),
            runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
            sequence: 1,
            agent_aggregation: String::new(),
            service: String::new(),
            container_id: String::new(),
            tags: vec![],
            git_commit_sha: String::new(),
            image_tag: String::new(),
            process_tags_hash: 0,
            process_tags: String::new(),
        }
    }

    /// Re-encodes `stats_json` as MessagePack, wraps it in a request body and runs
    /// it through `get_stats_from_request_body`, panicking if any step fails.
    async fn decode_stats(stats_json: &str) -> ClientStatsPayload {
        let v: Value = serde_json::from_str(stats_json)
            .unwrap_or_else(|err| panic!("Failed to parse stats JSON: {err}"));

        let bytes = rmp_serde::to_vec(&v).unwrap();
        let request = Request::builder()
            .body(hyper_migration::Body::from(bytes))
            .unwrap();

        stats_utils::get_stats_from_request_body(request.into_body())
            .await
            .unwrap_or_else(|err| panic!("Expected Ok result, but got Err: {err}"))
    }

    #[tokio::test]
    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
    async fn test_get_stats_from_request_body() {
        let stats_json = r#"{
            "Hostname": "TestHost",
            "Env": "test",
            "Version": "1.0.0",
            "Stats": [
                {
                    "Start": 0,
                    "Duration": 10000000000,
                    "Stats": [
                        {
                            "Name": "test-span",
                            "Service": "test-service",
                            "Resource": "test-span",
                            "Type": "",
                            "HTTPStatusCode": 0,
                            "Synthetics": false,
                            "Hits": 1,
                            "TopLevelHits": 1,
                            "Errors": 0,
                            "Duration": 10000000,
                            "OkSummary": [
                                0,
                                0,
                                0
                            ],
                            "ErrorSummary": [
                                0,
                                0,
                                0
                            ],
                            "GrpcStatusCode": "0",
                            "HTTPMethod": "GET",
                            "HTTPEndpoint": "/test"
                        }
                    ]
                }
            ],
            "Lang": "javascript",
            "TracerVersion": "1.0.0",
            "RuntimeID": "00000000-0000-0000-0000-000000000000",
            "Sequence": 1
        }"#;

        let expected = test_payload(vec![ClientStatsBucket {
            start: 0,
            duration: 10000000000,
            stats: vec![ClientGroupedStats {
                service: "test-service".to_string(),
                name: "test-span".to_string(),
                resource: "test-span".to_string(),
                http_status_code: 0,
                r#type: String::new(),
                db_type: String::new(),
                hits: 1,
                errors: 0,
                duration: 10000000,
                ok_summary: vec![0, 0, 0],
                error_summary: vec![0, 0, 0],
                synthetics: false,
                top_level_hits: 1,
                span_kind: String::new(),
                peer_tags: vec![],
                is_trace_root: NotSet.into(),
                grpc_status_code: "0".to_string(),
                http_endpoint: "/test".to_string(),
                http_method: "GET".to_string(),
            }],
            agent_time_shift: 0,
        }]);

        assert_eq!(decode_stats(stats_json).await, expected)
    }

    #[tokio::test]
    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
    async fn test_get_stats_from_request_body_without_stats() {
        let stats_json = r#"{
            "Hostname": "TestHost",
            "Env": "test",
            "Version": "1.0.0",
            "Lang": "javascript",
            "TracerVersion": "1.0.0",
            "RuntimeID": "00000000-0000-0000-0000-000000000000",
            "Sequence": 1
        }"#;

        // A missing "Stats" key must decode to an empty bucket list.
        assert_eq!(decode_stats(stats_json).await, test_payload(vec![]))
    }

    #[tokio::test]
    #[cfg_attr(all(miri, target_os = "macos"), ignore)]
    async fn test_serialize_client_stats_payload_without_stats() {
        let client_stats_payload_without_stats = test_payload(vec![]);

        let client_stats_payload_without_inner_stats = test_payload(vec![ClientStatsBucket {
            start: 0,
            duration: 10000000000,
            stats: vec![],
            agent_time_shift: 0,
        }]);

        let res = stats_utils::serialize_stats_payload(stats_utils::construct_stats_payload(vec![
            client_stats_payload_without_stats,
            client_stats_payload_without_inner_stats,
        ]));

        let bytes = res.unwrap_or_else(|err| panic!("Expected Ok result, but got Err: {err}"));
        // The serialized payload is gzip-compressed, so it must begin with the
        // gzip magic number.
        assert!(
            bytes.starts_with(&[0x1f, 0x8b]),
            "Expected gzip-compressed output"
        );
    }
}