use std::collections::BTreeMap;
use command::{
filtered_metrics::Inner, AggregatedMetrics, BackendMetrics, Bucket, FilteredHistogram,
FilteredMetrics,
};
use prost::UnknownEnumValue;
pub mod command;
pub mod display;
/// Error cases reported under the `DisplayError` type.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute
/// (derived through `thiserror`).
#[derive(thiserror::Error, Debug)]
pub enum DisplayError {
/// Some content could not be displayed.
/// The `String` payload is carried along but is not interpolated into the message.
#[error("Could not display content")]
DisplayContent(String),
/// A response could not be turned into JSON.
/// The wrapped `serde_json::Error` is not interpolated into the message.
#[error("Error while parsing response to JSON")]
Json(serde_json::Error),
/// The response content type was not the expected one; the payload is appended to the message.
#[error("got the wrong response content type: {0}")]
WrongResponseType(String),
/// A datetime could not be formatted to ISO 8601. Carries no payload.
#[error("Could not format the datetime to ISO 8601")]
DateTime,
/// A protobuf enum value was not recognized; the `UnknownEnumValue` is appended to the message.
#[error("unrecognized protobuf variant: {0}")]
DecodeError(UnknownEnumValue),
}
impl From<command::response_content::ContentType> for command::ResponseContent {
    /// Builds a `ResponseContent` whose `content_type` field holds `value`.
    fn from(value: command::response_content::ContentType) -> Self {
        let content_type = Some(value);
        Self { content_type }
    }
}
impl From<command::request::RequestType> for command::Request {
    /// Builds a `Request` whose `request_type` field holds `value`.
    fn from(value: command::request::RequestType) -> Self {
        let request_type = Some(value);
        Self { request_type }
    }
}
impl AggregatedMetrics {
    /// Folds the metrics of every worker into the aggregated `proxying` and `clusters` fields.
    ///
    /// `self.workers` is taken out of `self` (leaving its `Default` value behind) and consumed.
    /// Only metrics for which `FilteredMetrics::is_mergeable` returns `true` are kept; the
    /// others are dropped together with the worker they came from.
    ///
    /// - Worker-level `proxy` metrics are merged by name into `self.proxying`.
    /// - Cluster-level metrics are merged by name into `self.clusters[cluster_id].cluster`.
    /// - Backend metrics are merged by name into the backend of `self.clusters[cluster_id]`
    ///   that has the same `backend_id`; the backend is appended if it is not present yet.
    ///
    /// A cluster entry or a backend is only created when it has at least one mergeable metric.
    pub fn merge_metrics(&mut self) {
        // Take ownership of the per-worker metrics so they can be consumed without cloning.
        let workers = std::mem::take(&mut self.workers);

        for (_worker_id, worker) in workers {
            for (metric_name, new_value) in worker.proxy {
                if !new_value.is_mergeable() {
                    continue;
                }
                self.proxying
                    .entry(metric_name)
                    .and_modify(|old_value| old_value.merge(&new_value))
                    .or_insert(new_value);
            }

            for (cluster_id, mut cluster_metrics) in worker.clusters {
                let mut cluster_level = cluster_metrics
                    .cluster
                    .into_iter()
                    .filter(|(_, metric)| metric.is_mergeable())
                    .peekable();

                // Look the cluster up once, and only if there is something to merge into it.
                if cluster_level.peek().is_some() {
                    let cluster = self.clusters.entry(cluster_id.clone()).or_default();
                    for (metric_name, new_value) in cluster_level {
                        cluster
                            .cluster
                            .entry(metric_name)
                            .and_modify(|old_value| old_value.merge(&new_value))
                            .or_insert(new_value);
                    }
                }

                for backend in cluster_metrics.backends.drain(..) {
                    let backend_id = backend.backend_id;
                    let mut mergeable = backend
                        .metrics
                        .into_iter()
                        .filter(|(_, metric)| metric.is_mergeable())
                        .peekable();

                    // A backend without any mergeable metric must not create a cluster or
                    // backend entry.
                    if mergeable.peek().is_none() {
                        continue;
                    }

                    let cluster = self.clusters.entry(cluster_id.clone()).or_default();

                    // Find the backend once per backend rather than once per metric.
                    let existing = cluster
                        .backends
                        .iter()
                        .position(|present| present.backend_id == backend_id);
                    let index = match existing {
                        Some(index) => index,
                        None => {
                            cluster.backends.push(BackendMetrics {
                                backend_id,
                                metrics: BTreeMap::new(),
                            });
                            cluster.backends.len() - 1
                        }
                    };

                    let target = &mut cluster.backends[index].metrics;
                    for (metric_name, new_value) in mergeable {
                        target
                            .entry(metric_name)
                            .and_modify(|old_value| old_value.merge(&new_value))
                            .or_insert(new_value);
                    }
                }
            }
        }
    }
}
impl FilteredMetrics {
    /// Adds the value held by `right` to `self`, in place.
    ///
    /// Merging only happens when both sides hold the same kind of value:
    /// - two gauges, or two counts, are summed;
    /// - two histograms have their `count` and `sum` added, and their buckets summed position
    ///   by position. The shorter bucket list is treated as if padded with zero counts.
    ///
    /// Any other combination (different kinds, an unmergeable kind, or a missing value on
    /// either side) leaves `self` untouched.
    pub fn merge(&mut self, right: &Self) {
        match (&mut self.inner, &right.inner) {
            (Some(Inner::Gauge(a)), Some(Inner::Gauge(b))) => *a += *b,
            (Some(Inner::Count(a)), Some(Inner::Count(b))) => *a += *b,
            (Some(Inner::Histogram(a)), Some(Inner::Histogram(b))) => {
                let longest_len = a.buckets.len().max(b.buckets.len());
                let buckets = (0..longest_len)
                    .map(|i| Bucket {
                        // The limit is regenerated from the position (0, 1, 3, 7, 15, ...);
                        // the `le` values stored in the input buckets are not read.
                        le: (1 << i) - 1,
                        count: a.buckets.get(i).map_or(0, |bucket| bucket.count)
                            + b.buckets.get(i).map_or(0, |bucket| bucket.count),
                    })
                    .collect();

                a.count += b.count;
                a.sum += b.sum;
                a.buckets = buckets;
            }
            _ => {}
        }
    }

    /// Tells whether `merge` knows how to combine this kind of metric.
    ///
    /// Gauges, counts and histograms are mergeable; times, percentiles, time series and
    /// metrics without a value are not. The match is kept exhaustive so that a new `Inner`
    /// variant has to be classified here explicitly.
    fn is_mergeable(&self) -> bool {
        match &self.inner {
            Some(Inner::Gauge(_) | Inner::Count(_) | Inner::Histogram(_)) => true,
            Some(Inner::Time(_) | Inner::Percentiles(_) | Inner::TimeSerie(_)) | None => false,
        }
    }
}