/// A ring buffer described by three offsets over a `MirroredMemoryMap`.
///
/// Writers reserve space via `writer_offset`, publish finished writes via `unread_offset`, and the (single) reader records its progress in `read_offset`.
#[derive(Debug)]
pub struct MagicRingBuffer
{
    /// Advanced by `write_some_data` (via `fetch_add`) to reserve a region for each write; reset to `unread_offset` by `recovery_if_using_persistent_memory`.
    writer_offset: CompareExchangeOnlyEverIncreasesMonotonicallyOffset,

    /// Updated by writers once their data has been written; `unread_offset - read_offset` is the number of bytes offered to the reader.
    unread_offset: CompareExchangeOnlyEverIncreasesMonotonicallyOffset,

    /// Set by `single_reader_read_some_data` after the reader reports how much it consumed.
    read_offset: CompareExchangeOnlyEverIncreasesMonotonicallyOffset,

    /// Backing memory; supplies the buffer size (`unmirrored_buffer_size`) and turns an offset into a raw pointer (`pointer`).
    mirrored_memory_map: MirroredMemoryMap,
}
impl MagicRingBuffer
{
    /// Creates a ring buffer whose writer, unread and read offsets all start at their default value.
    ///
    /// The backing memory is obtained from `MirroredMemoryMap::allocate_mirrored_and_not_swappable_from_dev_shm`, which receives `file_extension` and `buffer_size_not_page_aligned` (converted to a `Size`).
    ///
    /// NOTE(review): the parameter name suggests the size does not have to be page aligned and is presumably rounded by the memory map; that rounding is not visible in this file.
    ///
    /// # Errors
    ///
    /// Propagates the `MirroredMemoryMapCreationError` reported by the memory map allocation.
    #[inline(always)]
    pub fn allocate_mirrored_and_not_swappable_from_dev_shm(file_extension: &str, buffer_size_not_page_aligned: usize) -> Result<Self, MirroredMemoryMapCreationError>
    {
        let mirrored_memory_map = MirroredMemoryMap::allocate_mirrored_and_not_swappable_from_dev_shm(file_extension, Size::from(buffer_size_not_page_aligned))?;

        Ok
        (
            Self
            {
                writer_offset: CompareExchangeOnlyEverIncreasesMonotonicallyOffset::default(),
                unread_offset: CompareExchangeOnlyEverIncreasesMonotonicallyOffset::default(),
                read_offset: CompareExchangeOnlyEverIncreasesMonotonicallyOffset::default(),
                mirrored_memory_map,
            }
        )
    }

    /// Resets the writer offset to the current unread offset.
    ///
    /// NOTE(review): presumably this discards space reserved by writers that never committed (anything between `unread_offset` and `writer_offset`); confirm against the intended recovery procedure.
    #[inline(always)]
    pub fn recovery_if_using_persistent_memory(&self)
    {
        self.writer_offset.set(self.unread_offset.get())
    }

    /// Reserves `amount_we_want_to_write` bytes, lets `writer` fill them, then publishes them to the reader.
    ///
    /// Steps:
    ///
    /// 1. Reserve a region by advancing `writer_offset`.
    /// 2. Busy-wait until the reader has freed enough space for everything between the unread offset and the end of our region.
    /// 3. Call `writer` with a mutable slice of exactly `amount_we_want_to_write` bytes.
    /// 4. Advance `unread_offset` to the end of our region so that the bytes become visible to the reader.
    ///
    /// This call spins (it never sleeps or returns early) while waiting for space and while committing.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if `amount_we_want_to_write` exceeds the buffer size.
    #[inline(always)]
    pub fn write_some_data(&self, amount_we_want_to_write: usize, writer: impl FnOnce(&mut [u8]))
    {
        let amount_we_want_to_write = Size::from(amount_we_want_to_write);
        debug_assert!(amount_we_want_to_write <= self.unmirrored_buffer_size(), "Can not write amounts large than then ring buffer's size");

        // Reserve our region: `current_*` is where it starts, `next_*` is where it ends.
        let (current_writer_state_write_offset, next_writer_state_write_offset) = self.writer_offset.fetch_add(amount_we_want_to_write);

        // Spin until the free space covers every byte from the unread offset up to the end of our region.
        let mut current_unread_offset = loop
        {
            let (current_unread_offset, _current_read_offset, unread) = self.current_unread_offset_and_current_read_offset_and_unread();

            let total_size_required_for_writes_in_progress = next_writer_state_write_offset - current_unread_offset;

            let available_for_writes = self.unmirrored_buffer_size() - unread;
            debug_assert!(available_for_writes <= self.unmirrored_buffer_size());

            if likely!(available_for_writes >= total_size_required_for_writes_in_progress)
            {
                break current_unread_offset
            }

            spin_loop_hint();
        };

        writer(self.write_to_buffer(current_writer_state_write_offset, amount_we_want_to_write));

        // Commit: move the unread offset to the END of our region.
        // Committing only up to the start of the region would leave the bytes just written invisible to the reader until some later write happened to publish them.
        //
        // Assumes `try_to_update(expected, new)` behaves like a compare-exchange that hands back the observed value on failure - TODO confirm.
        // NOTE(review): on failure we retry from whatever value was observed, so this loop does not by itself wait for earlier writers to commit first; confirm whether `try_to_update` enforces that ordering.
        while let Err(was_reader_state) = self.unread_offset.try_to_update(current_unread_offset, next_writer_state_write_offset)
        {
            current_unread_offset = was_reader_state;
            spin_loop_hint();
        }
    }

