use std::borrow::Cow;
use apollo_configuration::configuration;
use apollo_configuration::types::{HeaderName, HeaderValue, Url};
use apollo_redaction::Redacted;
use apollo_redaction::strategy::First4Redactor;
use percent_encoding::percent_decode_str;
use schemars::JsonSchema;
// Settings for the `otlp_http` exporter.
//
// The `otlp_http_defaults` test in this file shows that an empty mapping
// (`otlp_http: {}`) is accepted and that every field then takes the default
// noted on it below.
#[configuration]
pub(crate) struct OtlpHttpExporterConfig {
// Target URL; defaults to `http://localhost:4318`. The `unwrap()` is on a
// constant literal, so it can only fail if that literal is edited into an
// invalid URL.
#[config(default = "http://localhost:4318".parse::<url::Url>().unwrap().into())]
pub(crate) endpoint: Url,
// Encoding choice; `OtlpEncoding::Protobuf` when omitted.
pub(crate) encoding: OtlpEncoding,
// Compression choice; `Compression::None` when omitted.
pub(crate) compression: Compression,
// Defaults to 10000. The unit is not stated in this file — presumably
// milliseconds; TODO confirm against the code that consumes it.
#[config(default = 10000)]
pub(crate) timeout: u64,
// Headers given as structured `name` / `value` entries; empty when omitted.
pub(crate) headers: Vec<Header>,
// Headers given as one comma-separated `name=value` string (see
// `HeadersList`). NOTE(review): how this combines with `headers` is decided
// by the consumer and is not visible here.
pub(crate) headers_list: HeadersList,
// Optional TLS settings; `None` when omitted.
pub(crate) tls: Option<TlsConfig>,
// Temporality choice; `TemporalityPreference::Cumulative` when omitted.
pub(crate) temporality_preference: TemporalityPreference,
}
// Settings for the `otlp_grpc` exporter.
//
// Mirrors `OtlpHttpExporterConfig` except that there is no `encoding` field
// and the default endpoint uses port 4317. The `otlp_grpc_defaults` test in
// this file shows that an empty mapping (`otlp_grpc: {}`) is accepted.
#[configuration]
pub(crate) struct OtlpGrpcExporterConfig {
// Target URL; defaults to `http://localhost:4317`. The `unwrap()` is on a
// constant literal, so it can only fail if that literal is edited into an
// invalid URL.
#[config(default = "http://localhost:4317".parse::<url::Url>().unwrap().into())]
pub(crate) endpoint: Url,
// Compression choice; `Compression::None` when omitted.
pub(crate) compression: Compression,
// Defaults to 10000. The unit is not stated in this file — presumably
// milliseconds; TODO confirm against the code that consumes it.
#[config(default = 10000)]
pub(crate) timeout: u64,
// Headers given as structured `name` / `value` entries; empty when omitted.
pub(crate) headers: Vec<Header>,
// Headers given as one comma-separated `name=value` string (see
// `HeadersList`). NOTE(review): how this combines with `headers` is decided
// by the consumer and is not visible here.
pub(crate) headers_list: HeadersList,
// Optional TLS settings; `None` when omitted.
pub(crate) tls: Option<TlsConfig>,
// Temporality choice; `TemporalityPreference::Cumulative` when omitted.
pub(crate) temporality_preference: TemporalityPreference,
}
// Encoding selectable on `OtlpHttpExporterConfig::encoding`.
#[configuration]
#[derive(Copy, PartialEq, Eq)]
pub(crate) enum OtlpEncoding {
// Used when the field is omitted from configuration.
#[config(default)]
Protobuf,
Json,
}
// Temporality choice shared by both OTLP exporter configs.
//
// The tests in this file spell the variants in YAML as `cumulative`, `delta`
// and `low_memory`.
#[configuration]
#[derive(Copy, PartialEq, Eq)]
pub(crate) enum TemporalityPreference {
// Used when the field is omitted from configuration.
#[config(default)]
Cumulative,
Delta,
LowMemory,
}
// Compression choice shared by both OTLP exporter configs.
#[configuration]
#[derive(Copy, PartialEq, Eq)]
pub(crate) enum Compression {
// Used when the field is omitted from configuration.
#[config(default)]
None,
Gzip,
}
/// Headers supplied as a single comma-separated `name=value` string.
///
/// The raw string is kept inside [`Redacted`] and is only split into
/// individual [`Header`]s on demand by `iter()`. The derived `Default` is the
/// empty string, for which `iter()` yields nothing.
#[derive(Clone, Default, Debug)]
pub(crate) struct HeadersList(Redacted<String>);
impl HeadersList {
    /// Yields one [`Header`] for every non-blank entry of the comma-separated
    /// list, in the order the entries appear.
    ///
    /// Entries are trimmed first; entries that are empty after trimming
    /// (e.g. from a trailing comma) are skipped.
    ///
    /// # Panics
    ///
    /// Panics when an entry is rejected by `parse_header_entry`. Callers are
    /// expected to have run `validate()` on this value beforehand, which
    /// reports exactly those entries as configuration errors.
    pub(crate) fn iter(&self) -> impl Iterator<Item = Header> + '_ {
        self.0
            .unredact()
            .split(',')
            .map(str::trim)
            .filter(|candidate| !candidate.is_empty())
            .map(|candidate| {
                parse_header_entry(candidate)
                    .expect("invalid header entry should have been caught by validate()")
            })
    }
}
/// Parses one `name=value` entry of a [`HeadersList`] into a [`Header`].
///
/// The entry is split at the first `=`. Both sides are trimmed; the name must
/// be a valid HTTP header name and the value is percent-decoded before being
/// checked as an HTTP header value.
///
/// # Errors
///
/// Returns a human-readable message when:
/// - the entry contains no `=`,
/// - the name is empty or not a valid header name,
/// - the percent-decoded value is not valid UTF-8, or
/// - the decoded value is not a valid header value.
///
/// Messages never contain the value, and show the entry or name only through
/// `First4Redactor`.
fn parse_header_entry(entry: &str) -> Result<Header, String> {
    let (name, value) = entry.split_once('=').ok_or_else(|| {
        format!(
            "invalid header entry {}: missing '='",
            Redacted::<_, First4Redactor>::new(entry)
        )
    })?;

    let name = name.trim();
    if name.is_empty() {
        return Err("invalid header entry: name must not be empty".to_string());
    }
    let name = name
        .parse::<http::HeaderName>()
        .map_err(|_| {
            format!(
                "invalid header name {}",
                Redacted::<_, First4Redactor>::new(name)
            )
        })
        .map(HeaderName::from)?;

    // Decode strictly: a lossy decode would replace malformed byte sequences
    // with U+FFFD and let a silently altered value through validation, so the
    // header actually sent would differ from the one configured.
    let decoded = percent_decode_str(value.trim())
        .decode_utf8()
        .map_err(|_| {
            format!(
                "invalid header value for {}: percent-decoded bytes are not valid UTF-8",
                Redacted::<_, First4Redactor>::new(name.as_str())
            )
        })?;
    let value = http::HeaderValue::from_str(&decoded)
        .map_err(|_| {
            format!(
                "invalid header value for {}",
                Redacted::<_, First4Redactor>::new(name.as_str())
            )
        })
        .map(HeaderValue::from)?;

    Ok(Header {
        name,
        value: Redacted::new(value),
    })
}
#[cfg(test)]
impl HeadersList {
    /// Test-only constructor that wraps `s` as-is.
    ///
    /// No validation happens here, so `iter()` will panic if `s` contains an
    /// entry that `parse_header_entry` rejects.
    pub(crate) fn new(s: impl Into<String>) -> Self {
        let raw: String = s.into();
        Self(Redacted::new(raw))
    }
}
impl<'de> serde::Deserialize<'de> for HeadersList {
    /// Reads a plain string and stores it unparsed; entries are checked later
    /// by the `Validate` implementation.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let raw = String::deserialize(deserializer)?;
        Ok(Self(Redacted::new(raw)))
    }
}
impl serde::Serialize for HeadersList {
    /// Writes the underlying string back out in full (unredacted), so that a
    /// serialized value deserializes to the same list.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let raw = self.0.unredact();
        raw.serialize(serializer)
    }
}
impl JsonSchema for HeadersList {
    /// Name under which this type appears in the generated schema.
    fn schema_name() -> Cow<'static, str> {
        "HeadersList".into()
    }

    /// The list is configured as a single string, so it reuses `String`'s schema.
    fn json_schema(schema_gen: &mut schemars::SchemaGenerator) -> schemars::Schema {
        <String as JsonSchema>::json_schema(schema_gen)
    }
}
impl apollo_configuration::Validate for HeadersList {
    /// Reports one error per entry that `parse_header_entry` rejects.
    ///
    /// Entries are trimmed and blank ones are ignored, matching what `iter()`
    /// skips. All bad entries are reported, not just the first.
    fn validate(&self, mut errors: apollo_configuration::ErrorCollector<'_>) {
        let failures = self
            .0
            .unredact()
            .split(',')
            .map(str::trim)
            .filter(|candidate| !candidate.is_empty())
            .filter_map(|candidate| parse_header_entry(candidate).err());

        for message in failures {
            errors.report_simple(message);
        }
    }
}
// A single header as a name/value pair. Both fields must be present when a
// header is written out explicitly in configuration.
#[configuration]
pub(crate) struct Header {
#[config(required)]
pub(crate) name: HeaderName,
// Wrapped in `Redacted` — presumably to keep the value out of logs and debug
// output; read it with `unredact()`.
#[config(required)]
pub(crate) value: Redacted<HeaderValue>,
}
// Optional TLS settings for an OTLP exporter.
//
// NOTE(review): the three `*_file` fields are plain strings here — presumably
// filesystem paths to certificate/key material; confirm against the code that
// builds the exporter. The exact difference between `insecure` and
// `insecure_skip_verify` is likewise decided by that code, not by this file.
#[configuration]
pub(crate) struct TlsConfig {
pub(crate) ca_file: Option<String>,
pub(crate) cert_file: Option<String>,
pub(crate) key_file: Option<String>,
// Off unless explicitly enabled.
#[config(default = false)]
pub(crate) insecure: bool,
// Off unless explicitly enabled.
#[config(default = false)]
pub(crate) insecure_skip_verify: bool,
}
#[cfg(test)]
mod tests {
use apollo_configuration::parse_yaml;
use super::*;
use crate::config::OpenTelemetryConfig;
use crate::config::SpanExporter;
use crate::config::traces::SpanProcessor;
fn print_miette(diagnostic: &dyn miette::Diagnostic) -> String {
use miette::EyreContext as _;
struct F<'a>(&'a dyn miette::Diagnostic);
impl std::fmt::Display for F<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
miette::MietteHandlerOpts::new()
.color(false)
.build()
.debug(self.0, f)
}
}
F(diagnostic).to_string()
}
// An empty `otlp_http` mapping must parse, and every field must then hold its
// default value.
#[test]
fn otlp_http_defaults() {
let config: OpenTelemetryConfig = parse_yaml(
indoc::indoc! {"
tracer_provider:
processors:
- batch:
exporter:
otlp_http: {}
"},
&Default::default(),
)
.unwrap();
// Drill down to the only configured processor and its exporter.
let SpanProcessor::Batch(batch) = &config.tracer_provider.processors[0] else {
panic!("expected Batch processor");
};
let SpanExporter::OtlpHttp(cfg) = &batch.exporter else {
panic!("expected OtlpHttp exporter");
};
// The parsed URL's string form carries a trailing `/` that the default
// literal on the struct does not have.
assert_eq!(cfg.endpoint.as_str(), "http://localhost:4318/");
assert_eq!(cfg.timeout, 10000);
assert_eq!(cfg.encoding, OtlpEncoding::Protobuf);
assert_eq!(cfg.compression, Compression::None);
assert_eq!(
cfg.temporality_preference,
TemporalityPreference::Cumulative
);
// Neither header source contributes anything by default.
assert!(cfg.headers.is_empty());
assert!(cfg.headers_list.iter().next().is_none());
assert!(cfg.tls.is_none());
}
// An empty `otlp_grpc` mapping must parse, and every field must then hold its
// default value.
#[test]
fn otlp_grpc_defaults() {
let config: OpenTelemetryConfig = parse_yaml(
indoc::indoc! {"
tracer_provider:
processors:
- batch:
exporter:
otlp_grpc: {}
"},
&Default::default(),
)
.unwrap();
// Drill down to the only configured processor and its exporter.
let SpanProcessor::Batch(batch) = &config.tracer_provider.processors[0] else {
panic!("expected Batch processor");
};
let SpanExporter::OtlpGrpc(cfg) = &batch.exporter else {
panic!("expected OtlpGrpc exporter");
};
// The parsed URL's string form carries a trailing `/` that the default
// literal on the struct does not have.
assert_eq!(cfg.endpoint.as_str(), "http://localhost:4317/");
assert_eq!(cfg.timeout, 10000);
assert_eq!(cfg.compression, Compression::None);
assert_eq!(
cfg.temporality_preference,
TemporalityPreference::Cumulative
);
// Neither header source contributes anything by default.
assert!(cfg.headers.is_empty());
assert!(cfg.headers_list.iter().next().is_none());
assert!(cfg.tls.is_none());
}
fn to_str_pairs(list: &HeadersList) -> Vec<(String, String)> {
list.iter()
.map(|h| {
(
h.name.to_string(),
h.value.unredact().to_str().unwrap().to_owned(),
)
})
.collect()
}
#[test]
fn headers_list_iter_decodes_percent_encoded_values() {
let list = HeadersList::new("x-api-key=abc123,x-tenant=my%2Dorg");
assert_eq!(
to_str_pairs(&list),
vec![
("x-api-key".to_string(), "abc123".to_string()),
("x-tenant".to_string(), "my-org".to_string()),
]
);
}
// An entry without `=` must fail configuration parsing. The diagnostic shows
// only the first four characters of the entry (`no-e****`).
#[test]
fn headers_list_validate_rejects_missing_equals() {
let err = parse_yaml::<OpenTelemetryConfig>(
indoc::indoc! {"
tracer_provider:
processors:
- batch:
exporter:
otlp_http:
headers_list: no-equals
"},
&Default::default(),
)
.unwrap_err();
// The rendered diagnostic is pinned verbatim, including the source span.
insta::assert_snapshot!(print_miette(&err), @r"
apollo::configuration::schema
× schema validation error
╰─▶ schema validation error
Error: apollo::configuration::validation
× invalid header entry no-e****: missing '='
╭─[6:27]
5 │ otlp_http:
6 │ headers_list: no-equals
· ────┬────
· ╰── invalid header entry no-e****: missing '='
╰────
");
}
// An entry whose name part is empty (`=no-name`) must fail configuration
// parsing with the "name must not be empty" message.
#[test]
fn headers_list_validate_rejects_empty_name() {
let err = parse_yaml::<OpenTelemetryConfig>(
indoc::indoc! {"
tracer_provider:
processors:
- batch:
exporter:
otlp_http:
headers_list: \"=no-name\"
"},
&Default::default(),
)
.unwrap_err();
// The rendered diagnostic is pinned verbatim, including the source span.
insta::assert_snapshot!(print_miette(&err), @r#"
apollo::configuration::schema
× schema validation error
╰─▶ schema validation error
Error: apollo::configuration::validation
× invalid header entry: name must not be empty
╭─[6:27]
5 │ otlp_http:
6 │ headers_list: "=no-name"
· ─────┬────
· ╰── invalid header entry: name must not be empty
╰────
"#);
}
// Two well-formed `name=value` entries must pass validation.
#[test]
fn headers_list_validate_accepts_valid_entries() {
let result: Result<OpenTelemetryConfig, _> = parse_yaml(
indoc::indoc! {"
tracer_provider:
processors:
- batch:
exporter:
otlp_http:
headers_list: \"x-api-key=abc123,x-tenant=my-org\"
"},
&Default::default(),
);
assert!(result.is_ok());
}
// A name containing a space is not a valid header name and must fail
// configuration parsing. The diagnostic shows only the first four characters
// of the name (`inva****`).
#[test]
fn headers_list_validate_rejects_invalid_header_name() {
let err = parse_yaml::<OpenTelemetryConfig>(
indoc::indoc! {"
tracer_provider:
processors:
- batch:
exporter:
otlp_http:
headers_list: \"invalid name=value\"
"},
&Default::default(),
)
.unwrap_err();
// The rendered diagnostic is pinned verbatim, including the source span.
insta::assert_snapshot!(print_miette(&err), @r#"
apollo::configuration::schema
× schema validation error
╰─▶ schema validation error
Error: apollo::configuration::validation
× invalid header name inva****
╭─[6:27]
5 │ otlp_http:
6 │ headers_list: "invalid name=value"
· ──────────┬─────────
· ╰── invalid header name inva****
╰────
"#);
}
// A value that percent-decodes to a line feed (`%0A`) is not a valid header
// value and must fail configuration parsing. The diagnostic names the header
// only by the first four characters of its name (`x-ap****`) and does not
// include the value.
#[test]
fn headers_list_validate_rejects_invalid_header_value() {
let err = parse_yaml::<OpenTelemetryConfig>(
indoc::indoc! {"
tracer_provider:
processors:
- batch:
exporter:
otlp_http:
headers_list: \"x-api-key=%0A\"
"},
&Default::default(),
)
.unwrap_err();
// The rendered diagnostic is pinned verbatim, including the source span.
insta::assert_snapshot!(print_miette(&err), @r#"
apollo::configuration::schema
× schema validation error
╰─▶ schema validation error
Error: apollo::configuration::validation
× invalid header value for x-ap****
╭─[6:27]
5 │ otlp_http:
6 │ headers_list: "x-api-key=%0A"
· ───────┬───────
· ╰── invalid header value for x-ap****
╰────
"#);
}
// Each YAML spelling of `temporality_preference` must parse to the matching
// enum variant.
#[test]
fn otlp_http_temporality_preference_all_variants() {
// (YAML spelling, expected variant)
for (yaml_value, expected) in [
("cumulative", TemporalityPreference::Cumulative),
("delta", TemporalityPreference::Delta),
("low_memory", TemporalityPreference::LowMemory),
] {
// `formatdoc!` substitutes `{yaml_value}` into the document for this case.
let config: OpenTelemetryConfig = parse_yaml(
&indoc::formatdoc! {"
tracer_provider:
processors:
- batch:
exporter:
otlp_http:
temporality_preference: {yaml_value}
"},
&Default::default(),
)
.unwrap();
// Drill down to the only configured processor and its exporter.
let SpanProcessor::Batch(batch) = &config.tracer_provider.processors[0] else {
panic!("expected Batch processor");
};
let SpanExporter::OtlpHttp(cfg) = &batch.exporter else {
panic!("expected OtlpHttp exporter");
};
assert_eq!(cfg.temporality_preference, expected);
}
}
}