datafusion_datasource/
mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
    html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
// Make sure fast / cheap clones on Arc are explicit:
// https://github.com/apache/datafusion/issues/11143
#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]

//! A table that uses the `ObjectStore` listing capability
//! to get the list of files to process.

pub mod decoder;
pub mod display;
pub mod file;
pub mod file_compression_type;
pub mod file_format;
pub mod file_groups;
pub mod file_meta;
pub mod file_scan_config;
pub mod file_sink_config;
pub mod file_stream;
pub mod memory;
pub mod schema_adapter;
pub mod sink;
pub mod source;
mod statistics;

#[cfg(test)]
pub mod test_util;

pub mod url;
pub mod write;
pub use self::url::ListingTableUrl;
use crate::file_groups::FileGroup;
use chrono::TimeZone;
use datafusion_common::stats::Precision;
use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result};
use datafusion_common::{ScalarValue, Statistics};
use file_meta::FileMeta;
use futures::{Stream, StreamExt};
use object_store::{path::Path, ObjectMeta};
use object_store::{GetOptions, GetRange, ObjectStore};
// Remove when add_row_stats is removed
#[allow(deprecated)]
pub use statistics::add_row_stats;
pub use statistics::compute_all_files_statistics;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

/// Stream of files listed from an object store
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;

/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping
/// sections of a Parquet file in parallel.
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// Range start
    pub start: i64,
    /// Range end
    pub end: i64,
}

impl FileRange {
    /// Returns true if this file range contains the specified offset
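    ///
    /// The interval is half open: `start` is included, `end` is not.
    /// A minimal sketch:
    ///
    /// ```
    /// # use datafusion_datasource::FileRange;
    /// let range = FileRange { start: 0, end: 100 };
    /// assert!(range.contains(0)); // start offset is included
    /// assert!(range.contains(99));
    /// assert!(!range.contains(100)); // end offset is excluded
    /// ```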
    pub fn contains(&self, offset: i64) -> bool {
        offset >= self.start && offset < self.end
    }
}

#[derive(Debug, Clone)]
/// A single file or part of a file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
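///
/// # Example
///
/// A minimal sketch, using an illustrative path and size: describe a 1024 byte
/// file of which only the first half should be scanned.
///
/// ```
/// # use datafusion_datasource::PartitionedFile;
/// let file = PartitionedFile::new("my_table/data.parquet", 1024).with_range(0, 512);
/// assert_eq!(file.path().to_string(), "my_table/data.parquet");
/// assert_eq!(file.range.as_ref().unwrap().end, 512);
/// ```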
pub struct PartitionedFile {
    /// Path for the file (e.g. URL, filesystem path, etc)
    pub object_meta: ObjectMeta,
    /// Values of partition columns to be appended to each row.
    ///
    /// These MUST have the same count, order, and type as the [`table_partition_cols`].
    ///
    /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type.
    ///
    /// [`wrap_partition_type_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L55
    /// [`wrap_partition_value_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L62
    /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L190
    pub partition_values: Vec<ScalarValue>,
    /// An optional file range for a more fine-grained parallel execution
    pub range: Option<FileRange>,
    /// Optional statistics that describe the data in this file if known.
    ///
    /// DataFusion relies on these statistics for planning (in particular to sort file groups),
    /// so if they are incorrect, incorrect answers may result.
    pub statistics: Option<Arc<Statistics>>,
    /// An optional field for user defined per object metadata
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// The estimated size of the parquet metadata, in bytes
    pub metadata_size_hint: Option<usize>,
}

impl PartitionedFile {
    /// Create a simple file without metadata or partition
    pub fn new(path: impl Into<String>, size: u64) -> Self {
        Self {
            object_meta: ObjectMeta {
                location: Path::from(path.into()),
                last_modified: chrono::Utc.timestamp_nanos(0),
                size,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }

    /// Create a file range without metadata or partition
    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
        Self {
            object_meta: ObjectMeta {
                location: Path::from(path),
                last_modified: chrono::Utc.timestamp_nanos(0),
                size,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: Some(FileRange { start, end }),
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
        .with_range(start, end)
    }

    /// Provide a hint of the size of the file metadata. If a hint is provided,
    /// the reader will try to fetch the last `metadata_size_hint` bytes of the Parquet file optimistically.
    /// Without an appropriate hint, two reads may be required to fetch the metadata.
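    ///
    /// For example (the 64 KiB figure here is illustrative, not a recommendation):
    ///
    /// ```
    /// # use datafusion_datasource::PartitionedFile;
    /// let file = PartitionedFile::new("data.parquet", 100).with_metadata_size_hint(64 * 1024);
    /// assert_eq!(file.metadata_size_hint, Some(65536));
    /// ```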
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
        self.metadata_size_hint = Some(metadata_size_hint);
        self
    }

    /// Return a file reference from the given path
    pub fn from_path(path: String) -> Result<Self> {
        let size = std::fs::metadata(path.clone())?.len();
        Ok(Self::new(path, size))
    }

    /// Return the path of this partitioned file
    pub fn path(&self) -> &Path {
        &self.object_meta.location
    }

    /// Update the file to only scan the specified range (in bytes)
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        self.range = Some(FileRange { start, end });
        self
    }

    /// Update the user defined extensions for this file.
    ///
    /// This can be used to pass reader-specific information.
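    ///
    /// A minimal sketch: stash a custom marker and recover it later by
    /// downcasting (the `ScanHint` type here is hypothetical, not part of
    /// DataFusion):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion_datasource::PartitionedFile;
    /// struct ScanHint { prefetch: bool } // hypothetical per-file marker
    ///
    /// let file = PartitionedFile::new("data.parquet", 100)
    ///     .with_extensions(Arc::new(ScanHint { prefetch: true }));
    /// let hint = file
    ///     .extensions
    ///     .as_ref()
    ///     .and_then(|e| e.downcast_ref::<ScanHint>())
    ///     .unwrap();
    /// assert!(hint.prefetch);
    /// ```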
    pub fn with_extensions(
        mut self,
        extensions: Arc<dyn std::any::Any + Send + Sync>,
    ) -> Self {
        self.extensions = Some(extensions);
        self
    }

    /// Update the statistics for this file.
    pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
        self.statistics = Some(statistics);
        self
    }
}

impl From<ObjectMeta> for PartitionedFile {
    fn from(object_meta: ObjectMeta) -> Self {
        PartitionedFile {
            object_meta,
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }
}

/// Represents the possible outcomes of a range calculation.
///
/// This enum is used to encapsulate the result of calculating the range of
/// bytes to read from an object (like a file) in an object store.
///
/// Variants:
/// - `Range(Option<Range<u64>>)`:
///   Represents a range of bytes to be read. It contains an `Option` wrapping a
///   `Range<u64>`. `None` signifies that the entire object should be read,
///   while `Some(range)` specifies the exact byte range to read.
/// - `TerminateEarly`:
///   Indicates that the range calculation determined no further action is
///   necessary, possibly because the calculated range is empty or invalid.
pub enum RangeCalculation {
    Range(Option<Range<u64>>),
    TerminateEarly,
}

/// Calculates an appropriate byte range for reading from an object based on the
/// provided metadata.
///
/// This asynchronous function examines the `FileMeta` of an object in an object store
/// and determines the range of bytes to be read. The range calculation may adjust
/// the start and end points to align with meaningful data boundaries (like newlines).
///
/// Returns a `Result` wrapping a `RangeCalculation`, which is either a calculated byte range or an indication to terminate early.
///
/// Returns an `Error` if any part of the range calculation fails, such as issues in reading from the object store or invalid range boundaries.
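///
/// # Example
///
/// A minimal sketch against an in-memory store, with an illustrative 3-line
/// file; the `futures` executor here stands in for whatever async runtime the
/// caller uses:
///
/// ```
/// # use std::sync::Arc;
/// # use datafusion_datasource::{calculate_range, FileRange, RangeCalculation};
/// # use datafusion_datasource::file_meta::FileMeta;
/// # use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload};
/// # futures::executor::block_on(async {
/// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
/// let path = Path::from("data.csv");
/// let data = "aaa\nbbb\nccc\n";
/// store.put(&path, PutPayload::from_static(data.as_bytes())).await.unwrap();
///
/// // Ask for bytes [2, 6): both edges move forward to the next newline, so
/// // the scan covers exactly the second line ("bbb\n" = bytes [4, 8)).
/// let meta = FileMeta {
///     object_meta: store.head(&path).await.unwrap(),
///     range: Some(FileRange { start: 2, end: 6 }),
///     extensions: None,
///     metadata_size_hint: None,
/// };
/// match calculate_range(&meta, &store, None).await.unwrap() {
///     RangeCalculation::Range(Some(range)) => assert_eq!(range, 4..8),
///     _ => unreachable!(),
/// }
/// # });
/// ```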
pub async fn calculate_range(
    file_meta: &FileMeta,
    store: &Arc<dyn ObjectStore>,
    terminator: Option<u8>,
) -> Result<RangeCalculation> {
    let location = file_meta.location();
    let file_size = file_meta.object_meta.size;
    let newline = terminator.unwrap_or(b'\n');

    match file_meta.range {
        None => Ok(RangeCalculation::Range(None)),
        Some(FileRange { start, end }) => {
            let start: u64 = start.try_into().map_err(|_| {
                exec_datafusion_err!("Expect start range to fit in u64, got {start}")
            })?;
            let end: u64 = end.try_into().map_err(|_| {
                exec_datafusion_err!("Expect end range to fit in u64, got {end}")
            })?;

            let start_delta = if start != 0 {
                find_first_newline(store, location, start - 1, file_size, newline).await?
            } else {
                0
            };

            let end_delta = if end != file_size {
                find_first_newline(store, location, end - 1, file_size, newline).await?
            } else {
                0
            };

            let range = start + start_delta..end + end_delta;

            if range.start == range.end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            Ok(RangeCalculation::Range(Some(range)))
        }
    }
}

/// Asynchronously finds the position of the first newline character in a specified byte range
/// within an object, such as a file, in an object store.
///
/// This function scans the contents of the object starting from the specified `start` position
/// up to the `end` position, looking for the first occurrence of a newline character.
/// It returns the position of the first newline relative to the start of the range.
///
/// Returns a `Result` wrapping a `u64` that represents the position of the first newline character found within the specified range. If no newline is found, it returns the length of the scanned data, effectively indicating the end of the range.
///
/// The function returns an `Error` if any issues arise while reading from the object store or processing the data stream.
async fn find_first_newline(
    object_store: &Arc<dyn ObjectStore>,
    location: &Path,
    start: u64,
    end: u64,
    newline: u8,
) -> Result<u64> {
    let options = GetOptions {
        range: Some(GetRange::Bounded(start..end)),
        ..Default::default()
    };

    let result = object_store.get_opts(location, options).await?;
    let mut result_stream = result.into_stream();

    let mut index = 0;

    while let Some(chunk) = result_stream.next().await.transpose()? {
        if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
            let position = position as u64;
            return Ok(index + position);
        }

        index += chunk.len() as u64;
    }

    Ok(index)
}

/// Generates test files with min-max statistics in different overlap patterns.
///
/// Used by tests and benchmarks.
///
/// # Overlap Factors
///
/// The `overlap_factor` parameter controls how much the value ranges in generated test files overlap:
/// - `0.0`: No overlap between files (completely disjoint ranges)
/// - `0.2`: Low overlap (20% of the range size overlaps with adjacent files)
/// - `0.5`: Medium overlap (50% of ranges overlap)
/// - `0.8`: High overlap (80% of ranges overlap between files)
///
/// # Examples
///
/// With 5 files and different overlap factors showing `[min, max]` ranges:
///
/// overlap_factor = 0.0 (no overlap):
///
/// File 0: [0, 20]
/// File 1: [20, 40]
/// File 2: [40, 60]
/// File 3: [60, 80]
/// File 4: [80, 100]
///
/// overlap_factor = 0.5 (50% overlap):
///
/// File 0: [0, 40]
/// File 1: [20, 60]
/// File 2: [40, 80]
/// File 3: [60, 100]
/// File 4: [80, 120]
///
/// overlap_factor = 0.8 (80% overlap):
///
/// File 0: [0, 25]
/// File 1: [5, 30]
/// File 2: [10, 35]
/// File 3: [15, 40]
/// File 4: [20, 45]
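///
/// A minimal sketch of calling this helper (all generated files land in a
/// single [`FileGroup`]):
///
/// ```
/// # use datafusion_datasource::generate_test_files;
/// let groups = generate_test_files(5, 0.5);
/// assert_eq!(groups.len(), 1);
/// assert_eq!(groups[0].len(), 5);
/// ```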
pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
    let mut files = Vec::with_capacity(num_files);
    if num_files == 0 {
        return vec![];
    }
    let range_size = if overlap_factor == 0.0 {
        100 / num_files as i64
    } else {
        (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
    };

    for i in 0..num_files {
        let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
        let min = base as f64;
        let max = (base + range_size) as f64;

        let file = PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::from(format!("file_{}.parquet", i)),
                last_modified: chrono::Utc::now(),
                size: 1000,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: Some(Arc::new(Statistics {
                num_rows: Precision::Exact(100),
                total_byte_size: Precision::Exact(1000),
                column_statistics: vec![ColumnStatistics {
                    null_count: Precision::Exact(0),
                    max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
                    min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
                    sum_value: Precision::Absent,
                    distinct_count: Precision::Absent,
                }],
            })),
            extensions: None,
            metadata_size_hint: None,
        };
        files.push(file);
    }

    vec![FileGroup::new(files)]
}

/// Helper function to verify that files within each group maintain sort order.
///
/// Used by tests and benchmarks.
pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
    for group in file_groups {
        let files = group.iter().collect::<Vec<_>>();
        for i in 1..files.len() {
            let prev_file = files[i - 1];
            let curr_file = files[i];

            // Check if the min value of current file is greater than max value of previous file
            if let (Some(prev_stats), Some(curr_stats)) =
                (&prev_file.statistics, &curr_file.statistics)
            {
                let prev_max = &prev_stats.column_statistics[0].max_value;
                let curr_min = &curr_stats.column_statistics[0].min_value;
                if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
                    return false;
                }
            }
        }
    }
    true
}

#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, ops::Not, sync::Arc};
    use url::Url;

    /// Return a RecordBatch with a single Int32 array with values (0..sz) in a field named "i"
    pub fn make_partition(sz: i32) -> RecordBatch {
        let seq_start = 0;
        let seq_end = sz;
        let values = (seq_start..seq_end).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let arr = Arc::new(Int32Array::from(values));

        RecordBatch::try_new(schema, vec![arr as ArrayRef]).unwrap()
    }

    /// Get the schema for the aggregate_test_* csv files
    pub fn aggr_test_schema() -> SchemaRef {
        let mut f1 = Field::new("c1", DataType::Utf8, false);
        f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
        let schema = Schema::new(vec![
            f1,
            Field::new("c2", DataType::UInt32, false),
            Field::new("c3", DataType::Int8, false),
            Field::new("c4", DataType::Int16, false),
            Field::new("c5", DataType::Int32, false),
            Field::new("c6", DataType::Int64, false),
            Field::new("c7", DataType::UInt8, false),
            Field::new("c8", DataType::UInt16, false),
            Field::new("c9", DataType::UInt32, false),
            Field::new("c10", DataType::UInt64, false),
            Field::new("c11", DataType::Float32, false),
            Field::new("c12", DataType::Float64, false),
            Field::new("c13", DataType::Utf8, false),
        ]);

        Arc::new(schema)
    }

    #[test]
    fn test_object_store_listing_url() {
        let listing = ListingTableUrl::parse("file:///").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "file:///");

        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "s3://bucket/");
    }

    #[test]
    fn test_get_store_hdfs() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("hdfs://localhost:8020").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_s3() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("s3://bucket/key").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_file() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_local() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("../").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();

        // standard case with default config
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            true
        ));

        // standard case with `ignore_subdirectory` set to false
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            false
        ));

        // as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't
        // a direct child of the `url`
        assert!(url
            .contains(
                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
                true
            )
            .not());

        // when we set `ignore_subdirectory` to false, we should not ignore the file
        assert!(url.contains(
            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
            false
        ));

        // as above, `ignore_subdirectory` is false, so we include the file
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            false
        ));

        // in this case, we include the file even when `ignore_subdirectory` is true because the
        // path segment is a hive partition which doesn't count as a subdirectory for the purposes
        // of `Url::contains`
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            true
        ));

        // testing an empty path with default config
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));

        // testing an empty path with `ignore_subdirectory` set to false
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
    }
}