use core::fmt;
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
sync::{Arc, Mutex},
};
use opentelemetry::{otel_debug, otel_warn, InstrumentationScope, KeyValue};
use crate::{
error::{OTelSdkError, OTelSdkResult},
metrics::{
aggregation,
data::{Metric, ResourceMetrics, ScopeMetrics},
error::{MetricError, MetricResult},
instrument::{Instrument, InstrumentId, InstrumentKind, Stream},
internal::{self, AggregateBuilder, Number},
reader::{MetricReader, SdkProducer},
view::View,
},
Resource,
};
use self::internal::AggregateFns;
use super::{aggregation::Aggregation, Temporality};
/// Connects the instruments created by a meter provider to a single
/// [`MetricReader`], holding the per-scope aggregation state in `inner`.
#[doc(hidden)]
pub struct Pipeline {
    /// Resource attached to every [`ResourceMetrics`] this pipeline produces.
    pub(crate) resource: Resource,
    /// Reader that collects from (and drives flush/shutdown of) this pipeline.
    reader: Box<dyn MetricReader>,
    /// Views used to customize the metric streams created from instruments.
    views: Vec<Arc<dyn View>>,
    /// Mutable state (aggregations and observable callbacks), mutex-guarded.
    inner: Mutex<PipelineInner>,
}
impl fmt::Debug for Pipeline {
    /// The reader and views are trait objects without `Debug`, so only the
    /// type name is printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Pipeline")
    }
}
/// Type-erased observable-instrument callback registered on a pipeline.
type GenericCallback = Arc<dyn Fn() + Send + Sync>;
/// Cap on tracked attribute sets per instrument when a `Stream` does not
/// specify its own `cardinality_limit`.
const DEFAULT_CARDINALITY_LIMIT: usize = 2000;
/// Mutable state of a [`Pipeline`]: per-scope instrument aggregations plus
/// the observable callbacks to invoke before each collection.
#[derive(Default)]
struct PipelineInner {
    /// Instrument syncs grouped by the instrumentation scope that created them.
    aggregations: HashMap<InstrumentationScope, Vec<InstrumentSync>>,
    /// Callbacks for observable instruments; run at the start of `produce`.
    callbacks: Vec<GenericCallback>,
}
impl fmt::Debug for PipelineInner {
    /// Callbacks are opaque closures, so only their count is shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("PipelineInner");
        builder.field("aggregations", &self.aggregations);
        builder.field("callbacks", &self.callbacks.len());
        builder.finish()
    }
}
impl Pipeline {
    /// Registers an instrument's aggregation under its instrumentation scope.
    ///
    /// A poisoned lock is silently skipped, keeping registration best-effort.
    fn add_sync(&self, scope: InstrumentationScope, i_sync: InstrumentSync) {
        if let Ok(mut guard) = self.inner.lock() {
            guard.aggregations.entry(scope).or_default().push(i_sync);
        }
    }

    /// Registers an observable callback to run before each collection.
    ///
    /// A poisoned lock is silently skipped, keeping registration best-effort.
    fn add_callback(&self, callback: GenericCallback) {
        if let Ok(mut guard) = self.inner.lock() {
            guard.callbacks.push(callback);
        }
    }

    /// Flushes this pipeline's reader.
    fn force_flush(&self) -> OTelSdkResult {
        self.reader.force_flush()
    }

