1use crate::{
21 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
22 RecordBatchStream, SendableRecordBatchStream, Statistics, common,
23 execution_plan::Boundedness,
24};
25use crate::{
26 execution_plan::EmissionType,
27 stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
28};
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::{
31 any::Any,
32 pin::Pin,
33 sync::{Arc, Weak},
34 task::{Context, Poll},
35};
36
37use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
38use arrow::record_batch::RecordBatch;
39use datafusion_common::{DataFusionError, Result, internal_err};
40use datafusion_execution::TaskContext;
41use datafusion_physical_expr::EquivalenceProperties;
42
43use futures::Stream;
44use tokio::sync::Barrier;
45
/// Cloneable, thread-safe counter tracking how many batches a [`TestStream`]
/// has returned so far. Clones share the same underlying counter.
#[derive(Debug, Default, Clone)]
pub struct BatchIndex {
    // Shared mutex-protected counter; `Clone` bumps the Arc, not the value.
    inner: Arc<std::sync::Mutex<usize>>,
}
51
52impl BatchIndex {
53 pub fn value(&self) -> usize {
55 let inner = self.inner.lock().unwrap();
56 *inner
57 }
58
59 pub fn incr(&self) {
61 let mut inner = self.inner.lock().unwrap();
62 *inner += 1;
63 }
64}
65
/// A stream of record batches for testing, yielding batches from `data`
/// one at a time while tracking progress through a shared [`BatchIndex`].
#[derive(Debug, Default)]
pub struct TestStream {
    /// The batches to yield, in order.
    data: Vec<RecordBatch>,
    /// Position of the next batch to yield; observable from outside via
    /// [`TestStream::index`].
    index: BatchIndex,
}
74
75impl TestStream {
76 pub fn new(data: Vec<RecordBatch>) -> Self {
79 Self {
80 data,
81 ..Default::default()
82 }
83 }
84
85 pub fn index(&self) -> BatchIndex {
87 self.index.clone()
88 }
89}
90
91impl Stream for TestStream {
92 type Item = Result<RecordBatch>;
93
94 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
95 let next_batch = self.index.value();
96
97 Poll::Ready(if next_batch < self.data.len() {
98 let next_batch = self.index.value();
99 self.index.incr();
100 Some(Ok(self.data[next_batch].clone()))
101 } else {
102 None
103 })
104 }
105
106 fn size_hint(&self) -> (usize, Option<usize>) {
107 (self.data.len(), Some(self.data.len()))
108 }
109}
110
impl RecordBatchStream for TestStream {
    /// Returns the schema of the first batch.
    ///
    /// Panics if the stream was constructed with an empty `data` vector.
    fn schema(&self) -> SchemaRef {
        self.data[0].schema()
    }
}
117
/// A mock execution plan that yields a fixed set of `Result<RecordBatch>`
/// values from a single partition, for testing other `ExecutionPlan`s.
#[derive(Debug)]
pub struct MockExec {
    /// The results (batches or errors) to return from `execute`.
    data: Vec<Result<RecordBatch>>,
    /// Schema reported for this plan.
    schema: SchemaRef,
    /// If true (the default), `execute` spawns a task that sends the data
    /// through a channel; otherwise the data is returned as a plain stream.
    use_task: bool,
    /// Cached plan properties.
    cache: Arc<PlanProperties>,
}
130
131impl MockExec {
132 pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
140 let cache = Self::compute_properties(Arc::clone(&schema));
141 Self {
142 data,
143 schema,
144 use_task: true,
145 cache: Arc::new(cache),
146 }
147 }
148
149 pub fn with_use_task(mut self, use_task: bool) -> Self {
153 self.use_task = use_task;
154 self
155 }
156
157 fn compute_properties(schema: SchemaRef) -> PlanProperties {
159 PlanProperties::new(
160 EquivalenceProperties::new(schema),
161 Partitioning::UnknownPartitioning(1),
162 EmissionType::Incremental,
163 Boundedness::Bounded,
164 )
165 }
166}
167
168impl DisplayAs for MockExec {
169 fn fmt_as(
170 &self,
171 t: DisplayFormatType,
172 f: &mut std::fmt::Formatter,
173 ) -> std::fmt::Result {
174 match t {
175 DisplayFormatType::Default | DisplayFormatType::Verbose => {
176 write!(f, "MockExec")
177 }
178 DisplayFormatType::TreeRender => {
179 write!(f, "")
181 }
182 }
183 }
184}
185
impl ExecutionPlan for MockExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    /// Leaf node: no children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Returns the configured results for partition 0 (the only partition).
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        assert_eq!(partition, 0);

        // Clone the stored results; errors are cloned manually via
        // `clone_error` (presumably because the error type is not `Clone`
        // — confirm against `DataFusionError`).
        let data: Vec<_> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();

        if self.use_task {
            // Send the data through a bounded channel from a spawned task,
            // decoupling production from the caller's polling.
            let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
            let tx = builder.tx();
            builder.spawn(async move {
                for batch in data {
                    println!("Sending batch via delayed stream");
                    if let Err(e) = tx.send(batch).await {
                        println!("ERROR batch via delayed stream: {e}");
                    }
                }

                Ok(())
            });
            Ok(builder.build())
        } else {
            // Return the data directly as an in-memory stream.
            let stream = futures::stream::iter(data);
            Ok(Box::pin(RecordBatchStreamAdapter::new(
                self.schema(),
                stream,
            )))
        }
    }

    /// Statistics computed over all stored batches; unknown for any
    /// specific partition. Propagates any stored error.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema));
        }
        let data: Result<Vec<_>> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();

        let data = data?;

        Ok(common::compute_record_batch_statistics(
            &[data],
            &self.schema,
            None,
        ))
    }
}
279
280fn clone_error(e: &DataFusionError) -> DataFusionError {
281 use DataFusionError::*;
282 match e {
283 Execution(msg) => Execution(msg.to_string()),
284 _ => unimplemented!(),
285 }
286}
287
/// A mock execution plan whose partitions can be gated on barriers: each
/// partition optionally waits on a start barrier before producing data, and
/// optionally waits on a finish barrier after producing all data.
#[derive(Debug)]
pub struct BarrierExec {
    /// One vector of batches per partition.
    data: Vec<Vec<RecordBatch>>,
    /// Schema reported for this plan.
    schema: SchemaRef,

