use std::{
    any::Any,
    pin::Pin,
    sync::{Arc, Weak},
    task::{Context, Poll},
};

use crate::{
    common, execution_plan::Boundedness, DisplayAs, DisplayFormatType, ExecutionPlan,
    Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream,
    Statistics,
};
use crate::{
    execution_plan::EmissionType,
    stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter},
};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;

use futures::Stream;
use tokio::sync::Barrier;

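/// Index into the data that has been returned so far, shared between the
/// producing [`TestStream`] and the test code via an `Arc<Mutex<usize>>`.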
#[derive(Debug, Default, Clone)]
pub struct BatchIndex {
    inner: Arc<std::sync::Mutex<usize>>,
}

impl BatchIndex {
    /// Return the current index.
    pub fn value(&self) -> usize {
        let inner = self.inner.lock().unwrap();
        *inner
    }

    /// Increment the current index by one.
    pub fn incr(&self) {
        let mut inner = self.inner.lock().unwrap();
        *inner += 1;
    }
}

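/// A [`RecordBatchStream`] that yields a fixed set of pre-built batches and
/// exposes a shared [`BatchIndex`] so tests can observe how many batches have
/// been consumed.
///
/// Minimal usage sketch (not a doctest; assumes the stream is polled by a
/// consumer running on a Tokio runtime):
///
/// ```ignore
/// let schema = Arc::new(Schema::new(vec![Field::new("c", DataType::Int64, true)]));
/// let batch = RecordBatch::new_empty(Arc::clone(&schema));
/// let stream = TestStream::new(vec![batch.clone(), batch]);
/// let index = stream.index();
/// // ... after the consumer has polled one batch:
/// assert_eq!(index.value(), 1);
/// ```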
#[derive(Debug, Default)]
pub struct TestStream {
    /// Batches to be returned, in order.
    data: Vec<RecordBatch>,
    /// Index of the next batch to return.
    index: BatchIndex,
}

impl TestStream {
    /// Create a new stream that returns the given batches in order.
    pub fn new(data: Vec<RecordBatch>) -> Self {
        Self {
            data,
            ..Default::default()
        }
    }

    /// Return a handle to the shared index that tracks how many batches have
    /// been returned so far.
    pub fn index(&self) -> BatchIndex {
        self.index.clone()
    }
}

impl Stream for TestStream {
    type Item = Result<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let next_batch = self.index.value();

        Poll::Ready(if next_batch < self.data.len() {
            self.index.incr();
            Some(Ok(self.data[next_batch].clone()))
        } else {
            None
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.data.len(), Some(self.data.len()))
    }
}

impl RecordBatchStream for TestStream {
    /// Get the schema from the first batch; `TestStream` requires at least
    /// one batch.
    fn schema(&self) -> SchemaRef {
        self.data[0].schema()
    }
}

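/// A mock [`ExecutionPlan`] with a single partition that returns a pre-defined
/// sequence of `Ok(RecordBatch)` / `Err` results. By default the results are
/// sent through a separately spawned task (via [`RecordBatchReceiverStream`]);
/// `with_use_task(false)` returns them from a plain in-line stream instead.
///
/// Usage sketch (not a doctest; assumes a Tokio runtime, that `TaskContext`
/// implements `Default`, and that `common::collect` is available to drain the
/// stream):
///
/// ```ignore
/// let schema = Arc::new(Schema::new(vec![Field::new("c", DataType::Int64, true)]));
/// let batch = RecordBatch::new_empty(Arc::clone(&schema));
/// let exec = MockExec::new(vec![Ok(batch)], schema);
/// let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
/// let batches = common::collect(stream).await?;
/// assert_eq!(batches.len(), 1);
/// ```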
#[derive(Debug)]
pub struct MockExec {
    /// The pre-defined results to return, in order.
    data: Vec<Result<RecordBatch>>,
    schema: SchemaRef,
    /// If true (the default), send the data from a separately spawned task.
    use_task: bool,
    cache: PlanProperties,
}

impl MockExec {
    /// Create a new `MockExec` with a single partition that returns the
    /// provided `data` in order.
    pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
        let cache = Self::compute_properties(Arc::clone(&schema));
        Self {
            data,
            schema,
            use_task: true,
            cache,
        }
    }

    /// If `use_task` is true (the default), the batches are sent to the
    /// output stream from a separately spawned task; otherwise they are
    /// returned directly from an in-line stream.
    pub fn with_use_task(mut self, use_task: bool) -> Self {
        self.use_task = use_task;
        self
    }

    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for MockExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "MockExec")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for MockExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Returns a stream that yields the pre-defined data; only partition 0
    /// is valid.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        assert_eq!(partition, 0);

        // `DataFusionError` is not `Clone`, so clone the results manually.
        let data: Vec<_> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();

        if self.use_task {
            let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);
            // Send the batches from a separately spawned task, mimicking an
            // operator that produces data asynchronously.
            let tx = builder.tx();
            builder.spawn(async move {
                for batch in data {
                    println!("Sending batch via delayed stream");
                    if let Err(e) = tx.send(batch).await {
                        println!("ERROR sending batch via delayed stream: {e}");
                    }
                }

                Ok(())
            });
            // The returned stream simply reads off the rx stream.
            Ok(builder.build())
        } else {
            // Return the batches directly from an in-line stream.
            let stream = futures::stream::iter(data);
            Ok(Box::pin(RecordBatchStreamAdapter::new(
                self.schema(),
                stream,
            )))
        }
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema));
        }
        let data: Result<Vec<_>> = self
            .data
            .iter()
            .map(|r| match r {
                Ok(batch) => Ok(batch.clone()),
                Err(e) => Err(clone_error(e)),
            })
            .collect();

        let data = data?;

        Ok(common::compute_record_batch_statistics(
            &[data],
            &self.schema,
            None,
        ))
    }
}

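/// Best-effort "clone" of a [`DataFusionError`] for use in test data; only
/// `Execution` errors are supported.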
fn clone_error(e: &DataFusionError) -> DataFusionError {
    use DataFusionError::*;
    match e {
        Execution(msg) => Execution(msg.to_string()),
        _ => unimplemented!(),
    }
}

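/// An [`ExecutionPlan`] with one partition per inner `Vec<RecordBatch>` that
/// emits no data until every partition, plus the test (via
/// [`BarrierExec::wait`]), has reached a shared [`Barrier`]. Useful for
/// forcing all partitions to start producing output at the same time.
///
/// Usage sketch (not a doctest; assumes pre-built batch vectors `part0` and
/// `part1`, a `TaskContext` named `ctx`, and a Tokio runtime):
///
/// ```ignore
/// let exec = Arc::new(BarrierExec::new(vec![part0, part1], schema));
/// // Spawn consumers that drain exec.execute(0, ctx) / exec.execute(1, ctx),
/// // then release all partitions at once:
/// exec.wait().await;
/// ```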
#[derive(Debug)]
pub struct BarrierExec {
    /// Batches to produce, one `Vec<RecordBatch>` per partition.
    data: Vec<Vec<RecordBatch>>,
    schema: SchemaRef,

    /// Barrier that all partitions (and the test) wait on before any data
    /// is sent.
    barrier: Arc<Barrier>,
    cache: PlanProperties,
}

impl BarrierExec {
    /// Create a new `BarrierExec` with one partition per entry in `data`.
    pub fn new(data: Vec<Vec<RecordBatch>>, schema: SchemaRef) -> Self {
        // One barrier slot per partition, plus one for the call to `wait()`.
        let barrier = Arc::new(Barrier::new(data.len() + 1));
        let cache = Self::compute_properties(Arc::clone(&schema), &data);
        Self {
            data,
            schema,
            barrier,
            cache,
        }
    }

    /// Wait until all the input streams and this function have reached the
    /// barrier, releasing every partition at once.
    pub async fn wait(&self) {
        println!("BarrierExec::wait waiting on barrier");
        self.barrier.wait().await;
        println!("BarrierExec::wait done waiting");
    }

    fn compute_properties(
        schema: SchemaRef,
        data: &[Vec<RecordBatch>],
    ) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(data.len()),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for BarrierExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "BarrierExec")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for BarrierExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Returns a stream that waits on the shared barrier before sending any
    /// batches for the given partition.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        assert!(partition < self.data.len());

        let mut builder = RecordBatchReceiverStream::builder(self.schema(), 2);

        // Send the batches from a task that first waits on the barrier.
        let data = self.data[partition].clone();
        let b = Arc::clone(&self.barrier);
        let tx = builder.tx();
        builder.spawn(async move {
            println!("Partition {partition} waiting on barrier");
            b.wait().await;
            for batch in data {
                println!("Partition {partition} sending batch");
                if let Err(e) = tx.send(Ok(batch)).await {
                    println!("ERROR sending batch via barrier stream: {e}");
                }
            }

            Ok(())
        });

        // The returned stream simply reads off the rx stream.
        Ok(builder.build())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            return Ok(Statistics::new_unknown(&self.schema));
        }
        Ok(common::compute_record_batch_statistics(
            &self.data,
            &self.schema,
            None,
        ))
    }
}

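/// An [`ExecutionPlan`] with a single nullable `Int64` column whose
/// `execute()` always returns an internal error, for testing error
/// propagation.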
#[derive(Debug)]
pub struct ErrorExec {
    cache: PlanProperties,
}

impl Default for ErrorExec {
    fn default() -> Self {
        Self::new()
    }
}

impl ErrorExec {
    pub fn new() -> Self {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "dummy",
            DataType::Int64,
            true,
        )]));
        let cache = Self::compute_properties(schema);
        Self { cache }
    }

    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for ErrorExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "ErrorExec")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for ErrorExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        internal_err!("ErrorExec, unsurprisingly, errored in partition {partition}")
    }
}

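/// An [`ExecutionPlan`] that reports pre-computed [`Statistics`] and serves no
/// data; `execute()` is intentionally unimplemented. It advertises two
/// (unknown) partitions.
///
/// Usage sketch (not a doctest; uses only items already imported by this
/// module):
///
/// ```ignore
/// let schema = Schema::new(vec![Field::new("c", DataType::Int64, true)]);
/// let exec = StatisticsExec::new(Statistics::new_unknown(&schema), schema);
/// assert_eq!(exec.partition_statistics(None)?.column_statistics.len(), 1);
/// ```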
#[derive(Debug, Clone)]
pub struct StatisticsExec {
    stats: Statistics,
    schema: Arc<Schema>,
    cache: PlanProperties,
}

impl StatisticsExec {
    pub fn new(stats: Statistics, schema: Schema) -> Self {
        assert_eq!(
            stats.column_statistics.len(),
            schema.fields().len(),
            "if defined, the column statistics vector length should be the number of fields"
        );
        let cache = Self::compute_properties(Arc::new(schema.clone()));
        Self {
            stats,
            schema: Arc::new(schema),
            cache,
        }
    }

    fn compute_properties(schema: SchemaRef) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(2),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for StatisticsExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "StatisticsExec: col_count={}, row_count={:?}",
                    self.schema.fields().len(),
                    self.stats.num_rows,
                )
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for StatisticsExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        unimplemented!("This plan only serves for testing statistics")
    }

    fn statistics(&self) -> Result<Statistics> {
        Ok(self.stats.clone())
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        Ok(if partition.is_some() {
            Statistics::new_unknown(&self.schema)
        } else {
            self.stats.clone()
        })
    }
}

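/// An [`ExecutionPlan`] whose streams never produce data: `poll_next` always
/// returns `Poll::Pending`. The [`BlockingExec::refs`] handle lets tests
/// verify that every stream has been dropped, e.g. after cancellation.
///
/// Usage sketch (not a doctest; assumes a `SchemaRef` named `schema`, a
/// `TaskContext` named `ctx`, and a Tokio runtime):
///
/// ```ignore
/// let exec = BlockingExec::new(schema, 1);
/// let refs = exec.refs();
/// let stream = exec.execute(0, ctx)?;
/// drop(stream);
/// drop(exec);
/// assert_strong_count_converges_to_zero(refs).await;
/// ```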
#[derive(Debug)]
pub struct BlockingExec {
    schema: SchemaRef,

    /// Reference counter shared with every stream produced by this node;
    /// see [`Self::refs`].
    refs: Arc<()>,
    cache: PlanProperties,
}

impl BlockingExec {
    /// Create a new `BlockingExec` with the given schema and number of
    /// partitions.
    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
        let cache = Self::compute_properties(Arc::clone(&schema), n_partitions);
        Self {
            schema,
            refs: Default::default(),
            cache,
        }
    }

    /// Weak pointer that can be used to check whether this plan and the
    /// streams it produced are still in memory (i.e. have not been dropped
    /// yet).
    pub fn refs(&self) -> Weak<()> {
        Arc::downgrade(&self.refs)
    }

    fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(n_partitions),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for BlockingExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "BlockingExec")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for BlockingExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        internal_err!("Children cannot be replaced in {self:?}")
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(BlockingStream {
            schema: Arc::clone(&self.schema),
            _refs: Arc::clone(&self.refs),
        }))
    }
}

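/// A [`RecordBatchStream`] that is pending forever; used by [`BlockingExec`].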
#[derive(Debug)]
pub struct BlockingStream {
    schema: SchemaRef,

    /// Keeps the shared reference counter of [`BlockingExec`] alive while
    /// this stream exists.
    _refs: Arc<()>,
}

impl Stream for BlockingStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        Poll::Pending
    }
}

impl RecordBatchStream for BlockingStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

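/// Asserts that the strong count behind the given [`Weak`] pointer converges
/// to zero within 10 seconds, polling every 10 ms; panics (via `unwrap`) if
/// the timeout is reached first.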
pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
    tokio::time::timeout(std::time::Duration::from_secs(10), async {
        loop {
            if dbg!(Weak::strong_count(&refs)) == 0 {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    })
    .await
    .unwrap();
}

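/// An [`ExecutionPlan`] whose partitions yield a configurable number of empty
/// batches and then panic; useful for testing panic propagation and
/// cancellation. By default every partition panics on its first poll.
///
/// Configuration sketch (not a doctest; uses only items defined in this
/// module):
///
/// ```ignore
/// // Partition 1 yields 3 empty batches before panicking; partition 0
/// // panics as soon as it is polled.
/// let exec = PanicExec::new(schema, 2).with_partition_panic(1, 3);
/// ```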
#[derive(Debug)]
pub struct PanicExec {
    /// Schema that is mocked by this plan.
    schema: SchemaRef,

    /// Number of batches each partition yields before panicking
    /// (`0` = panic on the first poll).
    batches_until_panics: Vec<usize>,
    cache: PlanProperties,
}

impl PanicExec {
    /// Create a new `PanicExec` where every partition panics immediately,
    /// i.e. yields zero batches before panicking.
    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
        let batches_until_panics = vec![0; n_partitions];
        let cache = Self::compute_properties(Arc::clone(&schema), &batches_until_panics);
        Self {
            schema,
            batches_until_panics,
            cache,
        }
    }

    /// Set the number of batches that the given partition yields before
    /// panicking.
    pub fn with_partition_panic(mut self, partition: usize, count: usize) -> Self {
        self.batches_until_panics[partition] = count;
        self
    }

    fn compute_properties(
        schema: SchemaRef,
        batches_until_panics: &[usize],
    ) -> PlanProperties {
        let num_partitions = batches_until_panics.len();
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(num_partitions),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl DisplayAs for PanicExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "PanicExec")
            }
            DisplayFormatType::TreeRender => {
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for PanicExec {
    fn name(&self) -> &'static str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        internal_err!("Children cannot be replaced in {self:?}")
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(PanicStream {
            partition,
            batches_until_panic: self.batches_until_panics[partition],
            schema: Arc::clone(&self.schema),
            ready: false,
        }))
    }
}

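/// A [`RecordBatchStream`] that yields on every other poll (returning
/// `Pending` in between to exercise waker handling) until it has produced
/// `batches_until_panic` empty batches, and then panics.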
#[derive(Debug)]
struct PanicStream {
    /// Which partition this stream was created for (used in the panic
    /// message).
    partition: usize,
    /// How many empty batches to yield before panicking.
    batches_until_panic: usize,
    /// Schema mocked by this stream.
    schema: SchemaRef,
    /// Whether the next `poll_next` should return `Ready` (alternates with
    /// `Pending`).
    ready: bool,
}

impl Stream for PanicStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.batches_until_panic > 0 {
            if self.ready {
                self.batches_until_panic -= 1;
                self.ready = false;
                let batch = RecordBatch::new_empty(Arc::clone(&self.schema));
                return Poll::Ready(Some(Ok(batch)));
            } else {
                self.ready = true;
                // Wake the task so this stream is polled again.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        }
        panic!("PanicStream did panic: {}", self.partition)
    }
}

impl RecordBatchStream for PanicStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}