use std::cmp::{Ordering, min};
use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use futures::future::ready;
use futures::stream::{BoxStream, once};
use futures::{StreamExt, TryStreamExt};
use object_store::{ObjectStore, ObjectStoreExt as _, path::Path};
use serde::de::{Error, SeqAccess, Visitor};
use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use url::Url;
use self::builder::DeltaTableConfig;
use self::state::DeltaTableState;
use crate::kernel::{CommitInfo, DataCheck, LogicalFileView, Version};
use crate::logstore::{
LogStoreConfig, LogStoreExt, LogStoreRef, ObjectStoreRef, commit_uri_from_version,
extract_version_from_filename,
};
use crate::partitions::PartitionFilter;
use crate::{DeltaResult, DeltaTableBuilder, DeltaTableError};
pub mod builder;
pub mod config;
pub mod state;
mod columns;
pub use columns::*;
/// In-memory representation of a Delta table.
///
/// Bundles the (optionally loaded) table state, the configuration used when
/// loading, and a handle to the log store backing the table's `_delta_log`.
#[derive(Clone)]
pub struct DeltaTable {
    /// The loaded state (snapshot) of the table; `None` until the table has
    /// been loaded via `load`/`update_incremental`.
    pub state: Option<DeltaTableState>,
    /// The configuration the table was (or will be) loaded with.
    pub config: DeltaTableConfig,
    /// Log-store abstraction over the object store that holds the Delta log.
    pub(crate) log_store: LogStoreRef,
}
impl Serialize for DeltaTable {
    /// Serializes the table as a positional 3-element sequence:
    /// `[state, config, log_store.config()]`.
    ///
    /// The element order is a wire contract: `Deserialize` below reads the
    /// same three elements back in the same order.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // Length hint intentionally omitted (`None`); the matching
        // deserializer consumes elements positionally.
        let mut seq = serializer.serialize_seq(None)?;
        seq.serialize_element(&self.state)?;
        seq.serialize_element(&self.config)?;
        seq.serialize_element(self.log_store.config())?;
        seq.end()
    }
}
impl<'de> Deserialize<'de> for DeltaTable {
    /// Deserializes a table from the positional 3-element sequence written by
    /// the matching `Serialize` impl: `[state, config, log-store config]`.
    ///
    /// The log store itself is reconstructed from the deserialized
    /// [`LogStoreConfig`]; failure to do so is surfaced as a custom serde
    /// error rather than a panic.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        struct DeltaTableVisitor {}
        impl<'de> Visitor<'de> for DeltaTableVisitor {
            type Value = DeltaTable;
            fn expecting(&self, formatter: &mut Formatter) -> fmt::Result {
                formatter.write_str("struct DeltaTable")
            }
            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
            where
                A: SeqAccess<'de>,
            {
                // Per serde convention, `invalid_length` receives the number
                // of elements read so far — previously all three reported 0,
                // which produced misleading errors for truncated payloads.
                let state = seq
                    .next_element()?
                    .ok_or_else(|| A::Error::invalid_length(0, &self))?;
                let config = seq
                    .next_element()?
                    .ok_or_else(|| A::Error::invalid_length(1, &self))?;
                let storage_config: LogStoreConfig = seq
                    .next_element()?
                    .ok_or_else(|| A::Error::invalid_length(2, &self))?;
                let log_store = crate::logstore::logstore_for(
                    storage_config.location(),
                    storage_config.options().clone(),
                )
                .map_err(|_| A::Error::custom("Failed deserializing LogStore"))?;
                let table = DeltaTable {
                    state,
                    config,
                    log_store,
                };
                Ok(table)
            }
        }
        deserializer.deserialize_seq(DeltaTableVisitor {})
    }
}
impl DeltaTable {
    /// Create a new [`DeltaTable`] over the given log store, without loading
    /// any state. Call `load`/`update_incremental` before reading data.
    pub fn new(log_store: LogStoreRef, config: DeltaTableConfig) -> Self {
        Self {
            state: None,
            log_store,
            config,
        }
    }

    /// Create a table backed by an in-memory (`memory:///`) object store.
    pub fn new_in_memory() -> Self {
        let url = Url::parse("memory:///").unwrap();
        DeltaTableBuilder::from_url(url).unwrap().build().unwrap()
    }

