1#![doc(
19 html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
20 html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
21)]
22#![cfg_attr(docsrs, feature(doc_auto_cfg))]
23#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
26
27pub mod decoder;
31pub mod display;
32pub mod file;
33pub mod file_compression_type;
34pub mod file_format;
35pub mod file_groups;
36pub mod file_meta;
37pub mod file_scan_config;
38pub mod file_sink_config;
39pub mod file_stream;
40pub mod memory;
41pub mod schema_adapter;
42pub mod sink;
43pub mod source;
44mod statistics;
45
46#[cfg(test)]
47pub mod test_util;
48
49pub mod url;
50pub mod write;
51pub use self::url::ListingTableUrl;
52use crate::file_groups::FileGroup;
53use chrono::TimeZone;
54use datafusion_common::stats::Precision;
55use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result};
56use datafusion_common::{ScalarValue, Statistics};
57use file_meta::FileMeta;
58use futures::{Stream, StreamExt};
59use object_store::{path::Path, ObjectMeta};
60use object_store::{GetOptions, GetRange, ObjectStore};
61#[allow(deprecated)]
63pub use statistics::add_row_stats;
64pub use statistics::compute_all_files_statistics;
65use std::ops::Range;
66use std::pin::Pin;
67use std::sync::Arc;
68
/// A pinned, boxed, thread-safe stream yielding [`PartitionedFile`] results,
/// e.g. produced while listing the files of a table.
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
72
/// A half-open byte range `[start, end)` within a file, used to split a
/// single file across multiple scan partitions.
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// Inclusive start byte offset
    pub start: i64,
    /// Exclusive end byte offset
    pub end: i64,
}

