// crossbar/protocol/region.rs

1// Copyright (c) 2026 The Crossbar Contributors
2// This source code is licensed under the Apache License, Version 2.0.
3// See the LICENSE file in the project root for details.
4
5// SPDX-License-Identifier: Apache-2.0
6
7//! Region: shared state for the mmap region.
8//!
9//! Works on raw pointers -- no OS dependency. The platform layer constructs
10//! a `Region` from an mmap'd pointer.
11
12use super::config::Config;
13
/// Shared state for the mmap region -- held by both publisher/subscriber
/// and by `Sample` (via `Arc`) to keep the mmap alive.
pub struct Region {
    /// Base address of the mmap'd region (set in `from_raw`).
    #[cfg(feature = "std")]
    base: *mut u8,
    /// Layout parameters: block size/count, ring depth, stale timeout.
    #[cfg(feature = "std")]
    pub(crate) config: Config,
    /// Byte offset from `base` to the first block in the pool.
    #[cfg(feature = "std")]
    pub(crate) pool_offset: usize,
    /// Most recently freed block, held aside for cache-warm reuse by
    /// `alloc_recycled`; `NO_BLOCK` when no recycled block is waiting.
    #[cfg(feature = "std")]
    pub(crate) last_freed: core::sync::atomic::AtomicU32,
    // In no_std mode, Region is a zero-sized type placeholder.
    #[cfg(not(feature = "std"))]
    _phantom: core::marker::PhantomData<Config>,
}
29
// SAFETY: The mmap region is process-shared memory backed by a named file in
// /dev/shm. All cross-process access is mediated by atomic operations in the
// caller. The raw pointer is never dereferenced without explicit unsafe blocks.
// (In no_std builds `Region` is a PhantomData-only ZST, so these impls are
// trivially sound there as well.)
unsafe impl Send for Region {}
unsafe impl Sync for Region {}
35
36#[cfg(feature = "std")]
37use super::layout::*;
38#[cfg(feature = "std")]
39use crate::platform::notify;
40#[cfg(feature = "std")]
41use core::sync::atomic::{AtomicU32, AtomicU64, Ordering};
42
43#[cfg(feature = "std")]
44impl Region {
45    /// Construct a Region from a raw pointer and length.
46    ///
47    /// # Safety
48    ///
49    /// `base` must point to a valid, mmap'd region of at least `len` bytes
50    /// that remains valid for the lifetime of this Region.
51    pub(crate) unsafe fn from_raw(base: *mut u8, _len: usize, config: Config) -> Self {
52        let pool_offset = block_pool_offset(&config);
53        Self {
54            base,
55            config,
56            pool_offset,
57            last_freed: AtomicU32::new(NO_BLOCK),
58        }
59    }
60
    /// Returns the raw base pointer to the mmap region.
    ///
    /// No lifetime is attached; callers must not let the pointer outlive
    /// this `Region` (kept alive via `Arc` per the struct docs).
    #[inline]
    pub(crate) fn base_ptr(&self) -> *mut u8 {
        self.base
    }
66
67    #[inline]
68    pub(crate) fn pool_head(&self) -> &AtomicU64 {
69        unsafe { &*(self.base.add(GH_POOL_HEAD) as *const AtomicU64) }
70    }
71
72    /// Returns a pointer to the block at `idx`, or `None` if out of bounds.
73    #[inline]
74    pub(crate) fn block_ptr_checked(&self, idx: u32) -> Option<*mut u8> {
75        if (idx as usize) >= self.config.block_count as usize {
76            return None;
77        }
78        Some(unsafe {
79            self.base
80                .add(self.pool_offset + idx as usize * self.config.block_size as usize)
81        })
82    }
83
84    /// Returns a pointer to the block at `idx`.
85    ///
86    /// # Panics
87    ///
88    /// Panics in debug mode if `idx >= block_count`.
89    #[inline]
90    pub(crate) fn block_ptr(&self, idx: u32) -> *mut u8 {
91        debug_assert!((idx as usize) < self.config.block_count as usize);
92        unsafe {
93            self.base
94                .add(self.pool_offset + idx as usize * self.config.block_size as usize)
95        }
96    }
97
98    /// Returns the refcount atomic for block `idx`, with bounds check.
99    /// Returns `None` if `idx >= block_count`.
100    #[inline]
101    pub(crate) fn block_refcount_checked(&self, idx: u32) -> Option<&AtomicU32> {
102        let ptr = self.block_ptr_checked(idx)?;
103        Some(unsafe { &*(ptr.add(BK_REFCOUNT) as *const AtomicU32) })
104    }
105
106    #[inline]
107    pub(crate) fn block_refcount(&self, idx: u32) -> &AtomicU32 {
108        unsafe { &*(self.block_ptr(idx).add(BK_REFCOUNT) as *const AtomicU32) }
109    }
110
    /// Allocates a block from the free-list.
    ///
    /// Lock-free stack pop: the head packs a generation counter with the
    /// block index, and the generation is bumped on every successful CAS so
    /// a concurrent pop/push cycle cannot make a stale head compare equal
    /// (ABA defense).
    ///
    /// Returns:
    /// - `Ok(Some(idx))` — success, block `idx` was allocated.
    /// - `Ok(None)` — pool empty (normal backpressure).
    /// - `Err(())` — free-list corruption detected (out-of-bounds index).
    #[inline]
    pub(crate) fn alloc_block(&self) -> Result<Option<u32>, ()> {
        // Acquire pairs with the Release CAS in `free_block`, so the freed
        // block's next-link store (Relaxed there) is visible below.
        let mut head = self.pool_head().load(Ordering::Acquire);
        loop {
            let (gen, idx) = unpack(head);
            if idx == NO_BLOCK {
                return Ok(None);
            }
            // Validate idx is within bounds (H6: free-list corruption defense)
            if (idx as usize) >= self.config.block_count as usize {
                return Err(());
            }
            let block = self.block_ptr(idx);
            // Relaxed is sufficient: the Acquire load/CAS-failure ordering on
            // the head already established visibility of this link word.
            let next = unsafe { &*(block as *const AtomicU32) }.load(Ordering::Relaxed);

            // Validate next pointer too (defense against corrupted free-list)
            if next != NO_BLOCK && (next as usize) >= self.config.block_count as usize {
                return Err(());
            }

            // Bump the generation so this CAS cannot succeed against a head
            // that was popped and re-pushed in between (ABA).
            let new_head = pack(gen.wrapping_add(1), next);

            // Prefetch the block data into L1 cache before the CAS attempt
            #[cfg(target_arch = "x86_64")]
            unsafe {
                core::arch::x86_64::_mm_prefetch::<{ core::arch::x86_64::_MM_HINT_T0 }>(
                    block as *const i8,
                );
            }
            #[cfg(target_arch = "aarch64")]
            unsafe {
                core::arch::asm!(
                    "prfm pstl1keep, [{addr}]",
                    addr = in(reg) block,
                    options(nostack, preserves_flags)
                );
            }

            match self.pool_head().compare_exchange_weak(
                head,
                new_head,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Ok(Some(idx)),
                // Lost the race (or spurious weak failure): retry with the
                // freshly observed head.
                Err(current) => head = current,
            }
        }
    }
166
167    /// Try to grab the recycled block (cache-warm from the last publish).
168    /// Returns `Some(idx)` if a recycled block was available, `None` otherwise.
169    #[inline]
170    pub(crate) fn alloc_recycled(&self) -> Option<u32> {
171        let idx = self.last_freed.swap(NO_BLOCK, Ordering::AcqRel);
172        if idx != NO_BLOCK {
173            Some(idx)
174        } else {
175            None
176        }
177    }
178
    /// Pushes block `idx` back onto the lock-free free-list (stack push with
    /// a generation-counted head). Out-of-range indices are silently ignored
    /// so corrupt state reached from Drop paths cannot widen the damage.
    #[inline]
    pub(crate) fn free_block(&self, idx: u32) {
        if (idx as usize) >= self.config.block_count as usize {
            return; // Bounds check: silently ignore corrupt block_idx in Drop paths
        }
        let mut head = self.pool_head().load(Ordering::Acquire);
        loop {
            let (gen, old_head_idx) = unpack(head);
            // Write next pointer into the block's free-list link (offset 0).
            // Relaxed is enough: the Release CAS below publishes this store
            // to any allocator that Acquire-loads the head.
            unsafe { &*(self.block_ptr(idx) as *const AtomicU32) }
                .store(old_head_idx, Ordering::Relaxed);
            // Bump the generation so allocators can detect ABA.
            let new_head = pack(gen.wrapping_add(1), idx);
            match self.pool_head().compare_exchange_weak(
                head,
                new_head,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return,
                // Lost the race (or spurious weak failure): retry; the link
                // word is rewritten with the fresh head next iteration.
                Err(current) => head = current,
            }
        }
    }
202
203    pub(crate) fn init_free_list(&self) {
204        for i in 0..self.config.block_count {
205            let next = if i + 1 < self.config.block_count {
206                i + 1
207            } else {
208                NO_BLOCK
209            };
210            let ptr = self.block_ptr(i);
211            unsafe { &*(ptr as *const AtomicU32) }.store(next, Ordering::Relaxed);
212            // Zero refcount
213            unsafe { &*(ptr.add(BK_REFCOUNT) as *const AtomicU32) }.store(0, Ordering::Relaxed);
214        }
215        self.pool_head().store(pack(0, 0), Ordering::Release);
216    }
217
    /// Data capacity per block (block_size minus the 8-byte header).
    ///
    /// The first `BLOCK_DATA_OFFSET` bytes of each block hold the free-list
    /// link word and the refcount; the remainder is payload.
    pub(crate) fn data_capacity(&self) -> usize {
        self.config.block_size as usize - BLOCK_DATA_OFFSET
    }
222
223    fn heartbeat_atom(&self) -> &AtomicU64 {
224        unsafe { &*(self.base.add(GH_HEARTBEAT) as *const AtomicU64) }
225    }
226
227    /// Returns microseconds since UNIX epoch, or error if clock is behind epoch.
228    #[cfg(feature = "std")]
229    #[allow(clippy::cast_possible_truncation)]
230    fn now_micros() -> Result<u64, crate::error::Error> {
231        std::time::SystemTime::now()
232            .duration_since(std::time::UNIX_EPOCH)
233            .map(|d| d.as_micros() as u64)
234            .map_err(|_| crate::error::Error::ClockError)
235    }
236
237    #[cfg(feature = "std")]
238    pub(crate) fn update_heartbeat(&self) -> Result<(), crate::error::Error> {
239        let now = Self::now_micros()?;
240        self.heartbeat_atom().fetch_max(now, Ordering::Release);
241        Ok(())
242    }
243
244    #[cfg(feature = "std")]
245    #[allow(clippy::cast_possible_truncation)]
246    pub(crate) fn check_heartbeat(&self) -> Result<(), crate::error::Error> {
247        let hb = self.heartbeat_atom().load(Ordering::Acquire);
248        let now = Self::now_micros()?;
249        if now.saturating_sub(hb) > self.config.stale_timeout.as_micros() as u64 {
250            return Err(crate::error::Error::PublisherDead);
251        }
252        Ok(())
253    }
254
    /// Pinned publish: store (seq, data_len) atomically. The block is permanently
    /// assigned -- no alloc, no free, no refcount. 1 atomic Release store.
    ///
    /// `write_seq_atom` is still bumped so subscribers can detect new data,
    /// and its low 32 bits double as the futex word for the wake path.
    #[cfg(feature = "std")]
    #[inline]
    pub(crate) fn commit_pinned(
        &self,
        data_len: u32,
        topic_idx: u32,
        write_seq_atom: &AtomicU64,
        waiters_atom: &AtomicU32,
    ) {
        // Claim sequence number (still needed for subscriber to detect new data)
        let seq = write_seq_atom.fetch_add(1, Ordering::AcqRel) + 1;

        // Store packed (seq:32 | data_len:32) in the pinned seqlock field.
        // The Release ordering ensures all data writes are visible before this store.
        let off = topic_entry_off(topic_idx);
        // SAFETY: TE_PINNED_SEQ is an aligned u64 field inside the topic
        // entry, which lies within the mapped region per `from_raw`.
        let pinned_seq = unsafe { &*(self.base.add(off + TE_PINNED_SEQ) as *const AtomicU64) };
        // seq is truncated to 32 bits; subscribers only need to observe a
        // *change*, so wraparound is harmless.
        let packed = (seq & 0xFFFF_FFFF) << 32 | u64::from(data_len);
        pinned_seq.store(packed, Ordering::Release);

        // Smart wake: use write_seq low-32 as futex address (same as regular path)
        if waiters_atom.load(Ordering::Acquire) > 0 {
            // SAFETY: AtomicU64 is at least 4-byte aligned; on little-endian
            // (enforced by compile_error in layout.rs) the low 32 bits sit at
            // the base address.
            let seq_futex = unsafe { &*(write_seq_atom as *const AtomicU64 as *const AtomicU32) };
            notify::wake_all(seq_futex);
        }
    }
282
283    #[inline]
284    pub(crate) fn set_pinned_block(&self, topic_idx: u32, block_idx: u32) {
285        let off = topic_entry_off(topic_idx);
286        // Use atomic store with Release so subscriber's Acquire load sees the value
287        unsafe { &*(self.base.add(off + TE_PINNED_BLOCK) as *const AtomicU32) }
288            .store(block_idx, Ordering::Release);
289    }
290
291    /// Load pinned block index atomically.
292    #[inline]
293    pub(crate) fn pinned_block(&self, topic_idx: u32) -> u32 {
294        let off = topic_entry_off(topic_idx);
295        unsafe { &*(self.base.add(off + TE_PINNED_BLOCK) as *const AtomicU32) }
296            .load(Ordering::Acquire)
297    }
298
299    /// Returns the pinned readers atomic for a topic.
300    #[inline]
301    pub(crate) fn pinned_readers(&self, topic_idx: u32) -> &AtomicU32 {
302        let off = topic_entry_off(topic_idx);
303        unsafe { &*(self.base.add(off + TE_PINNED_READERS) as *const AtomicU32) }
304    }
305
    /// Shared commit logic for both `Loan` and `TypedLoan`.
    ///
    /// Atomically claims the next sequence number via `fetch_add` on
    /// `write_seq_atom`, then uses CAS-based seqlock to prevent two
    /// publishers from writing the same ring slot simultaneously.
    ///
    /// Steps: claim seq → lock slot (SEQ_WRITING) → swap in the new block →
    /// publish seq (seqlock close) → release the displaced block → wake.
    #[cfg(feature = "std")]
    #[inline]
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn commit_to_ring(
        &self,
        block_idx: u32,
        data_len: u32,
        topic_idx: u32,
        write_seq_atom: &AtomicU64,
        waiters_atom: &AtomicU32,
        wake: bool,
        single_publisher: bool,
    ) {
        // 1. Atomically claim the next sequence number
        let seq = write_seq_atom.fetch_add(1, Ordering::AcqRel) + 1;

        // NOTE(review): the mask arithmetic assumes ring_depth is a power of
        // two — presumably validated by Config; confirm at the call site.
        let ring_mask = self.config.ring_depth as u64 - 1;
        let slot = (seq & ring_mask) as u32;
        let entry_off = ring_entry_off(&self.config, topic_idx, slot);
        // SAFETY: ring entries lie within the mapped region; RE_SEQ is an
        // aligned u64 field.
        let entry_ptr = unsafe { self.base.add(entry_off) };
        let entry_seq = unsafe { &*(entry_ptr.add(RE_SEQ) as *const AtomicU64) };

        // Prefetch next slot (slot+1, masked to ring depth) to hide the RFO stall
        let next_slot = ((seq + 1) & ring_mask) as u32;
        let next_entry_off = ring_entry_off(&self.config, topic_idx, next_slot);
        let next_entry_ptr = unsafe { self.base.add(next_entry_off) };
        #[cfg(target_arch = "x86_64")]
        unsafe {
            core::arch::x86_64::_mm_prefetch::<{ core::arch::x86_64::_MM_HINT_T0 }>(
                next_entry_ptr as *const i8,
            );
        }
        #[cfg(target_arch = "aarch64")]
        unsafe {
            core::arch::asm!(
                "prfm pstl1keep, [{addr}]",
                addr = in(reg) next_entry_ptr,
                options(nostack, preserves_flags)
            );
        }

        // 2. Acquire the ring slot via CAS (prevents two publishers from
        //    writing the same slot when seqs differ by exactly ring_depth).
        //    Single-publisher mode skips the CAS loop entirely.
        if single_publisher {
            entry_seq.store(SEQ_WRITING, Ordering::Release);
        } else {
            let mut spin_count = 0u32;
            loop {
                let current = entry_seq.load(Ordering::Acquire);
                if current == SEQ_WRITING {
                    spin_count += 1;
                    if spin_count > 1_000_000 {
                        // Slot stuck in WRITING state — likely a crashed publisher.
                        // Force-claim it by overwriting.
                        break;
                    }
                    crate::wait::yield_hint();
                    continue;
                }
                if entry_seq
                    .compare_exchange_weak(
                        current,
                        SEQ_WRITING,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    )
                    .is_ok()
                {
                    break;
                }
                crate::wait::yield_hint();
            }
        }

        // 3. Read old block_idx from the slot we're overwriting
        //    (Relaxed is fine — the slot is now exclusively ours.)
        let old_block_idx =
            unsafe { &*(entry_ptr.add(RE_BLOCK_IDX) as *const AtomicU32) }.load(Ordering::Relaxed);

        // 4. Set new block's refcount to 1 (Release so a subscriber that
        //    observes the commit also sees a consistent refcount)
        self.block_refcount(block_idx).store(1, Ordering::Release);

        // 5. Write new block_idx and data_len
        //    (Relaxed — published by the Release store in step 6.)
        unsafe { &*(entry_ptr.add(RE_BLOCK_IDX) as *const AtomicU32) }
            .store(block_idx, Ordering::Relaxed);
        unsafe { &*(entry_ptr.add(RE_DATA_LEN) as *const AtomicU32) }
            .store(data_len, Ordering::Relaxed);

        // 6. Seqlock close -- data visible to subscribers
        entry_seq.store(seq, Ordering::Release);

        // 7. Release old block (decrement refcount; recycle if no subscribers hold it)
        if old_block_idx != NO_BLOCK && (old_block_idx as usize) < self.config.block_count as usize
        {
            let prev = self
                .block_refcount(old_block_idx)
                .fetch_sub(1, Ordering::AcqRel);
            if prev == 1 {
                // Recycle this block for warm reuse by the next alloc.
                // If a previous recycled block wasn't claimed, free it to the pool.
                let old_recycled = self.last_freed.swap(old_block_idx, Ordering::AcqRel);
                if old_recycled != NO_BLOCK {
                    self.free_block(old_recycled);
                }
            }
        }

        // 8. Smart wake: only issue the expensive futex_wake syscall (~170ns)
        //    when a subscriber is actually blocked in recv(). Reinterpret the
        //    low 32 bits of write_seq as the futex address -- the fetch_add
        //    already changed the value so waiters will see it and wake up.
        if wake && waiters_atom.load(Ordering::Acquire) > 0 {
            // Safety: AtomicU64 is at least 4-byte aligned. On little-endian
            // (enforced by compile_error in layout.rs), the low 32 bits are
            // at the base address.
            let seq_futex = unsafe { &*(write_seq_atom as *const AtomicU64 as *const AtomicU32) };
            notify::wake_all(seq_futex);
        }
    }
430}
431
#[cfg(feature = "std")]
/// Release a block's refcount. If this is the last reference, free it to the pool.
/// Shared by `Sample::drop`, `TypedSample::drop`, and `CrossbarSample::drop`.
pub(crate) fn release_block(region: &Region, block_idx: u32) {
    // Runtime bounds check (Codex/Security: must not use debug_assert in Drop paths)
    let refcount = match region.block_refcount_checked(block_idx) {
        Some(rc) => rc,
        None => return, // OOB block_idx — silently ignore in Drop path
    };
    // Release: our accesses to the block happen-before the decrement.
    let prev = refcount.fetch_sub(1, Ordering::Release);
    if prev == 1 {
        // Last reference dropped. The Acquire fence pairs with every other
        // holder's Release decrement, so all their accesses complete before
        // the block is pushed back onto the free-list.
        core::sync::atomic::fence(Ordering::Acquire);
        region.free_block(block_idx);
    }
}