use std::task::Poll;
use arrow::record_batch::RecordBatch;
use super::{Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Time, Timestamp};
use crate::error::Result;
#[derive(Debug)]
pub struct BaselineMetrics {
end_time: Timestamp,
elapsed_compute: Time,
spill_count: Count,
spilled_bytes: Count,
mem_used: Gauge,
output_rows: Count,
}
impl BaselineMetrics {
pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
let start_time = MetricBuilder::new(metrics).start_timestamp(partition);
start_time.record();
Self {
end_time: MetricBuilder::new(metrics).end_timestamp(partition),
elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition),
spill_count: MetricBuilder::new(metrics).spill_count(partition),
spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
mem_used: MetricBuilder::new(metrics).mem_used(partition),
output_rows: MetricBuilder::new(metrics).output_rows(partition),
}
}
pub fn elapsed_compute(&self) -> &Time {
&self.elapsed_compute
}
pub fn spill_count(&self) -> &Count {
&self.spill_count
}
pub fn spilled_bytes(&self) -> &Count {
&self.spilled_bytes
}
pub fn mem_used(&self) -> &Gauge {
&self.mem_used
}
pub fn record_spill(&self, spilled_bytes: usize) {
self.spill_count.add(1);
self.spilled_bytes.add(spilled_bytes);
}
pub fn output_rows(&self) -> &Count {
&self.output_rows
}
pub fn done(&self) {
self.end_time.record()
}
pub fn record_output(&self, num_rows: usize) {
self.output_rows.add(num_rows);
}
pub fn try_done(&self) {
if self.end_time.value().is_none() {
self.end_time.record()
}
}
pub fn record_poll(
&self,
poll: Poll<Option<Result<RecordBatch>>>,
) -> Poll<Option<Result<RecordBatch>>> {
if let Poll::Ready(maybe_batch) = &poll {
match maybe_batch {
Some(Ok(batch)) => {
batch.record_output(self);
}
Some(Err(_)) => self.done(),
None => self.done(),
}
}
poll
}
}
impl Drop for BaselineMetrics {
fn drop(&mut self) {
self.try_done()
}
}
pub trait RecordOutput {
fn record_output(self, bm: &BaselineMetrics) -> Self;
}
impl RecordOutput for usize {
fn record_output(self, bm: &BaselineMetrics) -> Self {
bm.record_output(self);
self
}
}
impl RecordOutput for RecordBatch {
fn record_output(self, bm: &BaselineMetrics) -> Self {
bm.record_output(self.num_rows());
self
}
}
impl RecordOutput for &RecordBatch {
fn record_output(self, bm: &BaselineMetrics) -> Self {
bm.record_output(self.num_rows());
self
}
}
impl RecordOutput for Option<&RecordBatch> {
fn record_output(self, bm: &BaselineMetrics) -> Self {
if let Some(record_batch) = &self {
record_batch.record_output(bm);
}
self
}
}
impl RecordOutput for Option<RecordBatch> {
fn record_output(self, bm: &BaselineMetrics) -> Self {
if let Some(record_batch) = &self {
record_batch.record_output(bm);
}
self
}
}
impl RecordOutput for Result<RecordBatch> {
fn record_output(self, bm: &BaselineMetrics) -> Self {
if let Ok(record_batch) = &self {
record_batch.record_output(bm);
}
self
}
}