use core::ptr::NonNull;
use core::{any::TypeId, fmt::Debug, marker::PhantomData, mem::MaybeUninit};
use iceoryx2_bb_container::{queue::Queue, slotmap::SlotMap, vector::polymorphic_vec::*};
use iceoryx2_bb_concurrency::atomic::Ordering;
use iceoryx2_bb_concurrency::atomic::{AtomicBool, AtomicU64, AtomicUsize};
use iceoryx2_bb_concurrency::cell::UnsafeCell;
use iceoryx2_bb_elementary::{CallbackProgression, cyclic_tagger::CyclicTagger};
use iceoryx2_bb_elementary_traits::non_null::NonNullCompat;
use iceoryx2_bb_elementary_traits::testing::abandonable::Abandonable;
use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState};
use iceoryx2_bb_memory::heap_allocator::HeapAllocator;
use iceoryx2_cal::zero_copy_connection::{CHANNEL_STATE_CLOSED, CHANNEL_STATE_OPEN};
use iceoryx2_cal::{
arc_sync_policy::ArcSyncPolicy,
dynamic_storage::DynamicStorage,
shm_allocator::{AllocationStrategy, PointerOffset},
zero_copy_connection::ChannelId,
};
use iceoryx2_log::{fail, fatal_panic, warn};
use crate::active_request::RequestId;
use crate::{
identifiers::UniqueClientId,
pending_response::PendingResponse,
port::{details::data_segment::DataSegment, update_connections::UpdateConnections},
prelude::{BackpressureStrategy, PortFactory},
raw_sample::RawSampleMut,
request_mut::RequestMut,
request_mut_uninit::RequestMutUninit,
service::{
self,
builder::{CustomHeaderMarker, CustomPayloadMarker},
dynamic_config::request_response::{ClientDetails, ServerDetails},
header,
naming_scheme::data_segment_name,
port_factory::client::{ClientCreateError, LocalClientConfig, PortFactoryClient},
static_config::message_type_details::TypeVariant,
},
};
use super::{
LoanError, SendError,
details::{
data_segment::DataSegmentType,
receiver::{Receiver, SenderDetails},
segment_state::SegmentState,
sender::{ReceiverDetails, Sender},
},
update_connections::ConnectionFailure,
};
/// Failures that can occur while a client sends a request.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RequestSendError {
    /// The number of currently active requests has already reached the
    /// configured `max_active_requests_per_client`, so no further request
    /// may be sent.
    ExceedsMaxActiveRequests,
    /// The underlying send operation failed; carries the original [`SendError`].
    SendError(SendError),
}
impl From<SendError> for RequestSendError {
fn from(value: SendError) -> Self {
RequestSendError::SendError(value)
}
}
/// Converts a [`LoanError`] by first wrapping it into [`SendError::LoanError`]
/// and then into [`RequestSendError::SendError`].
impl From<LoanError> for RequestSendError {
    fn from(value: LoanError) -> Self {
        let send_error = SendError::LoanError(value);
        Self::SendError(send_error)
    }
}
/// Converts a [`ConnectionFailure`] by first wrapping it into
/// [`SendError::ConnectionError`] and then into [`RequestSendError::SendError`].
impl From<ConnectionFailure> for RequestSendError {
    fn from(value: ConnectionFailure) -> Self {
        let send_error = SendError::ConnectionError(value);
        Self::SendError(send_error)
    }
}
/// Renders the error as `RequestSendError::` followed by its `Debug`
/// representation.
impl core::fmt::Display for RequestSendError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_fmt(format_args!("RequestSendError::{self:?}"))
    }
}
// Marks `RequestSendError` as an error type. The impl body is empty, so only
// the default method implementations of `core::error::Error` are used.
impl core::error::Error for RequestSendError {}
/// State shared between a `Client` and the requests that were loaned from it
/// (loaned requests store a clone of the policy-wrapped state).
#[derive(Debug)]
pub(crate) struct ClientSharedState<Service: service::Service> {
/// Client configuration; read for `initial_max_slice_len` and
/// `allocation_strategy` when loaning.
pub(crate) config: LocalClientConfig,
/// Sender used to allocate request chunks and to deliver them.
pub(crate) request_sender: Sender<Service>,
/// Receiver whose channels are prepared for the responses of sent requests.
pub(crate) response_receiver: Receiver<Service>,
/// Handle returned by `add_client_id()` in `Client::new()`; `None` until the
/// registration succeeded. Released in `Drop`.
client_handle: UnsafeCell<Option<ContainerHandle>>,
/// Locally cached state of the server list; refreshed via `update_state()`
/// and iterated when connections are (re-)established.
server_list_state: UnsafeCell<ContainerState<ServerDetails>>,
/// Number of requests counted as active; incremented in `send_request()`.
pub(crate) active_request_counter: AtomicUsize,
/// Queue of channel ids that can be handed out to newly loaned requests.
pub(crate) available_channel_ids: UnsafeCell<Queue<ChannelId>>,
/// Port tag created in `Client::new()`; according to the error message there
/// it is required for cleanup.
port_tag: Service::StaticStorage,
}
impl<Service: service::Service> Abandonable for ClientSharedState<Service> {
    /// Abandons the request sender, the response receiver and the port tag,
    /// in that order, by forwarding to their `abandon_in_place`
    /// implementations. All other fields are left untouched.
    ///
    /// # Safety
    ///
    /// NOTE(review): `this` is dereferenced mutably, so it presumably has to
    /// point to a valid, exclusively accessible instance — confirm against the
    /// contract documented on `Abandonable`.
    unsafe fn abandon_in_place(mut this: NonNull<Self>) {
        let shared_state = unsafe { this.as_mut() };

        unsafe {
            Sender::abandon_in_place(NonNull::iox2_from_mut(&mut shared_state.request_sender));
            Receiver::abandon_in_place(NonNull::iox2_from_mut(
                &mut shared_state.response_receiver,
            ));
            Service::StaticStorage::abandon_in_place(NonNull::iox2_from_mut(
                &mut shared_state.port_tag,
            ));
        }
    }
}
impl<Service: service::Service> Drop for ClientSharedState<Service> {
    /// Releases the client handle from the dynamic storage of the service if
    /// the client was registered; does nothing when no handle was stored.
    fn drop(&mut self) {
        let registered_handle = unsafe { *self.client_handle.get() };
        let Some(handle) = registered_handle else {
            return;
        };

        self.request_sender
            .service_state
            .dynamic_storage()
            .get()
            .request_response()
            .release_client_handle(handle);
    }
}
impl<Service: service::Service> ClientSharedState<Service> {
    /// Sets the state of the response channel `channel_id` to `request_id`
    /// so that the receiver can accept the responses of that request.
    fn prepare_channel_to_receive_responses(&self, channel_id: ChannelId, request_id: RequestId) {
        self.response_receiver
            .set_channel_state(channel_id, request_id);
    }

