use std::collections::BTreeMap;
use std::collections::HashSet;
use derivative::Derivative;
use http::HeaderName;
use num_traits::ToPrimitive;
use opentelemetry::Array;
use opentelemetry::Value;
use opentelemetry_sdk::metrics::Aggregation;
use opentelemetry_sdk::metrics::Instrument;
use opentelemetry_sdk::metrics::Stream;
use opentelemetry_sdk::trace::SpanLimits;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use super::*;
use crate::Configuration;
use crate::plugin::serde::deserialize_option_header_name;
use crate::plugins::telemetry::apollo::Config as ApolloTelemetryConfig;
use crate::plugins::telemetry::metrics;
use crate::plugins::telemetry::resource::ConfigResource;
use crate::plugins::telemetry::tracing::datadog::DatadogAgentSampling;
/// Errors raised while reconciling telemetry configuration values.
#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
    /// Returned by `Conf::calculate_field_level_instrumentation_ratio` when
    /// the Apollo field-level instrumentation sampler would sample more often
    /// than the tracing-level sampler it is nested under.
    #[error(
        "field level instrumentation sampler must sample less frequently than tracing level sampler"
    )]
    InvalidFieldLevelInstrumentationSampler,
}
/// Conditionally applies a transformation to a builder-style value.
///
/// `with` threads `self` through `apply` only when `option` holds a value,
/// which keeps long builder chains free of interleaved `if let` blocks.
pub(in crate::plugins::telemetry) trait GenericWith<T>
where
    Self: Sized,
{
    fn with<B>(self, option: &Option<B>, apply: fn(Self, &B) -> Self) -> Self {
        match option {
            Some(value) => apply(self, value),
            None => self,
        }
    }
}

// Blanket implementation: every sized type gets `with` for free.
impl<T> GenericWith<T> for T where Self: Sized {}
// Root configuration for the `telemetry` plugin.
//
// `deny_unknown_fields` rejects typos at deserialization time while `default`
// keeps every section optional in the user's YAML.
// (Plain `//` comments are used on purpose: `///` doc comments would be
// picked up by schemars as schema descriptions.)
#[derive(Clone, Default, Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
#[schemars(rename = "TelemetryConfig")]
pub(crate) struct Conf {
    // Apollo-specific telemetry configuration (reporting, samplers, ...).
    pub(crate) apollo: apollo::Config,
    // Where telemetry is sent: logging, metrics and tracing exporters.
    pub(crate) exporters: Exporters,
    // What telemetry is produced: events, spans and instruments.
    pub(crate) instrumentation: Instrumentation,
}
// Exporter configuration, grouped by telemetry signal.
#[derive(Clone, Default, Debug, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct Exporters {
    // Log exporter configuration.
    pub(crate) logging: config_new::logging::Logging,
    // Metric exporter configuration (common settings, OTLP, Prometheus).
    pub(crate) metrics: Metrics,
    // Trace exporter configuration (common settings, OTLP, Zipkin, Datadog).
    pub(crate) tracing: Tracing,
}
// Instrumentation configuration: which events, spans and instruments the
// router produces.
#[derive(Clone, Default, Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct Instrumentation {
    // Custom event configuration.
    pub(crate) events: config_new::events::Events,
    // Span configuration.
    pub(crate) spans: config_new::spans::Spans,
    // Custom instrument (metric) configuration.
    pub(crate) instruments: config_new::instruments::InstrumentsConfig,
}
impl Instrumentation {
    /// Validate every instrumentation section, returning the first error
    /// message encountered (events, then instruments, then spans).
    pub(crate) fn validate(&self) -> Result<(), String> {
        self.events.validate()?;
        self.instruments.validate()?;
        self.spans.validate()?;
        Ok(())
    }
}
// Metrics exporter configuration.
#[derive(Clone, Default, Debug, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct Metrics {
    // Settings shared by all metrics exporters (resource, buckets, views).
    pub(crate) common: MetricsCommon,
    // OTLP metrics exporter.
    pub(crate) otlp: otlp::Config,
    // Prometheus metrics endpoint.
    pub(crate) prometheus: metrics::prometheus::Config,
}
// Settings shared by every metrics exporter.
#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct MetricsCommon {
    // Overrides the default service name attached to metrics.
    pub(crate) service_name: Option<String>,
    // Overrides the default service namespace attached to metrics.
    pub(crate) service_namespace: Option<String>,
    // Extra resource attributes attached to all metrics.
    pub(crate) resource: BTreeMap<String, AttributeValue>,
    // Default histogram bucket boundaries (see `Default` impl below).
    pub(crate) buckets: Vec<f64>,
    // Per-metric view overrides (rename, buckets, drop, attribute filter).
    pub(crate) views: Vec<MetricView>,
}
impl Default for MetricsCommon {
fn default() -> Self {
Self {
service_name: None,
service_namespace: None,
resource: BTreeMap::new(),
views: Vec::with_capacity(0),
buckets: vec![
0.001, 0.005, 0.015, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 1.0, 5.0, 10.0,
],
}
}
}
// A user-configurable OpenTelemetry metric view: matches one instrument by
// name and overrides how it is exported. `name` is required; everything else
// is optional and only applied when set (see `into_stream`).
#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields)]
pub(crate) struct MetricView {
    // Exact name of the instrument this view applies to.
    pub(crate) name: String,
    // New name under which the metric is exported.
    pub(crate) rename: Option<String>,
    // Replacement description.
    pub(crate) description: Option<String>,
    // Replacement unit.
    pub(crate) unit: Option<String>,
    // Replacement aggregation (custom histogram buckets, or drop entirely).
    pub(crate) aggregation: Option<MetricAggregation>,
    // If set, only these attribute keys are kept on the exported metric.
    pub(crate) allowed_attribute_keys: Option<HashSet<String>>,
}
impl MetricView {
    /// Build the router's built-in histogram view for `name` with the given
    /// bucket `boundaries`. All other fields are left unset so a
    /// user-supplied view can override them via [`MetricView::merge`].
    pub(crate) fn default_histogram(name: String, boundaries: Vec<f64>) -> Self {
        Self {
            name,
            rename: None,
            description: None,
            unit: None,
            aggregation: Some(MetricAggregation::Histogram {
                buckets: boundaries,
            }),
            allowed_attribute_keys: None,
        }
    }

