1pub mod common;
14pub mod file;
15pub mod posix_shared_memory;
16pub mod process_local;
17pub mod recommended;
18pub mod used_chunk_list;
19
20use core::fmt::Debug;
21use core::time::Duration;
22
23pub use crate::shared_memory::PointerOffset;
24use iceoryx2_bb_elementary_traits::testing::abandonable::Abandonable;
25pub use iceoryx2_bb_system_types::file_name::*;
26pub use iceoryx2_bb_system_types::path::Path;
27use iceoryx2_log::fail;
28
29use crate::static_storage::file::{NamedConcept, NamedConceptBuilder, NamedConceptMgmt};
30use iceoryx2_bb_concurrency::atomic::{AtomicU64, Ordering};
31use iceoryx2_bb_derive_macros::ZeroCopySend;
32use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
33
/// Failure reasons returned by [`ZeroCopyConnection::remove_sender()`] and
/// [`ZeroCopyConnection::remove_receiver()`].
// NOTE(review): the precise condition behind each variant is decided by the
// concrete connection implementations and is not visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ZeroCopyPortRemoveError {
    InternalError,
    VersionMismatch,
    InsufficientPermissions,
    DoesNotExist,
}
41
42impl core::fmt::Display for ZeroCopyPortRemoveError {
43 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
44 write!(f, "ZeroCopyPortRemoveError::{self:?}")
45 }
46}
47
// Marker impl: overrides none of the provided `Error` methods and relies on the
// type's `Debug` derive and `Display` impl.
impl core::error::Error for ZeroCopyPortRemoveError {}
49
/// Failure reasons returned by [`ZeroCopyConnectionBuilder::create_sender()`] and
/// [`ZeroCopyConnectionBuilder::create_receiver()`].
// NOTE(review): the `Incompatible*` variants presumably report a mismatch between the
// requested builder settings and an already existing connection - confirm against
// the concrete implementations, which are not part of this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ZeroCopyCreationError {
    InternalError,
    IsBeingCleanedUp,
    AnotherInstanceIsAlreadyConnected,
    InsufficientPermissions,
    VersionMismatch,
    ConnectionMaybeCorrupted,
    InvalidSampleSize,
    InitializationNotYetFinalized,
    IncompatibleBufferSize,
    IncompatibleMaxBorrowedSamplesPerChannelSetting,
    IncompatibleOverflowSetting,
    IncompatibleNumberOfSamples,
    IncompatibleNumberOfSegments,
    IncompatibleNumberOfChannels,
}
67
68impl core::fmt::Display for ZeroCopyCreationError {
69 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
70 write!(f, "{}::{:?}", stringify!(Self), self)
71 }
72}
73
// Marker impl: overrides none of the provided `Error` methods and relies on the
// type's `Debug` derive and `Display` impl.
impl core::error::Error for ZeroCopyCreationError {}
75
/// Failure reasons returned by [`ZeroCopySender::try_send()`] and
/// [`ZeroCopySender::blocking_send()`].
// NOTE(review): which variant is produced under which condition is decided by the
// concrete sender implementations and is not visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZeroCopySendError {
    ConnectionCorrupted,
    ReceiveBufferFull,
    UnableToDeliver,
    UsedChunkListFull,
    NoConnectedReceiver,
    ChannelIsClosed,
    InternalError,
}
86
87impl core::fmt::Display for ZeroCopySendError {
88 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
89 write!(f, "{}::{:?}", stringify!(Self), self)
90 }
91}
92
// Marker impl: overrides none of the provided `Error` methods and relies on the
// type's `Debug` derive and `Display` impl.
impl core::error::Error for ZeroCopySendError {}
94
/// Failure reasons returned by [`ZeroCopyReceiver::receive()`].
// NOTE(review): presumably raised when another receive would exceed the configured
// maximum number of borrowed samples - confirm against the receiver implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZeroCopyReceiveError {
    ReceiveWouldExceedMaxBorrowValue,
}
99
100impl core::fmt::Display for ZeroCopyReceiveError {
101 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
102 write!(f, "{}::{:?}", stringify!(Self), self)
103 }
104}
105
// Marker impl: overrides none of the provided `Error` methods and relies on the
// type's `Debug` derive and `Display` impl.
impl core::error::Error for ZeroCopyReceiveError {}
107
/// Failure reasons returned by [`ZeroCopySender::reclaim()`].
// NOTE(review): how a corrupted offset is detected is up to the sender
// implementations and is not visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZeroCopyReclaimError {
    ReceiverReturnedCorruptedPointerOffset,
}
112
113impl core::fmt::Display for ZeroCopyReclaimError {
114 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
115 write!(f, "{}::{:?}", stringify!(Self), self)
116 }
117}
118
// Marker impl: overrides none of the provided `Error` methods and relies on the
// type's `Debug` derive and `Display` impl.
impl core::error::Error for ZeroCopyReclaimError {}
120
/// Failure reasons returned by [`ZeroCopyReceiver::release()`].
// NOTE(review): which buffer "retrieve buffer" refers to is defined by the
// receiver implementations and is not visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZeroCopyReleaseError {
    RetrieveBufferFull,
}
125
126impl core::fmt::Display for ZeroCopyReleaseError {
127 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
128 write!(f, "{}::{:?}", stringify!(Self), self)
129 }
130}
131
// Marker impl: overrides none of the provided `Error` methods and relies on the
// type's `Debug` derive and `Display` impl.
impl core::error::Error for ZeroCopyReleaseError {}
133
/// Action selected when a sample cannot be handed to the receiver right away.
///
/// It is the return type of a [`BackpressureToReceiverFn`] and is also passed to
/// [`ZeroCopySender::blocking_send()`] as `backpressure_action_for_strategy`.
// NOTE(review): the variant name `FollowBackpressureyStrategy` looks like a typo
// ("Backpressurey"). It is part of the public API, so it is left untouched here;
// renaming it needs a deprecation path.
// NOTE(review): the exact effect of each action is implemented by the senders and
// is not visible in this file.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum BackpressureToReceiverAction {
    FollowBackpressureyStrategy,
    Retry,
    DiscardPointerOffset,
    DiscardPointerOffsetAndFail,
}
150
/// Newtype around a `usize` that selects a channel in the per-channel operations of
/// [`ZeroCopyPortDetails`], [`ZeroCopySender`] and [`ZeroCopyReceiver`].
// NOTE(review): `#[repr(C)]` + `ZeroCopySend` presumably exist so the id can be
// shared across process boundaries - confirm against the `ZeroCopySend` contract.
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ZeroCopySend)]
pub struct ChannelId(usize);
154
155impl ChannelId {
156 pub const fn new(value: usize) -> Self {
157 Self(value)
158 }
159
160 pub const fn value(&self) -> usize {
161 self.0
162 }
163}
164
/// Failure reasons returned by [`ChannelState::new()`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChannelStateNewError {
    /// The requested value is larger than [`ChannelState::max_value()`].
    ValueOutOfBounds,
}
169
170impl core::fmt::Display for ChannelStateNewError {
171 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
172 write!(f, "{}::{:?}", stringify!(Self), self)
173 }
174}
175
// Marker impl: overrides none of the provided `Error` methods and relies on the
// type's `Debug` derive and `Display` impl.
impl core::error::Error for ChannelStateNewError {}
177
/// Newtype around the `u64` value stored per channel.
///
/// Values built through [`ChannelState::new()`] are limited to
/// [`ChannelState::max_value()`]; the constants in this module construct the tuple
/// struct directly and are not bound by that limit.
// NOTE(review): `#[repr(C)]` + `ZeroCopySend` presumably exist so the state can be
// shared across process boundaries - confirm against the `ZeroCopySend` contract.
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ZeroCopySend)]
pub struct ChannelState(u64);
181
182impl ChannelState {
183 pub fn new(value: u64) -> Result<Self, ChannelStateNewError> {
184 if value > Self::max_value() {
185 fail!(from "ChannelState::new()", with ChannelStateNewError::ValueOutOfBounds,
186 "Unable to create new ChannelState since the value must be less or equal than 2^62 and this value is {value},");
187 }
188
189 Ok(Self(value))
190 }
191
192 pub const fn max_value() -> u64 {
193 2u64.pow(62)
194 }
195
196 pub fn value(&self) -> u64 {
197 self.0
198 }
199}
200
// NOTE(review): nothing in this file reads the `DEFAULT_*` constants; each one
// presumably backs the like-named `ZeroCopyConnectionBuilder` option - confirm
// against the concrete builder implementations.
pub const DEFAULT_BUFFER_SIZE: usize = 4;
pub const DEFAULT_ENABLE_SAFE_OVERFLOW: bool = false;
pub const DEFAULT_MAX_BORROWED_SAMPLES_PER_CHANNEL: usize = 4;
pub const DEFAULT_MAX_SUPPORTED_SHARED_MEMORY_SEGMENTS: u8 = 1;
pub const DEFAULT_NUMBER_OF_CHANNELS: usize = 1;
pub const DEFAULT_NUMBER_OF_SAMPLES_PER_SEGMENT: usize = 8;
/// Channel state with the raw value `0`.
pub const CHANNEL_STATE_OPEN: ChannelState = ChannelState(0);
/// Channel state with all bits set.
///
/// [`ZeroCopyPortDetails::close_channel()`] stores this value, and
/// [`ZeroCopyPortDetails::set_channel_state()`] only succeeds while the channel holds
/// it. It is constructed directly because [`ChannelState::new()`] would reject it.
pub const CHANNEL_STATE_CLOSED: ChannelState = ChannelState(u64::MAX);
// Bit 63. `ZeroCopyPortDetails::set_disconnect_hint()` ORs it into the stored state
// and `has_channel_state()` masks it out again. `ChannelState::new()` caps values at
// 2^62, so a state created through `new()` never has this bit set.
const CHANNEL_STATE_DISCONNECT_HINT_BIT: u64 = 1u64 << 63;
213
/// Builder for the ports of a [`ZeroCopyConnection`] `C`.
///
/// Every setter consumes the builder and returns it again so calls can be chained;
/// `create_sender()` / `create_receiver()` finish the chain.
// NOTE(review): the effect and valid range of the individual options are defined by
// the concrete builders and are not visible in this file.
pub trait ZeroCopyConnectionBuilder<C: ZeroCopyConnection>: NamedConceptBuilder<C> {
    fn buffer_size(self, value: usize) -> Self;
    fn enable_safe_overflow(self, value: bool) -> Self;
    fn receiver_max_borrowed_samples_per_channel(self, value: usize) -> Self;
    fn max_supported_shared_memory_segments(self, value: u8) -> Self;
    fn number_of_samples_per_segment(self, value: usize) -> Self;
    fn number_of_channels(self, value: usize) -> Self;
    fn initial_channel_state(self, value: ChannelState) -> Self;
    // NOTE(review): what the timeout applies to is not visible here - TODO confirm.
    fn timeout(self, value: Duration) -> Self;

