use std::collections::HashMap;
use std::str::FromStr;
use std::sync::LazyLock;
use http::Uri;
use http::uri::PathAndQuery;
use opentelemetry::sdk::metrics::InstrumentKind;
use opentelemetry::sdk::metrics::reader::TemporalitySelector;
use opentelemetry_otlp::HttpExporterBuilder;
use opentelemetry_otlp::TonicExporterBuilder;
use opentelemetry_otlp::WithExportConfig;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use tonic::metadata::MetadataMap;
use tonic::transport::Certificate;
use tonic::transport::ClientTlsConfig;
use tonic::transport::Identity;
use tower::BoxError;
use url::Url;
use crate::plugins::telemetry::endpoint::UriEndpoint;
use crate::plugins::telemetry::tracing::BatchProcessorConfig;
/// Fallback URI passed to `to_full_uri` on the configured endpoint when the gRPC protocol is
/// selected.
static DEFAULT_GRPC_ENDPOINT: LazyLock<Uri> =
LazyLock::new(|| Uri::from_static("http://127.0.0.1:4317"));
/// Fallback URI passed to `to_full_uri` on the configured endpoint when the HTTP protocol is
/// selected.
static DEFAULT_HTTP_ENDPOINT: LazyLock<Uri> =
LazyLock::new(|| Uri::from_static("http://127.0.0.1:4318"));
/// Path that `add_missing_traces_path` appends to HTTP trace endpoints that do not already end
/// with it.
const DEFAULT_HTTP_ENDPOINT_PATH: &str = "/v1/traces";
// OTLP exporter settings, deserialized with serde; unknown keys are rejected.
// Plain `//` comments are used on purpose here: schemars copies `///` doc comments into the
// generated JSON schema as descriptions, which would change the schema output.
#[derive(Debug, Clone, Deserialize, JsonSchema, Default)]
#[serde(deny_unknown_fields)]
pub(crate) struct Config {
// Enable flag. It is the only field without `#[serde(default)]`; it is not read by the code
// visible in this file.
pub(crate) enabled: bool,
// Configured endpoint; `exporter` combines it with the per-protocol default via `to_full_uri`.
#[serde(default)]
pub(crate) endpoint: UriEndpoint,
// Selects whether `exporter` builds a gRPC (tonic) or an HTTP exporter.
#[serde(default)]
pub(crate) protocol: Protocol,
// Settings used only on the gRPC path (TLS material and metadata).
#[serde(default)]
pub(crate) grpc: GrpcExporter,
// Settings used only on the HTTP path (headers).
#[serde(default)]
pub(crate) http: HttpExporter,
// Batch processor settings; `exporter` only reads `max_export_timeout` as the export timeout.
#[serde(default)]
pub(crate) batch_processor: BatchProcessorConfig,
// Metrics temporality; convertible to a `Box<dyn TemporalitySelector>` through the `From`
// impl further down in this file.
#[serde(default)]
pub(crate) temporality: Temporality,
}
/// Kind of telemetry data an exporter is being built for.
///
/// `Config::exporter` uses it on the HTTP path to decide whether the traces path has to be
/// appended to the endpoint.
#[derive(Copy, Clone)]
pub(crate) enum TelemetryDataKind {
/// Trace data; HTTP endpoints get `/v1/traces` appended when it is missing.
Traces,
/// Metric data; the HTTP endpoint is used as configured.
Metrics,
}
impl Config {
    /// Builds the OTLP exporter builder matching the configured protocol.
    ///
    /// * `Protocol::Grpc` produces a tonic exporter pointed at the configured endpoint (falling
    ///   back to `DEFAULT_GRPC_ENDPOINT`), with the configured metadata and, when
    ///   `GrpcExporter::to_tls_config` yields one, a TLS configuration.
    /// * `Protocol::Http` produces an HTTP exporter pointed at the configured endpoint (falling
    ///   back to `DEFAULT_HTTP_ENDPOINT`) with the configured headers. For
    ///   `TelemetryDataKind::Traces` the `/v1/traces` path is appended when it is missing.
    ///
    /// In both cases the export timeout is `batch_processor.max_export_timeout`.
    ///
    /// # Errors
    ///
    /// Returns an error when the gRPC TLS configuration cannot be derived from the endpoint, or
    /// when appending the traces path does not produce a valid URI.
    pub(crate) fn exporter<T: From<HttpExporterBuilder> + From<TonicExporterBuilder>>(
        &self,
        kind: TelemetryDataKind,
    ) -> Result<T, BoxError> {
        match self.protocol {
            Protocol::Grpc => {
                let endpoint = self.endpoint.to_full_uri(&DEFAULT_GRPC_ENDPOINT);
                let mut exporter = opentelemetry_otlp::new_exporter()
                    .tonic()
                    .with_timeout(self.batch_processor.max_export_timeout)
                    .with_endpoint(endpoint.to_string())
                    // `from_headers` consumes the map, so only the metadata is cloned.
                    .with_metadata(MetadataMap::from_headers(self.grpc.metadata.clone()));
                // `to_tls_config` borrows `self.grpc`; no clone of the whole struct is needed.
                if let Some(tls_config) = self.grpc.to_tls_config(&endpoint)? {
                    exporter = exporter.with_tls_config(tls_config);
                }
                Ok(exporter.into())
            }
            Protocol::Http => {
                let endpoint = self.endpoint.to_full_uri(&DEFAULT_HTTP_ENDPOINT);
                // Exhaustive on purpose: a new data kind must decide how its path is handled.
                let endpoint = match kind {
                    TelemetryDataKind::Traces => add_missing_traces_path(endpoint)?,
                    TelemetryDataKind::Metrics => endpoint,
                };
                let exporter = opentelemetry_otlp::new_exporter()
                    .http()
                    .with_timeout(self.batch_processor.max_export_timeout)
                    .with_endpoint(endpoint.to_string())
                    // Only the header map is needed, so only the header map is cloned.
                    .with_headers(self.http.headers.clone());
                Ok(exporter.into())
            }
        }
    }
}
/// Ensures the path of `uri` ends with `DEFAULT_HTTP_ENDPOINT_PATH` (`/v1/traces`).
///
/// * A URI without any path-and-query gets exactly `/v1/traces`.
/// * A path that already ends with `/v1/traces`, with or without trailing slashes, is left
///   untouched.
/// * Any other path is treated as a prefix: trailing slashes are trimmed and `/v1/traces` is
///   appended, so `/v1` becomes `/v1/v1/traces`. An existing query string is preserved.
///
/// # Errors
///
/// Returns an error when the rebuilt path-and-query or the reassembled URI is not valid.
fn add_missing_traces_path(uri: Uri) -> Result<Uri, BoxError> {
    let mut parts = uri.into_parts();
    let path_and_query = match parts.path_and_query.take() {
        None => PathAndQuery::from_static(DEFAULT_HTTP_ENDPOINT_PATH),
        Some(existing) => {
            // Compare against the slash-trimmed path so that `/v1/traces/` is recognised as
            // already complete instead of being turned into `/v1/traces/v1/traces`.
            let base = existing.path().trim_end_matches('/');
            if base.ends_with(DEFAULT_HTTP_ENDPOINT_PATH) {
                existing
            } else {
                let mut rebuilt = format!("{base}{DEFAULT_HTTP_ENDPOINT_PATH}");
                if let Some(query) = existing.query() {
                    rebuilt.push('?');
                    rebuilt.push_str(query);
                }
                PathAndQuery::from_str(&rebuilt)?
            }
        }
    };
    parts.path_and_query = Some(path_and_query);
    Ok(Uri::from_parts(parts)?)
}
// HTTP-specific exporter settings; unknown keys are rejected and missing fields take defaults.
// Plain `//` comments are used on purpose: schemars copies `///` doc comments into the
// generated JSON schema as descriptions.
#[derive(Debug, Clone, Deserialize, Serialize, Default, JsonSchema)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct HttpExporter {
// Headers handed to the HTTP exporter builder through `with_headers`.
pub(crate) headers: HashMap<String, String>,
}
// gRPC-specific exporter settings; unknown keys are rejected and missing fields take defaults.
// Plain `//` comments are used on purpose: schemars copies `///` doc comments into the
// generated JSON schema as descriptions.
#[derive(Debug, Clone, Deserialize, Serialize, Default, JsonSchema)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct GrpcExporter {
// Explicit TLS domain name; when set it takes precedence over the endpoint host.
pub(crate) domain_name: Option<String>,
// CA certificate, handed to `Certificate::from_pem`.
pub(crate) ca: Option<String>,
// Client certificate, handed to `Identity::from_pem` together with `key`.
pub(crate) cert: Option<String>,
// Client private key, handed to `Identity::from_pem` together with `cert`.
pub(crate) key: Option<String>,
// Headers converted into gRPC metadata with `MetadataMap::from_headers`.
// (De)serialized through `http_serde::header_map`; the JSON schema is produced by the
// `header_map` function in this file.
#[serde(with = "http_serde::header_map")]
#[schemars(schema_with = "header_map", default)]
pub(crate) metadata: http::HeaderMap,
}
fn header_map(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
HashMap::<String, Value>::json_schema(gen)
}
impl GrpcExporter {
    /// Derives the client TLS configuration for `endpoint`.
    ///
    /// A configuration is produced only when `ca`, `key` and `cert` are all set and a TLS
    /// domain can be determined (see `default_tls_domain`); otherwise `Ok(None)` is returned.
    ///
    /// # Errors
    ///
    /// Returns an error when `endpoint` cannot be parsed as a `Url`.
    pub(crate) fn to_tls_config(
        &self,
        endpoint: &Uri,
    ) -> Result<Option<ClientTlsConfig>, BoxError> {
        let parsed = endpoint
            .to_string()
            .parse::<Url>()
            .map_err(|e| BoxError::from(format!("invalid GRPC endpoint {}, {}", endpoint, e)))?;

        // The domain lookup runs unconditionally so that its warning is emitted even when
        // the certificate material is incomplete.
        let tls_domain = self.default_tls_domain(&parsed);

        let tls_config = match (&self.ca, &self.key, &self.cert, tls_domain) {
            (Some(ca), Some(key), Some(cert), Some(domain_name)) => Some(
                ClientTlsConfig::new()
                    .domain_name(domain_name)
                    .ca_certificate(Certificate::from_pem(ca.clone()))
                    .identity(Identity::from_pem(cert.clone(), key.clone())),
            ),
            _ => None,
        };
        Ok(tls_config)
    }

