use crate::transaction_tracker::TransactionId;
use crate::tree_store::Checksum;
use crate::tree_store::btree_base::BtreeHeader;
use crate::tree_store::page_store::compression::CompressionConfig;
use crate::tree_store::page_store::layout::{DatabaseLayout, RegionLayout};
use crate::tree_store::page_store::page_manager::{
FILE_FORMAT_VERSION1, FILE_FORMAT_VERSION2, FILE_FORMAT_VERSION3, FILE_FORMAT_VERSION4,
FILE_FORMAT_VERSION5, xxh3_checksum,
};
use crate::{DatabaseError, Result, StorageError};
use alloc::format;
use alloc::string::ToString;
use core::mem::size_of;
/// Magic number stored at offset 0 of the database file.
///
/// It mixes printable ASCII with control and high-bit bytes so the file is not
/// valid UTF-8 text (see the `magic_number` test).
pub(super) const MAGICNUMBER: [u8; 9] = [b'r', b'e', b'd', b'b', 0x1A, 0x0A, 0xA9, 0x0D, 0x0A];
/// Variant of [`MAGICNUMBER`] that differs only in its first byte (`b'm'`).
///
/// NOTE(review): presumably tags a mirrored copy of the header; its readers and
/// writers live outside this file — confirm.
pub(super) const MIRROR_MAGIC: [u8; 9] = [b'm', b'e', b'd', b'b', 0x1A, 0x0A, 0xA9, 0x0D, 0x0A];

// ---- File header layout (byte offsets from the start of the file) ----

/// The "god byte" carries the `PRIMARY_BIT`, `RECOVERY_REQUIRED` and
/// `TWO_PHASE_COMMIT` flags.
const GOD_BYTE_OFFSET: usize = MAGICNUMBER.len();
/// Two bytes of padding follow the god byte, so the `u32` fields below start at
/// a 4-byte-aligned offset.
const PAGE_SIZE_OFFSET: usize = GOD_BYTE_OFFSET + size_of::<u8>() + 2;
const REGION_HEADER_PAGES_OFFSET: usize = PAGE_SIZE_OFFSET + size_of::<u32>();
const REGION_MAX_DATA_PAGES_OFFSET: usize = REGION_HEADER_PAGES_OFFSET + size_of::<u32>();
const NUM_FULL_REGIONS_OFFSET: usize = REGION_MAX_DATA_PAGES_OFFSET + size_of::<u32>();
const TRAILING_REGION_DATA_PAGES_OFFSET: usize = NUM_FULL_REGIONS_OFFSET + size_of::<u32>();
const _UNUSED3_OFFSET: usize = TRAILING_REGION_DATA_PAGES_OFFSET + size_of::<u32>();
/// Single byte holding the compression setting; only decoded for V4+ slots.
const COMPRESSION_ALGO_OFFSET: usize = _UNUSED3_OFFSET + size_of::<u32>();
/// Size in bytes of each of the two commit slots.
const TRANSACTION_SIZE: usize = 128;
const TRANSACTION_0_OFFSET: usize = 64;
const TRANSACTION_1_OFFSET: usize = TRANSACTION_0_OFFSET + TRANSACTION_SIZE;
/// Total size of the serialised header: fixed fields plus both commit slots.
pub(super) const DB_HEADER_SIZE: usize = TRANSACTION_1_OFFSET + TRANSACTION_SIZE;

// ---- God byte flags ----

const PRIMARY_BIT: u8 = 1;
const RECOVERY_REQUIRED: u8 = 2;
const TWO_PHASE_COMMIT: u8 = 4;

// ---- Commit slot layout (byte offsets within one TRANSACTION_SIZE slot) ----

const VERSION_OFFSET: usize = 0;
const USER_ROOT_NON_NULL_OFFSET: usize = size_of::<u8>();
const SYSTEM_ROOT_NON_NULL_OFFSET: usize = USER_ROOT_NON_NULL_OFFSET + size_of::<u8>();
const _UNUSED_OFFSET: usize = SYSTEM_ROOT_NON_NULL_OFFSET + size_of::<u8>();
const PADDING: usize = 4;
const USER_ROOT_OFFSET: usize = _UNUSED_OFFSET + size_of::<u8>() + PADDING;
const SYSTEM_ROOT_OFFSET: usize = USER_ROOT_OFFSET + BtreeHeader::serialized_size();
const _UNUSED2_OFFSET: usize = SYSTEM_ROOT_OFFSET + BtreeHeader::serialized_size();
// V5 slots store their blob bookkeeping in the formerly unused area that starts
// at `_UNUSED2_OFFSET` and ends at `TRANSACTION_ID_OFFSET`.
const BLOB_REGION_OFFSET_OFFSET: usize = _UNUSED2_OFFSET;
const BLOB_REGION_LENGTH_OFFSET: usize = BLOB_REGION_OFFSET_OFFSET + size_of::<u64>();
const BLOB_NEXT_SEQUENCE_OFFSET: usize = BLOB_REGION_LENGTH_OFFSET + size_of::<u64>();
const BLOB_HLC_STATE_OFFSET: usize = BLOB_NEXT_SEQUENCE_OFFSET + size_of::<u64>();
const TRANSACTION_ID_OFFSET: usize = _UNUSED2_OFFSET + BtreeHeader::serialized_size();
const TRANSACTION_LAST_FIELD: usize = TRANSACTION_ID_OFFSET + size_of::<u64>();
/// The slot checksum occupies the last bytes of the slot and covers everything before it.
const SLOT_CHECKSUM_OFFSET: usize = TRANSACTION_SIZE - size_of::<Checksum>();

