use crate::blob_store::{BlobCompactionPolicy, BlobCompactionReport, BlobDedupConfig, BlobStats};
use crate::cdc::CdcConfig;
use crate::error::{BackendError, TransactionError};
#[cfg(feature = "std")]
use crate::group_commit::{GroupCommitError, GroupCommitter, WriteBatch};
#[cfg(feature = "metrics")]
use crate::observer::DbMetrics;
use crate::observer::{DatabaseObserver, default_observer};
use crate::sealed::Sealed;
use crate::transaction_tracker::{TransactionId, TransactionTracker};
use crate::transactions::{
ALLOCATOR_STATE_TABLE_NAME, AllocatorStateKey, AllocatorStateTree, DATA_ALLOCATED_TABLE,
DATA_FREED_TABLE, PageList, SYSTEM_FREED_TABLE, SystemTableDefinition,
TransactionIdWithPagination,
};
#[cfg(feature = "std")]
use crate::tree_store::ReadOnlyBackend;
#[cfg(feature = "std")]
use crate::tree_store::salvage_tree_leaves;
use crate::tree_store::{
Btree, BtreeHeader, CompressionConfig, InternalTableDefinition, PAGE_SIZE, PageHint,
PageNumber, ShrinkPolicy, TableTree, TableType, TransactionalMemory,
};
use crate::types::{Key, Value};
use crate::{
CommitError, CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError,
StorageError, TableError,
};
use crate::{ReadTransaction, Result, WriteTransaction};
use alloc::boxed::Box;
use alloc::format;
use alloc::string::{String, ToString};
use alloc::sync::Arc;
#[cfg(feature = "std")]
use alloc::vec;
use alloc::vec::Vec;
use core::fmt::{Debug, Display, Formatter};
use core::marker::PhantomData;
#[cfg(feature = "std")]
use std::fs::{File, OpenOptions};
#[cfg(feature = "std")]
use std::path::Path;
#[cfg(feature = "std")]
use std::time::{Duration, Instant};
#[cfg(feature = "std")]
use crate::tree_store::file_backend::FileBackend;
#[cfg(feature = "logging")]
use log::{debug, info, warn};
/// Raw, offset-addressed storage that the database reads from and writes to.
///
/// Every method takes `&self` and the trait requires `Send + Sync`, so implementations must
/// provide their own interior synchronization. All fallible operations report failures as a
/// [`BackendError`].
// `len` reports the size of the storage rather than a collection length, so an `is_empty`
// companion method would not be meaningful.
#[allow(clippy::len_without_is_empty)]
pub trait StorageBackend: 'static + Debug + Send + Sync {
/// Returns the current length of the storage.
/// NOTE(review): presumably measured in bytes — confirm against implementations.
fn len(&self) -> core::result::Result<u64, BackendError>;
/// Reads from the storage starting at `offset` into `out`.
/// NOTE(review): presumably must fill all of `out` or return an error — confirm.
fn read(&self, offset: u64, out: &mut [u8]) -> core::result::Result<(), BackendError>;
/// Resizes the storage to `len`.
fn set_len(&self, len: u64) -> core::result::Result<(), BackendError>;
/// Makes previously written data durable.
/// NOTE(review): exact durability guarantee depends on the implementation — confirm.
fn sync_data(&self) -> core::result::Result<(), BackendError>;
/// Writes `data` to the storage starting at `offset`.
fn write(&self, offset: u64, data: &[u8]) -> core::result::Result<(), BackendError>;
/// Releases the backend. The default implementation does nothing and returns `Ok(())`.
fn close(&self) -> core::result::Result<(), BackendError> {
Ok(())
}
}
/// Identifies a regular table by name. Sealed: only implemented inside this crate.
pub trait TableHandle: Sealed {
    /// The name the table is stored under.
    fn name(&self) -> &str;
}

/// A regular-table handle that knows only the table's name, not its key/value types.
#[derive(Clone)]
pub struct UntypedTableHandle {
    name: String,
}

impl UntypedTableHandle {
    pub(crate) fn new(name: String) -> Self {
        UntypedTableHandle { name }
    }
}

impl Sealed for UntypedTableHandle {}

impl TableHandle for UntypedTableHandle {
    fn name(&self) -> &str {
        self.name.as_str()
    }
}

/// Identifies a multimap table by name. Sealed: only implemented inside this crate.
pub trait MultimapTableHandle: Sealed {
    /// The name the multimap table is stored under.
    fn name(&self) -> &str;
}

/// A multimap-table handle that knows only the table's name, not its key/value types.
#[derive(Clone)]
pub struct UntypedMultimapTableHandle {
    name: String,
}

impl UntypedMultimapTableHandle {
    pub(crate) fn new(name: String) -> Self {
        UntypedMultimapTableHandle { name }
    }
}

impl Sealed for UntypedMultimapTableHandle {}

impl MultimapTableHandle for UntypedMultimapTableHandle {
    fn name(&self) -> &str {
        self.name.as_str()
    }
}
/// Returns `true` when `haystack` begins with every byte of `needle`.
///
/// Written as a manual loop so it can be a `const fn`: slice `starts_with` and iterator
/// adapters are not callable in const contexts. An empty `needle` always matches.
const fn const_starts_with(haystack: &[u8], needle: &[u8]) -> bool {
    let prefix_len = needle.len();
    if haystack.len() < prefix_len {
        return false;
    }
    let mut pos = 0;
    let mut equal = true;
    while equal && pos < prefix_len {
        equal = haystack[pos] == needle[pos];
        pos += 1;
    }
    equal
}
/// Typed description of a regular table: its name plus key type `K` and value type `V`.
///
/// The key and value types are carried only as `PhantomData`. Definitions are `Copy` and can
/// be built in const contexts.
pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
    /// Creates a definition for a user table named `name`.
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty or starts with the reserved `"__ivfpq:"` prefix.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        let is_reserved = const_starts_with(name.as_bytes(), b"__ivfpq:");
        assert!(
            !is_reserved,
            "table names starting with \"__ivfpq:\" are reserved for internal use"
        );
        Self::new_internal(name)
    }

    /// Creates a definition without the reserved-prefix check (crate-internal tables).
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty.
    pub(crate) const fn new_internal(name: &'a str) -> Self {
        assert!(!name.is_empty());
        TableDefinition {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}

impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}

impl<K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'_, K, V> {}

impl<K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

/// Formats as `name<KeyType, ValueType>`.
impl<K: Key + 'static, V: Value + 'static> Display for TableDefinition<'_, K, V> {
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        let key_type = K::type_name();
        let value_type = V::type_name();
        write!(
            f,
            "{}<{}, {}>",
            self.name,
            key_type.name(),
            value_type.name()
        )
    }
}
/// Typed description of a multimap table: its name plus key type `K` and value type `V`.
///
/// Both types must implement `Key`. Definitions are `Copy` and can be built in const contexts.
pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}

impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
    /// Creates a definition for a user multimap table named `name`.
    ///
    /// # Panics
    ///
    /// Panics if `name` is empty or starts with the reserved `"__ivfpq:"` prefix.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        let is_reserved = const_starts_with(name.as_bytes(), b"__ivfpq:");
        assert!(
            !is_reserved,
            "table names starting with \"__ivfpq:\" are reserved for internal use"
        );
        MultimapTableDefinition {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}

impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}

impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

impl<K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'_, K, V> {}

impl<K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

/// Formats as `name<KeyType, ValueType>`.
impl<K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'_, K, V> {
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        let key_type = K::type_name();
        let value_type = V::type_name();
        write!(
            f,
            "{}<{}, {}>",
            self.name,
            key_type.name(),
            value_type.name()
        )
    }
}
/// How thorough an integrity verification should be.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerifyLevel {
/// Stop after the header stage: no page checksums or tree structure are checked.
Header,
/// Also verify checksums of the pages reachable from the data and system trees.
Pages,
/// Everything in `Pages`, plus structural verification of both trees.
Full,
}
/// A single problem found during verification or salvage.
#[derive(Debug, Clone)]
pub struct CorruptPageInfo {
/// Page the problem was found on. Salvage reports `0` when it cannot attribute the problem
/// to a specific page (e.g. an undecodable table definition).
pub page_number: u64,
/// Table the problem belongs to, when known.
pub table_name: Option<String>,
/// Human-readable description of the problem.
pub description: String,
}
/// Result of `Database::verify_backup` / `Database::verify_integrity`.
#[cfg(feature = "std")]
#[derive(Debug, Clone)]
pub struct VerifyReport {
/// `true` when the header is valid, no page checksum failed, and (for `Full`) no
/// structural problem was found.
pub valid: bool,
/// Whether the header validated. `verify_integrity` always reports `true` here.
pub header_valid: bool,
/// Number of pages whose checksums were examined (`0` for `VerifyLevel::Header`).
pub pages_checked: u64,
/// Number of checksum failures. Structural findings are NOT counted here.
pub pages_corrupt: u64,
/// `Some(..)` only for `VerifyLevel::Full`; `Some(true)` when no structural problem was found.
pub structural_valid: Option<bool>,
/// Checksum failures followed by any structural findings.
pub corrupt_details: Vec<CorruptPageInfo>,
/// Wall-clock time the verification took.
pub duration: Duration,
}
/// Result of `Database::salvage`.
#[cfg(feature = "std")]
#[derive(Debug, Clone)]
pub struct SalvageReport {
/// Number of table definitions decoded from the source's table tree.
pub tables_found: u64,
/// Number of tables for which at least one key/value pair was salvaged and committed.
pub tables_recovered: u64,
/// Rows recovered from table leaves, excluding those whose insert into the output failed.
pub rows_recovered: u64,
/// Salvaged rows that could not be inserted into the output database.
pub rows_insert_failed: u64,
/// NOTE(review): incremented by the number of corruption records found while scanning each
/// table's leaves, which is not necessarily the number of individual rows lost.
pub rows_lost: u64,
/// Every problem encountered, including table-discovery problems.
pub corrupt_details: Vec<CorruptPageInfo>,
/// Wall-clock time the salvage took.
pub duration: Duration,
}
/// Progress snapshot for a page compaction.
/// NOTE(review): not produced in this part of the file — presumably reported by the
/// incremental `CompactionHandle`; confirm.
#[derive(Debug, Clone, Copy)]
pub struct CompactionProgress {
/// Pages moved so far.
pub pages_relocated: u64,
/// Whether compaction has finished.
pub complete: bool,
}
/// Snapshot of page-cache counters, as returned by `ReadableDatabase::cache_stats`.
///
/// Fields are crate-private; read them through the getters below.
#[derive(Debug, Clone, Copy)]
pub struct CacheStats {
pub(crate) evictions: u64,
pub(crate) read_hits: u64,
pub(crate) read_misses: u64,
pub(crate) write_hits: u64,
pub(crate) write_misses: u64,
pub(crate) used_bytes: usize,
pub(crate) budget_bytes: Option<usize>,
}
impl CacheStats {
/// Number of cache evictions.
pub fn evictions(&self) -> u64 {
self.evictions
}
/// Number of reads served from the cache.
pub fn read_hits(&self) -> u64 {
self.read_hits
}
/// Number of reads that missed the cache.
pub fn read_misses(&self) -> u64 {
self.read_misses
}
/// Number of writes that hit the cache.
pub fn write_hits(&self) -> u64 {
self.write_hits
}
/// Number of writes that missed the cache.
pub fn write_misses(&self) -> u64 {
self.write_misses
}
/// Bytes currently held by the cache.
pub fn used_bytes(&self) -> usize {
self.used_bytes
}
/// Configured cache memory budget, if any.
/// NOTE(review): presumably `None` means unbounded — confirm.
pub fn budget_bytes(&self) -> Option<usize> {
self.budget_bytes
}
}
/// Keeps a transaction id registered with its [`TransactionTracker`] while the guard lives.
///
/// An `Active` guard that still owns its id releases it on drop. It ends the write transaction
/// or deallocates the read transaction, depending on `write_transaction`. A `Verification`
/// guard owns no transaction and does nothing on drop.
pub(crate) enum TransactionGuard {
    Active {
        transaction_tracker: Arc<TransactionTracker>,
        /// `None` once ownership of the id has been taken via [`TransactionGuard::leak`].
        transaction_id: Option<TransactionId>,
        write_transaction: bool,
    },
    Verification,
}

impl TransactionGuard {
    /// Shared constructor for the two `Active` flavours.
    fn new_active(
        transaction_id: TransactionId,
        tracker: Arc<TransactionTracker>,
        write_transaction: bool,
    ) -> Self {
        Self::Active {
            transaction_tracker: tracker,
            transaction_id: Some(transaction_id),
            write_transaction,
        }
    }

    /// Guard for a read transaction; dropping it deallocates the read transaction.
    pub(crate) fn new_read(
        transaction_id: TransactionId,
        tracker: Arc<TransactionTracker>,
    ) -> Self {
        Self::new_active(transaction_id, tracker, false)
    }

    /// Guard for a write transaction; dropping it ends the write transaction.
    pub(crate) fn new_write(
        transaction_id: TransactionId,
        tracker: Arc<TransactionTracker>,
    ) -> Self {
        Self::new_active(transaction_id, tracker, true)
    }

    /// Returns the guarded id.
    ///
    /// # Errors
    ///
    /// `StorageError::Internal` if the id was already leaked or this is a `Verification` guard.
    pub(crate) fn id(&self) -> Result<TransactionId, StorageError> {
        let Self::Active { transaction_id, .. } = self else {
            return Err(StorageError::Internal(String::from(
                "TransactionGuard::id() called on Verification guard",
            )));
        };
        transaction_id.ok_or_else(|| {
            StorageError::Internal(String::from("TransactionGuard::id() called after leak()"))
        })
    }

    /// Takes the id out of the guard so it is no longer released on drop.
    ///
    /// # Errors
    ///
    /// `StorageError::Internal` if the id was already leaked or this is a `Verification` guard.
    pub(crate) fn leak(&mut self) -> Result<TransactionId, StorageError> {
        match self {
            Self::Verification => Err(StorageError::Internal(String::from(
                "TransactionGuard::leak() called on Verification guard",
            ))),
            Self::Active { transaction_id, .. } => transaction_id.take().ok_or_else(|| {
                StorageError::Internal(String::from(
                    "TransactionGuard::leak() called after prior leak()",
                ))
            }),
        }
    }
}

