lance_encoding/
repdef.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Utilities for rep-def levels
5//!
//! Repetition and definition levels are a way to encode multiple validity / offsets arrays
7//! into a single buffer.  They are a form of "zipping" buffers together that takes advantage
8//! of the fact that, if the outermost array is invalid, then the validity of the inner items
9//! is irrelevant.
10//!
11//! Note: the concept of repetition & definition levels comes from the Dremel paper and has
12//! been implemented in Apache Parquet.  However, the implementation here is not necessarily
13//! compatible with Parquet.  For example, we use 0 to represent the "inner-most" item and
14//! Parquet uses 0 to represent the "outer-most" item.
15//!
16//! # Repetition Levels
17//!
18//! With repetition levels we convert a sparse array of offsets into a dense array of levels.
19//! These levels are marked non-zero whenever a new list begins.  In other words, given the
20//! list array with 3 rows [{<0,1>, <>, <2>}, {<3>}, {}], [], [{<4>}] we would have three
21//! offsets arrays:
22//!
23//! Outer-most ([]): [0, 3, 3, 4]
24//! Middle     ({}): [0, 3, 4, 4, 5]
25//! Inner      (<>): [0, 2, 2, 3, 4, 5]
26//! Values         : [0, 1, 2, 3, 4]
27//!
28//! We can convert these into repetition levels as follows:
29//!
30//! | Values | Repetition |
31//! | ------ | ---------- |
32//! |      0 |          3 | // Start of outer-most list
33//! |      1 |          0 | // Continues inner-most list (no new lists)
34//! |      - |          1 | // Start of new inner-most list (empty list)
35//! |      2 |          1 | // Start of new inner-most list
36//! |      3 |          2 | // Start of new middle list
//! |      - |          2 | // Start of new middle list (empty list)
//! |      - |          3 | // Start of new outer-most list (empty list)
//! |      4 |          3 | // Start of new outer-most list
40//!
41//! Note: We actually have MORE repetition levels than values.  This is because the repetition
42//! levels need to be able to represent empty lists.
43//!
44//! # Definition Levels
45//!
46//! Definition levels are simpler.  We can think of them as zipping together various validity (from
47//! different levels of nesting) into a single buffer.  For example, we could zip the arrays
48//! [1, 1, 0, 0] and [1, 0, 1, 0] into [11, 10, 01, 00].  However, 00 and 01 are redundant.  If the
49//! outer level is null then the validity of the inner levels is irrelevant.  To save space we instead
50//! encode a "level" which is the "depth" of the null.  Let's look at a more complete example:
51//!
//! Array: [{"middle": {"inner": 1}}, NULL, {"middle": NULL}, {"middle": {"inner": NULL}}]
53//!
54//! In Arrow we would have the following validity arrays:
55//! Outer validity : 1, 0, 1, 1
56//! Middle validity: 1, ?, 0, 1
57//! Inner validity : 1, ?, ?, 0
58//! Values         : 1, ?, ?, ?
59//!
60//! The ? values are undefined in the Arrow format.  We can convert these into definition levels as follows:
61//!
62//! | Values | Definition |
63//! | ------ | ---------- |
64//! |      1 |          0 | // Valid at all levels
65//! |      - |          3 | // Null at outer level
66//! |      - |          2 | // Null at middle level
67//! |      - |          1 | // Null at inner level
68//!
69//! # Compression
70//!
71//! Note that we only need 2 bits of definition levels to represent 3 levels of nesting.  Definition
72//! levels are always more compact than the input validity arrays.
73//!
74//! Repetition levels are more complex.  If there are very large lists then a sparse array of offsets
75//! (which has one element per list) might be more compact than a dense array of repetition levels
76//! (which has one element per list value, possibly even more if there are empty lists).
77//!
78//! However, both repetition levels and definition levels are typically very compressible with RLE.
79//!
//! That said, Lance doesn't always take advantage of that compression because we want to be able
//! to zip rep-def levels together with our values.  This gives us fewer IOPS when accessing row values.
82
83use std::{
84    iter::{Copied, Zip},
85    sync::Arc,
86};
87
88use arrow_array::OffsetSizeTrait;
89use arrow_buffer::{
90    ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
91};
92use lance_core::{utils::bit::log_2_ceil, Error, Result};
93use snafu::location;
94
95use crate::buffer::LanceBuffer;
96
97// We assume 16 bits is good enough for rep-def levels.  This gives us
98// 65536 levels of struct nesting and list nesting.
99pub type LevelBuffer = Vec<u16>;
100
101/// Represents information that we extract from a list array as we are
102/// encoding
103#[derive(Clone, Debug)]
104struct OffsetDesc {
105    offsets: Arc<[i64]>,
106    specials: Arc<[SpecialOffset]>,
107    validity: Option<BooleanBuffer>,
108    has_empty_lists: bool,
109    num_values: usize,
110}
111
112/// Represents validity information that we extract from non-list arrays (that
113/// have nulls) as we are encoding
114#[derive(Clone, Debug)]
115struct ValidityDesc {
116    validity: Option<BooleanBuffer>,
117    num_values: usize,
118}
119
120/// Represents validity information that we extract from FSL arrays.  This is
121/// just validity (no offsets) but we also record the dimension of the FSL array
122/// as that will impact the next layer
123#[derive(Clone, Debug)]
124struct FslDesc {
125    validity: Option<BooleanBuffer>,
126    dimension: usize,
127    num_values: usize,
128}
129
130// As we build up rep/def from arrow arrays we record a
131// series of RawRepDef objects.  Each one corresponds to layer
132// in the array structure
133#[derive(Clone, Debug)]
134enum RawRepDef {
135    Offsets(OffsetDesc),
136    Validity(ValidityDesc),
137    Fsl(FslDesc),
138}
139
140impl RawRepDef {
141    // Are there any nulls in this layer
142    fn has_nulls(&self) -> bool {
143        match self {
144            Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
145            Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
146            Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
147        }
148    }
149
150    // How many values are in this layer
151    fn num_values(&self) -> usize {
152        match self {
153            Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
154            Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
155            Self::Fsl(FslDesc { num_values, .. }) => *num_values,
156        }
157    }
158}
159
160/// Represents repetition and definition levels that have been
161/// serialized into a pair of (optional) level buffers
162#[derive(Debug)]
163pub struct SerializedRepDefs {
164    /// The repetition levels, one per item
165    ///
166    /// If None, there are no lists
167    pub repetition_levels: Option<Arc<[u16]>>,
168    /// The definition levels, one per item
169    ///
170    /// If None, there are no nulls
171    pub definition_levels: Option<Arc<[u16]>>,
172    /// Special records indicate empty / null lists
173    ///
174    /// These do not have any mapping to items.  There may be empty or there may
175    /// be more special records than items or anywhere in between.
176    pub special_records: Vec<SpecialRecord>,
177    /// The meaning of each definition level
178    pub def_meaning: Vec<DefinitionInterpretation>,
179    /// The maximum level that is "visible" from the lowest level
180    ///
181    /// This is the last level before we encounter a list level of some kind.  Once we've
182    /// hit a list level then nulls in any level beyond do not map to actual items.
183    ///
184    /// This is None if there are no lists
185    pub max_visible_level: Option<u16>,
186}
187
188impl SerializedRepDefs {
189    pub fn new(
190        repetition_levels: Option<LevelBuffer>,
191        definition_levels: Option<LevelBuffer>,
192        special_records: Vec<SpecialRecord>,
193        def_meaning: Vec<DefinitionInterpretation>,
194    ) -> Self {
195        let first_list = def_meaning.iter().position(|level| level.is_list());
196        let max_visible_level = first_list.map(|first_list| {
197            def_meaning
198                .iter()
199                .map(|level| level.num_def_levels())
200                .take(first_list)
201                .sum::<u16>()
202        });
203        Self {
204            repetition_levels: repetition_levels.map(Arc::from),
205            definition_levels: definition_levels.map(Arc::from),
206            special_records,
207            def_meaning,
208            max_visible_level,
209        }
210    }
211
212    /// Creates an empty SerializedRepDefs (no repetition, all valid)
213    pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
214        Self {
215            repetition_levels: None,
216            definition_levels: None,
217            special_records: Vec::new(),
218            def_meaning,
219            max_visible_level: None,
220        }
221    }
222
223    pub fn rep_slicer(&self) -> Option<RepDefSlicer> {
224        self.repetition_levels
225            .as_ref()
226            .map(|rep| RepDefSlicer::new(self, rep.clone()))
227    }
228
229    pub fn def_slicer(&self) -> Option<RepDefSlicer> {
230        self.definition_levels
231            .as_ref()
232            .map(|def| RepDefSlicer::new(self, def.clone()))
233    }
234
235    /// Creates a version of the SerializedRepDefs with the specials collapsed into
236    /// the repetition and definition levels
237    pub fn collapse_specials(self) -> Self {
238        if self.special_records.is_empty() {
239            return self;
240        }
241
242        // If we have specials then we must have repetition
243        let rep = self.repetition_levels.unwrap();
244
245        let new_len = rep.len() + self.special_records.len();
246
247        let mut new_rep = Vec::with_capacity(new_len);
248        let mut new_def = Vec::with_capacity(new_len);
249
250        // Now we just merge the rep/def levels and the specials into one list.  There is just
251        // one tricky part.  If a non-special is added after a special item then it swaps its
252        // repetition level with the special item.
253        if let Some(def) = self.definition_levels {
254            let mut def_itr = def.iter();
255            let mut rep_itr = rep.iter();
256            let mut special_itr = self.special_records.into_iter().peekable();
257            let mut last_special = None;
258
259            for idx in 0..new_len {
260                if let Some(special) = special_itr.peek() {
261                    if special.pos == idx {
262                        new_rep.push(special.rep_level);
263                        new_def.push(special.def_level);
264                        special_itr.next();
265                        last_special = Some(new_rep.last_mut().unwrap());
266                    } else {
267                        let rep = if let Some(last_special) = last_special {
268                            let rep = *last_special;
269                            *last_special = *rep_itr.next().unwrap();
270                            rep
271                        } else {
272                            *rep_itr.next().unwrap()
273                        };
274                        new_rep.push(rep);
275                        new_def.push(*def_itr.next().unwrap());
276                        last_special = None;
277                    }
278                } else {
279                    let rep = if let Some(last_special) = last_special {
280                        let rep = *last_special;
281                        *last_special = *rep_itr.next().unwrap();
282                        rep
283                    } else {
284                        *rep_itr.next().unwrap()
285                    };
286                    new_rep.push(rep);
287                    new_def.push(*def_itr.next().unwrap());
288                    last_special = None;
289                }
290            }
291        } else {
292            let mut rep_itr = rep.iter();
293            let mut special_itr = self.special_records.into_iter().peekable();
294            let mut last_special = None;
295
296            for idx in 0..new_len {
297                if let Some(special) = special_itr.peek() {
298                    if special.pos == idx {
299                        new_rep.push(special.rep_level);
300                        new_def.push(special.def_level);
301                        special_itr.next();
302                        last_special = Some(new_rep.last_mut().unwrap());
303                    } else {
304                        let rep = if let Some(last_special) = last_special {
305                            let rep = *last_special;
306                            *last_special = *rep_itr.next().unwrap();
307                            rep
308                        } else {
309                            *rep_itr.next().unwrap()
310                        };
311                        new_rep.push(rep);
312                        new_def.push(0);
313                        last_special = None;
314                    }
315                } else {
316                    let rep = if let Some(last_special) = last_special {
317                        let rep = *last_special;
318                        *last_special = *rep_itr.next().unwrap();
319                        rep
320                    } else {
321                        *rep_itr.next().unwrap()
322                    };
323                    new_rep.push(rep);
324                    new_def.push(0);
325                    last_special = None;
326                }
327            }
328        }
329
330        Self {
331            repetition_levels: Some(new_rep.into()),
332            definition_levels: Some(new_def.into()),
333            special_records: Vec::new(),
334            def_meaning: self.def_meaning,
335            max_visible_level: self.max_visible_level,
336        }
337    }
338}
339
340/// Slices a level buffer into pieces
341///
342/// This is needed to handle the fact that a level buffer may have more
343/// levels than values due to special (empty/null) lists.
344///
345/// As a result, a call to `slice_next(10)` may return 10 levels or it may
346/// return more than 10 levels if any special values are encountered.
347#[derive(Debug)]
348pub struct RepDefSlicer<'a> {
349    repdef: &'a SerializedRepDefs,
350    to_slice: LanceBuffer,
351    current: usize,
352}
353
354// TODO: All of this logic will need some changing when we compress rep/def levels.
355impl<'a> RepDefSlicer<'a> {
356    fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
357        Self {
358            repdef,
359            to_slice: LanceBuffer::reinterpret_slice(levels),
360            current: 0,
361        }
362    }
363
364    pub fn num_levels(&self) -> usize {
365        self.to_slice.len() / 2
366    }
367
368    pub fn num_levels_remaining(&self) -> usize {
369        self.num_levels() - self.current
370    }
371
372    pub fn all_levels(&self) -> &LanceBuffer {
373        &self.to_slice
374    }
375
376    /// Returns the rest of the levels not yet sliced
377    ///
378    /// This must be called instead of `slice_next` on the final iteration.
379    /// This is because anytime we slice there may be empty/null lists on the
380    /// boundary that are "free" and the current behavior in `slice_next` is to
381    /// leave them for the next call.
382    ///
383    /// `slice_rest` will slice all remaining levels and return them.
384    pub fn slice_rest(&mut self) -> LanceBuffer {
385        let start = self.current;
386        let remaining = self.num_levels_remaining();
387        self.current = self.num_levels();
388        self.to_slice.slice_with_length(start * 2, remaining * 2)
389    }
390
391    /// Returns enough levels to satisfy the next `num_values` values
392    pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
393        let start = self.current;
394        let Some(max_visible_level) = self.repdef.max_visible_level else {
395            // No lists, should be 1:1 mapping from levels to values
396            self.current = start + num_values;
397            return self.to_slice.slice_with_length(start * 2, num_values * 2);
398        };
399        if let Some(def) = self.repdef.definition_levels.as_ref() {
400            // There are lists and there are def levels.  That means there may be
401            // more rep/def levels than values.  We need to scan the def levels to figure
402            // out which items are "invisible" and skip over them
403            let mut def_itr = def[start..].iter();
404            let mut num_taken = 0;
405            let mut num_passed = 0;
406            while num_taken < num_values {
407                let def_level = *def_itr.next().unwrap();
408                if def_level <= max_visible_level {
409                    num_taken += 1;
410                }
411                num_passed += 1;
412            }
413            self.current = start + num_passed;
414            self.to_slice.slice_with_length(start * 2, num_passed * 2)
415        } else {
416            // No def levels, should be 1:1 mapping from levels to values
417            self.current = start + num_values;
418            self.to_slice.slice_with_length(start * 2, num_values * 2)
419        }
420    }
421}
422
423#[derive(Debug, Copy, Clone, PartialEq, Eq)]
424pub struct SpecialRecord {
425    /// The position of the special record in the items array
426    ///
427    /// Note that this is the position in the "expanded" items array (including the specials)
428    ///
429    /// For example, if we have five items [I0, I1, ..., I4] and two specials [S0(pos=3), S1(pos=6)] then
430    /// the combined array is [I0, I1, I2, S0, I3, I4, S1].
431    ///
432    /// Another tricky fact is that a special "swaps" the repetition level of the matching item when it is
433    /// being inserted into the combined list.  So, if items are [I0(rep=2), I1(rep=1), I2(rep=2), I3(rep=0)]
434    /// and a special is S0(pos=2, rep=1) then the combined list is
435    /// [I0(rep=2), I1(rep=1), S0(rep=2), I2(rep=1), I3(rep=0)].
436    ///
437    /// Or, to put it in practice we start with [[I0], [I1]], [[I2, I3]] and after inserting our special
438    /// we have [[I0], [I1]], [S0, [I2, I3]]
439    pos: usize,
440    /// The definition level of the special record.  This is never 0 and is used to distinguish between an
441    /// empty list and a null list.
442    def_level: u16,
443    /// The repetition level of the special record.  This is never 0 and is used to indicate which level of
444    /// nesting the special record is at.
445    rep_level: u16,
446}
447
448/// This tells us how an array handles definition.  Given a stack of
449/// these and a nested array and a set of definition levels we can calculate
450/// how we should interpret the definition levels.
451///
452/// For example, if the interpretation is [AllValidItem, NullableItem] then
453/// a 0 means "valid item" and a 1 means "null struct".  If the interpretation
454/// is [NullableItem, NullableItem] then a 0 means "valid item" and a 1 means
455/// "null item" and a 2 means "null struct".
456///
457/// Lists are tricky because we might use up to two definition levels for a
458/// single layer of list nesting because we need one value to indicate "empty list"
459/// and another value to indicate "null list".
460#[derive(Debug, Copy, Clone, PartialEq, Eq)]
461pub enum DefinitionInterpretation {
462    AllValidItem,
463    AllValidList,
464    NullableItem,
465    NullableList,
466    EmptyableList,
467    NullableAndEmptyableList,
468}
469
470impl DefinitionInterpretation {
471    /// How many definition levels do we need for this layer
472    pub fn num_def_levels(&self) -> u16 {
473        match self {
474            Self::AllValidItem => 0,
475            Self::AllValidList => 0,
476            Self::NullableItem => 1,
477            Self::NullableList => 1,
478            Self::EmptyableList => 1,
479            Self::NullableAndEmptyableList => 2,
480        }
481    }
482
483    /// Does this layer have nulls?
484    pub fn is_all_valid(&self) -> bool {
485        matches!(
486            self,
487            Self::AllValidItem | Self::AllValidList | Self::EmptyableList
488        )
489    }
490
491    /// Does this layer represent a list?
492    pub fn is_list(&self) -> bool {
493        matches!(
494            self,
495            Self::AllValidList
496                | Self::NullableList
497                | Self::EmptyableList
498                | Self::NullableAndEmptyableList
499        )
500    }
501}
502
503/// The RepDefBuilder is used to collect offsets & validity buffers
504/// from arrow structures.  Once we have those we use the SerializerContext
505/// to build the actual repetition and definition levels by walking through
506/// the arrow constructs in reverse order.
507///
508/// The algorithm for definition levels is as follows:
509///
510/// Given:
511///  - a validity buffer of [T, F, F, T, T]
512///  - a current def level of 5
513///  - a current definitions of [0, 1, 3, 3, 0]
514///
515/// We walk through the definitions and replace them with
516///   the current level whenever a value is invalid.  Thus
517///   our output is: [0, 5, 5, 3, 0]
518///
519/// The algorithm for repetition levels is more complex.
520///
521/// The first time we see an offsets buffer we initialize the
522/// rep levels to have a value of 1 whenever a list starts and 0
523/// otherwise.
524///
525/// So, given offsets of [0, 3, 5] and no repetition we create
526/// rep levels [1 0 0 1 0]
527///
528/// However, we also record the offsets into our current rep and
529/// def levels and all operations happen in context of those offsets.
530///
531/// For example, continuing the above scenario we might then see validity
532/// of [T, F].  This is strange since our validity bitmap has 2 items but
533/// we would have 5 definition levels.  We can use our current offsets
534/// ([0, 3, 5]) to expand [T, F] into [T, T, T, F, F].
535struct SerializerContext {
536    last_offsets: Option<Vec<usize>>,
537    last_offsets_full: Option<Vec<usize>>,
538    specials: Vec<SpecialRecord>,
539    def_meaning: Vec<DefinitionInterpretation>,
540    rep_levels: LevelBuffer,
541    def_levels: LevelBuffer,
542    current_rep: u16,
543    current_def: u16,
544    // FSL layers multiply the preceding def / rep levels by the dimension
545    current_multiplier: usize,
546    has_nulls: bool,
547}
548
549impl SerializerContext {
550    fn new(len: usize, has_nulls: bool, has_offsets: bool, num_layers: usize) -> Self {
551        let def_meaning = Vec::with_capacity(num_layers);
552        Self {
553            last_offsets: None,
554            last_offsets_full: None,
555            rep_levels: if has_offsets {
556                vec![0; len]
557            } else {
558                LevelBuffer::default()
559            },
560            def_levels: if has_nulls {
561                vec![0; len]
562            } else {
563                LevelBuffer::default()
564            },
565            def_meaning,
566            current_rep: 1,
567            current_def: 1,
568            current_multiplier: 1,
569            has_nulls: false,
570            specials: Vec::default(),
571        }
572    }
573
574    fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
575        let def = self.current_def;
576        self.current_def += meaning.num_def_levels();
577        self.def_meaning.push(meaning);
578        def
579    }
580
581    fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
582        if self.current_multiplier != 1 {
583            // If we need this it isn't too terrible.  We just need to multiply all of the offsets in offset_desc by
584            // the current multiplier before we do anything with them.  Not adding at the moment simply to avoid the
585            // burden of testing
586            todo!("List<...FSL<...>> not yet supported");
587        }
588        let rep_level = self.current_rep;
589        let (null_list_level, empty_list_level) =
590            match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
591                (true, true) => {
592                    let level =
593                        self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
594                    (level, level + 1)
595                }
596                (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
597                (false, true) => (
598                    0,
599                    self.checkout_def(DefinitionInterpretation::EmptyableList),
600                ),
601                (false, false) => {
602                    self.checkout_def(DefinitionInterpretation::AllValidList);
603                    (0, 0)
604                }
605            };
606        self.current_rep += 1;
607        if let Some(last_offsets) = &self.last_offsets {
608            let last_offsets_full = self.last_offsets_full.as_ref().unwrap();
609            let mut new_last_off = Vec::with_capacity(offset_desc.offsets.len());
610            let mut new_last_off_full = Vec::with_capacity(offset_desc.offsets.len());
611            let mut empties_seen = 0;
612            for off in offset_desc.offsets.windows(2) {
613                let offset_ctx = last_offsets[off[0] as usize];
614                let offset_ctx_end = last_offsets[off[1] as usize];
615                new_last_off.push(offset_ctx);
616                new_last_off_full.push(last_offsets_full[off[0] as usize] + empties_seen);
617                if off[0] == off[1] {
618                    // This list has an empty/null
619                    empties_seen += 1;
620                } else if offset_ctx == offset_ctx_end {
621                    // Inner list is empty/null
622                    // We previously added a special record but now we need to upgrade its repetition
623                    // level to the current level
624                    let matching_special_idx = self
625                        .specials
626                        .binary_search_by_key(&offset_ctx, |spec| spec.pos)
627                        .unwrap();
628                    self.specials[matching_special_idx].rep_level = rep_level;
629                } else {
630                    self.rep_levels[offset_ctx] = rep_level;
631                }
632            }
633            new_last_off.push(last_offsets[*offset_desc.offsets.last().unwrap() as usize]);
634            new_last_off_full.push(
635                last_offsets_full[*offset_desc.offsets.last().unwrap() as usize] + empties_seen,
636            );
637            self.last_offsets = Some(new_last_off);
638            self.last_offsets_full = Some(new_last_off_full);
639        } else {
640            let mut new_last_off = Vec::with_capacity(offset_desc.offsets.len());
641            let mut new_last_off_full = Vec::with_capacity(offset_desc.offsets.len());
642            let mut empties_seen = 0;
643            for off in offset_desc.offsets.windows(2) {
644                new_last_off.push(off[0] as usize);
645                new_last_off_full.push(off[0] as usize + empties_seen);
646                if off[0] == off[1] {
647                    empties_seen += 1;
648                } else {
649                    self.rep_levels[off[0] as usize] = rep_level;
650                }
651            }
652            new_last_off.push(*offset_desc.offsets.last().unwrap() as usize);
653            new_last_off_full.push(*offset_desc.offsets.last().unwrap() as usize + empties_seen);
654            self.last_offsets = Some(new_last_off);
655            self.last_offsets_full = Some(new_last_off_full);
656        }
657
658        // Must update specials _after_ setting last_offsets_full
659        let last_offsets_full = self.last_offsets_full.as_ref().unwrap();
660        let num_combined_specials = self.specials.len() + offset_desc.specials.len();
661        let mut new_specials = Vec::with_capacity(num_combined_specials);
662        let mut new_inserted = 0;
663        let mut old_specials_itr = self.specials.iter().peekable();
664        let mut specials_itr = offset_desc.specials.iter().peekable();
665        for _ in 0..num_combined_specials {
666            if let Some(old_special) = old_specials_itr.peek() {
667                let old_special_pos = old_special.pos + new_inserted;
668                if let Some(new_special) = specials_itr.peek() {
669                    let new_special_pos = last_offsets_full[new_special.pos()];
670                    if old_special_pos < new_special_pos {
671                        let mut old_special = *old_specials_itr.next().unwrap();
672                        old_special.pos = old_special_pos;
673                        new_specials.push(old_special);
674                    } else {
675                        let new_special = specials_itr.next().unwrap();
676                        new_specials.push(SpecialRecord {
677                            pos: new_special_pos,
678                            def_level: if matches!(new_special, SpecialOffset::EmptyList(_)) {
679                                empty_list_level
680                            } else {
681                                null_list_level
682                            },
683                            rep_level,
684                        });
685                        new_inserted += 1;
686                    }
687                } else {
688                    let mut old_special = *old_specials_itr.next().unwrap();
689                    old_special.pos = old_special_pos;
690                    new_specials.push(old_special);
691                }
692            } else {
693                let new_special = specials_itr.next().unwrap();
694                new_specials.push(SpecialRecord {
695                    pos: last_offsets_full[new_special.pos()],
696                    def_level: if matches!(new_special, SpecialOffset::EmptyList(_)) {
697                        empty_list_level
698                    } else {
699                        null_list_level
700                    },
701                    rep_level,
702                });
703                new_inserted += 1;
704            }
705        }
706        self.specials = new_specials;
707    }
708
709    fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
710        self.has_nulls = true;
711        assert!(!self.def_levels.is_empty());
712        if let Some(last_offsets) = &self.last_offsets {
713            last_offsets
714                .windows(2)
715                .zip(validity.iter())
716                .for_each(|(w, valid)| {
717                    let start = w[0] * self.current_multiplier;
718                    let end = w[1] * self.current_multiplier;
719                    if !valid {
720                        self.def_levels[start..end].fill(null_level);
721                    }
722                });
723        } else if self.current_multiplier == 1 {
724            self.def_levels
725                .iter_mut()
726                .zip(validity.iter())
727                .for_each(|(def, valid)| {
728                    if !valid {
729                        *def = null_level;
730                    }
731                });
732        } else {
733            self.def_levels
734                .iter_mut()
735                .zip(
736                    validity
737                        .iter()
738                        .flat_map(|v| std::iter::repeat_n(v, self.current_multiplier)),
739                )
740                .for_each(|(def, valid)| {
741                    if !valid {
742                        *def = null_level;
743                    }
744                });
745        }
746    }
747
748    fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
749        if let Some(validity) = validity {
750            let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
751            self.do_record_validity(validity, def_level);
752        } else {
753            self.checkout_def(DefinitionInterpretation::AllValidItem);
754        }
755    }
756
757    fn record_validity(&mut self, validity_desc: &ValidityDesc) {
758        self.record_validity_buf(&validity_desc.validity)
759    }
760
761    fn record_fsl(&mut self, fsl_desc: &FslDesc) {
762        self.current_multiplier *= fsl_desc.dimension;
763        self.record_validity_buf(&fsl_desc.validity);
764    }
765
766    fn build(self) -> SerializedRepDefs {
767        let definition_levels = if self.has_nulls {
768            Some(self.def_levels)
769        } else {
770            None
771        };
772        let repetition_levels = if self.current_rep > 1 {
773            Some(self.rep_levels)
774        } else {
775            None
776        };
777        SerializedRepDefs::new(
778            repetition_levels,
779            definition_levels,
780            self.specials,
781            self.def_meaning,
782        )
783    }
784}
785
/// As we are encoding we record information about "specials" which are
/// empty lists or null lists.
///
/// The wrapped `usize` is the index of the list within its offsets layer.
#[derive(Debug, Copy, Clone)]
enum SpecialOffset {
    /// A null list at the given list index
    NullList(usize),
    /// A valid, zero-length list at the given list index
    EmptyList(usize),
}

