use std::{
pin::Pin,
sync::{
Arc,
atomic::{AtomicPtr, AtomicUsize, Ordering},
mpsc::{self, Receiver, Sender},
},
};
use crate::{
BufferError, BufferMsg, FLUSH_IN_PROGRESS_BIT, FOUR_KB_BLOCK, FlushBuffer, OFFSET_SHIFT,
SEALED_BIT, quik_io::QuikIO, state_offset, state_sealed, state_writers,
};
/// A ring of reusable flush buffers coordinating concurrent writers.
///
/// Exactly one buffer is "current" and accepts writes; once a buffer is
/// sealed it is assigned a file-address range, flushed to the optional
/// `QuikIO` backend (or recycled directly when no backend is present),
/// and the ring rotates to the next available buffer.
pub struct BufferRing {
    /// Raw pointer to the buffer currently accepting writes. Always points
    /// into `ring`; swapped via compare-exchange in `rotate_after_seal`.
    current_buffer: AtomicPtr<FlushBuffer>,
    /// Backing storage for every buffer. Pinned boxed slice so the buffer
    /// addresses stay stable for the raw pointer above.
    ring: Pin<Box<[Arc<FlushBuffer>]>>,
    /// Monotonically increasing rotation cursor; used modulo ring length.
    next_index: AtomicUsize,
    /// Next unclaimed file offset handed out to sealed buffers.
    next_address_range: AtomicUsize,
    /// Optional I/O backend; when `None`, a "flush" just resets the buffer.
    store: Option<Arc<QuikIO>>,
    /// When true, a full buffer is flushed automatically once its last
    /// writer finishes.
    auto_flush: bool,
    /// When true, sealing the current buffer automatically rotates to the
    /// next available one.
    auto_rotate: bool,
    /// Number of buffers in the ring (capacity at construction).
    size: usize,
    /// Sender for `(file_offset, byte_count)` completion notifications,
    /// delivered from `check_cque`.
    cq_tx: Option<Sender<(u64, usize)>>,
    /// Count of flushes submitted to the store but not yet completed.
    pending_flushes: AtomicUsize,
}
/// Builder-style configuration consumed by `BufferRing::with_options`.
pub struct BufferRingOptions {
    /// Number of buffers to allocate in the ring.
    capacity: usize,
    /// Size of each buffer in bytes.
    buffer_size: usize,
    /// Optional I/O backend; moved into the ring on construction.
    io_instance: Option<Arc<QuikIO>>,
    /// Enable automatic flushing of full buffers (defaults to true).
    auto_flush: bool,
    /// Enable automatic rotation after sealing (defaults to true).
    auto_rotate: bool,
    /// Completion-notification sender, created by `completion_receiver`;
    /// moved into the ring on construction.
    cq_tx: Option<Sender<(u64, usize)>>,
}
impl BufferRingOptions {
pub fn new() -> Self {
Self {
capacity: 0,
buffer_size: 0,
io_instance: None,
auto_flush: true,
auto_rotate: true,
cq_tx: None,
}
}
pub fn capacity(&mut self, cap: usize) -> &mut Self {
self.capacity = cap;
self
}
pub fn buffer_size(&mut self, buffer_size: usize) -> &mut Self {
let size = buffer_size.next_multiple_of(buffer_size);
self.buffer_size = size;
self
}
pub fn io_instance(&mut self, io: Arc<QuikIO>) -> &mut Self {
self.io_instance = Some(io);
self
}
pub fn auto_flush(&mut self, enabled: bool) -> &mut Self {
self.auto_flush = enabled;
self
}
pub fn auto_rotate(&mut self, enabled: bool) -> &mut Self {
self.auto_rotate = enabled;
self
}
pub fn completion_receiver(&mut self) -> Receiver<(u64, usize)> {
let (tx, rx) = mpsc::channel();
self.cq_tx = Some(tx);
rx
}
}
impl BufferRing {
    /// Builds a ring from `options`: allocates `capacity` buffers of
    /// `buffer_size` bytes each, makes buffer 0 the active write target,
    /// and starts the rotation cursor at 1. Takes ownership of the
    /// options' I/O instance and completion sender.
    pub fn with_options(options: &mut BufferRingOptions) -> BufferRing {
        let buffers: Vec<Arc<FlushBuffer>> = (0..options.capacity)
            .map(|i| Arc::new(FlushBuffer::new_buffer(i, options.buffer_size)))
            .collect();
        // Pin the boxed slice so the raw pointer stored in `current_buffer`
        // stays valid for the ring's lifetime.
        let buffers = Pin::new(buffers.into_boxed_slice());
        // NOTE(review): panics if `options.capacity == 0` (indexes buffers[0]).
        let current = &*buffers[0] as *const FlushBuffer as *mut FlushBuffer;
        let instance = options.io_instance.take();
        BufferRing {
            current_buffer: AtomicPtr::new(current),
            ring: buffers,
            next_index: AtomicUsize::new(1),
            size: options.capacity,
            next_address_range: AtomicUsize::new(0),
            store: instance,
            auto_flush: options.auto_flush,
            auto_rotate: options.auto_rotate,
            cq_tx: options.cq_tx.take(),
            pending_flushes: AtomicUsize::new(0),
        }
    }

