datafusion_datasource/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#![doc(
19    html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
20    html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
21)]
22#![cfg_attr(docsrs, feature(doc_auto_cfg))]
23
24//! A table that uses the `ObjectStore` listing capability
25//! to get the list of files to process.
26
27pub mod display;
28pub mod file;
29pub mod file_compression_type;
30pub mod file_format;
31pub mod file_groups;
32pub mod file_meta;
33pub mod file_scan_config;
34pub mod file_sink_config;
35pub mod file_stream;
36pub mod memory;
37pub mod source;
38mod statistics;
39#[cfg(test)]
40mod test_util;
41pub mod url;
42pub mod write;
43use chrono::TimeZone;
44use datafusion_common::Result;
45use datafusion_common::{ScalarValue, Statistics};
46use futures::Stream;
47use object_store::{path::Path, ObjectMeta};
48use std::pin::Pin;
49use std::sync::Arc;
50
51pub use self::url::ListingTableUrl;
52
/// A pinned, boxed stream of [`PartitionedFile`]s listed from an object store.
///
/// Each item is a `Result`, so individual entries of the stream may be errors.
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
56
/// A half-open byte range, `start..end`, within a file.
///
/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
/// lies within the `[start, end)` byte offsets. This option can be used to scan
/// non-overlapping sections of a Parquet file in parallel.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct FileRange {
    /// First byte offset of the range (inclusive)
    pub start: i64,
    /// Byte offset at which the range stops (exclusive)
    pub end: i64,
}

