use core::any::TypeId;
use core::fmt::Debug;
use core::ptr::NonNull;
use core::{marker::PhantomData, mem::MaybeUninit};
use alloc::vec::Vec;
use iceoryx2_bb_concurrency::atomic::Ordering;
use iceoryx2_bb_concurrency::atomic::{AtomicBool, AtomicUsize};
use iceoryx2_bb_concurrency::cell::UnsafeCell;
use iceoryx2_bb_container::queue::Queue;
use iceoryx2_bb_elementary::CallbackProgression;
use iceoryx2_bb_elementary::cyclic_tagger::CyclicTagger;
use iceoryx2_bb_elementary_traits::non_null::NonNullCompat;
use iceoryx2_bb_elementary_traits::testing::abandonable::Abandonable;
use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState};
use iceoryx2_bb_posix::unique_system_id::UniqueSystemId;
use iceoryx2_cal::arc_sync_policy::ArcSyncPolicy;
use iceoryx2_cal::dynamic_storage::DynamicStorage;
use iceoryx2_cal::shm_allocator::{AllocationStrategy, PointerOffset};
use iceoryx2_cal::zero_copy_connection::{
CHANNEL_STATE_OPEN, ChannelId, ZeroCopyCreationError, ZeroCopyPortDetails, ZeroCopySender,
};
use iceoryx2_log::{fail, warn};
use crate::port::details::sender::*;
use crate::port::update_connections::{ConnectionFailure, UpdateConnections};
use crate::prelude::BackpressureStrategy;
use crate::raw_sample::RawSampleMut;
use crate::sample_mut::SampleMut;
use crate::sample_mut_uninit::SampleMutUninit;
use crate::service::builder::{CustomHeaderMarker, CustomPayloadMarker};
use crate::service::dynamic_config::publish_subscribe::{PublisherDetails, SubscriberDetails};
use crate::service::header::publish_subscribe::Header;
use crate::service::naming_scheme::data_segment_name;
use crate::service::port_factory::publisher::{LocalPublisherConfig, PortFactoryPublisher};
use crate::service::static_config::message_type_details::TypeVariant;
use crate::service::{self};
use super::details::data_segment::{DataSegment, DataSegmentType};
use super::details::segment_state::SegmentState;
use super::{LoanError, SendError};
use crate::identifiers::UniquePublisherId;
/// Failures that can occur while a [`Publisher`] port is being created.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PublisherCreateError {
    /// Registering the new publisher would exceed the maximum number of
    /// publishers supported by the service.
    ExceedsMaxSupportedPublishers,
    /// The data segment of the publisher could not be created.
    UnableToCreateDataSegment,
    /// The thread-safety policy that wraps the publisher's shared state could
    /// not be instantiated.
    FailedToDeployThreadsafetyPolicy,
    /// The port tag, which is required for cleanup, could not be created.
    UnableToCreatePortTag,
}

impl core::fmt::Display for PublisherCreateError {
    /// Renders the error as `PublisherCreateError::<VariantName>`, reusing the
    /// derived [`Debug`] representation for the variant name.
    fn fmt(&self, formatter: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        formatter.write_fmt(format_args!("PublisherCreateError::{self:?}"))
    }
}

