use alloc::sync::Arc;
use super::adapter::BfTreeAdapter;
use super::error::BfTreeError;
/// Write-side transaction handle over a shared [`BfTreeAdapter`].
///
/// Inserts and deletes are forwarded to the adapter as soon as they are
/// called; the handle itself only counts them and remembers whether one of
/// the commit methods has been invoked. Dropping the handle with counted
/// operations and no commit call prints a warning in debug builds.
pub struct BfTreeWriteTxn {
/// Shared adapter that every operation on this transaction is forwarded to.
adapter: Arc<BfTreeAdapter>,
/// Number of successful `insert` calls plus all `delete` calls made so far.
ops_count: u64,
/// Set to `true` by `commit` / `commit_with_snapshot`; consulted on drop.
committed: bool,
}
impl BfTreeWriteTxn {
    /// Builds a write transaction over `adapter`, starting with a zero
    /// operation count and the committed flag cleared.
    // NOTE(review): the `dead_code` allowance suggests some build
    // configurations never call this constructor — confirm before removing it.
    #[allow(dead_code)]
    pub(crate) fn new(adapter: Arc<BfTreeAdapter>) -> Self {
        Self {
            committed: false,
            ops_count: 0,
            adapter,
        }
    }

    /// Forwards an insert of `key` / `value` to the adapter.
    ///
    /// The operation counter is bumped only when the adapter reports success.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the adapter's `insert` returns.
    pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<(), BfTreeError> {
        let tree: &BfTreeAdapter = &self.adapter;
        tree.insert(key, value)?;
        self.ops_count += 1;
        Ok(())
    }

    /// Forwards a delete of `key` to the adapter and counts it as one
    /// operation. The adapter call yields no result that is checked here, so
    /// the counter is always incremented.
    pub fn delete(&mut self, key: &[u8]) {
        let tree: &BfTreeAdapter = &self.adapter;
        tree.delete(key);
        self.ops_count += 1;
    }

    /// Reads the value stored under `key` into `out_buffer` via the adapter.
    ///
    /// The `u32` on success is used by this file's tests as the number of
    /// leading bytes of `out_buffer` that hold the value.
    ///
    /// # Errors
    ///
    /// Returns the adapter's error unchanged.
    pub fn read(&self, key: &[u8], out_buffer: &mut [u8]) -> Result<u32, BfTreeError> {
        let tree: &BfTreeAdapter = &self.adapter;
        tree.read(key, out_buffer)
    }

    /// Asks the adapter whether `key` is present.
    pub fn contains_key(&self, key: &[u8]) -> bool {
        let tree: &BfTreeAdapter = &self.adapter;
        tree.contains_key(key)
    }

    /// Consumes the transaction and marks it committed.
    ///
    /// When the adapter's configuration reports a memory backend nothing else
    /// happens. Otherwise the adapter's `snapshot` is invoked and its returned
    /// value is discarded.
    ///
    /// The committed flag is set before the snapshot call, so a failed
    /// snapshot does not additionally trigger the drop-time warning.
    ///
    /// # Errors
    ///
    /// Propagates a failure from the adapter's `snapshot`.
    pub fn commit(mut self) -> Result<(), BfTreeError> {
        self.committed = true;

        let memory_only = self.adapter.inner().config().is_memory_backend();
        if memory_only {
            return Ok(());
        }

        self.adapter.snapshot()?;
        Ok(())
    }

    /// Consumes the transaction, marks it committed, and always invokes the
    /// adapter's `snapshot`, handing back the path it produced.
    ///
    /// # Errors
    ///
    /// Propagates a failure from the adapter's `snapshot`.
    pub fn commit_with_snapshot(mut self) -> Result<std::path::PathBuf, BfTreeError> {
        self.committed = true;
        let snapshot_path = self.adapter.snapshot()?;
        Ok(snapshot_path)
    }

    /// Number of operations counted so far by `insert` and `delete`.
    pub fn ops_count(&self) -> u64 {
        self.ops_count
    }

