vortex_metrics/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4#![deny(missing_docs)]
5
6//! Vortex metrics
7
8mod macros;
9mod session;
10
11use std::borrow::Cow;
12use std::collections::BTreeMap;
13use std::fmt::{Debug, Formatter};
14use std::sync::Arc;
15
16use parking_lot::RwLock;
17pub use session::*;
18use witchcraft_metrics::{MetricRegistry, Metrics, MetricsIter};
19
20/// A metric registry for various performance metrics.
21#[derive(Default, Clone)]
22pub struct VortexMetrics {
23    inner: Arc<Inner>,
24}
25
26#[derive(Default)]
27struct Inner {
28    registry: MetricRegistry,
29    default_tags: DefaultTags,
30    children: RwLock<Vec<VortexMetrics>>,
31}
32
33impl Debug for VortexMetrics {
34    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
35        f.debug_struct("VortexMetrics")
36            .field("default_tags", &self.inner.default_tags)
37            .field("children", &self.inner.children)
38            .finish_non_exhaustive()
39    }
40}
41
42// re-export exposed metric types
43pub use witchcraft_metrics::{Counter, Histogram, Metric, MetricId, Tags, Timer};
44
/// Default tags for metrics used in [`VortexMetrics`].
///
/// Stored as an ordered map from tag key to tag value, so each key appears at most once.
#[derive(Clone, Debug, Default)]
pub struct DefaultTags(BTreeMap<Cow<'static, str>, Cow<'static, str>>);

