use object_store::path::Path;
use std::ops::Bound;
use std::time::Duration;
use std::{path::PathBuf, sync::Arc};
use thiserror::Error as ThisError;
use uuid::Uuid;
use crate::bytes_range::BytesRange;
use crate::error::SlateDBError::{
LatestTransactionalObjectVersionMissing, TransactionalObjectVersionExists,
};
use crate::merge_operator::MergeOperatorError;
use slatedb_txn_obj::TransactionalObjectError;
#[non_exhaustive]
#[derive(Clone, Debug, ThisError)]
pub(crate) enum SlateDBError {
#[error("io error")]
IoError(#[from] Arc<std::io::Error>),
#[error("checksum mismatch")]
ChecksumMismatch,
#[error("empty SSTable")]
EmptySSTable,
#[error("empty block metadata")]
EmptyBlockMeta,
#[error("empty block")]
EmptyBlock,
#[error("empty RowEntry key")]
EmptyKey,
#[error("empty write batch not allowed")]
EmptyBatch,
#[error("empty manifest")]
EmptyManifest,
#[error("object store error")]
ObjectStoreError(#[from] Arc<object_store::Error>),
#[error("failed to find manifest with id. id=`{0}`")]
ManifestMissing(u64),
#[error("transactional object (e.g. manifest) version already exists")]
TransactionalObjectVersionExists,
#[error("failed to find latest transactional object (e.g. manifest) version")]
LatestTransactionalObjectVersionMissing,
#[error("generic transactional object (e.g. manifest) error {0:?}")]
TransactionalObjectError(#[from] Arc<TransactionalObjectError>),
#[error("transactional object (e.g. manifest) op timeout after {timeout:?}")]
TransactionalObjectTimeout { timeout: Duration },
#[error("transactional object (e.g. manifest) is in an invalid state")]
InvalidTransactionalObjectState,
#[error("invalid deletion")]
InvalidDeletion,
#[error("invalid sst error")]
InvalidFlatbuffer(#[from] flatbuffers::InvalidFlatbuffer),
#[error("invalid DB state error")]
InvalidDBState,
#[error("wal store reconfiguration unsupported")]
WalStoreReconfigurationError,
#[error("invalid compaction")]
InvalidCompaction,
#[error("compaction executor failed")]
CompactionExecutorFailed,
#[error(
"invalid clock tick, must be monotonic. last_tick=`{last_tick}`, next_tick=`{next_tick}`"
)]
InvalidClockTick { last_tick: i64, next_tick: i64 },
#[error("detected newer DB client")]
Fenced,
#[error("invalid cache part size bytes, it must be multiple of 1024 and greater than 0")]
InvalidCachePartSize,
#[error("invalid compression codec")]
InvalidCompressionCodec,
#[cfg(any(
feature = "snappy",
feature = "zlib",
feature = "lz4",
feature = "zstd"
))]
#[error("error decompressing block")]
BlockDecompressionError,
#[cfg(any(feature = "snappy", feature = "zlib", feature = "zstd"))]
#[error("error compressing block")]
BlockCompressionError,
#[error("error transforming block")]
BlockTransformError,
#[error("Invalid RowFlags. #{message}. encoded_bits=`{encoded_bits:#b}`, known_bits=`{known_bits:#b}`")]
InvalidRowFlags {
encoded_bits: u8,
known_bits: u8,
message: String,
},
#[error("read channel error")]
ReadChannelError(#[from] tokio::sync::oneshot::error::RecvError),
#[error("background task panicked. name=`{0}`")]
BackgroundTaskPanic(String),
#[error("background task exists. name=`{0}`")]
BackgroundTaskExists(String),
#[error("background task cancelled. name=`{0}`")]
BackgroundTaskCancelled(String),
#[error("background task executor already started")]
BackgroundTaskExecutorStarted,
#[error("db is closed")]
Closed,
#[error("merge operator error")]
MergeOperatorError(#[from] MergeOperatorError),
#[error("merge operator missing. A merge operator is required to read merge operands")]
MergeOperatorMissing,
#[error("checkpoint missing. checkpoint_id=`{0}`")]
CheckpointMissing(Uuid),
#[error(
"unsupported {format_name} format version. supported_versions=`{supported_versions:?}`, actual_version=`{actual_version}`"
)]
InvalidVersion {
format_name: &'static str,
supported_versions: Vec<u16>,
actual_version: u16,
},
#[error("foyer error")]
#[cfg(feature = "foyer")]
FoyerError(#[from] Arc<foyer::Error>),
#[error("cannot seek to a key outside the iterator range. key=`{key:?}`, range=`{range:?}`")]
SeekKeyOutOfRange { key: Vec<u8>, range: BytesRange },
#[error("cannot seek to a key less than the last returned key")]
SeekKeyLessThanLastReturnedKey,
#[error(
"parent path must be different from the clone's path. parent_path=`{0}`, clone_path=`{0}`"
)]
IdenticalClonePaths(Path),
#[error("invalid checkpoint lifetime. lifetime=`{0:?}`")]
InvalidCheckpointLifetime(Duration),
#[error("invalid manifest poll interval. interval=`{0:?}`")]
InvalidManifestPollInterval(Duration),
#[error("checkpoint lifetime must be at least double the manifest poll interval. lifetime=`{lifetime:?}`, interval=`{interval:?}`")]
CheckpointLifetimeTooShort {
lifetime: Duration,
interval: Duration,
},
#[error("invalid sst batch size. size=`{0}`")]
InvalidSSTBatchSize(usize),
#[error("cannot seek to a key outside the iterator range. key=`{key:?}`, start_key=`{start_key:?}`, end_key=`{end_key:?}`")]
SeekKeyOutOfKeyRange {
key: Vec<u8>,
start_key: Bound<Vec<u8>>,
end_key: Bound<Vec<u8>>,
},
#[error("the cloned database is not attached to any external database")]
CloneExternalDbMissing,
#[error("the cloned database is not attached to external database with a valid checkpoint. path=`{path}`, checkpoint_id=`{checkpoint_id:?}`")]
CloneIncorrectExternalDbCheckpoint {
path: String,
checkpoint_id: Option<Uuid>,
},
#[error("the final checkpoint for the cloned database no longer exists in the manifest. path=`{path}`, checkpoint_id=`{checkpoint_id}`")]
CloneIncorrectFinalCheckpoint { path: String, checkpoint_id: Uuid },
#[error("unknown configuration file format. path=`{0}`")]
UnknownConfigurationFormat(PathBuf),
#[error("invalid configuration format")]
InvalidConfigurationFormat(#[from] Box<figment::Error>),
#[error("attempted a WAL operation when the WAL is disabled")]
WalDisabled,
#[error("invalid object store URL. url=`{0}`")]
InvalidObjectStoreURL(String, #[source] url::ParseError),
#[error("transaction conflict")]
TransactionConflict,
#[error("iterator not initialized")]
IteratorNotInitialized,
#[cfg(feature = "compaction_filters")]
#[error("compaction filter error: {0}")]
CompactionFilterError(Arc<crate::compaction_filter::CompactionFilterError>),
#[error("invalid sequence number ordering during merge. expected sequence numbers in descending order, but found {current_seq} followed by {next_seq}")]
InvalidSequenceOrder { current_seq: u64, next_seq: u64 },
#[error("undefined environment variable {key}")]
UndefinedEnvironmentVariable { key: String },
#[error("invalid environment variable {key} value `{value}`")]
InvalidEnvironmentVariable { key: String, value: String },
#[error("unexpected tombstone encountered where a value was expected")]
UnexpectedTombstone,
}
impl From<TransactionalObjectError> for SlateDBError {
fn from(error: TransactionalObjectError) -> Self {
match error {
TransactionalObjectError::IoError(e) => SlateDBError::from(e),
TransactionalObjectError::ObjectStoreError(e) => SlateDBError::from(e),
TransactionalObjectError::LatestRecordMissing => {
LatestTransactionalObjectVersionMissing
}
TransactionalObjectError::ObjectVersionExists => TransactionalObjectVersionExists,
TransactionalObjectError::Fenced => SlateDBError::Fenced,
TransactionalObjectError::CallbackError(err) => match err.downcast::<SlateDBError>() {
Err(err) => SlateDBError::TransactionalObjectError(Arc::new(
TransactionalObjectError::CallbackError(err),
)),
Ok(err) => *err,
},
TransactionalObjectError::ObjectUpdateTimeout { timeout } => {
SlateDBError::TransactionalObjectTimeout { timeout }
}
TransactionalObjectError::InvalidObjectState => {
SlateDBError::InvalidTransactionalObjectState
}
other => SlateDBError::TransactionalObjectError(Arc::new(other)),
}
}
}
impl From<std::io::Error> for SlateDBError {
fn from(value: std::io::Error) -> Self {
Self::IoError(Arc::new(value))
}
}
impl From<object_store::Error> for SlateDBError {
fn from(value: object_store::Error) -> Self {
Self::ObjectStoreError(Arc::new(value))
}
}
/// Wraps an owned `foyer::Error` in an `Arc` and stores it as
/// `SlateDBError::FoyerError`. Only compiled with the `foyer` feature.
#[cfg(feature = "foyer")]
impl From<foyer::Error> for SlateDBError {
    fn from(value: foyer::Error) -> Self {
        let shared = Arc::new(value);
        SlateDBError::FoyerError(shared)
    }
}
/// Wraps an owned `CompactionFilterError` in an `Arc` and stores it as
/// `SlateDBError::CompactionFilterError`. Only compiled with the
/// `compaction_filters` feature.
#[cfg(feature = "compaction_filters")]
impl From<crate::compaction_filter::CompactionFilterError> for SlateDBError {
    fn from(value: crate::compaction_filter::CompactionFilterError) -> Self {
        let shared = Arc::new(value);
        SlateDBError::CompactionFilterError(shared)
    }
}
/// Boxed `Send + Sync` error trait object; the type of the optional `source`
/// stored in the public [`Error`].
type BoxError = Box<dyn std::error::Error + Send + Sync>;
/// Why a database instance is closed; carried by [`ErrorKind::Closed`].
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CloseReason {
    /// Reported for the internal "db is closed" error.
    Clean,

    /// Reported when a newer DB client was detected.
    Fenced,

    /// Reported when a background task panicked.
    Panic,
}
/// Coarse classification of a public [`Error`].
///
/// Each internal `SlateDBError` variant is assigned exactly one kind by
/// `impl From<SlateDBError> for Error`.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ErrorKind {
    /// A transaction conflict occurred.
    Transaction,

    /// The database is closed; the payload says why.
    Closed(CloseReason),

    /// An I/O, cache, or object-store failure (other than "not found"), or a
    /// transactional-object timeout.
    Unavailable,

    /// An invalid configuration value or request, e.g. an invalid compression
    /// codec, an empty write batch, or a seek outside the iterator range.
    Invalid,

    /// Stored data is not in the expected state, e.g. a checksum mismatch, an
    /// unsupported format version, or a missing manifest or checkpoint.
    Data,

    /// An internal failure, e.g. a failed compaction executor or a problem
    /// with background task bookkeeping.
    Internal,
}
/// Extracts the close reason from an [`ErrorKind`].
///
/// `ErrorKind::Closed` yields the reason it carries; every other kind is
/// reported as [`CloseReason::Panic`].
impl From<ErrorKind> for CloseReason {
    fn from(kind: ErrorKind) -> Self {
        if let ErrorKind::Closed(reason) = kind {
            reason
        } else {
            CloseReason::Panic
        }
    }
}
impl std::fmt::Display for ErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ErrorKind::Transaction => write!(f, "Transaction error"),
ErrorKind::Closed(_) => write!(f, "Closed error"),
ErrorKind::Unavailable => write!(f, "Unavailable error"),
ErrorKind::Invalid => write!(f, "Invalid error"),
ErrorKind::Data => write!(f, "Data error"),
ErrorKind::Internal => write!(f, "Internal error"),
}
}
}
/// Public error type: a message, an [`ErrorKind`], and an optional
/// underlying cause.
///
/// `Display` renders it as `"<kind>: <msg>"`, followed by `" (<source>)"`
/// when a source is attached.
#[non_exhaustive]
#[derive(Debug)]
pub struct Error {
/// Human-readable description, printed after the kind.
msg: String,
/// Classification returned by `Error::kind`.
kind: ErrorKind,
/// Underlying cause, set through `Error::with_source` and exposed via
/// `std::error::Error::source`.
source: Option<BoxError>,
}
/// Formats the error as `"<kind>: <msg>"`, appending `" (<source>)"` when an
/// underlying cause is attached.
impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { msg, kind, source } = self;
        match source {
            Some(cause) => write!(f, "{kind}: {msg} ({cause})"),
            None => write!(f, "{kind}: {msg}"),
        }
    }
}
/// Exposes the attached cause, if any, through the standard error chain.
impl std::error::Error for Error {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // The stored box is `Send + Sync`; the trait signature wants the plain
        // trait object, so the auto-trait bounds are dropped with a cast.
        self.source
            .as_deref()
            .map(|cause| cause as &(dyn std::error::Error + 'static))
    }
}
impl Error {
    /// Shared constructor: builds an error of `kind` with no source attached.
    fn from_parts(kind: ErrorKind, msg: String) -> Self {
        Self {
            msg,
            kind,
            source: None,
        }
    }

