use iceoryx2_bb_elementary_traits::relocatable_container::RelocatableContainer;
use iceoryx2_bb_lock_free::mpmc::{container::*, unique_index_set_enums::ReleaseMode};
use iceoryx2_bb_memory::bump_allocator::BumpAllocator;
use iceoryx2_log::{error, fatal_panic};
use crate::{
identifiers::{UniqueNodeId, UniquePortId, UniquePublisherId, UniqueSubscriberId},
port::details::data_segment::DataSegmentType,
};
use super::PortCleanupAction;
/// Sizing parameters consumed by `DynamicConfig::new` and `DynamicConfig::memory_size`.
///
/// `#[repr(C)]` gives the struct a C-compatible layout in declaration order, so
/// reordering the fields changes the layout of the type.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub(crate) struct DynamicConfigSettings {
/// Value handed to the subscriber `Container` when it is constructed and when its
/// memory requirement is computed.
pub number_of_subscribers: usize,
/// Value handed to the publisher `Container` when it is constructed and when its
/// memory requirement is computed.
pub number_of_publishers: usize,
}
/// Entry type of the `publishers` container held by `DynamicConfig`.
///
/// `#[repr(C)]` gives the struct a C-compatible layout in declaration order, so
/// reordering the fields changes the layout of the type.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct PublisherDetails {
/// Unique ID of the publisher port; reported to the cleanup callback of
/// `DynamicConfig::remove_dead_node_id` as `UniquePortId::Publisher`.
pub publisher_id: UniquePublisherId,
/// Node ID stored with the entry. `DynamicConfig::remove_dead_node_id` compares it with
/// the dead node's ID, and its `owner_id()` is passed to the container on insertion.
pub node_id: UniqueNodeId,
/// Number of samples of the publisher.
// NOTE(review): not read anywhere in this file; exact meaning inferred from the name - confirm.
pub number_of_samples: usize,
/// Maximum slice length of the publisher.
// NOTE(review): not read anywhere in this file; unit (elements vs. bytes) - TODO confirm.
pub max_slice_len: usize,
/// Data segment type of the publisher (see `DataSegmentType`).
pub data_segment_type: DataSegmentType,
/// Maximum number of segments of the publisher.
// NOTE(review): not read anywhere in this file; exact meaning inferred from the name - confirm.
pub max_number_of_segments: u8,
}
/// Entry type of the `subscribers` container held by `DynamicConfig`.
///
/// `#[repr(C)]` gives the struct a C-compatible layout in declaration order, so
/// reordering the fields changes the layout of the type.
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct SubscriberDetails {
/// Unique ID of the subscriber port; reported to the cleanup callback of
/// `DynamicConfig::remove_dead_node_id` as `UniquePortId::Subscriber`.
pub subscriber_id: UniqueSubscriberId,
/// Node ID stored with the entry. `DynamicConfig::remove_dead_node_id` compares it with
/// the dead node's ID, and its `owner_id()` is passed to the container on insertion.
pub node_id: UniqueNodeId,
/// Buffer size of the subscriber.
// NOTE(review): not read anywhere in this file; unit and exact meaning - TODO confirm.
pub buffer_size: usize,
}
/// Holds one `Container` of `SubscriberDetails` and one of `PublisherDetails`.
///
/// The methods of this type add, remove, count and iterate over those entries.
/// `#[repr(C)]` gives the struct a C-compatible layout in declaration order, so
/// reordering the fields changes the layout of the type.
#[repr(C)]
#[derive(Debug)]
pub struct DynamicConfig {
/// Subscriber entries; filled by `add_subscriber_id`, emptied by
/// `release_subscriber_handle` and `remove_dead_node_id`.
pub(crate) subscribers: Container<SubscriberDetails>,
/// Publisher entries; filled by `add_publisher_id`, emptied by
/// `release_publisher_handle` and `remove_dead_node_id`.
pub(crate) publishers: Container<PublisherDetails>,
}
impl DynamicConfig {
pub(crate) fn new(config: &DynamicConfigSettings) -> Self {
Self {
subscribers: unsafe { Container::new_uninit(config.number_of_subscribers) },
publishers: unsafe { Container::new_uninit(config.number_of_publishers) },
}
}
pub(crate) unsafe fn init(&mut self, allocator: &BumpAllocator) {
unsafe {
fatal_panic!(from self,
when self.subscribers.init(allocator),
"This should never happen! Unable to initialize subscriber port id container.");
fatal_panic!(from self,
when self.publishers.init(allocator),
"This should never happen! Unable to initialize publisher port id container.");
}
}
pub(crate) fn memory_size(config: &DynamicConfigSettings) -> usize {
Container::<SubscriberDetails>::memory_size(config.number_of_subscribers)
+ Container::<PublisherDetails>::memory_size(config.number_of_publishers)
}
pub(crate) unsafe fn remove_dead_node_id<
PortCleanup: FnMut(UniquePortId) -> PortCleanupAction,
>(
&self,
node_id: &UniqueNodeId,
mut port_cleanup_callback: PortCleanup,
) {
unsafe {
self.publishers.recover(
node_id.owner_id(),
|registered_publisher| {
registered_publisher.node_id == *node_id
&& port_cleanup_callback(UniquePortId::Publisher(
registered_publisher.publisher_id,
)) == PortCleanupAction::RemovePort
},
ReleaseMode::Default,
);
self.subscribers.recover(
node_id.owner_id(),
|registered_subscriber| {
registered_subscriber.node_id == *node_id
&& port_cleanup_callback(UniquePortId::Subscriber(
registered_subscriber.subscriber_id,
)) == PortCleanupAction::RemovePort
},
ReleaseMode::Default,
);
}
}
pub fn number_of_publishers(&self) -> usize {
self.publishers.len()
}
pub fn number_of_subscribers(&self) -> usize {
self.subscribers.len()
}
pub fn list_subscribers<F: FnMut(&SubscriberDetails) -> CallbackProgression>(
&self,
mut callback: F,
) {
let state = unsafe { self.subscribers.get_state() };
state.for_each(|_, details| callback(details));
}
pub fn list_publishers<F: FnMut(&PublisherDetails) -> CallbackProgression>(
&self,
mut callback: F,
) {
let state = unsafe { self.publishers.get_state() };
state.for_each(|_, details| callback(details));
}
pub(crate) fn add_subscriber_id(&self, details: SubscriberDetails) -> Option<ContainerHandle> {
unsafe {
self.subscribers
.add(details, details.node_id.owner_id())
.ok()
}
}
pub(crate) fn release_subscriber_handle(&self, handle: ContainerHandle) {
if let Err(e) = unsafe { self.subscribers.remove(handle, ReleaseMode::Default) } {
error!(from self, "Unable to deregister subscriber from service. This could indicate a corrupted system! [{e:?}]");
}
}
pub(crate) fn add_publisher_id(&self, details: PublisherDetails) -> Option<ContainerHandle> {
unsafe {
self.publishers
.add(details, details.node_id.owner_id())
.ok()
}
}
pub(crate) fn release_publisher_handle(&self, handle: ContainerHandle) {
if let Err(e) = unsafe { self.publishers.remove(handle, ReleaseMode::Default) } {
error!(from self, "Unable to deregister publisher from service. This could indicate a corrupted system! [{e:?}]");
}
}
}