Skip to main content

crossbar/platform/
loan.rs

1// Copyright (c) 2026 The Crossbar Contributors
2// This source code is licensed under the Apache License, Version 2.0.
3// See the LICENSE file in the project root for details.
4
5// SPDX-License-Identifier: Apache-2.0
6
//! Shared-memory loan handles: `Loan`, `TypedLoan`, `PinnedLoan`, and the `Topic` handle.
8
9use std::io;
10use std::sync::atomic::{AtomicU32, AtomicU64};
11use std::sync::Arc;
12
13use crate::error::Error;
14use crate::protocol::layout::BLOCK_DATA_OFFSET;
15use crate::protocol::Region;
16
17// ---- Non-temporal copy (x86-64) ----
18
/// Streams `len` bytes from `src` into `dst` with non-temporal stores.
///
/// The copy proceeds in three phases:
///
/// 1. a plain `copy_nonoverlapping` of the leading bytes until `dst` reaches
///    16-byte alignment (or `len` is exhausted),
/// 2. 16-byte `_mm_stream_si128` stores for every full lane that remains,
/// 3. a plain `copy_nonoverlapping` of whatever is left over.
///
/// A trailing `_mm_sfence` orders the streaming stores so they are globally
/// visible by the time the function returns.
///
/// # Safety
///
/// `src` and `dst` must be valid for `len` bytes and must not overlap.
#[cfg(target_arch = "x86_64")]
unsafe fn nontemporal_copy(src: *const u8, dst: *mut u8, len: usize) {
    use core::arch::x86_64::{__m128i, _mm_loadu_si128, _mm_sfence, _mm_stream_si128};

    // Width in bytes of one streaming store.
    const LANE: usize = 16;

    // Phase 1: unaligned head. `align_offset` may report `usize::MAX` when
    // alignment cannot be reached, so clamp it to `len`.
    let head = dst.align_offset(LANE).min(len);
    if head != 0 {
        core::ptr::copy_nonoverlapping(src, dst, head);
    }

    // Phase 2: full lanes. Only the destination is known to be aligned, so
    // the source is read with an unaligned load.
    let lanes = (len - head) / LANE;
    for lane in 0..lanes {
        let at = head + lane * LANE;
        let value = _mm_loadu_si128(src.add(at) as *const __m128i);
        _mm_stream_si128(dst.add(at) as *mut __m128i, value);
    }

    // Phase 3: tail shorter than one lane.
    let copied = head + lanes * LANE;
    let tail = len - copied;
    if tail != 0 {
        core::ptr::copy_nonoverlapping(src.add(copied), dst.add(copied), tail);
    }

    // Order the streaming stores before returning to the caller.
    _mm_sfence();
}
55
56// ---- Topic ----
57
/// Handle returned by [`super::shm::Publisher::register`]. Identifies a topic
/// for use with [`super::shm::Publisher::loan`].
///
/// The handle is a small `Copy` value that can be compared for equality.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Topic {
    /// Index of the topic. NOTE(review): presumably a slot in the region's
    /// topic table -- confirm against `Publisher::register`.
    pub(crate) topic_idx: u32,
    /// NOTE(review): presumably identifies the publisher that registered the
    /// topic, so a handle cannot be used with a different publisher -- confirm.
    pub(crate) publisher_id: u64,
}
65
66// ---- Loan ----
67
/// A mutable view into a pool block in shared memory.
///
/// Write your data (any format), then call [`publish`](Self::publish) to
/// transfer ownership to subscribers. The transfer is O(1) -- only 8 bytes
/// (block index + data length) are written to the ring, regardless of how
/// much data you wrote into the block.
///
/// If dropped without publishing, the block is freed back to the pool.
pub struct Loan<'a> {
    /// Region that owns the block; used to commit the block to the ring or
    /// to free it on drop.
    pub(crate) region: &'a Arc<Region>,
    /// Start of the block's writable data area.
    pub(crate) data_ptr: *mut u8,
    /// Number of writable bytes available at `data_ptr`.
    pub(crate) capacity: usize,
    /// Number of valid bytes written so far; this is the length handed to
    /// the ring on publish.
    pub(crate) len: usize,
    /// Index of the loaned block within the region.
    pub(crate) block_idx: u32,
    /// Index of the topic the block will be published on.
    pub(crate) topic_idx: u32,
    /// Forwarded to `Region::commit_to_ring` on publish.
    /// NOTE(review): presumably the topic's write sequence counter -- confirm.
    pub(crate) write_seq_atom: &'a AtomicU64,
    /// Forwarded to `Region::commit_to_ring` on publish.
    /// NOTE(review): presumably the count of blocked subscribers -- confirm.
    pub(crate) waiters_atom: &'a AtomicU32,
    /// Forwarded to `Region::commit_to_ring` on publish.
    /// NOTE(review): presumably selects a single-publisher fast path -- confirm.
    pub(crate) single_publisher: bool,
}
87
88impl<'a> Loan<'a> {
89    /// Returns the writable data region as a mutable slice.
90    pub fn as_mut_slice(&mut self) -> &mut [u8] {
91        unsafe { core::slice::from_raw_parts_mut(self.data_ptr, self.capacity) }
92    }
93
94    /// Copies `data` into the block starting at offset 0.
95    ///
96    /// # Errors
97    ///
98    /// Returns [`Error::DataTooLarge`] if `data` exceeds block data capacity.
99    pub fn set_data(&mut self, data: &[u8]) -> Result<(), Error> {
100        if data.len() > self.capacity {
101            return Err(Error::DataTooLarge {
102                size: data.len(),
103                capacity: self.capacity,
104            });
105        }
106        #[cfg(target_arch = "x86_64")]
107        {
108            if data.len() >= 2_097_152 {
109                // Non-temporal stores bypass the cache hierarchy, avoiding
110                // pollution for large payloads that subscribers will read
111                // from their own cache lines.
112                unsafe { nontemporal_copy(data.as_ptr(), self.data_ptr, data.len()) };
113                self.len = data.len();
114                return Ok(());
115            }
116        }
117        unsafe {
118            core::ptr::copy_nonoverlapping(data.as_ptr(), self.data_ptr, data.len());
119        }
120        self.len = data.len();
121        Ok(())
122    }
123
124    /// Sets the valid data length (use after writing via `as_mut_slice`).
125    ///
126    /// # Errors
127    ///
128    /// Returns [`Error::DataTooLarge`] if `len` exceeds block data capacity.
129    pub fn set_len(&mut self, len: usize) -> Result<(), Error> {
130        if len > self.capacity {
131            return Err(Error::DataTooLarge {
132                size: len,
133                capacity: self.capacity,
134            });
135        }
136        self.len = len;
137        Ok(())
138    }
139
140    /// Maximum bytes this loan can hold.
141    pub fn capacity(&self) -> usize {
142        self.capacity
143    }
144
145    /// Write a `T: Pod` header followed by a `[E: Pod]` array into the block.
146    ///
147    /// Layout in the block: `[T bytes][E bytes * entries.len()][u32 entry_count]`
148    /// The entry count is stored at the END so the reader can verify.
149    ///
150    /// # Errors
151    ///
152    /// Returns `Error::DataTooLarge` if header + array + 4 bytes exceeds capacity.
153    pub fn write_structured<T: crate::Pod, E: crate::Pod>(
154        &mut self,
155        header: &T,
156        entries: &[E],
157    ) -> Result<(), Error> {
158        let header_size = core::mem::size_of::<T>();
159        let array_size = core::mem::size_of_val(entries);
160        let total = header_size + array_size + 4; // +4 for entry count u32
161
162        if total > self.capacity {
163            return Err(Error::DataTooLarge {
164                size: total,
165                capacity: self.capacity,
166            });
167        }
168
169        let buf = self.as_mut_slice();
170        // Write header
171        unsafe {
172            core::ptr::copy_nonoverlapping(
173                header as *const T as *const u8,
174                buf.as_mut_ptr(),
175                header_size,
176            );
177        }
178        // Write array
179        if !entries.is_empty() {
180            unsafe {
181                core::ptr::copy_nonoverlapping(
182                    entries.as_ptr() as *const u8,
183                    buf.as_mut_ptr().add(header_size),
184                    array_size,
185                );
186            }
187        }
188        // Write entry count at the end
189        let count = entries.len() as u32;
190        buf[header_size + array_size..header_size + array_size + 4]
191            .copy_from_slice(&count.to_le_bytes());
192
193        self.set_len(total)?;
194        Ok(())
195    }
196
197    /// Publishes the block and wakes any blocked subscribers.
198    /// O(1) -- writes 8 bytes to the ring regardless of payload size.
199    #[inline]
200    pub fn publish(self) {
201        self.commit(true);
202        core::mem::forget(self); // block is published; don't run Drop
203    }
204
205    /// Publishes without waking subscribers. Saves ~170 ns by skipping
206    /// the futex syscall.
207    #[inline]
208    pub fn publish_silent(self) {
209        self.commit(false);
210        core::mem::forget(self);
211    }
212
213    #[inline]
214    fn commit(&self, wake: bool) {
215        // Runtime check: len must fit in u32 (structurally enforced by u32 block_size,
216        // but defense-in-depth against corruption via io::Write accumulation)
217        assert!(self.len <= u32::MAX as usize, "data_len overflow");
218        self.region.commit_to_ring(
219            self.block_idx,
220            self.len as u32,
221            self.topic_idx,
222            self.write_seq_atom,
223            self.waiters_atom,
224            wake,
225            self.single_publisher,
226        );
227    }
228}
229
230impl<'a> Drop for Loan<'a> {
231    fn drop(&mut self) {
232        // Loan dropped without publish -- return block to pool
233        self.region.free_block(self.block_idx);
234    }
235}
236
237impl<'a> io::Write for Loan<'a> {
238    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
239        let remaining = self.capacity - self.len;
240        if remaining == 0 && !buf.is_empty() {
241            return Err(io::Error::new(io::ErrorKind::WriteZero, "block full"));
242        }
243        let n = buf.len().min(remaining);
244        unsafe {
245            core::ptr::copy_nonoverlapping(buf.as_ptr(), self.data_ptr.add(self.len), n);
246        }
247        self.len += n;
248        Ok(n)
249    }
250
251    fn flush(&mut self) -> io::Result<()> {
252        Ok(())
253    }
254}
255
256// ---- TypedLoan ----
257
/// Typed mutable loan of a `T: Pod` value in shared memory.
///
/// Use [`send`](Self::send) to write a value and publish in one step,
/// or [`as_mut`](Self::as_mut) + [`publish`](Self::publish) to fill
/// fields individually (born-in-SHM pattern).
///
/// If dropped without publishing, the block is freed back to the pool.
pub struct TypedLoan<'a, T: crate::Pod> {
    /// Region that owns the block; used to locate the value, to commit the
    /// block to the ring, and to free the block on drop.
    pub(crate) region: &'a Arc<Region>,
    /// Index of the loaned block within the region.
    pub(crate) block_idx: u32,
    /// Index of the topic the block will be published on.
    pub(crate) topic_idx: u32,
    /// Forwarded to `Region::commit_to_ring` on publish.
    /// NOTE(review): presumably the topic's write sequence counter -- confirm.
    pub(crate) write_seq_atom: &'a AtomicU64,
    /// Forwarded to `Region::commit_to_ring` on publish.
    /// NOTE(review): presumably the count of blocked subscribers -- confirm.
    pub(crate) waiters_atom: &'a AtomicU32,
    /// Forwarded to `Region::commit_to_ring` on publish.
    /// NOTE(review): presumably selects a single-publisher fast path -- confirm.
    pub(crate) single_publisher: bool,
    /// Ties the loan to `T` without storing a value; the `T` itself lives in
    /// the shared-memory block.
    pub(crate) _marker: core::marker::PhantomData<&'a mut T>,
}
274
275impl<'a, T: crate::Pod> TypedLoan<'a, T> {
276    /// Returns a mutable reference to the `T` value in shared memory.
277    ///
278    /// Use this for the born-in-SHM pattern: fill fields individually,
279    /// then call [`publish`](Self::publish).
280    #[allow(clippy::should_implement_trait)]
281    pub fn as_mut(&mut self) -> &mut T {
282        unsafe {
283            let ptr = self.region.block_ptr(self.block_idx).add(BLOCK_DATA_OFFSET);
284            &mut *(ptr as *mut T)
285        }
286    }
287
288    /// Writes `value` into the shared-memory block and publishes it,
289    /// waking any blocked subscribers.
290    #[inline]
291    pub fn send(self, value: T) {
292        unsafe {
293            let ptr = self.region.block_ptr(self.block_idx).add(BLOCK_DATA_OFFSET);
294            core::ptr::write(ptr as *mut T, value);
295        }
296        self.commit(true);
297        core::mem::forget(self);
298    }
299
300    /// Publishes whatever was written via [`as_mut`](Self::as_mut),
301    /// waking any blocked subscribers.
302    #[inline]
303    pub fn publish(self) {
304        self.commit(true);
305        core::mem::forget(self);
306    }
307
308    /// Publishes without waking subscribers.
309    #[inline]
310    pub fn publish_silent(self) {
311        self.commit(false);
312        core::mem::forget(self);
313    }
314
315    #[inline]
316    fn commit(&self, wake: bool) {
317        self.region.commit_to_ring(
318            self.block_idx,
319            core::mem::size_of::<T>() as u32,
320            self.topic_idx,
321            self.write_seq_atom,
322            self.waiters_atom,
323            wake,
324            self.single_publisher,
325        );
326    }
327}
328
329impl<T: crate::Pod> Drop for TypedLoan<'_, T> {
330    fn drop(&mut self) {
331        // Loan dropped without publish -- return block to pool
332        self.region.free_block(self.block_idx);
333    }
334}
335
336// ---- PinnedLoan ----
337
/// Mutable view into a pinned block in shared memory.
///
/// The block is permanently assigned to the topic -- no allocation on loan,
/// no refcount. `publish()` is a single atomic Release store.
///
/// # Exclusive access
///
/// `loan_pinned` returns an error (it does not panic) if subscribers hold a
/// `PinnedSample`. Overlapping reads and writes are prevented at runtime.
///
/// If dropped without publishing, the writer sentinel is cleared so
/// subscribers are not blocked.
pub struct PinnedLoan<'a> {
    /// Region that owns the pinned block; used to commit on publish.
    pub(crate) region: &'a Arc<Region>,
    /// Start of the pinned block's writable data area.
    pub(crate) data_ptr: *mut u8,
    /// Number of writable bytes available at `data_ptr`.
    pub(crate) capacity: usize,
    /// Number of valid bytes; this is the length handed to
    /// `Region::commit_pinned` on publish.
    pub(crate) len: usize,
    /// Index of the topic the pinned block belongs to.
    pub(crate) topic_idx: u32,
    /// Forwarded to `Region::commit_pinned` on publish.
    /// NOTE(review): presumably the topic's write sequence counter -- confirm.
    pub(crate) write_seq_atom: &'a AtomicU64,
    /// Forwarded to `Region::commit_pinned` on publish.
    /// NOTE(review): presumably the count of blocked subscribers -- confirm.
    pub(crate) waiters_atom: &'a AtomicU32,
    /// Holds the writer sentinel while the loan is live; reset to 0 on
    /// publish and on drop so subscribers can enter again.
    pub(crate) readers: &'a AtomicU32,
}
357
358impl<'a> PinnedLoan<'a> {
359    /// Returns the writable data region as a mutable slice.
360    pub fn as_mut_slice(&mut self) -> &mut [u8] {
361        unsafe { core::slice::from_raw_parts_mut(self.data_ptr, self.capacity) }
362    }
363
364    /// Copies `data` into the block starting at offset 0.
365    ///
366    /// # Errors
367    ///
368    /// Returns [`Error::DataTooLarge`] if `data` exceeds block data capacity.
369    pub fn set_data(&mut self, data: &[u8]) -> Result<(), Error> {
370        if data.len() > self.capacity {
371            return Err(Error::DataTooLarge {
372                size: data.len(),
373                capacity: self.capacity,
374            });
375        }
376        unsafe {
377            core::ptr::copy_nonoverlapping(data.as_ptr(), self.data_ptr, data.len());
378        }
379        self.len = data.len();
380        Ok(())
381    }
382
383    /// Sets the valid data length.
384    ///
385    /// # Errors
386    ///
387    /// Returns [`Error::DataTooLarge`] if `len` exceeds block data capacity.
388    pub fn set_len(&mut self, len: usize) -> Result<(), Error> {
389        if len > self.capacity {
390            return Err(Error::DataTooLarge {
391                size: len,
392                capacity: self.capacity,
393            });
394        }
395        self.len = len;
396        Ok(())
397    }
398
399    /// Maximum bytes this loan can hold.
400    pub fn capacity(&self) -> usize {
401        self.capacity
402    }
403
404    /// Write a `T: Pod` header followed by a `[E: Pod]` array into the block.
405    ///
406    /// Layout in the block: `[T bytes][E bytes * entries.len()][u32 entry_count]`
407    /// The entry count is stored at the END so the reader can verify.
408    ///
409    /// # Errors
410    ///
411    /// Returns `Error::DataTooLarge` if header + array + 4 bytes exceeds capacity.
412    pub fn write_structured<T: crate::Pod, E: crate::Pod>(
413        &mut self,
414        header: &T,
415        entries: &[E],
416    ) -> Result<(), Error> {
417        let header_size = core::mem::size_of::<T>();
418        let array_size = core::mem::size_of_val(entries);
419        let total = header_size + array_size + 4; // +4 for entry count u32
420
421        if total > self.capacity {
422            return Err(Error::DataTooLarge {
423                size: total,
424                capacity: self.capacity,
425            });
426        }
427
428        let buf = self.as_mut_slice();
429        // Write header
430        unsafe {
431            core::ptr::copy_nonoverlapping(
432                header as *const T as *const u8,
433                buf.as_mut_ptr(),
434                header_size,
435            );
436        }
437        // Write array
438        if !entries.is_empty() {
439            unsafe {
440                core::ptr::copy_nonoverlapping(
441                    entries.as_ptr() as *const u8,
442                    buf.as_mut_ptr().add(header_size),
443                    array_size,
444                );
445            }
446        }
447        // Write entry count at the end
448        let count = entries.len() as u32;
449        buf[header_size + array_size..header_size + array_size + 4]
450            .copy_from_slice(&count.to_le_bytes());
451
452        self.set_len(total)?;
453        Ok(())
454    }
455
456    /// Publish. 1 atomic Release store + clear writer sentinel.
457    #[inline]
458    pub fn publish(self) {
459        assert!(self.len <= u32::MAX as usize, "data_len overflow");
460        self.region.commit_pinned(
461            self.len as u32,
462            self.topic_idx,
463            self.write_seq_atom,
464            self.waiters_atom,
465        );
466        // Clear the writer sentinel so subscribers can enter again
467        self.readers.store(0, core::sync::atomic::Ordering::Release);
468        // Skip Drop — we already cleared the sentinel
469        core::mem::forget(self);
470    }
471}
472
473impl Drop for PinnedLoan<'_> {
474    fn drop(&mut self) {
475        // PinnedLoan dropped without publish — clear the writer sentinel
476        // so subscribers aren't permanently blocked. Data in the pinned
477        // block is stale (partial write), but the old pinned_seq is still
478        // valid so subscribers won't see the partial data.
479        self.readers.store(0, core::sync::atomic::Ordering::Release);
480    }
481}