use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use std::{fmt, io};
use parking_lot::RwLock;
/// Shorthand for a `std::result::Result` whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Clone, Debug)]
pub enum Error {
Abort, Io(Arc<io::Error>), Send(String),
Receive(String),
CorruptedBlock(String),
Compression(String),
KeyNotInOrder,
FilterBlockEmpty,
Decompression(String),
InvalidFilename(String),
CorruptedTableMetadata(String),
InvalidTableFormat,
TableMetadataNotFound,
Wal(String),
BlockNotFound,
BatchTooLarge,
InvalidBatchRecord,
TransactionWriteConflict,
TransactionRetry,
TransactionClosed,
EmptyKey,
TransactionWriteOnly,
TransactionReadOnly,
TransactionWithoutSavepoint,
KeyNotFound,
WriteStall {
reason: WriteStallReason,
},
ArenaFull, FileDescriptorNotFound,
TableIDCollision(u64),
TableNotFound(u64),
PipelineStall,
Other(String), NoSnapshot,
CommitFail(String),
LoadManifestFail(String),
Corruption(String), ManifestCorruption(String),
InvalidArgument(String),
InvalidTag(String),
BPlusTree(String), InterleavedIteration, WalCorruption {
segment_id: usize,
offset: usize,
message: String,
},
SSTable(crate::sstable::error::SSTableError), }
impl fmt::Display for Error {
    /// Writes the human-readable message for this error.
    ///
    /// Variants without a payload emit a fixed sentence; variants with a
    /// payload emit a fixed prefix followed by the payload's own text.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // ---- Fixed messages (no payload) ----
            Self::Abort => f.write_str("Operation aborted"),
            Self::KeyNotInOrder => f.write_str("Keys are not in order"),
            Self::FilterBlockEmpty => f.write_str("Filter block is empty"),
            Self::InvalidTableFormat => f.write_str("Invalid table format"),
            Self::TableMetadataNotFound => f.write_str("Table metadata not found"),
            Self::BlockNotFound => f.write_str("Block not found"),
            Self::BatchTooLarge => f.write_str("Batch too large"),
            Self::InvalidBatchRecord => f.write_str("Invalid batch record"),
            Self::TransactionWriteConflict => f.write_str("Transaction write conflict"),
            Self::TransactionRetry => f.write_str(
                "Transaction retry required: memtable history insufficient for conflict detection",
            ),
            Self::TransactionClosed => f.write_str("Transaction closed"),
            Self::EmptyKey => f.write_str("Empty key"),
            Self::TransactionWriteOnly => f.write_str("Transaction is write-only"),
            Self::TransactionReadOnly => f.write_str("Transaction is read-only"),
            Self::TransactionWithoutSavepoint => {
                f.write_str("Transaction has no savepoint to rollback to")
            }
            Self::KeyNotFound => f.write_str("Key not found"),
            Self::ArenaFull => f.write_str("Memtable arena is full"),
            Self::FileDescriptorNotFound => f.write_str("File descriptor not found"),
            Self::PipelineStall => f.write_str("Pipeline stall"),
            Self::NoSnapshot => f.write_str("No snapshot available"),
            Self::InterleavedIteration => f.write_str(
                "Interleaved iteration not supported: cannot mix next() and next_back() on same iterator",
            ),