    /// Construct a table around an already-loaded state, copying the load
    /// configuration out of the state itself.
    pub(crate) fn new_with_state(log_store: LogStoreRef, state: DeltaTableState) -> Self {
        let config = state.load_config().clone();
        Self {
            state: Some(state),
            log_store,
            config,
        }
    }

    /// Handle to the object store for this table (no per-operation options).
    pub fn object_store(&self) -> ObjectStoreRef {
        self.log_store.object_store(None)
    }

    /// Returns whether this location contains a Delta table (delegates to the
    /// log store's `_delta_log` detection).
    pub async fn verify_deltatable_existence(&self) -> DeltaResult<bool> {
        self.log_store.is_delta_table_location().await
    }

    /// Root URL of the table.
    pub fn table_url(&self) -> &Url {
        self.log_store.root_url()
    }

    /// A cloned handle to the underlying log store.
    pub fn log_store(&self) -> LogStoreRef {
        self.log_store.clone()
    }

    /// Latest version available in the Delta log, scanning forward from the
    /// currently loaded version (or 0 when no state is loaded).
    pub async fn get_latest_version(&self) -> Result<Version, DeltaTableError> {
        self.log_store
            .get_latest_version(self.version().unwrap_or(0))
            .await
    }

    /// Currently loaded table version, or `None` if no state is loaded.
    pub fn version(&self) -> Option<Version> {
        self.state.as_ref().map(|s| s.version())
    }

    /// Load (or refresh) table state up to the latest available version.
    pub async fn load(&mut self) -> Result<(), DeltaTableError> {
        self.update_incremental(None).await
    }

    /// Refresh state to the latest version; same as [`Self::load`].
    pub async fn update_state(&mut self) -> Result<(), DeltaTableError> {
        self.update_incremental(None).await
    }

    /// Incrementally update the table state up to `max_version`
    /// (or to the latest version when `None`).
    ///
    /// # Errors
    /// Returns [`DeltaTableError::VersionDowngrade`] when `max_version` is
    /// lower than the already-loaded version; use [`Self::load_version`] to
    /// travel backwards.
    pub async fn update_incremental(
        &mut self,
        max_version: Option<Version>,
    ) -> Result<(), DeltaTableError> {
        // First load: build fresh state directly from the log.
        let Some(state) = self.state.as_mut() else {
            self.state = Some(
                DeltaTableState::try_new(&self.log_store, self.config.clone(), max_version).await?,
            );
            return Ok(());
        };
        let current_version = state.version();
        // Incremental update can only move forward in versions.
        if let Some(requested_version) = max_version
            && requested_version < current_version
        {
            return Err(DeltaTableError::VersionDowngrade {
                current_version,
                requested_version,
            });
        }
        state.update(&self.log_store, max_version).await?;
        Ok(())
    }

    /// Time-travel: load the table state at a specific version.
    ///
    /// If the requested version is older than the loaded one, cached state is
    /// dropped first so the log is replayed from scratch instead of hitting
    /// the downgrade check in `update_incremental`.
    pub async fn load_version(&mut self, version: Version) -> Result<(), DeltaTableError> {
        if let Some(snapshot) = &self.state
            && snapshot.version() > version
        {
            self.state = None;
        }
        self.update_incremental(Some(version)).await
    }

    /// Timestamp (milliseconds since epoch) of the commit for `version`.
    ///
    /// Served from cached state when available; otherwise falls back to the
    /// commit file's `last_modified` object-store metadata.
    pub(crate) async fn get_version_timestamp(
        &self,
        version: Version,
    ) -> Result<i64, DeltaTableError> {
        match self
            .state
            .as_ref()
            .and_then(|s| s.version_timestamp(version))
        {
            Some(ts) => Ok(ts),
            None => {
                let meta = self
                    .object_store()
                    .head(&commit_uri_from_version(Some(version)))
                    .await?;
                let ts = meta.last_modified.timestamp_millis();
                Ok(ts)
            }
        }
    }

    /// Commit history of the table, optionally limited to `limit` entries.
    ///
    /// Requires loaded state. Ordering is whatever `commit_infos` yields —
    /// presumably most-recent-first; confirm against the kernel implementation.
    pub async fn history(
        &self,
        limit: Option<usize>,
    ) -> Result<impl Iterator<Item = CommitInfo> + use<>, DeltaTableError> {
        // Unwrap the nested snapshot layers (table state -> eager snapshot ->
        // kernel snapshot) to reach `commit_infos`.
        let infos = self
            .snapshot()?
            .snapshot()
            .snapshot()
            .commit_infos(&self.log_store(), limit)
            .await?
            .try_collect::<Vec<_>>()
            .await?;
        Ok(infos.into_iter().flatten())
    }