    /// Sends the request stored at `offset` to the connected servers.
    ///
    /// Steps, in order:
    /// 1. reject the request when the active-request limit is reached,
    /// 2. refresh the connections,
    /// 3. prepare the response channel `channel_id` for `request_id`,
    /// 4. increment the active-request counter,
    /// 5. deliver the offset on request channel `0` (the request sender is
    ///    created with a single channel).
    ///
    /// # Returns
    ///
    /// The value reported by `Sender::deliver_offset()`.
    ///
    /// # Errors
    ///
    /// * [`RequestSendError::ExceedsMaxActiveRequests`] when the number of
    ///   active requests has reached `max_active_requests_per_client`.
    /// * [`RequestSendError::SendError`] when the connections could not be
    ///   updated or the delivery failed.
    pub(crate) fn send_request(
        &self,
        offset: PointerOffset,
        sample_size: usize,
        channel_id: ChannelId,
        request_id: RequestId,
    ) -> Result<usize, RequestSendError> {
        let msg = "Unable to send request";

        let active_request_counter = self.active_request_counter.load(Ordering::Relaxed);
        let max_active_requests = self
            .request_sender
            .service_state
            .static_config()
            .request_response()
            .max_active_requests_per_client;

        if max_active_requests <= active_request_counter {
            // Report the configured limit, not the current counter value.
            fail!(from self, with RequestSendError::ExceedsMaxActiveRequests,
                "{} since the number of active requests is limited to {} and sending this request would exceed the limit.", msg, max_active_requests);
        }

        fail!(from self, when self.update_connections(),
            "{} since the connections could not be updated.", msg);

        // The channel must be prepared before the request is delivered.
        self.prepare_channel_to_receive_responses(channel_id, request_id);
        self.active_request_counter.fetch_add(1, Ordering::Relaxed);

        let delivered = self
            .request_sender
            .deliver_offset(offset, sample_size, ChannelId::new(0))?;
        Ok(delivered)
    }

