1#![deny(missing_docs)]
2mod macros;
5
6use std::borrow::Cow;
7use std::collections::BTreeMap;
8use std::fmt::{Debug, Formatter};
9use std::sync::{Arc, RwLock};
10
11use vortex_error::VortexExpect;
12use witchcraft_metrics::{MetricRegistry, Metrics, MetricsIter};
13
/// A handle to a metrics registry paired with a set of default tags.
///
/// All state lives behind an [`Arc`], so cloning the handle is cheap and every
/// clone refers to the same registry, default tags, and list of children.
#[derive(Default, Clone)]
pub struct VortexMetrics {
    /// Shared state; cloning the handle only clones this `Arc`.
    inner: Arc<Inner>,
}
19
/// Shared state behind a [`VortexMetrics`] handle.
#[derive(Default)]
struct Inner {
    /// Registry backing the counters, histograms, and timers handed out by this handle.
    registry: MetricRegistry,
    /// Tags added to every metric id when a snapshot is iterated.
    default_tags: DefaultTags,
    /// Child handles created via `child_with_tags`; they are walked when snapshotting.
    children: RwLock<Vec<VortexMetrics>>,
}
26
27impl Debug for VortexMetrics {
28 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
29 f.debug_struct("VortexMetrics")
30 .field("default_tags", &self.inner.default_tags)
31 .field("children", &self.inner.children)
32 .finish_non_exhaustive()
33 }
34}
35
36pub use witchcraft_metrics::{Counter, Histogram, Metric, MetricId, Tags, Timer};
38
/// Default tags that a [`VortexMetrics`] registry adds to the id of every metric it reports.
///
/// Stored as an ordered key-to-value map, so each tag key appears at most once.
#[derive(Default, Clone, Debug)]
pub struct DefaultTags(BTreeMap<Cow<'static, str>, Cow<'static, str>>);
42
43impl<K, V, I> From<I> for DefaultTags
44where
45 I: IntoIterator<Item = (K, V)>,
46 K: Into<Cow<'static, str>>,
47 V: Into<Cow<'static, str>>,
48{
49 fn from(pairs: I) -> Self {
50 DefaultTags(
51 pairs
52 .into_iter()
53 .map(|(k, v)| (k.into(), v.into()))
54 .collect(),
55 )
56 }
57}
58
59impl VortexMetrics {
60 pub fn new(registry: MetricRegistry, default_tags: impl Into<DefaultTags>) -> Self {
62 let inner = Arc::new(Inner {
63 registry,
64 default_tags: default_tags.into(),
65 children: Default::default(),
66 });
67 Self { inner }
68 }
69
70 pub fn new_with_tags(default_tags: impl Into<DefaultTags>) -> Self {
72 Self::new(MetricRegistry::default(), default_tags)
73 }
74
75 pub fn child_with_tags(&self, additional_tags: impl Into<DefaultTags>) -> Self {
78 let child = Self::new_with_tags(self.inner.default_tags.merge(&additional_tags.into()));
79 self.inner
80 .children
81 .write()
82 .vortex_expect("failed to acquire write lock on children")
83 .push(child.clone());
84 child
85 }
86
87 pub fn counter<T>(&self, id: T) -> Arc<Counter>
93 where
94 T: Into<MetricId>,
95 {
96 self.inner.registry.counter(id)
97 }
98
99 pub fn histogram<T>(&self, id: T) -> Arc<Histogram>
105 where
106 T: Into<MetricId>,
107 {
108 self.inner.registry.histogram(id)
109 }
110
111 pub fn timer<T>(&self, id: T) -> Arc<Timer>
117 where
118 T: Into<MetricId>,
119 {
120 self.inner.registry.timer(id)
121 }
122
123 pub fn snapshot(&self) -> MetricsSnapshot {
129 let children = self
130 .inner
131 .children
132 .read()
133 .vortex_expect("failed to acquire read lock on children");
134 let snapshots = children.iter().map(|c| c.snapshot());
135 MetricsSnapshot(
136 std::iter::once((
137 self.inner.default_tags.clone(),
138 self.inner.registry.metrics(),
139 ))
140 .chain(snapshots.flat_map(|snapshots| snapshots.0.into_iter()))
141 .collect(),
142 )
143 }
144}
145
/// The metrics collected by [`VortexMetrics::snapshot`].
///
/// Each entry pairs the default tags of one registry with the metrics obtained
/// from that registry.
pub struct MetricsSnapshot(Vec<(DefaultTags, Metrics)>);
148
149impl MetricsSnapshot {
150 pub fn iter(&self) -> impl Iterator<Item = (MetricId, &Metric)> {
152 self.0
153 .iter()
154 .flat_map(|(default_tags, metrics)| VortexMetricsIter {
155 iter: metrics.iter(),
156 default_tags,
157 })
158 }
159}
160
/// Iterator over the metrics of a single registry that adds that registry's
/// default tags to each yielded [`MetricId`].
pub struct VortexMetricsIter<'a> {
    /// Underlying iterator over the registry's metrics.
    iter: MetricsIter<'a>,
    /// Tags added to the id of every yielded metric.
    default_tags: &'a DefaultTags,
}
166
167impl<'a> Iterator for VortexMetricsIter<'a> {
168 type Item = (MetricId, &'a Metric);
169
170 #[inline]
171 fn next(&mut self) -> Option<(MetricId, &'a Metric)> {
172 self.iter.next().map(|(k, v)| {
173 let mut metric_id = k.clone();
174 for (tag_key, tag_value) in self.default_tags.0.iter() {
175 metric_id = metric_id.with_tag(tag_key.clone(), tag_value.clone())
176 }
177
178 (metric_id, v)
179 })
180 }
181
182 #[inline]
183 fn size_hint(&self) -> (usize, Option<usize>) {
184 self.iter.size_hint()
185 }
186}
187
188impl DefaultTags {
189 fn merge(&self, other: &Self) -> Self {
190 DefaultTags(
191 self.0
192 .iter()
193 .chain(other.0.iter())
194 .map(|(k, v)| (k.clone(), v.clone()))
195 .collect(),
196 )
197 }
198}
199
#[cfg(test)]
mod tests {
    use super::*;

    // Tags passed at construction show up on the metric ids of a snapshot.
    #[test]
    fn test_default_tags() -> Result<(), &'static str> {
        let metrics = VortexMetrics::new_with_tags([("file", "a"), ("partition", "1")]);
        metrics.counter("test.counter").inc();

        let expected_id = MetricId::new("test.counter")
            .with_tag("file", "a")
            .with_tag("partition", "1");

        let snapshot = metrics.snapshot();
        let (name, metric) = snapshot.iter().next().unwrap();
        assert_eq!(name, expected_id);
        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 1);
        } else {
            return Err("metric is not a counter");
        }
        Ok(())
    }

    // Two children of the same parent keep separate counters and separate tags.
    #[test]
    fn test_multiple_children_with_different_tags() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex")]);
        let child1 = parent.child_with_tags([("instance", "child1")]);
        let child2 = parent.child_with_tags([("instance", "child2")]);

        child1.counter("test.counter").inc();
        child2.counter("test.counter").add(2);

        let expected_child1_id = MetricId::new("test.counter")
            .with_tag("service", "vortex")
            .with_tag("instance", "child1");
        let child1_snapshot = child1.snapshot();
        let (name, metric) = child1_snapshot.iter().next().unwrap();
        assert_eq!(name, expected_child1_id);
        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 1);
        } else {
            return Err("metric is not a counter");
        }

        let expected_child2_id = MetricId::new("test.counter")
            .with_tag("service", "vortex")
            .with_tag("instance", "child2");
        let child2_snapshot = child2.snapshot();
        let (name, metric) = child2_snapshot.iter().next().unwrap();
        assert_eq!(name, expected_child2_id);
        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 2);
        } else {
            return Err("metric is not a counter");
        }
        Ok(())
    }

    // A child tag with the same key as a parent tag replaces the parent's value.
    #[test]
    fn test_tag_overriding() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex"), ("environment", "test")]);
        let child = parent.child_with_tags([("service", "override"), ("instance", "child1")]);

        child.counter("test.counter").inc();

        let expected_id = MetricId::new("test.counter")
            .with_tag("service", "override")
            .with_tag("environment", "test")
            .with_tag("instance", "child1");

        let child_snapshot = child.snapshot();
        let (name, metric) = child_snapshot.iter().next().unwrap();
        assert_eq!(name, expected_id);
        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 1);
        } else {
            return Err("metric is not a counter");
        }
        Ok(())
    }
}