libdd_profiling/exporter/
mod.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::Context;
5use bytes::Bytes;
6pub use chrono::{DateTime, Utc};
7pub use hyper::Uri;
8use hyper_multipart_rfc7578::client::multipart;
9pub use libdd_common::tag::Tag;
10use serde_json::json;
11use std::borrow::Cow;
12use std::fmt::Debug;
13use std::io::{Cursor, Write};
14use std::{future, iter};
15use tokio::runtime::Runtime;
16use tokio_util::sync::CancellationToken;
17
18use libdd_common::{
19    azure_app_services, connector, hyper_migration, tag, Endpoint, HttpClient, HttpResponse,
20};
21
22pub mod config;
23mod errors;
24
25#[cfg(unix)]
26pub use connector::uds::{socket_path_from_uri, socket_path_to_uri};
27
28#[cfg(windows)]
29pub use connector::named_pipe::{named_pipe_path_from_uri, named_pipe_path_to_uri};
30
31use crate::internal::{EncodedProfile, Profile};
32use crate::profiles::{Compressor, DefaultProfileCodec};
33
/// A zero-length duration. `Request::with_timeout` treats a timeout equal to
/// this value as "no timeout at all".
const DURATION_ZERO: std::time::Duration = std::time::Duration::ZERO;
35
36pub struct Exporter {
37    client: HttpClient,
38    runtime: Runtime,
39}
40
41pub struct Fields {
42    pub start: DateTime<Utc>,
43    pub end: DateTime<Utc>,
44}
45
46pub struct ProfileExporter {
47    exporter: Exporter,
48    endpoint: Endpoint,
49    family: Cow<'static, str>,
50    profiling_library_name: Cow<'static, str>,
51    profiling_library_version: Cow<'static, str>,
52    tags: Option<Vec<Tag>>,
53}
54
/// An extra attachment uploaded alongside the profile.
///
/// `ProfileExporter::build` uses `name` as both the multipart form-field name
/// and its filename. `bytes` is the attachment content.
///
/// The type is `Copy` because it only holds borrowed slices. Its `Debug` output
/// reports the payload length instead of the raw bytes, because attachments may
/// be large.
#[derive(Clone, Copy)]
pub struct File<'a> {
    /// Name of the attachment (form-field name and filename).
    pub name: &'a str,
    /// Raw content of the attachment.
    pub bytes: &'a [u8],
}

