use super::{Page, PageSource};
use crate::error::{Error, Result};
use crate::format::header::HEADER_LEN;
use crate::format::{DatabaseHeader, TextEncoding};
use crate::vfs::File;
use alloc::boxed::Box;
use alloc::collections::BTreeMap;
use alloc::format;
use alloc::vec;
use alloc::vec::Vec;
/// Magic bytes at the start of a rollback journal; `recover` ignores a
/// journal whose first eight bytes differ.
const JOURNAL_MAGIC: &[u8; 8] = b"GSQLJRN1";
/// Transactional pager that stages page writes in memory until `commit`.
///
/// Committed data lives in the main database file (rollback-journal mode) or,
/// when WAL mode is active, partly in WAL frames that have not yet been
/// checkpointed into the main file.
pub struct WritePager {
    /// Main database file.
    file: Box<dyn File>,
    /// Optional rollback journal; when present, in-place commits first save
    /// the original page images here.
    journal: Option<Box<dyn File>>,
    /// In-memory database header; serialized into page 1 on every commit.
    header: DatabaseHeader,
    /// Page size in bytes, taken from the header.
    page_size: usize,
    /// Number of whole pages currently stored in the main database file.
    disk_pages: u32,
    /// Logical page count, including pages allocated in the open transaction.
    page_count: u32,
    /// Uncommitted page images, keyed by page number.
    overlay: BTreeMap<u32, Vec<u8>>,
    /// Optional WAL file; only written while `wal` is `Some`.
    wal_file: Option<Box<dyn File>>,
    /// WAL state; `Some` exactly when commits are appended to the WAL.
    wal: Option<WalRuntime>,
}
/// In-memory view of the write-ahead log belonging to a `WritePager`.
struct WalRuntime {
    /// Most recent committed image of every page that has a WAL frame.
    frames: BTreeMap<u32, Vec<u8>>,
    /// Byte offset in the WAL file at which the next frame is written;
    /// 0 means the WAL header has not been written yet.
    offset: u64,
    /// Running checksum after the last committed frame (or the WAL header).
    cksum: (u32, u32),
    /// Salt stored in the WAL header and repeated in every frame header.
    salt: [u8; 8],
    /// Database size in pages recorded by the last commit frame.
    db_size: u32,
}

/// WAL magic number selecting little-endian checksums. The same value with
/// the low bit set selects big-endian checksums.
const WAL_MAGIC_LE: u32 = 0x377f_0682;

/// Length of the WAL file header in bytes.
const WAL_HDR_LEN: usize = 32;

