1extern crate alloc;
35use alloc::string::{String, ToString};
36use alloc::vec::Vec;
37use core::sync::atomic::{AtomicU32, Ordering};
38use std::path::PathBuf;
39use std::sync::Mutex;
40
41use shared_memory::{Shmem, ShmemConf, ShmemError};
42
43use crate::allocator::{SlotError, SlotHandle};
44use crate::backend::SlotBackend;
45use crate::slot::{ReaderMask, SLOT_HEADER_SIZE, SlotHeader};
46
/// Magic value stored in the first `u32` of the segment header; `attach`
/// rejects any segment whose first word differs.
const SEGMENT_MAGIC: u32 = 0x5A44_5353;
/// Size in bytes of the segment header; slot 0 begins at this byte offset.
///
/// Header words as written by `create` (each a native-endian `u32`):
///
/// | byte offset | content                                        |
/// |-------------|------------------------------------------------|
/// | `0x00`      | `SEGMENT_MAGIC`                                |
/// | `0x04`      | slot count                                     |
/// | `0x08`      | slot stride (`slot_total_size`)                |
/// | `0x0C`      | sequence-number counter (bumped atomically)    |
/// | `0x10`      | notification generation word (`GEN_OFFSET`)    |
///
/// The remaining header bytes are not written by the code in this file.
const SEGMENT_HEADER_SIZE: usize = 0x20;

