use std::collections::HashMap;
use std::sync::{Arc, LazyLock};
use bytes::Bytes;
#[cfg(feature = "datafusion")]
use datafusion::datasource::object_store::ObjectStoreUrl;
use delta_kernel::engine::default::DefaultEngineBuilder;
use delta_kernel::engine::default::executor::tokio::{
TokioBackgroundExecutor, TokioMultiThreadExecutor,
};
use delta_kernel::log_segment::LogSegment;
use delta_kernel::path::{LogPathFileType, ParsedLogPath};
use delta_kernel::{AsAny, Engine};
use futures::StreamExt;
use object_store::ObjectStoreScheme;
use object_store::{Error as ObjectStoreError, ObjectStore, ObjectStoreExt as _, path::Path};
use regex::Regex;
use serde::de::{Error, SeqAccess, Visitor};
use serde::ser::SerializeSeq;
use serde::{Deserialize, Serialize};
use serde_json::Deserializer;
use tokio::runtime::RuntimeFlavor;
use tracing::*;
use url::Url;
use uuid::Uuid;
use crate::kernel::transaction::TransactionError;
use crate::kernel::{Action, Version, spawn_blocking_with_span};
use crate::table::normalize_table_url;
use crate::{DeltaResult, DeltaTableError};
pub use self::config::StorageConfig;
pub use self::factories::{
LogStoreFactory, LogStoreFactoryRegistry, ObjectStoreFactory, ObjectStoreFactoryRegistry,
logstore_factories, object_store_factories, store_for,
};
pub use self::storage::utils::commit_uri_from_version;
pub use self::storage::{
DefaultObjectStoreRegistry, DeltaIOStorageBackend, IORuntime, ObjectStoreRef,
ObjectStoreRegistry, ObjectStoreRetryExt, client_options_from_certificate,
};
pub use ::object_store;
pub mod config;
pub(crate) mod default_logstore;
pub(crate) mod factories;
pub(crate) mod storage;
/// Module-private extension trait for log store factories.
///
/// It exposes a single entry point that takes only the root object store;
/// the implementations in this module are responsible for producing the
/// decorated store handed to the underlying factory.
trait LogStoreFactoryExt {
/// Builds a [`LogStoreRef`] for the table at `location`.
///
/// * `root_store` - the undecorated object store.
/// * `location` - URL of the table.
/// * `options` - storage configuration used while building the store.
fn with_options_internal(
&self,
root_store: ObjectStoreRef,
location: &Url,
options: &StorageConfig,
) -> DeltaResult<LogStoreRef>;
}
/// Blanket implementation for every [`LogStoreFactory`], including unsized
/// (trait-object) factories.
impl<T: LogStoreFactory + ?Sized> LogStoreFactoryExt for T {
    /// Decorates `root_store` for `location` via `StorageConfig::decorate_store`
    /// and passes both the decorated and the undecorated store on to
    /// `LogStoreFactory::with_options`.
    ///
    /// # Errors
    /// Propagates any error from decorating the store or from the factory.
    fn with_options_internal(
        &self,
        root_store: ObjectStoreRef,
        location: &Url,
        options: &StorageConfig,
    ) -> DeltaResult<LogStoreRef> {
        // Clone the root handle: the factory also receives it undecorated.
        let decorated = options.decorate_store(root_store.clone(), location)?;
        Ok(self.with_options(Arc::new(decorated), root_store, location, options)?)
    }
}
/// Lets an `Arc`-wrapped factory be used wherever a [`LogStoreFactoryExt`]
/// is expected by delegating to the wrapped value.
impl<T: LogStoreFactory> LogStoreFactoryExt for Arc<T> {
    /// Forwards to the wrapped factory's `with_options_internal`.
    fn with_options_internal(
        &self,
        root_store: ObjectStoreRef,
        location: &Url,
        options: &StorageConfig,
    ) -> DeltaResult<LogStoreRef> {
        // Deref through the `Arc` explicitly so the call targets `T`'s impl.
        let factory: &T = self;
        factory.with_options_internal(root_store, location, options)
    }
}
/// Creates the default [`LogStore`] implementation for a table.
///
/// * `prefixed_store` - object store handed to the log store as its prefixed store.
/// * `root_store` - object store handed to the log store as its root store.
/// * `location` - table URL, recorded in the resulting [`LogStoreConfig`].
/// * `options` - storage configuration; cloned into the [`LogStoreConfig`].
pub fn default_logstore(
    prefixed_store: ObjectStoreRef,
    root_store: ObjectStoreRef,
    location: &Url,
    options: &StorageConfig,
) -> Arc<dyn LogStore> {
    let config = LogStoreConfig::new(location, options.clone());
    let store = default_logstore::DefaultLogStore::new(prefixed_store, root_store, config);
    Arc::new(store)
}
/// Shared, reference-counted handle to a [`LogStore`] trait object.
pub type LogStoreRef = Arc<dyn LogStore>;
/// Store-relative path of the Delta log directory (`_delta_log`), built lazily on first use.
static DELTA_LOG_PATH: LazyLock<Path> = LazyLock::new(|| Path::from("_delta_log"));
/// Matches names that end in a 20-digit number followed by `.json`,
/// `.checkpoint.parquet` or `.checkpoint.<digits>.parquet`.
///
/// Capture group 1 holds the 20 digits. The pattern is anchored at the end
/// only, so any prefix before the digits is accepted.
pub(crate) static DELTA_LOG_REGEX: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"(\d{20})\.(json|checkpoint(\.\d+)?\.parquet)$").unwrap());
/// Builds a [`LogStoreRef`] for `location`, creating the root object store
/// from the object store factory registered for the URL scheme.
///
/// # Errors
/// Returns [`DeltaTableError::InvalidTableLocation`] when the scheme cannot
/// be turned into a lookup URL or when no object store factory is registered
/// for it. Errors from the factory and from [`logstore_with`] are propagated.
pub fn logstore_for(location: &Url, storage_config: StorageConfig) -> DeltaResult<LogStoreRef> {
    // Factories are keyed by a bare `<scheme>://` URL.
    let scheme = match Url::parse(&format!("{}://", location.scheme())) {
        Ok(parsed) => parsed,
        Err(_) => {
            return Err(DeltaTableError::InvalidTableLocation(
                location.clone().into(),
            ));
        }
    };

    match object_store_factories().get(&scheme) {
        Some(entry) => {
            debug!("Found a storage provider for {scheme} ({location})");
            let (root_store, _prefix) = entry.value().parse_url_opts(location, &storage_config)?;
            logstore_with(root_store, location, storage_config)
        }
        None => Err(DeltaTableError::InvalidTableLocation(location.to_string())),
    }
}
/// Builds a [`LogStoreRef`] for `location` on top of an already constructed
/// `root_store`, using the log store factory registered for the URL scheme.
///
/// # Errors
/// Returns [`DeltaTableError::InvalidTableLocation`] when the scheme cannot
/// be turned into a lookup URL or when no log store factory is registered
/// for it (the latter is also logged at error level). Factory errors are
/// propagated.
pub fn logstore_with(
    root_store: ObjectStoreRef,
    location: &Url,
    storage_config: StorageConfig,
) -> DeltaResult<LogStoreRef> {
    // Factories are keyed by a bare `<scheme>://` URL.
    let scheme = match Url::parse(&format!("{}://", location.scheme())) {
        Ok(parsed) => parsed,
        Err(_) => {
            return Err(DeltaTableError::InvalidTableLocation(
                location.clone().into(),
            ));
        }
    };

    match logstore_factories().get(&scheme) {
        Some(factory) => {
            debug!("Found a logstore provider for {scheme}");
            factory
                .value()
                .with_options_internal(root_store, location, &storage_config)
        }
        None => {
            error!("Could not find a logstore for the scheme {scheme}");
            Err(DeltaTableError::InvalidTableLocation(location.to_string()))
        }
    }
}
/// Commit payload accepted by [`LogStore::write_commit_entry`] and
/// [`LogStore::abort_commit_entry`]: either an object-store path or raw bytes.
#[derive(Clone)]
pub enum CommitOrBytes {
/// An object-store path. Going by the name this is presumably a temporary
/// commit file that still has to be moved into place — confirm against implementors.
TmpCommit(Path),
/// In-memory bytes. Presumably the serialized commit log entry — confirm
/// against implementors.
LogBytes(Bytes),
}
/// Configuration of a log store: the table location plus storage options.
#[derive(Debug, Clone)]
pub struct LogStoreConfig {
/// Table location. Passed through `normalize_table_url` when the config is
/// created with [`LogStoreConfig::new`].
location: Url,
/// Storage options; also used to decorate object stores for this table.
options: StorageConfig,
}
impl LogStoreConfig {
pub fn new(location: &Url, options: StorageConfig) -> Self {
let location = normalize_table_url(location);
Self { location, options }
}
pub fn location(&self) -> &Url {
&self.location
}
pub fn options(&self) -> &StorageConfig {
&self.options
}
pub fn decorate_store<T: ObjectStore + Clone>(
&self,
store: T,
table_root: Option<&url::Url>,
) -> DeltaResult<Box<dyn ObjectStore>> {
let table_url = table_root.unwrap_or(&self.location);
self.options.decorate_store(store, table_url)
}
pub fn object_store_factory(&self) -> ObjectStoreFactoryRegistry {
self::factories::object_store_factories()
}
}
/// Abstraction over the commit log of a single Delta table.
///
/// Implementors supply the commit read/write primitives, the object stores
/// and the configuration; the remaining methods have default implementations
/// derived from those.
#[async_trait::async_trait]
pub trait LogStore: Send + Sync + AsAny {
    /// Name of this implementation; shown by the `Debug` impl of `dyn LogStore`.
    fn name(&self) -> String;

