use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use futures::StreamExt;
use futures::TryStreamExt;
use futures::future::BoxFuture;
use object_store::ObjectStore;
use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error as DeError};
use tracing::*;
use url::{ParseError, Url};
use uuid::Uuid;
use super::CustomExecuteHandler;
use super::Operation;
use crate::DeltaTable;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::EagerSnapshot;
use crate::kernel::resolve_snapshot;
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
use crate::kernel::{Action, Add, Remove};
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::table::state::DeltaTableState;
/// Builder for the filesystem check operation.
///
/// The operation compares the files tracked by a table snapshot against the
/// objects listed in the table's object store and removes log entries for
/// files that were not found. It is executed by awaiting the builder (see the
/// `IntoFuture` implementation in this file).
pub struct FileSystemCheckBuilder {
/// Optional snapshot supplied at construction; it is handed to
/// `resolve_snapshot` when the operation runs.
snapshot: Option<EagerSnapshot>,
/// Log store of the table being checked.
log_store: LogStoreRef,
/// When `true`, missing files are only reported and no commit is written.
dry_run: bool,
/// Commit properties forwarded to the commit that records the removals.
commit_properties: CommitProperties,
/// Optional handler exposed through `Operation::get_custom_execute_handler`
/// and passed to the commit as its post-commit hook handler.
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}
/// Result summary of a filesystem check run.
///
/// The serialized form is also stored in the commit's app metadata under the
/// `operationMetrics` key when removals are committed.
#[derive(Debug, Serialize)]
pub struct FileSystemCheckMetrics {
/// Whether the check ran as a dry run (nothing was committed).
pub dry_run: bool,
/// Paths of the files that were tracked by the snapshot but not found in
/// the object store listing.
///
/// Serialized through `serialize_vec_string`, i.e. as a single string that
/// contains the JSON encoding of the list.
/// NOTE(review): only `Serialize` is derived on this struct, so the
/// `deserialize_with` function named below is currently not exercised.
#[serde(
serialize_with = "serialize_vec_string",
deserialize_with = "deserialize_vec_string"
)]
pub files_removed: Vec<String>,
}
/// Outcome of the planning phase: the `Add` actions whose files were not
/// found in storage, together with the log store used to commit their removal.
struct FileSystemCheckPlan {
/// Log store the removal commit is written to.
log_store: LogStoreRef,
/// `Add` actions of tracked files that were not seen in the object store
/// listing.
pub files_to_remove: Vec<Add>,
}
fn serialize_vec_string<S>(value: &Vec<String>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let json_string = serde_json::to_string(value).map_err(serde::ser::Error::custom)?;
serializer.serialize_str(&json_string)
}
/// Serde `deserialize_with` helper, the inverse of `serialize_vec_string`:
/// reads a string and parses its content as a JSON list of strings.
///
/// A JSON parse failure is surfaced as a custom deserializer error.
#[expect(dead_code)]
fn deserialize_vec_string<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
where
    D: Deserializer<'de>,
{
    let encoded = String::deserialize(deserializer)?;
    serde_json::from_str(&encoded).map_err(DeError::custom)
}
/// Decide whether `path` is an absolute location, i.e. parses as a full URL.
///
/// Returns `Ok(true)` when `Url::parse` succeeds, `Ok(false)` when parsing
/// fails only because the input is a relative URL without a base, and an
/// error for any other parse failure.
fn is_absolute_path(path: &str) -> DeltaResult<bool> {
    let Err(parse_error) = Url::parse(path) else {
        return Ok(true);
    };
    if matches!(parse_error, ParseError::RelativeUrlWithoutBase) {
        return Ok(false);
    }
    Err(DeltaTableError::Generic(format!(
        "Unable to parse path: {path}"
    )))
}
// Exposes the builder's log store and custom handler to the shared
// `Operation` trait.
impl super::Operation for FileSystemCheckBuilder {
    /// Borrow the log store this builder was created with.
    fn log_store(&self) -> &LogStoreRef {
        let Self { log_store, .. } = self;
        log_store
    }