impl core::error::Error for PublisherCreateError {}
/// A single history entry: the raw value of a sample's offset together with
/// the size of that sample.
#[derive(Clone, Copy, Debug)]
struct OffsetAndSize {
    /// Raw offset value, as produced by `PointerOffset::as_value()`.
    offset: u64,
    /// Size of the sample that was handed to the send path.
    size: usize,
}
/// State of a publisher port that is wrapped by the service's thread-safety
/// policy and handed by reference to every sample loaned from the publisher.
#[derive(Debug)]
pub(crate) struct PublisherSharedState<Service>
where
    Service: service::Service,
{
    /// Local publisher configuration; read for `initial_max_slice_len` and
    /// `allocation_strategy` when loaning slices.
    config: LocalPublisherConfig,
    /// Sender that allocates, borrows, releases and delivers samples.
    pub(crate) sender: Sender<Service>,
    /// Local view of the service's subscriber list; refreshed via
    /// `update_state` before connections are updated.
    subscriber_list_state: UnsafeCell<ContainerState<SubscriberDetails>>,
    /// Queue of previously sent samples; `None` when the service is
    /// configured with a history size of zero.
    history: Option<UnsafeCell<Queue<OffsetAndSize>>>,
    /// Set to `true` on construction and to `false` when the publisher is
    /// dropped; sending is refused once it is `false`.
    is_active: AtomicBool,
    /// Port tag created through the shared node; the creation error message
    /// states that it is required for cleanup.
    port_tag: Service::StaticStorage,
}
impl<Service> Abandonable for PublisherSharedState<Service>
where
    Service: service::Service,
{
    /// Abandons the members that implement [`Abandonable`]: first the
    /// `sender`, then the `port_tag`.
    ///
    /// # Safety
    ///
    /// `this` must point to a valid `PublisherSharedState` that is not
    /// accessed concurrently.
    /// NOTE(review): the full contract is defined by the `Abandonable` trait,
    /// which is not visible here — confirm against its documentation.
    unsafe fn abandon_in_place(mut this: NonNull<Self>) {
        // SAFETY: validity of `this` is the caller's obligation (see above).
        let state = unsafe { this.as_mut() };

        // SAFETY: both pointers are derived from live fields of `state`.
        unsafe {
            Sender::<Service>::abandon_in_place(NonNull::iox2_from_mut(&mut state.sender));
            Service::StaticStorage::abandon_in_place(NonNull::iox2_from_mut(
                &mut state.port_tag,
            ));
        }
    }
}
impl<Service> PublisherSharedState<Service>
where
    Service: service::Service,
{
    /// Stores a sample in the history queue when a history is configured.
    ///
    /// The sample is borrowed from the sender before it is queued. If the
    /// queue overflows, the entry that was pushed out is released again.
    fn add_sample_to_history(&self, offset: PointerOffset, sample_size: usize) {
        if let Some(history_cell) = &self.history {
            // NOTE(review): exclusive access to the queue is assumed to be
            // guaranteed by the surrounding thread-safety policy — confirm.
            let queue = unsafe { &mut *history_cell.get() };
            self.sender.borrow_sample(offset);

            let entry = OffsetAndSize {
                offset: offset.as_value(),
                size: sample_size,
            };
            if let Some(evicted) = queue.push_with_overflow(entry) {
                self.sender
                    .release_sample(PointerOffset::from_value(evicted.offset));
            }
        }
    }

    /// Updates the connection for every entry of the local subscriber list
    /// state, delivering the sample history to each connection via
    /// `deliver_sample_history`.
    ///
    /// All entries are visited even when one of them fails; the first failure
    /// that occurred is returned.
    fn force_update_connections(&self) -> Result<(), ZeroCopyCreationError> {
        let mut first_failure = None;
        self.sender.start_update_connection_cycle();

        // NOTE(review): exclusive access to the subscriber list state is
        // assumed to be guaranteed by the thread-safety policy — confirm.
        unsafe {
            (*self.subscriber_list_state.get()).for_each(|index, port| {
                let receiver = ReceiverDetails {
                    port_id: port.subscriber_id.value(),
                    buffer_size: port.buffer_size,
                };
                let update = self.sender.update_connection(index, receiver, |connection| {
                    self.deliver_sample_history(connection)
                });

                // Only the first failure is remembered; later ones are dropped.
                if first_failure.is_none() {
                    first_failure = update.err();
                }
                CallbackProgression::Continue
            })
        };

        self.sender.finish_update_connection_cycle();
        match first_failure {
            Some(error) => Err(error),
            None => Ok(()),
        }
    }

    /// Refreshes the local subscriber list state and, when `update_state`
    /// returns `true`, updates all connections.
    fn update_connections(&self) -> Result<(), ConnectionFailure> {
        // NOTE(review): exclusive access to the subscriber list state is
        // assumed to be guaranteed by the thread-safety policy — confirm.
        let needs_refresh = unsafe {
            self.sender
                .service_state
                .dynamic_storage()
                .get()
                .publish_subscribe()
                .subscribers
                .update_state(&mut *self.subscriber_list_state.get())
        };

        if needs_refresh {
            fail!(from self, when self.force_update_connections(),
                "Connections were updated only partially since at least one connection to a Subscriber port failed.");
        }

        Ok(())
    }

    /// Sends the most recent history entries over `connection`.
    ///
    /// At most `connection.sender.buffer_size()` entries, taken from the end
    /// of the history queue, are sent. A failed delivery is logged as a
    /// warning and does not stop the remaining deliveries.
    fn deliver_sample_history(&self, connection: &Connection<Service>) {
        if let Some(history_cell) = &self.history {
            // NOTE(review): exclusive access to the queue is assumed to be
            // guaranteed by the surrounding thread-safety policy — confirm.
            let queue = unsafe { &mut *history_cell.get() };
            let capacity = connection.sender.buffer_size();
            let total = queue.len();
            let first = total.saturating_sub(capacity);

            for position in first..total {
                // SAFETY: `position < total == queue.len()`.
                let entry = unsafe { queue.get_unchecked(position) };
                self.sender.retrieve_returned_samples();

                let offset = PointerOffset::from_value(entry.offset);
                let delivery = connection
                    .sender
                    .try_send(offset, entry.size, ChannelId::new(0));
                match delivery {
                    Ok(overflow) => {
                        self.sender.borrow_sample(offset);
                        if let Some(replaced) = overflow {
                            self.sender.release_sample(replaced);
                        }
                    }
                    Err(e) => {
                        warn!(from self, "Failed to deliver history to new subscriber via {:?} due to {:?}", connection, e);
                    }
                }
            }
        }
    }

    /// Sends the sample located at `offset`.
    ///
    /// Fails with `SendError::ConnectionBrokenSinceSenderNoLongerExists` when
    /// the publisher is no longer active, and with the converted connection
    /// error when the connections could not be updated. Otherwise the sample
    /// is added to the history and the result of `Sender::deliver_offset` is
    /// returned.
    pub(crate) fn send_sample(
        &self,
        offset: PointerOffset,
        sample_size: usize,
    ) -> Result<usize, SendError> {
        let msg = "Unable to send sample";

        let publisher_is_active = self.is_active.load(Ordering::Relaxed);
        if !publisher_is_active {
            fail!(from self, with SendError::ConnectionBrokenSinceSenderNoLongerExists,
                "{} since the corresponding publisher is already disconnected.", msg);
        }

        fail!(from self, when self.update_connections(),
            "{} since the connections could not be updated.", msg);

        self.add_sample_to_history(offset, sample_size);
        self.sender
            .deliver_offset(offset, sample_size, ChannelId::new(0))
    }
}
/// Sending port of a publish-subscribe service for payloads of type `Payload`
/// with a user header of type `UserHeader`.
#[derive(Debug)]
pub struct Publisher<Service, Payload, UserHeader>
where
    Service: service::Service,
    Payload: Debug + ZeroCopySend + ?Sized + 'static,
    UserHeader: Debug + ZeroCopySend,
{
    /// Shared state, wrapped by the service's thread-safety policy; a
    /// reference to it is handed to every loaned sample.
    pub(crate) publisher_shared_state:
        Service::ArcThreadSafetyPolicy<PublisherSharedState<Service>>,
    /// Handle obtained when the publisher was registered in the service's
    /// dynamic config; `None` until registration succeeded.
    dynamic_publisher_handle: Option<ContainerHandle>,
    /// Ties the publisher to its payload type without storing a payload.
    _payload: PhantomData<Payload>,
    /// Ties the publisher to its user header type without storing a header.
    _user_header: PhantomData<UserHeader>,
}
// The publisher is `Send` whenever the thread-safety policy that wraps its
// shared state is `Send + Sync`.
// SAFETY: NOTE(review): the `PhantomData` markers hold no data, so soundness
// rests on the policy wrapper's guarantees — confirm against the policy docs.
unsafe impl<Service, Payload, UserHeader> Send for Publisher<Service, Payload, UserHeader>
where
    Service: service::Service,
    Payload: Debug + ZeroCopySend + ?Sized,
    UserHeader: Debug + ZeroCopySend,
    Service::ArcThreadSafetyPolicy<PublisherSharedState<Service>>: Send + Sync,
{
}
// The publisher is `Sync` whenever the thread-safety policy that wraps its
// shared state is `Send + Sync`.
// SAFETY: NOTE(review): the `PhantomData` markers hold no data, so soundness
// rests on the policy wrapper's guarantees — confirm against the policy docs.
unsafe impl<Service, Payload, UserHeader> Sync for Publisher<Service, Payload, UserHeader>
where
    Service: service::Service,
    Payload: Debug + ZeroCopySend + ?Sized,
    UserHeader: Debug + ZeroCopySend,
    Service::ArcThreadSafetyPolicy<PublisherSharedState<Service>>: Send + Sync,
{
}
impl<Service, Payload, UserHeader> Abandonable for Publisher<Service, Payload, UserHeader>
where
    Service: service::Service,
    Payload: Debug + ZeroCopySend + ?Sized,
    UserHeader: Debug + ZeroCopySend,
{
    /// Abandons the shared state of the publisher.
    ///
    /// Only `publisher_shared_state` is touched; the dynamic publisher handle
    /// is neither read nor released here.
    ///
    /// # Safety
    ///
    /// `this` must point to a valid `Publisher` that is not accessed
    /// concurrently.
    /// NOTE(review): the full contract is defined by the `Abandonable` trait,
    /// which is not visible here — confirm against its documentation.
    unsafe fn abandon_in_place(mut this: NonNull<Self>) {
        // SAFETY: validity of `this` is the caller's obligation (see above).
        let publisher = unsafe { this.as_mut() };

        // SAFETY: the pointer is derived from a live field of `publisher`.
        unsafe {
            let shared_state = NonNull::iox2_from_mut(&mut publisher.publisher_shared_state);
            Service::ArcThreadSafetyPolicy::abandon_in_place(shared_state)
        };
    }
}
impl<Service, Payload, UserHeader> Drop for Publisher<Service, Payload, UserHeader>
where
    Service: service::Service,
    Payload: Debug + ZeroCopySend + ?Sized,
    UserHeader: Debug + ZeroCopySend,
{
    /// Marks the shared state as inactive and, when the publisher was
    /// registered, releases its handle in the service's dynamic config.
    fn drop(&mut self) {
        let shared_state = self.publisher_shared_state.lock();

        // From now on `send_sample` refuses to send via this shared state.
        shared_state.is_active.store(false, Ordering::Relaxed);

        // The handle is `None` when creation failed before registration.
        if let Some(handle) = self.dynamic_publisher_handle {
            let service_state = &shared_state.sender.service_state;
            service_state
                .dynamic_storage()
                .get()
                .publish_subscribe()
                .release_publisher_handle(handle);
        }
    }
}
impl<
Service: service::Service,
Payload: Debug + ZeroCopySend + ?Sized,
UserHeader: Debug + ZeroCopySend,
> Publisher<Service, Payload, UserHeader>
{
/// Creates a new publisher port from the given factory.
///
/// The steps are, in order: create the port tag, derive the number of
/// samples, create the data segment, wrap the shared state in the service's
/// thread-safety policy, connect to the currently known subscribers, and
/// finally register the publisher in the service's dynamic config.
///
/// # Errors
///
/// * `PublisherCreateError::UnableToCreatePortTag` - the port tag could not
///   be created.
/// * `PublisherCreateError::UnableToCreateDataSegment` - the static or
///   dynamic data segment could not be created.
/// * `PublisherCreateError::FailedToDeployThreadsafetyPolicy` - the
///   thread-safety policy could not be instantiated.
/// * `PublisherCreateError::ExceedsMaxSupportedPublishers` - the publisher
///   could not be added to the dynamic config.
///
/// A failure to connect to some subscriber is only logged as a warning and
/// does not abort the creation.
pub(crate) fn new(
publisher_factory: PortFactoryPublisher<Service, Payload, UserHeader>,
) -> Result<Self, PublisherCreateError> {
let msg = "Unable to create Publisher port";
let origin = "Publisher::new()";
let port_id = UniquePublisherId::new();
let config = &publisher_factory.config;
let service = &publisher_factory.factory.service;
// The port tag is created first; its error message states that it is
// required for cleanup.
let port_tag = match service
.shared_node()
.create_port_tag(origin, msg, port_id.0.value())
{
Ok(port_tag) => port_tag,
Err(e) => {
fail!(from origin, with PublisherCreateError::UnableToCreatePortTag,
"{msg} since the port tag, that is required for cleanup, could not be created. [{e:?}]");
}
};
let static_config = publisher_factory
.factory
.service
.static_config()
.publish_subscribe();
let subscriber_list = &service
.dynamic_storage()
.get()
.publish_subscribe()
.subscribers;
// NOTE(review): the unsafe accessor presumably requires that the
// messaging pattern is publish-subscribe, which should hold for a
// publisher port — confirm against the accessor's documentation.
let number_of_samples = unsafe {
service
.static_config()
.messaging_pattern
.publish_subscribe()
}
.required_amount_of_samples_per_data_segment(config.max_loaned_samples);
// The factory's override callback receives the computed amount and
// returns the amount that is actually used.
let number_of_samples = publisher_factory
.preallocate_number_of_samples_override
.call(number_of_samples);
let data_segment_type =
DataSegmentType::new_from_allocation_strategy(config.allocation_strategy);
let sample_layout = static_config
.message_type_details
.sample_layout(config.initial_max_slice_len);
let max_slice_len = config.initial_max_slice_len;
let max_number_of_segments =
DataSegment::<Service>::max_number_of_segments(data_segment_type);
// These details are what gets registered in the dynamic config at the
// end of this function.
let publisher_details = PublisherDetails {
data_segment_type,
publisher_id: port_id,
number_of_samples,
max_slice_len,
node_id: *service.shared_node().id(),
max_number_of_segments,
};
let global_config = service.shared_node().config();
let segment_name = data_segment_name(publisher_details.publisher_id.value());
let data_segment = match data_segment_type {
DataSegmentType::Static => DataSegment::create_static_segment(
&segment_name,
sample_layout,
global_config,
number_of_samples,
),
DataSegmentType::Dynamic => DataSegment::create_dynamic_segment(
&segment_name,
sample_layout,
global_config,
number_of_samples,
config.allocation_strategy,
),
};
let data_segment = fail!(from origin,
when data_segment,
with PublisherCreateError::UnableToCreateDataSegment,
"{} since the data segment could not be acquired.", msg);
let publisher_shared_state =
<Service as service::Service>::ArcThreadSafetyPolicy::new(PublisherSharedState {
port_tag,
is_active: AtomicBool::new(true),
sender: Sender {
data_segment,
// One segment state per possible segment.
segment_states: {
let mut v: Vec<SegmentState> =
Vec::with_capacity(max_number_of_segments as usize);
for _ in 0..max_number_of_segments {
v.push(SegmentState::new(number_of_samples))
}
v
},
// One initially empty connection slot per entry the subscriber list
// can hold.
connections: (0..subscriber_list.capacity())
.map(|_| UnsafeCell::new(None))
.collect(),
sender_port_id: port_id.value(),
shared_node: service.shared_node().clone(),
receiver_max_buffer_size: static_config.subscriber_max_buffer_size,
receiver_max_borrowed_samples: static_config.subscriber_max_borrowed_samples,
enable_safe_overflow: static_config.enable_safe_overflow,
number_of_samples,
max_number_of_segments,
degradation_handler: publisher_factory.degradation_handler,
backpressure_handler: publisher_factory.backpressure_handler,
service_state: service.clone(),
tagger: CyclicTagger::new(),
loan_counter: AtomicUsize::new(0),
sender_max_borrowed_samples: config.max_loaned_samples,
backpressure_strategy: config.backpressure_strategy,
message_type_details: static_config.message_type_details,
number_of_channels: 1,
initial_channel_state: CHANNEL_STATE_OPEN,
},
config: *config,
subscriber_list_state: UnsafeCell::new(unsafe { subscriber_list.get_state() }),
// No history queue is allocated when the history size is zero.
history: match static_config.history_size == 0 {
true => None,
false => Some(UnsafeCell::new(Queue::new(static_config.history_size))),
},
});
let publisher_shared_state = match publisher_shared_state {
Ok(v) => v,
Err(e) => {
fail!(from origin,
with PublisherCreateError::FailedToDeployThreadsafetyPolicy,
"{msg} since the threadsafety policy could not be instantiated ({e:?}).");
}
};
// The handle stays `None` until registration succeeded, so dropping
// `new_self` on an early return releases no handle.
let mut new_self = Self {
publisher_shared_state,
dynamic_publisher_handle: None,
_payload: PhantomData,
_user_header: PhantomData,
};
// Best effort: a failed connection is reported but does not abort creation.
if let Err(e) = new_self
.publisher_shared_state
.lock()
.force_update_connections()
{
warn!(from new_self,
"The new Publisher port is unable to connect to every Subscriber port, caused by {:?}.", e);
}
// NOTE(review): the fence presumably keeps the compiler from moving the
// registration below ahead of the connection setup above — confirm.
core::sync::atomic::compiler_fence(Ordering::SeqCst);
let dynamic_publisher_handle = match service
.dynamic_storage()
.get()
.publish_subscribe()
.add_publisher_id(publisher_details)
{
Some(unique_index) => unique_index,
None => {
fail!(from origin, with PublisherCreateError::ExceedsMaxSupportedPublishers,
"{} since it would exceed the maximum supported amount of publishers of {}.",
msg, service.static_config().publish_subscribe().max_publishers);
}
};
new_self.dynamic_publisher_handle = Some(dynamic_publisher_handle);
Ok(new_self)
}
/// Returns the [`UniquePublisherId`] of the publisher, rebuilt from the port
/// id stored in the sender.
pub fn id(&self) -> UniquePublisherId {
UniquePublisherId(UniqueSystemId::from(
self.publisher_shared_state.lock().sender.sender_port_id,
))
}
/// Returns the [`BackpressureStrategy`] stored in the sender, which was taken
/// from the publisher configuration on creation.
pub fn backpressure_strategy(&self) -> BackpressureStrategy {
self.publisher_shared_state
.lock()
.sender
.backpressure_strategy
}
}
impl<Service, Payload, UserHeader> Publisher<Service, Payload, UserHeader>
where
    Service: service::Service,
    Payload: Debug + ZeroCopySend + Sized,
    UserHeader: Default + Debug + ZeroCopySend,
{
    /// Loans a sample, writes `value` into it and sends it.
    ///
    /// Returns whatever `send()` on the sample returns; a failed loan is
    /// converted into a [`SendError`].
    pub fn send_copy(&self, value: Payload) -> Result<usize, SendError> {
        let msg = "Unable to send copy of payload";

        let uninit_sample = fail!(from self, when self.loan_uninit(),
            "{} since the loan of a sample failed.", msg);

        let sample = uninit_sample.write_payload(value);
        sample.send()
    }

    /// Loans a sample whose payload is not yet initialized.
    ///
    /// The header and a default-constructed user header are written into the
    /// allocated chunk before the sample is returned.
    ///
    /// # Errors
    ///
    /// Returns the [`LoanError`] reported by the sender when the chunk could
    /// not be allocated.
    pub fn loan_uninit(
        &self,
    ) -> Result<SampleMutUninit<Service, MaybeUninit<Payload>, UserHeader>, LoanError> {
        let shared_state = self.publisher_shared_state.lock();
        let sender = &shared_state.sender;

        // Layout for exactly one payload element.
        let chunk = sender.allocate(sender.sample_layout(1))?;

        let header_ptr = chunk.header as *mut Header;
        let user_header_ptr: *mut UserHeader = chunk.user_header.cast();
        let header = Header::new(*sender.service_state.shared_node().id(), self.id(), 1);

        // NOTE(review): both pointers come from the chunk that was just
        // allocated and are assumed to be valid and suitably aligned for the
        // writes below — confirm against `Sender::allocate`.
        unsafe {
            header_ptr.write(header);
            user_header_ptr.write(UserHeader::default());
        }

        let sample = unsafe {
            RawSampleMut::new_unchecked(header_ptr, user_header_ptr, chunk.payload.cast())
        };

        Ok(
            SampleMutUninit::<Service, MaybeUninit<Payload>, UserHeader>::new(
                &self.publisher_shared_state,
                sample,
                chunk.offset,
                chunk.size,
            ),
        )
    }
}
impl<Service, Payload, UserHeader> Publisher<Service, Payload, UserHeader>
where
    Service: service::Service,
    Payload: Default + Debug + ZeroCopySend + Sized,
    UserHeader: Default + Debug + ZeroCopySend,
{
    /// Loans a sample whose payload is initialized with `Payload::default()`.
    ///
    /// # Errors
    ///
    /// Forwards the [`LoanError`] of [`Publisher::loan_uninit()`].
    pub fn loan(&self) -> Result<SampleMut<Service, Payload, UserHeader>, LoanError> {
        self.loan_uninit()
            .map(|uninit_sample| uninit_sample.write_payload(Payload::default()))
    }
}
impl<Service, Payload, UserHeader> Publisher<Service, [Payload], UserHeader>
where
    Service: service::Service,
    Payload: Default + Debug + ZeroCopySend,
    UserHeader: Default + Debug + ZeroCopySend,
{
    /// Loans a slice sample with `number_of_elements` elements, each
    /// initialized with `Payload::default()`.
    ///
    /// # Errors
    ///
    /// Forwards the [`LoanError`] of [`Publisher::loan_slice_uninit()`].
    pub fn loan_slice(
        &self,
        number_of_elements: usize,
    ) -> Result<SampleMut<Service, [Payload], UserHeader>, LoanError> {
        self.loan_slice_uninit(number_of_elements)
            .map(|uninit_sample| uninit_sample.write_from_fn(|_| Payload::default()))
    }
}
impl<Service, Payload, UserHeader> Publisher<Service, [Payload], UserHeader>
where
    Service: service::Service,
    Payload: Debug + ZeroCopySend,
    UserHeader: Debug + ZeroCopySend,
{
    /// Returns the `initial_max_slice_len` value of the publisher's
    /// configuration.
    pub fn initial_max_slice_len(&self) -> usize {
        let shared_state = self.publisher_shared_state.lock();
        shared_state.config.initial_max_slice_len
    }
}
impl<Service, Payload, UserHeader> Publisher<Service, [Payload], UserHeader>
where
    Service: service::Service,
    Payload: Debug + ZeroCopySend,
    UserHeader: Default + Debug + ZeroCopySend,
{
    /// Loans a slice sample with `slice_len` uninitialized elements.
    ///
    /// In debug builds this asserts that `Payload` is not
    /// `CustomPayloadMarker`.
    ///
    /// # Errors
    ///
    /// * `LoanError::ExceedsMaxLoanSize` - the allocation strategy is static
    ///   and `slice_len` is larger than the configured initial maximum.
    /// * Any [`LoanError`] reported by the sender during allocation.
    pub fn loan_slice_uninit(
        &self,
        slice_len: usize,
    ) -> Result<SampleMutUninit<Service, [MaybeUninit<Payload>], UserHeader>, LoanError> {
        debug_assert!(TypeId::of::<Payload>() != TypeId::of::<CustomPayloadMarker>());

        // For regular payloads the slice in the sample has as many elements
        // as were requested.
        self.loan_slice_uninit_impl(slice_len, slice_len)
    }

    /// Shared implementation of the slice loans.
    ///
    /// `slice_len` is used for the length check, the sample layout and the
    /// header; `underlying_number_of_slice_elements` is the length of the
    /// slice pointer stored in the returned sample.
    fn loan_slice_uninit_impl(
        &self,
        slice_len: usize,
        underlying_number_of_slice_elements: usize,
    ) -> Result<SampleMutUninit<Service, [MaybeUninit<Payload>], UserHeader>, LoanError> {
        let shared_state = self.publisher_shared_state.lock();
        let config = &shared_state.config;
        let sender = &shared_state.sender;

        // The slice length is only bounded when the allocation strategy is
        // static.
        let max_slice_len = config.initial_max_slice_len;
        let is_static = config.allocation_strategy == AllocationStrategy::Static;
        if is_static && slice_len > max_slice_len {
            fail!(from self, with LoanError::ExceedsMaxLoanSize,
                "Unable to loan slice with {} elements since it would exceed the max supported slice length of {}.",
                slice_len, max_slice_len);
        }

        let chunk = sender.allocate(sender.sample_layout(slice_len))?;

        let user_header_ptr: *mut UserHeader = chunk.user_header.cast();
        let header_ptr = chunk.header as *mut Header;
        let header = Header::new(
            *sender.service_state.shared_node().id(),
            self.id(),
            slice_len as _,
        );

        // NOTE(review): both pointers come from the chunk that was just
        // allocated and are assumed to be valid and suitably aligned for the
        // writes below — confirm against `Sender::allocate`.
        unsafe {
            header_ptr.write(header);
            user_header_ptr.write(UserHeader::default());
        }

        let payload_ptr = core::ptr::slice_from_raw_parts_mut(
            chunk.payload.cast(),
            underlying_number_of_slice_elements,
        );
        let sample = unsafe { RawSampleMut::new_unchecked(header_ptr, user_header_ptr, payload_ptr) };

        Ok(
            SampleMutUninit::<Service, [MaybeUninit<Payload>], UserHeader>::new(
                &self.publisher_shared_state,
                sample,
                chunk.offset,
                chunk.size,
            ),
        )
    }
}
impl<Service: service::Service> Publisher<Service, [CustomPayloadMarker], CustomHeaderMarker> {
#[doc(hidden)]
pub unsafe fn loan_custom_payload(
&self,
slice_len: usize,
) -> Result<
SampleMutUninit<Service, [MaybeUninit<CustomPayloadMarker>], CustomHeaderMarker>,
LoanError,
> {
let shared_state = self.publisher_shared_state.lock();
debug_assert!(
slice_len == 1 || shared_state.sender.payload_type_variant() == TypeVariant::Dynamic
);
self.loan_slice_uninit_impl(slice_len, shared_state.sender.payload_size() * slice_len)
}
}
impl<Service, Payload, UserHeader> UpdateConnections for Publisher<Service, Payload, UserHeader>
where
    Service: service::Service,
    Payload: Debug + ZeroCopySend + ?Sized,
    UserHeader: Debug + ZeroCopySend,
{
    /// Locks the shared state and delegates to its `update_connections`
    /// method, returning that method's result unchanged.
    fn update_connections(&self) -> Result<(), ConnectionFailure> {
        let shared_state = self.publisher_shared_state.lock();
        shared_state.update_connections()
    }
}