    /// Borrows the adapter this transaction operates on.
    pub fn adapter(&self) -> &BfTreeAdapter {
        let tree: &BfTreeAdapter = &self.adapter;
        tree
    }
}
impl Drop for BfTreeWriteTxn {
    /// Emits a diagnostic on stderr, in debug builds only, when the
    /// transaction is dropped with counted operations but without either
    /// commit method having been called. Release builds do nothing here.
    fn drop(&mut self) {
        #[cfg(debug_assertions)]
        {
            let unflushed = self.ops_count;
            if unflushed > 0 && !self.committed {
                eprintln!(
                    "bf-tree: BfTreeWriteTxn dropped without commit ({} ops applied but not durability-flushed)",
                    unflushed
                );
            }
        }
    }
}
/// Read-side transaction handle over a shared [`BfTreeAdapter`].
///
/// Every method forwards directly to the adapter; the handle keeps no state
/// of its own beyond the shared adapter reference.
pub struct BfTreeReadTxn {
/// Shared adapter that all reads and scans are forwarded to.
adapter: Arc<BfTreeAdapter>,
}
impl BfTreeReadTxn {
    /// Builds a read transaction over `adapter`.
    // NOTE(review): the `dead_code` allowance suggests some build
    // configurations never call this constructor — confirm before removing it.
    #[allow(dead_code)]
    pub(crate) fn new(adapter: Arc<BfTreeAdapter>) -> Self {
        Self { adapter }
    }

    /// Reads the value stored under `key` into `out_buffer` via the adapter.
    ///
    /// The `u32` on success is used by this file's tests as the number of
    /// leading bytes of `out_buffer` that hold the value.
    ///
    /// # Errors
    ///
    /// Returns the adapter's error unchanged.
    pub fn read(&self, key: &[u8], out_buffer: &mut [u8]) -> Result<u32, BfTreeError> {
        let tree: &BfTreeAdapter = &self.adapter;
        tree.read(key, out_buffer)
    }

    /// Asks the adapter whether `key` is present.
    pub fn contains_key(&self, key: &[u8]) -> bool {
        let tree: &BfTreeAdapter = &self.adapter;
        tree.contains_key(key)
    }

    /// Starts an adapter scan at `start_key`.
    ///
    /// `count` is passed through untouched; presumably it caps the number of
    /// entries the iterator yields — verify against the adapter.
    ///
    /// # Errors
    ///
    /// Returns the adapter's error unchanged.
    pub fn scan_from(
        &self,
        start_key: &[u8],
        count: usize,
    ) -> Result<crate::bf_tree::ScanIter<'_, '_>, BfTreeError> {
        let tree: &BfTreeAdapter = &self.adapter;
        tree.scan_from(start_key, count)
    }

    /// Starts an adapter scan bounded by `start_key` and `end_key`.
    ///
    /// Whether either bound is inclusive is decided by the adapter and is not
    /// visible here.
    ///
    /// # Errors
    ///
    /// Returns the adapter's error unchanged.
    pub fn scan_range(
        &self,
        start_key: &[u8],
        end_key: &[u8],
    ) -> Result<crate::bf_tree::ScanIter<'_, '_>, BfTreeError> {
        let tree: &BfTreeAdapter = &self.adapter;
        tree.scan_range(start_key, end_key)
    }

