extern crate alloc;

use alloc::string::{String, ToString};
use alloc::vec::Vec;
use core::sync::atomic::{AtomicU32, Ordering};
use std::path::PathBuf;
use std::sync::Mutex;

use shared_memory::{Shmem, ShmemConf, ShmemError};

use crate::allocator::{SlotError, SlotHandle};
use crate::backend::SlotBackend;
use crate::slot::{ReaderMask, SLOT_HEADER_SIZE, SlotHeader};

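/// Segment magic: the ASCII bytes "ZDSS" interpreted as a big-endian u32.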
const SEGMENT_MAGIC: u32 = 0x5A44_5353;

/// Byte size of the segment header: magic, slot count, slot total size,
/// and the next-sequence-number counter (four `u32` values).
const SEGMENT_HEADER_SIZE: usize = 16;

/// Errors produced by the POSIX shared-memory slot backend.
#[derive(Debug)]
#[non_exhaustive]
pub enum PosixSlotError {
    /// Error reported by the underlying `shared_memory` crate.
    Shm(ShmemError),
    /// A slot count or slot capacity does not fit in a `u32`.
    CapacityOverflow,
    /// The attached segment does not carry the expected magic value.
    InvalidHeader,
    /// A slot-level error from the allocator.
    Slot(SlotError),
}

impl core::fmt::Display for PosixSlotError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::Shm(e) => write!(f, "shm error: {e}"),
            Self::CapacityOverflow => f.write_str("slot capacity overflows u32"),
            Self::InvalidHeader => f.write_str("segment magic/version mismatch"),
            Self::Slot(e) => write!(f, "{e}"),
        }
    }
}

impl std::error::Error for PosixSlotError {}

impl From<ShmemError> for PosixSlotError {
    fn from(e: ShmemError) -> Self {
        Self::Shm(e)
    }
}

impl From<SlotError> for PosixSlotError {
    fn from(e: SlotError) -> Self {
        Self::Slot(e)
    }
}

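/// Slot allocator backed by a named POSIX shared-memory segment.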
pub struct PosixSlotAllocator {
    /// The mapped shared-memory segment.
    shmem: Option<Shmem>,
    /// Filesystem link used to create or open the segment.
    flink: PathBuf,
    /// Per-slot loan flags; `true` while a writer holds the slot.
    loaned: Mutex<Vec<bool>>,
    /// Number of slots in the segment.
    slot_count: u32,
    /// Bytes per slot including its header, rounded up to 64.
    slot_total_size: u32,
    /// Usable payload bytes per slot.
    slot_capacity: u32,
}

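// SAFETY: `Shmem` is `!Send`/`!Sync` only because it holds a raw pointer
// into the mapping. Loan bookkeeping is guarded by the `Mutex`, and the
// shared counter and reader masks are accessed through atomics.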
unsafe impl Send for PosixSlotAllocator {}
unsafe impl Sync for PosixSlotAllocator {}

impl PosixSlotAllocator {
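    /// Creates a new shared-memory segment reachable through `flink_path`,
    /// sized for `slot_count` slots of `slot_capacity` payload bytes each.
    ///
    /// The flink's parent directory is created on a best-effort basis, the
    /// segment header is written, and the whole slot area is zeroed.
    ///
    /// A minimal usage sketch (the path and sizes are illustrative):
    ///
    /// ```ignore
    /// let owner = PosixSlotAllocator::create("/tmp/demo-flink", 4, 64)?;
    /// ```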
    pub fn create<P: Into<PathBuf>>(
        flink_path: P,
        slot_count: usize,
        slot_capacity: usize,
    ) -> Result<Self, PosixSlotError> {
        let flink_path = flink_path.into();
        if let Some(parent) = flink_path.parent() {
            let _ = std::fs::create_dir_all(parent);
        }
        let slot_capacity_u32 =
            u32::try_from(slot_capacity).map_err(|_| PosixSlotError::CapacityOverflow)?;
        let slot_count_u32 =
            u32::try_from(slot_count).map_err(|_| PosixSlotError::CapacityOverflow)?;
        let slot_total_size = align_up(SLOT_HEADER_SIZE + slot_capacity, 64);
        let slot_total_size_u32 =
            u32::try_from(slot_total_size).map_err(|_| PosixSlotError::CapacityOverflow)?;
        let header_size = SEGMENT_HEADER_SIZE;
        let total_size = header_size + slot_count * slot_total_size;

        let shmem = ShmemConf::new()
            .size(total_size)
            .flink(&flink_path)
            .create()?;

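        // SAFETY: the mapping is `total_size` bytes, so the four header
        // writes and the zeroing of the slot area stay in bounds.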
        unsafe {
            let base = shmem.as_ptr();
            let p = base as *mut u32;
            p.add(0).write(SEGMENT_MAGIC);
            p.add(1).write(slot_count_u32);
            p.add(2).write(slot_total_size_u32);
            // The next-sequence-number counter starts at zero.
            p.add(3).write(0);
            // Zero every slot header and payload.
            core::ptr::write_bytes(base.add(header_size), 0u8, slot_count * slot_total_size);
        }

        Ok(Self {
            shmem: Some(shmem),
            flink: flink_path,
            loaned: Mutex::new(alloc::vec![false; slot_count]),
            slot_count: slot_count_u32,
            slot_total_size: slot_total_size_u32,
            slot_capacity: slot_capacity_u32,
        })
    }

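    /// Attaches to an existing segment through its flink and recovers the
    /// slot geometry from the segment header.
    ///
    /// # Errors
    ///
    /// Returns [`PosixSlotError::InvalidHeader`] if the segment magic does
    /// not match, or [`PosixSlotError::Shm`] if the segment cannot be opened.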
    pub fn attach<P: Into<PathBuf>>(flink_path: P) -> Result<Self, PosixSlotError> {
        let flink_path = flink_path.into();
        let shmem = ShmemConf::new().flink(&flink_path).open()?;

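        // SAFETY: assumes the segment was produced by `create`, so the
        // 16-byte header with its four `u32` fields is present and in bounds.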
        let (magic, slot_count, slot_total_size, _next_sn) = unsafe {
            let p = shmem.as_ptr() as *const u32;
            (
                p.add(0).read(),
                p.add(1).read(),
                p.add(2).read(),
                p.add(3).read(),
            )
        };
        if magic != SEGMENT_MAGIC {
            return Err(PosixSlotError::InvalidHeader);
        }

        let slot_capacity = slot_total_size.saturating_sub(SLOT_HEADER_SIZE as u32);

        Ok(Self {
            shmem: Some(shmem),
            flink: flink_path,
            loaned: Mutex::new(alloc::vec![false; slot_count as usize]),
            slot_count,
            slot_total_size,
            slot_capacity,
        })
    }

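    /// Returns the flink path as UTF-8, or an empty string if the path is
    /// not valid UTF-8.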
    #[must_use]
    pub fn flink_path(&self) -> &str {
        self.flink.to_str().unwrap_or("")
    }

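    /// Returns the segment's flink path as an owned `String`.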
    pub fn segment_path(&self) -> String {
        self.flink_path().to_string()
    }

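    /// Returns a raw pointer to the start (header) of slot `idx`.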
    fn slot_ptr(&self, idx: u32) -> Result<*mut u8, SlotError> {
        if idx >= self.slot_count {
            return Err(SlotError::OutOfBounds);
        }
        let header_size = SEGMENT_HEADER_SIZE;
        let shmem = self.shmem.as_ref().ok_or(SlotError::LockPoisoned)?;
        let base = shmem.as_ptr();
        // SAFETY: `idx` was bounds-checked above and the segment was sized
        // for `slot_count` slots, so the offset stays inside the mapping.
        unsafe { Ok(base.add(header_size + (idx as usize) * (self.slot_total_size as usize))) }
    }

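    /// Copies the `SlotHeader` of slot `idx` out of shared memory.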
    fn read_header(&self, idx: u32) -> Result<SlotHeader, SlotError> {
        let p = self.slot_ptr(idx)?;
        // SAFETY: every slot begins with a `SlotHeader`, and `slot_ptr`
        // validated the index.
        let header = unsafe { core::ptr::read(p as *const SlotHeader) };
        Ok(header)
    }

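    /// Overwrites the `SlotHeader` of slot `idx` in shared memory.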
    fn write_header(&self, idx: u32, header: SlotHeader) -> Result<(), SlotError> {
        let p = self.slot_ptr(idx)?;
        // SAFETY: `slot_ptr` validated the index, and each slot is large
        // enough to hold a `SlotHeader`.
        unsafe {
            core::ptr::write(p as *mut SlotHeader, header);
        }
        Ok(())
    }

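    /// Atomically bumps the segment-wide sequence number, returning the
    /// previous value.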
    fn next_sn_inc(&self) -> Result<u32, SlotError> {
        let shmem = self.shmem.as_ref().ok_or(SlotError::LockPoisoned)?;
        // SAFETY: the counter lives at byte offset 12 of the segment header
        // and is only ever accessed through atomic operations.
        let sn_ptr = unsafe { shmem.as_ptr().add(12) as *const AtomicU32 };
        let atomic = unsafe { &*sn_ptr };
        Ok(atomic.fetch_add(1, Ordering::Relaxed))
    }

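    /// Returns a raw pointer to the payload of slot `idx`, just past its
    /// header.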
    fn data_ptr(&self, idx: u32) -> Result<*mut u8, SlotError> {
        let p = self.slot_ptr(idx)?;
        Ok(unsafe { p.add(SLOT_HEADER_SIZE) })
    }
}

impl SlotBackend for PosixSlotAllocator {
    fn reserve_slot(&self, active_readers_mask: ReaderMask) -> Result<SlotHandle, SlotError> {
        let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
        for idx in 0..self.slot_count {
            if loaned[idx as usize] {
                continue;
            }
            // A slot is reusable if it has never been written or if every
            // currently active reader has consumed it.
            let header = self.read_header(idx)?;
            if header.sample_size == 0 || header.all_read(active_readers_mask) {
                loaned[idx as usize] = true;
                return Ok(SlotHandle {
                    segment_id: 0,
                    slot_index: idx,
                });
            }
        }
        Err(SlotError::NoFreeSlot)
    }

    fn commit_slot(&self, handle: SlotHandle, bytes: &[u8]) -> Result<u32, SlotError> {
        if bytes.len() > self.slot_capacity as usize {
            return Err(SlotError::SampleTooLarge {
                sample: bytes.len(),
                slot_capacity: self.slot_capacity as usize,
            });
        }
        let sn = self.next_sn_inc()?;
        let sample_size = u32::try_from(bytes.len()).unwrap_or(u32::MAX);
        let header = SlotHeader::new(sn, sample_size);
        let dp = self.data_ptr(handle.slot_index)?;
        // SAFETY: `bytes.len()` was checked against the slot capacity, so
        // the copy stays within the slot's payload area.
        unsafe {
            core::ptr::copy_nonoverlapping(bytes.as_ptr(), dp, bytes.len());
        }
        self.write_header(handle.slot_index, header)?;
        // Release the loan now that the payload and header are published.
        let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
        loaned[handle.slot_index as usize] = false;
        Ok(sn)
    }

    fn discard_slot(&self, handle: SlotHandle) -> Result<(), SlotError> {
        let mut loaned = self.loaned.lock().map_err(|_| SlotError::LockPoisoned)?;
        if (handle.slot_index as usize) >= loaned.len() {
            return Err(SlotError::OutOfBounds);
        }
        loaned[handle.slot_index as usize] = false;
        Ok(())
    }

    fn read_slot(&self, handle: SlotHandle) -> Result<(SlotHeader, Vec<u8>), SlotError> {
        let header = self.read_header(handle.slot_index)?;
        // Clamp the advertised sample size to the slot capacity.
        let n = (header.sample_size as usize).min(self.slot_capacity as usize);
        let dp = self.data_ptr(handle.slot_index)?;
        let mut buf = alloc::vec![0u8; n];
        // SAFETY: `n` is at most the slot capacity, so the copy stays
        // within the slot's payload area.
        unsafe {
            core::ptr::copy_nonoverlapping(dp, buf.as_mut_ptr(), n);
        }
        Ok((header, buf))
    }

    fn mark_read(&self, handle: SlotHandle, reader_index: u8) -> Result<(), SlotError> {
        debug_assert!(reader_index < 32);
        let p = self.slot_ptr(handle.slot_index)?;
        // SAFETY: the reader mask is the `u32` at byte offset 8 of the slot
        // header and is only ever accessed through atomic operations.
        let mask_ptr = unsafe { p.add(8) as *const AtomicU32 };
        let atomic = unsafe { &*mask_ptr };
        atomic.fetch_or(1u32 << reader_index, Ordering::Relaxed);
        Ok(())
    }

    fn mark_reader_disconnected(&self, reader_index: u8) -> Result<(), SlotError> {
        debug_assert!(reader_index < 32);
        // Set the reader's bit in every slot so a disconnected reader never
        // blocks slot reuse.
        let bit = 1u32 << reader_index;
        for idx in 0..self.slot_count {
            let p = self.slot_ptr(idx)?;
            // SAFETY: same layout as in `mark_read`; the mask is accessed
            // atomically.
            let mask_ptr = unsafe { p.add(8) as *const AtomicU32 };
            let atomic = unsafe { &*mask_ptr };
            atomic.fetch_or(bit, Ordering::Relaxed);
        }
        Ok(())
    }

    fn slot_count(&self) -> Result<usize, SlotError> {
        Ok(self.slot_count as usize)
    }

    fn slot_total_size(&self) -> usize {
        self.slot_total_size as usize
    }

    fn slot_capacity(&self) -> usize {
        self.slot_capacity as usize
    }
}

/// Rounds `x` up to the next multiple of `n`, which must be a power of two.
fn align_up(x: usize, n: usize) -> usize {
    debug_assert!(n.is_power_of_two());
    (x + n - 1) & !(n - 1)
}

#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use core::sync::atomic::{AtomicU64, Ordering};

    fn unique_flink() -> PathBuf {
        static N: AtomicU64 = AtomicU64::new(0);
        let pid = std::process::id();
        let n = N.fetch_add(1, Ordering::Relaxed);
        let mut p = std::env::temp_dir();
        p.push(alloc::format!("zerodds-flatdata-test-{pid}-{n}"));
        p
    }

    #[test]
    fn create_attach_roundtrip() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");
        assert_eq!(SlotBackend::slot_count(&owner).unwrap(), 4);
        assert_eq!(SlotBackend::slot_count(&consumer).unwrap(), 4);
        assert_eq!(SlotBackend::slot_total_size(&owner), 128);
    }

    #[test]
    fn write_read_through_shm() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");

        let h = SlotBackend::reserve_slot(&owner, 0b1).expect("reserve");
        let _sn = SlotBackend::commit_slot(&owner, h, &[1, 2, 3, 4]).expect("commit");

        let (header, bytes) = SlotBackend::read_slot(&consumer, h).expect("read");
        assert_eq!(header.sample_size, 4);
        assert_eq!(bytes, vec![1, 2, 3, 4]);
    }

    #[test]
    fn mark_read_visible_to_owner() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 1, 64).expect("create");
        let consumer = PosixSlotAllocator::attach(&flink).expect("attach");

        let h = SlotBackend::reserve_slot(&owner, 0b011).expect("reserve");
        SlotBackend::commit_slot(&owner, h, &[0xFF]).expect("commit");

        SlotBackend::mark_read(&consumer, h, 0).expect("mark0");
        SlotBackend::mark_read(&consumer, h, 1).expect("mark1");

        let (header, _) = SlotBackend::read_slot(&owner, h).unwrap();
        assert_eq!(header.reader_mask, 0b011);

        // With both readers done, the single slot can be reserved again.
        let _ = SlotBackend::reserve_slot(&owner, 0b011).expect("reuse");
    }

    #[test]
    fn next_sn_increments_atomically() {
        let flink = unique_flink();
        let owner = PosixSlotAllocator::create(&flink, 4, 64).expect("create");

        let h0 = SlotBackend::reserve_slot(&owner, 0b1).unwrap();
        let sn0 = SlotBackend::commit_slot(&owner, h0, &[0]).unwrap();
        let h1 = SlotBackend::reserve_slot(&owner, 0b1).unwrap();
        let sn1 = SlotBackend::commit_slot(&owner, h1, &[1]).unwrap();
        assert!(sn1 > sn0);
    }
}