use std::ops::Range;
use std::path::Path;
#[cfg(feature = "fuzz-reset")]
use std::path::PathBuf;
use bytes::Bytes;
use kimberlite_types::{Hash, Offset};
use crate::batch::{WriteBatch, WriteOp};
use crate::btree::{BTree, BTreeMeta};
use crate::cache::PageCache;
use crate::error::StoreError;
use crate::superblock::Superblock;
use crate::types::{PageId, TableId};
use crate::{Key, ProjectionStore};
/// Cache capacity handed to `PageCache::open` when a store is opened via `BTreeStore::open`
/// (presumably a page count — confirm against `PageCache`).
const DEFAULT_CACHE_CAPACITY: usize = 4 * 1024;
/// A [`ProjectionStore`] that keeps one B-tree per table inside a single paged file.
///
/// Per-table B-tree metadata, together with the applied position and hash, lives in the
/// in-memory [`Superblock`]. That superblock is serialized back into its page on `sync`.
pub struct BTreeStore {
    // Field order is also drop order: `cache` is dropped before `superblock`.
    /// Page cache over the backing file; B-tree pages and the superblock page go through it.
    cache: PageCache,
    /// In-memory superblock: applied position/hash and per-table B-tree metadata.
    superblock: Superblock,
    /// Backing file path, kept so `reset` can truncate and reopen the store.
    #[cfg(feature = "fuzz-reset")]
    path: PathBuf,
    /// Cache capacity the store was opened with, reused by `reset`.
    #[cfg(feature = "fuzz-reset")]
    cache_capacity: usize,
}
impl BTreeStore {
    /// Opens the store at `path` with a cache capacity of [`DEFAULT_CACHE_CAPACITY`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`BTreeStore::open_with_capacity`].
    pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
        Self::open_with_capacity(path, DEFAULT_CACHE_CAPACITY)
    }

    /// Opens the store at `path`, passing `cache_capacity` through to [`PageCache::open`].
    ///
    /// If the cache reports that no page has been allocated yet (`next_page_id() == 0`), the
    /// file is treated as new. In that case the first page is allocated for the superblock
    /// and a fresh [`Superblock`] is serialized into it. Otherwise the superblock is decoded
    /// from the raw bytes of [`PageId::SUPERBLOCK`].
    ///
    /// # Errors
    ///
    /// Returns an error in any of these cases:
    /// - the page cache cannot be opened;
    /// - the superblock page cannot be allocated, written or read;
    /// - the stored superblock fails to deserialize.
    pub fn open_with_capacity(
        path: impl AsRef<Path>,
        cache_capacity: usize,
    ) -> Result<Self, StoreError> {
        let path_ref = path.as_ref();
        let mut cache = PageCache::open(path_ref, Some(cache_capacity))?;
        let superblock = if cache.next_page_id() == PageId::new(0) {
            // Fresh file: the very first allocation is expected to be the superblock slot.
            let sb = Superblock::new();
            let page_id = cache.allocate(crate::page::PageType::Free)?;
            debug_assert_eq!(page_id, PageId::SUPERBLOCK);
            Self::write_superblock(&mut cache, &sb)?;
            sb
        } else {
            let raw_data = cache.read_raw(PageId::SUPERBLOCK)?;
            Superblock::deserialize(&raw_data)?
        };
        Ok(Self {
            cache,
            superblock,
            #[cfg(feature = "fuzz-reset")]
            path: path_ref.to_path_buf(),
            #[cfg(feature = "fuzz-reset")]
            cache_capacity,
        })
    }

    /// Empties the backing file and reopens the store in place, keeping the original
    /// path and cache capacity.
    ///
    /// NOTE(review): the file is truncated and reopened while the old `PageCache` is still
    /// alive. The old cache is only dropped when `*self` is overwritten. If `PageCache`
    /// flushes dirty pages on drop, stale pages could land in the freshly reset file.
    /// Confirm this cannot happen.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be truncated or the store cannot be reopened.
    #[cfg(feature = "fuzz-reset")]
    pub fn reset(&mut self) -> Result<(), StoreError> {
        use std::fs::File;
        let path = self.path.clone();
        let capacity = self.cache_capacity;
        drop(File::create(&path)?);
        *self = Self::open_with_capacity(&path, capacity)?;
        Ok(())
    }

    /// Serializes `sb` into the raw data of the cached superblock page.
    ///
    /// NOTE(review): if `get_mut` yields `Ok(None)`, nothing is written and this still
    /// returns `Ok(())`, so callers such as `sync` report success without updating the page.
    /// Confirm whether `None` can occur here; if it can, it should become an error.
    fn write_superblock(cache: &mut PageCache, sb: &Superblock) -> Result<(), StoreError> {
        let page = cache.get_mut(PageId::SUPERBLOCK)?;
        if let Some(page) = page {
            let sb_bytes = sb.serialize();
            page.set_raw_data(&sb_bytes);
        }
        Ok(())
    }

    /// Position of the last batch applied to this store (`Offset::ZERO` if none).
    pub fn applied_position(&self) -> Offset {
        self.superblock.applied_position
    }

    /// Hash recorded in the superblock alongside the applied position.
    pub fn applied_hash(&self) -> Hash {
        self.superblock.applied_hash
    }

    /// Returns a copy of `table`'s B-tree metadata. If the table has no entry yet, returns
    /// `BTreeMeta::default()`.
    ///
    /// Despite the name, this does not register the table. The caller persists any changes
    /// with `update_table`.
    fn ensure_table(&self, table: TableId) -> BTreeMeta {
        self.superblock
            .tables
            .get(&table)
            .cloned()
            .unwrap_or_default()
    }

    /// Stores `meta` as the B-tree metadata for `table`, replacing any previous entry.
    fn update_table(&mut self, table: TableId, meta: BTreeMeta) {
        self.superblock.tables.insert(table, meta);
    }
}
impl ProjectionStore for BTreeStore {
fn apply(&mut self, batch: WriteBatch) -> Result<(), StoreError> {
let pos = batch.position();
let expected = if self.superblock.applied_position == Offset::ZERO {
Offset::new(1)
} else {
Offset::new(self.superblock.applied_position.as_u64() + 1)
};
if pos != expected
&& !(self.superblock.applied_position == Offset::ZERO && pos == Offset::new(1))
{
return Err(StoreError::NonSequentialBatch {
expected: expected.as_u64(),
actual: pos.as_u64(),
});
}
for op in batch {
match op {
WriteOp::Put { table, key, value } => {
let mut meta = self.ensure_table(table);
{
let mut tree = BTree::new(&mut meta, &mut self.cache);
tree.put(key, value, pos)?;
}
self.update_table(table, meta);
}
WriteOp::Delete { table, key } => {
let mut meta = self.ensure_table(table);
{
let mut tree = BTree::new(&mut meta, &mut self.cache);
tree.delete(&key, pos)?;
}
self.update_table(table, meta);
}
}
}
self.superblock.applied_position = pos;
Ok(())
}
fn applied_position(&self) -> Offset {
self.superblock.applied_position
}
fn get(&mut self, table: TableId, key: &Key) -> Result<Option<Bytes>, StoreError> {
let mut meta = match self.superblock.tables.get(&table) {
Some(m) => m.clone(),
None => return Ok(None),
};
let mut tree = BTree::new(&mut meta, &mut self.cache);
tree.get(key)
}
fn get_at(
&mut self,
table: TableId,
key: &Key,
pos: Offset,
) -> Result<Option<Bytes>, StoreError> {
let mut meta = match self.superblock.tables.get(&table) {
Some(m) => m.clone(),
None => return Ok(None),
};
let mut tree = BTree::new(&mut meta, &mut self.cache);
tree.get_at(key, pos)
}
fn scan(
&mut self,
table: TableId,
range: Range<Key>,
limit: usize,
) -> Result<Vec<(Key, Bytes)>, StoreError> {
let mut meta = match self.superblock.tables.get(&table) {
Some(m) => m.clone(),
None => return Ok(Vec::new()),
};
let mut tree = BTree::new(&mut meta, &mut self.cache);
tree.scan(range, limit)
}
fn scan_at(
&mut self,
table: TableId,
range: Range<Key>,
limit: usize,
pos: Offset,
) -> Result<Vec<(Key, Bytes)>, StoreError> {
let mut meta = match self.superblock.tables.get(&table) {
Some(m) => m.clone(),
None => return Ok(Vec::new()),
};
let mut tree = BTree::new(&mut meta, &mut self.cache);
tree.scan_at(range, limit, pos)
}
fn sync(&mut self) -> Result<(), StoreError> {
self.superblock.next_page_id = self.cache.next_page_id();
Self::write_superblock(&mut self.cache, &self.superblock)?;
self.cache.sync()?;
Ok(())
}
}
#[cfg(test)]
mod store_tests {
    use super::*;
    use tempfile::{tempdir, TempDir};

