use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream};
use datafusion::physical_plan::ExecutionPlan;
use futures::Stream;
use pin_project::pin_project;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tracing::{Span, field};
/// Publishes the metrics of an [`ExecutionPlan`] onto a tracing [`Span`] when dropped.
///
/// In this module the recorder is shared behind an `Arc` by
/// `MetricsRecordingStream`. Its metrics are therefore written once the last
/// `Arc` clone goes away.
pub(crate) struct MetricsRecorder {
    /// Plan whose metrics are read at drop time.
    execution_plan: Arc<dyn ExecutionPlan>,
    /// Span that receives one `datafusion.metrics.<name>` field per aggregated metric.
    span: Span,
}

impl MetricsRecorder {
    /// Creates a recorder that will report `execution_plan`'s metrics to `span` on drop.
    pub fn new(execution_plan: Arc<dyn ExecutionPlan>, span: Span) -> Self {
        Self {
            span,
            execution_plan,
        }
    }
}

impl Drop for MetricsRecorder {
    /// Records each metric, merged by name, as a `datafusion.metrics.<name>` span field.
    ///
    /// Does nothing when the plan exposes no metrics (`metrics()` returns `None`).
    ///
    /// NOTE: tracing's `Span::record` silently ignores field names that were not
    /// declared when the span was created. The span must therefore pre-declare
    /// every `datafusion.metrics.*` field it wants to capture.
    fn drop(&mut self) {
        if let Some(metrics) = self.execution_plan.metrics() {
            let aggregated = metrics.aggregate_by_name();
            aggregated.iter().for_each(|metric| {
                let value = metric.value();
                let field_name = format!("datafusion.metrics.{}", value.name());
                self.span.record(field_name.as_str(), field::display(value));
            });
        }
    }
}
/// A [`RecordBatchStream`] adapter that passes every item from `inner` through
/// unchanged while keeping a shared [`MetricsRecorder`] alive.
///
/// The stream performs no recording itself. It only holds an
/// `Arc<MetricsRecorder>`, and the recorder's `Drop` impl writes the plan
/// metrics to its span. That happens once the last holder of the `Arc`,
/// typically the last such stream, is dropped.
#[pin_project]
pub(crate) struct MetricsRecordingStream {
    /// The wrapped stream; its items and schema are forwarded verbatim.
    #[pin]
    inner: SendableRecordBatchStream,
    /// Keeps the recorder alive for as long as this stream exists.
    metrics_recorder: Arc<MetricsRecorder>,
}

impl MetricsRecordingStream {
    /// Wraps `inner` so that `metrics_recorder` lives at least as long as this stream.
    pub fn new(
        inner: SendableRecordBatchStream,
        metrics_recorder: Arc<MetricsRecorder>,
    ) -> Self {
        Self {
            inner,
            metrics_recorder,
        }
    }
}

impl Stream for MetricsRecordingStream {
    type Item = datafusion::common::Result<RecordBatch>;

    /// Polls the wrapped stream and returns its result unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.project().inner.poll_next(cx)
    }

    /// Forwards the wrapped stream's size hint.
    ///
    /// Without this override the trait default `(0, None)` would be reported,
    /// hiding any bound the inner stream knows about.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}

impl RecordBatchStream for MetricsRecordingStream {
    /// Returns the wrapped stream's schema; batches pass through unmodified.
    fn schema(&self) -> SchemaRef {
        self.inner.schema()
    }
}