    /// Offers all currently unread bytes to `reader` and records how many it consumed.
    ///
    /// `reader` receives a mutable slice covering every unread byte and returns `(bytes consumed, outcome)`.
    /// The read offset is advanced by the number of bytes consumed regardless of whether `outcome` is `Ok` or `Err`.
    ///
    /// Returns `Ok(true)` if, after advancing, there is still unread data; `Ok(false)` if there is none.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `reader`, unchanged.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if `reader` reports consuming more bytes than it was offered.
    #[inline(always)]
    pub fn single_reader_read_some_data<E, Reader: FnOnce(&mut [u8]) -> (usize, Result<(), E>)>(&self, reader: Reader) -> Result<bool, E>
    {
        let (_current_unread_offset, current_read_offset, unread) = self.current_unread_offset_and_current_read_offset_and_unread();

        let (actually_read, outcome) = reader(self.read_from_buffer(current_read_offset, unread));
        let actually_read = Size::from(actually_read);

        // A reader can not legitimately consume more than the slice it was handed; catching it here gives a clearer failure than a later offset-ordering assertion.
        debug_assert!(actually_read <= unread, "Reader reported consuming more bytes than were unread");

        self.read_offset.set(current_read_offset + actually_read);

        outcome?;

        // Re-sample the offsets: writers may have published more data while `reader` was running.
        let (_current_unread_offset, _current_read_offset, unread) = self.current_unread_offset_and_current_read_offset_and_unread();
        Ok(unread != Size::default())
    }

    /// Size of the buffer as reported by the memory map's `unmirrored_buffer_size` field.
    #[inline(always)]
    fn unmirrored_buffer_size(&self) -> Size
    {
        self.mirrored_memory_map.unmirrored_buffer_size
    }

    /// Samples the unread and read offsets and returns `(unread offset, read offset, unread offset - read offset)`.
    ///
    /// The two offsets are loaded one after the other, not as a single snapshot.
    #[inline(always)]
    fn current_unread_offset_and_current_read_offset_and_unread(&self) -> (OnlyEverIncreasesMonotonicallyOffset, OnlyEverIncreasesMonotonicallyOffset, Size)
    {
        let current_unread_offset = self.unread_offset.get();
        let current_read_offset = self.read_offset.get();
        debug_assert!(current_unread_offset >= current_read_offset);

        let unread = current_unread_offset - current_read_offset;
        (current_unread_offset, current_read_offset, unread)
    }

    /// Converts an offset into a raw pointer by delegating to the memory map's `pointer`.
    #[inline(always)]
    fn real_pointer(&self, offset: OnlyEverIncreasesMonotonicallyOffset) -> *mut u8
    {
        self.mirrored_memory_map.pointer(offset)
    }

    /// Returns a mutable slice of `amount_we_want_to_write` bytes starting at the given write offset.
    #[inline(always)]
    fn write_to_buffer(&self, current_writer_state_write_offset: OnlyEverIncreasesMonotonicallyOffset, amount_we_want_to_write: Size) -> &mut [u8]
    {
        let write_pointer = self.real_pointer(current_writer_state_write_offset);

        // SAFETY: relies on `mirrored_memory_map.pointer` returning a pointer valid for `amount_we_want_to_write` bytes and on no other live slice overlapping this region.
        // NOTE(review): presumably guaranteed by the mirrored mapping and the offset protocol in `write_some_data`; not verifiable from this file.
        unsafe { from_raw_parts_mut(write_pointer, amount_we_want_to_write.into()) }
    }

    /// Returns a mutable slice of `unread` bytes starting at the given read offset.
    #[inline(always)]
    fn read_from_buffer(&self, current_read_offset: OnlyEverIncreasesMonotonicallyOffset, unread: Size) -> &mut [u8]
    {
        let read_pointer = self.real_pointer(current_read_offset);

        // SAFETY: relies on `mirrored_memory_map.pointer` returning a pointer valid for `unread` bytes and on there being a single reader.
        // NOTE(review): presumably guaranteed by the mirrored mapping; not verifiable from this file.
        unsafe { from_raw_parts_mut(read_pointer, unread.into()) }
    }
}