iceoryx2/port/
publisher.rs

1// Copyright (c) 2023 - 2024 Contributors to the Eclipse Foundation
2//
3// See the NOTICE file(s) distributed with this work for additional
4// information regarding copyright ownership.
5//
6// This program and the accompanying materials are made available under the
7// terms of the Apache Software License 2.0 which is available at
8// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
9// which is available at https://opensource.org/licenses/MIT.
10//
11// SPDX-License-Identifier: Apache-2.0 OR MIT
12
13//! # Examples
14//!
15//! ## Typed API
16//!
17//! ```
18//! use iceoryx2::prelude::*;
19//!
20//! # fn main() -> Result<(), Box<dyn core::error::Error>> {
21//! let node = NodeBuilder::new().create::<ipc::Service>()?;
22//! let service = node.service_builder(&"My/Funk/ServiceName".try_into()?)
23//!     .publish_subscribe::<u64>()
24//!     .open_or_create()?;
25//!
26//! let publisher = service
27//!     .publisher_builder()
28//!     // defines how many samples can be loaned in parallel
29//!     .max_loaned_samples(5)
//!     // defines behavior when the subscriber queue is full in a non-overflowing service
31//!     .unable_to_deliver_strategy(UnableToDeliverStrategy::DiscardSample)
32//!     .create()?;
33//!
34//! // loan some initialized memory and send it
35//! // the payload type must implement the [`core::default::Default`] trait in order to be able to use this API
36//! let mut sample = publisher.loan()?;
37//! *sample.payload_mut() = 1337;
38//! sample.send()?;
39//!
40//! // loan some uninitialized memory and send it
41//! let sample = publisher.loan_uninit()?;
42//! let sample = sample.write_payload(1337);
43//! sample.send()?;
44//!
45//! // loan some uninitialized memory and send it (with direct access of [`core::mem::MaybeUninit<Payload>`])
46//! let mut sample = publisher.loan_uninit()?;
47//! sample.payload_mut().write(1337);
48//! let sample = unsafe { sample.assume_init() };
49//! sample.send()?;
50//!
51//! // send a copy of the value
52//! publisher.send_copy(313)?;
53//!
54//! # Ok(())
55//! # }
56//! ```
57//!
58//! ## Slice API
59//!
60//! ```
61//! use iceoryx2::prelude::*;
62//!
63//! # fn main() -> Result<(), Box<dyn core::error::Error>> {
64//! let node = NodeBuilder::new().create::<ipc::Service>()?;
65//! let service = node.service_builder(&"My/Funk/ServiceName".try_into()?)
66//!     .publish_subscribe::<[usize]>()
67//!     .open_or_create()?;
68//!
69//! let publisher = service
70//!     .publisher_builder()
71//!     // defines the maximum length of a slice
72//!     .initial_max_slice_len(128)
73//!     // defines how many samples can be loaned in parallel
74//!     .max_loaned_samples(5)
//!     // defines behavior when the subscriber queue is full in a non-overflowing service
76//!     .unable_to_deliver_strategy(UnableToDeliverStrategy::DiscardSample)
77//!     .create()?;
78//!
79//! // loan some initialized memory and send it
80//! // the payload type must implement the [`core::default::Default`] trait in order to be able to use this API
81//! // we acquire a slice of length 12
82//! let mut sample = publisher.loan_slice(12)?;
83//! sample.payload_mut()[5] = 1337;
84//! sample.send()?;
85//!
86//! // loan uninitialized slice of length 60 and send it
87//! let sample = publisher.loan_slice_uninit(60)?;
//! // initialize the n-th element of the slice with the value n * 123
89//! let sample = sample.write_from_fn(|n| n * 123 );
90//! sample.send()?;
91//!
92//! // loan some uninitialized memory and send it (with direct access of [`core::mem::MaybeUninit<Payload>`])
93//! let mut sample = publisher.loan_slice_uninit(42)?;
94//! for element in sample.payload_mut() {
95//!     element.write(1337);
96//! }
97//! let sample = unsafe { sample.assume_init() };
98//! sample.send()?;
99//!
100//! # Ok(())
101//! # }
102//! ```
103
104use super::details::data_segment::{DataSegment, DataSegmentType};
105use super::details::segment_state::SegmentState;
106use super::port_identifiers::UniquePublisherId;
107use super::{LoanError, SendError};
108use crate::port::details::sender::*;
109use crate::port::update_connections::{ConnectionFailure, UpdateConnections};
110use crate::prelude::UnableToDeliverStrategy;
111use crate::raw_sample::RawSampleMut;
112use crate::sample_mut::SampleMut;
113use crate::sample_mut_uninit::SampleMutUninit;
114use crate::service::builder::{CustomHeaderMarker, CustomPayloadMarker};
115use crate::service::dynamic_config::publish_subscribe::{PublisherDetails, SubscriberDetails};
116use crate::service::header::publish_subscribe::Header;
117use crate::service::naming_scheme::data_segment_name;
118use crate::service::port_factory::publisher::LocalPublisherConfig;
119use crate::service::static_config::message_type_details::TypeVariant;
120use crate::service::static_config::publish_subscribe;
121use crate::service::{self, NoResource, ServiceState};
122use alloc::sync::Arc;
123use core::any::TypeId;
124use core::cell::UnsafeCell;
125use core::fmt::Debug;
126use core::sync::atomic::Ordering;
127use core::{marker::PhantomData, mem::MaybeUninit};
128use iceoryx2_bb_container::queue::Queue;
129use iceoryx2_bb_elementary::cyclic_tagger::CyclicTagger;
130use iceoryx2_bb_elementary::CallbackProgression;
131use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
132use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState};
133use iceoryx2_bb_log::{fail, warn};
134use iceoryx2_bb_posix::unique_system_id::UniqueSystemId;
135use iceoryx2_cal::arc_sync_policy::ArcSyncPolicy;
136use iceoryx2_cal::dynamic_storage::DynamicStorage;
137use iceoryx2_cal::shm_allocator::{AllocationStrategy, PointerOffset};
138use iceoryx2_cal::zero_copy_connection::{
139    ChannelId, ZeroCopyCreationError, ZeroCopyPortDetails, ZeroCopySender,
140};
141use iceoryx2_pal_concurrency_sync::iox_atomic::{IoxAtomicBool, IoxAtomicUsize};
142
/// Defines a failure that can occur when a [`Publisher`] is created with
/// [`crate::service::port_factory::publisher::PortFactoryPublisher`].
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum PublisherCreateError {
    /// The maximum number of [`Publisher`]s that can connect to a
    /// [`Service`](crate::service::Service) is
    /// defined in [`crate::config::Config`]. When this is exceeded, no more [`Publisher`]s
    /// can be created for a specific [`Service`](crate::service::Service).
    ExceedsMaxSupportedPublishers,
    /// The data segment in which the payload of the [`Publisher`] is stored could not be created.
    UnableToCreateDataSegment,
    /// Caused by a failure when instantiating an [`ArcSyncPolicy`] defined in the
    /// [`Service`](crate::service::Service) as `ArcThreadSafetyPolicy`.
    FailedToDeployThreadsafetyPolicy,
}
158
159impl core::fmt::Display for PublisherCreateError {
160    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
161        write!(f, "PublisherCreateError::{self:?}")
162    }
163}
164
// Empty impl: only the default methods of `core::error::Error` are used.
impl core::error::Error for PublisherCreateError {}
166
/// Entry of the publisher's sample history: the raw value of a sample's
/// `PointerOffset` together with the sample size it was sent with.
#[derive(Debug, Clone, Copy)]
struct OffsetAndSize {
    /// Raw offset value, obtained via `PointerOffset::as_value()`.
    offset: u64,
    /// Size of the sample as handed over when the sample was sent.
    size: usize,
}
172
/// State of a [`Publisher`] that is handed out by reference to the samples it loans.
#[derive(Debug)]
pub(crate) struct PublisherSharedState<Service: service::Service> {
    /// Local configuration the publisher was created with.
    config: LocalPublisherConfig,
    /// Sender that allocates, tracks and delivers the samples.
    pub(crate) sender: Sender<Service>,
    /// Local snapshot of the service's subscriber list; refreshed in `update_connections`.
    subscriber_list_state: UnsafeCell<ContainerState<SubscriberDetails>>,
    /// Queue of previously sent samples; `None` when the service's history size is zero.
    history: Option<UnsafeCell<Queue<OffsetAndSize>>>,
    /// Set to `false` when the owning [`Publisher`] is dropped; checked before sending.
    is_active: IoxAtomicBool,
}
181
182impl<Service: service::Service> PublisherSharedState<Service> {
183    fn add_sample_to_history(&self, offset: PointerOffset, sample_size: usize) {
184        match &self.history {
185            None => (),
186            Some(history) => {
187                let history = unsafe { &mut *history.get() };
188                self.sender.borrow_sample(offset);
189                match history.push_with_overflow(OffsetAndSize {
190                    offset: offset.as_value(),
191                    size: sample_size,
192                }) {
193                    None => (),
194                    Some(old) => self
195                        .sender
196                        .release_sample(PointerOffset::from_value(old.offset)),
197                }
198            }
199        }
200    }
201
    /// Runs a complete connection-update cycle over every subscriber in the local
    /// subscriber list state.
    ///
    /// Every subscriber is visited even when an earlier update failed; the first
    /// error that occurred is the one returned.
    fn force_update_connections(&self) -> Result<(), ZeroCopyCreationError> {
        let mut result = Ok(());
        self.sender.start_update_connection_cycle();
        unsafe {
            (*self.subscriber_list_state.get()).for_each(|h, port| {
                let inner_result = self.sender.update_connection(
                    h.index() as usize,
                    ReceiverDetails {
                        port_id: port.subscriber_id.value(),
                        buffer_size: port.buffer_size,
                    },
                    // callback handed to the sender; it delivers the stored sample
                    // history over the given connection
                    |connection| self.deliver_sample_history(connection),
                );

                // remember only the first failure, later ones do not overwrite it
                if result.is_ok() {
                    result = inner_result;
                }

                // never abort the iteration early
                CallbackProgression::Continue
            })
        };

        self.sender.finish_update_connection_cycle();

        result
    }
228
    /// Refreshes the local subscriber list state from the service's dynamic storage
    /// and, when `update_state` returns `true`, runs `force_update_connections`.
    ///
    /// Returns a [`ConnectionFailure`] when `force_update_connections` fails.
    fn update_connections(&self) -> Result<(), ConnectionFailure> {
        if unsafe {
            self.sender
                .service_state
                .dynamic_storage
                .get()
                .publish_subscribe()
                .subscribers
                .update_state(&mut *self.subscriber_list_state.get())
        } {
            fail!(from self, when self.force_update_connections(),
                "Connections were updated only partially since at least one connection to a Subscriber port failed.");
        }

        Ok(())
    }
245
246    fn deliver_sample_history(&self, connection: &Connection<Service>) {
247        match &self.history {
248            None => (),
249            Some(history) => {
250                let history = unsafe { &mut *history.get() };
251                let buffer_size = connection.sender.buffer_size();
252                let history_start = history.len().saturating_sub(buffer_size);
253
254                for i in history_start..history.len() {
255                    let old_sample = unsafe { history.get_unchecked(i) };
256                    self.sender.retrieve_returned_samples();
257
258                    let offset = PointerOffset::from_value(old_sample.offset);
259                    match connection
260                        .sender
261                        .try_send(offset, old_sample.size, ChannelId::new(0))
262                    {
263                        Ok(overflow) => {
264                            self.sender.borrow_sample(offset);
265
266                            if let Some(old) = overflow {
267                                self.sender.release_sample(old);
268                            }
269                        }
270                        Err(e) => {
271                            warn!(from self, "Failed to deliver history to new subscriber via {:?} due to {:?}", connection, e);
272                        }
273                    }
274                }
275            }
276        }
277    }
278
    /// Delivers the sample located at `offset` on channel 0.
    ///
    /// The connections are updated first, then the sample is added to the history
    /// and finally handed to the sender for delivery. The returned value is
    /// whatever `deliver_offset` returns.
    ///
    /// # Errors
    ///
    /// * [`SendError::ConnectionBrokenSinceSenderNoLongerExists`] when the publisher
    ///   is no longer active.
    /// * The error produced by the connection update or by `deliver_offset`.
    pub(crate) fn send_sample(
        &self,
        offset: PointerOffset,
        sample_size: usize,
    ) -> Result<usize, SendError> {
        let msg = "Unable to send sample";
        // `is_active` is cleared when the owning publisher is dropped
        if !self.is_active.load(Ordering::Relaxed) {
            fail!(from self, with SendError::ConnectionBrokenSinceSenderNoLongerExists,
                "{} since the corresponding publisher is already disconnected.", msg);
        }

        fail!(from self, when self.update_connections(),
            "{} since the connections could not be updated.", msg);

        self.add_sample_to_history(offset, sample_size);
        self.sender
            .deliver_offset(offset, sample_size, ChannelId::new(0))
    }
297}
298
/// Sending endpoint of a publish-subscribe based communication.
#[derive(Debug)]
pub struct Publisher<
    Service: service::Service,
    Payload: Debug + ZeroCopySend + ?Sized + 'static,
    UserHeader: Debug + ZeroCopySend,
