use crate::constants::MAX_BLACKBOARD_KEY_SIZE;
use crate::identifiers::UniqueWriterId;
use crate::prelude::EventId;
use crate::service::builder::CustomKeyMarker;
use crate::service::builder::blackboard::{BlackboardResources, KeyMemory};
use crate::service::dynamic_config::blackboard::WriterDetails;
use crate::service::static_config::message_type_details::{TypeDetail, TypeVariant};
use crate::service::{self, SharedServiceState};
use core::alloc::Layout;
use core::fmt::Debug;
use core::hash::Hash;
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::ptr::NonNull;
use iceoryx2_bb_concurrency::atomic::Ordering;
use iceoryx2_bb_concurrency::cell::UnsafeCell;
use iceoryx2_bb_elementary::math::align;
use iceoryx2_bb_elementary_traits::non_null::NonNullCompat;
use iceoryx2_bb_elementary_traits::testing::abandonable::Abandonable;
use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
use iceoryx2_bb_lock_free::mpmc::container::ContainerHandle;
use iceoryx2_bb_lock_free::spmc::unrestricted_atomic::{
Producer, UnrestrictedAtomic, UnrestrictedAtomicMgmt,
};
use iceoryx2_cal::arc_sync_policy::ArcSyncPolicy;
use iceoryx2_cal::dynamic_storage::DynamicStorage;
use iceoryx2_cal::shared_memory::SharedMemory;
use iceoryx2_log::{fail, fatal_panic};
#[derive(Debug)]
struct WriterSharedState<
Service: service::Service,
KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
> {
service_state: SharedServiceState<Service, BlackboardResources<Service>>,
dynamic_writer_handle: UnsafeCell<Option<ContainerHandle>>,
_key: PhantomData<KeyType>,
}
impl<
Service: service::Service,
KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
> Drop for WriterSharedState<Service, KeyType>
{
fn drop(&mut self) {
if let Some(handle) = unsafe { &*self.dynamic_writer_handle.get() } {
self.service_state
.dynamic_storage()
.get()
.blackboard()
.release_writer_handle(*handle)
}
}
}
unsafe impl<
Service: service::Service,
KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
> Send for WriterSharedState<Service, KeyType>
{
}
impl<
Service: service::Service,
KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
> Abandonable for WriterSharedState<Service, KeyType>
{
unsafe fn abandon_in_place(mut this: NonNull<Self>) {
let this = unsafe { this.as_mut() };
unsafe {
SharedServiceState::abandon_in_place(NonNull::iox2_from_mut(&mut this.service_state))
};
}
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum WriterCreateError {
ExceedsMaxSupportedWriters,
InternalFailure,
FailedToDeployThreadsafetyPolicy,
UnableToCreatePortTag,
}
impl core::fmt::Display for WriterCreateError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "WriterCreateError::{self:?}")
}
}
impl core::error::Error for WriterCreateError {}
#[derive(Debug)]
pub struct Writer<
Service: service::Service,
KeyType: Send + Sync + Eq + Clone + Copy + Debug + 'static + Hash + ZeroCopySend,
> {
shared_state: Service::ArcThreadSafetyPolicy<WriterSharedState<Service, KeyType>>,
writer_id: UniqueWriterId,
port_tag: Service::StaticStorage,
}
impl<
Service: service::Service,
KeyType: Send + Sync + Eq + Clone + Copy + Debug + 'static + Hash + ZeroCopySend,
> Abandonable for Writer<Service, KeyType>
{
unsafe fn abandon_in_place(mut this: NonNull<Self>) {
let this = unsafe { this.as_mut() };
unsafe {
Service::ArcThreadSafetyPolicy::abandon_in_place(NonNull::iox2_from_mut(
&mut this.shared_state,
))
};
unsafe {
Service::StaticStorage::abandon_in_place(NonNull::iox2_from_mut(&mut this.port_tag))
};
}
}
impl<
Service: service::Service,
KeyType: Send + Sync + Eq + Clone + Copy + Debug + 'static + Hash + ZeroCopySend,
> Writer<Service, KeyType>
{
pub(crate) fn new(
service: SharedServiceState<Service, BlackboardResources<Service>>,
) -> Result<Self, WriterCreateError> {
let origin = "Writer::new()";
let msg = "Unable to create Writer port";
let writer_id = UniqueWriterId::new();
let port_tag = match service
.shared_node()
.create_port_tag(origin, msg, writer_id.0.value())
{
Ok(port_tag) => port_tag,
Err(e) => {
fail!(from origin, with WriterCreateError::UnableToCreatePortTag,
"{msg} since the port tag, that is required for cleanup, could not be created. [{e:?}]");
}
};
let shared_state = Service::ArcThreadSafetyPolicy::new(WriterSharedState {
service_state: service.clone(),
dynamic_writer_handle: UnsafeCell::new(None),
_key: PhantomData,
});
let shared_state = match shared_state {
Ok(v) => v,
Err(e) => {
fail!(from origin, with WriterCreateError::FailedToDeployThreadsafetyPolicy,
"{msg} since the threadsafety policy could not be instantiated ({e:?}).");
}
};
let new_self = Self {
shared_state,
writer_id,
port_tag,
};
core::sync::atomic::compiler_fence(Ordering::SeqCst);
let dynamic_writer_handle = match service
.dynamic_storage()
.get()
.blackboard()
.add_writer_id(WriterDetails {
writer_id,
node_id: *service.shared_node().id(),
}) {
Some(unique_index) => unique_index,
None => {
fail!(from origin, with WriterCreateError::ExceedsMaxSupportedWriters,
"{} since it would exceed the maximum supported amount of writers of {}.",
msg, service.static_config().blackboard().max_writers);
}
};
unsafe {
*new_self.shared_state.lock().dynamic_writer_handle.get() = Some(dynamic_writer_handle)
};
Ok(new_self)
}
pub fn id(&self) -> UniqueWriterId {
self.writer_id
}
pub fn entry<ValueType: Copy + ZeroCopySend>(
&self,
key: &KeyType,
) -> Result<EntryHandleMut<Service, KeyType, ValueType>, EntryHandleMutError> {
let msg = "Unable to create entry handle";
let key_mem = match KeyMemory::try_from(key) {
Ok(mem) => mem,
Err(_) => {
fatal_panic!(from self, "This should never happen! Key with invalid layout passed.");
}
};
let offset = self.get_entry_offset(
&key_mem,
&TypeDetail::new::<ValueType>(TypeVariant::FixedSize),
msg,
)?;
match EntryHandleMut::new(self.shared_state.clone(), offset) {
Ok(handle) => Ok(handle),
Err(e) => {
fail!(from self, with e,
"{} since a handle for the passed key and value type already exists.", msg);
}
}
}
fn get_entry_offset(
&self,
key_mem: &KeyMemory<MAX_BLACKBOARD_KEY_SIZE>,
value_type_details: &TypeDetail,
msg: &str,
) -> Result<u64, EntryHandleMutError> {
let shared_state = self.shared_state.lock();
let index = match unsafe {
shared_state
.service_state
.additional_resource()
.mgmt
.get()
.map
.__internal_get(
key_mem,
shared_state
.service_state
.additional_resource()
.key_eq_func
.as_ref(),
)
} {
Some(i) => i,
None => {
fail!(from self, with EntryHandleMutError::EntryDoesNotExist,
"{} since no entry with the given key exists.", msg);
}
};
let entry = &shared_state
.service_state
.additional_resource()
.mgmt
.get()
.entries[index];
if *value_type_details != entry.type_details {
fail!(from self, with EntryHandleMutError::EntryDoesNotExist,
"{} since no entry with the given key and value type exists.", msg);
}
let offset = entry.offset.load(core::sync::atomic::Ordering::Relaxed);
Ok(offset)
}
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum EntryHandleMutError {
EntryDoesNotExist,
HandleAlreadyExists,
}
impl core::fmt::Display for EntryHandleMutError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "EntryHandleMutError::{self:?}")
}
}
impl core::error::Error for EntryHandleMutError {}
pub struct EntryHandleMut<
Service: service::Service,
KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
ValueType: Copy + 'static,
> {
producer: Producer<'static, ValueType>,
entry_id: EventId,
_shared_state: Service::ArcThreadSafetyPolicy<WriterSharedState<Service, KeyType>>,
}
unsafe impl<
Service: service::Service,
KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
ValueType: Copy + 'static,
> Send for EntryHandleMut<Service, KeyType, ValueType>
{
}
unsafe impl<
Service: service::Service,
KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
ValueType: Copy + 'static,
> Sync for EntryHandleMut<Service, KeyType, ValueType>
{
}
impl<
Service: service::Service,
KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
ValueType: Copy + 'static,
> EntryHandleMut<Service, KeyType, ValueType>
{
fn new(
writer_state: Service::ArcThreadSafetyPolicy<WriterSharedState<Service, KeyType>>,
offset: u64,
) -> Result<Self, EntryHandleMutError> {
let atomic = (writer_state
.lock()
.service_state
.additional_resource()
.data
.payload_start_address() as u64
+ offset) as *mut UnrestrictedAtomic<ValueType>;
match unsafe { (*atomic).acquire_producer() } {
None => Err(EntryHandleMutError::HandleAlreadyExists),
Some(producer) => {
let p: Producer<'static, ValueType> = unsafe { core::mem::transmute(producer) };
Ok(Self {
producer: p,
_shared_state: writer_state.clone(),
entry_id: EventId::new(offset as _),
})
}
}
}
pub fn update_with_copy(&self, value: ValueType) {
self.producer.store(value);
}
pub fn loan_uninit(self) -> EntryValueUninit<Service, KeyType, ValueType> {
EntryValueUninit::new(self)
}
pub fn entry_id(&self) -> EventId {
self.entry_id
}
}
pub struct EntryValueUninit<
Service: service::Service,
KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
ValueType: Copy + 'static,
> {
ptr: *mut ValueType,
entry_handle_mut: EntryHandleMut<Service, KeyType, ValueType>,
}
unsafe impl<
Service: service::Service,
KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
ValueType: Copy + 'static,
> Send for EntryValueUninit<Service, KeyType, ValueType>
{
}
unsafe impl<
Service: service::Service,
KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
ValueType: Copy + 'static,
> Sync for EntryValueUninit<Service, KeyType, ValueType>
{
}
impl<
Service: service::Service,
KeyType: Send + Sync + Eq + Clone + Debug + 'static + Hash + ZeroCopySend,
ValueType: Copy + 'static,
> EntryValueUninit<Service, KeyType, ValueType>
{
fn new(entry_handle_mut: EntryHandleMut<Service, KeyType, ValueType>) -> Self {
let ptr = unsafe { entry_handle_mut.producer.__internal_get_ptr_to_write_cell() };
Self {
ptr,
entry_handle_mut,
}
}
pub fn update_with_copy(self, value: ValueType) -> EntryHandleMut<Service, KeyType, ValueType> {
unsafe { self.ptr.write(value) };
unsafe {
self.entry_handle_mut
.producer
.__internal_update_write_cell()
};
self.entry_handle_mut
}
pub fn discard(self) -> EntryHandleMut<Service, KeyType, ValueType> {
self.entry_handle_mut
}
pub fn value_mut(&mut self) -> &mut MaybeUninit<ValueType> {
unsafe {
&mut *core::mem::transmute::<*mut ValueType, *mut MaybeUninit<ValueType>>(self.ptr)
}
}
pub unsafe fn assume_init_and_update(self) -> EntryHandleMut<Service, KeyType, ValueType> {
unsafe {
self.entry_handle_mut
.producer
.__internal_update_write_cell();
}
self.entry_handle_mut
}
}
impl<Service: service::Service> Writer<Service, CustomKeyMarker> {
#[doc(hidden)]
pub unsafe fn __internal_entry(
&self,
key: *const u8,
value_type_details: &TypeDetail,
) -> Result<__InternalEntryHandleMut<Service>, EntryHandleMutError> {
let msg = "Unable to create entry handle";
let shared_state = self.shared_state.lock();
let key_type_details = shared_state
.service_state
.static_config()
.blackboard()
.type_details();
let key_layout = unsafe {
Layout::from_size_align_unchecked(key_type_details.size, key_type_details.alignment)
};
let key_mem = unsafe {
match KeyMemory::try_from_ptr(key, key_layout) {
Ok(mem) => mem,
Err(_) => {
fatal_panic!(from self, "This should never happen! Key with invalid layout set.");
}
}
};
let offset = self.get_entry_offset(&key_mem, value_type_details, msg)?;
let atomic_mgmt_ptr = (shared_state
.service_state
.additional_resource()
.data
.payload_start_address() as u64
+ offset) as *const UnrestrictedAtomicMgmt;
let data_ptr = atomic_mgmt_ptr as usize + core::mem::size_of::<UnrestrictedAtomicMgmt>();
let data_ptr = align(data_ptr, value_type_details.alignment);
match __InternalEntryHandleMut::new(
atomic_mgmt_ptr,
data_ptr as *mut u8,
EventId::new(offset as _),
self.shared_state.clone(),
) {
Ok(handle) => Ok(handle),
Err(e) => {
fail!(from self, with e,
"{} since a handle for the passed key and value type already exists.", msg);
}
}
}
}
#[doc(hidden)]
pub struct __InternalEntryHandleMut<Service: service::Service> {
atomic_mgmt_ptr: *const UnrestrictedAtomicMgmt,
data_ptr: *mut u8,
entry_id: EventId,
_shared_state: Service::ArcThreadSafetyPolicy<WriterSharedState<Service, CustomKeyMarker>>,
}
impl<Service: service::Service> Drop for __InternalEntryHandleMut<Service> {
fn drop(&mut self) {
unsafe { (*self.atomic_mgmt_ptr).__internal_release_producer() };
}
}
unsafe impl<Service: service::Service> Send for __InternalEntryHandleMut<Service> {}
unsafe impl<Service: service::Service> Sync for __InternalEntryHandleMut<Service> {}
impl<Service: service::Service> __InternalEntryHandleMut<Service> {
fn new(
atomic_mgmt_ptr: *const UnrestrictedAtomicMgmt,
data_ptr: *mut u8,
entry_id: EventId,
writer_state: Service::ArcThreadSafetyPolicy<WriterSharedState<Service, CustomKeyMarker>>,
) -> Result<Self, EntryHandleMutError> {
match unsafe { (*atomic_mgmt_ptr).__internal_acquire_producer() } {
Ok(_) => Ok(Self {
atomic_mgmt_ptr,
data_ptr,
entry_id,
_shared_state: writer_state.clone(),
}),
Err(_) => Err(EntryHandleMutError::HandleAlreadyExists),
}
}
pub fn loan_uninit(
self,
value_size: usize,
value_alignment: usize,
) -> __InternalEntryValueUninit<Service> {
__InternalEntryValueUninit::new(self, value_size, value_alignment)
}
pub fn entry_id(&self) -> EventId {
self.entry_id
}
pub unsafe fn __internal_get_ptr_to_write_cell(
&self,
value_size: usize,
value_alignment: usize,
) -> *mut u8 {
unsafe {
(*self.atomic_mgmt_ptr).__internal_get_ptr_to_write_cell(
value_size,
value_alignment,
self.data_ptr,
)
}
}
pub unsafe fn __internal_update_write_cell(&self) {
unsafe { (*self.atomic_mgmt_ptr).__internal_update_write_cell() };
}
}
#[doc(hidden)]
pub struct __InternalEntryValueUninit<Service: service::Service> {
write_cell_ptr: *mut u8,
entry_handle_mut: __InternalEntryHandleMut<Service>,
}
unsafe impl<Service: service::Service> Send for __InternalEntryValueUninit<Service> {}
unsafe impl<Service: service::Service> Sync for __InternalEntryValueUninit<Service> {}
impl<Service: service::Service> __InternalEntryValueUninit<Service> {
pub fn new(
entry_handle_mut: __InternalEntryHandleMut<Service>,
value_size: usize,
value_alignment: usize,
) -> Self {
let write_cell_ptr = unsafe {
(*entry_handle_mut.atomic_mgmt_ptr).__internal_get_ptr_to_write_cell(
value_size,
value_alignment,
entry_handle_mut.data_ptr,
)
};
Self {
write_cell_ptr,
entry_handle_mut,
}
}
pub fn write_cell(&self) -> *mut u8 {
self.write_cell_ptr
}
pub fn update(self) -> __InternalEntryHandleMut<Service> {
unsafe {
(*self.entry_handle_mut.atomic_mgmt_ptr).__internal_update_write_cell();
}
self.entry_handle_mut
}
pub fn discard(self) -> __InternalEntryHandleMut<Service> {
self.entry_handle_mut
}
}