Skip to main content

datafusion_physical_expr_common/
binary_view_map.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ArrowBytesViewMap`] and [`ArrowBytesViewSet`] for storing maps/sets of values from
19//! `StringViewArray`/`BinaryViewArray`.
20use crate::binary_map::OutputType;
21use arrow::array::NullBufferBuilder;
22use arrow::array::cast::AsArray;
23use arrow::array::{Array, ArrayRef, BinaryViewArray, ByteView, make_view};
24use arrow::buffer::{Buffer, ScalarBuffer};
25use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType};
26use datafusion_common::hash_utils::RandomState;
27use datafusion_common::hash_utils::create_hashes;
28use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt};
29use std::fmt::Debug;
30use std::mem::size_of;
31use std::sync::Arc;
32
/// HashSet optimized for storing string or binary values that can produce
/// the final set as a `GenericBinaryViewArray` with minimal copies.
///
/// This is a thin wrapper around [`ArrowBytesViewMap`] with a `()` payload.
#[derive(Debug)]
pub struct ArrowBytesViewSet(ArrowBytesViewMap<()>);
37
38impl ArrowBytesViewSet {
39    pub fn new(output_type: OutputType) -> Self {
40        Self(ArrowBytesViewMap::new(output_type))
41    }
42
43    /// Inserts each value from `values` into the set
44    pub fn insert(&mut self, values: &ArrayRef) {
45        fn make_payload_fn(_value: Option<&[u8]>) {}
46        fn observe_payload_fn(_payload: ()) {}
47        self.0
48            .insert_if_new(values, make_payload_fn, observe_payload_fn);
49    }
50
51    /// Return the contents of this map and replace it with a new empty map with
52    /// the same output type
53    pub fn take(&mut self) -> Self {
54        let mut new_self = Self::new(self.0.output_type);
55        std::mem::swap(self, &mut new_self);
56        new_self
57    }
58
59    /// Converts this set into a `StringViewArray` or `BinaryViewArray`
60    /// containing each distinct value that was interned.
61    /// This is done without copying the values.
62    pub fn into_state(self) -> ArrayRef {
63        self.0.into_state()
64    }
65
66    /// Returns the total number of distinct values (including nulls) seen so far
67    pub fn len(&self) -> usize {
68        self.0.len()
69    }
70
71    pub fn is_empty(&self) -> bool {
72        self.0.is_empty()
73    }
74
75    /// returns the total number of distinct values (not including nulls) seen so far
76    pub fn non_null_len(&self) -> usize {
77        self.0.non_null_len()
78    }
79
80    /// Return the total size, in bytes, of memory used to store the data in
81    /// this set, not including `self`
82    pub fn size(&self) -> usize {
83        self.0.size()
84    }
85}
86
87/// Optimized map for storing Arrow "byte view" types (`StringView`, `BinaryView`)
88/// values that can produce the set of keys on
89/// output as `GenericBinaryViewArray` without copies.
90///
91/// Equivalent to `HashSet<String, V>` but with better performance if you need
92/// to emit the keys as an Arrow `StringViewArray` / `BinaryViewArray`. For other
93/// purposes it is the same as a `HashMap<String, V>`
94///
95/// # Generic Arguments
96///
97/// * `V`: payload type
98///
99/// # Description
100///
101/// This is a specialized HashMap with the following properties:
102///
103/// 1. Optimized for storing and emitting Arrow byte types  (e.g.
104///    `StringViewArray` / `BinaryViewArray`) very efficiently by minimizing copying of
105///    the string values themselves, both when inserting and when emitting the
106///    final array.
107///
108/// 2. Retains the insertion order of entries in the final array. The values are
109///    in the same order as they were inserted.
110///
111/// Note this structure can be used as a `HashSet` by specifying the value type
112/// as `()`, as is done by [`ArrowBytesViewSet`].
113///
114/// This map is used by the special `COUNT DISTINCT` aggregate function to
115/// store the distinct values, and by the `GROUP BY` operator to store
116/// group values when they are a single string array.
117/// Max size of the in-progress buffer before flushing to completed buffers
118const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024;
119
120pub struct ArrowBytesViewMap<V>
121where
122    V: Debug + PartialEq + Eq + Clone + Copy + Default,
123{
124    /// Should the output be StringView or BinaryView?
125    output_type: OutputType,
126    /// Underlying hash set for each distinct value
127    map: hashbrown::hash_table::HashTable<Entry<V>>,
128    /// Total size of the map in bytes
129    map_size: usize,
130
131    /// Views for all stored values (in insertion order)
132    views: Vec<u128>,
133    /// In-progress buffer for out-of-line string data
134    in_progress: Vec<u8>,
135    /// Completed buffers containing string data
136    completed: Vec<Buffer>,
137    /// Tracks null values (true = null)
138    nulls: NullBufferBuilder,
139
140    /// random state used to generate hashes
141    random_state: RandomState,
142    /// buffer that stores hash values (reused across batches to save allocations)
143    hashes_buffer: Vec<u64>,
144    /// `(payload, null_index)` for the 'null' value, if any
145    /// NOTE null_index is the logical index in the final array, not the index
146    /// in the buffer
147    null: Option<(V, usize)>,
148}
149
/// The size, in number of entries, of the initial hash table
/// (the capacity requested from `HashTable::with_capacity` when a map is created)
const INITIAL_MAP_CAPACITY: usize = 512;
152
153impl<V> ArrowBytesViewMap<V>
154where
155    V: Debug + PartialEq + Eq + Clone + Copy + Default,
156{
157    pub fn new(output_type: OutputType) -> Self {
158        Self {
159            output_type,
160            map: hashbrown::hash_table::HashTable::with_capacity(INITIAL_MAP_CAPACITY),
161            map_size: 0,
162            views: Vec::new(),
163            in_progress: Vec::new(),
164            completed: Vec::new(),
165            nulls: NullBufferBuilder::new(0),
166            random_state: RandomState::default(),
167            hashes_buffer: vec![],
168            null: None,
169        }
170    }
171
172    /// Return the contents of this map and replace it with a new empty map with
173    /// the same output type
174    pub fn take(&mut self) -> Self {
175        let mut new_self = Self::new(self.output_type);
176        std::mem::swap(self, &mut new_self);
177        new_self
178    }
179
180    /// Inserts each value from `values` into the map, invoking `payload_fn` for
181    /// each value if *not* already present, deferring the allocation of the
182    /// payload until it is needed.
183    ///
184    /// Note that this is different than a normal map that would replace the
185    /// existing entry
186    ///
187    /// # Arguments:
188    ///
189    /// `values`: array whose values are inserted
190    ///
191    /// `make_payload_fn`:  invoked for each value that is not already present
192    /// to create the payload, in order of the values in `values`
193    ///
194    /// `observe_payload_fn`: invoked once, for each value in `values`, that was
195    /// already present in the map, with corresponding payload value.
196    ///
197    /// # Returns
198    ///
199    /// The payload value for the entry, either the existing value or
200    /// the newly inserted value
201    ///
202    /// # Safety:
203    ///
204    /// Note that `make_payload_fn` and `observe_payload_fn` are only invoked
205    /// with valid values from `values`, not for the `NULL` value.
206    pub fn insert_if_new<MP, OP>(
207        &mut self,
208        values: &ArrayRef,
209        make_payload_fn: MP,
210        observe_payload_fn: OP,
211    ) where
212        MP: FnMut(Option<&[u8]>) -> V,
213        OP: FnMut(V),
214    {
215        // Sanity check array type
216        match self.output_type {
217            OutputType::BinaryView => {
218                assert!(matches!(values.data_type(), DataType::BinaryView));
219                self.insert_if_new_inner::<MP, OP, BinaryViewType>(
220                    values,
221                    make_payload_fn,
222                    observe_payload_fn,
223                )
224            }
225            OutputType::Utf8View => {
226                assert!(matches!(values.data_type(), DataType::Utf8View));
227                self.insert_if_new_inner::<MP, OP, StringViewType>(
228                    values,
229                    make_payload_fn,
230                    observe_payload_fn,
231                )
232            }
233            _ => unreachable!("Utf8/Binary should use `ArrowBytesSet`"),
234        };
235    }
236
237    /// Generic version of [`Self::insert_if_new`] that handles `ByteViewType`
238    /// (both StringView and BinaryView)
239    ///
240    /// Note this is the only function that is generic on [`ByteViewType`], which
241    /// avoids having to template the entire structure,  making the code
242    /// simpler and understand and reducing code bloat due to duplication.
243    ///
244    /// See comments on `insert_if_new` for more details
245    fn insert_if_new_inner<MP, OP, B>(
246        &mut self,
247        values: &ArrayRef,
248        mut make_payload_fn: MP,
249        mut observe_payload_fn: OP,
250    ) where
251        MP: FnMut(Option<&[u8]>) -> V,
252        OP: FnMut(V),
253        B: ByteViewType,
254    {
255        // step 1: compute hashes
256        let batch_hashes = &mut self.hashes_buffer;
257        batch_hashes.clear();
258        batch_hashes.resize(values.len(), 0);
259        create_hashes([values], &self.random_state, batch_hashes)
260            // hash is supported for all types and create_hashes only
261            // returns errors for unsupported types
262            .unwrap();
263
264        // step 2: insert each value into the set, if not already present
265        let values = values.as_byte_view::<B>();
266
267        // Get raw views buffer for direct comparison
268        let input_views = values.views();
269
270        // Ensure lengths are equivalent
271        assert_eq!(values.len(), self.hashes_buffer.len());
272
273        for i in 0..values.len() {
274            let view_u128 = input_views[i];
275            let hash = self.hashes_buffer[i];
276
277            // handle null value via validity bitmap check
278            if values.is_null(i) {
279                let payload = if let Some(&(payload, _offset)) = self.null.as_ref() {
280                    payload
281                } else {
282                    let payload = make_payload_fn(None);
283                    let null_index = self.views.len();
284                    self.views.push(0);
285                    self.nulls.append_null();
286                    self.null = Some((payload, null_index));
287                    payload
288                };
289                observe_payload_fn(payload);
290                continue;
291            }
292
293            // Extract length from the view (first 4 bytes of u128 in little-endian)
294            let len = view_u128 as u32;
295
296            // Check if value already exists
297            let maybe_payload = {
298                // Borrow completed and in_progress for comparison
299                let completed = &self.completed;
300                let in_progress = &self.in_progress;
301
302                self.map
303                    .find(hash, |header| {
304                        if header.hash != hash {
305                            return false;
306                        }
307
308                        // Fast path: inline strings can be compared directly
309                        if len <= 12 {
310                            return header.view == view_u128;
311                        }
312
313                        // For larger strings: first compare the 4-byte prefix
314                        let stored_prefix = (header.view >> 32) as u32;
315                        let input_prefix = (view_u128 >> 32) as u32;
316                        if stored_prefix != input_prefix {
317                            return false;
318                        }
319
320                        // Prefix matched - compare full bytes
321                        let byte_view = ByteView::from(header.view);
322                        let stored_len = byte_view.length as usize;
323                        let buffer_index = byte_view.buffer_index as usize;
324                        let offset = byte_view.offset as usize;
325
326                        let stored_value = if buffer_index < completed.len() {
327                            &completed[buffer_index].as_slice()
328                                [offset..offset + stored_len]
329                        } else {
330                            &in_progress[offset..offset + stored_len]
331                        };
332                        let input_value: &[u8] = values.value(i).as_ref();
333                        stored_value == input_value
334                    })
335                    .map(|entry| entry.payload)
336            };
337
338            let payload = if let Some(payload) = maybe_payload {
339                payload
340            } else {
341                // no existing value, make a new one
342                let (new_view, payload) = if len <= 12 {
343                    // Inline path: bytes are already packed in view_u128.
344                    // The inline ByteView format is [len:u32 LE][data:12 bytes zero-padded],
345                    // so extracting bytes from the u128 avoids a round-trip through
346                    // values.value(i) (which reads the views buffer and returns the same slice).
347                    let view_bytes = view_u128.to_le_bytes();
348                    let value = &view_bytes[4..4 + len as usize];
349                    let payload = make_payload_fn(Some(value));
350                    // For inline strings, the stored view is identical to the input view:
351                    // make_view(value, 0, 0) produces the same u128 as view_u128.
352                    //
353                    // SAFETY: view_u128 was a valid view, and the enclosing `len <= 12`
354                    // ensures it is inline
355                    let new_view = unsafe { self.append_inline_view(view_u128) };
356                    (new_view, payload)
357                } else {
358                    let value: &[u8] = values.value(i).as_ref();
359                    let payload = make_payload_fn(Some(value));
360                    let new_view = self.append_value(value);
361                    (new_view, payload)
362                };
363
364                let new_header = Entry {
365                    view: new_view,
366                    hash,
367                    payload,
368                };
369
370                self.map
371                    .insert_accounted(new_header, |h| h.hash, &mut self.map_size);
372                payload
373            };
374            observe_payload_fn(payload);
375        }
376    }
377
378    /// Converts this set into a `StringViewArray`, or `BinaryViewArray`,
379    /// containing each distinct value
380    /// that was inserted. This is done without copying the values.
381    ///
382    /// The values are guaranteed to be returned in the same order in which
383    /// they were first seen.
384    pub fn into_state(mut self) -> ArrayRef {
385        // Flush any remaining in-progress buffer
386        if !self.in_progress.is_empty() {
387            let flushed = std::mem::take(&mut self.in_progress);
388            self.completed.push(Buffer::from_vec(flushed));
389        }
390
391        // Build null buffer if we have any nulls
392        let null_buffer = self.nulls.finish();
393
394        let views = ScalarBuffer::from(self.views);
395        let array =
396            unsafe { BinaryViewArray::new_unchecked(views, self.completed, null_buffer) };
397
398        match self.output_type {
399            OutputType::BinaryView => Arc::new(array),
400            OutputType::Utf8View => {
401                // SAFETY: all input was valid utf8
402                let array = unsafe { array.to_string_view_unchecked() };
403                Arc::new(array)
404            }
405            _ => unreachable!("Utf8/Binary should use `ArrowBytesMap`"),
406        }
407    }
408
409    /// Append an already-computed inline view (len <= 12) directly, bypassing
410    /// buffer allocation.
411    ///
412    /// Returns the view that was stored (identical to the argument).
413    ///
414    /// # Safety
415    ///
416    /// `view` must be a valid inline `ByteView`: the length field in the low
417    /// 32 bits must be <= 12, and the remaining 12 bytes must hold the
418    /// value's bytes (zero-padded if shorter). Calling with a non-inline view
419    /// would store a value that downstream `views` consumers interpret as
420    /// `[buffer_index, offset]` into the `completed`/`in_progress` buffers,
421    /// which is unsound for any view that didn't originate from a real
422    /// allocation in those buffers.
423    unsafe fn append_inline_view(&mut self, view: u128) -> u128 {
424        self.views.push(view);
425        self.nulls.append_non_null();
426        view
427    }
428
429    /// Append a value to our buffers and return the view pointing to it
430    fn append_value(&mut self, value: &[u8]) -> u128 {
431        let len = value.len();
432        let view = if len <= 12 {
433            make_view(value, 0, 0)
434        } else {
435            // Ensure buffer is big enough
436            if self.in_progress.len() + len > BYTE_VIEW_MAX_BLOCK_SIZE {
437                let flushed = std::mem::replace(
438                    &mut self.in_progress,
439                    Vec::with_capacity(BYTE_VIEW_MAX_BLOCK_SIZE),
440                );
441                self.completed.push(Buffer::from_vec(flushed));
442            }
443
444            let buffer_index = self.completed.len() as u32;
445            let offset = self.in_progress.len() as u32;
446            self.in_progress.extend_from_slice(value);
447
448            make_view(value, buffer_index, offset)
449        };
450
451        self.views.push(view);
452        self.nulls.append_non_null();
453        view
454    }
455
456    /// Total number of entries (including null, if present)
457    pub fn len(&self) -> usize {
458        self.non_null_len() + self.null.map(|_| 1).unwrap_or(0)
459    }
460
461    /// Is the set empty?
462    pub fn is_empty(&self) -> bool {
463        self.map.is_empty() && self.null.is_none()
464    }
465
    /// Number of non null entries
    ///
    /// This is the number of entries in the hash table; the null entry is
    /// tracked separately and is not counted here.
    pub fn non_null_len(&self) -> usize {
        self.map.len()
    }
470
471    /// Return the total size, in bytes, of memory used to store the data in
472    /// this set, not including `self`
473    pub fn size(&self) -> usize {
474        let views_size = self.views.len() * size_of::<u128>();
475        let in_progress_size = self.in_progress.capacity();
476        let completed_size: usize = self.completed.iter().map(|b| b.len()).sum();
477        let nulls_size = self.nulls.allocated_size();
478
479        self.map_size
480            + views_size
481            + in_progress_size
482            + completed_size
483            + nulls_size
484            + self.hashes_buffer.allocated_size()
485    }
486}
487
488impl<V> Debug for ArrowBytesViewMap<V>
489where
490    V: Debug + PartialEq + Eq + Clone + Copy + Default,
491{
492    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
493        f.debug_struct("ArrowBytesMap")
494            .field("map", &"<map>")
495            .field("map_size", &self.map_size)
496            .field("views_len", &self.views.len())
497            .field("completed_buffers", &self.completed.len())
498            .field("random_state", &self.random_state)
499            .field("hashes_buffer", &self.hashes_buffer)
500            .finish()
501    }
502}
503
/// Entry in the hash table -- see [`ArrowBytesViewMap`] for more details
///
/// Stores the view pointing to our internal buffers, eliminating the need
/// for a separate builder index. For inline strings (<=12 bytes), the view
/// contains the entire value. For out-of-line strings, the view contains
/// buffer_index and offset pointing directly to our storage.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
struct Entry<V>
where
    V: Debug + PartialEq + Eq + Clone + Copy + Default,
{
    /// The u128 view pointing to our internal buffers. For inline strings,
    /// this contains the complete value. For larger strings, this contains
    /// the buffer_index/offset into our completed/in_progress buffers.
    view: u128,

    /// Hash of the value, as computed when it was inserted. Kept so the
    /// table can re-hash entries and so lookups can reject entries with a
    /// different hash before comparing any bytes.
    hash: u64,

    /// value stored by the entry
    payload: V,
}
525
526#[cfg(test)]
527mod tests {
528    use arrow::array::{GenericByteViewArray, StringViewArray};
529    use datafusion_common::HashMap;
530
531    use super::*;
532
533    // asserts that the set contains the expected strings, in the same order
534    fn assert_set(set: ArrowBytesViewSet, expected: &[Option<&str>]) {
535        let strings = set.into_state();
536        let strings = strings.as_string_view();
537        let state = strings.into_iter().collect::<Vec<_>>();
538        assert_eq!(state, expected);
539    }
540
541    #[test]
542    fn string_view_set_empty() {
543        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
544        let array: ArrayRef = Arc::new(StringViewArray::new_null(0));
545        set.insert(&array);
546        assert_eq!(set.len(), 0);
547        assert_eq!(set.non_null_len(), 0);
548        assert_set(set, &[]);
549    }
550
551    #[test]
552    fn string_view_set_one_null() {
553        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
554        let array: ArrayRef = Arc::new(StringViewArray::new_null(1));
555        set.insert(&array);
556        assert_eq!(set.len(), 1);
557        assert_eq!(set.non_null_len(), 0);
558        assert_set(set, &[None]);
559    }
560
561    #[test]
562    fn string_view_set_many_null() {
563        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
564        let array: ArrayRef = Arc::new(StringViewArray::new_null(11));
565        set.insert(&array);
566        assert_eq!(set.len(), 1);
567        assert_eq!(set.non_null_len(), 0);
568        assert_set(set, &[None]);
569    }
570
571    #[test]
572    fn test_string_view_set_basic() {
573        // basic test for mixed small and large string values
574        let values = GenericByteViewArray::from(vec![
575            Some("a"),
576            Some("b"),
577            Some("CXCCCCCCCCAABB"), // 14 bytes
578            Some(""),
579            Some("cbcxx"), // 5 bytes
580            None,
581            Some("AAAAAAAA"),     // 8 bytes
582            Some("BBBBBQBBBAAA"), // 12 bytes
583            Some("a"),
584            Some("cbcxx"),
585            Some("b"),
586            Some("cbcxx"),
587            Some(""),
588            None,
589            Some("BBBBBQBBBAAA"),
590            Some("BBBBBQBBBAAA"),
591            Some("AAAAAAAA"),
592            Some("CXCCCCCCCCAABB"),
593        ]);
594
595        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
596        let array: ArrayRef = Arc::new(values);
597        set.insert(&array);
598        // values mut appear be in the order they were inserted
599        assert_set(
600            set,
601            &[
602                Some("a"),
603                Some("b"),
604                Some("CXCCCCCCCCAABB"),
605                Some(""),
606                Some("cbcxx"),
607                None,
608                Some("AAAAAAAA"),
609                Some("BBBBBQBBBAAA"),
610            ],
611        );
612    }
613
614    #[test]
615    fn test_string_set_non_utf8() {
616        // basic test for mixed small and large string values
617        let values = GenericByteViewArray::from(vec![
618            Some("a"),
619            Some("✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥"),
620            Some("🔥"),
621            Some("✨✨✨"),
622            Some("foobarbaz"),
623            Some("🔥"),
624            Some("✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥"),
625        ]);
626
627        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
628        let array: ArrayRef = Arc::new(values);
629        set.insert(&array);
630        // strings mut appear be in the order they were inserted
631        assert_set(
632            set,
633            &[
634                Some("a"),
635                Some("✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥✨🔥"),
636                Some("🔥"),
637                Some("✨✨✨"),
638                Some("foobarbaz"),
639            ],
640        );
641    }
642
643    // Test use of binary output type
644    #[test]
645    fn test_binary_set() {
646        let v: Vec<Option<&[u8]>> = vec![
647            Some(b"a"),
648            Some(b"CXCCCCCCCCCCCCC"),
649            None,
650            Some(b"CXCCCCCCCCCCCCC"),
651        ];
652        let values: ArrayRef = Arc::new(BinaryViewArray::from(v));
653
654        let expected: Vec<Option<&[u8]>> =
655            vec![Some(b"a"), Some(b"CXCCCCCCCCCCCCC"), None];
656        let expected: ArrayRef = Arc::new(GenericByteViewArray::from(expected));
657
658        let mut set = ArrowBytesViewSet::new(OutputType::BinaryView);
659        set.insert(&values);
660        assert_eq!(&set.into_state(), &expected);
661    }
662
663    // inserting strings into the set does not increase reported memory
664    #[test]
665    fn test_string_set_memory_usage() {
666        let strings1 = StringViewArray::from(vec![
667            Some("a"),
668            Some("b"),
669            Some("CXCCCCCCCCCCC"), // 13 bytes
670            Some("AAAAAAAA"),      // 8 bytes
671            Some("BBBBBQBBB"),     // 9 bytes
672        ]);
673        let total_strings1_len = strings1
674            .iter()
675            .map(|s| s.map(|s| s.len()).unwrap_or(0))
676            .sum::<usize>();
677        let values1: ArrayRef = Arc::new(StringViewArray::from(strings1));
678
679        // Much larger strings in strings2
680        let strings2 = StringViewArray::from(vec![
681            "FOO".repeat(1000),
682            "BAR larger than 12 bytes.".repeat(100_000),
683            "more unique.".repeat(1000),
684            "more unique2.".repeat(1000),
685            "FOO".repeat(3000),
686        ]);
687        let total_strings2_len = strings2
688            .iter()
689            .map(|s| s.map(|s| s.len()).unwrap_or(0))
690            .sum::<usize>();
691        let values2: ArrayRef = Arc::new(StringViewArray::from(strings2));
692
693        let mut set = ArrowBytesViewSet::new(OutputType::Utf8View);
694        let size_empty = set.size();
695
696        set.insert(&values1);
697        let size_after_values1 = set.size();
698        assert!(size_empty < size_after_values1);
699        assert!(
700            size_after_values1 > total_strings1_len,
701            "expect {size_after_values1} to be more than {total_strings1_len}"
702        );
703        assert!(size_after_values1 < total_strings1_len + total_strings2_len);
704
705        // inserting the same strings should not affect the size
706        set.insert(&values1);
707        assert_eq!(set.size(), size_after_values1);
708        assert_eq!(set.len(), 5);
709
710        // inserting the large strings should increase the reported size
711        set.insert(&values2);
712        let size_after_values2 = set.size();
713        assert!(size_after_values2 > size_after_values1);
714
715        assert_eq!(set.len(), 10);
716    }
717
    /// Payload stored in the map under test: the sequence number assigned to
    /// a string when it was first inserted.
    #[derive(Debug, PartialEq, Eq, Default, Clone, Copy)]
    struct TestPayload {
        index: usize, // store the index of the string (each new string gets the next sequential input)
    }
723
    /// Wraps an [`ArrowBytesViewMap`], validating its invariants
    ///
    /// `strings` and `indexes` form an independent reference model that the
    /// wrapped map's callbacks and output are checked against.
    struct TestMap {
        map: ArrowBytesViewMap<TestPayload>,
        // stores distinct strings seen, in order
        strings: Vec<Option<String>>,
        // map strings to index in strings
        indexes: HashMap<Option<String>, usize>,
    }
732
733    impl TestMap {
734        /// creates a map with TestPayloads for the given strings and then
735        /// validates the payloads
736        fn new() -> Self {
737            Self {
738                map: ArrowBytesViewMap::new(OutputType::Utf8View),
739                strings: vec![],
740                indexes: HashMap::new(),
741            }
742        }
743
744        /// Inserts strings into the map
745        fn insert(&mut self, strings: &[Option<&str>]) {
746            let string_array = StringViewArray::from(strings.to_vec());
747            let arr: ArrayRef = Arc::new(string_array);
748
749            let mut next_index = self.indexes.len();
750            let mut actual_new_strings = vec![];
751            let mut actual_seen_indexes = vec![];
752            // update self with new values, keeping track of newly added values
753            for str in strings {
754                let str = str.map(|s| s.to_string());
755                let index = self.indexes.get(&str).cloned().unwrap_or_else(|| {
756                    actual_new_strings.push(str.clone());
757                    let index = self.strings.len();
758                    self.strings.push(str.clone());
759                    self.indexes.insert(str, index);
760                    index
761                });
762                actual_seen_indexes.push(index);
763            }
764
765            // insert the values into the map, recording what we did
766            let mut seen_new_strings = vec![];
767            let mut seen_indexes = vec![];
768            self.map.insert_if_new(
769                &arr,
770                |s| {
771                    let value = s
772                        .map(|s| String::from_utf8(s.to_vec()).expect("Non utf8 string"));
773                    let index = next_index;
774                    next_index += 1;
775                    seen_new_strings.push(value);
776                    TestPayload { index }
777                },
778                |payload| {
779                    seen_indexes.push(payload.index);
780                },
781            );
782
783            assert_eq!(actual_seen_indexes, seen_indexes);
784            assert_eq!(actual_new_strings, seen_new_strings);
785        }
786
787        /// Call `self.map.into_array()` validating that the strings are in the same
788        /// order as they were inserted
789        fn into_array(self) -> ArrayRef {
790            let Self {
791                map,
792                strings,
793                indexes: _,
794            } = self;
795
796            let arr = map.into_state();
797            let expected: ArrayRef = Arc::new(StringViewArray::from(strings));
798            assert_eq!(&arr, &expected);
799            arr
800        }
801    }
802
803    #[test]
804    fn test_map() {
805        let input = vec![
806            // Note mix of short/long strings
807            Some("A"),
808            Some("bcdefghijklmnop1234567"),
809            Some("X"),
810            Some("Y"),
811            None,
812            Some("qrstuvqxyzhjwya"),
813            Some("✨🔥"),
814            Some("🔥"),
815            Some("🔥🔥🔥🔥🔥🔥"),
816        ];
817
818        let mut test_map = TestMap::new();
819        test_map.insert(&input);
820        test_map.insert(&input); // put it in twice
821        let expected_output: ArrayRef = Arc::new(StringViewArray::from(input));
822        assert_eq!(&test_map.into_array(), &expected_output);
823    }
824}