mod helpers;
mod table;
mod url;
use crate::error::Result;
use chrono::TimeZone;
use datafusion_common::ScalarValue;
use futures::Stream;
use object_store::{path::Path, ObjectMeta};
use std::pin::Pin;
use std::sync::Arc;
pub use self::url::ListingTableUrl;
pub use table::{ListingOptions, ListingTable, ListingTableConfig};
/// A pinned, boxed, thread-safe stream yielding [`PartitionedFile`]s, where
/// each item may instead be an error.
pub type PartitionedFileStream = Pin<
    Box<dyn Stream<Item = Result<PartitionedFile>> + Sync + Send + 'static>,
>;
/// A `start`/`end` pair of offsets restricting which part of a file is read.
///
/// NOTE(review): the offsets are presumably byte positions within the file, and
/// whether `end` is inclusive or exclusive is decided by the readers consuming
/// this range. Confirm against the file-format scanners.
///
/// Equality and hashing are derived so ranges can be compared (e.g. in tests)
/// and used as map/set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FileRange {
    /// Start offset of the range.
    pub start: i64,
    /// End offset of the range.
    pub end: i64,
}
/// A file (or a portion of one) to be processed, together with the per-file
/// values attached to it.
#[derive(Debug, Clone)]
pub struct PartitionedFile {
/// Object-store metadata for the file: its location, last-modified time and size.
pub object_meta: ObjectMeta,
/// Values for this file's partition columns. The constructors in this module
/// leave it empty. NOTE(review): presumably derived from the file's path
/// (e.g. hive-style `key=value` directories). Confirm against the listing code.
pub partition_values: Vec<ScalarValue>,
/// Optional sub-range of the file to read. NOTE(review): `None` presumably means
/// the whole file is read. Confirm against the file readers.
pub range: Option<FileRange>,
/// Optional type-erased (`dyn Any`) data attached to the file, shared via `Arc`.
/// To use it, a consumer has to downcast it to a concrete type.
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}
impl PartitionedFile {
    /// Creates a [`PartitionedFile`] for the object at `path` with the given
    /// `size`.
    ///
    /// The resulting file has these defaults:
    /// * `last_modified` is the Unix epoch (0 nanoseconds, UTC);
    /// * no partition values;
    /// * no `range`;
    /// * no `extensions`.
    ///
    /// NOTE(review): `size` is narrowed from `u64` to `usize` with `as`, which
    /// silently truncates on targets where `usize` is 32 bits.
    pub fn new(path: String, size: u64) -> Self {
        Self {
            object_meta: ObjectMeta {
                location: Path::from(path),
                last_modified: chrono::Utc.timestamp_nanos(0),
                size: size as usize,
            },
            partition_values: vec![],
            range: None,
            extensions: None,
        }
    }

    /// Like [`PartitionedFile::new`], but restricts the file to the given
    /// `start`/`end` [`FileRange`].
    ///
    /// All other fields take the same defaults as [`PartitionedFile::new`].
    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
        // Reuse `new` so the two constructors cannot drift apart.
        Self {
            range: Some(FileRange { start, end }),
            ..Self::new(path, size)
        }
    }
}
impl From<ObjectMeta> for PartitionedFile {
fn from(object_meta: ObjectMeta) -> Self {
PartitionedFile {
object_meta,
partition_values: vec![],
range: None,
extensions: None,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_execution::object_store::ObjectStoreRegistry;
    use object_store::local::LocalFileSystem;

    /// Parses `url` as a [`ListingTableUrl`] and requires `registry` to resolve
    /// it to an object store; panics if either step fails.
    fn assert_resolves(registry: &ObjectStoreRegistry, url: &str) {
        let parsed = ListingTableUrl::parse(url).unwrap();
        registry.get_by_url(&parsed).unwrap();
    }

    /// `ListingTableUrl::object_store` yields the expected store URL string.
    #[test]
    fn test_object_store_listing_url() {
        let cases = [("file:///", "file:///"), ("s3://bucket/", "s3://bucket/")];
        for &(input, expected) in &cases {
            let listing = ListingTableUrl::parse(input).unwrap();
            let store = listing.object_store();
            assert_eq!(store.as_str(), expected);
        }
    }

    /// A store registered for `hdfs://localhost:8020` is found by URL.
    #[test]
    fn test_get_by_url_hdfs() {
        let sut = ObjectStoreRegistry::default();
        sut.register_store("hdfs", "localhost:8020", Arc::new(LocalFileSystem::new()));
        assert_resolves(&sut, "hdfs://localhost:8020/key");
    }

    /// A store registered for `s3://bucket` is found by URL.
    #[test]
    fn test_get_by_url_s3() {
        let sut = ObjectStoreRegistry::default();
        sut.register_store("s3", "bucket", Arc::new(LocalFileSystem::new()));
        assert_resolves(&sut, "s3://bucket/key");
    }

    /// A `file://` URL resolves on a default registry with nothing registered.
    #[test]
    fn test_get_by_url_file() {
        let sut = ObjectStoreRegistry::default();
        assert_resolves(&sut, "file:///bucket/key");
    }

    /// A relative local path resolves on a default registry with nothing registered.
    #[test]
    fn test_get_by_url_local() {
        let sut = ObjectStoreRegistry::default();
        assert_resolves(&sut, "../");
    }
}