use std::collections::HashMap;
use opentelemetry_sdk::metrics::Temporality;
use crate::InitError;
use crate::config::exporters::{
Header, HeadersList, OtlpGrpcExporterConfig, OtlpHttpExporterConfig, TemporalityPreference,
};
use crate::error::ExporterKind;
use apollo_configuration::types::{HeaderName, HeaderValue};
use apollo_redaction::Redacted;
type HeaderMap = HashMap<HeaderName, Redacted<HeaderValue>>;
/// Converts the configuration-level temporality preference into the SDK's
/// [`Temporality`]; every preference variant maps to the SDK variant of the
/// same name.
impl From<TemporalityPreference> for Temporality {
    fn from(preference: TemporalityPreference) -> Self {
        match preference {
            TemporalityPreference::Cumulative => Self::Cumulative,
            TemporalityPreference::Delta => Self::Delta,
            TemporalityPreference::LowMemory => Self::LowMemory,
        }
    }
}
/// Default OTLP/HTTP endpoint for traces. Passed to `configure_http_exporter`,
/// which strips the `http://localhost:4318` prefix to obtain the signal path
/// (`/v1/traces`).
const DEFAULT_OTLP_HTTP_TRACES_ENDPOINT: &str = "http://localhost:4318/v1/traces";
/// Default OTLP/HTTP endpoint for logs. Passed to `configure_http_exporter`,
/// which strips the `http://localhost:4318` prefix to obtain the signal path
/// (`/v1/logs`).
const DEFAULT_OTLP_HTTP_LOGS_ENDPOINT: &str = "http://localhost:4318/v1/logs";
/// Default OTLP/HTTP endpoint for metrics. Passed to `configure_http_exporter`,
/// which strips the `http://localhost:4318` prefix to obtain the signal path
/// (`/v1/metrics`).
const DEFAULT_OTLP_HTTP_METRICS_ENDPOINT: &str = "http://localhost:4318/v1/metrics";
fn build_final_headers(headers: &[Header], headers_list: &HeadersList) -> HeaderMap {
let mut map = HashMap::new();
for h in headers_list.iter().chain(headers.iter().cloned()) {
map.insert(h.name, h.value);
}
map
}
/// Converts merged exporter headers into a tonic [`MetadataMap`].
///
/// Only entries whose name parses as an ASCII metadata key and whose value is
/// a string that parses as an ASCII metadata value are kept; anything else is
/// skipped without reporting an error.
///
/// [`MetadataMap`]: tonic::metadata::MetadataMap
fn into_grpc_metadata(headers: HeaderMap) -> tonic::metadata::MetadataMap {
    use tonic::metadata::{Ascii, MetadataKey, MetadataMap, MetadataValue};

    let mut metadata = MetadataMap::new();
    for (name, value) in headers {
        // Header names that are not valid gRPC metadata keys are dropped.
        let Ok(key) = name.to_string().parse::<MetadataKey<Ascii>>() else {
            continue;
        };
        // Values must be readable as a string and valid ASCII metadata.
        let Some(val) = value
            .unredact()
            .to_str()
            .ok()
            .and_then(|text| text.parse::<MetadataValue<Ascii>>().ok())
        else {
            continue;
        };
        metadata.insert(key, val);
    }
    metadata
}
/// Applies the endpoint and headers from `config` to an OTLP/HTTP exporter
/// builder and returns the configured builder.
///
/// # Endpoint resolution
///
/// * If the configured endpoint path already ends with one of the OTLP signal
///   paths (`/v1/traces`, `/v1/logs`, `/v1/metrics`) it is used unchanged.
/// * Otherwise the signal path taken from `default_endpoint` (the part after
///   `http://localhost:4318`) is appended to the configured endpoint. Trailing
///   slashes on the configured endpoint are removed first so that a base such
///   as `http://localhost:4318/` becomes `http://localhost:4318/v1/traces`
///   rather than `http://localhost:4318//v1/traces`.
///
/// # Headers
///
/// Headers are merged with `build_final_headers`. Values that cannot be
/// represented as a string are skipped, and `with_headers` is only called when
/// at least one header remains.
fn configure_http_exporter<T>(
    mut builder: T,
    config: &OtlpHttpExporterConfig,
    default_endpoint: &str,
) -> T
where
    T: opentelemetry_otlp::WithHttpConfig + opentelemetry_otlp::WithExportConfig,
{
    let path = config.endpoint.path();
    let has_signal_path = ["/v1/traces", "/v1/logs", "/v1/metrics"]
        .iter()
        .any(|suffix| path.ends_with(*suffix));

    let endpoint = if has_signal_path {
        config.endpoint.as_str().to_string()
    } else {
        let signal_path = default_endpoint
            .strip_prefix("http://localhost:4318")
            .unwrap_or("");
        // A base URL is commonly rendered with a trailing `/` (an empty path
        // is normalised to `/`); strip it so joining with the signal path,
        // which starts with `/`, does not yield `//v1/...`.
        // NOTE(review): an endpoint carrying a query string or fragment would
        // still get the signal path appended after it — confirm whether such
        // endpoints need to be supported.
        let base = config.endpoint.to_string();
        format!("{}{}", base.trim_end_matches('/'), signal_path)
    };
    builder = builder.with_endpoint(endpoint);

    let headers: HashMap<String, String> =
        build_final_headers(&config.headers, &config.headers_list)
            .into_iter()
            .filter_map(|(name, value)| {
                // Header values that are not valid strings are dropped.
                value
                    .unredact()
                    .to_str()
                    .ok()
                    .map(|v| (name.to_string(), v.to_string()))
            })
            .collect();
    if !headers.is_empty() {
        builder = builder.with_headers(headers);
    }
    builder
}
/// Applies the endpoint and headers from `config` to an OTLP/gRPC exporter
/// builder and returns the configured builder.
///
/// The merged headers are converted to gRPC metadata with
/// `into_grpc_metadata`; `with_metadata` is only called when the resulting
/// metadata map is non-empty.
fn configure_grpc_exporter<T>(builder: T, config: &OtlpGrpcExporterConfig) -> T
where
    T: opentelemetry_otlp::WithTonicConfig + opentelemetry_otlp::WithExportConfig,
{
    let with_endpoint = builder.with_endpoint(config.endpoint.as_str());

    let merged_headers = build_final_headers(&config.headers, &config.headers_list);
    let metadata = into_grpc_metadata(merged_headers);

    if metadata.is_empty() {
        with_endpoint
    } else {
        with_endpoint.with_metadata(metadata)
    }
}
/// Builds an OTLP/HTTP span exporter from `config`.
///
/// # Errors
///
/// Returns [`InitError::Exporter`] tagged with [`ExporterKind::OtlpHttp`],
/// carrying the stringified builder error, when the exporter cannot be built.
pub(crate) fn build_otlp_span_exporter(
    config: &OtlpHttpExporterConfig,
) -> Result<opentelemetry_otlp::SpanExporter, InitError> {
    let http_builder = opentelemetry_otlp::SpanExporter::builder().with_http();
    configure_http_exporter(http_builder, config, DEFAULT_OTLP_HTTP_TRACES_ENDPOINT)
        .build()
        .map_err(|err| InitError::Exporter {
            exporter: ExporterKind::OtlpHttp,
            reason: err.to_string(),
        })
}
/// Builds an OTLP/HTTP log exporter from `config`.
///
/// # Errors
///
/// Returns [`InitError::Exporter`] tagged with [`ExporterKind::OtlpHttp`],
/// carrying the stringified builder error, when the exporter cannot be built.
pub(crate) fn build_otlp_log_exporter(
    config: &OtlpHttpExporterConfig,
) -> Result<opentelemetry_otlp::LogExporter, InitError> {
    let http_builder = opentelemetry_otlp::LogExporter::builder().with_http();
    configure_http_exporter(http_builder, config, DEFAULT_OTLP_HTTP_LOGS_ENDPOINT)
        .build()
        .map_err(|err| InitError::Exporter {
            exporter: ExporterKind::OtlpHttp,
            reason: err.to_string(),
        })
}
/// Builds an OTLP/HTTP metric exporter from `config`, using the temporality
/// derived from `config.temporality_preference`.
///
/// # Errors
///
/// Returns [`InitError::Exporter`] tagged with [`ExporterKind::OtlpHttp`],
/// carrying the stringified builder error, when the exporter cannot be built.
pub(crate) fn build_otlp_metric_exporter(
    config: &OtlpHttpExporterConfig,
) -> Result<opentelemetry_otlp::MetricExporter, InitError> {
    let http_builder = opentelemetry_otlp::MetricExporter::builder().with_http();
    configure_http_exporter(http_builder, config, DEFAULT_OTLP_HTTP_METRICS_ENDPOINT)
        .with_temporality(config.temporality_preference.into())
        .build()
        .map_err(|err| InitError::Exporter {
            exporter: ExporterKind::OtlpHttp,
            reason: err.to_string(),
        })
}
/// Builds an OTLP/gRPC span exporter from `config`.
///
/// # Errors
///
/// Returns [`InitError::Exporter`] tagged with [`ExporterKind::OtlpGrpc`],
/// carrying the stringified builder error, when the exporter cannot be built.
pub(crate) fn build_otlp_grpc_span_exporter(
    config: &OtlpGrpcExporterConfig,
) -> Result<opentelemetry_otlp::SpanExporter, InitError> {
    let grpc_builder = opentelemetry_otlp::SpanExporter::builder().with_tonic();
    configure_grpc_exporter(grpc_builder, config)
        .build()
        .map_err(|err| InitError::Exporter {
            exporter: ExporterKind::OtlpGrpc,
            reason: err.to_string(),
        })
}
/// Builds an OTLP/gRPC log exporter from `config`.
///
/// # Errors
///
/// Returns [`InitError::Exporter`] tagged with [`ExporterKind::OtlpGrpc`],
/// carrying the stringified builder error, when the exporter cannot be built.
pub(crate) fn build_otlp_grpc_log_exporter(
    config: &OtlpGrpcExporterConfig,
) -> Result<opentelemetry_otlp::LogExporter, InitError> {
    let grpc_builder = opentelemetry_otlp::LogExporter::builder().with_tonic();
    configure_grpc_exporter(grpc_builder, config)
        .build()
        .map_err(|err| InitError::Exporter {
            exporter: ExporterKind::OtlpGrpc,
            reason: err.to_string(),
        })
}
/// Builds an OTLP/gRPC metric exporter from `config`, using the temporality
/// derived from `config.temporality_preference`.
///
/// # Errors
///
/// Returns [`InitError::Exporter`] tagged with [`ExporterKind::OtlpGrpc`],
/// carrying the stringified builder error, when the exporter cannot be built.
pub(crate) fn build_otlp_grpc_metric_exporter(
    config: &OtlpGrpcExporterConfig,
) -> Result<opentelemetry_otlp::MetricExporter, InitError> {
    let grpc_builder = opentelemetry_otlp::MetricExporter::builder().with_tonic();
    configure_grpc_exporter(grpc_builder, config)
        .with_temporality(config.temporality_preference.into())
        .build()
        .map_err(|err| InitError::Exporter {
            exporter: ExporterKind::OtlpGrpc,
            reason: err.to_string(),
        })
}
// Unit tests for the OTLP exporter builders, header merging, gRPC metadata
// conversion and the temporality conversion defined in this file.
#[cfg(test)]
mod tests {
use apollo_configuration::parse_yaml;
use super::*;
use crate::config::OpenTelemetryConfig;
// Parses a config with a batch span processor using an `otlp_http` exporter
// and checks that the HTTP span exporter builds successfully.
#[test]
fn build_http_span_exporter() {
let config: OpenTelemetryConfig = parse_yaml(
indoc::indoc! {"
tracer_provider:
processors:
- batch:
exporter:
otlp_http:
endpoint: http://localhost:4318
"},
&Default::default(),
)
.unwrap();
let crate::config::traces::SpanProcessor::Batch(batch) =
&config.tracer_provider.processors[0]
else {
panic!("Expected Batch processor");
};
let crate::config::SpanExporter::OtlpHttp(otlp_config) = &batch.exporter else {
panic!("Expected OtlpHttp exporter");
};
assert!(build_otlp_span_exporter(otlp_config).is_ok());
}
// Parses a config with a batch log processor using an `otlp_http` exporter
// and checks that the HTTP log exporter builds successfully.
#[test]
fn build_http_log_exporter() {
let config: OpenTelemetryConfig = parse_yaml(
indoc::indoc! {"
logger_provider:
processors:
- batch:
exporter:
otlp_http:
endpoint: http://localhost:4318
"},
&Default::default(),
)
.unwrap();
let crate::config::logs::LogRecordProcessor::Batch(batch) =
&config.logger_provider.processors[0]
else {
panic!("Expected Batch processor");
};
let crate::config::LogExporter::OtlpHttp(otlp_config) = &batch.exporter else {
panic!("Expected OtlpHttp exporter");
};
assert!(build_otlp_log_exporter(otlp_config).is_ok());
}
// Parses a config with a periodic metric reader using an `otlp_http` exporter
// and checks that the HTTP metric exporter builds successfully.
#[test]
fn build_http_metric_exporter() {
let config: OpenTelemetryConfig = parse_yaml(
indoc::indoc! {"
meter_provider:
readers:
- periodic:
exporter:
otlp_http:
endpoint: http://localhost:4318
"},
&Default::default(),
)
.unwrap();
let crate::config::metrics::MetricReader::Periodic(periodic) =
&config.meter_provider.readers[0]
else {
panic!("Expected Periodic reader");
};
let crate::config::MetricExporter::OtlpHttp(otlp_config) = &periodic.exporter else {
panic!("Expected OtlpHttp exporter");
};
assert!(build_otlp_metric_exporter(otlp_config).is_ok());
}
// With a default (empty) headers list, both explicitly configured headers
// must appear in the merged map with their original values.
#[test]
fn test_build_final_headers() {
let key_api = HeaderName::from("x-api-key".parse::<http::HeaderName>().unwrap());
let key_tenant = HeaderName::from("x-tenant".parse::<http::HeaderName>().unwrap());
let headers = vec![
crate::config::exporters::Header {
name: key_api.clone(),
value: apollo_redaction::Redacted::new(HeaderValue::from(
http::HeaderValue::from_static("secret"),
)),
},
crate::config::exporters::Header {
name: key_tenant.clone(),
value: apollo_redaction::Redacted::new(HeaderValue::from(
http::HeaderValue::from_static("tenant-1"),
)),
},
];
let result = build_final_headers(&headers, &HeadersList::default());
assert_eq!(result.len(), 2);
assert_eq!(result[&key_api].unredact().to_str().unwrap(), "secret");
assert_eq!(result[&key_tenant].unredact().to_str().unwrap(), "tenant-1");
}
// When the same name is present in both sources, the value from `headers`
// replaces the one from the headers list; list-only names are preserved.
#[test]
fn test_build_final_headers_list_merges_and_headers_wins() {
let key_api = HeaderName::from("x-api-key".parse::<http::HeaderName>().unwrap());
let key_from_list = HeaderName::from("x-from-list".parse::<http::HeaderName>().unwrap());
let list = HeadersList::new("x-api-key=from-list,x-from-list=only-list");
let headers = vec![crate::config::exporters::Header {
name: key_api.clone(),
value: apollo_redaction::Redacted::new(HeaderValue::from(
http::HeaderValue::from_static("from-headers"),
)),
}];
let result = build_final_headers(&headers, &list);
assert_eq!(result.len(), 2);
assert_eq!(
result[&key_api].unredact().to_str().unwrap(),
"from-headers"
);
assert_eq!(
result[&key_from_list].unredact().to_str().unwrap(),
"only-list"
);
}
// A single valid header must produce non-empty gRPC metadata.
#[test]
fn test_build_grpc_metadata() {
let headers = vec![crate::config::exporters::Header {
name: HeaderName::from("x-api-key".parse::<http::HeaderName>().unwrap()),
value: apollo_redaction::Redacted::new(HeaderValue::from(
http::HeaderValue::from_static("secret"),
)),
}];
let result = into_grpc_metadata(build_final_headers(&headers, &HeadersList::default()));
assert!(!result.is_empty());
}
// The same precedence rule (explicit `headers` over the headers list) must
// hold after conversion to gRPC metadata.
#[test]
fn test_build_grpc_metadata_list_merges_and_headers_wins() {
let list = HeadersList::new("x-api-key=from-list,x-from-list=only-list");
let headers = vec![crate::config::exporters::Header {
name: HeaderName::from("x-api-key".parse::<http::HeaderName>().unwrap()),
value: apollo_redaction::Redacted::new(HeaderValue::from(
http::HeaderValue::from_static("from-headers"),
)),
}];
let result = into_grpc_metadata(build_final_headers(&headers, &list));
assert_eq!(
result.get("x-api-key").unwrap().to_str().unwrap(),
"from-headers"
);
assert_eq!(
result.get("x-from-list").unwrap().to_str().unwrap(),
"only-list"
);
}
// Same as `build_http_metric_exporter`, but with `temporality_preference`
// explicitly set to `delta` in the exporter config.
#[test]
fn build_http_metric_exporter_with_temporality_delta() {
let config: OpenTelemetryConfig = parse_yaml(
indoc::indoc! {"
meter_provider:
readers:
- periodic:
exporter:
otlp_http:
endpoint: http://localhost:4318
temporality_preference: delta
"},
&Default::default(),
)
.unwrap();
let crate::config::metrics::MetricReader::Periodic(periodic) =
&config.meter_provider.readers[0]
else {
panic!("Expected Periodic reader");
};
let crate::config::MetricExporter::OtlpHttp(otlp_config) = &periodic.exporter else {
panic!("Expected OtlpHttp exporter");
};
assert!(build_otlp_metric_exporter(otlp_config).is_ok());
}
// Each `TemporalityPreference` variant must convert to the SDK `Temporality`
// variant of the same name.
#[test]
fn temporality_preference_conversion() {
use crate::config::exporters::TemporalityPreference;
use opentelemetry_sdk::metrics::Temporality;
assert_eq!(
Temporality::from(TemporalityPreference::Cumulative),
Temporality::Cumulative
);
assert_eq!(
Temporality::from(TemporalityPreference::Delta),
Temporality::Delta
);
assert_eq!(
Temporality::from(TemporalityPreference::LowMemory),
Temporality::LowMemory
);
}
}