Skip to main content

datafusion_datasource/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#![doc(
19    html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
20    html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
21)]
22#![cfg_attr(docsrs, feature(doc_cfg))]
23// Make sure fast / cheap clones on Arc are explicit:
24// https://github.com/apache/datafusion/issues/11143
25#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
26#![cfg_attr(test, allow(clippy::needless_pass_by_value))]
27
28//! A table that uses the `ObjectStore` listing capability
29//! to get the list of files to process.
30
31pub mod decoder;
32pub mod display;
33pub mod file;
34pub mod file_compression_type;
35pub mod file_format;
36pub mod file_groups;
37pub mod file_scan_config;
38pub mod file_sink_config;
39pub mod file_stream;
40pub mod memory;
41pub mod morsel;
42pub mod projection;
43pub mod schema_adapter;
44pub mod sink;
45pub mod source;
46mod statistics;
47pub mod table_schema;
48
49#[cfg(test)]
50pub mod test_util;
51
52pub mod url;
53pub mod write;
54pub use self::file::as_file_source;
55pub use self::url::ListingTableUrl;
56use crate::file_groups::FileGroup;
57use chrono::TimeZone;
58use datafusion_common::stats::Precision;
59use datafusion_common::{ColumnStatistics, Result, TableReference, exec_datafusion_err};
60use datafusion_common::{ScalarValue, Statistics};
61use datafusion_physical_expr::LexOrdering;
62use futures::{Stream, StreamExt};
63use object_store::{GetOptions, GetRange, ObjectStore};
64use object_store::{ObjectMeta, path::Path};
65pub use table_schema::TableSchema;
// Remove when add_row_stats is removed
67#[expect(deprecated)]
68pub use statistics::add_row_stats;
69pub use statistics::compute_all_files_statistics;
70use std::any::Any;
71use std::ops::Range;
72use std::pin::Pin;
73use std::sync::Arc;
74
75/// User-defined per-file extension data, keyed by concrete Rust type.
76///
77/// Re-exported from [`datafusion_common::extensions::Extensions`]; the same
78/// type backs `SessionConfig::extensions`, `ExtendedStatistics::extensions`,
79/// and other extension fields throughout DataFusion.
80pub type FileExtensions = datafusion_common::extensions::Extensions;
81
/// Stream of files listed from an object store
83#[deprecated(
84    since = "54.0.0",
85    note = "This type is unused and will be removed in a future release"
86)]
87pub type PartitionedFileStream =
88    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
89
/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
/// lies within the `[start, end)` byte offsets. This option can be used to scan
/// non-overlapping sections of a Parquet file in parallel.
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// Range start (inclusive)
    pub start: i64,
    /// Range end (exclusive)
    pub end: i64,
}