// Compile-time layout invariants. The fixed header fields must end before the
// first commit slot. The V5 blob fields must fit in the reserved area before
// the transaction id, which in turn must end before the slot checksum.
const _: () = assert!(COMPRESSION_ALGO_OFFSET + size_of::<u8>() <= TRANSACTION_0_OFFSET);
const _: () = assert!(BLOB_HLC_STATE_OFFSET + size_of::<u64>() <= TRANSACTION_ID_OFFSET);
const _: () = assert!(TRANSACTION_LAST_FIELD <= SLOT_CHECKSUM_OFFSET);

/// Crate-wide page size constant (not referenced elsewhere in this file).
pub(crate) const PAGE_SIZE: usize = 4096;
/// Decodes a little-endian `u32` from the first four bytes of `data`.
///
/// Bytes past the fourth are ignored. A slice shorter than four bytes decodes
/// as `0` rather than panicking.
fn get_u32(data: &[u8]) -> u32 {
    match data.get(..size_of::<u32>()) {
        Some(head) => {
            let mut raw = [0u8; 4];
            raw.copy_from_slice(head);
            u32::from_le_bytes(raw)
        }
        None => 0,
    }
}
/// Decodes a little-endian `u64` from the first eight bytes of `data`.
///
/// Bytes past the eighth are ignored. A slice shorter than eight bytes decodes
/// as `0` rather than panicking.
fn get_u64(data: &[u8]) -> u64 {
    if let Some(head) = data.get(..size_of::<u64>()) {
        // Walk from the most significant (last) byte down, shifting each in.
        head.iter()
            .rev()
            .fold(0u64, |acc, &byte| (acc << 8) | u64::from(byte))
    } else {
        0
    }
}
/// Validation findings produced while parsing a header.
///
/// Returned alongside the header by `DatabaseHeader::from_bytes`.
/// `DatabaseHeader::pick_primary_for_repair` consumes the two corruption flags.
#[derive(Clone, Copy)]
pub(super) struct HeaderRepairInfo {
    /// `true` when the leading bytes did not equal `MAGICNUMBER`.
    pub(super) invalid_magic_number: bool,
    /// `true` when the slot selected by the god byte failed its checksum.
    pub(super) primary_corrupted: bool,
    /// `true` when the other commit slot failed its checksum.
    pub(super) secondary_corrupted: bool,
}
/// In-memory form of the fixed-size database file header.
///
/// It holds the file-wide flags, the region geometry, the compression setting
/// and two commit slots. `primary_slot` selects which slot is current. The
/// on-disk encoding is defined by `from_bytes` / `to_bytes`.
#[derive(Clone)]
pub(super) struct DatabaseHeader {
    /// Index (0 or 1) into `transaction_slots` of the current primary slot.
    primary_slot: usize,
    /// Corresponds to the `RECOVERY_REQUIRED` bit of the god byte.
    pub(super) recovery_required: bool,
    /// Corresponds to the `TWO_PHASE_COMMIT` bit of the god byte.
    pub(super) two_phase_commit: bool,
    /// Page size passed to every `RegionLayout` built from this header.
    page_size: u32,
    /// Number of header pages per region (full and trailing alike).
    region_header_pages: u32,
    /// Number of data pages in each full region.
    region_max_data_pages: u32,
    /// Number of full regions.
    full_regions: u32,
    /// Data pages in the trailing partial region; `0` means there is none.
    trailing_partial_region_pages: u32,
    /// Compression setting; only decoded from disk when the primary slot is V4+.
    pub(super) compression: CompressionConfig,
    /// The two commit slots, indexed by `primary_slot` / `primary_slot ^ 1`.
    transaction_slots: [TransactionHeader; 2],
}
impl DatabaseHeader {
pub(super) fn new(
layout: DatabaseLayout,
transaction_id: TransactionId,
compression: CompressionConfig,
) -> Self {
#[allow(clippy::assertions_on_constants)]
{
assert!(TRANSACTION_LAST_FIELD <= SLOT_CHECKSUM_OFFSET);
}
let version = if compression.is_enabled() {
FILE_FORMAT_VERSION4
} else {
FILE_FORMAT_VERSION3
};
let slot = TransactionHeader::new(transaction_id, version);
Self {
primary_slot: 0,
recovery_required: true,
two_phase_commit: false,
page_size: layout.full_region_layout().page_size(),
region_header_pages: layout.full_region_layout().get_header_pages(),
region_max_data_pages: layout.full_region_layout().num_pages(),
full_regions: layout.num_full_regions(),
trailing_partial_region_pages: layout
.trailing_region_layout()
.map(|x| x.num_pages())
.unwrap_or_default(),
compression,
transaction_slots: [slot.clone(), slot],
}
}
pub(super) fn page_size(&self) -> u32 {
self.page_size
}
pub(super) fn layout(&self) -> DatabaseLayout {
let full_layout = RegionLayout::new(
self.region_max_data_pages,
self.region_header_pages,
self.page_size,
);
let trailing = if self.trailing_partial_region_pages > 0 {
Some(RegionLayout::new(
self.trailing_partial_region_pages,
self.region_header_pages,
self.page_size,
))
} else {
None
};
DatabaseLayout::new(self.full_regions, full_layout, trailing)
}
pub(super) fn set_layout(&mut self, layout: DatabaseLayout) {
assert_eq!(
self.layout().full_region_layout(),
layout.full_region_layout()
);
if let Some(trailing) = layout.trailing_region_layout() {
assert_eq!(trailing.get_header_pages(), self.region_header_pages);
assert_eq!(trailing.page_size(), self.page_size);
self.trailing_partial_region_pages = trailing.num_pages();
} else {
self.trailing_partial_region_pages = 0;
}
self.full_regions = layout.num_full_regions();
}
pub(super) fn primary_slot(&self) -> &TransactionHeader {
&self.transaction_slots[self.primary_slot]
}
pub(super) fn secondary_slot(&self) -> &TransactionHeader {
&self.transaction_slots[self.primary_slot ^ 1]
}
pub(super) fn secondary_slot_mut(&mut self) -> &mut TransactionHeader {
&mut self.transaction_slots[self.primary_slot ^ 1]
}
pub(super) fn swap_primary_slot(&mut self) {
self.primary_slot ^= 1;
}
pub(super) fn pick_primary_for_repair(
&mut self,
repair_info: HeaderRepairInfo,
) -> Result<bool> {
if self.two_phase_commit {
if repair_info.primary_corrupted {
return Err(StorageError::Corrupted(
"Primary is corrupted despite 2-phase commit".to_string(),
));
}
return Ok(true);
}
if repair_info.primary_corrupted {
if repair_info.secondary_corrupted {
return Err(StorageError::Corrupted(
"Both commit slots are corrupted".to_string(),
));
}
self.swap_primary_slot();
return Ok(false);
}
let secondary_newer =
self.secondary_slot().transaction_id > self.primary_slot().transaction_id;
if secondary_newer && !repair_info.secondary_corrupted {
self.swap_primary_slot();
return Ok(false);
}
Ok(true)
}
pub(super) fn from_bytes(data: &[u8]) -> Result<(Self, HeaderRepairInfo), DatabaseError> {
let invalid_magic_number = data[..MAGICNUMBER.len()] != MAGICNUMBER;
let primary_slot = usize::from(data[GOD_BYTE_OFFSET] & PRIMARY_BIT != 0);
let recovery_required = (data[GOD_BYTE_OFFSET] & RECOVERY_REQUIRED) != 0;
let two_phase_commit = (data[GOD_BYTE_OFFSET] & TWO_PHASE_COMMIT) != 0;
let page_size = get_u32(&data[PAGE_SIZE_OFFSET..]);
let region_header_pages = get_u32(&data[REGION_HEADER_PAGES_OFFSET..]);
let region_max_data_pages = get_u32(&data[REGION_MAX_DATA_PAGES_OFFSET..]);
let full_regions = get_u32(&data[NUM_FULL_REGIONS_OFFSET..]);
let trailing_data_pages = get_u32(&data[TRAILING_REGION_DATA_PAGES_OFFSET..]);
let (slot0, slot0_corrupted) = TransactionHeader::from_bytes(
&data[TRANSACTION_0_OFFSET..(TRANSACTION_0_OFFSET + TRANSACTION_SIZE)],
)?;
let (slot1, slot1_corrupted) = TransactionHeader::from_bytes(
&data[TRANSACTION_1_OFFSET..(TRANSACTION_1_OFFSET + TRANSACTION_SIZE)],
)?;
let (primary_corrupted, secondary_corrupted) = if primary_slot == 0 {
(slot0_corrupted, slot1_corrupted)
} else {
(slot1_corrupted, slot0_corrupted)
};
let primary = if primary_slot == 0 { &slot0 } else { &slot1 };
let compression = if primary.version >= FILE_FORMAT_VERSION4 {
CompressionConfig::from_header_byte(data[COMPRESSION_ALGO_OFFSET])?
} else {
CompressionConfig::None
};
let result = Self {
primary_slot,
recovery_required,
two_phase_commit,
page_size,
region_header_pages,
region_max_data_pages,
full_regions,
trailing_partial_region_pages: trailing_data_pages,
compression,
transaction_slots: [slot0, slot1],
};
let repair = HeaderRepairInfo {
invalid_magic_number,
primary_corrupted,
secondary_corrupted,
};
Ok((result, repair))
}
pub(super) fn to_bytes(&self, include_magic_number: bool) -> [u8; DB_HEADER_SIZE] {
let mut result = [0; DB_HEADER_SIZE];
if include_magic_number {
result[..MAGICNUMBER.len()].copy_from_slice(&MAGICNUMBER);
}
result[GOD_BYTE_OFFSET] = self.primary_slot.try_into().unwrap();
if self.recovery_required {
result[GOD_BYTE_OFFSET] |= RECOVERY_REQUIRED;
}
if self.two_phase_commit {
result[GOD_BYTE_OFFSET] |= TWO_PHASE_COMMIT;
}
result[PAGE_SIZE_OFFSET..(PAGE_SIZE_OFFSET + size_of::<u32>())]
.copy_from_slice(&self.page_size.to_le_bytes());
result[REGION_HEADER_PAGES_OFFSET..(REGION_HEADER_PAGES_OFFSET + size_of::<u32>())]
.copy_from_slice(&self.region_header_pages.to_le_bytes());
result[REGION_MAX_DATA_PAGES_OFFSET..(REGION_MAX_DATA_PAGES_OFFSET + size_of::<u32>())]
.copy_from_slice(&self.region_max_data_pages.to_le_bytes());
result[NUM_FULL_REGIONS_OFFSET..(NUM_FULL_REGIONS_OFFSET + size_of::<u32>())]
.copy_from_slice(&self.full_regions.to_le_bytes());
result[TRAILING_REGION_DATA_PAGES_OFFSET
..(TRAILING_REGION_DATA_PAGES_OFFSET + size_of::<u32>())]
.copy_from_slice(&self.trailing_partial_region_pages.to_le_bytes());
result[COMPRESSION_ALGO_OFFSET] = self.compression.header_byte();
let slot0 = self.transaction_slots[0].to_bytes();
result[TRANSACTION_0_OFFSET..(TRANSACTION_0_OFFSET + slot0.len())].copy_from_slice(&slot0);
let slot1 = self.transaction_slots[1].to_bytes();
result[TRANSACTION_1_OFFSET..(TRANSACTION_1_OFFSET + slot1.len())].copy_from_slice(&slot1);
result
}
}
/// One commit slot of the database header.
///
/// It records the roots and transaction id of a committed transaction, plus
/// the blob bookkeeping fields used by V5 slots. The on-disk encoding is
/// defined by `from_bytes` / `to_bytes`.
#[derive(Clone)]
pub(super) struct TransactionHeader {
    /// File format version of this slot (3, 4 or 5 are accepted).
    pub(super) version: u8,
    /// Root of the user tree, if any.
    pub(super) user_root: Option<BtreeHeader>,
    /// Root of the system tree, if any.
    pub(super) system_root: Option<BtreeHeader>,
    /// Id of the transaction this slot was written for.
    pub(super) transaction_id: TransactionId,
    /// Blob bookkeeping (V5 only; read as 0 for older slots).
    pub(super) blob_region_offset: u64,
    /// Blob bookkeeping (V5 only; read as 0 for older slots).
    pub(super) blob_region_length: u64,
    /// Blob bookkeeping (V5 only; read as 0 for older slots).
    pub(super) blob_next_sequence: u64,
    /// Opaque state persisted for V5 slots; semantics are defined outside this file.
    pub(super) blob_hlc_state: u64,
}
impl TransactionHeader {
fn new(transaction_id: TransactionId, version: u8) -> Self {
Self {
version,
user_root: None,
system_root: None,
transaction_id,
blob_region_offset: 0,
blob_region_length: 0,
blob_next_sequence: 0,
blob_hlc_state: 0,
}
}
pub(super) fn from_bytes(data: &[u8]) -> Result<(Self, bool), DatabaseError> {
let version = data[VERSION_OFFSET];
match version {
FILE_FORMAT_VERSION1 | FILE_FORMAT_VERSION2 => {
return Err(DatabaseError::UpgradeRequired(version));
}
FILE_FORMAT_VERSION3 | FILE_FORMAT_VERSION4 | FILE_FORMAT_VERSION5 => {}
_ => {
return Err(StorageError::format_error(format!(
"Expected file format version <= {FILE_FORMAT_VERSION5}, found {version}",
))
.into());
}
}
let checksum = Checksum::from_le_bytes(
data[SLOT_CHECKSUM_OFFSET..(SLOT_CHECKSUM_OFFSET + size_of::<Checksum>())]
.try_into()
.map_err(|_| {
StorageError::Corrupted("commit slot: checksum field truncated".into())
})?,
);
let corrupted = checksum != xxh3_checksum(&data[..SLOT_CHECKSUM_OFFSET]);
let user_root = if data[USER_ROOT_NON_NULL_OFFSET] != 0 {
Some(BtreeHeader::from_le_bytes(
data[USER_ROOT_OFFSET..(USER_ROOT_OFFSET + BtreeHeader::serialized_size())]
.try_into()
.map_err(|_| {
StorageError::Corrupted("commit slot: user_root truncated".into())
})?,
))
} else {
None
};
let system_root = if data[SYSTEM_ROOT_NON_NULL_OFFSET] != 0 {
Some(BtreeHeader::from_le_bytes(
data[SYSTEM_ROOT_OFFSET..(SYSTEM_ROOT_OFFSET + BtreeHeader::serialized_size())]
.try_into()
.map_err(|_| {
StorageError::Corrupted("commit slot: system_root truncated".into())
})?,
))
} else {
None
};
let transaction_id = TransactionId::new(get_u64(&data[TRANSACTION_ID_OFFSET..]));
let (blob_region_offset, blob_region_length, blob_next_sequence, blob_hlc_state) =
if version >= FILE_FORMAT_VERSION5 {
(
get_u64(&data[BLOB_REGION_OFFSET_OFFSET..]),
get_u64(&data[BLOB_REGION_LENGTH_OFFSET..]),
get_u64(&data[BLOB_NEXT_SEQUENCE_OFFSET..]),
get_u64(&data[BLOB_HLC_STATE_OFFSET..]),
)
} else {
(0, 0, 0, 0)
};
let result = Self {
version,
user_root,
system_root,
transaction_id,
blob_region_offset,
blob_region_length,
blob_next_sequence,
blob_hlc_state,
};
Ok((result, corrupted))
}
pub(super) fn to_bytes(&self) -> [u8; TRANSACTION_SIZE] {
assert!(
self.version == FILE_FORMAT_VERSION3
|| self.version == FILE_FORMAT_VERSION4
|| self.version == FILE_FORMAT_VERSION5
);
let mut result = [0; TRANSACTION_SIZE];
result[VERSION_OFFSET] = self.version;
if let Some(header) = self.user_root {
result[USER_ROOT_NON_NULL_OFFSET] = 1;
result[USER_ROOT_OFFSET..(USER_ROOT_OFFSET + BtreeHeader::serialized_size())]
.copy_from_slice(&header.to_le_bytes());
}
if let Some(header) = self.system_root {
result[SYSTEM_ROOT_NON_NULL_OFFSET] = 1;
result[SYSTEM_ROOT_OFFSET..(SYSTEM_ROOT_OFFSET + BtreeHeader::serialized_size())]
.copy_from_slice(&header.to_le_bytes());
}
result[BLOB_REGION_OFFSET_OFFSET..(BLOB_REGION_OFFSET_OFFSET + size_of::<u64>())]
.copy_from_slice(&self.blob_region_offset.to_le_bytes());
result[BLOB_REGION_LENGTH_OFFSET..(BLOB_REGION_LENGTH_OFFSET + size_of::<u64>())]
.copy_from_slice(&self.blob_region_length.to_le_bytes());
result[BLOB_NEXT_SEQUENCE_OFFSET..(BLOB_NEXT_SEQUENCE_OFFSET + size_of::<u64>())]
.copy_from_slice(&self.blob_next_sequence.to_le_bytes());
result[BLOB_HLC_STATE_OFFSET..(BLOB_HLC_STATE_OFFSET + size_of::<u64>())]
.copy_from_slice(&self.blob_hlc_state.to_le_bytes());
result[TRANSACTION_ID_OFFSET..(TRANSACTION_ID_OFFSET + size_of::<u64>())]
.copy_from_slice(&self.transaction_id.raw_id().to_le_bytes());
let checksum = xxh3_checksum(&result[..SLOT_CHECKSUM_OFFSET]);
result[SLOT_CHECKSUM_OFFSET..(SLOT_CHECKSUM_OFFSET + size_of::<Checksum>())]
.copy_from_slice(&checksum.to_le_bytes());
result
}
}
// Tests for header encoding and crash-recovery paths. Most tests create a
// database, drop it, then patch bytes of the on-disk header directly (god byte,
// commit-slot user roots) to simulate interrupted commits or corruption before
// reopening.
#[cfg(test)]
mod test {
    use crate::backends::FileBackend;
    use crate::db::TableDefinition;
    use crate::error::BackendError;
    use crate::tree_store::page_store::header::{
        DB_HEADER_SIZE, GOD_BYTE_OFFSET, MAGICNUMBER, PRIMARY_BIT, RECOVERY_REQUIRED,
        TRANSACTION_0_OFFSET, TRANSACTION_1_OFFSET, TWO_PHASE_COMMIT, USER_ROOT_OFFSET,
    };
    use crate::{Database, DatabaseError, ReadableTable, StorageBackend};
    use crate::{ReadableDatabase, StorageError};
    use std::fs::OpenOptions;
    use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
    use std::mem::size_of;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    /// Storage backend wrapper whose operations start failing once `fail` is set.
    ///
    /// Every method except `close` returns an I/O error while `fail` is `true`;
    /// `close` is always forwarded to the inner backend.
    #[derive(Debug)]
    struct FailingBackend {
        inner: FileBackend,
        fail: Arc<AtomicBool>,
    }

