use std::cell::UnsafeCell;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU32, Ordering};
/// Default capacity, in bytes, that [`TxnArena::new`] passes to
/// [`TxnArena::with_capacity`] (64 KiB).
const DEFAULT_ARENA_CAPACITY: usize = 1 << 16;
/// Presumably the intended size limit for byte strings stored inline in a
/// [`BytesRef`]. Inline storage is not implemented yet: nothing reads this
/// constant, which is why `dead_code` is allowed on it.
#[allow(dead_code)]
const INLINE_MAX_SIZE: usize = 24;
/// Compact, copyable handle to a byte string stored in a [`TxnArena`].
///
/// Besides the location (offset + length) it carries a precomputed 64-bit
/// content hash, which is what equality, ordering and hashing of `BytesRef`
/// are based on.
#[derive(Clone, Copy, Debug)]
pub struct BytesRef {
    /// Arena offset; bit 31 is reserved (`INLINE_FLAG`) and masked off by `offset()`.
    offset_or_inline: u32,
    /// Length of the referenced bytes.
    len: u32,
    /// Content hash supplied at construction.
    hash: u64,
}
impl BytesRef {
    /// High bit of `offset_or_inline`. Presumably reserved to mark inline
    /// storage; no constructor in this type ever sets it.
    const INLINE_FLAG: u32 = 1 << 31;

    /// Builds a handle to `len` bytes at arena `offset`, tagged with `hash`.
    ///
    /// In debug builds, panics if `offset` has the reserved high bit set.
    #[inline]
    pub fn from_arena(offset: u32, len: u32, hash: u64) -> Self {
        debug_assert!((offset & Self::INLINE_FLAG) == 0, "offset too large");
        Self {
            offset_or_inline: offset,
            len,
            hash,
        }
    }

    /// The empty handle: offset 0, length 0, hash 0.
    #[inline]
    pub fn null() -> Self {
        Self::from_arena(0, 0, 0)
    }

    /// `true` when the handle covers zero bytes.
    ///
    /// This is the same test as [`BytesRef::is_empty`], so it does not
    /// distinguish `null()` from a reference to an empty allocation.
    #[inline]
    pub fn is_null(&self) -> bool {
        self.is_empty()
    }

    /// Number of referenced bytes.
    #[inline]
    pub fn len(&self) -> usize {
        self.len as usize
    }

    /// `true` when the handle covers zero bytes.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// The stored 64-bit content hash.
    #[inline]
    pub fn hash(&self) -> u64 {
        self.hash
    }

    /// 128-bit value derived only from the stored hash and length.
    ///
    /// The high half is the hash. The low half is the length XOR the hash
    /// rotated right by 32 bits.
    #[inline]
    pub fn fingerprint(&self) -> u128 {
        let high = u128::from(self.hash);
        let low = u128::from(u64::from(self.len) ^ self.hash.rotate_right(32));
        (high << 64) | low
    }

    /// Arena offset with the reserved flag bit cleared.
    #[inline]
    pub fn offset(&self) -> u32 {
        self.offset_or_inline & !Self::INLINE_FLAG
    }