> {
    /// Shared state, wrapped in the thread-safety policy selected by the service.
    pub(crate) publisher_shared_state:
        Service::ArcThreadSafetyPolicy<PublisherSharedState<Service>>,
    /// Handle of this publisher in the service's dynamic config; `None` until the
    /// publisher was registered successfully.
    dynamic_publisher_handle: Option<ContainerHandle>,
    _payload: PhantomData<Payload>,
    _user_header: PhantomData<UserHeader>,
}
312
// `Publisher` is `Send` only when the thread-safety policy wrapping the shared state
// is itself `Send + Sync`.
// NOTE(review): soundness presumably rests on the policy guarding the `UnsafeCell`
// members of the shared state — confirm.
unsafe impl<
        Service: service::Service,
        Payload: Debug + ZeroCopySend + ?Sized,
        UserHeader: Debug + ZeroCopySend,
    > Send for Publisher<Service, Payload, UserHeader>
where
    Service::ArcThreadSafetyPolicy<PublisherSharedState<Service>>: Send + Sync,
{
}
322
// `Publisher` is `Sync` only when the thread-safety policy wrapping the shared state
// is itself `Send + Sync`.
// NOTE(review): soundness presumably rests on the policy guarding the `UnsafeCell`
// members of the shared state — confirm.
unsafe impl<
        Service: service::Service,
        Payload: Debug + ZeroCopySend + ?Sized,
        UserHeader: Debug + ZeroCopySend,
    > Sync for Publisher<Service, Payload, UserHeader>
