#[cfg(feature = "grpc-tonic")]
#[allow(deprecated)]
pub(crate) mod tonic {
use opentelemetry::metrics::MetricsError;
use opentelemetry::sdk::export::metrics::{
aggregation::{
Count, Histogram as SdkHistogram, LastValue, Sum as SdkSum, TemporalitySelector,
},
Record,
};
use opentelemetry::sdk::metrics::aggregators::{
HistogramAggregator, LastValueAggregator, SumAggregator,
};
use opentelemetry::sdk::InstrumentationLibrary;
use opentelemetry_proto::tonic::metrics::v1::DataPointFlags;
use opentelemetry_proto::tonic::FromNumber;
use opentelemetry_proto::tonic::{
collector::metrics::v1::ExportMetricsServiceRequest,
common::v1::KeyValue,
metrics::v1::{
metric::Data, number_data_point, AggregationTemporality, Gauge, Histogram,
HistogramDataPoint, InstrumentationLibraryMetrics, Metric, NumberDataPoint,
ResourceMetrics, Sum,
},
};
use crate::to_nanos;
use crate::transform::{CheckpointedMetrics, ResourceWrapper};
use std::collections::{BTreeMap, HashMap};
pub(crate) fn record_to_metric(
record: &Record,
temporality_selector: &dyn TemporalitySelector,
) -> Result<Metric, MetricsError> {
let descriptor = record.descriptor();
let aggregator = record.aggregator().ok_or(MetricsError::NoDataCollected)?;
let attributes = record
.attributes()
.iter()
.map(|kv| kv.into())
.collect::<Vec<KeyValue>>();
let temporality: AggregationTemporality = temporality_selector
.temporality_for(descriptor, aggregator.aggregation().kind())
.into();
let kind = descriptor.number_kind();
Ok(Metric {
name: descriptor.name().to_string(),
description: descriptor.description().cloned().unwrap_or_default(),
unit: descriptor.unit().unwrap_or("").to_string(),
data: {
if let Some(last_value) = aggregator.as_any().downcast_ref::<LastValueAggregator>()
{
Some({
let (val, sample_time) = last_value.last_value()?;
Data::Gauge(Gauge {
data_points: vec![NumberDataPoint {
flags: DataPointFlags::FlagNone as u32,
attributes,
start_time_unix_nano: to_nanos(*record.start_time()),
time_unix_nano: to_nanos(sample_time),
value: Some(number_data_point::Value::from_number(val, kind)),
exemplars: Vec::default(),
}],
})
})
} else if let Some(sum) = aggregator.as_any().downcast_ref::<SumAggregator>() {
Some({
let val = sum.sum()?;
Data::Sum(Sum {
data_points: vec![NumberDataPoint {
flags: DataPointFlags::FlagNone as u32,
attributes,
start_time_unix_nano: to_nanos(*record.start_time()),
time_unix_nano: to_nanos(*record.end_time()),
value: Some(number_data_point::Value::from_number(val, kind)),
exemplars: Vec::default(),
}],
aggregation_temporality: temporality as i32,
is_monotonic: descriptor.instrument_kind().monotonic(),
})
})
} else if let Some(histogram) =
aggregator.as_any().downcast_ref::<HistogramAggregator>()
{
Some({
let (sum, count, buckets) =
(histogram.sum()?, histogram.count()?, histogram.histogram()?);
Data::Histogram(Histogram {
data_points: vec![HistogramDataPoint {
flags: DataPointFlags::FlagNone as u32,
attributes,
start_time_unix_nano: to_nanos(*record.start_time()),
time_unix_nano: to_nanos(*record.end_time()),
count,
sum: sum.to_f64(kind),
bucket_counts: buckets
.counts()
.iter()
.cloned()
.map(|c| c as u64)
.collect(),
explicit_bounds: buckets.boundaries().clone(),
exemplars: Vec::default(),
}],
aggregation_temporality: temporality as i32,
})
})
} else {
None
}
},
})
}
/// Groups checkpointed metrics into a single OTLP export request.
///
/// Metrics are bucketed first by resource, then by instrumentation library,
/// then by metric name. When two metrics land in the same bucket (same
/// resource, library and name) the later one is folded into the earlier one
/// with [`merge`]; otherwise the metric is stored as-is.
///
/// The resulting `ExportMetricsServiceRequest` contains one `ResourceMetrics`
/// per distinct resource, each holding one `InstrumentationLibraryMetrics`
/// per distinct instrumentation library.
pub(crate) fn sink(metrics: Vec<CheckpointedMetrics>) -> ExportMetricsServiceRequest {
    let mut sink_map = BTreeMap::<
        ResourceWrapper,
        HashMap<InstrumentationLibrary, HashMap<String, Metric>>,
    >::new();

    for (resource, instrumentation_library, metric) in metrics {
        // The entry API creates missing intermediate maps on demand and looks
        // each key up exactly once.
        let metrics_by_name = sink_map
            .entry(resource)
            .or_default()
            .entry(instrumentation_library)
            .or_default();

        match metrics_by_name.entry(metric.name.clone()) {
            std::collections::hash_map::Entry::Occupied(mut slot) => {
                merge(slot.get_mut(), metric)
            }
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(metric);
            }
        }
    }