    /// Completes a write attempt against `current`, given the outcome of
    /// a prior slot reservation (`reserve_result`).
    ///
    /// - `Ok(offset)`: copies `payload` into the buffer at `offset`,
    ///   decrements the writer count, and — if this was the last writer
    ///   on a sealed buffer — triggers the flush (or a reset when no
    ///   store / auto-flush is configured).
    /// - `Err(InsufficientSpace)`: the first caller to observe it seals
    ///   the buffer, claims its file-address range, optionally rotates,
    ///   and flushes if no writers remain.
    /// - Other errors are propagated unchanged.
    ///
    /// NOTE(review): in the `InsufficientSpace` path the payload is never
    /// written, yet the sealing thread can return
    /// `SuccessfullWriteFlush` — presumably callers retry the put against
    /// the rotated-in buffer; confirm against call sites.
    pub fn put(
        &self,
        current: &FlushBuffer,
        reserve_result: Result<usize, BufferError>,
        payload: &[u8],
    ) -> Result<BufferMsg, BufferError> {
        match reserve_result {
            Err(BufferError::InsufficientSpace) => {
                // Race to set the seal bit: only the first sealer proceeds,
                // later ones bail out.
                let prev = current.state.fetch_or(SEALED_BIT, Ordering::AcqRel);
                if prev & SEALED_BIT != 0 {
                    return Err(BufferError::EncounteredSealedBuffer);
                }
                // Pad the flushed extent to a 4 KiB boundary and claim the
                // corresponding file-offset range for this buffer.
                let padded = current.size().next_multiple_of(FOUR_KB_BLOCK);
                // NOTE(review): Acquire on a fetch_add here, while
                // `flush_current`/`__reserve_buf_addr` use SeqCst for the
                // same counter — confirm the weaker ordering is intended.
                let slot = self.incrment_address(padded, Ordering::Acquire);
                current.local_address.store(slot, Ordering::Release);
                if self.auto_rotate {
                    // Best-effort rotation; failure (ring exhausted) is ignored.
                    let _ = self.rotate_after_seal(current.pos);
                }
                // Only flush once every in-flight writer has finished.
                let state_now = current.state.load(Ordering::Acquire);
                if state_writers(state_now) == 0 {
                    // Guarded transition: only one thread wins the
                    // flush-in-progress bit and performs the flush.
                    let before = current.set_flush_in_progress();
                    if before & FLUSH_IN_PROGRESS_BIT == 0 {
                        match self.store.as_ref() {
                            Some(_) if self.auto_flush => {
                                // Clone the Arc so the flush path owns a handle.
                                let flush_buffer = self.ring.get(current.pos).unwrap().clone();
                                self.flush(&flush_buffer);
                            }
                            // No store, or manual flushing: recycle immediately.
                            _ => self.reset_buffer(current),
                        }
                        return Ok(BufferMsg::SuccessfullWriteFlush);
                    }
                }
                Err(BufferError::EncounteredSealedBuffer)
            }
            Err(BufferError::EncounteredSealedBuffer) => {
                return Err(BufferError::EncounteredSealedBuffer);
            }
            Err(e) => return Err(e),
            Ok(offset) => {
                // Reservation succeeded: copy the payload into its slot.
                current.write(offset, payload);
                // `prev` is the state *before* this writer was removed.
                let prev = current.decrement_writers();
                let was_last_writer = state_writers(prev) == 1;
                let was_sealed = state_sealed(prev);
                if was_last_writer && was_sealed {
                    // Last writer on a sealed buffer is responsible for the
                    // flush; the bit guards against a concurrent sealer
                    // doing the same.
                    let prev = current.set_flush_in_progress();
                    if prev & FLUSH_IN_PROGRESS_BIT == 0 {
                        match self.store.as_ref() {
                            Some(_) if self.auto_flush => {
                                let flush_buffer = self.ring.get(current.pos).unwrap().clone();
                                self.flush(&flush_buffer);
                            }
                            _ => self.reset_buffer(current),
                        }
                        return Ok(BufferMsg::SuccessfullWriteFlush);
                    }
                }
                return Ok(BufferMsg::SuccessfullWrite);
            }
        }
    }

