use iceoryx2_bb_concurrency::atomic::AtomicU64;
use iceoryx2_bb_elementary_traits::relocatable_container::RelocatableContainer;
use iceoryx2_bb_lock_free::mpmc::{container::*, unique_index_set_enums::ReleaseMode};
use iceoryx2_bb_memory::bump_allocator::BumpAllocator;
use iceoryx2_log::{error, fatal_panic};
use crate::identifiers::{UniqueListenerId, UniqueNodeId, UniqueNotifierId, UniquePortId};
use super::PortCleanupAction;
/// Size settings for the two containers held by a [`DynamicConfig`].
///
/// The values are forwarded to `Container::new_uninit` in
/// [`DynamicConfig::new`] and to `Container::memory_size` in
/// [`DynamicConfig::memory_size`].
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub(crate) struct DynamicConfigSettings {
/// Size argument used for the listener container.
pub number_of_listeners: usize,
/// Size argument used for the notifier container.
pub number_of_notifiers: usize,
}
/// Runtime bookkeeping of an event service: the registered listeners, the
/// registered notifiers and a notification time value.
///
/// NOTE(review): `#[repr(C)]` together with the allocator-based `init`
/// suggests this struct is meant to live in shared memory — confirm.
#[repr(C)]
#[derive(Debug)]
pub struct DynamicConfig {
/// Container holding one [`ListenerDetails`] entry per registered listener.
pub(crate) listeners: Container<ListenerDetails>,
/// Container holding one [`NotifierDetails`] entry per registered notifier.
pub(crate) notifiers: Container<NotifierDetails>,
/// Atomic value that is set to zero when the config is constructed.
/// NOTE(review): the time unit and the code that updates this value are not
/// visible here — confirm before relying on it.
pub(crate) elapsed_time_since_last_notification: AtomicU64,
}
/// Entry stored in the listener container of a [`DynamicConfig`].
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct ListenerDetails {
/// Unique id of the registered listener port.
pub listener_id: UniqueListenerId,
/// Id of the node associated with the listener. Its `owner_id()` is handed
/// to the container on registration, and the id is compared against a dead
/// node's id during cleanup.
pub node_id: UniqueNodeId,
}
/// Entry stored in the notifier container of a [`DynamicConfig`].
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct NotifierDetails {
/// Unique id of the registered notifier port.
pub notifier_id: UniqueNotifierId,
/// Id of the node associated with the notifier. Its `owner_id()` is handed
/// to the container on registration, and the id is compared against a dead
/// node's id during cleanup.
pub node_id: UniqueNodeId,
}
impl DynamicConfig {
    /// Builds a config whose containers are created with
    /// `Container::new_uninit`, sized by `config`, and whose
    /// `elapsed_time_since_last_notification` starts at zero.
    ///
    /// The containers still have to be initialized with `init`.
    pub(crate) fn new(config: &DynamicConfigSettings) -> Self {
        // NOTE(review): presumably sound because `init` is called before the
        // containers are accessed — confirm against the callers.
        let listeners = unsafe { Container::new_uninit(config.number_of_listeners) };
        let notifiers = unsafe { Container::new_uninit(config.number_of_notifiers) };

        Self {
            listeners,
            notifiers,
            elapsed_time_since_last_notification: AtomicU64::new(0),
        }
    }

    /// Initializes the listener container and then the notifier container
    /// with `allocator`.
    ///
    /// A failing container initialization is reported through `fatal_panic!`.
    ///
    /// # Safety
    ///
    /// Forwards to the unsafe `Container::init`; the caller has to uphold the
    /// requirements of that function.
    pub(crate) unsafe fn init(&mut self, allocator: &BumpAllocator) {
        // Listener container first, same order as the fields are declared.
        unsafe {
            fatal_panic!(from "event::DynamicConfig::init",
                when self.listeners.init(allocator),
                "This should never happen! Unable to initialize listener port id container.");
        }

        unsafe {
            fatal_panic!(from "event::DynamicConfig::init",
                when self.notifiers.init(allocator),
                "This should never happen! Unable to initialize notifier port id container.");
        }
    }

    /// Returns the sum of the memory sizes reported by the listener and the
    /// notifier container for the sizes given in `config`.
    pub(crate) fn memory_size(config: &DynamicConfigSettings) -> usize {
        let listener_bytes = Container::<ListenerDetails>::memory_size(config.number_of_listeners);
        let notifier_bytes = Container::<NotifierDetails>::memory_size(config.number_of_notifiers);

        listener_bytes + notifier_bytes
    }

    /// Returns the current length of the listener container.
    pub fn number_of_listeners(&self) -> usize {
        self.listeners.len()
    }

    /// Returns the current length of the notifier container.
    pub fn number_of_notifiers(&self) -> usize {
        self.notifiers.len()
    }

