1use super::details::data_segment::{DataSegment, DataSegmentType};
105use super::details::segment_state::SegmentState;
106use super::port_identifiers::UniquePublisherId;
107use super::{LoanError, SendError};
108use crate::port::details::sender::*;
109use crate::port::update_connections::{ConnectionFailure, UpdateConnections};
110use crate::prelude::UnableToDeliverStrategy;
111use crate::raw_sample::RawSampleMut;
112use crate::sample_mut::SampleMut;
113use crate::sample_mut_uninit::SampleMutUninit;
114use crate::service::builder::{CustomHeaderMarker, CustomPayloadMarker};
115use crate::service::dynamic_config::publish_subscribe::{PublisherDetails, SubscriberDetails};
116use crate::service::header::publish_subscribe::Header;
117use crate::service::naming_scheme::data_segment_name;
118use crate::service::port_factory::publisher::LocalPublisherConfig;
119use crate::service::static_config::message_type_details::TypeVariant;
120use crate::service::static_config::publish_subscribe;
121use crate::service::{self, NoResource, ServiceState};
122use alloc::sync::Arc;
123use core::any::TypeId;
124use core::cell::UnsafeCell;
125use core::fmt::Debug;
126use core::sync::atomic::Ordering;
127use core::{marker::PhantomData, mem::MaybeUninit};
128use iceoryx2_bb_container::queue::Queue;
129use iceoryx2_bb_elementary::cyclic_tagger::CyclicTagger;
130use iceoryx2_bb_elementary::CallbackProgression;
131use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
132use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState};
133use iceoryx2_bb_log::{fail, warn};
134use iceoryx2_bb_posix::unique_system_id::UniqueSystemId;
135use iceoryx2_cal::arc_sync_policy::ArcSyncPolicy;
136use iceoryx2_cal::dynamic_storage::DynamicStorage;
137use iceoryx2_cal::shm_allocator::{AllocationStrategy, PointerOffset};
138use iceoryx2_cal::zero_copy_connection::{
139 ChannelId, ZeroCopyCreationError, ZeroCopyPortDetails, ZeroCopySender,
140};
141use iceoryx2_pal_concurrency_sync::iox_atomic::{IoxAtomicBool, IoxAtomicUsize};
142
/// Failures that can be reported while a publisher port is being created.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PublisherCreateError {
    /// Registering the publisher would exceed the service's maximum number
    /// of publishers.
    ExceedsMaxSupportedPublishers,
    /// The data segment backing the publisher's samples could not be created.
    UnableToCreateDataSegment,
    /// The thread-safety policy wrapping the publisher's shared state could
    /// not be instantiated.
    FailedToDeployThreadsafetyPolicy,
}

impl core::fmt::Display for PublisherCreateError {
    /// Renders the error as `PublisherCreateError::<VariantName>`.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        write!(f, "PublisherCreateError::{:?}", self)
    }
}