    /// Merge a user-supplied view over this (default) view.
    ///
    /// For every optional field the user's setting wins; fields the user
    /// left unset fall back to this view's values. The name is taken from
    /// `self` (both views are expected to target the same instrument).
    pub(crate) fn merge(self, user: Self) -> Self {
        Self {
            name: self.name,
            rename: user.rename.or(self.rename),
            description: user.description.or(self.description),
            unit: user.unit.or(self.unit),
            aggregation: user.aggregation.or(self.aggregation),
            allowed_attribute_keys: user.allowed_attribute_keys.or(self.allowed_attribute_keys),
        }
    }

    /// Convert into an OpenTelemetry [`Stream`], applying only the fields
    /// that were set.
    ///
    /// # Panics
    /// Panics if the SDK rejects the resulting stream configuration; view
    /// configuration is validated at startup, so this is treated as a bug.
    pub(crate) fn into_stream(self) -> Stream {
        let mut stream = Stream::builder();
        if let Some(new_name) = self.rename {
            stream = stream.with_name(new_name);
        }
        if let Some(desc) = self.description {
            stream = stream.with_description(desc);
        }
        if let Some(u) = self.unit {
            stream = stream.with_unit(u);
        }
        if let Some(agg) = self.aggregation {
            let aggregation = match agg {
                MetricAggregation::Histogram { buckets } => Aggregation::ExplicitBucketHistogram {
                    boundaries: buckets,
                    record_min_max: true,
                },
                MetricAggregation::Drop => Aggregation::Drop,
            };
            stream = stream.with_aggregation(aggregation);
        }
        if let Some(keys) = self.allowed_attribute_keys {
            stream = stream.with_allowed_attribute_keys(keys.into_iter().map(Key::new));
        }
        stream.build().expect("Failed to build metric view")
    }

    /// Turn this view into a view function for the meter provider: it
    /// matches instruments by exact name and yields the configured stream.
    pub(crate) fn into_view_fn(
        self,
    ) -> impl Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static {
        // The closure owns `self`, so the name can be read through it
        // directly — no separate clone of the name is needed.
        move |instrument: &Instrument| {
            (instrument.name() == self.name).then(|| self.clone().into_stream())
        }
    }
}
// How a metric view aggregates measurements.
#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub(crate) enum MetricAggregation {
    // Explicit-bucket histogram with the given boundaries.
    Histogram { buckets: Vec<f64> },
    // Suppress the metric entirely: it is not exported.
    Drop,
}
// Tracing exporter configuration.
#[derive(Clone, Default, Debug, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct Tracing {
    // Optionally expose the trace id on responses (experimental).
    #[serde(default, rename = "experimental_response_trace_id")]
    pub(crate) response_trace_id: ExposeTraceId,
    // Which trace-context propagation formats are accepted/emitted.
    pub(crate) propagation: Propagation,
    // Settings shared by all tracing exporters (sampler, limits, resource).
    pub(crate) common: TracingCommon,
    // OTLP trace exporter.
    pub(crate) otlp: otlp::Config,
    // Zipkin trace exporter.
    pub(crate) zipkin: tracing::zipkin::Config,
    // Datadog trace exporter.
    pub(crate) datadog: tracing::datadog::Config,
}
impl Tracing {
    /// Baggage propagation is only on when explicitly enabled.
    pub(crate) fn is_baggage_propagation_enabled(&self) -> bool {
        self.propagation.baggage
    }

    /// W3C trace-context propagation is on when explicitly enabled, or
    /// implicitly when the OTLP trace exporter is active.
    pub(crate) fn is_trace_context_propagation_enabled(&self) -> bool {
        self.otlp.enabled || self.propagation.trace_context
    }

    /// Jaeger propagation is only on when explicitly enabled.
    pub(crate) fn is_jaeger_propagation_enabled(&self) -> bool {
        self.propagation.jaeger
    }

    /// Datadog propagation is on when explicitly enabled, or implicitly
    /// when the Datadog trace exporter is active.
    pub(crate) fn is_datadog_propagation_enabled(&self) -> bool {
        self.datadog.enabled || self.propagation.datadog == Some(true)
    }

    /// Zipkin propagation is on when explicitly enabled, or implicitly when
    /// the Zipkin trace exporter is active.
    pub(crate) fn is_zipkin_propagation_enabled(&self) -> bool {
        self.zipkin.enabled || self.propagation.zipkin
    }

