#![doc(
    html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
    html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
#![cfg_attr(test, allow(clippy::needless_pass_by_value))]
#![deny(clippy::allow_attributes)]

pub mod decoder;
pub mod display;
pub mod file;
pub mod file_compression_type;
pub mod file_format;
pub mod file_groups;
pub mod file_scan_config;
pub mod file_sink_config;
pub mod file_stream;
pub mod memory;
pub mod projection;
pub mod schema_adapter;
pub mod sink;
pub mod source;
mod statistics;
pub mod table_schema;

#[cfg(test)]
pub mod test_util;

pub mod url;
pub mod write;

pub use self::file::as_file_source;
pub use self::url::ListingTableUrl;
use crate::file_groups::FileGroup;
use chrono::TimeZone;
use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, Result, exec_datafusion_err};
use datafusion_common::{ScalarValue, Statistics};
use futures::{Stream, StreamExt};
use object_store::{GetOptions, GetRange, ObjectStore};
use object_store::{ObjectMeta, path::Path};
pub use table_schema::TableSchema;
#[expect(deprecated)]
pub use statistics::add_row_stats;
pub use statistics::compute_all_files_statistics;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;

/// A stream of [`PartitionedFile`] yielded while listing the files of a table.
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;

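/// A range of bytes within a single file.
///
/// This is used when one file is split across several scan partitions. The
/// interval is half-open: `start` is inclusive and `end` is exclusive, so
/// ranges that share a boundary do not overlap.
///
/// A minimal usage sketch (this doctest assumes the crate is published as
/// `datafusion_datasource`; the sizes are illustrative):
///
/// ```
/// use datafusion_datasource::FileRange;
///
/// let range = FileRange { start: 0, end: 1024 };
/// assert!(range.contains(0)); // start is inclusive
/// assert!(!range.contains(1024)); // end is exclusive
/// ```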
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// Range start, inclusive
    pub start: i64,
    /// Range end, exclusive
    pub end: i64,
}

impl FileRange {
    /// Returns true if `offset` is within this range: `start <= offset < end`
    pub fn contains(&self, offset: i64) -> bool {
        offset >= self.start && offset < self.end
    }
}

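/// A single file (or a byte range of one) to be scanned, along with the
/// values of any table partition columns that apply to it.
///
/// A minimal construction sketch (the path and sizes are illustrative, and
/// the doctest assumes the crate is `datafusion_datasource`):
///
/// ```
/// use datafusion_datasource::PartitionedFile;
///
/// // Scan only the first 1024 bytes of a 4096-byte file.
/// let file = PartitionedFile::new("data/part-0.parquet", 4096).with_range(0, 1024);
/// assert_eq!(file.effective_size(), 1024);
/// assert!(!file.has_statistics()); // no statistics attached yet
/// ```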
#[derive(Debug, Clone)]
pub struct PartitionedFile {
    /// Path, size, and other metadata of the file in object storage
    pub object_meta: ObjectMeta,
    /// Values of partition columns to be appended to each row scanned from
    /// this file
    pub partition_values: Vec<ScalarValue>,
    /// Optional byte range within the file to scan; `None` means the entire
    /// file
    pub range: Option<FileRange>,
    /// Optional statistics that describe the data in this file
    pub statistics: Option<Arc<Statistics>>,
    /// Optional user-defined metadata attached to this file
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// Optional hint for the size of the file's metadata (e.g. a Parquet
    /// footer), which lets readers prefetch it in fewer requests
    pub metadata_size_hint: Option<usize>,
}

impl PartitionedFile {
    /// Create a simple file without statistics, extensions, or partition values
    pub fn new(path: impl Into<String>, size: u64) -> Self {
        Self {
            object_meta: ObjectMeta {
                location: Path::from(path.into()),
                last_modified: chrono::Utc.timestamp_nanos(0),
                size,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }

    /// Create a file that scans only the byte range `[start, end)`, without
    /// statistics, extensions, or partition values
    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
        Self::new(path, size).with_range(start, end)
    }

    /// Size of the scanned portion: the length of `range` if one is set,
    /// otherwise the full size of the file
    pub fn effective_size(&self) -> u64 {
        if let Some(range) = &self.range {
            (range.end - range.start) as u64
        } else {
            self.object_meta.size
        }
    }

    /// The `(start, end)` byte range to scan, defaulting to the whole file
    pub fn range(&self) -> (u64, u64) {
        if let Some(range) = &self.range {
            (range.start as u64, range.end as u64)
        } else {
            (0, self.object_meta.size)
        }
    }

    /// Provide a hint for the size of the file's metadata. If set, readers
    /// can fetch the last `metadata_size_hint` bytes of the file up front,
    /// avoiding extra round trips when locating e.g. a Parquet footer.
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
        self.metadata_size_hint = Some(metadata_size_hint);
        self
    }

    /// Create a [`PartitionedFile`] from a local path, reading the file size
    /// from the filesystem
    pub fn from_path(path: String) -> Result<Self> {
        let size = std::fs::metadata(&path)?.len();
        Ok(Self::new(path, size))
    }

    /// Path of the file in the object store
    pub fn path(&self) -> &Path {
        &self.object_meta.location
    }

    /// Update this file to scan only the byte range `[start, end)`
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        self.range = Some(FileRange { start, end });
        self
    }

    /// Update this file with user-defined metadata
    pub fn with_extensions(
        mut self,
        extensions: Arc<dyn std::any::Any + Send + Sync>,
    ) -> Self {
        self.extensions = Some(extensions);
        self
    }

    /// Set the statistics for this file.
    ///
    /// If the file carries partition values, exact statistics for the
    /// partition columns are derived from those values and appended after the
    /// file's own column statistics, so callers only need to supply
    /// statistics for the file's columns.
    pub fn with_statistics(mut self, file_statistics: Arc<Statistics>) -> Self {
        if self.partition_values.is_empty() {
            self.statistics = Some(file_statistics);
        } else {
            let mut stats = Arc::unwrap_or_clone(file_statistics);
            for partition_value in &self.partition_values {
                // A partition column is constant within the file, so its
                // statistics are exact: no nulls, one distinct value, and
                // min == max == the partition value.
                let col_stats = ColumnStatistics {
                    null_count: Precision::Exact(0),
                    max_value: Precision::Exact(partition_value.clone()),
                    min_value: Precision::Exact(partition_value.clone()),
                    distinct_count: Precision::Exact(1),
                    sum_value: Precision::Absent,
                    byte_size: partition_value
                        .data_type()
                        .primitive_width()
                        .map(|w| stats.num_rows.multiply(&Precision::Exact(w)))
                        .unwrap_or(Precision::Absent),
                };
                stats.column_statistics.push(col_stats);
            }
            self.statistics = Some(Arc::new(stats));
        }
        self
    }

    /// Returns `true` if any column statistic (null count, min, max, sum, or
    /// distinct count) is known for this file
    pub fn has_statistics(&self) -> bool {
        if let Some(stats) = &self.statistics {
            stats.column_statistics.iter().any(|col_stats| {
                col_stats.null_count != Precision::Absent
                    || col_stats.max_value != Precision::Absent
                    || col_stats.min_value != Precision::Absent
                    || col_stats.sum_value != Precision::Absent
                    || col_stats.distinct_count != Precision::Absent
            })
        } else {
            false
        }
    }
}
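
/// Converts directly from [`ObjectMeta`], with no partition values, range,
/// statistics, or extensions.
///
/// A minimal sketch (illustrative values; assumes the crate is
/// `datafusion_datasource`):
///
/// ```
/// use chrono::Utc;
/// use datafusion_datasource::PartitionedFile;
/// use object_store::{ObjectMeta, path::Path};
///
/// let meta = ObjectMeta {
///     location: Path::from("x.parquet"),
///     last_modified: Utc::now(),
///     size: 10,
///     e_tag: None,
///     version: None,
/// };
/// let file = PartitionedFile::from(meta);
/// assert_eq!(file.effective_size(), 10);
/// ```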
impl From<ObjectMeta> for PartitionedFile {
    fn from(object_meta: ObjectMeta) -> Self {
        PartitionedFile {
            object_meta,
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }
}

/// The result of [`calculate_range`]
pub enum RangeCalculation {
    /// The byte range to read; `None` means read the whole object
    Range(Option<Range<u64>>),
    /// The computed range contains no complete records, so there is nothing
    /// to scan for this slice
    TerminateEarly,
}

/// Calculates an appropriate byte range for reading from an object, based on
/// the file's assigned [`FileRange`] and a record terminator (newline unless
/// `terminator` is given).
///
/// A raw byte split can fall in the middle of a record, so each boundary
/// (other than the very start or end of the file) is shifted forward to just
/// past the next terminator; adjacent ranges adjusted this way neither drop
/// nor duplicate records. Returns [`RangeCalculation::TerminateEarly`] when
/// the adjusted range is empty.
pub async fn calculate_range(
    file: &PartitionedFile,
    store: &Arc<dyn ObjectStore>,
    terminator: Option<u8>,
) -> Result<RangeCalculation> {
    let location = &file.object_meta.location;
    let file_size = file.object_meta.size;
    let newline = terminator.unwrap_or(b'\n');

    match file.range {
        None => Ok(RangeCalculation::Range(None)),
        Some(FileRange { start, end }) => {
            let start: u64 = start.try_into().map_err(|_| {
                exec_datafusion_err!("Expect start range to fit in u64, got {start}")
            })?;
            let end: u64 = end.try_into().map_err(|_| {
                exec_datafusion_err!("Expect end range to fit in u64, got {end}")
            })?;

            // Searching from `start - 1` leaves a range that already begins
            // just past a terminator unchanged.
            let start_delta = if start != 0 {
                find_first_newline(store, location, start - 1, file_size, newline).await?
            } else {
                0
            };

            if start + start_delta > end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            let end_delta = if end != file_size {
                find_first_newline(store, location, end - 1, file_size, newline).await?
            } else {
                0
            };

            let range = start + start_delta..end + end_delta;

            if range.start >= range.end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            Ok(RangeCalculation::Range(Some(range)))
        }
    }
}

/// Asynchronously scans the bytes `start..end` of the object at `location`
/// for the first occurrence of the `newline` byte, returning its offset
/// relative to `start`, or the number of bytes scanned if none is found
async fn find_first_newline(
    object_store: &Arc<dyn ObjectStore>,
    location: &Path,
    start: u64,
    end: u64,
    newline: u8,
) -> Result<u64> {
    let options = GetOptions {
        range: Some(GetRange::Bounded(start..end)),
        ..Default::default()
    };

    let result = object_store.get_opts(location, options).await?;
    let mut result_stream = result.into_stream();

    let mut index = 0;

    while let Some(chunk) = result_stream.next().await.transpose()? {
        if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
            return Ok(index + position as u64);
        }

        index += chunk.len() as u64;
    }

    Ok(index)
}

/// Generates `num_files` test files with single-column min/max statistics,
/// returned as one [`FileGroup`]. `overlap_factor` (0.0 to 1.0) controls how
/// much the value ranges of consecutive files overlap: 0.0 produces adjacent,
/// non-overlapping ranges, and larger values produce increasing overlap.
pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
    let mut files = Vec::with_capacity(num_files);
    if num_files == 0 {
        return vec![];
    }
    let range_size = if overlap_factor == 0.0 {
        100 / num_files as i64
    } else {
        (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
    };

    for i in 0..num_files {
        let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
        let min = base as f64;
        let max = (base + range_size) as f64;

        let file = PartitionedFile {
            object_meta: ObjectMeta {
                location: Path::from(format!("file_{i}.parquet")),
                last_modified: chrono::Utc::now(),
                size: 1000,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: Some(Arc::new(Statistics {
                num_rows: Precision::Exact(100),
                total_byte_size: Precision::Exact(1000),
                column_statistics: vec![ColumnStatistics {
                    null_count: Precision::Exact(0),
                    max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
                    min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
                    sum_value: Precision::Absent,
                    distinct_count: Precision::Absent,
                    byte_size: Precision::Absent,
                }],
            })),
            extensions: None,
            metadata_size_hint: None,
        };
        files.push(file);
    }

    vec![FileGroup::new(files)]
}

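/// Returns `true` when, within every [`FileGroup`], each file's first-column
/// minimum statistic is strictly greater than the previous file's maximum,
/// i.e. the group is sorted by non-overlapping statistics.
///
/// A hedged sketch of how this pairs with [`generate_test_files`]: a non-zero
/// overlap factor produces overlapping statistics, so the check fails
/// (assumes the crate is `datafusion_datasource`):
///
/// ```
/// use datafusion_datasource::{generate_test_files, verify_sort_integrity};
///
/// let groups = generate_test_files(4, 0.5);
/// assert_eq!(groups.len(), 1);
/// assert!(!verify_sort_integrity(&groups));
/// ```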
pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
    for group in file_groups {
        let files = group.iter().collect::<Vec<_>>();
        for i in 1..files.len() {
            let prev_file = files[i - 1];
            let curr_file = files[i];

            // Compare the first-column statistics of adjacent files, when
            // both are available.
            if let (Some(prev_stats), Some(curr_stats)) =
                (&prev_file.statistics, &curr_file.statistics)
            {
                let prev_max = &prev_stats.column_statistics[0].max_value;
                let curr_min = &curr_stats.column_statistics[0].min_value;
                if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
                    return false;
                }
            }
        }
    }
    true
}

#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, ops::Not, sync::Arc};
    use url::Url;

    /// Returns a single-column `RecordBatch` with the values `0..sz` in a
    /// nullable `Int32` column named "i"
    pub fn make_partition(sz: i32) -> RecordBatch {
        let seq_start = 0;
        let seq_end = sz;
        let values = (seq_start..seq_end).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let arr = Arc::new(Int32Array::from(values));

        RecordBatch::try_new(schema, vec![arr as ArrayRef]).unwrap()
    }

    /// Returns the schema of the `aggregate_test_*` CSV test files: columns
    /// `c1` through `c13`, with custom metadata attached to `c1`
    pub fn aggr_test_schema() -> SchemaRef {
        let mut f1 = Field::new("c1", DataType::Utf8, false);
        f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
        let schema = Schema::new(vec![
            f1,
            Field::new("c2", DataType::UInt32, false),
            Field::new("c3", DataType::Int8, false),
            Field::new("c4", DataType::Int16, false),
            Field::new("c5", DataType::Int32, false),
            Field::new("c6", DataType::Int64, false),
            Field::new("c7", DataType::UInt8, false),
            Field::new("c8", DataType::UInt16, false),
            Field::new("c9", DataType::UInt32, false),
            Field::new("c10", DataType::UInt64, false),
            Field::new("c11", DataType::Float32, false),
            Field::new("c12", DataType::Float64, false),
            Field::new("c13", DataType::Utf8, false),
        ]);

        Arc::new(schema)
    }

    #[test]
    fn test_object_store_listing_url() {
        let listing = ListingTableUrl::parse("file:///").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "file:///");

        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "s3://bucket/");
    }

    #[test]
    fn test_get_store_hdfs() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("hdfs://localhost:8020").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_s3() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("s3://bucket/key").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_file() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_get_store_local() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("../").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    #[test]
    fn test_with_statistics_appends_partition_column_stats() {
        use crate::PartitionedFile;
        use datafusion_common::stats::Precision;
        use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};

        // A file with one data column and one date partition value
        let mut pf = PartitionedFile::new("test.parquet", 100);
        pf.partition_values = vec![ScalarValue::Date32(Some(20148))];

        // Statistics for the single data column only
        let file_stats = Arc::new(Statistics {
            num_rows: Precision::Exact(2),
            total_byte_size: Precision::Exact(16),
            column_statistics: vec![ColumnStatistics {
                null_count: Precision::Exact(0),
                max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
                min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
                byte_size: Precision::Absent,
            }],
        });

        let pf = pf.with_statistics(file_stats);

        let stats = pf.statistics.unwrap();
        assert_eq!(
            stats.column_statistics.len(),
            2,
            "Expected 2 columns (id + date partition)"
        );

        let partition_col_stats = &stats.column_statistics[1];
        assert_eq!(
            partition_col_stats.null_count,
            Precision::Exact(0),
            "Partition column null_count should be Exact(0)"
        );
        assert_eq!(
            partition_col_stats.min_value,
            Precision::Exact(ScalarValue::Date32(Some(20148))),
            "Partition column min should match partition value"
        );
        assert_eq!(
            partition_col_stats.max_value,
            Precision::Exact(ScalarValue::Date32(Some(20148))),
            "Partition column max should match partition value"
        );
        assert_eq!(
            partition_col_stats.distinct_count,
            Precision::Exact(1),
            "Partition column distinct_count should be Exact(1)"
        );
    }

    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();

        // A file directly inside the table directory is contained, whether or
        // not subdirectories are ignored.
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            true
        ));
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            false
        ));

        // A file in a plain subdirectory is excluded when subdirectories are
        // ignored, and included otherwise.
        assert!(
            url.contains(
                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
                true
            )
            .not()
        );
        assert!(url.contains(
            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
            false
        ));

        // Hive-style partition directories (`year=2024`) are contained in
        // both modes.
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            false
        ));
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            true
        ));

        // The table root itself is contained in both modes.
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
    }

    /// A file whose contents are one JSON line with no trailing newline:
    /// scanning the second half finds no record terminator, so the range
    /// calculation terminates early instead of emitting a partial record.
    #[tokio::test]
    async fn test_calculate_range_single_line_file() {
        use super::{PartitionedFile, RangeCalculation, calculate_range};
        use object_store::ObjectStore;
        use object_store::memory::InMemory;

        let content = r#"{"id":1,"data":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}"#;
        let file_size = content.len() as u64;

        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("test.json");
        store.put(&path, content.into()).await.unwrap();

        let mid = file_size / 2;
        let partitioned_file = PartitionedFile::new_with_range(
            path.to_string(),
            file_size,
            mid as i64,
            file_size as i64,
        );

        let result = calculate_range(&partitioned_file, &store, None).await;

        assert!(matches!(result, Ok(RangeCalculation::TerminateEarly)));
    }
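
    // Illustrative tests added while editing: they exercise
    // `PartitionedFile::effective_size`/`range` and the private
    // `find_first_newline` helper; the test names are new, and the expected
    // values follow directly from the definitions in this module.

    #[test]
    fn test_effective_size_respects_range() {
        use super::PartitionedFile;

        let file = PartitionedFile::new("test.parquet", 100);
        assert_eq!(file.effective_size(), 100);
        assert_eq!(file.range(), (0, 100));

        // With an explicit byte range, only the range length counts.
        let file = file.with_range(10, 30);
        assert_eq!(file.effective_size(), 20);
        assert_eq!(file.range(), (10, 30));
    }

    #[tokio::test]
    async fn test_find_first_newline() {
        use super::find_first_newline;
        use object_store::{ObjectStore, memory::InMemory};

        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("lines.txt");
        store.put(&path, "hello\nworld".into()).await.unwrap();

        // The first newline is at byte offset 5, relative to the search start.
        let offset = find_first_newline(&store, &path, 0, 11, b'\n')
            .await
            .unwrap();
        assert_eq!(offset, 5);
    }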
}