    /// Refreshes the cached server list and, only when `update_state()`
    /// reports a change, re-establishes the connections to all servers.
    ///
    /// # Errors
    ///
    /// Returns the [`ConnectionFailure`] reported by the connection update
    /// when at least one connection could not be established.
    pub(crate) fn update_connections(
        &self,
    ) -> Result<(), super::update_connections::ConnectionFailure> {
        // NOTE(review): the mutable access through the `UnsafeCell` presumably
        // relies on the surrounding thread-safety policy for exclusivity.
        let server_list_changed = unsafe {
            self.request_sender
                .service_state
                .dynamic_storage()
                .get()
                .request_response()
                .servers
                .update_state(&mut *self.server_list_state.get())
        };

        if server_list_changed {
            fail!(from self, when self.force_update_connections(),
                "Connections were updated only partially since at least one connection to a Server port failed.");
        }

        Ok(())
    }

    /// Updates the response and the request connection for every server in
    /// the cached server list, regardless of whether the list changed.
    ///
    /// All servers are visited even when a connection fails; the first
    /// failure encountered is the one returned.
    fn force_update_connections(&self) -> Result<(), ConnectionFailure> {
        let mut result = Ok(());

        self.request_sender.start_update_connection_cycle();
        self.response_receiver.start_update_connection_cycle();

        unsafe {
            (*self.server_list_state.get()).for_each(|index, port| {
                // Response connection: server -> client.
                let response_result = self.response_receiver.update_connection(
                    index,
                    SenderDetails {
                        port_id: port.server_id.value(),
                        max_number_of_segments: port.max_number_of_segments,
                        data_segment_type: port.data_segment_type,
                        number_of_samples: port.number_of_responses,
                    },
                );
                result = result.and(response_result);

                // Request connection: client -> server.
                let request_result = self.request_sender.update_connection(
                    index,
                    ReceiverDetails {
                        port_id: port.server_id.value(),
                        buffer_size: port.request_buffer_size,
                    },
                    |_| {},
                );
                if let Err(err) = request_result {
                    result = result.and(Err(err.into()));
                }

                CallbackProgression::Continue
            })
        };

        self.response_receiver.finish_update_connection_cycle();
        self.request_sender.finish_update_connection_cycle();

        result
    }
}
/// Client port of a request-response service. Requests are loaned from it and
/// sent to the connected servers.
///
/// The payload and header types are only tracked via `PhantomData`; no value
/// of these types is stored in the struct itself.
#[derive(Debug)]
pub struct Client<
Service: service::Service,
RequestPayload: Debug + ZeroCopySend + ?Sized,
RequestHeader: Debug + ZeroCopySend,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> {
// Unique id of this client; returned by `id()`.
client_id: UniqueClientId,
// State shared with loaned requests, wrapped in the service's thread-safety policy.
client_shared_state: Service::ArcThreadSafetyPolicy<ClientSharedState<Service>>,
// Source of the request ids; advanced in `next_request_id()`.
request_id_counter: AtomicU64,
_request_payload: PhantomData<RequestPayload>,
_request_header: PhantomData<RequestHeader>,
_response_payload: PhantomData<ResponsePayload>,
_response_header: PhantomData<ResponseHeader>,
}
impl<
    Service: service::Service,
    RequestPayload: Debug + ZeroCopySend + ?Sized,
    RequestHeader: Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