    /// AWS X-Ray propagation is only on when explicitly enabled.
    pub(crate) fn is_aws_xray_propagation_enabled(&self) -> bool {
        self.propagation.aws_xray
    }
}
// Configuration for returning the trace id to the client on responses.
#[derive(Clone, Default, Debug, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct ExposeTraceId {
    // Whether the trace id is exposed at all.
    pub(crate) enabled: bool,
    // Response header carrying the trace id; None falls back to the
    // caller's default header.
    #[schemars(with = "Option<String>")]
    #[serde(deserialize_with = "deserialize_option_header_name")]
    pub(crate) header_name: Option<HeaderName>,
    // Textual format of the trace id (hex by default).
    pub(crate) format: TraceIdFormat,
}
// Supported textual representations of a trace id (see `format` below).
#[derive(Clone, Default, Debug, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub(crate) enum TraceIdFormat {
    // 32 lowercase hex characters (the default).
    #[default]
    Hexadecimal,
    // Alias for `Hexadecimal`: formats identically.
    OpenTelemetry,
    // Decimal rendering of the full 128-bit value.
    Decimal,
    // Datadog's native trace-id representation.
    Datadog,
    // UUID rendering of the 16 raw bytes.
    Uuid,
}
impl TraceIdFormat {
    /// Render `trace_id` in this format.
    pub(crate) fn format(&self, trace_id: TraceId) -> String {
        match self {
            // Hexadecimal and OpenTelemetry are aliases: 32 lowercase hex
            // characters, zero-padded.
            Self::Hexadecimal | Self::OpenTelemetry => format!("{:032x}", trace_id),
            // Full 128-bit value rendered in decimal.
            Self::Decimal => u128::from_be_bytes(trace_id.to_bytes()).to_string(),
            Self::Datadog => trace_id.to_datadog(),
            Self::Uuid => Uuid::from_bytes(trace_id.to_bytes()).to_string(),
        }
    }
}
// Which algorithm is used to normalize operation signatures sent to Apollo.
#[derive(Clone, PartialEq, Eq, Default, Derivative, Serialize, Deserialize, JsonSchema)]
#[derivative(Debug)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub(crate) enum ApolloSignatureNormalizationAlgorithm {
    // Original normalization behavior.
    Legacy,
    // Enhanced normalization (the default).
    #[default]
    Enhanced,
}
// How field/type references are reported in Apollo metrics.
#[derive(Clone, Default, Debug, Deserialize, JsonSchema, Copy, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub(crate) enum ApolloMetricsReferenceMode {
    // Extended reference reporting (the default).
    #[default]
    Extended,
    // Standard reference reporting.
    Standard,
}
// Which trace-context propagation formats the router honors. Each flag is
// opt-in; some exporters also enable their matching format implicitly (see
// the `is_*_propagation_enabled` methods on `Tracing`).
#[derive(Clone, Default, Debug, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct Propagation {
    // Extract the trace id from a custom request header.
    pub(crate) request: RequestPropagation,
    // W3C baggage propagation.
    pub(crate) baggage: bool,
    // W3C trace-context propagation.
    pub(crate) trace_context: bool,
    // Jaeger header propagation.
    pub(crate) jaeger: bool,
    // Datadog header propagation; Option so "unset" can be distinguished
    // from an explicit `false`.
    pub(crate) datadog: Option<bool>,
    // Zipkin B3 propagation.
    pub(crate) zipkin: bool,
    // AWS X-Ray propagation.
    pub(crate) aws_xray: bool,
}
// Configuration for taking the trace id from an incoming request header.
#[derive(Clone, Debug, Deserialize, JsonSchema, Default, PartialEq)]
#[serde(deny_unknown_fields)]
pub(crate) struct RequestPropagation {
    // Request header that carries the trace id.
    #[schemars(with = "String")]
    #[serde(deserialize_with = "deserialize_option_header_name")]
    pub(crate) header_name: Option<HeaderName>,
    // Expected format of the trace id in that header.
    #[serde(default)]
    pub(crate) format: TraceIdFormat,
}
// Settings shared by every tracing exporter: sampler, span limits and the
// resource attached to spans. Defaults mirror the OpenTelemetry SDK's
// `SpanLimits` (see the `default_max_*` helpers below).
#[derive(Debug, Clone, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, default)]
#[non_exhaustive]
pub(crate) struct TracingCommon {
    // Overrides the default service name attached to spans.
    pub(crate) service_name: Option<String>,
    // Overrides the default service namespace attached to spans.
    pub(crate) service_namespace: Option<String>,
    // Sampler: always on/off, or a trace-id ratio.
    pub(crate) sampler: SamplerOption,
    // Preview: delegate sampling to the Datadog agent (see
    // `configure_tracer_provider_builder`).
    pub(crate) preview_datadog_agent_sampling: Option<bool>,
    // Honor the sampling decision of the incoming trace context.
    pub(crate) parent_based_sampler: bool,
    // Span limits, passed straight to the SDK's `SpanLimits`.
    pub(crate) max_events_per_span: u32,
    pub(crate) max_attributes_per_span: u32,
    pub(crate) max_links_per_span: u32,
    pub(crate) max_attributes_per_event: u32,
    pub(crate) max_attributes_per_link: u32,
    // Extra resource attributes attached to all spans.
    pub(crate) resource: BTreeMap<String, AttributeValue>,
}
/// Accessors used by the shared `ConfigResource` trait to read the tracing
/// service identity and resource attributes from this config section.
impl ConfigResource for TracingCommon {
    fn service_name(&self) -> &Option<String> {
        &self.service_name
    }
    fn service_namespace(&self) -> &Option<String> {
        &self.service_namespace
    }
    fn resource(&self) -> &BTreeMap<String, AttributeValue> {
        &self.resource
    }
}
/// Accessors used by the shared `ConfigResource` trait to read the metrics
/// service identity and resource attributes from this config section.
impl ConfigResource for MetricsCommon {
    fn service_name(&self) -> &Option<String> {
        &self.service_name
    }
    fn service_namespace(&self) -> &Option<String> {
        &self.service_namespace
    }
    fn resource(&self) -> &BTreeMap<String, AttributeValue> {
        &self.resource
    }
}
/// Parent-based sampling defaults to on: honor the sampling decision of the
/// incoming trace context.
fn default_parent_based_sampler() -> bool {
    true
}

