sozu_command_lib/proto/
mod.rs

1use std::collections::BTreeMap;
2
3use command::{
4    filtered_metrics::Inner, AggregatedMetrics, BackendMetrics, Bucket, FilteredHistogram,
5    FilteredMetrics,
6};
7use prost::UnknownEnumValue;
8
9/// Contains all types received by and sent from Sōzu
10pub mod command;
11
12/// Implementation of fmt::Display for the protobuf types, used in the CLI
13pub mod display;
14
15#[derive(thiserror::Error, Debug)]
16pub enum DisplayError {
17    #[error("Could not display content")]
18    DisplayContent(String),
19    #[error("Error while parsing response to JSON")]
20    Json(serde_json::Error),
21    #[error("got the wrong response content type: {0}")]
22    WrongResponseType(String),
23    #[error("Could not format the datetime to ISO 8601")]
24    DateTime,
25    #[error("unrecognized protobuf variant: {0}")]
26    DecodeError(UnknownEnumValue),
27}
28
29// Simple helper to build ResponseContent from ContentType
30impl From<command::response_content::ContentType> for command::ResponseContent {
31    fn from(value: command::response_content::ContentType) -> Self {
32        Self {
33            content_type: Some(value),
34        }
35    }
36}
37
38// Simple helper to build Request from RequestType
39impl From<command::request::RequestType> for command::Request {
40    fn from(value: command::request::RequestType) -> Self {
41        Self {
42            request_type: Some(value),
43        }
44    }
45}
46
47impl AggregatedMetrics {
48    /// Merge metrics that were received from several workers
49    ///
50    /// Each worker gather the same kind of metrics,
51    /// for its own proxying logic, and for the same clusters with their backends.
52    /// This means we have to reduce each metric from N instances to 1.
53    pub fn merge_metrics(&mut self) {
54        // avoid copying the worker metrics, by taking them
55        let workers = std::mem::take(&mut self.workers);
56
57        for (_worker_id, worker) in workers {
58            for (metric_name, new_value) in worker.proxy {
59                if new_value.is_mergeable() {
60                    self.proxying
61                        .entry(metric_name)
62                        .and_modify(|old_value| old_value.merge(&new_value))
63                        .or_insert(new_value);
64                }
65            }
66
67            for (cluster_id, mut cluster_metrics) in worker.clusters {
68                for (metric_name, new_value) in cluster_metrics.cluster {
69                    if new_value.is_mergeable() {
70                        let cluster = self.clusters.entry(cluster_id.to_owned()).or_default();
71
72                        cluster
73                            .cluster
74                            .entry(metric_name)
75                            .and_modify(|old_value| old_value.merge(&new_value))
76                            .or_insert(new_value);
77                    }
78                }
79
80                for backend in cluster_metrics.backends.drain(..) {
81                    for (metric_name, new_value) in backend.metrics {
82                        if new_value.is_mergeable() {
83                            let cluster = self.clusters.entry(cluster_id.to_owned()).or_default();
84
85                            let found_backend = cluster
86                                .backends
87                                .iter_mut()
88                                .find(|present| present.backend_id == backend.backend_id);
89
90                            if let Some(existing_backend) = found_backend {
91                                let _ = existing_backend
92                                    .metrics
93                                    .entry(metric_name)
94                                    .and_modify(|old_value| old_value.merge(&new_value))
95                                    .or_insert(new_value);
96                            } else {
97                                cluster.backends.push(BackendMetrics {
98                                    backend_id: backend.backend_id.clone(),
99                                    metrics: BTreeMap::from([(metric_name, new_value)]),
100                                });
101                            };
102                        }
103                    }
104                }
105            }
106        }
107    }
108}
109
110impl FilteredMetrics {
111    pub fn merge(&mut self, right: &Self) {
112        match (&self.inner, &right.inner) {
113            (Some(Inner::Gauge(a)), Some(Inner::Gauge(b))) => {
114                *self = Self {
115                    inner: Some(Inner::Gauge(a + b)),
116                };
117            }
118            (Some(Inner::Count(a)), Some(Inner::Count(b))) => {
119                *self = Self {
120                    inner: Some(Inner::Count(a + b)),
121                };
122            }
123            (Some(Inner::Histogram(a)), Some(Inner::Histogram(b))) => {
124                let longest_len = a.buckets.len().max(b.buckets.len());
125
126                let mut a_count = 0;
127                let mut b_count = 0;
128                let buckets = (0..longest_len)
129                    .map(|i| {
130                        if let Some(a_bucket) = a.buckets.get(i) {
131                            a_count = a_bucket.count;
132                        }
133                        if let Some(b_bucket) = b.buckets.get(i) {
134                            b_count = b_bucket.count;
135                        }
136                        Bucket {
137                            le: (1 << i) - 1, // the bucket less-or-equal limits are normalized: 0, 1, 3, 7, 15, ...
138                            count: a_count + b_count,
139                        }
140                    })
141                    .collect();
142
143                *self = Self {
144                    inner: Some(Inner::Histogram(FilteredHistogram {
145                        count: a.count + b.count,
146                        sum: a.sum + b.sum,
147                        buckets,
148                    })),
149                };
150            }
151            _ => {}
152        }
153    }
154
155    fn is_mergeable(&self) -> bool {
156        match &self.inner {
157            Some(Inner::Gauge(_)) | Some(Inner::Count(_)) | Some(Inner::Histogram(_)) => true,
158            // Inner::Time and Inner::Timeserie are never used in Sōzu
159            Some(Inner::Time(_))
160            | Some(Inner::Percentiles(_))
161            | Some(Inner::TimeSerie(_))
162            | None => false,
163        }
164    }
165}
166
167#[cfg(test)]
168mod tests {
169    use super::command::{filtered_metrics::Inner, Bucket, FilteredHistogram, FilteredMetrics};
170
171    #[test]
172    fn merge_counts_and_gauges() {
173        let mut gauge_a = FilteredMetrics {
174            inner: Some(Inner::Gauge(4)),
175        };
176        let gauge_b = FilteredMetrics {
177            inner: Some(Inner::Gauge(4)),
178        };
179
180        gauge_a.merge(&gauge_b);
181
182        assert_eq!(
183            gauge_a,
184            FilteredMetrics {
185                inner: Some(Inner::Gauge(8)),
186            }
187        );
188
189        let mut count_a = FilteredMetrics {
190            inner: Some(Inner::Count(3)),
191        };
192        let count_b = FilteredMetrics {
193            inner: Some(Inner::Count(3)),
194        };
195
196        count_a.merge(&count_b);
197
198        assert_eq!(
199            count_a,
200            FilteredMetrics {
201                inner: Some(Inner::Count(6)),
202            }
203        );
204    }
205
206    #[test]
207    fn merge_histograms() {
208        let mut histogram_a = FilteredMetrics {
209            inner: Some(Inner::Histogram(FilteredHistogram {
210                sum: 95,
211                count: 30,
212                buckets: vec![
213                    Bucket { le: 0, count: 1 },
214                    Bucket { le: 1, count: 2 },
215                    Bucket { le: 3, count: 10 },
216                    Bucket { le: 7, count: 25 },
217                    Bucket { le: 15, count: 27 },
218                    Bucket { le: 31, count: 30 },
219                ],
220            })),
221        };
222
223        let histogram_b = FilteredMetrics {
224            inner: Some(Inner::Histogram(FilteredHistogram {
225                sum: 82,
226                count: 40,
227                buckets: vec![
228                    Bucket { le: 0, count: 0 },
229                    Bucket { le: 1, count: 0 },
230                    Bucket { le: 3, count: 12 },
231                    Bucket { le: 7, count: 30 },
232                    Bucket { le: 15, count: 40 },
233                    // note: there is no bucket for "le: 31"
234                ],
235            })),
236        };
237
238        histogram_a.merge(&histogram_b);
239
240        let merged_histogram = FilteredMetrics {
241            inner: Some(Inner::Histogram(FilteredHistogram {
242                sum: 177,
243                count: 70,
244                buckets: vec![
245                    Bucket { le: 0, count: 1 },
246                    Bucket { le: 1, count: 2 },
247                    Bucket { le: 3, count: 22 },
248                    Bucket { le: 7, count: 55 },
249                    Bucket { le: 15, count: 67 },
250                    Bucket { le: 31, count: 70 }, // note: the total count of histogram b is added, even though histogram b has no bucket
251                ],
252            })),
253        };
254
255        assert_eq!(histogram_a, merged_histogram);
256    }
257}