vortex_metrics/
lib.rs

1#![deny(missing_docs)]
2//! Vortex metrics
3
4use std::borrow::Cow;
5use std::collections::BTreeMap;
6use std::fmt::{Debug, Formatter};
7use std::sync::{Arc, RwLock};
8
9use vortex_error::VortexExpect;
10use witchcraft_metrics::{MetricRegistry, Metrics, MetricsIter};
11
12/// A metric registry for various performance metrics.
13#[derive(Default, Clone)]
14pub struct VortexMetrics {
15    inner: Arc<Inner>,
16}
17
18#[derive(Default)]
19struct Inner {
20    registry: MetricRegistry,
21    default_tags: DefaultTags,
22    children: RwLock<Vec<VortexMetrics>>,
23}
24
25impl Debug for VortexMetrics {
26    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
27        f.debug_struct("VortexMetrics")
28            .field("default_tags", &self.inner.default_tags)
29            .field("children", &self.inner.children)
30            .finish_non_exhaustive()
31    }
32}
33
34// re-export exposed metric types
35pub use witchcraft_metrics::{Counter, Histogram, Metric, MetricId, Tags, Timer};
36
/// Default tags for metrics used in [`VortexMetrics`].
///
/// Stored as an ordered map from tag key to tag value, so iteration is always
/// in key order.
#[derive(Clone, Debug, Default)]
pub struct DefaultTags(BTreeMap<Cow<'static, str>, Cow<'static, str>>);