/// By default every trace is sampled.
fn default_sampler() -> SamplerOption {
    SamplerOption::Always(Sampler::AlwaysOn)
}
impl Default for TracingCommon {
fn default() -> Self {
Self {
service_name: Default::default(),
service_namespace: Default::default(),
sampler: default_sampler(),
preview_datadog_agent_sampling: None,
parent_based_sampler: default_parent_based_sampler(),
max_events_per_span: default_max_events_per_span(),
max_attributes_per_span: default_max_attributes_per_span(),
max_links_per_span: default_max_links_per_span(),
max_attributes_per_event: default_max_attributes_per_event(),
max_attributes_per_link: default_max_attributes_per_link(),
resource: Default::default(),
}
}
}
// The per-span limit defaults below mirror the OpenTelemetry SDK's own
// `SpanLimits` defaults so router and SDK stay in sync automatically.
fn default_max_events_per_span() -> u32 {
    SpanLimits::default().max_events_per_span
}
fn default_max_attributes_per_span() -> u32 {
    SpanLimits::default().max_attributes_per_span
}
fn default_max_links_per_span() -> u32 {
    SpanLimits::default().max_links_per_span
}
fn default_max_attributes_per_event() -> u32 {
    SpanLimits::default().max_attributes_per_event
}
fn default_max_attributes_per_link() -> u32 {
    SpanLimits::default().max_attributes_per_link
}
// A telemetry attribute value, deserialized untagged from YAML/JSON scalars
// or homogeneous arrays. Mirrors `opentelemetry::Value` (see the `From`
// impl further down).
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(untagged, deny_unknown_fields)]
#[allow(missing_docs)]
pub enum AttributeValue {
    Bool(bool),
    I64(i64),
    F64(f64),
    String(String),
    #[allow(private_interfaces)]
    Array(AttributeArray),
}
impl AttributeValue {
    /// Best-effort numeric coercion to `f64`.
    ///
    /// Integers are widened (note: `i64 as f64` loses precision above 2^53),
    /// strings are parsed, and booleans/arrays yield `None`.
    pub(crate) fn as_f64(&self) -> Option<f64> {
        match self {
            AttributeValue::F64(value) => Some(*value),
            AttributeValue::I64(value) => Some(*value as f64),
            AttributeValue::String(value) => value.parse().ok(),
            AttributeValue::Bool(_) | AttributeValue::Array(_) => None,
        }
    }
}
impl From<String> for AttributeValue {
fn from(value: String) -> Self {
AttributeValue::String(value)
}
}
impl From<&'static str> for AttributeValue {
fn from(value: &'static str) -> Self {
AttributeValue::String(value.to_string())
}
}
impl From<bool> for AttributeValue {
fn from(value: bool) -> Self {
AttributeValue::Bool(value)
}
}
impl From<f64> for AttributeValue {
fn from(value: f64) -> Self {
AttributeValue::F64(value)
}
}
impl From<i64> for AttributeValue {
fn from(value: i64) -> Self {
AttributeValue::I64(value)
}
}
impl std::fmt::Display for AttributeValue {
    /// Render the wrapped value with its own `Display`/`Debug` formatting.
    // `write!(f, "{inner}")` is kept (rather than delegating to
    // `Display::fmt(inner, f)`) so outer width/precision flags are NOT
    // forwarded, matching the original behavior.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Bool(inner) => write!(f, "{inner}"),
            Self::I64(inner) => write!(f, "{inner}"),
            Self::F64(inner) => write!(f, "{inner}"),
            Self::String(inner) => write!(f, "{inner}"),
            Self::Array(inner) => write!(f, "{inner}"),
        }
    }
}
impl PartialOrd for AttributeValue {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
match (self, other) {
(AttributeValue::Bool(b1), AttributeValue::Bool(b2)) => b1.partial_cmp(b2),
(AttributeValue::F64(f1), AttributeValue::F64(f2)) => f1.partial_cmp(f2),
(AttributeValue::I64(i1), AttributeValue::I64(i2)) => i1.partial_cmp(i2),
(AttributeValue::String(s1), AttributeValue::String(s2)) => s1.partial_cmp(s2),
(AttributeValue::F64(f1), AttributeValue::I64(i)) => {
i.to_f64().as_ref().and_then(|f2| f1.partial_cmp(f2))
}
(AttributeValue::I64(i), AttributeValue::F64(f)) => i.to_f64()?.partial_cmp(f),
_ => None,
}
}
}
impl TryFrom<serde_json::Value> for AttributeValue {
    type Error = ();