    impl FailingBackend {
        fn new(backend: FileBackend) -> Self {
            Self {
                inner: backend,
                fail: Arc::new(AtomicBool::new(false)),
            }
        }

        /// Returns an `ErrorKind::Other` I/O error if failure injection is enabled.
        fn check_fail(&self) -> Result<(), BackendError> {
            if self.fail.load(Ordering::SeqCst) {
                return Err(BackendError::from(std::io::Error::from(ErrorKind::Other)));
            }
            Ok(())
        }
    }

    impl StorageBackend for FailingBackend {
        fn len(&self) -> Result<u64, BackendError> {
            self.check_fail()?;
            self.inner.len()
        }

        fn read(&self, offset: u64, out: &mut [u8]) -> Result<(), BackendError> {
            self.check_fail()?;
            self.inner.read(offset, out)
        }

        fn set_len(&self, len: u64) -> Result<(), BackendError> {
            self.check_fail()?;
            self.inner.set_len(len)
        }

        fn sync_data(&self) -> Result<(), BackendError> {
            self.check_fail()?;
            self.inner.sync_data()
        }

        fn write(&self, offset: u64, data: &[u8]) -> Result<(), BackendError> {
            self.check_fail()?;
            self.inner.write(offset, data)
        }

        fn close(&self) -> Result<(), BackendError> {
            self.inner.close()
        }
    }

