use std::collections::HashMap;
use http::Uri;
use opentelemetry_sdk::metrics::Temporality as SdkTemporality;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use tonic::transport::Certificate;
use tonic::transport::ClientTlsConfig;
use tonic::transport::Identity;
use tower::BoxError;
use url::Url;
use crate::plugins::telemetry::tracing::BatchProcessorConfig;
// Configuration for an OTLP exporter. Unknown fields are rejected on deserialization
// and the type appears in the generated JSON schema under the name `OTLPConfig`.
//
// NOTE: the notes in this block are plain `//` comments rather than `///` doc comments
// so that the `JsonSchema` derive does not pick them up as schema descriptions.
#[derive(Debug, Clone, Deserialize, JsonSchema, Default, PartialEq)]
#[serde(deny_unknown_fields)]
#[schemars(rename = "OTLPConfig")]
pub(crate) struct Config {
// Presumably switches this exporter on or off (the flag is consumed outside this file).
// It is the only field without `#[serde(default)]`.
pub(crate) enabled: bool,
// Endpoint to export to. `None` means nothing was configured; the special value
// "default" is mapped to the empty string by `process_endpoint`.
#[serde(default)]
pub(crate) endpoint: Option<String>,
// Wire protocol used by the exporter (defaults to gRPC via `Protocol::default`).
#[serde(default)]
pub(crate) protocol: Protocol,
// Settings that only apply when exporting over gRPC (TLS material, metadata).
#[serde(default)]
pub(crate) grpc: GrpcExporter,
// Settings that only apply when exporting over HTTP (headers).
#[serde(default)]
pub(crate) http: HttpExporter,
// Batch processor settings, shared with the tracing module's `BatchProcessorConfig`.
#[serde(default)]
pub(crate) batch_processor: BatchProcessorConfig,
// Metrics temporality (defaults to cumulative via `Temporality::default`).
#[serde(default)]
pub(crate) temporality: Temporality,
}
impl Config {
    /// Returns this configuration with `endpoint` and `protocol` overridden by the
    /// trace-specific OTLP environment variables.
    ///
    /// Precedence, first non-empty value wins:
    /// * endpoint: `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`, then
    ///   `OTEL_EXPORTER_OTLP_ENDPOINT`, then the configured `endpoint`
    /// * protocol: `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`, then
    ///   `OTEL_EXPORTER_OTLP_PROTOCOL`, then the configured `protocol`
    ///
    /// # Errors
    ///
    /// Returns an error when the protocol variable that was selected holds a value
    /// other than `grpc`, `http/protobuf` or `http` (case-insensitive).
    pub(crate) fn with_tracing_env_overrides(self) -> Result<Self, BoxError> {
        self.with_signal_env_overrides(
            "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT",
            "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL",
        )
    }

    /// Returns this configuration with `endpoint` and `protocol` overridden by the
    /// metrics-specific OTLP environment variables.
    ///
    /// Precedence, first non-empty value wins:
    /// * endpoint: `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`, then
    ///   `OTEL_EXPORTER_OTLP_ENDPOINT`, then the configured `endpoint`
    /// * protocol: `OTEL_EXPORTER_OTLP_METRICS_PROTOCOL`, then
    ///   `OTEL_EXPORTER_OTLP_PROTOCOL`, then the configured `protocol`
    ///
    /// # Errors
    ///
    /// Returns an error when the protocol variable that was selected holds a value
    /// other than `grpc`, `http/protobuf` or `http` (case-insensitive).
    pub(crate) fn with_metrics_env_overrides(self) -> Result<Self, BoxError> {
        self.with_signal_env_overrides(
            "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT",
            "OTEL_EXPORTER_OTLP_METRICS_PROTOCOL",
        )
    }

