1#![doc(
19 html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
20 html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
21)]
22#![cfg_attr(docsrs, feature(doc_cfg))]
23#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
26#![cfg_attr(test, allow(clippy::needless_pass_by_value))]
27
28pub mod decoder;
32pub mod display;
33pub mod file;
34pub mod file_compression_type;
35pub mod file_format;
36pub mod file_groups;
37pub mod file_scan_config;
38pub mod file_sink_config;
39pub mod file_stream;
40pub mod memory;
41pub mod morsel;
42pub mod projection;
43pub mod schema_adapter;
44pub mod sink;
45pub mod source;
46mod statistics;
47pub mod table_schema;
48
49#[cfg(test)]
50pub mod test_util;
51
52pub mod url;
53pub mod write;
54pub use self::file::as_file_source;
55pub use self::url::ListingTableUrl;
56use crate::file_groups::FileGroup;
57use chrono::TimeZone;
58use datafusion_common::stats::Precision;
59use datafusion_common::{ColumnStatistics, Result, TableReference, exec_datafusion_err};
60use datafusion_common::{ScalarValue, Statistics};
61use datafusion_physical_expr::LexOrdering;
62use futures::{Stream, StreamExt};
63use object_store::{GetOptions, GetRange, ObjectStore};
64use object_store::{ObjectMeta, path::Path};
65pub use table_schema::TableSchema;
66#[expect(deprecated)]
68pub use statistics::add_row_stats;
69pub use statistics::compute_all_files_statistics;
70use std::any::Any;
71use std::ops::Range;
72use std::pin::Pin;
73use std::sync::Arc;
74
/// Type-keyed container of extension values attached to a [`PartitionedFile`].
///
/// This is an alias for [`datafusion_common::extensions::Extensions`] and is the
/// type of the [`PartitionedFile::extensions`] field.
pub type FileExtensions = datafusion_common::extensions::Extensions;
81
/// A pinned, boxed [`Stream`] whose items are `Result<PartitionedFile>`.
///
/// The trait object is additionally required to be `Send + Sync + 'static`.
/// The alias is deprecated; see the deprecation note below.
#[deprecated(
    since = "54.0.0",
    note = "This type is unused and will be removed in a future release"
)]
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
89
/// A half-open range `[start, end)` of offsets inside a file.
///
/// [`FileRange::contains`] treats `start` as inclusive and `end` as exclusive.
/// [`calculate_range`] converts both offsets to `u64` (rejecting negative
/// values) and uses them to build the byte ranges it requests from the object
/// store.
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// Inclusive start offset of the range.
    pub start: i64,
    /// Exclusive end offset of the range.
    pub end: i64,
}
100
101impl FileRange {
102 pub fn contains(&self, offset: i64) -> bool {
104 offset >= self.start && offset < self.end
105 }
106}
107
/// A single file, or a sub-range of one, together with optional per-file
/// information such as partition values, statistics and ordering.
///
/// Instances are usually created with [`PartitionedFile::new`],
/// [`PartitionedFile::new_from_meta`] or `From<ObjectMeta>` and then refined
/// with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct PartitionedFile {
    /// Object store metadata of the file (location, size, modification time,
    /// e-tag and version).
    pub object_meta: ObjectMeta,
    /// Values of the partition columns for this file.
    ///
    /// [`Self::with_statistics`] appends one exact column statistic per entry,
    /// in this order, after the statistics it is given.
    pub partition_values: Vec<ScalarValue>,
    /// Optional sub-range of the file to read. `None` means the whole file
    /// (see [`Self::range`] and [`Self::effective_size`]).
    pub range: Option<FileRange>,
    /// Optional statistics for this file. When set through
    /// [`Self::with_statistics`] and partition values are present, the column
    /// statistics also include the partition columns.
    pub statistics: Option<Arc<Statistics>>,
    /// Optional ordering attached to this file via [`Self::with_ordering`].
    /// NOTE(review): presumably the known sort order of the file's rows —
    /// confirm against the consumers of this field.
    pub ordering: Option<LexOrdering>,
    /// Extension values attached to this file, keyed by their concrete type
    /// (see [`Self::with_extension`] and [`Self::extension`]).
    pub extensions: FileExtensions,
    /// Optional size hint set via [`Self::with_metadata_size_hint`].
    /// NOTE(review): presumably the number of bytes a reader should fetch to
    /// obtain format metadata (e.g. a footer) — confirm against the readers.
    pub metadata_size_hint: Option<usize>,
    /// Optional table reference set via [`Self::with_table_reference`].
    /// NOTE(review): presumably identifies the table this file belongs to —
    /// confirm against callers.
    pub table_reference: Option<TableReference>,
}
167
168impl PartitionedFile {
169 pub fn new(path: impl Into<String>, size: u64) -> Self {
171 Self {
172 object_meta: ObjectMeta {
173 location: Path::from(path.into()),
174 last_modified: chrono::Utc.timestamp_nanos(0),
175 size,
176 e_tag: None,
177 version: None,
178 },
179 partition_values: vec![],
180 range: None,
181 statistics: None,
182 ordering: None,
183 extensions: FileExtensions::new(),
184 metadata_size_hint: None,
185 table_reference: None,
186 }
187 }
188
189 pub fn new_from_meta(object_meta: ObjectMeta) -> Self {
191 Self {
192 object_meta,
193 partition_values: vec![],
194 range: None,
195 statistics: None,
196 ordering: None,
197 extensions: FileExtensions::new(),
198 metadata_size_hint: None,
199 table_reference: None,
200 }
201 }
202
203 pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
205 Self {
206 object_meta: ObjectMeta {
207 location: Path::from(path),
208 last_modified: chrono::Utc.timestamp_nanos(0),
209 size,
210 e_tag: None,
211 version: None,
212 },
213 partition_values: vec![],
214 range: Some(FileRange { start, end }),
215 statistics: None,
216 ordering: None,
217 extensions: FileExtensions::new(),
218 metadata_size_hint: None,
219 table_reference: None,
220 }
221 .with_range(start, end)
222 }
223
224 pub fn with_partition_values(mut self, partition_values: Vec<ScalarValue>) -> Self {
227 self.partition_values = partition_values;
228 self
229 }
230
231 pub fn with_table_reference(
232 mut self,
233 table_reference: Option<TableReference>,
234 ) -> Self {
235 self.table_reference = table_reference;
236 self
237 }
238
239 pub fn effective_size(&self) -> u64 {
241 if let Some(range) = &self.range {
242 (range.end - range.start) as u64
243 } else {
244 self.object_meta.size
245 }
246 }
247
248 pub fn range(&self) -> (u64, u64) {
250 if let Some(range) = &self.range {
251 (range.start as u64, range.end as u64)
252 } else {
253 (0, self.object_meta.size)
254 }
255 }
256
257 pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
261 self.metadata_size_hint = Some(metadata_size_hint);
262 self
263 }
264
265 pub fn from_path(path: String) -> Result<Self> {
267 let size = std::fs::metadata(path.clone())?.len();
268 Ok(Self::new(path, size))
269 }
270
271 pub fn path(&self) -> &Path {
273 &self.object_meta.location
274 }
275
276 pub fn with_range(mut self, start: i64, end: i64) -> Self {
278 self.range = Some(FileRange { start, end });
279 self
280 }
281
282 pub fn with_extension<T: Any + Send + Sync>(mut self, value: T) -> Self {
290 self.extensions.insert(value);
291 self
292 }
293
294 pub fn extension<T: Any + Send + Sync>(&self) -> Option<&T> {
296 self.extensions.get::<T>()
297 }
298
299 #[deprecated(
304 since = "54.0.0",
305 note = "use `with_extension`; the extension is keyed by its concrete type"
306 )]
307 pub fn with_extensions(mut self, extensions: Arc<dyn Any + Send + Sync>) -> Self {
308 #[expect(deprecated)]
309 self.extensions.insert_dyn(extensions);
310 self
311 }
312
313 pub fn with_statistics(mut self, file_statistics: Arc<Statistics>) -> Self {
322 if self.partition_values.is_empty() {
323 self.statistics = Some(file_statistics);
325 } else {
326 let mut stats = Arc::unwrap_or_clone(file_statistics);
328 for partition_value in &self.partition_values {
329 let col_stats = ColumnStatistics {
330 null_count: Precision::Exact(0),
331 max_value: Precision::Exact(partition_value.clone()),
332 min_value: Precision::Exact(partition_value.clone()),
333 distinct_count: Precision::Exact(1),
334 sum_value: Precision::Absent,
335 byte_size: partition_value
336 .data_type()
337 .primitive_width()
338 .map(|w| stats.num_rows.multiply(&Precision::Exact(w)))
339 .unwrap_or_else(|| Precision::Absent),
340 };
341 stats.column_statistics.push(col_stats);
342 }
343 self.statistics = Some(Arc::new(stats));
344 }
345 self
346 }
347
348 pub fn has_statistics(&self) -> bool {
352 if let Some(stats) = &self.statistics {
353 stats.column_statistics.iter().any(|col_stats| {
354 col_stats.null_count != Precision::Absent
355 || col_stats.max_value != Precision::Absent
356 || col_stats.min_value != Precision::Absent
357 || col_stats.sum_value != Precision::Absent
358 || col_stats.distinct_count != Precision::Absent
359 })
360 } else {
361 false
362 }
363 }
364
365 pub fn with_ordering(mut self, ordering: Option<LexOrdering>) -> Self {
370 self.ordering = ordering;
371 self
372 }
373}
374
375impl From<ObjectMeta> for PartitionedFile {
376 fn from(object_meta: ObjectMeta) -> Self {
377 PartitionedFile {
378 object_meta,
379 partition_values: vec![],
380 range: None,
381 statistics: None,
382 ordering: None,
383 extensions: FileExtensions::new(),
384 metadata_size_hint: None,
385 table_reference: None,
386 }
387 }
388}
389
/// Outcome of [`calculate_range`].
pub enum RangeCalculation {
    /// The byte range to read.
    ///
    /// `None` is produced when the file has no [`FileRange`], i.e. the whole
    /// file is to be read. `Some` carries the configured range after its
    /// boundaries have been shifted to line terminators.
    Range(Option<Range<u64>>),
    /// The shifted range is empty (or its start moved past its end), so there
    /// is nothing to read for this file range.
    TerminateEarly,
}
407
408pub async fn calculate_range(
419 file: &PartitionedFile,
420 store: &Arc<dyn ObjectStore>,
421 terminator: Option<u8>,
422) -> Result<RangeCalculation> {
423 let location = &file.object_meta.location;
424 let file_size = file.object_meta.size;
425 let newline = terminator.unwrap_or(b'\n');
426
427 match file.range {
428 None => Ok(RangeCalculation::Range(None)),
429 Some(FileRange { start, end }) => {
430 let start: u64 = start.try_into().map_err(|_| {
431 exec_datafusion_err!("Expect start range to fit in u64, got {start}")
432 })?;
433 let end: u64 = end.try_into().map_err(|_| {
434 exec_datafusion_err!("Expect end range to fit in u64, got {end}")
435 })?;
436
437 let start_delta = if start != 0 {
438 find_first_newline(store, location, start - 1, file_size, newline).await?
439 } else {
440 0
441 };
442
443 if start + start_delta > end {
444 return Ok(RangeCalculation::TerminateEarly);
445 }
446
447 let end_delta = if end != file_size {
448 find_first_newline(store, location, end - 1, file_size, newline).await?
449 } else {
450 0
451 };
452
453 let range = start + start_delta..end + end_delta;
454
455 if range.start >= range.end {
456 return Ok(RangeCalculation::TerminateEarly);
457 }
458
459 Ok(RangeCalculation::Range(Some(range)))
460 }
461 }
462}
463
464async fn find_first_newline(
475 object_store: &Arc<dyn ObjectStore>,
476 location: &Path,
477 start: u64,
478 end: u64,
479 newline: u8,
480) -> Result<u64> {
481 let options = GetOptions {
482 range: Some(GetRange::Bounded(start..end)),
483 ..Default::default()
484 };
485
486 let result = object_store.get_opts(location, options).await?;
487 let mut result_stream = result.into_stream();
488
489 let mut index = 0;
490
491 while let Some(chunk) = result_stream.next().await.transpose()? {
492 if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
493 let position = position as u64;
494 return Ok(index + position);
495 }
496
497 index += chunk.len() as u64;
498 }
499
500 Ok(index)
501}
502
503pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
543 let mut files = Vec::with_capacity(num_files);
544 if num_files == 0 {
545 return vec![];
546 }
547 let range_size = if overlap_factor == 0.0 {
548 100 / num_files as i64
549 } else {
550 (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
551 };
552
553 for i in 0..num_files {
554 let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
555 let min = base as f64;
556 let max = (base + range_size) as f64;
557
558 let file = PartitionedFile {
559 object_meta: ObjectMeta {
560 location: Path::from(format!("file_{i}.parquet")),
561 last_modified: chrono::Utc::now(),
562 size: 1000,
563 e_tag: None,
564 version: None,
565 },
566 partition_values: vec![],
567 range: None,
568 statistics: Some(Arc::new(Statistics {
569 num_rows: Precision::Exact(100),
570 total_byte_size: Precision::Exact(1000),
571 column_statistics: vec![ColumnStatistics {
572 null_count: Precision::Exact(0),
573 max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
574 min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
575 sum_value: Precision::Absent,
576 distinct_count: Precision::Absent,
577 byte_size: Precision::Absent,
578 }],
579 })),
580 ordering: None,
581 extensions: FileExtensions::new(),
582 metadata_size_hint: None,
583 table_reference: None,
584 };
585 files.push(file);
586 }
587
588 vec![FileGroup::new(files)]
589}
590
591pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
594 for group in file_groups {
595 let files = group.iter().collect::<Vec<_>>();
596 for i in 1..files.len() {
597 let prev_file = files[i - 1];
598 let curr_file = files[i];
599
600 if let (Some(prev_stats), Some(curr_stats)) =
602 (&prev_file.statistics, &curr_file.statistics)
603 {
604 let prev_max = &prev_stats.column_statistics[0].max_value;
605 let curr_min = &curr_stats.column_statistics[0].min_value;
606 if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
607 return false;
608 }
609 }
610 }
611 }
612 true
613}
614
#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{ObjectStoreExt, local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, sync::Arc};
    use url::Url;

    /// Returns a single-column batch (`i: Int32`, nullable) holding the
    /// values `0..sz`.
    pub fn make_partition(sz: i32) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let column: ArrayRef = Arc::new(Int32Array::from((0..sz).collect::<Vec<_>>()));

        RecordBatch::try_new(schema, vec![column]).unwrap()
    }

    /// Returns the 13-column, all non-nullable schema `c1..c13`; `c1`
    /// additionally carries the metadata entry `testing = test`.
    pub fn aggr_test_schema() -> SchemaRef {
        let mut c1 = Field::new("c1", DataType::Utf8, false);
        c1.set_metadata(HashMap::from([("testing".into(), "test".into())]));

        let remaining = [
            ("c2", DataType::UInt32),
            ("c3", DataType::Int8),
            ("c4", DataType::Int16),
            ("c5", DataType::Int32),
            ("c6", DataType::Int64),
            ("c7", DataType::UInt8),
            ("c8", DataType::UInt16),
            ("c9", DataType::UInt32),
            ("c10", DataType::UInt64),
            ("c11", DataType::Float32),
            ("c12", DataType::Float64),
            ("c13", DataType::Utf8),
        ]
        .into_iter()
        .map(|(name, data_type)| Field::new(name, data_type, false));

        let fields: Vec<Field> = std::iter::once(c1).chain(remaining).collect();
        Arc::new(Schema::new(fields))
    }

    /// The object store URL of a listing URL is its scheme + authority root.
    #[test]
    fn test_object_store_listing_url() {
        for location in ["file:///", "s3://bucket/"] {
            let listing = ListingTableUrl::parse(location).unwrap();
            let store = listing.object_store();
            assert_eq!(store.as_str(), location);
        }
    }

    /// A store registered for an `hdfs://` authority is found for keys below it.
    #[test]
    fn test_get_store_hdfs() {
        let registry = DefaultObjectStoreRegistry::default();
        let store_url = Url::parse("hdfs://localhost:8020").unwrap();
        registry.register_store(&store_url, Arc::new(LocalFileSystem::new()));

        let table_url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    /// A store registered for an `s3://` bucket is found for keys in that bucket.
    #[test]
    fn test_get_store_s3() {
        let registry = DefaultObjectStoreRegistry::default();
        let store_url = Url::parse("s3://bucket/key").unwrap();
        registry.register_store(&store_url, Arc::new(LocalFileSystem::new()));

        let table_url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    /// `file://` URLs resolve without registering anything first.
    #[test]
    fn test_get_store_file() {
        let registry = DefaultObjectStoreRegistry::default();
        let table_url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    /// Relative local paths resolve without registering anything first.
    #[test]
    fn test_get_store_local() {
        let registry = DefaultObjectStoreRegistry::default();
        let table_url = ListingTableUrl::parse("../").unwrap();
        registry.get_store(table_url.as_ref()).unwrap();
    }

    /// `with_statistics` must append exact statistics for each partition
    /// value after the file's own column statistics.
    #[test]
    fn test_with_statistics_appends_partition_column_stats() {
        use crate::PartitionedFile;
        use datafusion_common::stats::Precision;
        use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};

        // Day 20148 since the Unix epoch is 2025-03-01.
        let partition_value = ScalarValue::Date32(Some(20148));

        // Statistics of the file itself: a single data column.
        let file_stats = Arc::new(Statistics {
            num_rows: Precision::Exact(2),
            total_byte_size: Precision::Exact(16),
            column_statistics: vec![ColumnStatistics {
                null_count: Precision::Exact(0),
                max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
                min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
                byte_size: Precision::Absent,
            }],
        });

        // Partition values have to be in place before the statistics are set.
        let pf = PartitionedFile::new("test.parquet", 100)
            .with_partition_values(vec![partition_value.clone()])
            .with_statistics(file_stats);

        let stats = pf.statistics.unwrap();
        assert_eq!(
            stats.column_statistics.len(),
            2,
            "Expected 2 columns (id + date partition)"
        );

        let partition_col_stats = &stats.column_statistics[1];
        assert_eq!(
            partition_col_stats.null_count,
            Precision::Exact(0),
            "Partition column null_count should be Exact(0)"
        );
        assert_eq!(
            partition_col_stats.min_value,
            Precision::Exact(partition_value.clone()),
            "Partition column min should match partition value"
        );
        assert_eq!(
            partition_col_stats.max_value,
            Precision::Exact(partition_value),
            "Partition column max should match partition value"
        );
        assert_eq!(
            partition_col_stats.distinct_count,
            Precision::Exact(1),
            "Partition column distinct_count should be Exact(1)"
        );
    }

    /// Exercises `ListingTableUrl::contains` for direct children, nested
    /// folders, hive-style partition folders and the table directory itself.
    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();

        // (object path, ignore_subdirectory, expected result)
        let cases = [
            // A direct child is contained regardless of the flag.
            ("/var/data/mytable/data.parquet", true, true),
            ("/var/data/mytable/data.parquet", false, true),
            // A file in a plain subfolder is only contained when
            // subdirectories are not ignored.
            ("/var/data/mytable/mysubfolder/data.parquet", true, false),
            ("/var/data/mytable/mysubfolder/data.parquet", false, true),
            // A file in a `key=value` partition folder is contained either way.
            ("/var/data/mytable/year=2024/data.parquet", false, true),
            ("/var/data/mytable/year=2024/data.parquet", true, true),
            // The table directory itself is contained either way.
            ("/var/data/mytable/", true, true),
            ("/var/data/mytable/", false, true),
        ];

        for (object_path, ignore_subdirectory, expected) in cases {
            let path = Path::parse(object_path).unwrap();
            assert_eq!(
                url.contains(&path, ignore_subdirectory),
                expected,
                "contains({object_path:?}, ignore_subdirectory = {ignore_subdirectory})"
            );
        }
    }

    /// A range starting in the middle of a file that holds a single line
    /// without a terminator contains no record start, so the calculation
    /// must terminate early.
    #[tokio::test]
    async fn test_calculate_range_single_line_file() {
        use super::{PartitionedFile, RangeCalculation, calculate_range};
        use object_store::ObjectStore;
        use object_store::memory::InMemory;

        let content = r#"{"id":1,"data":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}"#;
        let file_size = content.len() as u64;

        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("test.json");
        store.put(&path, content.into()).await.unwrap();

        // Second half of the file: [file_size / 2, file_size).
        let range_start = file_size / 2;
        let second_half = PartitionedFile::new_with_range(
            path.to_string(),
            file_size,
            range_start as i64,
            file_size as i64,
        );

        let outcome = calculate_range(&second_half, &store, None).await;

        assert!(matches!(outcome, Ok(RangeCalculation::TerminateEarly)));
    }
}
849}