// sozu_lib/metrics/local_drain.rs

//! A local drain to accumulate metrics and return them in a protobuf format
//!
//! The metrics are stored following this hierarchy (pseudo-rust):
//!
//! ```plain
//! LocalDrain {
//!     proxy_metrics: MetricsMap {
//!         map: BTreeMap<metric_name, AggregatedMetric>
//!     },
//!     cluster_metrics: BTreeMap<cluster_id, LocalClusterMetrics {
//!         cluster: MetricsMap {
//!             map: BTreeMap<metric_name, AggregatedMetric>
//!         },
//!         backends: Vec<LocalBackendMetrics {
//!             backend_id,
//!             MetricsMap {
//!                 map: BTreeMap<metric_name, AggregatedMetric>
//!             },
//!         }>
//!     }>
//! }
//! ```
24#![allow(dead_code)]
25use std::{
26    collections::{BTreeMap, HashSet},
27    str,
28    time::Instant,
29};
30
31use hdrhistogram::Histogram;
32use sozu_command::proto::command::{
33    AvailableMetrics, BackendMetrics, Bucket, ClusterMetrics, FilteredHistogram, FilteredMetrics,
34    MetricsConfiguration, Percentiles, QueryMetricsOptions, ResponseContent, WorkerMetrics,
35    filtered_metrics, response_content::ContentType,
36};
37
38use crate::metrics::{MetricError, MetricValue, Subscriber};
39
/// metrics as stored in the local drain
#[derive(Debug, Clone)]
pub enum AggregatedMetric {
    /// point-in-time value, overwritten by `Gauge` updates and adjusted by `GaugeAdd` deltas
    Gauge(usize),
    /// running sum of all received count increments
    Count(i64),
    /// distribution of time measurements, accumulated in an HDR histogram
    Time(Histogram<u32>),
}
47
48impl AggregatedMetric {
49    fn new(metric: MetricValue) -> Result<AggregatedMetric, MetricError> {
50        match metric {
51            MetricValue::Gauge(value) => Ok(AggregatedMetric::Gauge(value)),
52            MetricValue::GaugeAdd(value) => Ok(AggregatedMetric::Gauge(value as usize)),
53            MetricValue::Count(value) => Ok(AggregatedMetric::Count(value)),
54            MetricValue::Time(value) => {
55                let mut histogram = ::hdrhistogram::Histogram::new(3).map_err(|error| {
56                    MetricError::HistogramCreation {
57                        time_metric: metric.clone(),
58                        error: error.to_string(),
59                    }
60                })?;
61
62                histogram.record(value as u64).map_err(|error| {
63                    MetricError::TimeMetricRecordingError {
64                        time_metric: metric.clone(),
65                        error: error.to_string(),
66                    }
67                })?;
68
69                Ok(AggregatedMetric::Time(histogram))
70            }
71        }
72    }
73
74    fn update(&mut self, key: &str, m: MetricValue) {
75        match (self, m) {
76            (&mut AggregatedMetric::Gauge(ref mut v1), MetricValue::Gauge(v2)) => {
77                *v1 = v2;
78            }
79            (&mut AggregatedMetric::Gauge(ref mut v1), MetricValue::GaugeAdd(v2)) => {
80                *v1 = (*v1 as i64 + v2) as usize;
81            }
82            (&mut AggregatedMetric::Count(ref mut v1), MetricValue::Count(v2)) => {
83                *v1 += v2;
84            }
85            (&mut AggregatedMetric::Time(ref mut v1), MetricValue::Time(v2)) => {
86                if let Err(e) = (*v1).record(v2 as u64) {
87                    error!("could not record time metric: {:?}", e.to_string());
88                }
89            }
90            (s, m) => panic!(
91                "tried to update metric {key} of value {s:?} with an incompatible metric: {m:?}"
92            ),
93        }
94    }
95
96    pub fn to_filtered(&self) -> FilteredMetrics {
97        match *self {
98            AggregatedMetric::Gauge(i) => FilteredMetrics {
99                inner: Some(filtered_metrics::Inner::Gauge(i as u64)),
100            },
101            AggregatedMetric::Count(i) => FilteredMetrics {
102                inner: Some(filtered_metrics::Inner::Count(i)),
103            },
104            AggregatedMetric::Time(ref hist) => FilteredMetrics {
105                inner: Some(filtered_metrics::Inner::Percentiles(
106                    histogram_to_percentiles(hist),
107                )),
108            },
109        }
110    }
111}
112
113pub fn histogram_to_percentiles(hist: &Histogram<u32>) -> Percentiles {
114    let sum = hist.len() as f64 * hist.mean();
115    Percentiles {
116        samples: hist.len(),
117        p_50: hist.value_at_percentile(50.0),
118        p_90: hist.value_at_percentile(90.0),
119        p_99: hist.value_at_percentile(99.0),
120        p_99_9: hist.value_at_percentile(99.9),
121        p_99_99: hist.value_at_percentile(99.99),
122        p_99_999: hist.value_at_percentile(99.999),
123        p_100: hist.value_at_percentile(100.0),
124        sum: sum as u64,
125    }
126}
127
128/// convert a collected histogram to a prometheus-compatible format
129pub fn filter_histogram(hist: &Histogram<u32>) -> FilteredMetrics {
130    let sum: u64 = hist
131        .iter_recorded()
132        .map(|item| item.value_iterated_to() * item.count_at_value() as u64)
133        .sum();
134
135    let mut count = 0;
136    let buckets = hist
137        .iter_log(1, 2.0)
138        .map(|value| {
139            count += value.count_since_last_iteration();
140            Bucket {
141                le: value.value_iterated_to(),
142                count,
143            }
144        })
145        .collect();
146
147    FilteredMetrics {
148        inner: Some(filtered_metrics::Inner::Histogram(FilteredHistogram {
149            sum,
150            count,
151            buckets,
152        })),
153    }
154}
155
/// a map of metric_name -> metric value
#[derive(Debug, Clone, Default)]
pub struct MetricsMap {
    // aggregated metrics, keyed and iterated in sorted order by metric name
    map: BTreeMap<String, AggregatedMetric>,
}
161
162impl MetricsMap {
163    fn new() -> Self {
164        Self {
165            map: BTreeMap::new(),
166        }
167    }
168
169    /// convert a metrics map to a map of filtered metrics,
170    /// perform a double conversion for time metrics: to percentiles and to histogram
171    fn to_filtered_metrics(&self, filter_by_names: &[String]) -> BTreeMap<String, FilteredMetrics> {
172        self.map
173            .iter()
174            .filter(|(name, _)| {
175                if !filter_by_names.is_empty() {
176                    filter_by_names.contains(name)
177                } else {
178                    true
179                }
180            })
181            .flat_map(|(name, metric)| {
182                let mut filtered = vec![(name.to_owned(), metric.to_filtered())];
183
184                // convert time metrics to a histogram format, on top of percentiles
185                if let AggregatedMetric::Time(hist) = metric {
186                    filtered.push((format!("{}_histogram", name), filter_histogram(hist)));
187                }
188                filtered.into_iter()
189            })
190            .collect()
191    }
192
193    /// update an old value with the new one, or insert the new one
194    fn receive_metric(
195        &mut self,
196        metric_name: &str,
197        new_value: MetricValue,
198    ) -> Result<(), MetricError> {
199        match self.map.get_mut(metric_name) {
200            Some(old_value) => old_value.update(metric_name, new_value),
201            None => {
202                let aggregated_metric = AggregatedMetric::new(new_value)?;
203                self.map.insert(metric_name.to_owned(), aggregated_metric);
204            }
205        }
206        Ok(())
207    }
208
209    fn metric_names(&self) -> impl Iterator<Item = &str> {
210        self.map.keys().map(|name| name.as_str())
211    }
212}
213
/// local equivalent to proto::command::ClusterMetrics
#[derive(Debug, Default)]
pub struct LocalClusterMetrics {
    /// metrics of the cluster itself
    cluster: MetricsMap,
    /// metrics of each backend of this cluster
    backends: Vec<LocalBackendMetrics>,
}
220
221impl LocalClusterMetrics {
222    fn receive_metric(
223        &mut self,
224        metric_name: &str,
225        metric: MetricValue,
226    ) -> Result<(), MetricError> {
227        self.cluster.receive_metric(metric_name, metric)
228    }
229
230    fn receive_backend_metric(
231        &mut self,
232        metric_name: &str,
233        backend_id: &str,
234        new_value: MetricValue,
235    ) -> Result<(), MetricError> {
236        let backend = self
237            .backends
238            .iter_mut()
239            .find(|backend| backend.backend_id == backend_id);
240
241        if let Some(backend) = backend {
242            backend.metrics.receive_metric(metric_name, new_value)?;
243            return Ok(());
244        }
245
246        let mut metrics = MetricsMap::new();
247        metrics.receive_metric(metric_name, new_value)?;
248
249        self.backends.push(LocalBackendMetrics {
250            backend_id: backend_id.to_owned(),
251            metrics,
252        });
253        Ok(())
254    }
255
256    fn to_filtered_metrics(&self, metric_names: &[String]) -> Result<ClusterMetrics, MetricError> {
257        let cluster = self.cluster.to_filtered_metrics(metric_names);
258
259        let mut backends: Vec<BackendMetrics> = Vec::new();
260        for backend in &self.backends {
261            backends.push(backend.to_filtered_metrics(metric_names)?);
262        }
263        Ok(ClusterMetrics { cluster, backends })
264    }
265
266    fn metric_names(&self) -> impl Iterator<Item = &str> {
267        let mut dedup_set = HashSet::new();
268        self.cluster
269            .metric_names()
270            .chain(
271                self.backends
272                    .iter()
273                    .flat_map(|backend| backend.metrics_names()),
274            )
275            .filter(move |&item| dedup_set.insert(item))
276    }
277
278    fn contains_backend(&self, backend_id: &str) -> bool {
279        for backend in &self.backends {
280            if backend.backend_id == backend_id {
281                return true;
282            }
283        }
284        false
285    }
286}
287
/// local equivalent to proto::command::BackendMetrics
#[derive(Debug, Clone)]
pub struct LocalBackendMetrics {
    // identifier of the backend these metrics belong to
    backend_id: String,
    // metric_name -> aggregated value for this backend
    metrics: MetricsMap,
}
294
295impl LocalBackendMetrics {
296    fn to_filtered_metrics(&self, metric_names: &[String]) -> Result<BackendMetrics, MetricError> {
297        let filtered_backend_metrics = self.metrics.to_filtered_metrics(metric_names);
298
299        Ok(BackendMetrics {
300            backend_id: self.backend_id.to_owned(),
301            metrics: filtered_backend_metrics,
302        })
303    }
304
305    fn metrics_names(&self) -> impl Iterator<Item = &str> {
306        self.metrics.metric_names()
307    }
308}
309
/// This gathers metrics locally, to be queried by the CLI
#[derive(Debug)]
pub struct LocalDrain {
    /// a prefix to metric keys, usually "sozu-"
    pub prefix: String,
    /// when this drain was created
    pub created: Instant,
    /// metrics of the proxy server (metric_name -> metric value)
    pub proxy_metrics: MetricsMap,
    /// cluster_id -> cluster_metrics
    cluster_metrics: BTreeMap<String, LocalClusterMetrics>,
    // NOTE(review): set to false in new() and never read in this file — purpose unclear
    use_tagged_metrics: bool,
    // NOTE(review): set to "x" in new() and never read in this file — purpose unclear
    origin: String,
    // when true, cluster and backend metrics are dropped on reception
    disable_cluster_metrics: bool,
}
324
325impl LocalDrain {
326    pub fn new(prefix: String) -> Self {
327        LocalDrain {
328            prefix,
329            created: Instant::now(),
330            proxy_metrics: MetricsMap::new(),
331            cluster_metrics: BTreeMap::new(),
332            use_tagged_metrics: false,
333            origin: String::from("x"),
334            disable_cluster_metrics: false,
335        }
336    }
337
338    pub fn configure(&mut self, config: &MetricsConfiguration) {
339        match config {
340            MetricsConfiguration::Enabled => self.disable_cluster_metrics = false,
341            MetricsConfiguration::Disabled => self.disable_cluster_metrics = true,
342            MetricsConfiguration::Clear => self.clear(),
343        }
344    }
345
346    pub fn clear(&mut self) {
347        self.cluster_metrics.clear();
348    }
349
350    pub fn query(&mut self, options: &QueryMetricsOptions) -> Result<ResponseContent, MetricError> {
351        trace!(
352            "The local drain received a metrics query with this options: {:?}",
353            options
354        );
355
356        let QueryMetricsOptions {
357            metric_names,
358            cluster_ids,
359            backend_ids,
360            list,
361            no_clusters,
362            workers: _workers,
363        } = options;
364
365        if *list {
366            return self.list_all_metric_names();
367        }
368
369        if *no_clusters {
370            let proxy_metrics = self.dump_proxy_metrics(metric_names);
371            return Ok(ContentType::WorkerMetrics(WorkerMetrics {
372                proxy: proxy_metrics,
373                clusters: BTreeMap::new(),
374            })
375            .into());
376        }
377
378        let worker_metrics = match (cluster_ids.is_empty(), backend_ids.is_empty()) {
379            (false, _) => self.query_clusters(cluster_ids, metric_names)?,
380            (true, false) => self.query_backends(backend_ids, metric_names)?,
381            (true, true) => self.dump_all_metrics(metric_names)?,
382        };
383
384        Ok(ContentType::WorkerMetrics(worker_metrics).into())
385    }
386
387    fn list_all_metric_names(&self) -> Result<ResponseContent, MetricError> {
388        let proxy_metrics = self
389            .proxy_metrics
390            .metric_names()
391            .map(ToString::to_string)
392            .collect();
393
394        let mut dedup_set = HashSet::new();
395
396        let mut cluster_metrics: Vec<String> = self
397            .cluster_metrics
398            .values()
399            .flat_map(|cluster| cluster.metric_names())
400            .filter(move |&item| dedup_set.insert(item))
401            .map(ToString::to_string)
402            .collect();
403
404        cluster_metrics.sort_unstable();
405
406        Ok(ContentType::AvailableMetrics(AvailableMetrics {
407            proxy_metrics,
408            cluster_metrics,
409        })
410        .into())
411    }
412
413    pub fn dump_all_metrics(
414        &mut self,
415        metric_names: &[String],
416    ) -> Result<WorkerMetrics, MetricError> {
417        Ok(WorkerMetrics {
418            proxy: self.dump_proxy_metrics(metric_names),
419            clusters: self.dump_cluster_metrics(metric_names)?,
420        })
421    }
422
423    pub fn dump_proxy_metrics(
424        &mut self,
425        metric_names: &[String],
426    ) -> BTreeMap<String, FilteredMetrics> {
427        self.proxy_metrics.to_filtered_metrics(metric_names)
428    }
429
430    pub fn dump_cluster_metrics(
431        &mut self,
432        metric_names: &[String],
433    ) -> Result<BTreeMap<String, ClusterMetrics>, MetricError> {
434        self.cluster_metrics
435            .keys()
436            .map(|cluster_id| {
437                Ok((
438                    cluster_id.to_owned(),
439                    self.metrics_of_one_cluster(cluster_id, metric_names)?,
440                ))
441            })
442            .collect()
443    }
444
445    fn metrics_of_one_cluster(
446        &self,
447        cluster_id: &str,
448        metric_names: &[String],
449    ) -> Result<ClusterMetrics, MetricError> {
450        let local_cluster_metrics = self
451            .cluster_metrics
452            .get(cluster_id)
453            .ok_or(MetricError::NoMetrics(cluster_id.to_owned()))?;
454
455        let filtered = local_cluster_metrics.to_filtered_metrics(metric_names)?;
456
457        Ok(filtered)
458    }
459
460    fn metrics_of_one_backend(
461        &self,
462        backend_id: &str,
463        metric_names: &[String],
464    ) -> Result<BackendMetrics, MetricError> {
465        for cluster_metrics in self.cluster_metrics.values() {
466            if let Some(backend_metrics) = cluster_metrics
467                .backends
468                .iter()
469                .find(|backend_metrics| backend_metrics.backend_id == backend_id)
470            {
471                return backend_metrics.to_filtered_metrics(metric_names);
472            }
473        }
474
475        Err(MetricError::NoMetrics(format!(
476            "No metric for backend {}",
477            backend_id
478        )))
479    }
480
481    fn query_clusters(
482        &mut self,
483        cluster_ids: &[String],
484        metric_names: &[String],
485    ) -> Result<WorkerMetrics, MetricError> {
486        debug!("Querying cluster with ids: {:?}", cluster_ids);
487        let mut clusters: BTreeMap<String, ClusterMetrics> = BTreeMap::new();
488
489        for cluster_id in cluster_ids {
490            clusters.insert(
491                cluster_id.to_owned(),
492                self.metrics_of_one_cluster(cluster_id, metric_names)?,
493            );
494        }
495
496        trace!("query result: {:#?}", clusters);
497        Ok(WorkerMetrics {
498            proxy: BTreeMap::new(),
499            clusters,
500        })
501    }
502
503    fn query_backends(
504        &mut self,
505        backend_ids: &[String],
506        metric_names: &[String],
507    ) -> Result<WorkerMetrics, MetricError> {
508        let mut clusters: BTreeMap<String, ClusterMetrics> = BTreeMap::new();
509
510        for backend_id in backend_ids {
511            // find the cluster
512            let (cluster_id, cluster) = match self
513                .cluster_metrics
514                .iter()
515                .find(|(_, cluster)| cluster.contains_backend(backend_id))
516            {
517                Some(cluster) => cluster,
518                None => continue,
519            };
520
521            let mut backend_metrics = Vec::new();
522            for backend in &cluster.backends {
523                backend_metrics.push(backend.to_filtered_metrics(metric_names)?);
524            }
525
526            clusters.insert(
527                cluster_id.to_owned(),
528                ClusterMetrics {
529                    cluster: BTreeMap::new(),
530                    backends: backend_metrics,
531                },
532            );
533        }
534
535        trace!("query result: {:#?}", clusters);
536        Ok(WorkerMetrics {
537            proxy: BTreeMap::new(),
538            clusters,
539        })
540    }
541
542    fn receive_cluster_metric(
543        &mut self,
544        metric_name: &str,
545        cluster_id: &str,
546        metric: MetricValue,
547    ) -> Result<(), MetricError> {
548        if self.disable_cluster_metrics {
549            return Ok(());
550        }
551
552        let local_cluster_metric = self
553            .cluster_metrics
554            .entry(cluster_id.to_owned())
555            .or_default();
556
557        local_cluster_metric.receive_metric(metric_name, metric)
558    }
559
560    fn receive_backend_metric(
561        &mut self,
562        metric_name: &str,
563        cluster_id: &str,
564        backend_id: &str,
565        metric: MetricValue,
566    ) -> Result<(), MetricError> {
567        if self.disable_cluster_metrics {
568            return Ok(());
569        }
570
571        let local_cluster_metric = self
572            .cluster_metrics
573            .entry(cluster_id.to_owned())
574            .or_default();
575
576        local_cluster_metric.receive_backend_metric(metric_name, backend_id, metric)
577    }
578
579    fn receive_proxy_metric(
580        &mut self,
581        metric_name: &str,
582        metric: MetricValue,
583    ) -> Result<(), MetricError> {
584        match self.proxy_metrics.map.get_mut(metric_name) {
585            Some(stored_metric) => stored_metric.update(metric_name, metric),
586            None => {
587                let aggregated_metric = AggregatedMetric::new(metric)?;
588
589                self.proxy_metrics
590                    .map
591                    .insert(String::from(metric_name), aggregated_metric);
592            }
593        }
594        Ok(())
595    }
596}
597
598impl Subscriber for LocalDrain {
599    fn receive_metric(
600        &mut self,
601        key: &'static str,
602        cluster_id: Option<&str>,
603        backend_id: Option<&str>,
604        metric: MetricValue,
605    ) {
606        trace!(
607            "receiving metric with key {}, cluster_id: {:?}, backend_id: {:?}, metric data: {:?}",
608            key, cluster_id, backend_id, metric
609        );
610
611        let receive_result = match (cluster_id, backend_id) {
612            (Some(cluster_id), Some(backend_id)) => {
613                self.receive_backend_metric(key, cluster_id, backend_id, metric)
614            }
615            (Some(cluster_id), None) => self.receive_cluster_metric(key, cluster_id, metric),
616            (None, _) => self.receive_proxy_metric(key, metric),
617        };
618
619        if let Err(e) = receive_result {
620            error!("Could not receive metric: {}", e);
621        }
622    }
623}
#[cfg(test)]
mod tests {
    use sozu_command::proto::command::{FilteredMetrics, filtered_metrics::Inner};

