lance_encoding/
repdef.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities for rep-def levels
//!
//! Repetition and definition levels are a way to encode multiple validity / offsets arrays
//! into a single buffer.  They are a form of "zipping" buffers together that takes advantage
//! of the fact that, if the outermost array is invalid, then the validity of the inner items
//! is irrelevant.
//!
//! Note: the concept of repetition & definition levels comes from the Dremel paper and has
//! been implemented in Apache Parquet.  However, the implementation here is not necessarily
//! compatible with Parquet.  For example, we use 0 to represent the "inner-most" item and
//! Parquet uses 0 to represent the "outer-most" item.
//!
//! # Repetition Levels
//!
//! With repetition levels we convert a sparse array of offsets into a dense array of levels.
//! These levels are marked non-zero whenever a new list begins.  In other words, given the
//! list array with 3 rows [{<0,1>, <>, <2>}, {<3>}, {}], [], [{<4>}] we would have three
//! offsets arrays:
//!
//! Outer-most ([]): [0, 3, 3, 4]
//! Middle     ({}): [0, 3, 4, 4, 5]
//! Inner      (<>): [0, 2, 2, 3, 4, 5]
//! Values         : [0, 1, 2, 3, 4]
//!
//! We can convert these into repetition levels as follows:
//!
//! | Values | Repetition |
//! | ------ | ---------- |
//! |      0 |          3 | // Start of outer-most list
//! |      1 |          0 | // Continues inner-most list (no new lists)
//! |      - |          1 | // Start of new inner-most list (empty list)
//! |      2 |          1 | // Start of new inner-most list
//! |      3 |          2 | // Start of new middle list
//! |      - |          2 | // Start of new middle list (empty list)
//! |      - |          3 | // Start of new outer-most list (empty list)
//! |      4 |          3 | // Start of new outer-most list
//!
//! Note: We actually have MORE repetition levels than values.  This is because the repetition
//! levels need to be able to represent empty lists.
//!
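//! A minimal sketch, not the code used in this module: the function below derives one
//! repetition level per value (or per empty list) for the example lists above by walking
//! the nesting directly.  The name `repetition_levels` and the nested `Vec` input are
//! assumptions made for illustration; the real encoder works from Arrow offsets buffers.
//!
```rust
/// Hypothetical helper: emit `(value, repetition level)` pairs for a list with
/// three levels of nesting.  `None` stands for an empty list, which produces a
/// level but no value.
fn repetition_levels(rows: &[Vec<Vec<Vec<i32>>>]) -> Vec<(Option<i32>, u16)> {
    let mut out = Vec::new();
    for outer in rows {
        // The next level emitted for this row marks the start of an outer-most list.
        let mut pending: u16 = 3;
        if outer.is_empty() {
            out.push((None, pending));
            continue;
        }
        for middle in outer {
            if middle.is_empty() {
                out.push((None, pending));
            }
            for inner in middle {
                if inner.is_empty() {
                    out.push((None, pending));
                }
                for (i, value) in inner.iter().enumerate() {
                    // Only the first value of a list carries a non-zero level.
                    out.push((Some(*value), if i == 0 { pending } else { 0 }));
                }
                // Later siblings only start a new inner-most list.
                pending = 1;
            }
            // Later siblings start a new middle list.
            pending = 2;
        }
    }
    out
}
```
//! Calling it on the three example rows gives the levels 3, 0, 1, 1, 2, 2, 3, 3, with `None`
//! in place of a value for each empty list.
//!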
//! # Definition Levels
//!
//! Definition levels are simpler.  We can think of them as zipping together various validity bitmaps
//! (from different levels of nesting) into a single buffer.  For example, we could zip the arrays
//! [1, 1, 0, 0] and [1, 0, 1, 0] into [11, 10, 01, 00].  However, 00 and 01 are redundant.  If the
//! outer level is null then the validity of the inner levels is irrelevant.  To save space we instead
//! encode a "level" which is the "depth" of the null.  Let's look at a more complete example:
//!
//! Array: [{"middle": {"inner": 1}}, NULL, {"middle": NULL}, {"middle": {"inner": NULL}}]
//!
//! In Arrow we would have the following validity arrays:
//! Outer validity : 1, 0, 1, 1
//! Middle validity: 1, ?, 0, 1
//! Inner validity : 1, ?, ?, 0
//! Values         : 1, ?, ?, ?
//!
//! The ? values are undefined in the Arrow format.  We can convert these into definition levels as follows:
//!
//! | Values | Definition |
//! | ------ | ---------- |
//! |      1 |          0 | // Valid at all levels
//! |      - |          3 | // Null at outer level
//! |      - |          2 | // Null at middle level
//! |      - |          1 | // Null at inner level
//!
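//! The zipping described above can be sketched as follows.  This is an illustration only,
//! assuming exactly three validity arrays given as `bool` slices; `definition_levels` is a
//! hypothetical name and is not part of this module.
//!
```rust
/// Hypothetical helper: zip three validity arrays (outer-most first) into one
/// definition level per row.  0 means valid at every level; otherwise the level
/// is the depth of the outer-most null (3 = outer, 2 = middle, 1 = inner).
fn definition_levels(outer: &[bool], middle: &[bool], inner: &[bool]) -> Vec<u16> {
    outer
        .iter()
        .zip(middle)
        .zip(inner)
        .map(|((&o, &m), &i)| {
            if !o {
                3 // Inner validity is irrelevant once the outer level is null.
            } else if !m {
                2
            } else if !i {
                1
            } else {
                0
            }
        })
        .collect()
}
```
//! For the example array, the result is [0, 3, 2, 1] whatever the undefined `?` entries hold.
//!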
//! # Compression
//!
//! Note that we only need 2 bits of definition levels to represent 3 levels of nesting.  Definition
//! levels are always more compact than the input validity arrays.  However, compressed levels are not
//! necessarily more compact than the compressed validity arrays.
//!
//! Repetition levels are more complex.  If there are very large lists then a sparse array of offsets
//! (which has one element per list) might be more compact than a dense array of repetition levels
//! (which has one element per list value, possibly even more if there are empty lists).
//!
//! Both repetition levels and definition levels are typically very compressible with RLE.
//!
//! That said, in Lance we don't always take advantage of that compression because we want to be able
//! to zip rep-def levels together with our values.  This gives us fewer IOPS when accessing row values.
//!
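//! The "2 bits for 3 levels of nesting" arithmetic can be checked with a small sketch.
//! `bits_per_level` is a hypothetical helper written for this note; the module itself
//! imports `log_2_ceil` from `lance_core`, whose exact signature is not shown here.
//!
```rust
/// Hypothetical helper: number of bits needed to store any level in `0..=max_level`.
fn bits_per_level(max_level: u16) -> u32 {
    // `max_level + 1` distinct values need ceil(log2(max_level + 1)) bits.
    let distinct = u32::from(max_level) + 1;
    u32::BITS - (distinct - 1).leading_zeros()
}
```
//! With three nullable layers the levels range over 0..=3, so `bits_per_level(3)` is 2,
//! compared with 3 bits per row for three separate validity bitmaps.
//!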
//! # Utilities in this Module
//!
//! - `RepDefBuilder` - Extracts validity and offset information from Arrow arrays.  We use this as we
//!   shred the incoming data into primitive leaf arrays.  We don't immediately convert into rep-def because
//!   we need to share and cheaply clone the builder when we have structs (each struct child shares some parent
//!   validity / offset information).  The `serialize` method is called once all data has been received to create
//!   the final rep-def levels.
//!
//! - `SerializerContext` - This is an internal utility that helps with serializing rep-def levels.
//!
//! - `CompositeRepDefUnraveler` - This structure is used to reverse the process.  It starts with a set of
//!   rep-def levels and then uses the `unravel_validity` and `unravel_offsets` methods to produce validity
//!   buffers and offset buffers.  It is "composite" because we may be combining sets of rep-def buffers from
//!   multiple locations (e.g. multiple blocks in a mini-block encoded file).
//!
//! - `RepDefSlicer` - This is a utility that helps with slicing rep-def buffers.  These buffers are "kind of"
//!   transparent (maps 1:1 with the values in the array) but not exactly because of the special (empty/null) lists.
//!   The slicer helps with this issue and is used when slicing a rep-def buffer into mini-blocks.
//!
//! - `build_control_word_iterator` - This takes in rep-def levels and returns an iterator that returns byte-padded
//!   "control words" which are used when creating full-zip encoded data.
//!
//! - `ControlWordParser` - This parser can parse the control words returned by `build_control_word_iterator` and is
//!   used when decoding full-zip encoded data.

use std::{
    iter::{Copied, Zip},
    sync::Arc,
};

use arrow_array::OffsetSizeTrait;
use arrow_buffer::{
    ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
};
use lance_core::{Error, Result, utils::bit::log_2_ceil};

use crate::buffer::LanceBuffer;

pub type LevelBuffer = Vec<u16>;

// As we build def levels we add this to special values to indicate that they
// are special so that we can skip over them when processing lower levels.
//
// We assume 16 bits is good enough for rep-def levels.  This _would_ give
// us 65536 levels of struct nesting and list nesting.  However, we cut that
// in half for SPECIAL_THRESHOLD because we use the top bit to indicate if an
// item is a special value (null list / empty list) during construction.
//
// We subtract this off at the end of construction to get the actual definition
// levels.
const SPECIAL_THRESHOLD: u16 = u16::MAX / 2;
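
// A minimal sketch of the marking scheme described in the comment above, kept
// separate from the real serializer.  `THRESHOLD`, `mark_special`, `is_special`
// and `normalize` are hypothetical names; only the arithmetic (add the threshold
// during construction, subtract it at the end) comes from this file.

```rust
// Same value as SPECIAL_THRESHOLD in this module.
const THRESHOLD: u16 = u16::MAX / 2;

/// Tag a non-zero definition level as belonging to an empty/null list.
fn mark_special(level: u16) -> u16 {
    assert!(level >= 1 && level <= THRESHOLD);
    level + THRESHOLD
}

/// A level is special if it lies strictly above the threshold.
fn is_special(level: u16) -> bool {
    level > THRESHOLD
}

/// Strip the tag once construction is finished.
fn normalize(levels: &mut [u16]) {
    for level in levels.iter_mut() {
        if is_special(*level) {
            *level -= THRESHOLD;
        }
    }
}
```

// For example, `[0, mark_special(2), 1]` normalizes back to `[0, 2, 1]`.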

/// Represents information that we extract from a list array as we are
/// encoding
#[derive(Clone, Debug)]
struct OffsetDesc {
    offsets: Arc<[i64]>,
    validity: Option<BooleanBuffer>,
    has_empty_lists: bool,
    num_values: usize,
    num_specials: usize,
}

/// Represents validity information that we extract from non-list arrays (that
/// have nulls) as we are encoding
#[derive(Clone, Debug)]
struct ValidityDesc {
    validity: Option<BooleanBuffer>,
    num_values: usize,
}

/// Represents validity information that we extract from FSL arrays.  This is
/// just validity (no offsets) but we also record the dimension of the FSL array
/// as that will impact the next layer
#[derive(Clone, Debug)]
struct FslDesc {
    validity: Option<BooleanBuffer>,
    dimension: usize,
    num_values: usize,
}

// As we build up rep/def from arrow arrays we record a
// series of RawRepDef objects.  Each one corresponds to a layer
// in the array structure
#[derive(Clone, Debug)]
enum RawRepDef {
    Offsets(OffsetDesc),
    Validity(ValidityDesc),
    Fsl(FslDesc),
}

impl RawRepDef {
    // Are there any nulls in this layer
    fn has_nulls(&self) -> bool {
        match self {
            Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
            Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
            Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
        }
    }

    // How many values are in this layer
    fn num_values(&self) -> usize {
        match self {
            Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
            Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
            Self::Fsl(FslDesc { num_values, .. }) => *num_values,
        }
    }

    /// How many empty/null lists are in this layer
    fn num_specials(&self) -> usize {
        match self {
            Self::Offsets(OffsetDesc { num_specials, .. }) => *num_specials,
            _ => 0,
        }
    }

    /// How many definition levels do we need for this layer
    fn max_def(&self) -> u16 {
        match self {
            Self::Offsets(OffsetDesc {
                has_empty_lists,
                validity,
                ..
            }) => {
                let mut max_def = 0;
                if *has_empty_lists {
                    max_def += 1;
                }
                if validity.is_some() {
                    max_def += 1;
                }
                max_def
            }
            Self::Validity(ValidityDesc { validity: None, .. }) => 0,
            Self::Validity(ValidityDesc { .. }) => 1,
            Self::Fsl(FslDesc { validity: None, .. }) => 0,
            Self::Fsl(FslDesc { .. }) => 1,
        }
    }

    /// How many repetition levels do we need for this layer
    fn max_rep(&self) -> u16 {
        match self {
            Self::Offsets(_) => 1,
            _ => 0,
        }
    }
}

/// Represents repetition and definition levels that have been
/// serialized into a pair of (optional) level buffers
#[derive(Debug)]
pub struct SerializedRepDefs {
    /// The repetition levels, one per item
    ///
    /// If None, there are no lists
    pub repetition_levels: Option<Arc<[u16]>>,
    /// The definition levels, one per item
    ///
    /// If None, there are no nulls
    pub definition_levels: Option<Arc<[u16]>>,
    /// The meaning of each definition level
    pub def_meaning: Vec<DefinitionInterpretation>,
    /// The maximum level that is "visible" from the lowest level
    ///
    /// This is the last level before we encounter a list level of some kind.  Once we've
    /// hit a list level then nulls in any level beyond do not map to actual items.
    ///
    /// This is None if there are no lists
    pub max_visible_level: Option<u16>,
}

impl SerializedRepDefs {
    pub fn new(
        repetition_levels: Option<LevelBuffer>,
        definition_levels: Option<LevelBuffer>,
        def_meaning: Vec<DefinitionInterpretation>,
    ) -> Self {
        let first_list = def_meaning.iter().position(|level| level.is_list());
        let max_visible_level = first_list.map(|first_list| {
            def_meaning
                .iter()
                .map(|level| level.num_def_levels())
                .take(first_list)
                .sum::<u16>()
        });
        Self {
            repetition_levels: repetition_levels.map(Arc::from),
            definition_levels: definition_levels.map(Arc::from),
            def_meaning,
            max_visible_level,
        }
    }

    /// Creates an empty SerializedRepDefs (no repetition, all valid)
    pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
        Self {
            repetition_levels: None,
            definition_levels: None,
            def_meaning,
            max_visible_level: None,
        }
    }

    pub fn rep_slicer(&self) -> Option<RepDefSlicer<'_>> {
        self.repetition_levels
            .as_ref()
            .map(|rep| RepDefSlicer::new(self, rep.clone()))
    }

    pub fn def_slicer(&self) -> Option<RepDefSlicer<'_>> {
        self.definition_levels
            .as_ref()
            .map(|def| RepDefSlicer::new(self, def.clone()))
    }
}
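
// A standalone sketch of how `max_visible_level` falls out of `def_meaning`:
// sum the definition levels of every layer that comes before the first list
// layer.  The `Layer` enum is a hypothetical stand-in that mirrors
// `DefinitionInterpretation`; it exists only so the example compiles on its own.

```rust
#[derive(Clone, Copy)]
enum Layer {
    AllValidItem,
    AllValidList,
    NullableItem,
    NullableList,
    EmptyableList,
    NullableAndEmptyableList,
}

impl Layer {
    fn num_def_levels(self) -> u16 {
        match self {
            Layer::AllValidItem | Layer::AllValidList => 0,
            Layer::NullableItem | Layer::NullableList | Layer::EmptyableList => 1,
            Layer::NullableAndEmptyableList => 2,
        }
    }

    fn is_list(self) -> bool {
        !matches!(self, Layer::AllValidItem | Layer::NullableItem)
    }
}

/// Returns None when there is no list layer at all.
fn max_visible_level(def_meaning: &[Layer]) -> Option<u16> {
    let first_list = def_meaning.iter().position(|layer| layer.is_list())?;
    Some(
        def_meaning[..first_list]
            .iter()
            .map(|layer| layer.num_def_levels())
            .sum(),
    )
}
```

// With [NullableItem, NullableItem, NullableList] the result is Some(2): levels
// 0, 1 and 2 map to actual items and anything higher refers to a list.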

/// Slices a level buffer into pieces
///
/// This is needed to handle the fact that a level buffer may have more
/// levels than values due to special (empty/null) lists.
///
/// As a result, a call to `slice_next(10)` may return 10 levels or it may
/// return more than 10 levels if any special values are encountered.
#[derive(Debug)]
pub struct RepDefSlicer<'a> {
    repdef: &'a SerializedRepDefs,
    to_slice: LanceBuffer,
    current: usize,
}

// TODO: All of this logic will need some changing when we compress rep/def levels.
impl<'a> RepDefSlicer<'a> {
    fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
        Self {
            repdef,
            to_slice: LanceBuffer::reinterpret_slice(levels),
            current: 0,
        }
    }

    pub fn num_levels(&self) -> usize {
        self.to_slice.len() / 2
    }

    pub fn num_levels_remaining(&self) -> usize {
        self.num_levels() - self.current
    }

    pub fn all_levels(&self) -> &LanceBuffer {
        &self.to_slice
    }

    /// Returns the rest of the levels not yet sliced
    ///
    /// This must be called instead of `slice_next` on the final iteration.
    /// This is because anytime we slice there may be empty/null lists on the
    /// boundary that are "free" and the current behavior in `slice_next` is to
    /// leave them for the next call.
    ///
    /// `slice_rest` will slice all remaining levels and return them.
    pub fn slice_rest(&mut self) -> LanceBuffer {
        let start = self.current;
        let remaining = self.num_levels_remaining();
        self.current = self.num_levels();
        self.to_slice.slice_with_length(start * 2, remaining * 2)
    }

    /// Returns enough levels to satisfy the next `num_values` values
    pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
        let start = self.current;
        let Some(max_visible_level) = self.repdef.max_visible_level else {
            // No lists, should be 1:1 mapping from levels to values
            self.current = start + num_values;
            return self.to_slice.slice_with_length(start * 2, num_values * 2);
        };
        if let Some(def) = self.repdef.definition_levels.as_ref() {
            // There are lists and there are def levels.  That means there may be
            // more rep/def levels than values.  We need to scan the def levels to figure
            // out which items are "invisible" and skip over them
            let mut def_itr = def[start..].iter();
            let mut num_taken = 0;
            let mut num_passed = 0;
            while num_taken < num_values {
                let def_level = *def_itr.next().unwrap();
                if def_level <= max_visible_level {
                    num_taken += 1;
                }
                num_passed += 1;
            }
            self.current = start + num_passed;
            self.to_slice.slice_with_length(start * 2, num_passed * 2)
        } else {
            // No def levels, should be 1:1 mapping from levels to values
            self.current = start + num_values;
            self.to_slice.slice_with_length(start * 2, num_values * 2)
        }
    }
}
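
// The scan inside `slice_next` can be illustrated on a plain slice.  This is a
// sketch under simplifying assumptions: `levels_to_cover` is a hypothetical
// name, and unlike `slice_next` it stops quietly instead of panicking when the
// buffer holds fewer than `num_values` visible levels.

```rust
/// Count how many definition levels must be consumed to cover `num_values`
/// visible values.  Levels above `max_visible_level` are empty/null lists that
/// take up a level but no value.
fn levels_to_cover(def: &[u16], max_visible_level: u16, num_values: usize) -> usize {
    let mut taken = 0;
    let mut passed = 0;
    for &level in def {
        if taken == num_values {
            // Any special levels right after this point are left for the next call.
            break;
        }
        if level <= max_visible_level {
            taken += 1;
        }
        passed += 1;
    }
    passed
}
```

// With def = [0, 2, 0, 1, 0] and max_visible_level = 1, covering 3 values
// consumes 4 levels because the level 2 entry is an invisible list marker.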

/// This tells us how an array handles definition.  Given a stack of
/// these and a nested array and a set of definition levels we can calculate
/// how we should interpret the definition levels.
///
/// For example, if the interpretation is [AllValidItem, NullableItem] then
/// a 0 means "valid item" and a 1 means "null struct".  If the interpretation
/// is [NullableItem, NullableItem] then a 0 means "valid item" and a 1 means
/// "null item" and a 2 means "null struct".
///
/// Lists are tricky because we might use up to two definition levels for a
/// single layer of list nesting because we need one value to indicate "empty list"
/// and another value to indicate "null list".
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DefinitionInterpretation {
    AllValidItem,
    AllValidList,
    NullableItem,
    NullableList,
    EmptyableList,
    NullableAndEmptyableList,
}

impl DefinitionInterpretation {
    /// How many definition levels do we need for this layer
    pub fn num_def_levels(&self) -> u16 {
        match self {
            Self::AllValidItem => 0,
            Self::AllValidList => 0,
            Self::NullableItem => 1,
            Self::NullableList => 1,
            Self::EmptyableList => 1,
            Self::NullableAndEmptyableList => 2,
        }
    }

    /// Is this layer free of nulls (i.e. all valid)?
    pub fn is_all_valid(&self) -> bool {
        matches!(
            self,
            Self::AllValidItem | Self::AllValidList | Self::EmptyableList
        )
    }

    /// Does this layer represent a list?
    pub fn is_list(&self) -> bool {
        matches!(
            self,
            Self::AllValidList
                | Self::NullableList
                | Self::EmptyableList
                | Self::NullableAndEmptyableList
        )
    }
}

/// The RepDefBuilder is used to collect offsets & validity buffers
/// from arrow structures.  Once we have those we use the SerializerContext
/// to build the actual repetition and definition levels.
///
/// We know ahead of time how many rep/def levels we will need (number of items
/// in inner-most array + the number of empty/null lists in any parent arrays).
///
/// As a result we try to avoid any re-allocations by pre-allocating the buffers
/// up front.  We allocate two copies of each buffer which allows us to avoid unsafe
/// code caused by reading and writing to the same buffer (also, it's unavoidable
/// because there are times we need to write 'faster' than we read)
#[derive(Debug)]
struct SerializerContext {
    // This is built from outer-to-inner and then reversed at the end
    def_meaning: Vec<DefinitionInterpretation>,
    rep_levels: LevelBuffer,
    spare_rep: LevelBuffer,
    def_levels: LevelBuffer,
    spare_def: LevelBuffer,
    current_rep: u16,
    current_def: u16,
    current_len: usize,
    current_num_specials: usize,
}

impl SerializerContext {
    fn new(len: usize, num_layers: usize, max_rep: u16, max_def: u16) -> Self {
        let def_meaning = Vec::with_capacity(num_layers);
        Self {
            rep_levels: if max_rep > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            spare_rep: if max_rep > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            def_levels: if max_def > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            spare_def: if max_def > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            def_meaning,
            current_rep: max_rep,
            current_def: max_def,
            current_len: 0,
            current_num_specials: 0,
        }
    }

    fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
        let def = self.current_def;
        self.current_def -= meaning.num_def_levels();
        self.def_meaning.push(meaning);
        def
    }

    fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
        let rep_level = self.current_rep;
        let (null_list_level, empty_list_level) =
            match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
                (true, true) => {
                    let level =
                        self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
                    (level - 1, level)
                }
                (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
                (false, true) => (
                    0,
                    self.checkout_def(DefinitionInterpretation::EmptyableList),
                ),
                (false, false) => {
                    self.checkout_def(DefinitionInterpretation::AllValidList);
                    (0, 0)
                }
            };
        self.current_rep -= 1;

        if let Some(validity) = &offset_desc.validity {
            self.do_record_validity(validity, null_list_level);
        }

        // We write into the spare buffers and read from the active buffers
        // and then swap at the end.  This way we don't write over what we
        // are reading.

        let mut new_len = 0;
        let expected_len = offset_desc.num_values + self.current_num_specials;
        if expected_len == 0 {
            // Offsets [0] mean no list values, so no levels.
            self.current_len = 0;
            return;
        }
        assert!(self.rep_levels.len() >= expected_len - 1);
        if self.def_levels.is_empty() {
            let mut write_itr = self.spare_rep.iter_mut();
            let mut read_iter = self.rep_levels.iter().copied();
            for w in offset_desc.offsets.windows(2) {
                let len = w[1] - w[0];
                // len can't be 0 because then we'd have def levels
                assert!(len > 0);
                let rep = read_iter.next().unwrap();
                let list_level = if rep == 0 { rep_level } else { rep };
                *write_itr.next().unwrap() = list_level;

                for _ in 1..len {
                    *write_itr.next().unwrap() = 0;
                }
                new_len += len as usize;
            }
            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
        } else {
            assert!(self.def_levels.len() >= expected_len - 1);
            let mut def_write_itr = self.spare_def.iter_mut();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut def_read_itr = self.def_levels.iter().copied();
            let specials_to_pass = self.current_num_specials;
            let mut specials_passed = 0;

            for w in offset_desc.offsets.windows(2) {
                let mut def = def_read_itr.next().unwrap();
                // Copy over any higher-level special values in place
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                    def = def_read_itr.next().unwrap();
                    new_len += 1;
                    specials_passed += 1;
                }

                let len = w[1] - w[0];
                let rep = rep_read_itr.next().unwrap();

                // If the rep_level is 0 then we are the first list level
                // otherwise we are starting a higher level list so keep
                // existing rep level
                let list_level = if rep == 0 { rep_level } else { rep };

                if def == 0 && len > 0 {
                    // New valid list, write a rep level and then add new 0/0 items
                    *def_write_itr.next().unwrap() = 0;
                    *rep_write_itr.next().unwrap() = list_level;

                    for _ in 1..len {
                        *def_write_itr.next().unwrap() = 0;
                        *rep_write_itr.next().unwrap() = 0;
                    }

                    new_len += len as usize;
                } else if def == 0 {
                    // Empty list, insert new special
                    *def_write_itr.next().unwrap() = empty_list_level + SPECIAL_THRESHOLD;
                    *rep_write_itr.next().unwrap() = list_level;
                    new_len += 1;
                } else {
                    // Either the list is null or one of its struct parents
                    // is null.  Promote it to a special value.
                    *def_write_itr.next().unwrap() = def + SPECIAL_THRESHOLD;
                    *rep_write_itr.next().unwrap() = list_level;
                    new_len += 1;
                }
            }

            // If we have any special values at the end, we need to copy them over
            while specials_passed < specials_to_pass {
                *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
                *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                new_len += 1;
                specials_passed += 1;
            }
            std::mem::swap(&mut self.def_levels, &mut self.spare_def);
            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
        }

        self.current_len = new_len;
        self.current_num_specials += offset_desc.num_specials;
    }

    fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
        assert!(self.def_levels.len() >= validity.len() + self.current_num_specials);
        debug_assert!(
            self.current_len == 0 || self.current_len == validity.len() + self.current_num_specials
        );
        self.current_len = validity.len();

        let mut def_read_itr = self.def_levels.iter().copied();
        let mut def_write_itr = self.spare_def.iter_mut();

        let specials_to_pass = self.current_num_specials;
        let mut specials_passed = 0;

        for incoming_validity in validity.iter() {
            let mut def = def_read_itr.next().unwrap();
            while def > SPECIAL_THRESHOLD {
                *def_write_itr.next().unwrap() = def;
                def = def_read_itr.next().unwrap();
                specials_passed += 1;
            }
            if def == 0 && !incoming_validity {
                *def_write_itr.next().unwrap() = null_level;
            } else {
                *def_write_itr.next().unwrap() = def;
            }
        }

        while specials_passed < specials_to_pass {
            *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
            specials_passed += 1;
        }

        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
    }

    fn multiply_levels(&mut self, multiplier: usize) {
        let old_len = self.current_len;
        // All non-special values will be broadcasted by the multiplier.  Special values are copied as-is.
        self.current_len =
            (self.current_len - self.current_num_specials) * multiplier + self.current_num_specials;

        if self.rep_levels.is_empty() && self.def_levels.is_empty() {
            // All valid with no rep/def levels, nothing to do
            return;
        } else if self.rep_levels.is_empty() {
            assert!(self.def_levels.len() >= self.current_len);
            // No rep levels, just multiply the def levels
            let mut def_read_itr = self.def_levels.iter().copied();
            let mut def_write_itr = self.spare_def.iter_mut();
            for _ in 0..old_len {
                let mut def = def_read_itr.next().unwrap();
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    def = def_read_itr.next().unwrap();
                }
                for _ in 0..multiplier {
                    *def_write_itr.next().unwrap() = def;
                }
            }
        } else if self.def_levels.is_empty() {
            assert!(self.rep_levels.len() >= self.current_len);
            // No def levels, just multiply the rep levels
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            for _ in 0..old_len {
                let rep = rep_read_itr.next().unwrap();
                for _ in 0..multiplier {
                    *rep_write_itr.next().unwrap() = rep;
                }
            }
        } else {
            assert!(self.rep_levels.len() >= self.current_len);
            assert!(self.def_levels.len() >= self.current_len);
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut def_read_itr = self.def_levels.iter().copied();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            let mut def_write_itr = self.spare_def.iter_mut();
            for _ in 0..old_len {
                let mut def = def_read_itr.next().unwrap();
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                    def = def_read_itr.next().unwrap();
                }
                let rep = rep_read_itr.next().unwrap();
                for _ in 0..multiplier {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep;
                }
            }
        }
        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
        std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
    }

    fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
        if let Some(validity) = validity {
            let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
            self.do_record_validity(validity, def_level);
        } else {
            self.checkout_def(DefinitionInterpretation::AllValidItem);
        }
    }

    fn record_validity(&mut self, validity_desc: &ValidityDesc) {
        self.record_validity_buf(&validity_desc.validity)
    }

    fn record_fsl(&mut self, fsl_desc: &FslDesc) {
        self.record_validity_buf(&fsl_desc.validity);
        self.multiply_levels(fsl_desc.dimension);
    }

    fn normalize_specials(&mut self) {
        for def in self.def_levels.iter_mut() {
            if *def > SPECIAL_THRESHOLD {
                *def -= SPECIAL_THRESHOLD;
            }
        }
    }

    fn build(mut self) -> SerializedRepDefs {
        if self.current_len == 0 {
            return SerializedRepDefs::new(None, None, self.def_meaning);
        }

        self.normalize_specials();

        let definition_levels = if self.def_levels.is_empty() {
            None
        } else {
            Some(self.def_levels)
        };
        let repetition_levels = if self.rep_levels.is_empty() {
            None
        } else {
            Some(self.rep_levels)
        };

        // Need to reverse the def meaning since we build rep / def levels in reverse
        let def_meaning = self.def_meaning.into_iter().rev().collect::<Vec<_>>();

        SerializedRepDefs::new(repetition_levels, definition_levels, def_meaning)
    }
}
772
773/// A structure used to collect validity buffers and offsets from arrow
774/// arrays and eventually create repetition and definition levels
775///
/// As we encode, the structural encoders are given this struct and
/// record the arrow information into it.  Once we hit a leaf node we
778/// serialize the data into rep/def levels and write these into the page.
779#[derive(Clone, Default, Debug)]
780pub struct RepDefBuilder {
781    // The rep/def info we have collected so far
782    repdefs: Vec<RawRepDef>,
783    // The current length, can get larger as we traverse lists (e.g. an
784    // array might have 5 lists which results in 50 items)
785    //
786    // Starts uninitialized until we see the first rep/def item
787    len: Option<usize>,
788}
789
790impl RepDefBuilder {
791    fn check_validity_len(&mut self, incoming_len: usize) {
792        if let Some(len) = self.len {
793            assert_eq!(incoming_len, len);
794        } else {
795            // First validity buffer we've seen
796            self.len = Some(incoming_len);
797        }
798    }
799
800    fn num_layers(&self) -> usize {
801        self.repdefs.len()
802    }
803
804    /// The builder is "empty" if there is no repetition and no nulls.  In this case we don't need
805    /// to store anything to disk (except the description)
806    pub fn is_empty(&self) -> bool {
807        self.repdefs
808            .iter()
809            .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
810    }
811
812    /// Returns true if there is only a single layer of definition
813    pub fn is_simple_validity(&self) -> bool {
814        self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
815    }
816
817    /// Registers a nullable validity bitmap
818    pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
819        self.check_validity_len(validity.len());
820        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
821            num_values: validity.len(),
822            validity: Some(validity.into_inner()),
823        }));
824    }
825
826    /// Registers an all-valid validity layer
827    pub fn add_no_null(&mut self, len: usize) {
828        self.check_validity_len(len);
829        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
830            validity: None,
831            num_values: len,
832        }));
833    }
834
835    pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
836        if let Some(len) = self.len {
837            assert_eq!(num_values, len);
838        }
839        self.len = Some(num_values * dimension);
840        debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
841        self.repdefs.push(RawRepDef::Fsl(FslDesc {
842            num_values,
843            validity: validity.map(|v| v.into_inner()),
844            dimension,
845        }))
846    }
847
848    fn check_offset_len(&mut self, offsets: &[i64]) {
849        if let Some(len) = self.len {
850            assert!(offsets.len() == len + 1);
851        }
852        self.len = Some(offsets[offsets.len() - 1] as usize);
853    }
854
855    fn do_add_offsets(
856        &mut self,
857        lengths: impl Iterator<Item = i64>,
858        validity: Option<NullBuffer>,
859        capacity: usize,
860    ) -> bool {
861        let mut num_specials = 0;
862        let mut has_empty_lists = false;
863        let mut has_garbage_values = false;
864        let mut last_off: i64 = 0;
865
866        let mut normalized_offsets = Vec::with_capacity(capacity);
867        normalized_offsets.push(0);
868
869        if let Some(ref validity) = validity {
870            for (len, is_valid) in lengths.zip(validity.iter()) {
871                match (is_valid, len == 0) {
872                    (false, is_empty) => {
873                        num_specials += 1;
874                        has_garbage_values |= !is_empty;
875                    }
876                    (true, true) => {
877                        num_specials += 1;
878                        has_empty_lists = true;
879                    }
880                    _ => {
881                        last_off += len;
882                    }
883                }
884                normalized_offsets.push(last_off);
885            }
886        } else {
887            for len in lengths {
888                if len == 0 {
889                    num_specials += 1;
890                    has_empty_lists = true;
891                }
892                last_off += len;
893                normalized_offsets.push(last_off);
894            }
895        }
896
897        self.check_offset_len(&normalized_offsets);
898        self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
899            num_values: normalized_offsets.len() - 1,
900            offsets: normalized_offsets.into(),
901            validity: validity.map(|v| v.into_inner()),
902            has_empty_lists,
903            num_specials: num_specials as usize,
904        }));
905
906        has_garbage_values
907    }
908
909    /// Adds a layer of offsets
910    ///
    /// Offsets are cast to a common type (i64) and also normalized.  Null lists are
912    /// always represented by a zero-length (identical) pair of offsets and so the caller
913    /// should filter out any garbage items before encoding them.  To assist with this the
914    /// method will return true if any non-empty null lists were found.
915    pub fn add_offsets<O: OffsetSizeTrait>(
916        &mut self,
917        offsets: OffsetBuffer<O>,
918        validity: Option<NullBuffer>,
919    ) -> bool {
920        let inner = offsets.into_inner();
921        let buffer_len = inner.len();
922
923        if O::IS_LARGE {
924            let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, buffer_len);
925            let lengths = i64_buff.windows(2).map(|off| off[1] - off[0]);
926            self.do_add_offsets(lengths, validity, buffer_len)
927        } else {
928            let i32_buff = ScalarBuffer::<i32>::new(inner.into_inner(), 0, buffer_len);
929            let lengths = i32_buff.windows(2).map(|off| (off[1] - off[0]) as i64);
930            self.do_add_offsets(lengths, validity, buffer_len)
931        }
932    }
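
    // Illustration only (not part of this module): a minimal sketch of the offset normalization
    // described above, using plain slices instead of arrow buffers.  The helper name
    // `normalize_offsets_sketch` and the `&[bool]` validity representation are assumptions made
    // for the example.

```rust
/// Hypothetical stand-alone helper.  Returns the normalized offsets (starting at 0,
/// null lists collapsed to zero length) and whether any null list was non-empty.
fn normalize_offsets_sketch(offsets: &[i64], validity: Option<&[bool]>) -> (Vec<i64>, bool) {
    let mut normalized = Vec::with_capacity(offsets.len().max(1));
    normalized.push(0);
    let mut last_off = 0_i64;
    let mut has_garbage_values = false;
    for (idx, pair) in offsets.windows(2).enumerate() {
        let len = pair[1] - pair[0];
        let is_valid = validity.map_or(true, |v| v[idx]);
        if is_valid {
            last_off += len;
        } else if len != 0 {
            // A null list that still spans values: drop the span and report it
            has_garbage_values = true;
        }
        normalized.push(last_off);
    }
    (normalized, has_garbage_values)
}
```

    // For example, offsets [0, 2, 5, 5, 7] with the second list null normalize to [0, 2, 2, 2, 4]
    // and the helper reports that garbage values were found.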
933
934    // When we are encoding data it arrives in batches.  For each batch we create a RepDefBuilder and collect the
935    // various validity buffers and offset buffers from that batch.  Once we have enough batches to write a page we
936    // need to take this collection of RepDefBuilders and concatenate them and then serialize them into rep/def levels.
937    //
938    // TODO: In the future, we may concatenate and serialize at the same time?
939    //
940    // This method takes care of the concatenation part.  First we collect all of layer 0 from each builder, then we
941    // call this method.  Then we collect all of layer 1 from each builder and call this method.  And so on.
942    //
    // That means this method should get a collection of `RawRepDef` where each item is the same kind (all validity,
    // all FSL, or all offsets) though the nullability / lengths may be different in each layer.
945    fn concat_layers<'a>(
946        layers: impl Iterator<Item = &'a RawRepDef>,
947        num_layers: usize,
948    ) -> RawRepDef {
949        enum LayerKind {
950            Validity,
951            Fsl,
952            Offsets,
953        }
954
955        // We make two passes through the layers.  The first determines if we need to pay the cost of allocating
956        // buffers.  The second pass actually adds the values.
957        let mut collected = Vec::with_capacity(num_layers);
958        let mut has_nulls = false;
959        let mut layer_kind = LayerKind::Validity;
960        let mut total_num_specials = 0;
961        let mut all_dimension = 0;
962        let mut all_has_empty_lists = false;
963        let mut all_num_values = 0;
964        for layer in layers {
965            has_nulls |= layer.has_nulls();
966            match layer {
967                RawRepDef::Validity(_) => {
968                    layer_kind = LayerKind::Validity;
969                }
970                RawRepDef::Offsets(OffsetDesc {
971                    num_specials,
972                    has_empty_lists,
973                    ..
974                }) => {
975                    all_has_empty_lists |= *has_empty_lists;
976                    layer_kind = LayerKind::Offsets;
977                    total_num_specials += num_specials;
978                }
979                RawRepDef::Fsl(FslDesc { dimension, .. }) => {
980                    layer_kind = LayerKind::Fsl;
981                    all_dimension = *dimension;
982                }
983            }
984            collected.push(layer);
985            all_num_values += layer.num_values();
986        }
987
988        // Shortcut if there are no nulls
989        if !has_nulls {
990            match layer_kind {
991                LayerKind::Validity => {
992                    return RawRepDef::Validity(ValidityDesc {
993                        validity: None,
994                        num_values: all_num_values,
995                    });
996                }
997                LayerKind::Fsl => {
998                    return RawRepDef::Fsl(FslDesc {
999                        validity: None,
1000                        num_values: all_num_values,
1001                        dimension: all_dimension,
1002                    });
1003                }
1004                LayerKind::Offsets => {}
1005            }
1006        }
1007
1008        // Only allocate if needed
1009        let mut validity_builder = if has_nulls {
1010            BooleanBufferBuilder::new(all_num_values)
1011        } else {
1012            BooleanBufferBuilder::new(0)
1013        };
1014        let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
1015            let mut all_offsets = Vec::with_capacity(all_num_values);
1016            all_offsets.push(0);
1017            all_offsets
1018        } else {
1019            Vec::new()
1020        };
1021
1022        for layer in collected {
1023            match layer {
1024                RawRepDef::Validity(ValidityDesc {
1025                    validity: Some(validity),
1026                    ..
1027                }) => {
1028                    validity_builder.append_buffer(validity);
1029                }
1030                RawRepDef::Validity(ValidityDesc {
1031                    validity: None,
1032                    num_values,
1033                }) => {
1034                    validity_builder.append_n(*num_values, true);
1035                }
1036                RawRepDef::Fsl(FslDesc {
1037                    validity,
1038                    num_values,
1039                    ..
1040                }) => {
1041                    if let Some(validity) = validity {
1042                        validity_builder.append_buffer(validity);
1043                    } else {
1044                        validity_builder.append_n(*num_values, true);
1045                    }
1046                }
1047                RawRepDef::Offsets(OffsetDesc {
1048                    offsets,
1049                    validity: Some(validity),
1050                    has_empty_lists,
1051                    ..
1052                }) => {
1053                    all_has_empty_lists |= has_empty_lists;
1054                    validity_builder.append_buffer(validity);
1055                    let last = *all_offsets.last().unwrap();
1056                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1057                }
1058                RawRepDef::Offsets(OffsetDesc {
1059                    offsets,
1060                    validity: None,
1061                    has_empty_lists,
1062                    num_values,
1063                    ..
1064                }) => {
1065                    all_has_empty_lists |= has_empty_lists;
1066                    if has_nulls {
1067                        validity_builder.append_n(*num_values, true);
1068                    }
1069                    let last = *all_offsets.last().unwrap();
1070                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1071                }
1072            }
1073        }
1074        let validity = if has_nulls {
1075            Some(validity_builder.finish())
1076        } else {
1077            None
1078        };
1079        match layer_kind {
1080            LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
1081                validity,
1082                num_values: all_num_values,
1083                dimension: all_dimension,
1084            }),
1085            LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
1086                validity,
1087                num_values: all_num_values,
1088            }),
1089            LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
1090                offsets: all_offsets.into(),
1091                validity,
1092                has_empty_lists: all_has_empty_lists,
1093                num_values: all_num_values,
1094                num_specials: total_num_specials,
1095            }),
1096        }
1097    }
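
    // Illustration only (not part of this module): a minimal sketch of how per-batch offsets are
    // rebased when they are concatenated.  It assumes every input is already normalized (starts
    // at 0); the helper name `concat_offsets_sketch` is hypothetical.

```rust
/// Hypothetical stand-alone helper: concatenates normalized offsets arrays,
/// shifting each one by the last offset accumulated so far.
fn concat_offsets_sketch(batches: &[Vec<i64>]) -> Vec<i64> {
    let total_lists: usize = batches.iter().map(|b| b.len().saturating_sub(1)).sum();
    let mut all_offsets = Vec::with_capacity(total_lists + 1);
    all_offsets.push(0_i64);
    for offsets in batches {
        let last = *all_offsets.last().unwrap();
        // Skip the leading 0 of each batch so we end up with N + 1 offsets, not N + batches.len()
        all_offsets.extend(offsets.iter().skip(1).map(|off| off + last));
    }
    all_offsets
}
```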
1098
1099    /// Converts the validity / offsets buffers that have been gathered so far
1100    /// into repetition and definition levels
1101    pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
1102        assert!(!builders.is_empty());
1103        if builders.iter().all(|b| b.is_empty()) {
1104            // No repetition, all-valid
1105            return SerializedRepDefs::empty(
1106                builders
1107                    .first()
1108                    .unwrap()
1109                    .repdefs
1110                    .iter()
1111                    .map(|_| DefinitionInterpretation::AllValidItem)
1112                    .collect::<Vec<_>>(),
1113            );
1114        }
1115
1116        let num_layers = builders[0].num_layers();
1117        let combined_layers = (0..num_layers)
1118            .map(|layer_index| {
1119                Self::concat_layers(
1120                    builders.iter().map(|b| &b.repdefs[layer_index]),
1121                    builders.len(),
1122                )
1123            })
1124            .collect::<Vec<_>>();
1125        debug_assert!(
1126            builders
1127                .iter()
1128                .all(|b| b.num_layers() == builders[0].num_layers())
1129        );
1130
1131        let total_len = combined_layers.last().unwrap().num_values()
1132            + combined_layers
1133                .iter()
1134                .map(|l| l.num_specials())
1135                .sum::<usize>();
1136        let max_rep = combined_layers.iter().map(|l| l.max_rep()).sum::<u16>();
1137        let max_def = combined_layers.iter().map(|l| l.max_def()).sum::<u16>();
1138
1139        let mut context = SerializerContext::new(total_len, num_layers, max_rep, max_def);
1140        for layer in combined_layers.into_iter() {
1141            match layer {
1142                RawRepDef::Validity(def) => {
1143                    context.record_validity(&def);
1144                }
1145                RawRepDef::Offsets(rep) => {
1146                    context.record_offsets(&rep);
1147                }
1148                RawRepDef::Fsl(fsl) => {
1149                    context.record_fsl(&fsl);
1150                }
1151            }
1152        }
1153        context.build()
1154    }
1155}
1156
1157/// Starts with serialized repetition and definition levels and unravels
1158/// them into validity buffers and offsets buffers
1159///
1160/// This is used during decoding to create the necessary arrow structures
1161#[derive(Debug)]
1162pub struct RepDefUnraveler {
1163    rep_levels: Option<LevelBuffer>,
1164    def_levels: Option<LevelBuffer>,
1165    // Maps from definition level to the rep level at which that definition level is visible
1166    levels_to_rep: Vec<u16>,
1167    def_meaning: Arc<[DefinitionInterpretation]>,
1168    // Current definition level to compare to.
1169    current_def_cmp: u16,
1170    // Current rep level, determines which specials we can see
1171    current_rep_cmp: u16,
    // Current layer index, 0 means inner-most layer and it counts up from there.  Used to index
    // into def_meaning
1174    current_layer: usize,
1175    // Number of items in the inner-most layer (needed if the definition levels are not present)
1176    num_items: u64,
1177}
1178
1179impl RepDefUnraveler {
1180    /// Creates a new unraveler from serialized repetition and definition information
1181    pub fn new(
1182        rep_levels: Option<LevelBuffer>,
1183        def_levels: Option<LevelBuffer>,
1184        def_meaning: Arc<[DefinitionInterpretation]>,
1185        num_items: u64,
1186    ) -> Self {
1187        let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
1188        let mut rep_counter = 0;
1189        // Level=0 is always visible and means valid item
1190        levels_to_rep.push(0);
1191        for meaning in def_meaning.as_ref() {
1192            match meaning {
1193                DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
1194                    // There is no corresponding level, so nothing to put in levels_to_rep
1195                }
1196                DefinitionInterpretation::NullableItem => {
1197                    // Some null structs are not visible at inner rep levels in cases like LIST<STRUCT<LIST<...>>>
1198                    levels_to_rep.push(rep_counter);
1199                }
1200                DefinitionInterpretation::NullableList => {
1201                    rep_counter += 1;
1202                    levels_to_rep.push(rep_counter);
1203                }
1204                DefinitionInterpretation::EmptyableList => {
1205                    rep_counter += 1;
1206                    levels_to_rep.push(rep_counter);
1207                }
1208                DefinitionInterpretation::NullableAndEmptyableList => {
1209                    rep_counter += 1;
1210                    levels_to_rep.push(rep_counter);
1211                    levels_to_rep.push(rep_counter);
1212                }
1213            }
1214        }
1215        Self {
1216            rep_levels,
1217            def_levels,
1218            current_def_cmp: 0,
1219            current_rep_cmp: 0,
1220            levels_to_rep,
1221            current_layer: 0,
1222            def_meaning,
1223            num_items,
1224        }
1225    }
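
    // Illustration only (not part of this module): a minimal sketch of how the `levels_to_rep`
    // table is derived from the definition meanings.  The `Meaning` enum below is a local
    // stand-in that mirrors the variant names of `DefinitionInterpretation`; it and the helper
    // name `levels_to_rep_sketch` are assumptions made for the example.

```rust
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum Meaning {
    AllValidItem,
    AllValidList,
    NullableItem,
    NullableList,
    EmptyableList,
    NullableAndEmptyableList,
}

/// Hypothetical stand-alone helper: maps each definition level to the
/// repetition level at which that definition level becomes visible.
fn levels_to_rep_sketch(def_meaning: &[Meaning]) -> Vec<u16> {
    // Level 0 is always visible and means "valid item"
    let mut levels_to_rep = vec![0_u16];
    let mut rep_counter = 0_u16;
    for meaning in def_meaning {
        match meaning {
            // No definition level is assigned, so nothing to record
            Meaning::AllValidItem | Meaning::AllValidList => {}
            Meaning::NullableItem => levels_to_rep.push(rep_counter),
            Meaning::NullableList | Meaning::EmptyableList => {
                rep_counter += 1;
                levels_to_rep.push(rep_counter);
            }
            // Two definition levels (null and empty) share the same rep level
            Meaning::NullableAndEmptyableList => {
                rep_counter += 1;
                levels_to_rep.push(rep_counter);
                levels_to_rep.push(rep_counter);
            }
        }
    }
    levels_to_rep
}
```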
1226
1227    pub fn is_all_valid(&self) -> bool {
1228        self.def_meaning[self.current_layer].is_all_valid()
1229    }
1230
    /// If the current level is a repetition layer then this returns an upper bound on the
    /// number of lists at this level (one list per remaining repetition level).
1233    ///
1234    /// This is not valid to call when the current level is a struct/primitive layer because
1235    /// in some cases there may be no rep or def information to know this.
1236    pub fn max_lists(&self) -> usize {
1237        debug_assert!(
1238            self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
1239        );
1240        self.rep_levels
1241            .as_ref()
1242            // Worst case every rep item is max_rep and a new list
1243            .map(|levels| levels.len())
1244            .unwrap_or(0)
1245    }
1246
1247    /// Unravels a layer of offsets from the unraveler into the given offset width
1248    ///
    /// When decoding a list the caller should first unravel the offsets and then
    /// unravel the validity (this is the opposite of the order used during encoding)
1251    pub fn unravel_offsets<T: ArrowNativeType>(
1252        &mut self,
1253        offsets: &mut Vec<T>,
1254        validity: Option<&mut BooleanBufferBuilder>,
1255    ) -> Result<()> {
1256        let rep_levels = self
1257            .rep_levels
1258            .as_mut()
1259            .expect("Expected repetition level but data didn't contain repetition");
1260        let valid_level = self.current_def_cmp;
1261        let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
1262            DefinitionInterpretation::NullableList => {
1263                self.current_def_cmp += 1;
1264                (valid_level + 1, 0)
1265            }
1266            DefinitionInterpretation::EmptyableList => {
1267                self.current_def_cmp += 1;
1268                (0, valid_level + 1)
1269            }
1270            DefinitionInterpretation::NullableAndEmptyableList => {
1271                self.current_def_cmp += 2;
1272                (valid_level + 1, valid_level + 2)
1273            }
1274            DefinitionInterpretation::AllValidList => (0, 0),
1275            _ => unreachable!(),
1276        };
1277        self.current_layer += 1;
1278
1279        // This is the highest def level that is still visible.  Once we hit a list then
1280        // we stop looking because any null / empty list (or list masked by a higher level
1281        // null) will not be visible
1282        let mut max_level = null_level.max(empty_level).max(valid_level);
        // Anything higher than this (but less than max_level) is a null struct masking our
        // list.  We will materialize this as a null list.
1285        let upper_null = max_level;
1286        for level in self.def_meaning[self.current_layer..].iter() {
1287            match level {
1288                DefinitionInterpretation::NullableItem => {
1289                    max_level += 1;
1290                }
1291                DefinitionInterpretation::AllValidItem => {}
1292                _ => {
1293                    break;
1294                }
1295            }
1296        }
1297
1298        let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);
1299
1300        // If offsets is empty this is a no-op.  If offsets is not empty that means we already
1301        // added a set of offsets.  For example, we might have added [0, 3, 5] (2 lists).  Now
1302        // say we want to add [0, 1, 4] (2 lists).  We should get [0, 3, 5, 6, 9] (4 lists).  If
1303        // we don't pop here we get [0, 3, 5, 5, 6, 9] which is wrong.
1304        //
1305        // Or, to think about it another way, if every unraveler adds the starting 0 and the trailing
1306        // length then we have N + unravelers.len() values instead of N + 1.
1307        offsets.pop();
1308
        let to_offset = |val: usize| {
            T::from_usize(val).ok_or_else(|| {
                Error::invalid_input(
                    "A single batch had more than i32::MAX values and so a large container type is required",
                )
            })
        };
1313        self.current_rep_cmp += 1;
1314        if let Some(def_levels) = &mut self.def_levels {
1315            assert!(rep_levels.len() == def_levels.len());
1316            // It's possible validity is None even if we have def levels.  For example, we might have
1317            // empty lists (which require def levels) but no nulls.
1318            let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
1319                Box::new(|is_valid| validity.append(is_valid))
1320            } else {
1321                Box::new(|_| {})
1322            };
1323            // This is a strange access pattern.  We are iterating over the rep/def levels and
1324            // at the same time writing the rep/def levels.  This means we need both a mutable
1325            // and immutable reference to the rep/def levels.
1326            let mut read_idx = 0;
1327            let mut write_idx = 0;
1328            while read_idx < rep_levels.len() {
1329                // SAFETY: We assert that rep_levels and def_levels have the same
1330                // len and read_idx and write_idx can never go past the end.
1331                unsafe {
1332                    let rep_val = *rep_levels.get_unchecked(read_idx);
1333                    if rep_val != 0 {
1334                        let def_val = *def_levels.get_unchecked(read_idx);
1335                        // Copy over
1336                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1337                        *def_levels.get_unchecked_mut(write_idx) = def_val;
1338                        write_idx += 1;
1339
1340                        if def_val == 0 {
1341                            // This is a valid list
1342                            offsets.push(to_offset(curlen)?);
1343                            curlen += 1;
1344                            push_validity(true);
1345                        } else if def_val > max_level {
1346                            // This is not visible at this rep level, do not add to offsets, but keep in repdef
1347                        } else if def_val == null_level || def_val > upper_null {
1348                            // This is a null list (or a list masked by a null struct)
1349                            offsets.push(to_offset(curlen)?);
1350                            push_validity(false);
1351                        } else if def_val == empty_level {
1352                            // This is an empty list
1353                            offsets.push(to_offset(curlen)?);
1354                            push_validity(true);
1355                        } else {
1356                            // New valid list starting with null item
1357                            offsets.push(to_offset(curlen)?);
1358                            curlen += 1;
1359                            push_validity(true);
1360                        }
1361                    } else {
1362                        curlen += 1;
1363                    }
1364                    read_idx += 1;
1365                }
1366            }
1367            offsets.push(to_offset(curlen)?);
1368            rep_levels.truncate(write_idx);
1369            def_levels.truncate(write_idx);
1370            Ok(())
1371        } else {
            // No definition levels, so every list is valid.  This uses the same in-place
            // read/write pattern as the loop above.
1373            let mut read_idx = 0;
1374            let mut write_idx = 0;
1375            let old_offsets_len = offsets.len();
1376            while read_idx < rep_levels.len() {
1377                // SAFETY: read_idx / write_idx cannot go past rep_levels.len()
1378                unsafe {
1379                    let rep_val = *rep_levels.get_unchecked(read_idx);
1380                    if rep_val != 0 {
1381                        // Finish the current list
1382                        offsets.push(to_offset(curlen)?);
1383                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1384                        write_idx += 1;
1385                    }
1386                    curlen += 1;
1387                    read_idx += 1;
1388                }
1389            }
1390            let num_new_lists = offsets.len() - old_offsets_len;
1391            offsets.push(to_offset(curlen)?);
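            // Keep only the rep levels we rewrote.  `offsets` may already hold entries from
            // earlier unravelers so its length cannot be used here.
            rep_levels.truncate(write_idx);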
1393            if let Some(validity) = validity {
1394                // Even though we don't have validity it is possible another unraveler did and so we need
1395                // to push all valids
1396                validity.append_n(num_new_lists, true);
1397            }
1398            Ok(())
1399        }
1400    }
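
    // Illustration only (not part of this module): a minimal, bounds-checked sketch of the
    // branch above that has repetition levels but no definition levels.  It peels one layer of
    // lists off the rep levels and appends the resulting offsets to any offsets already
    // collected.  The helper name `unravel_rep_only_sketch` and the `i64` offset type are
    // assumptions made for the example.

```rust
/// Hypothetical stand-alone helper: turns one layer of repetition levels into
/// list offsets, leaving the decremented levels of the new lists in `rep_levels`.
fn unravel_rep_only_sketch(rep_levels: &mut Vec<u16>, offsets: &mut Vec<i64>) {
    let mut curlen = offsets.last().copied().unwrap_or(0);
    // Drop the trailing length of the previous chunk, it is re-added as our first offset
    offsets.pop();
    let mut write_idx = 0;
    for read_idx in 0..rep_levels.len() {
        let rep_val = rep_levels[read_idx];
        if rep_val != 0 {
            // A non-zero level starts a new list at this layer
            offsets.push(curlen);
            rep_levels[write_idx] = rep_val - 1;
            write_idx += 1;
        }
        curlen += 1;
    }
    offsets.push(curlen);
    rep_levels.truncate(write_idx);
}
```

    // Starting from empty offsets, rep levels [1, 0, 0, 1, 0] give offsets [0, 3, 5].  Feeding a
    // second chunk with rep levels [1, 1, 0, 0] extends them to [0, 3, 5, 6, 9].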
1401
1402    pub fn skip_validity(&mut self) {
1403        debug_assert!(
1404            self.def_meaning[self.current_layer] == DefinitionInterpretation::AllValidItem
1405        );
1406        self.current_layer += 1;
1407    }
1408
1409    /// Unravels a layer of validity from the definition levels
1410    pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
1411        if self.def_meaning[self.current_layer] == DefinitionInterpretation::AllValidItem {
1412            self.current_layer += 1;
1413            validity.append_n(self.num_items as usize, true);
1414            return;
1415        }
1416
1417        self.current_layer += 1;
1418        let def_levels = &self.def_levels.as_ref().unwrap();
1419
1420        let current_def_cmp = self.current_def_cmp;
1421        self.current_def_cmp += 1;
1422
1423        for is_valid in def_levels.iter().filter_map(|&level| {
1424            if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
1425                Some(level <= current_def_cmp)
1426            } else {
1427                None
1428            }
1429        }) {
1430            validity.append(is_valid);
1431        }
1432    }
1433
1434    pub fn decimate(&mut self, dimension: usize) {
1435        if self.rep_levels.is_some() {
            // If we need to support this then I think we need to walk through the rep/def levels to find
            // the entries we keep.  E.g. if we have:
1438            //  rep: 1 0 0 1 0 1 0 0 0 1 0 0
1439            //  def: 1 1 1 0 1 0 1 1 0 1 1 0
1440            //  dimension: 2
1441            //
1442            // The output should be:
1443            //  rep: 1 0 0 1 0 0 0
1444            //  def: 1 1 1 0 1 1 0
1445            //
1446            // Maybe there's some special logic for empty/null lists?  I'll save the headache for future me.
1447            todo!("Not yet supported FSL<...List<...>>");
1448        }
1449        let Some(def_levels) = self.def_levels.as_mut() else {
1450            return;
1451        };
1452        let mut read_idx = 0;
1453        let mut write_idx = 0;
1454        while read_idx < def_levels.len() {
1455            unsafe {
1456                *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
1457            }
1458            write_idx += 1;
1459            read_idx += dimension;
1460        }
1461        def_levels.truncate(write_idx);
1462    }
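
    // Illustration only (not part of this module): a minimal, bounds-checked sketch of the
    // decimation above for the case with definition levels only.  It keeps every
    // `dimension`-th definition level, which gives one level per fixed-size list.  The helper
    // name `decimate_sketch` is hypothetical, and the `dimension > 0` assertion is an addition
    // of this sketch.

```rust
/// Hypothetical stand-alone helper: keeps the first definition level of each
/// group of `dimension` levels, compacting the buffer in place.
fn decimate_sketch(def_levels: &mut Vec<u16>, dimension: usize) {
    // A zero stride would never advance the read cursor
    assert!(dimension > 0);
    let mut read_idx = 0;
    let mut write_idx = 0;
    while read_idx < def_levels.len() {
        def_levels[write_idx] = def_levels[read_idx];
        write_idx += 1;
        read_idx += dimension;
    }
    def_levels.truncate(write_idx);
}
```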
1463}
1464
1465/// As we decode we may extract rep/def information from multiple pages (or multiple
1466/// chunks within a page).
1467///
1468/// For each chunk we create an unraveler.  Each unraveler can have a completely different
1469/// interpretation (e.g. one page might contain null items but no null structs and the next
1470/// page might have null structs but no null items).
1471///
1472/// Concatenating these unravelers would be tricky and expensive so instead we have a
1473/// composite unraveler which unravels across multiple unravelers.
1474///
/// Note: this struct should be used even if there is only one page / unraveler.  This is
/// because the `RepDefUnraveler`'s API is more complex (it's meant to be called by this
/// struct)
1478#[derive(Debug)]
1479pub struct CompositeRepDefUnraveler {
1480    unravelers: Vec<RepDefUnraveler>,
1481}
1482
1483impl CompositeRepDefUnraveler {
1484    pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
1485        Self { unravelers }
1486    }
1487
1488    /// Unravels a layer of validity
1489    ///
1490    /// Returns None if there are no null items in this layer
1491    pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
1492        let is_all_valid = self
1493            .unravelers
1494            .iter()
1495            .all(|unraveler| unraveler.is_all_valid());
1496
1497        if is_all_valid {
1498            for unraveler in self.unravelers.iter_mut() {
1499                unraveler.skip_validity();
1500            }
1501            None
1502        } else {
1503            let mut validity = BooleanBufferBuilder::new(num_values);
1504            for unraveler in self.unravelers.iter_mut() {
1505                unraveler.unravel_validity(&mut validity);
1506            }
1507            Some(NullBuffer::new(validity.finish()))
1508        }
1509    }
1510
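    /// Unravels a layer of validity for a fixed-size-list layer
    ///
    /// Each unraveler is first decimated so that only every `dimension`-th definition
    /// level remains, then validity is unraveled as in [`Self::unravel_validity`].
    pub fn unravel_fsl_validity(
        &mut self,
        num_values: usize,
        dimension: usize,
    ) -> Option<NullBuffer> {
        for unraveler in self.unravelers.iter_mut() {
            unraveler.decimate(dimension);
        }
        self.unravel_validity(num_values)
    }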
1521
1522    /// Unravels a layer of offsets (and the validity for that layer)
1523    pub fn unravel_offsets<T: ArrowNativeType>(
1524        &mut self,
1525    ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
1526        let mut is_all_valid = true;
1527        let mut max_num_lists = 0;
1528        for unraveler in self.unravelers.iter() {
1529            is_all_valid &= unraveler.is_all_valid();
1530            max_num_lists += unraveler.max_lists();
1531        }
1532
1533        let mut validity = if is_all_valid {
1534            None
1535        } else {
            // Note: this capacity is only a hint.  It is probably an over-estimate but could even be an
            // under-estimate, because at this point we only know how many items we have and not how
            // many rows.  (TODO: Shouldn't we know the # of rows?)
1538            Some(BooleanBufferBuilder::new(max_num_lists))
1539        };
1540
1541        let mut offsets = Vec::with_capacity(max_num_lists + 1);
1542
1543        for unraveler in self.unravelers.iter_mut() {
1544            unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
1545        }
1546
1547        Ok((
1548            OffsetBuffer::new(ScalarBuffer::from(offsets)),
1549            validity.map(|mut v| NullBuffer::new(v.finish())),
1550        ))
1551    }
1552}
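
// Illustrative sketch (not part of this module): the composite unraveler returns `None`
// when every chunk is fully valid and otherwise builds one combined validity buffer.
// The version below models each chunk with a hypothetical `ChunkValidity` struct and
// plain `Vec<bool>` in place of Arrow buffers.  It assumes a fully valid chunk contributes
// `true` for each of its values; the per-unraveler behavior is not shown in this file
// section.

```rust
/// Per-chunk validity; `None` means every value in the chunk is valid.
/// (Hypothetical type, named for illustration only.)
struct ChunkValidity {
    num_values: usize,
    validity: Option<Vec<bool>>,
}

/// Combines chunk validity, skipping the allocation when nothing is null.
fn combine_validity(chunks: &[ChunkValidity]) -> Option<Vec<bool>> {
    if chunks.iter().all(|chunk| chunk.validity.is_none()) {
        return None;
    }
    let total: usize = chunks.iter().map(|chunk| chunk.num_values).sum();
    let mut combined = Vec::with_capacity(total);
    for chunk in chunks {
        match &chunk.validity {
            Some(bits) => combined.extend_from_slice(bits),
            // Assumption: an all-valid chunk expands to `num_values` true bits
            None => combined.extend(std::iter::repeat(true).take(chunk.num_values)),
        }
    }
    Some(combined)
}
```

// The early return is the point of the design: when no chunk has nulls, no buffer is
// built at all.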
1553
1554/// A [`ControlWordIterator`] when there are both repetition and definition levels
1555///
1556/// The iterator will put the repetition level in the upper bits and the definition
1557/// level in the lower bits.  The number of bits used for each level is determined
1558/// by the width of the repetition and definition levels.
1559#[derive(Debug)]
1560pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
1561    repdef: I,
1562    def_width: usize,
1563    max_rep: u16,
1564    max_visible_def: u16,
1565    rep_mask: u16,
1566    def_mask: u16,
1567    bits_rep: u8,
1568    bits_def: u8,
1569    phantom: std::marker::PhantomData<W>,
1570}
1571
1572impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
1573    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1574        let next = self.repdef.next()?;
1575        let control_word: u8 =
1576            (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
1577        buf.push(control_word);
1578        let is_new_row = next.0 == self.max_rep;
1579        let is_visible = next.1 <= self.max_visible_def;
1580        let is_valid_item = next.1 == 0;
1581        Some(ControlWordDesc {
1582            is_new_row,
1583            is_visible,
1584            is_valid_item,
1585        })
1586    }
1587}
1588
1589impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
1590    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1591        let next = self.repdef.next()?;
1592        let control_word: u16 =
1593            ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
1594        let control_word = control_word.to_le_bytes();
1595        buf.push(control_word[0]);
1596        buf.push(control_word[1]);
1597        let is_new_row = next.0 == self.max_rep;
1598        let is_visible = next.1 <= self.max_visible_def;
1599        let is_valid_item = next.1 == 0;
1600        Some(ControlWordDesc {
1601            is_new_row,
1602            is_visible,
1603            is_valid_item,
1604        })
1605    }
1606}
1607
1608impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
1609    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1610        let next = self.repdef.next()?;
1611        let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
1612            + ((next.1 & self.def_mask) as u32);
1613        let control_word = control_word.to_le_bytes();
1614        buf.push(control_word[0]);
1615        buf.push(control_word[1]);
1616        buf.push(control_word[2]);
1617        buf.push(control_word[3]);
1618        let is_new_row = next.0 == self.max_rep;
1619        let is_visible = next.1 <= self.max_visible_def;
1620        let is_valid_item = next.1 == 0;
1621        Some(ControlWordDesc {
1622            is_new_row,
1623            is_visible,
1624            is_valid_item,
1625        })
1626    }
1627}
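
// Illustrative sketch (not part of this module): the three binary iterators above all
// build the control word the same way, with the masked repetition level shifted above
// the masked definition level and the result written in little-endian order.  The
// function names and the `bits_rep` / `bits_def` / `bytes_per_word` parameters are
// hypothetical.  Bitwise OR is used instead of `+`; the two are equivalent here because
// the masked definition level always fits below the shift.  Widths are assumed to be at
// most 16 bits each.

```rust
/// Packs a (rep, def) pair into one word: rep in the upper bits, def in the lower bits.
fn pack_control_word(rep: u16, def: u16, bits_rep: u32, bits_def: u32) -> u32 {
    let rep_mask = (1u32 << bits_rep) - 1;
    let def_mask = (1u32 << bits_def) - 1;
    (((rep as u32) & rep_mask) << bits_def) | ((def as u32) & def_mask)
}

/// Appends the low `bytes_per_word` bytes of `word` in little-endian order.
fn append_control_word(buf: &mut Vec<u8>, word: u32, bytes_per_word: usize) {
    buf.extend_from_slice(&word.to_le_bytes()[..bytes_per_word]);
}
```

// For example, rep = 2 and def = 3 with two bits each pack to 0b1011 (11), which fits in
// a single byte.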
1628
1629/// A [`ControlWordIterator`] when there are only definition levels or only repetition levels
1630#[derive(Debug)]
1631pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
1632    repdef: I,
1633    level_mask: u16,
1634    bits_rep: u8,
1635    bits_def: u8,
1636    max_rep: u16,
1637    phantom: std::marker::PhantomData<W>,
1638}
1639
1640impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
1641    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1642        let next = self.repdef.next()?;
1643        buf.push((next & self.level_mask) as u8);
1644        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1645        let is_valid_item = next == 0 || self.bits_def == 0;
1646        Some(ControlWordDesc {
1647            is_new_row,
1648            // Either there is no rep, in which case there are no invisible items
1649            // or there is no def, in which case there are no invisible items
1650            is_visible: true,
1651            is_valid_item,
1652        })
1653    }
1654}
1655
impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
        // Return `None` once the levels are exhausted (as the u8 and u32 variants do)
        // instead of panicking on an empty iterator
        let next = self.repdef.next()? & self.level_mask;
        let control_word = next.to_le_bytes();
        buf.push(control_word[0]);
        buf.push(control_word[1]);
        let is_new_row = self.max_rep == 0 || next == self.max_rep;
        let is_valid_item = next == 0 || self.bits_def == 0;
        Some(ControlWordDesc {
            is_new_row,
            is_visible: true,
            is_valid_item,
        })
    }
}
1671
1672impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
1673    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1674        let next = self.repdef.next()?;
1675        let next = (next & self.level_mask) as u32;
1676        let control_word = next.to_le_bytes();
1677        buf.push(control_word[0]);
1678        buf.push(control_word[1]);
1679        buf.push(control_word[2]);
1680        buf.push(control_word[3]);
1681        let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
1682        let is_valid_item = next == 0 || self.bits_def == 0;
1683        Some(ControlWordDesc {
1684            is_new_row,
1685            is_visible: true,
1686            is_valid_item,
1687        })
1688    }
1689}
1690
1691/// A [`ControlWordIterator`] when there are no repetition or definition levels
1692#[derive(Debug)]
1693pub struct NilaryControlWordIterator {
1694    len: usize,
1695    idx: usize,
1696}
1697
1698impl NilaryControlWordIterator {
1699    fn append_next(&mut self) -> Option<ControlWordDesc> {
1700        if self.idx == self.len {
1701            None
1702        } else {
1703            self.idx += 1;
1704            Some(ControlWordDesc {
1705                is_new_row: true,
1706                is_visible: true,
1707                is_valid_item: true,
1708            })
1709        }
1710    }
1711}
1712
1713/// Helper function to get a bit mask of the given width
1714fn get_mask(width: u16) -> u16 {
1715    (1 << width) - 1
1716}
1717
1718// We're really going out of our way to avoid boxing here but this will be called on a per-value basis
1719// so it is in the critical path.
1720type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
1721    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
1722    T,
1723>;
1724
/// An iterator that generates control words from repetition and definition levels
///
/// "Control word" is just a fancy term for a single u8/u16/u32 that contains both
/// the repetition and definition levels.
///
/// In the large majority of cases we only need a single byte to represent both the
/// repetition and definition levels.  However, if there is deep nesting then we may
/// need two bytes.  In the worst case we need 4 bytes, though this suggests hundreds of
/// levels of nesting, which is unlikely in practice.
1734#[derive(Debug)]
1735pub enum ControlWordIterator<'a> {
1736    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
1737    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
1738    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
1739    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
1740    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
1741    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
1742    Nilary(NilaryControlWordIterator),
1743}
1744
1745/// Describes the properties of a control word
1746#[derive(Debug)]
1747pub struct ControlWordDesc {
1748    pub is_new_row: bool,
1749    pub is_visible: bool,
1750    pub is_valid_item: bool,
1751}
1752
1753impl ControlWordIterator<'_> {
    /// Appends the next control word to the buffer
    ///
    /// Returns a [`ControlWordDesc`] describing the appended word (e.g. `is_new_row` is
    /// true when the repetition level is maxed out), or `None` if the levels are exhausted
1757    pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1758        match self {
1759            Self::Binary8(iter) => iter.append_next(buf),
1760            Self::Binary16(iter) => iter.append_next(buf),
1761            Self::Binary32(iter) => iter.append_next(buf),
1762            Self::Unary8(iter) => iter.append_next(buf),
1763            Self::Unary16(iter) => iter.append_next(buf),
1764            Self::Unary32(iter) => iter.append_next(buf),
1765            Self::Nilary(iter) => iter.append_next(),
1766        }
1767    }
1768
    /// Returns true if the control word iterator has repetition levels
1770    pub fn has_repetition(&self) -> bool {
1771        match self {
1772            Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
1773            Self::Unary8(iter) => iter.bits_rep > 0,
1774            Self::Unary16(iter) => iter.bits_rep > 0,
1775            Self::Unary32(iter) => iter.bits_rep > 0,
1776            Self::Nilary(_) => false,
1777        }
1778    }
1779
1780    /// Returns the number of bytes per control word
1781    pub fn bytes_per_word(&self) -> usize {
1782        match self {
1783            Self::Binary8(_) => 1,
1784            Self::Binary16(_) => 2,
1785            Self::Binary32(_) => 4,
1786            Self::Unary8(_) => 1,
1787            Self::Unary16(_) => 2,
1788            Self::Unary32(_) => 4,
1789            Self::Nilary(_) => 0,
1790        }
1791    }
1792
1793    /// Returns the number of bits used for the repetition level
1794    pub fn bits_rep(&self) -> u8 {
1795        match self {
1796            Self::Binary8(iter) => iter.bits_rep,
1797            Self::Binary16(iter) => iter.bits_rep,
1798            Self::Binary32(iter) => iter.bits_rep,
1799            Self::Unary8(iter) => iter.bits_rep,
1800            Self::Unary16(iter) => iter.bits_rep,
1801            Self::Unary32(iter) => iter.bits_rep,
1802            Self::Nilary(_) => 0,
1803        }
1804    }
1805
1806    /// Returns the number of bits used for the definition level
1807    pub fn bits_def(&self) -> u8 {
1808        match self {
1809            Self::Binary8(iter) => iter.bits_def,
1810            Self::Binary16(iter) => iter.bits_def,
1811            Self::Binary32(iter) => iter.bits_def,
1812            Self::Unary8(iter) => iter.bits_def,
1813            Self::Unary16(iter) => iter.bits_def,
1814            Self::Unary32(iter) => iter.bits_def,
1815            Self::Nilary(_) => 0,
1816        }
1817    }
1818}
1819
1820/// Builds a [`ControlWordIterator`] from repetition and definition levels
1821/// by first calculating the width needed and then creating the iterator
1822/// with the appropriate width
1823pub fn build_control_word_iterator<'a>(
1824    rep: Option<&'a [u16]>,
1825    max_rep: u16,
1826    def: Option<&'a [u16]>,
1827    max_def: u16,
1828    max_visible_def: u16,
1829    len: usize,
1830) -> ControlWordIterator<'a> {
1831    let rep_width = if max_rep == 0 {
1832        0
1833    } else {
1834        log_2_ceil(max_rep as u32) as u16
1835    };
1836    let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
1837    let def_width = if max_def == 0 {
1838        0
1839    } else {
1840        log_2_ceil(max_def as u32) as u16
1841    };
1842    let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
1843    let total_width = rep_width + def_width;
1844    match (rep, def) {
1845        (Some(rep), Some(def)) => {
1846            let iter = rep.iter().copied().zip(def.iter().copied());
1847            let def_width = def_width as usize;
1848            if total_width <= 8 {
1849                ControlWordIterator::Binary8(BinaryControlWordIterator {
1850                    repdef: iter,
1851                    rep_mask,
1852                    def_mask,
1853                    def_width,
1854                    max_rep,
1855                    max_visible_def,
1856                    bits_rep: rep_width as u8,
1857                    bits_def: def_width as u8,
1858                    phantom: std::marker::PhantomData,
1859                })
1860            } else if total_width <= 16 {
1861                ControlWordIterator::Binary16(BinaryControlWordIterator {
1862                    repdef: iter,
1863                    rep_mask,
1864                    def_mask,
1865                    def_width,
1866                    max_rep,
1867                    max_visible_def,
1868                    bits_rep: rep_width as u8,
1869                    bits_def: def_width as u8,
1870                    phantom: std::marker::PhantomData,
1871                })
1872            } else {
1873                ControlWordIterator::Binary32(BinaryControlWordIterator {
1874                    repdef: iter,
1875                    rep_mask,
1876                    def_mask,
1877                    def_width,
1878                    max_rep,
1879                    max_visible_def,
1880                    bits_rep: rep_width as u8,
1881                    bits_def: def_width as u8,
1882                    phantom: std::marker::PhantomData,
1883                })
1884            }
1885        }
1886        (Some(lev), None) => {
1887            let iter = lev.iter().copied();
1888            if total_width <= 8 {
1889                ControlWordIterator::Unary8(UnaryControlWordIterator {
1890                    repdef: iter,
1891                    level_mask: rep_mask,
1892                    bits_rep: total_width as u8,
1893                    bits_def: 0,
1894                    max_rep,
1895                    phantom: std::marker::PhantomData,
1896                })
1897            } else if total_width <= 16 {
1898                ControlWordIterator::Unary16(UnaryControlWordIterator {
1899                    repdef: iter,
1900                    level_mask: rep_mask,
1901                    bits_rep: total_width as u8,
1902                    bits_def: 0,
1903                    max_rep,
1904                    phantom: std::marker::PhantomData,
1905                })
1906            } else {
1907                ControlWordIterator::Unary32(UnaryControlWordIterator {
1908                    repdef: iter,
1909                    level_mask: rep_mask,
1910                    bits_rep: total_width as u8,
1911                    bits_def: 0,
1912                    max_rep,
1913                    phantom: std::marker::PhantomData,
1914                })
1915            }
1916        }
1917        (None, Some(lev)) => {
1918            let iter = lev.iter().copied();
1919            if total_width <= 8 {
1920                ControlWordIterator::Unary8(UnaryControlWordIterator {
1921                    repdef: iter,
1922                    level_mask: def_mask,
1923                    bits_rep: 0,
1924                    bits_def: total_width as u8,
1925                    max_rep: 0,
1926                    phantom: std::marker::PhantomData,
1927                })
1928            } else if total_width <= 16 {
1929                ControlWordIterator::Unary16(UnaryControlWordIterator {
1930                    repdef: iter,
1931                    level_mask: def_mask,
1932                    bits_rep: 0,
1933                    bits_def: total_width as u8,
1934                    max_rep: 0,
1935                    phantom: std::marker::PhantomData,
1936                })
1937            } else {
1938                ControlWordIterator::Unary32(UnaryControlWordIterator {
1939                    repdef: iter,
1940                    level_mask: def_mask,
1941                    bits_rep: 0,
1942                    bits_def: total_width as u8,
1943                    max_rep: 0,
1944                    phantom: std::marker::PhantomData,
1945                })
1946            }
1947        }
1948        (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
1949    }
1950}
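
// Illustrative sketch (not part of this module): both the builder above and
// `ControlWordParser::new` pick the word size from the combined bit width of the two
// levels.  The hypothetical helper below isolates that rule; it takes the bit widths as
// given and does not reproduce `log_2_ceil`, which is defined elsewhere.

```rust
/// Returns the number of bytes needed for a control word with the given bit widths.
/// (Hypothetical helper, named for illustration only.)
fn bytes_per_control_word(bits_rep: u8, bits_def: u8) -> usize {
    // Widen before adding so that large widths cannot overflow a u8
    let total_bits = bits_rep as u16 + bits_def as u16;
    match total_bits {
        0 => 0,      // no rep and no def: nothing is written
        1..=8 => 1,  // the common case
        9..=16 => 2, // deeper nesting
        _ => 4,      // worst case
    }
}
```

// A column with 3 bits of repetition and 2 bits of definition therefore uses one byte
// per control word.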
1951
1952/// A parser to unwrap control words into repetition and definition levels
1953///
1954/// This is the inverse of the [`ControlWordIterator`].
1955#[derive(Copy, Clone, Debug)]
1956pub enum ControlWordParser {
1957    // First item is the bits to shift, second is the mask to apply (the mask can be
1958    // calculated from the bits to shift but we don't want to calculate it each time)
1959    BOTH8(u8, u32),
1960    BOTH16(u8, u32),
1961    BOTH32(u8, u32),
1962    REP8,
1963    REP16,
1964    REP32,
1965    DEF8,
1966    DEF16,
1967    DEF32,
1968    NIL,
1969}
1970
1971impl ControlWordParser {
1972    fn parse_both<const WORD_SIZE: u8>(
1973        src: &[u8],
1974        dst_rep: &mut Vec<u16>,
1975        dst_def: &mut Vec<u16>,
1976        bits_to_shift: u8,
1977        mask_to_apply: u32,
1978    ) {
1979        match WORD_SIZE {
1980            1 => {
1981                let word = src[0];
1982                let rep = word >> bits_to_shift;
1983                let def = word & (mask_to_apply as u8);
1984                dst_rep.push(rep as u16);
1985                dst_def.push(def as u16);
1986            }
1987            2 => {
1988                let word = u16::from_le_bytes([src[0], src[1]]);
1989                let rep = word >> bits_to_shift;
1990                let def = word & mask_to_apply as u16;
1991                dst_rep.push(rep);
1992                dst_def.push(def);
1993            }
1994            4 => {
1995                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
1996                let rep = word >> bits_to_shift;
1997                let def = word & mask_to_apply;
1998                dst_rep.push(rep as u16);
1999                dst_def.push(def as u16);
2000            }
2001            _ => unreachable!(),
2002        }
2003    }
2004
2005    fn parse_desc_both<const WORD_SIZE: u8>(
2006        src: &[u8],
2007        bits_to_shift: u8,
2008        mask_to_apply: u32,
2009        max_rep: u16,
2010        max_visible_def: u16,
2011    ) -> ControlWordDesc {
2012        match WORD_SIZE {
2013            1 => {
2014                let word = src[0];
2015                let rep = word >> bits_to_shift;
2016                let def = word & (mask_to_apply as u8);
2017                let is_visible = def as u16 <= max_visible_def;
2018                let is_new_row = rep as u16 == max_rep;
2019                let is_valid_item = def == 0;
2020                ControlWordDesc {
2021                    is_visible,
2022                    is_new_row,
2023                    is_valid_item,
2024                }
2025            }
2026            2 => {
2027                let word = u16::from_le_bytes([src[0], src[1]]);
2028                let rep = word >> bits_to_shift;
2029                let def = word & mask_to_apply as u16;
2030                let is_visible = def <= max_visible_def;
2031                let is_new_row = rep == max_rep;
2032                let is_valid_item = def == 0;
2033                ControlWordDesc {
2034                    is_visible,
2035                    is_new_row,
2036                    is_valid_item,
2037                }
2038            }
2039            4 => {
2040                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2041                let rep = word >> bits_to_shift;
2042                let def = word & mask_to_apply;
2043                let is_visible = def as u16 <= max_visible_def;
2044                let is_new_row = rep as u16 == max_rep;
2045                let is_valid_item = def == 0;
2046                ControlWordDesc {
2047                    is_visible,
2048                    is_new_row,
2049                    is_valid_item,
2050                }
2051            }
2052            _ => unreachable!(),
2053        }
2054    }
2055
2056    fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
2057        match WORD_SIZE {
2058            1 => {
2059                let word = src[0];
2060                dst.push(word as u16);
2061            }
2062            2 => {
2063                let word = u16::from_le_bytes([src[0], src[1]]);
2064                dst.push(word);
2065            }
2066            4 => {
2067                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2068                dst.push(word as u16);
2069            }
2070            _ => unreachable!(),
2071        }
2072    }
2073
2074    fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
2075        match WORD_SIZE {
2076            1 => ControlWordDesc {
2077                is_new_row: src[0] as u16 == max_rep,
2078                is_visible: true,
2079                is_valid_item: true,
2080            },
2081            2 => ControlWordDesc {
2082                is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
2083                is_visible: true,
2084                is_valid_item: true,
2085            },
2086            4 => ControlWordDesc {
2087                is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
2088                is_visible: true,
2089                is_valid_item: true,
2090            },
2091            _ => unreachable!(),
2092        }
2093    }
2094
2095    fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
2096        match WORD_SIZE {
2097            1 => ControlWordDesc {
2098                is_new_row: true,
2099                is_visible: true,
2100                is_valid_item: src[0] == 0,
2101            },
2102            2 => ControlWordDesc {
2103                is_new_row: true,
2104                is_visible: true,
2105                is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
2106            },
2107            4 => ControlWordDesc {
2108                is_new_row: true,
2109                is_visible: true,
2110                is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
2111            },
2112            _ => unreachable!(),
2113        }
2114    }
2115
2116    /// Returns the number of bytes per control word
2117    pub fn bytes_per_word(&self) -> usize {
2118        match self {
2119            Self::BOTH8(..) => 1,
2120            Self::BOTH16(..) => 2,
2121            Self::BOTH32(..) => 4,
2122            Self::REP8 => 1,
2123            Self::REP16 => 2,
2124            Self::REP32 => 4,
2125            Self::DEF8 => 1,
2126            Self::DEF16 => 2,
2127            Self::DEF32 => 4,
2128            Self::NIL => 0,
2129        }
2130    }
2131
2132    /// Appends the next control word to the rep & def buffers
2133    ///
2134    /// `src` should be pointing at the first byte (little endian) of the control word
2135    ///
2136    /// `dst_rep` and `dst_def` are the buffers to append the rep and def levels to.
2137    /// They will not be appended to if not needed.
2138    pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
2139        match self {
2140            Self::BOTH8(bits_to_shift, mask_to_apply) => {
2141                Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2142            }
2143            Self::BOTH16(bits_to_shift, mask_to_apply) => {
2144                Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2145            }
2146            Self::BOTH32(bits_to_shift, mask_to_apply) => {
2147                Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2148            }
2149            Self::REP8 => Self::parse_one::<1>(src, dst_rep),
2150            Self::REP16 => Self::parse_one::<2>(src, dst_rep),
2151            Self::REP32 => Self::parse_one::<4>(src, dst_rep),
2152            Self::DEF8 => Self::parse_one::<1>(src, dst_def),
2153            Self::DEF16 => Self::parse_one::<2>(src, dst_def),
2154            Self::DEF32 => Self::parse_one::<4>(src, dst_def),
2155            Self::NIL => {}
2156        }
2157    }
2158
    /// Returns true if the control words contain repetition information
2160    pub fn has_rep(&self) -> bool {
2161        match self {
2162            Self::BOTH8(..)
2163            | Self::BOTH16(..)
2164            | Self::BOTH32(..)
2165            | Self::REP8
2166            | Self::REP16
2167            | Self::REP32 => true,
2168            Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
2169        }
2170    }
2171
    /// Parses the control word to inspect its properties without appending to any buffers
2173    pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
2174        match self {
2175            Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
2176                src,
2177                *bits_to_shift,
2178                *mask_to_apply,
2179                max_rep,
2180                max_visible_def,
2181            ),
2182            Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
2183                src,
2184                *bits_to_shift,
2185                *mask_to_apply,
2186                max_rep,
2187                max_visible_def,
2188            ),
2189            Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
2190                src,
2191                *bits_to_shift,
2192                *mask_to_apply,
2193                max_rep,
2194                max_visible_def,
2195            ),
2196            Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
2197            Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
2198            Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
2199            Self::DEF8 => Self::parse_def_desc_one::<1>(src),
2200            Self::DEF16 => Self::parse_def_desc_one::<2>(src),
2201            Self::DEF32 => Self::parse_def_desc_one::<4>(src),
2202            Self::NIL => ControlWordDesc {
2203                is_new_row: true,
2204                is_valid_item: true,
2205                is_visible: true,
2206            },
2207        }
2208    }
2209
2210    /// Creates a new parser from the number of bits used for the repetition and definition levels
2211    pub fn new(bits_rep: u8, bits_def: u8) -> Self {
2212        let total_bits = bits_rep + bits_def;
2213
2214        enum WordSize {
2215            One,
2216            Two,
2217            Four,
2218        }
2219
2220        let word_size = if total_bits <= 8 {
2221            WordSize::One
2222        } else if total_bits <= 16 {
2223            WordSize::Two
2224        } else {
2225            WordSize::Four
2226        };
2227
2228        match (bits_rep > 0, bits_def > 0, word_size) {
2229            (false, false, _) => Self::NIL,
2230            (false, true, WordSize::One) => Self::DEF8,
2231            (false, true, WordSize::Two) => Self::DEF16,
2232            (false, true, WordSize::Four) => Self::DEF32,
2233            (true, false, WordSize::One) => Self::REP8,
2234            (true, false, WordSize::Two) => Self::REP16,
2235            (true, false, WordSize::Four) => Self::REP32,
2236            (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
2237            (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
2238            (true, true, WordSize::Four) => {
2239                Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
2240            }
2241        }
2242    }
2243}
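
// Illustrative sketch (not part of this module): parsing is the inverse of packing.  The
// word is read in little-endian order, the definition level is masked out of the low
// bits, and the repetition level is shifted down from the high bits.  The hypothetical
// function below handles all three word sizes by zero-extending to a u32; it assumes
// `bytes_per_word` is 1, 2, or 4 and `bits_def` is less than 32.

```rust
/// Splits one little-endian control word into (rep, def).
/// (Hypothetical helper, named for illustration only.)
fn parse_control_word(src: &[u8], bytes_per_word: usize, bits_def: u8) -> (u16, u16) {
    // Zero-extend the word to four bytes so a single code path covers every size
    let mut raw = [0u8; 4];
    raw[..bytes_per_word].copy_from_slice(&src[..bytes_per_word]);
    let word = u32::from_le_bytes(raw);
    let def_mask = (1u32 << bits_def) - 1;
    ((word >> bits_def) as u16, (word & def_mask) as u16)
}
```

// The parser in this module keeps a separate match arm per word size rather than
// zero-extending, which avoids the copy on a per-value code path.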
2244
2245#[cfg(test)]
2246mod tests {
2247    use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
2248
2249    use crate::repdef::{
2250        CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
2251    };
2252
2253    use super::RepDefBuilder;
2254
2255    fn validity(values: &[bool]) -> NullBuffer {
2256        NullBuffer::from_iter(values.iter().copied())
2257    }
2258
2259    fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
2260        OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
2261    }
2262
2263    fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
2264        OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
2265    }
2266
2267    #[test]
2268    fn test_repdef_empty_offsets() {
2269        // Empty offsets should serialize without panicking.
2270        let mut builder = RepDefBuilder::default();
2271        builder.add_offsets(offsets_32(&[0]), None);
2272        let repdefs = RepDefBuilder::serialize(vec![builder]);
2273        assert!(repdefs.repetition_levels.is_none());
2274        assert!(repdefs.definition_levels.is_none());
2275    }
2276
2277    #[test]
2278    fn test_repdef_basic() {
2279        // Basic case, rep & def
2280        let mut builder = RepDefBuilder::default();
2281        builder.add_offsets(
2282            offsets_64(&[0, 2, 2, 5]),
2283            Some(validity(&[true, false, true])),
2284        );
2285        builder.add_offsets(
2286            offsets_64(&[0, 1, 3, 5, 5, 9]),
2287            Some(validity(&[true, true, true, false, true])),
2288        );
2289        builder.add_validity_bitmap(validity(&[
2290            true, true, true, false, false, false, true, true, false,
2291        ]));
2292
2293        let repdefs = RepDefBuilder::serialize(vec![builder]);
2294        let rep = repdefs.repetition_levels.unwrap();
2295        let def = repdefs.definition_levels.unwrap();
2296
2297        assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2298        assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2299
2300        // [[I], [I, I]], NULL, [[NULL, NULL], NULL, [NULL, I, I, NULL]]
2301
2302        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2303            Some(rep.as_ref().to_vec()),
2304            Some(def.as_ref().to_vec()),
2305            repdefs.def_meaning.into(),
2306            9,
2307        )]);
2308
2309        // Note: validity doesn't exactly round-trip because repdef normalizes some of the
2310        // redundant validity values
2311        assert_eq!(
2312            unraveler.unravel_validity(9),
2313            Some(validity(&[
2314                true, true, true, false, false, false, true, true, false
2315            ]))
2316        );
2317        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2318        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 5, 9]).inner());
2319        assert_eq!(val, Some(validity(&[true, true, true, false, true])));
2320        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2321        assert_eq!(off.inner(), offsets_32(&[0, 2, 2, 5]).inner());
2322        assert_eq!(val, Some(validity(&[true, false, true])));
2323    }
2324
2325    #[test]
2326    fn test_repdef_simple_null_empty_list() {
2327        let check = |repdefs: SerializedRepDefs, last_def: DefinitionInterpretation| {
2328            let rep = repdefs.repetition_levels.unwrap();
2329            let def = repdefs.definition_levels.unwrap();
2330
2331            assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2332            assert_eq!([0, 0, 2, 0, 1, 0], *def);
2333            assert_eq!(
2334                vec![DefinitionInterpretation::NullableItem, last_def,],
2335                repdefs.def_meaning
2336            );
2337        };
2338
2339        // Null list and empty list should be serialized mostly the same
2340
2341        // Null case
2342        let mut builder = RepDefBuilder::default();
2343        builder.add_offsets(
2344            offsets_32(&[0, 2, 2, 5]),
2345            Some(validity(&[true, false, true])),
2346        );
2347        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2348
2349        let repdefs = RepDefBuilder::serialize(vec![builder]);
2350
2351        check(repdefs, DefinitionInterpretation::NullableList);
2352
2353        // Empty case
2354        let mut builder = RepDefBuilder::default();
2355        builder.add_offsets(offsets_32(&[0, 2, 2, 5]), None);
2356        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2357
2358        let repdefs = RepDefBuilder::serialize(vec![builder]);
2359
2360        check(repdefs, DefinitionInterpretation::EmptyableList);
2361    }
2362
2363    #[test]
2364    fn test_repdef_empty_list_at_end() {
2365        // Regression test for a failure we encountered when the last list was empty
2366        let mut builder = RepDefBuilder::default();
2367        builder.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
2368        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2369
2370        let repdefs = RepDefBuilder::serialize(vec![builder]);
2371
2372        let rep = repdefs.repetition_levels.unwrap();
2373        let def = repdefs.definition_levels.unwrap();
2374
2375        assert_eq!([1, 0, 1, 0, 0, 1], *rep);
2376        assert_eq!([0, 0, 0, 1, 0, 2], *def);
2377        assert_eq!(
2378            vec![
2379                DefinitionInterpretation::NullableItem,
2380                DefinitionInterpretation::EmptyableList,
2381            ],
2382            repdefs.def_meaning
2383        );
2384    }
2385
2386    #[test]
2387    fn test_repdef_abnormal_nulls() {
2388        // Null lists are allowed to span a non-empty offset range filled with garbage
2389        // values; the add_offsets call should normalize this
2390        let mut builder = RepDefBuilder::default();
2391        builder.add_offsets(
2392            offsets_32(&[0, 2, 5, 8]),
2393            Some(validity(&[true, false, true])),
2394        );
2395        // Note: we pass 5 here and not 8.  If add_offsets tells us there are garbage values
2396        // under null lists, they should be removed before continuing
2397        builder.add_no_null(5);
2398
2399        let repdefs = RepDefBuilder::serialize(vec![builder]);
2400
2401        let rep = repdefs.repetition_levels.unwrap();
2402        let def = repdefs.definition_levels.unwrap();
2403
2404        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2405        assert_eq!([0, 0, 1, 0, 0, 0], *def);
2406
2407        assert_eq!(
2408            vec![
2409                DefinitionInterpretation::AllValidItem,
2410                DefinitionInterpretation::NullableList,
2411            ],
2412            repdefs.def_meaning
2413        );
2414    }
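
    // A minimal sketch of the normalization exercised above, assuming the goal is simply to
    // make every null list cover zero items.  `collapse_null_lists` is a hypothetical helper
    // written for illustration; it is not taken from add_offsets.

```rust
/// Hypothetical helper: rewrites `offsets` so that null lists cover zero items.
/// `validity[i]` is false when list `i` is null.
fn collapse_null_lists(offsets: &[i32], validity: &[bool]) -> Vec<i32> {
    assert_eq!(offsets.len(), validity.len() + 1);
    let mut out = Vec::with_capacity(offsets.len());
    let mut end = 0;
    out.push(end);
    for (window, &valid) in offsets.windows(2).zip(validity) {
        // Only valid lists contribute items; a null list repeats the previous offset.
        if valid {
            end += window[1] - window[0];
        }
        out.push(end);
    }
    out
}
```

    // With offsets [0, 2, 5, 8] and validity [true, false, true] the sketch yields
    // [0, 2, 2, 5], which is consistent with the test passing 5 (not 8) to add_no_null.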
2415
2416    #[test]
2417    fn test_repdef_fsl() {
2418        let mut builder = RepDefBuilder::default();
2419        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2420        builder.add_fsl(None, 2, 4);
2421        builder.add_validity_bitmap(validity(&[
2422            true, false, true, false, true, false, true, false,
2423        ]));
2424
2425        let repdefs = RepDefBuilder::serialize(vec![builder]);
2426
2427        assert_eq!(
2428            vec![
2429                DefinitionInterpretation::NullableItem,
2430                DefinitionInterpretation::AllValidItem,
2431                DefinitionInterpretation::NullableItem
2432            ],
2433            repdefs.def_meaning
2434        );
2435
2436        assert!(repdefs.repetition_levels.is_none());
2437
2438        let def = repdefs.definition_levels.unwrap();
2439
2440        assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def);
2441
2442        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2443            None,
2444            Some(def.as_ref().to_vec()),
2445            repdefs.def_meaning.into(),
2446            8,
2447        )]);
2448
2449        assert_eq!(
2450            unraveler.unravel_validity(8),
2451            Some(validity(&[
2452                true, false, true, false, false, false, false, false
2453            ]))
2454        );
2455        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2456        assert_eq!(
2457            unraveler.unravel_fsl_validity(2, 2),
2458            Some(validity(&[true, false]))
2459        );
2460    }
2461
2462    #[test]
2463    fn test_repdef_fsl_allvalid_item() {
2464        let mut builder = RepDefBuilder::default();
2465        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2466        builder.add_fsl(None, 2, 4);
2467        builder.add_no_null(8);
2468
2469        let repdefs = RepDefBuilder::serialize(vec![builder]);
2470
2471        assert_eq!(
2472            vec![
2473                DefinitionInterpretation::AllValidItem,
2474                DefinitionInterpretation::AllValidItem,
2475                DefinitionInterpretation::NullableItem
2476            ],
2477            repdefs.def_meaning
2478        );
2479
2480        assert!(repdefs.repetition_levels.is_none());
2481
2482        let def = repdefs.definition_levels.unwrap();
2483
2484        assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def);
2485
2486        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2487            None,
2488            Some(def.as_ref().to_vec()),
2489            repdefs.def_meaning.into(),
2490            8,
2491        )]);
2492
2493        assert_eq!(unraveler.unravel_validity(8), None);
2494        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2495        assert_eq!(
2496            unraveler.unravel_fsl_validity(2, 2),
2497            Some(validity(&[true, false]))
2498        );
2499    }
2500
2501    #[test]
2502    fn test_repdef_sliced_offsets() {
2503        // Sliced lists may have offsets that don't start with zero.  The
2504        // add_offsets call needs to normalize these to operate correctly.
2505        let mut builder = RepDefBuilder::default();
2506        builder.add_offsets(
2507            offsets_32(&[5, 7, 7, 10]),
2508            Some(validity(&[true, false, true])),
2509        );
2510        builder.add_no_null(5);
2511
2512        let repdefs = RepDefBuilder::serialize(vec![builder]);
2513
2514        let rep = repdefs.repetition_levels.unwrap();
2515        let def = repdefs.definition_levels.unwrap();
2516
2517        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2518        assert_eq!([0, 0, 1, 0, 0, 0], *def);
2519
2520        assert_eq!(
2521            vec![
2522                DefinitionInterpretation::AllValidItem,
2523                DefinitionInterpretation::NullableList,
2524            ],
2525            repdefs.def_meaning
2526        );
2527    }
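
    // A minimal sketch of the offset normalization described above, assuming it amounts to
    // subtracting the first offset from every entry.  `rebase_offsets` is a hypothetical
    // name used only for this illustration.

```rust
/// Hypothetical helper: shifts `offsets` so that the first entry is zero.
fn rebase_offsets(offsets: &[i32]) -> Vec<i32> {
    match offsets.first() {
        // Subtract the starting offset from every entry.
        Some(&first) => offsets.iter().map(|off| off - first).collect(),
        // An empty buffer has nothing to shift.
        None => Vec::new(),
    }
}
```

    // The sliced offsets [5, 7, 7, 10] become [0, 2, 2, 5], the same shape used by the
    // unsliced tests in this module.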
2528
2529    #[test]
2530    fn test_repdef_complex_null_empty() {
2531        let mut builder = RepDefBuilder::default();
2532        builder.add_offsets(
2533            offsets_32(&[0, 4, 4, 4, 6]),
2534            Some(validity(&[true, false, true, true])),
2535        );
2536        builder.add_offsets(
2537            offsets_32(&[0, 1, 1, 2, 2, 2, 3]),
2538            Some(validity(&[true, false, true, false, true, true])),
2539        );
2540        builder.add_no_null(3);
2541
2542        let repdefs = RepDefBuilder::serialize(vec![builder]);
2543
2544        let rep = repdefs.repetition_levels.unwrap();
2545        let def = repdefs.definition_levels.unwrap();
2546
2547        assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep);
2548        assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def);
2549    }
2550
2551    #[test]
2552    fn test_repdef_empty_list_no_null() {
2553        // Tests when we have some empty lists but no null lists.  This case
2554        // caused some bugs because we have definition levels but no nulls
2555        let mut builder = RepDefBuilder::default();
2556        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2557        builder.add_no_null(6);
2558
2559        let repdefs = RepDefBuilder::serialize(vec![builder]);
2560
2561        let rep = repdefs.repetition_levels.unwrap();
2562        let def = repdefs.definition_levels.unwrap();
2563
2564        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2565        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2566
2567        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2568            Some(rep.as_ref().to_vec()),
2569            Some(def.as_ref().to_vec()),
2570            repdefs.def_meaning.into(),
2571            8,
2572        )]);
2573
2574        assert_eq!(unraveler.unravel_validity(6), None);
2575        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2576        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2577        assert_eq!(val, None);
2578    }
2579
2580    #[test]
2581    fn test_repdef_all_valid() {
2582        let mut builder = RepDefBuilder::default();
2583        builder.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
2584        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2585        builder.add_no_null(9);
2586
2587        let repdefs = RepDefBuilder::serialize(vec![builder]);
2588        let rep = repdefs.repetition_levels.unwrap();
2589        assert!(repdefs.definition_levels.is_none());
2590
2591        assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep);
2592
2593        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2594            Some(rep.as_ref().to_vec()),
2595            None,
2596            repdefs.def_meaning.into(),
2597            9,
2598        )]);
2599
2600        assert_eq!(unraveler.unravel_validity(9), None);
2601        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2602        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 7, 9]).inner());
2603        assert_eq!(val, None);
2604        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2605        assert_eq!(off.inner(), offsets_32(&[0, 2, 3, 5]).inner());
2606        assert_eq!(val, None);
2607    }
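
    // A minimal sketch of how offsets can be recovered from repetition levels in the
    // all-valid case above.  It assumes there are no null or empty lists, so every level
    // corresponds to one item.  `offsets_from_rep` is a hypothetical helper, not the
    // unraveler's implementation.

```rust
/// Hypothetical helper: rebuilds the offsets of list layer `level`
/// (1 = inner-most list) from repetition levels.
fn offsets_from_rep(rep: &[u16], level: u16) -> Vec<i32> {
    let mut offsets = Vec::new();
    // Number of children (items for level 1, inner lists otherwise) seen so far.
    let mut children = 0;
    for &r in rep {
        // A list at this layer starts whenever the repetition level reaches `level`.
        if r >= level {
            offsets.push(children);
        }
        // A child of this layer starts whenever the level reaches `level - 1`.
        if r + 1 >= level {
            children += 1;
        }
    }
    offsets.push(children);
    offsets
}
```

    // For rep = [2, 1, 0, 2, 0, 2, 0, 1, 0] the sketch gives [0, 1, 3, 5, 7, 9] at level 1
    // and [0, 2, 3, 5] at level 2, matching the offsets asserted in the test.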
2608
2609    #[test]
2610    fn test_only_empty_lists() {
2611        let mut builder = RepDefBuilder::default();
2612        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2613        builder.add_no_null(6);
2614
2615        let repdefs = RepDefBuilder::serialize(vec![builder]);
2616
2617        let rep = repdefs.repetition_levels.unwrap();
2618        let def = repdefs.definition_levels.unwrap();
2619
2620        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2621        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2622
2623        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2624            Some(rep.as_ref().to_vec()),
2625            Some(def.as_ref().to_vec()),
2626            repdefs.def_meaning.into(),
2627            8,
2628        )]);
2629
2630        assert_eq!(unraveler.unravel_validity(6), None);
2631        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2632        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2633        assert_eq!(val, None);
2634    }
2635
2636    #[test]
2637    fn test_only_null_lists() {
2638        let mut builder = RepDefBuilder::default();
2639        builder.add_offsets(
2640            offsets_32(&[0, 4, 4, 4, 6]),
2641            Some(validity(&[true, false, false, true])),
2642        );
2643        builder.add_no_null(6);
2644
2645        let repdefs = RepDefBuilder::serialize(vec![builder]);
2646
2647        let rep = repdefs.repetition_levels.unwrap();
2648        let def = repdefs.definition_levels.unwrap();
2649
2650        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2651        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2652
2653        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2654            Some(rep.as_ref().to_vec()),
2655            Some(def.as_ref().to_vec()),
2656            repdefs.def_meaning.into(),
2657            8,
2658        )]);
2659
2660        assert_eq!(unraveler.unravel_validity(6), None);
2661        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2662        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2663        assert_eq!(val, Some(validity(&[true, false, false, true])));
2664    }
2665
2666    #[test]
2667    fn test_null_and_empty_lists() {
2668        let mut builder = RepDefBuilder::default();
2669        builder.add_offsets(
2670            offsets_32(&[0, 4, 4, 4, 6]),
2671            Some(validity(&[true, false, true, true])),
2672        );
2673        builder.add_no_null(6);
2674
2675        let repdefs = RepDefBuilder::serialize(vec![builder]);
2676
2677        let rep = repdefs.repetition_levels.unwrap();
2678        let def = repdefs.definition_levels.unwrap();
2679
2680        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2681        assert_eq!([0, 0, 0, 0, 1, 2, 0, 0], *def);
2682
2683        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2684            Some(rep.as_ref().to_vec()),
2685            Some(def.as_ref().to_vec()),
2686            repdefs.def_meaning.into(),
2687            8,
2688        )]);
2689
2690        assert_eq!(unraveler.unravel_validity(6), None);
2691        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2692        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2693        assert_eq!(val, Some(validity(&[true, false, true, true])));
2694    }
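
    // A minimal sketch of the level assignment that the single-list tests above expect.
    // The rules are inferred from the asserted values only: definition levels are handed
    // out from the inside out (null item, then null list, then empty list) and a layer
    // that never occurs does not consume a level.  `single_list_levels` is hypothetical
    // and assumes offsets start at zero and null lists cover zero items.

```rust
/// Hypothetical helper: computes (rep, def) levels for one layer of lists.
fn single_list_levels(
    offsets: &[usize],
    list_validity: Option<&[bool]>,
    item_validity: Option<&[bool]>,
) -> (Vec<u16>, Vec<u16>) {
    let num_lists = offsets.len().saturating_sub(1);
    let is_null = |i: usize| list_validity.map_or(false, |v| !v[i]);
    let is_empty = |i: usize| offsets[i] == offsets[i + 1];

    // Hand out definition levels from the inner-most layer outwards.
    let mut next_level = 0u16;
    let mut item_null_level = 0;
    if item_validity.is_some() {
        next_level += 1;
        item_null_level = next_level;
    }
    let mut null_list_level = 0;
    if (0..num_lists).any(|i| is_null(i)) {
        next_level += 1;
        null_list_level = next_level;
    }
    let mut empty_list_level = 0;
    if (0..num_lists).any(|i| !is_null(i) && is_empty(i)) {
        next_level += 1;
        empty_list_level = next_level;
    }

    let mut rep = Vec::new();
    let mut def = Vec::new();
    for i in 0..num_lists {
        if is_null(i) {
            // A null list takes one slot even though it has no items.
            rep.push(1);
            def.push(null_list_level);
        } else if is_empty(i) {
            // So does an empty list.
            rep.push(1);
            def.push(empty_list_level);
        } else {
            for item in offsets[i]..offsets[i + 1] {
                // The first item of a list starts that list (rep = 1).
                rep.push(if item == offsets[i] { 1 } else { 0 });
                let valid = item_validity.map_or(true, |v| v[item]);
                def.push(if valid { 0 } else { item_null_level });
            }
        }
    }
    (rep, def)
}
```

    // Under these assumptions the inputs of test_repdef_simple_null_empty_list and
    // test_null_and_empty_lists reproduce the rep and def values asserted there.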
2695
2696    #[test]
2697    fn test_repdef_null_struct_valid_list() {
2698        // Regression test for a bug in unraveling a null struct nested in an all-valid list
2699
2700        let rep = vec![1, 0, 0, 0];
2701        let def = vec![2, 0, 2, 2];
2702        // AllValidList<NullableStruct<NullableItem>>
2703        let def_meaning = vec![
2704            DefinitionInterpretation::NullableItem,
2705            DefinitionInterpretation::NullableItem,
2706            DefinitionInterpretation::AllValidList,
2707        ];
2708        let num_items = 4;
2709
2710        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2711            Some(rep),
2712            Some(def),
2713            def_meaning.into(),
2714            num_items,
2715        )]);
2716
2717        assert_eq!(
2718            unraveler.unravel_validity(4),
2719            Some(validity(&[false, true, false, false]))
2720        );
2721        assert_eq!(
2722            unraveler.unravel_validity(4),
2723            Some(validity(&[false, true, false, false]))
2724        );
2725        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2726        assert_eq!(off.inner(), offsets_32(&[0, 4]).inner());
2727        assert_eq!(val, None);
2728    }
2729
2730    #[test]
2731    fn test_repdef_no_rep() {
2732        let mut builder = RepDefBuilder::default();
2733        builder.add_no_null(5);
2734        builder.add_validity_bitmap(validity(&[false, false, true, true, true]));
2735        builder.add_validity_bitmap(validity(&[false, true, true, true, false]));
2736
2737        let repdefs = RepDefBuilder::serialize(vec![builder]);
2738        assert!(repdefs.repetition_levels.is_none());
2739        let def = repdefs.definition_levels.unwrap();
2740
2741        assert_eq!([2, 2, 0, 0, 1], *def);
2742
2743        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2744            None,
2745            Some(def.as_ref().to_vec()),
2746            repdefs.def_meaning.into(),
2747            5,
2748        )]);
2749
2750        assert_eq!(
2751            unraveler.unravel_validity(5),
2752            Some(validity(&[false, false, true, true, false]))
2753        );
2754        assert_eq!(
2755            unraveler.unravel_validity(5),
2756            Some(validity(&[false, false, true, true, true]))
2757        );
2758        assert_eq!(unraveler.unravel_validity(5), None);
2759    }
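
    // A minimal sketch of how stacked validity bitmaps could collapse into one definition
    // level per item when there is no repetition.  The rule is inferred from the values
    // asserted above: each layer that has nulls gets the next level (inner-most first) and
    // an outer null overrides an inner one.  `stacked_def_levels` is a hypothetical helper.

```rust
/// Hypothetical helper: `layers` is ordered inner-most first and `None` marks a
/// layer without nulls.
fn stacked_def_levels(layers: &[Option<Vec<bool>>], num_items: usize) -> Vec<u16> {
    let mut def = vec![0u16; num_items];
    let mut level = 0u16;
    for layer in layers {
        // Layers without nulls do not consume a definition level.
        if let Some(validity) = layer {
            level += 1;
            for (d, &valid) in def.iter_mut().zip(validity) {
                // Later (outer) layers overwrite earlier (inner) ones.
                if !valid {
                    *d = level;
                }
            }
        }
    }
    def
}
```

    // Feeding the three layers of test_repdef_no_rep in inner-to-outer order gives
    // [2, 2, 0, 0, 1], the definition levels asserted in that test.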
2760
2761    #[test]
2762    fn test_composite_unravel() {
2763        let mut builder = RepDefBuilder::default();
2764        builder.add_offsets(
2765            offsets_64(&[0, 2, 2, 5]),
2766            Some(validity(&[true, false, true])),
2767        );
2768        builder.add_no_null(5);
2769        let repdef1 = RepDefBuilder::serialize(vec![builder]);
2770
2771        let mut builder = RepDefBuilder::default();
2772        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2773        builder.add_no_null(9);
2774        let repdef2 = RepDefBuilder::serialize(vec![builder]);
2775
2776        let rep1 = repdef1.repetition_levels.clone().unwrap();
2777        let def1 = repdef1.definition_levels.clone().unwrap();
2778        let rep2 = repdef2.repetition_levels.clone().unwrap();
2779        assert!(repdef2.definition_levels.is_none());
2780
2781        assert_eq!([1, 0, 1, 1, 0, 0], *rep1);
2782        assert_eq!([0, 0, 1, 0, 0, 0], *def1);
2783        assert_eq!([1, 1, 0, 1, 0, 1, 0, 1, 0], *rep2);
2784
2785        let unravel1 = RepDefUnraveler::new(
2786            repdef1.repetition_levels.map(|l| l.to_vec()),
2787            repdef1.definition_levels.map(|l| l.to_vec()),
2788            repdef1.def_meaning.into(),
2789            5,
2790        );
2791        let unravel2 = RepDefUnraveler::new(
2792            repdef2.repetition_levels.map(|l| l.to_vec()),
2793            repdef2.definition_levels.map(|l| l.to_vec()),
2794            repdef2.def_meaning.into(),
2795            9,
2796        );
2797
2798        let mut unraveler = CompositeRepDefUnraveler::new(vec![unravel1, unravel2]);
2799
2800        assert!(unraveler.unravel_validity(9).is_none());
2801        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2802        assert_eq!(
2803            off.inner(),
2804            offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]).inner()
2805        );
2806        assert_eq!(
2807            val,
2808            Some(validity(&[true, false, true, true, true, true, true, true]))
2809        );
2810    }
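
    // A minimal sketch of the stitching that the composite test above checks, assuming
    // each page's offsets are shifted by the running total and that pages without a
    // validity bitmap count as all-valid.  `concat_offsets` and `concat_validity` are
    // hypothetical helpers, not part of CompositeRepDefUnraveler.

```rust
/// Hypothetical helper: concatenates per-page offsets into a single offsets buffer.
fn concat_offsets(pages: &[Vec<i32>]) -> Vec<i32> {
    let mut out = vec![0];
    for page in pages {
        // `out` always holds at least the leading zero.
        let base = *out.last().unwrap();
        let start = page.first().copied().unwrap_or(0);
        // Skip each page's leading offset and shift the rest by the running total.
        out.extend(page.iter().skip(1).map(|off| base + off - start));
    }
    out
}

/// Hypothetical helper: concatenates per-page validity, where each page is a
/// (bitmap, length) pair and `None` means the page has no nulls.
fn concat_validity(pages: &[(Option<Vec<bool>>, usize)]) -> Option<Vec<bool>> {
    // If no page has nulls then the combined layer has no bitmap either.
    if pages.iter().all(|(validity, _)| validity.is_none()) {
        return None;
    }
    let mut out = Vec::new();
    for (validity, len) in pages {
        match validity {
            Some(bits) => out.extend_from_slice(bits),
            None => out.extend(std::iter::repeat(true).take(*len)),
        }
    }
    Some(out)
}
```

    // Joining [0, 2, 2, 5] with [0, 1, 3, 5, 7, 9] gives [0, 2, 2, 5, 6, 8, 10, 12, 14],
    // and the validity [true, false, true] is padded with five `true` entries.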
2811
2812    #[test]
2813    fn test_repdef_multiple_builders() {
2814        // Same data as test_repdef_basic, but split across two builders
2815        let mut builder1 = RepDefBuilder::default();
2816        builder1.add_offsets(offsets_64(&[0, 2]), None);
2817        builder1.add_offsets(offsets_64(&[0, 1, 3]), None);
2818        builder1.add_validity_bitmap(validity(&[true, true, true]));
2819
2820        let mut builder2 = RepDefBuilder::default();
2821        builder2.add_offsets(offsets_64(&[0, 0, 3]), Some(validity(&[false, true])));
2822        builder2.add_offsets(
2823            offsets_64(&[0, 2, 2, 6]),
2824            Some(validity(&[true, false, true])),
2825        );
2826        builder2.add_validity_bitmap(validity(&[false, false, false, true, true, false]));
2827
2828        let repdefs = RepDefBuilder::serialize(vec![builder1, builder2]);
2829
2830        let rep = repdefs.repetition_levels.unwrap();
2831        let def = repdefs.definition_levels.unwrap();
2832
2833        assert_eq!([2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2834        assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2835    }
2836
2837    #[test]
2838    fn test_slicer() {
2839        let mut builder = RepDefBuilder::default();
2840        builder.add_offsets(
2841            offsets_64(&[0, 2, 2, 30, 30]),
2842            Some(validity(&[true, false, true, true])),
2843        );
2844        builder.add_no_null(30);
2845
2846        let repdefs = RepDefBuilder::serialize(vec![builder]);
2847
2848        let mut rep_slicer = repdefs.rep_slicer().unwrap();
2849
2850        // First 5 items include a null list so we get 6 levels (12 bytes)
2851        assert_eq!(rep_slicer.slice_next(5).len(), 12);
2852        // Next 20 are all plain
2853        assert_eq!(rep_slicer.slice_next(20).len(), 40);
2854        // Last 5 include an empty list so we get 6 levels (12 bytes)
2855        assert_eq!(rep_slicer.slice_rest().len(), 12);
2856
2857        let mut def_slicer = repdefs.def_slicer().unwrap();
2858
2859        // First 5 items include a null list so we get 6 levels (12 bytes)
2860        assert_eq!(def_slicer.slice_next(5).len(), 12);
2861        // Next 20 are all plain
2862        assert_eq!(def_slicer.slice_next(20).len(), 40);
2863        // Last 5 include an empty list so we get 6 levels (12 bytes)
2864        assert_eq!(def_slicer.slice_rest().len(), 12);
2865    }
2866
2867    #[test]
2868    fn test_control_words() {
2869        // Convert to control words, verify expected, convert back, verify same as original
2870        fn check(
2871            rep: &[u16],
2872            def: &[u16],
2873            expected_values: Vec<u8>,
2874            expected_bytes_per_word: usize,
2875            expected_bits_rep: u8,
2876            expected_bits_def: u8,
2877        ) {
2878            let num_vals = rep.len().max(def.len());
2879            let max_rep = rep.iter().max().copied().unwrap_or(0);
2880            let max_def = def.iter().max().copied().unwrap_or(0);
2881
2882            let in_rep = if rep.is_empty() { None } else { Some(rep) };
2883            let in_def = if def.is_empty() { None } else { Some(def) };
2884
2885            let mut iter = super::build_control_word_iterator(
2886                in_rep,
2887                max_rep,
2888                in_def,
2889                max_def,
2890                max_def + 1,
2891                expected_values.len(),
2892            );
2893            assert_eq!(iter.bytes_per_word(), expected_bytes_per_word);
2894            assert_eq!(iter.bits_rep(), expected_bits_rep);
2895            assert_eq!(iter.bits_def(), expected_bits_def);
2896            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
2897
2898            for _ in 0..num_vals {
2899                iter.append_next(&mut cw_vec);
2900            }
2901            assert!(iter.append_next(&mut cw_vec).is_none());
2902
2903            assert_eq!(expected_values, cw_vec);
2904
2905            let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);
2906
2907            let mut rep_out = Vec::with_capacity(num_vals);
2908            let mut def_out = Vec::with_capacity(num_vals);
2909
2910            if expected_bytes_per_word > 0 {
2911                for slice in cw_vec.chunks_exact(expected_bytes_per_word) {
2912                    parser.parse(slice, &mut rep_out, &mut def_out);
2913                }
2914            }
2915
2916            assert_eq!(rep, rep_out.as_slice());
2917            assert_eq!(def, def_out.as_slice());
2918        }
2919
2920        // Each will need 4 bits and so we should get 1-byte control words
2921        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
2922        let def = &[5_u16, 3, 1, 2, 12, 15, 0, 2];
2923        let expected = vec![
2924            0b00000101, // 0, 5
2925            0b01110011, // 7, 3
2926            0b00110001, // 3, 1
2927            0b00100010, // 2, 2
2928            0b10011100, // 9, 12
2929            0b10001111, // 8, 15
2930            0b11000000, // 12, 0
2931            0b01010010, // 5, 2
2932        ];
2933        check(rep, def, expected, 1, 4, 4);
2934
2935        // Now we need 5 bits for def so we get 2-byte control words
2936        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
2937        let def = &[5_u16, 3, 1, 2, 12, 22, 0, 2];
2938        let expected = vec![
2939            0b00000101, 0b00000000, // 0, 5
2940            0b11100011, 0b00000000, // 7, 3
2941            0b01100001, 0b00000000, // 3, 1
2942            0b01000010, 0b00000000, // 2, 2
2943            0b00101100, 0b00000001, // 9, 12
2944            0b00010110, 0b00000001, // 8, 22
2945            0b10000000, 0b00000001, // 12, 0
2946            0b10100010, 0b00000000, // 5, 2
2947        ];
2948        check(rep, def, expected, 2, 4, 5);
2949
2950        // Just rep, 4 bits so 1 byte each
2951        let levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
2952        let expected = vec![
2953            0b00000000, // 0
2954            0b00000111, // 7
2955            0b00000011, // 3
2956            0b00000010, // 2
2957            0b00001001, // 9
2958            0b00001000, // 8
2959            0b00001100, // 12
2960            0b00000101, // 5
2961        ];
2962        check(levels, &[], expected.clone(), 1, 4, 0);
2963
2964        // Just def
2965        check(&[], levels, expected, 1, 0, 4);
2966
2967        // No rep, no def, no bytes
2968        check(&[], &[], Vec::default(), 0, 0, 0);
2969    }
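
    // A minimal sketch of the bit layout that the expected bytes above imply: the
    // repetition level sits above the definition level (`rep << bits_def | def`) and the
    // word is stored little-endian.  The helper names are hypothetical, and only the
    // 1-byte and 2-byte cases are confirmed by this test; wider words are an assumption.

```rust
/// Hypothetical helper: packs one (rep, def) pair and appends it to `out`.
fn pack_control_word(rep: u16, def: u16, bits_rep: u8, bits_def: u8, out: &mut Vec<u8>) {
    let word = ((rep as u32) << bits_def) | def as u32;
    // Round the combined bit width up to whole bytes.
    let num_bytes = (bits_rep as usize + bits_def as usize + 7) / 8;
    out.extend_from_slice(&word.to_le_bytes()[..num_bytes]);
}

/// Hypothetical helper: splits one control word back into (rep, def).
fn unpack_control_word(bytes: &[u8], bits_def: u8) -> (u16, u16) {
    // Zero-extend the word to four bytes before decoding it.
    let mut buf = [0u8; 4];
    buf[..bytes.len()].copy_from_slice(bytes);
    let word = u32::from_le_bytes(buf);
    let def_mask = (1u32 << bits_def) - 1;
    ((word >> bits_def) as u16, (word & def_mask) as u16)
}
```

    // For example, rep = 9 and def = 12 with 4 and 5 bits pack to 0b00101100, 0b00000001,
    // the same two bytes listed in the 2-byte case of the test.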
2970
2971    #[test]
2972    fn test_control_words_rep_index() {
2973        fn check(
2974            rep: &[u16],
2975            def: &[u16],
2976            expected_new_rows: Vec<bool>,
2977            expected_is_visible: Vec<bool>,
2978        ) {
2979            let num_vals = rep.len().max(def.len());
2980            let max_rep = rep.iter().max().copied().unwrap_or(0);
2981            let max_def = def.iter().max().copied().unwrap_or(0);
2982
2983            let in_rep = if rep.is_empty() { None } else { Some(rep) };
2984            let in_def = if def.is_empty() { None } else { Some(def) };
2985
2986            let mut iter = super::build_control_word_iterator(
2987                in_rep,
2988                max_rep,
2989                in_def,
2990                max_def,
2991                /*max_visible_def=*/ 2,
2992                expected_new_rows.len(),
2993            );
2994
2995            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
2996            let mut expected_new_rows = expected_new_rows.iter().copied();
2997            let mut expected_is_visible = expected_is_visible.iter().copied();
2998            for _ in 0..expected_new_rows.len() {
2999                let word_desc = iter.append_next(&mut cw_vec).unwrap();
3000                assert_eq!(word_desc.is_new_row, expected_new_rows.next().unwrap());
3001                assert_eq!(word_desc.is_visible, expected_is_visible.next().unwrap());
3002            }
3003            assert!(iter.append_next(&mut cw_vec).is_none());
3004        }
3005
3006        // 2 is the max rep level here, so it marks the start of a new row
3007        let rep = &[2_u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
3008        // Only the 3 matters here: it exceeds max_visible_def (2) when rep is present
3009        let def = &[0_u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];
3010
3011        // Rep & def
3012        check(
3013            rep,
3014            def,
3015            vec![
3016                true, false, false, true, true, false, false, false, false, true, false,
3017            ],
3018            vec![
3019                true, true, true, false, true, true, true, true, true, true, true,
3020            ],
3021        );
3022        // Rep only
3023        check(
3024            rep,
3025            &[],
3026            vec![
3027                true, false, false, true, true, false, false, false, false, true, false,
3028            ],
3029            vec![true; 11],
3030        );
3031        // No repetition
3032        check(
3033            &[],
3034            def,
3035            vec![
3036                true, true, true, true, true, true, true, true, true, true, true,
3037            ],
3038            vec![true; 11],
3039        );
3040        // No repetition, no definition
3041        check(
3042            &[],
3043            &[],
3044            vec![
3045                true, true, true, true, true, true, true, true, true, true, true,
3046            ],
3047            vec![true; 11],
3048        );
3049    }
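
    // A minimal sketch of the two flags checked above.  The rules are inferred only from
    // the expected vectors in this test: a word starts a new row when its rep level equals
    // the max rep level (or when there is no repetition at all), and it is invisible only
    // when both rep and def are present and def exceeds max_visible_def.  `describe_word`
    // is a hypothetical helper, not the control word iterator.

```rust
/// Hypothetical helper: returns (is_new_row, is_visible) for a single level pair.
fn describe_word(
    rep: Option<u16>,
    def: Option<u16>,
    max_rep: u16,
    max_visible_def: u16,
) -> (bool, bool) {
    match (rep, def) {
        // Lists with nulls: null and empty lists occupy a level but hold no item.
        (Some(rep), Some(def)) => (rep == max_rep, def <= max_visible_def),
        // Lists without a definition buffer: every level is a visible item.
        (Some(rep), None) => (rep == max_rep, true),
        // No repetition: every level is its own row and is always visible.
        (None, _) => (true, true),
    }
}
```

    // With max_rep = 2 and max_visible_def = 2, the pair (rep 2, def 3) is a new row that
    // is not visible, while the same def with no rep is reported as visible.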
3050
3051    #[test]
3052    fn regress_empty_list_case() {
3053        // Regression test for a case where we had 3 null lists inside a struct
3054        let mut builder = RepDefBuilder::default();
3055        builder.add_validity_bitmap(validity(&[true, false, true]));
3056        builder.add_offsets(
3057            offsets_32(&[0, 0, 0, 0]),
3058            Some(validity(&[false, false, false])),
3059        );
3060        builder.add_no_null(0);
3061
3062        let repdefs = RepDefBuilder::serialize(vec![builder]);
3063        let rep = repdefs.repetition_levels.unwrap();
3064        let def = repdefs.definition_levels.unwrap();
3065
3066        assert_eq!([1, 1, 1], *rep);
3067        assert_eq!([1, 2, 1], *def);
3068
3069        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
3070            Some(rep.as_ref().to_vec()),
3071            Some(def.as_ref().to_vec()),
3072            repdefs.def_meaning.into(),
3073            0,
3074        )]);
3075
3076        assert_eq!(unraveler.unravel_validity(0), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 0, 0, 0]).inner());
        assert_eq!(val, Some(validity(&[false, false, false])));
        let val = unraveler.unravel_validity(3).unwrap();
        assert_eq!(val.inner(), validity(&[true, false, true]).inner());
    }

    #[test]
    fn regress_list_ends_null_case() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 1, 2, 2]),
            Some(validity(&[true, true, false])),
        );
        builder.add_offsets(offsets_64(&[0, 1, 1]), Some(validity(&[true, false])));
        builder.add_no_null(1);

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 2, 2], *rep);
        assert_eq!([0, 1, 2], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            1,
        )]);

        assert_eq!(unraveler.unravel_validity(1), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 1]).inner());
        assert_eq!(val, Some(validity(&[true, false])));
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 2, 2]).inner());
        assert_eq!(val, Some(validity(&[true, true, false])));
    }

    #[test]
    fn test_mixed_unraveler() {
        // This tests cases where the validity is different between two different pages
        // because one page has nulls and the other doesn't.

        // Simple case with one layer of validity and no repetition
        let mut unraveler = CompositeRepDefUnraveler::new(vec![
            RepDefUnraveler::new(
                None,
                Some(vec![0, 1, 0, 1]),
                vec![DefinitionInterpretation::NullableItem].into(),
                4,
            ),
            RepDefUnraveler::new(
                None,
                None,
                vec![DefinitionInterpretation::AllValidItem].into(),
                4,
            ),
        ]);

        assert_eq!(
            unraveler.unravel_validity(8),
            Some(validity(&[
                true, false, true, false, true, true, true, true
            ]))
        );

        // More complex case with two layers of validity and repetition
        let def1 = Some(vec![0, 1, 2]);
        let rep1 = Some(vec![1, 0, 1]);

        let def2 = Some(vec![1, 0, 0]);
        let rep2 = Some(vec![1, 1, 0]);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![
            RepDefUnraveler::new(
                rep1,
                def1,
                vec![
                    DefinitionInterpretation::NullableItem,
                    DefinitionInterpretation::EmptyableList,
                ]
                .into(),
                2,
            ),
            RepDefUnraveler::new(
                rep2,
                def2,
                vec![
                    DefinitionInterpretation::AllValidItem,
                    DefinitionInterpretation::NullableList,
                ]
                .into(),
                2,
            ),
        ]);

        assert_eq!(
            unraveler.unravel_validity(4),
            Some(validity(&[true, false, true, true]))
        );
        assert_eq!(
            unraveler.unravel_offsets::<i32>().unwrap(),
            (
                offsets_32(&[0, 2, 2, 2, 4]),
                Some(validity(&[true, true, false, true]))
            )
        );
    }
}