use crate::metrics::{
data::{self, AggregatedMetrics, GaugeDataPoint, MetricData},
Temporality,
};
use opentelemetry::KeyValue;
#[cfg(feature = "experimental_metrics_bound_instruments")]
use std::sync::atomic::Ordering;
#[cfg(feature = "experimental_metrics_bound_instruments")]
use std::sync::Arc;
use super::{
aggregate::{AggregateTimeInitiator, AttributeSetFilter},
Aggregator, AtomicTracker, AtomicallyUpdate, ComputeAggregation, Measure, Number, ValueMap,
};
#[cfg(feature = "experimental_metrics_bound_instruments")]
use super::{BoundMeasure, NoopBoundMeasure, TrackerEntry};
#[cfg(feature = "experimental_metrics_bound_instruments")]
struct BoundLastValueHandle<T: Number> {
tracker: Arc<TrackerEntry<Assign<T>>>,
}
#[cfg(feature = "experimental_metrics_bound_instruments")]
impl<T: Number> BoundMeasure<T> for BoundLastValueHandle<T> {
fn call(&self, measurement: T) {
self.tracker.aggregator.update(measurement);
self.tracker.has_been_updated.store(true, Ordering::Release);
}
}
#[cfg(feature = "experimental_metrics_bound_instruments")]
impl<T: Number> Drop for BoundLastValueHandle<T> {
fn drop(&mut self) {
self.tracker.bound_count.fetch_sub(1, Ordering::Relaxed);
}
}
pub(crate) struct Assign<T>
where
T: AtomicallyUpdate<T>,
{
pub(crate) value: T::AtomicTracker,
}
impl<T> Aggregator for Assign<T>
where
T: Number,
{
type InitConfig = ();
type PreComputedValue = T;
fn create(_init: &()) -> Self {
Self {
value: T::new_atomic_tracker(T::default()),
}
}
fn update(&self, value: T) {
self.value.store(value)
}
fn clone_and_reset(&self, _: &()) -> Self {
Self {
value: T::new_atomic_tracker(self.value.get_and_reset_value()),
}
}
}
pub(crate) struct LastValue<T: Number> {
value_map: ValueMap<Assign<T>>,
init_time: AggregateTimeInitiator,
temporality: Temporality,
filter: AttributeSetFilter,
}
impl<T: Number> LastValue<T> {
pub(crate) fn new(
temporality: Temporality,
filter: AttributeSetFilter,
cardinality_limit: usize,
) -> Self {
LastValue {
value_map: ValueMap::new((), cardinality_limit),
init_time: AggregateTimeInitiator::default(),
temporality,
filter,
}
}
pub(crate) fn delta(&self, dest: Option<&mut MetricData<T>>) -> (usize, Option<MetricData<T>>) {
let time = self.init_time.delta();
let s_data = dest.and_then(|d| {
if let MetricData::Gauge(gauge) = d {
Some(gauge)
} else {
None
}
});
let mut new_agg = if s_data.is_none() {
Some(data::Gauge {
data_points: vec![],
start_time: Some(time.start),
time: time.current,
})
} else {
None
};
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
s_data.start_time = Some(time.start);
s_data.time = time.current;
self.value_map
.collect_and_reset(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {
attributes,
value: aggr.value.get_and_reset_value(),
exemplars: vec![],
});
(s_data.data_points.len(), new_agg.map(Into::into))
}
pub(crate) fn cumulative(
&self,
dest: Option<&mut MetricData<T>>,
) -> (usize, Option<MetricData<T>>) {
let time = self.init_time.cumulative();
let s_data = dest.and_then(|d| {
if let MetricData::Gauge(gauge) = d {
Some(gauge)
} else {
None
}
});
let mut new_agg = if s_data.is_none() {
Some(data::Gauge {
data_points: vec![],
start_time: Some(time.start),
time: time.current,
})
} else {
None
};
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
s_data.start_time = Some(time.start);
s_data.time = time.current;
self.value_map
.collect_readonly(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {
attributes,
value: aggr.value.get_value(),
exemplars: vec![],
});
(s_data.data_points.len(), new_agg.map(Into::into))
}
}
impl<T> Measure<T> for LastValue<T>
where
T: Number,
{
fn call(&self, measurement: T, attrs: &[KeyValue]) {
self.filter.apply(attrs, |filtered| {
self.value_map.measure(measurement, filtered);
})
}
#[cfg(feature = "experimental_metrics_bound_instruments")]
fn bind(&self, attrs: &[KeyValue]) -> Box<dyn BoundMeasure<T>> {
let mut bound_attrs = Vec::new();
self.filter.apply(attrs, |filtered| {
bound_attrs = filtered.to_vec();
});
match self.value_map.bind(&bound_attrs) {
Some(tracker) => Box::new(BoundLastValueHandle { tracker }),
None => Box::new(NoopBoundMeasure::new()),
}
}
}
impl<T> ComputeAggregation for LastValue<T>
where
T: Number,
{
fn call(&self, dest: Option<&mut AggregatedMetrics>) -> (usize, Option<AggregatedMetrics>) {
let data = dest.and_then(|d| T::extract_metrics_data_mut(d));
let (len, new) = match self.temporality {
Temporality::Delta => self.delta(data),
_ => self.cumulative(data),
};
(len, new.map(T::make_aggregated_metrics))
}
}
#[cfg(all(test, feature = "experimental_metrics_bound_instruments"))]
mod tests {
use super::*;
use crate::metrics::data::{AggregatedMetrics, Gauge, MetricData};
fn extract_gauge(agg: AggregatedMetrics) -> Gauge<u64> {
match agg {
AggregatedMetrics::U64(MetricData::Gauge(g)) => g,
_ => panic!("expected u64 Gauge"),
}
}
#[test]
fn bind_writes_through_bound_handle() {
let last_value =
LastValue::<u64>::new(Temporality::Cumulative, AttributeSetFilter::new(None), 100);
let attrs = [KeyValue::new("k", "v")];
let bound = Measure::bind(&last_value, &attrs);
bound.call(7);
bound.call(42);
let (count, agg) = ComputeAggregation::call(&last_value, None);
assert_eq!(count, 1);
let gauge = extract_gauge(agg.expect("aggregation produced"));
assert_eq!(gauge.data_points.len(), 1);
assert_eq!(gauge.data_points[0].value, 42);
assert_eq!(gauge.data_points[0].attributes, attrs.to_vec());
}
#[test]
fn bound_handle_drop_decrements_bound_count() {
let last_value =
LastValue::<u64>::new(Temporality::Delta, AttributeSetFilter::new(None), 100);
let attrs = [KeyValue::new("k", "v")];
let bound = Measure::bind(&last_value, &attrs);
bound.call(5);
let trackers = last_value.value_map.trackers.read().unwrap();
let entry = trackers
.values()
.next()
.expect("entry should exist after bind+call");
assert_eq!(
entry.bound_count.load(Ordering::Relaxed),
1,
"bound_count should reflect a live handle"
);
drop(trackers);
drop(bound);
let trackers = last_value.value_map.trackers.read().unwrap();
let entry = trackers
.values()
.next()
.expect("entry should still exist post-drop");
assert_eq!(
entry.bound_count.load(Ordering::Relaxed),
0,
"bound_count should drop to 0 after handle drops"
);
}
}