/// A message queue that pairs a `MagicRingBuffer` holding the enqueued messages with a map of
/// message handlers looked up by compressed type identifier.
///
/// * `MessageHandlerArguments`: the value passed (by copy) to a handler for every message dequeued.
/// * `E`: the error type a handler may return.
#[derive(Debug)]
pub struct Queue<MessageHandlerArguments, E>
where
    MessageHandlerArguments: Debug + Copy,
    E: Debug,
{
    /// Storage that messages are enqueued into and read back from.
    magic_ring_buffer: MagicRingBuffer,

    /// Handlers used to process (or merely drop) each message read from the ring buffer.
    ///
    /// Wrapped in an `UnsafeCell` so that a mutable reference can be obtained from `&self`.
    message_handlers: UnsafeCell<MutableTypeErasedBoxedFunctionCompressedMap<MessageHandlerArguments, Result<(), E>>>,
}
impl<MessageHandlerArguments, E> Drop for Queue<MessageHandlerArguments, E>
where
    MessageHandlerArguments: Debug + Copy,
    E: Debug,
{
    /// Drains whatever is still readable from the ring buffer, dropping every remaining message
    /// in place (via `drop_in_place` on the handler map) rather than calling its handler.
    #[inline(always)]
    fn drop(&mut self)
    {
        let message_handlers = self.message_handlers();

        // Always read at least once; stop as soon as the ring buffer reports nothing more to read.
        loop
        {
            let more_data_to_read = self.magic_ring_buffer.single_reader_read_some_data::<E, _>
            (
                |unread|
                {
                    Message::process_next_message_in_buffer::<Result<(), E>, _>
                    (
                        unread,
                        |type_identifier, message_receiver|
                        {
                            message_handlers.drop_in_place(type_identifier, message_receiver);

                            // The per-message callback used here never reports a failure.
                            Ok(())
                        }
                    )
                }
            ).expect("Should never happen");

            if !more_data_to_read
            {
                break
            }
        }
    }
}
impl<MessageHandlerArguments, E> Enqueue for Queue<MessageHandlerArguments, E>
where
    MessageHandlerArguments: Debug + Copy,
    E: Debug,
{
    /// Enqueues a message identified by `compressed_type_identifier` onto this queue's ring buffer.
    ///
    /// All of the work is delegated to `Message::enqueue`, which is handed
    /// `message_contents_constructor` along with a pointer-typed `MessageContents` parameter.
    #[inline(always)]
    fn enqueue<MessageContents>(&self, compressed_type_identifier: CompressedTypeIdentifier, message_contents_constructor: impl FnOnce(NonNull<MessageContents>))
    {
        let ring_buffer = &self.magic_ring_buffer;
        Message::enqueue(ring_buffer, compressed_type_identifier, message_contents_constructor)
    }
}
impl<MessageHandlerArguments, E> Dequeue<MessageHandlerArguments, E> for Queue<MessageHandlerArguments, E>
where
    MessageHandlerArguments: Debug + Copy,
    E: Debug,
{
    /// Reads messages from the ring buffer, calling (and then dropping in place) each one through
    /// the handler map with a copy of `message_handler_arguments`.
    ///
    /// At least one read is always attempted. Reading then repeats until either the ring buffer
    /// reports no more data to read or `terminate.should_continue()` returns `false`; `terminate`
    /// is only consulted while more data remains.
    ///
    /// # Errors
    ///
    /// Returns immediately with the first `Err` produced by a read.
    #[inline(always)]
    fn dequeue(&self, terminate: &impl Terminate, message_handler_arguments: MessageHandlerArguments) -> Result<(), E>
    {
        let message_handlers = self.message_handlers();

        loop
        {
            let more_data_to_read = self.magic_ring_buffer.single_reader_read_some_data::<E, _>
            (
                |unread|
                {
                    Message::process_next_message_in_buffer::<Result<(), E>, _>
                    (
                        unread,
                        |type_identifier, message_receiver| message_handlers.call_and_drop_in_place(type_identifier, message_receiver, message_handler_arguments)
                    )
                }
            )?;

            // Short-circuits: termination is not checked once the buffer is exhausted.
            if !more_data_to_read || !terminate.should_continue()
            {
                break
            }
        }

        Ok(())
    }
}
impl<MessageHandlerArguments, E> Queue<MessageHandlerArguments, E>
where
    MessageHandlerArguments: Debug + Copy,
    E: Debug,
{
    /// Creates a new, reference-counted queue with a default (empty-by-`Default`) handler map.
    ///
    /// `file_extension` and `queue_size_in_bytes` are forwarded unchanged to
    /// `MagicRingBuffer::allocate_mirrored_and_not_swappable_from_dev_shm`.
    ///
    /// # Errors
    ///
    /// Returns the `MirroredMemoryMapCreationError` raised when the ring buffer cannot be allocated.
    #[inline(always)]
    pub fn allocate_from_dev_shm(file_extension: &str, queue_size_in_bytes: usize) -> Result<Arc<Self>, MirroredMemoryMapCreationError>
    {
        let magic_ring_buffer = MagicRingBuffer::allocate_mirrored_and_not_swappable_from_dev_shm(file_extension, queue_size_in_bytes)?;

        let queue = Self
        {
            magic_ring_buffer,
            message_handlers: Default::default(),
        };

        Ok(Arc::new(queue))
    }

    /// Creates one queue of `queue_size_in_bytes` for each entry populated by
    /// `logical_cores.populate_per_logical_core_data`, using the file extension `"queue"`.
    ///
    /// # Panics
    ///
    /// Panics if allocating any individual queue fails.
    #[inline(always)]
    pub fn queues(logical_cores: &LogicalCores, queue_size_in_bytes: usize) -> Arc<PerLogicalCoreData<Arc<Self>>>
    {
        let per_logical_core_queues = logical_cores.populate_per_logical_core_data(|_logical_core|
        {
            Self::allocate_from_dev_shm("queue", queue_size_in_bytes).unwrap()
        });

        Arc::new(per_logical_core_queues)
    }

    /// Gives mutable access to the handler map from a shared reference to the queue.
    ///
    /// NOTE(review): nothing here prevents two mutable references from being alive at once;
    /// callers presumably guarantee exclusive use of the returned reference - confirm.
    #[inline(always)]
    pub(crate) fn message_handlers(&self) -> &mut MutableTypeErasedBoxedFunctionCompressedMap<MessageHandlerArguments, Result<(), E>>
    {
        let handlers = self.message_handlers.get();
        unsafe { &mut *handlers }
    }
}