1#![allow(dead_code)]
25use std::{
26 collections::{BTreeMap, HashSet},
27 str,
28 time::Instant,
29};
30
31use hdrhistogram::Histogram;
32use sozu_command::proto::command::{
33 AvailableMetrics, BackendMetrics, Bucket, ClusterMetrics, FilteredHistogram, FilteredMetrics,
34 MetricsConfiguration, Percentiles, QueryMetricsOptions, ResponseContent, WorkerMetrics,
35 filtered_metrics, response_content::ContentType,
36};
37
38use crate::metrics::{MetricError, MetricValue, Subscriber};
39
/// A metric aggregated over time from raw [`MetricValue`] events.
#[derive(Debug, Clone)]
pub enum AggregatedMetric {
    /// last observed value (set directly, or shifted by `GaugeAdd` deltas)
    Gauge(usize),
    /// additive counter, summed across received values
    Count(i64),
    /// time samples recorded into an HDR histogram (percentile queries)
    Time(Histogram<u32>),
}
47
48impl AggregatedMetric {
49 fn new(metric: MetricValue) -> Result<AggregatedMetric, MetricError> {
50 match metric {
51 MetricValue::Gauge(value) => Ok(AggregatedMetric::Gauge(value)),
52 MetricValue::GaugeAdd(value) => Ok(AggregatedMetric::Gauge(value as usize)),
53 MetricValue::Count(value) => Ok(AggregatedMetric::Count(value)),
54 MetricValue::Time(value) => {
55 let mut histogram = ::hdrhistogram::Histogram::new(3).map_err(|error| {
56 MetricError::HistogramCreation {
57 time_metric: metric.clone(),
58 error: error.to_string(),
59 }
60 })?;
61
62 histogram.record(value as u64).map_err(|error| {
63 MetricError::TimeMetricRecordingError {
64 time_metric: metric.clone(),
65 error: error.to_string(),
66 }
67 })?;
68
69 Ok(AggregatedMetric::Time(histogram))
70 }
71 }
72 }
73
74 fn update(&mut self, key: &str, m: MetricValue) {
75 match (self, m) {
76 (&mut AggregatedMetric::Gauge(ref mut v1), MetricValue::Gauge(v2)) => {
77 *v1 = v2;
78 }
79 (&mut AggregatedMetric::Gauge(ref mut v1), MetricValue::GaugeAdd(v2)) => {
80 *v1 = (*v1 as i64 + v2) as usize;
81 }
82 (&mut AggregatedMetric::Count(ref mut v1), MetricValue::Count(v2)) => {
83 *v1 += v2;
84 }
85 (&mut AggregatedMetric::Time(ref mut v1), MetricValue::Time(v2)) => {
86 if let Err(e) = (*v1).record(v2 as u64) {
87 error!("could not record time metric: {:?}", e.to_string());
88 }
89 }
90 (s, m) => panic!(
91 "tried to update metric {key} of value {s:?} with an incompatible metric: {m:?}"
92 ),
93 }
94 }
95
96 pub fn to_filtered(&self) -> FilteredMetrics {
97 match *self {
98 AggregatedMetric::Gauge(i) => FilteredMetrics {
99 inner: Some(filtered_metrics::Inner::Gauge(i as u64)),
100 },
101 AggregatedMetric::Count(i) => FilteredMetrics {
102 inner: Some(filtered_metrics::Inner::Count(i)),
103 },
104 AggregatedMetric::Time(ref hist) => FilteredMetrics {
105 inner: Some(filtered_metrics::Inner::Percentiles(
106 histogram_to_percentiles(hist),
107 )),
108 },
109 }
110 }
111}
112
113pub fn histogram_to_percentiles(hist: &Histogram<u32>) -> Percentiles {
114 let sum = hist.len() as f64 * hist.mean();
115 Percentiles {
116 samples: hist.len(),
117 p_50: hist.value_at_percentile(50.0),
118 p_90: hist.value_at_percentile(90.0),
119 p_99: hist.value_at_percentile(99.0),
120 p_99_9: hist.value_at_percentile(99.9),
121 p_99_99: hist.value_at_percentile(99.99),
122 p_99_999: hist.value_at_percentile(99.999),
123 p_100: hist.value_at_percentile(100.0),
124 sum: sum as u64,
125 }
126}
127
128pub fn filter_histogram(hist: &Histogram<u32>) -> FilteredMetrics {
130 let sum: u64 = hist
131 .iter_recorded()
132 .map(|item| item.value_iterated_to() * item.count_at_value() as u64)
133 .sum();
134
135 let mut count = 0;
136 let buckets = hist
137 .iter_log(1, 2.0)
138 .map(|value| {
139 count += value.count_since_last_iteration();
140 Bucket {
141 le: value.value_iterated_to(),
142 count,
143 }
144 })
145 .collect();
146
147 FilteredMetrics {
148 inner: Some(filtered_metrics::Inner::Histogram(FilteredHistogram {
149 sum,
150 count,
151 buckets,
152 })),
153 }
154}
155
/// Aggregated metrics, keyed by metric name.
///
/// A `BTreeMap` keeps the names sorted, so dumps are deterministic.
#[derive(Debug, Clone, Default)]
pub struct MetricsMap {
    map: BTreeMap<String, AggregatedMetric>,
}
161
162impl MetricsMap {
163 fn new() -> Self {
164 Self {
165 map: BTreeMap::new(),
166 }
167 }
168
169 fn to_filtered_metrics(&self, filter_by_names: &[String]) -> BTreeMap<String, FilteredMetrics> {
172 self.map
173 .iter()
174 .filter(|(name, _)| {
175 if !filter_by_names.is_empty() {
176 filter_by_names.contains(name)
177 } else {
178 true
179 }
180 })
181 .flat_map(|(name, metric)| {
182 let mut filtered = vec![(name.to_owned(), metric.to_filtered())];
183
184 if let AggregatedMetric::Time(hist) = metric {
186 filtered.push((format!("{}_histogram", name), filter_histogram(hist)));
187 }
188 filtered.into_iter()
189 })
190 .collect()
191 }
192
193 fn receive_metric(
195 &mut self,
196 metric_name: &str,
197 new_value: MetricValue,
198 ) -> Result<(), MetricError> {
199 match self.map.get_mut(metric_name) {
200 Some(old_value) => old_value.update(metric_name, new_value),
201 None => {
202 let aggregated_metric = AggregatedMetric::new(new_value)?;
203 self.map.insert(metric_name.to_owned(), aggregated_metric);
204 }
205 }
206 Ok(())
207 }
208
209 fn metric_names(&self) -> impl Iterator<Item = &str> {
210 self.map.keys().map(|name| name.as_str())
211 }
212}
213
/// Aggregated metrics of one cluster, together with the metrics of each
/// of its backends.
#[derive(Debug, Default)]
pub struct LocalClusterMetrics {
    /// cluster-level metrics, keyed by metric name
    cluster: MetricsMap,
    /// one entry per backend that has reported at least one metric
    backends: Vec<LocalBackendMetrics>,
}
220
221impl LocalClusterMetrics {
222 fn receive_metric(
223 &mut self,
224 metric_name: &str,
225 metric: MetricValue,
226 ) -> Result<(), MetricError> {
227 self.cluster.receive_metric(metric_name, metric)
228 }
229
230 fn receive_backend_metric(
231 &mut self,
232 metric_name: &str,
233 backend_id: &str,
234 new_value: MetricValue,
235 ) -> Result<(), MetricError> {
236 let backend = self
237 .backends
238 .iter_mut()
239 .find(|backend| backend.backend_id == backend_id);
240
241 if let Some(backend) = backend {
242 backend.metrics.receive_metric(metric_name, new_value)?;
243 return Ok(());
244 }
245
246 let mut metrics = MetricsMap::new();
247 metrics.receive_metric(metric_name, new_value)?;
248
249 self.backends.push(LocalBackendMetrics {
250 backend_id: backend_id.to_owned(),
251 metrics,
252 });
253 Ok(())
254 }
255
256 fn to_filtered_metrics(&self, metric_names: &[String]) -> Result<ClusterMetrics, MetricError> {
257 let cluster = self.cluster.to_filtered_metrics(metric_names);
258
259 let mut backends: Vec<BackendMetrics> = Vec::new();
260 for backend in &self.backends {
261 backends.push(backend.to_filtered_metrics(metric_names)?);
262 }
263 Ok(ClusterMetrics { cluster, backends })
264 }
265
266 fn metric_names(&self) -> impl Iterator<Item = &str> {
267 let mut dedup_set = HashSet::new();
268 self.cluster
269 .metric_names()
270 .chain(
271 self.backends
272 .iter()
273 .flat_map(|backend| backend.metrics_names()),
274 )
275 .filter(move |&item| dedup_set.insert(item))
276 }
277
278 fn contains_backend(&self, backend_id: &str) -> bool {
279 for backend in &self.backends {
280 if backend.backend_id == backend_id {
281 return true;
282 }
283 }
284 false
285 }
286}
287
/// Aggregated metrics of a single backend of a cluster.
#[derive(Debug, Clone)]
pub struct LocalBackendMetrics {
    backend_id: String,
    /// backend-level metrics, keyed by metric name
    metrics: MetricsMap,
}
294
295impl LocalBackendMetrics {
296 fn to_filtered_metrics(&self, metric_names: &[String]) -> Result<BackendMetrics, MetricError> {
297 let filtered_backend_metrics = self.metrics.to_filtered_metrics(metric_names);
298
299 Ok(BackendMetrics {
300 backend_id: self.backend_id.to_owned(),
301 metrics: filtered_backend_metrics,
302 })
303 }
304
305 fn metrics_names(&self) -> impl Iterator<Item = &str> {
306 self.metrics.metric_names()
307 }
308}
309
/// In-memory metrics store for one worker: proxy-wide metrics plus
/// per-cluster (and per-backend) metrics, queryable over the command API.
#[derive(Debug)]
pub struct LocalDrain {
    // NOTE(review): not read anywhere in this file — presumably used by
    // an external exporter; confirm before removing
    pub prefix: String,
    /// when this drain was created
    pub created: Instant,
    /// proxy-wide (non-cluster) metrics
    pub proxy_metrics: MetricsMap,
    /// cluster id -> metrics of that cluster and its backends
    cluster_metrics: BTreeMap<String, LocalClusterMetrics>,
    // NOTE(review): never read in this file — verify against callers
    use_tagged_metrics: bool,
    // NOTE(review): initialized to "x" and never read here — verify usage
    origin: String,
    /// when true, cluster/backend metrics are dropped on receipt
    disable_cluster_metrics: bool,
}
324
impl LocalDrain {
    /// Creates an empty drain; cluster metrics are enabled by default.
    pub fn new(prefix: String) -> Self {
        LocalDrain {
            prefix,
            created: Instant::now(),
            proxy_metrics: MetricsMap::new(),
            cluster_metrics: BTreeMap::new(),
            use_tagged_metrics: false,
            origin: String::from("x"),
            disable_cluster_metrics: false,
        }
    }

