use crate::port::update_connections::UpdateConnections;
use crate::prelude::BackpressureStrategy;
use crate::service::builder::CustomPayloadMarker;
use crate::service::naming_scheme::data_segment_name;
use crate::service::port_factory::server::LocalServerConfig;
use crate::service::{NoResource, SharedServiceState};
use crate::{
active_request::ActiveRequest,
prelude::PortFactory,
raw_sample::RawSample,
service::{
self,
dynamic_config::request_response::{ClientDetails, ServerDetails},
port_factory::server::{PortFactoryServer, ServerCreateError},
},
};
use alloc::sync::Arc;
use core::ptr::NonNull;
use core::{fmt::Debug, marker::PhantomData};
use iceoryx2_bb_concurrency::atomic::AtomicUsize;
use iceoryx2_bb_concurrency::atomic::Ordering;
use iceoryx2_bb_concurrency::cell::UnsafeCell;
use iceoryx2_bb_container::slotmap::SlotMap;
use iceoryx2_bb_container::vector::polymorphic_vec::*;
use iceoryx2_bb_elementary::{CallbackProgression, cyclic_tagger::CyclicTagger};
use iceoryx2_bb_elementary_traits::non_null::NonNullCompat;
use iceoryx2_bb_elementary_traits::testing::abandonable::Abandonable;
use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
use iceoryx2_bb_lock_free::mpmc::container::{ContainerHandle, ContainerState};
use iceoryx2_bb_memory::heap_allocator::HeapAllocator;
use iceoryx2_bb_posix::unique_system_id::UniqueSystemId;
use iceoryx2_cal::arc_sync_policy::ArcSyncPolicy;
use iceoryx2_cal::dynamic_storage::DynamicStorage;
use iceoryx2_cal::zero_copy_connection::{CHANNEL_STATE_CLOSED, CHANNEL_STATE_OPEN, ChannelId};
use iceoryx2_log::{fail, warn};
use super::details::data_segment::DataSegment;
use super::details::segment_state::SegmentState;
use super::details::sender::{ReceiverDetails, Sender};
use super::{
ReceiveError,
details::{
chunk::Chunk,
chunk_details::ChunkDetails,
data_segment::DataSegmentType,
receiver::{Receiver, SenderDetails},
},
update_connections::ConnectionFailure,
};
use crate::identifiers::UniqueServerId;
const REQUEST_CHANNEL_ID: ChannelId = ChannelId::new(0);
pub(crate) const INVALID_CONNECTION_ID: usize = usize::MAX;
/// State of a server port that is shared between the [`Server`] and every
/// `ActiveRequest` created from it (each active request stores a clone of the
/// thread-safety-policy handle that wraps this struct).
#[derive(Debug)]
pub(crate) struct SharedServerState<Service: service::Service> {
// Local configuration the server port was created with.
pub(crate) config: LocalServerConfig,
// Sends responses back to the connected clients.
pub(crate) response_sender: Sender<Service>,
// Handle returned when the server was registered in the dynamic storage; `None`
// until registration succeeded. Used on drop to release the registration.
server_handle: UnsafeCell<Option<ContainerHandle>>,
// Receives the requests of the connected clients.
pub(crate) request_receiver: Receiver<Service>,
// Locally cached state of the service's client list, refreshed in
// `update_connections()`.
client_list_state: UnsafeCell<ContainerState<ClientDetails>>,
service_state: SharedServiceState<Service, NoResource>,
// Tag created via the node's `create_port_tag()`; according to the creation error
// message it is required for cleanup.
port_tag: Service::StaticStorage,
}
impl<Service: service::Service> Abandonable for SharedServerState<Service> {
    /// Abandons the sub-resources of the shared server state in place, in this order:
    /// response sender, request receiver, service state, port tag.
    ///
    /// # Safety
    ///
    /// NOTE(review): the exact contract is defined by [`Abandonable`]. This
    /// implementation dereferences `this` mutably, so it presumably has to point to a
    /// valid `SharedServerState` that is not accessed otherwise - confirm against the
    /// trait documentation.
    unsafe fn abandon_in_place(mut this: NonNull<Self>) {
        let state = unsafe { this.as_mut() };

        unsafe {
            Sender::abandon_in_place(NonNull::iox2_from_mut(&mut state.response_sender));
            Receiver::abandon_in_place(NonNull::iox2_from_mut(&mut state.request_receiver));
            SharedServiceState::abandon_in_place(NonNull::iox2_from_mut(
                &mut state.service_state,
            ));
            Service::StaticStorage::abandon_in_place(NonNull::iox2_from_mut(&mut state.port_tag));
        }
    }
}
impl<Service: service::Service> Drop for SharedServerState<Service> {
    /// Releases the server's registration in the dynamic storage, if it was ever
    /// registered.
    fn drop(&mut self) {
        // The handle is only `Some` after the server was successfully added to the
        // dynamic storage; without it there is nothing to release.
        let registered_handle = unsafe { *self.server_handle.get() };
        let Some(handle) = registered_handle else {
            return;
        };

        self.service_state
            .dynamic_storage()
            .get()
            .request_response()
            .release_server_handle(handle);
    }
}
impl<Service: service::Service> SharedServerState<Service> {
    /// Refreshes the cached client list and, when `update_state()` reports `true`
    /// (presumably: the list changed - confirm), re-establishes all connections.
    ///
    /// # Errors
    ///
    /// Returns the [`ConnectionFailure`] reported by the forced connection update.
    pub(crate) fn update_connections(&self) -> Result<(), ConnectionFailure> {
        let requires_reconnect = unsafe {
            let cached_clients = &mut *self.client_list_state.get();
            self.request_receiver
                .service_state
                .dynamic_storage()
                .get()
                .request_response()
                .clients
                .update_state(cached_clients)
        };

        if requires_reconnect {
            fail!(from self,
                  when self.force_update_connections(),
                  "Connections were updated only partially since at least one connection to a client failed.");
        }

        Ok(())
    }

