#![doc(
html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
#![cfg_attr(test, allow(clippy::needless_pass_by_value))]
pub mod decoder;
pub mod display;
pub mod file;
pub mod file_compression_type;
pub mod file_format;
pub mod file_groups;
pub mod file_scan_config;
pub mod file_sink_config;
pub mod file_stream;
pub mod memory;
pub mod projection;
pub mod schema_adapter;
pub mod sink;
pub mod source;
mod statistics;
pub mod table_schema;
#[cfg(test)]
pub mod test_util;
pub mod url;
pub mod write;
pub use self::file::as_file_source;
pub use self::url::ListingTableUrl;
use crate::file_groups::FileGroup;
use chrono::TimeZone;
use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, Result, exec_datafusion_err};
use datafusion_common::{ScalarValue, Statistics};
use datafusion_physical_expr::LexOrdering;
use futures::{Stream, StreamExt};
use object_store::{GetOptions, GetRange, ObjectStore};
use object_store::{ObjectMeta, path::Path};
pub use table_schema::TableSchema;
#[expect(deprecated)]
pub use statistics::add_row_stats;
pub use statistics::compute_all_files_statistics;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
/// A pinned, boxed stream that yields [`PartitionedFile`]s (or errors).
///
/// The stream is `Send + Sync + 'static`, so it can be moved between threads and
/// held without borrowing from its creator.
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
/// A byte range within a file.
///
/// `start` is inclusive and `end` is exclusive, i.e. the range covers the
/// half-open interval `[start, end)`.
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// First byte offset covered by the range (inclusive).
    pub start: i64,
    /// Byte offset one past the last byte covered by the range (exclusive).
    pub end: i64,
}

impl FileRange {
    /// Returns `true` when `offset` lies in `[start, end)`.
    ///
    /// An empty or inverted range (`end <= start`) contains no offset.
    pub fn contains(&self, offset: i64) -> bool {
        (self.start..self.end).contains(&offset)
    }
}
/// A single file (optionally restricted to a byte range) to be scanned, together
/// with per-file information such as partition values and statistics.
#[derive(Debug, Clone)]
pub struct PartitionedFile {
    /// Object-store metadata for the file: location, size, last-modified time,
    /// e-tag and version.
    pub object_meta: ObjectMeta,
    /// Values of the partition columns for this file. `with_statistics` appends
    /// one column-statistics entry per value, in this order, after the file's
    /// own column statistics.
    pub partition_values: Vec<ScalarValue>,
    /// Optional byte range to scan (`start` inclusive, `end` exclusive).
    /// `None` means the whole file.
    pub range: Option<FileRange>,
    /// Optional statistics for this file (set via `with_statistics`).
    pub statistics: Option<Arc<Statistics>>,
    /// Optional ordering of the rows in this file.
    /// NOTE(review): presumably a known sort order used by the planner — confirm.
    pub ordering: Option<LexOrdering>,
    /// Opaque, type-erased data attached by callers; nothing in this file
    /// inspects it.
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// Optional hint for the size of the file's metadata.
    /// NOTE(review): presumably in bytes, for readers that prefetch a metadata
    /// footer — confirm.
    pub metadata_size_hint: Option<usize>,
}
impl PartitionedFile {
    /// Creates an entry for the file at `path` whose size is `size` bytes.
    ///
    /// The remaining object metadata is filled with placeholders:
    /// `last_modified` is the Unix epoch, and there is no e-tag or version. No
    /// partition values, range, statistics, ordering, extensions or metadata
    /// size hint are set.
    pub fn new(path: impl Into<String>, size: u64) -> Self {
        Self {
            object_meta: ObjectMeta {
                location: Path::from(path.into()),
                last_modified: chrono::Utc.timestamp_nanos(0),
                size,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: None,
            ordering: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }

    /// Creates an entry from existing object-store metadata, with every other
    /// field unset.
    pub fn new_from_meta(object_meta: ObjectMeta) -> Self {
        Self {
            object_meta,
            partition_values: vec![],
            range: None,
            statistics: None,
            ordering: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }

    /// Creates an entry for `path` (of `size` bytes) that scans only the byte
    /// range `start..end`.
    ///
    /// Equivalent to `PartitionedFile::new(path, size).with_range(start, end)`.
    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
        Self::new(path, size).with_range(start, end)
    }

    /// Sets the partition column values for this file.
    ///
    /// Call this before [`Self::with_statistics`] if the partition columns
    /// should be reflected in the statistics.
    pub fn with_partition_values(mut self, partition_values: Vec<ScalarValue>) -> Self {
        self.partition_values = partition_values;
        self
    }

    /// Returns the number of bytes this entry covers. This is the length of
    /// `range` when it is set, and the full object size otherwise.
    ///
    /// An inverted range (`end < start`) yields 0. Previously the `i64 -> u64`
    /// cast would wrap it around to a huge size.
    pub fn effective_size(&self) -> u64 {
        match &self.range {
            Some(range) => {
                u64::try_from(range.end.saturating_sub(range.start)).unwrap_or(0)
            }
            None => self.object_meta.size,
        }
    }

    /// Returns the byte range to scan as `(start, end)`.
    ///
    /// This is the configured `range` when present, and `(0, object size)`
    /// otherwise. The bounds are converted with `as`, so negative offsets wrap;
    /// callers are expected to use non-negative offsets.
    pub fn range(&self) -> (u64, u64) {
        if let Some(range) = &self.range {
            (range.start as u64, range.end as u64)
        } else {
            (0, self.object_meta.size)
        }
    }

    /// Sets the metadata size hint for this file.
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
        self.metadata_size_hint = Some(metadata_size_hint);
        self
    }

    /// Builds an entry for a file on the local filesystem. The size is read
    /// with [`std::fs::metadata`].
    ///
    /// # Errors
    /// Returns an error if the file's metadata cannot be read.
    pub fn from_path(path: String) -> Result<Self> {
        let size = std::fs::metadata(&path)?.len();
        Ok(Self::new(path, size))
    }

    /// Returns the object-store location of this file.
    pub fn path(&self) -> &Path {
        &self.object_meta.location
    }

    /// Restricts the scan to the byte range `start..end` (end exclusive).
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        self.range = Some(FileRange { start, end });
        self
    }

