use crate::utils::DefaultDisplay;
use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream};
use datafusion::physical_plan::ExecutionPlan;
use futures::Stream;
use pin_project::pin_project;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tracing::{Span, field};
/// Writes a rendering of an [`ExecutionPlan`] node into a tracing [`Span`]
/// when it is dropped.
///
/// The value is stored under the `datafusion.node` field (see the `Drop`
/// impl). It is typically shared behind an `Arc`, so the write happens once
/// the last holder lets go.
pub(crate) struct NodeRecorder {
    // NOTE: field order matters for drop order. `execution_plan` is released
    // before `span` once `Drop::drop` has run.
    /// Plan node whose display form is recorded on drop.
    execution_plan: Arc<dyn ExecutionPlan>,
    /// Span that receives the `datafusion.node` field.
    span: Span,
}

impl NodeRecorder {
    /// Pairs `execution_plan` with the `span` it will be recorded into.
    ///
    /// Nothing is recorded at construction time; the span is only written
    /// when the returned value is dropped.
    pub fn new(execution_plan: Arc<dyn ExecutionPlan>, span: Span) -> Self {
        // Both initializers are plain moves of locals, so their order here
        // has no observable effect.
        Self {
            span,
            execution_plan,
        }
    }
}
impl Drop for NodeRecorder {
    /// Records the plan node's [`DefaultDisplay`] rendering into the span's
    /// `datafusion.node` field.
    ///
    /// `field::display` defers formatting until a subscriber consumes the value.
    /// Per tracing's `Span::record` semantics, the write has no effect unless
    /// the span declared a `datafusion.node` field when it was created.
    fn drop(&mut self) {
        // NOTE(review): recording at drop time (rather than at construction)
        // presumably lets the rendering reflect state gathered during
        // execution, e.g. metrics. Confirm against `DefaultDisplay`.
        let rendered = DefaultDisplay(self.execution_plan.as_ref());
        self.span
            .record("datafusion.node", field::display(rendered));
    }
}
/// A [`SendableRecordBatchStream`] wrapper that keeps a shared
/// [`NodeRecorder`] alive for as long as the stream exists.
///
/// The wrapper yields exactly what `inner` yields. Its only extra effect is
/// that dropping it releases one `Arc` reference to the recorder. When that is
/// the last reference, the recorder writes its span field.
#[pin_project]
pub(crate) struct NodeRecordingStream {
    /// The wrapped stream; pinned so `poll_next` can be forwarded to it.
    #[pin]
    inner: SendableRecordBatchStream,
    /// Held purely for its `Drop` side effect; never read.
    _recorder: Arc<NodeRecorder>,
}

impl NodeRecordingStream {
    /// Wraps `inner`, taking one shared reference to `recorder`.
    pub fn new(inner: SendableRecordBatchStream, recorder: Arc<NodeRecorder>) -> Self {
        Self {
            _recorder: recorder,
            inner,
        }
    }
}
/// Transparent pass-through to the wrapped stream.
///
/// Every item, and the end-of-stream signal, comes unchanged from `inner`.
impl Stream for NodeRecordingStream {
    type Item = datafusion::common::Result<RecordBatch>;

    /// Polls the wrapped stream through its pin projection.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        this.inner.poll_next(cx)
    }

    /// Reports the wrapped stream's size hint.
    ///
    /// Without this override the trait default `(0, None)` would hide any bound
    /// the inner stream knows about. The wrapper never adds or removes items,
    /// so forwarding the hint is exact.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}
impl RecordBatchStream for NodeRecordingStream {
    /// Returns the schema of the wrapped stream; this wrapper does not alter
    /// batches, so the schemas are identical.
    fn schema(&self) -> SchemaRef {
        let wrapped = &self.inner;
        wrapped.schema()
    }
}