Skip to main content

polars_arrow/array/binview/
builder.rs

1use std::marker::PhantomData;
2use std::sync::LazyLock;
3
4use hashbrown::hash_map::Entry;
5use polars_buffer::Buffer;
6use polars_utils::IdxSize;
7use polars_utils::aliases::{InitHashMaps, PlHashMap};
8
9use crate::array::binview::{DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE};
10use crate::array::builder::{ShareStrategy, StaticArrayBuilder};
11use crate::array::{Array, BinaryViewArrayGeneric, View, ViewType};
12use crate::bitmap::OptBitmapBuilder;
13use crate::datatypes::ArrowDataType;
14use crate::pushable::Pushable;
15
/// Shared empty buffer pushed into `buffer_set` as a stand-in for the active
/// buffer, so `active_buffer_idx` always refers to a valid slot until the
/// active buffer is flushed into it.
static PLACEHOLDER_BUFFER: LazyLock<Buffer<u8>> = LazyLock::new(|| Buffer::from_static(&[]));
17
/// Incremental builder for [`BinaryViewArrayGeneric`] (binary/string view arrays).
pub struct BinaryViewArrayGenericBuilder<V: ViewType + ?Sized> {
    dtype: ArrowDataType,
    /// One view per element; short values are inlined, long ones reference
    /// a buffer in `buffer_set`.
    views: Vec<View>,
    /// Buffer currently being appended to; flushed into `buffer_set` when it
    /// runs out of capacity.
    active_buffer: Vec<u8>,
    /// Slot of `active_buffer`'s placeholder within `buffer_set`.
    active_buffer_idx: u32,
    buffer_set: Vec<Buffer<u8>>,
    /// Maps a stolen buffer's start address to its index in `buffer_set`,
    /// deduplicating buffers shared across source arrays.
    stolen_buffers: PlHashMap<usize, u32>,

    // With these we can amortize buffer set translation costs if repeatedly
    // stealing from the same set of buffers.
    last_buffer_set_stolen_from: Option<Buffer<Buffer<u8>>>,
    buffer_set_translation_idxs: Vec<(u32, u32)>, // (idx, generation)
    buffer_set_translation_generation: u32,