    /// Opens an empty store in a new temporary directory.
    ///
    /// The `TempDir` is returned alongside the store so the directory lives as long as the
    /// caller keeps the binding (a named `_dir`, not `_`).
    fn fresh_store() -> (TempDir, BTreeStore) {
        let dir = tempdir().unwrap();
        let store = BTreeStore::open(dir.path().join("store.db")).unwrap();
        (dir, store)
    }

    /// Builds a single-put batch against table 1 at position `pos`.
    fn put_batch(pos: u64, key: &'static str, value: &'static str) -> WriteBatch {
        WriteBatch::new(Offset::new(pos)).put(TableId::new(1), Key::from(key), Bytes::from(value))
    }

    #[test]
    fn test_new_store() {
        let (_dir, store) = fresh_store();
        assert_eq!(store.applied_position(), Offset::ZERO);
    }

    #[test]
    fn test_apply_batch() {
        let (_dir, mut store) = fresh_store();
        let table = TableId::new(1);
        store
            .apply(
                WriteBatch::new(Offset::new(1))
                    .put(table, Key::from("key1"), Bytes::from("value1"))
                    .put(table, Key::from("key2"), Bytes::from("value2")),
            )
            .unwrap();
        assert_eq!(ProjectionStore::applied_position(&store), Offset::new(1));
        for (key, value) in [("key1", "value1"), ("key2", "value2")] {
            assert_eq!(
                store.get(table, &Key::from(key)).unwrap(),
                Some(Bytes::from(value))
            );
        }
    }