    /// Picks the TLS domain name for `endpoint`.
    ///
    /// An explicitly configured `domain_name` always wins. Otherwise the endpoint host is used
    /// when the scheme is `https`. For any other scheme no domain is returned, and a warning is
    /// logged when the endpoint targets port 443 with a scheme other than `http`.
    fn default_tls_domain<'a>(&'a self, endpoint: &'a Url) -> Option<&'a str> {
        if let Some(configured) = self.domain_name.as_deref() {
            return Some(configured);
        }

        let scheme = endpoint.scheme();
        if scheme == "https" {
            return endpoint.host_str();
        }

        if scheme != "http" && endpoint.port() == Some(443) {
            tracing::warn!(
                "telemetry otlp exporter has been configured with port 443 but TLS domain has not been set. This is likely a configuration error"
            );
        }
        None
    }
}
// Wire protocol used by the OTLP exporter; variants are (de)serialized in snake_case
// (`grpc`, `http`).
// Plain `//` comments are used on purpose: schemars copies `///` doc comments into the
// generated JSON schema as descriptions.
#[derive(Debug, Default, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub(crate) enum Protocol {
// Default: `Config::exporter` builds a tonic (gRPC) exporter.
#[default]
Grpc,
// `Config::exporter` builds an HTTP exporter.
Http,
}
// Configurable metrics temporality; variants are (de)serialized in snake_case
// (`cumulative`, `delta`) and map one-to-one onto the SDK temporality of the same name.
// Plain `//` comments are used on purpose: schemars copies `///` doc comments into the
// generated JSON schema as descriptions.
#[derive(Debug, Default, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub(crate) enum Temporality {
// Default variant.
#[default]
Cumulative,
Delta,
}
/// Temporality selector that carries one fixed SDK temporality.
pub(crate) struct CustomTemporalitySelector(
/// The temporality reported for every instrument kind.
pub(crate) opentelemetry::sdk::metrics::data::Temporality,
);
impl TemporalitySelector for CustomTemporalitySelector {
    /// Returns the wrapped temporality; the instrument kind is ignored.
    fn temporality(
        &self,
        _instrument: InstrumentKind,
    ) -> opentelemetry::sdk::metrics::data::Temporality {
        let Self(fixed) = self;
        *fixed
    }
}
impl From<&Temporality> for Box<dyn TemporalitySelector> {
    /// Wraps the configured temporality in a boxed `CustomTemporalitySelector` that reports
    /// the matching SDK temporality.
    fn from(value: &Temporality) -> Self {
        use opentelemetry::sdk::metrics::data::Temporality as SdkTemporality;

        let selected = match value {
            Temporality::Cumulative => SdkTemporality::Cumulative,
            Temporality::Delta => SdkTemporality::Delta,
        };
        Box::new(CustomTemporalitySelector(selected))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Without an `https` scheme and without an explicit domain, no TLS domain is derived.
    #[test]
    fn endpoint_grpc_defaulting_no_scheme() {
        let endpoint = Url::parse("api.apm.com:433").unwrap();
        let exporter = GrpcExporter::default();
        assert_eq!(exporter.default_tls_domain(&endpoint), None);
    }

    /// With an `https` scheme the endpoint host becomes the TLS domain.
    #[test]
    fn endpoint_grpc_defaulting_scheme() {
        let endpoint = Url::parse("https://api.apm.com:433").unwrap();
        let exporter = GrpcExporter::default();
        let expected = endpoint.domain().expect("domain was expected");
        assert_eq!(exporter.default_tls_domain(&endpoint), Some(expected));
    }

    /// An explicitly configured domain wins over the endpoint host.
    #[test]
    fn endpoint_grpc_explicit_domain() {
        let endpoint = Url::parse("https://api.apm.com:433").unwrap();
        let exporter = GrpcExporter {
            domain_name: Some("foo.bar".to_string()),
            ..Default::default()
        };
        assert_eq!(exporter.default_tls_domain(&endpoint), Some("foo.bar"));
    }

    /// Each `(input, expected)` pair is run through `add_missing_traces_path`.
    #[test]
    fn test_add_missing_traces_path() {
        let cases = [
            // Already complete: left untouched.
            (
                "https://api.apm.com:433/v1/traces",
                "https://api.apm.com:433/v1/traces",
            ),
            // Bare root path: the traces path is appended.
            (
                "https://api.apm.com:433/",
                "https://api.apm.com:433/v1/traces",
            ),
            // The query string survives the rewrite.
            (
                "https://api.apm.com:433/?hi=hello",
                "https://api.apm.com:433/v1/traces?hi=hello",
            ),
            // Any other path is treated as a prefix.
            (
                "https://api.apm.com:433/v1?hi=hello",
                "https://api.apm.com:433/v1/v1/traces?hi=hello",
            ),
        ];

        for (input, expected) in cases {
            let rewritten = add_missing_traces_path(Uri::from_str(input).unwrap()).unwrap();
            assert_eq!(rewritten.to_string(), String::from(expected));
        }
    }
}