prima_datadog/
tracker.rs

1use std::{
2    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
3    iter::FromIterator,
4    sync::Mutex,
5};
6
7use crate::{DogstatsdClient, TagsProvider};
8
/// Default number of distinct custom metrics (unique metric name + tag set
/// combinations) that may be observed before the tracker runs its threshold actions.
///
/// See <https://www.datadoghq.com/pricing/> and
/// <https://docs.datadoghq.com/account_management/billing/custom_metrics/>.
///
/// 100 seems like a reasonable place to start warning for now.
pub const DEFAULT_TAG_THRESHOLD: usize = 100;
13
14enum TrackerState {
15    Running {
16        seen: BTreeMap<String, Vec<BTreeSet<String>>>,
17        cardinality_count: usize,
18        actions: Vec<ThresholdAction>,
19    },
20    Done,
21}
22
23pub(crate) struct Tracker {
24    /// Threshold at which to take the user defined action, and stop tracking
25    cardinality_threshold: usize,
26
27    /// Our internal state
28    ///
29    /// This will be `None` if the user does not want to track cardinality.
30    state: Option<Mutex<TrackerState>>,
31}
32
33impl Tracker {
34    fn new(cardinality_threshold: usize, actions: Vec<ThresholdAction>) -> Self {
35        Tracker {
36            cardinality_threshold,
37            state: if !actions.is_empty() && cardinality_threshold != 0 {
38                Some(Mutex::new(TrackerState::Running {
39                    actions,
40                    seen: Default::default(),
41                    cardinality_count: 0,
42                }))
43            } else {
44                None
45            },
46        }
47    }
48
49    pub(crate) fn track<S, T>(&self, dd: &impl DogstatsdClient, metric: &str, tags: T) -> T
50    where
51        S: AsRef<str>,
52        T: TagsProvider<S>,
53    {
54        let mut lock = match self.state.as_ref() {
55            Some(state) => state.lock().unwrap(),
56            None => return tags,
57        };
58        let tag_slice = tags.as_ref();
59        let state = std::mem::replace(&mut *lock, TrackerState::Done);
60        match state {
61            TrackerState::Running {
62                mut seen,
63                mut cardinality_count,
64                actions,
65            } => {
66                let cardinality_grown = Self::update(&mut seen, metric, tag_slice);
67                if cardinality_grown {
68                    cardinality_count += 1;
69                    if cardinality_count >= self.cardinality_threshold {
70                        drop(lock);
71                        Self::do_actions(dd, seen, actions, metric, tag_slice);
72                        return tags;
73                    }
74                }
75                // Rebuild the running state if we didn't run the actions
76                *lock = TrackerState::Running {
77                    seen,
78                    cardinality_count,
79                    actions,
80                };
81            }
82            TrackerState::Done => {}
83        };
84        tags
85    }
86
87    fn update(seen: &mut BTreeMap<String, Vec<BTreeSet<String>>>, metric: &str, tags: &[impl AsRef<str>]) -> bool {
88        let seen_tag_set = match seen.get_mut(metric) {
89            Some(seen_tag_set) => seen_tag_set,
90            None => {
91                seen.insert(
92                    metric.to_string(),
93                    vec![BTreeSet::from_iter(tags.iter().map(|tag| tag.as_ref().to_string()))],
94                );
95                return true;
96            }
97        };
98        // Is this set of tags new for this metric?
99        let set_is_novel = seen_tag_set
100            .iter()
101            .all(|tag_set| tag_set.len() != tags.len() || tags.iter().any(|tag| !tag_set.contains(tag.as_ref())));
102        if set_is_novel {
103            seen_tag_set.push(BTreeSet::from_iter(tags.iter().map(|tag| tag.as_ref().to_string())));
104        };
105        set_is_novel
106    }
107
108    fn do_actions(
109        dd: &impl DogstatsdClient,
110        seen: BTreeMap<String, Vec<BTreeSet<String>>>,
111        actions: Vec<ThresholdAction>,
112        metric: &str,
113        tags: &[impl AsRef<str>],
114    ) {
115        let event_tags = seen
116            .iter()
117            .map(|(metric, tags)| format!("{}:{}", metric, tags.len()))
118            .collect::<Vec<_>>();
119
120        let tags = tags.iter().map(|t| t.as_ref()).collect::<Vec<&str>>();
121        // Map from a BTreeMap<String, Vec<BTreeSet<String>>> to a HashMap<String, Vec<HashSet<String>>>
122        let seen: HashMap<String, Vec<HashSet<String>>> = seen
123            .into_iter()
124            .map(|(metric, sets)| (metric, sets.into_iter().map(|set| set.into_iter().collect()).collect()))
125            .collect();
126        for action in actions {
127            match action {
128                ThresholdAction::Event { title, text } => dd.event(&title, &text, &event_tags),
129                ThresholdAction::Custom(mut action) => {
130                    action(metric, &tags, &seen);
131                }
132            }
133        }
134    }
135}
136
137/// Actions that define what the tracker will do when the custom metric threshold is passed.
138/// A user may define any number of these, and by default none are taken.
139enum ThresholdAction {
140    /// Emit an event. The count of unique tag sets, per metric, is provided as the tags
141    Event { title: String, text: String },
142
143    /// Take some custom action.
144    Custom(ThresholdCustomAction),
145}
146
/// Boxed user callback run when the threshold is reached. It is called with the metric
/// name, the tags of the metric that crossed the threshold, and every unique tag set
/// recorded for each metric.
type ThresholdCustomAction = Box<dyn FnMut(&str, &[&str], &HashMap<String, Vec<HashSet<String>>>) + Send + Sync>;
148
149/// The configuration for the tag tracker. By default, the tag tracking is not enabled.
150/// To enable it, set the `count_threshold` to a non-zero value, and add at least one event
151/// or custom action
152/// Example usage:
153/// ```rust
154/// use prima_datadog::{
155///     configuration::{Country, Configuration},
156///     Datadog, TagTrackerConfiguration,
157/// };
158/// let tracker_config = TagTrackerConfiguration::new()
159///     .with_threshold(21)
160///     .with_custom_action(|_, _, _| {});
161/// let configuration = Configuration::new(
162///     "0.0.0.0:1234",
163///     "prima_datadog_benchmarks",
164/// ).with_country(Country::It).with_tracker_configuration(tracker_config);
165/// Datadog::init(configuration).unwrap();
166/// ```
167pub struct TagTrackerConfiguration {
168    count_threshold: usize,
169    actions: Vec<ThresholdAction>,
170}
171
172impl Default for TagTrackerConfiguration {
173    fn default() -> Self {
174        Self {
175            count_threshold: DEFAULT_TAG_THRESHOLD,
176            actions: Vec::new(),
177        }
178    }
179}
180
181impl TagTrackerConfiguration {
182    pub fn new() -> Self {
183        Self {
184            count_threshold: DEFAULT_TAG_THRESHOLD,
185            actions: Vec::new(),
186        }
187    }
188
189    /// Configure the tracker to emit an event when the threshold is reached.
190    /// The count of unique tag sets, per metric, is provided as the tags
191    /// for the event, i.e. for a given metric `metric`, there will be a tag `metric:count`,
192    /// where count is the number of unique tag sets seen for that metric.
193    /// Any number of events may be configured, and all will be emitted when the
194    /// threshold is reached.
195    pub fn with_event(mut self, title: String, text: String) -> Self {
196        self.actions.push(ThresholdAction::Event { title, text });
197        self
198    }
199
200    /// Add a custom action to execute when the custom metric threshold is reached.
201    /// These actions are run exactly once, at the point the threshold is reached.
202    /// Any number of actions may be added. The function will be passed the metric name,
203    /// the tags of the metric causing the crossing of the threshold, and a
204    /// HashMap containing all the unique tag sets seen for each metric.
205    ///
206    /// # Example
207    ///
208    /// ```rust
209    /// prima_datadog::TagTrackerConfiguration::new().with_custom_action(|metric, tags, _| {
210    ///     println!("Exceeded custom metric threshold for metric {} with tags {:?}", metric, tags);
211    /// });
212    /// ```
213    pub fn with_custom_action(
214        mut self,
215        custom_action: impl FnMut(&str, &[&str], &HashMap<String, Vec<HashSet<String>>>) + Send + Sync + 'static,
216    ) -> Self {
217        self.actions
218            .push(ThresholdAction::Custom(Box::new(custom_action) as Box<_>));
219        self
220    }
221
222    /// Set the threshold for the maximum number of custom metrics
223    /// This defaults to ```DEFAULT_TAG_THRESHOLD```
224    ///
225    /// The threshold is the maximum number of "custom metrics" which can
226    /// be generated before the tracker actions are run.
227    ///
228    /// A "custom metric" is defined as the unique combination of metric name
229    /// and unique tag set, e.g., a metric, "test", with the following tag sets:
230    /// - [a, b, c]
231    /// - [a, b, d]
232    /// - [a, c, d]
233    ///
234    /// is counted as 3 "custom metrics", as there are 3 unique tag sets here.
235    ///
236    /// See <https://docs.datadoghq.com/developers/metrics/custom_metrics/> for
237    /// more information.
238    pub fn with_threshold(mut self, count_threshold: usize) -> Self {
239        self.count_threshold = count_threshold;
240        self
241    }
242
243    pub(crate) fn build(self) -> Tracker {
244        Tracker::new(self.count_threshold, self.actions)
245    }
246}