    /// Shared implementation of the per-signal overrides: the signal-specific
    /// variables are consulted first, then the general `OTEL_EXPORTER_OTLP_*` ones,
    /// and finally the values already present in `self`.
    fn with_signal_env_overrides(
        self,
        endpoint_var: &str,
        protocol_var: &str,
    ) -> Result<Self, BoxError> {
        let endpoint = Self::non_empty_env_var(endpoint_var)
            .or_else(|| Self::non_empty_env_var("OTEL_EXPORTER_OTLP_ENDPOINT"))
            .or(self.endpoint);
        let protocol =
            Self::parse_protocol_env(protocol_var, "OTEL_EXPORTER_OTLP_PROTOCOL", self.protocol)?;
        Ok(Config {
            endpoint,
            protocol,
            ..self
        })
    }

    /// Reads an environment variable, treating an empty value like an unset one.
    ///
    /// The OpenTelemetry specification requires empty values to be interpreted as
    /// "not set"; without this an empty variable would shadow the configured
    /// endpoint or be rejected as an invalid protocol.
    fn non_empty_env_var(name: &str) -> Option<String> {
        std::env::var(name).ok().filter(|value| !value.is_empty())
    }

    /// Resolves the protocol from `specific_var`, falling back to `general_var` and
    /// then to `default` when neither variable holds a non-empty value.
    ///
    /// # Errors
    ///
    /// Returns an error naming the offending variable when its value is not one of
    /// `grpc`, `http/protobuf` or `http` (compared case-insensitively).
    fn parse_protocol_env(
        specific_var: &str,
        general_var: &str,
        default: Protocol,
    ) -> Result<Protocol, BoxError> {
        let (var_name, value) = if let Some(v) = Self::non_empty_env_var(specific_var) {
            (specific_var, v)
        } else if let Some(v) = Self::non_empty_env_var(general_var) {
            (general_var, v)
        } else {
            return Ok(default);
        };
        match value.to_lowercase().as_str() {
            "grpc" => Ok(Protocol::Grpc),
            "http/protobuf" | "http" => Ok(Protocol::Http),
            _ => Err(format!(
                "invalid value '{}' for {}, expected 'grpc' or 'http/protobuf'",
                value, var_name
            )
            .into()),
        }
    }
}
/// The telemetry signal an endpoint is being prepared for.
///
/// `process_endpoint` uses it to pick the HTTP path suffix (`/v1/traces` or
/// `/v1/metrics`).
#[derive(Copy, Clone, Debug)]
pub(crate) enum TelemetryDataKind {
/// Trace data; selects the `/v1/traces` suffix for the HTTP protocol.
Traces,
/// Metric data; selects the `/v1/metrics` suffix for the HTTP protocol.
Metrics,
}
/// Normalises a configured OTLP endpoint for the given signal and protocol.
///
/// * `None` is returned as `None`.
/// * `""` and the special value `"default"` yield `Some("")` without further
///   processing.
/// * An endpoint that does not start with `http://` or `https://` gets `http://`
///   prepended.
/// * An endpoint that already ends with the expected suffix is returned as is. The
///   suffix is `/` for gRPC and `/v1/traces` or `/v1/metrics` for HTTP.
/// * An endpoint without a path (or with just `/`) gets the suffix appended.
/// * An endpoint with any other path is passed through untouched.
///
/// # Errors
///
/// Returns an error when the endpoint has to be inspected and cannot be parsed as
/// an [`http::Uri`].
pub(super) fn process_endpoint(
    endpoint: &Option<String>,
    kind: &TelemetryDataKind,
    protocol: &Protocol,
) -> Result<Option<String>, BoxError> {
    endpoint
        .as_ref()
        .map(|configured| -> Result<String, BoxError> {
            // "default" is an alias for the empty endpoint; neither is processed.
            if configured.is_empty() || configured == "default" {
                return Ok(String::new());
            }

            // A scheme is required to parse the endpoint as a URI. Match the whole
            // scheme prefix: a bare `starts_with("http")` would skip the prefix for
            // host names such as `httpbin.org:4317`, leaving them without a scheme.
            let has_scheme =
                configured.starts_with("http://") || configured.starts_with("https://");
            let base = if has_scheme {
                configured.clone()
            } else {
                format!("http://{configured}")
            };

            // The expected suffix depends on the protocol and, for HTTP, the signal.
            let suffix = match protocol {
                Protocol::Grpc => "/",
                Protocol::Http => match kind {
                    TelemetryDataKind::Metrics => "/v1/metrics",
                    TelemetryDataKind::Traces => "/v1/traces",
                },
            };
            if base.ends_with(suffix) {
                return Ok(base);
            }

            let uri = http::Uri::try_from(&base)?;
            if uri.path() == "/" {
                // No real path: drop a trailing "/" (if any) and append the suffix.
                let trimmed = base.strip_suffix('/').unwrap_or(&base);
                Ok(format!("{trimmed}{suffix}"))
            } else {
                // A custom path was supplied; pass it through untouched.
                Ok(base)
            }
        })
        .transpose()
}
// Settings specific to the HTTP flavour of the exporter. Unknown fields are rejected
// and every field falls back to its `Default` when omitted.
//
// NOTE: plain `//` comments are used (not `///`) so that the `JsonSchema` derive does
// not pick them up as schema descriptions.
#[derive(Debug, Clone, Deserialize, Serialize, Default, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct HttpExporter {
// Header name/value pairs; presumably attached to every export request (the map is
// consumed outside this file).
pub(crate) headers: HashMap<String, String>,
}
// Settings specific to the gRPC flavour of the exporter. Unknown fields are rejected
// and every field falls back to its `Default` when omitted.
//
// NOTE: plain `//` comments are used (not `///`) so that the `JsonSchema` derive does
// not pick them up as schema descriptions.
#[derive(Debug, Clone, Deserialize, Serialize, Default, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct GrpcExporter {
// Explicit TLS domain name. When set, `default_tls_domain` returns it regardless of
// the endpoint.
pub(crate) domain_name: Option<String>,
// CA certificate; read as PEM by `to_tls_config`.
pub(crate) ca: Option<String>,
// Client certificate; read as PEM by `to_tls_config` together with `key`.
pub(crate) cert: Option<String>,
// Client private key; read as PEM by `to_tls_config` together with `cert`.
pub(crate) key: Option<String>,
// Header map (de)serialized through `http_serde::header_map`; its JSON schema comes
// from the `header_map` function. Presumably sent as gRPC metadata - confirm at the
// call site.
#[serde(with = "http_serde::header_map")]
#[schemars(schema_with = "header_map", default)]
pub(crate) metadata: http::HeaderMap,
}
fn header_map(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
HashMap::<String, Value>::json_schema(generator)
}
impl GrpcExporter {
    /// Builds the TLS client configuration used to connect to `endpoint`.
    ///
    /// The configuration always trusts the native root certificates. On top of
    /// that, every setting that is present is applied independently:
    /// * the TLS domain resolved by [`Self::default_tls_domain`],
    /// * the `ca` certificate (PEM),
    /// * the client identity, when both `cert` and `key` (PEM) are set.
    ///
    /// Previously these settings were only honoured when all four were present, so
    /// a CA-only setup or an explicit `domain_name` was silently discarded.
    ///
    /// # Errors
    ///
    /// Returns an error when `endpoint` cannot be parsed as a [`Url`].
    pub(crate) fn to_tls_config(&self, endpoint: &Uri) -> Result<ClientTlsConfig, BoxError> {
        let endpoint = endpoint
            .to_string()
            .parse::<Url>()
            .map_err(|e| BoxError::from(format!("invalid GRPC endpoint {endpoint}, {e}")))?;

        // Native roots have to be requested explicitly.
        let mut tls_config = ClientTlsConfig::new().with_native_roots();

        if let Some(domain_name) = self.default_tls_domain(&endpoint) {
            tls_config = tls_config.domain_name(domain_name);
        }
        if let Some(ca) = &self.ca {
            tls_config = tls_config.ca_certificate(Certificate::from_pem(ca));
        }
        match (&self.cert, &self.key) {
            (Some(cert), Some(key)) => {
                tls_config = tls_config.identity(Identity::from_pem(cert, key));
            }
            (None, None) => {}
            // A client identity needs both halves; keep going without one rather
            // than failing, but make the misconfiguration visible.
            _ => tracing::warn!(
                "telemetry otlp exporter has been configured with only one of `cert` and `key`; both are required for a client identity, so no client certificate will be used"
            ),
        }

        Ok(tls_config)
    }

