use crate::constants::MAX_BLACKBOARD_KEY_SIZE;
use crate::identifiers::UniqueReaderId;
use crate::prelude::EventId;
use crate::service::builder::CustomKeyMarker;
use crate::service::builder::blackboard::{BlackboardResources, KeyMemory};
use crate::service::dynamic_config::blackboard::ReaderDetails;
use crate::service::static_config::message_type_details::{TypeDetail, TypeVariant};
use crate::service::{self, SharedServiceState};
use core::alloc::Layout;
use core::fmt::Debug;
use core::hash::Hash;
use core::marker::PhantomData;
use core::ops::Deref;
use core::ptr::NonNull;
use iceoryx2_bb_concurrency::atomic::Ordering;
use iceoryx2_bb_elementary::math::align;
use iceoryx2_bb_elementary_traits::non_null::NonNullCompat;
use iceoryx2_bb_elementary_traits::testing::abandonable::Abandonable;
use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
use iceoryx2_bb_lock_free::mpmc::container::ContainerHandle;
use iceoryx2_bb_lock_free::spmc::unrestricted_atomic::{
UnrestrictedAtomic, UnrestrictedAtomicMgmt,
};
use iceoryx2_cal::arc_sync_policy::ArcSyncPolicy;
use iceoryx2_cal::dynamic_storage::DynamicStorage;
use iceoryx2_cal::shared_memory::SharedMemory;
use iceoryx2_log::{fail, fatal_panic};
/// A copy of a blackboard value together with the generation counter that was
/// observed when the copy was made.
///
/// The wrapper dereferences to the contained value. The generation counter is
/// kept private and is compared again in `EntryHandle::is_up_to_date()`.
pub struct BlackboardValue<ValueType: Copy> {
    /// The copied value.
    value: ValueType,
    /// Generation counter captured alongside `value`.
    generation_counter: u64,
}

impl<ValueType: Copy> Deref for BlackboardValue<ValueType> {
    type Target = ValueType;

    /// Grants read access to the wrapped value.
    fn deref(&self) -> &ValueType {
        &self.value
    }
}

impl<ValueType: Copy + core::fmt::Display> core::fmt::Display for BlackboardValue<ValueType> {
    /// Prints only the wrapped value; the generation counter is omitted.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_fmt(format_args!("{}", self.value))
    }
}

impl<ValueType: Copy + Debug> Debug for BlackboardValue<ValueType> {
    /// Prints the value type name, the wrapped value and the generation counter.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let type_name = core::any::type_name::<ValueType>();
        let Self {
            value,
            generation_counter,
        } = self;

        write!(
            f,
            "BlackboardValue<{}> {{ value: {:?}, generation_counter: {} }}",
            type_name, value, generation_counter
        )
    }
}
/// State that a [`Reader`] shares with every [`EntryHandle`] created from it.
///
/// The field order is kept as is since it defines the drop order.
#[derive(Debug)]
struct ReaderSharedState<Service, KeyType>
where
    Service: service::Service,
    KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
{
    /// Handle to the service together with its blackboard specific resources.
    service_state: SharedServiceState<Service, BlackboardResources<Service>>,
    /// Ties the state to the key type without storing a key.
    _key: PhantomData<KeyType>,
    /// Port tag obtained from the node in `Reader::new()`; according to the
    /// error message there it is required for cleanup.
    port_tag: Service::StaticStorage,
}
// SAFETY: NOTE(review): the field type that prevents the automatic `Send`
// implementation is declared outside of this file - confirm that moving the
// shared state to another thread is sound for every `Service` implementation.
unsafe impl<Service, KeyType> Send for ReaderSharedState<Service, KeyType>
where
    Service: service::Service,
    KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
{
}
impl<Service, KeyType> Abandonable for ReaderSharedState<Service, KeyType>
where
    Service: service::Service,
    KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
{
    /// Abandons the shared service state first and the port tag second.
    ///
    /// # Safety
    ///
    /// `this` is turned into a mutable reference, therefore it must point to a
    /// valid value that is not accessed by anyone else during the call.
    unsafe fn abandon_in_place(mut this: NonNull<Self>) {
        unsafe {
            let Self {
                service_state,
                port_tag,
                ..
            } = this.as_mut();

            SharedServiceState::abandon_in_place(NonNull::iox2_from_mut(service_state));
            Service::StaticStorage::abandon_in_place(NonNull::iox2_from_mut(port_tag));
        }
    }
}
/// Failures that can occur when a `Reader` port is created.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReaderCreateError {
    /// Registering the reader in the dynamic service configuration failed
    /// since the maximum supported number of readers is already in use.
    ExceedsMaxSupportedReaders,
    /// The thread-safety policy that wraps the reader's shared state could
    /// not be instantiated.
    FailedToDeployThreadsafetyPolicy,
    /// The port tag, which is required for cleanup, could not be created.
    UnableToCreatePortTag,
}