    /// Enables/disables cluster metric collection, or clears stored
    /// cluster metrics.
    pub fn configure(&mut self, config: &MetricsConfiguration) {
        match config {
            MetricsConfiguration::Enabled => self.disable_cluster_metrics = false,
            MetricsConfiguration::Disabled => self.disable_cluster_metrics = true,
            MetricsConfiguration::Clear => self.clear(),
        }
    }

    /// Drops all cluster metrics (proxy metrics are kept).
    pub fn clear(&mut self) {
        self.cluster_metrics.clear();
    }

    /// Answers a metrics query from the command API.
    ///
    /// Routing, in order of precedence: `list` returns only the available
    /// metric names; `no_clusters` returns proxy metrics alone; otherwise
    /// the query is scoped by cluster ids, then backend ids, and falls
    /// back to dumping everything.
    pub fn query(&mut self, options: &QueryMetricsOptions) -> Result<ResponseContent, MetricError> {
        trace!(
            "The local drain received a metrics query with this options: {:?}",
            options
        );

        let QueryMetricsOptions {
            metric_names,
            cluster_ids,
            backend_ids,
            list,
            no_clusters,
            workers: _workers,
        } = options;

        if *list {
            return self.list_all_metric_names();
        }

        if *no_clusters {
            let proxy_metrics = self.dump_proxy_metrics(metric_names);
            return Ok(ContentType::WorkerMetrics(WorkerMetrics {
                proxy: proxy_metrics,
                clusters: BTreeMap::new(),
            })
            .into());
        }

        let worker_metrics = match (cluster_ids.is_empty(), backend_ids.is_empty()) {
            (false, _) => self.query_clusters(cluster_ids, metric_names)?,
            (true, false) => self.query_backends(backend_ids, metric_names)?,
            (true, true) => self.dump_all_metrics(metric_names)?,
        };

        Ok(ContentType::WorkerMetrics(worker_metrics).into())
    }

