use std::collections::HashMap;
use std::default::Default;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use crate::name::{MetricName, NameParts};
use crate::scheduler::{Cancel, SCHEDULER};
use crate::{CancelHandle, Flush, InputMetric, InputScope, MetricValue};
use std::fmt;
use std::time::{Duration, Instant};
#[cfg(not(feature = "parking_lot"))]
use std::sync::RwLock;
use crate::Labels;
#[cfg(feature = "parking_lot")]
use parking_lot::RwLock;
use std::ops::Deref;
/// Sampling setting stored in [`Attributes`].
///
/// Defaults to [`Sampling::Full`].
#[derive(Debug, Clone, Copy, Default)]
pub enum Sampling {
/// The default variant; carries no parameter.
#[default]
Full,
/// Carries an `f64` parameter.
/// NOTE(review): presumably the sampling rate / probability — confirm against the consumers.
Random(f64),
}
/// Buffering setting stored in [`Attributes`].
///
/// Defaults to [`Buffering::Unbuffered`].
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
pub enum Buffering {
/// The default variant; `Buffered::is_buffered` returns `false` only for this value.
#[default]
Unbuffered,
/// Carries a `usize` parameter.
/// NOTE(review): presumably the buffer capacity; the unit is not visible here — confirm.
BufferSize(usize),
/// Buffered, with no size parameter.
Unlimited,
}
/// String identifier of a metric, used as the key of the flush listener map.
#[derive(Clone, Debug, Hash, Eq, PartialOrd, PartialEq)]
pub struct MetricId(String);

impl MetricId {
    /// Builds an identifier of the form `<out_type>:<name parts joined with "/">`.
    pub fn forge(out_type: &str, name: MetricName) -> Self {
        let joined: String = name.join("/");
        Self(format!("{}:{}", out_type, joined))
    }
}
/// Shorthand for a reference-counted value behind an `RwLock`.
/// The `RwLock` is either `std::sync::RwLock` or `parking_lot::RwLock`, selected by the `parking_lot` feature.
pub type Shared<T> = Arc<RwLock<T>>;
/// A registered flush listener: a callback plus the id it was registered under.
pub struct Listener {
/// Id assigned at registration; `OnFlushCancel` compares it before removing the entry
/// so that it only removes the listener it installed.
listener_id: usize,
/// Callback invoked with the `Instant` captured by `notify_flush_listeners`.
listener_fn: Arc<dyn Fn(Instant) + Send + Sync + 'static>,
}
/// Settings carried by types implementing `WithAttributes`: naming, sampling and buffering,
/// plus the registered flush listeners and scheduled tasks.
///
/// `flush_listeners` and `tasks` are `Arc`-backed (`Shared`), so values produced by the
/// derived `Clone` share those two collections with the original.
#[derive(Clone, Default)]
pub struct Attributes {
// Name parts exposed through `Prefixed::get_prefixes`.
naming: NameParts,
// Sampling setting, read through `Sampled::get_sampling`.
sampling: Sampling,
// Buffering setting, read through `Buffered::get_buffering`.
buffering: Buffering,
// Listeners invoked by `OnFlush::notify_flush_listeners`, keyed by metric id.
flush_listeners: Shared<HashMap<MetricId, Listener>>,
// Handles of tasks registered by `ObserveWhen::every`; cancelled in `Drop`.
tasks: Shared<Vec<CancelHandle>>,
}
/// Debug output covers the plain settings only; the flush listeners hold closures
/// (not `Debug`) and are left out together with the task list, which is signalled
/// by the trailing `..` in the rendered struct.
impl fmt::Debug for Attributes {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `debug_struct` inserts the field separators the previous hand-written
        // `write!` chain was missing, and honours the `{:#?}` pretty-print flag.
        f.debug_struct("Attributes")
            .field("naming", &self.naming)
            .field("sampling", &self.sampling)
            .field("buffering", &self.buffering)
            .finish_non_exhaustive()
    }
}
/// Access to the [`Attributes`] carried by a cloneable component.
pub trait WithAttributes: Clone {
    /// Borrows the attributes of this value.
    fn get_attributes(&self) -> &Attributes;

    /// Mutably borrows the attributes of this value.
    fn mut_attributes(&mut self) -> &mut Attributes;

