use anyhow::{ensure, Result};
use crc32c::*;
use fs2::FileExt;
use std::cmp::Ordering;
use std::convert::TryInto;
use std::fs::{File, OpenOptions};
use std::iter;
use std::ops::Bound::*;
use std::ops::{Bound, RangeBounds};
use std::os::unix::prelude::FileExt as UnixFileExt;
use std::path::Path;
use std::sync::{Condvar, Mutex, MutexGuard, RwLock, RwLockWriteGuard};
// PageId: page number in the data file (byte offset is `pid * PAGE_SIZE`).
// BufferId: index into the buffer pool / `BufferManager::pages`; 0 is used as the "none" link value.
// LSN: counter incremented on every committed transaction.
// ItemPointer: index of an item inside a page.
type PageId = u32; type BufferId = u32; type LSN = u64; type ItemPointer = usize;
/// Key: raw bytes, compared lexicographically.
pub type Key = Vec<u8>;
/// Value: raw bytes.
pub type Value = Vec<u8>;
// Size in bytes of one page, both on disk and in the buffer pool.
const PAGE_SIZE: usize = 8192;
// Magic number and format version checked against the metadata when an existing file is opened.
const MAGIC: u32 = 0xBACE2021;
const VERSION: u32 = 1;
// Size of the packed `Metadata` record: seven u32 fields.
const METADATA_SIZE: usize = 7 * 4;
// PAGE_HEADER_SIZE: bytes before the item offset table (the u16 item count).
// MAX_VALUE_LEN / MAX_KEY_LEN: limits enforced by `do_upsert`; the key length is stored in one byte.
// N_BUSY_EVENTS: number of condition variables shared by buffers waiting for a page load.
const PAGE_HEADER_SIZE: usize = 2; const MAX_VALUE_LEN: usize = PAGE_SIZE / 4; const MAX_KEY_LEN: usize = u8::MAX as usize; const N_BUSY_EVENTS: usize = 8;
// Bit flags kept in `PageHeader::state`:
// PAGE_RAW   - buffer was just assigned to a page and its content has not been loaded yet
// PAGE_BUSY  - the page is currently being read from the data file
// PAGE_DIRTY - the page is linked into the dirty list
// PAGE_WAIT  - some thread is waiting for the load of this page to finish
const PAGE_RAW: u16 = 1; const PAGE_BUSY: u16 = 2; const PAGE_DIRTY: u16 = 4; const PAGE_WAIT: u16 = 8;
/// Positioning operation executed by `Storage::do_lookup` on a `TreePath`.
enum LookupOp<'a> {
/// Position on the first item of the tree.
First,
/// Position on the last item of the tree.
Last,
/// Advance the path to the following item.
Next,
/// Move the path to the preceding item.
Prev,
/// Position on the first item whose key is greater than or equal to the given key.
GreaterOrEqual(&'a Key),
}
/// Lifecycle state of the database; most operations require `Opened`.
#[derive(PartialEq, Copy, Clone, Debug)]
pub enum DatabaseState {
/// Initial state set by `Storage::open` until recovery completes.
InRecovery,
/// Set at the end of recovery; lookups and updates are allowed.
Opened,
/// Presumably set when the storage is closed - the assignment is not visible in this part of the file.
Closed,
/// Set by `Storage::update` when a commit or a rollback fails.
Corrupted,
}
/// How `Storage::get_page` should prepare the requested page.
#[derive(PartialEq)]
enum AccessMode {
/// Load the page from the file if needed; the buffer is not marked dirty.
ReadOnly,
/// The old content is not needed: an unloaded buffer is zero-filled instead of read,
/// and the buffer is marked dirty.
WriteOnly,
}
/// Status of an explicit transaction.
#[derive(PartialEq)]
pub enum TransactionStatus {
/// Status assigned by `Storage::start_transaction`; iterators bound to a transaction assert it.
InProgress,
/// Presumably set after a successful commit (the transition is not visible in this part of the file).
Committed,
/// Presumably set after a rollback (the transition is not visible in this part of the file).
Aborted,
}
/// Explicit transaction created by `Storage::start_transaction`.
pub struct Transaction<'a> {
/// Current status of the transaction.
pub status: TransactionStatus,
// Storage this transaction operates on.
storage: &'a Storage,
// Write guard on the database state, held for as long as the transaction object lives.
db: RwLockWriteGuard<'a, Database>,
}
/// Statistics collected while replaying the write-ahead log in `Storage::recovery`.
#[derive(Clone, Copy, Default)]
pub struct RecoveryStatus {
/// Number of transactions replayed from the log.
pub recovered_transactions: u64,
/// Length of the log file when recovery started.
pub wal_size: u64,
/// Log position just after the last replayed transaction.
pub recovery_end: u64,
}
/// Snapshot of database statistics produced by `Database::get_info`.
#[derive(Clone, Copy, Debug)]
pub struct DatabaseInfo {
/// Height of the B-tree (`Metadata::height`).
pub tree_height: usize,
/// Size of the data file in bytes (`Metadata::size` pages).
pub db_size: u64,
/// Bytes occupied by used pages (`Metadata::used` pages).
pub db_used: u64,
/// Current write position in the log.
pub log_size: u64,
/// Number of committed transactions (the current LSN).
pub n_committed_transactions: u64,
/// Number of rolled back transactions.
pub n_aborted_transactions: u64,
/// Current database state.
pub state: DatabaseState,
}
/// Buffer cache statistics.
/// NOTE(review): the producer is not visible in this part of the file; the fields presumably
/// mirror the `pinned` and `used` counters of `BufferManager` - confirm.
#[derive(Clone, Copy, Debug)]
pub struct CacheInfo {
pub pinned: usize,
pub used: usize,
}
/// Double-ended iterator over the items whose keys fall into a key range.
pub struct StorageIterator<'a> {
// Storage being iterated.
storage: &'a Storage,
// Transaction the iterator runs in; when `None` a read lock on the database
// is taken for every step.
trans: Option<&'a Transaction<'a>>,
// Lower bound of the range.
from: Bound<Key>,
// Upper bound of the range.
till: Bound<Key>,
// Cursor used by forward iteration (`next`).
left: TreePath,
// Cursor used by backward iteration (`next_back`).
right: TreePath,
}
/// One step of a tree path: a page and the position of an item in it.
struct PagePos {
// Page identifier.
pid: PageId,
// Item position inside the page; `move_backward` uses `usize::MAX` to mean "after the last item".
pos: usize,
}
/// Cursor inside the B-tree.
// curr:   item the cursor currently points at (None if there is none)
// result: outcome of the last lookup, handed out once by the iterator
// stack:  pages and positions from the root down to the current item
// lsn:    database LSN at the time the stack was built; a different current LSN
//         makes `do_lookup` rebuild the stack before moving
struct TreePath {
curr: Option<(Key, Value)>, result: Option<Result<(Key, Value)>>,
stack: Vec<PagePos>, lsn: LSN, }
impl TreePath {
    /// Creates an unpositioned cursor: no current item, no pending result,
    /// an empty page stack and LSN 0.
    fn new() -> TreePath {
        let stack = Vec::new();
        TreePath {
            curr: None,
            result: None,
            stack,
            lsn: 0,
        }
    }
}
impl<'a> StorageIterator<'a> {
    /// Performs one forward step using `db` as the already locked database state.
    ///
    /// On the first call (empty page stack) the left cursor is positioned on the first
    /// item satisfying the lower bound, afterwards it is advanced by one item.
    /// Returns `None` when the current item lies beyond the upper bound, otherwise
    /// the result stored by the last lookup.
    fn next_locked(&mut self, db: &Database) -> Option<<StorageIterator<'a> as Iterator>::Item> {
        if self.left.stack.is_empty() {
            match &self.from {
                Bound::Included(start) => {
                    self.storage
                        .lookup(db, LookupOp::GreaterOrEqual(start), &mut self.left);
                }
                Bound::Excluded(start) => {
                    self.storage
                        .lookup(db, LookupOp::GreaterOrEqual(start), &mut self.left);
                    // The bound itself is excluded: step over an exact match.
                    let on_bound = matches!(&self.left.curr, Some((found, _)) if found == start);
                    if on_bound {
                        self.storage.lookup(db, LookupOp::Next, &mut self.left);
                    }
                }
                Bound::Unbounded => {
                    self.storage.lookup(db, LookupOp::First, &mut self.left);
                }
            }
        } else {
            self.storage.lookup(db, LookupOp::Next, &mut self.left);
        }
        if let Some((found, _)) = &self.left.curr {
            let beyond_range = match &self.till {
                Bound::Included(end) => found > end,
                Bound::Excluded(end) => found >= end,
                Bound::Unbounded => false,
            };
            if beyond_range {
                return None;
            }
        }
        self.left.result.take()
    }

    /// Performs one backward step using `db` as the already locked database state.
    ///
    /// On the first call (empty page stack) the right cursor is positioned on the last
    /// item satisfying the upper bound, afterwards it is moved back by one item.
    /// Returns `None` when the current item lies before the lower bound, otherwise
    /// the result stored by the last lookup.
    fn next_back_locked(
        &mut self,
        db: &Database,
    ) -> Option<<StorageIterator<'a> as Iterator>::Item> {
        if self.right.stack.is_empty() {
            match &self.till {
                Bound::Included(end) => {
                    self.storage
                        .lookup(db, LookupOp::GreaterOrEqual(end), &mut self.right);
                    // Some(true): overshoot, step back; Some(false): exact hit; None: no item >= end.
                    let overshoot = self.right.curr.as_ref().map(|(found, _)| found > end);
                    match overshoot {
                        Some(true) => self.storage.lookup(db, LookupOp::Prev, &mut self.right),
                        Some(false) => {}
                        None => self.storage.lookup(db, LookupOp::Last, &mut self.right),
                    }
                }
                Bound::Excluded(end) => {
                    self.storage
                        .lookup(db, LookupOp::GreaterOrEqual(end), &mut self.right);
                    let overshoot = self.right.curr.as_ref().map(|(found, _)| found >= end);
                    match overshoot {
                        Some(true) => self.storage.lookup(db, LookupOp::Prev, &mut self.right),
                        Some(false) => {}
                        None => self.storage.lookup(db, LookupOp::Last, &mut self.right),
                    }
                }
                Bound::Unbounded => {
                    self.storage.lookup(db, LookupOp::Last, &mut self.right);
                }
            }
        } else {
            self.storage.lookup(db, LookupOp::Prev, &mut self.right);
        }
        if let Some((found, _)) = &self.right.curr {
            let before_range = match &self.from {
                Bound::Included(start) => found < start,
                Bound::Excluded(start) => found <= start,
                Bound::Unbounded => false,
            };
            if before_range {
                return None;
            }
        }
        self.right.result.take()
    }
}
impl<'a> Iterator for StorageIterator<'a> {
    type Item = Result<(Key, Value)>;