> Abandonable for Client<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
{
    /// Abandons the policy-wrapped shared state by forwarding to the
    /// `abandon_in_place` implementation of the thread-safety policy. No other
    /// field of the client is touched.
    ///
    /// # Safety
    ///
    /// NOTE(review): `this` is dereferenced mutably, so it presumably has to
    /// point to a valid, exclusively accessible instance — confirm against the
    /// contract documented on `Abandonable`.
    unsafe fn abandon_in_place(mut this: NonNull<Self>) {
        let client = unsafe { this.as_mut() };
        let shared_state = NonNull::iox2_from_mut(&mut client.client_shared_state);

        unsafe { Service::ArcThreadSafetyPolicy::abandon_in_place(shared_state) };
    }
}
// `Client` is `Send` only when the policy-wrapped shared state is `Send + Sync`
// (see the `where` clause).
// NOTE(review): the payload/header types appear only in `PhantomData`, which is
// presumably why no bounds are placed on them here — confirm.
unsafe impl<
Service: service::Service,
RequestPayload: Debug + ZeroCopySend + ?Sized,
RequestHeader: Debug + ZeroCopySend,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> Send for Client<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
where
Service::ArcThreadSafetyPolicy<ClientSharedState<Service>>: Send + Sync,
{
}
// `Client` is `Sync` only when the policy-wrapped shared state is `Send + Sync`
// (see the `where` clause).
// NOTE(review): the payload/header types appear only in `PhantomData`, which is
// presumably why no bounds are placed on them here — confirm.
unsafe impl<
Service: service::Service,
RequestPayload: Debug + ZeroCopySend + ?Sized,
RequestHeader: Debug + ZeroCopySend,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> Sync for Client<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
where
Service::ArcThreadSafetyPolicy<ClientSharedState<Service>>: Send + Sync,
{
}
impl<
Service: service::Service,
RequestPayload: Debug + ZeroCopySend + ?Sized,
RequestHeader: Debug + ZeroCopySend,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> Client<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
{
/// Creates a new client port from the settings collected in `client_factory`.
///
/// Order of operations: create the port tag, create the request data segment,
/// set up the request sender and the response receiver, try to connect to all
/// known servers and finally register the client in the dynamic storage of
/// the service.
///
/// # Errors
///
/// * `ClientCreateError::UnableToCreatePortTag` - the port tag could not be created
/// * `ClientCreateError::UnableToCreateDataSegment` - the data segment could not be created
/// * `ClientCreateError::FailedToDeployThreadsafetyPolicy` - the thread-safety
///   policy around the shared state could not be instantiated
/// * `ClientCreateError::ExceedsMaxSupportedClients` - `add_client_id()` returned `None`
///
/// A failure to connect to individual servers is only logged as a warning.
pub(crate) fn new(
client_factory: PortFactoryClient<
Service,
RequestPayload,
RequestHeader,
ResponsePayload,
ResponseHeader,
>,
) -> Result<Self, ClientCreateError> {
let msg = "Unable to create Client port";
let origin = "Client::new()";
let service = &client_factory.factory.service;
let client_id = UniqueClientId::new();
// The port tag is created first; it is moved into the shared state below.
let port_tag = match service
.shared_node()
.create_port_tag(origin, msg, client_id.0.value())
{
Ok(port_tag) => port_tag,
Err(e) => {
fail!(from origin, with ClientCreateError::UnableToCreatePortTag,
"{msg} since the port tag, that is required for cleanup, could not be created. [{e:?}]");
}
};
let static_config = client_factory.factory.static_config();
// Number of request chunks the data segment has to provide, derived from the
// static service configuration ...
let number_of_requests =
unsafe { service.static_config().messaging_pattern.request_response() }
.required_amount_of_chunks_per_client_data_segment(
static_config.max_loaned_requests,
);
// ... and then passed through the factory's preallocation override.
let number_of_requests = client_factory
.preallocate_number_of_requests_override
.call(number_of_requests);
let server_list = &service.dynamic_storage().get().request_response().servers;
let global_config = service.shared_node().config();
let segment_name = data_segment_name(client_id.value());
let data_segment_type = DataSegmentType::new_from_allocation_strategy(
client_factory.config.allocation_strategy,
);
let max_number_of_segments =
DataSegment::<Service>::max_number_of_segments(data_segment_type);
let sample_layout = static_config
.request_message_type_details
.sample_layout(client_factory.config.initial_max_slice_len);
// Create the request data segment matching the configured segment type.
let data_segment = match data_segment_type {
DataSegmentType::Static => DataSegment::<Service>::create_static_segment(
&segment_name,
sample_layout,
global_config,
number_of_requests,
),
DataSegmentType::Dynamic => DataSegment::<Service>::create_dynamic_segment(
&segment_name,
sample_layout,
global_config,
number_of_requests,
client_factory.config.allocation_strategy,
),
};
let data_segment = fail!(from origin,
when data_segment,
with ClientCreateError::UnableToCreateDataSegment,
"{} since the client data segment could not be created.", msg);
// Details that are handed to `add_client_id()` at the very end of this function.
let client_details = ClientDetails {
client_id,
node_id: *service.shared_node().id(),
number_of_requests,
response_buffer_size: static_config.max_response_buffer_size,
max_slice_len: client_factory.config.initial_max_slice_len,
data_segment_type,
max_number_of_segments,
};
// Sender for requests: one connection slot per possible server and a single,
// initially open channel.
let request_sender = Sender {
data_segment,
segment_states: {
// One segment state per potential segment.
let mut v =
alloc::vec::Vec::<SegmentState>::with_capacity(max_number_of_segments as usize);
for _ in 0..max_number_of_segments {
v.push(SegmentState::new(number_of_requests))
}
v
},
sender_port_id: client_id.value(),
shared_node: service.shared_node().clone(),
connections: (0..server_list.capacity())
.map(|_| UnsafeCell::new(None))
.collect(),
receiver_max_buffer_size: static_config.max_active_requests_per_client,
receiver_max_borrowed_samples: static_config.max_active_requests_per_client,
enable_safe_overflow: static_config.enable_safe_overflow_for_requests,
degradation_handler: client_factory.request_degradation_handler,
backpressure_handler: client_factory.backpressure_handler,
number_of_samples: number_of_requests,
max_number_of_segments,
service_state: service.clone(),
tagger: CyclicTagger::new(),
loan_counter: AtomicUsize::new(0),
sender_max_borrowed_samples: static_config.max_loaned_requests,
backpressure_strategy: client_factory.config.backpressure_strategy,
message_type_details: static_config.request_message_type_details,
number_of_channels: 1,
initial_channel_state: CHANNEL_STATE_OPEN,
};
// The connection storage of the receiver is sized for all active connections
// plus the configured buffer of expired (to-be-removed) connections.
let number_of_to_be_removed_connections = service
.shared_node()
.config()
.defaults
.request_response
.client_expired_connection_buffer;
let number_of_active_connections = server_list.capacity();
let number_of_connections =
number_of_to_be_removed_connections + number_of_active_connections;
// Receiver for responses: one channel per request chunk, all initially closed.
let response_receiver = Receiver {
connections: PolymorphicVec::from_fn(
HeapAllocator::global(),
number_of_active_connections,
|_| UnsafeCell::new(None),
)
.expect("Heap allocator provides memory."),
receiver_port_id: client_id.value(),
service_state: service.clone(),
buffer_size: static_config.max_response_buffer_size,
tagger: CyclicTagger::new(),
to_be_removed_connections: Some(UnsafeCell::new(
PolymorphicVec::new(HeapAllocator::global(), number_of_to_be_removed_connections)
.expect("Heap allocator provides memory."),
)),
degradation_handler: client_factory.response_degradation_handler,
message_type_details: static_config.response_message_type_details,
receiver_max_borrowed_samples: static_config
.max_borrowed_responses_per_pending_response,
enable_safe_overflow: static_config.enable_safe_overflow_for_responses,
number_of_channels: number_of_requests,
connection_storage: UnsafeCell::new(SlotMap::new(number_of_connections)),
initial_channel_state: CHANNEL_STATE_CLOSED,
};
let client_shared_state = Service::ArcThreadSafetyPolicy::new(ClientSharedState {
port_tag,
config: client_factory.config,
// Stays `None` until the client was registered at the end of this function.
client_handle: UnsafeCell::new(None),
available_channel_ids: {
// Pre-fill the queue with the channel ids `0..number_of_requests`.
let mut queue = Queue::new(number_of_requests);
for n in 0..number_of_requests {
queue.push(ChannelId::new(n));
}
UnsafeCell::new(queue)
},
request_sender,
response_receiver,
server_list_state: UnsafeCell::new(unsafe { server_list.get_state() }),
active_request_counter: AtomicUsize::new(0),
});
let client_shared_state = match client_shared_state {
Ok(v) => v,
Err(e) => {
fail!(from origin, with ClientCreateError::FailedToDeployThreadsafetyPolicy,
"{msg} since the threadsafety policy could not be instantiated ({e:?}).");
}
};
let new_self = Self {
request_id_counter: AtomicU64::new(0),
client_shared_state,
client_id,
_request_payload: PhantomData,
_request_header: PhantomData,
_response_payload: PhantomData,
_response_header: PhantomData,
};
// Connection failures do not abort the construction; they are only logged.
if let Err(e) = new_self
.client_shared_state
.lock()
.force_update_connections()
{
warn!(from new_self,
"The new Client port is unable to connect to every Server port, caused by {:?}.", e);
}
// NOTE(review): the fence presumably keeps the compiler from moving the setup
// above past the registration below — confirm the intended guarantee.
core::sync::atomic::compiler_fence(Ordering::SeqCst);
// Registration is the last step. When it fails, `new_self` is dropped while
// `client_handle` is still `None`, so `Drop` does not release any handle.
unsafe {
*new_self.client_shared_state.lock().client_handle.get() = match service
.dynamic_storage()
.get()
.request_response()
.add_client_id(client_details)
{
Some(handle) => Some(handle),
None => {
fail!(from origin,
with ClientCreateError::ExceedsMaxSupportedClients,
"{} since it would exceed the maximum support amount of clients of {}.",
msg, service.static_config().request_response().max_clients());
}
}
};
Ok(new_self)
}
/// Returns the unique id of this client.
pub fn id(&self) -> UniqueClientId {
self.client_id
}
/// Returns the request id for the next request and advances the counter.
///
/// The counter is replaced by `(v + 1) % RequestId::max_value()`; the id is
/// built from the value `fetch_update` hands back.
/// NOTE(review): assuming the atomic mirrors `core::sync::atomic`, that is the
/// value *before* the update — confirm.
fn next_request_id(&self) -> RequestId {
RequestId::new(
self.request_id_counter
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
Some((v + 1) % RequestId::max_value())
})
.expect("We return some, therefore the Result always contains a value."),
)
.expect("With modulo RequestId::max_value() when incrementing the request id we ensure that the value is always in bounds")
}
/// Returns the backpressure strategy the request sender was configured with.
pub fn backpressure_strategy(&self) -> BackpressureStrategy {
self.client_shared_state
.lock()
.request_sender
.backpressure_strategy
}
}
impl<
    Service: service::Service,
    RequestPayload: Debug + ZeroCopySend + ?Sized,
    RequestHeader: Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
