use crate::execution::runtime_env::RuntimeEnv;
use crate::execution::MemoryConsumerId;
use crate::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, Time,
};
use std::sync::Arc;
use std::task::Poll;
use arrow::{error::ArrowError, record_batch::RecordBatch};
/// Metrics wrapper that pairs a [`BaselineMetrics`] with an optional
/// [`RuntimeEnv`] handle.
///
/// The `impl` blocks in this file delegate metric recording to the inner
/// `BaselineMetrics`, and additionally call into the runtime (when one is
/// attached) from `init_mem_used` and from `Drop`.
#[derive(Debug)]
pub struct MemTrackingMetrics {
/// Consumer id created from the partition index; passed to
/// `RuntimeEnv::drop_consumer` when this value is dropped.
id: MemoryConsumerId,
/// Runtime used for memory bookkeeping calls; `None` when constructed via
/// `new`, in which case those calls are skipped.
runtime: Option<Arc<RuntimeEnv>>,
/// Inner metrics that every recording/reading method delegates to.
metrics: BaselineMetrics,
}
/// Thin delegation layer over the inner [`BaselineMetrics`], plus the calls
/// into the attached runtime (if any) that accompany memory reporting.
impl MemTrackingMetrics {
    /// Builds metrics for `partition` registered against `metrics`, with no
    /// runtime attached.
    ///
    /// Without a runtime, `init_mem_used` only updates the metric and the
    /// runtime call in `Drop` is skipped.
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        // The consumer id is created before the baseline metrics, in that order.
        Self {
            id: MemoryConsumerId::new(partition),
            runtime: None,
            metrics: BaselineMetrics::new(metrics, partition),
        }
    }

    /// Builds metrics exactly like [`MemTrackingMetrics::new`], then attaches
    /// `runtime` so memory usage is also reported to it.
    pub fn new_with_rt(
        metrics: &ExecutionPlanMetricsSet,
        partition: usize,
        runtime: Arc<RuntimeEnv>,
    ) -> Self {
        let mut tracked = Self::new(metrics, partition);
        tracked.runtime = Some(runtime);
        tracked
    }

    /// Returns the inner metrics' `elapsed_compute` timer.
    pub fn elapsed_compute(&self) -> &Time {
        self.metrics.elapsed_compute()
    }

    /// Returns the current value of the inner `mem_used` metric.
    pub fn mem_used(&self) -> usize {
        let usage = self.metrics.mem_used();
        usage.value()
    }

    /// Sets the inner `mem_used` metric to `size` and, when a runtime is
    /// attached, calls `memory_manager.grow_tracker_usage(size)` on it.
    ///
    /// NOTE(review): the metric is overwritten while the tracker call is
    /// repeated on every invocation, so this is presumably meant to be called
    /// once per instance — confirm against callers.
    pub fn init_mem_used(&self, size: usize) {
        self.metrics.mem_used().set(size);

        if let Some(rt) = &self.runtime {
            rt.memory_manager.grow_tracker_usage(size);
        }
    }

    /// Returns the inner metrics' `output_rows` counter.
    pub fn output_rows(&self) -> &Count {
        self.metrics.output_rows()
    }

    /// Forwards to the inner metrics' `done()`.
    ///
    /// `Drop` only calls `try_done()`, so call this explicitly if completion
    /// must be recorded before the value is dropped.
    pub fn done(&self) {
        self.metrics.done()
    }

    /// Forwards `num_rows` to the inner metrics' `record_output`.
    pub fn record_output(&self, num_rows: usize) {
        self.metrics.record_output(num_rows)
    }

    /// Hands `poll` to the inner metrics' `record_poll` and returns whatever
    /// that call returns.
    pub fn record_poll(
        &self,
        poll: Poll<Option<Result<RecordBatch, ArrowError>>>,
    ) -> Poll<Option<Result<RecordBatch, ArrowError>>> {
        self.metrics.record_poll(poll)
    }
}
/// On drop, calls `try_done()` on the inner metrics and, when a runtime is
/// attached and the recorded usage is non-zero, passes this consumer's id and
/// usage to `RuntimeEnv::drop_consumer`.
impl Drop for MemTrackingMetrics {
    fn drop(&mut self) {
        self.metrics.try_done();

        // Nothing was recorded, so there is nothing to report to the runtime.
        let used = self.mem_used();
        if used == 0 {
            return;
        }

        if let Some(rt) = &self.runtime {
            rt.drop_consumer(&self.id, used);
        }
    }
}