use alloc::sync::Arc;
use core::{
fmt::Debug,
marker::PhantomData,
ops::{Deref, DerefMut},
sync::atomic::Ordering,
};
use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
use iceoryx2_bb_log::fail;
use iceoryx2_cal::{
arc_sync_policy::ArcSyncPolicy, shm_allocator::PointerOffset, zero_copy_connection::ChannelId,
};
use iceoryx2_pal_concurrency_sync::iox_atomic::IoxAtomicUsize;
use crate::{
port::{
server::{SharedServerState, INVALID_CONNECTION_ID},
SendError,
},
raw_sample::RawSampleMut,
service,
};
/// A writable response that dereferences to its `ResponsePayload` and can be
/// delivered with `send()`.
///
/// When the value is dropped - which also happens at the end of `send()` - the
/// chunk at `offset_to_chunk` is handed back to the response sender via
/// `return_loaned_sample` and `shared_loan_counter` is decremented by one.
pub struct ResponseMut<
Service: service::Service,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> {
/// Access to the service-level header, the user header and the payload of
/// this response (see `header()`, `user_header()` and `payload()`).
pub(crate) ptr: RawSampleMut<
service::header::request_response::ResponseHeader,
ResponseHeader,
ResponsePayload,
>,
/// Server state wrapped in the service's thread-safety policy; it is locked
/// by `send()` and by the `Drop` implementation.
pub(crate) shared_state: Service::ArcThreadSafetyPolicy<SharedServerState<Service>>,
/// Shared counter that is decremented by one when this response is dropped.
// NOTE(review): presumably tracks the number of outstanding loans and is
// incremented by whoever creates the response - confirm at the creation site.
pub(crate) shared_loan_counter: Arc<IoxAtomicUsize>,
/// Offset handed to the response sender both on delivery and when the chunk
/// is returned on drop.
pub(crate) offset_to_chunk: PointerOffset,
/// Size value forwarded to the response sender on delivery.
pub(crate) sample_size: usize,
/// Channel forwarded to the response sender on delivery.
pub(crate) channel_id: ChannelId,
/// Connection the response is delivered to; when it equals
/// `INVALID_CONNECTION_ID`, `send()` delivers nothing and returns `Ok(())`.
pub(crate) connection_id: usize,
/// Zero-sized marker tying the struct to `ResponsePayload`.
pub(crate) _response_payload: PhantomData<ResponsePayload>,
/// Zero-sized marker tying the struct to `ResponseHeader`.
pub(crate) _response_header: PhantomData<ResponseHeader>,
}
// `ResponseMut` is declared `Send` only when the thread-safety policy that
// wraps the shared server state is itself `Send + Sync`.
//
// SAFETY: NOTE(review): the full soundness argument is not visible in this
// file. Presumably `ptr` refers to a chunk that is exclusively owned by this
// response until it is sent or dropped, and every access to the shared server
// state goes through `shared_state.lock()` - confirm against `RawSampleMut`
// and the `ArcSyncPolicy` implementations.
unsafe impl<
Service: crate::service::Service,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> Send for ResponseMut<Service, ResponsePayload, ResponseHeader>
where
Service::ArcThreadSafetyPolicy<SharedServerState<Service>>: Send + Sync,
{
}
impl<
        Service: crate::service::Service,
        ResponsePayload: Debug + ZeroCopySend + ?Sized,
        ResponseHeader: Debug + ZeroCopySend,
    > Debug for ResponseMut<Service, ResponsePayload, ResponseHeader>
{
    /// Writes the names of the three generic parameters followed by the `ptr`,
    /// `offset_to_chunk`, `sample_size` and `channel_id` members.
    ///
    /// The shared state, the loan counter and the connection id are not part of
    /// the output.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // Resolve everything that is not a plain member access up front so that
        // the argument list below mirrors the format string one-to-one.
        let service_type = core::any::type_name::<Service>();
        let payload_type = core::any::type_name::<ResponsePayload>();
        let user_header_type = core::any::type_name::<ResponseHeader>();
        let channel = self.channel_id.value();

        write!(
            f,
            "ResponseMut<{}, {}, {}> {{ ptr: {:?}, offset_to_chunk: {:?}, sample_size: {}, channel_id: {} }}",
            service_type,
            payload_type,
            user_header_type,
            self.ptr,
            self.offset_to_chunk,
            self.sample_size,
            channel
        )
    }
}
impl<
        Service: crate::service::Service,
        ResponsePayload: Debug + ZeroCopySend + ?Sized,
        ResponseHeader: Debug + ZeroCopySend,
    > Drop for ResponseMut<Service, ResponsePayload, ResponseHeader>
{
    /// Hands the chunk back to the response sender and decrements the shared
    /// loan counter by one.
    fn drop(&mut self) {
        let chunk = self.offset_to_chunk;

        // The lock guard is a temporary of this statement, so the shared state
        // is unlocked again before the counter is touched.
        self.shared_state.lock().response_sender.return_loaned_sample(chunk);

        // The previous counter value is of no interest here.
        let _previous = self.shared_loan_counter.fetch_sub(1, Ordering::Relaxed);
    }
}
// Lets a `ResponseMut` be read as if it were a `&ResponsePayload`.
impl<
Service: crate::service::Service,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> Deref for ResponseMut<Service, ResponsePayload, ResponseHeader>
{
/// The payload type the response dereferences to.
type Target = ResponsePayload;
/// Returns a shared reference to the payload; performs the same call as
/// `payload()`.
fn deref(&self) -> &Self::Target {
self.ptr.as_payload_ref()
}
}
// Lets a `ResponseMut` be written as if it were a `&mut ResponsePayload`.
impl<
Service: crate::service::Service,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> DerefMut for ResponseMut<Service, ResponsePayload, ResponseHeader>
{
/// Returns an exclusive reference to the payload; performs the same call as
/// `payload_mut()`.
fn deref_mut(&mut self) -> &mut Self::Target {
self.ptr.as_payload_mut()
}
}
impl<
        Service: crate::service::Service,
        ResponsePayload: Debug + ZeroCopySend + ?Sized,
        ResponseHeader: Debug + ZeroCopySend,
    > ResponseMut<Service, ResponsePayload, ResponseHeader>
{
    /// Returns the service-level header of the response.
    pub fn header(&self) -> &service::header::request_response::ResponseHeader {
        self.ptr.as_header_ref()
    }

    /// Returns a shared reference to the user-defined header.
    pub fn user_header(&self) -> &ResponseHeader {
        self.ptr.as_user_header_ref()
    }

    /// Returns an exclusive reference to the user-defined header.
    pub fn user_header_mut(&mut self) -> &mut ResponseHeader {
        self.ptr.as_user_header_mut()
    }

    /// Returns a shared reference to the payload.
    pub fn payload(&self) -> &ResponsePayload {
        self.ptr.as_payload_ref()
    }

    /// Returns an exclusive reference to the payload.
    pub fn payload_mut(&mut self) -> &mut ResponsePayload {
        self.ptr.as_payload_mut()
    }

    /// Consumes the response and delivers it to its connection.
    ///
    /// The shared server state is locked and its connections are updated
    /// first. If `connection_id` equals `INVALID_CONNECTION_ID` nothing is
    /// delivered and the call still succeeds.
    ///
    /// Since `self` is taken by value, the response is dropped when this
    /// method returns - on success as well as on failure.
    ///
    /// # Errors
    ///
    /// Returns a [`SendError`] when the connections cannot be updated or when
    /// the response sender fails to deliver the chunk.
    pub fn send(self) -> Result<(), SendError> {
        let msg = "Unable to send response";
        let shared_state = self.shared_state.lock();

        fail!(from self, when shared_state.update_connections(),
            "{} since the connections could not be updated.", msg);

        // Without a valid connection there is no receiver to deliver to.
        if self.connection_id == INVALID_CONNECTION_ID {
            return Ok(());
        }

        shared_state.response_sender.deliver_offset_to_connection(
            self.offset_to_chunk,
            self.sample_size,
            self.channel_id,
            self.connection_id,
        )?;

        Ok(())
    }
}