    /// Borrows the referenced bytes from `arena`.
    #[inline]
    pub fn resolve<'a>(&self, arena: &'a TxnArena) -> &'a [u8] {
        let (start, count) = (self.offset(), self.len());
        arena.get_bytes(start, count)
    }
}
/// Two handles are equal when their stored hash and length match. The arena
/// offset is ignored, so identical bytes allocated twice compare equal.
///
/// NOTE(review): this compares hashes, not bytes. Two different byte strings
/// with the same length and colliding 64-bit hashes compare equal. Resolve
/// against the arena when exact equality matters. Also, `BytesRef::null()`
/// (hash 0) is not equal to a handle for an empty allocation made by
/// `TxnArena`, whose FNV-1a hash of zero bytes is the non-zero offset basis.
impl PartialEq for BytesRef {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        (self.len, self.hash) == (other.len, other.hash)
    }
}
impl Eq for BytesRef {}
/// Feeds only the stored 64-bit hash to the hasher. This agrees with
/// `PartialEq`: equal handles always carry the same stored hash.
impl Hash for BytesRef {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        // `u64`'s `Hash` impl forwards to `Hasher::write_u64`.
        self.hash.hash(state);
    }
}
/// Total order consistent with `Ord::cmp` below.
impl PartialOrd for BytesRef {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Orders by stored hash first, then by length. This is the same pair of
/// fields `PartialEq` compares, so `Equal` coincides with `==`. The order is
/// therefore arbitrary with respect to the byte contents.
impl Ord for BytesRef {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        (self.hash, self.len).cmp(&(other.hash, other.len))
    }
}
/// 128-bit fingerprint of a key, taken from the first 16 bytes of its BLAKE3
/// digest.
///
/// Digest bytes `0..8` (read little-endian) form the high 64 bits, and bytes
/// `8..16` (little-endian) form the low 64 bits.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct KeyFingerprint(pub u128);
impl KeyFingerprint {
    /// Fingerprints `key` by hashing it with BLAKE3.
    #[inline]
    pub fn from_bytes(key: &[u8]) -> Self {
        let digest = blake3::hash(key);
        let digest = digest.as_bytes();
        // Little-endian u64 starting at byte `at` of the digest.
        let word_at = |at: usize| {
            let mut word = [0u8; 8];
            word.copy_from_slice(&digest[at..at + 8]);
            u64::from_le_bytes(word)
        };
        let high = u128::from(word_at(0));
        let low = u128::from(word_at(8));
        Self((high << 64) | low)
    }

    /// Fingerprints the bytes `bytes_ref` points at inside `arena`.
    #[inline]
    pub fn from_bytes_ref(bytes_ref: &BytesRef, arena: &TxnArena) -> Self {
        let key = bytes_ref.resolve(arena);
        Self::from_bytes(key)
    }

    /// The raw 128-bit fingerprint.
    #[inline]
    pub fn value(&self) -> u128 {
        let Self(raw) = *self;
        raw
    }
}
impl From<&[u8]> for KeyFingerprint {
fn from(bytes: &[u8]) -> Self {
Self::from_bytes(bytes)
}
}
/// One contiguous slab of arena storage.
///
/// `buf` is created with a fixed capacity and is only ever appended to within
/// that capacity. Its heap buffer is therefore never reallocated, which is what
/// keeps slices returned by [`TxnArena::get_bytes`] valid across later allocations.
struct ArenaChunk {
    /// Logical arena offset of `buf[0]`.
    base: u32,
    buf: Vec<u8>,
}

/// Interior state of a [`TxnArena`], reachable only through its `UnsafeCell`.
struct ArenaState {
    /// `chunks[..active]` hold live bytes, ordered by strictly increasing
    /// `base`. `chunks[active..]` are empty spares: either the pre-reserved
    /// first chunk, or chunks recycled by `reset`.
    chunks: Vec<ArenaChunk>,
    active: usize,
}

/// Bump allocator that copies a transaction's keys and values into storage it
/// owns, and hands out compact [`BytesRef`] handles to them.
///
/// Allocation only needs `&self`. Storage is split into chunks that are never
/// reallocated, so slices obtained from [`TxnArena::get_bytes`] (or
/// `BytesRef::resolve`) stay valid while more data is allocated. That includes
/// copying such a slice back into the same arena.
///
/// The type is `Send` but not `Sync` (it contains an `UnsafeCell`), so it is
/// used from one thread at a time.
pub struct TxnArena {
    txn_id: u64,
    state: UnsafeCell<ArenaState>,
    /// Minimum capacity, in bytes, of each newly created chunk.
    chunk_capacity: usize,
    /// Logical end of the allocated bytes: the sum of all allocation lengths
    /// since construction or the last `reset`.
    offset: AtomicU32,
    key_count: AtomicU32,
    value_count: AtomicU32,
}

// `TxnArena` is `Send` automatically because all of its fields are. This fails
// to compile if a future field breaks that, instead of an `unsafe impl Send`
// silently vouching for it.
const _: fn() = || {
    fn assert_send<T: Send>() {}
    assert_send::<TxnArena>();
};

impl TxnArena {
    /// Upper bound on the logical offset. `BytesRef` reserves bit 31 of its
    /// offset field (`INLINE_FLAG`), so every allocation must start below 2^31.
    const MAX_BYTES: u32 = 0x8000_0000;

    /// Creates an arena for `txn_id` that reserves `DEFAULT_ARENA_CAPACITY`
    /// bytes up front.
    #[inline]
    pub fn new(txn_id: u64) -> Self {
        Self::with_capacity(txn_id, DEFAULT_ARENA_CAPACITY)
    }

