use std::{borrow::Cow, collections::BTreeMap, sync::Arc, task::Poll};
use arrow::record_batch::RecordBatch;
use datafusion_common::{Result, utils::memory::get_record_batch_memory_size};
use super::{
Count, ExecutionPlanMetricsSet, Metric, MetricBuilder, MetricsSet, Time, Timestamp,
};
/// Name given to the derived ratio metric built by
/// `BaselineMetrics::output_rows_skew_metric`.
const OUTPUT_ROWS_SKEW_METRIC_NAME: &str = "output_rows_skew";
/// Bundle of the common per-partition metrics an operator records: an end
/// timestamp, elapsed compute time, and output row / byte / batch counters.
///
/// Instances are normally created with `BaselineMetrics::new`, which
/// registers each metric through a `MetricBuilder`. Dropping an instance
/// records the end timestamp if it has not been recorded yet (see the
/// `Drop` impl).
#[derive(Debug, Clone)]
pub struct BaselineMetrics {
/// End timestamp; stamped by `done` (always) or `try_done` (only if unset).
end_time: Timestamp,
/// Elapsed compute timer, exposed through `elapsed_compute()`.
elapsed_compute: Time,
/// Number of rows produced; incremented by `record_output`.
output_rows: Count,
/// Number of bytes produced; incremented when a `RecordBatch` is recorded
/// through the `RecordOutput` trait.
output_bytes: Count,
/// Number of batches produced; incremented by one for each `RecordBatch`
/// recorded through the `RecordOutput` trait.
output_batches: Count,
}
impl BaselineMetrics {
pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
start_time.record();
Self {
end_time: MetricBuilder::new(metrics)
.with_type(super::MetricType::Summary)
.end_timestamp(partition),
elapsed_compute: MetricBuilder::new(metrics)
.with_type(super::MetricType::Summary)
.elapsed_compute(partition),
output_rows: MetricBuilder::new(metrics)
.with_type(super::MetricType::Summary)
.output_rows(partition),
output_bytes: MetricBuilder::new(metrics)
.with_type(super::MetricType::Summary)
.output_bytes(partition),
output_batches: MetricBuilder::new(metrics)
.with_type(super::MetricType::Dev)
.output_batches(partition),
}
}
pub fn intermediate(&self) -> BaselineMetrics {
Self {
end_time: Default::default(),
elapsed_compute: self.elapsed_compute.clone(),
output_rows: Default::default(),
output_bytes: Default::default(),
output_batches: Default::default(),
}
}
pub fn elapsed_compute(&self) -> &Time {
&self.elapsed_compute
}
pub fn output_rows(&self) -> &Count {
&self.output_rows
}
pub fn output_batches(&self) -> &Count {
&self.output_batches
}
pub fn output_rows_skew_metric(metrics: &MetricsSet) -> Option<Arc<Metric>> {
let output_rows = metrics
.iter()
.filter_map(|metric| match (metric.partition(), metric.value()) {
(Some(partition), super::MetricValue::OutputRows(count)) => {
Some((partition, count.value() as u128))
}
_ => None,
})
.fold(
BTreeMap::<usize, u128>::new(),
|mut output_rows, (partition, rows)| {
*output_rows.entry(partition).or_default() += rows;
output_rows
},
)
.into_values()
.collect::<Vec<_>>();
if output_rows.is_empty() {
return None;
}
let ratio_metrics = super::RatioMetrics::new().with_display_raw_values(false);
if let Some(score) = output_rows_skew_score(&output_rows) {
ratio_metrics.set_part((score * 10_000.0).round() as usize);
ratio_metrics.set_total(10_000);
}
Some(Arc::new(
Metric::new(
super::MetricValue::Ratio {
name: Cow::Borrowed(OUTPUT_ROWS_SKEW_METRIC_NAME),
ratio_metrics,
},
None,
)
.with_type(super::MetricType::Dev),
))
}
pub fn done(&self) {
self.end_time.record()
}
pub fn record_output(&self, num_rows: usize) {
self.output_rows.add(num_rows);
}
pub fn try_done(&self) {
if self.end_time.value().is_none() {
self.end_time.record()
}
}
pub fn record_poll(
&self,
poll: Poll<Option<Result<RecordBatch>>>,
) -> Poll<Option<Result<RecordBatch>>> {
if let Poll::Ready(maybe_batch) = &poll {
match maybe_batch {
Some(Ok(batch)) => {
batch.record_output(self);
}
Some(Err(_)) => self.done(),
None => self.done(),
}
}
poll
}
}
impl Drop for BaselineMetrics {
    /// Makes sure an end timestamp exists once the metrics handle goes away:
    /// it is recorded here unless one was already recorded.
    fn drop(&mut self) {
        self.try_done();
    }
}
/// Computes a skew score in `[0.0, 1.0]` for per-partition output row counts.
///
/// `0.0` means rows are spread evenly across partitions and `1.0` means all
/// rows landed in a single partition. The score is based on the "effective
/// parallelism" `(Σ rows)² / Σ rows²`, which ranges from `1` (one partition
/// holds everything) to the partition count (perfectly balanced).
///
/// Returns `None` for an empty slice, or when there are multiple partitions
/// but none produced any rows. A single partition always scores `Some(0.0)`.
fn output_rows_skew_score(output_rows: &[u128]) -> Option<f64> {
    let partition_count = match output_rows.len() {
        0 => return None,
        // A lone partition cannot be skewed relative to anything.
        1 => return Some(0.0),
        n => n as f64,
    };

    let mut total_rows = 0.0_f64;
    let mut sum_of_squares = 0.0_f64;
    for &rows in output_rows {
        let rows = rows as f64;
        total_rows += rows;
        sum_of_squares += rows.powi(2);
    }

    // With no rows at all the ratio below would be 0/0, so no score exists.
    if total_rows == 0.0 || sum_of_squares == 0.0 {
        return None;
    }

    let effective_parallelism = total_rows.powi(2) / sum_of_squares;
    // Normalise from [1, partition_count] to [0, 1], then invert so that a
    // higher value means more skew.
    let balanced_score = (effective_parallelism - 1.0) / (partition_count - 1.0);
    Some((1.0 - balanced_score).clamp(0.0, 1.0))
}
/// Counters describing an operator's spill activity for one partition.
///
/// `SpillMetrics::new` registers each counter through a `MetricBuilder`.
#[derive(Debug, Clone)]
pub struct SpillMetrics {
/// Counter registered via `MetricBuilder::spill_count`.
pub spill_file_count: Count,
/// Counter registered via `MetricBuilder::spilled_bytes`.
pub spilled_bytes: Count,
/// Counter registered via `MetricBuilder::spilled_rows`.
pub spilled_rows: Count,
}
impl SpillMetrics {
    /// Registers the spill counters for `partition` in `metrics` and returns
    /// the handle used to update them.
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        // Registered in the same order as the struct's fields.
        let spill_file_count = MetricBuilder::new(metrics).spill_count(partition);
        let spilled_bytes = MetricBuilder::new(metrics).spilled_bytes(partition);
        let spilled_rows = MetricBuilder::new(metrics).spilled_rows(partition);

