polars_arrow/buffer/
immutable.rs

#![allow(unsafe_op_in_unsafe_fn)]
use std::ops::Deref;

use bytemuck::{Pod, Zeroable};
use either::Either;

use super::IntoIter;
use crate::array::{ArrayAccessor, Splitable};
use crate::storage::SharedStorage;

/// [`Buffer`] is a contiguous memory region that can be shared across
/// thread boundaries.
///
/// The easiest way to think about [`Buffer<T>`] is that it is equivalent to
/// an `Arc<Vec<T>>`, with the following differences:
/// * slicing and cloning are `O(1)`.
/// * it supports externally allocated memory
///
/// The easiest way to create one is to use its implementation of `From<Vec<T>>`.
///
/// # Examples
/// ```
/// use polars_arrow::buffer::Buffer;
///
/// let mut buffer: Buffer<u32> = vec![1, 2, 3].into();
/// assert_eq!(buffer.as_ref(), [1, 2, 3].as_ref());
///
/// // it supports copy-on-write semantics (i.e. it can be converted back into a `Vec`)
/// let vec: Vec<u32> = buffer.into_mut().right().unwrap();
/// assert_eq!(vec, vec![1, 2, 3]);
///
/// // cloning and slicing are `O(1)` (data is shared)
/// let mut buffer: Buffer<u32> = vec![1, 2, 3].into();
/// let mut sliced = buffer.clone();
/// sliced.slice(1, 1);
/// assert_eq!(sliced.as_ref(), [2].as_ref());
/// // but cloning forbids getting mut since `sliced` and `buffer` now share data
/// assert_eq!(buffer.get_mut_slice(), None);
/// ```
pub struct Buffer<T> {
    /// The internal shared storage.
    storage: SharedStorage<T>,

    /// A pointer into the buffer where our data starts.
    ptr: *const T,

    /// The length of the buffer, in elements of `T`.
    length: usize,
}

impl<T> Clone for Buffer<T> {
    fn clone(&self) -> Self {
        Self {
            storage: self.storage.clone(),
            ptr: self.ptr,
            length: self.length,
        }
    }
}

unsafe impl<T: Send + Sync> Sync for Buffer<T> {}
unsafe impl<T: Send + Sync> Send for Buffer<T> {}

impl<T: PartialEq> PartialEq for Buffer<T> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.deref() == other.deref()
    }
}

impl<T: Eq> Eq for Buffer<T> {}

impl<T: std::hash::Hash> std::hash::Hash for Buffer<T> {
    #[inline]
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.as_slice().hash(state);
    }
}

impl<T: std::fmt::Debug> std::fmt::Debug for Buffer<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Debug::fmt(&**self, f)
    }
}

impl<T> Default for Buffer<T> {
    #[inline]
    fn default() -> Self {
        Vec::new().into()
    }
}

impl<T> Buffer<T> {
    /// Creates an empty [`Buffer`].
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Auxiliary method to create a new [`Buffer`] from a `SharedStorage`.
    pub fn from_storage(storage: SharedStorage<T>) -> Self {
        let ptr = storage.as_ptr();
        let length = storage.len();
        Buffer {
            storage,
            ptr,
            length,
        }
    }

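    /// Creates a [`Buffer`] backed by a `'static` slice.
    ///
    /// A minimal sketch:
    /// ```
    /// use polars_arrow::buffer::Buffer;
    ///
    /// const DATA: &[u32] = &[1, 2, 3];
    /// let buffer = Buffer::from_static(DATA);
    /// assert_eq!(buffer.as_ref(), [1, 2, 3].as_ref());
    /// ```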
    pub fn from_static(data: &'static [T]) -> Self {
        Self::from_storage(SharedStorage::from_static(data))
    }

    /// Returns the number of elements in the buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.length
    }

    /// Returns whether the buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Returns whether the underlying data is sliced.
    /// If sliced, the [`Buffer`] is backed by
    /// more data than the length of `Self`.
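    ///
    /// # Examples
    /// A minimal sketch:
    /// ```
    /// use polars_arrow::buffer::Buffer;
    ///
    /// let mut buffer: Buffer<u32> = vec![1, 2, 3, 4].into();
    /// assert!(!buffer.is_sliced());
    /// buffer.slice(1, 2);
    /// assert!(buffer.is_sliced());
    /// ```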
    pub fn is_sliced(&self) -> bool {
        self.storage.len() != self.length
    }

    /// Expands this slice to the maximum length allowed by the underlying storage.
    /// It only expands towards the end; the offset isn't changed, i.e. element `i`
    /// before and after this operation refers to the same element.
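    ///
    /// # Examples
    /// A minimal sketch of slicing and then expanding back towards the end:
    /// ```
    /// use polars_arrow::buffer::Buffer;
    ///
    /// let mut buffer: Buffer<u32> = vec![1, 2, 3, 4].into();
    /// buffer.slice(1, 2); // the view is now [2, 3]
    /// let expanded = buffer.expand_end_to_storage();
    /// // The offset is kept; the view now runs to the end of the storage.
    /// assert_eq!(expanded.as_ref(), [2, 3, 4].as_ref());
    /// ```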
    pub fn expand_end_to_storage(self) -> Self {
        unsafe {
            let offset = self.ptr.offset_from(self.storage.as_ptr()) as usize;
            Self {
                ptr: self.ptr,
                length: self.storage.len() - offset,
                storage: self.storage,
            }
        }
    }

    /// Returns the slice of elements stored in this buffer.
    #[inline]
    pub fn as_slice(&self) -> &[T] {
        // SAFETY:
        // invariant of this struct: `offset + length <= storage.len()`
        debug_assert!(self.offset() + self.length <= self.storage.len());
        unsafe { std::slice::from_raw_parts(self.ptr, self.length) }
    }

    /// Returns a reference to the element at `index`, without bounds checking.
    ///
    /// # Safety
    /// `index` must be smaller than `len`
    #[inline]
    pub(super) unsafe fn get_unchecked(&self, index: usize) -> &T {
        // SAFETY:
        // invariant of this function
        debug_assert!(index < self.length);
        unsafe { &*self.ptr.add(index) }
    }

    /// Returns a new [`Buffer`] that is a slice of this buffer starting at `offset`
    /// and spanning `length` elements.
    /// Doing so allows the same memory region to be shared between buffers.
    /// # Panics
    /// Panics iff `offset + length` is larger than `len`.
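    /// # Examples
    /// A minimal sketch; the returned slice shares the original allocation:
    /// ```
    /// use polars_arrow::buffer::Buffer;
    ///
    /// let buffer: Buffer<u32> = vec![1, 2, 3, 4].into();
    /// let sliced = buffer.sliced(1, 2);
    /// assert_eq!(sliced.as_ref(), [2, 3].as_ref());
    /// ```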
    #[inline]
    pub fn sliced(self, offset: usize, length: usize) -> Self {
        assert!(
            offset + length <= self.len(),
            "the offset of the new Buffer cannot exceed the existing length"
        );
        // SAFETY: we just checked bounds
        unsafe { self.sliced_unchecked(offset, length) }
    }

    /// Slices this buffer starting at `offset` and spanning `length` elements.
    /// # Panics
    /// Panics iff `offset + length` is larger than `len`.
    #[inline]
    pub fn slice(&mut self, offset: usize, length: usize) {
        assert!(
            offset + length <= self.len(),
            "the offset of the new Buffer cannot exceed the existing length"
        );
        // SAFETY: we just checked bounds
        unsafe { self.slice_unchecked(offset, length) }
    }

    /// Returns a new [`Buffer`] that is a slice of this buffer starting at `offset`
    /// and spanning `length` elements.
    /// Doing so allows the same memory region to be shared between buffers.
    ///
    /// # Safety
    /// The caller must ensure `offset + length <= self.len()`
    #[inline]
    #[must_use]
    pub unsafe fn sliced_unchecked(mut self, offset: usize, length: usize) -> Self {
        debug_assert!(offset + length <= self.len());

        self.slice_unchecked(offset, length);
        self
    }

    /// Slices this buffer starting at `offset` and spanning `length` elements.
    ///
    /// # Safety
    /// The caller must ensure `offset + length <= self.len()`
    #[inline]
    pub unsafe fn slice_unchecked(&mut self, offset: usize, length: usize) {
        self.ptr = self.ptr.add(offset);
        self.length = length;
    }

    /// Returns a pointer to the start of the storage underlying this buffer.
    #[inline]
    pub(crate) fn storage_ptr(&self) -> *const T {
        self.storage.as_ptr()
    }

    /// Returns the start offset of this buffer within the underlying storage.
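    ///
    /// # Examples
    /// A minimal sketch:
    /// ```
    /// use polars_arrow::buffer::Buffer;
    ///
    /// let mut buffer: Buffer<u32> = vec![1, 2, 3, 4].into();
    /// assert_eq!(buffer.offset(), 0);
    /// buffer.slice(2, 1);
    /// assert_eq!(buffer.offset(), 2);
    /// ```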
    #[inline]
    pub fn offset(&self) -> usize {
        unsafe {
            let ret = self.ptr.offset_from(self.storage.as_ptr()) as usize;
            debug_assert!(ret <= self.storage.len());
            ret
        }
    }

    /// # Safety
    /// The caller must ensure that the buffer was properly initialized up to `len`,
    /// and that `offset + len` does not exceed the length of the underlying storage.
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        self.length = len;
    }

    /// Converts this [`Buffer`] into its underlying [`Vec`], if possible.
    ///
    /// This operation returns [`Either::Right`] iff this [`Buffer`]:
    /// * is not sliced
    /// * has no alive clones
    /// * has not been imported from the C data interface (FFI)
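    ///
    /// # Examples
    /// A minimal sketch of both outcomes:
    /// ```
    /// use polars_arrow::buffer::Buffer;
    ///
    /// // A uniquely owned, unsliced buffer converts back into its `Vec`.
    /// let buffer: Buffer<u32> = vec![1, 2, 3].into();
    /// assert!(buffer.into_mut().is_right());
    ///
    /// // A cloned buffer shares its storage, so it is returned unchanged.
    /// let buffer: Buffer<u32> = vec![1, 2, 3].into();
    /// let _other = buffer.clone();
    /// assert!(buffer.into_mut().is_left());
    /// ```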
    #[inline]
    pub fn into_mut(mut self) -> Either<Self, Vec<T>> {
        // We lose information if the data is sliced.
        if self.is_sliced() {
            return Either::Left(self);
        }
        match self.storage.try_into_vec() {
            Ok(v) => Either::Right(v),
            Err(slf) => {
                self.storage = slf;
                Either::Left(self)
            },
        }
    }

    /// Returns a mutable reference to its slice, if possible.
    ///
    /// This operation returns [`Some`] iff this [`Buffer`]:
    /// * has no alive clones
    /// * has not been imported from the C data interface (FFI)
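    ///
    /// # Examples
    /// A minimal sketch; a uniquely owned buffer can be mutated in place:
    /// ```
    /// use polars_arrow::buffer::Buffer;
    ///
    /// let mut buffer: Buffer<u32> = vec![1, 2, 3].into();
    /// buffer.get_mut_slice().unwrap()[0] = 7;
    /// assert_eq!(buffer.as_ref(), [7, 2, 3].as_ref());
    /// ```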
    #[inline]
    pub fn get_mut_slice(&mut self) -> Option<&mut [T]> {
        let offset = self.offset();
        let slice = self.storage.try_as_mut_slice()?;
        Some(unsafe { slice.get_unchecked_mut(offset..offset + self.length) })
    }

    /// Returns the reference count of the underlying storage.
    ///
    /// Since this takes a shared reference to `self`, beware that others might
    /// increment the count after you've checked that it's equal to 1.
    pub fn storage_refcount(&self) -> u64 {
        self.storage.refcount()
    }
}

impl<T: Pod> Buffer<T> {
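    /// Reinterprets this [`Buffer<T>`] as a buffer of another [`Pod`] type `U`,
    /// returning the original buffer unchanged if the underlying storage cannot
    /// be transmuted.
    ///
    /// A minimal sketch; on success the length is rescaled by the ratio of the
    /// element sizes:
    /// ```
    /// use polars_arrow::buffer::Buffer;
    ///
    /// let buffer: Buffer<u32> = vec![1, 2, 3].into();
    /// match buffer.try_transmute::<u8>() {
    ///     Ok(bytes) => assert_eq!(bytes.len(), 3 * std::mem::size_of::<u32>()),
    ///     Err(original) => assert_eq!(original.len(), 3),
    /// }
    /// ```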
    pub fn try_transmute<U: Pod>(mut self) -> Result<Buffer<U>, Self> {
        assert_ne!(size_of::<U>(), 0);
        let ptr = self.ptr as *const U;
        let length = self.length;
        match self.storage.try_transmute() {
            Err(v) => {
                self.storage = v;
                Err(self)
            },
            Ok(storage) => Ok(Buffer {
                storage,
                ptr,
                length: length.checked_mul(size_of::<T>()).expect("overflow") / size_of::<U>(),
            }),
        }
    }
}

impl<T: Clone> Buffer<T> {
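    /// Converts this [`Buffer`] into a [`Vec`], cloning the data only if the
    /// underlying allocation cannot be reclaimed via `into_mut`.
    ///
    /// A minimal sketch:
    /// ```
    /// use polars_arrow::buffer::Buffer;
    ///
    /// let buffer: Buffer<u32> = vec![1, 2, 3].into();
    /// let shared = buffer.clone();
    /// // `buffer` is shared with `shared`, so its contents are cloned into a new `Vec`.
    /// assert_eq!(buffer.make_mut(), vec![1, 2, 3]);
    /// assert_eq!(shared.as_ref(), [1, 2, 3].as_ref());
    /// ```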
    pub fn make_mut(self) -> Vec<T> {
        match self.into_mut() {
            Either::Right(v) => v,
            Either::Left(same) => same.as_slice().to_vec(),
        }
    }
}

impl<T: Zeroable + Copy> Buffer<T> {
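    /// Creates a new [`Buffer`] of length `len`, with every element zeroed.
    ///
    /// A minimal sketch:
    /// ```
    /// use polars_arrow::buffer::Buffer;
    ///
    /// let buffer = Buffer::<u32>::zeroed(3);
    /// assert_eq!(buffer.as_ref(), [0, 0, 0].as_ref());
    /// ```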
    pub fn zeroed(len: usize) -> Self {
        vec![T::zeroed(); len].into()
    }
}

impl<T> From<Vec<T>> for Buffer<T> {
    #[inline]
    fn from(v: Vec<T>) -> Self {
        Self::from_storage(SharedStorage::from_vec(v))
    }
}

impl<T> Deref for Buffer<T> {
    type Target = [T];

    #[inline(always)]
    fn deref(&self) -> &[T] {
        self.as_slice()
    }
}

impl<T> AsRef<[T]> for Buffer<T> {
    #[inline(always)]
    fn as_ref(&self) -> &[T] {
        self.as_slice()
    }
}

impl<T> FromIterator<T> for Buffer<T> {
    #[inline]
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        Vec::from_iter(iter).into()
    }
}

impl<T: Copy> IntoIterator for Buffer<T> {
    type Item = T;

    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        IntoIter::new(self)
    }
}

unsafe impl<'a, T: 'a> ArrayAccessor<'a> for Buffer<T> {
    type Item = &'a T;

    unsafe fn value_unchecked(&'a self, index: usize) -> Self::Item {
        unsafe { &*self.ptr.add(index) }
    }

    fn len(&self) -> usize {
        Buffer::len(self)
    }
}

impl<T> Splitable for Buffer<T> {
    #[inline(always)]
    fn check_bound(&self, offset: usize) -> bool {
        offset <= self.len()
    }

    unsafe fn _split_at_unchecked(&self, offset: usize) -> (Self, Self) {
        let storage = &self.storage;

        (
            Self {
                storage: storage.clone(),
                ptr: self.ptr,
                length: offset,
            },
            Self {
                storage: storage.clone(),
                ptr: self.ptr.wrapping_add(offset),
                length: self.length - offset,
            },
        )
    }
}

#[cfg(feature = "serde")]
mod _serde_impl {
    use serde::{Deserialize, Serialize};

    use super::Buffer;

    impl<T> Serialize for Buffer<T>
    where
        T: Serialize,
    {
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            <[T] as Serialize>::serialize(self.as_slice(), serializer)
        }
    }

    impl<'de, T> Deserialize<'de> for Buffer<T>
    where
        T: Deserialize<'de>,
    {
        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
        where
            D: serde::Deserializer<'de>,
        {
            <Vec<T> as Deserialize>::deserialize(deserializer).map(Buffer::from)
        }
    }
}

#[cfg(feature = "dsl-schema")]
impl<T: schemars::JsonSchema> schemars::JsonSchema for Buffer<T> {
    fn schema_name() -> String {
        <[T] as schemars::JsonSchema>::schema_name()
    }

    fn schema_id() -> std::borrow::Cow<'static, str> {
        <[T] as schemars::JsonSchema>::schema_id()
    }

    fn json_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
        <[T] as schemars::JsonSchema>::json_schema(generator)
    }
}