use crate::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, Time,
};
use std::sync::Arc;
use std::task::Poll;
use crate::error::Result;
use crate::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use arrow::record_batch::RecordBatch;
#[derive(Debug)]
pub struct MemTrackingMetrics {
reservation: MemoryReservation,
metrics: BaselineMetrics,
}
impl MemTrackingMetrics {
pub fn new(
metrics: &ExecutionPlanMetricsSet,
pool: &Arc<dyn MemoryPool>,
partition: usize,
) -> Self {
let reservation = MemoryConsumer::new(format!("MemTrackingMetrics[{partition}]"))
.register(pool);
Self {
reservation,
metrics: BaselineMetrics::new(metrics, partition),
}
}
pub fn elapsed_compute(&self) -> &Time {
self.metrics.elapsed_compute()
}
pub fn mem_used(&self) -> usize {
self.metrics.mem_used().value()
}
pub fn init_mem_used(&mut self, size: usize) {
self.metrics.mem_used().set(size);
self.reservation.resize(size)
}
pub fn output_rows(&self) -> &Count {
self.metrics.output_rows()
}
pub fn done(&self) {
self.metrics.done()
}
pub fn record_output(&self, num_rows: usize) {
self.metrics.record_output(num_rows)
}
pub fn record_poll(
&self,
poll: Poll<Option<Result<RecordBatch>>>,
) -> Poll<Option<Result<RecordBatch>>> {
self.metrics.record_poll(poll)
}
}