use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;

use crate::ExecutionPlan;
use crate::common;
use crate::execution_plan::{Boundedness, EmissionType};
use crate::memory::MemoryStream;
use crate::metrics::MetricsSet;
use crate::stream::RecordBatchStreamAdapter;
use crate::streaming::PartitionStream;
use crate::{DisplayAs, DisplayFormatType, PlanProperties};

use arrow::array::{Array, ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{
    Result, Statistics, assert_or_internal_err, config::ConfigOptions, project_schema,
};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::equivalence::{
    OrderingEquivalenceClass, ProjectionMapping,
};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, Partitioning};

use futures::{Future, FutureExt};

pub mod exec;

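/// A test [`ExecutionPlan`] backed by in-memory record batches. It mimics the
/// real `DataSourceExec` (and displays under that name) so operators can be
/// exercised in tests without a real data source.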
#[derive(Clone, Debug)]
pub struct TestMemoryExec {
    /// The partitions to query.
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema of the data before any projection is applied.
    schema: SchemaRef,
    /// Schema after the optional projection is applied.
    projected_schema: SchemaRef,
    /// Optional column indices to project.
    projection: Option<Vec<usize>>,
    /// Sort information: one or more equivalent orderings.
    sort_information: Vec<LexOrdering>,
    /// Whether `fmt_as` prints partition sizes.
    show_sizes: bool,
    /// Optional maximum number of rows to emit.
    fetch: Option<usize>,
    /// Cached plan properties.
    cache: PlanProperties,
}

impl DisplayAs for TestMemoryExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        write!(f, "DataSourceExec: ")?;
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let partition_sizes: Vec<_> =
                    self.partitions.iter().map(|b| b.len()).collect();

                let output_ordering = self
                    .sort_information
                    .first()
                    .map(|output_ordering| format!(", output_ordering={output_ordering}"))
                    .unwrap_or_default();

                let eq_properties = self.eq_properties();
                let constraints = eq_properties.constraints();
                let constraints = if constraints.is_empty() {
                    String::new()
                } else {
                    format!(", {constraints}")
                };

                let limit = self
                    .fetch
                    .map_or(String::new(), |limit| format!(", fetch={limit}"));
                if self.show_sizes {
                    write!(
                        f,
                        "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
                        partition_sizes.len(),
                    )
                } else {
                    write!(
                        f,
                        "partitions={}{limit}{output_ordering}{constraints}",
                        partition_sizes.len(),
                    )
                }
            }
            DisplayFormatType::TreeRender => {
                // No tree-specific details are rendered for this test plan.
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for TestMemoryExec {
    fn name(&self) -> &'static str {
        "DataSourceExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    fn repartitioned(
        &self,
        _target_partitions: usize,
        _config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        unimplemented!()
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        self.open(partition, context)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        unimplemented!()
    }

    fn statistics(&self) -> Result<Statistics> {
        self.statistics_inner()
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            Ok(Statistics::new_unknown(&self.schema))
        } else {
            self.statistics_inner()
        }
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
}

impl TestMemoryExec {
    fn open(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(
            MemoryStream::try_new(
                self.partitions[partition].clone(),
                Arc::clone(&self.projected_schema),
                self.projection.clone(),
            )?
            .with_fetch(self.fetch),
        ))
    }

    fn compute_properties(&self) -> PlanProperties {
        PlanProperties::new(
            self.eq_properties(),
            self.output_partitioning(),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.partitions.len())
    }

    fn eq_properties(&self) -> EquivalenceProperties {
        EquivalenceProperties::new_with_orderings(
            Arc::clone(&self.projected_schema),
            self.sort_information.clone(),
        )
    }

    fn statistics_inner(&self) -> Result<Statistics> {
        Ok(common::compute_record_batch_statistics(
            &self.partitions,
            &self.schema,
            self.projection.clone(),
        ))
    }

    /// Creates an in-memory plan from partitions of record batches. The
    /// cached plan properties are built with an empty ordering; use
    /// [`Self::try_new_exec`] or [`Self::update_cache`] to recompute them.
    pub fn try_new(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        let projected_schema = project_schema(&schema, projection.as_ref())?;
        Ok(Self {
            partitions: partitions.to_vec(),
            schema,
            cache: PlanProperties::new(
                EquivalenceProperties::new_with_orderings(
                    Arc::clone(&projected_schema),
                    Vec::<LexOrdering>::new(),
                ),
                Partitioning::UnknownPartitioning(partitions.len()),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
            projected_schema,
            projection,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        })
    }

    /// Builds a `TestMemoryExec` as in [`Self::try_new`], recomputes its
    /// cached plan properties, and wraps it in an [`Arc`].
    pub fn try_new_exec(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Arc<TestMemoryExec>> {
        let mut source = Self::try_new(partitions, schema, projection)?;
        let cache = source.compute_properties();
        source.cache = cache;
        Ok(Arc::new(source))
    }

    /// Returns a clone of `source` with its cached plan properties recomputed.
    pub fn update_cache(source: &Arc<TestMemoryExec>) -> TestMemoryExec {
        let cache = source.compute_properties();
        let mut source = (**source).clone();
        source.cache = cache;
        source
    }

    /// Sets the maximum number of rows to emit (`None` means no limit).
    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.fetch = limit;
        self
    }

    /// Returns the partitions of record batches backing this plan.
    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
        &self.partitions
    }

    /// Returns the optional projection (column indices into the original schema).
    pub fn projection(&self) -> &Option<Vec<usize>> {
        &self.projection
    }

    /// Returns the sort information: one or more equivalent orderings.
    pub fn sort_information(&self) -> &[LexOrdering] {
        &self.sort_information
    }

    /// Sets the sort information after validating that every referenced
    /// column exists in the original schema; if a projection is set, the
    /// orderings are rewritten in terms of the projected schema.
    pub fn try_with_sort_information(
        mut self,
        mut sort_information: Vec<LexOrdering>,
    ) -> Result<Self> {
        // Reject orderings that reference columns outside the original schema.
        let fields = self.schema.fields();
        let ambiguous_column = sort_information
            .iter()
            .flat_map(|ordering| ordering.clone())
            .flat_map(|expr| collect_columns(&expr.expr))
            .find(|col| {
                fields
                    .get(col.index())
                    .map(|field| field.name() != col.name())
                    .unwrap_or(true)
            });
        assert_or_internal_err!(
            ambiguous_column.is_none(),
            "Column {:?} is not found in the original schema of the TestMemoryExec",
            ambiguous_column.as_ref().unwrap()
        );

        // If a projection is applied, project the orderings through it.
        if let Some(projection) = &self.projection {
            let base_schema = self.original_schema();
            let proj_exprs = projection.iter().map(|idx| {
                let name = base_schema.field(*idx).name();
                (Arc::new(Column::new(name, *idx)) as _, name.to_string())
            });
            let projection_mapping =
                ProjectionMapping::try_new(proj_exprs, &base_schema)?;
            let base_eqp = EquivalenceProperties::new_with_orderings(
                Arc::clone(&base_schema),
                sort_information,
            );
            let proj_eqp =
                base_eqp.project(&projection_mapping, Arc::clone(&self.projected_schema));
            let oeq_class: OrderingEquivalenceClass = proj_eqp.into();
            sort_information = oeq_class.into();
        }

        self.sort_information = sort_information;
        self.cache = self.compute_properties();
        Ok(self)
    }

    /// Returns an `Arc` clone of the original (pre-projection) schema.
    pub fn original_schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

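// A minimal usage sketch of `TestMemoryExec` (assumes tokio is available as a
// dev-dependency, as is typical for DataFusion crates): a batch fed through a
// single-partition exec should come back unchanged.
#[cfg(test)]
#[tokio::test]
async fn test_memory_exec_roundtrip_sketch() -> Result<()> {
    let batch = build_table_i32(
        ("a", &vec![1, 2, 3]),
        ("b", &vec![4, 5, 6]),
        ("c", &vec![7, 8, 9]),
    );
    let schema = batch.schema();
    let exec = TestMemoryExec::try_new_exec(&[vec![batch.clone()]], schema, None)?;
    // Execute partition 0 and drain the stream.
    let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
    let batches = common::collect(stream).await?;
    assert_eq!(batches, vec![batch]);
    Ok(())
}
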
/// Asserts that the given future has not yet completed: polls it once with a
/// no-op waker and expects `Poll::Pending`.
pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send + 'a>>) {
    let waker = futures::task::noop_waker();
    let mut cx = Context::from_waker(&waker);
    let poll = fut.poll_unpin(&mut cx);

    assert!(poll.is_pending());
}

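// A minimal sketch of `assert_is_pending` in action: a future that never
// completes should always poll as `Pending`.
#[cfg(test)]
#[test]
fn assert_is_pending_sketch() {
    let mut fut: Pin<Box<dyn Future<Output = ()> + Send>> =
        Box::pin(futures::future::pending());
    assert_is_pending(&mut fut);
}
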
/// Returns a thirteen-column test schema (`c1`..`c13`) covering string,
/// integer, and float types; `c1` carries field metadata.
pub fn aggr_test_schema() -> SchemaRef {
    let mut f1 = Field::new("c1", DataType::Utf8, false);
    f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
    let schema = Schema::new(vec![
        f1,
        Field::new("c2", DataType::UInt32, false),
        Field::new("c3", DataType::Int8, false),
        Field::new("c4", DataType::Int16, false),
        Field::new("c5", DataType::Int32, false),
        Field::new("c6", DataType::Int64, false),
        Field::new("c7", DataType::UInt8, false),
        Field::new("c8", DataType::UInt16, false),
        Field::new("c9", DataType::UInt32, false),
        Field::new("c10", DataType::UInt64, false),
        Field::new("c11", DataType::Float32, false),
        Field::new("c12", DataType::Float64, false),
        Field::new("c13", DataType::Utf8, false),
    ]);

    Arc::new(schema)
}

/// Builds a three-column `Int32` record batch from `(column name, values)`
/// pairs.
pub fn build_table_i32(
    a: (&str, &Vec<i32>),
    b: (&str, &Vec<i32>),
    c: (&str, &Vec<i32>),
) -> RecordBatch {
    let schema = Schema::new(vec![
        Field::new(a.0, DataType::Int32, false),
        Field::new(b.0, DataType::Int32, false),
        Field::new(c.0, DataType::Int32, false),
    ]);

    RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(Int32Array::from(a.1.clone())),
            Arc::new(Int32Array::from(b.1.clone())),
            Arc::new(Int32Array::from(c.1.clone())),
        ],
    )
    .unwrap()
}

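// Sketch: the shape of the batch produced by `build_table_i32` matches its
// inputs (three columns, one row per input element).
#[cfg(test)]
#[test]
fn build_table_i32_sketch() {
    let batch =
        build_table_i32(("a", &vec![1, 2]), ("b", &vec![3, 4]), ("c", &vec![5, 6]));
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 2);
}
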
/// Like [`build_table_i32`], but with two columns.
pub fn build_table_i32_two_cols(
    a: (&str, &Vec<i32>),
    b: (&str, &Vec<i32>),
) -> RecordBatch {
    let schema = Schema::new(vec![
        Field::new(a.0, DataType::Int32, false),
        Field::new(b.0, DataType::Int32, false),
    ]);

    RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(Int32Array::from(a.1.clone())),
            Arc::new(Int32Array::from(b.1.clone())),
        ],
    )
    .unwrap()
}

/// Builds a three-column `Int32` table and wraps it in a single-partition
/// [`TestMemoryExec`] scan.
pub fn build_table_scan_i32(
    a: (&str, &Vec<i32>),
    b: (&str, &Vec<i32>),
    c: (&str, &Vec<i32>),
) -> Arc<dyn ExecutionPlan> {
    let batch = build_table_i32(a, b, c);
    let schema = batch.schema();
    TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
}

/// Returns a single-column batch (`i: Int32`, nullable) containing the
/// sequence `0..sz`.
pub fn make_partition(sz: i32) -> RecordBatch {
    let seq_start = 0;
    let seq_end = sz;
    let values = (seq_start..seq_end).collect::<Vec<_>>();
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
    let arr = Arc::new(Int32Array::from(values));
    let arr = arr as ArrayRef;

    RecordBatch::try_new(schema, vec![arr]).unwrap()
}

/// Returns a single-column batch (`i: Utf8`, nullable) of `sz` strings of
/// roughly 42 bytes each.
pub fn make_partition_utf8(sz: i32) -> RecordBatch {
    let seq_start = 0;
    let seq_end = sz;
    let values = (seq_start..seq_end)
        .map(|i| format!("test_long_string_that_is_roughly_42_bytes_{i}"))
        .collect::<Vec<_>>();
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Utf8, true)]));
    let mut string_array = arrow::array::StringArray::from(values);
    string_array.shrink_to_fit();
    let arr = Arc::new(string_array);
    let arr = arr as ArrayRef;

    RecordBatch::try_new(schema, vec![arr]).unwrap()
}

/// Returns an `ExecutionPlan` that scans `partitions` partitions, each
/// holding one 100-row `Int32` batch.
pub fn scan_partitioned(partitions: usize) -> Arc<dyn ExecutionPlan> {
    Arc::new(mem_exec(partitions))
}

/// Like [`scan_partitioned`], but with `Utf8` data.
pub fn scan_partitioned_utf8(partitions: usize) -> Arc<dyn ExecutionPlan> {
    Arc::new(mem_exec_utf8(partitions))
}

/// Creates a `TestMemoryExec` with `partitions` partitions, each containing
/// one 100-row `Int32` batch (panics if `partitions` is zero).
pub fn mem_exec(partitions: usize) -> TestMemoryExec {
    let data: Vec<Vec<_>> = (0..partitions).map(|_| vec![make_partition(100)]).collect();

    let schema = data[0][0].schema();
    let projection = None;

    TestMemoryExec::try_new(&data, schema, projection).unwrap()
}

/// Like [`mem_exec`], but with `Utf8` data.
pub fn mem_exec_utf8(partitions: usize) -> TestMemoryExec {
    let data: Vec<Vec<_>> = (0..partitions)
        .map(|_| vec![make_partition_utf8(100)])
        .collect();

    let schema = data[0][0].schema();
    let projection = None;

    TestMemoryExec::try_new(&data, schema, projection).unwrap()
}

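// Sketch: the helpers above fan out one 100-row batch per partition, so
// `mem_exec(3)` should expose three partitions of 100 rows each.
#[cfg(test)]
#[test]
fn mem_exec_partitioning_sketch() {
    let exec = mem_exec(3);
    assert_eq!(exec.partitions().len(), 3);
    assert!(exec.partitions().iter().all(|p| p[0].num_rows() == 100));
}
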
/// A simple [`PartitionStream`] that replays a fixed set of record batches.
#[derive(Debug)]
pub struct TestPartitionStream {
    pub schema: SchemaRef,
    pub batches: Vec<RecordBatch>,
}

impl TestPartitionStream {
    /// Creates a stream from `batches`, taking the schema from the first
    /// batch (panics if `batches` is empty).
    pub fn new_with_batches(batches: Vec<RecordBatch>) -> Self {
        let schema = batches[0].schema();
        Self { schema, batches }
    }
}

impl PartitionStream for TestPartitionStream {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let stream = futures::stream::iter(self.batches.clone().into_iter().map(Ok));
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            stream,
        ))
    }
}

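// A minimal sketch of `TestPartitionStream` in action (again assuming a tokio
// test runtime): one batch in, the same batch out.
#[cfg(test)]
#[tokio::test]
async fn test_partition_stream_sketch() -> Result<()> {
    let batch = make_partition(10);
    let stream = TestPartitionStream::new_with_batches(vec![batch.clone()]);
    let out = common::collect(stream.execute(Arc::new(TaskContext::default()))).await?;
    assert_eq!(out, vec![batch]);
    Ok(())
}
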
/// Asserts that `$metrics` reports exactly `$expected_rows` output rows and
/// that `join_time + build_time` does not exceed `elapsed_compute`.
#[cfg(test)]
macro_rules! assert_join_metrics {
    ($metrics:expr, $expected_rows:expr) => {
        assert_eq!($metrics.output_rows().unwrap(), $expected_rows);

        let elapsed_compute = $metrics
            .elapsed_compute()
            .expect("did not find elapsed_compute metric");
        let join_time = $metrics
            .sum_by_name("join_time")
            .expect("did not find join_time metric")
            .as_usize();
        let build_time = $metrics
            .sum_by_name("build_time")
            .expect("did not find build_time metric")
            .as_usize();
        // Both join_time and build_time are counted within elapsed_compute,
        // so their sum can never exceed it.
        assert!(
            join_time + build_time <= elapsed_compute,
            "join_time ({}) + build_time ({}) = {} exceeded elapsed_compute = {}",
            join_time,
            build_time,
            join_time + build_time,
            elapsed_compute
        );
    };
}

#[cfg(test)]
pub(crate) use assert_join_metrics;