impl core::fmt::Display for ReaderCreateError {
    /// Prints the variant name prefixed with the enum name.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_fmt(format_args!("ReaderCreateError::{self:?}"))
    }
}

impl core::error::Error for ReaderCreateError {}
/// Reading port of a blackboard service.
///
/// Entries are looked up by key with [`Reader::entry()`], which hands out an
/// [`EntryHandle`] for the stored value.
#[derive(Debug)]
pub struct Reader<Service, KeyType>
where
    Service: service::Service,
    KeyType: Send + Sync + Eq + Clone + Copy + Debug + 'static + Hash + ZeroCopySend,
{
    /// State shared with all entry handles created by this reader.
    shared_state: Service::ArcThreadSafetyPolicy<ReaderSharedState<Service, KeyType>>,
    /// Handle returned when the reader was registered in the dynamic service
    /// configuration; `None` until the registration succeeded.
    dynamic_reader_handle: Option<ContainerHandle>,
    /// Unique id of this reader port.
    reader_id: UniqueReaderId,
}
impl<Service, KeyType> Abandonable for Reader<Service, KeyType>
where
    Service: service::Service,
    KeyType: Send + Sync + Eq + Clone + Copy + Debug + 'static + Hash + ZeroCopySend,
{
    /// Abandons the shared state of the reader. The dynamic reader handle and
    /// the reader id are not touched.
    ///
    /// # Safety
    ///
    /// `this` is turned into a mutable reference, therefore it must point to a
    /// valid value that is not accessed by anyone else during the call.
    unsafe fn abandon_in_place(mut this: NonNull<Self>) {
        unsafe {
            let shared_state = NonNull::iox2_from_mut(&mut this.as_mut().shared_state);
            Service::ArcThreadSafetyPolicy::abandon_in_place(shared_state);
        }
    }
}
impl<Service, KeyType> Drop for Reader<Service, KeyType>
where
    Service: service::Service,
    KeyType: Send + Sync + Eq + Clone + Copy + Debug + 'static + Hash + ZeroCopySend,
{
    /// Releases the reader's handle in the dynamic service configuration when
    /// the reader had been registered.
    fn drop(&mut self) {
        // A reader whose registration never succeeded has nothing to release.
        let Some(handle) = self.dynamic_reader_handle else {
            return;
        };

        let shared_state = self.shared_state.lock();
        shared_state
            .service_state
            .dynamic_storage()
            .get()
            .blackboard()
            .release_reader_handle(handle);
    }
}
impl<
    Service: service::Service,
    KeyType: Send + Sync + Eq + Clone + Copy + Debug + 'static + Hash + ZeroCopySend,
