use crate::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics, common,
execution_plan::Boundedness,
};
use crate::{
execution_plan::EmissionType,
stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{
any::Any,
pin::Pin,
sync::{Arc, Weak},
task::{Context, Poll},
};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result, internal_err};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
use futures::Stream;
use tokio::sync::Barrier;
/// Counter shared between a [`TestStream`] and test code so the test can
/// observe how many batches the stream has produced so far.
///
/// `Clone` is shallow: all clones share the same underlying counter.
#[derive(Debug, Default, Clone)]
pub struct BatchIndex {
    // Mutex-protected counter shared across clones via `Arc`.
    inner: Arc<std::sync::Mutex<usize>>,
}
impl BatchIndex {
    /// Returns the current value of the shared counter.
    pub fn value(&self) -> usize {
        *self.inner.lock().unwrap()
    }

    /// Increments the shared counter by one.
    pub fn incr(&self) {
        *self.inner.lock().unwrap() += 1;
    }
}
/// A [`Stream`] of [`RecordBatch`]es for testing that tracks its progress
/// through a shared [`BatchIndex`].
#[derive(Debug, Default)]
pub struct TestStream {
    /// Batches to yield, in order
    data: Vec<RecordBatch>,
    /// Shared position counter; clones handed out via [`TestStream::index`]
    index: BatchIndex,
}
impl TestStream {
pub fn new(data: Vec<RecordBatch>) -> Self {
Self {
data,
..Default::default()
}
}
pub fn index(&self) -> BatchIndex {
self.index.clone()
}
}
impl Stream for TestStream {
type Item = Result<RecordBatch>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next_batch = self.index.value();
Poll::Ready(if next_batch < self.data.len() {
let next_batch = self.index.value();
self.index.incr();
Some(Ok(self.data[next_batch].clone()))
} else {
None
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.data.len(), Some(self.data.len()))
}
}
impl RecordBatchStream for TestStream {
    /// Returns the schema of the first batch.
    ///
    /// NOTE(review): panics if the stream was constructed with no batches —
    /// callers must supply at least one.
    fn schema(&self) -> SchemaRef {
        self.data[0].schema()
    }
}
/// A mock execution plan that returns a fixed set of `Result<RecordBatch>`
/// from a single partition.
#[derive(Debug)]
pub struct MockExec {
    /// Batches (or errors) to return from `execute`
    data: Vec<Result<RecordBatch>>,
    /// Schema this plan reports
    schema: SchemaRef,
    /// If `true`, send batches from a spawned task via a channel;
    /// otherwise wrap them in an inline stream adapter (see `execute`)
    use_task: bool,
    /// Cached plan properties
    cache: Arc<PlanProperties>,
}
impl MockExec {
    /// Creates a `MockExec` that yields `data` from one partition.
    ///
    /// Batches are sent from a spawned task by default; see
    /// [`Self::with_use_task`] to change that.
    pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
        let cache = Arc::new(Self::compute_properties(Arc::clone(&schema)));
        Self {
            data,
            schema,
            use_task: true,
            cache,
        }
    }

    /// Sets whether `execute` spawns a task (`true`) or builds an inline
    /// stream adapter (`false`).
    pub fn with_use_task(mut self, use_task: bool) -> Self {
        self.use_task = use_task;
        self
    }

    /// Plan properties: one unknown partition, incremental, bounded.
    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}