    /// Shuts down this pipeline's reader.
    fn shutdown(&self) -> OTelSdkResult {
        self.reader.shutdown()
    }
}
impl SdkProducer for Pipeline {
    /// Collects all metrics from this pipeline into `rm`, reusing the
    /// allocations already present in `rm` from a previous collection.
    fn produce(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
        let inner = self
            .inner
            .lock()
            .map_err(|_| OTelSdkError::InternalFailure("Failed to lock pipeline".into()))?;
        otel_debug!(
            name: "MeterProviderInvokingObservableCallbacks",
            count = inner.callbacks.len(),
        );
        // Run observable callbacks first so their measurements are included
        // in the aggregations collected below.
        for cb in &inner.callbacks {
            cb();
        }
        rm.resource = self.resource.clone();
        // Pre-grow the reused scope_metrics vec up to the number of scopes.
        if inner.aggregations.len() > rm.scope_metrics.len() {
            rm.scope_metrics
                .reserve(inner.aggregations.len() - rm.scope_metrics.len());
        }
        // `i` counts scopes that actually produced data; entries past `i`
        // are stale leftovers and get truncated at the end.
        let mut i = 0;
        for (scope, instruments) in inner.aggregations.iter() {
            // Reuse the ScopeMetrics slot at `i` when present, else push one.
            let sm = match rm.scope_metrics.get_mut(i) {
                Some(sm) => sm,
                None => {
                    rm.scope_metrics.push(ScopeMetrics::default());
                    rm.scope_metrics.last_mut().unwrap()
                }
            };
            if instruments.len() > sm.metrics.len() {
                sm.metrics.reserve(instruments.len() - sm.metrics.len());
            }
            // `j` counts instruments in this scope that produced data points.
            let mut j = 0;
            for inst in instruments {
                let mut m = sm.metrics.get_mut(j);
                // `call` takes the reusable aggregation slot (if any) and
                // returns a (count, optional replacement aggregation) pair;
                // `count > 0` means the instrument has data to export.
                match (inst.comp_agg.call(m.as_mut().map(|m| &mut m.data)), m) {
                    // No slot to reuse: push a fresh Metric when data exists.
                    ((len, Some(initial_agg)), None) if len > 0 => sm.metrics.push(Metric {
                        name: inst.name.clone(),
                        description: inst.description.clone(),
                        unit: inst.unit.clone(),
                        data: initial_agg,
                    }),
                    // Reusable slot: refresh its metadata, and its data only
                    // when `call` handed back a replacement.
                    ((len, data), Some(prev_agg)) if len > 0 => {
                        if let Some(data) = data {
                            prev_agg.data = data;
                        }
                        prev_agg.name.clone_from(&inst.name);
                        prev_agg.description.clone_from(&inst.description);
                        prev_agg.unit.clone_from(&inst.unit);
                    }
                    // No data points: skip without advancing `j`, so the slot
                    // stays available for the next instrument.
                    _ => continue,
                }
                j += 1;
            }
            // Drop stale metrics left over from a previous, larger collection.
            sm.metrics.truncate(j);
            if !sm.metrics.is_empty() {
                sm.scope = scope.clone();
                i += 1;
            }
        }
        // Drop stale scopes left over from a previous, larger collection.
        rm.scope_metrics.truncate(i);
        Ok(())
    }
}
/// Link between an instrument and the pipeline: the metadata of the metric it
/// produces plus the aggregation that computes that metric's data.
struct InstrumentSync {
    name: Cow<'static, str>,
    description: Cow<'static, str>,
    unit: Cow<'static, str>,
    /// Computes the aggregated data for this instrument during `produce`.
    comp_agg: Arc<dyn internal::ComputeAggregation>,
}
impl fmt::Debug for InstrumentSync {
    /// `comp_agg` is a trait object without `Debug`, so it is omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("InstrumentSync");
        builder
            .field("name", &self.name)
            .field("description", &self.description)
            .field("unit", &self.unit);
        builder.finish()
    }
}
/// Cache of measures keyed by instrument identity. An entry may hold an error
/// (incompatible configuration) or `None` (the stream produces no measure).
type Cache<T> = Mutex<HashMap<InstrumentId, MetricResult<Option<Arc<dyn internal::Measure<T>>>>>>;
/// Inserts instruments into a single pipeline, de-duplicating streams through
/// `aggregators` and logging naming conflicts through the shared `views` map.
struct Inserter<T> {
    /// Per-pipeline cache of created measures, keyed by instrument id.
    aggregators: Cache<T>,
    /// Provider-wide registry of seen instrument ids (keyed by lower-cased
    /// name), shared across inserters for duplicate-definition logging.
    views: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>,
    /// Pipeline this inserter registers aggregations with.
    pipeline: Arc<Pipeline>,
}
impl<T> Inserter<T>
where
    T: Number,
{
    /// Creates an inserter for pipeline `p`, sharing the provider-wide view
    /// cache `vc` used for duplicate-definition logging.
    fn new(p: Arc<Pipeline>, vc: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>) -> Self {
        Inserter {
            aggregators: Default::default(),
            views: vc,
            pipeline: Arc::clone(&p),
        }
    }

    /// Returns the measures `inst` should record through in this pipeline.
    ///
    /// Every registered view matching `inst` contributes one measure
    /// (de-duplicated by instrument id). When no view matches, a default
    /// stream derived from the instrument is used; `boundaries`, when given,
    /// select explicit histogram buckets for histogram instruments.
    fn instrument(
        &self,
        inst: Instrument,
        boundaries: Option<&[f64]>,
    ) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
        let mut matched = false;
        let mut measures = vec![];
        let mut errs = vec![];
        let kind = inst.kind;
        // Ids already handled, so two views producing the same stream do not
        // create duplicate measures.
        let mut seen = HashSet::new();
        for v in &self.pipeline.views {
            let mut stream = match v.match_inst(&inst) {
                Some(stream) => stream,
                None => continue,
            };
            matched = true;
            // Backfill any stream fields the view left unset from the
            // instrument's own metadata.
            if stream.name.is_none() {
                stream.name = Some(inst.name.clone());
            }
            if stream.description.is_none() {
                stream.description = Some(inst.description.clone());
            }
            if stream.unit.is_none() {
                stream.unit = Some(inst.unit.clone());
            }
            // Supplied boundaries apply only to histogram instruments whose
            // view did not already choose an aggregation.
            if let Some(boundaries) = boundaries {
                if kind == InstrumentKind::Histogram && stream.aggregation.is_none() {
                    stream.aggregation = Some(Aggregation::ExplicitBucketHistogram {
                        boundaries: boundaries.to_vec(),
                        record_min_max: true,
                    });
                }
            }
            let id = self.inst_id(kind, &stream);
            if seen.contains(&id) {
                continue;
            }
            let agg = match self.cached_aggregator(&inst.scope, kind, stream) {
                Ok(Some(agg)) => agg,
                // `None`: the resolved aggregation produces no measure.
                Ok(None) => continue,
                Err(err) => {
                    errs.push(err);
                    continue;
                }
            };
            seen.insert(id);
            measures.push(agg);
        }
        // Some view matched and either nothing failed or at least one stream
        // succeeded: return what we have.
        if matched && (errs.is_empty() || !measures.is_empty()) {
            return Ok(measures);
        }
        // Every matching view failed; warn and fall through to the default
        // stream below.
        if matched && !errs.is_empty() {
            otel_warn!(
                name: "ViewAggregationIncompatible",
                message = "All matching views have incompatible aggregation. Falling back to default aggregation.",
                instrument_name = inst.name.as_ref(),
                errors = format!("{errs:?}").as_str(),
            );
        }
        // Default stream: mirrors the instrument's own metadata.
        let mut stream = Stream {
            name: Some(inst.name),
            description: Some(inst.description),
            unit: Some(inst.unit),
            aggregation: None,
            allowed_attribute_keys: None,
            cardinality_limit: None,
        };
        if let Some(boundaries) = boundaries {
            stream.aggregation = Some(Aggregation::ExplicitBucketHistogram {
                boundaries: boundaries.to_vec(),
                record_min_max: true,
            });
        }
        match self.cached_aggregator(&inst.scope, kind, stream) {
            Ok(agg) => {
                if let Some(agg) = agg {
                    measures.push(agg);
                }
                Ok(measures)
            }
            Err(err) => {
                errs.push(err);
                Err(MetricError::Other(format!("{errs:?}")))
            }
        }
    }

    /// Returns the measure for `stream`, creating and caching it on first use.
    ///
    /// Returns `Ok(None)` when the resolved aggregation yields no measure,
    /// and an error when `stream`'s aggregation is incompatible with `kind`.
    fn cached_aggregator(
        &self,
        scope: &InstrumentationScope,
        kind: InstrumentKind,
        mut stream: Stream,
    ) -> MetricResult<Option<Arc<dyn internal::Measure<T>>>> {
        let mut agg = stream
            .aggregation
            .take()
            .unwrap_or_else(|| default_aggregation_selector(kind));
        // Resolve `Default` to the concrete per-kind aggregation.
        if matches!(agg, aggregation::Aggregation::Default) {
            agg = default_aggregation_selector(kind);
        }
        if let Err(err) = is_aggregator_compatible(&kind, &agg) {
            // NOTE(review): `stream.aggregation` was take()n above, so this
            // always formats as `None`; printing `agg` may be the intent —
            // confirm before changing the message.
            return Err(MetricError::Other(format!(
                "creating aggregator with instrumentKind: {:?}, aggregation {:?}: {:?}",
                kind, stream.aggregation, err,
            )));
        }
        let mut id = self.inst_id(kind, &stream);
        // Log (but do not fail) if a stream with the same name was already
        // registered with different metadata.
        self.log_conflict(&id);
        id.normalize();
        let mut cache = self.aggregators.lock()?;
        // Cache hit returns the previously created measure (or error) for
        // this id; a miss builds the aggregation and registers its collect
        // half with the pipeline.
        let cached = cache.entry(id).or_insert_with(|| {
            let filter = stream
                .allowed_attribute_keys
                .clone()
                .map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>);
            let cardinality_limit = stream
                .cardinality_limit
                .unwrap_or(DEFAULT_CARDINALITY_LIMIT);
            let b = AggregateBuilder::new(
                self.pipeline.reader.temporality(kind),
                filter,
                cardinality_limit,
            );
            let AggregateFns { measure, collect } = match aggregate_fn(b, &agg, kind) {
                Ok(Some(inst)) => inst,
                // Err, or Ok(None): propagate as the cached value; no
                // instrument sync is registered.
                other => return other.map(|fs| fs.map(|inst| inst.measure)),
            };
            otel_debug!(
                name : "Metrics.InstrumentCreated",
                instrument_name = stream.name.clone().unwrap_or_default().as_ref(),
                cardinality_limit = cardinality_limit,
            );
            // Register the collect half so `produce` exports this
            // instrument's data.
            self.pipeline.add_sync(
                scope.clone(),
                InstrumentSync {
                    name: stream.name.unwrap_or_default(),
                    description: stream.description.unwrap_or_default(),
                    unit: stream.unit.unwrap_or_default(),
                    comp_agg: collect,
                },
            );
            Ok(Some(measure))
        });
        match cached {
            Ok(opt) => Ok(opt.clone()),
            Err(err) => Err(MetricError::Other(err.to_string())),
        }
    }

    /// Logs a debug event when `id`'s (lower-cased) name is already present
    /// in the shared registry with different metadata; identical
    /// re-registrations are silent.
    fn log_conflict(&self, id: &InstrumentId) {
        if let Ok(views) = self.views.lock() {
            if let Some(existing) = views.get(id.name.to_lowercase().as_str()) {
                if existing == id {
                    return;
                }
                otel_debug!(
                    name: "Instrument.DuplicateMetricStreamDefinitions",
                    message = "duplicate metric stream definitions",
                    reason = format!("names: ({} and {}), descriptions: ({} and {}), kinds: ({:?} and {:?}), units: ({:?} and {:?}), and numbers: ({} and {})",
                    existing.name, id.name,
                    existing.description, id.description,
                    existing.kind, id.kind,
                    existing.unit, id.unit,
                    existing.number, id.number,)
                );
            }
        }
    }

    /// Builds the identity of a stream of kind `kind`. The numeric type `T`
    /// is part of the id, so the same name recorded with different number
    /// types yields distinct streams.
    fn inst_id(&self, kind: InstrumentKind, stream: &Stream) -> InstrumentId {
        InstrumentId {
            name: stream.name.clone().unwrap_or_default(),
            description: stream.description.clone().unwrap_or_default(),
            kind,
            unit: stream.unit.clone().unwrap_or_default(),
            number: Cow::Borrowed(std::any::type_name::<T>()),
        }
    }
}
fn default_aggregation_selector(kind: InstrumentKind) -> Aggregation {
match kind {
InstrumentKind::Counter
| InstrumentKind::UpDownCounter
| InstrumentKind::ObservableCounter
| InstrumentKind::ObservableUpDownCounter => Aggregation::Sum,
InstrumentKind::Gauge => Aggregation::LastValue,
InstrumentKind::ObservableGauge => Aggregation::LastValue,
InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram {
boundaries: vec![
0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0,
5000.0, 7500.0, 10000.0,
],
record_min_max: true,
},
}
}
/// Builds the measure/collect function pair for `agg` applied to instruments
/// of kind `kind`; `Ok(None)` means the stream produces no functions at all.
fn aggregate_fn<T: Number>(
    b: AggregateBuilder<T>,
    agg: &aggregation::Aggregation,
    kind: InstrumentKind,
) -> MetricResult<Option<AggregateFns<T>>> {
    match agg {
        // `Default` defers to the per-kind selector and recurses once (the
        // selector never returns `Default`, so recursion terminates).
        Aggregation::Default => aggregate_fn(b, &default_aggregation_selector(kind), kind),
        Aggregation::Drop => Ok(None),
        Aggregation::LastValue => {
            match kind {
                InstrumentKind::Gauge => Ok(Some(b.last_value(None))),
                // ObservableGauge overrides the builder with delta
                // temporality here — presumably because observed values are
                // re-reported each collection; confirm against
                // `AggregateBuilder::last_value`.
                InstrumentKind::ObservableGauge => Ok(Some(b.last_value(Some(Temporality::Delta)))),
                _ => Err(MetricError::Other(format!("LastValue aggregation is only available for Gauge or ObservableGauge, but not for {kind:?}")))
            }
        }
        Aggregation::Sum => {
            // The bool argument appears to mark monotonicity (true for
            // Counter/Histogram/ObservableCounter) — TODO confirm against
            // `AggregateBuilder::sum` / `precomputed_sum`.
            let fns = match kind {
                InstrumentKind::ObservableCounter => b.precomputed_sum(true),
                InstrumentKind::ObservableUpDownCounter => b.precomputed_sum(false),
                InstrumentKind::Counter | InstrumentKind::Histogram => b.sum(true),
                _ => b.sum(false),
            };
            Ok(Some(fns))
        }
        Aggregation::ExplicitBucketHistogram {
            boundaries,
            record_min_max,
        } => {
            // Sums are not recorded for kinds whose values can decrease.
            let record_sum = !matches!(
                kind,
                InstrumentKind::UpDownCounter
                    | InstrumentKind::ObservableUpDownCounter
                    | InstrumentKind::ObservableGauge
            );
            Ok(Some(b.explicit_bucket_histogram(
                boundaries.to_vec(),
                *record_min_max,
                record_sum,
            )))
        }
        Aggregation::Base2ExponentialHistogram {
            max_size,
            max_scale,
            record_min_max,
        } => {
            // Same record_sum rule as the explicit-bucket histogram above.
            let record_sum = !matches!(
                kind,
                InstrumentKind::UpDownCounter
                    | InstrumentKind::ObservableUpDownCounter
                    | InstrumentKind::ObservableGauge
            );
            Ok(Some(b.exponential_bucket_histogram(
                *max_size,
                *max_scale,
                *record_min_max,
                record_sum,
            )))
        }
    }
}
/// Checks that `agg` may legally be paired with instruments of kind `kind`.
fn is_aggregator_compatible(
    kind: &InstrumentKind,
    agg: &aggregation::Aggregation,
) -> MetricResult<()> {
    let compatible = match agg {
        // `Default` resolves per kind and `Drop` discards everything, so
        // both pair with any instrument kind.
        Aggregation::Default | Aggregation::Drop => true,
        Aggregation::ExplicitBucketHistogram { .. }
        | Aggregation::Base2ExponentialHistogram { .. } => matches!(
            kind,
            InstrumentKind::Counter
                | InstrumentKind::UpDownCounter
                | InstrumentKind::Gauge
                | InstrumentKind::Histogram
                | InstrumentKind::ObservableCounter
                | InstrumentKind::ObservableUpDownCounter
                | InstrumentKind::ObservableGauge
        ),
        Aggregation::Sum => matches!(
            kind,
            InstrumentKind::ObservableCounter
                | InstrumentKind::ObservableUpDownCounter
                | InstrumentKind::Counter
                | InstrumentKind::Histogram
                | InstrumentKind::UpDownCounter
        ),
        Aggregation::LastValue => {
            matches!(kind, InstrumentKind::Gauge | InstrumentKind::ObservableGauge)
        }
    };
    if compatible {
        Ok(())
    } else {
        Err(MetricError::Other("incompatible aggregation".into()))
    }
}
/// The group of pipelines (one per registered reader) that a meter provider
/// fans instrument registrations and lifecycle calls out to.
#[derive(Clone, Debug)]
pub(crate) struct Pipelines(pub(crate) Vec<Arc<Pipeline>>);
impl Pipelines {
    /// Builds one pipeline per reader (all sharing `res` and `views`) and
    /// registers each pipeline with its reader.
    pub(crate) fn new(
        res: Resource,
        readers: Vec<Box<dyn MetricReader>>,
        views: Vec<Arc<dyn View>>,
    ) -> Self {
        let mut pipes = Vec::with_capacity(readers.len());
        for r in readers {
            let p = Arc::new(Pipeline {
                resource: res.clone(),
                reader: r,
                views: views.clone(),
                inner: Default::default(),
            });
            // The reader only keeps a weak reference back to the pipeline.
            p.reader.register_pipeline(Arc::downgrade(&p));
            pipes.push(p);
        }
        Pipelines(pipes)
    }

