1use crate::{
21 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
22 RecordBatchStream, SendableRecordBatchStream, Statistics, common,
23 execution_plan::Boundedness,
24};
25use crate::{
26 execution_plan::EmissionType,
27 stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
28};
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::{
31 any::Any,
32 pin::Pin,
33 sync::{Arc, Weak},
34 task::{Context, Poll},
35};
36
37use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
38use arrow::record_batch::RecordBatch;
39use datafusion_common::{DataFusionError, Result, internal_err};
40use datafusion_execution::TaskContext;
41use datafusion_physical_expr::EquivalenceProperties;
42
43use futures::Stream;
44use tokio::sync::Barrier;
45
/// Cloneable, thread-safe index into the batches yielded so far by a
/// [`TestStream`]; tests hold a clone to observe stream progress.
#[derive(Debug, Default, Clone)]
pub struct BatchIndex {
    // Shared counter; `Clone` hands out another handle to the same value.
    inner: Arc<std::sync::Mutex<usize>>,
}
51
52impl BatchIndex {
53 pub fn value(&self) -> usize {
55 let inner = self.inner.lock().unwrap();
56 *inner
57 }
58
59 pub fn incr(&self) {
61 let mut inner = self.inner.lock().unwrap();
62 *inner += 1;
63 }
64}
65
/// A stream of [`RecordBatch`]es for testing; progress can be observed
/// through the shared [`BatchIndex`].
#[derive(Debug, Default)]
pub struct TestStream {
    // Batches to be returned, in order
    data: Vec<RecordBatch>,
    // Position of the next batch to yield; shared so tests can observe progress
    index: BatchIndex,
}
74
75impl TestStream {
76 pub fn new(data: Vec<RecordBatch>) -> Self {
79 Self {
80 data,
81 ..Default::default()
82 }
83 }
84
85 pub fn index(&self) -> BatchIndex {
87 self.index.clone()
88 }
89}
90
91impl Stream for TestStream {
92 type Item = Result<RecordBatch>;
93
94 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
95 let next_batch = self.index.value();
96
97 Poll::Ready(if next_batch < self.data.len() {
98 let next_batch = self.index.value();
99 self.index.incr();
100 Some(Ok(self.data[next_batch].clone()))
101 } else {
102 None
103 })
104 }
105
106 fn size_hint(&self) -> (usize, Option<usize>) {
107 (self.data.len(), Some(self.data.len()))
108 }
109}
110
impl RecordBatchStream for TestStream {
    /// Returns the schema of the first batch.
    ///
    /// NOTE(review): indexes `data[0]`, so this panics if the stream was
    /// constructed with no batches — callers must supply at least one.
    fn schema(&self) -> SchemaRef {
        self.data[0].schema()
    }
}
117
/// A mock execution plan with a single partition that returns the provided
/// batches / errors in order.
#[derive(Debug)]
pub struct MockExec {
    // The results (batches or errors) to send back
    data: Vec<Result<RecordBatch>>,
    schema: SchemaRef,
    // If true (the default), batches are sent from a spawned task so the
    // receiver must yield before data becomes available
    use_task: bool,
    // Cached plan properties
    cache: PlanProperties,
}
130
131impl MockExec {
132 pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
140 let cache = Self::compute_properties(Arc::clone(&schema));
141 Self {
142 data,
143 schema,
144 use_task: true,
145 cache,
146 }
147 }
148
149 pub fn with_use_task(mut self, use_task: bool) -> Self {
153 self.use_task = use_task;
154 self
155 }
156
157 fn compute_properties(schema: SchemaRef) -> PlanProperties {
159 PlanProperties::new(
160 EquivalenceProperties::new(schema),
161 Partitioning::UnknownPartitioning(1),
162 EmissionType::Incremental,
163 Boundedness::Bounded,
164 )
165 }
166}
167
168impl DisplayAs for MockExec {
169 fn fmt_as(
170 &self,
171 t: DisplayFormatType,
172 f: &mut std::fmt::Formatter,
173 ) -> std::fmt::Result {
174 match t {
175 DisplayFormatType::Default | DisplayFormatType::Verbose => {
176 write!(f, "MockExec")
177 }
178 DisplayFormatType::TreeRender => {
179 write!(f, "")
181 }
182 }
183 }
184}
185
impl ExecutionPlan for MockExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    // Leaf node: no inputs
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Returns a stream yielding the configured data.
    ///
    /// Only partition 0 is valid — the plan declares a single partition.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        assert_eq!(partition, 0);

        // `Result` is not `Clone`, so clone the Ok variants and copy the
        // error via `clone_error`
        let data: Vec<_> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();

        if self.use_task {
            // Send batches from a spawned task so the consumer must yield at
            // least once before data is available
            let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
            let tx = builder.tx();
            builder.spawn(async move {
                for batch in data {
                    println!("Sending batch via delayed stream");
                    if let Err(e) = tx.send(batch).await {
                        println!("ERROR batch via delayed stream: {e}");
                    }
                }

                Ok(())
            });
            Ok(builder.build())
        } else {
            // Data is available immediately, without a background task
            let stream = futures::stream::iter(data);
            Ok(Box::pin(RecordBatchStreamAdapter::new(
                self.schema(),
                stream,
            )))
        }
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    /// Exact statistics for the whole plan (`None`); unknown for any
    /// specific partition. Propagates the first stored error, if any.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema));
        }
        let data: Result<Vec<_>> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();

        let data = data?;

        Ok(common::compute_record_batch_statistics(
            &[data],
            &self.schema,
            None,
        ))
    }
}
283
284fn clone_error(e: &DataFusionError) -> DataFusionError {
285 use DataFusionError::*;
286 match e {
287 Execution(msg) => Execution(msg.to_string()),
288 _ => unimplemented!(),
289 }
290}
291
/// A mock `ExecutionPlan` whose partitions do not start producing data
/// until a shared barrier is released.
#[derive(Debug)]
pub struct BarrierExec {
    // One vector of batches per partition
    data: Vec<Vec<RecordBatch>>,
    schema: SchemaRef,

