1#![doc(
19 html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
20 html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
21)]
22#![cfg_attr(docsrs, feature(doc_auto_cfg))]
23#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
26
27pub mod decoder;
31pub mod display;
32pub mod file;
33pub mod file_compression_type;
34pub mod file_format;
35pub mod file_groups;
36pub mod file_meta;
37pub mod file_scan_config;
38pub mod file_sink_config;
39pub mod file_stream;
40pub mod memory;
41pub mod schema_adapter;
42pub mod sink;
43pub mod source;
44mod statistics;
45
46#[cfg(test)]
47pub mod test_util;
48
49pub mod url;
50pub mod write;
51pub use self::file::as_file_source;
52pub use self::url::ListingTableUrl;
53use crate::file_groups::FileGroup;
54use chrono::TimeZone;
55use datafusion_common::stats::Precision;
56use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result};
57use datafusion_common::{ScalarValue, Statistics};
58use file_meta::FileMeta;
59use futures::{Stream, StreamExt};
60use object_store::{path::Path, ObjectMeta};
61use object_store::{GetOptions, GetRange, ObjectStore};
62#[allow(deprecated)]
64pub use statistics::add_row_stats;
65pub use statistics::compute_all_files_statistics;
66use std::ops::Range;
67use std::pin::Pin;
68use std::sync::Arc;
69
/// Stream of created [`PartitionedFile`]s (or errors), pinned and usable
/// across threads (`Send + Sync`).
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
73
/// A half-open byte range `[start, end)` within a file.
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// Inclusive start offset, in bytes.
    pub start: i64,
    /// Exclusive end offset, in bytes.
    pub end: i64,
}