    /// Lists all known metric names: proxy metrics, and cluster metrics
    /// deduplicated across clusters and sorted.
    fn list_all_metric_names(&self) -> Result<ResponseContent, MetricError> {
        let proxy_metrics = self
            .proxy_metrics
            .metric_names()
            .map(ToString::to_string)
            .collect();

        let mut dedup_set = HashSet::new();

        let mut cluster_metrics: Vec<String> = self
            .cluster_metrics
            .values()
            .flat_map(|cluster| cluster.metric_names())
            .filter(move |&item| dedup_set.insert(item))
            .map(ToString::to_string)
            .collect();

        cluster_metrics.sort_unstable();

        Ok(ContentType::AvailableMetrics(AvailableMetrics {
            proxy_metrics,
            cluster_metrics,
        })
        .into())
    }

    /// Dumps proxy metrics and every cluster's metrics, filtered by
    /// `metric_names` (empty slice means "all").
    pub fn dump_all_metrics(
        &mut self,
        metric_names: &[String],
    ) -> Result<WorkerMetrics, MetricError> {
        Ok(WorkerMetrics {
            proxy: self.dump_proxy_metrics(metric_names),
            clusters: self.dump_cluster_metrics(metric_names)?,
        })
    }

    /// Dumps the proxy-wide metrics, filtered by `metric_names`.
    pub fn dump_proxy_metrics(
        &mut self,
        metric_names: &[String],
    ) -> BTreeMap<String, FilteredMetrics> {
        self.proxy_metrics.to_filtered_metrics(metric_names)
    }

