use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use chrono::{DateTime, FixedOffset, Utc};
use deltalake_derive::DeltaConfig;
use object_store::DynObjectStore;
use serde::{Deserialize, Serialize};
use tracing::debug;
use url::Url;
use super::normalize_table_url;
use crate::kernel::Version;
use crate::logstore::storage::IORuntime;
use crate::logstore::{LogStoreRef, StorageConfig, object_store_factories};
use crate::{DeltaResult, DeltaTable, DeltaTableError};
/// Which table state [`DeltaTableBuilder::load`] should load.
///
/// Defaults to [`DeltaVersion::Newest`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
pub enum DeltaVersion {
    /// Load via `DeltaTable::load` (the newest version).
    #[default]
    Newest,
    /// Load a specific table version via `DeltaTable::load_version`.
    Version(Version),
    /// Load the table as of a timestamp via `DeltaTable::load_with_datetime`.
    Timestamp(DateTime<Utc>),
}
/// Configuration handed to `DeltaTable::new` by [`DeltaTableBuilder`].
///
/// Serialized with camelCase field names. Equality compares every field except `io_runtime`.
#[derive(Debug, Serialize, Deserialize, Clone, DeltaConfig)]
#[serde(rename_all = "camelCase")]
pub struct DeltaTableConfig {
    /// Defaults to `true`; set to `false` by `DeltaTableBuilder::without_files`.
    // NOTE(review): presumably controls whether file-level state is tracked — confirm in DeltaTable.
    pub require_files: bool,
    /// Defaults to `num_cpus::get() * 4`; the builder rejects `0`.
    pub log_buffer_size: usize,
    /// Defaults to `1024`.
    pub log_batch_size: usize,
    /// Set via `DeltaTableBuilder::with_skip_stats`; falls back to `false` when absent from
    /// serialized input.
    #[serde(default)]
    pub skip_stats: bool,
    /// Optional IO runtime forwarded to the storage configuration.
    ///
    /// Never serialized, skipped by the `DeltaConfig` derive, and not compared for equality.
    #[serde(skip_serializing, skip_deserializing)]
    #[delta(skip)]
    pub io_runtime: Option<IORuntime>,
}
impl Default for DeltaTableConfig {
    /// Default settings:
    ///
    /// - `require_files` is `true`.
    /// - `log_buffer_size` is four times the CPU count.
    /// - `log_batch_size` is `1024`.
    /// - `skip_stats` is `false`.
    /// - No IO runtime is set.
    fn default() -> Self {
        let log_buffer_size = num_cpus::get() * 4;
        Self {
            log_buffer_size,
            log_batch_size: 1024,
            require_files: true,
            skip_stats: false,
            io_runtime: None,
        }
    }
}
impl PartialEq for DeltaTableConfig {
    /// Compares the configuration values field by field.
    ///
    /// `io_runtime` is intentionally not compared; it is also excluded from serialization.
    fn eq(&self, other: &Self) -> bool {
        // Exhaustive destructuring (no `..`): adding a field to `DeltaTableConfig` becomes a
        // compile error here until it is either compared or explicitly ignored, instead of
        // being silently left out of equality.
        let Self {
            require_files,
            log_buffer_size,
            log_batch_size,
            skip_stats,
            io_runtime: _,
        } = self;
        *require_files == other.require_files
            && *log_buffer_size == other.log_buffer_size
            && *log_batch_size == other.log_batch_size
            && *skip_stats == other.skip_stats
    }
}
/// Builder for [`DeltaTable`].
///
/// Collects the table location, storage settings, table configuration and the version to
/// load, then produces a table via `build` (no load) or `load`.
#[derive(Debug)]
pub struct DeltaTableBuilder {
    /// Location of the table root.
    table_url: Url,
    /// Optional caller-supplied object store, paired with the URL passed to
    /// `with_storage_backend`.
    storage_backend: Option<(Arc<DynObjectStore>, Url)>,
    /// Table state selected for `load`.
    version: DeltaVersion,
    /// Storage options as given to `with_storage_options` (URL-like values have trailing `/`
    /// removed).
    storage_options: Option<HashMap<String, String>>,
    /// When set, emitted as the `allow_http` storage option.
    allow_http: Option<bool>,
    /// Configuration passed to `DeltaTable::new`.
    table_config: DeltaTableConfig,
}
impl DeltaTableBuilder {
    /// Creates a builder for the table rooted at `table_url`.
    ///
    /// The location is not accessed here, so it does not need to exist yet.
    ///
    /// # Errors
    ///
    /// Returns [`DeltaTableError::NotATable`] if the URL's serialized form cannot be re-parsed.
    pub fn from_url(table_url: Url) -> DeltaResult<Self> {
        // Round-trip the URL through `Url::parse`; failures are reported as `NotATable`.
        let table_url = Url::parse(table_url.as_str()).map_err(|_| {
            DeltaTableError::NotATable(
                "Received path segments that could not be canonicalized".into(),
            )
        })?;
        debug!("creating table builder with {table_url}");
        Ok(Self {
            table_url,
            storage_backend: None,
            version: DeltaVersion::default(),
            storage_options: None,
            allow_http: None,
            table_config: DeltaTableConfig::default(),
        })
    }

