polars_arrow/array/binview/
mod.rs

1#![allow(unsafe_op_in_unsafe_fn)]
2//! See thread: https://lists.apache.org/thread/w88tpz76ox8h3rxkjl4so6rg3f1rv7wt
3
4mod builder;
5pub use builder::*;
6mod ffi;
7pub(super) mod fmt;
8mod iterator;
9mod mutable;
10mod view;
11
12use std::any::Any;
13use std::fmt::Debug;
14use std::marker::PhantomData;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use polars_error::*;
19
20use crate::array::Array;
21use crate::bitmap::Bitmap;
22use crate::buffer::Buffer;
23use crate::datatypes::ArrowDataType;
24
/// Private home of the `Sealed` marker trait.
///
/// `Sealed` is a supertrait of `ViewType`. Code outside this module tree cannot
/// name it, so it cannot add new `ViewType` implementations beyond `str` and `[u8]`.
mod private {
    /// Marker supertrait of `ViewType`; requires `Send + Sync`.
    pub trait Sealed: Send + Sync {}

    impl Sealed for str {}
    impl Sealed for [u8] {}
}
31pub use iterator::BinaryViewValueIter;
32pub use mutable::MutableBinaryViewArray;
33use polars_utils::aliases::{InitHashMaps, PlHashMap};
34use private::Sealed;
35
36use crate::array::binview::view::{validate_binary_views, validate_views_utf8_only};
37use crate::array::iterator::NonNullValuesIter;
38use crate::bitmap::utils::{BitmapIter, ZipValidity};
/// View array whose values are arbitrary byte slices (`[u8]`).
pub type BinaryViewArray = BinaryViewArrayGeneric<[u8]>;
/// View array whose values are UTF-8 strings (`str`).
pub type Utf8ViewArray = BinaryViewArrayGeneric<str>;
41pub use view::{View, validate_utf8_views};
42
43use super::Splitable;
44
/// Mutable counterpart of [`Utf8ViewArray`] (see `BinaryViewArrayGeneric::make_mut`).
pub type MutablePlString = MutableBinaryViewArray<str>;
/// Mutable counterpart of [`BinaryViewArray`] (see `BinaryViewArrayGeneric::make_mut`).
pub type MutablePlBinary = MutableBinaryViewArray<[u8]>;
47
// `'static` data types returned by `ViewType::dtype` for `[u8]` and `str` respectively.
static BIN_VIEW_TYPE: ArrowDataType = ArrowDataType::BinaryView;
static UTF8_VIEW_TYPE: ArrowDataType = ArrowDataType::Utf8View;
50
// Growth parameters of view array buffers.
// NOTE(review): presumably the initial and the maximum size of a data buffer when a
// mutable view array grows its buffers; the code that reads these is not in this file.
/// 8 KiB.
const DEFAULT_BLOCK_SIZE: usize = 8 * 1024;
/// 16 MiB.
const MAX_EXP_BLOCK_SIZE: usize = 16 * 1024 * 1024;
54
/// The value type stored in a view array: implemented (and sealed) for `str` and `[u8]`.
pub trait ViewType: Sealed + 'static + PartialEq + AsRef<Self> {
    /// `true` when values must be valid UTF-8 (`str`), `false` for raw bytes (`[u8]`).
    const IS_UTF8: bool;
    /// Arrow data type corresponding to this value type.
    const DATA_TYPE: ArrowDataType;
    /// Owned form of a value (`String` for `str`, `Vec<u8>` for `[u8]`).
    type Owned: Debug + Clone + Sync + Send + AsRef<Self>;

    /// Reinterprets raw bytes as `&Self` without validation.
    ///
    /// # Safety
    /// The caller must ensure that `slice` is a valid view.
    unsafe fn from_bytes_unchecked(slice: &[u8]) -> &Self;
    /// Reinterprets raw bytes as `&Self`, returning `None` if they are not a valid `Self`.
    fn from_bytes(slice: &[u8]) -> Option<&Self>;

    /// Returns the underlying bytes of the value.
    fn to_bytes(&self) -> &[u8];

    /// Copies the value into its owned form.
    #[allow(clippy::wrong_self_convention)]
    fn into_owned(&self) -> Self::Owned;

    /// Returns a `'static` reference to this type's Arrow data type.
    fn dtype() -> &'static ArrowDataType;
}
72
73impl ViewType for str {
74    const IS_UTF8: bool = true;
75    const DATA_TYPE: ArrowDataType = ArrowDataType::Utf8View;
76    type Owned = String;
77
78    #[inline(always)]
79    unsafe fn from_bytes_unchecked(slice: &[u8]) -> &Self {
80        std::str::from_utf8_unchecked(slice)
81    }
82    #[inline(always)]
83    fn from_bytes(slice: &[u8]) -> Option<&Self> {
84        std::str::from_utf8(slice).ok()
85    }
86
87    #[inline(always)]
88    fn to_bytes(&self) -> &[u8] {
89        self.as_bytes()
90    }
91
92    fn into_owned(&self) -> Self::Owned {
93        self.to_string()
94    }
95    fn dtype() -> &'static ArrowDataType {
96        &UTF8_VIEW_TYPE
97    }
98}
99
100impl ViewType for [u8] {
101    const IS_UTF8: bool = false;
102    const DATA_TYPE: ArrowDataType = ArrowDataType::BinaryView;
103    type Owned = Vec<u8>;
104
105    #[inline(always)]
106    unsafe fn from_bytes_unchecked(slice: &[u8]) -> &Self {
107        slice
108    }
109    #[inline(always)]
110    fn from_bytes(slice: &[u8]) -> Option<&Self> {
111        Some(slice)
112    }
113
114    #[inline(always)]
115    fn to_bytes(&self) -> &[u8] {
116        self
117    }
118
119    fn into_owned(&self) -> Self::Owned {
120        self.to_vec()
121    }
122
123    fn dtype() -> &'static ArrowDataType {
124        &BIN_VIEW_TYPE
125    }
126}
127
/// Arrow view array of variable-length values (`str` or `[u8]`, selected by `T`).
///
/// Every element is described by a [`View`]. Views longer than
/// `View::MAX_INLINE_SIZE` point at bytes inside one of the shared `buffers`.
pub struct BinaryViewArrayGeneric<T: ViewType + ?Sized> {
    /// Logical data type of the array.
    dtype: ArrowDataType,
    /// One view per element.
    views: Buffer<View>,
    /// Data buffers referenced by the non-inlined views.
    buffers: Arc<[Buffer<u8>]>,
    /// Optional null mask; `None` means every element is valid.
    validity: Option<Bitmap>,
    phantom: PhantomData<T>,
    /// Total bytes length if we would concatenate them all.
    /// Holds the `UNKNOWN_LEN` sentinel until computed; it is then cached.
    total_bytes_len: AtomicU64,
    /// Total bytes in the buffer (excluding remaining capacity)
    total_buffer_len: usize,
}
139
140impl<T: ViewType + ?Sized> PartialEq for BinaryViewArrayGeneric<T> {
141    fn eq(&self, other: &Self) -> bool {
142        self.len() == other.len() && self.into_iter().zip(other).all(|(l, r)| l == r)
143    }
144}
145
146impl<T: ViewType + ?Sized> Clone for BinaryViewArrayGeneric<T> {
147    fn clone(&self) -> Self {
148        Self {
149            dtype: self.dtype.clone(),
150            views: self.views.clone(),
151            buffers: self.buffers.clone(),
152            validity: self.validity.clone(),
153            phantom: Default::default(),
154            total_bytes_len: AtomicU64::new(self.total_bytes_len.load(Ordering::Relaxed)),
155            total_buffer_len: self.total_buffer_len,
156        }
157    }
158}
159
// SAFETY: NOTE(review): these impls assert the array may be sent and shared across
// threads. `T: Sealed` already requires `Send + Sync`, and the only interior
// mutability visible in this file is `total_bytes_len`, an `AtomicU64`. Confirm that
// `Buffer`, `Bitmap` and `ArrowDataType` are themselves thread-safe.
unsafe impl<T: ViewType + ?Sized> Send for BinaryViewArrayGeneric<T> {}
unsafe impl<T: ViewType + ?Sized> Sync for BinaryViewArrayGeneric<T> {}
162
/// Sentinel stored in `total_bytes_len` meaning "not computed yet".
const UNKNOWN_LEN: u64 = u64::MAX;
164
165impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
166    /// # Safety
167    /// The caller must ensure
168    /// - the data is valid utf8 (if required)
169    /// - The offsets match the buffers.
170    pub unsafe fn new_unchecked(
171        dtype: ArrowDataType,
172        views: Buffer<View>,
173        buffers: Arc<[Buffer<u8>]>,
174        validity: Option<Bitmap>,
175        total_bytes_len: usize,
176        total_buffer_len: usize,
177    ) -> Self {
178        // Verify the invariants
179        #[cfg(debug_assertions)]
180        {
181            if let Some(validity) = validity.as_ref() {
182                assert_eq!(validity.len(), views.len());
183            }
184
185            // @TODO: Enable this. This is currently bugged with concatenate.
186            // let mut actual_total_buffer_len = 0;
187            // let mut actual_total_bytes_len = 0;
188            //
189            // for buffer in buffers.iter() {
190            //     actual_total_buffer_len += buffer.len();
191            // }
192
193            for (i, view) in views.iter().enumerate() {
194                let is_valid = validity.as_ref().is_none_or(|v| v.get_bit(i));
195
196                if !is_valid {
197                    continue;
198                }
199
200                // actual_total_bytes_len += view.length as usize;
201                if view.length > View::MAX_INLINE_SIZE {
202                    assert!((view.buffer_idx as usize) < (buffers.len()));
203                    assert!(
204                        view.offset as usize + view.length as usize
205                            <= buffers[view.buffer_idx as usize].len()
206                    );
207                }
208            }
209
210            // assert_eq!(actual_total_buffer_len, total_buffer_len);
211            // if (total_bytes_len as u64) != UNKNOWN_LEN {
212            //     assert_eq!(actual_total_bytes_len, total_bytes_len);
213            // }
214        }
215
216        Self {
217            dtype,
218            views,
219            buffers,
220            validity,
221            phantom: Default::default(),
222            total_bytes_len: AtomicU64::new(total_bytes_len as u64),
223            total_buffer_len,
224        }
225    }
226
227    /// Create a new BinaryViewArray but initialize a statistics compute.
228    ///
229    /// # Safety
230    /// The caller must ensure the invariants
231    pub unsafe fn new_unchecked_unknown_md(
232        dtype: ArrowDataType,
233        views: Buffer<View>,
234        buffers: Arc<[Buffer<u8>]>,
235        validity: Option<Bitmap>,
236        total_buffer_len: Option<usize>,
237    ) -> Self {
238        let total_bytes_len = UNKNOWN_LEN as usize;
239        let total_buffer_len =
240            total_buffer_len.unwrap_or_else(|| buffers.iter().map(|b| b.len()).sum());
241        Self::new_unchecked(
242            dtype,
243            views,
244            buffers,
245            validity,
246            total_bytes_len,
247            total_buffer_len,
248        )
249    }
250
251    pub fn data_buffers(&self) -> &Arc<[Buffer<u8>]> {
252        &self.buffers
253    }
254
255    pub fn variadic_buffer_lengths(&self) -> Vec<i64> {
256        self.buffers.iter().map(|buf| buf.len() as i64).collect()
257    }
258
259    pub fn views(&self) -> &Buffer<View> {
260        &self.views
261    }
262
263    pub fn into_views(self) -> Vec<View> {
264        self.views.make_mut()
265    }
266
267    pub fn into_inner(
268        self,
269    ) -> (
270        Buffer<View>,
271        Arc<[Buffer<u8>]>,
272        Option<Bitmap>,
273        usize,
274        usize,
275    ) {
276        let views = self.views;
277        let buffers = self.buffers;
278        let validity = self.validity;
279
280        (
281            views,
282            buffers,
283            validity,
284            self.total_bytes_len.load(Ordering::Relaxed) as usize,
285            self.total_buffer_len,
286        )
287    }
288
289    /// Apply a function over the views. This can be used to update views in operations like slicing.
290    ///
291    /// # Safety
292    /// Update the views. All invariants of the views apply.
293    pub unsafe fn apply_views<F: FnMut(View, &T) -> View>(&self, mut update_view: F) -> Self {
294        let arr = self.clone();
295        let (views, buffers, validity, total_bytes_len, total_buffer_len) = arr.into_inner();
296
297        let mut views = views.make_mut();
298        for v in views.iter_mut() {
299            let str_slice = T::from_bytes_unchecked(v.get_slice_unchecked(&buffers));
300            *v = update_view(*v, str_slice);
301        }
302        Self::new_unchecked(
303            self.dtype.clone(),
304            views.into(),
305            buffers,
306            validity,
307            total_bytes_len,
308            total_buffer_len,
309        )
310    }
311
312    pub fn try_new(
313        dtype: ArrowDataType,
314        views: Buffer<View>,
315        buffers: Arc<[Buffer<u8>]>,
316        validity: Option<Bitmap>,
317    ) -> PolarsResult<Self> {
318        if T::IS_UTF8 {
319            validate_utf8_views(views.as_ref(), buffers.as_ref())?;
320        } else {
321            validate_binary_views(views.as_ref(), buffers.as_ref())?;
322        }
323
324        if let Some(validity) = &validity {
325            polars_ensure!(validity.len()== views.len(), ComputeError: "validity mask length must match the number of values" )
326        }
327
328        unsafe {
329            Ok(Self::new_unchecked_unknown_md(
330                dtype, views, buffers, validity, None,
331            ))
332        }
333    }
334
335    /// Creates an empty [`BinaryViewArrayGeneric`], i.e. whose `.len` is zero.
336    #[inline]
337    pub fn new_empty(dtype: ArrowDataType) -> Self {
338        unsafe { Self::new_unchecked(dtype, Buffer::new(), Arc::from([]), None, 0, 0) }
339    }
340
341    /// Returns a new null [`BinaryViewArrayGeneric`] of `length`.
342    #[inline]
343    pub fn new_null(dtype: ArrowDataType, length: usize) -> Self {
344        let validity = Some(Bitmap::new_zeroed(length));
345        unsafe { Self::new_unchecked(dtype, Buffer::zeroed(length), Arc::from([]), validity, 0, 0) }
346    }
347
348    /// Returns the element at index `i`
349    /// # Panics
350    /// iff `i >= self.len()`
351    #[inline]
352    pub fn value(&self, i: usize) -> &T {
353        assert!(i < self.len());
354        unsafe { self.value_unchecked(i) }
355    }
356
357    /// Returns the element at index `i`
358    ///
359    /// # Safety
360    /// Assumes that the `i < self.len`.
361    #[inline]
362    pub unsafe fn value_unchecked(&self, i: usize) -> &T {
363        let v = self.views.get_unchecked(i);
364        T::from_bytes_unchecked(v.get_slice_unchecked(&self.buffers))
365    }
366
367    /// Returns the element at index `i`, or None if it is null.
368    /// # Panics
369    /// iff `i >= self.len()`
370    #[inline]
371    pub fn get(&self, i: usize) -> Option<&T> {
372        assert!(i < self.len());
373        unsafe { self.get_unchecked(i) }
374    }
375
376    /// Returns the element at index `i`, or None if it is null.
377    ///
378    /// # Safety
379    /// Assumes that the `i < self.len`.
380    #[inline]
381    pub unsafe fn get_unchecked(&self, i: usize) -> Option<&T> {
382        if self
383            .validity
384            .as_ref()
385            .is_none_or(|v| v.get_bit_unchecked(i))
386        {
387            let v = self.views.get_unchecked(i);
388            Some(T::from_bytes_unchecked(
389                v.get_slice_unchecked(&self.buffers),
390            ))
391        } else {
392            None
393        }
394    }
395
396    /// Returns an iterator of `Option<&T>` over every element of this array.
397    pub fn iter(&self) -> ZipValidity<&T, BinaryViewValueIter<T>, BitmapIter> {
398        ZipValidity::new_with_validity(self.values_iter(), self.validity.as_ref())
399    }
400
401    /// Returns an iterator of `&[u8]` over every element of this array, ignoring the validity
402    pub fn values_iter(&self) -> BinaryViewValueIter<T> {
403        BinaryViewValueIter::new(self)
404    }
405
406    pub fn len_iter(&self) -> impl Iterator<Item = u32> + '_ {
407        self.views.iter().map(|v| v.length)
408    }
409
410    /// Returns an iterator of the non-null values.
411    pub fn non_null_values_iter(&self) -> NonNullValuesIter<'_, BinaryViewArrayGeneric<T>> {
412        NonNullValuesIter::new(self, self.validity())
413    }
414
415    /// Returns an iterator of the non-null values.
416    pub fn non_null_views_iter(&self) -> NonNullValuesIter<'_, Buffer<View>> {
417        NonNullValuesIter::new(self.views(), self.validity())
418    }
419
420    impl_sliced!();
421    impl_mut_validity!();
422    impl_into_array!();
423
424    pub fn from_slice<S: AsRef<T>, P: AsRef<[Option<S>]>>(slice: P) -> Self {
425        let mutable = MutableBinaryViewArray::from_iterator(
426            slice.as_ref().iter().map(|opt_v| opt_v.as_ref()),
427        );
428        mutable.into()
429    }
430
431    pub fn from_slice_values<S: AsRef<T>, P: AsRef<[S]>>(slice: P) -> Self {
432        let mutable =
433            MutableBinaryViewArray::from_values_iter(slice.as_ref().iter().map(|v| v.as_ref()));
434        mutable.into()
435    }
436
437    /// Get the total length of bytes that it would take to concatenate all binary/str values in this array.
438    pub fn total_bytes_len(&self) -> usize {
439        let total = self.total_bytes_len.load(Ordering::Relaxed);
440        if total == UNKNOWN_LEN {
441            let total = self.len_iter().map(|v| v as usize).sum::<usize>();
442            self.total_bytes_len.store(total as u64, Ordering::Relaxed);
443            total
444        } else {
445            total as usize
446        }
447    }
448
449    /// Get the length of bytes that are stored in the variadic buffers.
450    pub fn total_buffer_len(&self) -> usize {
451        self.total_buffer_len
452    }
453
454    fn total_unshared_buffer_len(&self) -> usize {
455        // XXX: it is O(n), not O(1).
456        // Given this function is only called in `maybe_gc()`,
457        // it may not be worthy to add an extra field for this.
458        self.buffers
459            .iter()
460            .map(|buf| {
461                if buf.storage_refcount() > 1 {
462                    0
463                } else {
464                    buf.len()
465                }
466            })
467            .sum()
468    }
469
470    #[inline(always)]
471    pub fn len(&self) -> usize {
472        self.views.len()
473    }
474
475    /// Garbage collect
476    pub fn gc(self) -> Self {
477        if self.buffers.is_empty() {
478            return self;
479        }
480        let mut mutable = MutableBinaryViewArray::with_capacity(self.len());
481        let buffers = self.buffers.as_ref();
482
483        for view in self.views.as_ref() {
484            unsafe { mutable.push_view_unchecked(*view, buffers) }
485        }
486        mutable.freeze().with_validity(self.validity)
487    }
488
489    pub fn deshare(&self) -> Self {
490        if Arc::strong_count(&self.buffers) == 1
491            && self.buffers.iter().all(|b| b.storage_refcount() == 1)
492        {
493            return self.clone();
494        }
495        self.clone().gc()
496    }
497
498    pub fn is_sliced(&self) -> bool {
499        !std::ptr::eq(self.views.as_ptr(), self.views.storage_ptr())
500    }
501
502    pub fn maybe_gc(self) -> Self {
503        const GC_MINIMUM_SAVINGS: usize = 16 * 1024; // At least 16 KiB.
504
505        if self.total_buffer_len <= GC_MINIMUM_SAVINGS {
506            return self;
507        }
508
509        if Arc::strong_count(&self.buffers) != 1 {
510            // There are multiple holders of this `buffers`.
511            // If we allow gc in this case,
512            // it may end up copying the same content multiple times.
513            return self;
514        }
515
516        // Subtract the maximum amount of inlined strings to get a lower bound
517        // on the number of buffer bytes needed (assuming no dedup).
518        let total_bytes_len = self.total_bytes_len();
519        let buffer_req_lower_bound = total_bytes_len.saturating_sub(self.len() * 12);
520
521        let lower_bound_mem_usage_post_gc = self.len() * 16 + buffer_req_lower_bound;
522        // Use unshared buffer len. Shared buffer won't be freed; no savings.
523        let cur_mem_usage = self.len() * 16 + self.total_unshared_buffer_len();
524        let savings_upper_bound = cur_mem_usage.saturating_sub(lower_bound_mem_usage_post_gc);
525
526        if savings_upper_bound >= GC_MINIMUM_SAVINGS
527            && cur_mem_usage >= 4 * lower_bound_mem_usage_post_gc
528        {
529            self.gc()
530        } else {
531            self
532        }
533    }
534
535    pub fn make_mut(self) -> MutableBinaryViewArray<T> {
536        let views = self.views.make_mut();
537        let completed_buffers = self.buffers.to_vec();
538        let validity = self.validity.map(|bitmap| bitmap.make_mut());
539
540        // We need to know the total_bytes_len if we are going to mutate it.
541        let mut total_bytes_len = self.total_bytes_len.load(Ordering::Relaxed);
542        if total_bytes_len == UNKNOWN_LEN {
543            total_bytes_len = views.iter().map(|view| view.length as u64).sum();
544        }
545        let total_bytes_len = total_bytes_len as usize;
546
547        MutableBinaryViewArray {
548            views,
549            completed_buffers,
550            in_progress_buffer: vec![],
551            validity,
552            phantom: Default::default(),
553            total_bytes_len,
554            total_buffer_len: self.total_buffer_len,
555            stolen_buffers: PlHashMap::new(),
556        }
557    }
558}
559
560impl BinaryViewArray {
561    /// Validate the underlying bytes on UTF-8.
562    pub fn validate_utf8(&self) -> PolarsResult<()> {
563        // SAFETY: views are correct
564        unsafe { validate_views_utf8_only(&self.views, &self.buffers, 0) }
565    }
566
567    /// Convert [`BinaryViewArray`] to [`Utf8ViewArray`].
568    pub fn to_utf8view(&self) -> PolarsResult<Utf8ViewArray> {
569        self.validate_utf8()?;
570        unsafe { Ok(self.to_utf8view_unchecked()) }
571    }
572
573    /// Convert [`BinaryViewArray`] to [`Utf8ViewArray`] without checking UTF-8.
574    ///
575    /// # Safety
576    /// The caller must ensure the underlying data is valid UTF-8.
577    pub unsafe fn to_utf8view_unchecked(&self) -> Utf8ViewArray {
578        Utf8ViewArray::new_unchecked(
579            ArrowDataType::Utf8View,
580            self.views.clone(),
581            self.buffers.clone(),
582            self.validity.clone(),
583            self.total_bytes_len.load(Ordering::Relaxed) as usize,
584            self.total_buffer_len,
585        )
586    }
587}
588
589impl Utf8ViewArray {
590    pub fn to_binview(&self) -> BinaryViewArray {
591        // SAFETY: same invariants.
592        unsafe {
593            BinaryViewArray::new_unchecked(
594                ArrowDataType::BinaryView,
595                self.views.clone(),
596                self.buffers.clone(),
597                self.validity.clone(),
598                self.total_bytes_len.load(Ordering::Relaxed) as usize,
599                self.total_buffer_len,
600            )
601        }
602    }
603}
604
605impl<T: ViewType + ?Sized> Array for BinaryViewArrayGeneric<T> {
606    fn as_any(&self) -> &dyn Any {
607        self
608    }
609
610    fn as_any_mut(&mut self) -> &mut dyn Any {
611        self
612    }
613
614    #[inline(always)]
615    fn len(&self) -> usize {
616        BinaryViewArrayGeneric::len(self)
617    }
618
619    fn dtype(&self) -> &ArrowDataType {
620        T::dtype()
621    }
622
623    fn validity(&self) -> Option<&Bitmap> {
624        self.validity.as_ref()
625    }
626
627    fn split_at_boxed(&self, offset: usize) -> (Box<dyn Array>, Box<dyn Array>) {
628        let (lhs, rhs) = Splitable::split_at(self, offset);
629        (Box::new(lhs), Box::new(rhs))
630    }
631
632    unsafe fn split_at_boxed_unchecked(&self, offset: usize) -> (Box<dyn Array>, Box<dyn Array>) {
633        let (lhs, rhs) = unsafe { Splitable::split_at_unchecked(self, offset) };
634        (Box::new(lhs), Box::new(rhs))
635    }
636
637    fn slice(&mut self, offset: usize, length: usize) {
638        assert!(
639            offset + length <= self.len(),
640            "the offset of the new Buffer cannot exceed the existing length"
641        );
642        unsafe { self.slice_unchecked(offset, length) }
643    }
644
645    unsafe fn slice_unchecked(&mut self, offset: usize, length: usize) {
646        debug_assert!(offset + length <= self.len());
647        self.validity = self
648            .validity
649            .take()
650            .map(|bitmap| bitmap.sliced_unchecked(offset, length))
651            .filter(|bitmap| bitmap.unset_bits() > 0);
652        self.views.slice_unchecked(offset, length);
653        self.total_bytes_len.store(UNKNOWN_LEN, Ordering::Relaxed)
654    }
655
656    fn with_validity(&self, validity: Option<Bitmap>) -> Box<dyn Array> {
657        debug_assert!(
658            validity.as_ref().is_none_or(|v| v.len() == self.len()),
659            "{} != {}",
660            validity.as_ref().unwrap().len(),
661            self.len()
662        );
663
664        let mut new = self.clone();
665        new.validity = validity;
666        Box::new(new)
667    }
668
669    fn to_boxed(&self) -> Box<dyn Array> {
670        Box::new(self.clone())
671    }
672}
673
674impl<T: ViewType + ?Sized> Splitable for BinaryViewArrayGeneric<T> {
675    fn check_bound(&self, offset: usize) -> bool {
676        offset <= self.len()
677    }
678
679    unsafe fn _split_at_unchecked(&self, offset: usize) -> (Self, Self) {
680        let (lhs_views, rhs_views) = unsafe { self.views.split_at_unchecked(offset) };
681        let (lhs_validity, rhs_validity) = unsafe { self.validity.split_at_unchecked(offset) };
682
683        unsafe {
684            (
685                Self::new_unchecked(
686                    self.dtype.clone(),
687                    lhs_views,
688                    self.buffers.clone(),
689                    lhs_validity,
690                    if offset == 0 { 0 } else { UNKNOWN_LEN as _ },
691                    self.total_buffer_len(),
692                ),
693                Self::new_unchecked(
694                    self.dtype.clone(),
695                    rhs_views,
696                    self.buffers.clone(),
697                    rhs_validity,
698                    if offset == self.len() {
699                        0
700                    } else {
701                        UNKNOWN_LEN as _
702                    },
703                    self.total_buffer_len(),
704                ),
705            )
706        }
707    }
708}