Skip to main content

sozu_command_lib/proto/
mod.rs

1use std::collections::BTreeMap;
2
3use command::{
4    AggregatedMetrics, BackendMetrics, Bucket, FilteredHistogram, FilteredMetrics, Percentiles,
5    filtered_metrics::Inner,
6};
7use prost::UnknownEnumValue;
8
9/// Contains all types received by and sent from Sōzu
10pub mod command;
11
12/// Implementation of fmt::Display for the protobuf types, used in the CLI
13pub mod display;
14
15#[derive(thiserror::Error, Debug)]
16pub enum DisplayError {
17    #[error("Could not display content")]
18    DisplayContent(String),
19    #[error("Error while parsing response to JSON")]
20    Json(serde_json::Error),
21    #[error("got the wrong response content type: {0}")]
22    WrongResponseType(String),
23    #[error("Could not format the datetime to ISO 8601")]
24    DateTime,
25    #[error("unrecognized protobuf variant: {0}")]
26    DecodeError(UnknownEnumValue),
27}
28
29// Simple helper to build ResponseContent from ContentType
30impl From<command::response_content::ContentType> for command::ResponseContent {
31    fn from(value: command::response_content::ContentType) -> Self {
32        Self {
33            content_type: Some(value),
34        }
35    }
36}
37
38// Simple helper to build Request from RequestType
39impl From<command::request::RequestType> for command::Request {
40    fn from(value: command::request::RequestType) -> Self {
41        Self {
42            request_type: Some(value),
43        }
44    }
45}
46
47impl AggregatedMetrics {
48    /// Merge metrics that were received from several workers
49    ///
50    /// Each worker gather the same kind of metrics,
51    /// for its own proxying logic, and for the same clusters with their backends.
52    /// This means we have to reduce each metric from N instances to 1.
53    pub fn merge_metrics(&mut self) {
54        // avoid copying the worker metrics, by taking them
55        let workers = std::mem::take(&mut self.workers);
56
57        for (_worker_id, worker) in workers {
58            for (metric_name, new_value) in worker.proxy {
59                if new_value.is_mergeable() {
60                    self.proxying
61                        .entry(metric_name)
62                        .and_modify(|old_value| old_value.merge(&new_value))
63                        .or_insert(new_value);
64                }
65            }
66
67            for (cluster_id, mut cluster_metrics) in worker.clusters {
68                for (metric_name, new_value) in cluster_metrics.cluster {
69                    if new_value.is_mergeable() {
70                        let cluster = self.clusters.entry(cluster_id.to_owned()).or_default();
71
72                        cluster
73                            .cluster
74                            .entry(metric_name)
75                            .and_modify(|old_value| old_value.merge(&new_value))
76                            .or_insert(new_value);
77                    }
78                }
79
80                for backend in cluster_metrics.backends.drain(..) {
81                    for (metric_name, new_value) in backend.metrics {
82                        if new_value.is_mergeable() {
83                            let cluster = self.clusters.entry(cluster_id.to_owned()).or_default();
84
85                            let found_backend = cluster
86                                .backends
87                                .iter_mut()
88                                .find(|present| present.backend_id == backend.backend_id);
89
90                            if let Some(existing_backend) = found_backend {
91                                let _ = existing_backend
92                                    .metrics
93                                    .entry(metric_name)
94                                    .and_modify(|old_value| old_value.merge(&new_value))
95                                    .or_insert(new_value);
96                            } else {
97                                cluster.backends.push(BackendMetrics {
98                                    backend_id: backend.backend_id.clone(),
99                                    metrics: BTreeMap::from([(metric_name, new_value)]),
100                                });
101                            };
102                        }
103                    }
104                }
105            }
106        }
107    }
108}
109
110impl FilteredMetrics {
111    pub fn merge(&mut self, right: &Self) {
112        match (&self.inner, &right.inner) {
113            (Some(Inner::Gauge(a)), Some(Inner::Gauge(b))) => {
114                *self = Self {
115                    inner: Some(Inner::Gauge(a + b)),
116                };
117            }
118            (Some(Inner::Count(a)), Some(Inner::Count(b))) => {
119                *self = Self {
120                    inner: Some(Inner::Count(a + b)),
121                };
122            }
123            (Some(Inner::Histogram(a)), Some(Inner::Histogram(b))) => {
124                let longest_len = a.buckets.len().max(b.buckets.len());
125
126                let mut a_count = 0;
127                let mut b_count = 0;
128                let buckets = (0..longest_len)
129                    .map(|i| {
130                        if let Some(a_bucket) = a.buckets.get(i) {
131                            a_count = a_bucket.count;
132                        }
133                        if let Some(b_bucket) = b.buckets.get(i) {
134                            b_count = b_bucket.count;
135                        }
136                        Bucket {
137                            le: (1 << i) - 1, // the bucket less-or-equal limits are normalized: 0, 1, 3, 7, 15, ...
138                            count: a_count + b_count,
139                        }
140                    })
141                    .collect();
142
143                *self = Self {
144                    inner: Some(Inner::Histogram(FilteredHistogram {
145                        count: a.count + b.count,
146                        sum: a.sum + b.sum,
147                        buckets,
148                    })),
149                };
150            }
151            (Some(Inner::Percentiles(a)), Some(Inner::Percentiles(b))) => {
152                // You cannot statistically merge two percentile summaries
153                // without the underlying samples. The companion
154                // `<name>_histogram` Inner::Histogram value is the source
155                // of truth for accurate aggregation and merges correctly
156                // above. We still propagate the percentile shape so legacy
157                // consumers reading it observe at least the worst-case
158                // upper bound across workers — element-wise max preserves
159                // the "is anyone slow?" intent. `samples` and `sum` add so
160                // the totals reflect cross-worker volume.
161                *self = Self {
162                    inner: Some(Inner::Percentiles(Percentiles {
163                        samples: a.samples + b.samples,
164                        p_50: a.p_50.max(b.p_50),
165                        p_90: a.p_90.max(b.p_90),
166                        p_99: a.p_99.max(b.p_99),
167                        p_99_9: a.p_99_9.max(b.p_99_9),
168                        p_99_99: a.p_99_99.max(b.p_99_99),
169                        p_99_999: a.p_99_999.max(b.p_99_999),
170                        p_100: a.p_100.max(b.p_100),
171                        sum: a.sum + b.sum,
172                    })),
173                };
174            }
175            _ => {}
176        }
177    }
178
179    fn is_mergeable(&self) -> bool {
180        match &self.inner {
181            Some(Inner::Gauge(_))
182            | Some(Inner::Count(_))
183            | Some(Inner::Histogram(_))
184            | Some(Inner::Percentiles(_)) => true,
185            // Inner::Time and Inner::Timeserie are never used in Sōzu
186            Some(Inner::Time(_)) | Some(Inner::TimeSerie(_)) | None => false,
187        }
188    }
189}
190
/// Unit tests for the metric merging logic of this module.
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use super::AggregatedMetrics;
    use super::command::{
        BackendMetrics, Bucket, ClusterMetrics, FilteredHistogram, FilteredMetrics, Percentiles,
        WorkerMetrics, filtered_metrics::Inner,
    };

    #[test]
    fn merge_relocates_single_worker_to_top_level() {
        // Regression: a one-worker fleet must populate `clusters` and
        // `proxying` so CLI/TUI consumers reading those maps see the
        // worker's data. `std::mem::take(&mut self.workers)` empties the
        // per-worker map after relocation, which is the documented
        // contract when the caller asked for the merged shape.
        let mut worker = WorkerMetrics {
            proxy: BTreeMap::new(),
            clusters: BTreeMap::new(),
        };
        worker.proxy.insert(
            "requests".to_owned(),
            FilteredMetrics {
                inner: Some(Inner::Count(42)),
            },
        );
        let mut cluster = ClusterMetrics {
            cluster: BTreeMap::new(),
            backends: Vec::new(),
        };
        cluster.cluster.insert(
            "requests".to_owned(),
            FilteredMetrics {
                inner: Some(Inner::Count(7)),
            },
        );
        worker.clusters.insert("cluster-a".to_owned(), cluster);

        let mut agg = AggregatedMetrics {
            main: BTreeMap::new(),
            workers: BTreeMap::from([("0".to_owned(), worker)]),
            clusters: BTreeMap::new(),
            proxying: BTreeMap::new(),
        };

        agg.merge_metrics();

        assert!(
            agg.workers.is_empty(),
            "merge takes ownership of the per-worker map"
        );
        assert_eq!(
            agg.proxying.get("requests"),
            Some(&FilteredMetrics {
                inner: Some(Inner::Count(42)),
            }),
            "single worker's proxy counter must surface in proxying"
        );
        let cluster_a = agg
            .clusters
            .get("cluster-a")
            .expect("cluster row must surface in top-level clusters");
        assert_eq!(
            cluster_a.cluster.get("requests"),
            Some(&FilteredMetrics {
                inner: Some(Inner::Count(7)),
            })
        );
    }

    #[test]
    fn merge_sums_proxy_cluster_and_backend_metrics_across_workers() {
        // Two workers report the same proxy, cluster and backend counters:
        // each one must be reduced to a single summed value, the backend
        // must appear exactly once, and values with no `inner` (which are
        // not mergeable) must not surface at all.
        let count = |value| FilteredMetrics {
            inner: Some(Inner::Count(value)),
        };

        let mut workers = BTreeMap::new();
        for (worker_id, proxy_requests, cluster_requests, backend_requests) in
            [("0", 1, 10, 100), ("1", 2, 20, 200)]
        {
            let backend = BackendMetrics {
                backend_id: "backend-1".to_owned(),
                metrics: BTreeMap::from([
                    ("requests".to_owned(), count(backend_requests)),
                    ("unset".to_owned(), FilteredMetrics { inner: None }),
                ]),
            };
            let cluster = ClusterMetrics {
                cluster: BTreeMap::from([("requests".to_owned(), count(cluster_requests))]),
                backends: vec![backend],
            };
            let worker = WorkerMetrics {
                proxy: BTreeMap::from([
                    ("requests".to_owned(), count(proxy_requests)),
                    ("unset".to_owned(), FilteredMetrics { inner: None }),
                ]),
                clusters: BTreeMap::from([("cluster-a".to_owned(), cluster)]),
            };
            workers.insert(worker_id.to_owned(), worker);
        }

        let mut agg = AggregatedMetrics {
            main: BTreeMap::new(),
            workers,
            clusters: BTreeMap::new(),
            proxying: BTreeMap::new(),
        };

        agg.merge_metrics();

        assert!(agg.workers.is_empty());
        assert_eq!(agg.proxying.get("requests"), Some(&count(3)));
        assert!(
            !agg.proxying.contains_key("unset"),
            "non-mergeable proxy values must be dropped"
        );

        let cluster_a = agg
            .clusters
            .get("cluster-a")
            .expect("cluster row must surface in top-level clusters");
        assert_eq!(cluster_a.cluster.get("requests"), Some(&count(30)));
        assert_eq!(
            cluster_a.backends.len(),
            1,
            "the same backend reported by two workers must be merged into one row"
        );
        assert_eq!(cluster_a.backends[0].backend_id, "backend-1");
        assert_eq!(
            cluster_a.backends[0].metrics.get("requests"),
            Some(&count(300))
        );
        assert!(
            !cluster_a.backends[0].metrics.contains_key("unset"),
            "non-mergeable backend values must be dropped"
        );
    }

    #[test]
    fn merge_counts_and_gauges() {
        let mut gauge_a = FilteredMetrics {
            inner: Some(Inner::Gauge(4)),
        };
        let gauge_b = FilteredMetrics {
            inner: Some(Inner::Gauge(4)),
        };

        gauge_a.merge(&gauge_b);

        assert_eq!(
            gauge_a,
            FilteredMetrics {
                inner: Some(Inner::Gauge(8)),
            }
        );

        let mut count_a = FilteredMetrics {
            inner: Some(Inner::Count(3)),
        };
        let count_b = FilteredMetrics {
            inner: Some(Inner::Count(3)),
        };

        count_a.merge(&count_b);

        assert_eq!(
            count_a,
            FilteredMetrics {
                inner: Some(Inner::Count(6)),
            }
        );
    }

    #[test]
    fn merge_percentiles_takes_max_per_quantile() {
        // Multi-worker percentile aggregation propagates the worst-case
        // quantile across workers and accumulates samples + sum so the
        // surfaced summary remains the "is anyone slow?" upper bound.
        let mut left = FilteredMetrics {
            inner: Some(Inner::Percentiles(Percentiles {
                samples: 100,
                p_50: 5,
                p_90: 20,
                p_99: 100,
                p_99_9: 200,
                p_99_99: 250,
                p_99_999: 300,
                p_100: 400,
                sum: 12_000,
            })),
        };
        let right = FilteredMetrics {
            inner: Some(Inner::Percentiles(Percentiles {
                samples: 50,
                p_50: 7,
                p_90: 15,
                p_99: 80,
                p_99_9: 240,
                p_99_99: 245,
                p_99_999: 290,
                p_100: 380,
                sum: 6_000,
            })),
        };
        left.merge(&right);
        assert_eq!(
            left,
            FilteredMetrics {
                inner: Some(Inner::Percentiles(Percentiles {
                    samples: 150,
                    p_50: 7,
                    p_90: 20,
                    p_99: 100,
                    p_99_9: 240,
                    p_99_99: 250,
                    p_99_999: 300,
                    p_100: 400,
                    sum: 18_000,
                })),
            }
        );
    }

    #[test]
    fn merge_histograms() {
        let mut histogram_a = FilteredMetrics {
            inner: Some(Inner::Histogram(FilteredHistogram {
                sum: 95,
                count: 30,
                buckets: vec![
                    Bucket { le: 0, count: 1 },
                    Bucket { le: 1, count: 2 },
                    Bucket { le: 3, count: 10 },
                    Bucket { le: 7, count: 25 },
                    Bucket { le: 15, count: 27 },
                    Bucket { le: 31, count: 30 },
                ],
            })),
        };

        let histogram_b = FilteredMetrics {
            inner: Some(Inner::Histogram(FilteredHistogram {
                sum: 82,
                count: 40,
                buckets: vec![
                    Bucket { le: 0, count: 0 },
                    Bucket { le: 1, count: 0 },
                    Bucket { le: 3, count: 12 },
                    Bucket { le: 7, count: 30 },
                    Bucket { le: 15, count: 40 },
                    // note: there is no bucket for "le: 31"
                ],
            })),
        };

        histogram_a.merge(&histogram_b);

        let merged_histogram = FilteredMetrics {
            inner: Some(Inner::Histogram(FilteredHistogram {
                sum: 177,
                count: 70,
                buckets: vec![
                    Bucket { le: 0, count: 1 },
                    Bucket { le: 1, count: 2 },
                    Bucket { le: 3, count: 22 },
                    Bucket { le: 7, count: 55 },
                    Bucket { le: 15, count: 67 },
                    // note: the last count of histogram b (40) is still added,
                    // even though histogram b has no bucket at this position
                    Bucket { le: 31, count: 70 },
                ],
            })),
        };

        assert_eq!(histogram_a, merged_histogram);
    }
}