    /// Walks over every client in the cached client list and updates the request
    /// (receiving) and response (sending) connection for it.
    ///
    /// A failure for one client does not stop the iteration; the first failure that
    /// occurred is returned after all clients were visited.
    fn force_update_connections(&self) -> Result<(), ConnectionFailure> {
        let receiver = &self.request_receiver;
        let sender = &self.response_sender;

        receiver.start_update_connection_cycle();
        sender.start_update_connection_cycle();

        let mut outcome = Ok(());
        unsafe {
            (*self.client_list_state.get()).for_each(|index, client| {
                let request_source = SenderDetails {
                    port_id: client.client_id.value(),
                    number_of_samples: client.number_of_requests,
                    max_number_of_segments: client.max_number_of_segments,
                    data_segment_type: client.data_segment_type,
                };
                outcome = outcome.and(receiver.update_connection(index, request_source));

                let response_target = ReceiverDetails {
                    port_id: client.client_id.value(),
                    buffer_size: client.response_buffer_size,
                };
                if let Err(err) = sender.update_connection(index, response_target, |_| {}) {
                    outcome = outcome.and(Err(err.into()));
                }

                // Always visit the remaining clients, even after a failure.
                CallbackProgression::Continue
            })
        };

        sender.finish_update_connection_cycle();
        receiver.finish_update_connection_cycle();

        outcome
    }
}
/// Server port of a request-response service. It receives requests from clients
/// (see the `receive` methods) and hands them out as `ActiveRequest`s.
///
/// The four payload/header type parameters are only tracked via `PhantomData`;
/// the actual state lives in the shared [`SharedServerState`].
#[derive(Debug)]
pub struct Server<
Service: service::Service,
RequestPayload: Debug + ZeroCopySend + ?Sized,
RequestHeader: Debug + ZeroCopySend,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> {
// Shared with every `ActiveRequest` created by this server.
shared_state: Service::ArcThreadSafetyPolicy<SharedServerState<Service>>,
// Copied into every `ActiveRequest` as its maximum loan count.
max_loaned_responses_per_request: usize,
// Cached `enable_fire_and_forget_requests` setting of the service's static config.
enable_fire_and_forget: bool,
_request_payload: PhantomData<RequestPayload>,
_request_header: PhantomData<RequestHeader>,
_response_payload: PhantomData<ResponsePayload>,
_response_header: PhantomData<ResponseHeader>,
}
impl<
    Service: service::Service,
    RequestPayload: Debug + ZeroCopySend + ?Sized,
    RequestHeader: Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
