1use std::{
110 iter::{Copied, Zip},
111 ops::Range,
112 sync::Arc,
113};
114
115use arrow_array::OffsetSizeTrait;
116use arrow_buffer::{
117 ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
118};
119use lance_core::{Error, Result, utils::bit::log_2_ceil};
120
121use crate::buffer::LanceBuffer;
122
/// Growable buffer of 16-bit repetition or definition levels.
pub type LevelBuffer = Vec<u16>;
124
/// One page of a structural page plan, cut on a top-level row boundary.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct StructuralPageSplit {
    /// Index of the first top-level row that belongs to this page.
    pub(crate) row_start: u64,
    /// Number of top-level rows in this page.
    pub(crate) num_rows: u64,
    /// Half-open range of positions in the rep/def level buffers covered by this page.
    pub(crate) level_range: Range<usize>,
    /// Number of visible values counted before this page starts.
    pub(crate) value_start: u64,
    /// Number of visible values counted inside this page.
    pub(crate) num_values: u64,
}
139
/// Outcome of planning how a batch of rep/def levels maps onto pages.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum StructuralPagePlan {
    /// No split is required (also used whenever planning is skipped).
    Fits,
    /// The levels should be written as these pages, in order.
    Split(Vec<StructuralPageSplit>),
    /// A single top-level row carries this many levels, which exceeds the
    /// per-page budget, so no row-aligned split can satisfy it.
    UnsplittableOverBudget(u64),
}
150
/// Offset added to a definition level to tag it as a "special" entry (a null
/// or empty list) while levels are being built.  Any definition level above
/// this value is treated as special and has the offset subtracted again when
/// the levels are normalized.
const SPECIAL_THRESHOLD: u16 = u16::MAX / 2;
162
/// A list layer: normalized offsets plus optional validity.
#[derive(Clone, Debug)]
struct OffsetDesc {
    /// Offsets rebuilt to start at 0, with null lists contributing no length.
    offsets: Arc<[i64]>,
    /// Per-list validity, if any was supplied.
    validity: Option<BooleanBuffer>,
    /// True if at least one valid list has length zero.
    has_empty_lists: bool,
    /// Number of lists (one less than the number of offsets).
    num_values: usize,
    /// Number of lists that are null or empty.
    num_specials: usize,
}
173
/// A plain validity layer (no repetition).
#[derive(Clone, Debug)]
struct ValidityDesc {
    /// Validity bitmap, or `None` when every value is valid.
    validity: Option<BooleanBuffer>,
    /// Number of values described by this layer.
    num_values: usize,
}
181
/// A fixed-size-list layer.
#[derive(Clone, Debug)]
struct FslDesc {
    /// Validity of the fixed-size lists, or `None` when all are valid.
    validity: Option<BooleanBuffer>,
    /// Number of child items per list; levels are repeated this many times.
    dimension: usize,
    /// Number of fixed-size lists described by this layer.
    num_values: usize,
}
191
/// One not-yet-serialized layer of repetition/definition information.
#[derive(Clone, Debug)]
enum RawRepDef {
    /// A variable-length list layer.
    Offsets(OffsetDesc),
    /// A validity-only layer.
    Validity(ValidityDesc),
    /// A fixed-size-list layer.
    Fsl(FslDesc),
}
201
202impl RawRepDef {
203 fn has_nulls(&self) -> bool {
205 match self {
206 Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
207 Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
208 Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
209 }
210 }
211
212 fn num_values(&self) -> usize {
214 match self {
215 Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
216 Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
217 Self::Fsl(FslDesc { num_values, .. }) => *num_values,
218 }
219 }
220
221 fn num_specials(&self) -> usize {
223 match self {
224 Self::Offsets(OffsetDesc { num_specials, .. }) => *num_specials,
225 _ => 0,
226 }
227 }
228
229 fn max_def(&self) -> u16 {
231 match self {
232 Self::Offsets(OffsetDesc {
233 has_empty_lists,
234 validity,
235 ..
236 }) => {
237 let mut max_def = 0;
238 if *has_empty_lists {
239 max_def += 1;
240 }
241 if validity.is_some() {
242 max_def += 1;
243 }
244 max_def
245 }
246 Self::Validity(ValidityDesc { validity: None, .. }) => 0,
247 Self::Validity(ValidityDesc { .. }) => 1,
248 Self::Fsl(FslDesc { validity: None, .. }) => 0,
249 Self::Fsl(FslDesc { .. }) => 1,
250 }
251 }
252
253 fn max_rep(&self) -> u16 {
255 match self {
256 Self::Offsets(_) => 1,
257 _ => 0,
258 }
259 }
260}
261
/// Repetition and definition levels in their serialized (flattened) form.
#[derive(Debug)]
pub struct SerializedRepDefs {
    /// Repetition levels, or `None` when there are none.
    pub repetition_levels: Option<Arc<[u16]>>,
    /// Definition levels, or `None` when there are none.
    pub definition_levels: Option<Arc<[u16]>>,
    /// Meaning of each group of definition levels, one entry per layer.
    pub def_meaning: Vec<DefinitionInterpretation>,
    /// Sum of the definition levels used by the entries of `def_meaning`
    /// that come before the first list entry; `None` when there is no list.
    /// Levels above this value do not correspond to a value when slicing.
    pub max_visible_level: Option<u16>,
    /// Whether a fixed-size-list layer contributed to these levels.
    has_fsl: bool,
}
285
286impl SerializedRepDefs {
287 fn max_visible_level(def_meaning: &[DefinitionInterpretation]) -> Option<u16> {
288 let first_list = def_meaning.iter().position(|level| level.is_list());
289 first_list.map(|first_list| {
290 def_meaning
291 .iter()
292 .map(|level| level.num_def_levels())
293 .take(first_list)
294 .sum::<u16>()
295 })
296 }
297
298 pub fn new(
299 repetition_levels: Option<LevelBuffer>,
300 definition_levels: Option<LevelBuffer>,
301 def_meaning: Vec<DefinitionInterpretation>,
302 ) -> Self {
303 Self::new_with_fixed_size_list_levels(
304 repetition_levels,
305 definition_levels,
306 def_meaning,
307 false,
308 )
309 }
310
311 pub(crate) fn new_with_fixed_size_list_levels(
312 repetition_levels: Option<LevelBuffer>,
313 definition_levels: Option<LevelBuffer>,
314 def_meaning: Vec<DefinitionInterpretation>,
315 has_fsl: bool,
316 ) -> Self {
317 let max_visible_level = Self::max_visible_level(&def_meaning);
318 Self {
319 repetition_levels: repetition_levels.map(Arc::from),
320 definition_levels: definition_levels.map(Arc::from),
321 def_meaning,
322 max_visible_level,
323 has_fsl,
324 }
325 }
326
327 pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
329 Self {
330 repetition_levels: None,
331 definition_levels: None,
332 def_meaning,
333 max_visible_level: None,
334 has_fsl: false,
335 }
336 }
337
338 pub fn rep_slicer(&self) -> Option<RepDefSlicer<'_>> {
339 self.repetition_levels
340 .as_ref()
341 .map(|rep| RepDefSlicer::new(self, rep.clone()))
342 }
343
344 pub fn def_slicer(&self) -> Option<RepDefSlicer<'_>> {
345 self.definition_levels
346 .as_ref()
347 .map(|def| RepDefSlicer::new(self, def.clone()))
348 }
349
350 pub(crate) fn has_fixed_size_list_levels(&self) -> bool {
351 self.has_fsl
352 }
353}
354
/// Cursor that hands out consecutive slices of one level buffer.
#[derive(Debug)]
pub struct RepDefSlicer<'a> {
    /// Levels this slicer belongs to; its definition levels and
    /// `max_visible_level` decide how many levels each value consumes.
    repdef: &'a SerializedRepDefs,
    /// The level buffer being sliced, reinterpreted as a `LanceBuffer`.
    /// Its length is divided by 2 to obtain a level count, so it is
    /// presumably measured in bytes -- TODO confirm against `LanceBuffer`.
    to_slice: LanceBuffer,
    /// Index (in levels) of the next level to hand out.
    current: usize,
}
368
369impl<'a> RepDefSlicer<'a> {
371 fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
372 Self {
373 repdef,
374 to_slice: LanceBuffer::reinterpret_slice(levels),
375 current: 0,
376 }
377 }
378
379 pub fn num_levels(&self) -> usize {
380 self.to_slice.len() / 2
381 }
382
383 pub fn num_levels_remaining(&self) -> usize {
384 self.num_levels() - self.current
385 }
386
387 pub fn all_levels(&self) -> &LanceBuffer {
388 &self.to_slice
389 }
390
391 pub fn slice_rest(&mut self) -> LanceBuffer {
400 let start = self.current;
401 let remaining = self.num_levels_remaining();
402 self.current = self.num_levels();
403 self.to_slice.slice_with_length(start * 2, remaining * 2)
404 }
405
406 pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
408 let start = self.current;
409 let Some(max_visible_level) = self.repdef.max_visible_level else {
410 self.current = start + num_values;
412 return self.to_slice.slice_with_length(start * 2, num_values * 2);
413 };
414 if let Some(def) = self.repdef.definition_levels.as_ref() {
415 let mut def_itr = def[start..].iter();
419 let mut num_taken = 0;
420 let mut num_passed = 0;
421 while num_taken < num_values {
422 let def_level = *def_itr.next().unwrap();
423 if def_level <= max_visible_level {
424 num_taken += 1;
425 }
426 num_passed += 1;
427 }
428 self.current = start + num_passed;
429 self.to_slice.slice_with_length(start * 2, num_passed * 2)
430 } else {
431 self.current = start + num_values;
433 self.to_slice.slice_with_length(start * 2, num_values * 2)
434 }
435 }
436}
437
/// How one layer's share of the definition levels should be interpreted.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DefinitionInterpretation {
    /// An item layer without nulls; uses no definition level.
    AllValidItem,
    /// A list layer with neither null nor empty lists; uses no definition level.
    AllValidList,
    /// An item layer that may contain nulls; uses one definition level.
    NullableItem,
    /// A list layer that may contain null lists; uses one definition level.
    NullableList,
    /// A list layer that may contain empty lists; uses one definition level.
    EmptyableList,
    /// A list layer that may contain both null and empty lists; uses two
    /// definition levels.
    NullableAndEmptyableList,
}

impl DefinitionInterpretation {
    /// Number of definition levels consumed by this layer.
    pub fn num_def_levels(&self) -> u16 {
        match self {
            Self::AllValidItem | Self::AllValidList => 0,
            Self::NullableItem | Self::NullableList | Self::EmptyableList => 1,
            Self::NullableAndEmptyableList => 2,
        }
    }

    /// Returns true when the layer cannot contain nulls.
    pub fn is_all_valid(&self) -> bool {
        match self {
            Self::AllValidItem | Self::AllValidList | Self::EmptyableList => true,
            Self::NullableItem | Self::NullableList | Self::NullableAndEmptyableList => false,
        }
    }

