minarrow/structs/shared_buffer/mod.rs

//! # **SharedBuffer Internal Module** - Backs *Buffer* for zero-copy mmap and foreign buffer sharing
//!
//! Zero-copy, reference-counted byte buffer with 64-byte SIMD alignment.
//!
//! This is an internal module that backs the `Buffer` type supporting
//! the typed Arrays in *Minarrow*.

use crate::Vec64;
use crate::structs::shared_buffer::internal::owned::{OWNED_VT, Owned};
use crate::structs::shared_buffer::internal::pvec::PromotableVec;
use crate::structs::shared_buffer::internal::vtable::{
    PROMO_EVEN_VT, PROMO_ODD_VT, PROMO64_EVEN_VT, PROMO64_ODD_VT, STATIC_VT, Vtable,
};
use core::ops::RangeBounds;
use core::{ptr, slice};
use std::borrow::Borrow;
use std::cmp::Ordering;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::ops::Deref;
use std::sync::atomic::{AtomicPtr, AtomicUsize};

mod internal {
    pub(crate) mod owned;
    pub(crate) mod pvec;
    pub(crate) mod vtable;
}

// # SharedBuffer
//
// Zero-copy, reference-counted byte buffer with SIMD alignment support.
//
// ## Purpose
// This is an internal type that usually should not be used directly.
// Its primary purpose is to support the `Buffer` type in zero-copy IO cases,
// enabling efficient reuse of bytes from the network, memory-mapped files, and IPC
// without copying data, whilst maintaining 64-byte SIMD alignment during operations.
//
// ## Features
// - O(1) pointer-based cloning and slicing
// - Multiple backend sources: `Vec<u8>`, `Vec64<u8>`, mmap, `Arc<[u8]>`, static slices
// - Zero-copy extraction to owned types when unique
// - Thread-safe reference counting via compact vtables
//
// ## Usage
// ```rust
// let sb = SharedBuffer::from_vec(vec![1, 2, 3, 4, 5]);
// let slice = sb.slice(0..2);        // Zero-copy slice
// let clone = sb.clone();            // O(1) reference increment
// let owned = clone.into_vec();      // Extract to Vec<u8>
// ```
//
// Supports `from_vec64()` for SIMD-aligned buffers, `from_owner()` for arbitrary
// containers, and `from_static()` for constant data, as sketched below.
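//
// An illustrative sketch of those constructors (`HEADER` and the `Arc` owner
// are stand-ins, not values from this crate):
// ```rust
// static HEADER: &[u8] = b"ARROW1";
// let constant = SharedBuffer::from_static(HEADER);   // no allocation, no refcount
//
// let mmap = std::sync::Arc::new([0u8; 16]) as std::sync::Arc<[u8]>;
// let foreign = SharedBuffer::from_owner(mmap);       // refcounts an arbitrary owner
//
// let mut v64 = Vec64::with_capacity(8);
// v64.extend_from_slice(&[1, 2, 3]);
// let aligned = SharedBuffer::from_vec64(v64);        // keeps 64-byte alignment
// ```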
#[repr(C)]
pub struct SharedBuffer {
    ptr: *const u8,
    len: usize,
    data: AtomicPtr<()>, // refcount header, or null for static buffers
    vtable: &'static Vtable,
}

impl SharedBuffer {
    /// Constructs a new, empty `SharedBuffer`.
    pub const fn new() -> Self {
        const EMPTY: &[u8] = &[];
        Self::from_static(EMPTY)
    }

    /// Constructs a `SharedBuffer` from a static slice.
    pub const fn from_static(s: &'static [u8]) -> Self {
        Self {
            ptr: s.as_ptr(),
            len: s.len(),
            data: AtomicPtr::new(ptr::null_mut()),
            vtable: &STATIC_VT,
        }
    }

    /// Constructs a `SharedBuffer` from a `Vec<u8>`.
    pub fn from_vec(mut v: Vec<u8>) -> Self {
        let ptr = v.as_mut_ptr();
        let len = v.len();
        let cap = v.capacity();
        let raw = Box::into_raw(Box::new(PromotableVec::<Vec<u8>> {
            ref_cnt: AtomicUsize::new(1),
            inner: v,
        }));
        Self {
            ptr,
            len,
            data: AtomicPtr::new(raw.cast()),
            // Select the vtable variant matching the parity of the capacity.
            vtable: if cap & 1 == 0 {
                &PROMO_EVEN_VT
            } else {
                &PROMO_ODD_VT
            },
        }
    }

    /// Constructs a `SharedBuffer` from a SIMD-aligned `Vec64<u8>`.
    pub fn from_vec64(mut v: Vec64<u8>) -> Self {
        let ptr = v.as_mut_ptr();
        let len = v.len();
        let cap = v.capacity();
        let raw = Box::into_raw(Box::new(PromotableVec::<Vec64<u8>> {
            ref_cnt: AtomicUsize::new(1),
            inner: v,
        }));
        Self {
            ptr,
            len,
            data: AtomicPtr::new(raw.cast()),
            // Select the vtable variant matching the parity of the capacity.
            vtable: if cap & 1 == 0 {
                &PROMO64_EVEN_VT
            } else {
                &PROMO64_ODD_VT
            },
        }
    }

    /// Constructs a `SharedBuffer` from an arbitrary owner (e.g. `Arc<[u8]>`, mmap, etc.).
    ///
    /// The owner must implement `AsRef<[u8]> + Send + Sync + 'static`.
    pub fn from_owner<T>(owner: T) -> Self
    where
        T: AsRef<[u8]> + Send + Sync + 'static,
    {
        let raw: *mut Owned<T> = Box::into_raw(Box::new(Owned {
            ref_cnt: AtomicUsize::new(1),
            owner,
        }));
        let buf = unsafe { (*raw).owner.as_ref() };
        Self {
            ptr: buf.as_ptr(),
            len: buf.len(),
            data: AtomicPtr::new(raw.cast()),
            vtable: &OWNED_VT,
        }
    }

    /// Returns the number of bytes in this buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns true if this buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns a read-only view of the data as a slice.
    #[inline]
    pub fn as_slice(&self) -> &[u8] {
        unsafe { slice::from_raw_parts(self.ptr, self.len) }
    }

    /// Returns a zero-copy slice of this buffer's data.
    ///
    /// Panics if the range is out of bounds.
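    ///
    /// A sketch of the semantics (doctest ignored, as this is an internal module):
    /// ```ignore
    /// let sb = SharedBuffer::from_vec(vec![1, 2, 3, 4, 5]);
    /// let head = sb.slice(..2);            // shares the original allocation
    /// assert_eq!(head.as_slice(), &[1, 2]);
    /// ```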
    pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
        use core::ops::Bound::*;
        let start = match range.start_bound() {
            Unbounded => 0,
            Included(&n) => n,
            Excluded(&n) => n + 1,
        };
        let end = match range.end_bound() {
            Unbounded => self.len,
            Included(&n) => n + 1,
            Excluded(&n) => n,
        };
        assert!(start <= end && end <= self.len);
        if start == end {
            return SharedBuffer::new();
        }

        let mut s = self.clone();
        s.ptr = unsafe { s.ptr.add(start) };
        s.len = end - start;
        s
    }

    /// Attempts to convert into an owned `Vec<u8>`.
    ///
    /// If this is the unique owner, and it was originally allocated with a `Vec<u8>`,
    /// this is zero-copy. Otherwise, the data is cloned.
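    ///
    /// A sketch (doctest ignored, as this is an internal module):
    /// ```ignore
    /// let unique = SharedBuffer::from_vec(vec![1, 2, 3]);
    /// let v = unique.into_vec();   // sole owner: the original Vec is reclaimed
    /// assert_eq!(v, vec![1, 2, 3]);
    /// ```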
    #[inline]
    pub fn into_vec(self) -> Vec<u8> {
        // Move out without running `Drop` first:
        let me = core::mem::ManuallyDrop::new(self);
        unsafe { (me.vtable.to_vec)(&me.data, me.ptr, me.len) }
    }

    /// Attempts to convert into an owned, SIMD-aligned `Vec64<u8>`.
    ///
    /// If this is the unique owner, and it was originally allocated with a `Vec64<u8>`,
    /// this is zero-copy. Otherwise, the data is cloned.
    #[inline]
    pub fn into_vec64(self) -> Vec64<u8> {
        let me = core::mem::ManuallyDrop::new(self);
        unsafe { (me.vtable.to_vec64)(&me.data, me.ptr, me.len) }
    }

    /// Returns `true` if this buffer is the unique owner of its underlying storage.
    ///
    /// ## Behaviour by backend:
    /// - **Vec / Vec64** (`PROMO*` vtables): returns `true` only if the internal
    ///   reference count is `1`.
    /// - **Static buffers** (`STATIC_VT`): always returns `true`, as the memory is
    ///   immutable, globally shared, and never deallocated. This does **not** imply
    ///   transfer of ownership, only that no additional runtime references are tracked.
    /// - **Foreign owners** (`OWNED_VT`, e.g. `Arc<[u8]>`): returns `true` only when
    ///   there are no other references to the underlying allocation.
    ///
    /// This method is primarily for determining whether zero-copy conversion to an
    /// owned type is possible without cloning the underlying data.
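    ///
    /// A sketch (doctest ignored, as this is an internal module):
    /// ```ignore
    /// let a = SharedBuffer::from_vec(vec![1, 2, 3]);
    /// assert!(a.is_unique());
    /// let b = a.clone();
    /// assert!(!a.is_unique());   // two handles now track the same allocation
    /// ```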
    #[inline]
    pub fn is_unique(&self) -> bool {
        unsafe { (self.vtable.is_unique)(&self.data) }
    }
}

impl Clone for SharedBuffer {
    /// Clones this buffer. Always O(1); increments the reference count if needed.
    fn clone(&self) -> Self {
        unsafe { (self.vtable.clone)(&self.data, self.ptr, self.len) }
    }
}

impl Drop for SharedBuffer {
    /// Drops this buffer, decrementing the reference count and releasing memory if unique.
    fn drop(&mut self) {
        unsafe { (self.vtable.drop)(&mut self.data, self.ptr, self.len) }
    }
}

/// Default for an empty buffer (same as `new()`).
impl Default for SharedBuffer {
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}

/// Compare for equality (byte-wise).
impl PartialEq for SharedBuffer {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.as_slice() == other.as_slice()
    }
}

impl Eq for SharedBuffer {}

impl PartialEq<[u8]> for SharedBuffer {
    #[inline]
    fn eq(&self, other: &[u8]) -> bool {
        self.as_slice() == other
    }
}

impl PartialEq<SharedBuffer> for [u8] {
    #[inline]
    fn eq(&self, other: &SharedBuffer) -> bool {
        self == other.as_slice()
    }
}

impl PartialEq<Vec<u8>> for SharedBuffer {
    #[inline]
    fn eq(&self, other: &Vec<u8>) -> bool {
        self.as_slice() == other.as_slice()
    }
}

impl PartialEq<SharedBuffer> for Vec<u8> {
    #[inline]
    fn eq(&self, other: &SharedBuffer) -> bool {
        self.as_slice() == other.as_slice()
    }
}

impl PartialOrd for SharedBuffer {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.as_slice().partial_cmp(other.as_slice())
    }
}

impl Ord for SharedBuffer {
    fn cmp(&self, other: &Self) -> Ordering {
        self.as_slice().cmp(other.as_slice())
    }
}

impl Hash for SharedBuffer {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.as_slice().hash(state)
    }
}

impl fmt::Debug for SharedBuffer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("SharedBuffer")
            .field(&self.as_slice())
            .finish()
    }
}

/// Deref to `[u8]` for zero-copy APIs.
impl Deref for SharedBuffer {
    type Target = [u8];
    #[inline]
    fn deref(&self) -> &[u8] {
        self.as_slice()
    }
}

impl AsRef<[u8]> for SharedBuffer {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}

impl Borrow<[u8]> for SharedBuffer {
    #[inline]
    fn borrow(&self) -> &[u8] {
        self.as_slice()
    }
}

/// From/Into for `Vec`, `Vec64`, and static slices.
impl From<Vec<u8>> for SharedBuffer {
    #[inline]
    fn from(v: Vec<u8>) -> Self {
        Self::from_vec(v)
    }
}

impl From<Vec64<u8>> for SharedBuffer {
    #[inline]
    fn from(v: Vec64<u8>) -> Self {
        Self::from_vec64(v)
    }
}

impl From<&'static [u8]> for SharedBuffer {
    #[inline]
    fn from(s: &'static [u8]) -> Self {
        Self::from_static(s)
    }
}

/// IntoIterator over bytes (by value). Goes through `into_vec`, so this is
/// zero-copy only when the buffer is the unique owner; otherwise the bytes are cloned.
impl IntoIterator for SharedBuffer {
    type Item = u8;
    type IntoIter = std::vec::IntoIter<u8>;
    fn into_iter(self) -> Self::IntoIter {
        self.into_vec().into_iter()
    }
}

/// By-ref iterator over bytes.
impl<'a> IntoIterator for &'a SharedBuffer {
    type Item = &'a u8;
    type IntoIter = std::slice::Iter<'a, u8>;
    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        self.as_slice().iter()
    }
}

/// Construction from an iterator of bytes.
impl FromIterator<u8> for SharedBuffer {
    #[inline]
    fn from_iter<I: IntoIterator<Item = u8>>(iter: I) -> Self {
        let v: Vec<u8> = iter.into_iter().collect();
        Self::from_vec(v)
    }
}

impl fmt::Display for SharedBuffer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match std::str::from_utf8(self.as_slice()) {
            Ok(s) => f.write_str(s),
            Err(_) => {
                // Not valid UTF-8: fall back to lowercase hex.
                for byte in self.as_slice() {
                    write!(f, "{:02x}", byte)?;
                }
                Ok(())
            }
        }
    }
}

// SAFETY: the backing storage is immutable through this type and all reference
// counting uses atomics, so `SharedBuffer` is safe to send and share between threads.
unsafe impl Send for SharedBuffer {}
unsafe impl Sync for SharedBuffer {}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    #[test]
    fn roundtrip_vec() {
        let v = vec![1, 2, 3, 4, 5];
        let sb = SharedBuffer::from_vec(v);
        assert_eq!(sb.as_slice(), &[1, 2, 3, 4, 5]);
        let v2 = sb.clone().into_vec();
        assert_eq!(v2, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn roundtrip_vec64() {
        let mut v64 = Vec64::with_capacity(5);
        v64.extend_from_slice(&[9, 8, 7, 6, 5]);
        let sb = SharedBuffer::from_vec64(v64);
        assert_eq!(sb.as_slice(), &[9, 8, 7, 6, 5]);
        let v64_out = sb.clone().into_vec64();
        assert_eq!(v64_out.as_slice(), &[9, 8, 7, 6, 5]);
    }

    #[test]
    fn owned_unique_check() {
        let mmap = Arc::new([10u8, 11, 12, 13]) as Arc<[u8]>;
        let sb = SharedBuffer::from_owner(mmap);
        assert!(sb.is_unique());
        let sb2 = sb.clone();
        assert!(!sb.is_unique());
        drop(sb2);
        assert!(sb.is_unique());
    }
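
    // Additional sketches of the behaviour documented above. These exercise
    // only the API defined in this module.

    #[test]
    fn slice_zero_copy() {
        let sb = SharedBuffer::from_vec(vec![1, 2, 3, 4, 5]);
        let head = sb.slice(..2);
        assert_eq!(head.as_slice(), &[1, 2]);
        // An empty range collapses to the shared empty static buffer.
        let empty = sb.slice(3..3);
        assert!(empty.is_empty());
    }

    #[test]
    fn static_is_always_unique() {
        static BYTES: &[u8] = &[1, 2, 3];
        let sb = SharedBuffer::from_static(BYTES);
        let sb2 = sb.clone();
        // Static memory is never deallocated, so both handles report unique.
        assert!(sb.is_unique());
        assert!(sb2.is_unique());
    }

    #[test]
    fn into_vec_clones_when_shared() {
        let sb = SharedBuffer::from_vec(vec![1u8, 2, 3]);
        let sb2 = sb.clone();
        // `sb` is no longer unique, so extraction falls back to copying,
        // leaving `sb2` intact.
        let v = sb.into_vec();
        assert_eq!(v, vec![1, 2, 3]);
        assert_eq!(sb2.as_slice(), &[1, 2, 3]);
    }

    #[test]
    fn display_utf8_with_hex_fallback() {
        let text = SharedBuffer::from_vec(b"hello".to_vec());
        assert_eq!(format!("{}", text), "hello");
        // Invalid UTF-8 falls back to lowercase hex.
        let raw = SharedBuffer::from_vec(vec![0xff, 0x00]);
        assert_eq!(format!("{}", raw), "ff00");
    }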
}