/// Byte offset, inside the segment header, of the `u32` generation word that
/// is incremented on every notification and used as the futex word on Linux.
const GEN_OFFSET: usize = 0x10;
57
/// Errors reported while creating, attaching to, or operating on a
/// shared-memory slot segment.
#[derive(Debug)]
#[non_exhaustive]
pub enum PosixSlotError {
    /// The underlying `shared_memory` operation (create/open) failed.
    Shm(ShmemError),
    /// A slot count or slot size does not fit into `u32`.
    CapacityOverflow,
    /// The attached segment's header was rejected (for example, its magic
    /// word did not match `SEGMENT_MAGIC`).
    InvalidHeader,
    /// A slot-level error, wrapped via the `From<SlotError>` conversion.
    Slot(SlotError),
}
71
72impl core::fmt::Display for PosixSlotError {
73 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
74 match self {
75 Self::Shm(e) => write!(f, "shm error: {e}"),
76 Self::CapacityOverflow => f.write_str("slot capacity overflows u32"),
77 Self::InvalidHeader => f.write_str("segment magic/version mismatch"),
78 Self::Slot(e) => write!(f, "{e}"),
79 }
80 }
81}
82
83impl std::error::Error for PosixSlotError {}
84
85impl From<ShmemError> for PosixSlotError {
86 fn from(e: ShmemError) -> Self {
87 Self::Shm(e)
88 }
89}
90
91impl From<SlotError> for PosixSlotError {
92 fn from(e: SlotError) -> Self {
93 Self::Slot(e)
94 }
95}
96
/// Slot allocator backed by a shared-memory segment that is located through a
/// file link (`flink`) so that other processes can attach to it.
///
/// The segment starts with a `SEGMENT_HEADER_SIZE`-byte header followed by
/// `slot_count` slots of `slot_total_size` bytes each.
pub struct PosixSlotAllocator {
    /// The mapped segment. The accessors in this file report an error (or
    /// return nothing) when this is `None`.
    /// NOTE(review): presumably `None` only after teardown — confirm against
    /// the `Drop` implementation, which is not visible here.
    shmem: Option<Shmem>,
    /// Path of the file link that was passed to `create` / `attach`.
    flink: PathBuf,
    /// Process-local flags, one per slot: `true` while a slot has been
    /// reserved but not yet committed or discarded by this process.
    loaned: Mutex<Vec<bool>>,
    /// Number of slots in the segment.
    slot_count: u32,
    /// Stride in bytes between consecutive slots (slot header + payload area).
    slot_total_size: u32,
    /// Maximum payload size in bytes accepted for a single slot.
    slot_capacity: u32,
}
116
// SAFETY (NOTE(review) — assumptions to confirm): these impls exist because the
// struct would otherwise not be `Send`/`Sync`, presumably due to `Shmem`.
// Process-local mutable state (`loaned`) is behind a `Mutex`, and the shared
// header words that are mutated after creation (sequence counter, generation,
// reader masks) are accessed through `AtomicU32`. Slot headers and payloads are
// read/written with plain pointer operations, so soundness additionally relies
// on the reserve → commit → read → mark-read protocol being followed.
unsafe impl Send for PosixSlotAllocator {}
// SAFETY: see the note on the `Send` impl directly above; the same assumptions
// apply to shared (`&self`) access from multiple threads.
unsafe impl Sync for PosixSlotAllocator {}
124
125impl PosixSlotAllocator {
126 pub fn create<P: Into<PathBuf>>(
136 flink_path: P,
137 slot_count: usize,
138 slot_capacity: usize,
139 ) -> Result<Self, PosixSlotError> {
140 let flink_path = flink_path.into();
141 if let Some(parent) = flink_path.parent() {
142 let _ = std::fs::create_dir_all(parent);
143 }
144 let slot_capacity_u32 =
145 u32::try_from(slot_capacity).map_err(|_| PosixSlotError::CapacityOverflow)?;
146 let slot_count_u32 =
147 u32::try_from(slot_count).map_err(|_| PosixSlotError::CapacityOverflow)?;
148 let slot_total_size = align_up(SLOT_HEADER_SIZE + slot_capacity, 64);
149 let slot_total_size_u32 =
150 u32::try_from(slot_total_size).map_err(|_| PosixSlotError::CapacityOverflow)?;
151 let header_size = SEGMENT_HEADER_SIZE;
152 let total_size = header_size + slot_count * slot_total_size;
153
154 let shmem = ShmemConf::new()
155 .size(total_size)
156 .flink(&flink_path)
157 .create()?;
158
159 #[cfg(unix)]
165 {
166 use std::os::unix::fs::PermissionsExt;
167 let mode = std::fs::Permissions::from_mode(0o600);
168 let _ = std::fs::set_permissions(&flink_path, mode.clone());
169 #[cfg(target_os = "linux")]
170 {
171 let shm_path = std::path::Path::new("/dev/shm")
173 .join(shmem.get_os_id().trim_start_matches('/'));
174 let _ = std::fs::set_permissions(&shm_path, mode);
175 }
176 }
177
178 unsafe {
182 let base = shmem.as_ptr();
183 let p = base as *mut u32;
184 p.add(0).write(SEGMENT_MAGIC);
185 p.add(1).write(slot_count_u32);
186 p.add(2).write(slot_total_size_u32);
187 p.add(3).write(0); p.add(GEN_OFFSET / 4).write(0); core::ptr::write_bytes(base.add(header_size), 0u8, slot_count * slot_total_size);
191 }
192
193 Ok(Self {
194 shmem: Some(shmem),
195 flink: flink_path,
196 loaned: Mutex::new(alloc::vec![false; slot_count]),
197 slot_count: slot_count_u32,
198 slot_total_size: slot_total_size_u32,
199 slot_capacity: slot_capacity_u32,
200 })
201 }
202
203 pub fn attach<P: Into<PathBuf>>(flink_path: P) -> Result<Self, PosixSlotError> {
211 let flink_path = flink_path.into();
212 let shmem = ShmemConf::new().flink(&flink_path).open()?;
213
214 let (magic, slot_count, slot_total_size, _next_sn) = unsafe {
218 let p = shmem.as_ptr() as *const u32;
219 (
220 p.add(0).read(),
221 p.add(1).read(),
222 p.add(2).read(),
223 p.add(3).read(),
224 )
225 };
226 if magic != SEGMENT_MAGIC {
227 return Err(PosixSlotError::InvalidHeader);
228 }
229
230 let slot_capacity = slot_total_size.saturating_sub(SLOT_HEADER_SIZE as u32);
231
232 Ok(Self {
233 shmem: Some(shmem),
234 flink: flink_path,
235 loaned: Mutex::new(alloc::vec![false; slot_count as usize]),
236 slot_count,
237 slot_total_size,
238 slot_capacity,
239 })
240 }
241
242 #[must_use]
244 pub fn flink_path(&self) -> &str {
245 self.flink.to_str().unwrap_or("")
246 }
247
248 pub fn segment_path(&self) -> String {
251 self.flink_path().to_string()
252 }
253
254 #[cfg(target_os = "linux")]
258 #[cfg_attr(not(test), allow(dead_code))]
259 pub(crate) fn shmem_os_id(&self) -> &str {
260 self.shmem
261 .as_ref()
262 .map_or("", shared_memory::Shmem::get_os_id)
263 }
264
265 fn slot_ptr(&self, idx: u32) -> Result<*mut u8, SlotError> {
266 if idx >= self.slot_count {
267 return Err(SlotError::OutOfBounds);
268 }
269 let header_size = SEGMENT_HEADER_SIZE;
270 let shmem = self.shmem.as_ref().ok_or(SlotError::LockPoisoned)?;
273 let base = shmem.as_ptr();
274 unsafe { Ok(base.add(header_size + (idx as usize) * (self.slot_total_size as usize))) }
278 }
279
280 fn read_header(&self, idx: u32) -> Result<SlotHeader, SlotError> {
281 let p = self.slot_ptr(idx)?;
282 let header = unsafe { core::ptr::read(p as *const SlotHeader) };
285 Ok(header)
286 }
287
288 fn write_header(&self, idx: u32, header: SlotHeader) -> Result<(), SlotError> {
289 let p = self.slot_ptr(idx)?;
290 unsafe {
293 core::ptr::write(p as *mut SlotHeader, header);
294 }
295 Ok(())
296 }
297
298 fn next_sn_inc(&self) -> Result<u32, SlotError> {
299 let shmem = self.shmem.as_ref().ok_or(SlotError::LockPoisoned)?;
300 let sn_ptr = unsafe { shmem.as_ptr().add(12) as *const AtomicU32 };
304 let atomic = unsafe { &*sn_ptr };
306 Ok(atomic.fetch_add(1, Ordering::Relaxed))
307 }
308
309 fn data_ptr(&self, idx: u32) -> Result<*mut u8, SlotError> {
310 let p = self.slot_ptr(idx)?;
311 Ok(unsafe { p.add(SLOT_HEADER_SIZE) })
313 }
314
315 fn gen_atomic(&self) -> Option<&AtomicU32> {
318 let shmem = self.shmem.as_ref()?;
319 Some(unsafe { &*(shmem.as_ptr().add(GEN_OFFSET) as *const AtomicU32) })
322 }
323
324 fn bump_notify(&self) {
328 if let Some(g) = self.gen_atomic() {
329 g.fetch_add(1, Ordering::Release);
330 #[cfg(target_os = "linux")]
331 futex::wake_all(g);
332 }
333 }
334}
335
336#[cfg(target_os = "linux")]
340mod futex {
341 use core::sync::atomic::AtomicU32;
342
343 pub(super) fn wake_all(word: &AtomicU32) {
344 let ptr = core::ptr::from_ref(word).cast::<u32>().cast_mut();
345 unsafe {
348 libc::syscall(libc::SYS_futex, ptr, libc::FUTEX_WAKE, i32::MAX, 0, 0, 0);
349 }
350 }
351
352 pub(super) fn wait(word: &AtomicU32, expected: u32, timeout: core::time::Duration) {
354 let ts = libc::timespec {
355 tv_sec: timeout.as_secs() as libc::time_t,
356 tv_nsec: libc::c_long::from(timeout.subsec_nanos().min(999_999_999) as i32),
357 };
358 let ptr = core::ptr::from_ref(word).cast::<u32>().cast_mut();
359 unsafe {
362 libc::syscall(
363 libc::SYS_futex,
364 ptr,
365 libc::FUTEX_WAIT,
366 expected as libc::c_int,
367 core::ptr::from_ref(&ts),
368 0,
369 0,
370 );
371 }
372 }
373}
374
375impl SlotBackend for PosixSlotAllocator {
376 fn reserve_slot(&self, active_readers_mask: ReaderMask) -> Result<SlotHandle, SlotError> {
377 let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
378 for idx in 0..self.slot_count {
379 if loaned[idx as usize] {
380 continue;
381 }
382 let header = self.read_header(idx)?;
383 if header.sample_size == 0 || header.all_read(active_readers_mask) {
384 loaned[idx as usize] = true;
385 return Ok(SlotHandle {
386 segment_id: 0,
387 slot_index: idx,
388 });
389 }
390 }
391 Err(SlotError::NoFreeSlot)
392 }
393
394 fn commit_slot(&self, handle: SlotHandle, bytes: &[u8]) -> Result<u32, SlotError> {
395 if bytes.len() > self.slot_capacity as usize {
396 return Err(SlotError::SampleTooLarge {
397 sample: bytes.len(),
398 slot_capacity: self.slot_capacity as usize,
399 });
400 }
401 let sn = self.next_sn_inc()?;
402 let sample_size = u32::try_from(bytes.len()).unwrap_or(u32::MAX);
403 let header = SlotHeader::new(sn, sample_size);
404 let dp = self.data_ptr(handle.slot_index)?;
406 unsafe {
408 core::ptr::copy_nonoverlapping(bytes.as_ptr(), dp, bytes.len());
409 }
410 self.write_header(handle.slot_index, header)?;
411 {
413 let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
414 loaned[handle.slot_index as usize] = false;
415 }
416 self.bump_notify(); Ok(sn)
418 }
419
420 fn discard_slot(&self, handle: SlotHandle) -> Result<(), SlotError> {
421 {
422 let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
423 if (handle.slot_index as usize) >= loaned.len() {
424 return Err(SlotError::OutOfBounds);
425 }
426 loaned[handle.slot_index as usize] = false;
427 }
428 self.bump_notify();
429 Ok(())
430 }
431
432 fn slot_data_ptr(&self, handle: SlotHandle) -> Result<(*mut u8, usize), SlotError> {
433 {
434 let loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
435 let idx = handle.slot_index as usize;
436 if idx >= loaned.len() || !loaned[idx] {
437 return Err(SlotError::OutOfBounds);
438 }
439 }
440 let dp = self.data_ptr(handle.slot_index)?;
443 Ok((dp, self.slot_capacity as usize))
444 }
445
446 fn commit_in_place(&self, handle: SlotHandle, len: usize) -> Result<u32, SlotError> {
447 if len > self.slot_capacity as usize {
448 return Err(SlotError::SampleTooLarge {
449 sample: len,
450 slot_capacity: self.slot_capacity as usize,
451 });
452 }
453 let sn = self.next_sn_inc()?;
454 let sample_size = u32::try_from(len).unwrap_or(u32::MAX);
455 self.write_header(handle.slot_index, SlotHeader::new(sn, sample_size))?;
458 {
459 let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
460 loaned[handle.slot_index as usize] = false;
461 }
462 self.bump_notify();
463 Ok(sn)
464 }
465
466 fn slot_read_ptr(&self, handle: SlotHandle) -> Result<(*const u8, usize), SlotError> {
467 let header = self.read_header(handle.slot_index)?;
468 let n = (header.sample_size as usize).min(self.slot_capacity as usize);
469 let dp = self.data_ptr(handle.slot_index)?;
470 Ok((dp.cast_const(), n))
471 }
472
473 fn next_unread_slot(&self, reader_index: u8) -> Result<Option<SlotHandle>, SlotError> {
474 let bit = 1u32 << reader_index;
475 for idx in 0..self.slot_count {
476 let header = self.read_header(idx)?;
477 if header.sample_size > 0 && (header.reader_mask & bit) == 0 {
478 return Ok(Some(SlotHandle {
479 segment_id: 0,
480 slot_index: idx,
481 }));
482 }
483 }
484 Ok(None)
485 }
486
487 fn read_slot(&self, handle: SlotHandle) -> Result<(SlotHeader, Vec<u8>), SlotError> {
488 let header = self.read_header(handle.slot_index)?;
489 let n = (header.sample_size as usize).min(self.slot_capacity as usize);
490 let dp = self.data_ptr(handle.slot_index)?;
491 let mut buf = alloc::vec![0u8; n];
492 unsafe {
494 core::ptr::copy_nonoverlapping(dp, buf.as_mut_ptr(), n);
495 }
496 Ok((header, buf))
497 }
498
499 fn mark_read(&self, handle: SlotHandle, reader_index: u8) -> Result<(), SlotError> {
500 debug_assert!(reader_index < 32);
501 let p = self.slot_ptr(handle.slot_index)?;
505 let mask_ptr = unsafe { p.add(8) as *const AtomicU32 };
507 let atomic = unsafe { &*mask_ptr };
509 atomic.fetch_or(1u32 << reader_index, Ordering::Relaxed);
510 self.bump_notify(); Ok(())
512 }
513
514 fn mark_reader_disconnected(&self, reader_index: u8) -> Result<(), SlotError> {
515 debug_assert!(reader_index < 32);
516 let bit = 1u32 << reader_index;
517 for idx in 0..self.slot_count {
518 let p = self.slot_ptr(idx)?;
519 let mask_ptr = unsafe { p.add(8) as *const AtomicU32 };
523 let atomic = unsafe { &*mask_ptr };
525 atomic.fetch_or(bit, Ordering::Relaxed);
526 }
527 self.bump_notify();
528 Ok(())
529 }
530
    /// Number of slots in the segment, widened from the stored `u32`.
    /// This implementation always returns `Ok`.
    fn slot_count(&self) -> Result<usize, SlotError> {
        Ok(self.slot_count as usize)
    }

    /// Stride in bytes between consecutive slots (slot header plus payload
    /// area), widened from the stored `u32`.
    fn slot_total_size(&self) -> usize {
        self.slot_total_size as usize
    }

    /// Maximum payload size in bytes accepted for one slot, widened from the
    /// stored `u32`.
    fn slot_capacity(&self) -> usize {
        self.slot_capacity as usize
    }
542
543 fn notify_generation(&self) -> u64 {
544 self.gen_atomic()
545 .map_or(0, |g| u64::from(g.load(Ordering::Acquire)))
546 }
547
548 fn wait_for_change(&self, last: u64, timeout: core::time::Duration) {
549 #[cfg(target_os = "linux")]
553 if let Some(g) = self.gen_atomic() {
554 futex::wait(g, last as u32, timeout);
555 }
556 #[cfg(not(target_os = "linux"))]
557 let _ = (last, timeout);
558 }
559}
560
/// Rounds `x` up to the nearest multiple of `n`.
///
/// `n` must be a power of two (checked with a debug assertion only).
fn align_up(x: usize, n: usize) -> usize {
    debug_assert!(n.is_power_of_two());
    // Adding `n - 1` carries into the next multiple unless `x` is already
    // aligned; masking off the low bits then snaps down to that multiple.
    let bumped = x + n - 1;
    let low_bits = n - 1;
    bumped & !low_bits
}
565
// Tests exercising the allocator through two independent mappings of the same
// segment: an owner created with `create` and a consumer obtained via `attach`.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use core::sync::atomic::{AtomicU64, Ordering};

    /// Builds a file-link path in the temp directory that is unique per
    /// process (pid) and per call (monotonic counter), so tests do not
    /// collide with each other.
    fn unique_flink() -> PathBuf {
        static N: AtomicU64 = AtomicU64::new(0);
        let pid = std::process::id();
        let n = N.fetch_add(1, Ordering::Relaxed);
        let mut p = std::env::temp_dir();
        p.push(alloc::format!("zerodds-flatdata-test-{pid}-{n}"));
        p
    }

    /// An attached consumer sees the same slot count as the owner, and the
    /// slot stride for a 64-byte capacity is 128 bytes.
    #[test]
    fn create_attach_roundtrip() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");
        assert_eq!(SlotBackend::slot_count(&owner).unwrap(), 4);
        assert_eq!(SlotBackend::slot_count(&consumer).unwrap(), 4);
        assert_eq!(SlotBackend::slot_total_size(&owner), 128);
    }

    /// A commit through the owner's mapping wakes a consumer that is blocked
    /// in `wait_for_change` on its own mapping of the same segment.
    #[cfg(target_os = "linux")]
    #[test]
    fn futex_notify_wakes_consumer_across_mappings() {
        use alloc::sync::Arc;
        use core::time::Duration;
        let flink = unique_flink();
        let owner = Arc::new(PosixSlotAllocator::create(&flink, 4, 64).expect("create"));
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");

        // Writer thread: commits after a short delay so that the consumer is
        // (most likely) already waiting when the notification happens.
        let w = Arc::clone(&owner);
        let h = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(50));
            let handle = w.reserve_slot(0b1).expect("reserve");
            w.commit_slot(handle, &[1, 2, 3, 4]).expect("commit");
        });

        let start = std::time::Instant::now();
        let g0 = consumer.notify_generation();
        // The 5 s timeout is far above the 2 s bound asserted below, so a
        // pass means the wait ended early rather than timing out.
        consumer.wait_for_change(g0, Duration::from_secs(5));
        let woke = start.elapsed();
        assert!(
            woke < Duration::from_secs(2),
            "consumer should wake on the owner's futex_wake, not spin to timeout (waited {woke:?})"
        );
        assert!(
            consumer.notify_generation() != g0,
            "generation must have advanced"
        );
        h.join().unwrap();
    }

    /// Both the file link and the backing `/dev/shm` object end up with
    /// permission bits `0600` after `create`.
    #[cfg(target_os = "linux")]
    #[test]
    fn segment_is_owner_only_0600() {
        use std::os::unix::fs::PermissionsExt;
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");
        let flink_mode = std::fs::metadata(&flink)
            .expect("flink stat")
            .permissions()
            .mode()
            & 0o777;
        assert_eq!(
            flink_mode, 0o600,
            "flink file must be 0600, was {flink_mode:o}"
        );
        let shm_path =
            std::path::Path::new("/dev/shm").join(owner.shmem_os_id().trim_start_matches('/'));
        let shm_mode = std::fs::metadata(&shm_path)
            .expect("shm stat")
            .permissions()
            .mode()
            & 0o777;
        assert_eq!(shm_mode, 0o600, "shm object must be 0600, was {shm_mode:o}");
    }

    /// Bytes committed by the owner are readable through the consumer's
    /// mapping, with the matching `sample_size` in the header.
    #[test]
    fn write_read_through_shm() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");

        let h = SlotBackend::reserve_slot(&owner, 0b1).expect("reserve");
        let _sn = SlotBackend::commit_slot(&owner, h, &[1, 2, 3, 4]).expect("commit");

        let (header, bytes) = SlotBackend::read_slot(&consumer, h).expect("read");
        assert_eq!(header.sample_size, 4);
        assert_eq!(bytes, vec![1, 2, 3, 4]);
    }

    /// Read marks set through the consumer's mapping are visible to the
    /// owner, and a fully-read slot can be reserved again.
    #[test]
    fn mark_read_visible_to_owner() {
        let flink = unique_flink();
        // A single slot: the final `reserve_slot` can only succeed by reusing it.
        let owner = PosixSlotAllocator::create(&flink, 1, 64).expect("create");
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");

        let h = SlotBackend::reserve_slot(&owner, 0b011).expect("reserve");
        SlotBackend::commit_slot(&owner, h, &[0xFF]).expect("commit");

        SlotBackend::mark_read(&consumer, h, 0).expect("mark0");
        SlotBackend::mark_read(&consumer, h, 1).expect("mark1");

        let (header, _) = SlotBackend::read_slot(&owner, h).unwrap();
        assert_eq!(header.reader_mask, 0b011);

        let _ = SlotBackend::reserve_slot(&owner, 0b011).expect("reuse");
    }

    /// Consecutive commits receive strictly increasing sequence numbers.
    #[test]
    fn next_sn_increments_atomically() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");

        let h0 = SlotBackend::reserve_slot(&owner, 0b1).unwrap();
        let sn0 = SlotBackend::commit_slot(&owner, h0, &[0]).unwrap();
        let h1 = SlotBackend::reserve_slot(&owner, 0b1).unwrap();
        let sn1 = SlotBackend::commit_slot(&owner, h1, &[1]).unwrap();
        assert!(sn1 > sn0);
    }
}