pub(crate) mod aggregation;
pub mod data;
pub mod exporter;
pub(crate) mod instrument;
pub(crate) mod internal;
pub(crate) mod manual_reader;
pub(crate) mod meter;
mod meter_provider;
pub(crate) mod periodic_reader;
pub(crate) mod pipeline;
pub mod reader;
pub(crate) mod view;
pub use aggregation::*;
pub use instrument::*;
pub use manual_reader::*;
pub use meter::*;
pub use meter_provider::*;
pub use periodic_reader::*;
pub use pipeline::Pipeline;
pub use view::*;
#[cfg(all(test, feature = "testing"))]
mod tests {
use super::*;
use crate::metrics::data::{ResourceMetrics, Temporality};
use crate::metrics::reader::TemporalitySelector;
use crate::testing::metrics::InMemoryMetricsExporterBuilder;
use crate::{runtime, testing::metrics::InMemoryMetricsExporter};
use opentelemetry::metrics::{Counter, UpDownCounter};
use opentelemetry::{
metrics::{MeterProvider as _, Unit},
KeyValue,
};
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn counter_aggregation() {
    // Arrange: in-memory exporter drained by a periodic reader on Tokio.
    let exporter = InMemoryMetricsExporter::default();
    let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
    let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
    let meter = meter_provider.meter("test");
    let counter = meter
        .u64_counter("my_counter")
        .with_unit(Unit::new("my_unit"))
        .init();

    // Act: 5 increments tagged key1=value1, 3 tagged key1=value2.
    counter.add(1, &[KeyValue::new("key1", "value1")]);
    counter.add(1, &[KeyValue::new("key1", "value1")]);
    counter.add(1, &[KeyValue::new("key1", "value1")]);
    counter.add(1, &[KeyValue::new("key1", "value1")]);
    counter.add(1, &[KeyValue::new("key1", "value1")]);
    counter.add(1, &[KeyValue::new("key1", "value2")]);
    counter.add(1, &[KeyValue::new("key1", "value2")]);
    counter.add(1, &[KeyValue::new("key1", "value2")]);
    meter_provider.force_flush().unwrap();

    // Assert: one Sum metric with two data points (one per attribute set).
    let resource_metrics = exporter
        .get_finished_metrics()
        .expect("metrics are expected to be exported.");
    assert!(!resource_metrics.is_empty());
    let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
    assert_eq!(metric.name, "my_counter");
    assert_eq!(metric.unit.as_str(), "my_unit");
    let sum = metric
        .data
        .as_any()
        .downcast_ref::<data::Sum<u64>>()
        .expect("Sum aggregation expected for Counter instruments by default");
    assert_eq!(sum.data_points.len(), 2);
    assert!(sum.is_monotonic, "Counter should produce monotonic.");
    assert_eq!(
        sum.temporality,
        data::Temporality::Cumulative,
        "Should produce cumulative by default."
    );

    // Look up the data point carrying a given `key1` value. Replaces two
    // duplicated manual loops that reused a shadowed `data_point1` binding
    // (misleading when searching for value2). Attribute sets are unique per
    // data point, so `find`'s first match is equivalent to the loops'
    // last-match behavior.
    let data_point_for = |value: &str| {
        sum.data_points.iter().find(|dp| {
            dp.attributes
                .iter()
                .any(|(k, v)| k.as_str() == "key1" && v.as_str() == value)
        })
    };

    assert_eq!(
        data_point_for("value1")
            .expect("datapoint with key1=value1 expected")
            .value,
        5
    );
    assert_eq!(
        data_point_for("value2")
            .expect("datapoint with key1=value2 expected")
            .value,
        3
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn counter_duplicate_instrument_merge() {
    // Two instruments created with the same identity (name, unit,
    // description) must be merged into a single exported metric stream,
    // with their measurements summed together.
    let exporter = InMemoryMetricsExporter::default();
    let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
    let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
    let meter = meter_provider.meter("test");
    let counter = meter
        .u64_counter("my_counter")
        .with_unit(Unit::new("my_unit"))
        .with_description("my_description")
        .init();
    let counter_duplicated = meter
        .u64_counter("my_counter")
        .with_unit(Unit::new("my_unit"))
        .with_description("my_description")
        .init();

    let attribute = vec![KeyValue::new("key1", "value1")];
    counter.add(10, &attribute);
    counter_duplicated.add(5, &attribute);
    meter_provider.force_flush().unwrap();

    let resource_metrics = exporter
        .get_finished_metrics()
        .expect("metrics are expected to be exported.");
    // assert_eq! (instead of `assert!(a == b, ..)`) prints both sides on
    // failure, making a merge regression much easier to diagnose.
    assert_eq!(
        resource_metrics[0].scope_metrics[0].metrics.len(),
        1,
        "There should be single metric merging duplicate instruments"
    );
    let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
    assert_eq!(metric.name, "my_counter");
    assert_eq!(metric.unit.as_str(), "my_unit");
    let sum = metric
        .data
        .as_any()
        .downcast_ref::<data::Sum<u64>>()
        .expect("Sum aggregation expected for Counter instruments by default");
    assert_eq!(sum.data_points.len(), 1);
    // 10 (counter) + 5 (counter_duplicated) folded into one series.
    let datapoint = &sum.data_points[0];
    assert_eq!(datapoint.value, 15);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn histogram_aggregation_with_invalid_aggregation_should_proceed_as_if_view_not_exist() {
    // A view carrying an invalid aggregation (the bucket boundaries below
    // are deliberately unsorted) should be ignored entirely: the instrument
    // keeps its original name and unit, as if no view matched.
    let exporter = InMemoryMetricsExporter::default();
    let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();

    let bad_stream = Stream::new()
        .aggregation(Aggregation::ExplicitBucketHistogram {
            // 1.9 precedes 1.2 — out-of-order boundaries are invalid.
            boundaries: vec![0.9, 1.9, 1.2, 1.3, 1.4, 1.5],
            record_min_max: false,
        })
        .name("test_histogram_renamed")
        .unit(Unit::new("test_unit_renamed"));
    let view = new_view(Instrument::new().name("test_histogram"), bad_stream)
        .expect("Expected to create a new view");

    let meter_provider = SdkMeterProvider::builder()
        .with_reader(reader)
        .with_view(view)
        .build();
    let meter = meter_provider.meter("test");
    let histogram = meter
        .f64_histogram("test_histogram")
        .with_unit(Unit::new("test_unit"))
        .init();

    histogram.record(1.5, &[KeyValue::new("key1", "value1")]);
    meter_provider.force_flush().unwrap();

    let resource_metrics = exporter
        .get_finished_metrics()
        .expect("metrics are expected to be exported.");
    assert!(!resource_metrics.is_empty());

    let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
    assert_eq!(
        metric.name, "test_histogram",
        "View rename should be ignored and original name retained."
    );
    assert_eq!(
        metric.unit.as_str(),
        "test_unit",
        "View rename of unit should be ignored and original unit retained."
    );
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[ignore = "Spatial aggregation is not yet implemented."]
async fn spatial_aggregation_when_view_drops_attributes_observable_counter() {
    // A view that allows no attribute keys should spatially aggregate the
    // three observations below into one data point summing all values.
    let exporter = InMemoryMetricsExporter::default();
    let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
    let view = new_view(
        Instrument::new().name("my_observable_counter"),
        Stream::new().allowed_attribute_keys(vec![]),
    )
    .expect("Expected to create a new view");
    let meter_provider = SdkMeterProvider::builder()
        .with_reader(reader)
        .with_view(view)
        .build();

    let meter = meter_provider.meter("test");
    let observable_counter = meter.u64_observable_counter("my_observable_counter").init();
    meter
        .register_callback(&[observable_counter.as_any()], move |observer| {
            // Same value reported under three distinct attribute sets.
            for (status, verb) in [("200", "get"), ("200", "post"), ("500", "get")] {
                observer.observe_u64(
                    &observable_counter,
                    100,
                    [
                        KeyValue::new("statusCode", status),
                        KeyValue::new("verb", verb),
                    ]
                    .as_ref(),
                );
            }
        })
        .expect("Expected to register callback");

    meter_provider.force_flush().unwrap();

    let resource_metrics = exporter
        .get_finished_metrics()
        .expect("metrics are expected to be exported.");
    assert!(!resource_metrics.is_empty());

    let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
    assert_eq!(metric.name, "my_observable_counter",);
    let sum = metric
        .data
        .as_any()
        .downcast_ref::<data::Sum<u64>>()
        .expect("Sum aggregation expected for ObservableCounter instruments by default");

    // All three observations collapse into a single 300-valued point.
    assert_eq!(sum.data_points.len(), 1);
    let data_point = &sum.data_points[0];
    assert_eq!(data_point.value, 300);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[ignore = "Spatial aggregation is not yet implemented."]
async fn spatial_aggregation_when_view_drops_attributes_counter() {
    // A view that allows no attribute keys should spatially aggregate the
    // three adds below into a single data point summing all values.
    let exporter = InMemoryMetricsExporter::default();
    let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
    let view = new_view(
        Instrument::new().name("my_counter"),
        Stream::new().allowed_attribute_keys(vec![]),
    )
    .expect("Expected to create a new view");
    let meter_provider = SdkMeterProvider::builder()
        .with_reader(reader)
        .with_view(view)
        .build();

    let meter = meter_provider.meter("test");
    let counter = meter.u64_counter("my_counter").init();

    // Same value added under three distinct attribute sets.
    for (status, verb) in [("200", "Get"), ("500", "Get"), ("200", "Post")] {
        counter.add(
            10,
            [
                KeyValue::new("statusCode", status),
                KeyValue::new("verb", verb),
            ]
            .as_ref(),
        );
    }

    meter_provider.force_flush().unwrap();

    let resource_metrics = exporter
        .get_finished_metrics()
        .expect("metrics are expected to be exported.");
    assert!(!resource_metrics.is_empty());

    let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
    assert_eq!(metric.name, "my_counter",);
    let sum = metric
        .data
        .as_any()
        .downcast_ref::<data::Sum<u64>>()
        .expect("Sum aggregation expected for Counter instruments by default");

    // All three adds collapse into a single 30-valued point.
    assert_eq!(sum.data_points.len(), 1);
    let data_point = &sum.data_points[0];
    assert_eq!(data_point.value, 30);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn no_attr_cumulative_counter() {
    // A counter updated without attributes still exports one (attribute-less)
    // monotonic, cumulative data point.
    let mut ctx = TestContext::new(Some(Temporality::Cumulative));
    let counter = ctx.u64_counter("test", "my_counter", "my_unit");

    counter.add(50, &[]);
    ctx.flush_metrics();

    let sum = ctx.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");
    assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
    assert!(sum.is_monotonic, "Should produce monotonic.");
    assert_eq!(
        sum.temporality,
        Temporality::Cumulative,
        "Should produce cumulative"
    );

    let dp = &sum.data_points[0];
    assert!(dp.attributes.is_empty(), "Non-empty attribute set");
    assert_eq!(dp.value, 50, "Unexpected data point value");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn no_attr_delta_counter() {
    // Same as the cumulative variant, but with delta temporality selected.
    let mut ctx = TestContext::new(Some(Temporality::Delta));
    let counter = ctx.u64_counter("test", "my_counter", "my_unit");

    counter.add(50, &[]);
    ctx.flush_metrics();

    let sum = ctx.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");
    assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
    assert!(sum.is_monotonic, "Should produce monotonic.");
    assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");

    let dp = &sum.data_points[0];
    assert!(dp.attributes.is_empty(), "Non-empty attribute set");
    assert_eq!(dp.value, 50, "Unexpected data point value");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn no_attr_cumulative_up_down_counter() {
    // UpDownCounter exports a NON-monotonic sum, here with cumulative
    // temporality and no attributes.
    let mut ctx = TestContext::new(Some(Temporality::Cumulative));
    let counter = ctx.i64_up_down_counter("test", "my_counter", "my_unit");

    counter.add(50, &[]);
    ctx.flush_metrics();

    let sum = ctx.get_aggregation::<data::Sum<i64>>("my_counter", "my_unit");
    assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
    assert!(!sum.is_monotonic, "Should not produce monotonic.");
    assert_eq!(
        sum.temporality,
        Temporality::Cumulative,
        "Should produce cumulative"
    );

    let dp = &sum.data_points[0];
    assert!(dp.attributes.is_empty(), "Non-empty attribute set");
    assert_eq!(dp.value, 50, "Unexpected data point value");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn no_attr_delta_up_down_counter() {
    // UpDownCounter exports a NON-monotonic sum, here with delta temporality
    // and no attributes.
    let mut ctx = TestContext::new(Some(Temporality::Delta));
    let counter = ctx.i64_up_down_counter("test", "my_counter", "my_unit");

    counter.add(50, &[]);
    ctx.flush_metrics();

    let sum = ctx.get_aggregation::<data::Sum<i64>>("my_counter", "my_unit");
    assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
    assert!(!sum.is_monotonic, "Should not produce monotonic.");
    assert_eq!(sum.temporality, Temporality::Delta, "Should produce Delta");

    let dp = &sum.data_points[0];
    assert!(dp.attributes.is_empty(), "Non-empty attribute set");
    assert_eq!(dp.value, 50, "Unexpected data point value");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn no_attr_cumulative_counter_value_added_after_export() {
    // Cumulative: the second export must report the running total (50 + 5),
    // not just what was added since the previous export.
    let mut ctx = TestContext::new(Some(Temporality::Cumulative));
    let counter = ctx.u64_counter("test", "my_counter", "my_unit");

    counter.add(50, &[]);
    ctx.flush_metrics();
    // First export is only consumed to advance the reader's state.
    let _ = ctx.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");

    counter.add(5, &[]);
    ctx.flush_metrics();

    let sum = ctx.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");
    assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
    assert!(sum.is_monotonic, "Should produce monotonic.");
    assert_eq!(
        sum.temporality,
        Temporality::Cumulative,
        "Should produce cumulative"
    );

    let dp = &sum.data_points[0];
    assert!(dp.attributes.is_empty(), "Non-empty attribute set");
    assert_eq!(dp.value, 55, "Unexpected data point value");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn no_attr_delta_counter_value_reset_after_export() {
    // Delta: the second export must contain only what was added since the
    // previous export (5), i.e. the aggregation resets after each export.
    let mut test_context = TestContext::new(Some(Temporality::Delta));
    let counter = test_context.u64_counter("test", "my_counter", "my_unit");

    counter.add(50, &[]);
    test_context.flush_metrics();
    // First export is only consumed to advance the reader's state.
    let _ = test_context.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");

    counter.add(5, &[]);
    test_context.flush_metrics();

    let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");
    assert_eq!(sum.data_points.len(), 1, "Expected only one data point");
    assert!(sum.is_monotonic, "Should produce monotonic.");
    assert_eq!(
        sum.temporality,
        Temporality::Delta,
        // Fixed: this message previously read "Should produce cumulative",
        // contradicting the Delta temporality actually asserted.
        "Should produce delta"
    );

    let data_point = &sum.data_points[0];
    assert!(data_point.attributes.is_empty(), "Non-empty attribute set");
    assert_eq!(data_point.value, 5, "Unexpected data point value");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn second_delta_export_does_not_give_no_attr_value_if_add_not_called() {
    // With delta temporality, a series that received no updates during the
    // current interval (here the attribute-less one) must not be exported
    // again.
    let mut test_context = TestContext::new(Some(Temporality::Delta));
    let counter = test_context.u64_counter("test", "my_counter", "my_unit");

    counter.add(50, &[]);
    test_context.flush_metrics();
    // First export is only consumed to advance the reader's state.
    let _ = test_context.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");

    // Second interval: only the attributed series is updated.
    counter.add(50, &[KeyValue::new("a", "b")]);
    test_context.flush_metrics();

    let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", "my_unit");
    // `!iter().any(..)` replaces the former `find(..).is_none()`
    // (clippy::search_is_some) — same result, clearer intent.
    assert!(
        !sum.data_points.iter().any(|x| x.attributes.is_empty()),
        "Expected no data points with no attributes"
    );
}
/// Shared fixture for the exporter-backed tests above: wires an in-memory
/// exporter to a periodic reader and caches the last fetched export.
struct TestContext {
    // Exporter whose finished metrics the assertions read back.
    exporter: InMemoryMetricsExporter,
    // Provider under test; owns the periodic reader feeding `exporter`.
    meter_provider: SdkMeterProvider,
    // Cache of the most recent export, held on `self` so `get_aggregation`
    // can return a reference borrowed from it.
    resource_metrics: Vec<ResourceMetrics>,
}
impl TestContext {
    /// Builds the provider/exporter pair. When `temporality` is `Some`, the
    /// exporter reports that temporality for every instrument kind;
    /// otherwise the exporter's default selector is used.
    fn new(temporality: Option<Temporality>) -> Self {
        // Selector answering the same fixed temporality for every kind.
        struct TestTemporalitySelector(Temporality);
        impl TemporalitySelector for TestTemporalitySelector {
            fn temporality(&self, _kind: InstrumentKind) -> Temporality {
                self.0
            }
        }

        let mut exporter = InMemoryMetricsExporterBuilder::new();
        if let Some(temporality) = temporality {
            exporter = exporter.with_temporality_selector(TestTemporalitySelector(temporality));
        }

        let exporter = exporter.build();
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

        TestContext {
            exporter,
            meter_provider,
            resource_metrics: vec![],
        }
    }

    /// Creates a `u64` Counter named `counter_name` with unit `unit_name`
    /// on a meter named `meter_name`.
    fn u64_counter(
        &self,
        meter_name: &'static str,
        counter_name: &'static str,
        unit_name: &'static str,
    ) -> Counter<u64> {
        self.meter_provider
            .meter(meter_name)
            .u64_counter(counter_name)
            .with_unit(Unit::new(unit_name))
            .init()
    }

    /// Creates an `i64` UpDownCounter named `counter_name` with unit
    /// `unit_name` on a meter named `meter_name`.
    fn i64_up_down_counter(
        &self,
        meter_name: &'static str,
        counter_name: &'static str,
        unit_name: &'static str,
    ) -> UpDownCounter<i64> {
        self.meter_provider
            .meter(meter_name)
            .i64_up_down_counter(counter_name)
            .with_unit(Unit::new(unit_name))
            .init()
    }

    /// Forces an export cycle, pushing all accumulated measurements to the
    /// in-memory exporter. Panics if the flush fails.
    fn flush_metrics(&self) {
        self.meter_provider.force_flush().unwrap();
    }

    /// Fetches the finished exports, asserts the latest one contains a
    /// metric named `counter_name` with unit `unit_name`, and returns its
    /// aggregation downcast to `T`.
    ///
    /// Takes `&mut self` because the fetched exports are stored in
    /// `self.resource_metrics` so the returned `&T` can borrow from `self`.
    /// Panics (via the asserts/expects) if no metrics were exported or the
    /// downcast fails. Only the first scope's first metric is inspected.
    fn get_aggregation<T: data::Aggregation>(
        &mut self,
        counter_name: &str,
        unit_name: &str,
    ) -> &T {
        self.resource_metrics = self
            .exporter
            .get_finished_metrics()
            .expect("metrics expected to be exported");

        assert!(
            !self.resource_metrics.is_empty(),
            "no metrics were exported"
        );

        // Inspect only the most recent export.
        let resource_metric = self.resource_metrics.last().unwrap();

        assert!(
            !resource_metric.scope_metrics.is_empty(),
            "No scope metrics in latest export"
        );
        assert!(!resource_metric.scope_metrics[0].metrics.is_empty());

        let metric = &resource_metric.scope_metrics[0].metrics[0];
        assert_eq!(metric.name, counter_name);
        assert_eq!(metric.unit.as_str(), unit_name);

        metric
            .data
            .as_any()
            .downcast_ref::<T>()
            .expect("Failed to cast aggregation to expected type")
    }
}
}