#![doc(
html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
pub mod display;
pub mod file;
pub mod file_compression_type;
pub mod file_format;
pub mod file_groups;
pub mod file_meta;
pub mod file_scan_config;
pub mod file_sink_config;
pub mod file_stream;
pub mod memory;
pub mod source;
mod statistics;
#[cfg(test)]
mod test_util;
pub mod url;
pub mod write;
use chrono::TimeZone;
use datafusion_common::Result;
use datafusion_common::{ScalarValue, Statistics};
use futures::Stream;
use object_store::{path::Path, ObjectMeta};
use std::pin::Pin;
use std::sync::Arc;
pub use self::url::ListingTableUrl;
/// Stream of [`PartitionedFile`]s produced by a file listing.
///
/// Boxed and pinned (`Send + Sync + 'static`) so it can be stored and polled
/// across threads and `await` points.
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
/// A half-open byte range `[start, end)` within a file.
///
/// See [`FileRange::contains`] for the membership test (start inclusive,
/// end exclusive).
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// Start byte offset (inclusive).
    pub start: i64,
    /// End byte offset (exclusive).
    pub end: i64,
}
impl FileRange {
    /// Returns `true` when `offset` lies in this half-open range,
    /// i.e. `start <= offset < end`.
    pub fn contains(&self, offset: i64) -> bool {
        (self.start..self.end).contains(&offset)
    }
}
/// A single file (or a byte sub-range of a file) to be scanned, together
/// with any partition column values associated with it.
#[derive(Debug, Clone)]
pub struct PartitionedFile {
    /// Object-store metadata: location, size, last-modified time, etc.
    pub object_meta: ObjectMeta,
    /// Values of the table's partition columns for this file.
    ///
    /// NOTE(review): presumably derived from hive-style path segments such as
    /// `year=2024` (see the `test_url_contains` test below) — confirm against
    /// the listing code that populates this.
    pub partition_values: Vec<ScalarValue>,
    /// Optional byte range to scan; `None` means the whole file.
    pub range: Option<FileRange>,
    /// Optional pre-computed statistics for this file, if known.
    pub statistics: Option<Statistics>,
    /// Optional opaque, user-defined payload carried alongside the file.
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// Optional hint of the file's metadata size in bytes; set via
    /// [`PartitionedFile::with_metadata_size_hint`]. Consumers are not
    /// visible in this file.
    pub metadata_size_hint: Option<usize>,
}
impl PartitionedFile {
    /// Creates a `PartitionedFile` for `path` with the given `size`, leaving
    /// partition values, range, statistics, extensions and the metadata size
    /// hint unset.
    ///
    /// The last-modified time is unknown here, so it is set to the Unix
    /// epoch as a deterministic placeholder.
    pub fn new(path: impl Into<String>, size: u64) -> Self {
        Self {
            object_meta: ObjectMeta {
                location: Path::from(path.into()),
                last_modified: chrono::Utc.timestamp_nanos(0),
                size: size as usize,
                e_tag: None,
                version: None,
            },
            partition_values: vec![],
            range: None,
            statistics: None,
            extensions: None,
            metadata_size_hint: None,
        }
    }

