1#![doc(
19 html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
20 html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
21)]
22#![cfg_attr(docsrs, feature(doc_auto_cfg))]
23#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
26
27pub mod decoder;
31pub mod display;
32pub mod file;
33pub mod file_compression_type;
34pub mod file_format;
35pub mod file_groups;
36pub mod file_meta;
37pub mod file_scan_config;
38pub mod file_sink_config;
39pub mod file_stream;
40pub mod memory;
41pub mod schema_adapter;
42pub mod sink;
43pub mod source;
44mod statistics;
45
46#[cfg(test)]
47pub mod test_util;
48
49pub mod url;
50pub mod write;
51pub use self::file::as_file_source;
52pub use self::url::ListingTableUrl;
53use crate::file_groups::FileGroup;
54use chrono::TimeZone;
55use datafusion_common::stats::Precision;
56use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result};
57use datafusion_common::{ScalarValue, Statistics};
58use file_meta::FileMeta;
59use futures::{Stream, StreamExt};
60use object_store::{path::Path, ObjectMeta};
61use object_store::{GetOptions, GetRange, ObjectStore};
62#[allow(deprecated)]
64pub use statistics::add_row_stats;
65pub use statistics::compute_all_files_statistics;
66use std::ops::Range;
67use std::pin::Pin;
68use std::sync::Arc;
69
70pub type PartitionedFileStream =
72 Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
73
/// A half-open interval `[start, end)` of offsets inside a file.
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// First offset that belongs to the range (inclusive).
    pub start: i64,
    /// First offset past the range (exclusive).
    pub end: i64,
}

