use std::collections::HashMap;
use crate::layers::Layer;
use aho_corasick::{AhoCorasick, AhoCorasickBuilder};
use metrics::{GaugeValue, Key, Recorder, Unit};
use parking_lot::Mutex;
/// Recorder wrapper that converts absolute counter readings into deltas.
///
/// Counters whose key name matches the pattern automaton are treated as running
/// totals: only the increase over the last total seen for that key is forwarded
/// to the wrapped recorder. Every other metric is passed through unchanged.
pub struct Absolute<R> {
/// The wrapped recorder that receives every forwarded metric.
inner: R,
/// Matcher used to decide which counter keys are converted.
automaton: AhoCorasick,
/// Last absolute value observed for each converted counter key.
seen: Mutex<HashMap<Key, u64>>,
}
impl<R> Absolute<R> {
    /// Reports whether counter values recorded under `key` should be treated as
    /// absolute totals.
    ///
    /// Each part of the key's name is tested against the pattern automaton; the
    /// first match short-circuits to `true`. A key with no matching part yields
    /// `false`.
    fn should_convert(&self, key: &Key) -> bool {
        for part in key.name().parts() {
            if self.automaton.is_match(part.as_ref()) {
                return true;
            }
        }
        false
    }
}
impl<R: Recorder> Recorder for Absolute<R> {
    /// Forwards the counter registration to the wrapped recorder unchanged.
    fn register_counter(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
        self.inner.register_counter(key, unit, description)
    }

    /// Forwards the gauge registration to the wrapped recorder unchanged.
    fn register_gauge(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
        self.inner.register_gauge(key, unit, description)
    }

    /// Forwards the histogram registration to the wrapped recorder unchanged.
    fn register_histogram(&self, key: Key, unit: Option<Unit>, description: Option<&'static str>) {
        self.inner.register_histogram(key, unit, description)
    }

    /// Increments a counter, converting absolute readings into deltas for
    /// matching keys.
    ///
    /// * If `key` does not match any pattern, `value` is forwarded as-is.
    /// * If `key` matches, `value` is interpreted as a running total. Only the
    ///   increase over the last total recorded for that key is forwarded, and a
    ///   reading that is not strictly greater than the last total is dropped
    ///   without touching the wrapped recorder.
    fn increment_counter(&self, key: Key, value: u64) {
        let value = if self.should_convert(&key) {
            let mut seen = self.seen.lock();
            // Look the key up by reference first so that the key is only cloned
            // the first time it is seen, not on every update.
            match seen.get_mut(&key) {
                Some(last) => {
                    if value <= *last {
                        return;
                    }
                    // Cannot underflow: `value > *last` was just established.
                    let delta = value - *last;
                    *last = value;
                    delta
                }
                None => {
                    // An untracked key has an implicit last total of zero, so a
                    // zero reading carries no increase and is dropped.
                    if value == 0 {
                        return;
                    }
                    seen.insert(key.clone(), value);
                    value
                }
            }
            // The lock guard goes out of scope here, before calling `inner`.
        } else {
            value
        };
        self.inner.increment_counter(key, value);
    }

    /// Forwards the gauge update to the wrapped recorder unchanged.
    fn update_gauge(&self, key: Key, value: GaugeValue) {
        self.inner.update_gauge(key, value);
    }

    /// Forwards the histogram sample to the wrapped recorder unchanged.
    fn record_histogram(&self, key: Key, value: f64) {
        self.inner.record_histogram(key, value);
    }
}
/// Configuration for building an [`Absolute`] recorder wrapper.
///
/// Collects the name patterns and matcher options that are later used to build
/// the pattern automaton. The derived `Default` starts with no patterns and both
/// flags set to `false`; [`AbsoluteLayer::from_patterns`] starts with
/// `use_dfa` set to `true`.
#[derive(Default)]
pub struct AbsoluteLayer {
    /// Patterns that select which counter keys are converted.
    patterns: Vec<String>,
    /// Whether pattern matching ignores ASCII case.
    case_insensitive: bool,
    /// Whether the matcher is asked to use a DFA.
    use_dfa: bool,
}

