1#[cfg(feature = "mini_agent")]
5pub use mini_agent::*;
6
7#[cfg(feature = "mini_agent")]
8mod mini_agent {
9 use bytes::{Buf, Bytes};
10 use http_body_util::BodyExt;
11 use libdd_capabilities::HttpClientCapability;
12 use libdd_common::http_common;
13 use libdd_common::Endpoint;
14 use libdd_trace_protobuf::pb;
15 use std::io::Write;
16 use tracing::debug;
17
18 pub async fn get_stats_from_request_body(
19 body: http_common::Body,
20 ) -> anyhow::Result<pb::ClientStatsPayload> {
21 let buffer = BodyExt::collect(body).await?.aggregate();
22
23 let client_stats_payload: pb::ClientStatsPayload =
24 match rmp_serde::from_read(buffer.reader()) {
25 Ok(res) => res,
26 Err(err) => {
27 anyhow::bail!("Error deserializing stats from request body: {err}")
28 }
29 };
30
31 if client_stats_payload.stats.is_empty() {
32 debug!("Empty trace stats payload received, but this is okay");
33 }
34 Ok(client_stats_payload)
35 }
36
37 pub fn construct_stats_payload(stats: Vec<pb::ClientStatsPayload>) -> pb::StatsPayload {
38 let stats = stats
40 .into_iter()
41 .map(|mut stat| {
42 stat.hostname = "".to_string();
43 stat
44 })
45 .collect();
46 pb::StatsPayload {
47 agent_hostname: "".to_string(),
48 agent_env: "".to_string(),
49 stats,
50 agent_version: "".to_string(),
51 client_computed: true,
52 split_payload: false,
53 }
54 }
55
56 pub fn serialize_stats_payload(payload: pb::StatsPayload) -> anyhow::Result<Vec<u8>> {
57 let msgpack = rmp_serde::to_vec_named(&payload)?;
58 let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::fast());
59 encoder.write_all(&msgpack)?;
60 match encoder.finish() {
61 Ok(res) => Ok(res),
62 Err(e) => anyhow::bail!("Error serializing stats payload: {e}"),
63 }
64 }
65
66 pub async fn send_stats_payload<H: HttpClientCapability>(
67 data: Vec<u8>,
68 target: &Endpoint,
69 api_key: &str,
70 ) -> anyhow::Result<()> {
71 let client = H::new_client();
72 let req = http::Request::builder()
73 .method(http::Method::POST)
74 .uri(target.url.clone())
75 .header("Content-Type", "application/msgpack")
76 .header("Content-Encoding", "gzip")
77 .header("DD-API-KEY", api_key)
78 .body(Bytes::from(data))?;
79
80 let response = client
81 .request(req)
82 .await
83 .map_err(|e| anyhow::anyhow!("Failed to send trace stats: {e}"))?;
84
85 if response.status() != http::StatusCode::ACCEPTED {
86 let response_body =
87 String::from_utf8(response.into_body().to_vec()).unwrap_or_default();
88 anyhow::bail!("Server did not accept trace stats: {response_body}");
89 }
90 Ok(())
91 }
92}
93
94#[cfg(test)]
95#[cfg(feature = "mini_agent")]
96mod mini_agent_tests {
97 use crate::stats_utils;
98 use http::Request;
99 use libdd_common::http_common;
100 use libdd_trace_protobuf::pb::{
101 ClientGroupedStats, ClientStatsBucket, ClientStatsPayload, Trilean::NotSet,
102 };
103 use serde_json::Value;
104
105 #[tokio::test]
106 #[cfg_attr(all(miri, target_os = "macos"), ignore)]
107 async fn test_get_stats_from_request_body() {
108 let stats_json = r#"{
109 "Hostname": "TestHost",
110 "Env": "test",
111 "Version": "1.0.0",
112 "Stats": [
113 {
114 "Start": 0,
115 "Duration": 10000000000,
116 "Stats": [
117 {
118 "Name": "test-span",
119 "Service": "test-service",
120 "Resource": "test-span",
121 "Type": "",
122 "HTTPStatusCode": 0,
123 "Synthetics": false,
124 "Hits": 1,
125 "TopLevelHits": 1,
126 "Errors": 0,
127 "Duration": 10000000,
128 "OkSummary": [
129 0,
130 0,
131 0
132 ],
133 "ErrorSummary": [
134 0,
135 0,
136 0
137 ],
138 "GRPCStatusCode": "0",
139 "HTTPMethod": "GET",
140 "HTTPEndpoint": "/test"
141 }
142 ]
143 }
144 ],
145 "Lang": "javascript",
146 "TracerVersion": "1.0.0",
147 "RuntimeID": "00000000-0000-0000-0000-000000000000",
148 "Sequence": 1
149 }"#;
150
151 let v: Value = match serde_json::from_str(stats_json) {
152 Ok(value) => value,
153 Err(err) => {
154 panic!("Failed to parse stats JSON: {err}");
155 }
156 };
157
158 let bytes = rmp_serde::to_vec(&v).unwrap();
159 let request = Request::builder()
160 .body(http_common::Body::from(bytes))
161 .unwrap();
162
163 let res = stats_utils::get_stats_from_request_body(request.into_body()).await;
164
165 let client_stats_payload = ClientStatsPayload {
166 hostname: "TestHost".to_string(),
167 env: "test".to_string(),
168 version: "1.0.0".to_string(),
169 stats: vec![ClientStatsBucket {
170 start: 0,
171 duration: 10000000000,
172 stats: vec![ClientGroupedStats {
173 service: "test-service".to_string(),
174 name: "test-span".to_string(),
175 resource: "test-span".to_string(),
176 http_status_code: 0,
177 r#type: "".to_string(),
178 db_type: "".to_string(),
179 hits: 1,
180 errors: 0,
181 duration: 10000000,
182 ok_summary: vec![0, 0, 0],
183 error_summary: vec![0, 0, 0],
184 synthetics: false,
185 top_level_hits: 1,
186 span_kind: "".to_string(),
187 peer_tags: vec![],
188 is_trace_root: NotSet.into(),
189 grpc_status_code: "0".to_string(),
190 http_endpoint: "/test".to_string(),
191 http_method: "GET".to_string(),
192 service_source: "".to_string(),
193 span_derived_primary_tags: vec![],
194 }],
195 agent_time_shift: 0,
196 }],
197 lang: "javascript".to_string(),
198 tracer_version: "1.0.0".to_string(),
199 runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
200 sequence: 1,
201 agent_aggregation: "".to_string(),
202 service: "".to_string(),
203 container_id: "".to_string(),
204 tags: vec![],
205 git_commit_sha: "".to_string(),
206 image_tag: "".to_string(),
207 process_tags_hash: 0,
208 process_tags: "".to_string(),
209 };
210
211 assert!(
212 res.is_ok(),
213 "Expected Ok result, but got Err: {}",
214 res.unwrap_err()
215 );
216 assert_eq!(res.unwrap(), client_stats_payload)
217 }
218
219 #[tokio::test]
220 #[cfg_attr(all(miri, target_os = "macos"), ignore)]
221 async fn test_get_stats_from_request_body_without_stats() {
222 let stats_json = r#"{
223 "Hostname": "TestHost",
224 "Env": "test",
225 "Version": "1.0.0",
226 "Lang": "javascript",
227 "TracerVersion": "1.0.0",
228 "RuntimeID": "00000000-0000-0000-0000-000000000000",
229 "Sequence": 1
230 }"#;
231
232 let v: Value = match serde_json::from_str(stats_json) {
233 Ok(value) => value,
234 Err(err) => {
235 panic!("Failed to parse stats JSON: {err}");
236 }
237 };
238
239 let bytes = rmp_serde::to_vec(&v).unwrap();
240 let request = Request::builder()
241 .body(http_common::Body::from(bytes))
242 .unwrap();
243
244 let res = stats_utils::get_stats_from_request_body(request.into_body()).await;
245
246 let client_stats_payload = ClientStatsPayload {
247 hostname: "TestHost".to_string(),
248 env: "test".to_string(),
249 version: "1.0.0".to_string(),
250 stats: vec![],
251 lang: "javascript".to_string(),
252 tracer_version: "1.0.0".to_string(),
253 runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
254 sequence: 1,
255 agent_aggregation: "".to_string(),
256 service: "".to_string(),
257 container_id: "".to_string(),
258 tags: vec![],
259 git_commit_sha: "".to_string(),
260 image_tag: "".to_string(),
261 process_tags_hash: 0,
262 process_tags: "".to_string(),
263 };
264
265 assert!(
266 res.is_ok(),
267 "Expected Ok result, but got Err: {}",
268 res.unwrap_err()
269 );
270 assert_eq!(res.unwrap(), client_stats_payload)
271 }
272
273 #[tokio::test]
274 #[cfg_attr(all(miri, target_os = "macos"), ignore)]
275 async fn test_serialize_client_stats_payload_without_stats() {
276 let client_stats_payload_without_stats = ClientStatsPayload {
277 hostname: "TestHost".to_string(),
278 env: "test".to_string(),
279 version: "1.0.0".to_string(),
280 stats: vec![],
281 lang: "javascript".to_string(),
282 tracer_version: "1.0.0".to_string(),
283 runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
284 sequence: 1,
285 agent_aggregation: "".to_string(),
286 service: "".to_string(),
287 container_id: "".to_string(),
288 tags: vec![],
289 git_commit_sha: "".to_string(),
290 image_tag: "".to_string(),
291 process_tags_hash: 0,
292 process_tags: "".to_string(),
293 };
294
295 let client_stats_payload_without_inner_stats = ClientStatsPayload {
296 hostname: "TestHost".to_string(),
297 env: "test".to_string(),
298 version: "1.0.0".to_string(),
299 stats: vec![ClientStatsBucket {
300 start: 0,
301 duration: 10000000000,
302 stats: vec![],
303 agent_time_shift: 0,
304 }],
305 lang: "javascript".to_string(),
306 tracer_version: "1.0.0".to_string(),
307 runtime_id: "00000000-0000-0000-0000-000000000000".to_string(),
308 sequence: 1,
309 agent_aggregation: "".to_string(),
310 service: "".to_string(),
311 container_id: "".to_string(),
312 tags: vec![],
313 git_commit_sha: "".to_string(),
314 image_tag: "".to_string(),
315 process_tags_hash: 0,
316 process_tags: "".to_string(),
317 };
318
319 let res = stats_utils::serialize_stats_payload(stats_utils::construct_stats_payload(vec![
320 client_stats_payload_without_stats,
321 client_stats_payload_without_inner_stats,
322 ]));
323
324 assert!(
325 res.is_ok(),
326 "Expected Ok result, but got Err: {}",
327 res.unwrap_err()
328 );
329 }
330}