> Abandonable for Server<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
{
    /// Abandons the server by abandoning its shared state handle in place. All other
    /// members are plain values and need no treatment.
    ///
    /// # Safety
    ///
    /// NOTE(review): the exact contract is defined by [`Abandonable`]. `this` is
    /// dereferenced mutably, so it presumably has to point to a valid `Server` that is
    /// not accessed otherwise - confirm against the trait documentation.
    unsafe fn abandon_in_place(mut this: NonNull<Self>) {
        let server = unsafe { this.as_mut() };

        unsafe {
            let shared_state = NonNull::iox2_from_mut(&mut server.shared_state);
            Service::ArcThreadSafetyPolicy::abandon_in_place(shared_state);
        }
    }
}
// `Server` is `Send` only when the thread-safety policy that wraps the shared state
// is itself `Send + Sync`.
// NOTE(review): the manual impl is presumably needed because the `UnsafeCell`
// members of the shared state suppress the auto trait - confirm.
unsafe impl<
Service: service::Service,
RequestPayload: Debug + ZeroCopySend + ?Sized,
RequestHeader: Debug + ZeroCopySend,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> Send for Server<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
where
Service::ArcThreadSafetyPolicy<SharedServerState<Service>>: Send + Sync,
{
}
// `Server` is `Sync` only when the thread-safety policy that wraps the shared state
// is itself `Send + Sync`; all access to the shared state goes through that
// policy's `lock()`.
unsafe impl<
Service: service::Service,
RequestPayload: Debug + ZeroCopySend + ?Sized,
RequestHeader: Debug + ZeroCopySend,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> Sync for Server<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
where
Service::ArcThreadSafetyPolicy<SharedServerState<Service>>: Send + Sync,
{
}
impl<
    Service: service::Service,
    RequestPayload: Debug + ZeroCopySend + ?Sized,
    RequestHeader: Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