    /// If set, every partition waits on this barrier before sending data;
    /// the test driver releases it via [`BarrierExec::wait`].
    start_data_barrier: Option<Arc<Barrier>>,

    /// If set, every partition waits on this barrier after sending data.
    /// The counter records how many partitions reached the barrier.
    finish_barrier: Option<Arc<(Barrier, AtomicUsize)>>,

    /// Cached plan properties.
    cache: Arc<PlanProperties>,

    /// Whether to print progress via `println!`.
    log: bool,
}
306
impl BarrierExec {
    /// Creates a `BarrierExec` with one partition per entry of `data`,
    /// gated on a start barrier. The barrier is sized for all partitions
    /// plus one extra waiter: the test driver calling [`Self::wait`].
    pub fn new(data: Vec<Vec<RecordBatch>>, schema: SchemaRef) -> Self {
        let barrier = Some(Arc::new(Barrier::new(data.len() + 1)));
        let cache = Self::compute_properties(Arc::clone(&schema), &data);
        Self {
            data,
            schema,
            start_data_barrier: barrier,
            cache: Arc::new(cache),
            finish_barrier: None,
            log: true,
        }
    }

    /// Enables or disables progress logging.
    pub fn with_log(mut self, log: bool) -> Self {
        self.log = log;
        self
    }

    /// Removes the start barrier so partitions produce data immediately.
    pub fn without_start_barrier(mut self) -> Self {
        self.start_data_barrier = None;
        self
    }

    /// Adds a finish barrier (sized for all partitions plus the test
    /// driver calling [`Self::wait_finish`]) together with a counter of
    /// partitions that reached it.
    pub fn with_finish_barrier(mut self) -> Self {
        let barrier = Arc::new((
            Barrier::new(self.data.len() + 1),
            AtomicUsize::new(0),
        ));

        self.finish_barrier = Some(barrier);
        self
    }

