1#![deny(missing_docs)]
2mod macros;
5
6use std::borrow::Cow;
7use std::collections::BTreeMap;
8use std::fmt::{Debug, Formatter};
9use std::sync::Arc;
10
11use parking_lot::RwLock;
12use witchcraft_metrics::{MetricRegistry, Metrics, MetricsIter};
13
14#[derive(Default, Clone)]
16pub struct VortexMetrics {
17 inner: Arc<Inner>,
18}
19
/// State shared by every clone of a `VortexMetrics` handle.
#[derive(Default)]
struct Inner {
    /// Registry that stores the metrics recorded through this instance.
    registry: MetricRegistry,
    /// Tags added to this instance's metric IDs when a snapshot is iterated.
    default_tags: DefaultTags,
    /// Instances created via `child_with_tags`. Kept behind a lock so that
    /// children can be registered through a shared reference.
    children: RwLock<Vec<VortexMetrics>>,
}
26
27impl Debug for VortexMetrics {
28 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
29 f.debug_struct("VortexMetrics")
30 .field("default_tags", &self.inner.default_tags)
31 .field("children", &self.inner.children)
32 .finish_non_exhaustive()
33 }
34}
35
36pub use witchcraft_metrics::{Counter, Histogram, Metric, MetricId, Tags, Timer};
38
/// A set of `key -> value` tags associated with a [`VortexMetrics`] instance.
///
/// The tags are stored in a [`BTreeMap`], so each key appears at most once and
/// iteration is ordered by key. Keys and values are `Cow<'static, str>`, which
/// lets string literals be stored without allocating.
///
/// A `DefaultTags` can be built from any iterator of `(key, value)` pairs via
/// its `From` implementation; the `Default` value is the empty tag set.
#[derive(Default, Clone, Debug)]
pub struct DefaultTags(BTreeMap<Cow<'static, str>, Cow<'static, str>>);
42
43impl<K, V, I> From<I> for DefaultTags
44where
45 I: IntoIterator<Item = (K, V)>,
46 K: Into<Cow<'static, str>>,
47 V: Into<Cow<'static, str>>,
48{
49 fn from(pairs: I) -> Self {
50 DefaultTags(
51 pairs
52 .into_iter()
53 .map(|(k, v)| (k.into(), v.into()))
54 .collect(),
55 )
56 }
57}
58
59impl VortexMetrics {
60 pub fn new(registry: MetricRegistry, default_tags: impl Into<DefaultTags>) -> Self {
62 let inner = Arc::new(Inner {
63 registry,
64 default_tags: default_tags.into(),
65 children: Default::default(),
66 });
67 Self { inner }
68 }
69
70 pub fn new_with_tags(default_tags: impl Into<DefaultTags>) -> Self {
72 Self::new(MetricRegistry::default(), default_tags)
73 }
74
75 pub fn child_with_tags(&self, additional_tags: impl Into<DefaultTags>) -> Self {
78 let child = Self::new_with_tags(self.inner.default_tags.merge(&additional_tags.into()));
79 self.inner.children.write().push(child.clone());
80 child
81 }
82
83 pub fn counter<T>(&self, id: T) -> Arc<Counter>
89 where
90 T: Into<MetricId>,
91 {
92 self.inner.registry.counter(id)
93 }
94
95 pub fn histogram<T>(&self, id: T) -> Arc<Histogram>
101 where
102 T: Into<MetricId>,
103 {
104 self.inner.registry.histogram(id)
105 }
106
107 pub fn timer<T>(&self, id: T) -> Arc<Timer>
113 where
114 T: Into<MetricId>,
115 {
116 self.inner.registry.timer(id)
117 }
118
119 pub fn snapshot(&self) -> MetricsSnapshot {
125 let children = self.inner.children.read();
126 let snapshots = children.iter().map(|c| c.snapshot());
127 MetricsSnapshot(
128 std::iter::once((
129 self.inner.default_tags.clone(),
130 self.inner.registry.metrics(),
131 ))
132 .chain(snapshots.flat_map(|snapshots| snapshots.0.into_iter()))
133 .collect(),
134 )
135 }
136}
137
138pub struct MetricsSnapshot(Vec<(DefaultTags, Metrics)>);
140
141impl MetricsSnapshot {
142 pub fn iter(&self) -> impl Iterator<Item = (MetricId, &Metric)> {
144 self.0
145 .iter()
146 .flat_map(|(default_tags, metrics)| VortexMetricsIter {
147 iter: metrics.iter(),
148 default_tags,
149 })
150 }
151}
152
153pub struct VortexMetricsIter<'a> {
155 iter: MetricsIter<'a>,
156 default_tags: &'a DefaultTags,
157}
158
159impl<'a> Iterator for VortexMetricsIter<'a> {
160 type Item = (MetricId, &'a Metric);
161
162 #[inline]
163 fn next(&mut self) -> Option<(MetricId, &'a Metric)> {
164 self.iter.next().map(|(k, v)| {
165 let mut metric_id = k.clone();
166 for (tag_key, tag_value) in self.default_tags.0.iter() {
167 metric_id = metric_id.with_tag(tag_key.clone(), tag_value.clone())
168 }
169
170 (metric_id, v)
171 })
172 }
173
174 #[inline]
175 fn size_hint(&self) -> (usize, Option<usize>) {
176 self.iter.size_hint()
177 }
178}
179
180impl DefaultTags {
181 fn merge(&self, other: &Self) -> Self {
182 DefaultTags(
183 self.0
184 .iter()
185 .chain(other.0.iter())
186 .map(|(k, v)| (k.clone(), v.clone()))
187 .collect(),
188 )
189 }
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    /// A root instance adds its default tags to the IDs it reports.
    #[test]
    fn test_default_tags() -> Result<(), &'static str> {
        let metrics = VortexMetrics::new_with_tags([("file", "a"), ("partition", "1")]);
        metrics.counter("test.counter").inc();

        let snapshot = metrics.snapshot();
        let (id, metric) = snapshot.iter().next().unwrap();

        let expected_id = MetricId::new("test.counter")
            .with_tag("file", "a")
            .with_tag("partition", "1");
        assert_eq!(id, expected_id);

        if let Metric::Counter(counter) = metric {
            assert_eq!(counter.count(), 1);
            Ok(())
        } else {
            Err("metric is not a counter")
        }
    }

    /// Sibling children keep separate registries and separate tag sets.
    #[test]
    fn test_multiple_children_with_different_tags() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex")]);
        let first_child = parent.child_with_tags([("instance", "child1")]);
        let second_child = parent.child_with_tags([("instance", "child2")]);

        first_child.counter("test.counter").inc();
        second_child.counter("test.counter").add(2);

        let first_snapshot = first_child.snapshot();
        let (first_id, first_metric) = first_snapshot.iter().next().unwrap();
        let expected_first_id = MetricId::new("test.counter")
            .with_tag("service", "vortex")
            .with_tag("instance", "child1");
        assert_eq!(first_id, expected_first_id);
        if let Metric::Counter(counter) = first_metric {
            assert_eq!(counter.count(), 1);
        } else {
            return Err("metric is not a counter");
        }

        let second_snapshot = second_child.snapshot();
        let (second_id, second_metric) = second_snapshot.iter().next().unwrap();
        let expected_second_id = MetricId::new("test.counter")
            .with_tag("service", "vortex")
            .with_tag("instance", "child2");
        assert_eq!(second_id, expected_second_id);
        if let Metric::Counter(counter) = second_metric {
            assert_eq!(counter.count(), 2);
            Ok(())
        } else {
            Err("metric is not a counter")
        }
    }

    /// A child's tag replaces the parent's tag that has the same key.
    #[test]
    fn test_tag_overriding() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex"), ("environment", "test")]);
        let child = parent.child_with_tags([("service", "override"), ("instance", "child1")]);

        child.counter("test.counter").inc();

        let snapshot = child.snapshot();
        let (id, metric) = snapshot.iter().next().unwrap();

        let expected_id = MetricId::new("test.counter")
            .with_tag("service", "override")
            .with_tag("environment", "test")
            .with_tag("instance", "child1");
        assert_eq!(id, expected_id);

        if let Metric::Counter(counter) = metric {
            assert_eq!(counter.count(), 1);
            Ok(())
        } else {
            Err("metric is not a counter")
        }
    }
}