zenoh_shm/
lib.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15//! ⚠️ WARNING ⚠️
16//!
17//! This crate is intended for Zenoh's internal use.
18//!
19//! [Click here for Zenoh's documentation](https://docs.rs/zenoh/latest/zenoh)
20use std::{any::Any, num::NonZeroUsize, sync::atomic::Ordering};
21
22use api::{
23    buffer::{
24        traits::BufferRelayoutError,
25        zshm::{zshm, ZShm},
26        zshmmut::{zshmmut, ZShmMut},
27    },
28    common::types::ProtocolID,
29    provider::memory_layout::MemoryLayout,
30};
31use metadata::descriptor::MetadataDescriptor;
32use watchdog::confirmator::ConfirmedDescriptor;
33use zenoh_buffers::ZSliceBuffer;
34
35use crate::api::common::types::PtrInSegment;
36
/// Declares the module `$name`, choosing its visibility from the `test` feature.
///
/// With the `test` feature enabled the module is declared `pub`; without it
/// the module is declared private to the enclosing module.
#[macro_export]
macro_rules! tested_module {
    ($name:ident) => {
        // Regular builds: keep the module private.
        #[cfg(not(feature = "test"))]
        mod $name;
        // Test builds: expose the module publicly.
        #[cfg(feature = "test")]
        pub mod $name;
    };
}
46
/// Declares the module `$name`, choosing its visibility from the `test` feature.
///
/// With the `test` feature enabled the module is declared `pub`; without it
/// the module is declared `pub(crate)`.
#[macro_export]
macro_rules! tested_crate_module {
    ($name:ident) => {
        // Regular builds: visible inside this crate only.
        #[cfg(not(feature = "test"))]
        pub(crate) mod $name;
        // Test builds: expose the module publicly.
        #[cfg(feature = "test")]
        pub mod $name;
    };
}
56
// Crate modules. Everything is public except `cleanup`, which is private to
// the crate root.
pub mod api;
mod cleanup;
pub mod header;
pub mod init;
pub mod metadata;
pub mod posix_shm;
pub mod reader;
pub mod version;
pub mod watchdog;
// `shm` is `pub` when the `test` feature is enabled and `pub(crate)` otherwise
// (see `tested_crate_module!`).
tested_crate_module!(shm);
67
/// Information about a [`ShmBufInner`].
///
/// This can be serialized and used to retrieve the [`ShmBufInner`] in a remote process.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ShmBufInfo {
    /// Actual data length.
    ///
    /// NOTE: the data descriptor's length is greater than or equal to this value; it
    /// describes the actual amount of memory dedicated to this particular buffer in the
    /// shared memory segment.
    pub data_len: NonZeroUsize,

    /// Metadata descriptor.
    pub metadata: MetadataDescriptor,
    /// Generation of the buffer.
    pub generation: u32,
}
83
84impl ShmBufInfo {
85    pub fn new(
86        data_len: NonZeroUsize,
87        metadata: MetadataDescriptor,
88        generation: u32,
89    ) -> ShmBufInfo {
90        ShmBufInfo {
91            data_len,
92            metadata,
93            generation,
94        }
95    }
96}
97
/// A zenoh buffer in shared memory.
///
/// Cloning an instance increments the reference counter stored in the metadata
/// header, and dropping an instance decrements it.
pub struct ShmBufInner {
    /// Confirmed metadata descriptor. Its header holds the protocol id, the
    /// capacity, the reference counter, the generation and the watchdog
    /// invalidation flag that the methods of this type read.
    pub(crate) metadata: ConfirmedDescriptor,
    /// Pointer to the first byte of the buffer's data; slices over the buffer
    /// start here and span `info.data_len` bytes.
    pub(crate) buf: PtrInSegment,
    /// Description of this buffer: data length, metadata descriptor and generation.
    pub info: ShmBufInfo,
}
104
105impl PartialEq for ShmBufInner {
106    fn eq(&self, other: &Self) -> bool {
107        // currently there is no API to resize an SHM buffer, but it is intended in the future,
108        // so I add size comparison here to avoid future bugs :)
109        self.buf == other.buf && self.info.data_len == other.info.data_len
110    }
111}
112impl Eq for ShmBufInner {}
113
114impl std::fmt::Debug for ShmBufInner {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        f.debug_struct("ShmBufInner")
117            .field("metadata", &self.metadata)
118            .field("buf", &self.buf)
119            .field("info", &self.info)
120            .finish()
121    }
122}
123
124impl ShmBufInner {
125    pub fn protocol(&self) -> ProtocolID {
126        self.metadata
127            .owned
128            .header()
129            .protocol
130            .load(Ordering::Relaxed)
131    }
132
133    /// # Safety
134    /// This operation is unsafe because we cannot guarantee that there is no upper level
135    /// ZSlice that have cached the size of underlying ShmBufInner. So this function should be
136    /// used only if it is guaranteed that there is no such situation
137    pub unsafe fn try_resize(&mut self, new_size: NonZeroUsize) -> Option<()> {
138        if self.capacity() < new_size {
139            return None;
140        }
141
142        self.info.data_len = new_size;
143
144        Some(())
145    }
146
147    /// # Safety
148    /// This operation is unsafe because we cannot guarantee that there is no upper level
149    /// ZSlice that have cached the size of underlying ShmBufInner. So this function should be
150    /// used only if it is guaranteed that there is no such situation
151    pub unsafe fn try_relayout(
152        &mut self,
153        new_layout: MemoryLayout,
154    ) -> Result<(), BufferRelayoutError> {
155        let address = self.as_ref().as_ptr() as usize;
156        if address % new_layout.alignment().get_alignment_value().get() != 0 {
157            return Err(BufferRelayoutError::IncompatibleAlignment);
158        }
159
160        if self.capacity() < new_layout.size() {
161            return Err(BufferRelayoutError::SizeTooBig);
162        }
163
164        self.info.data_len = new_layout.size();
165
166        Ok(())
167    }
168
169    pub fn len(&self) -> NonZeroUsize {
170        self.info.data_len
171    }
172
173    pub fn capacity(&self) -> NonZeroUsize {
174        self.metadata.owned.header().len()
175    }
176
177    fn is_valid(&self) -> bool {
178        let header = self.metadata.owned.header();
179
180        !header.watchdog_invalidated.load(Ordering::SeqCst)
181            && header.generation.load(Ordering::SeqCst) == self.info.generation
182    }
183
184    fn is_unique(&self) -> bool {
185        self.ref_count() == 1
186    }
187
188    pub fn ref_count(&self) -> u32 {
189        self.metadata.owned.header().refcount.load(Ordering::SeqCst)
190    }
191
192    /// Increments buffer's reference count
193    ///
194    /// # Safety
195    /// You should understand what you are doing, as overestimation
196    /// of the reference counter can lead to memory being stalled until
197    /// recovered by watchdog subsystem or forcibly deallocated
198    pub unsafe fn inc_ref_count(&self) {
199        self.metadata
200            .owned
201            .header()
202            .refcount
203            .fetch_add(1, Ordering::SeqCst);
204    }
205
206    // PRIVATE:
207    fn as_slice(&self) -> &[u8] {
208        tracing::trace!("ShmBufInner::as_slice() == len = {:?}", self.info.data_len);
209        unsafe { std::slice::from_raw_parts(self.buf.ptr(), self.info.data_len.get()) }
210    }
211
212    unsafe fn dec_ref_count(&self) {
213        self.metadata
214            .owned
215            .header()
216            .refcount
217            .fetch_sub(1, Ordering::SeqCst);
218    }
219
220    /// Gets a mutable slice.
221    ///
222    /// # Safety
223    /// This operation is marked unsafe since we cannot guarantee the single mutable reference
224    /// across multiple processes. Thus if you use it, and you'll inevitable have to use it,
225    /// you have to keep in mind that if you have multiple process retrieving a mutable slice
226    /// you may get into concurrent writes. That said, if you have a serial pipeline and
227    /// the buffer is flowing through the pipeline this will not create any issues.
228    ///
229    /// In short, whilst this operation is marked as unsafe, you are safe if you can
230    /// guarantee that your in applications only one process at the time will actually write.
231    unsafe fn as_mut_slice_inner(&mut self) -> &mut [u8] {
232        std::slice::from_raw_parts_mut(self.buf.ptr_mut(), self.info.data_len.get())
233    }
234}
235
impl Drop for ShmBufInner {
    /// Decrements the buffer's reference counter by one.
    fn drop(&mut self) {
        // SAFETY: `Clone` increments the reference counter once per copy, so each
        // instance gives back exactly one reference when it is dropped.
        // NOTE(review): assumes the counter already accounts for instances that
        // were constructed rather than cloned - confirm at the construction sites.
        unsafe { self.dec_ref_count() };
    }
}
242
243impl Clone for ShmBufInner {
244    fn clone(&self) -> Self {
245        // SAFETY: obviously, we need to increment refcount when cloning ShmBufInner instance
246        unsafe { self.inc_ref_count() };
247        ShmBufInner {
248            metadata: self.metadata.clone(),
249            buf: self.buf.clone(),
250            info: self.info.clone(),
251        }
252    }
253}
254
255// Buffer impls
256// - ShmBufInner
impl AsRef<[u8]> for ShmBufInner {
    /// Returns the first `data_len` bytes of the buffer as a shared slice.
    fn as_ref(&self) -> &[u8] {
        // Resolves to the inherent `ShmBufInner::as_slice`, not to
        // `ZSliceBuffer::as_slice` (which itself delegates back to `as_ref`).
        self.as_slice()
    }
}
262
impl AsMut<[u8]> for ShmBufInner {
    /// Returns the first `data_len` bytes of the buffer as a mutable slice.
    fn as_mut(&mut self) -> &mut [u8] {
        // SAFETY: `&mut self` guarantees exclusive access to this handle only.
        // NOTE(review): neither `is_unique()` nor `is_valid()` is checked here, so
        // other holders of the same shared memory may still read or write it -
        // confirm that callers guarantee a single writer.
        unsafe { self.as_mut_slice_inner() }
    }
}
268
impl ZSliceBuffer for ShmBufInner {
    /// Returns the buffer contents; delegates to the `AsRef<[u8]>` implementation.
    fn as_slice(&self) -> &[u8] {
        self.as_ref()
    }

