1#![deny(missing_docs)]
5
6mod macros;
9mod session;
10
11use std::borrow::Cow;
12use std::collections::BTreeMap;
13use std::fmt::{Debug, Formatter};
14use std::sync::Arc;
15
16use parking_lot::RwLock;
17pub use session::*;
18use witchcraft_metrics::{MetricRegistry, Metrics, MetricsIter};
19
20#[derive(Default, Clone)]
22pub struct VortexMetrics {
23 inner: Arc<Inner>,
24}
25
26#[derive(Default)]
27struct Inner {
28 registry: MetricRegistry,
29 default_tags: DefaultTags,
30 children: RwLock<Vec<VortexMetrics>>,
31}
32
33impl Debug for VortexMetrics {
34 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
35 f.debug_struct("VortexMetrics")
36 .field("default_tags", &self.inner.default_tags)
37 .field("children", &self.inner.children)
38 .finish_non_exhaustive()
39 }
40}
41
42pub use witchcraft_metrics::{Counter, Histogram, Metric, MetricId, Tags, Timer};
44
45#[derive(Default, Clone, Debug)]
47pub struct DefaultTags(BTreeMap<Cow<'static, str>, Cow<'static, str>>);
48
49impl<K, V, I> From<I> for DefaultTags
50where
51 I: IntoIterator<Item = (K, V)>,
52 K: Into<Cow<'static, str>>,
53 V: Into<Cow<'static, str>>,
54{
55 fn from(pairs: I) -> Self {
56 DefaultTags(
57 pairs
58 .into_iter()
59 .map(|(k, v)| (k.into(), v.into()))
60 .collect(),
61 )
62 }
63}
64
65impl VortexMetrics {
66 pub fn new(registry: MetricRegistry, default_tags: impl Into<DefaultTags>) -> Self {
68 let inner = Arc::new(Inner {
69 registry,
70 default_tags: default_tags.into(),
71 children: Default::default(),
72 });
73 Self { inner }
74 }
75
76 pub fn new_with_tags(default_tags: impl Into<DefaultTags>) -> Self {
78 Self::new(MetricRegistry::default(), default_tags)
79 }
80
81 pub fn child_with_tags(&self, additional_tags: impl Into<DefaultTags>) -> Self {
84 let child = Self::new_with_tags(self.inner.default_tags.merge(&additional_tags.into()));
85 self.inner.children.write().push(child.clone());
86 child
87 }
88
89 pub fn counter<T>(&self, id: T) -> Arc<Counter>
95 where
96 T: Into<MetricId>,
97 {
98 self.inner.registry.counter(id)
99 }
100
101 pub fn histogram<T>(&self, id: T) -> Arc<Histogram>
107 where
108 T: Into<MetricId>,
109 {
110 self.inner.registry.histogram(id)
111 }
112
113 pub fn timer<T>(&self, id: T) -> Arc<Timer>
119 where
120 T: Into<MetricId>,
121 {
122 self.inner.registry.timer(id)
123 }
124
125 pub fn snapshot(&self) -> MetricsSnapshot {
131 let children = self.inner.children.read();
132 let snapshots = children.iter().map(|c| c.snapshot());
133 MetricsSnapshot(
134 std::iter::once((
135 self.inner.default_tags.clone(),
136 self.inner.registry.metrics(),
137 ))
138 .chain(snapshots.flat_map(|snapshots| snapshots.0.into_iter()))
139 .collect(),
140 )
141 }
142}
143
144pub struct MetricsSnapshot(Vec<(DefaultTags, Metrics)>);
146
147impl MetricsSnapshot {
148 pub fn iter(&self) -> impl Iterator<Item = (MetricId, &Metric)> {
150 self.0
151 .iter()
152 .flat_map(|(default_tags, metrics)| VortexMetricsIter {
153 iter: metrics.iter(),
154 default_tags,
155 })
156 }
157}
158
159pub struct VortexMetricsIter<'a> {
161 iter: MetricsIter<'a>,
162 default_tags: &'a DefaultTags,
163}
164
165impl<'a> Iterator for VortexMetricsIter<'a> {
166 type Item = (MetricId, &'a Metric);
167
168 #[inline]
169 fn next(&mut self) -> Option<(MetricId, &'a Metric)> {
170 self.iter.next().map(|(k, v)| {
171 let mut metric_id = k.clone();
172 for (tag_key, tag_value) in self.default_tags.0.iter() {
173 metric_id = metric_id.with_tag(tag_key.clone(), tag_value.clone())
174 }
175
176 (metric_id, v)
177 })
178 }
179
180 #[inline]
181 fn size_hint(&self) -> (usize, Option<usize>) {
182 self.iter.size_hint()
183 }
184}
185
186impl DefaultTags {
187 fn merge(&self, other: &Self) -> Self {
188 DefaultTags(
189 self.0
190 .iter()
191 .chain(other.0.iter())
192 .map(|(k, v)| (k.clone(), v.clone()))
193 .collect(),
194 )
195 }
196}
197
198#[cfg(test)]
199mod tests {
200 use super::*;
201
202 #[test]
203 fn test_default_tags() -> Result<(), &'static str> {
204 let tags = [("file", "a"), ("partition", "1")];
205 let metrics = VortexMetrics::new_with_tags(tags);
206
207 let counter = metrics.counter("test.counter");
209 counter.inc();
210 let snapshot = metrics.snapshot();
211 let (name, metric) = snapshot.iter().next().unwrap();
212 assert_eq!(
213 name,
214 MetricId::new("test.counter")
215 .with_tag("file", "a")
216 .with_tag("partition", "1")
217 );
218 match metric {
219 Metric::Counter(c) => assert_eq!(c.count(), 1),
220 _ => return Err("metric is not a counter"),
221 }
222 Ok(())
223 }
224
225 #[test]
226 fn test_multiple_children_with_different_tags() -> Result<(), &'static str> {
227 let parent_tags = [("service", "vortex")];
228 let parent = VortexMetrics::new_with_tags(parent_tags);
229
230 let child1_tags = [("instance", "child1")];
231 let child2_tags = [("instance", "child2")];
232
233 let child1 = parent.child_with_tags(child1_tags);
234 let child2 = parent.child_with_tags(child2_tags);
235
236 let counter1 = child1.counter("test.counter");
238 let counter2 = child2.counter("test.counter");
239
240 counter1.inc();
241 counter2.add(2);
242
243 let child1_snapshot = child1.snapshot();
245 let (name, metric) = child1_snapshot.iter().next().unwrap();
246 assert_eq!(
247 name,
248 MetricId::new("test.counter")
249 .with_tag("service", "vortex")
250 .with_tag("instance", "child1")
251 );
252 match metric {
253 Metric::Counter(c) => assert_eq!(c.count(), 1),
254 _ => return Err("metric is not a counter"),
255 }
256
257 let child2_snapshot = child2.snapshot();
259 let (name, metric) = child2_snapshot.iter().next().unwrap();
260 assert_eq!(
261 name,
262 MetricId::new("test.counter")
263 .with_tag("service", "vortex")
264 .with_tag("instance", "child2")
265 );
266 match metric {
267 Metric::Counter(c) => assert_eq!(c.count(), 2),
268 _ => return Err("metric is not a counter"),
269 }
270 Ok(())
271 }
272
273 #[test]
274 fn test_tag_overriding() -> Result<(), &'static str> {
275 let parent_tags = [("service", "vortex"), ("environment", "test")];
276 let parent = VortexMetrics::new_with_tags(parent_tags);
277
278 let child_tags = [("service", "override"), ("instance", "child1")];
280 let child = parent.child_with_tags(child_tags);
281
282 let child_counter = child.counter("test.counter");
283 child_counter.inc();
284
285 let child_snapshot = child.snapshot();
287 let (name, metric) = child_snapshot.iter().next().unwrap();
288 assert_eq!(
289 name,
290 MetricId::new("test.counter")
291 .with_tag("service", "override")
292 .with_tag("environment", "test")
293 .with_tag("instance", "child1")
294 );
295 match metric {
296 Metric::Counter(c) => assert_eq!(c.count(), 1),
297 _ => return Err("metric is not a counter"),
298 }
299 Ok(())
300 }
301}