    /// Consumes the builder and creates the sending port, or reports why that failed.
    fn create_sender(self) -> Result<C::Sender, ZeroCopyCreationError>;
    /// Consumes the builder and creates the receiving port, or reports why that failed.
    fn create_receiver(self) -> Result<C::Receiver, ZeroCopyCreationError>;
}
232
233pub trait ZeroCopyPortDetails {
234 fn number_of_channels(&self) -> usize;
235 fn buffer_size(&self) -> usize;
236 fn has_enabled_safe_overflow(&self) -> bool;
237 fn max_borrowed_samples(&self) -> usize;
238 fn max_supported_shared_memory_segments(&self) -> u8;
239 fn is_connected(&self) -> bool;
240 #[doc(hidden)]
241 fn __internal_get_channel_state(&self, channel_id: ChannelId) -> &AtomicU64;
242
243 fn set_channel_state(&self, channel_id: ChannelId, state: ChannelState) -> bool {
244 self.__internal_get_channel_state(channel_id)
245 .compare_exchange(
246 CHANNEL_STATE_CLOSED.0,
247 state.0,
248 Ordering::Relaxed,
249 Ordering::Relaxed,
250 )
251 .is_ok()
252 }
253
254 fn set_disconnect_hint(&self, channel_id: ChannelId, expected_state: ChannelState) {
255 let disconnect_hint_state = expected_state.0 | CHANNEL_STATE_DISCONNECT_HINT_BIT;
256
257 let _ = self
258 .__internal_get_channel_state(channel_id)
259 .compare_exchange(
260 expected_state.0,
261 disconnect_hint_state,
262 Ordering::Relaxed,
263 Ordering::Relaxed,
264 );
265 }
266
267 fn has_disconnect_hint(&self, channel_id: ChannelId, expected_state: ChannelState) -> bool {
268 let disconnect_hint_state = expected_state.0 | CHANNEL_STATE_DISCONNECT_HINT_BIT;
269 disconnect_hint_state
270 == self
271 .__internal_get_channel_state(channel_id)
272 .load(Ordering::Relaxed)
273 }
274
275 fn has_channel_state(&self, channel_id: ChannelId, expected_state: ChannelState) -> bool {
276 let state = self
277 .__internal_get_channel_state(channel_id)
278 .load(Ordering::Relaxed);
279 let state_without_disconnect_hint_bit = state & !(CHANNEL_STATE_DISCONNECT_HINT_BIT);
280 expected_state.0 == state_without_disconnect_hint_bit
281 }
282
283 fn close_channel(&self, channel_id: ChannelId, expected_state: ChannelState) {
284 match self
285 .__internal_get_channel_state(channel_id)
286 .compare_exchange(
287 expected_state.0,
288 CHANNEL_STATE_CLOSED.0,
289 Ordering::Relaxed,
290 Ordering::Relaxed,
291 ) {
292 Ok(_) => (),
293 Err(v) => {
294 let graceful_disconnect_state =
295 expected_state.0 | CHANNEL_STATE_DISCONNECT_HINT_BIT;
296 if v == graceful_disconnect_state {
297 let _ = self
298 .__internal_get_channel_state(channel_id)
299 .compare_exchange(
300 graceful_disconnect_state,
301 CHANNEL_STATE_CLOSED.0,
302 Ordering::Relaxed,
303 Ordering::Relaxed,
304 );
305 }
306 }
307 }
308 }
309}
310
311pub trait BackpressureToReceiverFn: Fn(u64, Duration) -> BackpressureToReceiverAction {}
323
324impl<F: Fn(u64, Duration) -> BackpressureToReceiverAction> BackpressureToReceiverFn for F {}
325
/// Sending port of a [`ZeroCopyConnection`].
// NOTE(review): this trait only declares signatures; the behaviour described below
// is limited to what the types express. The meaning of a returned
// `Some(PointerOffset)` from the send methods is not visible here - presumably an
// offset that is handed back to the caller; confirm against the implementations.
pub trait ZeroCopySender: Debug + ZeroCopyPortDetails + NamedConcept + Send + Abandonable {
    /// Tries to send `ptr` on `channel_id`; failures are reported as
    /// [`ZeroCopySendError`].
    fn try_send(
        &self,
        ptr: PointerOffset,
        sample_size: usize,
        channel_id: ChannelId,
    ) -> Result<Option<PointerOffset>, ZeroCopySendError>;