    /// Return a new handle to the configured custom execute handler, if any.
    fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
        self.custom_execute_handler.as_ref().map(Arc::clone)
    }
}
impl FileSystemCheckBuilder {
    /// Create a builder for the table behind `log_store`.
    ///
    /// `snapshot` is optional; it is resolved when the operation runs.
    /// Dry-run mode is off, commit properties are the defaults and no custom
    /// execute handler is set.
    pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
        FileSystemCheckBuilder {
            snapshot,
            log_store,
            dry_run: false,
            commit_properties: CommitProperties::default(),
            custom_execute_handler: None,
        }
    }

    /// Enable or disable dry-run mode. In dry-run mode the missing files are
    /// reported in the metrics but no commit is written.
    pub fn with_dry_run(mut self, dry_run: bool) -> Self {
        self.dry_run = dry_run;
        self
    }

    /// Set the commit properties used for the commit that records removals.
    pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
        self.commit_properties = commit_properties;
        self
    }

    /// Set a custom execute handler for this operation.
    pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
        self.custom_execute_handler = Some(handler);
        self
    }

    /// Build the plan of files to drop from the log.
    ///
    /// Collects every file of `snapshot` keyed by its path, then walks the
    /// object store listing and strikes out each path that is found. Whatever
    /// remains afterwards was not seen in storage and ends up in
    /// `files_to_remove`.
    ///
    /// # Errors
    ///
    /// Fails if a tracked file has an absolute path (not supported), if a
    /// path cannot be parsed, or if reading the snapshot files or the object
    /// store listing fails.
    async fn create_fsck_plan(&self, snapshot: &EagerSnapshot) -> DeltaResult<FileSystemCheckPlan> {
        let mut files_relative: HashMap<String, Add> = HashMap::new();
        let log_store = self.log_store.clone();

        let mut file_stream = snapshot.file_views(&log_store, None).map_ok(|f| f.to_add());
        while let Some(active) = file_stream.next().await {
            let active = active?;
            if is_absolute_path(&active.path)? {
                return Err(DeltaTableError::Generic(
                    "Filesystem check does not support absolute paths".to_string(),
                ));
            }
            files_relative.insert(active.path.clone(), active);
        }

        let object_store = log_store.object_store(None);
        let list_span = info_span!("list_files", operation = "filesystem_check");
        let mut files = list_span.in_scope(|| object_store.list(None));
        // Explicitly 64-bit: the inferred default (`i32`) could overflow on
        // very large stores.
        let mut file_count: u64 = 0;
        // Checking for emptiness *before* polling the listing avoids touching
        // the object store at all when there is nothing (left) to look for,
        // e.g. for a table without any tracked files.
        while !files_relative.is_empty() {
            let Some(result) = files.next().await else {
                break;
            };
            let file = result?;
            file_count += 1;
            files_relative.remove(file.location.as_ref());
        }
        info!(
            files_scanned = file_count,
            missing_files = files_relative.len(),
            "filesystem check listing completed"
        );

        // `into_values` already yields owned `Add`s; no per-entry clone needed.
        let files_to_remove: Vec<Add> = files_relative.into_values().collect();
        Ok(FileSystemCheckPlan {
            files_to_remove,
            log_store,
        })
    }
}
impl FileSystemCheckPlan {
pub async fn execute(
self,
snapshot: &EagerSnapshot,
mut commit_properties: CommitProperties,
operation_id: Uuid,
handle: Option<Arc<dyn CustomExecuteHandler>>,
) -> DeltaResult<FileSystemCheckMetrics> {
let mut actions = Vec::with_capacity(self.files_to_remove.len());
let mut removed_file_paths = Vec::with_capacity(self.files_to_remove.len());
for file in self.files_to_remove {
let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let deletion_time = deletion_time.as_millis() as i64;
removed_file_paths.push(file.path.clone());
actions.push(Action::Remove(Remove {
path: file.path,
deletion_timestamp: Some(deletion_time),
data_change: true,
extended_file_metadata: None,
partition_values: Some(file.partition_values),
size: Some(file.size),
deletion_vector: None,
tags: file.tags,
base_row_id: file.base_row_id,
default_row_commit_version: file.default_row_commit_version,
}));
}
let metrics = FileSystemCheckMetrics {
dry_run: false,
files_removed: removed_file_paths,
};
commit_properties
.app_metadata
.insert("readVersion".to_owned(), snapshot.version().into());
commit_properties.app_metadata.insert(
"operationMetrics".to_owned(),
serde_json::to_value(&metrics)?,
);
CommitBuilder::from(commit_properties)
.with_operation_id(operation_id)
.with_post_commit_hook_handler(handle)
.with_actions(actions)
.build(
Some(snapshot),
self.log_store.clone(),
DeltaOperation::FileSystemCheck {},
)
.await?;
Ok(metrics)
}
}
// Awaiting the builder runs the filesystem check and yields the table
// together with the metrics of the run.
impl std::future::IntoFuture for FileSystemCheckBuilder {
    type Output = DeltaResult<(DeltaTable, FileSystemCheckMetrics)>;
    type IntoFuture = BoxFuture<'static, Self::Output>;

