// zenoh_shm/api/protocol_implementations/posix/posix_shm_provider_backend.rs
use std::{
borrow::Borrow,
cmp,
collections::BinaryHeap,
num::NonZeroUsize,
sync::{
atomic::{AtomicPtr, AtomicUsize, Ordering},
Mutex,
},
};
use zenoh_core::{zlock, Resolvable, Wait};
use zenoh_result::ZResult;
use super::posix_shm_segment::PosixShmSegment;
use crate::api::{
common::types::ChunkID,
provider::{
chunk::{AllocatedChunk, ChunkDescriptor},
shm_provider_backend::ShmProviderBackend,
types::{AllocAlignment, ChunkAllocResult, MemoryLayout, ZAllocError, ZLayoutError},
},
};
/// A contiguous free region of the shared-memory segment.
///
/// Equality and ordering look at `size` only (the `offset` is ignored), so a
/// `BinaryHeap<Chunk>` always yields the largest free chunk first.
#[derive(Eq, Copy, Clone, Debug)]
struct Chunk {
    /// Position of the region inside the segment.
    offset: ChunkID,
    /// Length of the region.
    size: NonZeroUsize,
}

impl PartialEq for Chunk {
    /// Two chunks are equal when they have the same size, wherever they are located.
    fn eq(&self, other: &Self) -> bool {
        matches!(self.cmp(other), cmp::Ordering::Equal)
    }
}

impl PartialOrd for Chunk {
    /// Total order delegated to [`Ord::cmp`]; never returns `None`.
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Chunk {
    /// Orders chunks by size alone.
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        let (lhs, rhs) = (self.size, other.size);
        lhs.cmp(&rhs)
    }
}
/// First stage of the [`PosixShmProviderBackend`] builder.
///
/// Carries no state; pick a layout with one of the `with_*` methods to obtain a
/// [`LayoutedPosixShmProviderBackendBuilder`].
#[zenoh_macros::unstable_doc]
pub struct PosixShmProviderBackendBuilder;

impl PosixShmProviderBackendBuilder {
    /// Uses an existing layout, either owned or borrowed.
    #[zenoh_macros::unstable_doc]
    pub fn with_layout<Layout: Borrow<MemoryLayout>>(
        self,
        layout: Layout,
    ) -> LayoutedPosixShmProviderBackendBuilder<Layout> {
        LayoutedPosixShmProviderBackendBuilder { layout }
    }

    /// Builds the layout from `size` and `alignment`.
    ///
    /// # Errors
    /// Propagates the [`ZLayoutError`] reported by `MemoryLayout::new`.
    #[zenoh_macros::unstable_doc]
    pub fn with_layout_args(
        self,
        size: usize,
        alignment: AllocAlignment,
    ) -> Result<LayoutedPosixShmProviderBackendBuilder<MemoryLayout>, ZLayoutError> {
        let owned_layout = MemoryLayout::new(size, alignment)?;
        Ok(self.with_layout(owned_layout))
    }

    /// Builds the layout from `size` with `AllocAlignment::default()`.
    ///
    /// # Errors
    /// Propagates the [`ZLayoutError`] reported by `MemoryLayout::new`.
    #[zenoh_macros::unstable_doc]
    pub fn with_size(
        self,
        size: usize,
    ) -> Result<LayoutedPosixShmProviderBackendBuilder<MemoryLayout>, ZLayoutError> {
        self.with_layout_args(size, AllocAlignment::default())
    }
}
/// Second builder stage: a layout has been chosen and the backend can be resolved.
#[zenoh_macros::unstable_doc]
pub struct LayoutedPosixShmProviderBackendBuilder<Layout>
where
    Layout: Borrow<MemoryLayout>,
{
    /// Layout (owned or borrowed) the backend will be created with.
    layout: Layout,
}

// Resolving the builder yields the backend, or the error raised while creating it.
#[zenoh_macros::unstable_doc]
impl<Layout> Resolvable for LayoutedPosixShmProviderBackendBuilder<Layout>
where
    Layout: Borrow<MemoryLayout>,
{
    type To = ZResult<PosixShmProviderBackend>;
}

#[zenoh_macros::unstable_doc]
impl<Layout> Wait for LayoutedPosixShmProviderBackendBuilder<Layout>
where
    Layout: Borrow<MemoryLayout>,
{
    /// Creates the backend from the stored layout.
    fn wait(self) -> <Self as Resolvable>::To {
        let chosen_layout: &MemoryLayout = self.layout.borrow();
        PosixShmProviderBackend::new(chosen_layout)
    }
}
/// Shared-memory provider backend that hands out chunks of one [`PosixShmSegment`].
///
/// Free space is tracked as a size-ordered heap of chunks guarded by a mutex,
/// alongside an atomic counter of the total number of free bytes.
#[zenoh_macros::unstable_doc]
pub struct PosixShmProviderBackend {
    /// Total number of free bytes, summed over every chunk in `free_list`.
    available: AtomicUsize,
    /// Segment that backs every chunk handed out by this backend.
    segment: PosixShmSegment,
    /// Free chunks; the heap pops the largest one first.
    free_list: Mutex<BinaryHeap<Chunk>>,
    /// Alignment taken from the layout the backend was created with.
    alignment: AllocAlignment,
}

