use std::cell::UnsafeCell;
use std::hash::{BuildHasher, Hasher};
use std::sync::atomic::{AtomicUsize, Ordering};
use rustc_hash::FxBuildHasher;
/// Kind of change recorded by a [`StateChangelogEntry`].
///
/// The enum is `#[repr(u8)]` with explicit discriminants; that byte is what
/// [`StateOp::to_u8`] produces and [`StateOp::from_u8`] accepts.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum StateOp {
    /// Encoded as `0`.
    Put = 0,
    /// Encoded as `1`.
    Delete = 1,
}

impl StateOp {
    /// Returns the `u8` discriminant of this operation.
    #[inline]
    #[must_use]
    pub const fn to_u8(self) -> u8 {
        self as u8
    }

    /// Decodes a `u8` discriminant.
    ///
    /// Only `1` maps to [`StateOp::Delete`]; every other byte (including values
    /// that are not valid discriminants) maps to [`StateOp::Put`], so decoding
    /// never fails.
    #[inline]
    #[must_use]
    pub const fn from_u8(val: u8) -> Self {
        if val == Self::Delete as u8 {
            Self::Delete
        } else {
            Self::Put
        }
    }
}
/// One fixed-size record of the state changelog.
///
/// The layout is `#[repr(C)]` and pinned to exactly 32 bytes by the compile-time
/// assertion that follows this impl. The operation is stored as its raw `u8`
/// encoding; read it back through [`StateChangelogEntry::op`],
/// [`StateChangelogEntry::is_put`] or [`StateChangelogEntry::is_delete`].
#[derive(Clone, Copy, Debug)]
#[repr(C)]
pub struct StateChangelogEntry {
    /// Epoch value supplied when the entry was built.
    pub epoch: u64,
    /// Key hash, either supplied directly or computed by
    /// [`StateChangelogEntry::hash_key`] (see [`StateChangelogEntry::from_key`]).
    pub key_hash: u64,
    /// Caller-supplied value offset; `0` for entries built by
    /// [`StateChangelogEntry::delete`]. NOTE(review): the name suggests an offset
    /// into a memory-mapped region — confirm with the writer of these entries.
    pub mmap_offset: u64,
    /// Caller-supplied value length; `0` for entries built by
    /// [`StateChangelogEntry::delete`].
    pub value_len: u32,
    /// `StateOp` encoded via `StateOp::to_u8`.
    op: u8,
    /// Explicit padding so the struct has no implicit padding bytes; always zeroed.
    _padding: [u8; 3],
}

impl StateChangelogEntry {
    /// Shared constructor: every public constructor funnels through here so the
    /// padding is always zeroed and `op` is always encoded the same way.
    #[inline]
    fn from_parts(
        epoch: u64,
        key_hash: u64,
        mmap_offset: u64,
        value_len: u32,
        op: StateOp,
    ) -> Self {
        Self {
            epoch,
            key_hash,
            mmap_offset,
            value_len,
            op: op.to_u8(),
            _padding: [0; 3],
        }
    }

    /// Builds a [`StateOp::Put`] entry from an already-computed key hash.
    #[inline]
    #[must_use]
    pub fn put(epoch: u64, key_hash: u64, mmap_offset: u64, value_len: u32) -> Self {
        Self::from_parts(epoch, key_hash, mmap_offset, value_len, StateOp::Put)
    }

    /// Builds a [`StateOp::Delete`] entry; `mmap_offset` and `value_len` are zero.
    #[inline]
    #[must_use]
    pub fn delete(epoch: u64, key_hash: u64) -> Self {
        Self::from_parts(epoch, key_hash, 0, 0, StateOp::Delete)
    }

    /// Builds an entry of kind `op`, hashing the raw `key` with
    /// [`StateChangelogEntry::hash_key`].
    #[inline]
    #[must_use]
    pub fn from_key(key: &[u8], epoch: u64, mmap_offset: u64, value_len: u32, op: StateOp) -> Self {
        Self::from_parts(epoch, Self::hash_key(key), mmap_offset, value_len, op)
    }

    /// Decodes the stored operation (any byte other than `1` decodes as `Put`).
    #[inline]
    #[must_use]
    pub fn op(&self) -> StateOp {
        StateOp::from_u8(self.op)
    }

    /// `true` when the stored byte is exactly the `Put` encoding.
    #[inline]
    #[must_use]
    pub fn is_put(&self) -> bool {
        StateOp::Put.to_u8() == self.op
    }