impl FileRange {
    /// Returns true if `offset` lies within `[start, end)`.
    pub fn contains(&self, offset: i64) -> bool {
        (self.start..self.end).contains(&offset)
    }
}
91
/// A single file (or a byte range of one) that is part of a partition,
/// together with per-file partition values, statistics, and extensions.
#[derive(Debug, Clone)]
pub struct PartitionedFile {
    /// Path and object-store metadata for the file.
    pub object_meta: ObjectMeta,
    /// Values of the table's partition columns for this file (may be empty).
    pub partition_values: Vec<ScalarValue>,
    /// Optional byte sub-range to scan; `None` means the whole file.
    pub range: Option<FileRange>,
    /// Optional pre-computed statistics for this file.
    pub statistics: Option<Arc<Statistics>>,
    /// Optional opaque, user-defined payload attached to this file.
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    // NOTE(review): presumably a hint for how many trailing metadata bytes
    // (e.g. a Parquet footer) to prefetch — confirm against consumers.
    pub metadata_size_hint: Option<usize>,
}
121
122impl PartitionedFile {
123 pub fn new(path: impl Into<String>, size: u64) -> Self {
125 Self {
126 object_meta: ObjectMeta {
127 location: Path::from(path.into()),
128 last_modified: chrono::Utc.timestamp_nanos(0),
129 size,
130 e_tag: None,
131 version: None,
132 },
133 partition_values: vec![],
134 range: None,
135 statistics: None,
136 extensions: None,
137 metadata_size_hint: None,
138 }
139 }
140
141 pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
143 Self {
144 object_meta: ObjectMeta {
145 location: Path::from(path),
146 last_modified: chrono::Utc.timestamp_nanos(0),
147 size,
148 e_tag: None,
149 version: None,
150 },
151 partition_values: vec![],
152 range: Some(FileRange { start, end }),
153 statistics: None,
154 extensions: None,
155 metadata_size_hint: None,
156 }
157 .with_range(start, end)
158 }
159
160 pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
164 self.metadata_size_hint = Some(metadata_size_hint);
165 self
166 }
167
168 pub fn from_path(path: String) -> Result<Self> {
170 let size = std::fs::metadata(path.clone())?.len();
171 Ok(Self::new(path, size))
172 }
173
174 pub fn path(&self) -> &Path {
176 &self.object_meta.location
177 }
178
179 pub fn with_range(mut self, start: i64, end: i64) -> Self {
181 self.range = Some(FileRange { start, end });
182 self
183 }
184
185 pub fn with_extensions(
189 mut self,
190 extensions: Arc<dyn std::any::Any + Send + Sync>,
191 ) -> Self {
192 self.extensions = Some(extensions);
193 self
194 }
195
196 pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
198 self.statistics = Some(statistics);
199 self
200 }
201}
202
203impl From<ObjectMeta> for PartitionedFile {
204 fn from(object_meta: ObjectMeta) -> Self {
205 PartitionedFile {
206 object_meta,
207 partition_values: vec![],
208 range: None,
209 statistics: None,
210 extensions: None,
211 metadata_size_hint: None,
212 }
213 }
214}
215
/// Outcome of computing the effective byte range for a scan.
pub enum RangeCalculation {
    /// Scan the given byte range, or the whole file if `None`.
    Range(Option<Range<u64>>),
    /// The computed range is empty; the caller can skip reading entirely.
    TerminateEarly,
}
233
/// Calculates the effective byte range to read for `file_meta`.
///
/// If the file has no assigned [`FileRange`], returns `Range(None)` (read
/// the whole file). Otherwise both the start and the end of the assigned
/// range are shifted forward past the next line delimiter (`terminator`,
/// defaulting to `b'\n'`) so a partition never splits a line. If the
/// adjusted range is empty, returns [`RangeCalculation::TerminateEarly`].
///
/// # Errors
/// Returns an error if either range bound is negative (does not fit in
/// `u64`) or if scanning the object store for a delimiter fails.
pub async fn calculate_range(
    file_meta: &FileMeta,
    store: &Arc<dyn ObjectStore>,
    terminator: Option<u8>,
) -> Result<RangeCalculation> {
    let location = file_meta.location();
    let file_size = file_meta.object_meta.size;
    let newline = terminator.unwrap_or(b'\n');

    match file_meta.range {
        None => Ok(RangeCalculation::Range(None)),
        Some(FileRange { start, end }) => {
            let start: u64 = start.try_into().map_err(|_| {
                exec_datafusion_err!("Expect start range to fit in u64, got {start}")
            })?;
            let end: u64 = end.try_into().map_err(|_| {
                exec_datafusion_err!("Expect end range to fit in u64, got {end}")
            })?;

            // Scan from one byte before `start` so a delimiter exactly at
            // `start - 1` yields delta 0 (range begins right after it).
            // A partition starting at offset 0 needs no adjustment.
            let start_delta = if start != 0 {
                find_first_newline(store, location, start - 1, file_size, newline).await?
            } else {
                0
            };

            // Same adjustment for the end bound, unless the range already
            // ends at the end of the file.
            let end_delta = if end != file_size {
                find_first_newline(store, location, end - 1, file_size, newline).await?
            } else {
                0
            };

            let range = start + start_delta..end + end_delta;

            // Empty adjusted range: this partition contains no complete
            // lines, so there is nothing to read.
            if range.start == range.end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            Ok(RangeCalculation::Range(Some(range)))
        }
    }
}
285
286async fn find_first_newline(
298 object_store: &Arc<dyn ObjectStore>,
299 location: &Path,
300 start: u64,
301 end: u64,
302 newline: u8,
303) -> Result<u64> {
304 let options = GetOptions {
305 range: Some(GetRange::Bounded(start..end)),
306 ..Default::default()
307 };
308
309 let result = object_store.get_opts(location, options).await?;
310 let mut result_stream = result.into_stream();
311
312 let mut index = 0;
313
314 while let Some(chunk) = result_stream.next().await.transpose()? {
315 if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
316 let position = position as u64;
317 return Ok(index + position);
318 }
319
320 index += chunk.len() as u64;
321 }
322
323 Ok(index)
324}
325
326pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
366 let mut files = Vec::with_capacity(num_files);
367 if num_files == 0 {
368 return vec![];
369 }
370 let range_size = if overlap_factor == 0.0 {
371 100 / num_files as i64
372 } else {
373 (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
374 };
375
376 for i in 0..num_files {
377 let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
378 let min = base as f64;
379 let max = (base + range_size) as f64;
380
381 let file = PartitionedFile {
382 object_meta: ObjectMeta {
383 location: Path::from(format!("file_{i}.parquet")),
384 last_modified: chrono::Utc::now(),
385 size: 1000,
386 e_tag: None,
387 version: None,
388 },
389 partition_values: vec![],
390 range: None,
391 statistics: Some(Arc::new(Statistics {
392 num_rows: Precision::Exact(100),
393 total_byte_size: Precision::Exact(1000),
394 column_statistics: vec![ColumnStatistics {
395 null_count: Precision::Exact(0),
396 max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
397 min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
398 sum_value: Precision::Absent,
399 distinct_count: Precision::Absent,
400 }],
401 })),
402 extensions: None,
403 metadata_size_hint: None,
404 };
405 files.push(file);
406 }
407
408 vec![FileGroup::new(files)]
409}
410
411pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
414 for group in file_groups {
415 let files = group.iter().collect::<Vec<_>>();
416 for i in 1..files.len() {
417 let prev_file = files[i - 1];
418 let curr_file = files[i];
419
420 if let (Some(prev_stats), Some(curr_stats)) =
422 (&prev_file.statistics, &curr_file.statistics)
423 {
424 let prev_max = &prev_stats.column_statistics[0].max_value;
425 let curr_min = &curr_stats.column_statistics[0].min_value;
426 if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
427 return false;
428 }
429 }
430 }
431 }
432 true
433}
434
#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, ops::Not, sync::Arc};
    use url::Url;

    /// Builds a one-column batch (`i`: Int32, nullable) containing 0..sz.
    pub fn make_partition(sz: i32) -> RecordBatch {
        let seq_start = 0;
        let seq_end = sz;
        let values = (seq_start..seq_end).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let arr = Arc::new(Int32Array::from(values));

        RecordBatch::try_new(schema, vec![arr as ArrayRef]).unwrap()
    }

    /// Returns a thirteen-column schema (c1..c13) of assorted primitive
    /// types; `c1` additionally carries a `testing -> test` metadata entry.
    pub fn aggr_test_schema() -> SchemaRef {
        let mut f1 = Field::new("c1", DataType::Utf8, false);
        f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
        let schema = Schema::new(vec![
            f1,
            Field::new("c2", DataType::UInt32, false),
            Field::new("c3", DataType::Int8, false),
            Field::new("c4", DataType::Int16, false),
            Field::new("c5", DataType::Int32, false),
            Field::new("c6", DataType::Int64, false),
            Field::new("c7", DataType::UInt8, false),
            Field::new("c8", DataType::UInt16, false),
            Field::new("c9", DataType::UInt32, false),
            Field::new("c10", DataType::UInt64, false),
            Field::new("c11", DataType::Float32, false),
            Field::new("c12", DataType::Float64, false),
            Field::new("c13", DataType::Utf8, false),
        ]);

        Arc::new(schema)
    }

    // The object-store URL of a listing URL is its scheme + authority root.
    #[test]
    fn test_object_store_listing_url() {
        let listing = ListingTableUrl::parse("file:///").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "file:///");

        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "s3://bucket/");
    }

    // A store registered under an hdfs:// authority is found again when
    // looking up a listing URL with the same authority.
    #[test]
    fn test_get_store_hdfs() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("hdfs://localhost:8020").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // Same round-trip for an s3:// bucket URL.
    #[test]
    fn test_get_store_s3() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("s3://bucket/key").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // file:// URLs resolve without any explicit registration.
    #[test]
    fn test_get_store_file() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // Relative filesystem paths also resolve to the default local store.
    #[test]
    fn test_get_store_local() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("../").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // Exercises `ListingTableUrl::contains` with and without partition-aware
    // matching (second argument): direct children match either way, plain
    // subfolders only match non-partition-aware, and `key=value` partition
    // folders match in both modes.
    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();

        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            true
        ));

        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            false
        ));

        assert!(url
            .contains(
                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
                true
            )
            .not());

        assert!(url.contains(
            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
            false
        ));

        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            false
        ));

        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            true
        ));

        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));

        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
    }
}