> UpdateConnections
    for Client<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
{
    /// Locks the shared client state and delegates to
    /// `ClientSharedState::update_connections()`.
    fn update_connections(&self) -> Result<(), ConnectionFailure> {
        let shared_state = self.client_shared_state.lock();
        shared_state.update_connections()
    }
}
impl<
    Service: service::Service,
    RequestPayload: Debug + ZeroCopySend,
    RequestHeader: Default + Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
> Client<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
{
    /// Loans memory for a single request whose payload is left uninitialized.
    ///
    /// The internal request header is filled in, and the user header is
    /// initialized with `RequestHeader::default()`.
    ///
    /// # Errors
    ///
    /// Returns the [`LoanError`] reported by the request sender when no chunk
    /// could be allocated.
    ///
    /// # Panics
    ///
    /// Terminates via `fatal_panic!` when no response channel id is available.
    pub fn loan_uninit(
        &self,
    ) -> Result<
        RequestMutUninit<
            Service,
            MaybeUninit<RequestPayload>,
            RequestHeader,
            ResponsePayload,
            ResponseHeader,
        >,
        LoanError,
    > {
        let shared_state = self.client_shared_state.lock();
        let sender = &shared_state.request_sender;

        // Allocate first: when this fails no channel id has been consumed.
        let chunk = sender.allocate(sender.sample_layout(1))?;

        let free_channel_ids = unsafe { &mut *shared_state.available_channel_ids.get() };
        let Some(channel_id) = free_channel_ids.pop() else {
            fatal_panic!(from self,
                "This should never happen! There are no more available response channels.");
        };

        let header_ptr = chunk
            .header
            .cast::<service::header::request_response::RequestHeader>();
        let user_header_ptr = chunk.user_header.cast::<RequestHeader>();

        let request_header = service::header::request_response::RequestHeader {
            node_id: *sender.shared_node.id(),
            client_id: self.id(),
            channel_id,
            request_id: self.next_request_id(),
            number_of_elements: 1,
        };

        // Initialize both headers in the freshly allocated chunk.
        unsafe {
            header_ptr.write(request_header);
            user_header_ptr.write(RequestHeader::default());
        }

        let ptr = unsafe {
            RawSampleMut::<
                service::header::request_response::RequestHeader,
                RequestHeader,
                MaybeUninit<RequestPayload>,
            >::new_unchecked(header_ptr, user_header_ptr, chunk.payload.cast())
        };

        let request = RequestMut {
            ptr,
            sample_size: chunk.size,
            channel_id,
            offset_to_chunk: chunk.offset,
            client_shared_state: self.client_shared_state.clone(),
            _response_payload: PhantomData,
            _response_header: PhantomData,
            was_sample_sent: AtomicBool::new(false),
        };

        Ok(RequestMutUninit { request })
    }

    /// Loans a request, moves `value` into its payload and sends it.
    ///
    /// # Errors
    ///
    /// Returns a [`RequestSendError`] when either the loan or the send
    /// operation fails.
    pub fn send_copy(
        &self,
        value: RequestPayload,
    ) -> Result<
        PendingResponse<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>,
        RequestSendError,
    > {
        let msg = "Unable to send copy of request";

        let uninit_request = fail!(from self,
                                   when self.loan_uninit(),
                                   "{} since the loan of the request failed.", msg);

        let request = uninit_request.write_payload(value);
        request.send()
    }
}
impl<
    Service: service::Service,
    RequestPayload: Debug + Default + ZeroCopySend,
    RequestHeader: Default + Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