    /// Returns the next item in ascending key order.
    ///
    /// Inside a transaction the transaction's database guard is used (panics if the
    /// transaction is no longer in progress); otherwise a read lock is taken for this step.
    fn next(&mut self) -> Option<Self::Item> {
        match self.trans {
            Some(trans) => {
                assert!(trans.status == TransactionStatus::InProgress);
                self.next_locked(&trans.db)
            }
            None => {
                let guard = self.storage.db.read().unwrap();
                self.next_locked(&guard)
            }
        }
    }
}
impl<'a> DoubleEndedIterator for StorageIterator<'a> {
    /// Returns the next item in descending key order.
    ///
    /// Inside a transaction the transaction's database guard is used (panics if the
    /// transaction is no longer in progress); otherwise a read lock is taken for this step.
    fn next_back(&mut self) -> Option<Self::Item> {
        match self.trans {
            Some(trans) => {
                assert!(trans.status == TransactionStatus::InProgress);
                self.next_back_locked(&trans.db)
            }
            None => {
                let guard = self.storage.db.read().unwrap();
                self.next_back_locked(&guard)
            }
        }
    }
}
/// The storage engine: a B-tree kept in a paged file, accessed through a buffer cache.
pub struct Storage {
// Database state (metadata, LSN, WAL position); writers take the write lock.
db: RwLock<Database>,
// Bookkeeping of the buffer cache (hash table, LRU list, dirty list).
buf_mgr: Mutex<BufferManager>,
// Condition variables used to wait for a page load; selected by `buf % N_BUSY_EVENTS`.
busy_events: [Condvar; N_BUSY_EVENTS],
// Page images, indexed by BufferId.
pool: Vec<RwLock<PageData>>,
// WAL size threshold: when reached at commit, the data file is synced and the log position is reset.
checkpoint_interval: u64,
// Data file.
file: File,
// Write-ahead log file; `None` means changes are written to the data file directly.
log: Option<File>,
}
/// Per-buffer bookkeeping kept by `BufferManager` (the page image itself lives in `Storage::pool`).
#[derive(Clone, Copy, Default)]
struct PageHeader {
// Identifier of the page held in this buffer.
id: PageId,
// collision: next buffer in the same hash bucket.
// next: next buffer in the LRU list; also reused as the link of the free list and of the dirty list.
collision: BufferId, next: BufferId,
// Previous buffer in the LRU list.
prev: BufferId,
// Pin count; a buffer with count 0 sits in the LRU list and may be evicted.
access_count: u16,
// Combination of PAGE_RAW / PAGE_BUSY / PAGE_DIRTY / PAGE_WAIT bits.
state: u16, }
impl PageHeader {
fn new() -> PageHeader {
Default::default()
}
}
/// Database metadata stored at the beginning of page 0 of the data file.
// magic / version: file format identification, checked in `Storage::open`
// free:   head of the list of free pages (0 = empty list)
// size:   number of pages in the data file
// used:   number of pages in use
// root:   root page of the B-tree (0 = empty tree)
// height: height of the B-tree (0 = empty tree)
#[derive(Copy, Clone)]
struct Metadata {
magic: u32, version: u32, free: PageId, size: PageId, used: PageId, root: PageId, height: u32, }
impl Metadata {
/// Serializes the metadata into its `METADATA_SIZE`-byte image by reinterpreting the
/// struct's memory, so the bytes are in native byte order.
fn pack(self) -> [u8; METADATA_SIZE] {
// SAFETY: `Metadata` consists of seven `u32`-sized fields, so it is exactly METADATA_SIZE
// bytes with no padding (transmute checks the size at compile time).
// NOTE(review): the struct has no `#[repr(C)]`, so the field order inside the image is not
// guaranteed by the language - confirm this is acceptable for the on-disk format.
unsafe { std::mem::transmute::<Metadata, [u8; METADATA_SIZE]>(self) }
}
/// Rebuilds the metadata from the first `METADATA_SIZE` bytes of `buf`.
/// Panics if `buf` is shorter than `METADATA_SIZE`.
fn unpack(buf: &[u8]) -> Metadata {
// SAFETY: every bit pattern is a valid `u32`, and the array has the size of `Metadata`.
unsafe {
std::mem::transmute::<[u8; METADATA_SIZE], Metadata>(
buf[0..METADATA_SIZE].try_into().unwrap(),
)
}
}
}
/// Mutable database state protected by `Storage::db`.
// meta:           in-memory copy of the metadata
// meta_updated:   metadata was changed by the current transaction and must be saved at commit
// lsn:            number of committed transactions
// n_aborted_txns: number of rolled back transactions
// state:          lifecycle state
// wal_pos:        current write position in the log
// recovery:       statistics of the last recovery
struct Database {
meta: Metadata, meta_updated: bool, lsn: LSN, n_aborted_txns: LSN, state: DatabaseState, wal_pos: u64, recovery: RecoveryStatus, }
impl Database {
    /// Builds a statistics snapshot from the current metadata and counters.
    fn get_info(&self) -> DatabaseInfo {
        let page_bytes = PAGE_SIZE as u64;
        let meta = &self.meta;
        DatabaseInfo {
            tree_height: meta.height as usize,
            db_size: meta.size as u64 * page_bytes,
            db_used: meta.used as u64 * page_bytes,
            log_size: self.wal_pos,
            n_committed_transactions: self.lsn,
            n_aborted_transactions: self.n_aborted_txns,
            state: self.state,
        }
    }
}
/// Bookkeeping of the buffer cache; buffer id 0 doubles as the "null" link.
struct BufferManager {
// Most recently unpinned buffer (front of the LRU list).
head: BufferId,
// Least recently unpinned buffer; eviction victim.
tail: BufferId,
// free_pages:  head of the list of released buffers
// dirty_pages: head of the list of buffers modified by the current transaction
// used:        number of buffer slots handed out so far
// pinned:      number of pinned buffers
// cached:      number of buffers holding a page
free_pages: BufferId, dirty_pages: BufferId, used: BufferId, pinned: BufferId, cached: BufferId,
// hash_table: bucket heads, indexed by `page id % len`
// pages:      per-buffer headers, indexed by BufferId
hash_table: Vec<BufferId>, pages: Vec<PageHeader>, }
/// Image of one page.
///
/// A B-tree page starts with a big-endian u16 item count, followed by a table of
/// big-endian u16 item offsets; items are stored from the end of the page downwards,
/// each as one key-length byte, the key bytes and then the value bytes.
struct PageData {
data: [u8; PAGE_SIZE],
}
impl PageData {
    /// Creates a zero-filled page.
    fn new() -> PageData {
        let data = [0u8; PAGE_SIZE];
        PageData { data }
    }
}
impl PageData {
    /// Returns the byte offset of item `ip`, taken from the offset table.
    fn get_offs(&self, ip: ItemPointer) -> usize {
        let slot = PAGE_HEADER_SIZE + ip * 2;
        usize::from(self.get_u16(slot))
    }

    /// Stores `offs` (truncated to u16) as the byte offset of item `ip`.
    fn set_offs(&mut self, ip: ItemPointer, offs: usize) {
        let slot = PAGE_HEADER_SIZE + ip * 2;
        self.set_u16(slot, offs as u16)
    }

    /// Returns the child page id stored as the value of item `ip` of an internal page.
    fn get_child(&self, ip: ItemPointer) -> PageId {
        let start = self.get_offs(ip);
        let key_len = usize::from(self.data[start]);
        self.get_u32(start + key_len + 1)
    }

