vortex_metrics/
lib.rs

1#![deny(missing_docs)]
2//! Vortex metrics
3
4mod macros;
5
6use std::borrow::Cow;
7use std::collections::BTreeMap;
8use std::fmt::{Debug, Formatter};
9use std::sync::{Arc, RwLock};
10
11use vortex_error::VortexExpect;
12use witchcraft_metrics::{MetricRegistry, Metrics, MetricsIter};
13
14/// A metric registry for various performance metrics.
15#[derive(Default, Clone)]
16pub struct VortexMetrics {
17    inner: Arc<Inner>,
18}
19
20#[derive(Default)]
21struct Inner {
22    registry: MetricRegistry,
23    default_tags: DefaultTags,
24    children: RwLock<Vec<VortexMetrics>>,
25}
26
27impl Debug for VortexMetrics {
28    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
29        f.debug_struct("VortexMetrics")
30            .field("default_tags", &self.inner.default_tags)
31            .field("children", &self.inner.children)
32            .finish_non_exhaustive()
33    }
34}
35
36// re-export exposed metric types
37pub use witchcraft_metrics::{Counter, Histogram, Metric, MetricId, Tags, Timer};
38
/// Set of key/value tags that a [`VortexMetrics`] attaches to its metrics.
///
/// Tags are kept in a sorted map, so each key appears at most once.
#[derive(Clone, Debug, Default)]
pub struct DefaultTags(BTreeMap<Cow<'static, str>, Cow<'static, str>>);

