zenoh_shm/api/protocol_implementations/posix/
posix_shm_provider_backend.rs1use std::{
16 borrow::Borrow,
17 cmp,
18 collections::BinaryHeap,
19 num::NonZeroUsize,
20 sync::{
21 atomic::{AtomicPtr, AtomicUsize, Ordering},
22 Mutex,
23 },
24};
25
26use zenoh_core::{zlock, Resolvable, Wait};
27use zenoh_result::ZResult;
28
29use super::posix_shm_segment::PosixShmSegment;
30use crate::api::{
31 common::types::ChunkID,
32 provider::{
33 chunk::{AllocatedChunk, ChunkDescriptor},
34 shm_provider_backend::ShmProviderBackend,
35 types::{AllocAlignment, ChunkAllocResult, MemoryLayout, ZAllocError, ZLayoutError},
36 },
37};
38
39#[derive(Eq, Copy, Clone, Debug)]
40struct Chunk {
41 offset: ChunkID,
42 size: NonZeroUsize,
43}
44
45impl Ord for Chunk {
46 fn cmp(&self, other: &Self) -> cmp::Ordering {
47 self.size.cmp(&other.size)
48 }
49}
50
51impl PartialOrd for Chunk {
52 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
53 Some(self.cmp(other))
54 }
55}
56
57impl PartialEq for Chunk {
58 fn eq(&self, other: &Self) -> bool {
59 self.size == other.size
60 }
61}
62
63#[zenoh_macros::unstable_doc]
65pub struct PosixShmProviderBackendBuilder;
66
67impl PosixShmProviderBackendBuilder {
68 #[zenoh_macros::unstable_doc]
70 pub fn with_layout<Layout: Borrow<MemoryLayout>>(
71 self,
72 layout: Layout,
73 ) -> LayoutedPosixShmProviderBackendBuilder<Layout> {
74 LayoutedPosixShmProviderBackendBuilder { layout }
75 }
76
77 #[zenoh_macros::unstable_doc]
79 pub fn with_layout_args(
80 self,
81 size: usize,
82 alignment: AllocAlignment,
83 ) -> Result<LayoutedPosixShmProviderBackendBuilder<MemoryLayout>, ZLayoutError> {
84 let layout = MemoryLayout::new(size, alignment)?;
85 Ok(LayoutedPosixShmProviderBackendBuilder { layout })
86 }
87
88 #[zenoh_macros::unstable_doc]
90 pub fn with_size(
91 self,
92 size: usize,
93 ) -> Result<LayoutedPosixShmProviderBackendBuilder<MemoryLayout>, ZLayoutError> {
94 let layout = MemoryLayout::new(size, AllocAlignment::default())?;
95 Ok(LayoutedPosixShmProviderBackendBuilder { layout })
96 }
97}
98
99#[zenoh_macros::unstable_doc]
100pub struct LayoutedPosixShmProviderBackendBuilder<Layout: Borrow<MemoryLayout>> {
101 layout: Layout,
102}
103
104#[zenoh_macros::unstable_doc]
105impl<Layout: Borrow<MemoryLayout>> Resolvable for LayoutedPosixShmProviderBackendBuilder<Layout> {
106 type To = ZResult<PosixShmProviderBackend>;
107}
108
109#[zenoh_macros::unstable_doc]
110impl<Layout: Borrow<MemoryLayout>> Wait for LayoutedPosixShmProviderBackendBuilder<Layout> {
111 fn wait(self) -> <Self as Resolvable>::To {
112 PosixShmProviderBackend::new(self.layout.borrow())
113 }
114}
115
116#[zenoh_macros::unstable_doc]
119pub struct PosixShmProviderBackend {
120 available: AtomicUsize,
121 segment: PosixShmSegment,
122 free_list: Mutex<BinaryHeap<Chunk>>,
123 alignment: AllocAlignment,
124}
125
126impl PosixShmProviderBackend {
127 #[zenoh_macros::unstable_doc]
129 pub fn builder() -> PosixShmProviderBackendBuilder {
130 PosixShmProviderBackendBuilder
131 }
132
133 fn new(layout: &MemoryLayout) -> ZResult<Self> {
134 let segment = PosixShmSegment::create(layout.size())?;
135
136 let real_size = segment.segment.elem_count().get();
139 let aligned_size = (real_size
140 - (real_size % layout.alignment().get_alignment_value().get()))
141 .try_into()?;
142
143 let mut free_list = BinaryHeap::new();
144 let root_chunk = Chunk {
145 offset: 0,
146 size: aligned_size,
147 };
148 free_list.push(root_chunk);
149
150 tracing::trace!(
151 "Created PosixShmProviderBackend id {}, layout {:?}, aligned size {aligned_size}",
152 segment.segment.id(),
153 layout
154 );
155
156 Ok(Self {
157 available: AtomicUsize::new(aligned_size.get()),
158 segment,
159 free_list: Mutex::new(free_list),
160 alignment: layout.alignment(),
161 })
162 }
163}
164
165impl ShmProviderBackend for PosixShmProviderBackend {
166 fn alloc(&self, layout: &MemoryLayout) -> ChunkAllocResult {
167 tracing::trace!("PosixShmProviderBackend::alloc({:?})", layout);
168
169 let required_len = layout.size();
170
171 if self.available.load(Ordering::Relaxed) < required_len.get() {
172 tracing::trace!( "PosixShmProviderBackend does not have sufficient free memory to allocate {:?}, try de-fragmenting!", layout);
173 return Err(ZAllocError::OutOfMemory);
174 }
175
176 let mut guard = zlock!(self.free_list);
177 match guard.pop() {
181 Some(mut chunk) if chunk.size >= required_len => {
182 tracing::trace!("Allocator selected Chunk ({:?})", &chunk);
184
185 if chunk.size > required_len {
186 let free_chunk = Chunk {
187 offset: chunk.offset + required_len.get() as ChunkID,
188 size: unsafe {
190 NonZeroUsize::new_unchecked(chunk.size.get() - required_len.get())
191 },
192 };
193 tracing::trace!("The allocation will leave a Free Chunk: {:?}", &free_chunk);
194 guard.push(free_chunk);
195 chunk.size = required_len;
196 }
197
198 self.available
199 .fetch_sub(chunk.size.get(), Ordering::Relaxed);
200
201 let descriptor =
202 ChunkDescriptor::new(self.segment.segment.id(), chunk.offset, chunk.size);
203
204 Ok(AllocatedChunk {
205 descriptor,
206 data: unsafe { AtomicPtr::new(self.segment.segment.elem_mut(chunk.offset)) },
207 })
208 }
209 Some(c) => {
210 tracing::trace!("PosixShmProviderBackend::alloc({:?}) cannot find any big enough chunk\nShmManager::free_list = {:?}", layout, self.free_list);
211 guard.push(c);
212 Err(ZAllocError::NeedDefragment)
213 }
214 None => {
215 let err = format!("PosixShmProviderBackend::alloc({:?}) cannot find any available chunk\nShmManager::free_list = {:?}", layout, self.free_list);
217 #[cfg(feature = "test")]
218 panic!("{err}");
219 #[cfg(not(feature = "test"))]
220 {
221 tracing::error!("{err}");
222 Err(ZAllocError::OutOfMemory)
223 }
224 }
225 }
226 }
227
228 fn free(&self, chunk: &ChunkDescriptor) {
229 let free_chunk = Chunk {
230 offset: chunk.chunk,
231 size: chunk.len,
232 };
233 self.available
234 .fetch_add(free_chunk.size.get(), Ordering::Relaxed);
235 zlock!(self.free_list).push(free_chunk);
236 }
237
238 fn defragment(&self) -> usize {
239 fn try_merge_adjacent_chunks(a: &Chunk, b: &Chunk) -> Option<Chunk> {
240 let end_offset = a.offset as usize + a.size.get();
241 if end_offset == b.offset as usize {
242 Some(Chunk {
243 size: unsafe { NonZeroUsize::new_unchecked(a.size.get() + b.size.get()) },
245 offset: a.offset,
246 })
247 } else {
248 None
249 }
250 }
251
252 let mut largest = 0usize;
253
254 let mut guard = zlock!(self.free_list);
260 if guard.len() > 1 {
261 let mut fbs: Vec<Chunk> = guard.drain().collect();
262 fbs.sort_by(|x, y| x.offset.cmp(&y.offset));
263 let mut current = fbs.remove(0);
264 let mut i = 0;
265 let n = fbs.len();
266 for chunk in fbs.iter() {
267 i += 1;
268 let next = *chunk;
269 match try_merge_adjacent_chunks(¤t, &next) {
270 Some(c) => {
271 current = c;
272 largest = largest.max(current.size.get());
273 if i == n {
274 guard.push(current)
275 }
276 }
277 None => {
278 guard.push(current);
279 if i == n {
280 guard.push(next);
281 } else {
282 current = next;
283 }
284 }
285 }
286 }
287 }
288 largest
289 }
290
291 fn available(&self) -> usize {
292 self.available.load(Ordering::Relaxed)
293 }
294
295 fn layout_for(&self, layout: MemoryLayout) -> Result<MemoryLayout, ZLayoutError> {
296 layout.extend(self.alignment)
297 }
298}