use std::{
    cell::UnsafeCell,
    pin::Pin,
    sync::{
        atomic::{AtomicPtr, AtomicUsize, Ordering},
        Arc,
    },
};
use io_uring::squeue::Entry;
use crate::{flush_behaviour::FlushBehavior, log_structured_store::FOUR_KB_PAGE};
use std::alloc::{alloc_zeroed, Layout};
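/// A page-aligned, zero-initialized heap allocation. The 4 KiB alignment
/// (via `FOUR_KB_PAGE`) is what direct-I/O-style submissions through
/// io_uring typically require of their buffers.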
#[derive(Debug)]
pub struct Buffer {
pub(crate) buffer: UnsafeCell<*mut u8>,
size: usize,
}
impl Buffer {
pub fn new_aligned(size: usize) -> Self {
let layout = Layout::from_size_align(size, FOUR_KB_PAGE).expect("invalid layout");
let ptr = unsafe { alloc_zeroed(layout) };
assert!(!ptr.is_null(), "aligned allocation failed");
Self {
buffer: UnsafeCell::new(ptr),
size,
}
}
}
impl Drop for Buffer {
fn drop(&mut self) {
let layout = Layout::from_size_align(self.size, FOUR_KB_PAGE).unwrap();
unsafe { std::alloc::dealloc(*self.buffer.get(), layout) };
}
}
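// SAFETY (reconstructed rationale): the raw pointer is only dereferenced
// through offsets handed out by `FlushBuffer::reserve_space`, which are
// disjoint, so concurrent writers touch non-overlapping byte ranges.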
unsafe impl Send for Buffer {}
unsafe impl Sync for Buffer {}
pub(crate) type SharedBuffer = Arc<Buffer>;
const SEALED_BIT: usize = 1 << 0;
const FLUSH_IN_PROGRESS_BIT: usize = 1 << 1;
const WRITER_SHIFT: usize = 8;
const WRITER_ONE: usize = 1 << WRITER_SHIFT;
const WRITER_MASK: usize = 0xFFFF_FF00;
const OFFSET_SHIFT: usize = 32;
const OFFSET_ONE: usize = 1 << OFFSET_SHIFT;
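// Packed layout of `FlushBuffer::state` (assuming a 64-bit usize):
//
//   63 ......... 32 | 31 ......... 8 | 7..2 | 1 FLUSH | 0 SEALED
//     write offset  |  writer count  | rsvd |
//
// For example, three in-flight writers at offset 256 with both flag bits
// clear encode as (256 << OFFSET_SHIFT) | (3 << WRITER_SHIFT).
// Compile-time sanity check (a defensive addition, not in the original):
const _: () = assert!(WRITER_MASK == (1usize << OFFSET_SHIFT) - (1usize << WRITER_SHIFT));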
pub const RING_SIZE: usize = 4;
#[inline(always)]
fn state_offset(state: usize) -> usize {
state >> OFFSET_SHIFT
}
#[inline(always)]
fn state_writers(state: usize) -> usize {
(state & WRITER_MASK) >> WRITER_SHIFT
}
#[inline(always)]
fn state_sealed(state: usize) -> bool {
state & SEALED_BIT != 0
}
#[inline(always)]
fn state_flush_in_progress(state: usize) -> bool {
state & FLUSH_IN_PROGRESS_BIT != 0
}
#[derive(Debug, Clone, Copy)]
pub enum BufferError {
InsufficientSpace,
EncounteredSealedBuffer,
EncounteredSealedBufferDuringCOMPEX,
EncounteredUnSealedBufferDuringCOMPEX,
ActiveUsers,
InvalidState,
RingExhausted,
FailedReservation,
FailedUnsealed,
}
#[derive(Debug, Clone)]
pub enum BufferMsg {
SealedBuffer,
SuccessfullWrite,
SuccessfullWriteFlush,
FreeToFlush(Arc<FlushBuffer>),
}
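/// One slot of the flush ring: a page-aligned buffer plus a single packed
/// atomic word carrying the write offset, the count of writers currently
/// copying into the buffer, and the sealed/flushing flags (see the layout
/// sketch above the constants).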
#[derive(Debug)]
pub struct FlushBuffer {
state: AtomicUsize,
pub(crate) buf: SharedBuffer,
pub(crate) pos: usize,
pub(crate) local_lss_address_slot: AtomicUsize,
pub(crate) submit_queue_entry: UnsafeCell<Option<Entry>>,
}
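// SAFETY (reconstructed rationale): `submit_queue_entry` is only written by
// the single thread that wins FLUSH_IN_PROGRESS_BIT; everything else is
// either immutable after construction or accessed atomically.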
unsafe impl Send for FlushBuffer {}
unsafe impl Sync for FlushBuffer {}
impl FlushBuffer {
pub fn new_buffer(buffer_number: usize, size: usize) -> FlushBuffer {
Self {
state: AtomicUsize::new(0),
buf: Arc::new(Buffer::new_aligned(size)),
pos: buffer_number,
local_lss_address_slot: AtomicUsize::new(buffer_number),
submit_queue_entry: UnsafeCell::new(None),
}
}
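    // Note: `&mut self` gives exclusive access, so the compare-exchange
    // below cannot lose a race; it is effectively a store that also
    // reports the previous slot value.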
pub fn set_new_address_space_range(&mut self, address_space: usize) -> Result<usize, usize> {
let range = self.local_lss_address_slot.load(Ordering::Relaxed);
self.local_lss_address_slot.compare_exchange(
range,
address_space,
Ordering::Acquire,
Ordering::Relaxed,
)
}
pub fn is_available(&self) -> bool {
self.state.load(Ordering::Acquire) & (SEALED_BIT | FLUSH_IN_PROGRESS_BIT) == 0
}
    pub fn reserve_space(&self, payload_size: usize) -> Result<usize, BufferError> {
        assert!(payload_size <= self.buf.size, "payload larger than buffer");
        let state = self.state.load(Ordering::Acquire);
        if state & (SEALED_BIT | FLUSH_IN_PROGRESS_BIT) != 0 {
            return Err(BufferError::EncounteredSealedBuffer);
        }
        let offset = state_offset(state);
        if offset + payload_size > self.buf.size {
            return Err(BufferError::InsufficientSpace);
        }
let new = state
.wrapping_add(payload_size * OFFSET_ONE)
.wrapping_add(WRITER_ONE);
match self
.state
.compare_exchange(state, new, Ordering::AcqRel, Ordering::Acquire)
{
Ok(_) => Ok(offset),
Err(_) => Err(BufferError::FailedReservation),
}
}
#[inline]
pub fn decrement_writers(&self) -> usize {
self.state.fetch_sub(WRITER_ONE, Ordering::AcqRel)
}
#[inline]
pub fn increment_writers(&self) -> usize {
self.state.fetch_add(WRITER_ONE, Ordering::AcqRel)
}
#[inline]
pub fn set_flush_in_progress(&self) -> usize {
self.state.fetch_or(FLUSH_IN_PROGRESS_BIT, Ordering::AcqRel)
}
#[inline]
pub fn clear_flush_in_progress(&self) -> usize {
self.state
.fetch_and(!FLUSH_IN_PROGRESS_BIT, Ordering::AcqRel)
}
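    /// Copies `payload` into the buffer at `offset`. Concurrent calls are
    /// sound as long as each caller uses an offset obtained from its own
    /// `reserve_space` call, since reserved ranges never overlap.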
pub fn write(&self, offset: usize, payload: &[u8]) {
debug_assert!(offset + payload.len() <= self.buf.size);
unsafe {
let dst = (*self.buf.buffer.get()).add(offset);
std::ptr::copy_nonoverlapping(payload.as_ptr(), dst, payload.len());
}
}
pub fn set_sealed_bit_true(&self) -> Result<(), BufferError> {
let prev = self.state.fetch_or(SEALED_BIT, Ordering::AcqRel);
if state_sealed(prev) {
Err(BufferError::EncounteredSealedBufferDuringCOMPEX)
} else {
Ok(())
}
}
#[allow(unused)]
pub(crate) fn set_sealed_bit_false(&self) -> Result<(), BufferError> {
let current = self.state.load(Ordering::Acquire);
if state_writers(current) != 0 || state_flush_in_progress(current) {
return Err(BufferError::ActiveUsers);
}
if !state_sealed(current) {
return Err(BufferError::EncounteredUnSealedBufferDuringCOMPEX);
}
match self.state.compare_exchange(
current,
current & !SEALED_BIT,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => Ok(()),
Err(_) => Err(BufferError::FailedUnsealed),
}
}
pub fn reset_offset(&self) {
loop {
let current = self.state.load(Ordering::Acquire);
            // Clear only the offset field; flags and writer count survive.
            let zeroed = current & !(usize::MAX << OFFSET_SHIFT);
if self
.state
.compare_exchange(current, zeroed, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
break;
}
}
}
#[cfg(test)]
pub(crate) fn state_snapshot(&self) -> usize {
self.state.load(Ordering::Acquire)
}
}
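/// A fixed ring of flush buffers. Writers append to `current_buffer`;
/// when it fills, the sealing writer rotates `current_buffer` to the next
/// available slot and the full buffer is flushed (or, with no flusher
/// attached, simply reset).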
pub struct FlushBufferRing {
pub current_buffer: AtomicPtr<FlushBuffer>,
ring: Pin<Box<[Arc<FlushBuffer>]>>,
next_index: AtomicUsize,
pub next_address_range: AtomicUsize,
_size: usize,
store: Option<Arc<FlushBehavior>>,
}
impl FlushBufferRing {
pub fn with_buffer_amount(num_of_buffer: usize, buffer_size: usize) -> FlushBufferRing {
let buffers: Vec<Arc<FlushBuffer>> = (0..num_of_buffer)
.map(|i| Arc::new(FlushBuffer::new_buffer(i, buffer_size)))
.collect();
let buffers = Pin::new(buffers.into_boxed_slice());
let current = &*buffers[0] as *const FlushBuffer as *mut FlushBuffer;
FlushBufferRing {
current_buffer: AtomicPtr::new(current),
ring: buffers,
next_index: AtomicUsize::new(1),
_size: num_of_buffer,
            // Buffers 0..num_of_buffer pre-own the first address slots,
            // so fresh slots start at num_of_buffer.
            next_address_range: AtomicUsize::new(num_of_buffer),
store: None,
}
}
    pub fn with_flusher(
        num_of_buffer: usize,
        buffer_size: usize,
        flusher: Arc<FlushBehavior>,
    ) -> FlushBufferRing {
        // Identical to `with_buffer_amount`, plus an attached flusher.
        let mut ring = Self::with_buffer_amount(num_of_buffer, buffer_size);
        ring.store = Some(flusher);
        ring
    }
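    // Contract, as reconstructed from the call sites in the tests below:
    // the caller loads `current_buffer`, calls `reserve_space` on it, and
    // forwards the result here. On `InsufficientSpace` this method seals
    // and rotates, and the caller must retry the payload on the new
    // current buffer (signalled by `BufferMsg::SealedBuffer`).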
pub fn put(
&self,
current: &FlushBuffer,
reserve_result: Result<usize, BufferError>,
payload: &[u8],
) -> Result<BufferMsg, BufferError> {
match reserve_result {
            Err(BufferError::InsufficientSpace) => {
                let prev = current.state.fetch_or(SEALED_BIT, Ordering::AcqRel);
                if prev & SEALED_BIT != 0 {
                    return Err(BufferError::EncounteredSealedBuffer);
                }
                let slot = self.next_address_range.fetch_add(1, Ordering::AcqRel);
                current
                    .local_lss_address_slot
                    .store(slot, Ordering::Release);
                let rotated = self.rotate_after_seal(current.pos);
                // Flush here only when no writer is still copying into the
                // buffer; otherwise the last departing writer triggers the
                // flush (see the Ok arm below).
                if state_writers(prev) == 0 {
                    let before = current.set_flush_in_progress();
                    if before & FLUSH_IN_PROGRESS_BIT == 0 {
                        match self.store.as_ref() {
                            Some(store) => {
                                let _ = store.submit_buffer(current);
                            }
                            None => {
                                self.reset_buffer(current);
                            }
                        }
                    }
                }
                rotated?;
                // The sealer's payload was never reserved or written, so the
                // caller must retry it on the newly rotated buffer.
                return Ok(BufferMsg::SealedBuffer);
            }
Err(BufferError::EncounteredSealedBuffer) => {
return Err(BufferError::EncounteredSealedBuffer);
}
Err(e) => return Err(e),
            Ok(offset) => {
                current.write(offset, payload);
                let prev = current.decrement_writers();
                let was_last_writer = state_writers(prev) == 1;
                let was_sealed = state_sealed(prev);
                if was_last_writer && was_sealed {
                    let prev = current.set_flush_in_progress();
                    if prev & FLUSH_IN_PROGRESS_BIT == 0 {
                        // `current` already borrows the sealed ring slot,
                        // so hand it to the flusher directly.
                        self.flush(current);
                        return Ok(BufferMsg::SuccessfullWriteFlush);
                    }
                }
                return Ok(BufferMsg::SuccessfullWrite);
            }
}
}
pub fn rotate_after_seal(&self, sealed_pos: usize) -> Result<(), BufferError> {
let current = self.current_buffer.load(Ordering::Acquire);
let current_ref = unsafe { current.as_ref().ok_or(BufferError::InvalidState)? };
if current_ref.pos != sealed_pos {
return Ok(());
}
let ring_len = self.ring.len();
for _ in 0..ring_len {
let raw = self.next_index.fetch_add(1, Ordering::AcqRel);
let next_index = raw % ring_len;
let new_buffer = &self.ring[next_index];
if new_buffer.is_available() {
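                // A lost compare-exchange just means another thread
                // installed a new current buffer first; rotation has
                // happened either way, so report success.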
let _ = self.current_buffer.compare_exchange(
current,
Arc::as_ptr(new_buffer) as *const FlushBuffer as *mut FlushBuffer,
Ordering::AcqRel,
Ordering::Acquire,
);
return Ok(());
}
}
Err(BufferError::RingExhausted)
}
pub(crate) fn flush(&self, buffer: &FlushBuffer) {
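        // Idempotent: callers may already have set the bit before handing
        // the buffer over.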
buffer.set_flush_in_progress();
match self.store.as_ref() {
Some(store) => {
let _ = store.submit_buffer(buffer);
}
None => {
self.reset_buffer(buffer);
}
}
}
pub fn reset_buffer(&self, buffer: &FlushBuffer) {
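        // Clears the flags and the offset field but preserves the
        // writer-count bits, which still belong to departing writers.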
loop {
let flushed_buffer_state = buffer.state.load(Ordering::Acquire);
const OFFSET_MASK: usize = usize::MAX << OFFSET_SHIFT;
let reset = flushed_buffer_state & !(SEALED_BIT | FLUSH_IN_PROGRESS_BIT | OFFSET_MASK);
if buffer
.state
.compare_exchange(
flushed_buffer_state,
reset,
Ordering::AcqRel,
Ordering::Acquire,
)
.is_ok()
{
break;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::{
collections::HashSet,
sync::{Arc, Barrier, Mutex},
thread,
time::Instant,
};
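    // Deterministic PRNG using Knuth's MMIX LCG constants, so stress runs
    // are reproducible without pulling in an external crate.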
struct Lcg {
state: u64,
}
impl Lcg {
fn new(seed: u64) -> Self {
Self { state: seed }
}
fn next_usize(&mut self, bound: usize) -> usize {
self.state = self
.state
.wrapping_mul(6364136223846793005)
.wrapping_add(1442695040888963407);
((self.state >> 33) as usize) % bound
}
}
const TEST_RING_SIZE: usize = 4;
const OPS_PER_THREAD: usize = 2_000;
const SIZES: &[usize] = &[
1, 2, 4, 7, 8, 15, 16, 32, 64, 100, 128, 200, 256, 512, 1024, 2048, 4090, 4095, 4096,
];
fn make_payload(tag: &str, size: usize) -> Vec<u8> {
let meta = format!("[{tag}:{size}]");
let mut buf = vec![0xAA_u8; size];
let n = meta.len().min(size);
buf[..n].copy_from_slice(&meta.as_bytes()[..n]);
buf
}
fn put_with_retry(ring: &FlushBufferRing, payload: &[u8]) -> Result<BufferMsg, BufferError> {
loop {
let current = unsafe {
ring.current_buffer
.load(Ordering::Acquire)
.as_ref()
.ok_or(BufferError::InvalidState)?
};
let reserve_result = current.reserve_space(payload.len());
match &reserve_result {
Err(BufferError::FailedReservation) => continue,
Err(BufferError::EncounteredSealedBuffer) => continue,
_ => {}
}
            match ring.put(current, reserve_result, payload) {
                Err(BufferError::ActiveUsers) => continue,
                Err(BufferError::EncounteredSealedBuffer) => {
                    std::thread::yield_now();
                    continue;
                }
                Err(BufferError::RingExhausted) => {
                    std::thread::yield_now();
                    continue;
                }
                // The sealer's own payload was never written; retry it on
                // the buffer that rotation just installed.
                Ok(BufferMsg::SealedBuffer) => continue,
                other => return other,
            }
}
}
#[test]
fn reserve_on_sealed_buffer_returns_error() {
let buf = FlushBuffer::new_buffer(0, FOUR_KB_PAGE);
buf.set_sealed_bit_true().unwrap();
assert!(matches!(
buf.reserve_space(16),
Err(BufferError::EncounteredSealedBuffer)
));
}
#[test]
fn double_seal_returns_error() {
let buf = FlushBuffer::new_buffer(0, FOUR_KB_PAGE);
buf.set_sealed_bit_true().unwrap();
assert!(matches!(
buf.set_sealed_bit_true(),
Err(BufferError::EncounteredSealedBufferDuringCOMPEX)
));
}
#[test]
fn unseal_unsealed_returns_error() {
let buf = FlushBuffer::new_buffer(0, FOUR_KB_PAGE);
assert!(matches!(
buf.set_sealed_bit_false(),
Err(BufferError::EncounteredUnSealedBufferDuringCOMPEX)
));
}
#[test]
fn reserve_on_flush_in_progress_returns_error() {
let buf = FlushBuffer::new_buffer(0, FOUR_KB_PAGE);
buf.set_flush_in_progress();
assert!(matches!(
buf.reserve_space(16),
Err(BufferError::EncounteredSealedBuffer)
));
}
#[test]
fn writer_count_symmetric() {
let buf = FlushBuffer::new_buffer(0, FOUR_KB_PAGE);
buf.increment_writers();
buf.increment_writers();
buf.increment_writers();
assert_eq!(state_writers(buf.state_snapshot()), 3);
buf.decrement_writers();
buf.decrement_writers();
buf.decrement_writers();
assert_eq!(state_writers(buf.state_snapshot()), 0);
}
#[test]
fn reserve_exact_capacity() {
let buf = FlushBuffer::new_buffer(0, FOUR_KB_PAGE);
let offset = buf.reserve_space(FOUR_KB_PAGE).unwrap();
assert_eq!(offset, 0);
assert!(matches!(
buf.reserve_space(1),
Err(BufferError::InsufficientSpace)
));
}
#[test]
fn sequential_reservations_no_overlap() {
let buf = FlushBuffer::new_buffer(0, FOUR_KB_PAGE);
let a = buf.reserve_space(100).unwrap();
let b = buf.reserve_space(100).unwrap();
assert_eq!(a, 0);
assert_eq!(b, 100);
}
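    // A minimal sketch of the single-writer protocol implied by `put`:
    // reserve, copy, then drop the writer count. The call sequence here is
    // an assumption drawn from how `put` drives the same API.
    #[test]
    fn writer_protocol_round_trip() {
        let buf = FlushBuffer::new_buffer(0, FOUR_KB_PAGE);
        let payload = b"hello";
        let offset = buf.reserve_space(payload.len()).unwrap();
        buf.write(offset, payload);
        buf.decrement_writers();
        let state = buf.state_snapshot();
        assert_eq!(state_offset(state), payload.len());
        assert_eq!(state_writers(state), 0);
        assert!(!state_sealed(state));
    }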
#[test]
fn concurrent_reserve_space_no_overlap() {
let buf = Arc::new(FlushBuffer::new_buffer(99, FOUR_KB_PAGE));
let seen: Arc<Mutex<HashSet<usize>>> = Arc::new(Mutex::new(HashSet::new()));
const THREADS: usize = 8;
const RESERVES_PER_THREAD: usize = 32;
let barrier = Arc::new(Barrier::new(THREADS));
let handles: Vec<_> = (0..THREADS)
.map(|_tid| {
let buf = Arc::clone(&buf);
let seen = Arc::clone(&seen);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
for _ in 0..RESERVES_PER_THREAD {
loop {
match buf.reserve_space(16) {
Ok(offset) => {
let mut lock = seen.lock().unwrap();
assert!(
lock.insert(offset),
"[OVERLAP] offset {offset} issued twice!"
);
break;
}
Err(BufferError::FailedReservation) => continue,
Err(BufferError::InsufficientSpace) => break,
Err(BufferError::EncounteredSealedBuffer) => break,
Err(e) => panic!("unexpected error: {e:?}"),
}
}
}
})
})
.collect();
for h in handles {
h.join().expect("reserve worker panicked");
}
let lock = seen.lock().unwrap();
assert_eq!(
lock.len(),
THREADS * RESERVES_PER_THREAD,
"expected {} unique offsets, got {}",
THREADS * RESERVES_PER_THREAD,
lock.len()
);
}
#[test]
fn exact_fill_triggers_rotate() {
let ring = FlushBufferRing::with_buffer_amount(TEST_RING_SIZE, FOUR_KB_PAGE);
let payload = make_payload("FILL", FOUR_KB_PAGE);
match put_with_retry(&ring, &payload) {
Ok(BufferMsg::SuccessfullWrite) | Ok(BufferMsg::SuccessfullWriteFlush) => {}
other => panic!("exact_fill: unexpected {other:?}"),
}
let result = put_with_retry(&ring, &make_payload("AFTER", 16));
assert!(
result.is_ok(),
"ring should still accept writes after rotate: {result:?}"
);
}
#[test]
fn manual_seal_causes_rotate() {
let ring = FlushBufferRing::with_buffer_amount(TEST_RING_SIZE, FOUR_KB_PAGE);
let current_before = unsafe {
ring.current_buffer
.load(Ordering::Acquire)
.as_ref()
.unwrap()
};
let pos_before = current_before.pos;
current_before.set_sealed_bit_true().unwrap();
ring.rotate_after_seal(pos_before).unwrap();
let current_after = unsafe {
ring.current_buffer
.load(Ordering::Acquire)
.as_ref()
.unwrap()
};
assert_ne!(
current_after.pos, pos_before,
"current_buffer should have rotated away from sealed buffer"
);
}
#[test]
fn ring_exhaustion_returns_error() {
let ring = FlushBufferRing::with_buffer_amount(TEST_RING_SIZE, FOUR_KB_PAGE);
for i in 0..TEST_RING_SIZE {
ring.ring[i].set_sealed_bit_true().ok();
}
let result = ring.rotate_after_seal(0);
assert!(
matches!(result, Err(BufferError::RingExhausted)),
"expected RingExhausted, got {result:?}"
);
}
#[test]
fn single_threaded_offset_uniqueness() {
let ring = FlushBufferRing::with_buffer_amount(TEST_RING_SIZE, FOUR_KB_PAGE);
let mut rng = Lcg::new(0);
let mut writes = 0usize;
let mut flushes = 0usize;
let mut data_written = 0usize;
let mut i = 0usize;
loop {
let size = SIZES[rng.next_usize(SIZES.len())];
if data_written + size > FOUR_KB_PAGE * TEST_RING_SIZE {
break;
}
let payload = make_payload(&format!("s{i:05}"), size);
data_written += size;
match put_with_retry(&ring, &payload) {
Ok(BufferMsg::SuccessfullWrite) => writes += 1,
Ok(BufferMsg::SuccessfullWriteFlush) => {
writes += 1;
flushes += 1;
}
other => panic!("single_threaded: unexpected {other:?}"),
}
i += 1;
}
println!(
"single_threaded_offset_uniqueness: {writes} writes, {flushes} flushes, {data_written} bytes"
);
}
#[test]
fn single_threaded_stress() {
let ring = FlushBufferRing::with_buffer_amount(TEST_RING_SIZE, FOUR_KB_PAGE);
let mut writes = 0usize;
let mut flushes = 0usize;
let mut rng = Lcg::new(0x1234_5678);
let start = Instant::now();
for op in 0..OPS_PER_THREAD {
let size = SIZES[rng.next_usize(SIZES.len())];
let payload = make_payload(&format!("S:O{op:04}"), size);
match put_with_retry(&ring, &payload) {
Ok(BufferMsg::SuccessfullWrite) => writes += 1,
Ok(BufferMsg::SuccessfullWriteFlush) => {
writes += 1;
flushes += 1;
}
other => panic!("op {op}: unexpected {other:?}"),
}
}
let elapsed = start.elapsed();
println!(
"single_threaded_stress: {writes} writes, {flushes} flushes in {elapsed:.2?} ({:.0} ops/s)",
(writes + flushes) as f64 / elapsed.as_secs_f64()
);
}
const NUM_THREADS_SMALL: usize = 2;
const NUM_THREADS_MEDIUM: usize = 4;
const NUM_THREADS_LARGE: usize = 8;
#[test]
fn multi_threaded_test_small() {
multi_threaded_stress_helper(NUM_THREADS_SMALL);
}
#[test]
fn multi_threaded_test_medium() {
multi_threaded_stress_helper(NUM_THREADS_MEDIUM);
}
#[test]
fn multi_threaded_test_large() {
multi_threaded_stress_helper(NUM_THREADS_LARGE);
}
fn multi_threaded_stress_helper(num_threads: usize) {
let ring = Arc::new(FlushBufferRing::with_buffer_amount(
TEST_RING_SIZE,
FOUR_KB_PAGE,
));
let barrier = Arc::new(Barrier::new(num_threads));
let total_writes = Arc::new(AtomicUsize::new(0));
let total_flushes = Arc::new(AtomicUsize::new(0));
let start_times = Arc::new(Mutex::new(Vec::new()));
let handles: Vec<thread::JoinHandle<()>> = (0..num_threads)
.map(|tid| {
let ring = Arc::clone(&ring);
let barrier = Arc::clone(&barrier);
let total_writes = Arc::clone(&total_writes);
let total_flushes = Arc::clone(&total_flushes);
let start_times = Arc::clone(&start_times);
let seed = 0x1234_5678_u64
.wrapping_add(tid as u64)
.wrapping_mul(0xDEAD_CAFE);
thread::spawn(move || {
let mut rng = Lcg::new(seed);
let mut local_writes = 0usize;
let mut local_flushes = 0usize;
barrier.wait();
start_times.lock().unwrap().push(Instant::now());
for op in 0..OPS_PER_THREAD {
let size = SIZES[rng.next_usize(SIZES.len())];
let payload = make_payload(&format!("T{tid}:O{op:04}"), size);
let result = loop {
let current = unsafe {
ring.current_buffer
.load(Ordering::Acquire)
.as_ref()
.expect("null current_buffer")
};
let reserve_result = current.reserve_space(payload.len());
match &reserve_result {
Err(BufferError::FailedReservation) => continue,
Err(BufferError::EncounteredSealedBuffer) => continue,
_ => {}
}
match ring.put(current, reserve_result, &payload) {
Err(BufferError::ActiveUsers) => continue,
Err(BufferError::EncounteredSealedBuffer) => continue,
Err(BufferError::RingExhausted) => {
std::thread::yield_now();
continue;
}
Ok(BufferMsg::SealedBuffer) => continue,
other => break other,
}
};
match result {
Ok(BufferMsg::SuccessfullWrite) => local_writes += 1,
Ok(BufferMsg::SuccessfullWriteFlush) => {
local_writes += 1;
local_flushes += 1;
}
other => panic!("thread {tid} op {op}: unexpected {other:?}"),
}
}
total_writes.fetch_add(local_writes, Ordering::Relaxed);
total_flushes.fetch_add(local_flushes, Ordering::Relaxed);
})
})
.collect();
for (tid, handle) in handles.into_iter().enumerate() {
handle
.join()
.unwrap_or_else(|_| panic!("worker thread {tid} panicked"));
}
let join_time = Instant::now();
let writes = total_writes.load(Ordering::Relaxed);
let flushes = total_flushes.load(Ordering::Relaxed);
let earliest_start = start_times.lock().unwrap().iter().copied().min().unwrap();
let elapsed = join_time.duration_since(earliest_start);
println!(
"multi_threaded_stress({num_threads} threads): {writes} writes, {flushes} flushes \
in {elapsed:.2?} ({:.0} ops/s)",
writes as f64 / elapsed.as_secs_f64()
);
assert_eq!(
writes,
num_threads * OPS_PER_THREAD,
"total writes should equal num_threads * OPS_PER_THREAD"
);
}
#[test]
fn hammer_seal_concurrent_rotation() {
let ring = Arc::new(FlushBufferRing::with_buffer_amount(
TEST_RING_SIZE,
FOUR_KB_PAGE,
));
let barrier = Arc::new(Barrier::new(NUM_THREADS_SMALL));
let handles: Vec<_> = (0..NUM_THREADS_SMALL)
.map(|tid| {
let ring = Arc::clone(&ring);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
for iter in 0..100_usize {
let payload = make_payload(&format!("H{tid}:{iter}"), 2048);
match put_with_retry(&ring, &payload) {
Ok(_) => {}
Err(e) => panic!("hammer thread {tid} iter {iter}: error {e:?}"),
}
}
})
})
.collect();
for h in handles {
h.join().expect("hammer worker panicked");
}
}
}