// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities for rep-def levels
//!
//! Repetition and definition levels are a way to encode multiple validity / offsets arrays
//! into a single buffer.  They are a form of "zipping" buffers together that takes advantage
//! of the fact that, if the outermost array is invalid, then the validity of the inner items
//! is irrelevant.
//!
//! Note: the concept of repetition & definition levels comes from the Dremel paper and has
//! been implemented in Apache Parquet.  However, the implementation here is not necessarily
//! compatible with Parquet.  For example, we use 0 to represent the "inner-most" item and
//! Parquet uses 0 to represent the "outer-most" item.
//!
//! # Repetition Levels
//!
//! With repetition levels we convert a sparse array of offsets into a dense array of levels.
//! These levels are marked non-zero whenever a new list begins.  In other words, given the
//! list array with 3 rows [{<0,1>, <>, <2>}, {<3>}, {}], [], [{<4>}] we would have three
//! offsets arrays:
//!
//! Outer-most ([]): [0, 3, 3, 4]
//! Middle     ({}): [0, 3, 4, 4, 5]
//! Inner      (<>): [0, 2, 2, 3, 4, 5]
//! Values         : [0, 1, 2, 3, 4]
//!
//! We can convert these into repetition levels as follows:
//!
//! | Values | Repetition |
//! | ------ | ---------- |
//! |      0 |          3 | // Start of outer-most list
//! |      1 |          0 | // Continues inner-most list (no new lists)
//! |      - |          1 | // Start of new inner-most list (empty list)
//! |      2 |          1 | // Start of new inner-most list
//! |      3 |          2 | // Start of new middle list
//! |      - |          2 | // Start of new middle list (empty list)
//! |      - |          3 | // Start of new outer-most list (empty list)
//! |      4 |          3 | // Start of new outer-most list
//!
//! Note: We actually have MORE repetition levels than values.  This is because the repetition
//! levels need to be able to represent empty lists.
//!
//! # Definition Levels
//!
//! Definition levels are simpler.  We can think of them as zipping together various validity bitmaps
//! (from different levels of nesting) into a single buffer.  For example, we could zip the arrays
//! [1, 1, 0, 0] and [1, 0, 1, 0] into [11, 10, 01, 00].  However, 00 and 01 are redundant.  If the
//! outer level is null then the validity of the inner levels is irrelevant.  To save space we instead
//! encode a "level" which is the "depth" of the null.  Let's look at a more complete example:
//!
//! Array: [{"middle": {"inner": 1}}, NULL, {"middle": NULL}, {"middle": {"inner": NULL}}]
//!
//! In Arrow we would have the following validity arrays:
//! Outer validity : 1, 0, 1, 1
//! Middle validity: 1, ?, 0, 1
//! Inner validity : 1, ?, ?, 0
//! Values         : 1, ?, ?, ?
//!
//! The ? values are undefined in the Arrow format.  We can convert these into definition levels as follows:
//!
//! | Values | Definition |
//! | ------ | ---------- |
//! |      1 |          0 | // Valid at all levels
//! |      - |          3 | // Null at outer level
//! |      - |          2 | // Null at middle level
//! |      - |          1 | // Null at inner level
//!
//! # Compression
//!
//! Note that we only need 2 bits of definition levels to represent 3 levels of nesting.  Definition
//! levels are always more compact than the input validity arrays.  However, compressed levels are not
//! necessarily more compact than the compressed validity arrays.
//!
//! Repetition levels are more complex.  If there are very large lists then a sparse array of offsets
//! (which has one element per list) might be more compact than a dense array of repetition levels
//! (which has one element per list value, possibly even more if there are empty lists).
//!
//! That said, both repetition levels and definition levels are typically very compressible with RLE.
//!
//! In Lance, however, we don't always take advantage of that compression because we want to be able
//! to zip rep-def levels together with our values.  This gives us fewer IOPS when accessing row values.
//!
//! # Utilities in this Module
//!
//! - `RepDefBuilder` - Extracts validity and offset information from Arrow arrays.  We use this as we
//!   shred the incoming data into primitive leaf arrays.  We don't immediately convert into rep-def because
//!   we need to share and cheaply clone the builder when we have structs (each struct child shares some parent
//!   validity / offset information).  The `serialize` method is called once all data has been received to create
//!   the final rep-def levels.
//!
//! - `SerializerContext` - This is an internal utility that helps with serializing rep-def levels.
//!
//! - `CompositeRepDefUnraveler` - This structure is used to reverse the process.  It starts with a set of
//!   rep-def levels and then uses the `unravel_validity` and `unravel_offsets` methods to produce validity
//!   buffers and offset buffers.  It is "composite" because we may be combining sets of rep-def buffers from
//!   multiple locations (e.g. multiple blocks in a mini-block encoded file).
//!
//! - `RepDefSlicer` - This is a utility that helps with slicing rep-def buffers.  These buffers are "kind of"
//!   transparent (maps 1:1 with the values in the array) but not exactly because of the special (empty/null) lists.
//!   The slicer helps with this issue and is used when slicing a rep-def buffer into mini-blocks.
//!
//! - `build_control_word_iterator` - This takes in rep-def levels and returns an iterator that returns byte-padded
//!   "control words" which are used when creating full-zip encoded data.
//!
//! - `ControlWordParser` - This parser can parse the control words returned by `build_control_word_iterator` and is
//!   used when decoding full-zip encoded data.

use std::{
    iter::{Copied, Zip},
    ops::Range,
    sync::Arc,
};

use arrow_array::OffsetSizeTrait;
use arrow_buffer::{
    ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
};
use lance_core::{Error, Result, utils::bit::log_2_ceil};

use crate::buffer::LanceBuffer;

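// Illustrative sketch, not part of the original module: the "Compression" notes in the
// module docs say that 2 bits are enough for 3 levels of nesting (levels 0..=3).  The
// hypothetical helper below shows that arithmetic.  It is an assumption made for
// illustration and is not necessarily how the encoder picks its bit width.
#[allow(dead_code)]
fn sketch_bits_per_level(max_level: u16) -> u32 {
    // Number of bits needed to store any value in 0..=max_level.
    u16::BITS - max_level.leading_zeros()
}
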
pub type LevelBuffer = Vec<u16>;

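// Illustrative sketch, not part of the original module: repetition levels for a single
// layer of lists, following the "Repetition Levels" notes in the module docs.  The
// function name is hypothetical and the sketch ignores nulls and nested lists, which
// the real serializer handles.
#[allow(dead_code)]
fn sketch_rep_levels_single_list(offsets: &[i64]) -> Vec<u16> {
    let mut levels = Vec::new();
    for w in offsets.windows(2) {
        let len = (w[1] - w[0]) as usize;
        // Every list, even an empty one, contributes one "start of list" level.
        levels.push(1);
        // The remaining items of the list continue it, so they get level 0.
        levels.extend(std::iter::repeat(0).take(len.saturating_sub(1)));
    }
    levels
}

// With offsets [0, 2, 2, 3] (lists of length 2, 0 and 1) this yields four levels for
// three values, because the empty list still needs a level of its own.
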
/// A contiguous top-level-row range that can be encoded as one structural page.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct StructuralPageSplit {
    /// Top-level row offset, relative to the original unsplit page.
    pub(crate) row_start: u64,
    /// Number of top-level rows in this split.
    pub(crate) num_rows: u64,
    /// Rep/def level range, relative to the original unsplit page.
    pub(crate) level_range: Range<usize>,
    /// Visible value offset, relative to the original unsplit page.
    pub(crate) value_start: u64,
    /// Number of visible values in this split.
    pub(crate) num_values: u64,
}

/// Planner result for structural page budget handling.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum StructuralPagePlan {
    /// The original page can be encoded as-is.
    Fits,
    /// The original page should be split on top-level row boundaries.
    Split(Vec<StructuralPageSplit>),
    /// One top-level row is larger than the requested structural page budget.
    UnsplittableOverBudget(u64),
}

// As we build def levels we add this to special values to indicate that they
// are special so that we can skip over them when processing lower levels.
//
// We assume 16 bits is good enough for rep-def levels.  This _would_ give
// us 65536 levels of struct nesting and list nesting.  However, we cut that
// in half for SPECIAL_THRESHOLD because we use the top bit to indicate if an
// item is a special value (null list / empty list) during construction.
//
// We subtract this off at the end of construction to get the actual definition
// levels.
const SPECIAL_THRESHOLD: u16 = u16::MAX / 2;

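// Illustrative sketch, not part of the original module: how a marker such as
// SPECIAL_THRESHOLD can tag levels during construction and be stripped afterwards.
// All names below are hypothetical, and the constant is re-declared so that the
// sketch stands alone.
#[allow(dead_code)]
const SKETCH_SPECIAL_THRESHOLD: u16 = u16::MAX / 2;

// Tag a definition level as belonging to a special (empty/null) list.
#[allow(dead_code)]
fn sketch_mark_special(def_level: u16) -> u16 {
    def_level + SKETCH_SPECIAL_THRESHOLD
}

// Strip the tag from every special level, leaving ordinary levels untouched.
#[allow(dead_code)]
fn sketch_normalize(def_levels: &mut [u16]) {
    for def in def_levels.iter_mut() {
        if *def > SKETCH_SPECIAL_THRESHOLD {
            *def -= SKETCH_SPECIAL_THRESHOLD;
        }
    }
}
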
/// Represents information that we extract from a list array as we are
/// encoding
#[derive(Clone, Debug)]
struct OffsetDesc {
    offsets: Arc<[i64]>,
    validity: Option<BooleanBuffer>,
    has_empty_lists: bool,
    num_values: usize,
    num_specials: usize,
}

/// Represents validity information that we extract from non-list arrays (that
/// have nulls) as we are encoding
#[derive(Clone, Debug)]
struct ValidityDesc {
    validity: Option<BooleanBuffer>,
    num_values: usize,
}

/// Represents validity information that we extract from FSL arrays.  This is
/// just validity (no offsets) but we also record the dimension of the FSL array
/// as that will impact the next layer
#[derive(Clone, Debug)]
struct FslDesc {
    validity: Option<BooleanBuffer>,
    dimension: usize,
    num_values: usize,
}

// As we build up rep/def from arrow arrays we record a
// series of RawRepDef objects.  Each one corresponds to a layer
// in the array structure
#[derive(Clone, Debug)]
enum RawRepDef {
    Offsets(OffsetDesc),
    Validity(ValidityDesc),
    Fsl(FslDesc),
}

impl RawRepDef {
    // Are there any nulls in this layer
    fn has_nulls(&self) -> bool {
        match self {
            Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
            Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
            Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
        }
    }

    // How many values are in this layer
    fn num_values(&self) -> usize {
        match self {
            Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
            Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
            Self::Fsl(FslDesc { num_values, .. }) => *num_values,
        }
    }

    /// How many empty/null lists are in this layer
    fn num_specials(&self) -> usize {
        match self {
            Self::Offsets(OffsetDesc { num_specials, .. }) => *num_specials,
            _ => 0,
        }
    }

    /// How many definition levels do we need for this layer
    fn max_def(&self) -> u16 {
        match self {
            Self::Offsets(OffsetDesc {
                has_empty_lists,
                validity,
                ..
            }) => {
                let mut max_def = 0;
                if *has_empty_lists {
                    max_def += 1;
                }
                if validity.is_some() {
                    max_def += 1;
                }
                max_def
            }
            Self::Validity(ValidityDesc { validity: None, .. }) => 0,
            Self::Validity(ValidityDesc { .. }) => 1,
            Self::Fsl(FslDesc { validity: None, .. }) => 0,
            Self::Fsl(FslDesc { .. }) => 1,
        }
    }

    /// How many repetition levels do we need for this layer
    fn max_rep(&self) -> u16 {
        match self {
            Self::Offsets(_) => 1,
            _ => 0,
        }
    }
}

/// Represents repetition and definition levels that have been
/// serialized into a pair of (optional) level buffers
#[derive(Debug)]
pub struct SerializedRepDefs {
    /// The repetition levels, one per item
    ///
    /// If None, there are no lists
    pub repetition_levels: Option<Arc<[u16]>>,
    /// The definition levels, one per item
    ///
    /// If None, there are no nulls
    pub definition_levels: Option<Arc<[u16]>>,
    /// The meaning of each definition level
    pub def_meaning: Vec<DefinitionInterpretation>,
    /// The maximum level that is "visible" from the lowest level
    ///
    /// This is the last level before we encounter a list level of some kind.  Once we've
    /// hit a list level then nulls in any level beyond do not map to actual items.
    ///
    /// This is None if there are no lists
    pub max_visible_level: Option<u16>,
    has_fsl: bool,
}

impl SerializedRepDefs {
    fn max_visible_level(def_meaning: &[DefinitionInterpretation]) -> Option<u16> {
        let first_list = def_meaning.iter().position(|level| level.is_list());
        first_list.map(|first_list| {
            def_meaning
                .iter()
                .map(|level| level.num_def_levels())
                .take(first_list)
                .sum::<u16>()
        })
    }

    pub fn new(
        repetition_levels: Option<LevelBuffer>,
        definition_levels: Option<LevelBuffer>,
        def_meaning: Vec<DefinitionInterpretation>,
    ) -> Self {
        Self::new_with_fixed_size_list_levels(
            repetition_levels,
            definition_levels,
            def_meaning,
            false,
        )
    }

    pub(crate) fn new_with_fixed_size_list_levels(
        repetition_levels: Option<LevelBuffer>,
        definition_levels: Option<LevelBuffer>,
        def_meaning: Vec<DefinitionInterpretation>,
        has_fsl: bool,
    ) -> Self {
        let max_visible_level = Self::max_visible_level(&def_meaning);
        Self {
            repetition_levels: repetition_levels.map(Arc::from),
            definition_levels: definition_levels.map(Arc::from),
            def_meaning,
            max_visible_level,
            has_fsl,
        }
    }

    /// Creates an empty SerializedRepDefs (no repetition, all valid)
    pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
        Self {
            repetition_levels: None,
            definition_levels: None,
            def_meaning,
            max_visible_level: None,
            has_fsl: false,
        }
    }

    pub fn rep_slicer(&self) -> Option<RepDefSlicer<'_>> {
        self.repetition_levels
            .as_ref()
            .map(|rep| RepDefSlicer::new(self, rep.clone()))
    }

    pub fn def_slicer(&self) -> Option<RepDefSlicer<'_>> {
        self.definition_levels
            .as_ref()
            .map(|def| RepDefSlicer::new(self, def.clone()))
    }

    pub(crate) fn has_fixed_size_list_levels(&self) -> bool {
        self.has_fsl
    }
}

/// Slices a level buffer into pieces
///
/// This is needed to handle the fact that a level buffer may have more
/// levels than values due to special (empty/null) lists.
///
/// As a result, a call to `slice_next(10)` may return 10 levels or it may
/// return more than 10 levels if any special values are encountered.
#[derive(Debug)]
pub struct RepDefSlicer<'a> {
    repdef: &'a SerializedRepDefs,
    to_slice: LanceBuffer,
    current: usize,
}

// TODO: All of this logic will need some changing when we compress rep/def levels.
impl<'a> RepDefSlicer<'a> {
    fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
        Self {
            repdef,
            to_slice: LanceBuffer::reinterpret_slice(levels),
            current: 0,
        }
    }

    pub fn num_levels(&self) -> usize {
        self.to_slice.len() / 2
    }

    pub fn num_levels_remaining(&self) -> usize {
        self.num_levels() - self.current
    }

    pub fn all_levels(&self) -> &LanceBuffer {
        &self.to_slice
    }

    /// Returns the rest of the levels not yet sliced
    ///
    /// This must be called instead of `slice_next` on the final iteration.
    /// This is because anytime we slice there may be empty/null lists on the
    /// boundary that are "free" and the current behavior in `slice_next` is to
    /// leave them for the next call.
    ///
    /// `slice_rest` will slice all remaining levels and return them.
    pub fn slice_rest(&mut self) -> LanceBuffer {
        let start = self.current;
        let remaining = self.num_levels_remaining();
        self.current = self.num_levels();
        self.to_slice.slice_with_length(start * 2, remaining * 2)
    }

    /// Returns enough levels to satisfy the next `num_values` values
    pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
        let start = self.current;
        let Some(max_visible_level) = self.repdef.max_visible_level else {
            // No lists, should be 1:1 mapping from levels to values
            self.current = start + num_values;
            return self.to_slice.slice_with_length(start * 2, num_values * 2);
        };
        if let Some(def) = self.repdef.definition_levels.as_ref() {
            // There are lists and there are def levels.  That means there may be
            // more rep/def levels than values.  We need to scan the def levels to figure
            // out which items are "invisible" and skip over them
            let mut def_itr = def[start..].iter();
            let mut num_taken = 0;
            let mut num_passed = 0;
            while num_taken < num_values {
                let def_level = *def_itr.next().unwrap();
                if def_level <= max_visible_level {
                    num_taken += 1;
                }
                num_passed += 1;
            }
            self.current = start + num_passed;
            self.to_slice.slice_with_length(start * 2, num_passed * 2)
        } else {
            // No def levels, should be 1:1 mapping from levels to values
            self.current = start + num_values;
            self.to_slice.slice_with_length(start * 2, num_values * 2)
        }
    }
}

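// Illustrative sketch, not part of the original module: the counting idea behind
// `slice_next`, reduced to plain slices.  A level only counts towards `num_values` when
// it is visible (at or below `max_visible_level`).  The function name is hypothetical
// and, unlike the real slicer, the sketch stops quietly if the levels run out.
#[allow(dead_code)]
fn sketch_levels_for_values(
    def_levels: &[u16],
    max_visible_level: u16,
    num_values: usize,
) -> usize {
    let mut num_taken = 0;
    let mut num_passed = 0;
    for &def in def_levels {
        if num_taken == num_values {
            break;
        }
        if def <= max_visible_level {
            num_taken += 1;
        }
        num_passed += 1;
    }
    num_passed
}

// Invisible levels that follow the last requested value are not consumed, which is why
// the final slice has to pick up everything that remains.
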
/// This tells us how an array handles definition.  Given a stack of
/// these and a nested array and a set of definition levels we can calculate
/// how we should interpret the definition levels.
///
/// For example, if the interpretation is [AllValidItem, NullableItem] then
/// a 0 means "valid item" and a 1 means "null struct".  If the interpretation
/// is [NullableItem, NullableItem] then a 0 means "valid item" and a 1 means
/// "null item" and a 2 means "null struct".
///
/// Lists are tricky because we might use up to two definition levels for a
/// single layer of list nesting because we need one value to indicate "empty list"
/// and another value to indicate "null list".
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DefinitionInterpretation {
    AllValidItem,
    AllValidList,
    NullableItem,
    NullableList,
    EmptyableList,
    NullableAndEmptyableList,
}

impl DefinitionInterpretation {
    /// How many definition levels do we need for this layer
    pub fn num_def_levels(&self) -> u16 {
        match self {
            Self::AllValidItem => 0,
            Self::AllValidList => 0,
            Self::NullableItem => 1,
            Self::NullableList => 1,
            Self::EmptyableList => 1,
            Self::NullableAndEmptyableList => 2,
        }
    }

    /// Is this layer free of nulls (every item or list is valid)?
    pub fn is_all_valid(&self) -> bool {
        matches!(
            self,
            Self::AllValidItem | Self::AllValidList | Self::EmptyableList
        )
    }

    /// Does this layer represent a list?
    pub fn is_list(&self) -> bool {
        matches!(
            self,
            Self::AllValidList
                | Self::NullableList
                | Self::EmptyableList
                | Self::NullableAndEmptyableList
        )
    }
}

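// Illustrative sketch, not part of the original module: zipping struct validity layers
// into definition levels, as described under "Definition Levels" in the module docs.
// The function name is hypothetical.  The sketch assumes every layer is nullable, that
// all layers have the same length, and that there are no lists.
#[allow(dead_code)]
fn sketch_zip_validity(layers_outer_to_inner: &[Vec<bool>]) -> Vec<u16> {
    let num_items = layers_outer_to_inner.first().map_or(0, |layer| layer.len());
    let depth = layers_outer_to_inner.len() as u16;
    (0..num_items)
        .map(|i| {
            // The outer-most null wins.  An item that is valid at every layer gets level 0.
            layers_outer_to_inner
                .iter()
                .position(|layer| !layer[i])
                .map_or(0, |idx| depth - idx as u16)
        })
        .collect()
}

// For the module-doc example (outer 1,0,1,1 / middle 1,?,0,1 / inner 1,?,?,0) any value
// can stand in for the `?` entries, because an outer null hides the layers beneath it.
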
/// The RepDefBuilder is used to collect offsets & validity buffers
/// from arrow structures.  Once we have those we use the SerializerContext
/// to build the actual repetition and definition levels.
///
/// We know ahead of time how many rep/def levels we will need (number of items
/// in inner-most array + the number of empty/null lists in any parent arrays).
///
/// As a result we try and avoid any re-allocations by pre-allocating the buffers
/// up front.  We allocate two copies of each buffer which allows us to avoid unsafe
/// code caused by reading and writing to the same buffer (also, it's unavoidable
/// because there are times we need to write 'faster' than we read)
#[derive(Debug)]
struct SerializerContext {
    // This is built from outer-to-inner and then reversed at the end
    def_meaning: Vec<DefinitionInterpretation>,
    rep_levels: LevelBuffer,
    spare_rep: LevelBuffer,
    def_levels: LevelBuffer,
    spare_def: LevelBuffer,
    current_rep: u16,
    current_def: u16,
    current_len: usize,
    current_num_specials: usize,
    has_fsl: bool,
}

impl SerializerContext {
    fn new(len: usize, num_layers: usize, max_rep: u16, max_def: u16) -> Self {
        let def_meaning = Vec::with_capacity(num_layers);
        Self {
            rep_levels: if max_rep > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            spare_rep: if max_rep > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            def_levels: if max_def > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            spare_def: if max_def > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            def_meaning,
            current_rep: max_rep,
            current_def: max_def,
            current_len: 0,
            current_num_specials: 0,
            has_fsl: false,
        }
    }

    fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
        let def = self.current_def;
        self.current_def -= meaning.num_def_levels();
        self.def_meaning.push(meaning);
        def
    }

    fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
        let rep_level = self.current_rep;
        let (null_list_level, empty_list_level) =
            match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
                (true, true) => {
                    let level =
                        self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
                    (level - 1, level)
                }
                (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
                (false, true) => (
                    0,
                    self.checkout_def(DefinitionInterpretation::EmptyableList),
                ),
                (false, false) => {
                    self.checkout_def(DefinitionInterpretation::AllValidList);
                    (0, 0)
                }
            };
        self.current_rep -= 1;

        if let Some(validity) = &offset_desc.validity {
            self.do_record_validity(validity, null_list_level);
        }

        // We write into the spare buffers and read from the active buffers
        // and then swap at the end.  This way we don't write over what we
        // are reading.

        let mut new_len = 0;
        let expected_len = offset_desc.num_values + self.current_num_specials;
        if expected_len == 0 {
            // Offsets [0] mean no list values, so no levels.
            self.current_len = 0;
            return;
        }
        assert!(self.rep_levels.len() >= expected_len - 1);
        if self.def_levels.is_empty() {
            let mut write_itr = self.spare_rep.iter_mut();
            let mut read_iter = self.rep_levels.iter().copied();
            for w in offset_desc.offsets.windows(2) {
                let len = w[1] - w[0];
                // len can't be 0 because then we'd have def levels
                assert!(len > 0);
                let rep = read_iter.next().unwrap();
                let list_level = if rep == 0 { rep_level } else { rep };
                *write_itr.next().unwrap() = list_level;

                for _ in 1..len {
                    *write_itr.next().unwrap() = 0;
                }
                new_len += len as usize;
            }
            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
        } else {
            assert!(self.def_levels.len() >= expected_len - 1);
            let mut def_write_itr = self.spare_def.iter_mut();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut def_read_itr = self.def_levels.iter().copied();
            let specials_to_pass = self.current_num_specials;
            let mut specials_passed = 0;

            for w in offset_desc.offsets.windows(2) {
                let mut def = def_read_itr.next().unwrap();
                // Copy over any higher-level special values in place
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                    def = def_read_itr.next().unwrap();
                    new_len += 1;
                    specials_passed += 1;
                }

                let len = w[1] - w[0];
                let rep = rep_read_itr.next().unwrap();

                // If the rep_level is 0 then we are the first list level
                // otherwise we are starting a higher level list so keep
                // existing rep level
                let list_level = if rep == 0 { rep_level } else { rep };

                if def == 0 && len > 0 {
                    // New valid list, write a rep level and then add new 0/0 items
                    *def_write_itr.next().unwrap() = 0;
                    *rep_write_itr.next().unwrap() = list_level;

                    for _ in 1..len {
                        *def_write_itr.next().unwrap() = 0;
                        *rep_write_itr.next().unwrap() = 0;
                    }

                    new_len += len as usize;
                } else if def == 0 {
                    // Empty list, insert new special
                    *def_write_itr.next().unwrap() = empty_list_level + SPECIAL_THRESHOLD;
                    *rep_write_itr.next().unwrap() = list_level;
                    new_len += 1;
                } else {
                    // Either the list is null or one of its struct parents
                    // is null.  Promote it to a special value.
                    *def_write_itr.next().unwrap() = def + SPECIAL_THRESHOLD;
                    *rep_write_itr.next().unwrap() = list_level;
                    new_len += 1;
                }
            }

            // If we have any special values at the end, we need to copy them over
            while specials_passed < specials_to_pass {
                *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
                *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                new_len += 1;
                specials_passed += 1;
            }
            std::mem::swap(&mut self.def_levels, &mut self.spare_def);
            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
        }

        self.current_len = new_len;
        self.current_num_specials += offset_desc.num_specials;
    }

681    fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
682        assert!(self.def_levels.len() >= validity.len() + self.current_num_specials);
683        debug_assert!(
684            self.current_len == 0 || self.current_len == validity.len() + self.current_num_specials
685        );
686        self.current_len = validity.len();
687
688        let mut def_read_itr = self.def_levels.iter().copied();
689        let mut def_write_itr = self.spare_def.iter_mut();
690
691        let specials_to_pass = self.current_num_specials;
692        let mut specials_passed = 0;
693
694        for incoming_validity in validity.iter() {
695            let mut def = def_read_itr.next().unwrap();
696            while def > SPECIAL_THRESHOLD {
697                *def_write_itr.next().unwrap() = def;
698                def = def_read_itr.next().unwrap();
699                specials_passed += 1;
700            }
701            if def == 0 && !incoming_validity {
702                *def_write_itr.next().unwrap() = null_level;
703            } else {
704                *def_write_itr.next().unwrap() = def;
705            }
706        }
707
708        while specials_passed < specials_to_pass {
709            *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
710            specials_passed += 1;
711        }
712
713        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
714    }
715
716    fn multiply_levels(&mut self, multiplier: usize) {
717        let old_len = self.current_len;
        // All non-special values will be broadcast by the multiplier.  Special values are copied as-is.
        self.current_len =
            (self.current_len - self.current_num_specials) * multiplier + self.current_num_specials;

        if self.rep_levels.is_empty() && self.def_levels.is_empty() {
            // All valid with no rep/def levels, nothing to do
            return;
        } else if self.rep_levels.is_empty() {
            assert!(self.def_levels.len() >= self.current_len);
            // No rep levels, just multiply the def levels
            let mut def_read_itr = self.def_levels.iter().copied();
            let mut def_write_itr = self.spare_def.iter_mut();
            for _ in 0..old_len {
                let mut def = def_read_itr.next().unwrap();
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    def = def_read_itr.next().unwrap();
                }
                for _ in 0..multiplier {
                    *def_write_itr.next().unwrap() = def;
                }
            }
        } else if self.def_levels.is_empty() {
            assert!(self.rep_levels.len() >= self.current_len);
            // No def levels, just multiply the rep levels
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            for _ in 0..old_len {
                let rep = rep_read_itr.next().unwrap();
                for _ in 0..multiplier {
                    *rep_write_itr.next().unwrap() = rep;
                }
            }
        } else {
            assert!(self.rep_levels.len() >= self.current_len);
            assert!(self.def_levels.len() >= self.current_len);
            // Both rep and def levels: special pairs are copied once, all other pairs are multiplied
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut def_read_itr = self.def_levels.iter().copied();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            let mut def_write_itr = self.spare_def.iter_mut();
            for _ in 0..old_len {
                let mut def = def_read_itr.next().unwrap();
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                    def = def_read_itr.next().unwrap();
                }
                let rep = rep_read_itr.next().unwrap();
                for _ in 0..multiplier {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep;
                }
            }
        }
        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
        std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
    }
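
/*
A minimal standalone sketch of the broadcast performed by the method above, assuming a
hypothetical `broadcast_levels` helper and a stand-in value for `SPECIAL_THRESHOLD` (the
crate's real constant is defined elsewhere and may differ).  Regular levels are repeated
`multiplier` times and special levels are copied once, so the output length is
`(len - num_specials) * multiplier + num_specials`.

```rust
/// Hypothetical stand-in for the crate's `SPECIAL_THRESHOLD` constant.
const SPECIAL_THRESHOLD: u16 = u16::MAX / 2;

/// Repeats every regular level `multiplier` times.  Special levels (those
/// above `SPECIAL_THRESHOLD`) are copied exactly once.
fn broadcast_levels(levels: &[u16], multiplier: usize) -> Vec<u16> {
    let num_specials = levels.iter().filter(|&&l| l > SPECIAL_THRESHOLD).count();
    let mut out = Vec::with_capacity((levels.len() - num_specials) * multiplier + num_specials);
    for &level in levels {
        if level > SPECIAL_THRESHOLD {
            // Special value: copied as-is
            out.push(level);
        } else {
            // Regular value: broadcast by the multiplier
            out.extend(std::iter::repeat(level).take(multiplier));
        }
    }
    out
}
```

With `special = SPECIAL_THRESHOLD + 1`, broadcasting `[0, special, 1]` by 2 gives
`[0, 0, special, 1, 1]`.
*/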

    /// Records an optional validity buffer as a definition layer
    fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
        if let Some(validity) = validity {
            let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
            self.do_record_validity(validity, def_level);
        } else {
            self.checkout_def(DefinitionInterpretation::AllValidItem);
        }
    }

    fn record_validity(&mut self, validity_desc: &ValidityDesc) {
        self.record_validity_buf(&validity_desc.validity)
    }

    /// Records a fixed-size-list layer: its validity is recorded first and then the
    /// non-special levels are broadcast by the list dimension
    fn record_fsl(&mut self, fsl_desc: &FslDesc) {
        self.has_fsl = true;
        self.record_validity_buf(&fsl_desc.validity);
        self.multiply_levels(fsl_desc.dimension);
    }

    /// Shifts every special def level (one above `SPECIAL_THRESHOLD`) down by
    /// `SPECIAL_THRESHOLD`
    fn normalize_specials(&mut self) {
        for def in self.def_levels.iter_mut() {
            if *def > SPECIAL_THRESHOLD {
                *def -= SPECIAL_THRESHOLD;
            }
        }
    }

    fn normalize_specials_and_plan_splits(
        &mut self,
        def_meaning: &[DefinitionInterpretation],
        max_levels_per_page: Option<u64>,
        num_rows: u64,
        num_values: u64,
    ) -> Result<StructuralPagePlan> {
        // Extremely sparse lists can have many rep/def levels for very few
        // visible leaf values.  If this ratio becomes too skewed then a
        // miniblock structural chunk can exceed its packed rep/def metadata
        // budget even though the value buffers are small.  We detect that case
        // while normalizing special def levels and split the structural page on
        // top-level row boundaries so each emitted page stays within the
        // miniblock structural budget.
        if self.def_levels.is_empty() {
            return Ok(StructuralPagePlan::Fits);
        }

        if self.rep_levels.is_empty() {
            self.normalize_specials();
            return Ok(StructuralPagePlan::Fits);
        }

        if self.rep_levels.len() != self.def_levels.len() {
            return Err(Error::internal(format!(
                "Cannot plan structural page splits with mismatched rep/def lengths: rep={}, def={}",
                self.rep_levels.len(),
                self.def_levels.len()
            )));
        }

        let Some(max_levels_per_page) = max_levels_per_page else {
            self.normalize_specials();
            return Ok(StructuralPagePlan::Fits);
        };

        if num_values == 0 {
            self.normalize_specials();
            return Ok(StructuralPagePlan::Fits);
        }

        let max_schema_rep = def_meaning.iter().filter(|level| level.is_list()).count() as u16;
        let max_visible_level = SerializedRepDefs::max_visible_level(def_meaning);
        let should_plan = !self.has_fsl && max_schema_rep > 0 && max_visible_level.is_some();

        if !should_plan {
            self.normalize_specials();
            return Ok(StructuralPagePlan::Fits);
        }

        let max_visible_level = max_visible_level.unwrap();
        let mut splits = Vec::new();
        let mut counted_rows = 0u64;
        let mut counted_values = 0u64;
        let mut saw_structural_overhead = false;
        let mut unsplittable_over_budget = None;

        let mut current_row_level_start = None;
        let mut current_row_num_values = 0u64;

        let mut current_page_row_start = 0u64;
        let mut current_page_num_rows = 0u64;
        let mut current_page_level_start = 0usize;
        let mut current_page_level_end = 0usize;
        let mut current_page_value_start = 0u64;
        let mut current_page_num_values = 0u64;
        let mut current_page_num_levels = 0u64;
        let mut current_page_has_structural_overhead = false;

        let mut finish_row =
            |row_level_start: usize, row_level_end: usize, row_num_values: u64| -> Result<()> {
                let row_num_levels = (row_level_end - row_level_start) as u64;
                let row_has_structural_overhead = row_num_levels > row_num_values;
                saw_structural_overhead |= row_has_structural_overhead;

                if row_has_structural_overhead && row_num_levels > max_levels_per_page {
                    unsplittable_over_budget = Some(row_num_levels);
                }

                if current_page_num_rows > 0
                    && (current_page_has_structural_overhead || row_has_structural_overhead)
                    && current_page_num_levels + row_num_levels > max_levels_per_page
                {
                    splits.push(StructuralPageSplit {
                        row_start: current_page_row_start,
                        num_rows: current_page_num_rows,
                        level_range: current_page_level_start..current_page_level_end,
                        value_start: current_page_value_start,
                        num_values: current_page_num_values,
                    });
                    current_page_row_start = counted_rows;
                    current_page_num_rows = 0;
                    current_page_level_start = row_level_start;
                    current_page_value_start = counted_values;
                    current_page_num_values = 0;
                    current_page_num_levels = 0;
                    current_page_has_structural_overhead = false;
                }

                if current_page_num_rows == 0 {
                    current_page_level_start = row_level_start;
                }
                current_page_num_rows += 1;
                current_page_level_end = row_level_end;
                current_page_num_values += row_num_values;
                current_page_num_levels += row_num_levels;
                current_page_has_structural_overhead |= row_has_structural_overhead;
                counted_rows += 1;
                counted_values += row_num_values;
                Ok(())
            };

        for (idx, (rep_level, def_level)) in self
            .rep_levels
            .iter()
            .copied()
            .zip(self.def_levels.iter_mut())
            .enumerate()
        {
            if *def_level > SPECIAL_THRESHOLD {
                *def_level -= SPECIAL_THRESHOLD;
            }

            if rep_level == max_schema_rep {
                if let Some(level_start) = current_row_level_start {
                    finish_row(level_start, idx, current_row_num_values)?;
                    current_row_num_values = 0;
                } else if idx != 0 {
                    return Err(Error::internal(format!(
                        "Cannot plan structural page splits: first top-level row starts at level {}, expected 0",
                        idx
                    )));
                }
                current_row_level_start = Some(idx);
            }

            if current_row_level_start.is_none() {
                return Err(Error::internal(
                    "Cannot plan structural page splits: found levels before the first top-level row start",
                ));
            }
            if *def_level <= max_visible_level {
                current_row_num_values += 1;
            }
        }

        let Some(level_start) = current_row_level_start else {
            return Err(Error::internal(
                "Cannot plan structural page splits: found no top-level row starts",
            ));
        };
        finish_row(level_start, self.rep_levels.len(), current_row_num_values)?;

        if counted_rows != num_rows {
            return Err(Error::internal(format!(
                "Cannot plan structural page splits: expected {} top-level row starts, found {}",
                num_rows, counted_rows
            )));
        }
        if counted_values != num_values {
            return Err(Error::internal(format!(
                "Cannot plan structural page splits: counted {} visible values, expected {}",
                counted_values, num_values
            )));
        }
        if !saw_structural_overhead {
            return Ok(StructuralPagePlan::Fits);
        }
        if let Some(row_num_levels) = unsplittable_over_budget {
            return Ok(StructuralPagePlan::UnsplittableOverBudget(row_num_levels));
        }

        if current_page_num_rows > 0 {
            splits.push(StructuralPageSplit {
                row_start: current_page_row_start,
                num_rows: current_page_num_rows,
                level_range: current_page_level_start..current_page_level_end,
                value_start: current_page_value_start,
                num_values: current_page_num_values,
            });
        }

        if splits.len() > 1 {
            Ok(StructuralPagePlan::Split(splits))
        } else {
            Ok(StructuralPagePlan::Fits)
        }
    }
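
/*
A simplified standalone sketch of the row-boundary splitting idea used by the method above.
The `plan_row_splits` helper is hypothetical and deliberately ignores the structural-overhead
checks and the value bookkeeping: it only shows the greedy packing of whole rows into pages
under a level budget, with rows never being split.

```rust
use std::ops::Range;

/// Greedily groups consecutive rows into pages so that no page holds more
/// than `max_levels_per_page` levels.  Returns `None` if a single row is
/// already over budget, since a row cannot be split.
fn plan_row_splits(levels_per_row: &[u64], max_levels_per_page: u64) -> Option<Vec<Range<usize>>> {
    let mut pages = Vec::new();
    let mut page_start = 0usize;
    let mut page_levels = 0u64;
    for (row, &row_levels) in levels_per_row.iter().enumerate() {
        if row_levels > max_levels_per_page {
            return None;
        }
        // Close the current page if adding this row would exceed the budget
        if row > page_start && page_levels + row_levels > max_levels_per_page {
            pages.push(page_start..row);
            page_start = row;
            page_levels = 0;
        }
        page_levels += row_levels;
    }
    // Emit the final, possibly partial, page
    if page_start < levels_per_row.len() {
        pages.push(page_start..levels_per_row.len());
    }
    Some(pages)
}
```

For rows with `[3, 3, 3, 1]` levels and a budget of 6 this yields the row ranges `0..2` and
`2..4`.
*/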

    fn build(mut self) -> SerializedRepDefs {
        if self.current_len == 0 {
            return SerializedRepDefs::new_with_fixed_size_list_levels(
                None,
                None,
                self.def_meaning,
                self.has_fsl,
            );
        }

        self.normalize_specials();

        let definition_levels = if self.def_levels.is_empty() {
            None
        } else {
            Some(self.def_levels)
        };
        let repetition_levels = if self.rep_levels.is_empty() {
            None
        } else {
            Some(self.rep_levels)
        };

        // Need to reverse the def meaning since we build rep / def levels in reverse
        let def_meaning = self.def_meaning.into_iter().rev().collect::<Vec<_>>();

        SerializedRepDefs::new_with_fixed_size_list_levels(
            repetition_levels,
            definition_levels,
            def_meaning,
            self.has_fsl,
        )
    }

    fn build_with_structural_plan(
        mut self,
        max_levels_per_page: Option<u64>,
        num_rows: u64,
        num_values: u64,
    ) -> Result<(SerializedRepDefs, StructuralPagePlan)> {
        if self.current_len == 0 {
            return Ok((
                SerializedRepDefs::new_with_fixed_size_list_levels(
                    None,
                    None,
                    self.def_meaning,
                    self.has_fsl,
                ),
                StructuralPagePlan::Fits,
            ));
        }

        // Need to reverse the def meaning since we build rep / def levels in reverse
        let def_meaning = std::mem::take(&mut self.def_meaning)
            .into_iter()
            .rev()
            .collect::<Vec<_>>();
        let plan = self.normalize_specials_and_plan_splits(
            &def_meaning,
            max_levels_per_page,
            num_rows,
            num_values,
        )?;

        let definition_levels = if self.def_levels.is_empty() {
            None
        } else {
            Some(self.def_levels)
        };
        let repetition_levels = if self.rep_levels.is_empty() {
            None
        } else {
            Some(self.rep_levels)
        };

        Ok((
            SerializedRepDefs::new_with_fixed_size_list_levels(
                repetition_levels,
                definition_levels,
                def_meaning,
                self.has_fsl,
            ),
            plan,
        ))
    }
}

/// A structure used to collect validity buffers and offsets from arrow
/// arrays and eventually create repetition and definition levels
///
/// As we are encoding, the structural encoders are given this struct and
/// record the arrow information into it.  Once we hit a leaf node we
/// serialize the data into rep/def levels and write these into the page.
#[derive(Clone, Default, Debug)]
pub struct RepDefBuilder {
    // The rep/def info we have collected so far
    repdefs: Vec<RawRepDef>,
    // The current length, can get larger as we traverse lists (e.g. an
    // array might have 5 lists which results in 50 items)
    //
    // Starts uninitialized until we see the first rep/def item
    len: Option<usize>,
}

impl RepDefBuilder {
    fn check_validity_len(&mut self, incoming_len: usize) {
        if let Some(len) = self.len {
            assert_eq!(incoming_len, len);
        } else {
            // First validity buffer we've seen
            self.len = Some(incoming_len);
        }
    }

    fn num_layers(&self) -> usize {
        self.repdefs.len()
    }

    /// The builder is "empty" if there is no repetition and no nulls.  In this case we don't need
    /// to store anything to disk (except the description)
    pub fn is_empty(&self) -> bool {
        self.repdefs
            .iter()
            .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
    }

    /// Returns true if there is only a single layer of definition
    pub fn is_simple_validity(&self) -> bool {
        self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
    }

    /// Registers a nullable validity bitmap
    pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
        self.check_validity_len(validity.len());
        if validity.null_count() == 0 {
            self.add_no_null(validity.len());
            return;
        }
        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
            num_values: validity.len(),
            validity: Some(validity.into_inner()),
        }));
    }

    /// Registers an all-valid validity layer
    pub fn add_no_null(&mut self, len: usize) {
        self.check_validity_len(len);
        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
            validity: None,
            num_values: len,
        }));
    }

    /// Registers a fixed-size-list layer of `num_values` lists with `dimension` items each
    ///
    /// After this call the current length is `num_values * dimension`.
    pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
        if let Some(len) = self.len {
            assert_eq!(num_values, len);
        }
        self.len = Some(num_values * dimension);
        debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
        self.repdefs.push(RawRepDef::Fsl(FslDesc {
            num_values,
            validity: validity.map(|v| v.into_inner()),
            dimension,
        }))
    }

    // Checks that the offsets cover the current length and then updates the current
    // length to the number of items referenced by the offsets (the last offset)
    fn check_offset_len(&mut self, offsets: &[i64]) {
        if let Some(len) = self.len {
            assert!(offsets.len() == len + 1);
        }
        self.len = Some(offsets[offsets.len() - 1] as usize);
    }

    fn do_add_offsets(
        &mut self,
        lengths: impl Iterator<Item = i64>,
        validity: Option<NullBuffer>,
        capacity: usize,
    ) -> bool {
        let mut num_specials = 0;
        let mut has_empty_lists = false;
        let mut has_garbage_values = false;
        let mut last_off: i64 = 0;

        let mut normalized_offsets = Vec::with_capacity(capacity);
        normalized_offsets.push(0);

        if let Some(ref validity) = validity {
            for (len, is_valid) in lengths.zip(validity.iter()) {
                match (is_valid, len == 0) {
                    (false, is_empty) => {
                        num_specials += 1;
                        has_garbage_values |= !is_empty;
                    }
                    (true, true) => {
                        num_specials += 1;
                        has_empty_lists = true;
                    }
                    _ => {
                        last_off += len;
                    }
                }
                normalized_offsets.push(last_off);
            }
        } else {
            for len in lengths {
                if len == 0 {
                    num_specials += 1;
                    has_empty_lists = true;
                }
                last_off += len;
                normalized_offsets.push(last_off);
            }
        }

        self.check_offset_len(&normalized_offsets);
        self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
            num_values: normalized_offsets.len() - 1,
            offsets: normalized_offsets.into(),
            validity: validity.map(|v| v.into_inner()),
            has_empty_lists,
            num_specials: num_specials as usize,
        }));

        has_garbage_values
    }

    /// Adds a layer of offsets
    ///
    /// Offsets are cast to a common type (i64) and also normalized.  Null lists are
    /// always represented by a zero-length (identical) pair of offsets and so the caller
    /// should filter out any garbage items before encoding them.  To assist with this the
    /// method will return true if any non-empty null lists were found.
    pub fn add_offsets<O: OffsetSizeTrait>(
        &mut self,
        offsets: OffsetBuffer<O>,
        validity: Option<NullBuffer>,
    ) -> bool {
        let inner = offsets.into_inner();
        let buffer_len = inner.len();

        if O::IS_LARGE {
            let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, buffer_len);
            let lengths = i64_buff.windows(2).map(|off| off[1] - off[0]);
            self.do_add_offsets(lengths, validity, buffer_len)
        } else {
            let i32_buff = ScalarBuffer::<i32>::new(inner.into_inner(), 0, buffer_len);
            let lengths = i32_buff.windows(2).map(|off| (off[1] - off[0]) as i64);
            self.do_add_offsets(lengths, validity, buffer_len)
        }
    }
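
/*
A minimal standalone sketch of the offset normalization described above, assuming a
hypothetical `normalize_offsets` helper that takes plain slices instead of arrow buffers.
Every null list contributes zero length to the rebuilt offsets, and the flag reports whether
any null list was non-empty (i.e. had garbage values behind it).

```rust
/// Rebuilds offsets from list lengths, treating every null list as empty.
/// Returns the normalized offsets and whether any null list was non-empty.
fn normalize_offsets(lengths: &[i64], validity: &[bool]) -> (Vec<i64>, bool) {
    let mut offsets = Vec::with_capacity(lengths.len() + 1);
    offsets.push(0);
    let mut last = 0i64;
    let mut has_garbage = false;
    for (&len, &is_valid) in lengths.iter().zip(validity) {
        if is_valid {
            last += len;
        } else {
            // Null list: any values behind it are garbage and are not counted
            has_garbage |= len != 0;
        }
        offsets.push(last);
    }
    (offsets, has_garbage)
}
```

Lengths `[2, 3, 0, 1]` with validity `[true, false, true, true]` normalize to the offsets
`[0, 2, 2, 2, 3]`, and the flag is `true` because the null list had length 3.
*/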

    // When we are encoding data it arrives in batches.  For each batch we create a RepDefBuilder and collect the
    // various validity buffers and offset buffers from that batch.  Once we have enough batches to write a page we
    // need to take this collection of RepDefBuilders and concatenate them and then serialize them into rep/def levels.
    //
    // TODO: In the future, we may concatenate and serialize at the same time?
    //
    // This method takes care of the concatenation part.  First we collect all of layer 0 from each builder, then we
    // call this method.  Then we collect all of layer 1 from each builder and call this method.  And so on.
    //
    // That means this method should get a collection of `RawRepDef` where each item is the same kind (all validity or
    // all offsets) though the nullability / lengths may be different in each layer.
    fn concat_layers<'a>(
        layers: impl Iterator<Item = &'a RawRepDef>,
        num_layers: usize,
    ) -> RawRepDef {
        enum LayerKind {
            Validity,
            Fsl,
            Offsets,
        }

        // We make two passes through the layers.  The first determines if we need to pay the cost of allocating
        // buffers.  The second pass actually adds the values.
        let mut collected = Vec::with_capacity(num_layers);
        let mut has_nulls = false;
        let mut layer_kind = LayerKind::Validity;
        let mut total_num_specials = 0;
        let mut all_dimension = 0;
        let mut all_has_empty_lists = false;
        let mut all_num_values = 0;
        for layer in layers {
            has_nulls |= layer.has_nulls();
            match layer {
                RawRepDef::Validity(_) => {
                    layer_kind = LayerKind::Validity;
                }
                RawRepDef::Offsets(OffsetDesc {
                    num_specials,
                    has_empty_lists,
                    ..
                }) => {
                    all_has_empty_lists |= *has_empty_lists;
                    layer_kind = LayerKind::Offsets;
                    total_num_specials += num_specials;
                }
                RawRepDef::Fsl(FslDesc { dimension, .. }) => {
                    layer_kind = LayerKind::Fsl;
                    all_dimension = *dimension;
                }
            }
            collected.push(layer);
            all_num_values += layer.num_values();
        }

        // Shortcut if there are no nulls
        if !has_nulls {
            match layer_kind {
                LayerKind::Validity => {
                    return RawRepDef::Validity(ValidityDesc {
                        validity: None,
                        num_values: all_num_values,
                    });
                }
                LayerKind::Fsl => {
                    return RawRepDef::Fsl(FslDesc {
                        validity: None,
                        num_values: all_num_values,
                        dimension: all_dimension,
                    });
                }
                LayerKind::Offsets => {}
            }
        }

        // Only allocate if needed
        let mut validity_builder = if has_nulls {
            BooleanBufferBuilder::new(all_num_values)
        } else {
            BooleanBufferBuilder::new(0)
        };
        let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
            let mut all_offsets = Vec::with_capacity(all_num_values);
            all_offsets.push(0);
            all_offsets
        } else {
            Vec::new()
        };

        for layer in collected {
            match layer {
                RawRepDef::Validity(ValidityDesc {
                    validity: Some(validity),
                    ..
                }) => {
                    validity_builder.append_buffer(validity);
                }
                RawRepDef::Validity(ValidityDesc {
                    validity: None,
                    num_values,
                }) => {
                    validity_builder.append_n(*num_values, true);
                }
                RawRepDef::Fsl(FslDesc {
                    validity,
                    num_values,
                    ..
                }) => {
                    if let Some(validity) = validity {
                        validity_builder.append_buffer(validity);
                    } else {
                        validity_builder.append_n(*num_values, true);
                    }
                }
                RawRepDef::Offsets(OffsetDesc {
                    offsets,
                    validity: Some(validity),
                    has_empty_lists,
                    ..
                }) => {
                    all_has_empty_lists |= has_empty_lists;
                    validity_builder.append_buffer(validity);
                    let last = *all_offsets.last().unwrap();
                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
                }
                RawRepDef::Offsets(OffsetDesc {
                    offsets,
                    validity: None,
                    has_empty_lists,
                    num_values,
                    ..
                }) => {
                    all_has_empty_lists |= has_empty_lists;
                    if has_nulls {
                        validity_builder.append_n(*num_values, true);
                    }
                    let last = *all_offsets.last().unwrap();
                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
                }
            }
        }
        let validity = if has_nulls {
            Some(validity_builder.finish())
        } else {
            None
        };
        match layer_kind {
            LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
                validity,
                num_values: all_num_values,
                dimension: all_dimension,
            }),
            LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
                validity,
                num_values: all_num_values,
            }),
            LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
                offsets: all_offsets.into(),
                validity,
                has_empty_lists: all_has_empty_lists,
                num_values: all_num_values,
                num_specials: total_num_specials,
            }),
        }
    }
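
/*
A minimal standalone sketch of how the offsets of several batches are stitched together in
the second pass above, assuming a hypothetical `concat_offsets` helper over plain vectors.
Each batch starts at 0, so its leading offset is skipped and the remaining offsets are
shifted by the last offset accumulated so far.

```rust
/// Concatenates zero-based offsets buffers into one buffer, shifting each
/// batch by the last offset accumulated so far.
fn concat_offsets(batches: &[Vec<i64>]) -> Vec<i64> {
    let mut all = vec![0i64];
    for offsets in batches {
        let last = *all.last().unwrap();
        // Skip the leading 0 of each batch since it duplicates the previous end
        all.extend(offsets.iter().skip(1).map(|off| off + last));
    }
    all
}
```

Concatenating `[0, 2, 2]` and `[0, 1, 4]` gives `[0, 2, 2, 3, 6]`.
*/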

    /// Converts the validity / offsets buffers that have been gathered so far
    /// into repetition and definition levels
    pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
        Self::serialize_builders(builders).0.build()
    }

    /// Converts gathered structural buffers into rep/def levels and an encode-time plan.
    pub(crate) fn serialize_with_structural_plan(
        builders: Vec<Self>,
        max_levels_for_bits: impl FnOnce(u64) -> u64,
        num_rows: u64,
        num_values: u64,
    ) -> Result<(SerializedRepDefs, StructuralPagePlan)> {
        let (context, bits_per_level) = Self::serialize_builders(builders);
        context.build_with_structural_plan(
            bits_per_level.map(max_levels_for_bits),
            num_rows,
            num_values,
        )
    }

    fn serialize_builders(builders: Vec<Self>) -> (SerializerContext, Option<u64>) {
        assert!(!builders.is_empty());
        if builders.iter().all(|b| b.is_empty()) {
            // No repetition, all-valid
            let def_meaning = builders
                .first()
                .unwrap()
                .repdefs
                .iter()
                .map(|_| DefinitionInterpretation::AllValidItem)
                .collect::<Vec<_>>();
            return (
                SerializerContext {
                    def_meaning,
                    rep_levels: LevelBuffer::default(),
                    spare_rep: LevelBuffer::default(),
                    def_levels: LevelBuffer::default(),
                    spare_def: LevelBuffer::default(),
                    current_rep: 0,
                    current_def: 0,
                    current_len: 0,
                    current_num_specials: 0,
                    has_fsl: false,
                },
                None,
            );
        }

        let num_layers = builders[0].num_layers();
        let combined_layers = (0..num_layers)
            .map(|layer_index| {
                Self::concat_layers(
                    builders.iter().map(|b| &b.repdefs[layer_index]),
                    builders.len(),
                )
            })
            .collect::<Vec<_>>();
        debug_assert!(
            builders
                .iter()
                .all(|b| b.num_layers() == builders[0].num_layers())
        );

        let total_len = combined_layers.last().unwrap().num_values()
            + combined_layers
                .iter()
                .map(|l| l.num_specials())
                .sum::<usize>();
        let max_rep = combined_layers.iter().map(|l| l.max_rep()).sum::<u16>();
        let max_def = combined_layers.iter().map(|l| l.max_def()).sum::<u16>();
        let bits_per_rep = if max_rep > 0 {
            u64::from(u16::BITS - max_rep.leading_zeros())
        } else {
            0
        };
        let bits_per_def = if max_def > 0 {
            u64::from(u16::BITS - max_def.leading_zeros())
        } else {
            0
        };
        let bits_per_level =
            (bits_per_rep + bits_per_def > 0).then_some(bits_per_rep + bits_per_def);

        let mut context = SerializerContext::new(total_len, num_layers, max_rep, max_def);
        for layer in combined_layers.into_iter() {
            match layer {
                RawRepDef::Validity(def) => {
                    context.record_validity(&def);
                }
                RawRepDef::Offsets(rep) => {
                    context.record_offsets(&rep);
                }
                RawRepDef::Fsl(fsl) => {
                    context.record_fsl(&fsl);
                }
            }
        }
        (context, bits_per_level)
    }
}
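
/*
A minimal standalone sketch of the bit-width arithmetic used in `serialize_builders` above.
The `bits_needed` and `bits_per_level` helpers are hypothetical names introduced for
illustration only.  The width of a level is the position of the highest set bit of the
maximum level, and the combined width is `None` when neither rep nor def levels are stored.

```rust
/// Number of bits needed to represent every value in `0..=max_level`.
fn bits_needed(max_level: u16) -> u64 {
    if max_level > 0 {
        u64::from(u16::BITS - max_level.leading_zeros())
    } else {
        0
    }
}

/// Combined width of one rep/def pair, or `None` when neither is stored.
fn bits_per_level(max_rep: u16, max_def: u16) -> Option<u64> {
    let bits = bits_needed(max_rep) + bits_needed(max_def);
    (bits > 0).then_some(bits)
}
```

A maximum rep level of 2 needs 2 bits and a maximum def level of 5 needs 3 bits, so
`bits_per_level(2, 5)` is `Some(5)`.
*/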

/// Starts with serialized repetition and definition levels and unravels
/// them into validity buffers and offsets buffers
///
/// This is used during decoding to create the necessary arrow structures
#[derive(Debug)]
pub struct RepDefUnraveler {
    rep_levels: Option<LevelBuffer>,
    def_levels: Option<LevelBuffer>,
    // Maps from definition level to the rep level at which that definition level is visible
    levels_to_rep: Vec<u16>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Current definition level to compare to.
    current_def_cmp: u16,
    // Current rep level, determines which specials we can see
    current_rep_cmp: u16,
    // Current layer index, 0 means inner-most layer and it counts up from there.  Used to index
    // into special_defs
    current_layer: usize,
    // Number of items in the inner-most layer (needed if the definition levels are not present)
    num_items: u64,
}

impl RepDefUnraveler {
    /// Creates a new unraveler from serialized repetition and definition information
    pub fn new(
        rep_levels: Option<LevelBuffer>,
        def_levels: Option<LevelBuffer>,
        def_meaning: Arc<[DefinitionInterpretation]>,
        num_items: u64,
    ) -> Self {
        let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
        let mut rep_counter = 0;
        // Level=0 is always visible and means valid item
        levels_to_rep.push(0);
        for meaning in def_meaning.as_ref() {
            match meaning {
                DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
                    // There is no corresponding level, so nothing to put in levels_to_rep
                }
                DefinitionInterpretation::NullableItem => {
                    // Some null structs are not visible at inner rep levels in cases like LIST<STRUCT<LIST<...>>>
                    levels_to_rep.push(rep_counter);
                }
                DefinitionInterpretation::NullableList => {
                    rep_counter += 1;
                    levels_to_rep.push(rep_counter);
                }
                DefinitionInterpretation::EmptyableList => {
                    rep_counter += 1;
                    levels_to_rep.push(rep_counter);
                }
                DefinitionInterpretation::NullableAndEmptyableList => {
                    rep_counter += 1;
                    levels_to_rep.push(rep_counter);
                    levels_to_rep.push(rep_counter);
                }
            }
        }
        Self {
            rep_levels,
            def_levels,
            current_def_cmp: 0,
            current_rep_cmp: 0,
            levels_to_rep,
            current_layer: 0,
            def_meaning,
            num_items,
        }
    }
1580
1581    pub fn is_all_valid(&self) -> bool {
1582        self.def_levels.is_none() || self.def_meaning[self.current_layer].is_all_valid()
1583    }
1584
1585    /// If the current level is a repetition layer then this returns the number of lists
1586    /// at this level.
1587    ///
1588    /// This is not valid to call when the current level is a struct/primitive layer because
1589    /// in some cases there may be no rep or def information to know this.
1590    pub fn max_lists(&self) -> usize {
1591        debug_assert!(
1592            self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
1593        );
1594        self.rep_levels
1595            .as_ref()
1596            // Worst case every rep item is max_rep and a new list
1597            .map(|levels| levels.len())
1598            .unwrap_or(0)
1599    }
1600
1601    /// Unravels a layer of offsets from the unraveler into the given offset width
1602    ///
1603    /// When decoding a list the caller should first unravel the offsets and then
1604    /// unravel the validity (this is the opposite of the order used during encoding)
1605    pub fn unravel_offsets<T: ArrowNativeType>(
1606        &mut self,
1607        offsets: &mut Vec<T>,
1608        validity: Option<&mut BooleanBufferBuilder>,
1609    ) -> Result<()> {
1610        let rep_levels = self
1611            .rep_levels
1612            .as_mut()
1613            .expect("Expected repetition level but data didn't contain repetition");
1614        let valid_level = self.current_def_cmp;
1615        let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
1616            DefinitionInterpretation::NullableList => {
1617                self.current_def_cmp += 1;
1618                (valid_level + 1, 0)
1619            }
1620            DefinitionInterpretation::EmptyableList => {
1621                self.current_def_cmp += 1;
1622                (0, valid_level + 1)
1623            }
1624            DefinitionInterpretation::NullableAndEmptyableList => {
1625                self.current_def_cmp += 2;
1626                (valid_level + 1, valid_level + 2)
1627            }
1628            DefinitionInterpretation::AllValidList => (0, 0),
1629            _ => unreachable!(),
1630        };
1631        self.current_layer += 1;
1632
1633        // This is the highest def level that is still visible.  Once we hit a list then
1634        // we stop looking because any null / empty list (or list masked by a higher level
1635        // null) will not be visible
1636        let mut max_level = null_level.max(empty_level).max(valid_level);
1637        // Anything higher than this (but no higher than max_level) is a null struct masking our
1638        // list.  We will materialize this as a null list.
1639        let upper_null = max_level;
1640        for level in self.def_meaning[self.current_layer..].iter() {
1641            match level {
1642                DefinitionInterpretation::NullableItem => {
1643                    max_level += 1;
1644                }
1645                DefinitionInterpretation::AllValidItem => {}
1646                _ => {
1647                    break;
1648                }
1649            }
1650        }
1651
1652        let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);
1653
1654        // If offsets is empty this is a no-op.  If offsets is not empty that means we already
1655        // added a set of offsets.  For example, we might have added [0, 3, 5] (2 lists).  Now
1656        // say we want to add [0, 1, 4] (2 lists).  We should get [0, 3, 5, 6, 9] (4 lists).  If
1657        // we don't pop here we get [0, 3, 5, 5, 6, 9] which is wrong.
1658        //
1659        // Or, to think about it another way, if every unraveler adds the starting 0 and the trailing
1660        // length then we have N + unravelers.len() values instead of N + 1.
1661        offsets.pop();
1662
1663        let to_offset = |val: usize| {
1664            T::from_usize(val)
1665            .ok_or_else(|| Error::invalid_input("A single batch had more than i32::MAX values and so a large container type is required"))
1666        };
1667        self.current_rep_cmp += 1;
1668        if let Some(def_levels) = &mut self.def_levels {
1669            assert!(rep_levels.len() == def_levels.len());
1670            // It's possible validity is None even if we have def levels.  For example, we might have
1671            // empty lists (which require def levels) but no nulls.
1672            let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
1673                Box::new(|is_valid| validity.append(is_valid))
1674            } else {
1675                Box::new(|_| {})
1676            };
1677            // This is a strange access pattern.  We are iterating over the rep/def levels and
1678            // at the same time writing the rep/def levels.  This means we need both a mutable
1679            // and immutable reference to the rep/def levels.
1680            let mut read_idx = 0;
1681            let mut write_idx = 0;
1682            while read_idx < rep_levels.len() {
1683                // SAFETY: We assert that rep_levels and def_levels have the same
1684                // len and read_idx and write_idx can never go past the end.
1685                unsafe {
1686                    let rep_val = *rep_levels.get_unchecked(read_idx);
1687                    if rep_val != 0 {
1688                        let def_val = *def_levels.get_unchecked(read_idx);
1689                        // Copy over
1690                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1691                        *def_levels.get_unchecked_mut(write_idx) = def_val;
1692                        write_idx += 1;
1693
1694                        if def_val == 0 {
1695                            // This is a valid list
1696                            offsets.push(to_offset(curlen)?);
1697                            curlen += 1;
1698                            push_validity(true);
1699                        } else if def_val > max_level {
1700                            // This is not visible at this rep level, do not add to offsets, but keep in repdef
1701                        } else if def_val == null_level || def_val > upper_null {
1702                            // This is a null list (or a list masked by a null struct)
1703                            offsets.push(to_offset(curlen)?);
1704                            push_validity(false);
1705                        } else if def_val == empty_level {
1706                            // This is an empty list
1707                            offsets.push(to_offset(curlen)?);
1708                            push_validity(true);
1709                        } else {
1710                            // New valid list starting with null item
1711                            offsets.push(to_offset(curlen)?);
1712                            curlen += 1;
1713                            push_validity(true);
1714                        }
1715                    } else {
1716                        curlen += 1;
1717                    }
1718                    read_idx += 1;
1719                }
1720            }
1721            offsets.push(to_offset(curlen)?);
1722            rep_levels.truncate(write_idx);
1723            def_levels.truncate(write_idx);
1724            Ok(())
1725        } else {
1726            // No def levels means no null or empty lists: every non-zero rep level starts a valid list
1727            let mut read_idx = 0;
1728            let mut write_idx = 0;
1729            let old_offsets_len = offsets.len();
1730            while read_idx < rep_levels.len() {
1731                // SAFETY: read_idx / write_idx cannot go past rep_levels.len()
1732                unsafe {
1733                    let rep_val = *rep_levels.get_unchecked(read_idx);
1734                    if rep_val != 0 {
1735                        // Finish the current list
1736                        offsets.push(to_offset(curlen)?);
1737                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1738                        write_idx += 1;
1739                    }
1740                    curlen += 1;
1741                    read_idx += 1;
1742                }
1743            }
1744            let num_new_lists = offsets.len() - old_offsets_len;
1745            offsets.push(to_offset(curlen)?);
1746            rep_levels.truncate(write_idx);
1747            if let Some(validity) = validity {
1748                // Even though we don't have validity it is possible another unraveler did and so we need
1749                // to push all valids
1750                validity.append_n(num_new_lists, true);
1751            }
1752            Ok(())
1753        }
1754    }
1755
1756    pub fn skip_validity(&mut self) {
1757        debug_assert!(self.is_all_valid());
1758        self.current_layer += 1;
1759    }
1760
1761    /// Unravels a layer of validity from the definition levels
1762    pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
1763        let meaning = self.def_meaning[self.current_layer];
1764        if meaning == DefinitionInterpretation::AllValidItem || self.def_levels.is_none() {
1765            self.current_layer += 1;
1766            validity.append_n(self.num_items as usize, true);
1767            return;
1768        }
1769
1770        self.current_layer += 1;
1771        let def_levels = &self.def_levels.as_ref().unwrap();
1772
1773        let current_def_cmp = self.current_def_cmp;
1774        self.current_def_cmp += 1;
1775
1776        for is_valid in def_levels.iter().filter_map(|&level| {
1777            if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
1778                Some(level <= current_def_cmp)
1779            } else {
1780                None
1781            }
1782        }) {
1783            validity.append(is_valid);
1784        }
1785    }
1786
1787    pub fn decimate(&mut self, dimension: usize) {
1788        if self.rep_levels.is_some() {
1789            // If we need to support this then I think we need to walk through the rep def levels to find
1790            // the spots at which we keep.  E.g. if we have:
1791            //  rep: 1 0 0 1 0 1 0 0 0 1 0 0
1792            //  def: 1 1 1 0 1 0 1 1 0 1 1 0
1793            //  dimension: 2
1794            //
1795            // The output should be:
1796            //  rep: 1 0 0 1 0 0 0
1797            //  def: 1 1 1 0 1 1 0
1798            //
1799            // Maybe there's some special logic for empty/null lists?  I'll save the headache for future me.
1800            todo!("Not yet supported FSL<...List<...>>");
1801        }
1802        let Some(def_levels) = self.def_levels.as_mut() else {
1803            return;
1804        };
1805        let mut read_idx = 0;
1806        let mut write_idx = 0;
1807        while read_idx < def_levels.len() {
1808            unsafe {
1809                *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
1810            }
1811            write_idx += 1;
1812            read_idx += dimension;
1813        }
1814        def_levels.truncate(write_idx);
1815    }
1816}
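
// A minimal sketch of the branch of `unravel_offsets` that runs when there are no definition
// levels. `peel_list_layer` is a hypothetical free function (not part of this module) and it
// uses plain `i32` offsets for simplicity. Each non-zero rep level starts a new list at this
// layer and is kept, decremented by one, for the next layer out; a zero only extends the
// current list.

```rust
/// Hypothetical helper: peels one list layer off `rep` and returns that layer's offsets.
fn peel_list_layer(rep: &mut Vec<u16>) -> Vec<i32> {
    let mut offsets = Vec::with_capacity(rep.len() + 1);
    let mut curlen: i32 = 0;
    let mut write_idx = 0;
    for read_idx in 0..rep.len() {
        let rep_val = rep[read_idx];
        if rep_val != 0 {
            // A new list starts here: record where it begins
            offsets.push(curlen);
            // Keep the entry (one level lower) for the next layer out
            rep[write_idx] = rep_val - 1;
            write_idx += 1;
        }
        // Without def levels every entry is one item of the layer below
        curlen += 1;
    }
    // Trailing offset closes the last list
    offsets.push(curlen);
    rep.truncate(write_idx);
    offsets
}
```

// With the rows [[0, 1], [2]] and [[3, 4]] the rep levels are [2, 0, 1, 2, 0].  Peeling once
// gives the inner offsets and leaves one rep level per inner list; peeling again gives the
// outer offsets.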
1817
1818/// As we decode we may extract rep/def information from multiple pages (or multiple
1819/// chunks within a page).
1820///
1821/// For each chunk we create an unraveler.  Each unraveler can have a completely different
1822/// interpretation (e.g. one page might contain null items but no null structs and the next
1823/// page might have null structs but no null items).
1824///
1825/// Concatenating these unravelers would be tricky and expensive so instead we have a
1826/// composite unraveler which unravels across multiple unravelers.
1827///
1828/// Note: this struct should be used even if there is only one page / unraveler.  This is
1829/// because the `RepDefUnraveler`'s API is more complex (it's meant to be called by this
1830/// struct)
1831#[derive(Debug)]
1832pub struct CompositeRepDefUnraveler {
1833    unravelers: Vec<RepDefUnraveler>,
1834}
1835
1836impl CompositeRepDefUnraveler {
1837    pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
1838        Self { unravelers }
1839    }
1840
1841    /// Unravels a layer of validity
1842    ///
1843    /// Returns None if there are no null items in this layer
1844    pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
1845        let is_all_valid = self
1846            .unravelers
1847            .iter()
1848            .all(|unraveler| unraveler.is_all_valid());
1849
1850        if is_all_valid {
1851            for unraveler in self.unravelers.iter_mut() {
1852                unraveler.skip_validity();
1853            }
1854            None
1855        } else {
1856            let mut validity = BooleanBufferBuilder::new(num_values);
1857            for unraveler in self.unravelers.iter_mut() {
1858                unraveler.unravel_validity(&mut validity);
1859            }
1860            Some(NullBuffer::new(validity.finish()))
1861        }
1862    }
1863
1864    pub fn unravel_fsl_validity(
1865        &mut self,
1866        num_values: usize,
1867        dimension: usize,
1868    ) -> Option<NullBuffer> {
1869        for unraveler in self.unravelers.iter_mut() {
1870            unraveler.decimate(dimension);
1871        }
1872        self.unravel_validity(num_values)
1873    }
1874
1875    /// Unravels a layer of offsets (and the validity for that layer)
1876    pub fn unravel_offsets<T: ArrowNativeType>(
1877        &mut self,
1878    ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
1879        let mut is_all_valid = true;
1880        let mut max_num_lists = 0;
1881        for unraveler in self.unravelers.iter() {
1882            is_all_valid &= unraveler.is_all_valid();
1883            max_num_lists += unraveler.max_lists();
1884        }
1885
1886        let mut validity = if is_all_valid {
1887            None
1888        } else {
1889            // Note: This is probably an over-estimate and potentially even an under-estimate.  We only know
1890            // right now how many items we have and not how many rows.  (TODO: Shouldn't we know the # of rows?)
1891            Some(BooleanBufferBuilder::new(max_num_lists))
1892        };
1893
1894        let mut offsets = Vec::with_capacity(max_num_lists + 1);
1895
1896        for unraveler in self.unravelers.iter_mut() {
1897            unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
1898        }
1899
1900        Ok((
1901            OffsetBuffer::new(ScalarBuffer::from(offsets)),
1902            validity.map(|mut v| NullBuffer::new(v.finish())),
1903        ))
1904    }
1905}
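
// The `offsets.pop()` step in `RepDefUnraveler::unravel_offsets` exists because several
// unravelers append to one shared offsets vector.  The sketch below isolates that idea.
// `append_chunk` is a hypothetical helper (not part of this module) that takes a chunk of
// zero-based offsets rather than rep levels.

```rust
/// Hypothetical helper: appends zero-based `chunk` offsets to the accumulated `offsets`.
fn append_chunk(offsets: &mut Vec<i32>, chunk: &[i32]) {
    // The previous trailing length becomes the start of the first new list, so it must
    // be removed before the rebased chunk (which begins at that same value) is appended.
    let base = offsets.pop().unwrap_or(0);
    offsets.extend(chunk.iter().map(|o| o + base));
}
```

// Appending [0, 1, 4] to [0, 3, 5] this way yields [0, 3, 5, 6, 9] (4 lists) rather than a
// vector with a duplicated 5.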
1906
1907/// A [`ControlWordIterator`] when there are both repetition and definition levels
1908///
1909/// The iterator will put the repetition level in the upper bits and the definition
1910/// level in the lower bits.  The number of bits used for each level is determined
1911/// by the width of the repetition and definition levels.
1912#[derive(Debug)]
1913pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
1914    repdef: I,
1915    def_width: usize,
1916    max_rep: u16,
1917    max_visible_def: u16,
1918    rep_mask: u16,
1919    def_mask: u16,
1920    bits_rep: u8,
1921    bits_def: u8,
1922    phantom: std::marker::PhantomData<W>,
1923}
1924
1925impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
1926    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1927        let next = self.repdef.next()?;
1928        let control_word: u8 =
1929            (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
1930        buf.push(control_word);
1931        let is_new_row = next.0 == self.max_rep;
1932        let is_visible = next.1 <= self.max_visible_def;
1933        let is_valid_item = next.1 == 0;
1934        Some(ControlWordDesc {
1935            is_new_row,
1936            is_visible,
1937            is_valid_item,
1938        })
1939    }
1940}
1941
1942impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
1943    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1944        let next = self.repdef.next()?;
1945        let control_word: u16 =
1946            ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
1947        let control_word = control_word.to_le_bytes();
1948        buf.push(control_word[0]);
1949        buf.push(control_word[1]);
1950        let is_new_row = next.0 == self.max_rep;
1951        let is_visible = next.1 <= self.max_visible_def;
1952        let is_valid_item = next.1 == 0;
1953        Some(ControlWordDesc {
1954            is_new_row,
1955            is_visible,
1956            is_valid_item,
1957        })
1958    }
1959}
1960
1961impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
1962    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1963        let next = self.repdef.next()?;
1964        let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
1965            + ((next.1 & self.def_mask) as u32);
1966        let control_word = control_word.to_le_bytes();
1967        buf.push(control_word[0]);
1968        buf.push(control_word[1]);
1969        buf.push(control_word[2]);
1970        buf.push(control_word[3]);
1971        let is_new_row = next.0 == self.max_rep;
1972        let is_visible = next.1 <= self.max_visible_def;
1973        let is_valid_item = next.1 == 0;
1974        Some(ControlWordDesc {
1975            is_new_row,
1976            is_visible,
1977            is_valid_item,
1978        })
1979    }
1980}
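
// A minimal sketch of the single-byte layout used above: repetition in the upper bits,
// definition in the lower `def_width` bits.  `pack_control_word` and `unpack_control_word`
// are hypothetical names, and the sketch assumes `def_width < 8` and that both levels
// already fit in their allotted bits.

```rust
/// Hypothetical helper: packs a (rep, def) pair into one byte.
fn pack_control_word(rep: u16, def: u16, def_width: u32) -> u8 {
    ((rep as u8) << def_width) + (def as u8)
}

/// Hypothetical helper: splits a byte back into (rep, def).
fn unpack_control_word(word: u8, def_width: u32) -> (u16, u16) {
    let def_mask = (1u8 << def_width) - 1;
    ((word >> def_width) as u16, (word & def_mask) as u16)
}
```

// With a 2-bit definition level, rep = 2 and def = 1 pack to 0b1001.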
1981
1982/// A [`ControlWordIterator`] when there are only definition levels or only repetition levels
1983#[derive(Debug)]
1984pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
1985    repdef: I,
1986    level_mask: u16,
1987    bits_rep: u8,
1988    bits_def: u8,
1989    max_rep: u16,
1990    phantom: std::marker::PhantomData<W>,
1991}
1992
1993impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
1994    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1995        let next = self.repdef.next()?;
1996        buf.push((next & self.level_mask) as u8);
1997        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1998        let is_valid_item = next == 0 || self.bits_def == 0;
1999        Some(ControlWordDesc {
2000            is_new_row,
2001            // Either there is no rep, in which case there are no invisible items
2002            // or there is no def, in which case there are no invisible items
2003            is_visible: true,
2004            is_valid_item,
2005        })
2006    }
2007}
2008
2009impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
2010    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
2011        let next = self.repdef.next()? & self.level_mask;
2012        let control_word = next.to_le_bytes();
2013        buf.push(control_word[0]);
2014        buf.push(control_word[1]);
2015        let is_new_row = self.max_rep == 0 || next == self.max_rep;
2016        let is_valid_item = next == 0 || self.bits_def == 0;
2017        Some(ControlWordDesc {
2018            is_new_row,
2019            is_visible: true,
2020            is_valid_item,
2021        })
2022    }
2023}
2024
2025impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
2026    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
2027        let next = self.repdef.next()?;
2028        let next = (next & self.level_mask) as u32;
2029        let control_word = next.to_le_bytes();
2030        buf.push(control_word[0]);
2031        buf.push(control_word[1]);
2032        buf.push(control_word[2]);
2033        buf.push(control_word[3]);
2034        let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
2035        let is_valid_item = next == 0 || self.bits_def == 0;
2036        Some(ControlWordDesc {
2037            is_new_row,
2038            is_visible: true,
2039            is_valid_item,
2040        })
2041    }
2042}
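
// A minimal sketch of how a two-byte unary control word travels through the byte buffer:
// the level is masked and then written in little-endian order.  `append_level_u16` and
// `read_level_u16` are hypothetical helpers, not functions from this module.

```rust
/// Hypothetical helper: masks `level` and appends it as two little-endian bytes.
fn append_level_u16(buf: &mut Vec<u8>, level: u16, level_mask: u16) {
    buf.extend_from_slice(&(level & level_mask).to_le_bytes());
}

/// Hypothetical helper: reads one little-endian u16 level from the start of `src`.
fn read_level_u16(src: &[u8]) -> u16 {
    u16::from_le_bytes([src[0], src[1]])
}
```

// The low byte is written first, so 0x0102 is stored as the bytes 0x02, 0x01.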
2043
2044/// A [`ControlWordIterator`] when there are no repetition or definition levels
2045#[derive(Debug)]
2046pub struct NilaryControlWordIterator {
2047    len: usize,
2048    idx: usize,
2049}
2050
2051impl NilaryControlWordIterator {
2052    fn append_next(&mut self) -> Option<ControlWordDesc> {
2053        if self.idx == self.len {
2054            None
2055        } else {
2056            self.idx += 1;
2057            Some(ControlWordDesc {
2058                is_new_row: true,
2059                is_visible: true,
2060                is_valid_item: true,
2061            })
2062        }
2063    }
2064}
2065
2066/// Helper function to get a bit mask of the given width
2067fn get_mask(width: u16) -> u16 {
2068    (1 << width) - 1
2069}
2070
2071// We're really going out of our way to avoid boxing here but this will be called on a per-value basis
2072// so it is in the critical path.
2073type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
2074    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
2075    T,
2076>;
2077
2078/// An iterator that generates control words from repetition and definition levels
2079///
2080/// "Control word" is just a fancy term for a single u8/u16/u32 that contains both
2081/// the repetition and definition in it.
2082///
2083/// In the large majority of cases we only need a single byte to represent both the
2084/// repetition and definition levels.  However, if there is deep nesting then we may
2085/// need two bytes.  In the worst case we need 4 bytes, though that implies hundreds of
2086/// levels of nesting, which is unlikely in practice.
2087#[derive(Debug)]
2088pub enum ControlWordIterator<'a> {
2089    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
2090    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
2091    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
2092    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
2093    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
2094    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
2095    Nilary(NilaryControlWordIterator),
2096}
2097
2098/// Describes the properties of a control word
2099#[derive(Debug)]
2100pub struct ControlWordDesc {
2101    pub is_new_row: bool,
2102    pub is_visible: bool,
2103    pub is_valid_item: bool,
2104}
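
// A minimal sketch of how the three flags follow from one (rep, def) pair, mirroring the
// binary iterators above.  `Desc` and `describe` are hypothetical stand-ins for
// `ControlWordDesc` and the `append_next` methods; nothing is written to a buffer here.

```rust
#[derive(Debug, PartialEq)]
struct Desc {
    is_new_row: bool,
    is_visible: bool,
    is_valid_item: bool,
}

/// Hypothetical helper: derives the flags for one (rep, def) pair.
fn describe(rep: u16, def: u16, max_rep: u16, max_visible_def: u16) -> Desc {
    Desc {
        // A maxed-out repetition level starts a new outer-most list, i.e. a new row
        is_new_row: rep == max_rep,
        // Definition levels above the visible maximum have no slot in the items
        is_visible: def <= max_visible_def,
        // Definition level 0 always means a valid item
        is_valid_item: def == 0,
    }
}
```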
2105
2106impl ControlWordIterator<'_> {
2107    /// Appends the next control word to the buffer
2108    ///
2109    /// Returns a description of the appended control word, or `None` once the levels are exhausted
2110    pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
2111        match self {
2112            Self::Binary8(iter) => iter.append_next(buf),
2113            Self::Binary16(iter) => iter.append_next(buf),
2114            Self::Binary32(iter) => iter.append_next(buf),
2115            Self::Unary8(iter) => iter.append_next(buf),
2116            Self::Unary16(iter) => iter.append_next(buf),
2117            Self::Unary32(iter) => iter.append_next(buf),
2118            Self::Nilary(iter) => iter.append_next(),
2119        }
2120    }
2121
2122    /// Returns true if the control word iterator has repetition levels
2123    pub fn has_repetition(&self) -> bool {
2124        match self {
2125            Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
2126            Self::Unary8(iter) => iter.bits_rep > 0,
2127            Self::Unary16(iter) => iter.bits_rep > 0,
2128            Self::Unary32(iter) => iter.bits_rep > 0,
2129            Self::Nilary(_) => false,
2130        }
2131    }
2132
2133    /// Returns the number of bytes per control word
2134    pub fn bytes_per_word(&self) -> usize {
2135        match self {
2136            Self::Binary8(_) => 1,
2137            Self::Binary16(_) => 2,
2138            Self::Binary32(_) => 4,
2139            Self::Unary8(_) => 1,
2140            Self::Unary16(_) => 2,
2141            Self::Unary32(_) => 4,
2142            Self::Nilary(_) => 0,
2143        }
2144    }
2145
2146    /// Returns the number of bits used for the repetition level
2147    pub fn bits_rep(&self) -> u8 {
2148        match self {
2149            Self::Binary8(iter) => iter.bits_rep,
2150            Self::Binary16(iter) => iter.bits_rep,
2151            Self::Binary32(iter) => iter.bits_rep,
2152            Self::Unary8(iter) => iter.bits_rep,
2153            Self::Unary16(iter) => iter.bits_rep,
2154            Self::Unary32(iter) => iter.bits_rep,
2155            Self::Nilary(_) => 0,
2156        }
2157    }
2158
2159    /// Returns the number of bits used for the definition level
2160    pub fn bits_def(&self) -> u8 {
2161        match self {
2162            Self::Binary8(iter) => iter.bits_def,
2163            Self::Binary16(iter) => iter.bits_def,
2164            Self::Binary32(iter) => iter.bits_def,
2165            Self::Unary8(iter) => iter.bits_def,
2166            Self::Unary16(iter) => iter.bits_def,
2167            Self::Unary32(iter) => iter.bits_def,
2168            Self::Nilary(_) => 0,
2169        }
2170    }
2171}
2172
2173/// Builds a [`ControlWordIterator`] from repetition and definition levels
2174/// by first calculating the width needed and then creating the iterator
2175/// with the appropriate width
2176pub fn build_control_word_iterator<'a>(
2177    rep: Option<&'a [u16]>,
2178    max_rep: u16,
2179    def: Option<&'a [u16]>,
2180    max_def: u16,
2181    max_visible_def: u16,
2182    len: usize,
2183) -> ControlWordIterator<'a> {
2184    let rep_width = if max_rep == 0 {
2185        0
2186    } else {
2187        log_2_ceil(max_rep as u32) as u16
2188    };
2189    let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
2190    let def_width = if max_def == 0 {
2191        0
2192    } else {
2193        log_2_ceil(max_def as u32) as u16
2194    };
2195    let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
2196    let total_width = rep_width + def_width;
2197    match (rep, def) {
2198        (Some(rep), Some(def)) => {
2199            let iter = rep.iter().copied().zip(def.iter().copied());
2200            let def_width = def_width as usize;
2201            if total_width <= 8 {
2202                ControlWordIterator::Binary8(BinaryControlWordIterator {
2203                    repdef: iter,
2204                    rep_mask,
2205                    def_mask,
2206                    def_width,
2207                    max_rep,
2208                    max_visible_def,
2209                    bits_rep: rep_width as u8,
2210                    bits_def: def_width as u8,
2211                    phantom: std::marker::PhantomData,
2212                })
2213            } else if total_width <= 16 {
2214                ControlWordIterator::Binary16(BinaryControlWordIterator {
2215                    repdef: iter,
2216                    rep_mask,
2217                    def_mask,
2218                    def_width,
2219                    max_rep,
2220                    max_visible_def,
2221                    bits_rep: rep_width as u8,
2222                    bits_def: def_width as u8,
2223                    phantom: std::marker::PhantomData,
2224                })
2225            } else {
2226                ControlWordIterator::Binary32(BinaryControlWordIterator {
2227                    repdef: iter,
2228                    rep_mask,
2229                    def_mask,
2230                    def_width,
2231                    max_rep,
2232                    max_visible_def,
2233                    bits_rep: rep_width as u8,
2234                    bits_def: def_width as u8,
2235                    phantom: std::marker::PhantomData,
2236                })
2237            }
2238        }
2239        (Some(lev), None) => {
2240            let iter = lev.iter().copied();
2241            if total_width <= 8 {
2242                ControlWordIterator::Unary8(UnaryControlWordIterator {
                    repdef: iter,
                    level_mask: rep_mask,
                    bits_rep: total_width as u8,
                    bits_def: 0,
                    max_rep,
                    phantom: std::marker::PhantomData,
                })
            } else if total_width <= 16 {
                ControlWordIterator::Unary16(UnaryControlWordIterator {
                    repdef: iter,
                    level_mask: rep_mask,
                    bits_rep: total_width as u8,
                    bits_def: 0,
                    max_rep,
                    phantom: std::marker::PhantomData,
                })
            } else {
                ControlWordIterator::Unary32(UnaryControlWordIterator {
                    repdef: iter,
                    level_mask: rep_mask,
                    bits_rep: total_width as u8,
                    bits_def: 0,
                    max_rep,
                    phantom: std::marker::PhantomData,
                })
            }
        }
        (None, Some(lev)) => {
            let iter = lev.iter().copied();
            if total_width <= 8 {
                ControlWordIterator::Unary8(UnaryControlWordIterator {
                    repdef: iter,
                    level_mask: def_mask,
                    bits_rep: 0,
                    bits_def: total_width as u8,
                    max_rep: 0,
                    phantom: std::marker::PhantomData,
                })
            } else if total_width <= 16 {
                ControlWordIterator::Unary16(UnaryControlWordIterator {
                    repdef: iter,
                    level_mask: def_mask,
                    bits_rep: 0,
                    bits_def: total_width as u8,
                    max_rep: 0,
                    phantom: std::marker::PhantomData,
                })
            } else {
                ControlWordIterator::Unary32(UnaryControlWordIterator {
                    repdef: iter,
                    level_mask: def_mask,
                    bits_rep: 0,
                    bits_def: total_width as u8,
                    max_rep: 0,
                    phantom: std::marker::PhantomData,
                })
            }
        }
        (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
    }
}

/// A parser to unwrap control words into repetition and definition levels
///
/// This is the inverse of the [`ControlWordIterator`].
#[derive(Copy, Clone, Debug)]
pub enum ControlWordParser {
    // The first field is the number of bits to shift (the width of the definition
    // level) and the second is the mask to apply.  The mask could be derived from
    // the shift, but it is stored here so it is not recomputed for every word.
    BOTH8(u8, u32),
    BOTH16(u8, u32),
    BOTH32(u8, u32),
    REP8,
    REP16,
    REP32,
    DEF8,
    DEF16,
    DEF32,
    NIL,
}
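/*
A minimal sketch, not part of this module, of the bit layout the `BOTH*`
variants imply: the repetition level sits in the high bits and the definition
level in the low `bits_def` bits.  `low_mask`, `pack`, and `unpack` are
hypothetical names used only for illustration (the module itself relies on
`get_mask`, defined elsewhere), and the sketch assumes `bits_def < 32`.

```rust
// Hypothetical helper: a mask with the lowest `bits` bits set.
fn low_mask(bits: u8) -> u32 {
    if bits >= 32 {
        u32::MAX
    } else {
        (1u32 << bits) - 1
    }
}

// Packs a repetition level above a definition level in a single word.
fn pack(rep: u16, def: u16, bits_def: u8) -> u32 {
    ((rep as u32) << bits_def) | (def as u32 & low_mask(bits_def))
}

// Splits a word back into (rep, def): shift to get rep, mask to get def.
fn unpack(word: u32, bits_def: u8) -> (u16, u16) {
    (
        (word >> bits_def) as u16,
        (word & low_mask(bits_def)) as u16,
    )
}
```

With `bits_def = 2`, packing `rep = 2` and `def = 3` gives the word `0b1011`,
and unpacking that word returns the same pair.
*/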

impl ControlWordParser {
    fn parse_both<const WORD_SIZE: u8>(
        src: &[u8],
        dst_rep: &mut Vec<u16>,
        dst_def: &mut Vec<u16>,
        bits_to_shift: u8,
        mask_to_apply: u32,
    ) {
        match WORD_SIZE {
            1 => {
                let word = src[0];
                let rep = word >> bits_to_shift;
                let def = word & (mask_to_apply as u8);
                dst_rep.push(rep as u16);
                dst_def.push(def as u16);
            }
            2 => {
                let word = u16::from_le_bytes([src[0], src[1]]);
                let rep = word >> bits_to_shift;
                let def = word & mask_to_apply as u16;
                dst_rep.push(rep);
                dst_def.push(def);
            }
            4 => {
                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
                let rep = word >> bits_to_shift;
                let def = word & mask_to_apply;
                dst_rep.push(rep as u16);
                dst_def.push(def as u16);
            }
            _ => unreachable!(),
        }
    }

    fn parse_desc_both<const WORD_SIZE: u8>(
        src: &[u8],
        bits_to_shift: u8,
        mask_to_apply: u32,
        max_rep: u16,
        max_visible_def: u16,
    ) -> ControlWordDesc {
        match WORD_SIZE {
            1 => {
                let word = src[0];
                let rep = word >> bits_to_shift;
                let def = word & (mask_to_apply as u8);
                let is_visible = def as u16 <= max_visible_def;
                let is_new_row = rep as u16 == max_rep;
                let is_valid_item = def == 0;
                ControlWordDesc {
                    is_visible,
                    is_new_row,
                    is_valid_item,
                }
            }
            2 => {
                let word = u16::from_le_bytes([src[0], src[1]]);
                let rep = word >> bits_to_shift;
                let def = word & mask_to_apply as u16;
                let is_visible = def <= max_visible_def;
                let is_new_row = rep == max_rep;
                let is_valid_item = def == 0;
                ControlWordDesc {
                    is_visible,
                    is_new_row,
                    is_valid_item,
                }
            }
            4 => {
                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
                let rep = word >> bits_to_shift;
                let def = word & mask_to_apply;
                let is_visible = def as u16 <= max_visible_def;
                let is_new_row = rep as u16 == max_rep;
                let is_valid_item = def == 0;
                ControlWordDesc {
                    is_visible,
                    is_new_row,
                    is_valid_item,
                }
            }
            _ => unreachable!(),
        }
    }

    fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
        match WORD_SIZE {
            1 => {
                let word = src[0];
                dst.push(word as u16);
            }
            2 => {
                let word = u16::from_le_bytes([src[0], src[1]]);
                dst.push(word);
            }
            4 => {
                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
                dst.push(word as u16);
            }
            _ => unreachable!(),
        }
    }

    fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
        match WORD_SIZE {
            1 => ControlWordDesc {
                is_new_row: src[0] as u16 == max_rep,
                is_visible: true,
                is_valid_item: true,
            },
            2 => ControlWordDesc {
                is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
                is_visible: true,
                is_valid_item: true,
            },
            4 => ControlWordDesc {
                is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
                is_visible: true,
                is_valid_item: true,
            },
            _ => unreachable!(),
        }
    }

    fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
        match WORD_SIZE {
            1 => ControlWordDesc {
                is_new_row: true,
                is_visible: true,
                is_valid_item: src[0] == 0,
            },
            2 => ControlWordDesc {
                is_new_row: true,
                is_visible: true,
                is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
            },
            4 => ControlWordDesc {
                is_new_row: true,
                is_visible: true,
                is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
            },
            _ => unreachable!(),
        }
    }

    /// Returns the number of bytes per control word
    pub fn bytes_per_word(&self) -> usize {
        match self {
            Self::BOTH8(..) => 1,
            Self::BOTH16(..) => 2,
            Self::BOTH32(..) => 4,
            Self::REP8 => 1,
            Self::REP16 => 2,
            Self::REP32 => 4,
            Self::DEF8 => 1,
            Self::DEF16 => 2,
            Self::DEF32 => 4,
            Self::NIL => 0,
        }
    }

    /// Appends the next control word to the rep & def buffers
    ///
    /// `src` should point at the first byte of the little-endian control word.
    ///
    /// `dst_rep` and `dst_def` are the buffers to append the rep and def levels to.
    /// A buffer is left untouched if the control word does not carry that kind of level.
    pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
        match self {
            Self::BOTH8(bits_to_shift, mask_to_apply) => {
                Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
            }
            Self::BOTH16(bits_to_shift, mask_to_apply) => {
                Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
            }
            Self::BOTH32(bits_to_shift, mask_to_apply) => {
                Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
            }
            Self::REP8 => Self::parse_one::<1>(src, dst_rep),
            Self::REP16 => Self::parse_one::<2>(src, dst_rep),
            Self::REP32 => Self::parse_one::<4>(src, dst_rep),
            Self::DEF8 => Self::parse_one::<1>(src, dst_def),
            Self::DEF16 => Self::parse_one::<2>(src, dst_def),
            Self::DEF32 => Self::parse_one::<4>(src, dst_def),
            Self::NIL => {}
        }
    }

    /// Returns true if the control words contain repetition information
    pub fn has_rep(&self) -> bool {
        match self {
            Self::BOTH8(..)
            | Self::BOTH16(..)
            | Self::BOTH32(..)
            | Self::REP8
            | Self::REP16
            | Self::REP32 => true,
            Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
        }
    }

    /// Parses the control word to inspect its properties without appending to any buffers
    pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
        match self {
            Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
                src,
                *bits_to_shift,
                *mask_to_apply,
                max_rep,
                max_visible_def,
            ),
            Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
                src,
                *bits_to_shift,
                *mask_to_apply,
                max_rep,
                max_visible_def,
            ),
            Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
                src,
                *bits_to_shift,
                *mask_to_apply,
                max_rep,
                max_visible_def,
            ),
            Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
            Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
            Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
            Self::DEF8 => Self::parse_def_desc_one::<1>(src),
            Self::DEF16 => Self::parse_def_desc_one::<2>(src),
            Self::DEF32 => Self::parse_def_desc_one::<4>(src),
            Self::NIL => ControlWordDesc {
                is_new_row: true,
                is_valid_item: true,
                is_visible: true,
            },
        }
    }

    /// Creates a new parser from the number of bits used for the repetition and definition levels
    pub fn new(bits_rep: u8, bits_def: u8) -> Self {
        let total_bits = bits_rep + bits_def;

        enum WordSize {
            One,
            Two,
            Four,
        }

        let word_size = if total_bits <= 8 {
            WordSize::One
        } else if total_bits <= 16 {
            WordSize::Two
        } else {
            WordSize::Four
        };

        match (bits_rep > 0, bits_def > 0, word_size) {
            (false, false, _) => Self::NIL,
            (false, true, WordSize::One) => Self::DEF8,
            (false, true, WordSize::Two) => Self::DEF16,
            (false, true, WordSize::Four) => Self::DEF32,
            (true, false, WordSize::One) => Self::REP8,
            (true, false, WordSize::Two) => Self::REP16,
            (true, false, WordSize::Four) => Self::REP32,
            (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
            (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
            (true, true, WordSize::Four) => {
                Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
            }
        }
    }
}
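/*
A minimal sketch, not part of this module, of how a buffer of control words
can be scanned for row boundaries in the spirit of `parse_desc`: a word starts
a new row when its repetition level equals `max_rep`.  `read_word` and
`count_rows` are hypothetical names, and the sketch assumes a non-zero word
size of 1, 2, or 4 bytes (the `NIL` case, with zero bytes per word, is not
covered).

```rust
// Reads one little-endian control word of `bytes_per_word` bytes (1, 2, or 4).
fn read_word(src: &[u8], bytes_per_word: usize) -> u32 {
    let mut buf = [0u8; 4];
    buf[..bytes_per_word].copy_from_slice(&src[..bytes_per_word]);
    u32::from_le_bytes(buf)
}

// Counts the words whose repetition level equals `max_rep`.
// The repetition level is whatever remains after shifting out `bits_def` bits.
fn count_rows(buf: &[u8], bytes_per_word: usize, bits_def: u8, max_rep: u16) -> usize {
    buf.chunks_exact(bytes_per_word)
        .filter(|w| (read_word(w, bytes_per_word) >> bits_def) as u16 == max_rep)
        .count()
}
```

For one-byte words with `bits_def = 1`, the bytes `[2, 0, 3, 2, 0, 0]` carry
repetition levels `[1, 0, 1, 1, 0, 0]`, so `count_rows` with `max_rep = 1`
reports three rows.
*/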

#[cfg(test)]
mod tests {
    use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};

    use crate::repdef::{
        CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
    };

    use super::RepDefBuilder;

    fn validity(values: &[bool]) -> NullBuffer {
        NullBuffer::from_iter(values.iter().copied())
    }

    fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
        OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
    }

    fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
        OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
    }

    #[test]
    fn test_repdef_empty_offsets() {
        // Empty offsets should serialize without panicking.
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0]), None);
        let repdefs = RepDefBuilder::serialize(vec![builder]);
        assert!(repdefs.repetition_levels.is_none());
        assert!(repdefs.definition_levels.is_none());
    }

    #[test]
    fn test_repdef_basic() {
        // Basic case, rep & def
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 2, 2, 5]),
            Some(validity(&[true, false, true])),
        );
        builder.add_offsets(
            offsets_64(&[0, 1, 3, 5, 5, 9]),
            Some(validity(&[true, true, true, false, true])),
        );
        builder.add_validity_bitmap(validity(&[
            true, true, true, false, false, false, true, true, false,
        ]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
        assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);

        // [[I], [I, I]], NULL, [[NULL, NULL], NULL, [NULL, I, I, NULL]]

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            9,
        )]);

        // Note: validity doesn't exactly round-trip because repdef normalizes some of the
        // redundant validity values
        assert_eq!(
            unraveler.unravel_validity(9),
            Some(validity(&[
                true, true, true, false, false, false, true, true, false
            ]))
        );
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 5, 9]).inner());
        assert_eq!(val, Some(validity(&[true, true, true, false, true])));
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 2, 2, 5]).inner());
        assert_eq!(val, Some(validity(&[true, false, true])));
    }

    #[test]
    fn test_repdef_simple_null_empty_list() {
        let check = |repdefs: SerializedRepDefs, last_def: DefinitionInterpretation| {
            let rep = repdefs.repetition_levels.unwrap();
            let def = repdefs.definition_levels.unwrap();

            assert_eq!([1, 0, 1, 1, 0, 0], *rep);
            assert_eq!([0, 0, 2, 0, 1, 0], *def);
            assert_eq!(
                vec![DefinitionInterpretation::NullableItem, last_def,],
                repdefs.def_meaning
            );
        };

        // Null list and empty list should be serialized mostly the same

        // Null case
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 2, 2, 5]),
            Some(validity(&[true, false, true])),
        );
        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        check(repdefs, DefinitionInterpretation::NullableList);

        // Empty case
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0, 2, 2, 5]), None);
        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        check(repdefs, DefinitionInterpretation::EmptyableList);
    }
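/*
A minimal sketch of the level assignment the test above checks, for a single
list level over nullable items.  `single_level_repdef` is a hypothetical
stand-alone function, not the `RepDefBuilder` implementation, and it folds
null and empty lists into one definition level, which matches the test only
because each case there contains just one of the two.

```rust
// Builds (rep, def) for one list level over nullable items.
// def: 0 = valid item, 1 = null item, 2 = null or empty list.
fn single_level_repdef(
    offsets: &[usize],
    list_valid: &[bool],
    item_valid: &[bool],
) -> (Vec<u16>, Vec<u16>) {
    let mut rep = Vec::new();
    let mut def = Vec::new();
    for (i, w) in offsets.windows(2).enumerate() {
        let (start, end) = (w[0], w[1]);
        if !list_valid[i] || start == end {
            // A null or empty list still occupies one slot in both buffers.
            rep.push(1);
            def.push(2);
            continue;
        }
        for j in start..end {
            // Only the first item of a list marks the start of a new list.
            rep.push(if j == start { 1 } else { 0 });
            def.push(if item_valid[j] { 0 } else { 1 });
        }
    }
    (rep, def)
}
```

Feeding it offsets `[0, 2, 2, 5]`, list validity `[true, false, true]`, and
item validity `[true, true, true, false, true]` reproduces the levels asserted
in `check`.
*/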

    #[test]
    fn test_repdef_empty_list_at_end() {
        // Regression test for a failure we encountered when the last item was an empty list
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 1, 0, 0, 1], *rep);
        assert_eq!([0, 0, 0, 1, 0, 2], *def);
        assert_eq!(
            vec![
                DefinitionInterpretation::NullableItem,
                DefinitionInterpretation::EmptyableList,
            ],
            repdefs.def_meaning
        );
    }

    #[test]
    fn test_repdef_abnormal_nulls() {
        // Null lists are allowed to have non-empty offsets and garbage values,
        // and the add_offsets call should normalize this
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 2, 5, 8]),
            Some(validity(&[true, false, true])),
        );
        // Note: we pass 5 here and not 8.  If add_offsets reports garbage values behind
        // null lists, they should be removed before continuing
        builder.add_no_null(5);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
        assert_eq!([0, 0, 1, 0, 0, 0], *def);

        assert_eq!(
            vec![
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableList,
            ],
            repdefs.def_meaning
        );
    }

    #[test]
    fn test_repdef_fsl() {
        let mut builder = RepDefBuilder::default();
        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
        builder.add_fsl(None, 2, 4);
        builder.add_validity_bitmap(validity(&[
            true, false, true, false, true, false, true, false,
        ]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        assert_eq!(
            vec![
                DefinitionInterpretation::NullableItem,
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableItem
            ],
            repdefs.def_meaning
        );

        assert!(repdefs.repetition_levels.is_none());

        let def = repdefs.definition_levels.unwrap();

        assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            None,
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            8,
        )]);

        assert_eq!(
            unraveler.unravel_validity(8),
            Some(validity(&[
                true, false, true, false, false, false, false, false
            ]))
        );
        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
        assert_eq!(
            unraveler.unravel_fsl_validity(2, 2),
            Some(validity(&[true, false]))
        );
    }

    #[test]
    fn test_repdef_fsl_allvalid_item() {
        let mut builder = RepDefBuilder::default();
        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
        builder.add_fsl(None, 2, 4);
        builder.add_no_null(8);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        assert_eq!(
            vec![
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableItem
            ],
            repdefs.def_meaning
        );

        assert!(repdefs.repetition_levels.is_none());

        let def = repdefs.definition_levels.unwrap();

        assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            None,
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            8,
        )]);

        assert_eq!(unraveler.unravel_validity(8), None);
        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
        assert_eq!(
            unraveler.unravel_fsl_validity(2, 2),
            Some(validity(&[true, false]))
        );
    }

    #[test]
    fn test_repdef_sliced_offsets() {
        // Sliced lists may have offsets that don't start with zero.  The
        // add_offsets call needs to normalize these to operate correctly.
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[5, 7, 7, 10]),
            Some(validity(&[true, false, true])),
        );
        builder.add_no_null(5);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
        assert_eq!([0, 0, 1, 0, 0, 0], *def);

        assert_eq!(
            vec![
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableList,
            ],
            repdefs.def_meaning
        );
    }
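/*
A minimal sketch of one way offsets could be normalized, assuming the goal is
simply "start at zero, and give null lists zero length".  `normalize_offsets`
is a hypothetical helper and is not how `add_offsets` is necessarily
implemented.  Rebuilding the offsets from list lengths handles both the
sliced case above and the garbage-behind-nulls case exercised by
`test_repdef_abnormal_nulls`.

```rust
// Rebuilds offsets from per-list lengths, treating null lists as empty.
// Because only lengths are used, a non-zero first offset is rebased implicitly.
fn normalize_offsets(offsets: &[i32], valid: &[bool]) -> Vec<i32> {
    let mut out = vec![0];
    let mut len = 0;
    for (w, &is_valid) in offsets.windows(2).zip(valid) {
        if is_valid {
            len += w[1] - w[0];
        }
        out.push(len);
    }
    out
}
```

Both `[5, 7, 7, 10]` and `[0, 2, 5, 8]`, paired with validity
`[true, false, true]`, normalize to `[0, 2, 2, 5]`, which is why the two tests
expect identical levels.
*/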

    #[test]
    fn test_repdef_complex_null_empty() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 4, 4, 4, 6]),
            Some(validity(&[true, false, true, true])),
        );
        builder.add_offsets(
            offsets_32(&[0, 1, 1, 2, 2, 2, 3]),
            Some(validity(&[true, false, true, false, true, true])),
        );
        builder.add_no_null(3);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep);
        assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def);
    }

    #[test]
    fn test_repdef_empty_list_no_null() {
        // Tests when we have some empty lists but no null lists.  This case
        // caused some bugs because we have definition levels but no nulls
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
        builder.add_no_null(6);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            8,
        )]);

        assert_eq!(unraveler.unravel_validity(6), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
        assert_eq!(val, None);
    }

    #[test]
    fn test_repdef_all_valid() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
        builder.add_no_null(9);

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        let rep = repdefs.repetition_levels.unwrap();
        assert!(repdefs.definition_levels.is_none());

        assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            None,
            repdefs.def_meaning.into(),
            9,
        )]);

        assert_eq!(unraveler.unravel_validity(9), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 7, 9]).inner());
        assert_eq!(val, None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 2, 3, 5]).inner());
        assert_eq!(val, None);
    }
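/*
A minimal sketch of how offsets can be rebuilt from repetition levels alone,
as in the all-valid case above where there are no definition levels and hence
no null or empty lists.  `offsets_from_rep` is a hypothetical function, not
the `RepDefUnraveler` implementation; it returns the innermost level first,
mirroring the order in which the test unravels offsets.

```rust
// Rebuilds the offsets of every list level from repetition levels.
// Assumes no null or empty lists, so every word corresponds to one item.
fn offsets_from_rep(rep: &[u16], max_rep: u16) -> Vec<Vec<i32>> {
    let levels = max_rep as usize;
    let mut offsets: Vec<Vec<i32>> = vec![Vec::new(); levels];
    // counts[0] is the number of items seen so far; counts[k] is the number
    // of lists seen so far at level k, where level 1 is the innermost list.
    let mut counts = vec![0i32; levels + 1];
    for &r in rep {
        // A word with level r opens a list at every level from r down to 1.
        for level in (1..=r as usize).rev() {
            offsets[level - 1].push(counts[level - 1]);
            counts[level] += 1;
        }
        counts[0] += 1;
    }
    // Close each level with its final length.
    for (level, off) in offsets.iter_mut().enumerate() {
        off.push(counts[level]);
    }
    offsets
}
```

Applied to `[2, 1, 0, 2, 0, 2, 0, 1, 0]` with `max_rep = 2`, it yields
`[0, 1, 3, 5, 7, 9]` for the inner lists and `[0, 2, 3, 5]` for the outer
lists, the same offsets the test asserts.
*/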

    #[test]
    fn test_only_empty_lists() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
        builder.add_no_null(6);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            8,
        )]);

        assert_eq!(unraveler.unravel_validity(6), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
        assert_eq!(val, None);
    }

    #[test]
    fn test_only_null_lists() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 4, 4, 4, 6]),
            Some(validity(&[true, false, false, true])),
        );
        builder.add_no_null(6);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            8,
        )]);

        assert_eq!(unraveler.unravel_validity(6), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
        assert_eq!(val, Some(validity(&[true, false, false, true])));
    }

    #[test]
    fn test_null_and_empty_lists() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 4, 4, 4, 6]),
            Some(validity(&[true, false, true, true])),
        );
        builder.add_no_null(6);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
        assert_eq!([0, 0, 0, 0, 1, 2, 0, 0], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            8,
        )]);

        assert_eq!(unraveler.unravel_validity(6), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
        assert_eq!(val, Some(validity(&[true, false, true, true])));
    }

    #[test]
    fn test_repdef_null_struct_valid_list() {
        // Regression test for a bug

        let rep = vec![1, 0, 0, 0];
        let def = vec![2, 0, 2, 2];
        // AllValidList<NullableStruct<NullableItem>>
        let def_meaning = vec![
            DefinitionInterpretation::NullableItem,
            DefinitionInterpretation::NullableItem,
            DefinitionInterpretation::AllValidList,
        ];
        let num_items = 4;

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep),
            Some(def),
            def_meaning.into(),
            num_items,
        )]);

        assert_eq!(
            unraveler.unravel_validity(4),
            Some(validity(&[false, true, false, false]))
        );
        assert_eq!(
            unraveler.unravel_validity(4),
            Some(validity(&[false, true, false, false]))
        );
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 4]).inner());
        assert_eq!(val, None);
    }

    #[test]
    fn test_repdef_no_rep() {
        let mut builder = RepDefBuilder::default();
        builder.add_no_null(5);
        builder.add_validity_bitmap(validity(&[false, false, true, true, true]));
        builder.add_validity_bitmap(validity(&[false, true, true, true, false]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        assert!(repdefs.repetition_levels.is_none());
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 2, 0, 0, 1], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            None,
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            5,
        )]);

        assert_eq!(
            unraveler.unravel_validity(5),
            Some(validity(&[false, false, true, true, false]))
        );
        assert_eq!(
            unraveler.unravel_validity(5),
            Some(validity(&[false, false, true, true, true]))
        );
        assert_eq!(unraveler.unravel_validity(5), None);
    }
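/*
A minimal sketch of how stacked validity bitmaps collapse into a single
definition level per item when there are no lists, which is the situation in
the test above.  `zip_validity` is a hypothetical helper, not the builder's
implementation, and it takes only the nullable layers (an all-valid layer such
as the one added by `add_no_null` contributes no level).

```rust
// Zips validity layers (outermost first) into one definition level per item.
// 0 means valid at every layer; otherwise the level identifies the outermost
// null layer, with the innermost layer being level 1.
fn zip_validity(layers_outer_first: &[&[bool]], len: usize) -> Vec<u16> {
    let n = layers_outer_first.len();
    (0..len)
        .map(|i| {
            layers_outer_first
                .iter()
                .position(|layer| !layer[i])
                .map_or(0, |p| (n - p) as u16)
        })
        .collect()
}
```

Because the outermost null wins, the inner validity of an item under a null
parent never affects the result, which is the "zipping" property described in
the module documentation.
*/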

    #[test]
    fn test_composite_unravel() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 2, 2, 5]),
            Some(validity(&[true, false, true])),
        );
        builder.add_no_null(5);
        let repdef1 = RepDefBuilder::serialize(vec![builder]);

        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
        builder.add_no_null(9);
        let repdef2 = RepDefBuilder::serialize(vec![builder]);

        let rep1 = repdef1.repetition_levels.clone().unwrap();
        let def1 = repdef1.definition_levels.clone().unwrap();
        let rep2 = repdef2.repetition_levels.clone().unwrap();
        assert!(repdef2.definition_levels.is_none());

        assert_eq!([1, 0, 1, 1, 0, 0], *rep1);
        assert_eq!([0, 0, 1, 0, 0, 0], *def1);
        assert_eq!([1, 1, 0, 1, 0, 1, 0, 1, 0], *rep2);

        let unravel1 = RepDefUnraveler::new(
            repdef1.repetition_levels.map(|l| l.to_vec()),
            repdef1.definition_levels.map(|l| l.to_vec()),
            repdef1.def_meaning.into(),
            5,
        );
        let unravel2 = RepDefUnraveler::new(
            repdef2.repetition_levels.map(|l| l.to_vec()),
            repdef2.definition_levels.map(|l| l.to_vec()),
            repdef2.def_meaning.into(),
            9,
        );

        let mut unraveler = CompositeRepDefUnraveler::new(vec![unravel1, unravel2]);

        assert!(unraveler.unravel_validity(9).is_none());
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(
            off.inner(),
            offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]).inner()
        );
        assert_eq!(
            val,
            Some(validity(&[true, false, true, true, true, true, true, true]))
        );
    }
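
    // Illustrative sketch, not the crate's implementation: for a single list layer whose
    // items are all valid, the levels asserted in `test_composite_unravel` are consistent
    // with marking the first entry of every list with rep 1 and giving a null list one
    // placeholder entry with def 1. The helper name `sketch_single_list_levels` is
    // hypothetical; valid empty lists are deliberately left out of this sketch.
    fn sketch_single_list_levels(offsets: &[usize], validity: &[bool]) -> (Vec<u16>, Vec<u16>) {
        let mut rep = Vec::new();
        let mut def = Vec::new();
        for (bounds, valid) in offsets.windows(2).zip(validity) {
            let len = bounds[1] - bounds[0];
            if !*valid {
                // A null list has no items but still needs one entry to record it
                rep.push(1);
                def.push(1);
                continue;
            }
            assert!(len > 0, "valid empty lists are not covered by this sketch");
            for i in 0..len {
                // Only the first item of a list starts a new list
                rep.push(u16::from(i == 0));
                def.push(0);
            }
        }
        (rep, def)
    }

    #[test]
    fn sketch_single_list_levels_matches_first_page() {
        // Same offsets and validity as the first builder in `test_composite_unravel`
        let (rep, def) = sketch_single_list_levels(&[0, 2, 2, 5], &[true, false, true]);
        assert_eq!(rep, vec![1, 0, 1, 1, 0, 0]);
        assert_eq!(def, vec![0, 0, 1, 0, 0, 0]);
    }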

    #[test]
    fn test_repdef_multiple_builders() {
        // Basic case, rep & def
        let mut builder1 = RepDefBuilder::default();
        builder1.add_offsets(offsets_64(&[0, 2]), None);
        builder1.add_offsets(offsets_64(&[0, 1, 3]), None);
        builder1.add_validity_bitmap(validity(&[true, true, true]));

        let mut builder2 = RepDefBuilder::default();
        builder2.add_offsets(offsets_64(&[0, 0, 3]), Some(validity(&[false, true])));
        builder2.add_offsets(
            offsets_64(&[0, 2, 2, 6]),
            Some(validity(&[true, false, true])),
        );
        builder2.add_validity_bitmap(validity(&[false, false, false, true, true, false]));

        let repdefs = RepDefBuilder::serialize(vec![builder1, builder2]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
        assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
    }

    #[test]
    fn test_all_valid_validity_bitmap_serializes_as_no_null() {
        let mut from_bitmap = RepDefBuilder::default();
        from_bitmap.add_validity_bitmap(validity(&[true, true, true, true]));

        let mut from_no_null = RepDefBuilder::default();
        from_no_null.add_no_null(4);

        let from_bitmap = RepDefBuilder::serialize(vec![from_bitmap]);
        let from_no_null = RepDefBuilder::serialize(vec![from_no_null]);

        assert!(from_bitmap.repetition_levels.is_none());
        assert!(from_bitmap.definition_levels.is_none());
        assert_eq!(from_bitmap.def_meaning, from_no_null.def_meaning);
        assert_eq!(
            from_bitmap.max_visible_level,
            from_no_null.max_visible_level
        );
    }

    #[test]
    fn test_slicer() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 2, 2, 30, 30]),
            Some(validity(&[true, false, true, true])),
        );
        builder.add_no_null(30);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let mut rep_slicer = repdefs.rep_slicer().unwrap();

        // First 5 items include a null list so we get 6 levels (12 bytes)
        assert_eq!(rep_slicer.slice_next(5).len(), 12);
        // Next 20 are all plain
        assert_eq!(rep_slicer.slice_next(20).len(), 40);
        // Last 5 include an empty list so we get 6 levels (12 bytes)
        assert_eq!(rep_slicer.slice_rest().len(), 12);

        // Slice the definition levels this time (not the repetition levels again)
        let mut def_slicer = repdefs.def_slicer().unwrap();

        // First 5 items include a null list so we get 6 levels (12 bytes)
        assert_eq!(def_slicer.slice_next(5).len(), 12);
        // Next 20 are all plain
        assert_eq!(def_slicer.slice_next(20).len(), 40);
        // Last 5 include an empty list so we get 6 levels (12 bytes)
        assert_eq!(def_slicer.slice_rest().len(), 12);
    }

    #[test]
    fn test_control_words() {
        // Convert to control words, verify expected, convert back, verify same as original
        fn check(
            rep: &[u16],
            def: &[u16],
            expected_values: Vec<u8>,
            expected_bytes_per_word: usize,
            expected_bits_rep: u8,
            expected_bits_def: u8,
        ) {
            let num_vals = rep.len().max(def.len());
            let max_rep = rep.iter().max().copied().unwrap_or(0);
            let max_def = def.iter().max().copied().unwrap_or(0);

            let in_rep = if rep.is_empty() { None } else { Some(rep) };
            let in_def = if def.is_empty() { None } else { Some(def) };

            let mut iter = super::build_control_word_iterator(
                in_rep,
                max_rep,
                in_def,
                max_def,
                max_def + 1,
                expected_values.len(),
            );
            assert_eq!(iter.bytes_per_word(), expected_bytes_per_word);
            assert_eq!(iter.bits_rep(), expected_bits_rep);
            assert_eq!(iter.bits_def(), expected_bits_def);
            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());

            for _ in 0..num_vals {
                iter.append_next(&mut cw_vec);
            }
            assert!(iter.append_next(&mut cw_vec).is_none());

            assert_eq!(expected_values, cw_vec);

            let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);

            let mut rep_out = Vec::with_capacity(num_vals);
            let mut def_out = Vec::with_capacity(num_vals);

            if expected_bytes_per_word > 0 {
                for slice in cw_vec.chunks_exact(expected_bytes_per_word) {
                    parser.parse(slice, &mut rep_out, &mut def_out);
                }
            }

            assert_eq!(rep, rep_out.as_slice());
            assert_eq!(def, def_out.as_slice());
        }

        // Each will need 4 bits and so we should get 1-byte control words
        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let def = &[5_u16, 3, 1, 2, 12, 15, 0, 2];
        let expected = vec![
            0b00000101, // 0, 5
            0b01110011, // 7, 3
            0b00110001, // 3, 1
            0b00100010, // 2, 2
            0b10011100, // 9, 12
            0b10001111, // 8, 15
            0b11000000, // 12, 0
            0b01010010, // 5, 2
        ];
        check(rep, def, expected, 1, 4, 4);

        // Now we need 5 bits for def so we get 2-byte control words
        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let def = &[5_u16, 3, 1, 2, 12, 22, 0, 2];
        let expected = vec![
            0b00000101, 0b00000000, // 0, 5
            0b11100011, 0b00000000, // 7, 3
            0b01100001, 0b00000000, // 3, 1
            0b01000010, 0b00000000, // 2, 2
            0b00101100, 0b00000001, // 9, 12
            0b00010110, 0b00000001, // 8, 22
            0b10000000, 0b00000001, // 12, 0
            0b10100010, 0b00000000, // 5, 2
        ];
        check(rep, def, expected, 2, 4, 5);

        // Just rep, 4 bits so 1 byte each
        let levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let expected = vec![
            0b00000000, // 0
            0b00000111, // 7
            0b00000011, // 3
            0b00000010, // 2
            0b00001001, // 9
            0b00001000, // 8
            0b00001100, // 12
            0b00000101, // 5
        ];
        check(levels, &[], expected.clone(), 1, 4, 0);

        // Just def
        check(&[], levels, expected, 1, 0, 4);

        // No rep, no def, no bytes
        check(&[], &[], Vec::default(), 0, 0, 0);
    }
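
    // Illustrative sketch, not the crate's implementation: the expected bytes in
    // `test_control_words` are consistent with packing each word as
    // `(rep << bits_def) | def` and writing it little-endian. The helper name
    // `sketch_pack_control_word` is hypothetical, and the sketch assumes the combined
    // width is at most 16 bits (the only widths that test exercises).
    fn sketch_pack_control_word(rep: u16, def: u16, bits_rep: u8, bits_def: u8) -> Vec<u8> {
        let total_bits = u32::from(bits_rep) + u32::from(bits_def);
        assert!(total_bits <= 16, "sketch only covers 0, 1 and 2 byte words");
        // Rep occupies the high bits, def the low bits
        let word = (u32::from(rep) << bits_def) | u32::from(def);
        match total_bits {
            0 => Vec::new(),
            1..=8 => vec![word as u8],
            _ => (word as u16).to_le_bytes().to_vec(),
        }
    }

    #[test]
    fn sketch_control_word_packing() {
        // 4 bits rep + 4 bits def fits in one byte
        assert_eq!(sketch_pack_control_word(9, 12, 4, 4), vec![0b10011100]);
        // 4 bits rep + 5 bits def spills into a second byte
        assert_eq!(
            sketch_pack_control_word(9, 12, 4, 5),
            vec![0b00101100, 0b00000001]
        );
        // Rep only
        assert_eq!(sketch_pack_control_word(12, 0, 4, 0), vec![0b00001100]);
    }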

    #[test]
    fn test_control_words_rep_index() {
        fn check(
            rep: &[u16],
            def: &[u16],
            expected_new_rows: Vec<bool>,
            expected_is_visible: Vec<bool>,
        ) {
            let num_vals = rep.len().max(def.len());
            let max_rep = rep.iter().max().copied().unwrap_or(0);
            let max_def = def.iter().max().copied().unwrap_or(0);

            let in_rep = if rep.is_empty() { None } else { Some(rep) };
            let in_def = if def.is_empty() { None } else { Some(def) };

            let mut iter = super::build_control_word_iterator(
                in_rep,
                max_rep,
                in_def,
                max_def,
                /*max_visible_def=*/ 2,
                expected_new_rows.len(),
            );

            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
            let mut expected_new_rows = expected_new_rows.iter().copied();
            let mut expected_is_visible = expected_is_visible.iter().copied();
            for _ in 0..expected_new_rows.len() {
                let word_desc = iter.append_next(&mut cw_vec).unwrap();
                assert_eq!(word_desc.is_new_row, expected_new_rows.next().unwrap());
                assert_eq!(word_desc.is_visible, expected_is_visible.next().unwrap());
            }
            assert!(iter.append_next(&mut cw_vec).is_none());
        }

        // A rep level of 2 (the max) marks the start of a new row
        let rep = &[2_u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
        // Only the def level above max_visible_def (the 3) matters here: with repetition
        // present, that entry is expected to be invisible
        let def = &[0_u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];

        // Rep & def
        check(
            rep,
            def,
            vec![
                true, false, false, true, true, false, false, false, false, true, false,
            ],
            vec![
                true, true, true, false, true, true, true, true, true, true, true,
            ],
        );
        // Rep only
        check(
            rep,
            &[],
            vec![
                true, false, false, true, true, false, false, false, false, true, false,
            ],
            vec![true; 11],
        );
        // No repetition
        check(
            &[],
            def,
            vec![
                true, true, true, true, true, true, true, true, true, true, true,
            ],
            vec![true; 11],
        );
        // No repetition, no definition
        check(
            &[],
            &[],
            vec![
                true, true, true, true, true, true, true, true, true, true, true,
            ],
            vec![true; 11],
        );
    }
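
    // Illustrative sketch, not the crate's implementation: the flags asserted in
    // `test_control_words_rep_index` are consistent with "new row when rep equals the max
    // rep level" and "visible unless def exceeds max_visible_def", with entries that have
    // no repetition always being new, visible rows. The helper name `sketch_word_flags`
    // is hypothetical; it returns `(is_new_row, is_visible)`.
    fn sketch_word_flags(
        rep: Option<u16>,
        max_rep: u16,
        def: Option<u16>,
        max_visible_def: u16,
    ) -> (bool, bool) {
        match rep {
            // Without repetition every entry is its own row and is always visible
            None => (true, true),
            Some(rep) => {
                let is_new_row = rep == max_rep;
                let is_visible = def.map_or(true, |d| d <= max_visible_def);
                (is_new_row, is_visible)
            }
        }
    }

    #[test]
    fn sketch_word_flags_matches_rep_index_case() {
        // Entry 3 of the "Rep & def" case: starts a row but is not visible
        assert_eq!(sketch_word_flags(Some(2), 2, Some(3), 2), (true, false));
        // Entry 1 of the same case: continues a row and is visible
        assert_eq!(sketch_word_flags(Some(1), 2, Some(0), 2), (false, true));
        // "No repetition" case: always a new, visible row
        assert_eq!(sketch_word_flags(None, 0, Some(3), 2), (true, true));
    }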

    #[test]
    fn regress_empty_list_case() {
        // Regression test for a case where we had 3 null lists inside a struct
        let mut builder = RepDefBuilder::default();
        builder.add_validity_bitmap(validity(&[true, false, true]));
        builder.add_offsets(
            offsets_32(&[0, 0, 0, 0]),
            Some(validity(&[false, false, false])),
        );
        builder.add_no_null(0);

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 1, 1], *rep);
        assert_eq!([1, 2, 1], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            0,
        )]);

        assert_eq!(unraveler.unravel_validity(0), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 0, 0, 0]).inner());
        assert_eq!(val, Some(validity(&[false, false, false])));
        let val = unraveler.unravel_validity(3).unwrap();
        assert_eq!(val.inner(), validity(&[true, false, true]).inner());
    }

    #[test]
    fn regress_list_ends_null_case() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 1, 2, 2]),
            Some(validity(&[true, true, false])),
        );
        builder.add_offsets(offsets_64(&[0, 1, 1]), Some(validity(&[true, false])));
        builder.add_no_null(1);

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 2, 2], *rep);
        assert_eq!([0, 1, 2], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
            1,
        )]);

        assert_eq!(unraveler.unravel_validity(1), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 1]).inner());
        assert_eq!(val, Some(validity(&[true, false])));
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 2, 2]).inner());
        assert_eq!(val, Some(validity(&[true, true, false])));
    }

    #[test]
    fn test_mixed_unraveler() {
        // This tests cases where the validity is different between two different pages
        // because one page has nulls and the other doesn't.

        // Simple case with one layer of validity and no repetition
        let mut unraveler = CompositeRepDefUnraveler::new(vec![
            RepDefUnraveler::new(
                None,
                Some(vec![0, 1, 0, 1]),
                vec![DefinitionInterpretation::NullableItem].into(),
                4,
            ),
            RepDefUnraveler::new(
                None,
                None,
                vec![DefinitionInterpretation::AllValidItem].into(),
                4,
            ),
        ]);

        assert_eq!(
            unraveler.unravel_validity(8),
            Some(validity(&[
                true, false, true, false, true, true, true, true
            ]))
        );

        // More complex case with two layers of validity and repetition
        let def1 = Some(vec![0, 1, 2]);
        let rep1 = Some(vec![1, 0, 1]);

        let def2 = Some(vec![1, 0, 0]);
        let rep2 = Some(vec![1, 1, 0]);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![
            RepDefUnraveler::new(
                rep1,
                def1,
                vec![
                    DefinitionInterpretation::NullableItem,
                    DefinitionInterpretation::EmptyableList,
                ]
                .into(),
                2,
            ),
            RepDefUnraveler::new(
                rep2,
                def2,
                vec![
                    DefinitionInterpretation::AllValidItem,
                    DefinitionInterpretation::NullableList,
                ]
                .into(),
                2,
            ),
        ]);

        assert_eq!(
            unraveler.unravel_validity(4),
            Some(validity(&[true, false, true, true]))
        );
        assert_eq!(
            unraveler.unravel_offsets::<i32>().unwrap(),
            (
                offsets_32(&[0, 2, 2, 2, 4]),
                Some(validity(&[true, true, false, true]))
            )
        );
    }

    #[test]
    fn test_mixed_unraveler_nullable_without_def_levels() {
        // A page can keep nullable layer metadata even when all definition levels are 0
        // and no definition buffer needs to be materialized. This should decode as all-valid.
        let mut unraveler = CompositeRepDefUnraveler::new(vec![
            RepDefUnraveler::new(
                None,
                Some(vec![0, 1, 0, 1]),
                vec![DefinitionInterpretation::NullableItem].into(),
                4,
            ),
            RepDefUnraveler::new(
                None,
                None,
                vec![DefinitionInterpretation::NullableItem].into(),
                4,
            ),
        ]);

        assert_eq!(
            unraveler.unravel_validity(8),
            Some(validity(&[
                true, false, true, false, true, true, true, true
            ]))
        );
    }
}