impl Drop for TransactionGuard {
    fn drop(&mut self) {
        let Self::Active {
            transaction_tracker,
            transaction_id: Some(id),
            write_transaction,
        } = self
        else {
            return;
        };
        // `drop` cannot propagate errors, so releasing the id is best-effort.
        if *write_transaction {
            let _ = transaction_tracker.end_write_transaction(*id);
        } else {
            let _ = transaction_tracker.deallocate_read_transaction(*id);
        }
    }
}
/// Operations shared by read-only and read-write database handles.
pub trait ReadableDatabase {
/// Starts a new read transaction.
fn begin_read(&self) -> Result<ReadTransaction, TransactionError>;
/// Returns a snapshot of the page-cache counters.
fn cache_stats(&self) -> CacheStats;
}
/// A database handle that only supports read transactions.
pub struct ReadOnlyDatabase {
/// Shared page/transaction memory for the opened file.
mem: Arc<TransactionalMemory>,
/// Registers and releases read transaction ids.
transaction_tracker: Arc<TransactionTracker>,
}
impl ReadableDatabase for ReadOnlyDatabase {
fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
let id = self
.transaction_tracker
.register_read_transaction(&self.mem)?;
#[cfg(feature = "logging")]
debug!("Beginning read transaction id={id:?}");
let guard = TransactionGuard::new_read(id, self.transaction_tracker.clone());
ReadTransaction::new(
self.mem.clone(),
guard,
default_observer(),
#[cfg(feature = "metrics")]
Arc::new(DbMetrics::new()),
)
}
fn cache_stats(&self) -> CacheStats {
self.mem.cache_stats()
}
}
impl ReadOnlyDatabase {
    /// Opens the database file at `path` read-only with default `Builder` settings.
    #[cfg(feature = "std")]
    pub fn open(path: impl AsRef<Path>) -> Result<ReadOnlyDatabase, DatabaseError> {
        Builder::new().open_read_only(path)
    }