    /// Dumps the metrics of every known cluster, filtered by
    /// `metric_names`, short-circuiting on the first conversion error.
    pub fn dump_cluster_metrics(
        &mut self,
        metric_names: &[String],
    ) -> Result<BTreeMap<String, ClusterMetrics>, MetricError> {
        self.cluster_metrics
            .keys()
            .map(|cluster_id| {
                Ok((
                    cluster_id.to_owned(),
                    self.metrics_of_one_cluster(cluster_id, metric_names)?,
                ))
            })
            .collect()
    }

    /// Filtered metrics of one cluster; errors if the cluster is unknown.
    fn metrics_of_one_cluster(
        &self,
        cluster_id: &str,
        metric_names: &[String],
    ) -> Result<ClusterMetrics, MetricError> {
        let local_cluster_metrics = self
            .cluster_metrics
            .get(cluster_id)
            .ok_or(MetricError::NoMetrics(cluster_id.to_owned()))?;

        let filtered = local_cluster_metrics.to_filtered_metrics(metric_names)?;

        Ok(filtered)
    }

    /// Filtered metrics of one backend, searched across all clusters;
    /// errors if no cluster knows this backend id.
    fn metrics_of_one_backend(
        &self,
        backend_id: &str,
        metric_names: &[String],
    ) -> Result<BackendMetrics, MetricError> {
        for cluster_metrics in self.cluster_metrics.values() {
            if let Some(backend_metrics) = cluster_metrics
                .backends
                .iter()
                .find(|backend_metrics| backend_metrics.backend_id == backend_id)
            {
                return backend_metrics.to_filtered_metrics(metric_names);
            }
        }

        Err(MetricError::NoMetrics(format!(
            "No metric for backend {}",
            backend_id
        )))
    }