    /// Sends `ptr` on `channel_id`, taking a handler and an action for the case that
    /// the sample cannot be delivered right away.
    // NOTE(review): when the handler is called and how
    // `backpressure_action_for_strategy` is applied is implementation defined.
    fn blocking_send<F: BackpressureToReceiverFn>(
        &self,
        ptr: PointerOffset,
        sample_size: usize,
        channel_id: ChannelId,
        backpressure_to_receiver_handler: F,
        backpressure_action_for_strategy: BackpressureToReceiverAction,
    ) -> Result<Option<PointerOffset>, ZeroCopySendError>;

    /// Returns the next offset available for reclaiming on `channel_id`, `None` when
    /// there is none, or a [`ZeroCopyReclaimError`].
    fn reclaim(&self, channel_id: ChannelId)
        -> Result<Option<PointerOffset>, ZeroCopyReclaimError>;

    /// Calls `callback` with [`PointerOffset`]s tracked by the sender.
    ///
    /// # Safety
    ///
    // NOTE(review): the safety contract is not stated in this file - TODO document
    // what the caller must guarantee.
    unsafe fn acquire_used_offsets<F: FnMut(PointerOffset)>(&self, callback: F);
}
354
/// Receiving port of a [`ZeroCopyConnection`].
// NOTE(review): this trait only declares signatures; the exact semantics are defined
// by the implementations and are not visible in this file.
pub trait ZeroCopyReceiver:
    Debug + ZeroCopyPortDetails + NamedConcept + Send + Abandonable
{
    /// Reports whether `channel_id` has data.
    fn has_data(&self, channel_id: ChannelId) -> bool;
    /// Returns the next offset received on `channel_id`, `None` when there is none,
    /// or a [`ZeroCopyReceiveError`].
    fn receive(&self, channel_id: ChannelId)
        -> Result<Option<PointerOffset>, ZeroCopyReceiveError>;
    /// Hands `ptr` back on `channel_id`; failures are reported as
    /// [`ZeroCopyReleaseError`].
    fn release(
        &self,
        ptr: PointerOffset,
        channel_id: ChannelId,
    ) -> Result<(), ZeroCopyReleaseError>;
    // NOTE(review): presumably the number of offsets received but not yet released on
    // `channel_id` - confirm against the implementations.
    fn borrow_count(&self, channel_id: ChannelId) -> usize;
}
368
/// A zero-copy connection concept: ties together the sender, receiver and builder
/// types of one implementation and provides port removal by name.
pub trait ZeroCopyConnection: Debug + Sized + NamedConceptMgmt {
    type Sender: ZeroCopySender;
    type Receiver: ZeroCopyReceiver;
    type Builder: ZeroCopyConnectionBuilder<Self>;

