Skip to main content

datafusion_datasource/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#![doc(
19    html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
20    html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
21)]
22#![cfg_attr(docsrs, feature(doc_cfg))]
23// Make sure fast / cheap clones on Arc are explicit:
24// https://github.com/apache/datafusion/issues/11143
25#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
26#![cfg_attr(test, allow(clippy::needless_pass_by_value))]
27
28//! A table that uses the `ObjectStore` listing capability
29//! to get the list of files to process.
30
31pub mod decoder;
32pub mod display;
33pub mod file;
34pub mod file_compression_type;
35pub mod file_format;
36pub mod file_groups;
37pub mod file_scan_config;
38pub mod file_sink_config;
39pub mod file_stream;
40pub mod memory;
41pub mod projection;
42pub mod schema_adapter;
43pub mod sink;
44pub mod source;
45mod statistics;
46pub mod table_schema;
47
48#[cfg(test)]
49pub mod test_util;
50
51pub mod url;
52pub mod write;
53pub use self::file::as_file_source;
54pub use self::url::ListingTableUrl;
55use crate::file_groups::FileGroup;
56use chrono::TimeZone;
57use datafusion_common::stats::Precision;
58use datafusion_common::{ColumnStatistics, Result, exec_datafusion_err};
59use datafusion_common::{ScalarValue, Statistics};
60use datafusion_physical_expr::LexOrdering;
61use futures::{Stream, StreamExt};
62use object_store::{GetOptions, GetRange, ObjectStore};
63use object_store::{ObjectMeta, path::Path};
64pub use table_schema::TableSchema;
// Remove when add_row_stats is removed
66#[expect(deprecated)]
67pub use statistics::add_row_stats;
68pub use statistics::compute_all_files_statistics;
69use std::ops::Range;
70use std::pin::Pin;
71use std::sync::Arc;
72
/// Stream of [`PartitionedFile`]s listed from an object store.
///
/// Each item is a `Result`, so a listing failure is reported in-band as an `Err` item.
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
76
/// A half-open byte interval `[start, end)` inside a file.
///
/// For Parquet, only the Row Groups whose data "midpoint" falls inside the interval
/// are scanned, which allows non-overlapping sections of one file to be read in
/// parallel.
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// Inclusive lower bound of the interval
    pub start: i64,
    /// Exclusive upper bound of the interval
    pub end: i64,
}