    /// Refreshes any implementation-specific state. The default does nothing.
    async fn refresh(&self) -> DeltaResult<()> {
        Ok(())
    }

    /// Reads the commit entry for `version`.
    async fn read_commit_entry(&self, version: Version) -> DeltaResult<Option<Bytes>>;

    /// Writes the commit entry for `version` from `commit_or_bytes`.
    async fn write_commit_entry(
        &self,
        version: Version,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError>;

    /// Aborts a pending commit entry for `version`.
    async fn abort_commit_entry(
        &self,
        version: Version,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError>;

    /// Determines the latest table version, given `start_version` as a hint.
    async fn get_latest_version(&self, start_version: Version) -> DeltaResult<Version>;

    /// Object store used for table-relative access.
    fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore>;

    /// Undecorated root object store.
    fn root_object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore>;

    /// Builds a kernel [`Engine`] on top of the root object store.
    fn engine(&self, operation_id: Option<Uuid>) -> Arc<dyn Engine> {
        get_engine(self.root_object_store(operation_id))
    }

    /// Renders `location` as a URI relative to the table root.
    fn to_uri(&self, location: &Path) -> String {
        to_uri(&self.config().location, location)
    }

    /// URL of the table root, taken from the configuration.
    fn root_url(&self) -> &Url {
        self.config().location()
    }

    /// Store-relative path of the `_delta_log` directory.
    fn log_path(&self) -> &Path {
        &*DELTA_LOG_PATH
    }

    /// URL to use for a transaction; the default is the configured table location.
    fn transaction_url(&self, _operation_id: Option<Uuid>) -> DeltaResult<Url> {
        let location = self.config().location();
        Ok(location.clone())
    }

    /// Checks whether the location contains a Delta log.
    ///
    /// Lists `_delta_log` and returns `true` as soon as one entry parses as a
    /// commit, a single-part, UUID or multi-part checkpoint, or a compacted
    /// commit. A `NotFound` listing error yields `false`; any other listing
    /// error is returned.
    async fn is_delta_table_location(&self) -> DeltaResult<bool> {
        let store = self.object_store(None);
        // `ParsedLogPath` is parsed from a `Url` here, so store-relative
        // paths are joined onto a placeholder base.
        let placeholder_base = Url::parse("http://example.com").unwrap();
        let prefix = Path::from("_delta_log");
        let mut entries = store.list(Some(&prefix));

        while let Some(entry) = entries.next().await {
            let meta = match entry {
                Ok(meta) => meta,
                Err(ObjectStoreError::NotFound { .. }) => return Ok(false),
                Err(err) => return Err(err.into()),
            };

            let file_url = placeholder_base.join(meta.location.as_ref()).unwrap();
            let Ok(Some(parsed)) = ParsedLogPath::try_from(file_url) else {
                continue;
            };

            let is_log_file = matches!(
                parsed.file_type,
                LogPathFileType::Commit
                    | LogPathFileType::SinglePartCheckpoint
                    | LogPathFileType::UuidCheckpoint
                    | LogPathFileType::MultiPartCheckpoint { .. }
                    | LogPathFileType::CompactedCommit { .. }
            );
            if is_log_file {
                return Ok(true);
            }
        }

        Ok(false)
    }

    /// Configuration of this log store.
    fn config(&self) -> &LogStoreConfig;

    /// Object store URL derived from the configured table location.
    #[cfg(feature = "datafusion")]
    fn object_store_url(&self) -> ObjectStoreUrl {
        crate::logstore::object_store_url(&self.config().location)
    }
}
/// Crate-internal URL helpers layered on top of [`LogStore`].
pub(crate) trait LogStoreExt: LogStore {
    /// Table root URL whose path is guaranteed to end with `/`.
    fn table_root_url(&self) -> Url {
        let mut root = self.config().location.clone();
        if !root.path().ends_with('/') {
            // Without the trailing slash `Url::join` would replace the last
            // path segment instead of appending to it.
            let with_slash = format!("{}/", root.path());
            root.set_path(&with_slash);
        }
        root
    }

    /// URL of the `_delta_log/` directory below the table root.
    fn log_root_url(&self) -> Url {
        let root = self.table_root_url();
        root.join("_delta_log/").unwrap()
    }
}
/// Every [`LogStore`], sized or not, gets the [`LogStoreExt`] helpers for free.
impl<T: LogStore + ?Sized> LogStoreExt for T {}
/// Allows an `Arc`-wrapped log store (including `Arc<dyn LogStore>`) to be
/// used as a [`LogStore`] itself.
///
/// Every trait method, including those with default bodies, is forwarded to
/// the wrapped value so that overrides on `T` are never bypassed.
#[async_trait::async_trait]
impl<T: LogStore + ?Sized> LogStore for Arc<T> {
    /// Forwards to the wrapped log store.
    fn name(&self) -> String {
        T::name(self)
    }

    /// Forwards to the wrapped log store.
    async fn refresh(&self) -> DeltaResult<()> {
        T::refresh(self).await
    }

    /// Forwards to the wrapped log store.
    async fn read_commit_entry(&self, version: Version) -> DeltaResult<Option<Bytes>> {
        T::read_commit_entry(self, version).await
    }

    /// Forwards to the wrapped log store.
    async fn write_commit_entry(
        &self,
        version: Version,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError> {
        T::write_commit_entry(self, version, commit_or_bytes, operation_id).await
    }

    /// Forwards to the wrapped log store.
    async fn abort_commit_entry(
        &self,
        version: Version,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> Result<(), TransactionError> {
        T::abort_commit_entry(self, version, commit_or_bytes, operation_id).await
    }

    /// Forwards to the wrapped log store.
    async fn get_latest_version(&self, start_version: Version) -> DeltaResult<Version> {
        T::get_latest_version(self, start_version).await
    }

    /// Forwards to the wrapped log store.
    fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
        T::object_store(self, operation_id)
    }

    /// Forwards to the wrapped log store.
    fn root_object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
        T::root_object_store(self, operation_id)
    }

    /// Forwards to the wrapped log store.
    fn engine(&self, operation_id: Option<Uuid>) -> Arc<dyn Engine> {
        T::engine(self, operation_id)
    }

    /// Forwards to the wrapped log store.
    fn to_uri(&self, location: &Path) -> String {
        T::to_uri(self, location)
    }

    /// Forwards to the wrapped log store.
    fn root_url(&self) -> &Url {
        T::root_url(self)
    }

    /// Forwards to the wrapped log store.
    fn log_path(&self) -> &Path {
        T::log_path(self)
    }

    /// Forwards to the wrapped log store.
    ///
    /// Without this explicit forward the trait's default body would be used
    /// for `Arc<T>`, ignoring any override provided by `T`.
    fn transaction_url(&self, operation_id: Option<Uuid>) -> DeltaResult<Url> {
        T::transaction_url(self, operation_id)
    }

    /// Forwards to the wrapped log store.
    async fn is_delta_table_location(&self) -> DeltaResult<bool> {
        T::is_delta_table_location(self).await
    }

    /// Forwards to the wrapped log store.
    fn config(&self) -> &LogStoreConfig {
        T::config(self)
    }

    /// Forwards to the wrapped log store.
    #[cfg(feature = "datafusion")]
    fn object_store_url(&self) -> ObjectStoreUrl {
        T::object_store_url(self)
    }
}
/// Builds a kernel [`Engine`] over `store`, picking the task executor that
/// matches the flavor of the current tokio runtime.
///
/// # Panics
/// Panics when called outside a tokio runtime (via `Handle::current`) or when
/// the runtime flavor is neither multi-thread nor current-thread.
pub(crate) fn get_engine(store: Arc<dyn ObjectStore>) -> Arc<dyn Engine> {
    let runtime = tokio::runtime::Handle::current();
    let flavor = runtime.runtime_flavor();

    if matches!(flavor, RuntimeFlavor::MultiThread) {
        let executor = Arc::new(TokioMultiThreadExecutor::new(runtime));
        let engine = DefaultEngineBuilder::new(store)
            .with_task_executor(executor)
            .build();
        return Arc::new(engine);
    }

    if matches!(flavor, RuntimeFlavor::CurrentThread) {
        let executor = Arc::new(TokioBackgroundExecutor::new());
        let engine = DefaultEngineBuilder::new(store)
            .with_task_executor(executor)
            .build();
        return Arc::new(engine);
    }

    panic!("unsupported runtime flavor")
}
/// Derives an `ObjectStoreUrl` with the `delta-rs` scheme from a table location.
///
/// The authority is built from the optional `user@` part, the original scheme,
/// the host (or `-` when absent) and the path with every path delimiter and
/// `:` replaced by `-`.
///
/// # Panics
/// Panics if the assembled string is not accepted by `ObjectStoreUrl::parse`.
#[cfg(feature = "datafusion")]
fn object_store_url(location: &Url) -> ObjectStoreUrl {
    use object_store::path::DELIMITER;

    let username = location.username();
    let user_at = if username.is_empty() {
        String::new()
    } else {
        format!("{username}@")
    };
    let host = location.host_str().unwrap_or("-");
    let flattened_path = location.path().replace(DELIMITER, "-").replace(':', "-");

    let raw = format!(
        "delta-rs://{}{}-{}{}",
        user_at,
        location.scheme(),
        host,
        flattened_path
    );
    ObjectStoreUrl::parse(raw).expect("Invalid object store url.")
}
/// Extracts the object-store path component from a table root URL.
///
/// Uses the path reported by `ObjectStoreScheme::parse` when the URL is
/// recognized; otherwise falls back to parsing the raw URL path.
///
/// # Errors
/// Returns an error when the fallback `Path::parse` rejects the URL path.
pub(crate) fn object_store_path(table_root: &Url) -> DeltaResult<Path> {
    if let Ok((_, path)) = ObjectStoreScheme::parse(table_root) {
        return Ok(path);
    }
    // Scheme not recognized: interpret the URL path directly.
    Ok(Path::parse(table_root.path())?)
}
/// Renders `location` as a fully qualified URI below `root`.
///
/// * For `file` URLs the result is `<root>/<location>` with the scheme prefix
///   removed (`file:///` on Windows, `file://` elsewhere), i.e. a local path.
/// * For any other scheme the result is `root` itself when `location` is empty
///   or `/`, otherwise `location` joined onto `root`.
///
/// Only the leading scheme prefix is stripped, so a path that happens to
/// contain `file://` further in is left intact. The non-Windows branch also
/// covers targets that are neither `unix` nor `windows`.
///
/// # Panics
/// Panics if `Url::join` fails for a non-`file` root.
pub fn to_uri(root: &Url, location: &Path) -> String {
    match root.scheme() {
        "file" => {
            #[cfg(windows)]
            const FILE_PREFIX: &str = "file:///";
            #[cfg(not(windows))]
            const FILE_PREFIX: &str = "file://";

            let uri = format!(
                "{}/{}",
                root.as_ref().trim_end_matches('/'),
                location.as_ref()
            );
            match uri.strip_prefix(FILE_PREFIX) {
                Some(local_path) => local_path.to_string(),
                None => uri,
            }
        }
        _ => {
            if location.as_ref().is_empty() || location.as_ref() == "/" {
                root.as_ref().to_string()
            } else {
                root.join(location.as_ref())
                    .expect("Somehow failed to join on a Url!")
                    .to_string()
            }
        }
    }
}
/// Parses the actions contained in the bytes of a commit log entry.
///
/// The bytes are read as a stream of JSON values, each deserialized into an
/// [`Action`]. Parsing stops at the first malformed value.
///
/// # Errors
/// Returns [`DeltaTableError::InvalidJsonLog`] carrying the JSON error, a
/// line/column description and `version` for the first value that fails to parse.
pub fn get_actions(
    version: Version,
    commit_log_bytes: &bytes::Bytes,
) -> Result<Vec<Action>, DeltaTableError> {
    debug!("parsing commit with version {version}...");

    let mut actions = Vec::new();
    for parsed in Deserializer::from_slice(commit_log_bytes).into_iter::<Action>() {
        match parsed {
            Ok(action) => actions.push(action),
            Err(json_err) => {
                let line = format!(
                    "Error at line {}, column {}",
                    json_err.line(),
                    json_err.column()
                );
                return Err(DeltaTableError::InvalidJsonLog {
                    json_err,
                    line,
                    version,
                });
            }
        }
    }
    Ok(actions)
}
/// Formats a log store as `<name>(<root url>)`.
impl std::fmt::Debug for dyn LogStore + '_ {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.name();
        let root = self.root_url();
        write!(f, "{name}({root})")
    }
}
/// Serializes the configuration as a two-element sequence:
/// the location as a string, followed by the raw storage options.
impl Serialize for LogStoreConfig {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // The sequence always has exactly two elements; announcing the length
        // lets length-prefixed formats serialize it as well.
        let mut seq = serializer.serialize_seq(Some(2))?;
        seq.serialize_element(&self.location.to_string())?;
        seq.serialize_element(&self.options.raw)?;
        seq.end()
    }
}
/// Deserializes the configuration from the two-element sequence written by
/// the `Serialize` impl: location string, then the raw options map.
impl<'de> Deserialize<'de> for LogStoreConfig {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        struct LogStoreConfigVisitor {}

