datafusion_physical_expr_common/binary_view_map.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ArrowBytesViewMap`] and [`ArrowBytesViewSet`] for storing maps/sets of values from
19//! `StringViewArray`/`BinaryViewArray`.
20use crate::binary_map::OutputType;
21use arrow::array::NullBufferBuilder;
22use arrow::array::cast::AsArray;
23use arrow::array::{Array, ArrayRef, BinaryViewArray, ByteView, make_view};
24use arrow::buffer::{Buffer, ScalarBuffer};
25use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType};
26use datafusion_common::hash_utils::RandomState;
27use datafusion_common::hash_utils::create_hashes;
28use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt};
29use std::fmt::Debug;
30use std::mem::size_of;
31use std::sync::Arc;
32
/// HashSet optimized for storing string or binary values that can produce
/// the final set as a `StringViewArray` / `BinaryViewArray` with minimal
/// copies.
///
/// This is a thin wrapper around [`ArrowBytesViewMap`] with a `()` payload.
#[derive(Debug)]
pub struct ArrowBytesViewSet(ArrowBytesViewMap<()>);
37
38impl ArrowBytesViewSet {
39 pub fn new(output_type: OutputType) -> Self {
40 Self(ArrowBytesViewMap::new(output_type))
41 }
42
43 /// Inserts each value from `values` into the set
44 pub fn insert(&mut self, values: &ArrayRef) {
45 fn make_payload_fn(_value: Option<&[u8]>) {}
46 fn observe_payload_fn(_payload: ()) {}
47 self.0
48 .insert_if_new(values, make_payload_fn, observe_payload_fn);
49 }
50
51 /// Return the contents of this map and replace it with a new empty map with
52 /// the same output type
53 pub fn take(&mut self) -> Self {
54 let mut new_self = Self::new(self.0.output_type);
55 std::mem::swap(self, &mut new_self);
56 new_self
57 }
58
59 /// Converts this set into a `StringViewArray` or `BinaryViewArray`
60 /// containing each distinct value that was interned.
61 /// This is done without copying the values.
62 pub fn into_state(self) -> ArrayRef {
63 self.0.into_state()
64 }
65
66 /// Returns the total number of distinct values (including nulls) seen so far
67 pub fn len(&self) -> usize {
68 self.0.len()
69 }
70
71 pub fn is_empty(&self) -> bool {
72 self.0.is_empty()
73 }
74
75 /// returns the total number of distinct values (not including nulls) seen so far
76 pub fn non_null_len(&self) -> usize {
77 self.0.non_null_len()
78 }
79
80 /// Return the total size, in bytes, of memory used to store the data in
81 /// this set, not including `self`
82 pub fn size(&self) -> usize {
83 self.0.size()
84 }
85}
86
87/// Optimized map for storing Arrow "byte view" types (`StringView`, `BinaryView`)
88/// values that can produce the set of keys on
89/// output as `GenericBinaryViewArray` without copies.
90///
91/// Equivalent to `HashSet<String, V>` but with better performance if you need
92/// to emit the keys as an Arrow `StringViewArray` / `BinaryViewArray`. For other
93/// purposes it is the same as a `HashMap<String, V>`
94///
95/// # Generic Arguments
96///
97/// * `V`: payload type
98///
99/// # Description
100///
101/// This is a specialized HashMap with the following properties:
102///
103/// 1. Optimized for storing and emitting Arrow byte types (e.g.
104/// `StringViewArray` / `BinaryViewArray`) very efficiently by minimizing copying of
105/// the string values themselves, both when inserting and when emitting the
106/// final array.
107///
108/// 2. Retains the insertion order of entries in the final array. The values are
109/// in the same order as they were inserted.
110///
111/// Note this structure can be used as a `HashSet` by specifying the value type
112/// as `()`, as is done by [`ArrowBytesViewSet`].
113///
114/// This map is used by the special `COUNT DISTINCT` aggregate function to
115/// store the distinct values, and by the `GROUP BY` operator to store
116/// group values when they are a single string array.
117/// Max size of the in-progress buffer before flushing to completed buffers
118const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024;
119
120pub struct ArrowBytesViewMap<V>
121where
122 V: Debug + PartialEq + Eq + Clone + Copy + Default,
123{
124 /// Should the output be StringView or BinaryView?
125 output_type: OutputType,
126 /// Underlying hash set for each distinct value
127 map: hashbrown::hash_table::HashTable<Entry<V>>,
128 /// Total size of the map in bytes
129 map_size: usize,
130
131 /// Views for all stored values (in insertion order)
132 views: Vec<u128>,
133 /// In-progress buffer for out-of-line string data
134 in_progress: Vec<u8>,
135 /// Completed buffers containing string data
136 completed: Vec<Buffer>,
137 /// Tracks null values (true = null)
138 nulls: NullBufferBuilder,
139
140 /// random state used to generate hashes
141 random_state: RandomState,
142 /// buffer that stores hash values (reused across batches to save allocations)
143 hashes_buffer: Vec<u64>,
144 /// `(payload, null_index)` for the 'null' value, if any
145 /// NOTE null_index is the logical index in the final array, not the index
146 /// in the buffer
147 null: Option<(V, usize)>,
148}
149
/// Number of entries the hash table is pre-sized for when a map is created
/// (512, expressed as a power of two).
const INITIAL_MAP_CAPACITY: usize = 1 << 9;
152
153impl<V> ArrowBytesViewMap<V>
154where
155 V: Debug + PartialEq + Eq + Clone + Copy + Default,
156{
157 pub fn new(output_type: OutputType) -> Self {
158 Self {
159 output_type,
160 map: hashbrown::hash_table::HashTable::with_capacity(INITIAL_MAP_CAPACITY),
161 map_size: 0,
162 views: Vec::new(),
163 in_progress: Vec::new(),
164 completed: Vec::new(),
165 nulls: NullBufferBuilder::new(0),
166 random_state: RandomState::default(),
167 hashes_buffer: vec![],
168 null: None,
169 }
170 }
171
172 /// Return the contents of this map and replace it with a new empty map with
173 /// the same output type
174 pub fn take(&mut self) -> Self {
175 let mut new_self = Self::new(self.output_type);
176 std::mem::swap(self, &mut new_self);
177 new_self
178 }
179
180 /// Inserts each value from `values` into the map, invoking `payload_fn` for
181 /// each value if *not* already present, deferring the allocation of the
182 /// payload until it is needed.
183 ///
184 /// Note that this is different than a normal map that would replace the
185 /// existing entry
186 ///
187 /// # Arguments:
188 ///
189 /// `values`: array whose values are inserted
190 ///
191 /// `make_payload_fn`: invoked for each value that is not already present
192 /// to create the payload, in order of the values in `values`
193 ///
194 /// `observe_payload_fn`: invoked once, for each value in `values`, that was
195 /// already present in the map, with corresponding payload value.
196 ///
197 /// # Returns
198 ///
199 /// The payload value for the entry, either the existing value or
200 /// the newly inserted value
201 ///
202 /// # Safety:
203 ///
204 /// Note that `make_payload_fn` and `observe_payload_fn` are only invoked
205 /// with valid values from `values`, not for the `NULL` value.
206 pub fn insert_if_new<MP, OP>(
207 &mut self,
208 values: &ArrayRef,
209 make_payload_fn: MP,
210 observe_payload_fn: OP,
211 ) where
212 MP: FnMut(Option<&[u8]>) -> V,
213 OP: FnMut(V),
214 {
215 // Sanity check array type
216 match self.output_type {
217 OutputType::BinaryView => {
218 assert!(matches!(values.data_type(), DataType::BinaryView));
219 self.insert_if_new_inner::<MP, OP, BinaryViewType>(
220 values,
221 make_payload_fn,
222 observe_payload_fn,
223 )
224 }
225 OutputType::Utf8View => {
226 assert!(matches!(values.data_type(), DataType::Utf8View));
227 self.insert_if_new_inner::<MP, OP, StringViewType>(
228 values,
229 make_payload_fn,
230 observe_payload_fn,
231 )
232 }
233 _ => unreachable!("Utf8/Binary should use `ArrowBytesSet`"),
234 };
235 }
236
237 /// Generic version of [`Self::insert_if_new`] that handles `ByteViewType`
238 /// (both StringView and BinaryView)
239 ///
240 /// Note this is the only function that is generic on [`ByteViewType`], which
241 /// avoids having to template the entire structure, making the code
242 /// simpler and understand and reducing code bloat due to duplication.
243 ///
244 /// See comments on `insert_if_new` for more details
245 fn insert_if_new_inner<MP, OP, B>(
246 &mut self,
247 values: &ArrayRef,
248 mut make_payload_fn: MP,
249 mut observe_payload_fn: OP,
250 ) where
251 MP: FnMut(Option<&[u8]>) -> V,
252 OP: FnMut(V),
253 B: ByteViewType,
254 {
255 // step 1: compute hashes
256 let batch_hashes = &mut self.hashes_buffer;
257 batch_hashes.clear();
258 batch_hashes.resize(values.len(), 0);
259 create_hashes([values], &self.random_state, batch_hashes)
260 // hash is supported for all types and create_hashes only
261 // returns errors for unsupported types
262 .unwrap();
263
264 // step 2: insert each value into the set, if not already present
265 let values = values.as_byte_view::<B>();
266
267 // Get raw views buffer for direct comparison
268 let input_views = values.views();
269
270 // Ensure lengths are equivalent
271 assert_eq!(values.len(), self.hashes_buffer.len());
272
273 for i in 0..values.len() {
274 let view_u128 = input_views[i];
275 let hash = self.hashes_buffer[i];
276
277 // handle null value via validity bitmap check
278 if values.is_null(i) {
279 let payload = if let Some(&(payload, _offset)) = self.null.as_ref() {
280 payload
281 } else {
282 let payload = make_payload_fn(None);
283 let null_index = self.views.len();
284 self.views.push(0);
285 self.nulls.append_null();
286 self.null = Some((payload, null_index));
287 payload
288 };
289 observe_payload_fn(payload);
290 continue;
291 }
292
293 // Extract length from the view (first 4 bytes of u128 in little-endian)
294 let len = view_u128 as u32;
295
296 // Check if value already exists
297 let maybe_payload = {
298 // Borrow completed and in_progress for comparison
299 let completed = &self.completed;
300 let in_progress = &self.in_progress;
301
302 self.map
303 .find(hash, |header| {
304 if header.hash != hash {
305 return false;
306 }
307
308 // Fast path: inline strings can be compared directly
309 if len <= 12 {
310 return header.view == view_u128;
311 }
312
313 // For larger strings: first compare the 4-byte prefix
314 let stored_prefix = (header.view >> 32) as u32;
315 let input_prefix = (view_u128 >> 32) as u32;
316 if stored_prefix != input_prefix {
317 return false;
318 }
319
320 // Prefix matched - compare full bytes
321 let byte_view = ByteView::from(header.view);
322 let stored_len = byte_view.length as usize;
323 let buffer_index = byte_view.buffer_index as usize;
324 let offset = byte_view.offset as usize;
325
326 let stored_value = if buffer_index < completed.len() {
327 &completed[buffer_index].as_slice()
328 [offset..offset + stored_len]
329 } else {
330 &in_progress[offset..offset + stored_len]
331 };
332 let input_value: &[u8] = values.value(i).as_ref();
333 stored_value == input_value
334 })
335 .map(|entry| entry.payload)
336 };
337
338 let payload = if let Some(payload) = maybe_payload {
339 payload
340 } else {
341 // no existing value, make a new one
342 let (new_view, payload) = if len <= 12 {
343 // Inline path: bytes are already packed in view_u128.
344 // The inline ByteView format is [len:u32 LE][data:12 bytes zero-padded],
345 // so extracting bytes from the u128 avoids a round-trip through
346 // values.value(i) (which reads the views buffer and returns the same slice).
347 let view_bytes = view_u128.to_le_bytes();
348 let value = &view_bytes[4..4 + len as usize];
349 let payload = make_payload_fn(Some(value));
350 // For inline strings, the stored view is identical to the input view:
351 // make_view(value, 0, 0) produces the same u128 as view_u128.
352 //
353 // SAFETY: view_u128 was a valid view, and the enclosing `len <= 12`
354 // ensures it is inline
355 let new_view = unsafe { self.append_inline_view(view_u128) };
356 (new_view, payload)
357 } else {
358 let value: &[u8] = values.value(i).as_ref();
359 let payload = make_payload_fn(Some(value));
360 let new_view = self.append_value(value);
361 (new_view, payload)
362 };
363
364 let new_header = Entry {
365 view: new_view,
366 hash,
367 payload,
368 };
369
370 self.map
371 .insert_accounted(new_header, |h| h.hash, &mut self.map_size);
372 payload
373 };
374 observe_payload_fn(payload);
375 }
376 }
377
378 /// Converts this set into a `StringViewArray`, or `BinaryViewArray`,
379 /// containing each distinct value
380 /// that was inserted. This is done without copying the values.
381 ///
382 /// The values are guaranteed to be returned in the same order in which
383 /// they were first seen.
384 pub fn into_state(mut self) -> ArrayRef {
385 // Flush any remaining in-progress buffer
386 if !self.in_progress.is_empty() {
387 let flushed = std::mem::take(&mut self.in_progress);
388 self.completed.push(Buffer::from_vec(flushed));
389 }
390
391 // Build null buffer if we have any nulls
392 let null_buffer = self.nulls.finish();
393
394 let views = ScalarBuffer::from(self.views);
395 let array =
396 unsafe { BinaryViewArray::new_unchecked(views, self.completed, null_buffer) };
397
398 match self.output_type {
399 OutputType::BinaryView => Arc::new(array),
400 OutputType::Utf8View => {
401 // SAFETY: all input was valid utf8
402 let array = unsafe { array.to_string_view_unchecked() };
403 Arc::new(array)
404 }
405 _ => unreachable!("Utf8/Binary should use `ArrowBytesMap`"),
406 }
407 }
408
409 /// Append an already-computed inline view (len <= 12) directly, bypassing
410 /// buffer allocation.
411 ///
412 /// Returns the view that was stored (identical to the argument).
413 ///
414 /// # Safety
415 ///
416 /// `view` must be a valid inline `ByteView`: the length field in the low
417 /// 32 bits must be <= 12, and the remaining 12 bytes must hold the
418 /// value's bytes (zero-padded if shorter). Calling with a non-inline view
419 /// would store a value that downstream `views` consumers interpret as
420 /// `[buffer_index, offset]` into the `completed`/`in_progress` buffers,
421 /// which is unsound for any view that didn't originate from a real
422 /// allocation in those buffers.
423 unsafe fn append_inline_view(&mut self, view: u128) -> u128 {
424 self.views.push(view);
425 self.nulls.append_non_null();
426 view
427 }
428
429 /// Append a value to our buffers and return the view pointing to it
430 fn append_value(&mut self, value: &[u8]) -> u128 {
431 let len = value.len();
432 let view = if len <= 12 {
433 make_view(value, 0, 0)
434 } else {
435 // Ensure buffer is big enough
436 if self.in_progress.len() + len > BYTE_VIEW_MAX_BLOCK_SIZE {
437 let flushed = std::mem::replace(
438 &mut self.in_progress,
439 Vec::with_capacity(BYTE_VIEW_MAX_BLOCK_SIZE),
440 );
441 self.completed.push(Buffer::from_vec(flushed));
442 }
443
444 let buffer_index = self.completed.len() as u32;
445 let offset = self.in_progress.len() as u32;
446 self.in_progress.extend_from_slice(value);
447
448 make_view(value, buffer_index, offset)
449 };
450
451 self.views.push(view);
452 self.nulls.append_non_null();
453 view
454 }
455
456 /// Total number of entries (including null, if present)
457 pub fn len(&self) -> usize {
458 self.non_null_len() + self.null.map(|_| 1).unwrap_or(0)
459 }
460
461 /// Is the set empty?
462 pub fn is_empty(&self) -> bool {
463 self.map.is_empty() && self.null.is_none()
464 }
465
466 /// Number of non null entries
467 pub fn non_null_len(&self) -> usize {
468 self.map.len()
469 }
470
471 /// Return the total size, in bytes, of memory used to store the data in
472 /// this set, not including `self`
473 pub fn size(&self) -> usize {
474 let views_size = self.views.len() * size_of::<u128>();
475 let in_progress_size = self.in_progress.capacity();
476 let completed_size: usize = self.completed.iter().map(|b| b.len()).sum();
477 let nulls_size = self.nulls.allocated_size();
478
479 self.map_size
480 + views_size
481 + in_progress_size
482 + completed_size
483 + nulls_size
484 + self.hashes_buffer.allocated_size()
485 }
486}
487
488impl<V> Debug for ArrowBytesViewMap<V>
489where
490 V: Debug + PartialEq + Eq + Clone + Copy + Default,
491{
492 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
493 f.debug_struct("ArrowBytesMap")
494 .field("map", &"<map>")
495 .field("map_size", &self.map_size)
496 .field("views_len", &self.views.len())
497 .field("completed_buffers", &self.completed.len())
498 .field("random_state", &self.random_state)
499 .field("hashes_buffer", &self.hashes_buffer)
500 .finish()
501 }
502}
503
/// Entry in the hash table -- see [`ArrowBytesViewMap`] for more details
///
/// Stores the view pointing to our internal buffers, eliminating the need
/// for a separate builder index. For inline values (<= 12 bytes), the view
/// contains the entire value. For out-of-line values, the view contains the
/// buffer_index and offset pointing directly to our storage.
///
/// The payload bounds are not repeated on this definition; they are enforced
/// by the `impl` blocks of [`ArrowBytesViewMap`] that create entries.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
struct Entry<V> {
    /// The u128 view pointing to our internal buffers. For inline values,
    /// this contains the complete value. For larger values, this contains
    /// the buffer_index/offset into our completed/in_progress buffers.
    view: u128,

    /// Hash of the value, cached so the table can use it as the entry's hash
    /// and reject non-matching candidates without reading value bytes
    hash: u64,

    /// value stored by the entry
    payload: V,
}
525
#[cfg(test)]
mod tests {
    use arrow::array::{GenericByteViewArray, StringViewArray};
    use datafusion_common::HashMap;

    use super::*;

    // asserts that the set contains the expected strings, in the same order
    fn assert_set(set: ArrowBytesViewSet, expected: &[Option<&str>]) {
        let strings = set.into_state();
        let strings = strings.as_string_view();
        let state = strings.into_iter().collect::<Vec<_>>();
        assert_eq!(state, expected);
    }

    // inserting an empty array leaves the set empty
    #[test]
    fn string_view_set_empty() {
        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
        let array: ArrayRef = Arc::new(StringViewArray::new_null(0));
        set.insert(&array);
        assert_eq!(set.len(), 0);
        assert_eq!(set.non_null_len(), 0);
        assert_set(set, &[]);
    }

    // a single null is stored as one (null) entry
    #[test]
    fn string_view_set_one_null() {
        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
        let array: ArrayRef = Arc::new(StringViewArray::new_null(1));
        set.insert(&array);
        assert_eq!(set.len(), 1);
        assert_eq!(set.non_null_len(), 0);
        assert_set(set, &[None]);
    }

    // many nulls collapse into a single null entry
    #[test]
    fn string_view_set_many_null() {
        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
        let array: ArrayRef = Arc::new(StringViewArray::new_null(11));
        set.insert(&array);
        assert_eq!(set.len(), 1);
        assert_eq!(set.non_null_len(), 0);
        assert_set(set, &[None]);
    }

    #[test]
    fn test_string_view_set_basic() {
        // basic test for mixed small (inline) and large (out-of-line) string values
        let values = GenericByteViewArray::from(vec![
            Some("a"),
            Some("b"),
            Some("CXCCCCCCCCAABB"), // 14 bytes
            Some(""),
            Some("cbcxx"), // 5 bytes
            None,
            Some("AAAAAAAA"),     // 8 bytes
            Some("BBBBBQBBBAAA"), // 12 bytes
            Some("a"),
            Some("cbcxx"),
            Some("b"),
            Some("cbcxx"),
            Some(""),
            None,
            Some("BBBBBQBBBAAA"),
            Some("BBBBBQBBBAAA"),
            Some("AAAAAAAA"),
            Some("CXCCCCCCCCAABB"),
        ]);

        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
        let array: ArrayRef = Arc::new(values);
        set.insert(&array);
        // values must appear in the order they were first inserted
        assert_set(
            set,
            &[
                Some("a"),
                Some("b"),
                Some("CXCCCCCCCCAABB"),
                Some(""),
                Some("cbcxx"),
                None,
                Some("AAAAAAAA"),
                Some("BBBBBQBBBAAA"),
            ],
        );
    }

    #[test]
    fn test_string_set_non_utf8() {
        // despite the test name, all values are valid UTF-8: this checks
        // multi-byte characters (symbols/emoji) of both inline and
        // out-of-line lengths
        let values = GenericByteViewArray::from(vec![
            Some("a"),
            Some("✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥"),
            Some("🔥"),
            Some("✨✨✨"),
            Some("foobarbaz"),
            Some("🔥"),
            Some("✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥"),
        ]);

        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
        let array: ArrayRef = Arc::new(values);
        set.insert(&array);
        // strings must appear in the order they were first inserted
        assert_set(
            set,
            &[
                Some("a"),
                Some("✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥"),
                Some("🔥"),
                Some("✨✨✨"),
                Some("foobarbaz"),
            ],
        );
    }

    // Test use of binary output type
    #[test]
    fn test_binary_set() {
        let v: Vec<Option<&[u8]>> = vec![
            Some(b"a"),
            Some(b"CXCCCCCCCCCCCCC"),
            None,
            Some(b"CXCCCCCCCCCCCCC"),
        ];
        let values: ArrayRef = Arc::new(BinaryViewArray::from(v));

        let expected: Vec<Option<&[u8]>> =
            vec![Some(b"a"), Some(b"CXCCCCCCCCCCCCC"), None];
        let expected: ArrayRef = Arc::new(GenericByteViewArray::from(expected));

        let mut set = ArrowBytesViewSet::new(OutputType::BinaryView);
        set.insert(&values);
        assert_eq!(&set.into_state(), &expected);
    }

    // reported memory grows when new strings are inserted, and is unchanged
    // when the same strings are inserted again
    #[test]
    fn test_string_set_memory_usage() {
        let strings1 = StringViewArray::from(vec![
            Some("a"),
            Some("b"),
            Some("CXCCCCCCCCCCC"), // 13 bytes
            Some("AAAAAAAA"),      // 8 bytes
            Some("BBBBBQBBB"),     // 9 bytes
        ]);
        let total_strings1_len = strings1
            .iter()
            .map(|s| s.map(|s| s.len()).unwrap_or(0))
            .sum::<usize>();
        let values1: ArrayRef = Arc::new(StringViewArray::from(strings1));

        // Much larger strings in strings2
        let strings2 = StringViewArray::from(vec![
            "FOO".repeat(1000),
            "BAR larger than 12 bytes.".repeat(100_000),
            "more unique.".repeat(1000),
            "more unique2.".repeat(1000),
            "FOO".repeat(3000),
        ]);
        let total_strings2_len = strings2
            .iter()
            .map(|s| s.map(|s| s.len()).unwrap_or(0))
            .sum::<usize>();
        let values2: ArrayRef = Arc::new(StringViewArray::from(strings2));

        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
        let size_empty = set.size();

        set.insert(&values1);
        let size_after_values1 = set.size();
        assert!(size_empty < size_after_values1);
        assert!(
            size_after_values1 > total_strings1_len,
            "expect {size_after_values1} to be more than {total_strings1_len}"
        );
        assert!(size_after_values1 < total_strings1_len + total_strings2_len);

        // inserting the same strings should not affect the size
        set.insert(&values1);
        assert_eq!(set.size(), size_after_values1);
        assert_eq!(set.len(), 5);

        // inserting the large strings should increase the reported size
        set.insert(&values2);
        let size_after_values2 = set.size();
        assert!(size_after_values2 > size_after_values1);

        assert_eq!(set.len(), 10);
    }

    #[derive(Debug, PartialEq, Eq, Default, Clone, Copy)]
    struct TestPayload {
        // index of the distinct string, assigned sequentially in the order
        // strings are first seen
        index: usize,
    }

    /// Wraps an [`ArrowBytesViewMap`], validating its invariants
    struct TestMap {
        map: ArrowBytesViewMap<TestPayload>,
        // stores distinct strings seen, in order
        strings: Vec<Option<String>>,
        // map strings to index in strings
        indexes: HashMap<Option<String>, usize>,
    }

    impl TestMap {
        /// Creates an empty `Utf8View` map with no strings recorded yet
        fn new() -> Self {
            Self {
                map: ArrowBytesViewMap::new(OutputType::Utf8View),
                strings: vec![],
                indexes: HashMap::new(),
            }
        }

        /// Inserts strings into the map, asserting that `make_payload_fn` is
        /// called for exactly the newly seen strings (in order) and that
        /// `observe_payload_fn` reports the expected index for every input row
        fn insert(&mut self, strings: &[Option<&str>]) {
            let string_array = StringViewArray::from(strings.to_vec());
            let arr: ArrayRef = Arc::new(string_array);

            let mut next_index = self.indexes.len();
            let mut actual_new_strings = vec![];
            let mut actual_seen_indexes = vec![];
            // update self with new values, keeping track of newly added values
            for str in strings {
                let str = str.map(|s| s.to_string());
                let index = self.indexes.get(&str).cloned().unwrap_or_else(|| {
                    actual_new_strings.push(str.clone());
                    let index = self.strings.len();
                    self.strings.push(str.clone());
                    self.indexes.insert(str, index);
                    index
                });
                actual_seen_indexes.push(index);
            }

            // insert the values into the map, recording what we did
            let mut seen_new_strings = vec![];
            let mut seen_indexes = vec![];
            self.map.insert_if_new(
                &arr,
                |s| {
                    let value = s
                        .map(|s| String::from_utf8(s.to_vec()).expect("Non utf8 string"));
                    let index = next_index;
                    next_index += 1;
                    seen_new_strings.push(value);
                    TestPayload { index }
                },
                |payload| {
                    seen_indexes.push(payload.index);
                },
            );

            assert_eq!(actual_seen_indexes, seen_indexes);
            assert_eq!(actual_new_strings, seen_new_strings);
        }

        /// Calls `self.map.into_state()`, validating that the strings are in
        /// the same order as they were first inserted
        fn into_array(self) -> ArrayRef {
            let Self {
                map,
                strings,
                indexes: _,
            } = self;

            let arr = map.into_state();
            let expected: ArrayRef = Arc::new(StringViewArray::from(strings));
            assert_eq!(&arr, &expected);
            arr
        }
    }

    #[test]
    fn test_map() {
        let input = vec![
            // Note mix of short/long strings
            Some("A"),
            Some("bcdefghijklmnop1234567"),
            Some("X"),
            Some("Y"),
            None,
            Some("qrstuvqxyzhjwya"),
            Some("✨🔥"),
            Some("🔥"),
            Some("🔥🔥🔥🔥🔥🔥"),
        ];

        let mut test_map = TestMap::new();
        test_map.insert(&input);
        test_map.insert(&input); // put it in twice
        let expected_output: ArrayRef = Arc::new(StringViewArray::from(input));
        assert_eq!(&test_map.into_array(), &expected_output);
    }
}