    validity: OptBitmapBuilder,
    /// Total bytes length if we would concatenate them all.
    total_bytes_len: usize,
    /// Total bytes in the buffer set (excluding remaining capacity).
    total_buffer_len: usize,
    view_type: PhantomData<V>,
}
39
40impl<V: ViewType + ?Sized> BinaryViewArrayGenericBuilder<V> {
    /// Maximum supported byte length of a single value (2^32 - 2), so that
    /// lengths/offsets always fit in the u32 fields of a `View`.
    pub const MAX_ROW_BYTE_LEN: usize = (u32::MAX - 1) as _;
42
43    pub fn new(dtype: ArrowDataType) -> Self {
44        Self {
45            dtype,
46            views: Vec::new(),
47            active_buffer: Vec::new(),
48            active_buffer_idx: 0,
49            buffer_set: Vec::new(),
50            stolen_buffers: PlHashMap::new(),
51            last_buffer_set_stolen_from: None,
52            buffer_set_translation_idxs: Vec::new(),
53            buffer_set_translation_generation: 0,
54            validity: OptBitmapBuilder::default(),
55            total_bytes_len: 0,
56            total_buffer_len: 0,
57            view_type: PhantomData,
58        }
59    }
60
    /// Ensures the active buffer can hold `additional` more bytes while the
    /// resulting end offset stays representable in a `View`'s u32 fields.
    #[inline]
    fn reserve_active_buffer(&mut self, additional: usize) {
        let len = self.active_buffer.len();
        let cap = self.active_buffer.capacity();
        // `additional > cap - len` is the overflow-safe spelling of
        // `len + additional > cap`; the second condition bounds offsets by
        // MAX_ROW_BYTE_LEN (the slow path starts a fresh buffer at offset 0).
        if additional > cap - len || len + additional >= Self::MAX_ROW_BYTE_LEN {
            self.reserve_active_buffer_slow(additional);
        }
    }
69
    /// Cold path of `reserve_active_buffer`: flushes the current active buffer
    /// into `buffer_set` and starts a fresh one with capacity for `additional`.
    #[cold]
    fn reserve_active_buffer_slow(&mut self, additional: usize) {
        assert!(
            additional <= Self::MAX_ROW_BYTE_LEN,
            "strings longer than 2^32 - 2 are not supported"
        );

        // Allocate a new buffer and flush the old buffer.
        // Grow exponentially, clamped to [DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE],
        // but never smaller than the bytes we must fit right now.
        let new_capacity = (self.active_buffer.capacity() * 2)
            .clamp(DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE)
            .max(additional);

        let old_buffer =
            core::mem::replace(&mut self.active_buffer, Vec::with_capacity(new_capacity));
        if !old_buffer.is_empty() {
            //  Replace dummy with real buffer.
            self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(old_buffer);
        }
        self.active_buffer_idx = self.buffer_set.len().try_into().unwrap();
        self.buffer_set.push(PLACEHOLDER_BUFFER.clone()) // Push placeholder so active_buffer_idx stays valid.
    }
91
    /// Appends a value without touching the validity bitmap. Values up to
    /// `View::MAX_INLINE_SIZE` bytes are inlined into the view; longer ones
    /// are copied into the active buffer.
    pub fn push_value_ignore_validity(&mut self, bytes: &V) {
        let bytes = bytes.to_bytes();
        self.total_bytes_len += bytes.len();
        unsafe {
            let view = if bytes.len() > View::MAX_INLINE_SIZE as usize {
                self.reserve_active_buffer(bytes.len());

                let offset = self.active_buffer.len() as u32; // Ensured no overflow by reserve_active_buffer.
                // SAFETY: the bytes are written at `offset` in the buffer that
                // `active_buffer_idx` will refer to once flushed.
                self.active_buffer.extend_from_slice(bytes);
                self.total_buffer_len += bytes.len();
                View::new_noninline_unchecked(bytes, self.active_buffer_idx, offset)
            } else {
                View::new_inline_unchecked(bytes)
            };
            self.views.push(view);
        }
    }
109
    /// Appends an already-constructed inline view without touching the
    /// validity bitmap.
    ///
    /// # Safety
    /// The view must be inline.
    pub unsafe fn push_inline_view_ignore_validity(&mut self, view: View) {
        debug_assert!(view.is_inline());
        // Inline views carry their payload in the view itself, so only
        // total_bytes_len (not total_buffer_len) changes.
        self.total_bytes_len += view.length as usize;
        self.views.push(view);
    }
117
118    fn switch_active_stealing_bufferset_to(&mut self, buffer_set: &Buffer<Buffer<u8>>) {
119        if self
120            .last_buffer_set_stolen_from
121            .as_ref()
122            .is_some_and(|stolen_bs| {
123                stolen_bs.as_ptr() == buffer_set.as_ptr() && stolen_bs.len() >= buffer_set.len()
124            })
125        {
126            return; // Already active.
127        }
128
129        // Switch to new generation (invalidating all old translation indices),
130        // and resizing the buffer with invalid indices if necessary.
131        let old_gen = self.buffer_set_translation_generation;
132        self.buffer_set_translation_generation = old_gen.wrapping_add(1);
133        if self.buffer_set_translation_idxs.len() < buffer_set.len() {
134            self.buffer_set_translation_idxs
135                .resize(buffer_set.len(), (0, old_gen));
136        }
137    }
138
    /// Rewrites a non-inline `view`'s buffer index from `other_bufferset`'s
    /// numbering to our own `buffer_set`, stealing (and deduplicating) the
    /// underlying buffer on first sight.
    ///
    /// # Safety
    /// `view.buffer_idx` must be in bounds for `other_bufferset`, and
    /// `switch_active_stealing_bufferset_to(other_bufferset)` must have been
    /// called beforehand so `buffer_set_translation_idxs` is large enough.
    unsafe fn translate_view(
        &mut self,
        mut view: View,
        other_bufferset: &Buffer<Buffer<u8>>,
    ) -> View {
        // Translate from old array-local buffer idx to global stolen buffer idx.
        let (mut new_buffer_idx, gen_) = *self
            .buffer_set_translation_idxs
            .get_unchecked(view.buffer_idx as usize);
        if gen_ != self.buffer_set_translation_generation {
            // This buffer index wasn't seen before for this array, do a dedup lookup.
            // Since we map by starting pointer and different subslices may have different lengths, we expand
            // the buffer to the maximum it could be.
            let buffer = other_bufferset
                .get_unchecked(view.buffer_idx as usize)
                .clone()
                .expand_end_to_storage();
            let buf_id = buffer.as_slice().as_ptr().addr();
            let idx = match self.stolen_buffers.entry(buf_id) {
                Entry::Occupied(o) => *o.get(),
                Entry::Vacant(v) => {
                    // First time we see this storage: steal it into our set.
                    let idx = self.buffer_set.len() as u32;
                    self.total_buffer_len += buffer.len();
                    self.buffer_set.push(buffer);
                    v.insert(idx);
                    idx
                },
            };

            // Cache result for future lookups.
            *self
                .buffer_set_translation_idxs
                .get_unchecked_mut(view.buffer_idx as usize) =
                (idx, self.buffer_set_translation_generation);
            new_buffer_idx = idx;
        }
        view.buffer_idx = new_buffer_idx;
        view
    }
178
    /// Appends `views` (which index into `other_bufferset`), translating
    /// non-inline views to our own buffer set. Validity is not touched.
    ///
    /// # Safety
    /// Every view's buffer index must be in bounds for `other_bufferset`.
    unsafe fn extend_views_dedup_ignore_validity(
        &mut self,
        views: impl IntoIterator<Item = View>,
        other_bufferset: &Buffer<Buffer<u8>>,
    ) {
        // TODO: if there are way more buffers than length translate per-view
        // rather than all at once.
        self.switch_active_stealing_bufferset_to(other_bufferset);

        for mut view in views {
            // Inline views carry no buffer reference and need no translation.
            if view.length > View::MAX_INLINE_SIZE {
                view = self.translate_view(view, other_bufferset);
            }
            self.total_bytes_len += view.length as usize;
            self.views.push(view);
        }
    }
196
    /// Like `extend_views_dedup_ignore_validity`, but pushes each translated
    /// view `repeats` times. Validity is not touched.
    ///
    /// # Safety
    /// Every view's buffer index must be in bounds for `other_bufferset`.
    unsafe fn extend_views_each_repeated_dedup_ignore_validity(
        &mut self,
        views: impl IntoIterator<Item = View>,
        repeats: usize,
        other_bufferset: &Buffer<Buffer<u8>>,
    ) {
        // TODO: if there are way more buffers than length translate per-view
        // rather than all at once.
        self.switch_active_stealing_bufferset_to(other_bufferset);

        for mut view in views {
            if view.length > View::MAX_INLINE_SIZE {
                view = self.translate_view(view, other_bufferset);
            }
            // Translate once, count and push `repeats` times.
            self.total_bytes_len += repeats * view.length as usize;
            for _ in 0..repeats {
                self.views.push(view);
            }
        }
    }
217}
218
219impl<V: ViewType + ?Sized> StaticArrayBuilder for BinaryViewArrayGenericBuilder<V> {
220    type Array = BinaryViewArrayGeneric<V>;
221
    /// The Arrow data type of arrays produced by this builder.
    fn dtype(&self) -> &ArrowDataType {
        &self.dtype
    }
225
226    fn reserve(&mut self, additional: usize) {
227        self.views.reserve(additional);
228        self.validity.reserve(additional);
229    }
230
    /// Consumes the builder and produces the finished array.
    fn freeze(mut self) -> Self::Array {
        // Flush active buffer and/or remove extra placeholder buffer.
        if !self.active_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(self.active_buffer);
        } else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {
            // Trailing placeholder was never filled; drop it.
            self.buffer_set.pop();
        }

        unsafe {
            // SAFETY: views, buffers, validity and the byte-length counters
            // were kept consistent by the push/extend paths above.
            BinaryViewArrayGeneric::new_unchecked(
                self.dtype,
                Buffer::from(self.views),
                Buffer::from(self.buffer_set),
                self.validity.into_opt_validity(),
                Some(self.total_bytes_len),
                self.total_buffer_len,
            )
        }
    }