impl SpecialOffset {
    /// The list index this special refers to, regardless of its kind
    fn pos(&self) -> usize {
        let (Self::NullList(pos) | Self::EmptyList(pos)) = *self;
        pos
    }
}
802
/// A structure used to collect validity buffers and offsets from arrow
/// arrays and eventually create repetition and definition levels
///
/// As we are encoding the structural encoders are given this struct and
/// will record the arrow information into it.  Once we hit a leaf node we
/// serialize the data into rep/def levels and write these into the page.
///
/// Layers are stored in the order the `add_*` methods are called.  Each new
/// layer's length is checked (with an assertion) against the length implied
/// by the previously registered layer.
#[derive(Clone, Default, Debug)]
pub struct RepDefBuilder {
    // The rep/def info we have collected so far, one entry per layer in the
    // order the layers were registered
    repdefs: Vec<RawRepDef>,
    // The current length, can get larger as we traverse lists (e.g. an
    // array might have 5 lists which results in 50 items)
    //
    // Starts uninitialized (`None`) until we see the first rep/def item.
    // Validity layers must match it exactly; an offsets layer must have
    // exactly `len + 1` offsets.
    len: Option<usize>,
}
819
820impl RepDefBuilder {
821    fn check_validity_len(&mut self, incoming_len: usize) {
822        if let Some(len) = self.len {
823            assert_eq!(incoming_len, len);
824        }
825        self.len = Some(incoming_len);
826    }
827
828    fn num_layers(&self) -> usize {
829        self.repdefs.len()
830    }
831
832    /// The builder is "empty" if there is no repetition and no nulls.  In this case we don't need
833    /// to store anything to disk (except the description)
834    fn is_empty(&self) -> bool {
835        self.repdefs
836            .iter()
837            .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
838    }
839
840    /// Returns true if there is only a single layer of definition
841    pub fn is_simple_validity(&self) -> bool {
842        self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
843    }
844
845    /// Return True if any layer has a validity bitmap
846    ///
847    /// Return False if all layers are non-null (the def levels can
848    /// be skipped in this case)
849    pub fn has_nulls(&self) -> bool {
850        self.repdefs.iter().any(|rd| {
851            matches!(
852                rd,
853                RawRepDef::Validity(ValidityDesc {
854                    validity: Some(_),
855                    ..
856                }) | RawRepDef::Fsl(FslDesc {
857                    validity: Some(_),
858                    ..
859                })
860            )
861        })
862    }
863
864    pub fn has_offsets(&self) -> bool {
865        self.repdefs
866            .iter()
867            .any(|rd| matches!(rd, RawRepDef::Offsets(OffsetDesc { .. })))
868    }
869
870    /// Registers a nullable validity bitmap
871    pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
872        self.check_validity_len(validity.len());
873        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
874            num_values: validity.len(),
875            validity: Some(validity.into_inner()),
876        }));
877    }
878
879    /// Registers an all-valid validity layer
880    pub fn add_no_null(&mut self, len: usize) {
881        self.check_validity_len(len);
882        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
883            validity: None,
884            num_values: len,
885        }));
886    }
887
888    pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
889        if let Some(len) = self.len {
890            assert_eq!(num_values, len);
891        }
892        self.len = Some(num_values * dimension);
893        debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
894        self.repdefs.push(RawRepDef::Fsl(FslDesc {
895            num_values,
896            validity: validity.map(|v| v.into_inner()),
897            dimension,
898        }))
899    }
900
901    fn check_offset_len(&mut self, offsets: &[i64]) {
902        if let Some(len) = self.len {
903            assert!(offsets.len() == len + 1);
904        }
905        self.len = Some(offsets[offsets.len() - 1] as usize);
906    }
907
908    /// Adds a layer of offsets
909    ///
910    /// Offsets are casted to a common type (i64) and also normalized.  Null lists are
911    /// always represented by a zero-length (identical) pair of offsets and so the caller
912    /// should filter out any garbage items before encoding them.  To assist with this the
913    /// method will return true if any non-empty null lists were found.
914    pub fn add_offsets<O: OffsetSizeTrait>(
915        &mut self,
916        offsets: OffsetBuffer<O>,
917        validity: Option<NullBuffer>,
918    ) -> bool {
919        let mut has_garbage_values = false;
920        if O::IS_LARGE {
921            let inner = offsets.into_inner();
922            let len = inner.len();
923            let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, len);
924            let mut normalized = Vec::with_capacity(len);
925            normalized.push(0_i64);
926            let mut specials = Vec::new();
927            let mut has_empty_lists = false;
928            let mut last_off = 0;
929            if let Some(validity) = validity.as_ref() {
930                for (idx, (off, valid)) in i64_buff.windows(2).zip(validity.iter()).enumerate() {
931                    let len: i64 = off[1] - off[0];
932                    match (valid, len == 0) {
933                        (false, is_empty) => {
934                            specials.push(SpecialOffset::NullList(idx));
935                            has_garbage_values |= !is_empty;
936                        }
937                        (true, true) => {
938                            has_empty_lists = true;
939                            specials.push(SpecialOffset::EmptyList(idx));
940                        }
941                        _ => {
942                            last_off += len;
943                        }
944                    }
945                    normalized.push(last_off);
946                }
947            } else {
948                for (idx, off) in i64_buff.windows(2).enumerate() {
949                    let len: i64 = off[1] - off[0];
950                    if len == 0 {
951                        has_empty_lists = true;
952                        specials.push(SpecialOffset::EmptyList(idx));
953                    }
954                    last_off += len;
955                    normalized.push(last_off);
956                }
957            };
958            self.check_offset_len(&normalized);
959            self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
960                num_values: normalized.len() - 1,
961                offsets: normalized.into(),
962                validity: validity.map(|v| v.into_inner()),
963                has_empty_lists,
964                specials: specials.into(),
965            }));
966            has_garbage_values
967        } else {
968            let inner = offsets.into_inner();
969            let len = inner.len();
970            let scalar_off = ScalarBuffer::<i32>::new(inner.into_inner(), 0, len);
971            let mut casted = Vec::with_capacity(len);
972            casted.push(0);
973            let mut has_empty_lists = false;
974            let mut specials = Vec::new();
975            let mut last_off: i64 = 0;
976            if let Some(validity) = validity.as_ref() {
977                for (idx, (off, valid)) in scalar_off.windows(2).zip(validity.iter()).enumerate() {
978                    let len = (off[1] - off[0]) as i64;
979                    match (valid, len == 0) {
980                        (false, is_empty) => {
981                            specials.push(SpecialOffset::NullList(idx));
982                            has_garbage_values |= !is_empty;
983                        }
984                        (true, true) => {
985                            has_empty_lists = true;
986                            specials.push(SpecialOffset::EmptyList(idx));
987                        }
988                        _ => {
989                            last_off += len;
990                        }
991                    }
992                    casted.push(last_off);
993                }
994            } else {
995                for (idx, off) in scalar_off.windows(2).enumerate() {
996                    let len = (off[1] - off[0]) as i64;
997                    if len == 0 {
998                        has_empty_lists = true;
999                        specials.push(SpecialOffset::EmptyList(idx));
1000                    }
1001                    last_off += len;
1002                    casted.push(last_off);
1003                }
1004            };
1005            self.check_offset_len(&casted);
1006            self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
1007                num_values: casted.len() - 1,
1008                offsets: casted.into(),
1009                validity: validity.map(|v| v.into_inner()),
1010                has_empty_lists,
1011                specials: specials.into(),
1012            }));
1013            has_garbage_values
1014        }
1015    }
1016
1017    // When we are encoding data it arrives in batches.  For each batch we create a RepDefBuilder and collect the
1018    // various validity buffers and offset buffers from that batch.  Once we have enough batches to write a page we
1019    // need to take this collection of RepDefBuilders and concatenate them and then serialize them into rep/def levels.
1020    //
1021    // TODO: In the future, we may concatenate and serialize at the same time?
1022    //
1023    // This method takes care of the concatenation part.  First we collect all of layer 0 from each builder, then we
1024    // call this method.  Then we collect all of layer 1 from each builder and call this method.  And so on.
1025    //
1026    // That means this method should get a collection of `RawRepDef` where each item is the same kind (all validity or
1027    // all offsets) though the nullability / lengths may be different in each layer.
1028    fn concat_layers<'a>(
1029        layers: impl Iterator<Item = &'a RawRepDef>,
1030        num_layers: usize,
1031    ) -> RawRepDef {
1032        enum LayerKind {
1033            Validity,
1034            Fsl,
1035            Offsets,
1036        }
1037
1038        // We make two passes through the layers.  The first determines if we need to pay the cost of allocating
1039        // buffers.  The second pass actually adds the values.
1040        let mut collected = Vec::with_capacity(num_layers);
1041        let mut has_nulls = false;
1042        let mut layer_kind = LayerKind::Validity;
1043        let mut num_specials = 0;
1044        let mut all_dimension = 0;
1045        let mut all_has_empty_lists = false;
1046        let mut all_num_values = 0;
1047        for layer in layers {
1048            has_nulls |= layer.has_nulls();
1049            match layer {
1050                RawRepDef::Validity(_) => {
1051                    layer_kind = LayerKind::Validity;
1052                }
1053                RawRepDef::Offsets(OffsetDesc {
1054                    specials,
1055                    has_empty_lists,
1056                    ..
1057                }) => {
1058                    all_has_empty_lists |= *has_empty_lists;
1059                    layer_kind = LayerKind::Offsets;
1060                    num_specials += specials.len();
1061                }
1062                RawRepDef::Fsl(FslDesc { dimension, .. }) => {
1063                    layer_kind = LayerKind::Fsl;
1064                    all_dimension = *dimension;
1065                }
1066            }
1067            collected.push(layer);
1068            all_num_values += layer.num_values();
1069        }
1070
1071        // Shortcut if there are no nulls
1072        if !has_nulls {
1073            match layer_kind {
1074                LayerKind::Validity => {
1075                    return RawRepDef::Validity(ValidityDesc {
1076                        validity: None,
1077                        num_values: all_num_values,
1078                    });
1079                }
1080                LayerKind::Fsl => {
1081                    return RawRepDef::Fsl(FslDesc {
1082                        validity: None,
1083                        num_values: all_num_values,
1084                        dimension: all_dimension,
1085                    })
1086                }
1087                LayerKind::Offsets => {}
1088            }
1089        }
1090
1091        // Only allocate if needed
1092        let mut validity_builder = if has_nulls {
1093            BooleanBufferBuilder::new(all_num_values)
1094        } else {
1095            BooleanBufferBuilder::new(0)
1096        };
1097        let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
1098            let mut all_offsets = Vec::with_capacity(all_num_values);
1099            all_offsets.push(0);
1100            all_offsets
1101        } else {
1102            Vec::new()
1103        };
1104        let mut all_specials = Vec::with_capacity(num_specials);
1105
1106        for layer in collected {
1107            match layer {
1108                RawRepDef::Validity(ValidityDesc {
1109                    validity: Some(validity),
1110                    ..
1111                }) => {
1112                    validity_builder.append_buffer(validity);
1113                }
1114                RawRepDef::Validity(ValidityDesc {
1115                    validity: None,
1116                    num_values,
1117                }) => {
1118                    validity_builder.append_n(*num_values, true);
1119                }
1120                RawRepDef::Fsl(FslDesc {
1121                    validity,
1122                    num_values,
1123                    ..
1124                }) => {
1125                    if let Some(validity) = validity {
1126                        validity_builder.append_buffer(validity);
1127                    } else {
1128                        validity_builder.append_n(*num_values, true);
1129                    }
1130                }
1131                RawRepDef::Offsets(OffsetDesc {
1132                    offsets,
1133                    validity: Some(validity),
1134                    has_empty_lists,
1135                    specials,
1136                    ..
1137                }) => {
1138                    all_has_empty_lists |= has_empty_lists;
1139                    validity_builder.append_buffer(validity);
1140                    let existing_lists = all_offsets.len() - 1;
1141                    let last = *all_offsets.last().unwrap();
1142                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1143                    all_specials.extend(specials.iter().map(|s| match s {
1144                        SpecialOffset::NullList(pos) => {
1145                            SpecialOffset::NullList(*pos + existing_lists)
1146                        }
1147                        SpecialOffset::EmptyList(pos) => {
1148                            SpecialOffset::EmptyList(*pos + existing_lists)
1149                        }
1150                    }));
1151                }
1152                RawRepDef::Offsets(OffsetDesc {
1153                    offsets,
1154                    validity: None,
1155                    has_empty_lists,
1156                    num_values,
1157                    specials,
1158                }) => {
1159                    all_has_empty_lists |= has_empty_lists;
1160                    if has_nulls {
1161                        validity_builder.append_n(*num_values, true);
1162                    }
1163                    let last = *all_offsets.last().unwrap();
1164                    let existing_lists = all_offsets.len() - 1;
1165                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1166                    all_specials.extend(specials.iter().map(|s| match s {
1167                        SpecialOffset::NullList(pos) => {
1168                            SpecialOffset::NullList(*pos + existing_lists)
1169                        }
1170                        SpecialOffset::EmptyList(pos) => {
1171                            SpecialOffset::EmptyList(*pos + existing_lists)
1172                        }
1173                    }));
1174                }
1175            }
1176        }
1177        let validity = if has_nulls {
1178            Some(validity_builder.finish())
1179        } else {
1180            None
1181        };
1182        match layer_kind {
1183            LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
1184                validity,
1185                num_values: all_num_values,
1186                dimension: all_dimension,
1187            }),
1188            LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
1189                validity,
1190                num_values: all_num_values,
1191            }),
1192            LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
1193                offsets: all_offsets.into(),
1194                validity,
1195                has_empty_lists: all_has_empty_lists,
1196                num_values: all_num_values,
1197                specials: all_specials.into(),
1198            }),
1199        }
1200    }
1201
1202    /// Converts the validity / offsets buffers that have been gathered so far
1203    /// into repetition and definition levels
1204    pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
1205        assert!(!builders.is_empty());
1206        if builders.iter().all(|b| b.is_empty()) {
1207            // No repetition, all-valid
1208            return SerializedRepDefs::empty(
1209                builders
1210                    .first()
1211                    .unwrap()
1212                    .repdefs
1213                    .iter()
1214                    .map(|_| DefinitionInterpretation::AllValidItem)
1215                    .collect::<Vec<_>>(),
1216            );
1217        }
1218        let has_nulls = builders.iter().any(|b| b.has_nulls());
1219        let has_offsets = builders.iter().any(|b| b.has_offsets());
1220        let total_len = builders.iter().map(|b| b.len.unwrap()).sum();
1221        let num_layers = builders[0].num_layers();
1222        let mut context = SerializerContext::new(total_len, has_nulls, has_offsets, num_layers);
1223        let combined_layers = (0..num_layers)
1224            .map(|layer_index| {
1225                Self::concat_layers(
1226                    builders.iter().map(|b| &b.repdefs[layer_index]),
1227                    builders.len(),
1228                )
1229            })
1230            .collect::<Vec<_>>();
1231        debug_assert!(builders
1232            .iter()
1233            .all(|b| b.num_layers() == builders[0].num_layers()));
1234        for layer in combined_layers.into_iter().rev() {
1235            match layer {
1236                RawRepDef::Validity(def) => {
1237                    context.record_validity(&def);
1238                }
1239                RawRepDef::Offsets(rep) => {
1240                    context.record_offsets(&rep);
1241                }
1242                RawRepDef::Fsl(fsl) => {
1243                    context.record_fsl(&fsl);
1244                }
1245            }
1246        }
1247        context.build().collapse_specials()
1248    }
1249}
1250
/// Starts with serialized repetition and definition levels and unravels
/// them into validity buffers and offsets buffers
///
/// This is used during decoding to create the necessary arrow structures
#[derive(Debug)]
pub struct RepDefUnraveler {
    // Remaining repetition levels (None if the data has no repetition).  They are
    // compacted in place each time a layer of offsets is unraveled.
    rep_levels: Option<LevelBuffer>,
    // Remaining definition levels (None if the data has none).  Compacted
    // alongside `rep_levels` when offsets are unraveled.
    def_levels: Option<LevelBuffer>,
    // Maps from definition level to the rep level at which that definition level is visible
    levels_to_rep: Vec<u16>,
    // How each layer's definition levels are interpreted, indexed by `current_layer`
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Current definition level to compare to.
    current_def_cmp: u16,
    // Current rep level, determines which specials we can see
    current_rep_cmp: u16,
    // Current layer index, 0 means inner-most layer and it counts up from there.  Used to index
    // into def_meaning
    current_layer: usize,
}
1270
1271impl RepDefUnraveler {
1272    /// Creates a new unraveler from serialized repetition and definition information
1273    pub fn new(
1274        rep_levels: Option<LevelBuffer>,
1275        def_levels: Option<LevelBuffer>,
1276        def_meaning: Arc<[DefinitionInterpretation]>,
1277    ) -> Self {
1278        let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
1279        let mut rep_counter = 0;
1280        // Level=0 is always visible and means valid item
1281        levels_to_rep.push(0);
1282        for meaning in def_meaning.as_ref() {
1283            match meaning {
1284                DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
1285                    // There is no corresponding level, so nothing to put in levels_to_rep
1286                }
1287                DefinitionInterpretation::NullableItem => {
1288                    // Some null structs are not visible at inner rep levels in cases like LIST<STRUCT<LIST<...>>>
1289                    levels_to_rep.push(rep_counter);
1290                }
1291                DefinitionInterpretation::NullableList => {
1292                    rep_counter += 1;
1293                    levels_to_rep.push(rep_counter);
1294                }
1295                DefinitionInterpretation::EmptyableList => {
1296                    rep_counter += 1;
1297                    levels_to_rep.push(rep_counter);
1298                }
1299                DefinitionInterpretation::NullableAndEmptyableList => {
1300                    rep_counter += 1;
1301                    levels_to_rep.push(rep_counter);
1302                    levels_to_rep.push(rep_counter);
1303                }
1304            }
1305        }
1306        Self {
1307            rep_levels,
1308            def_levels,
1309            current_def_cmp: 0,
1310            current_rep_cmp: 0,
1311            levels_to_rep,
1312            current_layer: 0,
1313            def_meaning,
1314        }
1315    }
1316
1317    pub fn is_all_valid(&self) -> bool {
1318        self.def_meaning[self.current_layer].is_all_valid()
1319    }
1320
1321    /// If the current level is a repetition layer then this returns the number of lists
1322    /// at this level.
1323    ///
1324    /// This is not valid to call when the current level is a struct/primitive layer because
1325    /// in some cases there may be no rep or def information to know this.
1326    pub fn max_lists(&self) -> usize {
1327        debug_assert!(
1328            self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
1329        );
1330        self.rep_levels
1331            .as_ref()
1332            // Worst case every rep item is max_rep and a new list
1333            .map(|levels| levels.len())
1334            .unwrap_or(0)
1335    }
1336
1337    /// Unravels a layer of offsets from the unraveler into the given offset width
1338    ///
1339    /// When decoding a list the caller should first unravel the offsets and then
1340    /// unravel the validity (this is the opposite order used during encoding)
1341    pub fn unravel_offsets<T: ArrowNativeType>(
1342        &mut self,
1343        offsets: &mut Vec<T>,
1344        validity: Option<&mut BooleanBufferBuilder>,
1345    ) -> Result<()> {
1346        let rep_levels = self
1347            .rep_levels
1348            .as_mut()
1349            .expect("Expected repetition level but data didn't contain repetition");
1350        let valid_level = self.current_def_cmp;
1351        let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
1352            DefinitionInterpretation::NullableList => {
1353                self.current_def_cmp += 1;
1354                (valid_level + 1, 0)
1355            }
1356            DefinitionInterpretation::EmptyableList => {
1357                self.current_def_cmp += 1;
1358                (0, valid_level + 1)
1359            }
1360            DefinitionInterpretation::NullableAndEmptyableList => {
1361                self.current_def_cmp += 2;
1362                (valid_level + 1, valid_level + 2)
1363            }
1364            DefinitionInterpretation::AllValidList => (0, 0),
1365            _ => unreachable!(),
1366        };
1367        let max_level = null_level.max(empty_level);
1368        self.current_layer += 1;
1369
1370        let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);
1371
1372        // If offsets is empty this is a no-op.  If offsets is not empty that means we already
1373        // added a set of offsets.  For example, we might have added [0, 3, 5] (2 lists).  Now
1374        // say we want to add [0, 1, 4] (2 lists).  We should get [0, 3, 5, 6, 9] (4 lists).  If
1375        // we don't pop here we get [0, 3, 5, 5, 6, 9] which is wrong.
1376        //
1377        // Or, to think about it another way, if every unraveler adds the starting 0 and the trailing
1378        // length then we have N + unravelers.len() values instead of N + 1.
1379        offsets.pop();
1380
1381        let to_offset = |val: usize| {
1382            T::from_usize(val)
1383            .ok_or_else(|| Error::invalid_input("A single batch had more than i32::MAX values and so a large container type is required", location!()))
1384        };
1385        self.current_rep_cmp += 1;
1386        if let Some(def_levels) = &mut self.def_levels {
1387            assert!(rep_levels.len() == def_levels.len());
1388            // It's possible validity is None even if we have def levels.  For example, we might have
1389            // empty lists (which require def levels) but no nulls.
1390            let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
1391                Box::new(|is_valid| validity.append(is_valid))
1392            } else {
1393                Box::new(|_| {})
1394            };
1395            // This is a strange access pattern.  We are iterating over the rep/def levels and
1396            // at the same time writing the rep/def levels.  This means we need both a mutable
1397            // and immutable reference to the rep/def levels.
1398            let mut read_idx = 0;
1399            let mut write_idx = 0;
1400            while read_idx < rep_levels.len() {
1401                // SAFETY: We assert that rep_levels and def_levels have the same
1402                // len and read_idx and write_idx can never go past the end.
1403                unsafe {
1404                    let rep_val = *rep_levels.get_unchecked(read_idx);
1405                    if rep_val != 0 {
1406                        let def_val = *def_levels.get_unchecked(read_idx);
1407                        // Copy over
1408                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1409                        *def_levels.get_unchecked_mut(write_idx) = def_val;
1410                        write_idx += 1;
1411
1412                        if def_val == 0 {
1413                            // This is a valid list
1414                            offsets.push(to_offset(curlen)?);
1415                            curlen += 1;
1416                            push_validity(true);
1417                        } else if def_val > max_level {
1418                            // This is not visible at this rep level, do not add to offsets, but keep in repdef
1419                        } else if def_val == null_level {
1420                            // This is a null list
1421                            offsets.push(to_offset(curlen)?);
1422                            push_validity(false);
1423                        } else if def_val == empty_level {
1424                            // This is an empty list
1425                            offsets.push(to_offset(curlen)?);
1426                            push_validity(true);
1427                        } else {
1428                            // New valid list starting with null item
1429                            offsets.push(to_offset(curlen)?);
1430                            curlen += 1;
1431                            push_validity(true);
1432                        }
1433                    } else {
1434                        curlen += 1;
1435                    }
1436                    read_idx += 1;
1437                }
1438            }
1439            offsets.push(to_offset(curlen)?);
1440            rep_levels.truncate(write_idx);
1441            def_levels.truncate(write_idx);
1442            Ok(())
1443        } else {
1444            // SAFETY: See above loop
1445            let mut read_idx = 0;
1446            let mut write_idx = 0;
1447            let old_offsets_len = offsets.len();
1448            while read_idx < rep_levels.len() {
1449                // SAFETY: read_idx / write_idx cannot go past rep_levels.len()
1450                unsafe {
1451                    let rep_val = *rep_levels.get_unchecked(read_idx);
1452                    if rep_val != 0 {
1453                        // Finish the current list
1454                        offsets.push(to_offset(curlen)?);
1455                        *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1456                        write_idx += 1;
1457                    }
1458                    curlen += 1;
1459                    read_idx += 1;
1460                }
1461            }
1462            let num_new_lists = offsets.len() - old_offsets_len;
1463            offsets.push(to_offset(curlen)?);
1464            rep_levels.truncate(offsets.len() - 1);
1465            if let Some(validity) = validity {
1466                // Even though we don't have validity it is possible another unraveler did and so we need
1467                // to push all valids
1468                validity.append_n(num_new_lists, true);
1469            }
1470            Ok(())
1471        }
1472    }
1473
1474    pub fn skip_validity(&mut self) {
1475        debug_assert!(
1476            self.def_meaning[self.current_layer] == DefinitionInterpretation::AllValidItem
1477        );
1478        self.current_layer += 1;
1479    }
1480
1481    /// Unravels a layer of validity from the definition levels
1482    pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
1483        debug_assert!(
1484            self.def_meaning[self.current_layer] != DefinitionInterpretation::AllValidItem
1485        );
1486        self.current_layer += 1;
1487
1488        let def_levels = &self.def_levels.as_ref().unwrap();
1489
1490        let current_def_cmp = self.current_def_cmp;
1491        self.current_def_cmp += 1;
1492
1493        for is_valid in def_levels.iter().filter_map(|&level| {
1494            if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
1495                Some(level <= current_def_cmp)
1496            } else {
1497                None
1498            }
1499        }) {
1500            validity.append(is_valid);
1501        }
1502    }
1503
1504    pub fn decimate(&mut self, dimension: usize) {
1505        if self.rep_levels.is_some() {
1506            // If we need to support this then I think we need to walk through the rep def levels to find
1507            // the spots at which we keep.  E.g. if we have:
1508            //  rep: 1 0 0 1 0 1 0 0 0 1 0 0
1509            //  def: 1 1 1 0 1 0 1 1 0 1 1 0
1510            //  dimension: 2
1511            //
1512            // The output should be:
1513            //  rep: 1 0 0 1 0 0 0
1514            //  def: 1 1 1 0 1 1 0
1515            //
1516            // Maybe there's some special logic for empty/null lists?  I'll save the headache for future me.
1517            todo!("Not yet supported FSL<...List<...>>");
1518        }
1519        let Some(def_levels) = self.def_levels.as_mut() else {
1520            return;
1521        };
1522        let mut read_idx = 0;
1523        let mut write_idx = 0;
1524        while read_idx < def_levels.len() {
1525            unsafe {
1526                *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
1527            }
1528            write_idx += 1;
1529            read_idx += dimension;
1530        }
1531        def_levels.truncate(write_idx);
1532    }
1533}
1534
1535/// As we decode we may extract rep/def information from multiple pages (or multiple
1536/// chunks within a page).
1537///
1538/// For each chunk we create an unraveler.  Each unraveler can have a completely different
1539/// interpretation (e.g. one page might contain null items but no null structs and the next
1540/// page might have null structs but no null items).
1541///
1542/// Concatenating these unravelers would be tricky and expensive so instead we have a
1543/// composite unraveler which unravels across multiple unravelers.
1544///
1545/// Note: this class should be used even if there is only one page / unraveler.  This is
1546/// because the `RepDefUnraveler`'s API is more complex (it's meant to be called by this
1547/// class)
1548#[derive(Debug)]
1549pub struct CompositeRepDefUnraveler {
1550    unravelers: Vec<RepDefUnraveler>,
1551}
1552
1553impl CompositeRepDefUnraveler {
1554    pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
1555        Self { unravelers }
1556    }
1557
1558    /// Unravels a layer of validity
1559    ///
1560    /// Returns None if there are no null items in this layer
1561    pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
1562        let is_all_valid = self
1563            .unravelers
1564            .iter()
1565            .all(|unraveler| unraveler.is_all_valid());
1566
1567        if is_all_valid {
1568            for unraveler in self.unravelers.iter_mut() {
1569                unraveler.skip_validity();
1570            }
1571            None
1572        } else {
1573            let mut validity = BooleanBufferBuilder::new(num_values);
1574            for unraveler in self.unravelers.iter_mut() {
1575                unraveler.unravel_validity(&mut validity);
1576            }
1577            Some(NullBuffer::new(validity.finish()))
1578        }
1579    }
1580
1581    pub fn unravel_fsl_validity(
1582        &mut self,
1583        num_values: usize,
1584        dimension: usize,
1585    ) -> Option<NullBuffer> {
1586        for unraveler in self.unravelers.iter_mut() {
1587            unraveler.decimate(dimension);
1588        }
1589        self.unravel_validity(num_values)
1590    }
1591
1592    /// Unravels a layer of offsets (and the validity for that layer)
1593    pub fn unravel_offsets<T: ArrowNativeType>(
1594        &mut self,
1595    ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
1596        let mut is_all_valid = true;
1597        let mut max_num_lists = 0;
1598        for unraveler in self.unravelers.iter() {
1599            is_all_valid &= unraveler.is_all_valid();
1600            max_num_lists += unraveler.max_lists();
1601        }
1602
1603        let mut validity = if is_all_valid {
1604            None
1605        } else {
1606            // Note: This is probably an over-estimate and potentially even an under-estimate.  We only know
1607            // right now how many items we have and not how many rows.  (TODO: Shouldn't we know the # of rows?)
1608            Some(BooleanBufferBuilder::new(max_num_lists))
1609        };
1610
1611        let mut offsets = Vec::with_capacity(max_num_lists + 1);
1612
1613        for unraveler in self.unravelers.iter_mut() {
1614            unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
1615        }
1616
1617        Ok((
1618            OffsetBuffer::new(ScalarBuffer::from(offsets)),
1619            validity.map(|mut v| NullBuffer::new(v.finish())),
1620        ))
1621    }
1622}
1623
1624/// A [`ControlWordIterator`] when there are both repetition and definition levels
1625///
1626/// The iterator will put the repetition level in the upper bits and the definition
1627/// level in the lower bits.  The number of bits used for each level is determined
1628/// by the width of the repetition and definition levels.
1629#[derive(Debug)]
1630pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
1631    repdef: I,
1632    def_width: usize,
1633    max_rep: u16,
1634    max_visible_def: u16,
1635    rep_mask: u16,
1636    def_mask: u16,
1637    bits_rep: u8,
1638    bits_def: u8,
1639    phantom: std::marker::PhantomData<W>,
1640}
1641
1642impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
1643    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1644        let next = self.repdef.next()?;
1645        let control_word: u8 =
1646            (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
1647        buf.push(control_word);
1648        let is_new_row = next.0 == self.max_rep;
1649        let is_visible = next.1 <= self.max_visible_def;
1650        let is_valid_item = next.1 == 0;
1651        Some(ControlWordDesc {
1652            is_new_row,
1653            is_visible,
1654            is_valid_item,
1655        })
1656    }
1657}
1658
1659impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
1660    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1661        let next = self.repdef.next()?;
1662        let control_word: u16 =
1663            ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
1664        let control_word = control_word.to_le_bytes();
1665        buf.push(control_word[0]);
1666        buf.push(control_word[1]);
1667        let is_new_row = next.0 == self.max_rep;
1668        let is_visible = next.1 <= self.max_visible_def;
1669        let is_valid_item = next.1 == 0;
1670        Some(ControlWordDesc {
1671            is_new_row,
1672            is_visible,
1673            is_valid_item,
1674        })
1675    }
1676}
1677
1678impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
1679    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1680        let next = self.repdef.next()?;
1681        let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
1682            + ((next.1 & self.def_mask) as u32);
1683        let control_word = control_word.to_le_bytes();
1684        buf.push(control_word[0]);
1685        buf.push(control_word[1]);
1686        buf.push(control_word[2]);
1687        buf.push(control_word[3]);
1688        let is_new_row = next.0 == self.max_rep;
1689        let is_visible = next.1 <= self.max_visible_def;
1690        let is_valid_item = next.1 == 0;
1691        Some(ControlWordDesc {
1692            is_new_row,
1693            is_visible,
1694            is_valid_item,
1695        })
1696    }
1697}
1698
1699/// A [`ControlWordIterator`] when there are only definition levels or only repetition levels
1700#[derive(Debug)]
1701pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
1702    repdef: I,
1703    level_mask: u16,
1704    bits_rep: u8,
1705    bits_def: u8,
1706    max_rep: u16,
1707    phantom: std::marker::PhantomData<W>,
1708}
1709
1710impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
1711    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1712        let next = self.repdef.next()?;
1713        buf.push((next & self.level_mask) as u8);
1714        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1715        let is_valid_item = next == 0 || self.bits_def == 0;
1716        Some(ControlWordDesc {
1717            is_new_row,
1718            // Either there is no rep, in which case there are no invisible items
1719            // or there is no def, in which case there are no invisible items
1720            is_visible: true,
1721            is_valid_item,
1722        })
1723    }
1724}
1725
1726impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
1727    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1728        let next = self.repdef.next().unwrap() & self.level_mask;
1729        let control_word = next.to_le_bytes();
1730        buf.push(control_word[0]);
1731        buf.push(control_word[1]);
1732        let is_new_row = self.max_rep == 0 || next == self.max_rep;
1733        let is_valid_item = next == 0 || self.bits_def == 0;
1734        Some(ControlWordDesc {
1735            is_new_row,
1736            is_visible: true,
1737            is_valid_item,
1738        })
1739    }
1740}
1741
1742impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
1743    fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1744        let next = self.repdef.next()?;
1745        let next = (next & self.level_mask) as u32;
1746        let control_word = next.to_le_bytes();
1747        buf.push(control_word[0]);
1748        buf.push(control_word[1]);
1749        buf.push(control_word[2]);
1750        buf.push(control_word[3]);
1751        let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
1752        let is_valid_item = next == 0 || self.bits_def == 0;
1753        Some(ControlWordDesc {
1754            is_new_row,
1755            is_visible: true,
1756            is_valid_item,
1757        })
1758    }
1759}
1760
1761/// A [`ControlWordIterator`] when there are no repetition or definition levels
1762#[derive(Debug)]
1763pub struct NilaryControlWordIterator {
1764    len: usize,
1765    idx: usize,
1766}
1767
1768impl NilaryControlWordIterator {
1769    fn append_next(&mut self) -> Option<ControlWordDesc> {
1770        if self.idx == self.len {
1771            None
1772        } else {
1773            self.idx += 1;
1774            Some(ControlWordDesc {
1775                is_new_row: true,
1776                is_visible: true,
1777                is_valid_item: true,
1778            })
1779        }
1780    }
1781}
1782
/// Returns a mask with the lowest `width` bits set
///
/// Widths of 16 or more saturate to `u16::MAX`.  The naive `(1 << width) - 1` overflows
/// the shift at width 16 (a panic in debug builds, a zero mask in release builds), and a
/// `u16` level maximum can need all 16 bits.
fn get_mask(width: u16) -> u16 {
    1u16.checked_shl(u32::from(width))
        .map_or(u16::MAX, |bit| bit - 1)
}
1787
/// The concrete [`BinaryControlWordIterator`] built by zipping two `u16` level slices
///
/// The full iterator type is spelled out (instead of boxing a `dyn Iterator`) because
/// control words are produced once per value, which puts this on the critical path.
type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
    T,
