polars_arrow/buffer/
immutable.rs

1#![allow(unsafe_op_in_unsafe_fn)]
2use std::ops::Deref;
3
4use bytemuck::Zeroable;
5use either::Either;
6
7use super::IntoIter;
8use crate::array::{ArrayAccessor, Splitable};
9use crate::storage::SharedStorage;
10
11/// [`Buffer`] is a contiguous memory region that can be shared across
12/// thread boundaries.
13///
14/// The easiest way to think about [`Buffer<T>`] is being equivalent to
15/// a `Arc<Vec<T>>`, with the following differences:
16/// * slicing and cloning is `O(1)`.
17/// * it supports external allocated memory
18///
19/// The easiest way to create one is to use its implementation of `From<Vec<T>>`.
20///
21/// # Examples
22/// ```
23/// use polars_arrow::buffer::Buffer;
24///
25/// let mut buffer: Buffer<u32> = vec![1, 2, 3].into();
26/// assert_eq!(buffer.as_ref(), [1, 2, 3].as_ref());
27///
28/// // it supports copy-on-write semantics (i.e. back to a `Vec`)
29/// let vec: Vec<u32> = buffer.into_mut().right().unwrap();
30/// assert_eq!(vec, vec![1, 2, 3]);
31///
32/// // cloning and slicing is `O(1)` (data is shared)
33/// let mut buffer: Buffer<u32> = vec![1, 2, 3].into();
34/// let mut sliced = buffer.clone();
35/// sliced.slice(1, 1);
36/// assert_eq!(sliced.as_ref(), [2].as_ref());
37/// // but cloning forbids getting mut since `slice` and `buffer` now share data
38/// assert_eq!(buffer.get_mut_slice(), None);
39/// ```
40#[derive(Clone)]
41pub struct Buffer<T> {
42    /// The internal byte buffer.
43    storage: SharedStorage<T>,
44
45    /// A pointer into the buffer where our data starts.
46    ptr: *const T,
47
48    // The length of the buffer.
49    length: usize,
50}
51
52unsafe impl<T: Send + Sync> Sync for Buffer<T> {}
53unsafe impl<T: Send + Sync> Send for Buffer<T> {}
54
55impl<T: PartialEq> PartialEq for Buffer<T> {
56    #[inline]
57    fn eq(&self, other: &Self) -> bool {
58        self.deref() == other.deref()
59    }
60}
61
62impl<T: Eq> Eq for Buffer<T> {}
63
64impl<T: std::hash::Hash> std::hash::Hash for Buffer<T> {
65    #[inline]
66    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
67        self.as_slice().hash(state);
68    }
69}
70
71impl<T: std::fmt::Debug> std::fmt::Debug for Buffer<T> {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        std::fmt::Debug::fmt(&**self, f)
74    }
75}
76
77impl<T> Default for Buffer<T> {
78    #[inline]
79    fn default() -> Self {
80        Vec::new().into()
81    }
82}
83
84impl<T> Buffer<T> {
85    /// Creates an empty [`Buffer`].
86    #[inline]
87    pub fn new() -> Self {
88        Self::default()
89    }
90
91    /// Auxiliary method to create a new Buffer
92    pub fn from_storage(storage: SharedStorage<T>) -> Self {
93        let ptr = storage.as_ptr();
94        let length = storage.len();
95        Buffer {
96            storage,
97            ptr,
98            length,
99        }
100    }
101
102    pub fn from_static(data: &'static [T]) -> Self {
103        Self::from_storage(SharedStorage::from_static(data))
104    }
105
106    /// Returns the number of bytes in the buffer
107    #[inline]
108    pub fn len(&self) -> usize {
109        self.length
110    }
111
112    /// Returns whether the buffer is empty.
113    #[inline]
114    pub fn is_empty(&self) -> bool {
115        self.length == 0
116    }
117
118    /// Returns whether underlying data is sliced.
119    /// If sliced the [`Buffer`] is backed by
120    /// more data than the length of `Self`.
121    pub fn is_sliced(&self) -> bool {
122        self.storage.len() != self.length
123    }
124
125    /// Expands this slice to the maximum allowed by the underlying storage.
126    /// Only expands towards the end, the offset isn't changed. That is, element
127    /// i before and after this operation refer to the same element.
128    pub fn expand_end_to_storage(self) -> Self {
129        unsafe {
130            let offset = self.ptr.offset_from(self.storage.as_ptr()) as usize;
131            Self {
132                ptr: self.ptr,
133                length: self.storage.len() - offset,
134                storage: self.storage,
135            }
136        }
137    }
138
139    /// Returns the byte slice stored in this buffer
140    #[inline]
141    pub fn as_slice(&self) -> &[T] {
142        // SAFETY:
143        // invariant of this struct `offset + length <= data.len()`
144        debug_assert!(self.offset() + self.length <= self.storage.len());
145        unsafe { std::slice::from_raw_parts(self.ptr, self.length) }
146    }
147
148    /// Returns the byte slice stored in this buffer
149    ///
150    /// # Safety
151    /// `index` must be smaller than `len`
152    #[inline]
153    pub(super) unsafe fn get_unchecked(&self, index: usize) -> &T {
154        // SAFETY:
155        // invariant of this function
156        debug_assert!(index < self.length);
157        unsafe { &*self.ptr.add(index) }
158    }
159
160    /// Returns a new [`Buffer`] that is a slice of this buffer starting at `offset`.
161    /// Doing so allows the same memory region to be shared between buffers.
162    /// # Panics
163    /// Panics iff `offset + length` is larger than `len`.
164    #[inline]
165    pub fn sliced(self, offset: usize, length: usize) -> Self {
166        assert!(
167            offset + length <= self.len(),
168            "the offset of the new Buffer cannot exceed the existing length"
169        );
170        // SAFETY: we just checked bounds
171        unsafe { self.sliced_unchecked(offset, length) }
172    }
173
174    /// Slices this buffer starting at `offset`.
175    /// # Panics
176    /// Panics iff `offset + length` is larger than `len`.
177    #[inline]
178    pub fn slice(&mut self, offset: usize, length: usize) {
179        assert!(
180            offset + length <= self.len(),
181            "the offset of the new Buffer cannot exceed the existing length"
182        );
183        // SAFETY: we just checked bounds
184        unsafe { self.slice_unchecked(offset, length) }
185    }
186
187    /// Returns a new [`Buffer`] that is a slice of this buffer starting at `offset`.
188    /// Doing so allows the same memory region to be shared between buffers.
189    ///
190    /// # Safety
191    /// The caller must ensure `offset + length <= self.len()`
192    #[inline]
193    #[must_use]
194    pub unsafe fn sliced_unchecked(mut self, offset: usize, length: usize) -> Self {
195        debug_assert!(offset + length <= self.len());
196
197        self.slice_unchecked(offset, length);
198        self
199    }
200
201    /// Slices this buffer starting at `offset`.
202    ///
203    /// # Safety
204    /// The caller must ensure `offset + length <= self.len()`
205    #[inline]
206    pub unsafe fn slice_unchecked(&mut self, offset: usize, length: usize) {
207        self.ptr = self.ptr.add(offset);
208        self.length = length;
209    }
210
211    /// Returns a pointer to the start of the storage underlying this buffer.
212    #[inline]
213    pub(crate) fn storage_ptr(&self) -> *const T {
214        self.storage.as_ptr()
215    }
216
217    /// Returns the start offset of this buffer within the underlying storage.
218    #[inline]
219    pub fn offset(&self) -> usize {
220        unsafe {
221            let ret = self.ptr.offset_from(self.storage.as_ptr()) as usize;
222            debug_assert!(ret <= self.storage.len());
223            ret
224        }
225    }
226
227    /// # Safety
228    /// The caller must ensure that the buffer was properly initialized up to `len`.
229    #[inline]
230    pub unsafe fn set_len(&mut self, len: usize) {
231        self.length = len;
232    }
233
234    /// Returns a mutable reference to its underlying [`Vec`], if possible.
235    ///
236    /// This operation returns [`Either::Right`] iff this [`Buffer`]:
237    /// * has no alive clones
238    /// * has not been imported from the C data interface (FFI)
239    #[inline]
240    pub fn into_mut(mut self) -> Either<Self, Vec<T>> {
241        // We lose information if the data is sliced.
242        if self.is_sliced() {
243            return Either::Left(self);
244        }
245        match self.storage.try_into_vec() {
246            Ok(v) => Either::Right(v),
247            Err(slf) => {
248                self.storage = slf;
249                Either::Left(self)
250            },
251        }
252    }
253
254    /// Returns a mutable reference to its slice, if possible.
255    ///
256    /// This operation returns [`Some`] iff this [`Buffer`]:
257    /// * has no alive clones
258    /// * has not been imported from the C data interface (FFI)
259    #[inline]
260    pub fn get_mut_slice(&mut self) -> Option<&mut [T]> {
261        let offset = self.offset();
262        let slice = self.storage.try_as_mut_slice()?;
263        Some(unsafe { slice.get_unchecked_mut(offset..offset + self.length) })
264    }
265
266    /// Since this takes a shared reference to self, beware that others might
267    /// increment this after you've checked it's equal to 1.
268    pub fn storage_refcount(&self) -> u64 {
269        self.storage.refcount()
270    }
271}
272
273impl<T: Clone> Buffer<T> {
274    pub fn make_mut(self) -> Vec<T> {
275        match self.into_mut() {
276            Either::Right(v) => v,
277            Either::Left(same) => same.as_slice().to_vec(),
278        }
279    }
280}
281
282impl<T: Zeroable + Copy> Buffer<T> {
283    pub fn zeroed(len: usize) -> Self {
284        vec![T::zeroed(); len].into()
285    }
286}
287
288impl<T> From<Vec<T>> for Buffer<T> {
289    #[inline]
290    fn from(v: Vec<T>) -> Self {
291        Self::from_storage(SharedStorage::from_vec(v))
292    }
293}
294
295impl<T> Deref for Buffer<T> {
296    type Target = [T];
297
298    #[inline(always)]
299    fn deref(&self) -> &[T] {
300        self.as_slice()
301    }
302}
303
304impl<T> AsRef<[T]> for Buffer<T> {
305    #[inline(always)]
306    fn as_ref(&self) -> &[T] {
307        self.as_slice()
308    }
309}
310
311impl<T> FromIterator<T> for Buffer<T> {
312    #[inline]
313    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
314        Vec::from_iter(iter).into()
315    }
316}
317
318impl<T: Copy> IntoIterator for Buffer<T> {
319    type Item = T;
320
321    type IntoIter = IntoIter<T>;
322
323    fn into_iter(self) -> Self::IntoIter {
324        IntoIter::new(self)
325    }
326}
327
328unsafe impl<'a, T: 'a> ArrayAccessor<'a> for Buffer<T> {
329    type Item = &'a T;
330
331    unsafe fn value_unchecked(&'a self, index: usize) -> Self::Item {
332        unsafe { &*self.ptr.add(index) }
333    }
334
335    fn len(&self) -> usize {
336        Buffer::len(self)
337    }
338}
339
340impl<T> Splitable for Buffer<T> {
341    #[inline(always)]
342    fn check_bound(&self, offset: usize) -> bool {
343        offset <= self.len()
344    }
345
346    unsafe fn _split_at_unchecked(&self, offset: usize) -> (Self, Self) {
347        let storage = &self.storage;
348
349        (
350            Self {
351                storage: storage.clone(),
352                ptr: self.ptr,
353                length: offset,
354            },
355            Self {
356                storage: storage.clone(),
357                ptr: self.ptr.wrapping_add(offset),
358                length: self.length - offset,
359            },
360        )
361    }
362}