1use libc::{
4 c_uint, c_void, ftruncate, madvise, sem_close, sem_open, sem_post, sem_t, sem_trywait,
5 sem_unlink, sem_wait, shm_open, shm_unlink, MADV_FREE, O_CREAT, O_RDWR, S_IRUSR, S_IWUSR,
6};
7use memmap2::{Mmap, MmapMut};
8use std::cmp::Reverse;
9use std::collections::BinaryHeap;
10use std::ffi::CString;
11use std::fs::File;
12use std::io::{self, Error};
13use std::ops::{Deref, DerefMut};
14use std::os::fd::{AsRawFd, FromRawFd, RawFd};
15use std::sync::atomic::{fence, AtomicUsize, Ordering};
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
/// Size in bytes of one huge page (2 MiB); slot sizes are rounded up to a multiple of this.
const HUGE_PAGE_SIZE: usize = 2 << 20;
/// Bytes reserved at the start of the mapping for the ring header; slot data begins after it.
const HEADER_SIZE: usize = HUGE_PAGE_SIZE;
/// Number of non-blocking semaphore attempts made before falling back to a blocking wait.
const SPIN_LIMIT: usize = 10_000;
39
/// Plain-data crash record stored inside the shared ring header.
///
/// The layout is `repr(C)` because the record lives in memory shared between processes.
#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct CrashDetails {
    /// Signal number reported by the crashing side.
    pub signal: i32,
    /// Address reported with the crash (presumably the faulting address; confirm with callers).
    pub addr: u64,
    /// Caller-defined operation code reported with the crash.
    pub operation: i32,
}
47
48#[repr(C)]
49struct RingHeader {
50 write_idx: AtomicUsize,
51 read_idx: AtomicUsize,
53 reserved_idx: AtomicUsize,
55 capacity: usize,
56 slot_size: usize,
57 finished: AtomicUsize,
59 crash_details: CrashDetails,
61}
62
63pub enum TraceResult {
65 Data(ConsumerGuard),
66 Finished, Crashed(CrashDetails), Timeout, }
70
71struct PosixSemaphore {
73 sem: *mut sem_t,
74}
75unsafe impl Send for PosixSemaphore {}
76unsafe impl Sync for PosixSemaphore {}
77
78impl PosixSemaphore {
79 fn create(name: &str, value: u32) -> std::io::Result<Self> {
80 let c_name = CString::new(name).unwrap();
81 unsafe {
82 let sem = sem_open(c_name.as_ptr(), O_CREAT, 0o644, value);
83 if sem == libc::SEM_FAILED {
84 return Err(std::io::Error::last_os_error());
85 }
86 Ok(Self { sem })
87 }
88 }
89
90 fn wait(&self) {
91 unsafe {
92 sem_wait(self.sem);
93 }
94 }
95
96 fn try_wait(&self) -> bool {
97 unsafe { sem_trywait(self.sem) == 0 }
98 }
99
100 fn post(&self) {
101 unsafe {
102 sem_post(self.sem);
103 }
104 }
105
106 fn wait_timeout(&self, timeout: Duration) -> bool {
107 unsafe {
108 #[cfg(target_os = "linux")]
109 {
110 let mut ts = libc::timespec { tv_sec: 0, tv_nsec: 0 };
111 libc::clock_gettime(libc::CLOCK_REALTIME, &mut ts);
112
113 ts.tv_sec += timeout.as_secs() as i64;
114 ts.tv_nsec += timeout.subsec_nanos() as i64;
115 if ts.tv_nsec >= 1_000_000_000 {
116 ts.tv_sec += 1;
117 ts.tv_nsec -= 1_000_000_000;
118 }
119
120 libc::sem_timedwait(self.sem, &ts) == 0
121 }
122
123 #[cfg(target_os = "macos")]
124 {
125 let start = std::time::Instant::now();
126 while start.elapsed() < timeout {
127 if sem_trywait(self.sem) == 0 {
128 return true;
129 }
130 std::thread::sleep(Duration::from_millis(1));
131 }
132 false
133 }
134 }
135 }
136}
137
138impl Drop for PosixSemaphore {
139 fn drop(&mut self) {
140 unsafe {
141 sem_close(self.sem);
142 }
143 }
144}
145
146struct InnerRing {
147 _file: File,
148 _mmap: MmapMut,
149 header: *mut RingHeader,
150 data_start: *mut u8,
151 sem_filled: PosixSemaphore,
152 sem_empty: PosixSemaphore,
153 name: String,
154 is_owner: bool,
155
156 pending_completions: Mutex<BinaryHeap<Reverse<usize>>>,
158}
159
160unsafe impl Send for InnerRing {}
161unsafe impl Sync for InnerRing {}
162
163impl Drop for InnerRing {
164 fn drop(&mut self) {
165 if self.is_owner {
166 let c_name = CString::new(self.name.clone()).unwrap();
167 let c_fill = CString::new(format!("{}_filled", self.name)).unwrap();
168 let c_empty = CString::new(format!("{}_empty", self.name)).unwrap();
169 unsafe {
170 shm_unlink(c_name.as_ptr());
171 sem_unlink(c_fill.as_ptr());
172 sem_unlink(c_empty.as_ptr());
173 }
174 }
175 }
176}
177
178#[derive(Clone)]
181pub struct ShmTraceRing {
182 inner: Arc<InnerRing>,
183}
184
185pub struct ProducerGuard {
188 inner: Arc<InnerRing>,
189 data_ptr: *mut u8,
190 len: usize,
191}
192unsafe impl Send for ProducerGuard {}
193unsafe impl Sync for ProducerGuard {}
194
195impl Drop for ProducerGuard {
196 fn drop(&mut self) {
197 unsafe {
198 (*self.inner.header).write_idx.fetch_add(1, Ordering::Release);
199 }
200 self.inner.sem_filled.post();
201 }
202}
203impl Deref for ProducerGuard {
204 type Target = [u8];
205 fn deref(&self) -> &Self::Target {
206 unsafe { std::slice::from_raw_parts(self.data_ptr, self.len) }
207 }
208}
209impl DerefMut for ProducerGuard {
210 fn deref_mut(&mut self) -> &mut Self::Target {
211 unsafe { std::slice::from_raw_parts_mut(self.data_ptr, self.len) }
212 }
213}
214
215pub struct ConsumerGuard {
216 inner: Arc<InnerRing>,
217 data_ptr: *const u8,
218 len: usize,
219 index: usize,
220}
221unsafe impl Send for ConsumerGuard {}
222unsafe impl Sync for ConsumerGuard {}
223
224impl Drop for ConsumerGuard {
225 fn drop(&mut self) {
226 self.inner.complete_read(self.index);
227 }
228}
229impl Deref for ConsumerGuard {
230 type Target = [u8];
231 fn deref(&self) -> &Self::Target {
232 unsafe { std::slice::from_raw_parts(self.data_ptr, self.len) }
233 }
234}
235
236impl InnerRing {
239 fn complete_read(&self, completed_idx: usize) {
240 let mut heap = self.pending_completions.lock().unwrap();
241 heap.push(Reverse(completed_idx));
243
244 let mut current_read = unsafe { (*self.header).read_idx.load(Ordering::Acquire) };
245
246 while let Some(Reverse(min_idx)) = heap.peek() {
248 if *min_idx == current_read {
249 heap.pop();
250 unsafe {
251 (*self.header).read_idx.fetch_add(1, Ordering::Release);
252 }
253 self.sem_empty.post();
254 current_read += 1;
255 } else {
256 break;
257 }
258 }
259 }
260}
261
262impl ShmTraceRing {
263 pub fn create(id: &str, capacity: usize, slot_size: usize) -> std::io::Result<Self> {
265 Self::init(id, capacity, slot_size, true)
266 }
267
268 pub fn open(id: &str, capacity: usize, slot_size: usize) -> std::io::Result<Self> {
270 Self::init(id, capacity, slot_size, false)
271 }
272
273 fn init(id: &str, capacity: usize, slot_size: usize, is_owner: bool) -> std::io::Result<Self> {
274 let aligned_size = (slot_size + HUGE_PAGE_SIZE - 1) & !(HUGE_PAGE_SIZE - 1);
276 let base_name =
278 if id.starts_with('/') { format!("{}_t", id) } else { format!("/{}_t", id) };
279 let c_name = CString::new(base_name.clone()).unwrap();
280 let total_size = HEADER_SIZE + (capacity * aligned_size);
281
282 let fd = unsafe {
283 if is_owner {
284 shm_unlink(c_name.as_ptr());
285 shm_open(c_name.as_ptr(), O_CREAT | O_RDWR, (S_IRUSR | S_IWUSR) as c_uint)
286 } else {
287 shm_open(c_name.as_ptr(), O_RDWR, 0)
288 }
289 };
290 if fd < 0 {
291 return Err(std::io::Error::last_os_error());
292 }
293
294 if is_owner {
295 unsafe { ftruncate(fd, total_size as libc::off_t) };
296 }
297
298 let file = unsafe { File::from_raw_fd(fd) };
299 let mut mmap = unsafe { MmapMut::map_mut(&file)? };
300
301 #[cfg(target_os = "linux")]
302 unsafe {
303 libc::madvise(mmap.as_mut_ptr() as *mut c_void, mmap.len(), libc::MADV_HUGEPAGE);
304 }
305
306 let header = mmap.as_ptr() as *mut RingHeader;
307 let data_start = unsafe { mmap.as_mut_ptr().add(HEADER_SIZE) };
308
309 if is_owner {
310 unsafe {
311 (*header).capacity = capacity;
312 (*header).slot_size = aligned_size;
313 (*header).write_idx.store(0, Ordering::Release);
314 (*header).read_idx.store(0, Ordering::Release);
315 (*header).reserved_idx.store(0, Ordering::Release);
316 (*header).finished.store(0, Ordering::Release);
317 std::ptr::write_volatile(&mut (*header).crash_details, CrashDetails::default());
319 }
320 }
321
322 let sem_filled_name = format!("{}_filled", base_name);
323 let sem_empty_name = format!("{}_empty", base_name);
324
325 if is_owner {
326 let c_fill = CString::new(sem_filled_name.clone()).unwrap();
327 let c_empty = CString::new(sem_empty_name.clone()).unwrap();
328 unsafe {
329 sem_unlink(c_fill.as_ptr());
330 sem_unlink(c_empty.as_ptr());
331 }
332 }
333
334 let sem_filled = PosixSemaphore::create(&sem_filled_name, 0)?;
335 let initial_empty = if is_owner { capacity as u32 } else { 0 };
336 let sem_empty = PosixSemaphore::create(&sem_empty_name, initial_empty)?;
337
338 if !is_owner {
340 unsafe {
341 let committed = (*header).read_idx.load(Ordering::Acquire);
342 let reserved = (*header).reserved_idx.load(Ordering::Acquire);
343 if reserved > committed {
344 (*header).reserved_idx.store(committed, Ordering::Release);
345 }
346 }
347 }
348
349 Ok(Self {
350 inner: Arc::new(InnerRing {
351 _file: file,
352 _mmap: mmap,
353 header,
354 data_start,
355 sem_filled,
356 sem_empty,
357 name: base_name,
358 is_owner,
359 pending_completions: Mutex::new(BinaryHeap::new()),
360 }),
361 })
362 }
363
364 pub fn acquire(&self) -> ProducerGuard {
367 for _ in 0..SPIN_LIMIT {
368 if self.inner.sem_empty.try_wait() {
369 return self.claim_write_slot();
370 }
371 std::hint::spin_loop();
372 }
373 self.inner.sem_empty.wait();
374 self.claim_write_slot()
375 }
376
377 fn claim_write_slot(&self) -> ProducerGuard {
378 let (w, _, cap, size) = unsafe { self.load_state() };
379 let slot_idx = w % cap;
380 let offset = slot_idx * size;
381
382 unsafe {
383 let ptr = self.inner.data_start.add(offset);
384 madvise(ptr as *mut c_void, size, MADV_FREE);
385 ProducerGuard { inner: self.inner.clone(), data_ptr: ptr, len: size }
386 }
387 }
388
389 pub fn mark_finished(&self) {
390 unsafe {
391 (*self.inner.header).finished.store(1, Ordering::Release);
392 }
393 self.inner.sem_filled.post();
394 }
395
396 pub fn notify_crash(&self, signal: i32, addr: u64, operation: i32) {
401 unsafe {
402 let h = self.inner.header;
403 std::ptr::write_volatile(
406 &mut (*h).crash_details,
407 CrashDetails { signal, addr, operation },
408 );
409
410 fence(Ordering::Release);
413
414 (*h).finished.store(2, Ordering::Release);
416
417 self.inner.sem_filled.post();
419 }
420 }
421
422 pub fn access(&self, timeout: Duration) -> TraceResult {
425 if let Some(details) = self.check_crash() {
427 return TraceResult::Crashed(details);
428 }
429
430 if !self.inner.sem_filled.wait_timeout(timeout) {
443 if let Some(details) = self.check_crash() {
445 return TraceResult::Crashed(details);
446 }
447
448 unsafe {
449 if (*self.inner.header).finished.load(Ordering::Acquire) == 1 {
450 let w = (*self.inner.header).write_idx.load(Ordering::Acquire);
453 let r = (*self.inner.header).reserved_idx.load(Ordering::Acquire);
454 if w == r {
455 return TraceResult::Finished;
456 }
457 }
458 }
459 return TraceResult::Timeout;
460 }
461
462 self.claim_read_slot()
464 }
465
466 fn check_crash(&self) -> Option<CrashDetails> {
467 unsafe {
468 if (*self.inner.header).finished.load(Ordering::Acquire) == 2 {
469 Some((*self.inner.header).crash_details)
471 } else {
472 None
473 }
474 }
475 }
476
477 fn claim_read_slot(&self) -> TraceResult {
478 unsafe {
479 let h = self.inner.header;
480
481 if (*h).finished.load(Ordering::Acquire) == 2 {
483 let details = (*h).crash_details;
484 self.inner.sem_filled.post(); return TraceResult::Crashed(details);
486 }
487
488 let w = (*h).write_idx.load(Ordering::Acquire);
489 let current_reserved = (*h).reserved_idx.load(Ordering::Acquire);
490
491 if w == current_reserved {
492 if (*h).finished.load(Ordering::Acquire) == 1 {
493 self.inner.sem_filled.post(); return TraceResult::Finished;
495 }
496
497 return TraceResult::Timeout;
500 }
501
502 let my_idx = (*h).reserved_idx.fetch_add(1, Ordering::Release);
503
504 let cap = (*h).capacity;
505 let size = (*h).slot_size;
506 let slot_idx = my_idx % cap;
507 let offset = slot_idx * size;
508 let ptr = self.inner.data_start.add(offset);
509
510 TraceResult::Data(ConsumerGuard {
511 inner: self.inner.clone(),
512 data_ptr: ptr,
513 len: size,
514 index: my_idx,
515 })
516 }
517 }
518
519 unsafe fn load_state(&self) -> (usize, usize, usize, usize) {
520 let h = self.inner.header;
521 (
522 (*h).write_idx.load(Ordering::Relaxed),
523 (*h).read_idx.load(Ordering::Relaxed),
524 (*h).capacity,
525 (*h).slot_size,
526 )
527 }
528}
529
530#[derive(Debug)]
557enum InnerMap {
558 ReadOnly(Mmap),
559 ReadWrite(MmapMut),
560}
561
562pub struct ShmMemory {
568 name: String,
570 file: File,
572 map: InnerMap,
574 should_unlink: bool,
576}
577
578impl ShmMemory {
579 pub fn create_readonly(id: &str, size: usize) -> io::Result<Self> {
585 let (file, name, _) = Self::open_libc(id, libc::O_CREAT | libc::O_RDWR, size)?;
586
587 let map = unsafe { Mmap::map(&file)? };
589
590 Ok(Self {
591 name,
592 file,
593 map: InnerMap::ReadOnly(map),
594 should_unlink: true, })
596 }
597
598 pub fn open_readwrite(id: &str) -> io::Result<Self> {
603 let (file, name, _) = Self::open_libc(id, libc::O_RDWR, 0)?;
605
606 let map = unsafe { MmapMut::map_mut(&file)? };
608
609 Ok(Self {
610 name,
611 file,
612 map: InnerMap::ReadWrite(map),
613 should_unlink: false, })
615 }
616
617 fn open_libc(id: &str, flags: libc::c_int, size: usize) -> io::Result<(File, String, bool)> {
620 let name = format!("{}_m", id);
621 let clean_name = if name.starts_with('/') { name } else { format!("/{}", name) };
622 let c_id = CString::new(clean_name.as_str())
623 .map_err(|e| Error::new(io::ErrorKind::InvalidInput, e))?;
624
625 unsafe {
626 let owner_flag = if flags & libc::O_CREAT != 0 { S_IRUSR | S_IWUSR } else { 0 };
627 let fd = shm_open(c_id.as_ptr(), flags, owner_flag as c_uint);
630 if fd < 0 {
631 return Err(Error::last_os_error());
632 }
633
634 let file = File::from_raw_fd(fd);
636
637 if size > 0 {
639 let ret = libc::ftruncate(fd, size as libc::off_t);
640 if ret < 0 {
641 return Err(Error::last_os_error());
642 }
643 }
644
645 let created = (flags & libc::O_CREAT) != 0;
646 Ok((file, clean_name, created))
647 }
648 }
649
650 pub fn keep_on_drop(&mut self) {
653 self.should_unlink = false;
654 }
655}
656
657impl AsRawFd for ShmMemory {
660 fn as_raw_fd(&self) -> RawFd {
661 self.file.as_raw_fd()
662 }
663}
664
665impl Deref for ShmMemory {
666 type Target = [u8];
667
668 fn deref(&self) -> &Self::Target {
669 match &self.map {
670 InnerMap::ReadOnly(m) => m.as_ref(),
671 InnerMap::ReadWrite(m) => m.as_ref(),
672 }
673 }
674}
675
676impl DerefMut for ShmMemory {
677 fn deref_mut(&mut self) -> &mut Self::Target {
678 match &mut self.map {
679 InnerMap::ReadOnly(_) => {
680 panic!("ShmMemory Error: Attempted to DerefMut on a Read-Only handle");
682 }
683 InnerMap::ReadWrite(m) => m.as_mut(),
684 }
685 }
686}
687
688impl Drop for ShmMemory {
689 fn drop(&mut self) {
690 if self.should_unlink {
691 if let Ok(c_name) = CString::new(self.name.as_str()) {
693 unsafe {
694 libc::shm_unlink(c_name.as_ptr());
695 }
696 }
697 }
698 }
700}