    /// Releases the start barrier, allowing all partitions to produce data.
    ///
    /// Panics if the start barrier was removed via
    /// [`Self::without_start_barrier`].
    pub async fn wait(&self) {
        let barrier = &self
            .start_data_barrier
            .as_ref()
            .expect("Must only be called when having a start barrier");
        if self.log {
            println!("BarrierExec::wait waiting on barrier");
        }
        barrier.wait().await;
        if self.log {
            println!("BarrierExec::wait done waiting");
        }
    }

    /// Joins the finish barrier, unblocking all partitions that completed.
    ///
    /// Panics if no finish barrier was configured via
    /// [`Self::with_finish_barrier`].
    pub async fn wait_finish(&self) {
        let (barrier, _) = &self
            .finish_barrier
            .as_deref()
            .expect("Must only be called when having a finish barrier");

        if self.log {
            println!("BarrierExec::wait_finish waiting on barrier");
        }
        barrier.wait().await;
        if self.log {
            println!("BarrierExec::wait_finish done waiting");
        }
    }

    /// Returns true once every partition has reached the finish barrier.
    ///
    /// Panics if no finish barrier was configured.
    pub fn is_finish_barrier_reached(&self) -> bool {
        let (_, reached_finish) = self
            .finish_barrier
            .as_deref()
            .expect("Must only be called when having finish barrier");

        reached_finish.load(Ordering::Relaxed) == self.data.len()
    }

    /// Builds the plan properties: one unknown partition per data vector,
    /// incremental emission, bounded input.
    fn compute_properties(
        schema: SchemaRef,
        data: &[Vec<RecordBatch>],
    ) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(data.len()),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}