    // Commits "world", then overwrites it with "world2" using quick repair, and
    // turns on failure injection before dropping the database. The header is
    // then patched to require recovery without two-phase commit, and the
    // primary slot's user root is zeroed so its checksum no longer matches.
    // Reopening is expected to fall back to the other slot and see "world".
    #[test]
    fn repair_allocator_checksums() {
        let tmpfile = crate::create_tempfile();
        let cloned = OpenOptions::new()
            .read(true)
            .write(true)
            .open(tmpfile.path())
            .unwrap();
        let backend = FailingBackend::new(FileBackend::new(cloned).unwrap());
        let fail = backend.fail.clone();
        let db = Database::builder().create_with_backend(backend).unwrap();
        let write_txn = db.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(X).unwrap();
            table.insert("hello", "world").unwrap();
        }
        write_txn.commit().unwrap();
        let read_txn = db.begin_read().unwrap();
        let mut write_txn = db.begin_write().unwrap();
        {
            write_txn.set_quick_repair(true);
            let mut table = write_txn.open_table(X).unwrap();
            table.insert("hello", "world2").unwrap();
        }
        write_txn.commit().unwrap();
        drop(read_txn);
        // From here on every backend call except `close` errors, so the work
        // done while dropping `db` cannot reach the file.
        fail.store(true, Ordering::SeqCst);
        drop(db);