> Reader<Service, KeyType>
{
    /// Creates a new reader port for the blackboard service behind `service`.
    ///
    /// # Errors
    ///
    /// * [`ReaderCreateError::UnableToCreatePortTag`] - the node could not
    ///   create the port tag.
    /// * [`ReaderCreateError::FailedToDeployThreadsafetyPolicy`] - the
    ///   thread-safety policy wrapping the shared state could not be created.
    /// * [`ReaderCreateError::ExceedsMaxSupportedReaders`] - the reader could
    ///   not be added to the dynamic service configuration.
    pub(crate) fn new(
        service: SharedServiceState<Service, BlackboardResources<Service>>,
    ) -> Result<Self, ReaderCreateError> {
        let origin = "Reader::new()";
        let msg = "Unable to create Reader port";

        let reader_id = UniqueReaderId::new();
        let port_tag = match service
            .shared_node()
            .create_port_tag(origin, msg, reader_id.0.value())
        {
            Ok(port_tag) => port_tag,
            Err(e) => {
                fail!(from origin, with ReaderCreateError::UnableToCreatePortTag,
                    "{msg} since the port tag, that is required for cleanup, could not be created. [{e:?}]");
            }
        };

        let shared_state =
            <Service as service::Service>::ArcThreadSafetyPolicy::new(ReaderSharedState {
                port_tag,
                service_state: service.clone(),
                _key: PhantomData,
            });

        let shared_state = match shared_state {
            Ok(v) => v,
            Err(e) => {
                fail!(from origin, with ReaderCreateError::FailedToDeployThreadsafetyPolicy,
                    "{msg} since the threadsafety policy could not be instantiated ({e:?}).");
            }
        };

        // The reader is assembled without a handle before it is registered. When
        // the registration below fails, `new_self` is dropped and `Drop` skips
        // releasing a handle that was never acquired.
        let mut new_self = Self {
            shared_state,
            reader_id,
            dynamic_reader_handle: None,
        };

        // NOTE(review): presumably prevents the compiler from moving the
        // registration ahead of the construction of `new_self` - confirm.
        core::sync::atomic::compiler_fence(Ordering::SeqCst);

        let dynamic_reader_handle = match service
            .dynamic_storage()
            .get()
            .blackboard()
            .add_reader_id(ReaderDetails {
                reader_id,
                node_id: *service.shared_node().id(),
            }) {
            Some(unique_index) => unique_index,
            None => {
                fail!(from origin, with ReaderCreateError::ExceedsMaxSupportedReaders,
                    "{} since it would exceed the maximum supported amount of readers of {}.",
                    msg, service.static_config().blackboard().max_readers);
            }
        };

        new_self.dynamic_reader_handle = Some(dynamic_reader_handle);

        Ok(new_self)
    }

    /// Returns the [`UniqueReaderId`] of the reader.
    pub fn id(&self) -> UniqueReaderId {
        self.reader_id
    }

    /// Creates an [`EntryHandle`] for the entry that is stored under `key` and
    /// holds a value of type `ValueType`.
    ///
    /// # Errors
    ///
    /// Returns [`EntryHandleError::EntryDoesNotExist`] when no entry with the
    /// given key exists or when the stored value type differs from `ValueType`.
    ///
    /// # Panics
    ///
    /// Panics via `fatal_panic!` when `key` cannot be converted into a
    /// [`KeyMemory`].
    pub fn entry<ValueType: Copy + ZeroCopySend>(
        &self,
        key: &KeyType,
    ) -> Result<EntryHandle<Service, KeyType, ValueType>, EntryHandleError> {
        let msg = "Unable to create entry handle";

        let key_mem = match KeyMemory::try_from(key) {
            Ok(mem) => mem,
            Err(_) => {
                fatal_panic!(from self, "This should never happen! Key with invalid layout passed.");
            }
        };

        let offset = self.get_entry_offset(
            &key_mem,
            &TypeDetail::new::<ValueType>(TypeVariant::FixedSize),
            msg,
        )?;

        // The entry is addressed as payload start address plus entry offset.
        let atomic = (self
            .shared_state
            .lock()
            .service_state
            .additional_resource()
            .data
            .payload_start_address() as u64
            + offset) as *const UnrestrictedAtomic<ValueType>;

        Ok(EntryHandle::new(self.shared_state.clone(), atomic, offset))
    }

    /// Looks up the entry stored under `key_mem` and returns its offset.
    ///
    /// `msg` is used as prefix of the failure messages.
    ///
    /// # Errors
    ///
    /// Returns [`EntryHandleError::EntryDoesNotExist`] when the key is unknown
    /// or when `value_type_details` differs from the type details of the
    /// stored entry.
    fn get_entry_offset(
        &self,
        key_mem: &KeyMemory<MAX_BLACKBOARD_KEY_SIZE>,
        value_type_details: &TypeDetail,
        msg: &str,
    ) -> Result<u64, EntryHandleError> {
        // A single guard covers the whole lookup; locking again for every
        // access to the shared state is not required.
        let shared_state = self.shared_state.lock();
        let resources = shared_state.service_state.additional_resource();
        let mgmt = resources.mgmt.get();

        // SAFETY: NOTE(review): the contract of `__internal_get` is defined
        // outside of this file. The key memory is compared with the key
        // equality function that is stored next to the map - confirm that this
        // satisfies the contract.
        let index = match unsafe {
            mgmt.map
                .__internal_get(key_mem, resources.key_eq_func.as_ref())
        } {
            Some(i) => i,
            None => {
                fail!(from self, with EntryHandleError::EntryDoesNotExist,
                    "{} since no entry with the given key exists.", msg);
            }
        };

        let entry = &mgmt.entries[index];
        if *value_type_details != entry.type_details {
            fail!(from self, with EntryHandleError::EntryDoesNotExist,
                "{} since no entry with the given key and value type exists.", msg);
        }

        let offset = entry.offset.load(core::sync::atomic::Ordering::Relaxed);
        Ok(offset)
    }
}
/// Failures that can occur when an entry handle is requested from a reader.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EntryHandleError {
    /// No entry with the given key exists, or the entry stored under the key
    /// has a different value type.
    EntryDoesNotExist,
}

