prima_datadog/tracker.rs
1use std::{
2 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
3 iter::FromIterator,
4 sync::Mutex,
5};
6
7use crate::{DogstatsdClient, TagsProvider};
8
9/// See <https://www.datadoghq.com/pricing/> and <https://docs.datadoghq.com/account_management/billing/custom_metrics/>,
10///
11/// 100 seems like a reasonable place to start warning for now
12pub const DEFAULT_TAG_THRESHOLD: usize = 100;
13
14enum TrackerState {
15 Running {
16 seen: BTreeMap<String, Vec<BTreeSet<String>>>,
17 cardinality_count: usize,
18 actions: Vec<ThresholdAction>,
19 },
20 Done,
21}
22
23pub(crate) struct Tracker {
24 /// Threshold at which to take the user defined action, and stop tracking
25 cardinality_threshold: usize,
26
27 /// Our internal state
28 ///
29 /// This will be `None` if the user does not want to track cardinality.
30 state: Option<Mutex<TrackerState>>,
31}
32
33impl Tracker {
34 fn new(cardinality_threshold: usize, actions: Vec<ThresholdAction>) -> Self {
35 Tracker {
36 cardinality_threshold,
37 state: if !actions.is_empty() && cardinality_threshold != 0 {
38 Some(Mutex::new(TrackerState::Running {
39 actions,
40 seen: Default::default(),
41 cardinality_count: 0,
42 }))
43 } else {
44 None
45 },
46 }
47 }
48
49 pub(crate) fn track<S, T>(&self, dd: &impl DogstatsdClient, metric: &str, tags: T) -> T
50 where
51 S: AsRef<str>,
52 T: TagsProvider<S>,
53 {
54 let mut lock = match self.state.as_ref() {
55 Some(state) => state.lock().unwrap(),
56 None => return tags,
57 };
58 let tag_slice = tags.as_ref();
59 let state = std::mem::replace(&mut *lock, TrackerState::Done);
60 match state {
61 TrackerState::Running {
62 mut seen,
63 mut cardinality_count,
64 actions,
65 } => {
66 let cardinality_grown = Self::update(&mut seen, metric, tag_slice);
67 if cardinality_grown {
68 cardinality_count += 1;
69 if cardinality_count >= self.cardinality_threshold {
70 drop(lock);
71 Self::do_actions(dd, seen, actions, metric, tag_slice);
72 return tags;
73 }
74 }
75 // Rebuild the running state if we didn't run the actions
76 *lock = TrackerState::Running {
77 seen,
78 cardinality_count,
79 actions,
80 };
81 }
82 TrackerState::Done => {}
83 };
84 tags
85 }
86
87 fn update(seen: &mut BTreeMap<String, Vec<BTreeSet<String>>>, metric: &str, tags: &[impl AsRef<str>]) -> bool {
88 let seen_tag_set = match seen.get_mut(metric) {
89 Some(seen_tag_set) => seen_tag_set,
90 None => {
91 seen.insert(
92 metric.to_string(),
93 vec![BTreeSet::from_iter(tags.iter().map(|tag| tag.as_ref().to_string()))],
94 );
95 return true;
96 }
97 };
98 // Is this set of tags new for this metric?
99 let set_is_novel = seen_tag_set
100 .iter()
101 .all(|tag_set| tag_set.len() != tags.len() || tags.iter().any(|tag| !tag_set.contains(tag.as_ref())));
102 if set_is_novel {
103 seen_tag_set.push(BTreeSet::from_iter(tags.iter().map(|tag| tag.as_ref().to_string())));
104 };
105 set_is_novel
106 }
107
108 fn do_actions(
109 dd: &impl DogstatsdClient,
110 seen: BTreeMap<String, Vec<BTreeSet<String>>>,
111 actions: Vec<ThresholdAction>,
112 metric: &str,
113 tags: &[impl AsRef<str>],
114 ) {
115 let event_tags = seen
116 .iter()
117 .map(|(metric, tags)| format!("{}:{}", metric, tags.len()))
118 .collect::<Vec<_>>();
119
120 let tags = tags.iter().map(|t| t.as_ref()).collect::<Vec<&str>>();
121 // Map from a BTreeMap<String, Vec<BTreeSet<String>>> to a HashMap<String, Vec<HashSet<String>>>
122 let seen: HashMap<String, Vec<HashSet<String>>> = seen
123 .into_iter()
124 .map(|(metric, sets)| (metric, sets.into_iter().map(|set| set.into_iter().collect()).collect()))
125 .collect();
126 for action in actions {
127 match action {
128 ThresholdAction::Event { title, text } => dd.event(&title, &text, &event_tags),
129 ThresholdAction::Custom(mut action) => {
130 action(metric, &tags, &seen);
131 }
132 }
133 }
134 }
135}
136
137/// Actions that define what the tracker will do when the custom metric threshold is passed.
138/// A user may define any number of these, and by default none are taken.
139enum ThresholdAction {
140 /// Emit an event. The count of unique tag sets, per metric, is provided as the tags
141 Event { title: String, text: String },
142
143 /// Take some custom action.
144 Custom(ThresholdCustomAction),
145}
146
147type ThresholdCustomAction = Box<dyn FnMut(&str, &[&str], &HashMap<String, Vec<HashSet<String>>>) + Send + Sync>;
148
149/// The configuration for the tag tracker. By default, the tag tracking is not enabled.
150/// To enable it, set the `count_threshold` to a non-zero value, and add at least one event
151/// or custom action
152/// Example usage:
153/// ```rust
154/// use prima_datadog::{
155/// configuration::{Country, Configuration},
156/// Datadog, TagTrackerConfiguration,
157/// };
158/// let tracker_config = TagTrackerConfiguration::new()
159/// .with_threshold(21)
160/// .with_custom_action(|_, _, _| {});
161/// let configuration = Configuration::new(
162/// "0.0.0.0:1234",
163/// "prima_datadog_benchmarks",
164/// ).with_country(Country::It).with_tracker_configuration(tracker_config);
165/// Datadog::init(configuration).unwrap();
166/// ```
167pub struct TagTrackerConfiguration {
168 count_threshold: usize,
169 actions: Vec<ThresholdAction>,
170}
171
172impl Default for TagTrackerConfiguration {
173 fn default() -> Self {
174 Self {
175 count_threshold: DEFAULT_TAG_THRESHOLD,
176 actions: Vec::new(),
177 }
178 }
179}
180
181impl TagTrackerConfiguration {
182 pub fn new() -> Self {
183 Self {
184 count_threshold: DEFAULT_TAG_THRESHOLD,
185 actions: Vec::new(),
186 }
187 }
188
189 /// Configure the tracker to emit an event when the threshold is reached.
190 /// The count of unique tag sets, per metric, is provided as the tags
191 /// for the event, i.e. for a given metric `metric`, there will be a tag `metric:count`,
192 /// where count is the number of unique tag sets seen for that metric.
193 /// Any number of events may be configured, and all will be emitted when the
194 /// threshold is reached.
195 pub fn with_event(mut self, title: String, text: String) -> Self {
196 self.actions.push(ThresholdAction::Event { title, text });
197 self
198 }
199
200 /// Add a custom action to execute when the custom metric threshold is reached.
201 /// These actions are run exactly once, at the point the threshold is reached.
202 /// Any number of actions may be added. The function will be passed the metric name,
203 /// the tags of the metric causing the crossing of the threshold, and a
204 /// HashMap containing all the unique tag sets seen for each metric.
205 ///
206 /// # Example
207 ///
208 /// ```rust
209 /// prima_datadog::TagTrackerConfiguration::new().with_custom_action(|metric, tags, _| {
210 /// println!("Exceeded custom metric threshold for metric {} with tags {:?}", metric, tags);
211 /// });
212 /// ```
213 pub fn with_custom_action(
214 mut self,
215 custom_action: impl FnMut(&str, &[&str], &HashMap<String, Vec<HashSet<String>>>) + Send + Sync + 'static,
216 ) -> Self {
217 self.actions
218 .push(ThresholdAction::Custom(Box::new(custom_action) as Box<_>));
219 self
220 }
221
222 /// Set the threshold for the maximum number of custom metrics
223 /// This defaults to ```DEFAULT_TAG_THRESHOLD```
224 ///
225 /// The threshold is the maximum number of "custom metrics" which can
226 /// be generated before the tracker actions are run.
227 ///
228 /// A "custom metric" is defined as the unique combination of metric name
229 /// and unique tag set, e.g., a metric, "test", with the following tag sets:
230 /// - [a, b, c]
231 /// - [a, b, d]
232 /// - [a, c, d]
233 ///
234 /// is counted as 3 "custom metrics", as there are 3 unique tag sets here.
235 ///
236 /// See <https://docs.datadoghq.com/developers/metrics/custom_metrics/> for
237 /// more information.
238 pub fn with_threshold(mut self, count_threshold: usize) -> Self {
239 self.count_threshold = count_threshold;
240 self
241 }
242
243 pub(crate) fn build(self) -> Tracker {
244 Tracker::new(self.count_threshold, self.actions)
245 }
246}