// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities for rep-def levels
//!
//! Repetition and definition levels are a way to encode multiple validity / offsets arrays
//! into a single buffer.  They are a form of "zipping" buffers together that takes advantage
//! of the fact that, if the outermost array is invalid, then the validity of the inner items
//! is irrelevant.
//!
//! Note: the concept of repetition & definition levels comes from the Dremel paper and has
//! been implemented in Apache Parquet.  However, the implementation here is not necessarily
//! compatible with Parquet.  For example, we use 0 to represent the "inner-most" item and
//! Parquet uses 0 to represent the "outer-most" item.
//!
//! # Repetition Levels
//!
//! With repetition levels we convert a sparse array of offsets into a dense array of levels.
//! These levels are marked non-zero whenever a new list begins.  In other words, given the
//! list array with 3 rows [{<0,1>, <>, <2>}, {<3>}, {}], [], [{<4>}] we would have three
//! offsets arrays:
//!
//! Outer-most ([]): [0, 3, 3, 4]
//! Middle     ({}): [0, 3, 4, 4, 5]
//! Inner      (<>): [0, 2, 2, 3, 4, 5]
//! Values         : [0, 1, 2, 3, 4]
//!
//! We can convert these into repetition levels as follows:
//!
//! | Values | Repetition |
//! | ------ | ---------- |
//! |      0 |          3 | // Start of outer-most list
//! |      1 |          0 | // Continues inner-most list (no new lists)
//! |      - |          1 | // Start of new inner-most list (empty list)
//! |      2 |          1 | // Start of new inner-most list
//! |      3 |          2 | // Start of new middle list
//! |      - |          2 | // Start of new middle list (empty list)
//! |      - |          3 | // Start of new outer-most list (empty list)
//! |      4 |          3 | // Start of new outer-most list
//!
//! Note: We actually have MORE repetition levels than values (8 levels for 5 values in the
//! example above).  This is because the repetition levels need to be able to represent
//! empty lists.
//!
//! # Definition Levels
//!
//! Definition levels are simpler.  We can think of them as zipping together various validity bitmaps
//! (from different levels of nesting) into a single buffer.  For example, we could zip the arrays
//! [1, 1, 0, 0] and [1, 0, 1, 0] into [11, 10, 01, 00].  However, 00 and 01 are redundant.  If the
//! outer level is null then the validity of the inner levels is irrelevant.  To save space we instead
//! encode a "level" which is the "depth" of the null.  Let's look at a more complete example:
//!
//! Array: [{"middle": {"inner": 1}}, NULL, {"middle": NULL}, {"middle": {"inner": NULL}}]
//!
//! In Arrow we would have the following validity arrays:
//! Outer validity : 1, 0, 1, 1
//! Middle validity: 1, ?, 0, 1
//! Inner validity : 1, ?, ?, 0
//! Values         : 1, ?, ?, ?
//!
//! The ? values are undefined in the Arrow format.  We can convert these into definition levels as follows:
//!
//! | Values | Definition |
//! | ------ | ---------- |
//! |      1 |          0 | // Valid at all levels
//! |      - |          3 | // Null at outer level
//! |      - |          2 | // Null at middle level
//! |      - |          1 | // Null at inner level
//!
//! # Compression
//!
//! Note that we only need 2 bits of definition levels to represent 3 levels of nesting.  Definition
//! levels are always more compact than the input validity arrays.  However, compressed levels are not
//! necessarily more compact than the compressed validity arrays.
//!
//! Repetition levels are more complex.  If there are very large lists then a sparse array of offsets
//! (which has one element per list) might be more compact than a dense array of repetition levels
//! (which has one element per list value, possibly even more if there are empty lists).
//!
//! Both repetition levels and definition levels are typically very compressible with RLE.
//!
//! However, in Lance we don't always take advantage of that compression because we want to be able
//! to zip rep-def levels together with our values.  This gives us fewer IOPS when accessing row values.
//!
//! # Utilities in this Module
//!
//! - `RepDefBuilder` - Extracts validity and offset information from Arrow arrays.  We use this as we
//!   shred the incoming data into primitive leaf arrays.  We don't immediately convert into rep-def because
//!   we need to share and cheaply clone the builder when we have structs (each struct child shares some parent
//!   validity / offset information).  The `serialize` method is called once all data has been received to create
//!   the final rep-def levels.
//!
//! - `SerializerContext` - This is an internal utility that helps with serializing rep-def levels.
//!
//! - `CompositeRepDefUnraveler` - This structure is used to reverse the process.  It starts with a set of
//!   rep-def levels and then uses the `unravel_validity` and `unravel_offsets` methods to produce validity
//!   buffers and offset buffers.  It is "composite" because we may be combining sets of rep-def buffers from
//!   multiple locations (e.g. multiple blocks in a mini-block encoded file).
//!
//! - `RepDefSlicer` - This is a utility that helps with slicing rep-def buffers.  These buffers are "kind of"
//!   transparent (they map 1:1 with the values in the array) but not exactly because of the special (empty/null)
//!   lists.  The slicer helps with this issue and is used when slicing a rep-def buffer into mini-blocks.
//!
//! - `build_control_word_iterator` - This takes in rep-def levels and returns an iterator that returns byte-padded
//!   "control words" which are used when creating full-zip encoded data.
//!
//! - `ControlWordParser` - This parser can parse the control words returned by `build_control_word_iterator` and is
//!   used when decoding full-zip encoded data.
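//!
//! # Example
//!
//! A rough sketch of the encoding side for a `List<Int32>` array (the array itself and the
//! page-writing code are assumed / omitted; this is illustrative rather than the exact call
//! sites used by the structural encoders):
//!
//! ```ignore
//! use arrow_array::cast::AsArray;
//! use arrow_array::types::Int32Type;
//!
//! // Record the outer offsets/validity first, then the leaf validity (outer-to-inner).
//! let mut builder = RepDefBuilder::default();
//! builder.add_offsets(list_arr.offsets().clone(), list_arr.nulls().cloned());
//! let values = list_arr.values().as_primitive::<Int32Type>();
//! if let Some(validity) = values.nulls() {
//!     builder.add_validity_bitmap(validity.clone());
//! } else {
//!     builder.add_no_null(values.len());
//! }
//!
//! // One builder per batch; all builders for a page are serialized together.
//! let repdefs = RepDefBuilder::serialize(vec![builder]);
//! let rep = repdefs.repetition_levels; // None if there were no lists
//! let def = repdefs.definition_levels; // None if there were no nulls
//! ```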

use std::{
    iter::{Copied, Zip},
    sync::Arc,
};

use arrow_array::OffsetSizeTrait;
use arrow_buffer::{
    ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
};
use lance_core::{utils::bit::log_2_ceil, Error, Result};
use snafu::location;

use crate::buffer::LanceBuffer;

// We assume 16 bits is good enough for rep-def levels.  This _would_ give
// us 65536 levels of struct nesting and list nesting.  However, we cut that
// in half for SPECIAL_THRESHOLD because we use the top bit to indicate if an
// item is a special value (null list / empty list) during construction.
pub type LevelBuffer = Vec<u16>;

// As we build def levels we add this to special values to indicate that they
// are special so that we can skip over them when processing lower levels.
//
// We subtract this off at the end of construction to get the actual definition
// levels.
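//
// For example, an empty list whose final definition level will be 2 is temporarily recorded
// as 2 + SPECIAL_THRESHOLD so that inner layers skip over it; `normalize_specials` subtracts
// the threshold back off at the end of construction.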
const SPECIAL_THRESHOLD: u16 = u16::MAX / 2;

/// Represents information that we extract from a list array as we are
/// encoding
#[derive(Clone, Debug)]
struct OffsetDesc {
    offsets: Arc<[i64]>,
    validity: Option<BooleanBuffer>,
    has_empty_lists: bool,
    num_values: usize,
    num_specials: usize,
}

/// Represents validity information that we extract from non-list arrays (that
/// have nulls) as we are encoding
#[derive(Clone, Debug)]
struct ValidityDesc {
    validity: Option<BooleanBuffer>,
    num_values: usize,
}

/// Represents validity information that we extract from FSL arrays.  This is
/// just validity (no offsets) but we also record the dimension of the FSL array
/// as that will impact the next layer
#[derive(Clone, Debug)]
struct FslDesc {
    validity: Option<BooleanBuffer>,
    dimension: usize,
    num_values: usize,
}

// As we build up rep/def from arrow arrays we record a
// series of RawRepDef objects.  Each one corresponds to a layer
// in the array structure
#[derive(Clone, Debug)]
enum RawRepDef {
    Offsets(OffsetDesc),
    Validity(ValidityDesc),
    Fsl(FslDesc),
}

impl RawRepDef {
    // Are there any nulls in this layer
    fn has_nulls(&self) -> bool {
        match self {
            Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
            Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
            Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
        }
    }

    // How many values are in this layer
    fn num_values(&self) -> usize {
        match self {
            Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
            Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
            Self::Fsl(FslDesc { num_values, .. }) => *num_values,
        }
    }

    /// How many empty/null lists are in this layer
    fn num_specials(&self) -> usize {
        match self {
            Self::Offsets(OffsetDesc { num_specials, .. }) => *num_specials,
            _ => 0,
        }
    }

    /// How many definition levels do we need for this layer
    fn max_def(&self) -> u16 {
        match self {
            Self::Offsets(OffsetDesc {
                has_empty_lists,
                validity,
                ..
            }) => {
                let mut max_def = 0;
                if *has_empty_lists {
                    max_def += 1;
                }
                if validity.is_some() {
                    max_def += 1;
                }
                max_def
            }
            Self::Validity(ValidityDesc { validity: None, .. }) => 0,
            Self::Validity(ValidityDesc { .. }) => 1,
            Self::Fsl(FslDesc { validity: None, .. }) => 0,
            Self::Fsl(FslDesc { .. }) => 1,
        }
    }

    /// How many repetition levels do we need for this layer
    fn max_rep(&self) -> u16 {
        match self {
            Self::Offsets(_) => 1,
            _ => 0,
        }
    }
}

/// Represents repetition and definition levels that have been
/// serialized into a pair of (optional) level buffers
#[derive(Debug)]
pub struct SerializedRepDefs {
    /// The repetition levels, one per item
    ///
    /// If None, there are no lists
    pub repetition_levels: Option<Arc<[u16]>>,
    /// The definition levels, one per item
    ///
    /// If None, there are no nulls
    pub definition_levels: Option<Arc<[u16]>>,
    /// The meaning of each definition level
    pub def_meaning: Vec<DefinitionInterpretation>,
    /// The maximum level that is "visible" from the lowest level
    ///
    /// This is the last level before we encounter a list level of some kind.  Once we've
    /// hit a list level then nulls in any level beyond do not map to actual items.
    ///
    /// This is None if there are no lists
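    ///
    /// For example, if `def_meaning` is `[NullableItem, NullableList]` then the max visible
    /// level is 1: definition levels 0 (valid) and 1 (null item) correspond to actual items
    /// while level 2 (null list) does not.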
    pub max_visible_level: Option<u16>,
}

impl SerializedRepDefs {
    pub fn new(
        repetition_levels: Option<LevelBuffer>,
        definition_levels: Option<LevelBuffer>,
        def_meaning: Vec<DefinitionInterpretation>,
    ) -> Self {
        let first_list = def_meaning.iter().position(|level| level.is_list());
        let max_visible_level = first_list.map(|first_list| {
            def_meaning
                .iter()
                .map(|level| level.num_def_levels())
                .take(first_list)
                .sum::<u16>()
        });
        Self {
            repetition_levels: repetition_levels.map(Arc::from),
            definition_levels: definition_levels.map(Arc::from),
            def_meaning,
            max_visible_level,
        }
    }

    /// Creates an empty SerializedRepDefs (no repetition, all valid)
    pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
        Self {
            repetition_levels: None,
            definition_levels: None,
            def_meaning,
            max_visible_level: None,
        }
    }

    pub fn rep_slicer(&self) -> Option<RepDefSlicer> {
        self.repetition_levels
            .as_ref()
            .map(|rep| RepDefSlicer::new(self, rep.clone()))
    }

    pub fn def_slicer(&self) -> Option<RepDefSlicer> {
        self.definition_levels
            .as_ref()
            .map(|def| RepDefSlicer::new(self, def.clone()))
    }
}

/// Slices a level buffer into pieces
///
/// This is needed to handle the fact that a level buffer may have more
/// levels than values due to special (empty/null) lists.
///
/// As a result, a call to `slice_next(10)` may return 10 levels or it may
/// return more than 10 levels if any special values are encountered.
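///
/// A rough usage sketch (assuming a `SerializedRepDefs` named `repdefs` and a chunk size of
/// `values_per_chunk`):
///
/// ```ignore
/// if let Some(mut slicer) = repdefs.rep_slicer() {
///     while slicer.num_levels_remaining() > values_per_chunk {
///         let chunk = slicer.slice_next(values_per_chunk);
///         // ...write `chunk` into the current mini-block...
///     }
///     // The final chunk must use slice_rest to pick up any trailing specials
///     let last = slicer.slice_rest();
/// }
/// ```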
#[derive(Debug)]
pub struct RepDefSlicer<'a> {
    repdef: &'a SerializedRepDefs,
    to_slice: LanceBuffer,
    current: usize,
}

// TODO: All of this logic will need some changing when we compress rep/def levels.
impl<'a> RepDefSlicer<'a> {
    fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
        Self {
            repdef,
            to_slice: LanceBuffer::reinterpret_slice(levels),
            current: 0,
        }
    }

    pub fn num_levels(&self) -> usize {
        self.to_slice.len() / 2
    }

    pub fn num_levels_remaining(&self) -> usize {
        self.num_levels() - self.current
    }

    pub fn all_levels(&self) -> &LanceBuffer {
        &self.to_slice
    }

    /// Returns the rest of the levels not yet sliced
    ///
    /// This must be called instead of `slice_next` on the final iteration.
    /// This is because anytime we slice there may be empty/null lists on the
    /// boundary that are "free" and the current behavior in `slice_next` is to
    /// leave them for the next call.
    ///
    /// `slice_rest` will slice all remaining levels and return them.
    pub fn slice_rest(&mut self) -> LanceBuffer {
        let start = self.current;
        let remaining = self.num_levels_remaining();
        self.current = self.num_levels();
        self.to_slice.slice_with_length(start * 2, remaining * 2)
    }

    /// Returns enough levels to satisfy the next `num_values` values
    pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
        let start = self.current;
        let Some(max_visible_level) = self.repdef.max_visible_level else {
            // No lists, should be 1:1 mapping from levels to values
            self.current = start + num_values;
            return self.to_slice.slice_with_length(start * 2, num_values * 2);
        };
        if let Some(def) = self.repdef.definition_levels.as_ref() {
            // There are lists and there are def levels.  That means there may be
            // more rep/def levels than values.  We need to scan the def levels to figure
            // out which items are "invisible" and skip over them
            let mut def_itr = def[start..].iter();
            let mut num_taken = 0;
            let mut num_passed = 0;
            while num_taken < num_values {
                let def_level = *def_itr.next().unwrap();
                if def_level <= max_visible_level {
                    num_taken += 1;
                }
                num_passed += 1;
            }
            self.current = start + num_passed;
            self.to_slice.slice_with_length(start * 2, num_passed * 2)
        } else {
            // No def levels, should be 1:1 mapping from levels to values
            self.current = start + num_values;
            self.to_slice.slice_with_length(start * 2, num_values * 2)
        }
    }
}

/// This tells us how an array handles definition.  Given a stack of
/// these and a nested array and a set of definition levels we can calculate
/// how we should interpret the definition levels.
///
/// For example, if the interpretation is [AllValidItem, NullableItem] then
/// a 0 means "valid item" and a 1 means "null struct".  If the interpretation
/// is [NullableItem, NullableItem] then a 0 means "valid item" and a 1 means
/// "null item" and a 2 means "null struct".
///
/// Lists are tricky because we might use up to two definition levels for a
/// single layer of list nesting because we need one value to indicate "empty list"
/// and another value to indicate "null list".
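///
/// For example, if the interpretation is [NullableItem, NullableAndEmptyableList] then a 0 means
/// "valid item", a 1 means "null item", a 2 means "null list", and a 3 means "empty list".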
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DefinitionInterpretation {
    AllValidItem,
    AllValidList,
    NullableItem,
    NullableList,
    EmptyableList,
    NullableAndEmptyableList,
}

impl DefinitionInterpretation {
    /// How many definition levels do we need for this layer
    pub fn num_def_levels(&self) -> u16 {
        match self {
            Self::AllValidItem => 0,
            Self::AllValidList => 0,
            Self::NullableItem => 1,
            Self::NullableList => 1,
            Self::EmptyableList => 1,
            Self::NullableAndEmptyableList => 2,
        }
    }

    /// Does this layer have nulls?
    pub fn is_all_valid(&self) -> bool {
        matches!(
            self,
            Self::AllValidItem | Self::AllValidList | Self::EmptyableList
        )
    }

    /// Does this layer represent a list?
    pub fn is_list(&self) -> bool {
        matches!(
            self,
            Self::AllValidList
                | Self::NullableList
                | Self::EmptyableList
                | Self::NullableAndEmptyableList
        )
    }
}

/// The RepDefBuilder is used to collect offsets & validity buffers
/// from arrow structures.  Once we have those we use the SerializerContext
/// to build the actual repetition and definition levels.
///
/// We know ahead of time how many rep/def levels we will need (number of items
/// in inner-most array + the number of empty/null lists in any parent arrays).
///
/// As a result we try and avoid any re-allocations by pre-allocating the buffers
/// up front.  We allocate two copies of each buffer which allows us to avoid unsafe
/// code caused by reading and writing to the same buffer (also, it's unavoidable
/// because there are times we need to write 'faster' than we read)
#[derive(Debug)]
struct SerializerContext {
    // This is built from outer-to-inner and then reversed at the end
    def_meaning: Vec<DefinitionInterpretation>,
    rep_levels: LevelBuffer,
    spare_rep: LevelBuffer,
    def_levels: LevelBuffer,
    spare_def: LevelBuffer,
    current_rep: u16,
    current_def: u16,
    current_len: usize,
    current_num_specials: usize,
}

impl SerializerContext {
    fn new(len: usize, num_layers: usize, max_rep: u16, max_def: u16) -> Self {
        let def_meaning = Vec::with_capacity(num_layers);
        Self {
            rep_levels: if max_rep > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            spare_rep: if max_rep > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            def_levels: if max_def > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            spare_def: if max_def > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            def_meaning,
            current_rep: max_rep,
            current_def: max_def,
            current_len: 0,
            current_num_specials: 0,
        }
    }

    fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
        let def = self.current_def;
        self.current_def -= meaning.num_def_levels();
        self.def_meaning.push(meaning);
        def
    }

    fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
        let rep_level = self.current_rep;
        let (null_list_level, empty_list_level) =
            match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
                (true, true) => {
                    let level =
                        self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
                    (level - 1, level)
                }
                (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
                (false, true) => (
                    0,
                    self.checkout_def(DefinitionInterpretation::EmptyableList),
                ),
                (false, false) => {
                    self.checkout_def(DefinitionInterpretation::AllValidList);
                    (0, 0)
                }
            };
        self.current_rep -= 1;

        if let Some(validity) = &offset_desc.validity {
            self.do_record_validity(validity, null_list_level);
        }

        // We write into the spare buffers and read from the active buffers
        // and then swap at the end.  This way we don't write over what we
        // are reading.

        let mut new_len = 0;
        assert!(self.rep_levels.len() >= offset_desc.num_values - 1 + self.current_num_specials);
        if self.def_levels.is_empty() {
            let mut write_itr = self.spare_rep.iter_mut();
            let mut read_iter = self.rep_levels.iter().copied();
            for w in offset_desc.offsets.windows(2) {
                let len = w[1] - w[0];
                // len can't be 0 because then we'd have def levels
                assert!(len > 0);
                let rep = read_iter.next().unwrap();
                let list_level = if rep == 0 { rep_level } else { rep };
                *write_itr.next().unwrap() = list_level;

                for _ in 1..len {
                    *write_itr.next().unwrap() = 0;
                }
                new_len += len as usize;
            }
            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
        } else {
            assert!(
                self.def_levels.len() >= offset_desc.num_values - 1 + self.current_num_specials
            );
            let mut def_write_itr = self.spare_def.iter_mut();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut def_read_itr = self.def_levels.iter().copied();
            let specials_to_pass = self.current_num_specials;
            let mut specials_passed = 0;

            for w in offset_desc.offsets.windows(2) {
                let mut def = def_read_itr.next().unwrap();
                // Copy over any higher-level special values in place
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                    def = def_read_itr.next().unwrap();
                    new_len += 1;
                    specials_passed += 1;
                }

                let len = w[1] - w[0];
                let rep = rep_read_itr.next().unwrap();

                // If the rep_level is 0 then we are the first list level
                // otherwise we are starting a higher level list so keep
                // existing rep level
                let list_level = if rep == 0 { rep_level } else { rep };

                if def == 0 && len > 0 {
                    // New valid list, write a rep level and then add new 0/0 items
                    *def_write_itr.next().unwrap() = 0;
                    *rep_write_itr.next().unwrap() = list_level;

                    for _ in 1..len {
                        *def_write_itr.next().unwrap() = 0;
                        *rep_write_itr.next().unwrap() = 0;
                    }

                    new_len += len as usize;
                } else if def == 0 {
                    // Empty list, insert new special
                    *def_write_itr.next().unwrap() = empty_list_level + SPECIAL_THRESHOLD;
                    *rep_write_itr.next().unwrap() = list_level;
                    new_len += 1;
                } else {
                    // Either the list is null or one of its struct parents
                    // is null.  Promote it to a special value.
                    *def_write_itr.next().unwrap() = def + SPECIAL_THRESHOLD;
                    *rep_write_itr.next().unwrap() = list_level;
                    new_len += 1;
                }
            }

            // If we have any special values at the end, we need to copy them over
            while specials_passed < specials_to_pass {
                *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
                *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                new_len += 1;
                specials_passed += 1;
            }
            std::mem::swap(&mut self.def_levels, &mut self.spare_def);
            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
        }

        self.current_len = new_len;
        self.current_num_specials += offset_desc.num_specials;
    }

    fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
        assert!(self.def_levels.len() >= validity.len() + self.current_num_specials);
        debug_assert!(
            self.current_len == 0 || self.current_len == validity.len() + self.current_num_specials
        );
        self.current_len = validity.len();

        let mut def_read_itr = self.def_levels.iter().copied();
        let mut def_write_itr = self.spare_def.iter_mut();

        let specials_to_pass = self.current_num_specials;
        let mut specials_passed = 0;

        for incoming_validity in validity.iter() {
            let mut def = def_read_itr.next().unwrap();
            while def > SPECIAL_THRESHOLD {
                *def_write_itr.next().unwrap() = def;
                def = def_read_itr.next().unwrap();
                specials_passed += 1;
            }
            if def == 0 && !incoming_validity {
                *def_write_itr.next().unwrap() = null_level;
            } else {
                *def_write_itr.next().unwrap() = def;
            }
        }

        while specials_passed < specials_to_pass {
            *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
            specials_passed += 1;
        }

        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
    }

    fn multiply_levels(&mut self, multiplier: usize) {
        let old_len = self.current_len;
        // All non-special values will be broadcasted by the multiplier.  Special values are copied as-is.
        self.current_len =
            (self.current_len - self.current_num_specials) * multiplier + self.current_num_specials;

        if self.rep_levels.is_empty() && self.def_levels.is_empty() {
            // All valid with no rep/def levels, nothing to do
            return;
        } else if self.rep_levels.is_empty() {
            assert!(self.def_levels.len() >= self.current_len);
            // No rep levels, just multiply the def levels
            let mut def_read_itr = self.def_levels.iter().copied();
            let mut def_write_itr = self.spare_def.iter_mut();
            for _ in 0..old_len {
                let mut def = def_read_itr.next().unwrap();
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    def = def_read_itr.next().unwrap();
                }
                for _ in 0..multiplier {
                    *def_write_itr.next().unwrap() = def;
                }
            }
        } else if self.def_levels.is_empty() {
            assert!(self.rep_levels.len() >= self.current_len);
            // No def levels, just multiply the rep levels
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            for _ in 0..old_len {
                let rep = rep_read_itr.next().unwrap();
                for _ in 0..multiplier {
                    *rep_write_itr.next().unwrap() = rep;
                }
            }
        } else {
            assert!(self.rep_levels.len() >= self.current_len);
            assert!(self.def_levels.len() >= self.current_len);
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut def_read_itr = self.def_levels.iter().copied();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            let mut def_write_itr = self.spare_def.iter_mut();
            for _ in 0..old_len {
                let mut def = def_read_itr.next().unwrap();
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                    def = def_read_itr.next().unwrap();
                }
                let rep = rep_read_itr.next().unwrap();
                for _ in 0..multiplier {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep;
                }
            }
        }
        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
        std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
    }

    fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
        if let Some(validity) = validity {
            let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
            self.do_record_validity(validity, def_level);
        } else {
            self.checkout_def(DefinitionInterpretation::AllValidItem);
        }
    }

    fn record_validity(&mut self, validity_desc: &ValidityDesc) {
        self.record_validity_buf(&validity_desc.validity)
    }

    fn record_fsl(&mut self, fsl_desc: &FslDesc) {
        self.record_validity_buf(&fsl_desc.validity);
        self.multiply_levels(fsl_desc.dimension);
    }

    fn normalize_specials(&mut self) {
        for def in self.def_levels.iter_mut() {
            if *def > SPECIAL_THRESHOLD {
                *def -= SPECIAL_THRESHOLD;
            }
        }
    }

    fn build(mut self) -> SerializedRepDefs {
        if self.current_len == 0 {
            return SerializedRepDefs::new(None, None, self.def_meaning);
        }

        self.normalize_specials();

        let definition_levels = if self.def_levels.is_empty() {
            None
        } else {
            Some(self.def_levels)
        };
        let repetition_levels = if self.rep_levels.is_empty() {
            None
        } else {
            Some(self.rep_levels)
        };

        // Need to reverse the def meaning since we build rep / def levels in reverse
        let def_meaning = self.def_meaning.into_iter().rev().collect::<Vec<_>>();

        SerializedRepDefs::new(repetition_levels, definition_levels, def_meaning)
    }
}

/// A structure used to collect validity buffers and offsets from arrow
/// arrays and eventually create repetition and definition levels
///
/// As we are encoding the structural encoders are given this struct and
/// will record the arrow information into it.  Once we hit a leaf node we
/// serialize the data into rep/def levels and write these into the page.
#[derive(Clone, Default, Debug)]
pub struct RepDefBuilder {
    // The rep/def info we have collected so far
    repdefs: Vec<RawRepDef>,
    // The current length, can get larger as we traverse lists (e.g. an
    // array might have 5 lists which results in 50 items)
    //
    // Starts uninitialized until we see the first rep/def item
    len: Option<usize>,
}

impl RepDefBuilder {
    fn check_validity_len(&mut self, incoming_len: usize) {
        if let Some(len) = self.len {
            assert_eq!(incoming_len, len);
        } else {
            // First validity buffer we've seen
            self.len = Some(incoming_len);
        }
    }

    fn num_layers(&self) -> usize {
        self.repdefs.len()
    }

    /// The builder is "empty" if there is no repetition and no nulls.  In this case we don't need
    /// to store anything to disk (except the description)
    fn is_empty(&self) -> bool {
        self.repdefs
            .iter()
            .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
    }

    /// Returns true if there is only a single layer of definition
    pub fn is_simple_validity(&self) -> bool {
        self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
    }

    /// Registers a nullable validity bitmap
    pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
        self.check_validity_len(validity.len());
        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
            num_values: validity.len(),
            validity: Some(validity.into_inner()),
        }));
    }

    /// Registers an all-valid validity layer
    pub fn add_no_null(&mut self, len: usize) {
        self.check_validity_len(len);
        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
            validity: None,
            num_values: len,
        }));
    }

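    /// Registers a fixed-size-list layer
    ///
    /// The validity (if any) has one bit per list.  The running length is multiplied by
    /// `dimension` since each list contributes `dimension` items to the next layer.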
    pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
        if let Some(len) = self.len {
            assert_eq!(num_values, len);
        }
        self.len = Some(num_values * dimension);
        debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
        self.repdefs.push(RawRepDef::Fsl(FslDesc {
            num_values,
            validity: validity.map(|v| v.into_inner()),
            dimension,
        }))
    }

    fn check_offset_len(&mut self, offsets: &[i64]) {
        if let Some(len) = self.len {
            assert!(offsets.len() == len + 1);
        }
        self.len = Some(offsets[offsets.len() - 1] as usize);
    }

    /// Adds a layer of offsets
    ///
    /// Offsets are cast to a common type (i64) and also normalized.  Null lists are
    /// always represented by a zero-length (identical) pair of offsets and so the caller
    /// should filter out any garbage items before encoding them.  To assist with this the
    /// method will return true if any non-empty null lists were found.
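    ///
    /// For example, offsets [0, 2, 5, 5, 7] with validity [1, 0, 1, 1] are normalized to
    /// [0, 2, 2, 2, 4]: the null list's garbage items are dropped (and this call returns true)
    /// while the empty list keeps an identical pair of offsets.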
    pub fn add_offsets<O: OffsetSizeTrait>(
        &mut self,
        offsets: OffsetBuffer<O>,
        validity: Option<NullBuffer>,
    ) -> bool {
        let mut has_garbage_values = false;
        let mut num_specials = 0;
        if O::IS_LARGE {
            let inner = offsets.into_inner();
            let len = inner.len();
            let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, len);
            let mut normalized = Vec::with_capacity(len);
            normalized.push(0_i64);
            let mut has_empty_lists = false;
            let mut last_off = 0;
            if let Some(validity) = validity.as_ref() {
                for (off, valid) in i64_buff.windows(2).zip(validity.iter()) {
                    let len: i64 = off[1] - off[0];
                    match (valid, len == 0) {
                        (false, is_empty) => {
                            num_specials += 1;
                            has_garbage_values |= !is_empty;
                        }
                        (true, true) => {
                            num_specials += 1;
                            has_empty_lists = true;
                        }
                        _ => {
                            last_off += len;
                        }
                    }
                    normalized.push(last_off);
                }
            } else {
                for off in i64_buff.windows(2) {
                    let len: i64 = off[1] - off[0];
                    if len == 0 {
                        num_specials += 1;
                        has_empty_lists = true;
                    }
                    last_off += len;
                    normalized.push(last_off);
                }
            };
            self.check_offset_len(&normalized);
            self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
                num_values: normalized.len() - 1,
                offsets: normalized.into(),
                validity: validity.map(|v| v.into_inner()),
                has_empty_lists,
                num_specials,
            }));
            has_garbage_values
        } else {
            let inner = offsets.into_inner();
            let len = inner.len();
            let scalar_off = ScalarBuffer::<i32>::new(inner.into_inner(), 0, len);
            let mut casted = Vec::with_capacity(len);
            casted.push(0);
            let mut has_empty_lists = false;
            let mut num_specials = 0;
            let mut last_off: i64 = 0;
            if let Some(validity) = validity.as_ref() {
                for (off, valid) in scalar_off.windows(2).zip(validity.iter()) {
                    let len = (off[1] - off[0]) as i64;
                    match (valid, len == 0) {
                        (false, is_empty) => {
                            num_specials += 1;
                            has_garbage_values |= !is_empty;
                        }
                        (true, true) => {
                            num_specials += 1;
                            has_empty_lists = true;
                        }
                        _ => {
                            last_off += len;
                        }
                    }
                    casted.push(last_off);
                }
            } else {
                for off in scalar_off.windows(2) {
                    let len = (off[1] - off[0]) as i64;
                    if len == 0 {
                        num_specials += 1;
                        has_empty_lists = true;
                    }
                    last_off += len;
                    casted.push(last_off);
                }
            };
            self.check_offset_len(&casted);
            self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
                num_values: casted.len() - 1,
                offsets: casted.into(),
                validity: validity.map(|v| v.into_inner()),
                has_empty_lists,
                num_specials,
            }));
            has_garbage_values
        }
    }

    // When we are encoding data it arrives in batches.  For each batch we create a RepDefBuilder and collect the
    // various validity buffers and offset buffers from that batch.  Once we have enough batches to write a page we
    // need to take this collection of RepDefBuilders and concatenate them and then serialize them into rep/def levels.
    //
    // TODO: In the future, we may concatenate and serialize at the same time?
    //
    // This method takes care of the concatenation part.  First we collect all of layer 0 from each builder, then we
    // call this method.  Then we collect all of layer 1 from each builder and call this method.  And so on.
    //
    // That means this method should get a collection of `RawRepDef` where each item is the same kind (all validity or
    // all offsets) though the nullability / lengths may be different in each layer.
    fn concat_layers<'a>(
        layers: impl Iterator<Item = &'a RawRepDef>,
        num_layers: usize,
    ) -> RawRepDef {
        enum LayerKind {
            Validity,
            Fsl,
            Offsets,
        }

        // We make two passes through the layers.  The first determines if we need to pay the cost of allocating
        // buffers.  The second pass actually adds the values.
        let mut collected = Vec::with_capacity(num_layers);
        let mut has_nulls = false;
        let mut layer_kind = LayerKind::Validity;
        let mut total_num_specials = 0;
        let mut all_dimension = 0;
        let mut all_has_empty_lists = false;
        let mut all_num_values = 0;
        for layer in layers {
            has_nulls |= layer.has_nulls();
            match layer {
                RawRepDef::Validity(_) => {
                    layer_kind = LayerKind::Validity;
                }
                RawRepDef::Offsets(OffsetDesc {
                    num_specials,
                    has_empty_lists,
                    ..
                }) => {
                    all_has_empty_lists |= *has_empty_lists;
                    layer_kind = LayerKind::Offsets;
                    total_num_specials += num_specials;
                }
                RawRepDef::Fsl(FslDesc { dimension, .. }) => {
                    layer_kind = LayerKind::Fsl;
                    all_dimension = *dimension;
                }
            }
            collected.push(layer);
            all_num_values += layer.num_values();
        }

        // Shortcut if there are no nulls
        if !has_nulls {
            match layer_kind {
                LayerKind::Validity => {
                    return RawRepDef::Validity(ValidityDesc {
                        validity: None,
                        num_values: all_num_values,
                    });
                }
                LayerKind::Fsl => {
                    return RawRepDef::Fsl(FslDesc {
                        validity: None,
                        num_values: all_num_values,
                        dimension: all_dimension,
                    })
                }
                LayerKind::Offsets => {}
            }
        }

        // Only allocate if needed
        let mut validity_builder = if has_nulls {
            BooleanBufferBuilder::new(all_num_values)
        } else {
            BooleanBufferBuilder::new(0)
        };
        let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
            let mut all_offsets = Vec::with_capacity(all_num_values);
            all_offsets.push(0);
            all_offsets
        } else {
            Vec::new()
        };

        for layer in collected {
            match layer {
                RawRepDef::Validity(ValidityDesc {
                    validity: Some(validity),
                    ..
                }) => {
                    validity_builder.append_buffer(validity);
                }
                RawRepDef::Validity(ValidityDesc {
                    validity: None,
                    num_values,
                }) => {
                    validity_builder.append_n(*num_values, true);
                }
                RawRepDef::Fsl(FslDesc {
                    validity,
                    num_values,
                    ..
                }) => {
                    if let Some(validity) = validity {
                        validity_builder.append_buffer(validity);
                    } else {
                        validity_builder.append_n(*num_values, true);
                    }
                }
                RawRepDef::Offsets(OffsetDesc {
                    offsets,
                    validity: Some(validity),
                    has_empty_lists,
                    ..
                }) => {
                    all_has_empty_lists |= has_empty_lists;
                    validity_builder.append_buffer(validity);
                    let last = *all_offsets.last().unwrap();
                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
                }
                RawRepDef::Offsets(OffsetDesc {
                    offsets,
                    validity: None,
                    has_empty_lists,
                    num_values,
                    ..
                }) => {
                    all_has_empty_lists |= has_empty_lists;
                    if has_nulls {
                        validity_builder.append_n(*num_values, true);
                    }
                    let last = *all_offsets.last().unwrap();
                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
                }
            }
        }
        let validity = if has_nulls {
            Some(validity_builder.finish())
        } else {
            None
        };
        match layer_kind {
            LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
                validity,
                num_values: all_num_values,
                dimension: all_dimension,
            }),
            LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
                validity,
                num_values: all_num_values,
            }),
            LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
                offsets: all_offsets.into(),
                validity,
                has_empty_lists: all_has_empty_lists,
                num_values: all_num_values,
                num_specials: total_num_specials,
            }),
        }
    }

    /// Converts the validity / offsets buffers that have been gathered so far
    /// into repetition and definition levels
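    ///
    /// Each builder typically corresponds to one batch of data.  The layers from all builders
    /// are concatenated (via `concat_layers`) before the levels are computed, e.g. (a rough
    /// sketch with illustrative names):
    ///
    /// ```ignore
    /// let repdefs = RepDefBuilder::serialize(vec![batch0_builder, batch1_builder]);
    /// ```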
    pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
        assert!(!builders.is_empty());
        if builders.iter().all(|b| b.is_empty()) {
            // No repetition, all-valid
            return SerializedRepDefs::empty(
                builders
                    .first()
                    .unwrap()
                    .repdefs
                    .iter()
                    .map(|_| DefinitionInterpretation::AllValidItem)
                    .collect::<Vec<_>>(),
            );
        }

        let num_layers = builders[0].num_layers();
        let combined_layers = (0..num_layers)
            .map(|layer_index| {
                Self::concat_layers(
                    builders.iter().map(|b| &b.repdefs[layer_index]),
                    builders.len(),
                )
            })
            .collect::<Vec<_>>();
        debug_assert!(builders
            .iter()
            .all(|b| b.num_layers() == builders[0].num_layers()));

        let total_len = combined_layers.last().unwrap().num_values()
            + combined_layers
                .iter()
                .map(|l| l.num_specials())
                .sum::<usize>();
        let max_rep = combined_layers.iter().map(|l| l.max_rep()).sum::<u16>();
        let max_def = combined_layers.iter().map(|l| l.max_def()).sum::<u16>();

        let mut context = SerializerContext::new(total_len, num_layers, max_rep, max_def);
        for layer in combined_layers.into_iter() {
            match layer {
                RawRepDef::Validity(def) => {
                    context.record_validity(&def);
                }
                RawRepDef::Offsets(rep) => {
                    context.record_offsets(&rep);
                }
                RawRepDef::Fsl(fsl) => {
                    context.record_fsl(&fsl);
                }
            }
        }
        context.build()
    }
}

/// Starts with serialized repetition and definition levels and unravels
/// them into validity buffers and offsets buffers
///
/// This is used during decoding to create the necessary arrow structures
#[derive(Debug)]
pub struct RepDefUnraveler {
    rep_levels: Option<LevelBuffer>,
    def_levels: Option<LevelBuffer>,
    // Maps from definition level to the rep level at which that definition level is visible
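    // e.g. for def_meaning [NullableItem, NullableList] this is [0, 0, 1]: levels 0 (valid item)
    // and 1 (null item) are visible at the inner-most rep level while level 2 (null list) only
    // becomes visible once we have unraveled the list layer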
    levels_to_rep: Vec<u16>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Current definition level to compare to.
    current_def_cmp: u16,
    // Current rep level, determines which specials we can see
    current_rep_cmp: u16,
    // Current layer index, 0 means inner-most layer and it counts up from there.  Used to index
    // into special_defs
    current_layer: usize,
}

impl RepDefUnraveler {
    /// Creates a new unraveler from serialized repetition and definition information
    pub fn new(
        rep_levels: Option<LevelBuffer>,
        def_levels: Option<LevelBuffer>,
        def_meaning: Arc<[DefinitionInterpretation]>,
    ) -> Self {
        let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
        let mut rep_counter = 0;
        // Level=0 is always visible and means valid item
        levels_to_rep.push(0);
        for meaning in def_meaning.as_ref() {
            match meaning {
                DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
                    // There is no corresponding level, so nothing to put in levels_to_rep
                }
                DefinitionInterpretation::NullableItem => {
                    // Some null structs are not visible at inner rep levels in cases like LIST<STRUCT<LIST<...>>>
                    levels_to_rep.push(rep_counter);
                }
                DefinitionInterpretation::NullableList => {
                    rep_counter += 1;
                    levels_to_rep.push(rep_counter);
                }
                DefinitionInterpretation::EmptyableList => {
                    rep_counter += 1;
                    levels_to_rep.push(rep_counter);
                }
                DefinitionInterpretation::NullableAndEmptyableList => {
                    rep_counter += 1;
                    levels_to_rep.push(rep_counter);
                    levels_to_rep.push(rep_counter);
                }
            }
        }
        Self {
            rep_levels,
            def_levels,
            current_def_cmp: 0,
            current_rep_cmp: 0,
            levels_to_rep,
            current_layer: 0,
            def_meaning,
        }
    }

    pub fn is_all_valid(&self) -> bool {
        self.def_meaning[self.current_layer].is_all_valid()
    }

    /// If the current level is a repetition layer then this returns the number of lists
    /// at this level.
    ///
    /// This is not valid to call when the current level is a struct/primitive layer because
    /// in some cases there may be no rep or def information to know this.
    pub fn max_lists(&self) -> usize {
        debug_assert!(
            self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
        );
        self.rep_levels
            .as_ref()
            // Worst case every rep item is max_rep and a new list
            .map(|levels| levels.len())
            .unwrap_or(0)
    }

    /// Unravels a layer of offsets from the unraveler into the given offset width
    ///
    /// When decoding a list the caller should first unravel the offsets and then
    /// unravel the validity (this is the opposite order used during encoding)
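    ///
    /// A rough sketch of decoding one list layer (the names here are illustrative):
    ///
    /// ```ignore
    /// let mut offsets: Vec<i32> = Vec::new();
    /// let mut validity = BooleanBufferBuilder::new(0);
    /// unraveler.unravel_offsets(&mut offsets, Some(&mut validity))?;
    /// // ...then unravel the list's validity / continue with the outer layers...
    /// ```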
    pub fn unravel_offsets<T: ArrowNativeType>(
        &mut self,
        offsets: &mut Vec<T>,
        validity: Option<&mut BooleanBufferBuilder>,
    ) -> Result<()> {
        let rep_levels = self
            .rep_levels
            .as_mut()
            .expect("Expected repetition level but data didn't contain repetition");
        let valid_level = self.current_def_cmp;
        let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
            DefinitionInterpretation::NullableList => {
                self.current_def_cmp += 1;
                (valid_level + 1, 0)
            }
            DefinitionInterpretation::EmptyableList => {
                self.current_def_cmp += 1;
                (0, valid_level + 1)
            }
            DefinitionInterpretation::NullableAndEmptyableList => {
                self.current_def_cmp += 2;
                (valid_level + 1, valid_level + 2)
            }
            DefinitionInterpretation::AllValidList => (0, 0),
            _ => unreachable!(),
        };
        self.current_layer += 1;

        // This is the highest def level that is still visible.  Once we hit a list then
        // we stop looking because any null / empty list (or list masked by a higher level
        // null) will not be visible
        let mut max_level = null_level.max(empty_level);
        // Anything higher than this (but less than max_level) is a null struct masking our
        // list.  We will materialize this as a null list.
        let upper_null = max_level;
        for level in self.def_meaning[self.current_layer..].iter() {
            match level {
                DefinitionInterpretation::NullableItem => {
                    max_level += 1;
                }
                DefinitionInterpretation::AllValidItem => {}
                _ => {
                    break;
                }
            }
        }

        let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);

        // If offsets is empty this is a no-op.  If offsets is not empty that means we already
        // added a set of offsets.  For example, we might have added [0, 3, 5] (2 lists).  Now
        // say we want to add [0, 1, 4] (2 lists).  We should get [0, 3, 5, 6, 9] (4 lists).  If
        // we don't pop here we get [0, 3, 5, 5, 6, 9] which is wrong.
        //
        // Or, to think about it another way, if every unraveler adds the starting 0 and the trailing
        // length then we have N + unravelers.len() values instead of N + 1.
        offsets.pop();

        let to_offset = |val: usize| {
            T::from_usize(val)
            .ok_or_else(|| Error::invalid_input("A single batch had more than i32::MAX values and so a large container type is required", location!()))
        };
        self.current_rep_cmp += 1;
        if let Some(def_levels) = &mut self.def_levels {
            assert!(rep_levels.len() == def_levels.len());
            // It's possible validity is None even if we have def levels.  For example, we might have
            // empty lists (which require def levels) but no nulls.
            let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
                Box::new(|is_valid| validity.append(is_valid))
            } else {
                Box::new(|_| {})
            };
            // This is a strange access pattern.  We are iterating over the rep/def levels and
            // at the same time writing the rep/def levels.  This means we need both a mutable
            // and immutable reference to the rep/def levels.
            let mut read_idx = 0;
            let mut write_idx = 0;
            while read_idx < rep_levels.len() {
                // SAFETY: We assert that rep_levels and def_levels have the same
                // len and read_idx and write_idx can never go past the end.
                unsafe {
                    let rep_val = *rep_levels.get_unchecked(read_idx);
                    if rep_val != 0 {
                        let def_val = *def_levels.get_unchecked(read_idx);
                        // Copy over
                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
                        *def_levels.get_unchecked_mut(write_idx) = def_val;
                        write_idx += 1;

                        if def_val == 0 {
                            // This is a valid list
                            offsets.push(to_offset(curlen)?);
                            curlen += 1;
                            push_validity(true);
                        } else if def_val > max_level {
                            // This is not visible at this rep level, do not add to offsets, but keep in repdef
                        } else if def_val == null_level || def_val > upper_null {
                            // This is a null list (or a list masked by a null struct)
                            offsets.push(to_offset(curlen)?);
                            push_validity(false);
                        } else if def_val == empty_level {
                            // This is an empty list
                            offsets.push(to_offset(curlen)?);
                            push_validity(true);
                        } else {
                            // New valid list starting with null item
                            offsets.push(to_offset(curlen)?);
                            curlen += 1;
                            push_validity(true);
                        }
                    } else {
                        curlen += 1;
                    }
                    read_idx += 1;
                }
            }
            offsets.push(to_offset(curlen)?);
            rep_levels.truncate(write_idx);
            def_levels.truncate(write_idx);
            Ok(())
        } else {
            // SAFETY: See above loop
            let mut read_idx = 0;
            let mut write_idx = 0;
            let old_offsets_len = offsets.len();
            while read_idx < rep_levels.len() {
                // SAFETY: read_idx / write_idx cannot go past rep_levels.len()
                unsafe {
                    let rep_val = *rep_levels.get_unchecked(read_idx);
                    if rep_val != 0 {
1401                        // Finish the current list
1402                        offsets.push(to_offset(curlen)?);
1403                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1404                        write_idx += 1;
1405                    }
1406                    curlen += 1;
1407                    read_idx += 1;
1408                }
1409            }
1410            let num_new_lists = offsets.len() - old_offsets_len;
1411            offsets.push(to_offset(curlen)?);
            // Keep only the levels we actually wrote (mirrors the truncation in the def-level branch)
            rep_levels.truncate(write_idx);
1413            if let Some(validity) = validity {
1414                // Even though we don't have validity it is possible another unraveler did and so we need
1415                // to push all valids
1416                validity.append_n(num_new_lists, true);
1417            }
1418            Ok(())
1419        }
1420    }
1421
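    /// Skips a validity layer that is known to be all-valid (there is nothing to unravel)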
1422    pub fn skip_validity(&mut self) {
1423        debug_assert!(
1424            self.def_meaning[self.current_layer] == DefinitionInterpretation::AllValidItem
1425        );
1426        self.current_layer += 1;
1427    }
1428
1429    /// Unravels a layer of validity from the definition levels
1430    pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
1431        debug_assert!(
1432            self.def_meaning[self.current_layer] != DefinitionInterpretation::AllValidItem
1433        );
1434        self.current_layer += 1;
1435
        let def_levels = self.def_levels.as_ref().unwrap();
1437
1438        let current_def_cmp = self.current_def_cmp;
1439        self.current_def_cmp += 1;
1440
1441        for is_valid in def_levels.iter().filter_map(|&level| {
1442            if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
1443                Some(level <= current_def_cmp)
1444            } else {
1445                None
1446            }
1447        }) {
1448            validity.append(is_valid);
1449        }
1450    }
1451
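    /// Keeps only every `dimension`-th definition level (one level per outer value instead
    /// of one per inner item).  This is used when unraveling validity for a fixed-size-list
    /// layer; lists nested inside FSL are not yet supported.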
1452    pub fn decimate(&mut self, dimension: usize) {
1453        if self.rep_levels.is_some() {
            // If we need to support this then I think we need to walk through the rep/def levels
            // to find the spots we keep.  E.g. if we have:
1456            //  rep: 1 0 0 1 0 1 0 0 0 1 0 0
1457            //  def: 1 1 1 0 1 0 1 1 0 1 1 0
1458            //  dimension: 2
1459            //
1460            // The output should be:
1461            //  rep: 1 0 0 1 0 0 0
1462            //  def: 1 1 1 0 1 1 0
1463            //
1464            // Maybe there's some special logic for empty/null lists?  I'll save the headache for future me.
1465            todo!("Not yet supported FSL<...List<...>>");
1466        }
1467        let Some(def_levels) = self.def_levels.as_mut() else {
1468            return;
1469        };
1470        let mut read_idx = 0;
1471        let mut write_idx = 0;
1472        while read_idx < def_levels.len() {
1473            unsafe {
1474                *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
1475            }
1476            write_idx += 1;
1477            read_idx += dimension;
1478        }
1479        def_levels.truncate(write_idx);
1480    }
1481}
1482
1483/// As we decode we may extract rep/def information from multiple pages (or multiple
1484/// chunks within a page).
1485///
1486/// For each chunk we create an unraveler.  Each unraveler can have a completely different
1487/// interpretation (e.g. one page might contain null items but no null structs and the next
1488/// page might have null structs but no null items).
1489///
1490/// Concatenating these unravelers would be tricky and expensive so instead we have a
1491/// composite unraveler which unravels across multiple unravelers.
1492///
/// Note: this type should be used even if there is only one page / unraveler.  This is
/// because the `RepDefUnraveler`'s API is more complex (it's meant to be called by this
/// type).
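///
/// # Example
///
/// A minimal sketch of decoding one page of list data (the levels, `def_meaning`, and
/// counts here are illustrative placeholders, not taken from any particular file):
///
/// ```ignore
/// // Each page / chunk contributes its own RepDefUnraveler
/// let unraveler = RepDefUnraveler::new(Some(rep_levels), Some(def_levels), def_meaning);
/// let mut composite = CompositeRepDefUnraveler::new(vec![unraveler]);
/// // Innermost layer first: the item validity...
/// let item_validity = composite.unravel_validity(num_items);
/// // ...then one offsets layer per level of list nesting, working inner to outer
/// let (offsets, list_validity) = composite.unravel_offsets::<i32>()?;
/// ```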
1496#[derive(Debug)]
1497pub struct CompositeRepDefUnraveler {
1498    unravelers: Vec<RepDefUnraveler>,
1499}
1500
1501impl CompositeRepDefUnraveler {
1502    pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
1503        Self { unravelers }
1504    }
1505
1506    /// Unravels a layer of validity
1507    ///
1508    /// Returns None if there are no null items in this layer
1509    pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
1510        let is_all_valid = self
1511            .unravelers
1512            .iter()
1513            .all(|unraveler| unraveler.is_all_valid());
1514
1515        if is_all_valid {
1516            for unraveler in self.unravelers.iter_mut() {
1517                unraveler.skip_validity();
1518            }
1519            None
1520        } else {
1521            let mut validity = BooleanBufferBuilder::new(num_values);
1522            for unraveler in self.unravelers.iter_mut() {
1523                unraveler.unravel_validity(&mut validity);
1524            }
1525            Some(NullBuffer::new(validity.finish()))
1526        }
1527    }
1528
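    /// Unravels a layer of validity for a fixed-size-list layer
    ///
    /// The definition levels are first decimated so that there is one level per list of
    /// size `dimension` instead of one level per item.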
1529    pub fn unravel_fsl_validity(
1530        &mut self,
1531        num_values: usize,
1532        dimension: usize,
1533    ) -> Option<NullBuffer> {
1534        for unraveler in self.unravelers.iter_mut() {
1535            unraveler.decimate(dimension);
1536        }
1537        self.unravel_validity(num_values)
1538    }
1539
1540    /// Unravels a layer of offsets (and the validity for that layer)
1541    pub fn unravel_offsets<T: ArrowNativeType>(
1542        &mut self,
1543    ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
1544        let mut is_all_valid = true;
1545        let mut max_num_lists = 0;
1546        for unraveler in self.unravelers.iter() {
1547            is_all_valid &= unraveler.is_all_valid();
1548            max_num_lists += unraveler.max_lists();
1549        }
1550
1551        let mut validity = if is_all_valid {
1552            None
1553        } else {
            // Note: This is only an estimate (it may be too high or, in theory, too low).  Right now
            // we only know how many items we have, not how many rows.  (TODO: Shouldn't we know the # of rows?)
1556            Some(BooleanBufferBuilder::new(max_num_lists))
1557        };
1558
1559        let mut offsets = Vec::with_capacity(max_num_lists + 1);
1560
1561        for unraveler in self.unravelers.iter_mut() {
1562            unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
1563        }
1564
1565        Ok((
1566            OffsetBuffer::new(ScalarBuffer::from(offsets)),
1567            validity.map(|mut v| NullBuffer::new(v.finish())),
1568        ))
1569    }
1570}
1571
1572/// A [`ControlWordIterator`] when there are both repetition and definition levels
1573///
/// The iterator will put the repetition level in the upper bits and the definition
/// level in the lower bits.  The number of bits used for each level is determined
/// by the maximum repetition and definition levels.
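///
/// For example (a sketch, assuming 2 bits each for repetition and definition),
/// `rep = 3, def = 1` pack into a single byte:
///
/// ```ignore
/// let control_word = ((3u16 & 0b11) << 2) + (1u16 & 0b11); // 0b1101 == 13
/// ```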
1577#[derive(Debug)]
1578pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
1579    repdef: I,
1580    def_width: usize,
1581    max_rep: u16,
1582    max_visible_def: u16,
1583    rep_mask: u16,
1584    def_mask: u16,
1585    bits_rep: u8,
1586    bits_def: u8,
1587    phantom: std::marker::PhantomData<W>,
1588}
1589
1590impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
1591    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1592        let next = self.repdef.next()?;
1593        let control_word: u8 =
1594            (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
1595        buf.push(control_word);
1596        let is_new_row = next.0 == self.max_rep;
1597        let is_visible = next.1 <= self.max_visible_def;
1598        let is_valid_item = next.1 == 0;
1599        Some(ControlWordDesc {
1600            is_new_row,
1601            is_visible,
1602            is_valid_item,
1603        })
1604    }
1605}
1606
1607impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
1608    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1609        let next = self.repdef.next()?;
1610        let control_word: u16 =
1611            ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
1612        let control_word = control_word.to_le_bytes();
1613        buf.push(control_word[0]);
1614        buf.push(control_word[1]);
1615        let is_new_row = next.0 == self.max_rep;
1616        let is_visible = next.1 <= self.max_visible_def;
1617        let is_valid_item = next.1 == 0;
1618        Some(ControlWordDesc {
1619            is_new_row,
1620            is_visible,
1621            is_valid_item,
1622        })
1623    }
1624}
1625
1626impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
1627    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1628        let next = self.repdef.next()?;
1629        let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
1630            + ((next.1 & self.def_mask) as u32);
1631        let control_word = control_word.to_le_bytes();
1632        buf.push(control_word[0]);
1633        buf.push(control_word[1]);
1634        buf.push(control_word[2]);
1635        buf.push(control_word[3]);
1636        let is_new_row = next.0 == self.max_rep;
1637        let is_visible = next.1 <= self.max_visible_def;
1638        let is_valid_item = next.1 == 0;
1639        Some(ControlWordDesc {
1640            is_new_row,
1641            is_visible,
1642            is_valid_item,
1643        })
1644    }
1645}
1646
1647/// A [`ControlWordIterator`] when there are only definition levels or only repetition levels
1648#[derive(Debug)]
1649pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
1650    repdef: I,
1651    level_mask: u16,
1652    bits_rep: u8,
1653    bits_def: u8,
1654    max_rep: u16,
1655    phantom: std::marker::PhantomData<W>,
1656}
1657
1658impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
1659    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1660        let next = self.repdef.next()?;
1661        buf.push((next & self.level_mask) as u8);
1662        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1663        let is_valid_item = next == 0 || self.bits_def == 0;
1664        Some(ControlWordDesc {
1665            is_new_row,
            // Either there is no rep, in which case there are no invisible items,
            // or there is no def, in which case there are also no invisible items
1668            is_visible: true,
1669            is_valid_item,
1670        })
1671    }
1672}
1673
1674impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
1675    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
        let next = self.repdef.next()? & self.level_mask;
1677        let control_word = next.to_le_bytes();
1678        buf.push(control_word[0]);
1679        buf.push(control_word[1]);
1680        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1681        let is_valid_item = next == 0 || self.bits_def == 0;
1682        Some(ControlWordDesc {
1683            is_new_row,
1684            is_visible: true,
1685            is_valid_item,
1686        })
1687    }
1688}
1689
1690impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
1691    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1692        let next = self.repdef.next()?;
1693        let next = (next & self.level_mask) as u32;
1694        let control_word = next.to_le_bytes();
1695        buf.push(control_word[0]);
1696        buf.push(control_word[1]);
1697        buf.push(control_word[2]);
1698        buf.push(control_word[3]);
1699        let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
1700        let is_valid_item = next == 0 || self.bits_def == 0;
1701        Some(ControlWordDesc {
1702            is_new_row,
1703            is_visible: true,
1704            is_valid_item,
1705        })
1706    }
1707}
1708
1709/// A [`ControlWordIterator`] when there are no repetition or definition levels
1710#[derive(Debug)]
1711pub struct NilaryControlWordIterator {
1712    len: usize,
1713    idx: usize,
1714}
1715
1716impl NilaryControlWordIterator {
1717    fn append_next(&mut self) -> Option<ControlWordDesc> {
1718        if self.idx == self.len {
1719            None
1720        } else {
1721            self.idx += 1;
1722            Some(ControlWordDesc {
1723                is_new_row: true,
1724                is_visible: true,
1725                is_valid_item: true,
1726            })
1727        }
1728    }
1729}
1730
1731/// Helper function to get a bit mask of the given width
1732fn get_mask(width: u16) -> u16 {
1733    (1 << width) - 1
1734}
1735
1736// We're really going out of our way to avoid boxing here but this will be called on a per-value basis
1737// so it is in the critical path.
1738type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
1739    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
1740    T,
1741>;
1742
1743/// An iterator that generates control words from repetition and definition levels
1744///
/// "Control word" is just a fancy term for a single u8/u16/u32 that contains both
/// the repetition and definition levels in it.
///
/// In the large majority of cases we only need a single byte to represent both the
/// repetition and definition levels.  However, if there is deep nesting then we may
/// need two bytes.  In the worst case we need 4 bytes, though that implies hundreds of
/// levels of nesting, which is unlikely to be encountered in practice.
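///
/// # Example
///
/// A rough sketch of driving the iterator during encoding (the level slices and
/// maximums here are illustrative placeholders):
///
/// ```ignore
/// let mut iter = build_control_word_iterator(
///     Some(&rep_levels), max_rep, Some(&def_levels), max_def, max_visible_def, rep_levels.len());
/// let mut buf = Vec::with_capacity(rep_levels.len() * iter.bytes_per_word());
/// while let Some(desc) = iter.append_next(&mut buf) {
///     // desc.is_new_row / desc.is_visible / desc.is_valid_item describe the value just appended
/// }
/// ```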
1752#[derive(Debug)]
1753pub enum ControlWordIterator<'a> {
1754    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
1755    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
1756    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
1757    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
1758    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
1759    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
1760    Nilary(NilaryControlWordIterator),
1761}
1762
1763/// Describes the properties of a control word
1764#[derive(Debug)]
1765pub struct ControlWordDesc {
1766    pub is_new_row: bool,
1767    pub is_visible: bool,
1768    pub is_valid_item: bool,
1769}
1770
1771impl ControlWordIterator<'_> {
    /// Appends the next control word to the buffer
    ///
    /// Returns a [`ControlWordDesc`] describing the appended word (e.g. whether it starts a new
    /// row because the repetition level is maxed out), or `None` if the levels are exhausted
1775    pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1776        match self {
1777            Self::Binary8(iter) => iter.append_next(buf),
1778            Self::Binary16(iter) => iter.append_next(buf),
1779            Self::Binary32(iter) => iter.append_next(buf),
1780            Self::Unary8(iter) => iter.append_next(buf),
1781            Self::Unary16(iter) => iter.append_next(buf),
1782            Self::Unary32(iter) => iter.append_next(buf),
1783            Self::Nilary(iter) => iter.append_next(),
1784        }
1785    }
1786
1787    /// Return true if the control word iterator has repetition levels
1788    pub fn has_repetition(&self) -> bool {
1789        match self {
1790            Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
1791            Self::Unary8(iter) => iter.bits_rep > 0,
1792            Self::Unary16(iter) => iter.bits_rep > 0,
1793            Self::Unary32(iter) => iter.bits_rep > 0,
1794            Self::Nilary(_) => false,
1795        }
1796    }
1797
1798    /// Returns the number of bytes per control word
1799    pub fn bytes_per_word(&self) -> usize {
1800        match self {
1801            Self::Binary8(_) => 1,
1802            Self::Binary16(_) => 2,
1803            Self::Binary32(_) => 4,
1804            Self::Unary8(_) => 1,
1805            Self::Unary16(_) => 2,
1806            Self::Unary32(_) => 4,
1807            Self::Nilary(_) => 0,
1808        }
1809    }
1810
1811    /// Returns the number of bits used for the repetition level
1812    pub fn bits_rep(&self) -> u8 {
1813        match self {
1814            Self::Binary8(iter) => iter.bits_rep,
1815            Self::Binary16(iter) => iter.bits_rep,
1816            Self::Binary32(iter) => iter.bits_rep,
1817            Self::Unary8(iter) => iter.bits_rep,
1818            Self::Unary16(iter) => iter.bits_rep,
1819            Self::Unary32(iter) => iter.bits_rep,
1820            Self::Nilary(_) => 0,
1821        }
1822    }
1823
1824    /// Returns the number of bits used for the definition level
1825    pub fn bits_def(&self) -> u8 {
1826        match self {
1827            Self::Binary8(iter) => iter.bits_def,
1828            Self::Binary16(iter) => iter.bits_def,
1829            Self::Binary32(iter) => iter.bits_def,
1830            Self::Unary8(iter) => iter.bits_def,
1831            Self::Unary16(iter) => iter.bits_def,
1832            Self::Unary32(iter) => iter.bits_def,
1833            Self::Nilary(_) => 0,
1834        }
1835    }
1836}
1837
1838/// Builds a [`ControlWordIterator`] from repetition and definition levels
1839/// by first calculating the width needed and then creating the iterator
1840/// with the appropriate width
1841pub fn build_control_word_iterator<'a>(
1842    rep: Option<&'a [u16]>,
1843    max_rep: u16,
1844    def: Option<&'a [u16]>,
1845    max_def: u16,
1846    max_visible_def: u16,
1847    len: usize,
1848) -> ControlWordIterator<'a> {
1849    let rep_width = if max_rep == 0 {
1850        0
1851    } else {
1852        log_2_ceil(max_rep as u32) as u16
1853    };
1854    let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
1855    let def_width = if max_def == 0 {
1856        0
1857    } else {
1858        log_2_ceil(max_def as u32) as u16
1859    };
1860    let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
1861    let total_width = rep_width + def_width;
1862    match (rep, def) {
1863        (Some(rep), Some(def)) => {
1864            let iter = rep.iter().copied().zip(def.iter().copied());
1865            let def_width = def_width as usize;
1866            if total_width <= 8 {
1867                ControlWordIterator::Binary8(BinaryControlWordIterator {
1868                    repdef: iter,
1869                    rep_mask,
1870                    def_mask,
1871                    def_width,
1872                    max_rep,
1873                    max_visible_def,
1874                    bits_rep: rep_width as u8,
1875                    bits_def: def_width as u8,
1876                    phantom: std::marker::PhantomData,
1877                })
1878            } else if total_width <= 16 {
1879                ControlWordIterator::Binary16(BinaryControlWordIterator {
1880                    repdef: iter,
1881                    rep_mask,
1882                    def_mask,
1883                    def_width,
1884                    max_rep,
1885                    max_visible_def,
1886                    bits_rep: rep_width as u8,
1887                    bits_def: def_width as u8,
1888                    phantom: std::marker::PhantomData,
1889                })
1890            } else {
1891                ControlWordIterator::Binary32(BinaryControlWordIterator {
1892                    repdef: iter,
1893                    rep_mask,
1894                    def_mask,
1895                    def_width,
1896                    max_rep,
1897                    max_visible_def,
1898                    bits_rep: rep_width as u8,
1899                    bits_def: def_width as u8,
1900                    phantom: std::marker::PhantomData,
1901                })
1902            }
1903        }
1904        (Some(lev), None) => {
1905            let iter = lev.iter().copied();
1906            if total_width <= 8 {
1907                ControlWordIterator::Unary8(UnaryControlWordIterator {
1908                    repdef: iter,
1909                    level_mask: rep_mask,
1910                    bits_rep: total_width as u8,
1911                    bits_def: 0,
1912                    max_rep,
1913                    phantom: std::marker::PhantomData,
1914                })
1915            } else if total_width <= 16 {
1916                ControlWordIterator::Unary16(UnaryControlWordIterator {
1917                    repdef: iter,
1918                    level_mask: rep_mask,
1919                    bits_rep: total_width as u8,
1920                    bits_def: 0,
1921                    max_rep,
1922                    phantom: std::marker::PhantomData,
1923                })
1924            } else {
1925                ControlWordIterator::Unary32(UnaryControlWordIterator {
1926                    repdef: iter,
1927                    level_mask: rep_mask,
1928                    bits_rep: total_width as u8,
1929                    bits_def: 0,
1930                    max_rep,
1931                    phantom: std::marker::PhantomData,
1932                })
1933            }
1934        }
1935        (None, Some(lev)) => {
1936            let iter = lev.iter().copied();
1937            if total_width <= 8 {
1938                ControlWordIterator::Unary8(UnaryControlWordIterator {
1939                    repdef: iter,
1940                    level_mask: def_mask,
1941                    bits_rep: 0,
1942                    bits_def: total_width as u8,
1943                    max_rep: 0,
1944                    phantom: std::marker::PhantomData,
1945                })
1946            } else if total_width <= 16 {
1947                ControlWordIterator::Unary16(UnaryControlWordIterator {
1948                    repdef: iter,
1949                    level_mask: def_mask,
1950                    bits_rep: 0,
1951                    bits_def: total_width as u8,
1952                    max_rep: 0,
1953                    phantom: std::marker::PhantomData,
1954                })
1955            } else {
1956                ControlWordIterator::Unary32(UnaryControlWordIterator {
1957                    repdef: iter,
1958                    level_mask: def_mask,
1959                    bits_rep: 0,
1960                    bits_def: total_width as u8,
1961                    max_rep: 0,
1962                    phantom: std::marker::PhantomData,
1963                })
1964            }
1965        }
1966        (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
1967    }
1968}
1969
1970/// A parser to unwrap control words into repetition and definition levels
1971///
1972/// This is the inverse of the [`ControlWordIterator`].
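///
/// # Example
///
/// A sketch of the decode side, assuming both repetition and definition levels were
/// encoded (so `bytes_per_word()` is non-zero); `encoded`, `bits_rep`, and `bits_def`
/// are illustrative placeholders:
///
/// ```ignore
/// let parser = ControlWordParser::new(bits_rep, bits_def);
/// let mut rep = Vec::new();
/// let mut def = Vec::new();
/// for word in encoded.chunks_exact(parser.bytes_per_word()) {
///     parser.parse(word, &mut rep, &mut def);
/// }
/// ```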
1973#[derive(Copy, Clone, Debug)]
1974pub enum ControlWordParser {
1975    // First item is the bits to shift, second is the mask to apply (the mask can be
1976    // calculated from the bits to shift but we don't want to calculate it each time)
1977    BOTH8(u8, u32),
1978    BOTH16(u8, u32),
1979    BOTH32(u8, u32),
1980    REP8,
1981    REP16,
1982    REP32,
1983    DEF8,
1984    DEF16,
1985    DEF32,
1986    NIL,
1987}
1988
1989impl ControlWordParser {
1990    fn parse_both<const WORD_SIZE: u8>(
1991        src: &[u8],
1992        dst_rep: &mut Vec<u16>,
1993        dst_def: &mut Vec<u16>,
1994        bits_to_shift: u8,
1995        mask_to_apply: u32,
1996    ) {
1997        match WORD_SIZE {
1998            1 => {
1999                let word = src[0];
2000                let rep = word >> bits_to_shift;
2001                let def = word & (mask_to_apply as u8);
2002                dst_rep.push(rep as u16);
2003                dst_def.push(def as u16);
2004            }
2005            2 => {
2006                let word = u16::from_le_bytes([src[0], src[1]]);
2007                let rep = word >> bits_to_shift;
2008                let def = word & mask_to_apply as u16;
2009                dst_rep.push(rep);
2010                dst_def.push(def);
2011            }
2012            4 => {
2013                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2014                let rep = word >> bits_to_shift;
2015                let def = word & mask_to_apply;
2016                dst_rep.push(rep as u16);
2017                dst_def.push(def as u16);
2018            }
2019            _ => unreachable!(),
2020        }
2021    }
2022
2023    fn parse_desc_both<const WORD_SIZE: u8>(
2024        src: &[u8],
2025        bits_to_shift: u8,
2026        mask_to_apply: u32,
2027        max_rep: u16,
2028        max_visible_def: u16,
2029    ) -> ControlWordDesc {
2030        match WORD_SIZE {
2031            1 => {
2032                let word = src[0];
2033                let rep = word >> bits_to_shift;
2034                let def = word & (mask_to_apply as u8);
2035                let is_visible = def as u16 <= max_visible_def;
2036                let is_new_row = rep as u16 == max_rep;
2037                let is_valid_item = def == 0;
2038                ControlWordDesc {
2039                    is_visible,
2040                    is_new_row,
2041                    is_valid_item,
2042                }
2043            }
2044            2 => {
2045                let word = u16::from_le_bytes([src[0], src[1]]);
2046                let rep = word >> bits_to_shift;
2047                let def = word & mask_to_apply as u16;
2048                let is_visible = def <= max_visible_def;
2049                let is_new_row = rep == max_rep;
2050                let is_valid_item = def == 0;
2051                ControlWordDesc {
2052                    is_visible,
2053                    is_new_row,
2054                    is_valid_item,
2055                }
2056            }
2057            4 => {
2058                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2059                let rep = word >> bits_to_shift;
2060                let def = word & mask_to_apply;
2061                let is_visible = def as u16 <= max_visible_def;
2062                let is_new_row = rep as u16 == max_rep;
2063                let is_valid_item = def == 0;
2064                ControlWordDesc {
2065                    is_visible,
2066                    is_new_row,
2067                    is_valid_item,
2068                }
2069            }
2070            _ => unreachable!(),
2071        }
2072    }
2073
2074    fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
2075        match WORD_SIZE {
2076            1 => {
2077                let word = src[0];
2078                dst.push(word as u16);
2079            }
2080            2 => {
2081                let word = u16::from_le_bytes([src[0], src[1]]);
2082                dst.push(word);
2083            }
2084            4 => {
2085                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2086                dst.push(word as u16);
2087            }
2088            _ => unreachable!(),
2089        }
2090    }
2091
2092    fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
2093        match WORD_SIZE {
2094            1 => ControlWordDesc {
2095                is_new_row: src[0] as u16 == max_rep,
2096                is_visible: true,
2097                is_valid_item: true,
2098            },
2099            2 => ControlWordDesc {
2100                is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
2101                is_visible: true,
2102                is_valid_item: true,
2103            },
2104            4 => ControlWordDesc {
2105                is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
2106                is_visible: true,
2107                is_valid_item: true,
2108            },
2109            _ => unreachable!(),
2110        }
2111    }
2112
2113    fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
2114        match WORD_SIZE {
2115            1 => ControlWordDesc {
2116                is_new_row: true,
2117                is_visible: true,
2118                is_valid_item: src[0] == 0,
2119            },
2120            2 => ControlWordDesc {
2121                is_new_row: true,
2122                is_visible: true,
2123                is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
2124            },
2125            4 => ControlWordDesc {
2126                is_new_row: true,
2127                is_visible: true,
2128                is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
2129            },
2130            _ => unreachable!(),
2131        }
2132    }
2133
2134    /// Returns the number of bytes per control word
2135    pub fn bytes_per_word(&self) -> usize {
2136        match self {
2137            Self::BOTH8(..) => 1,
2138            Self::BOTH16(..) => 2,
2139            Self::BOTH32(..) => 4,
2140            Self::REP8 => 1,
2141            Self::REP16 => 2,
2142            Self::REP32 => 4,
2143            Self::DEF8 => 1,
2144            Self::DEF16 => 2,
2145            Self::DEF32 => 4,
2146            Self::NIL => 0,
2147        }
2148    }
2149
2150    /// Appends the next control word to the rep & def buffers
2151    ///
2152    /// `src` should be pointing at the first byte (little endian) of the control word
2153    ///
2154    /// `dst_rep` and `dst_def` are the buffers to append the rep and def levels to.
2155    /// They will not be appended to if not needed.
2156    pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
2157        match self {
2158            Self::BOTH8(bits_to_shift, mask_to_apply) => {
2159                Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2160            }
2161            Self::BOTH16(bits_to_shift, mask_to_apply) => {
2162                Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2163            }
2164            Self::BOTH32(bits_to_shift, mask_to_apply) => {
2165                Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2166            }
2167            Self::REP8 => Self::parse_one::<1>(src, dst_rep),
2168            Self::REP16 => Self::parse_one::<2>(src, dst_rep),
2169            Self::REP32 => Self::parse_one::<4>(src, dst_rep),
2170            Self::DEF8 => Self::parse_one::<1>(src, dst_def),
2171            Self::DEF16 => Self::parse_one::<2>(src, dst_def),
2172            Self::DEF32 => Self::parse_one::<4>(src, dst_def),
2173            Self::NIL => {}
2174        }
2175    }
2176
2177    /// Return true if the control words contain repetition information
2178    pub fn has_rep(&self) -> bool {
2179        match self {
2180            Self::BOTH8(..)
2181            | Self::BOTH16(..)
2182            | Self::BOTH32(..)
2183            | Self::REP8
2184            | Self::REP16
2185            | Self::REP32 => true,
2186            Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
2187        }
2188    }
2189
    /// Parses the control word to inspect its properties without appending to any buffers
2191    pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
2192        match self {
2193            Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
2194                src,
2195                *bits_to_shift,
2196                *mask_to_apply,
2197                max_rep,
2198                max_visible_def,
2199            ),
2200            Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
2201                src,
2202                *bits_to_shift,
2203                *mask_to_apply,
2204                max_rep,
2205                max_visible_def,
2206            ),
2207            Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
2208                src,
2209                *bits_to_shift,
2210                *mask_to_apply,
2211                max_rep,
2212                max_visible_def,
2213            ),
2214            Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
2215            Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
2216            Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
2217            Self::DEF8 => Self::parse_def_desc_one::<1>(src),
2218            Self::DEF16 => Self::parse_def_desc_one::<2>(src),
2219            Self::DEF32 => Self::parse_def_desc_one::<4>(src),
2220            Self::NIL => ControlWordDesc {
2221                is_new_row: true,
2222                is_valid_item: true,
2223                is_visible: true,
2224            },
2225        }
2226    }
2227
2228    /// Creates a new parser from the number of bits used for the repetition and definition levels
2229    pub fn new(bits_rep: u8, bits_def: u8) -> Self {
2230        let total_bits = bits_rep + bits_def;
2231
2232        enum WordSize {
2233            One,
2234            Two,
2235            Four,
2236        }
2237
2238        let word_size = if total_bits <= 8 {
2239            WordSize::One
2240        } else if total_bits <= 16 {
2241            WordSize::Two
2242        } else {
2243            WordSize::Four
2244        };
2245
2246        match (bits_rep > 0, bits_def > 0, word_size) {
2247            (false, false, _) => Self::NIL,
2248            (false, true, WordSize::One) => Self::DEF8,
2249            (false, true, WordSize::Two) => Self::DEF16,
2250            (false, true, WordSize::Four) => Self::DEF32,
2251            (true, false, WordSize::One) => Self::REP8,
2252            (true, false, WordSize::Two) => Self::REP16,
2253            (true, false, WordSize::Four) => Self::REP32,
2254            (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
2255            (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
2256            (true, true, WordSize::Four) => {
2257                Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
2258            }
2259        }
2260    }
2261}
2262
2263#[cfg(test)]
2264mod tests {
2265    use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
2266
2267    use crate::repdef::{
2268        CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
2269    };
2270
2271    use super::RepDefBuilder;
2272
2273    fn validity(values: &[bool]) -> NullBuffer {
2274        NullBuffer::from_iter(values.iter().copied())
2275    }
2276
2277    fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
2278        OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
2279    }
2280
2281    fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
2282        OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
2283    }
2284
2285    #[test]
2286    fn test_repdef_basic() {
2287        // Basic case, rep & def
2288        let mut builder = RepDefBuilder::default();
2289        builder.add_offsets(
2290            offsets_64(&[0, 2, 2, 5]),
2291            Some(validity(&[true, false, true])),
2292        );
2293        builder.add_offsets(
2294            offsets_64(&[0, 1, 3, 5, 5, 9]),
2295            Some(validity(&[true, true, true, false, true])),
2296        );
2297        builder.add_validity_bitmap(validity(&[
2298            true, true, true, false, false, false, true, true, false,
2299        ]));
2300
2301        let repdefs = RepDefBuilder::serialize(vec![builder]);
2302        let rep = repdefs.repetition_levels.unwrap();
2303        let def = repdefs.definition_levels.unwrap();
2304
2305        assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2306        assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2307
2308        // [[I], [I, I]], NULL, [[NULL, NULL], NULL, [NULL, I, I, NULL]]
2309
2310        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2311            Some(rep.as_ref().to_vec()),
2312            Some(def.as_ref().to_vec()),
2313            repdefs.def_meaning.into(),
2314        )]);
2315
2316        // Note: validity doesn't exactly round-trip because repdef normalizes some of the
2317        // redundant validity values
2318        assert_eq!(
2319            unraveler.unravel_validity(9),
2320            Some(validity(&[
2321                true, true, true, false, false, false, true, true, false
2322            ]))
2323        );
2324        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2325        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 5, 9]).inner());
2326        assert_eq!(val, Some(validity(&[true, true, true, false, true])));
2327        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2328        assert_eq!(off.inner(), offsets_32(&[0, 2, 2, 5]).inner());
2329        assert_eq!(val, Some(validity(&[true, false, true])));
2330    }
2331
2332    #[test]
2333    fn test_repdef_simple_null_empty_list() {
2334        let check = |repdefs: SerializedRepDefs, last_def: DefinitionInterpretation| {
2335            let rep = repdefs.repetition_levels.unwrap();
2336            let def = repdefs.definition_levels.unwrap();
2337
2338            assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2339            assert_eq!([0, 0, 2, 0, 1, 0], *def);
2340            assert_eq!(
2341                vec![DefinitionInterpretation::NullableItem, last_def,],
2342                repdefs.def_meaning
2343            );
2344        };
2345
2346        // Null list and empty list should be serialized mostly the same
2347
2348        // Null case
2349        let mut builder = RepDefBuilder::default();
2350        builder.add_offsets(
2351            offsets_32(&[0, 2, 2, 5]),
2352            Some(validity(&[true, false, true])),
2353        );
2354        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2355
2356        let repdefs = RepDefBuilder::serialize(vec![builder]);
2357
2358        check(repdefs, DefinitionInterpretation::NullableList);
2359
2360        // Empty case
2361        let mut builder = RepDefBuilder::default();
2362        builder.add_offsets(offsets_32(&[0, 2, 2, 5]), None);
2363        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2364
2365        let repdefs = RepDefBuilder::serialize(vec![builder]);
2366
2367        check(repdefs, DefinitionInterpretation::EmptyableList);
2368    }
2369
2370    #[test]
2371    fn test_repdef_empty_list_at_end() {
        // Regression test for a failure we encountered when the last item was an empty list
2373        let mut builder = RepDefBuilder::default();
2374        builder.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
2375        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2376
2377        let repdefs = RepDefBuilder::serialize(vec![builder]);
2378
2379        let rep = repdefs.repetition_levels.unwrap();
2380        let def = repdefs.definition_levels.unwrap();
2381
2382        assert_eq!([1, 0, 1, 0, 0, 1], *rep);
2383        assert_eq!([0, 0, 0, 1, 0, 2], *def);
2384        assert_eq!(
2385            vec![
2386                DefinitionInterpretation::NullableItem,
2387                DefinitionInterpretation::EmptyableList,
2388            ],
2389            repdefs.def_meaning
2390        );
2391    }
2392
2393    #[test]
2394    fn test_repdef_abnormal_nulls() {
2395        // List nulls are allowed to have non-empty offsets and garbage values
2396        // and the add_offsets call should normalize this
2397        let mut builder = RepDefBuilder::default();
2398        builder.add_offsets(
2399            offsets_32(&[0, 2, 5, 8]),
2400            Some(validity(&[true, false, true])),
2401        );
        // Note: we pass 5 here and not 8.  If add_offsets detects garbage values behind null
        // lists they should be removed before continuing
2404        builder.add_no_null(5);
2405
2406        let repdefs = RepDefBuilder::serialize(vec![builder]);
2407
2408        let rep = repdefs.repetition_levels.unwrap();
2409        let def = repdefs.definition_levels.unwrap();
2410
2411        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2412        assert_eq!([0, 0, 1, 0, 0, 0], *def);
2413
2414        assert_eq!(
2415            vec![
2416                DefinitionInterpretation::AllValidItem,
2417                DefinitionInterpretation::NullableList,
2418            ],
2419            repdefs.def_meaning
2420        );
2421    }
2422
2423    #[test]
2424    fn test_repdef_fsl() {
2425        let mut builder = RepDefBuilder::default();
2426        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2427        builder.add_fsl(None, 2, 4);
2428        builder.add_validity_bitmap(validity(&[
2429            true, false, true, false, true, false, true, false,
2430        ]));
2431
2432        let repdefs = RepDefBuilder::serialize(vec![builder]);
2433
2434        assert_eq!(
2435            vec![
2436                DefinitionInterpretation::NullableItem,
2437                DefinitionInterpretation::AllValidItem,
2438                DefinitionInterpretation::NullableItem
2439            ],
2440            repdefs.def_meaning
2441        );
2442
2443        assert!(repdefs.repetition_levels.is_none());
2444
2445        let def = repdefs.definition_levels.unwrap();
2446
2447        assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def);
2448
2449        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2450            None,
2451            Some(def.as_ref().to_vec()),
2452            repdefs.def_meaning.into(),
2453        )]);
2454
2455        assert_eq!(
2456            unraveler.unravel_validity(8),
2457            Some(validity(&[
2458                true, false, true, false, false, false, false, false
2459            ]))
2460        );
2461        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2462        assert_eq!(
2463            unraveler.unravel_fsl_validity(2, 2),
2464            Some(validity(&[true, false]))
2465        );
2466    }
2467
2468    #[test]
2469    fn test_repdef_fsl_allvalid_item() {
2470        let mut builder = RepDefBuilder::default();
2471        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2472        builder.add_fsl(None, 2, 4);
2473        builder.add_no_null(8);
2474
2475        let repdefs = RepDefBuilder::serialize(vec![builder]);
2476
2477        assert_eq!(
2478            vec![
2479                DefinitionInterpretation::AllValidItem,
2480                DefinitionInterpretation::AllValidItem,
2481                DefinitionInterpretation::NullableItem
2482            ],
2483            repdefs.def_meaning
2484        );
2485
2486        assert!(repdefs.repetition_levels.is_none());
2487
2488        let def = repdefs.definition_levels.unwrap();
2489
2490        assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def);
2491
2492        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2493            None,
2494            Some(def.as_ref().to_vec()),
2495            repdefs.def_meaning.into(),
2496        )]);
2497
2498        assert_eq!(unraveler.unravel_validity(8), None);
2499        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2500        assert_eq!(
2501            unraveler.unravel_fsl_validity(2, 2),
2502            Some(validity(&[true, false]))
2503        );
2504    }
2505
2506    #[test]
2507    fn test_repdef_sliced_offsets() {
2508        // Sliced lists may have offsets that don't start with zero.  The
2509        // add_offsets call needs to normalize these to operate correctly.
2510        let mut builder = RepDefBuilder::default();
2511        builder.add_offsets(
2512            offsets_32(&[5, 7, 7, 10]),
2513            Some(validity(&[true, false, true])),
2514        );
2515        builder.add_no_null(5);
2516
2517        let repdefs = RepDefBuilder::serialize(vec![builder]);
2518
2519        let rep = repdefs.repetition_levels.unwrap();
2520        let def = repdefs.definition_levels.unwrap();
2521
2522        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2523        assert_eq!([0, 0, 1, 0, 0, 0], *def);
2524
2525        assert_eq!(
2526            vec![
2527                DefinitionInterpretation::AllValidItem,
2528                DefinitionInterpretation::NullableList,
2529            ],
2530            repdefs.def_meaning
2531        );
2532    }
2533
2534    #[test]
2535    fn test_repdef_complex_null_empty() {
2536        let mut builder = RepDefBuilder::default();
2537        builder.add_offsets(
2538            offsets_32(&[0, 4, 4, 4, 6]),
2539            Some(validity(&[true, false, true, true])),
2540        );
2541        builder.add_offsets(
2542            offsets_32(&[0, 1, 1, 2, 2, 2, 3]),
2543            Some(validity(&[true, false, true, false, true, true])),
2544        );
2545        builder.add_no_null(3);
2546
2547        let repdefs = RepDefBuilder::serialize(vec![builder]);
2548
2549        let rep = repdefs.repetition_levels.unwrap();
2550        let def = repdefs.definition_levels.unwrap();
2551
2552        assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep);
2553        assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def);
2554    }
2555
2556    #[test]
2557    fn test_repdef_empty_list_no_null() {
        // Tests when we have some empty lists but no null lists.  This case
        // caused some bugs because we have definition levels but no nulls
2560        let mut builder = RepDefBuilder::default();
2561        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2562        builder.add_no_null(6);
2563
2564        let repdefs = RepDefBuilder::serialize(vec![builder]);
2565
2566        let rep = repdefs.repetition_levels.unwrap();
2567        let def = repdefs.definition_levels.unwrap();
2568
2569        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2570        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2571
2572        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2573            Some(rep.as_ref().to_vec()),
2574            Some(def.as_ref().to_vec()),
2575            repdefs.def_meaning.into(),
2576        )]);
2577
2578        assert_eq!(unraveler.unravel_validity(6), None);
2579        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2580        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2581        assert_eq!(val, None);
2582    }
2583
2584    #[test]
2585    fn test_repdef_all_valid() {
2586        let mut builder = RepDefBuilder::default();
2587        builder.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
2588        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2589        builder.add_no_null(9);
2590
2591        let repdefs = RepDefBuilder::serialize(vec![builder]);
2592        let rep = repdefs.repetition_levels.unwrap();
2593        assert!(repdefs.definition_levels.is_none());
2594
2595        assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep);
2596
2597        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2598            Some(rep.as_ref().to_vec()),
2599            None,
2600            repdefs.def_meaning.into(),
2601        )]);
2602
2603        assert_eq!(unraveler.unravel_validity(9), None);
2604        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2605        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 7, 9]).inner());
2606        assert_eq!(val, None);
2607        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2608        assert_eq!(off.inner(), offsets_32(&[0, 2, 3, 5]).inner());
2609        assert_eq!(val, None);
2610    }
2611
2612    #[test]
2613    fn test_only_empty_lists() {
2614        let mut builder = RepDefBuilder::default();
2615        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2616        builder.add_no_null(6);
2617
2618        let repdefs = RepDefBuilder::serialize(vec![builder]);
2619
2620        let rep = repdefs.repetition_levels.unwrap();
2621        let def = repdefs.definition_levels.unwrap();
2622
2623        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2624        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2625
2626        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2627            Some(rep.as_ref().to_vec()),
2628            Some(def.as_ref().to_vec()),
2629            repdefs.def_meaning.into(),
2630        )]);
2631
2632        assert_eq!(unraveler.unravel_validity(6), None);
2633        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2634        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2635        assert_eq!(val, None);
2636    }
2637
2638    #[test]
2639    fn test_only_null_lists() {
2640        let mut builder = RepDefBuilder::default();
2641        builder.add_offsets(
2642            offsets_32(&[0, 4, 4, 4, 6]),
2643            Some(validity(&[true, false, false, true])),
2644        );
2645        builder.add_no_null(6);
2646
2647        let repdefs = RepDefBuilder::serialize(vec![builder]);
2648
2649        let rep = repdefs.repetition_levels.unwrap();
2650        let def = repdefs.definition_levels.unwrap();
2651
2652        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2653        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2654
2655        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2656            Some(rep.as_ref().to_vec()),
2657            Some(def.as_ref().to_vec()),
2658            repdefs.def_meaning.into(),
2659        )]);
2660
2661        assert_eq!(unraveler.unravel_validity(6), None);
2662        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2663        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2664        assert_eq!(val, Some(validity(&[true, false, false, true])));
2665    }
2666
2667    #[test]
2668    fn test_null_and_empty_lists() {
2669        let mut builder = RepDefBuilder::default();
2670        builder.add_offsets(
2671            offsets_32(&[0, 4, 4, 4, 6]),
2672            Some(validity(&[true, false, true, true])),
2673        );
2674        builder.add_no_null(6);
2675
2676        let repdefs = RepDefBuilder::serialize(vec![builder]);
2677
2678        let rep = repdefs.repetition_levels.unwrap();
2679        let def = repdefs.definition_levels.unwrap();
2680
2681        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2682        assert_eq!([0, 0, 0, 0, 1, 2, 0, 0], *def);
2683
2684        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2685            Some(rep.as_ref().to_vec()),
2686            Some(def.as_ref().to_vec()),
2687            repdefs.def_meaning.into(),
2688        )]);
2689
2690        assert_eq!(unraveler.unravel_validity(6), None);
2691        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2692        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2693        assert_eq!(val, Some(validity(&[true, false, true, true])));
2694    }
2695
2696    #[test]
2697    fn test_repdef_no_rep() {
2698        let mut builder = RepDefBuilder::default();
2699        builder.add_no_null(5);
2700        builder.add_validity_bitmap(validity(&[false, false, true, true, true]));
2701        builder.add_validity_bitmap(validity(&[false, true, true, true, false]));
2702
2703        let repdefs = RepDefBuilder::serialize(vec![builder]);
2704        assert!(repdefs.repetition_levels.is_none());
2705        let def = repdefs.definition_levels.unwrap();
2706
2707        assert_eq!([2, 2, 0, 0, 1], *def);
2708
2709        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2710            None,
2711            Some(def.as_ref().to_vec()),
2712            repdefs.def_meaning.into(),
2713        )]);
2714
2715        assert_eq!(
2716            unraveler.unravel_validity(5),
2717            Some(validity(&[false, false, true, true, false]))
2718        );
2719        assert_eq!(
2720            unraveler.unravel_validity(5),
2721            Some(validity(&[false, false, true, true, true]))
2722        );
2723        assert_eq!(unraveler.unravel_validity(5), None);
2724    }
2725
2726    #[test]
2727    fn test_composite_unravel() {
2728        let mut builder = RepDefBuilder::default();
2729        builder.add_offsets(
2730            offsets_64(&[0, 2, 2, 5]),
2731            Some(validity(&[true, false, true])),
2732        );
2733        builder.add_no_null(5);
2734        let repdef1 = RepDefBuilder::serialize(vec![builder]);
2735
2736        let mut builder = RepDefBuilder::default();
2737        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2738        builder.add_no_null(9);
2739        let repdef2 = RepDefBuilder::serialize(vec![builder]);
2740
2741        let rep1 = repdef1.repetition_levels.clone().unwrap();
2742        let def1 = repdef1.definition_levels.clone().unwrap();
2743        let rep2 = repdef2.repetition_levels.clone().unwrap();
2744        assert!(repdef2.definition_levels.is_none());
2745
2746        assert_eq!([1, 0, 1, 1, 0, 0], *rep1);
2747        assert_eq!([0, 0, 1, 0, 0, 0], *def1);
2748        assert_eq!([1, 1, 0, 1, 0, 1, 0, 1, 0], *rep2);
2749
2750        let unravel1 = RepDefUnraveler::new(
2751            repdef1.repetition_levels.map(|l| l.to_vec()),
2752            repdef1.definition_levels.map(|l| l.to_vec()),
2753            repdef1.def_meaning.into(),
2754        );
2755        let unravel2 = RepDefUnraveler::new(
2756            repdef2.repetition_levels.map(|l| l.to_vec()),
2757            repdef2.definition_levels.map(|l| l.to_vec()),
2758            repdef2.def_meaning.into(),
2759        );
2760
2761        let mut unraveler = CompositeRepDefUnraveler::new(vec![unravel1, unravel2]);
2762
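        // The composite unraveler stitches the two chunks back together: the unraveled
        // offsets accumulate across chunks (chunk 2 continues from 5), and the list
        // validity is chunk 1's [true, false, true] followed by all-valid for chunk 2.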
2763        assert!(unraveler.unravel_validity(9).is_none());
2764        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2765        assert_eq!(
2766            off.inner(),
2767            offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]).inner()
2768        );
2769        assert_eq!(
2770            val,
2771            Some(validity(&[true, false, true, true, true, true, true, true]))
2772        );
2773    }
2774
2775    #[test]
2776    fn test_repdef_multiple_builders() {
2777        // Basic case, rep & def
2778        let mut builder1 = RepDefBuilder::default();
2779        builder1.add_offsets(offsets_64(&[0, 2]), None);
2780        builder1.add_offsets(offsets_64(&[0, 1, 3]), None);
2781        builder1.add_validity_bitmap(validity(&[true, true, true]));
2782
2783        let mut builder2 = RepDefBuilder::default();
2784        builder2.add_offsets(offsets_64(&[0, 0, 3]), Some(validity(&[false, true])));
2785        builder2.add_offsets(
2786            offsets_64(&[0, 2, 2, 6]),
2787            Some(validity(&[true, false, true])),
2788        );
2789        builder2.add_validity_bitmap(validity(&[false, false, false, true, true, false]));
2790
2791        let repdefs = RepDefBuilder::serialize(vec![builder1, builder2]);
2792
2793        let rep = repdefs.repetition_levels.unwrap();
2794        let def = repdefs.definition_levels.unwrap();
2795
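        // Serializing multiple builders concatenates their levels: builder1 contributes the
        // first three (one row of two inner lists, all valid) and builder2 the remaining
        // eight.  In builder2's portion, def 1 marks a null value, def 2 a null middle
        // list, and def 3 a null outer list.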
2796        assert_eq!([2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2797        assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2798    }
2799
2800    #[test]
2801    fn test_slicer() {
2802        let mut builder = RepDefBuilder::default();
2803        builder.add_offsets(
2804            offsets_64(&[0, 2, 2, 30, 30]),
2805            Some(validity(&[true, false, true, true])),
2806        );
2807        builder.add_no_null(30);
2808
2809        let repdefs = RepDefBuilder::serialize(vec![builder]);
2810
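        // 30 values plus one extra level each for the null list and the empty list gives
        // 32 levels total, at 2 bytes per level.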
2811        let mut rep_slicer = repdefs.rep_slicer().unwrap();
2812
2813        // First 5 items include a null list so we get 6 levels (12 bytes)
2814        assert_eq!(rep_slicer.slice_next(5).len(), 12);
2815        // Next 20 are all plain
2816        assert_eq!(rep_slicer.slice_next(20).len(), 40);
2817        // Last 5 include an empty list so we get 6 levels (12 bytes)
2818        assert_eq!(rep_slicer.slice_rest().len(), 12);
2819
2820        let mut def_slicer = repdefs.def_slicer().unwrap();
2821
2822        // First 5 items include a null list so we get 6 levels (12 bytes)
2823        assert_eq!(def_slicer.slice_next(5).len(), 12);
2824        // Next 20 are all plain
2825        assert_eq!(def_slicer.slice_next(20).len(), 40);
2826        // Last 5 include an empty list so we get 6 levels (12 bytes)
2827        assert_eq!(def_slicer.slice_rest().len(), 12);
2828    }
2829
2830    #[test]
2831    fn test_control_words() {
2832        // Convert to control words, verify expected, convert back, verify same as original
2833        fn check(
2834            rep: &[u16],
2835            def: &[u16],
2836            expected_values: Vec<u8>,
2837            expected_bytes_per_word: usize,
2838            expected_bits_rep: u8,
2839            expected_bits_def: u8,
2840        ) {
2841            let num_vals = rep.len().max(def.len());
2842            let max_rep = rep.iter().max().copied().unwrap_or(0);
2843            let max_def = def.iter().max().copied().unwrap_or(0);
2844
2845            let in_rep = if rep.is_empty() { None } else { Some(rep) };
2846            let in_def = if def.is_empty() { None } else { Some(def) };
2847
2848            let mut iter = super::build_control_word_iterator(
2849                in_rep,
2850                max_rep,
2851                in_def,
2852                max_def,
2853                max_def + 1,
2854                expected_values.len(),
2855            );
2856            assert_eq!(iter.bytes_per_word(), expected_bytes_per_word);
2857            assert_eq!(iter.bits_rep(), expected_bits_rep);
2858            assert_eq!(iter.bits_def(), expected_bits_def);
2859            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
2860
2861            for _ in 0..num_vals {
2862                iter.append_next(&mut cw_vec);
2863            }
2864            assert!(iter.append_next(&mut cw_vec).is_none());
2865
2866            assert_eq!(expected_values, cw_vec);
2867
2868            let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);
2869
2870            let mut rep_out = Vec::with_capacity(num_vals);
2871            let mut def_out = Vec::with_capacity(num_vals);
2872
2873            if expected_bytes_per_word > 0 {
2874                for slice in cw_vec.chunks_exact(expected_bytes_per_word) {
2875                    parser.parse(slice, &mut rep_out, &mut def_out);
2876                }
2877            }
2878
2879            assert_eq!(rep, rep_out.as_slice());
2880            assert_eq!(def, def_out.as_slice());
2881        }
2882
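        // Judging by the expected bytes below, a control word packs the rep level into the
        // high bits and the def level into the low bits, padded out to whole little-endian
        // bytes.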
2883        // Rep and def each need 4 bits, so we should get 1-byte control words
2884        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
2885        let def = &[5_u16, 3, 1, 2, 12, 15, 0, 2];
2886        let expected = vec![
2887            0b00000101, // 0, 5
2888            0b01110011, // 7, 3
2889            0b00110001, // 3, 1
2890            0b00100010, // 2, 2
2891            0b10011100, // 9, 12
2892            0b10001111, // 8, 15
2893            0b11000000, // 12, 0
2894            0b01010010, // 5, 2
2895        ];
2896        check(rep, def, expected, 1, 4, 4);
2897
2898        // Now we need 5 bits for def so we get 2-byte control words
2899        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
2900        let def = &[5_u16, 3, 1, 2, 12, 22, 0, 2];
2901        let expected = vec![
2902            0b00000101, 0b00000000, // 0, 5
2903            0b11100011, 0b00000000, // 7, 3
2904            0b01100001, 0b00000000, // 3, 1
2905            0b01000010, 0b00000000, // 2, 2
2906            0b00101100, 0b00000001, // 9, 12
2907            0b00010110, 0b00000001, // 8, 22
2908            0b10000000, 0b00000001, // 12, 0
2909            0b10100010, 0b00000000, // 5, 2
2910        ];
2911        check(rep, def, expected, 2, 4, 5);
2912
2913        // Just rep, 4 bits so 1 byte each
2914        let levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
2915        let expected = vec![
2916            0b00000000, // 0
2917            0b00000111, // 7
2918            0b00000011, // 3
2919            0b00000010, // 2
2920            0b00001001, // 9
2921            0b00001000, // 8
2922            0b00001100, // 12
2923            0b00000101, // 5
2924        ];
2925        check(levels, &[], expected.clone(), 1, 4, 0);
2926
2927        // Just def
2928        check(&[], levels, expected, 1, 0, 4);
2929
2930        // No rep, no def, no bytes
2931        check(&[], &[], Vec::default(), 0, 0, 0);
2932    }
2933
2934    #[test]
2935    fn test_control_words_rep_index() {
2936        fn check(
2937            rep: &[u16],
2938            def: &[u16],
2939            expected_new_rows: Vec<bool>,
2940            expected_is_visible: Vec<bool>,
2941        ) {
2942            let num_vals = rep.len().max(def.len());
2943            let max_rep = rep.iter().max().copied().unwrap_or(0);
2944            let max_def = def.iter().max().copied().unwrap_or(0);
2945
2946            let in_rep = if rep.is_empty() { None } else { Some(rep) };
2947            let in_def = if def.is_empty() { None } else { Some(def) };
2948
2949            let mut iter = super::build_control_word_iterator(
2950                in_rep,
2951                max_rep,
2952                in_def,
2953                max_def,
2954                /*max_visible_def=*/ 2,
2955                expected_new_rows.len(),
2956            );
2957
2958            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
2959            let mut expected_new_rows = expected_new_rows.iter().copied();
2960            let mut expected_is_visible = expected_is_visible.iter().copied();
2961            for _ in 0..expected_new_rows.len() {
2962                let word_desc = iter.append_next(&mut cw_vec).unwrap();
2963                assert_eq!(word_desc.is_new_row, expected_new_rows.next().unwrap());
2964                assert_eq!(word_desc.is_visible, expected_is_visible.next().unwrap());
2965            }
2966            assert!(iter.append_next(&mut cw_vec).is_none());
2967        }
2968
2969        // A rep level of 2 (the max) marks the start of a new outermost list, i.e. a new row
2970        let rep = &[2_u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
2971        // The def levels are only used for the visibility check below
2972        let def = &[0_u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];
2973
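        // In the rep & def case the new-row flags are true exactly at indices 0, 3, 4, and 9
        // (the rep == 2 positions), and only index 3 is invisible because its def level (3)
        // exceeds max_visible_def (2).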
2974        // Rep & def
2975        check(
2976            rep,
2977            def,
2978            vec![
2979                true, false, false, true, true, false, false, false, false, true, false,
2980            ],
2981            vec![
2982                true, true, true, false, true, true, true, true, true, true, true,
2983            ],
2984        );
2985        // Rep only
2986        check(
2987            rep,
2988            &[],
2989            vec![
2990                true, false, false, true, true, false, false, false, false, true, false,
2991            ],
2992            vec![true; 11],
2993        );
2994        // No repetition
2995        check(
2996            &[],
2997            def,
2998            vec![
2999                true, true, true, true, true, true, true, true, true, true, true,
3000            ],
3001            vec![true; 11],
3002        );
3003        // No repetition, no definition
3004        check(
3005            &[],
3006            &[],
3007            vec![
3008                true, true, true, true, true, true, true, true, true, true, true,
3009            ],
3010            vec![true; 11],
3011        );
3012    }
3013
3014    #[test]
3015    fn regress_empty_list_case() {
3016        // Regression test for a case where we had 3 null lists inside a struct
3017        let mut builder = RepDefBuilder::default();
3018        builder.add_validity_bitmap(validity(&[true, false, true]));
3019        builder.add_offsets(
3020            offsets_32(&[0, 0, 0, 0]),
3021            Some(validity(&[false, false, false])),
3022        );
3023        builder.add_no_null(0);
3024
3025        let repdefs = RepDefBuilder::serialize(vec![builder]);
3026        let rep = repdefs.repetition_levels.unwrap();
3027        let def = repdefs.definition_levels.unwrap();
3028
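        // Every row produces exactly one level.  Rep 1 marks each (empty) list start; per
        // the expected def levels, 1 marks a null list and 2 a null struct.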
3029        assert_eq!([1, 1, 1], *rep);
3030        assert_eq!([1, 2, 1], *def);
3031
3032        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
3033            Some(rep.as_ref().to_vec()),
3034            Some(def.as_ref().to_vec()),
3035            repdefs.def_meaning.into(),
3036        )]);
3037
3038        assert_eq!(unraveler.unravel_validity(0), None);
3039        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
3040        assert_eq!(off.inner(), offsets_32(&[0, 0, 0, 0]).inner());
3041        assert_eq!(val, Some(validity(&[false, false, false])));
3042        let val = unraveler.unravel_validity(3).unwrap();
3043        assert_eq!(val.inner(), validity(&[true, false, true]).inner());
3044    }
3045
3046    #[test]
3047    fn regress_list_ends_null_case() {
3048        let mut builder = RepDefBuilder::default();
3049        builder.add_offsets(
3050            offsets_64(&[0, 1, 2, 2]),
3051            Some(validity(&[true, true, false])),
3052        );
3053        builder.add_offsets(offsets_64(&[0, 1, 1]), Some(validity(&[true, false])));
3054        builder.add_no_null(1);
3055
3056        let repdefs = RepDefBuilder::serialize(vec![builder]);
3057        let rep = repdefs.repetition_levels.unwrap();
3058        let def = repdefs.definition_levels.unwrap();
3059
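        // The three rows are [[v0]], [NULL], and NULL, so each level starts a new outer
        // list (rep 2).  Def 0 is fully valid, def 1 is the null inner list, and def 2 is
        // the null outer list.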
3060        assert_eq!([2, 2, 2], *rep);
3061        assert_eq!([0, 1, 2], *def);
3062
3063        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
3064            Some(rep.as_ref().to_vec()),
3065            Some(def.as_ref().to_vec()),
3066            repdefs.def_meaning.into(),
3067        )]);
3068
3069        assert_eq!(unraveler.unravel_validity(1), None);
3070        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
3071        assert_eq!(off.inner(), offsets_32(&[0, 1, 1]).inner());
3072        assert_eq!(val, Some(validity(&[true, false])));
3073        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
3074        assert_eq!(off.inner(), offsets_32(&[0, 1, 2, 2]).inner());
3075        assert_eq!(val, Some(validity(&[true, true, false])));
3076    }
3077}