impl FileRange {
    /// Returns `true` if `offset` lies in the half-open interval `[start, end)`.
    ///
    /// An empty or inverted range (`start >= end`) contains no offsets.
    pub fn contains(&self, offset: i64) -> bool {
        (self.start..self.end).contains(&offset)
    }
}
107
108#[derive(Debug, Clone)]
109/// A single file or part of a file that should be read, along with its schema, statistics
110/// and partition column values that need to be appended to each row.
111///
112/// # Statistics
113///
114/// The [`Self::statistics`] field contains statistics for the **full table schema**,
115/// which includes both file columns and partition columns. When statistics are set via
116/// [`Self::with_statistics`], exact statistics for partition columns are automatically
117/// computed from [`Self::partition_values`]:
118///
119/// - `min = max = partition_value` (all rows in a file share the same partition value)
120/// - `null_count = 0` (partition values extracted from paths are never null)
121/// - `distinct_count = 1` (single distinct value per file for each partition column)
122///
123/// This enables query optimizers to use partition column bounds for pruning and planning.
124pub struct PartitionedFile {
125    /// Path for the file (e.g. URL, filesystem path, etc)
126    pub object_meta: ObjectMeta,
127    /// Values of partition columns to be appended to each row.
128    ///
129    /// These MUST have the same count, order, and type than the [`table_partition_cols`].
130    ///
131    /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type.
132    ///
133    ///
134    /// [`wrap_partition_type_in_dict`]: crate::file_scan_config::wrap_partition_type_in_dict
135    /// [`wrap_partition_value_in_dict`]: crate::file_scan_config::wrap_partition_value_in_dict
136    /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L87
137    pub partition_values: Vec<ScalarValue>,
138    /// An optional file range for a more fine-grained parallel execution
139    pub range: Option<FileRange>,
140    /// Optional statistics that describe the data in this file if known.
141    ///
142    /// DataFusion relies on these statistics for planning (in particular to sort file groups),
143    /// so if they are incorrect, incorrect answers may result.
144    ///
145    /// These statistics cover the full table schema: file columns plus partition columns.
146    /// When set via [`Self::with_statistics`], partition column statistics are automatically
147    /// computed from [`Self::partition_values`] with exact min/max/null_count/distinct_count.
148    pub statistics: Option<Arc<Statistics>>,
149    /// The known lexicographical ordering of the rows in this file, if any.
150    ///
151    /// This describes how the data within the file is sorted with respect to one or more
152    /// columns, and is used by the optimizer for planning operations that depend on input
153    /// ordering (e.g. merges, sorts, and certain aggregations).
154    ///
155    /// When available, this is typically inferred from file-level metadata exposed by the
156    /// underlying format (for example, Parquet `sorting_columns`), but it may also be set
157    /// explicitly via [`Self::with_ordering`].
158    pub ordering: Option<LexOrdering>,
159    /// User-defined per-file metadata, keyed by Rust type. Multiple
160    /// independent components can each attach their own data here without
161    /// conflict — see [`FileExtensions`].
162    pub extensions: FileExtensions,
163    /// The estimated size of the parquet metadata, in bytes
164    pub metadata_size_hint: Option<usize>,
165    pub table_reference: Option<TableReference>,
166}
167
168impl PartitionedFile {
169    /// Create a simple file without metadata or partition
170    pub fn new(path: impl Into<String>, size: u64) -> Self {
171        Self {
172            object_meta: ObjectMeta {
173                location: Path::from(path.into()),
174                last_modified: chrono::Utc.timestamp_nanos(0),
175                size,
176                e_tag: None,
177                version: None,
178            },
179            partition_values: vec![],
180            range: None,
181            statistics: None,
182            ordering: None,
183            extensions: FileExtensions::new(),
184            metadata_size_hint: None,
185            table_reference: None,
186        }
187    }
188
189    /// Create a file from a known ObjectMeta without partition
190    pub fn new_from_meta(object_meta: ObjectMeta) -> Self {
191        Self {
192            object_meta,
193            partition_values: vec![],
194            range: None,
195            statistics: None,
196            ordering: None,
197            extensions: FileExtensions::new(),
198            metadata_size_hint: None,
199            table_reference: None,
200        }
201    }
202
203    /// Create a file range without metadata or partition
204    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
205        Self {
206            object_meta: ObjectMeta {
207                location: Path::from(path),
208                last_modified: chrono::Utc.timestamp_nanos(0),
209                size,
210                e_tag: None,
211                version: None,
212            },
213            partition_values: vec![],
214            range: Some(FileRange { start, end }),
215            statistics: None,
216            ordering: None,
217            extensions: FileExtensions::new(),
218            metadata_size_hint: None,
219            table_reference: None,
220        }
221        .with_range(start, end)
222    }
223
224    /// Attach partition values to this file.
225    /// This replaces any existing partition values.
226    pub fn with_partition_values(mut self, partition_values: Vec<ScalarValue>) -> Self {
227        self.partition_values = partition_values;
228        self
229    }
230
231    pub fn with_table_reference(
232        mut self,
233        table_reference: Option<TableReference>,
234    ) -> Self {
235        self.table_reference = table_reference;
236        self
237    }
238
239    /// Size of the file to be scanned (taking into account the range, if present).
240    pub fn effective_size(&self) -> u64 {
241        if let Some(range) = &self.range {
242            (range.end - range.start) as u64
243        } else {
244            self.object_meta.size
245        }
246    }
247
248    /// Effective range of the file to be scanned.
249    pub fn range(&self) -> (u64, u64) {
250        if let Some(range) = &self.range {
251            (range.start as u64, range.end as u64)
252        } else {
253            (0, self.object_meta.size)
254        }
255    }
256
257    /// Provide a hint to the size of the file metadata. If a hint is provided
258    /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically.
259    /// Without an appropriate hint, two read may be required to fetch the metadata.
260    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
261        self.metadata_size_hint = Some(metadata_size_hint);
262        self
263    }
264
265    /// Return a file reference from the given path
266    pub fn from_path(path: String) -> Result<Self> {
267        let size = std::fs::metadata(path.clone())?.len();
268        Ok(Self::new(path, size))
269    }
270
271    /// Return the path of this partitioned file
272    pub fn path(&self) -> &Path {
273        &self.object_meta.location
274    }
275
276    /// Update the file to only scan the specified range (in bytes)
277    pub fn with_range(mut self, start: i64, end: i64) -> Self {
278        self.range = Some(FileRange { start, end });
279        self
280    }
281
282    /// Attach a typed user-defined extension to this file. Multiple
283    /// independent extensions can be attached, each keyed by its concrete
284    /// Rust type. Inserting a value of a type that already has an extension
285    /// replaces the previous one.
286    ///
287    /// This can be used to pass reader-specific information (e.g. a
288    /// `ParquetAccessPlan`, or a custom index entry).
289    pub fn with_extension<T: Any + Send + Sync>(mut self, value: T) -> Self {
290        self.extensions.insert(value);
291        self
292    }
293
294    /// Borrow the extension of type `T`, if one is attached.
295    pub fn extension<T: Any + Send + Sync>(&self) -> Option<&T> {
296        self.extensions.get::<T>()
297    }
298
299    /// Attach a type-erased extension to this file.
300    ///
301    /// Kept as a backwards-compatible shim; prefer [`Self::with_extension`]
302    /// which keys the extension by its concrete Rust type at the call site.
303    #[deprecated(
304        since = "54.0.0",
305        note = "use `with_extension`; the extension is keyed by its concrete type"
306    )]
307    pub fn with_extensions(mut self, extensions: Arc<dyn Any + Send + Sync>) -> Self {
308        #[expect(deprecated)]
309        self.extensions.insert_dyn(extensions);
310        self
311    }
312
313    /// Update the statistics for this file.
314    ///
315    /// The provided `statistics` should cover only the file schema columns.
316    /// This method will automatically append exact statistics for partition columns
317    /// based on `partition_values`:
318    /// - `min = max = partition_value` (all rows have the same value)
319    /// - `null_count = 0` (partition values from paths are never null)
320    /// - `distinct_count = 1` (all rows have the same partition value)
321    pub fn with_statistics(mut self, file_statistics: Arc<Statistics>) -> Self {
322        if self.partition_values.is_empty() {
323            // No partition columns, use stats as-is
324            self.statistics = Some(file_statistics);
325        } else {
326            // Extend stats with exact partition column statistics
327            let mut stats = Arc::unwrap_or_clone(file_statistics);
328            for partition_value in &self.partition_values {
329                let col_stats = ColumnStatistics {
330                    null_count: Precision::Exact(0),
331                    max_value: Precision::Exact(partition_value.clone()),
332                    min_value: Precision::Exact(partition_value.clone()),
333                    distinct_count: Precision::Exact(1),
334                    sum_value: Precision::Absent,
335                    byte_size: partition_value
336                        .data_type()
337                        .primitive_width()
338                        .map(|w| stats.num_rows.multiply(&Precision::Exact(w)))
339                        .unwrap_or_else(|| Precision::Absent),
340                };
341                stats.column_statistics.push(col_stats);
342            }
343            self.statistics = Some(Arc::new(stats));
344        }
345        self
346    }
347
348    /// Check if this file has any statistics.
349    /// This returns `true` if the file has any Exact or Inexact statistics
350    /// and `false` if all statistics are `Precision::Absent`.
351    pub fn has_statistics(&self) -> bool {
352        if let Some(stats) = &self.statistics {
353            stats.column_statistics.iter().any(|col_stats| {
354                col_stats.null_count != Precision::Absent
355                    || col_stats.max_value != Precision::Absent
356                    || col_stats.min_value != Precision::Absent
357                    || col_stats.sum_value != Precision::Absent
358                    || col_stats.distinct_count != Precision::Absent
359            })
360        } else {
361            false
362        }
363    }
364
365    /// Set the known ordering of data in this file.
366    ///
367    /// The ordering represents the lexicographical sort order of the data,
368    /// typically inferred from file metadata (e.g., Parquet sorting_columns).
369    pub fn with_ordering(mut self, ordering: Option<LexOrdering>) -> Self {
370        self.ordering = ordering;
371        self
372    }
373}
374
375impl From<ObjectMeta> for PartitionedFile {
376    fn from(object_meta: ObjectMeta) -> Self {
377        PartitionedFile {
378            object_meta,
379            partition_values: vec![],
380            range: None,
381            statistics: None,
382            ordering: None,
383            extensions: FileExtensions::new(),
384            metadata_size_hint: None,
385            table_reference: None,
386        }
387    }
388}
389
/// Represents the possible outcomes of a range calculation.
///
/// This enum is used to encapsulate the result of calculating the range of
/// bytes to read from an object (like a file) in an object store.
///
/// Variants:
/// - `Range(Option<Range<u64>>)`:
///   Represents a range of bytes to be read. It contains an `Option` wrapping a
///   `Range<u64>`. `None` signifies that the entire object should be read,
///   while `Some(range)` specifies the exact byte range to read.
/// - `TerminateEarly`:
///   Indicates that the range calculation determined no further action is
///   necessary, possibly because the calculated range is empty or invalid.
#[derive(Debug)]
pub enum RangeCalculation {
    /// Byte range to read; `None` means the whole object.
    Range(Option<Range<u64>>),
    /// Nothing needs to be read for this range.
    TerminateEarly,
}
407
408/// Calculates an appropriate byte range for reading from an object based on the
409/// provided metadata.
410///
411/// This asynchronous function examines the [`PartitionedFile`] of an object in an object store
412/// and determines the range of bytes to be read. The range calculation may adjust
413/// the start and end points to align with meaningful data boundaries (like newlines).
414///
415/// Returns a `Result` wrapping a [`RangeCalculation`], which is either a calculated byte range or an indication to terminate early.
416///
417/// Returns an `Error` if any part of the range calculation fails, such as issues in reading from the object store or invalid range boundaries.
418pub async fn calculate_range(
419    file: &PartitionedFile,
420    store: &Arc<dyn ObjectStore>,
421    terminator: Option<u8>,
422) -> Result<RangeCalculation> {
423    let location = &file.object_meta.location;
424    let file_size = file.object_meta.size;
425    let newline = terminator.unwrap_or(b'\n');
426
427    match file.range {
428        None => Ok(RangeCalculation::Range(None)),
429        Some(FileRange { start, end }) => {
430            let start: u64 = start.try_into().map_err(|_| {
431                exec_datafusion_err!("Expect start range to fit in u64, got {start}")
432            })?;
433            let end: u64 = end.try_into().map_err(|_| {
434                exec_datafusion_err!("Expect end range to fit in u64, got {end}")
435            })?;
436
437            let start_delta = if start != 0 {
438                find_first_newline(store, location, start - 1, file_size, newline).await?
439            } else {
440                0
441            };
442
443            if start + start_delta > end {
444                return Ok(RangeCalculation::TerminateEarly);
445            }
446
447            let end_delta = if end != file_size {
448                find_first_newline(store, location, end - 1, file_size, newline).await?
449            } else {
450                0
451            };
452
453            let range = start + start_delta..end + end_delta;
454
455            if range.start >= range.end {
456                return Ok(RangeCalculation::TerminateEarly);
457            }
458
459            Ok(RangeCalculation::Range(Some(range)))
460        }
461    }
462}
463
464/// Asynchronously finds the position of the first newline character in a specified byte range
465/// within an object, such as a file, in an object store.
466///
467/// This function scans the contents of the object starting from the specified `start` position
468/// up to the `end` position, looking for the first occurrence of a newline character.
469/// It returns the position of the first newline relative to the start of the range.
470///
471/// Returns a `Result` wrapping a `usize` that represents the position of the first newline character found within the specified range. If no newline is found, it returns the length of the scanned data, effectively indicating the end of the range.
472///
473/// The function returns an `Error` if any issues arise while reading from the object store or processing the data stream.
474async fn find_first_newline(
475    object_store: &Arc<dyn ObjectStore>,
476    location: &Path,
477    start: u64,
478    end: u64,
479    newline: u8,
480) -> Result<u64> {
481    let options = GetOptions {
482        range: Some(GetRange::Bounded(start..end)),
483        ..Default::default()
484    };
485
486    let result = object_store.get_opts(location, options).await?;
487    let mut result_stream = result.into_stream();
488
489    let mut index = 0;
490
491    while let Some(chunk) = result_stream.next().await.transpose()? {
492        if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
493            let position = position as u64;
494            return Ok(index + position);
495        }
496
497        index += chunk.len() as u64;
498    }
499
500    Ok(index)
501}
502
503/// Generates test files with min-max statistics in different overlap patterns.
504///
505/// Used by tests and benchmarks.
506///
507/// # Overlap Factors
508///
509/// The `overlap_factor` parameter controls how much the value ranges in generated test files overlap:
510/// - `0.0`: No overlap between files (completely disjoint ranges)
511/// - `0.2`: Low overlap (20% of the range size overlaps with adjacent files)
512/// - `0.5`: Medium overlap (50% of ranges overlap)
513/// - `0.8`: High overlap (80% of ranges overlap between files)
514///
515/// # Examples
516///
517/// With 5 files and different overlap factors showing `[min, max]` ranges:
518///
519/// overlap_factor = 0.0 (no overlap):
520///
521/// File 0: [0, 20]
522/// File 1: [20, 40]
523/// File 2: [40, 60]
524/// File 3: [60, 80]
525/// File 4: [80, 100]
526///
527/// overlap_factor = 0.5 (50% overlap):
528///
529/// File 0: [0, 40]
530/// File 1: [20, 60]
531/// File 2: [40, 80]
532/// File 3: [60, 100]
533/// File 4: [80, 120]
534///
535/// overlap_factor = 0.8 (80% overlap):
536///
537/// File 0: [0, 100]
538/// File 1: [20, 120]
539/// File 2: [40, 140]
540/// File 3: [60, 160]
541/// File 4: [80, 180]
542pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
543    let mut files = Vec::with_capacity(num_files);
544    if num_files == 0 {
545        return vec![];
546    }
547    let range_size = if overlap_factor == 0.0 {
548        100 / num_files as i64
549    } else {
550        (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
551    };
552
553    for i in 0..num_files {
554        let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
555        let min = base as f64;
556        let max = (base + range_size) as f64;
557
558        let file = PartitionedFile {
559            object_meta: ObjectMeta {
560                location: Path::from(format!("file_{i}.parquet")),
561                last_modified: chrono::Utc::now(),
562                size: 1000,
563                e_tag: None,
564                version: None,
565            },
566            partition_values: vec![],
567            range: None,
568            statistics: Some(Arc::new(Statistics {
569                num_rows: Precision::Exact(100),
570                total_byte_size: Precision::Exact(1000),
571                column_statistics: vec![ColumnStatistics {
572                    null_count: Precision::Exact(0),
573                    max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
574                    min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
575                    sum_value: Precision::Absent,
576                    distinct_count: Precision::Absent,
577                    byte_size: Precision::Absent,
578                }],
579            })),
580            ordering: None,
581            extensions: FileExtensions::new(),
582            metadata_size_hint: None,
583            table_reference: None,
584        };
585        files.push(file);
586    }
587
588    vec![FileGroup::new(files)]
589}
590
591// Helper function to verify that files within each group maintain sort order
592/// Used by tests and benchmarks
593pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
594    for group in file_groups {
595        let files = group.iter().collect::<Vec<_>>();
596        for i in 1..files.len() {
597            let prev_file = files[i - 1];
598            let curr_file = files[i];
599
600            // Check if the min value of current file is greater than max value of previous file
601            if let (Some(prev_stats), Some(curr_stats)) =
602                (&prev_file.statistics, &curr_file.statistics)
603            {
604                let prev_max = &prev_stats.column_statistics[0].max_value;
605                let curr_min = &curr_stats.column_statistics[0].min_value;
606                if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
607                    return false;
608                }
609            }
610        }
611    }
612    true
613}
614
615#[cfg(test)]
616mod tests {
617    use super::ListingTableUrl;
618    use arrow::{
619        array::{ArrayRef, Int32Array, RecordBatch},
620        datatypes::{DataType, Field, Schema, SchemaRef},
621    };
622    use datafusion_execution::object_store::{
623        DefaultObjectStoreRegistry, ObjectStoreRegistry,
624    };
625    use object_store::{ObjectStoreExt, local::LocalFileSystem, path::Path};
626    use std::{collections::HashMap, ops::Not, sync::Arc};
627    use url::Url;
628
629    /// Return a RecordBatch with a single Int32 array with values (0..sz) in a field named "i"
630    pub fn make_partition(sz: i32) -> RecordBatch {
631        let seq_start = 0;
632        let seq_end = sz;
633        let values = (seq_start..seq_end).collect::<Vec<_>>();
634        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
635        let arr = Arc::new(Int32Array::from(values));
636
637        RecordBatch::try_new(schema, vec![arr as ArrayRef]).unwrap()
638    }
639
640    /// Get the schema for the aggregate_test_* csv files
641    pub fn aggr_test_schema() -> SchemaRef {
642        let mut f1 = Field::new("c1", DataType::Utf8, false);
643        f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
644        let schema = Schema::new(vec![
645            f1,
646            Field::new("c2", DataType::UInt32, false),
647            Field::new("c3", DataType::Int8, false),
648            Field::new("c4", DataType::Int16, false),
649            Field::new("c5", DataType::Int32, false),
650            Field::new("c6", DataType::Int64, false),
651            Field::new("c7", DataType::UInt8, false),
652            Field::new("c8", DataType::UInt16, false),
653            Field::new("c9", DataType::UInt32, false),
654            Field::new("c10", DataType::UInt64, false),
655            Field::new("c11", DataType::Float32, false),
656            Field::new("c12", DataType::Float64, false),
657            Field::new("c13", DataType::Utf8, false),
658        ]);
659
660        Arc::new(schema)
661    }
662
663    #[test]
664    fn test_object_store_listing_url() {
665        let listing = ListingTableUrl::parse("file:///").unwrap();
666        let store = listing.object_store();
667        assert_eq!(store.as_str(), "file:///");
668
669        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
670        let store = listing.object_store();
671        assert_eq!(store.as_str(), "s3://bucket/");
672    }
673
674    #[test]
675    fn test_get_store_hdfs() {
676        let sut = DefaultObjectStoreRegistry::default();
677        let url = Url::parse("hdfs://localhost:8020").unwrap();
678        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
679        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
680        sut.get_store(url.as_ref()).unwrap();
681    }
682
683    #[test]
684    fn test_get_store_s3() {
685        let sut = DefaultObjectStoreRegistry::default();
686        let url = Url::parse("s3://bucket/key").unwrap();
687        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
688        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
689        sut.get_store(url.as_ref()).unwrap();
690    }
691
692    #[test]
693    fn test_get_store_file() {
694        let sut = DefaultObjectStoreRegistry::default();
695        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
696        sut.get_store(url.as_ref()).unwrap();
697    }
698
699    #[test]
700    fn test_get_store_local() {
701        let sut = DefaultObjectStoreRegistry::default();
702        let url = ListingTableUrl::parse("../").unwrap();
703        sut.get_store(url.as_ref()).unwrap();
704    }
705
706    #[test]
707    fn test_with_statistics_appends_partition_column_stats() {
708        use crate::PartitionedFile;
709        use datafusion_common::stats::Precision;
710        use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
711
712        // Create a PartitionedFile with partition values
713        let mut pf = PartitionedFile::new(
714            "test.parquet",
715            100, // file size
716        );
717        pf.partition_values = vec![
718            ScalarValue::Date32(Some(20148)), // 2025-03-01
719        ];
720
721        // Create file-only statistics (1 column for 'id')
722        let file_stats = Arc::new(Statistics {
723            num_rows: Precision::Exact(2),
724            total_byte_size: Precision::Exact(16),
725            column_statistics: vec![ColumnStatistics {
726                null_count: Precision::Exact(0),
727                max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
728                min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
729                sum_value: Precision::Absent,
730                distinct_count: Precision::Absent,
731                byte_size: Precision::Absent,
732            }],
733        });
734
735        // Call with_statistics - should append partition column stats
736        let pf = pf.with_statistics(file_stats);
737
738        // Verify the statistics now have 2 columns
739        let stats = pf.statistics.unwrap();
740        assert_eq!(
741            stats.column_statistics.len(),
742            2,
743            "Expected 2 columns (id + date partition)"
744        );
745
746        // Verify partition column statistics
747        let partition_col_stats = &stats.column_statistics[1];
748        assert_eq!(
749            partition_col_stats.null_count,
750            Precision::Exact(0),
751            "Partition column null_count should be Exact(0)"
752        );
753        assert_eq!(
754            partition_col_stats.min_value,
755            Precision::Exact(ScalarValue::Date32(Some(20148))),
756            "Partition column min should match partition value"
757        );
758        assert_eq!(
759            partition_col_stats.max_value,
760            Precision::Exact(ScalarValue::Date32(Some(20148))),
761            "Partition column max should match partition value"
762        );
763        assert_eq!(
764            partition_col_stats.distinct_count,
765            Precision::Exact(1),
766            "Partition column distinct_count should be Exact(1)"
767        );
768    }
769
770    #[test]
771    fn test_url_contains() {
772        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();
773
774        // standard case with default config
775        assert!(url.contains(
776            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
777            true
778        ));
779
780        // standard case with `ignore_subdirectory` set to false
781        assert!(url.contains(
782            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
783            false
784        ));
785
786        // as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't
787        // a direct child of the `url`
788        assert!(
789            url.contains(
790                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
791                true
792            )
793            .not()
794        );
795
796        // when we set `ignore_subdirectory` to false, we should not ignore the file
797        assert!(url.contains(
798            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
799            false
800        ));
801
802        // as above, `ignore_subdirectory` is false, so we include the file
803        assert!(url.contains(
804            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
805            false
806        ));
807
808        // in this case, we include the file even when `ignore_subdirectory` is true because the
809        // path segment is a hive partition which doesn't count as a subdirectory for the purposes
810        // of `Url::contains`
811        assert!(url.contains(
812            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
813            true
814        ));
815
816        // testing an empty path with default config
817        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));
818
819        // testing an empty path with `ignore_subdirectory` set to false
820        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
821    }
822
823    /// Regression test for <https://github.com/apache/datafusion/issues/19605>
824    #[tokio::test]
825    async fn test_calculate_range_single_line_file() {
826        use super::{PartitionedFile, RangeCalculation, calculate_range};
827        use object_store::ObjectStore;
828        use object_store::memory::InMemory;
829
830        let content = r#"{"id":1,"data":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}"#;
831        let file_size = content.len() as u64;
832
833        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
834        let path = Path::from("test.json");
835        store.put(&path, content.into()).await.unwrap();
836
837        let mid = file_size / 2;
838        let partitioned_file = PartitionedFile::new_with_range(
839            path.to_string(),
840            file_size,
841            mid as i64,
842            file_size as i64,
843        );
844
845        let result = calculate_range(&partitioned_file, &store, None).await;
846
847        assert!(matches!(result, Ok(RangeCalculation::TerminateEarly)));
848    }
849}