    /// Resolve the snapshot, plan the removals and, unless this is a dry run
    /// or nothing is missing, commit them.
    ///
    /// In the dry-run and nothing-missing cases the table is built from the
    /// resolved snapshot and returned without writing a commit or invoking
    /// the pre/post execute hooks.
    fn into_future(self) -> Self::IntoFuture {
        let builder = self;
        Box::pin(async move {
            let snapshot =
                resolve_snapshot(&builder.log_store, builder.snapshot.clone(), true, None).await?;
            let plan = builder.create_fsck_plan(&snapshot).await?;

            // Both short-circuit cases report exactly the planned paths: all
            // of them for a dry run, an empty list when nothing is missing.
            let dry_run = builder.dry_run;
            if dry_run || plan.files_to_remove.is_empty() {
                let files_removed = plan
                    .files_to_remove
                    .into_iter()
                    .map(|add| add.path)
                    .collect();
                let table =
                    DeltaTable::new_with_state(builder.log_store, DeltaTableState::new(snapshot));
                return Ok((
                    table,
                    FileSystemCheckMetrics {
                        dry_run,
                        files_removed,
                    },
                ));
            }

            let operation_id = builder.get_operation_id();
            builder.pre_execute(operation_id).await?;

            let handler = builder.get_custom_execute_handler();
            let commit_properties = builder.commit_properties.clone();
            let metrics = plan
                .execute(&snapshot, commit_properties, operation_id, handler)
                .await?;

            builder.post_execute(operation_id).await?;

            // The table starts from the pre-commit snapshot and is then
            // updated before being handed back.
            let state = DeltaTableState::new(snapshot);
            let mut table = DeltaTable::new_with_state(builder.log_store, state);
            table.update_state().await?;
            Ok((table, metrics))
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Plain and partitioned file names are relative; anything carrying a
    /// URL scheme is absolute.
    #[test]
    fn absolute_path() {
        let relative_paths = [
            "part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet",
            "x=9/y=9.9/part-00007-3c50fba1-4264-446c-9c67-d8e24a1ccf83.c000.snappy.parquet",
        ];
        for path in relative_paths {
            assert!(
                !is_absolute_path(path).unwrap(),
                "expected relative: {path}"
            );
        }

        let absolute_paths = [
            "abfss://container@account_name.blob.core.windows.net/full/part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet",
            "file:///C:/my_table/windows.parquet",
            "file:///home/my_table/unix.parquet",
            "s3://container/path/file.parquet",
            "gs://container/path/file.parquet",
            "scheme://table/file.parquet",
        ];
        for path in absolute_paths {
            assert!(
                is_absolute_path(path).unwrap(),
                "expected absolute: {path}"
            );
        }
    }
}