// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities for rep-def levels
//!
//! Repetition and definition levels are a way to encode multiple validity / offsets arrays
//! into a single buffer.  They are a form of "zipping" buffers together that takes advantage
//! of the fact that, if the outermost array is invalid, then the validity of the inner items
//! is irrelevant.
//!
//! Note: the concept of repetition & definition levels comes from the Dremel paper and has
//! been implemented in Apache Parquet.  However, the implementation here is not necessarily
//! compatible with Parquet.  For example, we use 0 to represent the "inner-most" item and
//! Parquet uses 0 to represent the "outer-most" item.
//!
//! # Repetition Levels
//!
//! With repetition levels we convert a sparse array of offsets into a dense array of levels.
//! A level is non-zero whenever a new list begins, and a higher level marks the start of a list
//! further out.  For example, given the list array with 3 rows
//! [{<0,1>, <>, <2>}, {<3>}, {}], [], [{<4>}] (where `[]`, `{}` and `<>` delimit the outer-most,
//! middle and inner-most lists) we would have three offsets arrays:
//!
//! Outer-most ([]): [0, 3, 3, 4]
//! Middle     ({}): [0, 3, 4, 4, 5]
//! Inner      (<>): [0, 2, 2, 3, 4, 5]
//! Values         : [0, 1, 2, 3, 4]
//!
//! We can convert these into repetition levels as follows:
//!
//! | Values | Repetition |
//! | ------ | ---------- |
//! |      0 |          3 | // Start of outer-most list
//! |      1 |          0 | // Continues inner-most list (no new lists)
//! |      - |          1 | // Start of new inner-most list (empty list)
//! |      2 |          1 | // Start of new inner-most list
//! |      3 |          2 | // Start of new middle list
//! |      - |          2 | // Start of new middle list (empty list)
//! |      - |          3 | // Start of new outer-most list (empty list)
//! |      4 |          3 | // Start of new outer-most list
//!
//! Note: We actually have MORE repetition levels than values.  This is because the repetition
//! levels need to be able to represent empty lists.
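/*!

The conversion above can be sketched as a recursive walk over the offsets.  This is a minimal,
standalone illustration, not the algorithm this module uses (which works layer by layer in
`SerializerContext`): `rep_levels` and `emit_list` are hypothetical helpers that take one offsets
array per list layer, outer-most first, using `usize` offsets instead of Arrow's `i32`/`i64`.

```rust
/// Computes repetition levels for nested lists.  `offsets` holds one offsets
/// array per list layer, outer-most first.  Level `offsets.len()` starts a new
/// outer-most list and level 0 continues the inner-most list.
fn rep_levels(offsets: &[&[usize]]) -> Vec<u16> {
    let mut levels = Vec::new();
    let num_rows = offsets[0].len() - 1;
    for row in 0..num_rows {
        // Every row starts a new outer-most list
        emit_list(offsets, 0, row, offsets.len() as u16, &mut levels);
    }
    levels
}

/// Emits the levels for list `idx` of layer `depth`.  `start_level` is the level
/// recorded for the first entry of this list; it may be higher than this layer's
/// own level when an enclosing list also starts here.
fn emit_list(
    offsets: &[&[usize]],
    depth: usize,
    idx: usize,
    start_level: u16,
    levels: &mut Vec<u16>,
) {
    let (start, end) = (offsets[depth][idx], offsets[depth][idx + 1]);
    if start == end {
        // An empty list still needs a level, even though it has no value
        levels.push(start_level);
        return;
    }
    // Level recorded when a later child of this list begins
    let child_level = (offsets.len() - depth - 1) as u16;
    for (i, child) in (start..end).enumerate() {
        let level = if i == 0 { start_level } else { child_level };
        if depth + 1 == offsets.len() {
            // Children of the inner-most layer are values
            levels.push(level);
        } else {
            emit_list(offsets, depth + 1, child, level, levels);
        }
    }
}
```

With the three offsets arrays from the example this returns `[3, 0, 1, 1, 2, 2, 3, 3]`, matching
the table row by row.
*/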
//!
//! # Definition Levels
//!
//! Definition levels are simpler.  We can think of them as zipping together various validity bitmaps
//! (from different levels of nesting) into a single buffer.  For example, we could zip the arrays
//! [1, 1, 0, 0] (outer) and [1, 0, 1, 0] (inner) into [11, 10, 01, 00].  However, 00 and 01 mean the
//! same thing: if the outer level is null then the validity of the inner levels is irrelevant.  To
//! save space we instead encode a "level" which is the "depth" of the null.  Let's look at a more
//! complete example:
//!
//! Array: [{"middle": {"inner": 1}}, NULL, {"middle": NULL}, {"middle": {"inner": NULL}}]
//!
//! In Arrow we would have the following validity arrays:
//! Outer validity : 1, 0, 1, 1
//! Middle validity: 1, ?, 0, 1
//! Inner validity : 1, ?, ?, 0
//! Values         : 1, ?, ?, ?
//!
//! The ? values are undefined in the Arrow format (they may hold anything).  We can convert these
//! into definition levels as follows:
//!
//! | Values | Definition |
//! | ------ | ---------- |
//! |      1 |          0 | // Valid at all levels
//! |      - |          3 | // Null at outer level
//! |      - |          2 | // Null at middle level
//! |      - |          1 | // Null at inner level
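/*!

The same conversion can be sketched for struct-only nesting (no lists).  `def_levels` is a
hypothetical helper, assuming validity arrives as one `bool` slice per layer, outer-most first.  It
looks for the outer-most null layer in each row and never reads the layers nested below it, which
is why the `?` entries do not matter.

```rust
/// Zips validity bitmaps (one slice per layer, outer-most first, one entry per
/// row) into definition levels.  0 means valid at every layer; otherwise the
/// level identifies the outer-most null layer, with the outer-most layer of all
/// mapping to the highest level.
fn def_levels(validity: &[&[bool]]) -> Vec<u16> {
    let num_layers = validity.len();
    let num_rows = validity.first().map_or(0, |layer| layer.len());
    (0..num_rows)
        .map(|row| {
            validity
                .iter()
                // Stop at the outer-most null; inner entries below it are never read
                .position(|layer| !layer[row])
                .map_or(0, |depth| (num_layers - depth) as u16)
        })
        .collect()
}
```

For the example array (filling each `?` with an arbitrary value) this returns `[0, 3, 2, 1]`.
*/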
//!
//! # Compression
//!
//! Note that we only need 2 bits of definition levels to represent 3 levels of nesting, since the
//! four possible levels (0 through 3) fit in 2 bits.  Bit-packed definition levels are therefore never
//! larger than the input validity bitmaps (which would need 3 bits per item here).  However,
//! compressed levels are not necessarily more compact than the compressed validity arrays.
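/*!

The bit width follows from the number of distinct levels: a maximum definition level of
`max_level` needs ceil(log2(max_level + 1)) bits.  A small sketch of that arithmetic, where
`bits_for_levels` is a hypothetical helper (not part of this module) and each nullable layer is
assumed to contribute exactly one level:

```rust
/// Number of bits needed to store any level in `0..=max_level`, i.e.
/// ceil(log2(max_level + 1)).
fn bits_for_levels(max_level: u16) -> u32 {
    // The bit length of `max_level` is exactly the width needed to hold it
    u16::BITS - max_level.leading_zeros()
}
```

For the three nullable layers above, `bits_for_levels(3)` is 2, while three validity bitmaps take
3 bits per item.
*/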
//!
//! Repetition levels are more complex.  If there are very large lists then a sparse array of offsets
//! (which has one element per list) might be more compact than a dense array of repetition levels
//! (which has one element per list value, possibly even more if there are empty lists).
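/*!

A rough count makes the trade-off concrete.  This sketch (the `offsets_vs_rep_len` helper is
hypothetical) only counts entries for a single list layer; it ignores entry width (offsets are
32- or 64-bit while levels need only a few bits) and any compression.

```rust
/// For one layer of lists with the given lengths, returns
/// (number of offsets, number of repetition levels).
fn offsets_vs_rep_len(list_lens: &[usize]) -> (usize, usize) {
    // Offsets: one entry per list plus the leading 0
    let num_offsets = list_lens.len() + 1;
    // Rep levels: one per value, plus one for each empty list (which has no value)
    let num_rep_levels = list_lens.iter().map(|&len| len.max(1)).sum::<usize>();
    (num_offsets, num_rep_levels)
}
```

Two lists of 1,000 values need 3 offsets but 2,000 repetition levels; the gap closes as lists get
shorter.
*/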
//!
//! In practice, both repetition levels and definition levels are typically very compressible with RLE.
//!
//! However, in Lance we don't always take advantage of that compression because we want to be able
//! to zip rep-def levels together with our values.  This gives us fewer IOPS when accessing row values.
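/*!

To see why RLE works well, here is a minimal run-length encoder over a level buffer.  `rle` is a
hypothetical helper used only for illustration; it says nothing about which compression Lance
actually applies.  A mostly-valid column produces long runs of 0:

```rust
/// Run-length encodes a level buffer into (level, run length) pairs.
fn rle(levels: &[u16]) -> Vec<(u16, usize)> {
    let mut runs: Vec<(u16, usize)> = Vec::new();
    for &level in levels {
        // Extend the current run if the level repeats
        if let Some((last, count)) = runs.last_mut() {
            if *last == level {
                *count += 1;
                continue;
            }
        }
        // Otherwise start a new run
        runs.push((level, 1));
    }
    runs
}
```

For definition levels `[0, 0, 0, 0, 1, 0, 0, 0]` this produces three runs instead of eight levels.
*/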
//!
//! # Utilities in this Module
//!
//! - `RepDefBuilder` - Extracts validity and offset information from Arrow arrays.  We use this as we
//!   shred the incoming data into primitive leaf arrays.  We don't immediately convert into rep-def because
//!   we need to share and cheaply clone the builder when we have structs (each struct child shares some parent
//!   validity / offset information).  The `serialize` method is called once all data has been received to create
//!   the final rep-def levels.
//!
//! - `SerializerContext` - This is an internal utility that helps with serializing rep-def levels.
//!
//! - `CompositeRepDefUnraveler` - This structure is used to reverse the process.  It starts with a set of
//!   rep-def levels and then uses the `unravel_validity` and `unravel_offsets` methods to produce validity
//!   buffers and offset buffers.  It is "composite" because we may be combining sets of rep-def buffers from
//!   multiple locations (e.g. multiple blocks in a mini-block encoded file).
//!
//! - `RepDefSlicer` - This is a utility that helps with slicing rep-def buffers.  These buffers are "kind of"
//!   transparent (they map 1:1 with the values in the array) but not exactly, because of the special (empty/null)
//!   lists.  The slicer handles this and is used when slicing a rep-def buffer into mini-blocks.
//!
//! - `build_control_word_iterator` - This takes in rep-def levels and returns an iterator that returns byte-padded
//!   "control words" which are used when creating full-zip encoded data.
//!
//! - `ControlWordParser` - This parser can parse the control words returned by `build_control_word_iterator` and is
//!   used when decoding full-zip encoded data.

use std::{
    iter::{Copied, Zip},
    sync::Arc,
};

use arrow_array::OffsetSizeTrait;
use arrow_buffer::{
    ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
};
use lance_core::{utils::bit::log_2_ceil, Error, Result};
use snafu::location;

use crate::buffer::LanceBuffer;

pub type LevelBuffer = Vec<u16>;

// As we build def levels we add this to special values to indicate that they
// are special so that we can skip over them when processing lower levels.
//
// We assume 16 bits is good enough for rep-def levels.  This _would_ give
// us 65536 levels of struct nesting and list nesting.  However, we cut that
// in half for SPECIAL_THRESHOLD because we use the top bit to indicate if an
// item is a special value (null list / empty list) during construction.
//
// We subtract this off at the end of construction to get the actual definition
// levels.
const SPECIAL_THRESHOLD: u16 = u16::MAX / 2;

/// Represents information that we extract from a list array as we are
/// encoding
#[derive(Clone, Debug)]
struct OffsetDesc {
    offsets: Arc<[i64]>,
    validity: Option<BooleanBuffer>,
    has_empty_lists: bool,
    num_values: usize,
    num_specials: usize,
}

/// Represents validity information that we extract from non-list arrays (that
/// have nulls) as we are encoding
#[derive(Clone, Debug)]
struct ValidityDesc {
    validity: Option<BooleanBuffer>,
    num_values: usize,
}

/// Represents validity information that we extract from FSL arrays.  This is
/// just validity (no offsets) but we also record the dimension of the FSL array
/// as that will impact the next layer
#[derive(Clone, Debug)]
struct FslDesc {
    validity: Option<BooleanBuffer>,
    dimension: usize,
    num_values: usize,
}

// As we build up rep/def from arrow arrays we record a
// series of RawRepDef objects.  Each one corresponds to a layer
// in the array structure
#[derive(Clone, Debug)]
enum RawRepDef {
    Offsets(OffsetDesc),
    Validity(ValidityDesc),
    Fsl(FslDesc),
}

impl RawRepDef {
    // Are there any nulls in this layer?
    fn has_nulls(&self) -> bool {
        match self {
            Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
            Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
            Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
        }
    }

    // How many values are in this layer?
    fn num_values(&self) -> usize {
        match self {
            Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
            Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
            Self::Fsl(FslDesc { num_values, .. }) => *num_values,
        }
    }

    /// How many empty/null lists are in this layer?
    fn num_specials(&self) -> usize {
        match self {
            Self::Offsets(OffsetDesc { num_specials, .. }) => *num_specials,
            _ => 0,
        }
    }

    /// How many definition levels do we need for this layer?
    fn max_def(&self) -> u16 {
        match self {
            Self::Offsets(OffsetDesc {
                has_empty_lists,
                validity,
                ..
            }) => {
                let mut max_def = 0;
                if *has_empty_lists {
                    max_def += 1;
                }
                if validity.is_some() {
                    max_def += 1;
                }
                max_def
            }
            Self::Validity(ValidityDesc { validity: None, .. }) => 0,
            Self::Validity(ValidityDesc { .. }) => 1,
            Self::Fsl(FslDesc { validity: None, .. }) => 0,
            Self::Fsl(FslDesc { .. }) => 1,
        }
    }

    /// How many repetition levels do we need for this layer?
    fn max_rep(&self) -> u16 {
        match self {
            Self::Offsets(_) => 1,
            _ => 0,
        }
    }
}

/// Represents repetition and definition levels that have been
/// serialized into a pair of (optional) level buffers
#[derive(Debug)]
pub struct SerializedRepDefs {
    /// The repetition levels, one per item (plus one per empty/null list)
    ///
    /// If None, there are no lists
    pub repetition_levels: Option<Arc<[u16]>>,
    /// The definition levels, one per item (plus one per empty/null list)
    ///
    /// If None, there are no nulls and no empty lists
    pub definition_levels: Option<Arc<[u16]>>,
    /// The meaning of each definition level
    pub def_meaning: Vec<DefinitionInterpretation>,
    /// The maximum level that is "visible" from the lowest level
    ///
    /// This is the last level before we encounter a list level of some kind.  Once we've
    /// hit a list level then nulls in any level beyond do not map to actual items.
    ///
    /// This is None if there are no lists
    pub max_visible_level: Option<u16>,
}

impl SerializedRepDefs {
    pub fn new(
        repetition_levels: Option<LevelBuffer>,
        definition_levels: Option<LevelBuffer>,
        def_meaning: Vec<DefinitionInterpretation>,
    ) -> Self {
        let first_list = def_meaning.iter().position(|level| level.is_list());
        let max_visible_level = first_list.map(|first_list| {
            def_meaning
                .iter()
                .map(|level| level.num_def_levels())
                .take(first_list)
                .sum::<u16>()
        });
        Self {
            repetition_levels: repetition_levels.map(Arc::from),
            definition_levels: definition_levels.map(Arc::from),
            def_meaning,
            max_visible_level,
        }
    }

    /// Creates an empty SerializedRepDefs (no repetition, all valid)
    pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
        Self {
            repetition_levels: None,
            definition_levels: None,
            def_meaning,
            max_visible_level: None,
        }
    }

    pub fn rep_slicer(&self) -> Option<RepDefSlicer<'_>> {
        self.repetition_levels
            .as_ref()
            .map(|rep| RepDefSlicer::new(self, rep.clone()))
    }

    pub fn def_slicer(&self) -> Option<RepDefSlicer<'_>> {
        self.definition_levels
            .as_ref()
            .map(|def| RepDefSlicer::new(self, def.clone()))
    }
}

/// Slices a level buffer into pieces
///
/// This is needed to handle the fact that a level buffer may have more
/// levels than values due to special (empty/null) lists.
///
/// As a result, a call to `slice_next(10)` may return 10 levels or it may
/// return more than 10 levels if any special values are encountered.
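/**

The scan in `slice_next` can be sketched on its own.  `levels_for_values` is a hypothetical
standalone helper (not part of this crate) that, like `slice_next`, treats any definition level
above `max_visible_level` as an empty or null list with no value, and leaves trailing special
levels for the next call.

```rust
/// Counts how many levels must be taken to cover `num_values` visible values.
/// Levels above `max_visible_level` have no value and are passed over.
fn levels_for_values(def: &[u16], max_visible_level: u16, num_values: usize) -> usize {
    let mut num_taken = 0;
    let mut num_passed = 0;
    for &level in def {
        if num_taken == num_values {
            // Stop before any trailing specials; they go with the next slice
            break;
        }
        if level <= max_visible_level {
            num_taken += 1;
        }
        num_passed += 1;
    }
    num_passed
}
```

For rows `[[a, b], [], NULL, [c]]` with definition levels `[0, 0, 2, 1, 0]` (assuming 2 marks the
empty list and 1 the null list) and `max_visible_level = 0`, the first two values need 2 levels and
the next value needs 3, because the empty and null lists travel with it.
*/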
#[derive(Debug)]
pub struct RepDefSlicer<'a> {
    repdef: &'a SerializedRepDefs,
    to_slice: LanceBuffer,
    current: usize,
}

// TODO: All of this logic will need some changing when we compress rep/def levels.
impl<'a> RepDefSlicer<'a> {
    fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
        Self {
            repdef,
            to_slice: LanceBuffer::reinterpret_slice(levels),
            current: 0,
        }
    }

    pub fn num_levels(&self) -> usize {
        self.to_slice.len() / 2
    }

    pub fn num_levels_remaining(&self) -> usize {
        self.num_levels() - self.current
    }

    pub fn all_levels(&self) -> &LanceBuffer {
        &self.to_slice
    }

    /// Returns the rest of the levels not yet sliced
    ///
    /// This must be called instead of `slice_next` on the final iteration.
    /// This is because any time we slice there may be empty/null lists on the
    /// boundary that are "free" and the current behavior in `slice_next` is to
    /// leave them for the next call.
    ///
    /// `slice_rest` will slice all remaining levels and return them.
    pub fn slice_rest(&mut self) -> LanceBuffer {
        let start = self.current;
        let remaining = self.num_levels_remaining();
        self.current = self.num_levels();
        self.to_slice.slice_with_length(start * 2, remaining * 2)
    }

    /// Returns enough levels to satisfy the next `num_values` values
    pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
        let start = self.current;
        let Some(max_visible_level) = self.repdef.max_visible_level else {
            // No lists, should be 1:1 mapping from levels to values
            self.current = start + num_values;
            return self.to_slice.slice_with_length(start * 2, num_values * 2);
        };
        if let Some(def) = self.repdef.definition_levels.as_ref() {
            // There are lists and there are def levels.  That means there may be
            // more rep/def levels than values.  We need to scan the def levels to figure
            // out which items are "invisible" and skip over them
            let mut def_itr = def[start..].iter();
            let mut num_taken = 0;
            let mut num_passed = 0;
            while num_taken < num_values {
                let def_level = *def_itr.next().unwrap();
                if def_level <= max_visible_level {
                    num_taken += 1;
                }
                num_passed += 1;
            }
            self.current = start + num_passed;
            self.to_slice.slice_with_length(start * 2, num_passed * 2)
        } else {
            // No def levels, should be 1:1 mapping from levels to values
            self.current = start + num_values;
            self.to_slice.slice_with_length(start * 2, num_values * 2)
        }
    }
}

/// This tells us how an array layer handles definition.  Given a stack of
/// these (one per layer of a nested array) and a set of definition levels we
/// can work out what each definition level means.
///
/// For example, if the interpretation is [AllValidItem, NullableItem] then
/// a 0 means "valid item" and a 1 means "null struct".  If the interpretation
/// is [NullableItem, NullableItem] then a 0 means "valid item" and a 1 means
/// "null item" and a 2 means "null struct".
///
/// Lists are tricky: a single layer of list nesting might use up to two
/// definition levels, one to indicate "empty list" and another to indicate
/// "null list".
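/**

As an illustration, the hypothetical helper below reads such a stack: given each layer's
`num_def_levels()` (inner-most layer first, the order used by `SerializedRepDefs::def_meaning`), it
reports which layer a definition level refers to.  It is a sketch for intuition, not code this crate
provides.

```rust
/// Given the number of definition levels each layer uses (inner-most first),
/// returns `None` for a fully valid item, or `Some(layer)` for the layer whose
/// null (or empty list) the level describes.
fn layer_for_def(def: u16, levels_per_layer: &[u16]) -> Option<usize> {
    if def == 0 {
        return None;
    }
    let mut upper = 0;
    for (layer, &num_levels) in levels_per_layer.iter().enumerate() {
        // Each layer owns the next `num_levels` levels
        upper += num_levels;
        if def <= upper {
            return Some(layer);
        }
    }
    panic!("definition level {def} exceeds the levels described by the layers");
}
```

With `[NullableItem, NullableItem]` (levels `[1, 1]`), level 1 maps to layer 0 ("null item") and
level 2 to layer 1 ("null struct"), matching the description above.
*/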
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DefinitionInterpretation {
    AllValidItem,
    AllValidList,
    NullableItem,
    NullableList,
    EmptyableList,
    NullableAndEmptyableList,
}

impl DefinitionInterpretation {
    /// How many definition levels do we need for this layer?
    pub fn num_def_levels(&self) -> u16 {
        match self {
            Self::AllValidItem => 0,
            Self::AllValidList => 0,
            Self::NullableItem => 1,
            Self::NullableList => 1,
            Self::EmptyableList => 1,
            Self::NullableAndEmptyableList => 2,
        }
    }

    /// Is this layer free of nulls?  (An empty list is not a null.)
    pub fn is_all_valid(&self) -> bool {
        matches!(
            self,
            Self::AllValidItem | Self::AllValidList | Self::EmptyableList
        )
    }

    /// Does this layer represent a list?
    pub fn is_list(&self) -> bool {
        matches!(
            self,
            Self::AllValidList
                | Self::NullableList
                | Self::EmptyableList
                | Self::NullableAndEmptyableList
        )
    }
}

/// The RepDefBuilder is used to collect offsets & validity buffers
/// from arrow structures.  Once we have those we use the SerializerContext
/// to build the actual repetition and definition levels.
///
/// We know ahead of time how many rep/def levels we will need (number of items
/// in inner-most array + the number of empty/null lists in any parent arrays).
///
/// As a result we try to avoid any re-allocations by pre-allocating the buffers
/// up front.  We allocate two copies of each buffer, which allows us to avoid unsafe
/// code caused by reading and writing to the same buffer.  A second buffer is also
/// unavoidable because some steps write "faster" than they read (e.g. expanding a
/// list or FSL layer writes several levels per level read), so writing in place
/// would overwrite levels that have not been read yet.
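/**

The need for a spare buffer is easiest to see in the fixed-size-list case, where each level read is
written several times.  `broadcast_levels` is a hypothetical sketch of that read/write/swap pattern;
unlike `multiply_levels` it ignores special (empty/null list) values.

```rust
/// Repeats each of the first `len` levels in `active` `multiplier` times,
/// writing into the preallocated `spare` buffer and then swapping the two.
fn broadcast_levels(
    active: &mut Vec<u16>,
    spare: &mut Vec<u16>,
    len: usize,
    multiplier: usize,
) {
    assert!(spare.len() >= len * multiplier);
    let mut write_itr = spare.iter_mut();
    for &level in &active[..len] {
        for _ in 0..multiplier {
            *write_itr.next().unwrap() = level;
        }
    }
    // The spare buffer now holds the result; the old buffer becomes the spare
    std::mem::swap(active, spare);
}
```

Broadcasting `[1, 2]` by 3 yields `[1, 1, 1, 2, 2, 2]`; doing this in place would overwrite the 2
before it had been read.
*/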
#[derive(Debug)]
struct SerializerContext {
    // This is built from outer-to-inner and then reversed at the end
    def_meaning: Vec<DefinitionInterpretation>,
    rep_levels: LevelBuffer,
    spare_rep: LevelBuffer,
    def_levels: LevelBuffer,
    spare_def: LevelBuffer,
    current_rep: u16,
    current_def: u16,
    current_len: usize,
    current_num_specials: usize,
}

impl SerializerContext {
    fn new(len: usize, num_layers: usize, max_rep: u16, max_def: u16) -> Self {
        let def_meaning = Vec::with_capacity(num_layers);
        Self {
            rep_levels: if max_rep > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            spare_rep: if max_rep > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            def_levels: if max_def > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            spare_def: if max_def > 0 {
                vec![0; len]
            } else {
                LevelBuffer::default()
            },
            def_meaning,
            current_rep: max_rep,
            current_def: max_def,
            current_len: 0,
            current_num_specials: 0,
        }
    }

    fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
        let def = self.current_def;
        self.current_def -= meaning.num_def_levels();
        self.def_meaning.push(meaning);
        def
    }

    fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
        let rep_level = self.current_rep;
        let (null_list_level, empty_list_level) =
            match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
                (true, true) => {
                    let level =
                        self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
                    (level - 1, level)
                }
                (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
                (false, true) => (
                    0,
                    self.checkout_def(DefinitionInterpretation::EmptyableList),
                ),
                (false, false) => {
                    self.checkout_def(DefinitionInterpretation::AllValidList);
                    (0, 0)
                }
            };
        self.current_rep -= 1;

        if let Some(validity) = &offset_desc.validity {
            self.do_record_validity(validity, null_list_level);
        }

        // We write into the spare buffers and read from the active buffers
        // and then swap at the end.  This way we don't write over what we
        // are reading.

        let mut new_len = 0;
        assert!(self.rep_levels.len() >= (offset_desc.num_values + self.current_num_specials) - 1);
        if self.def_levels.is_empty() {
            let mut write_itr = self.spare_rep.iter_mut();
            let mut read_iter = self.rep_levels.iter().copied();
            for w in offset_desc.offsets.windows(2) {
                let len = w[1] - w[0];
                // len can't be 0 because then we'd have def levels
                assert!(len > 0);
                let rep = read_iter.next().unwrap();
                let list_level = if rep == 0 { rep_level } else { rep };
                *write_itr.next().unwrap() = list_level;

                for _ in 1..len {
                    *write_itr.next().unwrap() = 0;
                }
                new_len += len as usize;
            }
            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
        } else {
            assert!(
                self.def_levels.len() >= (offset_desc.num_values + self.current_num_specials) - 1
            );
            let mut def_write_itr = self.spare_def.iter_mut();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut def_read_itr = self.def_levels.iter().copied();
            let specials_to_pass = self.current_num_specials;
            let mut specials_passed = 0;

            for w in offset_desc.offsets.windows(2) {
                let mut def = def_read_itr.next().unwrap();
                // Copy over any higher-level special values in place
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                    def = def_read_itr.next().unwrap();
                    new_len += 1;
                    specials_passed += 1;
                }

                let len = w[1] - w[0];
                let rep = rep_read_itr.next().unwrap();

                // If the existing rep level is 0 then we are the first list level;
                // otherwise we are starting a higher level list so keep the
                // existing rep level
                let list_level = if rep == 0 { rep_level } else { rep };

                if def == 0 && len > 0 {
                    // New valid list, write a rep level and then add new 0/0 items
                    *def_write_itr.next().unwrap() = 0;
                    *rep_write_itr.next().unwrap() = list_level;

                    for _ in 1..len {
                        *def_write_itr.next().unwrap() = 0;
                        *rep_write_itr.next().unwrap() = 0;
                    }

                    new_len += len as usize;
                } else if def == 0 {
                    // Empty list, insert new special
                    *def_write_itr.next().unwrap() = empty_list_level + SPECIAL_THRESHOLD;
                    *rep_write_itr.next().unwrap() = list_level;
                    new_len += 1;
                } else {
                    // Either the list is null or one of its struct parents
                    // is null.  Promote it to a special value.
                    *def_write_itr.next().unwrap() = def + SPECIAL_THRESHOLD;
                    *rep_write_itr.next().unwrap() = list_level;
                    new_len += 1;
                }
            }

            // If we have any special values at the end, we need to copy them over
            while specials_passed < specials_to_pass {
                *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
                *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                new_len += 1;
                specials_passed += 1;
            }
            std::mem::swap(&mut self.def_levels, &mut self.spare_def);
            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
        }

        self.current_len = new_len;
        self.current_num_specials += offset_desc.num_specials;
    }

    fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
        assert!(self.def_levels.len() >= validity.len() + self.current_num_specials);
        debug_assert!(
            self.current_len == 0 || self.current_len == validity.len() + self.current_num_specials
        );
        self.current_len = validity.len();

        let mut def_read_itr = self.def_levels.iter().copied();
        let mut def_write_itr = self.spare_def.iter_mut();

        let specials_to_pass = self.current_num_specials;
        let mut specials_passed = 0;

        for incoming_validity in validity.iter() {
            let mut def = def_read_itr.next().unwrap();
            while def > SPECIAL_THRESHOLD {
                *def_write_itr.next().unwrap() = def;
                def = def_read_itr.next().unwrap();
                specials_passed += 1;
            }
            if def == 0 && !incoming_validity {
                *def_write_itr.next().unwrap() = null_level;
            } else {
                *def_write_itr.next().unwrap() = def;
            }
        }

        while specials_passed < specials_to_pass {
            *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
            specials_passed += 1;
        }

        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
    }

    fn multiply_levels(&mut self, multiplier: usize) {
        let old_len = self.current_len;
        // All non-special values will be broadcasted by the multiplier.  Special values are copied as-is.
        self.current_len =
            (self.current_len - self.current_num_specials) * multiplier + self.current_num_specials;

        if self.rep_levels.is_empty() && self.def_levels.is_empty() {
            // All valid with no rep/def levels, nothing to do
            return;
        } else if self.rep_levels.is_empty() {
            assert!(self.def_levels.len() >= self.current_len);
            // No rep levels, just multiply the def levels
            let mut def_read_itr = self.def_levels.iter().copied();
            let mut def_write_itr = self.spare_def.iter_mut();
            for _ in 0..old_len {
                let mut def = def_read_itr.next().unwrap();
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    def = def_read_itr.next().unwrap();
                }
                for _ in 0..multiplier {
                    *def_write_itr.next().unwrap() = def;
                }
            }
        } else if self.def_levels.is_empty() {
            assert!(self.rep_levels.len() >= self.current_len);
            // No def levels, just multiply the rep levels
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            for _ in 0..old_len {
                let rep = rep_read_itr.next().unwrap();
                for _ in 0..multiplier {
                    *rep_write_itr.next().unwrap() = rep;
                }
            }
        } else {
            assert!(self.rep_levels.len() >= self.current_len);
            assert!(self.def_levels.len() >= self.current_len);
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut def_read_itr = self.def_levels.iter().copied();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            let mut def_write_itr = self.spare_def.iter_mut();
            for _ in 0..old_len {
                let mut def = def_read_itr.next().unwrap();
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                    def = def_read_itr.next().unwrap();
                }
                let rep = rep_read_itr.next().unwrap();
                for _ in 0..multiplier {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep;
                }
            }
        }
        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
        std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
    }

    fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
        if let Some(validity) = validity {
            let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
            self.do_record_validity(validity, def_level);
        } else {
            self.checkout_def(DefinitionInterpretation::AllValidItem);
        }
    }

    fn record_validity(&mut self, validity_desc: &ValidityDesc) {
        self.record_validity_buf(&validity_desc.validity)
    }

    fn record_fsl(&mut self, fsl_desc: &FslDesc) {
        self.record_validity_buf(&fsl_desc.validity);
        self.multiply_levels(fsl_desc.dimension);
    }

    fn normalize_specials(&mut self) {
        for def in self.def_levels.iter_mut() {
            if *def > SPECIAL_THRESHOLD {
                *def -= SPECIAL_THRESHOLD;
            }
        }
    }

    fn build(mut self) -> SerializedRepDefs {
        if self.current_len == 0 {
            return SerializedRepDefs::new(None, None, self.def_meaning);
        }

        self.normalize_specials();

        let definition_levels = if self.def_levels.is_empty() {
            None
        } else {
            Some(self.def_levels)
        };
        let repetition_levels = if self.rep_levels.is_empty() {
            None
        } else {
            Some(self.rep_levels)
        };

        // Need to reverse the def meaning since we build rep / def levels in reverse
        let def_meaning = self.def_meaning.into_iter().rev().collect::<Vec<_>>();

        SerializedRepDefs::new(repetition_levels, definition_levels, def_meaning)
    }
}

/// A structure used to collect validity buffers and offsets from arrow
/// arrays and eventually create repetition and definition levels.
///
/// During encoding, the structural encoders are given this struct and
/// record the arrow information into it.  Once we hit a leaf node we
/// serialize the data into rep/def levels and write these into the page.
#[derive(Clone, Default, Debug)]
pub struct RepDefBuilder {
    // The rep/def info we have collected so far
    repdefs: Vec<RawRepDef>,
    // The current length.  This can get larger as we traverse lists (e.g. an
    // array might have 5 lists which results in 50 items)
    //
    // Starts uninitialized until we see the first rep/def item
    len: Option<usize>,
}

impl RepDefBuilder {
    fn check_validity_len(&mut self, incoming_len: usize) {
        if let Some(len) = self.len {
            assert_eq!(incoming_len, len);
        } else {
            // First validity buffer we've seen
            self.len = Some(incoming_len);
        }
    }

    fn num_layers(&self) -> usize {
        self.repdefs.len()
    }

    /// The builder is "empty" if there is no repetition and no nulls.  In this case we don't need
    /// to store anything to disk (except the description).
    pub fn is_empty(&self) -> bool {
        self.repdefs
            .iter()
            .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
    }

    /// Returns true if there is exactly one layer and it is a validity (definition) layer
    pub fn is_simple_validity(&self) -> bool {
        self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
    }

    /// Registers a nullable validity bitmap
    pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
        self.check_validity_len(validity.len());
        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
            num_values: validity.len(),
            validity: Some(validity.into_inner()),
        }));
    }

    /// Registers an all-valid validity layer
    pub fn add_no_null(&mut self, len: usize) {
        self.check_validity_len(len);
        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
            validity: None,
            num_values: len,
        }));
    }

    /// Registers a fixed-size-list layer
    ///
    /// `num_values` is the number of fixed-size lists and each has `dimension` items, so
    /// the length expected by the next (inner) layer is `num_values * dimension`.
    pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
        if let Some(len) = self.len {
            assert_eq!(num_values, len);
        }
        self.len = Some(num_values * dimension);
        debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
        self.repdefs.push(RawRepDef::Fsl(FslDesc {
            num_values,
            validity: validity.map(|v| v.into_inner()),
            dimension,
        }))
    }

    fn check_offset_len(&mut self, offsets: &[i64]) {
        if let Some(len) = self.len {
            assert!(offsets.len() == len + 1);
        }
        self.len = Some(offsets[offsets.len() - 1] as usize);
    }

    fn do_add_offsets(
        &mut self,
        lengths: impl Iterator<Item = i64>,
        validity: Option<NullBuffer>,
        capacity: usize,
    ) -> bool {
        let mut num_specials = 0;
        let mut has_empty_lists = false;
        let mut has_garbage_values = false;
        let mut last_off: i64 = 0;

        let mut normalized_offsets = Vec::with_capacity(capacity);
        normalized_offsets.push(0);

        if let Some(ref validity) = validity {
            for (len, is_valid) in lengths.zip(validity.iter()) {
                match (is_valid, len == 0) {
                    (false, is_empty) => {
                        // Null list: normalized to zero length.  If it was not empty it
                        // pointed at garbage items that the caller must filter out
                        num_specials += 1;
                        has_garbage_values |= !is_empty;
                    }
                    (true, true) => {
                        // Valid but empty list
                        num_specials += 1;
                        has_empty_lists = true;
                    }
                    _ => {
                        last_off += len;
                    }
                }
                normalized_offsets.push(last_off);
            }
        } else {
            for len in lengths {
                if len == 0 {
                    num_specials += 1;
                    has_empty_lists = true;
                }
                last_off += len;
                normalized_offsets.push(last_off);
            }
        }

        self.check_offset_len(&normalized_offsets);
        self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
            num_values: normalized_offsets.len() - 1,
            offsets: normalized_offsets.into(),
            validity: validity.map(|v| v.into_inner()),
            has_empty_lists,
            num_specials: num_specials as usize,
        }));

        has_garbage_values
    }
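/*
`do_add_offsets` works in three steps:

- It turns offsets into lengths.
- It collapses every null list to zero length.
- It counts each null or empty list as a "special", since `serialize` later reserves one extra level per special.

The sketch below mirrors that normalization on plain slices. It is a hypothetical standalone version: a `&[bool]` stands in for the Arrow `NullBuffer`, and the tuple return is invented for illustration.

```rust
/// Hypothetical standalone version of `do_add_offsets`.
/// Returns (normalized offsets, num_specials, has_empty_lists, has_garbage_values).
pub fn normalize_offsets(
    offsets: &[i64],
    validity: Option<&[bool]>,
) -> (Vec<i64>, usize, bool, bool) {
    let mut num_specials = 0;
    let mut has_empty_lists = false;
    let mut has_garbage_values = false;
    let mut last_off = 0i64;
    let mut normalized = Vec::with_capacity(offsets.len());
    normalized.push(0);
    for (idx, pair) in offsets.windows(2).enumerate() {
        let len = pair[1] - pair[0];
        // No validity buffer means every list is valid
        let is_valid = validity.map_or(true, |v| v[idx]);
        if !is_valid {
            // Null lists always become zero-length; a non-empty one held garbage items
            num_specials += 1;
            has_garbage_values |= len != 0;
        } else if len == 0 {
            // Valid but empty list
            num_specials += 1;
            has_empty_lists = true;
        } else {
            last_off += len;
        }
        normalized.push(last_off);
    }
    (normalized, num_specials, has_empty_lists, has_garbage_values)
}
```

With offsets `[0, 2, 2, 5, 6]` and validity `[true, true, false, true]`, the null third list (which held 3 garbage items) collapses, giving `[0, 2, 2, 2, 3]` with 2 specials. The garbage flag is what `add_offsets` hands back so the caller knows to drop those child items. Offsets that do not start at 0, such as those of a sliced array, are rebased to 0.
*/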

    /// Adds a layer of offsets
    ///
    /// Offsets are cast to a common type (i64) and also normalized.  Null lists are
    /// always represented by a zero-length (identical) pair of offsets and so the caller
    /// should filter out any garbage items (the children of non-empty null lists) before
    /// encoding them.  To assist with this the method will return true if any non-empty
    /// null lists were found.
    pub fn add_offsets<O: OffsetSizeTrait>(
        &mut self,
        offsets: OffsetBuffer<O>,
        validity: Option<NullBuffer>,
    ) -> bool {
        let inner = offsets.into_inner();
        let buffer_len = inner.len();

        if O::IS_LARGE {
            let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, buffer_len);
            let lengths = i64_buff.windows(2).map(|off| off[1] - off[0]);
            self.do_add_offsets(lengths, validity, buffer_len)
        } else {
            let i32_buff = ScalarBuffer::<i32>::new(inner.into_inner(), 0, buffer_len);
            let lengths = i32_buff.windows(2).map(|off| (off[1] - off[0]) as i64);
            self.do_add_offsets(lengths, validity, buffer_len)
        }
    }

    // When we are encoding data it arrives in batches.  For each batch we create a RepDefBuilder and collect the
    // various validity buffers and offset buffers from that batch.  Once we have enough batches to write a page we
    // need to take this collection of RepDefBuilders, concatenate them, and then serialize them into rep/def levels.
    //
    // TODO: Could we concatenate and serialize at the same time?
    //
    // This method takes care of the concatenation part.  First we collect all of layer 0 from each builder, then we
    // call this method.  Then we collect all of layer 1 from each builder and call this method.  And so on.
    //
    // That means this method should get a collection of `RawRepDef` where each item is the same kind (all validity,
    // all fixed-size-list, or all offsets) though the nullability / lengths may differ from builder to builder.
    fn concat_layers<'a>(
        layers: impl Iterator<Item = &'a RawRepDef>,
        num_layers: usize,
    ) -> RawRepDef {
        enum LayerKind {
            Validity,
            Fsl,
            Offsets,
        }

        // We make two passes through the layers.  The first determines if we need to pay the cost of allocating
        // buffers.  The second pass actually adds the values.
        let mut collected = Vec::with_capacity(num_layers);
        let mut has_nulls = false;
        let mut layer_kind = LayerKind::Validity;
        let mut total_num_specials = 0;
        let mut all_dimension = 0;
        let mut all_has_empty_lists = false;
        let mut all_num_values = 0;
        for layer in layers {
            has_nulls |= layer.has_nulls();
            match layer {
                RawRepDef::Validity(_) => {
                    layer_kind = LayerKind::Validity;
                }
                RawRepDef::Offsets(OffsetDesc {
                    num_specials,
                    has_empty_lists,
                    ..
                }) => {
                    all_has_empty_lists |= *has_empty_lists;
                    layer_kind = LayerKind::Offsets;
                    total_num_specials += num_specials;
                }
                RawRepDef::Fsl(FslDesc { dimension, .. }) => {
                    layer_kind = LayerKind::Fsl;
                    all_dimension = *dimension;
                }
            }
            collected.push(layer);
            all_num_values += layer.num_values();
        }

        // Shortcut if there are no nulls
        if !has_nulls {
            match layer_kind {
                LayerKind::Validity => {
                    return RawRepDef::Validity(ValidityDesc {
                        validity: None,
                        num_values: all_num_values,
                    });
                }
                LayerKind::Fsl => {
                    return RawRepDef::Fsl(FslDesc {
                        validity: None,
                        num_values: all_num_values,
                        dimension: all_dimension,
                    })
                }
                LayerKind::Offsets => {}
            }
        }

        // Only allocate if needed
        let mut validity_builder = if has_nulls {
            BooleanBufferBuilder::new(all_num_values)
        } else {
            BooleanBufferBuilder::new(0)
        };
        let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
            let mut all_offsets = Vec::with_capacity(all_num_values);
            all_offsets.push(0);
            all_offsets
        } else {
            Vec::new()
        };

        for layer in collected {
            match layer {
                RawRepDef::Validity(ValidityDesc {
                    validity: Some(validity),
                    ..
                }) => {
                    validity_builder.append_buffer(validity);
                }
                RawRepDef::Validity(ValidityDesc {
                    validity: None,
                    num_values,
                }) => {
                    validity_builder.append_n(*num_values, true);
                }
                RawRepDef::Fsl(FslDesc {
                    validity,
                    num_values,
                    ..
                }) => {
                    if let Some(validity) = validity {
                        validity_builder.append_buffer(validity);
                    } else {
                        validity_builder.append_n(*num_values, true);
                    }
                }
                RawRepDef::Offsets(OffsetDesc {
                    offsets,
                    validity: Some(validity),
                    has_empty_lists,
                    ..
                }) => {
                    all_has_empty_lists |= has_empty_lists;
                    validity_builder.append_buffer(validity);
                    let last = *all_offsets.last().unwrap();
                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
                }
                RawRepDef::Offsets(OffsetDesc {
                    offsets,
                    validity: None,
                    has_empty_lists,
                    num_values,
                    ..
                }) => {
                    all_has_empty_lists |= has_empty_lists;
                    if has_nulls {
                        validity_builder.append_n(*num_values, true);
                    }
                    let last = *all_offsets.last().unwrap();
                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
                }
            }
        }
        let validity = if has_nulls {
            Some(validity_builder.finish())
        } else {
            None
        };
        match layer_kind {
            LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
                validity,
                num_values: all_num_values,
                dimension: all_dimension,
            }),
            LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
                validity,
                num_values: all_num_values,
            }),
            LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
                offsets: all_offsets.into(),
                validity,
                has_empty_lists: all_has_empty_lists,
                num_values: all_num_values,
                num_specials: total_num_specials,
            }),
        }
    }
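/*
The second pass of `concat_layers` does two things:

- It rebases each builder's zero-based offsets onto the running total.
- It materializes `true` entries for all-valid batches only when some other batch has nulls.

The sketch below shows both halves on plain vectors. It is hypothetical: the function names are invented, and "has nulls" is approximated here as "the validity vector contains a `false`", whereas the real code asks `RawRepDef::has_nulls`, which is defined elsewhere in this file.

```rust
/// Hypothetical standalone version of the offsets half of `concat_layers`: each
/// batch's zero-based offsets are shifted by the running total.
pub fn concat_offsets(batches: &[Vec<i64>]) -> Vec<i64> {
    let mut all = vec![0i64];
    for offsets in batches {
        let last = *all.last().unwrap();
        // Skip the batch's leading 0; it coincides with the previous batch's end
        all.extend(offsets.iter().skip(1).map(|off| off + last));
    }
    all
}

/// Hypothetical standalone version of the validity half: `None` means the batch
/// had no validity buffer. Only materializes a buffer when some batch has a null.
pub fn concat_validity(batches: &[(usize, Option<Vec<bool>>)]) -> Option<Vec<bool>> {
    let has_nulls = batches
        .iter()
        .any(|(_, v)| v.as_ref().map_or(false, |v| v.contains(&false)));
    if !has_nulls {
        return None;
    }
    let mut all = Vec::new();
    for (num_values, validity) in batches {
        match validity {
            Some(v) => all.extend_from_slice(v),
            // An all-valid batch still needs explicit `true` entries here
            None => all.extend(std::iter::repeat(true).take(*num_values)),
        }
    }
    Some(all)
}
```

Concatenating `[0, 2, 5]` and `[0, 1, 4]` gives `[0, 2, 5, 6, 9]`. This is the same result that the pop in `RepDefUnraveler::unravel_offsets` produces on the decode side.
*/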

    /// Converts the validity / offsets buffers that have been gathered so far
    /// into repetition and definition levels
    pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
        assert!(!builders.is_empty());
        if builders.iter().all(|b| b.is_empty()) {
            // No repetition, all-valid
            return SerializedRepDefs::empty(
                builders
                    .first()
                    .unwrap()
                    .repdefs
                    .iter()
                    .map(|_| DefinitionInterpretation::AllValidItem)
                    .collect::<Vec<_>>(),
            );
        }

        // Every builder must have the same layers; check before indexing into them
        debug_assert!(builders
            .iter()
            .all(|b| b.num_layers() == builders[0].num_layers()));

        let num_layers = builders[0].num_layers();
        let combined_layers = (0..num_layers)
            .map(|layer_index| {
                Self::concat_layers(
                    builders.iter().map(|b| &b.repdefs[layer_index]),
                    builders.len(),
                )
            })
            .collect::<Vec<_>>();

        // One level per leaf value plus one extra level per null / empty list
        let total_len = combined_layers.last().unwrap().num_values()
            + combined_layers
                .iter()
                .map(|l| l.num_specials())
                .sum::<usize>();
        let max_rep = combined_layers.iter().map(|l| l.max_rep()).sum::<u16>();
        let max_def = combined_layers.iter().map(|l| l.max_def()).sum::<u16>();

        let mut context = SerializerContext::new(total_len, num_layers, max_rep, max_def);
        for layer in combined_layers.into_iter() {
            match layer {
                RawRepDef::Validity(def) => {
                    context.record_validity(&def);
                }
                RawRepDef::Offsets(rep) => {
                    context.record_offsets(&rep);
                }
                RawRepDef::Fsl(fsl) => {
                    context.record_fsl(&fsl);
                }
            }
        }
        context.build()
    }
}
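/*
`serialize` sizes the level buffers as the leaf layer's value count plus one entry per special (null or empty list) summed over all layers.

Take the three-row example in the module docs, `[{<0,1>, <>, <2>}, {<3>}, {}], [], [{<4>}]`. It has 5 leaf values and one empty list at each of the three list layers. That gives 5 + 3 = 8 levels, matching the eight rows of the repetition table there.

The sketch is hypothetical. `LayerSummary` stands in for the combined `RawRepDef` layers, and the layers are assumed to be ordered outer-most first, as the builder records them.

```rust
/// Hypothetical per-layer summary; the real code asks each combined `RawRepDef`
/// for `num_values()` and `num_specials()`.
pub struct LayerSummary {
    pub num_values: usize,
    pub num_specials: usize,
}

/// Layers are assumed to be ordered outer-most first, so the last one describes
/// the leaf values. Every null or empty list adds one extra level on top of them.
pub fn total_levels(layers: &[LayerSummary]) -> usize {
    let leaf_values = layers.last().map_or(0, |l| l.num_values);
    leaf_values + layers.iter().map(|l| l.num_specials).sum::<usize>()
}
```
*/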

/// Starts with serialized repetition and definition levels and unravels
/// them into validity buffers and offsets buffers
///
/// This is used during decoding to create the necessary arrow structures
#[derive(Debug)]
pub struct RepDefUnraveler {
    rep_levels: Option<LevelBuffer>,
    def_levels: Option<LevelBuffer>,
    // Maps from definition level to the rep level at which that definition level is visible
    levels_to_rep: Vec<u16>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Current definition level to compare to.  Levels at or below it are valid at the current layer.
    current_def_cmp: u16,
    // Current rep level, determines which specials we can see
    current_rep_cmp: u16,
    // Current layer index, 0 means inner-most layer and it counts up from there.  Used to index
    // into def_meaning
    current_layer: usize,
}

impl RepDefUnraveler {
    /// Creates a new unraveler from serialized repetition and definition information
    pub fn new(
        rep_levels: Option<LevelBuffer>,
        def_levels: Option<LevelBuffer>,
        def_meaning: Arc<[DefinitionInterpretation]>,
    ) -> Self {
        let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
        let mut rep_counter = 0;
        // Level=0 is always visible and means valid item
        levels_to_rep.push(0);
        for meaning in def_meaning.as_ref() {
            match meaning {
                DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
                    // There is no corresponding level, so nothing to put in levels_to_rep
                }
                DefinitionInterpretation::NullableItem => {
                    // Some null structs are not visible at inner rep levels in cases like LIST<STRUCT<LIST<...>>>
                    levels_to_rep.push(rep_counter);
                }
                DefinitionInterpretation::NullableList => {
                    rep_counter += 1;
                    levels_to_rep.push(rep_counter);
                }
                DefinitionInterpretation::EmptyableList => {
                    rep_counter += 1;
                    levels_to_rep.push(rep_counter);
                }
                DefinitionInterpretation::NullableAndEmptyableList => {
                    rep_counter += 1;
                    levels_to_rep.push(rep_counter);
                    levels_to_rep.push(rep_counter);
                }
            }
        }
        Self {
            rep_levels,
            def_levels,
            current_def_cmp: 0,
            current_rep_cmp: 0,
            levels_to_rep,
            current_layer: 0,
            def_meaning,
        }
    }
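/*
The constructor numbers definition levels from the inner-most layer outwards, with 0 meaning "valid item". For every level it records the repetition depth (`rep_counter`) at which that level becomes visible.

The sketch below mirrors that loop exactly, including the fact that all-valid layers add no entry. It uses a hypothetical local copy of the `DefinitionInterpretation` variants, here named `DefInterp`, so that it runs on its own.

```rust
/// Hypothetical local copy of the `DefinitionInterpretation` variants used by `new`
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum DefInterp {
    AllValidItem,
    AllValidList,
    NullableItem,
    NullableList,
    EmptyableList,
    NullableAndEmptyableList,
}

/// Mirrors the loop in `RepDefUnraveler::new` (meanings are inner-most first).
/// Entry `i` is the repetition depth at which definition level `i` becomes visible.
pub fn levels_to_rep(def_meaning: &[DefInterp]) -> Vec<u16> {
    let mut out = vec![0]; // level 0 (a valid item) is always visible
    let mut rep_counter = 0u16;
    for meaning in def_meaning {
        match meaning {
            // All-valid layers contribute no definition level
            DefInterp::AllValidItem | DefInterp::AllValidList => {}
            DefInterp::NullableItem => out.push(rep_counter),
            DefInterp::NullableList | DefInterp::EmptyableList => {
                rep_counter += 1;
                out.push(rep_counter);
            }
            DefInterp::NullableAndEmptyableList => {
                // One level for "null list", one for "empty list"
                rep_counter += 1;
                out.push(rep_counter);
                out.push(rep_counter);
            }
        }
    }
    out
}
```

For `[NullableItem, NullableAndEmptyableList]` the result is `[0, 0, 1, 1]`: the null-item level is visible to the items, while the null-list and empty-list levels only appear once the list layer is being unravelled. A nullable struct sitting above a nullable list (`[AllValidItem, NullableList, NullableItem]`) yields `[0, 1, 1]`. This is the "not visible at inner rep levels" case that the comment in `new` mentions.
*/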

    /// Returns true if the current layer's definition interpretation says it is all-valid
    pub fn is_all_valid(&self) -> bool {
        self.def_meaning[self.current_layer].is_all_valid()
    }

    /// If the current level is a repetition layer then this returns an upper bound on the
    /// number of lists at this level (the number of remaining repetition levels).
    ///
    /// This is not valid to call when the current level is a struct/primitive layer because
    /// in some cases there may be no rep or def information to know this.
    pub fn max_lists(&self) -> usize {
        debug_assert!(
            self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
        );
        self.rep_levels
            .as_ref()
            // Worst case every rep item is max_rep and a new list
            .map(|levels| levels.len())
            .unwrap_or(0)
    }

    /// Unravels a layer of offsets from the unraveler into the given offset width
    ///
    /// When decoding a list the caller should first unravel the offsets and then
    /// unravel the validity (this is the opposite order used during encoding)
    pub fn unravel_offsets<T: ArrowNativeType>(
        &mut self,
        offsets: &mut Vec<T>,
        validity: Option<&mut BooleanBufferBuilder>,
    ) -> Result<()> {
        let rep_levels = self
            .rep_levels
            .as_mut()
            .expect("Expected repetition level but data didn't contain repetition");
        let valid_level = self.current_def_cmp;
        let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
            DefinitionInterpretation::NullableList => {
                self.current_def_cmp += 1;
                (valid_level + 1, 0)
            }
            DefinitionInterpretation::EmptyableList => {
                self.current_def_cmp += 1;
                (0, valid_level + 1)
            }
            DefinitionInterpretation::NullableAndEmptyableList => {
                self.current_def_cmp += 2;
                (valid_level + 1, valid_level + 2)
            }
            DefinitionInterpretation::AllValidList => (0, 0),
            _ => unreachable!(),
        };
        self.current_layer += 1;

        // This is the highest def level that is still visible.  Once we hit a list then
        // we stop looking because any null / empty list (or list masked by a higher level
        // null) will not be visible
        let mut max_level = null_level.max(empty_level);
        // Anything higher than this (but no higher than max_level) is a null struct masking
        // our list.  We will materialize this as a null list.
        let upper_null = max_level;
        for level in self.def_meaning[self.current_layer..].iter() {
            match level {
                DefinitionInterpretation::NullableItem => {
                    max_level += 1;
                }
                DefinitionInterpretation::AllValidItem => {}
                _ => {
                    break;
                }
            }
        }

        let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);

        // If offsets is empty this is a no-op.  If offsets is not empty that means we already
        // added a set of offsets.  For example, we might have added [0, 3, 5] (2 lists).  Now
        // say we want to add [0, 1, 4] (2 lists).  We should get [0, 3, 5, 6, 9] (4 lists).  If
        // we don't pop here we get [0, 3, 5, 5, 6, 9] which is wrong.
        //
        // Or, to think about it another way, if every unraveler adds the starting 0 and the trailing
        // length then we have N + unravelers.len() values instead of N + 1.
        offsets.pop();

        let to_offset = |val: usize| {
            T::from_usize(val).ok_or_else(|| {
                Error::invalid_input(
                    "A single batch had more than i32::MAX values and so a large container type is required",
                    location!(),
                )
            })
        };
        self.current_rep_cmp += 1;
        if let Some(def_levels) = &mut self.def_levels {
            assert!(rep_levels.len() == def_levels.len());
            // It's possible validity is None even if we have def levels.  For example, we might have
            // empty lists (which require def levels) but no nulls.
            let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
                Box::new(|is_valid| validity.append(is_valid))
            } else {
                Box::new(|_| {})
            };
            // This is a strange access pattern.  We are iterating over the rep/def levels and
            // at the same time writing the rep/def levels.  This means we need both a mutable
            // and immutable reference to the rep/def levels.
            let mut read_idx = 0;
            let mut write_idx = 0;
            while read_idx < rep_levels.len() {
                // SAFETY: We assert that rep_levels and def_levels have the same
                // len and read_idx and write_idx can never go past the end.
                unsafe {
                    let rep_val = *rep_levels.get_unchecked(read_idx);
                    if rep_val != 0 {
                        let def_val = *def_levels.get_unchecked(read_idx);
                        // Copy over
                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
                        *def_levels.get_unchecked_mut(write_idx) = def_val;
                        write_idx += 1;

                        if def_val == 0 {
                            // This is a valid list
                            offsets.push(to_offset(curlen)?);
                            curlen += 1;
                            push_validity(true);
                        } else if def_val > max_level {
                            // This is not visible at this rep level, do not add to offsets, but keep in repdef
                        } else if def_val == null_level || def_val > upper_null {
                            // This is a null list (or a list masked by a null struct)
                            offsets.push(to_offset(curlen)?);
                            push_validity(false);
                        } else if def_val == empty_level {
                            // This is an empty list
                            offsets.push(to_offset(curlen)?);
                            push_validity(true);
                        } else {
                            // New valid list starting with null item
                            offsets.push(to_offset(curlen)?);
                            curlen += 1;
                            push_validity(true);
                        }
                    } else {
                        curlen += 1;
                    }
                    read_idx += 1;
                }
            }
            offsets.push(to_offset(curlen)?);
            rep_levels.truncate(write_idx);
            def_levels.truncate(write_idx);
            Ok(())
        } else {
            // No definition levels: every entry with a non-zero rep level starts a valid list
            let mut read_idx = 0;
            let mut write_idx = 0;
            let old_offsets_len = offsets.len();
            while read_idx < rep_levels.len() {
                // SAFETY: read_idx / write_idx cannot go past rep_levels.len()
                unsafe {
                    let rep_val = *rep_levels.get_unchecked(read_idx);
                    if rep_val != 0 {
                        // Start a new list (this also ends the previous one)
                        offsets.push(to_offset(curlen)?);
                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
                        write_idx += 1;
                    }
                    curlen += 1;
                    read_idx += 1;
                }
            }
            let num_new_lists = offsets.len() - old_offsets_len;
            offsets.push(to_offset(curlen)?);
            // Only the entries that started a list at this layer survive to the next layer.
            // (`offsets` may already hold lists from earlier unravelers, so its length can't
            // be used here.)
            rep_levels.truncate(write_idx);
            if let Some(validity) = validity {
                // Even though we don't have validity it is possible another unraveler did and so we need
                // to push all valids
                validity.append_n(num_new_lists, true);
            }
            Ok(())
        }
    }
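/*
The def-level branch of `unravel_offsets` classifies each list-starting entry by its definition level. The sketch below is a hypothetical standalone version for a single list layer on plain `Vec`s, with offsets as `usize`. The four thresholds play the same roles as the locals of the same names in the method.

As a worked example, take a `LIST<nullable INT>` whose list layer allows both nulls and empty lists. Once the item layer has been unravelled, the decoder's numbering treats the levels as follows:

- 0: valid
- 1: null item
- 2: null list
- 3: empty list

So the method would use `null_level = 2`, `empty_level = 3` and `upper_null = max_level = 3`. Under that numbering the rows `[1, null]`, `null`, `[]`, `[3]` would be rep `[1, 0, 1, 1, 1]` and def `[0, 1, 2, 3, 0]`.

```rust
/// Hypothetical standalone version of the def-level branch of `unravel_offsets`
/// for a single list layer (0 means "this layer has no such level").
/// Returns (offsets, list validity) and leaves the surviving rep/def entries in place.
pub fn unravel_list_layer(
    rep: &mut Vec<u16>,
    def: &mut Vec<u16>,
    null_level: u16,
    empty_level: u16,
    upper_null: u16,
    max_level: u16,
) -> (Vec<usize>, Vec<bool>) {
    assert_eq!(rep.len(), def.len());
    let mut offsets = Vec::new();
    let mut validity = Vec::new();
    let mut curlen = 0;
    let mut write_idx = 0;
    for read_idx in 0..rep.len() {
        let (r, d) = (rep[read_idx], def[read_idx]);
        if r == 0 {
            // Continues the current list: one more item
            curlen += 1;
            continue;
        }
        // Starts a new list here; keep the entry (one level shallower) for outer layers
        rep[write_idx] = r - 1;
        def[write_idx] = d;
        write_idx += 1;
        if d == 0 {
            // Valid list whose first item is valid
            offsets.push(curlen);
            curlen += 1;
            validity.push(true);
        } else if d > max_level {
            // Masked by something further out: no list at this layer
        } else if d == null_level || d > upper_null {
            // Null list (or a list masked by a null struct)
            offsets.push(curlen);
            validity.push(false);
        } else if d == empty_level {
            // Empty list
            offsets.push(curlen);
            validity.push(true);
        } else {
            // Valid list whose first item is null
            offsets.push(curlen);
            curlen += 1;
            validity.push(true);
        }
    }
    offsets.push(curlen);
    rep.truncate(write_idx);
    def.truncate(write_idx);
    (offsets, validity)
}
```

For the example above this produces offsets `[0, 2, 2, 2, 3]` and list validity `[true, false, true, true]`. It leaves def `[0, 2, 3, 0]` (one entry per row) for any outer layer.
*/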

    /// Skips a layer of validity that has no nulls (and so has no definition level)
    pub fn skip_validity(&mut self) {
        debug_assert!(
            self.def_meaning[self.current_layer] == DefinitionInterpretation::AllValidItem
        );
        self.current_layer += 1;
    }

    /// Unravels a layer of validity from the definition levels
    pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
        debug_assert!(
            self.def_meaning[self.current_layer] != DefinitionInterpretation::AllValidItem
        );
        self.current_layer += 1;

        let def_levels = &self.def_levels.as_ref().unwrap();

        let current_def_cmp = self.current_def_cmp;
        self.current_def_cmp += 1;

        // Levels that only become visible at an outer rep level (null / empty lists further
        // out) have no item at this layer and are skipped
        for is_valid in def_levels.iter().filter_map(|&level| {
            if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
                Some(level <= current_def_cmp)
            } else {
                None
            }
        }) {
            validity.append(is_valid);
        }
    }
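/*
`unravel_validity` keeps only the definition levels that are visible at the current repetition depth. A visible level is valid when it is at or below the current cutoff. The sketch below is a hypothetical standalone version of that filter: the cursors are passed in as plain arguments instead of living on the unraveler.

```rust
/// Hypothetical standalone version of the filter in `unravel_validity`.
/// `levels_to_rep[level]` is the repetition depth at which `level` becomes visible;
/// `current_rep_cmp` and `current_def_cmp` are the unraveler's running cursors.
pub fn unravel_validity(
    def: &[u16],
    levels_to_rep: &[u16],
    current_rep_cmp: u16,
    current_def_cmp: u16,
) -> Vec<bool> {
    def.iter()
        .filter_map(|&level| {
            if levels_to_rep[level as usize] <= current_rep_cmp {
                // Visible at this depth: valid unless it is a null at this layer or above
                Some(level <= current_def_cmp)
            } else {
                // Only meaningful to an outer list layer: no item here
                None
            }
        })
        .collect()
}
```

For a nullable struct of nullable ints, the levels are 0 = valid, 1 = null int and 2 = null struct. The int layer (cutoff 0) sees def `[0, 1, 2]` as `[true, false, false]`. The struct layer (cutoff 1) sees it as `[true, true, false]`.
*/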

    /// Collapses the definition levels of a fixed-size-list layer so that only the first
    /// level of every `dimension`-sized group remains (one level per list)
    ///
    /// Panics (`todo!`) if there are repetition levels.
    pub fn decimate(&mut self, dimension: usize) {
        if self.rep_levels.is_some() {
            // If we need to support this then I think we need to walk through the rep def levels to find
            // the spots at which we keep.  E.g. if we have:
            //  rep: 1 0 0 1 0 1 0 0 0 1 0 0
            //  def: 1 1 1 0 1 0 1 1 0 1 1 0
            //  dimension: 2
            //
            // The output should be:
            //  rep: 1 0 0 1 0 0 0
            //  def: 1 1 1 0 1 1 0
            //
            // Maybe there's some special logic for empty/null lists?  I'll save the headache for future me.
            todo!("Not yet supported FSL<...List<...>>");
        }
        let Some(def_levels) = self.def_levels.as_mut() else {
            return;
        };
        let mut read_idx = 0;
        let mut write_idx = 0;
        while read_idx < def_levels.len() {
            // SAFETY: the loop checks read_idx < def_levels.len() and, since dimension >= 1,
            // write_idx <= read_idx
            unsafe {
                *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
            }
            write_idx += 1;
            read_idx += dimension;
        }
        def_levels.truncate(write_idx);
    }
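/*
When there are no repetition levels, `decimate` keeps one definition level per fixed-size list: the first child's.

Presumably this suffices because `multiply_levels` copied a null list's level to every child during encoding. For a valid list, the first child's level can only describe an inner null, and that falls under the cutoff once the inner layers have been unravelled.

The sketch below is a hypothetical standalone version over a plain `Vec<u16>`. Unlike the method, it asserts a positive dimension.

```rust
/// Hypothetical standalone version of `decimate` when there are no repetition
/// levels: keep the first definition level of every `dimension`-sized group.
pub fn decimate(def: &mut Vec<u16>, dimension: usize) {
    assert!(dimension > 0, "dimension must be positive");
    let mut read_idx = 0;
    let mut write_idx = 0;
    while read_idx < def.len() {
        // write_idx <= read_idx, so we never overwrite a level we still need to read
        def[write_idx] = def[read_idx];
        write_idx += 1;
        read_idx += dimension;
    }
    def.truncate(write_idx);
}
```

An allocating equivalent is `def.iter().step_by(dimension).copied().collect::<Vec<_>>()`. The in-place loop avoids the extra buffer.
*/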
}

/// As we decode we may extract rep/def information from multiple pages (or multiple
/// chunks within a page).
///
/// For each chunk we create an unraveler.  Each unraveler can have a completely different
/// interpretation (e.g. one page might contain null items but no null structs and the next
/// page might have null structs but no null items).
///
/// Concatenating these unravelers would be tricky and expensive so instead we have a
/// composite unraveler which unravels across multiple unravelers.
///
/// Note: this struct should be used even if there is only one page / unraveler.  This is
/// because the `RepDefUnraveler`'s API is more complex (it's meant to be called by this
/// struct).
#[derive(Debug)]
pub struct CompositeRepDefUnraveler {
    unravelers: Vec<RepDefUnraveler>,
}

impl CompositeRepDefUnraveler {
    pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
        Self { unravelers }
    }

    /// Unravels a layer of validity
    ///
    /// Returns None if every unraveler reports this layer as all-valid
    pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
        let is_all_valid = self
            .unravelers
            .iter()
            .all(|unraveler| unraveler.is_all_valid());

        if is_all_valid {
            for unraveler in self.unravelers.iter_mut() {
                unraveler.skip_validity();
            }
            None
        } else {
            let mut validity = BooleanBufferBuilder::new(num_values);
            for unraveler in self.unravelers.iter_mut() {
                unraveler.unravel_validity(&mut validity);
            }
            Some(NullBuffer::new(validity.finish()))
        }
    }

    /// Unravels the validity of a fixed-size-list layer
    ///
    /// Each unraveler's definition levels are first reduced to one level per list
    /// (see [`RepDefUnraveler::decimate`]) before the validity is unraveled.
    pub fn unravel_fsl_validity(
        &mut self,
        num_values: usize,
        dimension: usize,
    ) -> Option<NullBuffer> {
        for unraveler in self.unravelers.iter_mut() {
            unraveler.decimate(dimension);
        }
        self.unravel_validity(num_values)
    }

    /// Unravels a layer of offsets (and the validity for that layer)
    pub fn unravel_offsets<T: ArrowNativeType>(
        &mut self,
    ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
        let mut is_all_valid = true;
        let mut max_num_lists = 0;
        for unraveler in self.unravelers.iter() {
            is_all_valid &= unraveler.is_all_valid();
            max_num_lists += unraveler.max_lists();
        }

        let mut validity = if is_all_valid {
            None
        } else {
            // Note: this capacity is only a hint.  `max_lists` counts repetition levels rather than
            // rows, so it can over-estimate the number of lists.  (TODO: Shouldn't we know the # of rows?)
            Some(BooleanBufferBuilder::new(max_num_lists))
        };

        let mut offsets = Vec::with_capacity(max_num_lists + 1);

        for unraveler in self.unravelers.iter_mut() {
            unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
        }

        Ok((
            OffsetBuffer::new(ScalarBuffer::from(offsets)),
            validity.map(|mut v| NullBuffer::new(v.finish())),
        ))
    }
1541}
1542
1543/// A [`ControlWordIterator`] when there are both repetition and definition levels
1544///
1545/// The iterator will put the repetition level in the upper bits and the definition
1546/// level in the lower bits.  The number of bits used for each level is determined
1547/// by the width of the repetition and definition levels.
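///
/// Here is a minimal sketch of this packing for the one-byte case, written against plain
/// integers rather than this iterator.  The `pack_u8` / `unpack_u8` helpers are
/// hypothetical.  They assume both widths are at least 1 bit and sum to at most 8 bits.
/// The repetition level is shifted left by the definition width, and the definition
/// level occupies the low bits.  The bit ranges do not overlap, so `|` and `+` agree.
///
/// ```rust
/// // Packs a (rep, def) pair into one byte, repetition level in the upper bits
/// fn pack_u8(rep: u16, def: u16, rep_width: u32, def_width: u32) -> u8 {
///     let rep_mask = (1u16 << rep_width) - 1;
///     let def_mask = (1u16 << def_width) - 1;
///     (((rep & rep_mask) << def_width) | (def & def_mask)) as u8
/// }
///
/// // Splits a byte back into (rep, def) given the definition width
/// fn unpack_u8(word: u8, def_width: u32) -> (u16, u16) {
///     let def_mask = (1u8 << def_width) - 1;
///     ((word >> def_width) as u16, (word & def_mask) as u16)
/// }
/// ```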
#[derive(Debug)]
pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
    repdef: I,
    def_width: usize,
    max_rep: u16,
    max_visible_def: u16,
    rep_mask: u16,
    def_mask: u16,
    bits_rep: u8,
    bits_def: u8,
    phantom: std::marker::PhantomData<W>,
}

impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
        let next = self.repdef.next()?;
        let control_word: u8 =
            (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
        buf.push(control_word);
        let is_new_row = next.0 == self.max_rep;
        let is_visible = next.1 <= self.max_visible_def;
        let is_valid_item = next.1 == 0;
        Some(ControlWordDesc {
            is_new_row,
            is_visible,
            is_valid_item,
        })
    }
}

impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
        let next = self.repdef.next()?;
        let control_word: u16 =
            ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
        let control_word = control_word.to_le_bytes();
        buf.push(control_word[0]);
        buf.push(control_word[1]);
        let is_new_row = next.0 == self.max_rep;
        let is_visible = next.1 <= self.max_visible_def;
        let is_valid_item = next.1 == 0;
        Some(ControlWordDesc {
            is_new_row,
            is_visible,
            is_valid_item,
        })
    }
}

impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
        let next = self.repdef.next()?;
        let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
            + ((next.1 & self.def_mask) as u32);
        let control_word = control_word.to_le_bytes();
        buf.push(control_word[0]);
        buf.push(control_word[1]);
        buf.push(control_word[2]);
        buf.push(control_word[3]);
        let is_new_row = next.0 == self.max_rep;
        let is_visible = next.1 <= self.max_visible_def;
        let is_valid_item = next.1 == 0;
        Some(ControlWordDesc {
            is_new_row,
            is_visible,
            is_valid_item,
        })
    }
}

/// A [`ControlWordIterator`] when there are only definition levels or only repetition levels
#[derive(Debug)]
pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
    repdef: I,
    level_mask: u16,
    bits_rep: u8,
    bits_def: u8,
    max_rep: u16,
    phantom: std::marker::PhantomData<W>,
}

impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
        let next = self.repdef.next()?;
        buf.push((next & self.level_mask) as u8);
        let is_new_row = self.max_rep == 0 || next == self.max_rep;
        let is_valid_item = next == 0 || self.bits_def == 0;
        Some(ControlWordDesc {
            is_new_row,
            // A unary iterator has either no repetition levels or no definition levels,
            // and in both cases there are no invisible items
            is_visible: true,
            is_valid_item,
        })
    }
}

impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
        let next = self.repdef.next()? & self.level_mask;
        let control_word = next.to_le_bytes();
        buf.push(control_word[0]);
        buf.push(control_word[1]);
        let is_new_row = self.max_rep == 0 || next == self.max_rep;
        let is_valid_item = next == 0 || self.bits_def == 0;
        Some(ControlWordDesc {
            is_new_row,
            is_visible: true,
            is_valid_item,
        })
    }
}

impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
        let next = self.repdef.next()?;
        let next = (next & self.level_mask) as u32;
        let control_word = next.to_le_bytes();
        buf.push(control_word[0]);
        buf.push(control_word[1]);
        buf.push(control_word[2]);
        buf.push(control_word[3]);
        let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
        let is_valid_item = next == 0 || self.bits_def == 0;
        Some(ControlWordDesc {
            is_new_row,
            is_visible: true,
            is_valid_item,
        })
    }
}

/// A [`ControlWordIterator`] when there are no repetition or definition levels
#[derive(Debug)]
pub struct NilaryControlWordIterator {
    len: usize,
    idx: usize,
}

impl NilaryControlWordIterator {
    fn append_next(&mut self) -> Option<ControlWordDesc> {
        if self.idx == self.len {
            None
        } else {
            self.idx += 1;
            Some(ControlWordDesc {
                is_new_row: true,
                is_visible: true,
                is_valid_item: true,
            })
        }
    }
}

/// Helper function to get a bit mask of the given width
fn get_mask(width: u16) -> u16 {
    // Shift in u32 so that a width of 16 does not overflow
    ((1u32 << width) - 1) as u16
}

// We go out of our way to avoid boxing here because this is called on a per-value basis
// and so it is in the critical path.
type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
    T,
>;

/// An iterator that generates control words from repetition and definition levels
///
/// "Control word" is just a fancy term for a single u8/u16/u32 that contains both
/// the repetition and definition levels.
///
/// In the large majority of cases we only need a single byte to represent both the
/// repetition and definition levels.  However, if there is deep nesting then we may
/// need two bytes.  In the worst case we need four bytes, though this implies hundreds of
/// levels of nesting, which seems unlikely to be encountered in practice.
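///
/// The word-size choice can be sketched as a threshold on the total number of bits.
/// The `word_bytes` helper is hypothetical.  It mirrors the `<= 8` / `<= 16` / otherwise
/// thresholds used by `build_control_word_iterator`.  The zero-bit case follows
/// `ControlWordParser::new`, which uses no control word when neither level has any bits.
///
/// ```rust
/// // Bytes needed for one control word holding `rep_bits + def_bits` bits
/// fn word_bytes(rep_bits: u32, def_bits: u32) -> usize {
///     match rep_bits + def_bits {
///         0 => 0,
///         1..=8 => 1,
///         9..=16 => 2,
///         _ => 4,
///     }
/// }
/// ```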
#[derive(Debug)]
pub enum ControlWordIterator<'a> {
    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
    Nilary(NilaryControlWordIterator),
}

/// Describes the properties of a control word
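///
/// For a control word that carries both levels, the three flags are derived as in the
/// binary iterators above.  Below is a standalone sketch.  The `Desc` struct and `describe`
/// function are hypothetical copies for illustration, not part of this crate's API.
///
/// ```rust
/// #[derive(Debug, PartialEq)]
/// struct Desc {
///     is_new_row: bool,
///     is_visible: bool,
///     is_valid_item: bool,
/// }
///
/// fn describe(rep: u16, def: u16, max_rep: u16, max_visible_def: u16) -> Desc {
///     Desc {
///         // A maxed-out repetition level starts a new row
///         is_new_row: rep == max_rep,
///         // Levels above `max_visible_def` do not correspond to a visible item
///         is_visible: def <= max_visible_def,
///         // Definition level 0 means the item itself is valid
///         is_valid_item: def == 0,
///     }
/// }
/// ```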
#[derive(Debug)]
pub struct ControlWordDesc {
    /// True if this control word starts a new row
    pub is_new_row: bool,
    /// True if the item is visible (its definition level is at most `max_visible_def`)
    pub is_visible: bool,
    /// True if the item itself is valid (its definition level is 0)
    pub is_valid_item: bool,
}

impl ControlWordIterator<'_> {
    /// Appends the next control word to the buffer
    ///
    /// Returns a description of the appended control word, or `None` if there are no
    /// more control words.  `is_new_row` is set when this is the start of a new row
    /// (i.e. the repetition level is maxed out).
    pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
        match self {
            Self::Binary8(iter) => iter.append_next(buf),
            Self::Binary16(iter) => iter.append_next(buf),
            Self::Binary32(iter) => iter.append_next(buf),
            Self::Unary8(iter) => iter.append_next(buf),
            Self::Unary16(iter) => iter.append_next(buf),
            Self::Unary32(iter) => iter.append_next(buf),
            Self::Nilary(iter) => iter.append_next(),
        }
    }

    /// Returns true if the control word iterator has repetition levels
    pub fn has_repetition(&self) -> bool {
        match self {
            Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
            Self::Unary8(iter) => iter.bits_rep > 0,
            Self::Unary16(iter) => iter.bits_rep > 0,
            Self::Unary32(iter) => iter.bits_rep > 0,
            Self::Nilary(_) => false,
        }
    }

    /// Returns the number of bytes per control word
    pub fn bytes_per_word(&self) -> usize {
        match self {
            Self::Binary8(_) => 1,
            Self::Binary16(_) => 2,
            Self::Binary32(_) => 4,
            Self::Unary8(_) => 1,
            Self::Unary16(_) => 2,
            Self::Unary32(_) => 4,
            Self::Nilary(_) => 0,
        }
    }

    /// Returns the number of bits used for the repetition level
    pub fn bits_rep(&self) -> u8 {
        match self {
            Self::Binary8(iter) => iter.bits_rep,
            Self::Binary16(iter) => iter.bits_rep,
            Self::Binary32(iter) => iter.bits_rep,
            Self::Unary8(iter) => iter.bits_rep,
            Self::Unary16(iter) => iter.bits_rep,
            Self::Unary32(iter) => iter.bits_rep,
            Self::Nilary(_) => 0,
        }
    }

    /// Returns the number of bits used for the definition level
    pub fn bits_def(&self) -> u8 {
        match self {
            Self::Binary8(iter) => iter.bits_def,
            Self::Binary16(iter) => iter.bits_def,
            Self::Binary32(iter) => iter.bits_def,
            Self::Unary8(iter) => iter.bits_def,
            Self::Unary16(iter) => iter.bits_def,
            Self::Unary32(iter) => iter.bits_def,
            Self::Nilary(_) => 0,
        }
    }
}

/// Builds a [`ControlWordIterator`] from repetition and definition levels
/// by first calculating the width needed and then creating the iterator
/// with the appropriate width
///
/// `len` is only used when there are neither repetition nor definition levels, in
/// which case it is the number of control word descriptions to produce.
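///
/// The width and mask computation can be sketched as below.  The sketch assumes a level
/// in `0..=max_level` needs just enough bits to represent `max_level`.  The `level_width`
/// and `level_mask` helpers are hypothetical and are not claimed to match `log_2_ceil`
/// or `get_mask` exactly.
///
/// ```rust
/// // Bits needed to store any level in 0..=max_level (0 when max_level is 0)
/// fn level_width(max_level: u16) -> u32 {
///     u16::BITS - max_level.leading_zeros()
/// }
///
/// // Mask with the low `width` bits set, computed in u32 so a width of 16 cannot overflow
/// fn level_mask(width: u32) -> u16 {
///     ((1u32 << width) - 1) as u16
/// }
/// ```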
pub fn build_control_word_iterator<'a>(
    rep: Option<&'a [u16]>,
    max_rep: u16,
    def: Option<&'a [u16]>,
    max_def: u16,
    max_visible_def: u16,
    len: usize,
) -> ControlWordIterator<'a> {
    let rep_width = if max_rep == 0 {
        0
    } else {
        log_2_ceil(max_rep as u32) as u16
    };
    let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
    let def_width = if max_def == 0 {
        0
    } else {
        log_2_ceil(max_def as u32) as u16
    };
    let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
    let total_width = rep_width + def_width;
    match (rep, def) {
        (Some(rep), Some(def)) => {
            let iter = rep.iter().copied().zip(def.iter().copied());
            let def_width = def_width as usize;
            if total_width <= 8 {
                ControlWordIterator::Binary8(BinaryControlWordIterator {
                    repdef: iter,
                    rep_mask,
                    def_mask,
                    def_width,
                    max_rep,
                    max_visible_def,
                    bits_rep: rep_width as u8,
                    bits_def: def_width as u8,
                    phantom: std::marker::PhantomData,
                })
            } else if total_width <= 16 {
                ControlWordIterator::Binary16(BinaryControlWordIterator {
                    repdef: iter,
                    rep_mask,
                    def_mask,
                    def_width,
                    max_rep,
                    max_visible_def,
                    bits_rep: rep_width as u8,
                    bits_def: def_width as u8,
                    phantom: std::marker::PhantomData,
                })
            } else {
                ControlWordIterator::Binary32(BinaryControlWordIterator {
                    repdef: iter,
                    rep_mask,
                    def_mask,
                    def_width,
                    max_rep,
                    max_visible_def,
                    bits_rep: rep_width as u8,
                    bits_def: def_width as u8,
                    phantom: std::marker::PhantomData,
                })
            }
        }
        (Some(lev), None) => {
            let iter = lev.iter().copied();
            if total_width <= 8 {
                ControlWordIterator::Unary8(UnaryControlWordIterator {
                    repdef: iter,
                    level_mask: rep_mask,
                    bits_rep: total_width as u8,
                    bits_def: 0,
                    max_rep,
                    phantom: std::marker::PhantomData,
                })
            } else if total_width <= 16 {
                ControlWordIterator::Unary16(UnaryControlWordIterator {
                    repdef: iter,
                    level_mask: rep_mask,
                    bits_rep: total_width as u8,
                    bits_def: 0,
                    max_rep,
                    phantom: std::marker::PhantomData,
                })
            } else {
                ControlWordIterator::Unary32(UnaryControlWordIterator {
                    repdef: iter,
                    level_mask: rep_mask,
                    bits_rep: total_width as u8,
                    bits_def: 0,
                    max_rep,
                    phantom: std::marker::PhantomData,
                })
            }
        }
        (None, Some(lev)) => {
            let iter = lev.iter().copied();
            if total_width <= 8 {
                ControlWordIterator::Unary8(UnaryControlWordIterator {
                    repdef: iter,
                    level_mask: def_mask,
                    bits_rep: 0,
                    bits_def: total_width as u8,
                    max_rep: 0,
                    phantom: std::marker::PhantomData,
                })
            } else if total_width <= 16 {
                ControlWordIterator::Unary16(UnaryControlWordIterator {
                    repdef: iter,
                    level_mask: def_mask,
                    bits_rep: 0,
                    bits_def: total_width as u8,
                    max_rep: 0,
                    phantom: std::marker::PhantomData,
                })
            } else {
                ControlWordIterator::Unary32(UnaryControlWordIterator {
                    repdef: iter,
                    level_mask: def_mask,
                    bits_rep: 0,
                    bits_def: total_width as u8,
                    max_rep: 0,
                    phantom: std::marker::PhantomData,
                })
            }
        }
        (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
    }
}

/// A parser to unwrap control words into repetition and definition levels
///
/// This is the inverse of the [`ControlWordIterator`].
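///
/// A round trip through a two-byte control word can be sketched as follows.  The
/// `encode_u16` / `decode_u16` helpers are hypothetical.  They assume `rep` and `def`
/// already fit in their widths, and they mirror the shift-and-mask logic used here.  The
/// writer stores `(rep << def_width) | def` little-endian.  The parser shifts right by the
/// definition width and masks off the low bits.
///
/// ```rust
/// fn encode_u16(rep: u16, def: u16, def_width: u32) -> [u8; 2] {
///     ((rep << def_width) | def).to_le_bytes()
/// }
///
/// fn decode_u16(src: &[u8], def_width: u32) -> (u16, u16) {
///     // Control words are stored little-endian
///     let word = u16::from_le_bytes([src[0], src[1]]);
///     let def_mask = (1u16 << def_width) - 1;
///     (word >> def_width, word & def_mask)
/// }
/// ```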
#[derive(Copy, Clone, Debug)]
pub enum ControlWordParser {
    // First item is the bits to shift, second is the mask to apply (the mask can be
    // calculated from the bits to shift but we don't want to calculate it each time)
    BOTH8(u8, u32),
    BOTH16(u8, u32),
    BOTH32(u8, u32),
    REP8,
    REP16,
    REP32,
    DEF8,
    DEF16,
    DEF32,
    NIL,
}

impl ControlWordParser {
    fn parse_both<const WORD_SIZE: u8>(
        src: &[u8],
        dst_rep: &mut Vec<u16>,
        dst_def: &mut Vec<u16>,
        bits_to_shift: u8,
        mask_to_apply: u32,
    ) {
        match WORD_SIZE {
            1 => {
                let word = src[0];
                let rep = word >> bits_to_shift;
                let def = word & (mask_to_apply as u8);
                dst_rep.push(rep as u16);
                dst_def.push(def as u16);
            }
            2 => {
                let word = u16::from_le_bytes([src[0], src[1]]);
                let rep = word >> bits_to_shift;
                let def = word & mask_to_apply as u16;
                dst_rep.push(rep);
                dst_def.push(def);
            }
            4 => {
                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
                let rep = word >> bits_to_shift;
                let def = word & mask_to_apply;
                dst_rep.push(rep as u16);
                dst_def.push(def as u16);
            }
            _ => unreachable!(),
        }
    }

    fn parse_desc_both<const WORD_SIZE: u8>(
        src: &[u8],
        bits_to_shift: u8,
        mask_to_apply: u32,
        max_rep: u16,
        max_visible_def: u16,
    ) -> ControlWordDesc {
        match WORD_SIZE {
            1 => {
                let word = src[0];
                let rep = word >> bits_to_shift;
                let def = word & (mask_to_apply as u8);
                let is_visible = def as u16 <= max_visible_def;
                let is_new_row = rep as u16 == max_rep;
                let is_valid_item = def == 0;
                ControlWordDesc {
                    is_visible,
                    is_new_row,
                    is_valid_item,
                }
            }
            2 => {
                let word = u16::from_le_bytes([src[0], src[1]]);
                let rep = word >> bits_to_shift;
                let def = word & mask_to_apply as u16;
                let is_visible = def <= max_visible_def;
                let is_new_row = rep == max_rep;
                let is_valid_item = def == 0;
                ControlWordDesc {
                    is_visible,
                    is_new_row,
                    is_valid_item,
                }
            }
            4 => {
                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
                let rep = word >> bits_to_shift;
                let def = word & mask_to_apply;
                let is_visible = def as u16 <= max_visible_def;
                let is_new_row = rep as u16 == max_rep;
                let is_valid_item = def == 0;
                ControlWordDesc {
                    is_visible,
                    is_new_row,
                    is_valid_item,
                }
            }
            _ => unreachable!(),
        }
    }

    fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
        match WORD_SIZE {
            1 => {
                let word = src[0];
                dst.push(word as u16);
            }
            2 => {
                let word = u16::from_le_bytes([src[0], src[1]]);
                dst.push(word);
            }
            4 => {
                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
                dst.push(word as u16);
            }
            _ => unreachable!(),
        }
    }

    fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
        match WORD_SIZE {
            1 => ControlWordDesc {
                is_new_row: src[0] as u16 == max_rep,
                is_visible: true,
                is_valid_item: true,
            },
            2 => ControlWordDesc {
                is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
                is_visible: true,
                is_valid_item: true,
            },
            4 => ControlWordDesc {
                is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
                is_visible: true,
                is_valid_item: true,
            },
            _ => unreachable!(),
        }
    }

    fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
        match WORD_SIZE {
            1 => ControlWordDesc {
                is_new_row: true,
                is_visible: true,
                is_valid_item: src[0] == 0,
            },
            2 => ControlWordDesc {
                is_new_row: true,
                is_visible: true,
                is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
            },
            4 => ControlWordDesc {
                is_new_row: true,
                is_visible: true,
                is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
            },
            _ => unreachable!(),
        }
    }

    /// Returns the number of bytes per control word
    pub fn bytes_per_word(&self) -> usize {
        match self {
            Self::BOTH8(..) => 1,
            Self::BOTH16(..) => 2,
            Self::BOTH32(..) => 4,
            Self::REP8 => 1,
            Self::REP16 => 2,
            Self::REP32 => 4,
            Self::DEF8 => 1,
            Self::DEF16 => 2,
            Self::DEF32 => 4,
            Self::NIL => 0,
        }
    }

    /// Appends the next control word to the rep & def buffers
    ///
    /// `src` should be pointing at the first byte (little endian) of the control word
    ///
    /// `dst_rep` and `dst_def` are the buffers to append the rep and def levels to.
    /// They will not be appended to if not needed.
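    ///
    /// Every control word has the same size, so a buffer of words can be decoded by
    /// advancing `src` by `bytes_per_word()` after each call.  Below is a self-contained
    /// sketch of that loop for one-byte words carrying both levels.  The `parse_all_u8`
    /// function is hypothetical and assumes `def_width` is between 1 and 7.
    ///
    /// ```rust
    /// fn parse_all_u8(buf: &[u8], def_width: u32) -> (Vec<u16>, Vec<u16>) {
    ///     let def_mask = (1u8 << def_width) - 1;
    ///     let mut rep = Vec::with_capacity(buf.len());
    ///     let mut def = Vec::with_capacity(buf.len());
    ///     // One byte per control word: repetition in the upper bits, definition in the lower
    ///     for &word in buf {
    ///         rep.push((word >> def_width) as u16);
    ///         def.push((word & def_mask) as u16);
    ///     }
    ///     (rep, def)
    /// }
    /// ```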
    pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
        match self {
            Self::BOTH8(bits_to_shift, mask_to_apply) => {
                Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
            }
            Self::BOTH16(bits_to_shift, mask_to_apply) => {
                Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
            }
            Self::BOTH32(bits_to_shift, mask_to_apply) => {
                Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
            }
            Self::REP8 => Self::parse_one::<1>(src, dst_rep),
            Self::REP16 => Self::parse_one::<2>(src, dst_rep),
            Self::REP32 => Self::parse_one::<4>(src, dst_rep),
            Self::DEF8 => Self::parse_one::<1>(src, dst_def),
            Self::DEF16 => Self::parse_one::<2>(src, dst_def),
            Self::DEF32 => Self::parse_one::<4>(src, dst_def),
            Self::NIL => {}
        }
    }

    /// Returns true if the control words contain repetition information
    pub fn has_rep(&self) -> bool {
        match self {
            Self::BOTH8(..)
            | Self::BOTH16(..)
            | Self::BOTH32(..)
            | Self::REP8
            | Self::REP16
            | Self::REP32 => true,
            Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
        }
    }

    /// Parses the control word to inspect its properties without appending to any buffers
2162    pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
2163        match self {
2164            Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
2165                src,
2166                *bits_to_shift,
2167                *mask_to_apply,
2168                max_rep,
2169                max_visible_def,
2170            ),
2171            Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
2172                src,
2173                *bits_to_shift,
2174                *mask_to_apply,
2175                max_rep,
2176                max_visible_def,
2177            ),
2178            Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
2179                src,
2180                *bits_to_shift,
2181                *mask_to_apply,
2182                max_rep,
2183                max_visible_def,
2184            ),
2185            Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
2186            Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
2187            Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
2188            Self::DEF8 => Self::parse_def_desc_one::<1>(src),
2189            Self::DEF16 => Self::parse_def_desc_one::<2>(src),
2190            Self::DEF32 => Self::parse_def_desc_one::<4>(src),
2191            Self::NIL => ControlWordDesc {
2192                is_new_row: true,
2193                is_valid_item: true,
2194                is_visible: true,
2195            },
2196        }
2197    }
2198
2199    /// Creates a new parser from the number of bits used for the repetition and definition levels
2200    pub fn new(bits_rep: u8, bits_def: u8) -> Self {
2201        let total_bits = bits_rep + bits_def;
2202
2203        enum WordSize {
2204            One,
2205            Two,
2206            Four,
2207        }
2208
2209        let word_size = if total_bits <= 8 {
2210            WordSize::One
2211        } else if total_bits <= 16 {
2212            WordSize::Two
2213        } else {
2214            WordSize::Four
2215        };
2216
2217        match (bits_rep > 0, bits_def > 0, word_size) {
2218            (false, false, _) => Self::NIL,
2219            (false, true, WordSize::One) => Self::DEF8,
2220            (false, true, WordSize::Two) => Self::DEF16,
2221            (false, true, WordSize::Four) => Self::DEF32,
2222            (true, false, WordSize::One) => Self::REP8,
2223            (true, false, WordSize::Two) => Self::REP16,
2224            (true, false, WordSize::Four) => Self::REP32,
2225            (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
2226            (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
2227            (true, true, WordSize::Four) => {
2228                Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
2229            }
2230        }
2231    }
2232}
2233
2234#[cfg(test)]
2235mod tests {
2236    use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
2237
2238    use crate::repdef::{
2239        CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
2240    };
2241
2242    use super::RepDefBuilder;
2243
2244    fn validity(values: &[bool]) -> NullBuffer {
2245        NullBuffer::from_iter(values.iter().copied())
2246    }
2247
2248    fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
2249        OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
2250    }
2251
2252    fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
2253        OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
2254    }
2255
2256    #[test]
2257    fn test_repdef_basic() {
        // Basic case, rep & def
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 2, 2, 5]),
            Some(validity(&[true, false, true])),
        );
        builder.add_offsets(
            offsets_64(&[0, 1, 3, 5, 5, 9]),
            Some(validity(&[true, true, true, false, true])),
        );
        builder.add_validity_bitmap(validity(&[
            true, true, true, false, false, false, true, true, false,
        ]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
        assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);

        // Logical value (I = valid item):
        // [[I], [I, I]], NULL, [[NULL, NULL], NULL, [NULL, I, I, NULL]]

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
        )]);

        // Note: in general validity doesn't exactly round-trip because repdef normalizes
        // redundant validity values (e.g. entries under a null parent).  This input has
        // no such entries, so the original bitmap comes back unchanged
        assert_eq!(
            unraveler.unravel_validity(9),
            Some(validity(&[
                true, true, true, false, false, false, true, true, false
            ]))
        );
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 5, 9]).inner());
        assert_eq!(val, Some(validity(&[true, true, true, false, true])));
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 2, 2, 5]).inner());
        assert_eq!(val, Some(validity(&[true, false, true])));
    }

    #[test]
    fn test_repdef_simple_null_empty_list() {
        let check = |repdefs: SerializedRepDefs, last_def: DefinitionInterpretation| {
            let rep = repdefs.repetition_levels.unwrap();
            let def = repdefs.definition_levels.unwrap();

            assert_eq!([1, 0, 1, 1, 0, 0], *rep);
            assert_eq!([0, 0, 2, 0, 1, 0], *def);
            assert_eq!(
                vec![DefinitionInterpretation::NullableItem, last_def],
                repdefs.def_meaning
            );
        };

        // A null list and an empty list should serialize to the same levels; only the
        // def meaning of the list layer differs

        // Null case
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 2, 2, 5]),
            Some(validity(&[true, false, true])),
        );
        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        check(repdefs, DefinitionInterpretation::NullableList);

        // Empty case
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0, 2, 2, 5]), None);
        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        check(repdefs, DefinitionInterpretation::EmptyableList);
    }

    #[test]
    fn test_repdef_empty_list_at_end() {
        // Regresses a failure we encountered when the last list was empty
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 1, 0, 0, 1], *rep);
        assert_eq!([0, 0, 0, 1, 0, 2], *def);
        assert_eq!(
            vec![
                DefinitionInterpretation::NullableItem,
                DefinitionInterpretation::EmptyableList,
            ],
            repdefs.def_meaning
        );
    }

    #[test]
    fn test_repdef_abnormal_nulls() {
        // Null lists are allowed to have non-empty offsets and garbage values underneath
        // them, and the add_offsets call should normalize this
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 2, 5, 8]),
            Some(validity(&[true, false, true])),
        );
        // Note: we pass 5 here and not 8.  If add_offsets reports garbage values under
        // null lists, they should be removed before continuing
        builder.add_no_null(5);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
        assert_eq!([0, 0, 1, 0, 0, 0], *def);

        assert_eq!(
            vec![
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableList,
            ],
            repdefs.def_meaning
        );
    }

    #[test]
    fn test_repdef_fsl() {
        let mut builder = RepDefBuilder::default();
        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
        builder.add_fsl(None, 2, 4);
        builder.add_validity_bitmap(validity(&[
            true, false, true, false, true, false, true, false,
        ]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        assert_eq!(
            vec![
                DefinitionInterpretation::NullableItem,
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableItem
            ],
            repdefs.def_meaning
        );

        assert!(repdefs.repetition_levels.is_none());

        let def = repdefs.definition_levels.unwrap();

        assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            None,
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
        )]);

        assert_eq!(
            unraveler.unravel_validity(8),
            Some(validity(&[
                true, false, true, false, false, false, false, false
            ]))
        );
        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
        assert_eq!(
            unraveler.unravel_fsl_validity(2, 2),
            Some(validity(&[true, false]))
        );
    }

    #[test]
    fn test_repdef_fsl_allvalid_item() {
        let mut builder = RepDefBuilder::default();
        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
        builder.add_fsl(None, 2, 4);
        builder.add_no_null(8);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        assert_eq!(
            vec![
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableItem
            ],
            repdefs.def_meaning
        );

        assert!(repdefs.repetition_levels.is_none());

        let def = repdefs.definition_levels.unwrap();

        assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            None,
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
        )]);

        assert_eq!(unraveler.unravel_validity(8), None);
        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
        assert_eq!(
            unraveler.unravel_fsl_validity(2, 2),
            Some(validity(&[true, false]))
        );
    }
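
    // The two FSL tests above expect each item's def level to name the outermost null
    // layer, numbering only the nullable layers from the inside out (null item = 1 and
    // null outer FSL = 2 in `test_repdef_fsl`; all-valid layers get no level).  Below is
    // a minimal, hypothetical sketch of that rule, not the `RepDefBuilder` code: each
    // FSL layer's validity is repeated over the items it covers and the outermost null
    // wins.  `sketch_fsl_def_levels` and its argument layout are assumptions.
    fn sketch_fsl_def_levels(
        // Outermost-first (validity, dimension) pairs; `None` means all valid
        fsl_layers: &[(Option<Vec<bool>>, usize)],
        item_validity: Option<&[bool]>,
        num_items: usize,
    ) -> Vec<u16> {
        // One bool per item for every nullable layer, outermost first
        let mut expanded: Vec<Vec<bool>> = Vec::new();
        for (idx, (validity, _)) in fsl_layers.iter().enumerate() {
            if let Some(validity) = validity {
                // Items covered by one entry = product of this and all inner dimensions
                let per_entry: usize = fsl_layers[idx..].iter().map(|(_, dim)| *dim).product();
                expanded.push(
                    validity
                        .iter()
                        .flat_map(|&valid| std::iter::repeat(valid).take(per_entry))
                        .collect(),
                );
            }
        }
        if let Some(items) = item_validity {
            expanded.push(items.to_vec());
        }
        let num_nullable = expanded.len();
        (0..num_items)
            .map(|i| {
                // Outermost null layer (if any) mapped to its inside-out level number
                expanded
                    .iter()
                    .position(|layer| !layer[i])
                    .map_or(0, |pos| (num_nullable - pos) as u16)
            })
            .collect()
    }

    #[test]
    fn test_sketch_fsl_def_levels() {
        // Same inputs as test_repdef_fsl and test_repdef_fsl_allvalid_item
        let fsl_layers: [(Option<Vec<bool>>, usize); 2] =
            [(Some(vec![true, false]), 2), (None, 2)];
        let items = [true, false, true, false, true, false, true, false];
        assert_eq!(
            sketch_fsl_def_levels(&fsl_layers, Some(&items[..]), 8),
            vec![0, 1, 0, 1, 2, 2, 2, 2]
        );
        assert_eq!(
            sketch_fsl_def_levels(&fsl_layers, None, 8),
            vec![0, 0, 0, 0, 1, 1, 1, 1]
        );
    }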

    #[test]
    fn test_repdef_sliced_offsets() {
        // Sliced lists may have offsets that don't start with zero.  The
        // add_offsets call needs to normalize these to operate correctly.
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[5, 7, 7, 10]),
            Some(validity(&[true, false, true])),
        );
        builder.add_no_null(5);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
        assert_eq!([0, 0, 1, 0, 0, 0], *def);

        assert_eq!(
            vec![
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableList,
            ],
            repdefs.def_meaning
        );
    }
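
    // The single-layer list tests above share one pattern: every list emits a rep level
    // of 1 at its first entry, a null or empty list emits exactly one level (items under
    // a null list are dropped), and def codes are assigned from the inside out (null
    // item, then null list, then empty list), with a code only reserved when that layer
    // has a validity bitmap.  This is a hypothetical sketch written to match those
    // expectations, not the `RepDefBuilder` implementation; the function name and the
    // code-reservation rule are assumptions.  `item_validity` is indexed by absolute
    // item position, so sliced offsets (not starting at zero) need no extra handling.
    fn sketch_single_list_repdef(
        offsets: &[i32],
        list_validity: Option<&[bool]>,
        item_validity: Option<&[bool]>,
    ) -> (Vec<u16>, Vec<u16>) {
        // Reserve def codes from the innermost layer outwards
        let mut next_code = 1u16;
        let null_item_code = next_code;
        if item_validity.is_some() {
            next_code += 1;
        }
        let null_list_code = next_code;
        if list_validity.is_some() {
            next_code += 1;
        }
        let empty_list_code = next_code;

        let (mut rep, mut def) = (Vec::new(), Vec::new());
        for (list_idx, window) in offsets.windows(2).enumerate() {
            let (start, end) = (window[0] as usize, window[1] as usize);
            if !list_validity.map_or(true, |v| v[list_idx]) {
                // Null list: one level; any garbage items under it are skipped
                rep.push(1);
                def.push(null_list_code);
            } else if start == end {
                // Empty (but valid) list: one level that carries no item
                rep.push(1);
                def.push(empty_list_code);
            } else {
                for item in start..end {
                    rep.push(u16::from(item == start));
                    let item_valid = item_validity.map_or(true, |v| v[item]);
                    def.push(if item_valid { 0 } else { null_item_code });
                }
            }
        }
        (rep, def)
    }

    #[test]
    fn test_sketch_single_list_repdef() {
        // Same inputs as test_repdef_abnormal_nulls and test_repdef_sliced_offsets
        let lists = [true, false, true];
        let expected = (vec![1, 0, 1, 1, 0, 0], vec![0, 0, 1, 0, 0, 0]);
        assert_eq!(
            sketch_single_list_repdef(&[0, 2, 5, 8], Some(&lists[..]), None),
            expected
        );
        assert_eq!(
            sketch_single_list_repdef(&[5, 7, 7, 10], Some(&lists[..]), None),
            expected
        );
        // Same inputs as the null case of test_repdef_simple_null_empty_list
        let items = [true, true, true, false, true];
        assert_eq!(
            sketch_single_list_repdef(&[0, 2, 2, 5], Some(&lists[..]), Some(&items[..])),
            (vec![1, 0, 1, 1, 0, 0], vec![0, 0, 2, 0, 1, 0])
        );
    }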

    #[test]
    fn test_repdef_complex_null_empty() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 4, 4, 4, 6]),
            Some(validity(&[true, false, true, true])),
        );
        builder.add_offsets(
            offsets_32(&[0, 1, 1, 2, 2, 2, 3]),
            Some(validity(&[true, false, true, false, true, true])),
        );
        builder.add_no_null(3);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep);
        assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def);
    }

    #[test]
    fn test_repdef_empty_list_no_null() {
        // Tests the case with some empty lists but no null lists.  This case caused
        // bugs because we have definition levels but no nulls
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
        builder.add_no_null(6);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
        )]);

        assert_eq!(unraveler.unravel_validity(6), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
        assert_eq!(val, None);
    }

    #[test]
    fn test_repdef_all_valid() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
        builder.add_no_null(9);

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        let rep = repdefs.repetition_levels.unwrap();
        assert!(repdefs.definition_levels.is_none());

        assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            None,
            repdefs.def_meaning.into(),
        )]);

        assert_eq!(unraveler.unravel_validity(9), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 7, 9]).inner());
        assert_eq!(val, None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 2, 3, 5]).inner());
        assert_eq!(val, None);
    }
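
    // For nested lists with no nulls and no empty lists, `test_repdef_all_valid` shows
    // exactly one level per item, where each rep level is the outermost list layer that
    // starts at that item (2 = new outer list, 1 = new inner list, 0 = continuation).
    // The hypothetical sketch below (not Lance's algorithm) computes this by mapping each
    // layer's list starts down to item positions through the inner offsets.  It assumes
    // every list is valid and non-empty; null and empty lists need the extra levels seen
    // in the other tests.
    fn sketch_nested_rep_levels(layers: &[Vec<usize>], num_items: usize) -> Vec<u16> {
        // `layers` is outermost-first; layers[k] indexes into the lists of layer k + 1
        // (or into the items, for the innermost layer)
        let depth = layers.len();
        let mut rep = vec![0u16; num_items];
        for (k, offsets) in layers.iter().enumerate() {
            // The outermost layer gets the highest level
            let level = (depth - k) as u16;
            for list_start in offsets.windows(2).map(|w| w[0]) {
                // Follow the inner offsets down to the item where this list begins
                let item_pos = layers[k + 1..]
                    .iter()
                    .fold(list_start, |pos, inner| inner[pos]);
                rep[item_pos] = rep[item_pos].max(level);
            }
        }
        rep
    }

    #[test]
    fn test_sketch_nested_rep_levels() {
        // Same offsets as test_repdef_all_valid and the second page of
        // test_composite_unravel
        assert_eq!(
            sketch_nested_rep_levels(&[vec![0, 2, 3, 5], vec![0, 1, 3, 5, 7, 9]], 9),
            vec![2, 1, 0, 2, 0, 2, 0, 1, 0]
        );
        assert_eq!(
            sketch_nested_rep_levels(&[vec![0, 1, 3, 5, 7, 9]], 9),
            vec![1, 1, 0, 1, 0, 1, 0, 1, 0]
        );
    }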

    #[test]
    fn test_only_empty_lists() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
        builder.add_no_null(6);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
        )]);

        assert_eq!(unraveler.unravel_validity(6), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
        assert_eq!(val, None);
    }

    #[test]
    fn test_only_null_lists() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 4, 4, 4, 6]),
            Some(validity(&[true, false, false, true])),
        );
        builder.add_no_null(6);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
        )]);

        assert_eq!(unraveler.unravel_validity(6), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
        assert_eq!(val, Some(validity(&[true, false, false, true])));
    }

    #[test]
    fn test_null_and_empty_lists() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 4, 4, 4, 6]),
            Some(validity(&[true, false, true, true])),
        );
        builder.add_no_null(6);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
        assert_eq!([0, 0, 0, 0, 1, 2, 0, 0], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
        )]);

        assert_eq!(unraveler.unravel_validity(6), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
        assert_eq!(val, Some(validity(&[true, false, true, true])));
    }

    #[test]
    fn test_repdef_no_rep() {
        let mut builder = RepDefBuilder::default();
        builder.add_no_null(5);
        builder.add_validity_bitmap(validity(&[false, false, true, true, true]));
        builder.add_validity_bitmap(validity(&[false, true, true, true, false]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        assert!(repdefs.repetition_levels.is_none());
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 2, 0, 0, 1], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            None,
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
        )]);

        assert_eq!(
            unraveler.unravel_validity(5),
            Some(validity(&[false, false, true, true, false]))
        );
        assert_eq!(
            unraveler.unravel_validity(5),
            Some(validity(&[false, false, true, true, true]))
        );
        assert_eq!(unraveler.unravel_validity(5), None);
    }
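
    // In `test_repdef_no_rep` the layers are added outermost-first (an all-valid layer,
    // then two nullable struct layers), and each def level records the outermost null
    // layer, numbering nullable layers from the inside out (1 = innermost).  Unraveling
    // the layer numbered `k` then only needs `def < k`: a null at that layer or any outer
    // layer makes the entry null, which is the normalization that turns the inner bitmap
    // [false, true, true, true, false] into [false, false, true, true, false].  This is a
    // hypothetical sketch of that mapping; both function names are assumptions.
    fn sketch_struct_def_levels(layers: &[Option<Vec<bool>>], len: usize) -> Vec<u16> {
        // Only nullable layers get a def level; keep them outermost-first
        let nullable: Vec<&Vec<bool>> = layers.iter().flatten().collect();
        (0..len)
            .map(|i| {
                nullable
                    .iter()
                    .position(|layer| !layer[i])
                    .map_or(0, |pos| (nullable.len() - pos) as u16)
            })
            .collect()
    }

    fn sketch_unravel_struct_validity(def: &[u16], layer_level: u16) -> Vec<bool> {
        // Valid only if neither this layer nor any outer layer is null
        def.iter().map(|&d| d < layer_level).collect()
    }

    #[test]
    fn test_sketch_struct_def_levels() {
        // Same layers as test_repdef_no_rep
        let layers = [
            None,
            Some(vec![false, false, true, true, true]),
            Some(vec![false, true, true, true, false]),
        ];
        let def = sketch_struct_def_levels(&layers, 5);
        assert_eq!(def, vec![2, 2, 0, 0, 1]);
        assert_eq!(
            sketch_unravel_struct_validity(&def, 1),
            vec![false, false, true, true, false]
        );
        assert_eq!(
            sketch_unravel_struct_validity(&def, 2),
            vec![false, false, true, true, true]
        );
    }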

    #[test]
    fn test_composite_unravel() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 2, 2, 5]),
            Some(validity(&[true, false, true])),
        );
        builder.add_no_null(5);
        let repdef1 = RepDefBuilder::serialize(vec![builder]);

        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
        builder.add_no_null(9);
        let repdef2 = RepDefBuilder::serialize(vec![builder]);

        let rep1 = repdef1.repetition_levels.clone().unwrap();
        let def1 = repdef1.definition_levels.clone().unwrap();
        let rep2 = repdef2.repetition_levels.clone().unwrap();
        assert!(repdef2.definition_levels.is_none());

        assert_eq!([1, 0, 1, 1, 0, 0], *rep1);
        assert_eq!([0, 0, 1, 0, 0, 0], *def1);
        assert_eq!([1, 1, 0, 1, 0, 1, 0, 1, 0], *rep2);

        let unravel1 = RepDefUnraveler::new(
            repdef1.repetition_levels.map(|l| l.to_vec()),
            repdef1.definition_levels.map(|l| l.to_vec()),
            repdef1.def_meaning.into(),
        );
        let unravel2 = RepDefUnraveler::new(
            repdef2.repetition_levels.map(|l| l.to_vec()),
            repdef2.definition_levels.map(|l| l.to_vec()),
            repdef2.def_meaning.into(),
        );

        let mut unraveler = CompositeRepDefUnraveler::new(vec![unravel1, unravel2]);

        assert!(unraveler.unravel_validity(9).is_none());
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(
            off.inner(),
            offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]).inner()
        );
        assert_eq!(
            val,
            Some(validity(&[true, false, true, true, true, true, true, true]))
        );
    }
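
    // `test_composite_unravel` expects the offsets of each page to be stitched together
    // by shifting every later page by the last offset of the pages before it, and
    // (because one page has list nulls) the missing validity of the all-valid page to be
    // filled in with `true`.  The sketch below is a hypothetical illustration of that
    // concatenation over plain vectors, not `CompositeRepDefUnraveler` itself; the name
    // `sketch_concat_offsets` and the (offsets, validity) page layout are assumptions.
    fn sketch_concat_offsets(
        pages: &[(Vec<i32>, Option<Vec<bool>>)],
    ) -> (Vec<i32>, Option<Vec<bool>>) {
        let mut offsets = vec![0];
        let any_nulls = pages.iter().any(|(_, validity)| validity.is_some());
        let mut validity = Vec::new();
        for (page_offsets, page_validity) in pages {
            let base = *offsets.last().unwrap();
            let first = page_offsets[0];
            // Drop each page's leading offset and rebase the rest onto the running end
            offsets.extend(page_offsets[1..].iter().map(|&o| base + (o - first)));
            let num_lists = page_offsets.len() - 1;
            match page_validity {
                Some(v) => validity.extend_from_slice(v),
                None => validity.extend(std::iter::repeat(true).take(num_lists)),
            }
        }
        (offsets, if any_nulls { Some(validity) } else { None })
    }

    #[test]
    fn test_sketch_concat_offsets() {
        // Same pages as test_composite_unravel
        let pages = [
            (vec![0, 2, 2, 5], Some(vec![true, false, true])),
            (vec![0, 1, 3, 5, 7, 9], None),
        ];
        let (offsets, validity) = sketch_concat_offsets(&pages);
        assert_eq!(offsets, vec![0, 2, 2, 5, 6, 8, 10, 12, 14]);
        assert_eq!(
            validity,
            Some(vec![true, false, true, true, true, true, true, true])
        );
    }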

    #[test]
    fn test_repdef_multiple_builders() {
        // Same data as the basic rep & def case, split across two builders
        let mut builder1 = RepDefBuilder::default();
        builder1.add_offsets(offsets_64(&[0, 2]), None);
        builder1.add_offsets(offsets_64(&[0, 1, 3]), None);
        builder1.add_validity_bitmap(validity(&[true, true, true]));

        let mut builder2 = RepDefBuilder::default();
        builder2.add_offsets(offsets_64(&[0, 0, 3]), Some(validity(&[false, true])));
        builder2.add_offsets(
            offsets_64(&[0, 2, 2, 6]),
            Some(validity(&[true, false, true])),
        );
        builder2.add_validity_bitmap(validity(&[false, false, false, true, true, false]));

        let repdefs = RepDefBuilder::serialize(vec![builder1, builder2]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
        assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
    }

    #[test]
    fn test_slicer() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 2, 2, 30, 30]),
            Some(validity(&[true, false, true, true])),
        );
        builder.add_no_null(30);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let mut rep_slicer = repdefs.rep_slicer().unwrap();

        // First 5 items include a null list so we get 6 levels (12 bytes)
        assert_eq!(rep_slicer.slice_next(5).len(), 12);
        // Next 20 are all plain
        assert_eq!(rep_slicer.slice_next(20).len(), 40);
        // Last 5 include an empty list so we get 6 levels (12 bytes)
        assert_eq!(rep_slicer.slice_rest().len(), 12);

        let mut def_slicer = repdefs.def_slicer().unwrap();

        // First 5 items include a null list so we get 6 levels (12 bytes)
        assert_eq!(def_slicer.slice_next(5).len(), 12);
        // Next 20 are all plain
        assert_eq!(def_slicer.slice_next(20).len(), 40);
        // Last 5 include an empty list so we get 6 levels (12 bytes)
        assert_eq!(def_slicer.slice_rest().len(), 12);
    }

    #[test]
    fn test_control_words() {
        // Convert to control words, verify expected, convert back, verify same as original
        fn check(
            rep: &[u16],
            def: &[u16],
            expected_values: Vec<u8>,
            expected_bytes_per_word: usize,
            expected_bits_rep: u8,
            expected_bits_def: u8,
        ) {
            let num_vals = rep.len().max(def.len());
            let max_rep = rep.iter().max().copied().unwrap_or(0);
            let max_def = def.iter().max().copied().unwrap_or(0);

            let in_rep = if rep.is_empty() { None } else { Some(rep) };
            let in_def = if def.is_empty() { None } else { Some(def) };

            let mut iter = super::build_control_word_iterator(
                in_rep,
                max_rep,
                in_def,
                max_def,
                max_def + 1,
                expected_values.len(),
            );
            assert_eq!(iter.bytes_per_word(), expected_bytes_per_word);
            assert_eq!(iter.bits_rep(), expected_bits_rep);
            assert_eq!(iter.bits_def(), expected_bits_def);
            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());

            for _ in 0..num_vals {
                iter.append_next(&mut cw_vec);
            }
            assert!(iter.append_next(&mut cw_vec).is_none());

            assert_eq!(expected_values, cw_vec);

            let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);

            let mut rep_out = Vec::with_capacity(num_vals);
            let mut def_out = Vec::with_capacity(num_vals);

            if expected_bytes_per_word > 0 {
                for slice in cw_vec.chunks_exact(expected_bytes_per_word) {
                    parser.parse(slice, &mut rep_out, &mut def_out);
                }
            }

            assert_eq!(rep, rep_out.as_slice());
            assert_eq!(def, def_out.as_slice());
        }

        // Rep and def each need 4 bits, so we should get 1-byte control words
        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let def = &[5_u16, 3, 1, 2, 12, 15, 0, 2];
        let expected = vec![
            0b00000101, // 0, 5
            0b01110011, // 7, 3
            0b00110001, // 3, 1
            0b00100010, // 2, 2
            0b10011100, // 9, 12
            0b10001111, // 8, 15
            0b11000000, // 12, 0
            0b01010010, // 5, 2
        ];
        check(rep, def, expected, 1, 4, 4);

        // Now we need 5 bits for def, so we get 2-byte control words
        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let def = &[5_u16, 3, 1, 2, 12, 22, 0, 2];
        let expected = vec![
            0b00000101, 0b00000000, // 0, 5
            0b11100011, 0b00000000, // 7, 3
            0b01100001, 0b00000000, // 3, 1
            0b01000010, 0b00000000, // 2, 2
            0b00101100, 0b00000001, // 9, 12
            0b00010110, 0b00000001, // 8, 22
            0b10000000, 0b00000001, // 12, 0
            0b10100010, 0b00000000, // 5, 2
        ];
        check(rep, def, expected, 2, 4, 5);

        // Just rep, 4 bits so 1 byte each
        let levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let expected = vec![
            0b00000000, // 0
            0b00000111, // 7
            0b00000011, // 3
            0b00000010, // 2
            0b00001001, // 9
            0b00001000, // 8
            0b00001100, // 12
            0b00000101, // 5
        ];
        check(levels, &[], expected.clone(), 1, 4, 0);

        // Just def
        check(&[], levels, expected, 1, 0, 4);

        // No rep, no def, no bytes
        check(&[], &[], Vec::default(), 0, 0, 0);
    }
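
    // `test_control_words` expects each stream to use just enough bits for its largest
    // value (12 -> 4 bits, 22 -> 5 bits), the rep bits to sit above the def bits, and the
    // word to be written little-endian in 0, 1, or 2 bytes.  The sketch below is a
    // hypothetical pack/unpack pair written to reproduce those expectations; it is not
    // `build_control_word_iterator` or `ControlWordParser`, and the 4-byte case for
    // wider levels is an assumption.  An empty slice means that stream is absent.
    fn sketch_bits_for(max: u16) -> u8 {
        (u16::BITS - max.leading_zeros()) as u8
    }

    fn sketch_pack_control_words(rep: &[u16], def: &[u16]) -> (usize, u8, u8, Vec<u8>) {
        let bits_rep = sketch_bits_for(rep.iter().copied().max().unwrap_or(0));
        let bits_def = sketch_bits_for(def.iter().copied().max().unwrap_or(0));
        let bytes_per_word = match bits_rep + bits_def {
            0 => 0,
            1..=8 => 1,
            9..=16 => 2,
            _ => 4,
        };
        let mut out = Vec::new();
        for i in 0..rep.len().max(def.len()) {
            // Rep in the high bits, def in the low bits
            let r = u32::from(rep.get(i).copied().unwrap_or(0));
            let d = u32::from(def.get(i).copied().unwrap_or(0));
            let word = (r << bits_def) | d;
            out.extend_from_slice(&word.to_le_bytes()[..bytes_per_word]);
        }
        (bytes_per_word, bits_rep, bits_def, out)
    }

    fn sketch_unpack_control_words(
        words: &[u8],
        bytes_per_word: usize,
        bits_def: u8,
    ) -> Vec<(u16, u16)> {
        if bytes_per_word == 0 {
            return Vec::new();
        }
        words
            .chunks_exact(bytes_per_word)
            .map(|chunk| {
                let mut le = [0u8; 4];
                le[..bytes_per_word].copy_from_slice(chunk);
                let word = u32::from_le_bytes(le);
                let def_mask = (1u32 << bits_def) - 1;
                ((word >> bits_def) as u16, (word & def_mask) as u16)
            })
            .collect()
    }

    #[test]
    fn test_sketch_control_words() {
        // Same levels as the 2-byte case of test_control_words
        let rep = [0u16, 7, 3, 2, 9, 8, 12, 5];
        let def = [5u16, 3, 1, 2, 12, 22, 0, 2];
        let (bytes, bits_rep, bits_def, words) = sketch_pack_control_words(&rep, &def);
        assert_eq!((bytes, bits_rep, bits_def), (2, 4, 5));
        assert_eq!(words[8..10], [0b0010_1100, 0b0000_0001]); // 9, 12
        let expected: Vec<(u16, u16)> = rep.iter().copied().zip(def.iter().copied()).collect();
        assert_eq!(sketch_unpack_control_words(&words, bytes, bits_def), expected);
    }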

    #[test]
    fn test_control_words_rep_index() {
        fn check(
            rep: &[u16],
            def: &[u16],
            expected_new_rows: Vec<bool>,
            expected_is_visible: Vec<bool>,
        ) {
            let num_vals = rep.len().max(def.len());
            let max_rep = rep.iter().max().copied().unwrap_or(0);
            let max_def = def.iter().max().copied().unwrap_or(0);

            let in_rep = if rep.is_empty() { None } else { Some(rep) };
            let in_def = if def.is_empty() { None } else { Some(def) };

            let mut iter = super::build_control_word_iterator(
                in_rep,
                max_rep,
                in_def,
                max_def,
                /*max_visible_def=*/ 2,
                expected_new_rows.len(),
            );

            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
            let mut expected_new_rows = expected_new_rows.iter().copied();
            let mut expected_is_visible = expected_is_visible.iter().copied();
            for _ in 0..expected_new_rows.len() {
                let word_desc = iter.append_next(&mut cw_vec).unwrap();
                assert_eq!(word_desc.is_new_row, expected_new_rows.next().unwrap());
                assert_eq!(word_desc.is_visible, expected_is_visible.next().unwrap());
            }
            assert!(iter.append_next(&mut cw_vec).is_none());
        }

        // 2 is the max rep level here and marks the start of a new row
        let rep = &[2_u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
        // Def levels only affect visibility: levels above max_visible_def (2) are
        // reported as not visible
        let def = &[0_u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];

        // Rep & def
        check(
            rep,
            def,
            vec![
                true, false, false, true, true, false, false, false, false, true, false,
            ],
            vec![
                true, true, true, false, true, true, true, true, true, true, true,
            ],
        );
        // Rep only
        check(
            rep,
            &[],
            vec![
                true, false, false, true, true, false, false, false, false, true, false,
            ],
            vec![true; 11],
        );
        // No repetition
        check(
            &[],
            def,
            vec![
                true, true, true, true, true, true, true, true, true, true, true,
            ],
            vec![true; 11],
        );
        // No repetition, no definition
        check(
            &[],
            &[],
            vec![
                true, true, true, true, true, true, true, true, true, true, true,
            ],
            vec![true; 11],
        );
    }
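
    // The expectations in `test_control_words_rep_index` reduce to two per-level rules:
    // a level starts a new row when there is no repetition or its rep level equals the
    // maximum rep level, and it is visible when there is no definition or its def level
    // is at most `max_visible_def`.  This is a hypothetical sketch of those rules over
    // plain slices, not the control word iterator; empty slices mean the stream is
    // absent, and `num_levels` supplies the length in that case.
    fn sketch_row_flags(
        rep: &[u16],
        def: &[u16],
        max_visible_def: u16,
        num_levels: usize,
    ) -> Vec<(bool, bool)> {
        let max_rep = rep.iter().copied().max().unwrap_or(0);
        (0..num_levels)
            .map(|i| {
                let is_new_row = rep.is_empty() || rep[i] == max_rep;
                let is_visible = def.is_empty() || def[i] <= max_visible_def;
                (is_new_row, is_visible)
            })
            .collect()
    }

    #[test]
    fn test_sketch_row_flags() {
        // Same levels as test_control_words_rep_index
        let rep = [2u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
        let def = [0u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];
        let flags = sketch_row_flags(&rep, &def, 2, 11);
        let new_rows: Vec<bool> = flags.iter().map(|f| f.0).collect();
        let visible: Vec<bool> = flags.iter().map(|f| f.1).collect();
        assert_eq!(
            new_rows,
            vec![true, false, false, true, true, false, false, false, false, true, false]
        );
        assert_eq!(
            visible,
            vec![true, true, true, false, true, true, true, true, true, true, true]
        );
    }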

    #[test]
    fn regress_empty_list_case() {
        // This regresses a case where we had 3 null lists inside a struct
        let mut builder = RepDefBuilder::default();
        builder.add_validity_bitmap(validity(&[true, false, true]));
        builder.add_offsets(
            offsets_32(&[0, 0, 0, 0]),
            Some(validity(&[false, false, false])),
        );
        builder.add_no_null(0);

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 1, 1], *rep);
        assert_eq!([1, 2, 1], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
        )]);

        assert_eq!(unraveler.unravel_validity(0), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 0, 0, 0]).inner());
        assert_eq!(val, Some(validity(&[false, false, false])));
        let val = unraveler.unravel_validity(3).unwrap();
        assert_eq!(val.inner(), validity(&[true, false, true]).inner());
    }

    #[test]
    fn regress_list_ends_null_case() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 1, 2, 2]),
            Some(validity(&[true, true, false])),
        );
        builder.add_offsets(offsets_64(&[0, 1, 1]), Some(validity(&[true, false])));
        builder.add_no_null(1);

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 2, 2], *rep);
        assert_eq!([0, 1, 2], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
        )]);

        assert_eq!(unraveler.unravel_validity(1), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 1]).inner());
        assert_eq!(val, Some(validity(&[true, false])));
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 2, 2]).inner());
        assert_eq!(val, Some(validity(&[true, true, false])));
    }
}