use crate::sdk::{
export::metrics::{
self, Accumulation, Aggregator, AggregatorSelector, CheckpointSet, Checkpointer,
ExportKind, ExportKindFor, LockedProcessor, Processor, Record, Subtractor,
},
metrics::aggregators::SumAggregator,
Resource,
};
use crate::{
labels::{hash_labels, LabelSet},
metrics::{Descriptor, MetricsError, Result},
};
use fnv::FnvHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::SystemTime;
/// Builds a [`BasicProcessor`] from the given selectors.
///
/// * `aggregator_selector` - chooses the aggregator allocated for a
///   descriptor whenever the processor needs one of its own.
/// * `export_selector` - decides the export kind for a descriptor, which in
///   turn determines whether state has to be kept between collections.
/// * `memory` - when `false`, entries that were not updated during the most
///   recently finished collection are skipped on export, and stale stateless
///   entries are dropped when a collection finishes.
pub fn basic(
    aggregator_selector: Box<dyn AggregatorSelector + Send + Sync>,
    export_selector: Box<dyn ExportKindFor + Send + Sync>,
    memory: bool,
) -> BasicProcessor {
    let state = Mutex::new(BasicProcessorState::with_memory(memory));

    BasicProcessor {
        aggregator_selector,
        export_selector,
        state,
    }
}
/// Metrics processor that keeps per-key aggregation state behind a mutex.
///
/// Use [`BasicProcessor::lock`] to obtain a `BasicLockedProcessor`, which is
/// the handle that actually processes accumulations and runs collections.
#[derive(Debug)]
pub struct BasicProcessor {
// Chooses the aggregator allocated for a given descriptor.
aggregator_selector: Box<dyn AggregatorSelector + Send + Sync>,
// Decides the export kind for a given descriptor.
export_selector: Box<dyn ExportKindFor + Send + Sync>,
// Mutable processor state; only reachable through the lock.
state: Mutex<BasicProcessorState>,
}
impl BasicProcessor {
    /// Acquires the state mutex and returns a handle bound to this processor.
    ///
    /// # Errors
    ///
    /// Fails when the state mutex cannot be locked; the lock error is
    /// converted into the crate's metrics error type.
    pub fn lock(&self) -> Result<BasicLockedProcessor<'_>> {
        let guard = self.state.lock()?;