    /// Builds a read-only database over `file`.
    ///
    /// The backend is wrapped in a `ReadOnlyBackend` before being handed to
    /// `TransactionalMemory`. Fails with [`DatabaseError::RepairAborted`] when there is no
    /// allocator state table (logged as "not shutdown cleanly. Repair required"). Fails with a
    /// `Corrupted` storage error when the primary B-tree checksums do not verify.
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    fn new(
        file: Box<dyn StorageBackend>,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        compression: CompressionConfig,
        memory_budget: Option<usize>,
        read_verification: ReadVerification,
        read_verification_callback: Option<Arc<ReadVerificationCallback>>,
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database in read-only {:?}", &file_path);

        let read_only_backend = Box::new(ReadOnlyBackend::new(file));
        let mem = Arc::new(TransactionalMemory::new(
            read_only_backend,
            false,
            page_size,
            region_size,
            read_cache_size_bytes,
            0,
            true,
            compression,
            memory_budget,
            read_verification,
            read_verification_callback,
        )?);

        let Some(allocator_state) = Database::get_allocator_state_table(&mem)? else {
            #[cfg(feature = "logging")]
            warn!(
                "Database {:?} not shutdown cleanly. Repair required",
                &file_path
            );
            return Err(DatabaseError::RepairAborted);
        };
        mem.load_allocator_state(&allocator_state)?;

        let checksums_ok = Database::verify_primary_checksums(mem.clone())?;
        if !checksums_ok {
            return Err(DatabaseError::Storage(StorageError::Corrupted(
                "B-tree checksum verification failed".to_string(),
            )));
        }

        let next_transaction_id = mem.get_last_committed_transaction_id()?.next()?;
        Ok(Self {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
        })
    }
}
/// Identifies a committed transaction.
/// NOTE(review): not constructed in this part of the file; the timestamp unit is presumably
/// milliseconds (per the field name) — confirm against the producer.
#[derive(Debug, Clone)]
pub struct TransactionInfo {
pub transaction_id: u64,
pub timestamp_ms: u64,
}
/// A read-write database handle.
pub struct Database {
/// Shared page/transaction memory for the database file.
mem: Arc<TransactionalMemory>,
/// Tracks live read/write transactions and savepoints.
transaction_tracker: Arc<TransactionTracker>,
blob_dedup_config: BlobDedupConfig,
cdc_config: CdcConfig,
/// NOTE(review): presumably how much transaction history is retained — confirm units.
history_retention: u64,
/// Thresholds consulted by `should_compact_blobs`.
blob_compaction_policy: BlobCompactionPolicy,
/// Receives lifecycle notifications such as `on_read_begin`.
observer: Arc<dyn DatabaseObserver>,
/// Counters shared with the transactions this database opens.
#[cfg(feature = "metrics")]
db_metrics: Arc<DbMetrics>,
#[cfg(feature = "std")]
group_committer: GroupCommitter,
}
impl ReadableDatabase for Database {
    /// Starts a read transaction.
    ///
    /// When the guard's id is available, the observer is notified via `on_read_begin`. With
    /// the `metrics` feature, the `read_txn_opened` counter is also incremented.
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
        let guard = self.allocate_read_transaction()?;
        // Read the id now: the guard is moved into the transaction below.
        let txn_id = guard.id().ok();
        #[cfg(feature = "logging")]
        debug!("Beginning read transaction id={txn_id:?}");
        let memory = self.get_memory();
        let observer = Arc::clone(&self.observer);
        let txn = ReadTransaction::new(
            memory,
            guard,
            observer,
            #[cfg(feature = "metrics")]
            Arc::clone(&self.db_metrics),
        )?;
        let Some(id) = txn_id else {
            return Ok(txn);
        };
        self.observer.on_read_begin(id.raw_id());
        #[cfg(feature = "metrics")]
        self.db_metrics
            .read_txn_opened
            .fetch_add(1, portable_atomic::Ordering::Relaxed);
        Ok(txn)
    }

    /// Delegates to the underlying memory's cache counters.
    fn cache_stats(&self) -> CacheStats {
        let mem = &self.mem;
        mem.cache_stats()
    }
}
impl Database {
#[cfg(feature = "std")]
pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
Self::builder().create(path)
}
#[cfg(feature = "std")]
pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
Self::builder().open(path)
}
pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
self.mem.clone()
}
/// Verifies the checksums of the data tree and then the system tree at their current roots.
///
/// Returns `Ok(false)` as soon as the data tree fails, in which case the system tree is not
/// examined. Returns `Ok(true)` only when both pass. Each tree gets a `Verification` guard,
/// which releases nothing on drop.
pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
    let data_tree = TableTree::new(
        mem.get_data_root(),
        PageHint::None,
        Arc::new(TransactionGuard::Verification),
        mem.clone(),
    )?;
    let data_ok = data_tree.verify_checksums()?;
    if !data_ok {
        return Ok(false);
    }
    let system_tree = TableTree::new(
        mem.get_system_root(),
        PageHint::None,
        Arc::new(TransactionGuard::Verification),
        mem.clone(),
    )?;
    let system_ok = system_tree.verify_checksums()?;
    Ok(system_ok)
}
/// Checksum-verifies the data tree at `data_root` and then the system tree at `system_root`.
///
/// Returns the total number of pages examined and every corruption found, data-tree entries
/// first. `guard` is shared by both trees.
#[cfg(feature = "std")]
pub(crate) fn verify_primary_checksums_detailed(
    mem: Arc<TransactionalMemory>,
    data_root: Option<BtreeHeader>,
    system_root: Option<BtreeHeader>,
    guard: Arc<TransactionGuard>,
) -> Result<(u64, Vec<CorruptPageInfo>)> {
    let mut total_pages = 0u64;
    let mut all_corruptions = Vec::new();
    for (root, tree_guard) in [(data_root, guard.clone()), (system_root, guard)] {
        let tree = TableTree::new(root, PageHint::None, tree_guard, mem.clone())?;
        let (pages, corruptions) = tree.verify_checksums_detailed()?;
        total_pages += pages;
        all_corruptions.extend(corruptions);
    }
    Ok((total_pages, all_corruptions))
}
/// Structurally verifies the current data and system trees using a `Verification` guard.
#[cfg(feature = "std")]
pub(crate) fn verify_primary_structure(
    mem: Arc<TransactionalMemory>,
) -> Result<Vec<CorruptPageInfo>> {
    let data_root = mem.get_data_root();
    let system_root = mem.get_system_root();
    Self::verify_structure_with_roots(
        mem,
        data_root,
        system_root,
        Arc::new(TransactionGuard::Verification),
    )
}
/// Structurally verifies the data tree at `data_root` and then the system tree at
/// `system_root`.
///
/// Returns all findings, data-tree entries first. `guard` is shared by both trees.
#[cfg(feature = "std")]
fn verify_structure_with_roots(
    mem: Arc<TransactionalMemory>,
    data_root: Option<BtreeHeader>,
    system_root: Option<BtreeHeader>,
    guard: Arc<TransactionGuard>,
) -> Result<Vec<CorruptPageInfo>> {
    let mut all_corruptions = Vec::new();
    for (root, tree_guard) in [(data_root, guard.clone()), (system_root, guard)] {
        let tree = TableTree::new(root, PageHint::None, tree_guard, mem.clone())?;
        all_corruptions.extend(tree.verify_structure_detailed()?);
    }
    Ok(all_corruptions)
}
/// Writes a byte-for-byte copy of the database file to `path`.
///
/// A read transaction is held for the whole copy, and `flush_data` is called before the file
/// length is sampled. NOTE(review): the read transaction presumably keeps the current
/// snapshot's pages from being reused while they are copied — confirm. The file is copied in
/// 1 MiB chunks, and the destination is `sync_all`ed before returning.
///
/// # Errors
///
/// Returns a [`StorageError`] in any of these cases:
/// - the read transaction cannot be started;
/// - flushing or reading the source fails;
/// - creating, writing or syncing the destination fails (as
///   `StorageError::Io(BackendError::Io(_))`).
#[cfg(feature = "std")]
// The only `as usize` below narrows a value already clamped to `CHUNK_SIZE`, so it is lossless.
#[allow(clippy::cast_possible_truncation)]
pub fn backup(&self, path: impl AsRef<Path>) -> Result<(), StorageError> {
    use std::io::Write;
    const CHUNK_SIZE: usize = 1024 * 1024;
    let _read_txn = self.begin_read().map_err(|e| e.into_storage_error())?;
    self.mem.flush_data()?;
    let file_len = self.mem.raw_len()?;
    let mut dest =
        File::create(path.as_ref()).map_err(|e| StorageError::Io(BackendError::Io(e)))?;
    let mut buf = vec![0u8; CHUNK_SIZE];
    let mut offset = 0u64;
    while offset < file_len {
        // Clamp in u64 *before* narrowing. Casting the whole remaining length to usize first
        // truncates on 32-bit targets, and a remainder that is a multiple of 2^32 would become
        // 0, so `offset` would never advance and the loop would spin forever.
        let to_read = (file_len - offset).min(CHUNK_SIZE as u64) as usize;
        let chunk = &mut buf[..to_read];
        self.mem.read_raw(offset, chunk)?;
        dest.write_all(chunk)
            .map_err(|e| StorageError::Io(BackendError::Io(e)))?;
        offset += to_read as u64;
    }
    dest.sync_all()
        .map_err(|e| StorageError::Io(BackendError::Io(e)))?;
    Ok(())
}
/// Verifies a backup file at `path` without opening it as a live database.
///
/// The file is opened read-only and loaded with `PAGE_SIZE` and no compression. The check
/// depends on `level`:
/// - `Header`: only the header result is reported.
/// - `Pages`: adds checksum verification of the data and system trees.
/// - `Full`: also runs the structural checks.
///
/// `pages_corrupt` counts only checksum failures; `corrupt_details` also includes structural
/// findings. `valid` requires a valid header, no checksum failures, and (for `Full`) no
/// structural findings.
#[cfg(feature = "std")]
pub fn verify_backup(
    path: impl AsRef<Path>,
    level: VerifyLevel,
) -> core::result::Result<VerifyReport, DatabaseError> {
    let start = Instant::now();
    let backup_file = OpenOptions::new().read(true).open(path.as_ref())?;
    let file_backend =
        crate::tree_store::file_backend::FileBackend::new_internal(backup_file, true)?;
    let backend: Box<dyn StorageBackend> = Box::new(file_backend);
    let (mem, header_valid) =
        TransactionalMemory::new_for_verify(backend, PAGE_SIZE, None, CompressionConfig::None)?;

    if level == VerifyLevel::Header {
        return Ok(VerifyReport {
            valid: header_valid,
            header_valid,
            pages_checked: 0,
            pages_corrupt: 0,
            structural_valid: None,
            corrupt_details: Vec::new(),
            duration: start.elapsed(),
        });
    }

    let mem = Arc::new(mem);
    let data_root = mem.get_data_root();
    let system_root = mem.get_system_root();
    let (pages_checked, mut corrupt_details) = Self::verify_primary_checksums_detailed(
        Arc::clone(&mem),
        data_root,
        system_root,
        Arc::new(TransactionGuard::Verification),
    )?;
    let pages_corrupt = corrupt_details.len() as u64;

    let structural_valid = match level {
        VerifyLevel::Full => {
            let structural = Self::verify_primary_structure(mem)?;
            let structure_ok = structural.is_empty();
            corrupt_details.extend(structural);
            Some(structure_ok)
        }
        VerifyLevel::Header | VerifyLevel::Pages => None,
    };

    Ok(VerifyReport {
        valid: header_valid && pages_corrupt == 0 && structural_valid.unwrap_or(true),
        header_valid,
        pages_checked,
        pages_corrupt,
        structural_valid,
        corrupt_details,
        duration: start.elapsed(),
    })
}
/// Recovers whatever table data can still be read from a possibly corrupted database file.
///
/// The source at `corrupted_path` is opened read-only, and a fresh database is created at
/// `output_path`. Table definitions are discovered from the leaves of the source's data tree.
/// Each table's leaves are then scanned independently. Every readable key/value pair is
/// inserted as raw bytes into a table of the same name in the output, using one write
/// transaction per table. Problems are recorded in the report rather than aborting the salvage.
/// The source header's validity is ignored.
///
/// # Errors
///
/// Returns an error in any of these cases:
/// - the source cannot be opened;
/// - the output database cannot be created;
/// - an output write transaction cannot begin, open its table, or commit.
///
/// Individual insert failures are only counted in `rows_insert_failed`.
#[cfg(feature = "std")]
pub fn salvage(
    corrupted_path: impl AsRef<Path>,
    output_path: impl AsRef<Path>,
) -> core::result::Result<SalvageReport, DatabaseError> {
    let start = Instant::now();
    let mut corrupt_details: Vec<CorruptPageInfo> = Vec::new();
    let mut tables_recovered = 0u64;
    let mut rows_recovered = 0u64;
    let mut rows_insert_failed = 0u64;
    let mut rows_lost = 0u64;
    let file = OpenOptions::new()
        .read(true)
        .open(corrupted_path.as_ref())?;
    let backend: Box<dyn crate::StorageBackend> = Box::new(
        crate::tree_store::file_backend::FileBackend::new_internal(file, true)?,
    );
    let (mem, _header_valid) =
        TransactionalMemory::new_for_verify(backend, PAGE_SIZE, None, CompressionConfig::None)?;
    let mem = Arc::new(mem);
    // User table definitions are discovered from the data tree.
    let data_root = mem.get_data_root();
    let table_entries =
        Self::salvage_discover_tables(mem.clone(), data_root, &mut corrupt_details);
    let tables_found = table_entries.len() as u64;
    let output_db = Database::builder().create(output_path.as_ref())?;
    for (table_name, definition) in &table_entries {
        // No valid table has an empty name (both TableDefinition constructors assert on it),
        // so an empty name can only come from corruption; building a definition from it
        // would panic and abort the whole salvage.
        if table_name.is_empty() {
            corrupt_details.push(CorruptPageInfo {
                page_number: 0,
                table_name: None,
                description: "empty table name".to_string(),
            });
            continue;
        }
        let (table_root, fixed_key_size, fixed_value_size) = match definition {
            InternalTableDefinition::Normal {
                table_root,
                fixed_key_size,
                fixed_value_size,
                ..
            }
            | InternalTableDefinition::Multimap {
                table_root,
                fixed_key_size,
                fixed_value_size,
                ..
            } => (*table_root, *fixed_key_size, *fixed_value_size),
        };
        // A table with no root has no data to recover.
        let Some(root) = table_root else {
            continue;
        };
        // NOTE(review): compressed values are presumably not stored at their logical fixed
        // width, so the fixed value size is not used to split leaf entries — confirm.
        let effective_value_size = if mem.compression().is_enabled() {
            None
        } else {
            fixed_value_size
        };
        let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
        let mut table_corruptions: Vec<CorruptPageInfo> = Vec::new();
        let table_rows = salvage_tree_leaves(
            root,
            &mem,
            fixed_key_size,
            effective_value_size,
            &mut pairs,
            &mut table_corruptions,
        );
        for c in &mut table_corruptions {
            c.table_name = Some(table_name.clone());
        }
        if !pairs.is_empty() {
            // Borrow the discovered name for this iteration instead of leaking a 'static copy
            // on every salvage. `new_internal` is used because the source may contain internal
            // tables under the reserved "__ivfpq:" prefix, which `TableDefinition::new` rejects
            // by panicking.
            let raw_def: TableDefinition<&[u8], &[u8]> =
                TableDefinition::new_internal(table_name.as_str());
            let write_txn = output_db
                .begin_write()
                .map_err(|e| DatabaseError::Storage(e.into_storage_error()))?;
            let table_insert_failed;
            {
                let mut table = write_txn.open_table(raw_def).map_err(|e| {
                    DatabaseError::Storage(
                        e.into_storage_error_or_internal("salvage: open_table"),
                    )
                })?;
                let mut failed = 0u64;
                for (key, value) in &pairs {
                    if table.insert(key.as_slice(), value.as_slice()).is_err() {
                        failed += 1;
                    }
                }
                table_insert_failed = failed;
            }
            write_txn
                .commit()
                .map_err(|e| DatabaseError::Storage(e.into_storage_error()))?;
            tables_recovered += 1;
            rows_insert_failed += table_insert_failed;
            rows_recovered += table_rows.saturating_sub(table_insert_failed);
        } else {
            rows_recovered += table_rows;
        }
        rows_lost += table_corruptions.len() as u64;
        corrupt_details.extend(table_corruptions);
    }
    Ok(SalvageReport {
        tables_found,
        tables_recovered,
        rows_recovered,
        rows_insert_failed,
        rows_lost,
        corrupt_details,
        duration: start.elapsed(),
    })
}
/// Scans the leaves of the table-definition tree rooted at `table_tree_root` and decodes every
/// entry it can into `(table name, definition)`.
///
/// Entries are handled as follows:
/// - Keys that are not valid UTF-8 or that start with `'\0'` are skipped silently.
///   NOTE(review): presumably `'\0'`-prefixed names are internal entries — confirm.
/// - Empty names are recorded in `corruptions` with `page_number: 0`, because the originating
///   page is not known here.
/// - Definitions that fail to decode (decoding panics are caught) are recorded the same way.
///
/// `salvage_tree_leaves` also receives `corruptions` to record problems found during the walk.
/// Returns an empty list when there is no root.
#[cfg(feature = "std")]
fn salvage_discover_tables(
    mem: Arc<TransactionalMemory>,
    table_tree_root: Option<BtreeHeader>,
    corruptions: &mut Vec<CorruptPageInfo>,
) -> Vec<(String, InternalTableDefinition)> {
    let Some(root) = table_tree_root else {
        return Vec::new();
    };
    let mut raw_pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
    salvage_tree_leaves(root, &mem, None, None, &mut raw_pairs, corruptions);
    let mut tables = Vec::with_capacity(raw_pairs.len());
    for (key_bytes, value_bytes) in &raw_pairs {
        let Ok(name) = core::str::from_utf8(key_bytes) else {
            continue;
        };
        if name.starts_with('\0') {
            continue;
        }
        // No valid table has an empty name; downstream definition constructors would panic.
        if name.is_empty() {
            corruptions.push(CorruptPageInfo {
                page_number: 0,
                table_name: None,
                description: "empty table name".to_string(),
            });
            continue;
        }
        // Decoding a corrupt definition may panic; contain it so one bad entry does not abort
        // discovery. The bytes are only borrowed, so no per-entry copy is needed.
        let parsed = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            <InternalTableDefinition as crate::types::Value>::from_bytes(value_bytes)
        }));
        match parsed {
            Ok(definition) => tables.push((name.to_string(), definition)),
            Err(_) => corruptions.push(CorruptPageInfo {
                page_number: 0,
                table_name: Some(name.to_string()),
                description: "corrupt table definition".to_string(),
            }),
        }
    }
    tables
}
/// Verifies the live database at the requested `level`.
///
/// The header is not re-examined here and is always reported as valid. For `Pages` and
/// `Full`, a read transaction is allocated and the persisted data/system roots are
/// checksum-verified; `Full` also runs the structural checks against the same roots.
/// `pages_corrupt` counts only checksum failures, while `corrupt_details` also includes
/// structural findings.
#[cfg(feature = "std")]
pub fn verify_integrity(&self, level: VerifyLevel) -> Result<VerifyReport> {
    let start = Instant::now();
    let header_valid = true;
    if level == VerifyLevel::Header {
        return Ok(VerifyReport {
            valid: true,
            header_valid,
            pages_checked: 0,
            pages_corrupt: 0,
            structural_valid: None,
            corrupt_details: Vec::new(),
            duration: start.elapsed(),
        });
    }

    // NOTE(review): holding a read transaction presumably keeps the persisted roots' pages
    // from being reclaimed during the scan — confirm.
    let guard = Arc::new(self.allocate_read_transaction()?);
    let data_root = self.mem.get_persisted_data_root();
    let system_root = self.mem.get_persisted_system_root();

    let (pages_checked, mut corrupt_details) = Self::verify_primary_checksums_detailed(
        Arc::clone(&self.mem),
        data_root,
        system_root,
        Arc::clone(&guard),
    )?;
    let pages_corrupt = corrupt_details.len() as u64;

    let mut structural_valid = None;
    if level == VerifyLevel::Full {
        let structural = Self::verify_structure_with_roots(
            Arc::clone(&self.mem),
            data_root,
            system_root,
            Arc::clone(&guard),
        )?;
        structural_valid = Some(structural.is_empty());
        corrupt_details.extend(structural);
    }

    Ok(VerifyReport {
        valid: pages_corrupt == 0 && structural_valid.unwrap_or(true),
        header_valid,
        pages_checked,
        pages_corrupt,
        structural_valid,
        corrupt_details,
        duration: start.elapsed(),
    })
}
/// Reloads the database from storage, runs repair, and reports whether it was already clean.
///
/// Requires exclusive access to the underlying memory (`Arc::get_mut`). Otherwise an
/// invalid-config storage error is returned.
///
/// The database counts as unclean if any of these hold:
/// - the reload reports it unclean;
/// - repair changes either tree root;
/// - repair changes the allocator hash.
///
/// In that case the repaired roots are committed under the next transaction id with
/// `ShrinkPolicy::Never`. `begin_writable()` is called before returning in every success path.
///
/// Returns `Ok(true)` when nothing needed repairing. Non-storage errors from repair are mapped
/// to `StorageError::Internal`.
pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
// Snapshot the allocator state before reloading so repair-induced changes can be detected.
let allocator_hash = self.mem.allocator_hash();
let mut was_clean = Arc::get_mut(&mut self.mem)
.ok_or_else(|| {
DatabaseError::Storage(StorageError::invalid_config(
"check_integrity() requires exclusive database access, but other references to the memory exist",
))
})?
.clear_cache_and_reload()?;
let old_roots = [self.mem.get_data_root(), self.mem.get_system_root()];
// The closure passed here does nothing. NOTE(review): presumably a progress callback — confirm.
let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
DatabaseError::Storage(storage_err) => storage_err,
_ => StorageError::Internal(
"unexpected non-storage error during integrity check repair".to_string(),
),
})?;
if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
was_clean = false;
}
if !was_clean {
// Persist the repaired roots as a new transaction.
let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next()?;
let [data_root, system_root] = new_roots;
self.mem.commit(
data_root,
system_root,
next_transaction_id,
true,
ShrinkPolicy::Never,
)?;
}
self.mem.begin_writable()?;
Ok(was_clean)
}
/// Compacts the database file by relocating pages, repeating until no further progress.
///
/// Refuses to run while a read transaction is live, or while any persistent or ephemeral
/// savepoint exists. Before compacting, two two-phase commits are performed, and the code then
/// checks that no pending free pages remain.
///
/// Each iteration does the following:
/// 1. Runs `compact_pages` in its own write transaction.
/// 2. Performs two more two-phase commits with `ShrinkPolicy::Maximum`.
/// 3. Re-checks that no free pages are pending.
///
/// Returns `Ok(true)` if at least one iteration relocated pages.
///
/// # Errors
///
/// - [`CompactionError::TransactionInProgress`] when a read transaction is live.
/// - [`CompactionError::PersistentSavepointExists`] or
///   [`CompactionError::EphemeralSavepointExists`] when a savepoint exists.
/// - A storage error if pending free pages remain after the commits.
/// - A storage error if compaction does not converge within 20 iterations.
/// - Any storage error from the underlying transactions.
pub fn compact(&mut self) -> Result<bool, CompactionError> {
if self
.transaction_tracker
.oldest_live_read_transaction()
.map_err(CompactionError::Storage)?
.is_some()
{
return Err(CompactionError::TransactionInProgress);
}
// The early returns below drop `txn` without an explicit abort.
let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
if txn.list_persistent_savepoints()?.next().is_some() {
return Err(CompactionError::PersistentSavepointExists);
}
if self
.transaction_tracker
.any_savepoint_exists()
.map_err(CompactionError::Storage)?
{
return Err(CompactionError::EphemeralSavepointExists);
}
txn.set_two_phase_commit(true);
txn.commit().map_err(|e| e.into_storage_error())?;
// A second commit; after it no pending free pages are expected (checked just below).
let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
txn.set_two_phase_commit(true);
txn.commit().map_err(|e| e.into_storage_error())?;
let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
if txn.pending_free_pages()? {
return Err(StorageError::Internal(
"compaction: pending free pages remain after exclusive durable commit".into(),
)
.into());
}
txn.abort()?;
let mut compacted = false;
// Safety net against a compaction loop that never stops making "progress".
let max_compact_iterations = 20u32;
let mut iteration = 0u32;
loop {
if iteration >= max_compact_iterations {
return Err(CompactionError::Storage(StorageError::Corrupted(format!(
"Compaction did not converge after {max_compact_iterations} iterations"
))));
}
iteration += 1;
let mut progress = false;
// Relocate pages; only commit when something actually moved.
let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
if txn.compact_pages()? {
progress = true;
txn.commit().map_err(|e| e.into_storage_error())?;
} else {
txn.abort()?;
}
// Two further two-phase commits with maximum shrink, followed by a check that no free
// pages are left pending.
let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
txn.set_two_phase_commit(true);
txn.set_shrink_policy(ShrinkPolicy::Maximum);
txn.commit().map_err(|e| e.into_storage_error())?;
let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
txn.set_two_phase_commit(true);
txn.set_shrink_policy(ShrinkPolicy::Maximum);
txn.commit().map_err(|e| e.into_storage_error())?;
let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
if txn.pending_free_pages()? {
return Err(StorageError::Internal(
"compaction: pending free pages remain after exclusive durable commit".into(),
)
.into());
}
txn.abort()?;
if !progress {
break;
}
compacted = true;
}
Ok(compacted)
}
/// Reclaims space in the blob region by relocating live blobs and then finalizing.
///
/// Refuses to run while a read transaction is live or any persistent or ephemeral savepoint
/// exists. When the region has no dead bytes, it returns a no-op report without writing.
/// Otherwise it runs `compact_blobs_pass(false)` and then `compact_blobs_pass(true)`, each in
/// its own two-phase-commit write transaction. A failing pass has its transaction aborted and
/// its error returned.
///
/// # Errors
///
/// - [`CompactionError::TransactionInProgress`] when a read transaction is live.
/// - [`CompactionError::PersistentSavepointExists`] or
///   [`CompactionError::EphemeralSavepointExists`] when a savepoint exists.
/// - [`CompactionError::Storage`] for storage failures.
pub fn compact_blobs(&mut self) -> core::result::Result<BlobCompactionReport, CompactionError> {
    if self
        .transaction_tracker
        .oldest_live_read_transaction()
        .map_err(CompactionError::Storage)?
        .is_some()
    {
        return Err(CompactionError::TransactionInProgress);
    }
    // Savepoint preconditions, checked inside a transaction that is always aborted.
    {
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        if txn.list_persistent_savepoints()?.next().is_some() {
            txn.abort().map_err(CompactionError::Storage)?;
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self
            .transaction_tracker
            .any_savepoint_exists()
            .map_err(CompactionError::Storage)?
        {
            txn.abort().map_err(CompactionError::Storage)?;
            return Err(CompactionError::EphemeralSavepointExists);
        }
        txn.abort().map_err(CompactionError::Storage)?;
    }
    let stats = {
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        let s = txn.blob_stats().map_err(CompactionError::Storage)?;
        txn.abort().map_err(CompactionError::Storage)?;
        s
    };
    if stats.dead_bytes == 0 {
        return Ok(BlobCompactionReport {
            blobs_relocated: 0,
            live_bytes: stats.live_bytes,
            bytes_reclaimed: 0,
            was_noop: true,
        });
    }
    let old_region_length = stats.region_bytes;
    // Pass 1: relocate live blobs.
    let (blobs_relocated, total_live_size) = {
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        let result = txn.compact_blobs_pass(false);
        match result {
            Ok(r) => {
                txn.set_two_phase_commit(true);
                txn.commit().map_err(|e| e.into_storage_error())?;
                r
            }
            Err(e) => {
                txn.abort().map_err(CompactionError::Storage)?;
                return Err(CompactionError::Storage(e));
            }
        }
    };
    // Pass 2: finalize.
    {
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        let result = txn.compact_blobs_pass(true);
        match result {
            Ok(_) => {
                txn.set_two_phase_commit(true);
                txn.commit().map_err(|e| e.into_storage_error())?;
            }
            Err(e) => {
                txn.abort().map_err(CompactionError::Storage)?;
                return Err(CompactionError::Storage(e));
            }
        }
    }
    Ok(BlobCompactionReport {
        blobs_relocated,
        live_bytes: total_live_size,
        // `region_bytes` was sampled before the passes while `total_live_size` comes from the
        // relocation pass; saturate so a mismatch cannot underflow (a panic in debug builds,
        // a huge bogus value in release builds).
        bytes_reclaimed: old_region_length.saturating_sub(total_live_size),
        was_noop: false,
    })
}
/// Returns the current blob statistics when they meet the configured compaction policy.
///
/// Stats are read inside a write transaction that is then aborted. `Some(stats)` is returned
/// when both of these hold:
/// - `dead_bytes >= min_dead_bytes`;
/// - `fragmentation_ratio >= fragmentation_threshold`.
///
/// Otherwise returns `None`.
pub fn should_compact_blobs(&self) -> Result<Option<BlobStats>, TransactionError> {
    let stats = {
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        let snapshot = txn.blob_stats().map_err(TransactionError::Storage)?;
        txn.abort().map_err(TransactionError::Storage)?;
        snapshot
    };
    let policy = &self.blob_compaction_policy;
    let enough_dead = stats.dead_bytes >= policy.min_dead_bytes;
    let worth_compacting =
        enough_dead && stats.fragmentation_ratio >= policy.fragmentation_threshold;
    Ok(worth_compacting.then_some(stats))
}
/// Same as `compact_blobs`, but reports progress through `callback`.
///
/// `callback(blobs_done, total_blobs, bytes_done, total_bytes)` is invoked three times:
/// 1. Before the relocation pass, with zeros for progress.
/// 2. After the relocation pass commits.
/// 3. After finalization.
///
/// Returning `false` from either of the first two calls cancels with
/// [`CompactionError::Cancelled`]; the final call's return value is ignored. NOTE(review):
/// cancelling after the first call has returned leaves the already-committed relocation pass
/// in place without running finalization.
///
/// # Errors
///
/// The same errors as `compact_blobs`, plus [`CompactionError::Cancelled`].
pub fn compact_blobs_with_progress(
    &mut self,
    mut callback: impl FnMut(u64, u64, u64, u64) -> bool,
) -> core::result::Result<BlobCompactionReport, CompactionError> {
    if self
        .transaction_tracker
        .oldest_live_read_transaction()
        .map_err(CompactionError::Storage)?
        .is_some()
    {
        return Err(CompactionError::TransactionInProgress);
    }
    // Savepoint preconditions, checked inside a transaction that is always aborted.
    {
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        if txn.list_persistent_savepoints()?.next().is_some() {
            txn.abort().map_err(CompactionError::Storage)?;
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self
            .transaction_tracker
            .any_savepoint_exists()
            .map_err(CompactionError::Storage)?
        {
            txn.abort().map_err(CompactionError::Storage)?;
            return Err(CompactionError::EphemeralSavepointExists);
        }
        txn.abort().map_err(CompactionError::Storage)?;
    }
    let stats = {
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        let s = txn.blob_stats().map_err(CompactionError::Storage)?;
        txn.abort().map_err(CompactionError::Storage)?;
        s
    };
    if stats.dead_bytes == 0 {
        return Ok(BlobCompactionReport {
            blobs_relocated: 0,
            live_bytes: stats.live_bytes,
            bytes_reclaimed: 0,
            was_noop: true,
        });
    }
    let old_region_length = stats.region_bytes;
    let total_blobs = stats.blob_count;
    let total_bytes = stats.live_bytes;
    if !callback(0, total_blobs, 0, total_bytes) {
        return Err(CompactionError::Cancelled);
    }
    // Pass 1: relocate live blobs.
    let (blobs_relocated, total_live_size) = {
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        let result = txn.compact_blobs_pass(false);
        match result {
            Ok(r) => {
                txn.set_two_phase_commit(true);
                txn.commit().map_err(|e| e.into_storage_error())?;
                r
            }
            Err(e) => {
                txn.abort().map_err(CompactionError::Storage)?;
                return Err(CompactionError::Storage(e));
            }
        }
    };
    if !callback(blobs_relocated, total_blobs, total_live_size, total_bytes) {
        return Err(CompactionError::Cancelled);
    }
    // Pass 2: finalize.
    {
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        let result = txn.compact_blobs_pass(true);
        match result {
            Ok(_) => {
                txn.set_two_phase_commit(true);
                txn.commit().map_err(|e| e.into_storage_error())?;
            }
            Err(e) => {
                txn.abort().map_err(CompactionError::Storage)?;
                return Err(CompactionError::Storage(e));
            }
        }
    }
    let _ = callback(blobs_relocated, total_blobs, total_live_size, total_bytes);
    Ok(BlobCompactionReport {
        blobs_relocated,
        live_bytes: total_live_size,
        // `region_bytes` was sampled before the passes while `total_live_size` comes from the
        // relocation pass; saturate so a mismatch cannot underflow (a panic in debug builds,
        // a huge bogus value in release builds).
        bytes_reclaimed: old_region_length.saturating_sub(total_live_size),
        was_noop: false,
    })
}
/// Starts an incremental page compaction and returns a handle that drives it.
///
/// Fails with [`CompactionError::PersistentSavepointExists`] or
/// [`CompactionError::EphemeralSavepointExists`] when a savepoint exists. The probing write
/// transaction is aborted before returning. Unlike `compact`, no check for live read
/// transactions is made here.
pub fn start_compaction(&self) -> core::result::Result<CompactionHandle<'_>, CompactionError> {
    let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
    let blocker = if txn.list_persistent_savepoints()?.next().is_some() {
        Some(CompactionError::PersistentSavepointExists)
    } else if self
        .transaction_tracker
        .any_savepoint_exists()
        .map_err(CompactionError::Storage)?
    {
        Some(CompactionError::EphemeralSavepointExists)
    } else {
        None
    };
    txn.abort().map_err(CompactionError::Storage)?;
    match blocker {
        Some(err) => Err(err),
        None => Ok(CompactionHandle { db: self }),
    }
}
/// Starts an incremental blob compaction and returns a handle that drives it.
///
/// The preconditions match `compact_blobs`: no live read transaction and no persistent or
/// ephemeral savepoint. When the blob region has no dead bytes, the handle starts at phase `2`
/// with `live_bytes` already set from the stats. Otherwise it starts at phase `0` with zeroed
/// progress.
pub fn start_blob_compaction(
    &self,
) -> core::result::Result<BlobCompactionHandle<'_>, CompactionError> {
    let read_in_progress = self
        .transaction_tracker
        .oldest_live_read_transaction()
        .map_err(CompactionError::Storage)?
        .is_some();
    if read_in_progress {
        return Err(CompactionError::TransactionInProgress);
    }
    {
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        let blocker = if txn.list_persistent_savepoints()?.next().is_some() {
            Some(CompactionError::PersistentSavepointExists)
        } else if self
            .transaction_tracker
            .any_savepoint_exists()
            .map_err(CompactionError::Storage)?
        {
            Some(CompactionError::EphemeralSavepointExists)
        } else {
            None
        };
        txn.abort().map_err(CompactionError::Storage)?;
        if let Some(err) = blocker {
            return Err(err);
        }
    }
    let stats = {
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        let snapshot = txn.blob_stats().map_err(CompactionError::Storage)?;
        txn.abort().map_err(CompactionError::Storage)?;
        snapshot
    };
    let (phase, live_bytes) = if stats.dead_bytes == 0 {
        (2, stats.live_bytes)
    } else {
        (0, 0)
    };
    Ok(BlobCompactionHandle {
        db: self,
        stats,
        phase,
        blobs_relocated: 0,
        live_bytes,
    })
}
/// Starts an integrity scanner over this database's storage, configured by `config`, and
/// returns its handle.
///
/// This is a thin wrapper over `IntegrityScannerHandle::start`; its error is converted into
/// [`DatabaseError`]. It requires the `std` feature.
#[cfg(feature = "std")]
pub fn start_integrity_scanner(
&self,
config: crate::integrity_scanner::IntegrityScannerConfig,
) -> Result<crate::integrity_scanner::IntegrityScannerHandle, DatabaseError> {
crate::integrity_scanner::IntegrityScannerHandle::start(self.mem.clone(), config)
.map_err(DatabaseError::from)
}
/// Exports an incremental snapshot relative to transaction `since_txn`.
///
/// Delegates to `crate::incremental::export_incremental`. The exact boundary semantics of
/// `since_txn` (inclusive or exclusive) are defined there. Requires the `std` feature.
#[cfg(feature = "std")]
pub fn export_incremental(
&self,
since_txn: u64,
) -> core::result::Result<crate::incremental::IncrementalSnapshot, StorageError> {
crate::incremental::export_incremental(self, since_txn)
}
/// Applies an in-memory incremental `snapshot` to this database and returns an import report.
///
/// Delegates to `crate::incremental::import_incremental`. Requires the `std` feature.
#[cfg(feature = "std")]
pub fn import_incremental(
&self,
snapshot: &crate::incremental::IncrementalSnapshot,
) -> core::result::Result<crate::incremental::IncrementalImportReport, StorageError> {
crate::incremental::import_incremental(self, snapshot)
}
/// Writes an incremental backup relative to transaction `since_txn` to the file at `dest`.
///
/// Delegates to `crate::incremental::backup_incremental`. Requires the `std` feature.
#[cfg(feature = "std")]
pub fn backup_incremental(
&self,
dest: impl AsRef<std::path::Path>,
since_txn: u64,
) -> core::result::Result<crate::incremental::IncrementalBackupReport, StorageError> {
crate::incremental::backup_incremental(self, dest.as_ref(), since_txn)
}
/// Applies the incremental backup file at `path` to this database and returns an import
/// report.
///
/// Delegates to `crate::incremental::apply_incremental_backup`. Requires the `std` feature.
#[cfg(feature = "std")]
pub fn apply_incremental_backup(
&self,
path: impl AsRef<std::path::Path>,
) -> core::result::Result<crate::incremental::IncrementalImportReport, StorageError> {
crate::incremental::apply_incremental_backup(self, path.as_ref())
}
/// Post-repair consistency check, used only in debug builds.
///
/// Every page number recorded in the data-allocated system table (reached from `system_root`)
/// must be reported as allocated by `mem`. A missing table is treated as "nothing to check".
///
/// # Panics
/// Panics (via `assert!`) if any recorded page is not allocated.
#[cfg_attr(not(debug_assertions), expect(dead_code))]
fn check_repaired_allocated_pages_table(
    system_root: Option<BtreeHeader>,
    mem: Arc<TransactionalMemory>,
) -> Result {
    let system_tree = TableTree::new(
        system_root,
        PageHint::None,
        Arc::new(TransactionGuard::Verification),
        mem.clone(),
    )?;
    let lookup = system_tree
        .get_table::<TransactionIdWithPagination, PageList>(
            DATA_ALLOCATED_TABLE.name(),
            TableType::Normal,
        )
        .map_err(|e| e.into_storage_error_or_internal("Allocated pages table corrupted"))?;
    let Some(definition) = lookup else {
        return Ok(());
    };
    let table_root = match definition {
        InternalTableDefinition::Normal { table_root, .. } => table_root,
        _ => {
            return Err(StorageError::Internal(
                "unexpected non-normal table type for allocated pages table".to_string(),
            ));
        }
    };
    let allocated: ReadOnlyTable<TransactionIdWithPagination, PageList> =
        ReadOnlyTable::new_uncompressed(
            DATA_ALLOCATED_TABLE.name().to_string(),
            table_root,
            PageHint::None,
            Arc::new(TransactionGuard::Verification),
            mem.clone(),
        )?;
    for entry in allocated.range::<TransactionIdWithPagination>(..)? {
        let (_, page_list) = entry?;
        let pages = page_list.value();
        for index in 0..pages.len() {
            assert!(mem.is_allocated(pages.get(index)));
        }
    }
    Ok(())
}
/// Calls `visitor` for every page number stored in the freed-page system table named by
/// `table_def`. The table is looked up in the system tree rooted at `system_root`.
///
/// A table that does not exist is treated as empty. The first error, from storage or from
/// `visitor`, stops the walk and is returned.
///
/// NOTE(review): the table is looked up with the generic `K`/`V`, but its entries are always
/// decoded as `TransactionIdWithPagination -> PageList`. Callers in this file pass only the
/// freed-page tables.
fn visit_freed_tree<K: Key, V: Value, F>(
    system_root: Option<BtreeHeader>,
    table_def: SystemTableDefinition<K, V>,
    mem: Arc<TransactionalMemory>,
    mut visitor: F,
) -> Result
where
    F: FnMut(PageNumber) -> Result,
{
    let system_tree = TableTree::new(
        system_root,
        PageHint::None,
        Arc::new(TransactionGuard::Verification),
        mem.clone(),
    )?;
    let table_name = table_def.name();
    let definition = match system_tree.get_table::<K, V>(table_name, TableType::Normal) {
        Ok(Some(definition)) => definition,
        Ok(None) | Err(TableError::TableDoesNotExist(_)) => return Ok(()),
        Err(TableError::Storage(err)) => return Err(err),
        Err(_) => {
            return Err(StorageError::Corrupted(format!(
                "Unable to open {table_name}"
            )));
        }
    };
    let InternalTableDefinition::Normal { table_root, .. } = definition else {
        return Err(StorageError::Corrupted(
            "unexpected multimap table type in freed tree lookup".to_string(),
        ));
    };
    let freed: ReadOnlyTable<TransactionIdWithPagination, PageList<'static>> =
        ReadOnlyTable::new_uncompressed(
            table_name.to_string(),
            table_root,
            PageHint::None,
            Arc::new(TransactionGuard::Verification),
            mem,
        )?;
    for entry in freed.range::<TransactionIdWithPagination>(..)? {
        let (_, page_list) = entry?;
        let pages = page_list.value();
        for index in 0..pages.len() {
            visitor(pages.get(index))?;
        }
    }
    Ok(())
}
#[cfg(debug_assertions)]
fn mark_allocated_page_for_debug(
mem: &mut Arc<TransactionalMemory>, ) -> Result {
let data_root = mem.get_data_root();
{
let fake = Arc::new(TransactionGuard::Verification);
let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
tables.visit_all_pages(|path| {
mem.mark_debug_allocated_page(path.page_number());
Ok(())
})?;
}
let system_root = mem.get_system_root();
{
let fake = Arc::new(TransactionGuard::Verification);
let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
system_tables.visit_all_pages(|path| {
mem.mark_debug_allocated_page(path.page_number());
Ok(())
})?;
}
Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
mem.mark_debug_allocated_page(page);
Ok(())
})?;
Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
mem.mark_debug_allocated_page(page);
Ok(())
})?;
Ok(())
}
/// Rebuilds allocator state by walking every reachable page. Returns the repaired
/// `[data_root, system_root]`.
///
/// If the primary root fails checksum verification:
/// * if two-phase commit was used, this is reported as corruption;
/// * otherwise `repair_primary_corrupted` is applied and the checksums are re-verified.
///
/// Between `begin_repair` and `end_repair`, every page of the data tree, the system tree and
/// both freed-page tables is marked allocated. `repair_callback` is invoked with progress 0.3
/// (only on the primary-repair path), then 0.6 and 0.9, and may abort at any of those points.
///
/// # Errors
/// * `DatabaseError::RepairAborted` if the callback aborts.
/// * `StorageError::Corrupted` if no valid root is available.
/// * Other storage errors are propagated.
fn do_repair(
    // NOTE(review): presumably `&mut` only to assert exclusive access; every call below goes
    // through the `Arc`'s shared deref.
    mem: &mut Arc<TransactionalMemory>,
    repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
) -> Result<[Option<BtreeHeader>; 2], DatabaseError> {
    // Reports `progress` to the user callback and fails if the callback requested an abort.
    let checkpoint = |progress: f64| -> Result<(), DatabaseError> {
        let mut session = RepairSession::new(progress);
        repair_callback(&mut session);
        if session.aborted() {
            return Err(DatabaseError::RepairAborted);
        }
        Ok(())
    };

    if !Self::verify_primary_checksums(mem.clone())? {
        if mem.used_two_phase_commit() {
            return Err(DatabaseError::Storage(StorageError::Corrupted(
                "Primary is corrupted despite 2-phase commit".to_string(),
            )));
        }
        checkpoint(0.3)?;
        mem.repair_primary_corrupted();
        mem.clear_read_cache();
        if !Self::verify_primary_checksums(mem.clone())? {
            return Err(DatabaseError::Storage(StorageError::Corrupted(
                "Failed to repair database. All roots are corrupted".to_string(),
            )));
        }
    }

    checkpoint(0.6)?;
    mem.begin_repair()?;

    let data_root = mem.get_data_root();
    {
        let data_tables = TableTree::new(
            data_root,
            PageHint::None,
            Arc::new(TransactionGuard::Verification),
            mem.clone(),
        )?;
        data_tables.visit_all_pages(|path| {
            mem.mark_page_allocated(path.page_number())?;
            Ok(())
        })?;
    }

    checkpoint(0.9)?;

    let system_root = mem.get_system_root();
    {
        let system_tables = TableTree::new(
            system_root,
            PageHint::None,
            Arc::new(TransactionGuard::Verification),
            mem.clone(),
        )?;
        system_tables.visit_all_pages(|path| {
            mem.mark_page_allocated(path.page_number())?;
            Ok(())
        })?;
    }
    Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
        mem.mark_page_allocated(page)?;
        Ok(())
    })?;
    Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
        mem.mark_page_allocated(page)?;
        Ok(())
    })?;

    // Debug builds cross-check the rebuilt state against the allocated-pages table.
    #[cfg(debug_assertions)]
    Self::check_repaired_allocated_pages_table(system_root, mem.clone())?;

    mem.end_repair()?;
    mem.clear_read_cache();
    Ok([data_root, system_root])
}
/// Opens the database stored in `file`, repairing it first if needed.
///
/// `allow_initialize` is forwarded to `TransactionalMemory::new`. The builder's create paths
/// pass `true` and `open` passes `false`.
///
/// If `get_allocator_state_table` finds a trusted allocator state, it is loaded directly.
/// Otherwise `repair_callback` is consulted (progress 0.0, then inside `do_repair`), and the
/// repaired roots are committed under the next transaction id.
///
/// Afterwards:
/// * the savepoint counter and every persistent savepoint are restored into the transaction
///   tracker;
/// * stored history snapshots are registered as holds when `history_retention > 0`, or purged
///   when `history_retention == 0`.
///
/// # Errors
/// * `DatabaseError::RepairAborted` if the repair callback aborts.
/// * `StorageError::Corrupted` (as a `DatabaseError`) if a stored savepoint is invalid.
/// * Any storage error raised while opening, repairing or restoring state.
#[allow(clippy::too_many_arguments)]
fn new(
    file: Box<dyn StorageBackend>,
    allow_initialize: bool,
    page_size: usize,
    region_size: Option<u64>,
    read_cache_size_bytes: usize,
    write_cache_size_bytes: usize,
    repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    compression: CompressionConfig,
    blob_dedup_config: BlobDedupConfig,
    memory_budget: Option<usize>,
    cdc_config: CdcConfig,
    history_retention: u64,
    read_verification: ReadVerification,
    read_verification_callback: Option<Arc<ReadVerificationCallback>>,
    blob_compaction_policy: BlobCompactionPolicy,
    observer: Arc<dyn DatabaseObserver>,
    #[cfg(feature = "metrics")] db_metrics: Arc<DbMetrics>,
) -> Result<Self, DatabaseError> {
    #[cfg(feature = "logging")]
    let file_path = format!("{:?}", &file);
    #[cfg(feature = "logging")]
    info!("Opening database {:?}", &file_path);
    let mem = TransactionalMemory::new(
        file,
        allow_initialize,
        page_size,
        region_size,
        read_cache_size_bytes,
        write_cache_size_bytes,
        // NOTE(review): positional flag; its meaning is defined by TransactionalMemory::new.
        false,
        compression,
        memory_budget,
        read_verification,
        read_verification_callback,
    )?;
    let mut mem = Arc::new(mem);
    if let Some(tree) = Self::get_allocator_state_table(&mem)? {
        #[cfg(feature = "logging")]
        info!("Found valid allocator state, full repair not needed");
        mem.load_allocator_state(&tree)?;
        #[cfg(debug_assertions)]
        Self::mark_allocated_page_for_debug(&mut mem)?;
    } else {
        // No trusted allocator state (e.g. unclean shutdown): rebuild it from the trees.
        #[cfg(feature = "logging")]
        warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
        let mut handle = RepairSession::new(0.0);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }
        let [data_root, system_root] = Self::do_repair(&mut mem, repair_callback)?;
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next()?;
        mem.commit(
            data_root,
            system_root,
            next_transaction_id,
            true,
            ShrinkPolicy::Never,
        )?;
    }
    mem.begin_writable()?;
    let next_transaction_id = mem.get_last_committed_transaction_id()?.next()?;
    let db = Database {
        mem,
        transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
        // The owned parameter is not used again, so move it instead of cloning.
        blob_dedup_config,
        cdc_config,
        history_retention,
        blob_compaction_policy,
        observer,
        #[cfg(feature = "metrics")]
        db_metrics,
        #[cfg(feature = "std")]
        group_committer: GroupCommitter::new(),
    };
    // Restore in-memory savepoint and history bookkeeping from the persisted system tables.
    // This uses a write transaction that is aborted afterwards.
    let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
    if let Some(next_id) = txn.next_persistent_savepoint_id()? {
        db.transaction_tracker
            .restore_savepoint_counter_state(next_id)?;
    }
    for id in txn.list_persistent_savepoints()? {
        let savepoint = match txn.get_persistent_savepoint(id) {
            Ok(savepoint) => savepoint,
            Err(err) => match err {
                SavepointError::InvalidSavepoint => {
                    return Err(StorageError::Corrupted(
                        "invalid savepoint encountered during database initialization"
                            .to_string(),
                    )
                    .into());
                }
                SavepointError::Storage(storage) => {
                    return Err(storage.into());
                }
            },
        };
        db.transaction_tracker
            .register_persistent_savepoint(&savepoint)?;
    }
    let history_ids = txn.list_history_snapshot_ids()?;
    if history_retention > 0 {
        for id in &history_ids {
            db.transaction_tracker
                .register_history_hold(TransactionId::new(*id))?;
        }
    }
    txn.abort()?;
    // With retention disabled, leftover snapshots from an earlier configuration are purged.
    if history_retention == 0 && !history_ids.is_empty() {
        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
        txn.purge_all_history_snapshots()?;
        txn.commit().map_err(|e| match e {
            CommitError::Storage(s) => DatabaseError::Storage(s),
        })?;
    }
    Ok(db)
}
/// Loads the persisted allocator state tree if it can be trusted.
///
/// Returns `Ok(None)` in three cases:
/// * the last commit did not use two-phase commit;
/// * the allocator state table is absent;
/// * `is_valid_allocator_state` rejects the tree.
///
/// In this file, `None` makes `Database::new` fall back to a full repair.
///
/// # Errors
/// `StorageError::Corrupted` if the table exists but is not a normal table.
fn get_allocator_state_table(
    mem: &Arc<TransactionalMemory>,
) -> Result<Option<AllocatorStateTree>> {
    if !mem.used_two_phase_commit() {
        return Ok(None);
    }
    let system_tree = TableTree::new(
        mem.get_system_root(),
        PageHint::None,
        Arc::new(TransactionGuard::Verification),
        mem.clone(),
    )?;
    let lookup = system_tree
        .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
        .map_err(|e| e.into_storage_error_or_internal("Unexpected TableError"))?;
    let table_root = match lookup {
        None => return Ok(None),
        Some(InternalTableDefinition::Normal { table_root, .. }) => table_root,
        Some(_) => {
            return Err(StorageError::Corrupted(
                "unexpected non-normal table type for allocator state table".to_string(),
            ));
        }
    };
    let tree = Btree::new_uncompressed(
        table_root,
        PageHint::None,
        Arc::new(TransactionGuard::Verification),
        mem.clone(),
    )?;
    let trusted = mem.is_valid_allocator_state(&tree)?;
    Ok(trusted.then_some(tree))
}
fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
let id = self
.transaction_tracker
.register_read_transaction(&self.mem)?;
Ok(TransactionGuard::new_read(
id,
self.transaction_tracker.clone(),
))
}
/// Returns a new [`Builder`] with default settings; equivalent to `Builder::new()`.
pub fn builder() -> Builder {
Builder::new()
}
/// Begins a new write transaction.
///
/// Fails up front if `check_io_errors` reports an earlier I/O failure. After a failed commit,
/// this surfaces as `StorageError::PreviousIo`.
pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
    self.mem.check_io_errors()?;
    let write_id = self.transaction_tracker.start_write_transaction()?;
    let guard = TransactionGuard::new_write(write_id, Arc::clone(&self.transaction_tracker));
    WriteTransaction::new(
        guard,
        Arc::clone(&self.transaction_tracker),
        Arc::clone(&self.mem),
        self.blob_dedup_config.clone(),
        self.cdc_config.clone(),
        self.history_retention,
        Arc::clone(&self.observer),
        #[cfg(feature = "metrics")]
        Arc::clone(&self.db_metrics),
    )
    .map_err(Into::into)
}
/// Returns the observer notified of database events, such as compaction steps.
pub fn observer(&self) -> &Arc<dyn DatabaseObserver> {
&self.observer
}
/// Returns this database's metrics counters. Requires the `metrics` feature.
#[cfg(feature = "metrics")]
pub fn metrics(&self) -> &DbMetrics {
&self.db_metrics
}
/// Begins a read-only transaction over the user data as of history snapshot `transaction_id`.
///
/// # Errors
/// `StorageError::HistorySnapshotNotFound(transaction_id)` if no such snapshot is stored.
pub fn begin_read_at(&self, transaction_id: u64) -> Result<ReadTransaction, TransactionError> {
    let lookup_txn = self.begin_read()?;
    let user_root = match lookup_txn
        .get_history_snapshot_ro(transaction_id)
        .map_err(TransactionError::Storage)?
    {
        Some(snapshot) => snapshot.user_root(),
        None => {
            return Err(TransactionError::Storage(
                StorageError::HistorySnapshotNotFound(transaction_id),
            ));
        }
    };
    // The new reader is registered while `lookup_txn` is still open. NOTE(review): presumably
    // this keeps the snapshot's pages protected in between; keep this order.
    let guard = self.allocate_read_transaction()?;
    drop(lookup_txn);
    ReadTransaction::new_historical(
        Arc::clone(&self.mem),
        guard,
        user_root,
        Arc::clone(&self.observer),
        #[cfg(feature = "metrics")]
        Arc::clone(&self.db_metrics),
    )
}
/// Begins a read-only transaction over the last history snapshot whose timestamp is at or
/// before `timestamp_ms`.
///
/// "Last" means the last qualifying snapshot in the order returned by
/// `list_history_snapshot_ids_ro`. NOTE(review): presumably this is ascending transaction id.
///
/// # Errors
/// `StorageError::HistorySnapshotNotFound(timestamp_ms)` when no snapshot qualifies. Note that
/// the payload is the timestamp, not a transaction id.
#[cfg(feature = "std")]
pub fn begin_read_at_time(
    &self,
    timestamp_ms: u64,
) -> Result<ReadTransaction, TransactionError> {
    let lookup_txn = self.begin_read()?;
    let snapshot_ids = lookup_txn
        .list_history_snapshot_ids_ro()
        .map_err(TransactionError::Storage)?;
    // Outer `Option`: whether any snapshot qualified. Inner value: that snapshot's `user_root()`.
    let mut chosen_root: Option<Option<BtreeHeader>> = None;
    for snapshot_id in snapshot_ids {
        let Some(snapshot) = lookup_txn
            .get_history_snapshot_ro(snapshot_id)
            .map_err(TransactionError::Storage)?
        else {
            continue;
        };
        if snapshot.timestamp_ms() <= timestamp_ms {
            chosen_root = Some(snapshot.user_root());
        }
    }
    let Some(user_root) = chosen_root else {
        return Err(TransactionError::Storage(
            StorageError::HistorySnapshotNotFound(timestamp_ms),
        ));
    };
    let guard = self.allocate_read_transaction()?;
    drop(lookup_txn);
    ReadTransaction::new_historical(
        Arc::clone(&self.mem),
        guard,
        user_root,
        Arc::clone(&self.observer),
        #[cfg(feature = "metrics")]
        Arc::clone(&self.db_metrics),
    )
}
/// Lists the stored history snapshots as `(transaction_id, timestamp_ms)` records.
///
/// Records are returned in the order given by `list_history_snapshot_ids_ro`. Ids whose
/// snapshot cannot be found are skipped.
pub fn transaction_history(&self) -> Result<Vec<TransactionInfo>, TransactionError> {
    let lookup_txn = self.begin_read()?;
    let snapshot_ids = lookup_txn
        .list_history_snapshot_ids_ro()
        .map_err(TransactionError::Storage)?;
    let mut history = Vec::with_capacity(snapshot_ids.len());
    for transaction_id in snapshot_ids {
        let snapshot = lookup_txn
            .get_history_snapshot_ro(transaction_id)
            .map_err(TransactionError::Storage)?;
        match snapshot {
            Some(snap) => history.push(TransactionInfo {
                transaction_id,
                timestamp_ms: snap.timestamp_ms(),
            }),
            None => {}
        }
    }
    Ok(history)
}
/// Submits `batch` for group commit and blocks until its outcome is known.
///
/// If this call becomes the group's leader, it runs the commit loop itself before waiting.
/// A closed result channel is reported as `GroupCommitError::Shutdown`.
#[cfg(feature = "std")]
pub fn submit_write_batch(&self, batch: WriteBatch) -> Result<(), GroupCommitError> {
    let (is_leader, outcome) = self.group_committer.enqueue(batch)?;
    if is_leader {
        self.run_group_commit();
    }
    match outcome.recv() {
        Ok(result) => result,
        Err(_) => Err(GroupCommitError::Shutdown),
    }
}
/// Leader loop for group commit.
///
/// Each round works like this:
/// * drain the pending batches and apply them all in one write transaction;
/// * if every batch applied, commit and report the commit outcome to every submitter;
/// * if one batch fails, it gets `BatchFailed`, every other batch in the round gets
///   `PeerFailed`, and the transaction is aborted.
///
/// Leadership is handed back via `finish_leader`. A non-empty return from it means more
/// batches arrived and this thread keeps leading.
#[cfg(feature = "std")]
fn run_group_commit(&self) {
    let Ok(mut batches) = self.group_committer.drain_pending() else {
        let _ = self.group_committer.finish_leader();
        return;
    };
    loop {
        if batches.is_empty() {
            match self.group_committer.finish_leader() {
                Ok(remaining) if remaining.is_empty() => return,
                Ok(remaining) => {
                    batches = remaining;
                    continue;
                }
                Err(_) => return,
            }
        }
        let txn = match self.begin_write() {
            Ok(txn) => txn,
            Err(e) => {
                let msg = e.into_storage_error().to_string();
                for b in batches {
                    let _ = b.result_tx.send(Err(GroupCommitError::TransactionFailed(
                        StorageError::Corrupted(msg.clone()),
                    )));
                }
                // Batches that arrived meanwhile are returned by `finish_leader`. Previously
                // they were dropped here, so their submitters saw a spurious `Shutdown`. If
                // leadership is retained on a non-empty return, as the top of the loop
                // assumes, nobody would drain the queue afterwards. Retry them instead;
                // the loop ends once no new batches arrive.
                match self.group_committer.finish_leader() {
                    Ok(remaining) if !remaining.is_empty() => {
                        batches = remaining;
                        continue;
                    }
                    _ => return,
                }
            }
        };
        let mut senders = Vec::with_capacity(batches.len());
        let mut failed = false;
        for pending in batches {
            if failed {
                let _ = pending.result_tx.send(Err(GroupCommitError::PeerFailed));
                continue;
            }
            match pending.batch.apply(&txn) {
                Ok(()) => {
                    senders.push(pending.result_tx);
                }
                Err(e) => {
                    failed = true;
                    let _ = pending
                        .result_tx
                        .send(Err(GroupCommitError::BatchFailed(e)));
                    // Batches applied earlier in this round are rolled back with the abort.
                    for tx in senders.drain(..) {
                        let _ = tx.send(Err(GroupCommitError::PeerFailed));
                    }
                }
            }
        }
        if failed {
            let _ = txn.abort();
            let Ok(b) = self.group_committer.drain_pending() else {
                let _ = self.group_committer.finish_leader();
                return;
            };
            batches = b;
            continue;
        }
        match txn.commit() {
            Ok(()) => {
                for tx in senders {
                    let _ = tx.send(Ok(()));
                }
            }
            Err(e) => {
                let msg = e.into_storage_error().to_string();
                for tx in senders {
                    let _ = tx.send(Err(GroupCommitError::CommitFailed(
                        StorageError::Corrupted(msg.clone()),
                    )));
                }
            }
        }
        let Ok(b) = self.group_committer.drain_pending() else {
            let _ = self.group_committer.finish_leader();
            return;
        };
        batches = b;
    }
}
fn ensure_allocator_state_table_and_trim(&self) -> Result<(), Error> {
#[cfg(feature = "logging")]
debug!("Writing allocator state table");
let mut tx = self.begin_write()?;
tx.set_quick_repair(true);
tx.set_shrink_policy(ShrinkPolicy::Maximum);
tx.commit()?;
Ok(())
}
}
impl Drop for Database {
    /// Shuts the database down best-effort, in three steps:
    /// * stop group commit (with `std`);
    /// * unless the thread is panicking, persist the allocator state so the next open can
    ///   skip a full repair;
    /// * close the storage.
    ///
    /// Failures cannot be propagated from `drop`; they are only logged when the `logging`
    /// feature is enabled.
    fn drop(&mut self) {
        #[cfg(feature = "std")]
        self.group_committer.shutdown();

        #[cfg(feature = "std")]
        let panicking = std::thread::panicking();
        #[cfg(not(feature = "std"))]
        let panicking = false;

        if !panicking {
            let saved = self.ensure_allocator_state_table_and_trim();
            if saved.is_err() {
                #[cfg(feature = "logging")]
                warn!("Failed to write allocator state table. Repair may be required at restart.");
            }
        }

        let closed = self.mem.close();
        if closed.is_err() {
            #[cfg(feature = "logging")]
            warn!("Failed to flush database file. Repair may be required at restart.");
        }
    }
}
/// Handle for incremental page compaction, returned by `Database::start_compaction`.
///
/// It borrows the database for its lifetime. The work is driven by `step` or `run`.
pub struct CompactionHandle<'db> {
db: &'db Database,
}
impl CompactionHandle<'_> {
    /// Performs one round of page compaction. A round consists of:
    /// * a two-phase commit of an empty transaction;
    /// * one `compact_pages` pass;
    /// * if that pass relocated anything, two further empty two-phase commits with
    ///   `ShrinkPolicy::Maximum`.
    ///
    /// If nothing was relocated, the pass is aborted and completion is reported with zero
    /// pages. The observer and metrics are not notified in that case.
    ///
    /// `compact_pages` only reports whether anything moved. For that reason a productive
    /// step reports `pages_relocated: 1` rather than an actual page count.
    pub fn step(&self) -> core::result::Result<CompactionProgress, CompactionError> {
        // Commits an otherwise empty write transaction with two-phase commit, optionally
        // also requesting the maximum shrink policy.
        let empty_commit = |shrink: bool| -> core::result::Result<(), CompactionError> {
            let mut txn = self.db.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            if shrink {
                txn.set_shrink_policy(ShrinkPolicy::Maximum);
            }
            txn.commit().map_err(|e| e.into_storage_error())?;
            Ok(())
        };

        empty_commit(false)?;

        let mut txn = self.db.begin_write().map_err(|e| e.into_storage_error())?;
        if !txn.compact_pages()? {
            txn.abort()?;
            return Ok(CompactionProgress {
                pages_relocated: 0,
                complete: true,
            });
        }
        txn.commit().map_err(|e| e.into_storage_error())?;

        // NOTE(review): presumably two follow-up commits are needed so that pages freed by
        // the relocation (including freed-tree pages) become reusable and can be trimmed.
        empty_commit(true)?;
        empty_commit(true)?;

        let progress = CompactionProgress {
            pages_relocated: 1,
            complete: false,
        };
        self.db.observer.on_compaction_step(&progress);
        #[cfg(feature = "metrics")]
        self.db
            .db_metrics
            .compaction_pages_relocated
            .fetch_add(1, portable_atomic::Ordering::Relaxed);
        Ok(progress)
    }

    /// Calls [`step`](Self::step) until it reports completion or 10 000 productive steps have
    /// run. Then it notifies the observer with the step count.
    ///
    /// Returns the number of productive (non-final) steps executed.
    pub fn run(&self) -> core::result::Result<u64, CompactionError> {
        const MAX_STEPS: u64 = 10_000;
        let mut steps = 0u64;
        while steps < MAX_STEPS {
            if self.step()?.complete {
                break;
            }
            steps += 1;
        }
        self.db.observer.on_compaction_complete(steps);
        Ok(steps)
    }
}
/// Progress reported by one `BlobCompactionHandle::step` call.
#[derive(Debug, Clone, Copy)]
pub struct BlobCompactionProgress {
/// Number of blobs relocated by the first pass (0 when there was nothing to compact).
pub blobs_relocated: u64,
/// Live blob bytes reported by the first pass. When there was nothing to compact, this is
/// the live byte count captured at start.
pub live_bytes: u64,
/// Phase reached after this step: 1 after the first pass, 2 once finished.
pub phase: u8,
/// `true` once compaction has finished (phase 2).
pub complete: bool,
}
/// Handle for two-pass incremental blob compaction, returned by
/// `Database::start_blob_compaction`.
pub struct BlobCompactionHandle<'db> {
db: &'db Database,
// Blob statistics captured when the handle was created.
stats: BlobStats,
// 0 = first pass pending, 1 = second pass pending, 2 = finished.
phase: u8,
// Blobs relocated by the first pass.
blobs_relocated: u64,
// Live bytes from the first pass, or from `stats` when nothing needed compacting.
live_bytes: u64,
}
impl BlobCompactionHandle<'_> {
    /// Advances blob compaction by one phase:
    /// * Phase 0: runs `compact_blobs_pass(false)`, records the relocated blob count and live
    ///   byte count, and moves to phase 1.
    /// * Phase 1: runs `compact_blobs_pass(true)` and moves to phase 2.
    /// * Phase 2 (finished): returns the final progress again without doing any work.
    ///
    /// Each pass runs in its own write transaction, committed with two-phase commit. If a pass
    /// fails, its transaction is aborted, the error is returned and the phase is left
    /// unchanged.
    pub fn step(&mut self) -> core::result::Result<BlobCompactionProgress, CompactionError> {
        match self.phase {
            0 => {
                let (relocated, live_size) = self.run_pass(false)?;
                self.blobs_relocated = relocated;
                self.live_bytes = live_size;
                self.phase = 1;
                Ok(BlobCompactionProgress {
                    blobs_relocated: relocated,
                    live_bytes: live_size,
                    phase: 1,
                    complete: false,
                })
            }
            1 => {
                self.run_pass(true)?;
                self.phase = 2;
                Ok(self.finished_progress())
            }
            _ => Ok(self.finished_progress()),
        }
    }

    /// Runs every remaining phase and returns the final report.
    ///
    /// If the handle was created with no dead bytes in the blob region, it starts out
    /// finished. In that case the report has `was_noop: true` and `bytes_reclaimed: 0`.
    /// Previously such a run was always reported as `was_noop: false`.
    pub fn run(&mut self) -> core::result::Result<BlobCompactionReport, CompactionError> {
        loop {
            if self.step()?.complete {
                break;
            }
        }
        let was_noop = self.stats.dead_bytes == 0;
        let bytes_reclaimed = if was_noop {
            0
        } else {
            self.stats.region_bytes.saturating_sub(self.live_bytes)
        };
        Ok(BlobCompactionReport {
            blobs_relocated: self.blobs_relocated,
            live_bytes: self.live_bytes,
            bytes_reclaimed,
            was_noop,
        })
    }

    /// Runs `compact_blobs_pass(flag)` in its own write transaction. The transaction is
    /// committed (two-phase) on success and aborted on failure.
    fn run_pass(&self, flag: bool) -> core::result::Result<(u64, u64), CompactionError> {
        let mut txn = self.db.begin_write().map_err(|e| e.into_storage_error())?;
        match txn.compact_blobs_pass(flag) {
            Ok(outcome) => {
                txn.set_two_phase_commit(true);
                txn.commit().map_err(|e| e.into_storage_error())?;
                Ok(outcome)
            }
            Err(e) => {
                txn.abort().map_err(CompactionError::Storage)?;
                Err(CompactionError::Storage(e))
            }
        }
    }

    /// Progress value reported once compaction has finished.
    fn finished_progress(&self) -> BlobCompactionProgress {
        BlobCompactionProgress {
            blobs_relocated: self.blobs_relocated,
            live_bytes: self.live_bytes,
            phase: 2,
            complete: true,
        }
    }
}
/// Passed to the user's repair callback at each stage of a repair performed while opening a
/// database.
///
/// The callback can read the approximate progress and call `abort`. Aborting makes the open
/// fail with `DatabaseError::RepairAborted`.
pub struct RepairSession {
// Approximate fraction of the repair completed; this file uses 0.0, 0.3, 0.6 and 0.9.
progress: f64,
// Set by `abort`; checked after the callback returns.
aborted: bool,
}
impl RepairSession {
    /// Creates a session reporting `progress` that has not been aborted.
    pub(crate) fn new(progress: f64) -> Self {
        let aborted = false;
        Self { progress, aborted }
    }