    /// `true` when the stored byte is exactly the `Delete` encoding.
    #[inline]
    #[must_use]
    pub fn is_delete(&self) -> bool {
        StateOp::Delete.to_u8() == self.op
    }

    /// Hashes raw key bytes with `FxHasher`.
    ///
    /// The bytes are fed through a single `Hasher::write` call with no length
    /// prefix, so the result differs from hashing the slice via its `Hash` impl.
    #[inline]
    #[must_use]
    pub fn hash_key(key: &[u8]) -> u64 {
        let mut state = FxBuildHasher.build_hasher();
        state.write(key);
        state.finish()
    }
}

// Compile-time guard: the `#[repr(C)]` layout must stay exactly 32 bytes.
const _: () = assert!(std::mem::size_of::<StateChangelogEntry>() == 32);
/// Fixed-capacity ring buffer of [`StateChangelogEntry`] records, designed for a
/// single producer and a single consumer (SPSC).
///
/// One slot is always kept free so that "full" can be told apart from "empty".
/// The buffer therefore holds at most `capacity() - 1` entries at a time. When it
/// is full, [`push`](Self::push) rejects the entry and increments
/// [`overflow_count`](Self::overflow_count); it neither blocks nor overwrites.
///
/// * Producer-side methods: `push`, `push_put`, `push_delete`.
/// * Consumer-side methods: `pop`, `drain`, `drain_all`, `clear`.
///
/// Only the producer side is checked, and only in debug builds (see `push`).
pub struct StateChangelogBuffer {
    /// Ring storage; its length equals `capacity`.
    entries: Box<[UnsafeCell<StateChangelogEntry>]>,
    /// Next slot the producer will write; stored only by `push`.
    write_pos: AtomicUsize,
    /// Next slot the consumer will read; stored only by `pop` and `clear`.
    read_pos: AtomicUsize,
    /// Slot count; a power of two (at least 2), so `capacity - 1` is an index mask.
    capacity: usize,
    /// Epoch stamped onto entries built by `push_put` / `push_delete`.
    current_epoch: u64,
    /// Number of successful pushes.
    total_pushed: AtomicUsize,
    /// Number of entries removed through `pop` (and thus `drain`/`drain_all`).
    total_drained: AtomicUsize,
    /// Number of pushes rejected because the buffer was full.
    overflow_count: AtomicUsize,
    /// Thread that first called `push`; used to detect SPSC misuse in debug builds.
    #[cfg(debug_assertions)]
    producer_thread_id: std::sync::Mutex<Option<std::thread::ThreadId>>,
}

impl StateChangelogBuffer {
    /// Slot count used by [`new`](Self::new).
    pub const DEFAULT_CAPACITY: usize = 16 * 1024;

