use std::{
collections::BTreeMap,
marker::PhantomData,
ops::Bound::Included,
os::fd::RawFd,
sync::{atomic::AtomicU64, Arc, Mutex, Weak},
};
use carton_macros::for_each_numeric_carton_type;
use dashmap::DashMap;
use once_cell::sync::Lazy;
use super::{
alloc::{AsPtr, NumericTensorType, TypedAlloc},
alloc_pool::{AllocItem, PoolAllocator, PoolItem},
storage::TensorStorage,
};
/// Backing storage for a tensor allocated by [`SHMAllocator`].
#[derive(Debug)]
pub enum SHMTensorStorage {
    /// Numeric data that lives inside a shared-memory region.
    Numeric {
        /// Pooled handle wrapping an `Arc` to the region; holding it keeps the mapping alive.
        region: PoolItem<Arc<SHMRegion>>,
        /// Byte offset of the first element from the region's start address.
        offset: usize,
    },
    /// String data, held in an ordinary pooled `Vec<String>` rather than in shared memory.
    String(PoolItem<Vec<String>>),
}
impl<T> AsPtr<T> for SHMTensorStorage {
    /// Returns a read-only pointer to the first element.
    ///
    /// For numeric data this is `offset` bytes past the region's start address. For string
    /// data it is the start of the string vector's buffer. The pointer is reinterpreted as
    /// `*const T` without any type check.
    fn as_ptr(&self) -> *const T {
        match self {
            Self::String(strings) => strings.as_ptr() as *const T,
            Self::Numeric { region, offset } => {
                let addr = region.start_addr + *offset;
                addr as *const T
            }
        }
    }