impl DisplayAs for MockExec {
    /// Renders this node for EXPLAIN output; tree rendering prints nothing.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        if let DisplayFormatType::TreeRender = t {
            Ok(())
        } else {
            // Default and Verbose formats both print just the node name.
            write!(f, "MockExec")
        }
    }
}
impl ExecutionPlan for MockExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// Leaf node: no children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Returns the configured batches as a stream.
    ///
    /// Only partition 0 is valid. When `use_task` is set, the batches are
    /// forwarded through a channel from a spawned task (send errors are
    /// logged, not propagated); otherwise an inline stream adapter is used.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        assert_eq!(partition, 0);
        // `Result<RecordBatch>` is not `Clone`, so re-create each entry,
        // cloning batches and (best-effort) cloning errors.
        let data: Vec<_> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();
        if self.use_task {
            let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
            let tx = builder.tx();
            builder.spawn(async move {
                for batch in data {
                    println!("Sending batch via delayed stream");
                    if let Err(e) = tx.send(batch).await {
                        // Receiver dropped; log and keep going.
                        println!("ERROR batch via delayed stream: {e}");
                    }
                }
                Ok(())
            });
            Ok(builder.build())
        } else {
            let stream = futures::stream::iter(data);
            Ok(Box::pin(RecordBatchStreamAdapter::new(
                self.schema(),
                stream,
            )))
        }
    }

    /// Computes statistics over the configured batches.
    ///
    /// Per-partition statistics are unknown; whole-plan statistics are
    /// computed from the data (propagating any stored error).
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema));
        }
        let data: Result<Vec<_>> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();
        let data = data?;
        Ok(common::compute_record_batch_statistics(
            &[data],
            &self.schema,
            None,
        ))
    }
}
/// Best-effort clone of a [`DataFusionError`] for replaying stored results.
///
/// Only the `Execution` variant is supported, because that is the only
/// variant the fixtures in this module create.
///
/// # Panics
/// Panics on any other variant.
fn clone_error(e: &DataFusionError) -> DataFusionError {
    use DataFusionError::*;
    match e {
        Execution(msg) => Execution(msg.to_string()),
        // Name the unsupported variant so a future fixture that hits this
        // path fails with actionable context instead of a bare panic.
        other => unimplemented!("clone_error: unsupported error variant: {other:?}"),
    }
}
/// An execution plan that emits one partition per entry in `data`, with
/// optional tokio [`Barrier`]s to coordinate when partitions start emitting
/// and when they are allowed to finish.
#[derive(Debug)]
pub struct BarrierExec {
    /// Batches per partition
    data: Vec<Vec<RecordBatch>>,
    /// Schema this plan reports
    schema: SchemaRef,
    /// Barrier each partition waits on before sending any batch;
    /// `None` disables the start gate
    start_data_barrier: Option<Arc<Barrier>>,
    /// Barrier each partition waits on after sending all batches, paired
    /// with a counter of partitions that reached it
    finish_barrier: Option<Arc<(Barrier, AtomicUsize)>>,
    /// Cached plan properties
    cache: Arc<PlanProperties>,
    /// Whether to print progress messages to stdout
    log: bool,
}
impl BarrierExec {
    /// Creates a `BarrierExec` with one partition per entry in `data`.
    ///
    /// The start barrier is sized `data.len() + 1`: every partition plus the
    /// test driver, which releases them via [`Self::wait`]. Logging is on by
    /// default.
    pub fn new(data: Vec<Vec<RecordBatch>>, schema: SchemaRef) -> Self {
        let barrier = Some(Arc::new(Barrier::new(data.len() + 1)));
        let cache = Self::compute_properties(Arc::clone(&schema), &data);
        Self {
            data,
            schema,
            start_data_barrier: barrier,
            cache: Arc::new(cache),
            finish_barrier: None,
            log: true,
        }
    }

    /// Enables or disables progress logging to stdout.
    pub fn with_log(mut self, log: bool) -> Self {
        self.log = log;
        self
    }

    /// Removes the start barrier so partitions emit as soon as executed.
    pub fn without_start_barrier(mut self) -> Self {
        self.start_data_barrier = None;
        self
    }

    /// Adds a finish barrier (sized `data.len() + 1`, like the start
    /// barrier) plus a counter of partitions that have reached it.
    pub fn with_finish_barrier(mut self) -> Self {
        let barrier = Arc::new((
            Barrier::new(self.data.len() + 1),
            AtomicUsize::new(0),
        ));
        self.finish_barrier = Some(barrier);
        self
    }

    /// Releases the start barrier, letting every partition begin emitting.
    ///
    /// # Panics
    /// Panics if the start barrier was removed with
    /// [`Self::without_start_barrier`].
    pub async fn wait(&self) {
        // `as_ref().expect()` already yields a reference; the previous
        // version took a needless extra `&` here.
        let barrier = self
            .start_data_barrier
            .as_ref()
            .expect("Must only be called when having a start barrier");
        if self.log {
            println!("BarrierExec::wait waiting on barrier");
        }
        barrier.wait().await;
        if self.log {
            println!("BarrierExec::wait done waiting");
        }
    }