    // All partition streams wait on this barrier before sending data
    start_data_barrier: Option<Arc<Barrier>>,

    // Barrier awaited after data was sent, plus a counter of the partitions
    // that reached it
    finish_barrier: Option<Arc<(Barrier, AtomicUsize)>>,

    // Cached plan properties
    cache: PlanProperties,

    // Whether to print progress to stdout
    log: bool,
}
310
impl BarrierExec {
    /// Creates a plan with one partition per entry in `data`; all partitions
    /// wait on a shared start barrier before sending any batches.
    pub fn new(data: Vec<Vec<RecordBatch>>, schema: SchemaRef) -> Self {
        // `data.len() + 1` parties: one per partition task plus the test
        // driver that calls `wait()`.
        let barrier = Some(Arc::new(Barrier::new(data.len() + 1)));
        let cache = Self::compute_properties(Arc::clone(&schema), &data);
        Self {
            data,
            schema,
            start_data_barrier: barrier,
            cache,
            finish_barrier: None,
            log: true,
        }
    }

    /// Enables/disables progress logging to stdout (enabled by default).
    pub fn with_log(mut self, log: bool) -> Self {
        self.log = log;
        self
    }

    /// Removes the start barrier so partitions send data immediately.
    pub fn without_start_barrier(mut self) -> Self {
        self.start_data_barrier = None;
        self
    }

    /// Adds a finish barrier that all partitions (plus the caller of
    /// [`Self::wait_finish`]) wait on after sending their data, together
    /// with a counter of the partitions that reached it.
    pub fn with_finish_barrier(mut self) -> Self {
        let barrier = Arc::new((
            // Again one party per partition plus the test driver
            Barrier::new(self.data.len() + 1),
            AtomicUsize::new(0),
        ));

        self.finish_barrier = Some(barrier);
        self
    }

