polars_arrow/array/binview/builder.rs

use std::marker::PhantomData;
use std::sync::{Arc, LazyLock};

use hashbrown::hash_map::Entry;
use polars_utils::IdxSize;
use polars_utils::aliases::{InitHashMaps, PlHashMap};

use crate::array::binview::{DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE};
use crate::array::builder::{ShareStrategy, StaticArrayBuilder};
use crate::array::{Array, BinaryViewArrayGeneric, View, ViewType};
use crate::bitmap::OptBitmapBuilder;
use crate::buffer::Buffer;
use crate::datatypes::ArrowDataType;
use crate::pushable::Pushable;

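// Shared empty buffer used as a dummy entry in `buffer_set`: whenever a new
// active buffer is started we push this placeholder so `active_buffer_idx`
// stays a valid index until the real buffer is flushed in on freeze.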
static PLACEHOLDER_BUFFER: LazyLock<Buffer<u8>> = LazyLock::new(|| Buffer::from_static(&[]));

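/// Builder for [`BinaryViewArrayGeneric`]. Each element is described by a
/// 16-byte `View`: values of at most `View::MAX_INLINE_SIZE` bytes are stored
/// inline in the view itself, longer values live in one of the byte buffers in
/// `buffer_set` and are referenced by (buffer index, offset, length).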
pub struct BinaryViewArrayGenericBuilder<V: ViewType + ?Sized> {
    dtype: ArrowDataType,
    views: Vec<View>,
    active_buffer: Vec<u8>,
    active_buffer_idx: u32,
    buffer_set: Vec<Buffer<u8>>,
    stolen_buffers: PlHashMap<usize, u32>,

    // With these we can amortize buffer set translation costs if repeatedly
    // stealing from the same set of buffers.
    last_buffer_set_stolen_from: Option<Arc<[Buffer<u8>]>>,
    buffer_set_translation_idxs: Vec<(u32, u32)>, // (idx, generation)
    buffer_set_translation_generation: u32,

    validity: OptBitmapBuilder,
    /// Total length in bytes if we would concatenate all values.
    total_bytes_len: usize,
    /// Total bytes in the buffer set (excluding remaining capacity).
    total_buffer_len: usize,
    view_type: PhantomData<V>,
}

impl<V: ViewType + ?Sized> BinaryViewArrayGenericBuilder<V> {
    pub fn new(dtype: ArrowDataType) -> Self {
        Self {
            dtype,
            views: Vec::new(),
            active_buffer: Vec::new(),
            active_buffer_idx: 0,
            buffer_set: Vec::new(),
            stolen_buffers: PlHashMap::new(),
            last_buffer_set_stolen_from: None,
            buffer_set_translation_idxs: Vec::new(),
            buffer_set_translation_generation: 0,
            validity: OptBitmapBuilder::default(),
            total_bytes_len: 0,
            total_buffer_len: 0,
            view_type: PhantomData,
        }
    }

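    /// Ensures the active buffer has room for `additional` more bytes while
    /// keeping every byte offset representable as a `u32`, since non-inline
    /// views store a 32-bit (buffer index, offset) pair.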
    #[inline]
    fn reserve_active_buffer(&mut self, additional: usize) {
        let len = self.active_buffer.len();
        let cap = self.active_buffer.capacity();
        if additional > cap - len || len + additional >= (u32::MAX - 1) as usize {
            self.reserve_active_buffer_slow(additional);
        }
    }

    #[cold]
    fn reserve_active_buffer_slow(&mut self, additional: usize) {
        assert!(
            additional <= (u32::MAX - 1) as usize,
            "strings longer than 2^32 - 2 are not supported"
        );

        // Allocate a new buffer and flush the old buffer.
        let new_capacity = (self.active_buffer.capacity() * 2)
            .clamp(DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE)
            .max(additional);

        let old_buffer =
            core::mem::replace(&mut self.active_buffer, Vec::with_capacity(new_capacity));
        if !old_buffer.is_empty() {
            // Replace dummy with real buffer.
            self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(old_buffer);
        }
        self.active_buffer_idx = self.buffer_set.len().try_into().unwrap();
        self.buffer_set.push(PLACEHOLDER_BUFFER.clone()) // Push placeholder so active_buffer_idx stays valid.
    }

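    /// Appends a single value without updating the validity bitmap; the caller
    /// is responsible for extending `validity` separately. Values of at most
    /// `View::MAX_INLINE_SIZE` bytes are stored inline in the view itself,
    /// longer values are copied into the active buffer.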
    pub fn push_value_ignore_validity(&mut self, bytes: &V) {
        let bytes = bytes.to_bytes();
        self.total_bytes_len += bytes.len();
        unsafe {
            let view = if bytes.len() > View::MAX_INLINE_SIZE as usize {
                self.reserve_active_buffer(bytes.len());

                let offset = self.active_buffer.len() as u32; // Ensured no overflow by reserve_active_buffer.
                self.active_buffer.extend_from_slice(bytes);
                self.total_buffer_len += bytes.len();
                View::new_noninline_unchecked(bytes, self.active_buffer_idx, offset)
            } else {
                View::new_inline_unchecked(bytes)
            };
            self.views.push(view);
        }
    }

    /// # Safety
    /// The view must be inline.
    pub unsafe fn push_inline_view_ignore_validity(&mut self, view: View) {
        debug_assert!(view.is_inline());
        self.total_bytes_len += view.length as usize;
        self.views.push(view);
    }

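    /// Makes `buffer_set` the active source for view translation. Bumping the
    /// generation lazily invalidates all cached translation indices, so the
    /// table never has to be cleared when switching between source arrays.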
    fn switch_active_stealing_bufferset_to(&mut self, buffer_set: &Arc<[Buffer<u8>]>) {
        // Fat pointer equality, checks both start and length.
        if self
            .last_buffer_set_stolen_from
            .as_ref()
            .is_some_and(|stolen_bs| std::ptr::eq(Arc::as_ptr(stolen_bs), Arc::as_ptr(buffer_set)))
        {
            return; // Already active.
        }

        // Remember the new source so repeated steals from it hit the cache.
        self.last_buffer_set_stolen_from = Some(buffer_set.clone());

        // Switch to a new generation (invalidating all old translation indices),
        // and grow the translation table with invalid entries if necessary.
        let old_gen = self.buffer_set_translation_generation;
        self.buffer_set_translation_generation = old_gen.wrapping_add(1);
        if self.buffer_set_translation_idxs.len() < buffer_set.len() {
            self.buffer_set_translation_idxs
                .resize(buffer_set.len(), (0, old_gen));
        }
    }

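    /// # Safety
    /// `view` must be non-inline, `view.buffer_idx` must be in bounds for
    /// `other_bufferset`, and `switch_active_stealing_bufferset_to` must have
    /// been called with `other_bufferset` beforehand.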
    unsafe fn translate_view(
        &mut self,
        mut view: View,
        other_bufferset: &Arc<[Buffer<u8>]>,
    ) -> View {
        // Translate from old array-local buffer idx to global stolen buffer idx.
        let (mut new_buffer_idx, gen_) = *self
            .buffer_set_translation_idxs
            .get_unchecked(view.buffer_idx as usize);
        if gen_ != self.buffer_set_translation_generation {
            // This buffer index wasn't seen before for this array, do a dedup lookup.
            // Since we map by starting pointer and different subslices may have
            // different lengths, we expand the buffer to the maximum it could be.
            let buffer = other_bufferset
                .get_unchecked(view.buffer_idx as usize)
                .clone()
                .expand_end_to_storage();
            let buf_id = buffer.as_slice().as_ptr().addr();
            let idx = match self.stolen_buffers.entry(buf_id) {
                Entry::Occupied(o) => *o.get(),
                Entry::Vacant(v) => {
                    let idx = self.buffer_set.len() as u32;
                    self.total_buffer_len += buffer.len();
                    self.buffer_set.push(buffer);
                    v.insert(idx);
                    idx
                },
            };

            // Cache result for future lookups.
            *self
                .buffer_set_translation_idxs
                .get_unchecked_mut(view.buffer_idx as usize) =
                (idx, self.buffer_set_translation_generation);
            new_buffer_idx = idx;
        }
        view.buffer_idx = new_buffer_idx;
        view
    }

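    /// # Safety
    /// All views must refer to buffers in `other_bufferset`, with in-bounds
    /// buffer indices.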
    unsafe fn extend_views_dedup_ignore_validity(
        &mut self,
        views: impl IntoIterator<Item = View>,
        other_bufferset: &Arc<[Buffer<u8>]>,
    ) {
        // TODO: if there are way more buffers than views, translate per-view
        // rather than all at once.
        self.switch_active_stealing_bufferset_to(other_bufferset);

        for mut view in views {
            if view.length > View::MAX_INLINE_SIZE {
                view = self.translate_view(view, other_bufferset);
            }
            self.total_bytes_len += view.length as usize;
            self.views.push(view);
        }
    }

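    /// # Safety
    /// All views must refer to buffers in `other_bufferset`, with in-bounds
    /// buffer indices.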
    unsafe fn extend_views_each_repeated_dedup_ignore_validity(
        &mut self,
        views: impl IntoIterator<Item = View>,
        repeats: usize,
        other_bufferset: &Arc<[Buffer<u8>]>,
    ) {
        // TODO: if there are way more buffers than views, translate per-view
        // rather than all at once.
        self.switch_active_stealing_bufferset_to(other_bufferset);

        for mut view in views {
            if view.length > View::MAX_INLINE_SIZE {
                view = self.translate_view(view, other_bufferset);
            }
            self.total_bytes_len += repeats * view.length as usize;
            for _ in 0..repeats {
                self.views.push(view);
            }
        }
    }
}

impl<V: ViewType + ?Sized> StaticArrayBuilder for BinaryViewArrayGenericBuilder<V> {
    type Array = BinaryViewArrayGeneric<V>;

    fn dtype(&self) -> &ArrowDataType {
        &self.dtype
    }

    fn reserve(&mut self, additional: usize) {
        self.views.reserve(additional);
        self.validity.reserve(additional);
    }

    fn freeze(mut self) -> Self::Array {
        // Flush active buffer and/or remove extra placeholder buffer.
        if !self.active_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(self.active_buffer);
        } else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {
            self.buffer_set.pop();
        }

        unsafe {
            BinaryViewArrayGeneric::new_unchecked(
                self.dtype,
                Buffer::from(self.views),
                Arc::from(self.buffer_set),
                self.validity.into_opt_validity(),
                self.total_bytes_len,
                self.total_buffer_len,
            )
        }
    }

    fn freeze_reset(&mut self) -> Self::Array {
        // Flush active buffer and/or remove extra placeholder buffer.
        if !self.active_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] =
                Buffer::from(core::mem::take(&mut self.active_buffer));
        } else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {
            self.buffer_set.pop();
        }

        let out = unsafe {
            BinaryViewArrayGeneric::new_unchecked(
                self.dtype.clone(),
                Buffer::from(core::mem::take(&mut self.views)),
                Arc::from(core::mem::take(&mut self.buffer_set)),
                core::mem::take(&mut self.validity).into_opt_validity(),
                self.total_bytes_len,
                self.total_buffer_len,
            )
        };

        self.total_buffer_len = 0;
        self.total_bytes_len = 0;
        self.active_buffer_idx = 0;
        self.stolen_buffers.clear();
        self.last_buffer_set_stolen_from = None;
        out
    }

    fn len(&self) -> usize {
        self.views.len()
    }

    fn extend_nulls(&mut self, length: usize) {
        self.views.extend_constant(length, View::default());
        self.validity.extend_constant(length, false);
    }

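    // For the extend methods below: `ShareStrategy::Never` copies the raw bytes
    // into this builder's own buffers, while `ShareStrategy::Always` keeps the
    // source buffers alive and only translates the views (zero-copy, with
    // buffer deduplication via `translate_view`).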
    fn subslice_extend(
        &mut self,
        other: &Self::Array,
        start: usize,
        length: usize,
        share: ShareStrategy,
    ) {
        self.views.reserve(length);

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for i in start..start + length {
                            if v.get_bit_unchecked(i) {
                                self.push_value_ignore_validity(other.value_unchecked(i));
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for i in start..start + length {
                            self.push_value_ignore_validity(other.value_unchecked(i));
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_views = &other.views()[start..start + length];
                    self.extend_views_dedup_ignore_validity(
                        other_views.iter().copied(),
                        other.data_buffers(),
                    );
                },
            }
        }

        self.validity
            .subslice_extend_from_opt_validity(other.validity(), start, length);
    }

    fn subslice_extend_each_repeated(
        &mut self,
        other: &Self::Array,
        start: usize,
        length: usize,
        repeats: usize,
        share: ShareStrategy,
    ) {
        self.views.reserve(length * repeats);

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for i in start..start + length {
                            if v.get_bit_unchecked(i) {
                                for _ in 0..repeats {
                                    self.push_value_ignore_validity(other.value_unchecked(i));
                                }
                            } else {
                                for _ in 0..repeats {
                                    self.views.push(View::default())
                                }
                            }
                        }
                    } else {
                        for i in start..start + length {
                            for _ in 0..repeats {
                                self.push_value_ignore_validity(other.value_unchecked(i));
                            }
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_views = &other.views()[start..start + length];
                    self.extend_views_each_repeated_dedup_ignore_validity(
                        other_views.iter().copied(),
                        repeats,
                        other.data_buffers(),
                    );
                },
            }
        }

        self.validity
            .subslice_extend_each_repeated_from_opt_validity(
                other.validity(),
                start,
                length,
                repeats,
            );
    }

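    // Unlike `opt_gather_extend` below, this assumes every index is in bounds
    // for `other`; out-of-bounds indices here are undefined behavior.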
    unsafe fn gather_extend(
        &mut self,
        other: &Self::Array,
        idxs: &[IdxSize],
        share: ShareStrategy,
    ) {
        self.views.reserve(idxs.len());

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for idx in idxs {
                            if v.get_bit_unchecked(*idx as usize) {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for idx in idxs {
                            self.push_value_ignore_validity(other.value_unchecked(*idx as usize));
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_view_slice = other.views().as_slice();
                    let other_views = idxs
                        .iter()
                        .map(|idx| *other_view_slice.get_unchecked(*idx as usize));
                    self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());
                },
            }
        }

        self.validity
            .gather_extend_from_opt_validity(other.validity(), idxs);
    }

    fn opt_gather_extend(&mut self, other: &Self::Array, idxs: &[IdxSize], share: ShareStrategy) {
        self.views.reserve(idxs.len());

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for idx in idxs {
                            if (*idx as usize) < v.len() && v.get_bit_unchecked(*idx as usize) {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for idx in idxs {
                            if (*idx as usize) < other.len() {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_view_slice = other.views().as_slice();
                    let other_views = idxs.iter().map(|idx| {
                        other_view_slice
                            .get(*idx as usize)
                            .copied()
                            .unwrap_or_default()
                    });
                    self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());
                },
            }
        }

        self.validity
            .opt_gather_extend_from_opt_validity(other.validity(), idxs, other.len());
    }
}