use shared_memory::{Shmem, ShmemConf};
use crate::error::KGDataError;
pub struct SharedMemBuffer {
pub shmem: Shmem,
pub allocator: LinearAllocator,
}
pub struct ReadonlySharedMemBuffer(pub SharedMemBuffer);
unsafe impl Send for ReadonlySharedMemBuffer {}
unsafe impl Sync for ReadonlySharedMemBuffer {}
impl SharedMemBuffer {
pub fn get_flink(url: &str) -> String {
assert!(
url.starts_with("ipc://") && url.ends_with(".ipc"),
"Cannot create flink from socket url: {}",
url
);
format!("{}.flink", &url["ipc://".len()..url.len() - ".ipc".len()])
}
pub fn new(flink: &str, size: usize) -> Result<Self, KGDataError> {
let shmem = match ShmemConf::new().size(size).flink(flink).create() {
Ok(m) => m,
Err(_) => {
return Err(KGDataError::SharedMemoryError(
"Failed to create shared memory file".to_owned(),
));
}
};
Ok(Self {
allocator: LinearAllocator::new(flink.to_owned(), shmem.as_ptr(), size),
shmem,
})
}
pub fn open(flink: &str) -> Result<Self, KGDataError> {
let shmem = match ShmemConf::new().flink(flink).open() {
Ok(m) => m,
Err(_) => {
return Err(KGDataError::SharedMemoryError(
"Failed to open shared memory file".to_owned(),
));
}
};
Ok(Self {
allocator: LinearAllocator::new(flink.to_owned(), shmem.as_ptr(), shmem.len()),
shmem,
})
}
pub fn restore(&self, ser: &[u8]) -> AllocatedMem {
AllocatedMem::deserialize(self.shmem.as_ptr(), ser)
}
pub fn alloc(&mut self, size: usize) -> Result<AllocatedMem, KGDataError> {
self.allocator.allocate(size)
}
pub fn get_blocks(&self) -> Vec<AllocatedMem> {
let mut blocks = Vec::new();
let mut pos = 0;
while pos < self.shmem.len() {
let block = AllocatedMem::from_position(self.shmem.as_ptr(), pos);
pos = block.end;
blocks.push(block);
}
blocks
}
}
pub struct AllocatedMem {
pub mem: *mut u8,
pub begin: usize,
pub end: usize,
}
impl AllocatedMem {
pub const HEADER: usize = 5;
#[inline]
pub fn get_slice(&self) -> &[u8] {
unsafe {
std::slice::from_raw_parts(
self.mem.add(self.begin + AllocatedMem::HEADER),
self.end - self.begin - AllocatedMem::HEADER,
)
}
}
#[inline]
pub fn get_slice_mut(&self) -> &mut [u8] {
unsafe {
std::slice::from_raw_parts_mut(
self.mem.add(self.begin + AllocatedMem::HEADER),
self.end - self.begin - AllocatedMem::HEADER,
)
}
}
#[inline]
pub fn alloc(mem: *mut u8, begin: usize, end: usize) -> Result<AllocatedMem, KGDataError> {
if begin + AllocatedMem::HEADER > end {
return Err(KGDataError::SharedMemoryError(format!(
"Cannot allocate a block of memory with size less than {} bytes",
AllocatedMem::HEADER
)));
}
AllocatedMem::init(mem, 1, begin, end);
Ok(Self { begin, end, mem })
}
#[inline]
pub fn init(mem: *mut u8, is_occupied: u8, begin: usize, end: usize) {
let usable_space = ((end - begin - AllocatedMem::HEADER) as u32).to_le_bytes();
unsafe {
mem.add(begin).write(is_occupied);
mem.add(begin + 1)
.copy_from_nonoverlapping(&usable_space as *const u8, 4);
}
}
#[inline]
pub fn is_free(mem: *mut u8, pos: usize) -> bool {
unsafe { mem.add(pos).read() == 0 }
}
#[inline]
pub fn free(&mut self) {
unsafe {
self.mem.add(self.begin).write(0);
}
}
#[inline]
pub fn get_allocated_size(mem: *mut u8, pos: usize) -> usize {
let blocksize: u32 = u32::from_le_bytes(
unsafe { std::slice::from_raw_parts(mem.add(pos + 1), 4) }
.try_into()
.unwrap(),
);
blocksize as usize + 5
}
pub fn serialize(&self) -> [u8; 4] {
(self.begin as u32).to_le_bytes()
}
pub fn deserialize(mem: *mut u8, value: &[u8]) -> AllocatedMem {
let begin = u32::from_le_bytes(value.try_into().unwrap()) as usize;
Self {
mem,
begin,
end: begin + AllocatedMem::get_allocated_size(mem, begin),
}
}
pub fn from_position(mem: *mut u8, begin: usize) -> AllocatedMem {
Self {
mem,
begin,
end: begin + AllocatedMem::get_allocated_size(mem, begin),
}
}
}
pub struct LinearAllocator {
id: String,
mem: *mut u8,
size: usize,
begin: usize,
end: usize,
usable_end: usize,
}
impl LinearAllocator {
pub const CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1);
pub fn new(id: String, mem: *mut u8, size: usize) -> Self {
AllocatedMem::init(mem, 0, 0, size);
Self {
id,
mem,
size,
begin: 0,
end: size,
usable_end: size - AllocatedMem::HEADER,
}
}
pub fn allocate(&mut self, size: usize) -> Result<AllocatedMem, KGDataError> {
let actual_size = size + AllocatedMem::HEADER;
if actual_size > self.size {
return Err(KGDataError::SharedMemoryError(
format!(
"Cannot allocate more memory than what have been assigned (request {} bytes but only have {} bytes)",
size, self.size),
));
}
self._alloc(actual_size)
}
fn _alloc(&mut self, size: usize) -> Result<AllocatedMem, KGDataError> {
while size > self.end - self.begin && self.end < self.size {
self.end += self.try_free(self.end)?;
}
if size < self.end - self.begin {
self.begin += size;
if self.begin < self.usable_end {
AllocatedMem::init(self.mem, 0, self.begin, self.end);
}
return AllocatedMem::alloc(self.mem, self.begin - size, self.begin);
} else {
AllocatedMem::init(self.mem, 0, self.begin, self.end);
}
self.begin = 0;
self.end = 0;
while let Some(freed) = self.try_free_nonblocking(self.end) {
self.end += freed;
if self.end >= self.size {
break;
}
}
self._alloc(size)
}
fn try_free(&self, pos: usize) -> Result<usize, KGDataError> {
if pos >= self.usable_end {
return Ok(self.end - pos);
}
if !AllocatedMem::is_free(self.mem, pos) {
for _ in 0..5000 {
if AllocatedMem::is_free(self.mem, pos) {
break;
}
std::thread::sleep(LinearAllocator::CHECK_INTERVAL);
}
if !AllocatedMem::is_free(self.mem, pos) {
return Err(KGDataError::SharedMemoryError(
format!(
"Encounter possible deadlock situation as we have wait for more than 5s for a free block of shared memory at {}:{}",
&self.id, pos,
),
));
}
}
Ok(AllocatedMem::get_allocated_size(self.mem, pos))
}
fn try_free_nonblocking(&self, pos: usize) -> Option<usize> {
if pos >= self.usable_end {
return Some(self.usable_end - pos);
}
if AllocatedMem::is_free(self.mem, pos) {
Some(AllocatedMem::get_allocated_size(self.mem, pos))
} else {
None
}
}
}