> UpdateConnections
    for Server<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
{
    /// Locks the shared state and delegates to
    /// [`SharedServerState::update_connections()`].
    fn update_connections(&self) -> Result<(), ConnectionFailure> {
        let shared_state = self.shared_state.lock();
        shared_state.update_connections()
    }
}
impl<
Service: service::Service,
RequestPayload: Debug + ZeroCopySend + ?Sized,
RequestHeader: Debug + ZeroCopySend,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> Server<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
{
/// Creates a new server port from the settings collected in `server_factory`.
///
/// The steps are, in order: create the port tag, set up the request receiver,
/// create the response data segment and the response sender, wrap everything in the
/// service's thread-safety policy, connect to the currently known clients and
/// finally register the server in the service's dynamic storage.
///
/// # Errors
///
/// * [`ServerCreateError::UnableToCreatePortTag`] - the port tag could not be created
/// * [`ServerCreateError::UnableToCreateDataSegment`] - the response data segment
///   could not be created
/// * [`ServerCreateError::FailedToDeployThreadsafetyPolicy`] - the thread-safety
///   policy could not be instantiated
/// * [`ServerCreateError::ExceedsMaxSupportedServers`] - the dynamic storage has no
///   free slot for another server
///
/// A failure to connect to some of the clients is only logged as a warning.
pub(crate) fn new(
server_factory: PortFactoryServer<
Service,
RequestPayload,
RequestHeader,
ResponsePayload,
ResponseHeader,
>,
) -> Result<Self, ServerCreateError> {
let msg = "Failed to create Server port";
let origin = "Server::new()";
let server_id = UniqueServerId::new();
let service = &server_factory.factory.service;
// The port tag is created first; per the error message below it is required for
// cleanup.
let port_tag = match service
.shared_node()
.create_port_tag(origin, msg, server_id.0.value())
{
Ok(port_tag) => port_tag,
Err(e) => {
fail!(from origin, with ServerCreateError::UnableToCreatePortTag,
"{msg} since the port tag, that is required for cleanup, could not be created. [{e:?}]");
}
};
let static_config = server_factory.factory.static_config();
// Chunk counts of the client (request) and server (response) data segments as
// computed by the static request-response config.
let number_of_requests_per_client =
unsafe { service.static_config().messaging_pattern.request_response() }
.required_amount_of_chunks_per_client_data_segment(
static_config.max_loaned_requests,
);
let number_of_responses =
unsafe { service.static_config().messaging_pattern.request_response() }
.required_amount_of_chunks_per_server_data_segment(
server_factory.config.max_loaned_responses_per_request,
);
// The factory may replace the computed number of preallocated responses.
let number_of_responses = server_factory
.preallocated_number_of_responses_override
.call(number_of_responses);
let client_list = &service.dynamic_storage().get().request_response().clients;
let number_of_to_be_removed_connections = service
.shared_node()
.config()
.defaults
.request_response
.server_expired_connection_buffer;
let number_of_active_connections = client_list.capacity();
// The connection storage has room for all active connections plus the
// configured buffer of expired connections.
let number_of_connections =
number_of_to_be_removed_connections + number_of_active_connections;
let request_receiver = Receiver {
connections: PolymorphicVec::from_fn(
HeapAllocator::global(),
number_of_active_connections,
|_| UnsafeCell::new(None),
)
.expect("Heap allocator provides memory."),
receiver_port_id: server_id.value(),
service_state: service.clone(),
message_type_details: static_config.request_message_type_details,
receiver_max_borrowed_samples: static_config.max_active_requests_per_client,
enable_safe_overflow: static_config.enable_safe_overflow_for_requests,
buffer_size: static_config.max_active_requests_per_client,
tagger: CyclicTagger::new(),
// The list of to-be-removed connections is only allocated when fire-and-forget
// requests are enabled.
to_be_removed_connections: if static_config.enable_fire_and_forget_requests {
Some(UnsafeCell::new(
PolymorphicVec::new(
HeapAllocator::global(),
number_of_to_be_removed_connections,
)
.expect("Heap allocator provides memory."),
))
} else {
None
},
degradation_handler: server_factory.request_degradation_handler,
number_of_channels: 1,
connection_storage: UnsafeCell::new(SlotMap::new(number_of_connections)),
initial_channel_state: CHANNEL_STATE_OPEN,
};
let global_config = service.shared_node().config();
// The allocation strategy decides whether the response data segment is static
// or dynamic.
let data_segment_type = DataSegmentType::new_from_allocation_strategy(
server_factory.config.allocation_strategy,
);
let segment_name = data_segment_name(server_id.value());
let max_number_of_segments =
DataSegment::<Service>::max_number_of_segments(data_segment_type);
let sample_layout = static_config
.response_message_type_details
.sample_layout(server_factory.config.initial_max_slice_len);
let data_segment = match data_segment_type {
DataSegmentType::Static => DataSegment::<Service>::create_static_segment(
&segment_name,
sample_layout,
global_config,
number_of_responses,
),
DataSegmentType::Dynamic => DataSegment::<Service>::create_dynamic_segment(
&segment_name,
sample_layout,
global_config,
number_of_responses,
server_factory.config.allocation_strategy,
),
};
let data_segment = fail!(from origin,
when data_segment,
with ServerCreateError::UnableToCreateDataSegment,
"{} since the server data segment could not be created.", msg);
let response_sender = Sender {
// One segment state per possible segment, each sized for `number_of_responses`.
segment_states: {
let mut v =
alloc::vec::Vec::<SegmentState>::with_capacity(max_number_of_segments as usize);
for _ in 0..max_number_of_segments {
v.push(SegmentState::new(number_of_responses))
}
v
},
data_segment,
connections: (0..client_list.capacity())
.map(|_| UnsafeCell::new(None))
.collect(),
sender_port_id: server_id.value(),
shared_node: service.shared_node().clone(),
receiver_max_buffer_size: static_config.max_response_buffer_size,
receiver_max_borrowed_samples: static_config
.max_borrowed_responses_per_pending_response,
sender_max_borrowed_samples: server_factory.config.max_loaned_responses_per_request
* static_config.max_active_requests_per_client
* static_config.max_clients,
enable_safe_overflow: static_config.enable_safe_overflow_for_responses,
number_of_samples: number_of_responses,
max_number_of_segments,
degradation_handler: server_factory.response_degradation_handler,
backpressure_handler: server_factory.backpressure_handler,
service_state: service.clone(),
tagger: CyclicTagger::new(),
loan_counter: AtomicUsize::new(0),
backpressure_strategy: server_factory.config.backpressure_strategy,
message_type_details: static_config.response_message_type_details,
// The response sender uses as many channels as a client data segment has chunks
// and starts with closed channels, while the request receiver above uses a
// single open channel.
number_of_channels: number_of_requests_per_client,
initial_channel_state: CHANNEL_STATE_CLOSED,
};
let shared_state = Service::ArcThreadSafetyPolicy::new(SharedServerState {
port_tag,
config: server_factory.config,
request_receiver,
client_list_state: UnsafeCell::new(unsafe { client_list.get_state() }),
// Stays `None` until the server was registered in the dynamic storage below.
server_handle: UnsafeCell::new(None),
service_state: service.clone(),
response_sender,
});
let shared_state = match shared_state {
Ok(v) => v,
Err(e) => {
fail!(from origin, with ServerCreateError::FailedToDeployThreadsafetyPolicy,
"{msg} since the threadsafety policy could not be instantiated ({e:?}).");
}
};
let new_self = Self {
max_loaned_responses_per_request: server_factory
.config
.max_loaned_responses_per_request,
enable_fire_and_forget: service
.static_config()
.request_response()
.enable_fire_and_forget_requests,
shared_state,
_request_payload: PhantomData,
_request_header: PhantomData,
_response_payload: PhantomData,
_response_header: PhantomData,
};
// Connect to all clients that are already known. A partial failure is not fatal
// for the construction and is only reported as a warning.
if let Err(e) = new_self.shared_state.lock().force_update_connections() {
warn!(from new_self, "The new server is unable to connect to every client, caused by {:?}.", e);
}
// NOTE(review): presumably prevents the compiler from moving the connection
// setup behind the registration of the server below - confirm.
core::sync::atomic::compiler_fence(Ordering::SeqCst);
// Register the server in the dynamic storage and remember the handle so that it
// can be released when the shared state is dropped.
unsafe {
*new_self.shared_state.lock().server_handle.get() = match service
.dynamic_storage()
.get()
.request_response()
.add_server_id(ServerDetails {
server_id,
node_id: *service.shared_node().id(),
request_buffer_size: static_config.max_active_requests_per_client,
number_of_responses,
max_slice_len: server_factory.config.initial_max_slice_len,
data_segment_type,
max_number_of_segments,
}) {
Some(v) => Some(v),
None => {
fail!(from origin,
with ServerCreateError::ExceedsMaxSupportedServers,
"{} since it would exceed the maximum supported amount of servers of {}.",
msg, service.static_config().request_response().max_servers());
}
}
};
Ok(new_self)
}
/// Returns the [`UniqueServerId`] of the server, reconstructed from the port id
/// stored in the request receiver.
pub fn id(&self) -> UniqueServerId {
    let port_id = self.shared_state.lock().request_receiver.receiver_port_id;
    UniqueServerId(UniqueSystemId::from(port_id))
}
/// Returns `true` when at least one request is available for the server.
///
/// The connections are updated first. With fire-and-forget requests enabled every
/// connection known to the receiver is queried (`has_samples`), otherwise only
/// `has_samples_in_active_connection` is consulted.
///
/// # Errors
///
/// Returns a [`ConnectionFailure`] when the connections could not be updated.
pub fn has_requests(&self) -> Result<bool, ConnectionFailure> {
    let shared_state = self.shared_state.lock();
    fail!(from self, when shared_state.update_connections(),
        "Some requests are not being received since not all connections to clients could be established.");

    let receiver = &shared_state.request_receiver;
    let has_pending_requests = if self.enable_fire_and_forget {
        receiver.has_samples(REQUEST_CHANNEL_ID)
    } else {
        receiver.has_samples_in_active_connection(REQUEST_CHANNEL_ID)
    };

    Ok(has_pending_requests)
}
/// Returns the [`BackpressureStrategy`] the response sender was configured with.
pub fn backpressure_strategy(&self) -> BackpressureStrategy {
    let shared_state = self.shared_state.lock();
    shared_state.response_sender.backpressure_strategy
}
/// Updates the connections and takes the next raw request chunk, if any, from the
/// request channel.
///
/// # Errors
///
/// Returns [`ReceiveError::ConnectionFailure`] when the connections could not be
/// updated, otherwise whatever the underlying receiver reports.
fn receive_impl(&self) -> Result<Option<(ChunkDetails, Chunk)>, ReceiveError> {
    let shared_state = self.shared_state.lock();

    match shared_state.update_connections() {
        Ok(()) => shared_state.request_receiver.receive(REQUEST_CHANNEL_ID),
        Err(e) => {
            fail!(from self,
                with ReceiveError::ConnectionFailure(e),
                "Some requests are not being received since not all connections to the clients could be established.");
        }
    }
}
}
impl<
    Service: service::Service,
    RequestPayload: Debug + ZeroCopySend,
    RequestHeader: Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend + ?Sized,
    ResponseHeader: Debug + ZeroCopySend,