    /// Releases the start barrier (waits until all partitions reach it).
    ///
    /// # Panics
    /// Panics if the start barrier was removed via
    /// [`Self::without_start_barrier`].
    pub async fn wait(&self) {
        let barrier = &self
            .start_data_barrier
            .as_ref()
            .expect("Must only be called when having a start barrier");
        if self.log {
            println!("BarrierExec::wait waiting on barrier");
        }
        barrier.wait().await;
        if self.log {
            println!("BarrierExec::wait done waiting");
        }
    }

    /// Waits on the finish barrier until all partitions have sent their data.
    ///
    /// # Panics
    /// Panics if no finish barrier was configured.
    pub async fn wait_finish(&self) {
        let (barrier, _) = &self
            .finish_barrier
            .as_deref()
            .expect("Must only be called when having a finish barrier");

        if self.log {
            println!("BarrierExec::wait_finish waiting on barrier");
        }
        barrier.wait().await;
        if self.log {
            println!("BarrierExec::wait_finish done waiting");
        }
    }

    /// Returns true once every partition has reached the finish barrier.
    ///
    /// # Panics
    /// Panics if no finish barrier was configured.
    pub fn is_finish_barrier_reached(&self) -> bool {
        let (_, reached_finish) = self
            .finish_barrier
            .as_deref()
            .expect("Must only be called when having finish barrier");

        reached_finish.load(Ordering::Relaxed) == self.data.len()
    }

    /// One unknown partition per input vector; bounded, incremental emission.
    fn compute_properties(
        schema: SchemaRef,
        data: &[Vec<RecordBatch>],
    ) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(data.len()),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}