    /// Creates a `PartitionedFile` restricted to the byte range
    /// `[start, end)`.
    ///
    /// Equivalent to [`Self::new`] followed by [`Self::with_range`].
    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
        // Previously this duplicated the whole `ObjectMeta` literal and set
        // the range twice (once in the struct literal and once via
        // `with_range` with identical values); delegate instead so the
        // construction logic lives in exactly one place.
        Self::new(path, size).with_range(start, end)
    }

    /// Sets a hint for the size of this file's metadata, returning `self`
    /// for chaining.
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
        self.metadata_size_hint = Some(metadata_size_hint);
        self
    }

    /// Creates a `PartitionedFile` from a local filesystem path, reading the
    /// file size from the filesystem.
    ///
    /// # Errors
    /// Returns an error if the file's metadata cannot be read.
    pub fn from_path(path: String) -> Result<Self> {
        // Borrow rather than clone: `fs::metadata` accepts any
        // `AsRef<Path>`, so the extra `String` allocation was unnecessary.
        let size = std::fs::metadata(&path)?.len();
        Ok(Self::new(path, size))
    }

    /// Returns the object-store path of this file.
    pub fn path(&self) -> &Path {
        &self.object_meta.location
    }

    /// Restricts this file to the byte range `[start, end)`, returning
    /// `self` for chaining.
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        self.range = Some(FileRange { start, end });
        self
    }

    /// Attaches an opaque, user-defined extension payload, returning `self`
    /// for chaining.
    pub fn with_extensions(
        mut self,
        extensions: Arc<dyn std::any::Any + Send + Sync>,
    ) -> Self {
        self.extensions = Some(extensions);
        self
    }
}
impl From<ObjectMeta> for PartitionedFile {
fn from(object_meta: ObjectMeta) -> Self {
PartitionedFile {
object_meta,
partition_values: vec![],
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
}
}
}
#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, ops::Not, sync::Arc};
    use url::Url;

    /// Builds a single-column `RecordBatch` (nullable `i: Int32`) holding
    /// the values `0..sz`.
    pub fn make_partition(sz: i32) -> RecordBatch {
        let seq_start = 0;
        let seq_end = sz;
        let values = (seq_start..seq_end).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let arr = Arc::new(Int32Array::from(values));
        RecordBatch::try_new(schema, vec![arr as ArrayRef]).unwrap()
    }

    /// Returns the 13-column (`c1`..`c13`) schema used by aggregate test
    /// data; `c1` carries field-level metadata so tests can exercise
    /// metadata propagation.
    pub fn aggr_test_schema() -> SchemaRef {
        let mut f1 = Field::new("c1", DataType::Utf8, false);
        f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
        let schema = Schema::new(vec![
            f1,
            Field::new("c2", DataType::UInt32, false),
            Field::new("c3", DataType::Int8, false),
            Field::new("c4", DataType::Int16, false),
            Field::new("c5", DataType::Int32, false),
            Field::new("c6", DataType::Int64, false),
            Field::new("c7", DataType::UInt8, false),
            Field::new("c8", DataType::UInt16, false),
            Field::new("c9", DataType::UInt32, false),
            Field::new("c10", DataType::UInt64, false),
            Field::new("c11", DataType::Float32, false),
            Field::new("c12", DataType::Float64, false),
            Field::new("c13", DataType::Utf8, false),
        ]);
        Arc::new(schema)
    }

    /// `ListingTableUrl::object_store` should report the store prefix (scheme
    /// plus authority) for both local-file and s3 URLs.
    #[test]
    fn test_object_store_listing_url() {
        let listing = ListingTableUrl::parse("file:///").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "file:///");
        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "s3://bucket/");
    }

    /// A store registered under an `hdfs://` authority is found for URLs
    /// below that authority (a `LocalFileSystem` stands in for real HDFS).
    #[test]
    fn test_get_store_hdfs() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("hdfs://localhost:8020").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    /// Same as above for an `s3://` bucket URL.
    #[test]
    fn test_get_store_s3() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("s3://bucket/key").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    /// `file://` URLs resolve without any explicit registration, implying a
    /// built-in local store in the default registry.
    #[test]
    fn test_get_store_file() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    /// Relative paths also parse and resolve against the default registry.
    #[test]
    fn test_get_store_local() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("../").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    /// Exercises `ListingTableUrl::contains` with both values of its boolean
    /// flag.
    ///
    /// NOTE(review): the flag is unnamed here; from the assertions it appears
    /// to mean "ignore files in sub-directories" — a plain sub-folder file is
    /// contained only when the flag is `false`, while a hive-style partition
    /// path (`year=2024`) is contained either way. Confirm against the
    /// `contains` implementation in the `url` module.
    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            true
        ));
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            false
        ));
        assert!(url
            .contains(
                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
                true
            )
            .not());
        assert!(url.contains(
            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
            false
        ));
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            false
        ));
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            true
        ));
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
    }
}