deltalake_core/operations/
filesystem_check.rs

1//! Audit the Delta Table for active files that do not exist in the underlying filesystem and remove them.
2//!
3//! Active files are ones that have an add action in the log, but no corresponding remove action.
4//! This operation creates a new transaction containing a remove action for each of the missing files.
5//!
//! This can be used to repair tables where a data file has been deleted, either accidentally
//! or purposefully because the file was corrupted.
8//!
9//! # Example
//! ```rust ignore
//! let mut table = open_table("../path/to/table")?;
//! let (table, metrics) = FileSystemCheckBuilder::new(table.log_store(), table.state).await?;
//! ```
14
15use std::collections::HashMap;
16use std::fmt::Debug;
17use std::sync::Arc;
18use std::time::SystemTime;
19use std::time::UNIX_EPOCH;
20
21use futures::future::BoxFuture;
22use futures::StreamExt;
23use object_store::ObjectStore;
24use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
25use url::{ParseError, Url};
26use uuid::Uuid;
27
28use super::CustomExecuteHandler;
29use super::Operation;
30use crate::errors::{DeltaResult, DeltaTableError};
31use crate::kernel::transaction::{CommitBuilder, CommitProperties};
32use crate::kernel::{Action, Add, Remove};
33use crate::logstore::LogStoreRef;
34use crate::protocol::DeltaOperation;
35use crate::table::state::DeltaTableState;
36use crate::DeltaTable;
37
38/// Audit the Delta Table's active files with the underlying file system.
39/// See this module's documentation for more information
40pub struct FileSystemCheckBuilder {
41    /// A snapshot of the to-be-checked table's state
42    snapshot: DeltaTableState,
43    /// Delta object store for handling data files
44    log_store: LogStoreRef,
45    /// Don't remove actions to the table log. Just determine which files can be removed
46    dry_run: bool,
47    /// Commit properties and configuration
48    commit_properties: CommitProperties,
49    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
50}
51
52/// Details of the FSCK operation including which files were removed from the log
53#[derive(Debug, Serialize)]
54pub struct FileSystemCheckMetrics {
55    /// Was this a dry run
56    pub dry_run: bool,
57    /// Files that wrere removed successfully
58    #[serde(
59        serialize_with = "serialize_vec_string",
60        deserialize_with = "deserialize_vec_string"
61    )]
62    pub files_removed: Vec<String>,
63}
64
65struct FileSystemCheckPlan {
66    /// Delta object store for handling data files
67    log_store: LogStoreRef,
68    /// Files that no longer exists in undlying ObjectStore but have active add actions
69    pub files_to_remove: Vec<Add>,
70}
71
72// Custom serialization function that serializes metric details as a string
73fn serialize_vec_string<S>(value: &Vec<String>, serializer: S) -> Result<S::Ok, S::Error>
74where
75    S: Serializer,
76{
77    let json_string = serde_json::to_string(value).map_err(serde::ser::Error::custom)?;
78    serializer.serialize_str(&json_string)
79}
80
81// Custom deserialization that parses a JSON string into MetricDetails
82#[expect(dead_code)]
83fn deserialize_vec_string<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
84where
85    D: Deserializer<'de>,
86{
87    let s: String = Deserialize::deserialize(deserializer)?;
88    serde_json::from_str(&s).map_err(DeError::custom)
89}
90
91fn is_absolute_path(path: &str) -> DeltaResult<bool> {
92    match Url::parse(path) {
93        Ok(_) => Ok(true),
94        Err(ParseError::RelativeUrlWithoutBase) => Ok(false),
95        Err(_) => Err(DeltaTableError::Generic(format!(
96            "Unable to parse path: {path}"
97        ))),
98    }
99}
100
101impl super::Operation<()> for FileSystemCheckBuilder {
102    fn log_store(&self) -> &LogStoreRef {
103        &self.log_store
104    }
105    fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
106        self.custom_execute_handler.clone()
107    }
108}
109
110impl FileSystemCheckBuilder {
111    /// Create a new [`FileSystemCheckBuilder`]
112    pub fn new(log_store: LogStoreRef, state: DeltaTableState) -> Self {
113        FileSystemCheckBuilder {
114            snapshot: state,
115            log_store,
116            dry_run: false,
117            commit_properties: CommitProperties::default(),
118            custom_execute_handler: None,
119        }
120    }
121
122    /// Only determine which add actions should be removed. A dry run will not commit actions to the Delta log
123    pub fn with_dry_run(mut self, dry_run: bool) -> Self {
124        self.dry_run = dry_run;
125        self
126    }
127
128    /// Additional information to write to the commit
129    pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
130        self.commit_properties = commit_properties;
131        self
132    }
133
134    /// Set a custom execute handler, for pre and post execution
135    pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
136        self.custom_execute_handler = Some(handler);
137        self
138    }
139
140    async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> {
141        let mut files_relative: HashMap<String, Add> = HashMap::new();
142        let log_store = self.log_store.clone();
143        let mut file_stream = self.snapshot.file_actions_iter(&self.log_store);
144        while let Some(active) = file_stream.next().await {
145            let active = active?;
146            if is_absolute_path(&active.path)? {
147                return Err(DeltaTableError::Generic(
148                    "Filesystem check does not support absolute paths".to_string(),
149                ));
150            } else {
151                files_relative.insert(active.path.clone(), active);
152            }
153        }
154
155        let object_store = log_store.object_store(None);
156        let mut files = object_store.list(None);
157        while let Some(result) = files.next().await {
158            let file = result?;
159            files_relative.remove(file.location.as_ref());
160
161            if files_relative.is_empty() {
162                break;
163            }
164        }
165
166        let files_to_remove: Vec<Add> = files_relative
167            .into_values()
168            .map(|file| file.to_owned())
169            .collect();
170
171        Ok(FileSystemCheckPlan {
172            files_to_remove,
173            log_store,
174        })
175    }
176}
177
178impl FileSystemCheckPlan {
179    pub async fn execute(
180        self,
181        snapshot: &DeltaTableState,
182        mut commit_properties: CommitProperties,
183        operation_id: Uuid,
184        handle: Option<Arc<dyn CustomExecuteHandler>>,
185    ) -> DeltaResult<FileSystemCheckMetrics> {
186        let mut actions = Vec::with_capacity(self.files_to_remove.len());
187        let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len());
188
189        for file in self.files_to_remove {
190            let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
191            let deletion_time = deletion_time.as_millis() as i64;
192            removed_file_paths.push(file.path.clone());
193            actions.push(Action::Remove(Remove {
194                path: file.path,
195                deletion_timestamp: Some(deletion_time),
196                data_change: true,
197                extended_file_metadata: None,
198                partition_values: Some(file.partition_values),
199                size: Some(file.size),
200                deletion_vector: None,
201                tags: file.tags,
202                base_row_id: file.base_row_id,
203                default_row_commit_version: file.default_row_commit_version,
204            }));
205        }
206        let metrics = FileSystemCheckMetrics {
207            dry_run: false,
208            files_removed: removed_file_paths,
209        };
210
211        commit_properties
212            .app_metadata
213            .insert("readVersion".to_owned(), snapshot.version().into());
214        commit_properties.app_metadata.insert(
215            "operationMetrics".to_owned(),
216            serde_json::to_value(&metrics)?,
217        );
218
219        CommitBuilder::from(commit_properties)
220            .with_operation_id(operation_id)
221            .with_post_commit_hook_handler(handle)
222            .with_actions(actions)
223            .build(
224                Some(snapshot),
225                self.log_store.clone(),
226                DeltaOperation::FileSystemCheck {},
227            )
228            .await?;
229
230        Ok(metrics)
231    }
232}
233
234impl std::future::IntoFuture for FileSystemCheckBuilder {
235    type Output = DeltaResult<(DeltaTable, FileSystemCheckMetrics)>;
236    type IntoFuture = BoxFuture<'static, Self::Output>;
237
238    fn into_future(self) -> Self::IntoFuture {
239        let this = self;
240
241        Box::pin(async move {
242            let plan = this.create_fsck_plan().await?;
243            if this.dry_run {
244                return Ok((
245                    DeltaTable::new_with_state(this.log_store, this.snapshot),
246                    FileSystemCheckMetrics {
247                        files_removed: plan.files_to_remove.into_iter().map(|f| f.path).collect(),
248                        dry_run: true,
249                    },
250                ));
251            }
252            if plan.files_to_remove.is_empty() {
253                return Ok((
254                    DeltaTable::new_with_state(this.log_store, this.snapshot),
255                    FileSystemCheckMetrics {
256                        dry_run: false,
257                        files_removed: Vec::new(),
258                    },
259                ));
260            };
261            let operation_id = this.get_operation_id();
262            this.pre_execute(operation_id).await?;
263
264            let metrics = plan
265                .execute(
266                    &this.snapshot,
267                    this.commit_properties.clone(),
268                    operation_id,
269                    this.get_custom_execute_handler(),
270                )
271                .await?;
272
273            this.post_execute(operation_id).await?;
274
275            let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot);
276            table.update().await?;
277            Ok((table, metrics))
278        })
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    /// Plain and partitioned file names are relative; anything carrying a URL scheme is absolute.
    #[test]
    fn absolute_path() {
        let relative_paths = [
            "part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet",
            "x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet",
        ];
        for path in relative_paths.iter() {
            assert!(!is_absolute_path(path).unwrap());
        }

        let absolute_paths = [
            "abfss://container@account_name.blob.core.windows.net/full/part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet",
            "file:///C:/my_table/windows.parquet",
            "file:///home/my_table/unix.parquet",
            "s3://container/path/file.parquet",
            "gs://container/path/file.parquet",
            "scheme://table/file.parquet",
        ];
        for path in absolute_paths.iter() {
            assert!(is_absolute_path(path).unwrap());
        }
    }
}