    /// Returns true when the layer describes a list.
    pub fn is_list(&self) -> bool {
        match self {
            Self::AllValidItem | Self::NullableItem => false,
            Self::AllValidList
            | Self::NullableList
            | Self::EmptyableList
            | Self::NullableAndEmptyableList => true,
        }
    }
}
492
/// Working state used while layers are folded into rep/def level buffers.
///
/// Each step reads from the active buffers, writes into the spare buffers
/// and then swaps the two.
#[derive(Debug)]
struct SerializerContext {
    /// Meaning of each layer, pushed in the order the layers are recorded.
    def_meaning: Vec<DefinitionInterpretation>,
    /// Active repetition levels (empty when no repetition is needed).
    rep_levels: LevelBuffer,
    /// Scratch buffer swapped with `rep_levels` after each rewrite.
    spare_rep: LevelBuffer,
    /// Active definition levels (empty when no definition is needed).
    def_levels: LevelBuffer,
    /// Scratch buffer swapped with `def_levels` after each rewrite.
    spare_def: LevelBuffer,
    /// Repetition level the next list layer will use; counts down.
    current_rep: u16,
    /// Highest definition level still unassigned; counts down.
    current_def: u16,
    /// Number of meaningful entries currently in the level buffers.
    current_len: usize,
    /// Number of special (null/empty list) entries recorded so far.
    current_num_specials: usize,
    /// Set once a fixed-size-list layer has been recorded.
    has_fsl: bool,
}
518
519impl SerializerContext {
520 fn new(len: usize, num_layers: usize, max_rep: u16, max_def: u16) -> Self {
521 let def_meaning = Vec::with_capacity(num_layers);
522 Self {
523 rep_levels: if max_rep > 0 {
524 vec![0; len]
525 } else {
526 LevelBuffer::default()
527 },
528 spare_rep: if max_rep > 0 {
529 vec![0; len]
530 } else {
531 LevelBuffer::default()
532 },
533 def_levels: if max_def > 0 {
534 vec![0; len]
535 } else {
536 LevelBuffer::default()
537 },
538 spare_def: if max_def > 0 {
539 vec![0; len]
540 } else {
541 LevelBuffer::default()
542 },
543 def_meaning,
544 current_rep: max_rep,
545 current_def: max_def,
546 current_len: 0,
547 current_num_specials: 0,
548 has_fsl: false,
549 }
550 }
551
552 fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
553 let def = self.current_def;
554 self.current_def -= meaning.num_def_levels();
555 self.def_meaning.push(meaning);
556 def
557 }
558
    /// Folds a list layer into the level buffers.
    ///
    /// Every existing non-special entry stands for one list.  A non-empty
    /// valid list is expanded into one entry per item, an empty or null list
    /// becomes a single special entry (definition level offset by
    /// `SPECIAL_THRESHOLD`), and special entries from earlier layers are
    /// copied through untouched.
    ///
    /// Panics if the buffers are too small or if an empty list is found while
    /// no definition levels are in use.
    fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
        let rep_level = self.current_rep;
        // Reserve definition levels: one for null lists and/or one for empty
        // lists, depending on what this layer can contain.
        let (null_list_level, empty_list_level) =
            match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
                (true, true) => {
                    let level =
                        self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
                    (level - 1, level)
                }
                (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
                (false, true) => (
                    0,
                    self.checkout_def(DefinitionInterpretation::EmptyableList),
                ),
                (false, false) => {
                    self.checkout_def(DefinitionInterpretation::AllValidList);
                    (0, 0)
                }
            };
        self.current_rep -= 1;

        // Mark null lists first so the loop below can recognise them by a
        // non-zero definition level.
        if let Some(validity) = &offset_desc.validity {
            self.do_record_validity(validity, null_list_level);
        }

        // Read from the active buffers, write to the spare buffers, swap at
        // the end.
        let mut new_len = 0;
        // `num_values` is the number of lists in this layer.
        let expected_len = offset_desc.num_values + self.current_num_specials;
        if expected_len == 0 {
            self.current_len = 0;
            return;
        }
        assert!(self.rep_levels.len() >= expected_len - 1);
        if self.def_levels.is_empty() {
            // No definition levels at all, so no list may be empty or null.
            let mut write_itr = self.spare_rep.iter_mut();
            let mut read_iter = self.rep_levels.iter().copied();
            for w in offset_desc.offsets.windows(2) {
                let len = w[1] - w[0];
                assert!(len > 0);
                let rep = read_iter.next().unwrap();
                // Keep an already assigned (non-zero) level; otherwise this
                // layer starts the list here.
                let list_level = if rep == 0 { rep_level } else { rep };
                *write_itr.next().unwrap() = list_level;

                for _ in 1..len {
                    *write_itr.next().unwrap() = 0;
                }
                new_len += len as usize;
            }
            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
        } else {
            assert!(self.def_levels.len() >= expected_len - 1);
            let mut def_write_itr = self.spare_def.iter_mut();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut def_read_itr = self.def_levels.iter().copied();
            let specials_to_pass = self.current_num_specials;
            let mut specials_passed = 0;

            for w in offset_desc.offsets.windows(2) {
                let mut def = def_read_itr.next().unwrap();
                // Copy through special entries recorded by earlier layers.
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                    def = def_read_itr.next().unwrap();
                    new_len += 1;
                    specials_passed += 1;
                }

                let len = w[1] - w[0];
                let rep = rep_read_itr.next().unwrap();

                // Keep an already assigned (non-zero) level; otherwise this
                // layer starts the list here.
                let list_level = if rep == 0 { rep_level } else { rep };

                if def == 0 && len > 0 {
                    // Valid, non-empty list: one entry per item.
                    *def_write_itr.next().unwrap() = 0;
                    *rep_write_itr.next().unwrap() = list_level;

                    for _ in 1..len {
                        *def_write_itr.next().unwrap() = 0;
                        *rep_write_itr.next().unwrap() = 0;
                    }

                    new_len += len as usize;
                } else if def == 0 {
                    // Valid but empty list: a single special entry.
                    *def_write_itr.next().unwrap() = empty_list_level + SPECIAL_THRESHOLD;
                    *rep_write_itr.next().unwrap() = list_level;
                    new_len += 1;
                } else {
                    // Non-zero definition level (e.g. a null list): a single
                    // special entry; the list length is ignored.
                    *def_write_itr.next().unwrap() = def + SPECIAL_THRESHOLD;
                    *rep_write_itr.next().unwrap() = list_level;
                    new_len += 1;
                }
            }

            // Copy any special entries that follow the last list.
            while specials_passed < specials_to_pass {
                *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
                *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                new_len += 1;
                specials_passed += 1;
            }
            std::mem::swap(&mut self.def_levels, &mut self.spare_def);
            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
        }

        self.current_len = new_len;
        self.current_num_specials += offset_desc.num_specials;
    }
680
    /// Applies a validity bitmap to the definition levels.
    ///
    /// Each bit of `validity` is matched with the next non-special entry; an
    /// entry whose level is still 0 and whose bit is unset receives
    /// `null_level`.  Entries that already have a non-zero level keep it, and
    /// special entries are copied through unchanged.
    ///
    /// Panics if the definition buffer is shorter than
    /// `validity.len() + current_num_specials`.
    fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
        assert!(self.def_levels.len() >= validity.len() + self.current_num_specials);
        debug_assert!(
            self.current_len == 0 || self.current_len == validity.len() + self.current_num_specials
        );
        // Note: this stores the count without the special entries.
        self.current_len = validity.len();

        let mut def_read_itr = self.def_levels.iter().copied();
        let mut def_write_itr = self.spare_def.iter_mut();

        let specials_to_pass = self.current_num_specials;
        let mut specials_passed = 0;

        for incoming_validity in validity.iter() {
            let mut def = def_read_itr.next().unwrap();
            // Copy through special entries that precede this value.
            while def > SPECIAL_THRESHOLD {
                *def_write_itr.next().unwrap() = def;
                def = def_read_itr.next().unwrap();
                specials_passed += 1;
            }
            if def == 0 && !incoming_validity {
                *def_write_itr.next().unwrap() = null_level;
            } else {
                *def_write_itr.next().unwrap() = def;
            }
        }

        // Copy any special entries that follow the last value.
        while specials_passed < specials_to_pass {
            *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
            specials_passed += 1;
        }

        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
    }
715
    /// Repeats every non-special level `multiplier` times.
    ///
    /// Special entries met before a regular entry are copied once, as they
    /// are.  `current_len` becomes
    /// `(current_len - current_num_specials) * multiplier + current_num_specials`.
    ///
    /// NOTE(review): unlike `do_record_validity` and `record_offsets`, the
    /// loops here have no trailing pass for special entries that come after
    /// the last regular entry, and they iterate `old_len` times including any
    /// specials counted in it -- confirm callers never combine specials with
    /// this path.
    fn multiply_levels(&mut self, multiplier: usize) {
        let old_len = self.current_len;
        self.current_len =
            (self.current_len - self.current_num_specials) * multiplier + self.current_num_specials;

        if self.rep_levels.is_empty() && self.def_levels.is_empty() {
            // No levels are being tracked, nothing to rewrite.
            return;
        } else if self.rep_levels.is_empty() {
            // Definition levels only.
            assert!(self.def_levels.len() >= self.current_len);
            let mut def_read_itr = self.def_levels.iter().copied();
            let mut def_write_itr = self.spare_def.iter_mut();
            for _ in 0..old_len {
                let mut def = def_read_itr.next().unwrap();
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    def = def_read_itr.next().unwrap();
                }
                for _ in 0..multiplier {
                    *def_write_itr.next().unwrap() = def;
                }
            }
        } else if self.def_levels.is_empty() {
            // Repetition levels only; without definition levels there are no
            // special entries to skip.
            assert!(self.rep_levels.len() >= self.current_len);
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            for _ in 0..old_len {
                let rep = rep_read_itr.next().unwrap();
                for _ in 0..multiplier {
                    *rep_write_itr.next().unwrap() = rep;
                }
            }
        } else {
            // Both kinds of levels: keep them in lock step.
            assert!(self.rep_levels.len() >= self.current_len);
            assert!(self.def_levels.len() >= self.current_len);
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut def_read_itr = self.def_levels.iter().copied();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            let mut def_write_itr = self.spare_def.iter_mut();
            for _ in 0..old_len {
                let mut def = def_read_itr.next().unwrap();
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                    def = def_read_itr.next().unwrap();
                }
                let rep = rep_read_itr.next().unwrap();
                for _ in 0..multiplier {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep;
                }
            }
        }
        // Swapping an empty buffer with its (equally empty) spare is harmless.
        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
        std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
    }
775
776 fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
777 if let Some(validity) = validity {
778 let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
779 self.do_record_validity(validity, def_level);
780 } else {
781 self.checkout_def(DefinitionInterpretation::AllValidItem);
782 }
783 }
784
    /// Records a validity layer by forwarding its optional bitmap to
    /// `record_validity_buf`.
    fn record_validity(&mut self, validity_desc: &ValidityDesc) {
        self.record_validity_buf(&validity_desc.validity)
    }
788
    /// Records a fixed-size-list layer: flags the context as containing one,
    /// records the layer's validity, then repeats every level `dimension`
    /// times so there is one level per child item.
    fn record_fsl(&mut self, fsl_desc: &FslDesc) {
        self.has_fsl = true;
        self.record_validity_buf(&fsl_desc.validity);
        self.multiply_levels(fsl_desc.dimension);
    }
794
795 fn normalize_specials(&mut self) {
796 for def in self.def_levels.iter_mut() {
797 if *def > SPECIAL_THRESHOLD {
798 *def -= SPECIAL_THRESHOLD;
799 }
800 }
801 }
802
    /// Normalizes special definition levels and, in the same pass, decides
    /// whether the levels must be split into several pages.
    ///
    /// * `def_meaning` - layer meanings in their final order, as accepted by
    ///   `SerializedRepDefs::max_visible_level`.
    /// * `max_levels_per_page` - level budget per page; `None` disables planning.
    /// * `num_rows` / `num_values` - expected number of top-level rows and
    ///   visible values, used to validate the scan.
    ///
    /// Planning only happens when both rep and def levels exist, a budget is
    /// given, `num_values` is non-zero, no fixed-size-list layer was recorded
    /// and `def_meaning` contains a list.  In every other case the levels are
    /// just normalized and `Fits` is returned.
    ///
    /// # Errors
    ///
    /// Returns an internal error when rep and def lengths differ, when the
    /// levels do not begin with a top-level row start, or when the counted
    /// rows or values disagree with `num_rows` / `num_values`.  On errors
    /// raised after the scan has begun the definition levels may be only
    /// partially normalized.
    fn normalize_specials_and_plan_splits(
        &mut self,
        def_meaning: &[DefinitionInterpretation],
        max_levels_per_page: Option<u64>,
        num_rows: u64,
        num_values: u64,
    ) -> Result<StructuralPagePlan> {
        if self.def_levels.is_empty() {
            // Without definition levels there are no specials to normalize.
            return Ok(StructuralPagePlan::Fits);
        }

        if self.rep_levels.is_empty() {
            self.normalize_specials();
            return Ok(StructuralPagePlan::Fits);
        }

        if self.rep_levels.len() != self.def_levels.len() {
            return Err(Error::internal(format!(
                "Cannot plan structural page splits with mismatched rep/def lengths: rep={}, def={}",
                self.rep_levels.len(),
                self.def_levels.len()
            )));
        }

        let Some(max_levels_per_page) = max_levels_per_page else {
            self.normalize_specials();
            return Ok(StructuralPagePlan::Fits);
        };

        if num_values == 0 {
            self.normalize_specials();
            return Ok(StructuralPagePlan::Fits);
        }

        // The number of list layers is the repetition level that marks the
        // start of a new top-level row.
        let max_schema_rep = def_meaning.iter().filter(|level| level.is_list()).count() as u16;
        let max_visible_level = SerializedRepDefs::max_visible_level(def_meaning);
        let should_plan = !self.has_fsl && max_schema_rep > 0 && max_visible_level.is_some();

        if !should_plan {
            self.normalize_specials();
            return Ok(StructuralPagePlan::Fits);
        }

        let max_visible_level = max_visible_level.unwrap();
        let mut splits = Vec::new();
        let mut counted_rows = 0u64;
        let mut counted_values = 0u64;
        let mut saw_structural_overhead = false;
        let mut unsplittable_over_budget = None;

        // State of the row currently being scanned.
        let mut current_row_level_start = None;
        let mut current_row_num_values = 0u64;

        // State of the page currently being accumulated.
        let mut current_page_row_start = 0u64;
        let mut current_page_num_rows = 0u64;
        let mut current_page_level_start = 0usize;
        let mut current_page_level_end = 0usize;
        let mut current_page_value_start = 0u64;
        let mut current_page_num_values = 0u64;
        let mut current_page_num_levels = 0u64;
        let mut current_page_has_structural_overhead = false;

        // Accounts for one completed row: closes the current page first when
        // adding the row would exceed the budget, then appends the row.
        let mut finish_row =
            |row_level_start: usize, row_level_end: usize, row_num_values: u64| -> Result<()> {
                let row_num_levels = (row_level_end - row_level_start) as u64;
                // "Structural overhead" means the row has more levels than
                // visible values.
                let row_has_structural_overhead = row_num_levels > row_num_values;
                saw_structural_overhead |= row_has_structural_overhead;

                // A single row over budget cannot be fixed by row-aligned
                // splitting; the most recent such row is remembered.
                if row_has_structural_overhead && row_num_levels > max_levels_per_page {
                    unsplittable_over_budget = Some(row_num_levels);
                }

                // Only pages involving structural overhead are ever split.
                if current_page_num_rows > 0
                    && (current_page_has_structural_overhead || row_has_structural_overhead)
                    && current_page_num_levels + row_num_levels > max_levels_per_page
                {
                    splits.push(StructuralPageSplit {
                        row_start: current_page_row_start,
                        num_rows: current_page_num_rows,
                        level_range: current_page_level_start..current_page_level_end,
                        value_start: current_page_value_start,
                        num_values: current_page_num_values,
                    });
                    current_page_row_start = counted_rows;
                    current_page_num_rows = 0;
                    current_page_level_start = row_level_start;
                    current_page_value_start = counted_values;
                    current_page_num_values = 0;
                    current_page_num_levels = 0;
                    current_page_has_structural_overhead = false;
                }

                if current_page_num_rows == 0 {
                    current_page_level_start = row_level_start;
                }
                current_page_num_rows += 1;
                current_page_level_end = row_level_end;
                current_page_num_values += row_num_values;
                current_page_num_levels += row_num_levels;
                current_page_has_structural_overhead |= row_has_structural_overhead;
                counted_rows += 1;
                counted_values += row_num_values;
                Ok(())
            };

        for (idx, (rep_level, def_level)) in self
            .rep_levels
            .iter()
            .copied()
            .zip(self.def_levels.iter_mut())
            .enumerate()
        {
            // Normalize in place while scanning.
            if *def_level > SPECIAL_THRESHOLD {
                *def_level -= SPECIAL_THRESHOLD;
            }

            // The maximum repetition level marks the start of a top-level row.
            if rep_level == max_schema_rep {
                if let Some(level_start) = current_row_level_start {
                    finish_row(level_start, idx, current_row_num_values)?;
                    current_row_num_values = 0;
                } else if idx != 0 {
                    return Err(Error::internal(format!(
                        "Cannot plan structural page splits: first top-level row starts at level {}, expected 0",
                        idx
                    )));
                }
                current_row_level_start = Some(idx);
            }

            if current_row_level_start.is_none() {
                return Err(Error::internal(
                    "Cannot plan structural page splits: found levels before the first top-level row start",
                ));
            }
            // Levels at or below `max_visible_level` correspond to a value.
            if *def_level <= max_visible_level {
                current_row_num_values += 1;
            }
        }

        // Close the last row, which has no following row start.
        let Some(level_start) = current_row_level_start else {
            return Err(Error::internal(
                "Cannot plan structural page splits: found no top-level row starts",
            ));
        };
        finish_row(level_start, self.rep_levels.len(), current_row_num_values)?;

        if counted_rows != num_rows {
            return Err(Error::internal(format!(
                "Cannot plan structural page splits: expected {} top-level row starts, found {}",
                num_rows, counted_rows
            )));
        }
        if counted_values != num_values {
            return Err(Error::internal(format!(
                "Cannot plan structural page splits: counted {} visible values, expected {}",
                counted_values, num_values
            )));
        }
        if !saw_structural_overhead {
            return Ok(StructuralPagePlan::Fits);
        }
        if let Some(row_num_levels) = unsplittable_over_budget {
            return Ok(StructuralPagePlan::UnsplittableOverBudget(row_num_levels));
        }

        // Flush the page that was still being accumulated.
        if current_page_num_rows > 0 {
            splits.push(StructuralPageSplit {
                row_start: current_page_row_start,
                num_rows: current_page_num_rows,
                level_range: current_page_level_start..current_page_level_end,
                value_start: current_page_value_start,
                num_values: current_page_num_values,
            });
        }

        // A single page means nothing actually had to be split.
        if splits.len() > 1 {
            Ok(StructuralPagePlan::Split(splits))
        } else {
            Ok(StructuralPagePlan::Fits)
        }
    }