    /// Joins the finish barrier, unblocking partitions waiting to finish.
    ///
    /// # Panics
    /// Panics if [`Self::with_finish_barrier`] was not called.
    pub async fn wait_finish(&self) {
        let (barrier, _) = self
            .finish_barrier
            .as_deref()
            .expect("Must only be called when having a finish barrier");
        if self.log {
            println!("BarrierExec::wait_finish waiting on barrier");
        }
        barrier.wait().await;
        if self.log {
            println!("BarrierExec::wait_finish done waiting");
        }
    }

    /// Returns `true` once every partition has reached the finish barrier.
    ///
    /// # Panics
    /// Panics if [`Self::with_finish_barrier`] was not called.
    pub fn is_finish_barrier_reached(&self) -> bool {
        let (_, reached_finish) = self
            .finish_barrier
            .as_deref()
            .expect("Must only be called when having finish barrier");
        reached_finish.load(Ordering::Relaxed) == self.data.len()
    }

    /// Plan properties: one unknown partition per data entry, incremental,
    /// bounded.
    fn compute_properties(
        schema: SchemaRef,
        data: &[Vec<RecordBatch>],
    ) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(data.len()),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}
impl DisplayAs for BarrierExec {
    /// Renders this node for EXPLAIN output; tree rendering prints nothing.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        if let DisplayFormatType::TreeRender = t {
            Ok(())
        } else {
            // Default and Verbose formats both print just the node name.
            write!(f, "BarrierExec")
        }
    }
}
impl ExecutionPlan for BarrierExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// Leaf node: no children (previously `unimplemented!()`; now consistent
    /// with the other leaf test plans in this module).
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Streams this partition's batches from a spawned task, gated by the
    /// optional start barrier before the first batch and the optional finish
    /// barrier after the last one.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        assert!(partition < self.data.len());
        let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
        let data = self.data[partition].clone();
        let start_barrier = self.start_data_barrier.as_ref().map(Arc::clone);
        let finish_barrier = self.finish_barrier.as_ref().map(Arc::clone);
        let log = self.log;
        let tx = builder.tx();
        builder.spawn(async move {
            if let Some(barrier) = start_barrier {
                if log {
                    println!("Partition {partition} waiting on barrier");
                }
                barrier.wait().await;
            }
            for batch in data {
                if log {
                    println!("Partition {partition} sending batch");
                }
                if let Err(e) = tx.send(Ok(batch)).await {
                    // Receiver dropped; log and keep going.
                    // (fixed duplicated word "stream stream" in the message)
                    println!("ERROR batch via barrier stream: {e}");
                }
            }
            if let Some((barrier, reached_finish)) = finish_barrier.as_deref() {
                if log {
                    println!("Partition {partition} waiting on finish barrier");
                }
                // Count this partition as having reached the finish barrier
                // *before* blocking on it, so the test can observe progress.
                reached_finish.fetch_add(1, Ordering::Relaxed);
                barrier.wait().await;
            }
            Ok(())
        });
        Ok(builder.build())
    }

    /// Per-partition statistics are unknown; whole-plan statistics are
    /// computed from all partitions' batches.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema));
        }
        Ok(common::compute_record_batch_statistics(
            &self.data,
            &self.schema,
            None,
        ))
    }
}
/// An execution plan whose `execute` always returns an internal error, for
/// testing error propagation.
#[derive(Debug)]
pub struct ErrorExec {
    /// Cached plan properties
    cache: Arc<PlanProperties>,
}
impl Default for ErrorExec {
    /// Delegates to [`ErrorExec::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl ErrorExec {
    /// Creates an `ErrorExec` with a one-column placeholder schema
    /// (`dummy: Int64`, nullable).
    pub fn new() -> Self {
        let dummy_field = Field::new("dummy", DataType::Int64, true);
        let schema = Arc::new(Schema::new(vec![dummy_field]));
        Self {
            cache: Arc::new(Self::compute_properties(schema)),
        }
    }

    /// Plan properties: one unknown partition, incremental, bounded.
    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}