> Server<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
{
    /// Wraps a received chunk into an `ActiveRequest` that is bound to the response
    /// connection `connection_id`.
    fn create_active_request(
        &self,
        details: ChunkDetails,
        chunk: Chunk,
        connection_id: usize,
    ) -> ActiveRequest<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>
    {
        // The chunk header is interpreted as the request-response `RequestHeader`.
        let request_header =
            unsafe { &*(chunk.header as *const service::header::request_response::RequestHeader) };
        let request_id = request_header.request_id;
        let channel_id = request_header.channel_id;

        let ptr = unsafe {
            RawSample::new_unchecked(
                chunk.header.cast(),
                chunk.user_header.cast(),
                chunk.payload.cast::<RequestPayload>(),
            )
        };

        ActiveRequest {
            details,
            // Every active request starts with its own loan counter at zero.
            shared_loan_counter: Arc::new(AtomicUsize::new(0)),
            max_loan_count: self.max_loaned_responses_per_request,
            request_id,
            channel_id,
            connection_id,
            shared_state: self.shared_state.clone(),
            ptr,
            _response_payload: PhantomData,
            _response_header: PhantomData,
        }
    }

    /// Receives the next request and returns it as an `ActiveRequest`, or `None`
    /// when no request is available.
    ///
    /// Requests are skipped when no response connection to their client exists or,
    /// with fire-and-forget requests disabled, when the created request is no longer
    /// connected. With fire-and-forget requests enabled, a request without a response
    /// connection is still delivered and carries [`INVALID_CONNECTION_ID`].
    ///
    /// # Errors
    ///
    /// Forwards the [`ReceiveError`] of the underlying receive operation.
    #[allow(clippy::type_complexity)]
    pub fn receive(
        &self,
    ) -> Result<
        Option<
            ActiveRequest<Service, RequestPayload, RequestHeader, ResponsePayload, ResponseHeader>,
        >,
        ReceiveError,
    > {
        while let Some((details, chunk)) = self.receive_impl()? {
            let request_header = unsafe {
                &*(chunk.header as *const service::header::request_response::RequestHeader)
            };

            if let Some(connection_id) = self
                .shared_state
                .lock()
                .response_sender
                .get_connection_id_of(request_header.client_id.value())
            {
                let request = self.create_active_request(details, chunk, connection_id);
                if self.enable_fire_and_forget || request.is_connected() {
                    return Ok(Some(request));
                }
                // The connection is already gone; try the next request.
            } else if self.enable_fire_and_forget {
                let request = self.create_active_request(details, chunk, INVALID_CONNECTION_ID);
                return Ok(Some(request));
            }
        }

        Ok(None)
    }
}
impl<
Service: service::Service,
RequestPayload: Debug + ZeroCopySend,
RequestHeader: Debug + ZeroCopySend,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> Server<Service, [RequestPayload], RequestHeader, ResponsePayload, ResponseHeader>
{
fn create_active_request(
&self,
details: ChunkDetails,
chunk: Chunk,
connection_id: usize,
number_of_elements: usize,
) -> ActiveRequest<Service, [RequestPayload], RequestHeader, ResponsePayload, ResponseHeader>
{
let header =
unsafe { &*(chunk.header as *const service::header::request_response::RequestHeader) };
ActiveRequest {
details,
shared_loan_counter: Arc::new(AtomicUsize::new(0)),
max_loan_count: self.max_loaned_responses_per_request,
request_id: header.request_id,
channel_id: header.channel_id,
connection_id,
shared_state: self.shared_state.clone(),
ptr: unsafe {
RawSample::new_slice_unchecked(
chunk.header.cast(),
chunk.user_header.cast(),
core::ptr::slice_from_raw_parts(
chunk.payload.cast::<RequestPayload>(),
number_of_elements as _,
),
)
},
_response_payload: PhantomData,
_response_header: PhantomData,
}
}
#[allow(clippy::type_complexity)] pub fn receive(
&self,
) -> Result<
Option<
ActiveRequest<
Service,
[RequestPayload],
RequestHeader,
ResponsePayload,
ResponseHeader,
>,
>,
ReceiveError,
> {
loop {
match self.receive_impl()? {
Some((details, chunk)) => {
let header = unsafe {
&*(chunk.header as *const service::header::request_response::RequestHeader)
};
if let Some(connection_id) = self
.shared_state
.lock()
.response_sender
.get_connection_id_of(header.client_id.value())
{
let active_request = self.create_active_request(
details,
chunk,
connection_id,
header.number_of_elements() as _,
);
if !self.enable_fire_and_forget && !active_request.is_connected() {
continue;
}
return Ok(Some(active_request));
}
}
None => return Ok(None),
}
}
}
}
impl<
    Service: service::Service,
    RequestPayload: Debug + ZeroCopySend + ?Sized,
    RequestHeader: Debug + ZeroCopySend,
    ResponsePayload: Debug + ZeroCopySend,
    ResponseHeader: Debug + ZeroCopySend,