    /// Creates an arena for `txn_id` that reserves `capacity` bytes up front.
    ///
    /// `capacity` is also the minimum size of every later chunk. With a
    /// capacity of 0, nothing is reserved and chunks are sized by the
    /// allocations that need them.
    pub fn with_capacity(txn_id: u64, capacity: usize) -> Self {
        let mut chunks = Vec::new();
        if capacity > 0 {
            // Pre-reserve the first chunk as a spare so early allocations
            // don't hit the allocator.
            chunks.push(ArenaChunk {
                base: 0,
                buf: Vec::with_capacity(capacity),
            });
        }
        Self {
            txn_id,
            state: UnsafeCell::new(ArenaState { chunks, active: 0 }),
            chunk_capacity: capacity,
            offset: AtomicU32::new(0),
            key_count: AtomicU32::new(0),
            value_count: AtomicU32::new(0),
        }
    }

    /// Transaction id this arena was created for.
    #[inline]
    pub fn txn_id(&self) -> u64 {
        self.txn_id
    }

    /// Copies `key` into the arena, bumps the key counter and returns its
    /// handle. The handle carries the key's FNV-1a hash.
    ///
    /// # Panics
    /// Panics if the arena would grow past 2 GiB of logical offsets.
    #[inline]
    pub fn alloc_key(&self, key: &[u8]) -> BytesRef {
        let hash = Self::compute_hash(key);
        let (offset, len) = self.alloc_raw(key);
        self.key_count.fetch_add(1, Ordering::Relaxed);
        // `alloc_raw` guarantees `len` fits in a u32.
        BytesRef::from_arena(offset, len as u32, hash)
    }

    /// Copies `value` into the arena, bumps the value counter and returns its
    /// handle. The handle carries the value's FNV-1a hash.
    ///
    /// # Panics
    /// Panics if the arena would grow past 2 GiB of logical offsets.
    #[inline]
    pub fn alloc_value(&self, value: &[u8]) -> BytesRef {
        let hash = Self::compute_hash(value);
        let (offset, len) = self.alloc_raw(value);
        self.value_count.fetch_add(1, Ordering::Relaxed);
        // `alloc_raw` guarantees `len` fits in a u32.
        BytesRef::from_arena(offset, len as u32, hash)
    }

    /// Allocates `key` and then `value`, returning both handles.
    #[inline]
    pub fn alloc_kv(&self, key: &[u8], value: &[u8]) -> (BytesRef, BytesRef) {
        (self.alloc_key(key), self.alloc_value(value))
    }

    /// Appends `data` to the arena and returns `(logical offset, length)`.
    ///
    /// Empty input allocates nothing and yields `(0, 0)`.
    fn alloc_raw(&self, data: &[u8]) -> (u32, usize) {
        let len = data.len();
        if len == 0 {
            return (0, 0);
        }
        let start = self.offset.load(Ordering::Relaxed);
        // Checked instead of the wrapping `fetch_add(len as u32)`: overflow
        // would otherwise reuse offsets of live data or set BytesRef's flag bit.
        let end = u32::try_from(len)
            .ok()
            .and_then(|len32| start.checked_add(len32))
            .filter(|&end| end <= Self::MAX_BYTES)
            .expect("TxnArena exceeded its 2 GiB offset space");

        // SAFETY: `TxnArena` is !Sync, so only this thread can reach `state`.
        // No other reference into `state` is in use during this call, because
        // nothing below calls back into the arena. Slices handed out by
        // `get_bytes` point into chunk heap buffers, and so may `data` itself.
        // This code never frees or reallocates those buffers. It only appends
        // within a chunk's spare capacity (past every byte already handed out,
        // except after `reset`, see its NOTE) and moves chunk headers around.
        let state = unsafe { &mut *self.state.get() };
        let fits = state.active > 0 && {
            let current = &state.chunks[state.active - 1].buf;
            current.capacity() - current.len() >= len
        };
        if !fits {
            Self::open_chunk(state, start, len, self.chunk_capacity);
        }
        // Never reallocates: the active chunk has at least `len` spare capacity.
        state.chunks[state.active - 1].buf.extend_from_slice(data);
        self.offset.store(end, Ordering::Relaxed);
        (start, len)
    }