397
398impl DisplayAs for BarrierExec {
399 fn fmt_as(
400 &self,
401 t: DisplayFormatType,
402 f: &mut std::fmt::Formatter,
403 ) -> std::fmt::Result {
404 match t {
405 DisplayFormatType::Default | DisplayFormatType::Verbose => {
406 write!(f, "BarrierExec")
407 }
408 DisplayFormatType::TreeRender => {
409 write!(f, "")
411 }
412 }
413 }
414}
415
impl ExecutionPlan for BarrierExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Spawns a task for `partition` that: waits on the start barrier (if
    /// any), sends its batches through a channel, then bumps the reached
    /// counter and waits on the finish barrier (if any).
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        assert!(partition < self.data.len());

        let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);

        // Clone everything the spawned task needs so it owns its state.
        let data = self.data[partition].clone();
        let start_barrier = self.start_data_barrier.as_ref().map(Arc::clone);
        let finish_barrier = self.finish_barrier.as_ref().map(Arc::clone);
        let log = self.log;
        let tx = builder.tx();
        builder.spawn(async move {
            if let Some(barrier) = start_barrier {
                if log {
                    println!("Partition {partition} waiting on barrier");
                }
                barrier.wait().await;
            }
            for batch in data {
                if log {
                    println!("Partition {partition} sending batch");
                }
                if let Err(e) = tx.send(Ok(batch)).await {
                    println!("ERROR batch via barrier stream stream: {e}");
                }
            }
            if let Some((barrier, reached_finish)) = finish_barrier.as_deref() {
                if log {
                    println!("Partition {partition} waiting on finish barrier");
                }
                // Increment BEFORE waiting so `is_finish_barrier_reached`
                // counts this partition as soon as it arrives at the barrier.
                reached_finish.fetch_add(1, Ordering::Relaxed);
                barrier.wait().await;
            }

            Ok(())
        });

        Ok(builder.build())
    }

    /// Statistics over all partitions' batches; unknown for any specific
    /// partition.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema));
        }
        Ok(common::compute_record_batch_statistics(
            &self.data,
            &self.schema,
            None,
        ))
    }
}
497
/// A mock execution plan whose `execute` always returns an internal error,
/// for testing error propagation.
#[derive(Debug)]
pub struct ErrorExec {
    /// Cached plan properties.
    cache: Arc<PlanProperties>,
}
503
impl Default for ErrorExec {
    /// Same as [`ErrorExec::new`].
    fn default() -> Self {
        Self::new()
    }
}
509
510impl ErrorExec {
511 pub fn new() -> Self {
512 let schema = Arc::new(Schema::new(vec![Field::new(
513 "dummy",
514 DataType::Int64,
515 true,
516 )]));
517 let cache = Self::compute_properties(schema);
518 Self {
519 cache: Arc::new(cache),
520 }
521 }
522
523 fn compute_properties(schema: SchemaRef) -> PlanProperties {
525 PlanProperties::new(
526 EquivalenceProperties::new(schema),
527 Partitioning::UnknownPartitioning(1),
528 EmissionType::Incremental,
529 Boundedness::Bounded,
530 )
531 }
532}
533
534impl DisplayAs for ErrorExec {
535 fn fmt_as(
536 &self,
537 t: DisplayFormatType,
538 f: &mut std::fmt::Formatter,
539 ) -> std::fmt::Result {
540 match t {
541 DisplayFormatType::Default | DisplayFormatType::Verbose => {
542 write!(f, "ErrorExec")
543 }
544 DisplayFormatType::TreeRender => {
545 write!(f, "")
547 }
548 }
549 }
550}
551
impl ExecutionPlan for ErrorExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Always returns an internal error mentioning the partition number.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        internal_err!("ErrorExec, unsurprisingly, errored in partition {partition}")
    }
}
585
/// A mock execution plan that reports pre-computed statistics, for testing
/// statistics propagation. Its `execute` is unimplemented.
#[derive(Debug, Clone)]
pub struct StatisticsExec {
    /// The statistics to report for the whole plan.
    stats: Statistics,
    /// Schema reported for this plan.
    schema: Arc<Schema>,
    /// Cached plan properties.
    cache: Arc<PlanProperties>,
}
593impl StatisticsExec {
594 pub fn new(stats: Statistics, schema: Schema) -> Self {
595 assert_eq!(
596 stats.column_statistics.len(),
597 schema.fields().len(),
598 "if defined, the column statistics vector length should be the number of fields"
599 );
600 let cache = Self::compute_properties(Arc::new(schema.clone()));
601 Self {
602 stats,
603 schema: Arc::new(schema),
604 cache: Arc::new(cache),
605 }
606 }
607
608 fn compute_properties(schema: SchemaRef) -> PlanProperties {
610 PlanProperties::new(
611 EquivalenceProperties::new(schema),
612 Partitioning::UnknownPartitioning(2),
613 EmissionType::Incremental,
614 Boundedness::Bounded,
615 )
616 }
617}
618
619impl DisplayAs for StatisticsExec {
620 fn fmt_as(
621 &self,
622 t: DisplayFormatType,
623 f: &mut std::fmt::Formatter,
624 ) -> std::fmt::Result {
625 match t {
626 DisplayFormatType::Default | DisplayFormatType::Verbose => {
627 write!(
628 f,
629 "StatisticsExec: col_count={}, row_count={:?}",
630 self.schema.fields().len(),
631 self.stats.num_rows,
632 )
633 }
634 DisplayFormatType::TreeRender => {
635 write!(f, "")
637 }
638 }
639 }
640}
641
642impl ExecutionPlan for StatisticsExec {
643 fn name(&self) -> &'static str {
644 Self::static_name()
645 }
646
647 fn as_any(&self) -> &dyn Any {
648 self
649 }
650
651 fn properties(&self) -> &Arc<PlanProperties> {
652 &self.cache
653 }
654
655 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
656 vec![]
657 }
658
659 fn with_new_children(
660 self: Arc<Self>,
661 _: Vec<Arc<dyn ExecutionPlan>>,
662 ) -> Result<Arc<dyn ExecutionPlan>> {
663 Ok(self)
664 }
665
666 fn execute(
667 &self,
668 _partition: usize,
669 _context: Arc<TaskContext>,
670 ) -> Result<SendableRecordBatchStream> {
671 unimplemented!("This plan only serves for testing statistics")
672 }
673
674 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
675 Ok(if partition.is_some() {
676 Statistics::new_unknown(&self.schema)
677 } else {
678 self.stats.clone()
679 })
680 }
681}
682
/// A mock execution plan whose streams never produce data (they always
/// return `Poll::Pending`), for testing cancellation/drop behavior.
#[derive(Debug)]
pub struct BlockingExec {
    /// Schema reported for this plan.
    schema: SchemaRef,