    /// Mutable counterpart of [`AsPtr::as_ptr`]; the address computation is the same.
    fn as_mut_ptr(&mut self) -> *mut T {
        match self {
            Self::String(strings) => strings.as_mut_ptr() as *mut T,
            Self::Numeric { region, offset } => {
                let addr = region.start_addr + *offset;
                addr as *mut T
            }
        }
    }
}
/// Boundary marker stored in `ADDR_SPACE`, keyed by address.
///
/// `SHMRegion::new` records a `ShmRegionStart` at a region's first byte and a `ShmRegionEnd`
/// one past its last byte. `SHMAllocator::get_shm_region` looks up the nearest marker at or
/// below an address to decide which region, if any, contains it.
enum MemoryMarker {
    /// A region with this id begins at the keyed address.
    ShmRegionStart(SHMRegionID),
    /// A region with this id ends (exclusive) at the keyed address.
    ShmRegionEnd(SHMRegionID),
}
/// Process-unique identifier for an [`SHMRegion`], taken from the `SHM_REGION_ID_GEN` counter.
/// Used as the key of `ALLOCATED_REGIONS` and as the payload of `MemoryMarker`s.
#[derive(Hash, PartialEq, Eq, Debug, Clone, Copy)]
struct SHMRegionID(u64);
static SHM_REGION_ID_GEN: AtomicU64 = AtomicU64::new(0);
static ADDR_SPACE: Lazy<Mutex<BTreeMap<usize, MemoryMarker>>> =
Lazy::new(|| Mutex::new(Default::default()));
static ALLOCATED_REGIONS: Lazy<DashMap<SHMRegionID, Weak<SHMRegion>>> =
Lazy::new(|| Default::default());
/// A shared-memory file mapped read/write (`MAP_SHARED`) into this process.
///
/// Dropping it unmaps the memory, closes the descriptor, and removes the region from the
/// global lookup tables (`ADDR_SPACE`, `ALLOCATED_REGIONS`).
#[derive(Debug)]
pub struct SHMRegion {
    /// Unique id; key in `ALLOCATED_REGIONS` and payload of this region's `ADDR_SPACE` markers.
    id: SHMRegionID,
    /// Descriptor from `memfd_create` (or its `shm_open`-based emulation on macOS).
    fd: RawFd,
    /// Address at which the file is mapped.
    start_addr: usize,
    /// Length of the mapping (and of the file) in bytes.
    len: usize,
}
/// Creates an anonymous file with the platform `memfd_create`, named `carton_memfd`.
///
/// Returns the new descriptor, or `-1` on failure (with `errno` set).
/// NOTE(review): flags are `0`, so no `MFD_CLOEXEC`. The fd is therefore inherited across
/// `exec`; confirm that is intended.
///
/// # Safety
///
/// Thin libc wrapper. The caller owns the returned descriptor and must close it.
#[cfg(not(target_os = "macos"))]
unsafe fn memfd_create() -> RawFd {
    const NAME: &[u8] = b"carton_memfd\0";
    let flags = 0;
    libc::memfd_create(NAME.as_ptr().cast(), flags)
}
/// Emulates `memfd_create` on macOS with a POSIX shared-memory object.
///
/// The object is opened with `O_CREAT | O_EXCL | O_RDWR` under a name unique to this process
/// (pid) and call (counter). It is unlinked immediately, so only the returned descriptor
/// refers to it.
///
/// Returns the new descriptor, or `-1` on failure (with `errno` set by `shm_open`).
///
/// # Safety
///
/// Thin libc wrapper. The caller owns the returned descriptor and must close it.
#[cfg(target_os = "macos")]
unsafe fn memfd_create() -> RawFd {
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    // NUL-terminated because it is passed straight to the C API.
    let shmpath = format!(
        "/carton_shm_{}_{}\0",
        std::process::id(),
        COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    );
    let fd = libc::shm_open(
        shmpath.as_ptr() as _,
        libc::O_CREAT | libc::O_EXCL | libc::O_RDWR,
        (libc::S_IRUSR | libc::S_IWUSR) as libc::c_uint,
    );
    // Only unlink a name we actually created. If `shm_open` failed (e.g. EEXIST from a stale
    // object), unlinking would remove an object we never opened and could overwrite the
    // `errno` that the caller reports.
    if fd != -1 {
        libc::shm_unlink(shmpath.as_ptr() as _);
    }
    fd
}
impl SHMRegion {
fn new(size_bytes: usize) -> Arc<Self> {
unsafe {
let fd = memfd_create();
if fd == -1 {
panic!("memfd_create failed")
}
if libc::ftruncate(fd, size_bytes as _) == -1 {
panic!("ftruncate failed")
}
let addr = libc::mmap(
std::ptr::null_mut(),
size_bytes,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_SHARED,
fd,
0,
);
if addr == libc::MAP_FAILED {
panic!("mmap failed");
}
let region = SHMRegion {
id: SHMRegionID(
SHM_REGION_ID_GEN.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
),
fd,
start_addr: addr as _,
len: size_bytes,
};
{
let mut guard = ADDR_SPACE.lock().unwrap();
guard.insert(addr as _, MemoryMarker::ShmRegionStart(region.id));
guard.insert(
addr as usize + size_bytes,
MemoryMarker::ShmRegionEnd(region.id),
);
}
let id = region.id;
let out = Arc::new(region);
ALLOCATED_REGIONS.insert(id, Arc::downgrade(&out));
out
}
}
}
impl Drop for SHMRegion {
fn drop(&mut self) {
unsafe {
let res = libc::munmap(self.start_addr as _, self.len);
if res != 0 {
panic!("munmap failed")
}
let res = libc::close(self.fd);
if res != 0 {
panic!("close failed")
}
{
let mut guard = ADDR_SPACE.lock().unwrap();
guard.remove(&self.start_addr);
guard.remove(&(self.start_addr + self.len));
}
ALLOCATED_REGIONS.remove(&self.id);
}
}
}
/// Lets `PoolAllocator` create and size shared-memory regions.
///
/// For this item type the requested size is a byte count: the numeric `TypedAlloc`
/// implementations pass `size_bytes` to the pool.
impl AllocItem for Arc<SHMRegion> {
    fn new(size_bytes: usize) -> Self {
        SHMRegion::new(size_bytes)
    }

    fn len(&self) -> usize {
        let region: &SHMRegion = self;
        region.len
    }
}
/// Allocator for tensor storage. Numeric tensors live in shared-memory regions and string
/// tensors in ordinary vectors; both can be recycled through pools.
pub struct SHMAllocator {
    /// When `false`, every allocation bypasses the pools and creates fresh storage.
    use_pool: bool,
    /// Pool of shared-memory regions; requests to it are sized in bytes.
    numeric: Arc<PoolAllocator<Arc<SHMRegion>>>,
    /// Pool of string vectors; requests to it are sized in elements.
    string: Arc<PoolAllocator<Vec<String>>>,
}
impl SHMAllocator {
    /// Creates an allocator that recycles storage through its pools.
    pub(crate) fn new() -> Self {
        Self {
            use_pool: true,
            numeric: Arc::new(PoolAllocator::new()),
            string: Arc::new(PoolAllocator::new()),
        }
    }