/// Builds a [`DefaultTags`] from any sequence of `(key, value)` pairs whose
/// elements convert into `Cow<'static, str>` (for example `&'static str` or
/// `String`). When a key occurs more than once, the last value is kept.
impl<K, V, I> From<I> for DefaultTags
where
    K: Into<Cow<'static, str>>,
    V: Into<Cow<'static, str>>,
    I: IntoIterator<Item = (K, V)>,
{
    fn from(pairs: I) -> Self {
        let tags: BTreeMap<_, _> = pairs
            .into_iter()
            .map(|(key, value)| (key.into(), value.into()))
            .collect();
        Self(tags)
    }
}
56
57impl VortexMetrics {
58    /// Create a new [`VortexMetrics`] instance.
59    pub fn new(registry: MetricRegistry, default_tags: impl Into<DefaultTags>) -> Self {
60        let inner = Arc::new(Inner {
61            registry,
62            default_tags: default_tags.into(),
63            children: Default::default(),
64        });
65        Self { inner }
66    }
67
68    /// Create an empty metric registry with default tags.
69    pub fn new_with_tags(default_tags: impl Into<DefaultTags>) -> Self {
70        Self::new(MetricRegistry::default(), default_tags)
71    }
72
73    /// Create a new metrics registry with additional tags. Metrics created in the
74    /// child registry will be included in this registry's snapshots.
75    pub fn child_with_tags(&self, additional_tags: impl Into<DefaultTags>) -> Self {
76        let child = Self::new_with_tags(self.inner.default_tags.merge(&additional_tags.into()));
77        self.inner
78            .children
79            .write()
80            .vortex_expect("failed to acquire write lock on children")
81            .push(child.clone());
82        child
83    }
84
85    /// Returns the counter with the specified ID, creating a default instance if absent.
86    ///
87    /// # Panics
88    ///
89    /// Panics if a metric is registered with the ID that is not a counter.
90    pub fn counter<T>(&self, id: T) -> Arc<Counter>
91    where
92        T: Into<MetricId>,
93    {
94        self.inner.registry.counter(id)
95    }
96
97    /// Returns the histogram with the specified ID, creating a default instance if absent.
98    ///
99    /// # Panics
100    ///
101    /// Panics if a metric is registered with the ID that is not a histogram.
102    pub fn histogram<T>(&self, id: T) -> Arc<Histogram>
103    where
104        T: Into<MetricId>,
105    {
106        self.inner.registry.histogram(id)
107    }
108
109    /// Returns the timer with the specified ID, creating a default instance if absent.
110    ///
111    /// # Panics
112    ///
113    /// Panics if a metric is registered with the ID that is not a timer.
114    pub fn timer<T>(&self, id: T) -> Arc<Timer>
115    where
116        T: Into<MetricId>,
117    {
118        self.inner.registry.timer(id)
119    }
120
121    /// Returns a snapshot of the metrics in the registry.
122    ///
123    /// Modifications to the registry after this method is called will not affect the state of the returned `MetricsSnapshot`.
124    ///
125    /// Note: Tag values may contain sensitive information and should be properly sanitized before external exposure.
126    pub fn snapshot(&self) -> MetricsSnapshot {
127        let children = self
128            .inner
129            .children
130            .read()
131            .vortex_expect("failed to acquire read lock on children");
132        let snapshots = children.iter().map(|c| c.snapshot());
133        MetricsSnapshot(
134            std::iter::once((
135                self.inner.default_tags.clone(),
136                self.inner.registry.metrics(),
137            ))
138            .chain(snapshots.flat_map(|snapshots| snapshots.0.into_iter()))
139            .collect(),
140        )
141    }
142}
143
144/// A snapshot of the metrics in a registry with default tags.
145pub struct MetricsSnapshot(Vec<(DefaultTags, Metrics)>);
146
147impl MetricsSnapshot {
148    /// Create an iterator over the metrics snapshot.
149    pub fn iter(&self) -> impl Iterator<Item = (MetricId, &Metric)> {
150        self.0
151            .iter()
152            .flat_map(|(default_tags, metrics)| VortexMetricsIter {
153                iter: metrics.iter(),
154                default_tags,
155            })
156    }
157}
158
159/// Metrics Iterator that applies the default tags to each metric in the inner iterator.
160pub struct VortexMetricsIter<'a> {
161    iter: MetricsIter<'a>,
162    default_tags: &'a DefaultTags,
163}
164
165impl<'a> Iterator for VortexMetricsIter<'a> {
166    type Item = (MetricId, &'a Metric);
167
168    #[inline]
169    fn next(&mut self) -> Option<(MetricId, &'a Metric)> {
170        self.iter.next().map(|(k, v)| {
171            let mut metric_id = k.clone();
172            for (tag_key, tag_value) in self.default_tags.0.iter() {
173                metric_id = metric_id.with_tag(tag_key.clone(), tag_value.clone())
174            }
175
176            (metric_id, v)
177        })
178    }
179
180    #[inline]
181    fn size_hint(&self) -> (usize, Option<usize>) {
182        self.iter.size_hint()
183    }
184}
185
186impl DefaultTags {
187    fn merge(&self, other: &Self) -> Self {
188        DefaultTags(
189            self.0
190                .iter()
191                .chain(other.0.iter())
192                .map(|(k, v)| (k.clone(), v.clone()))
193                .collect(),
194        )
195    }
196}
197
#[cfg(test)]
mod tests {
    use super::*;

    /// Default tags given at construction are attached to the metric IDs in a snapshot.
    #[test]
    fn test_default_tags() -> Result<(), &'static str> {
        let metrics = VortexMetrics::new_with_tags([("file", "a"), ("partition", "1")]);

        // Register and bump a single counter so the snapshot has an entry to inspect.
        metrics.counter("test.counter").inc();

        let expected_id = MetricId::new("test.counter")
            .with_tag("file", "a")
            .with_tag("partition", "1");

        let snapshot = metrics.snapshot();
        let (id, metric) = snapshot.iter().next().unwrap();
        assert_eq!(id, expected_id);

        if let Metric::Counter(counter) = metric {
            assert_eq!(counter.count(), 1);
            Ok(())
        } else {
            Err("metric is not a counter")
        }
    }

    /// Two children of the same parent keep separate counters and separate tags.
    #[test]
    fn test_multiple_children_with_different_tags() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex")]);

        let child1 = parent.child_with_tags([("instance", "child1")]);
        let child2 = parent.child_with_tags([("instance", "child2")]);

        // Same metric name in both children, incremented by different amounts.
        child1.counter("test.counter").inc();
        child2.counter("test.counter").add(2);

        // Each child's own snapshot must show its own tags and its own count.
        for (child, instance, expected_count) in [(&child1, "child1", 1), (&child2, "child2", 2)] {
            let expected_id = MetricId::new("test.counter")
                .with_tag("service", "vortex")
                .with_tag("instance", instance);

            let snapshot = child.snapshot();
            let (id, metric) = snapshot.iter().next().unwrap();
            assert_eq!(id, expected_id);

            if let Metric::Counter(counter) = metric {
                assert_eq!(counter.count(), expected_count);
            } else {
                return Err("metric is not a counter");
            }
        }
        Ok(())
    }

    /// A child tag with the same key as a parent tag replaces the parent's value.
    #[test]
    fn test_tag_overriding() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex"), ("environment", "test")]);

        // The child redefines `service` and adds a tag of its own.
        let child = parent.child_with_tags([("service", "override"), ("instance", "child1")]);
        child.counter("test.counter").inc();

        let expected_id = MetricId::new("test.counter")
            .with_tag("service", "override")
            .with_tag("environment", "test")
            .with_tag("instance", "child1");

        let snapshot = child.snapshot();
        let (id, metric) = snapshot.iter().next().unwrap();
        assert_eq!(id, expected_id);

        if let Metric::Counter(counter) = metric {
            assert_eq!(counter.count(), 1);
            Ok(())
        } else {
            Err("metric is not a counter")
        }
    }
}