Skip to main content

deltalake_core/operations/
restore.rs

1//! Perform restore of delta table to a specified version or datetime
2//!
3//! Algorithm:
//! 1) Read the latest state snapshot of the table.
//! 2) Read the table state at the version or datetime to restore.
//! 3) Compute files present in the state to restore but missing from the latest snapshot
//!    (i.e. files removed by a later commit). Add these files to the commit as `Add` actions.
//! 4) Compute files present in the latest snapshot but missing from the state to restore
//!    (i.e. files added after the version to restore). Add these files to the commit as
//!    `Remove` actions.
//! 5) If the `ignore_missing_files` option is `false` (the default), check that every file
//!    to be re-added still exists in the object store.
//! 6) Commit the `Protocol` action, all `Add` and `Remove` actions, and the `Metadata` of the
//!    restored version into the delta log via `CommitBuilder`, with commit retries disabled.
//! 7) Because the commit is not retried, if the table was modified concurrently the restore is
//!    abandoned and an error is returned.
16//!
17//! # Example
18//! ```rust ignore
19//! let table = open_table(Url::from_directory_path("/abs/path/to/table").unwrap())?;
//! let (table, metrics) = table.restore().with_version_to_restore(1).await?;
//! ```
22
23use std::cmp::max;
24use std::collections::HashSet;
25use std::ops::BitXor;
26use std::sync::Arc;
27use std::time::{SystemTime, UNIX_EPOCH};
28
29use chrono::{DateTime, Utc};
30use futures::TryStreamExt;
31use futures::future::BoxFuture;
32use object_store::path::Path;
33use object_store::{ObjectStore, ObjectStoreExt as _};
34use serde::Serialize;
35use uuid::Uuid;
36
37use super::{CustomExecuteHandler, Operation};
38use crate::kernel::transaction::{CommitBuilder, CommitProperties};
39use crate::kernel::{
40    Action, Add, EagerSnapshot, ProtocolExt as _, ProtocolInner, Remove, Version, resolve_snapshot,
41};
42use crate::logstore::LogStoreRef;
43use crate::protocol::DeltaOperation;
44use crate::table::state::DeltaTableState;
45use crate::{DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError, ObjectStoreError};
46
47/// Errors that can occur during restore
48#[derive(thiserror::Error, Debug)]
49enum RestoreError {
50    #[error("Either the version or datetime should be provided for restore")]
51    InvalidRestoreParameter,
52
53    #[error("Version to restore {0} should be less then last available version {1}.")]
54    TooLargeRestoreVersion(Version, Version),
55
56    #[error("Find missing file {0} when restore.")]
57    MissingDataFile(String),
58}
59
60impl From<RestoreError> for DeltaTableError {
61    fn from(err: RestoreError) -> Self {
62        DeltaTableError::GenericError {
63            source: Box::new(err),
64        }
65    }
66}
67
68/// Metrics from Restore
69#[derive(Default, Debug, Serialize)]
70#[serde(rename_all = "camelCase")]
71pub struct RestoreMetrics {
72    /// Number of files removed
73    pub num_removed_file: usize,
74    /// Number of files restored
75    pub num_restored_file: usize,
76}
77
78/// Restore a Delta table with given version
79/// See this module's documentation for more information
80pub struct RestoreBuilder {
81    /// A snapshot of the to-be-restored table's state
82    snapshot: Option<EagerSnapshot>,
83    /// Delta object store for handling data files
84    log_store: LogStoreRef,
85    /// Version to restore
86    version_to_restore: Option<Version>,
87    /// Datetime to restore
88    datetime_to_restore: Option<DateTime<Utc>>,
89    /// Ignore missing files
90    ignore_missing_files: bool,
91    /// Protocol downgrade allowed
92    protocol_downgrade_allowed: bool,
93    /// Additional information to add to the commit
94    commit_properties: CommitProperties,
95    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
96}
97
98impl super::Operation for RestoreBuilder {
99    fn log_store(&self) -> &LogStoreRef {
100        &self.log_store
101    }
102    fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
103        self.custom_execute_handler.clone()
104    }
105}
106
107impl RestoreBuilder {
108    /// Create a new [`RestoreBuilder`]
109    pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
110        Self {
111            snapshot,
112            log_store,
113            version_to_restore: None,
114            datetime_to_restore: None,
115            ignore_missing_files: false,
116            protocol_downgrade_allowed: false,
117            commit_properties: CommitProperties::default(),
118            custom_execute_handler: None,
119        }
120    }
121
122    /// Set the version to restore
123    pub fn with_version_to_restore(mut self, version: Version) -> Self {
124        self.version_to_restore = Some(version);
125        self
126    }
127
128    /// Set the datetime to restore
129    pub fn with_datetime_to_restore(mut self, datetime: DateTime<Utc>) -> Self {
130        self.datetime_to_restore = Some(datetime);
131        self
132    }
133
134    /// Set whether to ignore missing files which delete manually or by vacuum.
135    /// If true, continue to run when encountering missing files.
136    pub fn with_ignore_missing_files(mut self, ignore_missing_files: bool) -> Self {
137        self.ignore_missing_files = ignore_missing_files;
138        self
139    }
140
141    /// Set whether allow to downgrade protocol
142    pub fn with_protocol_downgrade_allowed(mut self, protocol_downgrade_allowed: bool) -> Self {
143        self.protocol_downgrade_allowed = protocol_downgrade_allowed;
144        self
145    }
146
147    /// Additional metadata to be added to commit info
148    pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
149        self.commit_properties = commit_properties;
150        self
151    }
152
153    /// Set a custom execute handler, for pre and post execution
154    pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
155        self.custom_execute_handler = Some(handler);
156        self
157    }
158}
159
160#[allow(clippy::too_many_arguments)]
161async fn execute(
162    log_store: LogStoreRef,
163    snapshot: EagerSnapshot,
164    version_to_restore: Option<Version>,
165    datetime_to_restore: Option<DateTime<Utc>>,
166    ignore_missing_files: bool,
167    protocol_downgrade_allowed: bool,
168    mut commit_properties: CommitProperties,
169    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
170    operation_id: Uuid,
171) -> DeltaResult<(RestoreMetrics, DeltaTableState)> {
172    if !(version_to_restore
173        .is_none()
174        .bitxor(datetime_to_restore.is_none()))
175    {
176        return Err(DeltaTableError::from(RestoreError::InvalidRestoreParameter));
177    }
178    let mut table = DeltaTable::new(log_store.clone(), DeltaTableConfig::default());
179
180    let version = match datetime_to_restore {
181        Some(datetime) => {
182            table.load_with_datetime(datetime).await?;
183            table
184                .version()
185                .ok_or_else(|| DeltaTableError::NotInitialized)?
186        }
187        None => {
188            table.load_version(version_to_restore.unwrap()).await?;
189            table
190                .version()
191                .ok_or_else(|| DeltaTableError::NotInitialized)?
192        }
193    };
194
195    if version >= snapshot.version() {
196        return Err(DeltaTableError::from(RestoreError::TooLargeRestoreVersion(
197            version,
198            snapshot.version(),
199        )));
200    }
201
202    let snapshot_restored = table.snapshot()?;
203    let metadata_restored_version = snapshot_restored.metadata();
204
205    let state_to_restore_files: Vec<_> = snapshot_restored
206        .snapshot()
207        .file_views(&log_store, None)
208        .try_collect()
209        .await?;
210    let latest_state_files: Vec<_> = snapshot.file_views(&log_store, None).try_collect().await?;
211    let state_to_restore_files_set =
212        HashSet::<_>::from_iter(state_to_restore_files.iter().map(|f| f.path().to_string()));
213    let latest_state_files_set =
214        HashSet::<_>::from_iter(latest_state_files.iter().map(|f| f.path().to_string()));
215
216    let files_to_add: Vec<Add> = state_to_restore_files
217        .iter()
218        .filter(|a| !latest_state_files_set.contains(&a.path().to_string()))
219        .map(|f| {
220            let mut a = f.to_add();
221            a.data_change = true;
222            a
223        })
224        .collect();
225
226    let deletion_timestamp = SystemTime::now()
227        .duration_since(UNIX_EPOCH)
228        .unwrap()
229        .as_millis() as i64;
230    let files_to_remove: Vec<Remove> = latest_state_files
231        .iter()
232        .filter(|f| !state_to_restore_files_set.contains(&f.path().to_string()))
233        .map(|f| {
234            let mut rm = f.remove_action(true);
235            rm.deletion_timestamp = Some(deletion_timestamp);
236            rm
237        })
238        .collect();
239
240    if !ignore_missing_files {
241        check_files_available(log_store.object_store(None).as_ref(), &files_to_add).await?;
242    }
243
244    let metrics = RestoreMetrics {
245        num_removed_file: files_to_remove.len(),
246        num_restored_file: files_to_add.len(),
247    };
248
249    let mut actions = vec![];
250    let protocol = if protocol_downgrade_allowed {
251        ProtocolInner {
252            min_reader_version: snapshot_restored.protocol().min_reader_version(),
253            min_writer_version: snapshot_restored.protocol().min_writer_version(),
254            writer_features: if snapshot.protocol().min_writer_version() < 7 {
255                None
256            } else {
257                snapshot_restored.protocol().writer_features_set()
258            },
259            reader_features: if snapshot.protocol().min_reader_version() < 3 {
260                None
261            } else {
262                snapshot_restored.protocol().reader_features_set()
263            },
264        }
265    } else {
266        ProtocolInner {
267            min_reader_version: max(
268                snapshot_restored.protocol().min_reader_version(),
269                snapshot.protocol().min_reader_version(),
270            ),
271            min_writer_version: max(
272                snapshot_restored.protocol().min_writer_version(),
273                snapshot.protocol().min_writer_version(),
274            ),
275            writer_features: snapshot.protocol().writer_features_set(),
276            reader_features: snapshot.protocol().reader_features_set(),
277        }
278    };
279    commit_properties
280        .app_metadata
281        .insert("readVersion".to_owned(), snapshot.version().into());
282    commit_properties.app_metadata.insert(
283        "operationMetrics".to_owned(),
284        serde_json::to_value(&metrics)?,
285    );
286
287    actions.push(Action::Protocol(protocol.as_kernel()));
288    actions.extend(files_to_add.into_iter().map(Action::Add));
289    actions.extend(files_to_remove.into_iter().map(Action::Remove));
290    // Add the metadata from the restored version to undo e.g. constraint or field metadata changes
291    actions.push(Action::Metadata(metadata_restored_version.clone()));
292
293    let operation = DeltaOperation::Restore {
294        version: version_to_restore,
295        datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }),
296    };
297
298    let commit = CommitBuilder::from(commit_properties)
299        .with_actions(actions)
300        .with_max_retries(0)
301        .with_operation_id(operation_id)
302        .with_post_commit_hook_handler(custom_execute_handler)
303        .build(Some(&snapshot), log_store.clone(), operation)
304        .await?;
305
306    Ok((metrics, commit.snapshot()))
307}
308
309async fn check_files_available(
310    object_store: &dyn ObjectStore,
311    files: &Vec<Add>,
312) -> DeltaResult<()> {
313    for file in files {
314        let file_path = Path::parse(file.path.clone())?;
315        match object_store.head(&file_path).await {
316            Ok(_) => {}
317            Err(ObjectStoreError::NotFound { .. }) => {
318                return Err(DeltaTableError::from(RestoreError::MissingDataFile(
319                    file.path.clone(),
320                )));
321            }
322            Err(e) => return Err(DeltaTableError::from(e)),
323        }
324    }
325    Ok(())
326}
327
328impl std::future::IntoFuture for RestoreBuilder {
329    type Output = DeltaResult<(DeltaTable, RestoreMetrics)>;
330    type IntoFuture = BoxFuture<'static, Self::Output>;
331
332    fn into_future(self) -> Self::IntoFuture {
333        let mut this = self;
334
335        Box::pin(async move {
336            let snapshot =
337                resolve_snapshot(&this.log_store, this.snapshot.clone(), true, None).await?;
338
339            let operation_id = this.get_operation_id();
340            this.pre_execute(operation_id).await?;
341
342            let handle = this.custom_execute_handler.take();
343            let (metrics, new_state) = execute(
344                this.log_store.clone(),
345                snapshot,
346                this.version_to_restore,
347                this.datetime_to_restore,
348                this.ignore_missing_files,
349                this.protocol_downgrade_allowed,
350                this.commit_properties.clone(),
351                handle.clone(),
352                operation_id,
353            )
354            .await?;
355
356            if let Some(handler) = handle {
357                handler.post_execute(&this.log_store, operation_id).await?;
358            }
359
360            Ok((
361                DeltaTable::new_with_state(this.log_store, new_state),
362                metrics,
363            ))
364        })
365    }
366}
367
368#[cfg(test)]
369#[cfg(feature = "datafusion")]
370mod tests {
371
372    use crate::DeltaResult;
373    use crate::writer::test_utils::{create_bare_table, get_record_batch};
374
375    /// Verify that restore respects constraints that were added/removed in previous version_to_restore
376    /// <https://github.com/delta-io/delta-rs/issues/3352>
377    #[tokio::test]
378    async fn test_simple_restore_constraints() -> DeltaResult<()> {
379        use crate::table::config::TablePropertiesExt as _;
380
381        let batch = get_record_batch(None, false);
382        let table = create_bare_table().write(vec![batch.clone()]).await?;
383        let first_v = table.version().unwrap();
384
385        let constraint = table
386            .add_constraint()
387            .with_constraint("my_custom_constraint", "value < 100")
388            .await;
389        let table = constraint.expect("Failed to add constraint to table");
390
391        let constraints = table
392            .state
393            .as_ref()
394            .unwrap()
395            .table_config()
396            .get_constraints();
397        assert!(constraints.len() == 1);
398        assert_eq!(constraints[0].name, "my_custom_constraint");
399
400        let (table, _metrics) = table.restore().with_version_to_restore(first_v).await?;
401        assert_ne!(table.version(), Some(first_v));
402
403        let constraints = table.state.unwrap().table_config().get_constraints();
404        assert!(constraints.is_empty());
405
406        Ok(())
407    }
408}