use super::request_response::PortFactory;
use crate::{
port::{
BackpressureFn, BackpressureHandler, DegradationAction, DegradationFn, DegradationHandler,
server::Server,
},
prelude::BackpressureStrategy,
service,
};
use alloc::format;
use core::fmt::Debug;
use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
use iceoryx2_cal::shm_allocator::AllocationStrategy;
use iceoryx2_log::{fail, warn};
use tiny_fn::tiny_fn;
#[derive(Debug, Clone, Copy)]
pub(crate) struct LocalServerConfig {
pub(crate) backpressure_strategy: BackpressureStrategy,
pub(crate) initial_max_slice_len: usize,
pub(crate) allocation_strategy: AllocationStrategy,
pub(crate) max_loaned_responses_per_request: usize,
}
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ServerCreateError {
ExceedsMaxSupportedServers,
UnableToCreateDataSegment,
FailedToDeployThreadsafetyPolicy,
UnableToCreatePortTag,
}
impl core::fmt::Display for ServerCreateError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "ServerCreateError::{self:?}")
}
}
impl core::error::Error for ServerCreateError {}
tiny_fn! {
pub struct PreallocatedResponseOverride = Fn(number_of_preallocated_responses: usize) -> usize;
}
impl<'a> Debug for PreallocatedResponseOverride<'a> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "PreallocatedResponseOverride")
}
}
#[derive(Debug)]
pub struct PortFactoryServer<
'factory,
Service: service::Service,
RequestPayload: Debug + ZeroCopySend + ?Sized,
RequestHeader: Debug + ZeroCopySend,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> {
pub(crate) factory: &'factory PortFactory<
Service,
RequestPayload,
RequestHeader,
ResponsePayload,
ResponseHeader,
>,
pub(crate) config: LocalServerConfig,
pub(crate) request_degradation_handler: DegradationHandler<'static>,
pub(crate) response_degradation_handler: DegradationHandler<'static>,
pub(crate) backpressure_handler: Option<BackpressureHandler<'static>>,
pub(crate) preallocated_number_of_responses_override: PreallocatedResponseOverride<'static>,
}
unsafe impl<
Service: service::Service,
RequestPayload: Debug + ZeroCopySend + ?Sized,
RequestHeader: Debug + ZeroCopySend,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> Send
for PortFactoryServer<
'_,
Service,
RequestPayload,
RequestHeader,
ResponsePayload,
ResponseHeader,
>
{
}
impl<
'factory,
Service: service::Service,
RequestPayload: Debug + ZeroCopySend + ?Sized,
RequestHeader: Debug + ZeroCopySend,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
>
PortFactoryServer<
'factory,
Service,
RequestPayload,
RequestHeader,
ResponsePayload,
ResponseHeader,
>
{
#[doc(hidden)]
pub unsafe fn __internal_partial_clone(&self) -> Self {
Self {
factory: self.factory,
config: self.config,
request_degradation_handler: DegradationHandler::new_with(DegradationAction::Warn),
response_degradation_handler: DegradationHandler::new_with(DegradationAction::Warn),
backpressure_handler: None,
preallocated_number_of_responses_override: PreallocatedResponseOverride::new(|v| v),
}
}
pub(crate) fn new(
factory: &'factory PortFactory<
Service,
RequestPayload,
RequestHeader,
ResponsePayload,
ResponseHeader,
>,
) -> Self {
let defs = &factory
.service
.shared_node()
.config()
.defaults
.request_response;
Self {
factory,
config: LocalServerConfig {
backpressure_strategy: defs.server_backpressure_strategy,
initial_max_slice_len: 1,
allocation_strategy: defs.server_allocation_strategy,
max_loaned_responses_per_request: defs.server_max_loaned_responses_per_request,
},
request_degradation_handler: DegradationHandler::new_with(DegradationAction::Warn),
response_degradation_handler: DegradationHandler::new_with(DegradationAction::Warn),
backpressure_handler: None,
preallocated_number_of_responses_override: PreallocatedResponseOverride::new(|v| v),
}
}
pub fn override_response_preallocation<F: Fn(usize) -> usize + 'static>(
mut self,
callback: F,
) -> Self {
self.preallocated_number_of_responses_override =
PreallocatedResponseOverride::new(move |v| callback(v).clamp(1, v));
self
}
pub fn backpressure_strategy(mut self, value: BackpressureStrategy) -> Self {
self.config.backpressure_strategy = value;
self
}
pub fn max_loaned_responses_per_request(mut self, value: usize) -> Self {
if value == 0 {
warn!(from self,
"A value of 0 is not allowed for max loaned responses per request. Adjusting it to 1.");
}
self.config.max_loaned_responses_per_request = value.max(1);
self
}
pub fn set_request_degradation_handler<F: DegradationFn + 'static>(
mut self,
handler: F,
) -> Self {
self.request_degradation_handler = DegradationHandler::new(handler);
self
}
pub fn set_response_degradation_handler<F: DegradationFn + 'static>(
mut self,
handler: F,
) -> Self {
self.response_degradation_handler = DegradationHandler::new(handler);
self
}
pub fn set_backpressure_handler<F: BackpressureFn + 'static>(mut self, handler: F) -> Self {
self.backpressure_handler = Some(BackpressureHandler::new(handler));
self
}
pub fn create(
self,
) -> Result<
Server<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>,
ServerCreateError,
> {
let origin = format!("{self:?}");
Ok(fail!(from origin,
when Server::new(self),
"Failed to create new Server port."))
}
}
impl<
Service: service::Service,
RequestPayload: Debug + ZeroCopySend + ?Sized,
RequestHeader: Debug + ZeroCopySend,
ResponsePayload: Debug + ZeroCopySend,
ResponseHeader: Debug + ZeroCopySend,
> PortFactoryServer<'_, Service, RequestPayload, RequestHeader, [ResponsePayload], ResponseHeader>
{
pub fn initial_max_slice_len(mut self, value: usize) -> Self {
self.config.initial_max_slice_len = value;
self
}
pub fn allocation_strategy(mut self, value: AllocationStrategy) -> Self {
self.config.allocation_strategy = value;
self
}
}