    /// Creates an empty buffer with at least `capacity` slots.
    ///
    /// The slot count is rounded up to the next power of two and to at least 2,
    /// because one slot is always kept free. [`capacity`](Self::capacity) reports
    /// the final slot count.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is 0, or if rounding it up to a power of two
    /// overflows `usize`.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be > 0");
        // A 1-slot ring could never hold an entry: `push` treats `next == read` as
        // full, and with a mask of 0 that is always true. Hence the minimum of 2.
        let capacity = capacity
            .checked_next_power_of_two()
            .expect("capacity overflows usize when rounded up to a power of two")
            .max(2);
        let blank = StateChangelogEntry::put(0, 0, 0, 0);
        let entries: Box<[UnsafeCell<StateChangelogEntry>]> =
            (0..capacity).map(|_| UnsafeCell::new(blank)).collect();
        Self {
            entries,
            write_pos: AtomicUsize::new(0),
            read_pos: AtomicUsize::new(0),
            capacity,
            current_epoch: 0,
            total_pushed: AtomicUsize::new(0),
            total_drained: AtomicUsize::new(0),
            overflow_count: AtomicUsize::new(0),
            #[cfg(debug_assertions)]
            producer_thread_id: std::sync::Mutex::new(None),
        }
    }

    /// Creates an empty buffer with [`DEFAULT_CAPACITY`](Self::DEFAULT_CAPACITY) slots.
    #[must_use]
    pub fn new() -> Self {
        Self::with_capacity(Self::DEFAULT_CAPACITY)
    }

    /// Sets the epoch stamped onto subsequent `push_put` / `push_delete` entries.
    pub fn set_epoch(&mut self, epoch: u64) {
        self.current_epoch = epoch;
    }

    /// Returns the epoch currently stamped onto new entries.
    #[must_use]
    pub fn epoch(&self) -> u64 {
        self.current_epoch
    }

    /// Increments the epoch by one and returns the new value.
    pub fn advance_epoch(&mut self) -> u64 {
        self.current_epoch += 1;
        self.current_epoch
    }

    /// Appends `entry` if a slot is free. Producer-side.
    ///
    /// Returns `true` on success. Returns `false` when the buffer is full; the
    /// entry is then discarded and [`overflow_count`](Self::overflow_count) is
    /// incremented.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if called from a thread other than the first
    /// thread that called `push` on this buffer.
    #[inline]
    pub fn push(&self, entry: StateChangelogEntry) -> bool {
        #[cfg(debug_assertions)]
        self.debug_assert_producer_thread();

        // Only the producer stores `write_pos`, so a relaxed load of our own index is
        // enough. Acquire on `read_pos` pairs with the consumer's Release store in
        // `pop`, so the slot we may overwrite has been fully read first.
        let write_pos = self.write_pos.load(Ordering::Relaxed);
        let read_pos = self.read_pos.load(Ordering::Acquire);
        let next_pos = (write_pos + 1) & (self.capacity - 1);
        if next_pos == read_pos {
            self.overflow_count.fetch_add(1, Ordering::Relaxed);
            return false;
        }
        // SAFETY: under the SPSC contract only this producer writes slots. Slot
        // `write_pos` is not visible to the consumer until the Release store below,
        // and the full-check above guarantees the consumer is done reading it.
        #[allow(unsafe_code)]
        unsafe {
            self.entries[write_pos].get().write(entry);
        }
        // Release publishes the slot write to the consumer's Acquire load in `pop`.
        self.write_pos.store(next_pos, Ordering::Release);
        self.total_pushed.fetch_add(1, Ordering::Relaxed);
        true
    }

    /// Debug-only SPSC check. It records the first thread that pushes and asserts
    /// that every later push comes from that same thread.
    #[cfg(debug_assertions)]
    fn debug_assert_producer_thread(&self) {
        let current = std::thread::current().id();
        // The guard is a temporary dropped at the end of this statement, so the lock
        // is released before asserting; a failed assertion cannot poison the mutex
        // and turn later pushes into unrelated `PoisonError` panics.
        let expected = *self
            .producer_thread_id
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .get_or_insert(current);
        debug_assert_eq!(
            current, expected,
            "SPSC violation: push() called from a different thread"
        );
    }

    /// Pushes a `Put` entry for `key`, stamped with the current epoch. Producer-side.
    ///
    /// Returns `false` when the buffer is full (see [`push`](Self::push)).
    #[inline]
    pub fn push_put(&self, key: &[u8], mmap_offset: u64, value_len: u32) -> bool {
        let entry = StateChangelogEntry::from_key(
            key,
            self.current_epoch,
            mmap_offset,
            value_len,
            StateOp::Put,
        );
        self.push(entry)
    }

    /// Pushes a `Delete` entry for `key`, stamped with the current epoch.
    /// Producer-side.
    ///
    /// Returns `false` when the buffer is full (see [`push`](Self::push)).
    #[inline]
    pub fn push_delete(&self, key: &[u8]) -> bool {
        let entry = StateChangelogEntry::from_key(key, self.current_epoch, 0, 0, StateOp::Delete);
        self.push(entry)
    }

    /// Removes and returns the oldest entry, or `None` if the buffer is empty.
    /// Consumer-side.
    #[inline]
    pub fn pop(&self) -> Option<StateChangelogEntry> {
        let read_pos = self.read_pos.load(Ordering::Relaxed);
        // Acquire pairs with the producer's Release store in `push`, making the slot
        // contents visible before we read them.
        let write_pos = self.write_pos.load(Ordering::Acquire);
        if read_pos == write_pos {
            return None;
        }
        // SAFETY: the slot at `read_pos` has been published by the producer (checked
        // above), and the producer will not rewrite it until the Release store below
        // hands the slot back.
        #[allow(unsafe_code)]
        let entry = unsafe { self.entries[read_pos].get().read() };
        let next_pos = (read_pos + 1) & (self.capacity - 1);
        self.read_pos.store(next_pos, Ordering::Release);
        self.total_drained.fetch_add(1, Ordering::Relaxed);
        Some(entry)
    }

    /// Returns an iterator that pops up to `max_count` entries, stopping early
    /// when the buffer is empty. Consumer-side.
    pub fn drain(&self, max_count: usize) -> impl Iterator<Item = StateChangelogEntry> + '_ {
        DrainIter {
            buffer: self,
            remaining: max_count,
        }
    }

    /// Returns an iterator that pops entries until the buffer is empty. Consumer-side.
    pub fn drain_all(&self) -> impl Iterator<Item = StateChangelogEntry> + '_ {
        self.drain(usize::MAX)
    }

    /// Number of entries currently stored (a snapshot; may be stale under concurrency).
    #[must_use]
    pub fn len(&self) -> usize {
        let write_pos = self.write_pos.load(Ordering::Acquire);
        let read_pos = self.read_pos.load(Ordering::Acquire);
        write_pos.wrapping_sub(read_pos) & (self.capacity - 1)
    }

    /// `true` when no entries are stored (snapshot).
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.write_pos.load(Ordering::Acquire) == self.read_pos.load(Ordering::Acquire)
    }

    /// `true` when the next `push` would be rejected (snapshot).
    #[must_use]
    pub fn is_full(&self) -> bool {
        let write_pos = self.write_pos.load(Ordering::Acquire);
        let read_pos = self.read_pos.load(Ordering::Acquire);
        ((write_pos + 1) & (self.capacity - 1)) == read_pos
    }

    /// Total slot count (a power of two); at most `capacity() - 1` are usable.
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Number of entries that can still be pushed right now (snapshot).
    #[must_use]
    pub fn available(&self) -> usize {
        self.capacity - self.len() - 1
    }

    /// Number of successful pushes since creation.
    #[must_use]
    pub fn total_pushed(&self) -> usize {
        self.total_pushed.load(Ordering::Relaxed)
    }

    /// Number of entries removed via `pop` (including through `drain`/`drain_all`).
    #[must_use]
    pub fn total_drained(&self) -> usize {
        self.total_drained.load(Ordering::Relaxed)
    }

    /// Number of pushes rejected because the buffer was full.
    #[must_use]
    pub fn overflow_count(&self) -> usize {
        self.overflow_count.load(Ordering::Relaxed)
    }

    /// Discards every entry visible to the consumer by moving the read index up to
    /// the write index. Consumer-side: it stores `read_pos`, which `pop` also stores.
    ///
    /// Discarded entries are not added to [`total_drained`](Self::total_drained).
    pub fn clear(&self) {
        let write_pos = self.write_pos.load(Ordering::Acquire);
        self.read_pos.store(write_pos, Ordering::Release);
    }

    /// Returns the current epoch together with the producer's current ring index.
    ///
    /// The index is the raw `write_pos`, already reduced modulo `capacity`, so it
    /// wraps around rather than counting total pushes.
    #[must_use]
    pub fn checkpoint_barrier(&self) -> (u64, usize) {
        (self.current_epoch, self.write_pos.load(Ordering::Acquire))
    }
}
impl Default for StateChangelogBuffer {
    /// Same as `StateChangelogBuffer::new()`: an empty buffer sized with
    /// `StateChangelogBuffer::DEFAULT_CAPACITY` slots.
    fn default() -> Self {
        Self::with_capacity(Self::DEFAULT_CAPACITY)
    }
}
// SAFETY: `Sync` has to be asserted manually because `entries` holds `UnsafeCell`s,
// which are `!Sync`. Sharing is sound only under the single-producer /
// single-consumer contract:
// - at most one thread at a time calls `push` / `push_put` / `push_delete`;
// - at most one thread at a time calls `pop` / `drain` / `drain_all` / `clear`.
// Under that contract a slot is written only by the producer before it publishes
// `write_pos` with Release. The consumer reads the slot only after observing that
// index with Acquire, and hands it back by storing `read_pos` with Release.
// NOTE(review): `push` checks the producer thread only in debug builds, and
// the consumer side is never checked, so safe code can break this contract in
// release builds.
#[allow(unsafe_code)]
unsafe impl Sync for StateChangelogBuffer {}

