use core::{fmt::Debug, marker::PhantomData};
use core::{
ops::{Deref, DerefMut},
sync::atomic::Ordering,
};
use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
use iceoryx2_bb_log::fatal_panic;
use iceoryx2_cal::arc_sync_policy::ArcSyncPolicy;
use iceoryx2_cal::zero_copy_connection::ChannelId;
use iceoryx2_cal::shm_allocator::PointerOffset;
use iceoryx2_pal_concurrency_sync::iox_atomic::IoxAtomicBool;
use crate::{
pending_response::PendingResponse,
port::client::{ClientSharedState, RequestSendError},
raw_sample::RawSampleMut,
service,
};
/// A request whose payload and user header can still be modified before it is sent.
///
/// The value bundles the raw sample pointer with the bookkeeping that the rest of
/// this file needs: `send` forwards the offset, size and channel id to the shared
/// client state, and the `Drop` implementation gives the resources back.
///
/// `ResponsePayload` and `ResponseHeader` are carried only as [`PhantomData`]
/// markers; no value of those types is stored.
pub struct RequestMut<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
where
    Service: crate::service::Service,
    RequestPayload: Debug + ZeroCopySend + ?Sized,
    RequestHeader: Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
{
    /// Raw sample pointer through which the system header, the user header and
    /// the payload are accessed.
    pub(crate) ptr: RawSampleMut<
        service::header::request_response::RequestHeader,
        RequestHeader,
        RequestPayload,
    >,
    /// Size value handed to `send_request` when the request is sent.
    // NOTE(review): unit (bytes vs. elements) is not visible here; confirm at the loan site.
    pub(crate) sample_size: usize,
    /// Offset handed to `send_request` on send and to `release_sample` on drop.
    pub(crate) offset_to_chunk: PointerOffset,
    /// Shared client state; it is locked before every access in this file.
    pub(crate) client_shared_state: Service::ArcThreadSafetyPolicy<ClientSharedState<Service>>,
    /// Becomes `true` once `send` succeeded; `drop` reads it to decide whether
    /// the loan counter still has to be decremented.
    pub(crate) was_sample_sent: IoxAtomicBool,
    /// Channel id handed to `send_request` when the request is sent.
    pub(crate) channel_id: ChannelId,
    /// Marker tying the request to its response payload type.
    pub(crate) _response_payload: PhantomData<ResponsePayload>,
    /// Marker tying the request to its response user-header type.
    pub(crate) _response_header: PhantomData<ResponseHeader>,
}
// Manual `Send` implementation, available only when the shared-state handle
// selected by the service is itself `Send + Sync`.
//
// SAFETY (as far as visible in this file): besides the shared-state handle the
// struct holds a raw sample pointer, plain values, an atomic flag and
// `PhantomData` markers.
// NOTE(review): presumably `RawSampleMut` is what suppresses the automatic
// `Send`; confirm that the memory it points to may be accessed from the thread
// the request is moved to.
unsafe impl<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader> Send
    for RequestMut<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
where
    Service: crate::service::Service,
    RequestPayload: Debug + ZeroCopySend + ?Sized,
    RequestHeader: Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
    Service::ArcThreadSafetyPolicy<ClientSharedState<Service>>: Send + Sync,
{
}
impl<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader> Drop
    for RequestMut<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
where
    Service: crate::service::Service,
    RequestPayload: Debug + ZeroCopySend + ?Sized,
    RequestHeader: Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
{
    /// Gives the resources of this request back to the shared client state.
    ///
    /// While holding the shared-state lock it, in this order:
    /// 1. pushes the channel id stored in the system header back into
    ///    `available_channel_ids` (a failed push is treated as fatal),
    /// 2. releases the sample at `offset_to_chunk`,
    /// 3. decrements the loan counter, but only if the request was never sent;
    ///    `send` already decrements it on success.
    fn drop(&mut self) {
        let shared = self.client_shared_state.lock();
        let channel_id = self.header().channel_id;

        // SAFETY: NOTE(review): the exclusive reference is created while the
        // `client_shared_state` guard is alive; presumably that guard is what
        // serializes access to `available_channel_ids` - confirm.
        let available_ids = unsafe { &mut *shared.available_channel_ids.get() };
        let was_returned = available_ids.push(channel_id);
        if !was_returned {
            fatal_panic!(from self,
                "This should never happen! The channel id could not be returned.");
        }

        let sender = &shared.request_sender;
        sender.release_sample(self.offset_to_chunk);

        // A request that was sent had its loan accounted for in `send`.
        let never_sent = !self.was_sample_sent.load(Ordering::Relaxed);
        if never_sent {
            sender.loan_counter.fetch_sub(1, Ordering::Relaxed);
        }
    }
}
impl<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader> Debug
    for RequestMut<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