    /// Sets `require_files` to `false` in the table configuration.
    pub fn without_files(mut self) -> Self {
        self.table_config.require_files = false;
        self
    }

    /// Sets the `skip_stats` flag in the table configuration.
    pub fn with_skip_stats(mut self, skip_stats: bool) -> Self {
        self.table_config.skip_stats = skip_stats;
        self
    }

    /// Selects a specific table version for [`Self::load`].
    pub fn with_version(mut self, version: Version) -> Self {
        self.version = DeltaVersion::Version(version);
        self
    }

    /// Sets `log_buffer_size` in the table configuration.
    ///
    /// # Errors
    ///
    /// Returns [`DeltaTableError::Generic`] when `log_buffer_size` is zero.
    pub fn with_log_buffer_size(mut self, log_buffer_size: usize) -> DeltaResult<Self> {
        if log_buffer_size == 0 {
            return Err(DeltaTableError::Generic(String::from(
                "Log buffer size should be positive",
            )));
        }
        self.table_config.log_buffer_size = log_buffer_size;
        Ok(self)
    }

    /// Selects the table state as of an RFC 3339 timestamp string (for example
    /// `2023-01-01T00:00:00Z`) for [`Self::load`]. The offset is converted to UTC.
    ///
    /// # Errors
    ///
    /// Fails if `date_string` is not valid RFC 3339.
    pub fn with_datestring(self, date_string: impl AsRef<str>) -> DeltaResult<Self> {
        let datetime = DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(
            date_string.as_ref(),
        )?);
        Ok(self.with_timestamp(datetime))
    }

    /// Selects the table state as of `timestamp` for [`Self::load`].
    pub fn with_timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
        self.version = DeltaVersion::Timestamp(timestamp);
        self
    }

    /// Uses `root_storage` instead of resolving an object store from the table URL.
    ///
    /// NOTE(review): [`Self::build_storage`] pairs the store with the builder's own table URL.
    /// The `location` argument is stored but not currently read — confirm this is intended.
    pub fn with_storage_backend(
        mut self,
        root_storage: Arc<DynObjectStore>,
        location: Url,
    ) -> Self {
        self.storage_backend = Some((root_storage, location));
        self
    }

    /// Sets the storage options, replacing any previously set.
    ///
    /// Values that start with `http://` or `https://`, or whose key ends in `_url`
    /// (case-insensitive), are stored with trailing `/` characters removed.
    pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
        // `storage_options` is already owned, so consume it directly instead of cloning it.
        self.storage_options = Some(
            storage_options
                .into_iter()
                .map(|(k, v)| {
                    let needs_trim = v.starts_with("http://")
                        || v.starts_with("https://")
                        || k.to_lowercase().ends_with("_url");
                    if needs_trim {
                        (k, v.trim_end_matches('/').to_owned())
                    } else {
                        (k, v)
                    }
                })
                .collect(),
        );
        self
    }

    /// Sets the value emitted as the `allow_http` storage option.
    pub fn with_allow_http(mut self, allow_http: bool) -> Self {
        self.allow_http = Some(allow_http);
        self
    }

    /// Sets the IO runtime that [`Self::build_storage`] forwards to the storage configuration.
    pub fn with_io_runtime(mut self, io_runtime: IORuntime) -> Self {
        self.table_config.io_runtime = Some(io_runtime);
        self
    }

    /// Returns the effective storage options.
    ///
    /// These are the options set via [`Self::with_storage_options`], plus `allow_http`
    /// (`"true"`/`"false"`) when [`Self::with_allow_http`] was called. An explicit
    /// `with_allow_http` overrides an `allow_http` key passed in the options.
    pub fn storage_options(&self) -> HashMap<String, String> {
        let mut storage_options = self.storage_options.clone().unwrap_or_default();
        if let Some(allow) = self.allow_http {
            storage_options.insert(
                "allow_http".into(),
                if allow { "true" } else { "false" }.into(),
            );
        }
        storage_options
    }

    /// Builds the log store for this table.
    ///
    /// Uses the store from [`Self::with_storage_backend`] when one was set. Otherwise a store
    /// is resolved from the table URL.
    ///
    /// # Errors
    ///
    /// Propagates errors from parsing the storage options or from creating the log store.
    pub fn build_storage(&self) -> DeltaResult<LogStoreRef> {
        debug!("build_storage() with {}", self.table_url);
        let mut storage_config = StorageConfig::parse_options(self.storage_options())?;
        if let Some(io_runtime) = self.table_config.io_runtime.clone() {
            storage_config = storage_config.with_io_runtime(io_runtime);
        }
        if let Some((store, _url)) = self.storage_backend.as_ref() {
            debug!("Loading a logstore with a custom store: {store:?}");
            crate::logstore::logstore_with(store.clone(), &self.table_url, storage_config)
        } else {
            debug!(
                "Loading a logstore based off the location: {:?}",
                self.table_url
            );
            crate::logstore::logstore_for(&self.table_url, storage_config)
        }
    }

    /// Builds a [`DeltaTable`] over the configured storage without loading any version.
    ///
    /// # Errors
    ///
    /// Propagates errors from [`Self::build_storage`].
    pub fn build(self) -> DeltaResult<DeltaTable> {
        Ok(DeltaTable::new(self.build_storage()?, self.table_config))
    }

    /// Builds the table, then loads the selected state.
    ///
    /// The state is chosen via `with_version`, `with_timestamp` or `with_datestring`; the
    /// newest version is loaded by default.
    ///
    /// # Errors
    ///
    /// Propagates errors from [`Self::build`] and from the table load call.
    pub async fn load(self) -> DeltaResult<DeltaTable> {
        let version = self.version;
        let mut table = self.build()?;
        match version {
            DeltaVersion::Newest => table.load().await?,
            DeltaVersion::Version(v) => table.load_version(v).await?,
            DeltaVersion::Timestamp(ts) => table.load_with_datetime(ts).await?,
        }
        Ok(table)
    }
}
/// Classification of a user-supplied table location, as produced by `resolve_uri_type`.
enum UriType {
    /// A filesystem path. It is not canonicalized yet and may be relative.
    LocalPath(PathBuf),
    /// A URL whose scheme has a registered object store factory.
    Url(Url),
}
/// Expands a leading `~` to the current user's home directory.
///
/// Only the exact string `~` or a `~/` prefix is expanded. Anything else, including
/// `~user`-style inputs, is returned unchanged as a [`PathBuf`].
///
/// # Errors
///
/// Returns [`DeltaTableError::InvalidTableLocation`] when expansion is needed but the home
/// directory cannot be determined.
fn expand_tilde_path(path: &str) -> DeltaResult<PathBuf> {
    // `rest` is what follows the tilde: empty for `~`, or starting with `/` for `~/...`.
    let rest = match path.strip_prefix('~') {
        Some(rest) if rest.is_empty() || rest.starts_with('/') => rest,
        _ => return Ok(PathBuf::from(path)),
    };

    let home = dirs::home_dir().ok_or_else(|| {
        DeltaTableError::InvalidTableLocation(
            "Could not determine home directory for tilde expansion".to_string(),
        )
    })?;

    match rest.strip_prefix('/') {
        Some(relative) => Ok(home.join(relative)),
        None => Ok(home),
    }
}
/// Decides whether `table_uri` names a local path or a URL backed by a registered object store.
///
/// Resolution rules:
///
/// - A string without a scheme is treated as a local path, with `~` expansion.
/// - `file://` URLs are converted to local paths.
/// - URLs whose scheme has a registered factory are kept as URLs.
/// - A one-letter scheme (e.g. a Windows drive letter) is treated as a local path.
///
/// # Errors
///
/// Returns [`DeltaTableError::InvalidTableLocation`] in these cases:
///
/// - The string is not parseable.
/// - A `file://` URL cannot be turned into a path.
/// - The scheme is unknown.
fn resolve_uri_type(table_uri: impl AsRef<str>) -> DeltaResult<UriType> {
    let table_uri = table_uri.as_ref();
    let known_schemes: Vec<_> = object_store_factories()
        .iter()
        .map(|entry| entry.key().scheme().to_owned())
        .collect();

    let url = match Url::parse(table_uri) {
        Ok(url) => url,
        // No scheme at all: a plain (possibly relative or `~`-prefixed) filesystem path.
        Err(url::ParseError::RelativeUrlWithoutBase) => {
            return expand_tilde_path(table_uri).map(UriType::LocalPath);
        }
        Err(url_error) => {
            return Err(DeltaTableError::InvalidTableLocation(format!(
                "Could not parse {table_uri} as a URL: {url_error}"
            )));
        }
    };

    let scheme = url.scheme().to_string();
    if scheme == "file" {
        let path = url.to_file_path().map_err(|err| {
            DeltaTableError::InvalidTableLocation(format!(
                "Invalid table location: {table_uri}\nError: {err:?}"
            ))
        })?;
        return Ok(UriType::LocalPath(path));
    }
    if known_schemes.contains(&scheme) {
        return Ok(UriType::Url(url));
    }
    // A drive-letter path such as `C:\data` parses as a URL with the one-letter scheme `c`.
    if scheme.len() == 1 {
        return expand_tilde_path(table_uri).map(UriType::LocalPath);
    }
    Err(DeltaTableError::InvalidTableLocation(format!(
        "Unknown scheme: {scheme}. Known schemes: {}",
        known_schemes.join(",")
    )))
}
/// Parses `table_uri` into a URL without creating anything on disk.
///
/// Local paths must already exist: they are canonicalized and converted to `file://` URLs.
/// Trailing `/` characters are stripped from the resulting URL path.
///
/// # Errors
///
/// Returns [`DeltaTableError::InvalidTableLocation`] in these cases:
///
/// - The location cannot be resolved.
/// - A local path cannot be canonicalized (e.g. it does not exist).
/// - A local path cannot be represented as a URL.
pub fn parse_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
    let table_uri = table_uri.as_ref();
    let mut url = match resolve_uri_type(table_uri)? {
        UriType::Url(url) => url,
        UriType::LocalPath(path) => {
            let canonical = std::fs::canonicalize(path).map_err(|err| {
                DeltaTableError::InvalidTableLocation(format!(
                    "Invalid table location: {table_uri}\nError: {err:?}"
                ))
            })?;
            Url::from_directory_path(canonical).map_err(|_| {
                DeltaTableError::InvalidTableLocation(format!(
                    "Could not construct a URL from the canonical path: {table_uri}.\n\
                    Something must be very wrong with the table path.",
                ))
            })?
        }
    };

    let without_trailing_slash = url.path().trim_end_matches('/').to_owned();
    url.set_path(&without_trailing_slash);
    Ok(url)
}
/// Resolves `table_uri` into a normalized table URL.
///
/// Missing local directories are created first. Local paths are canonicalized and converted to
/// `file://` URLs, and the result is passed through `normalize_table_url`.
///
/// # Errors
///
/// Returns [`DeltaTableError::InvalidTableLocation`] in these cases:
///
/// - The location cannot be resolved.
/// - A missing local directory cannot be created.
/// - A local path cannot be canonicalized or represented as a URL.
pub fn ensure_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
    let table_uri = table_uri.as_ref();
    let url = match resolve_uri_type(table_uri)? {
        UriType::Url(url) => url,
        UriType::LocalPath(path) => {
            // Canonicalization requires the path to exist, so create it first if needed.
            if !path.exists() {
                std::fs::create_dir_all(&path).map_err(|err| {
                    DeltaTableError::InvalidTableLocation(format!(
                        "Could not create local directory: {table_uri}\nError: {err:?}"
                    ))
                })?;
            }
            let canonical = std::fs::canonicalize(path).map_err(|err| {
                DeltaTableError::InvalidTableLocation(format!(
                    "Invalid table location: {table_uri}\nError: {err:?}"
                ))
            })?;
            Url::from_directory_path(canonical).map_err(|_| {
                DeltaTableError::InvalidTableLocation(format!(
                    "Could not construct a URL from the canonical path: {table_uri}.\n\
                    Something must be very wrong with the table path.",
                ))
            })?
        }
    };
    Ok(normalize_table_url(&url))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logstore::factories::DefaultObjectStoreFactory;

    #[test]
    fn test_ensure_table_uri() {
        object_store_factories().insert(
            Url::parse("s3://").unwrap(),
            Arc::new(DefaultObjectStoreFactory::default()),
        );
        let uri = ensure_table_uri(".");
        assert!(uri.is_ok());
        let uri = ensure_table_uri("s3://container/path");
        assert!(uri.is_ok());
        assert_eq!(Url::parse("s3://container/path/").unwrap(), uri.unwrap());
        #[cfg(not(windows))]
        {
            let uri = ensure_table_uri("file:///tmp/nonexistent/some/path");
            assert!(uri.is_ok());
        }
        // A missing relative directory is created by `ensure_table_uri`; clean it up again.
        let uri = ensure_table_uri("./nonexistent");
        assert!(uri.is_ok());
        let file_path = std::path::Path::new("./nonexistent");
        std::fs::remove_dir(file_path).unwrap();
        // These inputs must come back unchanged.
        cfg_if::cfg_if! {
            if #[cfg(windows)] {
                let roundtrip_cases = &[
                    "s3://tests/data/delta-0.8.0/",
                    "memory://",
                    "s3://bucket/my%20table/",
                ];
            } else {
                let roundtrip_cases = &[
                    "s3://tests/data/delta-0.8.0/",
                    "memory://",
                    "file:///",
                    "s3://bucket/my%20table/",
                ];
            }
        }
        for case in roundtrip_cases {
            let uri = ensure_table_uri(case).unwrap();
            assert_eq!(case, &uri.as_str());
        }
        // These inputs are normalized to a different URL.
        let map_cases = &[
            (
                "s3://tests/data/delta-0.8.0//",
                "s3://tests/data/delta-0.8.0/",
            ),
            ("s3://bucket/my table", "s3://bucket/my%20table/"),
        ];
        for (case, expected) in map_cases {
            let uri = ensure_table_uri(case).unwrap();
            assert_eq!(expected, &uri.as_str());
        }
    }

    #[test]
    #[cfg(windows)]
    fn test_windows_uri() {
        let map_cases = &[
            ("c://", "file:///C:/"),
        ];
        for (case, expected) in map_cases {
            let uri = ensure_table_uri(case).unwrap();
            assert_eq!(expected, &uri.as_str());
        }
    }

    #[test]
    fn test_ensure_table_uri_path() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
        let paths = &[
            tmp_path.join("data/delta-0.8.0"),
            tmp_path.join("space in path"),
            // Non-ASCII segments: CJK characters and an emoji.
            tmp_path.join("special&chars/你好/😊"),
        ];
        for path in paths {
            let expected = Url::from_directory_path(path).unwrap();
            let uri = ensure_table_uri(path.as_os_str().to_str().unwrap()).unwrap();
            assert_eq!(expected.as_str(), uri.as_str());
            assert!(path.exists());
        }
        let relative_path = std::path::Path::new("_tmp/test %3F");
        assert!(!relative_path.exists());
        ensure_table_uri(relative_path.as_os_str().to_str().unwrap()).unwrap();
        assert!(relative_path.exists());
        std::fs::remove_dir_all(std::path::Path::new("_tmp")).unwrap();
    }

    #[test]
    fn test_ensure_table_uri_url() {
        let expected = Url::parse("memory:///test/tests/data/delta-0.8.0/").unwrap();
        let url = ensure_table_uri(&expected).unwrap();
        assert_eq!(expected, url);
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
        let path = tmp_path.join("data/delta-0.8.0");
        let expected = Url::from_directory_path(path).unwrap();
        let url = ensure_table_uri(&expected).unwrap();
        assert_eq!(expected.as_str(), url.as_str());
    }

    #[test]
    fn test_writer_storage_opts_url_trim() {
        let cases = [
            ("SOMETHING_URL", "something://else/", "something://else"),
            (
                "SOMETHING",
                "http://something:port/",
                "http://something:port",
            ),
            (
                "SOMETHING",
                "https://something:port/",
                "https://something:port",
            ),
            (
                "SOME_JDBC_PREFIX",
                "jdbc:mysql://mysql.db.server:3306/",
                "jdbc:mysql://mysql.db.server:3306/",
            ),
            ("SOME_S3_LINK", "s3a://bucket-name/", "s3a://bucket-name/"),
            ("SOME_RANDOM_STRING", "a1b2c3d4e5f#/", "a1b2c3d4e5f#/"),
            (
                "SOME_VALUE",
                "/ This is some value 123 /",
                "/ This is some value 123 /",
            ),
        ];
        for (key, val, expected) in cases {
            let table_uri = Url::parse("memory:///test/tests/data/delta-0.8.0").unwrap();
            let mut storage_opts = HashMap::<String, String>::new();
            storage_opts.insert(key.to_owned(), val.to_owned());
            let table = DeltaTableBuilder::from_url(table_uri)
                .unwrap()
                .with_storage_options(storage_opts);
            let found_opts = table.storage_options();
            assert_eq!(expected, found_opts.get(key).unwrap());
        }
    }

    #[test]
    fn test_expand_tilde_path() {
        let home_dir = dirs::home_dir().expect("Should have home directory");
        let result = expand_tilde_path("~").unwrap();
        assert_eq!(result, home_dir);
        let result = expand_tilde_path("~/test/path").unwrap();
        assert_eq!(result, home_dir.join("test/path"));
        let result = expand_tilde_path("/absolute/path").unwrap();
        assert_eq!(result, PathBuf::from("/absolute/path"));
        let result = expand_tilde_path("relative/path").unwrap();
        assert_eq!(result, PathBuf::from("relative/path"));
        let result = expand_tilde_path("~other").unwrap();
        assert_eq!(result, PathBuf::from("~other"));
    }

    #[test]
    fn test_resolve_uri_type_with_tilde() {
        let home_dir = dirs::home_dir().expect("Should have home directory");
        match resolve_uri_type("~/test/path").unwrap() {
            UriType::LocalPath(path) => {
                assert_eq!(path, home_dir.join("test/path"));
            }
            _ => panic!("Expected LocalPath"),
        }
        match resolve_uri_type("~").unwrap() {
            UriType::LocalPath(path) => {
                assert_eq!(path, home_dir);
            }
            _ => panic!("Expected LocalPath"),
        }
        match resolve_uri_type("regular/path").unwrap() {
            UriType::LocalPath(path) => {
                assert_eq!(path, PathBuf::from("regular/path"));
            }
            _ => panic!("Expected LocalPath"),
        }
    }

    #[test]
    fn test_invalid_url_but_invalid_file_path_too() -> DeltaResult<()> {
        for wrong in &["s3://arn:aws:s3:::something", "hdfs://"] {
            let result = ensure_table_uri(wrong);
            assert!(
                result.is_err(),
                "Expected {wrong} parsed into {result:#?} to return an error because I gave it something URLish"
            );
        }
        Ok(())
    }

    #[test]
    fn test_ensure_table_uri_with_tilde() {
        let home_dir = dirs::home_dir().expect("Should have home directory");
        // Use a per-process directory name, and only remove the directory if this test created
        // it, so a user's pre-existing directory in $HOME is never deleted.
        let dir_name = format!("delta_test_temp_{}", std::process::id());
        let test_dir = home_dir.join(&dir_name);
        let created_here = !test_dir.exists();
        std::fs::create_dir_all(&test_dir).ok();
        let tilde_path = format!("~/{dir_name}");
        let result = ensure_table_uri(&tilde_path);
        assert!(
            result.is_ok(),
            "ensure_table_uri should work with tilde paths"
        );
        let url = result.unwrap();
        assert!(!url.as_str().contains("~"));
        #[cfg(windows)]
        {
            let home_dir_normalized = home_dir.to_string_lossy().replace('\\', "/");
            assert!(url.as_str().contains(&home_dir_normalized));
        }
        #[cfg(not(windows))]
        {
            assert!(url.as_str().contains(home_dir.to_string_lossy().as_ref()));
        }
        if created_here {
            std::fs::remove_dir_all(&test_dir).ok();
        }
    }

    #[test]
    fn test_create_builder_from_non_existent_path() {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
        let new_path = tmp_path.join("new_table");
        assert!(!new_path.exists());
        let builder_result =
            DeltaTableBuilder::from_url(Url::from_directory_path(&new_path).unwrap());
        assert!(
            builder_result.is_ok(),
            "Builder should be created successfully even if the path does not exist"
        );
        let builder = builder_result.unwrap();
        assert_eq!(
            builder.table_url.as_str(),
            Url::from_directory_path(&new_path).unwrap().as_str()
        );
    }
}