    /// Makes `state.chunks[state.active]` an empty chunk with room for at
    /// least `len` bytes, starting at logical offset `base`, and marks it
    /// active. Reuses the first large-enough spare, or inserts a new chunk.
    fn open_chunk(state: &mut ArenaState, base: u32, len: usize, min_capacity: usize) {
        let next = state.active;
        let spare = state.chunks[next..]
            .iter()
            .position(|chunk| chunk.buf.capacity() >= len);
        match spare {
            Some(rel) => state.chunks.swap(next, next + rel),
            None => {
                // Each new chunk is at least as large as everything allocated
                // so far, so the chunk count grows only logarithmically.
                let capacity = len.max(min_capacity).max(base as usize);
                state.chunks.insert(
                    next,
                    ArenaChunk {
                        base,
                        buf: Vec::with_capacity(capacity),
                    },
                );
            }
        }
        state.chunks[next].base = base;
        state.active = next + 1;
    }

    /// Borrows the `len` bytes stored at logical `offset`.
    ///
    /// A zero `len` always yields an empty slice.
    ///
    /// # Panics
    /// Panics if nothing has been written at `offset` since construction or
    /// the last reset, or if the range runs past the bytes written in the
    /// chunk holding `offset`.
    #[inline]
    pub fn get_bytes(&self, offset: u32, len: usize) -> &[u8] {
        if len == 0 {
            return &[];
        }
        // SAFETY: shared access on the owning thread. No `&mut` into `state` is
        // in use while this runs, because the arena's methods never call out
        // while holding one.
        let state = unsafe { &*self.state.get() };
        let live = &state.chunks[..state.active];
        let index = live
            .partition_point(|chunk| chunk.base <= offset)
            .checked_sub(1)
            .expect("TxnArena::get_bytes: nothing allocated at this offset");
        let chunk = &live[index];
        let local = (offset - chunk.base) as usize;
        &chunk.buf[local..local + len]
    }

    /// 64-bit FNV-1a hash of `data`.
    #[inline]
    fn compute_hash(data: &[u8]) -> u64 {
        const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
        const FNV_PRIME: u64 = 0x00000100000001B3;
        let mut h = FNV_OFFSET_BASIS;
        for &b in data {
            h ^= b as u64;
            h = h.wrapping_mul(FNV_PRIME);
        }
        h
    }

    /// Total bytes allocated since construction or the last reset.
    #[inline]
    pub fn bytes_used(&self) -> usize {
        self.offset.load(Ordering::Relaxed) as usize
    }

    /// Number of `alloc_key` calls since construction or the last reset.
    #[inline]
    pub fn key_count(&self) -> usize {
        self.key_count.load(Ordering::Relaxed) as usize
    }

    /// Number of `alloc_value` calls since construction or the last reset.
    #[inline]
    pub fn value_count(&self) -> usize {
        self.value_count.load(Ordering::Relaxed) as usize
    }

    /// Forgets every allocation and zeroes the counters. Chunks are kept,
    /// emptied, and reused by later allocations.
    ///
    /// NOTE(review): this takes `&self`, so a slice obtained from `get_bytes` /
    /// `BytesRef::resolve` before the reset may still be alive. Later
    /// allocations then overwrite the bytes it points at, which is undefined
    /// behavior if that slice is read. `TxnWriteBuffer::clear` is safe because
    /// it holds `&mut`. The signature should become `&mut self` once callers
    /// that reset through a shared reference are updated.
    #[inline]
    pub fn reset(&self) {
        // SAFETY: same single-thread exclusivity argument as `alloc_raw`. This
        // only rewrites chunk lengths and the active count; no buffer is freed
        // or reallocated.
        let state = unsafe { &mut *self.state.get() };
        let active = state.active;
        for chunk in &mut state.chunks[..active] {
            chunk.buf.clear();
        }
        state.active = 0;
        self.offset.store(0, Ordering::Relaxed);
        self.key_count.store(0, Ordering::Relaxed);
        self.value_count.store(0, Ordering::Relaxed);
    }

