lance_encoding/
repdef.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Utilities for rep-def levels
5//!
6//! Repetition and definition levels are a way to encode multipile validity / offsets arrays
7//! into a single buffer.  They are a form of "zipping" buffers together that takes advantage
8//! of the fact that, if the outermost array is invalid, then the validity of the inner items
9//! is irrelevant.
10//!
11//! Note: the concept of repetition & definition levels comes from the Dremel paper and has
12//! been implemented in Apache Parquet.  However, the implementation here is not necessarily
13//! compatible with Parquet.  For example, we use 0 to represent the "inner-most" item and
14//! Parquet uses 0 to represent the "outer-most" item.
15//!
16//! # Repetition Levels
17//!
18//! With repetition levels we convert a sparse array of offsets into a dense array of levels.
19//! These levels are marked non-zero whenever a new list begins.  In other words, given the
20//! list array with 3 rows [{<0,1>, <>, <2>}, {<3>}, {}], [], [{<4>}] we would have three
21//! offsets arrays:
22//!
23//! Outer-most ([]): [0, 3, 3, 4]
24//! Middle     ({}): [0, 3, 4, 4, 5]
25//! Inner      (<>): [0, 2, 2, 3, 4, 5]
26//! Values         : [0, 1, 2, 3, 4]
27//!
28//! We can convert these into repetition levels as follows:
29//!
30//! | Values | Repetition |
31//! | ------ | ---------- |
32//! |      0 |          3 | // Start of outer-most list
33//! |      1 |          0 | // Continues inner-most list (no new lists)
34//! |      - |          1 | // Start of new inner-most list (empty list)
35//! |      2 |          1 | // Start of new inner-most list
36//! |      3 |          2 | // Start of new middle list
37//! |      - |          2 | // Start of new inner-most list (empty list)
38//! |      - |          3 | // Start of new outer-most list (empty list)
39//! |      4 |          0 | // Start of new outer-most list
40//!
41//! Note: We actually have MORE repetition levels than values.  This is because the repetition
42//! levels need to be able to represent empty lists.
43//!
44//! # Definition Levels
45//!
46//! Definition levels are simpler.  We can think of them as zipping together various validity bitmaps
47//! (from different levels of nesting) into a single buffer.  For example, we could zip the arrays
48//! [1, 1, 0, 0] and [1, 0, 1, 0] into [11, 10, 01, 00].  However, 00 and 01 are redundant.  If the
49//! outer level is null then the validity of the inner levels is irrelevant.  To save space we instead
50//! encode a "level" which is the "depth" of the null.  Let's look at a more complete example:
51//!
52//! Array: [{"middle": {"inner": 1]}}, NULL, {"middle": NULL}, {"middle": {"inner": NULL}}]
53//!
54//! In Arrow we would have the following validity arrays:
55//! Outer validity : 1, 0, 1, 1
56//! Middle validity: 1, ?, 0, 1
57//! Inner validity : 1, ?, ?, 0
58//! Values         : 1, ?, ?, ?
59//!
60//! The ? values are undefined in the Arrow format.  We can convert these into definition levels as follows:
61//!
62//! | Values | Definition |
63//! | ------ | ---------- |
64//! |      1 |          0 | // Valid at all levels
65//! |      - |          3 | // Null at outer level
66//! |      - |          2 | // Null at middle level
67//! |      - |          1 | // Null at inner level
68//!
69//! # Compression
70//!
71//! Note that we only need 2 bits of definition levels to represent 3 levels of nesting.  Definition
72//! levels are always more compact than the input validity arrays.  However, compressed levels are not
73//! necessarily more compact than the compressed validity arrays.
74//!
75//! Repetition levels are more complex.  If there are very large lists then a sparse array of offsets
76//! (which has one element per list) might be more compact than a dense array of repetition levels
77//! (which has one element per list value, possibly even more if there are empty lists).
78//!
79//! However, both repetition levels and definition levels are typically very compressible with RLE.
80//!
81//! However, in Lance we don't always take advantage of that compression because we want to be able
82//! to zip rep-def levels together with our values.  This gives us fewer IOPS when accessing row values.
83//!
84//! # Utilities in this Module
85//!
86//! - `RepDefBuilder` - Extracts validity and offset information from Arrow arrays.  We use this as we
87//!   shred the incoming data into primitive leaf arrays.  We don't immediately convert into rep-def because
88//!   we need to share and cheaply clone the builder when we have structs (each struct child shares some parent
89//!   validity / offset information)  The `serialize` method is called once all data has been received to create
90//!   the final rep-def levels.
91//!
92//! - `SerializerContext` - This is an internal utility that helps with serializing rep-def levels.
93//!
94//! - `CompositeRepDefUnraveler` - This structure is used to reverse the process.  It starts with a set of
95//!   rep-def levels and then uses the `unravel_validity` and `unravel_offsets` methods to produce validity
96//!   buffers and offset buffers.  It is "composite" because we may be combining sets of rep-def buffers from
97//!   multiple locations (e.g. multiple blocks in a mini-block encoded file).
98//!
99//! - `RepDefSlicer` - This is a utility that helps with slicing rep-def buffers.  These buffers are "kind of"
100//!   transparent (maps 1:1 with the values in the array) but not exactly because of the special (empty/null) lists.
101//!   The slicer helps with this issue and is used when slicing a rep-def buffer into mini-blocks.
102//!
103//! - `build_control_word_iterator` - This takes in rep-def levels and returns an iterator that returns byte-padded
104//!   "control words" which are used when creating full-zip encoded data.
105//!
106//! - `ControlWordParser` - This parser can parse the control words returned by `build_control_word_iterator` and is
107//!   used when decoding full-zip encoded data.
108
109use std::{
110    iter::{Copied, Zip},
111    sync::Arc,
112};
113
114use arrow_array::OffsetSizeTrait;
115use arrow_buffer::{
116    ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
117};
118use lance_core::{utils::bit::log_2_ceil, Error, Result};
119use snafu::location;
120
121use crate::buffer::LanceBuffer;
122
123pub type LevelBuffer = Vec<u16>;
124
125// As we build def levels we add this to special values to indicate that they
126// are special so that we can skip over them when processing lower levels.
127//
128// We assume 16 bits is good enough for rep-def levels.  This _would_ give
129// us 65536 levels of struct nesting and list nesting.  However, we cut that
130// in half for SPECIAL_THRESHOLD because we use the top bit to indicate if an
131// item is a special value (null list / empty list) during construction.
132//
133// We subtract this off at the end of construction to get the actual definition
134// levels.
135const SPECIAL_THRESHOLD: u16 = u16::MAX / 2;
136
137/// Represents information that we extract from a list array as we are
138/// encoding
139#[derive(Clone, Debug)]
140struct OffsetDesc {
141    offsets: Arc<[i64]>,
142    validity: Option<BooleanBuffer>,
143    has_empty_lists: bool,
144    num_values: usize,
145    num_specials: usize,
146}
147
148/// Represents validity information that we extract from non-list arrays (that
149/// have nulls) as we are encoding
150#[derive(Clone, Debug)]
151struct ValidityDesc {
152    validity: Option<BooleanBuffer>,
153    num_values: usize,
154}
155
156/// Represents validity information that we extract from FSL arrays.  This is
157/// just validity (no offsets) but we also record the dimension of the FSL array
158/// as that will impact the next layer
159#[derive(Clone, Debug)]
160struct FslDesc {
161    validity: Option<BooleanBuffer>,
162    dimension: usize,
163    num_values: usize,
164}
165
166// As we build up rep/def from arrow arrays we record a
167// series of RawRepDef objects.  Each one corresponds to layer
168// in the array structure
169#[derive(Clone, Debug)]
170enum RawRepDef {
171    Offsets(OffsetDesc),
172    Validity(ValidityDesc),
173    Fsl(FslDesc),
174}
175
176impl RawRepDef {
177    // Are there any nulls in this layer
178    fn has_nulls(&self) -> bool {
179        match self {
180            Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
181            Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
182            Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
183        }
184    }
185
186    // How many values are in this layer
187    fn num_values(&self) -> usize {
188        match self {
189            Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
190            Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
191            Self::Fsl(FslDesc { num_values, .. }) => *num_values,
192        }
193    }
194
195    /// How many empty/null lists are in this layer
196    fn num_specials(&self) -> usize {
197        match self {
198            Self::Offsets(OffsetDesc { num_specials, .. }) => *num_specials,
199            _ => 0,
200        }
201    }
202
203    /// How many definition levels do we need for this layer
204    fn max_def(&self) -> u16 {
205        match self {
206            Self::Offsets(OffsetDesc {
207                has_empty_lists,
208                validity,
209                ..
210            }) => {
211                let mut max_def = 0;
212                if *has_empty_lists {
213                    max_def += 1;
214                }
215                if validity.is_some() {
216                    max_def += 1;
217                }
218                max_def
219            }
220            Self::Validity(ValidityDesc { validity: None, .. }) => 0,
221            Self::Validity(ValidityDesc { .. }) => 1,
222            Self::Fsl(FslDesc { validity: None, .. }) => 0,
223            Self::Fsl(FslDesc { .. }) => 1,
224        }
225    }
226
227    /// How many repetition levels do we need for this layer
228    fn max_rep(&self) -> u16 {
229        match self {
230            Self::Offsets(_) => 1,
231            _ => 0,
232        }
233    }
234}
235
236/// Represents repetition and definition levels that have been
237/// serialized into a pair of (optional) level buffers
238#[derive(Debug)]
239pub struct SerializedRepDefs {
240    /// The repetition levels, one per item
241    ///
242    /// If None, there are no lists
243    pub repetition_levels: Option<Arc<[u16]>>,
244    /// The definition levels, one per item
245    ///
246    /// If None, there are no nulls
247    pub definition_levels: Option<Arc<[u16]>>,
248    /// The meaning of each definition level
249    pub def_meaning: Vec<DefinitionInterpretation>,
250    /// The maximum level that is "visible" from the lowest level
251    ///
252    /// This is the last level before we encounter a list level of some kind.  Once we've
253    /// hit a list level then nulls in any level beyond do not map to actual items.
254    ///
255    /// This is None if there are no lists
256    pub max_visible_level: Option<u16>,
257}
258
259impl SerializedRepDefs {
260    pub fn new(
261        repetition_levels: Option<LevelBuffer>,
262        definition_levels: Option<LevelBuffer>,
263        def_meaning: Vec<DefinitionInterpretation>,
264    ) -> Self {
265        let first_list = def_meaning.iter().position(|level| level.is_list());
266        let max_visible_level = first_list.map(|first_list| {
267            def_meaning
268                .iter()
269                .map(|level| level.num_def_levels())
270                .take(first_list)
271                .sum::<u16>()
272        });
273        Self {
274            repetition_levels: repetition_levels.map(Arc::from),
275            definition_levels: definition_levels.map(Arc::from),
276            def_meaning,
277            max_visible_level,
278        }
279    }
280
281    /// Creates an empty SerializedRepDefs (no repetition, all valid)
282    pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
283        Self {
284            repetition_levels: None,
285            definition_levels: None,
286            def_meaning,
287            max_visible_level: None,
288        }
289    }
290
291    pub fn rep_slicer(&self) -> Option<RepDefSlicer<'_>> {
292        self.repetition_levels
293            .as_ref()
294            .map(|rep| RepDefSlicer::new(self, rep.clone()))
295    }
296
297    pub fn def_slicer(&self) -> Option<RepDefSlicer<'_>> {
298        self.definition_levels
299            .as_ref()
300            .map(|def| RepDefSlicer::new(self, def.clone()))
301    }
302}
303
304/// Slices a level buffer into pieces
305///
306/// This is needed to handle the fact that a level buffer may have more
307/// levels than values due to special (empty/null) lists.
308///
309/// As a result, a call to `slice_next(10)` may return 10 levels or it may
310/// return more than 10 levels if any special values are encountered.
311#[derive(Debug)]
312pub struct RepDefSlicer<'a> {
313    repdef: &'a SerializedRepDefs,
314    to_slice: LanceBuffer,
315    current: usize,
316}
317
318// TODO: All of this logic will need some changing when we compress rep/def levels.
319impl<'a> RepDefSlicer<'a> {
320    fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
321        Self {
322            repdef,
323            to_slice: LanceBuffer::reinterpret_slice(levels),
324            current: 0,
325        }
326    }
327
328    pub fn num_levels(&self) -> usize {
329        self.to_slice.len() / 2
330    }
331
332    pub fn num_levels_remaining(&self) -> usize {
333        self.num_levels() - self.current
334    }
335
336    pub fn all_levels(&self) -> &LanceBuffer {
337        &self.to_slice
338    }
339
340    /// Returns the rest of the levels not yet sliced
341    ///
342    /// This must be called instead of `slice_next` on the final iteration.
343    /// This is because anytime we slice there may be empty/null lists on the
344    /// boundary that are "free" and the current behavior in `slice_next` is to
345    /// leave them for the next call.
346    ///
347    /// `slice_rest` will slice all remaining levels and return them.
348    pub fn slice_rest(&mut self) -> LanceBuffer {
349        let start = self.current;
350        let remaining = self.num_levels_remaining();
351        self.current = self.num_levels();
352        self.to_slice.slice_with_length(start * 2, remaining * 2)
353    }
354
355    /// Returns enough levels to satisfy the next `num_values` values
356    pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
357        let start = self.current;
358        let Some(max_visible_level) = self.repdef.max_visible_level else {
359            // No lists, should be 1:1 mapping from levels to values
360            self.current = start + num_values;
361            return self.to_slice.slice_with_length(start * 2, num_values * 2);
362        };
363        if let Some(def) = self.repdef.definition_levels.as_ref() {
364            // There are lists and there are def levels.  That means there may be
365            // more rep/def levels than values.  We need to scan the def levels to figure
366            // out which items are "invisible" and skip over them
367            let mut def_itr = def[start..].iter();
368            let mut num_taken = 0;
369            let mut num_passed = 0;
370            while num_taken < num_values {
371                let def_level = *def_itr.next().unwrap();
372                if def_level <= max_visible_level {
373                    num_taken += 1;
374                }
375                num_passed += 1;
376            }
377            self.current = start + num_passed;
378            self.to_slice.slice_with_length(start * 2, num_passed * 2)
379        } else {
380            // No def levels, should be 1:1 mapping from levels to values
381            self.current = start + num_values;
382            self.to_slice.slice_with_length(start * 2, num_values * 2)
383        }
384    }
385}
386
387/// This tells us how an array handles definition.  Given a stack of
388/// these and a nested array and a set of definition levels we can calculate
389/// how we should interpret the definition levels.
390///
391/// For example, if the interpretation is [AllValidItem, NullableItem] then
392/// a 0 means "valid item" and a 1 means "null struct".  If the interpretation
393/// is [NullableItem, NullableItem] then a 0 means "valid item" and a 1 means
394/// "null item" and a 2 means "null struct".
395///
396/// Lists are tricky because we might use up to two definition levels for a
397/// single layer of list nesting because we need one value to indicate "empty list"
398/// and another value to indicate "null list".
399#[derive(Debug, Copy, Clone, PartialEq, Eq)]
400pub enum DefinitionInterpretation {
401    AllValidItem,
402    AllValidList,
403    NullableItem,
404    NullableList,
405    EmptyableList,
406    NullableAndEmptyableList,
407}
408
409impl DefinitionInterpretation {
410    /// How many definition levels do we need for this layer
411    pub fn num_def_levels(&self) -> u16 {
412        match self {
413            Self::AllValidItem => 0,
414            Self::AllValidList => 0,
415            Self::NullableItem => 1,
416            Self::NullableList => 1,
417            Self::EmptyableList => 1,
418            Self::NullableAndEmptyableList => 2,
419        }
420    }
421
422    /// Does this layer have nulls?
423    pub fn is_all_valid(&self) -> bool {
424        matches!(
425            self,
426            Self::AllValidItem | Self::AllValidList | Self::EmptyableList
427        )
428    }
429
430    /// Does this layer represent a list?
431    pub fn is_list(&self) -> bool {
432        matches!(
433            self,
434            Self::AllValidList
435                | Self::NullableList
436                | Self::EmptyableList
437                | Self::NullableAndEmptyableList
438        )
439    }
440}
441
442/// The RepDefBuilder is used to collect offsets & validity buffers
443/// from arrow structures.  Once we have those we use the SerializerContext
444/// to build the actual repetition and definition levels.
445///
446/// We know ahead of time how many rep/def levels we will need (number of items
447/// in inner-most array + the number of empty/null lists in any parent arrays).
448///
449/// As a result we try and avoid any re-allocations by pre-allocating the buffers
450/// up front.  We allocate two copies of each buffer which allows us to avoid unsafe
451/// code caused by reading and writing to the same buffer (also, it's unavoidable
452/// because there are times we need to write 'faster' than we read)
453#[derive(Debug)]
454struct SerializerContext {
455    // This is built from outer-to-inner and then reversed at the end
456    def_meaning: Vec<DefinitionInterpretation>,
457    rep_levels: LevelBuffer,
458    spare_rep: LevelBuffer,
459    def_levels: LevelBuffer,
460    spare_def: LevelBuffer,
461    current_rep: u16,
462    current_def: u16,
463    current_len: usize,
464    current_num_specials: usize,
465}
466
467impl SerializerContext {
468    fn new(len: usize, num_layers: usize, max_rep: u16, max_def: u16) -> Self {
469        let def_meaning = Vec::with_capacity(num_layers);
470        Self {
471            rep_levels: if max_rep > 0 {
472                vec![0; len]
473            } else {
474                LevelBuffer::default()
475            },
476            spare_rep: if max_rep > 0 {
477                vec![0; len]
478            } else {
479                LevelBuffer::default()
480            },
481            def_levels: if max_def > 0 {
482                vec![0; len]
483            } else {
484                LevelBuffer::default()
485            },
486            spare_def: if max_def > 0 {
487                vec![0; len]
488            } else {
489                LevelBuffer::default()
490            },
491            def_meaning,
492            current_rep: max_rep,
493            current_def: max_def,
494            current_len: 0,
495            current_num_specials: 0,
496        }
497    }
498
499    fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
500        let def = self.current_def;
501        self.current_def -= meaning.num_def_levels();
502        self.def_meaning.push(meaning);
503        def
504    }
505
506    fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
507        let rep_level = self.current_rep;
508        let (null_list_level, empty_list_level) =
509            match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
510                (true, true) => {
511                    let level =
512                        self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
513                    (level - 1, level)
514                }
515                (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
516                (false, true) => (
517                    0,
518                    self.checkout_def(DefinitionInterpretation::EmptyableList),
519                ),
520                (false, false) => {
521                    self.checkout_def(DefinitionInterpretation::AllValidList);
522                    (0, 0)
523                }
524            };
525        self.current_rep -= 1;
526
527        if let Some(validity) = &offset_desc.validity {
528            self.do_record_validity(validity, null_list_level);
529        }
530
531        // We write into the spare buffers and read from the active buffers
532        // and then swap at the end.  This way we don't write over what we
533        // are reading.
534
535        let mut new_len = 0;
536        assert!(self.rep_levels.len() >= (offset_desc.num_values + self.current_num_specials) - 1);
537        if self.def_levels.is_empty() {
538            let mut write_itr = self.spare_rep.iter_mut();
539            let mut read_iter = self.rep_levels.iter().copied();
540            for w in offset_desc.offsets.windows(2) {
541                let len = w[1] - w[0];
542                // len can't be 0 because then we'd have def levels
543                assert!(len > 0);
544                let rep = read_iter.next().unwrap();
545                let list_level = if rep == 0 { rep_level } else { rep };
546                *write_itr.next().unwrap() = list_level;
547
548                for _ in 1..len {
549                    *write_itr.next().unwrap() = 0;
550                }
551                new_len += len as usize;
552            }
553            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
554        } else {
555            assert!(
556                self.def_levels.len() >= (offset_desc.num_values + self.current_num_specials) - 1
557            );
558            let mut def_write_itr = self.spare_def.iter_mut();
559            let mut rep_write_itr = self.spare_rep.iter_mut();
560            let mut rep_read_itr = self.rep_levels.iter().copied();
561            let mut def_read_itr = self.def_levels.iter().copied();
562            let specials_to_pass = self.current_num_specials;
563            let mut specials_passed = 0;
564
565            for w in offset_desc.offsets.windows(2) {
566                let mut def = def_read_itr.next().unwrap();
567                // Copy over any higher-level special values in place
568                while def > SPECIAL_THRESHOLD {
569                    *def_write_itr.next().unwrap() = def;
570                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
571                    def = def_read_itr.next().unwrap();
572                    new_len += 1;
573                    specials_passed += 1;
574                }
575
576                let len = w[1] - w[0];
577                let rep = rep_read_itr.next().unwrap();
578
579                // If the rep_level is 0 then we are the first list level
580                // otherwise we are starting a higher level list so keep
581                // existing rep level
582                let list_level = if rep == 0 { rep_level } else { rep };
583
584                if def == 0 && len > 0 {
585                    // New valid list, write a rep level and then add new 0/0 items
586                    *def_write_itr.next().unwrap() = 0;
587                    *rep_write_itr.next().unwrap() = list_level;
588
589                    for _ in 1..len {
590                        *def_write_itr.next().unwrap() = 0;
591                        *rep_write_itr.next().unwrap() = 0;
592                    }
593
594                    new_len += len as usize;
595                } else if def == 0 {
596                    // Empty list, insert new special
597                    *def_write_itr.next().unwrap() = empty_list_level + SPECIAL_THRESHOLD;
598                    *rep_write_itr.next().unwrap() = list_level;
599                    new_len += 1;
600                } else {
601                    // Either the list is null or one of its struct parents
602                    // is null.  Promote it to a special value.
603                    *def_write_itr.next().unwrap() = def + SPECIAL_THRESHOLD;
604                    *rep_write_itr.next().unwrap() = list_level;
605                    new_len += 1;
606                }
607            }
608
609            // If we have any special values at the end, we need to copy them over
610            while specials_passed < specials_to_pass {
611                *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
612                *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
613                new_len += 1;
614                specials_passed += 1;
615            }
616            std::mem::swap(&mut self.def_levels, &mut self.spare_def);
617            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
618        }
619
620        self.current_len = new_len;
621        self.current_num_specials += offset_desc.num_specials;
622    }
623
624    fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
625        assert!(self.def_levels.len() >= validity.len() + self.current_num_specials);
626        debug_assert!(
627            self.current_len == 0 || self.current_len == validity.len() + self.current_num_specials
628        );
629        self.current_len = validity.len();
630
631        let mut def_read_itr = self.def_levels.iter().copied();
632        let mut def_write_itr = self.spare_def.iter_mut();
633
634        let specials_to_pass = self.current_num_specials;
635        let mut specials_passed = 0;
636
637        for incoming_validity in validity.iter() {
638            let mut def = def_read_itr.next().unwrap();
639            while def > SPECIAL_THRESHOLD {
640                *def_write_itr.next().unwrap() = def;
641                def = def_read_itr.next().unwrap();
642                specials_passed += 1;
643            }
644            if def == 0 && !incoming_validity {
645                *def_write_itr.next().unwrap() = null_level;
646            } else {
647                *def_write_itr.next().unwrap() = def;
648            }
649        }
650
651        while specials_passed < specials_to_pass {
652            *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
653            specials_passed += 1;
654        }
655
656        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
657    }
658
659    fn multiply_levels(&mut self, multiplier: usize) {
660        let old_len = self.current_len;
661        // All non-special values will be broadcasted by the multiplier.  Special values are copied as-is.
662        self.current_len =
663            (self.current_len - self.current_num_specials) * multiplier + self.current_num_specials;
664
665        if self.rep_levels.is_empty() && self.def_levels.is_empty() {
666            // All valid with no rep/def levels, nothing to do
667            return;
668        } else if self.rep_levels.is_empty() {
669            assert!(self.def_levels.len() >= self.current_len);
670            // No rep levels, just multiply the def levels
671            let mut def_read_itr = self.def_levels.iter().copied();
672            let mut def_write_itr = self.spare_def.iter_mut();
673            for _ in 0..old_len {
674                let mut def = def_read_itr.next().unwrap();
675                while def > SPECIAL_THRESHOLD {
676                    *def_write_itr.next().unwrap() = def;
677                    def = def_read_itr.next().unwrap();
678                }
679                for _ in 0..multiplier {
680                    *def_write_itr.next().unwrap() = def;
681                }
682            }
683        } else if self.def_levels.is_empty() {
684            assert!(self.rep_levels.len() >= self.current_len);
685            // No def levels, just multiply the rep levels
686            let mut rep_read_itr = self.rep_levels.iter().copied();
687            let mut rep_write_itr = self.spare_rep.iter_mut();
688            for _ in 0..old_len {
689                let rep = rep_read_itr.next().unwrap();
690                for _ in 0..multiplier {
691                    *rep_write_itr.next().unwrap() = rep;
692                }
693            }
694        } else {
695            assert!(self.rep_levels.len() >= self.current_len);
696            assert!(self.def_levels.len() >= self.current_len);
697            let mut rep_read_itr = self.rep_levels.iter().copied();
698            let mut def_read_itr = self.def_levels.iter().copied();
699            let mut rep_write_itr = self.spare_rep.iter_mut();
700            let mut def_write_itr = self.spare_def.iter_mut();
701            for _ in 0..old_len {
702                let mut def = def_read_itr.next().unwrap();
703                while def > SPECIAL_THRESHOLD {
704                    *def_write_itr.next().unwrap() = def;
705                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
706                    def = def_read_itr.next().unwrap();
707                }
708                let rep = rep_read_itr.next().unwrap();
709                for _ in 0..multiplier {
710                    *def_write_itr.next().unwrap() = def;
711                    *rep_write_itr.next().unwrap() = rep;
712                }
713            }
714        }
715        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
716        std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
717    }
718
719    fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
720        if let Some(validity) = validity {
721            let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
722            self.do_record_validity(validity, def_level);
723        } else {
724            self.checkout_def(DefinitionInterpretation::AllValidItem);
725        }
726    }
727
728    fn record_validity(&mut self, validity_desc: &ValidityDesc) {
729        self.record_validity_buf(&validity_desc.validity)
730    }
731
732    fn record_fsl(&mut self, fsl_desc: &FslDesc) {
733        self.record_validity_buf(&fsl_desc.validity);
734        self.multiply_levels(fsl_desc.dimension);
735    }
736
737    fn normalize_specials(&mut self) {
738        for def in self.def_levels.iter_mut() {
739            if *def > SPECIAL_THRESHOLD {
740                *def -= SPECIAL_THRESHOLD;
741            }
742        }
743    }
744
745    fn build(mut self) -> SerializedRepDefs {
746        if self.current_len == 0 {
747            return SerializedRepDefs::new(None, None, self.def_meaning);
748        }
749
750        self.normalize_specials();
751
752        let definition_levels = if self.def_levels.is_empty() {
753            None
754        } else {
755            Some(self.def_levels)
756        };
757        let repetition_levels = if self.rep_levels.is_empty() {
758            None
759        } else {
760            Some(self.rep_levels)
761        };
762
763        // Need to reverse the def meaning since we build rep / def levels in reverse
764        let def_meaning = self.def_meaning.into_iter().rev().collect::<Vec<_>>();
765
766        SerializedRepDefs::new(repetition_levels, definition_levels, def_meaning)
767    }
768}
769
770/// A structure used to collect validity buffers and offsets from arrow
771/// arrays and eventually create repetition and definition levels
772///
773/// As we are encoding the structural encoders are given this struct and
774/// will record the arrow information into it.  Once we hit a leaf node we
775/// serialize the data into rep/def levels and write these into the page.
776#[derive(Clone, Default, Debug)]
777pub struct RepDefBuilder {
778    // The rep/def info we have collected so far
779    repdefs: Vec<RawRepDef>,
780    // The current length, can get larger as we traverse lists (e.g. an
781    // array might have 5 lists which results in 50 items)
782    //
783    // Starts uninitialized until we see the first rep/def item
784    len: Option<usize>,
785}
786
787impl RepDefBuilder {
788    fn check_validity_len(&mut self, incoming_len: usize) {
789        if let Some(len) = self.len {
790            assert_eq!(incoming_len, len);
791        } else {
792            // First validity buffer we've seen
793            self.len = Some(incoming_len);
794        }
795    }
796
797    fn num_layers(&self) -> usize {
798        self.repdefs.len()
799    }
800
801    /// The builder is "empty" if there is no repetition and no nulls.  In this case we don't need
802    /// to store anything to disk (except the description)
803    pub fn is_empty(&self) -> bool {
804        self.repdefs
805            .iter()
806            .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
807    }
808
809    /// Returns true if there is only a single layer of definition
810    pub fn is_simple_validity(&self) -> bool {
811        self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
812    }
813
814    /// Registers a nullable validity bitmap
815    pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
816        self.check_validity_len(validity.len());
817        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
818            num_values: validity.len(),
819            validity: Some(validity.into_inner()),
820        }));
821    }
822
823    /// Registers an all-valid validity layer
824    pub fn add_no_null(&mut self, len: usize) {
825        self.check_validity_len(len);
826        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
827            validity: None,
828            num_values: len,
829        }));
830    }
831
832    pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
833        if let Some(len) = self.len {
834            assert_eq!(num_values, len);
835        }
836        self.len = Some(num_values * dimension);
837        debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
838        self.repdefs.push(RawRepDef::Fsl(FslDesc {
839            num_values,
840            validity: validity.map(|v| v.into_inner()),
841            dimension,
842        }))
843    }
844
845    fn check_offset_len(&mut self, offsets: &[i64]) {
846        if let Some(len) = self.len {
847            assert!(offsets.len() == len + 1);
848        }
849        self.len = Some(offsets[offsets.len() - 1] as usize);
850    }
851
852    fn do_add_offsets(
853        &mut self,
854        lengths: impl Iterator<Item = i64>,
855        validity: Option<NullBuffer>,
856        capacity: usize,
857    ) -> bool {
858        let mut num_specials = 0;
859        let mut has_empty_lists = false;
860        let mut has_garbage_values = false;
861        let mut last_off: i64 = 0;
862
863        let mut normalized_offsets = Vec::with_capacity(capacity);
864        normalized_offsets.push(0);
865
866        if let Some(ref validity) = validity {
867            for (len, is_valid) in lengths.zip(validity.iter()) {
868                match (is_valid, len == 0) {
869                    (false, is_empty) => {
870                        num_specials += 1;
871                        has_garbage_values |= !is_empty;
872                    }
873                    (true, true) => {
874                        num_specials += 1;
875                        has_empty_lists = true;
876                    }
877                    _ => {
878                        last_off += len;
879                    }
880                }
881                normalized_offsets.push(last_off);
882            }
883        } else {
884            for len in lengths {
885                if len == 0 {
886                    num_specials += 1;
887                    has_empty_lists = true;
888                }
889                last_off += len;
890                normalized_offsets.push(last_off);
891            }
892        }
893
894        self.check_offset_len(&normalized_offsets);
895        self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
896            num_values: normalized_offsets.len() - 1,
897            offsets: normalized_offsets.into(),
898            validity: validity.map(|v| v.into_inner()),
899            has_empty_lists,
900            num_specials: num_specials as usize,
901        }));
902
903        has_garbage_values
904    }
905
906    /// Adds a layer of offsets
907    ///
908    /// Offsets are casted to a common type (i64) and also normalized.  Null lists are
909    /// always represented by a zero-length (identical) pair of offsets and so the caller
910    /// should filter out any garbage items before encoding them.  To assist with this the
911    /// method will return true if any non-empty null lists were found.
912    pub fn add_offsets<O: OffsetSizeTrait>(
913        &mut self,
914        offsets: OffsetBuffer<O>,
915        validity: Option<NullBuffer>,
916    ) -> bool {
917        let inner = offsets.into_inner();
918        let buffer_len = inner.len();
919
920        if O::IS_LARGE {
921            let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, buffer_len);
922            let lengths = i64_buff.windows(2).map(|off| off[1] - off[0]);
923            self.do_add_offsets(lengths, validity, buffer_len)
924        } else {
925            let i32_buff = ScalarBuffer::<i32>::new(inner.into_inner(), 0, buffer_len);
926            let lengths = i32_buff.windows(2).map(|off| (off[1] - off[0]) as i64);
927            self.do_add_offsets(lengths, validity, buffer_len)
928        }
929    }
930
931    // When we are encoding data it arrives in batches.  For each batch we create a RepDefBuilder and collect the
932    // various validity buffers and offset buffers from that batch.  Once we have enough batches to write a page we
933    // need to take this collection of RepDefBuilders and concatenate them and then serialize them into rep/def levels.
934    //
935    // TODO: In the future, we may concatenate and serialize at the same time?
936    //
937    // This method takes care of the concatenation part.  First we collect all of layer 0 from each builder, then we
938    // call this method.  Then we collect all of layer 1 from each builder and call this method.  And so on.
939    //
940    // That means this method should get a collection of `RawRepDef` where each item is the same kind (all validity or
941    // all offsets) though the nullability / lengths may be different in each layer.
942    fn concat_layers<'a>(
943        layers: impl Iterator<Item = &'a RawRepDef>,
944        num_layers: usize,
945    ) -> RawRepDef {
946        enum LayerKind {
947            Validity,
948            Fsl,
949            Offsets,
950        }
951
952        // We make two passes through the layers.  The first determines if we need to pay the cost of allocating
953        // buffers.  The second pass actually adds the values.
954        let mut collected = Vec::with_capacity(num_layers);
955        let mut has_nulls = false;
956        let mut layer_kind = LayerKind::Validity;
957        let mut total_num_specials = 0;
958        let mut all_dimension = 0;
959        let mut all_has_empty_lists = false;
960        let mut all_num_values = 0;
961        for layer in layers {
962            has_nulls |= layer.has_nulls();
963            match layer {
964                RawRepDef::Validity(_) => {
965                    layer_kind = LayerKind::Validity;
966                }
967                RawRepDef::Offsets(OffsetDesc {
968                    num_specials,
969                    has_empty_lists,
970                    ..
971                }) => {
972                    all_has_empty_lists |= *has_empty_lists;
973                    layer_kind = LayerKind::Offsets;
974                    total_num_specials += num_specials;
975                }
976                RawRepDef::Fsl(FslDesc { dimension, .. }) => {
977                    layer_kind = LayerKind::Fsl;
978                    all_dimension = *dimension;
979                }
980            }
981            collected.push(layer);
982            all_num_values += layer.num_values();
983        }
984
985        // Shortcut if there are no nulls
986        if !has_nulls {
987            match layer_kind {
988                LayerKind::Validity => {
989                    return RawRepDef::Validity(ValidityDesc {
990                        validity: None,
991                        num_values: all_num_values,
992                    });
993                }
994                LayerKind::Fsl => {
995                    return RawRepDef::Fsl(FslDesc {
996                        validity: None,
997                        num_values: all_num_values,
998                        dimension: all_dimension,
999                    })
1000                }
1001                LayerKind::Offsets => {}
1002            }
1003        }
1004
1005        // Only allocate if needed
1006        let mut validity_builder = if has_nulls {
1007            BooleanBufferBuilder::new(all_num_values)
1008        } else {
1009            BooleanBufferBuilder::new(0)
1010        };
1011        let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
1012            let mut all_offsets = Vec::with_capacity(all_num_values);
1013            all_offsets.push(0);
1014            all_offsets
1015        } else {
1016            Vec::new()
1017        };
1018
1019        for layer in collected {
1020            match layer {
1021                RawRepDef::Validity(ValidityDesc {
1022                    validity: Some(validity),
1023                    ..
1024                }) => {
1025                    validity_builder.append_buffer(validity);
1026                }
1027                RawRepDef::Validity(ValidityDesc {
1028                    validity: None,
1029                    num_values,
1030                }) => {
1031                    validity_builder.append_n(*num_values, true);
1032                }
1033                RawRepDef::Fsl(FslDesc {
1034                    validity,
1035                    num_values,
1036                    ..
1037                }) => {
1038                    if let Some(validity) = validity {
1039                        validity_builder.append_buffer(validity);
1040                    } else {
1041                        validity_builder.append_n(*num_values, true);
1042                    }
1043                }
1044                RawRepDef::Offsets(OffsetDesc {
1045                    offsets,
1046                    validity: Some(validity),
1047                    has_empty_lists,
1048                    ..
1049                }) => {
1050                    all_has_empty_lists |= has_empty_lists;
1051                    validity_builder.append_buffer(validity);
1052                    let last = *all_offsets.last().unwrap();
1053                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1054                }
1055                RawRepDef::Offsets(OffsetDesc {
1056                    offsets,
1057                    validity: None,
1058                    has_empty_lists,
1059                    num_values,
1060                    ..
1061                }) => {
1062                    all_has_empty_lists |= has_empty_lists;
1063                    if has_nulls {
1064                        validity_builder.append_n(*num_values, true);
1065                    }
1066                    let last = *all_offsets.last().unwrap();
1067                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1068                }
1069            }
1070        }
1071        let validity = if has_nulls {
1072            Some(validity_builder.finish())
1073        } else {
1074            None
1075        };
1076        match layer_kind {
1077            LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
1078                validity,
1079                num_values: all_num_values,
1080                dimension: all_dimension,
1081            }),
1082            LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
1083                validity,
1084                num_values: all_num_values,
1085            }),
1086            LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
1087                offsets: all_offsets.into(),
1088                validity,
1089                has_empty_lists: all_has_empty_lists,
1090                num_values: all_num_values,
1091                num_specials: total_num_specials,
1092            }),
1093        }
1094    }
1095
1096    /// Converts the validity / offsets buffers that have been gathered so far
1097    /// into repetition and definition levels
1098    pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
1099        assert!(!builders.is_empty());
1100        if builders.iter().all(|b| b.is_empty()) {
1101            // No repetition, all-valid
1102            return SerializedRepDefs::empty(
1103                builders
1104                    .first()
1105                    .unwrap()
1106                    .repdefs
1107                    .iter()
1108                    .map(|_| DefinitionInterpretation::AllValidItem)
1109                    .collect::<Vec<_>>(),
1110            );
1111        }
1112
1113        let num_layers = builders[0].num_layers();
1114        let combined_layers = (0..num_layers)
1115            .map(|layer_index| {
1116                Self::concat_layers(
1117                    builders.iter().map(|b| &b.repdefs[layer_index]),
1118                    builders.len(),
1119                )
1120            })
1121            .collect::<Vec<_>>();
1122        debug_assert!(builders
1123            .iter()
1124            .all(|b| b.num_layers() == builders[0].num_layers()));
1125
1126        let total_len = combined_layers.last().unwrap().num_values()
1127            + combined_layers
1128                .iter()
1129                .map(|l| l.num_specials())
1130                .sum::<usize>();
1131        let max_rep = combined_layers.iter().map(|l| l.max_rep()).sum::<u16>();
1132        let max_def = combined_layers.iter().map(|l| l.max_def()).sum::<u16>();
1133
1134        let mut context = SerializerContext::new(total_len, num_layers, max_rep, max_def);
1135        for layer in combined_layers.into_iter() {
1136            match layer {
1137                RawRepDef::Validity(def) => {
1138                    context.record_validity(&def);
1139                }
1140                RawRepDef::Offsets(rep) => {
1141                    context.record_offsets(&rep);
1142                }
1143                RawRepDef::Fsl(fsl) => {
1144                    context.record_fsl(&fsl);
1145                }
1146            }
1147        }
1148        context.build()
1149    }
1150}
1151
1152/// Starts with serialized repetition and definition levels and unravels
1153/// them into validity buffers and offsets buffers
1154///
1155/// This is used during decoding to create the necessary arrow structures
1156#[derive(Debug)]
1157pub struct RepDefUnraveler {
1158    rep_levels: Option<LevelBuffer>,
1159    def_levels: Option<LevelBuffer>,
1160    // Maps from definition level to the rep level at which that definition level is visible
1161    levels_to_rep: Vec<u16>,
1162    def_meaning: Arc<[DefinitionInterpretation]>,
1163    // Current definition level to compare to.
1164    current_def_cmp: u16,
1165    // Current rep level, determines which specials we can see
1166    current_rep_cmp: u16,
1167    // Current layer index, 0 means inner-most layer and it counts up from there.  Used to index
1168    // into special_defs
1169    current_layer: usize,
1170    // Number of items in the inner-most layer (needed if the definition levels are not present)
1171    num_items: u64,
1172}
1173
1174impl RepDefUnraveler {
1175    /// Creates a new unraveler from serialized repetition and definition information
1176    pub fn new(
1177        rep_levels: Option<LevelBuffer>,
1178        def_levels: Option<LevelBuffer>,
1179        def_meaning: Arc<[DefinitionInterpretation]>,
1180        num_items: u64,
1181    ) -> Self {
1182        let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
1183        let mut rep_counter = 0;
1184        // Level=0 is always visible and means valid item
1185        levels_to_rep.push(0);
1186        for meaning in def_meaning.as_ref() {
1187            match meaning {
1188                DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
1189                    // There is no corresponding level, so nothing to put in levels_to_rep
1190                }
1191                DefinitionInterpretation::NullableItem => {
1192                    // Some null structs are not visible at inner rep levels in cases like LIST<STRUCT<LIST<...>>>
1193                    levels_to_rep.push(rep_counter);
1194                }
1195                DefinitionInterpretation::NullableList => {
1196                    rep_counter += 1;
1197                    levels_to_rep.push(rep_counter);
1198                }
1199                DefinitionInterpretation::EmptyableList => {
1200                    rep_counter += 1;
1201                    levels_to_rep.push(rep_counter);
1202                }
1203                DefinitionInterpretation::NullableAndEmptyableList => {
1204                    rep_counter += 1;
1205                    levels_to_rep.push(rep_counter);
1206                    levels_to_rep.push(rep_counter);
1207                }
1208            }
1209        }
1210        Self {
1211            rep_levels,
1212            def_levels,
1213            current_def_cmp: 0,
1214            current_rep_cmp: 0,
1215            levels_to_rep,
1216            current_layer: 0,
1217            def_meaning,
1218            num_items,
1219        }
1220    }
1221
1222    pub fn is_all_valid(&self) -> bool {
1223        self.def_meaning[self.current_layer].is_all_valid()
1224    }
1225
1226    /// If the current level is a repetition layer then this returns the number of lists
1227    /// at this level.
1228    ///
1229    /// This is not valid to call when the current level is a struct/primitive layer because
1230    /// in some cases there may be no rep or def information to know this.
1231    pub fn max_lists(&self) -> usize {
1232        debug_assert!(
1233            self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
1234        );
1235        self.rep_levels
1236            .as_ref()
1237            // Worst case every rep item is max_rep and a new list
1238            .map(|levels| levels.len())
1239            .unwrap_or(0)
1240    }
1241
1242    /// Unravels a layer of offsets from the unraveler into the given offset width
1243    ///
1244    /// When decoding a list the caller should first unravel the offsets and then
1245    /// unravel the validity (this is the opposite order used during encoding)
1246    pub fn unravel_offsets<T: ArrowNativeType>(
1247        &mut self,
1248        offsets: &mut Vec<T>,
1249        validity: Option<&mut BooleanBufferBuilder>,
1250    ) -> Result<()> {
1251        let rep_levels = self
1252            .rep_levels
1253            .as_mut()
1254            .expect("Expected repetition level but data didn't contain repetition");
1255        let valid_level = self.current_def_cmp;
1256        let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
1257            DefinitionInterpretation::NullableList => {
1258                self.current_def_cmp += 1;
1259                (valid_level + 1, 0)
1260            }
1261            DefinitionInterpretation::EmptyableList => {
1262                self.current_def_cmp += 1;
1263                (0, valid_level + 1)
1264            }
1265            DefinitionInterpretation::NullableAndEmptyableList => {
1266                self.current_def_cmp += 2;
1267                (valid_level + 1, valid_level + 2)
1268            }
1269            DefinitionInterpretation::AllValidList => (0, 0),
1270            _ => unreachable!(),
1271        };
1272        self.current_layer += 1;
1273
1274        // This is the highest def level that is still visible.  Once we hit a list then
1275        // we stop looking because any null / empty list (or list masked by a higher level
1276        // null) will not be visible
1277        let mut max_level = null_level.max(empty_level);
1278        // Anything higher than this (but less than max_level) is a null struct masking our
1279        // list.  We will materialize this is a null list.
1280        let upper_null = max_level;
1281        for level in self.def_meaning[self.current_layer..].iter() {
1282            match level {
1283                DefinitionInterpretation::NullableItem => {
1284                    max_level += 1;
1285                }
1286                DefinitionInterpretation::AllValidItem => {}
1287                _ => {
1288                    break;
1289                }
1290            }
1291        }
1292
1293        let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);
1294
1295        // If offsets is empty this is a no-op.  If offsets is not empty that means we already
1296        // added a set of offsets.  For example, we might have added [0, 3, 5] (2 lists).  Now
1297        // say we want to add [0, 1, 4] (2 lists).  We should get [0, 3, 5, 6, 9] (4 lists).  If
1298        // we don't pop here we get [0, 3, 5, 5, 6, 9] which is wrong.
1299        //
1300        // Or, to think about it another way, if every unraveler adds the starting 0 and the trailing
1301        // length then we have N + unravelers.len() values instead of N + 1.
1302        offsets.pop();
1303
1304        let to_offset = |val: usize| {
1305            T::from_usize(val)
1306            .ok_or_else(|| Error::invalid_input("A single batch had more than i32::MAX values and so a large container type is required", location!()))
1307        };
1308        self.current_rep_cmp += 1;
1309        if let Some(def_levels) = &mut self.def_levels {
1310            assert!(rep_levels.len() == def_levels.len());
1311            // It's possible validity is None even if we have def levels.  For example, we might have
1312            // empty lists (which require def levels) but no nulls.
1313            let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
1314                Box::new(|is_valid| validity.append(is_valid))
1315            } else {
1316                Box::new(|_| {})
1317            };
1318            // This is a strange access pattern.  We are iterating over the rep/def levels and
1319            // at the same time writing the rep/def levels.  This means we need both a mutable
1320            // and immutable reference to the rep/def levels.
1321            let mut read_idx = 0;
1322            let mut write_idx = 0;
1323            while read_idx < rep_levels.len() {
1324                // SAFETY: We assert that rep_levels and def_levels have the same
1325                // len and read_idx and write_idx can never go past the end.
1326                unsafe {
1327                    let rep_val = *rep_levels.get_unchecked(read_idx);
1328                    if rep_val != 0 {
1329                        let def_val = *def_levels.get_unchecked(read_idx);
1330                        // Copy over
1331                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1332                        *def_levels.get_unchecked_mut(write_idx) = def_val;
1333                        write_idx += 1;
1334
1335                        if def_val == 0 {
1336                            // This is a valid list
1337                            offsets.push(to_offset(curlen)?);
1338                            curlen += 1;
1339                            push_validity(true);
1340                        } else if def_val > max_level {
1341                            // This is not visible at this rep level, do not add to offsets, but keep in repdef
1342                        } else if def_val == null_level || def_val > upper_null {
1343                            // This is a null list (or a list masked by a null struct)
1344                            offsets.push(to_offset(curlen)?);
1345                            push_validity(false);
1346                        } else if def_val == empty_level {
1347                            // This is an empty list
1348                            offsets.push(to_offset(curlen)?);
1349                            push_validity(true);
1350                        } else {
1351                            // New valid list starting with null item
1352                            offsets.push(to_offset(curlen)?);
1353                            curlen += 1;
1354                            push_validity(true);
1355                        }
1356                    } else {
1357                        curlen += 1;
1358                    }
1359                    read_idx += 1;
1360                }
1361            }
1362            offsets.push(to_offset(curlen)?);
1363            rep_levels.truncate(write_idx);
1364            def_levels.truncate(write_idx);
1365            Ok(())
1366        } else {
1367            // SAFETY: See above loop
1368            let mut read_idx = 0;
1369            let mut write_idx = 0;
1370            let old_offsets_len = offsets.len();
1371            while read_idx < rep_levels.len() {
1372                // SAFETY: read_idx / write_idx cannot go past rep_levels.len()
1373                unsafe {
1374                    let rep_val = *rep_levels.get_unchecked(read_idx);
1375                    if rep_val != 0 {
1376                        // Finish the current list
1377                        offsets.push(to_offset(curlen)?);
1378                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1379                        write_idx += 1;
1380                    }
1381                    curlen += 1;
1382                    read_idx += 1;
1383                }
1384            }
1385            let num_new_lists = offsets.len() - old_offsets_len;
1386            offsets.push(to_offset(curlen)?);
1387            rep_levels.truncate(offsets.len() - 1);
1388            if let Some(validity) = validity {
1389                // Even though we don't have validity it is possible another unraveler did and so we need
1390                // to push all valids
1391                validity.append_n(num_new_lists, true);
1392            }
1393            Ok(())
1394        }
1395    }
1396
1397    pub fn skip_validity(&mut self) {
1398        debug_assert!(
1399            self.def_meaning[self.current_layer] == DefinitionInterpretation::AllValidItem
1400        );
1401        self.current_layer += 1;
1402    }
1403
1404    /// Unravels a layer of validity from the definition levels
1405    pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
1406        if self.def_meaning[self.current_layer] == DefinitionInterpretation::AllValidItem {
1407            self.current_layer += 1;
1408            validity.append_n(self.num_items as usize, true);
1409            return;
1410        }
1411
1412        self.current_layer += 1;
1413        let def_levels = &self.def_levels.as_ref().unwrap();
1414
1415        let current_def_cmp = self.current_def_cmp;
1416        self.current_def_cmp += 1;
1417
1418        for is_valid in def_levels.iter().filter_map(|&level| {
1419            if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
1420                Some(level <= current_def_cmp)
1421            } else {
1422                None
1423            }
1424        }) {
1425            validity.append(is_valid);
1426        }
1427    }
1428
1429    pub fn decimate(&mut self, dimension: usize) {
1430        if self.rep_levels.is_some() {
1431            // If we need to support this then I think we need to walk through the rep def levels to find
1432            // the spots at which we keep.  E.g. if we have:
1433            //  rep: 1 0 0 1 0 1 0 0 0 1 0 0
1434            //  def: 1 1 1 0 1 0 1 1 0 1 1 0
1435            //  dimension: 2
1436            //
1437            // The output should be:
1438            //  rep: 1 0 0 1 0 0 0
1439            //  def: 1 1 1 0 1 1 0
1440            //
1441            // Maybe there's some special logic for empty/null lists?  I'll save the headache for future me.
1442            todo!("Not yet supported FSL<...List<...>>");
1443        }
1444        let Some(def_levels) = self.def_levels.as_mut() else {
1445            return;
1446        };
1447        let mut read_idx = 0;
1448        let mut write_idx = 0;
1449        while read_idx < def_levels.len() {
1450            unsafe {
1451                *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
1452            }
1453            write_idx += 1;
1454            read_idx += dimension;
1455        }
1456        def_levels.truncate(write_idx);
1457    }
1458}
1459
1460/// As we decode we may extract rep/def information from multiple pages (or multiple
1461/// chunks within a page).
1462///
1463/// For each chunk we create an unraveler.  Each unraveler can have a completely different
1464/// interpretation (e.g. one page might contain null items but no null structs and the next
1465/// page might have null structs but no null items).
1466///
1467/// Concatenating these unravelers would be tricky and expensive so instead we have a
1468/// composite unraveler which unravels across multiple unravelers.
1469///
1470/// Note: this class should be used even if there is only one page / unraveler.  This is
1471/// because the `RepDefUnraveler`'s API is more complex (it's meant to be called by this
1472/// class)
1473#[derive(Debug)]
1474pub struct CompositeRepDefUnraveler {
1475    unravelers: Vec<RepDefUnraveler>,
1476}
1477
1478impl CompositeRepDefUnraveler {
1479    pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
1480        Self { unravelers }
1481    }
1482
1483    /// Unravels a layer of validity
1484    ///
1485    /// Returns None if there are no null items in this layer
1486    pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
1487        let is_all_valid = self
1488            .unravelers
1489            .iter()
1490            .all(|unraveler| unraveler.is_all_valid());
1491
1492        if is_all_valid {
1493            for unraveler in self.unravelers.iter_mut() {
1494                unraveler.skip_validity();
1495            }
1496            None
1497        } else {
1498            let mut validity = BooleanBufferBuilder::new(num_values);
1499            for unraveler in self.unravelers.iter_mut() {
1500                unraveler.unravel_validity(&mut validity);
1501            }
1502            Some(NullBuffer::new(validity.finish()))
1503        }
1504    }
1505
1506    pub fn unravel_fsl_validity(
1507        &mut self,
1508        num_values: usize,
1509        dimension: usize,
1510    ) -> Option<NullBuffer> {
1511        for unraveler in self.unravelers.iter_mut() {
1512            unraveler.decimate(dimension);
1513        }
1514        self.unravel_validity(num_values)
1515    }
1516
1517    /// Unravels a layer of offsets (and the validity for that layer)
1518    pub fn unravel_offsets<T: ArrowNativeType>(
1519        &mut self,
1520    ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
1521        let mut is_all_valid = true;
1522        let mut max_num_lists = 0;
1523        for unraveler in self.unravelers.iter() {
1524            is_all_valid &= unraveler.is_all_valid();
1525            max_num_lists += unraveler.max_lists();
1526        }
1527
1528        let mut validity = if is_all_valid {
1529            None
1530        } else {
1531            // Note: This is probably an over-estimate and potentially even an under-estimate.  We only know
1532            // right now how many items we have and not how many rows.  (TODO: Shouldn't we know the # of rows?)
1533            Some(BooleanBufferBuilder::new(max_num_lists))
1534        };
1535
1536        let mut offsets = Vec::with_capacity(max_num_lists + 1);
1537
1538        for unraveler in self.unravelers.iter_mut() {
1539            unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
1540        }
1541
1542        Ok((
1543            OffsetBuffer::new(ScalarBuffer::from(offsets)),
1544            validity.map(|mut v| NullBuffer::new(v.finish())),
1545        ))
1546    }
1547}
1548
1549/// A [`ControlWordIterator`] when there are both repetition and definition levels
1550///
1551/// The iterator will put the repetition level in the upper bits and the definition
1552/// level in the lower bits.  The number of bits used for each level is determined
1553/// by the width of the repetition and definition levels.
1554#[derive(Debug)]
1555pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
1556    repdef: I,
1557    def_width: usize,
1558    max_rep: u16,
1559    max_visible_def: u16,
1560    rep_mask: u16,
1561    def_mask: u16,
1562    bits_rep: u8,
1563    bits_def: u8,
1564    phantom: std::marker::PhantomData<W>,
1565}
1566
1567impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
1568    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1569        let next = self.repdef.next()?;
1570        let control_word: u8 =
1571            (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
1572        buf.push(control_word);
1573        let is_new_row = next.0 == self.max_rep;
1574        let is_visible = next.1 <= self.max_visible_def;
1575        let is_valid_item = next.1 == 0;
1576        Some(ControlWordDesc {
1577            is_new_row,
1578            is_visible,
1579            is_valid_item,
1580        })
1581    }
1582}
1583
1584impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
1585    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1586        let next = self.repdef.next()?;
1587        let control_word: u16 =
1588            ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
1589        let control_word = control_word.to_le_bytes();
1590        buf.push(control_word[0]);
1591        buf.push(control_word[1]);
1592        let is_new_row = next.0 == self.max_rep;
1593        let is_visible = next.1 <= self.max_visible_def;
1594        let is_valid_item = next.1 == 0;
1595        Some(ControlWordDesc {
1596            is_new_row,
1597            is_visible,
1598            is_valid_item,
1599        })
1600    }
1601}
1602
1603impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
1604    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1605        let next = self.repdef.next()?;
1606        let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
1607            + ((next.1 & self.def_mask) as u32);
1608        let control_word = control_word.to_le_bytes();
1609        buf.push(control_word[0]);
1610        buf.push(control_word[1]);
1611        buf.push(control_word[2]);
1612        buf.push(control_word[3]);
1613        let is_new_row = next.0 == self.max_rep;
1614        let is_visible = next.1 <= self.max_visible_def;
1615        let is_valid_item = next.1 == 0;
1616        Some(ControlWordDesc {
1617            is_new_row,
1618            is_visible,
1619            is_valid_item,
1620        })
1621    }
1622}
1623
1624/// A [`ControlWordIterator`] when there are only definition levels or only repetition levels
1625#[derive(Debug)]
1626pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
1627    repdef: I,
1628    level_mask: u16,
1629    bits_rep: u8,
1630    bits_def: u8,
1631    max_rep: u16,
1632    phantom: std::marker::PhantomData<W>,
1633}
1634
1635impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
1636    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1637        let next = self.repdef.next()?;
1638        buf.push((next & self.level_mask) as u8);
1639        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1640        let is_valid_item = next == 0 || self.bits_def == 0;
1641        Some(ControlWordDesc {
1642            is_new_row,
1643            // Either there is no rep, in which case there are no invisible items
1644            // or there is no def, in which case there are no invisible items
1645            is_visible: true,
1646            is_valid_item,
1647        })
1648    }
1649}
1650
1651impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
1652    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1653        let next = self.repdef.next().unwrap() & self.level_mask;
1654        let control_word = next.to_le_bytes();
1655        buf.push(control_word[0]);
1656        buf.push(control_word[1]);
1657        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1658        let is_valid_item = next == 0 || self.bits_def == 0;
1659        Some(ControlWordDesc {
1660            is_new_row,
1661            is_visible: true,
1662            is_valid_item,
1663        })
1664    }
1665}
1666
1667impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
1668    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1669        let next = self.repdef.next()?;
1670        let next = (next & self.level_mask) as u32;
1671        let control_word = next.to_le_bytes();
1672        buf.push(control_word[0]);
1673        buf.push(control_word[1]);
1674        buf.push(control_word[2]);
1675        buf.push(control_word[3]);
1676        let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
1677        let is_valid_item = next == 0 || self.bits_def == 0;
1678        Some(ControlWordDesc {
1679            is_new_row,
1680            is_visible: true,
1681            is_valid_item,
1682        })
1683    }
1684}
1685
1686/// A [`ControlWordIterator`] when there are no repetition or definition levels
1687#[derive(Debug)]
1688pub struct NilaryControlWordIterator {
1689    len: usize,
1690    idx: usize,
1691}
1692
1693impl NilaryControlWordIterator {
1694    fn append_next(&mut self) -> Option<ControlWordDesc> {
1695        if self.idx == self.len {
1696            None
1697        } else {
1698            self.idx += 1;
1699            Some(ControlWordDesc {
1700                is_new_row: true,
1701                is_visible: true,
1702                is_valid_item: true,
1703            })
1704        }
1705    }
1706}
1707
1708/// Helper function to get a bit mask of the given width
1709fn get_mask(width: u16) -> u16 {
1710    (1 << width) - 1
1711}
1712
1713// We're really going out of our way to avoid boxing here but this will be called on a per-value basis
1714// so it is in the critical path.
1715type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
1716    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
1717    T,
1718>;
1719
1720/// An iterator that generates control words from repetition and definition levels
1721///
1722/// "Control word" is just a fancy term for a single u8/u16/u32 that contains both
1723/// the repetition and definition in it.
1724///
1725/// In the large majority of case we only need a single byte to represent both the
1726/// repetition and definition levels.  However, if there is deep nesting then we may
1727/// need two bytes.  In the worst case we need 4 bytes though this suggests hundreds of
1728/// levels of nesting which seems unlikely to encounter in practice.
1729#[derive(Debug)]
1730pub enum ControlWordIterator<'a> {
1731    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
1732    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
1733    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
1734    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
1735    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
1736    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
1737    Nilary(NilaryControlWordIterator),
1738}
1739
1740/// Describes the properties of a control word
1741#[derive(Debug)]
1742pub struct ControlWordDesc {
1743    pub is_new_row: bool,
1744    pub is_visible: bool,
1745    pub is_valid_item: bool,
1746}
1747
1748impl ControlWordIterator<'_> {
1749    /// Appends the next control word to the buffer
1750    ///
1751    /// Returns true if this is the start of a new item (i.e. the repetition level is maxed out)
1752    pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1753        match self {
1754            Self::Binary8(iter) => iter.append_next(buf),
1755            Self::Binary16(iter) => iter.append_next(buf),
1756            Self::Binary32(iter) => iter.append_next(buf),
1757            Self::Unary8(iter) => iter.append_next(buf),
1758            Self::Unary16(iter) => iter.append_next(buf),
1759            Self::Unary32(iter) => iter.append_next(buf),
1760            Self::Nilary(iter) => iter.append_next(),
1761        }
1762    }
1763
1764    /// Return true if the control word iterator has repetition levels
1765    pub fn has_repetition(&self) -> bool {
1766        match self {
1767            Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
1768            Self::Unary8(iter) => iter.bits_rep > 0,
1769            Self::Unary16(iter) => iter.bits_rep > 0,
1770            Self::Unary32(iter) => iter.bits_rep > 0,
1771            Self::Nilary(_) => false,
1772        }
1773    }
1774
1775    /// Returns the number of bytes per control word
1776    pub fn bytes_per_word(&self) -> usize {
1777        match self {
1778            Self::Binary8(_) => 1,
1779            Self::Binary16(_) => 2,
1780            Self::Binary32(_) => 4,
1781            Self::Unary8(_) => 1,
1782            Self::Unary16(_) => 2,
1783            Self::Unary32(_) => 4,
1784            Self::Nilary(_) => 0,
1785        }
1786    }
1787
1788    /// Returns the number of bits used for the repetition level
1789    pub fn bits_rep(&self) -> u8 {
1790        match self {
1791            Self::Binary8(iter) => iter.bits_rep,
1792            Self::Binary16(iter) => iter.bits_rep,
1793            Self::Binary32(iter) => iter.bits_rep,
1794            Self::Unary8(iter) => iter.bits_rep,
1795            Self::Unary16(iter) => iter.bits_rep,
1796            Self::Unary32(iter) => iter.bits_rep,
1797            Self::Nilary(_) => 0,
1798        }
1799    }
1800
1801    /// Returns the number of bits used for the definition level
1802    pub fn bits_def(&self) -> u8 {
1803        match self {
1804            Self::Binary8(iter) => iter.bits_def,
1805            Self::Binary16(iter) => iter.bits_def,
1806            Self::Binary32(iter) => iter.bits_def,
1807            Self::Unary8(iter) => iter.bits_def,
1808            Self::Unary16(iter) => iter.bits_def,
1809            Self::Unary32(iter) => iter.bits_def,
1810            Self::Nilary(_) => 0,
1811        }
1812    }
1813}
1814
1815/// Builds a [`ControlWordIterator`] from repetition and definition levels
1816/// by first calculating the width needed and then creating the iterator
1817/// with the appropriate width
1818pub fn build_control_word_iterator<'a>(
1819    rep: Option<&'a [u16]>,
1820    max_rep: u16,
1821    def: Option<&'a [u16]>,
1822    max_def: u16,
1823    max_visible_def: u16,
1824    len: usize,
1825) -> ControlWordIterator<'a> {
1826    let rep_width = if max_rep == 0 {
1827        0
1828    } else {
1829        log_2_ceil(max_rep as u32) as u16
1830    };
1831    let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
1832    let def_width = if max_def == 0 {
1833        0
1834    } else {
1835        log_2_ceil(max_def as u32) as u16
1836    };
1837    let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
1838    let total_width = rep_width + def_width;
1839    match (rep, def) {
1840        (Some(rep), Some(def)) => {
1841            let iter = rep.iter().copied().zip(def.iter().copied());
1842            let def_width = def_width as usize;
1843            if total_width <= 8 {
1844                ControlWordIterator::Binary8(BinaryControlWordIterator {
1845                    repdef: iter,
1846                    rep_mask,
1847                    def_mask,
1848                    def_width,
1849                    max_rep,
1850                    max_visible_def,
1851                    bits_rep: rep_width as u8,
1852                    bits_def: def_width as u8,
1853                    phantom: std::marker::PhantomData,
1854                })
1855            } else if total_width <= 16 {
1856                ControlWordIterator::Binary16(BinaryControlWordIterator {
1857                    repdef: iter,
1858                    rep_mask,
1859                    def_mask,
1860                    def_width,
1861                    max_rep,
1862                    max_visible_def,
1863                    bits_rep: rep_width as u8,
1864                    bits_def: def_width as u8,
1865                    phantom: std::marker::PhantomData,
1866                })
1867            } else {
1868                ControlWordIterator::Binary32(BinaryControlWordIterator {
1869                    repdef: iter,
1870                    rep_mask,
1871                    def_mask,
1872                    def_width,
1873                    max_rep,
1874                    max_visible_def,
1875                    bits_rep: rep_width as u8,
1876                    bits_def: def_width as u8,
1877                    phantom: std::marker::PhantomData,
1878                })
1879            }
1880        }
1881        (Some(lev), None) => {
1882            let iter = lev.iter().copied();
1883            if total_width <= 8 {
1884                ControlWordIterator::Unary8(UnaryControlWordIterator {
1885                    repdef: iter,
1886                    level_mask: rep_mask,
1887                    bits_rep: total_width as u8,
1888                    bits_def: 0,
1889                    max_rep,
1890                    phantom: std::marker::PhantomData,
1891                })
1892            } else if total_width <= 16 {
1893                ControlWordIterator::Unary16(UnaryControlWordIterator {
1894                    repdef: iter,
1895                    level_mask: rep_mask,
1896                    bits_rep: total_width as u8,
1897                    bits_def: 0,
1898                    max_rep,
1899                    phantom: std::marker::PhantomData,
1900                })
1901            } else {
1902                ControlWordIterator::Unary32(UnaryControlWordIterator {
1903                    repdef: iter,
1904                    level_mask: rep_mask,
1905                    bits_rep: total_width as u8,
1906                    bits_def: 0,
1907                    max_rep,
1908                    phantom: std::marker::PhantomData,
1909                })
1910            }
1911        }
1912        (None, Some(lev)) => {
1913            let iter = lev.iter().copied();
1914            if total_width <= 8 {
1915                ControlWordIterator::Unary8(UnaryControlWordIterator {
1916                    repdef: iter,
1917                    level_mask: def_mask,
1918                    bits_rep: 0,
1919                    bits_def: total_width as u8,
1920                    max_rep: 0,
1921                    phantom: std::marker::PhantomData,
1922                })
1923            } else if total_width <= 16 {
1924                ControlWordIterator::Unary16(UnaryControlWordIterator {
1925                    repdef: iter,
1926                    level_mask: def_mask,
1927                    bits_rep: 0,
1928                    bits_def: total_width as u8,
1929                    max_rep: 0,
1930                    phantom: std::marker::PhantomData,
1931                })
1932            } else {
1933                ControlWordIterator::Unary32(UnaryControlWordIterator {
1934                    repdef: iter,
1935                    level_mask: def_mask,
1936                    bits_rep: 0,
1937                    bits_def: total_width as u8,
1938                    max_rep: 0,
1939                    phantom: std::marker::PhantomData,
1940                })
1941            }
1942        }
1943        (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
1944    }
1945}
1946
1947/// A parser to unwrap control words into repetition and definition levels
1948///
1949/// This is the inverse of the [`ControlWordIterator`].
1950#[derive(Copy, Clone, Debug)]
1951pub enum ControlWordParser {
1952    // First item is the bits to shift, second is the mask to apply (the mask can be
1953    // calculated from the bits to shift but we don't want to calculate it each time)
1954    BOTH8(u8, u32),
1955    BOTH16(u8, u32),
1956    BOTH32(u8, u32),
1957    REP8,
1958    REP16,
1959    REP32,
1960    DEF8,
1961    DEF16,
1962    DEF32,
1963    NIL,
1964}
1965
1966impl ControlWordParser {
1967    fn parse_both<const WORD_SIZE: u8>(
1968        src: &[u8],
1969        dst_rep: &mut Vec<u16>,
1970        dst_def: &mut Vec<u16>,
1971        bits_to_shift: u8,
1972        mask_to_apply: u32,
1973    ) {
1974        match WORD_SIZE {
1975            1 => {
1976                let word = src[0];
1977                let rep = word >> bits_to_shift;
1978                let def = word & (mask_to_apply as u8);
1979                dst_rep.push(rep as u16);
1980                dst_def.push(def as u16);
1981            }
1982            2 => {
1983                let word = u16::from_le_bytes([src[0], src[1]]);
1984                let rep = word >> bits_to_shift;
1985                let def = word & mask_to_apply as u16;
1986                dst_rep.push(rep);
1987                dst_def.push(def);
1988            }
1989            4 => {
1990                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
1991                let rep = word >> bits_to_shift;
1992                let def = word & mask_to_apply;
1993                dst_rep.push(rep as u16);
1994                dst_def.push(def as u16);
1995            }
1996            _ => unreachable!(),
1997        }
1998    }
1999
2000    fn parse_desc_both<const WORD_SIZE: u8>(
2001        src: &[u8],
2002        bits_to_shift: u8,
2003        mask_to_apply: u32,
2004        max_rep: u16,
2005        max_visible_def: u16,
2006    ) -> ControlWordDesc {
2007        match WORD_SIZE {
2008            1 => {
2009                let word = src[0];
2010                let rep = word >> bits_to_shift;
2011                let def = word & (mask_to_apply as u8);
2012                let is_visible = def as u16 <= max_visible_def;
2013                let is_new_row = rep as u16 == max_rep;
2014                let is_valid_item = def == 0;
2015                ControlWordDesc {
2016                    is_visible,
2017                    is_new_row,
2018                    is_valid_item,
2019                }
2020            }
2021            2 => {
2022                let word = u16::from_le_bytes([src[0], src[1]]);
2023                let rep = word >> bits_to_shift;
2024                let def = word & mask_to_apply as u16;
2025                let is_visible = def <= max_visible_def;
2026                let is_new_row = rep == max_rep;
2027                let is_valid_item = def == 0;
2028                ControlWordDesc {
2029                    is_visible,
2030                    is_new_row,
2031                    is_valid_item,
2032                }
2033            }
2034            4 => {
2035                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2036                let rep = word >> bits_to_shift;
2037                let def = word & mask_to_apply;
2038                let is_visible = def as u16 <= max_visible_def;
2039                let is_new_row = rep as u16 == max_rep;
2040                let is_valid_item = def == 0;
2041                ControlWordDesc {
2042                    is_visible,
2043                    is_new_row,
2044                    is_valid_item,
2045                }
2046            }
2047            _ => unreachable!(),
2048        }
2049    }
2050
2051    fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
2052        match WORD_SIZE {
2053            1 => {
2054                let word = src[0];
2055                dst.push(word as u16);
2056            }
2057            2 => {
2058                let word = u16::from_le_bytes([src[0], src[1]]);
2059                dst.push(word);
2060            }
2061            4 => {
2062                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2063                dst.push(word as u16);
2064            }
2065            _ => unreachable!(),
2066        }
2067    }
2068
2069    fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
2070        match WORD_SIZE {
2071            1 => ControlWordDesc {
2072                is_new_row: src[0] as u16 == max_rep,
2073                is_visible: true,
2074                is_valid_item: true,
2075            },
2076            2 => ControlWordDesc {
2077                is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
2078                is_visible: true,
2079                is_valid_item: true,
2080            },
2081            4 => ControlWordDesc {
2082                is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
2083                is_visible: true,
2084                is_valid_item: true,
2085            },
2086            _ => unreachable!(),
2087        }
2088    }
2089
2090    fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
2091        match WORD_SIZE {
2092            1 => ControlWordDesc {
2093                is_new_row: true,
2094                is_visible: true,
2095                is_valid_item: src[0] == 0,
2096            },
2097            2 => ControlWordDesc {
2098                is_new_row: true,
2099                is_visible: true,
2100                is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
2101            },
2102            4 => ControlWordDesc {
2103                is_new_row: true,
2104                is_visible: true,
2105                is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
2106            },
2107            _ => unreachable!(),
2108        }
2109    }
2110
2111    /// Returns the number of bytes per control word
2112    pub fn bytes_per_word(&self) -> usize {
2113        match self {
2114            Self::BOTH8(..) => 1,
2115            Self::BOTH16(..) => 2,
2116            Self::BOTH32(..) => 4,
2117            Self::REP8 => 1,
2118            Self::REP16 => 2,
2119            Self::REP32 => 4,
2120            Self::DEF8 => 1,
2121            Self::DEF16 => 2,
2122            Self::DEF32 => 4,
2123            Self::NIL => 0,
2124        }
2125    }
2126
2127    /// Appends the next control word to the rep & def buffers
2128    ///
2129    /// `src` should be pointing at the first byte (little endian) of the control word
2130    ///
2131    /// `dst_rep` and `dst_def` are the buffers to append the rep and def levels to.
2132    /// They will not be appended to if not needed.
2133    pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
2134        match self {
2135            Self::BOTH8(bits_to_shift, mask_to_apply) => {
2136                Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2137            }
2138            Self::BOTH16(bits_to_shift, mask_to_apply) => {
2139                Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2140            }
2141            Self::BOTH32(bits_to_shift, mask_to_apply) => {
2142                Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2143            }
2144            Self::REP8 => Self::parse_one::<1>(src, dst_rep),
2145            Self::REP16 => Self::parse_one::<2>(src, dst_rep),
2146            Self::REP32 => Self::parse_one::<4>(src, dst_rep),
2147            Self::DEF8 => Self::parse_one::<1>(src, dst_def),
2148            Self::DEF16 => Self::parse_one::<2>(src, dst_def),
2149            Self::DEF32 => Self::parse_one::<4>(src, dst_def),
2150            Self::NIL => {}
2151        }
2152    }
2153
2154    /// Return true if the control words contain repetition information
2155    pub fn has_rep(&self) -> bool {
2156        match self {
2157            Self::BOTH8(..)
2158            | Self::BOTH16(..)
2159            | Self::BOTH32(..)
2160            | Self::REP8
2161            | Self::REP16
2162            | Self::REP32 => true,
2163            Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
2164        }
2165    }
2166
2167    /// Temporarily parses the control word to inspect its properties but does not append to any buffers
2168    pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
2169        match self {
2170            Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
2171                src,
2172                *bits_to_shift,
2173                *mask_to_apply,
2174                max_rep,
2175                max_visible_def,
2176            ),
2177            Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
2178                src,
2179                *bits_to_shift,
2180                *mask_to_apply,
2181                max_rep,
2182                max_visible_def,
2183            ),
2184            Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
2185                src,
2186                *bits_to_shift,
2187                *mask_to_apply,
2188                max_rep,
2189                max_visible_def,
2190            ),
2191            Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
2192            Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
2193            Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
2194            Self::DEF8 => Self::parse_def_desc_one::<1>(src),
2195            Self::DEF16 => Self::parse_def_desc_one::<2>(src),
2196            Self::DEF32 => Self::parse_def_desc_one::<4>(src),
2197            Self::NIL => ControlWordDesc {
2198                is_new_row: true,
2199                is_valid_item: true,
2200                is_visible: true,
2201            },
2202        }
2203    }
2204
2205    /// Creates a new parser from the number of bits used for the repetition and definition levels
2206    pub fn new(bits_rep: u8, bits_def: u8) -> Self {
2207        let total_bits = bits_rep + bits_def;
2208
2209        enum WordSize {
2210            One,
2211            Two,
2212            Four,
2213        }
2214
2215        let word_size = if total_bits <= 8 {
2216            WordSize::One
2217        } else if total_bits <= 16 {
2218            WordSize::Two
2219        } else {
2220            WordSize::Four
2221        };
2222
2223        match (bits_rep > 0, bits_def > 0, word_size) {
2224            (false, false, _) => Self::NIL,
2225            (false, true, WordSize::One) => Self::DEF8,
2226            (false, true, WordSize::Two) => Self::DEF16,
2227            (false, true, WordSize::Four) => Self::DEF32,
2228            (true, false, WordSize::One) => Self::REP8,
2229            (true, false, WordSize::Two) => Self::REP16,
2230            (true, false, WordSize::Four) => Self::REP32,
2231            (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
2232            (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
2233            (true, true, WordSize::Four) => {
2234                Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
2235            }
2236        }
2237    }
2238}
2239
2240#[cfg(test)]
2241mod tests {
2242    use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
2243
2244    use crate::repdef::{
2245        CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
2246    };
2247
2248    use super::RepDefBuilder;
2249
2250    fn validity(values: &[bool]) -> NullBuffer {
2251        NullBuffer::from_iter(values.iter().copied())
2252    }
2253
2254    fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
2255        OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
2256    }
2257
2258    fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
2259        OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
2260    }
2261
2262    #[test]
2263    fn test_repdef_basic() {
2264        // Basic case, rep & def
2265        let mut builder = RepDefBuilder::default();
2266        builder.add_offsets(
2267            offsets_64(&[0, 2, 2, 5]),
2268            Some(validity(&[true, false, true])),
2269        );
2270        builder.add_offsets(
2271            offsets_64(&[0, 1, 3, 5, 5, 9]),
2272            Some(validity(&[true, true, true, false, true])),
2273        );
2274        builder.add_validity_bitmap(validity(&[
2275            true, true, true, false, false, false, true, true, false,
2276        ]));
2277
2278        let repdefs = RepDefBuilder::serialize(vec![builder]);
2279        let rep = repdefs.repetition_levels.unwrap();
2280        let def = repdefs.definition_levels.unwrap();
2281
2282        assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2283        assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2284
2285        // [[I], [I, I]], NULL, [[NULL, NULL], NULL, [NULL, I, I, NULL]]
2286
2287        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2288            Some(rep.as_ref().to_vec()),
2289            Some(def.as_ref().to_vec()),
2290            repdefs.def_meaning.into(),
2291            9,
2292        )]);
2293
2294        // Note: validity doesn't exactly round-trip because repdef normalizes some of the
2295        // redundant validity values
2296        assert_eq!(
2297            unraveler.unravel_validity(9),
2298            Some(validity(&[
2299                true, true, true, false, false, false, true, true, false
2300            ]))
2301        );
2302        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2303        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 5, 9]).inner());
2304        assert_eq!(val, Some(validity(&[true, true, true, false, true])));
2305        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2306        assert_eq!(off.inner(), offsets_32(&[0, 2, 2, 5]).inner());
2307        assert_eq!(val, Some(validity(&[true, false, true])));
2308    }
2309
2310    #[test]
2311    fn test_repdef_simple_null_empty_list() {
2312        let check = |repdefs: SerializedRepDefs, last_def: DefinitionInterpretation| {
2313            let rep = repdefs.repetition_levels.unwrap();
2314            let def = repdefs.definition_levels.unwrap();
2315
2316            assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2317            assert_eq!([0, 0, 2, 0, 1, 0], *def);
2318            assert_eq!(
2319                vec![DefinitionInterpretation::NullableItem, last_def,],
2320                repdefs.def_meaning
2321            );
2322        };
2323
2324        // Null list and empty list should be serialized mostly the same
2325
2326        // Null case
2327        let mut builder = RepDefBuilder::default();
2328        builder.add_offsets(
2329            offsets_32(&[0, 2, 2, 5]),
2330            Some(validity(&[true, false, true])),
2331        );
2332        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2333
2334        let repdefs = RepDefBuilder::serialize(vec![builder]);
2335
2336        check(repdefs, DefinitionInterpretation::NullableList);
2337
2338        // Empty case
2339        let mut builder = RepDefBuilder::default();
2340        builder.add_offsets(offsets_32(&[0, 2, 2, 5]), None);
2341        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2342
2343        let repdefs = RepDefBuilder::serialize(vec![builder]);
2344
2345        check(repdefs, DefinitionInterpretation::EmptyableList);
2346    }
2347
2348    #[test]
2349    fn test_repdef_empty_list_at_end() {
2350        // Regresses a failure we encountered when the last item was an empty list
2351        let mut builder = RepDefBuilder::default();
2352        builder.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
2353        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2354
2355        let repdefs = RepDefBuilder::serialize(vec![builder]);
2356
2357        let rep = repdefs.repetition_levels.unwrap();
2358        let def = repdefs.definition_levels.unwrap();
2359
2360        assert_eq!([1, 0, 1, 0, 0, 1], *rep);
2361        assert_eq!([0, 0, 0, 1, 0, 2], *def);
2362        assert_eq!(
2363            vec![
2364                DefinitionInterpretation::NullableItem,
2365                DefinitionInterpretation::EmptyableList,
2366            ],
2367            repdefs.def_meaning
2368        );
2369    }
2370
2371    #[test]
2372    fn test_repdef_abnormal_nulls() {
2373        // List nulls are allowed to have non-empty offsets and garbage values
2374        // and the add_offsets call should normalize this
2375        let mut builder = RepDefBuilder::default();
2376        builder.add_offsets(
2377            offsets_32(&[0, 2, 5, 8]),
2378            Some(validity(&[true, false, true])),
2379        );
2380        // Note: we pass 5 here and not 8.  If add_offsets tells us there is garbage nulls they
2381        // should be removed before continuing
2382        builder.add_no_null(5);
2383
2384        let repdefs = RepDefBuilder::serialize(vec![builder]);
2385
2386        let rep = repdefs.repetition_levels.unwrap();
2387        let def = repdefs.definition_levels.unwrap();
2388
2389        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2390        assert_eq!([0, 0, 1, 0, 0, 0], *def);
2391
2392        assert_eq!(
2393            vec![
2394                DefinitionInterpretation::AllValidItem,
2395                DefinitionInterpretation::NullableList,
2396            ],
2397            repdefs.def_meaning
2398        );
2399    }
2400
2401    #[test]
2402    fn test_repdef_fsl() {
2403        let mut builder = RepDefBuilder::default();
2404        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2405        builder.add_fsl(None, 2, 4);
2406        builder.add_validity_bitmap(validity(&[
2407            true, false, true, false, true, false, true, false,
2408        ]));
2409
2410        let repdefs = RepDefBuilder::serialize(vec![builder]);
2411
2412        assert_eq!(
2413            vec![
2414                DefinitionInterpretation::NullableItem,
2415                DefinitionInterpretation::AllValidItem,
2416                DefinitionInterpretation::NullableItem
2417            ],
2418            repdefs.def_meaning
2419        );
2420
2421        assert!(repdefs.repetition_levels.is_none());
2422
2423        let def = repdefs.definition_levels.unwrap();
2424
2425        assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def);
2426
2427        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2428            None,
2429            Some(def.as_ref().to_vec()),
2430            repdefs.def_meaning.into(),
2431            8,
2432        )]);
2433
2434        assert_eq!(
2435            unraveler.unravel_validity(8),
2436            Some(validity(&[
2437                true, false, true, false, false, false, false, false
2438            ]))
2439        );
2440        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2441        assert_eq!(
2442            unraveler.unravel_fsl_validity(2, 2),
2443            Some(validity(&[true, false]))
2444        );
2445    }
2446
2447    #[test]
2448    fn test_repdef_fsl_allvalid_item() {
2449        let mut builder = RepDefBuilder::default();
2450        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2451        builder.add_fsl(None, 2, 4);
2452        builder.add_no_null(8);
2453
2454        let repdefs = RepDefBuilder::serialize(vec![builder]);
2455
2456        assert_eq!(
2457            vec![
2458                DefinitionInterpretation::AllValidItem,
2459                DefinitionInterpretation::AllValidItem,
2460                DefinitionInterpretation::NullableItem
2461            ],
2462            repdefs.def_meaning
2463        );
2464
2465        assert!(repdefs.repetition_levels.is_none());
2466
2467        let def = repdefs.definition_levels.unwrap();
2468
2469        assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def);
2470
2471        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2472            None,
2473            Some(def.as_ref().to_vec()),
2474            repdefs.def_meaning.into(),
2475            8,
2476        )]);
2477
2478        assert_eq!(unraveler.unravel_validity(8), None);
2479        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2480        assert_eq!(
2481            unraveler.unravel_fsl_validity(2, 2),
2482            Some(validity(&[true, false]))
2483        );
2484    }
2485
2486    #[test]
2487    fn test_repdef_sliced_offsets() {
2488        // Sliced lists may have offsets that don't start with zero.  The
2489        // add_offsets call needs to normalize these to operate correctly.
2490        let mut builder = RepDefBuilder::default();
2491        builder.add_offsets(
2492            offsets_32(&[5, 7, 7, 10]),
2493            Some(validity(&[true, false, true])),
2494        );
2495        builder.add_no_null(5);
2496
2497        let repdefs = RepDefBuilder::serialize(vec![builder]);
2498
2499        let rep = repdefs.repetition_levels.unwrap();
2500        let def = repdefs.definition_levels.unwrap();
2501
2502        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2503        assert_eq!([0, 0, 1, 0, 0, 0], *def);
2504
2505        assert_eq!(
2506            vec![
2507                DefinitionInterpretation::AllValidItem,
2508                DefinitionInterpretation::NullableList,
2509            ],
2510            repdefs.def_meaning
2511        );
2512    }
2513
2514    #[test]
2515    fn test_repdef_complex_null_empty() {
2516        let mut builder = RepDefBuilder::default();
2517        builder.add_offsets(
2518            offsets_32(&[0, 4, 4, 4, 6]),
2519            Some(validity(&[true, false, true, true])),
2520        );
2521        builder.add_offsets(
2522            offsets_32(&[0, 1, 1, 2, 2, 2, 3]),
2523            Some(validity(&[true, false, true, false, true, true])),
2524        );
2525        builder.add_no_null(3);
2526
2527        let repdefs = RepDefBuilder::serialize(vec![builder]);
2528
2529        let rep = repdefs.repetition_levels.unwrap();
2530        let def = repdefs.definition_levels.unwrap();
2531
2532        assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep);
2533        assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def);
2534    }
2535
2536    #[test]
2537    fn test_repdef_empty_list_no_null() {
2538        // Tests when we have some empty lists but no null lists.  This case
2539        // caused some bugs because we have definition but no nulls
2540        let mut builder = RepDefBuilder::default();
2541        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2542        builder.add_no_null(6);
2543
2544        let repdefs = RepDefBuilder::serialize(vec![builder]);
2545
2546        let rep = repdefs.repetition_levels.unwrap();
2547        let def = repdefs.definition_levels.unwrap();
2548
2549        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2550        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2551
2552        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2553            Some(rep.as_ref().to_vec()),
2554            Some(def.as_ref().to_vec()),
2555            repdefs.def_meaning.into(),
2556            8,
2557        )]);
2558
2559        assert_eq!(unraveler.unravel_validity(6), None);
2560        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2561        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2562        assert_eq!(val, None);
2563    }
2564
2565    #[test]
2566    fn test_repdef_all_valid() {
2567        let mut builder = RepDefBuilder::default();
2568        builder.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
2569        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2570        builder.add_no_null(9);
2571
2572        let repdefs = RepDefBuilder::serialize(vec![builder]);
2573        let rep = repdefs.repetition_levels.unwrap();
2574        assert!(repdefs.definition_levels.is_none());
2575
2576        assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep);
2577
2578        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2579            Some(rep.as_ref().to_vec()),
2580            None,
2581            repdefs.def_meaning.into(),
2582            9,
2583        )]);
2584
2585        assert_eq!(unraveler.unravel_validity(9), None);
2586        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2587        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 7, 9]).inner());
2588        assert_eq!(val, None);
2589        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2590        assert_eq!(off.inner(), offsets_32(&[0, 2, 3, 5]).inner());
2591        assert_eq!(val, None);
2592    }
2593
2594    #[test]
2595    fn test_only_empty_lists() {
2596        let mut builder = RepDefBuilder::default();
2597        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2598        builder.add_no_null(6);
2599
2600        let repdefs = RepDefBuilder::serialize(vec![builder]);
2601
2602        let rep = repdefs.repetition_levels.unwrap();
2603        let def = repdefs.definition_levels.unwrap();
2604
2605        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2606        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2607
2608        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2609            Some(rep.as_ref().to_vec()),
2610            Some(def.as_ref().to_vec()),
2611            repdefs.def_meaning.into(),
2612            8,
2613        )]);
2614
2615        assert_eq!(unraveler.unravel_validity(6), None);
2616        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2617        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2618        assert_eq!(val, None);
2619    }
2620
2621    #[test]
2622    fn test_only_null_lists() {
2623        let mut builder = RepDefBuilder::default();
2624        builder.add_offsets(
2625            offsets_32(&[0, 4, 4, 4, 6]),
2626            Some(validity(&[true, false, false, true])),
2627        );
2628        builder.add_no_null(6);
2629
2630        let repdefs = RepDefBuilder::serialize(vec![builder]);
2631
2632        let rep = repdefs.repetition_levels.unwrap();
2633        let def = repdefs.definition_levels.unwrap();
2634
2635        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2636        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2637
2638        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2639            Some(rep.as_ref().to_vec()),
2640            Some(def.as_ref().to_vec()),
2641            repdefs.def_meaning.into(),
2642            8,
2643        )]);
2644
2645        assert_eq!(unraveler.unravel_validity(6), None);
2646        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2647        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2648        assert_eq!(val, Some(validity(&[true, false, false, true])));
2649    }
2650
2651    #[test]
2652    fn test_null_and_empty_lists() {
2653        let mut builder = RepDefBuilder::default();
2654        builder.add_offsets(
2655            offsets_32(&[0, 4, 4, 4, 6]),
2656            Some(validity(&[true, false, true, true])),
2657        );
2658        builder.add_no_null(6);
2659
2660        let repdefs = RepDefBuilder::serialize(vec![builder]);
2661
2662        let rep = repdefs.repetition_levels.unwrap();
2663        let def = repdefs.definition_levels.unwrap();
2664
2665        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2666        assert_eq!([0, 0, 0, 0, 1, 2, 0, 0], *def);
2667
2668        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2669            Some(rep.as_ref().to_vec()),
2670            Some(def.as_ref().to_vec()),
2671            repdefs.def_meaning.into(),
2672            8,
2673        )]);
2674
2675        assert_eq!(unraveler.unravel_validity(6), None);
2676        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2677        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2678        assert_eq!(val, Some(validity(&[true, false, true, true])));
2679    }
2680
2681    #[test]
2682    fn test_repdef_no_rep() {
2683        let mut builder = RepDefBuilder::default();
2684        builder.add_no_null(5);
2685        builder.add_validity_bitmap(validity(&[false, false, true, true, true]));
2686        builder.add_validity_bitmap(validity(&[false, true, true, true, false]));
2687
2688        let repdefs = RepDefBuilder::serialize(vec![builder]);
2689        assert!(repdefs.repetition_levels.is_none());
2690        let def = repdefs.definition_levels.unwrap();
2691
2692        assert_eq!([2, 2, 0, 0, 1], *def);
2693
2694        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2695            None,
2696            Some(def.as_ref().to_vec()),
2697            repdefs.def_meaning.into(),
2698            5,
2699        )]);
2700
2701        assert_eq!(
2702            unraveler.unravel_validity(5),
2703            Some(validity(&[false, false, true, true, false]))
2704        );
2705        assert_eq!(
2706            unraveler.unravel_validity(5),
2707            Some(validity(&[false, false, true, true, true]))
2708        );
2709        assert_eq!(unraveler.unravel_validity(5), None);
2710    }
2711
2712    #[test]
2713    fn test_composite_unravel() {
2714        let mut builder = RepDefBuilder::default();
2715        builder.add_offsets(
2716            offsets_64(&[0, 2, 2, 5]),
2717            Some(validity(&[true, false, true])),
2718        );
2719        builder.add_no_null(5);
2720        let repdef1 = RepDefBuilder::serialize(vec![builder]);
2721
2722        let mut builder = RepDefBuilder::default();
2723        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2724        builder.add_no_null(9);
2725        let repdef2 = RepDefBuilder::serialize(vec![builder]);
2726
2727        let rep1 = repdef1.repetition_levels.clone().unwrap();
2728        let def1 = repdef1.definition_levels.clone().unwrap();
2729        let rep2 = repdef2.repetition_levels.clone().unwrap();
2730        assert!(repdef2.definition_levels.is_none());
2731
2732        assert_eq!([1, 0, 1, 1, 0, 0], *rep1);
2733        assert_eq!([0, 0, 1, 0, 0, 0], *def1);
2734        assert_eq!([1, 1, 0, 1, 0, 1, 0, 1, 0], *rep2);
2735
2736        let unravel1 = RepDefUnraveler::new(
2737            repdef1.repetition_levels.map(|l| l.to_vec()),
2738            repdef1.definition_levels.map(|l| l.to_vec()),
2739            repdef1.def_meaning.into(),
2740            5,
2741        );
2742        let unravel2 = RepDefUnraveler::new(
2743            repdef2.repetition_levels.map(|l| l.to_vec()),
2744            repdef2.definition_levels.map(|l| l.to_vec()),
2745            repdef2.def_meaning.into(),
2746            9,
2747        );
2748
2749        let mut unraveler = CompositeRepDefUnraveler::new(vec![unravel1, unravel2]);
2750
2751        assert!(unraveler.unravel_validity(9).is_none());
2752        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2753        assert_eq!(
2754            off.inner(),
2755            offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]).inner()
2756        );
2757        assert_eq!(
2758            val,
2759            Some(validity(&[true, false, true, true, true, true, true, true]))
2760        );
2761    }
2762
2763    #[test]
2764    fn test_repdef_multiple_builders() {
2765        // Basic case, rep & def
2766        let mut builder1 = RepDefBuilder::default();
2767        builder1.add_offsets(offsets_64(&[0, 2]), None);
2768        builder1.add_offsets(offsets_64(&[0, 1, 3]), None);
2769        builder1.add_validity_bitmap(validity(&[true, true, true]));
2770
2771        let mut builder2 = RepDefBuilder::default();
2772        builder2.add_offsets(offsets_64(&[0, 0, 3]), Some(validity(&[false, true])));
2773        builder2.add_offsets(
2774            offsets_64(&[0, 2, 2, 6]),
2775            Some(validity(&[true, false, true])),
2776        );
2777        builder2.add_validity_bitmap(validity(&[false, false, false, true, true, false]));
2778
2779        let repdefs = RepDefBuilder::serialize(vec![builder1, builder2]);
2780
2781        let rep = repdefs.repetition_levels.unwrap();
2782        let def = repdefs.definition_levels.unwrap();
2783
2784        assert_eq!([2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2785        assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2786    }
2787
2788    #[test]
2789    fn test_slicer() {
2790        let mut builder = RepDefBuilder::default();
2791        builder.add_offsets(
2792            offsets_64(&[0, 2, 2, 30, 30]),
2793            Some(validity(&[true, false, true, true])),
2794        );
2795        builder.add_no_null(30);
2796
2797        let repdefs = RepDefBuilder::serialize(vec![builder]);
2798
2799        let mut rep_slicer = repdefs.rep_slicer().unwrap();
2800
2801        // First 5 items include a null list so we get 6 levels (12 bytes)
2802        assert_eq!(rep_slicer.slice_next(5).len(), 12);
2803        // Next 20 are all plain
2804        assert_eq!(rep_slicer.slice_next(20).len(), 40);
2805        // Last 5 include an empty list so we get 6 levels (12 bytes)
2806        assert_eq!(rep_slicer.slice_rest().len(), 12);
2807
2808        let mut def_slicer = repdefs.rep_slicer().unwrap();
2809
2810        // First 5 items include a null list so we get 6 levels (12 bytes)
2811        assert_eq!(def_slicer.slice_next(5).len(), 12);
2812        // Next 20 are all plain
2813        assert_eq!(def_slicer.slice_next(20).len(), 40);
2814        // Last 5 include an empty list so we get 6 levels (12 bytes)
2815        assert_eq!(def_slicer.slice_rest().len(), 12);
2816    }
2817
2818    #[test]
2819    fn test_control_words() {
2820        // Convert to control words, verify expected, convert back, verify same as original
2821        fn check(
2822            rep: &[u16],
2823            def: &[u16],
2824            expected_values: Vec<u8>,
2825            expected_bytes_per_word: usize,
2826            expected_bits_rep: u8,
2827            expected_bits_def: u8,
2828        ) {
2829            let num_vals = rep.len().max(def.len());
2830            let max_rep = rep.iter().max().copied().unwrap_or(0);
2831            let max_def = def.iter().max().copied().unwrap_or(0);
2832
2833            let in_rep = if rep.is_empty() { None } else { Some(rep) };
2834            let in_def = if def.is_empty() { None } else { Some(def) };
2835
2836            let mut iter = super::build_control_word_iterator(
2837                in_rep,
2838                max_rep,
2839                in_def,
2840                max_def,
2841                max_def + 1,
2842                expected_values.len(),
2843            );
2844            assert_eq!(iter.bytes_per_word(), expected_bytes_per_word);
2845            assert_eq!(iter.bits_rep(), expected_bits_rep);
2846            assert_eq!(iter.bits_def(), expected_bits_def);
2847            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
2848
2849            for _ in 0..num_vals {
2850                iter.append_next(&mut cw_vec);
2851            }
2852            assert!(iter.append_next(&mut cw_vec).is_none());
2853
2854            assert_eq!(expected_values, cw_vec);
2855
2856            let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);
2857
2858            let mut rep_out = Vec::with_capacity(num_vals);
2859            let mut def_out = Vec::with_capacity(num_vals);
2860
2861            if expected_bytes_per_word > 0 {
2862                for slice in cw_vec.chunks_exact(expected_bytes_per_word) {
2863                    parser.parse(slice, &mut rep_out, &mut def_out);
2864                }
2865            }
2866
2867            assert_eq!(rep, rep_out.as_slice());
2868            assert_eq!(def, def_out.as_slice());
2869        }
2870
2871        // Each will need 4 bits and so we should get 1-byte control words
2872        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
2873        let def = &[5_u16, 3, 1, 2, 12, 15, 0, 2];
2874        let expected = vec![
2875            0b00000101, // 0, 5
2876            0b01110011, // 7, 3
2877            0b00110001, // 3, 1
2878            0b00100010, // 2, 2
2879            0b10011100, // 9, 12
2880            0b10001111, // 8, 15
2881            0b11000000, // 12, 0
2882            0b01010010, // 5, 2
2883        ];
2884        check(rep, def, expected, 1, 4, 4);
2885
2886        // Now we need 5 bits for def so we get 2-byte control words
2887        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
2888        let def = &[5_u16, 3, 1, 2, 12, 22, 0, 2];
2889        let expected = vec![
2890            0b00000101, 0b00000000, // 0, 5
2891            0b11100011, 0b00000000, // 7, 3
2892            0b01100001, 0b00000000, // 3, 1
2893            0b01000010, 0b00000000, // 2, 2
2894            0b00101100, 0b00000001, // 9, 12
2895            0b00010110, 0b00000001, // 8, 22
2896            0b10000000, 0b00000001, // 12, 0
2897            0b10100010, 0b00000000, // 5, 2
2898        ];
2899        check(rep, def, expected, 2, 4, 5);
2900
2901        // Just rep, 4 bits so 1 byte each
2902        let levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
2903        let expected = vec![
2904            0b00000000, // 0
2905            0b00000111, // 7
2906            0b00000011, // 3
2907            0b00000010, // 2
2908            0b00001001, // 9
2909            0b00001000, // 8
2910            0b00001100, // 12
2911            0b00000101, // 5
2912        ];
2913        check(levels, &[], expected.clone(), 1, 4, 0);
2914
2915        // Just def
2916        check(&[], levels, expected, 1, 0, 4);
2917
2918        // No rep, no def, no bytes
2919        check(&[], &[], Vec::default(), 0, 0, 0);
2920    }
2921
2922    #[test]
2923    fn test_control_words_rep_index() {
2924        fn check(
2925            rep: &[u16],
2926            def: &[u16],
2927            expected_new_rows: Vec<bool>,
2928            expected_is_visible: Vec<bool>,
2929        ) {
2930            let num_vals = rep.len().max(def.len());
2931            let max_rep = rep.iter().max().copied().unwrap_or(0);
2932            let max_def = def.iter().max().copied().unwrap_or(0);
2933
2934            let in_rep = if rep.is_empty() { None } else { Some(rep) };
2935            let in_def = if def.is_empty() { None } else { Some(def) };
2936
2937            let mut iter = super::build_control_word_iterator(
2938                in_rep,
2939                max_rep,
2940                in_def,
2941                max_def,
2942                /*max_visible_def=*/ 2,
2943                expected_new_rows.len(),
2944            );
2945
2946            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
2947            let mut expected_new_rows = expected_new_rows.iter().copied();
2948            let mut expected_is_visible = expected_is_visible.iter().copied();
2949            for _ in 0..expected_new_rows.len() {
2950                let word_desc = iter.append_next(&mut cw_vec).unwrap();
2951                assert_eq!(word_desc.is_new_row, expected_new_rows.next().unwrap());
2952                assert_eq!(word_desc.is_visible, expected_is_visible.next().unwrap());
2953            }
2954            assert!(iter.append_next(&mut cw_vec).is_none());
2955        }
2956
2957        // 2 means new list
2958        let rep = &[2_u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
2959        // These values don't matter for this test
2960        let def = &[0_u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];
2961
2962        // Rep & def
2963        check(
2964            rep,
2965            def,
2966            vec![
2967                true, false, false, true, true, false, false, false, false, true, false,
2968            ],
2969            vec![
2970                true, true, true, false, true, true, true, true, true, true, true,
2971            ],
2972        );
2973        // Rep only
2974        check(
2975            rep,
2976            &[],
2977            vec![
2978                true, false, false, true, true, false, false, false, false, true, false,
2979            ],
2980            vec![true; 11],
2981        );
2982        // No repetition
2983        check(
2984            &[],
2985            def,
2986            vec![
2987                true, true, true, true, true, true, true, true, true, true, true,
2988            ],
2989            vec![true; 11],
2990        );
2991        // No repetition, no definition
2992        check(
2993            &[],
2994            &[],
2995            vec![
2996                true, true, true, true, true, true, true, true, true, true, true,
2997            ],
2998            vec![true; 11],
2999        );
3000    }
3001
3002    #[test]
3003    fn regress_empty_list_case() {
3004        // This regresses a case where we had 3 null lists inside a struct
3005        let mut builder = RepDefBuilder::default();
3006        builder.add_validity_bitmap(validity(&[true, false, true]));
3007        builder.add_offsets(
3008            offsets_32(&[0, 0, 0, 0]),
3009            Some(validity(&[false, false, false])),
3010        );
3011        builder.add_no_null(0);
3012
3013        let repdefs = RepDefBuilder::serialize(vec![builder]);
3014        let rep = repdefs.repetition_levels.unwrap();
3015        let def = repdefs.definition_levels.unwrap();
3016
3017        assert_eq!([1, 1, 1], *rep);
3018        assert_eq!([1, 2, 1], *def);
3019
3020        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
3021            Some(rep.as_ref().to_vec()),
3022            Some(def.as_ref().to_vec()),
3023            repdefs.def_meaning.into(),
3024            0,
3025        )]);
3026
3027        assert_eq!(unraveler.unravel_validity(0), None);
3028        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
3029        assert_eq!(off.inner(), offsets_32(&[0, 0, 0, 0]).inner());
3030        assert_eq!(val, Some(validity(&[false, false, false])));
3031        let val = unraveler.unravel_validity(3).unwrap();
3032        assert_eq!(val.inner(), validity(&[true, false, true]).inner());
3033    }
3034
3035    #[test]
3036    fn regress_list_ends_null_case() {
3037        let mut builder = RepDefBuilder::default();
3038        builder.add_offsets(
3039            offsets_64(&[0, 1, 2, 2]),
3040            Some(validity(&[true, true, false])),
3041        );
3042        builder.add_offsets(offsets_64(&[0, 1, 1]), Some(validity(&[true, false])));
3043        builder.add_no_null(1);
3044
3045        let repdefs = RepDefBuilder::serialize(vec![builder]);
3046        let rep = repdefs.repetition_levels.unwrap();
3047        let def = repdefs.definition_levels.unwrap();
3048
3049        assert_eq!([2, 2, 2], *rep);
3050        assert_eq!([0, 1, 2], *def);
3051
3052        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
3053            Some(rep.as_ref().to_vec()),
3054            Some(def.as_ref().to_vec()),
3055            repdefs.def_meaning.into(),
3056            1,
3057        )]);
3058
3059        assert_eq!(unraveler.unravel_validity(1), None);
3060        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
3061        assert_eq!(off.inner(), offsets_32(&[0, 1, 1]).inner());
3062        assert_eq!(val, Some(validity(&[true, false])));
3063        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
3064        assert_eq!(off.inner(), offsets_32(&[0, 1, 2, 2]).inner());
3065        assert_eq!(val, Some(validity(&[true, true, false])));
3066    }
3067
3068    #[test]
3069    fn test_mixed_unraveler() {
3070        // This tests cases where the validity is different between two different pages
3071        // because one page has nulls and the other doesn't.
3072
3073        // Simple case with one layer of validity and no repetition
3074        let mut unraveler = CompositeRepDefUnraveler::new(vec![
3075            RepDefUnraveler::new(
3076                None,
3077                Some(vec![0, 1, 0, 1]),
3078                vec![DefinitionInterpretation::NullableItem].into(),
3079                4,
3080            ),
3081            RepDefUnraveler::new(
3082                None,
3083                None,
3084                vec![DefinitionInterpretation::AllValidItem].into(),
3085                4,
3086            ),
3087        ]);
3088
3089        assert_eq!(
3090            unraveler.unravel_validity(8),
3091            Some(validity(&[
3092                true, false, true, false, true, true, true, true
3093            ]))
3094        );
3095
3096        // More complex case with two layers of validity and repetition
3097        let def1 = Some(vec![0, 1, 2]);
3098        let rep1 = Some(vec![1, 0, 1]);
3099
3100        let def2 = Some(vec![1, 0, 0]);
3101        let rep2 = Some(vec![1, 1, 0]);
3102
3103        let mut unraveler = CompositeRepDefUnraveler::new(vec![
3104            RepDefUnraveler::new(
3105                rep1,
3106                def1,
3107                vec![
3108                    DefinitionInterpretation::NullableItem,
3109                    DefinitionInterpretation::EmptyableList,
3110                ]
3111                .into(),
3112                2,
3113            ),
3114            RepDefUnraveler::new(
3115                rep2,
3116                def2,
3117                vec![
3118                    DefinitionInterpretation::AllValidItem,
3119                    DefinitionInterpretation::NullableList,
3120                ]
3121                .into(),
3122                2,
3123            ),
3124        ]);
3125
3126        assert_eq!(
3127            unraveler.unravel_validity(4),
3128            Some(validity(&[true, false, true, true]))
3129        );
3130        assert_eq!(
3131            unraveler.unravel_offsets::<i32>().unwrap(),
3132            (
3133                offsets_32(&[0, 2, 2, 2, 4]),
3134                Some(validity(&[true, true, false, true]))
3135            )
3136        );
3137    }
3138}