use datafusion::physical_plan::metrics::MetricsSet;
use std::sync::Arc;
use datafusion::error::Result;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
use delegate::delegate;
use std::fmt::{Debug, Formatter};
pub(crate) struct MetricsWrapperExec {
inner: Arc<dyn ExecutionPlan>,
metrics: MetricsSet,
}
impl MetricsWrapperExec {
pub(crate) fn new(inner: Arc<dyn ExecutionPlan>, metrics: MetricsSet) -> Self {
Self { inner, metrics }
}
#[cfg(test)]
pub(crate) fn inner(&self) -> &Arc<dyn ExecutionPlan> {
&self.inner
}
pub(crate) fn inner_arc(&self) -> Arc<dyn ExecutionPlan> {
Arc::clone(&self.inner)
}
}
impl DisplayAs for MetricsWrapperExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
self.inner.fmt_as(t, f)
}
}
impl Debug for MetricsWrapperExec {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "MetricsWrapperExec ({:?})", self.inner)
}
}
impl ExecutionPlan for MetricsWrapperExec {
delegate! {
to self.inner {
fn name(&self) -> &str;
fn properties(&self) -> &Arc<PlanProperties>;
}
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
self.inner.children()
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(MetricsWrapperExec {
inner: Arc::clone(&self.inner).with_new_children(children.clone())?,
metrics: self.metrics.clone(),
}))
}
fn execute(
&self,
_partition: usize,
_contex: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unimplemented!("MetricsWrapperExec does not implement execute")
}
fn metrics(&self) -> Option<MetricsSet> {
match self.inner.metrics() {
None => Some(self.metrics.clone()),
Some(mut all_metrics) => {
for wrapped in self.metrics.iter() {
all_metrics.push(Arc::clone(wrapped));
}
Some(all_metrics)
}
}
}
fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
Some(self.inner.as_ref())
}
}