polars_arrow/array/binview/builder.rs

use std::marker::PhantomData;
use std::sync::{Arc, LazyLock};

use hashbrown::hash_map::Entry;
use polars_utils::IdxSize;
use polars_utils::aliases::{InitHashMaps, PlHashMap};

use crate::array::binview::{DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE};
use crate::array::builder::{ShareStrategy, StaticArrayBuilder};
use crate::array::{Array, BinaryViewArrayGeneric, View, ViewType};
use crate::bitmap::OptBitmapBuilder;
use crate::buffer::Buffer;
use crate::datatypes::ArrowDataType;
use crate::pushable::Pushable;

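// An empty static buffer used to reserve the active buffer's slot in the
// buffer set; it is swapped for the real data when the active buffer is flushed.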
static PLACEHOLDER_BUFFER: LazyLock<Buffer<u8>> = LazyLock::new(|| Buffer::from_static(&[]));

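/// A builder for `BinaryViewArrayGeneric<V>` (string/binary view arrays).
///
/// Small values are stored inline in the `View` itself; larger values are
/// appended to a growable "active" buffer. When extending from an existing
/// array with `ShareStrategy::Always`, the source array's buffers are stolen
/// (shared by reference) rather than copied, and deduplicated so the same
/// source buffer is only added to the buffer set once.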
pub struct BinaryViewArrayGenericBuilder<V: ViewType + ?Sized> {
    dtype: ArrowDataType,
    views: Vec<View>,
    active_buffer: Vec<u8>,
    active_buffer_idx: u32,
    buffer_set: Vec<Buffer<u8>>,
    // Maps buffer data pointers to indices in buffer_set, deduplicating
    // buffers stolen from other arrays.
    stolen_buffers: PlHashMap<usize, u32>,

    last_buffer_set_stolen_from: Option<Arc<[Buffer<u8>]>>,
    // Maps buffer indices in the buffer set we last stole from to indices in
    // our own buffer_set. An entry (idx, gen) is only valid if gen equals
    // buffer_set_translation_generation; bumping the generation invalidates
    // every entry at once.
    buffer_set_translation_idxs: Vec<(u32, u32)>,
    buffer_set_translation_generation: u32,

    validity: OptBitmapBuilder,
    // Sum of the lengths of all views (the bytes the array represents).
    total_bytes_len: usize,
    // Sum of the lengths of all backing buffers.
    total_buffer_len: usize,
    view_type: PhantomData<V>,
}

impl<V: ViewType + ?Sized> BinaryViewArrayGenericBuilder<V> {
    pub fn new(dtype: ArrowDataType) -> Self {
        Self {
            dtype,
            views: Vec::new(),
            active_buffer: Vec::new(),
            active_buffer_idx: 0,
            buffer_set: Vec::new(),
            stolen_buffers: PlHashMap::new(),
            last_buffer_set_stolen_from: None,
            buffer_set_translation_idxs: Vec::new(),
            buffer_set_translation_generation: 0,
            validity: OptBitmapBuilder::default(),
            total_bytes_len: 0,
            total_buffer_len: 0,
            view_type: PhantomData,
        }
    }

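    // Fast-path capacity check; the cold path below grows or replaces the
    // active buffer. The u32 bound exists because view offsets are u32.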
    #[inline]
    fn reserve_active_buffer(&mut self, additional: usize) {
        let len = self.active_buffer.len();
        let cap = self.active_buffer.capacity();
        if additional > cap - len || len + additional >= (u32::MAX - 1) as usize {
            self.reserve_active_buffer_slow(additional);
        }
    }

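    // Cold path: flush the current active buffer into its slot in the buffer
    // set and start a fresh, larger one.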
    #[cold]
    fn reserve_active_buffer_slow(&mut self, additional: usize) {
        assert!(
            additional <= (u32::MAX - 1) as usize,
            "strings longer than 2^32 - 2 are not supported"
        );

        // Grow exponentially within the block size limits, but always enough
        // to fit the requested allocation.
        let new_capacity = (self.active_buffer.capacity() * 2)
            .clamp(DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE)
            .max(additional);

        let old_buffer =
            core::mem::replace(&mut self.active_buffer, Vec::with_capacity(new_capacity));
        if !old_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(old_buffer);
        }
        // Claim a new slot in the buffer set; the placeholder is replaced
        // with the real data when the new active buffer is flushed.
        self.active_buffer_idx = self.buffer_set.len().try_into().unwrap();
        self.buffer_set.push(PLACEHOLDER_BUFFER.clone());
    }

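    /// Pushes a value without updating the validity bitmap, inlining it in
    /// the view when small enough and storing it in the active buffer otherwise.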
    pub fn push_value_ignore_validity(&mut self, bytes: &V) {
        let bytes = bytes.to_bytes();
        self.total_bytes_len += bytes.len();
        unsafe {
            let view = if bytes.len() > View::MAX_INLINE_SIZE as usize {
                self.reserve_active_buffer(bytes.len());

                let offset = self.active_buffer.len() as u32;
                self.active_buffer.extend_from_slice(bytes);
                self.total_buffer_len += bytes.len();
                View::new_noninline_unchecked(bytes, self.active_buffer_idx, offset)
            } else {
                View::new_inline_unchecked(bytes)
            };
            self.views.push(view);
        }
    }

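    /// # Safety
    /// The given view must be an inline view.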
    pub unsafe fn push_inline_view_ignore_validity(&mut self, view: View) {
        debug_assert!(view.is_inline());
        self.total_bytes_len += view.length as usize;
        self.views.push(view);
    }

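    // Registers `buffer_set` as the buffer set we are currently stealing
    // from, invalidating the translation cache if it changed.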
    fn switch_active_stealing_bufferset_to(&mut self, buffer_set: &Arc<[Buffer<u8>]>) {
        if self
            .last_buffer_set_stolen_from
            .as_ref()
            .is_some_and(|stolen_bs| std::ptr::eq(Arc::as_ptr(stolen_bs), Arc::as_ptr(buffer_set)))
        {
            return; // Already stealing from this buffer set.
        }
        self.last_buffer_set_stolen_from = Some(buffer_set.clone());

        // Invalidate all translation entries at once by bumping the
        // generation, and make sure the translation array covers the new
        // buffer set. Fresh entries are tagged with the old generation so
        // they read as stale.
        let old_gen = self.buffer_set_translation_generation;
        self.buffer_set_translation_generation = old_gen.wrapping_add(1);
        if self.buffer_set_translation_idxs.len() < buffer_set.len() {
            self.buffer_set_translation_idxs
                .resize(buffer_set.len(), (0, old_gen));
        }
    }

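    /// Remaps a non-inline view from the stolen buffer set so its buffer
    /// index refers to our own buffer_set, stealing the buffer if necessary.
    ///
    /// # Safety
    /// `view.buffer_idx` must be in bounds for `other_bufferset`, and
    /// `switch_active_stealing_bufferset_to` must have been called for
    /// `other_bufferset`.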
    unsafe fn translate_view(
        &mut self,
        mut view: View,
        other_bufferset: &Arc<[Buffer<u8>]>,
    ) -> View {
        let (mut new_buffer_idx, gen_) = *self
            .buffer_set_translation_idxs
            .get_unchecked(view.buffer_idx as usize);
        if gen_ != self.buffer_set_translation_generation {
            // Stale or missing translation; fall back to a lookup keyed on
            // the buffer's data pointer, stealing the buffer into our own
            // buffer set if we haven't seen it before.
            let buffer = other_bufferset
                .get_unchecked(view.buffer_idx as usize)
                .clone()
                .expand_end_to_storage();
            let buf_id = buffer.as_slice().as_ptr().addr();
            let idx = match self.stolen_buffers.entry(buf_id) {
                Entry::Occupied(o) => *o.get(),
                Entry::Vacant(v) => {
                    let idx = self.buffer_set.len() as u32;
                    self.total_buffer_len += buffer.len();
                    self.buffer_set.push(buffer);
                    v.insert(idx);
                    idx
                },
            };

            // Cache the translation for the next view that references the
            // same buffer index.
            *self
                .buffer_set_translation_idxs
                .get_unchecked_mut(view.buffer_idx as usize) =
                (idx, self.buffer_set_translation_generation);
            new_buffer_idx = idx;
        }
        view.buffer_idx = new_buffer_idx;
        view
    }

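    /// Extends self.views with views from another array, remapping non-inline
    /// views to our own buffer set.
    ///
    /// # Safety
    /// The views must be valid for `other_bufferset`.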
    unsafe fn extend_views_dedup_ignore_validity(
        &mut self,
        views: impl IntoIterator<Item = View>,
        other_bufferset: &Arc<[Buffer<u8>]>,
    ) {
        self.switch_active_stealing_bufferset_to(other_bufferset);

        for mut view in views {
            if view.length > View::MAX_INLINE_SIZE {
                view = self.translate_view(view, other_bufferset);
            }
            self.total_bytes_len += view.length as usize;
            self.views.push(view);
        }
    }

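    /// Like extend_views_dedup_ignore_validity, but pushes each view
    /// `repeats` times.
    ///
    /// # Safety
    /// The views must be valid for `other_bufferset`.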
    unsafe fn extend_views_each_repeated_dedup_ignore_validity(
        &mut self,
        views: impl IntoIterator<Item = View>,
        repeats: usize,
        other_bufferset: &Arc<[Buffer<u8>]>,
    ) {
        self.switch_active_stealing_bufferset_to(other_bufferset);

        for mut view in views {
            if view.length > View::MAX_INLINE_SIZE {
                view = self.translate_view(view, other_bufferset);
            }
            self.total_bytes_len += repeats * view.length as usize;
            for _ in 0..repeats {
                self.views.push(view);
            }
        }
    }
}

impl<V: ViewType + ?Sized> StaticArrayBuilder for BinaryViewArrayGenericBuilder<V> {
    type Array = BinaryViewArrayGeneric<V>;

    fn dtype(&self) -> &ArrowDataType {
        &self.dtype
    }

    fn reserve(&mut self, additional: usize) {
        self.views.reserve(additional);
        self.validity.reserve(additional);
    }

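    // Freezing flushes the active buffer (or drops an unused trailing
    // placeholder) before assembling the final immutable array.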
    fn freeze(mut self) -> Self::Array {
        if !self.active_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(self.active_buffer);
        } else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {
            self.buffer_set.pop();
        }

        unsafe {
            BinaryViewArrayGeneric::new_unchecked(
                self.dtype,
                Buffer::from(self.views),
                Arc::from(self.buffer_set),
                self.validity.into_opt_validity(),
                self.total_bytes_len,
                self.total_buffer_len,
            )
        }
    }

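    // As freeze, but resets the builder to an empty, reusable state instead
    // of consuming it.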
    fn freeze_reset(&mut self) -> Self::Array {
        if !self.active_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] =
                Buffer::from(core::mem::take(&mut self.active_buffer));
        } else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {
            self.buffer_set.pop();
        }

        let out = unsafe {
            BinaryViewArrayGeneric::new_unchecked(
                self.dtype.clone(),
                Buffer::from(core::mem::take(&mut self.views)),
                Arc::from(core::mem::take(&mut self.buffer_set)),
                core::mem::take(&mut self.validity).into_opt_validity(),
                self.total_bytes_len,
                self.total_buffer_len,
            )
        };

        self.total_buffer_len = 0;
        self.total_bytes_len = 0;
        self.active_buffer_idx = 0;
        self.stolen_buffers.clear();
        self.last_buffer_set_stolen_from = None;
        out
    }

    fn len(&self) -> usize {
        self.views.len()
    }

    fn extend_nulls(&mut self, length: usize) {
        self.views.extend_constant(length, View::default());
        self.validity.extend_constant(length, false);
    }

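    // For the extend methods below, ShareStrategy::Never copies value bytes
    // into our own buffers, while ShareStrategy::Always steals the source
    // array's buffers and only translates the views.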
    fn subslice_extend(
        &mut self,
        other: &Self::Array,
        start: usize,
        length: usize,
        share: ShareStrategy,
    ) {
        self.views.reserve(length);

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for i in start..start + length {
                            if v.get_bit_unchecked(i) {
                                self.push_value_ignore_validity(other.value_unchecked(i));
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for i in start..start + length {
                            self.push_value_ignore_validity(other.value_unchecked(i));
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_views = &other.views()[start..start + length];
                    self.extend_views_dedup_ignore_validity(
                        other_views.iter().copied(),
                        other.data_buffers(),
                    );
                },
            }
        }

        self.validity
            .subslice_extend_from_opt_validity(other.validity(), start, length);
    }

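    // As subslice_extend, but each element in the subslice is repeated
    // `repeats` times.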
    fn subslice_extend_each_repeated(
        &mut self,
        other: &Self::Array,
        start: usize,
        length: usize,
        repeats: usize,
        share: ShareStrategy,
    ) {
        self.views.reserve(length * repeats);

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for i in start..start + length {
                            if v.get_bit_unchecked(i) {
                                for _ in 0..repeats {
                                    self.push_value_ignore_validity(other.value_unchecked(i));
                                }
                            } else {
                                for _ in 0..repeats {
                                    self.views.push(View::default())
                                }
                            }
                        }
                    } else {
                        for i in start..start + length {
                            for _ in 0..repeats {
                                self.push_value_ignore_validity(other.value_unchecked(i));
                            }
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_views = &other.views()[start..start + length];
                    self.extend_views_each_repeated_dedup_ignore_validity(
                        other_views.iter().copied(),
                        repeats,
                        other.data_buffers(),
                    );
                },
            }
        }

        self.validity
            .subslice_extend_each_repeated_from_opt_validity(
                other.validity(),
                start,
                length,
                repeats,
            );
    }

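    // Safety contract: every index in `idxs` must be in bounds for `other`.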
    unsafe fn gather_extend(
        &mut self,
        other: &Self::Array,
        idxs: &[IdxSize],
        share: ShareStrategy,
    ) {
        self.views.reserve(idxs.len());

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for idx in idxs {
                            if v.get_bit_unchecked(*idx as usize) {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for idx in idxs {
                            self.push_value_ignore_validity(other.value_unchecked(*idx as usize));
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_view_slice = other.views().as_slice();
                    let other_views = idxs
                        .iter()
                        .map(|idx| *other_view_slice.get_unchecked(*idx as usize));
                    self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());
                },
            }
        }

        self.validity
            .gather_extend_from_opt_validity(other.validity(), idxs);
    }

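    // Unlike gather_extend, out-of-bounds indices produce nulls here, so this
    // method is safe to call with arbitrary indices.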
    fn opt_gather_extend(&mut self, other: &Self::Array, idxs: &[IdxSize], share: ShareStrategy) {
        self.views.reserve(idxs.len());

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for idx in idxs {
                            if (*idx as usize) < v.len() && v.get_bit_unchecked(*idx as usize) {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for idx in idxs {
                            if (*idx as usize) < other.len() {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_view_slice = other.views().as_slice();
                    let other_views = idxs.iter().map(|idx| {
                        other_view_slice
                            .get(*idx as usize)
                            .copied()
                            .unwrap_or_default()
                    });
                    self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());
                },
            }
        }

        self.validity
            .opt_gather_extend_from_opt_validity(other.validity(), idxs, other.len());
    }
}
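
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal usage sketch (not part of the original file): build a
    // Utf8View array from one inline-sized and one buffer-backed string.
    // Assumes that never touching the validity builder yields an array
    // without a validity bitmap.
    #[test]
    fn push_and_freeze_roundtrip() {
        let mut builder = BinaryViewArrayGenericBuilder::<str>::new(ArrowDataType::Utf8View);
        builder.push_value_ignore_validity("short"); // <= View::MAX_INLINE_SIZE, stays inline.
        builder.push_value_ignore_validity("a string long enough to be stored in a buffer");
        let arr = builder.freeze();
        assert_eq!(arr.len(), 2);
        assert_eq!(arr.value(0), "short");
        assert_eq!(arr.value(1), "a string long enough to be stored in a buffer");
    }
}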