zenoh_shm/api/protocol_implementations/posix/
posix_shm_provider_backend.rs

1//
2// Copyright (c) 2023 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14
15use std::{
16    borrow::Borrow,
17    cmp,
18    collections::BinaryHeap,
19    num::NonZeroUsize,
20    sync::{
21        atomic::{AtomicPtr, AtomicUsize, Ordering},
22        Mutex,
23    },
24};
25
26use zenoh_core::{zlock, Resolvable, Wait};
27use zenoh_result::ZResult;
28
29use super::posix_shm_segment::PosixShmSegment;
30use crate::api::{
31    common::types::ChunkID,
32    provider::{
33        chunk::{AllocatedChunk, ChunkDescriptor},
34        shm_provider_backend::ShmProviderBackend,
35        types::{AllocAlignment, ChunkAllocResult, MemoryLayout, ZAllocError, ZLayoutError},
36    },
37};
38
/// A free-list entry describing one contiguous free region of the segment.
///
/// NOTE: ordering and equality are intentionally defined on `size` only
/// (see the manual `Ord`/`PartialEq` impls below) so that the `BinaryHeap`
/// free list always pops the largest free chunk first.
#[derive(Eq, Copy, Clone, Debug)]
struct Chunk {
    // Offset of this chunk from the start of the segment
    // (same unit as the segment's element index — presumably bytes; confirm
    // against PosixShmSegment).
    offset: ChunkID,
    // Length of this chunk in bytes; never zero by construction.
    size: NonZeroUsize,
}
44
45impl Ord for Chunk {
46    fn cmp(&self, other: &Self) -> cmp::Ordering {
47        self.size.cmp(&other.size)
48    }
49}
50
51impl PartialOrd for Chunk {
52    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
53        Some(self.cmp(other))
54    }
55}
56
57impl PartialEq for Chunk {
58    fn eq(&self, other: &Self) -> bool {
59        self.size == other.size
60    }
61}
62
/// Builder to create posix SHM provider.
/// Chain with [`with_layout`](Self::with_layout),
/// [`with_layout_args`](Self::with_layout_args) or
/// [`with_size`](Self::with_size) to obtain a layouted builder.
#[zenoh_macros::unstable_doc]
pub struct PosixShmProviderBackendBuilder;
66
67impl PosixShmProviderBackendBuilder {
68    /// Use existing layout
69    #[zenoh_macros::unstable_doc]
70    pub fn with_layout<Layout: Borrow<MemoryLayout>>(
71        self,
72        layout: Layout,
73    ) -> LayoutedPosixShmProviderBackendBuilder<Layout> {
74        LayoutedPosixShmProviderBackendBuilder { layout }
75    }
76
77    /// Construct layout in-place using arguments
78    #[zenoh_macros::unstable_doc]
79    pub fn with_layout_args(
80        self,
81        size: usize,
82        alignment: AllocAlignment,
83    ) -> Result<LayoutedPosixShmProviderBackendBuilder<MemoryLayout>, ZLayoutError> {
84        let layout = MemoryLayout::new(size, alignment)?;
85        Ok(LayoutedPosixShmProviderBackendBuilder { layout })
86    }
87
88    /// Construct layout in-place from size (default alignment will be used)
89    #[zenoh_macros::unstable_doc]
90    pub fn with_size(
91        self,
92        size: usize,
93    ) -> Result<LayoutedPosixShmProviderBackendBuilder<MemoryLayout>, ZLayoutError> {
94        let layout = MemoryLayout::new(size, AllocAlignment::default())?;
95        Ok(LayoutedPosixShmProviderBackendBuilder { layout })
96    }
97}
98
/// Builder stage holding a validated memory layout; resolves (via `Wait`)
/// into a `PosixShmProviderBackend`.
#[zenoh_macros::unstable_doc]
pub struct LayoutedPosixShmProviderBackendBuilder<Layout: Borrow<MemoryLayout>> {
    // The layout to create the backing segment with; either owned or borrowed.
    layout: Layout,
}
103
#[zenoh_macros::unstable_doc]
// Resolving this builder yields the constructed backend (or an error from
// segment creation / size re-layout).
impl<Layout: Borrow<MemoryLayout>> Resolvable for LayoutedPosixShmProviderBackendBuilder<Layout> {
    type To = ZResult<PosixShmProviderBackend>;
}
108
109#[zenoh_macros::unstable_doc]
110impl<Layout: Borrow<MemoryLayout>> Wait for LayoutedPosixShmProviderBackendBuilder<Layout> {
111    fn wait(self) -> <Self as Resolvable>::To {
112        PosixShmProviderBackend::new(self.layout.borrow())
113    }
114}
115
/// A backend for ShmProvider based on POSIX shared memory.
/// This is the default general-purpose backend shipped with Zenoh.
#[zenoh_macros::unstable_doc]
pub struct PosixShmProviderBackend {
    // Best-effort count of free bytes; read without the lock as a fast-path
    // check in `alloc`, updated in `alloc`/`free`.
    available: AtomicUsize,
    // The underlying POSIX shared-memory segment.
    segment: PosixShmSegment,
    // Free chunks, ordered as a max-heap by size (worst-fit allocation).
    free_list: Mutex<BinaryHeap<Chunk>>,
    // Alignment the backend guarantees; used by `layout_for`.
    alignment: AllocAlignment,
}
125
126impl PosixShmProviderBackend {
127    /// Get the builder to construct a new instance
128    #[zenoh_macros::unstable_doc]
129    pub fn builder() -> PosixShmProviderBackendBuilder {
130        PosixShmProviderBackendBuilder
131    }
132
133    fn new(layout: &MemoryLayout) -> ZResult<Self> {
134        let segment = PosixShmSegment::create(layout.size())?;
135
136        // because of platform specific, our shm segment is >= requested size, so in order to utilize
137        // additional memory we re-layout the size
138        let real_size = segment.segment.elem_count().get();
139        let aligned_size = (real_size
140            - (real_size % layout.alignment().get_alignment_value().get()))
141        .try_into()?;
142
143        let mut free_list = BinaryHeap::new();
144        let root_chunk = Chunk {
145            offset: 0,
146            size: aligned_size,
147        };
148        free_list.push(root_chunk);
149
150        tracing::trace!(
151            "Created PosixShmProviderBackend id {}, layout {:?}, aligned size {aligned_size}",
152            segment.segment.id(),
153            layout
154        );
155
156        Ok(Self {
157            available: AtomicUsize::new(aligned_size.get()),
158            segment,
159            free_list: Mutex::new(free_list),
160            alignment: layout.alignment(),
161        })
162    }
163}
164
165impl ShmProviderBackend for PosixShmProviderBackend {
166    fn alloc(&self, layout: &MemoryLayout) -> ChunkAllocResult {
167        tracing::trace!("PosixShmProviderBackend::alloc({:?})", layout);
168
169        let required_len = layout.size();
170
171        if self.available.load(Ordering::Relaxed) < required_len.get() {
172            tracing::trace!( "PosixShmProviderBackend does not have sufficient free memory to allocate {:?}, try de-fragmenting!", layout);
173            return Err(ZAllocError::OutOfMemory);
174        }
175
176        let mut guard = zlock!(self.free_list);
177        // The strategy taken is the same for some Unix System V implementations -- as described in the
178        // famous Bach's book --  in essence keep an ordered list of free slot and always look for the
179        // biggest as that will give the biggest left-over.
180        match guard.pop() {
181            Some(mut chunk) if chunk.size >= required_len => {
182                // NOTE: don't loose any chunks here, as it will lead to memory leak
183                tracing::trace!("Allocator selected Chunk ({:?})", &chunk);
184
185                if chunk.size > required_len {
186                    let free_chunk = Chunk {
187                        offset: chunk.offset + required_len.get() as ChunkID,
188                        // SAFETY: this is safe because we always operate on a leftover, which is checked above!
189                        size: unsafe {
190                            NonZeroUsize::new_unchecked(chunk.size.get() - required_len.get())
191                        },
192                    };
193                    tracing::trace!("The allocation will leave a Free Chunk: {:?}", &free_chunk);
194                    guard.push(free_chunk);
195                    chunk.size = required_len;
196                }
197
198                self.available
199                    .fetch_sub(chunk.size.get(), Ordering::Relaxed);
200
201                let descriptor =
202                    ChunkDescriptor::new(self.segment.segment.id(), chunk.offset, chunk.size);
203
204                Ok(AllocatedChunk {
205                    descriptor,
206                    data: unsafe { AtomicPtr::new(self.segment.segment.elem_mut(chunk.offset)) },
207                })
208            }
209            Some(c) => {
210                tracing::trace!("PosixShmProviderBackend::alloc({:?}) cannot find any big enough chunk\nShmManager::free_list = {:?}", layout, self.free_list);
211                guard.push(c);
212                Err(ZAllocError::NeedDefragment)
213            }
214            None => {
215                // NOTE: that should never happen! If this happens - there is a critical bug somewhere around!
216                let err = format!("PosixShmProviderBackend::alloc({:?}) cannot find any available chunk\nShmManager::free_list = {:?}", layout, self.free_list);
217                #[cfg(feature = "test")]
218                panic!("{err}");
219                #[cfg(not(feature = "test"))]
220                {
221                    tracing::error!("{err}");
222                    Err(ZAllocError::OutOfMemory)
223                }
224            }
225        }
226    }
227
228    fn free(&self, chunk: &ChunkDescriptor) {
229        let free_chunk = Chunk {
230            offset: chunk.chunk,
231            size: chunk.len,
232        };
233        self.available
234            .fetch_add(free_chunk.size.get(), Ordering::Relaxed);
235        zlock!(self.free_list).push(free_chunk);
236    }
237
238    fn defragment(&self) -> usize {
239        fn try_merge_adjacent_chunks(a: &Chunk, b: &Chunk) -> Option<Chunk> {
240            let end_offset = a.offset as usize + a.size.get();
241            if end_offset == b.offset as usize {
242                Some(Chunk {
243                    // SAFETY: this is safe because we operate on non-zero sizes and it will never overflow
244                    size: unsafe { NonZeroUsize::new_unchecked(a.size.get() + b.size.get()) },
245                    offset: a.offset,
246                })
247            } else {
248                None
249            }
250        }
251
252        let mut largest = 0usize;
253
254        // TODO: optimize this!
255        // this is an old legacy algo for merging adjacent chunks
256        // we extract chunks to separate container, sort them by offset and then check each chunk for
257        // adjacence with neighbour. Adjacent chunks are joined and returned back to temporary container.
258        // If chunk is not adjacent with it's neighbour, it is placed back to self.free_list
259        let mut guard = zlock!(self.free_list);
260        if guard.len() > 1 {
261            let mut fbs: Vec<Chunk> = guard.drain().collect();
262            fbs.sort_by(|x, y| x.offset.cmp(&y.offset));
263            let mut current = fbs.remove(0);
264            let mut i = 0;
265            let n = fbs.len();
266            for chunk in fbs.iter() {
267                i += 1;
268                let next = *chunk;
269                match try_merge_adjacent_chunks(&current, &next) {
270                    Some(c) => {
271                        current = c;
272                        largest = largest.max(current.size.get());
273                        if i == n {
274                            guard.push(current)
275                        }
276                    }
277                    None => {
278                        guard.push(current);
279                        if i == n {
280                            guard.push(next);
281                        } else {
282                            current = next;
283                        }
284                    }
285                }
286            }
287        }
288        largest
289    }
290
291    fn available(&self) -> usize {
292        self.available.load(Ordering::Relaxed)
293    }
294
295    fn layout_for(&self, layout: MemoryLayout) -> Result<MemoryLayout, ZLayoutError> {
296        layout.extend(self.alignment)
297    }
298}