1use std::any::Any;
21use std::collections::HashMap;
22use std::fmt;
23use std::fmt::{Debug, Formatter};
24use std::pin::Pin;
25use std::sync::Arc;
26use std::task::Context;
27
28use crate::ExecutionPlan;
29use crate::common;
30use crate::execution_plan::{Boundedness, EmissionType};
31use crate::memory::MemoryStream;
32use crate::metrics::MetricsSet;
33use crate::stream::RecordBatchStreamAdapter;
34use crate::streaming::PartitionStream;
35use crate::{DisplayAs, DisplayFormatType, PlanProperties};
36
37use arrow::array::{Array, ArrayRef, Int32Array, RecordBatch};
38use arrow_schema::{DataType, Field, Schema, SchemaRef};
39use datafusion_common::{
40 Result, Statistics, assert_or_internal_err, config::ConfigOptions, project_schema,
41};
42use datafusion_execution::{SendableRecordBatchStream, TaskContext};
43use datafusion_physical_expr::equivalence::{
44 OrderingEquivalenceClass, ProjectionMapping,
45};
46use datafusion_physical_expr::expressions::Column;
47use datafusion_physical_expr::utils::collect_columns;
48use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, Partitioning};
49
50use futures::{Future, FutureExt};
51
52pub mod exec;
53
#[derive(Clone, Debug)]
pub struct TestMemoryExec {
    /// The in-memory data: one `Vec<RecordBatch>` per output partition.
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema of the data before any projection is applied.
    schema: SchemaRef,
    /// Schema of the data after the optional projection is applied.
    projected_schema: SchemaRef,
    /// Optional projection: column indices into `schema`.
    projection: Option<Vec<usize>>,
    /// Orderings the output is claimed to satisfy (in terms of `projected_schema`).
    sort_information: Vec<LexOrdering>,
    /// Whether partition sizes are included in `EXPLAIN` output.
    show_sizes: bool,
    /// Optional limit on the number of rows emitted per partition.
    fetch: Option<usize>,
    /// Cached plan properties (equivalences, partitioning, emission, boundedness).
    cache: Arc<PlanProperties>,
}
80
81impl DisplayAs for TestMemoryExec {
82 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
83 write!(f, "DataSourceExec: ")?;
84 match t {
85 DisplayFormatType::Default | DisplayFormatType::Verbose => {
86 let partition_sizes: Vec<_> =
87 self.partitions.iter().map(|b| b.len()).collect();
88
89 let output_ordering = self
90 .sort_information
91 .first()
92 .map(|output_ordering| format!(", output_ordering={output_ordering}"))
93 .unwrap_or_default();
94
95 let eq_properties = self.eq_properties();
96 let constraints = eq_properties.constraints();
97 let constraints = if constraints.is_empty() {
98 String::new()
99 } else {
100 format!(", {constraints}")
101 };
102
103 let limit = self
104 .fetch
105 .map_or(String::new(), |limit| format!(", fetch={limit}"));
106 if self.show_sizes {
107 write!(
108 f,
109 "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
110 partition_sizes.len(),
111 )
112 } else {
113 write!(
114 f,
115 "partitions={}{limit}{output_ordering}{constraints}",
116 partition_sizes.len(),
117 )
118 }
119 }
120 DisplayFormatType::TreeRender => {
121 write!(f, "")
123 }
124 }
125 }
126}
127
128impl ExecutionPlan for TestMemoryExec {
129 fn name(&self) -> &'static str {
130 "DataSourceExec"
131 }
132
133 fn as_any(&self) -> &dyn Any {
134 self
135 }
136
137 fn properties(&self) -> &Arc<PlanProperties> {
138 &self.cache
139 }
140
141 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
142 Vec::new()
143 }
144
145 fn with_new_children(
146 self: Arc<Self>,
147 _: Vec<Arc<dyn ExecutionPlan>>,
148 ) -> Result<Arc<dyn ExecutionPlan>> {
149 Ok(self)
150 }
151
152 fn repartitioned(
153 &self,
154 _target_partitions: usize,
155 _config: &ConfigOptions,
156 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
157 unimplemented!()
158 }
159
160 fn execute(
161 &self,
162 partition: usize,
163 context: Arc<TaskContext>,
164 ) -> Result<SendableRecordBatchStream> {
165 self.open(partition, context)
166 }
167
168 fn metrics(&self) -> Option<MetricsSet> {
169 unimplemented!()
170 }
171
172 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
173 if partition.is_some() {
174 Ok(Statistics::new_unknown(&self.schema))
175 } else {
176 self.statistics_inner()
177 }
178 }
179
180 fn fetch(&self) -> Option<usize> {
181 self.fetch
182 }
183}
184
impl TestMemoryExec {
    /// Opens a stream over the batches of `partition`, applying the stored
    /// projection and `fetch` limit. Panics if `partition` is out of range.
    fn open(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(
            MemoryStream::try_new(
                self.partitions[partition].clone(),
                Arc::clone(&self.projected_schema),
                self.projection.clone(),
            )?
            .with_fetch(self.fetch),
        ))
    }

    /// Builds fresh `PlanProperties` from the current orderings and
    /// partition count.
    fn compute_properties(&self) -> PlanProperties {
        PlanProperties::new(
            self.eq_properties(),
            self.output_partitioning(),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }

    /// One unknown output partition per element of `self.partitions`.
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.partitions.len())
    }

    /// Equivalence properties over the projected schema, seeded with
    /// `sort_information`.
    fn eq_properties(&self) -> EquivalenceProperties {
        EquivalenceProperties::new_with_orderings(
            Arc::clone(&self.projected_schema),
            self.sort_information.clone(),
        )
    }

    /// Computes statistics over all partitions against the original schema,
    /// honoring the projection.
    fn statistics_inner(&self) -> Result<Statistics> {
        Ok(common::compute_record_batch_statistics(
            &self.partitions,
            &self.schema,
            self.projection.clone(),
        ))
    }

    /// Creates a new in-memory source. `projection` holds column indices
    /// into `schema`; errors if the projection is invalid for `schema`.
    pub fn try_new(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        let projected_schema = project_schema(&schema, projection.as_ref())?;
        Ok(Self {
            partitions: partitions.to_vec(),
            schema,
            // The initial cache carries no orderings; `try_new_exec` and
            // `try_with_sort_information` recompute it when needed.
            cache: Arc::new(PlanProperties::new(
                EquivalenceProperties::new_with_orderings(
                    Arc::clone(&projected_schema),
                    Vec::<LexOrdering>::new(),
                ),
                Partitioning::UnknownPartitioning(partitions.len()),
                EmissionType::Incremental,
                Boundedness::Bounded,
            )),
            projected_schema,
            projection,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        })
    }

    /// Like [`Self::try_new`], but returns an `Arc`'d source with a freshly
    /// computed properties cache.
    pub fn try_new_exec(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Arc<TestMemoryExec>> {
        let mut source = Self::try_new(partitions, schema, projection)?;
        let cache = source.compute_properties();
        source.cache = Arc::new(cache);
        Ok(Arc::new(source))
    }

    /// Returns a clone of `source` with its properties cache recomputed
    /// from the current state.
    pub fn update_cache(source: &Arc<TestMemoryExec>) -> TestMemoryExec {
        let cache = source.compute_properties();
        let mut source = (**source).clone();
        source.cache = Arc::new(cache);
        source
    }

    /// Sets the maximum number of rows to read (the `fetch` limit).
    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.fetch = limit;
        self
    }

    /// The in-memory batches, one `Vec` per partition.
    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
        &self.partitions
    }

    /// The optional projection (column indices into the original schema).
    pub fn projection(&self) -> &Option<Vec<usize>> {
        &self.projection
    }

    /// The orderings this source claims its output satisfies.
    pub fn sort_information(&self) -> &[LexOrdering] {
        &self.sort_information
    }

    /// Declares that the data is sorted by `sort_information` (expressed in
    /// terms of the *original* schema) and recomputes the properties cache.
    ///
    /// Errors (internal error) if any referenced column does not match the
    /// original schema by both index and name. If a projection is present,
    /// the orderings are projected onto the projected schema before storing.
    pub fn try_with_sort_information(
        mut self,
        mut sort_information: Vec<LexOrdering>,
    ) -> Result<Self> {
        // Reject any ordering column whose (index, name) pair disagrees with
        // the original (pre-projection) schema.
        let fields = self.schema.fields();
        let ambiguous_column = sort_information
            .iter()
            .flat_map(|ordering| ordering.clone())
            .flat_map(|expr| collect_columns(&expr.expr))
            .find(|col| {
                fields
                    .get(col.index())
                    .map(|field| field.name() != col.name())
                    .unwrap_or(true)
            });
        assert_or_internal_err!(
            ambiguous_column.is_none(),
            "Column {:?} is not found in the original schema of the TestMemoryExec",
            ambiguous_column.as_ref().unwrap()
        );

        // If a projection exists, rewrite the orderings in terms of the
        // projected schema via a projection mapping.
        if let Some(projection) = &self.projection {
            let base_schema = self.original_schema();
            let proj_exprs = projection.iter().map(|idx| {
                let name = base_schema.field(*idx).name();
                (Arc::new(Column::new(name, *idx)) as _, name.to_string())
            });
            let projection_mapping =
                ProjectionMapping::try_new(proj_exprs, &base_schema)?;
            let base_eqp = EquivalenceProperties::new_with_orderings(
                Arc::clone(&base_schema),
                sort_information,
            );
            let proj_eqp =
                base_eqp.project(&projection_mapping, Arc::clone(&self.projected_schema));
            let oeq_class: OrderingEquivalenceClass = proj_eqp.into();
            sort_information = oeq_class.into();
        }

        self.sort_information = sort_information;
        // Keep the cached plan properties in sync with the new orderings.
        self.cache = Arc::new(self.compute_properties());
        Ok(self)
    }

    /// Returns the original (pre-projection) schema.
    pub fn original_schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