impl DisplayAs for ErrorExec {
    /// Renders this node for EXPLAIN output; tree rendering prints nothing.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        if let DisplayFormatType::TreeRender = t {
            Ok(())
        } else {
            // Default and Verbose formats both print just the node name.
            write!(f, "ErrorExec")
        }
    }
}
impl ExecutionPlan for ErrorExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// Leaf node: no children (previously `unimplemented!()`; now consistent
    /// with the other leaf test plans in this module).
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Always returns an internal error naming the requested partition.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        internal_err!("ErrorExec, unsurprisingly, errored in partition {partition}")
    }
}
/// An execution plan that reports pre-computed [`Statistics`]; `execute` is
/// unimplemented because this plan only serves statistics tests.
#[derive(Debug, Clone)]
pub struct StatisticsExec {
    /// Statistics to report for the whole plan
    stats: Statistics,
    /// Schema this plan reports
    schema: Arc<Schema>,
    /// Cached plan properties
    cache: Arc<PlanProperties>,
}
impl StatisticsExec {
    /// Creates a `StatisticsExec` that reports `stats` for `schema`.
    ///
    /// # Panics
    /// Panics if the number of column statistics does not match the number
    /// of schema fields.
    pub fn new(stats: Statistics, schema: Schema) -> Self {
        assert_eq!(
            stats.column_statistics.len(),
            schema.fields().len(),
            "if defined, the column statistics vector length should be the number of fields"
        );
        // Wrap the schema in an Arc once and share it; the previous version
        // deep-cloned the entire Schema just to build the properties.
        let schema = Arc::new(schema);
        let cache = Self::compute_properties(Arc::clone(&schema));
        Self {
            stats,
            schema,
            cache: Arc::new(cache),
        }
    }

    /// Plan properties: two unknown partitions, incremental, bounded.
    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(2),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}
impl DisplayAs for StatisticsExec {
    /// Renders this node (with column count and row-count statistic) for
    /// EXPLAIN output; tree rendering prints nothing.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        if let DisplayFormatType::TreeRender = t {
            Ok(())
        } else {
            let col_count = self.schema.fields().len();
            write!(
                f,
                "StatisticsExec: col_count={}, row_count={:?}",
                col_count, self.stats.num_rows,
            )
        }
    }
}
impl ExecutionPlan for StatisticsExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// Leaf node: no children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    /// No children to replace; returns self unchanged.
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Intentionally unimplemented — this plan only serves statistics tests.
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        unimplemented!("This plan only serves for testing statistics")
    }

    /// Per-partition statistics are unknown; whole-plan statistics are the
    /// pre-computed values supplied at construction.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        Ok(if partition.is_some() {
            Statistics::new_unknown(&self.schema)
        } else {
            self.stats.clone()
        })
    }
}
/// An execution plan whose streams never produce data and never complete,
/// for testing cancellation and resource cleanup.
#[derive(Debug)]
pub struct BlockingExec {
    /// Schema this plan reports
    schema: SchemaRef,
    /// Reference token cloned into every stream; tests use the matching
    /// [`Weak`] (see [`BlockingExec::refs`]) to detect when streams drop
    refs: Arc<()>,
    /// Cached plan properties
    cache: Arc<PlanProperties>,
}
impl BlockingExec {
    /// Creates a `BlockingExec` with `n_partitions` never-completing
    /// partitions sharing the given schema.
    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
        let cache = Arc::new(Self::compute_properties(Arc::clone(&schema), n_partitions));
        Self {
            schema,
            refs: Arc::new(()),
            cache,
        }
    }

    /// Returns a weak handle to the internal reference token; its strong
    /// count reaches zero once this plan and all of its streams are dropped.
    pub fn refs(&self) -> Weak<()> {
        Arc::downgrade(&self.refs)
    }

    /// Plan properties: `n_partitions` unknown partitions, incremental,
    /// bounded.
    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(n_partitions),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}
impl DisplayAs for BlockingExec {
    /// Renders this node for EXPLAIN output; tree rendering prints nothing.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        if let DisplayFormatType::TreeRender = t {
            Ok(())
        } else {
            // Default and Verbose formats both print just the node name.
            write!(f, "BlockingExec")
        }
    }
}
impl ExecutionPlan for BlockingExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// Leaf node: no children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        internal_err!("Children cannot be replaced in {self:?}")
    }

    /// Returns a stream that never yields and never completes; the cloned
    /// `refs` token lets tests observe when the stream is dropped.
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(BlockingStream {
            schema: Arc::clone(&self.schema),
            _refs: Arc::clone(&self.refs),
        }))
    }
}
/// A [`RecordBatchStream`] that is always pending and never completes.
#[derive(Debug)]
pub struct BlockingStream {
    /// Schema reported by this stream
    schema: SchemaRef,
    /// Keeps the parent plan's reference token alive until this stream drops
    _refs: Arc<()>,
}
impl Stream for BlockingStream {
    type Item = Result<RecordBatch>;

