use std::{
hash::{Hash, Hasher},
mem,
sync::Arc,
};
use fxhash::FxHashMap;
use metrics::{GaugeValue, Key, KeyHasher};
use metrics_util::{Generational, Handle, Hashable, MetricKind, NotTracked, Registry};
use parking_lot::{RwLock, RwLockReadGuard};
use elfo_core::{scope::Scope, ActorMeta, Addr};
use crate::protocol::{Metrics, Snapshot};
pub(crate) struct Storage {
registry: Registry<ExtKey, ExtHandle, NotTracked<ExtHandle>>,
descriptions: RwLock<FxHashMap<String, &'static str>>,
}
#[derive(Clone, PartialEq, Eq, Hash)]
struct ExtKey {
group: Addr, key_hash: u64,
}
fn make_ext_key(scope: Option<&Scope>, key: &Key, with_actor_key: bool) -> ExtKey {
let mut key_hash = key.get_hash();
if let Some(scope) = scope.filter(|_| with_actor_key) {
debug_assert!(!scope.telemetry_meta().key.is_empty());
let mut hasher = KeyHasher::default();
scope.telemetry_meta().key.hash(&mut hasher);
key_hash ^= hasher.finish();
}
let group = if let Some(scope) = scope {
debug_assert_ne!(scope.group(), Addr::NULL);
scope.group()
} else {
Addr::NULL
};
ExtKey { group, key_hash }
}
impl Hashable for ExtKey {
#[inline]
fn hashable(&self) -> u64 {
let mut hasher = KeyHasher::default();
self.hash(&mut hasher);
hasher.finish()
}
}
#[derive(Clone)]
struct ExtHandle {
meta: Option<Arc<ActorMeta>>, with_actor_key: bool,
key: Key,
handle: Handle,
}
fn make_ext_handle(
scope: Option<&Scope>,
key: &Key,
handle: Handle,
with_actor_key: bool,
) -> ExtHandle {
ExtHandle {
meta: scope.map(|scope| scope.telemetry_meta().clone()),
with_actor_key,
key: key.clone(),
handle,
}
}
impl Storage {
pub(crate) fn new() -> Self {
Self {
registry: Registry::<ExtKey, ExtHandle, NotTracked<ExtHandle>>::untracked(),
descriptions: Default::default(),
}
}
pub(crate) fn descriptions(&self) -> RwLockReadGuard<'_, FxHashMap<String, &'static str>> {
self.descriptions.read()
}
pub(crate) fn add_description_if_missing(&self, key: &Key, description: Option<&'static str>) {
if let Some(description) = description {
let mut descriptions = self.descriptions.write();
if !descriptions.contains_key(key.name().to_string().as_str()) {
descriptions.insert(key.name().to_string(), description);
}
}
}
pub(crate) fn touch_counter(&self, scope: Option<&Scope>, key: &Key, with_actor_key: bool) {
let ext_key = make_ext_key(scope, key, with_actor_key);
self.registry.op(
MetricKind::Counter,
&ext_key,
|_| {},
|| make_ext_handle(scope, key, Handle::counter(), with_actor_key),
);
}
pub(crate) fn touch_gauge(&self, scope: Option<&Scope>, key: &Key, with_actor_key: bool) {
let ext_key = make_ext_key(scope, key, with_actor_key);
self.registry.op(
MetricKind::Gauge,
&ext_key,
|_| {},
|| make_ext_handle(scope, key, Handle::gauge(), with_actor_key),
);
}
pub(crate) fn touch_histogram(&self, scope: Option<&Scope>, key: &Key, with_actor_key: bool) {
let ext_key = make_ext_key(scope, key, with_actor_key);
self.registry.op(
MetricKind::Histogram,
&ext_key,
|_| {},
|| make_ext_handle(scope, key, Handle::histogram(), with_actor_key),
);
}
pub(crate) fn increment_counter(
&self,
scope: Option<&Scope>,
key: &Key,
value: u64,
with_actor_key: bool,
) {
let ext_key = make_ext_key(scope, key, with_actor_key);
self.registry.op(
MetricKind::Counter,
&ext_key,
|h| h.handle.increment_counter(value),
|| make_ext_handle(scope, key, Handle::counter(), with_actor_key),
);
}
pub(crate) fn update_gauge(
&self,
scope: Option<&Scope>,
key: &Key,
value: GaugeValue,
with_actor_key: bool,
) {
let ext_key = make_ext_key(scope, key, with_actor_key);
self.registry.op(
MetricKind::Gauge,
&ext_key,
|h| h.handle.update_gauge(value),
|| make_ext_handle(scope, key, Handle::gauge(), with_actor_key),
);
}
pub(crate) fn record_histogram(
&self,
scope: Option<&Scope>,
key: &Key,
value: f64,
with_actor_key: bool,
) {
let ext_key = make_ext_key(scope, key, with_actor_key);
self.registry.op(
MetricKind::Histogram,
&ext_key,
|h| h.handle.record_histogram(value),
|| make_ext_handle(scope, key, Handle::histogram(), with_actor_key),
);
}
pub(crate) fn fill_snapshot(&self, snapshot: &mut Snapshot, only_histograms: bool) -> usize {
let mut histograms = Vec::new();
let mut estimated_size = 0;
self.registry.visit(|kind, (_, h)| {
if kind == MetricKind::Histogram {
histograms.push(h.get_inner().clone());
return;
}
if only_histograms {
return;
}
estimated_size += fill_metric(snapshot, h.get_inner());
});
for handle in histograms {
estimated_size += fill_metric(snapshot, &handle);
}
estimated_size
}
}
fn fill_metric(snapshot: &mut Snapshot, handle: &ExtHandle) -> usize {
let m = get_metrics(snapshot, handle);
let h = &handle.handle;
let estimated_size = match h {
Handle::Counter(_) => {
m.counters.insert(handle.key.clone(), h.read_counter());
8
}
Handle::Gauge(_) => {
m.gauges.insert(handle.key.clone(), h.read_gauge());
8
}
Handle::Histogram(_) => {
let mut bucket_len = 0;
let d = m.distributions.entry(handle.key.clone()).or_default();
h.read_histogram_with_clear(|samples| {
bucket_len += samples.len();
d.record_samples(samples);
});
d.estimated_size() + 8 * bucket_len
}
};
mem::size_of::<ExtKey>() + mem::size_of::<ExtHandle>() + estimated_size
}
fn get_metrics<'a>(snapshot: &'a mut Snapshot, handle: &ExtHandle) -> &'a mut Metrics {
if let Some(meta) = &handle.meta {
if handle.with_actor_key {
snapshot.per_actor.entry(meta.clone()).or_default()
} else if snapshot.per_group.contains_key(&meta.group) {
snapshot.per_group.get_mut(&meta.group).unwrap()
} else {
snapshot.per_group.entry(meta.group.clone()).or_default()
}
} else {
&mut snapshot.global
}
}