>;
1794
1795/// An iterator that generates control words from repetition and definition levels
1796///
1797/// "Control word" is just a fancy term for a single u8/u16/u32 that contains both
1798/// the repetition and definition in it.
1799///
1800/// In the large majority of case we only need a single byte to represent both the
1801/// repetition and definition levels.  However, if there is deep nesting then we may
1802/// need two bytes.  In the worst case we need 4 bytes though this suggests hundreds of
1803/// levels of nesting which seems unlikely to encounter in practice.
1804#[derive(Debug)]
1805pub enum ControlWordIterator<'a> {
1806    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
1807    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
1808    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
1809    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
1810    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
1811    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
1812    Nilary(NilaryControlWordIterator),
1813}
1814
1815/// Describes the properties of a control word
1816#[derive(Debug)]
1817pub struct ControlWordDesc {
1818    pub is_new_row: bool,
1819    pub is_visible: bool,
1820    pub is_valid_item: bool,
1821}
1822
1823impl ControlWordIterator<'_> {
1824    /// Appends the next control word to the buffer
1825    ///
1826    /// Returns true if this is the start of a new item (i.e. the repetition level is maxed out)
1827    pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1828        match self {
1829            Self::Binary8(iter) => iter.append_next(buf),
1830            Self::Binary16(iter) => iter.append_next(buf),
1831            Self::Binary32(iter) => iter.append_next(buf),
1832            Self::Unary8(iter) => iter.append_next(buf),
1833            Self::Unary16(iter) => iter.append_next(buf),
1834            Self::Unary32(iter) => iter.append_next(buf),
1835            Self::Nilary(iter) => iter.append_next(),
1836        }
1837    }
1838
1839    /// Return true if the control word iterator has repetition levels
1840    pub fn has_repetition(&self) -> bool {
1841        match self {
1842            Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
1843            Self::Unary8(iter) => iter.bits_rep > 0,
1844            Self::Unary16(iter) => iter.bits_rep > 0,
1845            Self::Unary32(iter) => iter.bits_rep > 0,
1846            Self::Nilary(_) => false,
1847        }
1848    }
1849
1850    /// Returns the number of bytes per control word
1851    pub fn bytes_per_word(&self) -> usize {
1852        match self {
1853            Self::Binary8(_) => 1,
1854            Self::Binary16(_) => 2,
1855            Self::Binary32(_) => 4,
1856            Self::Unary8(_) => 1,
1857            Self::Unary16(_) => 2,
1858            Self::Unary32(_) => 4,
1859            Self::Nilary(_) => 0,
1860        }
1861    }
1862
1863    /// Returns the number of bits used for the repetition level
1864    pub fn bits_rep(&self) -> u8 {
1865        match self {
1866            Self::Binary8(iter) => iter.bits_rep,
1867            Self::Binary16(iter) => iter.bits_rep,
1868            Self::Binary32(iter) => iter.bits_rep,
1869            Self::Unary8(iter) => iter.bits_rep,
1870            Self::Unary16(iter) => iter.bits_rep,
1871            Self::Unary32(iter) => iter.bits_rep,
1872            Self::Nilary(_) => 0,
1873        }
1874    }
1875
1876    /// Returns the number of bits used for the definition level
1877    pub fn bits_def(&self) -> u8 {
1878        match self {
1879            Self::Binary8(iter) => iter.bits_def,
1880            Self::Binary16(iter) => iter.bits_def,
1881            Self::Binary32(iter) => iter.bits_def,
1882            Self::Unary8(iter) => iter.bits_def,
1883            Self::Unary16(iter) => iter.bits_def,
1884            Self::Unary32(iter) => iter.bits_def,
1885            Self::Nilary(_) => 0,
1886        }
1887    }
1888}
1889
1890/// Builds a [`ControlWordIterator`] from repetition and definition levels
1891/// by first calculating the width needed and then creating the iterator
1892/// with the appropriate width
1893pub fn build_control_word_iterator<'a>(
1894    rep: Option<&'a [u16]>,
1895    max_rep: u16,
1896    def: Option<&'a [u16]>,
1897    max_def: u16,
1898    max_visible_def: u16,
1899    len: usize,
1900) -> ControlWordIterator<'a> {
1901    let rep_width = if max_rep == 0 {
1902        0
1903    } else {
1904        log_2_ceil(max_rep as u32) as u16
1905    };
1906    let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
1907    let def_width = if max_def == 0 {
1908        0
1909    } else {
1910        log_2_ceil(max_def as u32) as u16
1911    };
1912    let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
1913    let total_width = rep_width + def_width;
1914    match (rep, def) {
1915        (Some(rep), Some(def)) => {
1916            let iter = rep.iter().copied().zip(def.iter().copied());
1917            let def_width = def_width as usize;
1918            if total_width <= 8 {
1919                ControlWordIterator::Binary8(BinaryControlWordIterator {
1920                    repdef: iter,
1921                    rep_mask,
1922                    def_mask,
1923                    def_width,
1924                    max_rep,
1925                    max_visible_def,
1926                    bits_rep: rep_width as u8,
1927                    bits_def: def_width as u8,
1928                    phantom: std::marker::PhantomData,
1929                })
1930            } else if total_width <= 16 {
1931                ControlWordIterator::Binary16(BinaryControlWordIterator {
1932                    repdef: iter,
1933                    rep_mask,
1934                    def_mask,
1935                    def_width,
1936                    max_rep,
1937                    max_visible_def,
1938                    bits_rep: rep_width as u8,
1939                    bits_def: def_width as u8,
1940                    phantom: std::marker::PhantomData,
1941                })
1942            } else {
1943                ControlWordIterator::Binary32(BinaryControlWordIterator {
1944                    repdef: iter,
1945                    rep_mask,
1946                    def_mask,
1947                    def_width,
1948                    max_rep,
1949                    max_visible_def,
1950                    bits_rep: rep_width as u8,
1951                    bits_def: def_width as u8,
1952                    phantom: std::marker::PhantomData,
1953                })
1954            }
1955        }
1956        (Some(lev), None) => {
1957            let iter = lev.iter().copied();
1958            if total_width <= 8 {
1959                ControlWordIterator::Unary8(UnaryControlWordIterator {
1960                    repdef: iter,
1961                    level_mask: rep_mask,
1962                    bits_rep: total_width as u8,
1963                    bits_def: 0,
1964                    max_rep,
1965                    phantom: std::marker::PhantomData,
1966                })
1967            } else if total_width <= 16 {
1968                ControlWordIterator::Unary16(UnaryControlWordIterator {
1969                    repdef: iter,
1970                    level_mask: rep_mask,
1971                    bits_rep: total_width as u8,
1972                    bits_def: 0,
1973                    max_rep,
1974                    phantom: std::marker::PhantomData,
1975                })
1976            } else {
1977                ControlWordIterator::Unary32(UnaryControlWordIterator {
1978                    repdef: iter,
1979                    level_mask: rep_mask,
1980                    bits_rep: total_width as u8,
1981                    bits_def: 0,
1982                    max_rep,
1983                    phantom: std::marker::PhantomData,
1984                })
1985            }
1986        }
1987        (None, Some(lev)) => {
1988            let iter = lev.iter().copied();
1989            if total_width <= 8 {
1990                ControlWordIterator::Unary8(UnaryControlWordIterator {
1991                    repdef: iter,
1992                    level_mask: def_mask,
1993                    bits_rep: 0,
1994                    bits_def: total_width as u8,
1995                    max_rep: 0,
1996                    phantom: std::marker::PhantomData,
1997                })
1998            } else if total_width <= 16 {
1999                ControlWordIterator::Unary16(UnaryControlWordIterator {
2000                    repdef: iter,
2001                    level_mask: def_mask,
2002                    bits_rep: 0,
2003                    bits_def: total_width as u8,
2004                    max_rep: 0,
2005                    phantom: std::marker::PhantomData,
2006                })
2007            } else {
2008                ControlWordIterator::Unary32(UnaryControlWordIterator {
2009                    repdef: iter,
2010                    level_mask: def_mask,
2011                    bits_rep: 0,
2012                    bits_def: total_width as u8,
2013                    max_rep: 0,
2014                    phantom: std::marker::PhantomData,
2015                })
2016            }
2017        }
2018        (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
2019    }
2020}
2021
2022/// A parser to unwrap control words into repetition and definition levels
2023///
2024/// This is the inverse of the [`ControlWordIterator`].
2025#[derive(Copy, Clone, Debug)]
2026pub enum ControlWordParser {
2027    // First item is the bits to shift, second is the mask to apply (the mask can be
2028    // calculated from the bits to shift but we don't want to calculate it each time)
2029    BOTH8(u8, u32),
2030    BOTH16(u8, u32),
2031    BOTH32(u8, u32),
2032    REP8,
2033    REP16,
2034    REP32,
2035    DEF8,
2036    DEF16,
2037    DEF32,
2038    NIL,
2039}
2040
2041impl ControlWordParser {
2042    fn parse_both<const WORD_SIZE: u8>(
2043        src: &[u8],
2044        dst_rep: &mut Vec<u16>,
2045        dst_def: &mut Vec<u16>,
2046        bits_to_shift: u8,
2047        mask_to_apply: u32,
2048    ) {
2049        match WORD_SIZE {
2050            1 => {
2051                let word = src[0];
2052                let rep = word >> bits_to_shift;
2053                let def = word & (mask_to_apply as u8);
2054                dst_rep.push(rep as u16);
2055                dst_def.push(def as u16);
2056            }
2057            2 => {
2058                let word = u16::from_le_bytes([src[0], src[1]]);
2059                let rep = word >> bits_to_shift;
2060                let def = word & mask_to_apply as u16;
2061                dst_rep.push(rep);
2062                dst_def.push(def);
2063            }
2064            4 => {
2065                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2066                let rep = word >> bits_to_shift;
2067                let def = word & mask_to_apply;
2068                dst_rep.push(rep as u16);
2069                dst_def.push(def as u16);
2070            }
2071            _ => unreachable!(),
2072        }
2073    }
2074
2075    fn parse_desc_both<const WORD_SIZE: u8>(
2076        src: &[u8],
2077        bits_to_shift: u8,
2078        mask_to_apply: u32,
2079        max_rep: u16,
2080        max_visible_def: u16,
2081    ) -> ControlWordDesc {
2082        match WORD_SIZE {
2083            1 => {
2084                let word = src[0];
2085                let rep = word >> bits_to_shift;
2086                let def = word & (mask_to_apply as u8);
2087                let is_visible = def as u16 <= max_visible_def;
2088                let is_new_row = rep as u16 == max_rep;
2089                let is_valid_item = def == 0;
2090                ControlWordDesc {
2091                    is_visible,
2092                    is_new_row,
2093                    is_valid_item,
2094                }
2095            }
2096            2 => {
2097                let word = u16::from_le_bytes([src[0], src[1]]);
2098                let rep = word >> bits_to_shift;
2099                let def = word & mask_to_apply as u16;
2100                let is_visible = def <= max_visible_def;
2101                let is_new_row = rep == max_rep;
2102                let is_valid_item = def == 0;
2103                ControlWordDesc {
2104                    is_visible,
2105                    is_new_row,
2106                    is_valid_item,
2107                }
2108            }
2109            4 => {
2110                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2111                let rep = word >> bits_to_shift;
2112                let def = word & mask_to_apply;
2113                let is_visible = def as u16 <= max_visible_def;
2114                let is_new_row = rep as u16 == max_rep;
2115                let is_valid_item = def == 0;
2116                ControlWordDesc {
2117                    is_visible,
2118                    is_new_row,
2119                    is_valid_item,
2120                }
2121            }
2122            _ => unreachable!(),
2123        }
2124    }
2125
2126    fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
2127        match WORD_SIZE {
2128            1 => {
2129                let word = src[0];
2130                dst.push(word as u16);
2131            }
2132            2 => {
2133                let word = u16::from_le_bytes([src[0], src[1]]);
2134                dst.push(word);
2135            }
2136            4 => {
2137                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2138                dst.push(word as u16);
2139            }
2140            _ => unreachable!(),
2141        }
2142    }
2143
2144    fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
2145        match WORD_SIZE {
2146            1 => ControlWordDesc {
2147                is_new_row: src[0] as u16 == max_rep,
2148                is_visible: true,
2149                is_valid_item: true,
2150            },
2151            2 => ControlWordDesc {
2152                is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
2153                is_visible: true,
2154                is_valid_item: true,
2155            },
2156            4 => ControlWordDesc {
2157                is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
2158                is_visible: true,
2159                is_valid_item: true,
2160            },
2161            _ => unreachable!(),
2162        }
2163    }
2164
2165    fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
2166        match WORD_SIZE {
2167            1 => ControlWordDesc {
2168                is_new_row: true,
2169                is_visible: true,
2170                is_valid_item: src[0] == 0,
2171            },
2172            2 => ControlWordDesc {
2173                is_new_row: true,
2174                is_visible: true,
2175                is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
2176            },
2177            4 => ControlWordDesc {
2178                is_new_row: true,
2179                is_visible: true,
2180                is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
2181            },
2182            _ => unreachable!(),
2183        }
2184    }
2185
2186    /// Returns the number of bytes per control word
2187    pub fn bytes_per_word(&self) -> usize {
2188        match self {
2189            Self::BOTH8(..) => 1,
2190            Self::BOTH16(..) => 2,
2191            Self::BOTH32(..) => 4,
2192            Self::REP8 => 1,
2193            Self::REP16 => 2,
2194            Self::REP32 => 4,
2195            Self::DEF8 => 1,
2196            Self::DEF16 => 2,
2197            Self::DEF32 => 4,
2198            Self::NIL => 0,
2199        }
2200    }
2201
2202    /// Appends the next control word to the rep & def buffers
2203    ///
2204    /// `src` should be pointing at the first byte (little endian) of the control word
2205    ///
2206    /// `dst_rep` and `dst_def` are the buffers to append the rep and def levels to.
2207    /// They will not be appended to if not needed.
2208    pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
2209        match self {
2210            Self::BOTH8(bits_to_shift, mask_to_apply) => {
2211                Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2212            }
2213            Self::BOTH16(bits_to_shift, mask_to_apply) => {
2214                Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2215            }
2216            Self::BOTH32(bits_to_shift, mask_to_apply) => {
2217                Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2218            }
2219            Self::REP8 => Self::parse_one::<1>(src, dst_rep),
2220            Self::REP16 => Self::parse_one::<2>(src, dst_rep),
2221            Self::REP32 => Self::parse_one::<4>(src, dst_rep),
2222            Self::DEF8 => Self::parse_one::<1>(src, dst_def),
2223            Self::DEF16 => Self::parse_one::<2>(src, dst_def),
2224            Self::DEF32 => Self::parse_one::<4>(src, dst_def),
2225            Self::NIL => {}
2226        }
2227    }
2228
2229    /// Return true if the control words contain repetition information
2230    pub fn has_rep(&self) -> bool {
2231        match self {
2232            Self::BOTH8(..)
2233            | Self::BOTH16(..)
2234            | Self::BOTH32(..)
2235            | Self::REP8
2236            | Self::REP16
2237            | Self::REP32 => true,
2238            Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
2239        }
2240    }
2241
2242    /// Temporarily parses the control word to inspect its properties but does not append to any buffers
2243    pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
2244        match self {
2245            Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
2246                src,
2247                *bits_to_shift,
2248                *mask_to_apply,
2249                max_rep,
2250                max_visible_def,
2251            ),
2252            Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
2253                src,
2254                *bits_to_shift,
2255                *mask_to_apply,
2256                max_rep,
2257                max_visible_def,
2258            ),
2259            Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
2260                src,
2261                *bits_to_shift,
2262                *mask_to_apply,
2263                max_rep,
2264                max_visible_def,
2265            ),
2266            Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
2267            Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
2268            Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
2269            Self::DEF8 => Self::parse_def_desc_one::<1>(src),
2270            Self::DEF16 => Self::parse_def_desc_one::<2>(src),
2271            Self::DEF32 => Self::parse_def_desc_one::<4>(src),
2272            Self::NIL => ControlWordDesc {
2273                is_new_row: true,
2274                is_valid_item: true,
2275                is_visible: true,
2276            },
2277        }
2278    }
2279
2280    /// Creates a new parser from the number of bits used for the repetition and definition levels
2281    pub fn new(bits_rep: u8, bits_def: u8) -> Self {
2282        let total_bits = bits_rep + bits_def;
2283
2284        enum WordSize {
2285            One,
2286            Two,
2287            Four,
2288        }
2289
2290        let word_size = if total_bits <= 8 {
2291            WordSize::One
2292        } else if total_bits <= 16 {
2293            WordSize::Two
2294        } else {
2295            WordSize::Four
2296        };
2297
2298        match (bits_rep > 0, bits_def > 0, word_size) {
2299            (false, false, _) => Self::NIL,
2300            (false, true, WordSize::One) => Self::DEF8,
2301            (false, true, WordSize::Two) => Self::DEF16,
2302            (false, true, WordSize::Four) => Self::DEF32,
2303            (true, false, WordSize::One) => Self::REP8,
2304            (true, false, WordSize::Two) => Self::REP16,
2305            (true, false, WordSize::Four) => Self::REP32,
2306            (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
2307            (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
2308            (true, true, WordSize::Four) => {
2309                Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
2310            }
2311        }
2312    }
2313}
2314
#[cfg(test)]
mod tests {
    use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};

    use crate::repdef::{
        CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
    };

    use super::RepDefBuilder;

    /// Builds a validity buffer where `true` marks a valid (non-null) slot
    fn validity(values: &[bool]) -> NullBuffer {
        NullBuffer::from_iter(values.iter().copied())
    }

    /// Builds 32-bit list offsets from a literal slice
    fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
        OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
    }

    /// Builds 64-bit list offsets from a literal slice
    fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
        OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
    }

    #[test]
    fn test_repdef_basic() {
        // Basic case, rep & def
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 2, 2, 5]),
            Some(validity(&[true, false, true])),
        );
        builder.add_offsets(
            offsets_64(&[0, 1, 3, 5, 5, 9]),
            Some(validity(&[true, true, true, false, true])),
        );
        builder.add_validity_bitmap(validity(&[
            true, true, true, false, false, false, true, true, false,
        ]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
        assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
        )]);

        // Note: validity doesn't exactly round-trip because repdef normalizes some of the
        // redundant validity values
        assert_eq!(
            unraveler.unravel_validity(9),
            Some(validity(&[
                true, true, true, false, false, false, true, true, false
            ]))
        );
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 5, 9]).inner());
        assert_eq!(val, Some(validity(&[true, true, true, false, true])));
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 2, 2, 5]).inner());
        assert_eq!(val, Some(validity(&[true, false, true])));
    }

    #[test]
    fn test_repdef_simple_null_empty_list() {
        let check = |repdefs: SerializedRepDefs, last_def: DefinitionInterpretation| {
            let rep = repdefs.repetition_levels.unwrap();
            let def = repdefs.definition_levels.unwrap();

            assert_eq!([1, 0, 1, 1, 0, 0], *rep);
            assert_eq!([0, 0, 2, 0, 1, 0], *def);
            assert!(repdefs.special_records.is_empty());
            assert_eq!(
                vec![DefinitionInterpretation::NullableItem, last_def,],
                repdefs.def_meaning
            );
        };

        // Null list and empty list should be serialized mostly the same

        // Null case
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 2, 2, 5]),
            Some(validity(&[true, false, true])),
        );
        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        check(repdefs, DefinitionInterpretation::NullableList);

        // Empty case
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0, 2, 2, 5]), None);
        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        check(repdefs, DefinitionInterpretation::EmptyableList);
    }

    #[test]
    fn test_repdef_empty_list_at_end() {
        // Regresses a failure we encountered when the last item was an empty list
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
        builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 1, 0, 0, 1], *rep);
        assert_eq!([0, 0, 0, 1, 0, 2], *def);
        assert!(repdefs.special_records.is_empty());
        assert_eq!(
            vec![
                DefinitionInterpretation::NullableItem,
                DefinitionInterpretation::EmptyableList,
            ],
            repdefs.def_meaning
        );
    }

    #[test]
    fn test_repdef_abnormal_nulls() {
        // List nulls are allowed to have non-empty offsets and garbage values
        // and the add_offsets call should normalize this
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 2, 5, 8]),
            Some(validity(&[true, false, true])),
        );
        // Note: we pass 5 here and not 8.  If add_offsets tells us there is garbage nulls they
        // should be removed before continuing
        builder.add_no_null(5);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
        assert_eq!([0, 0, 1, 0, 0, 0], *def);

        assert_eq!(
            vec![
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableList,
            ],
            repdefs.def_meaning
        );
    }

    #[test]
    fn test_repdef_fsl() {
        let mut builder = RepDefBuilder::default();
        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
        builder.add_fsl(None, 2, 4);
        builder.add_validity_bitmap(validity(&[
            true, false, true, false, true, false, true, false,
        ]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        assert_eq!(
            vec![
                DefinitionInterpretation::NullableItem,
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableItem
            ],
            repdefs.def_meaning
        );

        assert!(repdefs.repetition_levels.is_none());

        let def = repdefs.definition_levels.unwrap();

        assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            None,
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
        )]);

        assert_eq!(
            unraveler.unravel_validity(8),
            Some(validity(&[
                true, false, true, false, false, false, false, false
            ]))
        );
        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
        assert_eq!(
            unraveler.unravel_fsl_validity(2, 2),
            Some(validity(&[true, false]))
        );
    }

    #[test]
    fn test_repdef_fsl_allvalid_item() {
        let mut builder = RepDefBuilder::default();
        builder.add_fsl(Some(validity(&[true, false])), 2, 2);
        builder.add_fsl(None, 2, 4);
        builder.add_no_null(8);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        assert_eq!(
            vec![
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableItem
            ],
            repdefs.def_meaning
        );

        assert!(repdefs.repetition_levels.is_none());

        let def = repdefs.definition_levels.unwrap();

        assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            None,
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
        )]);

        assert_eq!(unraveler.unravel_validity(8), None);
        assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
        assert_eq!(
            unraveler.unravel_fsl_validity(2, 2),
            Some(validity(&[true, false]))
        );
    }

    #[test]
    fn test_repdef_sliced_offsets() {
        // Sliced lists may have offsets that don't start with zero.  The
        // add_offsets call needs to normalize these to operate correctly.
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[5, 7, 7, 10]),
            Some(validity(&[true, false, true])),
        );
        builder.add_no_null(5);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
        assert_eq!([0, 0, 1, 0, 0, 0], *def);

        assert_eq!(
            vec![
                DefinitionInterpretation::AllValidItem,
                DefinitionInterpretation::NullableList,
            ],
            repdefs.def_meaning
        );
    }

    #[test]
    fn test_repdef_complex_null_empty() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_32(&[0, 4, 4, 4, 6]),
            Some(validity(&[true, false, true, true])),
        );
        builder.add_offsets(
            offsets_32(&[0, 1, 1, 2, 2, 2, 3]),
            Some(validity(&[true, false, true, false, true, true])),
        );
        builder.add_no_null(3);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep);
        assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def);
    }

    #[test]
    fn test_repdef_empty_list_no_null() {
        // Tests when we have some empty lists but no null lists.  This case
        // caused some bugs because we have definition but no nulls
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
        builder.add_no_null(6);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
        assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
        )]);

        assert_eq!(unraveler.unravel_validity(6), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
        assert_eq!(val, None);
    }

    #[test]
    fn test_repdef_all_valid() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
        builder.add_no_null(9);

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        let rep = repdefs.repetition_levels.unwrap();
        assert!(repdefs.definition_levels.is_none());

        assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            Some(rep.as_ref().to_vec()),
            None,
            repdefs.def_meaning.into(),
        )]);

        assert_eq!(unraveler.unravel_validity(9), None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 7, 9]).inner());
        assert_eq!(val, None);
        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(off.inner(), offsets_32(&[0, 2, 3, 5]).inner());
        assert_eq!(val, None);
    }

    #[test]
    fn test_repdef_no_rep() {
        let mut builder = RepDefBuilder::default();
        builder.add_no_null(5);
        builder.add_validity_bitmap(validity(&[false, false, true, true, true]));
        builder.add_validity_bitmap(validity(&[false, true, true, true, false]));

        let repdefs = RepDefBuilder::serialize(vec![builder]);
        assert!(repdefs.repetition_levels.is_none());
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 2, 0, 0, 1], *def);

        let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
            None,
            Some(def.as_ref().to_vec()),
            repdefs.def_meaning.into(),
        )]);

        assert_eq!(
            unraveler.unravel_validity(5),
            Some(validity(&[false, false, true, true, false]))
        );
        assert_eq!(
            unraveler.unravel_validity(5),
            Some(validity(&[false, false, true, true, true]))
        );
        assert_eq!(unraveler.unravel_validity(5), None);
    }

    #[test]
    fn test_composite_unravel() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 2, 2, 5]),
            Some(validity(&[true, false, true])),
        );
        let repdef1 = RepDefBuilder::serialize(vec![builder]);

        let mut builder = RepDefBuilder::default();
        builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
        let repdef2 = RepDefBuilder::serialize(vec![builder]);

        let unravel1 = RepDefUnraveler::new(
            repdef1.repetition_levels.map(|l| l.to_vec()),
            repdef1.definition_levels.map(|l| l.to_vec()),
            repdef1.def_meaning.into(),
        );
        let unravel2 = RepDefUnraveler::new(
            repdef2.repetition_levels.map(|l| l.to_vec()),
            repdef2.definition_levels.map(|l| l.to_vec()),
            repdef2.def_meaning.into(),
        );

        let mut unraveler = CompositeRepDefUnraveler::new(vec![unravel1, unravel2]);

        let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(
            off.inner(),
            offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]).inner()
        );
        assert_eq!(
            val,
            Some(validity(&[true, false, true, true, true, true, true, true]))
        );
    }

    #[test]
    fn test_repdef_multiple_builders() {
        // Same data as `test_repdef_basic`, split across two builders.  Serializing the
        // builders together must produce exactly the same rep & def levels.
        let mut builder1 = RepDefBuilder::default();
        builder1.add_offsets(offsets_64(&[0, 2]), None);
        builder1.add_offsets(offsets_64(&[0, 1, 3]), None);
        builder1.add_validity_bitmap(validity(&[true, true, true]));

        let mut builder2 = RepDefBuilder::default();
        builder2.add_offsets(offsets_64(&[0, 0, 3]), Some(validity(&[false, true])));
        builder2.add_offsets(
            offsets_64(&[0, 2, 2, 6]),
            Some(validity(&[true, false, true])),
        );
        builder2.add_validity_bitmap(validity(&[false, false, false, true, true, false]));

        let repdefs = RepDefBuilder::serialize(vec![builder1, builder2]);

        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
        assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
    }

    #[test]
    fn test_slicer() {
        let mut builder = RepDefBuilder::default();
        builder.add_offsets(
            offsets_64(&[0, 2, 2, 30, 30]),
            Some(validity(&[true, false, true, true])),
        );
        builder.add_no_null(30);

        let repdefs = RepDefBuilder::serialize(vec![builder]);

        let mut rep_slicer = repdefs.rep_slicer().unwrap();

        // First 5 items include a null list so we get 6 levels (12 bytes)
        assert_eq!(rep_slicer.slice_next(5).len(), 12);
        // Next 20 are all plain
        assert_eq!(rep_slicer.slice_next(20).len(), 40);
        // Last 5 include an empty list so we get 6 levels (12 bytes)
        assert_eq!(rep_slicer.slice_rest().len(), 12);

        // Definition levels have one entry per repetition level, so slicing them by the
        // same item counts must produce the same byte lengths.
        let mut def_slicer = repdefs.def_slicer().unwrap();

        // First 5 items include a null list so we get 6 levels (12 bytes)
        assert_eq!(def_slicer.slice_next(5).len(), 12);
        // Next 20 are all plain
        assert_eq!(def_slicer.slice_next(20).len(), 40);
        // Last 5 include an empty list so we get 6 levels (12 bytes)
        assert_eq!(def_slicer.slice_rest().len(), 12);
    }

    #[test]
    fn test_control_words() {
        // Convert to control words, verify expected, convert back, verify same as original
        fn check(
            rep: &[u16],
            def: &[u16],
            expected_values: Vec<u8>,
            expected_bytes_per_word: usize,
            expected_bits_rep: u8,
            expected_bits_def: u8,
        ) {
            let num_vals = rep.len().max(def.len());
            let max_rep = rep.iter().max().copied().unwrap_or(0);
            let max_def = def.iter().max().copied().unwrap_or(0);

            let in_rep = if rep.is_empty() { None } else { Some(rep) };
            let in_def = if def.is_empty() { None } else { Some(def) };

            let mut iter = super::build_control_word_iterator(
                in_rep,
                max_rep,
                in_def,
                max_def,
                max_def + 1,
                expected_values.len(),
            );
            assert_eq!(iter.bytes_per_word(), expected_bytes_per_word);
            assert_eq!(iter.bits_rep(), expected_bits_rep);
            assert_eq!(iter.bits_def(), expected_bits_def);
            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());

            for _ in 0..num_vals {
                iter.append_next(&mut cw_vec);
            }
            assert!(iter.append_next(&mut cw_vec).is_none());

            assert_eq!(expected_values, cw_vec);

            let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);

            let mut rep_out = Vec::with_capacity(num_vals);
            let mut def_out = Vec::with_capacity(num_vals);

            // `chunks_exact(0)` panics, so the zero-width (no rep, no def) case is skipped
            if expected_bytes_per_word > 0 {
                for slice in cw_vec.chunks_exact(expected_bytes_per_word) {
                    parser.parse(slice, &mut rep_out, &mut def_out);
                }
            }

            assert_eq!(rep, rep_out.as_slice());
            assert_eq!(def, def_out.as_slice());
        }

        // Each will need 4 bits and so we should get 1-byte control words
        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let def = &[5_u16, 3, 1, 2, 12, 15, 0, 2];
        let expected = vec![
            0b00000101, // 0, 5
            0b01110011, // 7, 3
            0b00110001, // 3, 1
            0b00100010, // 2, 2
            0b10011100, // 9, 12
            0b10001111, // 8, 15
            0b11000000, // 12, 0
            0b01010010, // 5, 2
        ];
        check(rep, def, expected, 1, 4, 4);

        // Now we need 5 bits for def so we get 2-byte control words
        let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let def = &[5_u16, 3, 1, 2, 12, 22, 0, 2];
        let expected = vec![
            0b00000101, 0b00000000, // 0, 5
            0b11100011, 0b00000000, // 7, 3
            0b01100001, 0b00000000, // 3, 1
            0b01000010, 0b00000000, // 2, 2
            0b00101100, 0b00000001, // 9, 12
            0b00010110, 0b00000001, // 8, 22
            0b10000000, 0b00000001, // 12, 0
            0b10100010, 0b00000000, // 5, 2
        ];
        check(rep, def, expected, 2, 4, 5);

        // Just rep, 4 bits so 1 byte each
        let levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
        let expected = vec![
            0b00000000, // 0
            0b00000111, // 7
            0b00000011, // 3
            0b00000010, // 2
            0b00001001, // 9
            0b00001000, // 8
            0b00001100, // 12
            0b00000101, // 5
        ];
        check(levels, &[], expected.clone(), 1, 4, 0);

        // Just def
        check(&[], levels, expected, 1, 0, 4);

        // No rep, no def, no bytes
        check(&[], &[], Vec::default(), 0, 0, 0);
    }

    #[test]
    fn test_control_words_rep_index() {
        /// Builds control words for `rep`/`def` (with `max_visible_def` fixed at 2) and checks
        /// the new-row and visibility flags reported for each word.
        fn check(
            rep: &[u16],
            def: &[u16],
            expected_new_rows: Vec<bool>,
            expected_is_visible: Vec<bool>,
        ) {
            assert_eq!(expected_new_rows.len(), expected_is_visible.len());

            let num_vals = rep.len().max(def.len());
            let max_rep = rep.iter().max().copied().unwrap_or(0);
            let max_def = def.iter().max().copied().unwrap_or(0);

            let in_rep = if rep.is_empty() { None } else { Some(rep) };
            let in_def = if def.is_empty() { None } else { Some(def) };

            let mut iter = super::build_control_word_iterator(
                in_rep,
                max_rep,
                in_def,
                max_def,
                /*max_visible_def=*/ 2,
                expected_new_rows.len(),
            );

            let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
            for (expected_new_row, expected_visible) in expected_new_rows
                .iter()
                .copied()
                .zip(expected_is_visible.iter().copied())
            {
                let word_desc = iter.append_next(&mut cw_vec).unwrap();
                assert_eq!(word_desc.is_new_row, expected_new_row);
                assert_eq!(word_desc.is_visible, expected_visible);
            }
            assert!(iter.append_next(&mut cw_vec).is_none());
        }

        // A rep level equal to max_rep (2 here) starts a new row
        let rep = &[2_u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
        // Def levels only affect visibility when rep levels are present: the level 3 at
        // index 3 exceeds max_visible_def (2) and so marks an invisible item
        let def = &[0_u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];

        // Rep & def
        check(
            rep,
            def,
            vec![
                true, false, false, true, true, false, false, false, false, true, false,
            ],
            vec![
                true, true, true, false, true, true, true, true, true, true, true,
            ],
        );
        // Rep only
        check(
            rep,
            &[],
            vec![
                true, false, false, true, true, false, false, false, false, true, false,
            ],
            vec![true; 11],
        );
        // No repetition
        check(
            &[],
            def,
            vec![
                true, true, true, true, true, true, true, true, true, true, true,
            ],
            vec![true; 11],
        );
        // No repetition, no definition
        check(
            &[],
            &[],
            vec![
                true, true, true, true, true, true, true, true, true, true, true,
            ],
            vec![true; 11],
        );
    }
}