    /// Clones `self`, applies `edit` to the clone's attributes and returns the clone.
    /// `self` itself is not passed to `edit`.
    fn with_attributes<F: Fn(&mut Attributes)>(&self, edit: F) -> Self {
        let mut copy = self.clone();
        let attrs = copy.mut_attributes();
        edit(attrs);
        copy
    }
}
/// Hook for running the registered flush listeners.
pub trait OnFlush {
/// Invokes the flush listeners associated with this value.
fn notify_flush_listeners(&self);
}
/// Blanket implementation for every flushable type that carries attributes.
impl<T> OnFlush for T
where
    T: Flush + WithAttributes,
{
    /// Calls every registered flush listener with one shared timestamp.
    ///
    /// The timestamp is taken before the listener map is locked, and the read
    /// lock stays held while the listeners run.
    fn notify_flush_listeners(&self) {
        let timestamp = Instant::now();
        let listeners = read_lock!(self.get_attributes().flush_listeners);
        listeners
            .values()
            .for_each(|listener| (listener.listener_fn)(timestamp));
    }
}
/// Intermediate value returned by `Observe::observe`; it is consumed by either
/// `on_flush` or `every`, which decide when `operation` is evaluated.
pub struct ObserveWhen<'a, T, F> {
// The value whose attributes receive the listener or the task handle.
target: &'a T,
// The metric that the observed values are written to.
metric: InputMetric,
// The closure producing the value to write.
operation: Arc<F>,
}
/// Counter handing out the `listener_id` values used by `ObserveWhen::on_flush`; starts at 0.
static ID_GENERATOR: AtomicUsize = AtomicUsize::new(0);
/// Cancellation handle returned by `ObserveWhen::on_flush`; wraps the closure to run on cancel.
pub struct OnFlushCancel(Arc<dyn Fn() + Send + Sync>);

impl Cancel for OnFlushCancel {
    /// Runs the wrapped closure.
    fn cancel(&self) {
        let callback: &(dyn Fn() + Send + Sync) = &*self.0;
        callback()
    }
}
impl<'a, T, F> ObserveWhen<'a, T, F>
where
F: Fn(Instant) -> MetricValue + Send + Sync + 'static,
T: InputScope + WithAttributes + Send + Sync,
{
pub fn on_flush(self) -> OnFlushCancel {
let gauge = self.metric;
let metric_id = gauge.metric_id().clone();
let op = self.operation;
let listener_id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
write_lock!(self.target.get_attributes().flush_listeners).insert(
metric_id.clone(),
Listener {
listener_id,
listener_fn: Arc::new(move |now| gauge.write(op(now), Labels::default())),
},
);
let flush_listeners = self.target.get_attributes().flush_listeners.clone();
OnFlushCancel(Arc::new(move || {
let mut listeners = write_lock!(flush_listeners);
let installed_listener_id = listeners.get(&metric_id).map(|v| v.listener_id);
if let Some(id) = installed_listener_id {
if id == listener_id {
listeners.remove(&metric_id);
}
}
}))
}
pub fn every(self, period: Duration) -> CancelHandle {
let gauge = self.metric;
let op = self.operation;
let handle = SCHEDULER.schedule(period, move |now| gauge.write(op(now), Labels::default()));
write_lock!(self.target.get_attributes().tasks).push(handle.clone());
handle
}
}
/// Entry point for observing a value and writing it to a metric.
pub trait Observe {
/// The target type referenced by the returned [`ObserveWhen`].
type Inner;
/// Pairs `metric` with `operation` and returns an [`ObserveWhen`].
///
/// Nothing is registered by this call alone; the returned value must be
/// consumed with `on_flush` or `every` (hence the `must_use`).
#[must_use = "must specify when to observe"]
fn observe<F>(
&self,
metric: impl Deref<Target = InputMetric>,
operation: F,
) -> ObserveWhen<Self::Inner, F>
where
F: Fn(Instant) -> MetricValue + Send + Sync + 'static,
Self: Sized;
}
impl<T: InputScope + WithAttributes> Observe for T {
type Inner = Self;
fn observe<F>(
&self,
metric: impl Deref<Target = InputMetric>,
operation: F,
) -> ObserveWhen<Self, F>
where
F: Fn(Instant) -> MetricValue + Send + Sync + 'static,
Self: Sized,
{
ObserveWhen {
target: self,
metric: (*metric).clone(),
operation: Arc::new(operation),
}
}
}
/// Cancels every task recorded in `tasks` and empties the list.
///
/// NOTE(review): `tasks` is an `Arc`-backed list shared by all clones of an
/// `Attributes` (the struct derives `Clone`), so dropping any one clone cancels
/// the tasks registered through all of them — confirm this is intended.
impl Drop for Attributes {
    fn drop(&mut self) {
        write_lock!(self.tasks)
            .drain(..)
            .for_each(|task| task.cancel());
    }
}
/// Name-prefix handling for components that carry a list of name parts.
pub trait Prefixed {
/// Returns the name parts currently attached to this value.
fn get_prefixes(&self) -> &NameParts;
/// Deprecated since 0.7.2; use `named()` or `add_name()` instead.
#[deprecated(since = "0.7.2", note = "Use named() or add_name()")]
fn add_prefix<S: Into<String>>(&self, name: S) -> Self;
/// Returns a new value with `name` added to the name parts.
fn add_name<S: Into<String>>(&self, name: S) -> Self;
/// Returns a new value whose name parts are built from `name` alone.
fn named<S: Into<String>>(&self, name: S) -> Self;
/// Converts `name` into a `MetricName` and calls `append` on it with a clone of this value's prefixes.
fn prefix_append<S: Into<MetricName>>(&self, name: S) -> MetricName {
name.into().append(self.get_prefixes().clone())
}
/// Converts `name` into a `MetricName` and calls `prepend` on it with a clone of this value's prefixes.
fn prefix_prepend<S: Into<MetricName>>(&self, name: S) -> MetricName {
name.into().prepend(self.get_prefixes().clone())
}
}
/// Blanket implementation backed by the `naming` field of the attributes.
impl<T: WithAttributes> Prefixed for T {
    /// Borrows the name parts stored in the attributes.
    fn get_prefixes(&self) -> &NameParts {
        let attrs = self.get_attributes();
        &attrs.naming
    }

