// polars_buffer/buffer.rs

use std::ops::{Deref, Range, RangeBounds};
use std::sync::LazyLock;

use bytemuck::{Pod, Zeroable};
use either::Either;

use crate::storage::SharedStorage;

/// [`Buffer`] is a contiguous memory region that can be shared across
/// thread boundaries.
///
/// The easiest way to think about [`Buffer<T>`] is as being equivalent to
/// an `Arc<Vec<T>>`, with the following differences:
/// * slicing and cloning are `O(1)`.
/// * it supports externally allocated memory.
///
/// The easiest way to create one is to use its implementation of `From<Vec<T>>`.
///
/// # Examples
/// ```
/// use polars_buffer::Buffer;
///
/// let buffer: Buffer<u32> = vec![1, 2, 3].into();
/// assert_eq!(buffer.as_ref(), [1, 2, 3].as_ref());
///
/// // it supports copy-on-write semantics (i.e. back to a `Vec`)
/// let vec: Vec<u32> = buffer.into_mut().right().unwrap();
/// assert_eq!(vec, vec![1, 2, 3]);
///
/// // cloning and slicing are `O(1)` (data is shared)
/// let mut buffer: Buffer<u32> = vec![1, 2, 3].into();
/// let mut sliced = buffer.clone();
/// sliced.slice_in_place(1..2);
/// assert_eq!(sliced.as_ref(), [2].as_ref());
/// // but cloning forbids getting mut since `sliced` and `buffer` now share data
/// assert_eq!(buffer.get_mut_slice(), None);
/// ```
pub struct Buffer<T> {
    /// The internal backing storage.
    storage: SharedStorage<T>,

    /// A pointer into the buffer where our data starts.
    ptr: *const T,

    /// The length of the buffer.
    length: usize,
}

impl<T> Clone for Buffer<T> {
    fn clone(&self) -> Self {
        Self {
            storage: self.storage.clone(),
            ptr: self.ptr,
            length: self.length,
        }
    }
}

unsafe impl<T: Send + Sync> Sync for Buffer<T> {}
unsafe impl<T: Send + Sync> Send for Buffer<T> {}

impl<T: PartialEq> PartialEq for Buffer<T> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.deref() == other.deref()
    }
}

impl<T: Eq> Eq for Buffer<T> {}

impl<T: std::hash::Hash> std::hash::Hash for Buffer<T> {
    #[inline]
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.as_slice().hash(state);
    }
}

impl<T: std::fmt::Debug> std::fmt::Debug for Buffer<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Debug::fmt(&**self, f)
    }
}

impl<T> Default for Buffer<T> {
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}

impl<T> Buffer<T> {
    /// Creates an empty [`Buffer`].
    #[inline]
    pub const fn new() -> Self {
        Self::from_storage(SharedStorage::empty())
    }

    /// Auxiliary method to create a new [`Buffer`].
    pub const fn from_storage(storage: SharedStorage<T>) -> Self {
        let ptr = storage.as_ptr();
        let length = storage.len();
        Buffer {
            storage,
            ptr,
            length,
        }
    }

    /// Creates a [`Buffer`] backed by static data.
    pub fn from_static(data: &'static [T]) -> Self {
        Self::from_storage(SharedStorage::from_static(data))
    }

    /// Creates a [`Buffer`] backed by a vec.
    pub fn from_vec(data: Vec<T>) -> Self {
        Self::from_storage(SharedStorage::from_vec(data))
    }

    /// Creates a [`Buffer`] backed by `owner`.
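    ///
    /// # Examples
    /// A minimal sketch using a `Vec` as the external owner:
    /// ```
    /// use polars_buffer::Buffer;
    ///
    /// let buffer = Buffer::from_owner(vec![1u32, 2, 3]);
    /// assert_eq!(buffer.as_slice(), &[1, 2, 3]);
    /// ```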
    pub fn from_owner<O: Send + AsRef<[T]> + 'static>(owner: O) -> Self {
        Self::from_storage(SharedStorage::from_owner(owner))
    }

    /// Calls `f` with a [`Buffer`] backed by the given slice.
    ///
    /// Aborts if any clones of the [`Buffer`] still live when `f` returns.
    pub fn with_slice<R, F: FnOnce(Buffer<T>) -> R>(slice: &[T], f: F) -> R {
        SharedStorage::with_slice(slice, |ss| f(Self::from_storage(ss)))
    }

    /// Calls `f` with a [`Buffer`] backed by the given vec.
    ///
    /// # Panics
    /// Panics if any clones of the [`Buffer`] still live when `f` returns.
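    ///
    /// # Examples
    /// A minimal borrow-then-return sketch; the buffer is dropped inside the
    /// closure, so the vec is usable again afterwards:
    /// ```
    /// use polars_buffer::Buffer;
    ///
    /// let mut v = vec![1u32, 2, 3];
    /// let sum = Buffer::with_vec(&mut v, |buf| buf.iter().copied().sum::<u32>());
    /// assert_eq!(sum, 6);
    /// assert_eq!(v, vec![1, 2, 3]);
    /// ```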
    pub fn with_vec<R, F: FnOnce(Buffer<T>) -> R>(vec: &mut Vec<T>, f: F) -> R {
        SharedStorage::with_vec(vec, |ss| f(Self::from_storage(ss)))
    }

    /// Returns the storage backing this [`Buffer`].
    pub fn into_storage(self) -> SharedStorage<T> {
        self.storage
    }

    /// Returns the number of elements in the buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.length
    }

    /// Returns whether the buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Returns whether the underlying data is sliced.
    /// If sliced, the [`Buffer`] is backed by
    /// more data than the length of `Self`.
    pub fn is_sliced(&self) -> bool {
        self.storage.len() != self.length
    }

    /// Expands this slice to the maximum allowed by the underlying storage.
    /// Only expands towards the end; the offset isn't changed. That is, element
    /// `i` refers to the same element before and after this operation.
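    ///
    /// # Examples
    /// A small sketch of slicing and then restoring the tail:
    /// ```
    /// use polars_buffer::Buffer;
    ///
    /// let buf: Buffer<u32> = vec![1, 2, 3, 4].into();
    /// let sliced = buf.sliced(1..3);
    /// assert_eq!(sliced.as_slice(), &[2, 3]);
    /// // The offset is kept; only the end grows back to the storage's end.
    /// assert_eq!(sliced.expand_end_to_storage().as_slice(), &[2, 3, 4]);
    /// ```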
    pub fn expand_end_to_storage(self) -> Self {
        unsafe {
            let offset = self.ptr.offset_from(self.storage.as_ptr()) as usize;
            Self {
                ptr: self.ptr,
                length: self.storage.len() - offset,
                storage: self.storage,
            }
        }
    }

    /// Returns the slice stored in this buffer.
    #[inline]
    pub fn as_slice(&self) -> &[T] {
        // SAFETY: invariant of this struct: `offset + length <= storage.len()`.
        debug_assert!(self.offset() + self.length <= self.storage.len());
        unsafe { std::slice::from_raw_parts(self.ptr, self.length) }
    }

    /// Returns a new [`Buffer`] that is a slice of this buffer.
    /// Doing so allows the same memory region to be shared between buffers.
    ///
    /// # Panics
    /// Panics iff the range is out of bounds.
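    ///
    /// # Examples
    /// A minimal sketch:
    /// ```
    /// use polars_buffer::Buffer;
    ///
    /// let buf: Buffer<u32> = vec![1, 2, 3, 4].into();
    /// assert_eq!(buf.sliced(2..).as_slice(), &[3, 4]);
    /// ```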
    #[inline]
    #[must_use]
    pub fn sliced<R: RangeBounds<usize>>(mut self, range: R) -> Self {
        self.slice_in_place(range);
        self
    }

    /// Returns a new [`Buffer`] that is a slice of this buffer over `range`.
    /// Doing so allows the same memory region to be shared between buffers.
    ///
    /// # Safety
    /// The caller must ensure the range is in-bounds.
    #[inline]
    #[must_use]
    pub unsafe fn sliced_unchecked<R: RangeBounds<usize>>(mut self, range: R) -> Self {
        unsafe {
            self.slice_in_place_unchecked(range);
        }
        self
    }

    /// Slices this buffer to the given range.
    ///
    /// # Panics
    /// Panics iff the range is out of bounds.
    #[inline]
    pub fn slice_in_place<R: RangeBounds<usize>>(&mut self, range: R) {
        unsafe {
            let Range { start, end } = crate::check_range(range, ..self.len());
            self.ptr = self.ptr.add(start);
            self.length = end - start;
        }
    }

    /// Slices this buffer to the given range.
    ///
    /// # Safety
    /// The caller must ensure the range is in-bounds.
    #[inline]
    pub unsafe fn slice_in_place_unchecked<R: RangeBounds<usize>>(&mut self, range: R) {
        unsafe {
            let Range { start, end } = crate::decode_range_unchecked(range, ..self.len());
            self.ptr = self.ptr.add(start);
            self.length = end - start;
        }
    }

    /// Returns a pointer to the start of the storage underlying this buffer.
    #[inline]
    pub fn storage_ptr(&self) -> *const T {
        self.storage.as_ptr()
    }

    /// Returns the start offset of this buffer within the underlying storage.
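    ///
    /// # Examples
    /// A minimal sketch:
    /// ```
    /// use polars_buffer::Buffer;
    ///
    /// let buf: Buffer<u32> = vec![1, 2, 3, 4].into();
    /// assert_eq!(buf.sliced(1..3).offset(), 1);
    /// ```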
    #[inline]
    pub fn offset(&self) -> usize {
        unsafe {
            let ret = self.ptr.offset_from(self.storage.as_ptr()) as usize;
            debug_assert!(ret <= self.storage.len());
            ret
        }
    }

    /// # Safety
    /// The caller must ensure that the buffer was properly initialized up to `len`.
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        self.length = len;
    }

    /// Returns the underlying [`Vec`], if possible.
    ///
    /// This operation returns [`Either::Right`] iff this [`Buffer`]:
    /// * is not sliced
    /// * has no alive clones
    /// * has not been imported from the C data interface (FFI)
    #[inline]
    pub fn into_mut(mut self) -> Either<Self, Vec<T>> {
        // We lose information if the data is sliced.
        if self.is_sliced() {
            return Either::Left(self);
        }
        match self.storage.try_into_vec() {
            Ok(v) => Either::Right(v),
            Err(slf) => {
                self.storage = slf;
                Either::Left(self)
            },
        }
    }

    /// Returns a mutable reference to its slice, if possible.
    ///
    /// This operation returns [`Some`] iff this [`Buffer`]:
    /// * has no alive clones
    /// * has not been imported from the C data interface (FFI)
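    ///
    /// # Examples
    /// A minimal sketch of unique vs. shared access:
    /// ```
    /// use polars_buffer::Buffer;
    ///
    /// let mut buffer: Buffer<u32> = vec![1, 2, 3].into();
    /// buffer.get_mut_slice().unwrap()[0] = 7;
    /// assert_eq!(buffer.as_slice(), &[7, 2, 3]);
    ///
    /// // A live clone shares the data, so mutable access is refused.
    /// let _clone = buffer.clone();
    /// assert!(buffer.get_mut_slice().is_none());
    /// ```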
    #[inline]
    pub fn get_mut_slice(&mut self) -> Option<&mut [T]> {
        let offset = self.offset();
        let slice = self.storage.try_as_mut_slice()?;
        Some(unsafe { slice.get_unchecked_mut(offset..offset + self.length) })
    }

    /// Returns the refcount of the underlying storage.
    ///
    /// Since this takes a shared reference to self, beware that others might
    /// increment it after you've checked it's equal to 1.
    pub fn storage_refcount(&self) -> u64 {
        self.storage.refcount()
    }

    /// Whether these two buffers share the exact same data.
    pub fn is_same_buffer(&self, other: &Self) -> bool {
        self.ptr == other.ptr && self.length == other.length
    }
}

impl<T: Pod> Buffer<T> {
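    /// Attempts to reinterpret this [`Buffer<T>`] as a [`Buffer<U>`] without
    /// copying, recomputing the length in units of `U`.
    ///
    /// Returns `Err(self)` if the underlying storage refuses the transmute;
    /// the exact conditions are decided by `SharedStorage::try_transmute`.
    ///
    /// A sketch of the happy path, viewing `u32` data as bytes:
    /// ```
    /// use polars_buffer::Buffer;
    ///
    /// let buf: Buffer<u32> = vec![1, 2].into();
    /// if let Ok(bytes) = buf.try_transmute::<u8>() {
    ///     assert_eq!(bytes.len(), 2 * size_of::<u32>());
    /// }
    /// ```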
    pub fn try_transmute<U: Pod>(mut self) -> Result<Buffer<U>, Self> {
        assert_ne!(size_of::<U>(), 0);
        let ptr = self.ptr as *const U;
        let length = self.length;
        match self.storage.try_transmute() {
            Err(v) => {
                self.storage = v;
                Err(self)
            },
            Ok(storage) => Ok(Buffer {
                storage,
                ptr,
                length: length.checked_mul(size_of::<T>()).expect("overflow") / size_of::<U>(),
            }),
        }
    }
}

impl<T: Clone> Buffer<T> {
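    /// Returns the contents as a [`Vec<T>`].
    ///
    /// This reuses the existing allocation when this buffer is the unique,
    /// unsliced owner (via [`Buffer::into_mut`]) and copies otherwise.
    ///
    /// ```
    /// use polars_buffer::Buffer;
    ///
    /// let buf: Buffer<u32> = vec![1, 2, 3].into();
    /// assert_eq!(buf.to_vec(), vec![1, 2, 3]);
    /// ```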
    pub fn to_vec(self) -> Vec<T> {
        match self.into_mut() {
            Either::Right(v) => v,
            Either::Left(same) => same.as_slice().to_vec(),
        }
    }
}

#[repr(C, align(4096))]
#[derive(Copy, Clone)]
struct Aligned([u8; 4096]);

// We intentionally leak 8 MiB of zeroed memory once so we don't have to
// refcount it.
const GLOBAL_ZERO_SIZE: usize = 8 * 1024 * 1024;
static GLOBAL_ZEROES: LazyLock<SharedStorage<Aligned>> = LazyLock::new(|| {
    assert!(GLOBAL_ZERO_SIZE.is_multiple_of(size_of::<Aligned>()));
    let chunks = GLOBAL_ZERO_SIZE / size_of::<Aligned>();
    let v = vec![Aligned([0; _]); chunks];
    let mut ss = SharedStorage::from_vec(v);
    ss.leak();
    ss
});

impl<T: Zeroable> Buffer<T> {
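    /// Returns a [`Buffer`] of `length` zeroed elements.
    ///
    /// For sufficiently small and sufficiently aligned `T` this is backed by
    /// a shared, leaked pool of zeroes, so no new allocation is made.
    ///
    /// ```
    /// use polars_buffer::Buffer;
    ///
    /// let zeroes: Buffer<u64> = Buffer::zeroed(4);
    /// assert_eq!(zeroes.as_slice(), &[0, 0, 0, 0]);
    /// ```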
    pub fn zeroed(length: usize) -> Self {
        let bytes_needed = length * size_of::<T>();
        if align_of::<T>() <= align_of::<Aligned>() && bytes_needed <= GLOBAL_ZERO_SIZE {
            unsafe {
                // SAFETY: we checked the alignment of T, that it fits, and T is zeroable.
                let storage = GLOBAL_ZEROES.clone().transmute_unchecked::<T>();
                let ptr = storage.as_ptr();
                Buffer {
                    storage,
                    ptr,
                    length,
                }
            }
        } else {
            bytemuck::zeroed_vec(length).into()
        }
    }
}

impl<T> From<Vec<T>> for Buffer<T> {
    #[inline]
    fn from(v: Vec<T>) -> Self {
        Self::from_vec(v)
    }
}

impl<T> Deref for Buffer<T> {
    type Target = [T];

    #[inline(always)]
    fn deref(&self) -> &[T] {
        self.as_slice()
    }
}

impl<T> AsRef<[T]> for Buffer<T> {
    #[inline(always)]
    fn as_ref(&self) -> &[T] {
        self.as_slice()
    }
}

impl<T> FromIterator<T> for Buffer<T> {
    #[inline]
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        Vec::from_iter(iter).into()
    }
}

#[cfg(feature = "serde")]
mod _serde_impl {
    use serde::{Deserialize, Serialize};

    use super::Buffer;

    impl<T> Serialize for Buffer<T>
    where
        T: Serialize,
    {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            <[T] as Serialize>::serialize(self.as_slice(), serializer)
        }
    }

    impl<'de, T> Deserialize<'de> for Buffer<T>
    where
        T: Deserialize<'de>,
    {
        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
        where
            D: serde::Deserializer<'de>,
        {
            <Vec<T> as Deserialize>::deserialize(deserializer).map(Buffer::from)
        }
    }
}

impl<T: Copy> IntoIterator for Buffer<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter::new(self)
    }
}

/// This crate's equivalent of [`std::vec::IntoIter`] for [`Buffer`].
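///
/// A minimal sketch of consuming a [`Buffer`] by value:
/// ```
/// use polars_buffer::Buffer;
///
/// let buf: Buffer<u32> = vec![1, 2, 3].into();
/// let doubled: Vec<u32> = buf.into_iter().map(|x| x * 2).collect();
/// assert_eq!(doubled, vec![2, 4, 6]);
/// ```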
#[derive(Debug, Clone)]
pub struct IntoIter<T: Copy> {
    values: Buffer<T>,
    index: usize,
    end: usize,
}

impl<T: Copy> IntoIter<T> {
    #[inline]
    fn new(values: Buffer<T>) -> Self {
        let end = values.len();
        Self {
            values,
            index: 0,
            end,
        }
    }
}

impl<T: Copy> Iterator for IntoIter<T> {
    type Item = T;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.index == self.end {
            return None;
        }
        let old = self.index;
        self.index += 1;
        Some(*unsafe { self.values.get_unchecked(old) })
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.end - self.index, Some(self.end - self.index))
    }

    #[inline]
    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        let new_index = self.index + n;
        if new_index > self.end {
            self.index = self.end;
            None
        } else {
            self.index = new_index;
            self.next()
        }
    }
}

impl<T: Copy> DoubleEndedIterator for IntoIter<T> {
    #[inline]
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.index == self.end {
            None
        } else {
            self.end -= 1;
            Some(*unsafe { self.values.get_unchecked(self.end) })
        }
    }
}

impl<T: Copy> ExactSizeIterator for IntoIter<T> {}