use std::task::Poll;
use arrow::record_batch::RecordBatch;
use datafusion_common::{Result, utils::memory::get_record_batch_memory_size};
use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
/// Per-partition baseline metrics for an execution operator: the time its
/// output ended, accumulated compute time, and the rows, bytes and batches
/// it produced.
///
/// The `Drop` impl for this type records the end timestamp if it has not
/// been recorded yet.
#[derive(Debug, Clone)]
pub struct BaselineMetrics {
    /// End timestamp; set by `done` (unconditionally), `try_done` (only if
    /// still unset), and on drop (only if still unset).
    end_time: Timestamp,
    /// Accumulated compute time (registered as the `elapsed_compute` metric).
    elapsed_compute: Time,
    /// Rows reported through `record_output` / the `RecordOutput` impls.
    output_rows: Count,
    /// Sum of `get_record_batch_memory_size` over every recorded batch.
    output_bytes: Count,
    /// Number of record batches recorded (registered with `MetricType::DEV`).
    output_batches: Count,
}
impl BaselineMetrics {
pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
start_time.record();
Self {
end_time: MetricBuilder::new(metrics)
.with_type(super::MetricType::SUMMARY)
.end_timestamp(partition),
elapsed_compute: MetricBuilder::new(metrics)
.with_type(super::MetricType::SUMMARY)
.elapsed_compute(partition),
output_rows: MetricBuilder::new(metrics)
.with_type(super::MetricType::SUMMARY)
.output_rows(partition),
output_bytes: MetricBuilder::new(metrics)
.with_type(super::MetricType::SUMMARY)
.output_bytes(partition),
output_batches: MetricBuilder::new(metrics)
.with_type(super::MetricType::DEV)
.output_batches(partition),
}
}
pub fn intermediate(&self) -> BaselineMetrics {
Self {
end_time: Default::default(),
elapsed_compute: self.elapsed_compute.clone(),
output_rows: Default::default(),
output_bytes: Default::default(),
output_batches: Default::default(),
}
}
pub fn elapsed_compute(&self) -> &Time {
&self.elapsed_compute
}
pub fn output_rows(&self) -> &Count {
&self.output_rows
}
pub fn output_batches(&self) -> &Count {
&self.output_batches
}
pub fn done(&self) {
self.end_time.record()
}
pub fn record_output(&self, num_rows: usize) {
self.output_rows.add(num_rows);
}
pub fn try_done(&self) {
if self.end_time.value().is_none() {
self.end_time.record()
}
}
pub fn record_poll(
&self,
poll: Poll<Option<Result<RecordBatch>>>,
) -> Poll<Option<Result<RecordBatch>>> {
if let Poll::Ready(maybe_batch) = &poll {
match maybe_batch {
Some(Ok(batch)) => {
batch.record_output(self);
}
Some(Err(_)) => self.done(),
None => self.done(),
}
}
poll
}
}
impl Drop for BaselineMetrics {
fn drop(&mut self) {
self.try_done()
}
}
/// Counters describing spill activity for an operator partition.
#[derive(Debug, Clone)]
pub struct SpillMetrics {
    /// Spill file count; registered via `MetricBuilder::spill_count` when
    /// built with `SpillMetrics::new`.
    pub spill_file_count: Count,
    /// Spilled bytes; registered via `MetricBuilder::spilled_bytes` when
    /// built with `SpillMetrics::new`.
    pub spilled_bytes: Count,
    /// Spilled rows; registered via `MetricBuilder::spilled_rows` when
    /// built with `SpillMetrics::new`.
    pub spilled_rows: Count,
}
impl SpillMetrics {
    /// Registers the spill file count, spilled bytes and spilled rows
    /// counters (in that order) for `partition` in `metrics`.
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        let builder = move || MetricBuilder::new(metrics);

        let spill_file_count = builder().spill_count(partition);
        let spilled_bytes = builder().spilled_bytes(partition);
        let spilled_rows = builder().spilled_rows(partition);

        Self {
            spill_file_count,
            spilled_bytes,
            spilled_rows,
        }
    }
}
/// Holds the `batches_split` counter for an operator partition.
#[derive(Debug, Clone)]
pub struct SplitMetrics {
    /// Counter registered under the name `batches_split` by
    /// `SplitMetrics::new`. Presumably counts batches that were divided into
    /// smaller ones — confirm against the operators that increment it.
    pub batches_split: Count,
}
impl SplitMetrics {
    /// Registers a custom `batches_split` counter for `partition` in `metrics`.
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        let batches_split = MetricBuilder::new(metrics).counter("batches_split", partition);
        Self { batches_split }
    }
}
/// Values that can be recorded into a [`BaselineMetrics`] and passed through
/// unchanged.
///
/// The record-batch implementations add the row count, the batch's memory
/// size and one batch; the `usize` implementation adds only a row count.
pub trait RecordOutput {
    /// Records `self` into `bm` and returns `self`, so the call can be placed
    /// inline in an expression.
    fn record_output(self, bm: &BaselineMetrics) -> Self;
}
impl RecordOutput for usize {
fn record_output(self, bm: &BaselineMetrics) -> Self {
bm.record_output(self);
self
}
}
/// Records an owned batch (rows, memory size, one batch) and hands it back.
impl RecordOutput for RecordBatch {
    fn record_output(self, bm: &BaselineMetrics) -> Self {
        // Identical accounting to the borrowed form; delegating keeps the two
        // implementations from drifting apart.
        <&RecordBatch as RecordOutput>::record_output(&self, bm);
        self
    }
}
/// Records a borrowed batch: its row count, its size as reported by
/// `get_record_batch_memory_size`, and one output batch.
impl RecordOutput for &RecordBatch {
    fn record_output(self, bm: &BaselineMetrics) -> Self {
        let rows = self.num_rows();
        let bytes = get_record_batch_memory_size(self);

        bm.record_output(rows);
        bm.output_bytes.add(bytes);
        bm.output_batches.add(1);

        self
    }
}
/// Records the batch when present; `None` leaves the metrics untouched.
impl RecordOutput for Option<&RecordBatch> {
    fn record_output(self, bm: &BaselineMetrics) -> Self {
        // `Option<&T>` is `Copy`, so matching on `self` does not consume it.
        if let Some(batch) = self {
            batch.record_output(bm);
        }
        self
    }
}
/// Records the owned batch when present; `None` leaves the metrics untouched.
impl RecordOutput for Option<RecordBatch> {
    fn record_output(self, bm: &BaselineMetrics) -> Self {
        if let Some(batch) = self.as_ref() {
            batch.record_output(bm);
        }
        self
    }
}
/// Records the batch on `Ok`; an `Err` is passed through without touching
/// the metrics.
impl RecordOutput for Result<RecordBatch> {
    fn record_output(self, bm: &BaselineMetrics) -> Self {
        if let Ok(batch) = self.as_ref() {
            batch.record_output(bm);
        }
        self
    }
}