1use crate::{
21 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
22 RecordBatchStream, SendableRecordBatchStream, Statistics, common,
23 execution_plan::Boundedness,
24};
25use crate::{
26 execution_plan::EmissionType,
27 stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
28};
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::{
31 pin::Pin,
32 sync::{Arc, Weak},
33 task::{Context, Poll},
34};
35
36use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
37use arrow::record_batch::RecordBatch;
38use datafusion_common::{DataFusionError, Result, internal_err};
39use datafusion_execution::TaskContext;
40use datafusion_physical_expr::EquivalenceProperties;
41
42use futures::Stream;
43use tokio::sync::Barrier;
44
/// A cloneable handle to a shared batch counter.
///
/// All clones share the same underlying counter, so a caller holding a clone
/// (for example one obtained from [`TestStream::index`]) observes every
/// increment made through any other clone.
#[derive(Debug, Default, Clone)]
pub struct BatchIndex {
    // An atomic is sufficient for a single counter and, unlike a mutex,
    // cannot be poisoned, so no `unwrap` is needed on access.
    inner: Arc<AtomicUsize>,
}

impl BatchIndex {
    /// Returns the current counter value.
    pub fn value(&self) -> usize {
        self.inner.load(Ordering::SeqCst)
    }

    /// Increments the counter by one.
    pub fn incr(&self) {
        self.inner.fetch_add(1, Ordering::SeqCst);
    }
}
64
65#[derive(Debug, Default)]
67pub struct TestStream {
68 data: Vec<RecordBatch>,
70 index: BatchIndex,
72}
73
74impl TestStream {
75 pub fn new(data: Vec<RecordBatch>) -> Self {
78 Self {
79 data,
80 ..Default::default()
81 }
82 }
83
84 pub fn index(&self) -> BatchIndex {
86 self.index.clone()
87 }
88}
89
90impl Stream for TestStream {
91 type Item = Result<RecordBatch>;
92
93 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
94 let next_batch = self.index.value();
95
96 Poll::Ready(if next_batch < self.data.len() {
97 let next_batch = self.index.value();
98 self.index.incr();
99 Some(Ok(self.data[next_batch].clone()))
100 } else {
101 None
102 })
103 }
104
105 fn size_hint(&self) -> (usize, Option<usize>) {
106 (self.data.len(), Some(self.data.len()))
107 }
108}
109
110impl RecordBatchStream for TestStream {
111 fn schema(&self) -> SchemaRef {
113 self.data[0].schema()
114 }
115}
116
117#[derive(Debug)]
120pub struct MockExec {
121 data: Vec<Result<RecordBatch>>,
123 schema: SchemaRef,
124 use_task: bool,
127 cache: Arc<PlanProperties>,
128}
129
130impl MockExec {
131 pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
139 let cache = Self::compute_properties(Arc::clone(&schema));
140 Self {
141 data,
142 schema,
143 use_task: true,
144 cache: Arc::new(cache),
145 }
146 }
147
148 pub fn with_use_task(mut self, use_task: bool) -> Self {
152 self.use_task = use_task;
153 self
154 }
155
156 fn compute_properties(schema: SchemaRef) -> PlanProperties {
158 PlanProperties::new(
159 EquivalenceProperties::new(schema),
160 Partitioning::UnknownPartitioning(1),
161 EmissionType::Incremental,
162 Boundedness::Bounded,
163 )
164 }
165}
166
167impl DisplayAs for MockExec {
168 fn fmt_as(
169 &self,
170 t: DisplayFormatType,
171 f: &mut std::fmt::Formatter,
172 ) -> std::fmt::Result {
173 match t {
174 DisplayFormatType::Default | DisplayFormatType::Verbose => {
175 write!(f, "MockExec")
176 }
177 DisplayFormatType::TreeRender => {
178 write!(f, "")
180 }
181 }
182 }
183}
184
185impl ExecutionPlan for MockExec {
186 fn name(&self) -> &'static str {
187 Self::static_name()
188 }
189
190 fn properties(&self) -> &Arc<PlanProperties> {
191 &self.cache
192 }
193
194 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
195 vec![]
196 }
197
198 fn with_new_children(
199 self: Arc<Self>,
200 _: Vec<Arc<dyn ExecutionPlan>>,
201 ) -> Result<Arc<dyn ExecutionPlan>> {
202 unimplemented!()
203 }
204
205 fn execute(
207 &self,
208 partition: usize,
209 _context: Arc<TaskContext>,
210 ) -> Result<SendableRecordBatchStream> {
211 assert_eq!(partition, 0);
212
213 let data: Vec<_> = self
215 .data
216 .iter()
217 .map(|r| match r {
218 Ok(batch) => Ok(batch.clone()),
219 Err(e) => Err(clone_error(e)),
220 })
221 .collect();
222
223 if self.use_task {
224 let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
225 let tx = builder.tx();
229 builder.spawn(async move {
230 for batch in data {
231 println!("Sending batch via delayed stream");
232 if let Err(e) = tx.send(batch).await {
233 println!("ERROR batch via delayed stream: {e}");
234 }
235 }
236
237 Ok(())
238 });
239 Ok(builder.build())
241 } else {
242 let stream = futures::stream::iter(data);
244 Ok(Box::pin(RecordBatchStreamAdapter::new(
245 self.schema(),
246 stream,
247 )))
248 }
249 }
250
251 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
253 if partition.is_some() {
254 return Ok(Arc::new(Statistics::new_unknown(&self.schema)));
255 }
256 let data: Result<Vec<_>> = self
257 .data
258 .iter()
259 .map(|r| match r {
260 Ok(batch) => Ok(batch.clone()),
261 Err(e) => Err(clone_error(e)),
262 })
263 .collect();
264
265 let data = data?;
266
267 Ok(Arc::new(common::compute_record_batch_statistics(
268 &[data],
269 &self.schema,
270 None,
271 )))
272 }
273}
274
275fn clone_error(e: &DataFusionError) -> DataFusionError {
276 use DataFusionError::*;
277 match e {
278 Execution(msg) => Execution(msg.to_string()),
279 _ => unimplemented!(),
280 }
281}
282
283#[derive(Debug)]
286pub struct BarrierExec {
287 data: Vec<Vec<RecordBatch>>,
289 schema: SchemaRef,
290
291 start_data_barrier: Option<Arc<Barrier>>,
293
294 finish_barrier: Option<Arc<(Barrier, AtomicUsize)>>,
296
297 cache: Arc<PlanProperties>,
298
299 log: bool,
300}
301
302impl BarrierExec {
303 pub fn new(data: Vec<Vec<RecordBatch>>, schema: SchemaRef) -> Self {
305 let barrier = Some(Arc::new(Barrier::new(data.len() + 1)));
307 let cache = Self::compute_properties(Arc::clone(&schema), &data);
308 Self {
309 data,
310 schema,
311 start_data_barrier: barrier,
312 cache: Arc::new(cache),
313 finish_barrier: None,
314 log: true,
315 }
316 }
317
318 pub fn with_log(mut self, log: bool) -> Self {
319 self.log = log;
320 self
321 }
322
323 pub fn without_start_barrier(mut self) -> Self {
324 self.start_data_barrier = None;
325 self
326 }
327
328 pub fn with_finish_barrier(mut self) -> Self {
329 let barrier = Arc::new((
330 Barrier::new(self.data.len() + 1),
332 AtomicUsize::new(0),
333 ));
334
335 self.finish_barrier = Some(barrier);
336 self
337 }
338
339 pub async fn wait(&self) {
341 let barrier = &self
342 .start_data_barrier
343 .as_ref()
344 .expect("Must only be called when having a start barrier");
345 if self.log {
346 println!("BarrierExec::wait waiting on barrier");
347 }
348 barrier.wait().await;
349 if self.log {
350 println!("BarrierExec::wait done waiting");
351 }
352 }
353
354 pub async fn wait_finish(&self) {
355 let (barrier, _) = &self
356 .finish_barrier
357 .as_deref()
358 .expect("Must only be called when having a finish barrier");
359
360 if self.log {
361 println!("BarrierExec::wait_finish waiting on barrier");
362 }
363 barrier.wait().await;
364 if self.log {
365 println!("BarrierExec::wait_finish done waiting");
366 }
367 }
368
369 pub fn is_finish_barrier_reached(&self) -> bool {
371 let (_, reached_finish) = self
372 .finish_barrier
373 .as_deref()
374 .expect("Must only be called when having finish barrier");
375
376 reached_finish.load(Ordering::Relaxed) == self.data.len()
377 }
378
379 fn compute_properties(
381 schema: SchemaRef,
382 data: &[Vec<RecordBatch>],
383 ) -> PlanProperties {
384 PlanProperties::new(
385 EquivalenceProperties::new(schema),
386 Partitioning::UnknownPartitioning(data.len()),
387 EmissionType::Incremental,
388 Boundedness::Bounded,
389 )
390 }
391}
392
393impl DisplayAs for BarrierExec {
394 fn fmt_as(
395 &self,
396 t: DisplayFormatType,
397 f: &mut std::fmt::Formatter,
398 ) -> std::fmt::Result {
399 match t {
400 DisplayFormatType::Default | DisplayFormatType::Verbose => {
401 write!(f, "BarrierExec")
402 }
403 DisplayFormatType::TreeRender => {
404 write!(f, "")
406 }
407 }
408 }
409}
410
411impl ExecutionPlan for BarrierExec {
412 fn name(&self) -> &'static str {
413 Self::static_name()
414 }
415
416 fn properties(&self) -> &Arc<PlanProperties> {
417 &self.cache
418 }
419
420 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
421 unimplemented!()
422 }
423
424 fn with_new_children(
425 self: Arc<Self>,
426 _: Vec<Arc<dyn ExecutionPlan>>,
427 ) -> Result<Arc<dyn ExecutionPlan>> {
428 unimplemented!()
429 }
430
431 fn execute(
433 &self,
434 partition: usize,
435 _context: Arc<TaskContext>,
436 ) -> Result<SendableRecordBatchStream> {
437 assert!(partition < self.data.len());
438
439 let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
440
441 let data = self.data[partition].clone();
443 let start_barrier = self.start_data_barrier.as_ref().map(Arc::clone);
444 let finish_barrier = self.finish_barrier.as_ref().map(Arc::clone);
445 let log = self.log;
446 let tx = builder.tx();
447 builder.spawn(async move {
448 if let Some(barrier) = start_barrier {
449 if log {
450 println!("Partition {partition} waiting on barrier");
451 }
452 barrier.wait().await;
453 }
454 for batch in data {
455 if log {
456 println!("Partition {partition} sending batch");
457 }
458 if let Err(e) = tx.send(Ok(batch)).await {
459 println!("ERROR batch via barrier stream stream: {e}");
460 }
461 }
462 if let Some((barrier, reached_finish)) = finish_barrier.as_deref() {
463 if log {
464 println!("Partition {partition} waiting on finish barrier");
465 }
466 reached_finish.fetch_add(1, Ordering::Relaxed);
467 barrier.wait().await;
468 }
469
470 Ok(())
471 });
472
473 Ok(builder.build())
475 }
476
477 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
478 if partition.is_some() {
479 return Ok(Arc::new(Statistics::new_unknown(&self.schema)));
480 }
481 Ok(Arc::new(common::compute_record_batch_statistics(
482 &self.data,
483 &self.schema,
484 None,
485 )))
486 }
487}
488
489#[derive(Debug)]
491pub struct ErrorExec {
492 cache: Arc<PlanProperties>,
493}
494
495impl Default for ErrorExec {
496 fn default() -> Self {
497 Self::new()
498 }
499}
500
501impl ErrorExec {
502 pub fn new() -> Self {
503 let schema = Arc::new(Schema::new(vec![Field::new(
504 "dummy",
505 DataType::Int64,
506 true,
507 )]));
508 let cache = Self::compute_properties(schema);
509 Self {
510 cache: Arc::new(cache),
511 }
512 }
513
514 fn compute_properties(schema: SchemaRef) -> PlanProperties {
516 PlanProperties::new(
517 EquivalenceProperties::new(schema),
518 Partitioning::UnknownPartitioning(1),
519 EmissionType::Incremental,
520 Boundedness::Bounded,
521 )
522 }
523}
524
525impl DisplayAs for ErrorExec {
526 fn fmt_as(
527 &self,
528 t: DisplayFormatType,
529 f: &mut std::fmt::Formatter,
530 ) -> std::fmt::Result {
531 match t {
532 DisplayFormatType::Default | DisplayFormatType::Verbose => {
533 write!(f, "ErrorExec")
534 }
535 DisplayFormatType::TreeRender => {
536 write!(f, "")
538 }
539 }
540 }
541}
542
543impl ExecutionPlan for ErrorExec {
544 fn name(&self) -> &'static str {
545 Self::static_name()
546 }
547
548 fn properties(&self) -> &Arc<PlanProperties> {
549 &self.cache
550 }
551
552 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
553 unimplemented!()
554 }
555
556 fn with_new_children(
557 self: Arc<Self>,
558 _: Vec<Arc<dyn ExecutionPlan>>,
559 ) -> Result<Arc<dyn ExecutionPlan>> {
560 unimplemented!()
561 }
562
563 fn execute(
565 &self,
566 partition: usize,
567 _context: Arc<TaskContext>,
568 ) -> Result<SendableRecordBatchStream> {
569 internal_err!("ErrorExec, unsurprisingly, errored in partition {partition}")
570 }
571}
572
573#[derive(Debug, Clone)]
575pub struct StatisticsExec {
576 stats: Statistics,
577 schema: Arc<Schema>,
578 cache: Arc<PlanProperties>,
579}
580impl StatisticsExec {
581 pub fn new(stats: Statistics, schema: Schema) -> Self {
582 assert_eq!(
583 stats.column_statistics.len(),
584 schema.fields().len(),
585 "if defined, the column statistics vector length should be the number of fields"
586 );
587 let cache = Self::compute_properties(Arc::new(schema.clone()));
588 Self {
589 stats,
590 schema: Arc::new(schema),
591 cache: Arc::new(cache),
592 }
593 }
594
595 fn compute_properties(schema: SchemaRef) -> PlanProperties {
597 PlanProperties::new(
598 EquivalenceProperties::new(schema),
599 Partitioning::UnknownPartitioning(2),
600 EmissionType::Incremental,
601 Boundedness::Bounded,
602 )
603 }
604}
605
606impl DisplayAs for StatisticsExec {
607 fn fmt_as(
608 &self,
609 t: DisplayFormatType,
610 f: &mut std::fmt::Formatter,
611 ) -> std::fmt::Result {
612 match t {
613 DisplayFormatType::Default | DisplayFormatType::Verbose => {
614 write!(
615 f,
616 "StatisticsExec: col_count={}, row_count={:?}",
617 self.schema.fields().len(),
618 self.stats.num_rows,
619 )
620 }
621 DisplayFormatType::TreeRender => {
622 write!(f, "")
624 }
625 }
626 }
627}
628
629impl ExecutionPlan for StatisticsExec {
630 fn name(&self) -> &'static str {
631 Self::static_name()
632 }
633
634 fn properties(&self) -> &Arc<PlanProperties> {
635 &self.cache
636 }
637
638 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
639 vec![]
640 }
641
642 fn with_new_children(
643 self: Arc<Self>,
644 _: Vec<Arc<dyn ExecutionPlan>>,
645 ) -> Result<Arc<dyn ExecutionPlan>> {
646 Ok(self)
647 }
648
649 fn execute(
650 &self,
651 _partition: usize,
652 _context: Arc<TaskContext>,
653 ) -> Result<SendableRecordBatchStream> {
654 unimplemented!("This plan only serves for testing statistics")
655 }
656
657 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
658 Ok(Arc::new(if partition.is_some() {
659 Statistics::new_unknown(&self.schema)
660 } else {
661 self.stats.clone()
662 }))
663 }
664}
665
666#[derive(Debug)]
670pub struct BlockingExec {
671 schema: SchemaRef,
673
674 refs: Arc<()>,
676 cache: Arc<PlanProperties>,
677}
678
679impl BlockingExec {
680 pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
682 let cache = Self::compute_properties(Arc::clone(&schema), n_partitions);
683 Self {
684 schema,
685 refs: Default::default(),
686 cache: Arc::new(cache),
687 }
688 }
689
690 pub fn refs(&self) -> Weak<()> {
696 Arc::downgrade(&self.refs)
697 }
698
699 fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
701 PlanProperties::new(
702 EquivalenceProperties::new(schema),
703 Partitioning::UnknownPartitioning(n_partitions),
704 EmissionType::Incremental,
705 Boundedness::Bounded,
706 )
707 }
708}
709
710impl DisplayAs for BlockingExec {
711 fn fmt_as(
712 &self,
713 t: DisplayFormatType,
714 f: &mut std::fmt::Formatter,
715 ) -> std::fmt::Result {
716 match t {
717 DisplayFormatType::Default | DisplayFormatType::Verbose => {
718 write!(f, "BlockingExec",)
719 }
720 DisplayFormatType::TreeRender => {
721 write!(f, "")
723 }
724 }
725 }
726}
727
728impl ExecutionPlan for BlockingExec {
729 fn name(&self) -> &'static str {
730 Self::static_name()
731 }
732
733 fn properties(&self) -> &Arc<PlanProperties> {
734 &self.cache
735 }
736
737 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
738 vec![]
740 }
741
742 fn with_new_children(
743 self: Arc<Self>,
744 _: Vec<Arc<dyn ExecutionPlan>>,
745 ) -> Result<Arc<dyn ExecutionPlan>> {
746 internal_err!("Children cannot be replaced in {self:?}")
747 }
748
749 fn execute(
750 &self,
751 _partition: usize,
752 _context: Arc<TaskContext>,
753 ) -> Result<SendableRecordBatchStream> {
754 Ok(Box::pin(BlockingStream {
755 schema: Arc::clone(&self.schema),
756 _refs: Arc::clone(&self.refs),
757 }))
758 }
759}
760
761#[derive(Debug)]
763pub struct BlockingStream {
764 schema: SchemaRef,
766
767 _refs: Arc<()>,
769}
770
771impl Stream for BlockingStream {
772 type Item = Result<RecordBatch>;
773
774 fn poll_next(
775 self: Pin<&mut Self>,
776 _cx: &mut Context<'_>,
777 ) -> Poll<Option<Self::Item>> {
778 Poll::Pending
779 }
780}
781
782impl RecordBatchStream for BlockingStream {
783 fn schema(&self) -> SchemaRef {
784 Arc::clone(&self.schema)
785 }
786}
787
788pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
792 tokio::time::timeout(std::time::Duration::from_secs(10), async {
793 loop {
794 if dbg!(Weak::strong_count(&refs)) == 0 {
795 break;
796 }
797 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
798 }
799 })
800 .await
801 .unwrap();
802}
803
804#[derive(Debug)]
808pub struct PanicExec {
809 schema: SchemaRef,
811
812 batches_until_panics: Vec<usize>,
815 cache: Arc<PlanProperties>,
816}
817
818impl PanicExec {
819 pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
822 let batches_until_panics = vec![0; n_partitions];
823 let cache = Self::compute_properties(Arc::clone(&schema), &batches_until_panics);
824 Self {
825 schema,
826 batches_until_panics,
827 cache: Arc::new(cache),
828 }
829 }
830
831 pub fn with_partition_panic(mut self, partition: usize, count: usize) -> Self {
833 self.batches_until_panics[partition] = count;
834 self
835 }
836
837 fn compute_properties(
839 schema: SchemaRef,
840 batches_until_panics: &[usize],
841 ) -> PlanProperties {
842 let num_partitions = batches_until_panics.len();
843 PlanProperties::new(
844 EquivalenceProperties::new(schema),
845 Partitioning::UnknownPartitioning(num_partitions),
846 EmissionType::Incremental,
847 Boundedness::Bounded,
848 )
849 }
850}
851
852impl DisplayAs for PanicExec {
853 fn fmt_as(
854 &self,
855 t: DisplayFormatType,
856 f: &mut std::fmt::Formatter,
857 ) -> std::fmt::Result {
858 match t {
859 DisplayFormatType::Default | DisplayFormatType::Verbose => {
860 write!(f, "PanicExec",)
861 }
862 DisplayFormatType::TreeRender => {
863 write!(f, "")
865 }
866 }
867 }
868}
869
870impl ExecutionPlan for PanicExec {
871 fn name(&self) -> &'static str {
872 Self::static_name()
873 }
874
875 fn properties(&self) -> &Arc<PlanProperties> {
876 &self.cache
877 }
878
879 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
880 vec![]
882 }
883
884 fn with_new_children(
885 self: Arc<Self>,
886 _: Vec<Arc<dyn ExecutionPlan>>,
887 ) -> Result<Arc<dyn ExecutionPlan>> {
888 internal_err!("Children cannot be replaced in {:?}", self)
889 }
890
891 fn execute(
892 &self,
893 partition: usize,
894 _context: Arc<TaskContext>,
895 ) -> Result<SendableRecordBatchStream> {
896 Ok(Box::pin(PanicStream {
897 partition,
898 batches_until_panic: self.batches_until_panics[partition],
899 schema: Arc::clone(&self.schema),
900 ready: false,
901 }))
902 }
903}
904
905#[derive(Debug)]
910struct PanicStream {
911 partition: usize,
913 batches_until_panic: usize,
915 schema: SchemaRef,
917 ready: bool,
919}
920
921impl Stream for PanicStream {
922 type Item = Result<RecordBatch>;
923
924 fn poll_next(
925 mut self: Pin<&mut Self>,
926 cx: &mut Context<'_>,
927 ) -> Poll<Option<Self::Item>> {
928 if self.batches_until_panic > 0 {
929 if self.ready {
930 self.batches_until_panic -= 1;
931 self.ready = false;
932 let batch = RecordBatch::new_empty(Arc::clone(&self.schema));
933 return Poll::Ready(Some(Ok(batch)));
934 } else {
935 self.ready = true;
936 cx.waker().wake_by_ref();
938 return Poll::Pending;
939 }
940 }
941 panic!("PanickingStream did panic: {}", self.partition)
942 }
943}
944
945impl RecordBatchStream for PanicStream {
946 fn schema(&self) -> SchemaRef {
947 Arc::clone(&self.schema)
948 }
949}