// `Send` needs no `unsafe impl`: every field (`Box<[UnsafeCell<_>]>` of a `Copy`
// struct, atomics, integers, and the debug-only `Mutex<Option<ThreadId>>`) is
// already `Send`, so the compiler derives it. This assertion keeps that true at
// compile time instead of silently overriding it if a non-`Send` field is added.
const _: fn() = || {
    fn assert_send<T: Send>() {}
    assert_send::<StateChangelogBuffer>();
};
/// Iterator behind `StateChangelogBuffer::drain` / `drain_all`.
///
/// Each `next()` call spends one unit of `remaining` and pops one entry. It
/// returns `None` once the budget is spent or whenever the buffer is empty. The
/// iterator is not fused: after a `None` caused by an empty buffer, a later call
/// can still yield entries the producer pushed meanwhile, as long as budget remains.
struct DrainIter<'a> {
    buffer: &'a StateChangelogBuffer,
    /// Remaining `pop` attempts.
    remaining: usize,
}

impl Iterator for DrainIter<'_> {
    type Item = StateChangelogEntry;

    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        self.buffer.pop()
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Under the single-consumer contract, entries already in the buffer can only
        // leave through this consumer, so `len()` (capped by the budget) is a lower
        // bound. The producer may add more, but never past the remaining budget.
        (self.remaining.min(self.buffer.len()), Some(self.remaining))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_state_op_roundtrip() {
        assert_eq!(StateOp::from_u8(StateOp::Put.to_u8()), StateOp::Put);
        assert_eq!(StateOp::from_u8(StateOp::Delete.to_u8()), StateOp::Delete);
        // Unknown encodings decode as `Put` rather than being rejected.
        assert_eq!(StateOp::from_u8(255), StateOp::Put);
    }

    #[test]
    fn test_changelog_entry_size() {
        assert_eq!(std::mem::size_of::<StateChangelogEntry>(), 32);
    }

    #[test]
    fn test_changelog_entry_put() {
        let entry = StateChangelogEntry::put(1, 12345, 100, 50);
        assert_eq!(entry.epoch, 1);
        assert_eq!(entry.key_hash, 12345);
        assert_eq!(entry.mmap_offset, 100);
        assert_eq!(entry.value_len, 50);
        assert_eq!(entry.op(), StateOp::Put);
        assert!(entry.is_put());
        assert!(!entry.is_delete());
    }

    #[test]
    fn test_changelog_entry_delete() {
        let entry = StateChangelogEntry::delete(2, 67890);
        assert_eq!(entry.epoch, 2);
        assert_eq!(entry.key_hash, 67890);
        assert_eq!(entry.mmap_offset, 0);
        assert_eq!(entry.value_len, 0);
        assert_eq!(entry.op(), StateOp::Delete);
        assert!(entry.is_delete());
        assert!(!entry.is_put());
    }

    #[test]
    fn test_changelog_entry_from_key() {
        let entry = StateChangelogEntry::from_key(b"test_key", 5, 200, 75, StateOp::Put);
        assert_eq!(entry.epoch, 5);
        assert_eq!(entry.key_hash, StateChangelogEntry::hash_key(b"test_key"));
        assert_eq!(entry.mmap_offset, 200);
        assert_eq!(entry.value_len, 75);
        assert!(entry.is_put());
        let entry2 = StateChangelogEntry::from_key(b"test_key", 6, 300, 80, StateOp::Delete);
        assert!(entry2.is_delete());
        assert_eq!(entry.key_hash, entry2.key_hash);
    }

    #[test]
    fn test_buffer_basic_operations() {
        let buffer = StateChangelogBuffer::with_capacity(16);
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.capacity(), 16);
        assert!(buffer.push(StateChangelogEntry::put(1, 100, 0, 10)));
        assert!(!buffer.is_empty());
        assert_eq!(buffer.len(), 1);
        let popped = buffer.pop().expect("one entry was pushed");
        assert_eq!(popped.key_hash, 100);
        assert!(buffer.is_empty());
        assert!(buffer.pop().is_none());
    }

    #[test]
    fn test_buffer_full() {
        // Capacity 4 keeps one slot free, so three pushes fill it.
        let buffer = StateChangelogBuffer::with_capacity(4);
        for i in 0..3 {
            assert!(buffer.push(StateChangelogEntry::put(1, i, 0, 10)));
        }
        assert!(buffer.is_full());
        assert_eq!(buffer.available(), 0);
        assert!(!buffer.push(StateChangelogEntry::put(1, 999, 0, 10)));
        assert_eq!(buffer.overflow_count(), 1);
        assert_eq!(buffer.total_pushed(), 3);
    }

    #[test]
    fn test_buffer_drain() {
        let buffer = StateChangelogBuffer::with_capacity(16);
        for i in 0..5 {
            assert!(buffer.push(StateChangelogEntry::put(1, i, 0, 10)));
        }
        assert_eq!(buffer.len(), 5);
        let drained: Vec<u64> = buffer.drain(3).map(|e| e.key_hash).collect();
        assert_eq!(drained, [0_u64, 1, 2]);
        assert_eq!(buffer.len(), 2);
        let remaining: Vec<u64> = buffer.drain_all().map(|e| e.key_hash).collect();
        assert_eq!(remaining, [3_u64, 4]);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_buffer_epoch() {
        let mut buffer = StateChangelogBuffer::with_capacity(16);
        assert_eq!(buffer.epoch(), 0);
        buffer.set_epoch(10);
        assert_eq!(buffer.epoch(), 10);
        assert_eq!(buffer.advance_epoch(), 11);
        assert_eq!(buffer.epoch(), 11);
    }

    #[test]
    fn test_buffer_push_helpers() {
        let buffer = StateChangelogBuffer::with_capacity(16);
        assert!(buffer.push_put(b"key1", 100, 50));
        assert!(buffer.push_delete(b"key2"));
        let entries: Vec<_> = buffer.drain_all().collect();
        assert_eq!(entries.len(), 2);
        assert!(entries[0].is_put());
        assert_eq!(entries[0].key_hash, StateChangelogEntry::hash_key(b"key1"));
        assert_eq!(entries[0].mmap_offset, 100);
        assert_eq!(entries[0].value_len, 50);
        assert!(entries[1].is_delete());
        assert_eq!(entries[1].key_hash, StateChangelogEntry::hash_key(b"key2"));
    }

    #[test]
    fn test_buffer_clear() {
        let buffer = StateChangelogBuffer::with_capacity(16);
        for i in 0..5 {
            assert!(buffer.push(StateChangelogEntry::put(1, i, 0, 10)));
        }
        assert_eq!(buffer.len(), 5);
        buffer.clear();
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
    }

    #[test]
    fn test_buffer_checkpoint_barrier() {
        let mut buffer = StateChangelogBuffer::with_capacity(16);
        buffer.set_epoch(42);
        assert!(buffer.push(StateChangelogEntry::put(42, 1, 0, 10)));
        assert!(buffer.push(StateChangelogEntry::put(42, 2, 0, 10)));
        let (epoch, pos) = buffer.checkpoint_barrier();
        assert_eq!(epoch, 42);
        assert_eq!(pos, 2);
    }

    #[test]
    fn test_buffer_metrics() {
        let buffer = StateChangelogBuffer::with_capacity(8);
        for i in 0..5 {
            assert!(buffer.push(StateChangelogEntry::put(1, i, 0, 10)));
        }
        assert_eq!(buffer.total_pushed(), 5);
        assert_eq!(buffer.total_drained(), 0);
        assert!(buffer.pop().is_some());
        assert!(buffer.pop().is_some());
        assert_eq!(buffer.total_drained(), 2);
    }

    #[test]
    fn test_entry_constructors_share_key_hash() {
        let key_hash = StateChangelogEntry::hash_key(b"mykey");
        let put = StateChangelogEntry::put(100, key_hash, 500, 75);
        assert_eq!(put.epoch, 100);
        assert_eq!(put.mmap_offset, 500);
        assert_eq!(put.value_len, 75);
        assert!(put.is_put());
        let delete = StateChangelogEntry::delete(100, key_hash);
        assert_eq!(delete.epoch, 100);
        assert!(delete.is_delete());
        assert_eq!(put.key_hash, delete.key_hash);
    }

    #[test]
    fn test_buffer_wraparound() {
        let buffer = StateChangelogBuffer::with_capacity(4);
        for iteration in 0..5 {
            for i in 0..3 {
                assert!(
                    buffer.push(StateChangelogEntry::put(1, i + iteration * 10, 0, 10)),
                    "Failed at iteration {iteration}, entry {i}"
                );
            }
            let drained: Vec<_> = buffer.drain_all().collect();
            assert_eq!(drained.len(), 3, "Failed at iteration {iteration}");
            assert_eq!(drained[0].key_hash, iteration * 10);
        }
    }

    #[test]
    fn test_key_hash_consistency() {
        let key = b"consistent_key";
        let hash1 = StateChangelogEntry::hash_key(key);
        let hash2 = StateChangelogEntry::hash_key(key);
        assert_eq!(hash1, hash2);
        let different_key = b"different_key";
        let hash3 = StateChangelogEntry::hash_key(different_key);
        assert_ne!(hash1, hash3);
    }
}