        impl<'de> Visitor<'de> for LogStoreConfigVisitor {
            type Value = LogStoreConfig;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("struct LogStoreConfig")
            }

            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
            where
                A: SeqAccess<'de>,
            {
                // `invalid_length` takes the number of elements that were
                // actually present: 0 when the location is missing, 1 when
                // only the options are missing.
                let location_str: String = seq
                    .next_element()?
                    .ok_or_else(|| A::Error::invalid_length(0, &self))?;
                let options: HashMap<String, String> = seq
                    .next_element()?
                    .ok_or_else(|| A::Error::invalid_length(1, &self))?;

                let location = Url::parse(&location_str).map_err(A::Error::custom)?;
                Ok(LogStoreConfig {
                    location,
                    options: StorageConfig::parse_options(options).map_err(A::Error::custom)?,
                })
            }
        }

        deserializer.deserialize_seq(LogStoreConfigVisitor {})
    }
}
/// Extracts the table version from a Delta log file name.
///
/// Returns `Some(version)` when `name` matches [`DELTA_LOG_REGEX`] and its
/// 20-digit prefix fits into [`Version`]; returns `None` otherwise.
///
/// A 20-digit number can exceed the range of a 64-bit integer, so an
/// out-of-range value yields `None` instead of panicking.
pub fn extract_version_from_filename(name: &str) -> Option<Version> {
    DELTA_LOG_REGEX
        .captures(name)
        .and_then(|captures| captures.get(1)?.as_str().parse().ok())
}
/// Determines the latest version of the table behind `log_store`.
///
/// Builds a `LogSegment` for table changes starting at `current_version` on a
/// blocking task and returns the segment's end version.
///
/// # Errors
/// * [`DeltaTableError::Generic`] when the blocking task fails to join or the
///   kernel reports any other error.
/// * [`DeltaTableError::InvalidVersion`] when the kernel error message contains
///   `to have version <current_version>`.
pub async fn get_latest_version(
    log_store: &dyn LogStore,
    current_version: Version,
) -> DeltaResult<Version> {
    let storage = log_store.engine(None).storage_handler();
    let log_root = log_store.log_root_url();

    let joined = spawn_blocking_with_span(move || {
        LogSegment::for_table_changes(storage.as_ref(), log_root, current_version, None)
    })
    .await;
    let segment_result = joined.map_err(|e| DeltaTableError::Generic(e.to_string()))?;

    let segment = match segment_result {
        Ok(segment) => segment,
        Err(kernel_err) => {
            // The kernel error is classified by matching on its message text.
            let message = kernel_err.to_string();
            let version_marker = format!("to have version {current_version}");
            return Err(if message.contains(&version_marker) {
                DeltaTableError::InvalidVersion(current_version)
            } else {
                DeltaTableError::Generic(message)
            });
        }
    };

    Ok(segment.end_version)
}
/// Reads the commit entry for `version` from `storage`.
///
/// Returns `Ok(None)` when the commit file does not exist.
///
/// # Errors
/// Propagates any other object store error, after logging it.
#[instrument(skip(storage), fields(version = version, path = %commit_uri_from_version(Some(version))))]
pub async fn read_commit_entry(
    storage: &dyn ObjectStore,
    version: Version,
) -> DeltaResult<Option<Bytes>> {
    let commit_uri = commit_uri_from_version(Some(version));

    let response = match storage.get(&commit_uri).await {
        Ok(response) => response,
        Err(ObjectStoreError::NotFound { .. }) => {
            debug!("commit entry not found");
            return Ok(None);
        }
        Err(err) => {
            error!(error = %err, version = version, "failed to read commit entry");
            return Err(err.into());
        }
    };

    let bytes = response.bytes().await?;
    debug!(size = bytes.len(), "commit entry read successfully");
    Ok(Some(bytes))
}
/// Publishes a commit by renaming `tmp_commit` to the commit path of `version`,
/// failing if that path already exists.
///
/// # Errors
/// * [`TransactionError::VersionAlreadyExists`] when the target already exists.
/// * Any other object store error, converted into a [`TransactionError`].
#[instrument(skip(storage), fields(version = version, tmp_path = %tmp_commit, commit_path = %commit_uri_from_version(Some(version))))]
pub async fn write_commit_entry(
    storage: &dyn ObjectStore,
    version: Version,
    tmp_commit: &Path,
) -> Result<(), TransactionError> {
    let commit_path = commit_uri_from_version(Some(version));

    match storage.rename_if_not_exists(tmp_commit, &commit_path).await {
        Ok(()) => {
            debug!("commit entry written successfully");
            Ok(())
        }
        Err(ObjectStoreError::AlreadyExists { .. }) => {
            warn!("commit entry already exists");
            Err(TransactionError::VersionAlreadyExists(version))
        }
        Err(err) => {
            error!(error = %err, "failed to write commit entry");
            Err(TransactionError::from(err))
        }
    }
}
/// Aborts a pending commit by deleting its temporary file `tmp_commit`.
///
/// The delete is issued through `delete_with_retries` with a retry argument of 15.
/// `_version` is only recorded on the tracing span.
///
/// # Errors
/// Returns the delete error converted into a [`TransactionError`].
#[instrument(skip(storage), fields(version = _version, tmp_path = %tmp_commit))]
pub async fn abort_commit_entry(
    storage: &dyn ObjectStore,
    _version: Version,
    tmp_commit: &Path,
) -> Result<(), TransactionError> {
    let deletion = storage.delete_with_retries(tmp_commit, 15).await;
    deletion?;

    debug!("commit entry aborted successfully");
    Ok(())
}
#[cfg(test)]
pub(crate) mod tests {
    use ::object_store::{PutOptions, PutPayload};
    use futures::TryStreamExt;
    use pretty_assertions::assert_eq;

