datafusion_datasource/
mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
    html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
// Make sure fast / cheap clones on Arc are explicit:
// https://github.com/apache/datafusion/issues/11143
#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]

//! A table that uses the `ObjectStore` listing capability
//! to get the list of files to process.

pub mod decoder;
pub mod display;
pub mod file;
pub mod file_compression_type;
pub mod file_format;
pub mod file_groups;
pub mod file_scan_config;
pub mod file_sink_config;
pub mod file_stream;
pub mod memory;
pub mod schema_adapter;
pub mod sink;
pub mod source;
mod statistics;
pub mod table_schema;

#[cfg(test)]
pub mod test_util;

pub mod url;
pub mod write;
pub use self::file::as_file_source;
pub use self::url::ListingTableUrl;
use crate::file_groups::FileGroup;
use chrono::TimeZone;
use datafusion_common::stats::Precision;
use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result};
use datafusion_common::{ScalarValue, Statistics};
use futures::{Stream, StreamExt};
use object_store::{path::Path, ObjectMeta};
use object_store::{GetOptions, GetRange, ObjectStore};
pub use table_schema::TableSchema;
// Remove when add_row_stats is removed
#[allow(deprecated)]
pub use statistics::add_row_stats;
pub use statistics::compute_all_files_statistics;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

/// Stream of files listed from an object store
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;

/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping
/// sections of a Parquet file in parallel.
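///
/// For example, two adjacent ranges splitting a 100-byte file (a minimal
/// sketch; the offsets are illustrative):
///
/// ```
/// # use datafusion_datasource::FileRange;
/// let first = FileRange { start: 0, end: 50 };
/// let second = FileRange { start: 50, end: 100 };
/// // The end offset is exclusive, so the split point belongs to `second`
/// assert!(first.contains(49));
/// assert!(!first.contains(50));
/// assert!(second.contains(50));
/// ```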
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// Range start
    pub start: i64,
    /// Range end
    pub end: i64,
}

impl FileRange {
    /// Returns true if this file range contains the specified offset
    pub fn contains(&self, offset: i64) -> bool {
        offset >= self.start && offset < self.end
    }
}

#[derive(Debug, Clone)]
/// A single file or part of a file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
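///
/// For example, a file of which only the first half should be scanned (a
/// minimal sketch; the size, byte range, and hint are illustrative):
///
/// ```
/// # use datafusion_datasource::PartitionedFile;
/// let file = PartitionedFile::new("my_file.parquet", 1024)
///     .with_range(0, 512)
///     .with_metadata_size_hint(64);
/// assert_eq!(file.path().to_string(), "my_file.parquet");
/// ```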
pub struct PartitionedFile {
    /// Path for the file (e.g. URL, filesystem path, etc)
    pub object_meta: ObjectMeta,
    /// Values of partition columns to be appended to each row.
    ///
    /// These MUST have the same count, order, and type as the [`table_partition_cols`].
    ///
    /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type.
    ///
    /// [`wrap_partition_type_in_dict`]: crate::file_scan_config::wrap_partition_type_in_dict
    /// [`wrap_partition_value_in_dict`]: crate::file_scan_config::wrap_partition_value_in_dict
    /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L87
    pub partition_values: Vec<ScalarValue>,
    /// An optional file range for a more fine-grained parallel execution
    pub range: Option<FileRange>,
    /// Optional statistics that describe the data in this file if known.
    ///
    /// DataFusion relies on these statistics for planning (in particular to sort file groups),
    /// so if they are incorrect, incorrect answers may result.
    pub statistics: Option<Arc<Statistics>>,
    /// An optional field for user defined per object metadata
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// The estimated size of the parquet metadata, in bytes
    pub metadata_size_hint: Option<usize>,
}

impl PartitionedFile {
    /// Create a simple file without metadata or partition
    pub fn new(path: impl Into<String>, size: u64) -> Self {
        Self {
            object_meta: ObjectMeta {
                location: Path::from(path.into()),
                last_modified: chrono::Utc.timestamp_nanos(0),
                size,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }

    /// Create a file range without metadata or partition
    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
        Self {
            object_meta: ObjectMeta {
                location: Path::from(path),
                last_modified: chrono::Utc.timestamp_nanos(0),
                size,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: Some(FileRange { start, end }),
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }

    /// Provide a hint for the size of the file metadata. If a hint is provided
    /// the reader will try to fetch the last `size_hint` bytes of the parquet file optimistically.
    /// Without an appropriate hint, two reads may be required to fetch the metadata.
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
        self.metadata_size_hint = Some(metadata_size_hint);
        self
    }

    /// Return a file reference from the given path
    pub fn from_path(path: String) -> Result<Self> {
        let size = std::fs::metadata(path.clone())?.len();
        Ok(Self::new(path, size))
    }

    /// Return the path of this partitioned file
    pub fn path(&self) -> &Path {
        &self.object_meta.location
    }

    /// Update the file to only scan the specified range (in bytes)
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        self.range = Some(FileRange { start, end });
        self
    }

    /// Update the user defined extensions for this file.
    ///
    /// This can be used to pass reader specific information.
    pub fn with_extensions(
        mut self,
        extensions: Arc<dyn std::any::Any + Send + Sync>,
    ) -> Self {
        self.extensions = Some(extensions);
        self
    }

    /// Update the statistics for this file.
    pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
        self.statistics = Some(statistics);
        self
    }

    /// Check if this file has any statistics.
    /// This returns `true` if the file has any Exact or Inexact statistics
    /// and `false` if all statistics are `Precision::Absent`.
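    ///
    /// For example, a freshly created file has no statistics attached (a
    /// minimal sketch; the path and size are illustrative):
    ///
    /// ```
    /// # use datafusion_datasource::PartitionedFile;
    /// let file = PartitionedFile::new("f.parquet", 100);
    /// assert!(!file.has_statistics());
    /// ```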
    pub fn has_statistics(&self) -> bool {
        if let Some(stats) = &self.statistics {
            stats.column_statistics.iter().any(|col_stats| {
                col_stats.null_count != Precision::Absent
                    || col_stats.max_value != Precision::Absent
                    || col_stats.min_value != Precision::Absent
                    || col_stats.sum_value != Precision::Absent
                    || col_stats.distinct_count != Precision::Absent
            })
        } else {
            false
        }
    }
}

impl From<ObjectMeta> for PartitionedFile {
    fn from(object_meta: ObjectMeta) -> Self {
        PartitionedFile {
            object_meta,
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }
}

/// Represents the possible outcomes of a range calculation.
///
/// This enum is used to encapsulate the result of calculating the range of
/// bytes to read from an object (like a file) in an object store.
///
/// Variants:
/// - `Range(Option<Range<u64>>)`:
///   Represents a range of bytes to be read. It contains an `Option` wrapping a
///   `Range<u64>`. `None` signifies that the entire object should be read,
///   while `Some(range)` specifies the exact byte range to read.
/// - `TerminateEarly`:
///   Indicates that the range calculation determined no further action is
///   necessary, possibly because the calculated range is empty or invalid.
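///
/// Callers typically branch on both variants; a sketch of the intended use
/// (not compiled as a doctest, since it needs an async context and a live
/// object store; `file` and `store` are assumed to be in scope):
///
/// ```rust,ignore
/// match calculate_range(&file, &store, None).await? {
///     // No range on the file: read the whole object
///     RangeCalculation::Range(None) => { /* ... */ }
///     // Read only the adjusted byte range
///     RangeCalculation::Range(Some(range)) => { /* ... */ }
///     // The adjusted range is empty, so there is nothing to read
///     RangeCalculation::TerminateEarly => { /* ... */ }
/// }
/// ```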
pub enum RangeCalculation {
    Range(Option<Range<u64>>),
    TerminateEarly,
}

/// Calculates an appropriate byte range for reading from an object based on the
/// provided metadata.
///
/// This asynchronous function examines the [`PartitionedFile`] of an object in an object store
/// and determines the range of bytes to be read. The range calculation may adjust
/// the start and end points to align with meaningful data boundaries (like newlines).
///
/// Returns a `Result` wrapping a [`RangeCalculation`], which is either a calculated byte range or an indication to terminate early.
///
/// Returns an `Error` if any part of the range calculation fails, such as issues in reading from the object store or invalid range boundaries.
pub async fn calculate_range(
    file: &PartitionedFile,
    store: &Arc<dyn ObjectStore>,
    terminator: Option<u8>,
) -> Result<RangeCalculation> {
    let location = &file.object_meta.location;
    let file_size = file.object_meta.size;
    let newline = terminator.unwrap_or(b'\n');

    match file.range {
        None => Ok(RangeCalculation::Range(None)),
        Some(FileRange { start, end }) => {
            let start: u64 = start.try_into().map_err(|_| {
                exec_datafusion_err!("Expect start range to fit in u64, got {start}")
            })?;
            let end: u64 = end.try_into().map_err(|_| {
                exec_datafusion_err!("Expect end range to fit in u64, got {end}")
            })?;

            let start_delta = if start != 0 {
                find_first_newline(store, location, start - 1, file_size, newline).await?
            } else {
                0
            };

            let end_delta = if end != file_size {
                find_first_newline(store, location, end - 1, file_size, newline).await?
            } else {
                0
            };

            let range = start + start_delta..end + end_delta;

            if range.start == range.end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            Ok(RangeCalculation::Range(Some(range)))
        }
    }
}

/// Asynchronously finds the position of the first newline character in a specified byte range
/// within an object, such as a file, in an object store.
///
/// This function scans the contents of the object starting from the specified `start` position
/// up to the `end` position, looking for the first occurrence of a newline character.
/// It returns the position of the first newline relative to the start of the range.
///
/// Returns a `Result` wrapping a `u64` that represents the position of the first newline character found within the specified range. If no newline is found, it returns the length of the scanned data, effectively indicating the end of the range.
///
/// The function returns an `Error` if any issues arise while reading from the object store or processing the data stream.
async fn find_first_newline(
    object_store: &Arc<dyn ObjectStore>,
    location: &Path,
    start: u64,
    end: u64,
    newline: u8,
) -> Result<u64> {
    let options = GetOptions {
        range: Some(GetRange::Bounded(start..end)),
        ..Default::default()
    };

    let result = object_store.get_opts(location, options).await?;
    let mut result_stream = result.into_stream();

    let mut index = 0;

    while let Some(chunk) = result_stream.next().await.transpose()? {
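        // A newline found in this chunk: its offset within the chunk plus the
        // bytes scanned in earlier chunks gives the position relative to `start`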
        if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
            let position = position as u64;
            return Ok(index + position);
        }

        index += chunk.len() as u64;
    }

    Ok(index)
}

/// Generates test files with min-max statistics in different overlap patterns.
///
/// Used by tests and benchmarks.
///
/// # Overlap Factors
///
/// The `overlap_factor` parameter controls how much the value ranges in generated test files overlap:
/// - `0.0`: No overlap between files (completely disjoint ranges)
/// - `0.2`: Low overlap (20% of the range size overlaps with adjacent files)
/// - `0.5`: Medium overlap (50% of ranges overlap)
/// - `0.8`: High overlap (80% of ranges overlap between files)
///
/// # Examples
///
/// With 5 files and different overlap factors showing `[min, max]` ranges:
///
/// overlap_factor = 0.0 (no overlap):
///
/// File 0: [0, 20]
/// File 1: [20, 40]
/// File 2: [40, 60]
/// File 3: [60, 80]
/// File 4: [80, 100]
///
/// overlap_factor = 0.5 (50% overlap):
///
/// File 0: [0, 40]
/// File 1: [20, 60]
/// File 2: [40, 80]
/// File 3: [60, 100]
/// File 4: [80, 120]
///
/// overlap_factor = 0.8 (80% overlap, range_size is 25 and the stride
/// between file starts is 5):
///
/// File 0: [0, 25]
/// File 1: [5, 30]
/// File 2: [10, 35]
/// File 3: [15, 40]
/// File 4: [20, 45]
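///
/// For example, five files with 50% overlap come back as a single group (a
/// minimal usage sketch):
///
/// ```
/// # use datafusion_datasource::generate_test_files;
/// let groups = generate_test_files(5, 0.5);
/// assert_eq!(groups.len(), 1); // all files are placed in one group
/// assert_eq!(groups[0].iter().count(), 5);
/// ```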
pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
    let mut files = Vec::with_capacity(num_files);
    if num_files == 0 {
        return vec![];
    }
    let range_size = if overlap_factor == 0.0 {
        100 / num_files as i64
    } else {
        (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
    };

    for i in 0..num_files {
        let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
        let min = base as f64;
        let max = (base + range_size) as f64;

        let file = PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::from(format!("file_{i}.parquet")),
                last_modified: chrono::Utc::now(),
                size: 1000,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: Some(Arc::new(Statistics {
                num_rows: Precision::Exact(100),
                total_byte_size: Precision::Exact(1000),
                column_statistics: vec![ColumnStatistics {
                    null_count: Precision::Exact(0),
                    max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
                    min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
                    sum_value: Precision::Absent,
                    distinct_count: Precision::Absent,
                }],
            })),
            extensions: None,
            metadata_size_hint: None,
        };
        files.push(file);
    }

    vec![FileGroup::new(files)]
}

/// Helper function to verify that files within each group maintain sort order.
///
/// Used by tests and benchmarks.
pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
    for group in file_groups {
        let files = group.iter().collect::<Vec<_>>();
        for i in 1..files.len() {
            let prev_file = files[i - 1];
            let curr_file = files[i];

            // Check if the min value of current file is greater than max value of previous file
            if let (Some(prev_stats), Some(curr_stats)) =
                (&prev_file.statistics, &curr_file.statistics)
            {
                let prev_max = &prev_stats.column_statistics[0].max_value;
                let curr_min = &curr_stats.column_statistics[0].min_value;
                if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
                    return false;
                }
            }
        }
    }
    true
}
#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, ops::Not, sync::Arc};
    use url::Url;

    /// Return a RecordBatch with a single Int32 array with values (0..sz) in a field named "i"
    pub fn make_partition(sz: i32) -> RecordBatch {
        let seq_start = 0;
        let seq_end = sz;
        let values = (seq_start..seq_end).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let arr = Arc::new(Int32Array::from(values));

        RecordBatch::try_new(schema, vec![arr as ArrayRef]).unwrap()
    }

    /// Get the schema for the aggregate_test_* csv files
    pub fn aggr_test_schema() -> SchemaRef {
        let mut f1 = Field::new("c1", DataType::Utf8, false);
        f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
        let schema = Schema::new(vec![
            f1,
            Field::new("c2", DataType::UInt32, false),
            Field::new("c3", DataType::Int8, false),
            Field::new("c4", DataType::Int16, false),
            Field::new("c5", DataType::Int32, false),
            Field::new("c6", DataType::Int64, false),
            Field::new("c7", DataType::UInt8, false),
            Field::new("c8", DataType::UInt16, false),
            Field::new("c9", DataType::UInt32, false),
            Field::new("c10", DataType::UInt64, false),
            Field::new("c11", DataType::Float32, false),
            Field::new("c12", DataType::Float64, false),
            Field::new("c13", DataType::Utf8, false),
        ]);

        Arc::new(schema)
    }

    #[test]
    fn test_object_store_listing_url() {
        let listing = ListingTableUrl::parse("file:///").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "file:///");

        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "s3://bucket/");
    }

    #[test]
    fn test_get_store_hdfs() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("hdfs://localhost:8020").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_s3() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("s3://bucket/key").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_file() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_local() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("../").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();

        // standard case with default config
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            true
        ));

        // standard case with `ignore_subdirectory` set to false
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            false
        ));

        // as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't
        // a direct child of the `url`
        assert!(url
            .contains(
                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
                true
            )
            .not());

        // when we set `ignore_subdirectory` to false, we should not ignore the file
        assert!(url.contains(
            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
            false
        ));

        // as above, `ignore_subdirectory` is false, so we include the file
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            false
        ));

        // in this case, we include the file even when `ignore_subdirectory` is true because the
        // path segment is a hive partition which doesn't count as a subdirectory for the purposes
        // of `Url::contains`
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            true
        ));

        // testing an empty path with default config
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));

        // testing an empty path with `ignore_subdirectory` set to false
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
    }
593}