    /// Convert a JSON value into an attribute value.
    ///
    /// Scalars map directly. Arrays convert only when every element has the
    /// same JSON type (bool, f64, i64 or string); mixed arrays, `null` and
    /// objects are rejected with `Err(())`.
    fn try_from(value: serde_json::Value) -> Result<Self, Self::Error> {
        match value {
            serde_json::Value::Null => Err(()),
            serde_json::Value::Bool(v) => Ok(AttributeValue::Bool(v)),
            serde_json::Value::Number(v) if v.is_i64() => {
                Ok(AttributeValue::I64(v.as_i64().expect("i64 checked")))
            }
            serde_json::Value::Number(v) if v.is_f64() => {
                Ok(AttributeValue::F64(v.as_f64().expect("f64 checked")))
            }
            serde_json::Value::String(v) => Ok(AttributeValue::String(v)),
            serde_json::Value::Array(v) => {
                // NOTE: `all` over an empty array is true, so `[]` converts
                // via the first branch into an empty Bool array.
                if v.iter().all(|v| v.is_boolean()) {
                    Ok(AttributeValue::Array(AttributeArray::Bool(
                        v.iter()
                            .map(|v| v.as_bool().expect("all bools checked"))
                            .collect(),
                    )))
                } else if v.iter().all(|v| v.is_f64()) {
                    Ok(AttributeValue::Array(AttributeArray::F64(
                        v.iter()
                            .map(|v| v.as_f64().expect("all f64 checked"))
                            .collect(),
                    )))
                } else if v.iter().all(|v| v.is_i64()) {
                    Ok(AttributeValue::Array(AttributeArray::I64(
                        v.iter()
                            .map(|v| v.as_i64().expect("all i64 checked"))
                            .collect(),
                    )))
                } else if v.iter().all(|v| v.is_string()) {
                    Ok(AttributeValue::Array(AttributeArray::String(
                        v.iter()
                            .map(|v| v.as_str().expect("all strings checked").to_string())
                            .collect(),
                    )))
                } else {
                    // Mixed-type array.
                    Err(())
                }
            }
            serde_json::Value::Object(_v) => Err(()),
            // Still reachable: numbers that are neither i64 nor f64 (e.g.
            // u64 values above i64::MAX) fall through the guards above.
            _ => Err(()),
        }
    }
}
impl From<AttributeValue> for opentelemetry::Value {
    /// Map each config attribute variant onto the matching OpenTelemetry
    /// value variant.
    fn from(value: AttributeValue) -> Self {
        match value {
            AttributeValue::Bool(b) => Self::Bool(b),
            AttributeValue::I64(i) => Self::I64(i),
            AttributeValue::F64(f) => Self::F64(f),
            AttributeValue::String(s) => Self::String(s.into()),
            AttributeValue::Array(a) => Self::Array(a.into()),
        }
    }
}
// A homogeneous array attribute value; mirrors `opentelemetry::Array`.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(untagged, deny_unknown_fields)]
pub(crate) enum AttributeArray {
    Bool(Vec<bool>),
    I64(Vec<i64>),
    F64(Vec<f64>),
    String(Vec<String>),
}
impl std::fmt::Display for AttributeArray {
    /// Arrays render via the inner `Vec`'s `Debug` form, e.g. `[1, 2, 3]`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Bool(values) => write!(f, "{values:?}"),
            Self::I64(values) => write!(f, "{values:?}"),
            Self::F64(values) => write!(f, "{values:?}"),
            Self::String(values) => write!(f, "{values:?}"),
        }
    }
}
impl From<AttributeArray> for opentelemetry::Array {
    /// Map each config array variant onto the matching OpenTelemetry array
    /// variant; strings are converted element-wise into OTel string values.
    fn from(array: AttributeArray) -> Self {
        match array {
            AttributeArray::Bool(values) => Self::Bool(values),
            AttributeArray::I64(values) => Self::I64(values),
            AttributeArray::F64(values) => Self::F64(values),
            AttributeArray::String(values) => {
                Self::String(values.into_iter().map(Into::into).collect())
            }
        }
    }
}
impl From<opentelemetry::Array> for AttributeArray {
    /// Inverse of the conversion above.
    fn from(array: opentelemetry::Array) -> Self {
        match array {
            opentelemetry::Array::Bool(v) => AttributeArray::Bool(v),
            opentelemetry::Array::I64(v) => AttributeArray::I64(v),
            opentelemetry::Array::F64(v) => AttributeArray::F64(v),
            opentelemetry::Array::String(v) => {
                AttributeArray::String(v.into_iter().map(|v| v.into()).collect())
            }
            // `opentelemetry::Array` is a foreign enum: the catch-all covers
            // variants this crate does not produce.
            // NOTE(review): this panics if an upstream opentelemetry upgrade
            // adds a variant — consider a fallible conversion instead.
            _ => unreachable!("unexpected opentelemetry::Array variant"),
        }
    }
}
// Sampler configuration: either a ratio in [0, 1] (untagged number) or one
// of the named always-on/off samplers.
#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, untagged)]
pub(crate) enum SamplerOption {
    // Sample this fraction of traces, keyed on the trace id.
    TraceIdRatioBased(f64),
    // Sample always or never.
    Always(Sampler),
}
// The two unconditional samplers, serialized as `always_on` / `always_off`.
#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub(crate) enum Sampler {
    AlwaysOn,
    AlwaysOff,
}
impl From<Sampler> for opentelemetry_sdk::trace::Sampler {
    /// Map the config sampler onto the equivalent SDK sampler.
    fn from(s: Sampler) -> Self {
        match s {
            Sampler::AlwaysOn => Self::AlwaysOn,
            Sampler::AlwaysOff => Self::AlwaysOff,
        }
    }
}
impl From<SamplerOption> for opentelemetry_sdk::trace::Sampler {
    /// Map the config sampler option onto the equivalent SDK sampler.
    fn from(s: SamplerOption) -> Self {
        match s {
            SamplerOption::TraceIdRatioBased(ratio) => Self::TraceIdRatioBased(ratio),
            SamplerOption::Always(sampler) => sampler.into(),
        }
    }
}
impl TracingCommon {
    /// Apply this section's span limits, resource and sampler to a tracer
    /// provider builder.
    ///
    /// The configured sampler is optionally wrapped in a parent-based
    /// sampler, and — when the Datadog agent sampling preview is enabled —
    /// in `DatadogAgentSampling`.
    pub(crate) fn configure_tracer_provider_builder(
        &self,
        builder: opentelemetry_sdk::trace::TracerProviderBuilder,
    ) -> opentelemetry_sdk::trace::TracerProviderBuilder {
        let base: opentelemetry_sdk::trace::Sampler = self.sampler.clone().into();
        let sampler = if self.parent_based_sampler {
            parent_based(base)
        } else {
            base
        };
        let limits = SpanLimits {
            max_events_per_span: self.max_events_per_span,
            max_attributes_per_span: self.max_attributes_per_span,
            max_links_per_span: self.max_links_per_span,
            max_attributes_per_event: self.max_attributes_per_event,
            max_attributes_per_link: self.max_attributes_per_link,
        };
        let builder = builder
            .with_span_limits(limits)
            .with_resource(self.to_resource());
        match self.preview_datadog_agent_sampling {
            Some(true) => builder.with_sampler(DatadogAgentSampling::new(
                sampler,
                self.parent_based_sampler,
            )),
            _ => builder.with_sampler(sampler),
        }
    }
}
/// Wrap `sampler` so the sampling decision of the parent span (incoming
/// trace context) is honored.
fn parent_based(sampler: opentelemetry_sdk::trace::Sampler) -> opentelemetry_sdk::trace::Sampler {
    opentelemetry_sdk::trace::Sampler::ParentBased(Box::new(sampler))
}
impl Conf {
    /// Deserialize the `telemetry` section of the apollo plugin
    /// configuration, if present and valid.
    ///
    /// Shared by the three accessors below, which previously triplicated
    /// this lookup-and-deserialize logic.
    fn from_configuration(configuration: &Configuration) -> Option<Self> {
        let telemetry_config = configuration.apollo_plugins.plugins.get("telemetry")?;
        serde_json::from_value::<Conf>(telemetry_config.clone()).ok()
    }