350
351pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send + 'a>>) {
353 let waker = futures::task::noop_waker();
354 let mut cx = Context::from_waker(&waker);
355 let poll = fut.poll_unpin(&mut cx);
356
357 assert!(poll.is_pending());
358}
359
360pub fn aggr_test_schema() -> SchemaRef {
362 let mut f1 = Field::new("c1", DataType::Utf8, false);
363 f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
364 let schema = Schema::new(vec![
365 f1,
366 Field::new("c2", DataType::UInt32, false),
367 Field::new("c3", DataType::Int8, false),
368 Field::new("c4", DataType::Int16, false),
369 Field::new("c5", DataType::Int32, false),
370 Field::new("c6", DataType::Int64, false),
371 Field::new("c7", DataType::UInt8, false),
372 Field::new("c8", DataType::UInt16, false),
373 Field::new("c9", DataType::UInt32, false),
374 Field::new("c10", DataType::UInt64, false),
375 Field::new("c11", DataType::Float32, false),
376 Field::new("c12", DataType::Float64, false),
377 Field::new("c13", DataType::Utf8, false),
378 ]);
379
380 Arc::new(schema)
381}
382
383pub fn build_table_i32(
385 a: (&str, &Vec<i32>),
386 b: (&str, &Vec<i32>),
387 c: (&str, &Vec<i32>),
388) -> RecordBatch {
389 let schema = Schema::new(vec![
390 Field::new(a.0, DataType::Int32, false),
391 Field::new(b.0, DataType::Int32, false),
392 Field::new(c.0, DataType::Int32, false),
393 ]);
394
395 RecordBatch::try_new(
396 Arc::new(schema),
397 vec![
398 Arc::new(Int32Array::from(a.1.clone())),
399 Arc::new(Int32Array::from(b.1.clone())),
400 Arc::new(Int32Array::from(c.1.clone())),
401 ],
402 )
403 .unwrap()
404}
405
406pub fn build_table_i32_two_cols(
408 a: (&str, &Vec<i32>),
409 b: (&str, &Vec<i32>),
410) -> RecordBatch {
411 let schema = Schema::new(vec![
412 Field::new(a.0, DataType::Int32, false),
413 Field::new(b.0, DataType::Int32, false),
414 ]);
415
416 RecordBatch::try_new(
417 Arc::new(schema),
418 vec![
419 Arc::new(Int32Array::from(a.1.clone())),
420 Arc::new(Int32Array::from(b.1.clone())),
421 ],
422 )
423 .unwrap()
424}
425
/// Builds a single-partition in-memory scan over one three-column `Int32`
/// batch; each tuple is `(column_name, values)`. Panics (via `unwrap`) if
/// plan construction fails.
pub fn build_table_scan_i32(
    a: (&str, &Vec<i32>),
    b: (&str, &Vec<i32>),
    c: (&str, &Vec<i32>),
) -> Arc<dyn ExecutionPlan> {
    let batch = build_table_i32(a, b, c);
    let schema = batch.schema();
    TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
}
436
437pub fn make_partition(sz: i32) -> RecordBatch {
439 let seq_start = 0;
440 let seq_end = sz;
441 let values = (seq_start..seq_end).collect::<Vec<_>>();
442 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
443 let arr = Arc::new(Int32Array::from(values));
444 let arr = arr as ArrayRef;
445
446 RecordBatch::try_new(schema, vec![arr]).unwrap()
447}
448
449pub fn make_partition_utf8(sz: i32) -> RecordBatch {
450 let seq_start = 0;
451 let seq_end = sz;
452 let values = (seq_start..seq_end)
453 .map(|i| format!("test_long_string_that_is_roughly_42_bytes_{i}"))
454 .collect::<Vec<_>>();
455 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Utf8, true)]));
456 let mut string_array = arrow::array::StringArray::from(values);
457 string_array.shrink_to_fit();
458 let arr = Arc::new(string_array);
459 let arr = arr as ArrayRef;
460
461 RecordBatch::try_new(schema, vec![arr]).unwrap()
462}
463
/// Returns an execution plan scanning `partitions` partitions, each holding
/// one 100-row `Int32` batch (see `make_partition`).
pub fn scan_partitioned(partitions: usize) -> Arc<dyn ExecutionPlan> {
    Arc::new(mem_exec(partitions))
}
468
/// Returns an execution plan scanning `partitions` partitions, each holding
/// one 100-row `Utf8` batch (see `make_partition_utf8`).
pub fn scan_partitioned_utf8(partitions: usize) -> Arc<dyn ExecutionPlan> {
    Arc::new(mem_exec_utf8(partitions))
}
472
473pub fn mem_exec(partitions: usize) -> TestMemoryExec {
475 let data: Vec<Vec<_>> = (0..partitions).map(|_| vec![make_partition(100)]).collect();
476
477 let schema = data[0][0].schema();
478 let projection = None;
479
480 TestMemoryExec::try_new(&data, schema, projection).unwrap()
481}
482
483pub fn mem_exec_utf8(partitions: usize) -> TestMemoryExec {
484 let data: Vec<Vec<_>> = (0..partitions)
485 .map(|_| vec![make_partition_utf8(100)])
486 .collect();
487
488 let schema = data[0][0].schema();
489 let projection = None;
490
491 TestMemoryExec::try_new(&data, schema, projection).unwrap()
492}
493
#[derive(Debug)]
pub struct TestPartitionStream {
    /// Schema shared by every batch in `batches`.
    pub schema: SchemaRef,
    /// The batches this stream will emit, in order.
    pub batches: Vec<RecordBatch>,
}
500
501impl TestPartitionStream {
502 pub fn new_with_batches(batches: Vec<RecordBatch>) -> Self {
504 let schema = batches[0].schema();
505 Self { schema, batches }
506 }
507}
508impl PartitionStream for TestPartitionStream {
509 fn schema(&self) -> &SchemaRef {
510 &self.schema
511 }
512 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
513 let stream = futures::stream::iter(self.batches.clone().into_iter().map(Ok));
514 Box::pin(RecordBatchStreamAdapter::new(
515 Arc::clone(&self.schema),
516 stream,
517 ))
518 }
519}
520
#[cfg(test)]
/// Asserts that a join's metrics are self-consistent: the output row count
/// matches `$expected_rows`, and `join_time + build_time` does not exceed
/// the total `elapsed_compute` time (both are components of it).
macro_rules! assert_join_metrics {
    ($metrics:expr, $expected_rows:expr) => {
        assert_eq!($metrics.output_rows().unwrap(), $expected_rows);

        let elapsed_compute = $metrics
            .elapsed_compute()
            .expect("did not find elapsed_compute metric");
        let join_time = $metrics
            .sum_by_name("join_time")
            .expect("did not find join_time metric")
            .as_usize();
        let build_time = $metrics
            .sum_by_name("build_time")
            .expect("did not find build_time metric")
            .as_usize();
        // The failure message previously claimed the sum "was <=" the total,
        // which is the opposite of what a failure means; it now reports the
        // actual violation.
        assert!(
            join_time + build_time <= elapsed_compute,
            "join_time ({}) + build_time ({}) = {} exceeded elapsed_compute = {}",
            join_time,
            build_time,
            join_time + build_time,
            elapsed_compute
        );
    };
}
548#[cfg(test)]
549pub(crate) use assert_join_metrics;