polars_arrow/array/binview/builder.rs

use std::marker::PhantomData;
use std::sync::LazyLock;

use hashbrown::hash_map::Entry;
use polars_buffer::Buffer;
use polars_utils::IdxSize;
use polars_utils::aliases::{InitHashMaps, PlHashMap};

use crate::array::binview::{DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE};
use crate::array::builder::{ShareStrategy, StaticArrayBuilder};
use crate::array::{Array, BinaryViewArrayGeneric, View, ViewType};
use crate::bitmap::OptBitmapBuilder;
use crate::datatypes::ArrowDataType;
use crate::pushable::Pushable;

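// An empty shared buffer used as a placeholder in `buffer_set`: it reserves a
// slot for the active buffer and is overwritten with the real data on flush.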
static PLACEHOLDER_BUFFER: LazyLock<Buffer<u8>> = LazyLock::new(|| Buffer::from_static(&[]));

pub struct BinaryViewArrayGenericBuilder<V: ViewType + ?Sized> {
    dtype: ArrowDataType,
    views: Vec<View>,
    active_buffer: Vec<u8>,
    active_buffer_idx: u32,
    buffer_set: Vec<Buffer<u8>>,
    stolen_buffers: PlHashMap<usize, u32>,

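    // The `data_buffers` of the last array we stole views from, used to detect
    // repeated extends from the same source.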
    last_buffer_set_stolen_from: Option<Buffer<Buffer<u8>>>,
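    // For each buffer in that set: its index in our `buffer_set`, tagged with a
    // generation counter so entries left over from an older source read as stale.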
    buffer_set_translation_idxs: Vec<(u32, u32)>,
    buffer_set_translation_generation: u32,

    validity: OptBitmapBuilder,
    total_bytes_len: usize,
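    // Total size in bytes of all buffers in `buffer_set`, including the bytes
    // still sitting in the active buffer.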
    total_buffer_len: usize,
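    // Zero-sized marker tying this builder to its element type (str or [u8]).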
    view_type: PhantomData<V>,
}

impl<V: ViewType + ?Sized> BinaryViewArrayGenericBuilder<V> {
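    // View offsets and lengths are u32, capping a single value at 2^32 - 2 bytes.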
    pub const MAX_ROW_BYTE_LEN: usize = (u32::MAX - 1) as _;

    pub fn new(dtype: ArrowDataType) -> Self {
        Self {
            dtype,
            views: Vec::new(),
            active_buffer: Vec::new(),
            active_buffer_idx: 0,
            buffer_set: Vec::new(),
            stolen_buffers: PlHashMap::new(),
            last_buffer_set_stolen_from: None,
            buffer_set_translation_idxs: Vec::new(),
            buffer_set_translation_generation: 0,
            validity: OptBitmapBuilder::default(),
            total_bytes_len: 0,
            total_buffer_len: 0,
            view_type: PhantomData,
        }
    }

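    // Hot path: a cheap inline capacity check, with the actual growth relegated
    // to the #[cold] slow path below.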
    #[inline]
    fn reserve_active_buffer(&mut self, additional: usize) {
        let len = self.active_buffer.len();
        let cap = self.active_buffer.capacity();
        if additional > cap - len || len + additional >= Self::MAX_ROW_BYTE_LEN {
            self.reserve_active_buffer_slow(additional);
        }
    }

    #[cold]
    fn reserve_active_buffer_slow(&mut self, additional: usize) {
        assert!(
            additional <= Self::MAX_ROW_BYTE_LEN,
            "strings longer than 2^32 - 2 are not supported"
        );

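        // Grow exponentially within a block-size range, but always make room
        // for at least `additional` bytes.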
        let new_capacity = (self.active_buffer.capacity() * 2)
            .clamp(DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE)
            .max(additional);

        let old_buffer =
            core::mem::replace(&mut self.active_buffer, Vec::with_capacity(new_capacity));
        if !old_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(old_buffer);
        }
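        // Claim a fresh slot for the new active buffer; the placeholder is
        // replaced by the real data when the buffer is flushed.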
        self.active_buffer_idx = self.buffer_set.len().try_into().unwrap();
        self.buffer_set.push(PLACEHOLDER_BUFFER.clone());
    }

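    // Appends a value without touching the validity bitmap. Short values are
    // inlined directly in the view; longer ones are copied into the active
    // buffer and referenced by (buffer_idx, offset).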
    pub fn push_value_ignore_validity(&mut self, bytes: &V) {
        let bytes = bytes.to_bytes();
        self.total_bytes_len += bytes.len();
        unsafe {
            let view = if bytes.len() > View::MAX_INLINE_SIZE as usize {
                self.reserve_active_buffer(bytes.len());

                let offset = self.active_buffer.len() as u32;
                self.active_buffer.extend_from_slice(bytes);
                self.total_buffer_len += bytes.len();
                View::new_noninline_unchecked(bytes, self.active_buffer_idx, offset)
            } else {
                View::new_inline_unchecked(bytes)
            };
            self.views.push(view);
        }
    }

    pub unsafe fn push_inline_view_ignore_validity(&mut self, view: View) {
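        // Safety: the caller must guarantee that the view is inline, so it
        // never references a data buffer.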
        debug_assert!(view.is_inline());
        self.total_bytes_len += view.length as usize;
        self.views.push(view);
    }

    fn switch_active_stealing_bufferset_to(&mut self, buffer_set: &Buffer<Buffer<u8>>) {
        if self
            .last_buffer_set_stolen_from
            .as_ref()
            .is_some_and(|stolen_bs| {
                stolen_bs.as_ptr() == buffer_set.as_ptr() && stolen_bs.len() >= buffer_set.len()
            })
        {
            return;
        }

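        // Remember the new source and invalidate all cached translation entries
        // by bumping the generation; (re)grown slots are seeded with the old
        // generation so they also read as stale.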
        self.last_buffer_set_stolen_from = Some(buffer_set.clone());

        let old_gen = self.buffer_set_translation_generation;
        self.buffer_set_translation_generation = old_gen.wrapping_add(1);
        if self.buffer_set_translation_idxs.len() < buffer_set.len() {
            self.buffer_set_translation_idxs
                .resize(buffer_set.len(), (0, old_gen));
        }
    }

    unsafe fn translate_view(
        &mut self,
        mut view: View,
        other_bufferset: &Buffer<Buffer<u8>>,
    ) -> View {
        let (mut new_buffer_idx, gen_) = *self
            .buffer_set_translation_idxs
            .get_unchecked(view.buffer_idx as usize);
        if gen_ != self.buffer_set_translation_generation {
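            // Stale or missing entry: steal the referenced buffer (deduplicated
            // by its address) and cache its index under the current generation.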
            let buffer = other_bufferset
                .get_unchecked(view.buffer_idx as usize)
                .clone()
                .expand_end_to_storage();
            let buf_id = buffer.as_slice().as_ptr().addr();
            let idx = match self.stolen_buffers.entry(buf_id) {
                Entry::Occupied(o) => *o.get(),
                Entry::Vacant(v) => {
                    let idx = self.buffer_set.len() as u32;
                    self.total_buffer_len += buffer.len();
                    self.buffer_set.push(buffer);
                    v.insert(idx);
                    idx
                },
            };

            *self
                .buffer_set_translation_idxs
                .get_unchecked_mut(view.buffer_idx as usize) =
                (idx, self.buffer_set_translation_generation);
            new_buffer_idx = idx;
        }
        view.buffer_idx = new_buffer_idx;
        view
    }

    unsafe fn extend_views_dedup_ignore_validity(
        &mut self,
        views: impl IntoIterator<Item = View>,
        other_bufferset: &Buffer<Buffer<u8>>,
    ) {
        self.switch_active_stealing_bufferset_to(other_bufferset);
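        // Only non-inline views reference a data buffer and need translation;
        // inline views can be pushed through unchanged.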

        for mut view in views {
            if view.length > View::MAX_INLINE_SIZE {
                view = self.translate_view(view, other_bufferset);
            }
            self.total_bytes_len += view.length as usize;
            self.views.push(view);
        }
    }

    unsafe fn extend_views_each_repeated_dedup_ignore_validity(
        &mut self,
        views: impl IntoIterator<Item = View>,
        repeats: usize,
        other_bufferset: &Buffer<Buffer<u8>>,
    ) {
        self.switch_active_stealing_bufferset_to(other_bufferset);
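        // As above, but each (translated) view is pushed `repeats` times.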

        for mut view in views {
            if view.length > View::MAX_INLINE_SIZE {
                view = self.translate_view(view, other_bufferset);
            }
            self.total_bytes_len += repeats * view.length as usize;
            for _ in 0..repeats {
                self.views.push(view);
            }
        }
    }
}

impl<V: ViewType + ?Sized> StaticArrayBuilder for BinaryViewArrayGenericBuilder<V> {
    type Array = BinaryViewArrayGeneric<V>;

    fn dtype(&self) -> &ArrowDataType {
        &self.dtype
    }

    fn reserve(&mut self, additional: usize) {
        self.views.reserve(additional);
        self.validity.reserve(additional);
    }

    fn freeze(mut self) -> Self::Array {
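        // Flush the active buffer into its reserved slot, or drop the trailing
        // placeholder if nothing was ever written to it.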
        if !self.active_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] = Buffer::from(self.active_buffer);
        } else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {
            self.buffer_set.pop();
        }

        unsafe {
            BinaryViewArrayGeneric::new_unchecked(
                self.dtype,
                Buffer::from(self.views),
                Buffer::from(self.buffer_set),
                self.validity.into_opt_validity(),
                Some(self.total_bytes_len),
                self.total_buffer_len,
            )
        }
    }

    fn freeze_reset(&mut self) -> Self::Array {
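        // Same as freeze, but leaves this builder empty and ready for reuse.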
        if !self.active_buffer.is_empty() {
            self.buffer_set[self.active_buffer_idx as usize] =
                Buffer::from(core::mem::take(&mut self.active_buffer));
        } else if self.buffer_set.last().is_some_and(|b| b.is_empty()) {
            self.buffer_set.pop();
        }

        let out = unsafe {
            BinaryViewArrayGeneric::new_unchecked(
                self.dtype.clone(),
                Buffer::from(core::mem::take(&mut self.views)),
                Buffer::from(core::mem::take(&mut self.buffer_set)),
                core::mem::take(&mut self.validity).into_opt_validity(),
                Some(self.total_bytes_len),
                self.total_buffer_len,
            )
        };

        self.total_buffer_len = 0;
        self.total_bytes_len = 0;
        self.active_buffer_idx = 0;
        self.stolen_buffers.clear();
        self.last_buffer_set_stolen_from = None;
        out
    }

    fn len(&self) -> usize {
        self.views.len()
    }

    fn extend_nulls(&mut self, length: usize) {
        self.views.extend_constant(length, View::default());
        self.validity.extend_constant(length, false);
    }

    fn subslice_extend(
        &mut self,
        other: &Self::Array,
        start: usize,
        length: usize,
        share: ShareStrategy,
    ) {
        self.views.reserve(length);

        unsafe {
            match share {
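                // Never share: copy the bytes of every value; nulls become the
                // default (empty) view.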
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for i in start..start + length {
                            if v.get_bit_unchecked(i) {
                                self.push_value_ignore_validity(other.value_unchecked(i));
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for i in start..start + length {
                            self.push_value_ignore_validity(other.value_unchecked(i));
                        }
                    }
                },
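                // Always share: steal the source buffers into our buffer_set
                // and rewrite the views' buffer indices instead of copying.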
                ShareStrategy::Always => {
                    let other_views = &other.views()[start..start + length];
                    self.extend_views_dedup_ignore_validity(
                        other_views.iter().copied(),
                        other.data_buffers(),
                    );
                },
            }
        }

        self.validity
            .subslice_extend_from_opt_validity(other.validity(), start, length);
    }

    fn subslice_extend_each_repeated(
        &mut self,
        other: &Self::Array,
        start: usize,
        length: usize,
        repeats: usize,
        share: ShareStrategy,
    ) {
        self.views.reserve(length * repeats);

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for i in start..start + length {
                            if v.get_bit_unchecked(i) {
                                for _ in 0..repeats {
                                    self.push_value_ignore_validity(other.value_unchecked(i));
                                }
                            } else {
                                for _ in 0..repeats {
                                    self.views.push(View::default())
                                }
                            }
                        }
                    } else {
                        for i in start..start + length {
                            for _ in 0..repeats {
                                self.push_value_ignore_validity(other.value_unchecked(i));
                            }
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_views = &other.views()[start..start + length];
                    self.extend_views_each_repeated_dedup_ignore_validity(
                        other_views.iter().copied(),
                        repeats,
                        other.data_buffers(),
                    );
                },
            }
        }

        self.validity
            .subslice_extend_each_repeated_from_opt_validity(
                other.validity(),
                start,
                length,
                repeats,
            );
    }

    unsafe fn gather_extend(
        &mut self,
        other: &Self::Array,
        idxs: &[IdxSize],
        share: ShareStrategy,
    ) {
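        // Safety: every index in `idxs` must be in bounds for `other`.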
        self.views.reserve(idxs.len());

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for idx in idxs {
                            if v.get_bit_unchecked(*idx as usize) {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for idx in idxs {
                            self.push_value_ignore_validity(other.value_unchecked(*idx as usize));
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_view_slice = other.views().as_slice();
                    let other_views = idxs
                        .iter()
                        .map(|idx| *other_view_slice.get_unchecked(*idx as usize));
                    self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());
                },
            }
        }

        self.validity
            .gather_extend_from_opt_validity(other.validity(), idxs);
    }

    fn opt_gather_extend(&mut self, other: &Self::Array, idxs: &[IdxSize], share: ShareStrategy) {
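        // Unlike gather_extend this is safe to call with out-of-bounds indices,
        // which produce nulls.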
        self.views.reserve(idxs.len());

        unsafe {
            match share {
                ShareStrategy::Never => {
                    if let Some(v) = other.validity() {
                        for idx in idxs {
                            if (*idx as usize) < v.len() && v.get_bit_unchecked(*idx as usize) {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    } else {
                        for idx in idxs {
                            if (*idx as usize) < other.len() {
                                self.push_value_ignore_validity(
                                    other.value_unchecked(*idx as usize),
                                );
                            } else {
                                self.views.push(View::default())
                            }
                        }
                    }
                },
                ShareStrategy::Always => {
                    let other_view_slice = other.views().as_slice();
                    let other_views = idxs.iter().map(|idx| {
                        other_view_slice
                            .get(*idx as usize)
                            .copied()
                            .unwrap_or_default()
                    });
                    self.extend_views_dedup_ignore_validity(other_views, other.data_buffers());
                },
            }
        }

        self.validity
            .opt_gather_extend_from_opt_validity(other.validity(), idxs, other.len());
    }
}