use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use super::LSN;
pub const DEFAULT_RING_BUFFER_CAPACITY: usize = 1024;
#[derive(Debug, Clone)]
pub struct BackpressureConfig {
pub initial_spins: u32,
pub max_spins: u32,
pub base_sleep_us: u64,
pub max_sleep_us: u64,
}
impl BackpressureConfig {
pub fn validate(&self) -> Result<(), String> {
if self.initial_spins == 0 {
return Err("initial_spins must be > 0 to prevent infinite spin loops".to_string());
}
if self.max_spins < self.initial_spins {
return Err("max_spins must be >= initial_spins".to_string());
}
Ok(())
}
}
impl Default for BackpressureConfig {
fn default() -> Self {
Self {
initial_spins: 10,
max_spins: 1000,
base_sleep_us: 1,
max_sleep_us: 1000,
}
}
}
impl BackpressureConfig {
pub fn low_latency() -> Self {
Self {
initial_spins: 100,
max_spins: 10_000,
base_sleep_us: 1,
max_sleep_us: 100,
}
}
pub fn high_throughput() -> Self {
Self {
initial_spins: 5,
max_spins: 100,
base_sleep_us: 10,
max_sleep_us: 10_000,
}
}
}
#[derive(Debug)]
pub struct PendingEntry {
pub lsn: LSN,
pub data: Vec<u8>,
pub completion: Option<Arc<CompletionNotifier>>,
}
impl PendingEntry {
#[inline]
pub fn new_async(lsn: LSN, data: Vec<u8>) -> Self {
Self {
lsn,
data,
completion: None,
}
}
#[inline]
pub fn new_sync(lsn: LSN, data: Vec<u8>) -> (Self, CompletionHandle) {
let notifier = Arc::new(CompletionNotifier::new());
let handle = CompletionHandle(Arc::clone(¬ifier));
let entry = Self {
lsn,
data,
completion: Some(notifier),
};
(entry, handle)
}
#[inline]
pub fn notify_completion(&self) {
if let Some(ref notifier) = self.completion {
notifier.notify_success();
}
}
#[inline]
pub fn notify_error(&self, error: &str) {
if let Some(ref notifier) = self.completion {
notifier.notify_error(error);
}
}
}
impl Drop for PendingEntry {
fn drop(&mut self) {
#[allow(clippy::collapsible_if)]
if let Some(ref notifier) = self.completion {
if !notifier.is_complete() {
notifier.notify_error("PendingEntry dropped before flush");
}
}
}
}
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompletionState {
Pending = 0,
Complete = 1,
Error = 2,
}
#[derive(Debug)]
pub struct CompletionNotifier {
state: AtomicU64,
error: Mutex<Option<String>>,
condvar: Condvar,
wait_mutex: Mutex<()>,
}
impl CompletionNotifier {
pub fn new() -> Self {
Self {
state: AtomicU64::new(CompletionState::Pending as u64),
error: Mutex::new(None),
condvar: Condvar::new(),
wait_mutex: Mutex::new(()),
}
}
#[inline]
pub fn is_complete(&self) -> bool {
self.state.load(Ordering::Acquire) != CompletionState::Pending as u64
}
pub fn notify_success(&self) {
let _guard = self.wait_mutex.lock().unwrap_or_else(|e| e.into_inner());
self.state
.store(CompletionState::Complete as u64, Ordering::Release);
self.condvar.notify_all();
}
pub fn notify_error(&self, error: &str) {
let _guard = self.wait_mutex.lock().unwrap_or_else(|e| e.into_inner());
{
let mut err = self.error.lock().unwrap_or_else(|e| e.into_inner());
*err = Some(error.to_string());
}
self.state
.store(CompletionState::Error as u64, Ordering::Release);
self.condvar.notify_all();
}
pub fn wait(&self) -> Result<(), String> {
let guard = self.wait_mutex.lock().unwrap_or_else(|e| e.into_inner());
let state = self.state.load(Ordering::Acquire);
if state == CompletionState::Complete as u64 {
return Ok(());
}
if state == CompletionState::Error as u64 {
let err = self.error.lock().unwrap_or_else(|e| e.into_inner());
return Err(err.clone().unwrap_or_else(|| "Unknown error".to_string()));
}
let _guard = self
.condvar
.wait_while(guard, |_| {
self.state.load(Ordering::Acquire) == CompletionState::Pending as u64
})
.unwrap_or_else(|e| e.into_inner());
let state = self.state.load(Ordering::Acquire);
if state == CompletionState::Complete as u64 {
Ok(())
} else {
let err = self.error.lock().unwrap_or_else(|e| e.into_inner());
Err(err.clone().unwrap_or_else(|| "Unknown error".to_string()))
}
}
}
impl Default for CompletionNotifier {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct CompletionHandle(pub(crate) Arc<CompletionNotifier>);
impl CompletionHandle {
pub fn wait(self) -> Result<(), String> {
self.0.wait()
}
#[inline]
pub fn is_complete(&self) -> bool {
self.0.is_complete()
}
}
struct Slot {
entry: UnsafeCell<Option<PendingEntry>>,
sequence: CacheLinePadded<AtomicU64>,
}
unsafe impl Send for Slot {}
unsafe impl Sync for Slot {}
#[repr(align(64))]
struct CacheLinePadded<T> {
value: T,
}
impl<T> CacheLinePadded<T> {
fn new(value: T) -> Self {
Self { value }
}
}
impl<T> std::ops::Deref for CacheLinePadded<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.value
}
}
pub struct WalRingBuffer {
slots: Box<[Slot]>,
capacity: usize,
mask: usize,
write_pos: CacheLinePadded<AtomicU64>,
read_pos: CacheLinePadded<AtomicU64>,
closed: AtomicBool,
backpressure: BackpressureConfig,
}
unsafe impl Send for WalRingBuffer {}
unsafe impl Sync for WalRingBuffer {}
impl WalRingBuffer {
pub fn new(capacity: usize) -> Self {
Self::with_config(capacity, BackpressureConfig::default())
}
pub fn with_config(capacity: usize, backpressure: BackpressureConfig) -> Self {
assert!(capacity > 0, "Ring buffer capacity must be > 0");
backpressure.validate().expect("Invalid BackpressureConfig");
let capacity = capacity.next_power_of_two();
let mask = capacity - 1;
let slots: Vec<Slot> = (0..capacity)
.map(|i| Slot {
entry: UnsafeCell::new(None),
sequence: CacheLinePadded::new(AtomicU64::new(i as u64)),
})
.collect();
Self {
slots: slots.into_boxed_slice(),
capacity,
mask,
write_pos: CacheLinePadded::new(AtomicU64::new(0)),
read_pos: CacheLinePadded::new(AtomicU64::new(0)),
closed: AtomicBool::new(false),
backpressure,
}
}
pub fn with_default_capacity() -> Self {
Self::new(DEFAULT_RING_BUFFER_CAPACITY)
}
#[inline]
pub fn capacity(&self) -> usize {
self.capacity
}
#[inline]
pub fn is_closed(&self) -> bool {
self.closed.load(Ordering::Acquire)
}
pub fn close(&self) {
self.closed.store(true, Ordering::Release);
}
pub fn try_append(&self, entry: PendingEntry) -> Result<(), PendingEntry> {
if self.is_closed() {
return Err(entry);
}
let mut spin_count = 0u32;
let mut current_spin_limit = self.backpressure.initial_spins;
loop {
let pos = self.write_pos.load(Ordering::Relaxed);
let idx = (pos as usize) & self.mask;
let slot = &self.slots[idx];
let expected_seq = pos;
let current_seq = slot.sequence.load(Ordering::Acquire);
if current_seq == expected_seq {
match self.write_pos.compare_exchange_weak(
pos,
pos.wrapping_add(1),
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => {
unsafe {
*slot.entry.get() = Some(entry);
}
slot.sequence
.store(expected_seq.wrapping_add(1), Ordering::Release);
return Ok(());
}
Err(_) => {
continue;
}
}
} else {
let distance_behind = expected_seq.wrapping_sub(current_seq);
if distance_behind > 0 && distance_behind <= self.capacity as u64 {
spin_count += 1;
if spin_count >= current_spin_limit {
if current_spin_limit >= self.backpressure.max_spins {
return Err(entry);
}
current_spin_limit = (current_spin_limit.max(1).saturating_mul(2))
.min(self.backpressure.max_spins);
spin_count = 0;
}
std::hint::spin_loop();
} else {
continue;
}
}
}
}
pub fn append_blocking(&self, entry: PendingEntry) -> Result<(), PendingEntry> {
let mut current_entry = entry;
let mut sleep_us = self.backpressure.base_sleep_us;
loop {
match self.try_append(current_entry) {
Ok(()) => return Ok(()),
Err(e) => {
if self.is_closed() {
return Err(e);
}
if sleep_us > 0 {
std::thread::sleep(std::time::Duration::from_micros(sleep_us));
sleep_us = (sleep_us.saturating_mul(2)).min(self.backpressure.max_sleep_us);
} else {
std::thread::yield_now();
}
current_entry = e;
}
}
}
}
pub fn drain(&self) -> Vec<PendingEntry> {
let mut entries = Vec::new();
loop {
let pos = self.read_pos.load(Ordering::Relaxed);
let idx = (pos as usize) & self.mask;
let slot = &self.slots[idx];
let expected_seq = pos.wrapping_add(1);
let current_seq = slot.sequence.load(Ordering::Acquire);
if current_seq == expected_seq {
match self.read_pos.compare_exchange_weak(
pos,
pos.wrapping_add(1),
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => {
let entry = unsafe { (*slot.entry.get()).take() };
slot.sequence
.store(pos.wrapping_add(self.capacity as u64), Ordering::Release);
if let Some(e) = entry {
entries.push(e);
}
}
Err(_) => {
continue;
}
}
} else {
break;
}
}
entries
}
#[inline]
pub fn len_approx(&self) -> usize {
let write = self.write_pos.load(Ordering::Relaxed);
let read = self.read_pos.load(Ordering::Relaxed);
write.wrapping_sub(read) as usize
}
#[inline]
pub fn is_empty_approx(&self) -> bool {
self.len_approx() == 0
}
}
impl Drop for WalRingBuffer {
fn drop(&mut self) {
self.close();
let remaining = self.drain();
if !remaining.is_empty() {
#[cfg(debug_assertions)]
eprintln!(
"WalRingBuffer::drop: {} entries were not flushed",
remaining.len()
);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::AtomicUsize;
use std::thread;
#[test]
fn test_completion_notifier_success() {
let notifier = CompletionNotifier::new();
assert!(!notifier.is_complete());
notifier.notify_success();
assert!(notifier.is_complete());
assert!(notifier.wait().is_ok());
}
#[test]
fn test_backpressure_config_equal_spins() {
let config = BackpressureConfig {
initial_spins: 10,
max_spins: 10, base_sleep_us: 1,
max_sleep_us: 10,
};
let _ = WalRingBuffer::with_config(1024, config);
}
#[test]
fn test_completion_notifier_error() {
let notifier = CompletionNotifier::new();
notifier.notify_error("test error");
assert!(notifier.is_complete());
let result = notifier.wait();
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "test error");
}
#[test]
fn test_completion_handle_wait() {
let notifier = Arc::new(CompletionNotifier::new());
let handle = CompletionHandle(Arc::clone(¬ifier));
let notifier_clone = Arc::clone(¬ifier);
let t = thread::spawn(move || {
thread::sleep(std::time::Duration::from_millis(10));
notifier_clone.notify_success();
});
assert!(handle.wait().is_ok());
t.join().unwrap();
}
#[test]
fn test_ring_buffer_capacity_rounding() {
let buf = WalRingBuffer::new(100);
assert_eq!(buf.capacity(), 128);
let buf = WalRingBuffer::new(64);
assert_eq!(buf.capacity(), 64);
let buf = WalRingBuffer::new(1);
assert_eq!(buf.capacity(), 1);
}
#[test]
fn test_ring_buffer_single_thread() {
let buf = WalRingBuffer::new(16);
for i in 0..10 {
let entry = PendingEntry::new_async(LSN(i), vec![i as u8]);
assert!(buf.try_append(entry).is_ok());
}
assert_eq!(buf.len_approx(), 10);
let entries = buf.drain();
assert_eq!(entries.len(), 10);
for (i, entry) in entries.iter().enumerate() {
assert_eq!(entry.lsn, LSN(i as u64));
assert_eq!(entry.data, vec![i as u8]);
}
assert!(buf.is_empty_approx());
}
#[test]
fn test_ring_buffer_full() {
let buf = WalRingBuffer::new(4);
for i in 0..4 {
let entry = PendingEntry::new_async(LSN(i), vec![]);
assert!(buf.try_append(entry).is_ok());
}
let entry = PendingEntry::new_async(LSN(4), vec![]);
assert!(buf.try_append(entry).is_err());
}
#[test]
fn test_ring_buffer_closed() {
let buf = WalRingBuffer::new(16);
buf.close();
assert!(buf.is_closed());
let entry = PendingEntry::new_async(LSN(0), vec![]);
assert!(buf.try_append(entry).is_err());
}
#[test]
fn test_ring_buffer_drain_empty() {
let buf = WalRingBuffer::new(16);
let entries = buf.drain();
assert!(entries.is_empty());
}
#[test]
fn test_ring_buffer_concurrent_producers() {
let buf = Arc::new(WalRingBuffer::new(1024));
let num_threads = 8;
let entries_per_thread = 100;
let total_appended = Arc::new(AtomicUsize::new(0));
let handles: Vec<_> = (0..num_threads)
.map(|thread_id| {
let buf = Arc::clone(&buf);
let total = Arc::clone(&total_appended);
thread::spawn(move || {
for i in 0..entries_per_thread {
let lsn = LSN((thread_id * entries_per_thread + i) as u64);
let entry = PendingEntry::new_async(lsn, vec![thread_id as u8, i as u8]);
if buf.append_blocking(entry).is_ok() {
total.fetch_add(1, Ordering::Relaxed);
}
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
let entries = buf.drain();
let total = total_appended.load(Ordering::Relaxed);
assert_eq!(entries.len(), total);
assert_eq!(total, num_threads * entries_per_thread);
}
#[test]
fn test_pending_entry_sync_mode() {
let (entry, handle) = PendingEntry::new_sync(LSN(42), vec![1, 2, 3]);
assert_eq!(entry.lsn, LSN(42));
assert_eq!(entry.data, vec![1, 2, 3]);
assert!(entry.completion.is_some());
assert!(!handle.is_complete());
entry.notify_completion();
assert!(handle.is_complete());
}
#[test]
fn test_ring_buffer_slot_reuse() {
let buf = WalRingBuffer::new(4);
for i in 0..4 {
let entry = PendingEntry::new_async(LSN(i), vec![i as u8]);
assert!(buf.try_append(entry).is_ok());
}
let entries = buf.drain();
assert_eq!(entries.len(), 4);
for i in 4..8 {
let entry = PendingEntry::new_async(LSN(i), vec![i as u8]);
assert!(buf.try_append(entry).is_ok());
}
let entries = buf.drain();
assert_eq!(entries.len(), 4);
for (i, entry) in entries.iter().enumerate() {
assert_eq!(entry.lsn, LSN((i + 4) as u64));
}
}
#[test]
fn test_ring_buffer_many_cycles() {
let buf = WalRingBuffer::new(4);
for cycle in 0..1000u64 {
for i in 0..4 {
let lsn = LSN(cycle * 4 + i);
let entry = PendingEntry::new_async(lsn, vec![(lsn.0 % 256) as u8]);
assert!(
buf.try_append(entry).is_ok(),
"Failed at cycle {}, entry {}",
cycle,
i
);
}
let entries = buf.drain();
assert_eq!(entries.len(), 4, "Drain failed at cycle {}", cycle);
for (i, entry) in entries.iter().enumerate() {
assert_eq!(
entry.lsn,
LSN(cycle * 4 + i as u64),
"Wrong LSN at cycle {}, entry {}",
cycle,
i
);
}
}
}
#[test]
fn test_ring_buffer_interleaved_append_drain() {
let buf = WalRingBuffer::new(8);
for cycle in 0..10 {
for i in 0..3 {
let lsn = LSN((cycle * 3 + i) as u64);
let entry = PendingEntry::new_async(lsn, vec![]);
assert!(buf.try_append(entry).is_ok());
}
let entries = buf.drain();
assert_eq!(entries.len(), 3);
}
}
#[test]
fn test_backpressure_config_default() {
let config = BackpressureConfig::default();
assert_eq!(config.initial_spins, 10);
assert_eq!(config.max_spins, 1000);
assert_eq!(config.base_sleep_us, 1);
assert_eq!(config.max_sleep_us, 1000);
}
#[test]
fn test_backpressure_config_presets() {
let low_latency = BackpressureConfig::low_latency();
assert!(low_latency.initial_spins > BackpressureConfig::default().initial_spins);
assert!(low_latency.max_sleep_us < BackpressureConfig::default().max_sleep_us);
let high_throughput = BackpressureConfig::high_throughput();
assert!(high_throughput.initial_spins < BackpressureConfig::default().initial_spins);
assert!(high_throughput.max_sleep_us > BackpressureConfig::default().max_sleep_us);
}
#[test]
fn test_ring_buffer_with_custom_backpressure() {
let config = BackpressureConfig {
initial_spins: 5,
max_spins: 50,
base_sleep_us: 10,
max_sleep_us: 100,
};
let buf = WalRingBuffer::with_config(4, config);
let entry = PendingEntry::new_async(LSN(1), vec![1, 2, 3]);
assert!(buf.try_append(entry).is_ok());
let entries = buf.drain();
assert_eq!(entries.len(), 1);
}
#[test]
fn test_backpressure_exponential_spin() {
let config = BackpressureConfig {
initial_spins: 2,
max_spins: 8, base_sleep_us: 0,
max_sleep_us: 0,
};
let buf = WalRingBuffer::with_config(2, config);
buf.try_append(PendingEntry::new_async(LSN(1), vec![]))
.unwrap();
buf.try_append(PendingEntry::new_async(LSN(2), vec![]))
.unwrap();
let result = buf.try_append(PendingEntry::new_async(LSN(3), vec![]));
assert!(result.is_err());
}
#[test]
fn test_concurrent_append_and_drain() {
use std::sync::{Barrier, atomic::AtomicBool};
let buf = Arc::new(WalRingBuffer::new(16));
let num_producers = 4;
let items_per_producer = 1000;
let total_items = num_producers * items_per_producer;
let barrier = Arc::new(Barrier::new(num_producers + 1));
let producers_done = Arc::new(AtomicBool::new(false));
let mut handles = Vec::new();
for p in 0..num_producers {
let buf = Arc::clone(&buf);
let barrier = Arc::clone(&barrier);
handles.push(thread::spawn(move || {
barrier.wait();
for i in 0..items_per_producer {
let val = (p * items_per_producer + i) as u64;
let entry = PendingEntry::new_async(LSN(val), val.to_le_bytes().to_vec());
buf.append_blocking(entry).unwrap();
}
}));
}
let buf_clone = Arc::clone(&buf);
let barrier_clone = Arc::clone(&barrier);
let done_flag = Arc::clone(&producers_done);
let consumer_handle = thread::spawn(move || {
let mut drained_count = 0;
let mut checksum = 0u64;
barrier_clone.wait();
while drained_count < total_items {
let entries = buf_clone.drain();
if entries.is_empty() {
if done_flag.load(Ordering::Acquire) && drained_count < total_items {
}
thread::yield_now();
continue;
}
for entry in entries {
drained_count += 1;
let val = u64::from_le_bytes(entry.data.as_slice().try_into().unwrap());
checksum = checksum.wrapping_add(val);
}
}
(drained_count, checksum)
});
for h in handles {
h.join().unwrap();
}
producers_done.store(true, Ordering::Release);
let (drained, checksum) = consumer_handle.join().unwrap();
assert_eq!(drained, total_items);
let expected_checksum = (0..total_items as u64).fold(0u64, |sum, i| sum.wrapping_add(i));
assert_eq!(checksum, expected_checksum);
}
#[test]
fn test_drop_safety() {
let buf = WalRingBuffer::new(16);
for i in 0..5 {
buf.try_append(PendingEntry::new_async(LSN(i as u64), vec![]))
.unwrap();
}
assert_eq!(buf.len_approx(), 5);
drop(buf);
}
#[test]
fn test_completion_notifier_multiple_waiters() {
let notifier = Arc::new(CompletionNotifier::new());
let handle = CompletionHandle(Arc::clone(¬ifier));
let num_waiters = 10;
let mut handles = Vec::new();
for _ in 0..num_waiters {
let h = handle.clone();
handles.push(thread::spawn(move || {
h.wait().unwrap();
}));
}
thread::sleep(std::time::Duration::from_millis(10));
notifier.notify_success();
for h in handles {
h.join().unwrap();
}
}
#[test]
fn test_ring_buffer_getters() {
let buf = WalRingBuffer::new(1024);
assert_eq!(buf.capacity(), 1024);
assert!(!buf.is_closed());
buf.close();
assert!(buf.is_closed());
}
#[cfg(test)]
impl WalRingBuffer {
pub fn set_state_for_wraparound_test(&mut self, write_pos: u64, read_pos: u64) {
self.write_pos.store(write_pos, Ordering::Relaxed);
self.read_pos.store(read_pos, Ordering::Relaxed);
for i in 0..self.capacity {
let slot_pos = write_pos.wrapping_add(i as u64);
let slot_idx = (slot_pos % self.capacity as u64) as usize;
self.slots[slot_idx]
.sequence
.store(slot_pos, Ordering::Relaxed);
unsafe {
*self.slots[slot_idx].entry.get() = None;
}
}
}
}
#[test]
fn test_wraparound_append_no_panic() {
let mut buf = WalRingBuffer::new(4);
let start_pos = u64::MAX - 2;
buf.set_state_for_wraparound_test(start_pos, start_pos);
for i in 0..4 {
let lsn = LSN(i);
let entry = PendingEntry::new_async(lsn, vec![]);
buf.try_append(entry)
.expect("Append should not panic near wraparound");
}
let expected_pos = start_pos.wrapping_add(4);
assert_eq!(buf.write_pos.load(Ordering::Relaxed), expected_pos);
}
#[test]
fn test_wraparound_drain_no_panic() {
let mut buf = WalRingBuffer::new(4);
let start_pos = u64::MAX - 2;
buf.set_state_for_wraparound_test(start_pos, start_pos);
for i in 0..4 {
let lsn = LSN(i);
let entry = PendingEntry::new_async(lsn, vec![]);
buf.try_append(entry).expect("Should append");
}
let entries = buf.drain();
assert_eq!(entries.len(), 4, "Should drain all entries near wraparound");
let expected_pos = start_pos.wrapping_add(4);
assert_eq!(buf.read_pos.load(Ordering::Relaxed), expected_pos);
}
#[test]
fn test_wraparound_logic() {
let capacity = 4;
let mut buf = WalRingBuffer::new(capacity);
let start_pos = u64::MAX - (capacity as u64 / 2);
buf.set_state_for_wraparound_test(start_pos, start_pos);
for i in 0..capacity {
let lsn = LSN(i as u64);
let entry = PendingEntry::new_async(lsn, vec![i as u8]);
assert!(
buf.try_append(entry).is_ok(),
"Append should succeed when buffer has space"
);
}
let full_entry = PendingEntry::new_async(LSN(capacity as u64), vec![]);
assert!(
buf.try_append(full_entry).is_err(),
"Append should fail when buffer is full across wraparound"
);
let entries = buf.drain();
assert_eq!(
entries.len(),
capacity,
"Should drain all appended entries across wraparound"
);
for (i, entry) in entries.iter().enumerate() {
assert_eq!(entry.data, vec![i as u8]);
}
assert!(
buf.drain().is_empty(),
"Buffer should be empty after draining"
);
let final_entry = PendingEntry::new_async(LSN(100), vec![100]);
assert!(buf.try_append(final_entry).is_ok());
let final_drained = buf.drain();
assert_eq!(final_drained.len(), 1);
assert_eq!(final_drained[0].data, vec![100]);
}
#[test]
fn test_havoc_ring_buffer_len_approx_wraparound() {
let capacity = 4;
let mut buf = WalRingBuffer::new(capacity);
let start_pos = u64::MAX - 2;
buf.set_state_for_wraparound_test(start_pos, start_pos);
for i in 0..3 {
buf.try_append(PendingEntry::new_async(LSN(i), vec![]))
.unwrap();
}
let len = buf.len_approx();
assert_eq!(
len, 3,
"👺 HAVOC SUCCESS: len_approx() failed on wraparound! Expected 3, got {}",
len
);
}
#[test]
#[should_panic(expected = "Ring buffer capacity must be > 0")]
fn test_ring_buffer_zero_capacity() {
let _ = WalRingBuffer::new(0);
}
#[test]
#[should_panic(expected = "Invalid BackpressureConfig: \"max_spins must be >= initial_spins\"")]
fn test_backpressure_invalid_config() {
let config = BackpressureConfig {
initial_spins: 100,
max_spins: 10, base_sleep_us: 1,
max_sleep_us: 10,
};
let _ = WalRingBuffer::with_config(1024, config);
}
#[test]
#[should_panic(
expected = "Invalid BackpressureConfig: \"initial_spins must be > 0 to prevent infinite spin loops\""
)]
fn test_backpressure_zero_initial_spins() {
let config = BackpressureConfig {
initial_spins: 0, max_spins: 10,
base_sleep_us: 1,
max_sleep_us: 10,
};
let _ = WalRingBuffer::with_config(1024, config);
}
}
#[cfg(test)]
mod sentry_tests {
use super::*;
#[test]
fn test_sentry_dropped_entry_notifies_error() {
let (entry, handle) = PendingEntry::new_sync(LSN(100), vec![]);
assert!(!handle.is_complete(), "Handle should be pending initially");
drop(entry);
assert!(
handle.is_complete(),
"Handle should be complete (error) after entry drop"
);
let result = handle.wait();
assert!(result.is_err(), "Handle should return error");
let err = result.unwrap_err();
assert!(
err.contains("PendingEntry dropped"),
"Error should mention dropped entry"
);
}
#[test]
fn test_buffer_drop_notifies_waiters() {
let buf = WalRingBuffer::new(16);
let (entry, handle) = PendingEntry::new_sync(LSN(100), vec![]);
buf.try_append(entry).expect("Append should succeed");
assert!(!handle.is_complete());
drop(buf);
let result = handle.wait();
assert!(
result.is_err(),
"Waiter should be notified of error on buffer drop"
);
let err = result.unwrap_err();
assert!(
err.contains("PendingEntry dropped"),
"Error should identify that entry was dropped"
);
}
#[test]
fn test_drain_stops_at_gap() {
let mut buf = WalRingBuffer::new(4);
buf.set_state_for_wraparound_test(2, 0);
buf.slots[0].sequence.store(0, Ordering::Relaxed);
let (entry0, _) = PendingEntry::new_sync(LSN(0), vec![0]);
unsafe {
*buf.slots[0].entry.get() = Some(entry0);
}
buf.slots[1].sequence.store(2, Ordering::Relaxed);
let (entry1, _) = PendingEntry::new_sync(LSN(1), vec![1]);
unsafe {
*buf.slots[1].entry.get() = Some(entry1);
}
let drained = buf.drain();
assert!(
drained.is_empty(),
"Drain must stop at gap (slot 0) even if subsequent slots (slot 1) are ready"
);
buf.slots[0].sequence.store(1, Ordering::Relaxed);
let drained_retry = buf.drain();
assert_eq!(
drained_retry.len(),
2,
"Should drain both entries after gap is filled"
);
assert_eq!(drained_retry[0].lsn, LSN(0));
assert_eq!(drained_retry[1].lsn, LSN(1));
}
#[test]
fn test_wraparound_boundary_check() {
let capacity = 4;
let mut buf = WalRingBuffer::new(capacity);
let boundary = u64::MAX;
buf.set_state_for_wraparound_test(boundary, boundary);
let slot_idx = (boundary % capacity as u64) as usize;
let busy_seq = boundary.wrapping_sub(capacity as u64).wrapping_add(1);
buf.slots[slot_idx]
.sequence
.store(busy_seq, Ordering::Relaxed);
let entry_fail = PendingEntry::new_async(LSN(1), vec![]);
let result = buf.try_append(entry_fail);
assert!(
result.is_err(),
"Should return error when buffer is full at boundary"
);
buf.slots[slot_idx]
.sequence
.store(boundary, Ordering::Relaxed);
let entry_ok = PendingEntry::new_async(LSN(1), vec![]);
assert!(
buf.try_append(entry_ok).is_ok(),
"Should append successfully when slot is available at boundary"
);
assert_eq!(buf.write_pos.load(Ordering::Relaxed), 0);
let seq = buf.slots[slot_idx].sequence.load(Ordering::Relaxed);
assert_eq!(seq, 0, "Sequence should wrap to 0 after write at boundary");
}
}