1#[cfg(feature = "mini_agent")]
5pub use mini_agent::*;
6
7#[cfg(feature = "mini_agent")]
8mod mini_agent {
9 use bytes::Buf;
10 use http::{Method, Request, StatusCode};
11 use http_body_util::BodyExt;
12 use libdd_common::http_common;
13 use libdd_common::Connect;
14 use libdd_common::Endpoint;
15 use libdd_common::GenericHttpClient;
16 use libdd_trace_protobuf::pb;
17 use std::io::Write;
18 use tracing::debug;
19
20 pub async fn get_stats_from_request_body(
21 body: http_common::Body,
22 ) -> anyhow::Result<pb::ClientStatsPayload> {
23 let buffer = body.collect().await?.aggregate();
24
25 let client_stats_payload: pb::ClientStatsPayload =
26 match rmp_serde::from_read(buffer.reader()) {
27 Ok(res) => res,
28 Err(err) => {
29 anyhow::bail!("Error deserializing stats from request body: {err}")
30 }
31 };
32
33 if client_stats_payload.stats.is_empty() {
34 debug!("Empty trace stats payload received, but this is okay");
35 }
36 Ok(client_stats_payload)
37 }
38
39 pub fn construct_stats_payload(stats: Vec<pb::ClientStatsPayload>) -> pb::StatsPayload {
40 let stats = stats
42 .into_iter()
43 .map(|mut stat| {
44 stat.hostname = "".to_string();
45 stat
46 })
47 .collect();
48 pb::StatsPayload {
49 agent_hostname: "".to_string(),
50 agent_env: "".to_string(),
51 stats,
52 agent_version: "".to_string(),
53 client_computed: true,
54 split_payload: false,
55 }
56 }
57
58 pub fn serialize_stats_payload(payload: pb::StatsPayload) -> anyhow::Result<Vec<u8>> {
59 let msgpack = rmp_serde::to_vec_named(&payload)?;
60 let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::fast());
61 encoder.write_all(&msgpack)?;
62 match encoder.finish() {
63 Ok(res) => Ok(res),
64 Err(e) => anyhow::bail!("Error serializing stats payload: {e}"),
65 }
66 }
67
68 pub async fn send_stats_payload(
69 data: Vec<u8>,
70 target: &Endpoint,
71 api_key: &str,
72 ) -> anyhow::Result<()> {
73 send_stats_payload_with_client::<libdd_common::connector::Connector>(
74 data, target, api_key, None,
75 )
76 .await
77 }
78
79 pub async fn send_stats_payload_with_client<C: Connect>(
80 data: Vec<u8>,
81 target: &Endpoint,
82 api_key: &str,
83 client: Option<&GenericHttpClient<C>>,
84 ) -> anyhow::Result<()> {
85 let req = Request::builder()
86 .method(Method::POST)
87 .uri(target.url.clone())
88 .header("Content-Type", "application/msgpack")
89 .header("Content-Encoding", "gzip")
90 .header("DD-API-KEY", api_key)
91 .body(http_common::Body::from(data.clone()))?;
92
93 let response = if let Some(client) = client {
94 client.request(req).await
95 } else {
96 let default_client = http_common::new_default_client();
97 default_client.request(req).await
98 };
99
100 match response {
101 Ok(response) => {
102 if response.status() != StatusCode::ACCEPTED {
103 let body_bytes = response.into_body().collect().await?.to_bytes();
104 let response_body = String::from_utf8(body_bytes.to_vec()).unwrap_or_default();
105 anyhow::bail!("Server did not accept trace stats: {response_body}");
106 }
107 Ok(())
108 }
109 Err(e) => anyhow::bail!("Failed to send trace stats: {e}"),
110 }
111 }
112}
113
114#[cfg(test)]
115#[cfg(feature = "mini_agent")]
116mod mini_agent_tests {
117 use crate::stats_utils;
118 use http::Request;
119 use libdd_common::http_common;
120 use libdd_trace_protobuf::pb::{
121 ClientGroupedStats, ClientStatsBucket, ClientStatsPayload, Trilean::NotSet,
122 };
123 use serde_json::Value;
124
125 #[tokio::test]
126 #[cfg_attr(all(miri, target_os = "macos"), ignore)]
127 async fn test_get_stats_from_request_body() {
128 let stats_json = r#"{
129 "Hostname": "TestHost",
130 "Env": "test",
131 "Version": "1.0.0",
132 "Stats": [
133 {
134 "Start": 0,
135 "Duration": 10000000000,
136 "Stats": [
137 {
138 "Name": "test-span",
139 "Service": "test-service",
140 "Resource": "test-span",
141 "Type": "",
142 "HTTPStatusCode": 0,
143 "Synthetics": false,
144 "Hits": 1,
145 "TopLevelHits": 1,
146 "Errors": 0,
147 "Duration": 10000000,
148 "OkSummary": [
149 0,
150 0,
151 0
152 ],
153 "ErrorSummary": [
154 0,
155 0,
156 0
157 ],
158 "GrpcStatusCode": "0",
159 "HTTPMethod": "GET",
160 "HTTPEndpoint": "/test"
161 }
162 ]
163 }
164 ],
165 "Lang": "javascript",
166 "TracerVersion": "1.0.0",
167 "RuntimeID": "00000000-0000-0000-0000-000000000000",
168 "Sequence": 1
169 }"#;
170
171 let v: Value = match serde_json::from_str(stats_json) {
172 Ok(value) => value,
173 Err(err) => {
174 panic!("Failed to parse stats JSON: {err}");
175 }
176 };
177
178 let bytes = rmp_serde::to_vec(&v).unwrap();
179 let request = Request::builder()
180 .body(http_common::Body::from(bytes))
181 .unwrap();
182
183 let res = stats_utils::get_stats_from_request_body(request.into_body()).await;
184
185 let client_stats_payload = ClientStatsPayload {
186 hostname: "TestHost".to_string(),
187 env: "test".to_string(),
188 version: "1.0.0".to_string(),
189 stats: vec![ClientStatsBucket {
190 start: 0,
191 duration: 10000000000,
192 stats: vec![ClientGroupedStats {
193 service: "test-service".to_string(),
194 name: "test-span".to_string(),
195 resource: "test-span".to_string(),
196 http_status_code: 0,
197 r#type: "".to_string(),
198 db_type: "".to_string(),
199 hits: 1,
200 errors: 0,
201 duration: 10000000,
202 ok_summary: vec![0, 0, 0],
203 error_summary: vec![0, 0, 0],
204 synthetics: false,
205 top_level_hits: 1,
206 span_kind: "".to_string(),
207 peer_tags: vec![],
208 is_trace_root: NotSet.into(),
209 grpc_status_code: "0".to_string(),
210 http_endpoint: "/test".to_string(),
211 http_method: "GET".to_string(),
212 service_source: "".to_string(),
213 span_derived_primary_tags: vec![],
214 }],
215 agent_time_shift: 0,
216 }],
217 lang: "javascript".to_string(),
218 tracer_version: "1.0.0".to_string(),
219 runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
220 sequence: 1,
221 agent_aggregation: "".to_string(),
222 service: "".to_string(),
223 container_id: "".to_string(),
224 tags: vec![],
225 git_commit_sha: "".to_string(),
226 image_tag: "".to_string(),
227 process_tags_hash: 0,
228 process_tags: "".to_string(),
229 };
230
231 assert!(
232 res.is_ok(),
233 "Expected Ok result, but got Err: {}",
234 res.unwrap_err()
235 );
236 assert_eq!(res.unwrap(), client_stats_payload)
237 }
238
239 #[tokio::test]
240 #[cfg_attr(all(miri, target_os = "macos"), ignore)]
241 async fn test_get_stats_from_request_body_without_stats() {
242 let stats_json = r#"{
243 "Hostname": "TestHost",
244 "Env": "test",
245 "Version": "1.0.0",
246 "Lang": "javascript",
247 "TracerVersion": "1.0.0",
248 "RuntimeID": "00000000-0000-0000-0000-000000000000",
249 "Sequence": 1
250 }"#;
251
252 let v: Value = match serde_json::from_str(stats_json) {
253 Ok(value) => value,
254 Err(err) => {
255 panic!("Failed to parse stats JSON: {err}");
256 }
257 };
258
259 let bytes = rmp_serde::to_vec(&v).unwrap();
260 let request = Request::builder()
261 .body(http_common::Body::from(bytes))
262 .unwrap();
263
264 let res = stats_utils::get_stats_from_request_body(request.into_body()).await;
265
266 let client_stats_payload = ClientStatsPayload {
267 hostname: "TestHost".to_string(),
268 env: "test".to_string(),
269 version: "1.0.0".to_string(),
270 stats: vec![],
271 lang: "javascript".to_string(),
272 tracer_version: "1.0.0".to_string(),
273 runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
274 sequence: 1,
275 agent_aggregation: "".to_string(),
276 service: "".to_string(),
277 container_id: "".to_string(),
278 tags: vec![],
279 git_commit_sha: "".to_string(),
280 image_tag: "".to_string(),
281 process_tags_hash: 0,
282 process_tags: "".to_string(),
283 };
284
285 assert!(
286 res.is_ok(),
287 "Expected Ok result, but got Err: {}",
288 res.unwrap_err()
289 );
290 assert_eq!(res.unwrap(), client_stats_payload)
291 }
292
293 #[tokio::test]
294 #[cfg_attr(all(miri, target_os = "macos"), ignore)]
295 async fn test_serialize_client_stats_payload_without_stats() {
296 let client_stats_payload_without_stats = ClientStatsPayload {
297 hostname: "TestHost".to_string(),
298 env: "test".to_string(),
299 version: "1.0.0".to_string(),
300 stats: vec![],
301 lang: "javascript".to_string(),
302 tracer_version: "1.0.0".to_string(),
303 runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
304 sequence: 1,
305 agent_aggregation: "".to_string(),
306 service: "".to_string(),
307 container_id: "".to_string(),
308 tags: vec![],
309 git_commit_sha: "".to_string(),
310 image_tag: "".to_string(),
311 process_tags_hash: 0,
312 process_tags: "".to_string(),
313 };
314
315 let client_stats_payload_without_inner_stats = ClientStatsPayload {
316 hostname: "TestHost".to_string(),
317 env: "test".to_string(),
318 version: "1.0.0".to_string(),
319 stats: vec![ClientStatsBucket {
320 start: 0,
321 duration: 10000000000,
322 stats: vec![],
323 agent_time_shift: 0,
324 }],
325 lang: "javascript".to_string(),
326 tracer_version: "1.0.0".to_string(),
327 runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
328 sequence: 1,
329 agent_aggregation: "".to_string(),
330 service: "".to_string(),
331 container_id: "".to_string(),
332 tags: vec![],
333 git_commit_sha: "".to_string(),
334 image_tag: "".to_string(),
335 process_tags_hash: 0,
336 process_tags: "".to_string(),
337 };
338
339 let res = stats_utils::serialize_stats_payload(stats_utils::construct_stats_payload(vec![
340 client_stats_payload_without_stats,
341 client_stats_payload_without_inner_stats,
342 ]));
343
344 assert!(
345 res.is_ok(),
346 "Expected Ok result, but got Err: {}",
347 res.unwrap_err()
348 );
349 }
350}