use std::cmp::max;
use std::collections::HashSet;
use std::ops::BitXor;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use futures::future::BoxFuture;
use object_store::path::Path;
use object_store::{ObjectStore, ObjectStoreExt as _};
use serde::Serialize;
use uuid::Uuid;
use super::{CustomExecuteHandler, Operation};
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
use crate::kernel::{
Action, Add, EagerSnapshot, ProtocolExt as _, ProtocolInner, Remove, Version, resolve_snapshot,
};
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError, ObjectStoreError};
/// Errors raised by the restore operation itself.
///
/// They are surfaced to callers as `DeltaTableError::GenericError` through the
/// `From<RestoreError>` conversion defined in this file.
#[derive(thiserror::Error, Debug)]
enum RestoreError {
/// Returned when the caller supplied both a version and a datetime, or neither;
/// exactly one restore target is required.
#[error("Either the version or datetime should be provided for restore")]
InvalidRestoreParameter,
/// Returned when the resolved restore version (first field) is not strictly
/// older than the current table version (second field).
#[error("Version to restore {0} should be less then last available version {1}.")]
TooLargeRestoreVersion(Version, Version),
/// Returned when a data file that must be re-added is reported as not found by
/// the object store; carries the file path.
#[error("Find missing file {0} when restore.")]
MissingDataFile(String),
}
impl From<RestoreError> for DeltaTableError {
fn from(err: RestoreError) -> Self {
DeltaTableError::GenericError {
source: Box::new(err),
}
}
}
/// Counters describing what a restore commit changed.
///
/// Serialized with camelCase keys; `execute` stores the serialized value under
/// the `operationMetrics` key of the commit's app metadata.
#[derive(Default, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RestoreMetrics {
/// Number of files present in the latest state but absent from the restored
/// state, i.e. the number of remove actions committed.
pub num_removed_file: usize,
/// Number of files present in the restored state but absent from the latest
/// state, i.e. the number of add actions committed.
pub num_restored_file: usize,
}
/// Builder for restoring a table to an earlier version.
///
/// Exactly one of `version_to_restore` / `datetime_to_restore` must be set
/// before the builder is awaited; otherwise the operation fails with
/// `RestoreError::InvalidRestoreParameter`.
pub struct RestoreBuilder {
/// Optional pre-loaded snapshot; passed to `resolve_snapshot` when the builder is awaited.
snapshot: Option<EagerSnapshot>,
/// Log store of the table being restored.
log_store: LogStoreRef,
/// Target version to restore to (mutually exclusive with `datetime_to_restore`).
version_to_restore: Option<Version>,
/// Target point in time to restore to (mutually exclusive with `version_to_restore`).
datetime_to_restore: Option<DateTime<Utc>>,
/// When `true`, skip the check that every file to be re-added still exists in the object store.
ignore_missing_files: bool,
/// When `true`, the committed protocol takes its reader/writer versions from the
/// restored snapshot instead of the maximum of restored and current versions.
protocol_downgrade_allowed: bool,
/// Extra properties attached to the restore commit.
commit_properties: CommitProperties,
/// Optional handler invoked around execution and handed to the commit as post-commit hook handler.
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}
impl super::Operation for RestoreBuilder {
fn log_store(&self) -> &LogStoreRef {
&self.log_store
}
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
self.custom_execute_handler.clone()
}
}
impl RestoreBuilder {
    /// Creates a restore builder with no restore target selected, missing-file
    /// checking enabled, protocol downgrade disallowed, default commit
    /// properties and no custom handler.
    pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
        Self {
            log_store,
            snapshot,
            commit_properties: CommitProperties::default(),
            custom_execute_handler: None,
            datetime_to_restore: None,
            version_to_restore: None,
            protocol_downgrade_allowed: false,
            ignore_missing_files: false,
        }
    }

    /// Selects the table version to restore to.
    pub fn with_version_to_restore(self, version: Version) -> Self {
        Self {
            version_to_restore: Some(version),
            ..self
        }
    }

    /// Selects the point in time to restore to.
    pub fn with_datetime_to_restore(self, datetime: DateTime<Utc>) -> Self {
        Self {
            datetime_to_restore: Some(datetime),
            ..self
        }
    }

    /// Sets whether the existence check for files that must be re-added is skipped.
    pub fn with_ignore_missing_files(self, ignore_missing_files: bool) -> Self {
        Self {
            ignore_missing_files,
            ..self
        }
    }

    /// Sets whether the restore commit may carry the restored snapshot's
    /// (possibly lower) protocol versions.
    pub fn with_protocol_downgrade_allowed(self, protocol_downgrade_allowed: bool) -> Self {
        Self {
            protocol_downgrade_allowed,
            ..self
        }
    }

    /// Replaces the properties attached to the restore commit.
    pub fn with_commit_properties(self, commit_properties: CommitProperties) -> Self {
        Self {
            commit_properties,
            ..self
        }
    }

    /// Installs a custom execute handler for this operation.
    pub fn with_custom_execute_handler(self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
        Self {
            custom_execute_handler: Some(handler),
            ..self
        }
    }
}
/// Restores the table to an earlier state by committing the difference between
/// the latest snapshot and the requested one.
///
/// Steps:
/// 1. Validate that exactly one of `version_to_restore` / `datetime_to_restore` is set.
/// 2. Load the table at the requested target and require it to be strictly
///    older than `snapshot`.
/// 3. Diff the file sets by path: files only in the restored state are re-added
///    (with `data_change = true`), files only in the latest state are removed.
/// 4. Unless `ignore_missing_files` is set, verify the files to re-add still exist.
/// 5. Commit protocol, add, remove and metadata actions in a single commit.
///
/// Returns the restore metrics together with the table state after the commit.
///
/// # Errors
/// * `RestoreError::InvalidRestoreParameter` when both or neither target is given.
/// * `RestoreError::TooLargeRestoreVersion` when the resolved version is not
///   older than the current one.
/// * `RestoreError::MissingDataFile` when a file to re-add is missing.
/// * `DeltaTableError::GenericError` when the system clock reports a time
///   before the Unix epoch.
/// * Any error raised while loading the table, listing files, serializing the
///   metrics, or committing.
// The argument list mirrors the builder's fields one-to-one.
#[allow(clippy::too_many_arguments)]
async fn execute(
    log_store: LogStoreRef,
    snapshot: EagerSnapshot,
    version_to_restore: Option<Version>,
    datetime_to_restore: Option<DateTime<Utc>>,
    ignore_missing_files: bool,
    protocol_downgrade_allowed: bool,
    mut commit_properties: CommitProperties,
    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
    operation_id: Uuid,
) -> DeltaResult<(RestoreMetrics, DeltaTableState)> {
    // Exactly one of the two restore targets must be provided.
    if !(version_to_restore
        .is_none()
        .bitxor(datetime_to_restore.is_none()))
    {
        return Err(DeltaTableError::from(RestoreError::InvalidRestoreParameter));
    }

    // Load the table at the requested point in its history.
    let mut table = DeltaTable::new(log_store.clone(), DeltaTableConfig::default());
    match (datetime_to_restore, version_to_restore) {
        (Some(datetime), _) => {
            table.load_with_datetime(datetime).await?;
        }
        (None, Some(target_version)) => {
            table.load_version(target_version).await?;
        }
        // Already rejected by the guard above; returning the same error keeps
        // this path panic-free should the guard ever change.
        (None, None) => {
            return Err(DeltaTableError::from(RestoreError::InvalidRestoreParameter));
        }
    }
    let version = table.version().ok_or(DeltaTableError::NotInitialized)?;

    // Only strictly older versions can be restored.
    if version >= snapshot.version() {
        return Err(DeltaTableError::from(RestoreError::TooLargeRestoreVersion(
            version,
            snapshot.version(),
        )));
    }

    let snapshot_restored = table.snapshot()?;
    let metadata_restored_version = snapshot_restored.metadata();

    // Collect the file listings of both states and index them by path.
    let state_to_restore_files: Vec<_> = snapshot_restored
        .snapshot()
        .file_views(&log_store, None)
        .try_collect()
        .await?;
    let latest_state_files: Vec<_> = snapshot.file_views(&log_store, None).try_collect().await?;
    let state_to_restore_files_set =
        HashSet::<_>::from_iter(state_to_restore_files.iter().map(|f| f.path().to_string()));
    let latest_state_files_set =
        HashSet::<_>::from_iter(latest_state_files.iter().map(|f| f.path().to_string()));

    // Files that exist in the restored state but not in the latest one are re-added.
    let files_to_add: Vec<Add> = state_to_restore_files
        .iter()
        .filter(|a| !latest_state_files_set.contains(&a.path().to_string()))
        .map(|f| {
            let mut a = f.to_add();
            a.data_change = true;
            a
        })
        .collect();

    // A clock set before the Unix epoch is an environment problem, not a bug in
    // this code, so report it as an error instead of panicking.
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|err| DeltaTableError::GenericError {
            source: Box::new(err),
        })?;
    // Milliseconds since the epoch fit into an i64 for any realistic clock value.
    let deletion_timestamp = since_epoch.as_millis() as i64;

    // Files that exist in the latest state but not in the restored one are removed.
    let files_to_remove: Vec<Remove> = latest_state_files
        .iter()
        .filter(|f| !state_to_restore_files_set.contains(&f.path().to_string()))
        .map(|f| {
            let mut rm = f.remove_action(true);
            rm.deletion_timestamp = Some(deletion_timestamp);
            rm
        })
        .collect();

    if !ignore_missing_files {
        check_files_available(log_store.object_store(None).as_ref(), &files_to_add).await?;
    }

    let metrics = RestoreMetrics {
        num_removed_file: files_to_remove.len(),
        num_restored_file: files_to_add.len(),
    };
    let mut actions = vec![];

    // With downgrade allowed the restored snapshot's protocol versions are used;
    // otherwise the higher of the restored and current versions is kept together
    // with the current feature sets.
    let protocol = if protocol_downgrade_allowed {
        ProtocolInner {
            min_reader_version: snapshot_restored.protocol().min_reader_version(),
            min_writer_version: snapshot_restored.protocol().min_writer_version(),
            writer_features: if snapshot.protocol().min_writer_version() < 7 {
                None
            } else {
                snapshot_restored.protocol().writer_features_set()
            },
            reader_features: if snapshot.protocol().min_reader_version() < 3 {
                None
            } else {
                snapshot_restored.protocol().reader_features_set()
            },
        }
    } else {
        ProtocolInner {
            min_reader_version: max(
                snapshot_restored.protocol().min_reader_version(),
                snapshot.protocol().min_reader_version(),
            ),
            min_writer_version: max(
                snapshot_restored.protocol().min_writer_version(),
                snapshot.protocol().min_writer_version(),
            ),
            writer_features: snapshot.protocol().writer_features_set(),
            reader_features: snapshot.protocol().reader_features_set(),
        }
    };

    // Record the version the restore was computed against and its metrics in
    // the commit's app metadata.
    commit_properties
        .app_metadata
        .insert("readVersion".to_owned(), snapshot.version().into());
    commit_properties.app_metadata.insert(
        "operationMetrics".to_owned(),
        serde_json::to_value(&metrics)?,
    );

    actions.push(Action::Protocol(protocol.as_kernel()));
    actions.extend(files_to_add.into_iter().map(Action::Add));
    actions.extend(files_to_remove.into_iter().map(Action::Remove));
    actions.push(Action::Metadata(metadata_restored_version.clone()));

    let operation = DeltaOperation::Restore {
        version: version_to_restore,
        datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }),
    };

    // The commit is built with a retry budget of zero.
    let commit = CommitBuilder::from(commit_properties)
        .with_actions(actions)
        .with_max_retries(0)
        .with_operation_id(operation_id)
        .with_post_commit_hook_handler(custom_execute_handler)
        .build(Some(&snapshot), log_store.clone(), operation)
        .await?;
    Ok((metrics, commit.snapshot()))
}
/// Verifies that every file in `files` still exists in `object_store`.
///
/// Files are probed one after another with a `head` request; the first failure
/// aborts the check.
///
/// # Errors
/// * `RestoreError::MissingDataFile` (carrying the file path) when the store
///   reports the file as not found.
/// * The converted path-parsing or object-store error for any other failure.
async fn check_files_available(
    object_store: &dyn ObjectStore,
    files: &[Add],
) -> DeltaResult<()> {
    for file in files {
        // `Path::parse` only needs a string view, so the path is borrowed
        // rather than cloned.
        let file_path = Path::parse(&file.path)?;
        match object_store.head(&file_path).await {
            Ok(_) => {}
            Err(ObjectStoreError::NotFound { .. }) => {
                return Err(DeltaTableError::from(RestoreError::MissingDataFile(
                    file.path.clone(),
                )));
            }
            Err(e) => return Err(DeltaTableError::from(e)),
        }
    }
    Ok(())
}
/// Awaiting a `RestoreBuilder` runs the restore and yields the table at its
/// post-restore state together with the restore metrics.
impl std::future::IntoFuture for RestoreBuilder {
    type Output = DeltaResult<(DeltaTable, RestoreMetrics)>;
    type IntoFuture = BoxFuture<'static, Self::Output>;