    ExportMetricsServiceRequest {
        resource_metrics: sink_map
            .into_iter()
            .map(|(resource, metric_map)| ResourceMetrics {
                // `schema_url` must be read before `resource` is consumed below.
                schema_url: resource
                    .schema_url()
                    .map(|s| s.to_string())
                    .unwrap_or_default(),
                resource: Some(resource.into()),
                instrumentation_library_metrics: metric_map
                    .into_iter()
                    .map(
                        |(instrumentation_library, metrics)| InstrumentationLibraryMetrics {
                            schema_url: instrumentation_library
                                .schema_url
                                .clone()
                                .unwrap_or_default()
                                .to_string(),
                            instrumentation_library: Some(instrumentation_library.into()),
                            metrics: metrics
                                .into_iter()
                                .map(|(_k, v)| v)
                                .collect::<Vec<Metric>>(),
                        },
                    )
                    .collect::<Vec<InstrumentationLibraryMetrics>>(),
            })
            .collect::<Vec<ResourceMetrics>>(),
    }
}
/// Appends the data points of `$other` onto `$base` when their data variants
/// are declared compatible.
///
/// Invocation shape:
///
/// ```ignore
/// merge_compatible_type!(base, other,
///     BaseVariant => OtherVariantA, OtherVariantB;
///     AnotherBaseVariant => OtherVariantC
/// );
/// ```
///
/// For every `BaseVariant => Others...` row: if `$base.data` is
/// `Some(BaseVariant(_))` and `$other.data` is `Some(X(_))` for one of the
/// listed `Others`, every element of `other`'s `data_points` is moved to the
/// end of `base`'s `data_points`. Any combination that is not listed (including
/// either side being `None`) leaves `$base` untouched.
///
/// `$base` must allow mutable access to its `data` field and `$other.data` is
/// consumed by value.
macro_rules! merge_compatible_type {
    ($base: ident, $other: ident,
        $ (
            $t:path => $($other_t: path),*
        ) ; *) => {
        match &mut $base.data {
            $(
                Some($t(base_data)) => {
                    match $other.data {
                        $(
                            Some($other_t(other_data)) => {
                                if !other_data.data_points.is_empty() {
                                    base_data.data_points.extend(other_data.data_points);
                                }
                            },
                        )*
                        // Incompatible (or missing) data on the other side.
                        _ => {}
                    }
                },
            )*
            // Base has no data, or a variant with no merge rule.
            _ => {}
        }
    };
}
/// Folds the data points of `other` into `base`.
///
/// Nothing happens unless both metrics share the same name. When they do, the
/// data points of `other` are appended to `base` for these pairings of
/// (`base` data, `other` data): Sum/Sum, Gauge/Sum, Gauge/Gauge,
/// Histogram/Histogram and Summary/Summary. Every other pairing leaves `base`
/// unchanged.
pub(crate) fn merge(base: &mut Metric, other: Metric) {
    if base.name == other.name {
        merge_compatible_type!(base, other,
            Data::Sum => Data::Sum;
            Data::Gauge => Data::Sum, Data::Gauge;
            Data::Histogram => Data::Histogram;
            Data::Summary => Data::Summary
        );
    }
}
}
#[cfg(test)]
#[allow(deprecated)]
mod tests {
#[cfg(feature = "grpc-tonic")]
mod tonic {
use crate::transform::metrics::tonic::merge;
use crate::transform::{record_to_metric, sink, ResourceWrapper};
use opentelemetry::attributes::AttributeSet;
use opentelemetry::metrics::MetricsError;
use opentelemetry::sdk::export::metrics::aggregation::cumulative_temporality_selector;
use opentelemetry::sdk::export::metrics::record;
use opentelemetry::sdk::metrics::aggregators::{
histogram, last_value, Aggregator, SumAggregator,
};
use opentelemetry::sdk::metrics::sdk_api::{
Descriptor, InstrumentKind, Number, NumberKind,
};
use opentelemetry::sdk::{InstrumentationLibrary, Resource};
use opentelemetry::Context;
use opentelemetry_proto::tonic::metrics::v1::DataPointFlags;
use opentelemetry_proto::tonic::{
common::v1::{any_value, AnyValue, KeyValue},
metrics::v1::{
metric::Data, number_data_point, Gauge, Histogram, HistogramDataPoint,
InstrumentationLibraryMetrics, Metric, NumberDataPoint, ResourceMetrics, Sum,
},
Attributes, FromNumber,
};
use std::cmp::Ordering;
use std::sync::Arc;
use time::macros::datetime;
/// Builds a proto `KeyValue` whose value is the string `value`.
fn key_value(key: &str, value: &str) -> KeyValue {
    let string_value = any_value::Value::StringValue(value.to_owned());
    KeyValue {
        key: key.to_owned(),
        value: Some(AnyValue {
            value: Some(string_value),
        }),
    }
}
/// Wraps `val` in the integer variant of a number data point value.
fn i64_to_value(val: i64) -> number_data_point::Value {
number_data_point::Value::AsInt(val)
}
/// Builds a gauge `Metric` called `name` with empty description and unit.
///
/// Each element of `data_points` is an
/// `(attributes, start_time, end_time, value)` tuple that is turned into one
/// integer data point, in the given order.
#[allow(clippy::type_complexity)]
fn get_metric_with_name(
    name: &'static str,
    data_points: Vec<(Vec<(&'static str, &'static str)>, u64, u64, i64)>,
) -> Metric {
    let mut points = Vec::with_capacity(data_points.len());
    for (attributes, start_time, end_time, value) in data_points {
        points.push(get_int_data_point(attributes, start_time, end_time, value));
    }
    Metric {
        name: name.to_owned(),
        description: String::new(),
        unit: String::new(),
        data: Some(Data::Gauge(Gauge {
            data_points: points,
        })),
    }
}
/// Builds an `i64`-valued `NumberDataPoint` with no flags and no exemplars.
///
/// `attributes` become string-valued key/value pairs; `start_time` and
/// `end_time` are stored verbatim as the point's start and observation
/// timestamps.
fn get_int_data_point(
    attributes: Vec<(&'static str, &'static str)>,
    start_time: u64,
    end_time: u64,
    value: i64,
) -> NumberDataPoint {
    let attributes: Vec<KeyValue> = attributes
        .into_iter()
        .map(|(k, v)| key_value(k, v))
        .collect();
    let value = number_data_point::Value::from_number(value.into(), &NumberKind::I64);
    NumberDataPoint {
        flags: DataPointFlags::FlagNone as u32,
        attributes,
        start_time_unix_nano: start_time,
        time_unix_nano: end_time,
        value: Some(value),
        exemplars: Vec::new(),
    }
}
// Shorthand tuple shapes used to spell out expected test data compactly.

/// Instrumentation library as `(name, optional version)`.
type InstrumentationLibraryKv = (&'static str, Option<&'static str>);
/// Resource attributes as `(key, value)` string pairs.
type ResourceKv = Vec<(&'static str, &'static str)>;
/// A metric as `(metric name, data points)`.
type MetricRaw = (&'static str, Vec<DataPointRaw>);
/// A data point as `(attributes, start_time, end_time, value)`.
type DataPointRaw = (Vec<(&'static str, &'static str)>, u64, u64, i64);
fn convert_to_resource_metrics(
data: (ResourceKv, Vec<(InstrumentationLibraryKv, Vec<MetricRaw>)>),
) -> opentelemetry_proto::tonic::metrics::v1::ResourceMetrics {
let attributes: Attributes = data
.0
.into_iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.to_string(), v.to_string()))
.collect::<Vec<opentelemetry::KeyValue>>()
.into();
let resource = opentelemetry_proto::tonic::resource::v1::Resource {
attributes: attributes.0,
dropped_attributes_count: 0,
};
let mut instrumentation_library_metrics = vec![];
for ((instrumentation_name, instrumentation_version), metrics) in data.1 {
instrumentation_library_metrics.push(InstrumentationLibraryMetrics {
instrumentation_library: Some(
opentelemetry_proto::tonic::common::v1::InstrumentationLibrary {
name: instrumentation_name.to_string(),
version: instrumentation_version.unwrap_or("").to_string(),
},
),
schema_url: "".to_string(),
metrics: metrics
.into_iter()
.map(|(name, data_points)| get_metric_with_name(name, data_points))
.collect::<Vec<Metric>>(),
});
}
ResourceMetrics {
resource: Some(resource),
schema_url: "".to_string(),
instrumentation_library_metrics,
}
}
/// Asserts that two `ResourceMetrics` are equal irrespective of the order of
/// resource attributes, instrumentation libraries and metrics.
///
/// Resource attributes are sorted by key, instrumentation libraries by
/// `(name, version)` and metrics by `(name, description, unit)` before being
/// compared. Panics (via `assert_eq!`) on the first mismatch.
fn assert_resource_metrics(mut expect: ResourceMetrics, mut actual: ResourceMetrics) {
    // Sort in place first, then compare the resources themselves. Comparing
    // the result of `Option::map(|r| r.attributes.sort..())` would only
    // compare `Option<()>` values and never look at the attributes.
    if let Some(resource) = expect.resource.as_mut() {
        resource.attributes.sort_by(|a, b| a.key.cmp(&b.key));
    }
    if let Some(resource) = actual.resource.as_mut() {
        resource.attributes.sort_by(|a, b| a.key.cmp(&b.key));
    }
    assert_eq!(expect.resource, actual.resource);

    assert_eq!(
        expect.instrumentation_library_metrics.len(),
        actual.instrumentation_library_metrics.len()
    );

    let sort_instrumentation_library =
        |metric: &InstrumentationLibraryMetrics,
         other_metric: &InstrumentationLibraryMetrics| {
            match (
                metric.instrumentation_library.as_ref(),
                other_metric.instrumentation_library.as_ref(),
            ) {
                (Some(library), Some(other_library)) => library
                    .name
                    .cmp(&other_library.name)
                    .then(library.version.cmp(&other_library.version)),
                // Entries without a library keep their relative order.
                _ => Ordering::Equal,
            }
        };
    let sort_metrics = |metric: &Metric, other_metric: &Metric| {
        metric.name.cmp(&other_metric.name).then(
            metric
                .description
                .cmp(&other_metric.description)
                .then(metric.unit.cmp(&other_metric.unit)),
        )
    };

    expect
        .instrumentation_library_metrics
        .sort_by(sort_instrumentation_library);
    actual
        .instrumentation_library_metrics
        .sort_by(sort_instrumentation_library);

    // Lengths were asserted equal above, so `zip` drops nothing.
    for (mut expect, mut actual) in expect
        .instrumentation_library_metrics
        .into_iter()
        .zip(actual.instrumentation_library_metrics)
    {
        assert_eq!(expect.metrics.len(), actual.metrics.len());
        expect.metrics.sort_by(sort_metrics);
        actual.metrics.sort_by(sort_metrics);
        assert_eq!(expect.metrics, actual.metrics)
    }
}
/// Checks `record_to_metric` for the three supported aggregators: sum,
/// last-value (gauge) and histogram.
#[test]
fn test_record_to_metric() -> Result<(), MetricsError> {
    let cx = Context::new();
    let raw_attributes = vec![("test1", "value1"), ("test2", "value2")];
    let expected_attributes: Vec<KeyValue> = raw_attributes
        .iter()
        .map(|&(key, value)| key_value(key, value))
        .collect();
    let attribute_set = AttributeSet::from_attributes(
        raw_attributes
            .iter()
            .map(|&(k, v)| opentelemetry::KeyValue::new(k, v)),
    );
    let start_time = datetime!(2020-12-25 10:10:0 UTC);
    let end_time = datetime!(2020-12-25 10:10:30 UTC);

    // Sum aggregator -> monotonic cumulative `Sum`.
    {
        let descriptor = Descriptor::new(
            "test".to_string(),
            InstrumentKind::Counter,
            NumberKind::I64,
            None,
            None,
        );
        let sum_aggregator = SumAggregator::default();
        sum_aggregator.update(&cx, &Number::from(12_i64), &descriptor)?;
        let wrapped_aggregator: Arc<dyn Aggregator + Send + Sync> = Arc::new(sum_aggregator);
        let sum_record = record(
            &descriptor,
            &attribute_set,
            Some(&wrapped_aggregator),
            start_time.into(),
            end_time.into(),
        );
        let metric = record_to_metric(&sum_record, &cumulative_temporality_selector())?;

        let expected_point = NumberDataPoint {
            flags: DataPointFlags::FlagNone as u32,
            attributes: expected_attributes.clone(),
            start_time_unix_nano: 1608891000000000000,
            time_unix_nano: 1608891030000000000,
            value: Some(i64_to_value(12i64)),
            exemplars: vec![],
        };
        let expect = Metric {
            name: "test".to_string(),
            description: "".to_string(),
            unit: "".to_string(),
            data: Some(Data::Sum(Sum {
                data_points: vec![expected_point],
                aggregation_temporality: 2,
                is_monotonic: true,
            })),
        };
        assert_eq!(expect, metric);
    }

    // Last-value aggregator -> `Gauge` holding the most recent update.
    {
        let descriptor = Descriptor::new(
            "test".to_string(),
            InstrumentKind::GaugeObserver,
            NumberKind::I64,
            None,
            None,
        );
        let gauge_aggregator = last_value();
        let first = Number::from(12_i64);
        let second = Number::from(14_i64);
        gauge_aggregator.update(&cx, &first, &descriptor)?;
        gauge_aggregator.update(&cx, &second, &descriptor)?;
        let wrapped_aggregator: Arc<dyn Aggregator + Send + Sync> = Arc::new(gauge_aggregator);
        let gauge_record = record(
            &descriptor,
            &attribute_set,
            Some(&wrapped_aggregator),
            start_time.into(),
            end_time.into(),
        );
        let metric = record_to_metric(&gauge_record, &cumulative_temporality_selector())?;

        // The sample time is taken when the aggregator is updated, so the
        // expectation reuses whatever timestamp the conversion produced.
        let observed_time = match metric.data.clone().unwrap() {
            Data::Gauge(gauge) => gauge.data_points[0].time_unix_nano,
            _ => 0,
        };
        let expected_point = NumberDataPoint {
            flags: DataPointFlags::FlagNone as u32,
            attributes: expected_attributes.clone(),
            start_time_unix_nano: 1608891000000000000,
            time_unix_nano: observed_time,
            value: Some(i64_to_value(14i64)),
            exemplars: vec![],
        };
        let expect = Metric {
            name: "test".to_string(),
            description: "".to_string(),
            unit: "".to_string(),
            data: Some(Data::Gauge(Gauge {
                data_points: vec![expected_point],
            })),
        };
        assert_eq!(expect, metric);
    }

    // Histogram aggregator -> cumulative `Histogram`.
    {
        let descriptor = Descriptor::new(
            "test".to_string(),
            InstrumentKind::Histogram,
            NumberKind::I64,
            None,
            None,
        );
        let bound = [0.1, 0.2, 0.3];
        let histogram_aggregator = histogram(&bound);
        for raw in 1_i64..=3 {
            histogram_aggregator.update(&cx, &Number::from(raw), &descriptor)?;
        }
        let wrapped_aggregator: Arc<dyn Aggregator + Send + Sync> =
            Arc::new(histogram_aggregator);
        let histogram_record = record(
            &descriptor,
            &attribute_set,
            Some(&wrapped_aggregator),
            start_time.into(),
            end_time.into(),
        );
        let metric = record_to_metric(&histogram_record, &cumulative_temporality_selector())?;

        let expected_point = HistogramDataPoint {
            flags: DataPointFlags::FlagNone as u32,
            attributes: expected_attributes,
            start_time_unix_nano: 1608891000000000000,
            time_unix_nano: 1608891030000000000,
            count: 3,
            sum: 6f64,
            bucket_counts: vec![0, 0, 0, 3],
            explicit_bounds: vec![0.1, 0.2, 0.3],
            exemplars: vec![],
        };
        let expect = Metric {
            name: "test".to_string(),
            description: "".to_string(),
            unit: "".to_string(),
            data: Some(Data::Histogram(Histogram {
                data_points: vec![expected_point],
                aggregation_temporality: 2,
            })),
        };
        assert_eq!(expect, metric);
    }

    Ok(())
}
/// Checks that `sink` groups metrics by resource, then instrumentation
/// library, then metric name, merging data points of same-named metrics.
#[test]
fn test_sink() {
    // (resource attributes, (library name, version), metric name, data point)
    let test_data: Vec<(ResourceWrapper, InstrumentationLibrary, Metric)> = vec![
        (
            vec![("runtime", "tokio")],
            ("otlp", Some("0.1.1")),
            "test",
            (vec![("attribute1", "attribute2")], 12, 23, 2),
        ),
        (
            vec![("runtime", "tokio")],
            ("otlp", Some("0.1.1")),
            "test",
            (vec![("attribute2", "attribute2")], 16, 19, 20),
        ),
        (
            vec![("runtime", "tokio"), ("rustc", "v48.0")],
            ("otlp", Some("0.1.1")),
            "test",
            (vec![("attribute2", "attribute2")], 16, 19, 20),
        ),
        (
            vec![("runtime", "tokio")],
            ("otlp", None),
            "test",
            (vec![("attribute1", "attribute2")], 15, 16, 88),
        ),
        (
            vec![("runtime", "tokio")],
            ("otlp", None),
            "another_test",
            (vec![("attribute1", "attribute2")], 15, 16, 99),
        ),
    ]
    .into_iter()
    .map(
        |(kvs, (name, version), metric_name, (attributes, start_time, end_time, value))| {
            (
                ResourceWrapper::from(Resource::new(kvs.into_iter().map(|(k, v)| {
                    opentelemetry::KeyValue::new(k.to_string(), v.to_string())
                }))),
                InstrumentationLibrary::new(name, version, None),
                get_metric_with_name(
                    metric_name,
                    vec![(attributes, start_time, end_time, value)],
                ),
            )
        },
    )
    .collect::<Vec<(ResourceWrapper, InstrumentationLibrary, Metric)>>();

    let request = sink(test_data);
    let actual = request.resource_metrics;

    let expect = vec![
        (
            vec![("runtime", "tokio")],
            vec![
                (
                    ("otlp", Some("0.1.1")),
                    vec![(
                        "test",
                        vec![
                            (vec![("attribute1", "attribute2")], 12, 23, 2),
                            (vec![("attribute2", "attribute2")], 16, 19, 20),
                        ],
                    )],
                ),
                (
                    ("otlp", None),
                    vec![
                        (
                            "test",
                            vec![(vec![("attribute1", "attribute2")], 15, 16, 88)],
                        ),
                        (
                            "another_test",
                            vec![(vec![("attribute1", "attribute2")], 15, 16, 99)],
                        ),
                    ],
                ),
            ],
        ),
        (
            vec![("runtime", "tokio"), ("rustc", "v48.0")],
            vec![(
                ("otlp", Some("0.1.1")),
                vec![(
                    "test",
                    vec![(vec![("attribute2", "attribute2")], 16, 19, 20)],
                )],
            )],
        ),
    ]
    .into_iter()
    .map(convert_to_resource_metrics)
    .collect::<Vec<ResourceMetrics>>();

    // `zip` stops at the shorter side, so without this check a missing or
    // extra `ResourceMetrics` in the output would go unnoticed.
    assert_eq!(expect.len(), actual.len());
    for (expect, actual) in expect.into_iter().zip(actual) {
        assert_resource_metrics(expect, actual);
    }
}
/// Checks that merging two same-named `Sum` metrics concatenates their data
/// points, base first.
#[test]
fn test_merge() {
    let data_point_base = get_int_data_point(vec![("method", "POST")], 12, 12, 3);
    let data_point_addon = get_int_data_point(vec![("method", "PUT")], 12, 12, 3);

    // All three metrics differ only in the data points they carry.
    let sum_metric = |data_points: Vec<NumberDataPoint>| Metric {
        name: "test".to_string(),
        description: "".to_string(),
        unit: "".to_string(),
        data: Some(Data::Sum(Sum {
            data_points,
            aggregation_temporality: 2,
            is_monotonic: true,
        })),
    };

    let mut merged = sum_metric(vec![data_point_base.clone()]);
    let addition = sum_metric(vec![data_point_addon.clone()]);
    let expect = sum_metric(vec![data_point_base, data_point_addon]);

    merge(&mut merged, addition);
    assert_eq!(merged, expect);
}
}
}