Skip to main content

polars_buffer/
buffer.rs

1use std::ops::{Deref, Range, RangeBounds};
2use std::sync::LazyLock;
3
4use bytemuck::{Pod, Zeroable};
5use either::Either;
6use polars_utils::range::{check_range, decode_range_unchecked};
7
8use crate::storage::SharedStorage;
9
10/// [`Buffer`] is a contiguous memory region that can be shared across
11/// thread boundaries.
12///
13/// The easiest way to think about [`Buffer<T>`] is being equivalent to
14/// a `Arc<Vec<T>>`, with the following differences:
15/// * slicing and cloning is `O(1)`.
16/// * it supports external allocated memory
17///
18/// The easiest way to create one is to use its implementation of `From<Vec<T>>`.
19///
20/// # Examples
21/// ```
22/// use polars_buffer::Buffer;
23///
24/// let mut buffer: Buffer<u32> = vec![1, 2, 3].into();
25/// assert_eq!(buffer.as_ref(), [1, 2, 3].as_ref());
26///
27/// // it supports copy-on-write semantics (i.e. back to a `Vec`)
28/// let vec: Vec<u32> = buffer.into_mut().right().unwrap();
29/// assert_eq!(vec, vec![1, 2, 3]);
30///
31/// // cloning and slicing is `O(1)` (data is shared)
32/// let mut buffer: Buffer<u32> = vec![1, 2, 3].into();
33/// let mut sliced = buffer.clone();
34/// sliced.slice(1, 1);
35/// assert_eq!(sliced.as_ref(), [2].as_ref());
36/// // but cloning forbids getting mut since `slice` and `buffer` now share data
37/// assert_eq!(buffer.get_mut_slice(), None);
38/// ```
39pub struct Buffer<T> {
40    /// The internal byte buffer.
41    storage: SharedStorage<T>,
42
43    /// A pointer into the buffer where our data starts.
44    ptr: *const T,
45
46    // The length of the buffer.
47    length: usize,
48}
49
50impl<T> Clone for Buffer<T> {
51    fn clone(&self) -> Self {
52        Self {
53            storage: self.storage.clone(),
54            ptr: self.ptr,
55            length: self.length,
56        }
57    }
58}
59
60unsafe impl<T: Send + Sync> Sync for Buffer<T> {}
61unsafe impl<T: Send + Sync> Send for Buffer<T> {}
62
63impl<T: PartialEq> PartialEq for Buffer<T> {
64    #[inline]
65    fn eq(&self, other: &Self) -> bool {
66        self.deref() == other.deref()
67    }
68}
69
70impl<T: Eq> Eq for Buffer<T> {}
71
72impl<T: std::hash::Hash> std::hash::Hash for Buffer<T> {
73    #[inline]
74    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
75        self.as_slice().hash(state);
76    }
77}
78
79impl<T: std::fmt::Debug> std::fmt::Debug for Buffer<T> {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        std::fmt::Debug::fmt(&**self, f)
82    }
83}
84
85impl<T> Default for Buffer<T> {
86    #[inline]
87    fn default() -> Self {
88        Self::new()
89    }
90}
91
92impl<T> Buffer<T> {
93    /// Creates an empty [`Buffer`].
94    #[inline]
95    pub const fn new() -> Self {
96        Self::from_storage(SharedStorage::empty())
97    }
98
99    /// Auxiliary method to create a new Buffer.
100    pub const fn from_storage(storage: SharedStorage<T>) -> Self {
101        let ptr = storage.as_ptr();
102        let length = storage.len();
103        Buffer {
104            storage,
105            ptr,
106            length,
107        }
108    }
109
110    /// Creates a [`Buffer`] backed by static data.
111    pub fn from_static(data: &'static [T]) -> Self {
112        Self::from_storage(SharedStorage::from_static(data))
113    }
114
115    /// Creates a [`Buffer`] backed by a vec.
116    pub fn from_vec(data: Vec<T>) -> Self {
117        Self::from_storage(SharedStorage::from_vec(data))
118    }
119
120    /// Creates a [`Buffer`] backed by `owner`.
121    pub fn from_owner<O: Send + AsRef<[T]> + 'static>(owner: O) -> Self {
122        Self::from_storage(SharedStorage::from_owner(owner))
123    }
124
125    /// Calls f with a [`Buffer`] backed by this slice.
126    ///
127    /// Aborts if any clones of the [`Buffer`] still live when `f` returns.
128    pub fn with_slice<R, F: FnOnce(Buffer<T>) -> R>(slice: &[T], f: F) -> R {
129        SharedStorage::with_slice(slice, |ss| f(Self::from_storage(ss)))
130    }
131
132    /// Calls f with a [`Buffer`] backed by this vec.
133    ///
134    /// # Panics
135    /// Panics if any clones of the [`Buffer`] still live when `f` returns.
136    pub fn with_vec<R, F: FnOnce(Buffer<T>) -> R>(vec: &mut Vec<T>, f: F) -> R {
137        SharedStorage::with_vec(vec, |ss| f(Self::from_storage(ss)))
138    }
139
140    /// Returns the storage backing this [`Buffer`].
141    pub fn into_storage(self) -> SharedStorage<T> {
142        self.storage
143    }
144
145    /// Returns the number of bytes in the buffer
146    #[inline]
147    pub fn len(&self) -> usize {
148        self.length
149    }
150
151    /// Returns whether the buffer is empty.
152    #[inline]
153    pub fn is_empty(&self) -> bool {
154        self.length == 0
155    }
156
157    /// Returns whether underlying data is sliced.
158    /// If sliced the [`Buffer`] is backed by
159    /// more data than the length of `Self`.
160    pub fn is_sliced(&self) -> bool {
161        self.storage.len() != self.length
162    }
163
164    /// Expands this slice to the maximum allowed by the underlying storage.
165    /// Only expands towards the end, the offset isn't changed. That is, element
166    /// i before and after this operation refer to the same element.
167    pub fn expand_end_to_storage(self) -> Self {
168        unsafe {
169            let offset = self.ptr.offset_from(self.storage.as_ptr()) as usize;
170            Self {
171                ptr: self.ptr,
172                length: self.storage.len() - offset,
173                storage: self.storage,
174            }
175        }
176    }
177
178    /// Returns the byte slice stored in this buffer.
179    #[inline]
180    pub fn as_slice(&self) -> &[T] {
181        // SAFETY: invariant of this struct `offset + length <= data.len()`.
182        debug_assert!(self.offset() + self.length <= self.storage.len());
183        unsafe { std::slice::from_raw_parts(self.ptr, self.length) }
184    }
185
186    /// Returns a new [`Buffer`] that is a slice of this buffer.
187    /// Doing so allows the same memory region to be shared between buffers.
188    ///
189    /// # Panics
190    /// Panics iff the range is out of bounds.
191    #[inline]
192    #[must_use]
193    pub fn sliced<R: RangeBounds<usize>>(mut self, range: R) -> Self {
194        self.slice_in_place(range);
195        self
196    }
197
198    /// Returns a new [`Buffer`] that is a slice of this buffer starting at `offset`.
199    /// Doing so allows the same memory region to be shared between buffers.
200    ///
201    /// # Safety
202    /// The caller must ensure the range is in-bounds.
203    #[inline]
204    #[must_use]
205    pub unsafe fn sliced_unchecked<R: RangeBounds<usize>>(mut self, range: R) -> Self {
206        unsafe {
207            self.slice_in_place_unchecked(range);
208        }
209        self
210    }
211
212    /// Slices this buffer to the given range.
213    ///
214    /// # Panics
215    /// Panics iff the range is out of bounds.
216    #[inline]
217    pub fn slice_in_place<R: RangeBounds<usize>>(&mut self, range: R) {
218        unsafe {
219            let Range { start, end } = check_range(range, ..self.len());
220            self.ptr = self.ptr.add(start);
221            self.length = end - start;
222        }
223    }
224
225    /// Slices this buffer to the given range.
226    ///
227    /// # Safety
228    /// The caller must ensure the range is in-bounds.
229    #[inline]
230    pub unsafe fn slice_in_place_unchecked<R: RangeBounds<usize>>(&mut self, range: R) {
231        unsafe {
232            let Range { start, end } = decode_range_unchecked(range, ..self.len());
233            self.ptr = self.ptr.add(start);
234            self.length = end - start;
235        }
236    }
237
238    /// Divides one buffer into two at an index.
239    ///
240    /// The first will contain all indices from `[0, mid)` (excluding
241    /// the index `mid` itself) and the second will contain all
242    /// indices from `[mid, len)` (excluding the index `len` itself).
243    ///
244    /// # Panics
245    /// Panics if `mid > len`.
246    #[must_use]
247    pub fn split_at(self, mid: usize) -> (Self, Self) {
248        (self.clone().sliced(..mid), self.sliced(mid..))
249    }
250
251    /// Splits the buffer into two at the given index.
252    ///
253    /// Returns a buffer containing the elements in the range
254    /// `[at, len)`. After the call, self will be left containing
255    /// the elements `[0, at)`.
256    ///
257    /// # Panics
258    /// Panics if `at > len`.
259    #[must_use]
260    pub fn split_off(&mut self, at: usize) -> Self {
261        let out = self.clone().sliced(at..);
262        self.slice_in_place(..at);
263        out
264    }
265
266    /// Returns a pointer to the start of the storage underlying this buffer.
267    #[inline]
268    pub fn storage_ptr(&self) -> *const T {
269        self.storage.as_ptr()
270    }
271
272    /// Returns the start offset of this buffer within the underlying storage.
273    #[inline]
274    pub fn offset(&self) -> usize {
275        unsafe {
276            let ret = self.ptr.offset_from(self.storage.as_ptr()) as usize;
277            debug_assert!(ret <= self.storage.len());
278            ret
279        }
280    }
281
282    /// # Safety
283    /// The caller must ensure that the buffer was properly initialized up to `len`.
284    #[inline]
285    pub unsafe fn set_len(&mut self, len: usize) {
286        self.length = len;
287    }
288
289    /// Returns a mutable reference to its underlying [`Vec`], if possible.
290    ///
291    /// This operation returns [`Either::Right`] iff this [`Buffer`]:
292    /// * has no alive clones
293    /// * has not been imported from the C data interface (FFI)
294    #[inline]
295    pub fn into_mut(mut self) -> Either<Self, Vec<T>> {
296        // We lose information if the data is sliced.
297        if self.is_sliced() {
298            return Either::Left(self);
299        }
300        match self.storage.try_into_vec() {
301            Ok(v) => Either::Right(v),
302            Err(slf) => {
303                self.storage = slf;
304                Either::Left(self)
305            },
306        }
307    }
308
309    /// Returns a mutable reference to its slice, if possible.
310    ///
311    /// This operation returns [`Some`] iff this [`Buffer`]:
312    /// * has no alive clones
313    /// * has not been imported from the C data interface (FFI)
314    #[inline]
315    pub fn get_mut_slice(&mut self) -> Option<&mut [T]> {
316        let offset = self.offset();
317        let slice = self.storage.try_as_mut_slice()?;
318        Some(unsafe { slice.get_unchecked_mut(offset..offset + self.length) })
319    }
320
321    /// Since this takes a shared reference to self, beware that others might
322    /// increment this after you've checked it's equal to 1.
323    pub fn storage_refcount(&self) -> u64 {
324        self.storage.refcount()
325    }
326
327    /// Whether these two buffers share the exact same data.
328    pub fn is_same_buffer(&self, other: &Self) -> bool {
329        self.ptr == other.ptr && self.length == other.length
330    }
331}
332
333impl<T: Pod> Buffer<T> {
334    pub fn try_transmute<U: Pod>(mut self) -> Result<Buffer<U>, Self> {
335        assert_ne!(size_of::<U>(), 0);
336        let ptr = self.ptr as *const U;
337        let length = self.length;
338        match self.storage.try_transmute() {
339            Err(v) => {
340                self.storage = v;
341                Err(self)
342            },
343            Ok(storage) => Ok(Buffer {
344                storage,
345                ptr,
346                length: length.checked_mul(size_of::<T>()).expect("overflow") / size_of::<U>(),
347            }),
348        }
349    }
350}
351
352impl<T: Clone> Buffer<T> {
353    pub fn to_vec(self) -> Vec<T> {
354        match self.into_mut() {
355            Either::Right(v) => v,
356            Either::Left(same) => same.as_slice().to_vec(),
357        }
358    }
359}
360
361#[repr(C, align(4096))]
362#[derive(Copy, Clone)]
363struct Aligned([u8; 4096]);
364
365// We intentionally leak 8MiB of zeroed memory once so we don't have to
366// refcount it.
367const GLOBAL_ZERO_SIZE: usize = 8 * 1024 * 1024;
368static GLOBAL_ZEROES: LazyLock<SharedStorage<Aligned>> = LazyLock::new(|| {
369    assert!(GLOBAL_ZERO_SIZE.is_multiple_of(size_of::<Aligned>()));
370    let chunks = GLOBAL_ZERO_SIZE / size_of::<Aligned>();
371    let v = vec![Aligned([0; _]); chunks];
372    let mut ss = SharedStorage::from_vec(v);
373    ss.leak();
374    ss
375});
376
377impl<T: Zeroable> Buffer<T> {
378    pub fn zeroed(length: usize) -> Self {
379        let bytes_needed = length * size_of::<T>();
380        if align_of::<T>() <= align_of::<Aligned>() && bytes_needed <= GLOBAL_ZERO_SIZE {
381            unsafe {
382                // SAFETY: we checked the alignment of T, that it fits, and T is zeroable.
383                let storage = GLOBAL_ZEROES.clone().transmute_unchecked::<T>();
384                let ptr = storage.as_ptr();
385                Buffer {
386                    storage,
387                    ptr,
388                    length,
389                }
390            }
391        } else {
392            bytemuck::zeroed_vec(length).into()
393        }
394    }
395}
396
397impl<T> From<Vec<T>> for Buffer<T> {
398    #[inline]
399    fn from(v: Vec<T>) -> Self {
400        Self::from_vec(v)
401    }
402}
403
404impl<T> Deref for Buffer<T> {
405    type Target = [T];
406
407    #[inline(always)]
408    fn deref(&self) -> &[T] {
409        self.as_slice()
410    }
411}
412
413impl<T> AsRef<[T]> for Buffer<T> {
414    #[inline(always)]
415    fn as_ref(&self) -> &[T] {
416        self.as_slice()
417    }
418}
419
420impl<T> FromIterator<T> for Buffer<T> {
421    #[inline]
422    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
423        Vec::from_iter(iter).into()
424    }
425}
426
#[cfg(feature = "serde")]
mod _serde_impl {
    // `serde` support: a `Buffer` is (de)serialized as a plain sequence of its
    // visible elements.
    use serde::{Deserialize, Serialize};