    /// Attaches opaque, caller-defined data to this file.
    pub fn with_extensions(
        mut self,
        extensions: Arc<dyn std::any::Any + Send + Sync>,
    ) -> Self {
        self.extensions = Some(extensions);
        self
    }

    /// Sets the statistics for this file.
    ///
    /// If the file has no partition values, `file_statistics` is stored as-is.
    /// Otherwise one [`ColumnStatistics`] per partition value is appended, in
    /// order, with these values:
    /// - `min_value` and `max_value` are both the exact partition value.
    /// - `null_count` is `Exact(0)` and `distinct_count` is `Exact(1)`.
    /// - `sum_value` is absent.
    /// - `byte_size` is `num_rows * primitive width` when the value's type has
    ///   a fixed width, and absent otherwise.
    ///
    /// Partition values are read at call time, so set them with
    /// [`Self::with_partition_values`] first.
    pub fn with_statistics(mut self, file_statistics: Arc<Statistics>) -> Self {
        if self.partition_values.is_empty() {
            self.statistics = Some(file_statistics);
        } else {
            // Avoid a deep copy when this is the only reference to the stats.
            let mut stats = Arc::unwrap_or_clone(file_statistics);
            for partition_value in &self.partition_values {
                // NOTE(review): a null partition value still reports
                // null_count = Exact(0) — confirm this is intended.
                let col_stats = ColumnStatistics {
                    null_count: Precision::Exact(0),
                    max_value: Precision::Exact(partition_value.clone()),
                    min_value: Precision::Exact(partition_value.clone()),
                    distinct_count: Precision::Exact(1),
                    sum_value: Precision::Absent,
                    byte_size: partition_value
                        .data_type()
                        .primitive_width()
                        .map(|w| stats.num_rows.multiply(&Precision::Exact(w)))
                        .unwrap_or(Precision::Absent),
                };
                stats.column_statistics.push(col_stats);
            }
            self.statistics = Some(Arc::new(stats));
        }
        self
    }

    /// Returns `true` if statistics are set and at least one column has a known
    /// (non-`Absent`) null count, min, max, sum or distinct count.
    ///
    /// `byte_size` is not considered.
    pub fn has_statistics(&self) -> bool {
        self.statistics.as_ref().is_some_and(|stats| {
            stats.column_statistics.iter().any(|col_stats| {
                col_stats.null_count != Precision::Absent
                    || col_stats.max_value != Precision::Absent
                    || col_stats.min_value != Precision::Absent
                    || col_stats.sum_value != Precision::Absent
                    || col_stats.distinct_count != Precision::Absent
            })
        })
    }