    /// Asynchronously submits `buffer` to the backing store, bumping the
    /// pending-flush counter; with no store configured the buffer is
    /// simply reset for reuse. Completion is observed via `check_cque`.
    pub fn flush(&self, buffer: &FlushBuffer) {
        // Re-assert the flush bit; callers on the `put` path have usually
        // already set it (fetch_or-style, so this is harmless).
        buffer.set_flush_in_progress();
        match self.store.as_ref() {
            Some(store) => {
                self.pending_flushes.fetch_add(1, Ordering::Release);
                store.submit_buffer(buffer);
            }
            None => {
                self.reset_buffer(buffer);
            }
        }
    }

    /// Synchronous flush: submits `buffer`, waits for all outstanding
    /// submissions to drain, fsyncs the data, then recycles the buffer.
    /// With no store configured the buffer is just reset.
    pub fn f_sync(&self, buffer: &FlushBuffer) {
        buffer.set_flush_in_progress();
        match self.store.as_ref() {
            Some(store) => {
                store.submit_buffer(buffer);
                let _ = store.wait_for_all();
                // Invariant stated by the expect: the submission queue was
                // fully drained above, so sync_data should not fail here.
                store.sync_data().expect("Drained Submission Queue");
                self.reset_buffer(buffer);
            }
            None => {
                self.reset_buffer(buffer);
            }
        }
    }

    /// Swings `current_buffer` to the next available buffer after the one
    /// at `sealed_pos` was sealed. Returns Ok without doing anything when
    /// another thread already rotated away from `sealed_pos`. Scans the
    /// ring at most once; `RingExhausted` means every buffer is busy.
    pub fn rotate_after_seal(&self, sealed_pos: usize) -> Result<(), BufferError> {
        let current = self.current_buffer.load(Ordering::Acquire);
        // SAFETY(review): the pointer always targets an entry of the
        // pinned `ring`, so it is valid while `self` is alive; null only
        // if the ring was constructed empty.
        let current_ref = unsafe { current.as_ref().ok_or(BufferError::InvalidState)? };
        let current_size = current_ref.size();
        if current_ref.pos != sealed_pos {
            // Someone else already rotated; nothing to do.
            return Ok(());
        }
        let ring_len = self.ring.len();
        for _ in 0..ring_len {
            // Cursor grows without bound; wrap it into the ring here.
            let raw = self.next_index.fetch_add(1, Ordering::AcqRel);
            let next_index = raw % ring_len;
            let new_buffer = &self.ring[next_index];
            if new_buffer.is_available() {
                // Losing this CAS to a concurrent rotation is fine: the
                // winner installed a fresh buffer either way.
                let _ = self.current_buffer.compare_exchange(
                    current,
                    Arc::as_ptr(new_buffer) as *mut FlushBuffer,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                );
                // NOTE(review): pre-reserves a file range for the new
                // buffer using the *sealed* buffer's size, while `put`
                // also reserves for the sealed buffer itself — confirm
                // the address-space accounting is intentional.
                self.__reserve_buf_addr(new_buffer, current_size);
                return Ok(());
            }
        }
        Err(BufferError::RingExhausted)
    }

    /// Returns `buffer` to its empty, writable state: clears every state
    /// bit (seal / flush-in-progress / writer count) and zeroes its file
    /// address so `__reserve_buf_addr` can assign a fresh one.
    pub fn reset_buffer(&self, buffer: &FlushBuffer) {
        buffer.state.store(0, Ordering::SeqCst);
        buffer.local_address.store(0, Ordering::Release);
    }

