//! Utilities for testing physical plans.

use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::fmt::{Debug, Formatter};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;

use crate::common;
use crate::execution_plan::{Boundedness, EmissionType};
use crate::memory::MemoryStream;
use crate::metrics::MetricsSet;
use crate::stream::RecordBatchStreamAdapter;
use crate::streaming::PartitionStream;
use crate::ExecutionPlan;
use crate::{DisplayAs, DisplayFormatType, PlanProperties};

use arrow::array::{Array, ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{
    config::ConfigOptions, internal_err, project_schema, Result, Statistics,
};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::equivalence::{
    OrderingEquivalenceClass, ProjectionMapping,
};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, Partitioning};

use futures::{Future, FutureExt};

pub mod exec;

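/// A mock in-memory data source implementing [`ExecutionPlan`], used to feed
/// known record batches into operators under test.
///
/// A minimal usage sketch (not compiled; batch contents are arbitrary and
/// `make_partition` is the helper defined later in this module):
///
/// ```ignore
/// let batch = make_partition(100);
/// let schema = batch.schema();
/// let plan = TestMemoryExec::try_new_exec(&[vec![batch]], schema, None)?;
/// let stream = plan.execute(0, Arc::new(TaskContext::default()))?;
/// ```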
#[derive(Clone, Debug)]
pub struct TestMemoryExec {
    /// The partitions to query
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema of the data before the optional projection is applied
    schema: SchemaRef,
    /// Schema of the data after the optional projection is applied
    projected_schema: SchemaRef,
    /// Optional projection (column indices into `schema`)
    projection: Option<Vec<usize>>,
    /// Sort information: one or more equivalent orderings
    sort_information: Vec<LexOrdering>,
    /// Whether partition sizes should be displayed in plan output
    show_sizes: bool,
    /// Maximum number of rows to read; `None` reads all rows
    fetch: Option<usize>,
    /// Cached plan properties returned by [`ExecutionPlan::properties`]
    cache: PlanProperties,
}

impl DisplayAs for TestMemoryExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        write!(f, "DataSourceExec: ")?;
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let partition_sizes: Vec<_> =
                    self.partitions.iter().map(|b| b.len()).collect();

                let output_ordering = self
                    .sort_information
                    .first()
                    .map(|output_ordering| format!(", output_ordering={output_ordering}"))
                    .unwrap_or_default();

                let eq_properties = self.eq_properties();
                let constraints = eq_properties.constraints();
                let constraints = if constraints.is_empty() {
                    String::new()
                } else {
                    format!(", {constraints}")
                };

                let limit = self
                    .fetch
                    .map_or(String::new(), |limit| format!(", fetch={limit}"));
                if self.show_sizes {
                    write!(
                        f,
                        "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
                        partition_sizes.len(),
                    )
                } else {
                    write!(
                        f,
                        "partitions={}{limit}{output_ordering}{constraints}",
                        partition_sizes.len(),
                    )
                }
            }
            DisplayFormatType::TreeRender => {
                // This test source renders no additional detail in tree mode.
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for TestMemoryExec {
    fn name(&self) -> &'static str {
        "DataSourceExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    fn repartitioned(
        &self,
        _target_partitions: usize,
        _config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        unimplemented!()
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        self.open(partition, context)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        unimplemented!()
    }

    fn statistics(&self) -> Result<Statistics> {
        self.statistics_inner()
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            // Per-partition statistics are not tracked by this mock.
            Ok(Statistics::new_unknown(&self.schema))
        } else {
            self.statistics_inner()
        }
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
}

impl TestMemoryExec {
    /// Streams the batches of the given partition, applying the optional
    /// projection and row limit.
    fn open(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(
            MemoryStream::try_new(
                self.partitions[partition].clone(),
                Arc::clone(&self.projected_schema),
                self.projection.clone(),
            )?
            .with_fetch(self.fetch),
        ))
    }

    fn compute_properties(&self) -> PlanProperties {
        PlanProperties::new(
            self.eq_properties(),
            self.output_partitioning(),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.partitions.len())
    }

    fn eq_properties(&self) -> EquivalenceProperties {
        EquivalenceProperties::new_with_orderings(
            Arc::clone(&self.projected_schema),
            self.sort_information.clone(),
        )
    }

    fn statistics_inner(&self) -> Result<Statistics> {
        Ok(common::compute_record_batch_statistics(
            &self.partitions,
            &self.schema,
            self.projection.clone(),
        ))
    }

    /// Creates a new `TestMemoryExec` over the given partitions of record
    /// batches; `schema` is the pre-projection schema. The plan-properties
    /// cache is built without sort information here; use
    /// [`Self::try_new_exec`] or [`Self::update_cache`] to recompute it.
    pub fn try_new(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        let projected_schema = project_schema(&schema, projection.as_ref())?;
        Ok(Self {
            partitions: partitions.to_vec(),
            schema,
            cache: PlanProperties::new(
                EquivalenceProperties::new_with_orderings(
                    Arc::clone(&projected_schema),
                    Vec::<LexOrdering>::new(),
                ),
                Partitioning::UnknownPartitioning(partitions.len()),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
            projected_schema,
            projection,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        })
    }

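    /// Like [`Self::try_new`], but returns the plan wrapped in an `Arc` with
    /// its plan-properties cache populated. A hedged sketch of typical test
    /// usage (not compiled; batch contents are arbitrary):
    ///
    /// ```ignore
    /// let batch = build_table_i32(("a", &vec![1, 2]), ("b", &vec![3, 4]), ("c", &vec![5, 6]));
    /// let plan = TestMemoryExec::try_new_exec(&[vec![batch.clone()]], batch.schema(), None)?;
    /// assert_eq!(plan.partitions().len(), 1);
    /// ```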
    pub fn try_new_exec(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Arc<TestMemoryExec>> {
        let mut source = Self::try_new(partitions, schema, projection)?;
        let cache = source.compute_properties();
        source.cache = cache;
        Ok(Arc::new(source))
    }

    /// Returns a copy of `source` with its plan-properties cache recomputed
    /// from the current state.
    pub fn update_cache(source: Arc<TestMemoryExec>) -> TestMemoryExec {
        let cache = source.compute_properties();
        let mut source = source.as_ref().clone();
        source.cache = cache;
        source
    }

    /// Sets an optional row limit (`fetch`).
    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.fetch = limit;
        self
    }

    /// Returns the partitions of record batches.
    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
        &self.partitions
    }

    /// Returns the optional projection.
    pub fn projection(&self) -> &Option<Vec<usize>> {
        &self.projection
    }

    /// Returns the sort information.
    pub fn sort_information(&self) -> &[LexOrdering] {
        &self.sort_information
    }

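    /// A memory table can be ordered by multiple expressions simultaneously;
    /// each `LexOrdering` is one such equivalent ordering. If an internal
    /// projection is set, it is also applied to the given orderings. A hedged
    /// sketch (not compiled; the exact `LexOrdering` construction varies
    /// across DataFusion versions):
    ///
    /// ```ignore
    /// let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("i", 0)));
    /// let exec = mem_exec(1).try_with_sort_information(vec![LexOrdering::new(vec![sort_expr])])?;
    /// assert_eq!(exec.sort_information().len(), 1);
    /// ```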
    pub fn try_with_sort_information(
        mut self,
        mut sort_information: Vec<LexOrdering>,
    ) -> Result<Self> {
        // All sort expressions must refer to the original (pre-projection)
        // schema by both name and index.
        let fields = self.schema.fields();
        let ambiguous_column = sort_information
            .iter()
            .flat_map(|ordering| ordering.clone())
            .flat_map(|expr| collect_columns(&expr.expr))
            .find(|col| {
                fields
                    .get(col.index())
                    .map(|field| field.name() != col.name())
                    .unwrap_or(true)
            });
        if let Some(col) = ambiguous_column {
            return internal_err!(
                "Column {:?} is not found in the original schema of the TestMemoryExec",
                col
            );
        }

        // If there is a projection on the source, rewrite the orderings in
        // terms of the projected schema.
        if let Some(projection) = &self.projection {
            let base_schema = self.original_schema();
            let proj_exprs = projection.iter().map(|idx| {
                let name = base_schema.field(*idx).name();
                (Arc::new(Column::new(name, *idx)) as _, name.to_string())
            });
            let projection_mapping =
                ProjectionMapping::try_new(proj_exprs, &base_schema)?;
            let base_eqp = EquivalenceProperties::new_with_orderings(
                Arc::clone(&base_schema),
                sort_information,
            );
            let proj_eqp =
                base_eqp.project(&projection_mapping, Arc::clone(&self.projected_schema));
            let oeq_class: OrderingEquivalenceClass = proj_eqp.into();
            sort_information = oeq_class.into();
        }

        self.sort_information = sort_information;
        Ok(self)
    }

    /// Returns an `Arc` clone of the original (pre-projection) schema.
    pub fn original_schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}

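/// Asserts that polling the given future once yields `Poll::Pending`. A
/// hedged usage sketch (not compiled; `futures::future::pending` is one way
/// to obtain a future that never resolves):
///
/// ```ignore
/// let mut fut: Pin<Box<dyn Future<Output = ()> + Send>> =
///     Box::pin(futures::future::pending());
/// assert_is_pending(&mut fut);
/// ```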
pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send + 'a>>) {
    let waker = futures::task::noop_waker();
    let mut cx = Context::from_waker(&waker);
    let poll = fut.poll_unpin(&mut cx);

    assert!(poll.is_pending());
}

/// Returns a thirteen-column test schema (`c1`..`c13`) covering string,
/// integer, and float types, with metadata attached to `c1`.
pub fn aggr_test_schema() -> SchemaRef {
    let mut f1 = Field::new("c1", DataType::Utf8, false);
    f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
    let schema = Schema::new(vec![
        f1,
        Field::new("c2", DataType::UInt32, false),
        Field::new("c3", DataType::Int8, false),
        Field::new("c4", DataType::Int16, false),
        Field::new("c5", DataType::Int32, false),
        Field::new("c6", DataType::Int64, false),
        Field::new("c7", DataType::UInt8, false),
        Field::new("c8", DataType::UInt16, false),
        Field::new("c9", DataType::UInt32, false),
        Field::new("c10", DataType::UInt64, false),
        Field::new("c11", DataType::Float32, false),
        Field::new("c12", DataType::Float64, false),
        Field::new("c13", DataType::Utf8, false),
    ]);

    Arc::new(schema)
}

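/// Builds a single `RecordBatch` with three non-nullable `Int32` columns from
/// `(column name, values)` pairs. A sketch of typical use (not compiled):
///
/// ```ignore
/// let batch = build_table_i32(
///     ("a", &vec![1, 2, 3]),
///     ("b", &vec![4, 5, 6]),
///     ("c", &vec![7, 8, 9]),
/// );
/// assert_eq!(batch.num_rows(), 3);
/// ```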
pub fn build_table_i32(
    a: (&str, &Vec<i32>),
    b: (&str, &Vec<i32>),
    c: (&str, &Vec<i32>),
) -> RecordBatch {
    let schema = Schema::new(vec![
        Field::new(a.0, DataType::Int32, false),
        Field::new(b.0, DataType::Int32, false),
        Field::new(c.0, DataType::Int32, false),
    ]);

    RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(Int32Array::from(a.1.clone())),
            Arc::new(Int32Array::from(b.1.clone())),
            Arc::new(Int32Array::from(c.1.clone())),
        ],
    )
    .unwrap()
}

/// Builds a single `RecordBatch` with two non-nullable `Int32` columns from
/// `(column name, values)` pairs.
pub fn build_table_i32_two_cols(
    a: (&str, &Vec<i32>),
    b: (&str, &Vec<i32>),
) -> RecordBatch {
    let schema = Schema::new(vec![
        Field::new(a.0, DataType::Int32, false),
        Field::new(b.0, DataType::Int32, false),
    ]);

    RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(Int32Array::from(a.1.clone())),
            Arc::new(Int32Array::from(b.1.clone())),
        ],
    )
    .unwrap()
}

/// Builds a one-partition `TestMemoryExec` scan over a single batch of three
/// `Int32` columns.
pub fn build_table_scan_i32(
    a: (&str, &Vec<i32>),
    b: (&str, &Vec<i32>),
    c: (&str, &Vec<i32>),
) -> Arc<dyn ExecutionPlan> {
    let batch = build_table_i32(a, b, c);
    let schema = batch.schema();
    TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
}

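/// Builds a `RecordBatch` with one nullable `Int32` column `"i"` containing
/// the values `0..sz`. A sketch of typical use (not compiled):
///
/// ```ignore
/// let batch = make_partition(100);
/// assert_eq!(batch.num_rows(), 100);
/// ```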
pub fn make_partition(sz: i32) -> RecordBatch {
    let seq_start = 0;
    let seq_end = sz;
    let values = (seq_start..seq_end).collect::<Vec<_>>();
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
    let arr = Arc::new(Int32Array::from(values));
    let arr = arr as ArrayRef;

    RecordBatch::try_new(schema, vec![arr]).unwrap()
}

/// Builds a `RecordBatch` with one nullable `Utf8` column `"i"` containing
/// `sz` strings of roughly 42 bytes each.
pub fn make_partition_utf8(sz: i32) -> RecordBatch {
    let seq_start = 0;
    let seq_end = sz;
    let values = (seq_start..seq_end)
        .map(|i| format!("test_long_string_that_is_roughly_42_bytes_{i}"))
        .collect::<Vec<_>>();
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Utf8, true)]));
    let mut string_array = arrow::array::StringArray::from(values);
    string_array.shrink_to_fit();
    let arr = Arc::new(string_array);
    let arr = arr as ArrayRef;

    RecordBatch::try_new(schema, vec![arr]).unwrap()
}

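/// Builds an `ExecutionPlan` scanning `partitions` partitions of 100 `Int32`
/// rows each. A sketch of typical use (not compiled):
///
/// ```ignore
/// let plan = scan_partitioned(4); // 4 partitions x 100 rows
/// let stream = plan.execute(0, Arc::new(TaskContext::default()))?;
/// ```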
pub fn scan_partitioned(partitions: usize) -> Arc<dyn ExecutionPlan> {
    Arc::new(mem_exec(partitions))
}

/// Like [`scan_partitioned`], but each partition holds 100 `Utf8` rows.
pub fn scan_partitioned_utf8(partitions: usize) -> Arc<dyn ExecutionPlan> {
    Arc::new(mem_exec_utf8(partitions))
}

/// Returns a `TestMemoryExec` over `partitions` partitions, each containing
/// one batch of 100 `Int32` rows.
pub fn mem_exec(partitions: usize) -> TestMemoryExec {
    let data: Vec<Vec<_>> = (0..partitions).map(|_| vec![make_partition(100)]).collect();

    let schema = data[0][0].schema();
    let projection = None;

    TestMemoryExec::try_new(&data, schema, projection).unwrap()
}

/// Returns a `TestMemoryExec` over `partitions` partitions, each containing
/// one batch of 100 `Utf8` rows.
pub fn mem_exec_utf8(partitions: usize) -> TestMemoryExec {
    let data: Vec<Vec<_>> = (0..partitions)
        .map(|_| vec![make_partition_utf8(100)])
        .collect();

    let schema = data[0][0].schema();
    let projection = None;

    TestMemoryExec::try_new(&data, schema, projection).unwrap()
}

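/// A `PartitionStream` backed by a fixed set of record batches, for testing
/// streaming operators. A sketch of typical use (not compiled):
///
/// ```ignore
/// let stream = TestPartitionStream::new_with_batches(vec![make_partition(5)]);
/// let rb_stream = stream.execute(Arc::new(TaskContext::default()));
/// ```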
#[derive(Debug)]
pub struct TestPartitionStream {
    pub schema: SchemaRef,
    pub batches: Vec<RecordBatch>,
}

impl TestPartitionStream {
    /// Creates a stream from the given batches; the schema is taken from the
    /// first batch, so `batches` must be non-empty.
    pub fn new_with_batches(batches: Vec<RecordBatch>) -> Self {
        let schema = batches[0].schema();
        Self { schema, batches }
    }
}

impl PartitionStream for TestPartitionStream {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let stream = futures::stream::iter(self.batches.clone().into_iter().map(Ok));
        Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            stream,
        ))
    }
}

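/// Asserts that a join's `MetricsSet` reports the expected output row count
/// and that `join_time + build_time` does not exceed `elapsed_compute`. A
/// sketch of typical use (not compiled; `collect_metrics` is a hypothetical
/// helper standing in for however the test obtains the plan's metrics):
///
/// ```ignore
/// let metrics = collect_metrics(&join_plan); // hypothetical helper
/// assert_join_metrics!(metrics, 100);
/// ```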
#[cfg(test)]
macro_rules! assert_join_metrics {
    ($metrics:expr, $expected_rows:expr) => {
        assert_eq!($metrics.output_rows().unwrap(), $expected_rows);

        let elapsed_compute = $metrics
            .elapsed_compute()
            .expect("did not find elapsed_compute metric");
        let join_time = $metrics
            .sum_by_name("join_time")
            .expect("did not find join_time metric")
            .as_usize();
        let build_time = $metrics
            .sum_by_name("build_time")
            .expect("did not find build_time metric")
            .as_usize();
        // The failure message describes the violated condition: the summed
        // phase timings exceeded the total compute time.
        assert!(
            join_time + build_time <= elapsed_compute,
            "join_time ({}) + build_time ({}) = {} exceeded elapsed_compute = {}",
            join_time,
            build_time,
            join_time + build_time,
            elapsed_compute
        );
    };
}

#[cfg(test)]
pub(crate) use assert_join_metrics;