impl PosixShmProviderBackend {
    /// Starts building a new backend.
    #[zenoh_macros::unstable_doc]
    pub fn builder() -> PosixShmProviderBackendBuilder {
        PosixShmProviderBackendBuilder
    }

    /// Creates the segment for `layout` and seeds the free list with a single
    /// chunk spanning `layout.size()` bytes from offset 0.
    ///
    /// # Errors
    /// Propagates the error returned by `PosixShmSegment::create`.
    fn new(layout: &MemoryLayout) -> ZResult<Self> {
        let capacity = layout.size();
        let segment = PosixShmSegment::create(capacity)?;

        // Everything is free initially: one chunk covers the requested size.
        let free_list = BinaryHeap::from(vec![Chunk {
            offset: 0,
            size: capacity,
        }]);

        tracing::trace!(
            "Created PosixShmProviderBackend id {}, layout {:?}",
            segment.segment.id(),
            layout
        );

        Ok(Self {
            available: AtomicUsize::new(capacity.get()),
            segment,
            free_list: Mutex::new(free_list),
            alignment: layout.alignment(),
        })
    }
}
impl ShmProviderBackend for PosixShmProviderBackend {
fn alloc(&self, layout: &MemoryLayout) -> ChunkAllocResult {
tracing::trace!("PosixShmProviderBackend::alloc({:?})", layout);
let required_len = layout.size();
if self.available.load(Ordering::Relaxed) < required_len.get() {
tracing::trace!( "PosixShmProviderBackend does not have sufficient free memory to allocate {:?}, try de-fragmenting!", layout);
return Err(ZAllocError::OutOfMemory);
}
let mut guard = zlock!(self.free_list);
match guard.pop() {
Some(mut chunk) if chunk.size >= required_len => {
tracing::trace!("Allocator selected Chunk ({:?})", &chunk);
if chunk.size > required_len {
let free_chunk = Chunk {
offset: chunk.offset + required_len.get() as ChunkID,
size: unsafe {
NonZeroUsize::new_unchecked(chunk.size.get() - required_len.get())
},
};
tracing::trace!("The allocation will leave a Free Chunk: {:?}", &free_chunk);
guard.push(free_chunk);
chunk.size = required_len;
}
self.available
.fetch_sub(chunk.size.get(), Ordering::Relaxed);
let descriptor =
ChunkDescriptor::new(self.segment.segment.id(), chunk.offset, chunk.size);
Ok(AllocatedChunk {
descriptor,
data: unsafe { AtomicPtr::new(self.segment.segment.elem_mut(chunk.offset)) },
})
}
Some(c) => {
tracing::trace!("PosixShmProviderBackend::alloc({:?}) cannot find any big enough chunk\nShmManager::free_list = {:?}", layout, self.free_list);
guard.push(c);
Err(ZAllocError::NeedDefragment)
}
None => {
let err = format!("PosixShmProviderBackend::alloc({:?}) cannot find any available chunk\nShmManager::free_list = {:?}", layout, self.free_list);
#[cfg(feature = "test")]
panic!("{err}");
#[cfg(not(feature = "test"))]
{
tracing::error!("{err}");
Err(ZAllocError::OutOfMemory)
}
}
}
}
fn free(&self, chunk: &ChunkDescriptor) {
let free_chunk = Chunk {
offset: chunk.chunk,
size: chunk.len,
};
self.available
.fetch_add(free_chunk.size.get(), Ordering::Relaxed);
zlock!(self.free_list).push(free_chunk);
}
fn defragment(&self) -> usize {
fn try_merge_adjacent_chunks(a: &Chunk, b: &Chunk) -> Option<Chunk> {
let end_offset = a.offset as usize + a.size.get();
if end_offset == b.offset as usize {
Some(Chunk {
size: unsafe { NonZeroUsize::new_unchecked(a.size.get() + b.size.get()) },
offset: a.offset,
})
} else {
None
}
}
let mut largest = 0usize;
let mut guard = zlock!(self.free_list);
if guard.len() > 1 {
let mut fbs: Vec<Chunk> = guard.drain().collect();
fbs.sort_by(|x, y| x.offset.cmp(&y.offset));
let mut current = fbs.remove(0);
let mut i = 0;
let n = fbs.len();
for chunk in fbs.iter() {
i += 1;
let next = *chunk;
match try_merge_adjacent_chunks(¤t, &next) {
Some(c) => {
current = c;
largest = largest.max(current.size.get());
if i == n {
guard.push(current)
}
}
None => {
guard.push(current);
if i == n {
guard.push(next);
} else {
current = next;
}
}
}
}
}
largest
}
fn available(&self) -> usize {
self.available.load(Ordering::Relaxed)
}
fn layout_for(&self, layout: MemoryLayout) -> Result<MemoryLayout, ZLayoutError> {
layout.extend(self.alignment)
}
}