use chrono::{DateTime, Utc};
use object_store::Error as ObjectStoreError;
use crate::kernel::Version;
use crate::kernel::transaction::{CommitBuilderError, TransactionError};
pub type DeltaResult<T, E = DeltaTableError> = Result<T, E>;
pub(crate) fn unsupported_column_mapping_write(operation: &str) -> DeltaTableError {
DeltaTableError::Generic(format!(
"column mapping writes are not supported for {operation} yet"
))
}
#[allow(missing_docs)]
#[derive(thiserror::Error, Debug)]
pub enum DeltaTableError {
#[error("Kernel error: {0}")]
KernelError(#[from] delta_kernel::error::Error),
#[error("Failed to read delta log object: {}", .source)]
ObjectStore {
#[from]
source: ObjectStoreError,
},
#[error("Failed to parse parquet: {}", .source)]
Parquet {
#[from]
source: parquet::errors::ParquetError,
},
#[error("Failed to convert into Arrow schema: {}", .source)]
Arrow {
#[from]
source: arrow::error::ArrowError,
},
#[error("Invalid JSON in log record, version={}, line=`{}`, err=`{}`", .version, .line, .json_err)]
InvalidJsonLog {
json_err: serde_json::error::Error,
line: String,
version: Version,
},
#[error("Invalid JSON in file stats: {}", .json_err)]
InvalidStatsJson {
json_err: serde_json::error::Error,
},
#[error("Invalid table version: {0}")]
InvalidVersion(Version),
#[error(
"Cannot downgrade from version {current_version} to {requested_version}; use DeltaTable.load_version()"
)]
VersionDowngrade {
current_version: Version,
requested_version: Version,
},
#[error("Invalid datetime string: {}", .source)]
InvalidDateTimeString {
#[from]
source: chrono::ParseError,
},
#[error("{message}")]
InvalidData {
message: String,
},
#[error("Not a Delta table: {0}")]
NotATable(String),
#[error("No schema found, please make sure table is loaded.")]
NoSchema,
#[error("Data does not match the schema or partitions of the table: {}", msg)]
SchemaMismatch {
msg: String,
},
#[error("This partition is not formatted with key=value: {}", .partition)]
PartitionError {
partition: String,
},
#[error("Invalid partition filter found: {}.", .partition_filter)]
InvalidPartitionFilter {
partition_filter: String,
},
#[error("Failed to read line from log record")]
Io {
#[from]
source: std::io::Error,
},
#[error("Commit actions are unsound: {source}")]
CommitValidation {
source: CommitBuilderError,
},
#[error("Transaction failed: {source}")]
Transaction {
source: TransactionError,
},
#[error("Delta transaction failed, version {0} already exists.")]
VersionAlreadyExists(Version),
#[error("Delta transaction failed, version {0} does not follow {1}")]
VersionMismatch(Version, Version),
#[error("Delta-rs must be build with feature '{feature}' to support loading from: {url}.")]
MissingFeature {
feature: &'static str,
url: String,
},
#[error("Cannot infer storage location from: {0}")]
InvalidTableLocation(String),
#[error("Log JSON serialization error: {json_err}")]
SerializeLogJson {
json_err: serde_json::error::Error,
},
#[error("Generic DeltaTable error: {0}")]
Generic(String),
#[error("Generic error: {source}")]
GenericError {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
#[error("Kernel: {source}")]
Kernel {
#[from]
source: crate::kernel::Error,
},
#[error("Table metadata is invalid: {0}")]
MetadataError(String),
#[error("Table has not yet been initialized")]
NotInitialized,
#[error("Table has not yet been initialized with files, therefore {0} is not supported")]
NotInitializedWithFiles(String),
#[error("Change Data not enabled for version: {version}, Start: {start}, End: {end}")]
ChangeDataNotRecorded {
version: Version,
start: Version,
end: Version,
},
#[error("Reading a table version: {version} that does not have change data enabled")]
ChangeDataNotEnabled { version: Version },
#[error("Invalid version. Start version {start} is greater than end version {end}")]
ChangeDataInvalidVersionRange { start: Version, end: Version },
#[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]
ChangeDataTimestampGreaterThanCommit { ending_timestamp: DateTime<Utc> },
#[error("No starting version or timestamp provided for CDC")]
NoStartingVersionOrTimestamp,
}
impl From<object_store::path::Error> for DeltaTableError {
fn from(err: object_store::path::Error) -> Self {
Self::GenericError {
source: Box::new(err),
}
}
}
impl From<serde_json::Error> for DeltaTableError {
fn from(value: serde_json::Error) -> Self {
DeltaTableError::InvalidStatsJson { json_err: value }
}
}
impl DeltaTableError {
pub fn not_a_table(path: impl AsRef<str>) -> Self {
let msg = format!(
"No snapshot or version 0 found, perhaps {} is an empty dir?",
path.as_ref()
);
Self::NotATable(msg)
}
pub fn generic(msg: impl ToString) -> Self {
Self::Generic(msg.to_string())
}
}