1#![deny(missing_docs)]
5
6mod macros;
9mod session;
10
11use std::borrow::Cow;
12use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::fmt::Formatter;
15use std::sync::Arc;
16
17use parking_lot::RwLock;
18pub use session::*;
19use witchcraft_metrics::MetricRegistry;
20use witchcraft_metrics::Metrics;
21use witchcraft_metrics::MetricsIter;
22
23#[derive(Default, Clone)]
25pub struct VortexMetrics {
26 inner: Arc<Inner>,
27}
28
29#[derive(Default)]
30struct Inner {
31 registry: MetricRegistry,
32 default_tags: DefaultTags,
33 children: RwLock<Vec<VortexMetrics>>,
34}
35
36impl Debug for VortexMetrics {
37 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38 f.debug_struct("VortexMetrics")
39 .field("default_tags", &self.inner.default_tags)
40 .field("children", &self.inner.children)
41 .finish_non_exhaustive()
42 }
43}
44
45pub use witchcraft_metrics::Counter;
47pub use witchcraft_metrics::Histogram;
48pub use witchcraft_metrics::Metric;
49pub use witchcraft_metrics::MetricId;
50pub use witchcraft_metrics::Tags;
51pub use witchcraft_metrics::Timer;
52
/// A set of key/value tags that a [`VortexMetrics`] handle adds to the metrics it reports.
///
/// The tags are stored in a [`BTreeMap`], so each key appears at most once and the
/// tags are visited in ascending key order.
#[derive(Default, Clone, Debug)]
pub struct DefaultTags(BTreeMap<Cow<'static, str>, Cow<'static, str>>);
56
57impl<K, V, I> From<I> for DefaultTags
58where
59 I: IntoIterator<Item = (K, V)>,
60 K: Into<Cow<'static, str>>,
61 V: Into<Cow<'static, str>>,
62{
63 fn from(pairs: I) -> Self {
64 DefaultTags(
65 pairs
66 .into_iter()
67 .map(|(k, v)| (k.into(), v.into()))
68 .collect(),
69 )
70 }
71}
72
73impl VortexMetrics {
74 pub fn new(registry: MetricRegistry, default_tags: impl Into<DefaultTags>) -> Self {
76 let inner = Arc::new(Inner {
77 registry,
78 default_tags: default_tags.into(),
79 children: Default::default(),
80 });
81 Self { inner }
82 }
83
84 pub fn new_with_tags(default_tags: impl Into<DefaultTags>) -> Self {
86 Self::new(MetricRegistry::default(), default_tags)
87 }
88
89 pub fn child_with_tags(&self, additional_tags: impl Into<DefaultTags>) -> Self {
92 let child = Self::new_with_tags(self.inner.default_tags.merge(&additional_tags.into()));
93 self.inner.children.write().push(child.clone());
94 child
95 }
96
97 pub fn counter<T>(&self, id: T) -> Arc<Counter>
103 where
104 T: Into<MetricId>,
105 {
106 self.inner.registry.counter(id)
107 }
108
109 pub fn histogram<T>(&self, id: T) -> Arc<Histogram>
115 where
116 T: Into<MetricId>,
117 {
118 self.inner.registry.histogram(id)
119 }
120
121 pub fn timer<T>(&self, id: T) -> Arc<Timer>
127 where
128 T: Into<MetricId>,
129 {
130 self.inner.registry.timer(id)
131 }
132
133 pub fn snapshot(&self) -> MetricsSnapshot {
139 let children = self.inner.children.read();
140 let snapshots = children.iter().map(|c| c.snapshot());
141 MetricsSnapshot(
142 std::iter::once((
143 self.inner.default_tags.clone(),
144 self.inner.registry.metrics(),
145 ))
146 .chain(snapshots.flat_map(|snapshots| snapshots.0.into_iter()))
147 .collect(),
148 )
149 }
150}
151
152pub struct MetricsSnapshot(Vec<(DefaultTags, Metrics)>);
154
155impl MetricsSnapshot {
156 pub fn iter(&self) -> impl Iterator<Item = (MetricId, &Metric)> {
158 self.0
159 .iter()
160 .flat_map(|(default_tags, metrics)| VortexMetricsIter {
161 iter: metrics.iter(),
162 default_tags,
163 })
164 }
165}
166
167pub struct VortexMetricsIter<'a> {
169 iter: MetricsIter<'a>,
170 default_tags: &'a DefaultTags,
171}
172
173impl<'a> Iterator for VortexMetricsIter<'a> {
174 type Item = (MetricId, &'a Metric);
175
176 #[inline]
177 fn next(&mut self) -> Option<(MetricId, &'a Metric)> {
178 self.iter.next().map(|(k, v)| {
179 let mut metric_id = k.clone();
180 for (tag_key, tag_value) in self.default_tags.0.iter() {
181 metric_id = metric_id.with_tag(tag_key.clone(), tag_value.clone())
182 }
183
184 (metric_id, v)
185 })
186 }
187
188 #[inline]
189 fn size_hint(&self) -> (usize, Option<usize>) {
190 self.iter.size_hint()
191 }
192}
193
194impl DefaultTags {
195 fn merge(&self, other: &Self) -> Self {
196 DefaultTags(
197 self.0
198 .iter()
199 .chain(other.0.iter())
200 .map(|(k, v)| (k.clone(), v.clone()))
201 .collect(),
202 )
203 }
204}
205
#[cfg(test)]
mod tests {
    use super::*;

    /// Tags given at construction are added to the ids reported by a snapshot.
    #[test]
    fn test_default_tags() -> Result<(), &'static str> {
        let metrics = VortexMetrics::new_with_tags([("file", "a"), ("partition", "1")]);

        // Record one increment so the snapshot has a known value to check.
        metrics.counter("test.counter").inc();

        let snapshot = metrics.snapshot();
        let (name, metric) = snapshot.iter().next().unwrap();
        let expected = MetricId::new("test.counter")
            .with_tag("file", "a")
            .with_tag("partition", "1");
        assert_eq!(name, expected);

        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 1);
        } else {
            return Err("metric is not a counter");
        }
        Ok(())
    }

    /// Two children of one parent keep separate counters and separate tags.
    #[test]
    fn test_multiple_children_with_different_tags() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex")]);

        let child1 = parent.child_with_tags([("instance", "child1")]);
        let child2 = parent.child_with_tags([("instance", "child2")]);

        // The same metric name is used in both children on purpose.
        let first_counter = child1.counter("test.counter");
        let second_counter = child2.counter("test.counter");
        first_counter.inc();
        second_counter.add(2);

        // The first child reports the parent's tag plus its own.
        let child1_snapshot = child1.snapshot();
        let (name, metric) = child1_snapshot.iter().next().unwrap();
        let expected = MetricId::new("test.counter")
            .with_tag("service", "vortex")
            .with_tag("instance", "child1");
        assert_eq!(name, expected);
        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 1);
        } else {
            return Err("metric is not a counter");
        }

        // The second child is unaffected by the first one.
        let child2_snapshot = child2.snapshot();
        let (name, metric) = child2_snapshot.iter().next().unwrap();
        let expected = MetricId::new("test.counter")
            .with_tag("service", "vortex")
            .with_tag("instance", "child2");
        assert_eq!(name, expected);
        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 2);
        } else {
            return Err("metric is not a counter");
        }
        Ok(())
    }

    /// A child tag with the same key as a parent tag replaces the parent's value.
    #[test]
    fn test_tag_overriding() -> Result<(), &'static str> {
        let parent = VortexMetrics::new_with_tags([("service", "vortex"), ("environment", "test")]);

        // "service" collides with the parent's tag; "instance" is new.
        let child = parent.child_with_tags([("service", "override"), ("instance", "child1")]);

        child.counter("test.counter").inc();

        // The child's value wins for "service"; "environment" is inherited.
        let child_snapshot = child.snapshot();
        let (name, metric) = child_snapshot.iter().next().unwrap();
        let expected = MetricId::new("test.counter")
            .with_tag("service", "override")
            .with_tag("environment", "test")
            .with_tag("instance", "child1");
        assert_eq!(name, expected);
        if let Metric::Counter(c) = metric {
            assert_eq!(c.count(), 1);
        } else {
            return Err("metric is not a counter");
        }
        Ok(())
    }
}