/// Builds the tag set from any sequence of `(key, value)` pairs. When a key occurs more
/// than once, the value of its last occurrence is kept.
impl<K, V, I> From<I> for DefaultTags
where
    I: IntoIterator<Item = (K, V)>,
    K: Into<Cow<'static, str>>,
    V: Into<Cow<'static, str>>,
{
    fn from(pairs: I) -> Self {
        let mut tags = BTreeMap::new();
        for (key, value) in pairs {
            // `insert` replaces the value of an existing key, so later pairs win.
            tags.insert(key.into(), value.into());
        }
        Self(tags)
    }
}
64
65impl VortexMetrics {
66    /// Create a new [`VortexMetrics`] instance.
67    pub fn new(registry: MetricRegistry, default_tags: impl Into<DefaultTags>) -> Self {
68        let inner = Arc::new(Inner {
69            registry,
70            default_tags: default_tags.into(),
71            children: Default::default(),
72        });
73        Self { inner }
74    }
75
76    /// Create an empty metric registry with default tags.
77    pub fn new_with_tags(default_tags: impl Into<DefaultTags>) -> Self {
78        Self::new(MetricRegistry::default(), default_tags)
79    }
80
81    /// Create a new metrics registry with additional tags. Metrics created in the
82    /// child registry will be included in this registry's snapshots.
83    pub fn child_with_tags(&self, additional_tags: impl Into<DefaultTags>) -> Self {
84        let child = Self::new_with_tags(self.inner.default_tags.merge(&additional_tags.into()));
85        self.inner.children.write().push(child.clone());
86        child
87    }
88
89    /// Returns the counter with the specified ID, creating a default instance if absent.
90    ///
91    /// # Panics
92    ///
93    /// Panics if a metric is registered with the ID that is not a counter.
94    pub fn counter<T>(&self, id: T) -> Arc<Counter>
95    where
96        T: Into<MetricId>,
97    {
98        self.inner.registry.counter(id)
99    }
100
101    /// Returns the histogram with the specified ID, creating a default instance if absent.
102    ///
103    /// # Panics
104    ///
105    /// Panics if a metric is registered with the ID that is not a histogram.
106    pub fn histogram<T>(&self, id: T) -> Arc<Histogram>
107    where
108        T: Into<MetricId>,
109    {
110        self.inner.registry.histogram(id)
111    }
112
113    /// Returns the timer with the specified ID, creating a default instance if absent.
114    ///
115    /// # Panics
116    ///
117    /// Panics if a metric is registered with the ID that is not a timer.
118    pub fn timer<T>(&self, id: T) -> Arc<Timer>
119    where
120        T: Into<MetricId>,
121    {
122        self.inner.registry.timer(id)
123    }
124
125    /// Returns a snapshot of the metrics in the registry.
126    ///
127    /// Modifications to the registry after this method is called will not affect the state of the returned `MetricsSnapshot`.
128    ///
129    /// Note: Tag values may contain sensitive information and should be properly sanitized before external exposure.
130    pub fn snapshot(&self) -> MetricsSnapshot {
131        let children = self.inner.children.read();
132        let snapshots = children.iter().map(|c| c.snapshot());
133        MetricsSnapshot(
134            std::iter::once((
135                self.inner.default_tags.clone(),
136                self.inner.registry.metrics(),
137            ))
138            .chain(snapshots.flat_map(|snapshots| snapshots.0.into_iter()))
139            .collect(),
140        )
141    }
142}
143
144/// A snapshot of the metrics in a registry with default tags.
145pub struct MetricsSnapshot(Vec<(DefaultTags, Metrics)>);
146
147impl MetricsSnapshot {
148    /// Create an iterator over the metrics snapshot.
149    pub fn iter(&self) -> impl Iterator<Item = (MetricId, &Metric)> {
150        self.0
151            .iter()
152            .flat_map(|(default_tags, metrics)| VortexMetricsIter {
153                iter: metrics.iter(),
154                default_tags,
155            })
156    }
157}
158
159/// Metrics Iterator that applies the default tags to each metric in the inner iterator.
160pub struct VortexMetricsIter<'a> {
161    iter: MetricsIter<'a>,
162    default_tags: &'a DefaultTags,
163}
164
165impl<'a> Iterator for VortexMetricsIter<'a> {
166    type Item = (MetricId, &'a Metric);
167
168    #[inline]
169    fn next(&mut self) -> Option<(MetricId, &'a Metric)> {
170        self.iter.next().map(|(k, v)| {
171            let mut metric_id = k.clone();
172            for (tag_key, tag_value) in self.default_tags.0.iter() {
173                metric_id = metric_id.with_tag(tag_key.clone(), tag_value.clone())
174            }
175
176            (metric_id, v)
177        })
178    }
179
180    #[inline]
181    fn size_hint(&self) -> (usize, Option<usize>) {
182        self.iter.size_hint()
183    }
184}
185
186impl DefaultTags {
187    fn merge(&self, other: &Self) -> Self {
188        DefaultTags(
189            self.0
190                .iter()
191                .chain(other.0.iter())
192                .map(|(k, v)| (k.clone(), v.clone()))
193                .collect(),
194        )
195    }
196}
197
#[cfg(test)]
mod tests {
    use super::*;

    /// Tags given at construction time appear on the metric ids of a snapshot.
    #[test]
    fn test_default_tags() -> Result<(), &'static str> {
        let metrics = VortexMetrics::new_with_tags([("file", "a"), ("partition", "1")]);

        // Register a counter and bump it so the snapshot has something to report.
        metrics.counter("test.counter").inc();

        let snapshot = metrics.snapshot();
        let (name, metric) = snapshot.iter().next().unwrap();

        let expected = MetricId::new("test.counter")
            .with_tag("file", "a")
            .with_tag("partition", "1");
        assert_eq!(name, expected);

        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 1);
            Ok(())
        } else {
            Err("metric is not a counter")
        }
    }

    /// Two children of the same parent keep separate counters and separate tags.
    #[test]
    fn test_multiple_children_with_different_tags() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex")]);

        let child1 = parent.child_with_tags([("instance", "child1")]);
        let child2 = parent.child_with_tags([("instance", "child2")]);

        // The same metric name is used in both children, with different values.
        child1.counter("test.counter").inc();
        child2.counter("test.counter").add(2);

        // First child: parent tag plus its own instance tag, count of one.
        let child1_snapshot = child1.snapshot();
        let (name, metric) = child1_snapshot.iter().next().unwrap();
        let expected = MetricId::new("test.counter")
            .with_tag("service", "vortex")
            .with_tag("instance", "child1");
        assert_eq!(name, expected);
        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 1);
        } else {
            return Err("metric is not a counter");
        }

        // Second child: parent tag plus its own instance tag, count of two.
        let child2_snapshot = child2.snapshot();
        let (name, metric) = child2_snapshot.iter().next().unwrap();
        let expected = MetricId::new("test.counter")
            .with_tag("service", "vortex")
            .with_tag("instance", "child2");
        assert_eq!(name, expected);
        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 2);
            Ok(())
        } else {
            Err("metric is not a counter")
        }
    }

    /// A child tag with the same key as a parent tag replaces the parent's value.
    #[test]
    fn test_tag_overriding() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex"), ("environment", "test")]);

        // The child redefines `service` and adds a tag of its own.
        let child = parent.child_with_tags([("service", "override"), ("instance", "child1")]);

        child.counter("test.counter").inc();

        // The child's metric must carry the overridden `service` value.
        let child_snapshot = child.snapshot();
        let (name, metric) = child_snapshot.iter().next().unwrap();
        let expected = MetricId::new("test.counter")
            .with_tag("service", "override")
            .with_tag("environment", "test")
            .with_tag("instance", "child1");
        assert_eq!(name, expected);

        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 1);
            Ok(())
        } else {
            Err("metric is not a counter")
        }
    }
}