datafusion_datasource/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
    html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
// Make sure fast / cheap clones on Arc are explicit:
// https://github.com/apache/datafusion/issues/11143
#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
#![cfg_attr(test, allow(clippy::needless_pass_by_value))]
#![deny(clippy::allow_attributes)]

//! A table that uses the `ObjectStore` listing capability
//! to get the list of files to process.

pub mod decoder;
pub mod display;
pub mod file;
pub mod file_compression_type;
pub mod file_format;
pub mod file_groups;
pub mod file_scan_config;
pub mod file_sink_config;
pub mod file_stream;
pub mod memory;
pub mod projection;
pub mod schema_adapter;
pub mod sink;
pub mod source;
mod statistics;
pub mod table_schema;

#[cfg(test)]
pub mod test_util;

pub mod url;
pub mod write;
pub use self::file::as_file_source;
pub use self::url::ListingTableUrl;
use crate::file_groups::FileGroup;
use chrono::TimeZone;
use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, Result, exec_datafusion_err};
use datafusion_common::{ScalarValue, Statistics};
use futures::{Stream, StreamExt};
use object_store::{GetOptions, GetRange, ObjectStore};
use object_store::{ObjectMeta, path::Path};
pub use table_schema::TableSchema;
// Remove when add_row_stats is removed
#[expect(deprecated)]
pub use statistics::add_row_stats;
pub use statistics::compute_all_files_statistics;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

/// Stream of files listed from an object store
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;

/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping
/// sections of a Parquet file in parallel.
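///
/// # Example
///
/// A minimal sketch of the half-open `[start, end)` semantics of
/// [`FileRange::contains`] (the offsets below are illustrative only):
///
/// ```
/// # use datafusion_datasource::FileRange;
/// let range = FileRange { start: 0, end: 100 };
/// assert!(range.contains(0)); // the start offset is included
/// assert!(range.contains(99));
/// assert!(!range.contains(100)); // the end offset is excluded
/// ```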
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// Range start
    pub start: i64,
    /// Range end
    pub end: i64,
}

impl FileRange {
    /// Returns true if this file range contains the specified offset
    pub fn contains(&self, offset: i64) -> bool {
        offset >= self.start && offset < self.end
    }
}

#[derive(Debug, Clone)]
/// A single file or part of a file that should be read, along with its statistics
/// and partition column values that need to be appended to each row.
///
/// # Statistics
///
/// The [`Self::statistics`] field contains statistics for the **full table schema**,
/// which includes both file columns and partition columns. When statistics are set via
/// [`Self::with_statistics`], exact statistics for partition columns are automatically
/// computed from [`Self::partition_values`]:
///
/// - `min = max = partition_value` (all rows in a file share the same partition value)
/// - `null_count = 0` (partition values extracted from paths are never null)
/// - `distinct_count = 1` (single distinct value per file for each partition column)
///
/// This enables query optimizers to use partition column bounds for pruning and planning.
pub struct PartitionedFile {
    /// Path for the file (e.g. URL, filesystem path, etc)
    pub object_meta: ObjectMeta,
    /// Values of partition columns to be appended to each row.
    ///
    /// These MUST have the same count, order, and type as the [`table_partition_cols`].
    ///
    /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type.
    ///
    /// [`wrap_partition_type_in_dict`]: crate::file_scan_config::wrap_partition_type_in_dict
    /// [`wrap_partition_value_in_dict`]: crate::file_scan_config::wrap_partition_value_in_dict
    /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L87
    pub partition_values: Vec<ScalarValue>,
    /// An optional file range for a more fine-grained parallel execution
    pub range: Option<FileRange>,
    /// Optional statistics that describe the data in this file if known.
    ///
    /// DataFusion relies on these statistics for planning (in particular to sort file groups),
    /// so if they are incorrect, incorrect answers may result.
    ///
    /// These statistics cover the full table schema: file columns plus partition columns.
    /// When set via [`Self::with_statistics`], partition column statistics are automatically
    /// computed from [`Self::partition_values`] with exact min/max/null_count/distinct_count.
    pub statistics: Option<Arc<Statistics>>,
    /// An optional field for user defined per object metadata
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// The estimated size of the parquet metadata, in bytes
    pub metadata_size_hint: Option<usize>,
}

impl PartitionedFile {
    /// Create a simple file without metadata or partition
    pub fn new(path: impl Into<String>, size: u64) -> Self {
        Self {
            object_meta: ObjectMeta {
                location: Path::from(path.into()),
                last_modified: chrono::Utc.timestamp_nanos(0),
                size,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }

    /// Create a file range without metadata or partition
    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
        Self {
            object_meta: ObjectMeta {
                location: Path::from(path),
                last_modified: chrono::Utc.timestamp_nanos(0),
                size,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: Some(FileRange { start, end }),
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
        .with_range(start, end)
    }

    /// Size of the file to be scanned (taking into account the range, if present).
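    ///
    /// # Example
    ///
    /// A small sketch (file name and sizes are illustrative only):
    ///
    /// ```
    /// # use datafusion_datasource::PartitionedFile;
    /// let file = PartitionedFile::new("data.csv", 100);
    /// assert_eq!(file.effective_size(), 100); // whole file
    /// let file = file.with_range(10, 30);
    /// assert_eq!(file.effective_size(), 20); // only the configured byte range
    /// ```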
    pub fn effective_size(&self) -> u64 {
        if let Some(range) = &self.range {
            (range.end - range.start) as u64
        } else {
            self.object_meta.size
        }
    }

    /// Effective byte range of the file to be scanned: the configured range if
    /// present, otherwise `(0, file size)`.
    pub fn range(&self) -> (u64, u64) {
        if let Some(range) = &self.range {
            (range.start as u64, range.end as u64)
        } else {
            (0, self.object_meta.size)
        }
    }

    /// Provide a hint to the size of the file metadata. If a hint is provided,
    /// the reader will optimistically try to fetch the last `metadata_size_hint` bytes of the Parquet file.
    /// Without an appropriate hint, two reads may be required to fetch the metadata.
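    ///
    /// # Example
    ///
    /// A short sketch of attaching a hint (the 512 KiB value is illustrative only):
    ///
    /// ```
    /// # use datafusion_datasource::PartitionedFile;
    /// let file = PartitionedFile::new("data.parquet", 4096)
    ///     .with_metadata_size_hint(512 * 1024);
    /// assert_eq!(file.metadata_size_hint, Some(512 * 1024));
    /// ```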
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
        self.metadata_size_hint = Some(metadata_size_hint);
        self
    }

    /// Return a file reference from the given path
    pub fn from_path(path: String) -> Result<Self> {
        let size = std::fs::metadata(path.clone())?.len();
        Ok(Self::new(path, size))
    }

    /// Return the path of this partitioned file
    pub fn path(&self) -> &Path {
        &self.object_meta.location
    }

    /// Update the file to only scan the specified range (in bytes)
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        self.range = Some(FileRange { start, end });
        self
    }

    /// Update the user defined extensions for this file.
    ///
    /// This can be used to pass reader specific information.
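    ///
    /// # Example
    ///
    /// A minimal sketch; `ReaderHint` is a hypothetical user-defined type, not
    /// part of this crate:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion_datasource::PartitionedFile;
    /// // Hypothetical reader-specific payload
    /// #[derive(Debug)]
    /// struct ReaderHint {
    ///     prefetch: bool,
    /// }
    ///
    /// let file = PartitionedFile::new("data.parquet", 1024)
    ///     .with_extensions(Arc::new(ReaderHint { prefetch: true }));
    ///
    /// // A reader can later downcast the payload back to its concrete type
    /// let hint = file.extensions.as_ref().unwrap().downcast_ref::<ReaderHint>();
    /// assert!(hint.map(|h| h.prefetch).unwrap_or(false));
    /// ```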
    pub fn with_extensions(
        mut self,
        extensions: Arc<dyn std::any::Any + Send + Sync>,
    ) -> Self {
        self.extensions = Some(extensions);
        self
    }

    /// Update the statistics for this file.
    ///
    /// The provided `file_statistics` should cover only the file schema columns.
    /// This method will automatically append exact statistics for partition columns
    /// based on `partition_values`:
    ///
    /// - `min = max = partition_value` (all rows have the same value)
    /// - `null_count = 0` (partition values from paths are never null)
    /// - `distinct_count = 1` (all rows have the same partition value)
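    ///
    /// # Example
    ///
    /// A minimal sketch: a file with one partition value and single-column file
    /// statistics ends up with statistics for two columns (all values below are
    /// illustrative only):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion_common::stats::Precision;
    /// # use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
    /// # use datafusion_datasource::PartitionedFile;
    /// let mut file = PartitionedFile::new("data.parquet", 1024);
    /// file.partition_values = vec![ScalarValue::from("2024-01-01")];
    ///
    /// // Statistics covering the single file column only
    /// let file_stats = Statistics {
    ///     num_rows: Precision::Exact(100),
    ///     total_byte_size: Precision::Exact(1024),
    ///     column_statistics: vec![ColumnStatistics::new_unknown()],
    /// };
    ///
    /// let file = file.with_statistics(Arc::new(file_stats));
    /// let stats = file.statistics.unwrap();
    ///
    /// // One file column plus one appended partition column
    /// assert_eq!(stats.column_statistics.len(), 2);
    /// assert_eq!(
    ///     stats.column_statistics[1].min_value,
    ///     Precision::Exact(ScalarValue::from("2024-01-01"))
    /// );
    /// ```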
    pub fn with_statistics(mut self, file_statistics: Arc<Statistics>) -> Self {
        if self.partition_values.is_empty() {
            // No partition columns, use stats as-is
            self.statistics = Some(file_statistics);
        } else {
            // Extend stats with exact partition column statistics
            let mut stats = Arc::unwrap_or_clone(file_statistics);
            for partition_value in &self.partition_values {
                let col_stats = ColumnStatistics {
                    null_count: Precision::Exact(0),
                    max_value: Precision::Exact(partition_value.clone()),
                    min_value: Precision::Exact(partition_value.clone()),
                    distinct_count: Precision::Exact(1),
                    sum_value: Precision::Absent,
                    byte_size: partition_value
                        .data_type()
                        .primitive_width()
                        .map(|w| stats.num_rows.multiply(&Precision::Exact(w)))
                        .unwrap_or_else(|| Precision::Absent),
                };
                stats.column_statistics.push(col_stats);
            }
            self.statistics = Some(Arc::new(stats));
        }
        self
    }

    /// Check if this file has any statistics.
    /// This returns `true` if the file has any Exact or Inexact statistics
    /// and `false` if all statistics are `Precision::Absent`.
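    ///
    /// # Example
    ///
    /// A tiny sketch (the file name and size are illustrative only):
    ///
    /// ```
    /// # use datafusion_datasource::PartitionedFile;
    /// let file = PartitionedFile::new("data.parquet", 1024);
    /// assert!(!file.has_statistics()); // no statistics attached yet
    /// ```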
    pub fn has_statistics(&self) -> bool {
        if let Some(stats) = &self.statistics {
            stats.column_statistics.iter().any(|col_stats| {
                col_stats.null_count != Precision::Absent
                    || col_stats.max_value != Precision::Absent
                    || col_stats.min_value != Precision::Absent
                    || col_stats.sum_value != Precision::Absent
                    || col_stats.distinct_count != Precision::Absent
            })
        } else {
            false
        }
    }
}

impl From<ObjectMeta> for PartitionedFile {
    fn from(object_meta: ObjectMeta) -> Self {
        PartitionedFile {
            object_meta,
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }
}

/// Represents the possible outcomes of a range calculation.
///
/// This enum is used to encapsulate the result of calculating the range of
/// bytes to read from an object (like a file) in an object store.
///
/// Variants:
/// - `Range(Option<Range<u64>>)`:
///   Represents a range of bytes to be read. It contains an `Option` wrapping a
///   `Range<u64>`. `None` signifies that the entire object should be read,
///   while `Some(range)` specifies the exact byte range to read.
/// - `TerminateEarly`:
///   Indicates that the range calculation determined no further action is
///   necessary, possibly because the calculated range is empty or invalid.
pub enum RangeCalculation {
    Range(Option<Range<u64>>),
    TerminateEarly,
}

/// Calculates an appropriate byte range for reading from an object based on the
/// provided metadata.
///
/// This asynchronous function examines the [`PartitionedFile`] of an object in an object store
/// and determines the range of bytes to be read. The range calculation may adjust
/// the start and end points to align with meaningful data boundaries (like newlines).
///
/// Returns a `Result` wrapping a [`RangeCalculation`], which is either a calculated byte range or an indication to terminate early.
///
/// Returns an `Error` if any part of the range calculation fails, such as issues in reading from the object store or invalid range boundaries.
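///
/// # Example
///
/// A minimal sketch (file name, size, and range are illustrative only) of
/// resolving the byte range for a file that is scanned in parts:
///
/// ```no_run
/// # use std::sync::Arc;
/// # use object_store::{memory::InMemory, ObjectStore};
/// # use datafusion_datasource::{calculate_range, PartitionedFile, RangeCalculation};
/// # async fn example() -> datafusion_common::Result<()> {
/// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
/// // Scan only the first 50 bytes of a 100-byte file; the end of the range is
/// // pushed forward to the next newline so no record is split between readers.
/// let file = PartitionedFile::new("data.csv", 100).with_range(0, 50);
/// match calculate_range(&file, &store, None).await? {
///     RangeCalculation::Range(range) => println!("bytes to read: {range:?}"),
///     RangeCalculation::TerminateEarly => println!("nothing to read"),
/// }
/// # Ok(())
/// # }
/// ```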
pub async fn calculate_range(
    file: &PartitionedFile,
    store: &Arc<dyn ObjectStore>,
    terminator: Option<u8>,
) -> Result<RangeCalculation> {
    let location = &file.object_meta.location;
    let file_size = file.object_meta.size;
    let newline = terminator.unwrap_or(b'\n');

    match file.range {
        None => Ok(RangeCalculation::Range(None)),
        Some(FileRange { start, end }) => {
            let start: u64 = start.try_into().map_err(|_| {
                exec_datafusion_err!("Expect start range to fit in u64, got {start}")
            })?;
            let end: u64 = end.try_into().map_err(|_| {
                exec_datafusion_err!("Expect end range to fit in u64, got {end}")
            })?;

            let start_delta = if start != 0 {
                find_first_newline(store, location, start - 1, file_size, newline).await?
            } else {
                0
            };

            if start + start_delta > end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            let end_delta = if end != file_size {
                find_first_newline(store, location, end - 1, file_size, newline).await?
            } else {
                0
            };

            let range = start + start_delta..end + end_delta;

            if range.start >= range.end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            Ok(RangeCalculation::Range(Some(range)))
        }
    }
}

/// Asynchronously finds the position of the first newline character in a specified byte range
/// within an object, such as a file, in an object store.
///
/// This function scans the contents of the object starting from the specified `start` position
/// up to the `end` position, looking for the first occurrence of a newline character.
/// It returns the position of the first newline relative to the start of the range.
///
/// Returns a `Result` wrapping a `u64` that represents the position of the first newline character found within the specified range. If no newline is found, it returns the length of the scanned data, effectively indicating the end of the range.
///
/// The function returns an `Error` if any issues arise while reading from the object store or processing the data stream.
async fn find_first_newline(
    object_store: &Arc<dyn ObjectStore>,
    location: &Path,
    start: u64,
    end: u64,
    newline: u8,
) -> Result<u64> {
    let options = GetOptions {
        range: Some(GetRange::Bounded(start..end)),
        ..Default::default()
    };

    let result = object_store.get_opts(location, options).await?;
    let mut result_stream = result.into_stream();

    let mut index = 0;

    while let Some(chunk) = result_stream.next().await.transpose()? {
        if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
            let position = position as u64;
            return Ok(index + position);
        }

        index += chunk.len() as u64;
    }

    Ok(index)
}

/// Generates test files with min-max statistics in different overlap patterns.
///
/// Used by tests and benchmarks.
///
/// # Overlap Factors
///
/// The `overlap_factor` parameter controls how much the value ranges in generated test files overlap:
/// - `0.0`: No overlap between files (completely disjoint ranges)
/// - `0.2`: Low overlap (20% of the range size overlaps with adjacent files)
/// - `0.5`: Medium overlap (50% of ranges overlap)
/// - `0.8`: High overlap (80% of ranges overlap between files)
///
/// # Examples
///
/// With 5 files and different overlap factors showing `[min, max]` ranges:
///
/// overlap_factor = 0.0 (no overlap):
///
/// File 0: [0, 20]
/// File 1: [20, 40]
/// File 2: [40, 60]
/// File 3: [60, 80]
/// File 4: [80, 100]
///
/// overlap_factor = 0.5 (50% overlap):
///
/// File 0: [0, 40]
/// File 1: [20, 60]
/// File 2: [40, 80]
/// File 3: [60, 100]
/// File 4: [80, 120]
///
/// overlap_factor = 0.8 (80% overlap):
///
/// File 0: [0, 100]
/// File 1: [20, 120]
/// File 2: [40, 140]
/// File 3: [60, 160]
/// File 4: [80, 180]
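///
/// A small sketch of the output shape (a single [`FileGroup`] holding one
/// [`PartitionedFile`] per requested file):
///
/// ```
/// # use datafusion_datasource::generate_test_files;
/// let groups = generate_test_files(5, 0.5);
/// assert_eq!(groups.len(), 1);
/// assert_eq!(groups[0].iter().count(), 5);
/// ```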
pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
    let mut files = Vec::with_capacity(num_files);
    if num_files == 0 {
        return vec![];
    }
    let range_size = if overlap_factor == 0.0 {
        100 / num_files as i64
    } else {
        (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
    };

    for i in 0..num_files {
        let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
        let min = base as f64;
        let max = (base + range_size) as f64;

        let file = PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::from(format!("file_{i}.parquet")),
                last_modified: chrono::Utc::now(),
                size: 1000,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: Some(Arc::new(Statistics {
                num_rows: Precision::Exact(100),
                total_byte_size: Precision::Exact(1000),
                column_statistics: vec![ColumnStatistics {
                    null_count: Precision::Exact(0),
                    max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
                    min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
                    sum_value: Precision::Absent,
                    distinct_count: Precision::Absent,
                    byte_size: Precision::Absent,
                }],
            })),
            extensions: None,
            metadata_size_hint: None,
        };
        files.push(file);
    }

    vec![FileGroup::new(files)]
}

/// Helper function to verify that files within each group maintain sort order.
///
/// Used by tests and benchmarks.
pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
    for group in file_groups {
        let files = group.iter().collect::<Vec<_>>();
        for i in 1..files.len() {
            let prev_file = files[i - 1];
            let curr_file = files[i];

            // Check if the min value of current file is greater than max value of previous file
            if let (Some(prev_stats), Some(curr_stats)) =
                (&prev_file.statistics, &curr_file.statistics)
            {
                let prev_max = &prev_stats.column_statistics[0].max_value;
                let curr_min = &curr_stats.column_statistics[0].min_value;
                if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
                    return false;
                }
            }
        }
    }
    true
}

#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, ops::Not, sync::Arc};
    use url::Url;

    /// Return a RecordBatch with a single Int32 array with values (0..sz) in a field named "i"
    pub fn make_partition(sz: i32) -> RecordBatch {
        let seq_start = 0;
        let seq_end = sz;
        let values = (seq_start..seq_end).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let arr = Arc::new(Int32Array::from(values));

        RecordBatch::try_new(schema, vec![arr as ArrayRef]).unwrap()
    }

    /// Get the schema for the aggregate_test_* csv files
    pub fn aggr_test_schema() -> SchemaRef {
        let mut f1 = Field::new("c1", DataType::Utf8, false);
        f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
        let schema = Schema::new(vec![
            f1,
            Field::new("c2", DataType::UInt32, false),
            Field::new("c3", DataType::Int8, false),
            Field::new("c4", DataType::Int16, false),
            Field::new("c5", DataType::Int32, false),
            Field::new("c6", DataType::Int64, false),
            Field::new("c7", DataType::UInt8, false),
            Field::new("c8", DataType::UInt16, false),
            Field::new("c9", DataType::UInt32, false),
            Field::new("c10", DataType::UInt64, false),
            Field::new("c11", DataType::Float32, false),
            Field::new("c12", DataType::Float64, false),
            Field::new("c13", DataType::Utf8, false),
        ]);

        Arc::new(schema)
    }

    #[test]
    fn test_object_store_listing_url() {
        let listing = ListingTableUrl::parse("file:///").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "file:///");

        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "s3://bucket/");
    }

    #[test]
    fn test_get_store_hdfs() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("hdfs://localhost:8020").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_s3() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("s3://bucket/key").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_file() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_local() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("../").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_with_statistics_appends_partition_column_stats() {
        use crate::PartitionedFile;
        use datafusion_common::stats::Precision;
        use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};

        // Create a PartitionedFile with partition values
        let mut pf = PartitionedFile::new(
            "test.parquet",
            100, // file size
        );
        pf.partition_values = vec![
            ScalarValue::Date32(Some(20148)), // 2025-03-01
        ];

        // Create file-only statistics (1 column for 'id')
        let file_stats = Arc::new(Statistics {
            num_rows: Precision::Exact(2),
            total_byte_size: Precision::Exact(16),
            column_statistics: vec![ColumnStatistics {
                null_count: Precision::Exact(0),
                max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
                min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
                byte_size: Precision::Absent,
            }],
        });

        // Call with_statistics - should append partition column stats
        let pf = pf.with_statistics(file_stats);

        // Verify the statistics now have 2 columns
        let stats = pf.statistics.unwrap();
        assert_eq!(
            stats.column_statistics.len(),
            2,
            "Expected 2 columns (id + date partition)"
        );

        // Verify partition column statistics
        let partition_col_stats = &stats.column_statistics[1];
        assert_eq!(
            partition_col_stats.null_count,
            Precision::Exact(0),
            "Partition column null_count should be Exact(0)"
        );
        assert_eq!(
            partition_col_stats.min_value,
            Precision::Exact(ScalarValue::Date32(Some(20148))),
            "Partition column min should match partition value"
        );
        assert_eq!(
            partition_col_stats.max_value,
            Precision::Exact(ScalarValue::Date32(Some(20148))),
            "Partition column max should match partition value"
        );
        assert_eq!(
            partition_col_stats.distinct_count,
            Precision::Exact(1),
            "Partition column distinct_count should be Exact(1)"
        );
    }

    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();

        // standard case with default config
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            true
        ));

        // standard case with `ignore_subdirectory` set to false
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            false
        ));

        // as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't
        // a direct child of the `url`
        assert!(
            url.contains(
                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
                true
            )
            .not()
        );

        // when we set `ignore_subdirectory` to false, we should not ignore the file
        assert!(url.contains(
            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
            false
        ));

        // as above, `ignore_subdirectory` is false, so we include the file
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            false
        ));

        // in this case, we include the file even when `ignore_subdirectory` is true because the
        // path segment is a hive partition which doesn't count as a subdirectory for the purposes
        // of `Url::contains`
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            true
        ));

        // testing an empty path with default config
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));

        // testing an empty path with `ignore_subdirectory` set to false
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
    }

    /// Regression test for <https://github.com/apache/datafusion/issues/19605>
    #[tokio::test]
    async fn test_calculate_range_single_line_file() {
        use super::{PartitionedFile, RangeCalculation, calculate_range};
        use object_store::ObjectStore;
        use object_store::memory::InMemory;

        let content = r#"{"id":1,"data":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}"#;
        let file_size = content.len() as u64;

        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("test.json");
        store.put(&path, content.into()).await.unwrap();

        let mid = file_size / 2;
        let partitioned_file = PartitionedFile::new_with_range(
            path.to_string(),
            file_size,
            mid as i64,
            file_size as i64,
        );

        let result = calculate_range(&partitioned_file, &store, None).await;

        assert!(matches!(result, Ok(RangeCalculation::TerminateEarly)));
    }
757}