use std::{borrow::Cow, sync::Arc};
use crate::metrics::{
MetricType,
value::{PruningMetrics, RatioMergeStrategy, RatioMetrics},
};
use super::{
Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, Timestamp,
};
/// Builder that assembles a single [`Metric`] and registers it with an
/// [`ExecutionPlanMetricsSet`].
///
/// The builder is consumed by its terminal methods (for example `build`),
/// which wrap the finished metric in an `Arc` and hand it to the metrics set.
pub struct MetricBuilder<'a> {
/// Metrics set that the finished metric is registered with.
metrics: &'a ExecutionPlanMetricsSet,
/// Partition passed to the metric on construction; `None` until
/// `with_partition` (or a method taking a `partition` argument) is called.
partition: Option<usize>,
/// Labels passed to the metric on construction, in insertion order.
labels: Vec<Label>,
/// Type applied to the metric when it is built; `new` sets this to
/// `MetricType::DEV`.
metric_type: MetricType,
}
impl<'a> MetricBuilder<'a> {
    /// Creates a builder that will register its metric with `metrics`.
    ///
    /// The builder starts with no partition, no labels and a metric type of
    /// `MetricType::DEV`.
    pub fn new(metrics: &'a ExecutionPlanMetricsSet) -> Self {
        Self {
            metrics,
            partition: None,
            labels: Vec::new(),
            metric_type: MetricType::DEV,
        }
    }

    /// Appends `label` to the labels attached to the metric being built.
    pub fn with_label(self, label: Label) -> Self {
        let mut builder = self;
        builder.labels.push(label);
        builder
    }

    /// Replaces the metric type that will be applied when the metric is built.
    pub fn with_type(self, metric_type: MetricType) -> Self {
        Self {
            metric_type,
            ..self
        }
    }

    /// Constructs a [`Label`] from `name` and `value` and appends it, exactly
    /// as [`Self::with_label`] would.
    pub fn with_new_label(
        self,
        name: impl Into<Cow<'static, str>>,
        value: impl Into<Cow<'static, str>>,
    ) -> Self {
        let label = Label::new(name.into(), value.into());
        self.with_label(label)
    }

    /// Sets the partition recorded on the metric being built.
    pub fn with_partition(self, partition: usize) -> Self {
        Self {
            partition: Some(partition),
            ..self
        }
    }

    /// Consumes the builder, creating a [`Metric`] that carries `value` plus
    /// the configured partition, labels and metric type, and registers it
    /// (wrapped in an `Arc`) with the metrics set.
    pub fn build(self, value: MetricValue) {
        let metric = Metric::new_with_labels(value, self.partition, self.labels)
            .with_type(self.metric_type);
        self.metrics.register(Arc::new(metric));
    }

    /// Shared plumbing for the partition-scoped constructors below.
    ///
    /// Sets `partition` on the builder, registers the [`MetricValue`] produced
    /// by applying `wrap` to a clone of `handle`, and returns `handle` itself
    /// to the caller.
    fn register_for_partition<T, F>(self, partition: usize, handle: T, wrap: F) -> T
    where
        T: Clone,
        F: FnOnce(T) -> MetricValue,
    {
        self.with_partition(partition).build(wrap(handle.clone()));
        handle
    }

    /// Registers a `MetricValue::OutputRows` counter for `partition` and
    /// returns the [`Count`] whose clone was stored in the metric.
    pub fn output_rows(self, partition: usize) -> Count {
        self.register_for_partition(partition, Count::new(), MetricValue::OutputRows)
    }

    /// Registers a `MetricValue::SpillCount` counter for `partition` and
    /// returns the [`Count`] whose clone was stored in the metric.
    pub fn spill_count(self, partition: usize) -> Count {
        self.register_for_partition(partition, Count::new(), MetricValue::SpillCount)
    }

    /// Registers a `MetricValue::SpilledBytes` counter for `partition` and
    /// returns the [`Count`] whose clone was stored in the metric.
    pub fn spilled_bytes(self, partition: usize) -> Count {
        self.register_for_partition(partition, Count::new(), MetricValue::SpilledBytes)
    }

    /// Registers a `MetricValue::SpilledRows` counter for `partition` and
    /// returns the [`Count`] whose clone was stored in the metric.
    pub fn spilled_rows(self, partition: usize) -> Count {
        self.register_for_partition(partition, Count::new(), MetricValue::SpilledRows)
    }

    /// Registers a `MetricValue::OutputBytes` counter for `partition` and
    /// returns the [`Count`] whose clone was stored in the metric.
    pub fn output_bytes(self, partition: usize) -> Count {
        self.register_for_partition(partition, Count::new(), MetricValue::OutputBytes)
    }

    /// Registers a `MetricValue::OutputBatches` counter for `partition` and
    /// returns the [`Count`] whose clone was stored in the metric.
    pub fn output_batches(self, partition: usize) -> Count {
        self.register_for_partition(partition, Count::new(), MetricValue::OutputBatches)
    }

