// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities for rep-def levels
//!
//! Repetition and definition levels are a way to encode multiple validity / offsets arrays
//! into a single buffer.  They are a form of "zipping" buffers together that takes advantage
//! of the fact that, if the outermost array is invalid, then the validity of the inner items
//! is irrelevant.
//!
//! Note: the concept of repetition & definition levels comes from the Dremel paper and has
//! been implemented in Apache Parquet.  However, the implementation here is not necessarily
//! compatible with Parquet.  For example, we use 0 to represent the "inner-most" item and
//! Parquet uses 0 to represent the "outer-most" item.
//!
//! # Repetition Levels
//!
//! With repetition levels we convert a sparse array of offsets into a dense array of levels.
//! These levels are marked non-zero whenever a new list begins.  In other words, given the
//! list array with 3 rows [{<0,1>, <>, <2>}, {<3>}, {}], [], [{<4>}] we would have three
//! offsets arrays:
//!
//! Outer-most ([]): [0, 3, 3, 4]
//! Middle     ({}): [0, 3, 4, 4, 5]
//! Inner      (<>): [0, 2, 2, 3, 4, 5]
//! Values         : [0, 1, 2, 3, 4]
//!
//! We can convert these into repetition levels as follows:
//!
//! | Values | Repetition |
//! | ------ | ---------- |
//! |      0 |          3 | // Start of outer-most list
//! |      1 |          0 | // Continues inner-most list (no new lists)
//! |      - |          1 | // Start of new inner-most list (empty list)
//! |      2 |          1 | // Start of new inner-most list
//! |      3 |          2 | // Start of new middle list
//! |      - |          2 | // Start of new middle list (empty list)
//! |      - |          3 | // Start of new outer-most list (empty list)
//! |      4 |          3 | // Start of new outer-most list
//!
//! Note: We actually have MORE repetition levels than values.  This is because the repetition
//! levels need to be able to represent empty lists.
//!
//! # Definition Levels
//!
//! Definition levels are simpler.  We can think of them as zipping together various validity bitmaps
//! (from different levels of nesting) into a single buffer.  For example, we could zip the arrays
//! [1, 1, 0, 0] and [1, 0, 1, 0] into [11, 10, 01, 00].  However, 00 and 01 are redundant.  If the
//! outer level is null then the validity of the inner levels is irrelevant.  To save space we instead
//! encode a "level" which is the "depth" of the null.  Let's look at a more complete example:
//!
//! Array: [{"middle": {"inner": 1}}, NULL, {"middle": NULL}, {"middle": {"inner": NULL}}]
//!
//! In Arrow we would have the following validity arrays:
//! Outer validity : 1, 0, 1, 1
//! Middle validity: 1, ?, 0, 1
//! Inner validity : 1, ?, ?, 0
//! Values         : 1, ?, ?, ?
//!
//! The ? values are undefined in the Arrow format.  We can convert these into definition levels as follows:
//!
//! | Values | Definition |
//! | ------ | ---------- |
//! |      1 |          0 | // Valid at all levels
//! |      - |          3 | // Null at outer level
//! |      - |          2 | // Null at middle level
//! |      - |          1 | // Null at inner level
//!
//! # Compression
//!
//! Note that we only need 2 bits of definition levels to represent 3 levels of nesting.  Definition
//! levels are always more compact than the input validity arrays.  However, compressed levels are not
//! necessarily more compact than the compressed validity arrays.
//!
//! Repetition levels are more complex.  If there are very large lists then a sparse array of offsets
//! (which has one element per list) might be more compact than a dense array of repetition levels
//! (which has one element per list value, possibly even more if there are empty lists).
//!
//! Both repetition levels and definition levels are typically very compressible with RLE.
//!
//! However, in Lance we don't always take advantage of that compression because we want to be able
//! to zip rep-def levels together with our values.  This gives us fewer IOPS when accessing row values.
//!
//! # Utilities in this Module
//!
//! - `RepDefBuilder` - Extracts validity and offset information from Arrow arrays.  We use this as we
//!   shred the incoming data into primitive leaf arrays.  We don't immediately convert into rep-def because
//!   we need to share and cheaply clone the builder when we have structs (each struct child shares some parent
//!   validity / offset information).  The `serialize` method is called once all data has been received to create
//!   the final rep-def levels.
//!
//! - `SerializerContext` - This is an internal utility that helps with serializing rep-def levels.
//!
//! - `CompositeRepDefUnraveler` - This structure is used to reverse the process.  It starts with a set of
//!   rep-def levels and then uses the `unravel_validity` and `unravel_offsets` methods to produce validity
//!   buffers and offset buffers.  It is "composite" because we may be combining sets of rep-def buffers from
//!   multiple locations (e.g. multiple blocks in a mini-block encoded file).
//!
//! - `RepDefSlicer` - This is a utility that helps with slicing rep-def buffers.  These buffers are "kind of"
//!   transparent (they map 1:1 with the values in the array) but not exactly because of the special (empty/null) lists.
//!   The slicer helps with this issue and is used when slicing a rep-def buffer into mini-blocks.
//!
//! - `build_control_word_iterator` - This takes in rep-def levels and returns an iterator that returns byte-padded
//!   "control words" which are used when creating full-zip encoded data.
//!
//! - `ControlWordParser` - This parser can parse the control words returned by `build_control_word_iterator` and is
//!   used when decoding full-zip encoded data.

use std::{
    iter::{Copied, Zip},
    sync::Arc,
};

use arrow_array::OffsetSizeTrait;
use arrow_buffer::{
    ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
};
use lance_core::{Error, Result, utils::bit::log_2_ceil};

use crate::buffer::LanceBuffer;

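// A minimal sketch (not part of the Lance API) of the repetition-level idea
// from the module docs, restricted to a single layer of lists.  The helper
// name `single_layer_rep_levels_sketch` is hypothetical.  It emits 1 at the
// start of each list and 0 for each continuation.  An empty list still emits
// one level, which is why there can be more levels than values.
#[allow(dead_code)]
fn single_layer_rep_levels_sketch(offsets: &[i64]) -> Vec<u16> {
    let mut levels = Vec::new();
    for w in offsets.windows(2) {
        let len = (w[1] - w[0]) as usize;
        // Every list, even an empty one, starts with a non-zero level
        levels.push(1);
        // The remaining items continue the current list
        levels.extend(std::iter::repeat(0).take(len.saturating_sub(1)));
    }
    levels
}
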
pub type LevelBuffer = Vec<u16>;

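// A minimal sketch (not part of the Lance API) of "zipping" validity bitmaps
// into definition levels, as described in the module docs.  The helper name
// `zip_validity_sketch` is hypothetical and it assumes `layers` is ordered
// outer-most first with one entry per row in every layer.  The level is the
// depth of the outer-most null: 0 means valid at all levels and
// `layers.len()` means null at the outer-most level.
#[allow(dead_code)]
fn zip_validity_sketch(layers: &[&[bool]]) -> Vec<u16> {
    let num_rows = layers.first().map_or(0, |layer| layer.len());
    let depth = layers.len() as u16;
    (0..num_rows)
        .map(|row| {
            layers
                .iter()
                // Inner validity is irrelevant once an outer layer is null
                .position(|layer| !layer[row])
                .map_or(0, |idx| depth - idx as u16)
        })
        .collect()
}
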
// As we build def levels we add this to special values to indicate that they
// are special so that we can skip over them when processing lower levels.
//
// We assume 16 bits is good enough for rep-def levels.  This _would_ give
// us 65536 levels of struct nesting and list nesting.  However, we cut that
// in half for SPECIAL_THRESHOLD because we use the top bit to indicate if an
// item is a special value (null list / empty list) during construction.
//
// We subtract this off at the end of construction to get the actual definition
// levels.
const SPECIAL_THRESHOLD: u16 = u16::MAX / 2;

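// A minimal sketch of the special-value marking described above.  All names
// ending in `sketch` (and `SKETCH_THRESHOLD`) are hypothetical and exist only
// for illustration.  It assumes that only non-zero levels are ever marked,
// which is what makes the `>` comparison sufficient.
#[allow(dead_code)]
const SKETCH_THRESHOLD: u16 = u16::MAX / 2;

// Marks a non-zero level as special by lifting it above the threshold
#[allow(dead_code)]
fn mark_special_sketch(level: u16) -> u16 {
    level + SKETCH_THRESHOLD
}

// A level is special if it sits above the threshold
#[allow(dead_code)]
fn is_special_sketch(level: u16) -> bool {
    level > SKETCH_THRESHOLD
}

// Subtracts the threshold back off to recover the actual levels
#[allow(dead_code)]
fn normalize_specials_sketch(levels: &mut [u16]) {
    for level in levels.iter_mut() {
        if is_special_sketch(*level) {
            *level -= SKETCH_THRESHOLD;
        }
    }
}
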
/// Represents information that we extract from a list array as we are
/// encoding
#[derive(Clone, Debug)]
struct OffsetDesc {
    offsets: Arc<[i64]>,
    validity: Option<BooleanBuffer>,
    has_empty_lists: bool,
    num_values: usize,
    num_specials: usize,
}

/// Represents validity information that we extract from non-list arrays (that
/// have nulls) as we are encoding
#[derive(Clone, Debug)]
struct ValidityDesc {
    validity: Option<BooleanBuffer>,
    num_values: usize,
}

/// Represents validity information that we extract from FSL arrays.  This is
/// just validity (no offsets) but we also record the dimension of the FSL array
/// as that will impact the next layer
#[derive(Clone, Debug)]
struct FslDesc {
    validity: Option<BooleanBuffer>,
    dimension: usize,
    num_values: usize,
}

// As we build up rep/def from arrow arrays we record a
// series of RawRepDef objects.  Each one corresponds to a layer
// in the array structure
#[derive(Clone, Debug)]
enum RawRepDef {
    Offsets(OffsetDesc),
    Validity(ValidityDesc),
    Fsl(FslDesc),
}

impl RawRepDef {
    // Are there any nulls in this layer
    fn has_nulls(&self) -> bool {
        match self {
            Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
            Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
            Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
        }
    }

    // How many values are in this layer
    fn num_values(&self) -> usize {
        match self {
            Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
            Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
            Self::Fsl(FslDesc { num_values, .. }) => *num_values,
        }
    }

    /// How many empty/null lists are in this layer
    fn num_specials(&self) -> usize {
        match self {
            Self::Offsets(OffsetDesc { num_specials, .. }) => *num_specials,
            _ => 0,
        }
    }

    /// How many definition levels do we need for this layer
    fn max_def(&self) -> u16 {
        match self {
            Self::Offsets(OffsetDesc {
                has_empty_lists,
                validity,
                ..
            }) => {
                let mut max_def = 0;
                if *has_empty_lists {
                    max_def += 1;
                }
                if validity.is_some() {
                    max_def += 1;
                }
                max_def
            }
            Self::Validity(ValidityDesc { validity: None, .. }) => 0,
            Self::Validity(ValidityDesc { .. }) => 1,
            Self::Fsl(FslDesc { validity: None, .. }) => 0,
            Self::Fsl(FslDesc { .. }) => 1,
        }
    }

    /// How many repetition levels do we need for this layer
    fn max_rep(&self) -> u16 {
        match self {
            Self::Offsets(_) => 1,
            _ => 0,
        }
    }
}

/// Represents repetition and definition levels that have been
/// serialized into a pair of (optional) level buffers
#[derive(Debug)]
pub struct SerializedRepDefs {
    /// The repetition levels, one per item
    ///
    /// If None, there are no lists
    pub repetition_levels: Option<Arc<[u16]>>,
    /// The definition levels, one per item
    ///
    /// If None, there are no nulls
    pub definition_levels: Option<Arc<[u16]>>,
    /// The meaning of each definition level
    pub def_meaning: Vec<DefinitionInterpretation>,
    /// The maximum level that is "visible" from the lowest level
    ///
    /// This is the last level before we encounter a list level of some kind.  Once we've
    /// hit a list level then nulls in any level beyond do not map to actual items.
    ///
    /// This is None if there are no lists
    pub max_visible_level: Option<u16>,
}

impl SerializedRepDefs {
    pub fn new(
        repetition_levels: Option<LevelBuffer>,
        definition_levels: Option<LevelBuffer>,
        def_meaning: Vec<DefinitionInterpretation>,
    ) -> Self {
        let first_list = def_meaning.iter().position(|level| level.is_list());
        let max_visible_level = first_list.map(|first_list| {
            def_meaning
                .iter()
                .map(|level| level.num_def_levels())
                .take(first_list)
                .sum::<u16>()
        });
        Self {
            repetition_levels: repetition_levels.map(Arc::from),
            definition_levels: definition_levels.map(Arc::from),
            def_meaning,
            max_visible_level,
        }
    }

    /// Creates an empty SerializedRepDefs (no repetition, all valid)
    pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
        Self {
            repetition_levels: None,
            definition_levels: None,
            def_meaning,
            max_visible_level: None,
        }
    }

    pub fn rep_slicer(&self) -> Option<RepDefSlicer<'_>> {
        self.repetition_levels
            .as_ref()
            .map(|rep| RepDefSlicer::new(self, rep.clone()))
    }

    pub fn def_slicer(&self) -> Option<RepDefSlicer<'_>> {
        self.definition_levels
            .as_ref()
            .map(|def| RepDefSlicer::new(self, def.clone()))
    }
}

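// A minimal sketch of how `max_visible_level` is derived in
// `SerializedRepDefs::new`.  The helper name `max_visible_level_sketch` is
// hypothetical and each layer is reduced to a hypothetical pair of
// (number of def levels used, is this a list layer).  It assumes the layers
// are ordered inner-most first, as the reversal in `build` suggests.
#[allow(dead_code)]
fn max_visible_level_sketch(layers: &[(u16, bool)]) -> Option<u16> {
    // With no list layer there is no visibility boundary at all
    let first_list = layers.iter().position(|(_, is_list)| *is_list)?;
    // Sum the def levels used by every layer before the first list layer
    Some(layers[..first_list].iter().map(|(num_def, _)| *num_def).sum())
}
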
/// Slices a level buffer into pieces
///
/// This is needed to handle the fact that a level buffer may have more
/// levels than values due to special (empty/null) lists.
///
/// As a result, a call to `slice_next(10)` may return 10 levels or it may
/// return more than 10 levels if any special values are encountered.
#[derive(Debug)]
pub struct RepDefSlicer<'a> {
    repdef: &'a SerializedRepDefs,
    to_slice: LanceBuffer,
    current: usize,
}

// TODO: All of this logic will need some changing when we compress rep/def levels.
impl<'a> RepDefSlicer<'a> {
    fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
        Self {
            repdef,
            to_slice: LanceBuffer::reinterpret_slice(levels),
            current: 0,
        }
    }

    pub fn num_levels(&self) -> usize {
        self.to_slice.len() / 2
    }

    pub fn num_levels_remaining(&self) -> usize {
        self.num_levels() - self.current
    }

    pub fn all_levels(&self) -> &LanceBuffer {
        &self.to_slice
    }

    /// Returns the rest of the levels not yet sliced
    ///
    /// This must be called instead of `slice_next` on the final iteration.
    /// This is because anytime we slice there may be empty/null lists on the
    /// boundary that are "free" and the current behavior in `slice_next` is to
    /// leave them for the next call.
    ///
    /// `slice_rest` will slice all remaining levels and return them.
    pub fn slice_rest(&mut self) -> LanceBuffer {
        let start = self.current;
        let remaining = self.num_levels_remaining();
        self.current = self.num_levels();
        self.to_slice.slice_with_length(start * 2, remaining * 2)
    }

    /// Returns enough levels to satisfy the next `num_values` values
    pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
        let start = self.current;
        let Some(max_visible_level) = self.repdef.max_visible_level else {
            // No lists, should be 1:1 mapping from levels to values
            self.current = start + num_values;
            return self.to_slice.slice_with_length(start * 2, num_values * 2);
        };
        if let Some(def) = self.repdef.definition_levels.as_ref() {
            // There are lists and there are def levels.  That means there may be
            // more rep/def levels than values.  We need to scan the def levels to figure
            // out which items are "invisible" and skip over them
            let mut def_itr = def[start..].iter();
            let mut num_taken = 0;
            let mut num_passed = 0;
            while num_taken < num_values {
                let def_level = *def_itr.next().unwrap();
                if def_level <= max_visible_level {
                    num_taken += 1;
                }
                num_passed += 1;
            }
            self.current = start + num_passed;
            self.to_slice.slice_with_length(start * 2, num_passed * 2)
        } else {
            // No def levels, should be 1:1 mapping from levels to values
            self.current = start + num_values;
            self.to_slice.slice_with_length(start * 2, num_values * 2)
        }
    }
}

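// A minimal sketch of the scan performed by `slice_next` when there are lists
// and def levels.  The helper name `levels_needed_sketch` is hypothetical.
// It counts how many levels must be consumed to cover `num_values` visible
// items, where a level is visible if it is <= `max_visible_level`.  Unlike
// `slice_next`, which unwraps, this sketch returns None when the buffer runs
// out before enough visible items are found.
#[allow(dead_code)]
fn levels_needed_sketch(def: &[u16], max_visible_level: u16, num_values: usize) -> Option<usize> {
    let mut num_taken = 0;
    for (num_passed, &level) in def.iter().enumerate() {
        if num_taken == num_values {
            // Stop right after the last visible item.  Trailing invisible
            // levels are left for the next slice.
            return Some(num_passed);
        }
        if level <= max_visible_level {
            num_taken += 1;
        }
    }
    (num_taken == num_values).then_some(def.len())
}
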
/// This tells us how an array handles definition.  Given a stack of
/// these and a nested array and a set of definition levels we can calculate
/// how we should interpret the definition levels.
///
/// For example, if the interpretation is [AllValidItem, NullableItem] then
/// a 0 means "valid item" and a 1 means "null struct".  If the interpretation
/// is [NullableItem, NullableItem] then a 0 means "valid item" and a 1 means
/// "null item" and a 2 means "null struct".
///
/// Lists are tricky because we might use up to two definition levels for a
/// single layer of list nesting because we need one value to indicate "empty list"
/// and another value to indicate "null list".
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DefinitionInterpretation {
    AllValidItem,
    AllValidList,
    NullableItem,
    NullableList,
    EmptyableList,
    NullableAndEmptyableList,
}

impl DefinitionInterpretation {
    /// How many definition levels do we need for this layer
    pub fn num_def_levels(&self) -> u16 {
        match self {
            Self::AllValidItem => 0,
            Self::AllValidList => 0,
            Self::NullableItem => 1,
            Self::NullableList => 1,
            Self::EmptyableList => 1,
            Self::NullableAndEmptyableList => 2,
        }
    }

    /// Is this layer free of nulls?
    pub fn is_all_valid(&self) -> bool {
        matches!(
            self,
            Self::AllValidItem | Self::AllValidList | Self::EmptyableList
        )
    }

    /// Does this layer represent a list?
    pub fn is_list(&self) -> bool {
        matches!(
            self,
            Self::AllValidList
                | Self::NullableList
                | Self::EmptyableList
                | Self::NullableAndEmptyableList
        )
    }
}

/// The RepDefBuilder is used to collect offsets & validity buffers
/// from arrow structures.  Once we have those we use the SerializerContext
/// to build the actual repetition and definition levels.
///
/// We know ahead of time how many rep/def levels we will need (number of items
/// in inner-most array + the number of empty/null lists in any parent arrays).
///
/// As a result we try to avoid any re-allocations by pre-allocating the buffers
/// up front.  We allocate two copies of each buffer which allows us to avoid unsafe
/// code caused by reading and writing to the same buffer (also, it's unavoidable
/// because there are times we need to write 'faster' than we read)
#[derive(Debug)]
struct SerializerContext {
    // This is built from outer-to-inner and then reversed at the end
    def_meaning: Vec<DefinitionInterpretation>,
    rep_levels: LevelBuffer,
    spare_rep: LevelBuffer,
    def_levels: LevelBuffer,
    spare_def: LevelBuffer,
    current_rep: u16,
    current_def: u16,
    current_len: usize,
    current_num_specials: usize,
}

impl SerializerContext {
    fn new(len: usize, num_layers: usize, max_rep: u16, max_def: u16) -> Self {
        let def_meaning = Vec::with_capacity(num_layers);
        Self {
            rep_levels: if max_rep > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            spare_rep: if max_rep > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            def_levels: if max_def > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            spare_def: if max_def > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            def_meaning,
            current_rep: max_rep,
            current_def: max_def,
            current_len: 0,
            current_num_specials: 0,
        }
    }

    fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
        let def = self.current_def;
        self.current_def -= meaning.num_def_levels();
        self.def_meaning.push(meaning);
        def
    }

    fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
        let rep_level = self.current_rep;
        let (null_list_level, empty_list_level) =
            match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
                (true, true) => {
                    let level =
                        self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
                    (level - 1, level)
                }
                (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
                (false, true) => (
                    0,
                    self.checkout_def(DefinitionInterpretation::EmptyableList),
                ),
                (false, false) => {
                    self.checkout_def(DefinitionInterpretation::AllValidList);
                    (0, 0)
                }
            };
        self.current_rep -= 1;

        if let Some(validity) = &offset_desc.validity {
            self.do_record_validity(validity, null_list_level);
        }

        // We write into the spare buffers and read from the active buffers
        // and then swap at the end.  This way we don't write over what we
        // are reading.

        let mut new_len = 0;
        let expected_len = offset_desc.num_values + self.current_num_specials;
        if expected_len == 0 {
            // Offsets [0] mean no list values, so no levels.
            self.current_len = 0;
            return;
        }
        assert!(self.rep_levels.len() >= expected_len - 1);
        if self.def_levels.is_empty() {
            let mut write_itr = self.spare_rep.iter_mut();
            let mut read_iter = self.rep_levels.iter().copied();
            for w in offset_desc.offsets.windows(2) {
                let len = w[1] - w[0];
                // len can't be 0 because then we'd have def levels
                assert!(len > 0);
                let rep = read_iter.next().unwrap();
                let list_level = if rep == 0 { rep_level } else { rep };
                *write_itr.next().unwrap() = list_level;

                for _ in 1..len {
                    *write_itr.next().unwrap() = 0;
                }
                new_len += len as usize;
            }
            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
        } else {
            assert!(self.def_levels.len() >= expected_len - 1);
            let mut def_write_itr = self.spare_def.iter_mut();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut def_read_itr = self.def_levels.iter().copied();
            let specials_to_pass = self.current_num_specials;
            let mut specials_passed = 0;

            for w in offset_desc.offsets.windows(2) {
                let mut def = def_read_itr.next().unwrap();
                // Copy over any higher-level special values in place
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                    def = def_read_itr.next().unwrap();
                    new_len += 1;
                    specials_passed += 1;
                }

                let len = w[1] - w[0];
                let rep = rep_read_itr.next().unwrap();

                // If the rep_level is 0 then we are the first list level
                // otherwise we are starting a higher level list so keep
                // existing rep level
                let list_level = if rep == 0 { rep_level } else { rep };

                if def == 0 && len > 0 {
                    // New valid list, write a rep level and then add new 0/0 items
                    *def_write_itr.next().unwrap() = 0;
                    *rep_write_itr.next().unwrap() = list_level;

                    for _ in 1..len {
                        *def_write_itr.next().unwrap() = 0;
                        *rep_write_itr.next().unwrap() = 0;
                    }

                    new_len += len as usize;
                } else if def == 0 {
                    // Empty list, insert new special
                    *def_write_itr.next().unwrap() = empty_list_level + SPECIAL_THRESHOLD;
                    *rep_write_itr.next().unwrap() = list_level;
                    new_len += 1;
                } else {
                    // Either the list is null or one of its struct parents
                    // is null.  Promote it to a special value.
                    *def_write_itr.next().unwrap() = def + SPECIAL_THRESHOLD;
                    *rep_write_itr.next().unwrap() = list_level;
                    new_len += 1;
                }
            }

            // If we have any special values at the end, we need to copy them over
            while specials_passed < specials_to_pass {
                *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
                *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                new_len += 1;
                specials_passed += 1;
            }
            std::mem::swap(&mut self.def_levels, &mut self.spare_def);
            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
        }

        self.current_len = new_len;
        self.current_num_specials += offset_desc.num_specials;
    }

    fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
        assert!(self.def_levels.len() >= validity.len() + self.current_num_specials);
        debug_assert!(
            self.current_len == 0 || self.current_len == validity.len() + self.current_num_specials
        );
        self.current_len = validity.len();

        let mut def_read_itr = self.def_levels.iter().copied();
        let mut def_write_itr = self.spare_def.iter_mut();

        let specials_to_pass = self.current_num_specials;
        let mut specials_passed = 0;

        for incoming_validity in validity.iter() {
            let mut def = def_read_itr.next().unwrap();
            while def > SPECIAL_THRESHOLD {
                *def_write_itr.next().unwrap() = def;
                def = def_read_itr.next().unwrap();
                specials_passed += 1;
            }
            if def == 0 && !incoming_validity {
                *def_write_itr.next().unwrap() = null_level;
            } else {
                *def_write_itr.next().unwrap() = def;
            }
        }

        while specials_passed < specials_to_pass {
            *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
            specials_passed += 1;
        }

        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
    }

    fn multiply_levels(&mut self, multiplier: usize) {
        let old_len = self.current_len;
        // All non-special values will be broadcast by the multiplier.  Special values are copied as-is.
        self.current_len =
            (self.current_len - self.current_num_specials) * multiplier + self.current_num_specials;

        if self.rep_levels.is_empty() && self.def_levels.is_empty() {
            // All valid with no rep/def levels, nothing to do
            return;
        } else if self.rep_levels.is_empty() {
            assert!(self.def_levels.len() >= self.current_len);
            // No rep levels, just multiply the def levels
            let mut def_read_itr = self.def_levels.iter().copied();
            let mut def_write_itr = self.spare_def.iter_mut();
            for _ in 0..old_len {
                let mut def = def_read_itr.next().unwrap();
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    def = def_read_itr.next().unwrap();
                }
                for _ in 0..multiplier {
                    *def_write_itr.next().unwrap() = def;
                }
            }
        } else if self.def_levels.is_empty() {
            assert!(self.rep_levels.len() >= self.current_len);
            // No def levels, just multiply the rep levels
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            for _ in 0..old_len {
                let rep = rep_read_itr.next().unwrap();
                for _ in 0..multiplier {
                    *rep_write_itr.next().unwrap() = rep;
                }
            }
        } else {
            assert!(self.rep_levels.len() >= self.current_len);
            assert!(self.def_levels.len() >= self.current_len);
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut def_read_itr = self.def_levels.iter().copied();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            let mut def_write_itr = self.spare_def.iter_mut();
            for _ in 0..old_len {
                let mut def = def_read_itr.next().unwrap();
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                    def = def_read_itr.next().unwrap();
                }
                let rep = rep_read_itr.next().unwrap();
                for _ in 0..multiplier {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep;
                }
            }
        }
        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
        std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
    }

722    fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
723        if let Some(validity) = validity {
724            let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
725            self.do_record_validity(validity, def_level);
726        } else {
727            self.checkout_def(DefinitionInterpretation::AllValidItem);
728        }
729    }
730
731    fn record_validity(&mut self, validity_desc: &ValidityDesc) {
732        self.record_validity_buf(&validity_desc.validity)
733    }
734
735    fn record_fsl(&mut self, fsl_desc: &FslDesc) {
736        self.record_validity_buf(&fsl_desc.validity);
737        self.multiply_levels(fsl_desc.dimension);
738    }
739
740    fn normalize_specials(&mut self) {
741        for def in self.def_levels.iter_mut() {
742            if *def > SPECIAL_THRESHOLD {
743                *def -= SPECIAL_THRESHOLD;
744            }
745        }
746    }
747
748    fn build(mut self) -> SerializedRepDefs {
749        if self.current_len == 0 {
750            return SerializedRepDefs::new(None, None, self.def_meaning);
751        }
752
753        self.normalize_specials();
754
755        let definition_levels = if self.def_levels.is_empty() {
756            None
757        } else {
758            Some(self.def_levels)
759        };
760        let repetition_levels = if self.rep_levels.is_empty() {
761            None
762        } else {
763            Some(self.rep_levels)
764        };
765
766        // Need to reverse the def meaning since we build rep / def levels in reverse
767        let def_meaning = self.def_meaning.into_iter().rev().collect::<Vec<_>>();
768
769        SerializedRepDefs::new(repetition_levels, definition_levels, def_meaning)
770    }
771}
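
// Illustration (not part of the original source): a minimal, standalone sketch of the
// level expansion that `record_fsl` relies on when a fixed-size-list layer is recorded.
// It mirrors the loop visible in `multiply_levels` above: special entries are copied
// through once, every other (rep, def) pair is repeated `multiplier` times.  The
// free function and the value of `SPECIAL_THRESHOLD` used here are assumptions made
// for the example; the real method works in place on the context's spare buffers.

```rust
/// Hypothetical stand-in for the crate's `SPECIAL_THRESHOLD`; the value is an assumption.
const SPECIAL_THRESHOLD: u16 = u16::MAX / 2;

/// Repeats every non-special (rep, def) pair `multiplier` times and copies
/// special entries (def > SPECIAL_THRESHOLD) through exactly once.
fn multiply_levels(rep: &[u16], def: &[u16], multiplier: usize) -> (Vec<u16>, Vec<u16>) {
    assert_eq!(rep.len(), def.len());
    let mut out_rep = Vec::with_capacity(rep.len() * multiplier);
    let mut out_def = Vec::with_capacity(def.len() * multiplier);
    for (&r, &d) in rep.iter().zip(def) {
        // Specials stand for empty / null lists and hold no items, so they are not expanded
        let copies = if d > SPECIAL_THRESHOLD { 1 } else { multiplier };
        for _ in 0..copies {
            out_rep.push(r);
            out_def.push(d);
        }
    }
    (out_rep, out_def)
}

fn main() {
    let special = SPECIAL_THRESHOLD + 1;
    let (rep, def) = multiply_levels(&[1, 1, 0], &[0, special, 1], 2);
    assert_eq!(rep, vec![1, 1, 1, 0, 0]);
    assert_eq!(def, vec![0, 0, special, 1, 1]);
}
```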
772
773/// A structure used to collect validity buffers and offsets from arrow
774/// arrays and eventually create repetition and definition levels
775///
776/// While encoding, the structural encoders are handed this struct and
777/// record the Arrow information into it.  Once we hit a leaf node we
778/// serialize the data into rep/def levels and write these into the page.
779#[derive(Clone, Default, Debug)]
780pub struct RepDefBuilder {
781    // The rep/def info we have collected so far
782    repdefs: Vec<RawRepDef>,
783    // The current length, can get larger as we traverse lists (e.g. an
784    // array might have 5 lists which results in 50 items)
785    //
786    // Starts uninitialized until we see the first rep/def item
787    len: Option<usize>,
788}
789
790impl RepDefBuilder {
791    fn check_validity_len(&mut self, incoming_len: usize) {
792        if let Some(len) = self.len {
793            assert_eq!(incoming_len, len);
794        } else {
795            // First validity buffer we've seen
796            self.len = Some(incoming_len);
797        }
798    }
799
800    fn num_layers(&self) -> usize {
801        self.repdefs.len()
802    }
803
804    /// The builder is "empty" if there is no repetition and no nulls.  In this case we don't need
805    /// to store anything to disk (except the description)
806    pub fn is_empty(&self) -> bool {
807        self.repdefs
808            .iter()
809            .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
810    }
811
812    /// Returns true if there is only a single layer of definition
813    pub fn is_simple_validity(&self) -> bool {
814        self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
815    }
816
817    /// Registers a nullable validity bitmap
818    pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
819        self.check_validity_len(validity.len());
820        if validity.null_count() == 0 {
821            self.add_no_null(validity.len());
822            return;
823        }
824        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
825            num_values: validity.len(),
826            validity: Some(validity.into_inner()),
827        }));
828    }
829
830    /// Registers an all-valid validity layer
831    pub fn add_no_null(&mut self, len: usize) {
832        self.check_validity_len(len);
833        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
834            validity: None,
835            num_values: len,
836        }));
837    }
838
839    pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
840        if let Some(len) = self.len {
841            assert_eq!(num_values, len);
842        }
843        self.len = Some(num_values * dimension);
844        debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
845        self.repdefs.push(RawRepDef::Fsl(FslDesc {
846            num_values,
847            validity: validity.map(|v| v.into_inner()),
848            dimension,
849        }))
850    }
851
852    fn check_offset_len(&mut self, offsets: &[i64]) {
853        if let Some(len) = self.len {
854            assert!(offsets.len() == len + 1);
855        }
856        self.len = Some(offsets[offsets.len() - 1] as usize);
857    }
858
859    fn do_add_offsets(
860        &mut self,
861        lengths: impl Iterator<Item = i64>,
862        validity: Option<NullBuffer>,
863        capacity: usize,
864    ) -> bool {
865        let mut num_specials = 0;
866        let mut has_empty_lists = false;
867        let mut has_garbage_values = false;
868        let mut last_off: i64 = 0;
869
870        let mut normalized_offsets = Vec::with_capacity(capacity);
871        normalized_offsets.push(0);
872
873        if let Some(ref validity) = validity {
874            for (len, is_valid) in lengths.zip(validity.iter()) {
875                match (is_valid, len == 0) {
876                    (false, is_empty) => {
877                        num_specials += 1;
878                        has_garbage_values |= !is_empty;
879                    }
880                    (true, true) => {
881                        num_specials += 1;
882                        has_empty_lists = true;
883                    }
884                    _ => {
885                        last_off += len;
886                    }
887                }
888                normalized_offsets.push(last_off);
889            }
890        } else {
891            for len in lengths {
892                if len == 0 {
893                    num_specials += 1;
894                    has_empty_lists = true;
895                }
896                last_off += len;
897                normalized_offsets.push(last_off);
898            }
899        }
900
901        self.check_offset_len(&normalized_offsets);
902        self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
903            num_values: normalized_offsets.len() - 1,
904            offsets: normalized_offsets.into(),
905            validity: validity.map(|v| v.into_inner()),
906            has_empty_lists,
907            num_specials: num_specials as usize,
908        }));
909
910        has_garbage_values
911    }
912
913    /// Adds a layer of offsets
914    ///
915    /// Offsets are cast to a common type (i64) and also normalized.  Null lists are
916    /// always represented by a zero-length (identical) pair of offsets and so the caller
917    /// should filter out any garbage items before encoding them.  To assist with this the
918    /// method will return true if any non-empty null lists were found.
919    pub fn add_offsets<O: OffsetSizeTrait>(
920        &mut self,
921        offsets: OffsetBuffer<O>,
922        validity: Option<NullBuffer>,
923    ) -> bool {
924        let inner = offsets.into_inner();
925        let buffer_len = inner.len();
926
927        if O::IS_LARGE {
928            let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, buffer_len);
929            let lengths = i64_buff.windows(2).map(|off| off[1] - off[0]);
930            self.do_add_offsets(lengths, validity, buffer_len)
931        } else {
932            let i32_buff = ScalarBuffer::<i32>::new(inner.into_inner(), 0, buffer_len);
933            let lengths = i32_buff.windows(2).map(|off| (off[1] - off[0]) as i64);
934            self.do_add_offsets(lengths, validity, buffer_len)
935        }
936    }
937
938    // When we are encoding data it arrives in batches.  For each batch we create a RepDefBuilder and collect the
939    // various validity buffers and offset buffers from that batch.  Once we have enough batches to write a page we
940    // need to take this collection of RepDefBuilders and concatenate them and then serialize them into rep/def levels.
941    //
942    // TODO: In the future, we may concatenate and serialize at the same time?
943    //
944    // This method takes care of the concatenation part.  First we collect all of layer 0 from each builder, then we
945    // call this method.  Then we collect all of layer 1 from each builder and call this method.  And so on.
946    //
947    // That means this method should get a collection of `RawRepDef` where each item is the same kind (all validity,
948    // all FSL, or all offsets) though the nullability / lengths may be different in each layer.
949    fn concat_layers<'a>(
950        layers: impl Iterator<Item = &'a RawRepDef>,
951        num_layers: usize,
952    ) -> RawRepDef {
953        enum LayerKind {
954            Validity,
955            Fsl,
956            Offsets,
957        }
958
959        // We make two passes through the layers.  The first determines if we need to pay the cost of allocating
960        // buffers.  The second pass actually adds the values.
961        let mut collected = Vec::with_capacity(num_layers);
962        let mut has_nulls = false;
963        let mut layer_kind = LayerKind::Validity;
964        let mut total_num_specials = 0;
965        let mut all_dimension = 0;
966        let mut all_has_empty_lists = false;
967        let mut all_num_values = 0;
968        for layer in layers {
969            has_nulls |= layer.has_nulls();
970            match layer {
971                RawRepDef::Validity(_) => {
972                    layer_kind = LayerKind::Validity;
973                }
974                RawRepDef::Offsets(OffsetDesc {
975                    num_specials,
976                    has_empty_lists,
977                    ..
978                }) => {
979                    all_has_empty_lists |= *has_empty_lists;
980                    layer_kind = LayerKind::Offsets;
981                    total_num_specials += num_specials;
982                }
983                RawRepDef::Fsl(FslDesc { dimension, .. }) => {
984                    layer_kind = LayerKind::Fsl;
985                    all_dimension = *dimension;
986                }
987            }
988            collected.push(layer);
989            all_num_values += layer.num_values();
990        }
991
992        // Shortcut if there are no nulls
993        if !has_nulls {
994            match layer_kind {
995                LayerKind::Validity => {
996                    return RawRepDef::Validity(ValidityDesc {
997                        validity: None,
998                        num_values: all_num_values,
999                    });
1000                }
1001                LayerKind::Fsl => {
1002                    return RawRepDef::Fsl(FslDesc {
1003                        validity: None,
1004                        num_values: all_num_values,
1005                        dimension: all_dimension,
1006                    });
1007                }
1008                LayerKind::Offsets => {}
1009            }
1010        }
1011
1012        // Only allocate if needed
1013        let mut validity_builder = if has_nulls {
1014            BooleanBufferBuilder::new(all_num_values)
1015        } else {
1016            BooleanBufferBuilder::new(0)
1017        };
1018        let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
1019            let mut all_offsets = Vec::with_capacity(all_num_values);
1020            all_offsets.push(0);
1021            all_offsets
1022        } else {
1023            Vec::new()
1024        };
1025
1026        for layer in collected {
1027            match layer {
1028                RawRepDef::Validity(ValidityDesc {
1029                    validity: Some(validity),
1030                    ..
1031                }) => {
1032                    validity_builder.append_buffer(validity);
1033                }
1034                RawRepDef::Validity(ValidityDesc {
1035                    validity: None,
1036                    num_values,
1037                }) => {
1038                    validity_builder.append_n(*num_values, true);
1039                }
1040                RawRepDef::Fsl(FslDesc {
1041                    validity,
1042                    num_values,
1043                    ..
1044                }) => {
1045                    if let Some(validity) = validity {
1046                        validity_builder.append_buffer(validity);
1047                    } else {
1048                        validity_builder.append_n(*num_values, true);
1049                    }
1050                }
1051                RawRepDef::Offsets(OffsetDesc {
1052                    offsets,
1053                    validity: Some(validity),
1054                    has_empty_lists,
1055                    ..
1056                }) => {
1057                    all_has_empty_lists |= has_empty_lists;
1058                    validity_builder.append_buffer(validity);
1059                    let last = *all_offsets.last().unwrap();
1060                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1061                }
1062                RawRepDef::Offsets(OffsetDesc {
1063                    offsets,
1064                    validity: None,
1065                    has_empty_lists,
1066                    num_values,
1067                    ..
1068                }) => {
1069                    all_has_empty_lists |= has_empty_lists;
1070                    if has_nulls {
1071                        validity_builder.append_n(*num_values, true);
1072                    }
1073                    let last = *all_offsets.last().unwrap();
1074                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1075                }
1076            }
1077        }
1078        let validity = if has_nulls {
1079            Some(validity_builder.finish())
1080        } else {
1081            None
1082        };
1083        match layer_kind {
1084            LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
1085                validity,
1086                num_values: all_num_values,
1087                dimension: all_dimension,
1088            }),
1089            LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
1090                validity,
1091                num_values: all_num_values,
1092            }),
1093            LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
1094                offsets: all_offsets.into(),
1095                validity,
1096                has_empty_lists: all_has_empty_lists,
1097                num_values: all_num_values,
1098                num_specials: total_num_specials,
1099            }),
1100        }
1101    }
1102
1103    /// Converts the validity / offsets buffers that have been gathered so far
1104    /// into repetition and definition levels
1105    pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
1106        assert!(!builders.is_empty());
1107        if builders.iter().all(|b| b.is_empty()) {
1108            // No repetition, all-valid
1109            return SerializedRepDefs::empty(
1110                builders
1111                    .first()
1112                    .unwrap()
1113                    .repdefs
1114                    .iter()
1115                    .map(|_| DefinitionInterpretation::AllValidItem)
1116                    .collect::<Vec<_>>(),
1117            );
1118        }
1119
1120        let num_layers = builders[0].num_layers();
1121        let combined_layers = (0..num_layers)
1122            .map(|layer_index| {
1123                Self::concat_layers(
1124                    builders.iter().map(|b| &b.repdefs[layer_index]),
1125                    builders.len(),
1126                )
1127            })
1128            .collect::<Vec<_>>();
1129        debug_assert!(
1130            builders
1131                .iter()
1132                .all(|b| b.num_layers() == builders[0].num_layers())
1133        );
1134
1135        let total_len = combined_layers.last().unwrap().num_values()
1136            + combined_layers
1137                .iter()
1138                .map(|l| l.num_specials())
1139                .sum::<usize>();
1140        let max_rep = combined_layers.iter().map(|l| l.max_rep()).sum::<u16>();
1141        let max_def = combined_layers.iter().map(|l| l.max_def()).sum::<u16>();
1142
1143        let mut context = SerializerContext::new(total_len, num_layers, max_rep, max_def);
1144        for layer in combined_layers.into_iter() {
1145            match layer {
1146                RawRepDef::Validity(def) => {
1147                    context.record_validity(&def);
1148                }
1149                RawRepDef::Offsets(rep) => {
1150                    context.record_offsets(&rep);
1151                }
1152                RawRepDef::Fsl(fsl) => {
1153                    context.record_fsl(&fsl);
1154                }
1155            }
1156        }
1157        context.build()
1158    }
1159}
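
// Illustration (not part of the original source): a simplified sketch of the offset
// normalization described on `add_offsets`.  Null lists are collapsed to zero-length
// entries, and the returned flag reports whether any null list originally spanned
// items ("garbage" the caller should filter out).  `normalize_offsets` and its
// slice-based signature are hypothetical; the real code works on Arrow buffers and
// also tracks empty lists and the number of specials.

```rust
/// Builds normalized offsets from per-list lengths and optional validity.
/// Returns `(offsets, has_garbage_values)`.
fn normalize_offsets(lengths: &[i64], validity: Option<&[bool]>) -> (Vec<i64>, bool) {
    let mut offsets = Vec::with_capacity(lengths.len() + 1);
    offsets.push(0);
    let mut last_off = 0i64;
    let mut has_garbage = false;
    for (idx, &len) in lengths.iter().enumerate() {
        let is_valid = validity.map_or(true, |v| v[idx]);
        if is_valid {
            last_off += len;
        } else {
            // A null list contributes no items; remember if it was hiding any
            has_garbage |= len != 0;
        }
        offsets.push(last_off);
    }
    (offsets, has_garbage)
}

fn main() {
    // Raw offsets [0, 2, 5, 5, 6] give lengths [2, 3, 0, 1]; the second list is
    // null but still spans 3 items.
    let validity = [true, false, true, true];
    let (offsets, has_garbage) = normalize_offsets(&[2, 3, 0, 1], Some(&validity[..]));
    assert_eq!(offsets, vec![0, 2, 2, 2, 3]);
    assert!(has_garbage);
}
```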
1160
1161/// Starts with serialized repetition and definition levels and unravels
1162/// them into validity buffers and offsets buffers
1163///
1164/// This is used during decoding to create the necessary arrow structures
1165#[derive(Debug)]
1166pub struct RepDefUnraveler {
1167    rep_levels: Option<LevelBuffer>,
1168    def_levels: Option<LevelBuffer>,
1169    // Maps from definition level to the rep level at which that definition level is visible
1170    levels_to_rep: Vec<u16>,
1171    def_meaning: Arc<[DefinitionInterpretation]>,
1172    // Current definition level to compare to.
1173    current_def_cmp: u16,
1174    // Current rep level, determines which specials we can see
1175    current_rep_cmp: u16,
1176    // Current layer index, 0 means inner-most layer and it counts up from there.  Used to index
1177    // into def_meaning
1178    current_layer: usize,
1179    // Number of items in the inner-most layer (needed if the definition levels are not present)
1180    num_items: u64,
1181}
1182
1183impl RepDefUnraveler {
1184    /// Creates a new unraveler from serialized repetition and definition information
1185    pub fn new(
1186        rep_levels: Option<LevelBuffer>,
1187        def_levels: Option<LevelBuffer>,
1188        def_meaning: Arc<[DefinitionInterpretation]>,
1189        num_items: u64,
1190    ) -> Self {
1191        let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
1192        let mut rep_counter = 0;
1193        // Level=0 is always visible and means valid item
1194        levels_to_rep.push(0);
1195        for meaning in def_meaning.as_ref() {
1196            match meaning {
1197                DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
1198                    // There is no corresponding level, so nothing to put in levels_to_rep
1199                }
1200                DefinitionInterpretation::NullableItem => {
1201                    // Some null structs are not visible at inner rep levels in cases like LIST<STRUCT<LIST<...>>>
1202                    levels_to_rep.push(rep_counter);
1203                }
1204                DefinitionInterpretation::NullableList => {
1205                    rep_counter += 1;
1206                    levels_to_rep.push(rep_counter);
1207                }
1208                DefinitionInterpretation::EmptyableList => {
1209                    rep_counter += 1;
1210                    levels_to_rep.push(rep_counter);
1211                }
1212                DefinitionInterpretation::NullableAndEmptyableList => {
1213                    rep_counter += 1;
1214                    levels_to_rep.push(rep_counter);
1215                    levels_to_rep.push(rep_counter);
1216                }
1217            }
1218        }
1219        Self {
1220            rep_levels,
1221            def_levels,
1222            current_def_cmp: 0,
1223            current_rep_cmp: 0,
1224            levels_to_rep,
1225            current_layer: 0,
1226            def_meaning,
1227            num_items,
1228        }
1229    }
1230
1231    pub fn is_all_valid(&self) -> bool {
1232        self.def_levels.is_none() || self.def_meaning[self.current_layer].is_all_valid()
1233    }
1234
1235    /// If the current level is a repetition layer then this returns the number of lists
1236    /// at this level.
1237    ///
1238    /// This is not valid to call when the current level is a struct/primitive layer because
1239    /// in some cases there may be no rep or def information to know this.
1240    pub fn max_lists(&self) -> usize {
1241        debug_assert!(
1242            self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
1243        );
1244        self.rep_levels
1245            .as_ref()
1246            // Worst case every rep item is max_rep and a new list
1247            .map(|levels| levels.len())
1248            .unwrap_or(0)
1249    }
1250
1251    /// Unravels a layer of offsets from the unraveler into the given offset width
1252    ///
1253    /// When decoding a list the caller should first unravel the offsets and then
1254    /// unravel the validity (this is the opposite of the order used during encoding)
1255    pub fn unravel_offsets<T: ArrowNativeType>(
1256        &mut self,
1257        offsets: &mut Vec<T>,
1258        validity: Option<&mut BooleanBufferBuilder>,
1259    ) -> Result<()> {
1260        let rep_levels = self
1261            .rep_levels
1262            .as_mut()
1263            .expect("Expected repetition level but data didn't contain repetition");
1264        let valid_level = self.current_def_cmp;
1265        let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
1266            DefinitionInterpretation::NullableList => {
1267                self.current_def_cmp += 1;
1268                (valid_level + 1, 0)
1269            }
1270            DefinitionInterpretation::EmptyableList => {
1271                self.current_def_cmp += 1;
1272                (0, valid_level + 1)
1273            }
1274            DefinitionInterpretation::NullableAndEmptyableList => {
1275                self.current_def_cmp += 2;
1276                (valid_level + 1, valid_level + 2)
1277            }
1278            DefinitionInterpretation::AllValidList => (0, 0),
1279            _ => unreachable!(),
1280        };
1281        self.current_layer += 1;
1282
1283        // This is the highest def level that is still visible.  Once we hit a list then
1284        // we stop looking because any null / empty list (or list masked by a higher level
1285        // null) will not be visible
1286        let mut max_level = null_level.max(empty_level).max(valid_level);
1287        // Anything higher than this (but no higher than max_level) is a null struct masking our
1288        // list.  We will materialize this as a null list.
1289        let upper_null = max_level;
1290        for level in self.def_meaning[self.current_layer..].iter() {
1291            match level {
1292                DefinitionInterpretation::NullableItem => {
1293                    max_level += 1;
1294                }
1295                DefinitionInterpretation::AllValidItem => {}
1296                _ => {
1297                    break;
1298                }
1299            }
1300        }
1301
1302        let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);
1303
1304        // If offsets is empty this is a no-op.  If offsets is not empty that means we already
1305        // added a set of offsets.  For example, we might have added [0, 3, 5] (2 lists).  Now
1306        // say we want to add [0, 1, 4] (2 lists).  We should get [0, 3, 5, 6, 9] (4 lists).  If
1307        // we don't pop here we get [0, 3, 5, 5, 6, 9] which is wrong.
1308        //
1309        // Or, to think about it another way, if every unraveler adds the starting 0 and the trailing
1310        // length then we have N + unravelers.len() values instead of N + 1.
1311        offsets.pop();
1312
1313        let to_offset = |val: usize| {
1314            T::from_usize(val)
1315            .ok_or_else(|| Error::invalid_input("A single batch had more than i32::MAX values and so a large container type is required"))
1316        };
1317        self.current_rep_cmp += 1;
1318        if let Some(def_levels) = &mut self.def_levels {
1319            assert!(rep_levels.len() == def_levels.len());
1320            // It's possible validity is None even if we have def levels.  For example, we might have
1321            // empty lists (which require def levels) but no nulls.
1322            let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
1323                Box::new(|is_valid| validity.append(is_valid))
1324            } else {
1325                Box::new(|_| {})
1326            };
1327            // This is a strange access pattern.  We are iterating over the rep/def levels and
1328            // at the same time writing the rep/def levels.  This means we need both a mutable
1329            // and immutable reference to the rep/def levels.
1330            let mut read_idx = 0;
1331            let mut write_idx = 0;
1332            while read_idx < rep_levels.len() {
1333                // SAFETY: We assert that rep_levels and def_levels have the same
1334                // len and read_idx and write_idx can never go past the end.
1335                unsafe {
1336                    let rep_val = *rep_levels.get_unchecked(read_idx);
1337                    if rep_val != 0 {
1338                        let def_val = *def_levels.get_unchecked(read_idx);
1339                        // Copy over
1340                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1341                        *def_levels.get_unchecked_mut(write_idx) = def_val;
1342                        write_idx += 1;
1343
1344                        if def_val == 0 {
1345                            // This is a valid list
1346                            offsets.push(to_offset(curlen)?);
1347                            curlen += 1;
1348                            push_validity(true);
1349                        } else if def_val > max_level {
1350                            // This is not visible at this rep level, do not add to offsets, but keep in repdef
1351                        } else if def_val == null_level || def_val > upper_null {
1352                            // This is a null list (or a list masked by a null struct)
1353                            offsets.push(to_offset(curlen)?);
1354                            push_validity(false);
1355                        } else if def_val == empty_level {
1356                            // This is an empty list
1357                            offsets.push(to_offset(curlen)?);
1358                            push_validity(true);
1359                        } else {
1360                            // New valid list starting with null item
1361                            offsets.push(to_offset(curlen)?);
1362                            curlen += 1;
1363                            push_validity(true);
1364                        }
1365                    } else {
1366                        curlen += 1;
1367                    }
1368                    read_idx += 1;
1369                }
1370            }
1371            offsets.push(to_offset(curlen)?);
1372            rep_levels.truncate(write_idx);
1373            def_levels.truncate(write_idx);
1374            Ok(())
1375        } else {
1376            // SAFETY: See above loop
1377            let mut read_idx = 0;
1378            let mut write_idx = 0;
1379            let old_offsets_len = offsets.len();
1380            while read_idx < rep_levels.len() {
1381                // SAFETY: read_idx / write_idx cannot go past rep_levels.len()
1382                unsafe {
1383                    let rep_val = *rep_levels.get_unchecked(read_idx);
1384                    if rep_val != 0 {
1385                        // Finish the current list
1386                        offsets.push(to_offset(curlen)?);
1387                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1388                        write_idx += 1;
1389                    }
1390                    curlen += 1;
1391                    read_idx += 1;
1392                }
1393            }
1394            let num_new_lists = offsets.len() - old_offsets_len;
1395            offsets.push(to_offset(curlen)?);
1396            rep_levels.truncate(write_idx);
1397            if let Some(validity) = validity {
1398                // Even though we don't have validity it is possible another unraveler did and so we need
1399                // to push all valids
1400                validity.append_n(num_new_lists, true);
1401            }
1402            Ok(())
1403        }
1404    }
1405
1406    pub fn skip_validity(&mut self) {
1407        debug_assert!(self.is_all_valid());
1408        self.current_layer += 1;
1409    }
1410
1411    /// Unravels a layer of validity from the definition levels
1412    pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
1413        let meaning = self.def_meaning[self.current_layer];
1414        if meaning == DefinitionInterpretation::AllValidItem || self.def_levels.is_none() {
1415            self.current_layer += 1;
1416            validity.append_n(self.num_items as usize, true);
1417            return;
1418        }
1419
1420        self.current_layer += 1;
1421        let def_levels = &self.def_levels.as_ref().unwrap();
1422
1423        let current_def_cmp = self.current_def_cmp;
1424        self.current_def_cmp += 1;
1425
1426        for is_valid in def_levels.iter().filter_map(|&level| {
1427            if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
1428                Some(level <= current_def_cmp)
1429            } else {
1430                None
1431            }
1432        }) {
1433            validity.append(is_valid);
1434        }
1435    }
1436
1437    pub fn decimate(&mut self, dimension: usize) {
1438        if self.rep_levels.is_some() {
1439            // If we need to support this then I think we need to walk through the rep def levels to find
1440            // the positions that we keep.  E.g. if we have:
1441            //  rep: 1 0 0 1 0 1 0 0 0 1 0 0
1442            //  def: 1 1 1 0 1 0 1 1 0 1 1 0
1443            //  dimension: 2
1444            //
1445            // The output should be:
1446            //  rep: 1 0 0 1 0 0 0
1447            //  def: 1 1 1 0 1 1 0
1448            //
1449            // Maybe there's some special logic for empty/null lists?  I'll save the headache for future me.
1450            todo!("Not yet supported FSL<...List<...>>");
1451        }
1452        let Some(def_levels) = self.def_levels.as_mut() else {
1453            return;
1454        };
1455        let mut read_idx = 0;
1456        let mut write_idx = 0;
1457        while read_idx < def_levels.len() {
1458            unsafe {
1459                *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
1460            }
1461            write_idx += 1;
1462            read_idx += dimension;
1463        }
1464        def_levels.truncate(write_idx);
1465    }
1466}
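
// Illustration (not part of the original source): a simplified sketch of peeling one
// list layer off the repetition levels when no definition levels are present, as in
// the second branch of `unravel_offsets` above.  Every non-zero level starts a new
// list, and the surviving levels are decremented and compacted in place.  The free
// function and its `i64` offsets are assumptions made for the example; it also shows
// why the trailing offset is popped before a second chunk is appended.

```rust
/// Peels one list layer off `rep_levels`, appending the new lists to `offsets`.
fn unravel_offsets(rep_levels: &mut Vec<u16>, offsets: &mut Vec<i64>) {
    // Continue from the previous end offset (if any) and drop it so that it is
    // not duplicated when this chunk's first list begins.
    let mut curlen = offsets.pop().unwrap_or(0);
    let mut write_idx = 0;
    for read_idx in 0..rep_levels.len() {
        let rep = rep_levels[read_idx];
        if rep != 0 {
            // A new list begins here; keep the level (minus one) for outer layers
            offsets.push(curlen);
            rep_levels[write_idx] = rep - 1;
            write_idx += 1;
        }
        curlen += 1;
    }
    offsets.push(curlen);
    rep_levels.truncate(write_idx);
}

fn main() {
    let mut offsets = Vec::new();

    // First chunk: two lists holding 3 and 2 items
    let mut first = vec![1, 0, 0, 1, 0];
    unravel_offsets(&mut first, &mut offsets);
    assert_eq!(offsets, vec![0, 3, 5]);
    assert_eq!(first, vec![0, 0]);

    // Second chunk: two lists holding 1 and 3 items, appended to the same offsets
    let mut second = vec![1, 1, 0, 0];
    unravel_offsets(&mut second, &mut offsets);
    assert_eq!(offsets, vec![0, 3, 5, 6, 9]);
}
```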
1467
1468/// As we decode we may extract rep/def information from multiple pages (or multiple
1469/// chunks within a page).
1470///
1471/// For each chunk we create an unraveler.  Each unraveler can have a completely different
1472/// interpretation (e.g. one page might contain null items but no null structs and the next
1473/// page might have null structs but no null items).
1474///
1475/// Concatenating these unravelers would be tricky and expensive so instead we have a
1476/// composite unraveler which unravels across multiple unravelers.
1477///
1478/// Note: this struct should be used even if there is only one page / unraveler.  This is
1479/// because the `RepDefUnraveler`'s API is more complex (it's meant to be called by this
1480/// struct)
1481#[derive(Debug)]
1482pub struct CompositeRepDefUnraveler {
1483    unravelers: Vec<RepDefUnraveler>,
1484}
1485
1486impl CompositeRepDefUnraveler {
1487    pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
1488        Self { unravelers }
1489    }
1490
1491    /// Unravels a layer of validity
1492    ///
1493    /// Returns None if there are no null items in this layer
1494    pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
1495        let is_all_valid = self
1496            .unravelers
1497            .iter()
1498            .all(|unraveler| unraveler.is_all_valid());
1499
1500        if is_all_valid {
1501            for unraveler in self.unravelers.iter_mut() {
1502                unraveler.skip_validity();
1503            }
1504            None
1505        } else {
1506            let mut validity = BooleanBufferBuilder::new(num_values);
1507            for unraveler in self.unravelers.iter_mut() {
1508                unraveler.unravel_validity(&mut validity);
1509            }
1510            Some(NullBuffer::new(validity.finish()))
1511        }
1512    }
1513
1514    pub fn unravel_fsl_validity(
1515        &mut self,
1516        num_values: usize,
1517        dimension: usize,
1518    ) -> Option<NullBuffer> {
1519        for unraveler in self.unravelers.iter_mut() {
1520            unraveler.decimate(dimension);
1521        }
1522        self.unravel_validity(num_values)
1523    }
1524
1525    /// Unravels a layer of offsets (and the validity for that layer)
1526    pub fn unravel_offsets<T: ArrowNativeType>(
1527        &mut self,
1528    ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
1529        let mut is_all_valid = true;
1530        let mut max_num_lists = 0;
1531        for unraveler in self.unravelers.iter() {
1532            is_all_valid &= unraveler.is_all_valid();
1533            max_num_lists += unraveler.max_lists();
1534        }
1535
1536        let mut validity = if is_all_valid {
1537            None
1538        } else {
1539            // Note: This is probably an over-estimate and potentially even an under-estimate.  We only know
1540            // right now how many items we have and not how many rows.  (TODO: Shouldn't we know the # of rows?)
1541            Some(BooleanBufferBuilder::new(max_num_lists))
1542        };
1543
1544        let mut offsets = Vec::with_capacity(max_num_lists + 1);
1545
1546        for unraveler in self.unravelers.iter_mut() {
1547            unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
1548        }
1549
1550        Ok((
1551            OffsetBuffer::new(ScalarBuffer::from(offsets)),
1552            validity.map(|mut v| NullBuffer::new(v.finish())),
1553        ))
1554    }
1555}
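/*
A rough sketch of the "unravel across multiple unravelers" idea, using plain
`Vec<bool>` instead of Arrow buffers.  `ChunkValidity` and `combine_validity`
are hypothetical names, and we assume an all-valid chunk contributes one `true`
per value when at least one other chunk has nulls.

```rust
/// Hypothetical stand-in for one chunk's validity: `None` means "all valid".
type ChunkValidity = Option<Vec<bool>>;

/// Combines per-chunk validity into a single bitmap.
///
/// Each entry is (number of values in the chunk, validity of the chunk).
/// Returns `None` if every chunk is all-valid, so no bitmap is allocated.
fn combine_validity(chunks: &[(usize, ChunkValidity)]) -> Option<Vec<bool>> {
    if chunks.iter().all(|(_, validity)| validity.is_none()) {
        return None;
    }
    let total: usize = chunks.iter().map(|(len, _)| *len).sum();
    let mut combined = Vec::with_capacity(total);
    for (len, validity) in chunks {
        match validity {
            Some(bits) => combined.extend_from_slice(bits),
            // An all-valid chunk still has to fill its slots in the shared bitmap
            None => combined.extend(std::iter::repeat(true).take(*len)),
        }
    }
    Some(combined)
}
```

Checking "is everything valid?" up front mirrors `unravel_validity` above: the
common no-nulls case skips building a bitmap entirely.
*/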
1556
1557/// A [`ControlWordIterator`] when there are both repetition and definition levels
1558///
1559/// The iterator will put the repetition level in the upper bits and the definition
1560/// level in the lower bits.  The number of bits used for each level is determined
1561/// by the width of the repetition and definition levels.
1562#[derive(Debug)]
1563pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
1564    repdef: I,
1565    def_width: usize,
1566    max_rep: u16,
1567    max_visible_def: u16,
1568    rep_mask: u16,
1569    def_mask: u16,
1570    bits_rep: u8,
1571    bits_def: u8,
1572    phantom: std::marker::PhantomData<W>,
1573}
1574
1575impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
1576    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1577        let next = self.repdef.next()?;
1578        let control_word: u8 =
1579            (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
1580        buf.push(control_word);
1581        let is_new_row = next.0 == self.max_rep;
1582        let is_visible = next.1 <= self.max_visible_def;
1583        let is_valid_item = next.1 == 0;
1584        Some(ControlWordDesc {
1585            is_new_row,
1586            is_visible,
1587            is_valid_item,
1588        })
1589    }
1590}
1591
1592impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
1593    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1594        let next = self.repdef.next()?;
1595        let control_word: u16 =
1596            ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
1597        let control_word = control_word.to_le_bytes();
1598        buf.push(control_word[0]);
1599        buf.push(control_word[1]);
1600        let is_new_row = next.0 == self.max_rep;
1601        let is_visible = next.1 <= self.max_visible_def;
1602        let is_valid_item = next.1 == 0;
1603        Some(ControlWordDesc {
1604            is_new_row,
1605            is_visible,
1606            is_valid_item,
1607        })
1608    }
1609}
1610
1611impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
1612    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1613        let next = self.repdef.next()?;
1614        let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
1615            + ((next.1 & self.def_mask) as u32);
1616        let control_word = control_word.to_le_bytes();
1617        buf.push(control_word[0]);
1618        buf.push(control_word[1]);
1619        buf.push(control_word[2]);
1620        buf.push(control_word[3]);
1621        let is_new_row = next.0 == self.max_rep;
1622        let is_visible = next.1 <= self.max_visible_def;
1623        let is_valid_item = next.1 == 0;
1624        Some(ControlWordDesc {
1625            is_new_row,
1626            is_visible,
1627            is_valid_item,
1628        })
1629    }
1630}
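/*
A small sketch of the bit layout described above: the repetition level goes in
the upper bits and the definition level in the lower bits.  The names `mask`,
`pack_control_word` and `unpack_control_word` are hypothetical, and the sketch
always works in `u32` rather than specializing on the word size.  It uses `|`
where the code above uses `+`; the two agree because the fields do not overlap.

```rust
/// Mask with the low `width` bits set (hypothetical helper).
fn mask(width: u32) -> u32 {
    assert!(width < 32, "width must fit in a u32 shift");
    (1u32 << width) - 1
}

/// Packs `rep` into the upper bits and `def` into the lower `def_width` bits.
fn pack_control_word(rep: u16, def: u16, rep_width: u32, def_width: u32) -> u32 {
    ((rep as u32 & mask(rep_width)) << def_width) | (def as u32 & mask(def_width))
}

/// Splits a packed control word back into (rep, def).
fn unpack_control_word(word: u32, def_width: u32) -> (u16, u16) {
    ((word >> def_width) as u16, (word & mask(def_width)) as u16)
}
```

With two bits for each level, rep = 2 and def = 3 pack to `0b1011`, and
unpacking with `def_width = 2` recovers the original pair.
*/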
1631
1632/// A [`ControlWordIterator`] when there are only definition levels or only repetition levels
1633#[derive(Debug)]
1634pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
1635    repdef: I,
1636    level_mask: u16,
1637    bits_rep: u8,
1638    bits_def: u8,
1639    max_rep: u16,
1640    phantom: std::marker::PhantomData<W>,
1641}
1642
1643impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
1644    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1645        let next = self.repdef.next()?;
1646        buf.push((next & self.level_mask) as u8);
1647        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1648        let is_valid_item = next == 0 || self.bits_def == 0;
1649        Some(ControlWordDesc {
1650            is_new_row,
1651            // A unary iterator has either no rep levels or no def levels.  In both
1652            // cases there are no invisible items.
1653            is_visible: true,
1654            is_valid_item,
1655        })
1656    }
1657}
1658
1659impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
1660    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1661        let next = self.repdef.next()? & self.level_mask;
1662        let control_word = next.to_le_bytes();
1663        buf.push(control_word[0]);
1664        buf.push(control_word[1]);
1665        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1666        let is_valid_item = next == 0 || self.bits_def == 0;
1667        Some(ControlWordDesc {
1668            is_new_row,
1669            is_visible: true,
1670            is_valid_item,
1671        })
1672    }
1673}
1674
1675impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
1676    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1677        let next = self.repdef.next()?;
1678        let next = (next & self.level_mask) as u32;
1679        let control_word = next.to_le_bytes();
1680        buf.push(control_word[0]);
1681        buf.push(control_word[1]);
1682        buf.push(control_word[2]);
1683        buf.push(control_word[3]);
1684        let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
1685        let is_valid_item = next == 0 || self.bits_def == 0;
1686        Some(ControlWordDesc {
1687            is_new_row,
1688            is_visible: true,
1689            is_valid_item,
1690        })
1691    }
1692}
1693
1694/// A [`ControlWordIterator`] when there are no repetition or definition levels
1695#[derive(Debug)]
1696pub struct NilaryControlWordIterator {
1697    len: usize,
1698    idx: usize,
1699}
1700
1701impl NilaryControlWordIterator {
1702    fn append_next(&mut self) -> Option<ControlWordDesc> {
1703        if self.idx == self.len {
1704            None
1705        } else {
1706            self.idx += 1;
1707            Some(ControlWordDesc {
1708                is_new_row: true,
1709                is_visible: true,
1710                is_valid_item: true,
1711            })
1712        }
1713    }
1714}
1715
1716/// Helper function to get a bit mask of the given width
1717fn get_mask(width: u16) -> u16 {
1718    ((1u32 << width) - 1) as u16
1719}
1720
1721// We're really going out of our way to avoid boxing here but this will be called on a per-value basis
1722// so it is in the critical path.
1723type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
1724    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
1725    T,
1726>;
1727
1728/// An iterator that generates control words from repetition and definition levels
1729///
1730/// "Control word" is just a fancy term for a single u8/u16/u32 that packs both
1731/// the repetition and definition levels.
1732///
1733/// In the large majority of cases we only need a single byte to represent both the
1734/// repetition and definition levels.  However, if there is deep nesting then we may
1735/// need two bytes.  In the worst case we need 4 bytes though this suggests hundreds of
1736/// levels of nesting which seems unlikely to encounter in practice.
1737#[derive(Debug)]
1738pub enum ControlWordIterator<'a> {
1739    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
1740    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
1741    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
1742    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
1743    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
1744    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
1745    Nilary(NilaryControlWordIterator),
1746}
1747
1748/// Describes the properties of a control word
1749#[derive(Debug)]
1750pub struct ControlWordDesc {
1751    pub is_new_row: bool,
1752    pub is_visible: bool,
1753    pub is_valid_item: bool,
1754}
1755
1756impl ControlWordIterator<'_> {
1757    /// Appends the next control word to the buffer
1758    ///
1759    /// Returns a description of the appended control word, or `None` if there are no more levels
1760    pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1761        match self {
1762            Self::Binary8(iter) => iter.append_next(buf),
1763            Self::Binary16(iter) => iter.append_next(buf),
1764            Self::Binary32(iter) => iter.append_next(buf),
1765            Self::Unary8(iter) => iter.append_next(buf),
1766            Self::Unary16(iter) => iter.append_next(buf),
1767            Self::Unary32(iter) => iter.append_next(buf),
1768            Self::Nilary(iter) => iter.append_next(),
1769        }
1770    }
1771
1772    /// Return true if the control word iterator has repetition levels
1773    pub fn has_repetition(&self) -> bool {
1774        match self {
1775            Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
1776            Self::Unary8(iter) => iter.bits_rep > 0,
1777            Self::Unary16(iter) => iter.bits_rep > 0,
1778            Self::Unary32(iter) => iter.bits_rep > 0,
1779            Self::Nilary(_) => false,
1780        }
1781    }
1782
1783    /// Returns the number of bytes per control word
1784    pub fn bytes_per_word(&self) -> usize {
1785        match self {
1786            Self::Binary8(_) => 1,
1787            Self::Binary16(_) => 2,
1788            Self::Binary32(_) => 4,
1789            Self::Unary8(_) => 1,
1790            Self::Unary16(_) => 2,
1791            Self::Unary32(_) => 4,
1792            Self::Nilary(_) => 0,
1793        }
1794    }
1795
1796    /// Returns the number of bits used for the repetition level
1797    pub fn bits_rep(&self) -> u8 {
1798        match self {
1799            Self::Binary8(iter) => iter.bits_rep,
1800            Self::Binary16(iter) => iter.bits_rep,
1801            Self::Binary32(iter) => iter.bits_rep,
1802            Self::Unary8(iter) => iter.bits_rep,
1803            Self::Unary16(iter) => iter.bits_rep,
1804            Self::Unary32(iter) => iter.bits_rep,
1805            Self::Nilary(_) => 0,
1806        }
1807    }
1808
1809    /// Returns the number of bits used for the definition level
1810    pub fn bits_def(&self) -> u8 {
1811        match self {
1812            Self::Binary8(iter) => iter.bits_def,
1813            Self::Binary16(iter) => iter.bits_def,
1814            Self::Binary32(iter) => iter.bits_def,
1815            Self::Unary8(iter) => iter.bits_def,
1816            Self::Unary16(iter) => iter.bits_def,
1817            Self::Unary32(iter) => iter.bits_def,
1818            Self::Nilary(_) => 0,
1819        }
1820    }
1821}
1822
1823/// Builds a [`ControlWordIterator`] from repetition and definition levels
1824/// by first calculating the width needed and then creating the iterator
1825/// with the appropriate width
1826pub fn build_control_word_iterator<'a>(
1827    rep: Option<&'a [u16]>,
1828    max_rep: u16,
1829    def: Option<&'a [u16]>,
1830    max_def: u16,
1831    max_visible_def: u16,
1832    len: usize,
1833) -> ControlWordIterator<'a> {
1834    let rep_width = if max_rep == 0 {
1835        0
1836    } else {
1837        log_2_ceil(max_rep as u32) as u16
1838    };
1839    let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
1840    let def_width = if max_def == 0 {
1841        0
1842    } else {
1843        log_2_ceil(max_def as u32) as u16
1844    };
1845    let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
1846    let total_width = rep_width + def_width;
1847    match (rep, def) {
1848        (Some(rep), Some(def)) => {
1849            let iter = rep.iter().copied().zip(def.iter().copied());
1850            let def_width = def_width as usize;
1851            if total_width <= 8 {
1852                ControlWordIterator::Binary8(BinaryControlWordIterator {
1853                    repdef: iter,
1854                    rep_mask,
1855                    def_mask,
1856                    def_width,
1857                    max_rep,
1858                    max_visible_def,
1859                    bits_rep: rep_width as u8,
1860                    bits_def: def_width as u8,
1861                    phantom: std::marker::PhantomData,
1862                })
1863            } else if total_width <= 16 {
1864                ControlWordIterator::Binary16(BinaryControlWordIterator {
1865                    repdef: iter,
1866                    rep_mask,
1867                    def_mask,
1868                    def_width,
1869                    max_rep,
1870                    max_visible_def,
1871                    bits_rep: rep_width as u8,
1872                    bits_def: def_width as u8,
1873                    phantom: std::marker::PhantomData,
1874                })
1875            } else {
1876                ControlWordIterator::Binary32(BinaryControlWordIterator {
1877                    repdef: iter,
1878                    rep_mask,
1879                    def_mask,
1880                    def_width,
1881                    max_rep,
1882                    max_visible_def,
1883                    bits_rep: rep_width as u8,
1884                    bits_def: def_width as u8,
1885                    phantom: std::marker::PhantomData,
1886                })
1887            }
1888        }
1889        (Some(lev), None) => {
1890            let iter = lev.iter().copied();
1891            if total_width <= 8 {
1892                ControlWordIterator::Unary8(UnaryControlWordIterator {
1893                    repdef: iter,
1894                    level_mask: rep_mask,
1895                    bits_rep: total_width as u8,
1896                    bits_def: 0,
1897                    max_rep,
1898                    phantom: std::marker::PhantomData,
1899                })
1900            } else if total_width <= 16 {
1901                ControlWordIterator::Unary16(UnaryControlWordIterator {
1902                    repdef: iter,
1903                    level_mask: rep_mask,
1904                    bits_rep: total_width as u8,
1905                    bits_def: 0,
1906                    max_rep,
1907                    phantom: std::marker::PhantomData,
1908                })
1909            } else {
1910                ControlWordIterator::Unary32(UnaryControlWordIterator {
1911                    repdef: iter,
1912                    level_mask: rep_mask,
1913                    bits_rep: total_width as u8,
1914                    bits_def: 0,
1915                    max_rep,
1916                    phantom: std::marker::PhantomData,
1917                })
1918            }
1919        }
1920        (None, Some(lev)) => {
1921            let iter = lev.iter().copied();
1922            if total_width <= 8 {
1923                ControlWordIterator::Unary8(UnaryControlWordIterator {
1924                    repdef: iter,
1925                    level_mask: def_mask,
1926                    bits_rep: 0,
1927                    bits_def: total_width as u8,
1928                    max_rep: 0,
1929                    phantom: std::marker::PhantomData,
1930                })
1931            } else if total_width <= 16 {
1932                ControlWordIterator::Unary16(UnaryControlWordIterator {
1933                    repdef: iter,
1934                    level_mask: def_mask,
1935                    bits_rep: 0,
1936                    bits_def: total_width as u8,
1937                    max_rep: 0,
1938                    phantom: std::marker::PhantomData,
1939                })
1940            } else {
1941                ControlWordIterator::Unary32(UnaryControlWordIterator {
1942                    repdef: iter,
1943                    level_mask: def_mask,
1944                    bits_rep: 0,
1945                    bits_def: total_width as u8,
1946                    max_rep: 0,
1947                    phantom: std::marker::PhantomData,
1948                })
1949            }
1950        }
1951        (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
1952    }
1953}
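/*
A sketch of how the word size could be chosen from the maximum levels.  We
assume here that the width of a level is the bit length of its maximum value;
the `log_2_ceil` helper used above is defined elsewhere and may behave
differently.  `bits_for_level` and `bytes_per_control_word` are hypothetical
names.

```rust
/// Number of bits needed to store any level in `0..=max_level` (0 when there are no levels).
fn bits_for_level(max_level: u16) -> u32 {
    // Bit length of the value: 0 -> 0, 1 -> 1, 3 -> 2, 4 -> 3, ...
    u16::BITS - max_level.leading_zeros()
}

/// Picks the smallest supported word size (in bytes) that fits both levels.
fn bytes_per_control_word(max_rep: u16, max_def: u16) -> usize {
    match bits_for_level(max_rep) + bits_for_level(max_def) {
        0 => 0,
        1..=8 => 1,
        9..=16 => 2,
        _ => 4,
    }
}
```

Under this assumption a single byte covers any combination whose widths sum to
at most 8 bits, e.g. 15 repetition levels together with 15 definition levels.
*/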
1954
1955/// A parser to unwrap control words into repetition and definition levels
1956///
1957/// This is the inverse of the [`ControlWordIterator`].
1958#[derive(Copy, Clone, Debug)]
1959pub enum ControlWordParser {
1960    // First item is the bits to shift, second is the mask to apply (the mask can be
1961    // calculated from the bits to shift but we don't want to calculate it each time)
1962    BOTH8(u8, u32),
1963    BOTH16(u8, u32),
1964    BOTH32(u8, u32),
1965    REP8,
1966    REP16,
1967    REP32,
1968    DEF8,
1969    DEF16,
1970    DEF32,
1971    NIL,
1972}
1973
1974impl ControlWordParser {
1975    fn parse_both<const WORD_SIZE: u8>(
1976        src: &[u8],
1977        dst_rep: &mut Vec<u16>,
1978        dst_def: &mut Vec<u16>,
1979        bits_to_shift: u8,
1980        mask_to_apply: u32,
1981    ) {
1982        match WORD_SIZE {
1983            1 => {
1984                let word = src[0];
1985                let rep = word >> bits_to_shift;
1986                let def = word & (mask_to_apply as u8);
1987                dst_rep.push(rep as u16);
1988                dst_def.push(def as u16);
1989            }
1990            2 => {
1991                let word = u16::from_le_bytes([src[0], src[1]]);
1992                let rep = word >> bits_to_shift;
1993                let def = word & mask_to_apply as u16;
1994                dst_rep.push(rep);
1995                dst_def.push(def);
1996            }
1997            4 => {
1998                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
1999                let rep = word >> bits_to_shift;
2000                let def = word & mask_to_apply;
2001                dst_rep.push(rep as u16);
2002                dst_def.push(def as u16);
2003            }
2004            _ => unreachable!(),
2005        }
2006    }
2007
2008    fn parse_desc_both<const WORD_SIZE: u8>(
2009        src: &[u8],
2010        bits_to_shift: u8,
2011        mask_to_apply: u32,
2012        max_rep: u16,
2013        max_visible_def: u16,
2014    ) -> ControlWordDesc {
2015        match WORD_SIZE {
2016            1 => {
2017                let word = src[0];
2018                let rep = word >> bits_to_shift;
2019                let def = word & (mask_to_apply as u8);
2020                let is_visible = def as u16 <= max_visible_def;
2021                let is_new_row = rep as u16 == max_rep;
2022                let is_valid_item = def == 0;
2023                ControlWordDesc {
2024                    is_visible,
2025                    is_new_row,
2026                    is_valid_item,
2027                }
2028            }
2029            2 => {
2030                let word = u16::from_le_bytes([src[0], src[1]]);
2031                let rep = word >> bits_to_shift;
2032                let def = word & mask_to_apply as u16;
2033                let is_visible = def <= max_visible_def;
2034                let is_new_row = rep == max_rep;
2035                let is_valid_item = def == 0;
2036                ControlWordDesc {
2037                    is_visible,
2038                    is_new_row,
2039                    is_valid_item,
2040                }
2041            }
2042            4 => {
2043                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2044                let rep = word >> bits_to_shift;
2045                let def = word & mask_to_apply;
2046                let is_visible = def as u16 <= max_visible_def;
2047                let is_new_row = rep as u16 == max_rep;
2048                let is_valid_item = def == 0;
2049                ControlWordDesc {
2050                    is_visible,
2051                    is_new_row,
2052                    is_valid_item,
2053                }
2054            }
2055            _ => unreachable!(),
2056        }
2057    }
2058
2059    fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
2060        match WORD_SIZE {
2061            1 => {
2062                let word = src[0];
2063                dst.push(word as u16);
2064            }
2065            2 => {
2066                let word = u16::from_le_bytes([src[0], src[1]]);
2067                dst.push(word);
2068            }
2069            4 => {
2070                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2071                dst.push(word as u16);
2072            }
2073            _ => unreachable!(),
2074        }
2075    }
2076
2077    fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
2078        match WORD_SIZE {
2079            1 => ControlWordDesc {
2080                is_new_row: src[0] as u16 == max_rep,
2081                is_visible: true,
2082                is_valid_item: true,
2083            },
2084            2 => ControlWordDesc {
2085                is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
2086                is_visible: true,
2087                is_valid_item: true,
2088            },
2089            4 => ControlWordDesc {
2090                is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
2091                is_visible: true,
2092                is_valid_item: true,
2093            },
2094            _ => unreachable!(),
2095        }
2096    }
2097
2098    fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
2099        match WORD_SIZE {
2100            1 => ControlWordDesc {
2101                is_new_row: true,
2102                is_visible: true,
2103                is_valid_item: src[0] == 0,
2104            },
2105            2 => ControlWordDesc {
2106                is_new_row: true,
2107                is_visible: true,
2108                is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
2109            },
2110            4 => ControlWordDesc {
2111                is_new_row: true,
2112                is_visible: true,
2113                is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
2114            },
2115            _ => unreachable!(),
2116        }
2117    }
2118
2119    /// Returns the number of bytes per control word
2120    pub fn bytes_per_word(&self) -> usize {
2121        match self {
2122            Self::BOTH8(..) => 1,
2123            Self::BOTH16(..) => 2,
2124            Self::BOTH32(..) => 4,
2125            Self::REP8 => 1,
2126            Self::REP16 => 2,
2127            Self::REP32 => 4,
2128            Self::DEF8 => 1,
2129            Self::DEF16 => 2,
2130            Self::DEF32 => 4,
2131            Self::NIL => 0,
2132        }
2133    }
2134
2135    /// Appends the next control word to the rep & def buffers
2136    ///
2137    /// `src` should be pointing at the first byte (little endian) of the control word
2138    ///
2139    /// `dst_rep` and `dst_def` are the buffers to append the rep and def levels to.
2140    /// They will not be appended to if not needed.
2141    pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
2142        match self {
2143            Self::BOTH8(bits_to_shift, mask_to_apply) => {
2144                Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2145            }
2146            Self::BOTH16(bits_to_shift, mask_to_apply) => {
2147                Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2148            }
2149            Self::BOTH32(bits_to_shift, mask_to_apply) => {
2150                Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2151            }
2152            Self::REP8 => Self::parse_one::<1>(src, dst_rep),
2153            Self::REP16 => Self::parse_one::<2>(src, dst_rep),
2154            Self::REP32 => Self::parse_one::<4>(src, dst_rep),
2155            Self::DEF8 => Self::parse_one::<1>(src, dst_def),
2156            Self::DEF16 => Self::parse_one::<2>(src, dst_def),
2157            Self::DEF32 => Self::parse_one::<4>(src, dst_def),
2158            Self::NIL => {}
2159        }
2160    }
2161
2162    /// Return true if the control words contain repetition information
2163    pub fn has_rep(&self) -> bool {
2164        match self {
2165            Self::BOTH8(..)
2166            | Self::BOTH16(..)
2167            | Self::BOTH32(..)
2168            | Self::REP8
2169            | Self::REP16
2170            | Self::REP32 => true,
2171            Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
2172        }
2173    }
2174
2175    /// Parses the control word to inspect its properties without appending to any buffers
2176    pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
2177        match self {
2178            Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
2179                src,
2180                *bits_to_shift,
2181                *mask_to_apply,
2182                max_rep,
2183                max_visible_def,
2184            ),
2185            Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
2186                src,
2187                *bits_to_shift,
2188                *mask_to_apply,
2189                max_rep,
2190                max_visible_def,
2191            ),
2192            Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
2193                src,
2194                *bits_to_shift,
2195                *mask_to_apply,
2196                max_rep,
2197                max_visible_def,
2198            ),
2199            Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
2200            Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
2201            Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
2202            Self::DEF8 => Self::parse_def_desc_one::<1>(src),
2203            Self::DEF16 => Self::parse_def_desc_one::<2>(src),
2204            Self::DEF32 => Self::parse_def_desc_one::<4>(src),
2205            Self::NIL => ControlWordDesc {
2206                is_new_row: true,
2207                is_valid_item: true,
2208                is_visible: true,
2209            },
2210        }
2211    }
2212
2213    /// Creates a new parser from the number of bits used for the repetition and definition levels
2214    pub fn new(bits_rep: u8, bits_def: u8) -> Self {
2215        let total_bits = bits_rep + bits_def;
2216
2217        enum WordSize {
2218            One,
2219            Two,
2220            Four,
2221        }
2222
2223        let word_size = if total_bits <= 8 {
2224            WordSize::One
2225        } else if total_bits <= 16 {
2226            WordSize::Two
2227        } else {
2228            WordSize::Four
2229        };
2230
2231        match (bits_rep > 0, bits_def > 0, word_size) {
2232            (false, false, _) => Self::NIL,
2233            (false, true, WordSize::One) => Self::DEF8,
2234            (false, true, WordSize::Two) => Self::DEF16,
2235            (false, true, WordSize::Four) => Self::DEF32,
2236            (true, false, WordSize::One) => Self::REP8,
2237            (true, false, WordSize::Two) => Self::REP16,
2238            (true, false, WordSize::Four) => Self::REP32,
2239            (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
2240            (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
2241            (true, true, WordSize::Four) => {
2242                Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
2243            }
2244        }
2245    }
2246}
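/*
A simplified sketch of the parsing direction: walking a buffer of little-endian
control words and splitting each one into its rep and def levels.  Unlike
`ControlWordParser`, which is called once per word and specializes on the word
size, this hypothetical `parse_control_words` handles a whole buffer and widens
every word to `u32` first.

```rust
/// Splits a buffer of little-endian control words into (rep levels, def levels).
///
/// `bytes_per_word` must be 1, 2 or 4.  Trailing bytes that do not form a
/// complete word are ignored.
fn parse_control_words(buf: &[u8], bytes_per_word: usize, bits_def: u32) -> (Vec<u16>, Vec<u16>) {
    assert!(matches!(bytes_per_word, 1 | 2 | 4), "unsupported word size");
    assert!(bits_def < 32, "bits_def must fit in a u32 shift");
    let def_mask = (1u32 << bits_def) - 1;
    let mut rep = Vec::new();
    let mut def = Vec::new();
    for chunk in buf.chunks_exact(bytes_per_word) {
        // Zero-extend the stored little-endian word to four bytes
        let mut bytes = [0u8; 4];
        bytes[..bytes_per_word].copy_from_slice(chunk);
        let word = u32::from_le_bytes(bytes);
        rep.push((word >> bits_def) as u16);
        def.push((word & def_mask) as u16);
    }
    (rep, def)
}
```

Because the low bytes come first in little-endian order, zero-extending a 1 or
2 byte word into a 4 byte array preserves its value.
*/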
2247
2248#[cfg(test)]
2249mod tests {
2250    use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
2251
2252    use crate::repdef::{
2253        CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
2254    };
2255
2256    use super::RepDefBuilder;
2257
2258    fn validity(values: &[bool]) -> NullBuffer {
2259        NullBuffer::from_iter(values.iter().copied())
2260    }
2261
2262    fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
2263        OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
2264    }
2265
2266    fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
2267        OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
2268    }
2269
2270    #[test]
2271    fn test_repdef_empty_offsets() {
2272        // Empty offsets should serialize without panicking.
2273        let mut builder = RepDefBuilder::default();
2274        builder.add_offsets(offsets_32(&[0]), None);
2275        let repdefs = RepDefBuilder::serialize(vec![builder]);
2276        assert!(repdefs.repetition_levels.is_none());
2277        assert!(repdefs.definition_levels.is_none());
2278    }

    #[test]
    fn test_repdef_basic() {
        // Basic case, rep & def
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 2, 2, 5]),
            Some(validity(&[true, false, true])),
        );
        builder.add_offsets(
            offsets_64(&[0, 1, 3, 5, 5, 9]),
            Some(validity(&[true, true, true, false, true])),
        );
        builder.add_validity_bitmap(validity(&[
            true, true, true, false, false, false, true, true, false,
        ]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
        assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);

        // The structure encoded above, where I is a valid item:
        // [[I], [I, I]], NULL, [[NULL, NULL], NULL, [NULL, I, I, NULL]]

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            9,
        )]);

        // Note: validity doesn't exactly round-trip because repdef normalizes some of the
        // redundant validity values
        assert_eq!(
            unraveler.unravel_validity(9),
            Some(validity(&[
                true, true, true, false, false, false, true, true, false
            ]))
        );
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 5, 9]).inner());
        assert_eq!(val, Some(validity(&[true, true, true, false, true])));
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 2, 2, 5]).inner());
        assert_eq!(val, Some(validity(&[true, false, true])));
    }

    #[test]
    fn test_repdef_simple_null_empty_list() {
        let check = |repdefs: SerializedRepDefs, last_def: DefinitionInterpretation| {
            let rep = repdefs.repetition_levels.unwrap();
            let def = repdefs.definition_levels.unwrap();

            assert_eq!([1, 0, 1, 1, 0, 0], *rep);
            assert_eq!([0, 0, 2, 0, 1, 0], *def);
            assert_eq!(
                vec![DefinitionInterpretation::NullableItem, last_def,],
                repdefs.def_meaning
            );
        };

        // A null list and an empty list should serialize to the same levels.  Only the
        // interpretation of the list layer in def_meaning differs.

        // Null case
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 2, 2, 5]),
            Some(validity(&[true, false, true])),
        );
        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        check(repdefs, DefinitionInterpretation::NullableList);

        // Empty case
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0, 2, 2, 5]), None);
        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        check(repdefs, DefinitionInterpretation::EmptyableList);
    }
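
    // Illustrative sketch, not part of the crate: one way to derive the levels checked in
    // test_repdef_simple_null_empty_list from a single list layer.  The helper name
    // `sketch_single_layer_levels` is hypothetical, and the sketch assumes normalized
    // offsets (they start at 0 and null lists span no items).  A null or empty list still
    // occupies one slot.  Both get level 2 here because each case in that test contains
    // only one of the two, and def_meaning records which one it is.
    fn sketch_single_layer_levels(
        offsets: &[i32],
        list_validity: Option<&[bool]>,
        item_validity: &[bool],
    ) -> (Vec<u16>, Vec<u16>) {
        let mut rep = Vec::new();
        let mut def = Vec::new();
        for (i, w) in offsets.windows(2).enumerate() {
            let (start, end) = (w[0] as usize, w[1] as usize);
            let is_null = list_validity.map(|v| !v[i]).unwrap_or(false);
            if is_null || start == end {
                // No items, but the list itself still needs one rep/def slot
                rep.push(1);
                def.push(2);
                continue;
            }
            for item in start..end {
                // 1 marks the first item of a new list, 0 continues the current list
                rep.push(if item == start { 1 } else { 0 });
                // 0 is a valid item, 1 is a null item
                def.push(if item_validity[item] { 0 } else { 1 });
            }
        }
        (rep, def)
    }

    #[test]
    fn sketch_single_layer_levels_example() {
        let items = [true, true, true, false, true];
        let lists = [true, false, true];
        let null_case = sketch_single_layer_levels(&[0, 2, 2, 5], Some(&lists[..]), &items);
        let empty_case = sketch_single_layer_levels(&[0, 2, 2, 5], None, &items);
        assert_eq!(null_case, (vec![1, 0, 1, 1, 0, 0], vec![0, 0, 2, 0, 1, 0]));
        assert_eq!(null_case, empty_case);
    }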

    #[test]
    fn test_repdef_empty_list_at_end() {
        // Regression test for a failure we encountered when the last item was an empty list
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 1, 0, 0, 1], *rep);
        assert_eq!([0, 0, 0, 1, 0, 2], *def);
        assert_eq!(
            vec![
                DefinitionInterpretation::NullableItem,
                DefinitionInterpretation::EmptyableList,
            ],
            repdefs.def_meaning
        );
    }

    #[test]
    fn test_repdef_abnormal_nulls() {
        // List nulls are allowed to have non-empty offsets and garbage values
        // and the add_offsets call should normalize this
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 2, 5, 8]),
            Some(validity(&[true, false, true])),
        );
        // Note: we pass 5 here and not 8.  If add_offsets reports garbage values under null
        // lists, those values should be removed before continuing
        builder.add_no_null(5);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
        assert_eq!([0, 0, 1, 0, 0, 0], *def);

        assert_eq!(
            vec![
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableList,
            ],
            repdefs.def_meaning
        );
    }
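
    // Illustrative sketch, not the crate's implementation: one way to drop the garbage
    // values that sit under null lists, as test_repdef_abnormal_nulls expects.  The helper
    // name `sketch_strip_null_garbage` is hypothetical.  It returns the normalized offsets
    // plus the ranges of the original values that survive.
    fn sketch_strip_null_garbage(
        offsets: &[i32],
        validity: &[bool],
    ) -> (Vec<i32>, Vec<std::ops::Range<usize>>) {
        let mut normalized = vec![0];
        let mut keep = Vec::new();
        let mut len = 0;
        for (w, &valid) in offsets.windows(2).zip(validity) {
            if valid {
                // Valid lists keep their length and their values
                len += w[1] - w[0];
                keep.push(w[0] as usize..w[1] as usize);
            }
            // Null lists contribute no items, so the offset does not advance
            normalized.push(len);
        }
        (normalized, keep)
    }

    #[test]
    fn sketch_strip_null_garbage_example() {
        let (offsets, keep) = sketch_strip_null_garbage(&[0, 2, 5, 8], &[true, false, true]);
        // 5 values remain, which is why the test above calls add_no_null(5)
        assert_eq!(offsets, vec![0, 2, 2, 5]);
        assert_eq!(keep, vec![0..2, 5..8]);
    }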

    #[test]
    fn test_repdef_fsl() {
        let mut builder = RepDefBuilder::default();
        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
        builder.add_fsl(None, 2, 4);
        builder.add_validity_bitmap(validity(&[
            true, false, true, false, true, false, true, false,
        ]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        assert_eq!(
            vec![
                DefinitionInterpretation::NullableItem,
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableItem
            ],
            repdefs.def_meaning
        );

        assert!(repdefs.repetition_levels.is_none());

        let def = repdefs.definition_levels.unwrap();

        assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            None,
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            8,
        )]);

        assert_eq!(
            unraveler.unravel_validity(8),
            Some(validity(&[
                true, false, true, false, false, false, false, false
            ]))
        );
        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
        assert_eq!(
            unraveler.unravel_fsl_validity(2, 2),
            Some(validity(&[true, false]))
        );
    }

    #[test]
    fn test_repdef_fsl_allvalid_item() {
        let mut builder = RepDefBuilder::default();
        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
        builder.add_fsl(None, 2, 4);
        builder.add_no_null(8);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        assert_eq!(
            vec![
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableItem
            ],
            repdefs.def_meaning
        );

        assert!(repdefs.repetition_levels.is_none());

        let def = repdefs.definition_levels.unwrap();

        assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            None,
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            8,
        )]);

        assert_eq!(unraveler.unravel_validity(8), None);
        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
        assert_eq!(
            unraveler.unravel_fsl_validity(2, 2),
            Some(validity(&[true, false]))
        );
    }

    #[test]
    fn test_repdef_sliced_offsets() {
        // Sliced lists may have offsets that don't start with zero.  The
        // add_offsets call needs to normalize these to operate correctly.
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[5, 7, 7, 10]),
            Some(validity(&[true, false, true])),
        );
        builder.add_no_null(5);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
        assert_eq!([0, 0, 1, 0, 0, 0], *def);

        assert_eq!(
            vec![
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableList,
            ],
            repdefs.def_meaning
        );
    }
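
    // Illustrative sketch, not the crate's implementation: sliced offsets can be rebased so
    // they start at zero by subtracting the first offset from every entry.  The helper name
    // `sketch_rebase_offsets` is hypothetical.
    fn sketch_rebase_offsets(offsets: &[i32]) -> Vec<i32> {
        let first = offsets.first().copied().unwrap_or(0);
        offsets.iter().map(|o| o - first).collect()
    }

    #[test]
    fn sketch_rebase_offsets_example() {
        // The sliced offsets used above become the same offsets as the unsliced tests
        assert_eq!(sketch_rebase_offsets(&[5, 7, 7, 10]), vec![0, 2, 2, 5]);
    }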

    #[test]
    fn test_repdef_complex_null_empty() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 4, 4, 4, 6]),
            Some(validity(&[true, false, true, true])),
        );
        builder.add_offsets(
            offsets_32(&[0, 1, 1, 2, 2, 2, 3]),
            Some(validity(&[true, false, true, false, true, true])),
        );
        builder.add_no_null(3);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep);
        assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def);
    }

    #[test]
    fn test_repdef_empty_list_no_null() {
        // Tests when we have some empty lists but no null lists.  This case
        // caused some bugs because we have definition levels but no nulls
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
        builder.add_no_null(6);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            8,
        )]);

        assert_eq!(unraveler.unravel_validity(6), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
        assert_eq!(val, None);
    }

    #[test]
    fn test_repdef_all_valid() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
        builder.add_no_null(9);

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        let rep = repdefs.repetition_levels.unwrap();
        assert!(repdefs.definition_levels.is_none());

        assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            None,
            repdefs.def_meaning.into(),
            9,
        )]);

        assert_eq!(unraveler.unravel_validity(9), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 7, 9]).inner());
        assert_eq!(val, None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 2, 3, 5]).inner());
        assert_eq!(val, None);
    }

    #[test]
    fn test_only_empty_lists() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
        builder.add_no_null(6);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            8,
        )]);

        assert_eq!(unraveler.unravel_validity(6), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
        assert_eq!(val, None);
    }

    #[test]
    fn test_only_null_lists() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 4, 4, 4, 6]),
            Some(validity(&[true, false, false, true])),
        );
        builder.add_no_null(6);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            8,
        )]);

        assert_eq!(unraveler.unravel_validity(6), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
        assert_eq!(val, Some(validity(&[true, false, false, true])));
    }

    #[test]
    fn test_null_and_empty_lists() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 4, 4, 4, 6]),
            Some(validity(&[true, false, true, true])),
        );
        builder.add_no_null(6);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
        assert_eq!([0, 0, 0, 0, 1, 2, 0, 0], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            8,
        )]);

        assert_eq!(unraveler.unravel_validity(6), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
        assert_eq!(val, Some(validity(&[true, false, true, true])));
    }

    #[test]
    fn test_repdef_null_struct_valid_list() {
        // Regression test for a bug with null structs inside an all-valid list

        let rep = vec![1, 0, 0, 0];
        let def = vec![2, 0, 2, 2];
        // AllValidList<NullableStruct<NullableItem>>
        let def_meaning = vec![
            DefinitionInterpretation::NullableItem,
            DefinitionInterpretation::NullableItem,
            DefinitionInterpretation::AllValidList,
        ];
        let num_items = 4;

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep),
            Some(def),
            def_meaning.into(),
            num_items,
        )]);

        assert_eq!(
            unraveler.unravel_validity(4),
            Some(validity(&[false, true, false, false]))
        );
        assert_eq!(
            unraveler.unravel_validity(4),
            Some(validity(&[false, true, false, false]))
        );
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 4]).inner());
        assert_eq!(val, None);
    }

    #[test]
    fn test_repdef_no_rep() {
        let mut builder = RepDefBuilder::default();
        builder.add_no_null(5);
        builder.add_validity_bitmap(validity(&[false, false, true, true, true]));
        builder.add_validity_bitmap(validity(&[false, true, true, true, false]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        assert!(repdefs.repetition_levels.is_none());
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 2, 0, 0, 1], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            None,
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            5,
        )]);

        assert_eq!(
            unraveler.unravel_validity(5),
            Some(validity(&[false, false, true, true, false]))
        );
        assert_eq!(
            unraveler.unravel_validity(5),
            Some(validity(&[false, false, true, true, true]))
        );
        assert_eq!(unraveler.unravel_validity(5), None);
    }
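
    // Illustrative sketch, not the crate's implementation: how stacked validity layers with
    // no repetition could collapse into the definition levels asserted in
    // test_repdef_no_rep.  The helper name `sketch_def_from_validity` is hypothetical.
    // Layers are given outermost first, matching the order of the builder calls, and None
    // stands for an all-valid layer, which is assumed to consume no level.
    fn sketch_def_from_validity(
        layers_outer_to_inner: &[Option<&[bool]>],
        num_items: usize,
    ) -> Vec<u16> {
        let mut def = vec![0u16; num_items];
        let mut next_level = 1u16;
        // Walk innermost to outermost so an outer null overwrites any inner null
        for layer in layers_outer_to_inner.iter().rev() {
            if let Some(validity) = layer {
                for (d, &valid) in def.iter_mut().zip(validity.iter()) {
                    if !valid {
                        *d = next_level;
                    }
                }
                next_level += 1;
            }
        }
        def
    }

    #[test]
    fn sketch_def_from_validity_example() {
        let middle = [false, false, true, true, true];
        let inner = [false, true, true, true, false];
        let def = sketch_def_from_validity(&[None, Some(&middle[..]), Some(&inner[..])], 5);
        assert_eq!(def, vec![2, 2, 0, 0, 1]);
    }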

    #[test]
    fn test_composite_unravel() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 2, 2, 5]),
            Some(validity(&[true, false, true])),
        );
        builder.add_no_null(5);
        let repdef1 = RepDefBuilder::serialize(vec![builder]);

        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
        builder.add_no_null(9);
        let repdef2 = RepDefBuilder::serialize(vec![builder]);

        let rep1 = repdef1.repetition_levels.clone().unwrap();
        let def1 = repdef1.definition_levels.clone().unwrap();
        let rep2 = repdef2.repetition_levels.clone().unwrap();
        assert!(repdef2.definition_levels.is_none());

        assert_eq!([1, 0, 1, 1, 0, 0], *rep1);
        assert_eq!([0, 0, 1, 0, 0, 0], *def1);
        assert_eq!([1, 1, 0, 1, 0, 1, 0, 1, 0], *rep2);

        let unravel1 = RepDefUnraveler::new(
            repdef1.repetition_levels.map(|l| l.to_vec()),
            repdef1.definition_levels.map(|l| l.to_vec()),
            repdef1.def_meaning.into(),
            5,
        );
        let unravel2 = RepDefUnraveler::new(
            repdef2.repetition_levels.map(|l| l.to_vec()),
            repdef2.definition_levels.map(|l| l.to_vec()),
            repdef2.def_meaning.into(),
            9,
        );

        let mut unraveler = CompositeRepDefUnraveler::new(vec![unravel1, unravel2]);

        assert!(unraveler.unravel_validity(9).is_none());
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(
            off.inner(),
            offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]).inner()
        );
        assert_eq!(
            val,
            Some(validity(&[true, false, true, true, true, true, true, true]))
        );
    }

    #[test]
    fn test_repdef_multiple_builders() {
        // Same data as test_repdef_basic, but split across two builders
        let mut builder1 = RepDefBuilder::default();
        builder1.add_offsets(offsets_64(&[0, 2]), None);
        builder1.add_offsets(offsets_64(&[0, 1, 3]), None);
        builder1.add_validity_bitmap(validity(&[true, true, true]));

        let mut builder2 = RepDefBuilder::default();
        builder2.add_offsets(offsets_64(&[0, 0, 3]), Some(validity(&[false, true])));
        builder2.add_offsets(
            offsets_64(&[0, 2, 2, 6]),
            Some(validity(&[true, false, true])),
        );
        builder2.add_validity_bitmap(validity(&[false, false, false, true, true, false]));

        let repdefs = RepDefBuilder::serialize(vec![builder1, builder2]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
        assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
    }

    #[test]
    fn test_all_valid_validity_bitmap_serializes_as_no_null() {
        let mut from_bitmap = RepDefBuilder::default();
        from_bitmap.add_validity_bitmap(validity(&[true, true, true, true]));

        let mut from_no_null = RepDefBuilder::default();
        from_no_null.add_no_null(4);

        let from_bitmap = RepDefBuilder::serialize(vec![from_bitmap]);
        let from_no_null = RepDefBuilder::serialize(vec![from_no_null]);

        assert!(from_bitmap.repetition_levels.is_none());
        assert!(from_bitmap.definition_levels.is_none());
        assert_eq!(from_bitmap.def_meaning, from_no_null.def_meaning);
        assert_eq!(
            from_bitmap.max_visible_level,
            from_no_null.max_visible_level
        );
    }

    #[test]
    fn test_slicer() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 2, 2, 30, 30]),
            Some(validity(&[true, false, true, true])),
        );
        builder.add_no_null(30);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let mut rep_slicer = repdefs.rep_slicer().unwrap();

        // First 5 items include a null list so we get 6 levels (12 bytes)
        assert_eq!(rep_slicer.slice_next(5).len(), 12);
        // Next 20 are all plain
        assert_eq!(rep_slicer.slice_next(20).len(), 40);
        // Last 5 include an empty list so we get 6 levels (12 bytes)
        assert_eq!(rep_slicer.slice_rest().len(), 12);

        // The definition levels have the same length, so the slices match the ones above
        let mut def_slicer = repdefs.def_slicer().unwrap();

        // First 5 items include a null list so we get 6 levels (12 bytes)
        assert_eq!(def_slicer.slice_next(5).len(), 12);
        // Next 20 are all plain
        assert_eq!(def_slicer.slice_next(20).len(), 40);
        // Last 5 include an empty list so we get 6 levels (12 bytes)
        assert_eq!(def_slicer.slice_rest().len(), 12);
    }

    #[test]
    fn test_control_words() {
        // Convert to control words, verify expected, convert back, verify same as original
        fn check(
            rep: &[u16],
            def: &[u16],
            expected_values: Vec<u8>,
            expected_bytes_per_word: usize,
            expected_bits_rep: u8,
            expected_bits_def: u8,
        ) {
            let num_vals = rep.len().max(def.len());
            let max_rep = rep.iter().max().copied().unwrap_or(0);
            let max_def = def.iter().max().copied().unwrap_or(0);

            let in_rep = if rep.is_empty() { None } else { Some(rep) };
            let in_def = if def.is_empty() { None } else { Some(def) };

            let mut iter = super::build_control_word_iterator(
                in_rep,
                max_rep,
                in_def,
                max_def,
                max_def + 1,
                expected_values.len(),
            );
            assert_eq!(iter.bytes_per_word(), expected_bytes_per_word);
            assert_eq!(iter.bits_rep(), expected_bits_rep);
            assert_eq!(iter.bits_def(), expected_bits_def);
            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());

            for _ in 0..num_vals {
                iter.append_next(&mut cw_vec);
            }
            assert!(iter.append_next(&mut cw_vec).is_none());

            assert_eq!(expected_values, cw_vec);

            let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);

            let mut rep_out = Vec::with_capacity(num_vals);
            let mut def_out = Vec::with_capacity(num_vals);

            if expected_bytes_per_word > 0 {
                for slice in cw_vec.chunks_exact(expected_bytes_per_word) {
                    parser.parse(slice, &mut rep_out, &mut def_out);
                }
            }

            assert_eq!(rep, rep_out.as_slice());
            assert_eq!(def, def_out.as_slice());
        }

        // Each will need 4 bits and so we should get 1-byte control words
        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let def = &[5_u16, 3, 1, 2, 12, 15, 0, 2];
        let expected = vec![
            0b00000101, // 0, 5
            0b01110011, // 7, 3
            0b00110001, // 3, 1
            0b00100010, // 2, 2
            0b10011100, // 9, 12
            0b10001111, // 8, 15
            0b11000000, // 12, 0
            0b01010010, // 5, 2
        ];
        check(rep, def, expected, 1, 4, 4);

        // Now we need 5 bits for def so we get 2-byte control words
        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let def = &[5_u16, 3, 1, 2, 12, 22, 0, 2];
        let expected = vec![
            0b00000101, 0b00000000, // 0, 5
            0b11100011, 0b00000000, // 7, 3
            0b01100001, 0b00000000, // 3, 1
            0b01000010, 0b00000000, // 2, 2
            0b00101100, 0b00000001, // 9, 12
            0b00010110, 0b00000001, // 8, 22
            0b10000000, 0b00000001, // 12, 0
            0b10100010, 0b00000000, // 5, 2
        ];
        check(rep, def, expected, 2, 4, 5);

        // Just rep, 4 bits so 1 byte each
        let levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let expected = vec![
            0b00000000, // 0
            0b00000111, // 7
            0b00000011, // 3
            0b00000010, // 2
            0b00001001, // 9
            0b00001000, // 8
            0b00001100, // 12
            0b00000101, // 5
        ];
        check(levels, &[], expected.clone(), 1, 4, 0);

        // Just def
        check(&[], levels, expected, 1, 0, 4);

        // No rep, no def, no bytes
        check(&[], &[], Vec::default(), 0, 0, 0);
    }
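
    // Illustrative sketch, not the crate's implementation: the byte layout implied by the
    // expected values in test_control_words.  The rep level sits in the bits above the def
    // level, and the word is written little-endian.  The helper name
    // `sketch_pack_control_word` is hypothetical, and rounding the width up to whole bytes
    // is an assumption that is only checked against the 1-byte and 2-byte cases above.
    fn sketch_pack_control_word(rep: u16, def: u16, bits_rep: u8, bits_def: u8) -> Vec<u8> {
        let word = ((rep as u32) << bits_def) | def as u32;
        let num_bytes = (bits_rep as usize + bits_def as usize + 7) / 8;
        word.to_le_bytes()[..num_bytes].to_vec()
    }

    #[test]
    fn sketch_pack_control_word_example() {
        // 4 + 4 bits fit in one byte
        assert_eq!(sketch_pack_control_word(7, 3, 4, 4), vec![0b01110011]);
        // 4 + 5 bits spill into a second byte
        assert_eq!(
            sketch_pack_control_word(9, 12, 4, 5),
            vec![0b00101100, 0b00000001]
        );
        // Rep only and def only produce the same byte for the same level
        assert_eq!(sketch_pack_control_word(12, 0, 4, 0), vec![0b00001100]);
        assert_eq!(sketch_pack_control_word(0, 12, 0, 4), vec![0b00001100]);
    }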

    #[test]
    fn test_control_words_rep_index() {
        fn check(
            rep: &[u16],
            def: &[u16],
            expected_new_rows: Vec<bool>,
            expected_is_visible: Vec<bool>,
        ) {
            let num_vals = rep.len().max(def.len());
            let max_rep = rep.iter().max().copied().unwrap_or(0);
            let max_def = def.iter().max().copied().unwrap_or(0);

            let in_rep = if rep.is_empty() { None } else { Some(rep) };
            let in_def = if def.is_empty() { None } else { Some(def) };

            let mut iter = super::build_control_word_iterator(
                in_rep,
                max_rep,
                in_def,
                max_def,
                /*max_visible_def=*/ 2,
                expected_new_rows.len(),
            );

            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
            let mut expected_new_rows = expected_new_rows.iter().copied();
            let mut expected_is_visible = expected_is_visible.iter().copied();
            for _ in 0..expected_new_rows.len() {
                let word_desc = iter.append_next(&mut cw_vec).unwrap();
                assert_eq!(word_desc.is_new_row, expected_new_rows.next().unwrap());
                assert_eq!(word_desc.is_visible, expected_is_visible.next().unwrap());
            }
            assert!(iter.append_next(&mut cw_vec).is_none());
        }

        // 2 means new list
        let rep = &[2_u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
        // These values don't matter for this test
        let def = &[0_u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];

        // Rep & def
        check(
            rep,
            def,
            vec![
                true, false, false, true, true, false, false, false, false, true, false,
            ],
            vec![
                true, true, true, false, true, true, true, true, true, true, true,
            ],
        );
        // Rep only
        check(
            rep,
            &[],
            vec![
                true, false, false, true, true, false, false, false, false, true, false,
            ],
            vec![true; 11],
        );
        // No repetition
        check(
            &[],
            def,
            vec![
                true, true, true, true, true, true, true, true, true, true, true,
            ],
            vec![true; 11],
        );
        // No repetition, no definition
        check(
            &[],
            &[],
            vec![
                true, true, true, true, true, true, true, true, true, true, true,
            ],
            vec![true; 11],
        );
    }

    #[test]
    fn regress_empty_list_case() {
        // Regression test for a case where we had 3 null lists inside a struct
        let mut builder = RepDefBuilder::default();
        builder.add_validity_bitmap(validity(&[true, false, true]));
        builder.add_offsets(
            offsets_32(&[0, 0, 0, 0]),
            Some(validity(&[false, false, false])),
        );
        builder.add_no_null(0);

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 1, 1], *rep);
        assert_eq!([1, 2, 1], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            0,
        )]);

        assert_eq!(unraveler.unravel_validity(0), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 0, 0, 0]).inner());
        assert_eq!(val, Some(validity(&[false, false, false])));
        let val = unraveler.unravel_validity(3).unwrap();
        assert_eq!(val.inner(), validity(&[true, false, true]).inner());
    }

    #[test]
    fn regress_list_ends_null_case() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 1, 2, 2]),
            Some(validity(&[true, true, false])),
        );
        builder.add_offsets(offsets_64(&[0, 1, 1]), Some(validity(&[true, false])));
        builder.add_no_null(1);

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 2, 2], *rep);
        assert_eq!([0, 1, 2], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            1,
        )]);

        assert_eq!(unraveler.unravel_validity(1), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 1]).inner());
        assert_eq!(val, Some(validity(&[true, false])));
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 2, 2]).inner());
        assert_eq!(val, Some(validity(&[true, true, false])));
    }
3139
    #[test]
    fn test_mixed_unraveler() {
        // This tests cases where the validity is different between two different pages
        // because one page has nulls and the other doesn't.

        // Simple case with one layer of validity and no repetition
        let mut unraveler = CompositeRepDefUnraveler::new(vec![
            RepDefUnraveler::new(
                None,
                Some(vec![0, 1, 0, 1]),
                vec![DefinitionInterpretation::NullableItem].into(),
                4,
            ),
            RepDefUnraveler::new(
                None,
                None,
                vec![DefinitionInterpretation::AllValidItem].into(),
                4,
            ),
        ]);

        assert_eq!(
            unraveler.unravel_validity(8),
            Some(validity(&[
                true, false, true, false, true, true, true, true
            ]))
        );

        // More complex case with two layers of validity and repetition
        let def1 = Some(vec![0, 1, 2]);
        let rep1 = Some(vec![1, 0, 1]);

        let def2 = Some(vec![1, 0, 0]);
        let rep2 = Some(vec![1, 1, 0]);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![
            RepDefUnraveler::new(
                rep1,
                def1,
                vec![
                    DefinitionInterpretation::NullableItem,
                    DefinitionInterpretation::EmptyableList,
                ]
                .into(),
                2,
            ),
            RepDefUnraveler::new(
                rep2,
                def2,
                vec![
                    DefinitionInterpretation::AllValidItem,
                    DefinitionInterpretation::NullableList,
                ]
                .into(),
                2,
            ),
        ]);

        assert_eq!(
            unraveler.unravel_validity(4),
            Some(validity(&[true, false, true, true]))
        );
        assert_eq!(
            unraveler.unravel_offsets::<i32>().unwrap(),
            (
                offsets_32(&[0, 2, 2, 2, 4]),
                Some(validity(&[true, true, false, true]))
            )
        );
    }
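
    // Illustrative sketch, not part of the crate: a simplified model of how the offsets
    // expected in `test_mixed_unraveler` fall out of the rep/def levels of a single list
    // layer. The helper name `sketch_list_offsets` and its `max_item_def` parameter are
    // hypothetical. The sketch assumes that a non-zero repetition level starts a new list
    // and that a definition level above `max_item_def` marks a null or empty list, which
    // contributes no item. The real unraveler handles more cases than this.
    fn sketch_list_offsets(rep: &[u16], def: &[u16], max_item_def: u16) -> Vec<i32> {
        let mut offsets = Vec::new();
        let mut num_items = 0;
        for (&r, &d) in rep.iter().zip(def) {
            if r > 0 {
                // A new list begins at the current item count.
                offsets.push(num_items);
            }
            if d <= max_item_def {
                // This entry is an item (valid or null), so it occupies a slot.
                num_items += 1;
            }
        }
        // Close the final list.
        offsets.push(num_items);
        offsets
    }

    #[test]
    fn sketch_mixed_page_offsets() {
        // Page 1: levels 0 and 1 are items, level 2 is an empty list.
        let page1 = sketch_list_offsets(&[1, 0, 1], &[0, 1, 2], 1);
        assert_eq!(page1, [0, 2, 2]);
        // Page 2: level 0 is an item, level 1 is a null list.
        let page2 = sketch_list_offsets(&[1, 1, 0], &[1, 0, 0], 0);
        assert_eq!(page2, [0, 0, 2]);

        // Concatenate the pages by shifting page 2 past the last offset of page 1.
        let base = *page1.last().unwrap();
        let mut combined = page1;
        combined.extend(page2.iter().skip(1).map(|off| off + base));
        assert_eq!(combined, [0, 2, 2, 2, 4]);
    }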

    #[test]
    fn test_mixed_unraveler_nullable_without_def_levels() {
        // A page can keep nullable layer metadata even when all definition levels are 0
        // and no definition buffer needs to be materialized. This should decode as all-valid.
        let mut unraveler = CompositeRepDefUnraveler::new(vec![
            RepDefUnraveler::new(
                None,
                Some(vec![0, 1, 0, 1]),
                vec![DefinitionInterpretation::NullableItem].into(),
                4,
            ),
            RepDefUnraveler::new(
                None,
                None,
                vec![DefinitionInterpretation::NullableItem].into(),
                4,
            ),
        ]);

        assert_eq!(
            unraveler.unravel_validity(8),
            Some(validity(&[
                true, false, true, false, true, true, true, true
            ]))
        );
    }
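
    // Illustrative sketch, not part of the crate: a simplified model of the behavior the
    // two mixed-page validity tests rely on. The helper name `sketch_item_validity` is
    // hypothetical. It assumes a single item layer with no repetition, where definition
    // level 0 means "valid", and where a page with no definition buffer is all-valid.
    fn sketch_item_validity(def: Option<&[u16]>, num_values: usize) -> Vec<bool> {
        match def {
            // Level 0 is a valid item; any other level is a null at this layer.
            Some(levels) => levels.iter().map(|&level| level == 0).collect(),
            // No definition levels were materialized, so every value is valid.
            None => vec![true; num_values],
        }
    }

    #[test]
    fn sketch_mixed_page_validity() {
        let page1: &[u16] = &[0, 1, 0, 1];
        // The first page has nulls, the second has no definition buffer at all.
        let mut combined = sketch_item_validity(Some(page1), 4);
        combined.extend(sketch_item_validity(None, 4));
        assert_eq!(
            combined,
            [true, false, true, false, true, true, true, true]
        );
    }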
}