    /// Drains the completion queue until it is empty: failed completions
    /// get their saved SQE resubmitted (best-effort); successful ones
    /// decrement `pending_flushes`, notify the completion channel (if
    /// configured) with `(file_offset, byte_count)`, and recycle the
    /// buffer. Errors only when no store is configured.
    pub fn check_cque(&self) -> Result<(), String> {
        let Some(store) = &self.store else {
            return Err("Store not present".to_string());
        };
        loop {
            let cqes = store.cqe();
            if cqes.is_empty() {
                return Ok(());
            }
            for cqe in cqes {
                // user_data == 0 marks entries carrying no buffer pointer
                // (e.g. fsync or other bookkeeping submissions).
                if cqe.user_data() == 0 {
                    continue;
                }
                // SAFETY(review): assumes user_data was stamped with a
                // pointer to a live ring FlushBuffer at submission time —
                // confirm `submit_buffer` sets it that way.
                let ptr = cqe.user_data() as *const FlushBuffer;
                let buffer = unsafe { &*ptr };
                if cqe.result() < 0 {
                    // I/O failed: resubmit the stored SQE. `pending_flushes`
                    // is deliberately left untouched — the flush is still
                    // outstanding.
                    if let Some(sqe) = unsafe { (*buffer.sqe.get()).as_ref() } {
                        let mut ring_guard = store.ring();
                        let _ = unsafe { ring_guard.submission().push(sqe) };
                        let _ = ring_guard.submit();
                    }
                } else {
                    self.pending_flushes.fetch_sub(1, Ordering::Release);
                    if let Some(tx) = &self.cq_tx {
                        let file_offset = buffer.local_address(Ordering::Acquire) as u64;
                        let byte_count = buffer.size();
                        // Receiver may have been dropped; ignore send errors.
                        let _ = tx.send((file_offset, byte_count));
                    }
                    self.reset_buffer(buffer);
                }
            }
        }
    }

    /// Seals and flushes the currently active buffer, then rotates to the
    /// next one. No-op for an empty buffer; errors if the pointer is null
    /// or `seal()` fails (e.g. already sealed).
    pub fn flush_current(&self) -> Result<(), BufferError> {
        let current_ptr = self.current_buffer.load(Ordering::Acquire);
        // SAFETY(review): see `rotate_after_seal` — pointer targets a
        // pinned ring entry while `self` is alive.
        let current = unsafe { current_ptr.as_ref().ok_or(BufferError::InvalidState)? };
        if current.size() == 0 {
            return Ok(());
        }
        current.seal()?;
        // NOTE(review): unlike `put`, the length is NOT padded to
        // FOUR_KB_BLOCK before reserving the address range — confirm that
        // an unaligned tail flush is intended here.
        let actual_len = current.size();
        let slot = self.incrment_address(actual_len, Ordering::SeqCst);
        current.local_address.store(slot, Ordering::Release);
        self.flush(current);
        // Best-effort rotation; exhaustion is ignored.
        let _ = self.rotate_after_seal(current.pos);
        Ok(())
    }

    /// Next unreserved file offset.
    pub fn next_address(&self, ordering: Ordering) -> usize {
        self.next_address_range.load(ordering)
    }

    /// Atomically claims `val` bytes of file address space and returns the
    /// start of the claimed range.
    /// NOTE(review): "increment" is misspelled in this public name; kept
    /// as-is to preserve the external interface.
    pub fn incrment_address(&self, val: usize, ordering: Ordering) -> usize {
        self.next_address_range.fetch_add(val, ordering)
    }

    /// Assigns a freshly claimed file range of `size` bytes to `buffer`,
    /// unless it already holds a non-zero address (the CAS from 0 keeps an
    /// earlier assignment and discards the newly claimed slot).
    fn __reserve_buf_addr(&self, buffer: &FlushBuffer, size: usize) {
        let slot = self.incrment_address(size, Ordering::SeqCst);
        let _ = buffer
            .local_address
            .compare_exchange(0, slot, Ordering::AcqRel, Ordering::Relaxed);
    }

    /// Number of flushes submitted to the store but not yet completed.
    pub fn pending_flushes(&self, ordering: Ordering) -> usize {
        self.pending_flushes.load(ordering)
    }

    /// Returns the currently active buffer.
    /// NOTE(review): the `'static` lifetime is only sound if the ring is
    /// never dropped while the reference is held (it actually borrows
    /// from `self.ring`) — confirm the intended lifetime contract.
    pub fn current_buffer(&self, ordering: Ordering) -> &'static FlushBuffer {
        let ptr = self.current_buffer.load(ordering);
        unsafe { ptr.as_ref().unwrap() }
    }

    /// Number of buffers in the ring.
    pub fn ring_size(&self) -> usize {
        self.size
    }
}