250
    /// Produces the finished array and resets the builder for reuse.
    fn freeze_reset(&mut self) -> Self::Array {
        // Flush active buffer and/or remove extra placeholder buffer.
        if !self.active_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] =
                Buffer::from(core::mem::take(&mut self.active_buffer));
        } else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {
            self.buffer_set.pop();
        }

        let out = unsafe {
            // SAFETY: same invariants as in `freeze`; `take` leaves the
            // builder's collections empty for reuse.
            BinaryViewArrayGeneric::new_unchecked(
                self.dtype.clone(),
                Buffer::from(core::mem::take(&mut self.views)),
                Buffer::from(core::mem::take(&mut self.buffer_set)),
                core::mem::take(&mut self.validity).into_opt_validity(),
                Some(self.total_bytes_len),
                self.total_buffer_len,
            )
        };

        // Resetting last_buffer_set_stolen_from forces the next steal to start
        // a fresh translation generation, so the stale cache entries in
        // buffer_set_translation_idxs can never be read as valid.
        self.total_buffer_len = 0;
        self.total_bytes_len = 0;
        self.active_buffer_idx = 0;
        self.stolen_buffers.clear();
        self.last_buffer_set_stolen_from = None;
        out
    }
278
    /// Number of elements built so far (one view per element, nulls included).
    fn len(&self) -> usize {
        self.views.len()
    }