        Ok(BasicLockedProcessor {
            parent: self,
            state: guard,
        })
    }
}
impl Processor for BasicProcessor {
    /// Returns the aggregator selector this processor was built with.
    fn aggregation_selector(&self) -> &dyn AggregatorSelector {
        // Reborrow the boxed selector; the `Send + Sync` bounds are dropped
        // by the coercion to the return type.
        &*self.aggregator_selector
    }
}
/// A [`BasicProcessor`] whose state mutex is currently held.
///
/// The mutex guard lives inside this value, so the processor state stays
/// locked for as long as the value exists.
#[derive(Debug)]
pub struct BasicLockedProcessor<'a> {
// The processor this handle was locked from; gives access to the selectors.
parent: &'a BasicProcessor,
// Guard over the processor's mutable state.
state: MutexGuard<'a, BasicProcessorState>,
}
impl<'a> LockedProcessor for BasicLockedProcessor<'a> {
/// Folds one `Accumulation` into the locked processor state.
///
/// Entries are keyed by an FNV hash over the descriptor's attribute hash,
/// the accumulation's labels and its resource.
///
/// # Errors
///
/// Returns `MetricsError::InconsistentState` unless a collection has been
/// started and not yet finished, and propagates any error returned by the
/// aggregators' `synchronized_move` or `merge` calls.
fn process(&mut self, accumulation: Accumulation<'_>) -> Result<()> {
// Processing is only valid while exactly one collection is in flight,
// i.e. started == finished + 1 (with wrapping arithmetic).
if self.state.started_collection != self.state.finished_collection.wrapping_add(1) {
return Err(MetricsError::InconsistentState);
}
let desc = accumulation.descriptor();
// Build the state key from the descriptor's attribute hash, then the labels,
// then the resource.
let mut hasher = FnvHasher::default();
desc.attribute_hash().hash(&mut hasher);
hash_labels(&mut hasher, accumulation.labels().into_iter());
hash_labels(&mut hasher, accumulation.resource().into_iter());
let key = StateKey(hasher.finish());
let agg = accumulation.aggregator();
// Entries touched during the in-flight collection are stamped with the
// current `finished_collection` value.
let finished_collection = self.state.finished_collection;
// Case 1: an entry for this key already exists.
if let Some(value) = self.state.values.get_mut(&key) {
// `same_collection` is true when this key was already processed earlier
// in the in-flight collection (more than one accumulation for the key).
let same_collection = finished_collection == value.updated;
value.updated = finished_collection;
if !same_collection {
// First accumulation for this key in this collection.
if !value.current_owned {
// `current` is not owned by the processor: just keep a reference to
// the incoming aggregator instead of copying any state.
value.current = agg.clone();
return Ok(());
}
// `current` is owned by the processor.
// NOTE(review): presumably this transfers `agg`'s state into
// `value.current` — confirm against `Aggregator::synchronized_move`.
return agg.synchronized_move(&value.current, desc);
}
// The key was already seen in this collection. Before merging, make sure
// `current` is an aggregator allocated by the selector rather than the
// shared reference stored earlier.
if !value.current_owned {
let tmp = value.current.clone();
// If the selector yields no aggregator, `current` is left untouched and
// the merge below is applied to the previously stored aggregator.
if let Some(current) = self.parent.aggregation_selector().aggregator_for(desc) {
value.current = current;
value.current_owned = true;
tmp.synchronized_move(&value.current, &desc)?;
}
}
// Combine this accumulation with what `current` already holds.
return value.current.merge(agg.as_ref(), desc);
}
// Case 2: first time this key is seen. Ask the export selector whether the
// export kind requires memory for this instrument kind.
let stateful = self
.parent
.export_selector
.export_kind_for(&desc)
.memory_required(desc.instrument_kind());
let mut delta = None;
// Stateful entries always get a cumulative aggregator; precomputed-sum
// instruments additionally get a delta aggregator.
let cumulative = if stateful {
if desc.instrument_kind().precomputed_sum() {
delta = self.parent.aggregation_selector().aggregator_for(desc);
}
self.parent.aggregation_selector().aggregator_for(desc)
} else {
None
};
// Store the new entry; `current` starts out as a shared reference to the
// accumulation's aggregator (`current_owned: false`).
self.state.values.insert(
key,
StateValue {
descriptor: desc.clone(),
labels: accumulation.labels().clone(),
resource: accumulation.resource().clone(),
current_owned: false,
current: agg.clone(),
delta,
cumulative,
stateful,
updated: finished_collection,
},
);
Ok(())
}
}
impl Checkpointer for BasicLockedProcessor<'_> {
    /// Exposes the locked state as the checkpoint set to export from.
    fn checkpoint_set(&mut self) -> &mut dyn CheckpointSet {
        &mut *self.state
    }

    /// Marks the beginning of a collection.
    ///
    /// For every collection after the first, the new interval starts where
    /// the previous one ended.
    fn start_collection(&mut self) {
        let state = &mut *self.state;

        if state.started_collection != 0 {
            state.interval_start = state.interval_end;
        }
        state.started_collection = state.started_collection.wrapping_add(1);
    }

    /// Marks the end of a collection and refreshes stateful aggregators.
    ///
    /// The interval end timestamp is recorded first, even when the call then
    /// fails the consistency check.
    ///
    /// # Errors
    ///
    /// Returns `MetricsError::InconsistentState` when no collection is in
    /// flight, `MetricsError::NoSubtraction` when a stateful precomputed-sum
    /// entry's current aggregator is not a `SumAggregator`, and otherwise the
    /// first error reported by an aggregator. After the first error the
    /// remaining entries are kept untouched.
    fn finish_collection(&mut self) -> Result<()> {
        let state = &mut *self.state;

        state.interval_end = crate::time::now();

        let finished_collection = state.finished_collection;
        if state.started_collection != finished_collection.wrapping_add(1) {
            return Err(MetricsError::InconsistentState);
        }
        state.finished_collection = finished_collection.wrapping_add(1);

        let has_memory = state.config.memory;
        let mut outcome = Ok(());

        state.values.retain(|_key, value| {
            // A previous entry failed: keep everything else as it is.
            if outcome.is_err() {
                return true;
            }

            let kind = value.descriptor.instrument_kind();
            let stale = value.updated != finished_collection;
            let stateless = !value.stateful;

            // Only fresh, stateful entries need their aggregators updated.
            if stale || stateless {
                // Without memory, entries that are both stale and stateless
                // are evicted; everything else is retained.
                let evict = stale && stateless && !has_memory;
                return !evict;
            }

            outcome = if kind.precomputed_sum() {
                match value.current.as_any().downcast_ref::<SumAggregator>() {
                    None => Err(MetricsError::NoSubtraction),
                    Some(subtractor) => {
                        match (value.cumulative.as_ref(), value.delta.as_ref()) {
                            // Subtract into `delta`, then move `current` into
                            // `cumulative`.
                            (Some(cumulative), Some(delta)) => subtractor
                                .subtract(cumulative.as_ref(), delta.as_ref(), &value.descriptor)
                                .and_then(|_| {
                                    value
                                        .current
                                        .synchronized_move(cumulative, &value.descriptor)
                                }),
                            _ => Ok(()),
                        }
                    }
                }
            } else {
                match value.cumulative.as_ref() {
                    // Fold the current aggregator into the cumulative one.
                    Some(cumulative) => {
                        cumulative.merge(value.current.as_ref(), &value.descriptor)
                    }
                    None => Ok(()),
                }
            };

            true
        });

        outcome
    }
}
/// Configuration flags for the basic processor state.
#[derive(Debug, Default)]
struct BasicProcessorConfig {
// When false, entries not updated in the most recently finished collection
// are skipped on export, and stale stateless entries are removed when a
// collection finishes.
memory: bool,
}
/// Mutable state of a `BasicProcessor`, guarded by the processor's mutex.
#[derive(Debug)]
struct BasicProcessorState {
// Processor configuration (currently only the `memory` flag).
config: BasicProcessorConfig,
// Aggregation state per hashed (descriptor, labels, resource) key.
values: HashMap<StateKey, StateValue>,
// Timestamp taken when the state was created; start time for cumulative exports.
process_start: SystemTime,
// Start of the current interval; start time for delta exports.
interval_start: SystemTime,
// Timestamp taken when the latest collection was finished.
interval_end: SystemTime,
// Number of collections started (wrapping counter).
started_collection: u64,
// Number of collections finished (wrapping counter).
finished_collection: u64,
}
impl BasicProcessorState {
fn with_memory(memory: bool) -> Self {
let mut state = BasicProcessorState::default();
state.config.memory = memory;
state
}
}
impl Default for BasicProcessorState {
    /// Empty state with default configuration, zeroed collection counters
    /// and all three timestamps initialised from the current time.
    fn default() -> Self {
        // Each timestamp is sampled separately, in declaration order.
        let process_start = crate::time::now();
        let interval_start = crate::time::now();
        let interval_end = crate::time::now();

        Self {
            config: BasicProcessorConfig::default(),
            values: HashMap::new(),
            process_start,
            interval_start,
            interval_end,
            started_collection: 0,
            finished_collection: 0,
        }
    }
}
impl CheckpointSet for BasicProcessorState {
    /// Calls `f` with one record per stored entry, stopping at the first
    /// error.
    ///
    /// `exporter` decides, per descriptor, whether the cumulative or the
    /// delta view of the entry is exported. A `MetricsError::NoDataCollected`
    /// returned by `f` is ignored and iteration continues.
    ///
    /// # Errors
    ///
    /// Returns `MetricsError::InconsistentState` when a collection is still
    /// in flight, or the first other error returned by `f`.
    fn try_for_each(
        &mut self,
        exporter: &dyn ExportKindFor,
        f: &mut dyn FnMut(&Record<'_>) -> Result<()>,
    ) -> Result<()> {
        if self.started_collection != self.finished_collection {
            return Err(MetricsError::InconsistentState);
        }

        for value in self.values.values() {
            let instrument_kind = value.descriptor.instrument_kind();

            // Without memory, only entries updated during the most recently
            // finished collection are visited.
            if !self.config.memory && value.updated != self.finished_collection.wrapping_sub(1) {
                continue;
            }

            let (agg, start) = match exporter.export_kind_for(&value.descriptor) {
                ExportKind::Cumulative => {
                    // Stateful entries export the cumulative aggregator,
                    // stateless ones export `current` directly.
                    let agg = if value.stateful {
                        value.cumulative.as_ref()
                    } else {
                        Some(&value.current)
                    };
                    (agg, self.process_start)
                }
                ExportKind::Delta => {
                    // Precomputed sums export the delta aggregator instead
                    // of `current`.
                    let agg = if instrument_kind.precomputed_sum() {
                        value.delta.as_ref()
                    } else {
                        Some(&value.current)
                    };
                    (agg, self.interval_start)
                }
            };

            let record = metrics::record(
                &value.descriptor,
                &value.labels,
                &value.resource,
                agg,
                start,
                self.interval_end,
            );

            match f(&record) {
                // An entry without data is not a failure of the whole export.
                Err(MetricsError::NoDataCollected) => {}
                outcome => outcome?,
            }
        }

        Ok(())
    }
}
/// Map key for processor state: the 64-bit hash computed from a descriptor's
/// attribute hash, a label set and a resource.
#[derive(Debug, PartialEq, Eq, Hash)]
struct StateKey(u64);
/// Aggregation state tracked for one `StateKey`.
#[derive(Debug)]
struct StateValue {
// Instrument descriptor, cloned from the first accumulation for this key.
descriptor: Descriptor,
// Label set, cloned from the first accumulation for this key.
labels: LabelSet,
// Resource, cloned from the first accumulation for this key.
resource: Resource,
// Value of `finished_collection` at the time this entry was last processed.
updated: u64,
// Whether the export kind requires memory for this instrument kind.
stateful: bool,
// True once `current` has been replaced by an aggregator allocated from the
// selector; false while it still refers to an accumulation's aggregator.
current_owned: bool,
// Aggregator holding the data of the current collection.
current: Arc<dyn Aggregator + Send + Sync>,
// Delta aggregator; only allocated for stateful precomputed-sum instruments.
delta: Option<Arc<dyn Aggregator + Send + Sync>>,
// Cumulative aggregator; only allocated for stateful entries.
cumulative: Option<Arc<dyn Aggregator + Send + Sync>>,
}