    /// Resolves the TLS domain name to use for `endpoint`.
    ///
    /// * An explicitly configured `domain_name` always wins.
    /// * Otherwise, for an `https` endpoint, the endpoint host is used.
    /// * Otherwise `None` is returned. If the endpoint uses port 443 with a scheme
    ///   other than `http`, a warning is logged first because that combination is
    ///   likely a configuration mistake.
    fn default_tls_domain<'a>(&'a self, endpoint: &'a Url) -> Option<&'a str> {
        match (&self.domain_name, endpoint) {
            (Some(domain), _) => Some(domain.as_str()),
            (None, endpoint) if endpoint.scheme() == "https" => endpoint.host_str(),
            (None, endpoint) if endpoint.port() == Some(443) && endpoint.scheme() != "http" => {
                tracing::warn!(
                    "telemetry otlp exporter has been configured with port 443 but TLS domain has not been set. This is likely a configuration error"
                );
                None
            }
            _ => None,
        }
    }
}
// Wire protocol used by the OTLP exporter. Variants are (de)serialized in snake case,
// i.e. as `grpc` and `http`.
//
// NOTE: plain `//` comments are used (not `///`) so that the `JsonSchema` derive does
// not pick them up as schema descriptions.
#[derive(Debug, Default, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub(crate) enum Protocol {
// gRPC transport; this is the default.
#[default]
Grpc,
// HTTP transport; `process_endpoint` appends `/v1/traces` or `/v1/metrics` for it.
Http,
}
// Metrics temporality as exposed in configuration. Variants are (de)serialized in
// snake case, i.e. as `cumulative` and `delta`, and convert into the SDK's
// `SdkTemporality` through the `From` impl in this file.
//
// NOTE: plain `//` comments are used (not `///`) so that the `JsonSchema` derive does
// not pick them up as schema descriptions.
#[derive(Debug, Default, Clone, Deserialize, Serialize, JsonSchema, PartialEq, Copy)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub(crate) enum Temporality {
// Maps to `SdkTemporality::Cumulative`; this is the default.
#[default]
Cumulative,
// Maps to `SdkTemporality::Delta`.
Delta,
}
/// Maps the configuration-level [`Temporality`] onto the SDK variant of the same
/// name.
impl From<Temporality> for SdkTemporality {
    fn from(temporality: Temporality) -> Self {
        match temporality {
            Temporality::Delta => Self::Delta,
            Temporality::Cumulative => Self::Cumulative,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::Protocol::Grpc;
    use super::Protocol::Http;
    use super::TelemetryDataKind::Metrics;
    use super::TelemetryDataKind::Traces;
    use super::*;

    /// Runs `process_endpoint` on `input` and compares the outcome with `expected`.
    ///
    /// `#[track_caller]` makes a failing assertion point at the calling line, so the
    /// offending case is easy to find.
    #[track_caller]
    fn assert_processed(
        input: Option<&str>,
        kind: TelemetryDataKind,
        protocol: Protocol,
        expected: Option<&str>,
    ) {
        let endpoint = input.map(str::to_string);
        let processed_endpoint = process_endpoint(&endpoint, &kind, &protocol).unwrap();
        assert_eq!(expected.map(str::to_string), processed_endpoint);
    }

    /// Without an `https` scheme or an explicit domain no TLS domain is derived.
    #[test]
    fn endpoint_grpc_defaulting_no_scheme() {
        let exporter = GrpcExporter::default();
        let url = Url::parse("api.apm.com:433").unwrap();
        assert_eq!(exporter.default_tls_domain(&url), None);
    }

    /// An `https` endpoint provides the TLS domain from its host.
    #[test]
    fn endpoint_grpc_defaulting_scheme() {
        let exporter = GrpcExporter::default();
        let url = Url::parse("https://api.apm.com:433").unwrap();
        let expected = url.domain().expect("domain was expected");
        assert_eq!(exporter.default_tls_domain(&url), Some(expected));
    }

    /// An explicitly configured domain wins over the endpoint host.
    #[test]
    fn endpoint_grpc_explicit_domain() {
        let exporter = GrpcExporter {
            domain_name: Some("foo.bar".to_string()),
            ..Default::default()
        };
        let url = Url::parse("https://api.apm.com:433").unwrap();
        assert_eq!(exporter.default_tls_domain(&url), Some("foo.bar"));
    }

    #[test]
    fn test_process_endpoint() {
        // Traces
        assert_processed(None, Traces, Grpc, None);
        assert_processed(Some("default"), Traces, Grpc, Some(""));
        assert_processed(
            Some("https://api.apm.com:433/v1/traces"),
            Traces,
            Grpc,
            Some("https://api.apm.com:433/v1/traces"),
        );
        assert_processed(
            Some("https://api.apm.com:433"),
            Traces,
            Grpc,
            Some("https://api.apm.com:433/"),
        );
        assert_processed(
            Some("https://api.apm.com:433"),
            Traces,
            Http,
            Some("https://api.apm.com:433/v1/traces"),
        );
        assert_processed(
            Some("https://api.apm.com:433/"),
            Traces,
            Grpc,
            Some("https://api.apm.com:433/"),
        );
        assert_processed(
            Some("https://api.apm.com:433/traces"),
            Traces,
            Grpc,
            Some("https://api.apm.com:433/traces"),
        );
        assert_processed(
            Some("localhost:4317"),
            Traces,
            Grpc,
            Some("http://localhost:4317/"),
        );
        assert_processed(
            Some("localhost:4317"),
            Traces,
            Http,
            Some("http://localhost:4317/v1/traces"),
        );
        assert_processed(
            Some("https://otlp.nr-data.net"),
            Traces,
            Http,
            Some("https://otlp.nr-data.net/v1/traces"),
        );

        // Metrics
        assert_processed(None, Metrics, Grpc, None);
        assert_processed(Some("default"), Metrics, Grpc, Some(""));
        assert_processed(
            Some("https://api.apm.com:433/v1/metrics"),
            Metrics,
            Grpc,
            Some("https://api.apm.com:433/v1/metrics"),
        );
        assert_processed(
            Some("https://api.apm.com:433"),
            Metrics,
            Grpc,
            Some("https://api.apm.com:433/"),
        );
        assert_processed(
            Some("https://api.apm.com:433"),
            Metrics,
            Http,
            Some("https://api.apm.com:433/v1/metrics"),
        );
        assert_processed(
            Some("https://api.apm.com:433/"),
            Metrics,
            Grpc,
            Some("https://api.apm.com:433/"),
        );
        assert_processed(
            Some("https://api.apm.com:433/metrics"),
            Metrics,
            Grpc,
            Some("https://api.apm.com:433/metrics"),
        );
        assert_processed(
            Some("localhost:4317"),
            Metrics,
            Grpc,
            Some("http://localhost:4317/"),
        );
        assert_processed(
            Some("localhost:4317"),
            Metrics,
            Http,
            Some("http://localhost:4317/v1/metrics"),
        );
        assert_processed(
            Some("https://otlp.nr-data.net"),
            Metrics,
            Http,
            Some("https://otlp.nr-data.net/v1/metrics"),
        );
    }
}