    use super::*;

    /// Builds a log store for the in-memory table location shared by these tests.
    fn memory_logstore() -> DeltaResult<LogStoreRef> {
        let location = Url::parse("memory:///table").unwrap();
        logstore_for(&location, StorageConfig::default())
    }

    /// Writes `payload` to `path` through the table-relative object store.
    async fn put_object(store: &LogStoreRef, path: &str, payload: PutPayload) {
        let target = Path::from(path);
        let _put = store
            .object_store(None)
            .put_opts(&target, payload, PutOptions::default())
            .await
            .expect("Failed to put");
    }

    #[test]
    fn test_logstore_config_ctor() {
        let location = Url::parse("nonexistent://table/bar").unwrap();
        let config = LogStoreConfig::new(&location, StorageConfig::default());

        // The constructor normalizes the location to end with a slash.
        assert_eq!(config.location().to_string(), "nonexistent://table/bar/");

        let log_url = config.location().join("_delta_log/").unwrap();
        let expected = Url::parse("nonexistent://table/bar/_delta_log/").unwrap();
        assert_eq!(log_url, expected);
    }

    #[test]
    fn logstore_with_invalid_url() {
        let location = Url::parse("nonexistent://table").unwrap();
        assert!(logstore_for(&location, StorageConfig::default()).is_err());
    }

