use crate::compat::Mutex;
use alloc::string::String;
use alloc::sync::Arc;
use alloc::vec;
use alloc::vec::Vec;
use core::marker::PhantomData;
use core::sync::atomic::{AtomicU64, Ordering};
use crate::TableHandle;
use crate::cdc::types::{CdcEvent, ChangeOp};
use crate::sealed::Sealed;
use crate::storage_traits::OwnedKv;
use crate::types::{Key, Value};
use super::adapter::BfTreeAdapter;
use super::buffered_txn::{
BufferLookup, BufferedScanIter, WriteBuffer, collect_buffer_entries_for_table,
};
use super::database::{
BfTreeTableScan, TableKind, encode_table_key, encode_table_key_into, table_prefix,
table_prefix_end,
};
use super::error::BfTreeError;
use super::verification::{VerifyMode, should_verify, unwrap_value, wrap_value};
pub struct BfTreeTable<'txn, K: Key + 'static, V: Value + 'static> {
name: String,
adapter: &'txn Arc<BfTreeAdapter>,
ops_count: &'txn AtomicU64,
cdc_log: Option<&'txn Mutex<Vec<CdcEvent>>>,
buffer: &'txn Mutex<WriteBuffer>,
verify_mode: &'txn Arc<VerifyMode>,
read_buf: Vec<u8>,
key_buf: Vec<u8>,
_key: PhantomData<K>,
_val: PhantomData<V>,
}
impl<K: Key + 'static, V: Value + 'static> Sealed for BfTreeTable<'_, K, V> {}
impl<K: Key + 'static, V: Value + 'static> TableHandle for BfTreeTable<'_, K, V> {
fn name(&self) -> &str {
&self.name
}
}
impl<'txn, K: Key + 'static, V: Value + 'static> BfTreeTable<'txn, K, V> {
pub(crate) fn new(
name: &str,
adapter: &'txn Arc<BfTreeAdapter>,
ops_count: &'txn AtomicU64,
cdc_log: Option<&'txn Mutex<Vec<CdcEvent>>>,
buffer: &'txn Mutex<WriteBuffer>,
verify_mode: &'txn Arc<VerifyMode>,
) -> Self {
let max_record = adapter.inner().config().get_cb_max_record_size();
let key_buf_size = adapter.max_key_len() + 64;
Self {
name: String::from(name),
adapter,
ops_count,
cdc_log,
buffer,
verify_mode,
read_buf: vec![0u8; max_record],
key_buf: vec![0u8; key_buf_size],
_key: PhantomData,
_val: PhantomData,
}
}
fn record_cdc(&self, event: CdcEvent) {
if let Some(log) = self.cdc_log {
log.lock().push(event);
}
}
pub fn insert(
&mut self,
key: &K::SelfType<'_>,
value: &V::SelfType<'_>,
) -> Result<Option<Vec<u8>>, BfTreeError> {
let key_bytes = K::as_bytes(key);
let val_bytes = V::as_bytes(value);
let encoded_key = encode_table_key(&self.name, TableKind::Regular, key_bytes.as_ref());
let checksumming = self.verify_mode.is_enabled();
let store_bytes = if checksumming {
wrap_value(val_bytes.as_ref())
} else {
val_bytes.as_ref().to_vec()
};
let previous = {
let mut buffer = self.buffer.lock();
let prev_raw = match buffer.get(&encoded_key) {
BufferLookup::Found(v) => Some(v),
BufferLookup::Tombstone => None,
BufferLookup::NotInBuffer => {
let max_val = self.adapter.inner().config().get_cb_max_record_size();
let mut buf = vec![0u8; max_val];
match self.adapter.read(&encoded_key, &mut buf) {
Ok(len) => Some(buf[..len as usize].to_vec()),
Err(BfTreeError::NotFound | BfTreeError::Deleted) => None,
Err(e) => return Err(e),
}
}
};
buffer.put(encoded_key, store_bytes)?;
match prev_raw {
Some(raw) if checksumming => {
let verify = should_verify(self.verify_mode.as_ref());
Some(unwrap_value(&raw, verify)?.to_vec())
}
other => other,
}
};
self.ops_count.fetch_add(1, Ordering::Relaxed);
if self.cdc_log.is_some() {
self.record_cdc(CdcEvent {
table_name: self.name.clone(),
op: if previous.is_some() {
ChangeOp::Update
} else {
ChangeOp::Insert
},
key: key_bytes.as_ref().to_vec(),
new_value: Some(val_bytes.as_ref().to_vec()),
old_value: previous.clone(),
});
}
Ok(previous)
}
pub fn remove(&mut self, key: &K::SelfType<'_>) -> Result<Option<Vec<u8>>, BfTreeError> {
let key_bytes = K::as_bytes(key);
let encoded_key = encode_table_key(&self.name, TableKind::Regular, key_bytes.as_ref());
let checksumming = self.verify_mode.is_enabled();
let previous = {
let mut buffer = self.buffer.lock();
let prev_raw = match buffer.get(&encoded_key) {
BufferLookup::Found(v) => Some(v),
BufferLookup::Tombstone => None,
BufferLookup::NotInBuffer => {
let max_val = self.adapter.inner().config().get_cb_max_record_size();
let mut buf = vec![0u8; max_val];
match self.adapter.read(&encoded_key, &mut buf) {
Ok(len) => Some(buf[..len as usize].to_vec()),
Err(BfTreeError::NotFound | BfTreeError::Deleted) => None,
Err(e) => return Err(e),
}
}
};
if prev_raw.is_some() {
buffer.delete(encoded_key);
}
match prev_raw {
Some(raw) if checksumming => {
let verify = should_verify(self.verify_mode.as_ref());
Some(unwrap_value(&raw, verify)?.to_vec())
}
other => other,
}
};
if previous.is_some() {
self.ops_count.fetch_add(1, Ordering::Relaxed);
if self.cdc_log.is_some() {
self.record_cdc(CdcEvent {
table_name: self.name.clone(),
op: ChangeOp::Delete,
key: key_bytes.as_ref().to_vec(),
new_value: None,
old_value: previous.clone(),
});
}
}
Ok(previous)
}
pub fn get(&mut self, key: &K::SelfType<'_>) -> Result<Option<Vec<u8>>, BfTreeError> {
let key_bytes = K::as_bytes(key);
let enc_len = encode_table_key_into(
&mut self.key_buf,
&self.name,
TableKind::Regular,
key_bytes.as_ref(),
);
let encoded_key = &self.key_buf[..enc_len];
let checksumming = self.verify_mode.is_enabled();
let buffer = self.buffer.lock();
match buffer.get(encoded_key) {
BufferLookup::Found(v) => {
return if checksumming {
let verify = should_verify(self.verify_mode.as_ref());
Ok(Some(unwrap_value(&v, verify)?.to_vec()))
} else {
Ok(Some(v))
};
}
BufferLookup::Tombstone => return Ok(None),
BufferLookup::NotInBuffer => {}
}
drop(buffer);
match self.adapter.read(encoded_key, &mut self.read_buf) {
Ok(len) => {
let raw = &self.read_buf[..len as usize];
if checksumming {
let verify = should_verify(self.verify_mode.as_ref());
Ok(Some(unwrap_value(raw, verify)?.to_vec()))
} else {
Ok(Some(raw.to_vec()))
}
}
Err(BfTreeError::NotFound | BfTreeError::Deleted) => Ok(None),
Err(e) => Err(e),
}
}
pub fn merge(
&mut self,
key: &K::SelfType<'_>,
operand: &[u8],
operator: &dyn crate::merge::MergeOperator,
) -> Result<(), BfTreeError> {
let key_bytes = K::as_bytes(key);
let encoded_key = encode_table_key(&self.name, TableKind::Regular, key_bytes.as_ref());
let checksumming = self.verify_mode.is_enabled();
let (old_value, new_value) = {
let mut buffer = self.buffer.lock();
let existing_raw = match buffer.get(&encoded_key) {
BufferLookup::Found(v) => Some(v),
BufferLookup::Tombstone => None,
BufferLookup::NotInBuffer => {
let max_val = self.adapter.inner().config().get_cb_max_record_size();
let mut buf = vec![0u8; max_val];
match self.adapter.read(&encoded_key, &mut buf) {
Ok(len) => Some(buf[..len as usize].to_vec()),
Err(BfTreeError::NotFound | BfTreeError::Deleted) => None,
Err(e) => return Err(e),
}
}
};
let existing = match existing_raw {
Some(ref raw) if checksumming => {
let verify = should_verify(self.verify_mode.as_ref());
Some(unwrap_value(raw, verify)?.to_vec())
}
other => other,
};
let merged = operator.merge(key_bytes.as_ref(), existing.as_deref(), operand);
match merged {
Some(ref new_val) => {
let store = if checksumming {
wrap_value(new_val)
} else {
new_val.clone()
};
buffer.put(encoded_key, store)?;
}
None => buffer.delete(encoded_key),
}
(existing, merged)
};
self.ops_count.fetch_add(1, Ordering::Relaxed);
if self.cdc_log.is_some() {
let (op, cdc_new, cdc_old) = match (&old_value, &new_value) {
(None, Some(nv)) => (ChangeOp::Insert, Some(nv.clone()), None),
(Some(_), Some(nv)) => (ChangeOp::Update, Some(nv.clone()), old_value.clone()),
(Some(_), None) => (ChangeOp::Delete, None, old_value.clone()),
(None, None) => return Ok(()),
};
self.record_cdc(CdcEvent {
table_name: self.name.clone(),
op,
key: key_bytes.as_ref().to_vec(),
new_value: cdc_new,
old_value: cdc_old,
});
}
Ok(())
}
pub fn contains_key(&self, key: &K::SelfType<'_>) -> bool {
let key_bytes = K::as_bytes(key);
let encoded_key = encode_table_key(&self.name, TableKind::Regular, key_bytes.as_ref());
let buffer = self.buffer.lock();
match buffer.get(&encoded_key) {
BufferLookup::Found(_) => return true,
BufferLookup::Tombstone => return false,
BufferLookup::NotInBuffer => {}
}
drop(buffer);
self.adapter.contains_key(&encoded_key)
}
}
pub struct BfTreeReadOnlyTable<'txn, K: Key + 'static, V: Value + 'static> {
name: String,
adapter: &'txn Arc<BfTreeAdapter>,
verify_mode: &'txn Arc<VerifyMode>,
read_buf: Vec<u8>,
key_buf: Vec<u8>,
_key: PhantomData<K>,
_val: PhantomData<V>,
}
impl<K: Key + 'static, V: Value + 'static> Sealed for BfTreeReadOnlyTable<'_, K, V> {}
impl<K: Key + 'static, V: Value + 'static> TableHandle for BfTreeReadOnlyTable<'_, K, V> {
fn name(&self) -> &str {
&self.name
}
}
impl<'txn, K: Key + 'static, V: Value + 'static> BfTreeReadOnlyTable<'txn, K, V> {
pub(crate) fn new(
name: &str,
adapter: &'txn Arc<BfTreeAdapter>,
verify_mode: &'txn Arc<VerifyMode>,
) -> Self {
let max_record = adapter.inner().config().get_cb_max_record_size();
let key_buf_size = adapter.max_key_len() + 64;
Self {
name: String::from(name),
adapter,
verify_mode,
read_buf: vec![0u8; max_record],
key_buf: vec![0u8; key_buf_size],
_key: PhantomData,
_val: PhantomData,
}
}
pub fn get(&mut self, key: &K::SelfType<'_>) -> Result<Option<Vec<u8>>, BfTreeError> {
let key_bytes = K::as_bytes(key);
let enc_len = encode_table_key_into(
&mut self.key_buf,
&self.name,
TableKind::Regular,
key_bytes.as_ref(),
);
let checksumming = self.verify_mode.is_enabled();
match self
.adapter
.read(&self.key_buf[..enc_len], &mut self.read_buf)
{
Ok(len) => {
let raw = &self.read_buf[..len as usize];
if checksumming {
let verify = should_verify(self.verify_mode.as_ref());
Ok(Some(unwrap_value(raw, verify)?.to_vec()))
} else {
Ok(Some(raw.to_vec()))
}
}
Err(BfTreeError::NotFound | BfTreeError::Deleted) => Ok(None),
Err(e) => Err(e),
}
}
pub fn contains_key(&self, key: &K::SelfType<'_>) -> bool {
let key_bytes = K::as_bytes(key);
let encoded_key = encode_table_key(&self.name, TableKind::Regular, key_bytes.as_ref());
self.adapter.contains_key(&encoded_key)
}
pub fn scan(&self) -> Result<BfTreeTableScan<'_>, BfTreeError> {
let prefix = table_prefix(&self.name, TableKind::Regular);
let prefix_end = table_prefix_end(&self.name, TableKind::Regular);
let prefix_len = prefix.len();
let iter = self.adapter.scan_range(&prefix, &prefix_end)?;
Ok(BfTreeTableScan { iter, prefix_len })
}
}
pub struct BfTreeRangeIter<'a, K: Key + 'static, V: Value + 'static> {
scan: BfTreeTableScan<'a>,
buf: Vec<u8>,
exclude_start: Option<Vec<u8>>,
exclude_end: Option<Vec<u8>>,
verify_mode: Arc<VerifyMode>,
_key: PhantomData<K>,
_val: PhantomData<V>,
}
impl<'a, K: Key + 'static, V: Value + 'static> BfTreeRangeIter<'a, K, V> {
fn new(
scan: BfTreeTableScan<'a>,
max_record_size: usize,
exclude_start: Option<Vec<u8>>,
exclude_end: Option<Vec<u8>>,
verify_mode: Arc<VerifyMode>,
) -> Self {
Self {
scan,
buf: vec![0u8; max_record_size * 2],
exclude_start,
exclude_end,
verify_mode,
_key: PhantomData,
_val: PhantomData,
}
}
}
impl<K: Key + 'static, V: Value + 'static> Iterator for BfTreeRangeIter<'_, K, V> {
type Item = crate::Result<(OwnedKv<K>, OwnedKv<V>)>;
fn next(&mut self) -> Option<Self::Item> {
loop {
let (key_bytes, val_bytes) = self.scan.next(&mut self.buf)?;
let key_owned = key_bytes.to_vec();
let val_owned = val_bytes.to_vec();
if let Some(ref excl) = self.exclude_start {
match key_owned.as_slice().cmp(excl.as_slice()) {
core::cmp::Ordering::Equal => {
self.exclude_start = None;
continue;
}
core::cmp::Ordering::Greater => {
self.exclude_start = None;
}
core::cmp::Ordering::Less => {
}
}
}
if self
.exclude_end
.as_ref()
.is_some_and(|excl| key_owned == *excl)
{
return None; }
let k = OwnedKv::new(key_owned);
let v = if self.verify_mode.is_enabled() {
let verify = should_verify(self.verify_mode.as_ref());
match unwrap_value(&val_owned, verify) {
Ok(data) => OwnedKv::new(data.to_vec()),
Err(e) => return Some(Err(e.into())),
}
} else {
OwnedKv::new(val_owned)
};
return Some(Ok((k, v)));
}
}
}
fn build_bf_range_scan<'a, K: Key + 'static, V: Value + 'static>(
name: &str,
adapter: &'a Arc<BfTreeAdapter>,
start: Option<&K::SelfType<'_>>,
end: Option<&K::SelfType<'_>>,
start_inclusive: bool,
end_inclusive: bool,
verify_mode: Arc<VerifyMode>,
) -> crate::Result<BfTreeRangeIter<'a, K, V>> {
let (scan_start, exclude_start) = match start {
Some(s) => {
let s_bytes = K::as_bytes(s).as_ref().to_vec();
let encoded = encode_table_key(name, TableKind::Regular, &s_bytes);
if start_inclusive {
(encoded, None)
} else {
(encoded, Some(s_bytes))
}
}
None => (table_prefix(name, TableKind::Regular), None),
};
let (scan_end, exclude_end) = match end {
Some(e) => {
let e_bytes = K::as_bytes(e).as_ref().to_vec();
let encoded = encode_table_key(name, TableKind::Regular, &e_bytes);
if end_inclusive {
(encoded, None)
} else {
(encoded, Some(e_bytes))
}
}
None => (table_prefix_end(name, TableKind::Regular), None),
};
let prefix_len = table_prefix(name, TableKind::Regular).len();
let iter = adapter
.scan_range(&scan_start, &scan_end)
.map_err(crate::StorageError::from)?;
let scan = BfTreeTableScan { iter, prefix_len };
let max_record_size = adapter.inner().config().get_cb_max_record_size();
Ok(BfTreeRangeIter::new(
scan,
max_record_size,
exclude_start,
exclude_end,
verify_mode,
))
}
#[allow(clippy::too_many_arguments)]
fn build_buffered_range_scan<'a, K: Key + 'static, V: Value + 'static>(
name: &str,
adapter: &'a Arc<BfTreeAdapter>,
buffer_mutex: &Mutex<WriteBuffer>,
start: Option<&K::SelfType<'_>>,
end: Option<&K::SelfType<'_>>,
start_inclusive: bool,
end_inclusive: bool,
verify_mode: Arc<VerifyMode>,
) -> crate::Result<BufferedScanIter<'a, K, V>> {
let (scan_start, exclude_start) = match start {
Some(s) => {
let s_bytes = K::as_bytes(s).as_ref().to_vec();
let encoded = encode_table_key(name, TableKind::Regular, &s_bytes);
if start_inclusive {
(encoded, None)
} else {
(encoded, Some(s_bytes))
}
}
None => (table_prefix(name, TableKind::Regular), None),
};
let (scan_end, exclude_end) = match end {
Some(e) => {
let e_bytes = K::as_bytes(e).as_ref().to_vec();
let encoded = encode_table_key(name, TableKind::Regular, &e_bytes);
if end_inclusive {
(encoded, None)
} else {
(encoded, Some(e_bytes))
}
}
None => (table_prefix_end(name, TableKind::Regular), None),
};
let prefix_len = table_prefix(name, TableKind::Regular).len();
let iter = adapter
.scan_range(&scan_start, &scan_end)
.map_err(crate::StorageError::from)?;
let scan = BfTreeTableScan { iter, prefix_len };
let max_record_size = adapter.inner().config().get_cb_max_record_size();
let buf = buffer_mutex.lock();
let buf_entries =
collect_buffer_entries_for_table(&buf, name, TableKind::Regular, &scan_start, &scan_end);
drop(buf);
Ok(BufferedScanIter::new(
buf_entries,
scan,
max_record_size,
exclude_start,
exclude_end,
verify_mode,
))
}
impl<K: Key + 'static, V: Value + 'static> crate::storage_traits::WriteTable<K, V>
for BfTreeTable<'_, K, V>
{
type RangeIter<'a>
= BufferedScanIter<'a, K, V>
where
Self: 'a;
fn st_get(&self, key: &K::SelfType<'_>) -> crate::Result<Option<OwnedKv<V>>> {
let key_bytes = K::as_bytes(key);
let encoded_key = encode_table_key(&self.name, TableKind::Regular, key_bytes.as_ref());
let checksumming = self.verify_mode.is_enabled();
let buffer = self.buffer.lock();
match buffer.get(&encoded_key) {
BufferLookup::Found(v) => {
return if checksumming {
let verify = should_verify(self.verify_mode.as_ref());
Ok(Some(OwnedKv::new(unwrap_value(&v, verify)?.to_vec())))
} else {
Ok(Some(OwnedKv::new(v)))
};
}
BufferLookup::Tombstone => return Ok(None),
BufferLookup::NotInBuffer => {}
}
drop(buffer);
let max_val = self.adapter.inner().config().get_cb_max_record_size();
let mut buf = vec![0u8; max_val];
match self.adapter.read(&encoded_key, &mut buf) {
Ok(len) => {
let raw = &buf[..len as usize];
if checksumming {
let verify = should_verify(self.verify_mode.as_ref());
Ok(Some(OwnedKv::new(unwrap_value(raw, verify)?.to_vec())))
} else {
Ok(Some(OwnedKv::new(raw.to_vec())))
}
}
Err(BfTreeError::NotFound | BfTreeError::Deleted) => Ok(None),
Err(e) => Err(e.into()),
}
}
fn st_insert(
&mut self,
key: &K::SelfType<'_>,
value: &V::SelfType<'_>,
) -> crate::Result<Option<OwnedKv<V>>> {
match self.insert(key, value) {
Ok(Some(bytes)) => Ok(Some(OwnedKv::new(bytes))),
Ok(None) => Ok(None),
Err(e) => Err(e.into()),
}
}
fn st_remove(&mut self, key: &K::SelfType<'_>) -> crate::Result<Option<OwnedKv<V>>> {
match self.remove(key) {
Ok(Some(bytes)) => Ok(Some(OwnedKv::new(bytes))),
Ok(None) => Ok(None),
Err(e) => Err(e.into()),
}
}
fn st_range<'a>(
&'a self,
start: Option<&K::SelfType<'_>>,
end: Option<&K::SelfType<'_>>,
start_inclusive: bool,
end_inclusive: bool,
) -> crate::Result<Self::RangeIter<'a>> {
build_buffered_range_scan::<K, V>(
&self.name,
self.adapter,
self.buffer,
start,
end,
start_inclusive,
end_inclusive,
Arc::clone(self.verify_mode),
)
}
fn st_drain_all(&mut self) -> crate::Result<u64> {
let prefix = table_prefix(&self.name, TableKind::Regular);
let prefix_end = table_prefix_end(&self.name, TableKind::Regular);
let max_record_size = self.adapter.inner().config().get_cb_max_record_size();
let mut total_count = 0u64;
loop {
let bftree_encoded_keys = {
let mut buf = vec![0u8; max_record_size * 2];
let mut keys: Vec<Vec<u8>> = Vec::new();
let mut iter = self
.adapter
.scan_range(&prefix, &prefix_end)
.map_err(crate::StorageError::from)?;
while let Ok(Some((key_len, _val_len))) = iter.next(&mut buf) {
keys.push(buf[..key_len].to_vec());
}
keys
};
let mut buffer = self.buffer.lock();
let pass_count = buffer.drain_table(&bftree_encoded_keys, &prefix, &prefix_end);
drop(buffer);
total_count = total_count.saturating_add(pass_count);
if pass_count == 0 {
break;
}
}
self.ops_count.fetch_add(total_count, Ordering::Relaxed);
if self.cdc_log.is_some() && total_count > 0 {
self.record_cdc(CdcEvent {
table_name: self.name.clone(),
op: ChangeOp::Delete,
key: Vec::new(),
new_value: None,
old_value: None,
});
}
Ok(total_count)
}
}
impl<K: Key + 'static, V: Value + 'static> crate::storage_traits::ReadTable<K, V>
for BfTreeTable<'_, K, V>
{
type RangeIter<'a>
= BufferedScanIter<'a, K, V>
where
Self: 'a;
fn st_get(&self, key: &K::SelfType<'_>) -> crate::Result<Option<OwnedKv<V>>> {
crate::storage_traits::WriteTable::st_get(self, key)
}
fn st_range<'a>(
&'a self,
start: Option<&K::SelfType<'_>>,
end: Option<&K::SelfType<'_>>,
start_inclusive: bool,
end_inclusive: bool,
) -> crate::Result<Self::RangeIter<'a>> {
crate::storage_traits::WriteTable::st_range(
self,
start,
end,
start_inclusive,
end_inclusive,
)
}
}
impl<K: Key + 'static, V: Value + 'static> crate::storage_traits::ReadTable<K, V>
for BfTreeReadOnlyTable<'_, K, V>
{
type RangeIter<'a>
= BfTreeRangeIter<'a, K, V>
where
Self: 'a;
fn st_get(&self, key: &K::SelfType<'_>) -> crate::Result<Option<OwnedKv<V>>> {
let key_bytes = K::as_bytes(key);
let encoded_key = encode_table_key(&self.name, TableKind::Regular, key_bytes.as_ref());
let checksumming = self.verify_mode.is_enabled();
let max_val = self.adapter.inner().config().get_cb_max_record_size();
let mut buf = vec![0u8; max_val];
match self.adapter.read(&encoded_key, &mut buf) {
Ok(len) => {
let raw = &buf[..len as usize];
if checksumming {
let verify = should_verify(self.verify_mode.as_ref());
Ok(Some(OwnedKv::new(unwrap_value(raw, verify)?.to_vec())))
} else {
Ok(Some(OwnedKv::new(raw.to_vec())))
}
}
Err(BfTreeError::NotFound | BfTreeError::Deleted) => Ok(None),
Err(e) => Err(e.into()),
}
}
fn st_range<'a>(
&'a self,
start: Option<&K::SelfType<'_>>,
end: Option<&K::SelfType<'_>>,
start_inclusive: bool,
end_inclusive: bool,
) -> crate::Result<Self::RangeIter<'a>> {
build_bf_range_scan::<K, V>(
&self.name,
self.adapter,
start,
end,
start_inclusive,
end_inclusive,
Arc::clone(self.verify_mode),
)
}
}
impl crate::storage_traits::StorageWrite for super::database::BfTreeDatabaseWriteTxn {
type Table<'txn, K: Key + 'static, V: Value + 'static>
= BfTreeTable<'txn, K, V>
where
Self: 'txn;
fn open_storage_table<K: Key + 'static, V: Value + 'static>(
&self,
definition: crate::TableDefinition<K, V>,
) -> crate::Result<Self::Table<'_, K, V>> {
Ok(self.open_table(definition)?)
}
}
impl crate::storage_traits::StorageRead for super::database::BfTreeDatabaseReadTxn {
type Table<'txn, K: Key + 'static, V: Value + 'static>
= BfTreeReadOnlyTable<'txn, K, V>
where
Self: 'txn;
fn open_storage_table<K: Key + 'static, V: Value + 'static>(
&self,
definition: crate::TableDefinition<K, V>,
) -> crate::Result<Self::Table<'_, K, V>> {
Ok(self.open_table(definition)?)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::TableDefinition;
use crate::bf_tree_store::config::BfTreeConfig;
use crate::bf_tree_store::database::BfTreeDatabase;
const ITEMS: TableDefinition<&str, u64> = TableDefinition::new("items");
#[test]
fn open_table_insert_get() {
let db = BfTreeDatabase::create(BfTreeConfig::new_memory(4)).unwrap();
let wtxn = db.begin_write();
let mut table = wtxn.open_table(ITEMS).unwrap();
let prev = table.insert(&"apple", &10u64).unwrap();
assert!(prev.is_none());
let val = table.get(&"apple").unwrap().unwrap();
assert_eq!(u64::from_le_bytes(val.as_slice().try_into().unwrap()), 10);
drop(table);
wtxn.commit().unwrap();
}
#[test]
fn open_table_insert_returns_previous() {
let db = BfTreeDatabase::create(BfTreeConfig::new_memory(4)).unwrap();
let wtxn = db.begin_write();
let mut table = wtxn.open_table(ITEMS).unwrap();
table.insert(&"key", &1u64).unwrap();
let prev = table.insert(&"key", &2u64).unwrap();
assert!(prev.is_some());
let prev_val = u64::from_le_bytes(prev.unwrap().as_slice().try_into().unwrap());
assert_eq!(prev_val, 1);
drop(table);
wtxn.commit().unwrap();
}
#[test]
fn open_table_remove() {
let db = BfTreeDatabase::create(BfTreeConfig::new_memory(4)).unwrap();
let wtxn = db.begin_write();
let mut table = wtxn.open_table(ITEMS).unwrap();
table.insert(&"temp", &99u64).unwrap();
let removed = table.remove(&"temp").unwrap();
assert!(removed.is_some());
assert!(table.get(&"temp").unwrap().is_none());
let removed2 = table.remove(&"nope").unwrap();
assert!(removed2.is_none());
drop(table);
wtxn.commit().unwrap();
}
#[test]
fn read_only_table() {
let db = BfTreeDatabase::create(BfTreeConfig::new_memory(4)).unwrap();
let wtxn = db.begin_write();
let mut table = wtxn.open_table(ITEMS).unwrap();
table.insert(&"x", &42u64).unwrap();
table.insert(&"y", &43u64).unwrap();
drop(table);
wtxn.commit().unwrap();
let rtxn = db.begin_read();
let mut ro_table = rtxn.open_table(ITEMS).unwrap();
assert!(ro_table.contains_key(&"x"));
assert!(!ro_table.contains_key(&"z"));
let val = ro_table.get(&"x").unwrap().unwrap();
assert_eq!(u64::from_le_bytes(val.as_slice().try_into().unwrap()), 42);
}
#[test]
fn table_scan_via_handle() {
let db = BfTreeDatabase::create(BfTreeConfig::new_memory(4)).unwrap();
let wtxn = db.begin_write();
let mut table = wtxn.open_table(ITEMS).unwrap();
table.insert(&"a", &1u64).unwrap();
table.insert(&"b", &2u64).unwrap();
table.insert(&"c", &3u64).unwrap();
drop(table);
wtxn.commit().unwrap();
let rtxn = db.begin_read();
let ro_table = rtxn.open_table(ITEMS).unwrap();
let mut scan = ro_table.scan().unwrap();
let mut buf = vec![0u8; 4096];
let mut count = 0;
while scan.next(&mut buf).is_some() {
count += 1;
}
assert_eq!(count, 3);
}
use crate::storage_traits::{ReadTable, StorageRead, StorageWrite, WriteTable};
#[test]
fn st_write_table_get_insert_remove() {
let db = BfTreeDatabase::create(BfTreeConfig::new_memory(4)).unwrap();
let wtxn = db.begin_write();
let mut table = wtxn.open_storage_table(ITEMS).unwrap();
let prev = WriteTable::st_insert(&mut table, &"key1", &100u64).unwrap();
assert!(prev.is_none());
let val = WriteTable::st_get(&table, &"key1").unwrap().unwrap();
assert_eq!(val.value(), 100u64);
let prev = WriteTable::st_insert(&mut table, &"key1", &200u64).unwrap();
assert!(prev.is_some());
assert_eq!(prev.unwrap().value(), 100u64);
let removed = WriteTable::st_remove(&mut table, &"key1").unwrap();
assert!(removed.is_some());
assert_eq!(removed.unwrap().value(), 200u64);
let removed2 = WriteTable::st_remove(&mut table, &"missing").unwrap();
assert!(removed2.is_none());
drop(table);
wtxn.commit().unwrap();
}
#[test]
fn st_range_scan_full() {
let db = BfTreeDatabase::create(BfTreeConfig::new_memory(4)).unwrap();
let wtxn = db.begin_write();
let mut table = wtxn.open_storage_table(ITEMS).unwrap();
WriteTable::st_insert(&mut table, &"a", &1u64).unwrap();
WriteTable::st_insert(&mut table, &"b", &2u64).unwrap();
WriteTable::st_insert(&mut table, &"c", &3u64).unwrap();
WriteTable::st_insert(&mut table, &"d", &4u64).unwrap();
let iter = WriteTable::st_range(&table, None, None, true, true).unwrap();
let entries: Vec<_> = iter.collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(entries.len(), 4);
let keys: Vec<&str> = entries.iter().map(|(k, _)| k.value()).collect();
assert_eq!(keys, vec!["a", "b", "c", "d"]);
drop(table);
wtxn.commit().unwrap();
}
#[test]
fn st_range_scan_bounded() {
let db = BfTreeDatabase::create(BfTreeConfig::new_memory(4)).unwrap();
let wtxn = db.begin_write();
let mut table = wtxn.open_storage_table(ITEMS).unwrap();
for (k, v) in [("a", 1u64), ("b", 2), ("c", 3), ("d", 4), ("e", 5)] {
WriteTable::st_insert(&mut table, &k, &v).unwrap();
}
let s = "b";
let e = "d";
let s_ref: &str = s;
let e_ref: &str = e;
let iter = WriteTable::st_range(&table, Some(&s_ref), Some(&e_ref), true, false).unwrap();
let entries: Vec<_> = iter.collect::<Result<Vec<_>, _>>().unwrap();
let keys: Vec<&str> = entries.iter().map(|(k, _)| k.value()).collect();
assert_eq!(keys, vec!["b", "c"]);
let iter = WriteTable::st_range(&table, Some(&s_ref), Some(&e_ref), true, true).unwrap();
let entries: Vec<_> = iter.collect::<Result<Vec<_>, _>>().unwrap();
let keys: Vec<&str> = entries.iter().map(|(k, _)| k.value()).collect();
assert_eq!(keys, vec!["b", "c", "d"]);
drop(table);
wtxn.commit().unwrap();
}
#[test]
fn st_drain_all() {
let db = BfTreeDatabase::create(BfTreeConfig::new_memory(4)).unwrap();
let wtxn = db.begin_write();
let mut table = wtxn.open_storage_table(ITEMS).unwrap();
WriteTable::st_insert(&mut table, &"x", &10u64).unwrap();
WriteTable::st_insert(&mut table, &"y", &20u64).unwrap();
WriteTable::st_insert(&mut table, &"z", &30u64).unwrap();
let count = WriteTable::st_drain_all(&mut table).unwrap();
assert_eq!(count, 3);
assert!(WriteTable::st_get(&table, &"x").unwrap().is_none());
assert!(WriteTable::st_get(&table, &"y").unwrap().is_none());
drop(table);
wtxn.commit().unwrap();
}
#[test]
fn st_read_table_via_read_txn() {
let db = BfTreeDatabase::create(BfTreeConfig::new_memory(4)).unwrap();
let wtxn = db.begin_write();
let mut table = wtxn.open_storage_table(ITEMS).unwrap();
WriteTable::st_insert(&mut table, &"r1", &100u64).unwrap();
WriteTable::st_insert(&mut table, &"r2", &200u64).unwrap();
drop(table);
wtxn.commit().unwrap();
let rtxn = db.begin_read();
let ro_table = StorageRead::open_storage_table(&rtxn, ITEMS).unwrap();
let v1 = ReadTable::st_get(&ro_table, &"r1").unwrap().unwrap();
assert_eq!(v1.value(), 100u64);
let v2 = ReadTable::st_get(&ro_table, &"r2").unwrap().unwrap();
assert_eq!(v2.value(), 200u64);
assert!(ReadTable::st_get(&ro_table, &"missing").unwrap().is_none());
let iter = ReadTable::st_range(&ro_table, None, None, true, true).unwrap();
let entries: Vec<_> = iter.collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(entries.len(), 2);
}
#[test]
fn st_storage_write_open_table() {
let db = BfTreeDatabase::create(BfTreeConfig::new_memory(4)).unwrap();
let wtxn = db.begin_write();
let mut table = StorageWrite::open_storage_table(&wtxn, ITEMS).unwrap();
WriteTable::st_insert(&mut table, &"trait_key", &42u64).unwrap();
let val = WriteTable::st_get(&table, &"trait_key").unwrap().unwrap();
assert_eq!(val.value(), 42u64);
drop(table);
wtxn.commit().unwrap();
}
#[test]
fn merge_increments_existing_value() {
let db = BfTreeDatabase::create(BfTreeConfig::new_memory(4)).unwrap();
let op = crate::merge::NumericAdd;
let wtxn = db.begin_write();
let mut table = wtxn.open_table(ITEMS).unwrap();
table.insert(&"counter", &10u64).unwrap();
drop(table);
wtxn.commit().unwrap();
let wtxn = db.begin_write();
let mut table = wtxn.open_table(ITEMS).unwrap();
table.merge(&"counter", &5u64.to_le_bytes(), &op).unwrap();
drop(table);
wtxn.commit().unwrap();
let rtxn = db.begin_read();
let mut ro_table = rtxn.open_table(ITEMS).unwrap();
let val = ro_table.get(&"counter").unwrap().unwrap();
let result = u64::from_le_bytes(val.as_slice().try_into().unwrap());
assert_eq!(result, 15);
}
#[test]
fn merge_on_nonexistent_key_uses_operand() {
let db = BfTreeDatabase::create(BfTreeConfig::new_memory(4)).unwrap();
let op = crate::merge::NumericAdd;
let wtxn = db.begin_write();
let mut table = wtxn.open_table(ITEMS).unwrap();
table.merge(&"fresh", &42u64.to_le_bytes(), &op).unwrap();
drop(table);
wtxn.commit().unwrap();
let rtxn = db.begin_read();
let mut ro_table = rtxn.open_table(ITEMS).unwrap();
let val = ro_table.get(&"fresh").unwrap().unwrap();
let result = u64::from_le_bytes(val.as_slice().try_into().unwrap());
assert_eq!(result, 42);
}
#[test]
fn merge_returning_none_deletes_key() {
use alloc::vec::Vec;
let db = BfTreeDatabase::create(BfTreeConfig::new_memory(4)).unwrap();
let delete_op = crate::merge::merge_fn(
|_key: &[u8], _existing: Option<&[u8]>, _operand: &[u8]| -> Option<Vec<u8>> { None },
);
let wtxn = db.begin_write();
let mut table = wtxn.open_table(ITEMS).unwrap();
table.insert(&"doomed", &99u64).unwrap();
drop(table);
wtxn.commit().unwrap();
let wtxn = db.begin_write();
let mut table = wtxn.open_table(ITEMS).unwrap();
table.merge(&"doomed", &[0u8], &delete_op).unwrap();
drop(table);
wtxn.commit().unwrap();
let rtxn = db.begin_read();
let mut ro_table = rtxn.open_table(ITEMS).unwrap();
assert!(ro_table.get(&"doomed").unwrap().is_none());
}
}