    /// Invokes `callback` with the [`ListenerDetails`] of the entries found in
    /// the state of the listener container.
    ///
    /// The [`CallbackProgression`] returned by `callback` is passed on to the
    /// state's `for_each`.
    /// NOTE(review): presumably `CallbackProgression::Stop` ends the iteration
    /// early — confirm against the container implementation.
    pub fn list_listeners<F>(&self, mut callback: F)
    where
        F: FnMut(&ListenerDetails) -> CallbackProgression,
    {
        let listener_state = unsafe { self.listeners.get_state() };
        listener_state.for_each(|_handle, listener| callback(listener));
    }

    /// Invokes `callback` with the [`NotifierDetails`] of the entries found in
    /// the state of the notifier container.
    ///
    /// The [`CallbackProgression`] returned by `callback` is passed on to the
    /// state's `for_each`.
    /// NOTE(review): presumably `CallbackProgression::Stop` ends the iteration
    /// early — confirm against the container implementation.
    pub fn list_notifiers<F>(&self, mut callback: F)
    where
        F: FnMut(&NotifierDetails) -> CallbackProgression,
    {
        let notifier_state = unsafe { self.notifiers.get_state() };
        notifier_state.for_each(|_handle, notifier| callback(notifier));
    }

    /// Runs the containers' `recover` for the owner id of `node_id`, first on
    /// the listeners and then on the notifiers.
    ///
    /// An entry is selected when its `node_id` equals `node_id` and
    /// `port_cleanup_callback` answers with `PortCleanupAction::RemovePort`
    /// for its port id. The callback is only consulted for entries whose node
    /// id matches.
    ///
    /// # Safety
    ///
    /// Forwards to the unsafe `Container::recover`; the caller has to uphold
    /// the requirements of that function.
    /// NOTE(review): the name implies the node must really be dead — confirm.
    pub(crate) unsafe fn remove_dead_node_id<PortCleanup>(
        &self,
        node_id: &UniqueNodeId,
        mut port_cleanup_callback: PortCleanup,
    ) where
        PortCleanup: FnMut(UniquePortId) -> PortCleanupAction,
    {
        unsafe {
            self.listeners.recover(
                node_id.owner_id(),
                |listener| {
                    // Entries of other nodes are never shown to the callback.
                    if listener.node_id != *node_id {
                        return false;
                    }

                    let port_id = UniquePortId::Listener(listener.listener_id);
                    port_cleanup_callback(port_id) == PortCleanupAction::RemovePort
                },
                ReleaseMode::Default,
            );
        }

        unsafe {
            self.notifiers.recover(
                node_id.owner_id(),
                |notifier| {
                    // Entries of other nodes are never shown to the callback.
                    if notifier.node_id != *node_id {
                        return false;
                    }

                    let port_id = UniquePortId::Notifier(notifier.notifier_id);
                    port_cleanup_callback(port_id) == PortCleanupAction::RemovePort
                },
                ReleaseMode::Default,
            );
        }
    }

    /// Adds `details` to the listener container, using the owner id of its
    /// node id.
    ///
    /// Returns the handle of the new entry, or `None` when the container
    /// reports an error.
    pub(crate) fn add_listener_id(&self, details: ListenerDetails) -> Option<ContainerHandle> {
        let owner = details.node_id.owner_id();
        let outcome = unsafe { self.listeners.add(details, owner) };

        outcome.ok()
    }

    /// Removes the entry behind `handle` from the listener container.
    ///
    /// A failure is logged as an error and otherwise ignored.
    pub(crate) fn release_listener_handle(&self, handle: ContainerHandle) {
        let removal = unsafe { self.listeners.remove(handle, ReleaseMode::Default) };

        if let Err(e) = removal {
            error!(from self, "Unable to deregister listener from service. This could indicate a corrupted system! [{e:?}]");
        }
    }

    /// Adds `details` to the notifier container, using the owner id of its
    /// node id.
    ///
    /// Returns the handle of the new entry, or `None` when the container
    /// reports an error.
    pub(crate) fn add_notifier_id(&self, details: NotifierDetails) -> Option<ContainerHandle> {
        let owner = details.node_id.owner_id();
        let outcome = unsafe { self.notifiers.add(details, owner) };

        outcome.ok()
    }

    /// Removes the entry behind `handle` from the notifier container.
    ///
    /// A failure is logged as an error and otherwise ignored.
    pub(crate) fn release_notifier_handle(&self, handle: ContainerHandle) {
        let removal = unsafe { self.notifiers.remove(handle, ReleaseMode::Default) };

        if let Err(e) = removal {
            error!(from self, "Unable to deregister notifier from service. This could indicate a corrupted system! [{e:?}]");
        }
    }
}