991
992 fn build(mut self) -> SerializedRepDefs {
993 if self.current_len == 0 {
994 return SerializedRepDefs::new_with_fixed_size_list_levels(
995 None,
996 None,
997 self.def_meaning,
998 self.has_fsl,
999 );
1000 }
1001
1002 self.normalize_specials();
1003
1004 let definition_levels = if self.def_levels.is_empty() {
1005 None
1006 } else {
1007 Some(self.def_levels)
1008 };
1009 let repetition_levels = if self.rep_levels.is_empty() {
1010 None
1011 } else {
1012 Some(self.rep_levels)
1013 };
1014
1015 let def_meaning = self.def_meaning.into_iter().rev().collect::<Vec<_>>();
1017
1018 SerializedRepDefs::new_with_fixed_size_list_levels(
1019 repetition_levels,
1020 definition_levels,
1021 def_meaning,
1022 self.has_fsl,
1023 )
1024 }
1025
1026 fn build_with_structural_plan(
1027 mut self,
1028 max_levels_per_page: Option<u64>,
1029 num_rows: u64,
1030 num_values: u64,
1031 ) -> Result<(SerializedRepDefs, StructuralPagePlan)> {
1032 if self.current_len == 0 {
1033 return Ok((
1034 SerializedRepDefs::new_with_fixed_size_list_levels(
1035 None,
1036 None,
1037 self.def_meaning,
1038 self.has_fsl,
1039 ),
1040 StructuralPagePlan::Fits,
1041 ));
1042 }
1043
1044 let def_meaning = std::mem::take(&mut self.def_meaning)
1046 .into_iter()
1047 .rev()
1048 .collect::<Vec<_>>();
1049 let plan = self.normalize_specials_and_plan_splits(
1050 &def_meaning,
1051 max_levels_per_page,
1052 num_rows,
1053 num_values,
1054 )?;
1055
1056 let definition_levels = if self.def_levels.is_empty() {
1057 None
1058 } else {
1059 Some(self.def_levels)
1060 };
1061 let repetition_levels = if self.rep_levels.is_empty() {
1062 None
1063 } else {
1064 Some(self.rep_levels)
1065 };
1066
1067 Ok((
1068 SerializedRepDefs::new_with_fixed_size_list_levels(
1069 repetition_levels,
1070 definition_levels,
1071 def_meaning,
1072 self.has_fsl,
1073 ),
1074 plan,
1075 ))
1076 }
1077}
1078
/// Collects the validity, offset and fixed-size-list layers of one array so
/// they can later be serialized into repetition and definition levels.
#[derive(Clone, Default, Debug)]
pub struct RepDefBuilder {
    /// Layers in the order they were added.
    repdefs: Vec<RawRepDef>,
    /// Number of values the next layer is expected to describe, once known.
    /// Updated by each list or fixed-size-list layer to its child count.
    len: Option<usize>,
}
1095
1096impl RepDefBuilder {
1097 fn check_validity_len(&mut self, incoming_len: usize) {
1098 if let Some(len) = self.len {
1099 assert_eq!(incoming_len, len);
1100 } else {
1101 self.len = Some(incoming_len);
1103 }
1104 }
1105
    /// Number of layers added so far.
    fn num_layers(&self) -> usize {
        self.repdefs.len()
    }
1109
1110 pub fn is_empty(&self) -> bool {
1113 self.repdefs
1114 .iter()
1115 .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
1116 }
1117
1118 pub fn is_simple_validity(&self) -> bool {
1120 self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
1121 }
1122
1123 pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
1125 self.check_validity_len(validity.len());
1126 if validity.null_count() == 0 {
1127 self.add_no_null(validity.len());
1128 return;
1129 }
1130 self.repdefs.push(RawRepDef::Validity(ValidityDesc {
1131 num_values: validity.len(),
1132 validity: Some(validity.into_inner()),
1133 }));
1134 }
1135
    /// Adds a validity layer of `len` values that contains no nulls.
    ///
    /// Panics if `len` disagrees with the expected length.
    pub fn add_no_null(&mut self, len: usize) {
        self.check_validity_len(len);
        self.repdefs.push(RawRepDef::Validity(ValidityDesc {
            validity: None,
            num_values: len,
        }));
    }
1144
    /// Adds a fixed-size-list layer of `num_values` lists with `dimension`
    /// items each.
    ///
    /// The expected length for the next layer becomes
    /// `num_values * dimension`.
    ///
    /// Panics if `num_values` disagrees with the expected length.  In debug
    /// builds it also panics when `validity` is present with a length other
    /// than `num_values`.
    pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
        if let Some(len) = self.len {
            assert_eq!(num_values, len);
        }
        self.len = Some(num_values * dimension);
        debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
        self.repdefs.push(RawRepDef::Fsl(FslDesc {
            num_values,
            validity: validity.map(|v| v.into_inner()),
            dimension,
        }))
    }
1157
    /// Validates an offsets slice against the expected length and then sets
    /// the expected length to the last offset (the number of child items).
    ///
    /// Panics if an expected length is known and `offsets` does not have
    /// exactly one more entry than it, or if `offsets` is empty.
    fn check_offset_len(&mut self, offsets: &[i64]) {
        if let Some(len) = self.len {
            assert!(offsets.len() == len + 1);
        }
        self.len = Some(offsets[offsets.len() - 1] as usize);
    }
1164
1165 fn do_add_offsets(
1166 &mut self,
1167 lengths: impl Iterator<Item = i64>,
1168 validity: Option<NullBuffer>,
1169 capacity: usize,
1170 ) -> bool {
1171 let mut num_specials = 0;
1172 let mut has_empty_lists = false;
1173 let mut has_garbage_values = false;
1174 let mut last_off: i64 = 0;
1175
1176 let mut normalized_offsets = Vec::with_capacity(capacity);
1177 normalized_offsets.push(0);
1178
1179 if let Some(ref validity) = validity {
1180 for (len, is_valid) in lengths.zip(validity.iter()) {
1181 match (is_valid, len == 0) {
1182 (false, is_empty) => {
1183 num_specials += 1;
1184 has_garbage_values |= !is_empty;
1185 }
1186 (true, true) => {
1187 num_specials += 1;
1188 has_empty_lists = true;
1189 }
1190 _ => {
1191 last_off += len;
1192 }
1193 }
1194 normalized_offsets.push(last_off);
1195 }
1196 } else {
1197 for len in lengths {
1198 if len == 0 {
1199 num_specials += 1;
1200 has_empty_lists = true;
1201 }
1202 last_off += len;
1203 normalized_offsets.push(last_off);
1204 }
1205 }
1206
1207 self.check_offset_len(&normalized_offsets);
1208 self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
1209 num_values: normalized_offsets.len() - 1,
1210 offsets: normalized_offsets.into(),
1211 validity: validity.map(|v| v.into_inner()),
1212 has_empty_lists,
1213 num_specials: num_specials as usize,
1214 }));
1215
1216 has_garbage_values
1217 }
1218
1219 pub fn add_offsets<O: OffsetSizeTrait>(
1226 &mut self,
1227 offsets: OffsetBuffer<O>,
1228 validity: Option<NullBuffer>,
1229 ) -> bool {
1230 let inner = offsets.into_inner();
1231 let buffer_len = inner.len();
1232
1233 if O::IS_LARGE {
1234 let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, buffer_len);
1235 let lengths = i64_buff.windows(2).map(|off| off[1] - off[0]);
1236 self.do_add_offsets(lengths, validity, buffer_len)
1237 } else {
1238 let i32_buff = ScalarBuffer::<i32>::new(inner.into_inner(), 0, buffer_len);
1239 let lengths = i32_buff.windows(2).map(|off| (off[1] - off[0]) as i64);
1240 self.do_add_offsets(lengths, validity, buffer_len)
1241 }
1242 }
1243
1244 fn concat_layers<'a>(
1256 layers: impl Iterator<Item = &'a RawRepDef>,
1257 num_layers: usize,
1258 ) -> RawRepDef {
1259 enum LayerKind {
1260 Validity,
1261 Fsl,
1262 Offsets,
1263 }
1264
1265 let mut collected = Vec::with_capacity(num_layers);
1268 let mut has_nulls = false;
1269 let mut layer_kind = LayerKind::Validity;
1270 let mut total_num_specials = 0;
1271 let mut all_dimension = 0;
1272 let mut all_has_empty_lists = false;
1273 let mut all_num_values = 0;
1274 for layer in layers {
1275 has_nulls |= layer.has_nulls();
1276 match layer {
1277 RawRepDef::Validity(_) => {
1278 layer_kind = LayerKind::Validity;
1279 }
1280 RawRepDef::Offsets(OffsetDesc {
1281 num_specials,
1282 has_empty_lists,
1283 ..
1284 }) => {
1285 all_has_empty_lists |= *has_empty_lists;
1286 layer_kind = LayerKind::Offsets;
1287 total_num_specials += num_specials;
1288 }
1289 RawRepDef::Fsl(FslDesc { dimension, .. }) => {
1290 layer_kind = LayerKind::Fsl;
1291 all_dimension = *dimension;
1292 }
1293 }
1294 collected.push(layer);
1295 all_num_values += layer.num_values();
1296 }
1297
1298 if !has_nulls {
1300 match layer_kind {
1301 LayerKind::Validity => {
1302 return RawRepDef::Validity(ValidityDesc {
1303 validity: None,
1304 num_values: all_num_values,
1305 });
1306 }
1307 LayerKind::Fsl => {
1308 return RawRepDef::Fsl(FslDesc {
1309 validity: None,
1310 num_values: all_num_values,
1311 dimension: all_dimension,
1312 });
1313 }
1314 LayerKind::Offsets => {}
1315 }
1316 }
1317
1318 let mut validity_builder = if has_nulls {
1320 BooleanBufferBuilder::new(all_num_values)
1321 } else {
1322 BooleanBufferBuilder::new(0)
1323 };
1324 let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
1325 let mut all_offsets = Vec::with_capacity(all_num_values);
1326 all_offsets.push(0);
1327 all_offsets
1328 } else {
1329 Vec::new()
1330 };
1331
1332 for layer in collected {
1333 match layer {
1334 RawRepDef::Validity(ValidityDesc {
1335 validity: Some(validity),
1336 ..
1337 }) => {
1338 validity_builder.append_buffer(validity);
1339 }
1340 RawRepDef::Validity(ValidityDesc {
1341 validity: None,
1342 num_values,
1343 }) => {
1344 validity_builder.append_n(*num_values, true);
1345 }
1346 RawRepDef::Fsl(FslDesc {
1347 validity,
1348 num_values,
1349 ..
1350 }) => {
1351 if let Some(validity) = validity {
1352 validity_builder.append_buffer(validity);
1353 } else {
1354 validity_builder.append_n(*num_values, true);
1355 }
1356 }
1357 RawRepDef::Offsets(OffsetDesc {
1358 offsets,
1359 validity: Some(validity),
1360 has_empty_lists,
1361 ..
1362 }) => {
1363 all_has_empty_lists |= has_empty_lists;
1364 validity_builder.append_buffer(validity);
1365 let last = *all_offsets.last().unwrap();
1366 all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1367 }
1368 RawRepDef::Offsets(OffsetDesc {
1369 offsets,
1370 validity: None,
1371 has_empty_lists,
1372 num_values,
1373 ..
1374 }) => {
1375 all_has_empty_lists |= has_empty_lists;
1376 if has_nulls {
1377 validity_builder.append_n(*num_values, true);
1378 }
1379 let last = *all_offsets.last().unwrap();
1380 all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1381 }
1382 }
1383 }
1384 let validity = if has_nulls {
1385 Some(validity_builder.finish())
1386 } else {
1387 None
1388 };
1389 match layer_kind {
1390 LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
1391 validity,
1392 num_values: all_num_values,
1393 dimension: all_dimension,
1394 }),
1395 LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
1396 validity,
1397 num_values: all_num_values,
1398 }),
1399 LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
1400 offsets: all_offsets.into(),
1401 validity,
1402 has_empty_lists: all_has_empty_lists,
1403 num_values: all_num_values,
1404 num_specials: total_num_specials,
1405 }),
1406 }
1407 }
1408
    /// Combines the given builders and serializes them into repetition and
    /// definition levels.
    ///
    /// Panics if `builders` is empty.
    pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
        Self::serialize_builders(builders).0.build()
    }
