// polars_buffer/storage.rs

use std::any::Any;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::process::abort;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU64, Ordering};

use bytemuck::Pod;

// Allows us to transmute between types while also keeping the original
// element size, alignment, and drop method of the Vec around.
struct VecVTable {
    size: usize,
    align: usize,
    drop_buffer: unsafe fn(*mut (), usize),
}

impl VecVTable {
    const fn new<T>() -> Self {
        unsafe fn drop_buffer<T>(ptr: *mut (), cap: usize) {
            unsafe { drop(Vec::from_raw_parts(ptr.cast::<T>(), 0, cap)) }
        }

        Self {
            size: size_of::<T>(),
            align: align_of::<T>(),
            drop_buffer: drop_buffer::<T>,
        }
    }

    fn new_static<T>() -> &'static Self {
        const { &Self::new::<T>() }
    }
}
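
// Illustrative sketch (not part of the original source): the erased
// `drop_buffer` frees an allocation using the capacity recorded when the
// vtable's element type was still known, even after that type is forgotten.
#[cfg(test)]
mod vec_vtable_example {
    use super::*;

    #[test]
    fn drops_buffer_through_erased_vtable() {
        let vtable = VecVTable::new_static::<u32>();
        let mut v = Vec::<u32>::with_capacity(8);
        let (ptr, cap) = (v.as_mut_ptr(), v.capacity());
        std::mem::forget(v);
        assert_eq!((vtable.size, vtable.align), (4, 4));
        // Reconstructs the Vec with length 0 and drops it, freeing the buffer.
        unsafe { (vtable.drop_buffer)(ptr.cast(), cap) };
    }
}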

enum BackingStorage {
    Vec {
        original_capacity: usize, // Elements, not bytes.
        vtable: &'static VecVTable,
    },

    /// Backed by a foreign owner whose Drop impl frees the buffer.
    ForeignOwner(Box<dyn Any + Send + 'static>),

    /// Backed by some external method which we do not need to take care of,
    /// but we still should refcount and drop the SharedStorageInner.
    External,

    /// Both the backing storage and the SharedStorageInner are leaked; no
    /// refcounting is done. This technically should be a flag on
    /// SharedStorageInner instead of being here, but that would add 8 more
    /// bytes to SharedStorageInner, so here it is.
    Leaked,
}

struct SharedStorageInner<T> {
    ref_count: AtomicU64,
    ptr: *mut T,
    length_in_bytes: usize,
    backing: BackingStorage,
    // https://github.com/rust-lang/rfcs/blob/master/text/0769-sound-generic-drop.md#phantom-data
    phantom: PhantomData<T>,
}

unsafe impl<T: Sync + Send> Sync for SharedStorageInner<T> {}

impl<T> SharedStorageInner<T> {
    pub fn from_vec(mut v: Vec<T>) -> Self {
        let length_in_bytes = v.len() * size_of::<T>();
        let original_capacity = v.capacity();
        let ptr = v.as_mut_ptr();
        core::mem::forget(v);
        Self {
            ref_count: AtomicU64::new(1),
            ptr,
            length_in_bytes,
            backing: BackingStorage::Vec {
                original_capacity,
                vtable: VecVTable::new_static::<T>(),
            },
            phantom: PhantomData,
        }
    }
}

impl<T> Drop for SharedStorageInner<T> {
    fn drop(&mut self) {
        match core::mem::replace(&mut self.backing, BackingStorage::External) {
            BackingStorage::ForeignOwner(o) => drop(o),
            BackingStorage::Vec {
                original_capacity,
                vtable,
            } => unsafe {
                // Drop the elements in our slice.
                if std::mem::needs_drop::<T>() {
                    core::ptr::drop_in_place(core::ptr::slice_from_raw_parts_mut(
                        self.ptr,
                        self.length_in_bytes / size_of::<T>(),
                    ));
                }

                // Free the buffer.
                if original_capacity > 0 {
                    (vtable.drop_buffer)(self.ptr.cast(), original_capacity);
                }
            },
            BackingStorage::External | BackingStorage::Leaked => {},
        }
    }
}
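
// Illustrative sketch (not part of the original source): dropping a
// Vec-backed SharedStorageInner runs the element destructors exactly once
// before freeing the buffer.
#[cfg(test)]
mod inner_drop_example {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    static DROPS: AtomicUsize = AtomicUsize::new(0);

    // Non-zero-sized so the element count computed from length_in_bytes is
    // meaningful.
    struct Counted(u8);
    impl Drop for Counted {
        fn drop(&mut self) {
            DROPS.fetch_add(1, Ordering::Relaxed);
        }
    }

    #[test]
    fn elements_are_dropped_exactly_once() {
        drop(SharedStorageInner::from_vec(vec![
            Counted(0),
            Counted(1),
            Counted(2),
        ]));
        assert_eq!(DROPS.load(Ordering::Relaxed), 3);
    }
}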

#[repr(transparent)]
pub struct SharedStorage<T> {
    inner: NonNull<SharedStorageInner<T>>,
    phantom: PhantomData<SharedStorageInner<T>>,
}

unsafe impl<T: Sync + Send> Send for SharedStorage<T> {}
unsafe impl<T: Sync + Send> Sync for SharedStorage<T> {}

impl<T> Default for SharedStorage<T> {
    fn default() -> Self {
        Self::empty()
    }
}

impl<T> SharedStorage<T> {
    /// Creates an empty SharedStorage.
    pub const fn empty() -> Self {
        assert!(align_of::<T>() <= 1 << 30);
        static INNER: SharedStorageInner<()> = SharedStorageInner {
            ref_count: AtomicU64::new(1),
            ptr: core::ptr::without_provenance_mut(1 << 30), // Very overaligned for any T.
            length_in_bytes: 0,
            backing: BackingStorage::Leaked,
            phantom: PhantomData,
        };

        Self {
            inner: NonNull::new(&raw const INNER as *mut SharedStorageInner<T>).unwrap(),
            phantom: PhantomData,
        }
    }
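
    // Illustrative note (not part of the original source): every `empty()`
    // aliases the single leaked static above, so no allocation or
    // refcounting ever happens for empty storages:
    //
    //     let a = SharedStorage::<u8>::empty();
    //     let b = SharedStorage::<u64>::empty();
    //     assert!(a.is_empty() && b.is_empty() && a.is_exclusive());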

    /// Creates a SharedStorage backed by this static slice.
    pub fn from_static(slice: &'static [T]) -> Self {
        // SAFETY: the slice has a static lifetime.
        unsafe { Self::from_slice_unchecked(slice) }
    }

    /// Creates a SharedStorage backed by this slice.
    ///
    /// # Safety
    /// You must ensure that neither this SharedStorage nor any of its clones
    /// outlives this slice.
    pub unsafe fn from_slice_unchecked(slice: &[T]) -> Self {
        #[expect(clippy::manual_slice_size_calculation)]
        let length_in_bytes = slice.len() * size_of::<T>();
        let ptr = slice.as_ptr().cast_mut();
        let inner = SharedStorageInner {
            ref_count: AtomicU64::new(1),
            ptr,
            length_in_bytes,
            backing: BackingStorage::External,
            phantom: PhantomData,
        };
        Self {
            inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(),
            phantom: PhantomData,
        }
    }

    /// Calls f with a `SharedStorage` backed by this slice.
    ///
    /// Aborts if any clones of the SharedStorage still live when `f` returns.
    pub fn with_slice<R, F: FnOnce(SharedStorage<T>) -> R>(slice: &[T], f: F) -> R {
        struct AbortIfNotExclusive<T>(SharedStorage<T>);
        impl<T> Drop for AbortIfNotExclusive<T> {
            fn drop(&mut self) {
                if !self.0.is_exclusive() {
                    abort()
                }
            }
        }

        unsafe {
            let ss = AbortIfNotExclusive(Self::from_slice_unchecked(slice));
            f(ss.0.clone())
        }
    }
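
    // Illustrative usage (not part of the original source):
    //
    //     let data = [1u32, 2, 3];
    //     let sum = SharedStorage::with_slice(&data, |ss| ss.iter().copied().sum::<u32>());
    //     assert_eq!(sum, 6);
    //
    // The clone handed to the closure is dropped when the closure returns,
    // so the exclusivity check in AbortIfNotExclusive passes.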

    /// Calls f with a `SharedStorage` backed by this vec.
    ///
    /// # Panics
    /// Panics if any clones of the SharedStorage still live when `f` returns.
    pub fn with_vec<R, F: FnOnce(SharedStorage<T>) -> R>(vec: &mut Vec<T>, f: F) -> R {
        // TODO: this function is intended to allow exclusive conversion back to
        // a vec, but we need some kind of weak reference for this (that is, two
        // tiers of 'is_exclusive', one for access and one for keeping the inner
        // state alive).
        struct RestoreVec<'a, T>(&'a mut Vec<T>, SharedStorage<T>);
        impl<'a, T> Drop for RestoreVec<'a, T> {
            fn drop(&mut self) {
                *self.0 = self.1.try_take_vec().unwrap();
            }
        }

        let tmp = core::mem::take(vec);
        let ss = RestoreVec(vec, Self::from_vec(tmp));
        f(ss.1.clone())
    }
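
    // Illustrative usage (not part of the original source): the Vec is moved
    // into shared storage for the duration of the closure and restored on
    // return:
    //
    //     let mut v = vec![1u32, 2, 3];
    //     let sum = SharedStorage::with_vec(&mut v, |ss| ss.iter().sum::<u32>());
    //     assert_eq!((sum, v.len()), (6, 3));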

    /// # Safety
    /// The slice must be valid as long as `owner` lives.
    pub unsafe fn from_slice_with_owner<O: Send + 'static>(slice: &[T], owner: O) -> Self {
        #[expect(clippy::manual_slice_size_calculation)]
        let length_in_bytes = slice.len() * size_of::<T>();
        let ptr = slice.as_ptr().cast_mut();
        let inner = SharedStorageInner {
            ref_count: AtomicU64::new(1),
            ptr,
            length_in_bytes,
            backing: BackingStorage::ForeignOwner(Box::new(owner)),
            phantom: PhantomData,
        };
        Self {
            inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(),
            phantom: PhantomData,
        }
    }

    pub fn from_owner<O: Send + AsRef<[T]> + 'static>(owner: O) -> Self {
        let owner = Box::new(owner);
        let slice: &[T] = (*owner).as_ref();
        #[expect(clippy::manual_slice_size_calculation)]
        let length_in_bytes = slice.len() * size_of::<T>();
        let ptr = slice.as_ptr().cast_mut();
        let inner = SharedStorageInner {
            ref_count: AtomicU64::new(1),
            ptr,
            length_in_bytes,
            backing: BackingStorage::ForeignOwner(owner),
            phantom: PhantomData,
        };
        Self {
            inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(),
            phantom: PhantomData,
        }
    }

    pub fn from_vec(v: Vec<T>) -> Self {
        Self {
            inner: NonNull::new(Box::into_raw(Box::new(SharedStorageInner::from_vec(v)))).unwrap(),
            phantom: PhantomData,
        }
    }

    /// Leaks this SharedStorage such that it and its inner value are never
    /// dropped. In return, no refcounting needs to be performed.
    ///
    /// The SharedStorage must be exclusive.
    pub fn leak(&mut self) {
        assert!(self.is_exclusive());
        unsafe {
            let inner = &mut *self.inner.as_ptr();
            core::mem::forget(core::mem::replace(
                &mut inner.backing,
                BackingStorage::Leaked,
            ));
        }
    }

    /// # Safety
    /// The caller is responsible for ensuring the resulting slice is valid and aligned for U.
    pub unsafe fn transmute_unchecked<U>(self) -> SharedStorage<U> {
        let storage = SharedStorage {
            inner: self.inner.cast(),
            phantom: PhantomData,
        };
        std::mem::forget(self);
        storage
    }
}
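
// Illustrative sketch (not part of the original source): a Vec round-trips
// through SharedStorage only while no other clone is alive.
#[cfg(test)]
mod vec_roundtrip_example {
    use super::*;

    #[test]
    fn a_live_clone_blocks_taking_the_vec() {
        let mut ss = SharedStorage::from_vec(vec![1u32, 2, 3]);
        let other = ss.clone();
        assert!(ss.try_take_vec().is_none()); // Not exclusive.
        drop(other);
        assert_eq!(ss.try_take_vec().unwrap(), vec![1, 2, 3]);
    }
}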

pub struct SharedStorageAsVecMut<'a, T> {
    ss: &'a mut SharedStorage<T>,
    vec: ManuallyDrop<Vec<T>>,
}

impl<T> Deref for SharedStorageAsVecMut<'_, T> {
    type Target = Vec<T>;

    fn deref(&self) -> &Self::Target {
        &self.vec
    }
}

impl<T> DerefMut for SharedStorageAsVecMut<'_, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.vec
    }
}

impl<T> Drop for SharedStorageAsVecMut<'_, T> {
    fn drop(&mut self) {
        unsafe {
            // Restore the SharedStorage.
            let vec = ManuallyDrop::take(&mut self.vec);
            let inner = self.ss.inner.as_ptr();
            inner.write(SharedStorageInner::from_vec(vec));
        }
    }
}
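
// Illustrative sketch (not part of the original source): mutations made
// through the Vec guard are written back into the storage when it drops.
#[cfg(test)]
mod as_vec_mut_example {
    use super::*;

    #[test]
    fn pushes_survive_the_guard() {
        let mut ss = SharedStorage::from_vec(vec![1u32, 2]);
        ss.try_as_mut_vec().unwrap().push(3);
        assert_eq!(&*ss, &[1, 2, 3][..]);
    }
}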

impl<T> SharedStorage<T> {
    #[inline(always)]
    pub const fn len(&self) -> usize {
        self.inner().length_in_bytes / size_of::<T>()
    }

    #[inline(always)]
    pub const fn is_empty(&self) -> bool {
        self.inner().length_in_bytes == 0
    }

    #[inline(always)]
    pub const fn as_ptr(&self) -> *const T {
        self.inner().ptr
    }

    #[inline(always)]
    pub fn is_exclusive(&self) -> bool {
        // Ordering semantics copied from Arc<T>.
        self.inner().ref_count.load(Ordering::Acquire) == 1
    }

    /// Gets the reference count of this storage.
    ///
    /// Because this function takes a shared reference, it should not be used
    /// to check that the refcount is one for safety purposes: someone else
    /// could increment it in the meantime.
    #[inline(always)]
    pub fn refcount(&self) -> u64 {
        // Ordering semantics copied from Arc<T>.
        self.inner().ref_count.load(Ordering::Acquire)
    }

    pub fn try_as_mut_slice(&mut self) -> Option<&mut [T]> {
        // We don't know whether the underlying data may be mutated unless
        // we're backed by an exclusive Vec. Perhaps in the future we can add
        // a mutability bit?
        let inner = self.inner();
        let may_mut = inner.ref_count.load(Ordering::Acquire) == 1
            && matches!(inner.backing, BackingStorage::Vec { .. });
        may_mut.then(|| {
            let inner = self.inner();
            let len = inner.length_in_bytes / size_of::<T>();
            unsafe { core::slice::from_raw_parts_mut(inner.ptr, len) }
        })
    }
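
    // Illustrative note (not part of the original source): mutable access is
    // only handed out for an exclusive, Vec-backed storage:
    //
    //     let mut ss = SharedStorage::from_vec(vec![0u32; 4]);
    //     ss.try_as_mut_slice().unwrap()[0] = 7;    // Exclusive Vec: works.
    //     let c = ss.clone();
    //     assert!(ss.try_as_mut_slice().is_none()); // Shared: refused.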

    /// Try to take the vec backing this SharedStorage, leaving this as an empty slice.
    pub fn try_take_vec(&mut self) -> Option<Vec<T>> {
        // If there are other references we can't get an exclusive reference.
        if !self.is_exclusive() {
            return None;
        }

        let ret;
        unsafe {
            let inner = &mut *self.inner.as_ptr();

            // We may only go back to a Vec if we originally came from a Vec
            // where the desired size/align matches the original.
            let BackingStorage::Vec {
                original_capacity,
                vtable,
            } = &mut inner.backing
            else {
                return None;
            };

            if vtable.size != size_of::<T>() || vtable.align != align_of::<T>() {
                return None;
            }

            // Steal the vec from inner.
            let len = inner.length_in_bytes / size_of::<T>();
            ret = Vec::from_raw_parts(inner.ptr, len, *original_capacity);
            *original_capacity = 0;
            inner.length_in_bytes = 0;
        }
        Some(ret)
    }

    /// Attempts to view this SharedStorage as a mutable Vec, through a guard
    /// that restores the storage when dropped. Returns None if this
    /// SharedStorage can't be converted to a Vec.
    pub fn try_as_mut_vec(&mut self) -> Option<SharedStorageAsVecMut<'_, T>> {
        Some(SharedStorageAsVecMut {
            vec: ManuallyDrop::new(self.try_take_vec()?),
            ss: self,
        })
    }

    pub fn try_into_vec(mut self) -> Result<Vec<T>, Self> {
        self.try_take_vec().ok_or(self)
    }

    #[inline(always)]
    const fn inner(&self) -> &SharedStorageInner<T> {
        unsafe { &*self.inner.as_ptr() }
    }

    /// # Safety
    /// May only be called once.
    #[cold]
    unsafe fn drop_slow(&mut self) {
        unsafe { drop(Box::from_raw(self.inner.as_ptr())) }
    }
}

impl<T: Pod> SharedStorage<T> {
    pub fn try_transmute<U: Pod>(self) -> Result<SharedStorage<U>, Self> {
        let inner = self.inner();

        // The length of the array in bytes must be a multiple of the target size.
        // We can skip this check if the size of U divides the size of T.
        if !size_of::<T>().is_multiple_of(size_of::<U>())
            && !inner.length_in_bytes.is_multiple_of(size_of::<U>())
        {
            return Err(self);
        }

        // The pointer must be properly aligned for U.
        // We can skip this check if the alignment of U divides the alignment of T.
        if !align_of::<T>().is_multiple_of(align_of::<U>()) && !inner.ptr.cast::<U>().is_aligned() {
            return Err(self);
        }

        Ok(unsafe { self.transmute_unchecked::<U>() })
    }
}
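
// Illustrative sketch (not part of the original source): shrinking to a type
// whose size and alignment divide the original's skips both checks, while a
// widening transmute must pass the length (and alignment) checks.
#[cfg(test)]
mod transmute_example {
    use super::*;

    #[test]
    fn narrowing_succeeds_misaligned_length_fails() {
        // u32 -> u16: size and align of u16 divide those of u32, so the
        // checks are skipped and the element count doubles.
        let ss = SharedStorage::from_vec(vec![1u32, 2, 3]);
        let halves = ss.try_transmute::<u16>().ok().unwrap();
        assert_eq!(halves.len(), 6);

        // u8 -> u32: 3 bytes is not a multiple of 4, so this must fail.
        let bytes = SharedStorage::from_vec(vec![0u8; 3]);
        assert!(bytes.try_transmute::<u32>().is_err());
    }
}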

impl SharedStorage<u8> {
    /// Create a [`SharedStorage<u8>`][SharedStorage] from a [`Vec`] of [`Pod`].
    pub fn bytes_from_pod_vec<T: Pod>(v: Vec<T>) -> Self {
        // This can't fail; bytes are compatible with everything.
        SharedStorage::from_vec(v)
            .try_transmute::<u8>()
            .unwrap_or_else(|_| unreachable!())
    }
}

impl<T> Deref for SharedStorage<T> {
    type Target = [T];

    #[inline]
    fn deref(&self) -> &Self::Target {
        unsafe {
            let inner = self.inner();
            let len = inner.length_in_bytes / size_of::<T>();
            core::slice::from_raw_parts(inner.ptr, len)
        }
    }
}

impl<T> Clone for SharedStorage<T> {
    fn clone(&self) -> Self {
        let inner = self.inner();
        if !matches!(inner.backing, BackingStorage::Leaked) {
            // Ordering semantics copied from Arc<T>.
            inner.ref_count.fetch_add(1, Ordering::Relaxed);
        }
        Self {
            inner: self.inner,
            phantom: PhantomData,
        }
    }
}

impl<T> Drop for SharedStorage<T> {
    fn drop(&mut self) {
        let inner = self.inner();
        if matches!(inner.backing, BackingStorage::Leaked) {
            return;
        }

        // Ordering semantics copied from Arc<T>.
        if inner.ref_count.fetch_sub(1, Ordering::Release) == 1 {
            std::sync::atomic::fence(Ordering::Acquire);
            unsafe {
                self.drop_slow();
            }
        }
    }
}
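
// Illustrative sketch (not part of the original source): Clone and Drop
// follow Arc-style refcounting, with the last handle freeing the inner
// allocation via drop_slow.
#[cfg(test)]
mod refcount_example {
    use super::*;

    #[test]
    fn clone_and_drop_track_the_refcount() {
        let ss = SharedStorage::from_vec(vec![0u8; 4]);
        assert_eq!(ss.refcount(), 1);
        let other = ss.clone();
        assert_eq!(ss.refcount(), 2);
        drop(other);
        assert!(ss.is_exclusive());
    }
}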