use std::{
    any::Any,
    pin::Pin,
    sync::{Arc, Weak},
    task::{Context, Poll},
};

use crate::{
    common, execution_plan::Boundedness, DisplayAs, DisplayFormatType, ExecutionPlan,
    Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
    Statistics,
};
use crate::{
    execution_plan::EmissionType,
    stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;

use futures::Stream;
use tokio::sync::Barrier;

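/// Index into the data that has been returned so far, shared between the
/// [`TestStream`] and the test code so progress can be observed externally.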
#[derive(Debug, Default, Clone)]
pub struct BatchIndex {
    inner: Arc<std::sync::Mutex<usize>>,
}

impl BatchIndex {
    /// Return the current index value.
    pub fn value(&self) -> usize {
        let inner = self.inner.lock().unwrap();
        *inner
    }

    /// Advance the index by one.
    pub fn incr(&self) {
        let mut inner = self.inner.lock().unwrap();
        *inner += 1;
    }
}

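/// A [`Stream`] of [`RecordBatch`]es backed by an in-memory `Vec`, exposing a
/// shared [`BatchIndex`] so tests can observe how many batches have been
/// returned.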
#[derive(Debug, Default)]
pub struct TestStream {
    /// Batches to be returned, in order.
    data: Vec<RecordBatch>,
    /// Index of the next batch to return.
    index: BatchIndex,
}

impl TestStream {
    /// Create a new stream that returns the given batches, in order.
    pub fn new(data: Vec<RecordBatch>) -> Self {
        Self {
            data,
            ..Default::default()
        }
    }

    /// Return a handle to the shared index so the caller can track progress.
    pub fn index(&self) -> BatchIndex {
        self.index.clone()
    }
}

impl Stream for TestStream {
    type Item = Result<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let next_batch = self.index.value();

        Poll::Ready(if next_batch < self.data.len() {
            self.index.incr();
            Some(Ok(self.data[next_batch].clone()))
        } else {
            None
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.data.len(), Some(self.data.len()))
    }
}

impl RecordBatchStream for TestStream {
    fn schema(&self) -> SchemaRef {
        self.data[0].schema()
    }
}

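/// A mock [`ExecutionPlan`] with a single partition that returns a fixed set
/// of `Result<RecordBatch>`es, useful for testing how other plans handle both
/// data and errors from their inputs.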
#[derive(Debug)]
pub struct MockExec {
    /// The data (and errors) to return for the single partition.
    data: Vec<Result<RecordBatch>>,
    schema: SchemaRef,
    /// If true (the default), batches are sent from a spawned task over a
    /// bounded channel; if false, they are returned directly as a stream.
    use_task: bool,
    cache: PlanProperties,
}

impl MockExec {
    /// Create a new `MockExec` with a single partition that returns the
    /// provided `data`.
    pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
        let cache = Self::compute_properties(Arc::clone(&schema));
        Self {
            data,
            schema,
            use_task: true,
            cache,
        }
    }

    /// If `use_task` is true (the default), the batches are sent from a
    /// separately spawned task to more closely mimic an asynchronous input;
    /// otherwise they are returned directly from `execute`.
    pub fn with_use_task(mut self, use_task: bool) -> Self {
        self.use_task = use_task;
        self
    }

    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for MockExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "MockExec")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for MockExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Returns a stream which yields this plan's data.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        assert_eq!(partition, 0);

        // `Result<RecordBatch>` does not implement `Clone`, so clone manually
        let data: Vec<_> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();

        if self.use_task {
            let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
            // Send the batches from a spawned task over a bounded channel
            let tx = builder.tx();
            builder.spawn(async move {
                for batch in data {
                    println!("Sending batch via delayed stream");
                    if let Err(e) = tx.send(batch).await {
                        println!("ERROR batch via delayed stream: {e}");
                    }
                }

                Ok(())
            });
            // The returned stream simply reads off the rx channel
            Ok(builder.build())
        } else {
            // Return the batches directly, without spawning a task
            let stream = futures::stream::iter(data);
            Ok(Box::pin(RecordBatchStreamAdapter::new(
                self.schema(),
                stream,
            )))
        }
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema));
        }
        let data: Result<Vec<_>> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();

        let data = data?;

        Ok(common::compute_record_batch_statistics(
            &[data],
            &self.schema,
            None,
        ))
    }
}

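/// Best-effort clone of a [`DataFusionError`]: only the `Execution` variant
/// is supported; all other variants are `unimplemented!`.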
fn clone_error(e: &DataFusionError) -> DataFusionError {
    use DataFusionError::*;
    match e {
        Execution(msg) => Execution(msg.to_string()),
        _ => unimplemented!(),
    }
}

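/// A mock [`ExecutionPlan`] that, on each partition, waits on a shared
/// [`Barrier`] before producing any batches, so a test can control exactly
/// when data starts flowing.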
#[derive(Debug)]
pub struct BarrierExec {
    /// Batches to produce, one `Vec` per partition.
    data: Vec<Vec<RecordBatch>>,
    schema: SchemaRef,

    /// All partition streams (plus the caller of [`Self::wait`]) wait on this
    /// barrier before producing anything.
    barrier: Arc<Barrier>,
    cache: PlanProperties,
}

impl BarrierExec {
    pub fn new(data: Vec<Vec<RecordBatch>>, schema: SchemaRef) -> Self {
        // One barrier slot per partition plus one for the caller of `wait`
        let barrier = Arc::new(Barrier::new(data.len() + 1));
        let cache = Self::compute_properties(Arc::clone(&schema), &data);
        Self {
            data,
            schema,
            barrier,
            cache,
        }
    }

    /// Wait until all partition streams and this function have arrived at the
    /// barrier, releasing the partitions to produce their data.
    pub async fn wait(&self) {
        println!("BarrierExec::wait waiting on barrier");
        self.barrier.wait().await;
        println!("BarrierExec::wait done waiting");
    }

    fn compute_properties(
        schema: SchemaRef,
        data: &[Vec<RecordBatch>],
    ) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(data.len()),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for BarrierExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "BarrierExec")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for BarrierExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Returns a stream which yields this partition's data once the barrier
    /// has been released.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        assert!(partition < self.data.len());

        let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);

        // Spawn a task that waits on the barrier and then sends this
        // partition's batches over the channel
        let data = self.data[partition].clone();
        let b = Arc::clone(&self.barrier);
        let tx = builder.tx();
        builder.spawn(async move {
            println!("Partition {partition} waiting on barrier");
            b.wait().await;
            for batch in data {
                println!("Partition {partition} sending batch");
                if let Err(e) = tx.send(Ok(batch)).await {
                    println!("ERROR batch via barrier stream: {e}");
                }
            }

            Ok(())
        });

        // The returned stream simply reads off the rx channel
        Ok(builder.build())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema));
        }
        Ok(common::compute_record_batch_statistics(
            &self.data,
            &self.schema,
            None,
        ))
    }
}

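/// A mock [`ExecutionPlan`] that always returns an error from `execute`.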
#[derive(Debug)]
pub struct ErrorExec {
    cache: PlanProperties,
}

impl Default for ErrorExec {
    fn default() -> Self {
        Self::new()
    }
}

impl ErrorExec {
    pub fn new() -> Self {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "dummy",
            DataType::Int64,
            true,
        )]));
        let cache = Self::compute_properties(schema);
        Self { cache }
    }

    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for ErrorExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "ErrorExec")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for ErrorExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        internal_err!("ErrorExec, unsurprisingly, errored in partition {partition}")
    }
}

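/// A mock [`ExecutionPlan`] that simply returns the provided statistics;
/// calling `execute` is unimplemented.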
#[derive(Debug, Clone)]
pub struct StatisticsExec {
    stats: Statistics,
    schema: Arc<Schema>,
    cache: PlanProperties,
}

impl StatisticsExec {
    pub fn new(stats: Statistics, schema: Schema) -> Self {
        assert_eq!(
            stats.column_statistics.len(),
            schema.fields().len(),
            "if defined, the column statistics vector length should be the number of fields"
        );
        let cache = Self::compute_properties(Arc::new(schema.clone()));
        Self {
            stats,
            schema: Arc::new(schema),
            cache,
        }
    }

    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(2),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for StatisticsExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "StatisticsExec: col_count={}, row_count={:?}",
                    self.schema.fields().len(),
                    self.stats.num_rows,
                )
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for StatisticsExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        unimplemented!("This plan only serves for testing statistics")
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(self.stats.clone())
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        Ok(if partition.is_some() {
            Statistics::new_unknown(&self.schema)
        } else {
            self.stats.clone()
        })
    }
}

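/// A mock [`ExecutionPlan`] whose partitions return streams that never
/// produce data and never complete ([`BlockingStream`]), useful for testing
/// cancellation and resource cleanup.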
#[derive(Debug)]
pub struct BlockingExec {
    schema: SchemaRef,

    /// Ref-counting helper used to check whether the plan and the produced
    /// streams are still in memory.
    refs: Arc<()>,
    cache: PlanProperties,
}

impl BlockingExec {
    /// Create a new [`BlockingExec`] with the given schema and number of
    /// partitions.
    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
        let cache = Self::compute_properties(Arc::clone(&schema), n_partitions);
        Self {
            schema,
            refs: Default::default(),
            cache,
        }
    }

    /// Weak pointer that can be used to check whether the plan and the
    /// produced streams are still in memory.
    pub fn refs(&self) -> Weak<()> {
        Arc::downgrade(&self.refs)
    }

    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(n_partitions),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for BlockingExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "BlockingExec")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for BlockingExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        internal_err!("Children cannot be replaced in {self:?}")
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(BlockingStream {
            schema: Arc::clone(&self.schema),
            _refs: Arc::clone(&self.refs),
        }))
    }
}

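/// A [`RecordBatchStream`] that is pending forever: polling it always
/// returns `Poll::Pending`.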
#[derive(Debug)]
pub struct BlockingStream {
    schema: SchemaRef,

    /// Keeps the parent [`BlockingExec`]'s ref-count alive so tests can
    /// detect when the stream has been dropped.
    _refs: Arc<()>,
}

impl Stream for BlockingStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        Poll::Pending
    }
}

impl RecordBatchStream for BlockingStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

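/// Asserts that the strong count of the given [`Weak`] pointer converges to
/// zero within 10 seconds, polling every 10 milliseconds.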
pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
    tokio::time::timeout(std::time::Duration::from_secs(10), async {
        loop {
            if dbg!(Weak::strong_count(&refs)) == 0 {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    })
    .await
    .unwrap();
}

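/// A mock [`ExecutionPlan`] whose partitions return [`PanicStream`]s that
/// panic after producing a configurable number of empty batches.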
#[derive(Debug)]
pub struct PanicExec {
    /// Schema that is mocked by this plan.
    schema: SchemaRef,

    /// Number of output partitions. Each partition produces this many empty
    /// record batches before panicking.
    batches_until_panics: Vec<usize>,
    cache: PlanProperties,
}

impl PanicExec {
    /// Create a new [`PanicExec`] with the given schema and number of
    /// partitions, each of which panics on its first poll.
    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
        let batches_until_panics = vec![0; n_partitions];
        let cache = Self::compute_properties(Arc::clone(&schema), &batches_until_panics);
        Self {
            schema,
            batches_until_panics,
            cache,
        }
    }

    /// Set the number of batches the given partition produces before
    /// panicking.
    pub fn with_partition_panic(mut self, partition: usize, count: usize) -> Self {
        self.batches_until_panics[partition] = count;
        self
    }

    fn compute_properties(
        schema: SchemaRef,
        batches_until_panics: &[usize],
    ) -> PlanProperties {
        let num_partitions = batches_until_panics.len();
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(num_partitions),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for PanicExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "PanicExec")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for PanicExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        internal_err!("Children cannot be replaced in {:?}", self)
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(PanicStream {
            partition,
            batches_until_panic: self.batches_until_panics[partition],
            schema: Arc::clone(&self.schema),
            ready: false,
        }))
    }
}

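/// A [`RecordBatchStream`] that yields an empty batch on every other poll
/// and panics after `batches_until_panic` batches have been produced.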
#[derive(Debug)]
struct PanicStream {
    /// Which partition this stream serves (used in the panic message).
    partition: usize,
    /// How many batches to produce before panicking.
    batches_until_panic: usize,
    /// Schema of the (empty) batches that are produced.
    schema: SchemaRef,
    /// Toggles on every poll, so a batch is only emitted after a
    /// `Poll::Pending` round trip.
    ready: bool,
}

impl Stream for PanicStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.batches_until_panic > 0 {
            if self.ready {
                self.batches_until_panic -= 1;
                self.ready = false;
                let batch = RecordBatch::new_empty(Arc::clone(&self.schema));
                return Poll::Ready(Some(Ok(batch)));
            } else {
                self.ready = true;
                // Wake ourselves so we get polled again and produce the batch
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        }
        panic!("PanickingStream did panic: {}", self.partition)
    }
}

impl RecordBatchStream for PanicStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}