    /// Compute the ratio at which Apollo field-level instrumentation fires,
    /// relative to the tracing-level sampler.
    ///
    /// # Errors
    /// Returns [`Error::InvalidFieldLevelInstrumentationSampler`] when the
    /// field-level sampler would sample more often than the tracing sampler.
    pub(crate) fn calculate_field_level_instrumentation_ratio(&self) -> Result<f64, Error> {
        // With Datadog agent sampling enabled the field-level ratio is used
        // as-is instead of being computed relative to the tracing sampler.
        if self
            .exporters
            .tracing
            .common
            .preview_datadog_agent_sampling
            .unwrap_or_default()
        {
            let field_ratio = match &self.apollo.field_level_instrumentation_sampler {
                SamplerOption::TraceIdRatioBased(ratio) => *ratio,
                SamplerOption::Always(Sampler::AlwaysOn) => 1.0,
                SamplerOption::Always(Sampler::AlwaysOff) => 0.0,
            };
            return Ok(field_ratio);
        }
        Ok(
            match (
                &self.exporters.tracing.common.sampler,
                &self.apollo.field_level_instrumentation_sampler,
            ) {
                // The field-level sampler may never sample more often than
                // the tracing sampler it is nested under.
                (
                    SamplerOption::TraceIdRatioBased(global_ratio),
                    SamplerOption::TraceIdRatioBased(field_ratio),
                ) if field_ratio > global_ratio => {
                    Err(Error::InvalidFieldLevelInstrumentationSampler)?
                }
                (
                    SamplerOption::Always(Sampler::AlwaysOff),
                    SamplerOption::Always(Sampler::AlwaysOn),
                ) => Err(Error::InvalidFieldLevelInstrumentationSampler)?,
                (
                    SamplerOption::Always(Sampler::AlwaysOff),
                    SamplerOption::TraceIdRatioBased(ratio),
                ) if *ratio != 0.0 => Err(Error::InvalidFieldLevelInstrumentationSampler)?,
                (
                    SamplerOption::TraceIdRatioBased(ratio),
                    SamplerOption::Always(Sampler::AlwaysOn),
                ) if *ratio != 1.0 => Err(Error::InvalidFieldLevelInstrumentationSampler)?,
                // Either side sampling nothing means no field instrumentation.
                // Checking the global ratio for zero here also guards the
                // division below against a zero divisor.
                (_, SamplerOption::TraceIdRatioBased(ratio)) if *ratio == 0.0 => 0.0,
                (SamplerOption::TraceIdRatioBased(ratio), _) if *ratio == 0.0 => 0.0,
                (_, SamplerOption::Always(Sampler::AlwaysOn)) => 1.0,
                // Express the field ratio relative to the (non-zero) global
                // ratio: of the traces that are sampled, this fraction also
                // gets field-level instrumentation.
                (
                    SamplerOption::TraceIdRatioBased(global_ratio),
                    SamplerOption::TraceIdRatioBased(field_ratio),
                ) => field_ratio / global_ratio,
                (
                    SamplerOption::Always(Sampler::AlwaysOn),
                    SamplerOption::TraceIdRatioBased(field_ratio),
                ) => *field_ratio,
                (_, _) => 0.0,
            },
        )
    }

    /// Metrics reference mode from the `telemetry` plugin config, or the
    /// default when the section is missing or invalid.
    pub(crate) fn metrics_reference_mode(
        configuration: &Configuration,
    ) -> ApolloMetricsReferenceMode {
        Self::from_configuration(configuration)
            .map(|conf| conf.apollo.metrics_reference_mode)
            .unwrap_or_default()
    }

    /// Signature normalization algorithm from the `telemetry` plugin config,
    /// or the default when the section is missing or invalid.
    pub(crate) fn signature_normalization_algorithm(
        configuration: &Configuration,
    ) -> ApolloSignatureNormalizationAlgorithm {
        Self::from_configuration(configuration)
            .map(|conf| conf.apollo.signature_normalization_algorithm)
            .unwrap_or_default()
    }

    /// The whole Apollo telemetry section from the `telemetry` plugin
    /// config, or the default when the section is missing or invalid.
    pub(crate) fn apollo(configuration: &Configuration) -> ApolloTelemetryConfig {
        Self::from_configuration(configuration)
            .map(|conf| conf.apollo)
            .unwrap_or_default()
    }
}
#[cfg(test)]
mod tests {
    use opentelemetry::metrics::MeterProvider;
    use opentelemetry_sdk::metrics::InMemoryMetricExporter;
    use opentelemetry_sdk::metrics::MeterProviderBuilder;
    use opentelemetry_sdk::metrics::data::MetricData;
    use opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader;
    use opentelemetry_sdk::runtime;
    use serde_json::json;

    use super::*;

    // JSON scalars and homogeneous arrays convert into AttributeValue;
    // null, objects and mixed-type arrays are rejected.
    #[test]
    fn test_attribute_value_from_json() {
        assert_eq!(
            AttributeValue::try_from(json!("foo")),
            Ok(AttributeValue::String("foo".to_string()))
        );
        assert_eq!(
            AttributeValue::try_from(json!(1)),
            Ok(AttributeValue::I64(1))
        );
        assert_eq!(
            AttributeValue::try_from(json!(1.1)),
            Ok(AttributeValue::F64(1.1))
        );
        assert_eq!(
            AttributeValue::try_from(json!(true)),
            Ok(AttributeValue::Bool(true))
        );
        assert_eq!(
            AttributeValue::try_from(json!(["foo", "bar"])),
            Ok(AttributeValue::Array(AttributeArray::String(vec![
                "foo".to_string(),
                "bar".to_string()
            ])))
        );
        assert_eq!(
            AttributeValue::try_from(json!([1, 2])),
            Ok(AttributeValue::Array(AttributeArray::I64(vec![1, 2])))
        );
        assert_eq!(
            AttributeValue::try_from(json!([1.1, 1.5])),
            Ok(AttributeValue::Array(AttributeArray::F64(vec![1.1, 1.5])))
        );
        assert_eq!(
            AttributeValue::try_from(json!([true, false])),
            Ok(AttributeValue::Array(AttributeArray::Bool(vec![
                true, false
            ])))
        );
        // Mixed-type arrays must all fail, regardless of element order.
        AttributeValue::try_from(json!(["foo", true])).expect_err("mixed conversion must fail");
        AttributeValue::try_from(json!([1, true])).expect_err("mixed conversion must fail");
        AttributeValue::try_from(json!([1.1, true])).expect_err("mixed conversion must fail");
        AttributeValue::try_from(json!([true, "bar"])).expect_err("mixed conversion must fail");
    }