    /// BLAKE3-based [`KeyFingerprint`] of the bytes `bytes_ref` points at.
    #[inline]
    pub fn fingerprint(&self, bytes_ref: &BytesRef) -> KeyFingerprint {
        KeyFingerprint::from_bytes(bytes_ref.resolve(self))
    }
}
/// Intentionally empty: the arena's storage and counters are released by the
/// fields' own destructors once this hook returns.
///
/// NOTE(review): an empty `Drop` impl has one side effect. It forbids moving
/// fields out of a `TxnArena` by destructuring. Consider removing it.
impl Drop for TxnArena {
    #[inline]
    fn drop(&mut self) {}
}
use std::collections::HashSet;
/// Set of [`KeyFingerprint`]s identifying the keys a transaction touched.
///
/// Working on fingerprints instead of key bytes keeps overlap checks such as
/// [`ArenaWriteSet::is_disjoint`] independent of any arena.
pub struct ArenaWriteSet {
    fingerprints: HashSet<KeyFingerprint>,
}
impl ArenaWriteSet {
    /// Creates an empty set; no memory is allocated until the first insert.
    #[inline]
    pub fn new() -> Self {
        Self::with_capacity(0)
    }

    /// Creates an empty set with room for at least `capacity` fingerprints.
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        let fingerprints = HashSet::with_capacity(capacity);
        Self { fingerprints }
    }

    /// Adds `fingerprint`. Returns `true` if it was not already present.
    #[inline]
    pub fn insert(&mut self, fingerprint: KeyFingerprint) -> bool {
        self.fingerprints.insert(fingerprint)
    }

    /// Fingerprints `key` and adds it. Returns `true` if it was not already present.
    #[inline]
    pub fn insert_bytes(&mut self, key: &[u8]) -> bool {
        let fingerprint = KeyFingerprint::from_bytes(key);
        self.insert(fingerprint)
    }

    /// `true` if `fingerprint` is in the set.
    #[inline]
    pub fn contains(&self, fingerprint: &KeyFingerprint) -> bool {
        self.fingerprints.contains(fingerprint)
    }

    /// `true` if the fingerprint of `key` is in the set.
    #[inline]
    pub fn contains_bytes(&self, key: &[u8]) -> bool {
        let fingerprint = KeyFingerprint::from_bytes(key);
        self.contains(&fingerprint)
    }

    /// `true` if the two sets share no fingerprint.
    #[inline]
    pub fn is_disjoint(&self, other: &ArenaWriteSet) -> bool {
        self.fingerprints.is_disjoint(&other.fingerprints)
    }

    /// Number of distinct fingerprints.
    #[inline]
    pub fn len(&self) -> usize {
        self.fingerprints.len()
    }

    /// `true` if the set holds no fingerprints.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.fingerprints.is_empty()
    }

    /// Iterates over the fingerprints in unspecified order.
    #[inline]
    pub fn iter(&self) -> impl Iterator<Item = &KeyFingerprint> {
        self.fingerprints.iter()
    }

    /// Removes every fingerprint while keeping the allocated capacity.
    #[inline]
    pub fn clear(&mut self) {
        self.fingerprints.clear();
    }
}
impl Default for ArenaWriteSet {
    /// An empty set, same as [`ArenaWriteSet::new`].
    fn default() -> Self {
        Self {
            fingerprints: HashSet::default(),
        }
    }
}
/// One buffered mutation recorded by `TxnWriteBuffer`, in terms of arena handles.
#[derive(Clone, Copy, Debug)]
pub struct WriteOp {
    /// Handle to the key bytes in the owning buffer's arena.
    pub key: BytesRef,
    /// Handle to the value bytes. `TxnWriteBuffer::delete` stores `BytesRef::null()` here.
    pub value: BytesRef,
    /// `true` for an operation recorded by `delete`, `false` for one recorded by `put`.
    pub is_delete: bool,
}
/// Per-transaction buffer of pending writes.
///
/// Keys and values are copied into a private [`TxnArena`]. Each operation is
/// recorded as a [`WriteOp`] in call order. Fingerprints of written keys and
/// of keys reported through `record_read` are kept in two separate sets.
pub struct TxnWriteBuffer {
    txn_id: u64,
    arena: TxnArena,
    ops: Vec<WriteOp>,
    write_set: ArenaWriteSet,
    read_set: ArenaWriteSet,
}
impl TxnWriteBuffer {
    /// Creates an empty buffer for `txn_id` with room for 64 operations and a
    /// default-sized arena.
    #[inline]
    pub fn new(txn_id: u64) -> Self {
        const INITIAL_OPS: usize = 64;
        Self {
            txn_id,
            arena: TxnArena::new(txn_id),
            ops: Vec::with_capacity(INITIAL_OPS),
            write_set: ArenaWriteSet::with_capacity(INITIAL_OPS),
            read_set: ArenaWriteSet::new(),
        }
    }

