pub mod common;
pub mod file;
pub mod posix_shared_memory;
pub mod process_local;
pub mod recommended;
pub mod used_chunk_list;
use core::fmt::Debug;
use core::time::Duration;
pub use crate::shared_memory::PointerOffset;
use iceoryx2_bb_elementary_traits::testing::abandonable::Abandonable;
pub use iceoryx2_bb_system_types::file_name::*;
pub use iceoryx2_bb_system_types::path::Path;
use iceoryx2_log::fail;
use crate::static_storage::file::{NamedConcept, NamedConceptBuilder, NamedConceptMgmt};
use iceoryx2_bb_concurrency::atomic::{AtomicU64, Ordering};
use iceoryx2_bb_derive_macros::ZeroCopySend;
use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ZeroCopyPortRemoveError {
InternalError,
VersionMismatch,
InsufficientPermissions,
DoesNotExist,
}
impl core::fmt::Display for ZeroCopyPortRemoveError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "ZeroCopyPortRemoveError::{self:?}")
}
}
impl core::error::Error for ZeroCopyPortRemoveError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ZeroCopyCreationError {
InternalError,
IsBeingCleanedUp,
AnotherInstanceIsAlreadyConnected,
InsufficientPermissions,
VersionMismatch,
ConnectionMaybeCorrupted,
InvalidSampleSize,
InitializationNotYetFinalized,
IncompatibleBufferSize,
IncompatibleMaxBorrowedSamplesPerChannelSetting,
IncompatibleOverflowSetting,
IncompatibleNumberOfSamples,
IncompatibleNumberOfSegments,
IncompatibleNumberOfChannels,
}
impl core::fmt::Display for ZeroCopyCreationError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{}::{:?}", stringify!(Self), self)
}
}
impl core::error::Error for ZeroCopyCreationError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZeroCopySendError {
ConnectionCorrupted,
ReceiveBufferFull,
UnableToDeliver, UsedChunkListFull,
NoConnectedReceiver,
ChannelIsClosed,
InternalError,
}
impl core::fmt::Display for ZeroCopySendError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{}::{:?}", stringify!(Self), self)
}
}
impl core::error::Error for ZeroCopySendError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZeroCopyReceiveError {
ReceiveWouldExceedMaxBorrowValue,
}
impl core::fmt::Display for ZeroCopyReceiveError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{}::{:?}", stringify!(Self), self)
}
}
impl core::error::Error for ZeroCopyReceiveError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZeroCopyReclaimError {
ReceiverReturnedCorruptedPointerOffset,
}
impl core::fmt::Display for ZeroCopyReclaimError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{}::{:?}", stringify!(Self), self)
}
}
impl core::error::Error for ZeroCopyReclaimError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZeroCopyReleaseError {
RetrieveBufferFull,
}
impl core::fmt::Display for ZeroCopyReleaseError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{}::{:?}", stringify!(Self), self)
}
}
impl core::error::Error for ZeroCopyReleaseError {}
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum BackpressureToReceiverAction {
FollowBackpressureyStrategy,
Retry,
DiscardPointerOffset,
DiscardPointerOffsetAndFail,
}
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ZeroCopySend)]
pub struct ChannelId(usize);
impl ChannelId {
pub const fn new(value: usize) -> Self {
Self(value)
}
pub const fn value(&self) -> usize {
self.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelStateNewError {
ValueOutOfBounds,
}
impl core::fmt::Display for ChannelStateNewError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{}::{:?}", stringify!(Self), self)
}
}
impl core::error::Error for ChannelStateNewError {}
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ZeroCopySend)]
pub struct ChannelState(u64);
impl ChannelState {
pub fn new(value: u64) -> Result<Self, ChannelStateNewError> {
if value > Self::max_value() {
fail!(from "ChannelState::new()", with ChannelStateNewError::ValueOutOfBounds,
"Unable to create new ChannelState since the value must be less or equal than 2^62 and this value is {value},");
}
Ok(Self(value))
}
pub const fn max_value() -> u64 {
2u64.pow(62)
}
pub fn value(&self) -> u64 {
self.0
}
}
pub const DEFAULT_BUFFER_SIZE: usize = 4;
pub const DEFAULT_ENABLE_SAFE_OVERFLOW: bool = false;
pub const DEFAULT_MAX_BORROWED_SAMPLES_PER_CHANNEL: usize = 4;
pub const DEFAULT_MAX_SUPPORTED_SHARED_MEMORY_SEGMENTS: u8 = 1;
pub const DEFAULT_NUMBER_OF_CHANNELS: usize = 1;
pub const DEFAULT_NUMBER_OF_SAMPLES_PER_SEGMENT: usize = 8;
pub const CHANNEL_STATE_OPEN: ChannelState = ChannelState(0);
pub const CHANNEL_STATE_CLOSED: ChannelState = ChannelState(u64::MAX);
const CHANNEL_STATE_DISCONNECT_HINT_BIT: u64 = 1u64 << 63;
pub trait ZeroCopyConnectionBuilder<C: ZeroCopyConnection>: NamedConceptBuilder<C> {
fn buffer_size(self, value: usize) -> Self;
fn enable_safe_overflow(self, value: bool) -> Self;
fn receiver_max_borrowed_samples_per_channel(self, value: usize) -> Self;
fn max_supported_shared_memory_segments(self, value: u8) -> Self;
fn number_of_samples_per_segment(self, value: usize) -> Self;
fn number_of_channels(self, value: usize) -> Self;
fn initial_channel_state(self, value: ChannelState) -> Self;
fn timeout(self, value: Duration) -> Self;
fn create_sender(self) -> Result<C::Sender, ZeroCopyCreationError>;
fn create_receiver(self) -> Result<C::Receiver, ZeroCopyCreationError>;
}
pub trait ZeroCopyPortDetails {
fn number_of_channels(&self) -> usize;
fn buffer_size(&self) -> usize;
fn has_enabled_safe_overflow(&self) -> bool;
fn max_borrowed_samples(&self) -> usize;
fn max_supported_shared_memory_segments(&self) -> u8;
fn is_connected(&self) -> bool;
#[doc(hidden)]
fn __internal_get_channel_state(&self, channel_id: ChannelId) -> &AtomicU64;
fn set_channel_state(&self, channel_id: ChannelId, state: ChannelState) -> bool {
self.__internal_get_channel_state(channel_id)
.compare_exchange(
CHANNEL_STATE_CLOSED.0,
state.0,
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_ok()
}
fn set_disconnect_hint(&self, channel_id: ChannelId, expected_state: ChannelState) {
let disconnect_hint_state = expected_state.0 | CHANNEL_STATE_DISCONNECT_HINT_BIT;
let _ = self
.__internal_get_channel_state(channel_id)
.compare_exchange(
expected_state.0,
disconnect_hint_state,
Ordering::Relaxed,
Ordering::Relaxed,
);
}
fn has_disconnect_hint(&self, channel_id: ChannelId, expected_state: ChannelState) -> bool {
let disconnect_hint_state = expected_state.0 | CHANNEL_STATE_DISCONNECT_HINT_BIT;
disconnect_hint_state
== self
.__internal_get_channel_state(channel_id)
.load(Ordering::Relaxed)
}
fn has_channel_state(&self, channel_id: ChannelId, expected_state: ChannelState) -> bool {
let state = self
.__internal_get_channel_state(channel_id)
.load(Ordering::Relaxed);
let state_without_disconnect_hint_bit = state & !(CHANNEL_STATE_DISCONNECT_HINT_BIT);
expected_state.0 == state_without_disconnect_hint_bit
}
fn close_channel(&self, channel_id: ChannelId, expected_state: ChannelState) {
match self
.__internal_get_channel_state(channel_id)
.compare_exchange(
expected_state.0,
CHANNEL_STATE_CLOSED.0,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => (),
Err(v) => {
let graceful_disconnect_state =
expected_state.0 | CHANNEL_STATE_DISCONNECT_HINT_BIT;
if v == graceful_disconnect_state {
let _ = self
.__internal_get_channel_state(channel_id)
.compare_exchange(
graceful_disconnect_state,
CHANNEL_STATE_CLOSED.0,
Ordering::Relaxed,
Ordering::Relaxed,
);
}
}
}
}
}
pub trait BackpressureToReceiverFn: Fn(u64, Duration) -> BackpressureToReceiverAction {}
impl<F: Fn(u64, Duration) -> BackpressureToReceiverAction> BackpressureToReceiverFn for F {}
pub trait ZeroCopySender: Debug + ZeroCopyPortDetails + NamedConcept + Send + Abandonable {
fn try_send(
&self,
ptr: PointerOffset,
sample_size: usize,
channel_id: ChannelId,
) -> Result<Option<PointerOffset>, ZeroCopySendError>;
fn blocking_send<F: BackpressureToReceiverFn>(
&self,
ptr: PointerOffset,
sample_size: usize,
channel_id: ChannelId,
backpressure_to_receiver_handler: F,
backpressure_action_for_strategy: BackpressureToReceiverAction,
) -> Result<Option<PointerOffset>, ZeroCopySendError>;
fn reclaim(&self, channel_id: ChannelId)
-> Result<Option<PointerOffset>, ZeroCopyReclaimError>;
unsafe fn acquire_used_offsets<F: FnMut(PointerOffset)>(&self, callback: F);
}
pub trait ZeroCopyReceiver:
Debug + ZeroCopyPortDetails + NamedConcept + Send + Abandonable
{
fn has_data(&self, channel_id: ChannelId) -> bool;
fn receive(&self, channel_id: ChannelId)
-> Result<Option<PointerOffset>, ZeroCopyReceiveError>;
fn release(
&self,
ptr: PointerOffset,
channel_id: ChannelId,
) -> Result<(), ZeroCopyReleaseError>;
fn borrow_count(&self, channel_id: ChannelId) -> usize;
}
pub trait ZeroCopyConnection: Debug + Sized + NamedConceptMgmt {
type Sender: ZeroCopySender;
type Receiver: ZeroCopyReceiver;
type Builder: ZeroCopyConnectionBuilder<Self>;
unsafe fn remove_sender(
name: &FileName,
config: &Self::Configuration,
) -> Result<(), ZeroCopyPortRemoveError>;
unsafe fn remove_receiver(
name: &FileName,
config: &Self::Configuration,
) -> Result<(), ZeroCopyPortRemoveError>;
fn does_support_safe_overflow() -> bool {
false
}
fn has_configurable_buffer_size() -> bool {
false
}
fn default_suffix() -> FileName {
unsafe { FileName::new_unchecked(b".rx") }
}
}