401
402impl DisplayAs for BarrierExec {
403 fn fmt_as(
404 &self,
405 t: DisplayFormatType,
406 f: &mut std::fmt::Formatter,
407 ) -> std::fmt::Result {
408 match t {
409 DisplayFormatType::Default | DisplayFormatType::Verbose => {
410 write!(f, "BarrierExec")
411 }
412 DisplayFormatType::TreeRender => {
413 write!(f, "")
415 }
416 }
417 }
418}
419
impl ExecutionPlan for BarrierExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Spawns a task for `partition` that (optionally) waits on the start
    /// barrier, sends its batches, then (optionally) waits on the finish
    /// barrier.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        assert!(partition < self.data.len());

        let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);

        // Clone the data and barrier handles into the spawned task
        let data = self.data[partition].clone();
        let start_barrier = self.start_data_barrier.as_ref().map(Arc::clone);
        let finish_barrier = self.finish_barrier.as_ref().map(Arc::clone);
        let log = self.log;
        let tx = builder.tx();
        builder.spawn(async move {
            if let Some(barrier) = start_barrier {
                if log {
                    println!("Partition {partition} waiting on barrier");
                }
                barrier.wait().await;
            }
            for batch in data {
                if log {
                    println!("Partition {partition} sending batch");
                }
                if let Err(e) = tx.send(Ok(batch)).await {
                    println!("ERROR batch via barrier stream stream: {e}");
                }
            }
            if let Some((barrier, reached_finish)) = finish_barrier.as_deref() {
                if log {
                    println!("Partition {partition} waiting on finish barrier");
                }
                // Record that this partition reached the finish barrier
                // before blocking on it
                reached_finish.fetch_add(1, Ordering::Relaxed);
                barrier.wait().await;
            }

            Ok(())
        });

        Ok(builder.build())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    /// Exact statistics for the whole plan; unknown for a single partition.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema));
        }
        Ok(common::compute_record_batch_statistics(
            &self.data,
            &self.schema,
            None,
        ))
    }
}
505
/// A mock execution plan whose `execute` always returns an internal error.
#[derive(Debug)]
pub struct ErrorExec {
    // Cached plan properties
    cache: PlanProperties,
}
511
512impl Default for ErrorExec {
513 fn default() -> Self {
514 Self::new()
515 }
516}
517
518impl ErrorExec {
519 pub fn new() -> Self {
520 let schema = Arc::new(Schema::new(vec![Field::new(
521 "dummy",
522 DataType::Int64,
523 true,
524 )]));
525 let cache = Self::compute_properties(schema);
526 Self { cache }
527 }
528
529 fn compute_properties(schema: SchemaRef) -> PlanProperties {
531 PlanProperties::new(
532 EquivalenceProperties::new(schema),
533 Partitioning::UnknownPartitioning(1),
534 EmissionType::Incremental,
535 Boundedness::Bounded,
536 )
537 }
538}
539
540impl DisplayAs for ErrorExec {
541 fn fmt_as(
542 &self,
543 t: DisplayFormatType,
544 f: &mut std::fmt::Formatter,
545 ) -> std::fmt::Result {
546 match t {
547 DisplayFormatType::Default | DisplayFormatType::Verbose => {
548 write!(f, "ErrorExec")
549 }
550 DisplayFormatType::TreeRender => {
551 write!(f, "")
553 }
554 }
555 }
556}
557
impl ExecutionPlan for ErrorExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Always returns an internal error naming the requested partition.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        internal_err!("ErrorExec, unsurprisingly, errored in partition {partition}")
    }
}
591
/// A mock execution plan that simply returns the provided statistics.
#[derive(Debug, Clone)]
pub struct StatisticsExec {
    // Statistics to report
    stats: Statistics,
    // Schema the statistics describe
    schema: Arc<Schema>,
    // Cached plan properties
    cache: PlanProperties,
}
599impl StatisticsExec {
600 pub fn new(stats: Statistics, schema: Schema) -> Self {
601 assert_eq!(
602 stats.column_statistics.len(),
603 schema.fields().len(),
604 "if defined, the column statistics vector length should be the number of fields"
605 );
606 let cache = Self::compute_properties(Arc::new(schema.clone()));
607 Self {
608 stats,
609 schema: Arc::new(schema),
610 cache,
611 }
612 }
613
614 fn compute_properties(schema: SchemaRef) -> PlanProperties {
616 PlanProperties::new(
617 EquivalenceProperties::new(schema),
618 Partitioning::UnknownPartitioning(2),
619 EmissionType::Incremental,
620 Boundedness::Bounded,
621 )
622 }
623}
624
625impl DisplayAs for StatisticsExec {
626 fn fmt_as(
627 &self,
628 t: DisplayFormatType,
629 f: &mut std::fmt::Formatter,
630 ) -> std::fmt::Result {
631 match t {
632 DisplayFormatType::Default | DisplayFormatType::Verbose => {
633 write!(
634 f,
635 "StatisticsExec: col_count={}, row_count={:?}",
636 self.schema.fields().len(),
637 self.stats.num_rows,
638 )
639 }
640 DisplayFormatType::TreeRender => {
641 write!(f, "")
643 }
644 }
645 }
646}
647
648impl ExecutionPlan for StatisticsExec {
649 fn name(&self) -> &'static str {
650 Self::static_name()
651 }
652
653 fn as_any(&self) -> &dyn Any {
654 self
655 }
656
657 fn properties(&self) -> &PlanProperties {
658 &self.cache
659 }
660
661 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
662 vec![]
663 }
664
665 fn with_new_children(
666 self: Arc<Self>,
667 _: Vec<Arc<dyn ExecutionPlan>>,
668 ) -> Result<Arc<dyn ExecutionPlan>> {
669 Ok(self)
670 }
671
672 fn execute(
673 &self,
674 _partition: usize,
675 _context: Arc<TaskContext>,
676 ) -> Result<SendableRecordBatchStream> {
677 unimplemented!("This plan only serves for testing statistics")
678 }
679
680 fn statistics(&self) -> Result<Statistics> {
681 Ok(self.stats.clone())
682 }
683
684 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
685 Ok(if partition.is_some() {
686 Statistics::new_unknown(&self.schema)
687 } else {
688 self.stats.clone()
689 })
690 }
691}
692
/// Execution plan whose streams block forever (never yield data).
#[derive(Debug)]
pub struct BlockingExec {
    // Schema mocked by this plan
    schema: SchemaRef,