    /// Whether the callback asked for the repair to be cancelled.
    pub(crate) fn aborted(&self) -> bool {
        let Self { aborted, .. } = self;
        *aborted
    }

    /// Requests that the repair be cancelled; opening the database will then fail.
    pub fn abort(&mut self) {
        self.aborted = true;
    }

    /// Approximate fraction of the repair completed so far.
    pub fn progress(&self) -> f64 {
        let Self { progress, .. } = self;
        *progress
    }
}
/// Read-verification mode, passed through to `TransactionalMemory::new` when a database is
/// opened.
///
/// The default is [`ReadVerification::None`]. It is derived via `#[default]` instead of a
/// hand-written `Default` impl.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum ReadVerification {
    /// Reads are not verified.
    #[default]
    None,
    /// Only a sample of reads is verified. NOTE(review): `rate` is presumably the sampled
    /// fraction in `[0.0, 1.0]`; confirm against `TransactionalMemory`.
    Sampled { rate: f32 },
    /// Every read is verified.
    Full,
}
/// Action returned by a [`ReadVerificationCallback`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadVerificationAction {
/// Fail the read with an error.
ReturnError,
/// Carry on with the read.
Continue,
}
/// User callback used by read verification. It receives a `u64` and returns the action to
/// take.
///
/// NOTE(review): presumably it is invoked when a verified read fails, and the `u64` is the
/// affected page number or offset. Confirm in `TransactionalMemory`.
pub type ReadVerificationCallback = dyn Fn(u64) -> ReadVerificationAction + Send + Sync + 'static;
/// Configuration for creating or opening a [`Database`]; obtain one via `Database::builder()`.
pub struct Builder {
page_size: usize,
// Settable only in tests/fuzzing; `open` ignores it and passes `None`.
region_size: Option<u64>,
read_cache_size_bytes: usize,
write_cache_size_bytes: usize,
compression: CompressionConfig,
// Invoked at each stage of an on-open repair; may abort it.
repair_callback: Box<dyn Fn(&mut RepairSession)>,
blob_dedup_config: BlobDedupConfig,
memory_budget: Option<usize>,
cdc_config: CdcConfig,
// 0 means history snapshots are purged on open.
history_retention: u64,
read_verification: ReadVerification,
read_verification_callback: Option<Arc<ReadVerificationCallback>>,
// `None` means the default observer is used.
observer: Option<Arc<dyn DatabaseObserver>>,
blob_compaction_policy: BlobCompactionPolicy,
}
impl Builder {
    /// Creates a builder with default settings and an automatically sized cache.
    ///
    /// The cache size comes from `default_cache_size`.
    // NOTE(review): `new_without_default` is silenced instead of providing `impl Default`.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        let mut result = Self {
            page_size: PAGE_SIZE,
            region_size: None,
            read_cache_size_bytes: 0,
            write_cache_size_bytes: 0,
            compression: CompressionConfig::None,
            repair_callback: Box::new(|_| {}),
            blob_dedup_config: BlobDedupConfig::default(),
            memory_budget: None,
            cdc_config: CdcConfig::default(),
            history_retention: 0,
            read_verification: ReadVerification::None,
            read_verification_callback: None,
            observer: None,
            blob_compaction_policy: BlobCompactionPolicy::default(),
        };
        result.set_cache_size(Self::default_cache_size());
        result
    }

    /// Default total cache size in bytes.
    ///
    /// With `std` it is derived from physical memory; without `std` it is fixed at 1 GiB.
    fn default_cache_size() -> usize {
        #[cfg(feature = "std")]
        {
            Self::detect_system_cache_size()
        }
        #[cfg(not(feature = "std"))]
        {
            1024 * 1024 * 1024
        }
    }

    /// A quarter of physical memory, clamped to `[16 MiB, 1 GiB]`. Returns 1 GiB when
    /// physical memory cannot be determined.
    #[cfg(feature = "std")]
    fn detect_system_cache_size() -> usize {
        const MIN_CACHE: usize = 16 * 1024 * 1024;
        const MAX_CACHE: usize = 1024 * 1024 * 1024;
        match Self::total_physical_memory() {
            Some(bytes) => (bytes / 4).clamp(MIN_CACHE, MAX_CACHE),
            None => MAX_CACHE,
        }
    }

    /// Total physical memory in bytes, queried from the OS on Linux, macOS and Windows.
    /// Returns `None` on other targets or when the query fails.
    #[cfg(feature = "std")]
    fn total_physical_memory() -> Option<usize> {
        #[cfg(target_os = "linux")]
        {
            // glibc numbering of these `sysconf` names.
            const _SC_PHYS_PAGES: core::ffi::c_int = 85;
            const _SC_PAGESIZE: core::ffi::c_int = 30;
            unsafe extern "C" {
                fn sysconf(name: core::ffi::c_int) -> core::ffi::c_long;
            }
            // SAFETY: `sysconf` takes a plain integer and does not touch caller memory; an
            // unsupported name yields -1, which the check below rejects.
            let pages = unsafe { sysconf(_SC_PHYS_PAGES) };
            // SAFETY: same as above.
            let page_size = unsafe { sysconf(_SC_PAGESIZE) };
            if pages > 0 && page_size > 0 {
                #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
                Some((pages as usize).saturating_mul(page_size as usize))
            } else {
                None
            }
        }
        #[cfg(target_os = "macos")]
        {
            unsafe extern "C" {
                fn sysctlbyname(
                    name: *const core::ffi::c_char,
                    oldp: *mut core::ffi::c_void,
                    oldlenp: *mut usize,
                    newp: *mut core::ffi::c_void,
                    newlen: usize,
                ) -> core::ffi::c_int;
            }
            let mut size: u64 = 0;
            let mut len = core::mem::size_of::<u64>();
            let name = b"hw.memsize\0";
            // SAFETY: `name` is NUL-terminated. `oldp`/`oldlenp` point to the live locals `size`
            // and `len`, with `len` equal to `size`'s byte size. No new value is written
            // (`newp` is null, `newlen` is 0).
            let ret = unsafe {
                sysctlbyname(
                    name.as_ptr() as *const _,
                    &mut size as *mut u64 as *mut _,
                    &mut len,
                    core::ptr::null_mut(),
                    0,
                )
            };
            if ret == 0 {
                #[allow(clippy::cast_possible_truncation)]
                Some(size.min(usize::MAX as u64) as usize)
            } else {
                None
            }
        }
        #[cfg(target_os = "windows")]
        {
            // Mirrors the Win32 MEMORYSTATUSEX layout.
            #[repr(C)]
            struct MemoryStatusEx {
                dw_length: u32,
                dw_memory_load: u32,
                ull_total_phys: u64,
                ull_avail_phys: u64,
                ull_total_page_file: u64,
                ull_avail_page_file: u64,
                ull_total_virtual: u64,
                ull_avail_virtual: u64,
                ull_avail_extended_virtual: u64,
            }
            unsafe extern "system" {
                fn GlobalMemoryStatusEx(lp_buffer: *mut MemoryStatusEx) -> i32;
            }
            let mut status = MemoryStatusEx {
                #[allow(clippy::cast_possible_truncation)]
                dw_length: core::mem::size_of::<MemoryStatusEx>() as u32,
                dw_memory_load: 0,
                ull_total_phys: 0,
                ull_avail_phys: 0,
                ull_total_page_file: 0,
                ull_avail_page_file: 0,
                ull_total_virtual: 0,
                ull_avail_virtual: 0,
                ull_avail_extended_virtual: 0,
            };
            // SAFETY: `status` is a live, fully initialized `#[repr(C)]` struct, and its
            // `dw_length` is set to the struct size as the API requires.
            let ret = unsafe { GlobalMemoryStatusEx(core::ptr::addr_of_mut!(status)) };
            if ret != 0 {
                let total = status.ull_total_phys;
                #[allow(clippy::cast_possible_truncation)]
                Some(total.min(usize::MAX as u64) as usize)
            } else {
                None
            }
        }
        #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
        {
            None
        }
    }

    /// Sets the callback invoked at each stage of a repair performed while opening.
    pub fn set_repair_callback(
        &mut self,
        callback: impl Fn(&mut RepairSession) + 'static,
    ) -> &mut Self {
        self.repair_callback = Box::new(callback);
        self
    }

    /// Installs a custom observer; otherwise the default observer is used.
    pub fn set_observer(&mut self, observer: impl DatabaseObserver) -> &mut Self {
        self.observer = Some(Arc::new(observer));
        self
    }

    /// Sets the blob compaction policy passed to the database.
    pub fn set_blob_compaction_policy(&mut self, policy: BlobCompactionPolicy) -> &mut Self {
        self.blob_compaction_policy = policy;
        self
    }

    /// Test/fuzz only: sets the page size, raised to at least 512 bytes.
    ///
    /// # Panics
    /// Panics if `size` is not a power of two.
    #[cfg(any(fuzzing, test))]
    pub fn set_page_size(&mut self, size: usize) -> &mut Self {
        assert!(size.is_power_of_two());
        // `core` rather than `std`: this path can also be compiled without the `std` feature.
        self.page_size = core::cmp::max(size, 512);
        self
    }

    /// Sets the total cache size.
    ///
    /// 90% goes to the read cache and 10% to the write cache. The whole amount also becomes
    /// the memory budget.
    pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
        self.read_cache_size_bytes = bytes / 10 * 9;
        self.write_cache_size_bytes = bytes / 10;
        self.memory_budget = Some(bytes);
        self
    }

    /// Sets the memory budget.
    ///
    /// 70% of the budget goes to the read cache and 20% to the write cache. This differs from
    /// the 90/10 split used by `set_cache_size`.
    ///
    /// # Panics
    /// Panics if `bytes` is below 16 KiB.
    pub fn set_memory_budget(&mut self, bytes: usize) -> &mut Self {
        assert!(
            bytes >= 16384,
            "Memory budget must be at least 16 KiB (got {bytes} bytes). \
             Budgets below 4 page sizes cannot cache any data."
        );
        self.memory_budget = Some(bytes);
        self.read_cache_size_bytes = bytes / 100 * 70;
        self.write_cache_size_bytes = bytes / 100 * 20;
        self
    }

    /// Test/fuzz only: sets the region size used when creating a database.
    ///
    /// # Panics
    /// Panics if `size` is not a power of two.
    #[cfg(any(test, fuzzing))]
    pub fn set_region_size(&mut self, size: u64) -> &mut Self {
        assert!(size.is_power_of_two());
        self.region_size = Some(size);
        self
    }

    /// Sets the page compression configuration.
    pub fn set_compression(&mut self, compression: CompressionConfig) -> &mut Self {
        self.compression = compression;
        self
    }

    /// Enables or disables blob deduplication.
    pub fn set_blob_dedup(&mut self, enabled: bool) -> &mut Self {
        self.blob_dedup_config.enabled = enabled;
        self
    }

    /// Sets the minimum blob size considered for deduplication.
    pub fn set_blob_dedup_min_size(&mut self, min_size: usize) -> &mut Self {
        self.blob_dedup_config.min_size = min_size;
        self
    }

    /// Sets the change-data-capture configuration.
    pub fn set_cdc(&mut self, config: CdcConfig) -> &mut Self {
        self.cdc_config = config;
        self
    }

    /// Sets how many history snapshots to retain. With 0, stored snapshots are purged on open.
    pub fn set_history_retention(&mut self, max_snapshots: u64) -> &mut Self {
        self.history_retention = max_snapshots;
        self
    }

    /// Sets the read-verification mode.
    pub fn set_read_verification(&mut self, mode: ReadVerification) -> &mut Self {
        self.read_verification = mode;
        self
    }

    /// Sets the callback that decides the action taken by read verification.
    pub fn set_read_verification_callback(
        &mut self,
        callback: impl Fn(u64) -> ReadVerificationAction + Send + Sync + 'static,
    ) -> &mut Self {
        self.read_verification_callback = Some(Arc::new(callback));
        self
    }

    /// Creates the database file at `path`, or opens it if it already exists. An existing
    /// file is not truncated.
    #[cfg(feature = "std")]
    pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(path)?;
        self.build_database(Box::new(FileBackend::new(file)?), true, self.region_size)
    }

    /// Opens an existing database file at `path` for reading and writing.
    #[cfg(feature = "std")]
    pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        let file = OpenOptions::new().read(true).write(true).open(path)?;
        self.build_database(Box::new(FileBackend::new(file)?), false, None)
    }

    /// Opens an existing database file at `path` in read-only mode.
    #[cfg(feature = "std")]
    pub fn open_read_only(
        &self,
        path: impl AsRef<Path>,
    ) -> Result<ReadOnlyDatabase, DatabaseError> {
        let file = OpenOptions::new().read(true).open(path)?;
        ReadOnlyDatabase::new(
            Box::new(FileBackend::new_internal(file, true)?),
            self.page_size,
            None,
            self.read_cache_size_bytes,
            self.compression,
            self.memory_budget,
            self.read_verification,
            self.read_verification_callback.clone(),
        )
    }

    /// Creates (or opens) a database on an already-open `file`.
    #[cfg(feature = "std")]
    pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
        self.build_database(Box::new(FileBackend::new(file)?), true, self.region_size)
    }

    /// Creates (or opens) a database on a custom storage backend.
    pub fn create_with_backend(
        &self,
        backend: impl StorageBackend,
    ) -> Result<Database, DatabaseError> {
        self.build_database(Box::new(backend), true, self.region_size)
    }

    /// Shared tail of every constructor that yields a writable [`Database`]. It forwards all
    /// builder settings to `Database::new`.
    ///
    /// The create paths pass `allow_initialize = true` and the configured region size;
    /// `open` passes `false` and `None`.
    fn build_database(
        &self,
        backend: Box<dyn StorageBackend>,
        allow_initialize: bool,
        region_size: Option<u64>,
    ) -> Result<Database, DatabaseError> {
        Database::new(
            backend,
            allow_initialize,
            self.page_size,
            region_size,
            self.read_cache_size_bytes,
            self.write_cache_size_bytes,
            &self.repair_callback,
            self.compression,
            self.blob_dedup_config.clone(),
            self.memory_budget,
            self.cdc_config.clone(),
            self.history_retention,
            self.read_verification,
            self.read_verification_callback.clone(),
            self.blob_compaction_policy,
            self.resolve_observer(),
            #[cfg(feature = "metrics")]
            Self::resolve_metrics(),
        )
    }

    /// Returns the configured observer, or the default observer if none was set.
    fn resolve_observer(&self) -> Arc<dyn DatabaseObserver> {
        self.observer
            .as_ref()
            .map_or_else(default_observer, Arc::clone)
    }

    /// Creates a fresh metrics instance for a new database.
    #[cfg(feature = "metrics")]
    fn resolve_metrics() -> Arc<DbMetrics> {
        Arc::new(DbMetrics::new())
    }
}
// Debug output shows only the type name (`Database`) and none of its fields.
impl Debug for Database {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
f.debug_struct("Database").finish()
}
}
/// Tests for `Database`: injected I/O failures, small page sizes, file shrinking,
/// open/create edge cases, salvage, and online compaction.
#[cfg(test)]
mod test {
    use crate::backends::FileBackend;
    use crate::error::BackendError;
    use crate::{
        CommitError, Database, DatabaseError, Durability, ReadableDatabase, ReadableTable,
        ReadableTableMetadata, StorageBackend, StorageError, TableDefinition, TransactionError,
    };
    use core::sync::atomic::Ordering;
    use portable_atomic::AtomicU64;
    use std::fs::File;
    use std::io::{ErrorKind, Read, Seek, SeekFrom};
    use std::sync::Arc;

