use super::*;
/// Magic bytes written at the start of a double-write buffer batch and
/// checked when one is replayed (ASCII "RDDW").
const DWB_MAGIC: [u8; 4] = [0x52, 0x44, 0x44, 0x57];
impl Pager {
/// Opens (or creates) the database file at `path` and returns a ready pager.
///
/// A missing file is only created when `config.create` is set and the pager
/// is not read-only. Writable pagers take an exclusive lock on a second
/// handle to the file; read-only pagers try for a shared lock and carry on
/// without one if that attempt fails. An existing database goes through
/// double-write recovery, header loading and encryption binding; a new one
/// is initialized and then bound to the configured encryption key, if any.
///
/// # Errors
/// Returns `PagerError::InvalidDatabase` when the file is missing and may
/// not be created, `PagerError::Locked` when the exclusive lock cannot be
/// taken, and propagates I/O, recovery, header and encryption errors.
pub fn open<P: AsRef<Path>>(path: P, config: PagerConfig) -> Result<Self, PagerError> {
    let path = path.as_ref().to_path_buf();
    let exists = path.exists();
    if !exists {
        if !config.create {
            return Err(PagerError::InvalidDatabase(
                "Database does not exist".into(),
            ));
        }
        if config.read_only {
            return Err(PagerError::InvalidDatabase(
                "Cannot create read-only database".into(),
            ));
        }
    }

    let writable = !config.read_only;
    let file = OpenOptions::new()
        .read(true)
        .write(writable)
        .create(config.create && writable)
        .open(&path)?;

    // The lock lives on its own handle so it is held for the pager's lifetime.
    let lock_file = if config.read_only {
        let handle = OpenOptions::new().read(true).open(&path)?;
        // Best effort: a failed shared lock does not abort a read-only open.
        if handle.try_lock_shared().is_ok() {
            Some(handle)
        } else {
            None
        }
    } else {
        let handle = OpenOptions::new().read(true).write(true).open(&path)?;
        handle
            .try_lock_exclusive()
            .map_err(|_| PagerError::Locked)?;
        Some(handle)
    };

    let dwb_file = match (config.double_write, config.read_only) {
        (true, false) => Some(Mutex::new(Self::open_dwb_file(&path)?)),
        _ => None,
    };

    let mut pager = Self {
        path,
        file: Mutex::new(file),
        _lock_file: lock_file,
        dwb_file,
        cache: PageCache::new(config.cache_size),
        freelist: RwLock::new(FreeList::new()),
        header: RwLock::new(DatabaseHeader::default()),
        config,
        header_dirty: Mutex::new(false),
        wal: RwLock::new(None),
        encryption: None,
    };

    if exists {
        // Replay any pending double-write batch before trusting page 0.
        pager.recover_from_dwb()?;
        pager.load_header()?;
        pager.bind_encryption_for_existing()?;
    } else {
        pager.initialize()?;
        pager.bind_encryption_for_new()?;
    }
    Ok(pager)
}
/// Works out whether an existing database is encrypted and checks the
/// configured key against it.
///
/// When the in-memory header reports zero pages the database is treated as
/// new. Otherwise page 0 is inspected for the `RDBE` marker:
/// marker + key validates the stored encryption header and installs the
/// encryptor; marker without key, or key without marker, is an error; neither
/// leaves the pager unencrypted.
///
/// NOTE(review): the marker offset (`HEADER_SIZE + 32`) is the same byte
/// range that `load_header`/`write_header` use for
/// `physical.format_version` — confirm the two layouts are meant to overlap.
///
/// # Errors
/// `PagerError::EncryptionRequired`, `PagerError::PlainDatabaseRefusesKey`,
/// `PagerError::InvalidKey`, or `PagerError::InvalidDatabase` when the stored
/// encryption header cannot be parsed.
fn bind_encryption_for_existing(&mut self) -> Result<(), PagerError> {
    const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
    const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";
    const ENCRYPTION_HEADER_OFFSET: usize = ENCRYPTION_MARKER_OFFSET + 4;

    if self.page_count().unwrap_or(0) == 0 {
        return self.bind_encryption_for_new();
    }

    let header_page = self.read_page_no_checksum(0)?;
    let data = header_page.as_bytes();
    let has_marker = data.len() > ENCRYPTION_HEADER_OFFSET
        && &data[ENCRYPTION_MARKER_OFFSET..ENCRYPTION_HEADER_OFFSET] == ENCRYPTION_MARKER;

    let Some(key) = self.config.encryption.clone() else {
        // No key supplied: fine for a plain file, fatal for an encrypted one.
        return if has_marker {
            Err(PagerError::EncryptionRequired)
        } else {
            Ok(())
        };
    };
    if !has_marker {
        return Err(PagerError::PlainDatabaseRefusesKey);
    }

    let header = crate::storage::encryption::EncryptionHeader::from_bytes(
        &data[ENCRYPTION_HEADER_OFFSET..],
    )
    .map_err(|e| {
        PagerError::InvalidDatabase(format!("encryption header parse failed: {e}"))
    })?;
    if !header.validate(&key) {
        return Err(PagerError::InvalidKey);
    }
    let encryptor = crate::storage::encryption::PageEncryptor::new(key);
    self.encryption = Some((encryptor, header));
    Ok(())
}
/// Enables encryption for a database that has no encryption header yet.
///
/// Does nothing when no key is configured. Otherwise a fresh encryption
/// header is derived from the key and, if the in-memory header reports at
/// least one page, the `RDBE` marker followed by the serialized encryption
/// header is stamped into page 0 through the cache (the page is marked dirty,
/// not written to disk here).
///
/// NOTE(review): the marker offset (`HEADER_SIZE + 32`) is the same byte
/// range that `write_header` fills with `physical.format_version` — confirm
/// the two layouts are meant to overlap.
fn bind_encryption_for_new(&mut self) -> Result<(), PagerError> {
    const ENCRYPTION_MARKER_OFFSET: usize = HEADER_SIZE + 32;
    const ENCRYPTION_MARKER: &[u8; 4] = b"RDBE";

    let key = match self.config.encryption.clone() {
        Some(key) => key,
        None => return Ok(()),
    };
    let header = crate::storage::encryption::EncryptionHeader::new(&key);
    let encryptor = crate::storage::encryption::PageEncryptor::new(key);

    let has_pages = self.page_count().unwrap_or(0) > 0;
    if has_pages {
        let mut page = self.read_page_no_checksum(0)?;
        let data = page.as_bytes_mut();
        let marker_end = ENCRYPTION_MARKER_OFFSET + 4;
        data[ENCRYPTION_MARKER_OFFSET..marker_end].copy_from_slice(ENCRYPTION_MARKER);
        let header_bytes = header.to_bytes();
        let header_end = marker_end + header_bytes.len();
        data[marker_end..header_end].copy_from_slice(&header_bytes);
        self.write_page_no_checksum(0, page)?;
    }

    self.encryption = Some((encryptor, header));
    Ok(())
}
/// Opens the database at `path` using `PagerConfig::default()`.
pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, PagerError> {
    let config = PagerConfig::default();
    Self::open(path, config)
}
/// Lays out a brand-new database: header page 0, a second header-typed page
/// at id 1, and a vault page at id 2, then syncs everything to disk.
///
/// # Errors
/// `PagerError::ReadOnly` for read-only pagers; otherwise any write or sync
/// failure.
fn initialize(&self) -> Result<(), PagerError> {
    if self.config.read_only {
        return Err(PagerError::ReadOnly);
    }

    let initial_page_count = 3;
    let header_page = Page::new_header_page(initial_page_count);
    // The write guard is a temporary, released at the end of this statement.
    self.header_write()?.page_count = initial_page_count;
    self.write_page_raw(0, &header_page)?;

    for (page_type, page_id) in [(PageType::Header, 1), (PageType::Vault, 2)] {
        let mut page = Page::new(page_type, page_id);
        page.update_checksum();
        self.write_page_raw(page_id, &page)?;
    }

    self.sync()
}
/// Takes the write lock on the in-memory header, reporting a poisoned lock
/// as `PagerError::LockPoisoned`.
fn header_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, DatabaseHeader>, PagerError> {
    match self.header.write() {
        Ok(guard) => Ok(guard),
        Err(_) => Err(PagerError::LockPoisoned),
    }
}
/// Takes the read lock on the in-memory header, reporting a poisoned lock
/// as `PagerError::LockPoisoned`.
fn header_read(&self) -> Result<std::sync::RwLockReadGuard<'_, DatabaseHeader>, PagerError> {
    match self.header.read() {
        Ok(guard) => Ok(guard),
        Err(_) => Err(PagerError::LockPoisoned),
    }
}
/// Takes the write lock on the free list, reporting a poisoned lock as
/// `PagerError::LockPoisoned`.
fn freelist_write(&self) -> Result<std::sync::RwLockWriteGuard<'_, FreeList>, PagerError> {
    match self.freelist.write() {
        Ok(guard) => Ok(guard),
        Err(_) => Err(PagerError::LockPoisoned),
    }
}
/// Locks the database file handle, reporting a poisoned mutex as
/// `PagerError::LockPoisoned`.
fn file_lock(&self) -> Result<std::sync::MutexGuard<'_, File>, PagerError> {
    match self.file.lock() {
        Ok(guard) => Ok(guard),
        Err(_) => Err(PagerError::LockPoisoned),
    }
}
/// Locks the "header needs rewriting" flag, reporting a poisoned mutex as
/// `PagerError::LockPoisoned`.
fn header_dirty_lock(&self) -> Result<std::sync::MutexGuard<'_, bool>, PagerError> {
    match self.header_dirty.lock() {
        Ok(flag) => Ok(flag),
        Err(_) => Err(PagerError::LockPoisoned),
    }
}
fn load_header(&self) -> Result<(), PagerError> {
let header_page = match self.read_page_raw(0) {
Ok(page) => {
let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
if magic == MAGIC_BYTES {
page
} else {
self.recover_header_from_shadow()?
}
}
Err(_) => self.recover_header_from_shadow()?,
};
let data = header_page.as_bytes();
let version = u32::from_le_bytes([
data[HEADER_SIZE + 4],
data[HEADER_SIZE + 5],
data[HEADER_SIZE + 6],
data[HEADER_SIZE + 7],
]);
let page_size = u32::from_le_bytes([
data[HEADER_SIZE + 8],
data[HEADER_SIZE + 9],
data[HEADER_SIZE + 10],
data[HEADER_SIZE + 11],
]);
if page_size != PAGE_SIZE as u32 {
return Err(PagerError::InvalidDatabase(format!(
"Unsupported page size: {}",
page_size
)));
}
if version > DB_VERSION {
return Err(PagerError::InvalidDatabase(format!(
"Unsupported database version: file version {version} is newer than supported {DB_VERSION}"
)));
}
let page_count = u32::from_le_bytes([
data[HEADER_SIZE + 12],
data[HEADER_SIZE + 13],
data[HEADER_SIZE + 14],
data[HEADER_SIZE + 15],
]);
let freelist_head = u32::from_le_bytes([
data[HEADER_SIZE + 16],
data[HEADER_SIZE + 17],
data[HEADER_SIZE + 18],
data[HEADER_SIZE + 19],
]);
let schema_version = u32::from_le_bytes([
data[HEADER_SIZE + 20],
data[HEADER_SIZE + 21],
data[HEADER_SIZE + 22],
data[HEADER_SIZE + 23],
]);
let checkpoint_lsn = u64::from_le_bytes([
data[HEADER_SIZE + 24],
data[HEADER_SIZE + 25],
data[HEADER_SIZE + 26],
data[HEADER_SIZE + 27],
data[HEADER_SIZE + 28],
data[HEADER_SIZE + 29],
data[HEADER_SIZE + 30],
data[HEADER_SIZE + 31],
]);
let physical_format_version = u32::from_le_bytes([
data[HEADER_SIZE + 32],
data[HEADER_SIZE + 33],
data[HEADER_SIZE + 34],
data[HEADER_SIZE + 35],
]);
let physical_sequence = u64::from_le_bytes([
data[HEADER_SIZE + 36],
data[HEADER_SIZE + 37],
data[HEADER_SIZE + 38],
data[HEADER_SIZE + 39],
data[HEADER_SIZE + 40],
data[HEADER_SIZE + 41],
data[HEADER_SIZE + 42],
data[HEADER_SIZE + 43],
]);
let manifest_root = u64::from_le_bytes([
data[HEADER_SIZE + 44],
data[HEADER_SIZE + 45],
data[HEADER_SIZE + 46],
data[HEADER_SIZE + 47],
data[HEADER_SIZE + 48],
data[HEADER_SIZE + 49],
data[HEADER_SIZE + 50],
data[HEADER_SIZE + 51],
]);
let manifest_oldest_root = u64::from_le_bytes([
data[HEADER_SIZE + 52],
data[HEADER_SIZE + 53],
data[HEADER_SIZE + 54],
data[HEADER_SIZE + 55],
data[HEADER_SIZE + 56],
data[HEADER_SIZE + 57],
data[HEADER_SIZE + 58],
data[HEADER_SIZE + 59],
]);
let free_set_root = u64::from_le_bytes([
data[HEADER_SIZE + 60],
data[HEADER_SIZE + 61],
data[HEADER_SIZE + 62],
data[HEADER_SIZE + 63],
data[HEADER_SIZE + 64],
data[HEADER_SIZE + 65],
data[HEADER_SIZE + 66],
data[HEADER_SIZE + 67],
]);
let manifest_page = u32::from_le_bytes([
data[HEADER_SIZE + 68],
data[HEADER_SIZE + 69],
data[HEADER_SIZE + 70],
data[HEADER_SIZE + 71],
]);
let manifest_checksum = u64::from_le_bytes([
data[HEADER_SIZE + 72],
data[HEADER_SIZE + 73],
data[HEADER_SIZE + 74],
data[HEADER_SIZE + 75],
data[HEADER_SIZE + 76],
data[HEADER_SIZE + 77],
data[HEADER_SIZE + 78],
data[HEADER_SIZE + 79],
]);
let collection_roots_page = u32::from_le_bytes([
data[HEADER_SIZE + 80],
data[HEADER_SIZE + 81],
data[HEADER_SIZE + 82],
data[HEADER_SIZE + 83],
]);
let collection_roots_checksum = u64::from_le_bytes([
data[HEADER_SIZE + 84],
data[HEADER_SIZE + 85],
data[HEADER_SIZE + 86],
data[HEADER_SIZE + 87],
data[HEADER_SIZE + 88],
data[HEADER_SIZE + 89],
data[HEADER_SIZE + 90],
data[HEADER_SIZE + 91],
]);
let collection_root_count = u32::from_le_bytes([
data[HEADER_SIZE + 92],
data[HEADER_SIZE + 93],
data[HEADER_SIZE + 94],
data[HEADER_SIZE + 95],
]);
let snapshot_count = u32::from_le_bytes([
data[HEADER_SIZE + 96],
data[HEADER_SIZE + 97],
data[HEADER_SIZE + 98],
data[HEADER_SIZE + 99],
]);
let index_count = u32::from_le_bytes([
data[HEADER_SIZE + 100],
data[HEADER_SIZE + 101],
data[HEADER_SIZE + 102],
data[HEADER_SIZE + 103],
]);
let catalog_collection_count = u32::from_le_bytes([
data[HEADER_SIZE + 104],
data[HEADER_SIZE + 105],
data[HEADER_SIZE + 106],
data[HEADER_SIZE + 107],
]);
let catalog_total_entities = u64::from_le_bytes([
data[HEADER_SIZE + 108],
data[HEADER_SIZE + 109],
data[HEADER_SIZE + 110],
data[HEADER_SIZE + 111],
data[HEADER_SIZE + 112],
data[HEADER_SIZE + 113],
data[HEADER_SIZE + 114],
data[HEADER_SIZE + 115],
]);
let export_count = u32::from_le_bytes([
data[HEADER_SIZE + 116],
data[HEADER_SIZE + 117],
data[HEADER_SIZE + 118],
data[HEADER_SIZE + 119],
]);
let graph_projection_count = u32::from_le_bytes([
data[HEADER_SIZE + 120],
data[HEADER_SIZE + 121],
data[HEADER_SIZE + 122],
data[HEADER_SIZE + 123],
]);
let analytics_job_count = u32::from_le_bytes([
data[HEADER_SIZE + 124],
data[HEADER_SIZE + 125],
data[HEADER_SIZE + 126],
data[HEADER_SIZE + 127],
]);
let manifest_event_count = u32::from_le_bytes([
data[HEADER_SIZE + 128],
data[HEADER_SIZE + 129],
data[HEADER_SIZE + 130],
data[HEADER_SIZE + 131],
]);
let registry_page = u32::from_le_bytes([
data[HEADER_SIZE + 132],
data[HEADER_SIZE + 133],
data[HEADER_SIZE + 134],
data[HEADER_SIZE + 135],
]);
let registry_checksum = u64::from_le_bytes([
data[HEADER_SIZE + 136],
data[HEADER_SIZE + 137],
data[HEADER_SIZE + 138],
data[HEADER_SIZE + 139],
data[HEADER_SIZE + 140],
data[HEADER_SIZE + 141],
data[HEADER_SIZE + 142],
data[HEADER_SIZE + 143],
]);
let recovery_page = u32::from_le_bytes([
data[HEADER_SIZE + 144],
data[HEADER_SIZE + 145],
data[HEADER_SIZE + 146],
data[HEADER_SIZE + 147],
]);
let recovery_checksum = u64::from_le_bytes([
data[HEADER_SIZE + 148],
data[HEADER_SIZE + 149],
data[HEADER_SIZE + 150],
data[HEADER_SIZE + 151],
data[HEADER_SIZE + 152],
data[HEADER_SIZE + 153],
data[HEADER_SIZE + 154],
data[HEADER_SIZE + 155],
]);
let catalog_page = u32::from_le_bytes([
data[HEADER_SIZE + 156],
data[HEADER_SIZE + 157],
data[HEADER_SIZE + 158],
data[HEADER_SIZE + 159],
]);
let catalog_checksum = u64::from_le_bytes([
data[HEADER_SIZE + 160],
data[HEADER_SIZE + 161],
data[HEADER_SIZE + 162],
data[HEADER_SIZE + 163],
data[HEADER_SIZE + 164],
data[HEADER_SIZE + 165],
data[HEADER_SIZE + 166],
data[HEADER_SIZE + 167],
]);
let metadata_state_page = u32::from_le_bytes([
data[HEADER_SIZE + 168],
data[HEADER_SIZE + 169],
data[HEADER_SIZE + 170],
data[HEADER_SIZE + 171],
]);
let metadata_state_checksum = u64::from_le_bytes([
data[HEADER_SIZE + 172],
data[HEADER_SIZE + 173],
data[HEADER_SIZE + 174],
data[HEADER_SIZE + 175],
data[HEADER_SIZE + 176],
data[HEADER_SIZE + 177],
data[HEADER_SIZE + 178],
data[HEADER_SIZE + 179],
]);
let vector_artifact_page = u32::from_le_bytes([
data[HEADER_SIZE + 180],
data[HEADER_SIZE + 181],
data[HEADER_SIZE + 182],
data[HEADER_SIZE + 183],
]);
let vector_artifact_checksum = u64::from_le_bytes([
data[HEADER_SIZE + 184],
data[HEADER_SIZE + 185],
data[HEADER_SIZE + 186],
data[HEADER_SIZE + 187],
data[HEADER_SIZE + 188],
data[HEADER_SIZE + 189],
data[HEADER_SIZE + 190],
data[HEADER_SIZE + 191],
]);
let checkpoint_in_progress = data[HEADER_SIZE + 192] != 0;
let checkpoint_target_lsn = u64::from_le_bytes([
data[HEADER_SIZE + 193],
data[HEADER_SIZE + 194],
data[HEADER_SIZE + 195],
data[HEADER_SIZE + 196],
data[HEADER_SIZE + 197],
data[HEADER_SIZE + 198],
data[HEADER_SIZE + 199],
data[HEADER_SIZE + 200],
]);
{
let mut header = self.header_write()?;
header.version = version;
header.page_size = page_size;
header.page_count = page_count;
header.freelist_head = freelist_head;
header.schema_version = schema_version;
header.checkpoint_lsn = checkpoint_lsn;
header.checkpoint_in_progress = checkpoint_in_progress;
header.checkpoint_target_lsn = checkpoint_target_lsn;
header.physical = PhysicalFileHeader {
format_version: physical_format_version,
sequence: physical_sequence,
manifest_oldest_root,
manifest_root,
free_set_root,
manifest_page,
manifest_checksum,
collection_roots_page,
collection_roots_checksum,
collection_root_count,
snapshot_count,
index_count,
catalog_collection_count,
catalog_total_entities,
export_count,
graph_projection_count,
analytics_job_count,
manifest_event_count,
registry_page,
registry_checksum,
recovery_page,
recovery_checksum,
catalog_page,
catalog_checksum,
metadata_state_page,
metadata_state_checksum,
vector_artifact_page,
vector_artifact_checksum,
};
}
{
let mut freelist = self.freelist_write()?;
*freelist = FreeList::from_header(freelist_head, 0);
}
Ok(())
}
fn write_header(&self) -> Result<(), PagerError> {
if self.config.read_only {
return Err(PagerError::ReadOnly);
}
let header = self.header_read()?;
let mut page = if let Some(cached) = self.cache.get(0) {
cached
} else {
let file = self.file_lock()?;
let len = file.metadata().map(|m| m.len()).unwrap_or(0);
drop(file);
if len >= PAGE_SIZE as u64 {
self.read_page_raw(0)?
} else {
Page::new(PageType::Header, 0)
}
};
let data = page.as_bytes_mut();
data[HEADER_SIZE..HEADER_SIZE + 4].copy_from_slice(&MAGIC_BYTES);
data[HEADER_SIZE + 4..HEADER_SIZE + 8].copy_from_slice(&header.version.to_le_bytes());
data[HEADER_SIZE + 8..HEADER_SIZE + 12].copy_from_slice(&header.page_size.to_le_bytes());
data[HEADER_SIZE + 12..HEADER_SIZE + 16].copy_from_slice(&header.page_count.to_le_bytes());
data[HEADER_SIZE + 16..HEADER_SIZE + 20]
.copy_from_slice(&header.freelist_head.to_le_bytes());
data[HEADER_SIZE + 20..HEADER_SIZE + 24]
.copy_from_slice(&header.schema_version.to_le_bytes());
data[HEADER_SIZE + 24..HEADER_SIZE + 32]
.copy_from_slice(&header.checkpoint_lsn.to_le_bytes());
data[HEADER_SIZE + 32..HEADER_SIZE + 36]
.copy_from_slice(&header.physical.format_version.to_le_bytes());
data[HEADER_SIZE + 36..HEADER_SIZE + 44]
.copy_from_slice(&header.physical.sequence.to_le_bytes());
data[HEADER_SIZE + 44..HEADER_SIZE + 52]
.copy_from_slice(&header.physical.manifest_root.to_le_bytes());
data[HEADER_SIZE + 52..HEADER_SIZE + 60]
.copy_from_slice(&header.physical.manifest_oldest_root.to_le_bytes());
data[HEADER_SIZE + 60..HEADER_SIZE + 68]
.copy_from_slice(&header.physical.free_set_root.to_le_bytes());
data[HEADER_SIZE + 68..HEADER_SIZE + 72]
.copy_from_slice(&header.physical.manifest_page.to_le_bytes());
data[HEADER_SIZE + 72..HEADER_SIZE + 80]
.copy_from_slice(&header.physical.manifest_checksum.to_le_bytes());
data[HEADER_SIZE + 80..HEADER_SIZE + 84]
.copy_from_slice(&header.physical.collection_roots_page.to_le_bytes());
data[HEADER_SIZE + 84..HEADER_SIZE + 92]
.copy_from_slice(&header.physical.collection_roots_checksum.to_le_bytes());
data[HEADER_SIZE + 92..HEADER_SIZE + 96]
.copy_from_slice(&header.physical.collection_root_count.to_le_bytes());
data[HEADER_SIZE + 96..HEADER_SIZE + 100]
.copy_from_slice(&header.physical.snapshot_count.to_le_bytes());
data[HEADER_SIZE + 100..HEADER_SIZE + 104]
.copy_from_slice(&header.physical.index_count.to_le_bytes());
data[HEADER_SIZE + 104..HEADER_SIZE + 108]
.copy_from_slice(&header.physical.catalog_collection_count.to_le_bytes());
data[HEADER_SIZE + 108..HEADER_SIZE + 116]
.copy_from_slice(&header.physical.catalog_total_entities.to_le_bytes());
data[HEADER_SIZE + 116..HEADER_SIZE + 120]
.copy_from_slice(&header.physical.export_count.to_le_bytes());
data[HEADER_SIZE + 120..HEADER_SIZE + 124]
.copy_from_slice(&header.physical.graph_projection_count.to_le_bytes());
data[HEADER_SIZE + 124..HEADER_SIZE + 128]
.copy_from_slice(&header.physical.analytics_job_count.to_le_bytes());
data[HEADER_SIZE + 128..HEADER_SIZE + 132]
.copy_from_slice(&header.physical.manifest_event_count.to_le_bytes());
data[HEADER_SIZE + 132..HEADER_SIZE + 136]
.copy_from_slice(&header.physical.registry_page.to_le_bytes());
data[HEADER_SIZE + 136..HEADER_SIZE + 144]
.copy_from_slice(&header.physical.registry_checksum.to_le_bytes());
data[HEADER_SIZE + 144..HEADER_SIZE + 148]
.copy_from_slice(&header.physical.recovery_page.to_le_bytes());
data[HEADER_SIZE + 148..HEADER_SIZE + 156]
.copy_from_slice(&header.physical.recovery_checksum.to_le_bytes());
data[HEADER_SIZE + 156..HEADER_SIZE + 160]
.copy_from_slice(&header.physical.catalog_page.to_le_bytes());
data[HEADER_SIZE + 160..HEADER_SIZE + 168]
.copy_from_slice(&header.physical.catalog_checksum.to_le_bytes());
data[HEADER_SIZE + 168..HEADER_SIZE + 172]
.copy_from_slice(&header.physical.metadata_state_page.to_le_bytes());
data[HEADER_SIZE + 172..HEADER_SIZE + 180]
.copy_from_slice(&header.physical.metadata_state_checksum.to_le_bytes());
data[HEADER_SIZE + 180..HEADER_SIZE + 184]
.copy_from_slice(&header.physical.vector_artifact_page.to_le_bytes());
data[HEADER_SIZE + 184..HEADER_SIZE + 192]
.copy_from_slice(&header.physical.vector_artifact_checksum.to_le_bytes());
data[HEADER_SIZE + 192] = if header.checkpoint_in_progress { 1 } else { 0 };
data[HEADER_SIZE + 193..HEADER_SIZE + 201]
.copy_from_slice(&header.checkpoint_target_lsn.to_le_bytes());
page.update_checksum();
self.write_header_shadow(&page)?;
self.write_page_raw(0, &page)?;
*self.header_dirty_lock()? = false;
Ok(())
}
/// Reads page `page_id` straight from the database file, bypassing the
/// cache, and verifies its checksum when `config.verify_checksums` is set.
fn read_page_raw(&self, page_id: u32) -> Result<Page, PagerError> {
    let offset = u64::from(page_id) * (PAGE_SIZE as u64);
    let mut bytes = [0u8; PAGE_SIZE];

    let mut file = self.file_lock()?;
    file.seek(SeekFrom::Start(offset))?;
    file.read_exact(&mut bytes)?;

    let page = Page::from_bytes(bytes);
    if self.config.verify_checksums {
        page.verify_checksum()?;
    }
    Ok(page)
}
/// Writes `page` at the file offset of `page_id`, bypassing the cache.
/// No fsync is issued. Fails with `PagerError::ReadOnly` on read-only pagers.
fn write_page_raw(&self, page_id: u32, page: &Page) -> Result<(), PagerError> {
    if self.config.read_only {
        return Err(PagerError::ReadOnly);
    }
    let offset = u64::from(page_id) * (PAGE_SIZE as u64);

    let mut file = self.file_lock()?;
    file.seek(SeekFrom::Start(offset))?;
    file.write_all(page.as_bytes())?;
    Ok(())
}
/// Returns page `page_id`, served from the cache when possible.
///
/// On a miss the page is read from disk (with checksum verification if
/// configured) and inserted into the cache; a dirty page evicted by that
/// insert is written back to disk immediately.
pub fn read_page(&self, page_id: u32) -> Result<Page, PagerError> {
    match self.cache.get(page_id) {
        Some(hit) => Ok(hit),
        None => {
            let page = self.read_page_raw(page_id)?;
            if let Some(evicted) = self.cache.insert(page_id, page.clone()) {
                self.write_page_raw(evicted.page_id(), &evicted)?;
            }
            Ok(page)
        }
    }
}
/// Like `read_page`, but never verifies the page checksum on a cache miss.
///
/// The page is still cached, and a dirty page evicted by the insert is
/// written back to disk immediately.
pub fn read_page_no_checksum(&self, page_id: u32) -> Result<Page, PagerError> {
    if let Some(hit) = self.cache.get(page_id) {
        return Ok(hit);
    }

    let mut bytes = [0u8; PAGE_SIZE];
    {
        // Scoped so the file lock is released before any write-back below.
        let mut file = self.file_lock()?;
        let offset = u64::from(page_id) * (PAGE_SIZE as u64);
        file.seek(SeekFrom::Start(offset))?;
        file.read_exact(&mut bytes)?;
    }

    let page = Page::from_bytes(bytes);
    if let Some(evicted) = self.cache.insert(page_id, page.clone()) {
        self.write_page_raw(evicted.page_id(), &evicted)?;
    }
    Ok(page)
}
/// Stores `page` in the cache under `page_id` with a refreshed checksum and
/// marks it dirty. Nothing is written to disk here except a dirty page that
/// the insert happens to evict.
///
/// # Errors
/// `PagerError::ReadOnly` on read-only pagers; write-back errors propagate.
pub fn write_page(&self, page_id: u32, mut page: Page) -> Result<(), PagerError> {
    if self.config.read_only {
        return Err(PagerError::ReadOnly);
    }
    page.update_checksum();
    let evicted = self.cache.insert(page_id, page);
    if let Some(victim) = evicted {
        self.write_page_raw(victim.page_id(), &victim)?;
    }
    self.cache.mark_dirty(page_id);
    Ok(())
}
/// Reads page `page_id` and decrypts it when encryption is active.
///
/// Page 0, and every page of an unencrypted database, is returned via
/// `read_page` unchanged. Otherwise the stored bytes are fetched without
/// checksum verification, decrypted, and copied into a zero-filled page
/// buffer (at most `PAGE_SIZE` bytes of plaintext are used).
///
/// # Errors
/// `PagerError::InvalidDatabase` when decryption fails.
pub fn read_page_decrypted(&self, page_id: u32) -> Result<Page, PagerError> {
    let enc = match (&self.encryption, page_id) {
        (Some((enc, _)), id) if id != 0 => enc,
        _ => return self.read_page(page_id),
    };

    let raw = self.read_page_no_checksum(page_id)?;
    let plaintext = enc
        .decrypt(page_id, raw.as_bytes())
        .map_err(|e| PagerError::InvalidDatabase(format!("decrypt page {page_id}: {e}")))?;

    let mut bytes = [0u8; PAGE_SIZE];
    let used = plaintext.len().min(PAGE_SIZE);
    bytes[..used].copy_from_slice(&plaintext[..used]);
    Ok(Page::from_bytes(bytes))
}
/// Writes `page` under `page_id`, encrypting it when encryption is active.
///
/// Page 0, and every page of an unencrypted database, goes through
/// `write_page` unchanged. Otherwise only the first `PAGE_SIZE - 28` bytes
/// of the page are encrypted, and the ciphertext — expected to be exactly
/// `PAGE_SIZE` bytes — is cached via `write_page_no_checksum`.
///
/// # Panics
/// Panics if the encryptor returns a buffer whose length is not `PAGE_SIZE`.
pub fn write_page_encrypted(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
    let enc = match (&self.encryption, page_id) {
        (Some((enc, _)), id) if id != 0 => enc,
        _ => return self.write_page(page_id, page),
    };

    // Presumably a 12-byte nonce plus a 16-byte tag — confirm against
    // `PageEncryptor`.
    const OVERHEAD: usize = 12 + 16;
    let plaintext_len = PAGE_SIZE - OVERHEAD;
    let plaintext = &page.as_bytes()[..plaintext_len];

    let ciphertext = enc.encrypt(page_id, plaintext);
    debug_assert_eq!(ciphertext.len(), PAGE_SIZE);

    let mut bytes = [0u8; PAGE_SIZE];
    bytes.copy_from_slice(&ciphertext);
    self.write_page_no_checksum(page_id, Page::from_bytes(bytes))
}
/// Stores `page` in the cache under `page_id` exactly as given (the checksum
/// is not recomputed) and marks it dirty. A dirty page evicted by the insert
/// is written back to disk immediately.
///
/// # Errors
/// `PagerError::ReadOnly` on read-only pagers; write-back errors propagate.
pub fn write_page_no_checksum(&self, page_id: u32, page: Page) -> Result<(), PagerError> {
    if self.config.read_only {
        return Err(PagerError::ReadOnly);
    }
    let evicted = self.cache.insert(page_id, page);
    if let Some(victim) = evicted {
        self.write_page_raw(victim.page_id(), &victim)?;
    }
    self.cache.mark_dirty(page_id);
    Ok(())
}
/// Allocates a page of `page_type`, caches it as dirty, and returns it.
///
/// The page id is chosen in this order:
/// 1. an id already held by the in-memory free list;
/// 2. an id obtained after loading the next free-list trunk page from disk;
/// 3. a brand-new id at the end of the file (`header.page_count` is bumped).
///
/// Cases 2 and 3 mark the header dirty.
///
/// # Errors
/// `PagerError::ReadOnly` on read-only pagers, `PagerError::InvalidDatabase`
/// when the free-list trunk is missing, unreadable as a trunk, or empty after
/// loading; lock and I/O errors propagate.
pub fn allocate_page(&self, page_type: PageType) -> Result<Page, PagerError> {
if self.config.read_only {
return Err(PagerError::ReadOnly);
}
let page_id = {
let mut freelist = self.freelist_write()?;
if let Some(id) = freelist.allocate() {
id
} else if freelist.trunk_head() != 0 {
let trunk_id = freelist.trunk_head();
// Release the free-list lock while the trunk page is read; it is
// re-acquired below before the trunk contents are loaded.
drop(freelist);
let trunk = self.read_page(trunk_id).map_err(|e| match e {
PagerError::PageNotFound(_) => {
PagerError::InvalidDatabase("Freelist trunk missing".to_string())
}
other => other,
})?;
let mut freelist = self.freelist_write()?;
freelist
.load_from_trunk(&trunk)
.map_err(|e| PagerError::InvalidDatabase(format!("Freelist: {}", e)))?;
let id = freelist.allocate().ok_or_else(|| {
PagerError::InvalidDatabase("Freelist empty after trunk load".to_string())
})?;
// Locks are taken free list -> header -> dirty flag, the same order
// `flush` uses.
let mut header = self.header_write()?;
header.freelist_head = freelist.trunk_head();
*self.header_dirty_lock()? = true;
id
} else {
// Nothing reusable: grow the file by one page.
let mut header = self.header_write()?;
let id = header.page_count;
// NOTE(review): unchecked increment — would overflow once
// page_count reaches u32::MAX.
header.page_count += 1;
*self.header_dirty_lock()? = true;
id
}
};
let page = Page::new(page_type, page_id);
// A dirty page evicted to make room is written back immediately.
if let Some(dirty_page) = self.cache.insert(page_id, page.clone()) {
let evicted_id = dirty_page.page_id();
self.write_page_raw(evicted_id, &dirty_page)?;
}
self.cache.mark_dirty(page_id);
Ok(page)
}
/// Returns `page_id` to the free list: the page is dropped from the cache,
/// recorded as free, and the header is marked dirty.
///
/// # Errors
/// `PagerError::ReadOnly` on read-only pagers; lock errors propagate.
pub fn free_page(&self, page_id: u32) -> Result<(), PagerError> {
    if self.config.read_only {
        return Err(PagerError::ReadOnly);
    }
    self.cache.remove(page_id);

    let mut free_pages = self.freelist_write()?;
    free_pages.free(page_id);

    let mut header_dirty = self.header_dirty_lock()?;
    *header_dirty = true;
    Ok(())
}
/// Returns a clone of the in-memory database header.
pub fn header(&self) -> Result<DatabaseHeader, PagerError> {
    let guard = self.header_read()?;
    Ok((*guard).clone())
}
/// Returns a copy of the physical section of the in-memory header.
pub fn physical_header(&self) -> Result<PhysicalFileHeader, PagerError> {
    let guard = self.header_read()?;
    Ok(guard.physical)
}
/// Replaces the physical section of the in-memory header and marks the
/// header dirty; nothing is written to disk here.
///
/// # Errors
/// `PagerError::ReadOnly` on read-only pagers; lock errors propagate.
pub fn update_physical_header(&self, physical: PhysicalFileHeader) -> Result<(), PagerError> {
    if self.config.read_only {
        return Err(PagerError::ReadOnly);
    }
    let mut guard = self.header_write()?;
    guard.physical = physical;

    let mut header_dirty = self.header_dirty_lock()?;
    *header_dirty = true;
    Ok(())
}
/// Returns the page count recorded in the in-memory header.
pub fn page_count(&self) -> Result<u32, PagerError> {
    let guard = self.header_read()?;
    Ok(guard.page_count)
}
/// Installs the WAL writer that `flush` consults before writing dirty pages.
/// A poisoned lock is recovered rather than reported.
pub fn set_wal_writer(&self, wal: Arc<Mutex<crate::storage::wal::writer::WalWriter>>) {
    let mut slot = match self.wal.write() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    *slot = Some(wal);
}
/// Detaches the WAL writer, if any. A poisoned lock is recovered rather than
/// reported.
pub fn clear_wal_writer(&self) {
    let mut slot = match self.wal.write() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    *slot = None;
}
/// Reports whether a WAL writer is attached. Returns `false` when the lock
/// is poisoned.
pub fn has_wal_writer(&self) -> bool {
    match self.wal.read() {
        Ok(slot) => slot.is_some(),
        Err(_) => false,
    }
}
pub fn flush(&self) -> Result<(), PagerError> {
if self.config.read_only {
return Ok(());
}
let trunks = {
let mut freelist = self.freelist_write()?;
if freelist.is_dirty() {
let mut header = self.header_write()?;
let trunks = freelist.flush_to_trunks(0, || {
let id = header.page_count;
header.page_count += 1;
id
});
header.freelist_head = freelist.trunk_head();
*self.header_dirty_lock()? = true;
freelist.mark_clean();
trunks
} else {
Vec::new()
}
};
for trunk in trunks {
let page_id = trunk.page_id();
self.cache.insert(page_id, trunk);
self.cache.mark_dirty(page_id);
}
let dirty_pages = self.cache.flush_dirty();
if !dirty_pages.is_empty() {
let max_lsn = dirty_pages
.iter()
.filter_map(|(_, page)| page.header().ok().map(|h| h.lsn))
.max()
.unwrap_or(0);
if max_lsn > 0 {
if let Ok(slot) = self.wal.read() {
if let Some(wal) = slot.as_ref() {
let wal = Arc::clone(wal);
drop(slot);
let mut wal_guard = wal.lock().unwrap_or_else(|p| p.into_inner());
wal_guard.flush_until(max_lsn).map_err(PagerError::Io)?;
}
}
}
self.write_pages_through_dwb(&dirty_pages)?;
}
if *self.header_dirty_lock()? {
self.write_header()?;
}
Ok(())
}
/// Flushes all pending pages and the header, then fsyncs the database file.
pub fn sync(&self) -> Result<(), PagerError> {
    self.flush()?;
    // The guard is a temporary and is released at the end of the statement.
    self.file_lock()?.sync_all()?;
    Ok(())
}
/// Returns the page cache's current statistics.
pub fn cache_stats(&self) -> crate::storage::engine::page_cache::CacheStats {
    let cache = &self.cache;
    cache.stats()
}
/// Returns how many cached pages are currently marked dirty.
pub fn dirty_page_count(&self) -> usize {
    let cache = &self.cache;
    cache.dirty_count()
}
/// Returns the number of dirty cached pages divided by the cache capacity.
/// A capacity of zero is treated as one to avoid dividing by zero.
pub fn dirty_fraction(&self) -> f64 {
    let capacity = self.cache.capacity().max(1);
    let dirty = self.cache.dirty_count();
    dirty as f64 / capacity as f64
}
/// Writes up to `max` dirty cached pages to the database file and returns
/// how many were written.
///
/// Returns `Ok(0)` for read-only pagers, for `max == 0`, and when the cache
/// hands back no dirty pages.
///
/// The pages are written with `write_page_raw`, i.e. exactly as cached.
/// Routing them through `write_page` would only put them back into the cache,
/// mark them dirty again and rewrite their checksum (damaging pages stored
/// via `write_page_no_checksum`), so nothing would ever reach disk.
///
/// NOTE(review): like cache eviction, this path neither flushes the WAL nor
/// uses the double-write buffer, and does not fsync — confirm that is the
/// intended contract for incremental flushing.
pub fn flush_some_dirty(&self, max: usize) -> Result<usize, PagerError> {
    if self.config.read_only || max == 0 {
        return Ok(0);
    }
    let dirty_pages = self.cache.flush_some_dirty(max);
    if dirty_pages.is_empty() {
        return Ok(0);
    }
    let count = dirty_pages.len();
    for (page_id, page) in dirty_pages {
        self.write_page_raw(page_id, &page)?;
    }
    Ok(count)
}
/// Returns the path of the database file this pager was opened on.
pub fn path(&self) -> &Path {
    self.path.as_path()
}
pub fn is_read_only(&self) -> bool {
self.config.read_only
}
/// Returns the current length of the database file in bytes, as reported by
/// its metadata.
pub fn file_size(&self) -> Result<u64, PagerError> {
    let metadata = self.file_lock()?.metadata()?;
    Ok(metadata.len())
}
/// Asks the prefetch helper to warm page `page_id`. Purely advisory: a
/// poisoned file lock or a failed prefetch is ignored.
pub fn prefetch_hint(&self, page_id: u32) {
    let Ok(file) = self.file_lock() else {
        return;
    };
    let _ = crate::storage::btree::prefetch::prefetch_page(
        &file,
        u64::from(page_id),
        PAGE_SIZE as u32,
    );
}
/// Path of the header shadow file: the database path with `-hdr` appended
/// to its final component.
fn shadow_path(db_path: &Path) -> PathBuf {
    let mut raw = db_path.as_os_str().to_os_string();
    raw.push("-hdr");
    raw.into()
}
/// Path of the metadata shadow file: the database path with `-meta`
/// appended to its final component.
fn meta_shadow_path(db_path: &Path) -> PathBuf {
    let mut raw = db_path.as_os_str().to_os_string();
    raw.push("-meta");
    raw.into()
}
/// Path of the double-write buffer file: the database path with `-dwb`
/// appended to its final component.
fn dwb_path(db_path: &Path) -> PathBuf {
    let mut raw = db_path.as_os_str().to_os_string();
    raw.push("-dwb");
    raw.into()
}
/// Opens the double-write buffer file next to `db_path` for reading and
/// writing, creating it if needed and keeping any existing contents so a
/// pending batch can still be recovered.
fn open_dwb_file(db_path: &Path) -> Result<File, PagerError> {
    let target = Self::dwb_path(db_path);
    let mut options = OpenOptions::new();
    options.read(true).write(true).create(true).truncate(false);
    let file = options.open(target)?;
    Ok(file)
}
/// Empties a double-write buffer file: truncates it to zero length, rewinds
/// the cursor, and fsyncs so the truncation is durable.
fn clear_dwb_file(file: &mut File) -> Result<(), PagerError> {
    file.set_len(0)?;
    let _cursor = file.seek(SeekFrom::Start(0))?;
    file.sync_all()?;
    Ok(())
}
/// Writes `page` to the header shadow file (created or truncated) and
/// fsyncs it. A read-only pager does nothing and returns `Ok(())`.
fn write_header_shadow(&self, page: &Page) -> Result<(), PagerError> {
    if self.config.read_only {
        return Ok(());
    }
    let mut shadow_file = File::create(Self::shadow_path(&self.path))?;
    shadow_file.write_all(page.as_bytes())?;
    shadow_file.sync_all()?;
    Ok(())
}
/// Loads the header page from the shadow file and, unless the pager is
/// read-only, writes it back over page 0 and fsyncs the database file.
///
/// Only the magic bytes of the shadow copy are checked here; its checksum is
/// not verified.
///
/// # Errors
/// `PagerError::InvalidDatabase` when the shadow file is missing or does not
/// carry the magic bytes; I/O errors propagate.
fn recover_header_from_shadow(&self) -> Result<Page, PagerError> {
    let shadow = Self::shadow_path(&self.path);
    if !shadow.exists() {
        return Err(PagerError::InvalidDatabase(
            "Page 0 corrupted and no header shadow found".into(),
        ));
    }

    let mut bytes = [0u8; PAGE_SIZE];
    File::open(&shadow)?.read_exact(&mut bytes)?;
    let page = Page::from_bytes(bytes);

    let magic = &page.as_bytes()[HEADER_SIZE..HEADER_SIZE + 4];
    if magic != MAGIC_BYTES {
        return Err(PagerError::InvalidDatabase(
            "Header shadow also corrupted".into(),
        ));
    }

    if !self.config.read_only {
        self.write_page_raw(0, &page)?;
        self.file_lock()?.sync_all()?;
    }
    Ok(page)
}
/// Writes `page` to the metadata shadow file (created or truncated) and
/// fsyncs it. A read-only pager does nothing and returns `Ok(())`.
pub fn write_meta_shadow(&self, page: &Page) -> Result<(), PagerError> {
    if self.config.read_only {
        return Ok(());
    }
    let mut shadow_file = File::create(Self::meta_shadow_path(&self.path))?;
    shadow_file.write_all(page.as_bytes())?;
    shadow_file.sync_all()?;
    Ok(())
}
/// Loads the metadata page from its shadow file and, unless the pager is
/// read-only, writes it back over page 1 and fsyncs the database file.
///
/// The shadow contents are not validated here.
///
/// # Errors
/// `PagerError::InvalidDatabase` when the shadow file is missing; I/O errors
/// propagate.
pub fn recover_meta_from_shadow(&self) -> Result<Page, PagerError> {
    let shadow = Self::meta_shadow_path(&self.path);
    if !shadow.exists() {
        return Err(PagerError::InvalidDatabase(
            "Page 1 corrupted and no metadata shadow found".into(),
        ));
    }

    let mut bytes = [0u8; PAGE_SIZE];
    File::open(&shadow)?.read_exact(&mut bytes)?;
    let page = Page::from_bytes(bytes);

    if !self.config.read_only {
        self.write_page_raw(1, &page)?;
        self.file_lock()?.sync_all()?;
    }
    Ok(page)
}
/// Writes `pages` to the database file, protected by the double-write
/// buffer when one is configured.
///
/// Without a DWB the pages are simply written in place. With one:
/// 1. the whole batch is serialized into the DWB file and fsynced. The
///    layout is `DWB_MAGIC`, page count (u32 LE), CRC32 of the entries
///    (u32 LE), then one `page_id (u32 LE) + page bytes` entry per page;
/// 2. the pages are written in place;
/// 3. the database file is fsynced;
/// 4. the DWB is truncated.
///
/// Step 3 is required for the buffer to protect anything: if the DWB were
/// discarded while the in-place writes were still only in the OS cache, a
/// crash could leave torn pages with no copy to recover from. The recovery
/// path (`recover_from_dwb_file`) follows the same sync-then-clear order.
fn write_pages_through_dwb(&self, pages: &[(u32, Page)]) -> Result<(), PagerError> {
    let Some(dwb_mutex) = &self.dwb_file else {
        for (page_id, page) in pages {
            self.write_page_raw(*page_id, page)?;
        }
        return Ok(());
    };
    let mut dwb = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;

    let entry_size = 4 + PAGE_SIZE;
    let header_len = 4 + 4 + 4;
    let mut buf = Vec::with_capacity(header_len + pages.len() * entry_size);
    buf.extend_from_slice(&DWB_MAGIC);
    buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
    // Checksum placeholder, patched once the entries are in place.
    buf.extend_from_slice(&[0u8; 4]);
    for (page_id, page) in pages {
        buf.extend_from_slice(&page_id.to_le_bytes());
        buf.extend_from_slice(page.as_bytes());
    }
    let checksum = super::super::crc32::crc32(&buf[header_len..]);
    buf[8..12].copy_from_slice(&checksum.to_le_bytes());

    // 1. Make the batch durable in the double-write buffer.
    dwb.seek(SeekFrom::Start(0))?;
    dwb.write_all(&buf)?;
    dwb.set_len(buf.len() as u64)?;
    dwb.sync_all()?;

    // 2. Write the pages to their final locations.
    for (page_id, page) in pages {
        self.write_page_raw(*page_id, page)?;
    }

    // 3. Make the in-place writes durable before giving up the DWB copy.
    {
        let file = self.file_lock()?;
        file.sync_all()?;
    }

    // 4. The batch is safely on disk; the buffer can be discarded.
    Self::clear_dwb_file(&mut dwb)?;
    Ok(())
}
/// Replays a pending double-write batch, if a DWB file exists next to the
/// database. Uses the pager's own DWB handle when it has one, otherwise
/// opens the file read/write just for the recovery.
fn recover_from_dwb(&self) -> Result<(), PagerError> {
    let dwb_path = Self::dwb_path(&self.path);
    if !dwb_path.exists() {
        return Ok(());
    }
    match &self.dwb_file {
        Some(dwb_mutex) => {
            let mut file = dwb_mutex.lock().map_err(|_| PagerError::LockPoisoned)?;
            self.recover_from_dwb_file(&mut file)
        }
        None => {
            let mut file = OpenOptions::new().read(true).write(true).open(&dwb_path)?;
            self.recover_from_dwb_file(&mut file)
        }
    }
}
/// Replays the batch stored in an open double-write buffer file, then
/// empties the file.
///
/// The buffer is discarded without replaying when it is shorter than its
/// header, lacks `DWB_MAGIC`, declares more entries than the file holds, or
/// fails its CRC32 check. Otherwise every entry is written to its page, the
/// database file is fsynced, and only then is the buffer cleared.
///
/// The declared entry count comes from disk and is untrusted, so the expected
/// length is computed with checked arithmetic; an overflowing count is
/// treated like any other corrupt buffer instead of wrapping around.
fn recover_from_dwb_file(&self, file: &mut File) -> Result<(), PagerError> {
    const HEADER_LEN: usize = 12;
    const ENTRY_SIZE: usize = 4 + PAGE_SIZE;

    file.seek(SeekFrom::Start(0))?;
    let len = file.metadata()?.len();
    if len < HEADER_LEN as u64 {
        return Self::clear_dwb_file(file);
    }
    let mut buf = vec![0u8; len as usize];
    file.read_exact(&mut buf)?;
    if buf[0..4] != DWB_MAGIC {
        return Self::clear_dwb_file(file);
    }

    let count = u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]) as usize;
    let stored_checksum = u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]);

    // Reject counts that overflow or that claim more data than is present.
    let expected_len = match count
        .checked_mul(ENTRY_SIZE)
        .and_then(|body| body.checked_add(HEADER_LEN))
    {
        Some(expected) if expected <= buf.len() => expected,
        _ => return Self::clear_dwb_file(file),
    };

    let entries = &buf[HEADER_LEN..expected_len];
    if super::super::crc32::crc32(entries) != stored_checksum {
        return Self::clear_dwb_file(file);
    }

    // `entries` is exactly `count * ENTRY_SIZE` bytes long.
    for entry in entries.chunks_exact(ENTRY_SIZE) {
        let (id_bytes, page_bytes) = entry.split_at(4);
        let page_id = u32::from_le_bytes([id_bytes[0], id_bytes[1], id_bytes[2], id_bytes[3]]);
        let mut page_data = [0u8; PAGE_SIZE];
        page_data.copy_from_slice(page_bytes);
        let page = Page::from_bytes(page_data);
        self.write_page_raw(page_id, &page)?;
    }

    // Make the replayed pages durable before discarding the only other copy.
    {
        let db_file = self.file_lock()?;
        db_file.sync_all()?;
    }
    Self::clear_dwb_file(file)
}
/// Rewrites the header page (and its shadow) and fsyncs the database file.
pub fn write_header_and_sync(&self) -> Result<(), PagerError> {
    self.write_header()?;
    self.file_lock()?.sync_all()?;
    Ok(())
}
/// Records the checkpoint-in-progress flag and its target LSN in the
/// header, then writes the header and fsyncs the database file.
pub fn set_checkpoint_in_progress(
    &self,
    in_progress: bool,
    target_lsn: u64,
) -> Result<(), PagerError> {
    {
        // Scoped so the header lock is released before the header is written.
        let mut header = self.header_write()?;
        header.checkpoint_in_progress = in_progress;
        header.checkpoint_target_lsn = target_lsn;
        *self.header_dirty_lock()? = true;
    }
    self.write_header_and_sync()
}
/// Marks a checkpoint as finished: stores `lsn` as the checkpoint LSN,
/// clears the in-progress flag and target LSN, then writes the header and
/// fsyncs the database file.
pub fn complete_checkpoint(&self, lsn: u64) -> Result<(), PagerError> {
    {
        // Scoped so the header lock is released before the header is written.
        let mut header = self.header_write()?;
        header.checkpoint_lsn = lsn;
        header.checkpoint_in_progress = false;
        header.checkpoint_target_lsn = 0;
        *self.header_dirty_lock()? = true;
    }
    self.write_header_and_sync()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a temp-dir database path made unique by the test name, the
    /// process id and the current time in nanoseconds.
    fn temp_db_path(name: &str) -> PathBuf {
        let file_name = format!(
            "reddb-pager-{}-{}-{}.rdb",
            name,
            std::process::id(),
            crate::utils::now_unix_nanos()
        );
        std::env::temp_dir().join(file_name)
    }

    /// A header whose version is newer than `DB_VERSION` must make `open`
    /// fail with an `InvalidDatabase` error.
    #[test]
    fn open_refuses_future_database_version() {
        let path = temp_db_path("future-version");

        // Create a valid database, then close it.
        drop(Pager::open_default(&path).unwrap());

        // Overwrite page 0 with a header claiming a newer format version.
        let mut future_header = Page::new_header_page(1);
        let newer_version = (DB_VERSION + 1).to_le_bytes();
        future_header.as_bytes_mut()[HEADER_SIZE + 4..HEADER_SIZE + 8]
            .copy_from_slice(&newer_version);
        future_header.update_checksum();
        {
            let mut file = OpenOptions::new().write(true).open(&path).unwrap();
            file.seek(SeekFrom::Start(0)).unwrap();
            file.write_all(future_header.as_bytes()).unwrap();
            file.sync_all().unwrap();
        }

        let err = match Pager::open_default(&path) {
            Err(err) => err,
            Ok(_) => panic!("future database version should be rejected"),
        };
        match err {
            PagerError::InvalidDatabase(msg) => {
                assert!(msg.contains("newer than supported"));
            }
            other => panic!("expected InvalidDatabase, got {other:?}"),
        }

        // Best-effort cleanup of the database and its side files.
        let leftovers = [
            path.clone(),
            Pager::shadow_path(&path),
            Pager::dwb_path(&path),
        ];
        for leftover in leftovers.iter() {
            let _ = std::fs::remove_file(leftover);
        }
    }
}