use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
iter::FromIterator,
sync::Mutex,
};
use crate::{DogstatsdClient, TagsProvider};
/// Threshold used by [`TagTrackerConfiguration`] unless `with_threshold` overrides it:
/// the number of distinct (metric, tag set) combinations after which the configured
/// actions are run.
pub const DEFAULT_TAG_THRESHOLD: usize = 100;
/// Mutable state of a [`Tracker`], kept behind its mutex.
enum TrackerState {
    /// Still counting: the threshold has not been reached yet.
    Running {
        /// Every distinct tag set recorded so far, grouped by metric name.
        seen: BTreeMap<String, Vec<BTreeSet<String>>>,
        /// How many times a new (metric, tag set) combination has been recorded.
        cardinality_count: usize,
        /// Actions to run once the threshold is reached.
        actions: Vec<ThresholdAction>,
    },
    /// Tracking has stopped; further calls to `Tracker::track` record nothing.
    Done,
}
/// Counts distinct (metric, tag set) combinations and runs the configured actions
/// once the count reaches `cardinality_threshold`.
pub(crate) struct Tracker {
    /// Number of distinct combinations at which the actions are run.
    cardinality_threshold: usize,
    /// `None` when tracking is disabled; otherwise the shared, lock-protected state.
    state: Option<Mutex<TrackerState>>,
}
impl Tracker {
    /// Builds a tracker that runs `actions` once `cardinality_threshold` distinct
    /// (metric, tag set) combinations have been recorded.
    ///
    /// Tracking is disabled (no state is created) when `actions` is empty or the
    /// threshold is zero; `track` then returns immediately.
    fn new(cardinality_threshold: usize, actions: Vec<ThresholdAction>) -> Self {
        let state = if actions.is_empty() || cardinality_threshold == 0 {
            None
        } else {
            Some(Mutex::new(TrackerState::Running {
                actions,
                seen: BTreeMap::new(),
                cardinality_count: 0,
            }))
        };
        Tracker {
            cardinality_threshold,
            state,
        }
    }

    /// Records the tag set used with `metric` and hands `tags` back unchanged.
    ///
    /// Each previously unseen (metric, tag set) combination increments the
    /// cardinality count. When the count reaches the threshold the configured
    /// actions run once (outside the lock) and the tracker stays in the `Done`
    /// state, so later calls record nothing.
    pub(crate) fn track<S, T>(&self, dd: &impl DogstatsdClient, metric: &str, tags: T) -> T
    where
        S: AsRef<str>,
        T: TagsProvider<S>,
    {
        let mut lock = match self.state.as_ref() {
            // The guarded value is always a complete `Running` or `Done` state, so a
            // poisoned mutex is still safe to read. Recover the guard rather than
            // making every later metric call panic.
            Some(state) => state.lock().unwrap_or_else(std::sync::PoisonError::into_inner),
            None => return tags,
        };
        let tag_slice = tags.as_ref();
        // Take ownership of the running state, leaving `Done` behind, so its parts
        // can be moved into `do_actions` if the threshold is hit.
        match std::mem::replace(&mut *lock, TrackerState::Done) {
            TrackerState::Running {
                mut seen,
                mut cardinality_count,
                actions,
            } => {
                if Self::update(&mut seen, metric, tag_slice) {
                    cardinality_count += 1;
                    if cardinality_count >= self.cardinality_threshold {
                        // Release the lock before running the actions; the state
                        // already reads `Done` for any concurrent caller.
                        drop(lock);
                        Self::do_actions(dd, seen, actions, metric, tag_slice);
                        return tags;
                    }
                }
                // Threshold not reached: put the (possibly updated) state back.
                *lock = TrackerState::Running {
                    seen,
                    cardinality_count,
                    actions,
                };
            }
            TrackerState::Done => {}
        }
        tags
    }

    /// Records `tags` for `metric` in `seen`.
    ///
    /// Returns `true` when the combination was not recorded before. Tags are
    /// compared as sets: order and repeated tags in `tags` are ignored.
    fn update(seen: &mut BTreeMap<String, Vec<BTreeSet<String>>>, metric: &str, tags: &[impl AsRef<str>]) -> bool {
        // Look up first so the common "already known metric" path does not allocate.
        let known_sets = match seen.get_mut(metric) {
            Some(known_sets) => known_sets,
            None => {
                seen.insert(metric.to_string(), vec![Self::owned_tag_set(tags)]);
                return true;
            }
        };
        let already_seen = known_sets.iter().any(|known| Self::is_same_tag_set(known, tags));
        if !already_seen {
            known_sets.push(Self::owned_tag_set(tags));
        }
        !already_seen
    }

