use apollo_configuration::configuration;
use opentelemetry::Key;
use opentelemetry_sdk::metrics::{Aggregation, Instrument, InstrumentKind, Stream, StreamBuilder};
#[configuration]
pub(crate) struct ViewConfig {
pub(crate) selector: ViewSelectorConfig,
pub(crate) stream: ViewStreamConfig,
}
#[configuration]
pub(crate) struct ViewSelectorConfig {
pub(crate) instrument_name: Option<String>,
pub(crate) instrument_type: Option<InstrumentType>,
pub(crate) unit: Option<String>,
pub(crate) meter_name: Option<String>,
pub(crate) meter_version: Option<String>,
pub(crate) meter_schema_url: Option<String>,
}
#[configuration]
#[derive(Copy, PartialEq, Eq)]
pub(crate) enum InstrumentType {
Counter,
UpDownCounter,
Histogram,
Gauge,
ObservableCounter,
ObservableUpDownCounter,
ObservableGauge,
}
#[configuration]
pub(crate) struct ViewStreamConfig {
pub(crate) name: Option<String>,
pub(crate) description: Option<String>,
pub(crate) aggregation: AggregationConfig,
pub(crate) aggregation_cardinality_limit: Option<u32>,
pub(crate) attribute_keys: AttributeKeysConfig,
}
#[configuration]
pub(crate) struct AttributeKeysConfig {
pub(crate) included: Vec<String>,
pub(crate) excluded: Vec<String>,
}
#[configuration]
pub(crate) enum AggregationConfig {
#[config(default)]
Default,
Sum,
LastValue,
Drop,
ExplicitBucketHistogram(ExplicitBucketHistogramConfig),
Base2ExponentialBucketHistogram(Base2ExponentialBucketHistogramConfig),
}
#[configuration]
pub(crate) struct ExplicitBucketHistogramConfig {
#[config(skip_validate)]
pub(crate) boundaries: Vec<f64>,
#[config(default = true)]
pub(crate) record_min_max: bool,
}
#[configuration]
pub(crate) struct Base2ExponentialBucketHistogramConfig {
#[config(default = 160)]
pub(crate) max_size: u32,
#[config(default = 20)]
pub(crate) max_scale: i32,
#[config(default = true)]
pub(crate) record_min_max: bool,
}
impl From<InstrumentType> for InstrumentKind {
fn from(value: InstrumentType) -> Self {
match value {
InstrumentType::Counter => InstrumentKind::Counter,
InstrumentType::UpDownCounter => InstrumentKind::UpDownCounter,
InstrumentType::Histogram => InstrumentKind::Histogram,
InstrumentType::Gauge => InstrumentKind::Gauge,
InstrumentType::ObservableCounter => InstrumentKind::ObservableCounter,
InstrumentType::ObservableUpDownCounter => InstrumentKind::ObservableUpDownCounter,
InstrumentType::ObservableGauge => InstrumentKind::ObservableGauge,
}
}
}
impl From<&AggregationConfig> for Aggregation {
fn from(value: &AggregationConfig) -> Self {
match value {
AggregationConfig::Default => Aggregation::Default,
AggregationConfig::Sum => Aggregation::Sum,
AggregationConfig::LastValue => Aggregation::LastValue,
AggregationConfig::Drop => Aggregation::Drop,
AggregationConfig::ExplicitBucketHistogram(config) => {
Aggregation::ExplicitBucketHistogram {
boundaries: config.boundaries.clone(),
record_min_max: config.record_min_max,
}
}
AggregationConfig::Base2ExponentialBucketHistogram(config) => {
Aggregation::Base2ExponentialHistogram {
max_size: config.max_size,
max_scale: config.max_scale.clamp(i8::MIN as i32, i8::MAX as i32) as i8,
record_min_max: config.record_min_max,
}
}
}
}
}
impl ViewSelectorConfig {
pub(crate) fn matches(&self, instrument: &Instrument) -> bool {
if let Some(ref name_pattern) = self.instrument_name
&& !matches_wildcard(name_pattern, instrument.name())
{
return false;
}
if let Some(instrument_type) = self.instrument_type
&& InstrumentKind::from(instrument_type) != instrument.kind()
{
return false;
}
if let Some(ref unit) = self.unit
&& instrument.unit() != unit.as_str()
{
return false;
}
if let Some(ref meter_name) = self.meter_name
&& instrument.scope().name() != meter_name
{
return false;
}
if let Some(ref meter_version) = self.meter_version
&& instrument.scope().version() != Some(meter_version.as_str())
{
return false;
}
if let Some(ref meter_schema_url) = self.meter_schema_url
&& instrument.scope().schema_url() != Some(meter_schema_url.as_str())
{
return false;
}
true
}
}
fn matches_wildcard(pattern: &str, value: &str) -> bool {
if pattern == "*" {
return true;
}
if !pattern.contains('*') {
return pattern == value;
}
if let Some(suffix) = pattern.strip_prefix('*')
&& !suffix.contains('*')
{
return value.ends_with(suffix);
}
if let Some(prefix) = pattern.strip_suffix('*')
&& !prefix.contains('*')
{
return value.starts_with(prefix);
}
if pattern.starts_with('*') && pattern.ends_with('*') {
let middle = &pattern[1..pattern.len() - 1];
if !middle.contains('*') {
return value.contains(middle);
}
}
let parts: Vec<&str> = pattern.split('*').collect();
if parts.is_empty() {
return true;
}
let mut remaining = value;
if !pattern.starts_with('*') {
let first = parts[0];
if !remaining.starts_with(first) {
return false;
}
remaining = &remaining[first.len()..];
}
for part in parts
.iter()
.skip(if pattern.starts_with('*') { 0 } else { 1 })
{
if part.is_empty() {
continue;
}
if let Some(pos) = remaining.find(part) {
remaining = &remaining[pos + part.len()..];
} else {
return false;
}
}
if !pattern.ends_with('*') && !parts.is_empty() {
let last = parts[parts.len() - 1];
if !last.is_empty() && !remaining.is_empty() {
return false;
}
}
true
}
impl ViewStreamConfig {
pub(crate) fn to_stream(&self) -> Stream {
let mut builder = StreamBuilder::default();
if let Some(ref name) = self.name {
builder = builder.with_name(name.clone());
}
if let Some(ref description) = self.description {
builder = builder.with_description(description.clone());
}
if !matches!(self.aggregation, AggregationConfig::Default) {
builder = builder.with_aggregation(Aggregation::from(&self.aggregation));
}
if let Some(limit) = self.aggregation_cardinality_limit {
builder = builder.with_cardinality_limit(limit as usize);
}
if !self.attribute_keys.included.is_empty() {
let excluded_set: std::collections::HashSet<&str> = self
.attribute_keys
.excluded
.iter()
.map(|s| s.as_str())
.collect();
let keys: Vec<Key> = self
.attribute_keys
.included
.iter()
.filter(|k| !excluded_set.contains(k.as_str()))
.map(|k| Key::new(k.clone()))
.collect();
builder = builder.with_allowed_attribute_keys(keys);
}
builder.build().expect("valid stream configuration")
}
}
impl ViewConfig {
pub(crate) fn to_view(&self) -> impl Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static {
let selector = self.selector.clone();
let stream_config = self.stream.clone();
move |instrument: &Instrument| {
if selector.matches(instrument) {
Some(stream_config.to_stream())
} else {
None
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use apollo_opentelemetry_test::TelemetryContext;
use opentelemetry::InstrumentationScope;
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData as SdkMetricData};
fn find_metric<'a>(
metrics: &'a [opentelemetry_sdk::metrics::data::ResourceMetrics],
name: &str,
) -> Option<&'a opentelemetry_sdk::metrics::data::Metric> {
metrics
.iter()
.flat_map(|rm| rm.scope_metrics())
.flat_map(|sm| sm.metrics())
.find(|m| m.name() == name)
}
#[test]
fn wildcard_match_star_matches_anything() {
assert!(matches_wildcard("*", "anything"));
assert!(matches_wildcard("*", ""));
assert!(matches_wildcard("*", "foo.bar.baz"));
}
#[test]
fn wildcard_match_exact() {
assert!(matches_wildcard("foo", "foo"));
assert!(!matches_wildcard("foo", "bar"));
assert!(!matches_wildcard("foo", "foobar"));
}
#[test]
fn wildcard_match_prefix() {
assert!(matches_wildcard("foo.*", "foo.bar"));
assert!(matches_wildcard("foo.*", "foo."));
assert!(matches_wildcard("http.*", "http.request.duration"));
assert!(!matches_wildcard("foo.*", "bar.baz"));
assert!(!matches_wildcard("foo.*", "foo"));
}
#[test]
fn wildcard_match_suffix() {
assert!(matches_wildcard("*.bar", "foo.bar"));
assert!(matches_wildcard("*.bar", ".bar"));
assert!(matches_wildcard("*.duration", "http.request.duration"));
assert!(!matches_wildcard("*.bar", "bar.baz"));
}
#[test]
fn wildcard_match_contains() {
assert!(matches_wildcard("*request*", "http.request.duration"));
assert!(matches_wildcard("*foo*", "foo"));
assert!(matches_wildcard("*foo*", "xfoobar"));
assert!(!matches_wildcard("*foo*", "bar"));
}
#[test]
fn instrument_kind_conversion() {
assert_eq!(
InstrumentKind::from(InstrumentType::Counter),
InstrumentKind::Counter
);
assert_eq!(
InstrumentKind::from(InstrumentType::Histogram),
InstrumentKind::Histogram
);
assert_eq!(
InstrumentKind::from(InstrumentType::Gauge),
InstrumentKind::Gauge
);
assert_eq!(
InstrumentKind::from(InstrumentType::ObservableCounter),
InstrumentKind::ObservableCounter
);
}
#[test]
fn aggregation_conversion_simple_variants() {
assert!(matches!(
Aggregation::from(&AggregationConfig::Default),
Aggregation::Default
));
assert!(matches!(
Aggregation::from(&AggregationConfig::Sum),
Aggregation::Sum
));
assert!(matches!(
Aggregation::from(&AggregationConfig::LastValue),
Aggregation::LastValue
));
assert!(matches!(
Aggregation::from(&AggregationConfig::Drop),
Aggregation::Drop
));
}
#[test]
fn aggregation_conversion_explicit_histogram() {
let config = AggregationConfig::ExplicitBucketHistogram(ExplicitBucketHistogramConfig {
boundaries: vec![0.1, 0.5, 1.0, 5.0],
record_min_max: false,
});
match Aggregation::from(&config) {
Aggregation::ExplicitBucketHistogram {
boundaries,
record_min_max,
} => {
assert_eq!(boundaries, vec![0.1, 0.5, 1.0, 5.0]);
assert!(!record_min_max);
}
_ => panic!("expected ExplicitBucketHistogram"),
}
}
#[test]
fn aggregation_conversion_exponential_histogram() {
let config = AggregationConfig::Base2ExponentialBucketHistogram(
Base2ExponentialBucketHistogramConfig {
max_size: 200,
max_scale: 10,
record_min_max: true,
},
);
match Aggregation::from(&config) {
Aggregation::Base2ExponentialHistogram {
max_size,
max_scale,
record_min_max,
} => {
assert_eq!(max_size, 200);
assert_eq!(max_scale, 10);
assert!(record_min_max);
}
_ => panic!("expected Base2ExponentialHistogram"),
}
}
#[test]
fn aggregation_conversion_exponential_histogram_clamps_scale() {
let config = AggregationConfig::Base2ExponentialBucketHistogram(
Base2ExponentialBucketHistogramConfig {
max_size: 160,
max_scale: 1000, record_min_max: true,
},
);
match Aggregation::from(&config) {
Aggregation::Base2ExponentialHistogram { max_scale, .. } => {
assert_eq!(max_scale, i8::MAX);
}
_ => panic!("expected Base2ExponentialHistogram"),
}
}
#[test]
fn stream_renames_metric() {
let view = ViewConfig {
selector: ViewSelectorConfig {
instrument_name: Some("original.counter".to_string()),
instrument_type: None,
unit: None,
meter_name: None,
meter_version: None,
meter_schema_url: None,
},
stream: ViewStreamConfig {
name: Some("renamed.counter".to_string()),
description: None,
aggregation: AggregationConfig::Default,
aggregation_cardinality_limit: None,
attribute_keys: AttributeKeysConfig {
included: vec![],
excluded: vec![],
},
},
};
let ctx = TelemetryContext::builder()
.with_view(view.to_view())
.build();
let meter = opentelemetry::global::meter_provider().meter("test");
let counter = meter.u64_counter("original.counter").build();
counter.add(1, &[]);
let metrics = ctx.metrics();
assert!(find_metric(&metrics, "renamed.counter").is_some());
assert!(find_metric(&metrics, "original.counter").is_none());
}
#[test]
fn stream_overrides_description() {
let view = ViewConfig {
selector: ViewSelectorConfig {
instrument_name: Some("my.counter".to_string()),
instrument_type: None,
unit: None,
meter_name: None,
meter_version: None,
meter_schema_url: None,
},
stream: ViewStreamConfig {
name: None,
description: Some("Custom description".to_string()),
aggregation: AggregationConfig::Default,
aggregation_cardinality_limit: None,
attribute_keys: AttributeKeysConfig {
included: vec![],
excluded: vec![],
},
},
};
let ctx = TelemetryContext::builder()
.with_view(view.to_view())
.build();
let meter = opentelemetry::global::meter_provider().meter("test");
let counter = meter
.u64_counter("my.counter")
.with_description("Original description")
.build();
counter.add(1, &[]);
let metrics = ctx.metrics();
let metric = find_metric(&metrics, "my.counter").expect("metric not found");
assert_eq!(metric.description(), "Custom description");
}
#[test]
fn stream_applies_custom_histogram_boundaries() {
let view = ViewConfig {
selector: ViewSelectorConfig {
instrument_name: Some("request.duration".to_string()),
instrument_type: None,
unit: None,
meter_name: None,
meter_version: None,
meter_schema_url: None,
},
stream: ViewStreamConfig {
name: None,
description: None,
aggregation: AggregationConfig::ExplicitBucketHistogram(
ExplicitBucketHistogramConfig {
boundaries: vec![10.0, 50.0, 100.0, 500.0],
record_min_max: true,
},
),
aggregation_cardinality_limit: None,
attribute_keys: AttributeKeysConfig {
included: vec![],
excluded: vec![],
},
},
};
let ctx = TelemetryContext::builder()
.with_view(view.to_view())
.build();
let meter = opentelemetry::global::meter_provider().meter("test");
let histogram = meter.f64_histogram("request.duration").build();
histogram.record(25.0, &[]);
histogram.record(75.0, &[]);
histogram.record(200.0, &[]);
let metrics = ctx.metrics();
let metric = find_metric(&metrics, "request.duration").expect("metric not found");
match metric.data() {
AggregatedMetrics::F64(SdkMetricData::Histogram(h)) => {
let dp = h.data_points().next().expect("no data points");
let bounds: Vec<f64> = dp.bounds().collect();
assert_eq!(bounds, vec![10.0, 50.0, 100.0, 500.0]);
let counts: Vec<u64> = dp.bucket_counts().collect();
assert_eq!(counts, vec![0, 1, 1, 1, 0]);
}
_ => panic!("expected F64 histogram"),
}
}
#[test]
fn stream_filters_attributes_with_included() {
let view = ViewConfig {
selector: ViewSelectorConfig {
instrument_name: Some("http.requests".to_string()),
instrument_type: None,
unit: None,
meter_name: None,
meter_version: None,
meter_schema_url: None,
},
stream: ViewStreamConfig {
name: None,
description: None,
aggregation: AggregationConfig::Default,
aggregation_cardinality_limit: None,
attribute_keys: AttributeKeysConfig {
included: vec!["method".to_string(), "status".to_string()],
excluded: vec![],
},
},
};
let ctx = TelemetryContext::builder()
.with_view(view.to_view())
.build();
let meter = opentelemetry::global::meter_provider().meter("test");
let counter = meter.u64_counter("http.requests").build();
counter.add(
1,
&[
KeyValue::new("method", "GET"),
KeyValue::new("status", "200"),
KeyValue::new("path", "/api/users"), KeyValue::new("host", "localhost"), ],
);
let metrics = ctx.metrics();
let metric = find_metric(&metrics, "http.requests").expect("metric not found");
match metric.data() {
AggregatedMetrics::U64(SdkMetricData::Sum(sum)) => {
let dp = sum.data_points().next().expect("no data points");
let attrs: Vec<_> = dp.attributes().collect();
assert_eq!(attrs.len(), 2);
let keys: Vec<_> = attrs.iter().map(|kv| kv.key.as_str()).collect();
assert!(keys.contains(&"method"));
assert!(keys.contains(&"status"));
assert!(!keys.contains(&"path"));
assert!(!keys.contains(&"host"));
}
_ => panic!("expected U64 Sum"),
}
}
#[test]
fn stream_filters_attributes_with_excluded() {
let view = ViewConfig {
selector: ViewSelectorConfig {
instrument_name: Some("db.queries".to_string()),
instrument_type: None,
unit: None,
meter_name: None,
meter_version: None,
meter_schema_url: None,
},
stream: ViewStreamConfig {
name: None,
description: None,
aggregation: AggregationConfig::Default,
aggregation_cardinality_limit: None,
attribute_keys: AttributeKeysConfig {
included: vec![
"operation".to_string(),
"table".to_string(),
"sensitive".to_string(),
],
excluded: vec!["sensitive".to_string()],
},
},
};
let ctx = TelemetryContext::builder()
.with_view(view.to_view())
.build();
let meter = opentelemetry::global::meter_provider().meter("test");
let counter = meter.u64_counter("db.queries").build();
counter.add(
1,
&[
KeyValue::new("operation", "SELECT"),
KeyValue::new("table", "users"),
KeyValue::new("sensitive", "secret"), ],
);
let metrics = ctx.metrics();
let metric = find_metric(&metrics, "db.queries").expect("metric not found");
match metric.data() {
AggregatedMetrics::U64(SdkMetricData::Sum(sum)) => {
let dp = sum.data_points().next().expect("no data points");
let keys: Vec<_> = dp.attributes().map(|kv| kv.key.as_str()).collect();
assert!(keys.contains(&"operation"));
assert!(keys.contains(&"table"));
assert!(!keys.contains(&"sensitive"));
}
_ => panic!("expected U64 Sum"),
}
}
#[test]
fn stream_drops_metric() {
let view = ViewConfig {
selector: ViewSelectorConfig {
instrument_name: Some("noisy.metric".to_string()),
instrument_type: None,
unit: None,
meter_name: None,
meter_version: None,
meter_schema_url: None,
},
stream: ViewStreamConfig {
name: None,
description: None,
aggregation: AggregationConfig::Drop,
aggregation_cardinality_limit: None,
attribute_keys: AttributeKeysConfig {
included: vec![],
excluded: vec![],
},
},
};
let ctx = TelemetryContext::builder()
.with_view(view.to_view())
.build();
let meter = opentelemetry::global::meter_provider().meter("test");
let dropped = meter.u64_counter("noisy.metric").build();
let kept = meter.u64_counter("important.metric").build();
dropped.add(100, &[]);
kept.add(1, &[]);
let metrics = ctx.metrics();
assert!(find_metric(&metrics, "noisy.metric").is_none());
assert!(find_metric(&metrics, "important.metric").is_some());
}
#[test]
fn stream_applies_sum_aggregation_to_histogram() {
let view = ViewConfig {
selector: ViewSelectorConfig {
instrument_name: Some("simple.duration".to_string()),
instrument_type: None,
unit: None,
meter_name: None,
meter_version: None,
meter_schema_url: None,
},
stream: ViewStreamConfig {
name: None,
description: None,
aggregation: AggregationConfig::Sum,
aggregation_cardinality_limit: None,
attribute_keys: AttributeKeysConfig {
included: vec![],
excluded: vec![],
},
},
};
let ctx = TelemetryContext::builder()
.with_view(view.to_view())
.build();
let meter = opentelemetry::global::meter_provider().meter("test");
let histogram = meter.f64_histogram("simple.duration").build();
histogram.record(10.0, &[]);
histogram.record(20.0, &[]);
histogram.record(30.0, &[]);
let metrics = ctx.metrics();
let metric = find_metric(&metrics, "simple.duration").expect("metric not found");
match metric.data() {
AggregatedMetrics::F64(SdkMetricData::Sum(sum)) => {
let dp = sum.data_points().next().expect("no data points");
assert!((dp.value() - 60.0).abs() < f64::EPSILON);
}
_ => panic!("expected F64 Sum, got {:?}", metric.data()),
}
}
#[test]
fn selector_matches_by_instrument_type() {
let view = ViewConfig {
selector: ViewSelectorConfig {
instrument_name: None,
instrument_type: Some(InstrumentType::Histogram),
unit: None,
meter_name: None,
meter_version: None,
meter_schema_url: None,
},
stream: ViewStreamConfig {
name: Some("all.histograms".to_string()),
description: None,
aggregation: AggregationConfig::Default,
aggregation_cardinality_limit: None,
attribute_keys: AttributeKeysConfig {
included: vec![],
excluded: vec![],
},
},
};
let ctx = TelemetryContext::builder()
.with_view(view.to_view())
.build();
let meter = opentelemetry::global::meter_provider().meter("test");
let counter = meter.u64_counter("my.counter").build();
counter.add(1, &[]);
let histogram = meter.f64_histogram("my.histogram").build();
histogram.record(1.0, &[]);
let metrics = ctx.metrics();
assert!(find_metric(&metrics, "my.counter").is_some());
assert!(find_metric(&metrics, "all.histograms").is_some());
assert!(find_metric(&metrics, "my.histogram").is_none());
}
#[test]
fn selector_matches_by_meter_name() {
let view = ViewConfig {
selector: ViewSelectorConfig {
instrument_name: None,
instrument_type: None,
unit: None,
meter_name: Some("my-library".to_string()),
meter_version: None,
meter_schema_url: None,
},
stream: ViewStreamConfig {
name: None,
description: Some("From my-library".to_string()),
aggregation: AggregationConfig::Default,
aggregation_cardinality_limit: None,
attribute_keys: AttributeKeysConfig {
included: vec![],
excluded: vec![],
},
},
};
let ctx = TelemetryContext::builder()
.with_view(view.to_view())
.build();
let provider = opentelemetry::global::meter_provider();
let meter1 = provider.meter("my-library");
let counter1 = meter1.u64_counter("lib.counter").build();
counter1.add(1, &[]);
let meter2 = provider.meter("other-library");
let counter2 = meter2.u64_counter("other.counter").build();
counter2.add(1, &[]);
let metrics = ctx.metrics();
let lib_metric = find_metric(&metrics, "lib.counter").expect("lib.counter not found");
assert_eq!(lib_metric.description(), "From my-library");
let other_metric = find_metric(&metrics, "other.counter").expect("other.counter not found");
assert_eq!(other_metric.description(), "");
}
#[test]
fn selector_matches_by_unit() {
let view = ViewConfig {
selector: ViewSelectorConfig {
instrument_name: None,
instrument_type: None,
unit: Some("ms".to_string()),
meter_name: None,
meter_version: None,
meter_schema_url: None,
},
stream: ViewStreamConfig {
name: None,
description: Some("Millisecond metric".to_string()),
aggregation: AggregationConfig::Default,
aggregation_cardinality_limit: None,
attribute_keys: AttributeKeysConfig {
included: vec![],
excluded: vec![],
},
},
};
let ctx = TelemetryContext::builder()
.with_view(view.to_view())
.build();
let meter = opentelemetry::global::meter_provider().meter("test");
let ms_histogram = meter.f64_histogram("latency").with_unit("ms").build();
ms_histogram.record(100.0, &[]);
let s_histogram = meter.f64_histogram("duration").with_unit("s").build();
s_histogram.record(1.0, &[]);
let metrics = ctx.metrics();
let latency = find_metric(&metrics, "latency").expect("latency not found");
assert_eq!(latency.description(), "Millisecond metric");
let duration = find_metric(&metrics, "duration").expect("duration not found");
assert_eq!(duration.description(), "");
}
#[test]
fn selector_matches_by_meter_version() {
let view = ViewConfig {
selector: ViewSelectorConfig {
instrument_name: None,
instrument_type: None,
unit: None,
meter_name: None,
meter_version: Some("1.0.0".to_string()),
meter_schema_url: None,
},
stream: ViewStreamConfig {
name: None,
description: Some("Version 1.0.0".to_string()),
aggregation: AggregationConfig::Default,
aggregation_cardinality_limit: None,
attribute_keys: AttributeKeysConfig {
included: vec![],
excluded: vec![],
},
},
};
let ctx = TelemetryContext::builder()
.with_view(view.to_view())
.build();
let provider = opentelemetry::global::meter_provider();
let scope_v1 = InstrumentationScope::builder("test")
.with_version("1.0.0")
.build();
let meter_v1 = provider.meter_with_scope(scope_v1);
let counter_v1 = meter_v1.u64_counter("v1.counter").build();
counter_v1.add(1, &[]);
let scope_v2 = InstrumentationScope::builder("test")
.with_version("2.0.0")
.build();
let meter_v2 = provider.meter_with_scope(scope_v2);
let counter_v2 = meter_v2.u64_counter("v2.counter").build();
counter_v2.add(1, &[]);
let metrics = ctx.metrics();
let v1_metric = find_metric(&metrics, "v1.counter").expect("v1.counter not found");
assert_eq!(v1_metric.description(), "Version 1.0.0");
let v2_metric = find_metric(&metrics, "v2.counter").expect("v2.counter not found");
assert_eq!(v2_metric.description(), "");
}
#[test]
fn selector_matches_with_wildcard_pattern() {
let view = ViewConfig {
selector: ViewSelectorConfig {
instrument_name: Some("http.*".to_string()),
instrument_type: None,
unit: None,
meter_name: None,
meter_version: None,
meter_schema_url: None,
},
stream: ViewStreamConfig {
name: None,
description: Some("HTTP metric".to_string()),
aggregation: AggregationConfig::Default,
aggregation_cardinality_limit: None,
attribute_keys: AttributeKeysConfig {
included: vec![],
excluded: vec![],
},
},
};
let ctx = TelemetryContext::builder()
.with_view(view.to_view())
.build();
let meter = opentelemetry::global::meter_provider().meter("test");
let http_requests = meter.u64_counter("http.requests").build();
http_requests.add(1, &[]);
let http_duration = meter.f64_histogram("http.duration").build();
http_duration.record(100.0, &[]);
let db_queries = meter.u64_counter("db.queries").build();
db_queries.add(1, &[]);
let metrics = ctx.metrics();
let http_req = find_metric(&metrics, "http.requests").expect("http.requests not found");
assert_eq!(http_req.description(), "HTTP metric");
let http_dur = find_metric(&metrics, "http.duration").expect("http.duration not found");
assert_eq!(http_dur.description(), "HTTP metric");
let db = find_metric(&metrics, "db.queries").expect("db.queries not found");
assert_eq!(db.description(), "");
}
#[test]
fn to_view_integration_with_multiple_selectors() {
let view1 = ViewConfig {
selector: ViewSelectorConfig {
instrument_name: Some("http.*".to_string()),
instrument_type: None,
unit: None,
meter_name: None,
meter_version: None,
meter_schema_url: None,
},
stream: ViewStreamConfig {
name: None,
description: Some("HTTP metric".to_string()),
aggregation: AggregationConfig::Default,
aggregation_cardinality_limit: None,
attribute_keys: AttributeKeysConfig {
included: vec!["method".to_string(), "status".to_string()],
excluded: vec![],
},
},
};
let view2 = ViewConfig {
selector: ViewSelectorConfig {
instrument_name: Some("db.*".to_string()),
instrument_type: None,
unit: None,
meter_name: None,
meter_version: None,
meter_schema_url: None,
},
stream: ViewStreamConfig {
name: None,
description: Some("Database metric".to_string()),
aggregation: AggregationConfig::Default,
aggregation_cardinality_limit: None,
attribute_keys: AttributeKeysConfig {
included: vec!["operation".to_string()],
excluded: vec![],
},
},
};
let ctx = TelemetryContext::builder()
.with_view(view1.to_view())
.with_view(view2.to_view())
.build();
let meter = opentelemetry::global::meter_provider().meter("test");
let http_counter = meter.u64_counter("http.requests").build();
http_counter.add(
1,
&[
KeyValue::new("method", "GET"),
KeyValue::new("status", "200"),
KeyValue::new("path", "/api"), ],
);
let db_counter = meter.u64_counter("db.queries").build();
db_counter.add(
1,
&[
KeyValue::new("operation", "SELECT"),
KeyValue::new("table", "users"), ],
);
let metrics = ctx.metrics();
let http_metric = find_metric(&metrics, "http.requests").expect("http.requests not found");
assert_eq!(http_metric.description(), "HTTP metric");
match http_metric.data() {
AggregatedMetrics::U64(SdkMetricData::Sum(sum)) => {
let dp = sum.data_points().next().expect("no data points");
let keys: Vec<_> = dp.attributes().map(|kv| kv.key.as_str()).collect();
assert!(keys.contains(&"method"));
assert!(keys.contains(&"status"));
assert!(!keys.contains(&"path"));
}
_ => panic!("expected U64 Sum"),
}
let db_metric = find_metric(&metrics, "db.queries").expect("db.queries not found");
assert_eq!(db_metric.description(), "Database metric");
match db_metric.data() {
AggregatedMetrics::U64(SdkMetricData::Sum(sum)) => {
let dp = sum.data_points().next().expect("no data points");
let keys: Vec<_> = dp.attributes().map(|kv| kv.key.as_str()).collect();
assert!(keys.contains(&"operation"));
assert!(!keys.contains(&"table"));
}
_ => panic!("expected U64 Sum"),
}
}
}