Skip to main content

datafusion_datasource/file_scan_config/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`FileScanConfig`] to configure scanning of possibly partitioned
19//! file sources.
20
21pub(crate) mod sort_pushdown;
22
23use crate::file_groups::FileGroup;
24use crate::{
25    PartitionedFile, display::FileGroupsDisplay, file::FileSource,
26    file_compression_type::FileCompressionType, file_stream::FileStreamBuilder,
27    file_stream::work_source::SharedWorkSource, source::DataSource,
28    statistics::MinMaxStatistics,
29};
30use arrow::datatypes::FieldRef;
31use arrow::datatypes::{DataType, Schema, SchemaRef};
32use datafusion_common::config::ConfigOptions;
33use datafusion_common::{
34    Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err,
35};
36use datafusion_execution::{
37    SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl,
38};
39use datafusion_expr::Operator;
40
41use crate::source::OpenArgs;
42use datafusion_physical_expr::expressions::{BinaryExpr, Column};
43use datafusion_physical_expr::projection::ProjectionExprs;
44use datafusion_physical_expr::utils::reassign_expr_columns;
45use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction};
46use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
47use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
48use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
49use datafusion_physical_plan::SortOrderPushdownResult;
50use datafusion_physical_plan::coop::cooperative;
51use datafusion_physical_plan::execution_plan::SchedulingType;
52use datafusion_physical_plan::{
53    DisplayAs, DisplayFormatType,
54    display::{ProjectSchemaDisplay, display_orderings},
55    filter_pushdown::FilterPushdownPropagation,
56    metrics::ExecutionPlanMetricsSet,
57};
58use log::{debug, warn};
59use std::any::Any;
60use std::{fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
61
62/// [`FileScanConfig`] represents scanning data from a group of files
63///
64/// `FileScanConfig` is used to create a [`DataSourceExec`], the physical plan
65/// for scanning files with a particular file format.
66///
67/// The [`FileSource`] (e.g. `ParquetSource`, `CsvSource`, etc.) is responsible
68/// for creating the actual execution plan to read the files based on a
69/// `FileScanConfig`. Fields in a `FileScanConfig` such as Statistics represent
70/// information about the files **before** any projection or filtering is
71/// applied in the file source.
72///
73/// Use [`FileScanConfigBuilder`] to construct a `FileScanConfig`.
74///
75/// Use [`DataSourceExec::from_data_source`] to create a [`DataSourceExec`] from
76/// a `FileScanConfig`.
77///
78/// # Example
79/// ```
80/// # use std::sync::Arc;
81/// # use arrow::datatypes::{Field, Fields, DataType, Schema, SchemaRef};
82/// # use object_store::ObjectStore;
83/// # use datafusion_common::Result;
84/// # use datafusion_datasource::file::FileSource;
85/// # use datafusion_datasource::file_groups::FileGroup;
86/// # use datafusion_datasource::PartitionedFile;
87/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
88/// # use datafusion_datasource::file_stream::FileOpener;
89/// # use datafusion_datasource::source::DataSourceExec;
90/// # use datafusion_datasource::table_schema::TableSchema;
91/// # use datafusion_execution::object_store::ObjectStoreUrl;
92/// # use datafusion_physical_expr::projection::ProjectionExprs;
93/// # use datafusion_physical_plan::ExecutionPlan;
94/// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
95/// # let file_schema = Arc::new(Schema::new(vec![
96/// #  Field::new("c1", DataType::Int32, false),
97/// #  Field::new("c2", DataType::Int32, false),
98/// #  Field::new("c3", DataType::Int32, false),
99/// #  Field::new("c4", DataType::Int32, false),
100/// # ]));
101/// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate
102/// #[derive(Clone)]
103/// # struct ParquetSource {
104/// #    table_schema: TableSchema,
105/// # };
106/// # impl FileSource for ParquetSource {
107/// #  fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Result<Arc<dyn FileOpener>> { unimplemented!() }
108/// #  fn table_schema(&self) -> &TableSchema { &self.table_schema }
109/// #  fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
110/// #  fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
111/// #  fn file_type(&self) -> &str { "parquet" }
112/// #  // Note that this implementation drops the projection on the floor, it is not complete!
113/// #  fn try_pushdown_projection(&self, projection: &ProjectionExprs) -> Result<Option<Arc<dyn FileSource>>> { Ok(Some(Arc::new(self.clone()) as Arc<dyn FileSource>)) }
114/// #  }
115/// # impl ParquetSource {
116/// #  fn new(table_schema: impl Into<TableSchema>) -> Self { Self {table_schema: table_schema.into()} }
117/// # }
118/// // create FileScan config for reading parquet files from file://
119/// let object_store_url = ObjectStoreUrl::local_filesystem();
120/// let file_source = Arc::new(ParquetSource::new(file_schema.clone()));
121/// let config = FileScanConfigBuilder::new(object_store_url, file_source)
122///   .with_limit(Some(1000))            // read only the first 1000 records
123///   .with_projection_indices(Some(vec![2, 3])) // project columns 2 and 3
124///   .expect("Failed to push down projection")
125///    // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
126///   .with_file(PartitionedFile::new("file1.parquet", 1234))
127///   // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
128///   // in a  single row group
129///   .with_file_group(FileGroup::new(vec![
130///    PartitionedFile::new("file2.parquet", 56),
131///    PartitionedFile::new("file3.parquet", 78),
132///   ])).build();
133/// // create an execution plan from the config
134/// let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
135/// ```
136///
137/// [`DataSourceExec`]: crate::source::DataSourceExec
138/// [`DataSourceExec::from_data_source`]: crate::source::DataSourceExec::from_data_source
139#[derive(Clone)]
140pub struct FileScanConfig {
141    /// Object store URL, used to get an [`ObjectStore`] instance from
142    /// [`RuntimeEnv::object_store`]
143    ///
144    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
145    /// as `file://` or `s3://my_bucket`. It should not include the path to the
146    /// file itself. The relevant URL prefix must be registered via
147    /// [`RuntimeEnv::register_object_store`]
148    ///
149    /// [`ObjectStore`]: object_store::ObjectStore
150    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
151    /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
152    pub object_store_url: ObjectStoreUrl,
153    /// List of files to be processed, grouped into partitions
154    ///
155    /// Each file must have a schema of `file_schema` or a subset. If
156    /// a particular file has a subset, the missing columns are
157    /// padded with NULLs.
158    ///
159    /// DataFusion may attempt to read each partition of files
160    /// concurrently, however files *within* a partition will be read
161    /// sequentially, one after the next.
162    pub file_groups: Vec<FileGroup>,
163    /// Table constraints
164    pub constraints: Constraints,
165    /// The maximum number of records to read from this plan. If `None`,
166    /// all records after filtering are returned.
167    pub limit: Option<usize>,
168    /// Whether the scan's limit is order sensitive
169    /// When `true`, files must be read in the exact order specified to produce
170    /// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`,
171    /// DataFusion may reorder file processing for optimization without affecting correctness.
172    pub preserve_order: bool,
173    /// All equivalent lexicographical output orderings of this file scan, in terms of
174    /// [`FileSource::table_schema`]. See [`FileScanConfigBuilder::with_output_ordering`] for more
175    /// details.
176    ///
177    /// [`Self::eq_properties`] uses this information along with projection
178    /// and filtering information to compute the effective
179    /// [`EquivalenceProperties`]
180    pub output_ordering: Vec<LexOrdering>,
181    /// File compression type
182    pub file_compression_type: FileCompressionType,
183    /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
184    pub file_source: Arc<dyn FileSource>,
185    /// Batch size while creating new batches
186    /// Defaults to [`datafusion_common::config::ExecutionOptions`] batch_size.
187    pub batch_size: Option<usize>,
188    /// Expression adapter used to adapt filters and projections that are pushed down into the scan
189    /// from the logical schema to the physical schema of the file.
190    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
191    /// Statistics for the entire table (file schema + partition columns).
192    /// See [`FileScanConfigBuilder::with_statistics`] for more details.
193    ///
194    /// The effective statistics are computed on-demand via
195    /// [`ProjectionExprs::project_statistics`].
196    ///
197    /// Note that this field is pub(crate) because accessing it directly from outside
198    /// would be incorrect if there are filters being applied, thus this should be accessed
199    /// via [`FileScanConfig::statistics`].
200    pub(crate) statistics: Statistics,
201    /// When true, file_groups are organized by partition column values
202    /// and output_partitioning will return Hash partitioning on partition columns.
203    /// This allows the optimizer to skip hash repartitioning for aggregates and joins
204    /// on partition columns.
205    ///
206    /// If the number of file partitions > target_partitions, the file partitions will be grouped
207    /// in a round-robin fashion such that number of file partitions = target_partitions.
208    pub partitioned_by_file_group: bool,
209}
210
211/// A builder for [`FileScanConfig`]'s.
212///
213/// Example:
214///
215/// ```rust
216/// # use std::sync::Arc;
217/// # use arrow::datatypes::{DataType, Field, Schema};
218/// # use datafusion_datasource::file_scan_config::{FileScanConfigBuilder, FileScanConfig};
219/// # use datafusion_datasource::file_compression_type::FileCompressionType;
220/// # use datafusion_datasource::file_groups::FileGroup;
221/// # use datafusion_datasource::PartitionedFile;
222/// # use datafusion_datasource::table_schema::TableSchema;
223/// # use datafusion_execution::object_store::ObjectStoreUrl;
224/// # use datafusion_common::Statistics;
225/// # use datafusion_datasource::file::FileSource;
226///
227/// # fn main() {
228/// # fn with_source(file_source: Arc<dyn FileSource>) {
229///     // Create a schema for our Parquet files
230///     let file_schema = Arc::new(Schema::new(vec![
231///         Field::new("id", DataType::Int32, false),
232///         Field::new("value", DataType::Utf8, false),
233///     ]));
234///
235///     // Create partition columns
236///     let partition_cols = vec![
237///         Arc::new(Field::new("date", DataType::Utf8, false)),
238///     ];
239///
240///     // Create table schema with file schema and partition columns
241///     let table_schema = TableSchema::new(file_schema, partition_cols);
242///
243///     // Create a builder for scanning Parquet files from a local filesystem
244///     let config = FileScanConfigBuilder::new(
245///         ObjectStoreUrl::local_filesystem(),
246///         file_source,
247///     )
248///     // Set a limit of 1000 rows
249///     .with_limit(Some(1000))
250///     // Project only the first column
251///     .with_projection_indices(Some(vec![0]))
252///     .expect("Failed to push down projection")
253///     // Add a file group with two files
254///     .with_file_group(FileGroup::new(vec![
255///         PartitionedFile::new("data/date=2024-01-01/file1.parquet", 1024),
256///         PartitionedFile::new("data/date=2024-01-01/file2.parquet", 2048),
257///     ]))
258///     // Set compression type
259///     .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
260///     // Build the final config
261///     .build();
262/// # }
263/// # }
264/// ```
265#[derive(Clone)]
266pub struct FileScanConfigBuilder {
267    object_store_url: ObjectStoreUrl,
268    file_source: Arc<dyn FileSource>,
269    limit: Option<usize>,
270    preserve_order: bool,
271    constraints: Option<Constraints>,
272    file_groups: Vec<FileGroup>,
273    statistics: Option<Statistics>,
274    output_ordering: Vec<LexOrdering>,
275    file_compression_type: Option<FileCompressionType>,
276    batch_size: Option<usize>,
277    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
278    partitioned_by_file_group: bool,
279}
280
281impl FileScanConfigBuilder {
282    /// Create a new [`FileScanConfigBuilder`] with default settings for scanning files.
283    ///
284    /// # Parameters:
285    /// * `object_store_url`: See [`FileScanConfig::object_store_url`]
286    /// * `file_source`: See [`FileScanConfig::file_source`]. The file source must have
287    ///   a schema set via its constructor.
288    pub fn new(
289        object_store_url: ObjectStoreUrl,
290        file_source: Arc<dyn FileSource>,
291    ) -> Self {
292        Self {
293            object_store_url,
294            file_source,
295            file_groups: vec![],
296            statistics: None,
297            output_ordering: vec![],
298            file_compression_type: None,
299            limit: None,
300            preserve_order: false,
301            constraints: None,
302            batch_size: None,
303            expr_adapter_factory: None,
304            partitioned_by_file_group: false,
305        }
306    }
307
308    /// Set the maximum number of records to read from this plan.
309    ///
310    /// If `None`, all records after filtering are returned.
311    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
312        self.limit = limit;
313        self
314    }
315
316    /// Set whether the limit should be order-sensitive.
317    ///
318    /// When `true`, files must be read in the exact order specified to produce
319    /// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`,
320    /// DataFusion may reorder file processing for optimization without
321    /// affecting correctness.
322    pub fn with_preserve_order(mut self, order_sensitive: bool) -> Self {
323        self.preserve_order = order_sensitive;
324        self
325    }
326
327    /// Set the file source for scanning files.
328    ///
329    /// This method allows you to change the file source implementation (e.g.
330    /// ParquetSource, CsvSource, etc.) after the builder has been created.
331    pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
332        self.file_source = file_source;
333        self
334    }
335
336    /// Return the table schema
337    pub fn table_schema(&self) -> &SchemaRef {
338        self.file_source.table_schema().table_schema()
339    }
340
341    /// Set the columns on which to project the data. Indexes that are higher than the
342    /// number of columns of `file_schema` refer to `table_partition_cols`.
343    ///
344    /// # Deprecated
345    /// Use [`Self::with_projection_indices`] instead. This method will be removed in a future release.
346    #[deprecated(since = "51.0.0", note = "Use with_projection_indices instead")]
347    pub fn with_projection(self, indices: Option<Vec<usize>>) -> Self {
348        match self.clone().with_projection_indices(indices) {
349            Ok(builder) => builder,
350            Err(e) => {
351                warn!(
352                    "Failed to push down projection in FileScanConfigBuilder::with_projection: {e}"
353                );
354                self
355            }
356        }
357    }
358
359    /// Set the columns on which to project the data using column indices.
360    ///
361    /// This method attempts to push down the projection to the underlying file
362    /// source if supported. If the file source does not support projection
363    /// pushdown, an error is returned.
364    ///
365    /// Indexes that are higher than the number of columns of `file_schema`
366    /// refer to `table_partition_cols`.
367    pub fn with_projection_indices(
368        mut self,
369        indices: Option<Vec<usize>>,
370    ) -> Result<Self> {
371        let projection_exprs = indices.map(|indices| {
372            ProjectionExprs::from_indices(
373                &indices,
374                self.file_source.table_schema().table_schema(),
375            )
376        });
377        let Some(projection_exprs) = projection_exprs else {
378            return Ok(self);
379        };
380        let new_source = self
381            .file_source
382            .try_pushdown_projection(&projection_exprs)
383            .map_err(|e| {
384                internal_datafusion_err!(
385                    "Failed to push down projection in FileScanConfigBuilder::build: {e}"
386                )
387            })?;
388        if let Some(new_source) = new_source {
389            self.file_source = new_source;
390        } else {
391            internal_err!(
392                "FileSource {} does not support projection pushdown",
393                self.file_source.file_type()
394            )?;
395        }
396        Ok(self)
397    }
398
399    /// Set the table constraints
400    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
401        self.constraints = Some(constraints);
402        self
403    }
404
405    /// Set the statistics of the files, including partition
406    /// columns. Defaults to [`Statistics::new_unknown`].
407    ///
408    /// These statistics are for the entire table (file schema + partition
409    /// columns) before any projection or filtering is applied. Projections are
410    /// applied when statistics are retrieved, and if a filter is present,
411    /// [`FileScanConfig::statistics`] will mark the statistics as inexact
412    /// (counts are not adjusted).
413    ///
414    /// Projections and filters may be applied by the file source, either by
415    /// [`Self::with_projection_indices`] or a preexisting
416    /// [`FileSource::projection`] or [`FileSource::filter`].
417    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
418        self.statistics = Some(statistics);
419        self
420    }
421
422    /// Set the list of files to be processed, grouped into partitions.
423    ///
424    /// Each file must have a schema of `file_schema` or a subset. If
425    /// a particular file has a subset, the missing columns are
426    /// padded with NULLs.
427    ///
428    /// DataFusion may attempt to read each partition of files
429    /// concurrently, however files *within* a partition will be read
430    /// sequentially, one after the next.
431    pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
432        self.file_groups = file_groups;
433        self
434    }
435
436    /// Add a new file group
437    ///
438    /// See [`Self::with_file_groups`] for more information
439    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
440        self.file_groups.push(file_group);
441        self
442    }
443
444    /// Add a file as a single group
445    ///
446    /// See [`Self::with_file_groups`] for more information.
447    pub fn with_file(self, partitioned_file: PartitionedFile) -> Self {
448        self.with_file_group(FileGroup::new(vec![partitioned_file]))
449    }
450
451    /// Set the output ordering of the files
452    ///
453    /// The expressions are in terms of the entire table schema (file schema +
454    /// partition columns), before any projection or filtering from the file
455    /// scan is applied.
456    ///
457    /// This is used for optimization purposes, e.g. to determine if a file scan
458    /// can satisfy an `ORDER BY` without an additional sort.
459    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
460        self.output_ordering = output_ordering;
461        self
462    }
463
464    /// Set the file compression type
465    pub fn with_file_compression_type(
466        mut self,
467        file_compression_type: FileCompressionType,
468    ) -> Self {
469        self.file_compression_type = Some(file_compression_type);
470        self
471    }
472
473    /// Set the batch_size property
474    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
475        self.batch_size = batch_size;
476        self
477    }
478
479    /// Register an expression adapter used to adapt filters and projections that are pushed down into the scan
480    /// from the logical schema to the physical schema of the file.
481    /// This can include things like:
482    /// - Column ordering changes
483    /// - Handling of missing columns
484    /// - Rewriting expression to use pre-computed values or file format specific optimizations
485    pub fn with_expr_adapter(
486        mut self,
487        expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
488    ) -> Self {
489        self.expr_adapter_factory = expr_adapter;
490        self
491    }
492
493    /// Set whether file groups are organized by partition column values.
494    ///
495    /// When set to true, the output partitioning will be declared as Hash partitioning
496    /// on the partition columns.
497    pub fn with_partitioned_by_file_group(
498        mut self,
499        partitioned_by_file_group: bool,
500    ) -> Self {
501        self.partitioned_by_file_group = partitioned_by_file_group;
502        self
503    }
504
505    /// Build the final [`FileScanConfig`] with all the configured settings.
506    ///
507    /// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
508    /// Any unset optional fields will use their default values.
509    ///
510    /// # Errors
511    /// Returns an error if projection pushdown fails or if schema operations fail.
512    pub fn build(self) -> FileScanConfig {
513        let Self {
514            object_store_url,
515            file_source,
516            limit,
517            preserve_order,
518            constraints,
519            file_groups,
520            statistics,
521            output_ordering,
522            file_compression_type,
523            batch_size,
524            expr_adapter_factory: expr_adapter,
525            partitioned_by_file_group,
526        } = self;
527
528        let constraints = constraints.unwrap_or_default();
529        let statistics = statistics.unwrap_or_else(|| {
530            Statistics::new_unknown(file_source.table_schema().table_schema())
531        });
532        let file_compression_type =
533            file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
534
535        // If there is an output ordering, we should preserve it.
536        let preserve_order = preserve_order || !output_ordering.is_empty();
537
538        FileScanConfig {
539            object_store_url,
540            file_source,
541            limit,
542            preserve_order,
543            constraints,
544            file_groups,
545            output_ordering,
546            file_compression_type,
547            batch_size,
548            expr_adapter_factory: expr_adapter,
549            statistics,
550            partitioned_by_file_group,
551        }
552    }
553}
554
555impl From<FileScanConfig> for FileScanConfigBuilder {
556    fn from(config: FileScanConfig) -> Self {
557        Self {
558            object_store_url: config.object_store_url,
559            file_source: Arc::<dyn FileSource>::clone(&config.file_source),
560            file_groups: config.file_groups,
561            statistics: Some(config.statistics),
562            output_ordering: config.output_ordering,
563            file_compression_type: Some(config.file_compression_type),
564            limit: config.limit,
565            preserve_order: config.preserve_order,
566            constraints: Some(config.constraints),
567            batch_size: config.batch_size,
568            expr_adapter_factory: config.expr_adapter_factory,
569            partitioned_by_file_group: config.partitioned_by_file_group,
570        }
571    }
572}
573
574impl DataSource for FileScanConfig {
575    fn open(
576        &self,
577        partition: usize,
578        context: Arc<TaskContext>,
579    ) -> Result<SendableRecordBatchStream> {
580        self.open_with_args(OpenArgs::new(partition, context))
581    }
582
583    fn open_with_args(&self, args: OpenArgs) -> Result<SendableRecordBatchStream> {
584        let OpenArgs {
585            partition,
586            context,
587            sibling_state,
588        } = args;
589        let object_store = context.runtime_env().object_store(&self.object_store_url)?;
590        let batch_size = self
591            .batch_size
592            .unwrap_or_else(|| context.session_config().batch_size());
593
594        let source = self.file_source.with_batch_size(batch_size);
595
596        let morselizer = source.create_morselizer(object_store, self, partition)?;
597
598        // Extract the shared work source from the sibling state if it exists.
599        // This allows multiple sibling streams to steal work from a single
600        // shared queue of unopened files.
601        let shared_work_source = sibling_state
602            .as_ref()
603            .and_then(|state| state.downcast_ref::<SharedWorkSource>())
604            .cloned();
605
606        let stream = FileStreamBuilder::new(self)
607            .with_partition(partition)
608            .with_shared_work_source(shared_work_source)
609            .with_morselizer(morselizer)
610            .with_metrics(source.metrics())
611            .build()?;
612        Ok(Box::pin(cooperative(stream)))
613    }
614
615    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
616        match t {
617            DisplayFormatType::Default | DisplayFormatType::Verbose => {
618                let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
619                let orderings =
620                    sort_pushdown::get_projected_output_ordering(self, &schema);
621
622                write!(f, "file_groups=")?;
623                FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
624
625                if !schema.fields().is_empty() {
626                    if let Some(projection) = self.file_source.projection() {
627                        // This matches what ProjectionExec does.
628                        // TODO: can we put this into ProjectionExprs so that it's shared code?
629                        let expr: Vec<String> = projection
630                            .as_ref()
631                            .iter()
632                            .map(|proj_expr| {
633                                if let Some(column) =
634                                    proj_expr.expr.downcast_ref::<Column>()
635                                {
636                                    if column.name() == proj_expr.alias {
637                                        column.name().to_string()
638                                    } else {
639                                        format!(
640                                            "{} as {}",
641                                            proj_expr.expr, proj_expr.alias
642                                        )
643                                    }
644                                } else {
645                                    format!("{} as {}", proj_expr.expr, proj_expr.alias)
646                                }
647                            })
648                            .collect();
649                        write!(f, ", projection=[{}]", expr.join(", "))?;
650                    } else {
651                        write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
652                    }
653                }
654
655                if let Some(limit) = self.limit {
656                    write!(f, ", limit={limit}")?;
657                }
658
659                display_orderings(f, &orderings)?;
660
661                if !self.constraints.is_empty() {
662                    write!(f, ", {}", self.constraints)?;
663                }
664
665                self.fmt_file_source(t, f)
666            }
667            DisplayFormatType::TreeRender => {
668                writeln!(f, "format={}", self.file_source.file_type())?;
669                self.file_source.fmt_extra(t, f)?;
670                let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
671                writeln!(f, "files={num_files}")?;
672                Ok(())
673            }
674        }
675    }
676
677    /// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size.
678    fn repartitioned(
679        &self,
680        target_partitions: usize,
681        repartition_file_min_size: usize,
682        output_ordering: Option<LexOrdering>,
683    ) -> Result<Option<Arc<dyn DataSource>>> {
684        // When files are grouped by partition values, we cannot allow byte-range
685        // splitting. It would mix rows from different partition values across
686        // file groups, breaking the Hash partitioning.
687        if self.partitioned_by_file_group {
688            return Ok(None);
689        }
690
691        let source = self.file_source.repartitioned(
692            target_partitions,
693            repartition_file_min_size,
694            output_ordering,
695            self,
696        )?;
697
698        Ok(source.map(|s| Arc::new(s) as _))
699    }
700
701    /// Returns the output partitioning for this file scan.
702    ///
703    /// When `partitioned_by_file_group` is true, this returns `Partitioning::Hash` on
704    /// the Hive partition columns, allowing the optimizer to skip hash repartitioning
705    /// for aggregates and joins on those columns.
706    ///
707    /// Tradeoffs
708    /// - Benefit: Eliminates `RepartitionExec` and `SortExec` for queries with
709    ///   `GROUP BY` or `ORDER BY` on partition columns.
710    /// - Cost: Files are grouped by partition values rather than split by byte
711    ///   ranges, which may reduce I/O parallelism when partition sizes are uneven.
712    ///   For simple aggregations without `ORDER BY`, this cost may outweigh the benefit.
713    ///
714    /// Follow-up Work
715    /// - Idea: Could allow byte-range splitting within partition-aware groups,
716    ///   preserving I/O parallelism while maintaining partition semantics.
717    fn output_partitioning(&self) -> Partitioning {
718        if self.partitioned_by_file_group {
719            let partition_cols = self.table_partition_cols();
720            if !partition_cols.is_empty() {
721                let projected_schema = match self.projected_schema() {
722                    Ok(schema) => schema,
723                    Err(_) => {
724                        debug!(
725                            "Could not get projected schema, falling back to UnknownPartitioning."
726                        );
727                        return Partitioning::UnknownPartitioning(self.file_groups.len());
728                    }
729                };
730
731                // Build Column expressions for partition columns based on their
732                // position in the projected schema
733                let mut exprs: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
734                for partition_col in partition_cols {
735                    if let Some((idx, _)) = projected_schema
736                        .fields()
737                        .iter()
738                        .enumerate()
739                        .find(|(_, f)| f.name() == partition_col.name())
740                    {
741                        exprs.push(Arc::new(Column::new(partition_col.name(), idx)));
742                    }
743                }
744
745                if exprs.len() == partition_cols.len() {
746                    return Partitioning::Hash(exprs, self.file_groups.len());
747                }
748            }
749        }
750        Partitioning::UnknownPartitioning(self.file_groups.len())
751    }
752
753    /// Computes the effective equivalence properties of this file scan, taking
754    /// into account the file schema, any projections or filters applied by the
755    /// file source, and the output ordering.
756    fn eq_properties(&self) -> EquivalenceProperties {
757        let schema = self.file_source.table_schema().table_schema();
758        let mut eq_properties = EquivalenceProperties::new_with_orderings(
759            Arc::clone(schema),
760            self.validated_output_ordering(),
761        )
762        .with_constraints(self.constraints.clone());
763
764        if let Some(filter) = self.file_source.filter() {
765            // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with.
766            // Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence.
767            match Self::add_filter_equivalence_info(&filter, &mut eq_properties, schema) {
768                Ok(()) => {}
769                Err(e) => {
770                    warn!("Failed to add filter equivalence info: {e}");
771                    #[cfg(debug_assertions)]
772                    panic!("Failed to add filter equivalence info: {e}");
773                }
774            }
775        }
776
777        if let Some(projection) = self.file_source.projection() {
778            match (
779                projection.project_schema(schema),
780                projection.projection_mapping(schema),
781            ) {
782                (Ok(output_schema), Ok(mapping)) => {
783                    eq_properties =
784                        eq_properties.project(&mapping, Arc::new(output_schema));
785                }
786                (Err(e), _) | (_, Err(e)) => {
787                    warn!("Failed to project equivalence properties: {e}");
788                    #[cfg(debug_assertions)]
789                    panic!("Failed to project equivalence properties: {e}");
790                }
791            }
792        }
793
794        eq_properties
795    }
796
797    fn scheduling_type(&self) -> SchedulingType {
798        SchedulingType::Cooperative
799    }
800
801    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
802        if let Some(partition) = partition {
803            // Get statistics for a specific partition
804            // Note: FileGroup statistics include partition columns (computed from partition_values)
805            if let Some(file_group) = self.file_groups.get(partition)
806                && let Some(stat) = file_group.file_statistics(None)
807            {
808                // Project the statistics based on the projection
809                let output_schema = self.projected_schema()?;
810                return if let Some(projection) = self.file_source.projection() {
811                    Ok(Arc::new(
812                        projection.project_statistics(stat.clone(), &output_schema)?,
813                    ))
814                } else {
815                    Ok(Arc::new(stat.clone()))
816                };
817            }
818            // If no statistics available for this partition, return unknown
819            Ok(Arc::new(Statistics::new_unknown(
820                self.projected_schema()?.as_ref(),
821            )))
822        } else {
823            // Return aggregate statistics across all partitions
824            let statistics = self.statistics();
825            let projection = self.file_source.projection();
826            let output_schema = self.projected_schema()?;
827            if let Some(projection) = &projection {
828                Ok(Arc::new(
829                    projection.project_statistics(statistics.clone(), &output_schema)?,
830                ))
831            } else {
832                Ok(Arc::new(statistics))
833            }
834        }
835    }
836
837    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
838        let source = FileScanConfigBuilder::from(self.clone())
839            .with_limit(limit)
840            .build();
841        Some(Arc::new(source))
842    }
843
844    fn fetch(&self) -> Option<usize> {
845        self.limit
846    }
847
848    fn metrics(&self) -> ExecutionPlanMetricsSet {
849        self.file_source.metrics().clone()
850    }
851
852    fn try_swapping_with_projection(
853        &self,
854        projection: &ProjectionExprs,
855    ) -> Result<Option<Arc<dyn DataSource>>> {
856        match self.file_source.try_pushdown_projection(projection)? {
857            Some(new_source) => {
858                let mut new_file_scan_config = self.clone();
859                new_file_scan_config.file_source = new_source;
860                Ok(Some(Arc::new(new_file_scan_config) as Arc<dyn DataSource>))
861            }
862            None => Ok(None),
863        }
864    }
865
866    fn try_pushdown_filters(
867        &self,
868        filters: Vec<Arc<dyn PhysicalExpr>>,
869        config: &ConfigOptions,
870    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
871        // Remap filter Column indices to match the table schema (file + partition columns).
872        // This is necessary because filters refer to the output schema of this `DataSource`
873        // (e.g., after projection pushdown has been applied) and need to be remapped to the table schema
874        // before being passed to the file source
875        //
876        // For example, consider a filter `c1_c2 > 5` being pushed down. If the
877        // `DataSource` has a projection `c1 + c2 as c1_c2`, the filter must be rewritten
878        // to refer to the table schema `c1 + c2 > 5`
879        let table_schema = self.file_source.table_schema().table_schema();
880        let filters_to_remap = if let Some(projection) = self.file_source.projection() {
881            filters
882                .into_iter()
883                .map(|filter| projection.unproject_expr(&filter))
884                .collect::<Result<Vec<_>>>()?
885        } else {
886            filters
887        };
888        // Now remap column indices to match the table schema.
889        let remapped_filters = filters_to_remap
890            .into_iter()
891            .map(|filter| reassign_expr_columns(filter, table_schema))
892            .collect::<Result<Vec<_>>>()?;
893
894        let result = self
895            .file_source
896            .try_pushdown_filters(remapped_filters, config)?;
897        match result.updated_node {
898            Some(new_file_source) => {
899                let mut new_file_scan_config = self.clone();
900                new_file_scan_config.file_source = new_file_source;
901                Ok(FilterPushdownPropagation {
902                    filters: result.filters,
903                    updated_node: Some(Arc::new(new_file_scan_config) as _),
904                })
905            }
906            None => {
907                // If the file source does not support filter pushdown, return the original config
908                Ok(FilterPushdownPropagation {
909                    filters: result.filters,
910                    updated_node: None,
911                })
912            }
913        }
914    }
915
916    /// Push sort requirements into file-based data sources.
917    ///
918    /// # Sort Pushdown Architecture
919    ///
920    /// When a partition (file group) contains multiple files in wrong order,
921    /// `validated_output_ordering()` strips the ordering and `EnforceSorting`
922    /// inserts a `SortExec`. This optimizer fixes the file order by sorting
923    /// files within each group by min/max statistics, enabling sort elimination.
924    ///
925    /// This applies to both single-partition and multi-partition plans — any
926    /// file group with multiple files in wrong order benefits.
927    ///
928    /// ```text
929    /// PushdownSort optimizer finds SortExec
930    ///   │
931    ///   ▼
932    /// FileScanConfig::try_pushdown_sort()
933    ///   │
934    ///   ├─► FileSource returns Exact
935    ///   │     (natural ordering satisfies request)
936    ///   │     → rebuild_with_source: sort files by stats, verify non-overlapping
937    ///   │     → SortExec removed, fetch (LIMIT) pushed to DataSourceExec
938    ///   │
939    ///   ├─► FileSource returns Inexact
940    ///   │     (e.g. column_in_file_schema: opener will reorder RGs at runtime)
941    ///   │     → rebuild_with_source: sort files by stats; if the post-sort
942    ///   │       file groups are non-overlapping AND the request now validates
943    ///   │       AND no NULLs sit in the sort columns of non-last files,
944    ///   │       upgrade back to Exact (SortExec removed). Otherwise stays
945    ///   │       Inexact and SortExec is kept while the scan is still
946    ///   │       optimised via `sort_order_for_reorder` / `reverse_row_groups`.
947    ///   │
948    ///   └─► FileSource returns Unsupported
949    ///         (e.g. expression sort key or partition column)
950    ///         → try_sort_file_groups_by_statistics():
951    ///           1. Sort files within each group by min/max statistics
952    ///           2. Re-check: non-overlapping + ordering valid + no NULLs?
953    ///              YES → Exact → SortExec removed
954    ///              NO  → Inexact (files reordered, Sort stays)
955    /// ```
956    fn try_pushdown_sort(
957        &self,
958        order: &[PhysicalSortExpr],
959    ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
960        let pushdown_result = self
961            .file_source
962            .try_pushdown_sort(order, &self.eq_properties())?;
963
964        match pushdown_result {
965            SortOrderPushdownResult::Exact { inner } => {
966                let config = self.rebuild_with_source(inner, true, order)?;
967                // rebuild_with_source keeps output_ordering only when all groups
968                // are non-overlapping. If output_ordering was cleared, files
969                // overlap despite within-file ordering → downgrade to Inexact.
970                if config.output_ordering.is_empty() {
971                    Ok(SortOrderPushdownResult::Inexact {
972                        inner: Arc::new(config),
973                    })
974                } else {
975                    Ok(SortOrderPushdownResult::Exact {
976                        inner: Arc::new(config),
977                    })
978                }
979            }
980            SortOrderPushdownResult::Inexact { inner } => {
981                let mut config = self.rebuild_with_source(inner, false, order)?;
982                // `rebuild_with_source` reorders files by stats; if the
983                // post-sort files are non-overlapping AND the request now
984                // validates against the new file groups, `output_ordering`
985                // is preserved and we can upgrade back to Exact. This
986                // restores the sort-elimination behaviour that lived in
987                // the `Unsupported` → `try_sort_file_groups_by_statistics`
988                // path before #21956 routed `column_in_file_schema` cases
989                // here.
990                if config.output_ordering.is_empty() {
991                    return Ok(SortOrderPushdownResult::Inexact {
992                        inner: Arc::new(config),
993                    });
994                }
995                // Upgrading to Exact: the post-sort file groups are
996                // non-overlapping and each file's declared ordering
997                // re-validates, so reading the files in their natural
998                // (declared-sorted) order already yields the requested
999                // ordering — exactly like the `Unsupported` → Exact path,
1000                // which reads files in natural order too.
1001                //
1002                // Drop the runtime row-group reorder hints the Inexact
1003                // source carried (`sort_order_for_reorder` /
1004                // `reverse_row_groups`) by restoring the original,
1005                // hint-free source. With the `SortExec` removed those
1006                // hints are not just redundant but unsafe: for a DESC
1007                // request the opener sorts row groups ASC-by-min and then
1008                // reverses them, which reorders two row groups within a
1009                // single file that share the same `min` incorrectly
1010                // (e.g. a file `[10,8,8,8]` whose row groups are
1011                // `[10,8]` and `[8,8]` would stream as `8,8,10,8`).
1012                // The `SortExec` used to mask this; once it is gone the
1013                // reordered stream is the final, wrong answer.
1014                config.file_source = Arc::clone(&self.file_source);
1015                Ok(SortOrderPushdownResult::Exact {
1016                    inner: Arc::new(config),
1017                })
1018            }
1019            SortOrderPushdownResult::Unsupported => {
1020                self.try_sort_file_groups_by_statistics(order)
1021            }
1022        }
1023    }
1024
1025    fn with_preserve_order(&self, preserve_order: bool) -> Option<Arc<dyn DataSource>> {
1026        if self.preserve_order == preserve_order {
1027            return Some(Arc::new(self.clone()));
1028        }
1029
1030        let new_config = FileScanConfig {
1031            preserve_order,
1032            ..self.clone()
1033        };
1034        Some(Arc::new(new_config))
1035    }
1036
1037    /// Create any shared state that should be passed between sibling streams
1038    /// during one execution.
1039    ///
1040    /// This returns `None` when sibling streams must not share work, such as
1041    /// when file order must be preserved or the file groups define the output
1042    /// partitioning needed for the rest of the plan
1043    fn create_sibling_state(&self) -> Option<Arc<dyn Any + Send + Sync>> {
1044        if self.preserve_order || self.partitioned_by_file_group {
1045            return None;
1046        }
1047
1048        Some(Arc::new(SharedWorkSource::from_config(self)) as Arc<dyn Any + Send + Sync>)
1049    }
1050}
1051
1052impl FileScanConfig {
1053    /// Returns only the output orderings that are validated against actual
1054    /// file group statistics.
1055    ///
1056    /// For example, individual files may be ordered by `col1 ASC`,
1057    /// but if we have files with these min/max statistics in a single partition / file group:
1058    ///
1059    /// - file1: min(col1) = 10, max(col1) = 20
1060    /// - file2: min(col1) = 5, max(col1) = 15
1061    ///
1062    /// Because reading file1 followed by file2 would produce out-of-order output (there is overlap
1063    /// in the ranges), we cannot retain `col1 ASC` as a valid output ordering.
1064    ///
1065    /// Similarly this would not be a valid order (non-overlapping ranges but not ordered):
1066    ///
1067    /// - file1: min(col1) = 20, max(col1) = 30
1068    /// - file2: min(col1) = 10, max(col1) = 15
1069    ///
1070    /// On the other hand if we had:
1071    ///
1072    /// - file1: min(col1) = 5, max(col1) = 15
1073    /// - file2: min(col1) = 16, max(col1) = 25
1074    ///
1075    /// Then we know that reading file1 followed by file2 will produce ordered output,
1076    /// so `col1 ASC` would be retained.
1077    ///
1078    /// Note that we are checking for ordering *within* *each* file group / partition,
1079    /// files in different partitions are read independently and do not affect each other's ordering.
1080    /// Merging of the multiple partition streams into a single ordered stream is handled
1081    /// upstream e.g. by `SortPreservingMergeExec`.
1082    fn validated_output_ordering(&self) -> Vec<LexOrdering> {
1083        let schema = self.file_source.table_schema().table_schema();
1084        sort_pushdown::validate_orderings(
1085            &self.output_ordering,
1086            schema,
1087            &self.file_groups,
1088            None,
1089        )
1090    }
1091
1092    /// Get the file schema (schema of the files without partition columns)
1093    pub fn file_schema(&self) -> &SchemaRef {
1094        self.file_source.table_schema().file_schema()
1095    }
1096
1097    /// Get the table partition columns
1098    pub fn table_partition_cols(&self) -> &Vec<FieldRef> {
1099        self.file_source.table_schema().table_partition_cols()
1100    }
1101
1102    /// Returns the unprojected table statistics, marking them as inexact if filters are present.
1103    ///
1104    /// When filters are pushed down (including pruning predicates and bloom filters),
1105    /// we can't guarantee the statistics are exact because we don't know how many
1106    /// rows will be filtered out.
1107    pub fn statistics(&self) -> Statistics {
1108        if self.file_source.filter().is_some() {
1109            self.statistics.clone().to_inexact()
1110        } else {
1111            self.statistics.clone()
1112        }
1113    }
1114
1115    pub fn projected_schema(&self) -> Result<Arc<Schema>> {
1116        let schema = self.file_source.table_schema().table_schema();
1117        match self.file_source.projection() {
1118            Some(proj) => Ok(Arc::new(proj.project_schema(schema)?)),
1119            None => Ok(Arc::clone(schema)),
1120        }
1121    }
1122
1123    fn add_filter_equivalence_info(
1124        filter: &Arc<dyn PhysicalExpr>,
1125        eq_properties: &mut EquivalenceProperties,
1126        schema: &Schema,
1127    ) -> Result<()> {
1128        // Gather valid equality pairs from the filter expression
1129        let equal_pairs = split_conjunction(filter).into_iter().filter_map(|expr| {
1130            // Ignore any binary expressions that reference non-existent columns in the current schema
1131            // (e.g. due to unnecessary projections being removed)
1132            reassign_expr_columns(Arc::clone(expr), schema)
1133                .ok()
1134                .and_then(|expr| match expr.downcast_ref::<BinaryExpr>() {
1135                    Some(expr) if expr.op() == &Operator::Eq => {
1136                        Some((Arc::clone(expr.left()), Arc::clone(expr.right())))
1137                    }
1138                    _ => None,
1139                })
1140        });
1141
1142        for (lhs, rhs) in equal_pairs {
1143            eq_properties.add_equal_conditions(lhs, rhs)?
1144        }
1145
1146        Ok(())
1147    }
1148
1149    /// Returns whether newlines in values are supported.
1150    ///
1151    /// This method always returns `false`. The actual newlines_in_values setting
1152    /// has been moved to [`CsvSource`] and should be accessed via
1153    /// [`CsvSource::csv_options()`] instead.
1154    ///
1155    /// [`CsvSource`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html
1156    /// [`CsvSource::csv_options()`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html#method.csv_options
1157    #[deprecated(
1158        since = "52.0.0",
1159        note = "newlines_in_values has moved to CsvSource. Access it via CsvSource::csv_options().newlines_in_values instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
1160    )]
1161    pub fn newlines_in_values(&self) -> bool {
1162        false
1163    }
1164
1165    #[deprecated(
1166        since = "52.0.0",
1167        note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
1168    )]
1169    pub fn projected_constraints(&self) -> Constraints {
1170        let props = self.eq_properties();
1171        props.constraints().clone()
1172    }
1173
1174    #[deprecated(
1175        since = "52.0.0",
1176        note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
1177    )]
1178    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
1179        #[expect(deprecated)]
1180        self.file_source.projection().as_ref().map(|p| {
1181            p.ordered_column_indices()
1182                .into_iter()
1183                .filter(|&i| i < self.file_schema().fields().len())
1184                .collect::<Vec<_>>()
1185        })
1186    }
1187
1188    /// Splits file groups into new groups based on statistics to enable efficient parallel processing.
1189    ///
1190    /// The method distributes files across a target number of partitions while ensuring
1191    /// files within each partition maintain sort order based on their min/max statistics.
1192    ///
1193    /// The algorithm works by:
1194    /// 1. Takes files sorted by minimum values
1195    /// 2. For each file:
1196    ///   - Finds eligible groups (empty or where file's min > group's last max)
1197    ///   - Selects the smallest eligible group
1198    ///   - Creates a new group if needed
1199    ///
1200    /// # Parameters
1201    /// * `table_schema`: Schema containing information about the columns
1202    /// * `file_groups`: The original file groups to split
1203    /// * `sort_order`: The lexicographical ordering to maintain within each group
1204    /// * `target_partitions`: The desired number of output partitions
1205    ///
1206    /// # Returns
1207    /// A new set of file groups, where files within each group are non-overlapping with respect to
1208    /// their min/max statistics and maintain the specified sort order.
1209    pub fn split_groups_by_statistics_with_target_partitions(
1210        table_schema: &SchemaRef,
1211        file_groups: &[FileGroup],
1212        sort_order: &LexOrdering,
1213        target_partitions: usize,
1214    ) -> Result<Vec<FileGroup>> {
1215        if target_partitions == 0 {
1216            return Err(internal_datafusion_err!(
1217                "target_partitions must be greater than 0"
1218            ));
1219        }
1220
1221        let flattened_files = file_groups
1222            .iter()
1223            .flat_map(FileGroup::iter)
1224            .collect::<Vec<_>>();
1225
1226        if flattened_files.is_empty() {
1227            return Ok(vec![]);
1228        }
1229
1230        let statistics = MinMaxStatistics::new_from_files(
1231            sort_order,
1232            table_schema,
1233            None,
1234            flattened_files.iter().copied(),
1235        )?;
1236
1237        let indices_sorted_by_min = statistics.min_values_sorted();
1238
1239        // Initialize with target_partitions empty groups
1240        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];
1241
1242        for (idx, min) in indices_sorted_by_min {
1243            if let Some((_, group)) = file_groups_indices
1244                .iter_mut()
1245                .enumerate()
1246                .filter(|(_, group)| {
1247                    group.is_empty()
1248                        || min
1249                            > statistics
1250                                .max(*group.last().expect("groups should not be empty"))
1251                })
1252                .min_by_key(|(_, group)| group.len())
1253            {
1254                group.push(idx);
1255            } else {
1256                // Create a new group if no existing group fits
1257                file_groups_indices.push(vec![idx]);
1258            }
1259        }
1260
1261        // Remove any empty groups
1262        file_groups_indices.retain(|group| !group.is_empty());
1263
1264        // Assemble indices back into groups of PartitionedFiles
1265        Ok(file_groups_indices
1266            .into_iter()
1267            .map(|file_group_indices| {
1268                FileGroup::new(
1269                    file_group_indices
1270                        .into_iter()
1271                        .map(|idx| flattened_files[idx].clone())
1272                        .collect(),
1273                )
1274            })
1275            .collect())
1276    }
1277
1278    /// Attempts to do a bin-packing on files into file groups, such that any two files
1279    /// in a file group are ordered and non-overlapping with respect to their statistics.
1280    /// It will produce the smallest number of file groups possible.
1281    pub fn split_groups_by_statistics(
1282        table_schema: &SchemaRef,
1283        file_groups: &[FileGroup],
1284        sort_order: &LexOrdering,
1285    ) -> Result<Vec<FileGroup>> {
1286        let flattened_files = file_groups
1287            .iter()
1288            .flat_map(FileGroup::iter)
1289            .collect::<Vec<_>>();
1290        // First Fit:
1291        // * Choose the first file group that a file can be placed into.
1292        // * If it fits into no existing file groups, create a new one.
1293        //
1294        // By sorting files by min values and then applying first-fit bin packing,
1295        // we can produce the smallest number of file groups such that
1296        // files within a group are in order and non-overlapping.
1297        //
1298        // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
1299        // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
1300
1301        if flattened_files.is_empty() {
1302            return Ok(vec![]);
1303        }
1304
1305        let statistics = MinMaxStatistics::new_from_files(
1306            sort_order,
1307            table_schema,
1308            None,
1309            flattened_files.iter().copied(),
1310        )
1311        .map_err(|e| {
1312            e.context("construct min/max statistics for split_groups_by_statistics")
1313        })?;
1314
1315        let indices_sorted_by_min = statistics.min_values_sorted();
1316        let mut file_groups_indices: Vec<Vec<usize>> = vec![];
1317
1318        for (idx, min) in indices_sorted_by_min {
1319            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
1320                // If our file is non-overlapping and comes _after_ the last file,
1321                // it fits in this file group.
1322                min > statistics.max(
1323                    *group
1324                        .last()
1325                        .expect("groups should be nonempty at construction"),
1326                )
1327            });
1328            match file_group_to_insert {
1329                Some(group) => group.push(idx),
1330                None => file_groups_indices.push(vec![idx]),
1331            }
1332        }
1333
1334        // Assemble indices back into groups of PartitionedFiles
1335        Ok(file_groups_indices
1336            .into_iter()
1337            .map(|file_group_indices| {
1338                file_group_indices
1339                    .into_iter()
1340                    .map(|idx| flattened_files[idx].clone())
1341                    .collect()
1342            })
1343            .collect())
1344    }
1345
1346    /// Write the data_type based on file_source
1347    fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1348        write!(f, ", file_type={}", self.file_source.file_type())?;
1349        self.file_source.fmt_extra(t, f)
1350    }
1351
1352    /// Returns the file_source
1353    pub fn file_source(&self) -> &Arc<dyn FileSource> {
1354        &self.file_source
1355    }
1356
1357    // Sort pushdown methods (rebuild_with_source, try_sort_file_groups_by_statistics,
1358    // sort_files_within_groups_by_statistics, any_file_has_nulls_in_sort_columns)
1359    // are in crate::sort_pushdown module.
1360}
1361
1362impl Debug for FileScanConfig {
1363    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1364        write!(f, "FileScanConfig {{")?;
1365        write!(f, "object_store_url={:?}, ", self.object_store_url)?;
1366
1367        write!(f, "statistics={:?}, ", self.statistics())?;
1368
1369        DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
1370        write!(f, "}}")
1371    }
1372}
1373
1374impl DisplayAs for FileScanConfig {
1375    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
1376        let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
1377        let orderings = sort_pushdown::get_projected_output_ordering(self, &schema);
1378
1379        write!(f, "file_groups=")?;
1380        FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
1381
1382        if !schema.fields().is_empty() {
1383            write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
1384        }
1385
1386        if let Some(limit) = self.limit {
1387            write!(f, ", limit={limit}")?;
1388        }
1389
1390        display_orderings(f, &orderings)?;
1391
1392        if !self.constraints.is_empty() {
1393            write!(f, ", {}", self.constraints)?;
1394        }
1395
1396        Ok(())
1397    }
1398}
1399
1400/// Convert type to a type suitable for use as a `ListingTable`
1401/// partition column. Returns `Dictionary(UInt16, val_type)`, which is
1402/// a reasonable trade off between a reasonable number of partition
1403/// values and space efficiency.
1404///
1405/// This use this to specify types for partition columns. However
1406/// you MAY also choose not to dictionary-encode the data or to use a
1407/// different dictionary type.
1408///
1409/// Use [`wrap_partition_value_in_dict`] to wrap a [`ScalarValue`] in the same say.
1410pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
1411    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
1412}
1413
1414/// Convert a [`ScalarValue`] of partition columns to a type, as
1415/// described in the documentation of [`wrap_partition_type_in_dict`],
1416/// which can wrap the types.
1417pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
1418    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
1419}
1420
1421#[cfg(test)]
1422mod tests {
1423    use std::collections::HashMap;
1424
1425    use super::*;
1426    use crate::TableSchema;
1427    use crate::source::DataSourceExec;
1428    use crate::test_util::col;
1429    use crate::{
1430        generate_test_files, test_util::MockSource, tests::aggr_test_schema,
1431        verify_sort_integrity,
1432    };
1433
1434    use arrow::array::{Int32Array, RecordBatch};
1435    use arrow::datatypes::Field;
1436    use datafusion_common::ColumnStatistics;
1437    use datafusion_common::stats::Precision;
1438    use datafusion_common::{Result, assert_batches_eq, internal_err};
1439    use datafusion_execution::TaskContext;
1440    use datafusion_expr::SortExpr;
1441    use datafusion_physical_expr::create_physical_sort_expr;
1442    use datafusion_physical_expr::expressions::Literal;
1443    use datafusion_physical_expr::projection::ProjectionExpr;
1444    use datafusion_physical_expr::projection::ProjectionExprs;
1445    use datafusion_physical_plan::ExecutionPlan;
1446    use datafusion_physical_plan::execution_plan::collect;
1447    use futures::FutureExt as _;
1448    use futures::StreamExt as _;
1449    use futures::stream;
1450    use object_store::ObjectStore;
1451    use std::fmt::Debug;
1452
1453    #[derive(Clone)]
1454    struct InexactSortPushdownSource {
1455        metrics: ExecutionPlanMetricsSet,
1456        table_schema: TableSchema,
1457    }
1458
1459    impl InexactSortPushdownSource {
1460        fn new(table_schema: TableSchema) -> Self {
1461            Self {
1462                metrics: ExecutionPlanMetricsSet::new(),
1463                table_schema,
1464            }
1465        }
1466    }
1467
1468    impl FileSource for InexactSortPushdownSource {
1469        fn create_file_opener(
1470            &self,
1471            _object_store: Arc<dyn ObjectStore>,
1472            _base_config: &FileScanConfig,
1473            _partition: usize,
1474        ) -> Result<Arc<dyn crate::file_stream::FileOpener>> {
1475            unimplemented!()
1476        }
1477
1478        fn table_schema(&self) -> &TableSchema {
1479            &self.table_schema
1480        }
1481
1482        fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
1483            Arc::new(self.clone())
1484        }
1485
1486        fn metrics(&self) -> &ExecutionPlanMetricsSet {
1487            &self.metrics
1488        }
1489
1490        fn file_type(&self) -> &str {
1491            "mock"
1492        }
1493
1494        fn try_pushdown_sort(
1495            &self,
1496            _order: &[PhysicalSortExpr],
1497            _eq_properties: &EquivalenceProperties,
1498        ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
1499            Ok(SortOrderPushdownResult::Inexact {
1500                inner: Arc::new(self.clone()) as Arc<dyn FileSource>,
1501            })
1502        }
1503    }
1504
1505    #[test]
1506    fn physical_plan_config_no_projection_tab_cols_as_field() {
1507        let file_schema = aggr_test_schema();
1508
1509        // make a table_partition_col as a field
1510        let table_partition_col =
1511            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
1512                .with_metadata(HashMap::from_iter(vec![(
1513                    "key_whatever".to_owned(),
1514                    "value_whatever".to_owned(),
1515                )]));
1516
1517        let conf = config_for_projection(
1518            Arc::clone(&file_schema),
1519            None,
1520            Statistics::new_unknown(&file_schema),
1521            vec![table_partition_col.clone()],
1522        );
1523
1524        // verify the proj_schema includes the last column and exactly the same the field it is defined
1525        let proj_schema = conf.projected_schema().unwrap();
1526        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
1527        assert_eq!(
1528            *proj_schema.field(file_schema.fields().len()),
1529            table_partition_col,
1530            "partition columns are the last columns and ust have all values defined in created field"
1531        );
1532    }
1533
1534    #[test]
1535    fn test_split_groups_by_statistics() -> Result<()> {
1536        use chrono::TimeZone;
1537        use datafusion_common::DFSchema;
1538        use datafusion_expr::execution_props::ExecutionProps;
1539        use object_store::{ObjectMeta, path::Path};
1540
1541        struct File {
1542            name: &'static str,
1543            date: &'static str,
1544            statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1545        }
1546        impl File {
1547            fn new(
1548                name: &'static str,
1549                date: &'static str,
1550                statistics: Vec<Option<(f64, f64)>>,
1551            ) -> Self {
1552                Self::new_nullable(
1553                    name,
1554                    date,
1555                    statistics
1556                        .into_iter()
1557                        .map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
1558                        .collect(),
1559                )
1560            }
1561
1562            fn new_nullable(
1563                name: &'static str,
1564                date: &'static str,
1565                statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
1566            ) -> Self {
1567                Self {
1568                    name,
1569                    date,
1570                    statistics,
1571                }
1572            }
1573        }
1574
1575        struct TestCase {
1576            name: &'static str,
1577            file_schema: Schema,
1578            files: Vec<File>,
1579            sort: Vec<SortExpr>,
1580            expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
1581        }
1582
1583        use datafusion_expr::col;
1584        let cases = vec![
1585            TestCase {
1586                name: "test sort",
1587                file_schema: Schema::new(vec![Field::new(
1588                    "value".to_string(),
1589                    DataType::Float64,
1590                    false,
1591                )]),
1592                files: vec![
1593                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1594                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1595                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1596                ],
1597                sort: vec![col("value").sort(true, false)],
1598                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1599            },
1600            // same input but file '2' is in the middle
1601            // test that we still order correctly
1602            TestCase {
1603                name: "test sort with files ordered differently",
1604                file_schema: Schema::new(vec![Field::new(
1605                    "value".to_string(),
1606                    DataType::Float64,
1607                    false,
1608                )]),
1609                files: vec![
1610                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1611                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1612                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1613                ],
1614                sort: vec![col("value").sort(true, false)],
1615                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1616            },
1617            TestCase {
1618                name: "reverse sort",
1619                file_schema: Schema::new(vec![Field::new(
1620                    "value".to_string(),
1621                    DataType::Float64,
1622                    false,
1623                )]),
1624                files: vec![
1625                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1626                    File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
1627                    File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
1628                ],
1629                sort: vec![col("value").sort(false, true)],
1630                expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
1631            },
1632            TestCase {
1633                name: "nullable sort columns, nulls last",
1634                file_schema: Schema::new(vec![Field::new(
1635                    "value".to_string(),
1636                    DataType::Float64,
1637                    true,
1638                )]),
1639                files: vec![
1640                    File::new_nullable(
1641                        "0",
1642                        "2023-01-01",
1643                        vec![Some((Some(0.00), Some(0.49)))],
1644                    ),
1645                    File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
1646                    File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
1647                ],
1648                sort: vec![col("value").sort(true, false)],
1649                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1650            },
1651            TestCase {
1652                name: "nullable sort columns, nulls first",
1653                file_schema: Schema::new(vec![Field::new(
1654                    "value".to_string(),
1655                    DataType::Float64,
1656                    true,
1657                )]),
1658                files: vec![
1659                    File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
1660                    File::new_nullable(
1661                        "1",
1662                        "2023-01-01",
1663                        vec![Some((Some(0.50), Some(1.00)))],
1664                    ),
1665                    File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
1666                ],
1667                sort: vec![col("value").sort(true, true)],
1668                expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
1669            },
1670            TestCase {
1671                name: "all three non-overlapping",
1672                file_schema: Schema::new(vec![Field::new(
1673                    "value".to_string(),
1674                    DataType::Float64,
1675                    false,
1676                )]),
1677                files: vec![
1678                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1679                    File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
1680                    File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
1681                ],
1682                sort: vec![col("value").sort(true, false)],
1683                expected_result: Ok(vec![vec!["0", "1", "2"]]),
1684            },
1685            TestCase {
1686                name: "all three overlapping",
1687                file_schema: Schema::new(vec![Field::new(
1688                    "value".to_string(),
1689                    DataType::Float64,
1690                    false,
1691                )]),
1692                files: vec![
1693                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1694                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1695                    File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
1696                ],
1697                sort: vec![col("value").sort(true, false)],
1698                expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
1699            },
1700            TestCase {
1701                name: "empty input",
1702                file_schema: Schema::new(vec![Field::new(
1703                    "value".to_string(),
1704                    DataType::Float64,
1705                    false,
1706                )]),
1707                files: vec![],
1708                sort: vec![col("value").sort(true, false)],
1709                expected_result: Ok(vec![]),
1710            },
1711            TestCase {
1712                name: "one file missing statistics",
1713                file_schema: Schema::new(vec![Field::new(
1714                    "value".to_string(),
1715                    DataType::Float64,
1716                    false,
1717                )]),
1718                files: vec![
1719                    File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
1720                    File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
1721                    File::new("2", "2023-01-02", vec![None]),
1722                ],
1723                sort: vec![col("value").sort(true, false)],
1724                expected_result: Err(
1725                    "construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found",
1726                ),
1727            },
1728        ];
1729
1730        for case in cases {
1731            let table_schema = Arc::new(Schema::new(
1732                case.file_schema
1733                    .fields()
1734                    .clone()
1735                    .into_iter()
1736                    .cloned()
1737                    .chain(Some(Arc::new(Field::new(
1738                        "date".to_string(),
1739                        DataType::Utf8,
1740                        false,
1741                    ))))
1742                    .collect::<Vec<_>>(),
1743            ));
1744            let Some(sort_order) = LexOrdering::new(
1745                case.sort
1746                    .into_iter()
1747                    .map(|expr| {
1748                        create_physical_sort_expr(
1749                            &expr,
1750                            &DFSchema::try_from(Arc::clone(&table_schema))?,
1751                            &ExecutionProps::default(),
1752                        )
1753                    })
1754                    .collect::<Result<Vec<_>>>()?,
1755            ) else {
1756                return internal_err!("This test should always use an ordering");
1757            };
1758
1759            let partitioned_files = FileGroup::new(
1760                case.files.into_iter().map(From::from).collect::<Vec<_>>(),
1761            );
1762            let result = FileScanConfig::split_groups_by_statistics(
1763                &table_schema,
1764                std::slice::from_ref(&partitioned_files),
1765                &sort_order,
1766            );
1767            let results_by_name = result
1768                .as_ref()
1769                .map(|file_groups| {
1770                    file_groups
1771                        .iter()
1772                        .map(|file_group| {
1773                            file_group
1774                                .iter()
1775                                .map(|file| {
1776                                    partitioned_files
1777                                        .iter()
1778                                        .find_map(|f| {
1779                                            if f.object_meta == file.object_meta {
1780                                                Some(
1781                                                    f.object_meta
1782                                                        .location
1783                                                        .as_ref()
1784                                                        .rsplit('/')
1785                                                        .next()
1786                                                        .unwrap()
1787                                                        .trim_end_matches(".parquet"),
1788                                                )
1789                                            } else {
1790                                                None
1791                                            }
1792                                        })
1793                                        .unwrap()
1794                                })
1795                                .collect::<Vec<_>>()
1796                        })
1797                        .collect::<Vec<_>>()
1798                })
1799                .map_err(|e| e.strip_backtrace().leak() as &'static str);
1800
1801            assert_eq!(results_by_name, case.expected_result, "{}", case.name);
1802        }
1803
1804        return Ok(());
1805
1806        impl From<File> for PartitionedFile {
1807            fn from(file: File) -> Self {
1808                let object_meta = ObjectMeta {
1809                    location: Path::from(format!(
1810                        "data/date={}/{}.parquet",
1811                        file.date, file.name
1812                    )),
1813                    last_modified: chrono::Utc.timestamp_nanos(0),
1814                    size: 0,
1815                    e_tag: None,
1816                    version: None,
1817                };
1818                let statistics = Arc::new(Statistics {
1819                    num_rows: Precision::Absent,
1820                    total_byte_size: Precision::Absent,
1821                    column_statistics: file
1822                        .statistics
1823                        .into_iter()
1824                        .map(|stats| {
1825                            stats
1826                                .map(|(min, max)| ColumnStatistics {
1827                                    min_value: Precision::Exact(ScalarValue::Float64(
1828                                        min,
1829                                    )),
1830                                    max_value: Precision::Exact(ScalarValue::Float64(
1831                                        max,
1832                                    )),
1833                                    ..Default::default()
1834                                })
1835                                .unwrap_or_default()
1836                        })
1837                        .collect::<Vec<_>>(),
1838                });
1839                PartitionedFile::new_from_meta(object_meta)
1840                    .with_partition_values(vec![ScalarValue::from(file.date)])
1841                    .with_statistics(statistics)
1842            }
1843        }
1844    }
1845
1846    // sets default for configs that play no role in projections
1847    fn config_for_projection(
1848        file_schema: SchemaRef,
1849        projection: Option<Vec<usize>>,
1850        statistics: Statistics,
1851        table_partition_cols: Vec<Field>,
1852    ) -> FileScanConfig {
1853        let table_schema = TableSchema::new(
1854            file_schema,
1855            table_partition_cols.into_iter().map(Arc::new).collect(),
1856        );
1857        FileScanConfigBuilder::new(
1858            ObjectStoreUrl::parse("test:///").unwrap(),
1859            Arc::new(MockSource::new(table_schema.clone())),
1860        )
1861        .with_projection_indices(projection)
1862        .unwrap()
1863        .with_statistics(statistics)
1864        .build()
1865    }
1866
1867    #[test]
1868    fn test_file_scan_config_builder() {
1869        let file_schema = aggr_test_schema();
1870        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1871
1872        let table_schema = TableSchema::new(
1873            Arc::clone(&file_schema),
1874            vec![Arc::new(Field::new(
1875                "date",
1876                wrap_partition_type_in_dict(DataType::Utf8),
1877                false,
1878            ))],
1879        );
1880
1881        let file_source: Arc<dyn FileSource> =
1882            Arc::new(MockSource::new(table_schema.clone()));
1883
1884        // Create a builder with required parameters
1885        let builder = FileScanConfigBuilder::new(
1886            object_store_url.clone(),
1887            Arc::clone(&file_source),
1888        );
1889
1890        // Build with various configurations
1891        let config = builder
1892            .with_limit(Some(1000))
1893            .with_projection_indices(Some(vec![0, 1]))
1894            .unwrap()
1895            .with_statistics(Statistics::new_unknown(&file_schema))
1896            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
1897                "test.parquet".to_string(),
1898                1024,
1899            )])])
1900            .with_output_ordering(vec![
1901                [PhysicalSortExpr::new_default(Arc::new(Column::new(
1902                    "date", 0,
1903                )))]
1904                .into(),
1905            ])
1906            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
1907            .build();
1908
1909        // Verify the built config has all the expected values
1910        assert_eq!(config.object_store_url, object_store_url);
1911        assert_eq!(*config.file_schema(), file_schema);
1912        assert_eq!(config.limit, Some(1000));
1913        assert_eq!(
1914            config
1915                .file_source
1916                .projection()
1917                .as_ref()
1918                .map(|p| p.column_indices()),
1919            Some(vec![0, 1])
1920        );
1921        assert_eq!(config.table_partition_cols().len(), 1);
1922        assert_eq!(config.table_partition_cols()[0].name(), "date");
1923        assert_eq!(config.file_groups.len(), 1);
1924        assert_eq!(config.file_groups[0].len(), 1);
1925        assert_eq!(
1926            config.file_groups[0][0].object_meta.location.as_ref(),
1927            "test.parquet"
1928        );
1929        assert_eq!(
1930            config.file_compression_type,
1931            FileCompressionType::UNCOMPRESSED
1932        );
1933        assert_eq!(config.output_ordering.len(), 1);
1934    }
1935
1936    #[test]
1937    fn equivalence_properties_after_schema_change() {
1938        let file_schema = aggr_test_schema();
1939        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1940
1941        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
1942
1943        // Create a file source with a filter
1944        let file_source: Arc<dyn FileSource> = Arc::new(
1945            MockSource::new(table_schema.clone()).with_filter(Arc::new(BinaryExpr::new(
1946                col("c2", &file_schema).unwrap(),
1947                Operator::Eq,
1948                Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1949            ))),
1950        );
1951
1952        let config = FileScanConfigBuilder::new(
1953            object_store_url.clone(),
1954            Arc::clone(&file_source),
1955        )
1956        .with_projection_indices(Some(vec![0, 1, 2]))
1957        .unwrap()
1958        .build();
1959
1960        // Simulate projection being updated. Since the filter has already been pushed down,
1961        // the new projection won't include the filtered column.
1962        let exprs = ProjectionExprs::new(vec![ProjectionExpr::new(
1963            col("c1", &file_schema).unwrap(),
1964            "c1",
1965        )]);
1966        let data_source = config
1967            .try_swapping_with_projection(&exprs)
1968            .unwrap()
1969            .unwrap();
1970
1971        // Gather the equivalence properties from the new data source. There should
1972        // be no equivalence class for column c2 since it was removed by the projection.
1973        let eq_properties = data_source.eq_properties();
1974        let eq_group = eq_properties.eq_group();
1975
1976        for class in eq_group.iter() {
1977            for expr in class.iter() {
1978                if let Some(col) = expr.downcast_ref::<Column>() {
1979                    assert_ne!(
1980                        col.name(),
1981                        "c2",
1982                        "c2 should not be present in any equivalence class"
1983                    );
1984                }
1985            }
1986        }
1987    }
1988
1989    #[test]
1990    fn test_file_scan_config_builder_defaults() {
1991        let file_schema = aggr_test_schema();
1992        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1993
1994        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
1995
1996        let file_source: Arc<dyn FileSource> =
1997            Arc::new(MockSource::new(table_schema.clone()));
1998
1999        // Create a builder with only required parameters and build without any additional configurations
2000        let config = FileScanConfigBuilder::new(
2001            object_store_url.clone(),
2002            Arc::clone(&file_source),
2003        )
2004        .build();
2005
2006        // Verify default values
2007        assert_eq!(config.object_store_url, object_store_url);
2008        assert_eq!(*config.file_schema(), file_schema);
2009        assert_eq!(config.limit, None);
2010        // When no projection is specified, the file source should have an unprojected projection
2011        // (i.e., all columns)
2012        let expected_projection: Vec<usize> = (0..file_schema.fields().len()).collect();
2013        assert_eq!(
2014            config
2015                .file_source
2016                .projection()
2017                .as_ref()
2018                .map(|p| p.column_indices()),
2019            Some(expected_projection)
2020        );
2021        assert!(config.table_partition_cols().is_empty());
2022        assert!(config.file_groups.is_empty());
2023        assert_eq!(
2024            config.file_compression_type,
2025            FileCompressionType::UNCOMPRESSED
2026        );
2027        assert!(config.output_ordering.is_empty());
2028        assert!(config.constraints.is_empty());
2029
2030        // Verify statistics are set to unknown
2031        assert_eq!(config.statistics().num_rows, Precision::Absent);
2032        assert_eq!(config.statistics().total_byte_size, Precision::Absent);
2033        assert_eq!(
2034            config.statistics().column_statistics.len(),
2035            file_schema.fields().len()
2036        );
2037        for stat in config.statistics().column_statistics {
2038            assert_eq!(stat.distinct_count, Precision::Absent);
2039            assert_eq!(stat.min_value, Precision::Absent);
2040            assert_eq!(stat.max_value, Precision::Absent);
2041            assert_eq!(stat.null_count, Precision::Absent);
2042        }
2043    }
2044
2045    #[test]
2046    fn test_file_scan_config_builder_new_from() {
2047        let schema = aggr_test_schema();
2048        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
2049        let partition_cols = vec![Field::new(
2050            "date",
2051            wrap_partition_type_in_dict(DataType::Utf8),
2052            false,
2053        )];
2054        let file = PartitionedFile::new("test_file.parquet", 100);
2055
2056        let table_schema = TableSchema::new(
2057            Arc::clone(&schema),
2058            partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
2059        );
2060
2061        let file_source: Arc<dyn FileSource> =
2062            Arc::new(MockSource::new(table_schema.clone()));
2063
2064        // Create a config with non-default values
2065        let original_config = FileScanConfigBuilder::new(
2066            object_store_url.clone(),
2067            Arc::clone(&file_source),
2068        )
2069        .with_projection_indices(Some(vec![0, 2]))
2070        .unwrap()
2071        .with_limit(Some(10))
2072        .with_file(file.clone())
2073        .with_constraints(Constraints::default())
2074        .build();
2075
2076        // Create a new builder from the config
2077        let new_builder = FileScanConfigBuilder::from(original_config);
2078
2079        // Build a new config from this builder
2080        let new_config = new_builder.build();
2081
2082        // Verify properties match
2083        let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
2084        assert_eq!(new_config.object_store_url, object_store_url);
2085        assert_eq!(*new_config.file_schema(), schema);
2086        assert_eq!(
2087            new_config
2088                .file_source
2089                .projection()
2090                .as_ref()
2091                .map(|p| p.column_indices()),
2092            Some(vec![0, 2])
2093        );
2094        assert_eq!(new_config.limit, Some(10));
2095        assert_eq!(*new_config.table_partition_cols(), partition_cols);
2096        assert_eq!(new_config.file_groups.len(), 1);
2097        assert_eq!(new_config.file_groups[0].len(), 1);
2098        assert_eq!(
2099            new_config.file_groups[0][0].object_meta.location.as_ref(),
2100            "test_file.parquet"
2101        );
2102        assert_eq!(new_config.constraints, Constraints::default());
2103    }
2104
2105    #[test]
2106    fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
2107        use datafusion_common::DFSchema;
2108        use datafusion_expr::{col, execution_props::ExecutionProps};
2109
2110        let schema = Arc::new(Schema::new(vec![Field::new(
2111            "value",
2112            DataType::Float64,
2113            false,
2114        )]));
2115
2116        // Setup sort expression
2117        let exec_props = ExecutionProps::new();
2118        let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
2119        let sort_expr = [col("value").sort(true, false)];
2120        let sort_ordering = sort_expr
2121            .map(|expr| {
2122                create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
2123            })
2124            .into();
2125
2126        // Test case parameters
2127        struct TestCase {
2128            name: String,
2129            file_count: usize,
2130            overlap_factor: f64,
2131            target_partitions: usize,
2132            expected_partition_count: usize,
2133        }
2134
2135        let test_cases = vec![
2136            // Basic cases
2137            TestCase {
2138                name: "no_overlap_10_files_4_partitions".to_string(),
2139                file_count: 10,
2140                overlap_factor: 0.0,
2141                target_partitions: 4,
2142                expected_partition_count: 4,
2143            },
2144            TestCase {
2145                name: "medium_overlap_20_files_5_partitions".to_string(),
2146                file_count: 20,
2147                overlap_factor: 0.5,
2148                target_partitions: 5,
2149                expected_partition_count: 5,
2150            },
2151            TestCase {
2152                name: "high_overlap_30_files_3_partitions".to_string(),
2153                file_count: 30,
2154                overlap_factor: 0.8,
2155                target_partitions: 3,
2156                expected_partition_count: 7,
2157            },
2158            // Edge cases
2159            TestCase {
2160                name: "fewer_files_than_partitions".to_string(),
2161                file_count: 3,
2162                overlap_factor: 0.0,
2163                target_partitions: 10,
2164                expected_partition_count: 3, // Should only create as many partitions as files
2165            },
2166            TestCase {
2167                name: "single_file".to_string(),
2168                file_count: 1,
2169                overlap_factor: 0.0,
2170                target_partitions: 5,
2171                expected_partition_count: 1, // Should create only one partition
2172            },
2173            TestCase {
2174                name: "empty_files".to_string(),
2175                file_count: 0,
2176                overlap_factor: 0.0,
2177                target_partitions: 3,
2178                expected_partition_count: 0, // Empty result for empty input
2179            },
2180        ];
2181
2182        for case in test_cases {
2183            println!("Running test case: {}", case.name);
2184
2185            // Generate files using bench utility function
2186            let file_groups = generate_test_files(case.file_count, case.overlap_factor);
2187
2188            // Call the function under test
2189            let result =
2190                FileScanConfig::split_groups_by_statistics_with_target_partitions(
2191                    &schema,
2192                    &file_groups,
2193                    &sort_ordering,
2194                    case.target_partitions,
2195                )?;
2196
2197            // Verify results
2198            println!(
2199                "Created {} partitions (target was {})",
2200                result.len(),
2201                case.target_partitions
2202            );
2203
2204            // Check partition count
2205            assert_eq!(
2206                result.len(),
2207                case.expected_partition_count,
2208                "Case '{}': Unexpected partition count",
2209                case.name
2210            );
2211
2212            // Verify sort integrity
2213            assert!(
2214                verify_sort_integrity(&result),
2215                "Case '{}': Files within partitions are not properly ordered",
2216                case.name
2217            );
2218
2219            // Distribution check for partitions
2220            if case.file_count > 1 && case.expected_partition_count > 1 {
2221                let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
2222                let max_size = *group_sizes.iter().max().unwrap();
2223                let min_size = *group_sizes.iter().min().unwrap();
2224
2225                // Check partition balancing - difference shouldn't be extreme
2226                let avg_files_per_partition =
2227                    case.file_count as f64 / case.expected_partition_count as f64;
2228                assert!(
2229                    (max_size as f64) < 2.0 * avg_files_per_partition,
2230                    "Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
2231                    case.name,
2232                    max_size,
2233                    avg_files_per_partition
2234                );
2235
2236                println!("Distribution - min files: {min_size}, max files: {max_size}");
2237            }
2238        }
2239
2240        // Test error case: zero target partitions
2241        let empty_groups: Vec<FileGroup> = vec![];
2242        let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
2243            &schema,
2244            &empty_groups,
2245            &sort_ordering,
2246            0,
2247        )
2248        .unwrap_err();
2249
2250        assert!(
2251            err.to_string()
2252                .contains("target_partitions must be greater than 0"),
2253            "Expected error for zero target partitions"
2254        );
2255
2256        Ok(())
2257    }
2258
2259    #[test]
2260    fn test_partition_statistics_projection() {
2261        // This test verifies that partition_statistics applies projection correctly.
2262        // The old implementation had a bug where it returned file group statistics
2263        // without applying the projection, returning all column statistics instead
2264        // of just the projected ones.
2265
2266        use crate::source::DataSourceExec;
2267        use datafusion_physical_plan::ExecutionPlan;
2268
2269        // Create a schema with 4 columns
2270        let schema = Arc::new(Schema::new(vec![
2271            Field::new("col0", DataType::Int32, false),
2272            Field::new("col1", DataType::Int32, false),
2273            Field::new("col2", DataType::Int32, false),
2274            Field::new("col3", DataType::Int32, false),
2275        ]));
2276
2277        // Create statistics for all 4 columns
2278        let file_group_stats = Statistics {
2279            num_rows: Precision::Exact(100),
2280            total_byte_size: Precision::Exact(1024),
2281            column_statistics: vec![
2282                ColumnStatistics {
2283                    null_count: Precision::Exact(0),
2284                    ..ColumnStatistics::new_unknown()
2285                },
2286                ColumnStatistics {
2287                    null_count: Precision::Exact(5),
2288                    ..ColumnStatistics::new_unknown()
2289                },
2290                ColumnStatistics {
2291                    null_count: Precision::Exact(10),
2292                    ..ColumnStatistics::new_unknown()
2293                },
2294                ColumnStatistics {
2295                    null_count: Precision::Exact(15),
2296                    ..ColumnStatistics::new_unknown()
2297                },
2298            ],
2299        };
2300
2301        // Create a file group with statistics
2302        let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
2303            .with_statistics(Arc::new(file_group_stats));
2304
2305        let table_schema = TableSchema::new(Arc::clone(&schema), vec![]);
2306
2307        // Create a FileScanConfig with projection: only keep columns 0 and 2
2308        let config = FileScanConfigBuilder::new(
2309            ObjectStoreUrl::parse("test:///").unwrap(),
2310            Arc::new(MockSource::new(table_schema.clone())),
2311        )
2312        .with_projection_indices(Some(vec![0, 2]))
2313        .unwrap() // Only project columns 0 and 2
2314        .with_file_groups(vec![file_group])
2315        .build();
2316
2317        // Create a DataSourceExec from the config
2318        let exec = DataSourceExec::from_data_source(config);
2319
2320        // Get statistics for partition 0
2321        let partition_stats = exec.partition_statistics(Some(0)).unwrap();
2322
2323        // Verify that only 2 columns are in the statistics (the projected ones)
2324        assert_eq!(
2325            partition_stats.column_statistics.len(),
2326            2,
2327            "Expected 2 column statistics (projected), but got {}",
2328            partition_stats.column_statistics.len()
2329        );
2330
2331        // Verify the column statistics are for columns 0 and 2
2332        assert_eq!(
2333            partition_stats.column_statistics[0].null_count,
2334            Precision::Exact(0),
2335            "First projected column should be col0 with 0 nulls"
2336        );
2337        assert_eq!(
2338            partition_stats.column_statistics[1].null_count,
2339            Precision::Exact(10),
2340            "Second projected column should be col2 with 10 nulls"
2341        );
2342
2343        // Verify row count and byte size
2344        assert_eq!(partition_stats.num_rows, Precision::Exact(100));
2345        assert_eq!(partition_stats.total_byte_size, Precision::Exact(800));
2346    }
2347
2348    /// Regression test for reusing a `DataSourceExec` after its execution-local
2349    /// shared work queue has been drained.
2350    ///
2351    /// This test uses a single file group with two files so the scan creates a
2352    /// shared unopened-file queue. Executing after `reset_state` must recreate
2353    /// the shared queue and return the same rows again.
2354    #[tokio::test]
2355    async fn reset_state_recreates_shared_work_source() -> Result<()> {
2356        let schema = Arc::new(Schema::new(vec![Field::new(
2357            "value",
2358            DataType::Int32,
2359            false,
2360        )]));
2361        let file_source = Arc::new(
2362            MockSource::new(Arc::clone(&schema))
2363                .with_file_opener(Arc::new(ResetStateTestFileOpener { schema })),
2364        );
2365
2366        let config =
2367            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2368                .with_file_group(FileGroup::new(vec![
2369                    PartitionedFile::new("file1.parquet", 100),
2370                    PartitionedFile::new("file2.parquet", 100),
2371                ]))
2372                .build();
2373
2374        let exec: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
2375        let task_ctx = Arc::new(TaskContext::default());
2376
2377        // Running the same scan after resetting the state, should
2378        // produce the same answer.
2379        let first_run = collect(Arc::clone(&exec), Arc::clone(&task_ctx)).await?;
2380        let reset_exec = exec.reset_state()?;
2381        let second_run = collect(reset_exec, task_ctx).await?;
2382
2383        let expected = [
2384            "+-------+",
2385            "| value |",
2386            "+-------+",
2387            "| 1     |",
2388            "| 2     |",
2389            "+-------+",
2390        ];
2391        assert_batches_eq!(expected, &first_run);
2392        assert_batches_eq!(expected, &second_run);
2393
2394        Ok(())
2395    }
2396
2397    /// Test-only `FileOpener` that turns file names like `file1.parquet` into a
2398    /// single-batch stream containing that numeric value
2399    #[derive(Debug)]
2400    struct ResetStateTestFileOpener {
2401        schema: SchemaRef,
2402    }
2403
2404    impl crate::file_stream::FileOpener for ResetStateTestFileOpener {
2405        fn open(
2406            &self,
2407            file: PartitionedFile,
2408        ) -> Result<crate::file_stream::FileOpenFuture> {
2409            let value = file
2410                .object_meta
2411                .location
2412                .as_ref()
2413                .trim_start_matches("file")
2414                .trim_end_matches(".parquet")
2415                .parse::<i32>()
2416                .expect("invalid test file name");
2417            let schema = Arc::clone(&self.schema);
2418            Ok(async move {
2419                let batch = RecordBatch::try_new(
2420                    schema,
2421                    vec![Arc::new(Int32Array::from(vec![value]))],
2422                )
2423                .expect("test batch should be valid");
2424                Ok(stream::iter(vec![Ok(batch)]).boxed())
2425            }
2426            .boxed())
2427        }
2428    }
2429
2430    #[test]
2431    fn test_output_partitioning_not_partitioned_by_file_group() {
2432        let file_schema = aggr_test_schema();
2433        let partition_col =
2434            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), false);
2435
2436        let config = config_for_projection(
2437            Arc::clone(&file_schema),
2438            None,
2439            Statistics::new_unknown(&file_schema),
2440            vec![partition_col],
2441        );
2442
2443        // partitioned_by_file_group defaults to false
2444        let partitioning = config.output_partitioning();
2445        assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
2446    }
2447
2448    #[test]
2449    fn test_output_partitioning_no_partition_columns() {
2450        let file_schema = aggr_test_schema();
2451        let mut config = config_for_projection(
2452            Arc::clone(&file_schema),
2453            None,
2454            Statistics::new_unknown(&file_schema),
2455            vec![], // No partition columns
2456        );
2457        config.partitioned_by_file_group = true;
2458
2459        let partitioning = config.output_partitioning();
2460        assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
2461    }
2462
2463    #[test]
2464    fn test_output_partitioning_with_partition_columns() {
2465        let file_schema = aggr_test_schema();
2466
2467        // Test single partition column
2468        let single_partition_col = vec![Field::new(
2469            "date",
2470            wrap_partition_type_in_dict(DataType::Utf8),
2471            false,
2472        )];
2473
2474        let mut config = config_for_projection(
2475            Arc::clone(&file_schema),
2476            None,
2477            Statistics::new_unknown(&file_schema),
2478            single_partition_col,
2479        );
2480        config.partitioned_by_file_group = true;
2481        config.file_groups = vec![
2482            FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
2483            FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
2484            FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]),
2485        ];
2486
2487        let partitioning = config.output_partitioning();
2488        match partitioning {
2489            Partitioning::Hash(exprs, num_partitions) => {
2490                assert_eq!(num_partitions, 3);
2491                assert_eq!(exprs.len(), 1);
2492                assert_eq!(exprs[0].downcast_ref::<Column>().unwrap().name(), "date");
2493            }
2494            _ => panic!("Expected Hash partitioning"),
2495        }
2496
2497        // Test multiple partition columns
2498        let multiple_partition_cols = vec![
2499            Field::new("year", wrap_partition_type_in_dict(DataType::Utf8), false),
2500            Field::new("month", wrap_partition_type_in_dict(DataType::Utf8), false),
2501        ];
2502
2503        config = config_for_projection(
2504            Arc::clone(&file_schema),
2505            None,
2506            Statistics::new_unknown(&file_schema),
2507            multiple_partition_cols,
2508        );
2509        config.partitioned_by_file_group = true;
2510        config.file_groups = vec![
2511            FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
2512            FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
2513        ];
2514
2515        let partitioning = config.output_partitioning();
2516        match partitioning {
2517            Partitioning::Hash(exprs, num_partitions) => {
2518                assert_eq!(num_partitions, 2);
2519                assert_eq!(exprs.len(), 2);
2520                let col_names: Vec<_> = exprs
2521                    .iter()
2522                    .map(|e| e.downcast_ref::<Column>().unwrap().name())
2523                    .collect();
2524                assert_eq!(col_names, vec!["year", "month"]);
2525            }
2526            _ => panic!("Expected Hash partitioning"),
2527        }
2528    }
2529
2530    #[test]
2531    fn try_pushdown_sort_reverses_file_groups_only_when_requested_is_reverse()
2532    -> Result<()> {
2533        let file_schema =
2534            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
2535
2536        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2537        let file_source = Arc::new(InexactSortPushdownSource::new(table_schema));
2538
2539        let file_groups = vec![FileGroup::new(vec![
2540            PartitionedFile::new("file1", 1),
2541            PartitionedFile::new("file2", 1),
2542        ])];
2543
2544        let sort_expr_asc = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2545        let config =
2546            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2547                .with_file_groups(file_groups)
2548                .with_output_ordering(vec![
2549                    LexOrdering::new(vec![sort_expr_asc.clone()]).unwrap(),
2550                ])
2551                .build();
2552
2553        let requested_asc = vec![sort_expr_asc.clone()];
2554        let result = config.try_pushdown_sort(&requested_asc)?;
2555        let SortOrderPushdownResult::Inexact { inner } = result else {
2556            panic!("Expected Inexact result");
2557        };
2558        let pushed_config = inner
2559            .downcast_ref::<FileScanConfig>()
2560            .expect("Expected FileScanConfig");
2561        let pushed_files = pushed_config.file_groups[0].files();
2562        assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file1");
2563        assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file2");
2564
2565        let requested_desc = vec![sort_expr_asc.reverse()];
2566        let result = config.try_pushdown_sort(&requested_desc)?;
2567        let SortOrderPushdownResult::Inexact { inner } = result else {
2568            panic!("Expected Inexact result");
2569        };
2570        let pushed_config = inner
2571            .downcast_ref::<FileScanConfig>()
2572            .expect("Expected FileScanConfig");
2573        let pushed_files = pushed_config.file_groups[0].files();
2574        assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file2");
2575        assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file1");
2576
2577        Ok(())
2578    }
2579
2580    fn make_file_with_stats(name: &str, min: f64, max: f64) -> PartitionedFile {
2581        PartitionedFile::new(name.to_string(), 1024).with_statistics(Arc::new(
2582            Statistics {
2583                num_rows: Precision::Exact(100),
2584                total_byte_size: Precision::Exact(1024),
2585                column_statistics: vec![ColumnStatistics {
2586                    null_count: Precision::Exact(0),
2587                    min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
2588                    max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
2589                    ..Default::default()
2590                }],
2591            },
2592        ))
2593    }
2594
2595    #[derive(Clone)]
2596    struct ExactSortPushdownSource {
2597        metrics: ExecutionPlanMetricsSet,
2598        table_schema: TableSchema,
2599    }
2600
2601    impl ExactSortPushdownSource {
2602        fn new(table_schema: TableSchema) -> Self {
2603            Self {
2604                metrics: ExecutionPlanMetricsSet::new(),
2605                table_schema,
2606            }
2607        }
2608    }
2609
2610    impl FileSource for ExactSortPushdownSource {
2611        fn create_file_opener(
2612            &self,
2613            _object_store: Arc<dyn ObjectStore>,
2614            _base_config: &FileScanConfig,
2615            _partition: usize,
2616        ) -> Result<Arc<dyn crate::file_stream::FileOpener>> {
2617            unimplemented!()
2618        }
2619
2620        fn table_schema(&self) -> &TableSchema {
2621            &self.table_schema
2622        }
2623
2624        fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
2625            Arc::new(self.clone())
2626        }
2627
2628        fn metrics(&self) -> &ExecutionPlanMetricsSet {
2629            &self.metrics
2630        }
2631
2632        fn file_type(&self) -> &str {
2633            "mock_exact"
2634        }
2635
2636        fn try_pushdown_sort(
2637            &self,
2638            _order: &[PhysicalSortExpr],
2639            _eq_properties: &EquivalenceProperties,
2640        ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
2641            Ok(SortOrderPushdownResult::Exact {
2642                inner: Arc::new(self.clone()) as Arc<dyn FileSource>,
2643            })
2644        }
2645    }
2646
2647    #[test]
2648    fn sort_pushdown_unsupported_source_files_get_sorted() -> Result<()> {
2649        let file_schema =
2650            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2651        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2652        let file_source = Arc::new(MockSource::new(table_schema));
2653
2654        let file_groups = vec![FileGroup::new(vec![
2655            make_file_with_stats("file3", 20.0, 30.0),
2656            make_file_with_stats("file1", 0.0, 9.0),
2657            make_file_with_stats("file2", 10.0, 19.0),
2658        ])];
2659
2660        let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2661        let config =
2662            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2663                .with_file_groups(file_groups)
2664                .build();
2665
2666        let result = config.try_pushdown_sort(&[sort_expr])?;
2667        let SortOrderPushdownResult::Inexact { inner } = result else {
2668            panic!("Expected Inexact result, got {result:?}");
2669        };
2670        let pushed_config = inner
2671            .downcast_ref::<FileScanConfig>()
2672            .expect("Expected FileScanConfig");
2673        let files = pushed_config.file_groups[0].files();
2674        assert_eq!(files[0].object_meta.location.as_ref(), "file1");
2675        assert_eq!(files[1].object_meta.location.as_ref(), "file2");
2676        assert_eq!(files[2].object_meta.location.as_ref(), "file3");
2677        assert!(pushed_config.output_ordering.is_empty());
2678        Ok(())
2679    }
2680
2681    #[test]
2682    fn sort_pushdown_unsupported_source_already_sorted() -> Result<()> {
2683        let file_schema =
2684            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2685        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2686        let file_source = Arc::new(MockSource::new(table_schema));
2687
2688        let file_groups = vec![FileGroup::new(vec![
2689            make_file_with_stats("file1", 0.0, 9.0),
2690            make_file_with_stats("file2", 10.0, 19.0),
2691            make_file_with_stats("file3", 20.0, 30.0),
2692        ])];
2693
2694        let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2695        let config =
2696            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2697                .with_file_groups(file_groups)
2698                .build();
2699
2700        let result = config.try_pushdown_sort(&[sort_expr])?;
2701        assert!(matches!(result, SortOrderPushdownResult::Unsupported));
2702        Ok(())
2703    }
2704
2705    #[test]
2706    fn sort_pushdown_unsupported_source_descending_sort() -> Result<()> {
2707        let file_schema =
2708            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2709        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2710        let file_source = Arc::new(MockSource::new(table_schema));
2711
2712        let file_groups = vec![FileGroup::new(vec![
2713            make_file_with_stats("file1", 0.0, 9.0),
2714            make_file_with_stats("file3", 20.0, 30.0),
2715            make_file_with_stats("file2", 10.0, 19.0),
2716        ])];
2717
2718        let sort_expr = PhysicalSortExpr::new(
2719            Arc::new(Column::new("a", 0)),
2720            arrow::compute::SortOptions {
2721                descending: true,
2722                nulls_first: true,
2723            },
2724        );
2725        let config =
2726            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2727                .with_file_groups(file_groups)
2728                .build();
2729
2730        let result = config.try_pushdown_sort(&[sort_expr])?;
2731        let SortOrderPushdownResult::Inexact { inner } = result else {
2732            panic!("Expected Inexact result");
2733        };
2734        let pushed_config = inner
2735            .downcast_ref::<FileScanConfig>()
2736            .expect("Expected FileScanConfig");
2737        let files = pushed_config.file_groups[0].files();
2738        assert_eq!(files[0].object_meta.location.as_ref(), "file3");
2739        assert_eq!(files[1].object_meta.location.as_ref(), "file2");
2740        assert_eq!(files[2].object_meta.location.as_ref(), "file1");
2741        Ok(())
2742    }
2743
2744    #[test]
2745    fn sort_pushdown_exact_source_non_overlapping_returns_exact() -> Result<()> {
2746        let file_schema =
2747            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2748        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2749        let file_source = Arc::new(ExactSortPushdownSource::new(table_schema));
2750
2751        let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2752
2753        let file_groups = vec![FileGroup::new(vec![
2754            make_file_with_stats("file1", 0.0, 9.0),
2755            make_file_with_stats("file2", 10.0, 19.0),
2756            make_file_with_stats("file3", 20.0, 30.0),
2757        ])];
2758
2759        let config =
2760            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2761                .with_file_groups(file_groups)
2762                .with_output_ordering(vec![
2763                    LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
2764                ])
2765                .build();
2766
2767        let result = config.try_pushdown_sort(&[sort_expr])?;
2768        let SortOrderPushdownResult::Exact { inner } = result else {
2769            panic!("Expected Exact result, got {result:?}");
2770        };
2771        let pushed_config = inner
2772            .downcast_ref::<FileScanConfig>()
2773            .expect("Expected FileScanConfig");
2774        assert!(!pushed_config.output_ordering.is_empty());
2775        Ok(())
2776    }
2777
2778    #[test]
2779    fn sort_pushdown_exact_source_overlapping_downgraded_to_inexact() -> Result<()> {
2780        let file_schema =
2781            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2782        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2783        let file_source = Arc::new(ExactSortPushdownSource::new(table_schema));
2784
2785        let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2786
2787        let file_groups = vec![FileGroup::new(vec![
2788            make_file_with_stats("file1", 0.0, 15.0),
2789            make_file_with_stats("file2", 10.0, 25.0),
2790            make_file_with_stats("file3", 20.0, 30.0),
2791        ])];
2792
2793        let config =
2794            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2795                .with_file_groups(file_groups)
2796                .with_output_ordering(vec![
2797                    LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
2798                ])
2799                .build();
2800
2801        let result = config.try_pushdown_sort(&[sort_expr])?;
2802        let SortOrderPushdownResult::Inexact { inner } = result else {
2803            panic!("Expected Inexact (downgraded), got {result:?}");
2804        };
2805        let pushed_config = inner
2806            .downcast_ref::<FileScanConfig>()
2807            .expect("Expected FileScanConfig");
2808        assert!(pushed_config.output_ordering.is_empty());
2809        Ok(())
2810    }
2811
2812    #[test]
2813    fn sort_pushdown_exact_source_out_of_order_returns_exact() -> Result<()> {
2814        let file_schema =
2815            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2816        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2817        let file_source = Arc::new(ExactSortPushdownSource::new(table_schema));
2818
2819        let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2820
2821        let file_groups = vec![FileGroup::new(vec![
2822            make_file_with_stats("file3", 20.0, 30.0),
2823            make_file_with_stats("file1", 0.0, 9.0),
2824            make_file_with_stats("file2", 10.0, 19.0),
2825        ])];
2826
2827        let config =
2828            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2829                .with_file_groups(file_groups)
2830                .with_output_ordering(vec![
2831                    LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
2832                ])
2833                .build();
2834
2835        let result = config.try_pushdown_sort(&[sort_expr])?;
2836        let SortOrderPushdownResult::Exact { inner } = result else {
2837            panic!("Expected Exact result, got {result:?}");
2838        };
2839        let pushed_config = inner
2840            .downcast_ref::<FileScanConfig>()
2841            .expect("Expected FileScanConfig");
2842        let files = pushed_config.file_groups[0].files();
2843        assert_eq!(files[0].object_meta.location.as_ref(), "file1");
2844        assert_eq!(files[1].object_meta.location.as_ref(), "file2");
2845        assert_eq!(files[2].object_meta.location.as_ref(), "file3");
2846        assert!(!pushed_config.output_ordering.is_empty());
2847        Ok(())
2848    }
2849
2850    #[test]
2851    fn sort_pushdown_unsupported_source_single_file_groups() -> Result<()> {
2852        let file_schema =
2853            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2854        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2855        let file_source = Arc::new(MockSource::new(table_schema));
2856
2857        let file_groups = vec![
2858            FileGroup::new(vec![make_file_with_stats("file1", 0.0, 9.0)]),
2859            FileGroup::new(vec![make_file_with_stats("file2", 10.0, 19.0)]),
2860        ];
2861
2862        let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2863        let config =
2864            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2865                .with_file_groups(file_groups)
2866                .build();
2867
2868        let result = config.try_pushdown_sort(&[sort_expr])?;
2869        assert!(
2870            matches!(result, SortOrderPushdownResult::Unsupported),
2871            "Expected Unsupported for single-file groups"
2872        );
2873        Ok(())
2874    }
2875
2876    #[test]
2877    fn sort_pushdown_unsupported_source_multiple_groups() -> Result<()> {
2878        let file_schema =
2879            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2880        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2881        let file_source = Arc::new(MockSource::new(table_schema));
2882
2883        let file_groups = vec![
2884            FileGroup::new(vec![
2885                make_file_with_stats("file_b", 10.0, 19.0),
2886                make_file_with_stats("file_a", 0.0, 9.0),
2887            ]),
2888            FileGroup::new(vec![
2889                make_file_with_stats("file_d", 30.0, 39.0),
2890                make_file_with_stats("file_c", 20.0, 29.0),
2891            ]),
2892        ];
2893
2894        let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2895        let config =
2896            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2897                .with_file_groups(file_groups)
2898                .build();
2899
2900        let result = config.try_pushdown_sort(&[sort_expr])?;
2901        let SortOrderPushdownResult::Inexact { inner } = result else {
2902            panic!("Expected Inexact result");
2903        };
2904        let pushed_config = inner
2905            .downcast_ref::<FileScanConfig>()
2906            .expect("Expected FileScanConfig");
2907        let files0 = pushed_config.file_groups[0].files();
2908        assert_eq!(files0[0].object_meta.location.as_ref(), "file_a");
2909        assert_eq!(files0[1].object_meta.location.as_ref(), "file_b");
2910        let files1 = pushed_config.file_groups[1].files();
2911        assert_eq!(files1[0].object_meta.location.as_ref(), "file_c");
2912        assert_eq!(files1[1].object_meta.location.as_ref(), "file_d");
2913        Ok(())
2914    }
2915
2916    #[test]
2917    fn sort_pushdown_unsupported_source_partial_statistics() -> Result<()> {
2918        let file_schema =
2919            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2920        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2921        let file_source = Arc::new(MockSource::new(table_schema));
2922
2923        let file_groups = vec![
2924            FileGroup::new(vec![
2925                make_file_with_stats("file_b", 10.0, 19.0),
2926                make_file_with_stats("file_a", 0.0, 9.0),
2927            ]),
2928            FileGroup::new(vec![
2929                PartitionedFile::new("file_d".to_string(), 1024),
2930                PartitionedFile::new("file_c".to_string(), 1024),
2931            ]),
2932        ];
2933
2934        let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2935        let config =
2936            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2937                .with_file_groups(file_groups)
2938                .build();
2939
2940        let result = config.try_pushdown_sort(&[sort_expr])?;
2941        let SortOrderPushdownResult::Inexact { inner } = result else {
2942            panic!("Expected Inexact result");
2943        };
2944        let pushed_config = inner
2945            .downcast_ref::<FileScanConfig>()
2946            .expect("Expected FileScanConfig");
2947        let files0 = pushed_config.file_groups[0].files();
2948        assert_eq!(files0[0].object_meta.location.as_ref(), "file_a");
2949        assert_eq!(files0[1].object_meta.location.as_ref(), "file_b");
2950        let files1 = pushed_config.file_groups[1].files();
2951        assert_eq!(files1[0].object_meta.location.as_ref(), "file_d");
2952        assert_eq!(files1[1].object_meta.location.as_ref(), "file_c");
2953        Ok(())
2954    }
2955
2956    #[test]
2957    fn sort_pushdown_inexact_source_with_statistics_sorting() -> Result<()> {
2958        let file_schema =
2959            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2960        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2961        let file_source = Arc::new(InexactSortPushdownSource::new(table_schema));
2962
2963        let file_groups = vec![FileGroup::new(vec![
2964            make_file_with_stats("file2", 10.0, 19.0),
2965            make_file_with_stats("file1", 0.0, 9.0),
2966        ])];
2967
2968        let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
2969        let config =
2970            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
2971                .with_file_groups(file_groups)
2972                .build();
2973
2974        let result = config.try_pushdown_sort(&[sort_expr])?;
2975        let SortOrderPushdownResult::Inexact { inner } = result else {
2976            panic!("Expected Inexact result");
2977        };
2978        let pushed_config = inner
2979            .downcast_ref::<FileScanConfig>()
2980            .expect("Expected FileScanConfig");
2981        let files = pushed_config.file_groups[0].files();
2982        assert_eq!(files[0].object_meta.location.as_ref(), "file1");
2983        assert_eq!(files[1].object_meta.location.as_ref(), "file2");
2984        assert!(pushed_config.output_ordering.is_empty());
2985        Ok(())
2986    }
2987
2988    #[test]
2989    fn sort_pushdown_exact_multi_group_preserves_parallelism() -> Result<()> {
2990        // ExactSortPushdownSource + 4 non-overlapping files in 2 interleaved groups.
2991        // Groups should NOT be redistributed — interleaved groups allow SPM to
2992        // pull from both partitions concurrently, keeping parallel I/O active.
2993        // Redistributing consecutively would make SPM read one partition at a
2994        // time (all values in group 0 < group 1), degrading to single-threaded I/O.
2995        let file_schema =
2996            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
2997        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
2998        let file_source = Arc::new(ExactSortPushdownSource::new(table_schema));
2999
3000        let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
3001
3002        // 2 groups with interleaved ranges (simulating bin-packing result):
3003        // Group 0: [file_01(0-9), file_03(20-29)]
3004        // Group 1: [file_02(10-19), file_04(30-39)]
3005        let file_groups = vec![
3006            FileGroup::new(vec![
3007                make_file_with_stats("file_01", 0.0, 9.0),
3008                make_file_with_stats("file_03", 20.0, 29.0),
3009            ]),
3010            FileGroup::new(vec![
3011                make_file_with_stats("file_02", 10.0, 19.0),
3012                make_file_with_stats("file_04", 30.0, 39.0),
3013            ]),
3014        ];
3015
3016        let config =
3017            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
3018                .with_file_groups(file_groups)
3019                .with_output_ordering(vec![
3020                    LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
3021                ])
3022                .build();
3023
3024        let result = config.try_pushdown_sort(&[sort_expr])?;
3025        let SortOrderPushdownResult::Exact { inner } = result else {
3026            panic!("Expected Exact result, got {result:?}");
3027        };
3028        let pushed_config = inner
3029            .downcast_ref::<FileScanConfig>()
3030            .expect("Expected FileScanConfig");
3031
3032        // 2 groups preserved (parallelism maintained)
3033        assert_eq!(pushed_config.file_groups.len(), 2);
3034
3035        // Files within each group are sorted by stats, but groups are NOT
3036        // redistributed — interleaved assignment from bin-packing is kept
3037        let files0 = pushed_config.file_groups[0].files();
3038        assert_eq!(files0[0].object_meta.location.as_ref(), "file_01");
3039        assert_eq!(files0[1].object_meta.location.as_ref(), "file_03");
3040        let files1 = pushed_config.file_groups[1].files();
3041        assert_eq!(files1[0].object_meta.location.as_ref(), "file_02");
3042        assert_eq!(files1[1].object_meta.location.as_ref(), "file_04");
3043
3044        // output_ordering preserved (Exact, each group internally non-overlapping)
3045        assert!(!pushed_config.output_ordering.is_empty());
3046        Ok(())
3047    }
3048
3049    #[test]
3050    fn sort_pushdown_reverse_preserves_file_order_with_stats() -> Result<()> {
3051        // Reverse scan should reverse file order but NOT apply statistics-based
3052        // sorting (which would undo the reversal). The result is Inexact.
3053        let file_schema =
3054            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, false)]));
3055        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
3056        let file_source = Arc::new(InexactSortPushdownSource::new(table_schema));
3057
3058        let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
3059
3060        // Files with stats, in ASC order. Output ordering is [a ASC].
3061        let file_groups = vec![FileGroup::new(vec![
3062            make_file_with_stats("file1", 0.0, 9.0),
3063            make_file_with_stats("file2", 10.0, 19.0),
3064            make_file_with_stats("file3", 20.0, 30.0),
3065        ])];
3066
3067        let config =
3068            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
3069                .with_file_groups(file_groups)
3070                .with_output_ordering(vec![
3071                    LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
3072                ])
3073                .build();
3074
3075        // Request DESC → reverse path
3076        let result = config.try_pushdown_sort(&[sort_expr.reverse()])?;
3077        let SortOrderPushdownResult::Inexact { inner } = result else {
3078            panic!("Expected Inexact for reverse scan, got {result:?}");
3079        };
3080        let pushed_config = inner
3081            .downcast_ref::<FileScanConfig>()
3082            .expect("Expected FileScanConfig");
3083
3084        // Files should be reversed (not re-sorted by stats)
3085        let files = pushed_config.file_groups[0].files();
3086        assert_eq!(files[0].object_meta.location.as_ref(), "file3");
3087        assert_eq!(files[1].object_meta.location.as_ref(), "file2");
3088        assert_eq!(files[2].object_meta.location.as_ref(), "file1");
3089
3090        // output_ordering cleared (Inexact)
3091        assert!(pushed_config.output_ordering.is_empty());
3092        Ok(())
3093    }
3094
3095    /// Helper: create a PartitionedFile with stats including null count
3096    fn make_file_with_null_stats(
3097        name: &str,
3098        min: f64,
3099        max: f64,
3100        null_count: usize,
3101    ) -> PartitionedFile {
3102        PartitionedFile::new(name.to_string(), 1024).with_statistics(Arc::new(
3103            Statistics {
3104                num_rows: Precision::Exact(100),
3105                total_byte_size: Precision::Exact(1024),
3106                column_statistics: vec![ColumnStatistics {
3107                    null_count: Precision::Exact(null_count),
3108                    min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
3109                    max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
3110                    ..Default::default()
3111                }],
3112            },
3113        ))
3114    }
3115
3116    #[test]
3117    fn sort_pushdown_unsupported_with_nulls_does_not_upgrade_to_exact() -> Result<()> {
3118        // Files are non-overlapping but one has NULLs.
3119        // Should NOT upgrade to Exact — NULLs would appear in wrong position.
3120        let file_schema =
3121            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)]));
3122        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
3123        let file_source = Arc::new(MockSource::new(table_schema));
3124
3125        let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
3126
3127        // Files in wrong order (high min first) to trigger reordering
3128        let file_groups = vec![FileGroup::new(vec![
3129            make_file_with_null_stats("b_no_nulls", 10.0, 19.0, 0),
3130            make_file_with_null_stats("a_with_nulls", 0.0, 9.0, 5), // has NULLs
3131        ])];
3132
3133        let config =
3134            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
3135                .with_file_groups(file_groups)
3136                .with_output_ordering(vec![
3137                    LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
3138                ])
3139                .build();
3140
3141        let result = config.try_pushdown_sort(&[sort_expr])?;
3142        // Should be Inexact (not Exact) because of NULLs
3143        assert!(
3144            matches!(result, SortOrderPushdownResult::Inexact { .. }),
3145            "Expected Inexact due to NULLs, got {result:?}"
3146        );
3147        Ok(())
3148    }
3149
3150    #[test]
3151    fn sort_pushdown_unsupported_no_nulls_upgrades_to_exact() -> Result<()> {
3152        // Files are non-overlapping, no NULLs → should upgrade to Exact
3153        let file_schema =
3154            Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)]));
3155        let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
3156        let file_source = Arc::new(MockSource::new(table_schema));
3157
3158        let sort_expr = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
3159
3160        let file_groups = vec![FileGroup::new(vec![
3161            make_file_with_null_stats("b_high", 10.0, 19.0, 0),
3162            make_file_with_null_stats("a_low", 0.0, 9.0, 0),
3163        ])];
3164
3165        let config =
3166            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
3167                .with_file_groups(file_groups)
3168                .with_output_ordering(vec![
3169                    LexOrdering::new(vec![sort_expr.clone()]).unwrap(),
3170                ])
3171                .build();
3172
3173        let result = config.try_pushdown_sort(&[sort_expr])?;
3174        assert!(
3175            matches!(result, SortOrderPushdownResult::Exact { .. }),
3176            "Expected Exact (no NULLs), got {result:?}"
3177        );
3178        Ok(())
3179    }
3180}