    /// Removes the sender port of the connection `name`.
    ///
    /// # Safety
    ///
    // NOTE(review): the safety contract is not stated in this file - TODO document
    // what the caller must guarantee. `Self::Configuration` is not declared here and
    // presumably comes from `NamedConceptMgmt`.
    unsafe fn remove_sender(
        name: &FileName,
        config: &Self::Configuration,
    ) -> Result<(), ZeroCopyPortRemoveError>;

    /// Removes the receiver port of the connection `name`.
    ///
    /// # Safety
    ///
    // NOTE(review): the safety contract is not stated in this file - TODO document
    // what the caller must guarantee.
    unsafe fn remove_receiver(
        name: &FileName,
        config: &Self::Configuration,
    ) -> Result<(), ZeroCopyPortRemoveError>;

    /// Whether the implementation supports safe overflow; `false` unless overridden.
    fn does_support_safe_overflow() -> bool {
        false
    }

    /// Whether the implementation has a configurable buffer size; `false` unless
    /// overridden.
    fn has_configurable_buffer_size() -> bool {
        false
    }

    /// Default file-name suffix of the concept: `.rx`.
    fn default_suffix() -> FileName {
        // SAFETY (NOTE(review)): `.rx` is a fixed three-byte ASCII literal; it is
        // assumed to satisfy the contract of `FileName::new_unchecked` - confirm.
        unsafe { FileName::new_unchecked(b".rx") }
    }
}