    // Ref-counting helper: each produced stream clones this Arc, so tests
    // can observe via `refs()` whether plan + streams have been dropped
    refs: Arc<()>,
    // Cached plan properties
    cache: PlanProperties,
}
705
706impl BlockingExec {
707 pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
709 let cache = Self::compute_properties(Arc::clone(&schema), n_partitions);
710 Self {
711 schema,
712 refs: Default::default(),
713 cache,
714 }
715 }
716
717 pub fn refs(&self) -> Weak<()> {
723 Arc::downgrade(&self.refs)
724 }
725
726 fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
728 PlanProperties::new(
729 EquivalenceProperties::new(schema),
730 Partitioning::UnknownPartitioning(n_partitions),
731 EmissionType::Incremental,
732 Boundedness::Bounded,
733 )
734 }
735}
736
737impl DisplayAs for BlockingExec {
738 fn fmt_as(
739 &self,
740 t: DisplayFormatType,
741 f: &mut std::fmt::Formatter,
742 ) -> std::fmt::Result {
743 match t {
744 DisplayFormatType::Default | DisplayFormatType::Verbose => {
745 write!(f, "BlockingExec",)
746 }
747 DisplayFormatType::TreeRender => {
748 write!(f, "")
750 }
751 }
752 }
753}
754
755impl ExecutionPlan for BlockingExec {
756 fn name(&self) -> &'static str {
757 Self::static_name()
758 }
759
760 fn as_any(&self) -> &dyn Any {
761 self
762 }
763
764 fn properties(&self) -> &PlanProperties {
765 &self.cache
766 }
767
768 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
769 vec![]
771 }
772
773 fn with_new_children(
774 self: Arc<Self>,
775 _: Vec<Arc<dyn ExecutionPlan>>,
776 ) -> Result<Arc<dyn ExecutionPlan>> {
777 internal_err!("Children cannot be replaced in {self:?}")
778 }
779
780 fn execute(
781 &self,
782 _partition: usize,
783 _context: Arc<TaskContext>,
784 ) -> Result<SendableRecordBatchStream> {
785 Ok(Box::pin(BlockingStream {
786 schema: Arc::clone(&self.schema),
787 _refs: Arc::clone(&self.refs),
788 }))
789 }
790}
791
/// A stream that is pending forever (never yields, never completes).
#[derive(Debug)]
pub struct BlockingStream {
    // Schema mocked by this stream
    schema: SchemaRef,

    // Keeps the owning plan's ref counter alive while the stream exists
    _refs: Arc<()>,
}
801
impl Stream for BlockingStream {
    type Item = Result<RecordBatch>;