/// Build [`DefaultTags`] from any sequence of `(key, value)` pairs.
///
/// If the same key occurs more than once, the value of the last occurrence
/// is the one that is kept.
impl<K, V, I> From<I> for DefaultTags
where
    I: IntoIterator<Item = (K, V)>,
    K: Into<Cow<'static, str>>,
    V: Into<Cow<'static, str>>,
{
    fn from(pairs: I) -> Self {
        let mut tags = BTreeMap::new();
        for (key, value) in pairs {
            tags.insert(key.into(), value.into());
        }
        Self(tags)
    }
}
58
59impl VortexMetrics {
60    /// Create a new [`VortexMetrics`] instance.
61    pub fn new(registry: MetricRegistry, default_tags: impl Into<DefaultTags>) -> Self {
62        let inner = Arc::new(Inner {
63            registry,
64            default_tags: default_tags.into(),
65            children: Default::default(),
66        });
67        Self { inner }
68    }
69
70    /// Create an empty metric registry with default tags.
71    pub fn new_with_tags(default_tags: impl Into<DefaultTags>) -> Self {
72        Self::new(MetricRegistry::default(), default_tags)
73    }
74
75    /// Create a new metrics registry with additional tags. Metrics created in the
76    /// child registry will be included in this registry's snapshots.
77    pub fn child_with_tags(&self, additional_tags: impl Into<DefaultTags>) -> Self {
78        let child = Self::new_with_tags(self.inner.default_tags.merge(&additional_tags.into()));
79        self.inner
80            .children
81            .write()
82            .vortex_expect("failed to acquire write lock on children")
83            .push(child.clone());
84        child
85    }
86
87    /// Returns the counter with the specified ID, creating a default instance if absent.
88    ///
89    /// # Panics
90    ///
91    /// Panics if a metric is registered with the ID that is not a counter.
92    pub fn counter<T>(&self, id: T) -> Arc<Counter>
93    where
94        T: Into<MetricId>,
95    {
96        self.inner.registry.counter(id)
97    }
98
99    /// Returns the histogram with the specified ID, creating a default instance if absent.
100    ///
101    /// # Panics
102    ///
103    /// Panics if a metric is registered with the ID that is not a histogram.
104    pub fn histogram<T>(&self, id: T) -> Arc<Histogram>
105    where
106        T: Into<MetricId>,
107    {
108        self.inner.registry.histogram(id)
109    }
110
111    /// Returns the timer with the specified ID, creating a default instance if absent.
112    ///
113    /// # Panics
114    ///
115    /// Panics if a metric is registered with the ID that is not a timer.
116    pub fn timer<T>(&self, id: T) -> Arc<Timer>
117    where
118        T: Into<MetricId>,
119    {
120        self.inner.registry.timer(id)
121    }
122
123    /// Returns a snapshot of the metrics in the registry.
124    ///
125    /// Modifications to the registry after this method is called will not affect the state of the returned `MetricsSnapshot`.
126    ///
127    /// Note: Tag values may contain sensitive information and should be properly sanitized before external exposure.
128    pub fn snapshot(&self) -> MetricsSnapshot {
129        let children = self
130            .inner
131            .children
132            .read()
133            .vortex_expect("failed to acquire read lock on children");
134        let snapshots = children.iter().map(|c| c.snapshot());
135        MetricsSnapshot(
136            std::iter::once((
137                self.inner.default_tags.clone(),
138                self.inner.registry.metrics(),
139            ))
140            .chain(snapshots.flat_map(|snapshots| snapshots.0.into_iter()))
141            .collect(),
142        )
143    }
144}
145
146/// A snapshot of the metrics in a registry with default tags.
147pub struct MetricsSnapshot(Vec<(DefaultTags, Metrics)>);
148
149impl MetricsSnapshot {
150    /// Create an iterator over the metrics snapshot.
151    pub fn iter(&self) -> impl Iterator<Item = (MetricId, &Metric)> {
152        self.0
153            .iter()
154            .flat_map(|(default_tags, metrics)| VortexMetricsIter {
155                iter: metrics.iter(),
156                default_tags,
157            })
158    }
159}
160
161/// Metrics Iterator that applies the default tags to each metric in the inner iterator.
162pub struct VortexMetricsIter<'a> {
163    iter: MetricsIter<'a>,
164    default_tags: &'a DefaultTags,
165}
166
167impl<'a> Iterator for VortexMetricsIter<'a> {
168    type Item = (MetricId, &'a Metric);
169
170    #[inline]
171    fn next(&mut self) -> Option<(MetricId, &'a Metric)> {
172        self.iter.next().map(|(k, v)| {
173            let mut metric_id = k.clone();
174            for (tag_key, tag_value) in self.default_tags.0.iter() {
175                metric_id = metric_id.with_tag(tag_key.clone(), tag_value.clone())
176            }
177
178            (metric_id, v)
179        })
180    }
181
182    #[inline]
183    fn size_hint(&self) -> (usize, Option<usize>) {
184        self.iter.size_hint()
185    }
186}
187
188impl DefaultTags {
189    fn merge(&self, other: &Self) -> Self {
190        DefaultTags(
191            self.0
192                .iter()
193                .chain(other.0.iter())
194                .map(|(k, v)| (k.clone(), v.clone()))
195                .collect(),
196        )
197    }
198}
199
#[cfg(test)]
mod tests {
    use super::*;

    /// Return the first `(id, metric)` pair of `snapshot`, panicking if the
    /// snapshot yields nothing.
    fn first_metric(snapshot: &MetricsSnapshot) -> (MetricId, &Metric) {
        snapshot.iter().next().unwrap()
    }

    /// Default tags given at construction show up on the metric ids of a snapshot.
    #[test]
    fn test_default_tags() -> Result<(), &'static str> {
        let metrics = VortexMetrics::new_with_tags([("file", "a"), ("partition", "1")]);

        // Create a metric to verify tags
        metrics.counter("test.counter").inc();

        let snapshot = metrics.snapshot();
        let (name, metric) = first_metric(&snapshot);

        let expected_id = MetricId::new("test.counter")
            .with_tag("file", "a")
            .with_tag("partition", "1");
        assert_eq!(name, expected_id);

        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 1);
        } else {
            return Err("metric is not a counter");
        }
        Ok(())
    }

    /// Sibling children keep independent counters and carry their own tags
    /// on top of the parent's.
    #[test]
    fn test_multiple_children_with_different_tags() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex")]);

        let child1 = parent.child_with_tags([("instance", "child1")]);
        let child2 = parent.child_with_tags([("instance", "child2")]);

        // Create same metric in both children
        let counter1 = child1.counter("test.counter");
        let counter2 = child2.counter("test.counter");

        counter1.inc();
        counter2.add(2);

        // Verify child1 metrics
        let child1_snapshot = child1.snapshot();
        let (name, metric) = first_metric(&child1_snapshot);
        let expected_id = MetricId::new("test.counter")
            .with_tag("service", "vortex")
            .with_tag("instance", "child1");
        assert_eq!(name, expected_id);
        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 1);
        } else {
            return Err("metric is not a counter");
        }

        // Verify child2 metrics
        let child2_snapshot = child2.snapshot();
        let (name, metric) = first_metric(&child2_snapshot);
        let expected_id = MetricId::new("test.counter")
            .with_tag("service", "vortex")
            .with_tag("instance", "child2");
        assert_eq!(name, expected_id);
        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 2);
        } else {
            return Err("metric is not a counter");
        }
        Ok(())
    }

    /// A child tag with the same key as a parent tag replaces the parent's value.
    #[test]
    fn test_tag_overriding() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex"), ("environment", "test")]);

        // Child tries to override parent's service tag
        let child = parent.child_with_tags([("service", "override"), ("instance", "child1")]);

        child.counter("test.counter").inc();

        // Verify child metrics have the overridden tag value
        let child_snapshot = child.snapshot();
        let (name, metric) = first_metric(&child_snapshot);
        let expected_id = MetricId::new("test.counter")
            .with_tag("service", "override")
            .with_tag("environment", "test")
            .with_tag("instance", "child1");
        assert_eq!(name, expected_id);
        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 1);
        } else {
            return Err("metric is not a counter");
        }
        Ok(())
    }
}
303}