        // Force recovery and clear two-phase commit so slot selection runs.
        let mut file = tmpfile.as_file();
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        let mut buffer = [0u8; 1];
        file.read_exact(&mut buffer).unwrap();
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        buffer[0] |= RECOVERY_REQUIRED;
        buffer[0] &= !TWO_PHASE_COMMIT;
        file.write_all(&buffer).unwrap();

        // Zero the first 16 bytes of the primary slot's user root, which breaks
        // that slot's checksum.
        let primary_slot_offset = if buffer[0] & PRIMARY_BIT == 0 {
            TRANSACTION_0_OFFSET
        } else {
            TRANSACTION_1_OFFSET
        };
        file.seek(SeekFrom::Start(
            (primary_slot_offset + USER_ROOT_OFFSET) as u64,
        ))
        .unwrap();
        file.write_all(&[0; size_of::<u128>()]).unwrap();

        // `mut` is only needed by `check_integrity` in the non-Windows block
        // below, hence the allow.
        #[allow(unused_mut)]
        let mut db2 = Database::create(tmpfile.path()).unwrap();
        let write_txn = db2.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(X).unwrap();
            assert_eq!(table.get("hello").unwrap().unwrap().value(), "world");
            table.insert("hello2", "world2").unwrap();
        }
        write_txn.commit().unwrap();

        // NOTE(review): presumably skipped on Windows because the file is still
        // open by `db2` while it is patched below — confirm.
        #[cfg(not(target_os = "windows"))]
        {
            // Corrupt the current primary slot of the live database;
            // check_integrity is expected to return Ok(false).
            file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
            let mut buffer = [0u8; 1];
            file.read_exact(&mut buffer).unwrap();
            let primary_slot_offset = if buffer[0] & PRIMARY_BIT == 0 {
                TRANSACTION_0_OFFSET
            } else {
                TRANSACTION_1_OFFSET
            };
            file.seek(SeekFrom::Start(
                (primary_slot_offset + USER_ROOT_OFFSET) as u64,
            ))
            .unwrap();
            file.write_all(&[0; size_of::<u128>()]).unwrap();
            assert!(!db2.check_integrity().unwrap());

            // NOTE(review): this god-byte read is never used; both slots are
            // corrupted unconditionally below. It looks like a copy-paste
            // leftover.
            file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
            let mut buffer = [0u8; 1];
            file.read_exact(&mut buffer).unwrap();

            // Corrupt both slots; check_integrity must now fail with Corrupted.
            file.seek(SeekFrom::Start(
                (TRANSACTION_0_OFFSET + USER_ROOT_OFFSET) as u64,
            ))
            .unwrap();
            file.write_all(&[0; size_of::<u128>()]).unwrap();
            file.seek(SeekFrom::Start(
                (TRANSACTION_1_OFFSET + USER_ROOT_OFFSET) as u64,
            ))
            .unwrap();
            file.write_all(&[0; size_of::<u128>()]).unwrap();
            assert!(matches!(
                db2.check_integrity().unwrap_err(),
                DatabaseError::Storage(StorageError::Corrupted(_))
            ));
        }
    }

    // Setting RECOVERY_REQUIRED on an empty database must not prevent reopening.
    #[test]
    fn repair_empty() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder().create(tmpfile.path()).unwrap();
        drop(db);
        let mut file = tmpfile.as_file();
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        let mut buffer = [0u8; 1];
        file.read_exact(&mut buffer).unwrap();
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        buffer[0] |= RECOVERY_REQUIRED;
        file.write_all(&buffer).unwrap();
        Database::open(tmpfile.path()).unwrap();
    }

    // Dropping the Database while a read transaction is alive still closes it.
    // The transaction then reports DatabaseClosed, and the on-disk god byte has
    // RECOVERY_REQUIRED cleared.
    #[test]
    fn close_on_drop() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder()
            .set_cache_size(0)
            .create(tmpfile.path())
            .unwrap();
        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_def).unwrap();
            table.insert(0, 0).unwrap();
        }
        txn.commit().unwrap();
        let txn = db.begin_read().unwrap();
        drop(db);
        assert!(matches!(
            txn.list_tables().err().unwrap(),
            StorageError::DatabaseClosed
        ));
        let mut file = tmpfile.as_file();
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        let mut buffer = [0u8; 1];
        file.read_exact(&mut buffer).unwrap();
        assert_eq!(buffer[0] & RECOVERY_REQUIRED, 0);
        drop(txn);
    }

    // A repair callback that aborts must surface as DatabaseError::RepairAborted.
    #[test]
    fn abort_repair() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder().create(tmpfile.path()).unwrap();
        drop(db);
        let mut file = tmpfile.as_file();
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        let mut buffer = [0u8; 1];
        file.read_exact(&mut buffer).unwrap();
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        buffer[0] |= RECOVERY_REQUIRED;
        buffer[0] &= !TWO_PHASE_COMMIT;
        file.write_all(&buffer).unwrap();
        let err = Database::builder()
            .set_repair_callback(|handle| handle.abort())
            .open(tmpfile.path())
            .unwrap_err();
        assert!(matches!(err, DatabaseError::RepairAborted));
    }

    // Regression test: reopening with RECOVERY_REQUIRED set must succeed after
    // commits that used `insert_reserve`.
    #[test]
    fn repair_insert_reserve_regression() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder().create(tmpfile.path()).unwrap();
        let def: TableDefinition<&str, &[u8]> = TableDefinition::new("x");
        let write_txn = db.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(def).unwrap();
            let mut value = table.insert_reserve("hello", 5).unwrap();
            value.as_mut().copy_from_slice(b"world");
        }
        write_txn.commit().unwrap();
        let write_txn = db.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(def).unwrap();
            let mut value = table.insert_reserve("hello2", 5).unwrap();
            value.as_mut().copy_from_slice(b"world");
        }
        write_txn.commit().unwrap();
        drop(db);
        let mut file = tmpfile.as_file();
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        let mut buffer = [0u8; 1];
        file.read_exact(&mut buffer).unwrap();
        file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
        buffer[0] |= RECOVERY_REQUIRED;
        file.write_all(&buffer).unwrap();
        Database::open(tmpfile.path()).unwrap();
    }

    // The magic number must be hard to mistake for text. It is invalid UTF-8,
    // has a high-bit byte, mixes control and printable bytes, and contains a
    // byte outside the usual "text-like" control set.
    #[test]
    fn magic_number() {
        #[allow(invalid_from_utf8)]
        {
            assert!(std::str::from_utf8(&MAGICNUMBER).is_err());
        }
        assert!(MAGICNUMBER.iter().any(|x| *x & 0x80 != 0));
        assert!(MAGICNUMBER.iter().any(|x| *x < 0x20 || *x > 0x7E));
        assert!(MAGICNUMBER.iter().any(|x| *x >= 0x20 && *x <= 0x7E));
        assert!(MAGICNUMBER.iter().any(|x| *x >= 0xA0));
        assert!(MAGICNUMBER.iter().any(|x| *x < 0x09
            || *x == 0x0B
            || (0x0E <= *x && *x <= 0x1F)
            || (0x7F <= *x && *x <= 0x9F)));
    }

    // MIRROR_MAGIC must differ from MAGICNUMBER only in its first byte.
    #[test]
    fn mirror_magic_distinct() {
        use super::MIRROR_MAGIC;
        assert_ne!(MAGICNUMBER, MIRROR_MAGIC);
        assert_eq!(MAGICNUMBER.len(), MIRROR_MAGIC.len());
        assert_eq!(&MAGICNUMBER[1..], &MIRROR_MAGIC[1..]);
    }

    // Overwrites the user root of both commit slots with 0xFF, which breaks
    // both slot checksums, and expects the committed data to survive a reopen.
    // NOTE(review): presumably this exercises a mirror-header recovery path
    // implemented outside this file — confirm.
    #[test]
    fn mirror_recovery_corrupted_primary() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder().create(tmpfile.path()).unwrap();
        let write_txn = db.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(X).unwrap();
            table.insert("mirror", "test").unwrap();
        }
        write_txn.commit().unwrap();
        drop(db);
        let mut file = tmpfile.as_file();
        file.seek(SeekFrom::Start(
            (TRANSACTION_0_OFFSET + USER_ROOT_OFFSET) as u64,
        ))
        .unwrap();
        file.write_all(&[0xFF; size_of::<u128>()]).unwrap();
        file.seek(SeekFrom::Start(
            (TRANSACTION_1_OFFSET + USER_ROOT_OFFSET) as u64,
        ))
        .unwrap();
        file.write_all(&[0xFF; size_of::<u128>()]).unwrap();
        let db2 = Database::open(tmpfile.path()).unwrap();
        let read_txn = db2.begin_read().unwrap();
        let table = read_txn.open_table(X).unwrap();
        assert_eq!(table.get("mirror").unwrap().unwrap().value(), "test");
    }

    // NOTE(review): `header_end` is the current file length, so
    // `set_len(header_end)` is a no-op and nothing is removed. As written, this
    // only checks that a freshly created database reopens. If the intent was to
    // strip a trailing mirror header, the target length is likely
    // `header_end - DB_HEADER_SIZE as u64` — confirm against the mirror layout.
    #[test]
    fn old_database_without_mirror_opens_normally() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder().create(tmpfile.path()).unwrap();
        drop(db);
        let mut file = tmpfile.as_file();
        let header_end = { file.seek(SeekFrom::End(0)).unwrap() };
        if header_end > DB_HEADER_SIZE as u64 {
            file.set_len(header_end).unwrap();
        }
        Database::open(tmpfile.path()).unwrap();
    }

    // Breaks both commit slots and zeroes the last DB_HEADER_SIZE bytes of the
    // file, then expects open to fail. (NOTE(review): the trailing bytes are
    // presumably where the mirror header lives — confirm.)
    #[test]
    fn both_primary_and_mirror_corrupted_returns_error() {
        let tmpfile = crate::create_tempfile();
        let db = Database::builder().create(tmpfile.path()).unwrap();
        let write_txn = db.begin_write().unwrap();
        {
            let mut table = write_txn.open_table(X).unwrap();
            table.insert("key", "val").unwrap();
        }
        write_txn.commit().unwrap();
        drop(db);
        let mut file = tmpfile.as_file();
        let file_len = file.seek(SeekFrom::End(0)).unwrap();
        file.seek(SeekFrom::Start(
            (TRANSACTION_0_OFFSET + USER_ROOT_OFFSET) as u64,
        ))
        .unwrap();
        file.write_all(&[0xFF; size_of::<u128>()]).unwrap();
        file.seek(SeekFrom::Start(
            (TRANSACTION_1_OFFSET + USER_ROOT_OFFSET) as u64,
        ))
        .unwrap();
        file.write_all(&[0xFF; size_of::<u128>()]).unwrap();
        let mirror_offset = file_len - DB_HEADER_SIZE as u64;
        file.seek(SeekFrom::Start(mirror_offset)).unwrap();
        file.write_all(&[0x00; DB_HEADER_SIZE]).unwrap();
        let result = Database::open(tmpfile.path());
        assert!(result.is_err());
    }
}