use super::super::{
super::ogre_sync,
meta_publisher::MovePublisher,
meta_subscriber::MoveSubscriber,
meta_container::MoveContainer,
};
use std::{
fmt::Debug,
ptr,
sync::atomic::{
AtomicBool,
Ordering::Relaxed,
},
pin::Pin,
num::NonZeroU32,
cell::UnsafeCell,
mem::ManuallyDrop,
};
/// Ring-buffer queue basis for multiple-producer / multiple-consumer use, fully
/// synchronized through a single atomic spin flag (`concurrency_guard`).
/// `BUFFER_SIZE` must be a power of 2 (enforced at compile time via
/// `BUFFER_SIZE_MUST_BE_A_POWER_OF_2`) so `% BUFFER_SIZE` stays cheap and the
/// wrapping u32 counters remain consistent.
// NOTE(review): align(128) presumably spans two 64-byte cache lines to avoid
// false sharing with neighbouring data -- confirm the intended target arch.
#[repr(C,align(128))] pub struct FullSyncMove<SlotType: Debug + Default,
const BUFFER_SIZE: usize> {
// monotonically increasing consume counter (wraps on overflow); slot index = head % BUFFER_SIZE
head: UnsafeCell<u32>,
// monotonically increasing publish counter (wraps on overflow); slot index = tail % BUFFER_SIZE
tail: UnsafeCell<u32>,
// spin flag serializing mutations of `head`/`tail`/`buffer` (see `ogre_sync::lock`/`unlock`)
concurrency_guard: AtomicBool,
// `ManuallyDrop` slots: consumed elements are moved out via `ptr::read`, so the buffer
// must never drop them itself; still-enqueued leftovers are drained in `Drop`
buffer: UnsafeCell<Pin<Box<[ManuallyDrop<SlotType>; BUFFER_SIZE]>>>,
}
impl<'a, SlotType: 'a + Debug + Default,
     const BUFFER_SIZE: usize>
