use std::collections::HashMap;
use http::Uri;
use opentelemetry_otlp::HttpExporterBuilder;
use opentelemetry_otlp::TonicExporterBuilder;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::metrics::InstrumentKind;
use opentelemetry_sdk::metrics::reader::TemporalitySelector;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use tonic::metadata::MetadataMap;
use tonic::transport::Certificate;
use tonic::transport::ClientTlsConfig;
use tonic::transport::Identity;
use tower::BoxError;
use url::Url;
use crate::plugins::telemetry::tracing::BatchProcessorConfig;
#[derive(Debug, Clone, Deserialize, JsonSchema, Default, PartialEq)]
#[serde(deny_unknown_fields)]
#[schemars(rename = "OTLPConfig")]
pub(crate) struct Config {
pub(crate) enabled: bool,
#[serde(default)]
pub(crate) endpoint: Option<String>,
#[serde(default)]
pub(crate) protocol: Protocol,
#[serde(default)]
pub(crate) grpc: GrpcExporter,
#[serde(default)]
pub(crate) http: HttpExporter,
#[serde(default)]
pub(crate) batch_processor: BatchProcessorConfig,
#[serde(default)]
pub(crate) temporality: Temporality,
}
#[derive(Copy, Clone, Debug)]
pub(crate) enum TelemetryDataKind {
Traces,
Metrics,
}
pub(super) fn process_endpoint(
endpoint: &Option<String>,
kind: &TelemetryDataKind,
protocol: &Protocol,
) -> Result<Option<String>, BoxError> {
endpoint
.as_ref()
.map(|v| {
let mut base = if v == "default" {
"".to_string()
} else {
v.to_string()
};
if base.is_empty() {
Ok(base)
} else {
if !base.starts_with("http") {
base = format!("http://{base}");
}
let suffix = match protocol {
Protocol::Grpc => "/",
Protocol::Http => match kind {
TelemetryDataKind::Metrics => "/v1/metrics",
TelemetryDataKind::Traces => "/v1/traces",
},
};
if base.ends_with(suffix) {
Ok(base)
} else {
let uri = http::Uri::try_from(&base)?;
if uri.path() == "/" {
if base.ends_with("/") {
base.pop();
}
Ok(format!("{base}{suffix}"))
} else {
Ok(base)
}
}
}
})
.transpose()
}
impl Config {
pub(crate) fn exporter<T: From<HttpExporterBuilder> + From<TonicExporterBuilder>>(
&self,
kind: TelemetryDataKind,
) -> Result<T, BoxError> {
match self.protocol {
Protocol::Grpc => {
let endpoint_opt = process_endpoint(&self.endpoint, &kind, &self.protocol)?;
let tls_config_opt = if let Some(endpoint) = &endpoint_opt {
if !endpoint.is_empty() {
let tls_url = Uri::try_from(endpoint)?;
Some(self.grpc.clone().to_tls_config(&tls_url)?)
} else {
None
}
} else {
None
};
let mut exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_protocol(opentelemetry_otlp::Protocol::Grpc)
.with_timeout(self.batch_processor.max_export_timeout)
.with_metadata(MetadataMap::from_headers(self.grpc.metadata.clone()));
if let Some(endpoint) = endpoint_opt {
exporter = exporter.with_endpoint(endpoint);
}
if let Some(tls_config) = tls_config_opt {
exporter = exporter.with_tls_config(tls_config);
}
Ok(exporter.into())
}
Protocol::Http => {
let endpoint_opt = process_endpoint(&self.endpoint, &kind, &self.protocol)?;
let headers = self.http.headers.clone();
let mut exporter: HttpExporterBuilder = opentelemetry_otlp::new_exporter()
.http()
.with_protocol(opentelemetry_otlp::Protocol::Grpc)
.with_timeout(self.batch_processor.max_export_timeout)
.with_headers(headers);
if let Some(endpoint) = endpoint_opt {
exporter = exporter.with_endpoint(endpoint);
}
Ok(exporter.into())
}
}
}
}
#[derive(Debug, Clone, Deserialize, Serialize, Default, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct HttpExporter {
pub(crate) headers: HashMap<String, String>,
}
#[derive(Debug, Clone, Deserialize, Serialize, Default, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct GrpcExporter {
pub(crate) domain_name: Option<String>,
pub(crate) ca: Option<String>,
pub(crate) cert: Option<String>,
pub(crate) key: Option<String>,
#[serde(with = "http_serde::header_map")]
#[schemars(schema_with = "header_map", default)]
pub(crate) metadata: http::HeaderMap,
}
fn header_map(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
HashMap::<String, Value>::json_schema(generator)
}
impl GrpcExporter {
pub(crate) fn to_tls_config(&self, endpoint: &Uri) -> Result<ClientTlsConfig, BoxError> {
let endpoint = endpoint
.to_string()
.parse::<Url>()
.map_err(|e| BoxError::from(format!("invalid GRPC endpoint {endpoint}, {e}")))?;
let domain_name = self.default_tls_domain(&endpoint);
if let (Some(ca), Some(key), Some(cert), Some(domain_name)) =
(&self.ca, &self.key, &self.cert, domain_name)
{
Ok(ClientTlsConfig::new()
.with_native_roots()
.domain_name(domain_name)
.ca_certificate(Certificate::from_pem(ca.clone()))
.identity(Identity::from_pem(cert.clone(), key.clone())))
} else {
Ok(ClientTlsConfig::new().with_native_roots())
}
}
fn default_tls_domain<'a>(&'a self, endpoint: &'a Url) -> Option<&'a str> {
match (&self.domain_name, endpoint) {
(Some(domain), _) => Some(domain.as_str()),
(None, endpoint) if endpoint.scheme() == "https" => endpoint.host_str(),
(None, endpoint) if endpoint.port() == Some(443) && endpoint.scheme() != "http" => {
tracing::warn!(
"telemetry otlp exporter has been configured with port 443 but TLS domain has not been set. This is likely a configuration error"
);
None
}
_ => None,
}
}
}
#[derive(Debug, Default, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub(crate) enum Protocol {
#[default]
Grpc,
Http,
}
#[derive(Debug, Default, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub(crate) enum Temporality {
#[default]
Cumulative,
Delta,
}
pub(crate) struct CustomTemporalitySelector(
pub(crate) opentelemetry_sdk::metrics::data::Temporality,
);
impl TemporalitySelector for CustomTemporalitySelector {
fn temporality(&self, kind: InstrumentKind) -> opentelemetry_sdk::metrics::data::Temporality {
match kind {
InstrumentKind::UpDownCounter | InstrumentKind::ObservableUpDownCounter => {
opentelemetry_sdk::metrics::data::Temporality::Cumulative
}
_ => self.0,
}
}
}
impl From<&Temporality> for Box<dyn TemporalitySelector> {
fn from(value: &Temporality) -> Self {
Box::new(match value {
Temporality::Cumulative => {
CustomTemporalitySelector(opentelemetry_sdk::metrics::data::Temporality::Cumulative)
}
Temporality::Delta => {
CustomTemporalitySelector(opentelemetry_sdk::metrics::data::Temporality::Delta)
}
})
}
}
#[cfg(test)]
mod tests {
use opentelemetry_sdk::metrics::data::Temporality as SdkTemporality;
use super::*;
#[test]
fn test_updown_counter_temporality_override() {
let delta_selector = CustomTemporalitySelector(SdkTemporality::Delta);
let cumulative_selector = CustomTemporalitySelector(SdkTemporality::Cumulative);
assert_eq!(
delta_selector.temporality(InstrumentKind::UpDownCounter),
SdkTemporality::Cumulative,
"UpDownCounter should always use cumulative temporality even with delta config"
);
assert_eq!(
cumulative_selector.temporality(InstrumentKind::UpDownCounter),
SdkTemporality::Cumulative,
"UpDownCounter should use cumulative temporality with cumulative config"
);
assert_eq!(
delta_selector.temporality(InstrumentKind::ObservableUpDownCounter),
SdkTemporality::Cumulative,
"ObservableUpDownCounter should always use cumulative temporality even with delta config"
);
assert_eq!(
cumulative_selector.temporality(InstrumentKind::ObservableUpDownCounter),
SdkTemporality::Cumulative,
"ObservableUpDownCounter should use cumulative temporality with cumulative config"
);
}
#[test]
fn test_counter_temporality_respects_config() {
let delta_selector = CustomTemporalitySelector(SdkTemporality::Delta);
let cumulative_selector = CustomTemporalitySelector(SdkTemporality::Cumulative);
assert_eq!(
delta_selector.temporality(InstrumentKind::Counter),
SdkTemporality::Delta,
"Counter should use delta temporality with delta config"
);
assert_eq!(
cumulative_selector.temporality(InstrumentKind::Counter),
SdkTemporality::Cumulative,
"Counter should use cumulative temporality with cumulative config"
);
assert_eq!(
delta_selector.temporality(InstrumentKind::ObservableCounter),
SdkTemporality::Delta,
"ObservableCounter should use delta temporality with delta config"
);
assert_eq!(
cumulative_selector.temporality(InstrumentKind::ObservableCounter),
SdkTemporality::Cumulative,
"ObservableCounter should use cumulative temporality with cumulative config"
);
}
#[test]
fn test_gauge_temporality_respects_config() {
let delta_selector = CustomTemporalitySelector(SdkTemporality::Delta);
let cumulative_selector = CustomTemporalitySelector(SdkTemporality::Cumulative);
assert_eq!(
delta_selector.temporality(InstrumentKind::Gauge),
SdkTemporality::Delta,
"Gauge should use delta temporality with delta config"
);
assert_eq!(
cumulative_selector.temporality(InstrumentKind::Gauge),
SdkTemporality::Cumulative,
"Gauge should use cumulative temporality with cumulative config"
);
assert_eq!(
delta_selector.temporality(InstrumentKind::ObservableGauge),
SdkTemporality::Delta,
"ObservableGauge should use delta temporality with delta config"
);
assert_eq!(
cumulative_selector.temporality(InstrumentKind::ObservableGauge),
SdkTemporality::Cumulative,
"ObservableGauge should use cumulative temporality with cumulative config"
);
}
#[test]
fn test_histogram_temporality_respects_config() {
let delta_selector = CustomTemporalitySelector(SdkTemporality::Delta);
let cumulative_selector = CustomTemporalitySelector(SdkTemporality::Cumulative);
assert_eq!(
delta_selector.temporality(InstrumentKind::Histogram),
SdkTemporality::Delta,
"Histogram should use delta temporality with delta config"
);
assert_eq!(
cumulative_selector.temporality(InstrumentKind::Histogram),
SdkTemporality::Cumulative,
"Histogram should use cumulative temporality with cumulative config"
);
}
#[test]
fn endpoint_grpc_defaulting_no_scheme() {
let url = Url::parse("api.apm.com:433").unwrap();
let exporter = GrpcExporter::default();
let domain = exporter.default_tls_domain(&url);
assert_eq!(domain, None);
}
#[test]
fn endpoint_grpc_defaulting_scheme() {
let url = Url::parse("https://api.apm.com:433").unwrap();
let exporter = GrpcExporter::default();
let domain = exporter.default_tls_domain(&url);
assert_eq!(domain, Some(url.domain().expect("domain was expected")),);
}
#[test]
fn endpoint_grpc_explicit_domain() {
let url = Url::parse("https://api.apm.com:433").unwrap();
let exporter = GrpcExporter {
domain_name: Some("foo.bar".to_string()),
..Default::default()
};
let domain = exporter.default_tls_domain(&url);
assert_eq!(domain, Some("foo.bar"));
}
#[test]
fn test_process_endpoint() {
let endpoint = None;
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Traces, &Protocol::Grpc).unwrap();
assert_eq!(endpoint, processed_endpoint);
let endpoint = Some("default".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Traces, &Protocol::Grpc).unwrap();
assert_eq!(Some("".to_string()), processed_endpoint);
let endpoint = Some("https://api.apm.com:433/v1/traces".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Traces, &Protocol::Grpc).unwrap();
assert_eq!(endpoint, processed_endpoint);
let endpoint = Some("https://api.apm.com:433".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Traces, &Protocol::Grpc).unwrap();
assert_eq!(
Some("https://api.apm.com:433/".to_string()),
processed_endpoint
);
let endpoint = Some("https://api.apm.com:433".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Traces, &Protocol::Http).unwrap();
assert_eq!(
Some("https://api.apm.com:433/v1/traces".to_string()),
processed_endpoint
);
let endpoint = Some("https://api.apm.com:433/".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Traces, &Protocol::Grpc).unwrap();
assert_eq!(endpoint, processed_endpoint);
let endpoint = Some("https://api.apm.com:433/traces".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Traces, &Protocol::Grpc).unwrap();
assert_eq!(endpoint, processed_endpoint);
let endpoint = Some("localhost:4317".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Traces, &Protocol::Grpc).unwrap();
assert_eq!(
Some("http://localhost:4317/".to_string()),
processed_endpoint
);
let endpoint = Some("localhost:4317".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Traces, &Protocol::Http).unwrap();
assert_eq!(
Some("http://localhost:4317/v1/traces".to_string()),
processed_endpoint
);
let endpoint = Some("https://otlp.nr-data.net".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Traces, &Protocol::Http).unwrap();
assert_eq!(
Some("https://otlp.nr-data.net/v1/traces".to_string()),
processed_endpoint
);
let endpoint = None;
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Metrics, &Protocol::Grpc).unwrap();
assert_eq!(None, processed_endpoint);
let endpoint = Some("default".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Metrics, &Protocol::Grpc).unwrap();
assert_eq!(Some("".to_string()), processed_endpoint);
let endpoint = Some("https://api.apm.com:433/v1/metrics".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Metrics, &Protocol::Grpc).unwrap();
assert_eq!(endpoint, processed_endpoint);
let endpoint = Some("https://api.apm.com:433".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Metrics, &Protocol::Grpc).unwrap();
assert_eq!(
Some("https://api.apm.com:433/".to_string()),
processed_endpoint
);
let endpoint = Some("https://api.apm.com:433".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Metrics, &Protocol::Http).unwrap();
assert_eq!(
Some("https://api.apm.com:433/v1/metrics".to_string()),
processed_endpoint
);
let endpoint = Some("https://api.apm.com:433/".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Metrics, &Protocol::Grpc).unwrap();
assert_eq!(endpoint, processed_endpoint);
let endpoint = Some("https://api.apm.com:433/metrics".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Metrics, &Protocol::Grpc).unwrap();
assert_eq!(endpoint, processed_endpoint);
let endpoint = Some("localhost:4317".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Metrics, &Protocol::Grpc).unwrap();
assert_eq!(
Some("http://localhost:4317/".to_string()),
processed_endpoint
);
let endpoint = Some("localhost:4317".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Metrics, &Protocol::Http).unwrap();
assert_eq!(
Some("http://localhost:4317/v1/metrics".to_string()),
processed_endpoint
);
let endpoint = Some("https://otlp.nr-data.net".to_string());
let processed_endpoint =
process_endpoint(&endpoint, &TelemetryDataKind::Metrics, &Protocol::Http).unwrap();
assert_eq!(
Some("https://otlp.nr-data.net/v1/metrics".to_string()),
processed_endpoint
);
}
}