1414
    /// Like `serialize`, but also returns a plan describing whether the
    /// levels need to be split into several pages.
    ///
    /// * `max_levels_for_bits` - maps the second value returned by
    ///   `serialize_builders` (named `bits_per_level` here) to the maximum
    ///   number of levels a page may hold.  It is only called when that value
    ///   is `Some`; otherwise no budget is applied.
    /// * `num_rows` / `num_values` - expected number of top-level rows and
    ///   visible values, used to validate the plan.
    ///
    /// # Errors
    ///
    /// Propagates any error from `build_with_structural_plan`.
    ///
    /// Panics if `builders` is empty.
    pub(crate) fn serialize_with_structural_plan(
        builders: Vec<Self>,
        max_levels_for_bits: impl FnOnce(u64) -> u64,
        num_rows: u64,
        num_values: u64,
    ) -> Result<(SerializedRepDefs, StructuralPagePlan)> {
        let (context, bits_per_level) = Self::serialize_builders(builders);
        context.build_with_structural_plan(
            bits_per_level.map(max_levels_for_bits),
            num_rows,
            num_values,
        )
    }
1429
1430 fn serialize_builders(builders: Vec<Self>) -> (SerializerContext, Option<u64>) {
1431 assert!(!builders.is_empty());
1432 if builders.iter().all(|b| b.is_empty()) {
1433 let def_meaning = builders
1435 .first()
1436 .unwrap()
1437 .repdefs
1438 .iter()
1439 .map(|_| DefinitionInterpretation::AllValidItem)
1440 .collect::<Vec<_>>();
1441 return (
1442 SerializerContext {
1443 def_meaning,
1444 rep_levels: LevelBuffer::default(),
1445 spare_rep: LevelBuffer::default(),
1446 def_levels: LevelBuffer::default(),
1447 spare_def: LevelBuffer::default(),
1448 current_rep: 0,
1449 current_def: 0,
1450 current_len: 0,
1451 current_num_specials: 0,
1452 has_fsl: false,
1453 },
1454 None,
1455 );
1456 }
1457
1458 let num_layers = builders[0].num_layers();
1459 let combined_layers = (0..num_layers)
1460 .map(|layer_index| {
1461 Self::concat_layers(
1462 builders.iter().map(|b| &b.repdefs[layer_index]),
1463 builders.len(),
1464 )
1465 })
1466 .collect::<Vec<_>>();
1467 debug_assert!(
1468 builders
1469 .iter()
1470 .all(|b| b.num_layers() == builders[0].num_layers())
1471 );
1472
1473 let total_len = combined_layers.last().unwrap().num_values()
1474 + combined_layers
1475 .iter()
1476 .map(|l| l.num_specials())
1477 .sum::<usize>();
1478 let max_rep = combined_layers.iter().map(|l| l.max_rep()).sum::<u16>();
1479 let max_def = combined_layers.iter().map(|l| l.max_def()).sum::<u16>();
1480 let bits_per_rep = if max_rep > 0 {
1481 u64::from(u16::BITS - max_rep.leading_zeros())
1482 } else {
1483 0
1484 };
1485 let bits_per_def = if max_def > 0 {
1486 u64::from(u16::BITS - max_def.leading_zeros())
1487 } else {
1488 0
1489 };
1490 let bits_per_level =
1491 (bits_per_rep + bits_per_def > 0).then_some(bits_per_rep + bits_per_def);
1492
1493 let mut context = SerializerContext::new(total_len, num_layers, max_rep, max_def);
1494 for layer in combined_layers.into_iter() {
1495 match layer {
1496 RawRepDef::Validity(def) => {
1497 context.record_validity(&def);
1498 }
1499 RawRepDef::Offsets(rep) => {
1500 context.record_offsets(&rep);
1501 }
1502 RawRepDef::Fsl(fsl) => {
1503 context.record_fsl(&fsl);
1504 }
1505 }
1506 }
1507 (context, bits_per_level)
1508 }
1509}
1510
/// Converts serialized repetition / definition levels back into validity bitmaps and
/// list offsets, one layer at a time.
///
/// Each call to `unravel_validity`, `skip_validity` or `unravel_offsets` consumes one
/// entry of `def_meaning` and advances `current_layer`.
#[derive(Debug)]
pub struct RepDefUnraveler {
    /// Remaining repetition levels; `None` when the data has no repetition.
    /// `unravel_offsets` compacts this buffer in place.
    rep_levels: Option<LevelBuffer>,
    /// Remaining definition levels; `None` is treated as "everything is valid".
    def_levels: Option<LevelBuffer>,
    /// Indexed by definition level; holds the list-depth counter that was current when
    /// that level was assigned in `new`. Compared against `current_rep_cmp` in
    /// `unravel_validity` to filter levels.
    levels_to_rep: Vec<u16>,
    /// Interpretation of each layer, indexed by `current_layer`.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Highest definition level consumed by the layers unraveled so far.
    current_def_cmp: u16,
    /// Number of list layers unraveled so far (incremented by `unravel_offsets`).
    current_rep_cmp: u16,
    /// Index into `def_meaning` of the next layer to unravel.
    current_layer: usize,
    /// Number of validity bits appended by `unravel_validity` when the layer is all valid.
    num_items: u64,
}
1532
1533impl RepDefUnraveler {
1534 pub fn new(
1536 rep_levels: Option<LevelBuffer>,
1537 def_levels: Option<LevelBuffer>,
1538 def_meaning: Arc<[DefinitionInterpretation]>,
1539 num_items: u64,
1540 ) -> Self {
1541 let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
1542 let mut rep_counter = 0;
1543 levels_to_rep.push(0);
1545 for meaning in def_meaning.as_ref() {
1546 match meaning {
1547 DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
1548 }
1550 DefinitionInterpretation::NullableItem => {
1551 levels_to_rep.push(rep_counter);
1553 }
1554 DefinitionInterpretation::NullableList => {
1555 rep_counter += 1;
1556 levels_to_rep.push(rep_counter);
1557 }
1558 DefinitionInterpretation::EmptyableList => {
1559 rep_counter += 1;
1560 levels_to_rep.push(rep_counter);
1561 }
1562 DefinitionInterpretation::NullableAndEmptyableList => {
1563 rep_counter += 1;
1564 levels_to_rep.push(rep_counter);
1565 levels_to_rep.push(rep_counter);
1566 }
1567 }
1568 }
1569 Self {
1570 rep_levels,
1571 def_levels,
1572 current_def_cmp: 0,
1573 current_rep_cmp: 0,
1574 levels_to_rep,
1575 current_layer: 0,
1576 def_meaning,
1577 num_items,
1578 }
1579 }
1580
1581 pub fn is_all_valid(&self) -> bool {
1582 self.def_levels.is_none() || self.def_meaning[self.current_layer].is_all_valid()
1583 }
1584
1585 pub fn max_lists(&self) -> usize {
1591 debug_assert!(
1592 self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
1593 );
1594 self.rep_levels
1595 .as_ref()
1596 .map(|levels| levels.len())
1598 .unwrap_or(0)
1599 }
1600
1601 pub fn unravel_offsets<T: ArrowNativeType>(
1606 &mut self,
1607 offsets: &mut Vec<T>,
1608 validity: Option<&mut BooleanBufferBuilder>,
1609 ) -> Result<()> {
1610 let rep_levels = self
1611 .rep_levels
1612 .as_mut()
1613 .expect("Expected repetition level but data didn't contain repetition");
1614 let valid_level = self.current_def_cmp;
1615 let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
1616 DefinitionInterpretation::NullableList => {
1617 self.current_def_cmp += 1;
1618 (valid_level + 1, 0)
1619 }
1620 DefinitionInterpretation::EmptyableList => {
1621 self.current_def_cmp += 1;
1622 (0, valid_level + 1)
1623 }
1624 DefinitionInterpretation::NullableAndEmptyableList => {
1625 self.current_def_cmp += 2;
1626 (valid_level + 1, valid_level + 2)
1627 }
1628 DefinitionInterpretation::AllValidList => (0, 0),
1629 _ => unreachable!(),
1630 };
1631 self.current_layer += 1;
1632
1633 let mut max_level = null_level.max(empty_level).max(valid_level);
1637 let upper_null = max_level;
1640 for level in self.def_meaning[self.current_layer..].iter() {
1641 match level {
1642 DefinitionInterpretation::NullableItem => {
1643 max_level += 1;
1644 }
1645 DefinitionInterpretation::AllValidItem => {}
1646 _ => {
1647 break;
1648 }
1649 }
1650 }
1651
1652 let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);
1653
1654 offsets.pop();
1662
1663 let to_offset = |val: usize| {
1664 T::from_usize(val)
1665 .ok_or_else(|| Error::invalid_input("A single batch had more than i32::MAX values and so a large container type is required"))
1666 };
1667 self.current_rep_cmp += 1;
1668 if let Some(def_levels) = &mut self.def_levels {
1669 assert!(rep_levels.len() == def_levels.len());
1670 let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
1673 Box::new(|is_valid| validity.append(is_valid))
1674 } else {
1675 Box::new(|_| {})
1676 };
1677 let mut read_idx = 0;
1681 let mut write_idx = 0;
1682 while read_idx < rep_levels.len() {
1683 unsafe {
1686 let rep_val = *rep_levels.get_unchecked(read_idx);
1687 if rep_val != 0 {
1688 let def_val = *def_levels.get_unchecked(read_idx);
1689 *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1691 *def_levels.get_unchecked_mut(write_idx) = def_val;
1692 write_idx += 1;
1693
1694 if def_val == 0 {
1695 offsets.push(to_offset(curlen)?);
1697 curlen += 1;
1698 push_validity(true);
1699 } else if def_val > max_level {
1700 } else if def_val == null_level || def_val > upper_null {
1702 offsets.push(to_offset(curlen)?);
1704 push_validity(false);
1705 } else if def_val == empty_level {
1706 offsets.push(to_offset(curlen)?);
1708 push_validity(true);
1709 } else {
1710 offsets.push(to_offset(curlen)?);
1712 curlen += 1;
1713 push_validity(true);
1714 }
1715 } else {
1716 curlen += 1;
1717 }
1718 read_idx += 1;
1719 }
1720 }
1721 offsets.push(to_offset(curlen)?);
1722 rep_levels.truncate(write_idx);
1723 def_levels.truncate(write_idx);
1724 Ok(())
1725 } else {
1726 let mut read_idx = 0;
1728 let mut write_idx = 0;
1729 let old_offsets_len = offsets.len();
1730 while read_idx < rep_levels.len() {
1731 unsafe {
1733 let rep_val = *rep_levels.get_unchecked(read_idx);
1734 if rep_val != 0 {
1735 offsets.push(to_offset(curlen)?);
1737 *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1738 write_idx += 1;
1739 }
1740 curlen += 1;
1741 read_idx += 1;
1742 }
1743 }
1744 let num_new_lists = offsets.len() - old_offsets_len;
1745 offsets.push(to_offset(curlen)?);
1746 rep_levels.truncate(offsets.len() - 1);
1747 if let Some(validity) = validity {
1748 validity.append_n(num_new_lists, true);
1751 }
1752 Ok(())
1753 }
1754 }
1755
/// Advances past the current layer without producing any validity.
///
/// In debug builds this asserts that the layer is all valid (see `is_all_valid`).
pub fn skip_validity(&mut self) {
    debug_assert!(self.is_all_valid());
    self.current_layer += 1;
}
1760
1761 pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
1763 let meaning = self.def_meaning[self.current_layer];
1764 if meaning == DefinitionInterpretation::AllValidItem || self.def_levels.is_none() {
1765 self.current_layer += 1;
1766 validity.append_n(self.num_items as usize, true);
1767 return;
1768 }
1769
1770 self.current_layer += 1;
1771 let def_levels = &self.def_levels.as_ref().unwrap();
1772
1773 let current_def_cmp = self.current_def_cmp;
1774 self.current_def_cmp += 1;
1775
1776 for is_valid in def_levels.iter().filter_map(|&level| {
1777 if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
1778 Some(level <= current_def_cmp)
1779 } else {
1780 None
1781 }
1782 }) {
1783 validity.append(is_valid);
1784 }
1785 }
1786
1787 pub fn decimate(&mut self, dimension: usize) {
1788 if self.rep_levels.is_some() {
1789 todo!("Not yet supported FSL<...List<...>>");
1801 }
1802 let Some(def_levels) = self.def_levels.as_mut() else {
1803 return;
1804 };
1805 let mut read_idx = 0;
1806 let mut write_idx = 0;
1807 while read_idx < def_levels.len() {
1808 unsafe {
1809 *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
1810 }
1811 write_idx += 1;
1812 read_idx += dimension;
1813 }
1814 def_levels.truncate(write_idx);
1815 }
1816}
1817
1818#[derive(Debug)]
1832pub struct CompositeRepDefUnraveler {
1833 unravelers: Vec<RepDefUnraveler>,
1834}
1835
1836impl CompositeRepDefUnraveler {
1837 pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
1838 Self { unravelers }
1839 }
1840
1841 pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
1845 let is_all_valid = self
1846 .unravelers
1847 .iter()
1848 .all(|unraveler| unraveler.is_all_valid());
1849
1850 if is_all_valid {
1851 for unraveler in self.unravelers.iter_mut() {
1852 unraveler.skip_validity();
1853 }
1854 None
1855 } else {
1856 let mut validity = BooleanBufferBuilder::new(num_values);
1857 for unraveler in self.unravelers.iter_mut() {
1858 unraveler.unravel_validity(&mut validity);
1859 }
1860 Some(NullBuffer::new(validity.finish()))
1861 }
1862 }
1863
1864 pub fn unravel_fsl_validity(
1865 &mut self,
1866 num_values: usize,
1867 dimension: usize,
1868 ) -> Option<NullBuffer> {
1869 for unraveler in self.unravelers.iter_mut() {
1870 unraveler.decimate(dimension);
1871 }
1872 self.unravel_validity(num_values)
1873 }
1874
1875 pub fn unravel_offsets<T: ArrowNativeType>(
1877 &mut self,
1878 ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
1879 let mut is_all_valid = true;
1880 let mut max_num_lists = 0;
1881 for unraveler in self.unravelers.iter() {
1882 is_all_valid &= unraveler.is_all_valid();
1883 max_num_lists += unraveler.max_lists();
1884 }
1885
1886 let mut validity = if is_all_valid {
1887 None
1888 } else {
1889 Some(BooleanBufferBuilder::new(max_num_lists))
1892 };
1893
1894 let mut offsets = Vec::with_capacity(max_num_lists + 1);
1895
1896 for unraveler in self.unravelers.iter_mut() {
1897 unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
1898 }
1899
1900 Ok((
1901 OffsetBuffer::new(ScalarBuffer::from(offsets)),
1902 validity.map(|mut v| NullBuffer::new(v.finish())),
1903 ))
1904 }
1905}
1906
1907#[derive(Debug)]
1913pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
1914 repdef: I,
1915 def_width: usize,
1916 max_rep: u16,
1917 max_visible_def: u16,
1918 rep_mask: u16,
1919 def_mask: u16,
1920 bits_rep: u8,
1921 bits_def: u8,
1922 phantom: std::marker::PhantomData<W>,
1923}
1924
1925impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
1926 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1927 let next = self.repdef.next()?;
1928 let control_word: u8 =
1929 (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
1930 buf.push(control_word);
1931 let is_new_row = next.0 == self.max_rep;
1932 let is_visible = next.1 <= self.max_visible_def;
1933 let is_valid_item = next.1 == 0;
1934 Some(ControlWordDesc {
1935 is_new_row,
1936 is_visible,
1937 is_valid_item,
1938 })
1939 }
1940}
1941
1942impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
1943 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1944 let next = self.repdef.next()?;
1945 let control_word: u16 =
1946 ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
1947 let control_word = control_word.to_le_bytes();
1948 buf.push(control_word[0]);
1949 buf.push(control_word[1]);
1950 let is_new_row = next.0 == self.max_rep;
1951 let is_visible = next.1 <= self.max_visible_def;
1952 let is_valid_item = next.1 == 0;
1953 Some(ControlWordDesc {
1954 is_new_row,
1955 is_visible,
1956 is_valid_item,
1957 })
1958 }
1959}
1960
1961impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
1962 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1963 let next = self.repdef.next()?;
1964 let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
1965 + ((next.1 & self.def_mask) as u32);
1966 let control_word = control_word.to_le_bytes();
1967 buf.push(control_word[0]);
1968 buf.push(control_word[1]);
1969 buf.push(control_word[2]);
1970 buf.push(control_word[3]);
1971 let is_new_row = next.0 == self.max_rep;
1972 let is_visible = next.1 <= self.max_visible_def;
1973 let is_valid_item = next.1 == 0;
1974 Some(ControlWordDesc {
1975 is_new_row,
1976 is_visible,
1977 is_valid_item,
1978 })
1979 }
1980}
1981
1982#[derive(Debug)]
1984pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
1985 repdef: I,
1986 level_mask: u16,
1987 bits_rep: u8,
1988 bits_def: u8,
1989 max_rep: u16,
1990 phantom: std::marker::PhantomData<W>,
1991}
1992
1993impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
1994 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1995 let next = self.repdef.next()?;
1996 buf.push((next & self.level_mask) as u8);
1997 let is_new_row = self.max_rep == 0 || next == self.max_rep;
1998 let is_valid_item = next == 0 || self.bits_def == 0;
1999 Some(ControlWordDesc {
2000 is_new_row,
2001 is_visible: true,
2004 is_valid_item,
2005 })
2006 }
2007}
2008
2009impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
2010 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
2011 let next = self.repdef.next().unwrap() & self.level_mask;
2012 let control_word = next.to_le_bytes();
2013 buf.push(control_word[0]);
2014 buf.push(control_word[1]);
2015 let is_new_row = self.max_rep == 0 || next == self.max_rep;
2016 let is_valid_item = next == 0 || self.bits_def == 0;
2017 Some(ControlWordDesc {
2018 is_new_row,
2019 is_visible: true,
2020 is_valid_item,
2021 })
2022 }
2023}
2024
2025impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
2026 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
2027 let next = self.repdef.next()?;
2028 let next = (next & self.level_mask) as u32;
2029 let control_word = next.to_le_bytes();
2030 buf.push(control_word[0]);
2031 buf.push(control_word[1]);
2032 buf.push(control_word[2]);
2033 buf.push(control_word[3]);
2034 let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
2035 let is_valid_item = next == 0 || self.bits_def == 0;
2036 Some(ControlWordDesc {
2037 is_new_row,
2038 is_visible: true,
2039 is_valid_item,
2040 })
2041 }
2042}
2043
2044#[derive(Debug)]
2046pub struct NilaryControlWordIterator {
2047 len: usize,
2048 idx: usize,
2049}
2050
2051impl NilaryControlWordIterator {
2052 fn append_next(&mut self) -> Option<ControlWordDesc> {
2053 if self.idx == self.len {
2054 None
2055 } else {
2056 self.idx += 1;
2057 Some(ControlWordDesc {
2058 is_new_row: true,
2059 is_visible: true,
2060 is_valid_item: true,
2061 })
2062 }
2063 }
2064}
2065
/// Returns a mask with the lowest `width` bits set.
///
/// A `width` of 16 or more yields `u16::MAX`: shifting a `u16` by its full bit width
/// would overflow, so that case is handled explicitly instead of panicking (debug
/// builds) or producing an empty mask (release builds).
fn get_mask(width: u16) -> u16 {
    match 1u16.checked_shl(u32::from(width)) {
        Some(bit) => bit - 1,
        None => u16::MAX,
    }
}
2070
/// A [`BinaryControlWordIterator`] whose source is two `u16` slices (repetition and
/// definition levels) zipped together; `T` selects the control word width.
type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
    T,