    // `rename` deserializes and all omitted optional fields default to None.
    #[test]
    fn test_metric_view_rename_deserialization() {
        let json_config = json!({
            "name": "http.server.request.duration",
            "rename": "apollo.router.http.duration"
        });
        let view: MetricView = serde_json::from_value(json_config).expect("should deserialize");
        assert_eq!(view.name, "http.server.request.duration");
        assert_eq!(view.rename, Some("apollo.router.http.duration".to_string()));
        assert_eq!(view.description, None);
        assert_eq!(view.unit, None);
        assert_eq!(view.aggregation, None);
        assert_eq!(view.allowed_attribute_keys, None);
    }

    // A view without `rename` still deserializes; other fields are kept.
    #[test]
    fn test_metric_view_without_rename() {
        let json_config = json!({
            "name": "http.server.request.duration",
            "description": "HTTP request duration"
        });
        let view: MetricView = serde_json::from_value(json_config).expect("should deserialize");
        assert_eq!(view.name, "http.server.request.duration");
        assert_eq!(view.rename, None);
        assert_eq!(view.description, Some("HTTP request duration".to_string()));
    }

    // Full view config including a snake_case histogram aggregation.
    #[test]
    fn test_metric_view_with_all_fields() {
        let json_config = json!({
            "name": "http.server.request.duration",
            "rename": "custom.metric.name",
            "description": "Custom description",
            "unit": "s",
            "aggregation": {
                "histogram": {
                    "buckets": [0.1, 0.5, 1.0, 5.0]
                }
            }
        });
        let view: MetricView = serde_json::from_value(json_config).expect("should deserialize");
        assert_eq!(view.name, "http.server.request.duration");
        assert_eq!(view.rename, Some("custom.metric.name".to_string()));
        assert_eq!(view.description, Some("Custom description".to_string()));
        assert_eq!(view.unit, Some("s".to_string()));
        assert!(view.aggregation.is_some());
    }

    // `default_histogram` only sets name + histogram aggregation.
    #[test]
    fn test_default_histogram_creates_view_with_buckets() {
        let boundaries = vec![0.1, 0.5, 1.0, 5.0];
        let view = MetricView::default_histogram("my.metric".to_string(), boundaries.clone());
        assert_eq!(view.name, "my.metric");
        assert_eq!(view.rename, None);
        assert_eq!(view.description, None);
        assert_eq!(view.unit, None);
        assert_eq!(
            view.aggregation,
            Some(MetricAggregation::Histogram {
                buckets: boundaries
            })
        );
        assert_eq!(view.allowed_attribute_keys, None);
    }

    // When the user sets every field, every field wins over the default.
    #[test]
    fn test_merge_user_overrides_all_fields() {
        let default =
            MetricView::default_histogram("my.histogram".to_string(), vec![0.1, 0.5, 1.0]);
        let user = MetricView {
            name: "my.histogram".to_string(),
            rename: Some("renamed.histogram".to_string()),
            description: Some("User description".to_string()),
            unit: Some("ms".to_string()),
            aggregation: Some(MetricAggregation::Histogram {
                buckets: vec![1.0, 5.0, 10.0],
            }),
            allowed_attribute_keys: Some(HashSet::from(["key1".to_string()])),
        };
        let merged = default.merge(user);
        assert_eq!(merged.name, "my.histogram");
        assert_eq!(merged.rename, Some("renamed.histogram".to_string()));
        assert_eq!(merged.description, Some("User description".to_string()));
        assert_eq!(merged.unit, Some("ms".to_string()));
        assert_eq!(
            merged.aggregation,
            Some(MetricAggregation::Histogram {
                buckets: vec![1.0, 5.0, 10.0]
            })
        );
        assert_eq!(
            merged.allowed_attribute_keys,
            Some(HashSet::from(["key1".to_string()]))
        );
    }

    // When the user sets nothing, the default's fields survive the merge.
    #[test]
    fn test_merge_user_specifies_nothing_preserves_defaults() {
        let default_buckets = vec![0.1, 0.5, 1.0];
        let default =
            MetricView::default_histogram("my.histogram".to_string(), default_buckets.clone());
        let user = MetricView {
            name: "my.histogram".to_string(),
            rename: None,
            description: None,
            unit: None,
            aggregation: None,
            allowed_attribute_keys: None,
        };
        let merged = default.merge(user);
        assert_eq!(merged.name, "my.histogram");
        assert_eq!(merged.rename, None);
        assert_eq!(merged.description, None);
        assert_eq!(merged.unit, None);
        assert_eq!(
            merged.aggregation,
            Some(MetricAggregation::Histogram {
                buckets: default_buckets
            }),
            "default histogram aggregation should be preserved when user specifies none"
        );
        assert_eq!(merged.allowed_attribute_keys, None);
    }

    // Partial user override: set fields win, unset fields inherit defaults.
    #[test]
    fn test_merge_partial_override_preserves_default_aggregation() {
        let default_buckets = vec![0.001, 0.005, 0.015, 0.05, 0.1];
        let default = MetricView::default_histogram(
            "http.server.request.duration".to_string(),
            default_buckets.clone(),
        );
        let user = MetricView {
            name: "http.server.request.duration".to_string(),
            rename: None,
            description: Some("Custom description".to_string()),
            unit: None,
            aggregation: None,
            allowed_attribute_keys: Some(HashSet::from([
                "http.method".to_string(),
                "http.status_code".to_string(),
            ])),
        };
        let merged = default.merge(user);
        assert_eq!(
            merged.aggregation,
            Some(MetricAggregation::Histogram {
                buckets: default_buckets
            }),
            "default histogram buckets should be inherited when user doesn't specify aggregation"
        );
        assert_eq!(merged.description, Some("Custom description".to_string()));
        assert_eq!(
            merged.allowed_attribute_keys,
            Some(HashSet::from([
                "http.method".to_string(),
                "http.status_code".to_string(),
            ]))
        );
    }

    // A user-specified Drop aggregation replaces the default histogram.
    #[test]
    fn test_merge_user_drop_overrides_default_histogram() {
        let default =
            MetricView::default_histogram("noisy.metric".to_string(), vec![0.1, 0.5, 1.0]);
        let user = MetricView {
            name: "noisy.metric".to_string(),
            rename: None,
            description: None,
            unit: None,
            aggregation: Some(MetricAggregation::Drop),
            allowed_attribute_keys: None,
        };
        let merged = default.merge(user);
        assert_eq!(
            merged.aggregation,
            Some(MetricAggregation::Drop),
            "user Drop aggregation should override default histogram"
        );
    }