    /// Creates an allocator whose allocations always bypass the pools (benchmarking only).
    #[cfg(feature = "benchmark")]
    pub(crate) fn without_pool() -> Self {
        Self {
            use_pool: false,
            numeric: Arc::new(PoolAllocator::new()),
            string: Arc::new(PoolAllocator::new()),
        }
    }

    /// Returns the live shared-memory region that contains `addr`, if any.
    ///
    /// Finds the nearest `ADDR_SPACE` marker at or below `addr`. If it is a region start,
    /// the region is resolved through `ALLOCATED_REGIONS` and upgraded. Adjacent regions
    /// share a key in `ADDR_SPACE` (one's end is the next one's start), so a region's end
    /// marker can be overwritten. The resolved region is therefore also checked to actually
    /// contain `addr`; otherwise an address past its end could be attributed to it.
    fn get_shm_region(addr: usize) -> Option<Arc<SHMRegion>> {
        // Copy the id out so the ADDR_SPACE lock is released before touching
        // ALLOCATED_REGIONS or dropping an `Arc` (SHMRegion's Drop locks ADDR_SPACE).
        let id = match ADDR_SPACE
            .lock()
            .unwrap()
            .range((Included(&0), Included(&addr)))
            .next_back()
        {
            Some((_, MemoryMarker::ShmRegionStart(id))) => *id,
            Some((_, MemoryMarker::ShmRegionEnd(_))) | None => return None,
        };

        let region = ALLOCATED_REGIONS.get(&id)?.upgrade()?;
        if addr < region.start_addr + region.len {
            Some(region)
        } else {
            None
        }
    }
}
// Numeric element types are stored in shared memory. Each implementation sizes the request
// in bytes and obtains a region either from the pool or, when pooling is disabled, directly
// from `SHMRegion::new`.
for_each_numeric_carton_type! {
    $(
        impl TypedAlloc<$RustType> for SHMAllocator {
            type Output = SHMTensorStorage;
            fn alloc(&self, numel: usize) -> Self::Output {
                // Checked, so an oversized request panics instead of wrapping around (in
                // release builds) and returning a region smaller than `numel` elements.
                let size_bytes = numel
                    .checked_mul(std::mem::size_of::<$RustType>())
                    .expect("tensor size in bytes overflows usize");
                let out = if !self.use_pool {
                    SHMRegion::new(size_bytes).into()
                } else {
                    self.numeric.alloc(size_bytes)
                };
                SHMTensorStorage::Numeric { region: out, offset: 0 }
            }
        }
    )*
}
impl TypedAlloc<String> for SHMAllocator {
    type Output = SHMTensorStorage;