> Server<Service, RequestPayload, RequestHeader, [ResponsePayload], ResponseHeader>
{
    /// Returns the `initial_max_slice_len` of the local server configuration, i.e.
    /// the slice length the response sample layout was initially created for.
    pub fn initial_max_slice_len(&self) -> usize {
        let shared_state = self.shared_state.lock();
        shared_state.config.initial_max_slice_len
    }
}
impl<
Service: service::Service,
RequestHeader: Debug + ZeroCopySend,
ResponsePayload: Debug + ZeroCopySend + ?Sized,
ResponseHeader: Debug + ZeroCopySend,
> Server<Service, [CustomPayloadMarker], RequestHeader, ResponsePayload, ResponseHeader>
{
#[doc(hidden)]
#[allow(clippy::type_complexity)] pub unsafe fn receive_custom_payload(
&self,
) -> Result<
Option<
ActiveRequest<
Service,
[CustomPayloadMarker],
RequestHeader,
ResponsePayload,
ResponseHeader,
>,
>,
ReceiveError,
> {
let shared_state = self.shared_state.lock();
loop {
match self.receive_impl()? {
Some((details, chunk)) => {
let header = unsafe {
&*(chunk.header as *const service::header::request_response::RequestHeader)
};
let number_of_elements = (*header).number_of_elements();
let number_of_bytes =
number_of_elements as usize * shared_state.request_receiver.payload_size();
if let Some(connection_id) = shared_state
.response_sender
.get_connection_id_of(header.client_id.value())
{
let active_request = self.create_active_request(
details,
chunk,
connection_id,
number_of_bytes,
);
if !self.enable_fire_and_forget && !active_request.is_connected() {
continue;
}
return Ok(Some(active_request));
}
}
None => return Ok(None),
}
}
}
}