impl AbsoluteLayer {
    /// Creates a layer from an initial set of patterns.
    ///
    /// Every item is copied into an owned `String`. The resulting layer is
    /// case-sensitive and has DFA usage enabled.
    pub fn from_patterns<P, I>(patterns: P) -> Self
    where
        P: IntoIterator<Item = I>,
        I: AsRef<str>,
    {
        let patterns: Vec<String> = patterns
            .into_iter()
            .map(|pattern| String::from(pattern.as_ref()))
            .collect();

        Self {
            patterns,
            case_insensitive: false,
            use_dfa: true,
        }
    }

    /// Appends one more pattern and returns `self` for chaining.
    pub fn add_pattern<P>(&mut self, pattern: P) -> &mut Self
    where
        P: AsRef<str>,
    {
        let owned = pattern.as_ref().to_owned();
        self.patterns.push(owned);
        self
    }

    /// Sets whether matching ignores ASCII case and returns `self` for chaining.
    pub fn case_insensitive(&mut self, case_insensitive: bool) -> &mut Self {
        self.case_insensitive = case_insensitive;
        self
    }

    /// Sets whether the matcher is asked to use a DFA and returns `self` for
    /// chaining.
    pub fn use_dfa(&mut self, dfa: bool) -> &mut Self {
        self.use_dfa = dfa;
        self
    }
}
impl<R> Layer<R> for AbsoluteLayer {
    type Output = Absolute<R>;