/// Length of every WAL frame header in bytes.
const WAL_FRAME_HDR_LEN: usize = 24;
impl WritePager {
/// Opens an existing database without a WAL file.
///
/// If `journal` is given, a hot journal in it is replayed before the header
/// is read (see `open_wal`).
pub fn open(file: Box<dyn File>, journal: Option<Box<dyn File>>) -> Result<WritePager> {
    let no_wal: Option<Box<dyn File>> = None;
    Self::open_wal(file, journal, no_wal)
}
/// Opens an existing database, optionally attaching a WAL file.
///
/// Steps:
/// 1. replay a hot rollback journal when a journal file is supplied;
/// 2. read and parse the header, and check that the file is a whole number
///    of pages;
/// 3. when the header advertises WAL mode (`read_version == 2`) and a WAL
///    file is supplied, load its committed frames so they shadow disk pages.
///
/// # Errors
/// `Error::Corrupt` if the file is shorter than a header or not a multiple of
/// the page size; header-parse and I/O errors are propagated.
pub fn open_wal(
    mut file: Box<dyn File>,
    mut journal: Option<Box<dyn File>>,
    mut wal_file: Option<Box<dyn File>>,
) -> Result<WritePager> {
    if let Some(hot) = journal.as_mut() {
        Self::recover(file.as_mut(), hot.as_mut())?;
    }

    let byte_len = file.size()?;
    if byte_len < HEADER_LEN as u64 {
        return Err(Error::Corrupt("file too small to be a database".into()));
    }
    let mut raw_header = [0u8; HEADER_LEN];
    file.read_exact_at(&mut raw_header, 0)?;
    let header = DatabaseHeader::parse(&raw_header)?;

    let page_size = header.page_size as usize;
    let page_bytes = page_size as u64;
    if byte_len % page_bytes != 0 {
        return Err(Error::Corrupt(
            "file size not a multiple of page size".into(),
        ));
    }
    let disk_pages = (byte_len / page_bytes) as u32;

    let wal = match (header.read_version, wal_file.as_mut()) {
        (2, Some(w)) => Self::load_wal(w.as_mut(), page_size)?,
        _ => None,
    };
    // A loaded WAL knows the committed size better than the main file does.
    let page_count = match &wal {
        Some(w) => w.db_size,
        None => disk_pages,
    };

    Ok(WritePager {
        file,
        journal,
        header,
        page_size,
        disk_pages,
        page_count,
        overlay: BTreeMap::new(),
        wal_file,
        wal,
    })
}
/// Creates a new single-page database with no WAL file.
///
/// Page 1 is only staged in memory; nothing reaches `file` until `commit`.
pub fn create(
    file: Box<dyn File>,
    journal: Option<Box<dyn File>>,
    page_size: u32,
) -> Result<WritePager> {
    let no_wal: Option<Box<dyn File>> = None;
    Self::create_wal(file, journal, no_wal, page_size)
}
/// Creates a new database whose page 1 (file header followed by an empty
/// table-leaf root page header) is staged in the overlay. Nothing is written
/// to `file` until the first `commit`.
///
/// # Errors
/// `Error::Error` unless `page_size` is a power of two between 512 and 65536
/// inclusive, the range a database header can describe. The header stores
/// 65536 specially and has no encoding for anything larger.
pub fn create_wal(
    file: Box<dyn File>,
    journal: Option<Box<dyn File>>,
    wal_file: Option<Box<dyn File>>,
    page_size: u32,
) -> Result<WritePager> {
    if !(512..=65536).contains(&page_size) || !page_size.is_power_of_two() {
        return Err(Error::Error(format!("invalid page size {page_size}")));
    }
    let header = DatabaseHeader {
        page_size,
        write_version: 1,
        read_version: 1,
        reserved_space: 0,
        change_counter: 1,
        size_in_pages: 1,
        freelist_trunk: 0,
        freelist_count: 0,
        schema_cookie: 0,
        schema_format: 4,
        default_cache_size: 0,
        largest_root_page: 0,
        text_encoding: TextEncoding::Utf8,
        user_version: 0,
        incremental_vacuum: 0,
        application_id: 0,
        version_valid_for: 1,
        sqlite_version_number: 3_053_002,
    };
    let mut wp = WritePager {
        file,
        journal,
        header,
        page_size: page_size as usize,
        // Nothing exists on disk yet; page 1 lives only in the overlay.
        disk_pages: 0,
        page_count: 1,
        overlay: BTreeMap::new(),
        wal_file,
        wal: None,
    };
    let mut page1 = vec![0u8; page_size as usize];
    wp.header.write_to(&mut page1)?;
    // The root b-tree page header of page 1 starts right after the file header.
    write_empty_leaf_header(&mut page1, HEADER_LEN, page_size);
    wp.overlay.insert(1, page1);
    Ok(wp)
}
/// Mutable access to the in-memory database header.
///
/// Edits are persisted only by a later `commit` that actually writes, i.e.
/// one with at least one staged page. `commit` returns early when the
/// overlay is empty, so a header-only change is not written on its own.
pub fn header_mut(&mut self) -> &mut DatabaseHeader {
    &mut self.header
}
/// Returns a copy of page `number`.
///
/// Lookup order: uncommitted overlay, then committed WAL frames, then the
/// main database file.
///
/// # Errors
/// `Error::Corrupt` when the page is in neither the overlay nor the WAL and
/// is 0 or beyond the pages stored on disk; I/O errors are propagated.
pub fn read_page(&self, number: u32) -> Result<Vec<u8>> {
    let cached = self
        .overlay
        .get(&number)
        .or_else(|| self.wal.as_ref().and_then(|w| w.frames.get(&number)));
    if let Some(bytes) = cached {
        return Ok(bytes.clone());
    }
    if !(1..=self.disk_pages).contains(&number) {
        return Err(Error::Corrupt(format!("page {number} out of range")));
    }
    let offset = u64::from(number - 1) * self.page_size as u64;
    let mut buf = vec![0u8; self.page_size];
    self.file.read_exact_at(&mut buf, offset)?;
    Ok(buf)
}
pub fn write_page(&mut self, number: u32, bytes: Vec<u8>) -> Result<()> {
if bytes.len() != self.page_size {
return Err(Error::Error("page image has wrong size".into()));
}
self.overlay.insert(number, bytes);
Ok(())
}
/// Allocates a page, reusing a freelist page when one is available and
/// otherwise growing the database by one page.
///
/// A grown page is staged as an all-zero image in the overlay.
pub fn allocate_page(&mut self) -> Result<u32> {
    let freelist_usable =
        self.header.freelist_count > 0 && self.header.freelist_trunk != 0;
    if freelist_usable {
        self.alloc_from_freelist()
    } else {
        let fresh = self.page_count + 1;
        self.page_count = fresh;
        self.overlay.insert(fresh, vec![0u8; self.page_size]);
        Ok(fresh)
    }
}
fn alloc_from_freelist(&mut self) -> Result<u32> {
let trunk = self.header.freelist_trunk;
let mut tbytes = self.read_page(trunk)?;
let leaf_count = be32(&tbytes, 4);
if leaf_count > 0 {
let idx = 8 + 4 * (leaf_count as usize - 1);
let leaf = be32(&tbytes, idx);
put32(&mut tbytes, 4, leaf_count - 1);
self.write_page(trunk, tbytes)?;
self.header.freelist_count -= 1;
self.overlay.insert(leaf, vec![0u8; self.page_size]);
Ok(leaf)
} else {
let next = be32(&tbytes, 0);
self.header.freelist_trunk = next;
self.header.freelist_count -= 1;
self.overlay.insert(trunk, vec![0u8; self.page_size]);
Ok(trunk)
}
}
pub fn free_page(&mut self, page: u32) -> Result<()> {
let trunk = self.header.freelist_trunk;
let max_leaves = (self.usable_size() / 4).saturating_sub(2) as u32;
if trunk != 0 {
let mut tbytes = self.read_page(trunk)?;
let leaf_count = be32(&tbytes, 4);
if leaf_count < max_leaves {
let idx = 8 + 4 * leaf_count as usize;
put32(&mut tbytes, idx, page);
put32(&mut tbytes, 4, leaf_count + 1);
self.write_page(trunk, tbytes)?;
self.header.freelist_count += 1;
return Ok(());
}
}
let mut nb = vec![0u8; self.page_size];
put32(&mut nb, 0, trunk); put32(&mut nb, 4, 0); self.write_page(page, nb)?;
self.header.freelist_trunk = page;
self.header.freelist_count += 1;
Ok(())
}
pub fn rollback(&mut self) {
self.overlay.clear();
self.page_count = match &self.wal {
Some(w) => w.db_size,
None => self.disk_pages,
};
}
/// Makes every staged page durable.
///
/// In WAL mode this delegates to `commit_wal`. Otherwise it:
/// 1. bumps the change counter and stamps the header into page 1;
/// 2. writes the rollback journal, if one is attached;
/// 3. writes every dirty page in place, truncates the file to the logical
///    page count and syncs it;
/// 4. empties the journal.
///
/// Does nothing when no page is staged.
///
/// # Errors
/// Propagates I/O and header-serialization errors; on error the overlay is
/// left in place.
pub fn commit(&mut self) -> Result<()> {
    if self.overlay.is_empty() {
        return Ok(());
    }
    if self.wal.is_some() {
        return self.commit_wal();
    }
    self.header.change_counter = self.header.change_counter.wrapping_add(1);
    self.header.size_in_pages = self.page_count;
    self.header.version_valid_for = self.header.change_counter;
    // Fall back to reading page 1 only when it is not already staged;
    // `unwrap_or(self.read_page(1)?)` performed that read unconditionally.
    let mut page1 = match self.overlay.get(&1) {
        Some(staged) => staged.clone(),
        None => self.read_page(1)?,
    };
    self.header.write_to(&mut page1)?;
    self.overlay.insert(1, page1);
    if self.journal.is_some() {
        self.write_journal()?;
    }
    let page_size = self.page_size as u64;
    // `overlay` and `file` are disjoint fields, so dirty pages are written
    // straight from the overlay instead of from a cloned copy of it.
    for (&n, bytes) in &self.overlay {
        self.file.write_all_at(bytes, (n as u64 - 1) * page_size)?;
    }
    self.file.truncate(self.page_count as u64 * page_size)?;
    self.file.sync()?;
    if let Some(j) = self.journal.as_mut() {
        j.truncate(0)?;
        j.sync()?;
    }
    self.disk_pages = self.page_count;
    self.overlay.clear();
    Ok(())
}
/// Reports whether commits are currently appended to the WAL.
pub fn wal_mode(&self) -> bool {
    matches!(self.wal, Some(_))
}
/// Switches the pager into WAL mode.
///
/// Returns `Ok(true)` if WAL mode is (now) active and `Ok(false)` if no WAL
/// file is attached. If the header does not yet carry read/write version 2,
/// the version bytes are updated and committed through the in-place path
/// first. The WAL file is then emptied and a fresh WAL runtime is started.
pub fn set_wal_mode(&mut self) -> Result<bool> {
    if self.wal.is_some() {
        return Ok(true);
    }
    if self.wal_file.is_none() {
        return Ok(false);
    }
    if self.header.read_version != 2 || self.header.write_version != 2 {
        self.header.read_version = 2;
        self.header.write_version = 2;
        let mut page1 = match self.overlay.get(&1) {
            Some(staged) => staged.clone(),
            None => self.read_page(1)?,
        };
        self.header.write_to(&mut page1)?;
        self.overlay.insert(1, page1);
        self.commit()?;
    }
    if let Some(w) = self.wal_file.as_mut() {
        w.truncate(0)?;
        w.sync()?;
    }
    let salt = (self.header.change_counter as u64)
        .wrapping_mul(0x9E37_79B9)
        .to_be_bytes();
    self.wal = Some(WalRuntime {
        frames: BTreeMap::new(),
        offset: 0,
        cksum: (0, 0),
        salt,
        // The committed size is what is on disk. If the header already said
        // WAL mode, no commit ran above, and `page_count` may still include
        // pages allocated by the open transaction, which `rollback` would
        // otherwise keep.
        db_size: self.disk_pages,
    });
    Ok(true)
}
/// Appends every staged page to the WAL as a single transaction.
///
/// Steps:
/// 1. Bump the change counter and stamp the header into page 1.
/// 2. On first use of this WAL generation, write the WAL header.
/// 3. Append one frame per dirty page. Only the last frame carries the
///    database size, which marks the commit.
///
/// Frame checksums chain from the previous frame (or from the WAL header).
/// They use the byte order selected by the magic number of the header already
/// in the file, so appending to a big-endian WAL stays valid.
///
/// The runtime's offset and running checksum are published only after the
/// frames are synced. After a failed write they still describe the last
/// durable commit, and a retry overwrites the partial frames with a
/// consistent checksum chain.
fn commit_wal(&mut self) -> Result<()> {
    self.header.change_counter = self.header.change_counter.wrapping_add(1);
    self.header.size_in_pages = self.page_count;
    self.header.version_valid_for = self.header.change_counter;
    let mut page1 = match self.overlay.get(&1) {
        Some(staged) => staged.clone(),
        None => self.read_page(1)?,
    };
    self.header.write_to(&mut page1)?;
    self.overlay.insert(1, page1);

    let page_size = self.page_size;
    let page_count = self.page_count;
    let wal = self.wal.as_mut().expect("wal mode");
    let file = self.wal_file.as_mut().expect("wal file");
    let salt = wal.salt;
    let big_endian = if wal.offset == 0 {
        // First commit of this WAL generation: write a little-endian header.
        let mut hdr = [0u8; WAL_HDR_LEN];
        hdr[0..4].copy_from_slice(&WAL_MAGIC_LE.to_be_bytes());
        hdr[4..8].copy_from_slice(&3_007_000u32.to_be_bytes());
        hdr[8..12].copy_from_slice(&(page_size as u32).to_be_bytes());
        hdr[12..16].copy_from_slice(&0u32.to_be_bytes());
        hdr[16..24].copy_from_slice(&salt);
        let (h0, h1) = super::wal::checksum(false, 0, 0, &hdr[0..24]);
        hdr[24..28].copy_from_slice(&h0.to_be_bytes());
        hdr[28..32].copy_from_slice(&h1.to_be_bytes());
        file.write_all_at(&hdr, 0)?;
        wal.offset = WAL_HDR_LEN as u64;
        wal.cksum = (h0, h1);
        false
    } else {
        // Appending to an existing WAL (possibly loaded from disk): its
        // magic number fixes the checksum byte order for every frame.
        let mut magic = [0u8; 4];
        file.read_exact_at(&mut magic, 0)?;
        (u32::from_be_bytes(magic) & 1) == 1
    };

    let (mut s0, mut s1) = wal.cksum;
    let mut off = wal.offset;
    let frame_len = WAL_FRAME_HDR_LEN + page_size;
    let last = self.overlay.len();
    let mut frame = Vec::with_capacity(frame_len);
    for (i, (page_no, data)) in self.overlay.iter().enumerate() {
        // Only the final frame records the database size (commit marker).
        let db_size = if i + 1 == last { page_count } else { 0 };
        let mut fhdr = [0u8; WAL_FRAME_HDR_LEN];
        fhdr[0..4].copy_from_slice(&page_no.to_be_bytes());
        fhdr[4..8].copy_from_slice(&db_size.to_be_bytes());
        fhdr[8..16].copy_from_slice(&salt);
        let (c0, c1) = super::wal::checksum(big_endian, s0, s1, &fhdr[0..8]);
        let (c0, c1) = super::wal::checksum(big_endian, c0, c1, data);
        fhdr[16..20].copy_from_slice(&c0.to_be_bytes());
        fhdr[20..24].copy_from_slice(&c1.to_be_bytes());
        frame.clear();
        frame.extend_from_slice(&fhdr);
        frame.extend_from_slice(data);
        file.write_all_at(&frame, off)?;
        off += frame_len as u64;
        s0 = c0;
        s1 = c1;
    }
    file.sync()?;

    // Publish the new log position only once the frames are durable.
    wal.offset = off;
    wal.cksum = (s0, s1);
    wal.db_size = page_count;
    // Move every staged page into the committed frame map (newer images
    // replace older ones), leaving the overlay empty.
    wal.frames.append(&mut self.overlay);
    Ok(())
}
/// Copies every committed WAL frame into the main database file and starts a
/// new, empty WAL generation. Does nothing outside WAL mode.
///
/// The main file is written, truncated to the committed size and synced
/// before the WAL file is emptied. If a failure occurs before that point,
/// the frames are still in the WAL. The new generation's salt has its first
/// word incremented, so it differs from the previous generation's salt.
pub fn checkpoint(&mut self) -> Result<()> {
    let Some(wal) = self.wal.as_mut() else {
        return Ok(());
    };
    let page_size = self.page_size as u64;
    let db_size = wal.db_size;
    // `wal` borrows only the `wal` field, so frames are written to `file`
    // directly instead of from a cloned copy.
    for (&page_no, data) in &wal.frames {
        self.file
            .write_all_at(data, (page_no as u64 - 1) * page_size)?;
    }
    self.file.truncate(db_size as u64 * page_size)?;
    self.file.sync()?;
    self.disk_pages = db_size;
    if let Some(w) = self.wal_file.as_mut() {
        w.truncate(0)?;
        w.sync()?;
    }
    let mut new_salt = wal.salt;
    let generation =
        u32::from_be_bytes([new_salt[0], new_salt[1], new_salt[2], new_salt[3]]).wrapping_add(1);
    new_salt[0..4].copy_from_slice(&generation.to_be_bytes());
    self.wal = Some(WalRuntime {
        frames: BTreeMap::new(),
        offset: 0,
        cksum: (0, 0),
        salt: new_salt,
        db_size,
    });
    Ok(())
}
/// Replaces the whole database with `image`, one entry per page with page 1
/// first (as produced by VACUUM).
///
/// Every page must be exactly one page long, and page 1 must hold a parseable
/// header. Both are checked before anything is written, so a bad image no
/// longer leaves the file overwritten. In WAL mode the header's read/write
/// versions are set to 2 inside page 1 before it is written. The on-disk
/// header therefore keeps advertising WAL mode, and WAL commits made after
/// the VACUUM are picked up on reopen. The WAL is then emptied and restarted.
///
/// # Errors
/// `Error::Error` for an empty or mis-sized image; header-parse and I/O
/// errors are propagated.
pub fn replace_image(&mut self, mut image: Vec<Vec<u8>>) -> Result<()> {
    if image.is_empty() || image.iter().any(|p| p.len() != self.page_size) {
        return Err(Error::Error("invalid VACUUM image".into()));
    }
    let wal_mode = self.wal.is_some();
    let mut header = DatabaseHeader::parse(&image[0])?;
    if wal_mode {
        header.read_version = 2;
        header.write_version = 2;
        header.write_to(&mut image[0])?;
    }
    let ps = self.page_size as u64;
    for (i, bytes) in image.iter().enumerate() {
        self.file.write_all_at(bytes, i as u64 * ps)?;
    }
    self.file.truncate(image.len() as u64 * ps)?;
    self.file.sync()?;
    let count = image.len() as u32;
    self.header = header;
    self.disk_pages = count;
    self.page_count = count;
    self.overlay.clear();
    if wal_mode {
        if let Some(w) = self.wal_file.as_mut() {
            w.truncate(0)?;
            w.sync()?;
        }
        self.wal = Some(WalRuntime {
            frames: BTreeMap::new(),
            offset: 0,
            cksum: (0, 0),
            salt: (count as u64).wrapping_mul(0x9E37_79B9).to_be_bytes(),
            db_size: count,
        });
    }
    Ok(())
}
/// Loads the committed contents of a WAL file.
///
/// Returns `Ok(None)` when the file:
/// - is shorter than a header;
/// - has an unknown magic number or a different page size;
/// - fails the header checksum;
/// - contains no committed frame.
///
/// Frames are checked in order: salt first, then the running checksum, which
/// uses the byte order selected by the magic number's low bit. They are
/// applied only up to the last commit frame (one with a non-zero database
/// size); anything after it is ignored. A frame naming page 0 also ends the
/// log, since page numbers start at 1 and a zero would later underflow file
/// offsets.
fn load_wal(wal: &mut dyn File, page_size: usize) -> Result<Option<WalRuntime>> {
    let size = wal.size()?;
    if size < WAL_HDR_LEN as u64 {
        return Ok(None);
    }
    let mut hdr = [0u8; WAL_HDR_LEN];
    wal.read_exact_at(&mut hdr, 0)?;
    let magic = u32::from_be_bytes([hdr[0], hdr[1], hdr[2], hdr[3]]);
    if magic & 0xFFFF_FFFE != WAL_MAGIC_LE {
        return Ok(None);
    }
    let big_endian = (magic & 1) == 1;
    let wal_ps = u32::from_be_bytes([hdr[8], hdr[9], hdr[10], hdr[11]]) as usize;
    if wal_ps != page_size {
        return Ok(None);
    }
    let mut salt = [0u8; 8];
    salt.copy_from_slice(&hdr[16..24]);
    let (h0, h1) = super::wal::checksum(big_endian, 0, 0, &hdr[0..24]);
    if h0 != u32::from_be_bytes([hdr[24], hdr[25], hdr[26], hdr[27]])
        || h1 != u32::from_be_bytes([hdr[28], hdr[29], hdr[30], hdr[31]])
    {
        return Ok(None);
    }
    let frame_len = WAL_FRAME_HDR_LEN + page_size;
    let (mut s0, mut s1) = (h0, h1);
    let mut off = WAL_HDR_LEN as u64;
    let mut frames: BTreeMap<u32, Vec<u8>> = BTreeMap::new();
    // Frames of a transaction whose commit frame has not been seen yet.
    let mut pending: Vec<(u32, Vec<u8>)> = Vec::new();
    let mut db_size = 0u32;
    let mut committed_off = WAL_HDR_LEN as u64;
    let mut committed_cksum = (h0, h1);
    while off + frame_len as u64 <= size {
        let mut fhdr = [0u8; WAL_FRAME_HDR_LEN];
        wal.read_exact_at(&mut fhdr, off)?;
        // A salt mismatch marks the end of the valid log.
        if fhdr[8..16] != salt {
            break;
        }
        let page_no = u32::from_be_bytes([fhdr[0], fhdr[1], fhdr[2], fhdr[3]]);
        if page_no == 0 {
            break;
        }
        let mut page = vec![0u8; page_size];
        wal.read_exact_at(&mut page, off + WAL_FRAME_HDR_LEN as u64)?;
        let (c0, c1) = super::wal::checksum(big_endian, s0, s1, &fhdr[0..8]);
        let (c0, c1) = super::wal::checksum(big_endian, c0, c1, &page);
        if c0 != u32::from_be_bytes([fhdr[16], fhdr[17], fhdr[18], fhdr[19]])
            || c1 != u32::from_be_bytes([fhdr[20], fhdr[21], fhdr[22], fhdr[23]])
        {
            break;
        }
        s0 = c0;
        s1 = c1;
        let commit = u32::from_be_bytes([fhdr[4], fhdr[5], fhdr[6], fhdr[7]]);
        pending.push((page_no, page));
        off += frame_len as u64;
        if commit != 0 {
            frames.extend(pending.drain(..));
            db_size = commit;
            committed_off = off;
            committed_cksum = (s0, s1);
        }
    }
    if frames.is_empty() {
        return Ok(None);
    }
    Ok(Some(WalRuntime {
        frames,
        offset: committed_off,
        cksum: committed_cksum,
        salt,
        db_size,
    }))
}
/// Writes the rollback journal for the pending in-place commit.
///
/// Layout: a 16-byte header, then one record per dirty page that already
/// exists on disk.
/// - Header: 8-byte magic, original page count, page size (integers
///   big-endian).
/// - Record: 4-byte page number, then the page's current on-disk image.
///
/// The header is first written with the magic zeroed. The records are
/// written and synced, and only then is the magic written and synced. Since
/// `recover` ignores a journal without the magic, a crash while records are
/// being written can never cause torn records to be replayed over good
/// database pages.
///
/// # Panics
/// If no journal file is attached; `commit` only calls this when one is.
fn write_journal(&mut self) -> Result<()> {
    let page_size = self.page_size;
    let mut originals: Vec<(u32, Vec<u8>)> = Vec::new();
    for &n in self.overlay.keys() {
        // Pages beyond the on-disk size have no original image to save;
        // truncating to the recorded page count undoes them.
        if n <= self.disk_pages {
            let mut buf = vec![0u8; page_size];
            self.file
                .read_exact_at(&mut buf, (n as u64 - 1) * page_size as u64)?;
            originals.push((n, buf));
        }
    }
    let j = self
        .journal
        .as_mut()
        .expect("write_journal is only called with a journal attached");
    j.truncate(0)?;
    // Header with the magic left zeroed: the journal is not hot yet.
    let mut hdr = [0u8; 16];
    hdr[8..12].copy_from_slice(&self.disk_pages.to_be_bytes());
    hdr[12..16].copy_from_slice(&(page_size as u32).to_be_bytes());
    j.write_all_at(&hdr, 0)?;
    let mut off = hdr.len() as u64;
    for (n, bytes) in &originals {
        j.write_all_at(&n.to_be_bytes(), off)?;
        off += 4;
        j.write_all_at(bytes, off)?;
        off += page_size as u64;
    }
    j.sync()?;
    // Arm the journal only once every record is durable.
    j.write_all_at(JOURNAL_MAGIC, 0)?;
    j.sync()?;
    Ok(())
}
fn recover(file: &mut dyn File, journal: &mut dyn File) -> Result<()> {
let jsize = journal.size()?;
if jsize < 16 {
return Ok(()); }
let mut hdr = [0u8; 16];
journal.read_exact_at(&mut hdr, 0)?;
if &hdr[0..8] != JOURNAL_MAGIC {
return Ok(()); }
let orig_pages = u32::from_be_bytes([hdr[8], hdr[9], hdr[10], hdr[11]]);
let page_size = u32::from_be_bytes([hdr[12], hdr[13], hdr[14], hdr[15]]) as usize;
let mut off = 16u64;
while off + 4 + page_size as u64 <= jsize {
let mut nb = [0u8; 4];
journal.read_exact_at(&mut nb, off)?;
off += 4;
let n = u32::from_be_bytes(nb);
let mut buf = vec![0u8; page_size];
journal.read_exact_at(&mut buf, off)?;
off += page_size as u64;
file.write_all_at(&buf, (n as u64 - 1) * page_size as u64)?;
}
file.truncate(orig_pages as u64 * page_size as u64)?;
file.sync()?;
journal.truncate(0)?;
journal.sync()?;
Ok(())
}
}
impl PageSource for WritePager {
    /// Wraps the current image of page `number`: uncommitted overlay first,
    /// then WAL frames, then the main file.
    fn page(&self, number: u32) -> Result<Page> {
        let bytes = self.read_page(number)?;
        Ok(Page::from_bytes(number, bytes))
    }