    #[test]
    fn logstore_with_memory() {
        assert!(memory_logstore().is_ok());
    }

    #[test]
    fn logstore_with_memory_and_rt() {
        let location = Url::parse("memory:///table").unwrap();
        let config = StorageConfig::default().with_io_runtime(IORuntime::default());
        assert!(logstore_for(&location, config).is_ok());
    }

    #[test]
    fn test_logstore_ext() {
        let store = memory_logstore().unwrap();

        let table_url = store.table_root_url();
        assert!(table_url.path().ends_with('/'));

        let log_url = store.log_root_url();
        assert!(log_url.path().ends_with("_delta_log/"));
    }

    #[tokio::test]
    async fn test_is_location_a_table() {
        let store = memory_logstore().expect("Failed to get logstore");

        let is_table = store
            .is_delta_table_location()
            .await
            .expect("Failed to look at table");
        assert!(!is_table);

        // A leftover temporary file must not make the location a table.
        put_object(
            &store,
            "_delta_log/_commit_failed.tmp",
            PutPayload::from_static(b"test-drivin"),
        )
        .await;

        let is_table = store
            .is_delta_table_location()
            .await
            .expect("Failed to look at table");
        assert!(!is_table);
    }

    #[tokio::test]
    async fn test_is_location_a_table_commit() {
        let store = memory_logstore().expect("Failed to get logstore");

        let is_table = store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table");
        assert!(!is_table);

        put_object(
            &store,
            "_delta_log/00000000000000000000.json",
            PutPayload::from_static(b"test"),
        )
        .await;

        let is_table = store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table");
        assert!(is_table);
    }

