Skip to main content

iceoryx2_cal/zero_copy_connection/
mod.rs

1// Copyright (c) 2023 Contributors to the Eclipse Foundation
2//
3// See the NOTICE file(s) distributed with this work for additional
4// information regarding copyright ownership.
5//
6// This program and the accompanying materials are made available under the
7// terms of the Apache Software License 2.0 which is available at
8// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
9// which is available at https://opensource.org/licenses/MIT.
10//
11// SPDX-License-Identifier: Apache-2.0 OR MIT
12
13pub mod common;
14pub mod file;
15pub mod posix_shared_memory;
16pub mod process_local;
17pub mod recommended;
18pub mod used_chunk_list;
19
20use core::fmt::Debug;
21use core::time::Duration;
22
23pub use crate::shared_memory::PointerOffset;
24use iceoryx2_bb_elementary_traits::testing::abandonable::Abandonable;
25pub use iceoryx2_bb_system_types::file_name::*;
26pub use iceoryx2_bb_system_types::path::Path;
27use iceoryx2_log::fail;
28
29use crate::static_storage::file::{NamedConcept, NamedConceptBuilder, NamedConceptMgmt};
30use iceoryx2_bb_concurrency::atomic::{AtomicU64, Ordering};
31use iceoryx2_bb_derive_macros::ZeroCopySend;
32use iceoryx2_bb_elementary_traits::zero_copy_send::ZeroCopySend;
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
35pub enum ZeroCopyPortRemoveError {
36    InternalError,
37    VersionMismatch,
38    InsufficientPermissions,
39    DoesNotExist,
40}
41
42impl core::fmt::Display for ZeroCopyPortRemoveError {
43    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
44        write!(f, "ZeroCopyPortRemoveError::{self:?}")
45    }
46}
47
48impl core::error::Error for ZeroCopyPortRemoveError {}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
51pub enum ZeroCopyCreationError {
52    InternalError,
53    IsBeingCleanedUp,
54    AnotherInstanceIsAlreadyConnected,
55    InsufficientPermissions,
56    VersionMismatch,
57    ConnectionMaybeCorrupted,
58    InvalidSampleSize,
59    InitializationNotYetFinalized,
60    IncompatibleBufferSize,
61    IncompatibleMaxBorrowedSamplesPerChannelSetting,
62    IncompatibleOverflowSetting,
63    IncompatibleNumberOfSamples,
64    IncompatibleNumberOfSegments,
65    IncompatibleNumberOfChannels,
66}
67
68impl core::fmt::Display for ZeroCopyCreationError {
69    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
70        write!(f, "{}::{:?}", stringify!(Self), self)
71    }
72}
73
74impl core::error::Error for ZeroCopyCreationError {}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq)]
77pub enum ZeroCopySendError {
78    ConnectionCorrupted,
79    ReceiveBufferFull,
80    UnableToDeliver, // NOTE: in order to distinguish between a try_send failure and a user induced send aborting, the UnableToDeliver error is used
81    UsedChunkListFull,
82    NoConnectedReceiver,
83    ChannelIsClosed,
84    InternalError,
85}
86
87impl core::fmt::Display for ZeroCopySendError {
88    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
89        write!(f, "{}::{:?}", stringify!(Self), self)
90    }
91}
92
93impl core::error::Error for ZeroCopySendError {}
94
95#[derive(Debug, Clone, Copy, PartialEq, Eq)]
96pub enum ZeroCopyReceiveError {
97    ReceiveWouldExceedMaxBorrowValue,
98}
99
100impl core::fmt::Display for ZeroCopyReceiveError {
101    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
102        write!(f, "{}::{:?}", stringify!(Self), self)
103    }
104}
105
106impl core::error::Error for ZeroCopyReceiveError {}
107
108#[derive(Debug, Clone, Copy, PartialEq, Eq)]
109pub enum ZeroCopyReclaimError {
110    ReceiverReturnedCorruptedPointerOffset,
111}
112
113impl core::fmt::Display for ZeroCopyReclaimError {
114    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
115        write!(f, "{}::{:?}", stringify!(Self), self)
116    }
117}
118
119impl core::error::Error for ZeroCopyReclaimError {}
120
121#[derive(Debug, Clone, Copy, PartialEq, Eq)]
122pub enum ZeroCopyReleaseError {
123    RetrieveBufferFull,
124}
125
126impl core::fmt::Display for ZeroCopyReleaseError {
127    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
128        write!(f, "{}::{:?}", stringify!(Self), self)
129    }
130}
131
132impl core::error::Error for ZeroCopyReleaseError {}
133
134/// Defines the action that shall be take when a [`PointerOffset`]
135/// cannot be delivered.
136#[derive(Debug, PartialEq, Eq, Copy, Clone)]
137pub enum BackpressureToReceiverAction {
138    /// Use an action which is derived from the `BackpressureStrategy`
139    FollowBackpressureyStrategy,
140    /// Retry to send and invoke the handler again, if sending does not succeed
141    Retry,
142    /// Discard the [`PointerOffset`] for the [`ZeroCopyReceiver`] which cause the incident
143    /// and continue to deliver the [`PointerOffset`] to the remaining [`ZeroCopyReceiver`]s
144    DiscardPointerOffset,
145    /// Discard the [`PointerOffset`] for the [`ZeroCopyReceiver`] which caused the incident,
146    /// continue to deliver the [`PointerOffset`] to the remaining [`ZeroCopyReceiver`]s;
147    /// return with an error if the [`PointerOffset`] was not delivered to all [`ZeroCopyReceiver`]s
148    DiscardPointerOffsetAndFail,
149}
150
151#[repr(C)]
152#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ZeroCopySend)]
153pub struct ChannelId(usize);
154
155impl ChannelId {
156    pub const fn new(value: usize) -> Self {
157        Self(value)
158    }
159
160    pub const fn value(&self) -> usize {
161        self.0
162    }
163}
164
165#[derive(Debug, Clone, Copy, PartialEq, Eq)]
166pub enum ChannelStateNewError {
167    ValueOutOfBounds,
168}
169
170impl core::fmt::Display for ChannelStateNewError {
171    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
172        write!(f, "{}::{:?}", stringify!(Self), self)
173    }
174}
175
176impl core::error::Error for ChannelStateNewError {}
177
178#[repr(C)]
179#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, ZeroCopySend)]
180pub struct ChannelState(u64);
181
182impl ChannelState {
183    pub fn new(value: u64) -> Result<Self, ChannelStateNewError> {
184        if value > Self::max_value() {
185            fail!(from "ChannelState::new()", with ChannelStateNewError::ValueOutOfBounds,
186                "Unable to create new ChannelState since the value must be less or equal than 2^62 and this value is {value},");
187        }
188
189        Ok(Self(value))
190    }
191
192    pub const fn max_value() -> u64 {
193        2u64.pow(62)
194    }
195
196    pub fn value(&self) -> u64 {
197        self.0
198    }
199}
200
201pub const DEFAULT_BUFFER_SIZE: usize = 4;
202pub const DEFAULT_ENABLE_SAFE_OVERFLOW: bool = false;
203pub const DEFAULT_MAX_BORROWED_SAMPLES_PER_CHANNEL: usize = 4;
204pub const DEFAULT_MAX_SUPPORTED_SHARED_MEMORY_SEGMENTS: u8 = 1;
205pub const DEFAULT_NUMBER_OF_CHANNELS: usize = 1;
206pub const DEFAULT_NUMBER_OF_SAMPLES_PER_SEGMENT: usize = 8;
207/// The initial value of the channel state
208pub const CHANNEL_STATE_OPEN: ChannelState = ChannelState(0);
209/// A channel with an invalid state will never block in [`ZeroCopySender::blocking_send()`];
210pub const CHANNEL_STATE_CLOSED: ChannelState = ChannelState(u64::MAX);
211/// Hints the channel that the other side intends to disconnect.
212const CHANNEL_STATE_DISCONNECT_HINT_BIT: u64 = 1u64 << 63;
213
214pub trait ZeroCopyConnectionBuilder<C: ZeroCopyConnection>: NamedConceptBuilder<C> {
215    fn buffer_size(self, value: usize) -> Self;
216    fn enable_safe_overflow(self, value: bool) -> Self;
217    fn receiver_max_borrowed_samples_per_channel(self, value: usize) -> Self;
218    fn max_supported_shared_memory_segments(self, value: u8) -> Self;
219    fn number_of_samples_per_segment(self, value: usize) -> Self;
220    fn number_of_channels(self, value: usize) -> Self;
221    fn initial_channel_state(self, value: ChannelState) -> Self;
222    /// The timeout defines how long the [`ZeroCopyConnectionBuilder`] should wait for
223    /// concurrent
224    /// [`ZeroCopyConnectionBuilder::create_sender()`] or
225    /// [`ZeroCopyConnectionBuilder::create_receiver()`] call to finalize its initialization.
226    /// By default it is set to [`Duration::ZERO`] for no timeout.
227    fn timeout(self, value: Duration) -> Self;
228
229    fn create_sender(self) -> Result<C::Sender, ZeroCopyCreationError>;
230    fn create_receiver(self) -> Result<C::Receiver, ZeroCopyCreationError>;
231}
232
233pub trait ZeroCopyPortDetails {
234    fn number_of_channels(&self) -> usize;
235    fn buffer_size(&self) -> usize;
236    fn has_enabled_safe_overflow(&self) -> bool;
237    fn max_borrowed_samples(&self) -> usize;
238    fn max_supported_shared_memory_segments(&self) -> u8;
239    fn is_connected(&self) -> bool;
240    #[doc(hidden)]
241    fn __internal_get_channel_state(&self, channel_id: ChannelId) -> &AtomicU64;
242
243    fn set_channel_state(&self, channel_id: ChannelId, state: ChannelState) -> bool {
244        self.__internal_get_channel_state(channel_id)
245            .compare_exchange(
246                CHANNEL_STATE_CLOSED.0,
247                state.0,
248                Ordering::Relaxed,
249                Ordering::Relaxed,
250            )
251            .is_ok()
252    }
253
254    fn set_disconnect_hint(&self, channel_id: ChannelId, expected_state: ChannelState) {
255        let disconnect_hint_state = expected_state.0 | CHANNEL_STATE_DISCONNECT_HINT_BIT;
256
257        let _ = self
258            .__internal_get_channel_state(channel_id)
259            .compare_exchange(
260                expected_state.0,
261                disconnect_hint_state,
262                Ordering::Relaxed,
263                Ordering::Relaxed,
264            );
265    }
266
267    fn has_disconnect_hint(&self, channel_id: ChannelId, expected_state: ChannelState) -> bool {
268        let disconnect_hint_state = expected_state.0 | CHANNEL_STATE_DISCONNECT_HINT_BIT;
269        disconnect_hint_state
270            == self
271                .__internal_get_channel_state(channel_id)
272                .load(Ordering::Relaxed)
273    }
274
275    fn has_channel_state(&self, channel_id: ChannelId, expected_state: ChannelState) -> bool {
276        let state = self
277            .__internal_get_channel_state(channel_id)
278            .load(Ordering::Relaxed);
279        let state_without_disconnect_hint_bit = state & !(CHANNEL_STATE_DISCONNECT_HINT_BIT);
280        expected_state.0 == state_without_disconnect_hint_bit
281    }
282
283    fn close_channel(&self, channel_id: ChannelId, expected_state: ChannelState) {
284        match self
285            .__internal_get_channel_state(channel_id)
286            .compare_exchange(
287                expected_state.0,
288                CHANNEL_STATE_CLOSED.0,
289                Ordering::Relaxed,
290                Ordering::Relaxed,
291            ) {
292            Ok(_) => (),
293            Err(v) => {
294                let graceful_disconnect_state =
295                    expected_state.0 | CHANNEL_STATE_DISCONNECT_HINT_BIT;
296                if v == graceful_disconnect_state {
297                    let _ = self
298                        .__internal_get_channel_state(channel_id)
299                        .compare_exchange(
300                            graceful_disconnect_state,
301                            CHANNEL_STATE_CLOSED.0,
302                            Ordering::Relaxed,
303                            Ordering::Relaxed,
304                        );
305                }
306            }
307        }
308    }
309}
310
311/// The backpressurey handler invoked by a send function when a [`PointerOffset`]
312/// cannot be delivered to a [`ZeroCopyReceiver`]
313///
314/// # Arguments
315///
316/// * u64: is the retry counter for a delivery incident with a specific sender-receiver pair
317/// * Duration: is the elapsed time since the incident was detected
318///
319/// # Returns
320///
321/// The [`BackpressureToReceiverAction`] to be taken to mitigate the incident
322pub trait BackpressureToReceiverFn: Fn(u64, Duration) -> BackpressureToReceiverAction {}
323
324impl<F: Fn(u64, Duration) -> BackpressureToReceiverAction> BackpressureToReceiverFn for F {}
325
326pub trait ZeroCopySender: Debug + ZeroCopyPortDetails + NamedConcept + Send + Abandonable {
327    fn try_send(
328        &self,
329        ptr: PointerOffset,
330        sample_size: usize,
331        channel_id: ChannelId,
332    ) -> Result<Option<PointerOffset>, ZeroCopySendError>;
333
334    fn blocking_send<F: BackpressureToReceiverFn>(
335        &self,
336        ptr: PointerOffset,
337        sample_size: usize,
338        channel_id: ChannelId,
339        backpressure_to_receiver_handler: F,
340        backpressure_action_for_strategy: BackpressureToReceiverAction,
341    ) -> Result<Option<PointerOffset>, ZeroCopySendError>;
342
343    fn reclaim(&self, channel_id: ChannelId)
344    -> Result<Option<PointerOffset>, ZeroCopyReclaimError>;
345
346    /// # Safety
347    ///
348    /// * must ensure that no receiver is still holding data, otherwise data races may occur on
349    ///   receiver side
350    /// * must ensure that [`ZeroCopySender::try_send()`] and [`ZeroCopySender::blocking_send()`]
351    ///   are not called after using this method
352    unsafe fn acquire_used_offsets<F: FnMut(PointerOffset)>(&self, callback: F);
353}
354
355pub trait ZeroCopyReceiver:
356    Debug + ZeroCopyPortDetails + NamedConcept + Send + Abandonable
357{
358    fn has_data(&self, channel_id: ChannelId) -> bool;
359    fn receive(&self, channel_id: ChannelId)
360    -> Result<Option<PointerOffset>, ZeroCopyReceiveError>;
361    fn release(
362        &self,
363        ptr: PointerOffset,
364        channel_id: ChannelId,
365    ) -> Result<(), ZeroCopyReleaseError>;
366    fn borrow_count(&self, channel_id: ChannelId) -> usize;
367}
368
369pub trait ZeroCopyConnection: Debug + Sized + NamedConceptMgmt {
370    type Sender: ZeroCopySender;
371    type Receiver: ZeroCopyReceiver;
372    type Builder: ZeroCopyConnectionBuilder<Self>;
373
374    /// Removes the [`ZeroCopySender`] forcefully from the [`ZeroCopyConnection`]. This shall only
375    /// be called when the [`ZeroCopySender`] died and the connection shall be cleaned up without
376    /// causing any problems on the living [`ZeroCopyReceiver`] side.
377    ///
378    /// # Safety
379    ///
380    ///  * must ensure that the [`ZeroCopySender`] died while being connected.
381    unsafe fn remove_sender(
382        name: &FileName,
383        config: &Self::Configuration,
384    ) -> Result<(), ZeroCopyPortRemoveError>;
385
386    /// Removes the [`ZeroCopyReceiver`] forcefully from the [`ZeroCopyConnection`]. This shall
387    /// only be called when the [`ZeroCopySender`] died and the connection shall be cleaned up
388    /// without causing any problems on the living [`ZeroCopySender`] side.
389    ///
390    /// # Safety
391    ///
392    ///  * must ensure that the [`ZeroCopyReceiver`] died while being connected.
393    unsafe fn remove_receiver(
394        name: &FileName,
395        config: &Self::Configuration,
396    ) -> Result<(), ZeroCopyPortRemoveError>;
397
398    /// Returns true if the connection supports safe overflow
399    fn does_support_safe_overflow() -> bool {
400        false
401    }
402
403    /// Returns true if the buffer size of the connection can be configured
404    fn has_configurable_buffer_size() -> bool {
405        false
406    }
407
408    /// The default suffix of every zero copy connection
409    fn default_suffix() -> FileName {
410        unsafe { FileName::new_unchecked(b".rx") }
411    }
412}