    /// Test helper: the single most recent commit info.
    #[cfg(test)]
    pub(crate) async fn last_commit(&self) -> Result<CommitInfo, DeltaTableError> {
        let mut infos: Vec<_> = self.history(Some(1)).await?.collect();
        infos.pop().ok_or(DeltaTableError::Generic(
            "Somehow there is nothing in the history!".into(),
        ))
    }

    /// Stream the active file views matching the given partition filters.
    ///
    /// With no filters, all active files are streamed. Errors (uninitialized
    /// table, predicate conversion failure) are yielded through the stream.
    pub fn get_active_add_actions_by_partitions(
        &self,
        filters: &[PartitionFilter],
    ) -> BoxStream<'_, DeltaResult<LogicalFileView>> {
        let Some(state) = self.state.as_ref() else {
            return Box::pin(futures::stream::once(async {
                Err(DeltaTableError::NotInitialized)
            }));
        };
        if filters.is_empty() {
            return state.snapshot().file_views(&self.log_store, None);
        }
        // Translate partition filters into a kernel predicate over the schema.
        let predicate =
            match crate::to_kernel_predicate(filters, state.snapshot().schema().as_ref()) {
                Ok(predicate) => Arc::new(predicate),
                Err(err) => return Box::pin(once(ready(Err(err)))),
            };
        state
            .snapshot()
            .file_views(&self.log_store, Some(predicate))
    }

    /// Object-store paths of the active files matching the partition filters.
    pub async fn get_files_by_partitions(
        &self,
        filters: &[PartitionFilter],
    ) -> Result<Vec<Path>, DeltaTableError> {
        Ok(self
            .get_active_add_actions_by_partitions(filters)
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .map(|add| add.object_store_path())
            .collect())
    }

    /// Fully-qualified URIs of the active files matching the partition filters.
    pub async fn get_file_uris_by_partitions(
        &self,
        filters: &[PartitionFilter],
    ) -> Result<Vec<String>, DeltaTableError> {
        let files = self.get_files_by_partitions(filters).await?;
        Ok(files
            .iter()
            .map(|fname| self.log_store.to_uri(fname))
            .collect())
    }

    /// Iterator over the URIs of all active files; errors with
    /// [`DeltaTableError::NotInitialized`] when no state is loaded.
    pub fn get_file_uris(&self) -> DeltaResult<impl Iterator<Item = String> + '_> {
        Ok(self
            .state
            .as_ref()
            .ok_or(DeltaTableError::NotInitialized)?
            .log_data()
            .into_iter()
            .map(|add| add.object_store_path())
            .map(|path| self.log_store.to_uri(&path)))
    }

    /// The currently loaded state, or [`DeltaTableError::NotInitialized`].
    pub fn snapshot(&self) -> DeltaResult<&DeltaTableState> {
        self.state.as_ref().ok_or(DeltaTableError::NotInitialized)
    }

    /// Time-travel: load the latest version whose commit timestamp is at or
    /// before `datetime`.
    ///
    /// Strategy: list the log to find the lowest commit version still present
    /// (older commits may have been vacuumed/checkpointed away), then binary
    /// search commit timestamps between that version and the latest one.
    pub async fn load_with_datetime(
        &mut self,
        datetime: DateTime<Utc>,
    ) -> Result<(), DeltaTableError> {
        // -1 is the "not found yet" sentinel for the minimum commit version.
        let mut min_version: i64 = -1;
        let log_store = self.log_store();
        let prefix = log_store.log_path();
        // Offset path so the listing starts at the commit-file name pattern.
        let offset_path = commit_uri_from_version(None);
        let object_store = log_store.object_store(None);
        let mut files = object_store.list_with_offset(Some(prefix), &offset_path);
        while let Some(obj_meta) = files.next().await {
            let obj_meta = obj_meta?;
            let location_path: Path = obj_meta.location.clone();
            // Skip entries nested deeper than directly inside the log prefix
            // (more than one remaining path part after stripping the prefix).
            let part_count = location_path.prefix_match(prefix).unwrap().count();
            if part_count > 1 {
                continue;
            }
            if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) {
                if min_version == -1 {
                    min_version = log_version as i64;
                } else {
                    min_version = min(min_version, log_version as i64);
                }
            }
            // Version 0 is the absolute minimum; no need to keep listing.
            if min_version == 0 {
                break;
            }
        }
        let latest_default_version = if min_version < 0 {
            0
        } else {
            min_version.try_into().unwrap()
        };
        let mut max_version = match self
            .log_store
            .get_latest_version(self.version().unwrap_or(latest_default_version))
            .await
        {
            Ok(version) => version,
            // No valid version at all => not a Delta table at this location.
            Err(DeltaTableError::InvalidVersion(_)) => {
                return Err(DeltaTableError::NotATable(
                    log_store.table_root_url().to_string(),
                ));
            }
            Err(e) => return Err(e),
        } as i64;
        let mut version = min_version;
        let lowest_table_version = min_version;
        let target_ts = datetime.timestamp_millis();
        // Binary search over commit timestamps (assumed monotonically
        // non-decreasing across versions) for the latest version whose
        // timestamp does not exceed `target_ts`.
        while min_version <= max_version {
            let pivot = (max_version + min_version) / 2;
            version = pivot;
            let pts: i64 = self
                .get_version_timestamp(pivot.try_into().unwrap())
                .await?;
            match pts.cmp(&target_ts) {
                Ordering::Equal => {
                    break;
                }
                Ordering::Less => {
                    min_version = pivot + 1;
                }
                Ordering::Greater => {
                    max_version = pivot - 1;
                    // Keep the candidate pointing at the last version known
                    // to be at-or-before the target.
                    version = max_version
                }
            }
        }
        // Never travel below the earliest version still present in the log.
        if version < lowest_table_version {
            version = lowest_table_version;
        }
        assert!(
            version >= 0,
            "load_with_datetime() came up with a negative version which shouldn't be possible"
        );
        self.load_version(version.try_into().unwrap()).await
    }
}
impl fmt::Display for DeltaTable {
    /// Renders the table URL followed by the optional loaded version, e.g.
    /// `DeltaTable(<url>)\n\tversion: Some(3)\n`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "DeltaTable({})\n\tversion: {:?}\n",
            self.table_url(),
            self.version()
        )
    }
}
impl std::fmt::Debug for DeltaTable {
    /// Compact debug representation: `DeltaTable <url>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
        f.write_fmt(format_args!("DeltaTable <{}>", self.table_url()))
    }
}
/// Normalize a table URL's path: drop empty (redundant-slash) segments and
/// guarantee a single trailing slash.
pub fn normalize_table_url(url: &Url) -> Url {
    // Keep only non-empty path segments, then append an empty segment so the
    // re-joined path ends with '/'.
    let mut segments: Vec<&str> = url
        .path()
        .split('/')
        .filter(|segment| !segment.is_empty())
        .collect();
    segments.push("");
    let mut normalized = url.clone();
    normalized.set_path(&segments.join("/"));
    normalized
}
#[cfg(test)]
mod tests {
    use arrow_ipc::writer::FileWriter;
    use pretty_assertions::assert_eq;
    use serde_json::json;
    use tempfile::TempDir;
    use super::*;
    use crate::kernel::{DataType, PrimitiveType, StructField};
    use crate::operations::create::CreateBuilder;