    /// Registers a `MetricValue::CurrentMemoryUsage` gauge for `partition` and
    /// returns the [`Gauge`] whose clone was stored in the metric.
    pub fn mem_used(self, partition: usize) -> Gauge {
        self.register_for_partition(
            partition,
            Gauge::new(),
            MetricValue::CurrentMemoryUsage,
        )
    }

    /// Registers a counter named `counter_name` for `partition`.
    ///
    /// Equivalent to [`Self::with_partition`] followed by
    /// [`Self::global_counter`].
    pub fn counter(
        self,
        counter_name: impl Into<Cow<'static, str>>,
        partition: usize,
    ) -> Count {
        let scoped = self.with_partition(partition);
        scoped.global_counter(counter_name)
    }

    /// Registers a gauge named `gauge_name` for `partition`.
    ///
    /// Equivalent to [`Self::with_partition`] followed by
    /// [`Self::global_gauge`].
    pub fn gauge(
        self,
        gauge_name: impl Into<Cow<'static, str>>,
        partition: usize,
    ) -> Gauge {
        let scoped = self.with_partition(partition);
        scoped.global_gauge(gauge_name)
    }

    /// Registers a `MetricValue::Count` named `counter_name` without setting
    /// a partition itself (any partition already on the builder is kept) and
    /// returns the [`Count`] whose clone was stored in the metric.
    pub fn global_counter(self, counter_name: impl Into<Cow<'static, str>>) -> Count {
        let handle = Count::new();
        let value = MetricValue::Count {
            name: counter_name.into(),
            count: handle.clone(),
        };
        self.build(value);
        handle
    }

    /// Registers a `MetricValue::Gauge` named `gauge_name` without setting a
    /// partition itself (any partition already on the builder is kept) and
    /// returns the [`Gauge`] whose clone was stored in the metric.
    pub fn global_gauge(self, gauge_name: impl Into<Cow<'static, str>>) -> Gauge {
        let handle = Gauge::new();
        let value = MetricValue::Gauge {
            name: gauge_name.into(),
            gauge: handle.clone(),
        };
        self.build(value);
        handle
    }

    /// Registers a `MetricValue::ElapsedCompute` timer for `partition` and
    /// returns the [`Time`] whose clone was stored in the metric.
    pub fn elapsed_compute(self, partition: usize) -> Time {
        self.register_for_partition(partition, Time::new(), MetricValue::ElapsedCompute)
    }

    /// Registers a `MetricValue::Time` named `subset_name` for `partition`
    /// and returns the [`Time`] whose clone was stored in the metric.
    pub fn subset_time(
        self,
        subset_name: impl Into<Cow<'static, str>>,
        partition: usize,
    ) -> Time {
        self.register_for_partition(partition, Time::new(), |time| MetricValue::Time {
            name: subset_name.into(),
            time,
        })
    }

    /// Registers a `MetricValue::StartTimestamp` for `partition` and returns
    /// the [`Timestamp`] whose clone was stored in the metric.
    pub fn start_timestamp(self, partition: usize) -> Timestamp {
        self.register_for_partition(
            partition,
            Timestamp::new(),
            MetricValue::StartTimestamp,
        )
    }

    /// Registers a `MetricValue::EndTimestamp` for `partition` and returns
    /// the [`Timestamp`] whose clone was stored in the metric.
    pub fn end_timestamp(self, partition: usize) -> Timestamp {
        self.register_for_partition(
            partition,
            Timestamp::new(),
            MetricValue::EndTimestamp,
        )
    }

    /// Registers a `MetricValue::PruningMetrics` named `name` for `partition`
    /// and returns the [`PruningMetrics`] whose clone was stored in the metric.
    pub fn pruning_metrics(
        self,
        name: impl Into<Cow<'static, str>>,
        partition: usize,
    ) -> PruningMetrics {
        self.register_for_partition(partition, PruningMetrics::new(), |pruning_metrics| {
            MetricValue::PruningMetrics {
                name: name.into(),
                pruning_metrics,
            }
        })
    }

    /// Registers a ratio metric named `name` for `partition` using
    /// `RatioMergeStrategy::default()`.
    ///
    /// See [`Self::ratio_metrics_with_strategy`] to choose the strategy.
    pub fn ratio_metrics(
        self,
        name: impl Into<Cow<'static, str>>,
        partition: usize,
    ) -> RatioMetrics {
        let strategy = RatioMergeStrategy::default();
        self.ratio_metrics_with_strategy(name, partition, strategy)
    }

    /// Registers a `MetricValue::Ratio` named `name` for `partition`, built
    /// from a new [`RatioMetrics`] configured with `merge_strategy`, and
    /// returns the [`RatioMetrics`] whose clone was stored in the metric.
    pub fn ratio_metrics_with_strategy(
        self,
        name: impl Into<Cow<'static, str>>,
        partition: usize,
        merge_strategy: RatioMergeStrategy,
    ) -> RatioMetrics {
        let handle = RatioMetrics::new().with_merge_strategy(merge_strategy);
        self.register_for_partition(partition, handle, |ratio_metrics| {
            MetricValue::Ratio {
                name: name.into(),
                ratio_metrics,
            }
        })
    }
}