    /// Creates an [`ErrorKind::Transaction`] error with the given message.
    pub fn transaction(msg: String) -> Self {
        Self::from_parts(ErrorKind::Transaction, msg)
    }

    /// Creates an [`ErrorKind::Closed`] error carrying `reason`.
    pub fn closed(msg: String, reason: CloseReason) -> Self {
        Self::from_parts(ErrorKind::Closed(reason), msg)
    }

    /// Creates an [`ErrorKind::Unavailable`] error with the given message.
    pub fn unavailable(msg: String) -> Self {
        Self::from_parts(ErrorKind::Unavailable, msg)
    }

    /// Creates an [`ErrorKind::Invalid`] error with the given message.
    pub fn invalid(msg: String) -> Self {
        Self::from_parts(ErrorKind::Invalid, msg)
    }

    /// Creates an [`ErrorKind::Data`] error with the given message.
    pub fn data(msg: String) -> Self {
        Self::from_parts(ErrorKind::Data, msg)
    }

    /// Creates an [`ErrorKind::Internal`] error with the given message.
    pub fn internal(msg: String) -> Self {
        Self::from_parts(ErrorKind::Internal, msg)
    }

    /// Attaches `source` as the underlying cause, replacing any previous one,
    /// and returns the error for chaining.
    pub fn with_source(self, source: BoxError) -> Self {
        Self {
            source: Some(source),
            ..self
        }
    }