    /// Always returns `Pending` without registering the waker — the task is
    /// never woken again, so this stream blocks forever by design.
    fn poll_next(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        Poll::Pending
    }
}
812
impl RecordBatchStream for BlockingStream {
    /// Returns the schema the stream was constructed with.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
818
/// Asserts that the strong count behind `refs` drops to zero within 10
/// seconds, polling every 10ms.
///
/// # Panics
/// Panics (via `unwrap` on the timeout result) if the count does not reach
/// zero in time.
pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
    tokio::time::timeout(std::time::Duration::from_secs(10), async {
        loop {
            // `dbg!` intentionally logs the observed count on each poll
            if dbg!(Weak::strong_count(&refs)) == 0 {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    })
    .await
    .unwrap();
}
834
/// Execution plan whose streams panic after producing a configurable number
/// of batches (zero by default).
#[derive(Debug)]
pub struct PanicExec {
    // Schema mocked by this plan
    schema: SchemaRef,

    // One entry per partition: the number of batches that partition's
    // stream yields before it panics
    batches_until_panics: Vec<usize>,
    // Cached plan properties
    cache: PlanProperties,
}
848
849impl PanicExec {
850 pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
853 let batches_until_panics = vec![0; n_partitions];
854 let cache = Self::compute_properties(Arc::clone(&schema), &batches_until_panics);
855 Self {
856 schema,
857 batches_until_panics,
858 cache,
859 }
860 }
861
862 pub fn with_partition_panic(mut self, partition: usize, count: usize) -> Self {
864 self.batches_until_panics[partition] = count;
865 self
866 }
867
868 fn compute_properties(
870 schema: SchemaRef,
871 batches_until_panics: &[usize],
872 ) -> PlanProperties {
873 let num_partitions = batches_until_panics.len();
874 PlanProperties::new(
875 EquivalenceProperties::new(schema),
876 Partitioning::UnknownPartitioning(num_partitions),
877 EmissionType::Incremental,
878 Boundedness::Bounded,
879 )
880 }
881}
882
883impl DisplayAs for PanicExec {
884 fn fmt_as(
885 &self,
886 t: DisplayFormatType,
887 f: &mut std::fmt::Formatter,
888 ) -> std::fmt::Result {
889 match t {
890 DisplayFormatType::Default | DisplayFormatType::Verbose => {
891 write!(f, "PanicExec",)
892 }
893 DisplayFormatType::TreeRender => {
894 write!(f, "")
896 }
897 }
898 }
899}
900
901impl ExecutionPlan for PanicExec {
902 fn name(&self) -> &'static str {
903 Self::static_name()
904 }
905
906 fn as_any(&self) -> &dyn Any {
907 self
908 }
909
910 fn properties(&self) -> &PlanProperties {
911 &self.cache
912 }
913
914 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
915 vec![]
917 }
918
919 fn with_new_children(
920 self: Arc<Self>,
921 _: Vec<Arc<dyn ExecutionPlan>>,
922 ) -> Result<Arc<dyn ExecutionPlan>> {
923 internal_err!("Children cannot be replaced in {:?}", self)
924 }
925
926 fn execute(
927 &self,
928 partition: usize,
929 _context: Arc<TaskContext>,
930 ) -> Result<SendableRecordBatchStream> {
931 Ok(Box::pin(PanicStream {
932 partition,
933 batches_until_panic: self.batches_until_panics[partition],
934 schema: Arc::clone(&self.schema),
935 ready: false,
936 }))
937 }
938}
939
/// A stream that yields an empty batch on every other poll and panics once
/// `batches_until_panic` batches have been produced.
#[derive(Debug)]
struct PanicStream {
    // Which partition this stream belongs to (included in the panic message)
    partition: usize,
    // How many batches remain before the panic
    batches_until_panic: usize,
    // Schema mocked by this stream
    schema: SchemaRef,
    // Toggles each poll: yield a batch when true, return Pending when false
    ready: bool,
}
955
impl Stream for PanicStream {
    type Item = Result<RecordBatch>;

    /// Alternates between `Pending` (with an immediate self-wake) and an
    /// empty batch until the panic budget is exhausted, then panics.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.batches_until_panic > 0 {
            if self.ready {
                // Second poll of a cycle: consume one unit of budget and
                // yield an empty batch
                self.batches_until_panic -= 1;
                self.ready = false;
                let batch = RecordBatch::new_empty(Arc::clone(&self.schema));
                return Poll::Ready(Some(Ok(batch)));
            } else {
                // First poll of a cycle: return Pending but wake immediately
                // so the stream is polled again
                self.ready = true;
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        }
        panic!("PanickingStream did panic: {}", self.partition)
    }
}
979
impl RecordBatchStream for PanicStream {
    /// Returns the schema the stream was constructed with.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}