    /// Resolves the snapshot, runs the pre-execute hook, executes the restore
    /// and finally runs the custom handler's post-execute hook, if any.
    fn into_future(self) -> Self::IntoFuture {
        let mut this = self;
        Box::pin(async move {
            // The builder is consumed here, so its snapshot can be moved out
            // instead of cloned.
            let snapshot =
                resolve_snapshot(&this.log_store, this.snapshot.take(), true, None).await?;
            let operation_id = this.get_operation_id();
            this.pre_execute(operation_id).await?;

            let handle = this.custom_execute_handler.take();
            // Likewise hand the commit properties over by value; the builder
            // does not read them again.
            let commit_properties = std::mem::take(&mut this.commit_properties);
            let (metrics, new_state) = execute(
                this.log_store.clone(),
                snapshot,
                this.version_to_restore,
                this.datetime_to_restore,
                this.ignore_missing_files,
                this.protocol_downgrade_allowed,
                commit_properties,
                handle.clone(),
                operation_id,
            )
            .await?;

            if let Some(handler) = handle {
                handler.post_execute(&this.log_store, operation_id).await?;
            }
            Ok((
                DeltaTable::new_with_state(this.log_store, new_state),
                metrics,
            ))
        })
    }
}
#[cfg(all(test, feature = "datafusion"))]
mod tests {
    use crate::DeltaResult;
    use crate::writer::test_utils::{create_bare_table, get_record_batch};

    /// Restoring to the version written before a constraint was added must
    /// produce a new table version whose configuration has no constraints.
    #[tokio::test]
    async fn test_simple_restore_constraints() -> DeltaResult<()> {
        use crate::table::config::TablePropertiesExt as _;

        // Version without any constraint.
        let batch = get_record_batch(None, false);
        let table = create_bare_table().write(vec![batch]).await?;
        let first_v = table.version().unwrap();

        // Add a constraint on top of it and confirm it is visible.
        let table = table
            .add_constraint()
            .with_constraint("my_custom_constraint", "value < 100")
            .await
            .expect("Failed to add constraint to table");
        let constraints = table
            .state
            .as_ref()
            .unwrap()
            .table_config()
            .get_constraints();
        assert_eq!(constraints.len(), 1);
        assert_eq!(constraints[0].name, "my_custom_constraint");

        // Restore to the first version: a new commit is created and the
        // constraint is gone.
        let (table, _metrics) = table.restore().with_version_to_restore(first_v).await?;
        assert_ne!(table.version(), Some(first_v));
        let constraints = table.state.unwrap().table_config().get_constraints();
        assert!(constraints.is_empty());
        Ok(())
    }
}