    /// Exposes the buffer as `&dyn Any`, allowing it to be downcast back to
    /// `ShmBufInner`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Exposes the buffer as `&mut dyn Any`, allowing it to be downcast back to
    /// `ShmBufInner`.
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
282
283impl ZShm {
284    pub(crate) fn new(inner: ShmBufInner) -> Self {
285        Self { inner }
286    }
287}
288
289impl From<ShmBufInner> for ZShm {
290    fn from(value: ShmBufInner) -> Self {
291        Self::new(value)
292    }
293}
294
295impl ZShmMut {
296    pub(crate) unsafe fn new_unchecked(inner: ShmBufInner) -> Self {
297        Self { inner }
298    }
299}
300
301impl TryFrom<ShmBufInner> for ZShmMut {
302    type Error = ShmBufInner;
303
304    fn try_from(value: ShmBufInner) -> Result<Self, Self::Error> {
305        match value.is_unique() && value.is_valid() {
306            // SAFETY: we checked above
307            true => Ok(unsafe { Self::new_unchecked(value) }),
308            false => Err(value),
309        }
310    }
311}
312
313impl TryFrom<&mut ShmBufInner> for &mut zshmmut {
314    type Error = ();
315
316    fn try_from(value: &mut ShmBufInner) -> Result<Self, Self::Error> {
317        match value.is_unique() && value.is_valid() {
318            // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)]
319            // to ShmBufInner type, so it is safe to transmute them in any direction
320            true => Ok(unsafe { core::mem::transmute::<&mut ShmBufInner, &mut zshmmut>(value) }),
321            false => Err(()),
322        }
323    }
324}
325
326impl From<&ShmBufInner> for &zshm {
327    fn from(value: &ShmBufInner) -> Self {
328        // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)]
329        // to ShmBufInner type, so it is safe to transmute them in any direction
330        unsafe { core::mem::transmute::<&ShmBufInner, &zshm>(value) }
331    }
332}
333
334impl From<&mut ShmBufInner> for &mut zshm {
335    fn from(value: &mut ShmBufInner) -> Self {
336        // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)]
337        // to ShmBufInner type, so it is safe to transmute them in any direction
338        unsafe { core::mem::transmute::<&mut ShmBufInner, &mut zshm>(value) }
339    }
340}