    /// The error reported by every injected failure: a generic `ErrorKind::Other` I/O error.
    fn injected_failure() -> BackendError {
        BackendError::from(std::io::Error::from(ErrorKind::Other))
    }

    /// A [`StorageBackend`] wrapping a [`FileBackend`] that starts failing after a
    /// configurable number of writes.
    ///
    /// Each `write` consumes one unit of `countdown`. Once it reaches zero, further
    /// writes fail, and so do all `read` and `sync_data` calls. `len` and `set_len`
    /// always pass straight through.
    ///
    /// The counter is shared through an `Arc`, so a test can trip or re-arm it after
    /// the backend has been handed to a `Database`.
    #[derive(Debug)]
    struct FailingBackend {
        inner: FileBackend,
        countdown: Arc<AtomicU64>,
    }

    impl FailingBackend {
        /// Wraps `backend`, letting the first `countdown` writes succeed.
        fn new(backend: FileBackend, countdown: u64) -> Self {
            Self {
                inner: backend,
                countdown: Arc::new(AtomicU64::new(countdown)),
            }
        }

        /// Fails if the write budget is exhausted. Does not consume any budget.
        fn check_countdown(&self) -> Result<(), BackendError> {
            if self.countdown.load(Ordering::SeqCst) == 0 {
                Err(injected_failure())
            } else {
                Ok(())
            }
        }