    /// Registers a single observable callback on every pipeline.
    pub(crate) fn register_callback<F>(&self, callback: F)
    where
        F: Fn() + Send + Sync + 'static,
    {
        let cb = Arc::new(callback);
        for pipe in &self.0 {
            pipe.add_callback(cb.clone())
        }
    }

    /// Flushes every pipeline, reporting all failures together.
    pub(crate) fn force_flush(&self) -> OTelSdkResult {
        Self::fold_results(self.0.iter().map(|pipeline| pipeline.force_flush()))
    }

    /// Shuts down every pipeline, reporting all failures together.
    pub(crate) fn shutdown(&self) -> OTelSdkResult {
        Self::fold_results(self.0.iter().map(|pipeline| pipeline.shutdown()))
    }

    /// Folds per-pipeline results into one: every pipeline is attempted (no
    /// early exit), and all collected errors are reported in a single
    /// `InternalFailure`. Shared by `force_flush` and `shutdown`, which
    /// previously duplicated this loop with inconsistent error paths.
    fn fold_results(results: impl Iterator<Item = OTelSdkResult>) -> OTelSdkResult {
        let errs: Vec<_> = results.filter_map(Result::err).collect();
        if errs.is_empty() {
            Ok(())
        } else {
            Err(OTelSdkError::InternalFailure(format!("{errs:?}")))
        }
    }
}
/// Resolves the measures for an instrument across every pipeline of a meter
/// provider, using one [`Inserter`] per pipeline.
pub(crate) struct Resolver<T> {
    /// One inserter per pipeline, in the same order as `Pipelines.0`.
    inserters: Vec<Inserter<T>>,
}
impl<T> Resolver<T>
where
    T: Number,
{
    /// Creates one inserter per pipeline, all sharing `view_cache` for
    /// duplicate-definition logging.
    pub(crate) fn new(
        pipelines: Arc<Pipelines>,
        view_cache: Arc<Mutex<HashMap<Cow<'static, str>, InstrumentId>>>,
    ) -> Self {
        let inserters = pipelines
            .0
            .iter()
            .map(|pipe| Inserter::new(Arc::clone(pipe), Arc::clone(&view_cache)))
            .collect();
        Resolver { inserters }
    }

    /// Resolves the measures `id` should record through, across all
    /// pipelines; `boundaries`, when given, select explicit histogram
    /// buckets.
    ///
    /// An empty vector is a valid result (e.g. every stream was dropped).
    /// A failure in any pipeline fails the whole resolution.
    pub(crate) fn measures(
        &self,
        id: Instrument,
        boundaries: Option<Vec<f64>>,
    ) -> MetricResult<Vec<Arc<dyn internal::Measure<T>>>> {
        let (mut measures, mut errs) = (vec![], vec![]);
        for inserter in &self.inserters {
            match inserter.instrument(id.clone(), boundaries.as_deref()) {
                Ok(ms) => measures.extend(ms),
                Err(err) => errs.push(err),
            }
        }
        if errs.is_empty() {
            // (A dead, empty `if measures.is_empty() {}` block — likely a
            // leftover logging site — was removed here.)
            Ok(measures)
        } else {
            Err(MetricError::Other(format!("{errs:?}")))
        }
    }
}