    /// Returns the classification of this error.
    pub fn kind(&self) -> ErrorKind {
        self.kind
    }
}
impl From<SlateDBError> for Error {
fn from(err: SlateDBError) -> Self {
let msg = err.to_string();
match err {
SlateDBError::TransactionConflict => Error::transaction(msg),
SlateDBError::Closed => Error::closed(msg, CloseReason::Clean),
SlateDBError::Fenced => Error::closed(msg, CloseReason::Fenced),
SlateDBError::BackgroundTaskPanic(_) => Error::closed(msg, CloseReason::Panic),
SlateDBError::IoError(err) => Error::unavailable(msg).with_source(Box::new(err)),
SlateDBError::ObjectStoreError(err) => {
let error = if matches!(err.as_ref(), object_store::Error::NotFound { .. }) {
Error::data(msg)
} else {
Error::unavailable(msg)
};
error.with_source(Box::new(err))
}
#[cfg(feature = "foyer")]
SlateDBError::FoyerError(err) => Error::unavailable(msg).with_source(Box::new(err)),
SlateDBError::TransactionalObjectTimeout { .. } => Error::unavailable(msg),
SlateDBError::InvalidCachePartSize => Error::invalid(msg),
SlateDBError::InvalidCompressionCodec => Error::invalid(msg),
SlateDBError::WalStoreReconfigurationError => Error::invalid(msg),
SlateDBError::InvalidConfigurationFormat(err) => {
Error::invalid(msg).with_source(Box::new(err))
}
SlateDBError::InvalidObjectStoreURL(_, err) => {
Error::invalid(msg).with_source(Box::new(err))
}
SlateDBError::UnknownConfigurationFormat(_) => Error::invalid(msg),
SlateDBError::InvalidSSTBatchSize(_) => Error::invalid(msg),
SlateDBError::InvalidCheckpointLifetime(_) => Error::invalid(msg),
SlateDBError::InvalidManifestPollInterval(_) => Error::invalid(msg),
SlateDBError::CheckpointLifetimeTooShort { .. } => Error::invalid(msg),
SlateDBError::SeekKeyOutOfRange { .. } => Error::invalid(msg),
SlateDBError::SeekKeyLessThanLastReturnedKey => Error::invalid(msg),
SlateDBError::IdenticalClonePaths { .. } => Error::invalid(msg),
SlateDBError::WalDisabled => Error::invalid(msg),
SlateDBError::InvalidCompaction => Error::invalid(msg),
SlateDBError::InvalidClockTick { .. } => Error::invalid(msg),
SlateDBError::InvalidDeletion => Error::invalid(msg),
SlateDBError::MergeOperatorError(err) => Error::invalid(msg).with_source(Box::new(err)),
SlateDBError::MergeOperatorMissing => Error::invalid(msg),
SlateDBError::IteratorNotInitialized => Error::invalid(msg),
SlateDBError::InvalidSequenceOrder { .. } => Error::data(msg),
SlateDBError::UndefinedEnvironmentVariable { .. } => Error::invalid(msg),
SlateDBError::InvalidEnvironmentVariable { .. } => Error::invalid(msg),
SlateDBError::EmptyBatch => Error::invalid(msg),
SlateDBError::InvalidFlatbuffer(err) => Error::data(msg).with_source(Box::new(err)),
SlateDBError::InvalidDBState => Error::data(msg),
#[cfg(any(
feature = "snappy",
feature = "zlib",
feature = "lz4",
feature = "zstd"
))]
SlateDBError::BlockDecompressionError => Error::data(msg),
#[cfg(any(feature = "snappy", feature = "zlib", feature = "zstd"))]
SlateDBError::BlockCompressionError => Error::data(msg),
SlateDBError::BlockTransformError => Error::data(msg),
SlateDBError::InvalidRowFlags { .. } => Error::data(msg),
SlateDBError::CheckpointMissing(_) => Error::data(msg),
SlateDBError::InvalidVersion { .. } => Error::data(msg),
SlateDBError::ManifestMissing(_) => Error::data(msg),
SlateDBError::LatestTransactionalObjectVersionMissing => Error::data(msg),
SlateDBError::TransactionalObjectVersionExists => Error::data(msg),
SlateDBError::InvalidTransactionalObjectState => Error::data(msg),
SlateDBError::EmptyManifest => Error::data(msg),
SlateDBError::EmptyBlock => Error::data(msg),
SlateDBError::EmptyKey => Error::data(msg),
SlateDBError::EmptyBlockMeta => Error::data(msg),
SlateDBError::EmptySSTable => Error::data(msg),
SlateDBError::ChecksumMismatch => Error::data(msg),
SlateDBError::CloneExternalDbMissing => Error::data(msg),
SlateDBError::CloneIncorrectExternalDbCheckpoint { .. } => Error::data(msg),
SlateDBError::CloneIncorrectFinalCheckpoint { .. } => Error::data(msg),
SlateDBError::CompactionExecutorFailed => Error::internal(msg),
#[cfg(feature = "compaction_filters")]
SlateDBError::CompactionFilterError(_) => Error::internal(msg),
SlateDBError::SeekKeyOutOfKeyRange { .. } => Error::internal(msg),
SlateDBError::ReadChannelError(err) => Error::internal(msg).with_source(Box::new(err)),
SlateDBError::BackgroundTaskExists(_) => Error::internal(msg),
SlateDBError::BackgroundTaskCancelled(_) => Error::internal(msg),
SlateDBError::BackgroundTaskExecutorStarted => Error::internal(msg),
SlateDBError::UnexpectedTombstone => Error::internal(msg),
SlateDBError::TransactionalObjectError(err) => {
Error::internal(msg).with_source(Box::new(err))
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs an object-store error through both conversions
    /// (`object_store::Error` -> `SlateDBError` -> `Error`) and returns the
    /// resulting public kind.
    fn public_kind_of(cause: object_store::Error) -> ErrorKind {
        let internal = SlateDBError::from(cause);
        Error::from(internal).kind()
    }

    /// A "not found" object-store error is classified as a data error.
    #[test]
    fn object_store_error_not_found_maps_to_data() {
        let missing = object_store::Error::NotFound {
            path: "test/path".to_string(),
            source: Box::new(std::io::Error::other("not found")),
        };
        assert_eq!(public_kind_of(missing), ErrorKind::Data);
    }

    /// Any other object-store error is classified as unavailable.
    #[test]
    fn object_store_error_non_not_found_maps_to_unavailable() {
        let unsupported = object_store::Error::NotImplemented;
        assert_eq!(public_kind_of(unsupported), ErrorKind::Unavailable);
    }
}