    /// Always returns `Poll::Pending` without registering the waker, so this
    /// stream is never woken and never completes — intentional, to test
    /// cancellation paths.
    fn poll_next(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        Poll::Pending
    }
}
impl RecordBatchStream for BlockingStream {
    /// Returns the schema supplied at construction.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
/// Asserts that the strong count behind `refs` drops to zero within 10
/// seconds, polling every 10ms.
///
/// # Panics
/// Panics with a descriptive message if the count has not reached zero
/// before the timeout (the previous version used a bare `unwrap()` that
/// surfaced only an opaque `Elapsed` error).
pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
    tokio::time::timeout(std::time::Duration::from_secs(10), async {
        loop {
            // `dbg!` intentionally logs the current count so a hung test
            // shows its progress on stderr.
            if dbg!(Weak::strong_count(&refs)) == 0 {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("strong count did not converge to zero within 10 seconds");
}
/// An execution plan whose partitions panic after emitting a configured
/// number of empty batches, for testing panic propagation.
#[derive(Debug)]
pub struct PanicExec {
    /// Schema this plan reports
    schema: SchemaRef,
    /// Per-partition count of batches to emit before panicking
    /// (0 = panic on first poll)
    batches_until_panics: Vec<usize>,
    /// Cached plan properties
    cache: Arc<PlanProperties>,
}
impl PanicExec {
    /// Creates a `PanicExec` with `n_partitions` partitions, each of which
    /// panics on its first poll (count 0).
    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
        let batches_until_panics = vec![0; n_partitions];
        let cache = Arc::new(Self::compute_properties(
            Arc::clone(&schema),
            &batches_until_panics,
        ));
        Self {
            schema,
            batches_until_panics,
            cache,
        }
    }

    /// Configures `partition` to emit `count` empty batches before panicking.
    pub fn with_partition_panic(mut self, partition: usize, count: usize) -> Self {
        self.batches_until_panics[partition] = count;
        self
    }

    /// Plan properties: one unknown partition per configured count,
    /// incremental, bounded.
    fn compute_properties(
        schema: SchemaRef,
        batches_until_panics: &[usize],
    ) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(batches_until_panics.len()),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}
impl DisplayAs for PanicExec {
    /// Renders this node for EXPLAIN output; tree rendering prints nothing.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        if let DisplayFormatType::TreeRender = t {
            Ok(())
        } else {
            // Default and Verbose formats both print just the node name.
            write!(f, "PanicExec")
        }
    }
}
impl ExecutionPlan for PanicExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// Leaf node: no children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        internal_err!("Children cannot be replaced in {:?}", self)
    }

    /// Returns a [`PanicStream`] that emits the configured number of empty
    /// batches for this partition before panicking.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(PanicStream {
            partition,
            batches_until_panic: self.batches_until_panics[partition],
            schema: Arc::clone(&self.schema),
            ready: false,
        }))
    }
}
/// A [`RecordBatchStream`] that yields a fixed number of empty batches and
/// then panics.
#[derive(Debug)]
struct PanicStream {
    /// Partition number, included in the panic message
    partition: usize,
    /// Remaining batches to emit before panicking
    batches_until_panic: usize,
    /// Schema reported by this stream
    schema: SchemaRef,
    /// Alternates each poll: a `Pending` self-wake, then a batch
    ready: bool,
}
impl Stream for PanicStream {
    type Item = Result<RecordBatch>;

    /// Emits one empty batch every *second* poll (the first poll self-wakes
    /// and returns `Pending`), then panics once the configured batch budget
    /// is exhausted.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.batches_until_panic > 0 {
            if self.ready {
                // Second poll: consume one unit of the budget, emit a batch.
                self.batches_until_panic -= 1;
                self.ready = false;
                let batch = RecordBatch::new_empty(Arc::clone(&self.schema));
                return Poll::Ready(Some(Ok(batch)));
            } else {
                // First poll: wake ourselves immediately so the executor
                // polls again, exercising the pending/ready transition.
                self.ready = true;
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        }
        panic!("PanickingStream did panic: {}", self.partition)
    }
}
impl RecordBatchStream for PanicStream {
    /// Returns the schema supplied at construction.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}