    /// Returns a copy of the key of item `ip`.
    fn get_key(&self, ip: ItemPointer) -> Key {
        let start = self.get_offs(ip);
        let key_len = usize::from(self.data[start]);
        let key_bytes = &self.data[start + 1..start + 1 + key_len];
        key_bytes.to_vec()
    }

    /// Returns a copy of the key of the last item; the page must not be empty.
    fn get_last_key(&self) -> Key {
        self.get_key(self.get_n_items() - 1)
    }

    /// Returns copies of the key and the value of item `ip`.
    fn get_item(&self, ip: ItemPointer) -> (Key, Value) {
        let (start, total) = self.get_item_offs_len(ip);
        let key_end = start + 1 + usize::from(self.data[start]);
        let key = self.data[start + 1..key_end].to_vec();
        let value = self.data[key_end..start + total].to_vec();
        (key, value)
    }

    /// Returns the offset and the total length of item `ip`.
    ///
    /// Items grow downwards from the page end, so an item ends where the previous
    /// item (or the page end for item 0) starts.
    fn get_item_offs_len(&self, ip: ItemPointer) -> (usize, usize) {
        let offs = self.get_offs(ip);
        let next_offs = match ip {
            0 => PAGE_SIZE,
            _ => self.get_offs(ip - 1),
        };
        debug_assert!(next_offs > offs);
        (offs, next_offs - offs)
    }

    /// Writes `data` at `offs` in big-endian order.
    fn set_u16(&mut self, offs: usize, data: u16) {
        let bytes = data.to_be_bytes();
        self.copy(offs, &bytes);
    }

    /// Writes `data` at `offs` in big-endian order.
    fn set_u32(&mut self, offs: usize, data: u32) {
        let bytes = data.to_be_bytes();
        self.copy(offs, &bytes);
    }

    /// Reads a big-endian u16 at `offs`.
    fn get_u16(&self, offs: usize) -> u16 {
        let mut raw = [0u8; 2];
        raw.copy_from_slice(&self.data[offs..offs + 2]);
        u16::from_be_bytes(raw)
    }

    /// Reads a big-endian u32 at `offs`.
    fn get_u32(&self, offs: usize) -> u32 {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(&self.data[offs..offs + 4]);
        u32::from_be_bytes(raw)
    }

    /// Returns the number of items on the page (the u16 at offset 0).
    fn get_n_items(&self) -> ItemPointer {
        ItemPointer::from(self.get_u16(0))
    }

    /// Returns the number of bytes occupied by item bodies.
    fn get_size(&self) -> ItemPointer {
        match self.get_n_items() {
            0 => 0,
            count => PAGE_SIZE - self.get_offs(count - 1),
        }
    }

    /// Stores the number of items (truncated to u16).
    fn set_n_items(&mut self, n_items: ItemPointer) {
        self.set_u16(0, n_items as u16)
    }

    /// Copies `data` into the page starting at `offs`.
    fn copy(&mut self, offs: usize, data: &[u8]) {
        let end = offs + data.len();
        self.data[offs..end].copy_from_slice(data);
    }

    /// Compares the search `key` with the key of item `ip` (search key on the left).
    ///
    /// An item with an empty key is treated as greater than every search key.
    fn compare_key(&self, ip: ItemPointer, key: &Key) -> Ordering {
        let start = self.get_offs(ip);
        match usize::from(self.data[start]) {
            0 => Ordering::Less,
            key_len => key[..].cmp(&self.data[start + 1..start + 1 + key_len]),
        }
    }

    /// Removes item `ip` and closes the gap in the offset table and in the item area.
    fn remove_key(&mut self, ip: ItemPointer) {
        let count = self.get_n_items();
        let used = self.get_size();
        let (start, len) = self.get_item_offs_len(ip);
        // Items after the removed one move up by its length.
        for slot in ip + 1..count {
            let shifted = self.get_offs(slot) + len;
            self.set_offs(slot - 1, shifted);
        }
        let origin = PAGE_SIZE - used;
        let drops_trailing_empty_key = count > 1 && ip + 1 == count && self.data[start] == 0;
        if drops_trailing_empty_key {
            // The last item with an empty key goes away: the preceding item takes over that
            // role by having its key cut off, keeping only a zero length byte and the value.
            let prev = start + len;
            let prev_key_len = usize::from(self.data[prev]);
            self.set_offs(ip - 1, prev + prev_key_len);
            self.data[prev + prev_key_len] = 0u8;
        } else {
            self.data.copy_within(origin..start, origin + len);
        }
        self.set_n_items(count - 1);
    }

    /// Inserts `key`/`value` as item `ip`, shifting later items.
    ///
    /// Returns `false` without touching the page when there is not enough room.
    fn insert_item(&mut self, ip: ItemPointer, key: &Key, value: &[u8]) -> bool {
        let count = self.get_n_items();
        let used = self.get_size();
        let key_len = key.len();
        let item_len = 1 + key_len + value.len();
        // Offset table (with the new slot) plus item bodies must fit behind the header.
        if (count + 1) * 2 + used + item_len > PAGE_SIZE - PAGE_HEADER_SIZE {
            return false;
        }
        for slot in (ip..count).rev() {
            let lowered = self.get_offs(slot) - item_len;
            self.set_offs(slot + 1, lowered);
        }
        let item_offs = match ip {
            0 => PAGE_SIZE - item_len,
            _ => self.get_offs(ip - 1) - item_len,
        };
        self.set_offs(ip, item_offs);
        // Open a hole of `item_len` bytes by moving the bodies of the later items down.
        let origin = PAGE_SIZE - used;
        self.data
            .copy_within(origin..item_offs + item_len, origin - item_len);
        self.data[item_offs] = key_len as u8;
        self.data[item_offs + 1..item_offs + 1 + key_len].copy_from_slice(key);
        self.data[item_offs + 1 + key_len..item_offs + item_len].copy_from_slice(value);
        self.set_n_items(count + 1);
        true
    }

