use crate::identifiers::{UniqueNodeId, UniquePortId, UniqueReaderId, UniqueWriterId};
use iceoryx2_bb_container::queue::RelocatableContainer;
use iceoryx2_bb_lock_free::mpmc::{container::*, unique_index_set_enums::ReleaseMode};
use iceoryx2_bb_memory::bump_allocator::BumpAllocator;
use iceoryx2_log::{error, fatal_panic};
use super::PortCleanupAction;
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub(crate) struct DynamicConfigSettings {
pub number_of_readers: usize,
pub number_of_writers: usize,
}
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct ReaderDetails {
pub reader_id: UniqueReaderId,
pub node_id: UniqueNodeId,
}
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct WriterDetails {
pub writer_id: UniqueWriterId,
pub node_id: UniqueNodeId,
}
#[repr(C)]
#[derive(Debug)]
pub struct DynamicConfig {
pub(crate) readers: Container<ReaderDetails>,
pub(crate) writers: Container<WriterDetails>,
}
impl DynamicConfig {
pub(crate) fn new(config: &DynamicConfigSettings) -> Self {
Self {
readers: unsafe { Container::new_uninit(config.number_of_readers) },
writers: unsafe { Container::new_uninit(config.number_of_writers) },
}
}
pub(crate) unsafe fn init(&mut self, allocator: &BumpAllocator) {
unsafe {
fatal_panic!(from self,
when self.readers.init(allocator),
"This should never happen! Unable to initialize reader port id container.");
fatal_panic!(from self,
when self.writers.init(allocator),
"This should never happen! Unable to initialize writer port id container.");
}
}
pub(crate) fn memory_size(config: &DynamicConfigSettings) -> usize {
Container::<ReaderDetails>::memory_size(config.number_of_readers)
+ Container::<WriterDetails>::memory_size(config.number_of_writers)
}
pub fn number_of_readers(&self) -> usize {
self.readers.len()
}
pub fn number_of_writers(&self) -> usize {
self.writers.len()
}
pub fn list_readers<F: FnMut(&ReaderDetails) -> CallbackProgression>(&self, mut callback: F) {
let state = unsafe { self.readers.get_state() };
state.for_each(|_, details| callback(details));
}
pub fn list_writers<F: FnMut(&WriterDetails) -> CallbackProgression>(&self, mut callback: F) {
let state = unsafe { self.writers.get_state() };
state.for_each(|_, details| callback(details));
}
pub(crate) unsafe fn remove_dead_node_id<
PortCleanup: FnMut(UniquePortId) -> PortCleanupAction,
>(
&self,
node_id: &UniqueNodeId,
mut port_cleanup_callback: PortCleanup,
) {
unsafe {
self.readers.recover(
node_id.owner_id(),
|registered_reader| {
registered_reader.node_id == *node_id
&& port_cleanup_callback(UniquePortId::Reader(registered_reader.reader_id))
== PortCleanupAction::RemovePort
},
ReleaseMode::Default,
);
self.writers.recover(
node_id.owner_id(),
|registered_writer| {
registered_writer.node_id == *node_id
&& port_cleanup_callback(UniquePortId::Writer(registered_writer.writer_id))
== PortCleanupAction::RemovePort
},
ReleaseMode::Default,
);
}
}
pub(crate) fn add_reader_id(&self, details: ReaderDetails) -> Option<ContainerHandle> {
unsafe { self.readers.add(details, details.node_id.owner_id()).ok() }
}
pub(crate) fn release_reader_handle(&self, handle: ContainerHandle) {
if let Err(e) = unsafe { self.readers.remove(handle, ReleaseMode::Default) } {
error!(from self, "Unable to deregister reader from service. This could indicate a corrupted system! [{e:?}]");
}
}
pub(crate) fn add_writer_id(&self, details: WriterDetails) -> Option<ContainerHandle> {
unsafe { self.writers.add(details, details.node_id.owner_id()).ok() }
}
pub(crate) fn release_writer_handle(&self, handle: ContainerHandle) {
if let Err(e) = unsafe { self.writers.remove(handle, ReleaseMode::Default) } {
error!(from self, "Unable to deregister writer from service. This could indicate a corrupted system! [{e:?}]");
}
}
}