impl FileRange {
    /// Returns `true` when `offset` lies in `[start, end)`.
    ///
    /// An empty or inverted range (`start >= end`) contains no offset.
    pub fn contains(&self, offset: i64) -> bool {
        (self.start..self.end).contains(&offset)
    }
}
91
92#[derive(Debug, Clone)]
93pub struct PartitionedFile {
96 pub object_meta: ObjectMeta,
98 pub partition_values: Vec<ScalarValue>,
109 pub range: Option<FileRange>,
111 pub statistics: Option<Arc<Statistics>>,
116 pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
118 pub metadata_size_hint: Option<usize>,
120}
121
122impl PartitionedFile {
123 pub fn new(path: impl Into<String>, size: u64) -> Self {
125 Self {
126 object_meta: ObjectMeta {
127 location: Path::from(path.into()),
128 last_modified: chrono::Utc.timestamp_nanos(0),
129 size,
130 e_tag: None,
131 version: None,
132 },
133 partition_values: vec![],
134 range: None,
135 statistics: None,
136 extensions: None,
137 metadata_size_hint: None,
138 }
139 }
140
141 pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
143 Self {
144 object_meta: ObjectMeta {
145 location: Path::from(path),
146 last_modified: chrono::Utc.timestamp_nanos(0),
147 size,
148 e_tag: None,
149 version: None,
150 },
151 partition_values: vec![],
152 range: Some(FileRange { start, end }),
153 statistics: None,
154 extensions: None,
155 metadata_size_hint: None,
156 }
157 .with_range(start, end)
158 }
159
160 pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
164 self.metadata_size_hint = Some(metadata_size_hint);
165 self
166 }
167
168 pub fn from_path(path: String) -> Result<Self> {
170 let size = std::fs::metadata(path.clone())?.len();
171 Ok(Self::new(path, size))
172 }
173
174 pub fn path(&self) -> &Path {
176 &self.object_meta.location
177 }
178
179 pub fn with_range(mut self, start: i64, end: i64) -> Self {
181 self.range = Some(FileRange { start, end });
182 self
183 }
184
185 pub fn with_extensions(
189 mut self,
190 extensions: Arc<dyn std::any::Any + Send + Sync>,
191 ) -> Self {
192 self.extensions = Some(extensions);
193 self
194 }
195
196 pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
198 self.statistics = Some(statistics);
199 self
200 }
201
202 pub fn has_statistics(&self) -> bool {
206 if let Some(stats) = &self.statistics {
207 stats.column_statistics.iter().any(|col_stats| {
208 col_stats.null_count != Precision::Absent
209 || col_stats.max_value != Precision::Absent
210 || col_stats.min_value != Precision::Absent
211 || col_stats.sum_value != Precision::Absent
212 || col_stats.distinct_count != Precision::Absent
213 })
214 } else {
215 false
216 }
217 }
218}
219
220impl From<ObjectMeta> for PartitionedFile {
221 fn from(object_meta: ObjectMeta) -> Self {
222 PartitionedFile {
223 object_meta,
224 partition_values: vec![],
225 range: None,
226 statistics: None,
227 extensions: None,
228 metadata_size_hint: None,
229 }
230 }
231}
232
/// Outcome of [`calculate_range`].
pub enum RangeCalculation {
    /// The byte range to read. `None` is produced when the file has no
    /// range restriction.
    Range(Option<Range<u64>>),
    /// The adjusted range turned out to be empty, so there is nothing to
    /// read.
    TerminateEarly,
}
250
251pub async fn calculate_range(
262 file_meta: &FileMeta,
263 store: &Arc<dyn ObjectStore>,
264 terminator: Option<u8>,
265) -> Result<RangeCalculation> {
266 let location = file_meta.location();
267 let file_size = file_meta.object_meta.size;
268 let newline = terminator.unwrap_or(b'\n');
269
270 match file_meta.range {
271 None => Ok(RangeCalculation::Range(None)),
272 Some(FileRange { start, end }) => {
273 let start: u64 = start.try_into().map_err(|_| {
274 exec_datafusion_err!("Expect start range to fit in u64, got {start}")
275 })?;
276 let end: u64 = end.try_into().map_err(|_| {
277 exec_datafusion_err!("Expect end range to fit in u64, got {end}")
278 })?;
279
280 let start_delta = if start != 0 {
281 find_first_newline(store, location, start - 1, file_size, newline).await?
282 } else {
283 0
284 };
285
286 let end_delta = if end != file_size {
287 find_first_newline(store, location, end - 1, file_size, newline).await?
288 } else {
289 0
290 };
291
292 let range = start + start_delta..end + end_delta;
293
294 if range.start == range.end {
295 return Ok(RangeCalculation::TerminateEarly);
296 }
297
298 Ok(RangeCalculation::Range(Some(range)))
299 }
300 }
301}
302
303async fn find_first_newline(
315 object_store: &Arc<dyn ObjectStore>,
316 location: &Path,
317 start: u64,
318 end: u64,
319 newline: u8,
320) -> Result<u64> {
321 let options = GetOptions {
322 range: Some(GetRange::Bounded(start..end)),
323 ..Default::default()
324 };
325
326 let result = object_store.get_opts(location, options).await?;
327 let mut result_stream = result.into_stream();
328
329 let mut index = 0;
330
331 while let Some(chunk) = result_stream.next().await.transpose()? {
332 if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
333 let position = position as u64;
334 return Ok(index + position);
335 }
336
337 index += chunk.len() as u64;
338 }
339
340 Ok(index)
341}
342
343pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
383 let mut files = Vec::with_capacity(num_files);
384 if num_files == 0 {
385 return vec![];
386 }
387 let range_size = if overlap_factor == 0.0 {
388 100 / num_files as i64
389 } else {
390 (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
391 };
392
393 for i in 0..num_files {
394 let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
395 let min = base as f64;
396 let max = (base + range_size) as f64;
397
398 let file = PartitionedFile {
399 object_meta: ObjectMeta {
400 location: Path::from(format!("file_{i}.parquet")),
401 last_modified: chrono::Utc::now(),
402 size: 1000,
403 e_tag: None,
404 version: None,
405 },
406 partition_values: vec![],
407 range: None,
408 statistics: Some(Arc::new(Statistics {
409 num_rows: Precision::Exact(100),
410 total_byte_size: Precision::Exact(1000),
411 column_statistics: vec![ColumnStatistics {
412 null_count: Precision::Exact(0),
413 max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
414 min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
415 sum_value: Precision::Absent,
416 distinct_count: Precision::Absent,
417 }],
418 })),
419 extensions: None,
420 metadata_size_hint: None,
421 };
422 files.push(file);
423 }
424
425 vec![FileGroup::new(files)]
426}
427
428pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
431 for group in file_groups {
432 let files = group.iter().collect::<Vec<_>>();
433 for i in 1..files.len() {
434 let prev_file = files[i - 1];
435 let curr_file = files[i];
436
437 if let (Some(prev_stats), Some(curr_stats)) =
439 (&prev_file.statistics, &curr_file.statistics)
440 {
441 let prev_max = &prev_stats.column_statistics[0].max_value;
442 let curr_min = &curr_stats.column_statistics[0].min_value;
443 if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
444 return false;
445 }
446 }
447 }
448 }
449 true
450}
451
#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, sync::Arc};
    use url::Url;

    /// Returns a batch with one nullable `Int32` column `i` holding `0..sz`.
    pub fn make_partition(sz: i32) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let column: ArrayRef = Arc::new(Int32Array::from((0..sz).collect::<Vec<_>>()));

        RecordBatch::try_new(schema, vec![column]).unwrap()
    }

    /// Returns a 13-column, all non-nullable schema (`c1`..`c13`); `c1`
    /// carries the metadata entry `testing = test`.
    pub fn aggr_test_schema() -> SchemaRef {
        let mut first = Field::new("c1", DataType::Utf8, false);
        first.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));

        let remaining = [
            ("c2", DataType::UInt32),
            ("c3", DataType::Int8),
            ("c4", DataType::Int16),
            ("c5", DataType::Int32),
            ("c6", DataType::Int64),
            ("c7", DataType::UInt8),
            ("c8", DataType::UInt16),
            ("c9", DataType::UInt32),
            ("c10", DataType::UInt64),
            ("c11", DataType::Float32),
            ("c12", DataType::Float64),
            ("c13", DataType::Utf8),
        ];

        let mut fields = vec![first];
        fields.extend(
            remaining
                .into_iter()
                .map(|(name, data_type)| Field::new(name, data_type, false)),
        );

        Arc::new(Schema::new(fields))
    }

    /// The object store URL of a root-level listing URL equals the input.
    #[test]
    fn test_object_store_listing_url() {
        for raw in ["file:///", "s3://bucket/"] {
            let listing = ListingTableUrl::parse(raw).unwrap();
            assert_eq!(listing.object_store().as_str(), raw);
        }
    }

    /// A store registered for an `hdfs://` authority resolves for a key
    /// under it.
    #[test]
    fn test_get_store_hdfs() {
        let registry = DefaultObjectStoreRegistry::default();
        let base = Url::parse("hdfs://localhost:8020").unwrap();
        registry.register_store(&base, Arc::new(LocalFileSystem::new()));

        let table_url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    /// A store registered for an `s3://` URL resolves for the same URL.
    #[test]
    fn test_get_store_s3() {
        let registry = DefaultObjectStoreRegistry::default();
        let base = Url::parse("s3://bucket/key").unwrap();
        registry.register_store(&base, Arc::new(LocalFileSystem::new()));

        let table_url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    /// `file://` URLs resolve without registering anything.
    #[test]
    fn test_get_store_file() {
        let registry = DefaultObjectStoreRegistry::default();
        let table_url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    /// Relative local paths resolve without registering anything.
    #[test]
    fn test_get_store_local() {
        let registry = DefaultObjectStoreRegistry::default();
        let table_url = ListingTableUrl::parse("../").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    /// Exercises `ListingTableUrl::contains` with the subdirectory flag both
    /// set and unset.
    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();

        // (path, ignore_subdirectory, expected result)
        let cases = [
            ("/var/data/mytable/data.parquet", true, true),
            ("/var/data/mytable/data.parquet", false, true),
            ("/var/data/mytable/mysubfolder/data.parquet", true, false),
            ("/var/data/mytable/mysubfolder/data.parquet", false, true),
            ("/var/data/mytable/year=2024/data.parquet", false, true),
            ("/var/data/mytable/year=2024/data.parquet", true, true),
            ("/var/data/mytable/", true, true),
            ("/var/data/mytable/", false, true),
        ];

        for (raw_path, ignore_subdirectory, expected) in cases {
            let path = Path::parse(raw_path).unwrap();
            assert_eq!(url.contains(&path, ignore_subdirectory), expected);
        }
    }
}