use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, PoisonError};
use std::time::Duration;
use std::{collections::HashMap, ops::DerefMut};
use metrics::{Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn};
use quanta::{Clock, Instant};
use crate::Hashable;
use crate::{
kind::MetricKindMask,
registry::{AtomicStorage, Registry, Storage},
MetricKind,
};
/// A snapshot of the generation counter of a [`Generational`] handle.
///
/// Values are only meaningful when compared against other snapshots taken from the same
/// handle: a different value means at least one update went through
/// [`Generational::with_increment`] in between.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct Generation(usize);

/// A wrapper that pairs a value with a generation counter bumped on every update.
///
/// The counter lives behind an [`Arc`], so clones of a `Generational` share the same
/// generation.
#[derive(Clone, Debug)]
pub struct Generational<T> {
    /// The wrapped value that updates are applied to.
    inner: T,
    /// Number of updates applied through `with_increment`, shared between clones.
    gen: Arc<AtomicUsize>,
}

impl<T> Generational<T> {
    /// Wraps `inner`, starting its generation counter at zero.
    fn new(inner: T) -> Generational<T> {
        Generational { inner, gen: Arc::new(AtomicUsize::new(0)) }
    }

    /// Returns a reference to the wrapped value.
    pub fn get_inner(&self) -> &T {
        &self.inner
    }

    /// Returns the current generation of this handle.
    pub fn get_generation(&self) -> Generation {
        Generation(self.gen.load(Ordering::Acquire))
    }

    /// Runs `f` against the wrapped value, then advances the generation by one.
    ///
    /// The closure is invoked exactly once, so any `FnOnce(&T) -> V` is accepted; this
    /// includes every `Fn` closure as well as closures that consume captured state.
    ///
    /// Returns whatever `f` returned.
    pub fn with_increment<F, V>(&self, f: F) -> V
    where
        F: FnOnce(&T) -> V,
    {
        // Apply the update first and bump the generation afterwards, so a reader that
        // observes the new generation also observes the update.
        let result = f(&self.inner);
        let _ = self.gen.fetch_add(1, Ordering::AcqRel);
        result
    }
}
/// Counter operations are forwarded to the wrapped counter; each call goes through
/// `with_increment` and therefore advances the generation by one.
impl<T> CounterFn for Generational<T>
where
    T: CounterFn,
{
    /// Forwards `increment(value)` to the wrapped counter.
    fn increment(&self, value: u64) {
        self.with_increment(|counter| <T as CounterFn>::increment(counter, value));
    }

    /// Forwards `absolute(value)` to the wrapped counter.
    fn absolute(&self, value: u64) {
        self.with_increment(|counter| <T as CounterFn>::absolute(counter, value));
    }
}
/// Gauge operations are forwarded to the wrapped gauge; each call goes through
/// `with_increment` and therefore advances the generation by one.
impl<T> GaugeFn for Generational<T>
where
    T: GaugeFn,
{
    /// Forwards `increment(value)` to the wrapped gauge.
    fn increment(&self, value: f64) {
        self.with_increment(|gauge| <T as GaugeFn>::increment(gauge, value));
    }

    /// Forwards `decrement(value)` to the wrapped gauge.
    fn decrement(&self, value: f64) {
        self.with_increment(|gauge| <T as GaugeFn>::decrement(gauge, value));
    }

    /// Forwards `set(value)` to the wrapped gauge.
    fn set(&self, value: f64) {
        self.with_increment(|gauge| <T as GaugeFn>::set(gauge, value));
    }
}
/// Histogram recording is forwarded to the wrapped histogram; each call goes through
/// `with_increment` and therefore advances the generation by one.
impl<T> HistogramFn for Generational<T>
where
    T: HistogramFn,
{
    /// Forwards `record(value)` to the wrapped histogram.
    fn record(&self, value: f64) {
        self.with_increment(|histogram| <T as HistogramFn>::record(histogram, value));
    }
}
/// Converts a generational counter into a [`Counter`] handle.
impl<T> From<Generational<T>> for Counter
where
    T: CounterFn + Send + Sync + 'static,
{
    /// Moves the generational wrapper into an `Arc` and builds the handle from it.
    fn from(generational: Generational<T>) -> Self {
        let shared = Arc::new(generational);
        Self::from_arc(shared)
    }
}
/// Converts a generational gauge into a [`Gauge`] handle.
impl<T> From<Generational<T>> for Gauge
where
    T: GaugeFn + Send + Sync + 'static,
{
    /// Moves the generational wrapper into an `Arc` and builds the handle from it.
    fn from(generational: Generational<T>) -> Self {
        let shared = Arc::new(generational);
        Self::from_arc(shared)
    }
}
/// Converts a generational histogram into a [`Histogram`] handle.
impl<T> From<Generational<T>> for Histogram
where
    T: HistogramFn + Send + Sync + 'static,
{
    /// Moves the generational wrapper into an `Arc` and builds the handle from it.
    fn from(generational: Generational<T>) -> Self {
        let shared = Arc::new(generational);
        Self::from_arc(shared)
    }
}
/// A storage adapter that holds another storage implementation.
///
/// The `Storage` implementation for this type wraps every metric produced by the
/// underlying storage in a `Generational`.
#[derive(Debug)]
pub struct GenerationalStorage<S> {
    /// The underlying storage that actually creates the metrics.
    inner: S,
}

