libdd_profiling/exporter/
mod.rs1use anyhow::Context;
5use bytes::Bytes;
6pub use chrono::{DateTime, Utc};
7pub use hyper::Uri;
8use hyper_multipart_rfc7578::client::multipart;
9pub use libdd_common::tag::Tag;
10use serde_json::json;
11use std::borrow::Cow;
12use std::fmt::Debug;
13use std::io::{Cursor, Write};
14use std::{future, iter};
15use tokio::runtime::Runtime;
16use tokio_util::sync::CancellationToken;
17
18use libdd_common::{
19 azure_app_services, connector, hyper_migration, tag, Endpoint, HttpClient, HttpResponse,
20};
21
22pub mod config;
23mod errors;
24
25#[cfg(unix)]
26pub use connector::uds::{socket_path_from_uri, socket_path_to_uri};
27
28#[cfg(windows)]
29pub use connector::named_pipe::{named_pipe_path_from_uri, named_pipe_path_to_uri};
30
31use crate::internal::{EncodedProfile, Profile};
32use crate::profiles::{Compressor, DefaultProfileCodec};
33
/// Zero-length duration. `Request::with_timeout` treats a timeout equal to
/// this value as "no timeout".
const DURATION_ZERO: std::time::Duration = std::time::Duration::ZERO;
35
36pub struct Exporter {
37 client: HttpClient,
38 runtime: Runtime,
39}
40
41pub struct Fields {
42 pub start: DateTime<Utc>,
43 pub end: DateTime<Utc>,
44}
45
46pub struct ProfileExporter {
47 exporter: Exporter,
48 endpoint: Endpoint,
49 family: Cow<'static, str>,
50 profiling_library_name: Cow<'static, str>,
51 profiling_library_version: Cow<'static, str>,
52 tags: Option<Vec<Tag>>,
53}
54
/// A named, borrowed byte buffer to attach to a profile upload.
///
/// `ProfileExporter::build` uses `name` as both the multipart part name and
/// the filename, and lists it in the event's `attachments`.
#[derive(Debug, Clone, Copy)]
pub struct File<'a> {
    /// Name of the attachment.
    pub name: &'a str,
    /// Raw contents of the attachment.
    pub bytes: &'a [u8],
}
59
60#[derive(Debug)]
61pub struct Request {
62 timeout: Option<std::time::Duration>,
63 req: hyper_migration::HttpRequest,
64}
65
66impl From<hyper_migration::HttpRequest> for Request {
67 fn from(req: hyper_migration::HttpRequest) -> Self {
68 Self { req, timeout: None }
69 }
70}
71
72impl Request {
73 fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
74 self.timeout = if timeout != DURATION_ZERO {
75 Some(timeout)
76 } else {
77 None
78 };
79 self
80 }
81
82 pub fn timeout(&self) -> &Option<std::time::Duration> {
83 &self.timeout
84 }
85
86 pub fn uri(&self) -> &hyper::Uri {
87 self.req.uri()
88 }
89
90 pub fn headers(&self) -> &hyper::HeaderMap {
91 self.req.headers()
92 }
93
94 pub fn body(self) -> hyper_migration::Body {
95 self.req.into_body()
96 }
97
98 async fn send(
99 self,
100 client: &HttpClient,
101 cancel: Option<&CancellationToken>,
102 ) -> anyhow::Result<HttpResponse> {
103 tokio::select! {
104 _ = async { match cancel {
105 Some(cancellation_token) => cancellation_token.cancelled().await,
106 None => future::pending().await,
108 }}
109 => Err(crate::exporter::errors::Error::UserRequestedCancellation.into()),
110 result = async {
111 match self.timeout {
112 Some(t) => {
113 let res = tokio::time::timeout(t, client.request(self.req)).await;
114 res
115 .map_err(|_| anyhow::Error::from(crate::exporter::errors::Error::OperationTimedOut))
116 },
117 None => Ok(client.request(self.req).await),
118 }
119 }
120 => Ok(hyper_migration::into_response(result??)),
121 }
122 }
123}
124
125impl ProfileExporter {
126 pub fn new<F, N, V>(
138 profiling_library_name: N,
139 profiling_library_version: V,
140 family: F,
141 tags: Option<Vec<Tag>>,
142 endpoint: Endpoint,
143 ) -> anyhow::Result<ProfileExporter>
144 where
145 F: Into<Cow<'static, str>>,
146 N: Into<Cow<'static, str>>,
147 V: Into<Cow<'static, str>>,
148 {
149 Ok(Self {
150 exporter: Exporter::new()?,
151 endpoint,
152 family: family.into(),
153 profiling_library_name: profiling_library_name.into(),
154 profiling_library_version: profiling_library_version.into(),
155 tags,
156 })
157 }
158
159 const TARGET_TRIPLE: &'static str = target_triple::TARGET;
166
167 #[inline]
168 fn runtime_platform_tag(&self) -> Tag {
169 tag!("runtime_platform", ProfileExporter::TARGET_TRIPLE)
170 }
171
172 #[allow(clippy::too_many_arguments)]
173 pub fn build(
185 &self,
186 profile: EncodedProfile,
187 files_to_compress_and_export: &[File],
188 files_to_export_unmodified: &[File],
189 additional_tags: Option<&Vec<Tag>>,
190 internal_metadata: Option<serde_json::Value>,
191 info: Option<serde_json::Value>,
192 ) -> anyhow::Result<Request> {
193 let mut form = multipart::Form::default();
194
195 let mut tags_profiler = String::new();
197 let other_tags = additional_tags.into_iter();
198 for tag in self.tags.iter().chain(other_tags).flatten() {
199 tags_profiler.push_str(tag.as_ref());
200 tags_profiler.push(',');
201 }
202
203 if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA {
204 let aas_tags = [
205 ("aas.resource.id", aas_metadata.get_resource_id()),
206 (
207 "aas.environment.extension_version",
208 aas_metadata.get_extension_version(),
209 ),
210 (
211 "aas.environment.instance_id",
212 aas_metadata.get_instance_id(),
213 ),
214 (
215 "aas.environment.instance_name",
216 aas_metadata.get_instance_name(),
217 ),
218 ("aas.environment.os", aas_metadata.get_operating_system()),
219 ("aas.resource.group", aas_metadata.get_resource_group()),
220 ("aas.site.name", aas_metadata.get_site_name()),
221 ("aas.site.kind", aas_metadata.get_site_kind()),
222 ("aas.site.type", aas_metadata.get_site_type()),
223 ("aas.subscription.id", aas_metadata.get_subscription_id()),
224 ];
225 aas_tags.into_iter().for_each(|(name, value)| {
226 if let Ok(tag) = Tag::new(name, value) {
227 tags_profiler.push_str(tag.as_ref());
228 tags_profiler.push(',');
229 }
230 });
231 }
232
233 tags_profiler.push_str(self.runtime_platform_tag().as_ref());
237
238 let attachments: Vec<String> = files_to_compress_and_export
239 .iter()
240 .chain(files_to_export_unmodified.iter())
241 .map(|file| file.name.to_owned())
242 .chain(iter::once("profile.pprof".to_string()))
243 .collect();
244
245 let endpoint_counts = if profile.endpoints_stats.is_empty() {
246 None
247 } else {
248 Some(profile.endpoints_stats)
249 };
250 let mut internal: serde_json::value::Value = internal_metadata.unwrap_or_else(|| json!({}));
251 internal["libdatadog_version"] = json!(env!("CARGO_PKG_VERSION"));
252
253 let event = json!({
254 "attachments": attachments,
255 "tags_profiler": tags_profiler,
256 "start": DateTime::<Utc>::from(profile.start).format("%Y-%m-%dT%H:%M:%S%.9fZ").to_string(),
257 "end": DateTime::<Utc>::from(profile.end).format("%Y-%m-%dT%H:%M:%S%.9fZ").to_string(),
258 "family": self.family.as_ref(),
259 "version": "4",
260 "endpoint_counts" : endpoint_counts,
261 "internal": internal,
262 "info": info.unwrap_or_else(|| json!({})),
263 })
264 .to_string();
265
266 form.add_reader_file_with_mime(
267 "event",
269 Cursor::new(event),
271 "event.json",
272 mime::APPLICATION_JSON,
273 );
274
275 for file in files_to_compress_and_export {
276 let capacity = (file.bytes.len() >> 3).next_power_of_two();
279 let max_capacity = 10 * 1024 * 1024;
282 let compression_level = Profile::COMPRESSION_LEVEL;
286 let mut encoder = Compressor::<DefaultProfileCodec>::try_new(
287 capacity,
288 max_capacity,
289 compression_level,
290 )
291 .context("failed to create compressor")?;
292 encoder.write_all(file.bytes)?;
293 let encoded = encoder.finish()?;
294 form.add_reader_file(file.name, Cursor::new(encoded), file.name);
300 }
301
302 for file in files_to_export_unmodified {
303 let encoded = file.bytes.to_vec();
304 form.add_reader_file(file.name, Cursor::new(encoded), file.name)
310 }
311 form.add_reader_file(
313 "profile.pprof",
314 Cursor::new(profile.buffer),
315 "profile.pprof",
316 );
317
318 let builder = self
319 .endpoint
320 .to_request_builder(concat!("DDProf/", env!("CARGO_PKG_VERSION")))?
321 .method(http::Method::POST)
322 .header("Connection", "close")
323 .header("DD-EVP-ORIGIN", self.profiling_library_name.as_ref())
324 .header(
325 "DD-EVP-ORIGIN-VERSION",
326 self.profiling_library_version.as_ref(),
327 );
328
329 Ok(Request::from(
330 form.set_body::<multipart::Body>(builder)?
331 .map(hyper_migration::Body::boxed),
332 )
333 .with_timeout(std::time::Duration::from_millis(self.endpoint.timeout_ms)))
334 }
335
336 pub fn send(
337 &self,
338 request: Request,
339 cancel: Option<&CancellationToken>,
340 ) -> anyhow::Result<HttpResponse> {
341 self.exporter
342 .runtime
343 .block_on(request.send(&self.exporter.client, cancel))
344 }
345
346 pub fn set_timeout(&mut self, timeout_ms: u64) {
347 self.endpoint.timeout_ms = timeout_ms;
348 }
349}
350
351impl Exporter {
352 pub fn new() -> anyhow::Result<Self> {
354 let client = hyper_migration::new_client_periodic();
356 let runtime = tokio::runtime::Builder::new_current_thread()
357 .enable_all()
358 .build()?;
359 Ok(Self { client, runtime })
360 }
361
362 pub fn send(
363 &self,
364 http_method: http::Method,
365 url: &str,
366 mut headers: hyper::header::HeaderMap,
367 body: &[u8],
368 timeout: std::time::Duration,
369 ) -> anyhow::Result<hyper_migration::HttpResponse> {
370 self.runtime.block_on(async {
371 let mut request = hyper::Request::builder()
372 .method(http_method)
373 .uri(url)
374 .body(hyper_migration::Body::from_bytes(Bytes::copy_from_slice(
375 body,
376 )))?;
377 std::mem::swap(request.headers_mut(), &mut headers);
378
379 let request: Request = request.into();
380 request.with_timeout(timeout).send(&self.client, None).await
381 })
382 }
383}