deltalake_core/operations/
restore.rs1use std::cmp::max;
24use std::collections::HashSet;
25use std::ops::BitXor;
26use std::sync::Arc;
27use std::time::{SystemTime, UNIX_EPOCH};
28
29use chrono::{DateTime, Utc};
30use futures::TryStreamExt;
31use futures::future::BoxFuture;
32use object_store::path::Path;
33use object_store::{ObjectStore, ObjectStoreExt as _};
34use serde::Serialize;
35use uuid::Uuid;
36
37use super::{CustomExecuteHandler, Operation};
38use crate::kernel::transaction::{CommitBuilder, CommitProperties};
39use crate::kernel::{
40 Action, Add, EagerSnapshot, ProtocolExt as _, ProtocolInner, Remove, Version, resolve_snapshot,
41};
42use crate::logstore::LogStoreRef;
43use crate::protocol::DeltaOperation;
44use crate::table::state::DeltaTableState;
45use crate::{DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError, ObjectStoreError};
46
47#[derive(thiserror::Error, Debug)]
49enum RestoreError {
50 #[error("Either the version or datetime should be provided for restore")]
51 InvalidRestoreParameter,
52
53 #[error("Version to restore {0} should be less then last available version {1}.")]
54 TooLargeRestoreVersion(Version, Version),
55
56 #[error("Find missing file {0} when restore.")]
57 MissingDataFile(String),
58}
59
60impl From<RestoreError> for DeltaTableError {
61 fn from(err: RestoreError) -> Self {
62 DeltaTableError::GenericError {
63 source: Box::new(err),
64 }
65 }
66}
67
68#[derive(Default, Debug, Serialize)]
70#[serde(rename_all = "camelCase")]
71pub struct RestoreMetrics {
72 pub num_removed_file: usize,
74 pub num_restored_file: usize,
76}
77
78pub struct RestoreBuilder {
81 snapshot: Option<EagerSnapshot>,
83 log_store: LogStoreRef,
85 version_to_restore: Option<Version>,
87 datetime_to_restore: Option<DateTime<Utc>>,
89 ignore_missing_files: bool,
91 protocol_downgrade_allowed: bool,
93 commit_properties: CommitProperties,
95 custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
96}
97
98impl super::Operation for RestoreBuilder {
99 fn log_store(&self) -> &LogStoreRef {
100 &self.log_store
101 }
102 fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
103 self.custom_execute_handler.clone()
104 }
105}
106
107impl RestoreBuilder {
108 pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
110 Self {
111 snapshot,
112 log_store,
113 version_to_restore: None,
114 datetime_to_restore: None,
115 ignore_missing_files: false,
116 protocol_downgrade_allowed: false,
117 commit_properties: CommitProperties::default(),
118 custom_execute_handler: None,
119 }
120 }
121
122 pub fn with_version_to_restore(mut self, version: Version) -> Self {
124 self.version_to_restore = Some(version);
125 self
126 }
127
128 pub fn with_datetime_to_restore(mut self, datetime: DateTime<Utc>) -> Self {
130 self.datetime_to_restore = Some(datetime);
131 self
132 }
133
134 pub fn with_ignore_missing_files(mut self, ignore_missing_files: bool) -> Self {
137 self.ignore_missing_files = ignore_missing_files;
138 self
139 }
140
141 pub fn with_protocol_downgrade_allowed(mut self, protocol_downgrade_allowed: bool) -> Self {
143 self.protocol_downgrade_allowed = protocol_downgrade_allowed;
144 self
145 }
146
147 pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
149 self.commit_properties = commit_properties;
150 self
151 }
152
153 pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
155 self.custom_execute_handler = Some(handler);
156 self
157 }
158}
159
160#[allow(clippy::too_many_arguments)]
161async fn execute(
162 log_store: LogStoreRef,
163 snapshot: EagerSnapshot,
164 version_to_restore: Option<Version>,
165 datetime_to_restore: Option<DateTime<Utc>>,
166 ignore_missing_files: bool,
167 protocol_downgrade_allowed: bool,
168 mut commit_properties: CommitProperties,
169 custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
170 operation_id: Uuid,
171) -> DeltaResult<(RestoreMetrics, DeltaTableState)> {
172 if !(version_to_restore
173 .is_none()
174 .bitxor(datetime_to_restore.is_none()))
175 {
176 return Err(DeltaTableError::from(RestoreError::InvalidRestoreParameter));
177 }
178 let mut table = DeltaTable::new(log_store.clone(), DeltaTableConfig::default());
179
180 let version = match datetime_to_restore {
181 Some(datetime) => {
182 table.load_with_datetime(datetime).await?;
183 table
184 .version()
185 .ok_or_else(|| DeltaTableError::NotInitialized)?
186 }
187 None => {
188 table.load_version(version_to_restore.unwrap()).await?;
189 table
190 .version()
191 .ok_or_else(|| DeltaTableError::NotInitialized)?
192 }
193 };
194
195 if version >= snapshot.version() {
196 return Err(DeltaTableError::from(RestoreError::TooLargeRestoreVersion(
197 version,
198 snapshot.version(),
199 )));
200 }
201
202 let snapshot_restored = table.snapshot()?;
203 let metadata_restored_version = snapshot_restored.metadata();
204
205 let state_to_restore_files: Vec<_> = snapshot_restored
206 .snapshot()
207 .file_views(&log_store, None)
208 .try_collect()
209 .await?;
210 let latest_state_files: Vec<_> = snapshot.file_views(&log_store, None).try_collect().await?;
211 let state_to_restore_files_set =
212 HashSet::<_>::from_iter(state_to_restore_files.iter().map(|f| f.path().to_string()));
213 let latest_state_files_set =
214 HashSet::<_>::from_iter(latest_state_files.iter().map(|f| f.path().to_string()));
215
216 let files_to_add: Vec<Add> = state_to_restore_files
217 .iter()
218 .filter(|a| !latest_state_files_set.contains(&a.path().to_string()))
219 .map(|f| {
220 let mut a = f.to_add();
221 a.data_change = true;
222 a
223 })
224 .collect();
225
226 let deletion_timestamp = SystemTime::now()
227 .duration_since(UNIX_EPOCH)
228 .unwrap()
229 .as_millis() as i64;
230 let files_to_remove: Vec<Remove> = latest_state_files
231 .iter()
232 .filter(|f| !state_to_restore_files_set.contains(&f.path().to_string()))
233 .map(|f| {
234 let mut rm = f.remove_action(true);
235 rm.deletion_timestamp = Some(deletion_timestamp);
236 rm
237 })
238 .collect();
239
240 if !ignore_missing_files {
241 check_files_available(log_store.object_store(None).as_ref(), &files_to_add).await?;
242 }
243
244 let metrics = RestoreMetrics {
245 num_removed_file: files_to_remove.len(),
246 num_restored_file: files_to_add.len(),
247 };
248
249 let mut actions = vec![];
250 let protocol = if protocol_downgrade_allowed {
251 ProtocolInner {
252 min_reader_version: snapshot_restored.protocol().min_reader_version(),
253 min_writer_version: snapshot_restored.protocol().min_writer_version(),
254 writer_features: if snapshot.protocol().min_writer_version() < 7 {
255 None
256 } else {
257 snapshot_restored.protocol().writer_features_set()
258 },
259 reader_features: if snapshot.protocol().min_reader_version() < 3 {
260 None
261 } else {
262 snapshot_restored.protocol().reader_features_set()
263 },
264 }
265 } else {
266 ProtocolInner {
267 min_reader_version: max(
268 snapshot_restored.protocol().min_reader_version(),
269 snapshot.protocol().min_reader_version(),
270 ),
271 min_writer_version: max(
272 snapshot_restored.protocol().min_writer_version(),
273 snapshot.protocol().min_writer_version(),
274 ),
275 writer_features: snapshot.protocol().writer_features_set(),
276 reader_features: snapshot.protocol().reader_features_set(),
277 }
278 };
279 commit_properties
280 .app_metadata
281 .insert("readVersion".to_owned(), snapshot.version().into());
282 commit_properties.app_metadata.insert(
283 "operationMetrics".to_owned(),
284 serde_json::to_value(&metrics)?,
285 );
286
287 actions.push(Action::Protocol(protocol.as_kernel()));
288 actions.extend(files_to_add.into_iter().map(Action::Add));
289 actions.extend(files_to_remove.into_iter().map(Action::Remove));
290 actions.push(Action::Metadata(metadata_restored_version.clone()));
292
293 let operation = DeltaOperation::Restore {
294 version: version_to_restore,
295 datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }),
296 };
297
298 let commit = CommitBuilder::from(commit_properties)
299 .with_actions(actions)
300 .with_max_retries(0)
301 .with_operation_id(operation_id)
302 .with_post_commit_hook_handler(custom_execute_handler)
303 .build(Some(&snapshot), log_store.clone(), operation)
304 .await?;
305
306 Ok((metrics, commit.snapshot()))
307}
308
309async fn check_files_available(
310 object_store: &dyn ObjectStore,
311 files: &Vec<Add>,
312) -> DeltaResult<()> {
313 for file in files {
314 let file_path = Path::parse(file.path.clone())?;
315 match object_store.head(&file_path).await {
316 Ok(_) => {}
317 Err(ObjectStoreError::NotFound { .. }) => {
318 return Err(DeltaTableError::from(RestoreError::MissingDataFile(
319 file.path.clone(),
320 )));
321 }
322 Err(e) => return Err(DeltaTableError::from(e)),
323 }
324 }
325 Ok(())
326}
327
328impl std::future::IntoFuture for RestoreBuilder {
329 type Output = DeltaResult<(DeltaTable, RestoreMetrics)>;
330 type IntoFuture = BoxFuture<'static, Self::Output>;
331
332 fn into_future(self) -> Self::IntoFuture {
333 let mut this = self;
334
335 Box::pin(async move {
336 let snapshot =
337 resolve_snapshot(&this.log_store, this.snapshot.clone(), true, None).await?;
338
339 let operation_id = this.get_operation_id();
340 this.pre_execute(operation_id).await?;
341
342 let handle = this.custom_execute_handler.take();
343 let (metrics, new_state) = execute(
344 this.log_store.clone(),
345 snapshot,
346 this.version_to_restore,
347 this.datetime_to_restore,
348 this.ignore_missing_files,
349 this.protocol_downgrade_allowed,
350 this.commit_properties.clone(),
351 handle.clone(),
352 operation_id,
353 )
354 .await?;
355
356 if let Some(handler) = handle {
357 handler.post_execute(&this.log_store, operation_id).await?;
358 }
359
360 Ok((
361 DeltaTable::new_with_state(this.log_store, new_state),
362 metrics,
363 ))
364 })
365 }
366}
367
#[cfg(test)]
#[cfg(feature = "datafusion")]
mod tests {
    use crate::DeltaResult;
    use crate::writer::test_utils::{create_bare_table, get_record_batch};

    /// Restoring to the version before a constraint was added must drop that constraint.
    #[tokio::test]
    async fn test_simple_restore_constraints() -> DeltaResult<()> {
        use crate::table::config::TablePropertiesExt as _;

        let table = create_bare_table()
            .write(vec![get_record_batch(None, false)])
            .await?;
        let first_version = table.version().unwrap();

        let table = table
            .add_constraint()
            .with_constraint("my_custom_constraint", "value < 100")
            .await
            .expect("Failed to add constraint to table");

        let before_restore = table
            .state
            .as_ref()
            .unwrap()
            .table_config()
            .get_constraints();
        assert_eq!(before_restore.len(), 1);
        assert_eq!(before_restore[0].name, "my_custom_constraint");

        let (table, _metrics) = table
            .restore()
            .with_version_to_restore(first_version)
            .await?;
        assert_ne!(table.version(), Some(first_version));

        let after_restore = table.state.unwrap().table_config().get_constraints();
        assert!(after_restore.is_empty());

        Ok(())
    }
}