impl std::fmt::Debug for File<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("File")
            .field("name", &self.name)
            .field("bytes", &format_args!("{} bytes", self.bytes.len()))
            .finish()
    }
}
59
60#[derive(Debug)]
61pub struct Request {
62    timeout: Option<std::time::Duration>,
63    req: hyper_migration::HttpRequest,
64}
65
66impl From<hyper_migration::HttpRequest> for Request {
67    fn from(req: hyper_migration::HttpRequest) -> Self {
68        Self { req, timeout: None }
69    }
70}
71
72impl Request {
73    fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
74        self.timeout = if timeout != DURATION_ZERO {
75            Some(timeout)
76        } else {
77            None
78        };
79        self
80    }
81
82    pub fn timeout(&self) -> &Option<std::time::Duration> {
83        &self.timeout
84    }
85
86    pub fn uri(&self) -> &hyper::Uri {
87        self.req.uri()
88    }
89
90    pub fn headers(&self) -> &hyper::HeaderMap {
91        self.req.headers()
92    }
93
94    pub fn body(self) -> hyper_migration::Body {
95        self.req.into_body()
96    }
97
98    async fn send(
99        self,
100        client: &HttpClient,
101        cancel: Option<&CancellationToken>,
102    ) -> anyhow::Result<HttpResponse> {
103        tokio::select! {
104            _ = async { match cancel {
105                    Some(cancellation_token) => cancellation_token.cancelled().await,
106                    // If no token is provided, future::pending() provides a no-op future that never resolves
107                    None => future::pending().await,
108                }}
109            => Err(crate::exporter::errors::Error::UserRequestedCancellation.into()),
110            result = async {
111                match self.timeout {
112                    Some(t) => {
113                        let res = tokio::time::timeout(t, client.request(self.req)).await;
114                        res
115                            .map_err(|_| anyhow::Error::from(crate::exporter::errors::Error::OperationTimedOut))
116                    },
117                    None => Ok(client.request(self.req).await),
118                }
119            }
120            => Ok(hyper_migration::into_response(result??)),
121        }
122    }
123}
124
125impl ProfileExporter {
126    /// Creates a new exporter to be used to report profiling data.
127    /// # Arguments
128    /// * `profiling_library_name` - Profiling library name, usually dd-trace-something, e.g. "dd-trace-rb". See
129    ///   https://datadoghq.atlassian.net/wiki/spaces/PROF/pages/1538884229/Client#Header-values (Datadog internal link)
130    ///   for a list of common values.
131    /// * `profiling_library_version` - Version used when publishing the profiling library to a
132    ///   package manager
133    /// * `family` - Profile family, e.g. "ruby"
134    /// * `tags` - Tags to include with every profile reported by this exporter. It's also possible
135    ///   to include profile-specific tags, see `additional_tags` on `build`.
136    /// * `endpoint` - Configuration for reporting data
137    pub fn new<F, N, V>(
138        profiling_library_name: N,
139        profiling_library_version: V,
140        family: F,
141        tags: Option<Vec<Tag>>,
142        endpoint: Endpoint,
143    ) -> anyhow::Result<ProfileExporter>
144    where
145        F: Into<Cow<'static, str>>,
146        N: Into<Cow<'static, str>>,
147        V: Into<Cow<'static, str>>,
148    {
149        Ok(Self {
150            exporter: Exporter::new()?,
151            endpoint,
152            family: family.into(),
153            profiling_library_name: profiling_library_name.into(),
154            profiling_library_version: profiling_library_version.into(),
155            tags,
156        })
157    }
158
159    /// The target triple. This is a string like:
160    ///  - aarch64-apple-darwin
161    ///  - x86_64-unknown-linux-gnu
162    ///
163    /// The name is which is a misnomer, it traditionally had 3 pieces, but
164    /// it's commonly 4+ fragments today.
165    const TARGET_TRIPLE: &'static str = target_triple::TARGET;
166
167    #[inline]
168    fn runtime_platform_tag(&self) -> Tag {
169        tag!("runtime_platform", ProfileExporter::TARGET_TRIPLE)
170    }
171
172    #[allow(clippy::too_many_arguments)]
173    /// Build a Request object representing the profile information provided.
174    ///
175    /// Consumes the `EncodedProfile`, which is unavailable for use after.
176    ///
177    /// For details on the `internal_metadata` parameter, please reference the Datadog-internal
178    /// "RFC: Attaching internal metadata to pprof profiles".
179    /// If you use this parameter, please update the RFC with your use-case, so we can keep track of
180    /// how this is getting used.
181    ///
182    /// For details on the `info` parameter, please reference the Datadog-internal
183    /// "RFC: Pprof System Info Support".
184    pub fn build(
185        &self,
186        profile: EncodedProfile,
187        files_to_compress_and_export: &[File],
188        files_to_export_unmodified: &[File],
189        additional_tags: Option<&Vec<Tag>>,
190        internal_metadata: Option<serde_json::Value>,
191        info: Option<serde_json::Value>,
192    ) -> anyhow::Result<Request> {
193        let mut form = multipart::Form::default();
194
195        // combine tags and additional_tags
196        let mut tags_profiler = String::new();
197        let other_tags = additional_tags.into_iter();
198        for tag in self.tags.iter().chain(other_tags).flatten() {
199            tags_profiler.push_str(tag.as_ref());
200            tags_profiler.push(',');
201        }
202
203        if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA {
204            let aas_tags = [
205                ("aas.resource.id", aas_metadata.get_resource_id()),
206                (
207                    "aas.environment.extension_version",
208                    aas_metadata.get_extension_version(),
209                ),
210                (
211                    "aas.environment.instance_id",
212                    aas_metadata.get_instance_id(),
213                ),
214                (
215                    "aas.environment.instance_name",
216                    aas_metadata.get_instance_name(),
217                ),
218                ("aas.environment.os", aas_metadata.get_operating_system()),
219                ("aas.resource.group", aas_metadata.get_resource_group()),
220                ("aas.site.name", aas_metadata.get_site_name()),
221                ("aas.site.kind", aas_metadata.get_site_kind()),
222                ("aas.site.type", aas_metadata.get_site_type()),
223                ("aas.subscription.id", aas_metadata.get_subscription_id()),
224            ];
225            aas_tags.into_iter().for_each(|(name, value)| {
226                if let Ok(tag) = Tag::new(name, value) {
227                    tags_profiler.push_str(tag.as_ref());
228                    tags_profiler.push(',');
229                }
230            });
231        }
232
233        // Since this is the last tag, we add it without a comma afterward. If
234        // any tags get added after this one, you'll need to add the comma
235        // between them.
236        tags_profiler.push_str(self.runtime_platform_tag().as_ref());
237
238        let attachments: Vec<String> = files_to_compress_and_export
239            .iter()
240            .chain(files_to_export_unmodified.iter())
241            .map(|file| file.name.to_owned())
242            .chain(iter::once("profile.pprof".to_string()))
243            .collect();
244
245        let endpoint_counts = if profile.endpoints_stats.is_empty() {
246            None
247        } else {
248            Some(profile.endpoints_stats)
249        };
250        let mut internal: serde_json::value::Value = internal_metadata.unwrap_or_else(|| json!({}));
251        internal["libdatadog_version"] = json!(env!("CARGO_PKG_VERSION"));
252
253        let event = json!({
254            "attachments": attachments,
255            "tags_profiler": tags_profiler,
256            "start": DateTime::<Utc>::from(profile.start).format("%Y-%m-%dT%H:%M:%S%.9fZ").to_string(),
257            "end": DateTime::<Utc>::from(profile.end).format("%Y-%m-%dT%H:%M:%S%.9fZ").to_string(),
258            "family": self.family.as_ref(),
259            "version": "4",
260            "endpoint_counts" : endpoint_counts,
261            "internal": internal,
262            "info": info.unwrap_or_else(|| json!({})),
263        })
264        .to_string();
265
266        form.add_reader_file_with_mime(
267            // Intake does not look for filename=event.json, it looks for name=event.
268            "event",
269            // this one shouldn't be compressed
270            Cursor::new(event),
271            "event.json",
272            mime::APPLICATION_JSON,
273        );
274
275        for file in files_to_compress_and_export {
276            // We don't know the file types and how well they compress. So for
277            // a size hint, we look at roughly 1/8th of the file size.
278            let capacity = (file.bytes.len() >> 3).next_power_of_two();
279            // Most proxies/web server have a size limit per attachment,
280            // 10 MiB should be plenty for everything we upload.
281            let max_capacity = 10 * 1024 * 1024;
282            // We haven't yet tested compression for attachments other than
283            // profiles, which are compressed already before this point. We're
284            // re-using the  same level here for now.
285            let compression_level = Profile::COMPRESSION_LEVEL;
286            let mut encoder = Compressor::<DefaultProfileCodec>::try_new(
287                capacity,
288                max_capacity,
289                compression_level,
290            )
291            .context("failed to create compressor")?;
292            encoder.write_all(file.bytes)?;
293            let encoded = encoder.finish()?;
294            /* The Datadog RFC examples strip off the file extension, but the exact behavior
295             * isn't specified. This does the simple thing of using the filename
296             * without modification for the form name because intake does not care
297             * about these name of the form field for these attachments.
298             */
299            form.add_reader_file(file.name, Cursor::new(encoded), file.name);
300        }
301
302        for file in files_to_export_unmodified {
303            let encoded = file.bytes.to_vec();
304            /* The Datadog RFC examples strip off the file extension, but the exact behavior
305             * isn't specified. This does the simple thing of using the filename
306             * without modification for the form name because intake does not care
307             * about these name of the form field for these attachments.
308             */
309            form.add_reader_file(file.name, Cursor::new(encoded), file.name)
310        }
311        // Add the actual pprof
312        form.add_reader_file(
313            "profile.pprof",
314            Cursor::new(profile.buffer),
315            "profile.pprof",
316        );
317
318        let builder = self
319            .endpoint
320            .to_request_builder(concat!("DDProf/", env!("CARGO_PKG_VERSION")))?
321            .method(http::Method::POST)
322            .header("Connection", "close")
323            .header("DD-EVP-ORIGIN", self.profiling_library_name.as_ref())
324            .header(
325                "DD-EVP-ORIGIN-VERSION",
326                self.profiling_library_version.as_ref(),
327            );
328
329        Ok(Request::from(
330            form.set_body::<multipart::Body>(builder)?
331                .map(hyper_migration::Body::boxed),
332        )
333        .with_timeout(std::time::Duration::from_millis(self.endpoint.timeout_ms)))
334    }
335
336    pub fn send(
337        &self,
338        request: Request,
339        cancel: Option<&CancellationToken>,
340    ) -> anyhow::Result<HttpResponse> {
341        self.exporter
342            .runtime
343            .block_on(request.send(&self.exporter.client, cancel))
344    }
345
346    pub fn set_timeout(&mut self, timeout_ms: u64) {
347        self.endpoint.timeout_ms = timeout_ms;
348    }
349}
350
351impl Exporter {
352    /// Creates a new Exporter, initializing the TLS stack.
353    pub fn new() -> anyhow::Result<Self> {
354        // Set idle to 0, which prevents the pipe being broken every 2nd request
355        let client = hyper_migration::new_client_periodic();
356        let runtime = tokio::runtime::Builder::new_current_thread()
357            .enable_all()
358            .build()?;
359        Ok(Self { client, runtime })
360    }
361
362    pub fn send(
363        &self,
364        http_method: http::Method,
365        url: &str,
366        mut headers: hyper::header::HeaderMap,
367        body: &[u8],
368        timeout: std::time::Duration,
369    ) -> anyhow::Result<hyper_migration::HttpResponse> {
370        self.runtime.block_on(async {
371            let mut request = hyper::Request::builder()
372                .method(http_method)
373                .uri(url)
374                .body(hyper_migration::Body::from_bytes(Bytes::copy_from_slice(
375                    body,
376                )))?;
377            std::mem::swap(request.headers_mut(), &mut headers);
378
379            let request: Request = request.into();
380            request.with_timeout(timeout).send(&self.client, None).await
381        })
382    }
383}