impl FileRange {
    /// Returns true if `offset` lies within `[start, end)`.
    pub fn contains(&self, offset: i64) -> bool {
        (self.start..self.end).contains(&offset)
    }
}
90
/// A single file (or byte range of a file) that takes part in a scan,
/// together with the values of its partition columns and optional
/// per-file metadata.
#[derive(Debug, Clone)]
pub struct PartitionedFile {
    /// Object store metadata for the file (location, size, last modified, ...)
    pub object_meta: ObjectMeta,
    /// Values of the partition columns this file belongs to
    /// (presumably in the table's partition-column order — confirm at call sites)
    pub partition_values: Vec<ScalarValue>,
    /// Optional byte sub-range of the file to scan; `None` means the whole file
    pub range: Option<FileRange>,
    /// Optional pre-computed statistics for this file
    pub statistics: Option<Arc<Statistics>>,
    /// Optional opaque user-defined extensions attached to this file
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// Optional hint for how many trailing bytes of metadata to prefetch
    /// (consumer semantics depend on the file format implementation)
    pub metadata_size_hint: Option<usize>,
}
120
121impl PartitionedFile {
122 pub fn new(path: impl Into<String>, size: u64) -> Self {
124 Self {
125 object_meta: ObjectMeta {
126 location: Path::from(path.into()),
127 last_modified: chrono::Utc.timestamp_nanos(0),
128 size,
129 e_tag: None,
130 version: None,
131 },
132 partition_values: vec![],
133 range: None,
134 statistics: None,
135 extensions: None,
136 metadata_size_hint: None,
137 }
138 }
139
140 pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
142 Self {
143 object_meta: ObjectMeta {
144 location: Path::from(path),
145 last_modified: chrono::Utc.timestamp_nanos(0),
146 size,
147 e_tag: None,
148 version: None,
149 },
150 partition_values: vec![],
151 range: Some(FileRange { start, end }),
152 statistics: None,
153 extensions: None,
154 metadata_size_hint: None,
155 }
156 .with_range(start, end)
157 }
158
159 pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
163 self.metadata_size_hint = Some(metadata_size_hint);
164 self
165 }
166
167 pub fn from_path(path: String) -> Result<Self> {
169 let size = std::fs::metadata(path.clone())?.len();
170 Ok(Self::new(path, size))
171 }
172
173 pub fn path(&self) -> &Path {
175 &self.object_meta.location
176 }
177
178 pub fn with_range(mut self, start: i64, end: i64) -> Self {
180 self.range = Some(FileRange { start, end });
181 self
182 }
183
184 pub fn with_extensions(
188 mut self,
189 extensions: Arc<dyn std::any::Any + Send + Sync>,
190 ) -> Self {
191 self.extensions = Some(extensions);
192 self
193 }
194
195 pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
197 self.statistics = Some(statistics);
198 self
199 }
200}
201
202impl From<ObjectMeta> for PartitionedFile {
203 fn from(object_meta: ObjectMeta) -> Self {
204 PartitionedFile {
205 object_meta,
206 partition_values: vec![],
207 range: None,
208 statistics: None,
209 extensions: None,
210 metadata_size_hint: None,
211 }
212 }
213}
214
/// Outcome of [`calculate_range`]: either a byte range to scan, or an
/// indication that this partition has no data to process.
pub enum RangeCalculation {
    // `None` means "scan the whole file"; `Some(range)` is the adjusted
    // half-open byte range for this partition.
    Range(Option<Range<u64>>),
    // The adjusted range came out empty, so the scan can stop immediately.
    TerminateEarly,
}
232
/// Compute the effective byte range of `file_meta` to scan, shifting the
/// assigned `[start, end)` split boundaries forward so they fall on record
/// (`terminator`, default `\n`) boundaries rather than mid-record.
///
/// Both `start` and `end` (when not at a file edge) are advanced past the
/// first terminator found at or after `boundary - 1`, so that adjacent
/// splits each process whole records exactly once.
///
/// Returns [`RangeCalculation::TerminateEarly`] when the adjusted range is
/// empty, and [`RangeCalculation::Range(None)`] when the file has no
/// assigned range (scan everything).
///
/// # Errors
/// Returns an error if a range endpoint does not fit in `u64`, or if the
/// object store reads used to locate terminators fail.
pub async fn calculate_range(
    file_meta: &FileMeta,
    store: &Arc<dyn ObjectStore>,
    terminator: Option<u8>,
) -> Result<RangeCalculation> {
    let location = file_meta.location();
    let file_size = file_meta.object_meta.size;
    let newline = terminator.unwrap_or(b'\n');

    match file_meta.range {
        // No assigned range: scan the whole file.
        None => Ok(RangeCalculation::Range(None)),
        Some(FileRange { start, end }) => {
            // Ranges are stored as i64; reject negative / out-of-range values.
            let start: u64 = start.try_into().map_err(|_| {
                exec_datafusion_err!("Expect start range to fit in u64, got {start}")
            })?;
            let end: u64 = end.try_into().map_err(|_| {
                exec_datafusion_err!("Expect end range to fit in u64, got {end}")
            })?;

            // A non-zero start is moved forward past the first terminator at
            // or after `start - 1` (the previous split owns that record).
            let start_delta = if start != 0 {
                find_first_newline(store, location, start - 1, file_size, newline).await?
            } else {
                0
            };

            // Likewise extend `end` so this split finishes the record it
            // started; an end at the file size needs no adjustment.
            let end_delta = if end != file_size {
                find_first_newline(store, location, end - 1, file_size, newline).await?
            } else {
                0
            };

            let range = start + start_delta..end + end_delta;

            // After adjustment the split may own no bytes at all.
            if range.start == range.end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            Ok(RangeCalculation::Range(Some(range)))
        }
    }
}
284
285async fn find_first_newline(
297 object_store: &Arc<dyn ObjectStore>,
298 location: &Path,
299 start: u64,
300 end: u64,
301 newline: u8,
302) -> Result<u64> {
303 let options = GetOptions {
304 range: Some(GetRange::Bounded(start..end)),
305 ..Default::default()
306 };
307
308 let result = object_store.get_opts(location, options).await?;
309 let mut result_stream = result.into_stream();
310
311 let mut index = 0;
312
313 while let Some(chunk) = result_stream.next().await.transpose()? {
314 if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
315 let position = position as u64;
316 return Ok(index + position);
317 }
318
319 index += chunk.len() as u64;
320 }
321
322 Ok(index)
323}
324
325pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
365 let mut files = Vec::with_capacity(num_files);
366 if num_files == 0 {
367 return vec![];
368 }
369 let range_size = if overlap_factor == 0.0 {
370 100 / num_files as i64
371 } else {
372 (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
373 };
374
375 for i in 0..num_files {
376 let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
377 let min = base as f64;
378 let max = (base + range_size) as f64;
379
380 let file = PartitionedFile {
381 object_meta: ObjectMeta {
382 location: Path::from(format!("file_{}.parquet", i)),
383 last_modified: chrono::Utc::now(),
384 size: 1000,
385 e_tag: None,
386 version: None,
387 },
388 partition_values: vec![],
389 range: None,
390 statistics: Some(Arc::new(Statistics {
391 num_rows: Precision::Exact(100),
392 total_byte_size: Precision::Exact(1000),
393 column_statistics: vec![ColumnStatistics {
394 null_count: Precision::Exact(0),
395 max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
396 min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
397 sum_value: Precision::Absent,
398 distinct_count: Precision::Absent,
399 }],
400 })),
401 extensions: None,
402 metadata_size_hint: None,
403 };
404 files.push(file);
405 }
406
407 vec![FileGroup::new(files)]
408}
409
410pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
413 for group in file_groups {
414 let files = group.iter().collect::<Vec<_>>();
415 for i in 1..files.len() {
416 let prev_file = files[i - 1];
417 let curr_file = files[i];
418
419 if let (Some(prev_stats), Some(curr_stats)) =
421 (&prev_file.statistics, &curr_file.statistics)
422 {
423 let prev_max = &prev_stats.column_statistics[0].max_value;
424 let curr_min = &curr_stats.column_statistics[0].min_value;
425 if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
426 return false;
427 }
428 }
429 }
430 }
431 true
432}
433
#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, ops::Not, sync::Arc};
    use url::Url;

    /// Build a single-column batch (`i: Int32`, nullable) holding `0..sz`.
    pub fn make_partition(sz: i32) -> RecordBatch {
        let seq_start = 0;
        let seq_end = sz;
        let values = (seq_start..seq_end).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let arr = Arc::new(Int32Array::from(values));

        RecordBatch::try_new(schema, vec![arr as ArrayRef]).unwrap()
    }

    /// Schema for aggregate test data (`c1`..`c13`); `c1` carries
    /// field-level metadata to exercise metadata handling.
    pub fn aggr_test_schema() -> SchemaRef {
        let mut f1 = Field::new("c1", DataType::Utf8, false);
        f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
        let schema = Schema::new(vec![
            f1,
            Field::new("c2", DataType::UInt32, false),
            Field::new("c3", DataType::Int8, false),
            Field::new("c4", DataType::Int16, false),
            Field::new("c5", DataType::Int32, false),
            Field::new("c6", DataType::Int64, false),
            Field::new("c7", DataType::UInt8, false),
            Field::new("c8", DataType::UInt16, false),
            Field::new("c9", DataType::UInt32, false),
            Field::new("c10", DataType::UInt64, false),
            Field::new("c11", DataType::Float32, false),
            Field::new("c12", DataType::Float64, false),
            Field::new("c13", DataType::Utf8, false),
        ]);

        Arc::new(schema)
    }

    // `ListingTableUrl::object_store` should report the store URL prefix
    // (scheme + authority) for both local and remote schemes.
    #[test]
    fn test_object_store_listing_url() {
        let listing = ListingTableUrl::parse("file:///").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "file:///");

        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "s3://bucket/");
    }

    // A store registered under an hdfs:// URL is found again when looking
    // up a ListingTableUrl with the same scheme and authority.
    #[test]
    fn test_get_store_hdfs() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("hdfs://localhost:8020").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // Same round-trip for an s3:// URL.
    #[test]
    fn test_get_store_s3() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("s3://bucket/key").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // file:// URLs resolve without any explicit registration.
    #[test]
    fn test_get_store_file() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // Relative paths also resolve to the default local store.
    #[test]
    fn test_get_store_local() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("../").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // `contains` semantics: the boolean flag controls whether paths in
    // subdirectories that are not partition segments are accepted.
    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();

        // A direct child is contained regardless of the flag.
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            true
        ));

        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            false
        ));

        // A plain (non `key=value`) subfolder is rejected in strict mode...
        assert!(url
            .contains(
                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
                true
            )
            .not());

        // ...but accepted in lenient mode.
        assert!(url.contains(
            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
            false
        ));

        // Hive-style partition segments (`year=2024`) pass in both modes.
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            false
        ));

        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            true
        ));

        // The table root itself is contained in both modes.
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));

        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
    }
}