        Self {
            spill_file_count,
            spilled_bytes,
            spilled_rows,
        }
    }
}
/// Metrics related to batch splitting for one partition.
///
/// `SplitMetrics::new` registers the counter through a `MetricBuilder`.
#[derive(Debug, Clone)]
pub struct SplitMetrics {
/// Counter registered under the name `"batches_split"`.
pub batches_split: Count,
}
impl SplitMetrics {
    /// Registers the `"batches_split"` counter for `partition` in `metrics`,
    /// categorised as a `Rows` metric, and returns the handle used to update
    /// it.
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        let batches_split = MetricBuilder::new(metrics)
            .with_category(super::MetricCategory::Rows)
            .counter("batches_split", partition);

        Self { batches_split }
    }
}
/// Values that can report themselves as operator output to a
/// `BaselineMetrics`.
///
/// `record_output` takes `self` by value and returns `Self`, so it can be
/// chained onto an expression that produces output. The implementations in
/// this file update the metrics and hand the value back unchanged.
pub trait RecordOutput {
/// Records `self` as output in `bm` and returns `self`.
fn record_output(self, bm: &BaselineMetrics) -> Self;
}
impl RecordOutput for usize {
    /// Treats `self` as a row count: adds it to `bm`'s output rows and
    /// returns it unchanged.
    fn record_output(self, bm: &BaselineMetrics) -> Self {
        let num_rows = self;
        bm.record_output(num_rows);
        num_rows
    }
}
impl RecordOutput for RecordBatch {
    /// Records the batch's row count, its memory size in bytes and one
    /// output batch in `bm`, then returns the batch.
    fn record_output(self, bm: &BaselineMetrics) -> Self {
        // The by-reference implementation performs exactly these updates in
        // the same order; reuse it and hand the owned batch back.
        <&RecordBatch as RecordOutput>::record_output(&self, bm);
        self
    }
}
impl RecordOutput for &RecordBatch {
    /// Records the batch's row count, its memory size in bytes (as reported
    /// by `get_record_batch_memory_size`) and one output batch in `bm`, then
    /// returns the same reference.
    fn record_output(self, bm: &BaselineMetrics) -> Self {
        let batch: &RecordBatch = self;

        bm.record_output(batch.num_rows());
        bm.output_bytes.add(get_record_batch_memory_size(batch));
        bm.output_batches.add(1);

        batch
    }
}
impl RecordOutput for Option<&RecordBatch> {
    /// Records the contained batch, if any, in `bm`; `None` records nothing.
    /// Returns `self` unchanged.
    fn record_output(self, bm: &BaselineMetrics) -> Self {
        // `Option<&RecordBatch>` is `Copy`, so matching on it by value leaves
        // `self` available to return.
        if let Some(batch) = self {
            batch.record_output(bm);
        }
        self
    }
}
impl RecordOutput for Option<RecordBatch> {
    /// Records the contained batch, if any, in `bm`; `None` records nothing.
    /// Returns `self` unchanged.
    fn record_output(self, bm: &BaselineMetrics) -> Self {
        match self.as_ref() {
            Some(batch) => {
                batch.record_output(bm);
            }
            None => {}
        }
        self
    }
}
impl RecordOutput for Result<RecordBatch> {
    /// Records the batch in `bm` when `self` is `Ok`; an `Err` records
    /// nothing. Returns `self` unchanged.
    fn record_output(self, bm: &BaselineMetrics) -> Self {
        match self.as_ref() {
            Ok(batch) => {
                batch.record_output(bm);
            }
            Err(_) => {}
        }
        self
    }
}