deltalake_core/operations/
filesystem_check.rs1use std::collections::HashMap;
16use std::fmt::Debug;
17use std::sync::Arc;
18use std::time::SystemTime;
19use std::time::UNIX_EPOCH;
20
21use futures::future::BoxFuture;
22use futures::StreamExt;
23use object_store::ObjectStore;
24use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
25use url::{ParseError, Url};
26use uuid::Uuid;
27
28use super::CustomExecuteHandler;
29use super::Operation;
30use crate::errors::{DeltaResult, DeltaTableError};
31use crate::kernel::transaction::{CommitBuilder, CommitProperties};
32use crate::kernel::{Action, Add, Remove};
33use crate::logstore::LogStoreRef;
34use crate::protocol::DeltaOperation;
35use crate::table::state::DeltaTableState;
36use crate::DeltaTable;
37
38pub struct FileSystemCheckBuilder {
41 snapshot: DeltaTableState,
43 log_store: LogStoreRef,
45 dry_run: bool,
47 commit_properties: CommitProperties,
49 custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
50}
51
52#[derive(Debug, Serialize)]
54pub struct FileSystemCheckMetrics {
55 pub dry_run: bool,
57 #[serde(
59 serialize_with = "serialize_vec_string",
60 deserialize_with = "deserialize_vec_string"
61 )]
62 pub files_removed: Vec<String>,
63}
64
65struct FileSystemCheckPlan {
66 log_store: LogStoreRef,
68 pub files_to_remove: Vec<Add>,
70}
71
72fn serialize_vec_string<S>(value: &Vec<String>, serializer: S) -> Result<S::Ok, S::Error>
74where
75 S: Serializer,
76{
77 let json_string = serde_json::to_string(value).map_err(serde::ser::Error::custom)?;
78 serializer.serialize_str(&json_string)
79}
80
81#[expect(dead_code)]
83fn deserialize_vec_string<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
84where
85 D: Deserializer<'de>,
86{
87 let s: String = Deserialize::deserialize(deserializer)?;
88 serde_json::from_str(&s).map_err(DeError::custom)
89}
90
91fn is_absolute_path(path: &str) -> DeltaResult<bool> {
92 match Url::parse(path) {
93 Ok(_) => Ok(true),
94 Err(ParseError::RelativeUrlWithoutBase) => Ok(false),
95 Err(_) => Err(DeltaTableError::Generic(format!(
96 "Unable to parse path: {path}"
97 ))),
98 }
99}
100
101impl super::Operation<()> for FileSystemCheckBuilder {
102 fn log_store(&self) -> &LogStoreRef {
103 &self.log_store
104 }
105 fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
106 self.custom_execute_handler.clone()
107 }
108}
109
110impl FileSystemCheckBuilder {
111 pub fn new(log_store: LogStoreRef, state: DeltaTableState) -> Self {
113 FileSystemCheckBuilder {
114 snapshot: state,
115 log_store,
116 dry_run: false,
117 commit_properties: CommitProperties::default(),
118 custom_execute_handler: None,
119 }
120 }
121
122 pub fn with_dry_run(mut self, dry_run: bool) -> Self {
124 self.dry_run = dry_run;
125 self
126 }
127
128 pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
130 self.commit_properties = commit_properties;
131 self
132 }
133
134 pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
136 self.custom_execute_handler = Some(handler);
137 self
138 }
139
140 async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> {
141 let mut files_relative: HashMap<String, Add> = HashMap::new();
142 let log_store = self.log_store.clone();
143 let mut file_stream = self.snapshot.file_actions_iter(&self.log_store);
144 while let Some(active) = file_stream.next().await {
145 let active = active?;
146 if is_absolute_path(&active.path)? {
147 return Err(DeltaTableError::Generic(
148 "Filesystem check does not support absolute paths".to_string(),
149 ));
150 } else {
151 files_relative.insert(active.path.clone(), active);
152 }
153 }
154
155 let object_store = log_store.object_store(None);
156 let mut files = object_store.list(None);
157 while let Some(result) = files.next().await {
158 let file = result?;
159 files_relative.remove(file.location.as_ref());
160
161 if files_relative.is_empty() {
162 break;
163 }
164 }
165
166 let files_to_remove: Vec<Add> = files_relative
167 .into_values()
168 .map(|file| file.to_owned())
169 .collect();
170
171 Ok(FileSystemCheckPlan {
172 files_to_remove,
173 log_store,
174 })
175 }
176}
177
178impl FileSystemCheckPlan {
179 pub async fn execute(
180 self,
181 snapshot: &DeltaTableState,
182 mut commit_properties: CommitProperties,
183 operation_id: Uuid,
184 handle: Option<Arc<dyn CustomExecuteHandler>>,
185 ) -> DeltaResult<FileSystemCheckMetrics> {
186 let mut actions = Vec::with_capacity(self.files_to_remove.len());
187 let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len());
188
189 for file in self.files_to_remove {
190 let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
191 let deletion_time = deletion_time.as_millis() as i64;
192 removed_file_paths.push(file.path.clone());
193 actions.push(Action::Remove(Remove {
194 path: file.path,
195 deletion_timestamp: Some(deletion_time),
196 data_change: true,
197 extended_file_metadata: None,
198 partition_values: Some(file.partition_values),
199 size: Some(file.size),
200 deletion_vector: None,
201 tags: file.tags,
202 base_row_id: file.base_row_id,
203 default_row_commit_version: file.default_row_commit_version,
204 }));
205 }
206 let metrics = FileSystemCheckMetrics {
207 dry_run: false,
208 files_removed: removed_file_paths,
209 };
210
211 commit_properties
212 .app_metadata
213 .insert("readVersion".to_owned(), snapshot.version().into());
214 commit_properties.app_metadata.insert(
215 "operationMetrics".to_owned(),
216 serde_json::to_value(&metrics)?,
217 );
218
219 CommitBuilder::from(commit_properties)
220 .with_operation_id(operation_id)
221 .with_post_commit_hook_handler(handle)
222 .with_actions(actions)
223 .build(
224 Some(snapshot),
225 self.log_store.clone(),
226 DeltaOperation::FileSystemCheck {},
227 )
228 .await?;
229
230 Ok(metrics)
231 }
232}
233
234impl std::future::IntoFuture for FileSystemCheckBuilder {
235 type Output = DeltaResult<(DeltaTable, FileSystemCheckMetrics)>;
236 type IntoFuture = BoxFuture<'static, Self::Output>;
237
238 fn into_future(self) -> Self::IntoFuture {
239 let this = self;
240
241 Box::pin(async move {
242 let plan = this.create_fsck_plan().await?;
243 if this.dry_run {
244 return Ok((
245 DeltaTable::new_with_state(this.log_store, this.snapshot),
246 FileSystemCheckMetrics {
247 files_removed: plan.files_to_remove.into_iter().map(|f| f.path).collect(),
248 dry_run: true,
249 },
250 ));
251 }
252 if plan.files_to_remove.is_empty() {
253 return Ok((
254 DeltaTable::new_with_state(this.log_store, this.snapshot),
255 FileSystemCheckMetrics {
256 dry_run: false,
257 files_removed: Vec::new(),
258 },
259 ));
260 };
261 let operation_id = this.get_operation_id();
262 this.pre_execute(operation_id).await?;
263
264 let metrics = plan
265 .execute(
266 &this.snapshot,
267 this.commit_properties.clone(),
268 operation_id,
269 this.get_custom_execute_handler(),
270 )
271 .await?;
272
273 this.post_execute(operation_id).await?;
274
275 let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot);
276 table.update().await?;
277 Ok((table, metrics))
278 })
279 }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn absolute_path() {
        // Paths relative to the table root, with and without partition directories.
        let relative = [
            "part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet",
            "x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet",
        ];
        for path in relative {
            assert!(!is_absolute_path(path).unwrap(), "expected relative: {path}");
        }

        // Fully qualified URLs for a range of schemes.
        let absolute = [
            "abfss://container@account_name.blob.core.windows.net/full/part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet",
            "file:///C:/my_table/windows.parquet",
            "file:///home/my_table/unix.parquet",
            "s3://container/path/file.parquet",
            "gs://container/path/file.parquet",
            "scheme://table/file.parquet",
        ];
        for path in absolute {
            assert!(is_absolute_path(path).unwrap(), "expected absolute: {path}");
        }
    }
}