    /// Borrows the adapter this transaction reads from.
    pub fn adapter(&self) -> &BfTreeAdapter {
        let tree: &BfTreeAdapter = &self.adapter;
        tree
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bf_tree_store::config::BfTreeConfig;

    /// Two inserts are counted, committed, and then readable through a
    /// separate read transaction.
    #[test]
    fn write_txn_basic() {
        let adapter = Arc::new(BfTreeAdapter::open(BfTreeConfig::new_memory(4)).unwrap());

        let mut writer = BfTreeWriteTxn::new(Arc::clone(&adapter));
        writer.insert(b"key1", b"val1").unwrap();
        writer.insert(b"key2", b"val2").unwrap();
        assert_eq!(writer.ops_count(), 2);
        writer.commit().unwrap();

        let reader = BfTreeReadTxn::new(adapter);
        let mut scratch = [0u8; 64];
        let len = reader.read(b"key1", &mut scratch).unwrap();
        assert_eq!(&scratch[..len as usize], b"val1");
    }

    /// Four threads each write fifty keys through their own write
    /// transaction; afterwards every key is readable with its expected value.
    #[test]
    fn concurrent_write_txns() {
        use std::thread;

        let adapter = Arc::new(BfTreeAdapter::open(BfTreeConfig::new_memory(8)).unwrap());

        // Spawn every worker before joining any of them.
        let mut workers = Vec::with_capacity(4);
        for t in 0..4 {
            let shared = Arc::clone(&adapter);
            workers.push(thread::spawn(move || {
                let mut txn = BfTreeWriteTxn::new(shared);
                for i in 0..50 {
                    let key = format!("t{t}_k{i}");
                    let val = format!("t{t}_v{i}");
                    txn.insert(key.as_bytes(), val.as_bytes()).unwrap();
                }
                txn.commit().unwrap();
            }));
        }
        for worker in workers {
            worker.join().unwrap();
        }

        let reader = BfTreeReadTxn::new(adapter);
        let mut scratch = [0u8; 256];
        for t in 0..4 {
            for i in 0..50 {
                let key = format!("t{t}_k{i}");
                let expected = format!("t{t}_v{i}");
                let len = reader.read(key.as_bytes(), &mut scratch).unwrap();
                assert_eq!(&scratch[..len as usize], expected.as_bytes());
            }
        }
    }

    /// An insert is observable through a read transaction before the write
    /// transaction commits.
    #[test]
    fn write_visible_to_concurrent_read() {
        let adapter = Arc::new(BfTreeAdapter::open(BfTreeConfig::new_memory(4)).unwrap());

        let mut writer = BfTreeWriteTxn::new(Arc::clone(&adapter));
        writer.insert(b"visible", b"yes").unwrap();

        let reader = BfTreeReadTxn::new(Arc::clone(&adapter));
        let mut scratch = [0u8; 64];
        let len = reader.read(b"visible", &mut scratch).unwrap();
        assert_eq!(&scratch[..len as usize], b"yes");

        writer.commit().unwrap();
    }

    /// A delete is observable through a read transaction before the write
    /// transaction commits: the read reports `BfTreeError::Deleted`.
    #[test]
    fn delete_visible_immediately() {
        let adapter = Arc::new(BfTreeAdapter::open(BfTreeConfig::new_memory(4)).unwrap());

        let mut writer = BfTreeWriteTxn::new(Arc::clone(&adapter));
        writer.insert(b"gone", b"soon").unwrap();
        writer.delete(b"gone");

        let reader = BfTreeReadTxn::new(adapter);
        let mut scratch = [0u8; 64];
        let outcome = reader.read(b"gone", &mut scratch);
        assert!(matches!(outcome, Err(BfTreeError::Deleted)));

        writer.commit().unwrap();
    }

    /// Three committed keys are all yielded by a scan started at the first key.
    #[test]
    fn scan_via_read_txn() {
        let adapter = Arc::new(BfTreeAdapter::open(BfTreeConfig::new_memory(4)).unwrap());

        let mut writer = BfTreeWriteTxn::new(Arc::clone(&adapter));
        for (key, value) in [(b"aaa", b"1"), (b"bbb", b"2"), (b"ccc", b"3")] {
            writer.insert(key, value).unwrap();
        }
        writer.commit().unwrap();

        let reader = BfTreeReadTxn::new(adapter);
        let mut cursor = reader.scan_from(b"aaa", 10).unwrap();
        let mut scratch = [0u8; 256];
        let mut seen = 0;
        // Stop at the first result that is not `Ok(Some(_))`.
        while let Ok(Some(_)) = cursor.next(&mut scratch) {
            seen += 1;
        }
        assert_eq!(seen, 3);
    }
}