1use std::any::Any;
21use std::collections::HashMap;
22use std::fmt;
23use std::fmt::{Debug, Formatter};
24use std::pin::Pin;
25use std::sync::Arc;
26use std::task::Context;
27
28use crate::common;
29use crate::execution_plan::{Boundedness, EmissionType};
30use crate::memory::MemoryStream;
31use crate::metrics::MetricsSet;
32use crate::stream::RecordBatchStreamAdapter;
33use crate::streaming::PartitionStream;
34use crate::ExecutionPlan;
35use crate::{DisplayAs, DisplayFormatType, PlanProperties};
36
37use arrow::array::{Array, ArrayRef, Int32Array, RecordBatch};
38use arrow_schema::{DataType, Field, Schema, SchemaRef};
39use datafusion_common::{
40 config::ConfigOptions, internal_err, project_schema, Result, Statistics,
41};
42use datafusion_execution::{SendableRecordBatchStream, TaskContext};
43use datafusion_physical_expr::{
44 equivalence::ProjectionMapping, expressions::Column, utils::collect_columns,
45 EquivalenceProperties, LexOrdering, Partitioning,
46};
47
48use futures::{Future, FutureExt};
49
50pub mod exec;
51
/// A mock execution plan backed by in-memory [`RecordBatch`]es, for tests.
/// Reports its name as `DataSourceExec` so plans display like production ones.
#[derive(Clone, Debug)]
pub struct TestMemoryExec {
    /// The data to scan: one `Vec<RecordBatch>` per output partition.
    partitions: Vec<Vec<RecordBatch>>,
    /// Schema of the data *before* the optional projection is applied.
    schema: SchemaRef,
    /// Schema of the data *after* the optional projection is applied.
    projected_schema: SchemaRef,
    /// Optional projection: indices into `schema`'s columns to produce.
    projection: Option<Vec<usize>>,
    /// Declared output orderings (over the projected schema).
    sort_information: Vec<LexOrdering>,
    /// Whether `fmt_as` includes per-partition batch counts in the output.
    show_sizes: bool,
    /// Optional row limit applied to each partition's stream.
    fetch: Option<usize>,
    /// Cached plan properties; recomputed by `try_new_exec`/`update_cache`.
    cache: PlanProperties,
}
78
79impl DisplayAs for TestMemoryExec {
80 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
81 write!(f, "DataSourceExec: ")?;
82 match t {
83 DisplayFormatType::Default | DisplayFormatType::Verbose => {
84 let partition_sizes: Vec<_> =
85 self.partitions.iter().map(|b| b.len()).collect();
86
87 let output_ordering = self
88 .sort_information
89 .first()
90 .map(|output_ordering| format!(", output_ordering={output_ordering}"))
91 .unwrap_or_default();
92
93 let eq_properties = self.eq_properties();
94 let constraints = eq_properties.constraints();
95 let constraints = if constraints.is_empty() {
96 String::new()
97 } else {
98 format!(", {constraints}")
99 };
100
101 let limit = self
102 .fetch
103 .map_or(String::new(), |limit| format!(", fetch={limit}"));
104 if self.show_sizes {
105 write!(
106 f,
107 "partitions={}, partition_sizes={partition_sizes:?}{limit}{output_ordering}{constraints}",
108 partition_sizes.len(),
109 )
110 } else {
111 write!(
112 f,
113 "partitions={}{limit}{output_ordering}{constraints}",
114 partition_sizes.len(),
115 )
116 }
117 }
118 DisplayFormatType::TreeRender => {
119 write!(f, "")
121 }
122 }
123 }
124}
125
impl ExecutionPlan for TestMemoryExec {
    /// Reports itself as `DataSourceExec` so test plans display identically
    /// to production plans scanning an in-memory source.
    fn name(&self) -> &'static str {
        "DataSourceExec"
    }

    /// Not needed by the tests that use this plan.
    fn as_any(&self) -> &dyn Any {
        unimplemented!()
    }

    /// Returns the cached properties; `try_new` seeds these with empty
    /// orderings, `try_new_exec`/`update_cache` recompute them.
    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    /// Leaf node: has no children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Not needed by the tests that use this plan (leaf node).
    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!()
    }

    /// Not needed by the tests that use this plan.
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        unimplemented!()
    }

    /// Streams the batches of `partition`, applying projection and `fetch`.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        self.open(partition, context)
    }

    /// Not needed by the tests that use this plan.
    fn metrics(&self) -> Option<MetricsSet> {
        unimplemented!()
    }

    /// Whole-plan statistics, computed from the stored batches.
    fn statistics(&self) -> Result<Statistics> {
        self.statistics_inner()
    }

    /// Per-partition statistics are not tracked, so a specific partition
    /// yields unknown statistics; `None` falls back to whole-plan stats.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if partition.is_some() {
            Ok(Statistics::new_unknown(&self.schema))
        } else {
            self.statistics_inner()
        }
    }

    /// The row limit set via `with_limit`, if any.
    fn fetch(&self) -> Option<usize> {
        self.fetch
    }
}
186
impl TestMemoryExec {
    /// Creates the stream for `partition`, applying the optional projection
    /// and `fetch` limit. Backs `ExecutionPlan::execute`.
    fn open(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(
            MemoryStream::try_new(
                self.partitions[partition].clone(),
                Arc::clone(&self.projected_schema),
                self.projection.clone(),
            )?
            .with_fetch(self.fetch),
        ))
    }

    /// Computes fresh plan properties from the current partitions and
    /// sort information (used to refresh `self.cache`).
    fn compute_properties(&self) -> PlanProperties {
        PlanProperties::new(
            self.eq_properties(),
            self.output_partitioning(),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }

    /// One (unknown) output partition per stored partition.
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.partitions.len())
    }

    /// Equivalence properties over the *projected* schema, seeded with the
    /// declared sort orderings.
    fn eq_properties(&self) -> EquivalenceProperties {
        EquivalenceProperties::new_with_orderings(
            Arc::clone(&self.projected_schema),
            self.sort_information.as_slice(),
        )
    }

    /// Computes statistics over all stored batches, projection-aware.
    fn statistics_inner(&self) -> Result<Statistics> {
        Ok(common::compute_record_batch_statistics(
            &self.partitions,
            &self.schema,
            self.projection.clone(),
        ))
    }

    /// Creates a new in-memory source over `partitions`.
    ///
    /// Note: the `cache` is seeded with *empty* orderings; use
    /// `try_new_exec` (or `update_cache`) to get a fully computed cache.
    ///
    /// # Errors
    /// Fails if `projection` references columns outside `schema`.
    pub fn try_new(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Self> {
        let projected_schema = project_schema(&schema, projection.as_ref())?;
        Ok(Self {
            partitions: partitions.to_vec(),
            schema,
            cache: PlanProperties::new(
                EquivalenceProperties::new_with_orderings(
                    Arc::clone(&projected_schema),
                    vec![].as_slice(),
                ),
                Partitioning::UnknownPartitioning(partitions.len()),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
            projected_schema,
            projection,
            sort_information: vec![],
            show_sizes: true,
            fetch: None,
        })
    }

    /// Like `try_new`, but returns an `Arc` with a fully recomputed
    /// `PlanProperties` cache, ready to use as an `ExecutionPlan`.
    pub fn try_new_exec(
        partitions: &[Vec<RecordBatch>],
        schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Result<Arc<TestMemoryExec>> {
        let mut source = Self::try_new(partitions, schema, projection)?;
        let cache = source.compute_properties();
        source.cache = cache;
        Ok(Arc::new(source))
    }

    /// Returns a clone of `source` with its `PlanProperties` cache
    /// recomputed (e.g. after sort information changed).
    pub fn update_cache(source: Arc<TestMemoryExec>) -> TestMemoryExec {
        let cache = source.compute_properties();
        let source = &*source;
        let mut source = source.clone();
        source.cache = cache;
        source
    }

    /// Sets the row limit returned by `ExecutionPlan::fetch`.
    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.fetch = limit;
        self
    }

    /// The stored partitions (pre-projection data).
    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
        &self.partitions
    }

    /// The optional column projection.
    pub fn projection(&self) -> &Option<Vec<usize>> {
        &self.projection
    }

    /// The declared output orderings.
    pub fn sort_information(&self) -> &[LexOrdering] {
        &self.sort_information
    }

    /// Declares the sort order of the stored data.
    ///
    /// Every column referenced by `sort_information` must match the original
    /// schema by *both* index and name. If a projection is set, the orderings
    /// are re-expressed over the projected schema via a `ProjectionMapping`.
    ///
    /// # Errors
    /// Returns an internal error when an ordering references a column that
    /// does not exist (or has a different name) in the original schema.
    pub fn try_with_sort_information(
        mut self,
        mut sort_information: Vec<LexOrdering>,
    ) -> Result<Self> {
        let fields = self.schema.fields();
        // A column is "ambiguous" if its index is out of range or its name
        // disagrees with the field at that index.
        let ambiguous_column = sort_information
            .iter()
            .flat_map(|ordering| ordering.clone())
            .flat_map(|expr| collect_columns(&expr.expr))
            .find(|col| {
                fields
                    .get(col.index())
                    .map(|field| field.name() != col.name())
                    .unwrap_or(true)
            });
        if let Some(col) = ambiguous_column {
            return internal_err!(
                "Column {:?} is not found in the original schema of the TestMemoryExec",
                col
            );
        }

        // If a projection is applied, rewrite the orderings in terms of the
        // projected schema (dropping orderings the projection invalidates).
        if let Some(projection) = &self.projection {
            let base_eqp = EquivalenceProperties::new_with_orderings(
                self.original_schema(),
                &sort_information,
            );
            let proj_exprs = projection
                .iter()
                .map(|idx| {
                    let base_schema = self.original_schema();
                    let name = base_schema.field(*idx).name();
                    (Arc::new(Column::new(name, *idx)) as _, name.to_string())
                })
                .collect::<Vec<_>>();
            let projection_mapping =
                ProjectionMapping::try_new(&proj_exprs, &self.original_schema())?;
            sort_information = base_eqp
                .project(&projection_mapping, Arc::clone(&self.projected_schema))
                .into_oeq_class()
                .into_inner();
        }

        self.sort_information = sort_information;
        Ok(self)
    }

    /// The schema *before* projection.
    pub fn original_schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
}
356
357pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send + 'a>>) {
359 let waker = futures::task::noop_waker();
360 let mut cx = Context::from_waker(&waker);
361 let poll = fut.poll_unpin(&mut cx);
362
363 assert!(poll.is_pending());
364}
365
366pub fn aggr_test_schema() -> SchemaRef {
368 let mut f1 = Field::new("c1", DataType::Utf8, false);
369 f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
370 let schema = Schema::new(vec![
371 f1,
372 Field::new("c2", DataType::UInt32, false),
373 Field::new("c3", DataType::Int8, false),
374 Field::new("c4", DataType::Int16, false),
375 Field::new("c5", DataType::Int32, false),
376 Field::new("c6", DataType::Int64, false),
377 Field::new("c7", DataType::UInt8, false),
378 Field::new("c8", DataType::UInt16, false),
379 Field::new("c9", DataType::UInt32, false),
380 Field::new("c10", DataType::UInt64, false),
381 Field::new("c11", DataType::Float32, false),
382 Field::new("c12", DataType::Float64, false),
383 Field::new("c13", DataType::Utf8, false),
384 ]);
385
386 Arc::new(schema)
387}
388
389pub fn build_table_i32(
391 a: (&str, &Vec<i32>),
392 b: (&str, &Vec<i32>),
393 c: (&str, &Vec<i32>),
394) -> RecordBatch {
395 let schema = Schema::new(vec![
396 Field::new(a.0, DataType::Int32, false),
397 Field::new(b.0, DataType::Int32, false),
398 Field::new(c.0, DataType::Int32, false),
399 ]);
400
401 RecordBatch::try_new(
402 Arc::new(schema),
403 vec![
404 Arc::new(Int32Array::from(a.1.clone())),
405 Arc::new(Int32Array::from(b.1.clone())),
406 Arc::new(Int32Array::from(c.1.clone())),
407 ],
408 )
409 .unwrap()
410}
411
412pub fn build_table_i32_two_cols(
414 a: (&str, &Vec<i32>),
415 b: (&str, &Vec<i32>),
416) -> RecordBatch {
417 let schema = Schema::new(vec![
418 Field::new(a.0, DataType::Int32, false),
419 Field::new(b.0, DataType::Int32, false),
420 ]);
421
422 RecordBatch::try_new(
423 Arc::new(schema),
424 vec![
425 Arc::new(Int32Array::from(a.1.clone())),
426 Arc::new(Int32Array::from(b.1.clone())),
427 ],
428 )
429 .unwrap()
430}
431
432pub fn build_table_scan_i32(
434 a: (&str, &Vec<i32>),
435 b: (&str, &Vec<i32>),
436 c: (&str, &Vec<i32>),
437) -> Arc<dyn ExecutionPlan> {
438 let batch = build_table_i32(a, b, c);
439 let schema = batch.schema();
440 TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap()
441}
442
443pub fn make_partition(sz: i32) -> RecordBatch {
445 let seq_start = 0;
446 let seq_end = sz;
447 let values = (seq_start..seq_end).collect::<Vec<_>>();
448 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
449 let arr = Arc::new(Int32Array::from(values));
450 let arr = arr as ArrayRef;
451
452 RecordBatch::try_new(schema, vec![arr]).unwrap()
453}
454
455pub fn make_partition_utf8(sz: i32) -> RecordBatch {
456 let seq_start = 0;
457 let seq_end = sz;
458 let values = (seq_start..seq_end)
459 .map(|i| format!("test_long_string_that_is_roughly_42_bytes_{i}"))
460 .collect::<Vec<_>>();
461 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Utf8, true)]));
462 let mut string_array = arrow::array::StringArray::from(values);
463 string_array.shrink_to_fit();
464 let arr = Arc::new(string_array);
465 let arr = arr as ArrayRef;
466
467 RecordBatch::try_new(schema, vec![arr]).unwrap()
468}
469
470pub fn scan_partitioned(partitions: usize) -> Arc<dyn ExecutionPlan> {
472 Arc::new(mem_exec(partitions))
473}
474
475pub fn scan_partitioned_utf8(partitions: usize) -> Arc<dyn ExecutionPlan> {
476 Arc::new(mem_exec_utf8(partitions))
477}
478
479pub fn mem_exec(partitions: usize) -> TestMemoryExec {
481 let data: Vec<Vec<_>> = (0..partitions).map(|_| vec![make_partition(100)]).collect();
482
483 let schema = data[0][0].schema();
484 let projection = None;
485
486 TestMemoryExec::try_new(&data, schema, projection).unwrap()
487}
488
489pub fn mem_exec_utf8(partitions: usize) -> TestMemoryExec {
490 let data: Vec<Vec<_>> = (0..partitions)
491 .map(|_| vec![make_partition_utf8(100)])
492 .collect();
493
494 let schema = data[0][0].schema();
495 let projection = None;
496
497 TestMemoryExec::try_new(&data, schema, projection).unwrap()
498}
499
/// A `PartitionStream` backed by a fixed set of in-memory batches, for
/// testing streaming table providers.
#[derive(Debug)]
pub struct TestPartitionStream {
    /// Schema shared by all `batches`.
    pub schema: SchemaRef,
    /// The record batches this stream yields, in order.
    pub batches: Vec<RecordBatch>,
}
506
507impl TestPartitionStream {
508 pub fn new_with_batches(batches: Vec<RecordBatch>) -> Self {
510 let schema = batches[0].schema();
511 Self { schema, batches }
512 }
513}
514impl PartitionStream for TestPartitionStream {
515 fn schema(&self) -> &SchemaRef {
516 &self.schema
517 }
518 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
519 let stream = futures::stream::iter(self.batches.clone().into_iter().map(Ok));
520 Box::pin(RecordBatchStreamAdapter::new(
521 Arc::clone(&self.schema),
522 stream,
523 ))
524 }
525}