vortex_metrics/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4#![deny(missing_docs)]
5//! Vortex metrics
6
7mod macros;
8
9use std::borrow::Cow;
10use std::collections::BTreeMap;
11use std::fmt::{Debug, Formatter};
12use std::sync::Arc;
13
14use parking_lot::RwLock;
15use witchcraft_metrics::{MetricRegistry, Metrics, MetricsIter};
16
17/// A metric registry for various performance metrics.
18#[derive(Default, Clone)]
19pub struct VortexMetrics {
20    inner: Arc<Inner>,
21}
22
23#[derive(Default)]
24struct Inner {
25    registry: MetricRegistry,
26    default_tags: DefaultTags,
27    children: RwLock<Vec<VortexMetrics>>,
28}
29
30impl Debug for VortexMetrics {
31    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
32        f.debug_struct("VortexMetrics")
33            .field("default_tags", &self.inner.default_tags)
34            .field("children", &self.inner.children)
35            .finish_non_exhaustive()
36    }
37}
38
39// re-export exposed metric types
40pub use witchcraft_metrics::{Counter, Histogram, Metric, MetricId, Tags, Timer};
41
42/// Default tags for metrics used in [`VortexMetrics`].
43#[derive(Default, Clone, Debug)]
44pub struct DefaultTags(BTreeMap<Cow<'static, str>, Cow<'static, str>>);
45
46impl<K, V, I> From<I> for DefaultTags
47where
48    I: IntoIterator<Item = (K, V)>,
49    K: Into<Cow<'static, str>>,
50    V: Into<Cow<'static, str>>,
51{
52    fn from(pairs: I) -> Self {
53        DefaultTags(
54            pairs
55                .into_iter()
56                .map(|(k, v)| (k.into(), v.into()))
57                .collect(),
58        )
59    }
60}
61
62impl VortexMetrics {
63    /// Create a new [`VortexMetrics`] instance.
64    pub fn new(registry: MetricRegistry, default_tags: impl Into<DefaultTags>) -> Self {
65        let inner = Arc::new(Inner {
66            registry,
67            default_tags: default_tags.into(),
68            children: Default::default(),
69        });
70        Self { inner }
71    }
72
73    /// Create an empty metric registry with default tags.
74    pub fn new_with_tags(default_tags: impl Into<DefaultTags>) -> Self {
75        Self::new(MetricRegistry::default(), default_tags)
76    }
77
78    /// Create a new metrics registry with additional tags. Metrics created in the
79    /// child registry will be included in this registry's snapshots.
80    pub fn child_with_tags(&self, additional_tags: impl Into<DefaultTags>) -> Self {
81        let child = Self::new_with_tags(self.inner.default_tags.merge(&additional_tags.into()));
82        self.inner.children.write().push(child.clone());
83        child
84    }
85
86    /// Returns the counter with the specified ID, creating a default instance if absent.
87    ///
88    /// # Panics
89    ///
90    /// Panics if a metric is registered with the ID that is not a counter.
91    pub fn counter<T>(&self, id: T) -> Arc<Counter>
92    where
93        T: Into<MetricId>,
94    {
95        self.inner.registry.counter(id)
96    }
97
98    /// Returns the histogram with the specified ID, creating a default instance if absent.
99    ///
100    /// # Panics
101    ///
102    /// Panics if a metric is registered with the ID that is not a histogram.
103    pub fn histogram<T>(&self, id: T) -> Arc<Histogram>
104    where
105        T: Into<MetricId>,
106    {
107        self.inner.registry.histogram(id)
108    }
109
110    /// Returns the timer with the specified ID, creating a default instance if absent.
111    ///
112    /// # Panics
113    ///
114    /// Panics if a metric is registered with the ID that is not a timer.
115    pub fn timer<T>(&self, id: T) -> Arc<Timer>
116    where
117        T: Into<MetricId>,
118    {
119        self.inner.registry.timer(id)
120    }
121
122    /// Returns a snapshot of the metrics in the registry.
123    ///
124    /// Modifications to the registry after this method is called will not affect the state of the returned `MetricsSnapshot`.
125    ///
126    /// Note: Tag values may contain sensitive information and should be properly sanitized before external exposure.
127    pub fn snapshot(&self) -> MetricsSnapshot {
128        let children = self.inner.children.read();
129        let snapshots = children.iter().map(|c| c.snapshot());
130        MetricsSnapshot(
131            std::iter::once((
132                self.inner.default_tags.clone(),
133                self.inner.registry.metrics(),
134            ))
135            .chain(snapshots.flat_map(|snapshots| snapshots.0.into_iter()))
136            .collect(),
137        )
138    }
139}
140
141/// A snapshot of the metrics in a registry with default tags.
142pub struct MetricsSnapshot(Vec<(DefaultTags, Metrics)>);
143
144impl MetricsSnapshot {
145    /// Create an iterator over the metrics snapshot.
146    pub fn iter(&self) -> impl Iterator<Item = (MetricId, &Metric)> {
147        self.0
148            .iter()
149            .flat_map(|(default_tags, metrics)| VortexMetricsIter {
150                iter: metrics.iter(),
151                default_tags,
152            })
153    }
154}
155
156/// Metrics Iterator that applies the default tags to each metric in the inner iterator.
157pub struct VortexMetricsIter<'a> {
158    iter: MetricsIter<'a>,
159    default_tags: &'a DefaultTags,
160}
161
162impl<'a> Iterator for VortexMetricsIter<'a> {
163    type Item = (MetricId, &'a Metric);
164
165    #[inline]
166    fn next(&mut self) -> Option<(MetricId, &'a Metric)> {
167        self.iter.next().map(|(k, v)| {
168            let mut metric_id = k.clone();
169            for (tag_key, tag_value) in self.default_tags.0.iter() {
170                metric_id = metric_id.with_tag(tag_key.clone(), tag_value.clone())
171            }
172
173            (metric_id, v)
174        })
175    }
176
177    #[inline]
178    fn size_hint(&self) -> (usize, Option<usize>) {
179        self.iter.size_hint()
180    }
181}
182
183impl DefaultTags {
184    fn merge(&self, other: &Self) -> Self {
185        DefaultTags(
186            self.0
187                .iter()
188                .chain(other.0.iter())
189                .map(|(k, v)| (k.clone(), v.clone()))
190                .collect(),
191        )
192    }
193}
194
195#[cfg(test)]
196mod tests {
197    use super::*;
198
199    #[test]
200    fn test_default_tags() -> Result<(), &'static str> {
201        let tags = [("file", "a"), ("partition", "1")];
202        let metrics = VortexMetrics::new_with_tags(tags);
203
204        // Create a metric to verify tags
205        let counter = metrics.counter("test.counter");
206        counter.inc();
207        let snapshot = metrics.snapshot();
208        let (name, metric) = snapshot.iter().next().unwrap();
209        assert_eq!(
210            name,
211            MetricId::new("test.counter")
212                .with_tag("file", "a")
213                .with_tag("partition", "1")
214        );
215        match metric {
216            Metric::Counter(c) => assert_eq!(c.count(), 1),
217            _ => return Err("metric is not a counter"),
218        }
219        Ok(())
220    }
221
222    #[test]
223    fn test_multiple_children_with_different_tags() -> Result<(), &'static str> {
224        let parent_tags = [("service", "vortex")];
225        let parent = VortexMetrics::new_with_tags(parent_tags);
226
227        let child1_tags = [("instance", "child1")];
228        let child2_tags = [("instance", "child2")];
229
230        let child1 = parent.child_with_tags(child1_tags);
231        let child2 = parent.child_with_tags(child2_tags);
232
233        // Create same metric in both children
234        let counter1 = child1.counter("test.counter");
235        let counter2 = child2.counter("test.counter");
236
237        counter1.inc();
238        counter2.add(2);
239
240        // Verify child1 metrics
241        let child1_snapshot = child1.snapshot();
242        let (name, metric) = child1_snapshot.iter().next().unwrap();
243        assert_eq!(
244            name,
245            MetricId::new("test.counter")
246                .with_tag("service", "vortex")
247                .with_tag("instance", "child1")
248        );
249        match metric {
250            Metric::Counter(c) => assert_eq!(c.count(), 1),
251            _ => return Err("metric is not a counter"),
252        }
253
254        // Verify child2 metrics
255        let child2_snapshot = child2.snapshot();
256        let (name, metric) = child2_snapshot.iter().next().unwrap();
257        assert_eq!(
258            name,
259            MetricId::new("test.counter")
260                .with_tag("service", "vortex")
261                .with_tag("instance", "child2")
262        );
263        match metric {
264            Metric::Counter(c) => assert_eq!(c.count(), 2),
265            _ => return Err("metric is not a counter"),
266        }
267        Ok(())
268    }
269
270    #[test]
271    fn test_tag_overriding() -> Result<(), &'static str> {
272        let parent_tags = [("service", "vortex"), ("environment", "test")];
273        let parent = VortexMetrics::new_with_tags(parent_tags);
274
275        // Child tries to override parent's service tag
276        let child_tags = [("service", "override"), ("instance", "child1")];
277        let child = parent.child_with_tags(child_tags);
278
279        let child_counter = child.counter("test.counter");
280        child_counter.inc();
281
282        // Verify child metrics have the overridden tag value
283        let child_snapshot = child.snapshot();
284        let (name, metric) = child_snapshot.iter().next().unwrap();
285        assert_eq!(
286            name,
287            MetricId::new("test.counter")
288                .with_tag("service", "override")
289                .with_tag("environment", "test")
290                .with_tag("instance", "child1")
291        );
292        match metric {
293            Metric::Counter(c) => assert_eq!(c.count(), 1),
294            _ => return Err("metric is not a counter"),
295        }
296        Ok(())
297    }
298}