use crate::{
port::{
BackpressureFn, BackpressureHandler, DegradationAction, DegradationFn, DegradationHandler,
backpressure_strategy::BackpressureStrategy,
publisher::{Publisher, PublisherCreateError},
},
service,
};
use alloc::format;
use core::fmt::Debug;
use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
use iceoryx2_cal::shm_allocator::AllocationStrategy;
use iceoryx2_log::fail;
use tiny_fn::tiny_fn;
use super::publish_subscribe::PortFactory;
tiny_fn! {
pub struct PreallocatedSamplesOverride = Fn(number_of_preallocated_samples: usize) -> usize;
}
impl<'a> Debug for PreallocatedSamplesOverride<'a> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "PreallocatedSamplesOverride")
}
}
#[derive(Debug, Clone, Copy)]
pub(crate) struct LocalPublisherConfig {
pub(crate) max_loaned_samples: usize,
pub(crate) backpressure_strategy: BackpressureStrategy,
pub(crate) initial_max_slice_len: usize,
pub(crate) allocation_strategy: AllocationStrategy,
}
#[derive(Debug)]
pub struct PortFactoryPublisher<
'factory,
Service: service::Service,
Payload: Debug + ZeroCopySend + ?Sized,
UserHeader: Debug + ZeroCopySend,
> {
pub(crate) config: LocalPublisherConfig,
pub(crate) degradation_handler: DegradationHandler<'static>,
pub(crate) backpressure_handler: Option<BackpressureHandler<'static>>,
pub(crate) preallocate_number_of_samples_override: PreallocatedSamplesOverride<'static>,
pub(crate) factory: &'factory PortFactory<Service, Payload, UserHeader>,
}
unsafe impl<
Service: service::Service,
Payload: Debug + ZeroCopySend + ?Sized,
UserHeader: Debug + ZeroCopySend,
> Send for PortFactoryPublisher<'_, Service, Payload, UserHeader>
{
}
impl<
Service: service::Service,
Payload: Debug + ZeroCopySend + ?Sized,
UserHeader: Debug + ZeroCopySend,
> PortFactoryPublisher<'_, Service, Payload, UserHeader>
{
#[doc(hidden)]
pub unsafe fn __internal_partial_clone(&self) -> Self {
Self {
config: self.config,
factory: self.factory,
degradation_handler: DegradationHandler::new_with(DegradationAction::Warn),
backpressure_handler: None,
preallocate_number_of_samples_override: PreallocatedSamplesOverride::new(|v| v),
}
}
}
impl<
'factory,
Service: service::Service,
Payload: Debug + ZeroCopySend + ?Sized,
UserHeader: Debug + ZeroCopySend,
> PortFactoryPublisher<'factory, Service, Payload, UserHeader>
{
pub(crate) fn new(factory: &'factory PortFactory<Service, Payload, UserHeader>) -> Self {
let defaults = &factory
.service
.shared_node()
.config()
.defaults
.publish_subscribe;
Self {
config: LocalPublisherConfig {
allocation_strategy: defaults.publisher_allocation_strategy,
initial_max_slice_len: 1,
max_loaned_samples: defaults.publisher_max_loaned_samples,
backpressure_strategy: defaults.backpressure_strategy,
},
degradation_handler: DegradationHandler::new_with(DegradationAction::Warn),
backpressure_handler: None,
preallocate_number_of_samples_override: PreallocatedSamplesOverride::new(|v| v),
factory,
}
}
pub fn override_sample_preallocation<F: Fn(usize) -> usize + 'static>(
mut self,
callback: F,
) -> Self {
self.preallocate_number_of_samples_override =
PreallocatedSamplesOverride::new(move |v| callback(v).clamp(1, v));
self
}
pub fn max_loaned_samples(mut self, value: usize) -> Self {
self.config.max_loaned_samples = value;
self
}
pub fn backpressure_strategy(mut self, value: BackpressureStrategy) -> Self {
self.config.backpressure_strategy = value;
self
}
pub fn set_degradation_handler<F: DegradationFn + 'static>(mut self, handler: F) -> Self {
self.degradation_handler = DegradationHandler::new(handler);
self
}
pub fn set_backpressure_handler<F: BackpressureFn + 'static>(mut self, handler: F) -> Self {
self.backpressure_handler = Some(BackpressureHandler::new(handler));
self
}
pub fn create(self) -> Result<Publisher<Service, Payload, UserHeader>, PublisherCreateError> {
let origin = format!("{self:?}");
Ok(fail!(from origin, when Publisher::new(self),
"Failed to create new Publisher port."))
}
}
impl<Service: service::Service, Payload: Debug + ZeroCopySend, UserHeader: Debug + ZeroCopySend>
PortFactoryPublisher<'_, Service, [Payload], UserHeader>
{
pub fn initial_max_slice_len(mut self, value: usize) -> Self {
self.config.initial_max_slice_len = value;
self
}
pub fn allocation_strategy(mut self, value: AllocationStrategy) -> Self {
self.config.allocation_strategy = value;
self
}
}