where
    Service::ArcThreadSafetyPolicy<PublisherSharedState<Service>>: Send + Sync,
{
}
332
333impl<
334        Service: service::Service,
335        Payload: Debug + ZeroCopySend + ?Sized,
336        UserHeader: Debug + ZeroCopySend,
337    > Drop for Publisher<Service, Payload, UserHeader>
338{
339    fn drop(&mut self) {
340        let shared_state = self.publisher_shared_state.lock();
341        shared_state.is_active.store(false, Ordering::Relaxed);
342        if let Some(handle) = self.dynamic_publisher_handle {
343            shared_state
344                .sender
345                .service_state
346                .dynamic_storage
347                .get()
348                .publish_subscribe()
349                .release_publisher_handle(handle)
350        }
351    }
352}
353
354impl<
355        Service: service::Service,
356        Payload: Debug + ZeroCopySend + ?Sized,
357        UserHeader: Debug + ZeroCopySend,
358    > Publisher<Service, Payload, UserHeader>
359{
360    pub(crate) fn new(
361        service: Arc<ServiceState<Service, NoResource>>,
362        static_config: &publish_subscribe::StaticConfig,
363        config: LocalPublisherConfig,
364    ) -> Result<Self, PublisherCreateError> {
365        let msg = "Unable to create Publisher port";
366        let origin = "Publisher::new()";
367        let port_id = UniquePublisherId::new();
368        let subscriber_list = &service
369            .dynamic_storage
370            .get()
371            .publish_subscribe()
372            .subscribers;
373
374        let number_of_samples =
375            unsafe { service.static_config.messaging_pattern.publish_subscribe() }
376                .required_amount_of_samples_per_data_segment(config.max_loaned_samples);
377
378        let data_segment_type =
379            DataSegmentType::new_from_allocation_strategy(config.allocation_strategy);
380
381        let sample_layout = static_config
382            .message_type_details
383            .sample_layout(config.initial_max_slice_len);
384
385        let max_slice_len = config.initial_max_slice_len;
386        let max_number_of_segments =
387            DataSegment::<Service>::max_number_of_segments(data_segment_type);
388        let publisher_details = PublisherDetails {
389            data_segment_type,
390            publisher_id: port_id,
391            number_of_samples,
392            max_slice_len,
393            node_id: *service.shared_node.id(),
394            max_number_of_segments,
395        };
396        let global_config = service.shared_node.config();
397
398        let segment_name = data_segment_name(publisher_details.publisher_id.value());
399        let data_segment = match data_segment_type {
400            DataSegmentType::Static => DataSegment::create_static_segment(
401                &segment_name,
402                sample_layout,
403                global_config,
404                number_of_samples,
405            ),
406            DataSegmentType::Dynamic => DataSegment::create_dynamic_segment(
407                &segment_name,
408                sample_layout,
409                global_config,
410                number_of_samples,
411                config.allocation_strategy,
412            ),
413        };
414
415        let data_segment = fail!(from origin,
416                when data_segment,
417                with PublisherCreateError::UnableToCreateDataSegment,
418                "{} since the data segment could not be acquired.", msg);
419
420        let publisher_shared_state =
421            <Service as service::Service>::ArcThreadSafetyPolicy::new(PublisherSharedState {
422                is_active: IoxAtomicBool::new(true),
423                sender: Sender {
424                    data_segment,
425                    segment_states: {
426                        let mut v: Vec<SegmentState> =
427                            Vec::with_capacity(max_number_of_segments as usize);
428                        for _ in 0..max_number_of_segments {
429                            v.push(SegmentState::new(number_of_samples))
430                        }
431                        v
432                    },
433                    connections: (0..subscriber_list.capacity())
434                        .map(|_| UnsafeCell::new(None))
435                        .collect(),
436                    sender_port_id: port_id.value(),
437                    shared_node: service.shared_node.clone(),
438                    receiver_max_buffer_size: static_config.subscriber_max_buffer_size,
439                    receiver_max_borrowed_samples: static_config.subscriber_max_borrowed_samples,
440                    enable_safe_overflow: static_config.enable_safe_overflow,
441                    number_of_samples,
442                    max_number_of_segments,
443                    degradation_callback: None,
444                    service_state: service.clone(),
445                    tagger: CyclicTagger::new(),
446                    loan_counter: IoxAtomicUsize::new(0),
447                    sender_max_borrowed_samples: config.max_loaned_samples,
448                    unable_to_deliver_strategy: config.unable_to_deliver_strategy,
449                    message_type_details: static_config.message_type_details.clone(),
450                    number_of_channels: 1,
451                },
452                config,
453                subscriber_list_state: UnsafeCell::new(unsafe { subscriber_list.get_state() }),
454                history: match static_config.history_size == 0 {
455                    true => None,
456                    false => Some(UnsafeCell::new(Queue::new(static_config.history_size))),
457                },
458            });
459
460        let publisher_shared_state = match publisher_shared_state {
461            Ok(v) => v,
462            Err(e) => {
463                fail!(from origin,
464                            with PublisherCreateError::FailedToDeployThreadsafetyPolicy,
465                            "{msg} since the threadsafety policy could not be instantiated ({e:?}).");
466            }
467        };
468
469        let mut new_self = Self {
470            publisher_shared_state,
471            dynamic_publisher_handle: None,
472            _payload: PhantomData,
473            _user_header: PhantomData,
474        };
475
476        if let Err(e) = new_self
477            .publisher_shared_state
478            .lock()
479            .force_update_connections()
480        {
481            warn!(from new_self,
482                "The new Publisher port is unable to connect to every Subscriber port, caused by {:?}.", e);
483        }
484
485        core::sync::atomic::compiler_fence(Ordering::SeqCst);
486
487        // !MUST! be the last task otherwise a publisher is added to the dynamic config without the
488        // creation of all required resources
489        let dynamic_publisher_handle = match service
490            .dynamic_storage
491            .get()
492            .publish_subscribe()
493            .add_publisher_id(publisher_details)
494        {
495            Some(unique_index) => unique_index,
496            None => {
497                fail!(from origin, with PublisherCreateError::ExceedsMaxSupportedPublishers,
498                            "{} since it would exceed the maximum supported amount of publishers of {}.",
499                            msg, service.static_config.publish_subscribe().max_publishers);
500            }
501        };
502
503        new_self.dynamic_publisher_handle = Some(dynamic_publisher_handle);
504
505        Ok(new_self)
506    }
507
508    /// Returns the [`UniquePublisherId`] of the [`Publisher`]
509    pub fn id(&self) -> UniquePublisherId {
510        UniquePublisherId(UniqueSystemId::from(
511            self.publisher_shared_state.lock().sender.sender_port_id,
512        ))
513    }
514
515    /// Returns the strategy the [`Publisher`] follows when a [`SampleMut`] cannot be delivered
516    /// since the [`Subscriber`](crate::port::subscriber::Subscriber)s buffer is full.
517    pub fn unable_to_deliver_strategy(&self) -> UnableToDeliverStrategy {
518        self.publisher_shared_state
519            .lock()
520            .sender
521            .unable_to_deliver_strategy
522    }
523}
524
525////////////////////////
526// BEGIN: typed API
527////////////////////////
528impl<
529        Service: service::Service,
530        Payload: Debug + ZeroCopySend + Sized,
531        UserHeader: Default + Debug + ZeroCopySend,
532    > Publisher<Service, Payload, UserHeader>
533{
534    /// Copies the input `value` into a [`crate::sample_mut::SampleMut`] and delivers it.
535    /// On success it returns the number of [`crate::port::subscriber::Subscriber`]s that received
536    /// the data, otherwise a [`SendError`] describing the failure.
537    ///
538    /// # Example
539    ///
540    /// ```
541    /// use iceoryx2::prelude::*;
542    /// # fn main() -> Result<(), Box<dyn core::error::Error>> {
543    /// # let node = NodeBuilder::new().create::<ipc::Service>()?;
544    /// #
545    /// # let service = node.service_builder(&"My/Funk/ServiceName".try_into()?)
546    /// #     .publish_subscribe::<u64>()
547    /// #     .open_or_create()?;
548    /// #
549    /// # let publisher = service.publisher_builder()
550    /// #                        .create()?;
551    ///
552    /// publisher.send_copy(1234)?;
553    /// # Ok(())
554    /// # }
555    /// ```
556    pub fn send_copy(&self, value: Payload) -> Result<usize, SendError> {
557        let msg = "Unable to send copy of payload";
558        let sample = fail!(from self, when self.loan_uninit(),
559                                    "{} since the loan of a sample failed.", msg);
560
561        sample.write_payload(value).send()
562    }
563
    /// Loans/allocates a [`SampleMutUninit`] from the underlying data segment of the [`Publisher`].
    /// The user has to initialize the payload before it can be sent.
    ///
    /// The header and the user header of the sample are already initialized; the
    /// user header is set to `UserHeader::default()`.
    ///
    /// On failure it returns [`LoanError`] describing the failure.
    ///
    /// # Example
    ///
    /// ```
    /// use iceoryx2::prelude::*;
    /// # fn main() -> Result<(), Box<dyn core::error::Error>> {
    /// # let node = NodeBuilder::new().create::<ipc::Service>()?;
    /// #
    /// # let service = node.service_builder(&"My/Funk/ServiceName".try_into()?)
    /// #     .publish_subscribe::<u64>()
    /// #     .open_or_create()?;
    /// #
    /// # let publisher = service.publisher_builder()
    /// #                        .create()?;
    ///
    /// let sample = publisher.loan_uninit()?;
    /// let sample = sample.write_payload(42); // alternatively `sample.payload_mut()` can be used to access the `MaybeUninit<Payload>`
    ///
    /// sample.send()?;
    ///
    /// # Ok(())
    /// # }
    /// ```
    pub fn loan_uninit(
        &self,
    ) -> Result<SampleMutUninit<Service, MaybeUninit<Payload>, UserHeader>, LoanError> {
        let shared_state = self.publisher_shared_state.lock();
        // a typed sample always consists of exactly one payload element
        let chunk = shared_state
            .sender
            .allocate(shared_state.sender.sample_layout(1))?;
        let node_id = shared_state.sender.service_state.shared_node.id();
        let header_ptr = chunk.header as *mut Header;
        let user_header_ptr: *mut UserHeader = chunk.user_header.cast();
        // initialize both headers in place; only the payload remains uninitialized
        unsafe { header_ptr.write(Header::new(*node_id, self.id(), 1)) };
        unsafe { user_header_ptr.write(UserHeader::default()) };

        let sample = unsafe {
            RawSampleMut::new_unchecked(header_ptr, user_header_ptr, chunk.payload.cast())
        };
        Ok(
            SampleMutUninit::<Service, MaybeUninit<Payload>, UserHeader>::new(
                &self.publisher_shared_state,
                sample,
                chunk.offset,
                chunk.size,
            ),
        )
    }
616}
617
618impl<
619        Service: service::Service,
620        Payload: Default + Debug + ZeroCopySend + Sized,
621        UserHeader: Default + Debug + ZeroCopySend,
622    > Publisher<Service, Payload, UserHeader>
623{
624    /// Loans/allocates a [`crate::sample_mut::SampleMut`] from the underlying data segment of the [`Publisher`]
625    /// and initialize it with the default value. This can be a performance hit and [`Publisher::loan_uninit`]
626    /// can be used to loan a [`core::mem::MaybeUninit<Payload>`].
627    ///
628    /// On failure it returns [`LoanError`] describing the failure.
629    ///
630    /// # Example
631    ///
632    /// ```
633    /// use iceoryx2::prelude::*;
634    /// # fn main() -> Result<(), Box<dyn core::error::Error>> {
635    /// # let node = NodeBuilder::new().create::<ipc::Service>()?;
636    /// #
637    /// # let service = node.service_builder(&"My/Funk/ServiceName".try_into()?)
638    /// #     .publish_subscribe::<u64>()
639    /// #     .open_or_create()?;
640    /// #
641    /// # let publisher = service.publisher_builder().create()?;
642    ///
643    /// let mut sample = publisher.loan()?;
644    /// *sample.payload_mut() = 42;
645    ///
646    /// sample.send()?;
647    ///
648    /// # Ok(())
649    /// # }
650    /// ```
651    pub fn loan(&self) -> Result<SampleMut<Service, Payload, UserHeader>, LoanError> {
652        Ok(self.loan_uninit()?.write_payload(Payload::default()))
653    }
654}
655////////////////////////
656// END: typed API
657////////////////////////
658
659////////////////////////
660// BEGIN: sliced API
661////////////////////////
662impl<
663        Service: service::Service,
664        Payload: Default + Debug + ZeroCopySend,
665        UserHeader: Default + Debug + ZeroCopySend,
666    > Publisher<Service, [Payload], UserHeader>
667{
668    /// Loans/allocates a [`crate::sample_mut::SampleMut`] from the underlying data segment of the [`Publisher`]
669    /// and initializes all slice elements with the default value. This can be a performance hit
670    /// and [`Publisher::loan_slice_uninit()`] can be used to loan a slice of
671    /// [`core::mem::MaybeUninit<Payload>`].
672    ///
673    /// On failure it returns [`LoanError`] describing the failure.
674    ///
675    /// # Example
676    ///
677    /// ```
678    /// use iceoryx2::prelude::*;
679    /// # fn main() -> Result<(), Box<dyn core::error::Error>> {
680    /// # let node = NodeBuilder::new().create::<ipc::Service>()?;
681    /// #
682    /// # let service = node.service_builder(&"My/Funk/ServiceName".try_into()?)
683    /// #     .publish_subscribe::<[u64]>()
684    /// #     .open_or_create()?;
685    /// #
686    /// # let publisher = service.publisher_builder()
687    /// #                        .initial_max_slice_len(120)
688    /// #                        .create()?;
689    ///
690    /// let slice_length = 5;
691    /// let mut sample = publisher.loan_slice(slice_length)?;
692    /// sample.payload_mut()[2] = 42;
693    ///
694    /// sample.send()?;
695    ///
696    /// # Ok(())
697    /// # }
698    /// ```
699    pub fn loan_slice(
700        &self,
701        number_of_elements: usize,
702    ) -> Result<SampleMut<Service, [Payload], UserHeader>, LoanError> {
703        let sample = self.loan_slice_uninit(number_of_elements)?;
704        Ok(sample.write_from_fn(|_| Payload::default()))
705    }
706}
707
708impl<
709        Service: service::Service,
710        Payload: Debug + ZeroCopySend,
711        UserHeader: Debug + ZeroCopySend,
712    > Publisher<Service, [Payload], UserHeader>
713{
714    /// Returns the maximum initial slice length configured for this [`Publisher`].
715    pub fn initial_max_slice_len(&self) -> usize {
716        self.publisher_shared_state
717            .lock()
718            .config
719            .initial_max_slice_len
720    }
721}
722
723impl<
724        Service: service::Service,
725        Payload: Debug + ZeroCopySend,
726        UserHeader: Default + Debug + ZeroCopySend,
727    > Publisher<Service, [Payload], UserHeader>
728{
    /// Loans/allocates a [`SampleMutUninit`] from the underlying data segment of the [`Publisher`].
    /// The user has to initialize the payload before it can be sent.
    ///
    /// `slice_len` is the number of elements of the loaned slice.
    ///
    /// On failure it returns [`LoanError`] describing the failure.
    ///
    /// # Example
    ///
    /// ```
    /// use iceoryx2::prelude::*;
    ///
    /// # let node = NodeBuilder::new().create::<ipc::Service>()?;
    /// #
    /// # let service = node.service_builder(&"My/Funk/ServiceName".try_into()?)
    /// #     .publish_subscribe::<[usize]>()
    /// #     .open_or_create()?;
    /// #
    /// # let publisher = service.publisher_builder()
    /// #                        .initial_max_slice_len(120)
    /// #                        .create()?;
    ///
    /// let slice_length = 5;
    /// let sample = publisher.loan_slice_uninit(slice_length)?;
    /// let sample = sample.write_from_fn(|n| n * 2); // alternatively `sample.payload_mut()` can be used to access the `[MaybeUninit<Payload>]`
    ///
    /// sample.send()?;
    /// # Ok::<_, Box<dyn core::error::Error>>(())
    /// ```
    pub fn loan_slice_uninit(
        &self,
        slice_len: usize,
    ) -> Result<SampleMutUninit<Service, [MaybeUninit<Payload>], UserHeader>, LoanError> {
        // required since Rust does not support generic specializations or negative traits
        debug_assert!(TypeId::of::<Payload>() != TypeId::of::<CustomPayloadMarker>());

        // for regular payloads the raw slice has exactly `slice_len` elements
        self.loan_slice_uninit_impl(slice_len, slice_len)
    }
765
    /// Shared implementation of the slice loan functions.
    ///
    /// * `slice_len` - number of elements used to compute the sample layout; it is also
    ///   the element count written into the [`Header`].
    /// * `underlying_number_of_slice_elements` - length of the raw payload slice that is
    ///   handed to the returned sample.
    ///
    /// Returns [`LoanError::ExceedsMaxLoanSize`] when the allocation strategy is
    /// [`AllocationStrategy::Static`] and `slice_len` exceeds the configured initial
    /// maximum slice length. Errors of the allocation itself are forwarded.
    fn loan_slice_uninit_impl(
        &self,
        slice_len: usize,
        underlying_number_of_slice_elements: usize,
    ) -> Result<SampleMutUninit<Service, [MaybeUninit<Payload>], UserHeader>, LoanError> {
        let shared_state = self.publisher_shared_state.lock();
        let max_slice_len = shared_state.config.initial_max_slice_len;
        // the length limit is only enforced for the static allocation strategy
        if shared_state.config.allocation_strategy == AllocationStrategy::Static
            && max_slice_len < slice_len
        {
            fail!(from self, with LoanError::ExceedsMaxLoanSize,
                "Unable to loan slice with {} elements since it would exceed the max supported slice length of {}.",
                slice_len, max_slice_len);
        }

        let sample_layout = shared_state.sender.sample_layout(slice_len);
        let chunk = shared_state.sender.allocate(sample_layout)?;
        let user_header_ptr: *mut UserHeader = chunk.user_header.cast();
        let header_ptr = chunk.header as *mut Header;
        let node_id = shared_state.sender.service_state.shared_node.id();
        // initialize both headers in place; only the payload remains uninitialized
        unsafe { header_ptr.write(Header::new(*node_id, self.id(), slice_len as _)) };
        unsafe { user_header_ptr.write(UserHeader::default()) };

        let sample = unsafe {
            RawSampleMut::new_unchecked(
                header_ptr,
                user_header_ptr,
                core::slice::from_raw_parts_mut(
                    chunk.payload.cast(),
                    underlying_number_of_slice_elements,
                ),
            )
        };

        Ok(
            SampleMutUninit::<Service, [MaybeUninit<Payload>], UserHeader>::new(
                &self.publisher_shared_state,
                sample,
                chunk.offset,
                chunk.size,
            ),
        )
    }
809}
810
811impl<Service: service::Service> Publisher<Service, [CustomPayloadMarker], CustomHeaderMarker> {
812    /// # Safety
813    ///
814    ///  * slice_len != 1 only when payload TypeVariant == Dynamic
815    ///  * The number_of_elements in the [`Header`](crate::service::header::publish_subscribe::Header)
816    ///     is set to `slice_len`
817    ///  * The [`SampleMutUninit`] will contain `slice_len` * `MessageTypeDetails::payload.size`
818    ///     elements of type [`CustomPayloadMarker`].
819    #[doc(hidden)]
820    pub unsafe fn loan_custom_payload(
821        &self,
822        slice_len: usize,
823    ) -> Result<
824        SampleMutUninit<Service, [MaybeUninit<CustomPayloadMarker>], CustomHeaderMarker>,
825        LoanError,
826    > {
827        let shared_state = self.publisher_shared_state.lock();
828
829        // TypeVariant::Dynamic == slice and only here it makes sense to loan more than one element
830        debug_assert!(
831            slice_len == 1 || shared_state.sender.payload_type_variant() == TypeVariant::Dynamic
832        );
833
834        self.loan_slice_uninit_impl(slice_len, shared_state.sender.payload_size() * slice_len)
835    }
836}
837////////////////////////
838// END: sliced API
839////////////////////////
840
841impl<
842        Service: service::Service,
843        Payload: Debug + ZeroCopySend + ?Sized,
844        UserHeader: Debug + ZeroCopySend,
845    > UpdateConnections for Publisher<Service, Payload, UserHeader>
846{
847    fn update_connections(&self) -> Result<(), ConnectionFailure> {
848        self.publisher_shared_state.lock().update_connections()
849    }
850}