where
    Service: crate::service::Service,
    RequestPayload: Debug + ZeroCopySend + ?Sized,
    RequestHeader: Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
{
    /// Writes the five generic type names followed by the pointer, sample size,
    /// chunk offset, sent flag and numeric channel id.
    ///
    /// The shared client state and the marker fields are not part of the output.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // Snapshot the non-trivial values first so the argument list stays flat.
        let was_sample_sent = self.was_sample_sent.load(Ordering::Relaxed);
        let channel_id = self.channel_id.value();

        f.write_fmt(format_args!(
            "RequestMut<{}, {}, {}, {}, {}> {{ ptr: {:?}, sample_size: {}, offset_to_chunk: {:?}, was_sample_sent: {}, channel_id: {} }}",
            core::any::type_name::<Service>(),
            core::any::type_name::<RequestPayload>(),
            core::any::type_name::<RequestHeader>(),
            core::any::type_name::<ResponsePayload>(),
            core::any::type_name::<ResponseHeader>(),
            self.ptr,
            self.sample_size,
            self.offset_to_chunk,
            was_sample_sent,
            channel_id
        ))
    }
}
/// Lets a `RequestMut` be read as if it were its request payload.
impl<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader> Deref
    for RequestMut<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
where
    Service: crate::service::Service,
    RequestPayload: Debug + ZeroCopySend + ?Sized,
    RequestHeader: Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
{
    type Target = RequestPayload;

    /// Returns a shared reference to the payload behind the sample pointer.
    fn deref(&self) -> &RequestPayload {
        let payload: &RequestPayload = self.ptr.as_payload_ref();
        payload
    }
}
/// Lets a `RequestMut` be written to as if it were its request payload.
impl<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader> DerefMut
    for RequestMut<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
where
    Service: crate::service::Service,
    RequestPayload: Debug + ZeroCopySend + ?Sized,
    RequestHeader: Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
{
    /// Returns an exclusive reference to the payload behind the sample pointer.
    fn deref_mut(&mut self) -> &mut RequestPayload {
        let payload: &mut RequestPayload = self.ptr.as_payload_mut();
        payload
    }
}
impl<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
    RequestMut<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
where
    Service: crate::service::Service,
    RequestPayload: Debug + ZeroCopySend + ?Sized,
    RequestHeader: Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
{
    /// Returns the system-level request header stored in the sample.
    pub fn header(&self) -> &service::header::request_response::RequestHeader {
        self.ptr.as_header_ref()
    }

    /// Returns a shared reference to the user-defined request header.
    pub fn user_header(&self) -> &RequestHeader {
        self.ptr.as_user_header_ref()
    }

    /// Returns an exclusive reference to the user-defined request header.
    pub fn user_header_mut(&mut self) -> &mut RequestHeader {
        self.ptr.as_user_header_mut()
    }

    /// Returns a shared reference to the request payload.
    pub fn payload(&self) -> &RequestPayload {
        self.ptr.as_payload_ref()
    }

    /// Returns an exclusive reference to the request payload.
    pub fn payload_mut(&mut self) -> &mut RequestPayload {
        self.ptr.as_payload_mut()
    }

    /// Sends the request and, on success, turns it into a [`PendingResponse`].
    ///
    /// The request is consumed either way. On success it is stored inside the
    /// returned [`PendingResponse`] together with the connection count that
    /// `send_request` reported. On failure it is dropped when this function
    /// returns, so the `Drop` implementation of `RequestMut` runs.
    ///
    /// # Errors
    ///
    /// Returns the [`RequestSendError`] produced by `send_request` on the shared
    /// client state, unchanged.
    pub fn send(
        self,
    ) -> Result<
        PendingResponse<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>,
        RequestSendError,
    > {
        let shared = self.client_shared_state.lock();

        let number_of_server_connections = shared.send_request(
            self.offset_to_chunk,
            self.sample_size,
            self.channel_id,
            self.header().request_id,
        )?;

        // From here on the request counts as sent: record that for `drop` and
        // take it out of the loan accounting right away.
        self.was_sample_sent.store(true, Ordering::Relaxed);
        shared
            .request_sender
            .loan_counter
            .fetch_sub(1, Ordering::Relaxed);

        // The guard borrows from `self`, so it has to go before `self` is moved.
        drop(shared);

        Ok(PendingResponse {
            number_of_server_connections,
            request: self,
            _service: PhantomData,
            _response_payload: PhantomData,
            _response_header: PhantomData,
        })
    }
}