    /// Builds a response for a cluster-scoped query; any unknown cluster
    /// id makes the whole query fail.
    fn query_clusters(
        &mut self,
        cluster_ids: &[String],
        metric_names: &[String],
    ) -> Result<WorkerMetrics, MetricError> {
        debug!("Querying cluster with ids: {:?}", cluster_ids);
        let mut clusters: BTreeMap<String, ClusterMetrics> = BTreeMap::new();

        for cluster_id in cluster_ids {
            clusters.insert(
                cluster_id.to_owned(),
                self.metrics_of_one_cluster(cluster_id, metric_names)?,
            );
        }

        trace!("query result: {:#?}", clusters);
        Ok(WorkerMetrics {
            proxy: BTreeMap::new(),
            clusters,
        })
    }

    /// Builds a response for a backend-scoped query. Unknown backend ids
    /// are silently skipped; note that the response contains ALL backends
    /// of each matched cluster, not only the requested ones.
    fn query_backends(
        &mut self,
        backend_ids: &[String],
        metric_names: &[String],
    ) -> Result<WorkerMetrics, MetricError> {
        let mut clusters: BTreeMap<String, ClusterMetrics> = BTreeMap::new();

        for backend_id in backend_ids {
            // find the cluster owning this backend
            let (cluster_id, cluster) = match self
                .cluster_metrics
                .iter()
                .find(|(_, cluster)| cluster.contains_backend(backend_id))
            {
                Some(cluster) => cluster,
                None => continue,
            };

            let mut backend_metrics = Vec::new();
            for backend in &cluster.backends {
                backend_metrics.push(backend.to_filtered_metrics(metric_names)?);
            }

            clusters.insert(
                cluster_id.to_owned(),
                ClusterMetrics {
                    cluster: BTreeMap::new(),
                    backends: backend_metrics,
                },
            );
        }

        trace!("query result: {:#?}", clusters);
        Ok(WorkerMetrics {
            proxy: BTreeMap::new(),
            clusters,
        })
    }

    /// Records a cluster-level metric; no-op while cluster metrics are
    /// disabled.
    fn receive_cluster_metric(
        &mut self,
        metric_name: &str,
        cluster_id: &str,
        metric: MetricValue,
    ) -> Result<(), MetricError> {
        if self.disable_cluster_metrics {
            return Ok(());
        }

        let local_cluster_metric = self
            .cluster_metrics
            .entry(cluster_id.to_owned())
            .or_default();

        local_cluster_metric.receive_metric(metric_name, metric)
    }

    /// Records a backend-level metric; no-op while cluster metrics are
    /// disabled.
    fn receive_backend_metric(
        &mut self,
        metric_name: &str,
        cluster_id: &str,
        backend_id: &str,
        metric: MetricValue,
    ) -> Result<(), MetricError> {
        if self.disable_cluster_metrics {
            return Ok(());
        }

        let local_cluster_metric = self
            .cluster_metrics
            .entry(cluster_id.to_owned())
            .or_default();

        local_cluster_metric.receive_backend_metric(metric_name, backend_id, metric)
    }

