1#![deny(missing_docs)]
5mod macros;
8
9use std::borrow::Cow;
10use std::collections::BTreeMap;
11use std::fmt::{Debug, Formatter};
12use std::sync::Arc;
13
14use parking_lot::RwLock;
15use witchcraft_metrics::{MetricRegistry, Metrics, MetricsIter};
16
17#[derive(Default, Clone)]
19pub struct VortexMetrics {
20 inner: Arc<Inner>,
21}
22
23#[derive(Default)]
24struct Inner {
25 registry: MetricRegistry,
26 default_tags: DefaultTags,
27 children: RwLock<Vec<VortexMetrics>>,
28}
29
30impl Debug for VortexMetrics {
31 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
32 f.debug_struct("VortexMetrics")
33 .field("default_tags", &self.inner.default_tags)
34 .field("children", &self.inner.children)
35 .finish_non_exhaustive()
36 }
37}
38
39pub use witchcraft_metrics::{Counter, Histogram, Metric, MetricId, Tags, Timer};
41
42#[derive(Default, Clone, Debug)]
44pub struct DefaultTags(BTreeMap<Cow<'static, str>, Cow<'static, str>>);
45
46impl<K, V, I> From<I> for DefaultTags
47where
48 I: IntoIterator<Item = (K, V)>,
49 K: Into<Cow<'static, str>>,
50 V: Into<Cow<'static, str>>,
51{
52 fn from(pairs: I) -> Self {
53 DefaultTags(
54 pairs
55 .into_iter()
56 .map(|(k, v)| (k.into(), v.into()))
57 .collect(),
58 )
59 }
60}
61
62impl VortexMetrics {
63 pub fn new(registry: MetricRegistry, default_tags: impl Into<DefaultTags>) -> Self {
65 let inner = Arc::new(Inner {
66 registry,
67 default_tags: default_tags.into(),
68 children: Default::default(),
69 });
70 Self { inner }
71 }
72
73 pub fn new_with_tags(default_tags: impl Into<DefaultTags>) -> Self {
75 Self::new(MetricRegistry::default(), default_tags)
76 }
77
78 pub fn child_with_tags(&self, additional_tags: impl Into<DefaultTags>) -> Self {
81 let child = Self::new_with_tags(self.inner.default_tags.merge(&additional_tags.into()));
82 self.inner.children.write().push(child.clone());
83 child
84 }
85
86 pub fn counter<T>(&self, id: T) -> Arc<Counter>
92 where
93 T: Into<MetricId>,
94 {
95 self.inner.registry.counter(id)
96 }
97
98 pub fn histogram<T>(&self, id: T) -> Arc<Histogram>
104 where
105 T: Into<MetricId>,
106 {
107 self.inner.registry.histogram(id)
108 }
109
110 pub fn timer<T>(&self, id: T) -> Arc<Timer>
116 where
117 T: Into<MetricId>,
118 {
119 self.inner.registry.timer(id)
120 }
121
122 pub fn snapshot(&self) -> MetricsSnapshot {
128 let children = self.inner.children.read();
129 let snapshots = children.iter().map(|c| c.snapshot());
130 MetricsSnapshot(
131 std::iter::once((
132 self.inner.default_tags.clone(),
133 self.inner.registry.metrics(),
134 ))
135 .chain(snapshots.flat_map(|snapshots| snapshots.0.into_iter()))
136 .collect(),
137 )
138 }
139}
140
141pub struct MetricsSnapshot(Vec<(DefaultTags, Metrics)>);
143
144impl MetricsSnapshot {
145 pub fn iter(&self) -> impl Iterator<Item = (MetricId, &Metric)> {
147 self.0
148 .iter()
149 .flat_map(|(default_tags, metrics)| VortexMetricsIter {
150 iter: metrics.iter(),
151 default_tags,
152 })
153 }
154}
155
156pub struct VortexMetricsIter<'a> {
158 iter: MetricsIter<'a>,
159 default_tags: &'a DefaultTags,
160}
161
162impl<'a> Iterator for VortexMetricsIter<'a> {
163 type Item = (MetricId, &'a Metric);
164
165 #[inline]
166 fn next(&mut self) -> Option<(MetricId, &'a Metric)> {
167 self.iter.next().map(|(k, v)| {
168 let mut metric_id = k.clone();
169 for (tag_key, tag_value) in self.default_tags.0.iter() {
170 metric_id = metric_id.with_tag(tag_key.clone(), tag_value.clone())
171 }
172
173 (metric_id, v)
174 })
175 }
176
177 #[inline]
178 fn size_hint(&self) -> (usize, Option<usize>) {
179 self.iter.size_hint()
180 }
181}
182
183impl DefaultTags {
184 fn merge(&self, other: &Self) -> Self {
185 DefaultTags(
186 self.0
187 .iter()
188 .chain(other.0.iter())
189 .map(|(k, v)| (k.clone(), v.clone()))
190 .collect(),
191 )
192 }
193}
194
195#[cfg(test)]
196mod tests {
197 use super::*;
198
199 #[test]
200 fn test_default_tags() -> Result<(), &'static str> {
201 let tags = [("file", "a"), ("partition", "1")];
202 let metrics = VortexMetrics::new_with_tags(tags);
203
204 let counter = metrics.counter("test.counter");
206 counter.inc();
207 let snapshot = metrics.snapshot();
208 let (name, metric) = snapshot.iter().next().unwrap();
209 assert_eq!(
210 name,
211 MetricId::new("test.counter")
212 .with_tag("file", "a")
213 .with_tag("partition", "1")
214 );
215 match metric {
216 Metric::Counter(c) => assert_eq!(c.count(), 1),
217 _ => return Err("metric is not a counter"),
218 }
219 Ok(())
220 }
221
222 #[test]
223 fn test_multiple_children_with_different_tags() -> Result<(), &'static str> {
224 let parent_tags = [("service", "vortex")];
225 let parent = VortexMetrics::new_with_tags(parent_tags);
226
227 let child1_tags = [("instance", "child1")];
228 let child2_tags = [("instance", "child2")];
229
230 let child1 = parent.child_with_tags(child1_tags);
231 let child2 = parent.child_with_tags(child2_tags);
232
233 let counter1 = child1.counter("test.counter");
235 let counter2 = child2.counter("test.counter");
236
237 counter1.inc();
238 counter2.add(2);
239
240 let child1_snapshot = child1.snapshot();
242 let (name, metric) = child1_snapshot.iter().next().unwrap();
243 assert_eq!(
244 name,
245 MetricId::new("test.counter")
246 .with_tag("service", "vortex")
247 .with_tag("instance", "child1")
248 );
249 match metric {
250 Metric::Counter(c) => assert_eq!(c.count(), 1),
251 _ => return Err("metric is not a counter"),
252 }
253
254 let child2_snapshot = child2.snapshot();
256 let (name, metric) = child2_snapshot.iter().next().unwrap();
257 assert_eq!(
258 name,
259 MetricId::new("test.counter")
260 .with_tag("service", "vortex")
261 .with_tag("instance", "child2")
262 );
263 match metric {
264 Metric::Counter(c) => assert_eq!(c.count(), 2),
265 _ => return Err("metric is not a counter"),
266 }
267 Ok(())
268 }
269
270 #[test]
271 fn test_tag_overriding() -> Result<(), &'static str> {
272 let parent_tags = [("service", "vortex"), ("environment", "test")];
273 let parent = VortexMetrics::new_with_tags(parent_tags);
274
275 let child_tags = [("service", "override"), ("instance", "child1")];
277 let child = parent.child_with_tags(child_tags);
278
279 let child_counter = child.counter("test.counter");
280 child_counter.inc();
281
282 let child_snapshot = child.snapshot();
284 let (name, metric) = child_snapshot.iter().next().unwrap();
285 assert_eq!(
286 name,
287 MetricId::new("test.counter")
288 .with_tag("service", "override")
289 .with_tag("environment", "test")
290 .with_tag("instance", "child1")
291 );
292 match metric {
293 Metric::Counter(c) => assert_eq!(c.count(), 1),
294 _ => return Err("metric is not a counter"),
295 }
296 Ok(())
297 }
298}