datafusion_datasource/
mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
    html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
// Make sure fast / cheap clones on Arc are explicit:
// https://github.com/apache/datafusion/issues/11143
#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]

//! A table that uses the `ObjectStore` listing capability
//! to get the list of files to process.

pub mod decoder;
pub mod display;
pub mod file;
pub mod file_compression_type;
pub mod file_format;
pub mod file_groups;
pub mod file_meta;
pub mod file_scan_config;
pub mod file_sink_config;
pub mod file_stream;
pub mod memory;
pub mod schema_adapter;
pub mod sink;
pub mod source;
mod statistics;

#[cfg(test)]
pub mod test_util;

pub mod url;
pub mod write;
pub use self::file::as_file_source;
pub use self::url::ListingTableUrl;
use crate::file_groups::FileGroup;
use chrono::TimeZone;
use datafusion_common::stats::Precision;
use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result};
use datafusion_common::{ScalarValue, Statistics};
use file_meta::FileMeta;
use futures::{Stream, StreamExt};
use object_store::{path::Path, ObjectMeta};
use object_store::{GetOptions, GetRange, ObjectStore};
// Remove when add_row_stats is removed
#[allow(deprecated)]
pub use statistics::add_row_stats;
pub use statistics::compute_all_files_statistics;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

/// Stream of files listed from an object store
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;

/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
/// lies within the `[start, end)` byte offsets. This option can be used to scan non-overlapping
/// sections of a Parquet file in parallel.
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// Range start
    pub start: i64,
    /// Range end
    pub end: i64,
}

impl FileRange {
    /// Returns true if this file range contains the specified offset
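    ///
    /// A minimal sketch of the semantics (start is inclusive, end is exclusive):
    /// ```
    /// # use datafusion_datasource::FileRange;
    /// let range = FileRange { start: 0, end: 100 };
    /// assert!(range.contains(0));
    /// assert!(range.contains(99));
    /// assert!(!range.contains(100));
    /// ```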
    pub fn contains(&self, offset: i64) -> bool {
        offset >= self.start && offset < self.end
    }
}

#[derive(Debug, Clone)]
/// A single file or part of a file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
pub struct PartitionedFile {
    /// Path for the file (e.g. URL, filesystem path, etc)
    pub object_meta: ObjectMeta,
    /// Values of partition columns to be appended to each row.
    ///
    /// These MUST have the same count, order, and type as the [`table_partition_cols`].
    ///
    /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type.
    ///
    /// [`wrap_partition_type_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L55
    /// [`wrap_partition_value_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L62
    /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L190
    pub partition_values: Vec<ScalarValue>,
    /// An optional file range for a more fine-grained parallel execution
    pub range: Option<FileRange>,
    /// Optional statistics that describe the data in this file if known.
    ///
    /// DataFusion relies on these statistics for planning (in particular to sort file groups),
    /// so if they are incorrect, incorrect answers may result.
    pub statistics: Option<Arc<Statistics>>,
    /// An optional field for user-defined per-object metadata
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// The estimated size of the Parquet metadata, in bytes
    pub metadata_size_hint: Option<usize>,
}

impl PartitionedFile {
    /// Create a simple file without metadata or partition values
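    ///
    /// A minimal sketch (the path and size below are illustrative):
    /// ```
    /// # use datafusion_datasource::PartitionedFile;
    /// let file = PartitionedFile::new("data/part-0.parquet", 1024);
    /// assert!(file.range.is_none());
    /// assert!(file.statistics.is_none());
    /// ```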
    pub fn new(path: impl Into<String>, size: u64) -> Self {
        Self {
            object_meta: ObjectMeta {
                location: Path::from(path.into()),
                last_modified: chrono::Utc.timestamp_nanos(0),
                size,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }

    /// Create a file with a byte range to scan, without metadata or partition values
    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
        Self {
            object_meta: ObjectMeta {
                location: Path::from(path),
                last_modified: chrono::Utc.timestamp_nanos(0),
                size,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: Some(FileRange { start, end }),
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
        .with_range(start, end)
    }

    /// Provide a hint of the size of the file metadata. If a hint is provided,
    /// the reader will optimistically fetch the last `metadata_size_hint` bytes of the Parquet file.
    /// Without an appropriate hint, two reads may be required to fetch the metadata.
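    ///
    /// A hedged sketch (64 KiB is an arbitrary choice; the right value depends on the file's footer size):
    /// ```
    /// # use datafusion_datasource::PartitionedFile;
    /// let file = PartitionedFile::new("data/part-0.parquet", 1024 * 1024)
    ///     .with_metadata_size_hint(64 * 1024);
    /// assert_eq!(file.metadata_size_hint, Some(65536));
    /// ```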
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
        self.metadata_size_hint = Some(metadata_size_hint);
        self
    }

    /// Return a file reference from the given path
    pub fn from_path(path: String) -> Result<Self> {
        let size = std::fs::metadata(path.clone())?.len();
        Ok(Self::new(path, size))
    }

    /// Return the path of this partitioned file
    pub fn path(&self) -> &Path {
        &self.object_meta.location
    }

    /// Update the file to only scan the specified range (in bytes)
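    ///
    /// A minimal sketch (sizes are illustrative; the range is in bytes):
    /// ```
    /// # use datafusion_datasource::PartitionedFile;
    /// // Scan only the first half of a hypothetical 2 KiB file.
    /// let file = PartitionedFile::new("data/part-0.parquet", 2048).with_range(0, 1024);
    /// assert!(file.range.is_some());
    /// ```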
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        self.range = Some(FileRange { start, end });
        self
    }

    /// Update the user-defined extensions for this file.
    ///
    /// This can be used to pass reader-specific information.
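    ///
    /// A sketch of attaching arbitrary metadata (a plain `String` here for illustration):
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion_datasource::PartitionedFile;
    /// let file = PartitionedFile::new("data/part-0.parquet", 1024)
    ///     .with_extensions(Arc::new(String::from("reader hint")));
    /// assert!(file.extensions.is_some());
    /// ```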
    pub fn with_extensions(
        mut self,
        extensions: Arc<dyn std::any::Any + Send + Sync>,
    ) -> Self {
        self.extensions = Some(extensions);
        self
    }

    /// Update the statistics for this file.
    pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
        self.statistics = Some(statistics);
        self
    }

    /// Check if this file has any statistics.
    /// This returns `true` if the file has any `Precision::Exact` or `Precision::Inexact` statistics
    /// and `false` if all statistics are `Precision::Absent`.
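    ///
    /// A minimal sketch:
    /// ```
    /// # use datafusion_datasource::PartitionedFile;
    /// let file = PartitionedFile::new("data/part-0.parquet", 1024);
    /// assert!(!file.has_statistics()); // no statistics attached yet
    /// ```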
    pub fn has_statistics(&self) -> bool {
        if let Some(stats) = &self.statistics {
            stats.column_statistics.iter().any(|col_stats| {
                col_stats.null_count != Precision::Absent
                    || col_stats.max_value != Precision::Absent
                    || col_stats.min_value != Precision::Absent
                    || col_stats.sum_value != Precision::Absent
                    || col_stats.distinct_count != Precision::Absent
            })
        } else {
            false
        }
    }
}

impl From<ObjectMeta> for PartitionedFile {
    fn from(object_meta: ObjectMeta) -> Self {
        PartitionedFile {
            object_meta,
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }
}

/// Represents the possible outcomes of a range calculation.
///
/// This enum is used to encapsulate the result of calculating the range of
/// bytes to read from an object (like a file) in an object store.
///
/// Variants:
/// - `Range(Option<Range<u64>>)`:
///   Represents a range of bytes to be read. It contains an `Option` wrapping a
///   `Range<u64>`. `None` signifies that the entire object should be read,
///   while `Some(range)` specifies the exact byte range to read.
/// - `TerminateEarly`:
///   Indicates that the range calculation determined no further action is
///   necessary, possibly because the calculated range is empty or invalid.
pub enum RangeCalculation {
    Range(Option<Range<u64>>),
    TerminateEarly,
}

/// Calculates an appropriate byte range for reading from an object based on the
/// provided metadata.
///
/// This asynchronous function examines the `FileMeta` of an object in an object store
/// and determines the range of bytes to be read. The range calculation may adjust
/// the start and end points to align with meaningful data boundaries (like newlines).
///
/// Returns a `Result` wrapping a `RangeCalculation`, which is either a calculated byte range or an indication to terminate early.
///
/// Returns an `Error` if any part of the range calculation fails, such as issues in reading from the object store or invalid range boundaries.
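///
/// A hedged usage sketch (assumes an in-memory store and an already-constructed `FileMeta`):
/// ```no_run
/// # use std::sync::Arc;
/// # use object_store::{memory::InMemory, ObjectStore};
/// # use datafusion_datasource::{calculate_range, RangeCalculation};
/// # use datafusion_datasource::file_meta::FileMeta;
/// # async fn example(file_meta: FileMeta) -> datafusion_common::Result<()> {
/// let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
/// match calculate_range(&file_meta, &store, None).await? {
///     RangeCalculation::Range(range) => println!("read bytes {range:?}"),
///     RangeCalculation::TerminateEarly => println!("nothing to read"),
/// }
/// # Ok(())
/// # }
/// ```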
pub async fn calculate_range(
    file_meta: &FileMeta,
    store: &Arc<dyn ObjectStore>,
    terminator: Option<u8>,
) -> Result<RangeCalculation> {
    let location = file_meta.location();
    let file_size = file_meta.object_meta.size;
    let newline = terminator.unwrap_or(b'\n');

    match file_meta.range {
        None => Ok(RangeCalculation::Range(None)),
        Some(FileRange { start, end }) => {
            let start: u64 = start.try_into().map_err(|_| {
                exec_datafusion_err!("Expect start range to fit in u64, got {start}")
            })?;
            let end: u64 = end.try_into().map_err(|_| {
                exec_datafusion_err!("Expect end range to fit in u64, got {end}")
            })?;

            let start_delta = if start != 0 {
                find_first_newline(store, location, start - 1, file_size, newline).await?
            } else {
                0
            };

            let end_delta = if end != file_size {
                find_first_newline(store, location, end - 1, file_size, newline).await?
            } else {
                0
            };

            let range = start + start_delta..end + end_delta;

            if range.start == range.end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            Ok(RangeCalculation::Range(Some(range)))
        }
    }
}

/// Asynchronously finds the position of the first newline character in a specified byte range
/// within an object, such as a file, in an object store.
///
/// This function scans the contents of the object starting from the specified `start` position
/// up to the `end` position, looking for the first occurrence of a newline character.
/// It returns the position of the first newline relative to the start of the range.
///
/// Returns a `Result` wrapping a `u64` that represents the position of the first newline
/// character found within the specified range. If no newline is found, it returns the length
/// of the scanned data, effectively indicating the end of the range.
///
/// The function returns an `Error` if any issues arise while reading from the object store
/// or processing the data stream.
async fn find_first_newline(
    object_store: &Arc<dyn ObjectStore>,
    location: &Path,
    start: u64,
    end: u64,
    newline: u8,
) -> Result<u64> {
    let options = GetOptions {
        range: Some(GetRange::Bounded(start..end)),
        ..Default::default()
    };

    let result = object_store.get_opts(location, options).await?;
    let mut result_stream = result.into_stream();

    let mut index = 0;

    while let Some(chunk) = result_stream.next().await.transpose()? {
        if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
            let position = position as u64;
            return Ok(index + position);
        }

        index += chunk.len() as u64;
    }

    Ok(index)
}

/// Generates test files with min-max statistics in different overlap patterns.
///
/// Used by tests and benchmarks.
///
/// # Overlap Factors
///
/// The `overlap_factor` parameter controls how much the value ranges in generated test files overlap:
/// - `0.0`: No overlap between files (completely disjoint ranges)
/// - `0.2`: Low overlap (20% of the range size overlaps with adjacent files)
/// - `0.5`: Medium overlap (50% of ranges overlap)
/// - `0.8`: High overlap (80% of ranges overlap between files)
///
/// # Examples
///
/// With 5 files and different overlap factors showing `[min, max]` ranges:
///
/// overlap_factor = 0.0 (no overlap):
///
/// File 0: [0, 20]
/// File 1: [20, 40]
/// File 2: [40, 60]
/// File 3: [60, 80]
/// File 4: [80, 100]
///
/// overlap_factor = 0.5 (50% overlap):
///
/// File 0: [0, 40]
/// File 1: [20, 60]
/// File 2: [40, 80]
/// File 3: [60, 100]
/// File 4: [80, 120]
///
/// overlap_factor = 0.8 (80% overlap):
///
/// File 0: [0, 100]
/// File 1: [20, 120]
/// File 2: [40, 140]
/// File 3: [60, 160]
/// File 4: [80, 180]
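///
/// A quick usage sketch (all generated files land in a single [`FileGroup`]):
/// ```
/// # use datafusion_datasource::generate_test_files;
/// let groups = generate_test_files(5, 0.5);
/// assert_eq!(groups.len(), 1);
/// assert_eq!(groups[0].iter().count(), 5);
/// ```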
pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
    let mut files = Vec::with_capacity(num_files);
    if num_files == 0 {
        return vec![];
    }
    let range_size = if overlap_factor == 0.0 {
        100 / num_files as i64
    } else {
        (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
    };

    for i in 0..num_files {
        let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
        let min = base as f64;
        let max = (base + range_size) as f64;

        let file = PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::from(format!("file_{i}.parquet")),
                last_modified: chrono::Utc::now(),
                size: 1000,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: Some(Arc::new(Statistics {
                num_rows: Precision::Exact(100),
                total_byte_size: Precision::Exact(1000),
                column_statistics: vec![ColumnStatistics {
                    null_count: Precision::Exact(0),
                    max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
                    min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
                    sum_value: Precision::Absent,
                    distinct_count: Precision::Absent,
                }],
            })),
            extensions: None,
            metadata_size_hint: None,
        };
        files.push(file);
    }

    vec![FileGroup::new(files)]
}

/// Verifies that files within each group maintain sort order: each file's minimum
/// value must be strictly greater than the previous file's maximum value.
///
/// Used by tests and benchmarks.
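///
/// A sketch of the expected behavior (overlapping files fail the check):
/// ```
/// # use datafusion_datasource::{generate_test_files, verify_sort_integrity};
/// // A single file is trivially ordered.
/// assert!(verify_sort_integrity(&generate_test_files(1, 0.0)));
/// // With 80% overlap, adjacent files in the single group overlap, so the check fails.
/// assert!(!verify_sort_integrity(&generate_test_files(5, 0.8)));
/// ```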
pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
    for group in file_groups {
        let files = group.iter().collect::<Vec<_>>();
        for i in 1..files.len() {
            let prev_file = files[i - 1];
            let curr_file = files[i];

            // Check if the min value of current file is greater than max value of previous file
            if let (Some(prev_stats), Some(curr_stats)) =
                (&prev_file.statistics, &curr_file.statistics)
            {
                let prev_max = &prev_stats.column_statistics[0].max_value;
                let curr_min = &curr_stats.column_statistics[0].min_value;
                if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
                    return false;
                }
            }
        }
    }
    true
}

#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, ops::Not, sync::Arc};
    use url::Url;

    /// Return a RecordBatch with a single Int32 array with values (0..sz) in a field named "i"
    pub fn make_partition(sz: i32) -> RecordBatch {
        let seq_start = 0;
        let seq_end = sz;
        let values = (seq_start..seq_end).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let arr = Arc::new(Int32Array::from(values));

        RecordBatch::try_new(schema, vec![arr as ArrayRef]).unwrap()
    }

    /// Get the schema for the aggregate_test_* csv files
    pub fn aggr_test_schema() -> SchemaRef {
        let mut f1 = Field::new("c1", DataType::Utf8, false);
        f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
        let schema = Schema::new(vec![
            f1,
            Field::new("c2", DataType::UInt32, false),
            Field::new("c3", DataType::Int8, false),
            Field::new("c4", DataType::Int16, false),
            Field::new("c5", DataType::Int32, false),
            Field::new("c6", DataType::Int64, false),
            Field::new("c7", DataType::UInt8, false),
            Field::new("c8", DataType::UInt16, false),
            Field::new("c9", DataType::UInt32, false),
            Field::new("c10", DataType::UInt64, false),
            Field::new("c11", DataType::Float32, false),
            Field::new("c12", DataType::Float64, false),
            Field::new("c13", DataType::Utf8, false),
        ]);

        Arc::new(schema)
    }

    #[test]
    fn test_object_store_listing_url() {
        let listing = ListingTableUrl::parse("file:///").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "file:///");

        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "s3://bucket/");
    }

    #[test]
    fn test_get_store_hdfs() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("hdfs://localhost:8020").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_s3() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("s3://bucket/key").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_file() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_local() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("../").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();

        // standard case with default config
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            true
        ));

        // standard case with `ignore_subdirectory` set to false
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            false
        ));

        // as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't
        // a direct child of the `url`
        assert!(url
            .contains(
                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
                true
            )
            .not());

        // when we set `ignore_subdirectory` to false, we should not ignore the file
        assert!(url.contains(
            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
            false
        ));

        // as above, `ignore_subdirectory` is false, so we include the file
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            false
        ));

        // in this case, we include the file even when `ignore_subdirectory` is true because the
        // path segment is a hive partition which doesn't count as a subdirectory for the purposes
        // of `Url::contains`
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            true
        ));

        // testing an empty path with default config
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));

        // testing an empty path with `ignore_subdirectory` set to false
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
    }
}
594}