    use super::Buffer;

    impl<T: Serialize> Serialize for Buffer<T> {
        /// Serializes the visible elements exactly as the equivalent `[T]` would be.
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            let elements: &[T] = self.as_slice();
            elements.serialize(serializer)
        }
    }

    impl<'de, T: Deserialize<'de>> Deserialize<'de> for Buffer<T> {
        /// Deserializes a sequence into a `Vec<T>` and wraps it in a [`Buffer`].
        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
        where
            D: serde::Deserializer<'de>,
        {
            let items = Vec::<T>::deserialize(deserializer)?;
            Ok(Buffer::from(items))
        }
    }
}
457
458impl<T: Copy> IntoIterator for Buffer<T> {
459    type Item = T;
460
461    type IntoIter = IntoIter<T>;
462
463    fn into_iter(self) -> Self::IntoIter {
464        IntoIter::new(self)
465    }
466}
467
468/// This crates' equivalent of [`std::vec::IntoIter`] for [`Buffer`].
469#[derive(Debug, Clone)]
470pub struct IntoIter<T: Copy> {
471    values: Buffer<T>,
472    index: usize,
473    end: usize,
474}
475
476impl<T: Copy> IntoIter<T> {
477    #[inline]
478    fn new(values: Buffer<T>) -> Self {
479        let end = values.len();
480        Self {
481            values,
482            index: 0,
483            end,
484        }
485    }
486}
487
488impl<T: Copy> Iterator for IntoIter<T> {
489    type Item = T;
490
491    #[inline]
492    fn next(&mut self) -> Option<Self::Item> {
493        if self.index == self.end {
494            return None;
495        }
496        let old = self.index;
497        self.index += 1;
498        Some(*unsafe { self.values.get_unchecked(old) })
499    }
500
501    #[inline]
502    fn size_hint(&self) -> (usize, Option<usize>) {
503        (self.end - self.index, Some(self.end - self.index))
504    }
505
506    #[inline]
507    fn nth(&mut self, n: usize) -> Option<Self::Item> {
508        let new_index = self.index + n;
509        if new_index > self.end {
510            self.index = self.end;
511            None
512        } else {
513            self.index = new_index;
514            self.next()
515        }
516    }
517}
518
519impl<T: Copy> DoubleEndedIterator for IntoIter<T> {
520    #[inline]
521    fn next_back(&mut self) -> Option<Self::Item> {
522        if self.index == self.end {
523            None
524        } else {
525            self.end -= 1;
526            Some(*unsafe { self.values.get_unchecked(self.end) })
527        }
528    }
529}
530
531impl<T: Copy> ExactSizeIterator for IntoIter<T> {}