    /// Dummy allocation shared with every produced stream; its weak count
    /// lets tests observe when all streams have been dropped.
    refs: Arc<()>,
    /// Cached plan properties.
    cache: Arc<PlanProperties>,
}
695
696impl BlockingExec {
697 pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
699 let cache = Self::compute_properties(Arc::clone(&schema), n_partitions);
700 Self {
701 schema,
702 refs: Default::default(),
703 cache: Arc::new(cache),
704 }
705 }
706
707 pub fn refs(&self) -> Weak<()> {
713 Arc::downgrade(&self.refs)
714 }
715
716 fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
718 PlanProperties::new(
719 EquivalenceProperties::new(schema),
720 Partitioning::UnknownPartitioning(n_partitions),
721 EmissionType::Incremental,
722 Boundedness::Bounded,
723 )
724 }
725}
726
727impl DisplayAs for BlockingExec {
728 fn fmt_as(
729 &self,
730 t: DisplayFormatType,
731 f: &mut std::fmt::Formatter,
732 ) -> std::fmt::Result {
733 match t {
734 DisplayFormatType::Default | DisplayFormatType::Verbose => {
735 write!(f, "BlockingExec",)
736 }
737 DisplayFormatType::TreeRender => {
738 write!(f, "")
740 }
741 }
742 }
743}
744
745impl ExecutionPlan for BlockingExec {
746 fn name(&self) -> &'static str {
747 Self::static_name()
748 }
749
750 fn as_any(&self) -> &dyn Any {
751 self
752 }
753
754 fn properties(&self) -> &Arc<PlanProperties> {
755 &self.cache
756 }
757
758 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
759 vec![]
761 }
762
763 fn with_new_children(
764 self: Arc<Self>,
765 _: Vec<Arc<dyn ExecutionPlan>>,
766 ) -> Result<Arc<dyn ExecutionPlan>> {
767 internal_err!("Children cannot be replaced in {self:?}")
768 }
769
770 fn execute(
771 &self,
772 _partition: usize,
773 _context: Arc<TaskContext>,
774 ) -> Result<SendableRecordBatchStream> {
775 Ok(Box::pin(BlockingStream {
776 schema: Arc::clone(&self.schema),
777 _refs: Arc::clone(&self.refs),
778 }))
779 }
780}
781
/// A stream that never produces a value and never completes; see
/// [`BlockingExec`].
#[derive(Debug)]
pub struct BlockingStream {
    /// Schema reported by this stream.
    schema: SchemaRef,

    /// Keeps the shared ref token alive so tests can observe the stream's
    /// lifetime via the weak count.
    _refs: Arc<()>,
}
791
impl Stream for BlockingStream {
    type Item = Result<RecordBatch>;

    /// Always returns `Poll::Pending` without registering the waker, so
    /// a caller awaiting this stream blocks forever — that is the point
    /// of the fixture.
    fn poll_next(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        Poll::Pending
    }
}
802
impl RecordBatchStream for BlockingStream {
    /// Returns the schema this stream was constructed with.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
808
809pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
813 tokio::time::timeout(std::time::Duration::from_secs(10), async {
814 loop {
815 if dbg!(Weak::strong_count(&refs)) == 0 {
816 break;
817 }
818 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
819 }
820 })
821 .await
822 .unwrap();
823}
824
/// A mock execution plan whose streams panic after producing a configured
/// number of (empty) batches, for testing panic propagation.
#[derive(Debug)]
pub struct PanicExec {
    /// Schema reported for this plan.
    schema: SchemaRef,