            // ---- Prefix + payload ----
            Self::Io(err) => write!(f, "IO error: {err}"),
            Self::Send(err) => write!(f, "Send error: {err}"),
            Self::Receive(err) => write!(f, "Receive error: {err}"),
            Self::CorruptedBlock(err) => write!(f, "Corrupted block: {err}"),
            Self::Compression(err) => write!(f, "Compression error: {err}"),
            Self::Decompression(err) => write!(f, "Decompression error: {err}"),
            Self::InvalidFilename(err) => write!(f, "Invalid filename: {err}"),
            Self::CorruptedTableMetadata(err) => write!(f, "Corrupted table metadata: {err}"),
            Self::Wal(err) => write!(f, "WAL error: {err}"),
            // The stall reason has no `Display` impl in this file, so its
            // `Debug` form is what ends up in the message.
            Self::WriteStall { reason } => write!(f, "Write stall: {:?}", reason),
            Self::TableIDCollision(id) => write!(f, "CRITICAL ERROR: Table ID collision detected. New table ID {id} conflicts with a table ID in the merge list."),
            Self::TableNotFound(id) => write!(f, "Table not found: {id}"),
            Self::Other(err) => write!(f, "Other error: {err}"),
            Self::CommitFail(err) => write!(f, "Commit failed: {err}"),
            Self::LoadManifestFail(err) => write!(f, "Failed to load manifest: {err}"),
            Self::Corruption(err) => write!(f, "Data corruption detected: {err}"),
            Self::ManifestCorruption(err) => write!(f, "Manifest corruption detected: {err}"),
            Self::InvalidArgument(err) => write!(f, "Invalid argument: {err}"),
            Self::InvalidTag(err) => write!(f, "Invalid tag: {err}"),
            Self::BPlusTree(err) => write!(f, "B+ tree error: {err}"),
            Self::WalCorruption {
                segment_id,
                offset,
                message,
            } => write!(
                f,
                "WAL corruption in segment {} at offset {}: {}",
                segment_id, offset, message
            ),
            Self::SSTable(err) => write!(f, "SSTable error: {err}"),
        }
    }
}
// Marks `Error` as a standard error type. The impl body is empty, so only the
// trait's default methods apply: the message comes from the `Display` impl and
// wrapped errors (e.g. the inner `io::Error`) are not exposed through
// `source()`.
impl std::error::Error for Error {}
impl Error {
    /// Builds an [`Error::WalCorruption`] from a segment id, an offset and
    /// anything convertible into the descriptive message.
    pub fn wal_corruption(segment_id: usize, offset: usize, message: impl Into<String>) -> Self {
        let message = message.into();
        Self::WalCorruption {
            segment_id,
            offset,
            message,
        }
    }
}
impl From<io::Error> for Error {
fn from(e: io::Error) -> Error {
Error::Io(Arc::new(e))
}
}
impl From<crate::wal::Error> for Error {
fn from(err: crate::wal::Error) -> Self {
Error::Wal(err.to_string())
}
}
impl From<crate::bplustree::tree::BPlusTreeError> for Error {
    /// Converts a B+ tree error into [`Error::BPlusTree`], keeping only its
    /// rendered text.
    fn from(tree_err: crate::bplustree::tree::BPlusTreeError) -> Self {
        let rendered = tree_err.to_string();
        Self::BPlusTree(rendered)
    }
}
impl From<crate::sstable::error::SSTableError> for Error {
fn from(err: crate::sstable::error::SSTableError) -> Self {
Error::SSTable(err)
}
}
impl<T> From<std::sync::PoisonError<T>> for Error {
fn from(_err: std::sync::PoisonError<T>) -> Self {
Error::Other("Lock poisoned - another thread panicked while holding the lock".to_string())
}
}
/// Severity assigned to a background error, listed from least to most severe.
///
/// The derived ordering makes severities directly comparable (`<`, `>=`, …),
/// and each variant carries an explicit numeric discriminant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ErrorSeverity {
    /// No error.
    #[allow(unused)]
    NoError = 0,

    /// Lowest error level.
    #[allow(unused)]
    SoftError = 1,

    /// First level at which the background error handler stops the database.
    HardError = 2,

    /// More severe than [`ErrorSeverity::HardError`].
    FatalError = 3,

    /// Highest severity.
    Unrecoverable = 4,
}
/// Which background activity reported an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackgroundErrorReason {
    /// The error came from a memtable flush.
    ///
    /// NOTE(review): the name looks like a misspelling of `MemtableFlush`.
    /// It is a public variant, so renaming it would affect every user of the
    /// enum; it is left untouched here.
    MemtablaFlush,

    /// The error came from compaction.
    Compaction,

    /// The error came from writing the manifest.
    ManifestWrite,
}
/// Reason attached to a write stall.
///
/// The `Debug` form of this value is what appears in the write-stall error
/// message, so the variant names are user-visible.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteStallReason {
    /// Presumably the memtable limit was reached — confirm against the code
    /// that raises the stall.
    MemtableLimit,

    /// Presumably the level-0 file limit was reached — confirm against the
    /// code that raises the stall.
    L0FileLimit,
}
/// A background error together with the context recorded alongside it.
#[derive(Debug, Clone)]
pub struct BackgroundError {
    /// The error that was reported.
    pub error: Error,