    /// Copies `tags` into an owned set, dropping repeated tags.
    fn owned_tag_set(tags: &[impl AsRef<str>]) -> BTreeSet<String> {
        BTreeSet::from_iter(tags.iter().map(|tag| tag.as_ref().to_string()))
    }

    /// Returns `true` when `tags`, read as a set, has exactly the members of `known`.
    ///
    /// Lengths are not compared for equality because `tags` may repeat a tag while
    /// `known` never does; membership is checked in both directions instead.
    fn is_same_tag_set(known: &BTreeSet<String>, tags: &[impl AsRef<str>]) -> bool {
        // A set with more members than the slice has entries can never match.
        known.len() <= tags.len()
            && tags.iter().all(|tag| known.contains(tag.as_ref()))
            && known
                .iter()
                .all(|member| tags.iter().any(|tag| tag.as_ref() == member.as_str()))
    }

    /// Runs every configured action.
    ///
    /// Events are sent through `dd` with one `metric:count` tag per recorded metric,
    /// where `count` is the number of distinct tag sets recorded for it. Custom
    /// actions receive the metric and tags of the triggering call plus everything
    /// recorded so far, converted to hash-based collections.
    fn do_actions(
        dd: &impl DogstatsdClient,
        seen: BTreeMap<String, Vec<BTreeSet<String>>>,
        actions: Vec<ThresholdAction>,
        metric: &str,
        tags: &[impl AsRef<str>],
    ) {
        let event_tags = seen
            .iter()
            .map(|(name, tag_sets)| format!("{}:{}", name, tag_sets.len()))
            .collect::<Vec<_>>();
        let tags = tags.iter().map(|t| t.as_ref()).collect::<Vec<&str>>();
        let seen: HashMap<String, Vec<HashSet<String>>> = seen
            .into_iter()
            .map(|(name, sets)| (name, sets.into_iter().map(|set| set.into_iter().collect()).collect()))
            .collect();
        for action in actions {
            match action {
                ThresholdAction::Event { title, text } => dd.event(&title, &text, &event_tags),
                ThresholdAction::Custom(mut action) => {
                    action(metric, &tags, &seen);
                }
            }
        }
    }
}
/// One thing to do when the cardinality threshold is reached.
enum ThresholdAction {
    /// Send an event with this title and text through the Dogstatsd client.
    Event { title: String, text: String },
    /// Call a user-supplied callback.
    Custom(ThresholdCustomAction),
}
/// Boxed callback stored for a custom threshold action.
///
/// `Tracker::do_actions` calls it with the metric name and tags of the call that
/// reached the threshold, followed by every tag set recorded so far keyed by metric.
type ThresholdCustomAction = Box<dyn FnMut(&str, &[&str], &HashMap<String, Vec<HashSet<String>>>) + Send + Sync>;
/// Builder-style settings from which a `Tracker` is created.
pub struct TagTrackerConfiguration {
    /// Number of distinct (metric, tag set) combinations that triggers the actions.
    count_threshold: usize,
    /// Actions to run when the threshold is reached, in the order they were added.
    actions: Vec<ThresholdAction>,
}
impl Default for TagTrackerConfiguration {
fn default() -> Self {
Self {
count_threshold: DEFAULT_TAG_THRESHOLD,
actions: Vec::new(),
}
}
}
impl TagTrackerConfiguration {
pub fn new() -> Self {
Self {
count_threshold: DEFAULT_TAG_THRESHOLD,
actions: Vec::new(),
}
}
pub fn with_event(mut self, title: String, text: String) -> Self {
self.actions.push(ThresholdAction::Event { title, text });
self
}
pub fn with_custom_action(
mut self,
custom_action: impl FnMut(&str, &[&str], &HashMap<String, Vec<HashSet<String>>>) + Send + Sync + 'static,
) -> Self {
self.actions
.push(ThresholdAction::Custom(Box::new(custom_action) as Box<_>));
self
}
pub fn with_threshold(mut self, count_threshold: usize) -> Self {
self.count_threshold = count_threshold;
self
}
pub(crate) fn build(self) -> Tracker {
Tracker::new(self.count_threshold, self.actions)
}
}