// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities for rep-def levels
//!
//! Repetition and definition levels are a way to encode multiple validity / offsets arrays
//! into a single buffer.  They are a form of "zipping" buffers together that takes advantage
//! of the fact that, if the outermost array is invalid, then the validity of the inner items
//! is irrelevant.
//!
//! Note: the concept of repetition & definition levels comes from the Dremel paper and has
//! been implemented in Apache Parquet.  However, the implementation here is not necessarily
//! compatible with Parquet.  For example, we use 0 to represent the "inner-most" item and
//! Parquet uses 0 to represent the "outer-most" item.
//!
//! # Repetition Levels
//!
//! With repetition levels we convert a sparse array of offsets into a dense array of levels.
//! These levels are marked non-zero whenever a new list begins.  In other words, given the
//! list array with 3 rows [{<0,1>, <>, <2>}, {<3>}, {}], [], [{<4>}] we would have three
//! offsets arrays:
//!
//! Outer-most ([]): [0, 3, 3, 4]
//! Middle     ({}): [0, 3, 4, 4, 5]
//! Inner      (<>): [0, 2, 2, 3, 4, 5]
//! Values         : [0, 1, 2, 3, 4]
//!
//! We can convert these into repetition levels as follows:
//!
//! | Values | Repetition |
//! | ------ | ---------- |
//! |      0 |          3 | // Start of outer-most list
//! |      1 |          0 | // Continues inner-most list (no new lists)
//! |      - |          1 | // Start of new inner-most list (empty list)
//! |      2 |          1 | // Start of new inner-most list
//! |      3 |          2 | // Start of new middle list
//! |      - |          2 | // Start of new middle list (empty list)
//! |      - |          3 | // Start of new outer-most list (empty list)
//! |      4 |          3 | // Start of new outer-most list
//!
//! Note: We actually have MORE repetition levels than values.  This is because the repetition
//! levels need to be able to represent empty lists.
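//!
//! A minimal sketch of this conversion, assuming every list is valid (null lists are not
//! handled).  The helpers `rep_levels` and `emit` are hypothetical names used only for
//! illustration; they are not part of this module and this is not how the module builds its
//! levels.  `offsets[0]` is the outer-most layer.
//!
//! ```rust
//! /// Converts nested list offsets (outer-most first) into repetition levels.
//! fn rep_levels(offsets: &[Vec<usize>]) -> Vec<u16> {
//!     let mut out = Vec::new();
//!     let num_rows = offsets[0].len() - 1;
//!     for row in 0..num_rows {
//!         // Every row starts a new outer-most list, which has the highest level
//!         emit(offsets, 0, row, offsets.len() as u16, &mut out);
//!     }
//!     out
//! }
//!
//! /// Emits the levels for list `idx` of layer `layer`.  `first` is the level
//! /// assigned to the first slot produced by this list.
//! fn emit(offsets: &[Vec<usize>], layer: usize, idx: usize, first: u16, out: &mut Vec<u16>) {
//!     let (start, end) = (offsets[layer][idx], offsets[layer][idx + 1]);
//!     if start == end {
//!         // An empty list has no values but still occupies one slot
//!         out.push(first);
//!         return;
//!     }
//!     // Level that marks the start of a list in this layer (inner-most list = 1)
//!     let own_level = (offsets.len() - layer) as u16;
//!     for (i, child) in (start..end).enumerate() {
//!         // The first child inherits the level of the enclosing list start.  Later
//!         // children only start a new item one layer further in.
//!         let level = if i == 0 { first } else { own_level - 1 };
//!         if layer + 1 == offsets.len() {
//!             // Children of the inner-most layer are the values themselves
//!             out.push(level);
//!         } else {
//!             emit(offsets, layer + 1, child, level, out);
//!         }
//!     }
//! }
//! ```
//!
//! Applied to the three offsets arrays above, `rep_levels` yields `[3, 0, 1, 1, 2, 2, 3, 3]`:
//! eight levels for five values, because each of the three empty lists takes a slot.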
//!
//! # Definition Levels
//!
//! Definition levels are simpler.  We can think of them as zipping together various validity bitmaps
//! (from different levels of nesting) into a single buffer.  For example, we could zip the arrays
//! [1, 1, 0, 0] and [1, 0, 1, 0] into [11, 10, 01, 00].  However, 00 and 01 are redundant.  If the
//! outer level is null then the validity of the inner levels is irrelevant.  To save space we instead
//! encode a "level" which is the "depth" of the null.  Let's look at a more complete example:
//!
//! Array: [{"middle": {"inner": 1}}, NULL, {"middle": NULL}, {"middle": {"inner": NULL}}]
//!
//! In Arrow we would have the following validity arrays:
//! Outer validity : 1, 0, 1, 1
//! Middle validity: 1, ?, 0, 1
//! Inner validity : 1, ?, ?, 0
//! Values         : 1, ?, ?, ?
//!
//! The ? values are undefined in the Arrow format.  We can convert these into definition levels as follows:
//!
//! | Values | Definition |
//! | ------ | ---------- |
//! |      1 |          0 | // Valid at all levels
//! |      - |          3 | // Null at outer level
//! |      - |          2 | // Null at middle level
//! |      - |          1 | // Null at inner level
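//!
//! A minimal sketch of this conversion, assuming a stack of nullable struct layers with no lists.
//! The function `def_levels` is a hypothetical name used only for illustration and is not part of
//! this module.  `validity[0]` is the outer-most layer and undefined (`?`) slots may hold any value.
//!
//! ```rust
//! /// Zips per-layer validity (outer-most first) into one definition level per row.
//! fn def_levels(validity: &[Vec<bool>]) -> Vec<u16> {
//!     let num_layers = validity.len();
//!     let num_rows = validity[0].len();
//!     (0..num_rows)
//!         .map(|row| {
//!             validity
//!                 .iter()
//!                 // The outer-most null layer decides the level; inner layers are irrelevant
//!                 .position(|layer| !layer[row])
//!                 // 0 means valid at every layer, larger values mean a null further out
//!                 .map_or(0, |depth| (num_layers - depth) as u16)
//!         })
//!         .collect()
//! }
//! ```
//!
//! For the outer, middle and inner validity arrays above this yields `[0, 3, 2, 1]`.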
//!
//! # Compression
//!
//! Note that we only need 2 bits of definition levels to represent 3 levels of nesting.  Definition
//! levels are always more compact than the input validity arrays.  However, compressed levels are not
//! necessarily more compact than the compressed validity arrays.
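//!
//! The bit count follows from the number of distinct levels.  A small sketch of that arithmetic,
//! where `bits_per_level` is a hypothetical helper written for illustration (it is a stand-in and
//! not the `log_2_ceil` utility this module imports):
//!
//! ```rust
//! /// Bits needed to store any level in `0..=max_level`.
//! fn bits_per_level(max_level: u16) -> u32 {
//!     // Levels range over 0..=max_level, so there are max_level + 1 distinct values
//!     let num_values = u32::from(max_level) + 1;
//!     // ceil(log2(num_values)), computed from the position of the highest set bit
//!     u32::BITS - (num_values - 1).leading_zeros()
//! }
//! ```
//!
//! With three nullable layers the levels are 0 through 3, so `bits_per_level(3)` is 2, compared
//! with 3 bits per row for three separate validity bitmaps.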
//!
//! Repetition levels are more complex.  If there are very large lists then a sparse array of offsets
//! (which has one element per list) might be more compact than a dense array of repetition levels
//! (which has one element per list value, possibly even more if there are empty lists).
//!
//! Both repetition levels and definition levels are typically very compressible with RLE.
//!
//! However, in Lance we don't always take advantage of that compression because we want to be able
//! to zip rep-def levels together with our values.  This gives us fewer IOPS when accessing row values.
//!
//! # Utilities in this Module
//!
//! - `RepDefBuilder` - Extracts validity and offset information from Arrow arrays.  We use this as we
//!   shred the incoming data into primitive leaf arrays.  We don't immediately convert into rep-def because
//!   we need to share and cheaply clone the builder when we have structs (each struct child shares some parent
//!   validity / offset information).  The `serialize` method is called once all data has been received to create
//!   the final rep-def levels.
//!
//! - `SerializerContext` - This is an internal utility that helps with serializing rep-def levels.
//!
//! - `CompositeRepDefUnraveler` - This structure is used to reverse the process.  It starts with a set of
//!   rep-def levels and then uses the `unravel_validity` and `unravel_offsets` methods to produce validity
//!   buffers and offset buffers.  It is "composite" because we may be combining sets of rep-def buffers from
//!   multiple locations (e.g. multiple blocks in a mini-block encoded file).
//!
//! - `RepDefSlicer` - This is a utility that helps with slicing rep-def buffers.  These buffers are "kind of"
//!   transparent (maps 1:1 with the values in the array) but not exactly because of the special (empty/null) lists.
//!   The slicer helps with this issue and is used when slicing a rep-def buffer into mini-blocks.
//!
//! - `build_control_word_iterator` - This takes in rep-def levels and returns an iterator that returns byte-padded
//!   "control words" which are used when creating full-zip encoded data.
//!
//! - `ControlWordParser` - This parser can parse the control words returned by `build_control_word_iterator` and is
//!   used when decoding full-zip encoded data.

use std::{
    iter::{Copied, Zip},
    sync::Arc,
};

use arrow_array::OffsetSizeTrait;
use arrow_buffer::{
    ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
};
use lance_core::{Error, Result, utils::bit::log_2_ceil};

use crate::buffer::LanceBuffer;

pub type LevelBuffer = Vec<u16>;

// As we build def levels we add this to special values to indicate that they
// are special so that we can skip over them when processing lower levels.
//
// We assume 16 bits is good enough for rep-def levels.  This _would_ give
// us 65536 levels of struct nesting and list nesting.  However, we cut that
// in half for SPECIAL_THRESHOLD because we use the top bit to indicate if an
// item is a special value (null list / empty list) during construction.
//
// We subtract this off at the end of construction to get the actual definition
// levels.
const SPECIAL_THRESHOLD: u16 = u16::MAX / 2;

/// Represents information that we extract from a list array as we are
/// encoding
#[derive(Clone, Debug)]
struct OffsetDesc {
    offsets: Arc<[i64]>,
    validity: Option<BooleanBuffer>,
    has_empty_lists: bool,
    num_values: usize,
    num_specials: usize,
}

/// Represents validity information that we extract from non-list arrays (that
/// have nulls) as we are encoding
#[derive(Clone, Debug)]
struct ValidityDesc {
    validity: Option<BooleanBuffer>,
    num_values: usize,
}

/// Represents validity information that we extract from FSL arrays.  This is
/// just validity (no offsets) but we also record the dimension of the FSL array
/// as that will impact the next layer
#[derive(Clone, Debug)]
struct FslDesc {
    validity: Option<BooleanBuffer>,
    dimension: usize,
    num_values: usize,
}

// As we build up rep/def from arrow arrays we record a
// series of RawRepDef objects.  Each one corresponds to a layer
// in the array structure
#[derive(Clone, Debug)]
enum RawRepDef {
    Offsets(OffsetDesc),
    Validity(ValidityDesc),
    Fsl(FslDesc),
}

impl RawRepDef {
    // Are there any nulls in this layer
    fn has_nulls(&self) -> bool {
        match self {
            Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
            Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
            Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
        }
    }

    // How many values are in this layer
    fn num_values(&self) -> usize {
        match self {
            Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
            Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
            Self::Fsl(FslDesc { num_values, .. }) => *num_values,
        }
    }

    /// How many empty/null lists are in this layer
    fn num_specials(&self) -> usize {
        match self {
            Self::Offsets(OffsetDesc { num_specials, .. }) => *num_specials,
            _ => 0,
        }
    }

    /// How many definition levels do we need for this layer
    fn max_def(&self) -> u16 {
        match self {
            Self::Offsets(OffsetDesc {
                has_empty_lists,
                validity,
                ..
            }) => {
                let mut max_def = 0;
                if *has_empty_lists {
                    max_def += 1;
                }
                if validity.is_some() {
                    max_def += 1;
                }
                max_def
            }
            Self::Validity(ValidityDesc { validity: None, .. }) => 0,
            Self::Validity(ValidityDesc { .. }) => 1,
            Self::Fsl(FslDesc { validity: None, .. }) => 0,
            Self::Fsl(FslDesc { .. }) => 1,
        }
    }

    /// How many repetition levels do we need for this layer
    fn max_rep(&self) -> u16 {
        match self {
            Self::Offsets(_) => 1,
            _ => 0,
        }
    }
}

/// Represents repetition and definition levels that have been
/// serialized into a pair of (optional) level buffers
#[derive(Debug)]
pub struct SerializedRepDefs {
    /// The repetition levels, one per item
    ///
    /// If None, there are no lists
    pub repetition_levels: Option<Arc<[u16]>>,
    /// The definition levels, one per item
    ///
    /// If None, there are no nulls
    pub definition_levels: Option<Arc<[u16]>>,
    /// The meaning of each definition level
    pub def_meaning: Vec<DefinitionInterpretation>,
    /// The maximum level that is "visible" from the lowest level
    ///
    /// This is the last level before we encounter a list level of some kind.  Once we've
    /// hit a list level then nulls in any level beyond do not map to actual items.
    ///
    /// This is None if there are no lists
    pub max_visible_level: Option<u16>,
}

impl SerializedRepDefs {
    pub fn new(
        repetition_levels: Option<LevelBuffer>,
        definition_levels: Option<LevelBuffer>,
        def_meaning: Vec<DefinitionInterpretation>,
    ) -> Self {
        let first_list = def_meaning.iter().position(|level| level.is_list());
        let max_visible_level = first_list.map(|first_list| {
            def_meaning
                .iter()
                .map(|level| level.num_def_levels())
                .take(first_list)
                .sum::<u16>()
        });
        Self {
            repetition_levels: repetition_levels.map(Arc::from),
            definition_levels: definition_levels.map(Arc::from),
            def_meaning,
            max_visible_level,
        }
    }

    /// Creates an empty SerializedRepDefs (no repetition, all valid)
    pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
        Self {
            repetition_levels: None,
            definition_levels: None,
            def_meaning,
            max_visible_level: None,
        }
    }

    pub fn rep_slicer(&self) -> Option<RepDefSlicer<'_>> {
        self.repetition_levels
            .as_ref()
            .map(|rep| RepDefSlicer::new(self, rep.clone()))
    }

    pub fn def_slicer(&self) -> Option<RepDefSlicer<'_>> {
        self.definition_levels
            .as_ref()
            .map(|def| RepDefSlicer::new(self, def.clone()))
    }
}

/// Slices a level buffer into pieces
///
/// This is needed to handle the fact that a level buffer may have more
/// levels than values due to special (empty/null) lists.
///
/// As a result, a call to `slice_next(10)` may return 10 levels or it may
/// return more than 10 levels if any special values are encountered.
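///
/// A minimal sketch of that counting rule, assuming definition levels are
/// present.  The function `levels_needed` is a hypothetical stand-alone helper
/// written for illustration; it is not a method of this type.
///
/// ```rust
/// /// Number of levels that must be consumed to cover `num_values` visible values.
/// /// Panics if `def` holds fewer than `num_values` visible levels.
/// fn levels_needed(def: &[u16], max_visible_level: u16, num_values: usize) -> usize {
///     let mut taken = 0;
///     let mut passed = 0;
///     while taken < num_values {
///         // Levels above `max_visible_level` mark empty/null lists, which
///         // occupy a level but do not correspond to a value
///         if def[passed] <= max_visible_level {
///             taken += 1;
///         }
///         passed += 1;
///     }
///     passed
/// }
/// ```
///
/// With `def = [0, 2, 0, 1, 2, 0]` and `max_visible_level = 1`, covering 2
/// values consumes 3 levels and covering 3 values consumes 4 levels.  A special
/// level that directly follows the last value is left for the next slice.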
#[derive(Debug)]
pub struct RepDefSlicer<'a> {
    repdef: &'a SerializedRepDefs,
    to_slice: LanceBuffer,
    current: usize,
}

// TODO: All of this logic will need some changing when we compress rep/def levels.
impl<'a> RepDefSlicer<'a> {
    fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
        Self {
            repdef,
            to_slice: LanceBuffer::reinterpret_slice(levels),
            current: 0,
        }
    }

    pub fn num_levels(&self) -> usize {
        self.to_slice.len() / 2
    }

    pub fn num_levels_remaining(&self) -> usize {
        self.num_levels() - self.current
    }

    pub fn all_levels(&self) -> &LanceBuffer {
        &self.to_slice
    }

    /// Returns the rest of the levels not yet sliced
    ///
    /// This must be called instead of `slice_next` on the final iteration.
    /// This is because anytime we slice there may be empty/null lists on the
    /// boundary that are "free" and the current behavior in `slice_next` is to
    /// leave them for the next call.
    ///
    /// `slice_rest` will slice all remaining levels and return them.
    pub fn slice_rest(&mut self) -> LanceBuffer {
        let start = self.current;
        let remaining = self.num_levels_remaining();
        self.current = self.num_levels();
        self.to_slice.slice_with_length(start * 2, remaining * 2)
    }

    /// Returns enough levels to satisfy the next `num_values` values
    pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
        let start = self.current;
        let Some(max_visible_level) = self.repdef.max_visible_level else {
            // No lists, should be 1:1 mapping from levels to values
            self.current = start + num_values;
            return self.to_slice.slice_with_length(start * 2, num_values * 2);
        };
        if let Some(def) = self.repdef.definition_levels.as_ref() {
            // There are lists and there are def levels.  That means there may be
            // more rep/def levels than values.  We need to scan the def levels to figure
            // out which items are "invisible" and skip over them
            let mut def_itr = def[start..].iter();
            let mut num_taken = 0;
            let mut num_passed = 0;
            while num_taken < num_values {
                let def_level = *def_itr.next().unwrap();
                if def_level <= max_visible_level {
                    num_taken += 1;
                }
                num_passed += 1;
            }
            self.current = start + num_passed;
            self.to_slice.slice_with_length(start * 2, num_passed * 2)
        } else {
            // No def levels, should be 1:1 mapping from levels to values
            self.current = start + num_values;
            self.to_slice.slice_with_length(start * 2, num_values * 2)
        }
    }
}

/// This tells us how an array handles definition.  Given a stack of
/// these and a nested array and a set of definition levels we can calculate
/// how we should interpret the definition levels.
///
/// For example, if the interpretation is [AllValidItem, NullableItem] then
/// a 0 means "valid item" and a 1 means "null struct".  If the interpretation
/// is [NullableItem, NullableItem] then a 0 means "valid item" and a 1 means
/// "null item" and a 2 means "null struct".
///
/// Lists are tricky because we might use up to two definition levels for a
/// single layer of list nesting because we need one value to indicate "empty list"
/// and another value to indicate "null list".
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DefinitionInterpretation {
    AllValidItem,
    AllValidList,
    NullableItem,
    NullableList,
    EmptyableList,
    NullableAndEmptyableList,
}

impl DefinitionInterpretation {
    /// How many definition levels do we need for this layer
    pub fn num_def_levels(&self) -> u16 {
        match self {
            Self::AllValidItem => 0,
            Self::AllValidList => 0,
            Self::NullableItem => 1,
            Self::NullableList => 1,
            Self::EmptyableList => 1,
            Self::NullableAndEmptyableList => 2,
        }
    }

    /// Is this layer free of nulls?  (Empty lists do not count as nulls.)
    pub fn is_all_valid(&self) -> bool {
        matches!(
            self,
            Self::AllValidItem | Self::AllValidList | Self::EmptyableList
        )
    }

    /// Does this layer represent a list?
    pub fn is_list(&self) -> bool {
        matches!(
            self,
            Self::AllValidList
                | Self::NullableList
                | Self::EmptyableList
                | Self::NullableAndEmptyableList
        )
    }
}

441/// The RepDefBuilder is used to collect offsets & validity buffers
442/// from arrow structures.  Once we have those we use the SerializerContext
443/// to build the actual repetition and definition levels.
444///
445/// We know ahead of time how many rep/def levels we will need (number of items
446/// in inner-most array + the number of empty/null lists in any parent arrays).
447///
448/// As a result we try and avoid any re-allocations by pre-allocating the buffers
449/// up front.  We allocate two copies of each buffer which allows us to avoid unsafe
450/// code caused by reading and writing to the same buffer (also, it's unavoidable
451/// because there are times we need to write 'faster' than we read)
452#[derive(Debug)]
453struct SerializerContext {
454    // This is built from outer-to-inner and then reversed at the end
455    def_meaning: Vec<DefinitionInterpretation>,
456    rep_levels: LevelBuffer,
457    spare_rep: LevelBuffer,
458    def_levels: LevelBuffer,
459    spare_def: LevelBuffer,
460    current_rep: u16,
461    current_def: u16,
462    current_len: usize,
463    current_num_specials: usize,
464}
465
466impl SerializerContext {
467    fn new(len: usize, num_layers: usize, max_rep: u16, max_def: u16) -> Self {
468        let def_meaning = Vec::with_capacity(num_layers);
469        Self {
470            rep_levels: if max_rep > 0 {
471                vec![0; len]
472            } else {
473                LevelBuffer::default()
474            },
475            spare_rep: if max_rep > 0 {
476                vec![0; len]
477            } else {
478                LevelBuffer::default()
479            },
480            def_levels: if max_def > 0 {
481                vec![0; len]
482            } else {
483                LevelBuffer::default()
484            },
485            spare_def: if max_def > 0 {
486                vec![0; len]
487            } else {
488                LevelBuffer::default()
489            },
490            def_meaning,
491            current_rep: max_rep,
492            current_def: max_def,
493            current_len: 0,
494            current_num_specials: 0,
495        }
496    }
497
498    fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
499        let def = self.current_def;
500        self.current_def -= meaning.num_def_levels();
501        self.def_meaning.push(meaning);
502        def
503    }
504
505    fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
506        let rep_level = self.current_rep;
507        let (null_list_level, empty_list_level) =
508            match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
509                (true, true) => {
510                    let level =
511                        self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
512                    (level - 1, level)
513                }
514                (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
515                (false, true) => (
516                    0,
517                    self.checkout_def(DefinitionInterpretation::EmptyableList),
518                ),
519                (false, false) => {
520                    self.checkout_def(DefinitionInterpretation::AllValidList);
521                    (0, 0)
522                }
523            };
524        self.current_rep -= 1;
525
526        if let Some(validity) = &offset_desc.validity {
527            self.do_record_validity(validity, null_list_level);
528        }
529
530        // We write into the spare buffers and read from the active buffers
531        // and then swap at the end.  This way we don't write over what we
532        // are reading.
533
534        let mut new_len = 0;
535        let expected_len = offset_desc.num_values + self.current_num_specials;
536        if expected_len == 0 {
537            // Offsets [0] mean no list values, so no levels.
538            self.current_len = 0;
539            return;
540        }
541        assert!(self.rep_levels.len() >= expected_len - 1);
542        if self.def_levels.is_empty() {
543            let mut write_itr = self.spare_rep.iter_mut();
544            let mut read_iter = self.rep_levels.iter().copied();
545            for w in offset_desc.offsets.windows(2) {
546                let len = w[1] - w[0];
547                // len can't be 0 because then we'd have def levels
548                assert!(len > 0);
549                let rep = read_iter.next().unwrap();
550                let list_level = if rep == 0 { rep_level } else { rep };
551                *write_itr.next().unwrap() = list_level;
552
553                for _ in 1..len {
554                    *write_itr.next().unwrap() = 0;
555                }
556                new_len += len as usize;
557            }
558            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
559        } else {
560            assert!(self.def_levels.len() >= expected_len - 1);
561            let mut def_write_itr = self.spare_def.iter_mut();
562            let mut rep_write_itr = self.spare_rep.iter_mut();
563            let mut rep_read_itr = self.rep_levels.iter().copied();
564            let mut def_read_itr = self.def_levels.iter().copied();
565            let specials_to_pass = self.current_num_specials;
566            let mut specials_passed = 0;
567
568            for w in offset_desc.offsets.windows(2) {
569                let mut def = def_read_itr.next().unwrap();
570                // Copy over any higher-level special values in place
571                while def > SPECIAL_THRESHOLD {
572                    *def_write_itr.next().unwrap() = def;
573                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
574                    def = def_read_itr.next().unwrap();
575                    new_len += 1;
576                    specials_passed += 1;
577                }
578
579                let len = w[1] - w[0];
580                let rep = rep_read_itr.next().unwrap();
581
582                // If the rep_level is 0 then we are the first list level
583                // otherwise we are starting a higher level list so keep
584                // existing rep level
585                let list_level = if rep == 0 { rep_level } else { rep };
586
587                if def == 0 && len > 0 {
588                    // New valid list, write a rep level and then add new 0/0 items
589                    *def_write_itr.next().unwrap() = 0;
590                    *rep_write_itr.next().unwrap() = list_level;
591
592                    for _ in 1..len {
593                        *def_write_itr.next().unwrap() = 0;
594                        *rep_write_itr.next().unwrap() = 0;
595                    }
596
597                    new_len += len as usize;
598                } else if def == 0 {
599                    // Empty list, insert new special
600                    *def_write_itr.next().unwrap() = empty_list_level + SPECIAL_THRESHOLD;
601                    *rep_write_itr.next().unwrap() = list_level;
602                    new_len += 1;
603                } else {
604                    // Either the list is null or one of its struct parents
605                    // is null.  Promote it to a special value.
606                    *def_write_itr.next().unwrap() = def + SPECIAL_THRESHOLD;
607                    *rep_write_itr.next().unwrap() = list_level;
608                    new_len += 1;
609                }
610            }
611
612            // If we have any special values at the end, we need to copy them over
613            while specials_passed < specials_to_pass {
614                *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
615                *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
616                new_len += 1;
617                specials_passed += 1;
618            }
619            std::mem::swap(&mut self.def_levels, &mut self.spare_def);
620            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
621        }
622
623        self.current_len = new_len;
624        self.current_num_specials += offset_desc.num_specials;
625    }
626
627    fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
628        assert!(self.def_levels.len() >= validity.len() + self.current_num_specials);
629        debug_assert!(
630            self.current_len == 0 || self.current_len == validity.len() + self.current_num_specials
631        );
632        self.current_len = validity.len();
633
634        let mut def_read_itr = self.def_levels.iter().copied();
635        let mut def_write_itr = self.spare_def.iter_mut();
636
637        let specials_to_pass = self.current_num_specials;
638        let mut specials_passed = 0;
639
640        for incoming_validity in validity.iter() {
641            let mut def = def_read_itr.next().unwrap();
642            while def > SPECIAL_THRESHOLD {
643                *def_write_itr.next().unwrap() = def;
644                def = def_read_itr.next().unwrap();
645                specials_passed += 1;
646            }
647            if def == 0 && !incoming_validity {
648                *def_write_itr.next().unwrap() = null_level;
649            } else {
650                *def_write_itr.next().unwrap() = def;
651            }
652        }
653
654        while specials_passed < specials_to_pass {
655            *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
656            specials_passed += 1;
657        }
658
659        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
660    }
661
662    fn multiply_levels(&mut self, multiplier: usize) {
663        let old_len = self.current_len;
664        // All non-special values will be broadcasted by the multiplier.  Special values are copied as-is.
665        self.current_len =
666            (self.current_len - self.current_num_specials) * multiplier + self.current_num_specials;
667
668        if self.rep_levels.is_empty() && self.def_levels.is_empty() {
669            // All valid with no rep/def levels, nothing to do
670            return;
671        } else if self.rep_levels.is_empty() {
672            assert!(self.def_levels.len() >= self.current_len);
673            // No rep levels, just multiply the def levels
674            let mut def_read_itr = self.def_levels.iter().copied();
675            let mut def_write_itr = self.spare_def.iter_mut();
676            for _ in 0..old_len {
677                let mut def = def_read_itr.next().unwrap();
678                while def > SPECIAL_THRESHOLD {
679                    *def_write_itr.next().unwrap() = def;
680                    def = def_read_itr.next().unwrap();
681                }
682                for _ in 0..multiplier {
683                    *def_write_itr.next().unwrap() = def;
684                }
685            }
686        } else if self.def_levels.is_empty() {
687            assert!(self.rep_levels.len() >= self.current_len);
688            // No def levels, just multiply the rep levels
689            let mut rep_read_itr = self.rep_levels.iter().copied();
690            let mut rep_write_itr = self.spare_rep.iter_mut();
691            for _ in 0..old_len {
692                let rep = rep_read_itr.next().unwrap();
693                for _ in 0..multiplier {
694                    *rep_write_itr.next().unwrap() = rep;
695                }
696            }
697        } else {
698            assert!(self.rep_levels.len() >= self.current_len);
699            assert!(self.def_levels.len() >= self.current_len);
700            let mut rep_read_itr = self.rep_levels.iter().copied();
701            let mut def_read_itr = self.def_levels.iter().copied();
702            let mut rep_write_itr = self.spare_rep.iter_mut();
703            let mut def_write_itr = self.spare_def.iter_mut();
704            for _ in 0..old_len {
705                let mut def = def_read_itr.next().unwrap();
706                while def > SPECIAL_THRESHOLD {
707                    *def_write_itr.next().unwrap() = def;
708                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
709                    def = def_read_itr.next().unwrap();
710                }
711                let rep = rep_read_itr.next().unwrap();
712                for _ in 0..multiplier {
713                    *def_write_itr.next().unwrap() = def;
714                    *rep_write_itr.next().unwrap() = rep;
715                }
716            }
717        }
718        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
719        std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
720    }
721
722    fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
723        if let Some(validity) = validity {
724            let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
725            self.do_record_validity(validity, def_level);
726        } else {
727            self.checkout_def(DefinitionInterpretation::AllValidItem);
728        }
729    }
730
731    fn record_validity(&mut self, validity_desc: &ValidityDesc) {
732        self.record_validity_buf(&validity_desc.validity)
733    }
734
735    fn record_fsl(&mut self, fsl_desc: &FslDesc) {
736        self.record_validity_buf(&fsl_desc.validity);
737        self.multiply_levels(fsl_desc.dimension);
738    }
739
740    fn normalize_specials(&mut self) {
741        for def in self.def_levels.iter_mut() {
742            if *def > SPECIAL_THRESHOLD {
743                *def -= SPECIAL_THRESHOLD;
744            }
745        }
746    }
747
748    fn build(mut self) -> SerializedRepDefs {
749        if self.current_len == 0 {
750            return SerializedRepDefs::new(None, None, self.def_meaning);
751        }
752
753        self.normalize_specials();
754
755        let definition_levels = if self.def_levels.is_empty() {
756            None
757        } else {
758            Some(self.def_levels)
759        };
760        let repetition_levels = if self.rep_levels.is_empty() {
761            None
762        } else {
763            Some(self.rep_levels)
764        };
765
766        // Need to reverse the def meaning since we build rep / def levels in reverse
767        let def_meaning = self.def_meaning.into_iter().rev().collect::<Vec<_>>();
768
769        SerializedRepDefs::new(repetition_levels, definition_levels, def_meaning)
770    }
771}
772
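// Illustrative sketch (not part of this module): how a fixed-size-list layer might expand
// definition levels.  It mirrors the loop in `multiply_levels` for definition levels only:
// ordinary levels are repeated once per FSL slot, while special levels (which mark empty or
// null lists) are copied through once.  The name `multiply_def_levels` and the threshold
// value are assumptions made for this example; the crate's real `SPECIAL_THRESHOLD` is
// defined elsewhere in this file.

```rust
/// Hypothetical stand-in for the crate's `SPECIAL_THRESHOLD` constant.
const SPECIAL_THRESHOLD: u16 = u16::MAX / 2;

/// Repeats each ordinary definition level `multiplier` times.
/// Levels above the threshold are treated as specials and copied exactly once.
fn multiply_def_levels(def_levels: &[u16], multiplier: usize) -> Vec<u16> {
    let mut out = Vec::with_capacity(def_levels.len() * multiplier);
    for &def in def_levels {
        if def > SPECIAL_THRESHOLD {
            // A special does not correspond to an FSL row, so it is not expanded
            out.push(def);
        } else {
            // One copy of the level for every slot in the fixed-size list
            out.extend(std::iter::repeat(def).take(multiplier));
        }
    }
    out
}
```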
773/// A structure used to collect validity buffers and offsets from arrow
774/// arrays and eventually create repetition and definition levels
775///
776/// During encoding, the structural encoders are given this struct and
777/// record the Arrow information into it.  Once we hit a leaf node we
778/// serialize the data into rep/def levels and write these into the page.
779#[derive(Clone, Default, Debug)]
780pub struct RepDefBuilder {
781    // The rep/def info we have collected so far
782    repdefs: Vec<RawRepDef>,
783    // The current length, can get larger as we traverse lists (e.g. an
784    // array might have 5 lists which results in 50 items)
785    //
786    // Starts uninitialized until we see the first rep/def item
787    len: Option<usize>,
788}
789
790impl RepDefBuilder {
791    fn check_validity_len(&mut self, incoming_len: usize) {
792        if let Some(len) = self.len {
793            assert_eq!(incoming_len, len);
794        } else {
795            // First validity buffer we've seen
796            self.len = Some(incoming_len);
797        }
798    }
799
800    fn num_layers(&self) -> usize {
801        self.repdefs.len()
802    }
803
804    /// The builder is "empty" if there is no repetition and no nulls.  In this case we don't need
805    /// to store anything to disk (except the description)
806    pub fn is_empty(&self) -> bool {
807        self.repdefs
808            .iter()
809            .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
810    }
811
812    /// Returns true if there is only a single layer of definition
813    pub fn is_simple_validity(&self) -> bool {
814        self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
815    }
816
817    /// Registers a nullable validity bitmap
818    pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
819        self.check_validity_len(validity.len());
820        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
821            num_values: validity.len(),
822            validity: Some(validity.into_inner()),
823        }));
824    }
825
826    /// Registers an all-valid validity layer
827    pub fn add_no_null(&mut self, len: usize) {
828        self.check_validity_len(len);
829        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
830            validity: None,
831            num_values: len,
832        }));
833    }
834
835    pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
836        if let Some(len) = self.len {
837            assert_eq!(num_values, len);
838        }
839        self.len = Some(num_values * dimension);
840        debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
841        self.repdefs.push(RawRepDef::Fsl(FslDesc {
842            num_values,
843            validity: validity.map(|v| v.into_inner()),
844            dimension,
845        }))
846    }
847
848    fn check_offset_len(&mut self, offsets: &[i64]) {
849        if let Some(len) = self.len {
850            assert!(offsets.len() == len + 1);
851        }
852        self.len = Some(offsets[offsets.len() - 1] as usize);
853    }
854
855    fn do_add_offsets(
856        &mut self,
857        lengths: impl Iterator<Item = i64>,
858        validity: Option<NullBuffer>,
859        capacity: usize,
860    ) -> bool {
861        let mut num_specials = 0;
862        let mut has_empty_lists = false;
863        let mut has_garbage_values = false;
864        let mut last_off: i64 = 0;
865
866        let mut normalized_offsets = Vec::with_capacity(capacity);
867        normalized_offsets.push(0);
868
869        if let Some(ref validity) = validity {
870            for (len, is_valid) in lengths.zip(validity.iter()) {
871                match (is_valid, len == 0) {
872                    (false, is_empty) => {
873                        num_specials += 1;
874                        has_garbage_values |= !is_empty;
875                    }
876                    (true, true) => {
877                        num_specials += 1;
878                        has_empty_lists = true;
879                    }
880                    _ => {
881                        last_off += len;
882                    }
883                }
884                normalized_offsets.push(last_off);
885            }
886        } else {
887            for len in lengths {
888                if len == 0 {
889                    num_specials += 1;
890                    has_empty_lists = true;
891                }
892                last_off += len;
893                normalized_offsets.push(last_off);
894            }
895        }
896
897        self.check_offset_len(&normalized_offsets);
898        self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
899            num_values: normalized_offsets.len() - 1,
900            offsets: normalized_offsets.into(),
901            validity: validity.map(|v| v.into_inner()),
902            has_empty_lists,
903            num_specials: num_specials as usize,
904        }));
905
906        has_garbage_values
907    }
908
909    /// Adds a layer of offsets
910    ///
911    /// Offsets are cast to a common type (i64) and also normalized.  Null lists are
912    /// always represented by a zero-length (identical) pair of offsets, so the caller
913    /// should filter out any garbage items before encoding them.  To assist with this, the
914    /// method returns true if any non-empty null lists were found.
915    pub fn add_offsets<O: OffsetSizeTrait>(
916        &mut self,
917        offsets: OffsetBuffer<O>,
918        validity: Option<NullBuffer>,
919    ) -> bool {
920        let inner = offsets.into_inner();
921        let buffer_len = inner.len();
922
923        if O::IS_LARGE {
924            let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, buffer_len);
925            let lengths = i64_buff.windows(2).map(|off| off[1] - off[0]);
926            self.do_add_offsets(lengths, validity, buffer_len)
927        } else {
928            let i32_buff = ScalarBuffer::<i32>::new(inner.into_inner(), 0, buffer_len);
929            let lengths = i32_buff.windows(2).map(|off| (off[1] - off[0]) as i64);
930            self.do_add_offsets(lengths, validity, buffer_len)
931        }
932    }
933
934    // When we are encoding data it arrives in batches.  For each batch we create a RepDefBuilder and collect the
935    // various validity buffers and offset buffers from that batch.  Once we have enough batches to write a page we
936    // need to take this collection of RepDefBuilders and concatenate them and then serialize them into rep/def levels.
937    //
938    // TODO: In the future, we may concatenate and serialize at the same time?
939    //
940    // This method takes care of the concatenation part.  First we collect all of layer 0 from each builder, then we
941    // call this method.  Then we collect all of layer 1 from each builder and call this method.  And so on.
942    //
943    // That means this method should get a collection of `RawRepDef` where each item is the same kind (all validity,
944    // all FSL, or all offsets) though the nullability / lengths may be different in each item.
945    fn concat_layers<'a>(
946        layers: impl Iterator<Item = &'a RawRepDef>,
947        num_layers: usize,
948    ) -> RawRepDef {
949        enum LayerKind {
950            Validity,
951            Fsl,
952            Offsets,
953        }
954
955        // We make two passes through the layers.  The first determines if we need to pay the cost of allocating
956        // buffers.  The second pass actually adds the values.
957        let mut collected = Vec::with_capacity(num_layers);
958        let mut has_nulls = false;
959        let mut layer_kind = LayerKind::Validity;
960        let mut total_num_specials = 0;
961        let mut all_dimension = 0;
962        let mut all_has_empty_lists = false;
963        let mut all_num_values = 0;
964        for layer in layers {
965            has_nulls |= layer.has_nulls();
966            match layer {
967                RawRepDef::Validity(_) => {
968                    layer_kind = LayerKind::Validity;
969                }
970                RawRepDef::Offsets(OffsetDesc {
971                    num_specials,
972                    has_empty_lists,
973                    ..
974                }) => {
975                    all_has_empty_lists |= *has_empty_lists;
976                    layer_kind = LayerKind::Offsets;
977                    total_num_specials += num_specials;
978                }
979                RawRepDef::Fsl(FslDesc { dimension, .. }) => {
980                    layer_kind = LayerKind::Fsl;
981                    all_dimension = *dimension;
982                }
983            }
984            collected.push(layer);
985            all_num_values += layer.num_values();
986        }
987
988        // Shortcut if there are no nulls
989        if !has_nulls {
990            match layer_kind {
991                LayerKind::Validity => {
992                    return RawRepDef::Validity(ValidityDesc {
993                        validity: None,
994                        num_values: all_num_values,
995                    });
996                }
997                LayerKind::Fsl => {
998                    return RawRepDef::Fsl(FslDesc {
999                        validity: None,
1000                        num_values: all_num_values,
1001                        dimension: all_dimension,
1002                    });
1003                }
1004                LayerKind::Offsets => {}
1005            }
1006        }
1007
1008        // Only allocate if needed
1009        let mut validity_builder = if has_nulls {
1010            BooleanBufferBuilder::new(all_num_values)
1011        } else {
1012            BooleanBufferBuilder::new(0)
1013        };
1014        let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
1015            let mut all_offsets = Vec::with_capacity(all_num_values);
1016            all_offsets.push(0);
1017            all_offsets
1018        } else {
1019            Vec::new()
1020        };
1021
1022        for layer in collected {
1023            match layer {
1024                RawRepDef::Validity(ValidityDesc {
1025                    validity: Some(validity),
1026                    ..
1027                }) => {
1028                    validity_builder.append_buffer(validity);
1029                }
1030                RawRepDef::Validity(ValidityDesc {
1031                    validity: None,
1032                    num_values,
1033                }) => {
1034                    validity_builder.append_n(*num_values, true);
1035                }
1036                RawRepDef::Fsl(FslDesc {
1037                    validity,
1038                    num_values,
1039                    ..
1040                }) => {
1041                    if let Some(validity) = validity {
1042                        validity_builder.append_buffer(validity);
1043                    } else {
1044                        validity_builder.append_n(*num_values, true);
1045                    }
1046                }
1047                RawRepDef::Offsets(OffsetDesc {
1048                    offsets,
1049                    validity: Some(validity),
1050                    has_empty_lists,
1051                    ..
1052                }) => {
1053                    all_has_empty_lists |= has_empty_lists;
1054                    validity_builder.append_buffer(validity);
1055                    let last = *all_offsets.last().unwrap();
1056                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1057                }
1058                RawRepDef::Offsets(OffsetDesc {
1059                    offsets,
1060                    validity: None,
1061                    has_empty_lists,
1062                    num_values,
1063                    ..
1064                }) => {
1065                    all_has_empty_lists |= has_empty_lists;
1066                    if has_nulls {
1067                        validity_builder.append_n(*num_values, true);
1068                    }
1069                    let last = *all_offsets.last().unwrap();
1070                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1071                }
1072            }
1073        }
1074        let validity = if has_nulls {
1075            Some(validity_builder.finish())
1076        } else {
1077            None
1078        };
1079        match layer_kind {
1080            LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
1081                validity,
1082                num_values: all_num_values,
1083                dimension: all_dimension,
1084            }),
1085            LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
1086                validity,
1087                num_values: all_num_values,
1088            }),
1089            LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
1090                offsets: all_offsets.into(),
1091                validity,
1092                has_empty_lists: all_has_empty_lists,
1093                num_values: all_num_values,
1094                num_specials: total_num_specials,
1095            }),
1096        }
1097    }
1098
1099    /// Converts the validity / offsets buffers that have been gathered so far
1100    /// into repetition and definition levels
1101    pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
1102        assert!(!builders.is_empty());
1103        if builders.iter().all(|b| b.is_empty()) {
1104            // No repetition, all-valid
1105            return SerializedRepDefs::empty(
1106                builders
1107                    .first()
1108                    .unwrap()
1109                    .repdefs
1110                    .iter()
1111                    .map(|_| DefinitionInterpretation::AllValidItem)
1112                    .collect::<Vec<_>>(),
1113            );
1114        }
1115
1116        let num_layers = builders[0].num_layers();
1117        let combined_layers = (0..num_layers)
1118            .map(|layer_index| {
1119                Self::concat_layers(
1120                    builders.iter().map(|b| &b.repdefs[layer_index]),
1121                    builders.len(),
1122                )
1123            })
1124            .collect::<Vec<_>>();
1125        debug_assert!(
1126            builders
1127                .iter()
1128                .all(|b| b.num_layers() == builders[0].num_layers())
1129        );
1130
1131        let total_len = combined_layers.last().unwrap().num_values()
1132            + combined_layers
1133                .iter()
1134                .map(|l| l.num_specials())
1135                .sum::<usize>();
1136        let max_rep = combined_layers.iter().map(|l| l.max_rep()).sum::<u16>();
1137        let max_def = combined_layers.iter().map(|l| l.max_def()).sum::<u16>();
1138
1139        let mut context = SerializerContext::new(total_len, num_layers, max_rep, max_def);
1140        for layer in combined_layers.into_iter() {
1141            match layer {
1142                RawRepDef::Validity(def) => {
1143                    context.record_validity(&def);
1144                }
1145                RawRepDef::Offsets(rep) => {
1146                    context.record_offsets(&rep);
1147                }
1148                RawRepDef::Fsl(fsl) => {
1149                    context.record_fsl(&fsl);
1150                }
1151            }
1152        }
1153        context.build()
1154    }
1155}
1156
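// Illustrative sketch (not part of this module): the offset normalization performed by
// `do_add_offsets`, reduced to plain slices.  A null list contributes no length to the
// normalized offsets even if the incoming offsets gave it one; such "garbage" lengths are
// reported so the caller can filter the matching values.  The function name and the use of
// `&[bool]` for validity are assumptions made for this example.

```rust
/// Returns the normalized offsets and whether any null list had a non-zero length.
fn normalize_offsets(lengths: &[i64], validity: Option<&[bool]>) -> (Vec<i64>, bool) {
    let mut normalized = Vec::with_capacity(lengths.len() + 1);
    normalized.push(0);
    let mut last_off = 0_i64;
    let mut has_garbage_values = false;
    for (i, &len) in lengths.iter().enumerate() {
        // With no validity buffer every list is valid
        let is_valid = validity.map_or(true, |v| v[i]);
        if is_valid {
            last_off += len;
        } else {
            // Null list: keep the offset flat and remember if it hid any values
            has_garbage_values |= len != 0;
        }
        normalized.push(last_off);
    }
    (normalized, has_garbage_values)
}
```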
1157/// Starts with serialized repetition and definition levels and unravels
1158/// them into validity buffers and offsets buffers
1159///
1160/// This is used during decoding to create the necessary arrow structures
1161#[derive(Debug)]
1162pub struct RepDefUnraveler {
1163    rep_levels: Option<LevelBuffer>,
1164    def_levels: Option<LevelBuffer>,
1165    // Maps from definition level to the rep level at which that definition level is visible
1166    levels_to_rep: Vec<u16>,
1167    def_meaning: Arc<[DefinitionInterpretation]>,
1168    // Current definition level to compare to.
1169    current_def_cmp: u16,
1170    // Current rep level, determines which specials we can see
1171    current_rep_cmp: u16,
1172    // Current layer index, 0 means inner-most layer and it counts up from there.  Used to index
1173    // into def_meaning
1174    current_layer: usize,
1175    // Number of items in the inner-most layer (needed if the definition levels are not present)
1176    num_items: u64,
1177}
1178
1179impl RepDefUnraveler {
1180    /// Creates a new unraveler from serialized repetition and definition information
1181    pub fn new(
1182        rep_levels: Option<LevelBuffer>,
1183        def_levels: Option<LevelBuffer>,
1184        def_meaning: Arc<[DefinitionInterpretation]>,
1185        num_items: u64,
1186    ) -> Self {
1187        let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
1188        let mut rep_counter = 0;
1189        // Level=0 is always visible and means valid item
1190        levels_to_rep.push(0);
1191        for meaning in def_meaning.as_ref() {
1192            match meaning {
1193                DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
1194                    // There is no corresponding level, so nothing to put in levels_to_rep
1195                }
1196                DefinitionInterpretation::NullableItem => {
1197                    // Some null structs are not visible at inner rep levels in cases like LIST<STRUCT<LIST<...>>>
1198                    levels_to_rep.push(rep_counter);
1199                }
1200                DefinitionInterpretation::NullableList => {
1201                    rep_counter += 1;
1202                    levels_to_rep.push(rep_counter);
1203                }
1204                DefinitionInterpretation::EmptyableList => {
1205                    rep_counter += 1;
1206                    levels_to_rep.push(rep_counter);
1207                }
1208                DefinitionInterpretation::NullableAndEmptyableList => {
1209                    rep_counter += 1;
1210                    levels_to_rep.push(rep_counter);
1211                    levels_to_rep.push(rep_counter);
1212                }
1213            }
1214        }
1215        Self {
1216            rep_levels,
1217            def_levels,
1218            current_def_cmp: 0,
1219            current_rep_cmp: 0,
1220            levels_to_rep,
1221            current_layer: 0,
1222            def_meaning,
1223            num_items,
1224        }
1225    }
1226
1227    pub fn is_all_valid(&self) -> bool {
1228        self.def_levels.is_none() || self.def_meaning[self.current_layer].is_all_valid()
1229    }
1230
1231    /// If the current level is a repetition layer then this returns the number of lists
1232    /// at this level.
1233    ///
1234    /// This is not valid to call when the current level is a struct/primitive layer because
1235    /// in some cases there may be no rep or def information to know this.
1236    pub fn max_lists(&self) -> usize {
1237        debug_assert!(
1238            self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
1239        );
1240        self.rep_levels
1241            .as_ref()
1242            // Worst case every rep item is max_rep and a new list
1243            .map(|levels| levels.len())
1244            .unwrap_or(0)
1245    }
1246
1247    /// Unravels a layer of offsets from the unraveler into the given offset width
1248    ///
1249    /// When decoding a list the caller should first unravel the offsets and then
1250    /// unravel the validity (the opposite of the order used during encoding)
1251    pub fn unravel_offsets<T: ArrowNativeType>(
1252        &mut self,
1253        offsets: &mut Vec<T>,
1254        validity: Option<&mut BooleanBufferBuilder>,
1255    ) -> Result<()> {
1256        let rep_levels = self
1257            .rep_levels
1258            .as_mut()
1259            .expect("Expected repetition level but data didn't contain repetition");
1260        let valid_level = self.current_def_cmp;
1261        let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
1262            DefinitionInterpretation::NullableList => {
1263                self.current_def_cmp += 1;
1264                (valid_level + 1, 0)
1265            }
1266            DefinitionInterpretation::EmptyableList => {
1267                self.current_def_cmp += 1;
1268                (0, valid_level + 1)
1269            }
1270            DefinitionInterpretation::NullableAndEmptyableList => {
1271                self.current_def_cmp += 2;
1272                (valid_level + 1, valid_level + 2)
1273            }
1274            DefinitionInterpretation::AllValidList => (0, 0),
1275            _ => unreachable!(),
1276        };
1277        self.current_layer += 1;
1278
1279        // This is the highest def level that is still visible.  Once we hit a list then
1280        // we stop looking because any null / empty list (or list masked by a higher level
1281        // null) will not be visible
1282        let mut max_level = null_level.max(empty_level).max(valid_level);
1283        // Anything higher than this (but not greater than max_level) is a null struct masking our
1284        // list.  We will materialize this as a null list.
1285        let upper_null = max_level;
1286        for level in self.def_meaning[self.current_layer..].iter() {
1287            match level {
1288                DefinitionInterpretation::NullableItem => {
1289                    max_level += 1;
1290                }
1291                DefinitionInterpretation::AllValidItem => {}
1292                _ => {
1293                    break;
1294                }
1295            }
1296        }
1297
1298        let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);
1299
1300        // If offsets is empty this is a no-op.  If offsets is not empty that means we already
1301        // added a set of offsets.  For example, we might have added [0, 3, 5] (2 lists).  Now
1302        // say we want to add [0, 1, 4] (2 lists).  We should get [0, 3, 5, 6, 9] (4 lists).  If
1303        // we don't pop here we get [0, 3, 5, 5, 6, 9] which is wrong.
1304        //
1305        // Or, to think about it another way, if every unraveler adds the starting 0 and the trailing
1306        // length then we have N + unravelers.len() values instead of N + 1.
1307        offsets.pop();
1308
1309        let to_offset = |val: usize| {
1310            T::from_usize(val)
1311            .ok_or_else(|| Error::invalid_input("A single batch had more than i32::MAX values and so a large container type is required"))
1312        };
1313        self.current_rep_cmp += 1;
1314        if let Some(def_levels) = &mut self.def_levels {
1315            assert!(rep_levels.len() == def_levels.len());
1316            // It's possible validity is None even if we have def levels.  For example, we might have
1317            // empty lists (which require def levels) but no nulls.
1318            let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
1319                Box::new(|is_valid| validity.append(is_valid))
1320            } else {
1321                Box::new(|_| {})
1322            };
1323            // This is a strange access pattern.  We are iterating over the rep/def levels and
1324            // at the same time writing the rep/def levels.  This means we need both a mutable
1325            // and immutable reference to the rep/def levels.
1326            let mut read_idx = 0;
1327            let mut write_idx = 0;
1328            while read_idx < rep_levels.len() {
1329                // SAFETY: We assert that rep_levels and def_levels have the same
1330                // len and read_idx and write_idx can never go past the end.
1331                unsafe {
1332                    let rep_val = *rep_levels.get_unchecked(read_idx);
1333                    if rep_val != 0 {
1334                        let def_val = *def_levels.get_unchecked(read_idx);
1335                        // Copy over
1336                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1337                        *def_levels.get_unchecked_mut(write_idx) = def_val;
1338                        write_idx += 1;
1339
1340                        if def_val == 0 {
1341                            // This is a valid list
1342                            offsets.push(to_offset(curlen)?);
1343                            curlen += 1;
1344                            push_validity(true);
1345                        } else if def_val > max_level {
1346                            // This is not visible at this rep level, do not add to offsets, but keep in repdef
1347                        } else if def_val == null_level || def_val > upper_null {
1348                            // This is a null list (or a list masked by a null struct)
1349                            offsets.push(to_offset(curlen)?);
1350                            push_validity(false);
1351                        } else if def_val == empty_level {
1352                            // This is an empty list
1353                            offsets.push(to_offset(curlen)?);
1354                            push_validity(true);
1355                        } else {
1356                            // New valid list starting with null item
1357                            offsets.push(to_offset(curlen)?);
1358                            curlen += 1;
1359                            push_validity(true);
1360                        }
1361                    } else {
1362                        curlen += 1;
1363                    }
1364                    read_idx += 1;
1365                }
1366            }
1367            offsets.push(to_offset(curlen)?);
1368            rep_levels.truncate(write_idx);
1369            def_levels.truncate(write_idx);
1370            Ok(())
1371        } else {
1372            // SAFETY: See above loop
1373            let mut read_idx = 0;
1374            let mut write_idx = 0;
1375            let old_offsets_len = offsets.len();
1376            while read_idx < rep_levels.len() {
1377                // SAFETY: read_idx / write_idx cannot go past rep_levels.len()
1378                unsafe {
1379                    let rep_val = *rep_levels.get_unchecked(read_idx);
1380                    if rep_val != 0 {
1381                        // Finish the current list
1382                        offsets.push(to_offset(curlen)?);
1383                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1384                        write_idx += 1;
1385                    }
1386                    curlen += 1;
1387                    read_idx += 1;
1388                }
1389            }
1390            let num_new_lists = offsets.len() - old_offsets_len;
1391            offsets.push(to_offset(curlen)?);
1392            rep_levels.truncate(write_idx);
1393            if let Some(validity) = validity {
1394                // Even though we don't have validity it is possible another unraveler did and so we need
1395                // to push all valids
1396                validity.append_n(num_new_lists, true);
1397            }
1398            Ok(())
1399        }
1400    }
1401
1402    pub fn skip_validity(&mut self) {
1403        debug_assert!(self.is_all_valid());
1404        self.current_layer += 1;
1405    }
1406
1407    /// Unravels a layer of validity from the definition levels
1408    pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
1409        let meaning = self.def_meaning[self.current_layer];
1410        if meaning == DefinitionInterpretation::AllValidItem || self.def_levels.is_none() {
1411            self.current_layer += 1;
1412            validity.append_n(self.num_items as usize, true);
1413            return;
1414        }
1415
1416        self.current_layer += 1;
1417        let def_levels = &self.def_levels.as_ref().unwrap();
1418
1419        let current_def_cmp = self.current_def_cmp;
1420        self.current_def_cmp += 1;
1421
1422        for is_valid in def_levels.iter().filter_map(|&level| {
1423            if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
1424                Some(level <= current_def_cmp)
1425            } else {
1426                None
1427            }
1428        }) {
1429            validity.append(is_valid);
1430        }
1431    }
1432
1433    pub fn decimate(&mut self, dimension: usize) {
1434        if self.rep_levels.is_some() {
1435            // If we need to support this then I think we need to walk through the rep def levels to find
1436            // the positions that we keep.  E.g. if we have:
1437            //  rep: 1 0 0 1 0 1 0 0 0 1 0 0
1438            //  def: 1 1 1 0 1 0 1 1 0 1 1 0
1439            //  dimension: 2
1440            //
1441            // The output should be:
1442            //  rep: 1 0 0 1 0 0 0
1443            //  def: 1 1 1 0 1 1 0
1444            //
1445            // Maybe there's some special logic for empty/null lists?  I'll save the headache for future me.
1446            todo!("Not yet supported FSL<...List<...>>");
1447        }
1448        let Some(def_levels) = self.def_levels.as_mut() else {
1449            return;
1450        };
1451        let mut read_idx = 0;
1452        let mut write_idx = 0;
1453        while read_idx < def_levels.len() {
1454            unsafe {
1455                *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
1456            }
1457            write_idx += 1;
1458            read_idx += dimension;
1459        }
1460        def_levels.truncate(write_idx);
1461    }
1462}
1463
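// Illustrative sketch (not part of this module): what `decimate` does to definition levels
// when there is no repetition.  Encoding repeated each level `dimension` times for an FSL
// layer, so decoding keeps only the first level of each group.  This version uses checked
// indexing instead of `get_unchecked`; the free function form is an assumption made for
// this example.

```rust
/// Keeps every `dimension`-th definition level, compacting in place.
fn decimate(def_levels: &mut Vec<u16>, dimension: usize) {
    assert!(dimension > 0, "dimension must be positive");
    let mut read_idx = 0;
    let mut write_idx = 0;
    while read_idx < def_levels.len() {
        // write_idx never passes read_idx, so no unread level is overwritten
        def_levels[write_idx] = def_levels[read_idx];
        write_idx += 1;
        read_idx += dimension;
    }
    def_levels.truncate(write_idx);
}
```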
1464/// As we decode we may extract rep/def information from multiple pages (or multiple
1465/// chunks within a page).
1466///
1467/// For each chunk we create an unraveler.  Each unraveler can have a completely different
1468/// interpretation (e.g. one page might contain null items but no null structs and the next
1469/// page might have null structs but no null items).
1470///
1471/// Concatenating these unravelers would be tricky and expensive so instead we have a
1472/// composite unraveler which unravels across multiple unravelers.
1473///
1474/// Note: this struct should be used even if there is only one page / unraveler.  This is
1475/// because the `RepDefUnraveler`'s API is more complex (it's meant to be called by this
1476/// struct)
1477#[derive(Debug)]
1478pub struct CompositeRepDefUnraveler {
1479    unravelers: Vec<RepDefUnraveler>,
1480}
1481
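// Illustrative sketch (not part of this module): unraveling offsets across several chunks
// into one shared offsets vector.  It follows the simplest path in
// `RepDefUnraveler::unravel_offsets`, assuming there are no definition levels, so every
// level is a real item and any non-zero rep level starts a new list.  Each chunk pops the
// previous trailing length before appending, which keeps the result at N + 1 offsets for
// N lists.  The function names and the `i64` offset type are assumptions made for this
// example.

```rust
/// Appends the offsets described by one chunk's repetition levels.
fn unravel_chunk(rep_levels: &[u16], offsets: &mut Vec<i64>) {
    // Continue from the trailing length left by the previous chunk (if any)
    let mut curlen = offsets.last().copied().unwrap_or(0);
    // Drop that trailing length; it is pushed again as the start of the next list
    offsets.pop();
    for &rep in rep_levels {
        if rep != 0 {
            // A new list starts at the current item position
            offsets.push(curlen);
        }
        curlen += 1;
    }
    offsets.push(curlen);
}

/// Unravels every chunk in order into a single offsets vector.
fn unravel_all(chunks: &[Vec<u16>]) -> Vec<i64> {
    let mut offsets = Vec::new();
    for chunk in chunks {
        unravel_chunk(chunk, &mut offsets);
    }
    offsets
}
```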
1482impl CompositeRepDefUnraveler {
1483    pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
1484        Self { unravelers }
1485    }
1486
1487    /// Unravels a layer of validity
1488    ///
1489    /// Returns None if there are no null items in this layer
1490    pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
1491        let is_all_valid = self
1492            .unravelers
1493            .iter()
1494            .all(|unraveler| unraveler.is_all_valid());
1495
1496        if is_all_valid {
1497            for unraveler in self.unravelers.iter_mut() {
1498                unraveler.skip_validity();
1499            }
1500            None
1501        } else {
1502            let mut validity = BooleanBufferBuilder::new(num_values);
1503            for unraveler in self.unravelers.iter_mut() {
1504                unraveler.unravel_validity(&mut validity);
1505            }
1506            Some(NullBuffer::new(validity.finish()))
1507        }
1508    }
1509
1510    pub fn unravel_fsl_validity(
1511        &mut self,
1512        num_values: usize,
1513        dimension: usize,
1514    ) -> Option<NullBuffer> {
1515        for unraveler in self.unravelers.iter_mut() {
1516            unraveler.decimate(dimension);
1517        }
1518        self.unravel_validity(num_values)
1519    }
1520
1521    /// Unravels a layer of offsets (and the validity for that layer)
1522    pub fn unravel_offsets<T: ArrowNativeType>(
1523        &mut self,
1524    ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
1525        let mut is_all_valid = true;
1526        let mut max_num_lists = 0;
1527        for unraveler in self.unravelers.iter() {
1528            is_all_valid &= unraveler.is_all_valid();
1529            max_num_lists += unraveler.max_lists();
1530        }
1531
1532        let mut validity = if is_all_valid {
1533            None
1534        } else {
1535            // Note: `max_num_lists` is only a capacity hint; it may over- or under-estimate the row count.
1536            // Right now we only know how many items we have, not how many rows.  (TODO: Shouldn't we know the # of rows?)
1537            Some(BooleanBufferBuilder::new(max_num_lists))
1538        };
1539
1540        let mut offsets = Vec::with_capacity(max_num_lists + 1);
1541
1542        for unraveler in self.unravelers.iter_mut() {
1543            unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
1544        }
1545
1546        Ok((
1547            OffsetBuffer::new(ScalarBuffer::from(offsets)),
1548            validity.map(|mut v| NullBuffer::new(v.finish())),
1549        ))
1550    }
1551}
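
// Illustrative sketch (not part of this module): how validity from several chunks could
// be stitched together, using `Vec<bool>` as a stand-in for `BooleanBufferBuilder` and
// `NullBuffer`.  `ChunkValidity` and `combine_validity` are hypothetical names, and the
// behavior of an all-valid chunk (one `true` per value) is an assumption about what each
// unraveler appends.

```rust
/// Validity of one chunk; `None` means every value in the chunk is valid.
type ChunkValidity = Option<Vec<bool>>;

/// Combines per-chunk validity, given as `(num_values, validity)` pairs.
/// Returns `None` when no chunk contains a null, so no bitmap is materialized.
fn combine_validity(chunks: &[(usize, ChunkValidity)]) -> Option<Vec<bool>> {
    if chunks.iter().all(|(_, validity)| validity.is_none()) {
        return None;
    }
    let total: usize = chunks.iter().map(|(len, _)| *len).sum();
    let mut combined = Vec::with_capacity(total);
    for (len, validity) in chunks {
        match validity {
            Some(bits) => combined.extend_from_slice(bits),
            // An all-valid chunk still has to contribute its share of `true` bits
            None => combined.extend(std::iter::repeat(true).take(*len)),
        }
    }
    Some(combined)
}
```

// The early return is the point of the design: the common all-valid case allocates nothing.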
1552
1553/// A [`ControlWordIterator`] when there are both repetition and definition levels
1554///
1555/// The iterator will put the repetition level in the upper bits and the definition
1556/// level in the lower bits.  The number of bits used for each level is determined
1557/// by the width of the repetition and definition levels.
1558#[derive(Debug)]
1559pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
1560    repdef: I,
1561    def_width: usize,
1562    max_rep: u16,
1563    max_visible_def: u16,
1564    rep_mask: u16,
1565    def_mask: u16,
1566    bits_rep: u8,
1567    bits_def: u8,
1568    phantom: std::marker::PhantomData<W>,
1569}
1570
1571impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
1572    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1573        let next = self.repdef.next()?;
1574        let control_word: u8 =
1575            (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
1576        buf.push(control_word);
1577        let is_new_row = next.0 == self.max_rep;
1578        let is_visible = next.1 <= self.max_visible_def;
1579        let is_valid_item = next.1 == 0;
1580        Some(ControlWordDesc {
1581            is_new_row,
1582            is_visible,
1583            is_valid_item,
1584        })
1585    }
1586}
1587
1588impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
1589    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1590        let next = self.repdef.next()?;
1591        let control_word: u16 =
1592            ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
1593        let control_word = control_word.to_le_bytes();
1594        buf.push(control_word[0]);
1595        buf.push(control_word[1]);
1596        let is_new_row = next.0 == self.max_rep;
1597        let is_visible = next.1 <= self.max_visible_def;
1598        let is_valid_item = next.1 == 0;
1599        Some(ControlWordDesc {
1600            is_new_row,
1601            is_visible,
1602            is_valid_item,
1603        })
1604    }
1605}
1606
1607impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
1608    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1609        let next = self.repdef.next()?;
1610        let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
1611            + ((next.1 & self.def_mask) as u32);
1612        let control_word = control_word.to_le_bytes();
1613        buf.push(control_word[0]);
1614        buf.push(control_word[1]);
1615        buf.push(control_word[2]);
1616        buf.push(control_word[3]);
1617        let is_new_row = next.0 == self.max_rep;
1618        let is_visible = next.1 <= self.max_visible_def;
1619        let is_valid_item = next.1 == 0;
1620        Some(ControlWordDesc {
1621            is_new_row,
1622            is_visible,
1623            is_valid_item,
1624        })
1625    }
1626}
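
// Illustrative sketch (not part of this module): the three `append_next` variants above
// share one layout, with the repetition level in the upper bits and the definition level
// in the lower `def_width` bits, written little-endian.  `pack_control_word` and
// `append_control_word` are hypothetical helpers; they use `|` where the code above uses
// `+`, which is equivalent because the two masked fields never overlap.

```rust
/// Packs `rep` into the upper bits and `def` into the lower `def_bits` bits.
fn pack_control_word(rep: u16, def: u16, rep_bits: u32, def_bits: u32) -> u32 {
    assert!(rep_bits <= 16 && def_bits <= 16, "levels are u16 values");
    let rep_mask = (1u32 << rep_bits) - 1;
    let def_mask = (1u32 << def_bits) - 1;
    ((rep as u32 & rep_mask) << def_bits) | (def as u32 & def_mask)
}

/// Appends the low `bytes_per_word` bytes of `word` in little-endian order.
fn append_control_word(buf: &mut Vec<u8>, word: u32, bytes_per_word: usize) {
    buf.extend_from_slice(&word.to_le_bytes()[..bytes_per_word]);
}
```

// For example, rep = 2 and def = 1 with two bits each pack to `0b1001`.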
1627
1628/// A [`ControlWordIterator`] when there are only definition levels or only repetition levels
1629#[derive(Debug)]
1630pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
1631    repdef: I,
1632    level_mask: u16,
1633    bits_rep: u8,
1634    bits_def: u8,
1635    max_rep: u16,
1636    phantom: std::marker::PhantomData<W>,
1637}
1638
1639impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
1640    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1641        let next = self.repdef.next()?;
1642        buf.push((next & self.level_mask) as u8);
1643        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1644        let is_valid_item = next == 0 || self.bits_def == 0;
1645        Some(ControlWordDesc {
1646            is_new_row,
1647            // Only one kind of level is present (rep or def, never both), so there are
1648            // no invisible items
1649            is_visible: true,
1650            is_valid_item,
1651        })
1652    }
1653}
1654
1655impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
1656    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1657        let next = self.repdef.next()? & self.level_mask;
1658        let control_word = next.to_le_bytes();
1659        buf.push(control_word[0]);
1660        buf.push(control_word[1]);
1661        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1662        let is_valid_item = next == 0 || self.bits_def == 0;
1663        Some(ControlWordDesc {
1664            is_new_row,
1665            is_visible: true,
1666            is_valid_item,
1667        })
1668    }
1669}
1670
1671impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
1672    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1673        let next = self.repdef.next()?;
1674        let next = (next & self.level_mask) as u32;
1675        let control_word = next.to_le_bytes();
1676        buf.push(control_word[0]);
1677        buf.push(control_word[1]);
1678        buf.push(control_word[2]);
1679        buf.push(control_word[3]);
1680        let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
1681        let is_valid_item = next == 0 || self.bits_def == 0;
1682        Some(ControlWordDesc {
1683            is_new_row,
1684            is_visible: true,
1685            is_valid_item,
1686        })
1687    }
1688}
1689
1690/// A [`ControlWordIterator`] when there are no repetition or definition levels
1691#[derive(Debug)]
1692pub struct NilaryControlWordIterator {
1693    len: usize,
1694    idx: usize,
1695}
1696
1697impl NilaryControlWordIterator {
1698    fn append_next(&mut self) -> Option<ControlWordDesc> {
1699        if self.idx == self.len {
1700            None
1701        } else {
1702            self.idx += 1;
1703            Some(ControlWordDesc {
1704                is_new_row: true,
1705                is_visible: true,
1706                is_valid_item: true,
1707            })
1708        }
1709    }
1710}
1711
1712/// Helper function to get a bit mask of the given width (widths up to 16 are supported)
1713fn get_mask(width: u16) -> u16 {
1714    ((1u32 << width) - 1) as u16
1715}
1716
1717// We're really going out of our way to avoid boxing here but this will be called on a per-value basis
1718// so it is in the critical path.
1719type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
1720    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
1721    T,
1722>;
1723
1724/// An iterator that generates control words from repetition and definition levels
1725///
1726/// "Control word" is just a fancy term for a single u8/u16/u32 that contains both
1727/// the repetition and definition in it.
1728///
1729/// In the large majority of cases we only need a single byte to represent both the
1730/// repetition and definition levels.  However, if there is deep nesting then we may
1731/// need two bytes.  In the worst case we need 4 bytes, though this suggests hundreds of
1732/// levels of nesting, which is unlikely to be encountered in practice.
1733#[derive(Debug)]
1734pub enum ControlWordIterator<'a> {
1735    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
1736    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
1737    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
1738    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
1739    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
1740    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
1741    Nilary(NilaryControlWordIterator),
1742}
1743
1744/// Describes the properties of a control word
1745#[derive(Debug)]
1746pub struct ControlWordDesc {
1747    pub is_new_row: bool,
1748    pub is_visible: bool,
1749    pub is_valid_item: bool,
1750}
1751
1752impl ControlWordIterator<'_> {
1753    /// Appends the next control word to the buffer
1754    ///
1755    /// Returns a description of the appended control word, or `None` if the iterator is exhausted
1756    pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1757        match self {
1758            Self::Binary8(iter) => iter.append_next(buf),
1759            Self::Binary16(iter) => iter.append_next(buf),
1760            Self::Binary32(iter) => iter.append_next(buf),
1761            Self::Unary8(iter) => iter.append_next(buf),
1762            Self::Unary16(iter) => iter.append_next(buf),
1763            Self::Unary32(iter) => iter.append_next(buf),
1764            Self::Nilary(iter) => iter.append_next(),
1765        }
1766    }
1767
1768    /// Returns true if the control word iterator has repetition levels
1769    pub fn has_repetition(&self) -> bool {
1770        match self {
1771            Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
1772            Self::Unary8(iter) => iter.bits_rep > 0,
1773            Self::Unary16(iter) => iter.bits_rep > 0,
1774            Self::Unary32(iter) => iter.bits_rep > 0,
1775            Self::Nilary(_) => false,
1776        }
1777    }
1778
1779    /// Returns the number of bytes per control word
1780    pub fn bytes_per_word(&self) -> usize {
1781        match self {
1782            Self::Binary8(_) => 1,
1783            Self::Binary16(_) => 2,
1784            Self::Binary32(_) => 4,
1785            Self::Unary8(_) => 1,
1786            Self::Unary16(_) => 2,
1787            Self::Unary32(_) => 4,
1788            Self::Nilary(_) => 0,
1789        }
1790    }
1791
1792    /// Returns the number of bits used for the repetition level
1793    pub fn bits_rep(&self) -> u8 {
1794        match self {
1795            Self::Binary8(iter) => iter.bits_rep,
1796            Self::Binary16(iter) => iter.bits_rep,
1797            Self::Binary32(iter) => iter.bits_rep,
1798            Self::Unary8(iter) => iter.bits_rep,
1799            Self::Unary16(iter) => iter.bits_rep,
1800            Self::Unary32(iter) => iter.bits_rep,
1801            Self::Nilary(_) => 0,
1802        }
1803    }
1804
1805    /// Returns the number of bits used for the definition level
1806    pub fn bits_def(&self) -> u8 {
1807        match self {
1808            Self::Binary8(iter) => iter.bits_def,
1809            Self::Binary16(iter) => iter.bits_def,
1810            Self::Binary32(iter) => iter.bits_def,
1811            Self::Unary8(iter) => iter.bits_def,
1812            Self::Unary16(iter) => iter.bits_def,
1813            Self::Unary32(iter) => iter.bits_def,
1814            Self::Nilary(_) => 0,
1815        }
1816    }
1817}
1818
1819/// Builds a [`ControlWordIterator`] from repetition and definition levels
1820/// by first calculating the width needed and then creating the iterator
1821/// with the appropriate width
1822pub fn build_control_word_iterator<'a>(
1823    rep: Option<&'a [u16]>,
1824    max_rep: u16,
1825    def: Option<&'a [u16]>,
1826    max_def: u16,
1827    max_visible_def: u16,
1828    len: usize,
1829) -> ControlWordIterator<'a> {
1830    let rep_width = if max_rep == 0 {
1831        0
1832    } else {
1833        log_2_ceil(max_rep as u32) as u16
1834    };
1835    let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
1836    let def_width = if max_def == 0 {
1837        0
1838    } else {
1839        log_2_ceil(max_def as u32) as u16
1840    };
1841    let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
1842    let total_width = rep_width + def_width;
1843    match (rep, def) {
1844        (Some(rep), Some(def)) => {
1845            let iter = rep.iter().copied().zip(def.iter().copied());
1846            let def_width = def_width as usize;
1847            if total_width <= 8 {
1848                ControlWordIterator::Binary8(BinaryControlWordIterator {
1849                    repdef: iter,
1850                    rep_mask,
1851                    def_mask,
1852                    def_width,
1853                    max_rep,
1854                    max_visible_def,
1855                    bits_rep: rep_width as u8,
1856                    bits_def: def_width as u8,
1857                    phantom: std::marker::PhantomData,
1858                })
1859            } else if total_width <= 16 {
1860                ControlWordIterator::Binary16(BinaryControlWordIterator {
1861                    repdef: iter,
1862                    rep_mask,
1863                    def_mask,
1864                    def_width,
1865                    max_rep,
1866                    max_visible_def,
1867                    bits_rep: rep_width as u8,
1868                    bits_def: def_width as u8,
1869                    phantom: std::marker::PhantomData,
1870                })
1871            } else {
1872                ControlWordIterator::Binary32(BinaryControlWordIterator {
1873                    repdef: iter,
1874                    rep_mask,
1875                    def_mask,
1876                    def_width,
1877                    max_rep,
1878                    max_visible_def,
1879                    bits_rep: rep_width as u8,
1880                    bits_def: def_width as u8,
1881                    phantom: std::marker::PhantomData,
1882                })
1883            }
1884        }
1885        (Some(lev), None) => {
1886            let iter = lev.iter().copied();
1887            if total_width <= 8 {
1888                ControlWordIterator::Unary8(UnaryControlWordIterator {
1889                    repdef: iter,
1890                    level_mask: rep_mask,
1891                    bits_rep: total_width as u8,
1892                    bits_def: 0,
1893                    max_rep,
1894                    phantom: std::marker::PhantomData,
1895                })
1896            } else if total_width <= 16 {
1897                ControlWordIterator::Unary16(UnaryControlWordIterator {
1898                    repdef: iter,
1899                    level_mask: rep_mask,
1900                    bits_rep: total_width as u8,
1901                    bits_def: 0,
1902                    max_rep,
1903                    phantom: std::marker::PhantomData,
1904                })
1905            } else {
1906                ControlWordIterator::Unary32(UnaryControlWordIterator {
1907                    repdef: iter,
1908                    level_mask: rep_mask,
1909                    bits_rep: total_width as u8,
1910                    bits_def: 0,
1911                    max_rep,
1912                    phantom: std::marker::PhantomData,
1913                })
1914            }
1915        }
1916        (None, Some(lev)) => {
1917            let iter = lev.iter().copied();
1918            if total_width <= 8 {
1919                ControlWordIterator::Unary8(UnaryControlWordIterator {
1920                    repdef: iter,
1921                    level_mask: def_mask,
1922                    bits_rep: 0,
1923                    bits_def: total_width as u8,
1924                    max_rep: 0,
1925                    phantom: std::marker::PhantomData,
1926                })
1927            } else if total_width <= 16 {
1928                ControlWordIterator::Unary16(UnaryControlWordIterator {
1929                    repdef: iter,
1930                    level_mask: def_mask,
1931                    bits_rep: 0,
1932                    bits_def: total_width as u8,
1933                    max_rep: 0,
1934                    phantom: std::marker::PhantomData,
1935                })
1936            } else {
1937                ControlWordIterator::Unary32(UnaryControlWordIterator {
1938                    repdef: iter,
1939                    level_mask: def_mask,
1940                    bits_rep: 0,
1941                    bits_def: total_width as u8,
1942                    max_rep: 0,
1943                    phantom: std::marker::PhantomData,
1944                })
1945            }
1946        }
1947        (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
1948    }
1949}
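
// Illustrative sketch (not part of this module): the word size is chosen from the total
// bit width using the thresholds above (up to 8 bits -> 1 byte, up to 16 bits -> 2 bytes,
// otherwise 4 bytes).  `bytes_per_control_word` is a hypothetical helper; mapping a total
// width of zero to zero bytes mirrors the nilary case and assumes absent levels are
// reported as zero bits.

```rust
/// Returns the number of bytes used to store one control word.
fn bytes_per_control_word(rep_bits: u8, def_bits: u8) -> usize {
    // Widen before adding so the sum cannot overflow a u8
    match u16::from(rep_bits) + u16::from(def_bits) {
        0 => 0,
        1..=8 => 1,
        9..=16 => 2,
        _ => 4,
    }
}
```

// There is no 3-byte word: anything wider than 16 bits is stored as a full `u32`.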
1950
1951/// A parser to unwrap control words into repetition and definition levels
1952///
1953/// This is the inverse of the [`ControlWordIterator`].
1954#[derive(Copy, Clone, Debug)]
1955pub enum ControlWordParser {
1956    // First item is the bits to shift, second is the mask to apply (the mask can be
1957    // calculated from the bits to shift but we don't want to calculate it each time)
1958    BOTH8(u8, u32),
1959    BOTH16(u8, u32),
1960    BOTH32(u8, u32),
1961    REP8,
1962    REP16,
1963    REP32,
1964    DEF8,
1965    DEF16,
1966    DEF32,
1967    NIL,
1968}
1969
1970impl ControlWordParser {
1971    fn parse_both<const WORD_SIZE: u8>(
1972        src: &[u8],
1973        dst_rep: &mut Vec<u16>,
1974        dst_def: &mut Vec<u16>,
1975        bits_to_shift: u8,
1976        mask_to_apply: u32,
1977    ) {
1978        match WORD_SIZE {
1979            1 => {
1980                let word = src[0];
1981                let rep = word >> bits_to_shift;
1982                let def = word & (mask_to_apply as u8);
1983                dst_rep.push(rep as u16);
1984                dst_def.push(def as u16);
1985            }
1986            2 => {
1987                let word = u16::from_le_bytes([src[0], src[1]]);
1988                let rep = word >> bits_to_shift;
1989                let def = word & mask_to_apply as u16;
1990                dst_rep.push(rep);
1991                dst_def.push(def);
1992            }
1993            4 => {
1994                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
1995                let rep = word >> bits_to_shift;
1996                let def = word & mask_to_apply;
1997                dst_rep.push(rep as u16);
1998                dst_def.push(def as u16);
1999            }
2000            _ => unreachable!(),
2001        }
2002    }
2003
2004    fn parse_desc_both<const WORD_SIZE: u8>(
2005        src: &[u8],
2006        bits_to_shift: u8,
2007        mask_to_apply: u32,
2008        max_rep: u16,
2009        max_visible_def: u16,
2010    ) -> ControlWordDesc {
2011        match WORD_SIZE {
2012            1 => {
2013                let word = src[0];
2014                let rep = word >> bits_to_shift;
2015                let def = word & (mask_to_apply as u8);
2016                let is_visible = def as u16 <= max_visible_def;
2017                let is_new_row = rep as u16 == max_rep;
2018                let is_valid_item = def == 0;
2019                ControlWordDesc {
2020                    is_visible,
2021                    is_new_row,
2022                    is_valid_item,
2023                }
2024            }
2025            2 => {
2026                let word = u16::from_le_bytes([src[0], src[1]]);
2027                let rep = word >> bits_to_shift;
2028                let def = word & mask_to_apply as u16;
2029                let is_visible = def <= max_visible_def;
2030                let is_new_row = rep == max_rep;
2031                let is_valid_item = def == 0;
2032                ControlWordDesc {
2033                    is_visible,
2034                    is_new_row,
2035                    is_valid_item,
2036                }
2037            }
2038            4 => {
2039                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2040                let rep = word >> bits_to_shift;
2041                let def = word & mask_to_apply;
2042                let is_visible = def as u16 <= max_visible_def;
2043                let is_new_row = rep as u16 == max_rep;
2044                let is_valid_item = def == 0;
2045                ControlWordDesc {
2046                    is_visible,
2047                    is_new_row,
2048                    is_valid_item,
2049                }
2050            }
2051            _ => unreachable!(),
2052        }
2053    }
2054
2055    fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
2056        match WORD_SIZE {
2057            1 => {
2058                let word = src[0];
2059                dst.push(word as u16);
2060            }
2061            2 => {
2062                let word = u16::from_le_bytes([src[0], src[1]]);
2063                dst.push(word);
2064            }
2065            4 => {
2066                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2067                dst.push(word as u16);
2068            }
2069            _ => unreachable!(),
2070        }
2071    }
2072
2073    fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
2074        match WORD_SIZE {
2075            1 => ControlWordDesc {
2076                is_new_row: src[0] as u16 == max_rep,
2077                is_visible: true,
2078                is_valid_item: true,
2079            },
2080            2 => ControlWordDesc {
2081                is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
2082                is_visible: true,
2083                is_valid_item: true,
2084            },
2085            4 => ControlWordDesc {
2086                is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
2087                is_visible: true,
2088                is_valid_item: true,
2089            },
2090            _ => unreachable!(),
2091        }
2092    }
2093
2094    fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
2095        match WORD_SIZE {
2096            1 => ControlWordDesc {
2097                is_new_row: true,
2098                is_visible: true,
2099                is_valid_item: src[0] == 0,
2100            },
2101            2 => ControlWordDesc {
2102                is_new_row: true,
2103                is_visible: true,
2104                is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
2105            },
2106            4 => ControlWordDesc {
2107                is_new_row: true,
2108                is_visible: true,
2109                is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
2110            },
2111            _ => unreachable!(),
2112        }
2113    }
2114
2115    /// Returns the number of bytes per control word
2116    pub fn bytes_per_word(&self) -> usize {
2117        match self {
2118            Self::BOTH8(..) => 1,
2119            Self::BOTH16(..) => 2,
2120            Self::BOTH32(..) => 4,
2121            Self::REP8 => 1,
2122            Self::REP16 => 2,
2123            Self::REP32 => 4,
2124            Self::DEF8 => 1,
2125            Self::DEF16 => 2,
2126            Self::DEF32 => 4,
2127            Self::NIL => 0,
2128        }
2129    }
2130
2131    /// Appends the next control word to the rep & def buffers
2132    ///
2133    /// `src` should be pointing at the first byte (little endian) of the control word
2134    ///
2135    /// `dst_rep` and `dst_def` are the buffers to append the rep and def levels to.
2136    /// They will not be appended to if not needed.
2137    pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
2138        match self {
2139            Self::BOTH8(bits_to_shift, mask_to_apply) => {
2140                Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2141            }
2142            Self::BOTH16(bits_to_shift, mask_to_apply) => {
2143                Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2144            }
2145            Self::BOTH32(bits_to_shift, mask_to_apply) => {
2146                Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2147            }
2148            Self::REP8 => Self::parse_one::<1>(src, dst_rep),
2149            Self::REP16 => Self::parse_one::<2>(src, dst_rep),
2150            Self::REP32 => Self::parse_one::<4>(src, dst_rep),
2151            Self::DEF8 => Self::parse_one::<1>(src, dst_def),
2152            Self::DEF16 => Self::parse_one::<2>(src, dst_def),
2153            Self::DEF32 => Self::parse_one::<4>(src, dst_def),
2154            Self::NIL => {}
2155        }
2156    }
2157
2158    /// Returns true if the control words contain repetition information
2159    pub fn has_rep(&self) -> bool {
2160        match self {
2161            Self::BOTH8(..)
2162            | Self::BOTH16(..)
2163            | Self::BOTH32(..)
2164            | Self::REP8
2165            | Self::REP16
2166            | Self::REP32 => true,
2167            Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
2168        }
2169    }
2170
2171    /// Parses the control word to inspect its properties without appending to any buffers
2172    pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
2173        match self {
2174            Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
2175                src,
2176                *bits_to_shift,
2177                *mask_to_apply,
2178                max_rep,
2179                max_visible_def,
2180            ),
2181            Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
2182                src,
2183                *bits_to_shift,
2184                *mask_to_apply,
2185                max_rep,
2186                max_visible_def,
2187            ),
2188            Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
2189                src,
2190                *bits_to_shift,
2191                *mask_to_apply,
2192                max_rep,
2193                max_visible_def,
2194            ),
2195            Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
2196            Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
2197            Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
2198            Self::DEF8 => Self::parse_def_desc_one::<1>(src),
2199            Self::DEF16 => Self::parse_def_desc_one::<2>(src),
2200            Self::DEF32 => Self::parse_def_desc_one::<4>(src),
2201            Self::NIL => ControlWordDesc {
2202                is_new_row: true,
2203                is_valid_item: true,
2204                is_visible: true,
2205            },
2206        }
2207    }
2208
2209    /// Creates a new parser from the number of bits used for the repetition and definition levels
2210    pub fn new(bits_rep: u8, bits_def: u8) -> Self {
2211        let total_bits = bits_rep + bits_def;
2212
2213        enum WordSize {
2214            One,
2215            Two,
2216            Four,
2217        }
2218
2219        let word_size = if total_bits <= 8 {
2220            WordSize::One
2221        } else if total_bits <= 16 {
2222            WordSize::Two
2223        } else {
2224            WordSize::Four
2225        };
2226
2227        match (bits_rep > 0, bits_def > 0, word_size) {
2228            (false, false, _) => Self::NIL,
2229            (false, true, WordSize::One) => Self::DEF8,
2230            (false, true, WordSize::Two) => Self::DEF16,
2231            (false, true, WordSize::Four) => Self::DEF32,
2232            (true, false, WordSize::One) => Self::REP8,
2233            (true, false, WordSize::Two) => Self::REP16,
2234            (true, false, WordSize::Four) => Self::REP32,
2235            (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
2236            (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
2237            (true, true, WordSize::Four) => {
2238                Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
2239            }
2240        }
2241    }
2242}
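
// Illustrative sketch (not part of this module): parsing is the inverse of packing.  The
// word is read little-endian, the repetition level is recovered by shifting right by the
// definition width, and the definition level by masking.  `Desc`, `unpack_control_word`
// and `describe` are hypothetical names; `Desc` is a local mirror of `ControlWordDesc` so
// the sketch stays self-contained.

```rust
/// Local stand-in for `ControlWordDesc`.
#[derive(Debug, PartialEq)]
struct Desc {
    is_new_row: bool,
    is_visible: bool,
    is_valid_item: bool,
}

/// Reads one control word from the start of `src` and splits it into `(rep, def)`.
fn unpack_control_word(src: &[u8], bytes_per_word: usize, def_bits: u32) -> (u16, u16) {
    assert!(bytes_per_word <= 4 && def_bits <= 16);
    // Zero-extend the word to four bytes so one code path handles every width
    let mut raw = [0u8; 4];
    raw[..bytes_per_word].copy_from_slice(&src[..bytes_per_word]);
    let word = u32::from_le_bytes(raw);
    let def_mask = (1u32 << def_bits) - 1;
    ((word >> def_bits) as u16, (word & def_mask) as u16)
}

/// Derives the row / visibility / validity flags from a `(rep, def)` pair.
fn describe(rep: u16, def: u16, max_rep: u16, max_visible_def: u16) -> Desc {
    Desc {
        is_new_row: rep == max_rep,
        is_visible: def <= max_visible_def,
        is_valid_item: def == 0,
    }
}
```

// For example, the single byte `0b1001` with a 2-bit definition width yields rep = 2, def = 1.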
2243
2244#[cfg(test)]
2245mod tests {
2246    use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
2247
2248    use crate::repdef::{
2249        CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
2250    };
2251
2252    use super::RepDefBuilder;
2253
2254    fn validity(values: &[bool]) -> NullBuffer {
2255        NullBuffer::from_iter(values.iter().copied())
2256    }
2257
2258    fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
2259        OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
2260    }
2261
2262    fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
2263        OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
2264    }
2265
2266    #[test]
2267    fn test_repdef_empty_offsets() {
2268        // Empty offsets should serialize without panicking.
2269        let mut builder = RepDefBuilder::default();
2270        builder.add_offsets(offsets_32(&[0]), None);
2271        let repdefs = RepDefBuilder::serialize(vec![builder]);
2272        assert!(repdefs.repetition_levels.is_none());
2273        assert!(repdefs.definition_levels.is_none());
2274    }
2275
2276    #[test]
2277    fn test_repdef_basic() {
2278        // Basic case, rep & def
2279        let mut builder = RepDefBuilder::default();
2280        builder.add_offsets(
2281            offsets_64(&[0, 2, 2, 5]),
2282            Some(validity(&[true, false, true])),
2283        );
2284        builder.add_offsets(
2285            offsets_64(&[0, 1, 3, 5, 5, 9]),
2286            Some(validity(&[true, true, true, false, true])),
2287        );
2288        builder.add_validity_bitmap(validity(&[
2289            true, true, true, false, false, false, true, true, false,
2290        ]));
2291
2292        let repdefs = RepDefBuilder::serialize(vec![builder]);
2293        let rep = repdefs.repetition_levels.unwrap();
2294        let def = repdefs.definition_levels.unwrap();
2295
2296        assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2297        assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2298
2299        // [[I], [I, I]], NULL, [[NULL, NULL], NULL, [NULL, I, I, NULL]]
2300
2301        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2302            Some(rep.as_ref().to_vec()),
2303            Some(def.as_ref().to_vec()),
2304            repdefs.def_meaning.into(),
2305            9,
2306        )]);
2307
2308        // Note: validity doesn't exactly round-trip because repdef normalizes some of the
2309        // redundant validity values
2310        assert_eq!(
2311            unraveler.unravel_validity(9),
2312            Some(validity(&[
2313                true, true, true, false, false, false, true, true, false
2314            ]))
2315        );
2316        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2317        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 5, 9]).inner());
2318        assert_eq!(val, Some(validity(&[true, true, true, false, true])));
2319        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2320        assert_eq!(off.inner(), offsets_32(&[0, 2, 2, 5]).inner());
2321        assert_eq!(val, Some(validity(&[true, false, true])));
2322    }
2323
2324    #[test]
2325    fn test_repdef_simple_null_empty_list() {
2326        let check = |repdefs: SerializedRepDefs, last_def: DefinitionInterpretation| {
2327            let rep = repdefs.repetition_levels.unwrap();
2328            let def = repdefs.definition_levels.unwrap();
2329
2330            assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2331            assert_eq!([0, 0, 2, 0, 1, 0], *def);
2332            assert_eq!(
2333                vec![DefinitionInterpretation::NullableItem, last_def,],
2334                repdefs.def_meaning
2335            );
2336        };
2337
2338        // Null list and empty list should be serialized mostly the same
2339
2340        // Null case
2341        let mut builder = RepDefBuilder::default();
2342        builder.add_offsets(
2343            offsets_32(&[0, 2, 2, 5]),
2344            Some(validity(&[true, false, true])),
2345        );
2346        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2347
2348        let repdefs = RepDefBuilder::serialize(vec![builder]);
2349
2350        check(repdefs, DefinitionInterpretation::NullableList);
2351
2352        // Empty case
2353        let mut builder = RepDefBuilder::default();
2354        builder.add_offsets(offsets_32(&[0, 2, 2, 5]), None);
2355        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2356
2357        let repdefs = RepDefBuilder::serialize(vec![builder]);
2358
2359        check(repdefs, DefinitionInterpretation::EmptyableList);
2360    }
2361
2362    #[test]
2363    fn test_repdef_empty_list_at_end() {
2364        // Regression test for a failure we encountered when the last item was an empty list
2365        let mut builder = RepDefBuilder::default();
2366        builder.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
2367        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2368
2369        let repdefs = RepDefBuilder::serialize(vec![builder]);
2370
2371        let rep = repdefs.repetition_levels.unwrap();
2372        let def = repdefs.definition_levels.unwrap();
2373
2374        assert_eq!([1, 0, 1, 0, 0, 1], *rep);
2375        assert_eq!([0, 0, 0, 1, 0, 2], *def);
2376        assert_eq!(
2377            vec![
2378                DefinitionInterpretation::NullableItem,
2379                DefinitionInterpretation::EmptyableList,
2380            ],
2381            repdefs.def_meaning
2382        );
2383    }
2384
2385    #[test]
2386    fn test_repdef_abnormal_nulls() {
2387        // Null lists are allowed to have non-empty offset ranges and garbage values;
2388        // the add_offsets call should normalize this
2389        let mut builder = RepDefBuilder::default();
2390        builder.add_offsets(
2391            offsets_32(&[0, 2, 5, 8]),
2392            Some(validity(&[true, false, true])),
2393        );
2394        // Note: we pass 5 here and not 8.  If add_offsets tells us there are garbage values
2395        // under null lists, they should be removed before continuing
2396        builder.add_no_null(5);
2397
2398        let repdefs = RepDefBuilder::serialize(vec![builder]);
2399
2400        let rep = repdefs.repetition_levels.unwrap();
2401        let def = repdefs.definition_levels.unwrap();
2402
2403        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2404        assert_eq!([0, 0, 1, 0, 0, 0], *def);
2405
2406        assert_eq!(
2407            vec![
2408                DefinitionInterpretation::AllValidItem,
2409                DefinitionInterpretation::NullableList,
2410            ],
2411            repdefs.def_meaning
2412        );
2413    }
2414
2415    #[test]
2416    fn test_repdef_fsl() {
2417        let mut builder = RepDefBuilder::default();
2418        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2419        builder.add_fsl(None, 2, 4);
2420        builder.add_validity_bitmap(validity(&[
2421            true, false, true, false, true, false, true, false,
2422        ]));
2423
2424        let repdefs = RepDefBuilder::serialize(vec![builder]);
2425
2426        assert_eq!(
2427            vec![
2428                DefinitionInterpretation::NullableItem,
2429                DefinitionInterpretation::AllValidItem,
2430                DefinitionInterpretation::NullableItem
2431            ],
2432            repdefs.def_meaning
2433        );
2434
2435        assert!(repdefs.repetition_levels.is_none());
2436
2437        let def = repdefs.definition_levels.unwrap();
2438
2439        assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def);
2440
2441        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2442            None,
2443            Some(def.as_ref().to_vec()),
2444            repdefs.def_meaning.into(),
2445            8,
2446        )]);
2447
2448        assert_eq!(
2449            unraveler.unravel_validity(8),
2450            Some(validity(&[
2451                true, false, true, false, false, false, false, false
2452            ]))
2453        );
2454        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2455        assert_eq!(
2456            unraveler.unravel_fsl_validity(2, 2),
2457            Some(validity(&[true, false]))
2458        );
2459    }
2460
2461    #[test]
2462    fn test_repdef_fsl_allvalid_item() {
2463        let mut builder = RepDefBuilder::default();
2464        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2465        builder.add_fsl(None, 2, 4);
2466        builder.add_no_null(8);
2467
2468        let repdefs = RepDefBuilder::serialize(vec![builder]);
2469
2470        assert_eq!(
2471            vec![
2472                DefinitionInterpretation::AllValidItem,
2473                DefinitionInterpretation::AllValidItem,
2474                DefinitionInterpretation::NullableItem
2475            ],
2476            repdefs.def_meaning
2477        );
2478
2479        assert!(repdefs.repetition_levels.is_none());
2480
2481        let def = repdefs.definition_levels.unwrap();
2482
2483        assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def);
2484
2485        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2486            None,
2487            Some(def.as_ref().to_vec()),
2488            repdefs.def_meaning.into(),
2489            8,
2490        )]);
2491
2492        assert_eq!(unraveler.unravel_validity(8), None);
2493        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2494        assert_eq!(
2495            unraveler.unravel_fsl_validity(2, 2),
2496            Some(validity(&[true, false]))
2497        );
2498    }
2499
2500    #[test]
2501    fn test_repdef_sliced_offsets() {
2502        // Sliced lists may have offsets that don't start with zero.  The
2503        // add_offsets call needs to normalize these to operate correctly.
2504        let mut builder = RepDefBuilder::default();
2505        builder.add_offsets(
2506            offsets_32(&[5, 7, 7, 10]),
2507            Some(validity(&[true, false, true])),
2508        );
2509        builder.add_no_null(5);
2510
2511        let repdefs = RepDefBuilder::serialize(vec![builder]);
2512
2513        let rep = repdefs.repetition_levels.unwrap();
2514        let def = repdefs.definition_levels.unwrap();
2515
2516        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2517        assert_eq!([0, 0, 1, 0, 0, 0], *def);
2518
2519        assert_eq!(
2520            vec![
2521                DefinitionInterpretation::AllValidItem,
2522                DefinitionInterpretation::NullableList,
2523            ],
2524            repdefs.def_meaning
2525        );
2526    }
2527
2528    #[test]
2529    fn test_repdef_complex_null_empty() {
2530        let mut builder = RepDefBuilder::default();
2531        builder.add_offsets(
2532            offsets_32(&[0, 4, 4, 4, 6]),
2533            Some(validity(&[true, false, true, true])),
2534        );
2535        builder.add_offsets(
2536            offsets_32(&[0, 1, 1, 2, 2, 2, 3]),
2537            Some(validity(&[true, false, true, false, true, true])),
2538        );
2539        builder.add_no_null(3);
2540
2541        let repdefs = RepDefBuilder::serialize(vec![builder]);
2542
2543        let rep = repdefs.repetition_levels.unwrap();
2544        let def = repdefs.definition_levels.unwrap();
2545
2546        assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep);
2547        assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def);
2548    }
2549
2550    #[test]
2551    fn test_repdef_empty_list_no_null() {
2552        // Tests the case where we have some empty lists but no null lists.  This case
2553        // caused some bugs because we have definition levels but no nulls
2554        let mut builder = RepDefBuilder::default();
2555        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2556        builder.add_no_null(6);
2557
2558        let repdefs = RepDefBuilder::serialize(vec![builder]);
2559
2560        let rep = repdefs.repetition_levels.unwrap();
2561        let def = repdefs.definition_levels.unwrap();
2562
2563        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2564        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2565
2566        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2567            Some(rep.as_ref().to_vec()),
2568            Some(def.as_ref().to_vec()),
2569            repdefs.def_meaning.into(),
2570            8,
2571        )]);
2572
2573        assert_eq!(unraveler.unravel_validity(6), None);
2574        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2575        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2576        assert_eq!(val, None);
2577    }
2578
2579    #[test]
2580    fn test_repdef_all_valid() {
2581        let mut builder = RepDefBuilder::default();
2582        builder.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
2583        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2584        builder.add_no_null(9);
2585
2586        let repdefs = RepDefBuilder::serialize(vec![builder]);
2587        let rep = repdefs.repetition_levels.unwrap();
2588        assert!(repdefs.definition_levels.is_none());
2589
2590        assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep);
2591
2592        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2593            Some(rep.as_ref().to_vec()),
2594            None,
2595            repdefs.def_meaning.into(),
2596            9,
2597        )]);
2598
2599        assert_eq!(unraveler.unravel_validity(9), None);
2600        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2601        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 7, 9]).inner());
2602        assert_eq!(val, None);
2603        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2604        assert_eq!(off.inner(), offsets_32(&[0, 2, 3, 5]).inner());
2605        assert_eq!(val, None);
2606    }
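
    // Illustrative sketch, not part of the crate: a minimal way to derive the
    // repetition levels asserted in test_repdef_all_valid, assuming two list layers
    // in which every list is valid and non-empty.  `sketch_rep_levels` is a
    // hypothetical helper; the real builder also handles null and empty lists.
    fn sketch_rep_levels(outer: &[usize], inner: &[usize], num_items: usize) -> Vec<u16> {
        let mut rep = vec![0_u16; num_items];
        // The first item of every inner list starts a new inner list (level 1)
        for &start in &inner[..inner.len() - 1] {
            rep[start] = 1;
        }
        // The first item of every outer list also starts a new outer list (level 2)
        for &first_inner in &outer[..outer.len() - 1] {
            rep[inner[first_inner]] = 2;
        }
        rep
    }

    #[test]
    fn sketch_rep_levels_all_valid() {
        // Same offsets as test_repdef_all_valid
        let rep = sketch_rep_levels(&[0, 2, 3, 5], &[0, 1, 3, 5, 7, 9], 9);
        assert_eq!(rep, vec![2, 1, 0, 2, 0, 2, 0, 1, 0]);
    }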
2607
2608    #[test]
2609    fn test_only_empty_lists() {
2610        let mut builder = RepDefBuilder::default();
2611        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2612        builder.add_no_null(6);
2613
2614        let repdefs = RepDefBuilder::serialize(vec![builder]);
2615
2616        let rep = repdefs.repetition_levels.unwrap();
2617        let def = repdefs.definition_levels.unwrap();
2618
2619        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2620        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2621
2622        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2623            Some(rep.as_ref().to_vec()),
2624            Some(def.as_ref().to_vec()),
2625            repdefs.def_meaning.into(),
2626            8,
2627        )]);
2628
2629        assert_eq!(unraveler.unravel_validity(6), None);
2630        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2631        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2632        assert_eq!(val, None);
2633    }
2634
2635    #[test]
2636    fn test_only_null_lists() {
2637        let mut builder = RepDefBuilder::default();
2638        builder.add_offsets(
2639            offsets_32(&[0, 4, 4, 4, 6]),
2640            Some(validity(&[true, false, false, true])),
2641        );
2642        builder.add_no_null(6);
2643
2644        let repdefs = RepDefBuilder::serialize(vec![builder]);
2645
2646        let rep = repdefs.repetition_levels.unwrap();
2647        let def = repdefs.definition_levels.unwrap();
2648
2649        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2650        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2651
2652        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2653            Some(rep.as_ref().to_vec()),
2654            Some(def.as_ref().to_vec()),
2655            repdefs.def_meaning.into(),
2656            8,
2657        )]);
2658
2659        assert_eq!(unraveler.unravel_validity(6), None);
2660        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2661        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2662        assert_eq!(val, Some(validity(&[true, false, false, true])));
2663    }
2664
2665    #[test]
2666    fn test_null_and_empty_lists() {
2667        let mut builder = RepDefBuilder::default();
2668        builder.add_offsets(
2669            offsets_32(&[0, 4, 4, 4, 6]),
2670            Some(validity(&[true, false, true, true])),
2671        );
2672        builder.add_no_null(6);
2673
2674        let repdefs = RepDefBuilder::serialize(vec![builder]);
2675
2676        let rep = repdefs.repetition_levels.unwrap();
2677        let def = repdefs.definition_levels.unwrap();
2678
2679        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2680        assert_eq!([0, 0, 0, 0, 1, 2, 0, 0], *def);
2681
2682        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2683            Some(rep.as_ref().to_vec()),
2684            Some(def.as_ref().to_vec()),
2685            repdefs.def_meaning.into(),
2686            8,
2687        )]);
2688
2689        assert_eq!(unraveler.unravel_validity(6), None);
2690        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2691        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2692        assert_eq!(val, Some(validity(&[true, false, true, true])));
2693    }
2694
2695    #[test]
2696    fn test_repdef_null_struct_valid_list() {
2697        // Regression test for a bug unraveling a null struct inside an all-valid list
2698
2699        let rep = vec![1, 0, 0, 0];
2700        let def = vec![2, 0, 2, 2];
2701        // AllValidList<NullableStruct<NullableItem>>
2702        let def_meaning = vec![
2703            DefinitionInterpretation::NullableItem,
2704            DefinitionInterpretation::NullableItem,
2705            DefinitionInterpretation::AllValidList,
2706        ];
2707        let num_items = 4;
2708
2709        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2710            Some(rep),
2711            Some(def),
2712            def_meaning.into(),
2713            num_items,
2714        )]);
2715
2716        assert_eq!(
2717            unraveler.unravel_validity(4),
2718            Some(validity(&[false, true, false, false]))
2719        );
2720        assert_eq!(
2721            unraveler.unravel_validity(4),
2722            Some(validity(&[false, true, false, false]))
2723        );
2724        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2725        assert_eq!(off.inner(), offsets_32(&[0, 4]).inner());
2726        assert_eq!(val, None);
2727    }
2728
2729    #[test]
2730    fn test_repdef_no_rep() {
2731        let mut builder = RepDefBuilder::default();
2732        builder.add_no_null(5);
2733        builder.add_validity_bitmap(validity(&[false, false, true, true, true]));
2734        builder.add_validity_bitmap(validity(&[false, true, true, true, false]));
2735
2736        let repdefs = RepDefBuilder::serialize(vec![builder]);
2737        assert!(repdefs.repetition_levels.is_none());
2738        let def = repdefs.definition_levels.unwrap();
2739
2740        assert_eq!([2, 2, 0, 0, 1], *def);
2741
2742        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2743            None,
2744            Some(def.as_ref().to_vec()),
2745            repdefs.def_meaning.into(),
2746            5,
2747        )]);
2748
2749        assert_eq!(
2750            unraveler.unravel_validity(5),
2751            Some(validity(&[false, false, true, true, false]))
2752        );
2753        assert_eq!(
2754            unraveler.unravel_validity(5),
2755            Some(validity(&[false, false, true, true, true]))
2756        );
2757        assert_eq!(unraveler.unravel_validity(5), None);
2758    }
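
    // Illustrative sketch, not part of the crate: one way to arrive at the
    // definition levels asserted in test_repdef_no_rep when there is no repetition.
    // Layers are given outermost first; `None` stands for an all-valid layer, which
    // is assumed to consume no definition level.  `sketch_def_levels` is a
    // hypothetical helper.
    fn sketch_def_levels(layers_outer_to_inner: &[Option<&[bool]>], num_items: usize) -> Vec<u16> {
        let mut def = vec![0_u16; num_items];
        let mut level = 0_u16;
        // Walk from the innermost layer outwards so that an outer null overrides
        // whatever the inner layers recorded for the same item
        for layer in layers_outer_to_inner.iter().rev() {
            if let Some(validity) = layer {
                level += 1;
                for (d, &valid) in def.iter_mut().zip(validity.iter()) {
                    if !valid {
                        *d = level;
                    }
                }
            }
        }
        def
    }

    #[test]
    fn sketch_def_levels_no_rep() {
        // Same layers as test_repdef_no_rep
        let middle = [false, false, true, true, true];
        let inner = [false, true, true, true, false];
        let def = sketch_def_levels(&[None, Some(&middle[..]), Some(&inner[..])], 5);
        assert_eq!(def, vec![2, 2, 0, 0, 1]);
    }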
2759
2760    #[test]
2761    fn test_composite_unravel() {
2762        let mut builder = RepDefBuilder::default();
2763        builder.add_offsets(
2764            offsets_64(&[0, 2, 2, 5]),
2765            Some(validity(&[true, false, true])),
2766        );
2767        builder.add_no_null(5);
2768        let repdef1 = RepDefBuilder::serialize(vec![builder]);
2769
2770        let mut builder = RepDefBuilder::default();
2771        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2772        builder.add_no_null(9);
2773        let repdef2 = RepDefBuilder::serialize(vec![builder]);
2774
2775        let rep1 = repdef1.repetition_levels.clone().unwrap();
2776        let def1 = repdef1.definition_levels.clone().unwrap();
2777        let rep2 = repdef2.repetition_levels.clone().unwrap();
2778        assert!(repdef2.definition_levels.is_none());
2779
2780        assert_eq!([1, 0, 1, 1, 0, 0], *rep1);
2781        assert_eq!([0, 0, 1, 0, 0, 0], *def1);
2782        assert_eq!([1, 1, 0, 1, 0, 1, 0, 1, 0], *rep2);
2783
2784        let unravel1 = RepDefUnraveler::new(
2785            repdef1.repetition_levels.map(|l| l.to_vec()),
2786            repdef1.definition_levels.map(|l| l.to_vec()),
2787            repdef1.def_meaning.into(),
2788            5,
2789        );
2790        let unravel2 = RepDefUnraveler::new(
2791            repdef2.repetition_levels.map(|l| l.to_vec()),
2792            repdef2.definition_levels.map(|l| l.to_vec()),
2793            repdef2.def_meaning.into(),
2794            9,
2795        );
2796
2797        let mut unraveler = CompositeRepDefUnraveler::new(vec![unravel1, unravel2]);
2798
2799        assert!(unraveler.unravel_validity(9).is_none());
2800        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2801        assert_eq!(
2802            off.inner(),
2803            offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]).inner()
2804        );
2805        assert_eq!(
2806            val,
2807            Some(validity(&[true, false, true, true, true, true, true, true]))
2808        );
2809    }
2810
2811    #[test]
2812    fn test_repdef_multiple_builders() {
2813        // Same data as test_repdef_basic, split across two builders
2814        let mut builder1 = RepDefBuilder::default();
2815        builder1.add_offsets(offsets_64(&[0, 2]), None);
2816        builder1.add_offsets(offsets_64(&[0, 1, 3]), None);
2817        builder1.add_validity_bitmap(validity(&[true, true, true]));
2818
2819        let mut builder2 = RepDefBuilder::default();
2820        builder2.add_offsets(offsets_64(&[0, 0, 3]), Some(validity(&[false, true])));
2821        builder2.add_offsets(
2822            offsets_64(&[0, 2, 2, 6]),
2823            Some(validity(&[true, false, true])),
2824        );
2825        builder2.add_validity_bitmap(validity(&[false, false, false, true, true, false]));
2826
2827        let repdefs = RepDefBuilder::serialize(vec![builder1, builder2]);
2828
2829        let rep = repdefs.repetition_levels.unwrap();
2830        let def = repdefs.definition_levels.unwrap();
2831
2832        assert_eq!([2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2833        assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2834    }
2835
2836    #[test]
2837    fn test_slicer() {
2838        let mut builder = RepDefBuilder::default();
2839        builder.add_offsets(
2840            offsets_64(&[0, 2, 2, 30, 30]),
2841            Some(validity(&[true, false, true, true])),
2842        );
2843        builder.add_no_null(30);
2844
2845        let repdefs = RepDefBuilder::serialize(vec![builder]);
2846
2847        let mut rep_slicer = repdefs.rep_slicer().unwrap();
2848
2849        // First 5 items include a null list so we get 6 levels (12 bytes)
2850        assert_eq!(rep_slicer.slice_next(5).len(), 12);
2851        // Next 20 are all plain
2852        assert_eq!(rep_slicer.slice_next(20).len(), 40);
2853        // Last 5 include an empty list so we get 6 levels (12 bytes)
2854        assert_eq!(rep_slicer.slice_rest().len(), 12);
2855
2856        let mut def_slicer = repdefs.def_slicer().unwrap();
2857
2858        // First 5 items include a null list so we get 6 levels (12 bytes)
2859        assert_eq!(def_slicer.slice_next(5).len(), 12);
2860        // Next 20 are all plain
2861        assert_eq!(def_slicer.slice_next(20).len(), 40);
2862        // Last 5 include an empty list so we get 6 levels (12 bytes)
2863        assert_eq!(def_slicer.slice_rest().len(), 12);
2864    }
2865
2866    #[test]
2867    fn test_control_words() {
2868        // Convert to control words, verify expected, convert back, verify same as original
2869        fn check(
2870            rep: &[u16],
2871            def: &[u16],
2872            expected_values: Vec<u8>,
2873            expected_bytes_per_word: usize,
2874            expected_bits_rep: u8,
2875            expected_bits_def: u8,
2876        ) {
2877            let num_vals = rep.len().max(def.len());
2878            let max_rep = rep.iter().max().copied().unwrap_or(0);
2879            let max_def = def.iter().max().copied().unwrap_or(0);
2880
2881            let in_rep = if rep.is_empty() { None } else { Some(rep) };
2882            let in_def = if def.is_empty() { None } else { Some(def) };
2883
2884            let mut iter = super::build_control_word_iterator(
2885                in_rep,
2886                max_rep,
2887                in_def,
2888                max_def,
2889                max_def + 1,
2890                expected_values.len(),
2891            );
2892            assert_eq!(iter.bytes_per_word(), expected_bytes_per_word);
2893            assert_eq!(iter.bits_rep(), expected_bits_rep);
2894            assert_eq!(iter.bits_def(), expected_bits_def);
2895            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
2896
2897            for _ in 0..num_vals {
2898                iter.append_next(&mut cw_vec);
2899            }
2900            assert!(iter.append_next(&mut cw_vec).is_none());
2901
2902            assert_eq!(expected_values, cw_vec);
2903
2904            let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);
2905
2906            let mut rep_out = Vec::with_capacity(num_vals);
2907            let mut def_out = Vec::with_capacity(num_vals);
2908
2909            if expected_bytes_per_word > 0 {
2910                for slice in cw_vec.chunks_exact(expected_bytes_per_word) {
2911                    parser.parse(slice, &mut rep_out, &mut def_out);
2912                }
2913            }
2914
2915            assert_eq!(rep, rep_out.as_slice());
2916            assert_eq!(def, def_out.as_slice());
2917        }
2918
2919        // Each will need 4 bits and so we should get 1-byte control words
2920        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
2921        let def = &[5_u16, 3, 1, 2, 12, 15, 0, 2];
2922        let expected = vec![
2923            0b00000101, // 0, 5
2924            0b01110011, // 7, 3
2925            0b00110001, // 3, 1
2926            0b00100010, // 2, 2
2927            0b10011100, // 9, 12
2928            0b10001111, // 8, 15
2929            0b11000000, // 12, 0
2930            0b01010010, // 5, 2
2931        ];
2932        check(rep, def, expected, 1, 4, 4);
2933
2934        // Now we need 5 bits for def so we get 2-byte control words
2935        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
2936        let def = &[5_u16, 3, 1, 2, 12, 22, 0, 2];
2937        let expected = vec![
2938            0b00000101, 0b00000000, // 0, 5
2939            0b11100011, 0b00000000, // 7, 3
2940            0b01100001, 0b00000000, // 3, 1
2941            0b01000010, 0b00000000, // 2, 2
2942            0b00101100, 0b00000001, // 9, 12
2943            0b00010110, 0b00000001, // 8, 22
2944            0b10000000, 0b00000001, // 12, 0
2945            0b10100010, 0b00000000, // 5, 2
2946        ];
2947        check(rep, def, expected, 2, 4, 5);
2948
2949        // Just rep, 4 bits so 1 byte each
2950        let levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
2951        let expected = vec![
2952            0b00000000, // 0
2953            0b00000111, // 7
2954            0b00000011, // 3
2955            0b00000010, // 2
2956            0b00001001, // 9
2957            0b00001000, // 8
2958            0b00001100, // 12
2959            0b00000101, // 5
2960        ];
2961        check(levels, &[], expected.clone(), 1, 4, 0);
2962
2963        // Just def
2964        check(&[], levels, expected, 1, 0, 4);
2965
2966        // No rep, no def, no bytes
2967        check(&[], &[], Vec::default(), 0, 0, 0);
2968    }
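
    // Illustrative sketch, not the crate's implementation: the packing implied by
    // the expected bytes in test_control_words.  The rep level sits in the high
    // bits, the def level in the low `bits_def` bits, and the word is written
    // little-endian in the fewest whole bytes.  `sketch_pack_control_word` is a
    // hypothetical helper and assumes bits_rep + bits_def <= 16.
    fn sketch_pack_control_word(rep: u16, def: u16, bits_rep: u8, bits_def: u8) -> Vec<u8> {
        let total_bits = bits_rep as usize + bits_def as usize;
        let bytes_per_word = (total_bits + 7) / 8;
        let word = ((rep as u32) << bits_def) | def as u32;
        word.to_le_bytes()[..bytes_per_word].to_vec()
    }

    #[test]
    fn sketch_control_word_packing() {
        // 4 bits each fit in one byte
        assert_eq!(sketch_pack_control_word(7, 3, 4, 4), vec![0b01110011]);
        // 4 + 5 bits need two bytes, low byte first
        assert_eq!(
            sketch_pack_control_word(8, 22, 4, 5),
            vec![0b00010110, 0b00000001]
        );
        // No rep and no def means no bytes
        assert_eq!(sketch_pack_control_word(0, 0, 0, 0), Vec::<u8>::new());
    }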
2969
2970    #[test]
2971    fn test_control_words_rep_index() {
2972        fn check(
2973            rep: &[u16],
2974            def: &[u16],
2975            expected_new_rows: Vec<bool>,
2976            expected_is_visible: Vec<bool>,
2977        ) {
2978            let num_vals = rep.len().max(def.len());
2979            let max_rep = rep.iter().max().copied().unwrap_or(0);
2980            let max_def = def.iter().max().copied().unwrap_or(0);
2981
2982            let in_rep = if rep.is_empty() { None } else { Some(rep) };
2983            let in_def = if def.is_empty() { None } else { Some(def) };
2984
2985            let mut iter = super::build_control_word_iterator(
2986                in_rep,
2987                max_rep,
2988                in_def,
2989                max_def,
2990                /*max_visible_def=*/ 2,
2991                expected_new_rows.len(),
2992            );
2993
2994            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
2995            let mut expected_new_rows = expected_new_rows.iter().copied();
2996            let mut expected_is_visible = expected_is_visible.iter().copied();
2997            for _ in 0..expected_new_rows.len() {
2998                let word_desc = iter.append_next(&mut cw_vec).unwrap();
2999                assert_eq!(word_desc.is_new_row, expected_new_rows.next().unwrap());
3000                assert_eq!(word_desc.is_visible, expected_is_visible.next().unwrap());
3001            }
3002            assert!(iter.append_next(&mut cw_vec).is_none());
3003        }
3004
3005        // 2 is the max rep level here, so it marks the start of a new row
3006        let rep = &[2_u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
3007        // With rep present, def levels above max_visible_def (2) mark invisible entries
3008        let def = &[0_u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];
3009
3010        // Rep & def
3011        check(
3012            rep,
3013            def,
3014            vec![
3015                true, false, false, true, true, false, false, false, false, true, false,
3016            ],
3017            vec![
3018                true, true, true, false, true, true, true, true, true, true, true,
3019            ],
3020        );
3021        // Rep only
3022        check(
3023            rep,
3024            &[],
3025            vec![
3026                true, false, false, true, true, false, false, false, false, true, false,
3027            ],
3028            vec![true; 11],
3029        );
3030        // No repetition
3031        check(
3032            &[],
3033            def,
3034            vec![
3035                true, true, true, true, true, true, true, true, true, true, true,
3036            ],
3037            vec![true; 11],
3038        );
3039        // No repetition, no definition
3040        check(
3041            &[],
3042            &[],
3043            vec![
3044                true, true, true, true, true, true, true, true, true, true, true,
3045            ],
3046            vec![true; 11],
3047        );
3048    }
3049
3050    #[test]
3051    fn regress_empty_list_case() {
3052        // Regression test for a case where we had 3 null lists inside a struct
3053        let mut builder = RepDefBuilder::default();
3054        builder.add_validity_bitmap(validity(&[true, false, true]));
3055        builder.add_offsets(
3056            offsets_32(&[0, 0, 0, 0]),
3057            Some(validity(&[false, false, false])),
3058        );
3059        builder.add_no_null(0);
3060
3061        let repdefs = RepDefBuilder::serialize(vec![builder]);
3062        let rep = repdefs.repetition_levels.unwrap();
3063        let def = repdefs.definition_levels.unwrap();
3064
3065        assert_eq!([1, 1, 1], *rep);
3066        assert_eq!([1, 2, 1], *def);
3067
3068        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
3069            Some(rep.as_ref().to_vec()),
3070            Some(def.as_ref().to_vec()),
3071            repdefs.def_meaning.into(),
3072            0,
3073        )]);
3074
3075        assert_eq!(unraveler.unravel_validity(0), None);
3076        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
3077        assert_eq!(off.inner(), offsets_32(&[0, 0, 0, 0]).inner());
3078        assert_eq!(val, Some(validity(&[false, false, false])));
3079        let val = unraveler.unravel_validity(3).unwrap();
3080        assert_eq!(val.inner(), validity(&[true, false, true]).inner());
3081    }
3082
3083    #[test]
3084    fn regress_list_ends_null_case() {
3085        let mut builder = RepDefBuilder::default();
3086        builder.add_offsets(
3087            offsets_64(&[0, 1, 2, 2]),
3088            Some(validity(&[true, true, false])),
3089        );
3090        builder.add_offsets(offsets_64(&[0, 1, 1]), Some(validity(&[true, false])));
3091        builder.add_no_null(1);
3092
3093        let repdefs = RepDefBuilder::serialize(vec![builder]);
3094        let rep = repdefs.repetition_levels.unwrap();
3095        let def = repdefs.definition_levels.unwrap();
3096
3097        assert_eq!([2, 2, 2], *rep);
3098        assert_eq!([0, 1, 2], *def);
3099
3100        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
3101            Some(rep.as_ref().to_vec()),
3102            Some(def.as_ref().to_vec()),
3103            repdefs.def_meaning.into(),
3104            1,
3105        )]);
3106
3107        assert_eq!(unraveler.unravel_validity(1), None);
3108        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
3109        assert_eq!(off.inner(), offsets_32(&[0, 1, 1]).inner());
3110        assert_eq!(val, Some(validity(&[true, false])));
3111        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
3112        assert_eq!(off.inner(), offsets_32(&[0, 1, 2, 2]).inner());
3113        assert_eq!(val, Some(validity(&[true, true, false])));
3114    }
3115
3116    #[test]
    fn test_mixed_unraveler() {
        // This tests cases where the validity is different between two different pages
        // because one page has nulls and the other doesn't.

        // Simple case with one layer of validity and no repetition
        let mut unraveler = CompositeRepDefUnraveler::new(vec![
            RepDefUnraveler::new(
                None,
                Some(vec![0, 1, 0, 1]),
                vec![DefinitionInterpretation::NullableItem].into(),
                4,
            ),
            RepDefUnraveler::new(
                None,
                None,
                vec![DefinitionInterpretation::AllValidItem].into(),
                4,
            ),
        ]);

        assert_eq!(
            unraveler.unravel_validity(8),
            Some(validity(&[
                true, false, true, false, true, true, true, true
            ]))
        );

        // More complex case with two layers of validity and repetition
        let def1 = Some(vec![0, 1, 2]);
        let rep1 = Some(vec![1, 0, 1]);

        let def2 = Some(vec![1, 0, 0]);
        let rep2 = Some(vec![1, 1, 0]);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![
            RepDefUnraveler::new(
                rep1,
                def1,
                vec![
                    DefinitionInterpretation::NullableItem,
                    DefinitionInterpretation::EmptyableList,
                ]
                .into(),
                2,
            ),
            RepDefUnraveler::new(
                rep2,
                def2,
                vec![
                    DefinitionInterpretation::AllValidItem,
                    DefinitionInterpretation::NullableList,
                ]
                .into(),
                2,
            ),
        ]);

        assert_eq!(
            unraveler.unravel_validity(4),
            Some(validity(&[true, false, true, true]))
        );
        assert_eq!(
            unraveler.unravel_offsets::<i32>().unwrap(),
            (
                offsets_32(&[0, 2, 2, 2, 4]),
                Some(validity(&[true, true, false, true]))
            )
        );
    }

    #[test]
    fn test_mixed_unraveler_nullable_without_def_levels() {
        // A page can keep nullable layer metadata even when all definition levels are 0
        // and no definition buffer needs to be materialized. This should decode as all-valid.
        let mut unraveler = CompositeRepDefUnraveler::new(vec![
            RepDefUnraveler::new(
                None,
                Some(vec![0, 1, 0, 1]),
                vec![DefinitionInterpretation::NullableItem].into(),
                4,
            ),
            RepDefUnraveler::new(
                None,
                None,
                vec![DefinitionInterpretation::NullableItem].into(),
                4,
            ),
        ]);

        assert_eq!(
            unraveler.unravel_validity(8),
            Some(validity(&[
                true, false, true, false, true, true, true, true
            ]))
        );
    }
}