    /// The in-memory header, including edits not yet committed.
    fn header(&self) -> &DatabaseHeader {
        &self.header
    }

    /// Usable bytes per page, as computed by the header.
    fn usable_size(&self) -> usize {
        let usable = self.header.usable_size();
        usable as usize
    }

    /// Logical page count, including pages allocated in the open transaction.
    fn page_count(&self) -> u32 {
        self.page_count
    }
}
/// Reads the big-endian `u32` stored in `b[at..at + 4]`.
///
/// Panics if fewer than four bytes are available at `at`.
#[inline]
fn be32(b: &[u8], at: usize) -> u32 {
    let mut word = [0u8; 4];
    word.copy_from_slice(&b[at..at + 4]);
    u32::from_be_bytes(word)
}
/// Stores `v` big-endian into `b[at..at + 4]`.
///
/// Panics if fewer than four bytes are available at `at`.
#[inline]
fn put32(b: &mut [u8], at: usize, v: u32) {
    let encoded = v.to_be_bytes();
    for (slot, byte) in b[at..at + 4].iter_mut().zip(encoded.iter()) {
        *slot = *byte;
    }
}
/// Writes an 8-byte empty leaf b-tree page header (flag byte `0x0d`) at
/// `page[offset..offset + 8]`. It records no freeblocks, no cells, a cell
/// content area starting at `page_size`, and no fragmented bytes.
///
/// The content-area start is a two-byte field, so a 65536-byte page stores
/// it as 0.
fn write_empty_leaf_header(page: &mut [u8], offset: usize, page_size: u32) {
    let content_start: u16 = match page_size {
        65536 => 0,
        other => other as u16,
    };
    let [hi, lo] = content_start.to_be_bytes();
    page[offset..offset + 8].copy_from_slice(&[0x0d, 0, 0, 0, 0, hi, lo, 0]);
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::vfs::{memory::MemoryVfs, OpenFlags, Vfs};

    /// Fresh 4096-byte-page database on an in-memory VFS, without a journal.
    fn mem_wp() -> WritePager {
        let vfs = MemoryVfs::new();
        let db = vfs.open("db", OpenFlags::READ_WRITE_CREATE).unwrap();
        WritePager::create(db, None, 4096).unwrap()
    }

    #[test]
    fn create_yields_valid_empty_db() {
        let mut pager = mem_wp();
        pager.commit().unwrap();
        let first = pager.read_page(1).unwrap();
        let parsed = DatabaseHeader::parse(&first).unwrap();
        assert_eq!(parsed.page_size, 4096);
        assert_eq!(parsed.size_in_pages, 1);
        // The root page's b-tree header follows the 100-byte file header.
        assert_eq!(first[100], 0x0d);
    }

    #[test]
    fn allocate_and_readback() {
        let mut pager = mem_wp();
        let fresh = pager.allocate_page().unwrap();
        assert_eq!(fresh, 2);
        let mut image = vec![0u8; 4096];
        image[0] = 0x0d;
        pager.write_page(fresh, image).unwrap();
        pager.commit().unwrap();
        assert_eq!(pager.page_count(), 2);
        assert_eq!(pager.read_page(2).unwrap()[0], 0x0d);
    }

    #[test]
    fn rollback_discards_overlay() {
        let mut pager = mem_wp();
        pager.commit().unwrap();
        pager.allocate_page().unwrap();
        pager.rollback();
        assert_eq!(pager.page_count(), 1);
    }

    #[test]
    fn journal_recovery_restores_originals() {
        let vfs = MemoryVfs::new();

        // Create and commit a one-page database with a journal attached.
        {
            let db = vfs.open("db", OpenFlags::READ_WRITE_CREATE).unwrap();
            let jrnl = vfs
                .open("db-journal", OpenFlags::READ_WRITE_CREATE)
                .unwrap();
            let mut pager = WritePager::create(db, Some(jrnl), 4096).unwrap();
            pager.commit().unwrap();
        }

        // Snapshot the committed page 1.
        let original = {
            let db = vfs.open("db", OpenFlags::READ_ONLY).unwrap();
            let mut page = vec![0u8; 4096];
            db.read_exact_at(&mut page, 0).unwrap();
            page
        };

        // Hand-craft a hot journal that holds the snapshot.
        {
            let mut jrnl = vfs.open("db-journal", OpenFlags::READ_WRITE).unwrap();
            let mut header = Vec::new();
            header.extend_from_slice(JOURNAL_MAGIC);
            header.extend_from_slice(&1u32.to_be_bytes());
            header.extend_from_slice(&4096u32.to_be_bytes());
            jrnl.write_all_at(&header, 0).unwrap();
            jrnl.write_all_at(&1u32.to_be_bytes(), 16).unwrap();
            jrnl.write_all_at(&original, 20).unwrap();
            jrnl.sync().unwrap();
        }

        // Simulate an interrupted commit by clobbering the start of page 1.
        {
            let mut db = vfs.open("db", OpenFlags::READ_WRITE).unwrap();
            db.write_all_at(&[0xFFu8; 16], 0).unwrap();
        }

        // Opening must replay the journal and restore the snapshot.
        let db = vfs.open("db", OpenFlags::READ_WRITE).unwrap();
        let jrnl = vfs.open("db-journal", OpenFlags::READ_WRITE).unwrap();
        let pager = WritePager::open(db, Some(jrnl)).unwrap();
        assert_eq!(pager.read_page(1).unwrap(), original);
    }
}