    /// Wraps `inner` in an [`Absolute`] recorder.
    ///
    /// A fresh automaton is built from this layer's patterns and options on
    /// every call, and the wrapper starts with no previously seen counter
    /// values.
    fn layer(&self, inner: R) -> Self::Output {
        // Configure the builder first, then build the matcher from the same
        // pattern list that was used for auto-configuration.
        let mut builder = AhoCorasickBuilder::new();
        builder
            .ascii_case_insensitive(self.case_insensitive)
            .dfa(self.use_dfa)
            .auto_configure(&self.patterns);
        let automaton = builder.build(&self.patterns);

        let seen = Mutex::new(HashMap::new());
        Absolute {
            inner,
            automaton,
            seen,
        }
    }
}
// Unit tests for the absolute-to-delta layer, driven through a debugging
// recorder whose snapshots expose what the wrapped recorder actually received.
#[cfg(test)]
mod tests {
use super::AbsoluteLayer;
use crate::layers::Layer;
use crate::{debugging::DebuggingRecorder, DebugValue};
use metrics::{GaugeValue, Key, Recorder};
use ordered_float::OrderedFloat;
// Metrics whose names do not match the configured pattern must pass through
// the layer untouched: registrations and recorded values arrive as given.
#[test]
fn test_basic_functionality() {
let patterns = &["rdkafka"];
// The index-based assertions below rely on a stable snapshot order
// (presumably what `with_ordering(true)` provides; confirm against DebuggingRecorder).
let recorder = DebuggingRecorder::with_ordering(true);
let snapshotter = recorder.snapshotter();
let absolute = AbsoluteLayer::from_patterns(patterns);
let layered = absolute.layer(recorder);
// Nothing has been registered or recorded yet.
let before = snapshotter.snapshot();
assert_eq!(before.len(), 0);
layered.register_counter(Key::Owned("counter".into()), None, None);
layered.register_gauge(Key::Owned("gauge".into()), None, None);
layered.register_histogram(Key::Owned("histo".into()), None, None);
// All three registrations reached the inner recorder.
let after = snapshotter.snapshot();
assert_eq!(after.len(), 3);
assert_eq!(after[0].0.key().clone(), Key::Owned("counter".into()));
assert_eq!(after[1].0.key().clone(), Key::Owned("gauge".into()));
assert_eq!(after[2].0.key().clone(), Key::Owned("histo".into()));
layered.increment_counter(Key::Owned("counter".into()), 42);
layered.update_gauge(Key::Owned("gauge".into()), GaugeValue::Absolute(-420.69));
layered.record_histogram(Key::Owned("histo".into()), 3.14);
// Tuple element 0 carries the key and element 3 the recorded value; each
// value must equal exactly what was passed in above.
let after = snapshotter.snapshot();
assert_eq!(after.len(), 3);
assert_eq!(after[0].0.key().clone(), Key::Owned("counter".into()));
assert_eq!(after[0].3, DebugValue::Counter(42));
assert_eq!(after[1].0.key().clone(), Key::Owned("gauge".into()));
assert_eq!(after[1].3, DebugValue::Gauge(OrderedFloat::<f64>(-420.69)));
assert_eq!(after[2].0.key().clone(), Key::Owned("histo".into()));
assert_eq!(
after[2].3,
DebugValue::Histogram(vec![OrderedFloat::<f64>(3.14)])
);
}
// Counters matching the pattern are treated as absolute totals and forwarded
// as deltas, while non-matching counters keep plain increment semantics.
#[test]
fn test_absolute_to_delta() {
let patterns = &["rdkafka"];
let recorder = DebuggingRecorder::with_ordering(true);
let snapshotter = recorder.snapshotter();
let absolute = AbsoluteLayer::from_patterns(patterns);
let layered = absolute.layer(recorder);
let before = snapshotter.snapshot();
assert_eq!(before.len(), 0);
// Non-matching counter: the value is forwarded as a normal increment.
layered.increment_counter(Key::Owned("counter".into()), 42);
let after = snapshotter.snapshot();
assert_eq!(after.len(), 1);
assert_eq!(after[0].0.key().clone(), Key::Owned("counter".into()));
assert_eq!(after[0].3, DebugValue::Counter(42));
// Matching counter, first reading: the whole total (18) is the delta.
layered.increment_counter(Key::Owned("rdkafka.bytes".into()), 18);
let after = snapshotter.snapshot();
assert_eq!(after.len(), 2);
assert_eq!(after[0].0.key().clone(), Key::Owned("counter".into()));
assert_eq!(after[0].3, DebugValue::Counter(42));
assert_eq!(after[1].0.key().clone(), Key::Owned("rdkafka.bytes".into()));
assert_eq!(after[1].3, DebugValue::Counter(18));
// Repeating both calls: the plain counter accumulates (42 + 42 = 84), while
// the matching counter is unchanged because its total did not grow.
layered.increment_counter(Key::Owned("counter".into()), 42);
layered.increment_counter(Key::Owned("rdkafka.bytes".into()), 18);
let after = snapshotter.snapshot();
assert_eq!(after.len(), 2);
assert_eq!(after[0].0.key().clone(), Key::Owned("counter".into()));
assert_eq!(after[0].3, DebugValue::Counter(84));
assert_eq!(after[1].0.key().clone(), Key::Owned("rdkafka.bytes".into()));
assert_eq!(after[1].3, DebugValue::Counter(18));
// A larger total (24) forwards only the increase (6), bringing the inner
// counter to 24.
layered.increment_counter(Key::Owned("rdkafka.bytes".into()), 24);
let after = snapshotter.snapshot();
assert_eq!(after.len(), 2);
assert_eq!(after[1].0.key().clone(), Key::Owned("rdkafka.bytes".into()));
assert_eq!(after[1].3, DebugValue::Counter(24));
// A total lower than the last one seen (18 < 24) is dropped entirely.
layered.increment_counter(Key::Owned("rdkafka.bytes".into()), 18);
let after = snapshotter.snapshot();
assert_eq!(after.len(), 2);
assert_eq!(after[1].0.key().clone(), Key::Owned("rdkafka.bytes".into()));
assert_eq!(after[1].3, DebugValue::Counter(24));
}
}