impl FileRange {
    /// Returns `true` when `offset` falls inside this range, i.e. when
    /// `start <= offset < end`.
    ///
    /// A range whose `end` is not greater than its `start` contains no offset.
    pub fn contains(&self, offset: i64) -> bool {
        (self.start..self.end).contains(&offset)
    }
}
74
#[derive(Debug, Clone)]
/// A single file or part of a file that should be read, along with its optional
/// statistics and the partition column values that need to be appended to each row.
pub struct PartitionedFile {
    /// Object store metadata for the file: its location (e.g. URL, filesystem path, etc),
    /// size, last-modified time, and optional `e_tag` / `version`.
    pub object_meta: ObjectMeta,
    /// Values of partition columns to be appended to each row.
    ///
    /// These MUST have the same count, order, and type as the [`table_partition_cols`].
    ///
    /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used
    /// [`wrap_partition_type_in_dict`] to wrap the column type.
    ///
    ///
    /// [`wrap_partition_type_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L55
    /// [`wrap_partition_value_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L62
    /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L190
    pub partition_values: Vec<ScalarValue>,
    /// An optional file range for a more fine-grained parallel execution.
    ///
    /// `None` means no range restriction has been set on this file.
    pub range: Option<FileRange>,
    /// Optional statistics that describe the data in this file if known.
    ///
    /// DataFusion relies on these statistics for planning (in particular to sort file groups),
    /// so if they are incorrect, incorrect answers may result.
    pub statistics: Option<Statistics>,
    /// An optional field for user defined per object metadata
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// The estimated size of the parquet metadata, in bytes
    pub metadata_size_hint: Option<usize>,
}
104
105impl PartitionedFile {
106    /// Create a simple file without metadata or partition
107    pub fn new(path: impl Into<String>, size: u64) -> Self {
108        Self {
109            object_meta: ObjectMeta {
110                location: Path::from(path.into()),
111                last_modified: chrono::Utc.timestamp_nanos(0),
112                size: size as usize,
113                e_tag: None,
114                version: None,
115            },
116            partition_values: vec![],
117            range: None,
118            statistics: None,
119            extensions: None,
120            metadata_size_hint: None,
121        }
122    }
123
124    /// Create a file range without metadata or partition
125    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
126        Self {
127            object_meta: ObjectMeta {
128                location: Path::from(path),
129                last_modified: chrono::Utc.timestamp_nanos(0),
130                size: size as usize,
131                e_tag: None,
132                version: None,
133            },
134            partition_values: vec![],
135            range: Some(FileRange { start, end }),
136            statistics: None,
137            extensions: None,
138            metadata_size_hint: None,
139        }
140        .with_range(start, end)
141    }
142
143    /// Provide a hint to the size of the file metadata. If a hint is provided
144    /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically.
145    /// Without an appropriate hint, two read may be required to fetch the metadata.
146    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
147        self.metadata_size_hint = Some(metadata_size_hint);
148        self
149    }
150
151    /// Return a file reference from the given path
152    pub fn from_path(path: String) -> Result<Self> {
153        let size = std::fs::metadata(path.clone())?.len();
154        Ok(Self::new(path, size))
155    }
156
157    /// Return the path of this partitioned file
158    pub fn path(&self) -> &Path {
159        &self.object_meta.location
160    }
161
162    /// Update the file to only scan the specified range (in bytes)
163    pub fn with_range(mut self, start: i64, end: i64) -> Self {
164        self.range = Some(FileRange { start, end });
165        self
166    }
167
168    /// Update the user defined extensions for this file.
169    ///
170    /// This can be used to pass reader specific information.
171    pub fn with_extensions(
172        mut self,
173        extensions: Arc<dyn std::any::Any + Send + Sync>,
174    ) -> Self {
175        self.extensions = Some(extensions);
176        self
177    }
178}
179
180impl From<ObjectMeta> for PartitionedFile {
181    fn from(object_meta: ObjectMeta) -> Self {
182        PartitionedFile {
183            object_meta,
184            partition_values: vec![],
185            range: None,
186            statistics: None,
187            extensions: None,
188            metadata_size_hint: None,
189        }
190    }
191}
192
#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, sync::Arc};
    use url::Url;

    /// Build a single-column RecordBatch: a nullable Int32 field named "i"
    /// holding the values `0..sz`.
    pub fn make_partition(sz: i32) -> RecordBatch {
        let field = Field::new("i", DataType::Int32, true);
        let schema = Arc::new(Schema::new(vec![field]));
        let column: ArrayRef = Arc::new(Int32Array::from((0..sz).collect::<Vec<i32>>()));

        RecordBatch::try_new(schema, vec![column]).unwrap()
    }

    /// Schema of the aggregate_test_* csv files: thirteen non-nullable columns
    /// `c1`..`c13`, where `c1` additionally carries field metadata.
    pub fn aggr_test_schema() -> SchemaRef {
        let mut first = Field::new("c1", DataType::Utf8, false);
        first.set_metadata(HashMap::from([(
            String::from("testing"),
            String::from("test"),
        )]));

        // The remaining columns only differ by name and data type.
        let remaining = vec![
            ("c2", DataType::UInt32),
            ("c3", DataType::Int8),
            ("c4", DataType::Int16),
            ("c5", DataType::Int32),
            ("c6", DataType::Int64),
            ("c7", DataType::UInt8),
            ("c8", DataType::UInt16),
            ("c9", DataType::UInt32),
            ("c10", DataType::UInt64),
            ("c11", DataType::Float32),
            ("c12", DataType::Float64),
            ("c13", DataType::Utf8),
        ];

        let mut fields = vec![first];
        fields.extend(
            remaining
                .into_iter()
                .map(|(name, data_type)| Field::new(name, data_type, false)),
        );

        Arc::new(Schema::new(fields))
    }

    #[test]
    fn test_object_store_listing_url() {
        let local = ListingTableUrl::parse("file:///").unwrap();
        assert_eq!(local.object_store().as_str(), "file:///");

        let remote = ListingTableUrl::parse("s3://bucket/").unwrap();
        assert_eq!(remote.object_store().as_str(), "s3://bucket/");
    }

    #[test]
    fn test_get_store_hdfs() {
        let registry = DefaultObjectStoreRegistry::default();
        let store_url = Url::parse("hdfs://localhost:8020").unwrap();
        registry.register_store(&store_url, Arc::new(LocalFileSystem::new()));

        let table_url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_s3() {
        let registry = DefaultObjectStoreRegistry::default();
        let store_url = Url::parse("s3://bucket/key").unwrap();
        registry.register_store(&store_url, Arc::new(LocalFileSystem::new()));

        let table_url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_file() {
        let registry = DefaultObjectStoreRegistry::default();
        let table_url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_local() {
        let registry = DefaultObjectStoreRegistry::default();
        let table_url = ListingTableUrl::parse("../").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();

        // Parse `raw` as an object store path and ask the table url whether it
        // contains it under the given `ignore_subdirectory` setting.
        let contains = |raw: &str, ignore_subdirectory: bool| -> bool {
            let path = Path::parse(raw).unwrap();
            url.contains(&path, ignore_subdirectory)
        };

        // standard case with default config
        assert!(contains("/var/data/mytable/data.parquet", true));

        // standard case with `ignore_subdirectory` set to false
        assert!(contains("/var/data/mytable/data.parquet", false));

        // as per documentation, when `ignore_subdirectory` is true, we should ignore files that
        // aren't a direct child of the `url`
        assert!(!contains("/var/data/mytable/mysubfolder/data.parquet", true));

        // when we set `ignore_subdirectory` to false, we should not ignore the file
        assert!(contains("/var/data/mytable/mysubfolder/data.parquet", false));

        // as above, `ignore_subdirectory` is false, so we include the file
        assert!(contains("/var/data/mytable/year=2024/data.parquet", false));

        // in this case, we include the file even when `ignore_subdirectory` is true because the
        // path segment is a hive partition which doesn't count as a subdirectory for the purposes
        // of `Url::contains`
        assert!(contains("/var/data/mytable/year=2024/data.parquet", true));

        // testing an empty path with default config
        assert!(contains("/var/data/mytable/", true));

        // testing an empty path with `ignore_subdirectory` set to false
        assert!(contains("/var/data/mytable/", false));
    }
}