use std::sync::{Arc, OnceLock};
use bytes::Bytes;
use object_store::{Attributes, Error as ObjectStoreError, ObjectStore, PutOptions, TagSet};
use uuid::Uuid;
use super::storage::{ObjectStoreRef, utils::commit_uri_from_version};
use super::{CommitOrBytes, LogStore, LogStoreConfig};
use crate::DeltaResult;
use crate::kernel::Version;
use crate::kernel::transaction::TransactionError;
fn put_options() -> &'static PutOptions {
static PUT_OPTS: OnceLock<PutOptions> = OnceLock::new();
PUT_OPTS.get_or_init(|| PutOptions {
mode: object_store::PutMode::Create, tags: TagSet::default(),
attributes: Attributes::default(),
extensions: Default::default(),
})
}
#[derive(Debug, Clone)]
pub struct DefaultLogStore {
prefixed_store: ObjectStoreRef,
root_store: ObjectStoreRef,
config: LogStoreConfig,
}
impl DefaultLogStore {
pub fn new(
prefixed_store: ObjectStoreRef,
root_store: ObjectStoreRef,
config: LogStoreConfig,
) -> Self {
Self {
prefixed_store,
root_store,
config,
}
}
}
#[async_trait::async_trait]
impl LogStore for DefaultLogStore {
fn name(&self) -> String {
"DefaultLogStore".into()
}
async fn read_commit_entry(&self, version: Version) -> DeltaResult<Option<Bytes>> {
super::read_commit_entry(self.object_store(None).as_ref(), version).await
}
async fn write_commit_entry(
&self,
version: Version,
commit_or_bytes: CommitOrBytes,
_: Uuid,
) -> Result<(), TransactionError> {
match commit_or_bytes {
CommitOrBytes::LogBytes(log_bytes) => self
.object_store(None)
.put_opts(
&commit_uri_from_version(Some(version)),
log_bytes.into(),
put_options().clone(),
)
.await
.map_err(|err| -> TransactionError {
match err {
ObjectStoreError::AlreadyExists { .. } => {
TransactionError::VersionAlreadyExists(version)
}
_ => TransactionError::from(err),
}
})?,
_ => unreachable!(), };
Ok(())
}
async fn abort_commit_entry(
&self,
_version: Version,
commit_or_bytes: CommitOrBytes,
_: Uuid,
) -> Result<(), TransactionError> {
match &commit_or_bytes {
CommitOrBytes::LogBytes(_) => Ok(()),
_ => unreachable!(), }
}
async fn get_latest_version(&self, current_version: Version) -> DeltaResult<Version> {
super::get_latest_version(self, current_version).await
}
fn object_store(&self, _: Option<Uuid>) -> Arc<dyn ObjectStore> {
self.prefixed_store.clone()
}
fn root_object_store(&self, _: Option<Uuid>) -> Arc<dyn ObjectStore> {
self.root_store.clone()
}
fn config(&self) -> &LogStoreConfig {
&self.config
}
}