impl core::error::Error for PublisherCreateError {}
166
/// One entry of the publisher's sample history: where a sample lives and how
/// large it is.
#[derive(Clone, Copy, Debug)]
struct OffsetAndSize {
    /// Raw value of the sample's `PointerOffset` (see `PointerOffset::as_value`).
    offset: u64,
    /// Size of the sample as it was passed to `send_sample`.
    size: usize,
}
172
173#[derive(Debug)]
174pub(crate) struct PublisherSharedState<Service: service::Service> {
175 config: LocalPublisherConfig,
176 pub(crate) sender: Sender<Service>,
177 subscriber_list_state: UnsafeCell<ContainerState<SubscriberDetails>>,
178 history: Option<UnsafeCell<Queue<OffsetAndSize>>>,
179 is_active: IoxAtomicBool,
180}
181
182impl<Service: service::Service> PublisherSharedState<Service> {
183 fn add_sample_to_history(&self, offset: PointerOffset, sample_size: usize) {
184 match &self.history {
185 None => (),
186 Some(history) => {
187 let history = unsafe { &mut *history.get() };
188 self.sender.borrow_sample(offset);
189 match history.push_with_overflow(OffsetAndSize {
190 offset: offset.as_value(),
191 size: sample_size,
192 }) {
193 None => (),
194 Some(old) => self
195 .sender
196 .release_sample(PointerOffset::from_value(old.offset)),
197 }
198 }
199 }
200 }
201
202 fn force_update_connections(&self) -> Result<(), ZeroCopyCreationError> {
203 let mut result = Ok(());
204 self.sender.start_update_connection_cycle();
205 unsafe {
206 (*self.subscriber_list_state.get()).for_each(|h, port| {
207 let inner_result = self.sender.update_connection(
208 h.index() as usize,
209 ReceiverDetails {
210 port_id: port.subscriber_id.value(),
211 buffer_size: port.buffer_size,
212 },
213 |connection| self.deliver_sample_history(connection),
214 );
215
216 if result.is_ok() {
217 result = inner_result;
218 }
219
220 CallbackProgression::Continue
221 })
222 };
223
224 self.sender.finish_update_connection_cycle();
225
226 result
227 }
228
229 fn update_connections(&self) -> Result<(), ConnectionFailure> {
230 if unsafe {
231 self.sender
232 .service_state
233 .dynamic_storage
234 .get()
235 .publish_subscribe()
236 .subscribers
237 .update_state(&mut *self.subscriber_list_state.get())
238 } {
239 fail!(from self, when self.force_update_connections(),
240 "Connections were updated only partially since at least one connection to a Subscriber port failed.");
241 }
242
243 Ok(())
244 }
245
246 fn deliver_sample_history(&self, connection: &Connection<Service>) {
247 match &self.history {
248 None => (),
249 Some(history) => {
250 let history = unsafe { &mut *history.get() };
251 let buffer_size = connection.sender.buffer_size();
252 let history_start = history.len().saturating_sub(buffer_size);
253
254 for i in history_start..history.len() {
255 let old_sample = unsafe { history.get_unchecked(i) };
256 self.sender.retrieve_returned_samples();
257
258 let offset = PointerOffset::from_value(old_sample.offset);
259 match connection
260 .sender
261 .try_send(offset, old_sample.size, ChannelId::new(0))
262 {
263 Ok(overflow) => {
264 self.sender.borrow_sample(offset);
265
266 if let Some(old) = overflow {
267 self.sender.release_sample(old);
268 }
269 }
270 Err(e) => {
271 warn!(from self, "Failed to deliver history to new subscriber via {:?} due to {:?}", connection, e);
272 }
273 }
274 }
275 }
276 }
277 }
278
279 pub(crate) fn send_sample(
280 &self,
281 offset: PointerOffset,
282 sample_size: usize,
283 ) -> Result<usize, SendError> {
284 let msg = "Unable to send sample";
285 if !self.is_active.load(Ordering::Relaxed) {
286 fail!(from self, with SendError::ConnectionBrokenSinceSenderNoLongerExists,
287 "{} since the corresponding publisher is already disconnected.", msg);
288 }
289
290 fail!(from self, when self.update_connections(),
291 "{} since the connections could not be updated.", msg);
292
293 self.add_sample_to_history(offset, sample_size);
294 self.sender
295 .deliver_offset(offset, sample_size, ChannelId::new(0))
296 }
297}
298
299#[derive(Debug)]
301pub struct Publisher<
302 Service: service::Service,
303 Payload: Debug + ZeroCopySend + ?Sized + 'static,
304 UserHeader: Debug + ZeroCopySend,
305> {
306 pub(crate) publisher_shared_state:
307 Service::ArcThreadSafetyPolicy<PublisherSharedState<Service>>,
308 dynamic_publisher_handle: Option<ContainerHandle>,
309 _payload: PhantomData<Payload>,
310 _user_header: PhantomData<UserHeader>,
311}
312
313unsafe impl<
314 Service: service::Service,
315 Payload: Debug + ZeroCopySend + ?Sized,
316 UserHeader: Debug + ZeroCopySend,
317 > Send for Publisher<Service, Payload, UserHeader>
318where
319 Service::ArcThreadSafetyPolicy<PublisherSharedState<Service>>: Send + Sync,
320{
321}
322
323unsafe impl<
324 Service: service::Service,
325 Payload: Debug + ZeroCopySend + ?Sized,
326 UserHeader: Debug + ZeroCopySend,
327 > Sync for Publisher<Service, Payload, UserHeader>
328where
329 Service::ArcThreadSafetyPolicy<PublisherSharedState<Service>>: Send + Sync,
330{
331}
332
333impl<
334 Service: service::Service,
335 Payload: Debug + ZeroCopySend + ?Sized,
336 UserHeader: Debug + ZeroCopySend,
337 > Drop for Publisher<Service, Payload, UserHeader>
338{
339 fn drop(&mut self) {
340 let shared_state = self.publisher_shared_state.lock();
341 shared_state.is_active.store(false, Ordering::Relaxed);
342 if let Some(handle) = self.dynamic_publisher_handle {
343 shared_state
344 .sender
345 .service_state
346 .dynamic_storage
347 .get()
348 .publish_subscribe()
349 .release_publisher_handle(handle)
350 }
351 }
352}
353
354impl<
355 Service: service::Service,
356 Payload: Debug + ZeroCopySend + ?Sized,
357 UserHeader: Debug + ZeroCopySend,
358 > Publisher<Service, Payload, UserHeader>
359{
360 pub(crate) fn new(
361 service: Arc<ServiceState<Service, NoResource>>,
362 static_config: &publish_subscribe::StaticConfig,
363 config: LocalPublisherConfig,
364 ) -> Result<Self, PublisherCreateError> {
365 let msg = "Unable to create Publisher port";
366 let origin = "Publisher::new()";
367 let port_id = UniquePublisherId::new();
368 let subscriber_list = &service
369 .dynamic_storage
370 .get()
371 .publish_subscribe()
372 .subscribers;
373
374 let number_of_samples =
375 unsafe { service.static_config.messaging_pattern.publish_subscribe() }
376 .required_amount_of_samples_per_data_segment(config.max_loaned_samples);
377
378 let data_segment_type =
379 DataSegmentType::new_from_allocation_strategy(config.allocation_strategy);
380
381 let sample_layout = static_config
382 .message_type_details
383 .sample_layout(config.initial_max_slice_len);
384
385 let max_slice_len = config.initial_max_slice_len;
386 let max_number_of_segments =
387 DataSegment::<Service>::max_number_of_segments(data_segment_type);
388 let publisher_details = PublisherDetails {
389 data_segment_type,
390 publisher_id: port_id,
391 number_of_samples,
392 max_slice_len,
393 node_id: *service.shared_node.id(),
394 max_number_of_segments,
395 };
396 let global_config = service.shared_node.config();
397
398 let segment_name = data_segment_name(publisher_details.publisher_id.value());
399 let data_segment = match data_segment_type {
400 DataSegmentType::Static => DataSegment::create_static_segment(
401 &segment_name,
402 sample_layout,
403 global_config,
404 number_of_samples,
405 ),
406 DataSegmentType::Dynamic => DataSegment::create_dynamic_segment(
407 &segment_name,
408 sample_layout,
409 global_config,
410 number_of_samples,
411 config.allocation_strategy,
412 ),
413 };
414
415 let data_segment = fail!(from origin,
416 when data_segment,
417 with PublisherCreateError::UnableToCreateDataSegment,
418 "{} since the data segment could not be acquired.", msg);
419
420 let publisher_shared_state =
421 <Service as service::Service>::ArcThreadSafetyPolicy::new(PublisherSharedState {
422 is_active: IoxAtomicBool::new(true),
423 sender: Sender {
424 data_segment,
425 segment_states: {
426 let mut v: Vec<SegmentState> =
427 Vec::with_capacity(max_number_of_segments as usize);
428 for _ in 0..max_number_of_segments {
429 v.push(SegmentState::new(number_of_samples))
430 }
431 v
432 },
433 connections: (0..subscriber_list.capacity())
434 .map(|_| UnsafeCell::new(None))
435 .collect(),
436 sender_port_id: port_id.value(),
437 shared_node: service.shared_node.clone(),
438 receiver_max_buffer_size: static_config.subscriber_max_buffer_size,
439 receiver_max_borrowed_samples: static_config.subscriber_max_borrowed_samples,
440 enable_safe_overflow: static_config.enable_safe_overflow,
441 number_of_samples,
442 max_number_of_segments,
443 degradation_callback: None,
444 service_state: service.clone(),
445 tagger: CyclicTagger::new(),
446 loan_counter: IoxAtomicUsize::new(0),
447 sender_max_borrowed_samples: config.max_loaned_samples,
448 unable_to_deliver_strategy: config.unable_to_deliver_strategy,
449 message_type_details: static_config.message_type_details.clone(),
450 number_of_channels: 1,
451 },
452 config,
453 subscriber_list_state: UnsafeCell::new(unsafe { subscriber_list.get_state() }),
454 history: match static_config.history_size == 0 {
455 true => None,
456 false => Some(UnsafeCell::new(Queue::new(static_config.history_size))),
457 },
458 });
459
460 let publisher_shared_state = match publisher_shared_state {
461 Ok(v) => v,
462 Err(e) => {
463 fail!(from origin,
464 with PublisherCreateError::FailedToDeployThreadsafetyPolicy,
465 "{msg} since the threadsafety policy could not be instantiated ({e:?}).");
466 }
467 };
468
469 let mut new_self = Self {
470 publisher_shared_state,
471 dynamic_publisher_handle: None,
472 _payload: PhantomData,
473 _user_header: PhantomData,
474 };
475
476 if let Err(e) = new_self
477 .publisher_shared_state
478 .lock()
479 .force_update_connections()
480 {
481 warn!(from new_self,
482 "The new Publisher port is unable to connect to every Subscriber port, caused by {:?}.", e);
483 }
484
485 core::sync::atomic::compiler_fence(Ordering::SeqCst);
486
487 let dynamic_publisher_handle = match service
490 .dynamic_storage
491 .get()
492 .publish_subscribe()
493 .add_publisher_id(publisher_details)
494 {
495 Some(unique_index) => unique_index,
496 None => {
497 fail!(from origin, with PublisherCreateError::ExceedsMaxSupportedPublishers,
498 "{} since it would exceed the maximum supported amount of publishers of {}.",
499 msg, service.static_config.publish_subscribe().max_publishers);
500 }
501 };
502
503 new_self.dynamic_publisher_handle = Some(dynamic_publisher_handle);
504
505 Ok(new_self)
506 }
507
508 pub fn id(&self) -> UniquePublisherId {
510 UniquePublisherId(UniqueSystemId::from(
511 self.publisher_shared_state.lock().sender.sender_port_id,
512 ))
513 }
514
515 pub fn unable_to_deliver_strategy(&self) -> UnableToDeliverStrategy {
518 self.publisher_shared_state
519 .lock()
520 .sender
521 .unable_to_deliver_strategy
522 }
523}
524
525impl<
529 Service: service::Service,
530 Payload: Debug + ZeroCopySend + Sized,
531 UserHeader: Default + Debug + ZeroCopySend,
532 > Publisher<Service, Payload, UserHeader>
533{
534 pub fn send_copy(&self, value: Payload) -> Result<usize, SendError> {
557 let msg = "Unable to send copy of payload";
558 let sample = fail!(from self, when self.loan_uninit(),
559 "{} since the loan of a sample failed.", msg);
560
561 sample.write_payload(value).send()
562 }
563
564 pub fn loan_uninit(
592 &self,
593 ) -> Result<SampleMutUninit<Service, MaybeUninit<Payload>, UserHeader>, LoanError> {
594 let shared_state = self.publisher_shared_state.lock();
595 let chunk = shared_state
596 .sender
597 .allocate(shared_state.sender.sample_layout(1))?;
598 let node_id = shared_state.sender.service_state.shared_node.id();
599 let header_ptr = chunk.header as *mut Header;
600 let user_header_ptr: *mut UserHeader = chunk.user_header.cast();
601 unsafe { header_ptr.write(Header::new(*node_id, self.id(), 1)) };
602 unsafe { user_header_ptr.write(UserHeader::default()) };
603
604 let sample = unsafe {
605 RawSampleMut::new_unchecked(header_ptr, user_header_ptr, chunk.payload.cast())
606 };
607 Ok(
608 SampleMutUninit::<Service, MaybeUninit<Payload>, UserHeader>::new(
609 &self.publisher_shared_state,
610 sample,
611 chunk.offset,
612 chunk.size,
613 ),
614 )
615 }
616}
617
618impl<
619 Service: service::Service,
620 Payload: Default + Debug + ZeroCopySend + Sized,
621 UserHeader: Default + Debug + ZeroCopySend,
622 > Publisher<Service, Payload, UserHeader>
623{
624 pub fn loan(&self) -> Result<SampleMut<Service, Payload, UserHeader>, LoanError> {
652 Ok(self.loan_uninit()?.write_payload(Payload::default()))
653 }
654}
655impl<
663 Service: service::Service,
664 Payload: Default + Debug + ZeroCopySend,
665 UserHeader: Default + Debug + ZeroCopySend,
666 > Publisher<Service, [Payload], UserHeader>
667{
668 pub fn loan_slice(
700 &self,
701 number_of_elements: usize,
702 ) -> Result<SampleMut<Service, [Payload], UserHeader>, LoanError> {
703 let sample = self.loan_slice_uninit(number_of_elements)?;
704 Ok(sample.write_from_fn(|_| Payload::default()))
705 }
706}
707
708impl<
709 Service: service::Service,
710 Payload: Debug + ZeroCopySend,
711 UserHeader: Debug + ZeroCopySend,
712 > Publisher<Service, [Payload], UserHeader>
713{
714 pub fn initial_max_slice_len(&self) -> usize {
716 self.publisher_shared_state
717 .lock()
718 .config
719 .initial_max_slice_len
720 }
721}
722
723impl<
724 Service: service::Service,
725 Payload: Debug + ZeroCopySend,
726 UserHeader: Default + Debug + ZeroCopySend,
727 > Publisher<Service, [Payload], UserHeader>
728{
729 pub fn loan_slice_uninit(
757 &self,
758 slice_len: usize,
759 ) -> Result<SampleMutUninit<Service, [MaybeUninit<Payload>], UserHeader>, LoanError> {
760 debug_assert!(TypeId::of::<Payload>() != TypeId::of::<CustomPayloadMarker>());
762
763 self.loan_slice_uninit_impl(slice_len, slice_len)
764 }
765
766 fn loan_slice_uninit_impl(
767 &self,
768 slice_len: usize,
769 underlying_number_of_slice_elements: usize,
770 ) -> Result<SampleMutUninit<Service, [MaybeUninit<Payload>], UserHeader>, LoanError> {
771 let shared_state = self.publisher_shared_state.lock();
772 let max_slice_len = shared_state.config.initial_max_slice_len;
773 if shared_state.config.allocation_strategy == AllocationStrategy::Static
774 && max_slice_len < slice_len
775 {
776 fail!(from self, with LoanError::ExceedsMaxLoanSize,
777 "Unable to loan slice with {} elements since it would exceed the max supported slice length of {}.",
778 slice_len, max_slice_len);
779 }
780
781 let sample_layout = shared_state.sender.sample_layout(slice_len);
782 let chunk = shared_state.sender.allocate(sample_layout)?;
783 let user_header_ptr: *mut UserHeader = chunk.user_header.cast();
784 let header_ptr = chunk.header as *mut Header;
785 let node_id = shared_state.sender.service_state.shared_node.id();
786 unsafe { header_ptr.write(Header::new(*node_id, self.id(), slice_len as _)) };
787 unsafe { user_header_ptr.write(UserHeader::default()) };
788
789 let sample = unsafe {
790 RawSampleMut::new_unchecked(
791 header_ptr,
792 user_header_ptr,
793 core::slice::from_raw_parts_mut(
794 chunk.payload.cast(),
795 underlying_number_of_slice_elements,
796 ),
797 )
798 };
799
800 Ok(
801 SampleMutUninit::<Service, [MaybeUninit<Payload>], UserHeader>::new(
802 &self.publisher_shared_state,
803 sample,
804 chunk.offset,
805 chunk.size,
806 ),
807 )
808 }
809}
810
811impl<Service: service::Service> Publisher<Service, [CustomPayloadMarker], CustomHeaderMarker> {
812 #[doc(hidden)]
820 pub unsafe fn loan_custom_payload(
821 &self,
822 slice_len: usize,
823 ) -> Result<
824 SampleMutUninit<Service, [MaybeUninit<CustomPayloadMarker>], CustomHeaderMarker>,
825 LoanError,
826 > {
827 let shared_state = self.publisher_shared_state.lock();
828
829 debug_assert!(
831 slice_len == 1 || shared_state.sender.payload_type_variant() == TypeVariant::Dynamic
832 );
833
834 self.loan_slice_uninit_impl(slice_len, shared_state.sender.payload_size() * slice_len)
835 }
836}
837impl<
842 Service: service::Service,
843 Payload: Debug + ZeroCopySend + ?Sized,
844 UserHeader: Debug + ZeroCopySend,
845 > UpdateConnections for Publisher<Service, Payload, UserHeader>
846{
847 fn update_connections(&self) -> Result<(), ConnectionFailure> {
848 self.publisher_shared_state.lock().update_connections()
849 }
850}