1use std::collections::HashMap;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::pin::Pin;
24use std::sync::Arc;
25use std::task::Context;
26
27use crate::ExecutionPlan;
28use crate::common;
29use crate::execution_plan::{Boundedness, EmissionType};
30use crate::memory::MemoryStream;
31use crate::metrics::MetricsSet;
32use crate::stream::RecordBatchStreamAdapter;
33use crate::streaming::PartitionStream;
34use crate::{DisplayAs, DisplayFormatType, PlanProperties};
35
36use arrow::array::{Array, ArrayRef, Int32Array, RecordBatch};
37use arrow_schema::{DataType, Field, Schema, SchemaRef};
38use datafusion_common::{
39 Result, Statistics, assert_or_internal_err, config::ConfigOptions, project_schema,
40};
41use datafusion_execution::{SendableRecordBatchStream, TaskContext};
42use datafusion_physical_expr::equivalence::{
43 OrderingEquivalenceClass, ProjectionMapping,
44};
45use datafusion_physical_expr::expressions::Column;
46use datafusion_physical_expr::utils::collect_columns;
47use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, Partitioning};
48
49use futures::{Future, FutureExt};
50
51pub mod exec;
52
53#[derive(Clone, Debug)]
61pub struct TestMemoryExec {
62 partitions: Vec<Vec<RecordBatch>>,
64 schema: SchemaRef,
66 projected_schema: SchemaRef,
68 projection: Option<Vec<usize>>,
70 sort_information: Vec<LexOrdering>,
72 show_sizes: bool,
74 fetch: Option<usize>,
77 cache: Arc<PlanProperties>,
78}
79
80impl DisplayAs for TestMemoryExec {
81 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
82 write!(f, "DataSourceExec: ")?;
83 match t {
84 DisplayFormatType::Default | DisplayFormatType::Verbose => {
85 let partition_sizes: Vec<_> =
86 self.partitions.iter().map(|b| b.len()).collect();
87
88 let output_ordering = self
89 .sort_information
90 .first()
91 .map(|output_ordering| format!(", output_ordering={output_ordering}"))
92 .unwrap_or_default();
93
94 let eq_properties = self.eq_properties();
95 let constraints = eq_properties.constraints();
96 let constraints = if constraints.is_empty() {
97 String::new()
98 } else {
99 format!(", {constraints}")
100 };
101
102 let limit = self
103 .fetch
104 .map_or(String::new(), |limit| format!(", fetch={limit}"));
105 if self.show_sizes {
106 write!(
107 f,
108 "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
109 partition_sizes.len(),
110 )
111 } else {
112 write!(
113 f,
114 "partitions={}{limit}{output_ordering}{constraints}",
115 partition_sizes.len(),
116 )
117 }
118 }
119 DisplayFormatType::TreeRender => {
120 write!(f, "")
122 }
123 }
124 }
125}
126
127impl ExecutionPlan for TestMemoryExec {
128 fn name(&self) -> &'static str {
129 "DataSourceExec"
130 }
131
132 fn properties(&self) -> &Arc<PlanProperties> {
133 &self.cache
134 }
135
136 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
137 Vec::new()
138 }
139
140 fn with_new_children(
141 self: Arc<Self>,
142 _: Vec<Arc<dyn ExecutionPlan>>,
143 ) -> Result<Arc<dyn ExecutionPlan>> {
144 Ok(self)
145 }
146
147 fn repartitioned(
148 &self,
149 _target_partitions: usize,
150 _config: &ConfigOptions,
151 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
152 unimplemented!()
153 }
154
155 fn execute(
156 &self,
157 partition: usize,
158 context: Arc<TaskContext>,
159 ) -> Result<SendableRecordBatchStream> {
160 self.open(partition, context)
161 }
162
163 fn metrics(&self) -> Option<MetricsSet> {
164 unimplemented!()
165 }
166
167 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
168 if partition.is_some() {
169 Ok(Arc::new(Statistics::new_unknown(&self.schema)))
170 } else {
171 Ok(Arc::new(self.statistics_inner()?))
172 }
173 }
174
175 fn fetch(&self) -> Option<usize> {
176 self.fetch
177 }
178}
179
180impl TestMemoryExec {
181 fn open(
182 &self,
183 partition: usize,
184 _context: Arc<TaskContext>,
185 ) -> Result<SendableRecordBatchStream> {
186 Ok(Box::pin(
187 MemoryStream::try_new(
188 self.partitions[partition].clone(),
189 Arc::clone(&self.projected_schema),
190 self.projection.clone(),
191 )?
192 .with_fetch(self.fetch),
193 ))
194 }
195
196 fn compute_properties(&self) -> PlanProperties {
197 PlanProperties::new(
198 self.eq_properties(),
199 self.output_partitioning(),
200 EmissionType::Incremental,
201 Boundedness::Bounded,
202 )
203 }
204
205 fn output_partitioning(&self) -> Partitioning {
206 Partitioning::UnknownPartitioning(self.partitions.len())
207 }
208
209 fn eq_properties(&self) -> EquivalenceProperties {
210 EquivalenceProperties::new_with_orderings(
211 Arc::clone(&self.projected_schema),
212 self.sort_information.clone(),
213 )
214 }
215
216 fn statistics_inner(&self) -> Result<Statistics> {
217 Ok(common::compute_record_batch_statistics(
218 &self.partitions,
219 &self.schema,
220 self.projection.clone(),
221 ))
222 }
223
224 pub fn try_new(
225 partitions: &[Vec<RecordBatch>],
226 schema: SchemaRef,
227 projection: Option<Vec<usize>>,
228 ) -> Result<Self> {
229 let projected_schema = project_schema(&schema, projection.as_ref())?;
230 Ok(Self {
231 partitions: partitions.to_vec(),
232 schema,
233 cache: Arc::new(PlanProperties::new(
234 EquivalenceProperties::new_with_orderings(
235 Arc::clone(&projected_schema),
236 Vec::<LexOrdering>::new(),
237 ),
238 Partitioning::UnknownPartitioning(partitions.len()),
239 EmissionType::Incremental,
240 Boundedness::Bounded,
241 )),
242 projected_schema,
243 projection,
244 sort_information: vec![],
245 show_sizes: true,
246 fetch: None,
247 })
248 }
249
250 pub fn try_new_exec(
253 partitions: &[Vec<RecordBatch>],
254 schema: SchemaRef,
255 projection: Option<Vec<usize>>,
256 ) -> Result<Arc<TestMemoryExec>> {
257 let mut source = Self::try_new(partitions, schema, projection)?;
258 let cache = source.compute_properties();
259 source.cache = Arc::new(cache);
260 Ok(Arc::new(source))
261 }
262
263 pub fn update_cache(source: &Arc<TestMemoryExec>) -> TestMemoryExec {
265 let cache = source.compute_properties();
266 let mut source = (**source).clone();
267 source.cache = Arc::new(cache);
268 source
269 }
270
271 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
273 self.fetch = limit;
274 self
275 }
276
277 pub fn partitions(&self) -> &[Vec<RecordBatch>] {
279 &self.partitions
280 }
281
282 pub fn projection(&self) -> &Option<Vec<usize>> {
284 &self.projection
285 }
286
287 pub fn sort_information(&self) -> &[LexOrdering] {
289 &self.sort_information
290 }
291
292 pub fn try_with_sort_information(
295 mut self,
296 mut sort_information: Vec<LexOrdering>,
297 ) -> Result<Self> {
298 let fields = self.schema.fields();
300 let ambiguous_column = sort_information
301 .iter()
302 .flat_map(|ordering| ordering.clone())
303 .flat_map(|expr| collect_columns(&expr.expr))
304 .find(|col| {
305 fields
306 .get(col.index())
307 .map(|field| field.name() != col.name())
308 .unwrap_or(true)
309 });
310 assert_or_internal_err!(
311 ambiguous_column.is_none(),
312 "Column {:?} is not found in the original schema of the TestMemoryExec",
313 ambiguous_column.as_ref().unwrap()
314 );
315
316 if let Some(projection) = &self.projection {
318 let base_schema = self.original_schema();
319 let proj_exprs = projection.iter().map(|idx| {
320 let name = base_schema.field(*idx).name();
321 (Arc::new(Column::new(name, *idx)) as _, name.to_string())
322 });
323 let projection_mapping =
324 ProjectionMapping::try_new(proj_exprs, &base_schema)?;
325 let base_eqp = EquivalenceProperties::new_with_orderings(
326 Arc::clone(&base_schema),
327 sort_information,
328 );
329 let proj_eqp =
330 base_eqp.project(&projection_mapping, Arc::clone(&self.projected_schema));
331 let oeq_class: OrderingEquivalenceClass = proj_eqp.into();
332 sort_information = oeq_class.into();
333 }
334
335 self.sort_information = sort_information;
336 self.cache = Arc::new(self.compute_properties());
337 Ok(self)
338 }
339
340 pub fn original_schema(&self) -> SchemaRef {
342 Arc::clone(&self.schema)
343 }
344}
345
346pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send + 'a>>) {
348 let waker = futures::task::noop_waker();
349 let mut cx = Context::from_waker(&waker);
350 let poll = fut.poll_unpin(&mut cx);
351
352 assert!(poll.is_pending());
353}
354
355pub fn aggr_test_schema() -> SchemaRef {
357 let mut f1 = Field::new("c1", DataType::Utf8, false);
358 f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
359 let schema = Schema::new(vec![
360 f1,
361 Field::new("c2", DataType::UInt32, false),
362 Field::new("c3", DataType::Int8, false),
363 Field::new("c4", DataType::Int16, false),
364 Field::new("c5", DataType::Int32, false),
365 Field::new("c6", DataType::Int64, false),
366 Field::new("c7", DataType::UInt8, false),
367 Field::new("c8", DataType::UInt16, false),
368 Field::new("c9", DataType::UInt32, false),
369 Field::new("c10", DataType::UInt64, false),
370 Field::new("c11", DataType::Float32, false),
371 Field::new("c12", DataType::Float64, false),
372 Field::new("c13", DataType::Utf8, false),
373 ]);
374
375 Arc::new(schema)
376}
377
378pub fn build_table_i32(
380 a: (&str, &Vec<i32>),
381 b: (&str, &Vec<i32>),
382 c: (&str, &Vec<i32>),
383) -> RecordBatch {
384 let schema = Schema::new(vec![
385 Field::new(a.0, DataType::Int32, false),
386 Field::new(b.0, DataType::Int32, false),
387 Field::new(c.0, DataType::Int32, false),
388 ]);
389
390 RecordBatch::try_new(
391 Arc::new(schema),
392 vec![
393 Arc::new(Int32Array::from(a.1.clone())),
394 Arc::new(Int32Array::from(b.1.clone())),
395 Arc::new(Int32Array::from(c.1.clone())),
396 ],
397 )
398 .unwrap()
399}
400
401pub fn build_table_i32_two_cols(
403 a: (&str, &Vec<i32>),
404 b: (&str, &Vec<i32>),
405) -> RecordBatch {
406 let schema = Schema::new(vec![
407 Field::new(a.0, DataType::Int32, false),
408 Field::new(b.0, DataType::Int32, false),
409 ]);
410
411 RecordBatch::try_new(
412 Arc::new(schema),
413 vec![
414 Arc::new(Int32Array::from(a.1.clone())),
415 Arc::new(Int32Array::from(b.1.clone())),
416 ],
417 )
418 .unwrap()
419}
420
421pub fn build_table_scan_i32(
423 a: (&str, &Vec<i32>),
424 b: (&str, &Vec<i32>),
425 c: (&str, &Vec<i32>),
426) -> Arc<dyn ExecutionPlan> {
427 let batch = build_table_i32(a, b, c);
428 let schema = batch.schema();
429 TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
430}
431
432pub fn make_partition(sz: i32) -> RecordBatch {
434 let seq_start = 0;
435 let seq_end = sz;
436 let values = (seq_start..seq_end).collect::<Vec<_>>();
437 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
438 let arr = Arc::new(Int32Array::from(values));
439 let arr = arr as ArrayRef;
440
441 RecordBatch::try_new(schema, vec![arr]).unwrap()
442}
443
444pub fn make_partition_utf8(sz: i32) -> RecordBatch {
445 let seq_start = 0;
446 let seq_end = sz;
447 let values = (seq_start..seq_end)
448 .map(|i| format!("test_long_string_that_is_roughly_42_bytes_{i}"))
449 .collect::<Vec<_>>();
450 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Utf8, true)]));
451 let mut string_array = arrow::array::StringArray::from(values);
452 string_array.shrink_to_fit();
453 let arr = Arc::new(string_array);
454 let arr = arr as ArrayRef;
455
456 RecordBatch::try_new(schema, vec![arr]).unwrap()
457}
458
459pub fn scan_partitioned(partitions: usize) -> Arc<dyn ExecutionPlan> {
461 Arc::new(mem_exec(partitions))
462}
463
464pub fn scan_partitioned_utf8(partitions: usize) -> Arc<dyn ExecutionPlan> {
465 Arc::new(mem_exec_utf8(partitions))
466}
467
468pub fn mem_exec(partitions: usize) -> TestMemoryExec {
470 let data: Vec<Vec<_>> = (0..partitions).map(|_| vec![make_partition(100)]).collect();
471
472 let schema = data[0][0].schema();
473 let projection = None;
474
475 TestMemoryExec::try_new(&data, schema, projection).unwrap()
476}
477
478pub fn mem_exec_utf8(partitions: usize) -> TestMemoryExec {
479 let data: Vec<Vec<_>> = (0..partitions)
480 .map(|_| vec![make_partition_utf8(100)])
481 .collect();
482
483 let schema = data[0][0].schema();
484 let projection = None;
485
486 TestMemoryExec::try_new(&data, schema, projection).unwrap()
487}
488
489#[derive(Debug)]
491pub struct TestPartitionStream {
492 pub schema: SchemaRef,
493 pub batches: Vec<RecordBatch>,
494}
495
496impl TestPartitionStream {
497 pub fn new_with_batches(batches: Vec<RecordBatch>) -> Self {
499 let schema = batches[0].schema();
500 Self { schema, batches }
501 }
502}
503impl PartitionStream for TestPartitionStream {
504 fn schema(&self) -> &SchemaRef {
505 &self.schema
506 }
507 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
508 let stream = futures::stream::iter(self.batches.clone().into_iter().map(Ok));
509 Box::pin(RecordBatchStreamAdapter::new(
510 Arc::clone(&self.schema),
511 stream,
512 ))
513 }
514}
515
/// Asserts basic invariants on the metrics of a join operator.
///
/// Checks that `$metrics.output_rows()` equals `$expected_rows`, and that the
/// `join_time` and `build_time` metrics together do not exceed
/// `elapsed_compute`.
///
/// # Panics
///
/// Panics if any of the metrics is missing or if either check fails.
#[cfg(test)]
macro_rules! assert_join_metrics {
    ($metrics:expr, $expected_rows:expr) => {
        assert_eq!($metrics.output_rows().unwrap(), $expected_rows);

        let elapsed_compute = $metrics
            .elapsed_compute()
            .expect("did not find elapsed_compute metric");
        let join_time = $metrics
            .sum_by_name("join_time")
            .expect("did not find join_time metric")
            .as_usize();
        let build_time = $metrics
            .sum_by_name("build_time")
            .expect("did not find build_time metric")
            .as_usize();
        // The message is only shown when the sum is greater than
        // elapsed_compute, so it must describe that violation.
        assert!(
            join_time + build_time <= elapsed_compute,
            "join_time ({}) + build_time ({}) = {} exceeds elapsed_compute = {}",
            join_time,
            build_time,
            join_time + build_time,
            elapsed_compute
        );
    };
}
543#[cfg(test)]
544pub(crate) use assert_join_metrics;