impl core::fmt::Display for EntryHandleError {
    /// Prints the variant name prefixed with the enum name.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_fmt(format_args!("EntryHandleError::{self:?}"))
    }
}

impl core::error::Error for EntryHandleError {}
/// Read handle for a single blackboard entry, created by `Reader::entry()`.
pub struct EntryHandle<Service, KeyType, ValueType>
where
    Service: service::Service,
    KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
    ValueType: Copy,
{
    /// Address of the entry, computed from the payload start address and the
    /// entry offset.
    atomic: *const UnrestrictedAtomic<ValueType>,
    /// Id of the entry, derived from the entry offset.
    entry_id: EventId,
    /// Shared state of the reader that created the handle.
    /// NOTE(review): presumably held so that the memory behind `atomic` stays
    /// available for the lifetime of the handle - confirm.
    _shared_state: Service::ArcThreadSafetyPolicy<ReaderSharedState<Service, KeyType>>,
}
// SAFETY: The raw pointer field `atomic` suppresses the automatic
// implementation. NOTE(review): presumably sound since the pointee is only
// read through the `UnrestrictedAtomic` API and is kept available by
// `_shared_state` - confirm.
unsafe impl<Service, KeyType, ValueType> Send for EntryHandle<Service, KeyType, ValueType>
where
    Service: service::Service,
    KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
    ValueType: Copy + 'static,
{
}

// SAFETY: See the `Send` implementation; the same review note applies to
// shared access from multiple threads.
unsafe impl<Service, KeyType, ValueType> Sync for EntryHandle<Service, KeyType, ValueType>
where
    Service: service::Service,
    KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
    ValueType: Copy + 'static,
{
}
impl<
    Service: service::Service,
    KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
    ValueType: Copy,
