1#![doc(
19 html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
20 html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
21)]
22#![cfg_attr(docsrs, feature(doc_auto_cfg))]
23
24pub mod display;
28pub mod file;
29pub mod file_compression_type;
30pub mod file_format;
31pub mod file_groups;
32pub mod file_meta;
33pub mod file_scan_config;
34pub mod file_sink_config;
35pub mod file_stream;
36pub mod memory;
37pub mod source;
38mod statistics;
39#[cfg(test)]
40mod test_util;
41pub mod url;
42pub mod write;
43use chrono::TimeZone;
44use datafusion_common::Result;
45use datafusion_common::{ScalarValue, Statistics};
46use futures::Stream;
47use object_store::{path::Path, ObjectMeta};
48use std::pin::Pin;
49use std::sync::Arc;
50
51pub use self::url::ListingTableUrl;
52
/// A pinned, boxed, dynamically dispatched [`Stream`] that yields
/// `Result<PartitionedFile>` items.
///
/// The trait object is required to be `Send + Sync + 'static`. Because every
/// item is a [`Result`], an individual item may be an error rather than a
/// file.
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
56
/// A half-open span `[start, end)` of offsets inside a file.
///
/// NOTE(review): the offsets are presumably byte positions within the file;
/// confirm against the readers that consume [`FileRange`].
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// Inclusive lower bound of the span.
    pub start: i64,
    /// Exclusive upper bound of the span.
    pub end: i64,
}