impl FileRange {
    /// Reports whether `offset` lies inside `[start, end)`.
    ///
    /// An empty or inverted interval (`start >= end`) contains no offset at all.
    pub fn contains(&self, offset: i64) -> bool {
        (self.start..self.end).contains(&offset)
    }
}
94
#[derive(Debug, Clone)]
/// A single file or part of a file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
///
/// # Statistics
///
/// The [`Self::statistics`] field contains statistics for the **full table schema**,
/// which includes both file columns and partition columns. When statistics are set via
/// [`Self::with_statistics`], exact statistics for partition columns are automatically
/// computed from [`Self::partition_values`]:
///
/// - `min = max = partition_value` (all rows in a file share the same partition value)
/// - `null_count = 0` (partition values extracted from paths are never null)
/// - `distinct_count = 1` (single distinct value per file for each partition column)
///
/// This enables query optimizers to use partition column bounds for pruning and planning.
pub struct PartitionedFile {
    /// Object store metadata for the file, including its location
    /// (e.g. URL, filesystem path, etc) and size
    pub object_meta: ObjectMeta,
    /// Values of partition columns to be appended to each row.
    ///
    /// These MUST have the same count, order, and type as the [`table_partition_cols`].
    ///
    /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used
    /// [`wrap_partition_type_in_dict`] to wrap the column type.
    ///
    /// [`wrap_partition_type_in_dict`]: crate::file_scan_config::wrap_partition_type_in_dict
    /// [`wrap_partition_value_in_dict`]: crate::file_scan_config::wrap_partition_value_in_dict
    /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L87
    pub partition_values: Vec<ScalarValue>,
    /// An optional file range for a more fine-grained parallel execution.
    ///
    /// `None` means the whole file is scanned.
    pub range: Option<FileRange>,
    /// Optional statistics that describe the data in this file if known.
    ///
    /// DataFusion relies on these statistics for planning (in particular to sort file groups),
    /// so if they are incorrect, incorrect answers may result.
    ///
    /// These statistics cover the full table schema: file columns plus partition columns.
    /// When set via [`Self::with_statistics`], partition column statistics are automatically
    /// computed from [`Self::partition_values`] with exact min/max/null_count/distinct_count.
    pub statistics: Option<Arc<Statistics>>,
    /// The known lexicographical ordering of the rows in this file, if any.
    ///
    /// This describes how the data within the file is sorted with respect to one or more
    /// columns, and is used by the optimizer for planning operations that depend on input
    /// ordering (e.g. merges, sorts, and certain aggregations).
    ///
    /// When available, this is typically inferred from file-level metadata exposed by the
    /// underlying format (for example, Parquet `sorting_columns`), but it may also be set
    /// explicitly via [`Self::with_ordering`].
    pub ordering: Option<LexOrdering>,
    /// An optional field for user defined per object metadata
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// The estimated size of the parquet metadata, in bytes.
    ///
    /// See [`Self::with_metadata_size_hint`] for how readers use this value.
    pub metadata_size_hint: Option<usize>,
}
151
152impl PartitionedFile {
153    /// Create a simple file without metadata or partition
154    pub fn new(path: impl Into<String>, size: u64) -> Self {
155        Self {
156            object_meta: ObjectMeta {
157                location: Path::from(path.into()),
158                last_modified: chrono::Utc.timestamp_nanos(0),
159                size,
160                e_tag: None,
161                version: None,
162            },
163            partition_values: vec![],
164            range: None,
165            statistics: None,
166            ordering: None,
167            extensions: None,
168            metadata_size_hint: None,
169        }
170    }
171
172    /// Create a file from a known ObjectMeta without partition
173    pub fn new_from_meta(object_meta: ObjectMeta) -> Self {
174        Self {
175            object_meta,
176            partition_values: vec![],
177            range: None,
178            statistics: None,
179            ordering: None,
180            extensions: None,
181            metadata_size_hint: None,
182        }
183    }
184
185    /// Create a file range without metadata or partition
186    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
187        Self {
188            object_meta: ObjectMeta {
189                location: Path::from(path),
190                last_modified: chrono::Utc.timestamp_nanos(0),
191                size,
192                e_tag: None,
193                version: None,
194            },
195            partition_values: vec![],
196            range: Some(FileRange { start, end }),
197            statistics: None,
198            ordering: None,
199            extensions: None,
200            metadata_size_hint: None,
201        }
202        .with_range(start, end)
203    }
204
205    /// Attach partition values to this file.
206    /// This replaces any existing partition values.
207    pub fn with_partition_values(mut self, partition_values: Vec<ScalarValue>) -> Self {
208        self.partition_values = partition_values;
209        self
210    }
211
212    /// Size of the file to be scanned (taking into account the range, if present).
213    pub fn effective_size(&self) -> u64 {
214        if let Some(range) = &self.range {
215            (range.end - range.start) as u64
216        } else {
217            self.object_meta.size
218        }
219    }
220
221    /// Effective range of the file to be scanned.
222    pub fn range(&self) -> (u64, u64) {
223        if let Some(range) = &self.range {
224            (range.start as u64, range.end as u64)
225        } else {
226            (0, self.object_meta.size)
227        }
228    }
229
230    /// Provide a hint to the size of the file metadata. If a hint is provided
231    /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically.
232    /// Without an appropriate hint, two read may be required to fetch the metadata.
233    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
234        self.metadata_size_hint = Some(metadata_size_hint);
235        self
236    }
237
238    /// Return a file reference from the given path
239    pub fn from_path(path: String) -> Result<Self> {
240        let size = std::fs::metadata(path.clone())?.len();
241        Ok(Self::new(path, size))
242    }
243
244    /// Return the path of this partitioned file
245    pub fn path(&self) -> &Path {
246        &self.object_meta.location
247    }
248
249    /// Update the file to only scan the specified range (in bytes)
250    pub fn with_range(mut self, start: i64, end: i64) -> Self {
251        self.range = Some(FileRange { start, end });
252        self
253    }
254
255    /// Update the user defined extensions for this file.
256    ///
257    /// This can be used to pass reader specific information.
258    pub fn with_extensions(
259        mut self,
260        extensions: Arc<dyn std::any::Any + Send + Sync>,
261    ) -> Self {
262        self.extensions = Some(extensions);
263        self
264    }
265
266    /// Update the statistics for this file.
267    ///
268    /// The provided `statistics` should cover only the file schema columns.
269    /// This method will automatically append exact statistics for partition columns
270    /// based on `partition_values`:
271    /// - `min = max = partition_value` (all rows have the same value)
272    /// - `null_count = 0` (partition values from paths are never null)
273    /// - `distinct_count = 1` (all rows have the same partition value)
274    pub fn with_statistics(mut self, file_statistics: Arc<Statistics>) -> Self {
275        if self.partition_values.is_empty() {
276            // No partition columns, use stats as-is
277            self.statistics = Some(file_statistics);
278        } else {
279            // Extend stats with exact partition column statistics
280            let mut stats = Arc::unwrap_or_clone(file_statistics);
281            for partition_value in &self.partition_values {
282                let col_stats = ColumnStatistics {
283                    null_count: Precision::Exact(0),
284                    max_value: Precision::Exact(partition_value.clone()),
285                    min_value: Precision::Exact(partition_value.clone()),
286                    distinct_count: Precision::Exact(1),
287                    sum_value: Precision::Absent,
288                    byte_size: partition_value
289                        .data_type()
290                        .primitive_width()
291                        .map(|w| stats.num_rows.multiply(&Precision::Exact(w)))
292                        .unwrap_or_else(|| Precision::Absent),
293                };
294                stats.column_statistics.push(col_stats);
295            }
296            self.statistics = Some(Arc::new(stats));
297        }
298        self
299    }
300
301    /// Check if this file has any statistics.
302    /// This returns `true` if the file has any Exact or Inexact statistics
303    /// and `false` if all statistics are `Precision::Absent`.
304    pub fn has_statistics(&self) -> bool {
305        if let Some(stats) = &self.statistics {
306            stats.column_statistics.iter().any(|col_stats| {
307                col_stats.null_count != Precision::Absent
308                    || col_stats.max_value != Precision::Absent
309                    || col_stats.min_value != Precision::Absent
310                    || col_stats.sum_value != Precision::Absent
311                    || col_stats.distinct_count != Precision::Absent
312            })
313        } else {
314            false
315        }
316    }
317
318    /// Set the known ordering of data in this file.
319    ///
320    /// The ordering represents the lexicographical sort order of the data,
321    /// typically inferred from file metadata (e.g., Parquet sorting_columns).
322    pub fn with_ordering(mut self, ordering: Option<LexOrdering>) -> Self {
323        self.ordering = ordering;
324        self
325    }
326}
327
328impl From<ObjectMeta> for PartitionedFile {
329    fn from(object_meta: ObjectMeta) -> Self {
330        PartitionedFile {
331            object_meta,
332            partition_values: vec![],
333            range: None,
334            statistics: None,
335            ordering: None,
336            extensions: None,
337            metadata_size_hint: None,
338        }
339    }
340}
341
/// Represents the possible outcomes of a range calculation.
///
/// This enum is used to encapsulate the result of calculating the range of
/// bytes to read from an object (like a file) in an object store.
///
/// Variants:
/// - `Range(Option<Range<u64>>)`:
///   Represents a range of bytes to be read. It contains an `Option` wrapping a
///   `Range<u64>`. `None` signifies that the entire object should be read,
///   while `Some(range)` specifies the exact byte range to read.
/// - `TerminateEarly`:
///   Indicates that the range calculation determined no further action is
///   necessary, possibly because the calculated range is empty or invalid.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RangeCalculation {
    /// Read the given byte range, or the entire object when `None`.
    Range(Option<Range<u64>>),
    /// Nothing needs to be read for this file range.
    TerminateEarly,
}
359
360/// Calculates an appropriate byte range for reading from an object based on the
361/// provided metadata.
362///
363/// This asynchronous function examines the [`PartitionedFile`] of an object in an object store
364/// and determines the range of bytes to be read. The range calculation may adjust
365/// the start and end points to align with meaningful data boundaries (like newlines).
366///
367/// Returns a `Result` wrapping a [`RangeCalculation`], which is either a calculated byte range or an indication to terminate early.
368///
369/// Returns an `Error` if any part of the range calculation fails, such as issues in reading from the object store or invalid range boundaries.
370pub async fn calculate_range(
371    file: &PartitionedFile,
372    store: &Arc<dyn ObjectStore>,
373    terminator: Option<u8>,
374) -> Result<RangeCalculation> {
375    let location = &file.object_meta.location;
376    let file_size = file.object_meta.size;
377    let newline = terminator.unwrap_or(b'\n');
378
379    match file.range {
380        None => Ok(RangeCalculation::Range(None)),
381        Some(FileRange { start, end }) => {
382            let start: u64 = start.try_into().map_err(|_| {
383                exec_datafusion_err!("Expect start range to fit in u64, got {start}")
384            })?;
385            let end: u64 = end.try_into().map_err(|_| {
386                exec_datafusion_err!("Expect end range to fit in u64, got {end}")
387            })?;
388
389            let start_delta = if start != 0 {
390                find_first_newline(store, location, start - 1, file_size, newline).await?
391            } else {
392                0
393            };
394
395            if start + start_delta > end {
396                return Ok(RangeCalculation::TerminateEarly);
397            }
398
399            let end_delta = if end != file_size {
400                find_first_newline(store, location, end - 1, file_size, newline).await?
401            } else {
402                0
403            };
404
405            let range = start + start_delta..end + end_delta;
406
407            if range.start >= range.end {
408                return Ok(RangeCalculation::TerminateEarly);
409            }
410
411            Ok(RangeCalculation::Range(Some(range)))
412        }
413    }
414}
415
416/// Asynchronously finds the position of the first newline character in a specified byte range
417/// within an object, such as a file, in an object store.
418///
419/// This function scans the contents of the object starting from the specified `start` position
420/// up to the `end` position, looking for the first occurrence of a newline character.
421/// It returns the position of the first newline relative to the start of the range.
422///
423/// Returns a `Result` wrapping a `usize` that represents the position of the first newline character found within the specified range. If no newline is found, it returns the length of the scanned data, effectively indicating the end of the range.
424///
425/// The function returns an `Error` if any issues arise while reading from the object store or processing the data stream.
426async fn find_first_newline(
427    object_store: &Arc<dyn ObjectStore>,
428    location: &Path,
429    start: u64,
430    end: u64,
431    newline: u8,
432) -> Result<u64> {
433    let options = GetOptions {
434        range: Some(GetRange::Bounded(start..end)),
435        ..Default::default()
436    };
437
438    let result = object_store.get_opts(location, options).await?;
439    let mut result_stream = result.into_stream();
440
441    let mut index = 0;
442
443    while let Some(chunk) = result_stream.next().await.transpose()? {
444        if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
445            let position = position as u64;
446            return Ok(index + position);
447        }
448
449        index += chunk.len() as u64;
450    }
451
452    Ok(index)
453}
454
455/// Generates test files with min-max statistics in different overlap patterns.
456///
457/// Used by tests and benchmarks.
458///
459/// # Overlap Factors
460///
461/// The `overlap_factor` parameter controls how much the value ranges in generated test files overlap:
462/// - `0.0`: No overlap between files (completely disjoint ranges)
463/// - `0.2`: Low overlap (20% of the range size overlaps with adjacent files)
464/// - `0.5`: Medium overlap (50% of ranges overlap)
465/// - `0.8`: High overlap (80% of ranges overlap between files)
466///
467/// # Examples
468///
469/// With 5 files and different overlap factors showing `[min, max]` ranges:
470///
471/// overlap_factor = 0.0 (no overlap):
472///
473/// File 0: [0, 20]
474/// File 1: [20, 40]
475/// File 2: [40, 60]
476/// File 3: [60, 80]
477/// File 4: [80, 100]
478///
479/// overlap_factor = 0.5 (50% overlap):
480///
481/// File 0: [0, 40]
482/// File 1: [20, 60]
483/// File 2: [40, 80]
484/// File 3: [60, 100]
485/// File 4: [80, 120]
486///
487/// overlap_factor = 0.8 (80% overlap):
488///
489/// File 0: [0, 100]
490/// File 1: [20, 120]
491/// File 2: [40, 140]
492/// File 3: [60, 160]
493/// File 4: [80, 180]
494pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
495    let mut files = Vec::with_capacity(num_files);
496    if num_files == 0 {
497        return vec![];
498    }
499    let range_size = if overlap_factor == 0.0 {
500        100 / num_files as i64
501    } else {
502        (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
503    };
504
505    for i in 0..num_files {
506        let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
507        let min = base as f64;
508        let max = (base + range_size) as f64;
509
510        let file = PartitionedFile {
511            object_meta: ObjectMeta {
512                location: Path::from(format!("file_{i}.parquet")),
513                last_modified: chrono::Utc::now(),
514                size: 1000,
515                e_tag: None,
516                version: None,
517            },
518            partition_values: vec![],
519            range: None,
520            statistics: Some(Arc::new(Statistics {
521                num_rows: Precision::Exact(100),
522                total_byte_size: Precision::Exact(1000),
523                column_statistics: vec![ColumnStatistics {
524                    null_count: Precision::Exact(0),
525                    max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
526                    min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
527                    sum_value: Precision::Absent,
528                    distinct_count: Precision::Absent,
529                    byte_size: Precision::Absent,
530                }],
531            })),
532            ordering: None,
533            extensions: None,
534            metadata_size_hint: None,
535        };
536        files.push(file);
537    }
538
539    vec![FileGroup::new(files)]
540}
541
542// Helper function to verify that files within each group maintain sort order
543/// Used by tests and benchmarks
544pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
545    for group in file_groups {
546        let files = group.iter().collect::<Vec<_>>();
547        for i in 1..files.len() {
548            let prev_file = files[i - 1];
549            let curr_file = files[i];
550
551            // Check if the min value of current file is greater than max value of previous file
552            if let (Some(prev_stats), Some(curr_stats)) =
553                (&prev_file.statistics, &curr_file.statistics)
554            {
555                let prev_max = &prev_stats.column_statistics[0].max_value;
556                let curr_min = &curr_stats.column_statistics[0].min_value;
557                if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
558                    return false;
559                }
560            }
561        }
562    }
563    true
564}
565
#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{ObjectStoreExt, local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, sync::Arc};
    use url::Url;

    /// Builds a one-column RecordBatch whose nullable Int32 field "i" holds the values `0..sz`
    pub fn make_partition(sz: i32) -> RecordBatch {
        let column: ArrayRef = Arc::new(Int32Array::from((0..sz).collect::<Vec<_>>()));
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));

        RecordBatch::try_new(schema, vec![column]).unwrap()
    }

    /// Schema of the aggregate_test_* csv files
    pub fn aggr_test_schema() -> SchemaRef {
        // Only the first column carries field metadata.
        let mut c1 = Field::new("c1", DataType::Utf8, false);
        c1.set_metadata(HashMap::from([("testing".into(), "test".into())]));

        let remaining = [
            ("c2", DataType::UInt32),
            ("c3", DataType::Int8),
            ("c4", DataType::Int16),
            ("c5", DataType::Int32),
            ("c6", DataType::Int64),
            ("c7", DataType::UInt8),
            ("c8", DataType::UInt16),
            ("c9", DataType::UInt32),
            ("c10", DataType::UInt64),
            ("c11", DataType::Float32),
            ("c12", DataType::Float64),
            ("c13", DataType::Utf8),
        ];

        let fields = std::iter::once(c1)
            .chain(
                remaining
                    .into_iter()
                    .map(|(name, data_type)| Field::new(name, data_type, false)),
            )
            .collect::<Vec<_>>();

        Arc::new(Schema::new(fields))
    }

    #[test]
    fn test_object_store_listing_url() {
        // Each input is expected to resolve to an object store URL equal to itself.
        for (input, expected) in
            [("file:///", "file:///"), ("s3://bucket/", "s3://bucket/")]
        {
            let listing = ListingTableUrl::parse(input).unwrap();
            let store = listing.object_store();
            assert_eq!(store.as_str(), expected);
        }
    }

    #[test]
    fn test_get_store_hdfs() {
        let registry = DefaultObjectStoreRegistry::default();
        let base = Url::parse("hdfs://localhost:8020").unwrap();
        registry.register_store(&base, Arc::new(LocalFileSystem::new()));

        let table_url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_s3() {
        let registry = DefaultObjectStoreRegistry::default();
        let base = Url::parse("s3://bucket/key").unwrap();
        registry.register_store(&base, Arc::new(LocalFileSystem::new()));

        let table_url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_file() {
        let registry = DefaultObjectStoreRegistry::default();
        let table_url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_local() {
        let registry = DefaultObjectStoreRegistry::default();
        let table_url = ListingTableUrl::parse("../").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    #[test]
    fn test_with_statistics_appends_partition_column_stats() {
        use crate::PartitionedFile;
        use datafusion_common::stats::Precision;
        use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};

        // 2025-03-01 expressed as days since the epoch
        let partition_date = ScalarValue::Date32(Some(20148));

        // A 100 byte file with a single partition value
        let file = PartitionedFile::new("test.parquet", 100)
            .with_partition_values(vec![partition_date.clone()]);

        // Statistics for the file schema only (one column, 'id')
        let id_stats = ColumnStatistics {
            null_count: Precision::Exact(0),
            max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
            min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
            sum_value: Precision::Absent,
            distinct_count: Precision::Absent,
            byte_size: Precision::Absent,
        };
        let file_stats = Arc::new(Statistics {
            num_rows: Precision::Exact(2),
            total_byte_size: Precision::Exact(16),
            column_statistics: vec![id_stats],
        });

        // with_statistics should add one entry per partition column
        let stats = file.with_statistics(file_stats).statistics.unwrap();
        assert_eq!(
            stats.column_statistics.len(),
            2,
            "Expected 2 columns (id + date partition)"
        );

        // The appended entry describes the partition column
        let partition_col_stats = &stats.column_statistics[1];
        assert_eq!(
            partition_col_stats.null_count,
            Precision::Exact(0),
            "Partition column null_count should be Exact(0)"
        );
        assert_eq!(
            partition_col_stats.min_value,
            Precision::Exact(partition_date.clone()),
            "Partition column min should match partition value"
        );
        assert_eq!(
            partition_col_stats.max_value,
            Precision::Exact(partition_date),
            "Partition column max should match partition value"
        );
        assert_eq!(
            partition_col_stats.distinct_count,
            Precision::Exact(1),
            "Partition column distinct_count should be Exact(1)"
        );
    }

    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();
        let contains = |path: &str, ignore_subdirectory: bool| {
            url.contains(&Path::parse(path).unwrap(), ignore_subdirectory)
        };

        // standard case with default config
        assert!(contains("/var/data/mytable/data.parquet", true));

        // standard case with `ignore_subdirectory` set to false
        assert!(contains("/var/data/mytable/data.parquet", false));

        // as per documentation, when `ignore_subdirectory` is true, we should ignore files that
        // aren't a direct child of the `url`
        assert!(!contains("/var/data/mytable/mysubfolder/data.parquet", true));

        // when we set `ignore_subdirectory` to false, we should not ignore the file
        assert!(contains("/var/data/mytable/mysubfolder/data.parquet", false));

        // as above, `ignore_subdirectory` is false, so we include the file
        assert!(contains("/var/data/mytable/year=2024/data.parquet", false));

        // in this case, we include the file even when `ignore_subdirectory` is true because the
        // path segment is a hive partition which doesn't count as a subdirectory for the purposes
        // of `Url::contains`
        assert!(contains("/var/data/mytable/year=2024/data.parquet", true));

        // testing an empty path with default config
        assert!(contains("/var/data/mytable/", true));

        // testing an empty path with `ignore_subdirectory` set to false
        assert!(contains("/var/data/mytable/", false));
    }

    /// Regression test for <https://github.com/apache/datafusion/issues/19605>
    #[tokio::test]
    async fn test_calculate_range_single_line_file() {
        use super::{PartitionedFile, RangeCalculation, calculate_range};
        use object_store::ObjectStore;
        use object_store::memory::InMemory;

        // A single record with no terminator anywhere in the file
        let content = r#"{"id":1,"data":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}"#;
        let file_size = content.len() as u64;

        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("test.json");
        store.put(&path, content.into()).await.unwrap();

        // Request the second half of the file
        let second_half = PartitionedFile::new_with_range(
            path.to_string(),
            file_size,
            (file_size / 2) as i64,
            file_size as i64,
        );

        let outcome = calculate_range(&second_half, &store, None).await;

        assert!(matches!(outcome, Ok(RangeCalculation::TerminateEarly)));
    }
}
800}