1#![doc(
19 html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
20 html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
21)]
22#![cfg_attr(docsrs, feature(doc_cfg))]
23#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
26#![cfg_attr(test, allow(clippy::needless_pass_by_value))]
27
28pub mod decoder;
32pub mod display;
33pub mod file;
34pub mod file_compression_type;
35pub mod file_format;
36pub mod file_groups;
37pub mod file_scan_config;
38pub mod file_sink_config;
39pub mod file_stream;
40pub mod memory;
41pub mod projection;
42pub mod schema_adapter;
43pub mod sink;
44pub mod source;
45mod statistics;
46pub mod table_schema;
47
48#[cfg(test)]
49pub mod test_util;
50
51pub mod url;
52pub mod write;
53pub use self::file::as_file_source;
54pub use self::url::ListingTableUrl;
55use crate::file_groups::FileGroup;
56use chrono::TimeZone;
57use datafusion_common::stats::Precision;
58use datafusion_common::{ColumnStatistics, Result, exec_datafusion_err};
59use datafusion_common::{ScalarValue, Statistics};
60use datafusion_physical_expr::LexOrdering;
61use futures::{Stream, StreamExt};
62use object_store::{GetOptions, GetRange, ObjectStore};
63use object_store::{ObjectMeta, path::Path};
64pub use table_schema::TableSchema;
65#[expect(deprecated)]
67pub use statistics::add_row_stats;
68pub use statistics::compute_all_files_statistics;
69use std::ops::Range;
70use std::pin::Pin;
71use std::sync::Arc;
72
/// A stream of [`PartitionedFile`]s (or errors) produced by an async
/// listing operation.
///
/// Boxed and pinned so it can be stored and polled from any task
/// (`Send + Sync + 'static`).
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
76
/// A half-open byte range `[start, end)` within a single file, used to
/// split one file across multiple scan partitions.
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// Inclusive start byte offset of the range.
    pub start: i64,
    /// Exclusive end byte offset of the range.
    pub end: i64,
}
87
88impl FileRange {
89 pub fn contains(&self, offset: i64) -> bool {
91 offset >= self.start && offset < self.end
92 }
93}
94
/// A single file (or byte sub-range of a file) participating in a scan,
/// together with per-file metadata gathered during planning.
#[derive(Debug, Clone)]
pub struct PartitionedFile {
    /// Object store metadata: location, size, last-modified, etag, version.
    pub object_meta: ObjectMeta,
    /// Values of the table's partition columns for this file
    /// (e.g. the value encoded in a `year=2024/` path segment).
    pub partition_values: Vec<ScalarValue>,
    /// Optional byte sub-range to scan; `None` means the whole file.
    pub range: Option<FileRange>,
    /// Optional per-file statistics, shared via `Arc` to avoid copies.
    pub statistics: Option<Arc<Statistics>>,
    /// Optional known sort order of the rows within this file.
    pub ordering: Option<LexOrdering>,
    /// Opaque, caller-defined payload attached to this file.
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// Optional hint of how many bytes to fetch when reading the file's
    /// metadata.
    pub metadata_size_hint: Option<usize>,
}
151
152impl PartitionedFile {
153 pub fn new(path: impl Into<String>, size: u64) -> Self {
155 Self {
156 object_meta: ObjectMeta {
157 location: Path::from(path.into()),
158 last_modified: chrono::Utc.timestamp_nanos(0),
159 size,
160 e_tag: None,
161 version: None,
162 },
163 partition_values: vec![],
164 range: None,
165 statistics: None,
166 ordering: None,
167 extensions: None,
168 metadata_size_hint: None,
169 }
170 }
171
172 pub fn new_from_meta(object_meta: ObjectMeta) -> Self {
174 Self {
175 object_meta,
176 partition_values: vec![],
177 range: None,
178 statistics: None,
179 ordering: None,
180 extensions: None,
181 metadata_size_hint: None,
182 }
183 }
184
185 pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
187 Self {
188 object_meta: ObjectMeta {
189 location: Path::from(path),
190 last_modified: chrono::Utc.timestamp_nanos(0),
191 size,
192 e_tag: None,
193 version: None,
194 },
195 partition_values: vec![],
196 range: Some(FileRange { start, end }),
197 statistics: None,
198 ordering: None,
199 extensions: None,
200 metadata_size_hint: None,
201 }
202 .with_range(start, end)
203 }
204
205 pub fn with_partition_values(mut self, partition_values: Vec<ScalarValue>) -> Self {
208 self.partition_values = partition_values;
209 self
210 }
211
212 pub fn effective_size(&self) -> u64 {
214 if let Some(range) = &self.range {
215 (range.end - range.start) as u64
216 } else {
217 self.object_meta.size
218 }
219 }
220
221 pub fn range(&self) -> (u64, u64) {
223 if let Some(range) = &self.range {
224 (range.start as u64, range.end as u64)
225 } else {
226 (0, self.object_meta.size)
227 }
228 }
229
230 pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
234 self.metadata_size_hint = Some(metadata_size_hint);
235 self
236 }
237
238 pub fn from_path(path: String) -> Result<Self> {
240 let size = std::fs::metadata(path.clone())?.len();
241 Ok(Self::new(path, size))
242 }
243
244 pub fn path(&self) -> &Path {
246 &self.object_meta.location
247 }
248
249 pub fn with_range(mut self, start: i64, end: i64) -> Self {
251 self.range = Some(FileRange { start, end });
252 self
253 }
254
255 pub fn with_extensions(
259 mut self,
260 extensions: Arc<dyn std::any::Any + Send + Sync>,
261 ) -> Self {
262 self.extensions = Some(extensions);
263 self
264 }
265
266 pub fn with_statistics(mut self, file_statistics: Arc<Statistics>) -> Self {
275 if self.partition_values.is_empty() {
276 self.statistics = Some(file_statistics);
278 } else {
279 let mut stats = Arc::unwrap_or_clone(file_statistics);
281 for partition_value in &self.partition_values {
282 let col_stats = ColumnStatistics {
283 null_count: Precision::Exact(0),
284 max_value: Precision::Exact(partition_value.clone()),
285 min_value: Precision::Exact(partition_value.clone()),
286 distinct_count: Precision::Exact(1),
287 sum_value: Precision::Absent,
288 byte_size: partition_value
289 .data_type()
290 .primitive_width()
291 .map(|w| stats.num_rows.multiply(&Precision::Exact(w)))
292 .unwrap_or_else(|| Precision::Absent),
293 };
294 stats.column_statistics.push(col_stats);
295 }
296 self.statistics = Some(Arc::new(stats));
297 }
298 self
299 }
300
301 pub fn has_statistics(&self) -> bool {
305 if let Some(stats) = &self.statistics {
306 stats.column_statistics.iter().any(|col_stats| {
307 col_stats.null_count != Precision::Absent
308 || col_stats.max_value != Precision::Absent
309 || col_stats.min_value != Precision::Absent
310 || col_stats.sum_value != Precision::Absent
311 || col_stats.distinct_count != Precision::Absent
312 })
313 } else {
314 false
315 }
316 }
317
318 pub fn with_ordering(mut self, ordering: Option<LexOrdering>) -> Self {
323 self.ordering = ordering;
324 self
325 }
326}
327
328impl From<ObjectMeta> for PartitionedFile {
329 fn from(object_meta: ObjectMeta) -> Self {
330 PartitionedFile {
331 object_meta,
332 partition_values: vec![],
333 range: None,
334 statistics: None,
335 ordering: None,
336 extensions: None,
337 metadata_size_hint: None,
338 }
339 }
340}
341
/// Outcome of [`calculate_range`]: either a byte range to read, or a signal
/// that the assigned split contains no data and the scan can stop early.
pub enum RangeCalculation {
    /// Read this byte range; `None` means read the entire file.
    Range(Option<Range<u64>>),
    /// The split yields no complete records; terminate without reading.
    TerminateEarly,
}
359
/// Adjust a file split's byte range so it starts and ends on record
/// (terminator) boundaries.
///
/// When the file has no assigned [`FileRange`], the whole file is read
/// (`Range(None)`). Otherwise both edges are moved forward past the first
/// `terminator` byte (default `\n`) found at or after the edge, so that
/// neighboring splits neither drop nor duplicate records. Returns
/// [`RangeCalculation::TerminateEarly`] when the adjusted range is empty.
///
/// # Errors
/// Fails if the range offsets do not fit in `u64`, or if the object store
/// reads used to locate the terminators fail.
pub async fn calculate_range(
    file: &PartitionedFile,
    store: &Arc<dyn ObjectStore>,
    terminator: Option<u8>,
) -> Result<RangeCalculation> {
    let location = &file.object_meta.location;
    let file_size = file.object_meta.size;
    // Default record terminator is a newline.
    let newline = terminator.unwrap_or(b'\n');

    match file.range {
        // No assigned split: read the whole file.
        None => Ok(RangeCalculation::Range(None)),
        Some(FileRange { start, end }) => {
            let start: u64 = start.try_into().map_err(|_| {
                exec_datafusion_err!("Expect start range to fit in u64, got {start}")
            })?;
            let end: u64 = end.try_into().map_err(|_| {
                exec_datafusion_err!("Expect end range to fit in u64, got {end}")
            })?;

            // Search from `start - 1`: if the byte just before the split is
            // already a terminator, the delta is 0 and the split begins
            // exactly at `start`. A split starting at 0 needs no adjustment.
            let start_delta = if start != 0 {
                find_first_newline(store, location, start - 1, file_size, newline).await?
            } else {
                0
            };

            if start + start_delta > end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            // Likewise extend the end forward to the terminator that
            // completes the last record; a split ending at EOF needs no
            // adjustment.
            let end_delta = if end != file_size {
                find_first_newline(store, location, end - 1, file_size, newline).await?
            } else {
                0
            };

            let range = start + start_delta..end + end_delta;

            // An empty adjusted range means this split holds no complete
            // records.
            if range.start >= range.end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            Ok(RangeCalculation::Range(Some(range)))
        }
    }
}
415
416async fn find_first_newline(
427 object_store: &Arc<dyn ObjectStore>,
428 location: &Path,
429 start: u64,
430 end: u64,
431 newline: u8,
432) -> Result<u64> {
433 let options = GetOptions {
434 range: Some(GetRange::Bounded(start..end)),
435 ..Default::default()
436 };
437
438 let result = object_store.get_opts(location, options).await?;
439 let mut result_stream = result.into_stream();
440
441 let mut index = 0;
442
443 while let Some(chunk) = result_stream.next().await.transpose()? {
444 if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
445 let position = position as u64;
446 return Ok(index + position);
447 }
448
449 index += chunk.len() as u64;
450 }
451
452 Ok(index)
453}
454
455pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
495 let mut files = Vec::with_capacity(num_files);
496 if num_files == 0 {
497 return vec![];
498 }
499 let range_size = if overlap_factor == 0.0 {
500 100 / num_files as i64
501 } else {
502 (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
503 };
504
505 for i in 0..num_files {
506 let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
507 let min = base as f64;
508 let max = (base + range_size) as f64;
509
510 let file = PartitionedFile {
511 object_meta: ObjectMeta {
512 location: Path::from(format!("file_{i}.parquet")),
513 last_modified: chrono::Utc::now(),
514 size: 1000,
515 e_tag: None,
516 version: None,
517 },
518 partition_values: vec![],
519 range: None,
520 statistics: Some(Arc::new(Statistics {
521 num_rows: Precision::Exact(100),
522 total_byte_size: Precision::Exact(1000),
523 column_statistics: vec![ColumnStatistics {
524 null_count: Precision::Exact(0),
525 max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
526 min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
527 sum_value: Precision::Absent,
528 distinct_count: Precision::Absent,
529 byte_size: Precision::Absent,
530 }],
531 })),
532 ordering: None,
533 extensions: None,
534 metadata_size_hint: None,
535 };
536 files.push(file);
537 }
538
539 vec![FileGroup::new(files)]
540}
541
542pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
545 for group in file_groups {
546 let files = group.iter().collect::<Vec<_>>();
547 for i in 1..files.len() {
548 let prev_file = files[i - 1];
549 let curr_file = files[i];
550
551 if let (Some(prev_stats), Some(curr_stats)) =
553 (&prev_file.statistics, &curr_file.statistics)
554 {
555 let prev_max = &prev_stats.column_statistics[0].max_value;
556 let curr_min = &curr_stats.column_statistics[0].min_value;
557 if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
558 return false;
559 }
560 }
561 }
562 }
563 true
564}
565
#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{ObjectStoreExt, local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, ops::Not, sync::Arc};
    use url::Url;

    /// Build a single-column (`i: Int32`, nullable) batch containing
    /// the values `0..sz`.
    pub fn make_partition(sz: i32) -> RecordBatch {
        let seq_start = 0;
        let seq_end = sz;
        let values = (seq_start..seq_end).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let arr = Arc::new(Int32Array::from(values));

        RecordBatch::try_new(schema, vec![arr as ArrayRef]).unwrap()
    }

    /// Thirteen-column test schema (`c1`..`c13`) with field-level metadata
    /// attached to `c1`.
    pub fn aggr_test_schema() -> SchemaRef {
        let mut f1 = Field::new("c1", DataType::Utf8, false);
        f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
        let schema = Schema::new(vec![
            f1,
            Field::new("c2", DataType::UInt32, false),
            Field::new("c3", DataType::Int8, false),
            Field::new("c4", DataType::Int16, false),
            Field::new("c5", DataType::Int32, false),
            Field::new("c6", DataType::Int64, false),
            Field::new("c7", DataType::UInt8, false),
            Field::new("c8", DataType::UInt16, false),
            Field::new("c9", DataType::UInt32, false),
            Field::new("c10", DataType::UInt64, false),
            Field::new("c11", DataType::Float32, false),
            Field::new("c12", DataType::Float64, false),
            Field::new("c13", DataType::Utf8, false),
        ]);

        Arc::new(schema)
    }

    // A listing URL reports the object store URL it belongs to.
    #[test]
    fn test_object_store_listing_url() {
        let listing = ListingTableUrl::parse("file:///").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "file:///");

        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "s3://bucket/");
    }

    // A store registered under an hdfs:// authority is found by a listing
    // URL with the same authority.
    #[test]
    fn test_get_store_hdfs() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("hdfs://localhost:8020").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // A store registered under an s3:// bucket is found by a matching
    // listing URL.
    #[test]
    fn test_get_store_s3() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("s3://bucket/key").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // file:// URLs resolve without explicit registration.
    #[test]
    fn test_get_store_file() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // Relative local paths resolve without explicit registration.
    #[test]
    fn test_get_store_local() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("../").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // `with_statistics` must append exact single-value column statistics
    // for each partition column after the file's own column statistics.
    #[test]
    fn test_with_statistics_appends_partition_column_stats() {
        use crate::PartitionedFile;
        use datafusion_common::stats::Precision;
        use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};

        let mut pf = PartitionedFile::new(
            "test.parquet",
            100,
        );
        pf.partition_values = vec![
            ScalarValue::Date32(Some(20148)),
        ];

        // File-level statistics for a single Int32 column.
        let file_stats = Arc::new(Statistics {
            num_rows: Precision::Exact(2),
            total_byte_size: Precision::Exact(16),
            column_statistics: vec![ColumnStatistics {
                null_count: Precision::Exact(0),
                max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
                min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
                byte_size: Precision::Absent,
            }],
        });

        let pf = pf.with_statistics(file_stats);

        let stats = pf.statistics.unwrap();
        assert_eq!(
            stats.column_statistics.len(),
            2,
            "Expected 2 columns (id + date partition)"
        );

        // The appended (second) column carries the partition value.
        let partition_col_stats = &stats.column_statistics[1];
        assert_eq!(
            partition_col_stats.null_count,
            Precision::Exact(0),
            "Partition column null_count should be Exact(0)"
        );
        assert_eq!(
            partition_col_stats.min_value,
            Precision::Exact(ScalarValue::Date32(Some(20148))),
            "Partition column min should match partition value"
        );
        assert_eq!(
            partition_col_stats.max_value,
            Precision::Exact(ScalarValue::Date32(Some(20148))),
            "Partition column max should match partition value"
        );
        assert_eq!(
            partition_col_stats.distinct_count,
            Precision::Exact(1),
            "Partition column distinct_count should be Exact(1)"
        );
    }

    // `ListingTableUrl::contains` semantics: the second argument controls
    // whether sub-directories that are NOT partition segments are accepted.
    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();

        // Direct child file is contained either way.
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            true
        ));

        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            false
        ));

        // A plain subfolder is rejected when only partition segments count...
        assert!(
            url.contains(
                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
                true
            )
            .not()
        );

        // ...but accepted otherwise.
        assert!(url.contains(
            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
            false
        ));

        // Hive-style partition segments (`year=2024`) are accepted either way.
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            false
        ));

        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            true
        ));

        // The table root contains itself.
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));

        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
    }

    // A range starting mid-way through a file whose only newline-free
    // content ends at EOF must terminate early (no complete record).
    #[tokio::test]
    async fn test_calculate_range_single_line_file() {
        use super::{PartitionedFile, RangeCalculation, calculate_range};
        use object_store::ObjectStore;
        use object_store::memory::InMemory;

        let content = r#"{"id":1,"data":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}"#;
        let file_size = content.len() as u64;

        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("test.json");
        store.put(&path, content.into()).await.unwrap();

        // Assign the second half of the file as this split's range.
        let mid = file_size / 2;
        let partitioned_file = PartitionedFile::new_with_range(
            path.to_string(),
            file_size,
            mid as i64,
            file_size as i64,
        );

        let result = calculate_range(&partitioned_file, &store, None).await;

        assert!(matches!(result, Ok(RangeCalculation::TerminateEarly)));
    }
}