    /// Creates an empty buffer sized for `ops_capacity` operations.
    ///
    /// The arena is pre-sized to 128 bytes per expected operation.
    pub fn with_capacity(txn_id: u64, ops_capacity: usize) -> Self {
        // Presumably an estimate of key + value bytes per operation; tune as needed.
        const ARENA_BYTES_PER_OP: usize = 128;
        let arena = TxnArena::with_capacity(txn_id, ops_capacity * ARENA_BYTES_PER_OP);
        Self {
            txn_id,
            arena,
            ops: Vec::with_capacity(ops_capacity),
            write_set: ArenaWriteSet::with_capacity(ops_capacity),
            read_set: ArenaWriteSet::new(),
        }
    }

    /// Transaction id this buffer belongs to.
    #[inline]
    pub fn txn_id(&self) -> u64 {
        self.txn_id
    }

    /// Records a write of `value` under `key` and adds `key` to the write set.
    #[inline]
    pub fn put(&mut self, key: &[u8], value: &[u8]) {
        let op = WriteOp {
            key: self.arena.alloc_key(key),
            value: self.arena.alloc_value(value),
            is_delete: false,
        };
        self.write_set.insert_bytes(key);
        self.ops.push(op);
    }

    /// Records a deletion of `key` and adds `key` to the write set.
    #[inline]
    pub fn delete(&mut self, key: &[u8]) {
        let op = WriteOp {
            key: self.arena.alloc_key(key),
            value: BytesRef::null(),
            is_delete: true,
        };
        self.write_set.insert_bytes(key);
        self.ops.push(op);
    }

    /// Adds `key` to the read set. Nothing is copied into the arena.
    #[inline]
    pub fn record_read(&mut self, key: &[u8]) {
        self.read_set.insert_bytes(key);
    }

    /// Number of recorded operations, counting repeated keys separately.
    #[inline]
    pub fn len(&self) -> usize {
        self.ops.len()
    }

    /// `true` if no operation has been recorded.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Fingerprints of every key passed to `put` or `delete`.
    #[inline]
    pub fn write_set(&self) -> &ArenaWriteSet {
        &self.write_set
    }

    /// Fingerprints of every key passed to `record_read`.
    #[inline]
    pub fn read_set(&self) -> &ArenaWriteSet {
        &self.read_set
    }

    /// Bytes currently allocated in the buffer's arena.
    #[inline]
    pub fn bytes_used(&self) -> usize {
        self.arena.bytes_used()
    }

    /// Iterates over the recorded operations as arena handles, in call order.
    #[inline]
    pub fn iter(&self) -> impl Iterator<Item = &WriteOp> {
        self.ops.as_slice().iter()
    }

    /// Iterates over the recorded operations as `(key, value, is_delete)`,
    /// with bytes resolved from the arena. `value` is `None` for deletes.
    pub fn iter_resolved(&self) -> impl Iterator<Item = (&[u8], Option<&[u8]>, bool)> {
        let arena = &self.arena;
        self.ops.iter().map(move |op| {
            let key = op.key.resolve(arena);
            let value = (!op.is_delete).then(|| op.value.resolve(arena));
            (key, value, op.is_delete)
        })
    }

    /// The arena holding this buffer's key and value bytes.
    #[inline]
    pub fn arena(&self) -> &TxnArena {
        &self.arena
    }

