use anyhow::Context;
use bytes::Bytes;
pub use chrono::{DateTime, Utc};
pub use hyper::Uri;
use hyper_multipart_rfc7578::client::multipart;
pub use libdd_common::tag::Tag;
use serde_json::json;
use std::borrow::Cow;
use std::fmt::Debug;
use std::io::{Cursor, Write};
use std::{future, iter};
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;
use libdd_common::{
azure_app_services, connector, hyper_migration, tag, Endpoint, HttpClient, HttpResponse,
};
pub mod config;
mod errors;
#[cfg(unix)]
pub use connector::uds::{socket_path_from_uri, socket_path_to_uri};
#[cfg(windows)]
pub use connector::named_pipe::{named_pipe_path_from_uri, named_pipe_path_to_uri};
use crate::internal::{EncodedProfile, Profile};
use crate::profiles::{Compressor, DefaultProfileCodec};
/// Sentinel meaning "no timeout": `Request::with_timeout` maps a zero
/// duration to `None`. Uses the idiomatic `Duration::ZERO` constant
/// instead of constructing it via `from_millis(0)`.
const DURATION_ZERO: std::time::Duration = std::time::Duration::ZERO;
/// Owns the HTTP client and the single-threaded tokio runtime used to drive
/// requests to completion from synchronous callers (see `Exporter::send`).
pub struct Exporter {
    client: HttpClient,
    runtime: Runtime,
}
/// Start/end timestamps delimiting a profiling window.
pub struct Fields {
    pub start: DateTime<Utc>,
    pub end: DateTime<Utc>,
}
/// Builds and sends profile upload requests to a configured endpoint.
pub struct ProfileExporter {
    // Underlying client + runtime used to execute the requests.
    exporter: Exporter,
    endpoint: Endpoint,
    // Profile "family" (e.g. the language) reported in the event payload.
    family: Cow<'static, str>,
    // Reported via the DD-EVP-ORIGIN header.
    profiling_library_name: Cow<'static, str>,
    // Reported via the DD-EVP-ORIGIN-VERSION header.
    profiling_library_version: Cow<'static, str>,
    // Optional tags attached to every exported profile.
    tags: Option<Vec<Tag>>,
}
/// A named byte buffer to attach to an export request as a multipart file.
pub struct File<'a> {
    pub name: &'a str,
    pub bytes: &'a [u8],
}
/// An HTTP request paired with an optional send timeout.
#[derive(Debug)]
pub struct Request {
    // `None` means no timeout is applied when sending.
    timeout: Option<std::time::Duration>,
    req: hyper_migration::HttpRequest,
}
impl From<hyper_migration::HttpRequest> for Request {
fn from(req: hyper_migration::HttpRequest) -> Self {
Self { req, timeout: None }
}
}
impl Request {
    /// Stores `timeout` on the request, treating a zero duration as
    /// "no timeout" (`None`).
    fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
        self.timeout = if timeout != DURATION_ZERO {
            Some(timeout)
        } else {
            None
        };
        self
    }
    /// Returns the configured timeout, if any.
    pub fn timeout(&self) -> &Option<std::time::Duration> {
        &self.timeout
    }
    /// Returns the target URI of the underlying HTTP request.
    pub fn uri(&self) -> &hyper::Uri {
        self.req.uri()
    }
    /// Returns the headers of the underlying HTTP request.
    pub fn headers(&self) -> &hyper::HeaderMap {
        self.req.headers()
    }
    /// Consumes the request and returns its body.
    pub fn body(self) -> hyper_migration::Body {
        self.req.into_body()
    }
    /// Sends the request with `client`, racing it against an optional
    /// cancellation token and the configured timeout.
    ///
    /// # Errors
    /// - `Error::UserRequestedCancellation` if `cancel` fires first.
    /// - `Error::OperationTimedOut` if the configured timeout elapses.
    /// - Any transport error from the underlying client.
    async fn send(
        self,
        client: &HttpClient,
        cancel: Option<&CancellationToken>,
    ) -> anyhow::Result<HttpResponse> {
        tokio::select! {
            // Cancellation arm: waits on the token when one is provided;
            // otherwise pends forever so only the request arm can win.
            _ = async { match cancel {
                Some(cancellation_token) => cancellation_token.cancelled().await,
                None => future::pending().await,
            }}
            => Err(crate::exporter::errors::Error::UserRequestedCancellation.into()),
            result = async {
                match self.timeout {
                    Some(t) => {
                        let res = tokio::time::timeout(t, client.request(self.req)).await;
                        res
                        .map_err(|_| anyhow::Error::from(crate::exporter::errors::Error::OperationTimedOut))
                    },
                    // No timeout: wrap in Ok so both match arms yield the same
                    // nested Result shape, unwrapped below with `??`.
                    None => Ok(client.request(self.req).await),
                }
            }
            => Ok(hyper_migration::into_response(result??)),
        }
    }
}
impl ProfileExporter {
    /// Creates a new `ProfileExporter`.
    ///
    /// # Arguments
    /// * `profiling_library_name` - reported in the `DD-EVP-ORIGIN` header.
    /// * `profiling_library_version` - reported in `DD-EVP-ORIGIN-VERSION`.
    /// * `family` - profile family (e.g. language), included in the event payload.
    /// * `tags` - optional tags attached to every exported profile.
    /// * `endpoint` - destination endpoint for uploads.
    ///
    /// # Errors
    /// Fails if the internal exporter (HTTP client + runtime) cannot be built.
    pub fn new<F, N, V>(
        profiling_library_name: N,
        profiling_library_version: V,
        family: F,
        tags: Option<Vec<Tag>>,
        endpoint: Endpoint,
    ) -> anyhow::Result<ProfileExporter>
    where
        F: Into<Cow<'static, str>>,
        N: Into<Cow<'static, str>>,
        V: Into<Cow<'static, str>>,
    {
        Ok(Self {
            exporter: Exporter::new()?,
            endpoint,
            family: family.into(),
            profiling_library_name: profiling_library_name.into(),
            profiling_library_version: profiling_library_version.into(),
            tags,
        })
    }
    // Build-time target triple, exported as the `runtime_platform` tag.
    const TARGET_TRIPLE: &'static str = target_triple::TARGET;
    #[inline]
    fn runtime_platform_tag(&self) -> Tag {
        tag!("runtime_platform", ProfileExporter::TARGET_TRIPLE)
    }
    /// Builds a multipart upload `Request` for the given encoded profile plus
    /// any auxiliary files.
    ///
    /// The request contains, in order: an `event.json` part describing the
    /// attachments, tags, and time window; one part per file in
    /// `files_to_compress_and_export` (compressed here); one part per file in
    /// `files_to_export_unmodified` (copied as-is); and the `profile.pprof`
    /// buffer itself. The request timeout is taken from the endpoint.
    ///
    /// # Errors
    /// Fails if compression, request building, or multipart assembly fails.
    #[allow(clippy::too_many_arguments)]
    pub fn build(
        &self,
        profile: EncodedProfile,
        files_to_compress_and_export: &[File],
        files_to_export_unmodified: &[File],
        additional_tags: Option<&Vec<Tag>>,
        internal_metadata: Option<serde_json::Value>,
        info: Option<serde_json::Value>,
    ) -> anyhow::Result<Request> {
        let mut form = multipart::Form::default();
        // Exporter-level and per-call tags are flattened into one
        // comma-separated string; each entry gets a trailing comma, and the
        // runtime_platform tag appended last closes the list.
        let mut tags_profiler = String::new();
        let other_tags = additional_tags.into_iter();
        for tag in self.tags.iter().chain(other_tags).flatten() {
            tags_profiler.push_str(tag.as_ref());
            tags_profiler.push(',');
        }
        // When running inside Azure App Services, enrich the tag string with
        // environment metadata; individually invalid tags are skipped.
        if let Some(aas_metadata) = &*azure_app_services::AAS_METADATA {
            let aas_tags = [
                ("aas.resource.id", aas_metadata.get_resource_id()),
                (
                    "aas.environment.extension_version",
                    aas_metadata.get_extension_version(),
                ),
                (
                    "aas.environment.instance_id",
                    aas_metadata.get_instance_id(),
                ),
                (
                    "aas.environment.instance_name",
                    aas_metadata.get_instance_name(),
                ),
                ("aas.environment.os", aas_metadata.get_operating_system()),
                ("aas.resource.group", aas_metadata.get_resource_group()),
                ("aas.site.name", aas_metadata.get_site_name()),
                ("aas.site.kind", aas_metadata.get_site_kind()),
                ("aas.site.type", aas_metadata.get_site_type()),
                ("aas.subscription.id", aas_metadata.get_subscription_id()),
            ];
            aas_tags.into_iter().for_each(|(name, value)| {
                if let Ok(tag) = Tag::new(name, value) {
                    tags_profiler.push_str(tag.as_ref());
                    tags_profiler.push(',');
                }
            });
        }
        tags_profiler.push_str(self.runtime_platform_tag().as_ref());
        // Names of every attachment listed in the event payload; the pprof
        // buffer is always included under the fixed name "profile.pprof".
        let attachments: Vec<String> = files_to_compress_and_export
            .iter()
            .chain(files_to_export_unmodified.iter())
            .map(|file| file.name.to_owned())
            .chain(iter::once("profile.pprof".to_string()))
            .collect();
        // Omit the field entirely rather than sending an empty map.
        let endpoint_counts = if profile.endpoints_stats.is_empty() {
            None
        } else {
            Some(profile.endpoints_stats)
        };
        // Always stamp the crate version into the internal metadata.
        let mut internal: serde_json::value::Value = internal_metadata.unwrap_or_else(|| json!({}));
        internal["libdatadog_version"] = json!(env!("CARGO_PKG_VERSION"));
        let event = json!({
            "attachments": attachments,
            "tags_profiler": tags_profiler,
            // RFC 3339 timestamps with nanosecond precision.
            "start": DateTime::<Utc>::from(profile.start).format("%Y-%m-%dT%H:%M:%S%.9fZ").to_string(),
            "end": DateTime::<Utc>::from(profile.end).format("%Y-%m-%dT%H:%M:%S%.9fZ").to_string(),
            "family": self.family.as_ref(),
            "version": "4",
            "endpoint_counts" : endpoint_counts,
            "internal": internal,
            "info": info.unwrap_or_else(|| json!({})),
        })
        .to_string();
        form.add_reader_file_with_mime(
            "event",
            Cursor::new(event),
            "event.json",
            mime::APPLICATION_JSON,
        );
        for file in files_to_compress_and_export {
            // Initial buffer heuristic: ~1/8 of the input, rounded up to a
            // power of two, capped at 10 MiB.
            let capacity = (file.bytes.len() >> 3).next_power_of_two();
            let max_capacity = 10 * 1024 * 1024;
            let compression_level = Profile::COMPRESSION_LEVEL;
            let mut encoder = Compressor::<DefaultProfileCodec>::try_new(
                capacity,
                max_capacity,
                compression_level,
            )
            .context("failed to create compressor")?;
            encoder.write_all(file.bytes)?;
            let encoded = encoder.finish()?;
            form.add_reader_file(file.name, Cursor::new(encoded), file.name);
        }
        for file in files_to_export_unmodified {
            // Copied to an owned Vec so the form part does not borrow `file`.
            let encoded = file.bytes.to_vec();
            form.add_reader_file(file.name, Cursor::new(encoded), file.name)
        }
        form.add_reader_file(
            "profile.pprof",
            Cursor::new(profile.buffer),
            "profile.pprof",
        );
        let builder = self
            .endpoint
            .to_request_builder(concat!("DDProf/", env!("CARGO_PKG_VERSION")))?
            .method(http::Method::POST)
            .header("Connection", "close")
            .header("DD-EVP-ORIGIN", self.profiling_library_name.as_ref())
            .header(
                "DD-EVP-ORIGIN-VERSION",
                self.profiling_library_version.as_ref(),
            );
        Ok(Request::from(
            form.set_body::<multipart::Body>(builder)?
                .map(hyper_migration::Body::boxed),
        )
        // A timeout of 0 ms is treated as "no timeout" by `with_timeout`.
        .with_timeout(std::time::Duration::from_millis(self.endpoint.timeout_ms)))
    }
    /// Sends a previously built request, blocking the current thread until it
    /// completes, times out, or `cancel` fires.
    pub fn send(
        &self,
        request: Request,
        cancel: Option<&CancellationToken>,
    ) -> anyhow::Result<HttpResponse> {
        self.exporter
            .runtime
            .block_on(request.send(&self.exporter.client, cancel))
    }
    /// Updates the endpoint timeout used for subsequently built requests.
    pub fn set_timeout(&mut self, timeout_ms: u64) {
        self.endpoint.timeout_ms = timeout_ms;
    }
}
impl Exporter {
pub fn new() -> anyhow::Result<Self> {
let client = hyper_migration::new_client_periodic();
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
Ok(Self { client, runtime })
}
pub fn send(
&self,
http_method: http::Method,
url: &str,
mut headers: hyper::header::HeaderMap,
body: &[u8],
timeout: std::time::Duration,
) -> anyhow::Result<hyper_migration::HttpResponse> {
self.runtime.block_on(async {
let mut request = hyper::Request::builder()
.method(http_method)
.uri(url)
.body(hyper_migration::Body::from_bytes(Bytes::copy_from_slice(
body,
)))?;
std::mem::swap(request.headers_mut(), &mut headers);
let request: Request = request.into();
request.with_timeout(timeout).send(&self.client, None).await
})
}
}