> Client<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
{
    /// Loans a request whose payload is initialized with
    /// `RequestPayload::default()`.
    ///
    /// # Errors
    ///
    /// Returns the [`LoanError`] reported by [`Client::loan_uninit()`].
    pub fn loan(
        &self,
    ) -> Result<
        RequestMut<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>,
        LoanError,
    > {
        let uninit_request = self.loan_uninit()?;
        let request = uninit_request.write_payload(RequestPayload::default());
        Ok(request)
    }
}
impl<
    Service: service::Service,
    RequestPayload: Default + Debug + ZeroCopySend + 'static,
    RequestHeader: Default + Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
> Client<Service, [RequestPayload], RequestHeader, ResponsePayload, ResponseHeader>
{
    /// Loans a slice request with `slice_len` elements, each initialized with
    /// `RequestPayload::default()`.
    ///
    /// # Errors
    ///
    /// Returns the [`LoanError`] reported by [`Client::loan_slice_uninit()`].
    pub fn loan_slice(
        &self,
        slice_len: usize,
    ) -> Result<
        RequestMut<Service, [RequestPayload], RequestHeader, ResponsePayload, ResponseHeader>,
        LoanError,
    > {
        self.loan_slice_uninit(slice_len)
            .map(|uninit_request| uninit_request.write_from_fn(|_| RequestPayload::default()))
    }
}
impl<
    Service: service::Service,
    RequestPayload: Debug + ZeroCopySend + 'static,
    RequestHeader: Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
