use std::collections::BTreeMap;
use std::collections::HashSet;
use axum::headers::HeaderName;
use derivative::Derivative;
use num_traits::ToPrimitive;
use opentelemetry::sdk::metrics::new_view;
use opentelemetry::sdk::metrics::Aggregation;
use opentelemetry::sdk::metrics::Instrument;
use opentelemetry::sdk::metrics::Stream;
use opentelemetry::sdk::metrics::View;
use opentelemetry::sdk::trace::SpanLimits;
use opentelemetry::Array;
use opentelemetry::Value;
use opentelemetry_api::metrics::MetricsError;
use opentelemetry_api::metrics::Unit;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use super::metrics::MetricsAttributesConf;
use super::*;
use crate::plugin::serde::deserialize_option_header_name;
use crate::plugins::telemetry::metrics;
use crate::plugins::telemetry::resource::ConfigResource;
use crate::Configuration;
#[derive(thiserror::Error, Debug)]
pub(crate) enum Error {
#[error("field level instrumentation sampler must sample less frequently than tracing level sampler")]
InvalidFieldLevelInstrumentationSampler,
}
pub(in crate::plugins::telemetry) trait GenericWith<T>
where
Self: Sized,
{
fn with<B>(self, option: &Option<B>, apply: fn(Self, &B) -> Self) -> Self {
if let Some(option) = option {
return apply(self, option);
}
self
}
fn try_with<B>(
self,
option: &Option<B>,
apply: fn(Self, &B) -> Result<Self, BoxError>,
) -> Result<Self, BoxError> {
if let Some(option) = option {
return apply(self, option);
}
Ok(self)
}
}
impl<T> GenericWith<T> for T where Self: Sized {}
#[derive(Clone, Default, Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct Conf {
pub(crate) apollo: apollo::Config,
pub(crate) exporters: Exporters,
pub(crate) instrumentation: Instrumentation,
}
#[derive(Clone, Default, Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct Exporters {
pub(crate) logging: config_new::logging::Logging,
pub(crate) metrics: Metrics,
pub(crate) tracing: Tracing,
}
#[derive(Clone, Default, Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct Instrumentation {
pub(crate) events: config_new::events::Events,
pub(crate) spans: config_new::spans::Spans,
pub(crate) instruments: config_new::instruments::InstrumentsConfig,
}
impl Instrumentation {
pub(crate) fn validate(&self) -> Result<(), String> {
self.events.validate()?;
self.instruments.validate()?;
self.spans.validate()
}
}
#[derive(Clone, Default, Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct Metrics {
pub(crate) common: MetricsCommon,
pub(crate) otlp: otlp::Config,
pub(crate) prometheus: metrics::prometheus::Config,
}
#[derive(Clone, Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct MetricsCommon {
pub(crate) attributes: MetricsAttributesConf,
pub(crate) service_name: Option<String>,
pub(crate) service_namespace: Option<String>,
pub(crate) resource: BTreeMap<String, AttributeValue>,
pub(crate) buckets: Vec<f64>,
pub(crate) views: Vec<MetricView>,
}
impl Default for MetricsCommon {
fn default() -> Self {
Self {
attributes: Default::default(),
service_name: None,
service_namespace: None,
resource: BTreeMap::new(),
views: Vec::with_capacity(0),
buckets: vec![
0.001, 0.005, 0.015, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 1.0, 5.0, 10.0,
],
}
}
}
#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields)]
pub(crate) struct MetricView {
pub(crate) name: String,
pub(crate) description: Option<String>,
pub(crate) unit: Option<String>,
pub(crate) aggregation: Option<MetricAggregation>,
pub(crate) allowed_attribute_keys: Option<HashSet<String>>,
}
impl TryInto<Box<dyn View>> for MetricView {
type Error = MetricsError;
fn try_into(self) -> Result<Box<dyn View>, Self::Error> {
let aggregation = self.aggregation.map(|aggregation| match aggregation {
MetricAggregation::Histogram { buckets } => Aggregation::ExplicitBucketHistogram {
boundaries: buckets,
record_min_max: true,
},
MetricAggregation::Drop => Aggregation::Drop,
});
let instrument = Instrument::new().name(self.name);
let mut mask = Stream::new();
if let Some(desc) = self.description {
mask = mask.description(desc);
}
if let Some(unit) = self.unit {
mask = mask.unit(Unit::new(unit));
}
if let Some(aggregation) = aggregation {
mask = mask.aggregation(aggregation);
}
if let Some(allowed_attribute_keys) = self.allowed_attribute_keys {
mask = mask.allowed_attribute_keys(allowed_attribute_keys.into_iter().map(Key::new));
}
new_view(instrument, mask)
}
}
#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub(crate) enum MetricAggregation {
Histogram { buckets: Vec<f64> },
Drop,
}
#[derive(Clone, Default, Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct Tracing {
#[serde(default, rename = "experimental_response_trace_id")]
pub(crate) response_trace_id: ExposeTraceId,
pub(crate) propagation: Propagation,
pub(crate) common: TracingCommon,
pub(crate) otlp: otlp::Config,
pub(crate) jaeger: tracing::jaeger::Config,
pub(crate) zipkin: tracing::zipkin::Config,
pub(crate) datadog: tracing::datadog::Config,
}
#[derive(Clone, Default, Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct ExposeTraceId {
pub(crate) enabled: bool,
#[schemars(with = "Option<String>")]
#[serde(deserialize_with = "deserialize_option_header_name")]
pub(crate) header_name: Option<HeaderName>,
pub(crate) format: TraceIdFormat,
}
#[derive(Clone, Default, Debug, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub(crate) enum TraceIdFormat {
#[default]
Hexadecimal,
OpenTelemetry,
Decimal,
Datadog,
Uuid,
}
impl TraceIdFormat {
pub(crate) fn format(&self, trace_id: TraceId) -> String {
match self {
TraceIdFormat::Hexadecimal | TraceIdFormat::OpenTelemetry => {
format!("{:032x}", trace_id)
}
TraceIdFormat::Decimal => format!("{}", u128::from_be_bytes(trace_id.to_bytes())),
TraceIdFormat::Datadog => trace_id.to_datadog(),
TraceIdFormat::Uuid => Uuid::from_bytes(trace_id.to_bytes()).to_string(),
}
}
}
#[derive(Clone, PartialEq, Eq, Default, Derivative, Serialize, Deserialize, JsonSchema)]
#[derivative(Debug)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub(crate) enum ApolloSignatureNormalizationAlgorithm {
#[default]
Legacy,
Enhanced,
}
#[derive(Clone, Default, Debug, Deserialize, JsonSchema, Copy)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub(crate) enum ApolloMetricsReferenceMode {
Extended,
#[default]
Standard,
}
#[derive(Clone, Default, Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
pub(crate) struct Propagation {
pub(crate) request: RequestPropagation,
pub(crate) baggage: bool,
pub(crate) trace_context: bool,
pub(crate) jaeger: bool,
pub(crate) datadog: bool,
pub(crate) zipkin: bool,
pub(crate) aws_xray: bool,
}
#[derive(Clone, Debug, Deserialize, JsonSchema, Default)]
#[serde(deny_unknown_fields)]
pub(crate) struct RequestPropagation {
#[schemars(with = "String")]
#[serde(deserialize_with = "deserialize_option_header_name")]
pub(crate) header_name: Option<HeaderName>,
#[serde(default)]
pub(crate) format: TraceIdFormat,
}
#[derive(Debug, Clone, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, default)]
#[non_exhaustive]
pub(crate) struct TracingCommon {
pub(crate) service_name: Option<String>,
pub(crate) service_namespace: Option<String>,
pub(crate) sampler: SamplerOption,
pub(crate) parent_based_sampler: bool,
pub(crate) max_events_per_span: u32,
pub(crate) max_attributes_per_span: u32,
pub(crate) max_links_per_span: u32,
pub(crate) max_attributes_per_event: u32,
pub(crate) max_attributes_per_link: u32,
pub(crate) resource: BTreeMap<String, AttributeValue>,
}
impl ConfigResource for TracingCommon {
fn service_name(&self) -> &Option<String> {
&self.service_name
}
fn service_namespace(&self) -> &Option<String> {
&self.service_namespace
}
fn resource(&self) -> &BTreeMap<String, AttributeValue> {
&self.resource
}
}
impl ConfigResource for MetricsCommon {
fn service_name(&self) -> &Option<String> {
&self.service_name
}
fn service_namespace(&self) -> &Option<String> {
&self.service_namespace
}
fn resource(&self) -> &BTreeMap<String, AttributeValue> {
&self.resource
}
}
fn default_parent_based_sampler() -> bool {
true
}
fn default_sampler() -> SamplerOption {
SamplerOption::Always(Sampler::AlwaysOn)
}
impl Default for TracingCommon {
fn default() -> Self {
Self {
service_name: Default::default(),
service_namespace: Default::default(),
sampler: default_sampler(),
parent_based_sampler: default_parent_based_sampler(),
max_events_per_span: default_max_events_per_span(),
max_attributes_per_span: default_max_attributes_per_span(),
max_links_per_span: default_max_links_per_span(),
max_attributes_per_event: default_max_attributes_per_event(),
max_attributes_per_link: default_max_attributes_per_link(),
resource: Default::default(),
}
}
}
fn default_max_events_per_span() -> u32 {
SpanLimits::default().max_events_per_span
}
fn default_max_attributes_per_span() -> u32 {
SpanLimits::default().max_attributes_per_span
}
fn default_max_links_per_span() -> u32 {
SpanLimits::default().max_links_per_span
}
fn default_max_attributes_per_event() -> u32 {
SpanLimits::default().max_attributes_per_event
}
fn default_max_attributes_per_link() -> u32 {
SpanLimits::default().max_attributes_per_link
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(untagged, deny_unknown_fields)]
pub(crate) enum AttributeValue {
Bool(bool),
I64(i64),
F64(f64),
String(String),
Array(AttributeArray),
}
impl AttributeValue {
pub(crate) fn as_f64(&self) -> Option<f64> {
match self {
AttributeValue::Bool(_) => None,
AttributeValue::I64(v) => Some(*v as f64),
AttributeValue::F64(v) => Some(*v),
AttributeValue::String(v) => v.parse::<f64>().ok(),
AttributeValue::Array(_) => None,
}
}
}
impl From<String> for AttributeValue {
fn from(value: String) -> Self {
AttributeValue::String(value)
}
}
impl From<&'static str> for AttributeValue {
fn from(value: &'static str) -> Self {
AttributeValue::String(value.to_string())
}
}
impl From<bool> for AttributeValue {
fn from(value: bool) -> Self {
AttributeValue::Bool(value)
}
}
impl From<f64> for AttributeValue {
fn from(value: f64) -> Self {
AttributeValue::F64(value)
}
}
impl From<i64> for AttributeValue {
fn from(value: i64) -> Self {
AttributeValue::I64(value)
}
}
impl std::fmt::Display for AttributeValue {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AttributeValue::Bool(val) => write!(f, "{val}"),
AttributeValue::I64(val) => write!(f, "{val}"),
AttributeValue::F64(val) => write!(f, "{val}"),
AttributeValue::String(val) => write!(f, "{val}"),
AttributeValue::Array(val) => write!(f, "{val}"),
}
}
}
impl PartialOrd for AttributeValue {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
match (self, other) {
(AttributeValue::Bool(b1), AttributeValue::Bool(b2)) => b1.partial_cmp(b2),
(AttributeValue::F64(f1), AttributeValue::F64(f2)) => f1.partial_cmp(f2),
(AttributeValue::I64(i1), AttributeValue::I64(i2)) => i1.partial_cmp(i2),
(AttributeValue::String(s1), AttributeValue::String(s2)) => s1.partial_cmp(s2),
(AttributeValue::F64(f1), AttributeValue::I64(i)) => {
i.to_f64().as_ref().and_then(|f2| f1.partial_cmp(f2))
}
(AttributeValue::I64(i), AttributeValue::F64(f)) => i.to_f64()?.partial_cmp(f),
_ => None,
}
}
}
impl TryFrom<serde_json::Value> for AttributeValue {
type Error = ();
fn try_from(value: serde_json::Value) -> Result<Self, Self::Error> {
match value {
serde_json::Value::Null => Err(()),
serde_json::Value::Bool(v) => Ok(AttributeValue::Bool(v)),
serde_json::Value::Number(v) if v.is_i64() => {
Ok(AttributeValue::I64(v.as_i64().expect("i64 checked")))
}
serde_json::Value::Number(v) if v.is_f64() => {
Ok(AttributeValue::F64(v.as_f64().expect("f64 checked")))
}
serde_json::Value::String(v) => Ok(AttributeValue::String(v)),
serde_json::Value::Array(v) => {
if v.iter().all(|v| v.is_boolean()) {
Ok(AttributeValue::Array(AttributeArray::Bool(
v.iter()
.map(|v| v.as_bool().expect("all bools checked"))
.collect(),
)))
} else if v.iter().all(|v| v.is_f64()) {
Ok(AttributeValue::Array(AttributeArray::F64(
v.iter()
.map(|v| v.as_f64().expect("all f64 checked"))
.collect(),
)))
} else if v.iter().all(|v| v.is_i64()) {
Ok(AttributeValue::Array(AttributeArray::I64(
v.iter()
.map(|v| v.as_i64().expect("all i64 checked"))
.collect(),
)))
} else if v.iter().all(|v| v.is_string()) {
Ok(AttributeValue::Array(AttributeArray::String(
v.iter()
.map(|v| v.as_str().expect("all strings checked").to_string())
.collect(),
)))
} else {
Err(())
}
}
serde_json::Value::Object(_v) => Err(()),
_ => Err(()),
}
}
}
impl From<AttributeValue> for opentelemetry::Value {
fn from(value: AttributeValue) -> Self {
match value {
AttributeValue::Bool(v) => Value::Bool(v),
AttributeValue::I64(v) => Value::I64(v),
AttributeValue::F64(v) => Value::F64(v),
AttributeValue::String(v) => Value::String(v.into()),
AttributeValue::Array(v) => Value::Array(v.into()),
}
}
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq)]
#[serde(untagged, deny_unknown_fields)]
pub(crate) enum AttributeArray {
Bool(Vec<bool>),
I64(Vec<i64>),
F64(Vec<f64>),
String(Vec<String>),
}
impl std::fmt::Display for AttributeArray {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AttributeArray::Bool(val) => write!(f, "{val:?}"),
AttributeArray::I64(val) => write!(f, "{val:?}"),
AttributeArray::F64(val) => write!(f, "{val:?}"),
AttributeArray::String(val) => write!(f, "{val:?}"),
}
}
}
impl From<AttributeArray> for opentelemetry::Array {
fn from(array: AttributeArray) -> Self {
match array {
AttributeArray::Bool(v) => Array::Bool(v),
AttributeArray::I64(v) => Array::I64(v),
AttributeArray::F64(v) => Array::F64(v),
AttributeArray::String(v) => Array::String(v.into_iter().map(|v| v.into()).collect()),
}
}
}
impl From<opentelemetry::Array> for AttributeArray {
fn from(array: opentelemetry::Array) -> Self {
match array {
opentelemetry::Array::Bool(v) => AttributeArray::Bool(v),
opentelemetry::Array::I64(v) => AttributeArray::I64(v),
opentelemetry::Array::F64(v) => AttributeArray::F64(v),
opentelemetry::Array::String(v) => {
AttributeArray::String(v.into_iter().map(|v| v.into()).collect())
}
}
}
}
#[derive(Clone, Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, untagged)]
pub(crate) enum SamplerOption {
TraceIdRatioBased(f64),
Always(Sampler),
}
#[derive(Clone, Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub(crate) enum Sampler {
AlwaysOn,
AlwaysOff,
}
impl From<Sampler> for opentelemetry::sdk::trace::Sampler {
fn from(s: Sampler) -> Self {
match s {
Sampler::AlwaysOn => opentelemetry::sdk::trace::Sampler::AlwaysOn,
Sampler::AlwaysOff => opentelemetry::sdk::trace::Sampler::AlwaysOff,
}
}
}
impl From<SamplerOption> for opentelemetry::sdk::trace::Sampler {
fn from(s: SamplerOption) -> Self {
match s {
SamplerOption::Always(s) => s.into(),
SamplerOption::TraceIdRatioBased(ratio) => {
opentelemetry::sdk::trace::Sampler::TraceIdRatioBased(ratio)
}
}
}
}
impl From<&TracingCommon> for opentelemetry::sdk::trace::Config {
fn from(config: &TracingCommon) -> Self {
let mut common = opentelemetry::sdk::trace::config();
let mut sampler: opentelemetry::sdk::trace::Sampler = config.sampler.clone().into();
if config.parent_based_sampler {
sampler = parent_based(sampler);
}
common = common.with_sampler(sampler);
common = common.with_max_events_per_span(config.max_events_per_span);
common = common.with_max_attributes_per_span(config.max_attributes_per_span);
common = common.with_max_links_per_span(config.max_links_per_span);
common = common.with_max_attributes_per_event(config.max_attributes_per_event);
common = common.with_max_attributes_per_link(config.max_attributes_per_link);
common = common.with_resource(config.to_resource());
common
}
}
fn parent_based(sampler: opentelemetry::sdk::trace::Sampler) -> opentelemetry::sdk::trace::Sampler {
opentelemetry::sdk::trace::Sampler::ParentBased(Box::new(sampler))
}
impl Conf {
pub(crate) fn calculate_field_level_instrumentation_ratio(&self) -> Result<f64, Error> {
Ok(
match (
&self.exporters.tracing.common.sampler,
&self.apollo.field_level_instrumentation_sampler,
) {
(
SamplerOption::TraceIdRatioBased(global_ratio),
SamplerOption::TraceIdRatioBased(field_ratio),
) if field_ratio > global_ratio => {
Err(Error::InvalidFieldLevelInstrumentationSampler)?
}
(
SamplerOption::Always(Sampler::AlwaysOff),
SamplerOption::Always(Sampler::AlwaysOn),
) => Err(Error::InvalidFieldLevelInstrumentationSampler)?,
(
SamplerOption::Always(Sampler::AlwaysOff),
SamplerOption::TraceIdRatioBased(ratio),
) if *ratio != 0.0 => Err(Error::InvalidFieldLevelInstrumentationSampler)?,
(
SamplerOption::TraceIdRatioBased(ratio),
SamplerOption::Always(Sampler::AlwaysOn),
) if *ratio != 1.0 => Err(Error::InvalidFieldLevelInstrumentationSampler)?,
(_, SamplerOption::TraceIdRatioBased(ratio)) if *ratio == 0.0 => 0.0,
(SamplerOption::TraceIdRatioBased(ratio), _) if *ratio == 0.0 => 0.0,
(_, SamplerOption::Always(Sampler::AlwaysOn)) => 1.0,
(
SamplerOption::TraceIdRatioBased(global_ratio),
SamplerOption::TraceIdRatioBased(field_ratio),
) => field_ratio / global_ratio,
(
SamplerOption::Always(Sampler::AlwaysOn),
SamplerOption::TraceIdRatioBased(field_ratio),
) => *field_ratio,
(_, _) => 0.0,
},
)
}
pub(crate) fn metrics_reference_mode(
configuration: &Configuration,
) -> ApolloMetricsReferenceMode {
match configuration.apollo_plugins.plugins.get("telemetry") {
Some(telemetry_config) => {
match serde_json::from_value::<Conf>(telemetry_config.clone()) {
Ok(conf) => conf.apollo.metrics_reference_mode,
_ => ApolloMetricsReferenceMode::default(),
}
}
_ => ApolloMetricsReferenceMode::default(),
}
}
pub(crate) fn signature_normalization_algorithm(
configuration: &Configuration,
) -> ApolloSignatureNormalizationAlgorithm {
match configuration.apollo_plugins.plugins.get("telemetry") {
Some(telemetry_config) => {
match serde_json::from_value::<Conf>(telemetry_config.clone()) {
Ok(conf) => conf.apollo.signature_normalization_algorithm,
_ => ApolloSignatureNormalizationAlgorithm::default(),
}
}
_ => ApolloSignatureNormalizationAlgorithm::default(),
}
}
}
#[cfg(test)]
mod tests {
use serde_json::json;
use super::*;
#[test]
fn test_attribute_value_from_json() {
assert_eq!(
AttributeValue::try_from(json!("foo")),
Ok(AttributeValue::String("foo".to_string()))
);
assert_eq!(
AttributeValue::try_from(json!(1)),
Ok(AttributeValue::I64(1))
);
assert_eq!(
AttributeValue::try_from(json!(1.1)),
Ok(AttributeValue::F64(1.1))
);
assert_eq!(
AttributeValue::try_from(json!(true)),
Ok(AttributeValue::Bool(true))
);
assert_eq!(
AttributeValue::try_from(json!(["foo", "bar"])),
Ok(AttributeValue::Array(AttributeArray::String(vec![
"foo".to_string(),
"bar".to_string()
])))
);
assert_eq!(
AttributeValue::try_from(json!([1, 2])),
Ok(AttributeValue::Array(AttributeArray::I64(vec![1, 2])))
);
assert_eq!(
AttributeValue::try_from(json!([1.1, 1.5])),
Ok(AttributeValue::Array(AttributeArray::F64(vec![1.1, 1.5])))
);
assert_eq!(
AttributeValue::try_from(json!([true, false])),
Ok(AttributeValue::Array(AttributeArray::Bool(vec![
true, false
])))
);
AttributeValue::try_from(json!(["foo", true])).expect_err("mixed conversion must fail");
AttributeValue::try_from(json!([1, true])).expect_err("mixed conversion must fail");
AttributeValue::try_from(json!([1.1, true])).expect_err("mixed conversion must fail");
AttributeValue::try_from(json!([true, "bar"])).expect_err("mixed conversion must fail");
}
}