use libc::{
c_uint, c_void, ftruncate, madvise, sem_close, sem_open, sem_post, sem_t, sem_trywait,
sem_unlink, sem_wait, shm_open, shm_unlink, MADV_FREE, O_CREAT, O_RDWR, S_IRUSR, S_IWUSR,
};
use memmap2::{Mmap, MmapMut};
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::ffi::CString;
use std::fs::File;
use std::io::{self, Error};
use std::ops::{Deref, DerefMut};
use std::os::fd::{AsRawFd, FromRawFd, RawFd};
use std::sync::atomic::{fence, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
/// Size of one huge page (2 MiB); slot sizes are rounded up to a multiple of this.
const HUGE_PAGE_SIZE: usize = 2 * 1024 * 1024;
/// Bytes reserved at the start of the mapping for the `RingHeader`; slot data starts at this offset.
const HEADER_SIZE: usize = HUGE_PAGE_SIZE;
/// Number of non-blocking `try_wait` attempts `acquire` makes before it blocks on the semaphore.
const SPIN_LIMIT: usize = 10000;
/// Crash report stored inside the shared ring header.
///
/// Written by `ShmTraceRing::notify_crash` and handed back to the reader through
/// `TraceResult::Crashed`. The struct is `#[repr(C)]` because it is embedded in the
/// `#[repr(C)]` header that lives in shared memory.
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct CrashDetails {
    /// `signal` argument given to `notify_crash` (presumably the signal number).
    pub signal: i32,
    /// `addr` argument given to `notify_crash` (presumably the faulting address).
    pub addr: u64,
    /// `operation` argument given to `notify_crash`; its meaning is defined by the caller.
    pub operation: i32,
}
/// Control block placed at the very start of the shared mapping.
///
/// Every process that maps the ring reads and writes these fields in place, so the
/// field order must not change (`#[repr(C)]`).
#[repr(C)]
struct RingHeader {
/// Number of slots published so far; incremented when a `ProducerGuard` is dropped.
write_idx: AtomicUsize,
/// Number of slots whose reads have completed in order; advanced by `InnerRing::complete_read`.
read_idx: AtomicUsize,
/// Number of slots claimed by readers so far; incremented by `claim_read_slot`.
reserved_idx: AtomicUsize,
/// Number of slots in the ring; written once by the owner in `init`.
capacity: usize,
/// Size of one slot in bytes (already rounded up); written once by the owner in `init`.
slot_size: usize,
/// Stream state: 0 = running, 1 = set by `mark_finished`, 2 = set by `notify_crash`.
finished: AtomicUsize,
/// Crash report; written by `notify_crash` before `finished` is set to 2.
crash_details: CrashDetails,
}
/// Outcome of one `ShmTraceRing::access` call.
pub enum TraceResult {
    /// A published slot was claimed; the guard dereferences to its bytes.
    Data(ConsumerGuard),
    /// The ring was marked finished and no unread slot remains.
    Finished,
    /// The ring was marked crashed; carries the stored crash report.
    Crashed(CrashDetails),
    /// Nothing was readable (the wait timed out, or a wake-up found no unread slot).
    Timeout,
}
/// Thin wrapper around a named POSIX semaphore handle obtained from `sem_open`.
struct PosixSemaphore {
/// Handle returned by `sem_open`; closed with `sem_close` when the wrapper is dropped.
sem: *mut sem_t,
}
// NOTE(review): these impls assume the `sem_*` calls used on the handle are safe to
// issue concurrently from several threads — confirm for every supported platform.
unsafe impl Send for PosixSemaphore {}
unsafe impl Sync for PosixSemaphore {}
impl PosixSemaphore {
    /// Opens the named semaphore `name`, creating it with the initial count `value`
    /// (mode `0o644`) when it does not exist yet.
    ///
    /// # Errors
    /// Returns `InvalidInput` when `name` contains an interior NUL byte, otherwise the
    /// OS error reported by `sem_open`.
    fn create(name: &str, value: u32) -> std::io::Result<Self> {
        let c_name = CString::new(name)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
        // SAFETY: `c_name` is a valid NUL-terminated string that outlives the call.
        let sem = unsafe { sem_open(c_name.as_ptr(), O_CREAT, 0o644, value) };
        if sem == libc::SEM_FAILED {
            return Err(std::io::Error::last_os_error());
        }
        Ok(Self { sem })
    }

    /// Blocks until the semaphore can be decremented.
    ///
    /// A wait interrupted by a signal (`EINTR`) is restarted so that callers never
    /// proceed without actually holding a token. Any other failure cannot be reported
    /// through this signature and simply ends the wait.
    fn wait(&self) {
        loop {
            // SAFETY: `self.sem` is the handle returned by `sem_open` and is still open.
            if unsafe { sem_wait(self.sem) } == 0 {
                return;
            }
            if std::io::Error::last_os_error().kind() != std::io::ErrorKind::Interrupted {
                return;
            }
        }
    }

    /// Tries to decrement the semaphore without blocking; returns `true` on success.
    fn try_wait(&self) -> bool {
        // SAFETY: `self.sem` is the handle returned by `sem_open` and is still open.
        unsafe { sem_trywait(self.sem) == 0 }
    }

    /// Increments the semaphore, waking one waiter if any. The result is ignored.
    fn post(&self) {
        // SAFETY: `self.sem` is the handle returned by `sem_open` and is still open.
        unsafe {
            sem_post(self.sem);
        }
    }

    /// Waits up to `timeout` for the semaphore; returns `true` if it was decremented.
    ///
    /// On Linux this uses `sem_timedwait` against an absolute `CLOCK_REALTIME`
    /// deadline and retries when interrupted by a signal. Elsewhere it polls
    /// `sem_trywait` roughly once per millisecond, always trying at least once.
    fn wait_timeout(&self, timeout: Duration) -> bool {
        #[cfg(target_os = "linux")]
        {
            let mut deadline = libc::timespec { tv_sec: 0, tv_nsec: 0 };
            // SAFETY: `deadline` is a valid, writable `timespec`.
            unsafe {
                libc::clock_gettime(libc::CLOCK_REALTIME, &mut deadline);
            }
            // Clamp instead of wrapping so a huge timeout cannot become a past deadline.
            let secs = <libc::time_t as std::convert::TryFrom<u64>>::try_from(timeout.as_secs())
                .unwrap_or(libc::time_t::MAX);
            deadline.tv_sec = deadline.tv_sec.saturating_add(secs);
            deadline.tv_nsec += timeout.subsec_nanos() as libc::c_long;
            if deadline.tv_nsec >= 1_000_000_000 {
                deadline.tv_sec = deadline.tv_sec.saturating_add(1);
                deadline.tv_nsec -= 1_000_000_000;
            }
            loop {
                // SAFETY: the handle is open and `deadline` is a valid `timespec`.
                if unsafe { libc::sem_timedwait(self.sem, &deadline) } == 0 {
                    return true;
                }
                // The deadline is absolute, so retrying after EINTR cannot extend the wait.
                if std::io::Error::last_os_error().kind() != std::io::ErrorKind::Interrupted {
                    return false;
                }
            }
        }
        #[cfg(not(target_os = "linux"))]
        {
            let start = std::time::Instant::now();
            loop {
                // SAFETY: `self.sem` is the handle returned by `sem_open` and is still open.
                if unsafe { sem_trywait(self.sem) } == 0 {
                    return true;
                }
                if start.elapsed() >= timeout {
                    return false;
                }
                std::thread::sleep(Duration::from_millis(1));
            }
        }
    }
}
impl Drop for PosixSemaphore {
    /// Closes this process's handle with `sem_close`; the named semaphore itself is
    /// not unlinked here.
    fn drop(&mut self) {
        let handle = self.sem;
        // SAFETY: `handle` came from `sem_open` and is closed exactly once, here.
        let _ = unsafe { sem_close(handle) };
    }
}
/// Per-process state shared by every handle and guard of one trace ring.
struct InnerRing {
/// Open shared-memory file; stored so it lives as long as the ring state.
_file: File,
/// Mapping of the shared-memory object; `header` and `data_start` point into it.
_mmap: MmapMut,
/// Start of the mapping, viewed as the `RingHeader`.
header: *mut RingHeader,
/// Address `HEADER_SIZE` bytes into the mapping; slots are laid out from here.
data_start: *mut u8,
/// Posted when a slot is published and by `mark_finished` / `notify_crash`; waited on by `access`.
sem_filled: PosixSemaphore,
/// Posted by `complete_read` for every slot released in order; waited on by `acquire`.
sem_empty: PosixSemaphore,
/// Shared-memory object name (`/<id>_t`); also the prefix of both semaphore names.
name: String,
/// True for rings made through `create`; the owner unlinks the named objects on drop.
is_owner: bool,
/// Completed read indices that are still ahead of `read_idx`, kept as a min-heap.
pending_completions: Mutex<BinaryHeap<Reverse<usize>>>,
}
// NOTE(review): these impls rely on all access through `header` / `data_start` being
// coordinated by the atomics and semaphores — confirm that holds for every caller.
unsafe impl Send for InnerRing {}
unsafe impl Sync for InnerRing {}
impl Drop for InnerRing {
    /// Unlinks the shared-memory object and both semaphores when this ring is the owner.
    ///
    /// Cleanup is best effort: a name that cannot be turned into a C string is skipped
    /// rather than panicking inside a destructor, and unlink results are ignored.
    fn drop(&mut self) {
        if !self.is_owner {
            return;
        }
        if let Ok(c_name) = CString::new(self.name.as_str()) {
            // SAFETY: `c_name` is a valid NUL-terminated string that outlives the call.
            unsafe {
                shm_unlink(c_name.as_ptr());
            }
        }
        // Same order as creation: the "filled" semaphore first, then "empty".
        for suffix in ["_filled", "_empty"].iter() {
            if let Ok(c_sem) = CString::new(format!("{}{}", self.name, suffix)) {
                // SAFETY: `c_sem` is a valid NUL-terminated string that outlives the call.
                unsafe {
                    sem_unlink(c_sem.as_ptr());
                }
            }
        }
    }
}
/// Cloneable handle to a shared-memory trace ring; all clones share one `InnerRing`.
#[derive(Clone)]
pub struct ShmTraceRing {
/// Reference-counted per-process ring state.
inner: Arc<InnerRing>,
}
/// Write access to one ring slot, returned by `ShmTraceRing::acquire`.
///
/// Dereferences to the slot's bytes; dropping the guard publishes the slot.
pub struct ProducerGuard {
/// Keeps the ring state (and therefore the mapping) alive while the guard exists.
inner: Arc<InnerRing>,
/// First byte of the slot inside the mapping.
data_ptr: *mut u8,
/// Slot length in bytes.
len: usize,
}
// NOTE(review): these impls assume no other party touches the slot while the guard
// is alive — confirm against the ring protocol.
unsafe impl Send for ProducerGuard {}
unsafe impl Sync for ProducerGuard {}
impl Drop for ProducerGuard {
    /// Publishes the slot: advances `write_idx` with release ordering, then posts
    /// `sem_filled` so a reader can wake up.
    fn drop(&mut self) {
        let ring = &*self.inner;
        // SAFETY: `ring.header` points into the mapping that `ring` keeps alive.
        unsafe { (*ring.header).write_idx.fetch_add(1, Ordering::Release) };
        ring.sem_filled.post();
    }
}
impl Deref for ProducerGuard {
    type Target = [u8];

    /// Views the claimed slot as a byte slice of `len` bytes.
    fn deref(&self) -> &[u8] {
        let (start, count) = (self.data_ptr as *const u8, self.len);
        // SAFETY: `start..start + count` is the slot this guard was created for, inside
        // the mapping kept alive by `self.inner`.
        unsafe { std::slice::from_raw_parts(start, count) }
    }
}
impl DerefMut for ProducerGuard {
    /// Views the claimed slot as a mutable byte slice of `len` bytes.
    fn deref_mut(&mut self) -> &mut [u8] {
        let (start, count) = (self.data_ptr, self.len);
        // SAFETY: `start..start + count` is the slot this guard was created for, inside
        // the mapping kept alive by `self.inner`.
        unsafe { std::slice::from_raw_parts_mut(start, count) }
    }
}
/// Read access to one published ring slot, returned inside `TraceResult::Data`.
///
/// Dereferences to the slot's bytes; dropping the guard reports the read as complete.
pub struct ConsumerGuard {
/// Keeps the ring state (and therefore the mapping) alive while the guard exists.
inner: Arc<InnerRing>,
/// First byte of the slot inside the mapping.
data_ptr: *const u8,
/// Slot length in bytes.
len: usize,
/// Value of `reserved_idx` claimed for this read; passed to `complete_read` on drop.
index: usize,
}
// NOTE(review): these impls assume the slot is not rewritten while the guard is
// alive — confirm against the ring protocol.
unsafe impl Send for ConsumerGuard {}
unsafe impl Sync for ConsumerGuard {}
impl Drop for ConsumerGuard {
fn drop(&mut self) {
self.inner.complete_read(self.index);
}
}
impl Deref for ConsumerGuard {
    type Target = [u8];

    /// Views the claimed slot as a byte slice of `len` bytes.
    fn deref(&self) -> &[u8] {
        let (start, count) = (self.data_ptr, self.len);
        // SAFETY: `start..start + count` is the slot this guard was created for, inside
        // the mapping kept alive by `self.inner`.
        unsafe { std::slice::from_raw_parts(start, count) }
    }
}
impl InnerRing {
    /// Records that the read with index `completed_idx` is done and releases every
    /// slot that is now complete in order.
    ///
    /// Completions may arrive out of order, so they are parked in a min-heap. While
    /// the smallest parked index equals `read_idx`, it is popped, `read_idx` is
    /// advanced and `sem_empty` is posted once for the freed slot.
    ///
    /// This runs from `ConsumerGuard::drop`, so a poisoned mutex is recovered rather
    /// than unwrapped: panicking in a destructor could abort the process.
    fn complete_read(&self, completed_idx: usize) {
        let mut pending = self
            .pending_completions
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        pending.push(Reverse(completed_idx));

        // SAFETY: `self.header` points into the mapping owned by `self`.
        let mut next = unsafe { (*self.header).read_idx.load(Ordering::Acquire) };
        while pending.peek().map_or(false, |entry| entry.0 == next) {
            pending.pop();
            // SAFETY: as above.
            unsafe { (*self.header).read_idx.fetch_add(1, Ordering::Release) };
            self.sem_empty.post();
            // Mirror the wrapping behaviour of `fetch_add` on the shared counter.
            next = next.wrapping_add(1);
        }
    }
}
impl ShmTraceRing {
    /// Creates a new ring named after `id` and becomes its owner.
    ///
    /// Stale objects with the same names are unlinked first. `slot_size` is rounded up
    /// to a multiple of `HUGE_PAGE_SIZE`. The owner unlinks the shared-memory object
    /// and both semaphores when its ring state is dropped.
    ///
    /// # Errors
    /// Returns `InvalidInput` when `id` contains a NUL byte, `capacity` is zero or does
    /// not fit a semaphore count, or the total size overflows; otherwise the OS error
    /// from `shm_open`, `ftruncate`, `mmap` or `sem_open`.
    pub fn create(id: &str, capacity: usize, slot_size: usize) -> std::io::Result<Self> {
        Self::init(id, capacity, slot_size, true)
    }

    /// Opens a ring that was set up with [`ShmTraceRing::create`].
    ///
    /// `capacity` and `slot_size` are accepted for symmetry with `create`; the
    /// geometry actually used is read from the shared header.
    ///
    /// # Errors
    /// Returns `InvalidInput` when `id` contains a NUL byte and `InvalidData` when the
    /// mapped object is too small for its header or for the geometry the header
    /// describes; otherwise the OS error from `shm_open`, `mmap` or `sem_open`.
    pub fn open(id: &str, capacity: usize, slot_size: usize) -> std::io::Result<Self> {
        Self::init(id, capacity, slot_size, false)
    }

    /// Builds an `InvalidInput` error carrying `msg`.
    fn invalid_input(msg: &'static str) -> std::io::Error {
        std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
    }

    /// Builds an `InvalidData` error carrying `msg`.
    fn invalid_data(msg: &'static str) -> std::io::Error {
        std::io::Error::new(std::io::ErrorKind::InvalidData, msg)
    }

    /// Validates the owner's geometry and returns
    /// `(aligned slot size, total object size, initial free-slot count)`.
    fn owner_layout(
        capacity: usize,
        slot_size: usize,
    ) -> std::io::Result<(usize, libc::off_t, u32)> {
        // A zero capacity would later divide by zero when a slot index is computed.
        if capacity == 0 {
            return Err(Self::invalid_input("ring capacity must be at least one slot"));
        }
        let free_slots = <u32 as std::convert::TryFrom<usize>>::try_from(capacity)
            .map_err(|_| Self::invalid_input("ring capacity does not fit a semaphore count"))?;
        let aligned = slot_size
            .checked_add(HUGE_PAGE_SIZE - 1)
            .map(|padded| padded & !(HUGE_PAGE_SIZE - 1))
            .ok_or_else(|| Self::invalid_input("slot size is too large"))?;
        let total = capacity
            .checked_mul(aligned)
            .and_then(|bytes| bytes.checked_add(HEADER_SIZE))
            .and_then(|bytes| <libc::off_t as std::convert::TryFrom<usize>>::try_from(bytes).ok())
            .ok_or_else(|| Self::invalid_input("ring size is too large"))?;
        Ok((aligned, total, free_slots))
    }

    /// Shared implementation of `create` (`is_owner == true`) and `open`.
    ///
    /// The owner sizes the object, initialises the header and recreates both
    /// semaphores. A non-owner validates the existing mapping and rewinds
    /// `reserved_idx` to `read_idx` when reads were claimed but never completed.
    fn init(id: &str, capacity: usize, slot_size: usize, is_owner: bool) -> std::io::Result<Self> {
        let owner_layout =
            if is_owner { Some(Self::owner_layout(capacity, slot_size)?) } else { None };

        let base_name =
            if id.starts_with('/') { format!("{}_t", id) } else { format!("/{}_t", id) };
        let c_name = CString::new(base_name.as_str())
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
        let sem_filled_name = format!("{}_filled", base_name);
        let sem_empty_name = format!("{}_empty", base_name);

        // SAFETY: `c_name` is a valid NUL-terminated string that outlives both calls.
        let fd = unsafe {
            if is_owner {
                shm_unlink(c_name.as_ptr());
                shm_open(c_name.as_ptr(), O_CREAT | O_RDWR, (S_IRUSR | S_IWUSR) as c_uint)
            } else {
                shm_open(c_name.as_ptr(), O_RDWR, 0)
            }
        };
        if fd < 0 {
            return Err(std::io::Error::last_os_error());
        }
        // Wrap the descriptor immediately so every early return below closes it.
        // SAFETY: `fd` is a freshly opened descriptor owned by nobody else.
        let file = unsafe { File::from_raw_fd(fd) };

        if let Some((_, total_len, _)) = owner_layout {
            // SAFETY: `fd` is open (owned by `file`) and `total_len` is non-negative.
            if unsafe { ftruncate(fd, total_len) } != 0 {
                return Err(std::io::Error::last_os_error());
            }
        }

        // SAFETY: `file` refers to the shared-memory object opened above.
        let mut mmap = unsafe { MmapMut::map_mut(&file)? };
        // Touching the header of a too-small object would fault instead of failing cleanly.
        if mmap.len() < HEADER_SIZE {
            return Err(Self::invalid_data("shared memory object is smaller than the ring header"));
        }
        #[cfg(target_os = "linux")]
        // SAFETY: the range is exactly the mapping created above; the advice is optional.
        unsafe {
            libc::madvise(mmap.as_mut_ptr() as *mut c_void, mmap.len(), libc::MADV_HUGEPAGE);
        }
        let header = mmap.as_mut_ptr() as *mut RingHeader;
        // SAFETY: the mapping is at least `HEADER_SIZE` bytes long (checked above).
        let data_start = unsafe { mmap.as_mut_ptr().add(HEADER_SIZE) };

        match owner_layout {
            Some((aligned_size, _, _)) => {
                // SAFETY: `header` points at the start of a mapping of at least
                // `HEADER_SIZE` bytes that only this process knows about so far.
                unsafe {
                    (*header).capacity = capacity;
                    (*header).slot_size = aligned_size;
                    (*header).write_idx.store(0, Ordering::Release);
                    (*header).read_idx.store(0, Ordering::Release);
                    (*header).reserved_idx.store(0, Ordering::Release);
                    (*header).finished.store(0, Ordering::Release);
                    std::ptr::write_volatile(&mut (*header).crash_details, CrashDetails::default());
                }
            }
            None => {
                // SAFETY: `header` points at the start of a mapping of at least
                // `HEADER_SIZE` bytes.
                let (slots, slot_bytes) = unsafe { ((*header).capacity, (*header).slot_size) };
                let needed = slots
                    .checked_mul(slot_bytes)
                    .and_then(|bytes| bytes.checked_add(HEADER_SIZE));
                if slots == 0 || needed.map_or(true, |bytes| bytes > mmap.len()) {
                    return Err(Self::invalid_data(
                        "ring header is uninitialised or does not match the mapping size",
                    ));
                }
            }
        }

        if is_owner {
            // Remove semaphores left behind by a previous run so the counts start fresh.
            for sem_name in [&sem_filled_name, &sem_empty_name].iter() {
                if let Ok(c_sem) = CString::new(sem_name.as_str()) {
                    // SAFETY: `c_sem` is a valid NUL-terminated string that outlives the call.
                    unsafe {
                        sem_unlink(c_sem.as_ptr());
                    }
                }
            }
        }
        let sem_filled = PosixSemaphore::create(&sem_filled_name, 0)?;
        let initial_empty = owner_layout.map_or(0, |(_, _, free_slots)| free_slots);
        let sem_empty = PosixSemaphore::create(&sem_empty_name, initial_empty)?;

        if !is_owner {
            // SAFETY: `header` points into the validated mapping.
            unsafe {
                let committed = (*header).read_idx.load(Ordering::Acquire);
                let reserved = (*header).reserved_idx.load(Ordering::Acquire);
                if reserved > committed {
                    (*header).reserved_idx.store(committed, Ordering::Release);
                }
            }
        }

        Ok(Self {
            inner: Arc::new(InnerRing {
                _file: file,
                _mmap: mmap,
                header,
                data_start,
                sem_filled,
                sem_empty,
                name: base_name,
                is_owner,
                pending_completions: Mutex::new(BinaryHeap::new()),
            }),
        })
    }

    /// Claims the next slot for writing, waiting until one is free.
    ///
    /// Spins on the "empty" semaphore up to `SPIN_LIMIT` times before blocking. The
    /// slot is published when the returned guard is dropped.
    pub fn acquire(&self) -> ProducerGuard {
        for _ in 0..SPIN_LIMIT {
            if self.inner.sem_empty.try_wait() {
                return self.claim_write_slot();
            }
            std::hint::spin_loop();
        }
        self.inner.sem_empty.wait();
        self.claim_write_slot()
    }

    /// Builds the guard for the slot at `write_idx % capacity`.
    ///
    /// `write_idx` is only read here; it advances when the guard is dropped.
    /// NOTE(review): that looks like it assumes a single producer at a time — confirm.
    fn claim_write_slot(&self) -> ProducerGuard {
        // SAFETY: the header lives in the mapping kept alive by `self.inner`.
        let (w, _, cap, size) = unsafe { self.load_state() };
        let offset = (w % cap) * size;
        // SAFETY: `offset` addresses a slot inside the data area described by the header.
        unsafe {
            let ptr = self.inner.data_start.add(offset);
            // Best-effort hint that the old slot contents are no longer needed; the
            // result is ignored.
            madvise(ptr as *mut c_void, size, MADV_FREE);
            ProducerGuard { inner: self.inner.clone(), data_ptr: ptr, len: size }
        }
    }

    /// Marks the stream as finished (`finished = 1`) and posts `sem_filled` once so a
    /// waiting reader wakes up.
    pub fn mark_finished(&self) {
        // SAFETY: the header lives in the mapping kept alive by `self.inner`.
        unsafe {
            (*self.inner.header).finished.store(1, Ordering::Release);
        }
        self.inner.sem_filled.post();
    }

    /// Stores a crash report in the header, marks the ring as crashed
    /// (`finished = 2`) and posts `sem_filled` once so a waiting reader wakes up.
    pub fn notify_crash(&self, signal: i32, addr: u64, operation: i32) {
        // SAFETY: the header lives in the mapping kept alive by `self.inner`.
        unsafe {
            let h = self.inner.header;
            std::ptr::write_volatile(
                &mut (*h).crash_details,
                CrashDetails { signal, addr, operation },
            );
            // The report must be visible before the state flips to "crashed".
            fence(Ordering::Release);
            (*h).finished.store(2, Ordering::Release);
            self.inner.sem_filled.post();
        }
    }

    /// Waits up to `timeout` for the next published slot.
    ///
    /// Returns `Crashed` when the ring is marked crashed (checked before waiting and
    /// again after a timeout), `Finished` when the wait timed out on a finished ring
    /// with no unread slot, `Timeout` when nothing was readable, and `Data` otherwise.
    pub fn access(&self, timeout: Duration) -> TraceResult {
        if let Some(details) = self.check_crash() {
            return TraceResult::Crashed(details);
        }
        if self.inner.sem_filled.wait_timeout(timeout) {
            return self.claim_read_slot();
        }
        if let Some(details) = self.check_crash() {
            return TraceResult::Crashed(details);
        }
        // SAFETY: the header lives in the mapping kept alive by `self.inner`.
        unsafe {
            let h = self.inner.header;
            if (*h).finished.load(Ordering::Acquire) == 1 {
                let w = (*h).write_idx.load(Ordering::Acquire);
                let r = (*h).reserved_idx.load(Ordering::Acquire);
                if w == r {
                    return TraceResult::Finished;
                }
            }
        }
        TraceResult::Timeout
    }

    /// Returns the stored crash report when the ring is marked crashed (`finished == 2`).
    fn check_crash(&self) -> Option<CrashDetails> {
        // SAFETY: the header lives in the mapping kept alive by `self.inner`.
        unsafe {
            let h = self.inner.header;
            if (*h).finished.load(Ordering::Acquire) == 2 {
                Some((*h).crash_details)
            } else {
                None
            }
        }
    }

    /// Turns a consumed `sem_filled` token into a result.
    ///
    /// A token may stand for a published slot or for a finish/crash notification. For
    /// the latter the token is posted back, so further `access` calls also see it.
    fn claim_read_slot(&self) -> TraceResult {
        // SAFETY: the header lives in the mapping kept alive by `self.inner`; the slot
        // offset is derived from the geometry stored in that header.
        unsafe {
            let h = self.inner.header;
            if (*h).finished.load(Ordering::Acquire) == 2 {
                let details = (*h).crash_details;
                self.inner.sem_filled.post();
                return TraceResult::Crashed(details);
            }
            let w = (*h).write_idx.load(Ordering::Acquire);
            let current_reserved = (*h).reserved_idx.load(Ordering::Acquire);
            if w == current_reserved {
                // Nothing unread: the token was a notification or a spurious wake-up.
                if (*h).finished.load(Ordering::Acquire) == 1 {
                    self.inner.sem_filled.post();
                    return TraceResult::Finished;
                }
                return TraceResult::Timeout;
            }
            let my_idx = (*h).reserved_idx.fetch_add(1, Ordering::Release);
            let cap = (*h).capacity;
            let size = (*h).slot_size;
            let offset = (my_idx % cap) * size;
            let ptr = self.inner.data_start.add(offset);
            TraceResult::Data(ConsumerGuard {
                inner: self.inner.clone(),
                data_ptr: ptr,
                len: size,
                index: my_idx,
            })
        }
    }

    /// Reads `(write_idx, read_idx, capacity, slot_size)` from the header using
    /// relaxed loads for the two indices.
    ///
    /// # Safety
    /// `self.inner.header` must point at an initialised `RingHeader` inside a live mapping.
    unsafe fn load_state(&self) -> (usize, usize, usize, usize) {
        let h = self.inner.header;
        (
            (*h).write_idx.load(Ordering::Relaxed),
            (*h).read_idx.load(Ordering::Relaxed),
            (*h).capacity,
            (*h).slot_size,
        )
    }
}
/// Mapping held by a `ShmMemory`, in one of its two access modes.
#[derive(Debug)]
enum InnerMap {
/// Read-only mapping, produced by `ShmMemory::create_readonly`.
ReadOnly(Mmap),
/// Writable mapping, produced by `ShmMemory::open_readwrite`.
ReadWrite(MmapMut),
}
/// A named POSIX shared-memory object mapped into this process.
///
/// Dereferences to the mapped bytes.
pub struct ShmMemory {
/// Full object name as passed to `shm_open` (`/<id>_m`).
name: String,
/// Open descriptor of the shared-memory object; exposed through `AsRawFd`.
file: File,
/// The mapping, read-only or writable depending on the constructor used.
map: InnerMap,
/// When true, the object is unlinked on drop; cleared by `keep_on_drop`.
should_unlink: bool,
}
impl ShmMemory {
    /// Creates (or opens, if it already exists) the object `/<id>_m`, resizes it to
    /// `size` bytes when `size > 0`, and maps it read-only.
    ///
    /// The object is unlinked when the value is dropped unless
    /// [`ShmMemory::keep_on_drop`] is called first.
    ///
    /// # Errors
    /// Returns `InvalidInput` when `id` contains a NUL byte or `size` does not fit
    /// `off_t`; otherwise the OS error from `shm_open`, `ftruncate` or `mmap`.
    pub fn create_readonly(id: &str, size: usize) -> io::Result<Self> {
        let (file, name, _) = Self::open_libc(id, libc::O_CREAT | libc::O_RDWR, size)?;
        // SAFETY: `file` is the shared-memory descriptor opened just above.
        let map = unsafe { Mmap::map(&file)? };
        Ok(Self { name, file, map: InnerMap::ReadOnly(map), should_unlink: true })
    }

    /// Opens the existing object `/<id>_m` and maps it read-write.
    ///
    /// The object is not unlinked when the value is dropped.
    ///
    /// # Errors
    /// Returns `InvalidInput` when `id` contains a NUL byte; otherwise the OS error
    /// from `shm_open` or `mmap`.
    pub fn open_readwrite(id: &str) -> io::Result<Self> {
        let (file, name, _) = Self::open_libc(id, libc::O_RDWR, 0)?;
        // SAFETY: `file` is the shared-memory descriptor opened just above.
        let map = unsafe { MmapMut::map_mut(&file)? };
        Ok(Self { name, file, map: InnerMap::ReadWrite(map), should_unlink: false })
    }

    /// Opens `/<id>_m` with `flags` and, when `size > 0`, resizes it to `size` bytes.
    ///
    /// Returns the open file, the full object name, and whether `flags` contained
    /// `O_CREAT`. `size` is converted to `off_t` before the object is opened, so an
    /// unrepresentable size fails without creating anything.
    fn open_libc(id: &str, flags: libc::c_int, size: usize) -> io::Result<(File, String, bool)> {
        let suffixed = format!("{}_m", id);
        let clean_name =
            if suffixed.starts_with('/') { suffixed } else { format!("/{}", suffixed) };
        let c_id = CString::new(clean_name.as_str())
            .map_err(|e| Error::new(io::ErrorKind::InvalidInput, e))?;
        // Checked conversion: a plain `as` cast could silently shrink the request on
        // targets where `off_t` is narrower than `usize`.
        let len = <libc::off_t as std::convert::TryFrom<usize>>::try_from(size)
            .map_err(|e| Error::new(io::ErrorKind::InvalidInput, e))?;

        let created = (flags & libc::O_CREAT) != 0;
        let mode = if created { S_IRUSR | S_IWUSR } else { 0 };
        // SAFETY: `c_id` is a valid NUL-terminated string that outlives the call.
        let fd = unsafe { shm_open(c_id.as_ptr(), flags, mode as c_uint) };
        if fd < 0 {
            return Err(Error::last_os_error());
        }
        // SAFETY: `fd` is a freshly opened descriptor; `file` closes it on every path.
        let file = unsafe { File::from_raw_fd(fd) };
        if size > 0 {
            // SAFETY: `fd` is open (owned by `file`) and `len` is non-negative.
            if unsafe { libc::ftruncate(fd, len) } < 0 {
                return Err(Error::last_os_error());
            }
        }
        Ok((file, clean_name, created))
    }

    /// Disables unlinking of the shared-memory object when this value is dropped.
    pub fn keep_on_drop(&mut self) {
        self.should_unlink = false;
    }
}
impl AsRawFd for ShmMemory {
    /// Returns the raw descriptor of the underlying shared-memory file.
    fn as_raw_fd(&self) -> RawFd {
        AsRawFd::as_raw_fd(&self.file)
    }
}
impl Deref for ShmMemory {
    type Target = [u8];

    /// Returns the mapped bytes, whichever access mode the mapping has.
    fn deref(&self) -> &[u8] {
        match self.map {
            InnerMap::ReadOnly(ref ro) => &ro[..],
            InnerMap::ReadWrite(ref rw) => &rw[..],
        }
    }
}
impl DerefMut for ShmMemory {
    /// Returns the mapped bytes mutably.
    ///
    /// # Panics
    /// Panics when the handle holds a read-only mapping.
    fn deref_mut(&mut self) -> &mut [u8] {
        if let InnerMap::ReadWrite(ref mut rw) = self.map {
            return &mut rw[..];
        }
        panic!("ShmMemory Error: Attempted to DerefMut on a Read-Only handle");
    }
}
impl Drop for ShmMemory {
    /// Unlinks the shared-memory object when `should_unlink` is set.
    ///
    /// A name that cannot be converted to a C string is skipped, and the unlink
    /// result is ignored.
    fn drop(&mut self) {
        if !self.should_unlink {
            return;
        }
        let c_name = match CString::new(self.name.as_str()) {
            Ok(converted) => converted,
            Err(_) => return,
        };
        // SAFETY: `c_name` is a valid NUL-terminated string that outlives the call.
        unsafe {
            libc::shm_unlink(c_name.as_ptr());
        }
    }
}