    /// Sets (or clears, with `None`) the row ordering of this file.
    pub fn with_ordering(mut self, ordering: Option<LexOrdering>) -> Self {
        self.ordering = ordering;
        self
    }
}
impl From<ObjectMeta> for PartitionedFile {
fn from(object_meta: ObjectMeta) -> Self {
PartitionedFile {
object_meta,
partition_values: vec![],
range: None,
statistics: None,
ordering: None,
extensions: None,
metadata_size_hint: None,
}
}
}
/// Outcome of [`calculate_range`].
pub enum RangeCalculation {
    /// The byte range to read. `None` means the file has no range restriction,
    /// so the whole file is read. `Some(r)` is the file's range adjusted so
    /// that both ends fall just after a record terminator.
    Range(Option<Range<u64>>),
    /// The adjusted range is empty: no record starts inside this file's range,
    /// so the scan of this range can stop without reading anything.
    TerminateEarly,
}
/// Aligns the byte range of `file` to record boundaries, so that a range-based
/// scan reads only whole records.
///
/// Records are delimited by `terminator` (default `b'\n'`).
/// - Start bound: when `start == 0`, the adjusted start is 0. Otherwise it is
///   the position just after the first terminator at or after byte
///   `start - 1`.
/// - End bound: when `end` equals the file size, the adjusted end is the file
///   size. Otherwise it is the position just after the first terminator at or
///   after byte `end - 1`. If no terminator is found, the bound is extended by
///   the number of bytes scanned instead.
///
/// Returns:
/// - `Range(None)` when the file has no range restriction.
/// - `TerminateEarly` when the range is empty or inverted, or when the adjusted
///   range is empty.
/// - `Range(Some(..))` with the adjusted range otherwise.
///
/// # Errors
/// Returns an error if either range bound is negative, or if reading from
/// `store` fails.
pub async fn calculate_range(
    file: &PartitionedFile,
    store: &Arc<dyn ObjectStore>,
    terminator: Option<u8>,
) -> Result<RangeCalculation> {
    let location = &file.object_meta.location;
    let file_size = file.object_meta.size;
    let newline = terminator.unwrap_or(b'\n');

    match file.range {
        None => Ok(RangeCalculation::Range(None)),
        Some(FileRange { start, end }) => {
            let start: u64 = start.try_into().map_err(|_| {
                exec_datafusion_err!("Expect start range to fit in u64, got {start}")
            })?;
            let end: u64 = end.try_into().map_err(|_| {
                exec_datafusion_err!("Expect end range to fit in u64, got {end}")
            })?;

            // An empty or inverted range cannot contain the start of a record.
            // Bailing out here also prevents `end - 1` below from underflowing
            // when `end == 0`, and skips needless object-store requests.
            if start >= end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            // Skip the partial record at the front. It belongs to the previous
            // range, unless the byte just before `start` is already a terminator.
            let start_delta = if start != 0 {
                find_first_newline(store, location, start - 1, file_size, newline).await?
            } else {
                0
            };

            // The first complete record begins past this range's end.
            if start + start_delta > end {
                return Ok(RangeCalculation::TerminateEarly);
            }

            // Extend the end so the record straddling `end` is read completely.
            let end_delta = if end != file_size {
                find_first_newline(store, location, end - 1, file_size, newline).await?
            } else {
                0
            };

            let range = start + start_delta..end + end_delta;
            if range.start >= range.end {
                return Ok(RangeCalculation::TerminateEarly);
            }
            Ok(RangeCalculation::Range(Some(range)))
        }
    }
}
/// Scans bytes `start..end` of the object at `location` for the first `newline`
/// byte.
///
/// Returns that byte's offset relative to `start`. If no such byte is found,
/// returns the total number of bytes received from the store.
///
/// # Errors
/// Propagates errors from the object-store request and from the byte stream.
async fn find_first_newline(
    object_store: &Arc<dyn ObjectStore>,
    location: &Path,
    start: u64,
    end: u64,
    newline: u8,
) -> Result<u64> {
    let request = GetOptions {
        range: Some(GetRange::Bounded(start..end)),
        ..Default::default()
    };
    let mut chunks = object_store
        .get_opts(location, request)
        .await?
        .into_stream();

    // Bytes consumed from earlier chunks; offsets in the current chunk are
    // relative to this.
    let mut scanned: u64 = 0;
    loop {
        let Some(chunk) = chunks.next().await.transpose()? else {
            return Ok(scanned);
        };
        match chunk.iter().position(|&b| b == newline) {
            Some(offset) => return Ok(scanned + offset as u64),
            None => scanned += chunk.len() as u64,
        }
    }
}
/// Builds a single [`FileGroup`] of `num_files` synthetic Parquet file entries
/// for tests and benchmarks.
///
/// Each file `file_{i}.parquet` has the following properties:
/// - Size is 1000 bytes and `last_modified` is the current time, so entries
///   are not byte-for-byte reproducible.
/// - Statistics are exact: 100 rows and 1000 bytes.
/// - There is one `Float64` column with 0 nulls, `min = base` and
///   `max = base + range_size`, where
///   `base = i * range_size * (1 - overlap_factor)`.
///
/// With `overlap_factor == 0.0` consecutive intervals touch end-to-end. Larger
/// factors shift each file by less than its width, so intervals overlap.
///
/// Returns an empty `Vec` when `num_files == 0`.
pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
    if num_files == 0 {
        return vec![];
    }

    // Width of each file's [min, max] interval, clamped to at least 1 in both
    // branches. Without the clamp, more than 100 non-overlapping files would all
    // get the same zero-width interval [0, 0].
    let range_size = if overlap_factor == 0.0 {
        (100 / num_files as i64).max(1)
    } else {
        (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
    };

    let files = (0..num_files)
        .map(|i| {
            let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
            let min = base as f64;
            let max = (base + range_size) as f64;
            PartitionedFile {
                object_meta: ObjectMeta {
                    location: Path::from(format!("file_{i}.parquet")),
                    last_modified: chrono::Utc::now(),
                    size: 1000,
                    e_tag: None,
                    version: None,
                },
                partition_values: vec![],
                range: None,
                statistics: Some(Arc::new(Statistics {
                    num_rows: Precision::Exact(100),
                    total_byte_size: Precision::Exact(1000),
                    column_statistics: vec![ColumnStatistics {
                        null_count: Precision::Exact(0),
                        max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
                        min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
                        sum_value: Precision::Absent,
                        distinct_count: Precision::Absent,
                        byte_size: Precision::Absent,
                    }],
                })),
                ordering: None,
                extensions: None,
                metadata_size_hint: None,
            }
        })
        .collect::<Vec<_>>();

    vec![FileGroup::new(files)]
}
pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
for group in file_groups {
let files = group.iter().collect::<Vec<_>>();
for i in 1..files.len() {
let prev_file = files[i - 1];
let curr_file = files[i];
if let (Some(prev_stats), Some(curr_stats)) =
(&prev_file.statistics, &curr_file.statistics)
{
let prev_max = &prev_stats.column_statistics[0].max_value;
let curr_min = &curr_stats.column_statistics[0].min_value;
if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
return false;
}
}
}
}
true
}
// Unit tests for listing URLs, object-store lookup, partition-column statistics
// and newline-aligned range calculation.
#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{ObjectStoreExt, local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, ops::Not, sync::Arc};
    use url::Url;

    /// Returns a batch with one nullable `Int32` column `i` holding `0..sz`.
    pub fn make_partition(sz: i32) -> RecordBatch {
        let seq_start = 0;
        let seq_end = sz;
        let values = (seq_start..seq_end).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let arr = Arc::new(Int32Array::from(values));
        RecordBatch::try_new(schema, vec![arr as ArrayRef]).unwrap()
    }

    /// Returns the 13-column, non-nullable schema `c1`..`c13`. Column `c1`
    /// carries the field metadata `{"testing": "test"}`.
    pub fn aggr_test_schema() -> SchemaRef {
        let mut f1 = Field::new("c1", DataType::Utf8, false);
        f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
        let schema = Schema::new(vec![
            f1,
            Field::new("c2", DataType::UInt32, false),
            Field::new("c3", DataType::Int8, false),
            Field::new("c4", DataType::Int16, false),
            Field::new("c5", DataType::Int32, false),
            Field::new("c6", DataType::Int64, false),
            Field::new("c7", DataType::UInt8, false),
            Field::new("c8", DataType::UInt16, false),
            Field::new("c9", DataType::UInt32, false),
            Field::new("c10", DataType::UInt64, false),
            Field::new("c11", DataType::Float32, false),
            Field::new("c12", DataType::Float64, false),
            Field::new("c13", DataType::Utf8, false),
        ]);
        Arc::new(schema)
    }

    // `object_store()` reduces a listing URL to its scheme + authority root.
    #[test]
    fn test_object_store_listing_url() {
        let listing = ListingTableUrl::parse("file:///").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "file:///");
        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "s3://bucket/");
    }

    // A store registered for an hdfs host is found for a key under that host.
    // LocalFileSystem is only a stand-in implementation here.
    #[test]
    fn test_get_store_hdfs() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("hdfs://localhost:8020").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // A store registered for an s3 bucket URL is found for a listing URL in
    // that bucket.
    #[test]
    fn test_get_store_s3() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("s3://bucket/key").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // `file://` URLs resolve without any explicit registration.
    #[test]
    fn test_get_store_file() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // A relative local path also resolves without explicit registration.
    #[test]
    fn test_get_store_local() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("../").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // With partition values set, `with_statistics` appends one exact column
    // statistic per partition value after the file's own columns.
    #[test]
    fn test_with_statistics_appends_partition_column_stats() {
        use crate::PartitionedFile;
        use datafusion_common::stats::Precision;
        use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
        let mut pf = PartitionedFile::new(
            "test.parquet",
            100, // file size in bytes
        );
        pf.partition_values = vec![
            ScalarValue::Date32(Some(20148)), // single date partition column
        ];
        // Statistics for the file's single physical column.
        let file_stats = Arc::new(Statistics {
            num_rows: Precision::Exact(2),
            total_byte_size: Precision::Exact(16),
            column_statistics: vec![ColumnStatistics {
                null_count: Precision::Exact(0),
                max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
                min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
                byte_size: Precision::Absent,
            }],
        });
        let pf = pf.with_statistics(file_stats);
        let stats = pf.statistics.unwrap();
        assert_eq!(
            stats.column_statistics.len(),
            2,
            "Expected 2 columns (id + date partition)"
        );
        // The appended partition column comes after the file column.
        let partition_col_stats = &stats.column_statistics[1];
        assert_eq!(
            partition_col_stats.null_count,
            Precision::Exact(0),
            "Partition column null_count should be Exact(0)"
        );
        assert_eq!(
            partition_col_stats.min_value,
            Precision::Exact(ScalarValue::Date32(Some(20148))),
            "Partition column min should match partition value"
        );
        assert_eq!(
            partition_col_stats.max_value,
            Precision::Exact(ScalarValue::Date32(Some(20148))),
            "Partition column max should match partition value"
        );
        assert_eq!(
            partition_col_stats.distinct_count,
            Precision::Exact(1),
            "Partition column distinct_count should be Exact(1)"
        );
    }

    // `contains(path, flag)`: with `false`, any path under the table root is
    // contained. With `true`, a plain subdirectory (`mysubfolder/`) is
    // excluded, while a `key=value` directory (`year=2024/`) is still included.
    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            true
        ));
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            false
        ));
        assert!(
            url.contains(
                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
                true
            )
            .not()
        );
        assert!(url.contains(
            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
            false
        ));
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            false
        ));
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            true
        ));
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
    }

    // A file holding one record with no trailing newline: the second half of
    // the file contains no record start, so `calculate_range` must return
    // `TerminateEarly` for that range.
    #[tokio::test]
    async fn test_calculate_range_single_line_file() {
        use super::{PartitionedFile, RangeCalculation, calculate_range};
        use object_store::ObjectStore;
        use object_store::memory::InMemory;
        let content = r#"{"id":1,"data":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}"#;
        let file_size = content.len() as u64;
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let path = Path::from("test.json");
        store.put(&path, content.into()).await.unwrap();
        // Range covering the second half of the file: [mid, file_size).
        let mid = file_size / 2;
        let partitioned_file = PartitionedFile::new_with_range(
            path.to_string(),
            file_size,
            mid as i64,
            file_size as i64,
        );
        let result = calculate_range(&partitioned_file, &store, None).await;
        assert!(matches!(result, Ok(RangeCalculation::TerminateEarly)));
    }
}