vortex_metrics/
lib.rs

1#![deny(missing_docs)]
2//! Vortex metrics
3
4mod macros;
5
6use std::borrow::Cow;
7use std::collections::BTreeMap;
8use std::fmt::{Debug, Formatter};
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12use witchcraft_metrics::{MetricRegistry, Metrics, MetricsIter};
13
14/// A metric registry for various performance metrics.
15#[derive(Default, Clone)]
16pub struct VortexMetrics {
17    inner: Arc<Inner>,
18}
19
20#[derive(Default)]
21struct Inner {
22    registry: MetricRegistry,
23    default_tags: DefaultTags,
24    children: RwLock<Vec<VortexMetrics>>,
25}
26
27impl Debug for VortexMetrics {
28    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
29        f.debug_struct("VortexMetrics")
30            .field("default_tags", &self.inner.default_tags)
31            .field("children", &self.inner.children)
32            .finish_non_exhaustive()
33    }
34}
35
36// re-export exposed metric types
37pub use witchcraft_metrics::{Counter, Histogram, Metric, MetricId, Tags, Timer};
38
/// Key/value tags that a [`VortexMetrics`] attaches to each of its metrics.
///
/// The tags are stored in a [`BTreeMap`], so every key occurs at most once and
/// iteration follows key order.
#[derive(Clone, Debug, Default)]
pub struct DefaultTags(BTreeMap<Cow<'static, str>, Cow<'static, str>>);
42
43impl<K, V, I> From<I> for DefaultTags
44where
45    I: IntoIterator<Item = (K, V)>,
46    K: Into<Cow<'static, str>>,
47    V: Into<Cow<'static, str>>,
48{
49    fn from(pairs: I) -> Self {
50        DefaultTags(
51            pairs
52                .into_iter()
53                .map(|(k, v)| (k.into(), v.into()))
54                .collect(),
55        )
56    }
57}
58
59impl VortexMetrics {
60    /// Create a new [`VortexMetrics`] instance.
61    pub fn new(registry: MetricRegistry, default_tags: impl Into<DefaultTags>) -> Self {
62        let inner = Arc::new(Inner {
63            registry,
64            default_tags: default_tags.into(),
65            children: Default::default(),
66        });
67        Self { inner }
68    }
69
70    /// Create an empty metric registry with default tags.
71    pub fn new_with_tags(default_tags: impl Into<DefaultTags>) -> Self {
72        Self::new(MetricRegistry::default(), default_tags)
73    }
74
75    /// Create a new metrics registry with additional tags. Metrics created in the
76    /// child registry will be included in this registry's snapshots.
77    pub fn child_with_tags(&self, additional_tags: impl Into<DefaultTags>) -> Self {
78        let child = Self::new_with_tags(self.inner.default_tags.merge(&additional_tags.into()));
79        self.inner.children.write().push(child.clone());
80        child
81    }
82
83    /// Returns the counter with the specified ID, creating a default instance if absent.
84    ///
85    /// # Panics
86    ///
87    /// Panics if a metric is registered with the ID that is not a counter.
88    pub fn counter<T>(&self, id: T) -> Arc<Counter>
89    where
90        T: Into<MetricId>,
91    {
92        self.inner.registry.counter(id)
93    }
94
95    /// Returns the histogram with the specified ID, creating a default instance if absent.
96    ///
97    /// # Panics
98    ///
99    /// Panics if a metric is registered with the ID that is not a histogram.
100    pub fn histogram<T>(&self, id: T) -> Arc<Histogram>
101    where
102        T: Into<MetricId>,
103    {
104        self.inner.registry.histogram(id)
105    }
106
107    /// Returns the timer with the specified ID, creating a default instance if absent.
108    ///
109    /// # Panics
110    ///
111    /// Panics if a metric is registered with the ID that is not a timer.
112    pub fn timer<T>(&self, id: T) -> Arc<Timer>
113    where
114        T: Into<MetricId>,
115    {
116        self.inner.registry.timer(id)
117    }
118
119    /// Returns a snapshot of the metrics in the registry.
120    ///
121    /// Modifications to the registry after this method is called will not affect the state of the returned `MetricsSnapshot`.
122    ///
123    /// Note: Tag values may contain sensitive information and should be properly sanitized before external exposure.
124    pub fn snapshot(&self) -> MetricsSnapshot {
125        let children = self.inner.children.read();
126        let snapshots = children.iter().map(|c| c.snapshot());
127        MetricsSnapshot(
128            std::iter::once((
129                self.inner.default_tags.clone(),
130                self.inner.registry.metrics(),
131            ))
132            .chain(snapshots.flat_map(|snapshots| snapshots.0.into_iter()))
133            .collect(),
134        )
135    }
136}
137
138/// A snapshot of the metrics in a registry with default tags.
139pub struct MetricsSnapshot(Vec<(DefaultTags, Metrics)>);
140
141impl MetricsSnapshot {
142    /// Create an iterator over the metrics snapshot.
143    pub fn iter(&self) -> impl Iterator<Item = (MetricId, &Metric)> {
144        self.0
145            .iter()
146            .flat_map(|(default_tags, metrics)| VortexMetricsIter {
147                iter: metrics.iter(),
148                default_tags,
149            })
150    }
151}
152
153/// Metrics Iterator that applies the default tags to each metric in the inner iterator.
154pub struct VortexMetricsIter<'a> {
155    iter: MetricsIter<'a>,
156    default_tags: &'a DefaultTags,
157}
158
159impl<'a> Iterator for VortexMetricsIter<'a> {
160    type Item = (MetricId, &'a Metric);
161
162    #[inline]
163    fn next(&mut self) -> Option<(MetricId, &'a Metric)> {
164        self.iter.next().map(|(k, v)| {
165            let mut metric_id = k.clone();
166            for (tag_key, tag_value) in self.default_tags.0.iter() {
167                metric_id = metric_id.with_tag(tag_key.clone(), tag_value.clone())
168            }
169
170            (metric_id, v)
171        })
172    }
173
174    #[inline]
175    fn size_hint(&self) -> (usize, Option<usize>) {
176        self.iter.size_hint()
177    }
178}
179
180impl DefaultTags {
181    fn merge(&self, other: &Self) -> Self {
182        DefaultTags(
183            self.0
184                .iter()
185                .chain(other.0.iter())
186                .map(|(k, v)| (k.clone(), v.clone()))
187                .collect(),
188        )
189    }
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    /// Tags given at construction are applied to the ids reported by a snapshot.
    #[test]
    fn test_default_tags() -> Result<(), &'static str> {
        let metrics = VortexMetrics::new_with_tags([("file", "a"), ("partition", "1")]);

        // Register and bump one counter so the snapshot has an entry to inspect.
        metrics.counter("test.counter").inc();

        let snapshot = metrics.snapshot();
        let (id, metric) = snapshot.iter().next().unwrap();
        let expected_id = MetricId::new("test.counter")
            .with_tag("file", "a")
            .with_tag("partition", "1");
        assert_eq!(id, expected_id);

        if let Metric::Counter(counter) = metric {
            assert_eq!(counter.count(), 1);
            Ok(())
        } else {
            Err("metric is not a counter")
        }
    }

    /// Two children of one parent keep separate counters and separate tag sets.
    #[test]
    fn test_multiple_children_with_different_tags() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex")]);
        let child1 = parent.child_with_tags([("instance", "child1")]);
        let child2 = parent.child_with_tags([("instance", "child2")]);

        // The same metric name is used in both children.
        child1.counter("test.counter").inc();
        child2.counter("test.counter").add(2);

        // First child: parent tag plus its own instance tag, count of 1.
        let child1_snapshot = child1.snapshot();
        let (id, metric) = child1_snapshot.iter().next().unwrap();
        let expected_id = MetricId::new("test.counter")
            .with_tag("service", "vortex")
            .with_tag("instance", "child1");
        assert_eq!(id, expected_id);
        if let Metric::Counter(counter) = metric {
            assert_eq!(counter.count(), 1);
        } else {
            return Err("metric is not a counter");
        }

        // Second child: parent tag plus its own instance tag, count of 2.
        let child2_snapshot = child2.snapshot();
        let (id, metric) = child2_snapshot.iter().next().unwrap();
        let expected_id = MetricId::new("test.counter")
            .with_tag("service", "vortex")
            .with_tag("instance", "child2");
        assert_eq!(id, expected_id);
        if let Metric::Counter(counter) = metric {
            assert_eq!(counter.count(), 2);
        } else {
            return Err("metric is not a counter");
        }

        Ok(())
    }

    /// A child tag with the same key as a parent tag replaces the parent's value.
    #[test]
    fn test_tag_overriding() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex"), ("environment", "test")]);

        // The child reuses the parent's "service" key with a different value.
        let child = parent.child_with_tags([("service", "override"), ("instance", "child1")]);
        child.counter("test.counter").inc();

        let child_snapshot = child.snapshot();
        let (id, metric) = child_snapshot.iter().next().unwrap();
        let expected_id = MetricId::new("test.counter")
            .with_tag("service", "override")
            .with_tag("environment", "test")
            .with_tag("instance", "child1");
        assert_eq!(id, expected_id);

        if let Metric::Counter(counter) = metric {
            assert_eq!(counter.count(), 1);
            Ok(())
        } else {
            Err("metric is not a counter")
        }
    }
}