    use super::*;

    // Two Count(1) values for the same backend must aggregate to Count(2)
    // and be retrievable through metrics_of_one_backend.
    #[test]
    fn receive_and_yield_backend_metrics() {
        let mut local_drain = LocalDrain::new("prefix".to_string());

        // send the same counter twice for one backend of one cluster
        local_drain.receive_metric(
            "connections_per_backend",
            Some("test-cluster"),
            Some("test-backend-1"),
            MetricValue::Count(1),
        );
        local_drain.receive_metric(
            "connections_per_backend",
            Some("test-cluster"),
            Some("test-backend-1"),
            MetricValue::Count(1),
        );

        let mut expected_metrics_1 = BTreeMap::new();
        expected_metrics_1.insert(
            "connections_per_backend".to_string(),
            FilteredMetrics {
                inner: Some(Inner::Count(2)),
            },
        );

        let expected_backend_metrics = BackendMetrics {
            backend_id: "test-backend-1".to_string(),
            metrics: expected_metrics_1,
        };

        assert_eq!(
            expected_backend_metrics,
            local_drain
                .metrics_of_one_backend(
                    "test-backend-1",
                    ["connections_per_backend".to_string()].as_ref(),
                )
                .expect("could not query metrics for this backend")
        )
    }

    // Two Count(1) values for the same cluster (no backend) must aggregate
    // to Count(2) and be retrievable through metrics_of_one_cluster.
    #[test]
    fn receive_and_yield_cluster_metrics() {
        let mut local_drain = LocalDrain::new("prefix".to_string());
        local_drain.receive_metric(
            "http_errors",
            Some("test-cluster"),
            None,
            MetricValue::Count(1),
        );
        local_drain.receive_metric(
            "http_errors",
            Some("test-cluster"),
            None,
            MetricValue::Count(1),
        );

        let mut map = BTreeMap::new();
        map.insert(
            "http_errors".to_string(),
            FilteredMetrics {
                inner: Some(Inner::Count(2)),
            },
        );
        let expected_cluster_metrics = ClusterMetrics {
            cluster: map,
            backends: vec![],
        };

        let returned_cluster_metrics = local_drain
            .metrics_of_one_cluster("test-cluster", ["http_errors".to_string()].as_ref())
            .expect("could not query metrics for this cluster");

        assert_eq!(expected_cluster_metrics, returned_cluster_metrics);
    }
}