    /// Build the serde payload shape an older release used for an eager
    /// snapshot: the snapshot's serialized sequence with its last field
    /// dropped, paired with the Arrow-IPC-encoded bytes of the materialized
    /// file batches.
    fn legacy_eager_snapshot_payload(snapshot: &crate::kernel::EagerSnapshot) -> serde_json::Value {
        let mut snapshot_value = serde_json::to_value(snapshot.snapshot()).unwrap();
        let snapshot_fields = snapshot_value
            .as_array_mut()
            .expect("snapshot serde should use a sequence");
        // The legacy format did not carry the trailing field of the modern
        // snapshot sequence.
        snapshot_fields.pop();
        let materialized_files = snapshot
            .snapshot()
            .materialized_files()
            .expect("expected materialized files for legacy eager snapshot payload");
        let bytes = if materialized_files.batches.is_empty() {
            Vec::new()
        } else {
            // Encode all record batches into a single Arrow IPC file buffer.
            let mut buffer = vec![];
            let mut writer =
                FileWriter::try_new(&mut buffer, materialized_files.batches[0].schema().as_ref())
                    .unwrap();
            for batch in materialized_files.batches.iter() {
                writer.write(batch).unwrap();
            }
            writer.finish().unwrap();
            drop(writer);
            buffer
        };
        json!([snapshot_value, bytes])
    }

