1use std::collections::BTreeMap;
2
3use command::{
4 filtered_metrics::Inner, AggregatedMetrics, BackendMetrics, Bucket, FilteredHistogram,
5 FilteredMetrics,
6};
7use prost::UnknownEnumValue;
8
9pub mod command;
11
12pub mod display;
14
15#[derive(thiserror::Error, Debug)]
16pub enum DisplayError {
17 #[error("Could not display content")]
18 DisplayContent(String),
19 #[error("Error while parsing response to JSON")]
20 Json(serde_json::Error),
21 #[error("got the wrong response content type: {0}")]
22 WrongResponseType(String),
23 #[error("Could not format the datetime to ISO 8601")]
24 DateTime,
25 #[error("unrecognized protobuf variant: {0}")]
26 DecodeError(UnknownEnumValue),
27}
28
29impl From<command::response_content::ContentType> for command::ResponseContent {
31 fn from(value: command::response_content::ContentType) -> Self {
32 Self {
33 content_type: Some(value),
34 }
35 }
36}
37
38impl From<command::request::RequestType> for command::Request {
40 fn from(value: command::request::RequestType) -> Self {
41 Self {
42 request_type: Some(value),
43 }
44 }
45}
46
47impl AggregatedMetrics {
48 pub fn merge_metrics(&mut self) {
54 let workers = std::mem::take(&mut self.workers);
56
57 for (_worker_id, worker) in workers {
58 for (metric_name, new_value) in worker.proxy {
59 if new_value.is_mergeable() {
60 self.proxying
61 .entry(metric_name)
62 .and_modify(|old_value| old_value.merge(&new_value))
63 .or_insert(new_value);
64 }
65 }
66
67 for (cluster_id, mut cluster_metrics) in worker.clusters {
68 for (metric_name, new_value) in cluster_metrics.cluster {
69 if new_value.is_mergeable() {
70 let cluster = self.clusters.entry(cluster_id.to_owned()).or_default();
71
72 cluster
73 .cluster
74 .entry(metric_name)
75 .and_modify(|old_value| old_value.merge(&new_value))
76 .or_insert(new_value);
77 }
78 }
79
80 for backend in cluster_metrics.backends.drain(..) {
81 for (metric_name, new_value) in backend.metrics {
82 if new_value.is_mergeable() {
83 let cluster = self.clusters.entry(cluster_id.to_owned()).or_default();
84
85 let found_backend = cluster
86 .backends
87 .iter_mut()
88 .find(|present| present.backend_id == backend.backend_id);
89
90 if let Some(existing_backend) = found_backend {
91 let _ = existing_backend
92 .metrics
93 .entry(metric_name)
94 .and_modify(|old_value| old_value.merge(&new_value))
95 .or_insert(new_value);
96 } else {
97 cluster.backends.push(BackendMetrics {
98 backend_id: backend.backend_id.clone(),
99 metrics: BTreeMap::from([(metric_name, new_value)]),
100 });
101 };
102 }
103 }
104 }
105 }
106 }
107 }
108}
109
110impl FilteredMetrics {
111 pub fn merge(&mut self, right: &Self) {
112 match (&self.inner, &right.inner) {
113 (Some(Inner::Gauge(a)), Some(Inner::Gauge(b))) => {
114 *self = Self {
115 inner: Some(Inner::Gauge(a + b)),
116 };
117 }
118 (Some(Inner::Count(a)), Some(Inner::Count(b))) => {
119 *self = Self {
120 inner: Some(Inner::Count(a + b)),
121 };
122 }
123 (Some(Inner::Histogram(a)), Some(Inner::Histogram(b))) => {
124 let longest_len = a.buckets.len().max(b.buckets.len());
125
126 let mut a_count = 0;
127 let mut b_count = 0;
128 let buckets = (0..longest_len)
129 .map(|i| {
130 if let Some(a_bucket) = a.buckets.get(i) {
131 a_count = a_bucket.count;
132 }
133 if let Some(b_bucket) = b.buckets.get(i) {
134 b_count = b_bucket.count;
135 }
136 Bucket {
137 le: (1 << i) - 1, count: a_count + b_count,
139 }
140 })
141 .collect();
142
143 *self = Self {
144 inner: Some(Inner::Histogram(FilteredHistogram {
145 count: a.count + b.count,
146 sum: a.sum + b.sum,
147 buckets,
148 })),
149 };
150 }
151 _ => {}
152 }
153 }
154
155 fn is_mergeable(&self) -> bool {
156 match &self.inner {
157 Some(Inner::Gauge(_)) | Some(Inner::Count(_)) | Some(Inner::Histogram(_)) => true,
158 Some(Inner::Time(_))
160 | Some(Inner::Percentiles(_))
161 | Some(Inner::TimeSerie(_))
162 | None => false,
163 }
164 }
165}
166
#[cfg(test)]
mod tests {
    use super::command::{filtered_metrics::Inner, Bucket, FilteredHistogram, FilteredMetrics};

    /// Wraps a metric value into a `FilteredMetrics`.
    fn wrap(inner: Inner) -> FilteredMetrics {
        FilteredMetrics { inner: Some(inner) }
    }

    /// Merging two gauges, or two counts, adds their values.
    #[test]
    fn merge_counts_and_gauges() {
        let mut gauge = wrap(Inner::Gauge(4));
        gauge.merge(&wrap(Inner::Gauge(4)));
        assert_eq!(gauge, wrap(Inner::Gauge(8)));

        let mut count = wrap(Inner::Count(3));
        count.merge(&wrap(Inner::Count(3)));
        assert_eq!(count, wrap(Inner::Count(6)));
    }

    /// Merging histograms adds sums and counts, and combines buckets index by index,
    /// carrying over the last count of the shorter histogram.
    #[test]
    fn merge_histograms() {
        let mut left = wrap(Inner::Histogram(FilteredHistogram {
            sum: 95,
            count: 30,
            buckets: vec![
                Bucket { le: 0, count: 1 },
                Bucket { le: 1, count: 2 },
                Bucket { le: 3, count: 10 },
                Bucket { le: 7, count: 25 },
                Bucket { le: 15, count: 27 },
                Bucket { le: 31, count: 30 },
            ],
        }));

        // one bucket shorter than `left`
        let right = wrap(Inner::Histogram(FilteredHistogram {
            sum: 82,
            count: 40,
            buckets: vec![
                Bucket { le: 0, count: 0 },
                Bucket { le: 1, count: 0 },
                Bucket { le: 3, count: 12 },
                Bucket { le: 7, count: 30 },
                Bucket { le: 15, count: 40 },
            ],
        }));

        let expected = wrap(Inner::Histogram(FilteredHistogram {
            sum: 177,
            count: 70,
            buckets: vec![
                Bucket { le: 0, count: 1 },
                Bucket { le: 1, count: 2 },
                Bucket { le: 3, count: 22 },
                Bucket { le: 7, count: 55 },
                Bucket { le: 15, count: 67 },
                Bucket { le: 31, count: 70 },
            ],
        }));

        left.merge(&right);

        assert_eq!(left, expected);
    }
}