impl<S> GenerationalStorage<S> {
    /// Creates a generational storage that delegates to `storage`.
    pub fn new(storage: S) -> Self {
        GenerationalStorage { inner: storage }
    }
}
/// Produces the same metrics as the underlying storage, each wrapped in a fresh
/// [`Generational`].
impl<K, S> Storage<K> for GenerationalStorage<S>
where
    S: Storage<K>,
{
    type Counter = Generational<S::Counter>;
    type Gauge = Generational<S::Gauge>;
    type Histogram = Generational<S::Histogram>;

    /// Creates the counter for `key` in the underlying storage and wraps it.
    fn counter(&self, key: &K) -> Self::Counter {
        let counter = self.inner.counter(key);
        Generational::new(counter)
    }

    /// Creates the gauge for `key` in the underlying storage and wraps it.
    fn gauge(&self, key: &K) -> Self::Gauge {
        let gauge = self.inner.gauge(key);
        Generational::new(gauge)
    }

    /// Creates the histogram for `key` in the underlying storage and wraps it.
    fn histogram(&self, key: &K) -> Self::Histogram {
        let histogram = self.inner.histogram(key);
        Generational::new(histogram)
    }
}
pub type GenerationalAtomicStorage = GenerationalStorage<AtomicStorage>;
impl GenerationalAtomicStorage {
pub fn atomic() -> Self {
Self { inner: AtomicStorage }
}
}
/// Tracks, per key, the last generation seen and when it was last seen to change, so that
/// metrics which stay unchanged for longer than an idle timeout can be deleted from a
/// registry.
#[derive(Debug)]
pub struct Recency<K> {
/// Metric kinds that idle tracking applies to; kinds that do not match are never expired.
mask: MetricKindMask,
/// The clock used to read the current time, together with the map from key to the last
/// observed generation and the instant at which that generation was first observed.
/// Both are guarded by a single mutex.
#[allow(clippy::type_complexity)]
inner: Mutex<(Clock, HashMap<K, (Generation, Instant)>)>,
/// How long a metric may remain unchanged before it becomes eligible for deletion.
/// `None` disables idle tracking entirely.
idle_timeout: Option<Duration>,
}
impl<K> Recency<K>
where
    K: Clone + Eq + Hashable,
{
    /// Creates a new `Recency`.
    ///
    /// `clock` supplies the current time, `mask` selects which metric kinds are subject to
    /// idle tracking, and `idle_timeout` is how long a metric may stay unchanged before it
    /// is deleted. Passing `None` for `idle_timeout` disables idle tracking.
    pub fn new(clock: Clock, mask: MetricKindMask, idle_timeout: Option<Duration>) -> Self {
        let inner = Mutex::new((clock, HashMap::new()));
        Self { mask, inner, idle_timeout }
    }

    /// Checks whether the counter identified by `key` should still be stored.
    ///
    /// `gen` is the counter's current generation. If the counter has gone idle, it is
    /// deleted from `registry` via `delete_counter`. Returns `false` only when that
    /// deletion happened.
    pub fn should_store_counter<S>(
        &self,
        key: &K,
        gen: Generation,
        registry: &Registry<K, S>,
    ) -> bool
    where
        S: Storage<K>,
    {
        self.should_store(key, gen, registry, MetricKind::Counter, |reg, k| reg.delete_counter(k))
    }

    /// Checks whether the gauge identified by `key` should still be stored.
    ///
    /// `gen` is the gauge's current generation. If the gauge has gone idle, it is deleted
    /// from `registry` via `delete_gauge`. Returns `false` only when that deletion
    /// happened.
    pub fn should_store_gauge<S>(&self, key: &K, gen: Generation, registry: &Registry<K, S>) -> bool
    where
        S: Storage<K>,
    {
        self.should_store(key, gen, registry, MetricKind::Gauge, |reg, k| reg.delete_gauge(k))
    }

    /// Checks whether the histogram identified by `key` should still be stored.
    ///
    /// `gen` is the histogram's current generation. If the histogram has gone idle, it is
    /// deleted from `registry` via `delete_histogram`. Returns `false` only when that
    /// deletion happened.
    pub fn should_store_histogram<S>(
        &self,
        key: &K,
        gen: Generation,
        registry: &Registry<K, S>,
    ) -> bool
    where
        S: Storage<K>,
    {
        self.should_store(key, gen, registry, MetricKind::Histogram, |reg, k| {
            reg.delete_histogram(k)
        })
    }

    /// Shared implementation behind the `should_store_*` methods.
    ///
    /// Returns `true` without touching any state when no idle timeout is configured or when
    /// `kind` is not selected by the mask. Otherwise the entry for `key` is looked up:
    ///
    /// * no entry yet: `(gen, now)` is recorded and `true` is returned;
    /// * entry with a different generation: the entry is refreshed to `(gen, now)` and
    ///   `true` is returned;
    /// * entry with the same generation: if more than the idle timeout has elapsed since
    ///   the recorded instant, `delete_op` is invoked. When it returns `true` the entry is
    ///   dropped and `false` is returned; in every other case the entry is left as is and
    ///   `true` is returned.
    fn should_store<F, S>(
        &self,
        key: &K,
        gen: Generation,
        registry: &Registry<K, S>,
        kind: MetricKind,
        delete_op: F,
    ) -> bool
    where
        F: Fn(&Registry<K, S>, &K) -> bool,
        S: Storage<K>,
    {
        // Idle tracking disabled: everything is always stored.
        let idle_timeout = match self.idle_timeout {
            Some(timeout) => timeout,
            None => return true,
        };
        // This metric kind is not subject to idle tracking.
        if !self.mask.matches(kind) {
            return true;
        }

        // A poisoned lock is recovered rather than propagated as a panic.
        let mut state = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
        let (clock, entries) = state.deref_mut();
        let now = clock.now();

        let expired = match entries.get_mut(key) {
            None => {
                // First sighting of this key: start tracking it from now.
                entries.insert(key.clone(), (gen, now));
                false
            }
            Some((last_gen, last_update)) => {
                if *last_gen != gen {
                    // The metric changed since the last check: restart its idle window.
                    *last_gen = gen;
                    *last_update = now;
                    false
                } else {
                    // Unchanged: only attempt the delete once the idle window has passed.
                    (now - *last_update) > idle_timeout && delete_op(registry, key)
                }
            }
        };

        if expired {
            entries.remove(key);
        }
        !expired
    }
}