282
283    fn extend_nulls(&mut self, length: usize) {
284        self.views.extend_constant(length, View::default());
285        self.validity.extend_constant(length, false);
286    }
287
    /// Appends `length` elements of `other` starting at `start`.
    ///
    /// With `ShareStrategy::Never` the bytes are copied into our own buffers;
    /// with `ShareStrategy::Always` the views are translated and the
    /// underlying buffers are shared (stolen) instead of copied.
    fn subslice_extend(
        &mut self,
        other: &Self::Array,
        start: usize,
        length: usize,
        share: ShareStrategy,
    ) {
        self.views.reserve(length);

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for i in start..start + length {
                            if v.get_bit_unchecked(i) {
                                self.push_value_ignore_validity(other.value_unchecked(i));
                            } else {
                                // Null slot: placeholder view; validity is
                                // extended in bulk below.
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for i in start..start + length {
                            self.push_value_ignore_validity(other.value_unchecked(i));
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_views = &other.views()[start..start + length];
                    self.extend_views_dedup_ignore_validity(
                        other_views.iter().copied(),
                        other.data_buffers(),
                    );
                },
            }
        }

        self.validity
            .subslice_extend_from_opt_validity(other.validity(), start, length);
    }
327
    /// Like `subslice_extend`, but appends each of the `length` source
    /// elements `repeats` times in a row.
    fn subslice_extend_each_repeated(
        &mut self,
        other: &Self::Array,
        start: usize,
        length: usize,
        repeats: usize,
        share: ShareStrategy,
    ) {
        self.views.reserve(length * repeats);

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for i in start..start + length {
                            if v.get_bit_unchecked(i) {
                                for _ in 0..repeats {
                                    self.push_value_ignore_validity(other.value_unchecked(i));
                                }
                            } else {
                                for _ in 0..repeats {
                                    self.views.push(View::default())
                                }
                            }
                        }
                    } else {
                        for i in start..start + length {
                            for _ in 0..repeats {
                                self.push_value_ignore_validity(other.value_unchecked(i));
                            }
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_views = &other.views()[start..start + length];
                    self.extend_views_each_repeated_dedup_ignore_validity(
                        other_views.iter().copied(),
                        repeats,
                        other.data_buffers(),
                    );
                },
            }
        }

        self.validity
            .subslice_extend_each_repeated_from_opt_validity(
                other.validity(),
                start,
                length,
                repeats,
            );
    }
380
    /// Appends `other[idx]` for each index in `idxs`, copying or sharing
    /// buffers according to `share`.
    ///
    /// # Safety
    /// Every index in `idxs` must be in bounds for `other` (unchecked access
    /// is used throughout).
    unsafe fn gather_extend(
        &mut self,
        other: &Self::Array,
        idxs: &[IdxSize],
        share: ShareStrategy,
    ) {
        self.views.reserve(idxs.len());

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for idx in idxs {
                            if v.get_bit_unchecked(*idx as usize) {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                // Null: placeholder view; validity handled below.
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for idx in idxs {
                            self.push_value_ignore_validity(other.value_unchecked(*idx as usize));
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_view_slice = other.views().as_slice();
                    let other_views = idxs
                        .iter()
                        .map(|idx| *other_view_slice.get_unchecked(*idx as usize));
                    self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());
                },
            }
        }

        self.validity
            .gather_extend_from_opt_validity(other.validity(), idxs);
    }
421
    /// Like `gather_extend`, but out-of-bounds indices produce nulls instead
    /// of being undefined behavior (every access is bounds-checked).
    fn opt_gather_extend(&mut self, other: &Self::Array, idxs: &[IdxSize], share: ShareStrategy) {
        self.views.reserve(idxs.len());

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for idx in idxs {
                            // In-bounds and valid -> copy value; otherwise null.
                            if (*idx as usize) < v.len() && v.get_bit_unchecked(*idx as usize) {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for idx in idxs {
                            if (*idx as usize) < other.len() {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_view_slice = other.views().as_slice();
                    // Checked lookup: out-of-bounds indices fall back to the
                    // default (empty inline) view, marked null below.
                    let other_views = idxs.iter().map(|idx| {
                        other_view_slice
                            .get(*idx as usize)
                            .copied()
                            .unwrap_or_default()
                    });
                    self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());
                },
            }
        }

        self.validity
            .opt_gather_extend_from_opt_validity(other.validity(), idxs, other.len());
    }
466}