    /// Per-partition count of batches to emit before panicking; one entry
    /// per partition.
    batches_until_panics: Vec<usize>,
    /// Cached plan properties.
    cache: Arc<PlanProperties>,
}
838
839impl PanicExec {
840 pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
843 let batches_until_panics = vec![0; n_partitions];
844 let cache = Self::compute_properties(Arc::clone(&schema), &batches_until_panics);
845 Self {
846 schema,
847 batches_until_panics,
848 cache: Arc::new(cache),
849 }
850 }
851
852 pub fn with_partition_panic(mut self, partition: usize, count: usize) -> Self {
854 self.batches_until_panics[partition] = count;
855 self
856 }
857
858 fn compute_properties(
860 schema: SchemaRef,
861 batches_until_panics: &[usize],
862 ) -> PlanProperties {
863 let num_partitions = batches_until_panics.len();
864 PlanProperties::new(
865 EquivalenceProperties::new(schema),
866 Partitioning::UnknownPartitioning(num_partitions),
867 EmissionType::Incremental,
868 Boundedness::Bounded,
869 )
870 }
871}
872
873impl DisplayAs for PanicExec {
874 fn fmt_as(
875 &self,
876 t: DisplayFormatType,
877 f: &mut std::fmt::Formatter,
878 ) -> std::fmt::Result {
879 match t {
880 DisplayFormatType::Default | DisplayFormatType::Verbose => {
881 write!(f, "PanicExec",)
882 }
883 DisplayFormatType::TreeRender => {
884 write!(f, "")
886 }
887 }
888 }
889}
890
891impl ExecutionPlan for PanicExec {
892 fn name(&self) -> &'static str {
893 Self::static_name()
894 }
895
896 fn as_any(&self) -> &dyn Any {
897 self
898 }
899
900 fn properties(&self) -> &Arc<PlanProperties> {
901 &self.cache
902 }
903
904 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
905 vec![]
907 }
908
909 fn with_new_children(
910 self: Arc<Self>,
911 _: Vec<Arc<dyn ExecutionPlan>>,
912 ) -> Result<Arc<dyn ExecutionPlan>> {
913 internal_err!("Children cannot be replaced in {:?}", self)
914 }
915
916 fn execute(
917 &self,
918 partition: usize,
919 _context: Arc<TaskContext>,
920 ) -> Result<SendableRecordBatchStream> {
921 Ok(Box::pin(PanicStream {
922 partition,
923 batches_until_panic: self.batches_until_panics[partition],
924 schema: Arc::clone(&self.schema),
925 ready: false,
926 }))
927 }
928}
929
/// Stream used by [`PanicExec`]: yields `batches_until_panic` empty batches
/// (interleaving a self-woken `Pending` before each) and then panics.
#[derive(Debug)]
struct PanicStream {
    /// Partition number, included in the panic message.
    partition: usize,
    /// Remaining batches to emit before panicking.
    batches_until_panic: usize,
    /// Schema of the (empty) batches produced.
    schema: SchemaRef,
    /// Alternation flag: `false` → return `Pending` and self-wake;
    /// `true` → emit a batch.
    ready: bool,
}
945
impl Stream for PanicStream {
    type Item = Result<RecordBatch>;

    /// Alternates between `Pending` (with an immediate self-wake so the
    /// executor polls again) and yielding an empty batch, until
    /// `batches_until_panic` reaches zero — then panics.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.batches_until_panic > 0 {
            if self.ready {
                self.batches_until_panic -= 1;
                self.ready = false;
                let batch = RecordBatch::new_empty(Arc::clone(&self.schema));
                return Poll::Ready(Some(Ok(batch)));
            } else {
                self.ready = true;
                // Wake ourselves so the next poll happens promptly and
                // returns the batch.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        }
        panic!("PanickingStream did panic: {}", self.partition)
    }
}
969
impl RecordBatchStream for PanicStream {
    /// Returns the schema this stream was constructed with.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}