    /// `normalize_table_url` should collapse redundant slashes, keep
    /// percent-encoding intact, and always yield a trailing slash.
    #[test]
    fn test_normalize_table_url() {
        for (u, path) in [
            (Url::parse("s3://bucket/prefix/").unwrap(), "/prefix/"),
            (Url::parse("s3://bucket/prefix").unwrap(), "/prefix/"),
            (
                Url::parse("s3://bucket/prefix with space/").unwrap(),
                "/prefix%20with%20space/",
            ),
            (
                Url::parse("s3://bucket/special&chars/你好/😊").unwrap(),
                "/special&chars/%E4%BD%A0%E5%A5%BD/%F0%9F%98%8A/",
            ),
            (
                Url::parse("s3://bucket/prefix/with/redundant/slashes//").unwrap(),
                "/prefix/with/redundant/slashes/",
            ),
        ] {
            assert_eq!(
                normalize_table_url(&u).path(),
                path,
                "Failed to normalize: {}",
                u.as_str()
            );
        }
    }

    /// Serializing and deserializing a table should preserve its version.
    #[tokio::test]
    async fn table_round_trip() {
        let (dt, tmp_dir) = create_test_table().await;
        let bytes = serde_json::to_vec(&dt).unwrap();
        let actual: DeltaTable = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(actual.version(), dt.version());
        // Keep the temp dir alive until after deserialization completes.
        drop(tmp_dir);
    }

    /// Deserializing the legacy eager-snapshot payload shape should still
    /// produce a table with the same file count.
    #[tokio::test]
    async fn table_round_trip_preserves_legacy_eager_snapshot_payload() {
        let (dt, tmp_dir) = create_test_table().await;
        let mut value = serde_json::to_value(&dt).unwrap();
        // Rewrite the serialized state's "snapshot" field into the legacy
        // payload shape before deserializing.
        let table_fields = value.as_array_mut().unwrap();
        let state = table_fields[0].as_object_mut().unwrap();
        state.insert(
            "snapshot".to_string(),
            legacy_eager_snapshot_payload(dt.state.as_ref().unwrap().snapshot()),
        );
        let actual: DeltaTable = serde_json::from_value(value).unwrap();
        assert_eq!(
            actual.snapshot().unwrap().log_data().num_files(),
            dt.snapshot().unwrap().log_data().num_files()
        );
        drop(tmp_dir);
    }

    /// A table loaded with `without_files()` must not panic when its log data
    /// is queried.
    #[tokio::test]
    async fn table_without_files_does_not_panic_on_log_data() {
        let (dt, _tmp_dir) = create_test_table().await;
        let url = dt.table_url().clone();
        let table = DeltaTableBuilder::from_url(url)
            .unwrap()
            .without_files()
            .load()
            .await
            .unwrap();
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            table.snapshot().unwrap().log_data().num_files()
        }));
        assert!(result.is_ok());
    }

    /// Create a minimal two-column table in a temp directory; the caller must
    /// keep the returned `TempDir` alive for the table to remain readable.
    async fn create_test_table() -> (DeltaTable, TempDir) {
        let tmp_dir = tempfile::tempdir().unwrap();
        let table_dir = tmp_dir.path().join("test_create");
        std::fs::create_dir(&table_dir).unwrap();
        let dt = CreateBuilder::new()
            .with_location(table_dir.to_str().unwrap())
            .with_table_name("Test Table Create")
            .with_comment("This table is made to test the create function for a DeltaTable")
            .with_columns(vec![
                StructField::new(
                    "Id".to_string(),
                    DataType::Primitive(PrimitiveType::Integer),
                    true,
                ),
                StructField::new(
                    "Name".to_string(),
                    DataType::Primitive(PrimitiveType::String),
                    true,
                ),
            ])
            .await
            .unwrap();
        (dt, tmp_dir)
    }
}