    #[test]
    fn test_sequential_batches() {
        let (_dir, mut store) = fresh_store();
        for (pos, key, value) in [(1, "a", "1"), (2, "b", "2"), (3, "c", "3")] {
            store.apply(put_batch(pos, key, value)).unwrap();
        }
        assert_eq!(ProjectionStore::applied_position(&store), Offset::new(3));
    }

    #[test]
    fn test_mvcc_queries() {
        let (_dir, mut store) = fresh_store();
        store.apply(put_batch(1, "key", "v1")).unwrap();
        store.apply(put_batch(2, "key", "v2")).unwrap();

        let table = TableId::new(1);
        let key = Key::from("key");
        assert_eq!(store.get(table, &key).unwrap(), Some(Bytes::from("v2")));
        assert_eq!(
            store.get_at(table, &key, Offset::new(1)).unwrap(),
            Some(Bytes::from("v1"))
        );
        assert_eq!(
            store.get_at(table, &key, Offset::new(2)).unwrap(),
            Some(Bytes::from("v2"))
        );
    }

    #[test]
    fn test_delete() {
        let (_dir, mut store) = fresh_store();
        let table = TableId::new(1);
        let key = Key::from("key");

        store.apply(put_batch(1, "key", "value")).unwrap();
        assert!(store.get(table, &key).unwrap().is_some());

        store
            .apply(WriteBatch::new(Offset::new(2)).delete(table, Key::from("key")))
            .unwrap();
        assert!(store.get(table, &key).unwrap().is_none());
        assert_eq!(
            store.get_at(table, &key, Offset::new(1)).unwrap(),
            Some(Bytes::from("value"))
        );
    }

    #[test]
    fn test_multiple_tables() {
        let (_dir, mut store) = fresh_store();
        let batch = WriteBatch::new(Offset::new(1))
            .put(TableId::new(1), Key::from("key"), Bytes::from("table1"))
            .put(TableId::new(2), Key::from("key"), Bytes::from("table2"));
        store.apply(batch).unwrap();

        let key = Key::from("key");
        assert_eq!(
            store.get(TableId::new(1), &key).unwrap(),
            Some(Bytes::from("table1"))
        );
        assert_eq!(
            store.get(TableId::new(2), &key).unwrap(),
            Some(Bytes::from("table2"))
        );
        assert_eq!(store.get(TableId::new(999), &key).unwrap(), None);
    }

    #[test]
    fn test_scan() {
        let (_dir, mut store) = fresh_store();
        let mut batch = WriteBatch::new(Offset::new(1));
        for n in 0..10 {
            let key = Key::from(format!("key{n:02}"));
            let value = Bytes::from(format!("value{n}"));
            batch.push_put(TableId::new(1), key, value);
        }
        store.apply(batch).unwrap();

        let range = Key::from("key03")..Key::from("key07");
        let results = store.scan(TableId::new(1), range, 100).unwrap();
        assert_eq!(results.len(), 4);
        assert_eq!(results.first().map(|(k, _)| k), Some(&Key::from("key03")));
        assert_eq!(results.last().map(|(k, _)| k), Some(&Key::from("key06")));
    }
}