    /// Deprecated alias that forwards to [`Prefixed::add_name`].
    fn add_prefix<S: Into<String>>(&self, name: S) -> Self {
        self.add_name(name)
    }

    /// Returns a clone of `self` with `name` pushed at the back of its name parts.
    fn add_name<S: Into<String>>(&self, name: S) -> Self {
        let segment: String = name.into();
        // The editor is an `Fn`, so it hands out a fresh copy of the segment per call.
        self.with_attributes(|attrs| attrs.naming.push_back(segment.clone()))
    }

    /// Returns a clone of `self` whose name parts are replaced by `NameParts::from(name)`.
    fn named<S: Into<String>>(&self, name: S) -> Self {
        let replacement = NameParts::from(name);
        self.with_attributes(|attrs| attrs.naming = replacement.clone())
    }
}
/// Sampling configuration for components that carry [`Attributes`].
pub trait Sampled: WithAttributes {
    /// Returns a clone of `self` whose sampling setting is `sampling`.
    fn sampled(&self, sampling: Sampling) -> Self {
        self.with_attributes(|attrs| {
            attrs.sampling = sampling;
        })
    }

    /// Returns the sampling setting currently stored in the attributes.
    fn get_sampling(&self) -> Sampling {
        let attrs = self.get_attributes();
        attrs.sampling
    }
}
/// Buffering configuration for components that carry [`Attributes`].
pub trait Buffered: WithAttributes {
    /// Returns a clone of `self` whose buffering setting is `buffering`.
    fn buffered(&self, buffering: Buffering) -> Self {
        self.with_attributes(|attrs| {
            attrs.buffering = buffering;
        })
    }

    /// Returns the buffering setting currently stored in the attributes.
    fn get_buffering(&self) -> Buffering {
        let attrs = self.get_attributes();
        attrs.buffering
    }

    /// Returns `true` unless the stored setting is [`Buffering::Unbuffered`].
    fn is_buffered(&self) -> bool {
        self.get_attributes().buffering != Buffering::Unbuffered
    }
}
// Unit tests for the flush-listener path.
#[cfg(test)]
mod test {
use crate::Flush;
use crate::StatsMapScope;
use crate::attributes::*;
use crate::input::Input;
use crate::input::*;
use crate::output::map::StatsMap;
/// Registers an `on_flush` observer for a gauge, flushes the scope, and checks
/// that the observed value (4) ends up in the resulting map under "my_gauge".
#[test]
fn on_flush() {
let metrics: StatsMapScope = StatsMap::default().metrics();
let gauge = metrics.gauge("my_gauge");
// The returned cancel handle is discarded without calling `cancel`.
metrics.observe(gauge, |_| 4).on_flush();
metrics.flush().unwrap();
assert_eq!(Some(&4), metrics.into_map().get("my_gauge"))
}
}