1use std::collections::BTreeMap;
2
3use command::{
4 AggregatedMetrics, BackendMetrics, Bucket, FilteredHistogram, FilteredMetrics, Percentiles,
5 filtered_metrics::Inner,
6};
7use prost::UnknownEnumValue;
8
9pub mod command;
11
12pub mod display;
14
15#[derive(thiserror::Error, Debug)]
16pub enum DisplayError {
17 #[error("Could not display content")]
18 DisplayContent(String),
19 #[error("Error while parsing response to JSON")]
20 Json(serde_json::Error),
21 #[error("got the wrong response content type: {0}")]
22 WrongResponseType(String),
23 #[error("Could not format the datetime to ISO 8601")]
24 DateTime,
25 #[error("unrecognized protobuf variant: {0}")]
26 DecodeError(UnknownEnumValue),
27}
28
29impl From<command::response_content::ContentType> for command::ResponseContent {
31 fn from(value: command::response_content::ContentType) -> Self {
32 Self {
33 content_type: Some(value),
34 }
35 }
36}
37
38impl From<command::request::RequestType> for command::Request {
40 fn from(value: command::request::RequestType) -> Self {
41 Self {
42 request_type: Some(value),
43 }
44 }
45}
46
47impl AggregatedMetrics {
48 pub fn merge_metrics(&mut self) {
54 let workers = std::mem::take(&mut self.workers);
56
57 for (_worker_id, worker) in workers {
58 for (metric_name, new_value) in worker.proxy {
59 if new_value.is_mergeable() {
60 self.proxying
61 .entry(metric_name)
62 .and_modify(|old_value| old_value.merge(&new_value))
63 .or_insert(new_value);
64 }
65 }
66
67 for (cluster_id, mut cluster_metrics) in worker.clusters {
68 for (metric_name, new_value) in cluster_metrics.cluster {
69 if new_value.is_mergeable() {
70 let cluster = self.clusters.entry(cluster_id.to_owned()).or_default();
71
72 cluster
73 .cluster
74 .entry(metric_name)
75 .and_modify(|old_value| old_value.merge(&new_value))
76 .or_insert(new_value);
77 }
78 }
79
80 for backend in cluster_metrics.backends.drain(..) {
81 for (metric_name, new_value) in backend.metrics {
82 if new_value.is_mergeable() {
83 let cluster = self.clusters.entry(cluster_id.to_owned()).or_default();
84
85 let found_backend = cluster
86 .backends
87 .iter_mut()
88 .find(|present| present.backend_id == backend.backend_id);
89
90 if let Some(existing_backend) = found_backend {
91 let _ = existing_backend
92 .metrics
93 .entry(metric_name)
94 .and_modify(|old_value| old_value.merge(&new_value))
95 .or_insert(new_value);
96 } else {
97 cluster.backends.push(BackendMetrics {
98 backend_id: backend.backend_id.clone(),
99 metrics: BTreeMap::from([(metric_name, new_value)]),
100 });
101 };
102 }
103 }
104 }
105 }
106 }
107 }
108}
109
110impl FilteredMetrics {
111 pub fn merge(&mut self, right: &Self) {
112 match (&self.inner, &right.inner) {
113 (Some(Inner::Gauge(a)), Some(Inner::Gauge(b))) => {
114 *self = Self {
115 inner: Some(Inner::Gauge(a + b)),
116 };
117 }
118 (Some(Inner::Count(a)), Some(Inner::Count(b))) => {
119 *self = Self {
120 inner: Some(Inner::Count(a + b)),
121 };
122 }
123 (Some(Inner::Histogram(a)), Some(Inner::Histogram(b))) => {
124 let longest_len = a.buckets.len().max(b.buckets.len());
125
126 let mut a_count = 0;
127 let mut b_count = 0;
128 let buckets = (0..longest_len)
129 .map(|i| {
130 if let Some(a_bucket) = a.buckets.get(i) {
131 a_count = a_bucket.count;
132 }
133 if let Some(b_bucket) = b.buckets.get(i) {
134 b_count = b_bucket.count;
135 }
136 Bucket {
137 le: (1 << i) - 1, count: a_count + b_count,
139 }
140 })
141 .collect();
142
143 *self = Self {
144 inner: Some(Inner::Histogram(FilteredHistogram {
145 count: a.count + b.count,
146 sum: a.sum + b.sum,
147 buckets,
148 })),
149 };
150 }
151 (Some(Inner::Percentiles(a)), Some(Inner::Percentiles(b))) => {
152 *self = Self {
162 inner: Some(Inner::Percentiles(Percentiles {
163 samples: a.samples + b.samples,
164 p_50: a.p_50.max(b.p_50),
165 p_90: a.p_90.max(b.p_90),
166 p_99: a.p_99.max(b.p_99),
167 p_99_9: a.p_99_9.max(b.p_99_9),
168 p_99_99: a.p_99_99.max(b.p_99_99),
169 p_99_999: a.p_99_999.max(b.p_99_999),
170 p_100: a.p_100.max(b.p_100),
171 sum: a.sum + b.sum,
172 })),
173 };
174 }
175 _ => {}
176 }
177 }
178
179 fn is_mergeable(&self) -> bool {
180 match &self.inner {
181 Some(Inner::Gauge(_))
182 | Some(Inner::Count(_))
183 | Some(Inner::Histogram(_))
184 | Some(Inner::Percentiles(_)) => true,
185 Some(Inner::Time(_)) | Some(Inner::TimeSerie(_)) | None => false,
187 }
188 }
189}
190
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use super::AggregatedMetrics;
    use super::command::{
        Bucket, ClusterMetrics, FilteredHistogram, FilteredMetrics, Percentiles, WorkerMetrics,
        filtered_metrics::Inner,
    };

    /// Wraps a metric payload into a `FilteredMetrics` value.
    fn metric(inner: Inner) -> FilteredMetrics {
        FilteredMetrics { inner: Some(inner) }
    }

    /// A single worker's proxy and cluster counters must end up in the
    /// top-level maps, and the per-worker map must be emptied.
    #[test]
    fn merge_relocates_single_worker_to_top_level() {
        let cluster = ClusterMetrics {
            cluster: BTreeMap::from([("requests".to_owned(), metric(Inner::Count(7)))]),
            backends: Vec::new(),
        };
        let worker = WorkerMetrics {
            proxy: BTreeMap::from([("requests".to_owned(), metric(Inner::Count(42)))]),
            clusters: BTreeMap::from([("cluster-a".to_owned(), cluster)]),
        };
        let mut aggregated = AggregatedMetrics {
            main: BTreeMap::new(),
            workers: BTreeMap::from([("0".to_owned(), worker)]),
            clusters: BTreeMap::new(),
            proxying: BTreeMap::new(),
        };

        aggregated.merge_metrics();

        assert!(
            aggregated.workers.is_empty(),
            "merge takes ownership of the per-worker map"
        );

        let expected_proxy = metric(Inner::Count(42));
        assert_eq!(
            aggregated.proxying.get("requests"),
            Some(&expected_proxy),
            "single worker's proxy counter must surface in proxying"
        );

        let cluster_a = aggregated
            .clusters
            .get("cluster-a")
            .expect("cluster row must surface in top-level clusters");
        let expected_cluster = metric(Inner::Count(7));
        assert_eq!(cluster_a.cluster.get("requests"), Some(&expected_cluster));
    }

    /// Gauges and counts are added together.
    #[test]
    fn merge_counts_and_gauges() {
        let mut gauge = metric(Inner::Gauge(4));
        gauge.merge(&metric(Inner::Gauge(4)));
        assert_eq!(gauge, metric(Inner::Gauge(8)));

        let mut count = metric(Inner::Count(3));
        count.merge(&metric(Inner::Count(3)));
        assert_eq!(count, metric(Inner::Count(6)));
    }

    /// Samples and sums are added; each quantile keeps the larger value.
    #[test]
    fn merge_percentiles_takes_max_per_quantile() {
        let mut left = metric(Inner::Percentiles(Percentiles {
            samples: 100,
            p_50: 5,
            p_90: 20,
            p_99: 100,
            p_99_9: 200,
            p_99_99: 250,
            p_99_999: 300,
            p_100: 400,
            sum: 12_000,
        }));
        let right = metric(Inner::Percentiles(Percentiles {
            samples: 50,
            p_50: 7,
            p_90: 15,
            p_99: 80,
            p_99_9: 240,
            p_99_99: 245,
            p_99_999: 290,
            p_100: 380,
            sum: 6_000,
        }));

        left.merge(&right);

        let expected = metric(Inner::Percentiles(Percentiles {
            samples: 150,
            p_50: 7,
            p_90: 20,
            p_99: 100,
            p_99_9: 240,
            p_99_99: 250,
            p_99_999: 300,
            p_100: 400,
            sum: 18_000,
        }));
        assert_eq!(left, expected);
    }

    /// Bucket counts are added position by position; the shorter histogram
    /// keeps contributing its last bucket count to the extra positions.
    #[test]
    fn merge_histograms() {
        let mut histogram_a = metric(Inner::Histogram(FilteredHistogram {
            sum: 95,
            count: 30,
            buckets: vec![
                Bucket { le: 0, count: 1 },
                Bucket { le: 1, count: 2 },
                Bucket { le: 3, count: 10 },
                Bucket { le: 7, count: 25 },
                Bucket { le: 15, count: 27 },
                Bucket { le: 31, count: 30 },
            ],
        }));
        let histogram_b = metric(Inner::Histogram(FilteredHistogram {
            sum: 82,
            count: 40,
            buckets: vec![
                Bucket { le: 0, count: 0 },
                Bucket { le: 1, count: 0 },
                Bucket { le: 3, count: 12 },
                Bucket { le: 7, count: 30 },
                Bucket { le: 15, count: 40 },
            ],
        }));

        histogram_a.merge(&histogram_b);

        let merged_histogram = metric(Inner::Histogram(FilteredHistogram {
            sum: 177,
            count: 70,
            buckets: vec![
                Bucket { le: 0, count: 1 },
                Bucket { le: 1, count: 2 },
                Bucket { le: 3, count: 22 },
                Bucket { le: 7, count: 55 },
                Bucket { le: 15, count: 67 },
                Bucket { le: 31, count: 70 },
            ],
        }));
        assert_eq!(histogram_a, merged_histogram);
    }
}