vortex_metrics/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4#![deny(missing_docs)]
5
6//! Vortex metrics
7
8mod macros;
9mod session;
10
11use std::borrow::Cow;
12use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::fmt::Formatter;
15use std::sync::Arc;
16
17use parking_lot::RwLock;
18pub use session::*;
19use witchcraft_metrics::MetricRegistry;
20use witchcraft_metrics::Metrics;
21use witchcraft_metrics::MetricsIter;
22
23/// A metric registry for various performance metrics.
24#[derive(Default, Clone)]
25pub struct VortexMetrics {
26    inner: Arc<Inner>,
27}
28
29#[derive(Default)]
30struct Inner {
31    registry: MetricRegistry,
32    default_tags: DefaultTags,
33    children: RwLock<Vec<VortexMetrics>>,
34}
35
36impl Debug for VortexMetrics {
37    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38        f.debug_struct("VortexMetrics")
39            .field("default_tags", &self.inner.default_tags)
40            .field("children", &self.inner.children)
41            .finish_non_exhaustive()
42    }
43}
44
45// re-export exposed metric types
46pub use witchcraft_metrics::Counter;
47pub use witchcraft_metrics::Histogram;
48pub use witchcraft_metrics::Metric;
49pub use witchcraft_metrics::MetricId;
50pub use witchcraft_metrics::Tags;
51pub use witchcraft_metrics::Timer;
52
/// Tags attached by default to the metrics of a [`VortexMetrics`] registry.
///
/// Stored as an ordered map from tag key to tag value, so each key appears at
/// most once.
#[derive(Clone, Debug, Default)]
pub struct DefaultTags(BTreeMap<Cow<'static, str>, Cow<'static, str>>);
56
57impl<K, V, I> From<I> for DefaultTags
58where
59    I: IntoIterator<Item = (K, V)>,
60    K: Into<Cow<'static, str>>,
61    V: Into<Cow<'static, str>>,
62{
63    fn from(pairs: I) -> Self {
64        DefaultTags(
65            pairs
66                .into_iter()
67                .map(|(k, v)| (k.into(), v.into()))
68                .collect(),
69        )
70    }
71}
72
73impl VortexMetrics {
74    /// Create a new [`VortexMetrics`] instance.
75    pub fn new(registry: MetricRegistry, default_tags: impl Into<DefaultTags>) -> Self {
76        let inner = Arc::new(Inner {
77            registry,
78            default_tags: default_tags.into(),
79            children: Default::default(),
80        });
81        Self { inner }
82    }
83
84    /// Create an empty metric registry with default tags.
85    pub fn new_with_tags(default_tags: impl Into<DefaultTags>) -> Self {
86        Self::new(MetricRegistry::default(), default_tags)
87    }
88
89    /// Create a new metrics registry with additional tags. Metrics created in the
90    /// child registry will be included in this registry's snapshots.
91    pub fn child_with_tags(&self, additional_tags: impl Into<DefaultTags>) -> Self {
92        let child = Self::new_with_tags(self.inner.default_tags.merge(&additional_tags.into()));
93        self.inner.children.write().push(child.clone());
94        child
95    }
96
97    /// Returns the counter with the specified ID, creating a default instance if absent.
98    ///
99    /// # Panics
100    ///
101    /// Panics if a metric is registered with the ID that is not a counter.
102    pub fn counter<T>(&self, id: T) -> Arc<Counter>
103    where
104        T: Into<MetricId>,
105    {
106        self.inner.registry.counter(id)
107    }
108
109    /// Returns the histogram with the specified ID, creating a default instance if absent.
110    ///
111    /// # Panics
112    ///
113    /// Panics if a metric is registered with the ID that is not a histogram.
114    pub fn histogram<T>(&self, id: T) -> Arc<Histogram>
115    where
116        T: Into<MetricId>,
117    {
118        self.inner.registry.histogram(id)
119    }
120
121    /// Returns the timer with the specified ID, creating a default instance if absent.
122    ///
123    /// # Panics
124    ///
125    /// Panics if a metric is registered with the ID that is not a timer.
126    pub fn timer<T>(&self, id: T) -> Arc<Timer>
127    where
128        T: Into<MetricId>,
129    {
130        self.inner.registry.timer(id)
131    }
132
133    /// Returns a snapshot of the metrics in the registry.
134    ///
135    /// Modifications to the registry after this method is called will not affect the state of the returned `MetricsSnapshot`.
136    ///
137    /// Note: Tag values may contain sensitive information and should be properly sanitized before external exposure.
138    pub fn snapshot(&self) -> MetricsSnapshot {
139        let children = self.inner.children.read();
140        let snapshots = children.iter().map(|c| c.snapshot());
141        MetricsSnapshot(
142            std::iter::once((
143                self.inner.default_tags.clone(),
144                self.inner.registry.metrics(),
145            ))
146            .chain(snapshots.flat_map(|snapshots| snapshots.0.into_iter()))
147            .collect(),
148        )
149    }
150}
151
152/// A snapshot of the metrics in a registry with default tags.
153pub struct MetricsSnapshot(Vec<(DefaultTags, Metrics)>);
154
155impl MetricsSnapshot {
156    /// Create an iterator over the metrics snapshot.
157    pub fn iter(&self) -> impl Iterator<Item = (MetricId, &Metric)> {
158        self.0
159            .iter()
160            .flat_map(|(default_tags, metrics)| VortexMetricsIter {
161                iter: metrics.iter(),
162                default_tags,
163            })
164    }
165}
166
167/// Metrics Iterator that applies the default tags to each metric in the inner iterator.
168pub struct VortexMetricsIter<'a> {
169    iter: MetricsIter<'a>,
170    default_tags: &'a DefaultTags,
171}
172
173impl<'a> Iterator for VortexMetricsIter<'a> {
174    type Item = (MetricId, &'a Metric);
175
176    #[inline]
177    fn next(&mut self) -> Option<(MetricId, &'a Metric)> {
178        self.iter.next().map(|(k, v)| {
179            let mut metric_id = k.clone();
180            for (tag_key, tag_value) in self.default_tags.0.iter() {
181                metric_id = metric_id.with_tag(tag_key.clone(), tag_value.clone())
182            }
183
184            (metric_id, v)
185        })
186    }
187
188    #[inline]
189    fn size_hint(&self) -> (usize, Option<usize>) {
190        self.iter.size_hint()
191    }
192}
193
194impl DefaultTags {
195    fn merge(&self, other: &Self) -> Self {
196        DefaultTags(
197            self.0
198                .iter()
199                .chain(other.0.iter())
200                .map(|(k, v)| (k.clone(), v.clone()))
201                .collect(),
202        )
203    }
204}
205
#[cfg(test)]
mod tests {
    use super::*;

    /// Default tags given at construction appear on the metric IDs of a snapshot.
    #[test]
    fn test_default_tags() -> Result<(), &'static str> {
        let metrics = VortexMetrics::new_with_tags([("file", "a"), ("partition", "1")]);

        // Record a single increment so the snapshot has a metric to inspect.
        metrics.counter("test.counter").inc();

        let snapshot = metrics.snapshot();
        let (id, metric) = snapshot.iter().next().unwrap();
        let expected_id = MetricId::new("test.counter")
            .with_tag("file", "a")
            .with_tag("partition", "1");
        assert_eq!(id, expected_id);

        if let Metric::Counter(counter) = metric {
            assert_eq!(counter.count(), 1);
        } else {
            return Err("metric is not a counter");
        }
        Ok(())
    }

    /// Two children of one parent keep separate counters and separate tag sets.
    #[test]
    fn test_multiple_children_with_different_tags() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex")]);

        let first_child = parent.child_with_tags([("instance", "child1")]);
        let second_child = parent.child_with_tags([("instance", "child2")]);

        // Same metric name in both children, different recorded values.
        let first_counter = first_child.counter("test.counter");
        let second_counter = second_child.counter("test.counter");
        first_counter.inc();
        second_counter.add(2);

        // First child: parent tag plus its own instance tag, count of 1.
        let first_snapshot = first_child.snapshot();
        let (id, metric) = first_snapshot.iter().next().unwrap();
        let expected_id = MetricId::new("test.counter")
            .with_tag("service", "vortex")
            .with_tag("instance", "child1");
        assert_eq!(id, expected_id);
        if let Metric::Counter(counter) = metric {
            assert_eq!(counter.count(), 1);
        } else {
            return Err("metric is not a counter");
        }

        // Second child: parent tag plus its own instance tag, count of 2.
        let second_snapshot = second_child.snapshot();
        let (id, metric) = second_snapshot.iter().next().unwrap();
        let expected_id = MetricId::new("test.counter")
            .with_tag("service", "vortex")
            .with_tag("instance", "child2");
        assert_eq!(id, expected_id);
        if let Metric::Counter(counter) = metric {
            assert_eq!(counter.count(), 2);
        } else {
            return Err("metric is not a counter");
        }
        Ok(())
    }

    /// A child tag with the same key as a parent tag replaces the parent's value.
    #[test]
    fn test_tag_overriding() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex"), ("environment", "test")]);

        // The child redefines `service` and adds `instance`.
        let child = parent.child_with_tags([("service", "override"), ("instance", "child1")]);

        child.counter("test.counter").inc();

        // The child's metric must carry the overridden `service` value.
        let snapshot = child.snapshot();
        let (id, metric) = snapshot.iter().next().unwrap();
        let expected_id = MetricId::new("test.counter")
            .with_tag("service", "override")
            .with_tag("environment", "test")
            .with_tag("instance", "child1");
        assert_eq!(id, expected_id);
        if let Metric::Counter(counter) = metric {
            assert_eq!(counter.count(), 1);
        } else {
            return Err("metric is not a counter");
        }
        Ok(())
    }
}