> Client<Service, [RequestPayload], RequestHeader, ResponsePayload, ResponseHeader>
{
    /// Returns the `initial_max_slice_len` stored in the client configuration.
    pub fn initial_max_slice_len(&self) -> usize {
        let shared_state = self.client_shared_state.lock();
        shared_state.config.initial_max_slice_len
    }
}
impl<
Service: service::Service,
RequestPayload: Debug + ZeroCopySend + 'static,
RequestHeader: Default + Debug + ZeroCopySend,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> Client<Service, [RequestPayload], RequestHeader, ResponsePayload, ResponseHeader>
{
/// Loans a slice request with `slice_len` uninitialized elements.
///
/// In debug builds it is asserted that `RequestPayload` is not
/// `CustomPayloadMarker`.
///
/// # Errors
///
/// * `LoanError::ExceedsMaxLoanSize` - the allocation strategy is
///   `AllocationStrategy::Static` and `slice_len` is larger than
///   `initial_max_slice_len`
/// * any `LoanError` reported by the request sender while allocating
#[allow(clippy::type_complexity)] pub fn loan_slice_uninit(
&self,
slice_len: usize,
) -> Result<
RequestMutUninit<
Service,
[MaybeUninit<RequestPayload>],
RequestHeader,
ResponsePayload,
ResponseHeader,
>,
LoanError,
> {
debug_assert!(TypeId::of::<RequestPayload>() != TypeId::of::<CustomPayloadMarker>());
unsafe { self.loan_slice_uninit_impl(slice_len, slice_len) }
}
/// Shared implementation for loaning uninitialized slice requests.
///
/// `slice_len` determines the allocated sample layout and the
/// `number_of_elements` stored in the request header, while
/// `underlying_number_of_slice_elements` is the length of the raw payload
/// slice that is handed out.
///
/// # Safety
///
/// NOTE(review): the payload slice is built with
/// `underlying_number_of_slice_elements` elements over a chunk allocated for
/// `slice_len` elements; the caller presumably has to guarantee that this
/// slice fits into the allocated payload — confirm.
///
/// # Panics
///
/// Terminates via `fatal_panic!` when no response channel id is available.
#[allow(clippy::type_complexity)] unsafe fn loan_slice_uninit_impl(
&self,
slice_len: usize,
underlying_number_of_slice_elements: usize,
) -> Result<
RequestMutUninit<
Service,
[MaybeUninit<RequestPayload>],
RequestHeader,
ResponsePayload,
ResponseHeader,
>,
LoanError,
> {
let client_shared_state = self.client_shared_state.lock();
let max_slice_len = client_shared_state.config.initial_max_slice_len;
// The slice length is bounded only for the static allocation strategy.
if client_shared_state.config.allocation_strategy == AllocationStrategy::Static
&& max_slice_len < slice_len
{
fail!(from self, with LoanError::ExceedsMaxLoanSize,
"Unable to loan slice with {} elements since it would exceed the max supported slice length of {}.",
slice_len, max_slice_len);
}
let request_layout = client_shared_state.request_sender.sample_layout(slice_len);
// Allocate before taking a channel id: on failure no channel id is consumed.
let chunk = client_shared_state
.request_sender
.allocate(request_layout)?;
let channel_id =
match unsafe { &mut *client_shared_state.available_channel_ids.get() }.pop() {
Some(channel_id) => channel_id,
None => {
fatal_panic!(from self,
"This should never happen! There are no more available response channels.");
}
};
let user_header_ptr: *mut RequestHeader = chunk.user_header.cast();
let header_ptr = chunk.header as *mut header::request_response::RequestHeader;
// Initialize the internal request header ...
unsafe {
header_ptr.write(header::request_response::RequestHeader {
node_id: *client_shared_state.request_sender.shared_node.id(),
client_id: self.id(),
channel_id,
request_id: self.next_request_id(),
number_of_elements: slice_len as _,
})
};
// ... and the user header with its default value.
unsafe { user_header_ptr.write(RequestHeader::default()) };
let ptr = unsafe {
RawSampleMut::<
service::header::request_response::RequestHeader,
RequestHeader,
[MaybeUninit<RequestPayload>],
>::new_unchecked(
header_ptr,
user_header_ptr,
// Length of the payload slice is the *underlying* element count.
core::ptr::slice_from_raw_parts_mut(
chunk.payload.cast(),
underlying_number_of_slice_elements,
),
)
};
Ok(RequestMutUninit {
request: RequestMut {
ptr,
sample_size: chunk.size,
channel_id,
offset_to_chunk: chunk.offset,
client_shared_state: self.client_shared_state.clone(),
_response_payload: PhantomData,
_response_header: PhantomData,
was_sample_sent: AtomicBool::new(false),
},
})
}
}
impl<Service: service::Service>
    Client<
        Service,
        [CustomPayloadMarker],
        CustomHeaderMarker,
        [CustomPayloadMarker],
        CustomHeaderMarker,
    >
{
    /// Loans an uninitialized request for a custom payload consisting of
    /// `slice_len` elements.
    ///
    /// The length of the payload slice handed out is
    /// `payload_size() * slice_len`. In debug builds it is asserted that
    /// `slice_len` is `1` unless the payload type variant is
    /// `TypeVariant::Dynamic`.
    ///
    /// # Safety
    ///
    /// NOTE(review): forwards to the unsafe `loan_slice_uninit_impl()`; the
    /// caller presumably has to ensure that the type details configured for
    /// the service describe the custom payload correctly — confirm.
    #[doc(hidden)]
    #[allow(clippy::type_complexity)]
    pub unsafe fn loan_custom_payload(
        &self,
        slice_len: usize,
    ) -> Result<
        RequestMutUninit<
            Service,
            [MaybeUninit<CustomPayloadMarker>],
            CustomHeaderMarker,
            [CustomPayloadMarker],
            CustomHeaderMarker,
        >,
        LoanError,
    > {
        // The guard is held until the end of the function.
        let shared_state = self.client_shared_state.lock();
        let sender = &shared_state.request_sender;

        debug_assert!(
            slice_len == 1 || sender.payload_type_variant() == TypeVariant::Dynamic
        );

        let underlying_len = sender.payload_size() * slice_len;
        unsafe { self.loan_slice_uninit_impl(slice_len, underlying_len) }
    }
}