    #[tokio::test]
    async fn test_is_location_a_table_checkpoint() {
        let store = memory_logstore().expect("Failed to get logstore");

        let is_table = store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table");
        assert!(!is_table);

        put_object(
            &store,
            "_delta_log/00000000000000000000.checkpoint.parquet",
            PutPayload::from_static(b"test"),
        )
        .await;

        let is_table = store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table");
        assert!(is_table);
    }

    #[tokio::test]
    async fn test_is_location_a_table_crc() {
        let store = memory_logstore().expect("Failed to get logstore");

        let is_table = store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table");
        assert!(!is_table);

        // Checksum files surround the one real commit written last.
        let payload = PutPayload::from_static(b"test");
        let log_files = [
            "_delta_log/.00000000000000000000.crc.crc",
            "_delta_log/.00000000000000000000.json.crc",
            "_delta_log/00000000000000000000.crc",
            "_delta_log/00000000000000000000.json",
        ];
        for log_file in log_files {
            put_object(&store, log_file, payload.clone()).await;
        }

        let is_table = store
            .is_delta_table_location()
            .await
            .expect("Failed to identify table");
        assert!(is_table);
    }

    /// Collects the locations of all objects listed under `prefix`.
    pub(crate) async fn flatten_list_stream(
        storage: &object_store::DynObjectStore,
        prefix: Option<&Path>,
    ) -> object_store::Result<Vec<Path>> {
        let locations = storage.list(prefix).map_ok(|meta| meta.location);
        locations.try_collect::<Vec<Path>>().await
    }
}
#[cfg(all(test, feature = "datafusion"))]
mod datafusion_tests {
    use super::*;
    use url::Url;

