use std::{borrow::Cow, collections::HashSet, error::Error, sync::Arc};
#[cfg(feature = "experimental_metrics_bound_instruments")]
use opentelemetry::metrics::BoundSyncInstrument;
use opentelemetry::{
metrics::{AsyncInstrument, SyncInstrument},
InstrumentationScope, Key, KeyValue,
};
#[cfg(feature = "experimental_metrics_bound_instruments")]
use crate::metrics::internal::BoundMeasure;
use crate::metrics::{aggregation::Aggregation, internal::Measure};
use super::meter::{
INSTRUMENT_NAME_EMPTY, INSTRUMENT_NAME_FIRST_ALPHABETIC, INSTRUMENT_NAME_INVALID_CHAR,
INSTRUMENT_NAME_LENGTH, INSTRUMENT_UNIT_INVALID_CHAR, INSTRUMENT_UNIT_LENGTH,
};
use super::Temporality;
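/// The identifier of a group of instruments that all perform the same function.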
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub enum InstrumentKind {
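    /// A synchronous instrument that records monotonically increasing values.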
Counter,
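    /// A synchronous instrument that records values that can increase or decrease.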
UpDownCounter,
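    /// A synchronous instrument that records a distribution of values.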
Histogram,
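    /// An asynchronous instrument that reports monotonically increasing values
    /// through a callback.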
ObservableCounter,
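    /// An asynchronous instrument that reports values that can increase or
    /// decrease through a callback.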
ObservableUpDownCounter,
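    /// A synchronous instrument that records the current value.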
Gauge,
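    /// An asynchronous instrument that reports the current value through a
    /// callback.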
ObservableGauge,
}
impl InstrumentKind {
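    /// Resolves the temporality used for this instrument kind given the
    /// reader's configured preference.
    ///
    /// Up-down counters are always aggregated with cumulative temporality, and
    /// the `LowMemory` preference keeps only synchronous counters and
    /// histograms as delta.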
pub(crate) fn temporality_preference(&self, temporality: Temporality) -> Temporality {
match temporality {
Temporality::Cumulative => Temporality::Cumulative,
Temporality::Delta => match self {
Self::Counter
| Self::Histogram
| Self::ObservableCounter
| Self::Gauge
| Self::ObservableGauge => Temporality::Delta,
                Self::UpDownCounter | Self::ObservableUpDownCounter => {
Temporality::Cumulative
}
},
Temporality::LowMemory => match self {
                Self::Counter | Self::Histogram => Temporality::Delta,
Self::ObservableCounter
| Self::Gauge
| Self::ObservableGauge
| Self::UpDownCounter
| Self::ObservableUpDownCounter => Temporality::Cumulative,
},
}
}
}
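/// Describes the properties an instrument was created with, including the
/// instrumentation scope of the meter that created it.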
#[derive(Clone, Debug, PartialEq)]
pub struct Instrument {
pub(crate) name: Cow<'static, str>,
pub(crate) description: Cow<'static, str>,
pub(crate) kind: InstrumentKind,
pub(crate) unit: Cow<'static, str>,
pub(crate) scope: InstrumentationScope,
}
impl Instrument {
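    /// Instrument name.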
pub fn name(&self) -> &str {
self.name.as_ref()
}
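    /// Instrument kind.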
pub fn kind(&self) -> InstrumentKind {
self.kind
}
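    /// Instrument unit.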
pub fn unit(&self) -> &str {
self.unit.as_ref()
}
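    /// Instrumentation scope of the meter that created this instrument.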
pub fn scope(&self) -> &InstrumentationScope {
&self.scope
}
}
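/// A builder for [`Stream`]s, which describe how an instrument's measurements
/// are aggregated and exported.
///
/// # Example
///
/// A minimal sketch, assuming `Stream` and `Aggregation` are re-exported from
/// `opentelemetry_sdk::metrics`:
///
/// ```ignore
/// use opentelemetry_sdk::metrics::{Aggregation, Stream};
///
/// // Rename the stream, switch to explicit buckets, and cap its cardinality.
/// let stream = Stream::builder()
///     .with_name("http.server.request.duration")
///     .with_unit("s")
///     .with_aggregation(Aggregation::ExplicitBucketHistogram {
///         boundaries: vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0],
///         record_min_max: true,
///     })
///     .with_cardinality_limit(1000)
///     .build()
///     .expect("stream configuration should be valid");
/// ```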
#[derive(Default, Debug)]
pub struct StreamBuilder {
name: Option<Cow<'static, str>>,
description: Option<Cow<'static, str>>,
unit: Option<Cow<'static, str>>,
aggregation: Option<Aggregation>,
allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
cardinality_limit: Option<usize>,
}
impl StreamBuilder {
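    /// Creates a new, empty [`StreamBuilder`].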
pub(crate) fn new() -> Self {
StreamBuilder::default()
}
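    /// Sets the stream name; it is validated against the instrument naming
    /// rules when [`build`](StreamBuilder::build) is called.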
pub fn with_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
self.name = Some(name.into());
self
}
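    /// Sets the stream description.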
pub fn with_description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
self.description = Some(description.into());
self
}
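    /// Sets the stream unit; it is validated when
    /// [`build`](StreamBuilder::build) is called.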
pub fn with_unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
self.unit = Some(unit.into());
self
}
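    /// Sets the aggregation to use for this stream.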
pub fn with_aggregation(mut self, aggregation: Aggregation) -> Self {
self.aggregation = Some(aggregation);
self
}
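    /// Restricts the stream to the given attribute keys; attributes with other
    /// keys are dropped from recorded measurements.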
#[cfg(feature = "spec_unstable_metrics_views")]
pub fn with_allowed_attribute_keys(
mut self,
attribute_keys: impl IntoIterator<Item = Key>,
) -> Self {
self.allowed_attribute_keys = Some(Arc::new(attribute_keys.into_iter().collect()));
self
}
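    /// Sets the maximum number of distinct attribute sets (cardinality) kept
    /// for this stream.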
pub fn with_cardinality_limit(mut self, limit: usize) -> Self {
self.cardinality_limit = Some(limit);
self
}
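    /// Validates the configured values and builds the [`Stream`].
    ///
    /// Returns an error if the name, unit, cardinality limit, or aggregation
    /// parameters are invalid.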
pub fn build(self) -> Result<Stream, Box<dyn Error>> {
if let Some(name) = &self.name {
if name.is_empty() {
return Err(INSTRUMENT_NAME_EMPTY.into());
}
if name.len() > super::meter::INSTRUMENT_NAME_MAX_LENGTH {
return Err(INSTRUMENT_NAME_LENGTH.into());
}
if name.starts_with(|c: char| !c.is_ascii_alphabetic()) {
return Err(INSTRUMENT_NAME_FIRST_ALPHABETIC.into());
}
if name.contains(|c: char| {
!c.is_ascii_alphanumeric()
&& !super::meter::INSTRUMENT_NAME_ALLOWED_NON_ALPHANUMERIC_CHARS.contains(&c)
}) {
return Err(INSTRUMENT_NAME_INVALID_CHAR.into());
}
}
if let Some(unit) = &self.unit {
if unit.len() > super::meter::INSTRUMENT_UNIT_NAME_MAX_LENGTH {
return Err(INSTRUMENT_UNIT_LENGTH.into());
}
if unit.contains(|c: char| !c.is_ascii()) {
return Err(INSTRUMENT_UNIT_INVALID_CHAR.into());
}
}
if let Some(limit) = self.cardinality_limit {
if limit == 0 {
return Err("Cardinality limit must be greater than 0".into());
}
if limit == usize::MAX {
return Err("Cardinality limit must be less than usize::MAX".into());
}
}
if let Some(Aggregation::ExplicitBucketHistogram { boundaries, .. }) = &self.aggregation {
validate_bucket_boundaries(boundaries)?;
}
if let Some(Aggregation::Base2ExponentialHistogram {
max_size,
max_scale,
..
}) = &self.aggregation
{
if *max_size == 0 {
return Err("max_size must be greater than 0".into());
}
if *max_scale < super::internal::EXPO_MIN_SCALE
|| *max_scale > super::internal::EXPO_MAX_SCALE
{
return Err(format!(
"max_scale must be between {} and {}",
super::internal::EXPO_MIN_SCALE,
super::internal::EXPO_MAX_SCALE
)
.into());
}
}
Ok(Stream {
name: self.name,
description: self.description,
unit: self.unit,
aggregation: self.aggregation,
allowed_attribute_keys: self.allowed_attribute_keys,
cardinality_limit: self.cardinality_limit,
})
}
}
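/// Validates explicit histogram bucket boundaries: they must be finite and
/// strictly increasing (sorted, with no duplicates).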
fn validate_bucket_boundaries(boundaries: &[f64]) -> Result<(), String> {
for boundary in boundaries {
if boundary.is_nan() || boundary.is_infinite() {
return Err(
"Bucket boundaries must not contain NaN, Infinity, or -Infinity".to_string(),
);
}
}
for i in 1..boundaries.len() {
if boundaries[i] <= boundaries[i - 1] {
return Err(
"Bucket boundaries must be sorted and not contain any duplicates".to_string(),
);
}
}
Ok(())
}
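/// Describes the stream of data an instrument produces.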
#[derive(Default, Debug)]
pub struct Stream {
pub(crate) name: Option<Cow<'static, str>>,
pub(crate) description: Option<Cow<'static, str>>,
pub(crate) unit: Option<Cow<'static, str>>,
pub(crate) aggregation: Option<Aggregation>,
pub(crate) allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
pub(crate) cardinality_limit: Option<usize>,
}
impl Stream {
pub fn builder() -> StreamBuilder {
StreamBuilder::new()
}
}
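/// A unique identifier for an instrument within a pipeline, used to detect
/// duplicate instrument registrations.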
#[derive(Debug, PartialEq, Eq, Hash)]
pub(crate) struct InstrumentId {
pub(crate) name: Cow<'static, str>,
pub(crate) description: Cow<'static, str>,
pub(crate) kind: InstrumentKind,
pub(crate) unit: Cow<'static, str>,
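    /// The name of the number type the instrument records, used to distinguish
    /// otherwise-identical instruments of different numeric types.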
pub(crate) number: Cow<'static, str>,
}
impl InstrumentId {
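    /// Lowercases the instrument name so duplicate detection is
    /// case-insensitive, allocating only when the name contains uppercase
    /// characters.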
pub(crate) fn normalize(&mut self) {
if self.name.chars().any(|c| c.is_ascii_uppercase()) {
self.name = self.name.to_ascii_lowercase().into();
}
}
}
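/// The measures resolved for a synchronous instrument; every recorded value is
/// forwarded to each of them.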
pub(crate) struct ResolvedMeasures<T> {
pub(crate) measures: Vec<Arc<dyn Measure<T>>>,
}
impl<T: Copy + 'static> SyncInstrument<T> for ResolvedMeasures<T> {
fn measure(&self, val: T, attrs: &[KeyValue]) {
for measure in &self.measures {
measure.call(val, attrs)
}
}
#[cfg(feature = "experimental_metrics_bound_instruments")]
fn bind(&self, attrs: &[KeyValue]) -> Box<dyn BoundSyncInstrument<T> + Send + Sync> {
let bound_measures: Vec<Box<dyn BoundMeasure<T>>> =
self.measures.iter().map(|m| m.bind(attrs)).collect();
Box::new(ResolvedBoundMeasures {
measures: bound_measures,
})
}
}
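/// Measures pre-bound to a fixed attribute set, returned by the `bind` method
/// of the resolved synchronous instrument.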
#[cfg(feature = "experimental_metrics_bound_instruments")]
pub(crate) struct ResolvedBoundMeasures<T> {
measures: Vec<Box<dyn BoundMeasure<T>>>,
}
#[cfg(feature = "experimental_metrics_bound_instruments")]
impl<T: Copy + 'static> BoundSyncInstrument<T> for ResolvedBoundMeasures<T> {
fn measure(&self, val: T) {
for measure in &self.measures {
measure.call(val);
}
}
}
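/// The measures resolved for an asynchronous (observable) instrument; every
/// observation is forwarded to each of them.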
#[derive(Clone)]
pub(crate) struct Observable<T> {
measures: Vec<Arc<dyn Measure<T>>>,
}
impl<T> Observable<T> {
pub(crate) fn new(measures: Vec<Arc<dyn Measure<T>>>) -> Self {
Self { measures }
}
}
impl<T: Copy + Send + Sync + 'static> AsyncInstrument<T> for Observable<T> {
fn observe(&self, measurement: T, attrs: &[KeyValue]) {
for measure in &self.measures {
measure.call(measurement, attrs)
}
}
}
#[cfg(test)]
mod tests {
use super::StreamBuilder;
use crate::metrics::meter::{
INSTRUMENT_NAME_EMPTY, INSTRUMENT_NAME_FIRST_ALPHABETIC, INSTRUMENT_NAME_INVALID_CHAR,
INSTRUMENT_NAME_LENGTH, INSTRUMENT_UNIT_INVALID_CHAR, INSTRUMENT_UNIT_LENGTH,
};
#[test]
fn stream_name_validation() {
let stream_name_test_cases = vec![
("validateName", ""),
("_startWithNoneAlphabet", INSTRUMENT_NAME_FIRST_ALPHABETIC),
("utf8char锈", INSTRUMENT_NAME_INVALID_CHAR),
("a".repeat(255).leak(), ""),
("a".repeat(256).leak(), INSTRUMENT_NAME_LENGTH),
("invalid name", INSTRUMENT_NAME_INVALID_CHAR),
("allow/slash", ""),
("allow_under_score", ""),
("allow.dots.ok", ""),
("", INSTRUMENT_NAME_EMPTY),
("\\allow\\slash /sec", INSTRUMENT_NAME_FIRST_ALPHABETIC),
("\\allow\\$$slash /sec", INSTRUMENT_NAME_FIRST_ALPHABETIC),
("Total $ Count", INSTRUMENT_NAME_INVALID_CHAR),
(
"\\test\\UsagePercent(Total) > 80%",
INSTRUMENT_NAME_FIRST_ALPHABETIC,
),
("/not / allowed", INSTRUMENT_NAME_FIRST_ALPHABETIC),
];
for (name, expected_error) in stream_name_test_cases {
let builder = StreamBuilder::new().with_name(name);
let result = builder.build();
if expected_error.is_empty() {
assert!(
result.is_ok(),
"Expected successful build for name '{}', but got error: {:?}",
name,
result.err()
);
} else {
let err = result.err().unwrap();
let err_str = err.to_string();
assert!(
err_str == expected_error,
"For name '{name}', expected error '{expected_error}', but got '{err_str}'"
);
}
}
}
#[test]
fn stream_unit_validation() {
let stream_unit_test_cases = vec![
(
"0123456789012345678901234567890123456789012345678901234567890123",
INSTRUMENT_UNIT_LENGTH,
),
("utf8char锈", INSTRUMENT_UNIT_INVALID_CHAR),
("kb", ""),
("Kb/sec", ""),
("%", ""),
("", ""),
];
for (unit, expected_error) in stream_unit_test_cases {
let builder = StreamBuilder::new().with_name("valid_name").with_unit(unit);
let result = builder.build();
if expected_error.is_empty() {
assert!(
result.is_ok(),
"Expected successful build for unit '{}', but got error: {:?}",
unit,
result.err()
);
} else {
let err = result.err().unwrap();
let err_str = err.to_string();
assert!(
err_str == expected_error,
"For unit '{unit}', expected error '{expected_error}', but got '{err_str}'"
);
}
}
}
#[test]
fn stream_cardinality_limit_validation() {
let builder = StreamBuilder::new()
.with_name("valid_name")
.with_cardinality_limit(0);
let result = builder.build();
assert!(result.is_err(), "Expected error for zero cardinality limit");
assert_eq!(
result.err().unwrap().to_string(),
"Cardinality limit must be greater than 0",
"Expected cardinality limit validation error message"
);
let builder = StreamBuilder::new()
.with_name("valid_name")
.with_cardinality_limit(usize::MAX);
let result = builder.build();
assert!(
result.is_err(),
"Expected error for usize::MAX cardinality limit"
);
assert_eq!(
result.err().unwrap().to_string(),
"Cardinality limit must be less than usize::MAX",
"Expected cardinality limit usize::MAX error message"
);
let valid_limits = vec![1, 10, 100, 1000, usize::MAX - 1];
for limit in valid_limits {
let builder = StreamBuilder::new()
.with_name("valid_name")
.with_cardinality_limit(limit);
let result = builder.build();
assert!(
result.is_ok(),
"Expected successful build for cardinality limit {}, but got error: {:?}",
limit,
result.err()
);
}
}
#[test]
fn stream_valid_build() {
let stream = StreamBuilder::new()
.with_name("valid_name")
.with_description("Valid description")
.with_unit("ms")
.with_cardinality_limit(100)
.build();
assert!(
stream.is_ok(),
"Expected valid Stream to be built successfully"
);
}
#[test]
fn stream_histogram_bucket_validation() {
use super::Aggregation;
let valid_boundaries = vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0];
let builder = StreamBuilder::new()
.with_name("valid_histogram")
.with_aggregation(Aggregation::ExplicitBucketHistogram {
boundaries: valid_boundaries.clone(),
record_min_max: true,
});
let result = builder.build();
assert!(
result.is_ok(),
"Expected successful build with valid bucket boundaries"
);
let invalid_nan_boundaries = vec![1.0, 2.0, f64::NAN, 10.0];
let builder = StreamBuilder::new()
.with_name("invalid_histogram_nan")
.with_aggregation(Aggregation::ExplicitBucketHistogram {
boundaries: invalid_nan_boundaries,
record_min_max: true,
});
let result = builder.build();
assert!(
result.is_err(),
"Expected error for NaN in bucket boundaries"
);
assert_eq!(
result.err().unwrap().to_string(),
"Bucket boundaries must not contain NaN, Infinity, or -Infinity",
"Expected correct validation error for NaN"
);
let invalid_inf_boundaries = vec![1.0, 5.0, f64::INFINITY, 100.0];
let builder = StreamBuilder::new()
.with_name("invalid_histogram_inf")
.with_aggregation(Aggregation::ExplicitBucketHistogram {
boundaries: invalid_inf_boundaries,
record_min_max: true,
});
let result = builder.build();
assert!(
result.is_err(),
"Expected error for Infinity in bucket boundaries"
);
assert_eq!(
result.err().unwrap().to_string(),
"Bucket boundaries must not contain NaN, Infinity, or -Infinity",
"Expected correct validation error for Infinity"
);
let invalid_neg_inf_boundaries = vec![f64::NEG_INFINITY, 5.0, 10.0, 100.0];
let builder = StreamBuilder::new()
.with_name("invalid_histogram_neg_inf")
.with_aggregation(Aggregation::ExplicitBucketHistogram {
boundaries: invalid_neg_inf_boundaries,
record_min_max: true,
});
let result = builder.build();
assert!(
result.is_err(),
"Expected error for negative Infinity in bucket boundaries"
);
assert_eq!(
result.err().unwrap().to_string(),
"Bucket boundaries must not contain NaN, Infinity, or -Infinity",
"Expected correct validation error for negative Infinity"
);
let unsorted_boundaries = vec![1.0, 5.0, 2.0, 10.0];
let builder = StreamBuilder::new()
.with_name("unsorted_histogram")
.with_aggregation(Aggregation::ExplicitBucketHistogram {
boundaries: unsorted_boundaries,
record_min_max: true,
});
let result = builder.build();
assert!(
result.is_err(),
"Expected error for unsorted bucket boundaries"
);
assert_eq!(
result.err().unwrap().to_string(),
"Bucket boundaries must be sorted and not contain any duplicates",
"Expected correct validation error for unsorted boundaries"
);
let duplicate_boundaries = vec![1.0, 2.0, 5.0, 5.0, 10.0];
let builder = StreamBuilder::new()
.with_name("duplicate_histogram")
.with_aggregation(Aggregation::ExplicitBucketHistogram {
boundaries: duplicate_boundaries,
record_min_max: true,
});
let result = builder.build();
assert!(
result.is_err(),
"Expected error for duplicate bucket boundaries"
);
assert_eq!(
result.err().unwrap().to_string(),
"Bucket boundaries must be sorted and not contain any duplicates",
"Expected correct validation error for duplicate boundaries"
);
}
#[test]
fn stream_exponential_histogram_validation() {
use super::Aggregation;
use crate::metrics::internal::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
let builder = StreamBuilder::new()
.with_name("valid_expo_histogram")
.with_aggregation(Aggregation::Base2ExponentialHistogram {
max_size: 160,
max_scale: 10,
record_min_max: true,
});
let result = builder.build();
assert!(
result.is_ok(),
"Expected successful build with valid exponential histogram parameters"
);
let builder = StreamBuilder::new()
.with_name("invalid_expo_histogram_size")
.with_aggregation(Aggregation::Base2ExponentialHistogram {
max_size: 0,
max_scale: 10,
record_min_max: true,
});
let result = builder.build();
assert!(result.is_err(), "Expected error for max_size = 0");
assert_eq!(
result.err().unwrap().to_string(),
"max_size must be greater than 0",
"Expected correct validation error for max_size = 0"
);
let builder = StreamBuilder::new()
.with_name("invalid_expo_histogram_scale_high")
.with_aggregation(Aggregation::Base2ExponentialHistogram {
max_size: 160,
max_scale: EXPO_MAX_SCALE + 1,
record_min_max: true,
});
let result = builder.build();
assert!(
result.is_err(),
"Expected error for max_scale > EXPO_MAX_SCALE"
);
assert_eq!(
result.err().unwrap().to_string(),
format!(
"max_scale must be between {} and {}",
EXPO_MIN_SCALE, EXPO_MAX_SCALE
),
"Expected correct validation error for max_scale too high"
);
let builder = StreamBuilder::new()
.with_name("invalid_expo_histogram_scale_low")
.with_aggregation(Aggregation::Base2ExponentialHistogram {
max_size: 160,
max_scale: EXPO_MIN_SCALE - 1,
record_min_max: true,
});
let result = builder.build();
assert!(
result.is_err(),
"Expected error for max_scale < EXPO_MIN_SCALE"
);
assert_eq!(
result.err().unwrap().to_string(),
format!(
"max_scale must be between {} and {}",
EXPO_MIN_SCALE, EXPO_MAX_SCALE
),
"Expected correct validation error for max_scale too low"
);
let builder = StreamBuilder::new()
.with_name("valid_expo_histogram_min_scale")
.with_aggregation(Aggregation::Base2ExponentialHistogram {
max_size: 160,
max_scale: EXPO_MIN_SCALE,
record_min_max: true,
});
let result = builder.build();
assert!(
result.is_ok(),
"Expected successful build with max_scale = EXPO_MIN_SCALE"
);
let builder = StreamBuilder::new()
.with_name("valid_expo_histogram_max_scale")
.with_aggregation(Aggregation::Base2ExponentialHistogram {
max_size: 160,
max_scale: EXPO_MAX_SCALE,
record_min_max: true,
});
let result = builder.build();
assert!(
result.is_ok(),
"Expected successful build with max_scale = EXPO_MAX_SCALE"
);
}
}