    /// Severity assigned to the error when it was recorded.
    pub severity: ErrorSeverity,

    /// Background activity that reported the error.
    pub reason: BackgroundErrorReason,

    /// Monotonic instant captured when the error was reported.
    pub timestamp: Instant,
}
/// Records the most severe background error seen so far and exposes a cheap
/// flag telling callers whether the database has been stopped because of it.
pub struct BackgroundErrorHandler {
    /// Currently recorded error, if any. It is only replaced by an error of
    /// strictly higher severity.
    bg_error: RwLock<Option<BackgroundError>>,

    /// Set once an error of severity `HardError` or above has been recorded.
    is_db_stopped: AtomicBool,

    /// Number of reports that were actually recorded; reports rejected for
    /// not exceeding the current severity are not counted.
    error_count: AtomicU64,
}
impl BackgroundErrorHandler {
pub fn new() -> Self {
Self {
bg_error: RwLock::new(None),
is_db_stopped: AtomicBool::new(false),
error_count: AtomicU64::new(0),
}
}
fn classify_error(error: &Error, reason: BackgroundErrorReason) -> ErrorSeverity {
match (reason, error) {
(_, Error::Corruption(_) | Error::CorruptedBlock(_) | Error::ManifestCorruption(_)) => {
ErrorSeverity::Unrecoverable
}
(_, Error::TableIDCollision(_)) => ErrorSeverity::Unrecoverable,
(_, Error::CorruptedTableMetadata(_)) => ErrorSeverity::Unrecoverable,
(BackgroundErrorReason::MemtablaFlush, Error::Io(_)) => ErrorSeverity::FatalError,
(BackgroundErrorReason::Compaction, Error::Io(_)) => ErrorSeverity::FatalError,
(BackgroundErrorReason::ManifestWrite, Error::Io(_)) => ErrorSeverity::FatalError,
_ => ErrorSeverity::HardError,
}
}
pub fn set_error(&self, error: Error, reason: BackgroundErrorReason) {
let severity = Self::classify_error(&error, reason);
let bg_error = BackgroundError {
error: error.clone(),
severity,
reason,
timestamp: Instant::now(),
};
let mut current_error = self.bg_error.write();
if let Some(ref existing) = *current_error {
if severity <= existing.severity {
log::debug!(
"Background error not updated: new severity {:?} <= existing {:?}, error: {:?}, reason: {:?}",
severity,
existing.severity,
error.to_string(),
reason
);
return;
}
}
*current_error = Some(bg_error.clone());
drop(current_error);
self.error_count.fetch_add(1, Ordering::Relaxed);
if severity >= ErrorSeverity::HardError {
self.is_db_stopped.store(true, Ordering::Release);
log::error!(
"Background error (severity {:?}, reason {:?}, timestamp {:?}): {}",
severity,
bg_error.reason,
bg_error.timestamp,
error
);
} else {
log::warn!(
"Background error (severity {:?}, reason {:?}, timestamp {:?}): {}",
severity,
bg_error.reason,
bg_error.timestamp,
error
);
}
}
pub fn check_error(&self) -> Result<()> {
if !self.is_db_stopped.load(Ordering::Acquire) {
return Ok(());
}
let bg_error = self.bg_error.read();
if let Some(ref error) = *bg_error {
if error.severity >= ErrorSeverity::HardError {
return Err(error.error.clone());
}
}
Ok(())
}
#[cfg(test)]
pub fn get_error(&self) -> Option<BackgroundError> {
self.bg_error.read().clone()
}
#[cfg(test)]
pub fn is_db_stopped(&self) -> bool {
self.is_db_stopped.load(Ordering::Acquire)
}
#[cfg(test)]
pub fn error_count(&self) -> u64 {
self.error_count.load(Ordering::Relaxed)
}
#[cfg(test)]
pub fn clear_error(&self) {
let mut error = self.bg_error.write();
*error = None;
drop(error);
self.is_db_stopped.store(false, Ordering::Release);
log::info!("Background error cleared");
}
}
impl Default for BackgroundErrorHandler {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    /// Builds an `Error::Io`; the handler classifies it as `FatalError` for
    /// every background reason.
    fn io_error(message: &str) -> Error {
        Error::Io(Arc::new(std::io::Error::other(message)))
    }