    /// Records a proxy-wide metric, creating the aggregate on first sight.
    fn receive_proxy_metric(
        &mut self,
        metric_name: &str,
        metric: MetricValue,
    ) -> Result<(), MetricError> {
        match self.proxy_metrics.map.get_mut(metric_name) {
            Some(stored_metric) => stored_metric.update(metric_name, metric),
            None => {
                let aggregated_metric = AggregatedMetric::new(metric)?;

                self.proxy_metrics
                    .map
                    .insert(String::from(metric_name), aggregated_metric);
            }
        }
        Ok(())
    }
}
597
598impl Subscriber for LocalDrain {
599 fn receive_metric(
600 &mut self,
601 key: &'static str,
602 cluster_id: Option<&str>,
603 backend_id: Option<&str>,
604 metric: MetricValue,
605 ) {
606 trace!(
607 "receiving metric with key {}, cluster_id: {:?}, backend_id: {:?}, metric data: {:?}",
608 key, cluster_id, backend_id, metric
609 );
610
611 let receive_result = match (cluster_id, backend_id) {
612 (Some(cluster_id), Some(backend_id)) => {
613 self.receive_backend_metric(key, cluster_id, backend_id, metric)
614 }
615 (Some(cluster_id), None) => self.receive_cluster_metric(key, cluster_id, metric),
616 (None, _) => self.receive_proxy_metric(key, metric),
617 };
618
619 if let Err(e) = receive_result {
620 error!("Could not receive metric: {}", e);
621 }
622 }
623}
#[cfg(test)]
mod tests {
    use sozu_command::proto::command::{FilteredMetrics, filtered_metrics::Inner};

    use super::*;

    // Two Count(1) events for the same backend must aggregate to Count(2)
    // and be retrievable through metrics_of_one_backend.
    #[test]
    fn receive_and_yield_backend_metrics() {
        let mut local_drain = LocalDrain::new("prefix".to_string());

        local_drain.receive_metric(
            "connections_per_backend",
            Some("test-cluster"),
            Some("test-backend-1"),
            MetricValue::Count(1),
        );
        local_drain.receive_metric(
            "connections_per_backend",
            Some("test-cluster"),
            Some("test-backend-1"),
            MetricValue::Count(1),
        );

        let mut expected_metrics_1 = BTreeMap::new();
        expected_metrics_1.insert(
            "connections_per_backend".to_string(),
            FilteredMetrics {
                inner: Some(Inner::Count(2)),
            },
        );

        let expected_backend_metrics = BackendMetrics {
            backend_id: "test-backend-1".to_string(),
            metrics: expected_metrics_1,
        };

        assert_eq!(
            expected_backend_metrics,
            local_drain
                .metrics_of_one_backend(
                    "test-backend-1",
                    ["connections_per_backend".to_string()].as_ref(),
                )
                .expect("could not query metrics for this backend")
        )
    }

    // Two Count(1) events at cluster level (no backend id) must aggregate
    // to Count(2) and leave the backends list empty.
    #[test]
    fn receive_and_yield_cluster_metrics() {
        let mut local_drain = LocalDrain::new("prefix".to_string());
        local_drain.receive_metric(
            "http_errors",
            Some("test-cluster"),
            None,
            MetricValue::Count(1),
        );
        local_drain.receive_metric(
            "http_errors",
            Some("test-cluster"),
            None,
            MetricValue::Count(1),
        );

        let mut map = BTreeMap::new();
        map.insert(
            "http_errors".to_string(),
            FilteredMetrics {
                inner: Some(Inner::Count(2)),
            },
        );
        let expected_cluster_metrics = ClusterMetrics {
            cluster: map,
            backends: vec![],
        };

        let returned_cluster_metrics = local_drain
            .metrics_of_one_cluster("test-cluster", ["http_errors".to_string()].as_ref())
            .expect("could not query metrics for this cluster");

        assert_eq!(expected_cluster_metrics, returned_cluster_metrics);
    }
}