    /// Drops every recorded operation, empties both key sets and resets the
    /// arena. The containers keep their allocated capacity.
    pub fn clear(&mut self) {
        self.ops.clear();
        self.write_set.clear();
        self.read_set.clear();
        self.arena.reset();
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_txn_arena_basic() {
        let arena = TxnArena::new(1);
        let key_ref = arena.alloc_key(b"users/12345");
        let val_ref = arena.alloc_value(b"Alice");
        assert_eq!(key_ref.resolve(&arena), b"users/12345");
        assert_eq!(val_ref.resolve(&arena), b"Alice");
        assert_eq!(arena.key_count(), 1);
        assert_eq!(arena.value_count(), 1);
    }

    #[test]
    fn test_bytes_ref_hash() {
        let arena = TxnArena::new(1);
        let key1 = arena.alloc_key(b"test_key");
        let key2 = arena.alloc_key(b"test_key");
        let key3 = arena.alloc_key(b"other_key");
        assert_eq!(key1.hash(), key2.hash());
        assert_ne!(key1.hash(), key3.hash());
    }

    #[test]
    fn test_key_fingerprint() {
        let fp1 = KeyFingerprint::from_bytes(b"test_key");
        let fp2 = KeyFingerprint::from_bytes(b"test_key");
        let fp3 = KeyFingerprint::from_bytes(b"other_key");
        assert_eq!(fp1, fp2);
        assert_ne!(fp1, fp3);
    }

    #[test]
    fn test_arena_write_set() {
        let mut ws1 = ArenaWriteSet::new();
        let mut ws2 = ArenaWriteSet::new();
        ws1.insert_bytes(b"key1");
        ws1.insert_bytes(b"key2");
        ws2.insert_bytes(b"key3");
        ws2.insert_bytes(b"key4");
        assert!(ws1.is_disjoint(&ws2));
        ws2.insert_bytes(b"key1");
        assert!(!ws1.is_disjoint(&ws2));
    }

    #[test]
    fn test_txn_write_buffer() {
        let mut buffer = TxnWriteBuffer::new(42);
        buffer.put(b"key1", b"value1");
        buffer.put(b"key2", b"value2");
        buffer.delete(b"key3");
        assert_eq!(buffer.len(), 3);
        assert_eq!(buffer.write_set().len(), 3);
        let ops: Vec<_> = buffer.iter_resolved().collect();
        assert_eq!(
            ops[0],
            (b"key1".as_slice(), Some(b"value1".as_slice()), false)
        );
        assert_eq!(
            ops[1],
            (b"key2".as_slice(), Some(b"value2".as_slice()), false)
        );
        assert_eq!(ops[2], (b"key3".as_slice(), None, true));
    }

    #[test]
    fn test_arena_reset() {
        let arena = TxnArena::new(1);
        for i in 0..100 {
            let key = format!("key_{}", i);
            arena.alloc_key(key.as_bytes());
        }
        assert_eq!(arena.key_count(), 100);
        let used_before = arena.bytes_used();
        assert!(used_before > 0);
        arena.reset();
        assert_eq!(arena.key_count(), 0);
        assert_eq!(arena.bytes_used(), 0);
    }

    /// A tiny initial capacity forces the arena to grow many times; every
    /// handle must still resolve to the bytes it was created from.
    #[test]
    fn test_arena_grows_past_initial_capacity() {
        let arena = TxnArena::with_capacity(2, 4);
        let refs: Vec<_> = (0..500u32)
            .map(|i| (arena.alloc_value(&i.to_le_bytes()), i))
            .collect();
        for (r, i) in &refs {
            assert_eq!(r.resolve(&arena), &i.to_le_bytes()[..]);
        }
        assert_eq!(arena.bytes_used(), 500 * 4);
        assert_eq!(arena.value_count(), 500);
    }

    /// Empty byte strings and the null handle resolve to empty slices and
    /// consume no arena bytes.
    #[test]
    fn test_empty_bytes() {
        let arena = TxnArena::new(3);
        let empty = arena.alloc_value(b"");
        assert!(empty.is_empty());
        assert_eq!(empty.resolve(&arena), b"");
        assert_eq!(arena.bytes_used(), 0);
        assert_eq!(BytesRef::null().resolve(&arena), b"");
    }

    /// `clear` empties operations, both key sets and the arena, and the buffer
    /// is usable again afterwards.
    #[test]
    fn test_write_buffer_clear_and_reuse() {
        let mut buffer = TxnWriteBuffer::with_capacity(7, 2);
        buffer.put(b"a", b"1");
        buffer.record_read(b"b");
        buffer.clear();
        assert!(buffer.is_empty());
        assert!(buffer.write_set().is_empty());
        assert!(buffer.read_set().is_empty());
        assert_eq!(buffer.bytes_used(), 0);

        buffer.put(b"c", b"");
        let ops: Vec<_> = buffer.iter_resolved().collect();
        assert_eq!(ops, vec![(b"c".as_slice(), Some(b"".as_slice()), false)]);
        assert!(buffer.write_set().contains_bytes(b"c"));
    }
}