datafusion_datasource/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
    html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
// Make sure fast / cheap clones on Arc are explicit:
// https://github.com/apache/datafusion/issues/11143
#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]

//! A table that uses the `ObjectStore` listing capability
//! to get the list of files to process.

pub mod decoder;
pub mod display;
pub mod file;
pub mod file_compression_type;
pub mod file_format;
pub mod file_groups;
pub mod file_meta;
pub mod file_scan_config;
pub mod file_sink_config;
pub mod file_stream;
pub mod memory;
pub mod schema_adapter;
pub mod sink;
pub mod source;
mod statistics;

#[cfg(test)]
pub mod test_util;

pub mod url;
pub mod write;
pub use self::file::as_file_source;
pub use self::url::ListingTableUrl;
use crate::file_groups::FileGroup;
use chrono::TimeZone;
use datafusion_common::stats::Precision;
use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result};
use datafusion_common::{ScalarValue, Statistics};
use file_meta::FileMeta;
use futures::{Stream, StreamExt};
use object_store::{path::Path, ObjectMeta};
use object_store::{GetOptions, GetRange, ObjectStore};
// Remove when add_row_stats is removed
#[allow(deprecated)]
pub use statistics::add_row_stats;
pub use statistics::compute_all_files_statistics;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

/// Stream of files listed from an object store
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;

/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
/// lies within the `[start, end)` byte offsets. This option can be used to scan non-overlapping
/// sections of a Parquet file in parallel.
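///
/// # Example
///
/// A minimal sketch of splitting a 100 byte file into two non-overlapping
/// halves (the offsets are arbitrary, illustrative values):
///
/// ```
/// # use datafusion_datasource::FileRange;
/// let first = FileRange { start: 0, end: 50 };
/// let second = FileRange { start: 50, end: 100 };
/// assert!(first.contains(10));
/// assert!(!first.contains(50)); // the end offset is exclusive
/// assert!(second.contains(50));
/// ```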
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// Range start
    pub start: i64,
    /// Range end
    pub end: i64,
}

impl FileRange {
    /// Returns true if this file range contains the specified offset
    pub fn contains(&self, offset: i64) -> bool {
        offset >= self.start && offset < self.end
    }
}

#[derive(Debug, Clone)]
/// A single file or part of a file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
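///
/// # Example
///
/// A minimal sketch of building a `PartitionedFile` and restricting the scan
/// to a byte range (the path and sizes below are made-up, illustrative values):
///
/// ```
/// # use datafusion_datasource::PartitionedFile;
/// // hypothetical object path and file size
/// let file = PartitionedFile::new("data/part-0.parquet", 4096).with_range(0, 2048);
/// assert!(file.range.is_some());
/// assert!(file.statistics.is_none());
/// ```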
pub struct PartitionedFile {
    /// Path for the file (e.g. URL, filesystem path, etc)
    pub object_meta: ObjectMeta,
    /// Values of partition columns to be appended to each row.
    ///
    /// These MUST have the same count, order, and type as the [`table_partition_cols`].
    ///
    /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type.
    ///
    /// [`wrap_partition_type_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L55
    /// [`wrap_partition_value_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L62
    /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L190
    pub partition_values: Vec<ScalarValue>,
    /// An optional file range for a more fine-grained parallel execution
    pub range: Option<FileRange>,
    /// Optional statistics that describe the data in this file if known.
    ///
    /// DataFusion relies on these statistics for planning (in particular to sort file groups),
    /// so if they are incorrect, incorrect answers may result.
    pub statistics: Option<Arc<Statistics>>,
    /// An optional field for user defined per object metadata
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// The estimated size of the parquet metadata, in bytes
    pub metadata_size_hint: Option<usize>,
}

impl PartitionedFile {
    /// Create a simple file without metadata or partition
    pub fn new(path: impl Into<String>, size: u64) -> Self {
        Self {
            object_meta: ObjectMeta {
                location: Path::from(path.into()),
                last_modified: chrono::Utc.timestamp_nanos(0),
                size,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }

    /// Create a file range without metadata or partition
    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
        Self {
            object_meta: ObjectMeta {
                location: Path::from(path),
                last_modified: chrono::Utc.timestamp_nanos(0),
                size,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: Some(FileRange { start, end }),
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
        .with_range(start, end)
    }

    /// Provide a hint to the size of the file metadata. If a hint is provided
    /// the reader will try to fetch the last `size_hint` bytes of the parquet file optimistically.
    /// Without an appropriate hint, two reads may be required to fetch the metadata.
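    ///
    /// A usage sketch (the 512 KiB hint below is an arbitrary, illustrative value):
    ///
    /// ```
    /// # use datafusion_datasource::PartitionedFile;
    /// // hypothetical 10 MiB parquet file
    /// let file = PartitionedFile::new("data.parquet", 10 * 1024 * 1024)
    ///     .with_metadata_size_hint(512 * 1024);
    /// assert_eq!(file.metadata_size_hint, Some(512 * 1024));
    /// ```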
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
        self.metadata_size_hint = Some(metadata_size_hint);
        self
    }

    /// Return a file reference from the given path
    pub fn from_path(path: String) -> Result<Self> {
        let size = std::fs::metadata(path.clone())?.len();
        Ok(Self::new(path, size))
    }

    /// Return the path of this partitioned file
    pub fn path(&self) -> &Path {
        &self.object_meta.location
    }

    /// Update the file to only scan the specified range (in bytes)
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        self.range = Some(FileRange { start, end });
        self
    }

    /// Update the user defined extensions for this file.
    ///
    /// This can be used to pass reader specific information.
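    ///
    /// A minimal sketch using a plain `String` as the extension payload; any
    /// `Any + Send + Sync` value works, and the string here is purely illustrative:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion_datasource::PartitionedFile;
    /// let file = PartitionedFile::new("data.parquet", 1024)
    ///     .with_extensions(Arc::new("reader specific hint".to_string()));
    /// assert!(file.extensions.is_some());
    /// ```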
    pub fn with_extensions(
        mut self,
        extensions: Arc<dyn std::any::Any + Send + Sync>,
    ) -> Self {
        self.extensions = Some(extensions);
        self
    }

    /// Update the statistics for this file.
    pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
        self.statistics = Some(statistics);
        self
    }
}

impl From<ObjectMeta> for PartitionedFile {
    fn from(object_meta: ObjectMeta) -> Self {
        PartitionedFile {
            object_meta,
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }
}

/// Represents the possible outcomes of a range calculation.
///
/// This enum is used to encapsulate the result of calculating the range of
/// bytes to read from an object (like a file) in an object store.
///
/// Variants:
/// - `Range(Option<Range<u64>>)`:
///   Represents a range of bytes to be read. It contains an `Option` wrapping a
///   `Range<u64>`. `None` signifies that the entire object should be read,
///   while `Some(range)` specifies the exact byte range to read.
/// - `TerminateEarly`:
///   Indicates that the range calculation determined no further action is
///   necessary, possibly because the calculated range is empty or invalid.
pub enum RangeCalculation {
    Range(Option<Range<u64>>),
    TerminateEarly,
}

/// Calculates an appropriate byte range for reading from an object based on the
/// provided metadata.
///
/// This asynchronous function examines the `FileMeta` of an object in an object store
/// and determines the range of bytes to be read. The range calculation may adjust
/// the start and end points to align with meaningful data boundaries (like newlines).
///
/// Returns a `Result` wrapping a `RangeCalculation`, which is either a calculated byte range or an indication to terminate early.
///
/// Returns an `Error` if any part of the range calculation fails, such as issues in reading from the object store or invalid range boundaries.
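///
/// A sketch of consuming the result (it assumes `store` is an `Arc<dyn ObjectStore>`
/// and `file_meta` is a `FileMeta` you already have; illustrative only, not compiled):
///
/// ```ignore
/// match calculate_range(&file_meta, &store, None).await? {
///     RangeCalculation::Range(None) => { /* read the whole object */ }
///     RangeCalculation::Range(Some(range)) => { /* read only `range` bytes */ }
///     RangeCalculation::TerminateEarly => { /* nothing to read for this split */ }
/// }
/// ```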
pub async fn calculate_range(
    file_meta: &FileMeta,
    store: &Arc<dyn ObjectStore>,
    terminator: Option<u8>,
) -> Result<RangeCalculation> {
    let location = file_meta.location();
    let file_size = file_meta.object_meta.size;
    let newline = terminator.unwrap_or(b'\n');

    match file_meta.range {
        None => Ok(RangeCalculation::Range(None)),
        Some(FileRange { start, end }) => {
            let start: u64 = start.try_into().map_err(|_| {
                exec_datafusion_err!("Expect start range to fit in u64, got {start}")
            })?;
            let end: u64 = end.try_into().map_err(|_| {
                exec_datafusion_err!("Expect end range to fit in u64, got {end}")
            })?;

            let start_delta = if start != 0 {
                find_first_newline(store, location, start - 1, file_size, newline).await?
            } else {
                0
            };

            let end_delta = if end != file_size {
                find_first_newline(store, location, end - 1, file_size, newline).await?
            } else {
                0
            };

            let range = start + start_delta..end + end_delta;

            if range.start == range.end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            Ok(RangeCalculation::Range(Some(range)))
        }
    }
}

/// Asynchronously finds the position of the first newline character in a specified byte range
/// within an object, such as a file, in an object store.
///
/// This function scans the contents of the object starting from the specified `start` position
/// up to the `end` position, looking for the first occurrence of a newline character.
/// It returns the position of the first newline relative to the start of the range.
///
/// Returns a `Result` wrapping a `u64` that represents the position of the first newline character found within the specified range. If no newline is found, it returns the length of the scanned data, effectively indicating the end of the range.
///
/// The function returns an `Error` if any issues arise while reading from the object store or processing the data stream.
async fn find_first_newline(
    object_store: &Arc<dyn ObjectStore>,
    location: &Path,
    start: u64,
    end: u64,
    newline: u8,
) -> Result<u64> {
    let options = GetOptions {
        range: Some(GetRange::Bounded(start..end)),
        ..Default::default()
    };

    let result = object_store.get_opts(location, options).await?;
    let mut result_stream = result.into_stream();

    let mut index = 0;

    while let Some(chunk) = result_stream.next().await.transpose()? {
        if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
            let position = position as u64;
            return Ok(index + position);
        }

        index += chunk.len() as u64;
    }

    Ok(index)
}

/// Generates test files with min-max statistics in different overlap patterns.
///
/// Used by tests and benchmarks.
///
/// # Overlap Factors
///
/// The `overlap_factor` parameter controls how much the value ranges in generated test files overlap:
/// - `0.0`: No overlap between files (completely disjoint ranges)
/// - `0.2`: Low overlap (20% of the range size overlaps with adjacent files)
/// - `0.5`: Medium overlap (50% of ranges overlap)
/// - `0.8`: High overlap (80% of ranges overlap between files)
///
/// # Examples
///
/// With 5 files and different overlap factors showing `[min, max]` ranges:
///
/// overlap_factor = 0.0 (no overlap):
///
/// File 0: [0, 20]
/// File 1: [20, 40]
/// File 2: [40, 60]
/// File 3: [60, 80]
/// File 4: [80, 100]
///
/// overlap_factor = 0.5 (50% overlap):
///
/// File 0: [0, 40]
/// File 1: [20, 60]
/// File 2: [40, 80]
/// File 3: [60, 100]
/// File 4: [80, 120]
///
/// overlap_factor = 0.8 (80% overlap):
///
/// File 0: [0, 100]
/// File 1: [20, 120]
/// File 2: [40, 140]
/// File 3: [60, 160]
/// File 4: [80, 180]
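///
/// A usage sketch (the file count and overlap factor below are arbitrary,
/// illustrative values):
///
/// ```
/// # use datafusion_datasource::generate_test_files;
/// let groups = generate_test_files(5, 0.5);
/// // all generated files are returned in a single group, each carrying
/// // synthetic min/max column statistics
/// assert_eq!(groups.len(), 1);
/// ```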
pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
    let mut files = Vec::with_capacity(num_files);
    if num_files == 0 {
        return vec![];
    }
    let range_size = if overlap_factor == 0.0 {
        100 / num_files as i64
    } else {
        (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
    };

    for i in 0..num_files {
        let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
        let min = base as f64;
        let max = (base + range_size) as f64;

        let file = PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::from(format!("file_{i}.parquet")),
                last_modified: chrono::Utc::now(),
                size: 1000,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: Some(Arc::new(Statistics {
                num_rows: Precision::Exact(100),
                total_byte_size: Precision::Exact(1000),
                column_statistics: vec![ColumnStatistics {
                    null_count: Precision::Exact(0),
                    max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
                    min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
                    sum_value: Precision::Absent,
                    distinct_count: Precision::Absent,
                }],
            })),
            extensions: None,
            metadata_size_hint: None,
        };
        files.push(file);
    }

    vec![FileGroup::new(files)]
}

/// Helper function to verify that files within each group maintain sort order.
///
/// Used by tests and benchmarks.
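///
/// A usage sketch paired with [`generate_test_files`] (the values are illustrative):
///
/// ```
/// # use datafusion_datasource::{generate_test_files, verify_sort_integrity};
/// // With a 0.5 overlap factor adjacent files share value ranges, so the
/// // strict "current min > previous max" check fails.
/// let groups = generate_test_files(3, 0.5);
/// assert!(!verify_sort_integrity(&groups));
/// ```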
pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
    for group in file_groups {
        let files = group.iter().collect::<Vec<_>>();
        for i in 1..files.len() {
            let prev_file = files[i - 1];
            let curr_file = files[i];

            // Check if the min value of current file is greater than max value of previous file
            if let (Some(prev_stats), Some(curr_stats)) =
                (&prev_file.statistics, &curr_file.statistics)
            {
                let prev_max = &prev_stats.column_statistics[0].max_value;
                let curr_min = &curr_stats.column_statistics[0].min_value;
                if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
                    return false;
                }
            }
        }
    }
    true
}

#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, ops::Not, sync::Arc};
    use url::Url;

    /// Return a RecordBatch with a single Int32 array with values (0..sz) in a field named "i"
    pub fn make_partition(sz: i32) -> RecordBatch {
        let seq_start = 0;
        let seq_end = sz;
        let values = (seq_start..seq_end).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let arr = Arc::new(Int32Array::from(values));

        RecordBatch::try_new(schema, vec![arr as ArrayRef]).unwrap()
    }

    /// Get the schema for the aggregate_test_* csv files
    pub fn aggr_test_schema() -> SchemaRef {
        let mut f1 = Field::new("c1", DataType::Utf8, false);
        f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
        let schema = Schema::new(vec![
            f1,
            Field::new("c2", DataType::UInt32, false),
            Field::new("c3", DataType::Int8, false),
            Field::new("c4", DataType::Int16, false),
            Field::new("c5", DataType::Int32, false),
            Field::new("c6", DataType::Int64, false),
            Field::new("c7", DataType::UInt8, false),
            Field::new("c8", DataType::UInt16, false),
            Field::new("c9", DataType::UInt32, false),
            Field::new("c10", DataType::UInt64, false),
            Field::new("c11", DataType::Float32, false),
            Field::new("c12", DataType::Float64, false),
            Field::new("c13", DataType::Utf8, false),
        ]);

        Arc::new(schema)
    }

    #[test]
    fn test_object_store_listing_url() {
        let listing = ListingTableUrl::parse("file:///").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "file:///");

        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "s3://bucket/");
    }

    #[test]
    fn test_get_store_hdfs() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("hdfs://localhost:8020").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_s3() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("s3://bucket/key").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_file() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_local() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("../").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();

        // standard case with default config
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            true
        ));

        // standard case with `ignore_subdirectory` set to false
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            false
        ));

        // as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't
        // a direct child of the `url`
        assert!(url
            .contains(
                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
                true
            )
            .not());

        // when we set `ignore_subdirectory` to false, we should not ignore the file
        assert!(url.contains(
            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
            false
        ));

        // as above, `ignore_subdirectory` is false, so we include the file
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            false
        ));

        // in this case, we include the file even when `ignore_subdirectory` is true because the
        // path segment is a hive partition which doesn't count as a subdirectory for the purposes
        // of `Url::contains`
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            true
        ));

        // testing an empty path with default config
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));

        // testing an empty path with `ignore_subdirectory` set to false
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
    }
}