MoveContainer<SlotType> for
FullSyncMove<SlotType, BUFFER_SIZE> {

    /// Creates a queue whose slots are pre-filled with `SlotType::default()`
    fn new() -> Self {
        // pass the fn item directly rather than the redundant `|| SlotType::default()` closure
        Self::with_initializer(SlotType::default)
    }

    /// Creates a queue whose slots are pre-filled by calling `slot_initializer`
    /// once for each of the `BUFFER_SIZE` slots
    fn with_initializer<F: Fn() -> SlotType>(slot_initializer: F) -> Self {
        // compile-time guard: evaluating this const divides by zero unless
        // BUFFER_SIZE is a power of 2
        debug_assert!(Self::BUFFER_SIZE_MUST_BE_A_POWER_OF_2);
        // build the slots directly -- avoids materializing a throw-away
        // `[0; BUFFER_SIZE]` integer array just to `.map()` over it
        let slots: [ManuallyDrop<SlotType>; BUFFER_SIZE] =
            std::array::from_fn(|_| ManuallyDrop::new(slot_initializer()));
        Self {
            head: UnsafeCell::new(0),
            tail: UnsafeCell::new(0),
            concurrency_guard: AtomicBool::new(false),
            buffer: UnsafeCell::new(Box::pin(slots)),
        }
    }
}
impl<'a, SlotType: 'a + Debug + Default,
const BUFFER_SIZE: usize>
MovePublisher<SlotType> for
FullSyncMove<SlotType, BUFFER_SIZE> {
/// Moves `item` into the queue.
/// Returns `(Some(len_after_publishing), None)` on success or, when the queue is
/// full, `(None, Some(item))` -- handing the item back to the caller.
#[inline(always)]
fn publish_movable(&self, item: SlotType) -> (Option<NonZeroU32>, Option<SlotType>) {
match self.leak_slot_internal(|| false) {
Some( (slot, _slot_id, len_before) ) => {
// `ptr::write()` moves `item` in without dropping the slot's previous
// (logically vacant, `ManuallyDrop`-wrapped) content
unsafe { ptr::write(slot, item); }
// commits the slot (tail+1) and releases the lock held since `leak_slot_internal()`
self.publish_leaked_internal();
(NonZeroU32::new(len_before+1), None)
},
None => (None, Some(item)),
}
}
/// Zero-copy publish: `setter_fn` fills the reserved slot in place.
/// `report_full_fn` is consulted (outside the lock) when the queue is full --
/// returning `true` retries the reservation; `report_len_after_enqueueing_fn`
/// receives the queue length after a successful publish.
/// Returns `None` on success or `Some(setter_fn)` (the closure, given back) when full.
#[inline(always)]
fn publish<SetterFn: FnOnce(&mut SlotType),
ReportFullFn: Fn() -> bool,
ReportLenAfterEnqueueingFn: FnOnce(u32)>
(&self, setter_fn: SetterFn,
report_full_fn: ReportFullFn,
report_len_after_enqueueing_fn: ReportLenAfterEnqueueingFn)
-> Option<SetterFn> {
match self.leak_slot_internal(report_full_fn) {
Some( (slot_ref, _slot_id, len_before) ) => {
// NOTE: the slot still holds its previous value (initializer leftovers or the
// remains of a consumed element) -- `setter_fn` is expected to overwrite it
setter_fn(slot_ref);
self.publish_leaked_internal();
report_len_after_enqueueing_fn(len_before+1);
None
}
None => Some(setter_fn)
}
}
/// Current number of enqueued elements -- racy snapshot, no locking.
#[inline(always)]
fn available_elements_count(&self) -> usize {
let tail = unsafe { &* self.tail.get() };
let head = unsafe { &* self.head.get() };
// wrapping subtraction keeps the length correct across u32 counter overflow
tail.overflowing_sub(*head).0 as usize
}
/// Maximum number of elements this queue can hold.
#[inline(always)]
fn max_size(&self) -> usize {
BUFFER_SIZE
}
/// Human-readable snapshot of the queue internals -- diagnostics only:
/// reads head/tail/elements without synchronization.
fn debug_info(&self) -> String {
let Self {concurrency_guard, tail, buffer: _, head} = self;
let tail = unsafe { &* tail.get() };
let head = unsafe { &* head.get() };
let concurrency_guard = concurrency_guard.load(Relaxed);
format!("ogre_queues::full_sync_meta's state: {{head: {head}, tail: {tail}, (len: {}), locked: {concurrency_guard}, elements: {{{}}}'}}",
self.available_elements_count(),
unsafe {self.peek_remaining()}.iter().flat_map(|&slice| slice).fold(String::new(), |mut acc, e| {
acc.push_str(&format!("'{:?}',", e));
acc
}))
}
}
impl<'a, SlotType: 'a + Debug + Default,
const BUFFER_SIZE: usize>
MoveSubscriber<SlotType> for
FullSyncMove<SlotType, BUFFER_SIZE> {
/// Moves the next element out of the queue, or returns `None` if it is empty.
#[inline(always)]
fn consume_movable(&self) -> Option<SlotType> {
match self.consume_leaking_internal(|| false) {
Some( (slot_ref, _len_before) ) => {
// move the value out; the slot is left logically vacant (it is `ManuallyDrop`,
// so nothing will double-drop it)
let item = unsafe { Some(ptr::read(slot_ref)) };
// advances `head`; `release_leaked_internal()` leaves the guard locked,
// hence the explicit unlock below
self.release_leaked_internal();
ogre_sync::unlock(&self.concurrency_guard);
item
}
None => None,
}
}
/// Returns the queued elements as up to two contiguous slices (the ring buffer
/// may wrap around the end of the backing array).
/// # Safety
/// Reads `head`/`tail` and the buffer without taking the `concurrency_guard`;
/// the caller must ensure no concurrent producers/consumers are mutating the queue
/// while the returned slices are alive.
#[inline(always)]
unsafe fn peek_remaining(&self) -> [&[SlotType];2] {
let tail = *unsafe { &* self.tail.get() };
let head = *unsafe { &* self.head.get() };
let head_index = head as usize % BUFFER_SIZE;
let tail_index = tail as usize % BUFFER_SIZE;
if head == tail {
// empty queue
[&[],&[]]
} else if head_index < tail_index {
// no wrap-around: a single contiguous slice holds everything
unsafe {
// casts away the `Pin`/`ManuallyDrop` wrappers -- relies on their transparent layout
let const_ptr = self.buffer.get();
let ptr = const_ptr as *const Box<[SlotType; BUFFER_SIZE]>;
let array = &*ptr;
[&array[head_index ..tail_index], &[]]
}
} else {
// wrapped around (or completely full, when head_index == tail_index):
// elements span [head_index..end] followed by [0..tail_index]
unsafe {
let const_ptr = self.buffer.get();
let ptr = const_ptr as *const Box<[SlotType; BUFFER_SIZE]>;
let array = &*ptr;
[&array[head_index..BUFFER_SIZE], &array[0..tail_index]]
}
}
}
}
impl<'a, SlotType: 'a + Debug + Default,
const BUFFER_SIZE: usize>
FullSyncMove<SlotType, BUFFER_SIZE> {
// compile-time assertion trick: if `BUFFER_SIZE` is not a power of 2, evaluating
// this const divides by zero and compilation fails. Power-of-2 sizes keep the
// `% BUFFER_SIZE` indexing cheap and the wrapping u32 counters consistent.
const BUFFER_SIZE_MUST_BE_A_POWER_OF_2: bool = usize::MAX / if BUFFER_SIZE.is_power_of_two() {1} else {0} > 0;
/// Reserves ("leaks") the next publish slot, returning `(slot, slot_id, len_before)`.
/// On `Some`, the `concurrency_guard` is returned LOCKED -- the caller must follow
/// up with [Self::publish_leaked_internal()] (commit) or [Self::unleak_internal()]
/// (abort) to release it. On `None` (queue full and `report_full_fn` returned
/// `false`), the guard has already been released. `report_full_fn` is invoked
/// outside the lock; returning `true` from it retries the reservation.
#[inline(always)]
pub fn leak_slot_internal(&self, report_full_fn: impl Fn() -> bool) -> Option<(&'a mut SlotType, /*slot_id:*/ u32, /*len_before:*/ u32)> {
// casts away the `Pin`/`ManuallyDrop` wrappers -- relies on their transparent layout;
// the returned `&'a mut` outlives the lock, so exclusivity rests on the slot protocol
let mutable_buffer = unsafe { &mut * (self.buffer.get() as *mut Box<[SlotType; BUFFER_SIZE]>) };
let mut len_before;
loop {
ogre_sync::lock(&self.concurrency_guard);
let tail = *unsafe { &* self.tail.get() };
let head = *unsafe { &* self.head.get() };
// wrapping subtraction keeps the length correct across u32 counter overflow
len_before = tail.overflowing_sub(head).0;
if len_before < BUFFER_SIZE as u32 {
// intentionally breaks with the guard still LOCKED (see the doc comment)
break unsafe { Some( (mutable_buffer.get_unchecked_mut(tail as usize % BUFFER_SIZE), tail, len_before) ) }
} else {
ogre_sync::unlock(&self.concurrency_guard);
let maybe_no_longer_full = report_full_fn();
if !maybe_no_longer_full {
break None;
}
}
}
}
/// Commits a slot reserved by [Self::leak_slot_internal()]: advances `tail` and
/// releases the `concurrency_guard` acquired there.
#[inline(always)]
pub fn publish_leaked_internal(&self) {
let tail = unsafe { &mut * self.tail.get() };
*tail = tail.overflowing_add(1).0;
ogre_sync::unlock(&self.concurrency_guard);
}
/// Aborts a reservation made by [Self::leak_slot_internal()]: rolls `tail` back
/// and releases the `concurrency_guard`.
#[inline(always)]
pub fn unleak_internal(&self) {
let tail = unsafe { &mut * self.tail.get() };
*tail = tail.overflowing_sub(1).0;
ogre_sync::unlock(&self.concurrency_guard);
}
/// Reserves the next slot for consumption, returning `(slot, len_before)`.
/// On `Some`, the `concurrency_guard` is returned LOCKED; after moving the value
/// out, the caller must call [Self::release_leaked_internal()] and then unlock the
/// guard itself -- note the asymmetry: unlike `publish_leaked_internal()`, the
/// release step does NOT unlock. On `None`, the guard has already been released.
/// `report_empty_fn` runs outside the lock; returning `true` retries.
#[inline(always)]
fn consume_leaking_internal(&self, report_empty_fn: impl Fn() -> bool) -> Option<(&'a mut SlotType, /*len_before:*/ i32)> {
let mutable_buffer = unsafe { &mut * (self.buffer.get() as *mut Box<[SlotType; BUFFER_SIZE]>) };
let mut len_before;
loop {
ogre_sync::lock(&self.concurrency_guard);
let head = *unsafe { &mut * self.head.get() };
len_before = self.available_elements_count() as i32;
if len_before > 0 {
// intentionally breaks with the guard still LOCKED (see the doc comment)
break unsafe { Some( (mutable_buffer.get_unchecked_mut(head as usize % BUFFER_SIZE), len_before) ) }
} else {
ogre_sync::unlock(&self.concurrency_guard);
let maybe_no_longer_empty = report_empty_fn();
if !maybe_no_longer_empty {
break None;
}
}
}
}
/// Finishes a consumption started by [Self::consume_leaking_internal()]: advances
/// `head`, marking the slot as logically vacant. Deliberately leaves the
/// `concurrency_guard` locked -- the caller unlocks once done with the slot.
#[inline(always)]
fn release_leaked_internal(&self) {
let head = unsafe { &mut * self.head.get() };
*head = head.overflowing_add(1).0;
}
/// Maps a slot reference (which must point into this container's buffer) back to
/// its raw index via pointer arithmetic on the buffer's base address.
#[inline(always)]
pub fn slot_index_from_slot_ref(&'a self, slot: &'a SlotType) -> u32 {
unsafe {
let mutable_buffer = &mut * (self.buffer.get() as *mut Box<[SlotType; BUFFER_SIZE]>);
(slot as *const SlotType).offset_from(mutable_buffer.get_unchecked(0) as *const SlotType) as u32
}
}
/// Maps an index (wrapped into the buffer's range) to a reference to its slot.
#[inline(always)]
pub fn slot_ref_from_slot_index(&'a self, slot_index: u32) -> &'a SlotType {
unsafe {
let buffer = &*self.buffer.get();
buffer.get_unchecked(slot_index as usize % BUFFER_SIZE)
}
}
}
impl<SlotType: Debug + Default,
     const BUFFER_SIZE: usize>
Drop for
FullSyncMove<SlotType, BUFFER_SIZE> {
    /// Drains any still-enqueued elements so they get dropped: the buffer holds
    /// `ManuallyDrop<SlotType>` slots, so dropping the buffer alone would leak them.
    fn drop(&mut self) {
        // defensively release the guard once up front, in case it was left locked
        // (e.g. by a panic mid-operation). `consume_movable()` always returns with
        // the guard unlocked -- both its `Some` and `None` paths unlock before
        // returning -- so re-unlocking on every loop iteration (as the previous
        // version did) was redundant.
        ogre_sync::unlock(&self.concurrency_guard);
        while let Some(item) = self.consume_movable() {
            drop(item);
        }
    }
}
// SAFETY: all shared mutation of `head`/`tail`/`buffer` goes through the
// `concurrency_guard` spin flag (see `leak_slot_internal()` /
// `consume_leaking_internal()`), which is what makes sharing this type across
// threads sound despite the `UnsafeCell` fields.
// NOTE(review): there is no `SlotType: Send` bound here, so moving non-Send
// element types across threads is not prevented by the compiler -- confirm
// that callers uphold this.
unsafe impl<SlotType: Debug + Default,
const BUFFER_SIZE: usize>
Send for
FullSyncMove<SlotType, BUFFER_SIZE> {}
// SAFETY: see the `Send` impl above -- `&self` methods only mutate under the guard
// (except the explicitly-`unsafe` `peek_remaining()`, whose contract shifts the
// synchronization burden to the caller).
unsafe impl<SlotType: Debug + Default,
const BUFFER_SIZE: usize>
Sync for
FullSyncMove<SlotType, BUFFER_SIZE> {}
// Unit tests: delegate to the shared `test_commons` harness, exercising both the
// "Movable" API (`publish_movable()`/`consume_movable()`) and the zero-copy
// producer API (`publish()` with a setter closure).
// `#[cfg(any(test,doc))]` plus `#[cfg_attr(not(doc),test)]` keeps the tests
// compiling under doc builds as well as test builds.
#[cfg(any(test,doc))]
mod tests {
use super::*;
use crate::ogre_std::test_commons::{self, ContainerKind,Blocking};
// smoke test: FIFO semantics, fill-to-capacity & drain on a small 16-slot queue
#[cfg_attr(not(doc),test)]
fn basic_queue_use_cases() {
let queue = FullSyncMove::<i32, 16>::new();
test_commons::basic_container_use_cases("Movable API",
ContainerKind::Queue, Blocking::NonBlocking, queue.max_size(),
|e| queue.publish_movable(e).0.is_some(),
|| queue.consume_movable(),
|| queue.available_elements_count());
test_commons::basic_container_use_cases("Zero-Copy Producer/Movable Subscriber API",
ContainerKind::Queue, Blocking::NonBlocking, queue.max_size(),
|e| queue.publish(|slot| *slot = e, || false, |_| {}).is_none(),
|| queue.consume_movable(),
|| queue.available_elements_count());
}
// concurrency stress test, 1 producer / N consumers -- `#[ignore]`d: heavy, run explicitly
#[cfg_attr(not(doc),test)]
#[ignore] fn single_producer_multiple_consumers() {
let queue = FullSyncMove::<u32, 65536>::new();
test_commons::container_single_producer_multiple_consumers("Movable API",
|e| queue.publish_movable(e).0.is_some(),
|| queue.consume_movable());
test_commons::container_single_producer_multiple_consumers("Zero-Copy Producer/Movable Subscriber API",
|e| queue.publish(|slot| *slot = e, || false, |_| {}).is_none(),
|| queue.consume_movable());
}
// concurrency stress test, N producers / 1 consumer -- `#[ignore]`d: heavy, run explicitly
#[cfg_attr(not(doc),test)]
#[ignore] fn multiple_producers_single_consumer() {
let queue = FullSyncMove::<u32, 65536>::new();
test_commons::container_multiple_producers_single_consumer("Movable API",
|e| queue.publish_movable(e).0.is_some(),
|| queue.consume_movable());
test_commons::container_multiple_producers_single_consumer("Zero-Copy Producer/Movable Subscriber API",
|e| queue.publish(|slot| *slot = e, || false, |_| {}).is_none(),
|| queue.consume_movable());
}
// N producers / N consumers, all elements in then all out -- `#[ignore]`d: heavy
#[cfg_attr(not(doc),test)]
#[ignore] pub fn multiple_producers_and_consumers_all_in_and_out() {
let queue = FullSyncMove::<u32, 65536>::new();
test_commons::container_multiple_producers_and_consumers_all_in_and_out("Movable API",
Blocking::NonBlocking,
queue.max_size(),
|e| queue.publish_movable(e).0.is_some(),
|| queue.consume_movable());
test_commons::container_multiple_producers_and_consumers_all_in_and_out("Zero-Copy Producer/Movable Subscriber API",
Blocking::NonBlocking,
queue.max_size(),
|e| queue.publish(|slot| *slot = e, || false, |_| {}).is_none(),
|| queue.consume_movable());
}
// N producers / N consumers, interleaved single in/out operations -- `#[ignore]`d: heavy
#[cfg_attr(not(doc),test)]
#[ignore] pub fn multiple_producers_and_consumers_single_in_and_out() {
let queue = FullSyncMove::<u32, 65536>::new();
test_commons::container_multiple_producers_and_consumers_single_in_and_out("Movable API",
|e| queue.publish_movable(e).0.is_some(),
|| queue.consume_movable());
test_commons::container_multiple_producers_and_consumers_single_in_and_out("Zero-Copy Producer/Movable Subscriber API",
|e| queue.publish(|slot| *slot = e, || false, |_| {}).is_none(),
|| queue.consume_movable());
}
// validates `peek_remaining()`'s two-slice (wrap-around) view against the harness
// NOTE(review): `peak_remaining` is the harness' spelling -- presumably a typo of "peek"
#[cfg_attr(not(doc),test)]
pub fn peek_test() {
let queue = FullSyncMove::<u32, 16>::new();
test_commons::peak_remaining("Movable API",
|e| queue.publish_movable(e).0.is_some(),
|| queue.consume_movable(),
|| unsafe {
let mut iter = queue.peek_remaining().into_iter();
( iter.next().expect("no item @0").iter(),
iter.next().expect("no item @1").iter() )
} );
test_commons::peak_remaining("Zero-Copy Producer/Movable Subscriber API",
|e| queue.publish(|slot| *slot = e, || false, |_| {}).is_none(),
|| queue.consume_movable(),
|| unsafe {
let mut iter = queue.peek_remaining().into_iter();
( iter.next().expect("no item @0").iter(),
iter.next().expect("no item @1").iter() )
} );
}
// round-trips a slot reference through index-conversion helpers
// (leaks slot #0 intentionally -- `leak_slot_internal()` keeps the guard locked)
#[cfg_attr(not(doc),test)]
fn indexes_and_references_conversions() {
let queue = FullSyncMove::<i32, 1024>::new();
let Some((first_item, _, _)) = queue.leak_slot_internal(|| false) else {
panic!("Can't determine the reference for the element at #0");
};
test_commons::indexes_and_references_conversions(first_item,
|index| queue.slot_ref_from_slot_index(index),
|slot_ref| queue.slot_index_from_slot_ref(slot_ref));
}
}