impl FileRange {
    /// Returns `true` when `offset` falls inside `[start, end)`.
    ///
    /// An empty (`start == end`) or inverted (`start > end`) range contains
    /// no offsets.
    pub fn contains(&self, offset: i64) -> bool {
        (self.start..self.end).contains(&offset)
    }
}
74
/// A single file, or a portion of one, together with optional per-file
/// information attached to it.
#[derive(Debug, Clone)]
pub struct PartitionedFile {
    /// Object-store metadata for the file. Its `location` is what
    /// [`PartitionedFile::path`] returns.
    pub object_meta: ObjectMeta,
    /// Scalar values attached to this file; empty for files created by the
    /// constructors in this module.
    ///
    /// NOTE(review): presumably one value per partition column of the owning
    /// table, in column order; confirm against the table provider.
    pub partition_values: Vec<ScalarValue>,
    /// Optional [`FileRange`] associated with the file; `None` means no range
    /// has been set.
    ///
    /// NOTE(review): presumably restricts a scan to part of the file; confirm
    /// against the file openers.
    pub range: Option<FileRange>,
    /// Optional [`Statistics`] for this file; `None` when none were supplied.
    pub statistics: Option<Statistics>,
    /// Optional type-erased, shareable user payload, set through
    /// [`PartitionedFile::with_extensions`].
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// Optional size hint, set through
    /// [`PartitionedFile::with_metadata_size_hint`].
    ///
    /// NOTE(review): presumably the number of bytes of file metadata a reader
    /// should fetch up front; confirm against the format readers.
    pub metadata_size_hint: Option<usize>,
}
104
105impl PartitionedFile {
106 pub fn new(path: impl Into<String>, size: u64) -> Self {
108 Self {
109 object_meta: ObjectMeta {
110 location: Path::from(path.into()),
111 last_modified: chrono::Utc.timestamp_nanos(0),
112 size: size as usize,
113 e_tag: None,
114 version: None,
115 },
116 partition_values: vec![],
117 range: None,
118 statistics: None,
119 extensions: None,
120 metadata_size_hint: None,
121 }
122 }
123
124 pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
126 Self {
127 object_meta: ObjectMeta {
128 location: Path::from(path),
129 last_modified: chrono::Utc.timestamp_nanos(0),
130 size: size as usize,
131 e_tag: None,
132 version: None,
133 },
134 partition_values: vec![],
135 range: Some(FileRange { start, end }),
136 statistics: None,
137 extensions: None,
138 metadata_size_hint: None,
139 }
140 .with_range(start, end)
141 }
142
143 pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
147 self.metadata_size_hint = Some(metadata_size_hint);
148 self
149 }
150
151 pub fn from_path(path: String) -> Result<Self> {
153 let size = std::fs::metadata(path.clone())?.len();
154 Ok(Self::new(path, size))
155 }
156
157 pub fn path(&self) -> &Path {
159 &self.object_meta.location
160 }
161
162 pub fn with_range(mut self, start: i64, end: i64) -> Self {
164 self.range = Some(FileRange { start, end });
165 self
166 }
167
168 pub fn with_extensions(
172 mut self,
173 extensions: Arc<dyn std::any::Any + Send + Sync>,
174 ) -> Self {
175 self.extensions = Some(extensions);
176 self
177 }
178}
179
180impl From<ObjectMeta> for PartitionedFile {
181 fn from(object_meta: ObjectMeta) -> Self {
182 PartitionedFile {
183 object_meta,
184 partition_values: vec![],
185 range: None,
186 statistics: None,
187 extensions: None,
188 metadata_size_hint: None,
189 }
190 }
191}
192
193#[cfg(test)]
194mod tests {
195 use super::ListingTableUrl;
196 use arrow::{
197 array::{ArrayRef, Int32Array, RecordBatch},
198 datatypes::{DataType, Field, Schema, SchemaRef},
199 };
200 use datafusion_execution::object_store::{
201 DefaultObjectStoreRegistry, ObjectStoreRegistry,
202 };
203 use object_store::{local::LocalFileSystem, path::Path};
204 use std::{collections::HashMap, ops::Not, sync::Arc};
205 use url::Url;
206
207 pub fn make_partition(sz: i32) -> RecordBatch {
209 let seq_start = 0;
210 let seq_end = sz;
211 let values = (seq_start..seq_end).collect::<Vec<_>>();
212 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
213 let arr = Arc::new(Int32Array::from(values));
214
215 RecordBatch::try_new(schema, vec![arr as ArrayRef]).unwrap()
216 }
217
218 pub fn aggr_test_schema() -> SchemaRef {
220 let mut f1 = Field::new("c1", DataType::Utf8, false);
221 f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
222 let schema = Schema::new(vec![
223 f1,
224 Field::new("c2", DataType::UInt32, false),
225 Field::new("c3", DataType::Int8, false),
226 Field::new("c4", DataType::Int16, false),
227 Field::new("c5", DataType::Int32, false),
228 Field::new("c6", DataType::Int64, false),
229 Field::new("c7", DataType::UInt8, false),
230 Field::new("c8", DataType::UInt16, false),
231 Field::new("c9", DataType::UInt32, false),
232 Field::new("c10", DataType::UInt64, false),
233 Field::new("c11", DataType::Float32, false),
234 Field::new("c12", DataType::Float64, false),
235 Field::new("c13", DataType::Utf8, false),
236 ]);
237
238 Arc::new(schema)
239 }
240
241 #[test]
242 fn test_object_store_listing_url() {
243 let listing = ListingTableUrl::parse("file:///").unwrap();
244 let store = listing.object_store();
245 assert_eq!(store.as_str(), "file:///");
246
247 let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
248 let store = listing.object_store();
249 assert_eq!(store.as_str(), "s3://bucket/");
250 }
251
252 #[test]
253 fn test_get_store_hdfs() {
254 let sut = DefaultObjectStoreRegistry::default();
255 let url = Url::parse("hdfs://localhost:8020").unwrap();
256 sut.register_store(&url, Arc::new(LocalFileSystem::new()));
257 let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
258 sut.get_store(url.as_ref()).unwrap();
259 }
260
261 #[test]
262 fn test_get_store_s3() {
263 let sut = DefaultObjectStoreRegistry::default();
264 let url = Url::parse("s3://bucket/key").unwrap();
265 sut.register_store(&url, Arc::new(LocalFileSystem::new()));
266 let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
267 sut.get_store(url.as_ref()).unwrap();
268 }
269
270 #[test]
271 fn test_get_store_file() {
272 let sut = DefaultObjectStoreRegistry::default();
273 let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
274 sut.get_store(url.as_ref()).unwrap();
275 }
276
277 #[test]
278 fn test_get_store_local() {
279 let sut = DefaultObjectStoreRegistry::default();
280 let url = ListingTableUrl::parse("../").unwrap();
281 sut.get_store(url.as_ref()).unwrap();
282 }
283
284 #[test]
285 fn test_url_contains() {
286 let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();
287
288 assert!(url.contains(
290 &Path::parse("/var/data/mytable/data.parquet").unwrap(),
291 true
292 ));
293
294 assert!(url.contains(
296 &Path::parse("/var/data/mytable/data.parquet").unwrap(),
297 false
298 ));
299
300 assert!(url
303 .contains(
304 &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
305 true
306 )
307 .not());
308
309 assert!(url.contains(
311 &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
312 false
313 ));
314
315 assert!(url.contains(
317 &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
318 false
319 ));
320
321 assert!(url.contains(
325 &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
326 true
327 ));
328
329 assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));
331
332 assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
334 }
335}