    /// Builds an `Error::Other`; the handler classifies it as `HardError`.
    fn other_error(message: &str) -> Error {
        Error::Other(message.to_string())
    }

    #[test]
    fn test_error_severity_ordering() {
        assert!(ErrorSeverity::NoError < ErrorSeverity::SoftError);
        assert!(ErrorSeverity::SoftError < ErrorSeverity::HardError);
        assert!(ErrorSeverity::HardError < ErrorSeverity::FatalError);
        assert!(ErrorSeverity::FatalError < ErrorSeverity::Unrecoverable);
    }

    #[test]
    fn test_set_and_check_error() {
        let handler = BackgroundErrorHandler::new();
        assert!(!handler.is_db_stopped());
        assert!(handler.check_error().is_ok());

        handler.set_error(io_error("test error"), BackgroundErrorReason::MemtablaFlush);

        assert!(handler.is_db_stopped());
        assert!(handler.check_error().is_err());
        assert_eq!(handler.error_count(), 1);
    }

    #[test]
    fn test_error_severity_upgrade() {
        let handler = BackgroundErrorHandler::new();

        // Start from a genuinely lower severity so the second report really
        // is an upgrade.
        handler.set_error(other_error("hard error"), BackgroundErrorReason::Compaction);
        assert!(handler.is_db_stopped());
        assert_eq!(handler.get_error().unwrap().severity, ErrorSeverity::HardError);

        handler.set_error(io_error("fatal error"), BackgroundErrorReason::MemtablaFlush);
        assert!(handler.is_db_stopped());

        let error = handler.get_error().unwrap();
        assert_eq!(error.severity, ErrorSeverity::FatalError);
        assert_eq!(error.reason, BackgroundErrorReason::MemtablaFlush);
        assert_eq!(handler.error_count(), 2);
    }

    #[test]
    fn test_error_downgrade_prevented() {
        let handler = BackgroundErrorHandler::new();

        handler.set_error(io_error("fatal error"), BackgroundErrorReason::MemtablaFlush);
        let first = handler.get_error().unwrap();
        assert_eq!(first.severity, ErrorSeverity::FatalError);

        // A strictly less severe report must not replace the recorded error.
        handler.set_error(other_error("hard error"), BackgroundErrorReason::Compaction);

        let current = handler.get_error().unwrap();
        assert_eq!(current.severity, ErrorSeverity::FatalError);
        assert_eq!(current.reason, BackgroundErrorReason::MemtablaFlush);
        assert_eq!(current.error.to_string(), first.error.to_string());
        assert_eq!(handler.error_count(), 1);
    }

    #[test]
    fn test_equal_severity_keeps_first_error() {
        let handler = BackgroundErrorHandler::new();

        handler.set_error(io_error("first error"), BackgroundErrorReason::MemtablaFlush);
        let first_error = handler.get_error().unwrap().error;

        // Same severity as the recorded error: the first one wins.
        handler.set_error(io_error("second error"), BackgroundErrorReason::Compaction);

        let current_error = handler.get_error().unwrap();
        assert_eq!(current_error.error.to_string(), first_error.to_string());
        assert_eq!(current_error.reason, BackgroundErrorReason::MemtablaFlush);
        assert_eq!(handler.error_count(), 1);
    }

    #[test]
    fn test_clear_error() {
        let handler = BackgroundErrorHandler::new();
        handler.set_error(io_error("test error"), BackgroundErrorReason::MemtablaFlush);
        assert!(handler.is_db_stopped());

        handler.clear_error();

        assert!(!handler.is_db_stopped());
        assert!(handler.get_error().is_none());
        assert!(handler.check_error().is_ok());
    }
}