lance_encoding/
repdef.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Utilities for rep-def levels
5//!
6//! Repetition and definition levels are a way to encode multipile validity / offsets arrays
7//! into a single buffer.  They are a form of "zipping" buffers together that takes advantage
8//! of the fact that, if the outermost array is invalid, then the validity of the inner items
9//! is irrelevant.
10//!
11//! Note: the concept of repetition & definition levels comes from the Dremel paper and has
12//! been implemented in Apache Parquet.  However, the implementation here is not necessarily
13//! compatible with Parquet.  For example, we use 0 to represent the "inner-most" item and
14//! Parquet uses 0 to represent the "outer-most" item.
15//!
16//! # Repetition Levels
17//!
18//! With repetition levels we convert a sparse array of offsets into a dense array of levels.
19//! These levels are marked non-zero whenever a new list begins.  In other words, given the
20//! list array with 3 rows [{<0,1>, <>, <2>}, {<3>}, {}], [], [{<4>}] we would have three
21//! offsets arrays:
22//!
23//! Outer-most ([]): [0, 3, 3, 4]
24//! Middle     ({}): [0, 3, 4, 4, 5]
25//! Inner      (<>): [0, 2, 2, 3, 4, 5]
26//! Values         : [0, 1, 2, 3, 4]
27//!
28//! We can convert these into repetition levels as follows:
29//!
30//! | Values | Repetition |
31//! | ------ | ---------- |
32//! |      0 |          3 | // Start of outer-most list
33//! |      1 |          0 | // Continues inner-most list (no new lists)
34//! |      - |          1 | // Start of new inner-most list (empty list)
35//! |      2 |          1 | // Start of new inner-most list
36//! |      3 |          2 | // Start of new middle list
37//! |      - |          2 | // Start of new inner-most list (empty list)
38//! |      - |          3 | // Start of new outer-most list (empty list)
39//! |      4 |          0 | // Start of new outer-most list
40//!
41//! Note: We actually have MORE repetition levels than values.  This is because the repetition
42//! levels need to be able to represent empty lists.
43//!
44//! # Definition Levels
45//!
46//! Definition levels are simpler.  We can think of them as zipping together various validity bitmaps
47//! (from different levels of nesting) into a single buffer.  For example, we could zip the arrays
48//! [1, 1, 0, 0] and [1, 0, 1, 0] into [11, 10, 01, 00].  However, 00 and 01 are redundant.  If the
49//! outer level is null then the validity of the inner levels is irrelevant.  To save space we instead
50//! encode a "level" which is the "depth" of the null.  Let's look at a more complete example:
51//!
52//! Array: [{"middle": {"inner": 1]}}, NULL, {"middle": NULL}, {"middle": {"inner": NULL}}]
53//!
54//! In Arrow we would have the following validity arrays:
55//! Outer validity : 1, 0, 1, 1
56//! Middle validity: 1, ?, 0, 1
57//! Inner validity : 1, ?, ?, 0
58//! Values         : 1, ?, ?, ?
59//!
60//! The ? values are undefined in the Arrow format.  We can convert these into definition levels as follows:
61//!
62//! | Values | Definition |
63//! | ------ | ---------- |
64//! |      1 |          0 | // Valid at all levels
65//! |      - |          3 | // Null at outer level
66//! |      - |          2 | // Null at middle level
67//! |      - |          1 | // Null at inner level
68//!
69//! # Compression
70//!
71//! Note that we only need 2 bits of definition levels to represent 3 levels of nesting.  Definition
72//! levels are always more compact than the input validity arrays.  However, compressed levels are not
73//! necessarily more compact than the compressed validity arrays.
74//!
75//! Repetition levels are more complex.  If there are very large lists then a sparse array of offsets
76//! (which has one element per list) might be more compact than a dense array of repetition levels
77//! (which has one element per list value, possibly even more if there are empty lists).
78//!
79//! However, both repetition levels and definition levels are typically very compressible with RLE.
80//!
81//! However, in Lance we don't always take advantage of that compression because we want to be able
82//! to zip rep-def levels together with our values.  This gives us fewer IOPS when accessing row values.
83//!
84//! # Utilities in this Module
85//!
86//! - `RepDefBuilder` - Extracts validity and offset information from Arrow arrays.  We use this as we
87//!   shred the incoming data into primitive leaf arrays.  We don't immediately convert into rep-def because
88//!   we need to share and cheaply clone the builder when we have structs (each struct child shares some parent
89//!   validity / offset information)  The `serialize` method is called once all data has been received to create
90//!   the final rep-def levels.
91//!
92//! - `SerializerContext` - This is an internal utility that helps with serializing rep-def levels.
93//!
94//! - `CompositeRepDefUnraveler` - This structure is used to reverse the process.  It starts with a set of
95//!   rep-def levels and then uses the `unravel_validity` and `unravel_offsets` methods to produce validity
96//!   buffers and offset buffers.  It is "composite" because we may be combining sets of rep-def buffers from
97//!   multiple locations (e.g. multiple blocks in a mini-block encoded file).
98//!
99//! - `RepDefSlicer` - This is a utility that helps with slicing rep-def buffers.  These buffers are "kind of"
100//!   transparent (maps 1:1 with the values in the array) but not exactly because of the special (empty/null) lists.
101//!   The slicer helps with this issue and is used when slicing a rep-def buffer into mini-blocks.
102//!
103//! - `build_control_word_iterator` - This takes in rep-def levels and returns an iterator that returns byte-padded
104//!   "control words" which are used when creating full-zip encoded data.
105//!
106//! - `ControlWordParser` - This parser can parse the control words returned by `build_control_word_iterator` and is
107//!   used when decoding full-zip encoded data.
108
109use std::{
110    iter::{Copied, Zip},
111    sync::Arc,
112};
113
114use arrow_array::OffsetSizeTrait;
115use arrow_buffer::{
116    ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
117};
118use lance_core::{utils::bit::log_2_ceil, Error, Result};
119use snafu::location;
120
121use crate::buffer::LanceBuffer;
122
123// We assume 16 bits is good enough for rep-def levels.  This _would_ give
124// us 65536 levels of struct nesting and list nesting.  However, we cut that
125// in half for SPECIAL_THRESHOLD because we use the top bit to indicate if an
126// item is a special value (null list / empty list) during construction.
127pub type LevelBuffer = Vec<u16>;
128
129// As we build def levels we add this to special values to indicate that they
130// are special so that we can skip over them when processing lower levels.
131//
132// We subtract this off at the end of construction to get the actual definition
133// levels.
134const SPECIAL_THRESHOLD: u16 = u16::MAX / 2;
135
136/// Represents information that we extract from a list array as we are
137/// encoding
138#[derive(Clone, Debug)]
139struct OffsetDesc {
140    offsets: Arc<[i64]>,
141    validity: Option<BooleanBuffer>,
142    has_empty_lists: bool,
143    num_values: usize,
144    num_specials: usize,
145}
146
147/// Represents validity information that we extract from non-list arrays (that
148/// have nulls) as we are encoding
149#[derive(Clone, Debug)]
150struct ValidityDesc {
151    validity: Option<BooleanBuffer>,
152    num_values: usize,
153}
154
155/// Represents validity information that we extract from FSL arrays.  This is
156/// just validity (no offsets) but we also record the dimension of the FSL array
157/// as that will impact the next layer
158#[derive(Clone, Debug)]
159struct FslDesc {
160    validity: Option<BooleanBuffer>,
161    dimension: usize,
162    num_values: usize,
163}
164
165// As we build up rep/def from arrow arrays we record a
166// series of RawRepDef objects.  Each one corresponds to layer
167// in the array structure
168#[derive(Clone, Debug)]
169enum RawRepDef {
170    Offsets(OffsetDesc),
171    Validity(ValidityDesc),
172    Fsl(FslDesc),
173}
174
175impl RawRepDef {
176    // Are there any nulls in this layer
177    fn has_nulls(&self) -> bool {
178        match self {
179            Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
180            Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
181            Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
182        }
183    }
184
185    // How many values are in this layer
186    fn num_values(&self) -> usize {
187        match self {
188            Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
189            Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
190            Self::Fsl(FslDesc { num_values, .. }) => *num_values,
191        }
192    }
193
194    /// How many empty/null lists are in this layer
195    fn num_specials(&self) -> usize {
196        match self {
197            Self::Offsets(OffsetDesc { num_specials, .. }) => *num_specials,
198            _ => 0,
199        }
200    }
201
202    /// How many definition levels do we need for this layer
203    fn max_def(&self) -> u16 {
204        match self {
205            Self::Offsets(OffsetDesc {
206                has_empty_lists,
207                validity,
208                ..
209            }) => {
210                let mut max_def = 0;
211                if *has_empty_lists {
212                    max_def += 1;
213                }
214                if validity.is_some() {
215                    max_def += 1;
216                }
217                max_def
218            }
219            Self::Validity(ValidityDesc { validity: None, .. }) => 0,
220            Self::Validity(ValidityDesc { .. }) => 1,
221            Self::Fsl(FslDesc { validity: None, .. }) => 0,
222            Self::Fsl(FslDesc { .. }) => 1,
223        }
224    }
225
226    /// How many repetition levels do we need for this layer
227    fn max_rep(&self) -> u16 {
228        match self {
229            Self::Offsets(_) => 1,
230            _ => 0,
231        }
232    }
233}
234
235/// Represents repetition and definition levels that have been
236/// serialized into a pair of (optional) level buffers
237#[derive(Debug)]
238pub struct SerializedRepDefs {
239    /// The repetition levels, one per item
240    ///
241    /// If None, there are no lists
242    pub repetition_levels: Option<Arc<[u16]>>,
243    /// The definition levels, one per item
244    ///
245    /// If None, there are no nulls
246    pub definition_levels: Option<Arc<[u16]>>,
247    /// The meaning of each definition level
248    pub def_meaning: Vec<DefinitionInterpretation>,
249    /// The maximum level that is "visible" from the lowest level
250    ///
251    /// This is the last level before we encounter a list level of some kind.  Once we've
252    /// hit a list level then nulls in any level beyond do not map to actual items.
253    ///
254    /// This is None if there are no lists
255    pub max_visible_level: Option<u16>,
256}
257
258impl SerializedRepDefs {
259    pub fn new(
260        repetition_levels: Option<LevelBuffer>,
261        definition_levels: Option<LevelBuffer>,
262        def_meaning: Vec<DefinitionInterpretation>,
263    ) -> Self {
264        let first_list = def_meaning.iter().position(|level| level.is_list());
265        let max_visible_level = first_list.map(|first_list| {
266            def_meaning
267                .iter()
268                .map(|level| level.num_def_levels())
269                .take(first_list)
270                .sum::<u16>()
271        });
272        Self {
273            repetition_levels: repetition_levels.map(Arc::from),
274            definition_levels: definition_levels.map(Arc::from),
275            def_meaning,
276            max_visible_level,
277        }
278    }
279
280    /// Creates an empty SerializedRepDefs (no repetition, all valid)
281    pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
282        Self {
283            repetition_levels: None,
284            definition_levels: None,
285            def_meaning,
286            max_visible_level: None,
287        }
288    }
289
290    pub fn rep_slicer(&self) -> Option<RepDefSlicer> {
291        self.repetition_levels
292            .as_ref()
293            .map(|rep| RepDefSlicer::new(self, rep.clone()))
294    }
295
296    pub fn def_slicer(&self) -> Option<RepDefSlicer> {
297        self.definition_levels
298            .as_ref()
299            .map(|def| RepDefSlicer::new(self, def.clone()))
300    }
301}
302
303/// Slices a level buffer into pieces
304///
305/// This is needed to handle the fact that a level buffer may have more
306/// levels than values due to special (empty/null) lists.
307///
308/// As a result, a call to `slice_next(10)` may return 10 levels or it may
309/// return more than 10 levels if any special values are encountered.
310#[derive(Debug)]
311pub struct RepDefSlicer<'a> {
312    repdef: &'a SerializedRepDefs,
313    to_slice: LanceBuffer,
314    current: usize,
315}
316
317// TODO: All of this logic will need some changing when we compress rep/def levels.
318impl<'a> RepDefSlicer<'a> {
319    fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
320        Self {
321            repdef,
322            to_slice: LanceBuffer::reinterpret_slice(levels),
323            current: 0,
324        }
325    }
326
327    pub fn num_levels(&self) -> usize {
328        self.to_slice.len() / 2
329    }
330
331    pub fn num_levels_remaining(&self) -> usize {
332        self.num_levels() - self.current
333    }
334
335    pub fn all_levels(&self) -> &LanceBuffer {
336        &self.to_slice
337    }
338
339    /// Returns the rest of the levels not yet sliced
340    ///
341    /// This must be called instead of `slice_next` on the final iteration.
342    /// This is because anytime we slice there may be empty/null lists on the
343    /// boundary that are "free" and the current behavior in `slice_next` is to
344    /// leave them for the next call.
345    ///
346    /// `slice_rest` will slice all remaining levels and return them.
347    pub fn slice_rest(&mut self) -> LanceBuffer {
348        let start = self.current;
349        let remaining = self.num_levels_remaining();
350        self.current = self.num_levels();
351        self.to_slice.slice_with_length(start * 2, remaining * 2)
352    }
353
354    /// Returns enough levels to satisfy the next `num_values` values
355    pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
356        let start = self.current;
357        let Some(max_visible_level) = self.repdef.max_visible_level else {
358            // No lists, should be 1:1 mapping from levels to values
359            self.current = start + num_values;
360            return self.to_slice.slice_with_length(start * 2, num_values * 2);
361        };
362        if let Some(def) = self.repdef.definition_levels.as_ref() {
363            // There are lists and there are def levels.  That means there may be
364            // more rep/def levels than values.  We need to scan the def levels to figure
365            // out which items are "invisible" and skip over them
366            let mut def_itr = def[start..].iter();
367            let mut num_taken = 0;
368            let mut num_passed = 0;
369            while num_taken < num_values {
370                let def_level = *def_itr.next().unwrap();
371                if def_level <= max_visible_level {
372                    num_taken += 1;
373                }
374                num_passed += 1;
375            }
376            self.current = start + num_passed;
377            self.to_slice.slice_with_length(start * 2, num_passed * 2)
378        } else {
379            // No def levels, should be 1:1 mapping from levels to values
380            self.current = start + num_values;
381            self.to_slice.slice_with_length(start * 2, num_values * 2)
382        }
383    }
384}
385
386/// This tells us how an array handles definition.  Given a stack of
387/// these and a nested array and a set of definition levels we can calculate
388/// how we should interpret the definition levels.
389///
390/// For example, if the interpretation is [AllValidItem, NullableItem] then
391/// a 0 means "valid item" and a 1 means "null struct".  If the interpretation
392/// is [NullableItem, NullableItem] then a 0 means "valid item" and a 1 means
393/// "null item" and a 2 means "null struct".
394///
395/// Lists are tricky because we might use up to two definition levels for a
396/// single layer of list nesting because we need one value to indicate "empty list"
397/// and another value to indicate "null list".
398#[derive(Debug, Copy, Clone, PartialEq, Eq)]
399pub enum DefinitionInterpretation {
400    AllValidItem,
401    AllValidList,
402    NullableItem,
403    NullableList,
404    EmptyableList,
405    NullableAndEmptyableList,
406}
407
408impl DefinitionInterpretation {
409    /// How many definition levels do we need for this layer
410    pub fn num_def_levels(&self) -> u16 {
411        match self {
412            Self::AllValidItem => 0,
413            Self::AllValidList => 0,
414            Self::NullableItem => 1,
415            Self::NullableList => 1,
416            Self::EmptyableList => 1,
417            Self::NullableAndEmptyableList => 2,
418        }
419    }
420
421    /// Does this layer have nulls?
422    pub fn is_all_valid(&self) -> bool {
423        matches!(
424            self,
425            Self::AllValidItem | Self::AllValidList | Self::EmptyableList
426        )
427    }
428
429    /// Does this layer represent a list?
430    pub fn is_list(&self) -> bool {
431        matches!(
432            self,
433            Self::AllValidList
434                | Self::NullableList
435                | Self::EmptyableList
436                | Self::NullableAndEmptyableList
437        )
438    }
439}
440
441/// The RepDefBuilder is used to collect offsets & validity buffers
442/// from arrow structures.  Once we have those we use the SerializerContext
443/// to build the actual repetition and definition levels.
444///
445/// We know ahead of time how many rep/def levels we will need (number of items
446/// in inner-most array + the number of empty/null lists in any parent arrays).
447///
448/// As a result we try and avoid any re-allocations by pre-allocating the buffers
449/// up front.  We allocate two copies of each buffer which allows us to avoid unsafe
450/// code caused by reading and writing to the same buffer (also, it's unavoidable
451/// because there are times we need to write 'faster' than we read)
452#[derive(Debug)]
453struct SerializerContext {
454    // This is built from outer-to-inner and then reversed at the end
455    def_meaning: Vec<DefinitionInterpretation>,
456    rep_levels: LevelBuffer,
457    spare_rep: LevelBuffer,
458    def_levels: LevelBuffer,
459    spare_def: LevelBuffer,
460    current_rep: u16,
461    current_def: u16,
462    current_len: usize,
463    current_num_specials: usize,
464}
465
466impl SerializerContext {
467    fn new(len: usize, num_layers: usize, max_rep: u16, max_def: u16) -> Self {
468        let def_meaning = Vec::with_capacity(num_layers);
469        Self {
470            rep_levels: if max_rep > 0 {
471                vec![0; len]
472            } else {
473                LevelBuffer::default()
474            },
475            spare_rep: if max_rep > 0 {
476                vec![0; len]
477            } else {
478                LevelBuffer::default()
479            },
480            def_levels: if max_def > 0 {
481                vec![0; len]
482            } else {
483                LevelBuffer::default()
484            },
485            spare_def: if max_def > 0 {
486                vec![0; len]
487            } else {
488                LevelBuffer::default()
489            },
490            def_meaning,
491            current_rep: max_rep,
492            current_def: max_def,
493            current_len: 0,
494            current_num_specials: 0,
495        }
496    }
497
498    fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
499        let def = self.current_def;
500        self.current_def -= meaning.num_def_levels();
501        self.def_meaning.push(meaning);
502        def
503    }
504
505    fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
506        let rep_level = self.current_rep;
507        let (null_list_level, empty_list_level) =
508            match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
509                (true, true) => {
510                    let level =
511                        self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
512                    (level - 1, level)
513                }
514                (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
515                (false, true) => (
516                    0,
517                    self.checkout_def(DefinitionInterpretation::EmptyableList),
518                ),
519                (false, false) => {
520                    self.checkout_def(DefinitionInterpretation::AllValidList);
521                    (0, 0)
522                }
523            };
524        self.current_rep -= 1;
525
526        if let Some(validity) = &offset_desc.validity {
527            self.do_record_validity(validity, null_list_level);
528        }
529
530        // We write into the spare buffers and read from the active buffers
531        // and then swap at the end.  This way we don't write over what we
532        // are reading.
533
534        let mut new_len = 0;
535        assert!(self.rep_levels.len() >= offset_desc.num_values - 1 + self.current_num_specials);
536        if self.def_levels.is_empty() {
537            let mut write_itr = self.spare_rep.iter_mut();
538            let mut read_iter = self.rep_levels.iter().copied();
539            for w in offset_desc.offsets.windows(2) {
540                let len = w[1] - w[0];
541                // len can't be 0 because then we'd have def levels
542                assert!(len > 0);
543                let rep = read_iter.next().unwrap();
544                let list_level = if rep == 0 { rep_level } else { rep };
545                *write_itr.next().unwrap() = list_level;
546
547                for _ in 1..len {
548                    *write_itr.next().unwrap() = 0;
549                }
550                new_len += len as usize;
551            }
552            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
553        } else {
554            assert!(
555                self.def_levels.len() >= offset_desc.num_values - 1 + self.current_num_specials
556            );
557            let mut def_write_itr = self.spare_def.iter_mut();
558            let mut rep_write_itr = self.spare_rep.iter_mut();
559            let mut rep_read_itr = self.rep_levels.iter().copied();
560            let mut def_read_itr = self.def_levels.iter().copied();
561            let specials_to_pass = self.current_num_specials;
562            let mut specials_passed = 0;
563
564            for w in offset_desc.offsets.windows(2) {
565                let mut def = def_read_itr.next().unwrap();
566                // Copy over any higher-level special values in place
567                while def > SPECIAL_THRESHOLD {
568                    *def_write_itr.next().unwrap() = def;
569                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
570                    def = def_read_itr.next().unwrap();
571                    new_len += 1;
572                    specials_passed += 1;
573                }
574
575                let len = w[1] - w[0];
576                let rep = rep_read_itr.next().unwrap();
577
578                // If the rep_level is 0 then we are the first list level
579                // otherwise we are starting a higher level list so keep
580                // existing rep level
581                let list_level = if rep == 0 { rep_level } else { rep };
582
583                if def == 0 && len > 0 {
584                    // New valid list, write a rep level and then add new 0/0 items
585                    *def_write_itr.next().unwrap() = 0;
586                    *rep_write_itr.next().unwrap() = list_level;
587
588                    for _ in 1..len {
589                        *def_write_itr.next().unwrap() = 0;
590                        *rep_write_itr.next().unwrap() = 0;
591                    }
592
593                    new_len += len as usize;
594                } else if def == 0 {
595                    // Empty list, insert new special
596                    *def_write_itr.next().unwrap() = empty_list_level + SPECIAL_THRESHOLD;
597                    *rep_write_itr.next().unwrap() = list_level;
598                    new_len += 1;
599                } else {
600                    // Either the list is null or one of its struct parents
601                    // is null.  Promote it to a special value.
602                    *def_write_itr.next().unwrap() = def + SPECIAL_THRESHOLD;
603                    *rep_write_itr.next().unwrap() = list_level;
604                    new_len += 1;
605                }
606            }
607
608            // If we have any special values at the end, we need to copy them over
609            while specials_passed < specials_to_pass {
610                *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
611                *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
612                new_len += 1;
613                specials_passed += 1;
614            }
615            std::mem::swap(&mut self.def_levels, &mut self.spare_def);
616            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
617        }
618
619        self.current_len = new_len;
620        self.current_num_specials += offset_desc.num_specials;
621    }
622
623    fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
624        assert!(self.def_levels.len() >= validity.len() + self.current_num_specials);
625        debug_assert!(
626            self.current_len == 0 || self.current_len == validity.len() + self.current_num_specials
627        );
628        self.current_len = validity.len();
629
630        let mut def_read_itr = self.def_levels.iter().copied();
631        let mut def_write_itr = self.spare_def.iter_mut();
632
633        let specials_to_pass = self.current_num_specials;
634        let mut specials_passed = 0;
635
636        for incoming_validity in validity.iter() {
637            let mut def = def_read_itr.next().unwrap();
638            while def > SPECIAL_THRESHOLD {
639                *def_write_itr.next().unwrap() = def;
640                def = def_read_itr.next().unwrap();
641                specials_passed += 1;
642            }
643            if def == 0 && !incoming_validity {
644                *def_write_itr.next().unwrap() = null_level;
645            } else {
646                *def_write_itr.next().unwrap() = def;
647            }
648        }
649
650        while specials_passed < specials_to_pass {
651            *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
652            specials_passed += 1;
653        }
654
655        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
656    }
657
658    fn multiply_levels(&mut self, multiplier: usize) {
659        let old_len = self.current_len;
660        // All non-special values will be broadcasted by the multiplier.  Special values are copied as-is.
661        self.current_len =
662            (self.current_len - self.current_num_specials) * multiplier + self.current_num_specials;
663
664        if self.rep_levels.is_empty() && self.def_levels.is_empty() {
665            // All valid with no rep/def levels, nothing to do
666            return;
667        } else if self.rep_levels.is_empty() {
668            assert!(self.def_levels.len() >= self.current_len);
669            // No rep levels, just multiply the def levels
670            let mut def_read_itr = self.def_levels.iter().copied();
671            let mut def_write_itr = self.spare_def.iter_mut();
672            for _ in 0..old_len {
673                let mut def = def_read_itr.next().unwrap();
674                while def > SPECIAL_THRESHOLD {
675                    *def_write_itr.next().unwrap() = def;
676                    def = def_read_itr.next().unwrap();
677                }
678                for _ in 0..multiplier {
679                    *def_write_itr.next().unwrap() = def;
680                }
681            }
682        } else if self.def_levels.is_empty() {
683            assert!(self.rep_levels.len() >= self.current_len);
684            // No def levels, just multiply the rep levels
685            let mut rep_read_itr = self.rep_levels.iter().copied();
686            let mut rep_write_itr = self.spare_rep.iter_mut();
687            for _ in 0..old_len {
688                let rep = rep_read_itr.next().unwrap();
689                for _ in 0..multiplier {
690                    *rep_write_itr.next().unwrap() = rep;
691                }
692            }
693        } else {
694            assert!(self.rep_levels.len() >= self.current_len);
695            assert!(self.def_levels.len() >= self.current_len);
696            let mut rep_read_itr = self.rep_levels.iter().copied();
697            let mut def_read_itr = self.def_levels.iter().copied();
698            let mut rep_write_itr = self.spare_rep.iter_mut();
699            let mut def_write_itr = self.spare_def.iter_mut();
700            for _ in 0..old_len {
701                let mut def = def_read_itr.next().unwrap();
702                while def > SPECIAL_THRESHOLD {
703                    *def_write_itr.next().unwrap() = def;
704                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
705                    def = def_read_itr.next().unwrap();
706                }
707                let rep = rep_read_itr.next().unwrap();
708                for _ in 0..multiplier {
709                    *def_write_itr.next().unwrap() = def;
710                    *rep_write_itr.next().unwrap() = rep;
711                }
712            }
713        }
714        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
715        std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
716    }
717
718    fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
719        if let Some(validity) = validity {
720            let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
721            self.do_record_validity(validity, def_level);
722        } else {
723            self.checkout_def(DefinitionInterpretation::AllValidItem);
724        }
725    }
726
727    fn record_validity(&mut self, validity_desc: &ValidityDesc) {
728        self.record_validity_buf(&validity_desc.validity)
729    }
730
731    fn record_fsl(&mut self, fsl_desc: &FslDesc) {
732        self.record_validity_buf(&fsl_desc.validity);
733        self.multiply_levels(fsl_desc.dimension);
734    }
735
736    fn normalize_specials(&mut self) {
737        for def in self.def_levels.iter_mut() {
738            if *def > SPECIAL_THRESHOLD {
739                *def -= SPECIAL_THRESHOLD;
740            }
741        }
742    }
743
744    fn build(mut self) -> SerializedRepDefs {
745        if self.current_len == 0 {
746            return SerializedRepDefs::new(None, None, self.def_meaning);
747        }
748
749        self.normalize_specials();
750
751        let definition_levels = if self.def_levels.is_empty() {
752            None
753        } else {
754            Some(self.def_levels)
755        };
756        let repetition_levels = if self.rep_levels.is_empty() {
757            None
758        } else {
759            Some(self.rep_levels)
760        };
761
762        // Need to reverse the def meaning since we build rep / def levels in reverse
763        let def_meaning = self.def_meaning.into_iter().rev().collect::<Vec<_>>();
764
765        SerializedRepDefs::new(repetition_levels, definition_levels, def_meaning)
766    }
767}
768
769/// A structure used to collect validity buffers and offsets from arrow
770/// arrays and eventually create repetition and definition levels
771///
772/// As we are encoding the structural encoders are given this struct and
773/// will record the arrow information into it.  Once we hit a leaf node we
774/// serialize the data into rep/def levels and write these into the page.
775#[derive(Clone, Default, Debug)]
776pub struct RepDefBuilder {
777    // The rep/def info we have collected so far
778    repdefs: Vec<RawRepDef>,
779    // The current length, can get larger as we traverse lists (e.g. an
780    // array might have 5 lists which results in 50 items)
781    //
782    // Starts uninitialized until we see the first rep/def item
783    len: Option<usize>,
784}
785
786impl RepDefBuilder {
787    fn check_validity_len(&mut self, incoming_len: usize) {
788        if let Some(len) = self.len {
789            assert_eq!(incoming_len, len);
790        } else {
791            // First validity buffer we've seen
792            self.len = Some(incoming_len);
793        }
794    }
795
796    fn num_layers(&self) -> usize {
797        self.repdefs.len()
798    }
799
800    /// The builder is "empty" if there is no repetition and no nulls.  In this case we don't need
801    /// to store anything to disk (except the description)
802    fn is_empty(&self) -> bool {
803        self.repdefs
804            .iter()
805            .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
806    }
807
808    /// Returns true if there is only a single layer of definition
809    pub fn is_simple_validity(&self) -> bool {
810        self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
811    }
812
813    /// Registers a nullable validity bitmap
814    pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
815        self.check_validity_len(validity.len());
816        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
817            num_values: validity.len(),
818            validity: Some(validity.into_inner()),
819        }));
820    }
821
822    /// Registers an all-valid validity layer
823    pub fn add_no_null(&mut self, len: usize) {
824        self.check_validity_len(len);
825        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
826            validity: None,
827            num_values: len,
828        }));
829    }
830
831    pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
832        if let Some(len) = self.len {
833            assert_eq!(num_values, len);
834        }
835        self.len = Some(num_values * dimension);
836        debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
837        self.repdefs.push(RawRepDef::Fsl(FslDesc {
838            num_values,
839            validity: validity.map(|v| v.into_inner()),
840            dimension,
841        }))
842    }
843
844    fn check_offset_len(&mut self, offsets: &[i64]) {
845        if let Some(len) = self.len {
846            assert!(offsets.len() == len + 1);
847        }
848        self.len = Some(offsets[offsets.len() - 1] as usize);
849    }
850
851    fn do_add_offsets(
852        &mut self,
853        lengths: impl Iterator<Item = i64>,
854        validity: Option<NullBuffer>,
855        capacity: usize,
856    ) -> bool {
857        let mut num_specials = 0;
858        let mut has_empty_lists = false;
859        let mut has_garbage_values = false;
860        let mut last_off: i64 = 0;
861
862        let mut normalized_offsets = Vec::with_capacity(capacity);
863        normalized_offsets.push(0);
864
865        if let Some(ref validity) = validity {
866            for (len, is_valid) in lengths.zip(validity.iter()) {
867                match (is_valid, len == 0) {
868                    (false, is_empty) => {
869                        num_specials += 1;
870                        has_garbage_values |= !is_empty;
871                    }
872                    (true, true) => {
873                        num_specials += 1;
874                        has_empty_lists = true;
875                    }
876                    _ => {
877                        last_off += len;
878                    }
879                }
880                normalized_offsets.push(last_off);
881            }
882        } else {
883            for len in lengths {
884                if len == 0 {
885                    num_specials += 1;
886                    has_empty_lists = true;
887                }
888                last_off += len;
889                normalized_offsets.push(last_off);
890            }
891        }
892
893        self.check_offset_len(&normalized_offsets);
894        self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
895            num_values: normalized_offsets.len() - 1,
896            offsets: normalized_offsets.into(),
897            validity: validity.map(|v| v.into_inner()),
898            has_empty_lists,
899            num_specials: num_specials as usize,
900        }));
901
902        has_garbage_values
903    }
904
905    /// Adds a layer of offsets
906    ///
907    /// Offsets are casted to a common type (i64) and also normalized.  Null lists are
908    /// always represented by a zero-length (identical) pair of offsets and so the caller
909    /// should filter out any garbage items before encoding them.  To assist with this the
910    /// method will return true if any non-empty null lists were found.
911    pub fn add_offsets<O: OffsetSizeTrait>(
912        &mut self,
913        offsets: OffsetBuffer<O>,
914        validity: Option<NullBuffer>,
915    ) -> bool {
916        let inner = offsets.into_inner();
917        let buffer_len = inner.len();
918
919        if O::IS_LARGE {
920            let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, buffer_len);
921            let lengths = i64_buff.windows(2).map(|off| off[1] - off[0]);
922            self.do_add_offsets(lengths, validity, buffer_len)
923        } else {
924            let i32_buff = ScalarBuffer::<i32>::new(inner.into_inner(), 0, buffer_len);
925            let lengths = i32_buff.windows(2).map(|off| (off[1] - off[0]) as i64);
926            self.do_add_offsets(lengths, validity, buffer_len)
927        }
928    }
929
930    // When we are encoding data it arrives in batches.  For each batch we create a RepDefBuilder and collect the
931    // various validity buffers and offset buffers from that batch.  Once we have enough batches to write a page we
932    // need to take this collection of RepDefBuilders and concatenate them and then serialize them into rep/def levels.
933    //
934    // TODO: In the future, we may concatenate and serialize at the same time?
935    //
936    // This method takes care of the concatenation part.  First we collect all of layer 0 from each builder, then we
937    // call this method.  Then we collect all of layer 1 from each builder and call this method.  And so on.
938    //
939    // That means this method should get a collection of `RawRepDef` where each item is the same kind (all validity or
940    // all offsets) though the nullability / lengths may be different in each layer.
941    fn concat_layers<'a>(
942        layers: impl Iterator<Item = &'a RawRepDef>,
943        num_layers: usize,
944    ) -> RawRepDef {
945        enum LayerKind {
946            Validity,
947            Fsl,
948            Offsets,
949        }
950
951        // We make two passes through the layers.  The first determines if we need to pay the cost of allocating
952        // buffers.  The second pass actually adds the values.
953        let mut collected = Vec::with_capacity(num_layers);
954        let mut has_nulls = false;
955        let mut layer_kind = LayerKind::Validity;
956        let mut total_num_specials = 0;
957        let mut all_dimension = 0;
958        let mut all_has_empty_lists = false;
959        let mut all_num_values = 0;
960        for layer in layers {
961            has_nulls |= layer.has_nulls();
962            match layer {
963                RawRepDef::Validity(_) => {
964                    layer_kind = LayerKind::Validity;
965                }
966                RawRepDef::Offsets(OffsetDesc {
967                    num_specials,
968                    has_empty_lists,
969                    ..
970                }) => {
971                    all_has_empty_lists |= *has_empty_lists;
972                    layer_kind = LayerKind::Offsets;
973                    total_num_specials += num_specials;
974                }
975                RawRepDef::Fsl(FslDesc { dimension, .. }) => {
976                    layer_kind = LayerKind::Fsl;
977                    all_dimension = *dimension;
978                }
979            }
980            collected.push(layer);
981            all_num_values += layer.num_values();
982        }
983
984        // Shortcut if there are no nulls
985        if !has_nulls {
986            match layer_kind {
987                LayerKind::Validity => {
988                    return RawRepDef::Validity(ValidityDesc {
989                        validity: None,
990                        num_values: all_num_values,
991                    });
992                }
993                LayerKind::Fsl => {
994                    return RawRepDef::Fsl(FslDesc {
995                        validity: None,
996                        num_values: all_num_values,
997                        dimension: all_dimension,
998                    })
999                }
1000                LayerKind::Offsets => {}
1001            }
1002        }
1003
1004        // Only allocate if needed
1005        let mut validity_builder = if has_nulls {
1006            BooleanBufferBuilder::new(all_num_values)
1007        } else {
1008            BooleanBufferBuilder::new(0)
1009        };
1010        let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
1011            let mut all_offsets = Vec::with_capacity(all_num_values);
1012            all_offsets.push(0);
1013            all_offsets
1014        } else {
1015            Vec::new()
1016        };
1017
1018        for layer in collected {
1019            match layer {
1020                RawRepDef::Validity(ValidityDesc {
1021                    validity: Some(validity),
1022                    ..
1023                }) => {
1024                    validity_builder.append_buffer(validity);
1025                }
1026                RawRepDef::Validity(ValidityDesc {
1027                    validity: None,
1028                    num_values,
1029                }) => {
1030                    validity_builder.append_n(*num_values, true);
1031                }
1032                RawRepDef::Fsl(FslDesc {
1033                    validity,
1034                    num_values,
1035                    ..
1036                }) => {
1037                    if let Some(validity) = validity {
1038                        validity_builder.append_buffer(validity);
1039                    } else {
1040                        validity_builder.append_n(*num_values, true);
1041                    }
1042                }
1043                RawRepDef::Offsets(OffsetDesc {
1044                    offsets,
1045                    validity: Some(validity),
1046                    has_empty_lists,
1047                    ..
1048                }) => {
1049                    all_has_empty_lists |= has_empty_lists;
1050                    validity_builder.append_buffer(validity);
1051                    let last = *all_offsets.last().unwrap();
1052                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1053                }
1054                RawRepDef::Offsets(OffsetDesc {
1055                    offsets,
1056                    validity: None,
1057                    has_empty_lists,
1058                    num_values,
1059                    ..
1060                }) => {
1061                    all_has_empty_lists |= has_empty_lists;
1062                    if has_nulls {
1063                        validity_builder.append_n(*num_values, true);
1064                    }
1065                    let last = *all_offsets.last().unwrap();
1066                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1067                }
1068            }
1069        }
1070        let validity = if has_nulls {
1071            Some(validity_builder.finish())
1072        } else {
1073            None
1074        };
1075        match layer_kind {
1076            LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
1077                validity,
1078                num_values: all_num_values,
1079                dimension: all_dimension,
1080            }),
1081            LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
1082                validity,
1083                num_values: all_num_values,
1084            }),
1085            LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
1086                offsets: all_offsets.into(),
1087                validity,
1088                has_empty_lists: all_has_empty_lists,
1089                num_values: all_num_values,
1090                num_specials: total_num_specials,
1091            }),
1092        }
1093    }
1094
1095    /// Converts the validity / offsets buffers that have been gathered so far
1096    /// into repetition and definition levels
1097    pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
1098        assert!(!builders.is_empty());
1099        if builders.iter().all(|b| b.is_empty()) {
1100            // No repetition, all-valid
1101            return SerializedRepDefs::empty(
1102                builders
1103                    .first()
1104                    .unwrap()
1105                    .repdefs
1106                    .iter()
1107                    .map(|_| DefinitionInterpretation::AllValidItem)
1108                    .collect::<Vec<_>>(),
1109            );
1110        }
1111
1112        let num_layers = builders[0].num_layers();
1113        let combined_layers = (0..num_layers)
1114            .map(|layer_index| {
1115                Self::concat_layers(
1116                    builders.iter().map(|b| &b.repdefs[layer_index]),
1117                    builders.len(),
1118                )
1119            })
1120            .collect::<Vec<_>>();
1121        debug_assert!(builders
1122            .iter()
1123            .all(|b| b.num_layers() == builders[0].num_layers()));
1124
1125        let total_len = combined_layers.last().unwrap().num_values()
1126            + combined_layers
1127                .iter()
1128                .map(|l| l.num_specials())
1129                .sum::<usize>();
1130        let max_rep = combined_layers.iter().map(|l| l.max_rep()).sum::<u16>();
1131        let max_def = combined_layers.iter().map(|l| l.max_def()).sum::<u16>();
1132
1133        let mut context = SerializerContext::new(total_len, num_layers, max_rep, max_def);
1134        for layer in combined_layers.into_iter() {
1135            match layer {
1136                RawRepDef::Validity(def) => {
1137                    context.record_validity(&def);
1138                }
1139                RawRepDef::Offsets(rep) => {
1140                    context.record_offsets(&rep);
1141                }
1142                RawRepDef::Fsl(fsl) => {
1143                    context.record_fsl(&fsl);
1144                }
1145            }
1146        }
1147        context.build()
1148    }
1149}
1150
1151/// Starts with serialized repetition and definition levels and unravels
1152/// them into validity buffers and offsets buffers
1153///
1154/// This is used during decoding to create the necessary arrow structures
1155#[derive(Debug)]
1156pub struct RepDefUnraveler {
1157    rep_levels: Option<LevelBuffer>,
1158    def_levels: Option<LevelBuffer>,
1159    // Maps from definition level to the rep level at which that definition level is visible
1160    levels_to_rep: Vec<u16>,
1161    def_meaning: Arc<[DefinitionInterpretation]>,
1162    // Current definition level to compare to.
1163    current_def_cmp: u16,
1164    // Current rep level, determines which specials we can see
1165    current_rep_cmp: u16,
1166    // Current layer index, 0 means inner-most layer and it counts up from there.  Used to index
1167    // into special_defs
1168    current_layer: usize,
1169}
1170
1171impl RepDefUnraveler {
1172    /// Creates a new unraveler from serialized repetition and definition information
1173    pub fn new(
1174        rep_levels: Option<LevelBuffer>,
1175        def_levels: Option<LevelBuffer>,
1176        def_meaning: Arc<[DefinitionInterpretation]>,
1177    ) -> Self {
1178        let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
1179        let mut rep_counter = 0;
1180        // Level=0 is always visible and means valid item
1181        levels_to_rep.push(0);
1182        for meaning in def_meaning.as_ref() {
1183            match meaning {
1184                DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
1185                    // There is no corresponding level, so nothing to put in levels_to_rep
1186                }
1187                DefinitionInterpretation::NullableItem => {
1188                    // Some null structs are not visible at inner rep levels in cases like LIST<STRUCT<LIST<...>>>
1189                    levels_to_rep.push(rep_counter);
1190                }
1191                DefinitionInterpretation::NullableList => {
1192                    rep_counter += 1;
1193                    levels_to_rep.push(rep_counter);
1194                }
1195                DefinitionInterpretation::EmptyableList => {
1196                    rep_counter += 1;
1197                    levels_to_rep.push(rep_counter);
1198                }
1199                DefinitionInterpretation::NullableAndEmptyableList => {
1200                    rep_counter += 1;
1201                    levels_to_rep.push(rep_counter);
1202                    levels_to_rep.push(rep_counter);
1203                }
1204            }
1205        }
1206        Self {
1207            rep_levels,
1208            def_levels,
1209            current_def_cmp: 0,
1210            current_rep_cmp: 0,
1211            levels_to_rep,
1212            current_layer: 0,
1213            def_meaning,
1214        }
1215    }
1216
1217    pub fn is_all_valid(&self) -> bool {
1218        self.def_meaning[self.current_layer].is_all_valid()
1219    }
1220
1221    /// If the current level is a repetition layer then this returns the number of lists
1222    /// at this level.
1223    ///
1224    /// This is not valid to call when the current level is a struct/primitive layer because
1225    /// in some cases there may be no rep or def information to know this.
1226    pub fn max_lists(&self) -> usize {
1227        debug_assert!(
1228            self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
1229        );
1230        self.rep_levels
1231            .as_ref()
1232            // Worst case every rep item is max_rep and a new list
1233            .map(|levels| levels.len())
1234            .unwrap_or(0)
1235    }
1236
1237    /// Unravels a layer of offsets from the unraveler into the given offset width
1238    ///
1239    /// When decoding a list the caller should first unravel the offsets and then
1240    /// unravel the validity (this is the opposite order used during encoding)
1241    pub fn unravel_offsets<T: ArrowNativeType>(
1242        &mut self,
1243        offsets: &mut Vec<T>,
1244        validity: Option<&mut BooleanBufferBuilder>,
1245    ) -> Result<()> {
1246        let rep_levels = self
1247            .rep_levels
1248            .as_mut()
1249            .expect("Expected repetition level but data didn't contain repetition");
1250        let valid_level = self.current_def_cmp;
1251        let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
1252            DefinitionInterpretation::NullableList => {
1253                self.current_def_cmp += 1;
1254                (valid_level + 1, 0)
1255            }
1256            DefinitionInterpretation::EmptyableList => {
1257                self.current_def_cmp += 1;
1258                (0, valid_level + 1)
1259            }
1260            DefinitionInterpretation::NullableAndEmptyableList => {
1261                self.current_def_cmp += 2;
1262                (valid_level + 1, valid_level + 2)
1263            }
1264            DefinitionInterpretation::AllValidList => (0, 0),
1265            _ => unreachable!(),
1266        };
1267        self.current_layer += 1;
1268
1269        // This is the highest def level that is still visible.  Once we hit a list then
1270        // we stop looking because any null / empty list (or list masked by a higher level
1271        // null) will not be visible
1272        let mut max_level = null_level.max(empty_level);
1273        // Anything higher than this (but less than max_level) is a null struct masking our
1274        // list.  We will materialize this is a null list.
1275        let upper_null = max_level;
1276        for level in self.def_meaning[self.current_layer..].iter() {
1277            match level {
1278                DefinitionInterpretation::NullableItem => {
1279                    max_level += 1;
1280                }
1281                DefinitionInterpretation::AllValidItem => {}
1282                _ => {
1283                    break;
1284                }
1285            }
1286        }
1287
1288        let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);
1289
1290        // If offsets is empty this is a no-op.  If offsets is not empty that means we already
1291        // added a set of offsets.  For example, we might have added [0, 3, 5] (2 lists).  Now
1292        // say we want to add [0, 1, 4] (2 lists).  We should get [0, 3, 5, 6, 9] (4 lists).  If
1293        // we don't pop here we get [0, 3, 5, 5, 6, 9] which is wrong.
1294        //
1295        // Or, to think about it another way, if every unraveler adds the starting 0 and the trailing
1296        // length then we have N + unravelers.len() values instead of N + 1.
1297        offsets.pop();
1298
1299        let to_offset = |val: usize| {
1300            T::from_usize(val)
1301            .ok_or_else(|| Error::invalid_input("A single batch had more than i32::MAX values and so a large container type is required", location!()))
1302        };
1303        self.current_rep_cmp += 1;
1304        if let Some(def_levels) = &mut self.def_levels {
1305            assert!(rep_levels.len() == def_levels.len());
1306            // It's possible validity is None even if we have def levels.  For example, we might have
1307            // empty lists (which require def levels) but no nulls.
1308            let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
1309                Box::new(|is_valid| validity.append(is_valid))
1310            } else {
1311                Box::new(|_| {})
1312            };
1313            // This is a strange access pattern.  We are iterating over the rep/def levels and
1314            // at the same time writing the rep/def levels.  This means we need both a mutable
1315            // and immutable reference to the rep/def levels.
1316            let mut read_idx = 0;
1317            let mut write_idx = 0;
1318            while read_idx < rep_levels.len() {
1319                // SAFETY: We assert that rep_levels and def_levels have the same
1320                // len and read_idx and write_idx can never go past the end.
1321                unsafe {
1322                    let rep_val = *rep_levels.get_unchecked(read_idx);
1323                    if rep_val != 0 {
1324                        let def_val = *def_levels.get_unchecked(read_idx);
1325                        // Copy over
1326                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1327                        *def_levels.get_unchecked_mut(write_idx) = def_val;
1328                        write_idx += 1;
1329
1330                        if def_val == 0 {
1331                            // This is a valid list
1332                            offsets.push(to_offset(curlen)?);
1333                            curlen += 1;
1334                            push_validity(true);
1335                        } else if def_val > max_level {
1336                            // This is not visible at this rep level, do not add to offsets, but keep in repdef
1337                        } else if def_val == null_level || def_val > upper_null {
1338                            // This is a null list (or a list masked by a null struct)
1339                            offsets.push(to_offset(curlen)?);
1340                            push_validity(false);
1341                        } else if def_val == empty_level {
1342                            // This is an empty list
1343                            offsets.push(to_offset(curlen)?);
1344                            push_validity(true);
1345                        } else {
1346                            // New valid list starting with null item
1347                            offsets.push(to_offset(curlen)?);
1348                            curlen += 1;
1349                            push_validity(true);
1350                        }
1351                    } else {
1352                        curlen += 1;
1353                    }
1354                    read_idx += 1;
1355                }
1356            }
1357            offsets.push(to_offset(curlen)?);
1358            rep_levels.truncate(write_idx);
1359            def_levels.truncate(write_idx);
1360            Ok(())
1361        } else {
1362            // SAFETY: See above loop
1363            let mut read_idx = 0;
1364            let mut write_idx = 0;
1365            let old_offsets_len = offsets.len();
1366            while read_idx < rep_levels.len() {
1367                // SAFETY: read_idx / write_idx cannot go past rep_levels.len()
1368                unsafe {
1369                    let rep_val = *rep_levels.get_unchecked(read_idx);
1370                    if rep_val != 0 {
1371                        // Finish the current list
1372                        offsets.push(to_offset(curlen)?);
1373                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1374                        write_idx += 1;
1375                    }
1376                    curlen += 1;
1377                    read_idx += 1;
1378                }
1379            }
1380            let num_new_lists = offsets.len() - old_offsets_len;
1381            offsets.push(to_offset(curlen)?);
1382            rep_levels.truncate(offsets.len() - 1);
1383            if let Some(validity) = validity {
1384                // Even though we don't have validity it is possible another unraveler did and so we need
1385                // to push all valids
1386                validity.append_n(num_new_lists, true);
1387            }
1388            Ok(())
1389        }
1390    }
1391
1392    pub fn skip_validity(&mut self) {
1393        debug_assert!(
1394            self.def_meaning[self.current_layer] == DefinitionInterpretation::AllValidItem
1395        );
1396        self.current_layer += 1;
1397    }
1398
1399    /// Unravels a layer of validity from the definition levels
1400    pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
1401        debug_assert!(
1402            self.def_meaning[self.current_layer] != DefinitionInterpretation::AllValidItem
1403        );
1404        self.current_layer += 1;
1405
1406        let def_levels = &self.def_levels.as_ref().unwrap();
1407
1408        let current_def_cmp = self.current_def_cmp;
1409        self.current_def_cmp += 1;
1410
1411        for is_valid in def_levels.iter().filter_map(|&level| {
1412            if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
1413                Some(level <= current_def_cmp)
1414            } else {
1415                None
1416            }
1417        }) {
1418            validity.append(is_valid);
1419        }
1420    }
1421
1422    pub fn decimate(&mut self, dimension: usize) {
1423        if self.rep_levels.is_some() {
1424            // If we need to support this then I think we need to walk through the rep def levels to find
1425            // the spots at which we keep.  E.g. if we have:
1426            //  rep: 1 0 0 1 0 1 0 0 0 1 0 0
1427            //  def: 1 1 1 0 1 0 1 1 0 1 1 0
1428            //  dimension: 2
1429            //
1430            // The output should be:
1431            //  rep: 1 0 0 1 0 0 0
1432            //  def: 1 1 1 0 1 1 0
1433            //
1434            // Maybe there's some special logic for empty/null lists?  I'll save the headache for future me.
1435            todo!("Not yet supported FSL<...List<...>>");
1436        }
1437        let Some(def_levels) = self.def_levels.as_mut() else {
1438            return;
1439        };
1440        let mut read_idx = 0;
1441        let mut write_idx = 0;
1442        while read_idx < def_levels.len() {
1443            unsafe {
1444                *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
1445            }
1446            write_idx += 1;
1447            read_idx += dimension;
1448        }
1449        def_levels.truncate(write_idx);
1450    }
1451}
1452
1453/// As we decode we may extract rep/def information from multiple pages (or multiple
1454/// chunks within a page).
1455///
1456/// For each chunk we create an unraveler.  Each unraveler can have a completely different
1457/// interpretation (e.g. one page might contain null items but no null structs and the next
1458/// page might have null structs but no null items).
1459///
1460/// Concatenating these unravelers would be tricky and expensive so instead we have a
1461/// composite unraveler which unravels across multiple unravelers.
1462///
1463/// Note: this class should be used even if there is only one page / unraveler.  This is
1464/// because the `RepDefUnraveler`'s API is more complex (it's meant to be called by this
1465/// class)
1466#[derive(Debug)]
1467pub struct CompositeRepDefUnraveler {
1468    unravelers: Vec<RepDefUnraveler>,
1469}
1470
1471impl CompositeRepDefUnraveler {
1472    pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
1473        Self { unravelers }
1474    }
1475
1476    /// Unravels a layer of validity
1477    ///
1478    /// Returns None if there are no null items in this layer
1479    pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
1480        let is_all_valid = self
1481            .unravelers
1482            .iter()
1483            .all(|unraveler| unraveler.is_all_valid());
1484
1485        if is_all_valid {
1486            for unraveler in self.unravelers.iter_mut() {
1487                unraveler.skip_validity();
1488            }
1489            None
1490        } else {
1491            let mut validity = BooleanBufferBuilder::new(num_values);
1492            for unraveler in self.unravelers.iter_mut() {
1493                unraveler.unravel_validity(&mut validity);
1494            }
1495            Some(NullBuffer::new(validity.finish()))
1496        }
1497    }
1498
1499    pub fn unravel_fsl_validity(
1500        &mut self,
1501        num_values: usize,
1502        dimension: usize,
1503    ) -> Option<NullBuffer> {
1504        for unraveler in self.unravelers.iter_mut() {
1505            unraveler.decimate(dimension);
1506        }
1507        self.unravel_validity(num_values)
1508    }
1509
1510    /// Unravels a layer of offsets (and the validity for that layer)
1511    pub fn unravel_offsets<T: ArrowNativeType>(
1512        &mut self,
1513    ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
1514        let mut is_all_valid = true;
1515        let mut max_num_lists = 0;
1516        for unraveler in self.unravelers.iter() {
1517            is_all_valid &= unraveler.is_all_valid();
1518            max_num_lists += unraveler.max_lists();
1519        }
1520
1521        let mut validity = if is_all_valid {
1522            None
1523        } else {
1524            // Note: This is probably an over-estimate and potentially even an under-estimate.  We only know
1525            // right now how many items we have and not how many rows.  (TODO: Shouldn't we know the # of rows?)
1526            Some(BooleanBufferBuilder::new(max_num_lists))
1527        };
1528
1529        let mut offsets = Vec::with_capacity(max_num_lists + 1);
1530
1531        for unraveler in self.unravelers.iter_mut() {
1532            unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
1533        }
1534
1535        Ok((
1536            OffsetBuffer::new(ScalarBuffer::from(offsets)),
1537            validity.map(|mut v| NullBuffer::new(v.finish())),
1538        ))
1539    }
1540}
1541
1542/// A [`ControlWordIterator`] when there are both repetition and definition levels
1543///
1544/// The iterator will put the repetition level in the upper bits and the definition
1545/// level in the lower bits.  The number of bits used for each level is determined
1546/// by the width of the repetition and definition levels.
1547#[derive(Debug)]
1548pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
1549    repdef: I,
1550    def_width: usize,
1551    max_rep: u16,
1552    max_visible_def: u16,
1553    rep_mask: u16,
1554    def_mask: u16,
1555    bits_rep: u8,
1556    bits_def: u8,
1557    phantom: std::marker::PhantomData<W>,
1558}
1559
1560impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
1561    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1562        let next = self.repdef.next()?;
1563        let control_word: u8 =
1564            (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
1565        buf.push(control_word);
1566        let is_new_row = next.0 == self.max_rep;
1567        let is_visible = next.1 <= self.max_visible_def;
1568        let is_valid_item = next.1 == 0;
1569        Some(ControlWordDesc {
1570            is_new_row,
1571            is_visible,
1572            is_valid_item,
1573        })
1574    }
1575}
1576
1577impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
1578    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1579        let next = self.repdef.next()?;
1580        let control_word: u16 =
1581            ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
1582        let control_word = control_word.to_le_bytes();
1583        buf.push(control_word[0]);
1584        buf.push(control_word[1]);
1585        let is_new_row = next.0 == self.max_rep;
1586        let is_visible = next.1 <= self.max_visible_def;
1587        let is_valid_item = next.1 == 0;
1588        Some(ControlWordDesc {
1589            is_new_row,
1590            is_visible,
1591            is_valid_item,
1592        })
1593    }
1594}
1595
1596impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
1597    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1598        let next = self.repdef.next()?;
1599        let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
1600            + ((next.1 & self.def_mask) as u32);
1601        let control_word = control_word.to_le_bytes();
1602        buf.push(control_word[0]);
1603        buf.push(control_word[1]);
1604        buf.push(control_word[2]);
1605        buf.push(control_word[3]);
1606        let is_new_row = next.0 == self.max_rep;
1607        let is_visible = next.1 <= self.max_visible_def;
1608        let is_valid_item = next.1 == 0;
1609        Some(ControlWordDesc {
1610            is_new_row,
1611            is_visible,
1612            is_valid_item,
1613        })
1614    }
1615}
1616
1617/// A [`ControlWordIterator`] when there are only definition levels or only repetition levels
1618#[derive(Debug)]
1619pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
1620    repdef: I,
1621    level_mask: u16,
1622    bits_rep: u8,
1623    bits_def: u8,
1624    max_rep: u16,
1625    phantom: std::marker::PhantomData<W>,
1626}
1627
1628impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
1629    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1630        let next = self.repdef.next()?;
1631        buf.push((next & self.level_mask) as u8);
1632        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1633        let is_valid_item = next == 0 || self.bits_def == 0;
1634        Some(ControlWordDesc {
1635            is_new_row,
1636            // Either there is no rep, in which case there are no invisible items
1637            // or there is no def, in which case there are no invisible items
1638            is_visible: true,
1639            is_valid_item,
1640        })
1641    }
1642}
1643
1644impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
1645    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1646        let next = self.repdef.next().unwrap() & self.level_mask;
1647        let control_word = next.to_le_bytes();
1648        buf.push(control_word[0]);
1649        buf.push(control_word[1]);
1650        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1651        let is_valid_item = next == 0 || self.bits_def == 0;
1652        Some(ControlWordDesc {
1653            is_new_row,
1654            is_visible: true,
1655            is_valid_item,
1656        })
1657    }
1658}
1659
1660impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
1661    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1662        let next = self.repdef.next()?;
1663        let next = (next & self.level_mask) as u32;
1664        let control_word = next.to_le_bytes();
1665        buf.push(control_word[0]);
1666        buf.push(control_word[1]);
1667        buf.push(control_word[2]);
1668        buf.push(control_word[3]);
1669        let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
1670        let is_valid_item = next == 0 || self.bits_def == 0;
1671        Some(ControlWordDesc {
1672            is_new_row,
1673            is_visible: true,
1674            is_valid_item,
1675        })
1676    }
1677}
1678
1679/// A [`ControlWordIterator`] when there are no repetition or definition levels
1680#[derive(Debug)]
1681pub struct NilaryControlWordIterator {
1682    len: usize,
1683    idx: usize,
1684}
1685
1686impl NilaryControlWordIterator {
1687    fn append_next(&mut self) -> Option<ControlWordDesc> {
1688        if self.idx == self.len {
1689            None
1690        } else {
1691            self.idx += 1;
1692            Some(ControlWordDesc {
1693                is_new_row: true,
1694                is_visible: true,
1695                is_valid_item: true,
1696            })
1697        }
1698    }
1699}
1700
1701/// Helper function to get a bit mask of the given width
1702fn get_mask(width: u16) -> u16 {
1703    (1 << width) - 1
1704}
1705
1706// We're really going out of our way to avoid boxing here but this will be called on a per-value basis
1707// so it is in the critical path.
1708type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
1709    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
1710    T,
1711>;
1712
1713/// An iterator that generates control words from repetition and definition levels
1714///
1715/// "Control word" is just a fancy term for a single u8/u16/u32 that contains both
1716/// the repetition and definition in it.
1717///
1718/// In the large majority of case we only need a single byte to represent both the
1719/// repetition and definition levels.  However, if there is deep nesting then we may
1720/// need two bytes.  In the worst case we need 4 bytes though this suggests hundreds of
1721/// levels of nesting which seems unlikely to encounter in practice.
1722#[derive(Debug)]
1723pub enum ControlWordIterator<'a> {
1724    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
1725    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
1726    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
1727    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
1728    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
1729    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
1730    Nilary(NilaryControlWordIterator),
1731}
1732
1733/// Describes the properties of a control word
1734#[derive(Debug)]
1735pub struct ControlWordDesc {
1736    pub is_new_row: bool,
1737    pub is_visible: bool,
1738    pub is_valid_item: bool,
1739}
1740
1741impl ControlWordIterator<'_> {
1742    /// Appends the next control word to the buffer
1743    ///
1744    /// Returns true if this is the start of a new item (i.e. the repetition level is maxed out)
1745    pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1746        match self {
1747            Self::Binary8(iter) => iter.append_next(buf),
1748            Self::Binary16(iter) => iter.append_next(buf),
1749            Self::Binary32(iter) => iter.append_next(buf),
1750            Self::Unary8(iter) => iter.append_next(buf),
1751            Self::Unary16(iter) => iter.append_next(buf),
1752            Self::Unary32(iter) => iter.append_next(buf),
1753            Self::Nilary(iter) => iter.append_next(),
1754        }
1755    }
1756
1757    /// Return true if the control word iterator has repetition levels
1758    pub fn has_repetition(&self) -> bool {
1759        match self {
1760            Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
1761            Self::Unary8(iter) => iter.bits_rep > 0,
1762            Self::Unary16(iter) => iter.bits_rep > 0,
1763            Self::Unary32(iter) => iter.bits_rep > 0,
1764            Self::Nilary(_) => false,
1765        }
1766    }
1767
1768    /// Returns the number of bytes per control word
1769    pub fn bytes_per_word(&self) -> usize {
1770        match self {
1771            Self::Binary8(_) => 1,
1772            Self::Binary16(_) => 2,
1773            Self::Binary32(_) => 4,
1774            Self::Unary8(_) => 1,
1775            Self::Unary16(_) => 2,
1776            Self::Unary32(_) => 4,
1777            Self::Nilary(_) => 0,
1778        }
1779    }
1780
1781    /// Returns the number of bits used for the repetition level
1782    pub fn bits_rep(&self) -> u8 {
1783        match self {
1784            Self::Binary8(iter) => iter.bits_rep,
1785            Self::Binary16(iter) => iter.bits_rep,
1786            Self::Binary32(iter) => iter.bits_rep,
1787            Self::Unary8(iter) => iter.bits_rep,
1788            Self::Unary16(iter) => iter.bits_rep,
1789            Self::Unary32(iter) => iter.bits_rep,
1790            Self::Nilary(_) => 0,
1791        }
1792    }
1793
1794    /// Returns the number of bits used for the definition level
1795    pub fn bits_def(&self) -> u8 {
1796        match self {
1797            Self::Binary8(iter) => iter.bits_def,
1798            Self::Binary16(iter) => iter.bits_def,
1799            Self::Binary32(iter) => iter.bits_def,
1800            Self::Unary8(iter) => iter.bits_def,
1801            Self::Unary16(iter) => iter.bits_def,
1802            Self::Unary32(iter) => iter.bits_def,
1803            Self::Nilary(_) => 0,
1804        }
1805    }
1806}
1807
1808/// Builds a [`ControlWordIterator`] from repetition and definition levels
1809/// by first calculating the width needed and then creating the iterator
1810/// with the appropriate width
1811pub fn build_control_word_iterator<'a>(
1812    rep: Option<&'a [u16]>,
1813    max_rep: u16,
1814    def: Option<&'a [u16]>,
1815    max_def: u16,
1816    max_visible_def: u16,
1817    len: usize,
1818) -> ControlWordIterator<'a> {
1819    let rep_width = if max_rep == 0 {
1820        0
1821    } else {
1822        log_2_ceil(max_rep as u32) as u16
1823    };
1824    let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
1825    let def_width = if max_def == 0 {
1826        0
1827    } else {
1828        log_2_ceil(max_def as u32) as u16
1829    };
1830    let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
1831    let total_width = rep_width + def_width;
1832    match (rep, def) {
1833        (Some(rep), Some(def)) => {
1834            let iter = rep.iter().copied().zip(def.iter().copied());
1835            let def_width = def_width as usize;
1836            if total_width <= 8 {
1837                ControlWordIterator::Binary8(BinaryControlWordIterator {
1838                    repdef: iter,
1839                    rep_mask,
1840                    def_mask,
1841                    def_width,
1842                    max_rep,
1843                    max_visible_def,
1844                    bits_rep: rep_width as u8,
1845                    bits_def: def_width as u8,
1846                    phantom: std::marker::PhantomData,
1847                })
1848            } else if total_width <= 16 {
1849                ControlWordIterator::Binary16(BinaryControlWordIterator {
1850                    repdef: iter,
1851                    rep_mask,
1852                    def_mask,
1853                    def_width,
1854                    max_rep,
1855                    max_visible_def,
1856                    bits_rep: rep_width as u8,
1857                    bits_def: def_width as u8,
1858                    phantom: std::marker::PhantomData,
1859                })
1860            } else {
1861                ControlWordIterator::Binary32(BinaryControlWordIterator {
1862                    repdef: iter,
1863                    rep_mask,
1864                    def_mask,
1865                    def_width,
1866                    max_rep,
1867                    max_visible_def,
1868                    bits_rep: rep_width as u8,
1869                    bits_def: def_width as u8,
1870                    phantom: std::marker::PhantomData,
1871                })
1872            }
1873        }
1874        (Some(lev), None) => {
1875            let iter = lev.iter().copied();
1876            if total_width <= 8 {
1877                ControlWordIterator::Unary8(UnaryControlWordIterator {
1878                    repdef: iter,
1879                    level_mask: rep_mask,
1880                    bits_rep: total_width as u8,
1881                    bits_def: 0,
1882                    max_rep,
1883                    phantom: std::marker::PhantomData,
1884                })
1885            } else if total_width <= 16 {
1886                ControlWordIterator::Unary16(UnaryControlWordIterator {
1887                    repdef: iter,
1888                    level_mask: rep_mask,
1889                    bits_rep: total_width as u8,
1890                    bits_def: 0,
1891                    max_rep,
1892                    phantom: std::marker::PhantomData,
1893                })
1894            } else {
1895                ControlWordIterator::Unary32(UnaryControlWordIterator {
1896                    repdef: iter,
1897                    level_mask: rep_mask,
1898                    bits_rep: total_width as u8,
1899                    bits_def: 0,
1900                    max_rep,
1901                    phantom: std::marker::PhantomData,
1902                })
1903            }
1904        }
1905        (None, Some(lev)) => {
1906            let iter = lev.iter().copied();
1907            if total_width <= 8 {
1908                ControlWordIterator::Unary8(UnaryControlWordIterator {
1909                    repdef: iter,
1910                    level_mask: def_mask,
1911                    bits_rep: 0,
1912                    bits_def: total_width as u8,
1913                    max_rep: 0,
1914                    phantom: std::marker::PhantomData,
1915                })
1916            } else if total_width <= 16 {
1917                ControlWordIterator::Unary16(UnaryControlWordIterator {
1918                    repdef: iter,
1919                    level_mask: def_mask,
1920                    bits_rep: 0,
1921                    bits_def: total_width as u8,
1922                    max_rep: 0,
1923                    phantom: std::marker::PhantomData,
1924                })
1925            } else {
1926                ControlWordIterator::Unary32(UnaryControlWordIterator {
1927                    repdef: iter,
1928                    level_mask: def_mask,
1929                    bits_rep: 0,
1930                    bits_def: total_width as u8,
1931                    max_rep: 0,
1932                    phantom: std::marker::PhantomData,
1933                })
1934            }
1935        }
1936        (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
1937    }
1938}
1939
1940/// A parser to unwrap control words into repetition and definition levels
1941///
1942/// This is the inverse of the [`ControlWordIterator`].
1943#[derive(Copy, Clone, Debug)]
1944pub enum ControlWordParser {
1945    // First item is the bits to shift, second is the mask to apply (the mask can be
1946    // calculated from the bits to shift but we don't want to calculate it each time)
1947    BOTH8(u8, u32),
1948    BOTH16(u8, u32),
1949    BOTH32(u8, u32),
1950    REP8,
1951    REP16,
1952    REP32,
1953    DEF8,
1954    DEF16,
1955    DEF32,
1956    NIL,
1957}
1958
1959impl ControlWordParser {
1960    fn parse_both<const WORD_SIZE: u8>(
1961        src: &[u8],
1962        dst_rep: &mut Vec<u16>,
1963        dst_def: &mut Vec<u16>,
1964        bits_to_shift: u8,
1965        mask_to_apply: u32,
1966    ) {
1967        match WORD_SIZE {
1968            1 => {
1969                let word = src[0];
1970                let rep = word >> bits_to_shift;
1971                let def = word & (mask_to_apply as u8);
1972                dst_rep.push(rep as u16);
1973                dst_def.push(def as u16);
1974            }
1975            2 => {
1976                let word = u16::from_le_bytes([src[0], src[1]]);
1977                let rep = word >> bits_to_shift;
1978                let def = word & mask_to_apply as u16;
1979                dst_rep.push(rep);
1980                dst_def.push(def);
1981            }
1982            4 => {
1983                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
1984                let rep = word >> bits_to_shift;
1985                let def = word & mask_to_apply;
1986                dst_rep.push(rep as u16);
1987                dst_def.push(def as u16);
1988            }
1989            _ => unreachable!(),
1990        }
1991    }
1992
1993    fn parse_desc_both<const WORD_SIZE: u8>(
1994        src: &[u8],
1995        bits_to_shift: u8,
1996        mask_to_apply: u32,
1997        max_rep: u16,
1998        max_visible_def: u16,
1999    ) -> ControlWordDesc {
2000        match WORD_SIZE {
2001            1 => {
2002                let word = src[0];
2003                let rep = word >> bits_to_shift;
2004                let def = word & (mask_to_apply as u8);
2005                let is_visible = def as u16 <= max_visible_def;
2006                let is_new_row = rep as u16 == max_rep;
2007                let is_valid_item = def == 0;
2008                ControlWordDesc {
2009                    is_visible,
2010                    is_new_row,
2011                    is_valid_item,
2012                }
2013            }
2014            2 => {
2015                let word = u16::from_le_bytes([src[0], src[1]]);
2016                let rep = word >> bits_to_shift;
2017                let def = word & mask_to_apply as u16;
2018                let is_visible = def <= max_visible_def;
2019                let is_new_row = rep == max_rep;
2020                let is_valid_item = def == 0;
2021                ControlWordDesc {
2022                    is_visible,
2023                    is_new_row,
2024                    is_valid_item,
2025                }
2026            }
2027            4 => {
2028                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2029                let rep = word >> bits_to_shift;
2030                let def = word & mask_to_apply;
2031                let is_visible = def as u16 <= max_visible_def;
2032                let is_new_row = rep as u16 == max_rep;
2033                let is_valid_item = def == 0;
2034                ControlWordDesc {
2035                    is_visible,
2036                    is_new_row,
2037                    is_valid_item,
2038                }
2039            }
2040            _ => unreachable!(),
2041        }
2042    }
2043
2044    fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
2045        match WORD_SIZE {
2046            1 => {
2047                let word = src[0];
2048                dst.push(word as u16);
2049            }
2050            2 => {
2051                let word = u16::from_le_bytes([src[0], src[1]]);
2052                dst.push(word);
2053            }
2054            4 => {
2055                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2056                dst.push(word as u16);
2057            }
2058            _ => unreachable!(),
2059        }
2060    }
2061
2062    fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
2063        match WORD_SIZE {
2064            1 => ControlWordDesc {
2065                is_new_row: src[0] as u16 == max_rep,
2066                is_visible: true,
2067                is_valid_item: true,
2068            },
2069            2 => ControlWordDesc {
2070                is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
2071                is_visible: true,
2072                is_valid_item: true,
2073            },
2074            4 => ControlWordDesc {
2075                is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
2076                is_visible: true,
2077                is_valid_item: true,
2078            },
2079            _ => unreachable!(),
2080        }
2081    }
2082
2083    fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
2084        match WORD_SIZE {
2085            1 => ControlWordDesc {
2086                is_new_row: true,
2087                is_visible: true,
2088                is_valid_item: src[0] == 0,
2089            },
2090            2 => ControlWordDesc {
2091                is_new_row: true,
2092                is_visible: true,
2093                is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
2094            },
2095            4 => ControlWordDesc {
2096                is_new_row: true,
2097                is_visible: true,
2098                is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
2099            },
2100            _ => unreachable!(),
2101        }
2102    }
2103
2104    /// Returns the number of bytes per control word
2105    pub fn bytes_per_word(&self) -> usize {
2106        match self {
2107            Self::BOTH8(..) => 1,
2108            Self::BOTH16(..) => 2,
2109            Self::BOTH32(..) => 4,
2110            Self::REP8 => 1,
2111            Self::REP16 => 2,
2112            Self::REP32 => 4,
2113            Self::DEF8 => 1,
2114            Self::DEF16 => 2,
2115            Self::DEF32 => 4,
2116            Self::NIL => 0,
2117        }
2118    }
2119
2120    /// Appends the next control word to the rep & def buffers
2121    ///
2122    /// `src` should be pointing at the first byte (little endian) of the control word
2123    ///
2124    /// `dst_rep` and `dst_def` are the buffers to append the rep and def levels to.
2125    /// They will not be appended to if not needed.
2126    pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
2127        match self {
2128            Self::BOTH8(bits_to_shift, mask_to_apply) => {
2129                Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2130            }
2131            Self::BOTH16(bits_to_shift, mask_to_apply) => {
2132                Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2133            }
2134            Self::BOTH32(bits_to_shift, mask_to_apply) => {
2135                Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2136            }
2137            Self::REP8 => Self::parse_one::<1>(src, dst_rep),
2138            Self::REP16 => Self::parse_one::<2>(src, dst_rep),
2139            Self::REP32 => Self::parse_one::<4>(src, dst_rep),
2140            Self::DEF8 => Self::parse_one::<1>(src, dst_def),
2141            Self::DEF16 => Self::parse_one::<2>(src, dst_def),
2142            Self::DEF32 => Self::parse_one::<4>(src, dst_def),
2143            Self::NIL => {}
2144        }
2145    }
2146
2147    /// Return true if the control words contain repetition information
2148    pub fn has_rep(&self) -> bool {
2149        match self {
2150            Self::BOTH8(..)
2151            | Self::BOTH16(..)
2152            | Self::BOTH32(..)
2153            | Self::REP8
2154            | Self::REP16
2155            | Self::REP32 => true,
2156            Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
2157        }
2158    }
2159
2160    /// Temporarily parses the control word to inspect its properties but does not append to any buffers
2161    pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
2162        match self {
2163            Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
2164                src,
2165                *bits_to_shift,
2166                *mask_to_apply,
2167                max_rep,
2168                max_visible_def,
2169            ),
2170            Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
2171                src,
2172                *bits_to_shift,
2173                *mask_to_apply,
2174                max_rep,
2175                max_visible_def,
2176            ),
2177            Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
2178                src,
2179                *bits_to_shift,
2180                *mask_to_apply,
2181                max_rep,
2182                max_visible_def,
2183            ),
2184            Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
2185            Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
2186            Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
2187            Self::DEF8 => Self::parse_def_desc_one::<1>(src),
2188            Self::DEF16 => Self::parse_def_desc_one::<2>(src),
2189            Self::DEF32 => Self::parse_def_desc_one::<4>(src),
2190            Self::NIL => ControlWordDesc {
2191                is_new_row: true,
2192                is_valid_item: true,
2193                is_visible: true,
2194            },
2195        }
2196    }
2197
2198    /// Creates a new parser from the number of bits used for the repetition and definition levels
2199    pub fn new(bits_rep: u8, bits_def: u8) -> Self {
2200        let total_bits = bits_rep + bits_def;
2201
2202        enum WordSize {
2203            One,
2204            Two,
2205            Four,
2206        }
2207
2208        let word_size = if total_bits <= 8 {
2209            WordSize::One
2210        } else if total_bits <= 16 {
2211            WordSize::Two
2212        } else {
2213            WordSize::Four
2214        };
2215
2216        match (bits_rep > 0, bits_def > 0, word_size) {
2217            (false, false, _) => Self::NIL,
2218            (false, true, WordSize::One) => Self::DEF8,
2219            (false, true, WordSize::Two) => Self::DEF16,
2220            (false, true, WordSize::Four) => Self::DEF32,
2221            (true, false, WordSize::One) => Self::REP8,
2222            (true, false, WordSize::Two) => Self::REP16,
2223            (true, false, WordSize::Four) => Self::REP32,
2224            (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
2225            (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
2226            (true, true, WordSize::Four) => {
2227                Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
2228            }
2229        }
2230    }
2231}
2232
2233#[cfg(test)]
2234mod tests {
2235    use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
2236
2237    use crate::repdef::{
2238        CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
2239    };
2240
2241    use super::RepDefBuilder;
2242
2243    fn validity(values: &[bool]) -> NullBuffer {
2244        NullBuffer::from_iter(values.iter().copied())
2245    }
2246
2247    fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
2248        OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
2249    }
2250
2251    fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
2252        OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
2253    }
2254
2255    #[test]
2256    fn test_repdef_basic() {
2257        // Basic case, rep & def
2258        let mut builder = RepDefBuilder::default();
2259        builder.add_offsets(
2260            offsets_64(&[0, 2, 2, 5]),
2261            Some(validity(&[true, false, true])),
2262        );
2263        builder.add_offsets(
2264            offsets_64(&[0, 1, 3, 5, 5, 9]),
2265            Some(validity(&[true, true, true, false, true])),
2266        );
2267        builder.add_validity_bitmap(validity(&[
2268            true, true, true, false, false, false, true, true, false,
2269        ]));
2270
2271        let repdefs = RepDefBuilder::serialize(vec![builder]);
2272        let rep = repdefs.repetition_levels.unwrap();
2273        let def = repdefs.definition_levels.unwrap();
2274
2275        assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2276        assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2277
2278        // [[I], [I, I]], NULL, [[NULL, NULL], NULL, [NULL, I, I, NULL]]
2279
2280        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2281            Some(rep.as_ref().to_vec()),
2282            Some(def.as_ref().to_vec()),
2283            repdefs.def_meaning.into(),
2284        )]);
2285
2286        // Note: validity doesn't exactly round-trip because repdef normalizes some of the
2287        // redundant validity values
2288        assert_eq!(
2289            unraveler.unravel_validity(9),
2290            Some(validity(&[
2291                true, true, true, false, false, false, true, true, false
2292            ]))
2293        );
2294        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2295        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 5, 9]).inner());
2296        assert_eq!(val, Some(validity(&[true, true, true, false, true])));
2297        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2298        assert_eq!(off.inner(), offsets_32(&[0, 2, 2, 5]).inner());
2299        assert_eq!(val, Some(validity(&[true, false, true])));
2300    }
2301
2302    #[test]
2303    fn test_repdef_simple_null_empty_list() {
2304        let check = |repdefs: SerializedRepDefs, last_def: DefinitionInterpretation| {
2305            let rep = repdefs.repetition_levels.unwrap();
2306            let def = repdefs.definition_levels.unwrap();
2307
2308            assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2309            assert_eq!([0, 0, 2, 0, 1, 0], *def);
2310            assert_eq!(
2311                vec![DefinitionInterpretation::NullableItem, last_def,],
2312                repdefs.def_meaning
2313            );
2314        };
2315
2316        // Null list and empty list should be serialized mostly the same
2317
2318        // Null case
2319        let mut builder = RepDefBuilder::default();
2320        builder.add_offsets(
2321            offsets_32(&[0, 2, 2, 5]),
2322            Some(validity(&[true, false, true])),
2323        );
2324        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2325
2326        let repdefs = RepDefBuilder::serialize(vec![builder]);
2327
2328        check(repdefs, DefinitionInterpretation::NullableList);
2329
2330        // Empty case
2331        let mut builder = RepDefBuilder::default();
2332        builder.add_offsets(offsets_32(&[0, 2, 2, 5]), None);
2333        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2334
2335        let repdefs = RepDefBuilder::serialize(vec![builder]);
2336
2337        check(repdefs, DefinitionInterpretation::EmptyableList);
2338    }
2339
2340    #[test]
2341    fn test_repdef_empty_list_at_end() {
2342        // Regresses a failure we encountered when the last item was an empty list
2343        let mut builder = RepDefBuilder::default();
2344        builder.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
2345        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2346
2347        let repdefs = RepDefBuilder::serialize(vec![builder]);
2348
2349        let rep = repdefs.repetition_levels.unwrap();
2350        let def = repdefs.definition_levels.unwrap();
2351
2352        assert_eq!([1, 0, 1, 0, 0, 1], *rep);
2353        assert_eq!([0, 0, 0, 1, 0, 2], *def);
2354        assert_eq!(
2355            vec![
2356                DefinitionInterpretation::NullableItem,
2357                DefinitionInterpretation::EmptyableList,
2358            ],
2359            repdefs.def_meaning
2360        );
2361    }
2362
2363    #[test]
2364    fn test_repdef_abnormal_nulls() {
2365        // List nulls are allowed to have non-empty offsets and garbage values
2366        // and the add_offsets call should normalize this
2367        let mut builder = RepDefBuilder::default();
2368        builder.add_offsets(
2369            offsets_32(&[0, 2, 5, 8]),
2370            Some(validity(&[true, false, true])),
2371        );
2372        // Note: we pass 5 here and not 8.  If add_offsets tells us there is garbage nulls they
2373        // should be removed before continuing
2374        builder.add_no_null(5);
2375
2376        let repdefs = RepDefBuilder::serialize(vec![builder]);
2377
2378        let rep = repdefs.repetition_levels.unwrap();
2379        let def = repdefs.definition_levels.unwrap();
2380
2381        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2382        assert_eq!([0, 0, 1, 0, 0, 0], *def);
2383
2384        assert_eq!(
2385            vec![
2386                DefinitionInterpretation::AllValidItem,
2387                DefinitionInterpretation::NullableList,
2388            ],
2389            repdefs.def_meaning
2390        );
2391    }
2392
2393    #[test]
2394    fn test_repdef_fsl() {
2395        let mut builder = RepDefBuilder::default();
2396        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2397        builder.add_fsl(None, 2, 4);
2398        builder.add_validity_bitmap(validity(&[
2399            true, false, true, false, true, false, true, false,
2400        ]));
2401
2402        let repdefs = RepDefBuilder::serialize(vec![builder]);
2403
2404        assert_eq!(
2405            vec![
2406                DefinitionInterpretation::NullableItem,
2407                DefinitionInterpretation::AllValidItem,
2408                DefinitionInterpretation::NullableItem
2409            ],
2410            repdefs.def_meaning
2411        );
2412
2413        assert!(repdefs.repetition_levels.is_none());
2414
2415        let def = repdefs.definition_levels.unwrap();
2416
2417        assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def);
2418
2419        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2420            None,
2421            Some(def.as_ref().to_vec()),
2422            repdefs.def_meaning.into(),
2423        )]);
2424
2425        assert_eq!(
2426            unraveler.unravel_validity(8),
2427            Some(validity(&[
2428                true, false, true, false, false, false, false, false
2429            ]))
2430        );
2431        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2432        assert_eq!(
2433            unraveler.unravel_fsl_validity(2, 2),
2434            Some(validity(&[true, false]))
2435        );
2436    }
2437
2438    #[test]
2439    fn test_repdef_fsl_allvalid_item() {
2440        let mut builder = RepDefBuilder::default();
2441        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2442        builder.add_fsl(None, 2, 4);
2443        builder.add_no_null(8);
2444
2445        let repdefs = RepDefBuilder::serialize(vec![builder]);
2446
2447        assert_eq!(
2448            vec![
2449                DefinitionInterpretation::AllValidItem,
2450                DefinitionInterpretation::AllValidItem,
2451                DefinitionInterpretation::NullableItem
2452            ],
2453            repdefs.def_meaning
2454        );
2455
2456        assert!(repdefs.repetition_levels.is_none());
2457
2458        let def = repdefs.definition_levels.unwrap();
2459
2460        assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def);
2461
2462        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2463            None,
2464            Some(def.as_ref().to_vec()),
2465            repdefs.def_meaning.into(),
2466        )]);
2467
2468        assert_eq!(unraveler.unravel_validity(8), None);
2469        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2470        assert_eq!(
2471            unraveler.unravel_fsl_validity(2, 2),
2472            Some(validity(&[true, false]))
2473        );
2474    }
2475
2476    #[test]
2477    fn test_repdef_sliced_offsets() {
2478        // Sliced lists may have offsets that don't start with zero.  The
2479        // add_offsets call needs to normalize these to operate correctly.
2480        let mut builder = RepDefBuilder::default();
2481        builder.add_offsets(
2482            offsets_32(&[5, 7, 7, 10]),
2483            Some(validity(&[true, false, true])),
2484        );
2485        builder.add_no_null(5);
2486
2487        let repdefs = RepDefBuilder::serialize(vec![builder]);
2488
2489        let rep = repdefs.repetition_levels.unwrap();
2490        let def = repdefs.definition_levels.unwrap();
2491
2492        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2493        assert_eq!([0, 0, 1, 0, 0, 0], *def);
2494
2495        assert_eq!(
2496            vec![
2497                DefinitionInterpretation::AllValidItem,
2498                DefinitionInterpretation::NullableList,
2499            ],
2500            repdefs.def_meaning
2501        );
2502    }
2503
2504    #[test]
2505    fn test_repdef_complex_null_empty() {
2506        let mut builder = RepDefBuilder::default();
2507        builder.add_offsets(
2508            offsets_32(&[0, 4, 4, 4, 6]),
2509            Some(validity(&[true, false, true, true])),
2510        );
2511        builder.add_offsets(
2512            offsets_32(&[0, 1, 1, 2, 2, 2, 3]),
2513            Some(validity(&[true, false, true, false, true, true])),
2514        );
2515        builder.add_no_null(3);
2516
2517        let repdefs = RepDefBuilder::serialize(vec![builder]);
2518
2519        let rep = repdefs.repetition_levels.unwrap();
2520        let def = repdefs.definition_levels.unwrap();
2521
2522        assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep);
2523        assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def);
2524    }
2525
2526    #[test]
2527    fn test_repdef_empty_list_no_null() {
2528        // Tests when we have some empty lists but no null lists.  This case
2529        // caused some bugs because we have definition but no nulls
2530        let mut builder = RepDefBuilder::default();
2531        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2532        builder.add_no_null(6);
2533
2534        let repdefs = RepDefBuilder::serialize(vec![builder]);
2535
2536        let rep = repdefs.repetition_levels.unwrap();
2537        let def = repdefs.definition_levels.unwrap();
2538
2539        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2540        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2541
2542        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2543            Some(rep.as_ref().to_vec()),
2544            Some(def.as_ref().to_vec()),
2545            repdefs.def_meaning.into(),
2546        )]);
2547
2548        assert_eq!(unraveler.unravel_validity(6), None);
2549        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2550        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2551        assert_eq!(val, None);
2552    }
2553
2554    #[test]
2555    fn test_repdef_all_valid() {
2556        let mut builder = RepDefBuilder::default();
2557        builder.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
2558        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2559        builder.add_no_null(9);
2560
2561        let repdefs = RepDefBuilder::serialize(vec![builder]);
2562        let rep = repdefs.repetition_levels.unwrap();
2563        assert!(repdefs.definition_levels.is_none());
2564
2565        assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep);
2566
2567        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2568            Some(rep.as_ref().to_vec()),
2569            None,
2570            repdefs.def_meaning.into(),
2571        )]);
2572
2573        assert_eq!(unraveler.unravel_validity(9), None);
2574        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2575        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 7, 9]).inner());
2576        assert_eq!(val, None);
2577        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2578        assert_eq!(off.inner(), offsets_32(&[0, 2, 3, 5]).inner());
2579        assert_eq!(val, None);
2580    }
2581
2582    #[test]
2583    fn test_only_empty_lists() {
2584        let mut builder = RepDefBuilder::default();
2585        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2586        builder.add_no_null(6);
2587
2588        let repdefs = RepDefBuilder::serialize(vec![builder]);
2589
2590        let rep = repdefs.repetition_levels.unwrap();
2591        let def = repdefs.definition_levels.unwrap();
2592
2593        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2594        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2595
2596        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2597            Some(rep.as_ref().to_vec()),
2598            Some(def.as_ref().to_vec()),
2599            repdefs.def_meaning.into(),
2600        )]);
2601
2602        assert_eq!(unraveler.unravel_validity(6), None);
2603        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2604        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2605        assert_eq!(val, None);
2606    }
2607
2608    #[test]
2609    fn test_only_null_lists() {
2610        let mut builder = RepDefBuilder::default();
2611        builder.add_offsets(
2612            offsets_32(&[0, 4, 4, 4, 6]),
2613            Some(validity(&[true, false, false, true])),
2614        );
2615        builder.add_no_null(6);
2616
2617        let repdefs = RepDefBuilder::serialize(vec![builder]);
2618
2619        let rep = repdefs.repetition_levels.unwrap();
2620        let def = repdefs.definition_levels.unwrap();
2621
2622        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2623        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2624
2625        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2626            Some(rep.as_ref().to_vec()),
2627            Some(def.as_ref().to_vec()),
2628            repdefs.def_meaning.into(),
2629        )]);
2630
2631        assert_eq!(unraveler.unravel_validity(6), None);
2632        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2633        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2634        assert_eq!(val, Some(validity(&[true, false, false, true])));
2635    }
2636
2637    #[test]
2638    fn test_null_and_empty_lists() {
2639        let mut builder = RepDefBuilder::default();
2640        builder.add_offsets(
2641            offsets_32(&[0, 4, 4, 4, 6]),
2642            Some(validity(&[true, false, true, true])),
2643        );
2644        builder.add_no_null(6);
2645
2646        let repdefs = RepDefBuilder::serialize(vec![builder]);
2647
2648        let rep = repdefs.repetition_levels.unwrap();
2649        let def = repdefs.definition_levels.unwrap();
2650
2651        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2652        assert_eq!([0, 0, 0, 0, 1, 2, 0, 0], *def);
2653
2654        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2655            Some(rep.as_ref().to_vec()),
2656            Some(def.as_ref().to_vec()),
2657            repdefs.def_meaning.into(),
2658        )]);
2659
2660        assert_eq!(unraveler.unravel_validity(6), None);
2661        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2662        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2663        assert_eq!(val, Some(validity(&[true, false, true, true])));
2664    }
2665
2666    #[test]
2667    fn test_repdef_no_rep() {
2668        let mut builder = RepDefBuilder::default();
2669        builder.add_no_null(5);
2670        builder.add_validity_bitmap(validity(&[false, false, true, true, true]));
2671        builder.add_validity_bitmap(validity(&[false, true, true, true, false]));
2672
2673        let repdefs = RepDefBuilder::serialize(vec![builder]);
2674        assert!(repdefs.repetition_levels.is_none());
2675        let def = repdefs.definition_levels.unwrap();
2676
2677        assert_eq!([2, 2, 0, 0, 1], *def);
2678
2679        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2680            None,
2681            Some(def.as_ref().to_vec()),
2682            repdefs.def_meaning.into(),
2683        )]);
2684
2685        assert_eq!(
2686            unraveler.unravel_validity(5),
2687            Some(validity(&[false, false, true, true, false]))
2688        );
2689        assert_eq!(
2690            unraveler.unravel_validity(5),
2691            Some(validity(&[false, false, true, true, true]))
2692        );
2693        assert_eq!(unraveler.unravel_validity(5), None);
2694    }
2695
2696    #[test]
2697    fn test_composite_unravel() {
2698        let mut builder = RepDefBuilder::default();
2699        builder.add_offsets(
2700            offsets_64(&[0, 2, 2, 5]),
2701            Some(validity(&[true, false, true])),
2702        );
2703        builder.add_no_null(5);
2704        let repdef1 = RepDefBuilder::serialize(vec![builder]);
2705
2706        let mut builder = RepDefBuilder::default();
2707        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2708        builder.add_no_null(9);
2709        let repdef2 = RepDefBuilder::serialize(vec![builder]);
2710
2711        let rep1 = repdef1.repetition_levels.clone().unwrap();
2712        let def1 = repdef1.definition_levels.clone().unwrap();
2713        let rep2 = repdef2.repetition_levels.clone().unwrap();
2714        assert!(repdef2.definition_levels.is_none());
2715
2716        assert_eq!([1, 0, 1, 1, 0, 0], *rep1);
2717        assert_eq!([0, 0, 1, 0, 0, 0], *def1);
2718        assert_eq!([1, 1, 0, 1, 0, 1, 0, 1, 0], *rep2);
2719
2720        let unravel1 = RepDefUnraveler::new(
2721            repdef1.repetition_levels.map(|l| l.to_vec()),
2722            repdef1.definition_levels.map(|l| l.to_vec()),
2723            repdef1.def_meaning.into(),
2724        );
2725        let unravel2 = RepDefUnraveler::new(
2726            repdef2.repetition_levels.map(|l| l.to_vec()),
2727            repdef2.definition_levels.map(|l| l.to_vec()),
2728            repdef2.def_meaning.into(),
2729        );
2730
2731        let mut unraveler = CompositeRepDefUnraveler::new(vec![unravel1, unravel2]);
2732
2733        assert!(unraveler.unravel_validity(9).is_none());
2734        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2735        assert_eq!(
2736            off.inner(),
2737            offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]).inner()
2738        );
2739        assert_eq!(
2740            val,
2741            Some(validity(&[true, false, true, true, true, true, true, true]))
2742        );
2743    }
2744
2745    #[test]
2746    fn test_repdef_multiple_builders() {
2747        // Basic case, rep & def
2748        let mut builder1 = RepDefBuilder::default();
2749        builder1.add_offsets(offsets_64(&[0, 2]), None);
2750        builder1.add_offsets(offsets_64(&[0, 1, 3]), None);
2751        builder1.add_validity_bitmap(validity(&[true, true, true]));
2752
2753        let mut builder2 = RepDefBuilder::default();
2754        builder2.add_offsets(offsets_64(&[0, 0, 3]), Some(validity(&[false, true])));
2755        builder2.add_offsets(
2756            offsets_64(&[0, 2, 2, 6]),
2757            Some(validity(&[true, false, true])),
2758        );
2759        builder2.add_validity_bitmap(validity(&[false, false, false, true, true, false]));
2760
2761        let repdefs = RepDefBuilder::serialize(vec![builder1, builder2]);
2762
2763        let rep = repdefs.repetition_levels.unwrap();
2764        let def = repdefs.definition_levels.unwrap();
2765
2766        assert_eq!([2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2767        assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2768    }
2769
2770    #[test]
2771    fn test_slicer() {
2772        let mut builder = RepDefBuilder::default();
2773        builder.add_offsets(
2774            offsets_64(&[0, 2, 2, 30, 30]),
2775            Some(validity(&[true, false, true, true])),
2776        );
2777        builder.add_no_null(30);
2778
2779        let repdefs = RepDefBuilder::serialize(vec![builder]);
2780
2781        let mut rep_slicer = repdefs.rep_slicer().unwrap();
2782
2783        // First 5 items include a null list so we get 6 levels (12 bytes)
2784        assert_eq!(rep_slicer.slice_next(5).len(), 12);
2785        // Next 20 are all plain
2786        assert_eq!(rep_slicer.slice_next(20).len(), 40);
2787        // Last 5 include an empty list so we get 6 levels (12 bytes)
2788        assert_eq!(rep_slicer.slice_rest().len(), 12);
2789
2790        let mut def_slicer = repdefs.rep_slicer().unwrap();
2791
2792        // First 5 items include a null list so we get 6 levels (12 bytes)
2793        assert_eq!(def_slicer.slice_next(5).len(), 12);
2794        // Next 20 are all plain
2795        assert_eq!(def_slicer.slice_next(20).len(), 40);
2796        // Last 5 include an empty list so we get 6 levels (12 bytes)
2797        assert_eq!(def_slicer.slice_rest().len(), 12);
2798    }
2799
2800    #[test]
2801    fn test_control_words() {
2802        // Convert to control words, verify expected, convert back, verify same as original
2803        fn check(
2804            rep: &[u16],
2805            def: &[u16],
2806            expected_values: Vec<u8>,
2807            expected_bytes_per_word: usize,
2808            expected_bits_rep: u8,
2809            expected_bits_def: u8,
2810        ) {
2811            let num_vals = rep.len().max(def.len());
2812            let max_rep = rep.iter().max().copied().unwrap_or(0);
2813            let max_def = def.iter().max().copied().unwrap_or(0);
2814
2815            let in_rep = if rep.is_empty() { None } else { Some(rep) };
2816            let in_def = if def.is_empty() { None } else { Some(def) };
2817
2818            let mut iter = super::build_control_word_iterator(
2819                in_rep,
2820                max_rep,
2821                in_def,
2822                max_def,
2823                max_def + 1,
2824                expected_values.len(),
2825            );
2826            assert_eq!(iter.bytes_per_word(), expected_bytes_per_word);
2827            assert_eq!(iter.bits_rep(), expected_bits_rep);
2828            assert_eq!(iter.bits_def(), expected_bits_def);
2829            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
2830
2831            for _ in 0..num_vals {
2832                iter.append_next(&mut cw_vec);
2833            }
2834            assert!(iter.append_next(&mut cw_vec).is_none());
2835
2836            assert_eq!(expected_values, cw_vec);
2837
2838            let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);
2839
2840            let mut rep_out = Vec::with_capacity(num_vals);
2841            let mut def_out = Vec::with_capacity(num_vals);
2842
2843            if expected_bytes_per_word > 0 {
2844                for slice in cw_vec.chunks_exact(expected_bytes_per_word) {
2845                    parser.parse(slice, &mut rep_out, &mut def_out);
2846                }
2847            }
2848
2849            assert_eq!(rep, rep_out.as_slice());
2850            assert_eq!(def, def_out.as_slice());
2851        }
2852
2853        // Each will need 4 bits and so we should get 1-byte control words
2854        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
2855        let def = &[5_u16, 3, 1, 2, 12, 15, 0, 2];
2856        let expected = vec![
2857            0b00000101, // 0, 5
2858            0b01110011, // 7, 3
2859            0b00110001, // 3, 1
2860            0b00100010, // 2, 2
2861            0b10011100, // 9, 12
2862            0b10001111, // 8, 15
2863            0b11000000, // 12, 0
2864            0b01010010, // 5, 2
2865        ];
2866        check(rep, def, expected, 1, 4, 4);
2867
2868        // Now we need 5 bits for def so we get 2-byte control words
2869        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
2870        let def = &[5_u16, 3, 1, 2, 12, 22, 0, 2];
2871        let expected = vec![
2872            0b00000101, 0b00000000, // 0, 5
2873            0b11100011, 0b00000000, // 7, 3
2874            0b01100001, 0b00000000, // 3, 1
2875            0b01000010, 0b00000000, // 2, 2
2876            0b00101100, 0b00000001, // 9, 12
2877            0b00010110, 0b00000001, // 8, 22
2878            0b10000000, 0b00000001, // 12, 0
2879            0b10100010, 0b00000000, // 5, 2
2880        ];
2881        check(rep, def, expected, 2, 4, 5);
2882
2883        // Just rep, 4 bits so 1 byte each
2884        let levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
2885        let expected = vec![
2886            0b00000000, // 0
2887            0b00000111, // 7
2888            0b00000011, // 3
2889            0b00000010, // 2
2890            0b00001001, // 9
2891            0b00001000, // 8
2892            0b00001100, // 12
2893            0b00000101, // 5
2894        ];
2895        check(levels, &[], expected.clone(), 1, 4, 0);
2896
2897        // Just def
2898        check(&[], levels, expected, 1, 0, 4);
2899
2900        // No rep, no def, no bytes
2901        check(&[], &[], Vec::default(), 0, 0, 0);
2902    }
2903
2904    #[test]
2905    fn test_control_words_rep_index() {
2906        fn check(
2907            rep: &[u16],
2908            def: &[u16],
2909            expected_new_rows: Vec<bool>,
2910            expected_is_visible: Vec<bool>,
2911        ) {
2912            let num_vals = rep.len().max(def.len());
2913            let max_rep = rep.iter().max().copied().unwrap_or(0);
2914            let max_def = def.iter().max().copied().unwrap_or(0);
2915
2916            let in_rep = if rep.is_empty() { None } else { Some(rep) };
2917            let in_def = if def.is_empty() { None } else { Some(def) };
2918
2919            let mut iter = super::build_control_word_iterator(
2920                in_rep,
2921                max_rep,
2922                in_def,
2923                max_def,
2924                /*max_visible_def=*/ 2,
2925                expected_new_rows.len(),
2926            );
2927
2928            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
2929            let mut expected_new_rows = expected_new_rows.iter().copied();
2930            let mut expected_is_visible = expected_is_visible.iter().copied();
2931            for _ in 0..expected_new_rows.len() {
2932                let word_desc = iter.append_next(&mut cw_vec).unwrap();
2933                assert_eq!(word_desc.is_new_row, expected_new_rows.next().unwrap());
2934                assert_eq!(word_desc.is_visible, expected_is_visible.next().unwrap());
2935            }
2936            assert!(iter.append_next(&mut cw_vec).is_none());
2937        }
2938
2939        // 2 means new list
2940        let rep = &[2_u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
2941        // These values don't matter for this test
2942        let def = &[0_u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];
2943
2944        // Rep & def
2945        check(
2946            rep,
2947            def,
2948            vec![
2949                true, false, false, true, true, false, false, false, false, true, false,
2950            ],
2951            vec![
2952                true, true, true, false, true, true, true, true, true, true, true,
2953            ],
2954        );
2955        // Rep only
2956        check(
2957            rep,
2958            &[],
2959            vec![
2960                true, false, false, true, true, false, false, false, false, true, false,
2961            ],
2962            vec![true; 11],
2963        );
2964        // No repetition
2965        check(
2966            &[],
2967            def,
2968            vec![
2969                true, true, true, true, true, true, true, true, true, true, true,
2970            ],
2971            vec![true; 11],
2972        );
2973        // No repetition, no definition
2974        check(
2975            &[],
2976            &[],
2977            vec![
2978                true, true, true, true, true, true, true, true, true, true, true,
2979            ],
2980            vec![true; 11],
2981        );
2982    }
2983
2984    #[test]
2985    fn regress_empty_list_case() {
2986        // This regresses a case where we had 3 null lists inside a struct
2987        let mut builder = RepDefBuilder::default();
2988        builder.add_validity_bitmap(validity(&[true, false, true]));
2989        builder.add_offsets(
2990            offsets_32(&[0, 0, 0, 0]),
2991            Some(validity(&[false, false, false])),
2992        );
2993        builder.add_no_null(0);
2994
2995        let repdefs = RepDefBuilder::serialize(vec![builder]);
2996        let rep = repdefs.repetition_levels.unwrap();
2997        let def = repdefs.definition_levels.unwrap();
2998
2999        assert_eq!([1, 1, 1], *rep);
3000        assert_eq!([1, 2, 1], *def);
3001
3002        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
3003            Some(rep.as_ref().to_vec()),
3004            Some(def.as_ref().to_vec()),
3005            repdefs.def_meaning.into(),
3006        )]);
3007
3008        assert_eq!(unraveler.unravel_validity(0), None);
3009        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
3010        assert_eq!(off.inner(), offsets_32(&[0, 0, 0, 0]).inner());
3011        assert_eq!(val, Some(validity(&[false, false, false])));
3012        let val = unraveler.unravel_validity(3).unwrap();
3013        assert_eq!(val.inner(), validity(&[true, false, true]).inner());
3014    }
3015
3016    #[test]
3017    fn regress_list_ends_null_case() {
3018        let mut builder = RepDefBuilder::default();
3019        builder.add_offsets(
3020            offsets_64(&[0, 1, 2, 2]),
3021            Some(validity(&[true, true, false])),
3022        );
3023        builder.add_offsets(offsets_64(&[0, 1, 1]), Some(validity(&[true, false])));
3024        builder.add_no_null(1);
3025
3026        let repdefs = RepDefBuilder::serialize(vec![builder]);
3027        let rep = repdefs.repetition_levels.unwrap();
3028        let def = repdefs.definition_levels.unwrap();
3029
3030        assert_eq!([2, 2, 2], *rep);
3031        assert_eq!([0, 1, 2], *def);
3032
3033        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
3034            Some(rep.as_ref().to_vec()),
3035            Some(def.as_ref().to_vec()),
3036            repdefs.def_meaning.into(),
3037        )]);
3038
3039        assert_eq!(unraveler.unravel_validity(1), None);
3040        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
3041        assert_eq!(off.inner(), offsets_32(&[0, 1, 1]).inner());
3042        assert_eq!(val, Some(validity(&[true, false])));
3043        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
3044        assert_eq!(off.inner(), offsets_32(&[0, 1, 2, 2]).inner());
3045        assert_eq!(val, Some(validity(&[true, true, false])));
3046    }
3047}