        /// Atomically consumes one unit of the write budget.
        ///
        /// Fails, without underflowing, if the budget is already zero.
        fn decrement_countdown(&self) -> Result<(), BackendError> {
            self.countdown
                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |remaining| {
                    remaining.checked_sub(1)
                })
                .map(|_| ())
                .map_err(|_| injected_failure())
        }
    }

    impl StorageBackend for FailingBackend {
        fn len(&self) -> Result<u64, BackendError> {
            self.inner.len()
        }

        /// Fails once the write budget is exhausted.
        fn read(&self, offset: u64, out: &mut [u8]) -> Result<(), BackendError> {
            self.check_countdown()?;
            self.inner.read(offset, out)
        }

        fn set_len(&self, len: u64) -> Result<(), BackendError> {
            self.inner.set_len(len)
        }

        /// Fails once the write budget is exhausted.
        fn sync_data(&self) -> Result<(), BackendError> {
            self.check_countdown()?;
            self.inner.sync_data()
        }

        /// Consumes one unit of the write budget, failing if none is left.
        fn write(&self, offset: u64, data: &[u8]) -> Result<(), BackendError> {
            self.decrement_countdown()?;
            self.inner.write(offset, data)
        }
    }

    // Regression test: a commit that fails because the backend ran out of write
    // budget must still allow `Database::builder().create` on the same path after
    // the handle is dropped.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 20);
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();
        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        let result = tx.commit();
        assert!(result.is_err());
        drop(db);

        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(&path)
            .unwrap();
    }

    // An I/O error during commit poisons the handle: later `begin_write` calls fail
    // with `PreviousIo` even after the backend recovers.
    #[test]
    fn transient_io_error() {
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
        let countdown = backend.countdown.clone();
        let db = Database::builder()
            .set_cache_size(0)
            .create_with_backend(backend)
            .unwrap();
        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 0).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 1).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        // Trip the backend: from now on every read, write, and sync fails.
        countdown.store(0, Ordering::SeqCst);
        assert!(matches!(
            tx.commit(),
            Err(CommitError::Storage(StorageError::Io(_)))
        ));
        assert!(matches!(
            db.begin_write(),
            Err(TransactionError::Storage(StorageError::PreviousIo))
        ));

        countdown.store(u64::MAX, Ordering::SeqCst);
        drop(db);

        // After close, bit 0b10 of the header byte at offset 9 must be set.
        // NOTE(review): presumably the "recovery required" flag in the god byte;
        // confirm against the header layout.
        let mut file = File::open(&path).unwrap();
        file.seek(SeekFrom::Start(9)).unwrap();
        let mut god_byte = [0u8; 1];
        file.read_exact(&mut god_byte).unwrap();
        assert_ne!(god_byte[0] & 2, 0);
    }

    #[test]
    fn small_pages() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();
        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
        let txn = db.begin_write().unwrap();
        {
            txn.open_table(table_definition).unwrap();
        }
        txn.commit().unwrap();
    }

    // Exercises a fixed sequence of ephemeral savepoint create/restore/drop operations
    // with two-phase commit on 512-byte pages.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();
        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        // savepoint4 was created in the aborted transaction above; restoring it must fail.
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }

    #[test]
    fn small_pages3() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder()
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();
        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        let _savepoint0 = tx.ephemeral_savepoint().unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 306];
            t.insert(&539717, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint1).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 2008];
            t.insert(&784384, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();
    }

    #[test]
    fn small_pages4() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();
        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&131072).unwrap().is_none());
            let value = vec![0xFF; 1130];
            t.insert(&42394, value.as_slice()).unwrap();
            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
            assert!(t.get(&0).unwrap().is_none());
        }
        tx.abort().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
        }
        tx.abort().unwrap();
    }

    // After filling a table and then deleting everything, a few further commits
    // must leave the file smaller than its peak size.
    #[test]
    fn dynamic_shrink() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
        let big_value = vec![0u8; 1024];
        let db = Database::builder()
            .set_region_size(1024 * 1024)
            .create(tmpfile.path())
            .unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..2048 {
                table.insert(&i, big_value.as_slice()).unwrap();
            }
        }
        txn.commit().unwrap();
        let file_size = tmpfile.as_file().metadata().unwrap().len();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..2048 {
                table.remove(&i).unwrap();
            }
        }
        txn.commit().unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            table.insert(0, [].as_slice()).unwrap();
        }
        txn.commit().unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            table.remove(0).unwrap();
        }
        txn.commit().unwrap();

        let txn = db.begin_write().unwrap();
        txn.commit().unwrap();

        let final_file_size = tmpfile.as_file().metadata().unwrap().len();
        assert!(final_file_size < file_size);
    }

    #[test]
    fn create_new_db_in_empty_file() {
        let tmpfile = crate::create_tempfile();
        let _db = Database::builder()
            .create_file(tmpfile.into_file())
            .unwrap();
    }

    // Opening a path that does not exist must surface the OS `NotFound` error.
    #[test]
    fn open_missing_file() {
        let tmpfile = crate::create_tempfile();
        let err = Database::builder()
            .open(tmpfile.path().with_extension("missing"))
            .unwrap_err();
        match err {
            DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
            err => panic!("Unexpected error for missing file: {err}"),
        }
    }

    // An existing but zero-length file is not a valid database and must report a format error.
    #[test]
    fn open_empty_file() {
        let tmpfile = crate::create_tempfile();
        let err = Database::builder().open(tmpfile.path()).unwrap_err();
        match err {
            DatabaseError::Storage(StorageError::FormatError { .. }) => {}
            err => panic!("Unexpected error for empty file: {err}"),
        }
    }

    // Salvaging an intact database recovers every table and row with no losses.
    #[test]
    fn salvage_valid_database() {
        const T1: TableDefinition<&str, u64> = TableDefinition::new("users");
        const T2: TableDefinition<u64, &[u8]> = TableDefinition::new("blobs");
        let src = crate::create_tempfile();
        let dst = crate::create_tempfile();
        {
            let db = Database::create(src.path()).unwrap();
            let txn = db.begin_write().unwrap();
            {
                let mut t = txn.open_table(T1).unwrap();
                t.insert("alice", &1u64).unwrap();
                t.insert("bob", &2u64).unwrap();
                t.insert("charlie", &3u64).unwrap();
            }
            {
                let mut t = txn.open_table(T2).unwrap();
                t.insert(100u64, b"hello".as_slice()).unwrap();
                t.insert(200u64, b"world".as_slice()).unwrap();
            }
            txn.commit().unwrap();
        }

        let report = Database::salvage(src.path(), dst.path()).unwrap();
        assert_eq!(report.tables_found, 2);
        assert_eq!(report.tables_recovered, 2);
        assert!(
            report.rows_recovered >= 5,
            "expected >= 5 rows, got {rows}",
            rows = report.rows_recovered
        );
        assert_eq!(report.rows_lost, 0);
        assert!(report.corrupt_details.is_empty());

        // Salvaged tables are read back with raw byte key/value types.
        // NOTE(review): presumably salvage does not preserve the original key/value
        // type metadata; confirm.
        let db = Database::open(dst.path()).unwrap();
        let txn = db.begin_read().unwrap();
        {
            let raw: TableDefinition<&[u8], &[u8]> = TableDefinition::new("users");
            let t = txn.open_table(raw).unwrap();
            assert_eq!(t.len().unwrap(), 3);
        }
        {
            let raw: TableDefinition<&[u8], &[u8]> = TableDefinition::new("blobs");
            let t = txn.open_table(raw).unwrap();
            assert_eq!(t.len().unwrap(), 2);
        }
    }

    #[test]
    fn salvage_empty_file_returns_error() {
        let src = crate::create_tempfile();
        let dst = crate::create_tempfile();
        let result = Database::salvage(src.path(), dst.path());
        assert!(result.is_err());
    }

    // Overwriting 4 KiB in the middle of the file with 0xFF must not make salvage
    // fail outright; it should still find something.
    #[test]
    fn salvage_with_data_corruption() {
        const TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("data");
        let src = crate::create_tempfile();
        let dst = crate::create_tempfile();
        {
            let db = Database::create(src.path()).unwrap();
            let txn = db.begin_write().unwrap();
            {
                let mut t = txn.open_table(TABLE).unwrap();
                let payload = [0xABu8; 200];
                for i in 0..500u64 {
                    t.insert(i, payload.as_slice()).unwrap();
                }
            }
            txn.commit().unwrap();
        }
        {
            // `Seek`/`SeekFrom` are already imported at module level.
            use std::io::Write;
            let mut f = std::fs::OpenOptions::new()
                .write(true)
                .open(src.path())
                .unwrap();
            let file_len = f.metadata().unwrap().len();
            let corrupt_offset = file_len / 3;
            f.seek(SeekFrom::Start(corrupt_offset)).unwrap();
            f.write_all(&[0xFF; 4096]).unwrap();
            f.sync_all().unwrap();
        }

        let report = Database::salvage(src.path(), dst.path()).unwrap();
        assert!(
            report.rows_recovered > 0 || report.tables_found > 0,
            "expected some recovery, got: {report:?}"
        );
    }

    // Compaction after deleting 90% of the rows shrinks the file and keeps the survivors intact.
    #[test]
    fn online_compaction_reduces_file_size() {
        const TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("data");
        let tmpfile = crate::create_tempfile();
        let db = Database::create(tmpfile.path()).unwrap();
        let payload = [0xCDu8; 512];

        let txn = db.begin_write().unwrap();
        {
            let mut t = txn.open_table(TABLE).unwrap();
            for i in 0..500u64 {
                t.insert(i, payload.as_slice()).unwrap();
            }
        }
        txn.commit().unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut t = txn.open_table(TABLE).unwrap();
            for i in 0..450u64 {
                t.remove(i).unwrap();
            }
        }
        txn.commit().unwrap();

        let size_before = std::fs::metadata(tmpfile.path()).unwrap().len();
        let handle = db.start_compaction().unwrap();
        let steps = handle.run().unwrap();
        assert!(steps > 0, "expected at least one compaction step");
        let size_after = std::fs::metadata(tmpfile.path()).unwrap().len();
        assert!(
            size_after < size_before,
            "file should shrink: before={size_before}, after={size_after}"
        );

        let txn = db.begin_read().unwrap();
        let t = txn.open_table(TABLE).unwrap();
        assert_eq!(t.len().unwrap(), 50);
        for i in 450..500u64 {
            let val = t.get(i).unwrap().unwrap();
            assert_eq!(val.value(), payload.as_slice());
        }
    }

    // A read transaction opened before a compaction step keeps seeing consistent data.
    #[test]
    fn online_compaction_allows_concurrent_reads() {
        const TABLE: TableDefinition<u64, u64> = TableDefinition::new("nums");
        let tmpfile = crate::create_tempfile();
        let db = Database::create(tmpfile.path()).unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut t = txn.open_table(TABLE).unwrap();
            for i in 0..100u64 {
                t.insert(i, &(i * 10)).unwrap();
            }
        }
        txn.commit().unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut t = txn.open_table(TABLE).unwrap();
            for i in 0..50u64 {
                t.remove(i).unwrap();
            }
        }
        txn.commit().unwrap();

        let read_txn = db.begin_read().unwrap();
        let read_table = read_txn.open_table(TABLE).unwrap();
        let handle = db.start_compaction().unwrap();
        // Keep the step result alive to the end of the test, as before.
        let _progress = handle.step().unwrap();

        assert_eq!(read_table.len().unwrap(), 50);
        for i in 50..100u64 {
            let val = read_table.get(i).unwrap().unwrap();
            assert_eq!(val.value(), i * 10);
        }
    }

    // `start_compaction` must refuse to run while a persistent savepoint exists.
    #[test]
    fn online_compaction_rejects_persistent_savepoint() {
        const TABLE: TableDefinition<u64, u64> = TableDefinition::new("sp_test");
        let tmpfile = crate::create_tempfile();
        let db = Database::create(tmpfile.path()).unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut t = txn.open_table(TABLE).unwrap();
            t.insert(1u64, &1u64).unwrap();
        }
        txn.commit().unwrap();

        let txn = db.begin_write().unwrap();
        let _sp = txn.persistent_savepoint().unwrap();
        txn.commit().unwrap();

        let result = db.start_compaction();
        assert!(result.is_err());
    }

    // The default cache size must lie within [16 MiB, 1 GiB].
    #[test]
    fn default_cache_size_within_bounds() {
        let size = crate::db::Builder::default_cache_size();
        let min = 16 * 1024 * 1024;
        let max = 1024 * 1024 * 1024;
        assert!(
            (min..=max).contains(&size),
            "default_cache_size {size} outside [{min}, {max}]"
        );
    }
}