    /// Distinct table locations must map to distinct object store URLs.
    #[tokio::test]
    async fn test_unique_object_store_url() {
        let distinct_pairs = [
            ("file:///path/to/table_1", "file:///path/to/table_2"),
            ("s3://my_bucket/path/to/table_1", "file:///path/to/table_1"),
            ("s3://bucket_1/table_1", "s3://bucket_2/table_1"),
            (
                "abfss://container1@host/table_1",
                "abfss://container2@host/table_1",
            ),
        ];

        for (first, second) in distinct_pairs {
            let first_url = Url::parse(first).unwrap();
            let second_url = Url::parse(second).unwrap();
            let first_store_url = object_store_url(&first_url);
            let second_store_url = object_store_url(&second_url);
            assert_ne!(first_store_url.as_str(), second_store_url.as_str());
        }
    }

    /// A truncated second JSON value is reported with its line and version.
    #[tokio::test]
    async fn test_get_actions_malformed_json() {
        let malformed_json = bytes::Bytes::from(
            r#"{"add": {"path": "test.parquet", "partitionValues": {}, "size": 100, "modificationTime": 1234567890, "dataChange": true}}
{"invalid json without closing brace"#,
        );

        match get_actions(0, &malformed_json) {
            Err(DeltaTableError::InvalidJsonLog {
                line,
                version,
                json_err,
            }) => {
                assert_eq!(version, 0);
                assert!(line.contains("line 2"));
                assert!(json_err.is_eof());
            }
            other => panic!("Expected InvalidJsonLog error, got {:?}", other),
        }
    }
}