> EntryHandle<Service, KeyType, ValueType>
{
    /// Creates a handle for the entry located at `atomic`.
    ///
    /// `offset` is the offset of the entry and is used to derive the entry id.
    fn new(
        reader_state: Service::ArcThreadSafetyPolicy<ReaderSharedState<Service, KeyType>>,
        atomic: *const UnrestrictedAtomic<ValueType>,
        offset: u64,
    ) -> Self {
        Self {
            atomic,
            entry_id: EventId::new(offset as _),
            // `reader_state` is already owned, so it is moved instead of being
            // cloned and then dropped.
            _shared_state: reader_state,
        }
    }

    /// Returns a copy of the entry value together with the generation counter
    /// that was observed right before the value was loaded.
    pub fn get(&self) -> BlackboardValue<ValueType> {
        // SAFETY: NOTE(review): relies on `atomic` pointing to a live entry of
        // type `ValueType`; the pointer is computed in `Reader::entry()` after
        // the key and the value type were matched - confirm.
        unsafe {
            // The counter is read before the value.
            // NOTE(review): presumably so that a write happening in between
            // makes the returned value look outdated instead of up to date -
            // confirm.
            let generation_counter = (*self.atomic).__internal_get_write_cell();

            BlackboardValue {
                value: (*self.atomic).load(),
                generation_counter,
            }
        }
    }

    /// Returns `true` when the generation counter stored in `value` equals the
    /// current generation counter of the entry.
    pub fn is_up_to_date(&self, value: &BlackboardValue<ValueType>) -> bool {
        // SAFETY: See `get()`; the same review note applies.
        unsafe { (*self.atomic).__internal_get_write_cell() == value.generation_counter }
    }

    /// Returns the id of the entry.
    pub fn entry_id(&self) -> EventId {
        self.entry_id
    }
}
impl<Service: service::Service> Reader<Service, CustomKeyMarker> {
    /// Creates an untyped entry handle for the key stored at `key` and the
    /// value type described by `value_type_details`.
    ///
    /// # Errors
    ///
    /// Forwards [`EntryHandleError::EntryDoesNotExist`] from the entry lookup.
    ///
    /// # Safety
    ///
    /// `key` is read with the size and alignment of the service's key type
    /// details.
    /// NOTE(review): presumably `key` must point to a valid key of exactly
    /// that type - confirm against the callers.
    #[doc(hidden)]
    pub unsafe fn __internal_entry(
        &self,
        key: *const u8,
        value_type_details: &TypeDetail,
    ) -> Result<__InternalEntryHandle<Service>, EntryHandleError> {
        let msg = "Unable to create entry handle";
        let state = self.shared_state.lock();

        // Copy the key into a `KeyMemory` using the layout stored in the
        // static service configuration.
        let key_details = state
            .service_state
            .static_config()
            .blackboard()
            .type_details();
        let key_layout =
            unsafe { Layout::from_size_align_unchecked(key_details.size, key_details.alignment) };
        let key_mem = match unsafe { KeyMemory::try_from_ptr(key, key_layout) } {
            Ok(mem) => mem,
            Err(_) => {
                fatal_panic!(from self, "This should never happen! Key with invalid layout set.");
            }
        };

        let offset = self.get_entry_offset(&key_mem, value_type_details, msg)?;

        // The management part is located at payload start address plus offset;
        // the value follows it at the next address that satisfies the value
        // alignment.
        let payload_start = state
            .service_state
            .additional_resource()
            .data
            .payload_start_address() as u64;
        let atomic_mgmt_ptr = (payload_start + offset) as *const UnrestrictedAtomicMgmt;
        let value_start = atomic_mgmt_ptr as usize + core::mem::size_of::<UnrestrictedAtomicMgmt>();
        let value_start = align(value_start, value_type_details.alignment);

        Ok(__InternalEntryHandle {
            atomic_mgmt_ptr,
            data_ptr: value_start as *const u8,
            entry_id: EventId::new(offset as _),
            _shared_state: self.shared_state.clone(),
        })
    }
}
/// Untyped counterpart of [`EntryHandle`], created by
/// `Reader::__internal_entry()`.
#[doc(hidden)]
pub struct __InternalEntryHandle<Service>
where
    Service: service::Service,
{
    /// Address of the management part of the entry.
    atomic_mgmt_ptr: *const UnrestrictedAtomicMgmt,
    /// Aligned address that follows the management part and is handed to
    /// `load` as the data location.
    data_ptr: *const u8,
    /// Id of the entry, derived from the entry offset.
    entry_id: EventId,
    /// Shared state of the reader that created the handle.
    _shared_state: Service::ArcThreadSafetyPolicy<ReaderSharedState<Service, CustomKeyMarker>>,
}
// SAFETY: The raw pointer fields suppress the automatic implementations.
// NOTE(review): presumably sound since the pointees are only accessed through
// the `UnrestrictedAtomicMgmt` API and are kept available by `_shared_state` -
// confirm.
unsafe impl<Service> Send for __InternalEntryHandle<Service> where Service: service::Service {}
unsafe impl<Service> Sync for __InternalEntryHandle<Service> where Service: service::Service {}
impl<Service: service::Service> __InternalEntryHandle<Service> {
    /// Loads the entry value into `value_ptr` and, when
    /// `generation_counter_ptr` is not null, stores the generation counter
    /// that was read before the load into it.
    ///
    /// # Safety
    ///
    /// `generation_counter_ptr` is written without further checks when it is
    /// not null, and `value_ptr`, `value_size` and `value_alignment` are
    /// forwarded unchecked to `UnrestrictedAtomicMgmt::load`.
    /// NOTE(review): presumably they must describe writable memory matching
    /// the value type the handle was created for - confirm.
    pub unsafe fn get(
        &self,
        value_ptr: *mut u8,
        value_size: usize,
        value_alignment: usize,
        generation_counter_ptr: *mut u64,
    ) {
        unsafe {
            let mgmt = &*self.atomic_mgmt_ptr;

            // The counter is optional for the caller and is read before the value.
            if !generation_counter_ptr.is_null() {
                generation_counter_ptr.write(mgmt.__internal_get_write_cell());
            }

            mgmt.load(value_ptr, value_size, value_alignment, self.data_ptr);
        }
    }

    /// Returns the id of the entry.
    pub fn entry_id(&self) -> EventId {
        self.entry_id
    }

    /// Returns `true` when `generation_counter` equals the current generation
    /// counter of the entry.
    pub fn is_up_to_date(&self, generation_counter: u64) -> bool {
        let current = unsafe { (*self.atomic_mgmt_ptr).__internal_get_write_cell() };
        current == generation_counter
    }
}