    /// Moves the leading items (about half of the used bytes) into `new_page`.
    ///
    /// Items `0..=r` go to `new_page`, the remaining ones stay here renumbered from 0.
    /// Returns `r`, the index of the last moved item.
    fn split(&mut self, new_page: &mut PageData) -> ItemPointer {
        let n_items = self.get_n_items();
        let size = self.get_size();
        let margin = PAGE_SIZE - size / 2;
        // Binary search for the first item starting at or below the middle of the used area.
        let (mut l, mut r): (ItemPointer, ItemPointer) = (0, n_items);
        while l < r {
            let m = (l + r) >> 1;
            if self.get_offs(m) > margin {
                l = m + 1;
            } else {
                r = m;
            }
        }
        debug_assert!(l == r);
        let moved_size = PAGE_SIZE - self.get_offs(r);
        // Moved items keep their offsets, so table entries and bodies are copied verbatim.
        let table_end = PAGE_HEADER_SIZE + (r + 1) * 2;
        new_page.data[PAGE_HEADER_SIZE..table_end]
            .copy_from_slice(&self.data[PAGE_HEADER_SIZE..table_end]);
        let dst = PAGE_SIZE - moved_size;
        new_page.data[dst..].copy_from_slice(&self.data[dst..]);
        // Remaining items slide to the page end and to the start of the table.
        for i in r + 1..n_items {
            let rebased = self.get_offs(i) + moved_size;
            self.set_offs(i - r - 1, rebased);
        }
        let src = PAGE_SIZE - size;
        self.data.copy_within(src..dst, src + moved_size);
        new_page.set_n_items(r + 1);
        self.set_n_items(n_items - r - 1);
        r
    }
}
impl BufferManager {
fn unpin(&mut self, id: BufferId) {
debug_assert!(self.pages[id as usize].access_count == 1);
self.pages[id as usize].access_count = 0;
self.pages[id as usize].next = self.head;
self.pages[id as usize].prev = 0;
self.pinned -= 1;
if self.head != 0 {
self.pages[self.head as usize].prev = id;
} else {
self.tail = id;
}
self.head = id;
}
fn pin(&mut self, id: BufferId) {
debug_assert!(self.pages[id as usize].access_count == 0);
let prev = self.pages[id as usize].prev as usize;
if prev == 0 {
self.head = self.pages[id as usize].next;
} else {
self.pages[prev].next = self.pages[id as usize].next;
}
let next = self.pages[id as usize].next as usize;
if next == 0 {
self.tail = self.pages[id as usize].prev;
} else {
self.pages[next].prev = self.pages[id as usize].prev;
}
self.pinned += 1;
}
fn insert(&mut self, id: BufferId) {
let h = self.pages[id as usize].id as usize % self.hash_table.len();
self.pages[id as usize].collision = self.hash_table[h];
self.hash_table[h] = id;
}
fn remove(&mut self, id: BufferId) {
let h = self.pages[id as usize].id as usize % self.hash_table.len();
let mut p = self.hash_table[h];
if p == id {
self.hash_table[h] = self.pages[id as usize].collision;
} else {
while self.pages[p as usize].collision != id {
p = self.pages[p as usize].collision;
}
self.pages[p as usize].collision = self.pages[id as usize].collision;
}
}
fn throw_buffer(&mut self, id: BufferId) {
self.remove(id);
self.pages[id as usize].next = self.free_pages;
self.free_pages = id;
self.cached -= 1;
}
fn modify_buffer(&mut self, id: BufferId) {
debug_assert!(self.pages[id as usize].access_count > 0);
if (self.pages[id as usize].state & PAGE_DIRTY) == 0 {
self.pages[id as usize].access_count += 1; self.pages[id as usize].state = PAGE_DIRTY;
self.pages[id as usize].next = self.dirty_pages;
self.dirty_pages = id;
}
}
fn release_buffer(&mut self, id: BufferId) {
debug_assert!(self.pages[id as usize].access_count > 0);
if self.pages[id as usize].access_count == 1 {
debug_assert!((self.pages[id as usize].state & PAGE_DIRTY) == 0);
self.unpin(id);
} else {
self.pages[id as usize].access_count -= 1;
}
}
fn get_buffer(&mut self, pid: PageId) -> Result<BufferId> {
let hash = pid as usize % self.hash_table.len();
let mut h = self.hash_table[hash];
while h != 0 {
if self.pages[h as usize].id == pid {
let access_count = self.pages[h as usize].access_count;
debug_assert!(access_count < u16::MAX - 1);
if access_count == 0 {
self.pin(h);
}
self.pages[h as usize].access_count = access_count + 1;
return Ok(h);
}
h = self.pages[h as usize].collision;
}
h = self.free_pages;
if h != 0 {
self.free_pages = self.pages[h as usize].next;
self.cached += 1;
self.pinned += 1;
} else {
h = self.used;
if (h as usize) < self.hash_table.len() {
self.used += 1;
self.cached += 1;
self.pinned += 1;
} else {
let victim = self.tail;
ensure!(victim != 0);
debug_assert!(self.pages[victim as usize].access_count == 0);
debug_assert!((self.pages[victim as usize].state & PAGE_DIRTY) == 0);
self.pin(victim);
self.remove(victim);
h = victim;
}
}
self.pages[h as usize].access_count = 1;
self.pages[h as usize].id = pid;
self.pages[h as usize].state = PAGE_RAW;
self.insert(h);
Ok(h)
}
}
/// Pin on a cached page; the pin is released when the guard is dropped.
struct PageGuard<'a> {
// Buffer holding the page (index into `Storage::pool`).
buf: BufferId,
// Identifier of the pinned page.
pid: PageId,
// Storage owning the buffer.
storage: &'a Storage,
}
impl<'a> Drop for PageGuard<'a> {
    /// Releases the pin held on the buffer.
    fn drop(&mut self) {
        let owner = self.storage;
        owner.release_page(self.buf);
    }
}
impl Storage {
/// Releases one pin of buffer `buf` under the buffer manager lock.
fn release_page(&self, buf: BufferId) {
    self.buf_mgr.lock().unwrap().release_buffer(buf);
}
fn new_page(&self, db: &mut RwLockWriteGuard<Database>) -> Result<PageGuard<'_>> {
let mut bm = self.buf_mgr.lock().unwrap();
let free = db.meta.free;
let buf;
if free != 0 {
buf = bm.get_buffer(free)?;
let page = self.pool[buf as usize].read().unwrap();
db.meta.free = page.get_u32(0);
} else {
buf = bm.get_buffer(db.meta.size)?;
db.meta.size += 1;
}
db.meta.used += 1;
db.meta_updated = true;
bm.modify_buffer(buf);
Ok(PageGuard {
buf,
pid: bm.pages[buf as usize].id,
storage: &self,
})
}
/// Pins page `pid` in the buffer cache and returns a guard for it.
///
/// If the buffer is being loaded by another thread, waits until the load finishes.
/// If the buffer is new (`PAGE_RAW`), the page is read from the data file, or
/// zero-filled when `mode` is `WriteOnly`. Unless `mode` is `ReadOnly`, the buffer
/// is marked dirty.
///
/// # Errors
/// Fails when no buffer is available or when reading the page fails.
fn get_page(&self, pid: PageId, mode: AccessMode) -> Result<PageGuard<'_>> {
let mut bm = self.buf_mgr.lock().unwrap();
let buf = bm.get_buffer(pid)?;
if (bm.pages[buf as usize].state & PAGE_BUSY) != 0 {
// Another thread is loading this page: register as a waiter and sleep on the
// condition variable shared by this buffer until PAGE_BUSY is cleared.
bm.pages[buf as usize].state |= PAGE_WAIT;
loop {
debug_assert!((bm.pages[buf as usize].state & PAGE_WAIT) != 0);
bm = self.busy_events[buf as usize % N_BUSY_EVENTS]
.wait(bm)
.unwrap();
if (bm.pages[buf as usize].state & PAGE_BUSY) == 0 {
break;
}
}
} else if (bm.pages[buf as usize].state & PAGE_RAW) != 0 {
// The buffer was just assigned to this page: fill it.
let mut page = self.pool[buf as usize].write().unwrap();
if mode != AccessMode::WriteOnly {
bm.pages[buf as usize].state = PAGE_BUSY;
// The buffer manager lock is released for the duration of the read.
// NOTE(review): if the read fails, `?` returns with the state still PAGE_BUSY
// and the buffer pinned.
drop(bm); self.file
.read_exact_at(&mut page.data, pid as u64 * PAGE_SIZE as u64)?;
bm = self.buf_mgr.lock().unwrap();
if (bm.pages[buf as usize].state & PAGE_WAIT) != 0 {
// Wake up the threads waiting for this load.
self.busy_events[buf as usize % N_BUSY_EVENTS].notify_all();
}
} else {
// The caller overwrites the page: no need to read the old content.
page.data.fill(0u8); }
// Clears PAGE_RAW / PAGE_BUSY / PAGE_WAIT.
bm.pages[buf as usize].state = 0;
}
if mode != AccessMode::ReadOnly {
bm.modify_buffer(buf);
}
Ok(PageGuard {
buf,
pid,
storage: &self,
})
}
/// Marks the pinned buffer `buf` as dirty under the buffer manager lock.
fn modify_page(&self, buf: BufferId) {
    self.buf_mgr.lock().unwrap().modify_buffer(buf);
}
/// Starts an explicit transaction.
///
/// Blocks until the write lock on the database is acquired; the lock is held by the
/// returned transaction object.
pub fn start_transaction(&self) -> Transaction<'_> {
    let db = self.db.write().unwrap();
    Transaction {
        status: TransactionStatus::InProgress,
        storage: self,
        db,
    }
}
/// Commits the current transaction: makes all dirty pages durable.
///
/// With a log, every dirty page is appended to the log as `page id + page image`,
/// followed by a commit record `4 zero bytes + metadata + CRC32C` where the CRC covers
/// everything written for this transaction before it. The log is synced, then the
/// pages are written to the data file. Without a log the pages are written to the
/// data file directly. The LSN is incremented when something was written.
///
/// # Errors
/// Propagates I/O errors of the log and of the data file.
fn commit(&self, db: &mut RwLockWriteGuard<Database>) -> Result<()> {
let bm = self.buf_mgr.lock().unwrap();
if db.meta_updated {
// Put the current metadata into the image of page 0.
let meta = db.meta.pack();
let mut page = self.pool[0].write().unwrap();
page.data[0..METADATA_SIZE].copy_from_slice(&meta);
}
if let Some(log) = &self.log {
let mut dirty = bm.dirty_pages;
// The CRC of each transaction starts from 0.
let mut crc = 0u32;
while dirty != 0 {
let mut buf = [0u8; PAGE_SIZE + 4];
let pid = bm.pages[dirty as usize].id;
let page = self.pool[dirty as usize].read().unwrap();
buf[0..4].copy_from_slice(&pid.to_be_bytes());
buf[4..].copy_from_slice(&page.data);
crc = crc32c_append(crc, &buf);
log.write_all_at(&buf, db.wal_pos)?;
db.wal_pos += (4 + PAGE_SIZE) as u64;
dirty = bm.pages[dirty as usize].next;
}
if bm.dirty_pages != 0 {
// Commit record: the first four bytes stay zero (page id 0 marks the record),
// then the metadata, then the CRC.
let mut buf = [0u8; METADATA_SIZE + 8];
{
let page = self.pool[0].read().unwrap();
buf[4..4 + METADATA_SIZE].copy_from_slice(&page.data[0..METADATA_SIZE]);
}
crc = crc32c_append(crc, &buf[..4 + METADATA_SIZE]);
buf[4 + METADATA_SIZE..].copy_from_slice(&crc.to_be_bytes());
log.write_all_at(&buf, db.wal_pos)?;
db.wal_pos += (8 + METADATA_SIZE) as u64;
// The transaction is durable once the log is synced.
log.sync_all()?;
db.lsn += 1;
// Write the pages to the data file (consumes the buffer manager guard).
self.flush_buffers(bm, db.meta_updated)?;
if db.wal_pos >= self.checkpoint_interval {
// Checkpoint: sync the data file and start writing the log from the beginning.
self.file.sync_all()?;
db.wal_pos = 0;
}
}
} else {
if self.flush_buffers(bm, db.meta_updated)? {
db.lsn += 1;
}
}
db.meta_updated = false;
Ok(())
}
/// Writes all dirty pages to the data file and empties the dirty list.
///
/// Takes the buffer manager guard by value, so the lock is released on return.
/// When `save_meta` is set, page 0 (holding the metadata) is written first; this
/// panics if there is no dirty page. Each written buffer loses its dirty state and is
/// unpinned. Returns `Ok(true)` if at least one page was written.
///
/// # Errors
/// Propagates write errors of the data file.
fn flush_buffers(&self, mut bm: MutexGuard<BufferManager>, save_meta: bool) -> Result<bool> {
let mut dirty = bm.dirty_pages;
if save_meta {
// A metadata change is expected to come together with at least one modified page.
assert!(dirty != 0); let page = self.pool[0].read().unwrap();
self.file.write_all_at(&page.data, 0)?;
}
while dirty != 0 {
let pid = bm.pages[dirty as usize].id;
let file_offs = pid as u64 * PAGE_SIZE as u64;
let page = self.pool[dirty as usize].read().unwrap();
// Remember the link first: `unpin` reuses `next` for the LRU list.
let next = bm.pages[dirty as usize].next;
self.file.write_all_at(&page.data, file_offs)?;
debug_assert!(bm.pages[dirty as usize].state == PAGE_DIRTY);
bm.pages[dirty as usize].state = 0;
bm.unpin(dirty);
dirty = next;
}
if bm.dirty_pages != 0 {
bm.dirty_pages = 0;
Ok(true)
} else {
Ok(false)
}
}
/// Discards the changes of the current transaction.
///
/// Every dirty buffer is dropped from the cache and moved to the free list, so the
/// pages are reloaded from the data file on next access. If the metadata was changed,
/// page 0 is re-read from the data file and the metadata is restored from it.
/// Increments the counter of aborted transactions.
///
/// # Errors
/// Fails if page 0 cannot be read.
fn rollback(&self, db: &mut RwLockWriteGuard<Database>) -> Result<()> {
let mut bm = self.buf_mgr.lock().unwrap();
let mut dirty = bm.dirty_pages;
while dirty != 0 {
debug_assert!((bm.pages[dirty as usize].state & PAGE_DIRTY) != 0);
debug_assert!(bm.pages[dirty as usize].access_count == 1);
// Remember the link first: `throw_buffer` reuses `next` for the free list.
let next = bm.pages[dirty as usize].next;
bm.throw_buffer(dirty);
dirty = next;
}
bm.dirty_pages = 0;
if db.meta_updated {
let mut page = self.pool[0].write().unwrap();
self.file.read_exact_at(&mut page.data, 0)?;
db.meta = Metadata::unpack(&page.data);
db.meta_updated = false;
}
db.n_aborted_txns += 1;
Ok(())
}
/// Opens the storage, creating the data file if it cannot be opened, and runs recovery.
///
/// * `db_path` - path of the data file
/// * `log_path` - path of the write-ahead log; `None` disables logging
/// * `cache_size` - number of buffers in the page cache
/// * `checkpoint_interval` - log size at which a commit syncs the data file and
///   restarts the log
///
/// Both files are locked exclusively (without blocking) for the lifetime of the handles.
///
/// # Errors
/// Fails on I/O errors, when a file is locked by somebody else, or when the metadata
/// of an existing file has a wrong magic number, version or size.
pub fn open(
db_path: &Path,
log_path: Option<&Path>,
cache_size: usize,
checkpoint_interval: u64,
) -> Result<Storage> {
let mut buf = [0u8; PAGE_SIZE];
// NOTE(review): any failure to open the existing file (not only "not found")
// falls through to the creation branch.
let (file, meta) = if let Ok(file) = OpenOptions::new().write(true).read(true).open(db_path)
{
// Existing database: read and validate the metadata in page 0.
file.try_lock_exclusive()?;
file.read_exact_at(&mut buf, 0)?;
let meta = Metadata::unpack(&buf);
ensure!(meta.magic == MAGIC && meta.version == VERSION && meta.size >= 1);
(file, meta)
} else {
// New database: write page 0 with the initial metadata (empty tree, one used page).
let file = OpenOptions::new()
.write(true)
.read(true)
.create(true)
.open(db_path)?;
file.try_lock_exclusive()?;
let meta = Metadata {
magic: MAGIC,
version: VERSION,
free: 0,
size: 1,
used: 1,
root: 0,
height: 0,
};
let metadata = meta.pack();
buf[0..METADATA_SIZE].copy_from_slice(&metadata);
file.write_all_at(&mut buf, 0)?;
(file, meta)
};
let log = if let Some(path) = log_path {
let log = OpenOptions::new()
.write(true)
.read(true)
.create(true)
.open(path)?;
log.try_lock_exclusive()?;
Some(log)
} else {
None
};
let storage = Storage {
busy_events: [(); N_BUSY_EVENTS].map(|_| Condvar::new()),
buf_mgr: Mutex::new(BufferManager {
head: 0,
tail: 0,
free_pages: 0,
dirty_pages: 0,
// Buffer 0 is counted as used, cached and pinned from the start: it holds page 0
// (the metadata) and its id doubles as the null link.
used: 1, cached: 1,
pinned: 1,
hash_table: vec![0; cache_size],
pages: vec![PageHeader::new(); cache_size],
}),
pool: iter::repeat_with(|| RwLock::new(PageData::new()))
.take(cache_size)
.collect(),
file,
log,
checkpoint_interval,
db: RwLock::new(Database {
lsn: 0,
n_aborted_txns: 0,
meta,
meta_updated: false,
recovery: RecoveryStatus {
..Default::default()
},
state: DatabaseState::InRecovery,
wal_pos: 0,
}),
};
storage.recovery()?;
Ok(storage)
}
/// Replays the write-ahead log (if any) and switches the database to `Opened`.
///
/// The log is a sequence of transactions in the format produced by `commit`: page
/// records (`page id + page image`) followed by a commit record (`4 zero bytes +
/// metadata + CRC32C`). Page images are loaded into the cache as dirty pages; when a
/// commit record with a matching CRC is reached, they are written to the data file
/// together with the metadata. Replay stops at the end of the log, at a truncated
/// record or at a CRC mismatch; pages of an unfinished transaction are discarded.
/// Finally the data file is synced, the log is truncated and the metadata is reloaded
/// from page 0.
///
/// # Errors
/// Propagates I/O errors of the log and of the data file.
fn recovery(&self) -> Result<()> {
    let mut db = self.db.write().unwrap();
    if let Some(log) = &self.log {
        let mut buf = [0u8; 4];
        let mut crc = 0u32;
        let mut wal_pos = 0u64;
        db.recovery.wal_size = log.metadata()?.len();
        loop {
            let len = log.read_at(&mut buf, wal_pos)?;
            if len != 4 {
                // End of log.
                break;
            }
            wal_pos += 4;
            let pid = PageId::from_be_bytes(buf);
            crc = crc32c_append(crc, &buf);
            if pid != 0 {
                // Page record: load the image into a dirty buffer.
                let pin = self.get_page(pid, AccessMode::WriteOnly)?;
                let mut page = self.pool[pin.buf as usize].write().unwrap();
                let len = log.read_at(&mut page.data, wal_pos)?;
                if len != PAGE_SIZE {
                    break;
                }
                wal_pos += len as u64;
                crc = crc32c_append(crc, &page.data);
            } else {
                // Commit record: metadata followed by the CRC of the transaction.
                let mut meta_buf = [0u8; METADATA_SIZE];
                let len = log.read_at(&mut meta_buf, wal_pos)?;
                // The record holds METADATA_SIZE bytes of metadata, not a whole page.
                if len != METADATA_SIZE {
                    break;
                }
                wal_pos += len as u64;
                crc = crc32c_append(crc, &meta_buf);
                let len = log.read_at(&mut buf, wal_pos)?;
                if len != 4 {
                    break;
                }
                wal_pos += 4;
                if u32::from_be_bytes(buf) != crc {
                    // CRC mismatch: the transaction was not completely written.
                    break;
                }
                {
                    let mut page = self.pool[0].write().unwrap();
                    page.data[0..METADATA_SIZE].copy_from_slice(&meta_buf);
                    db.meta_updated = true;
                }
                let bm = self.buf_mgr.lock().unwrap();
                self.flush_buffers(bm, true)?;
                db.meta_updated = false;
                db.recovery.recovered_transactions += 1;
                db.recovery.recovery_end = wal_pos;
                // `commit` starts the CRC of every transaction from 0.
                crc = 0;
            }
        }
        // Throw away the pages of an unfinished transaction.
        self.rollback(&mut db)?;
        self.file.sync_all()?;
        db.wal_pos = 0;
        log.set_len(0)?;
    }
    // Reload the metadata, which may have been changed by the replayed transactions.
    let mut page = self.pool[0].write().unwrap();
    self.file.read_exact_at(&mut page.data, 0)?;
    db.meta = Metadata::unpack(&page.data);
    db.state = DatabaseState::Opened;
    Ok(())
}
/// Applies all upserts and then all removals to the tree.
///
/// Stops at the first error, coming either from an iterator or from the tree operation.
fn do_updates(
    &self,
    db: &mut RwLockWriteGuard<Database>,
    to_upsert: &mut dyn Iterator<Item = Result<(Key, Value)>>,
    to_remove: &mut dyn Iterator<Item = Result<Key>>,
) -> Result<()> {
    for entry in to_upsert {
        let (key, value) = entry?;
        self.do_upsert(db, &key, &value)?;
    }
    for entry in to_remove {
        let key = entry?;
        self.do_remove(db, &key)?;
    }
    Ok(())
}
/// Allocates a leaf page holding the single item `key`/`value` and returns its page id.
fn btree_allocate_leaf_page(
    &self,
    db: &mut RwLockWriteGuard<Database>,
    key: &Key,
    value: &Value,
) -> Result<PageId> {
    let pin = self.new_page(db)?;
    let mut leaf = self.pool[pin.buf as usize].write().unwrap();
    // The page content is not initialized by `new_page`: start from an empty item table.
    leaf.set_n_items(0);
    leaf.insert_item(0, key, value);
    let leaf_pid = pin.pid;
    Ok(leaf_pid)
}
fn btree_allocate_internal_page(
&self,
db: &mut RwLockWriteGuard<Database>,
key: &Key,
left_child: PageId,
right_child: PageId,
) -> Result<PageId> {
let pin = self.new_page(db)?;
let mut page = self.pool[pin.buf as usize].write().unwrap();
page.set_n_items(0);
debug_assert!(left_child != 0);
debug_assert!(right_child != 0);
page.insert_item(0, key, &left_child.to_be_bytes().to_vec());
page.insert_item(1, &vec![], &right_child.to_be_bytes().to_vec());
Ok(pin.pid)
}
/// Inserts `key`/`value` at position `ip` of `page`, splitting the page when it is full.
///
/// Returns `Ok(None)` if the item fitted. Otherwise the leading items are moved to a
/// newly allocated page, the item is inserted into the proper half, and the last key
/// of the new page together with its page id is returned for insertion into the parent.
///
/// # Errors
/// Fails if a page cannot be allocated or the item does not fit even after the split.
fn btree_insert_in_page(
    &self,
    db: &mut RwLockWriteGuard<Database>,
    page: &mut PageData,
    ip: ItemPointer,
    key: &Key,
    value: &Value,
) -> Result<Option<(Key, PageId)>> {
    if page.insert_item(ip, key, value) {
        return Ok(None);
    }
    let pin = self.new_page(db)?;
    let mut new_page = self.pool[pin.buf as usize].write().unwrap();
    let split = page.split(&mut new_page);
    // Items 0..=split went to the new page; the rest was renumbered from 0 in `page`.
    let ok = if ip > split {
        page.insert_item(ip - split - 1, key, value)
    } else {
        new_page.insert_item(ip, key, value)
    };
    ensure!(ok);
    let separator = new_page.get_last_key();
    Ok(Some((separator, pin.pid)))
}
/// Removes `key` from the subtree rooted at page `pid` located `height` levels above
/// the leaves (`height == 1` means `pid` is a leaf).
///
/// Returns `Ok(true)` if the page became empty; in that case the page is pushed onto
/// the list of free pages and the caller must remove its reference to it.
fn btree_remove(
&self,
db: &mut RwLockWriteGuard<Database>,
pid: PageId,
key: &Key,
height: u32,
) -> Result<bool> {
// The page is fetched read-only and marked dirty only if it is really changed.
let pin = self.get_page(pid, AccessMode::ReadOnly)?;
let mut page = self.pool[pin.buf as usize].write().unwrap();
// Binary search for the first item whose key is >= `key`.
let mut l: ItemPointer = 0;
let n = page.get_n_items();
let mut r = n;
while l < r {
let m = (l + r) >> 1;
if page.compare_key(m, key) == Ordering::Greater {
l = m + 1;
} else {
r = m;
}
}
debug_assert!(l == r);
if height == 1 {
// Leaf: remove the item only on an exact match.
if r < n && page.compare_key(r, key) == Ordering::Equal {
self.modify_page(pin.buf);
page.remove_key(r);
}
} else {
debug_assert!(r < n);
let underflow = self.btree_remove(db, page.get_child(r), key, height - 1)?;
if underflow {
// The child became empty and was freed: drop the reference to it.
self.modify_page(pin.buf);
page.remove_key(r);
}
}
if page.get_n_items() == 0 {
// Link the empty page into the free list; the link is stored in the page itself.
page.set_u32(0, db.meta.free);
db.meta.free = pid;
db.meta.used -= 1;
db.meta_updated = true;
Ok(true)
} else {
Ok(false)
}
}
/// Inserts or replaces `key`/`value` in the subtree rooted at page `pid` located
/// `height` levels above the leaves (`height == 1` means `pid` is a leaf).
///
/// Returns `Ok(Some((separator, new_pid)))` if the page was split; the caller has to
/// insert the reference to the new page into the parent.
fn btree_insert(
&self,
db: &mut RwLockWriteGuard<Database>,
pid: PageId,
key: &Key,
value: &Value,
height: u32,
) -> Result<Option<(Key, PageId)>> {
// The page is fetched read-only and marked dirty only if it is really changed.
let pin = self.get_page(pid, AccessMode::ReadOnly)?;
let mut page = self.pool[pin.buf as usize].write().unwrap();
// Binary search for the first item whose key is >= `key`.
let mut l: ItemPointer = 0;
let n = page.get_n_items();
let mut r = n;
while l < r {
let m = (l + r) >> 1;
if page.compare_key(m, key) == Ordering::Greater {
l = m + 1;
} else {
r = m;
}
}
debug_assert!(l == r);
if height == 1 {
// Leaf: an existing item with the same key is replaced (removed, then inserted).
self.modify_page(pin.buf);
if r < n && page.compare_key(r, key) == Ordering::Equal {
page.remove_key(r);
}
self.btree_insert_in_page(db, &mut page, r, key, value)
} else {
debug_assert!(r < n);
let overflow = self.btree_insert(db, page.get_child(r), key, value, height - 1)?;
if let Some((key, child)) = overflow {
// The child was split: insert the reference to the new page before item `r`.
self.modify_page(pin.buf);
debug_assert!(child != 0);
self.btree_insert_in_page(db, &mut page, r, &key, &child.to_be_bytes().to_vec())
} else {
Ok(None)
}
}
}
/// Inserts `key`/`value` into the tree, replacing the value of an existing key.
///
/// # Errors
/// Fails if the key is empty or longer than `MAX_KEY_LEN`, if the value is longer
/// than `MAX_VALUE_LEN`, or if the tree operation fails.
fn do_upsert(
&self,
db: &mut RwLockWriteGuard<Database>,
key: &Key,
value: &Value,
) -> Result<()> {
ensure!(key.len() != 0 && key.len() <= MAX_KEY_LEN && value.len() <= MAX_VALUE_LEN);
if db.meta.root == 0 {
// Empty tree: the new leaf becomes the root.
db.meta.root = self.btree_allocate_leaf_page(db, key, value)?;
db.meta.height = 1;
db.meta_updated = true;
} else if let Some((key, page)) =
self.btree_insert(db, db.meta.root, key, value, db.meta.height)?
{
// The root was split: create a new root above the two halves.
db.meta.root = self.btree_allocate_internal_page(db, &key, page, db.meta.root)?;
db.meta.height += 1;
db.meta_updated = true;
}
Ok(())
}
/// Removes `key` from the tree; removing a missing key is not an error.
///
/// When the root page becomes empty, the tree is reset to the empty state.
fn do_remove(&self, db: &mut RwLockWriteGuard<Database>, key: &Key) -> Result<()> {
    if db.meta.root == 0 {
        return Ok(());
    }
    let (root, height) = (db.meta.root, db.meta.height);
    let root_emptied = self.btree_remove(db, root, key, height)?;
    if root_emptied {
        db.meta.height = 0;
        db.meta.root = 0;
        db.meta_updated = true;
    }
    Ok(())
}
/// Executes the positioning operation `op` on `path` and returns a copy of the item
/// the path points at afterwards (`None` if there is no such item).
///
/// # Errors
/// Fails if the database is not in the `Opened` state or a page cannot be fetched.
fn do_lookup(
&self,
db: &Database,
op: LookupOp,
path: &mut TreePath,
) -> Result<Option<(Key, Value)>> {
ensure!(db.state == DatabaseState::Opened);
match op {
LookupOp::First => {
// Descend along the first item of every page down to the leaf level.
let mut pid = db.meta.root;
if pid != 0 {
let mut level = db.meta.height;
loop {
let pin = self.get_page(pid, AccessMode::ReadOnly)?;
let page = self.pool[pin.buf as usize].read().unwrap();
path.stack.push(PagePos { pid, pos: 0 });
level -= 1;
if level == 0 {
path.curr = Some(page.get_item(0));
path.lsn = db.lsn;
break;
} else {
pid = page.get_child(0)
}
}
}
}
LookupOp::Last => {
// Descend along the last item of every page down to the leaf level.
let mut pid = db.meta.root;
if pid != 0 {
let mut level = db.meta.height;
loop {
let pin = self.get_page(pid, AccessMode::ReadOnly)?;
let page = self.pool[pin.buf as usize].read().unwrap();
let pos = page.get_n_items() - 1;
level -= 1;
path.stack.push(PagePos { pid, pos });
if level == 0 {
path.curr = Some(page.get_item(pos));
path.lsn = db.lsn;
break;
} else {
pid = page.get_child(pos)
}
}
}
}
LookupOp::Next => {
// If a transaction was committed since the path was built, the path is rebuilt
// from the current key first; when that fails the path stays without a current item.
if path.lsn == db.lsn || self.reconstruct_path(path, db)? {
self.move_forward(path, db.meta.height)?;
}
}
LookupOp::Prev => {
if path.lsn == db.lsn || self.reconstruct_path(path, db)? {
self.move_backward(path, db.meta.height)?;
}
}
LookupOp::GreaterOrEqual(key) => {
if db.meta.root != 0 && self.find(db.meta.root, path, &key, db.meta.height)? {
path.lsn = db.lsn;
}
}
}
Ok(path.curr.clone())
}
/// Executes `op` on `path` and stores the outcome in `path.result`:
/// `Some(Ok(item))` for a found item, `None` if there is no item and
/// `Some(Err(..))` on failure, in which case the current item is also cleared.
fn lookup(&self, db: &Database, op: LookupOp, path: &mut TreePath) {
    let outcome = self.do_lookup(db, op, path);
    path.result = match outcome {
        Ok(found) => found.map(Ok),
        Err(err) => {
            path.curr = None;
            Some(Err(err))
        }
    };
}
/// Positions `path` on the first item with a key greater than or equal to `key` in the
/// subtree rooted at page `pid` located `height` levels above the leaves.
///
/// Every visited page is pushed onto the path stack. Returns `Ok(true)` and sets the
/// current item if such an item exists on the reached leaf; otherwise clears the
/// current item and the stack and returns `Ok(false)`.
fn find(&self, pid: PageId, path: &mut TreePath, key: &Key, height: u32) -> Result<bool> {
let pin = self.get_page(pid, AccessMode::ReadOnly)?;
let page = self.pool[pin.buf as usize].read().unwrap();
let n = page.get_n_items();
// Binary search for the first item whose key is >= `key`.
let mut l: ItemPointer = 0;
let mut r = n;
while l < r {
let m = (l + r) >> 1;
if page.compare_key(m, key) == Ordering::Greater {
l = m + 1;
} else {
r = m;
}
}
debug_assert!(l == r);
path.stack.push(PagePos { pid, pos: r });
if height == 1 {
if r < n {
path.curr = Some(page.get_item(r));
Ok(true)
} else {
// No item >= key on this leaf.
path.curr = None;
path.stack.clear();
Ok(false)
}
} else {
debug_assert!(r < n);
debug_assert!(page.get_child(r) != 0);
self.find(page.get_child(r), path, key, height - 1)
}
}
/// Rebuilds the page stack of `path` for its current key.
///
/// Returns `Ok(true)` and refreshes the path LSN if the current key is still present in
/// the tree. Otherwise (no current item, or the key is gone) the current item is
/// cleared and `Ok(false)` is returned.
fn reconstruct_path(&self, path: &mut TreePath, db: &Database) -> Result<bool> {
path.stack.clear();
// The current item is cloned because `find` overwrites `path.curr`.
if let Some((key, _value)) = &path.curr.clone() {
if self.find(db.meta.root, path, &key, db.meta.height)? {
if let Some((ge_key, _value)) = &path.curr {
if ge_key == key {
path.lsn = db.lsn;
return Ok(true);
}
}
}
}
path.curr = None;
Ok(false)
}
/// Moves `path` to the next item of the tree of the given `height`.
///
/// Leaves `path.curr` as `None` (and the stack empty) when there is no next item.
fn move_forward(&self, path: &mut TreePath, height: u32) -> Result<()> {
// `inc` is 1 while advancing within a page or climbing to a parent,
// and 0 for a child page entered at its first item.
let mut inc: usize = 1;
path.curr = None;
while !path.stack.is_empty() {
let top = path.stack.pop().unwrap();
let pin = self.get_page(top.pid, AccessMode::ReadOnly)?;
let page = self.pool[pin.buf as usize].read().unwrap();
let n_items = page.get_n_items();
let pos = top.pos + inc;
if pos < n_items {
path.stack.push(PagePos {
pid: top.pid,
pos: pos,
});
// A stack as deep as the tree means the top entry is a leaf position.
if path.stack.len() == height as usize {
let item = page.get_item(pos);
path.curr = Some(item);
break;
}
// Internal page: descend to the first item of the child.
debug_assert!(page.get_child(pos) != 0);
path.stack.push(PagePos {
pid: page.get_child(pos),
pos: 0,
});
inc = 0;
} else {
// The page is exhausted: continue with the next item of the parent.
inc = 1;
}
}
Ok(())
}
/// Moves `path` to the previous item of the tree of the given `height`.
///
/// Leaves `path.curr` as `None` (and the stack empty) when there is no previous item.
fn move_backward(&self, path: &mut TreePath, height: u32) -> Result<()> {
path.curr = None;
while !path.stack.is_empty() {
let top = path.stack.pop().unwrap();
let pin = self.get_page(top.pid, AccessMode::ReadOnly)?;
let page = self.pool[pin.buf as usize].read().unwrap();
// `usize::MAX` marks a child page that was just entered: start behind its last item.
let pos = if top.pos == usize::MAX {
page.get_n_items()
} else {
top.pos
};
// At position 0 the page is exhausted and the loop continues with the parent.
if pos != 0 {
path.stack.push(PagePos {
pid: top.pid,
pos: pos - 1,
});
// A stack as deep as the tree means the top entry is a leaf position.
if path.stack.len() == height as usize {
let item = page.get_item(pos - 1);
path.curr = Some(item);
break;
}
// Internal page: descend to the last item of the child.
path.stack.push(PagePos {
pid: page.get_child(pos - 1),
pos: usize::MAX, });
}
}
Ok(())
}
/// Checks the key order in the subtree rooted at page `pid` and counts its leaf items.
///
/// `prev_key` carries the last visited leaf key across calls: leaf keys must be strictly
/// ascending, and the key of an internal item must not be less than the last key of the
/// child it refers to.
///
/// # Errors
/// Fails on an ordering violation or when a page cannot be fetched.
fn traverse(&self, pid: PageId, prev_key: &mut Key, height: u32) -> Result<u64> {
    let pin = self.get_page(pid, AccessMode::ReadOnly)?;
    let page = self.pool[pin.buf as usize].read().unwrap();
    let n_items = page.get_n_items();
    if height == 1 {
        for i in 0..n_items {
            ensure!(page.compare_key(i, prev_key) == Ordering::Less);
            *prev_key = page.get_key(i);
        }
        return Ok(n_items as u64);
    }
    let mut total = 0u64;
    for i in 0..n_items {
        let child = page.get_child(i);
        total += self.traverse(child, prev_key, height - 1)?;
        let ord = page.compare_key(i, prev_key);
        ensure!(ord == Ordering::Less || ord == Ordering::Equal);
    }
    Ok(total)
}
}
impl Storage {
/// Applies a batch of upserts and removals as one atomic transaction.
///
/// All upserts are applied first, then all removals. On success the changes are
/// committed; if applying them fails, they are rolled back and the error is returned.
/// A failed commit or a failed rollback marks the database as corrupted.
///
/// # Errors
/// Fails if the database is not opened, if an update cannot be applied or if the
/// commit fails.
pub fn update(
    &self,
    to_upsert: &mut dyn Iterator<Item = Result<(Key, Value)>>,
    to_remove: &mut dyn Iterator<Item = Result<Key>>,
) -> Result<()> {
    // The write lock serializes updates.
    let mut db = self.db.write().unwrap();
    ensure!(db.state == DatabaseState::Opened);
    match self.do_updates(&mut db, to_upsert, to_remove) {
        Ok(()) => {
            let committed = self.commit(&mut db);
            if committed.is_err() {
                db.state = DatabaseState::Corrupted;
            }
            committed
        }
        Err(cause) => {
            if self.rollback(&mut db).is_err() {
                db.state = DatabaseState::Corrupted;
            }
            Err(cause)
        }
    }
}
pub fn verify(&self) -> Result<u64> {
let db = self.db.read().unwrap();
ensure!(db.state == DatabaseState::Opened);
if db.meta.root != 0 {
let mut prev_key = Vec::new();
self.traverse(db.meta.root, &mut prev_key, db.meta.height)
} else {
Ok(0)
}
}
pub fn put(&self, key: Key, value: Value) -> Result<()> {
self.update(&mut iter::once(Ok((key, value))), &mut iter::empty())
}
pub fn put_u32(&self, key: u32, value: Value) -> Result<()> {
self.put(key.to_be_bytes().to_vec(), value)
}
pub fn put_u64(&self, key: u64, value: Value) -> Result<()> {
self.put(key.to_be_bytes().to_vec(), value)
}
pub fn put_all(&self, pairs: &mut dyn Iterator<Item = Result<(Key, Value)>>) -> Result<()> {
self.update(pairs, &mut iter::empty())
}
pub fn remove(&self, key: Key) -> Result<()> {
self.update(&mut iter::empty(), &mut iter::once(Ok(key)))
}
pub fn remove_u32(&self, key: u32) -> Result<()> {
self.remove(key.to_be_bytes().to_vec())
}
pub fn remove_u64(&self, key: u64) -> Result<()> {
self.remove(key.to_be_bytes().to_vec())
}
pub fn remove_all(&self, keys: &mut dyn Iterator<Item = Result<Key>>) -> Result<()> {
self.update(&mut iter::empty(), keys)
}
pub fn iter(&self) -> StorageIterator<'_> {
self.range(..)
}
pub fn get(&self, key: &Key) -> Result<Option<Value>> {
let mut iter = self.range((Included(key), Included(key)));
Ok(iter.next().transpose()?.map(|kv| kv.1))
}
pub fn get_u32(&self, key: u32) -> Result<Option<Value>> {
self.get(&key.to_be_bytes().to_vec())
}
pub fn get_u64(&self, key: u64) -> Result<Option<Value>> {
self.get(&key.to_be_bytes().to_vec())
}
pub fn range<R: RangeBounds<Key>>(&self, range: R) -> StorageIterator<'_> {
StorageIterator {
storage: &self,
trans: None,
from: range.start_bound().cloned(),
till: range.end_bound().cloned(),
left: TreePath::new(),
right: TreePath::new(),
}
}
pub fn close(&self) -> Result<()> {
if let Ok(mut db) = self.db.write() {
if db.state == DatabaseState::Opened {
let mut delayed_commit = false;
if let Ok(bm) = self.buf_mgr.lock() {
if bm.dirty_pages != 0 {
delayed_commit = true;
}
}
if delayed_commit {
self.commit(&mut db)?;
}
self.file.sync_all()?;
if let Some(log) = &self.log {
log.set_len(0)?; }
db.state = DatabaseState::Closed;
}
}
Ok(())
}
pub fn get_recovery_status(&self) -> RecoveryStatus {
let db = self.db.read().unwrap();
db.recovery
}
pub fn get_database_info(&self) -> DatabaseInfo {
let db = self.db.read().unwrap();
db.get_info()
}
pub fn get_cache_info(&self) -> CacheInfo {
let bm = self.buf_mgr.lock().unwrap();
CacheInfo {
used: bm.cached as usize,
pinned: bm.pinned as usize,
}
}
}
impl Drop for Storage {
    /// Closes the storage when it goes out of scope.
    ///
    /// # Panics
    /// Panics if `close` fails, unless the thread is already unwinding from
    /// another panic. A second panic raised from a destructor during
    /// unwinding aborts the whole process, so in that case the close error
    /// is discarded instead.
    fn drop(&mut self) {
        let outcome = self.close();
        if !std::thread::panicking() {
            outcome.unwrap();
        }
    }
}
// Operations available on an explicit transaction. A `Transaction` owns the
// database write guard, so it has exclusive access for its whole lifetime.
impl Transaction<'_> {
    /// Commits the transaction through the storage and marks it `Committed`.
    ///
    /// Fails if the transaction is not `InProgress` or if the commit fails;
    /// in the latter case the status is left unchanged.
    pub fn commit(&mut self) -> Result<()> {
        ensure!(self.status == TransactionStatus::InProgress);
        self.storage.commit(&mut self.db)?;
        self.status = TransactionStatus::Committed;
        Ok(())
    }

    /// Marks the transaction `Committed` without calling the storage commit:
    /// only the database LSN is incremented.
    ///
    /// NOTE(review): presumably the changes are made durable by a later
    /// commit or by `Storage::close`, which commits when dirty pages remain —
    /// confirm.
    pub fn delay(&mut self) -> Result<()> {
        ensure!(self.status == TransactionStatus::InProgress);
        self.db.lsn += 1;
        self.status = TransactionStatus::Committed;
        Ok(())
    }

    /// Rolls the transaction back through the storage and marks it `Aborted`.
    ///
    /// Fails if the transaction is not `InProgress` or if the rollback fails;
    /// in the latter case the status is left unchanged.
    pub fn rollback(&mut self) -> Result<()> {
        ensure!(self.status == TransactionStatus::InProgress);
        self.storage.rollback(&mut self.db)?;
        self.status = TransactionStatus::Aborted;
        Ok(())
    }

    /// Inserts or updates `key` with `value` inside this transaction.
    pub fn put(&mut self, key: &Key, value: &Value) -> Result<()> {
        ensure!(self.status == TransactionStatus::InProgress);
        self.storage.do_upsert(&mut self.db, key, value)?;
        Ok(())
    }

    /// Like [`Transaction::put`], keyed by the big-endian bytes of a `u32`.
    pub fn put_u32(&mut self, key: u32, value: &Value) -> Result<()> {
        let bytes = key.to_be_bytes().to_vec();
        self.put(&bytes, value)
    }

    /// Like [`Transaction::put`], keyed by the big-endian bytes of a `u64`.
    pub fn put_u64(&mut self, key: u64, value: &Value) -> Result<()> {
        let bytes = key.to_be_bytes().to_vec();
        self.put(&bytes, value)
    }

    /// Removes `key` inside this transaction.
    pub fn remove(&mut self, key: &Key) -> Result<()> {
        ensure!(self.status == TransactionStatus::InProgress);
        self.storage.do_remove(&mut self.db, key)?;
        Ok(())
    }

    /// Like [`Transaction::remove`], keyed by the big-endian bytes of a `u32`.
    pub fn remove_u32(&mut self, key: u32) -> Result<()> {
        let bytes = key.to_be_bytes().to_vec();
        self.remove(&bytes)
    }

    /// Like [`Transaction::remove`], keyed by the big-endian bytes of a `u64`.
    pub fn remove_u64(&mut self, key: u64) -> Result<()> {
        let bytes = key.to_be_bytes().to_vec();
        self.remove(&bytes)
    }

    /// Returns an iterator over the unbounded key range, bound to this
    /// transaction.
    pub fn iter(&self) -> StorageIterator<'_> {
        self.range(..)
    }

    /// Looks up `key` by scanning the inclusive range `[key, key]` and
    /// returns the value of the first entry found, if any.
    pub fn get(&self, key: &Key) -> Result<Option<Value>> {
        let mut hits = self.range((Included(key), Included(key)));
        let first = hits.next().transpose()?;
        Ok(first.map(|kv| kv.1))
    }

    /// Like [`Transaction::get`], keyed by the big-endian bytes of a `u32`.
    pub fn get_u32(&self, key: u32) -> Result<Option<Value>> {
        let bytes = key.to_be_bytes().to_vec();
        self.get(&bytes)
    }

    /// Like [`Transaction::get`], keyed by the big-endian bytes of a `u64`.
    pub fn get_u64(&self, key: u64) -> Result<Option<Value>> {
        let bytes = key.to_be_bytes().to_vec();
        self.get(&bytes)
    }

    /// Builds an iterator over `range` that carries a reference to this
    /// transaction.
    ///
    /// The bounds are cloned into the iterator; both tree paths start empty.
    pub fn range<R: RangeBounds<Key>>(&self, range: R) -> StorageIterator<'_> {
        let from = range.start_bound().cloned();
        let till = range.end_bound().cloned();
        StorageIterator {
            storage: self.storage,
            trans: Some(self),
            from,
            till,
            left: TreePath::new(),
            right: TreePath::new(),
        }
    }

    /// Walks the whole tree as seen by this transaction, checking key
    /// ordering, and returns the number of items counted by `traverse`;
    /// returns `0` when the root page id is `0`.
    ///
    /// Fails if the transaction is not `InProgress`.
    pub fn verify(&self) -> Result<u64> {
        ensure!(self.status == TransactionStatus::InProgress);
        if self.db.meta.root == 0 {
            return Ok(0);
        }
        let mut prev_key = Key::new();
        self.storage
            .traverse(self.db.meta.root, &mut prev_key, self.db.meta.height)
    }

    /// Returns the database statistics reported by `Database::get_info`.
    pub fn get_database_info(&self) -> DatabaseInfo {
        self.db.get_info()
    }

    /// Returns the buffer cache counters of the underlying storage.
    pub fn get_cache_info(&self) -> CacheInfo {
        self.storage.get_cache_info()
    }
}
impl<'a> Drop for Transaction<'a> {
    /// Rolls back a transaction that is dropped while still `InProgress`.
    ///
    /// # Panics
    /// Panics if the rollback fails, unless the thread is already unwinding
    /// from another panic. A second panic raised from a destructor during
    /// unwinding aborts the whole process, so in that case the rollback
    /// error is discarded instead.
    fn drop(&mut self) {
        if self.status == TransactionStatus::InProgress {
            let outcome = self.storage.rollback(&mut self.db);
            if !std::thread::panicking() {
                outcome.unwrap();
            }
        }
    }
}