    // Helper: pull the bucket boundaries of the first data point of the
    // named f64 histogram out of the in-memory exporter, if present.
    fn get_histogram_bounds(
        exporter: &InMemoryMetricExporter,
        metric_name: &str,
    ) -> Option<Vec<f64>> {
        let metrics = exporter.get_finished_metrics().ok()?;
        for resource_metrics in metrics {
            for scope_metrics in resource_metrics.scope_metrics() {
                for metric in scope_metrics.metrics() {
                    if metric.name() == metric_name
                        && let opentelemetry_sdk::metrics::data::AggregatedMetrics::F64(
                            MetricData::Histogram(histogram),
                        ) = metric.data()
                        && let Some(dp) = histogram.data_points().next()
                    {
                        return Some(dp.bounds().collect());
                    }
                }
            }
        }
        None
    }

    // Helper: whether the exporter saw a metric with this name at all.
    fn metric_exists(exporter: &InMemoryMetricExporter, metric_name: &str) -> bool {
        let Ok(metrics) = exporter.get_finished_metrics() else {
            return false;
        };
        metrics
            .iter()
            .flat_map(|rm| rm.scope_metrics())
            .flat_map(|sm| sm.metrics())
            .any(|m| m.name() == metric_name)
    }

    // End-to-end: a view with custom buckets changes the exported histogram.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_user_custom_buckets_are_applied() {
        let exporter = InMemoryMetricExporter::default();
        let custom_buckets = vec![0.005, 0.05, 0.5, 5.0];
        let view = MetricView {
            name: "test.histogram".to_string(),
            rename: None,
            description: None,
            unit: None,
            aggregation: Some(MetricAggregation::Histogram {
                buckets: custom_buckets.clone(),
            }),
            allowed_attribute_keys: None,
        };
        let meter_provider = MeterProviderBuilder::default()
            .with_reader(PeriodicReader::builder(exporter.clone(), runtime::Tokio).build())
            .with_view(view.into_view_fn())
            .build();
        let meter = meter_provider.meter("test");
        let histogram = meter.f64_histogram("test.histogram").build();
        histogram.record(0.1, &[]);
        meter_provider.force_flush().unwrap();
        let bounds =
            get_histogram_bounds(&exporter, "test.histogram").expect("histogram should exist");
        assert_eq!(
            bounds, custom_buckets,
            "histogram should use user-specified custom buckets"
        );
    }

    // End-to-end: a merged view with no user aggregation keeps the defaults.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_merged_view_inherits_default_buckets() {
        let exporter = InMemoryMetricExporter::default();
        let default_buckets = vec![0.001, 0.01, 0.1, 1.0, 10.0];
        let default_view =
            MetricView::default_histogram("test.histogram".to_string(), default_buckets.clone());
        let user_view = MetricView {
            name: "test.histogram".to_string(),
            rename: None,
            description: Some("Custom description".to_string()),
            unit: None,
            aggregation: None, allowed_attribute_keys: None,
        };
        let merged = default_view.merge(user_view);
        let meter_provider = MeterProviderBuilder::default()
            .with_reader(PeriodicReader::builder(exporter.clone(), runtime::Tokio).build())
            .with_view(merged.into_view_fn())
            .build();
        let meter = meter_provider.meter("test");
        let histogram = meter.f64_histogram("test.histogram").build();
        histogram.record(0.05, &[]);
        meter_provider.force_flush().unwrap();
        let bounds =
            get_histogram_bounds(&exporter, "test.histogram").expect("histogram should exist");
        assert_eq!(
            bounds, default_buckets,
            "merged view should inherit default buckets when user doesn't specify aggregation"
        );
    }

    // End-to-end: a Drop view prevents the metric from being exported.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_drop_aggregation_suppresses_metric() {
        let exporter = InMemoryMetricExporter::default();
        let view = MetricView {
            name: "dropped.histogram".to_string(),
            rename: None,
            description: None,
            unit: None,
            aggregation: Some(MetricAggregation::Drop),
            allowed_attribute_keys: None,
        };
        let meter_provider = MeterProviderBuilder::default()
            .with_reader(PeriodicReader::builder(exporter.clone(), runtime::Tokio).build())
            .with_view(view.into_view_fn())
            .build();
        let meter = meter_provider.meter("test");
        let histogram = meter.f64_histogram("dropped.histogram").build();
        histogram.record(1.0, &[]);
        meter_provider.force_flush().unwrap();
        assert!(
            !metric_exists(&exporter, "dropped.histogram"),
            "metric with Drop aggregation should not be exported"
        );
    }

    // End-to-end: user buckets in a merged view override the defaults.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_user_buckets_override_merged_defaults() {
        let exporter = InMemoryMetricExporter::default();
        let default_buckets = vec![0.001, 0.01, 0.1, 1.0, 10.0];
        let user_buckets = vec![1.0, 5.0, 10.0, 50.0];
        let default_view =
            MetricView::default_histogram("test.histogram".to_string(), default_buckets);
        let user_view = MetricView {
            name: "test.histogram".to_string(),
            rename: None,
            description: None,
            unit: None,
            aggregation: Some(MetricAggregation::Histogram {
                buckets: user_buckets.clone(),
            }),
            allowed_attribute_keys: None,
        };
        let merged = default_view.merge(user_view);
        let meter_provider = MeterProviderBuilder::default()
            .with_reader(PeriodicReader::builder(exporter.clone(), runtime::Tokio).build())
            .with_view(merged.into_view_fn())
            .build();
        let meter = meter_provider.meter("test");
        let histogram = meter.f64_histogram("test.histogram").build();
        histogram.record(2.5, &[]);
        meter_provider.force_flush().unwrap();
        let bounds =
            get_histogram_bounds(&exporter, "test.histogram").expect("histogram should exist");
        assert_eq!(
            bounds, user_buckets,
            "user-specified buckets should override default buckets in merged view"
        );
    }
}