    /// Allocates a vector of `numel` empty strings. The vector comes from the string pool
    /// when pooling is enabled; otherwise it is a freshly built `Vec`.
    fn alloc(&self, numel: usize) -> Self::Output {
        let strings = if self.use_pool {
            self.string.alloc(numel)
        } else {
            let fresh = vec![String::default(); numel];
            fresh.into()
        };
        SHMTensorStorage::String(strings)
    }
}
impl<T: NumericTensorType> From<ndarray::ArrayViewD<'_, T>> for TensorStorage<T, SHMTensorStorage>
where
    SHMAllocator: TypedAlloc<T, Output = SHMTensorStorage>,
{
    /// Converts an ndarray view into SHM-backed tensor storage.
    ///
    /// If the view's first element lies inside a live [`SHMRegion`] and all of its strides
    /// can be represented in `TensorStorage`, the result aliases that memory without
    /// copying. It keeps its own reference to the region and preserves the view's strides.
    /// Otherwise a new tensor is allocated and the elements are copied in, using a flat
    /// memcpy when the view is in standard layout.
    fn from(view: ndarray::ArrayViewD<'_, T>) -> Self {
        let ptr = view.as_ptr() as usize;
        // `None` if any stride can't be converted to TensorStorage's stride type, e.g. a
        // negative ndarray stride if that type is unsigned (NOTE(review): confirm the field
        // type). Such views take the copy path instead of panicking.
        let strides: Option<Vec<_>> = view
            .strides()
            .iter()
            .map(|&stride| stride.try_into().ok())
            .collect();

        match (SHMAllocator::get_shm_region(ptr), strides) {
            (Some(region), Some(strides)) => TensorStorage {
                data: SHMTensorStorage::Numeric {
                    // Byte offset of the view's first element within the region.
                    offset: ptr - region.start_addr,
                    region: region.into(),
                },
                shape: view.shape().iter().map(|&dim| dim as _).collect(),
                strides: Some(strides),
                pd: PhantomData,
            },
            _ => {
                let mut out = alloc_tensor(view.shape().iter().map(|&dim| dim as _).collect());
                if view.is_standard_layout() {
                    out.view_mut()
                        .as_slice_mut()
                        .unwrap()
                        .copy_from_slice(view.as_slice().unwrap())
                } else {
                    out.view_mut().assign(&view);
                }
                out
            }
        }
    }
}
impl From<ndarray::ArrayViewD<'_, String>> for TensorStorage<String, SHMTensorStorage> {
    /// Copies a string view into newly allocated tensor storage of the same shape.
    ///
    /// String data never lives in shared memory, so there is no zero-copy path; every
    /// element is cloned into the new storage via `assign`.
    fn from(view: ndarray::ArrayViewD<'_, String>) -> Self {
        let shape: Vec<u64> = view.shape().iter().map(|&dim| dim as u64).collect();
        let mut storage: Self = alloc_tensor(shape);
        storage.view_mut().assign(&view);
        storage
    }
}
/// Allocates tensor storage for `shape` from a process-wide [`SHMAllocator`] that never
/// pools, so each call creates fresh storage (benchmarking only).
///
/// At least one element is always allocated, even for empty shapes or zero-sized dimensions.
/// The storage is returned with `strides: None`.
///
/// # Panics
///
/// Panics if the element count overflows `u64` or does not fit in `usize`. Previously the
/// count could wrap or truncate and silently yield an undersized buffer.
#[cfg(feature = "benchmark")]
pub fn alloc_tensor_no_pool<T: Default + Clone>(
    shape: Vec<u64>,
) -> TensorStorage<T, SHMTensorStorage>
where
    SHMAllocator: TypedAlloc<T, Output = SHMTensorStorage>,
{
    static POOL_ALLOCATOR: Lazy<SHMAllocator> = Lazy::new(SHMAllocator::without_pool);
    // A zero-sized dimension makes the tensor empty no matter how large the other dimensions
    // are, so handle it before the checked product, which could otherwise overflow.
    let elements = if shape.contains(&0) {
        0
    } else {
        shape
            .iter()
            .try_fold(1u64, |acc, &dim| acc.checked_mul(dim))
            .expect("tensor shape's element count overflows u64")
    };
    // Request at least one element; a zero-length `mmap` would fail in `SHMRegion::new`.
    let numel = usize::try_from(elements.max(1))
        .expect("tensor shape's element count does not fit in usize");
    let data = <SHMAllocator as TypedAlloc<T>>::alloc(&POOL_ALLOCATOR, numel);
    TensorStorage {
        data,
        shape,
        strides: None,
        pd: PhantomData,
    }
}
/// Allocates tensor storage for `shape` from a process-wide, pooled [`SHMAllocator`].
///
/// Numeric element types are backed by shared memory; `String` elements are backed by a
/// pooled `Vec<String>`. At least one element is always allocated, even for empty shapes or
/// zero-sized dimensions. The storage is returned with `strides: None`.
///
/// # Panics
///
/// Panics if the element count overflows `u64` or does not fit in `usize`. Previously the
/// count could wrap or truncate and silently yield an undersized buffer.
pub fn alloc_tensor<T: Default + Clone>(shape: Vec<u64>) -> TensorStorage<T, SHMTensorStorage>
where
    SHMAllocator: TypedAlloc<T, Output = SHMTensorStorage>,
{
    static POOL_ALLOCATOR: Lazy<SHMAllocator> = Lazy::new(SHMAllocator::new);
    // A zero-sized dimension makes the tensor empty no matter how large the other dimensions
    // are, so handle it before the checked product, which could otherwise overflow.
    let elements = if shape.contains(&0) {
        0
    } else {
        shape
            .iter()
            .try_fold(1u64, |acc, &dim| acc.checked_mul(dim))
            .expect("tensor shape's element count overflows u64")
    };
    // Request at least one element; a zero-length `mmap` would fail in `SHMRegion::new`.
    let numel = usize::try_from(elements.max(1))
        .expect("tensor shape's element count does not fit in usize");
    let data = <SHMAllocator as TypedAlloc<T>>::alloc(&POOL_ALLOCATOR, numel);
    TensorStorage {
        data,
        shape,
        strides: None,
        pd: PhantomData,
    }
}