use alloc::sync::Arc;
use crate::bf_tree::{BfTree, LeafInsertResult, LeafReadResult, ScanIter, ScanReturnField};
use super::config::BfTreeConfig;
use super::error::BfTreeError;
pub struct BfTreeAdapter {
inner: BfTree,
max_key_len: usize,
}
impl BfTreeAdapter {
pub fn open(config: BfTreeConfig) -> Result<Self, BfTreeError> {
let max_key_len = config.max_key_len;
let bf_config = config.into_bf_config()?;
let inner = BfTree::with_config(bf_config, None)?;
Ok(Self { inner, max_key_len })
}
pub fn open_from_snapshot(config: BfTreeConfig) -> Result<Self, BfTreeError> {
let max_key_len = config.max_key_len;
let bf_config = config.into_bf_config()?;
let inner = BfTree::new_from_snapshot(bf_config, None)?;
Ok(Self { inner, max_key_len })
}
pub fn max_key_len(&self) -> usize {
self.max_key_len
}
pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<(), BfTreeError> {
match self.inner.insert(key, value) {
LeafInsertResult::Success => Ok(()),
LeafInsertResult::InvalidKV(msg) => Err(BfTreeError::InvalidKV(msg)),
}
}
pub fn read(&self, key: &[u8], out_buffer: &mut [u8]) -> Result<u32, BfTreeError> {
match self.inner.read(key, out_buffer) {
LeafReadResult::Found(len) => Ok(len),
LeafReadResult::NotFound => Err(BfTreeError::NotFound),
LeafReadResult::Deleted => Err(BfTreeError::Deleted),
LeafReadResult::InvalidKey => Err(BfTreeError::InvalidKey),
}
}
pub fn delete(&self, key: &[u8]) {
self.inner.delete(key);
}
pub fn insert_deferred_wal(&self, key: &[u8], value: &[u8]) -> Result<(), BfTreeError> {
match self.inner.insert_deferred_wal(key, value) {
LeafInsertResult::Success => Ok(()),
LeafInsertResult::InvalidKV(msg) => Err(BfTreeError::InvalidKV(msg)),
}
}
pub fn delete_deferred_wal(&self, key: &[u8]) {
self.inner.delete_deferred_wal(key);
}
pub fn batch_insert_sorted_deferred_wal(
&self,
entries: &[(&[u8], &[u8])],
) -> Result<(), BfTreeError> {
self.inner
.batch_insert_sorted_deferred_wal(entries)
.map_err(|e| BfTreeError::InvalidKV(alloc::format!("batch insert failed: {e:?}")))
}
pub fn batch_delete_sorted_deferred_wal(&self, keys: &[&[u8]]) -> Result<(), BfTreeError> {
self.inner
.batch_delete_sorted_deferred_wal(keys)
.map_err(|e| BfTreeError::InvalidKV(alloc::format!("batch delete failed: {e:?}")))
}
pub fn flush_wal(&self) -> Result<(), crate::bf_tree::BfTreeError> {
self.inner.flush_wal()
}
pub fn contains_key(&self, key: &[u8]) -> bool {
let max_val = self.inner.config().get_cb_max_record_size();
let mut buf = vec![0u8; max_val];
matches!(self.inner.read(key, &mut buf), LeafReadResult::Found(_))
}
pub fn scan_from(
&self,
start_key: &[u8],
count: usize,
) -> Result<ScanIter<'_, '_>, BfTreeError> {
self.inner
.scan_with_count(start_key, count, ScanReturnField::KeyAndValue)
.map_err(BfTreeError::from)
}
pub fn scan_range(
&self,
start_key: &[u8],
end_key: &[u8],
) -> Result<ScanIter<'_, '_>, BfTreeError> {
self.inner
.scan_with_end_key(start_key, end_key, ScanReturnField::KeyAndValue)
.map_err(BfTreeError::from)
}
pub fn snapshot(&self) -> Result<std::path::PathBuf, crate::bf_tree::BfTreeError> {
self.inner.snapshot()
}
pub fn inner(&self) -> &BfTree {
&self.inner
}
pub fn buffer_metrics(&self) -> crate::bf_tree::circular_buffer::CircularBufferMetrics {
self.inner.get_buffer_metrics()
}
}
impl BfTreeAdapter {
pub fn into_shared(self) -> Arc<Self> {
Arc::new(self)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn basic_insert_read() {
let config = BfTreeConfig::new_memory(4);
let adapter = BfTreeAdapter::open(config).unwrap();
adapter.insert(b"hello", b"world").unwrap();
let mut buf = [0u8; 64];
let len = adapter.read(b"hello", &mut buf).unwrap();
assert_eq!(&buf[..len as usize], b"world");
}
#[test]
fn read_not_found() {
let config = BfTreeConfig::new_memory(4);
let adapter = BfTreeAdapter::open(config).unwrap();
let mut buf = [0u8; 64];
let result = adapter.read(b"missing", &mut buf);
assert!(matches!(result, Err(BfTreeError::NotFound)));
}
#[test]
fn insert_delete_read() {
let config = BfTreeConfig::new_memory(4);
let adapter = BfTreeAdapter::open(config).unwrap();
adapter.insert(b"key1", b"val1").unwrap();
adapter.delete(b"key1");
let mut buf = [0u8; 64];
let result = adapter.read(b"key1", &mut buf);
assert!(matches!(result, Err(BfTreeError::Deleted)));
}
#[test]
fn contains_key() {
let config = BfTreeConfig::new_memory(4);
let adapter = BfTreeAdapter::open(config).unwrap();
assert!(!adapter.contains_key(b"key"));
adapter.insert(b"key", b"val").unwrap();
assert!(adapter.contains_key(b"key"));
}
#[test]
fn concurrent_writes() {
use std::sync::Arc;
use std::thread;
let config = BfTreeConfig::new_memory(8);
let adapter = Arc::new(BfTreeAdapter::open(config).unwrap());
let num_threads = 4;
let writes_per_thread = 100;
let handles: Vec<_> = (0..num_threads)
.map(|t| {
let adapter = adapter.clone();
thread::spawn(move || {
for i in 0..writes_per_thread {
let key = format!("t{t}_k{i}");
let val = format!("t{t}_v{i}");
adapter.insert(key.as_bytes(), val.as_bytes()).unwrap();
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let mut buf = [0u8; 256];
for t in 0..num_threads {
for i in 0..writes_per_thread {
let key = format!("t{t}_k{i}");
let expected = format!("t{t}_v{i}");
let len = adapter.read(key.as_bytes(), &mut buf).unwrap();
assert_eq!(&buf[..len as usize], expected.as_bytes());
}
}
}
#[test]
fn scan_basic() {
let config = BfTreeConfig::new_memory(4);
let adapter = BfTreeAdapter::open(config).unwrap();
adapter.insert(b"aaa", b"1").unwrap();
adapter.insert(b"bbb", b"2").unwrap();
adapter.insert(b"ccc", b"3").unwrap();
let mut iter = adapter.scan_from(b"aaa", 10).unwrap();
let mut buf = [0u8; 256];
let mut count = 0;
while matches!(iter.next(&mut buf), Ok(Some(_))) {
count += 1;
}
assert_eq!(count, 3);
}
#[test]
fn overwrite_value() {
let config = BfTreeConfig::new_memory(4);
let adapter = BfTreeAdapter::open(config).unwrap();
adapter.insert(b"key", b"v1").unwrap();
adapter.insert(b"key", b"v2_longer").unwrap();
let mut buf = [0u8; 64];
let len = adapter.read(b"key", &mut buf).unwrap();
assert_eq!(&buf[..len as usize], b"v2_longer");
}
}