>;
2077
/// An iterator that writes control words into a byte buffer, one per call to
/// `append_next`.
///
/// The variant encodes which levels are present (`Binary*`: both repetition and
/// definition, `Unary*`: exactly one of them, `Nilary`: neither) and how many bytes
/// each control word occupies (1, 2 or 4; 0 for `Nilary`).
#[derive(Debug)]
pub enum ControlWordIterator<'a> {
    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
    Nilary(NilaryControlWordIterator),
}
2097
/// Describes a single control word.
#[derive(Debug)]
pub struct ControlWordDesc {
    /// True when the repetition level equals the maximum repetition level (or there is
    /// no repetition at all), i.e. the word starts a new row.
    pub is_new_row: bool,
    /// True when the definition level does not exceed the maximum visible definition
    /// level; always true when there are no definition levels alongside repetition.
    pub is_visible: bool,
    /// True when the definition level is 0 (or there are no definition levels).
    pub is_valid_item: bool,
}
2105
2106impl ControlWordIterator<'_> {
2107 pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
2111 match self {
2112 Self::Binary8(iter) => iter.append_next(buf),
2113 Self::Binary16(iter) => iter.append_next(buf),
2114 Self::Binary32(iter) => iter.append_next(buf),
2115 Self::Unary8(iter) => iter.append_next(buf),
2116 Self::Unary16(iter) => iter.append_next(buf),
2117 Self::Unary32(iter) => iter.append_next(buf),
2118 Self::Nilary(iter) => iter.append_next(),
2119 }
2120 }
2121
2122 pub fn has_repetition(&self) -> bool {
2124 match self {
2125 Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
2126 Self::Unary8(iter) => iter.bits_rep > 0,
2127 Self::Unary16(iter) => iter.bits_rep > 0,
2128 Self::Unary32(iter) => iter.bits_rep > 0,
2129 Self::Nilary(_) => false,
2130 }
2131 }
2132
2133 pub fn bytes_per_word(&self) -> usize {
2135 match self {
2136 Self::Binary8(_) => 1,
2137 Self::Binary16(_) => 2,
2138 Self::Binary32(_) => 4,
2139 Self::Unary8(_) => 1,
2140 Self::Unary16(_) => 2,
2141 Self::Unary32(_) => 4,
2142 Self::Nilary(_) => 0,
2143 }
2144 }
2145
2146 pub fn bits_rep(&self) -> u8 {
2148 match self {
2149 Self::Binary8(iter) => iter.bits_rep,
2150 Self::Binary16(iter) => iter.bits_rep,
2151 Self::Binary32(iter) => iter.bits_rep,
2152 Self::Unary8(iter) => iter.bits_rep,
2153 Self::Unary16(iter) => iter.bits_rep,
2154 Self::Unary32(iter) => iter.bits_rep,
2155 Self::Nilary(_) => 0,
2156 }
2157 }
2158
2159 pub fn bits_def(&self) -> u8 {
2161 match self {
2162 Self::Binary8(iter) => iter.bits_def,
2163 Self::Binary16(iter) => iter.bits_def,
2164 Self::Binary32(iter) => iter.bits_def,
2165 Self::Unary8(iter) => iter.bits_def,
2166 Self::Unary16(iter) => iter.bits_def,
2167 Self::Unary32(iter) => iter.bits_def,
2168 Self::Nilary(_) => 0,
2169 }
2170 }
2171}
2172
2173pub fn build_control_word_iterator<'a>(
2177 rep: Option<&'a [u16]>,
2178 max_rep: u16,
2179 def: Option<&'a [u16]>,
2180 max_def: u16,
2181 max_visible_def: u16,
2182 len: usize,
2183) -> ControlWordIterator<'a> {
2184 let rep_width = if max_rep == 0 {
2185 0
2186 } else {
2187 log_2_ceil(max_rep as u32) as u16
2188 };
2189 let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
2190 let def_width = if max_def == 0 {
2191 0
2192 } else {
2193 log_2_ceil(max_def as u32) as u16
2194 };
2195 let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
2196 let total_width = rep_width + def_width;
2197 match (rep, def) {
2198 (Some(rep), Some(def)) => {
2199 let iter = rep.iter().copied().zip(def.iter().copied());
2200 let def_width = def_width as usize;
2201 if total_width <= 8 {
2202 ControlWordIterator::Binary8(BinaryControlWordIterator {
2203 repdef: iter,
2204 rep_mask,
2205 def_mask,
2206 def_width,
2207 max_rep,
2208 max_visible_def,
2209 bits_rep: rep_width as u8,
2210 bits_def: def_width as u8,
2211 phantom: std::marker::PhantomData,
2212 })
2213 } else if total_width <= 16 {
2214 ControlWordIterator::Binary16(BinaryControlWordIterator {
2215 repdef: iter,
2216 rep_mask,
2217 def_mask,
2218 def_width,
2219 max_rep,
2220 max_visible_def,
2221 bits_rep: rep_width as u8,
2222 bits_def: def_width as u8,
2223 phantom: std::marker::PhantomData,
2224 })
2225 } else {
2226 ControlWordIterator::Binary32(BinaryControlWordIterator {
2227 repdef: iter,
2228 rep_mask,
2229 def_mask,
2230 def_width,
2231 max_rep,
2232 max_visible_def,
2233 bits_rep: rep_width as u8,
2234 bits_def: def_width as u8,
2235 phantom: std::marker::PhantomData,
2236 })
2237 }
2238 }
2239 (Some(lev), None) => {
2240 let iter = lev.iter().copied();
2241 if total_width <= 8 {
2242 ControlWordIterator::Unary8(UnaryControlWordIterator {
2243 repdef: iter,
2244 level_mask: rep_mask,
2245 bits_rep: total_width as u8,
2246 bits_def: 0,
2247 max_rep,
2248 phantom: std::marker::PhantomData,
2249 })
2250 } else if total_width <= 16 {
2251 ControlWordIterator::Unary16(UnaryControlWordIterator {
2252 repdef: iter,
2253 level_mask: rep_mask,
2254 bits_rep: total_width as u8,
2255 bits_def: 0,
2256 max_rep,
2257 phantom: std::marker::PhantomData,
2258 })
2259 } else {
2260 ControlWordIterator::Unary32(UnaryControlWordIterator {
2261 repdef: iter,
2262 level_mask: rep_mask,
2263 bits_rep: total_width as u8,
2264 bits_def: 0,
2265 max_rep,
2266 phantom: std::marker::PhantomData,
2267 })
2268 }
2269 }
2270 (None, Some(lev)) => {
2271 let iter = lev.iter().copied();
2272 if total_width <= 8 {
2273 ControlWordIterator::Unary8(UnaryControlWordIterator {
2274 repdef: iter,
2275 level_mask: def_mask,
2276 bits_rep: 0,
2277 bits_def: total_width as u8,
2278 max_rep: 0,
2279 phantom: std::marker::PhantomData,
2280 })
2281 } else if total_width <= 16 {
2282 ControlWordIterator::Unary16(UnaryControlWordIterator {
2283 repdef: iter,
2284 level_mask: def_mask,
2285 bits_rep: 0,
2286 bits_def: total_width as u8,
2287 max_rep: 0,
2288 phantom: std::marker::PhantomData,
2289 })
2290 } else {
2291 ControlWordIterator::Unary32(UnaryControlWordIterator {
2292 repdef: iter,
2293 level_mask: def_mask,
2294 bits_rep: 0,
2295 bits_def: total_width as u8,
2296 max_rep: 0,
2297 phantom: std::marker::PhantomData,
2298 })
2299 }
2300 }
2301 (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
2302 }
2303}
2304
/// Parses control words back into repetition / definition levels.
///
/// The variant name gives the levels present (`BOTH`, `REP` only, `DEF` only, `NIL`
/// for neither) and the word size in bits (8, 16 or 32).
#[derive(Copy, Clone, Debug)]
pub enum ControlWordParser {
    // For the BOTH variants the first field is the number of bits to shift the word
    // right by to obtain the repetition level, and the second field is the mask that
    // extracts the definition level (see `ControlWordParser::new`).
    BOTH8(u8, u32),
    BOTH16(u8, u32),
    BOTH32(u8, u32),
    REP8,
    REP16,
    REP32,
    DEF8,
    DEF16,
    DEF32,
    NIL,
}
2323
2324impl ControlWordParser {
/// Decodes one little-endian control word of `WORD_SIZE` bytes (1, 2 or 4) from the
/// start of `src` and pushes its repetition level to `dst_rep` and its definition
/// level to `dst_def`.
///
/// The repetition level is the word shifted right by `bits_to_shift`; the definition
/// level is the word masked with `mask_to_apply`.
fn parse_both<const WORD_SIZE: u8>(
    src: &[u8],
    dst_rep: &mut Vec<u16>,
    dst_def: &mut Vec<u16>,
    bits_to_shift: u8,
    mask_to_apply: u32,
) {
    let (rep, def): (u16, u16) = match WORD_SIZE {
        1 => {
            let word = src[0];
            let rep = word >> bits_to_shift;
            let def = word & (mask_to_apply as u8);
            (rep as u16, def as u16)
        }
        2 => {
            let word = u16::from_le_bytes([src[0], src[1]]);
            let rep = word >> bits_to_shift;
            let def = word & (mask_to_apply as u16);
            (rep, def)
        }
        4 => {
            let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
            let rep = word >> bits_to_shift;
            let def = word & mask_to_apply;
            (rep as u16, def as u16)
        }
        _ => unreachable!(),
    };
    dst_rep.push(rep);
    dst_def.push(def);
}
2357
2358 fn parse_desc_both<const WORD_SIZE: u8>(
2359 src: &[u8],
2360 bits_to_shift: u8,
2361 mask_to_apply: u32,
2362 max_rep: u16,
2363 max_visible_def: u16,
2364 ) -> ControlWordDesc {
2365 match WORD_SIZE {
2366 1 => {
2367 let word = src[0];
2368 let rep = word >> bits_to_shift;
2369 let def = word & (mask_to_apply as u8);
2370 let is_visible = def as u16 <= max_visible_def;
2371 let is_new_row = rep as u16 == max_rep;
2372 let is_valid_item = def == 0;
2373 ControlWordDesc {
2374 is_visible,
2375 is_new_row,
2376 is_valid_item,
2377 }
2378 }
2379 2 => {
2380 let word = u16::from_le_bytes([src[0], src[1]]);
2381 let rep = word >> bits_to_shift;
2382 let def = word & mask_to_apply as u16;
2383 let is_visible = def <= max_visible_def;
2384 let is_new_row = rep == max_rep;
2385 let is_valid_item = def == 0;
2386 ControlWordDesc {
2387 is_visible,
2388 is_new_row,
2389 is_valid_item,
2390 }
2391 }
2392 4 => {
2393 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2394 let rep = word >> bits_to_shift;
2395 let def = word & mask_to_apply;
2396 let is_visible = def as u16 <= max_visible_def;
2397 let is_new_row = rep as u16 == max_rep;
2398 let is_valid_item = def == 0;
2399 ControlWordDesc {
2400 is_visible,
2401 is_new_row,
2402 is_valid_item,
2403 }
2404 }
2405 _ => unreachable!(),
2406 }
2407 }
2408
/// Decodes one little-endian control word of `WORD_SIZE` bytes (1, 2 or 4) that holds
/// a single level and pushes it to `dst`. Four-byte words are narrowed to `u16`.
fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
    let level: u16 = match WORD_SIZE {
        1 => src[0] as u16,
        2 => u16::from_le_bytes([src[0], src[1]]),
        4 => u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16,
        _ => unreachable!(),
    };
    dst.push(level);
}
2426
2427 fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
2428 match WORD_SIZE {
2429 1 => ControlWordDesc {
2430 is_new_row: src[0] as u16 == max_rep,
2431 is_visible: true,
2432 is_valid_item: true,
2433 },
2434 2 => ControlWordDesc {
2435 is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
2436 is_visible: true,
2437 is_valid_item: true,
2438 },
2439 4 => ControlWordDesc {
2440 is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
2441 is_visible: true,
2442 is_valid_item: true,
2443 },
2444 _ => unreachable!(),
2445 }
2446 }
2447
2448 fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
2449 match WORD_SIZE {
2450 1 => ControlWordDesc {
2451 is_new_row: true,
2452 is_visible: true,
2453 is_valid_item: src[0] == 0,
2454 },
2455 2 => ControlWordDesc {
2456 is_new_row: true,
2457 is_visible: true,
2458 is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
2459 },
2460 4 => ControlWordDesc {
2461 is_new_row: true,
2462 is_visible: true,
2463 is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
2464 },
2465 _ => unreachable!(),
2466 }
2467 }
2468
2469 pub fn bytes_per_word(&self) -> usize {
2471 match self {
2472 Self::BOTH8(..) => 1,
2473 Self::BOTH16(..) => 2,
2474 Self::BOTH32(..) => 4,
2475 Self::REP8 => 1,
2476 Self::REP16 => 2,
2477 Self::REP32 => 4,
2478 Self::DEF8 => 1,
2479 Self::DEF16 => 2,
2480 Self::DEF32 => 4,
2481 Self::NIL => 0,
2482 }
2483 }
2484
2485 pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
2492 match self {
2493 Self::BOTH8(bits_to_shift, mask_to_apply) => {
2494 Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2495 }
2496 Self::BOTH16(bits_to_shift, mask_to_apply) => {
2497 Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2498 }
2499 Self::BOTH32(bits_to_shift, mask_to_apply) => {
2500 Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2501 }
2502 Self::REP8 => Self::parse_one::<1>(src, dst_rep),
2503 Self::REP16 => Self::parse_one::<2>(src, dst_rep),
2504 Self::REP32 => Self::parse_one::<4>(src, dst_rep),
2505 Self::DEF8 => Self::parse_one::<1>(src, dst_def),
2506 Self::DEF16 => Self::parse_one::<2>(src, dst_def),
2507 Self::DEF32 => Self::parse_one::<4>(src, dst_def),
2508 Self::NIL => {}
2509 }
2510 }
2511
2512 pub fn has_rep(&self) -> bool {
2514 match self {
2515 Self::BOTH8(..)
2516 | Self::BOTH16(..)
2517 | Self::BOTH32(..)
2518 | Self::REP8
2519 | Self::REP16
2520 | Self::REP32 => true,
2521 Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
2522 }
2523 }
2524
2525 pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
2527 match self {
2528 Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
2529 src,
2530 *bits_to_shift,
2531 *mask_to_apply,
2532 max_rep,
2533 max_visible_def,
2534 ),
2535 Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
2536 src,
2537 *bits_to_shift,
2538 *mask_to_apply,
2539 max_rep,
2540 max_visible_def,
2541 ),
2542 Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
2543 src,
2544 *bits_to_shift,
2545 *mask_to_apply,
2546 max_rep,
2547 max_visible_def,
2548 ),
2549 Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
2550 Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
2551 Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
2552 Self::DEF8 => Self::parse_def_desc_one::<1>(src),
2553 Self::DEF16 => Self::parse_def_desc_one::<2>(src),
2554 Self::DEF32 => Self::parse_def_desc_one::<4>(src),
2555 Self::NIL => ControlWordDesc {
2556 is_new_row: true,
2557 is_valid_item: true,
2558 is_visible: true,
2559 },
2560 }
2561 }
2562
2563 pub fn new(bits_rep: u8, bits_def: u8) -> Self {
2565 let total_bits = bits_rep + bits_def;
2566
2567 enum WordSize {
2568 One,
2569 Two,
2570 Four,
2571 }
2572
2573 let word_size = if total_bits <= 8 {
2574 WordSize::One
2575 } else if total_bits <= 16 {
2576 WordSize::Two
2577 } else {
2578 WordSize::Four
2579 };
2580
2581 match (bits_rep > 0, bits_def > 0, word_size) {
2582 (false, false, _) => Self::NIL,
2583 (false, true, WordSize::One) => Self::DEF8,
2584 (false, true, WordSize::Two) => Self::DEF16,
2585 (false, true, WordSize::Four) => Self::DEF32,
2586 (true, false, WordSize::One) => Self::REP8,
2587 (true, false, WordSize::Two) => Self::REP16,
2588 (true, false, WordSize::Four) => Self::REP32,
2589 (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
2590 (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
2591 (true, true, WordSize::Four) => {
2592 Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
2593 }
2594 }
2595 }
2596}
2597
2598#[cfg(test)]
2599mod tests {
2600 use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
2601
2602 use crate::repdef::{
2603 CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
2604 };
2605
2606 use super::RepDefBuilder;
2607
2608 fn validity(values: &[bool]) -> NullBuffer {
2609 NullBuffer::from_iter(values.iter().copied())
2610 }
2611
2612 fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
2613 OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
2614 }
2615
2616 fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
2617 OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
2618 }
2619
/// A builder holding only an offsets layer with zero lists serializes to no
/// repetition levels and no definition levels.
#[test]
fn test_repdef_empty_offsets() {
    let mut builder = RepDefBuilder::default();
    // A single offset of 0 describes zero lists.
    builder.add_offsets(offsets_32(&[0]), None);
    let repdefs = RepDefBuilder::serialize(vec![builder]);
    assert!(repdefs.repetition_levels.is_none());
    assert!(repdefs.definition_levels.is_none());
}
2629
/// Round-trips two nullable list layers over a nullable item layer: serializes them,
/// checks the exact levels, then unravels validity and both offset layers again.
#[test]
fn test_repdef_basic() {
    let mut builder = RepDefBuilder::default();
    // Outer layer: 3 lists (the second one null) covering 5 inner lists.
    builder.add_offsets(
        offsets_64(&[0, 2, 2, 5]),
        Some(validity(&[true, false, true])),
    );
    // Inner layer: 5 lists (the fourth one null) covering 9 items.
    builder.add_offsets(
        offsets_64(&[0, 1, 3, 5, 5, 9]),
        Some(validity(&[true, true, true, false, true])),
    );
    // Item layer: 9 items, 4 of them null.
    builder.add_validity_bitmap(validity(&[
        true, true, true, false, false, false, true, true, false,
    ]));

    let repdefs = RepDefBuilder::serialize(vec![builder]);
    let rep = repdefs.repetition_levels.unwrap();
    let def = repdefs.definition_levels.unwrap();

    assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
    assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        Some(rep.as_ref().to_vec()),
        Some(def.as_ref().to_vec()),
        repdefs.def_meaning.into(),
        9,
    )]);

    // Layers come back in the reverse of the order they were added: items first,
    // then the inner lists, then the outer lists.
    assert_eq!(
        unraveler.unravel_validity(9),
        Some(validity(&[
            true, true, true, false, false, false, true, true, false
        ]))
    );
    let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 5, 9]).inner());
    assert_eq!(val, Some(validity(&[true, true, true, false, true])));
    let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(off.inner(), offsets_32(&[0, 2, 2, 5]).inner());
    assert_eq!(val, Some(validity(&[true, false, true])));
}
2677
/// A single list layer whose middle list has no items yields the same levels
/// whether that list is marked null or merely empty; only the interpretation
/// of the list layer in `def_meaning` differs.
#[test]
fn test_repdef_simple_null_empty_list() {
    /// Asserts the shared rep/def levels and that the list layer is
    /// interpreted as `list_meaning`.
    fn check(repdefs: SerializedRepDefs, list_meaning: DefinitionInterpretation) {
        let rep_levels = repdefs.repetition_levels.unwrap();
        let def_levels = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 1, 1, 0, 0], *rep_levels);
        assert_eq!([0, 0, 2, 0, 1, 0], *def_levels);
        assert_eq!(
            vec![DefinitionInterpretation::NullableItem, list_meaning],
            repdefs.def_meaning
        );
    }

    // Case 1: the middle list is explicitly null.
    let mut with_null_list = RepDefBuilder::default();
    with_null_list.add_offsets(
        offsets_32(&[0, 2, 2, 5]),
        Some(validity(&[true, false, true])),
    );
    with_null_list.add_validity_bitmap(validity(&[true, true, true, false, true]));
    check(
        RepDefBuilder::serialize(vec![with_null_list]),
        DefinitionInterpretation::NullableList,
    );

    // Case 2: same offsets, no list validity, so the middle list is just empty.
    let mut with_empty_list = RepDefBuilder::default();
    with_empty_list.add_offsets(offsets_32(&[0, 2, 2, 5]), None);
    with_empty_list.add_validity_bitmap(validity(&[true, true, true, false, true]));
    check(
        RepDefBuilder::serialize(vec![with_empty_list]),
        DefinitionInterpretation::EmptyableList,
    );
}
2715
/// The empty list is the last list in the layer (offsets end with `5, 5`);
/// it must still show up as a trailing level entry.
#[test]
fn test_repdef_empty_list_at_end() {
    let mut lists = RepDefBuilder::default();
    lists.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
    lists.add_validity_bitmap(validity(&[true, true, true, false, true]));

    let serialized = RepDefBuilder::serialize(vec![lists]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([1, 0, 1, 0, 0, 1], *rep_levels);
    assert_eq!([0, 0, 0, 1, 0, 2], *def_levels);

    let expected_meaning = vec![
        DefinitionInterpretation::NullableItem,
        DefinitionInterpretation::EmptyableList,
    ];
    assert_eq!(expected_meaning, serialized.def_meaning);
}
2738
/// The null list (index 1) still spans offsets 2..5, i.e. it is null but not
/// empty. Only 5 item slots are registered, and the expected levels contain a
/// single entry for the null list.
#[test]
fn test_repdef_abnormal_nulls() {
    let list_offsets = offsets_32(&[0, 2, 5, 8]);
    let list_validity = validity(&[true, false, true]);

    let mut lists = RepDefBuilder::default();
    lists.add_offsets(list_offsets, Some(list_validity));
    lists.add_no_null(5);

    let serialized = RepDefBuilder::serialize(vec![lists]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([1, 0, 1, 1, 0, 0], *rep_levels);
    assert_eq!([0, 0, 1, 0, 0, 0], *def_levels);

    let expected_meaning = vec![
        DefinitionInterpretation::AllValidItem,
        DefinitionInterpretation::NullableList,
    ];
    assert_eq!(expected_meaning, serialized.def_meaning);
}
2768
/// Two fixed-size-list layers (the outer nullable, the inner without
/// validity) over nullable items: no repetition levels are produced and the
/// validity of every layer can be recovered from the definition levels.
#[test]
fn test_repdef_fsl() {
    let mut fsl = RepDefBuilder::default();
    fsl.add_fsl(Some(validity(&[true, false])), 2, 2);
    fsl.add_fsl(None, 2, 4);
    fsl.add_validity_bitmap(validity(&[
        true, false, true, false, true, false, true, false,
    ]));

    let serialized = RepDefBuilder::serialize(vec![fsl]);

    let expected_meaning = vec![
        DefinitionInterpretation::NullableItem,
        DefinitionInterpretation::AllValidItem,
        DefinitionInterpretation::NullableItem,
    ];
    assert_eq!(expected_meaning, serialized.def_meaning);

    assert!(serialized.repetition_levels.is_none());

    let def_levels = serialized.definition_levels.unwrap();
    assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def_levels);

    let page = RepDefUnraveler::new(
        None,
        Some(def_levels.to_vec()),
        serialized.def_meaning.into(),
        8,
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![page]);

    // Items under the null outer entry come back as null as well.
    let item_validity = unraveler.unravel_validity(8);
    assert_eq!(
        item_validity,
        Some(validity(&[
            true, false, true, false, false, false, false, false
        ]))
    );

    // Inner FSL layer has no nulls; outer FSL layer restores its bitmap.
    assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
    let outer_validity = unraveler.unravel_fsl_validity(2, 2);
    assert_eq!(outer_validity, Some(validity(&[true, false])));
}
2814
/// Same fixed-size-list nesting as `test_repdef_fsl`, but the items carry no
/// nulls, so only the outer FSL layer contributes definition levels.
#[test]
fn test_repdef_fsl_allvalid_item() {
    let mut fsl = RepDefBuilder::default();
    fsl.add_fsl(Some(validity(&[true, false])), 2, 2);
    fsl.add_fsl(None, 2, 4);
    fsl.add_no_null(8);

    let serialized = RepDefBuilder::serialize(vec![fsl]);

    let expected_meaning = vec![
        DefinitionInterpretation::AllValidItem,
        DefinitionInterpretation::AllValidItem,
        DefinitionInterpretation::NullableItem,
    ];
    assert_eq!(expected_meaning, serialized.def_meaning);

    assert!(serialized.repetition_levels.is_none());

    let def_levels = serialized.definition_levels.unwrap();
    assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def_levels);

    let page = RepDefUnraveler::new(
        None,
        Some(def_levels.to_vec()),
        serialized.def_meaning.into(),
        8,
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![page]);

    // Neither the items nor the inner FSL layer report any validity.
    assert_eq!(unraveler.unravel_validity(8), None);
    assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);

    let outer_validity = unraveler.unravel_fsl_validity(2, 2);
    assert_eq!(outer_validity, Some(validity(&[true, false])));
}
2853
/// Offsets that do not start at zero (`[5, 7, 7, 10]`) must serialize to the
/// levels asserted below, with a null middle list and five all-valid items.
#[test]
fn test_repdef_sliced_offsets() {
    let shifted_offsets = offsets_32(&[5, 7, 7, 10]);
    let list_validity = validity(&[true, false, true]);

    let mut lists = RepDefBuilder::default();
    lists.add_offsets(shifted_offsets, Some(list_validity));
    lists.add_no_null(5);

    let serialized = RepDefBuilder::serialize(vec![lists]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([1, 0, 1, 1, 0, 0], *rep_levels);
    assert_eq!([0, 0, 1, 0, 0, 0], *def_levels);

    let expected_meaning = vec![
        DefinitionInterpretation::AllValidItem,
        DefinitionInterpretation::NullableList,
    ];
    assert_eq!(expected_meaning, serialized.def_meaning);
}
2881
/// Two list layers that each contain both null lists and empty lists; every
/// one of those cases is expected to get its own definition level.
#[test]
fn test_repdef_complex_null_empty() {
    let outer_offsets = offsets_32(&[0, 4, 4, 4, 6]);
    let outer_validity = validity(&[true, false, true, true]);
    let inner_offsets = offsets_32(&[0, 1, 1, 2, 2, 2, 3]);
    let inner_validity = validity(&[true, false, true, false, true, true]);

    let mut nested = RepDefBuilder::default();
    nested.add_offsets(outer_offsets, Some(outer_validity));
    nested.add_offsets(inner_offsets, Some(inner_validity));
    nested.add_no_null(3);

    let serialized = RepDefBuilder::serialize(vec![nested]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep_levels);
    assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def_levels);
}
2903
/// A list layer without validity that contains two empty lists, over items
/// without nulls; the levels must round-trip back to the same offsets with no
/// validity at either layer.
#[test]
fn test_repdef_empty_list_no_null() {
    let mut lists = RepDefBuilder::default();
    lists.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
    lists.add_no_null(6);

    let serialized = RepDefBuilder::serialize(vec![lists]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    // 6 items plus one entry for each of the two empty lists.
    assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep_levels);
    assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def_levels);

    let page = RepDefUnraveler::new(
        Some(rep_levels.to_vec()),
        Some(def_levels.to_vec()),
        serialized.def_meaning.into(),
        8,
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![page]);

    assert_eq!(unraveler.unravel_validity(6), None);

    let (list_offsets, list_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(list_offsets.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
    assert_eq!(list_validity, None);
}
2932
/// Two list layers with no nulls and no empty lists: only repetition levels
/// are produced, and both offset layers round-trip without validity.
#[test]
fn test_repdef_all_valid() {
    let mut nested = RepDefBuilder::default();
    nested.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
    nested.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
    nested.add_no_null(9);

    let serialized = RepDefBuilder::serialize(vec![nested]);
    let rep_levels = serialized.repetition_levels.unwrap();
    assert!(serialized.definition_levels.is_none());

    assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep_levels);

    let page = RepDefUnraveler::new(
        Some(rep_levels.to_vec()),
        None,
        serialized.def_meaning.into(),
        9,
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![page]);

    assert_eq!(unraveler.unravel_validity(9), None);

    // Inner list layer comes out first, then the outer one.
    let (inner_off, inner_val) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(inner_off.inner(), offsets_32(&[0, 1, 3, 5, 7, 9]).inner());
    assert_eq!(inner_val, None);

    let (outer_off, outer_val) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(outer_off.inner(), offsets_32(&[0, 2, 3, 5]).inner());
    assert_eq!(outer_val, None);
}
2961
/// The list layer has empty lists but no null lists: unraveling must return
/// the original offsets and report no list validity.
#[test]
fn test_only_empty_lists() {
    let mut lists = RepDefBuilder::default();
    lists.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
    lists.add_no_null(6);

    let serialized = RepDefBuilder::serialize(vec![lists]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep_levels);
    assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def_levels);

    let page = RepDefUnraveler::new(
        Some(rep_levels.to_vec()),
        Some(def_levels.to_vec()),
        serialized.def_meaning.into(),
        8,
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![page]);

    assert_eq!(unraveler.unravel_validity(6), None);

    let (list_offsets, list_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(list_offsets.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
    assert_eq!(list_validity, None);
}
2988
/// Both item-less lists in the layer are null (none is merely empty):
/// unraveling must restore the offsets and the list validity bitmap.
#[test]
fn test_only_null_lists() {
    let list_offsets = offsets_32(&[0, 4, 4, 4, 6]);
    let list_validity = validity(&[true, false, false, true]);

    let mut lists = RepDefBuilder::default();
    lists.add_offsets(list_offsets, Some(list_validity));
    lists.add_no_null(6);

    let serialized = RepDefBuilder::serialize(vec![lists]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep_levels);
    assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def_levels);

    let page = RepDefUnraveler::new(
        Some(rep_levels.to_vec()),
        Some(def_levels.to_vec()),
        serialized.def_meaning.into(),
        8,
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![page]);

    assert_eq!(unraveler.unravel_validity(6), None);

    let (out_offsets, out_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(out_offsets.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
    assert_eq!(out_validity, Some(validity(&[true, false, false, true])));
}
3018
/// The list layer holds one null list and one empty list; they must get
/// distinct definition levels (1 and 2) and round-trip to the right validity.
#[test]
fn test_null_and_empty_lists() {
    let list_offsets = offsets_32(&[0, 4, 4, 4, 6]);
    let list_validity = validity(&[true, false, true, true]);

    let mut lists = RepDefBuilder::default();
    lists.add_offsets(list_offsets, Some(list_validity));
    lists.add_no_null(6);

    let serialized = RepDefBuilder::serialize(vec![lists]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep_levels);
    assert_eq!([0, 0, 0, 0, 1, 2, 0, 0], *def_levels);

    let page = RepDefUnraveler::new(
        Some(rep_levels.to_vec()),
        Some(def_levels.to_vec()),
        serialized.def_meaning.into(),
        8,
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![page]);

    assert_eq!(unraveler.unravel_validity(6), None);

    let (out_offsets, out_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(out_offsets.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
    assert_eq!(out_validity, Some(validity(&[true, false, true, true])));
}
3048
/// Unravels hand-written levels with two nullable item layers under an
/// all-valid list layer. Both validity layers are expected to come back with
/// the same bitmap, and the single list keeps all four slots.
#[test]
fn test_repdef_null_struct_valid_list() {
    let rep_levels = vec![1, 0, 0, 0];
    let def_levels = vec![2, 0, 2, 2];
    let meaning = vec![
        DefinitionInterpretation::NullableItem,
        DefinitionInterpretation::NullableItem,
        DefinitionInterpretation::AllValidList,
    ];

    let page = RepDefUnraveler::new(Some(rep_levels), Some(def_levels), meaning.into(), 4);
    let mut unraveler = CompositeRepDefUnraveler::new(vec![page]);

    // First validity layer.
    let first_layer = unraveler.unravel_validity(4);
    assert_eq!(first_layer, Some(validity(&[false, true, false, false])));

    // Second validity layer yields the same bitmap.
    let second_layer = unraveler.unravel_validity(4);
    assert_eq!(second_layer, Some(validity(&[false, true, false, false])));

    let (list_offsets, list_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(list_offsets.inner(), offsets_32(&[0, 4]).inner());
    assert_eq!(list_validity, None);
}
3082
/// Three validity-only layers (no lists): serialization yields definition
/// levels but no repetition levels, and each layer's validity is recovered
/// innermost first.
#[test]
fn test_repdef_no_rep() {
    let mut layers = RepDefBuilder::default();
    layers.add_no_null(5);
    layers.add_validity_bitmap(validity(&[false, false, true, true, true]));
    layers.add_validity_bitmap(validity(&[false, true, true, true, false]));

    let serialized = RepDefBuilder::serialize(vec![layers]);
    assert!(serialized.repetition_levels.is_none());

    let def_levels = serialized.definition_levels.unwrap();
    assert_eq!([2, 2, 0, 0, 1], *def_levels);

    let page = RepDefUnraveler::new(
        None,
        Some(def_levels.to_vec()),
        serialized.def_meaning.into(),
        5,
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![page]);

    // Innermost layer: a slot is invalid if it or its parent layer is null.
    let innermost = unraveler.unravel_validity(5);
    assert_eq!(innermost, Some(validity(&[false, false, true, true, false])));

    // Middle layer comes back exactly as it was added.
    let middle = unraveler.unravel_validity(5);
    assert_eq!(middle, Some(validity(&[false, false, true, true, true])));

    // Outermost layer was added with `add_no_null`, so it has no validity.
    assert_eq!(unraveler.unravel_validity(5), None);
}
3113
/// Serializes two pages separately (the first with a null list, the second
/// fully valid) and checks that a composite unraveler stitches them into one
/// offsets buffer and one list validity bitmap.
#[test]
fn test_composite_unravel() {
    // Page 1: three lists, the middle one null, five valid items.
    let mut first_builder = RepDefBuilder::default();
    first_builder.add_offsets(
        offsets_64(&[0, 2, 2, 5]),
        Some(validity(&[true, false, true])),
    );
    first_builder.add_no_null(5);
    let first_page = RepDefBuilder::serialize(vec![first_builder]);

    // Page 2: five lists, no nulls anywhere, nine items.
    let mut second_builder = RepDefBuilder::default();
    second_builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
    second_builder.add_no_null(9);
    let second_page = RepDefBuilder::serialize(vec![second_builder]);

    let first_rep = first_page.repetition_levels.clone().unwrap();
    let first_def = first_page.definition_levels.clone().unwrap();
    let second_rep = second_page.repetition_levels.clone().unwrap();
    assert!(second_page.definition_levels.is_none());

    assert_eq!([1, 0, 1, 1, 0, 0], *first_rep);
    assert_eq!([0, 0, 1, 0, 0, 0], *first_def);
    assert_eq!([1, 1, 0, 1, 0, 1, 0, 1, 0], *second_rep);

    let first_unraveler = RepDefUnraveler::new(
        first_page.repetition_levels.map(|levels| levels.to_vec()),
        first_page.definition_levels.map(|levels| levels.to_vec()),
        first_page.def_meaning.into(),
        5,
    );
    let second_unraveler = RepDefUnraveler::new(
        second_page.repetition_levels.map(|levels| levels.to_vec()),
        second_page.definition_levels.map(|levels| levels.to_vec()),
        second_page.def_meaning.into(),
        9,
    );

    let mut unraveler = CompositeRepDefUnraveler::new(vec![first_unraveler, second_unraveler]);

    assert!(unraveler.unravel_validity(9).is_none());

    // Offsets of the second page continue from where the first page ended.
    let (merged_offsets, merged_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(
        merged_offsets.inner(),
        offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]).inner()
    );
    assert_eq!(
        merged_validity,
        Some(validity(&[true, false, true, true, true, true, true, true]))
    );
}
3164
/// Serializing two builders together must produce a single pair of level
/// buffers covering both; the second builder contains null lists at both
/// list layers while the first is fully valid.
#[test]
fn test_repdef_multiple_builders() {
    // First builder: one outer list, two inner lists, three valid items.
    let mut all_valid = RepDefBuilder::default();
    all_valid.add_offsets(offsets_64(&[0, 2]), None);
    all_valid.add_offsets(offsets_64(&[0, 1, 3]), None);
    all_valid.add_validity_bitmap(validity(&[true, true, true]));

    // Second builder: nulls at the outer, inner and item layers.
    let mut with_nulls = RepDefBuilder::default();
    with_nulls.add_offsets(offsets_64(&[0, 0, 3]), Some(validity(&[false, true])));
    with_nulls.add_offsets(
        offsets_64(&[0, 2, 2, 6]),
        Some(validity(&[true, false, true])),
    );
    with_nulls.add_validity_bitmap(validity(&[false, false, false, true, true, false]));

    let serialized = RepDefBuilder::serialize(vec![all_valid, with_nulls]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep_levels);
    assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def_levels);
}
3189
/// A validity bitmap whose bits are all `true` must serialize exactly like an
/// `add_no_null` layer of the same length: no levels, same `def_meaning`,
/// same `max_visible_level`.
#[test]
fn test_all_valid_validity_bitmap_serializes_as_no_null() {
    let mut bitmap_builder = RepDefBuilder::default();
    bitmap_builder.add_validity_bitmap(validity(&[true, true, true, true]));

    let mut no_null_builder = RepDefBuilder::default();
    no_null_builder.add_no_null(4);

    let via_bitmap = RepDefBuilder::serialize(vec![bitmap_builder]);
    let via_no_null = RepDefBuilder::serialize(vec![no_null_builder]);

    assert!(via_bitmap.repetition_levels.is_none());
    assert!(via_bitmap.definition_levels.is_none());
    assert_eq!(via_bitmap.def_meaning, via_no_null.def_meaning);
    assert_eq!(via_bitmap.max_visible_level, via_no_null.max_visible_level);
}
3209
/// Slices the serialized repetition and definition levels by item count and
/// checks the byte length of every slice.
///
/// The input has 30 items spread over four lists: a null list sits after the
/// first two items and an empty list sits at the very end, so the first and
/// last slices carry one extra level each.
#[test]
fn test_slicer() {
    let mut builder = RepDefBuilder::default();
    builder.add_offsets(
        offsets_64(&[0, 2, 2, 30, 30]),
        Some(validity(&[true, false, true, true])),
    );
    builder.add_no_null(30);

    let repdefs = RepDefBuilder::serialize(vec![builder]);

    let mut rep_slicer = repdefs.rep_slicer().unwrap();

    // First 5 items also cover the null list: 12 bytes
    // (presumably 6 two-byte levels, since levels are stored as `u16`).
    assert_eq!(rep_slicer.slice_next(5).len(), 12);
    // Next 20 items are plain: 40 bytes.
    assert_eq!(rep_slicer.slice_next(20).len(), 40);
    // Remaining 5 items plus the trailing empty list: 12 bytes.
    assert_eq!(rep_slicer.slice_rest().len(), 12);

    // Fix: this slicer used to be created with `rep_slicer()` again, so the
    // definition levels were never actually sliced by this test.
    let mut def_slicer = repdefs.def_slicer().unwrap();

    // The definition levels must split at exactly the same boundaries.
    assert_eq!(def_slicer.slice_next(5).len(), 12);
    assert_eq!(def_slicer.slice_next(20).len(), 40);
    assert_eq!(def_slicer.slice_rest().len(), 12);
}
3239
/// Packs rep/def levels into control words and parses them back, checking
/// the exact byte layout, the word width and the bit widths for each case.
#[test]
fn test_control_words() {
    /// Encodes `rep`/`def` (an empty slice means "no such levels"), compares
    /// the produced bytes and widths with the expectations, then decodes the
    /// bytes and checks that the original levels come back.
    fn check(
        rep: &[u16],
        def: &[u16],
        expected_values: Vec<u8>,
        expected_bytes_per_word: usize,
        expected_bits_rep: u8,
        expected_bits_def: u8,
    ) {
        let num_vals = usize::max(rep.len(), def.len());
        let max_rep = rep.iter().copied().max().unwrap_or(0);
        let max_def = def.iter().copied().max().unwrap_or(0);

        let in_rep = match rep {
            [] => None,
            _ => Some(rep),
        };
        let in_def = match def {
            [] => None,
            _ => Some(def),
        };

        let mut words = super::build_control_word_iterator(
            in_rep,
            max_rep,
            in_def,
            max_def,
            max_def + 1,
            expected_values.len(),
        );
        assert_eq!(words.bytes_per_word(), expected_bytes_per_word);
        assert_eq!(words.bits_rep(), expected_bits_rep);
        assert_eq!(words.bits_def(), expected_bits_def);

        let mut encoded = Vec::with_capacity(num_vals * words.bytes_per_word());
        (0..num_vals).for_each(|_| {
            words.append_next(&mut encoded);
        });
        // The iterator must be exhausted after `num_vals` words.
        assert!(words.append_next(&mut encoded).is_none());

        assert_eq!(expected_values, encoded);

        let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);

        let mut rep_out = Vec::with_capacity(num_vals);
        let mut def_out = Vec::with_capacity(num_vals);

        // Zero-width words produce no bytes; `chunks_exact(0)` would panic.
        if expected_bytes_per_word > 0 {
            encoded
                .chunks_exact(expected_bytes_per_word)
                .for_each(|word| {
                    parser.parse(word, &mut rep_out, &mut def_out);
                });
        }

        assert_eq!(rep, rep_out.as_slice());
        assert_eq!(def, def_out.as_slice());
    }

    // Case 1: 4 bits rep + 4 bits def fit in one byte (rep in the high nibble).
    let rep_levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
    let def_levels = &[5_u16, 3, 1, 2, 12, 15, 0, 2];
    let one_byte_words = vec![
        0b00000101, 0b01110011, 0b00110001, 0b00100010,
        0b10011100, 0b10001111, 0b11000000, 0b01010010,
    ];
    check(rep_levels, def_levels, one_byte_words, 1, 4, 4);

    // Case 2: def needs 5 bits, so each word takes two bytes (low byte first).
    let rep_levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
    let def_levels = &[5_u16, 3, 1, 2, 12, 22, 0, 2];
    let two_byte_words = vec![
        0b00000101, 0b00000000,
        0b11100011, 0b00000000,
        0b01100001, 0b00000000,
        0b01000010, 0b00000000,
        0b00101100, 0b00000001,
        0b00010110, 0b00000001,
        0b10000000, 0b00000001,
        0b10100010, 0b00000000,
    ];
    check(rep_levels, def_levels, two_byte_words, 2, 4, 5);

    // Cases 3 and 4: only one kind of level; the bytes are the levels themselves.
    let only_levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
    let plain_bytes = vec![
        0b00000000, 0b00000111, 0b00000011, 0b00000010,
        0b00001001, 0b00001000, 0b00001100, 0b00000101,
    ];
    check(only_levels, &[], plain_bytes.clone(), 1, 4, 0);
    check(&[], only_levels, plain_bytes, 1, 0, 4);

    // Case 5: neither rep nor def levels, hence zero-width control words.
    check(&[], &[], Vec::new(), 0, 0, 0);
}
3343
/// Checks the per-word `is_new_row` / `is_visible` flags reported by the
/// control word iterator for every combination of present or absent rep and
/// def levels.
#[test]
fn test_control_words_rep_index() {
    /// Builds the iterator with a max visible def level of 2 and compares the
    /// flags of each produced word against the expected sequences.
    fn check(
        rep: &[u16],
        def: &[u16],
        expected_new_rows: Vec<bool>,
        expected_is_visible: Vec<bool>,
    ) {
        let num_vals = usize::max(rep.len(), def.len());
        let max_rep = rep.iter().copied().max().unwrap_or(0);
        let max_def = def.iter().copied().max().unwrap_or(0);

        let in_rep = match rep {
            [] => None,
            _ => Some(rep),
        };
        let in_def = match def {
            [] => None,
            _ => Some(def),
        };

        let mut words = super::build_control_word_iterator(
            in_rep,
            max_rep,
            in_def,
            max_def,
            2,
            expected_new_rows.len(),
        );

        let mut encoded = Vec::with_capacity(num_vals * words.bytes_per_word());
        let mut visible_flags = expected_is_visible.iter();
        for &want_new_row in &expected_new_rows {
            let word_desc = words.append_next(&mut encoded).unwrap();
            assert_eq!(word_desc.is_new_row, want_new_row);
            assert_eq!(word_desc.is_visible, *visible_flags.next().unwrap());
        }
        // No further words once every expected entry has been consumed.
        assert!(words.append_next(&mut encoded).is_none());
    }

    let rep_levels = &[2_u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
    let def_levels = &[0_u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];

    // Expected row starts whenever rep levels are present (rep == 2 here).
    let row_starts = vec![
        true, false, false, true, true, false, false, false, false, true, false,
    ];

    // Rep and def: only the entry with def level 3 is expected to be invisible.
    check(
        rep_levels,
        def_levels,
        row_starts.clone(),
        vec![
            true, true, true, false, true, true, true, true, true, true, true,
        ],
    );

    // Rep only: same row starts, everything visible.
    check(rep_levels, &[], row_starts, vec![true; 11]);

    // Def only: every word starts a new row and is visible.
    check(
        &[],
        def_levels,
        vec![
            true, true, true, true, true, true, true, true, true, true, true,
        ],
        vec![true; 11],
    );

    // Neither rep nor def: again every word is a new, visible row.
    check(
        &[],
        &[],
        vec![
            true, true, true, true, true, true, true, true, true, true, true,
        ],
        vec![true; 11],
    );
}
3423
/// Regression test: a nullable outer layer over a list layer in which every
/// list is null and there are no items at all.
#[test]
fn regress_empty_list_case() {
    let outer_validity = validity(&[true, false, true]);
    let list_offsets = offsets_32(&[0, 0, 0, 0]);
    let list_validity = validity(&[false, false, false]);

    let mut builder = RepDefBuilder::default();
    builder.add_validity_bitmap(outer_validity);
    builder.add_offsets(list_offsets, Some(list_validity));
    builder.add_no_null(0);

    let serialized = RepDefBuilder::serialize(vec![builder]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([1, 1, 1], *rep_levels);
    assert_eq!([1, 2, 1], *def_levels);

    let page = RepDefUnraveler::new(
        Some(rep_levels.to_vec()),
        Some(def_levels.to_vec()),
        serialized.def_meaning.into(),
        0,
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![page]);

    // There are zero items, so there is no item validity to report.
    assert_eq!(unraveler.unravel_validity(0), None);

    let (out_offsets, out_list_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(out_offsets.inner(), offsets_32(&[0, 0, 0, 0]).inner());
    assert_eq!(out_list_validity, Some(validity(&[false, false, false])));

    let out_outer_validity = unraveler.unravel_validity(3).unwrap();
    assert_eq!(
        out_outer_validity.inner(),
        validity(&[true, false, true]).inner()
    );
}
3456
/// Regression test: both list layers end with a null list, and only a single
/// item exists.
#[test]
fn regress_list_ends_null_case() {
    let outer_offsets = offsets_64(&[0, 1, 2, 2]);
    let outer_validity = validity(&[true, true, false]);
    let inner_offsets = offsets_64(&[0, 1, 1]);
    let inner_validity = validity(&[true, false]);

    let mut builder = RepDefBuilder::default();
    builder.add_offsets(outer_offsets, Some(outer_validity));
    builder.add_offsets(inner_offsets, Some(inner_validity));
    builder.add_no_null(1);

    let serialized = RepDefBuilder::serialize(vec![builder]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([2, 2, 2], *rep_levels);
    assert_eq!([0, 1, 2], *def_levels);

    let page = RepDefUnraveler::new(
        Some(rep_levels.to_vec()),
        Some(def_levels.to_vec()),
        serialized.def_meaning.into(),
        1,
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![page]);

    assert_eq!(unraveler.unravel_validity(1), None);

    // Inner list layer first ...
    let (inner_off, inner_val) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(inner_off.inner(), offsets_32(&[0, 1, 1]).inner());
    assert_eq!(inner_val, Some(validity(&[true, false])));

    // ... then the outer list layer.
    let (outer_off, outer_val) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(outer_off.inner(), offsets_32(&[0, 1, 2, 2]).inner());
    assert_eq!(outer_val, Some(validity(&[true, true, false])));
}
3489
/// A composite unraveler must cope with pages whose layers are interpreted
/// differently: nullable vs. all-valid items, and emptyable vs. nullable
/// lists.
#[test]
fn test_mixed_unraveler() {
    // Part 1: a page with nullable items followed by a page with no nulls
    // and no definition levels at all.
    let nullable_page = RepDefUnraveler::new(
        None,
        Some(vec![0, 1, 0, 1]),
        vec![DefinitionInterpretation::NullableItem].into(),
        4,
    );
    let all_valid_page = RepDefUnraveler::new(
        None,
        None,
        vec![DefinitionInterpretation::AllValidItem].into(),
        4,
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![nullable_page, all_valid_page]);

    let combined = unraveler.unravel_validity(8);
    assert_eq!(
        combined,
        Some(validity(&[
            true, false, true, false, true, true, true, true
        ]))
    );

    // Part 2: the first page has nullable items in emptyable lists, the
    // second has all-valid items in nullable lists.
    let first_def = Some(vec![0, 1, 2]);
    let first_rep = Some(vec![1, 0, 1]);

    let second_def = Some(vec![1, 0, 0]);
    let second_rep = Some(vec![1, 1, 0]);

    let emptyable_page = RepDefUnraveler::new(
        first_rep,
        first_def,
        vec![
            DefinitionInterpretation::NullableItem,
            DefinitionInterpretation::EmptyableList,
        ]
        .into(),
        2,
    );
    let nullable_list_page = RepDefUnraveler::new(
        second_rep,
        second_def,
        vec![
            DefinitionInterpretation::AllValidItem,
            DefinitionInterpretation::NullableList,
        ]
        .into(),
        2,
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![emptyable_page, nullable_list_page]);

    let item_validity = unraveler.unravel_validity(4);
    assert_eq!(item_validity, Some(validity(&[true, false, true, true])));

    let expected_lists = (
        offsets_32(&[0, 2, 2, 2, 4]),
        Some(validity(&[true, true, false, true])),
    );
    assert_eq!(unraveler.unravel_offsets::<i32>().unwrap(), expected_lists);
}
3560
/// A page whose layer is declared `NullableItem` but which carries no
/// definition levels must unravel as all valid when combined with a page
/// that does have nulls.
#[test]
fn test_mixed_unraveler_nullable_without_def_levels() {
    let page_with_levels = RepDefUnraveler::new(
        None,
        Some(vec![0, 1, 0, 1]),
        vec![DefinitionInterpretation::NullableItem].into(),
        4,
    );
    let page_without_levels = RepDefUnraveler::new(
        None,
        None,
        vec![DefinitionInterpretation::NullableItem].into(),
        4,
    );
    let mut unraveler =
        CompositeRepDefUnraveler::new(vec![page_with_levels, page_without_levels]);

    let combined = unraveler.unravel_validity(8);
    assert_eq!(
        combined,
        Some(validity(&[
            true, false, true, false, true, true, true, true
        ]))
    );
}
3587}