1use std::{
110 iter::{Copied, Zip},
111 sync::Arc,
112};
113
114use arrow_array::OffsetSizeTrait;
115use arrow_buffer::{
116 ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
117};
118use lance_core::{utils::bit::log_2_ceil, Error, Result};
119use snafu::location;
120
121use crate::buffer::LanceBuffer;
122
/// Owned buffer of repetition or definition levels (one `u16` per level).
pub type LevelBuffer = Vec<u16>;

/// Definition levels greater than this value mark "special" entries (null or empty lists)
/// while levels are being built. They are offset by this value so that later passes can
/// recognize and skip them; `SerializerContext::normalize_specials` subtracts the offset
/// again before the levels are returned.
const SPECIAL_THRESHOLD: u16 = u16::MAX / 2;
135
/// A list layer as recorded by `RepDefBuilder`.
#[derive(Clone, Debug)]
struct OffsetDesc {
    // Normalized offsets: null lists are collapsed to zero length when the builder records them.
    offsets: Arc<[i64]>,
    // List-level validity; `None` means every list is valid.
    validity: Option<BooleanBuffer>,
    // True if at least one valid list has length zero.
    has_empty_lists: bool,
    // Number of lists (offsets.len() - 1).
    num_values: usize,
    // Number of null or empty lists; each one produces a level but no child value.
    num_specials: usize,
}
146
/// A plain validity layer (struct or leaf item) as recorded by `RepDefBuilder`.
#[derive(Clone, Debug)]
struct ValidityDesc {
    // `None` means every item is valid.
    validity: Option<BooleanBuffer>,
    // Number of items in this layer.
    num_values: usize,
}
154
/// A fixed-size-list layer as recorded by `RepDefBuilder`.
#[derive(Clone, Debug)]
struct FslDesc {
    // Validity of the fixed-size lists themselves; `None` means all valid.
    validity: Option<BooleanBuffer>,
    // Number of child items per fixed-size list.
    dimension: usize,
    // Number of fixed-size lists (not child items).
    num_values: usize,
}
164
/// One layer of nesting information collected by `RepDefBuilder` before it is
/// serialized into repetition / definition levels.
#[derive(Clone, Debug)]
enum RawRepDef {
    Offsets(OffsetDesc),
    Validity(ValidityDesc),
    Fsl(FslDesc),
}
174
175impl RawRepDef {
176 fn has_nulls(&self) -> bool {
178 match self {
179 Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
180 Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
181 Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
182 }
183 }
184
185 fn num_values(&self) -> usize {
187 match self {
188 Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
189 Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
190 Self::Fsl(FslDesc { num_values, .. }) => *num_values,
191 }
192 }
193
194 fn num_specials(&self) -> usize {
196 match self {
197 Self::Offsets(OffsetDesc { num_specials, .. }) => *num_specials,
198 _ => 0,
199 }
200 }
201
202 fn max_def(&self) -> u16 {
204 match self {
205 Self::Offsets(OffsetDesc {
206 has_empty_lists,
207 validity,
208 ..
209 }) => {
210 let mut max_def = 0;
211 if *has_empty_lists {
212 max_def += 1;
213 }
214 if validity.is_some() {
215 max_def += 1;
216 }
217 max_def
218 }
219 Self::Validity(ValidityDesc { validity: None, .. }) => 0,
220 Self::Validity(ValidityDesc { .. }) => 1,
221 Self::Fsl(FslDesc { validity: None, .. }) => 0,
222 Self::Fsl(FslDesc { .. }) => 1,
223 }
224 }
225
226 fn max_rep(&self) -> u16 {
228 match self {
229 Self::Offsets(_) => 1,
230 _ => 0,
231 }
232 }
233}
234
/// Repetition and definition levels produced by `RepDefBuilder::serialize`.
#[derive(Debug)]
pub struct SerializedRepDefs {
    // One repetition level per entry; `None` when no layer is a list.
    pub repetition_levels: Option<Arc<[u16]>>,
    // One definition level per entry; `None` when no layer needs definition levels.
    pub definition_levels: Option<Arc<[u16]>>,
    // Meaning of each layer's definition levels. `def_meaning[0]` is the layer that owns the
    // smallest definition levels (the last layer added to the builder).
    pub def_meaning: Vec<DefinitionInterpretation>,
    // Sum of the def levels of all layers before the first list layer in `def_meaning`;
    // entries whose def level exceeds this do not correspond to a visible value.
    // `None` when there are no list layers.
    pub max_visible_level: Option<u16>,
}
257
258impl SerializedRepDefs {
259 pub fn new(
260 repetition_levels: Option<LevelBuffer>,
261 definition_levels: Option<LevelBuffer>,
262 def_meaning: Vec<DefinitionInterpretation>,
263 ) -> Self {
264 let first_list = def_meaning.iter().position(|level| level.is_list());
265 let max_visible_level = first_list.map(|first_list| {
266 def_meaning
267 .iter()
268 .map(|level| level.num_def_levels())
269 .take(first_list)
270 .sum::<u16>()
271 });
272 Self {
273 repetition_levels: repetition_levels.map(Arc::from),
274 definition_levels: definition_levels.map(Arc::from),
275 def_meaning,
276 max_visible_level,
277 }
278 }
279
280 pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
282 Self {
283 repetition_levels: None,
284 definition_levels: None,
285 def_meaning,
286 max_visible_level: None,
287 }
288 }
289
290 pub fn rep_slicer(&self) -> Option<RepDefSlicer> {
291 self.repetition_levels
292 .as_ref()
293 .map(|rep| RepDefSlicer::new(self, rep.clone()))
294 }
295
296 pub fn def_slicer(&self) -> Option<RepDefSlicer> {
297 self.definition_levels
298 .as_ref()
299 .map(|def| RepDefSlicer::new(self, def.clone()))
300 }
301}
302
/// Cursor that hands out consecutive slices of one level buffer (rep or def).
///
/// The levels are viewed as raw bytes, two bytes per `u16` level.
#[derive(Debug)]
pub struct RepDefSlicer<'a> {
    // Source of `max_visible_level` and the definition levels used to skip invisible entries.
    repdef: &'a SerializedRepDefs,
    // The level buffer being sliced, reinterpreted as bytes.
    to_slice: LanceBuffer,
    // Index (in levels, not bytes) of the next level to hand out.
    current: usize,
}
316
317impl<'a> RepDefSlicer<'a> {
319 fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
320 Self {
321 repdef,
322 to_slice: LanceBuffer::reinterpret_slice(levels),
323 current: 0,
324 }
325 }
326
327 pub fn num_levels(&self) -> usize {
328 self.to_slice.len() / 2
329 }
330
331 pub fn num_levels_remaining(&self) -> usize {
332 self.num_levels() - self.current
333 }
334
335 pub fn all_levels(&self) -> &LanceBuffer {
336 &self.to_slice
337 }
338
339 pub fn slice_rest(&mut self) -> LanceBuffer {
348 let start = self.current;
349 let remaining = self.num_levels_remaining();
350 self.current = self.num_levels();
351 self.to_slice.slice_with_length(start * 2, remaining * 2)
352 }
353
354 pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
356 let start = self.current;
357 let Some(max_visible_level) = self.repdef.max_visible_level else {
358 self.current = start + num_values;
360 return self.to_slice.slice_with_length(start * 2, num_values * 2);
361 };
362 if let Some(def) = self.repdef.definition_levels.as_ref() {
363 let mut def_itr = def[start..].iter();
367 let mut num_taken = 0;
368 let mut num_passed = 0;
369 while num_taken < num_values {
370 let def_level = *def_itr.next().unwrap();
371 if def_level <= max_visible_level {
372 num_taken += 1;
373 }
374 num_passed += 1;
375 }
376 self.current = start + num_passed;
377 self.to_slice.slice_with_length(start * 2, num_passed * 2)
378 } else {
379 self.current = start + num_values;
381 self.to_slice.slice_with_length(start * 2, num_values * 2)
382 }
383 }
384}
385
/// How the definition levels of a single layer should be interpreted.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DefinitionInterpretation {
    AllValidItem,
    AllValidList,
    NullableItem,
    NullableList,
    EmptyableList,
    NullableAndEmptyableList,
}

impl DefinitionInterpretation {
    /// Number of definition levels this layer occupies: zero when nothing can be
    /// null/empty, one per possible condition (null, empty) otherwise.
    pub fn num_def_levels(&self) -> u16 {
        match self {
            Self::AllValidItem | Self::AllValidList => 0,
            Self::NullableItem | Self::NullableList | Self::EmptyableList => 1,
            Self::NullableAndEmptyableList => 2,
        }
    }

    /// True when this layer can never be null. Empty lists are not nulls, so an
    /// `EmptyableList` still counts as all-valid.
    pub fn is_all_valid(&self) -> bool {
        !matches!(
            self,
            Self::NullableItem | Self::NullableList | Self::NullableAndEmptyableList
        )
    }

    /// True when this layer is a (variable-length) list layer.
    pub fn is_list(&self) -> bool {
        !matches!(self, Self::AllValidItem | Self::NullableItem)
    }
}
440
/// Working state used by `RepDefBuilder::serialize` to turn layers into levels.
///
/// Each recording pass reads from `rep_levels`/`def_levels`, writes into the matching
/// `spare_*` buffer, and then swaps the two.
#[derive(Debug)]
struct SerializerContext {
    // Meaning of each layer in the order layers were recorded (reversed in `build`).
    def_meaning: Vec<DefinitionInterpretation>,
    // Live repetition levels (empty if no layer needs repetition).
    rep_levels: LevelBuffer,
    // Scratch target for the next pass over repetition levels.
    spare_rep: LevelBuffer,
    // Live definition levels (empty if no layer needs definition levels).
    def_levels: LevelBuffer,
    // Scratch target for the next pass over definition levels.
    spare_def: LevelBuffer,
    // Next repetition level to hand out; starts at max_rep and decrements per list layer.
    current_rep: u16,
    // Next definition level to hand out; starts at max_def and decrements per claimed level.
    current_def: u16,
    // Number of levels currently populated.
    current_len: usize,
    // Number of special (null/empty list) entries recorded so far.
    current_num_specials: usize,
}
465
466impl SerializerContext {
467 fn new(len: usize, num_layers: usize, max_rep: u16, max_def: u16) -> Self {
468 let def_meaning = Vec::with_capacity(num_layers);
469 Self {
470 rep_levels: if max_rep > 0 {
471 vec![0; len]
472 } else {
473 LevelBuffer::default()
474 },
475 spare_rep: if max_rep > 0 {
476 vec![0; len]
477 } else {
478 LevelBuffer::default()
479 },
480 def_levels: if max_def > 0 {
481 vec![0; len]
482 } else {
483 LevelBuffer::default()
484 },
485 spare_def: if max_def > 0 {
486 vec![0; len]
487 } else {
488 LevelBuffer::default()
489 },
490 def_meaning,
491 current_rep: max_rep,
492 current_def: max_def,
493 current_len: 0,
494 current_num_specials: 0,
495 }
496 }
497
498 fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
499 let def = self.current_def;
500 self.current_def -= meaning.num_def_levels();
501 self.def_meaning.push(meaning);
502 def
503 }
504
505 fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
506 let rep_level = self.current_rep;
507 let (null_list_level, empty_list_level) =
508 match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
509 (true, true) => {
510 let level =
511 self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
512 (level - 1, level)
513 }
514 (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
515 (false, true) => (
516 0,
517 self.checkout_def(DefinitionInterpretation::EmptyableList),
518 ),
519 (false, false) => {
520 self.checkout_def(DefinitionInterpretation::AllValidList);
521 (0, 0)
522 }
523 };
524 self.current_rep -= 1;
525
526 if let Some(validity) = &offset_desc.validity {
527 self.do_record_validity(validity, null_list_level);
528 }
529
530 let mut new_len = 0;
535 assert!(self.rep_levels.len() >= offset_desc.num_values - 1 + self.current_num_specials);
536 if self.def_levels.is_empty() {
537 let mut write_itr = self.spare_rep.iter_mut();
538 let mut read_iter = self.rep_levels.iter().copied();
539 for w in offset_desc.offsets.windows(2) {
540 let len = w[1] - w[0];
541 assert!(len > 0);
543 let rep = read_iter.next().unwrap();
544 let list_level = if rep == 0 { rep_level } else { rep };
545 *write_itr.next().unwrap() = list_level;
546
547 for _ in 1..len {
548 *write_itr.next().unwrap() = 0;
549 }
550 new_len += len as usize;
551 }
552 std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
553 } else {
554 assert!(
555 self.def_levels.len() >= offset_desc.num_values - 1 + self.current_num_specials
556 );
557 let mut def_write_itr = self.spare_def.iter_mut();
558 let mut rep_write_itr = self.spare_rep.iter_mut();
559 let mut rep_read_itr = self.rep_levels.iter().copied();
560 let mut def_read_itr = self.def_levels.iter().copied();
561 let specials_to_pass = self.current_num_specials;
562 let mut specials_passed = 0;
563
564 for w in offset_desc.offsets.windows(2) {
565 let mut def = def_read_itr.next().unwrap();
566 while def > SPECIAL_THRESHOLD {
568 *def_write_itr.next().unwrap() = def;
569 *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
570 def = def_read_itr.next().unwrap();
571 new_len += 1;
572 specials_passed += 1;
573 }
574
575 let len = w[1] - w[0];
576 let rep = rep_read_itr.next().unwrap();
577
578 let list_level = if rep == 0 { rep_level } else { rep };
582
583 if def == 0 && len > 0 {
584 *def_write_itr.next().unwrap() = 0;
586 *rep_write_itr.next().unwrap() = list_level;
587
588 for _ in 1..len {
589 *def_write_itr.next().unwrap() = 0;
590 *rep_write_itr.next().unwrap() = 0;
591 }
592
593 new_len += len as usize;
594 } else if def == 0 {
595 *def_write_itr.next().unwrap() = empty_list_level + SPECIAL_THRESHOLD;
597 *rep_write_itr.next().unwrap() = list_level;
598 new_len += 1;
599 } else {
600 *def_write_itr.next().unwrap() = def + SPECIAL_THRESHOLD;
603 *rep_write_itr.next().unwrap() = list_level;
604 new_len += 1;
605 }
606 }
607
608 while specials_passed < specials_to_pass {
610 *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
611 *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
612 new_len += 1;
613 specials_passed += 1;
614 }
615 std::mem::swap(&mut self.def_levels, &mut self.spare_def);
616 std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
617 }
618
619 self.current_len = new_len;
620 self.current_num_specials += offset_desc.num_specials;
621 }
622
623 fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
624 assert!(self.def_levels.len() >= validity.len() + self.current_num_specials);
625 debug_assert!(
626 self.current_len == 0 || self.current_len == validity.len() + self.current_num_specials
627 );
628 self.current_len = validity.len();
629
630 let mut def_read_itr = self.def_levels.iter().copied();
631 let mut def_write_itr = self.spare_def.iter_mut();
632
633 let specials_to_pass = self.current_num_specials;
634 let mut specials_passed = 0;
635
636 for incoming_validity in validity.iter() {
637 let mut def = def_read_itr.next().unwrap();
638 while def > SPECIAL_THRESHOLD {
639 *def_write_itr.next().unwrap() = def;
640 def = def_read_itr.next().unwrap();
641 specials_passed += 1;
642 }
643 if def == 0 && !incoming_validity {
644 *def_write_itr.next().unwrap() = null_level;
645 } else {
646 *def_write_itr.next().unwrap() = def;
647 }
648 }
649
650 while specials_passed < specials_to_pass {
651 *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
652 specials_passed += 1;
653 }
654
655 std::mem::swap(&mut self.def_levels, &mut self.spare_def);
656 }
657
658 fn multiply_levels(&mut self, multiplier: usize) {
659 let old_len = self.current_len;
660 self.current_len =
662 (self.current_len - self.current_num_specials) * multiplier + self.current_num_specials;
663
664 if self.rep_levels.is_empty() && self.def_levels.is_empty() {
665 return;
667 } else if self.rep_levels.is_empty() {
668 assert!(self.def_levels.len() >= self.current_len);
669 let mut def_read_itr = self.def_levels.iter().copied();
671 let mut def_write_itr = self.spare_def.iter_mut();
672 for _ in 0..old_len {
673 let mut def = def_read_itr.next().unwrap();
674 while def > SPECIAL_THRESHOLD {
675 *def_write_itr.next().unwrap() = def;
676 def = def_read_itr.next().unwrap();
677 }
678 for _ in 0..multiplier {
679 *def_write_itr.next().unwrap() = def;
680 }
681 }
682 } else if self.def_levels.is_empty() {
683 assert!(self.rep_levels.len() >= self.current_len);
684 let mut rep_read_itr = self.rep_levels.iter().copied();
686 let mut rep_write_itr = self.spare_rep.iter_mut();
687 for _ in 0..old_len {
688 let rep = rep_read_itr.next().unwrap();
689 for _ in 0..multiplier {
690 *rep_write_itr.next().unwrap() = rep;
691 }
692 }
693 } else {
694 assert!(self.rep_levels.len() >= self.current_len);
695 assert!(self.def_levels.len() >= self.current_len);
696 let mut rep_read_itr = self.rep_levels.iter().copied();
697 let mut def_read_itr = self.def_levels.iter().copied();
698 let mut rep_write_itr = self.spare_rep.iter_mut();
699 let mut def_write_itr = self.spare_def.iter_mut();
700 for _ in 0..old_len {
701 let mut def = def_read_itr.next().unwrap();
702 while def > SPECIAL_THRESHOLD {
703 *def_write_itr.next().unwrap() = def;
704 *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
705 def = def_read_itr.next().unwrap();
706 }
707 let rep = rep_read_itr.next().unwrap();
708 for _ in 0..multiplier {
709 *def_write_itr.next().unwrap() = def;
710 *rep_write_itr.next().unwrap() = rep;
711 }
712 }
713 }
714 std::mem::swap(&mut self.def_levels, &mut self.spare_def);
715 std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
716 }
717
718 fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
719 if let Some(validity) = validity {
720 let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
721 self.do_record_validity(validity, def_level);
722 } else {
723 self.checkout_def(DefinitionInterpretation::AllValidItem);
724 }
725 }
726
727 fn record_validity(&mut self, validity_desc: &ValidityDesc) {
728 self.record_validity_buf(&validity_desc.validity)
729 }
730
731 fn record_fsl(&mut self, fsl_desc: &FslDesc) {
732 self.record_validity_buf(&fsl_desc.validity);
733 self.multiply_levels(fsl_desc.dimension);
734 }
735
736 fn normalize_specials(&mut self) {
737 for def in self.def_levels.iter_mut() {
738 if *def > SPECIAL_THRESHOLD {
739 *def -= SPECIAL_THRESHOLD;
740 }
741 }
742 }
743
744 fn build(mut self) -> SerializedRepDefs {
745 if self.current_len == 0 {
746 return SerializedRepDefs::new(None, None, self.def_meaning);
747 }
748
749 self.normalize_specials();
750
751 let definition_levels = if self.def_levels.is_empty() {
752 None
753 } else {
754 Some(self.def_levels)
755 };
756 let repetition_levels = if self.rep_levels.is_empty() {
757 None
758 } else {
759 Some(self.rep_levels)
760 };
761
762 let def_meaning = self.def_meaning.into_iter().rev().collect::<Vec<_>>();
764
765 SerializedRepDefs::new(repetition_levels, definition_levels, def_meaning)
766 }
767}
768
/// Collects nesting layers (offsets, validity, fixed-size lists) for one array, in the
/// order they are added, so they can later be serialized into rep/def levels.
#[derive(Clone, Default, Debug)]
pub struct RepDefBuilder {
    // Layers in the order they were added.
    repdefs: Vec<RawRepDef>,
    // Expected length of the next layer: set by the first layer, then updated by list
    // and fixed-size-list layers to the number of child items they produce.
    len: Option<usize>,
}
785
786impl RepDefBuilder {
787 fn check_validity_len(&mut self, incoming_len: usize) {
788 if let Some(len) = self.len {
789 assert_eq!(incoming_len, len);
790 } else {
791 self.len = Some(incoming_len);
793 }
794 }
795
796 fn num_layers(&self) -> usize {
797 self.repdefs.len()
798 }
799
800 fn is_empty(&self) -> bool {
803 self.repdefs
804 .iter()
805 .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
806 }
807
808 pub fn is_simple_validity(&self) -> bool {
810 self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
811 }
812
813 pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
815 self.check_validity_len(validity.len());
816 self.repdefs.push(RawRepDef::Validity(ValidityDesc {
817 num_values: validity.len(),
818 validity: Some(validity.into_inner()),
819 }));
820 }
821
822 pub fn add_no_null(&mut self, len: usize) {
824 self.check_validity_len(len);
825 self.repdefs.push(RawRepDef::Validity(ValidityDesc {
826 validity: None,
827 num_values: len,
828 }));
829 }
830
831 pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
832 if let Some(len) = self.len {
833 assert_eq!(num_values, len);
834 }
835 self.len = Some(num_values * dimension);
836 debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
837 self.repdefs.push(RawRepDef::Fsl(FslDesc {
838 num_values,
839 validity: validity.map(|v| v.into_inner()),
840 dimension,
841 }))
842 }
843
844 fn check_offset_len(&mut self, offsets: &[i64]) {
845 if let Some(len) = self.len {
846 assert!(offsets.len() == len + 1);
847 }
848 self.len = Some(offsets[offsets.len() - 1] as usize);
849 }
850
851 fn do_add_offsets(
852 &mut self,
853 lengths: impl Iterator<Item = i64>,
854 validity: Option<NullBuffer>,
855 capacity: usize,
856 ) -> bool {
857 let mut num_specials = 0;
858 let mut has_empty_lists = false;
859 let mut has_garbage_values = false;
860 let mut last_off: i64 = 0;
861
862 let mut normalized_offsets = Vec::with_capacity(capacity);
863 normalized_offsets.push(0);
864
865 if let Some(ref validity) = validity {
866 for (len, is_valid) in lengths.zip(validity.iter()) {
867 match (is_valid, len == 0) {
868 (false, is_empty) => {
869 num_specials += 1;
870 has_garbage_values |= !is_empty;
871 }
872 (true, true) => {
873 num_specials += 1;
874 has_empty_lists = true;
875 }
876 _ => {
877 last_off += len;
878 }
879 }
880 normalized_offsets.push(last_off);
881 }
882 } else {
883 for len in lengths {
884 if len == 0 {
885 num_specials += 1;
886 has_empty_lists = true;
887 }
888 last_off += len;
889 normalized_offsets.push(last_off);
890 }
891 }
892
893 self.check_offset_len(&normalized_offsets);
894 self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
895 num_values: normalized_offsets.len() - 1,
896 offsets: normalized_offsets.into(),
897 validity: validity.map(|v| v.into_inner()),
898 has_empty_lists,
899 num_specials: num_specials as usize,
900 }));
901
902 has_garbage_values
903 }
904
905 pub fn add_offsets<O: OffsetSizeTrait>(
912 &mut self,
913 offsets: OffsetBuffer<O>,
914 validity: Option<NullBuffer>,
915 ) -> bool {
916 let inner = offsets.into_inner();
917 let buffer_len = inner.len();
918
919 if O::IS_LARGE {
920 let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, buffer_len);
921 let lengths = i64_buff.windows(2).map(|off| off[1] - off[0]);
922 self.do_add_offsets(lengths, validity, buffer_len)
923 } else {
924 let i32_buff = ScalarBuffer::<i32>::new(inner.into_inner(), 0, buffer_len);
925 let lengths = i32_buff.windows(2).map(|off| (off[1] - off[0]) as i64);
926 self.do_add_offsets(lengths, validity, buffer_len)
927 }
928 }
929
930 fn concat_layers<'a>(
942 layers: impl Iterator<Item = &'a RawRepDef>,
943 num_layers: usize,
944 ) -> RawRepDef {
945 enum LayerKind {
946 Validity,
947 Fsl,
948 Offsets,
949 }
950
951 let mut collected = Vec::with_capacity(num_layers);
954 let mut has_nulls = false;
955 let mut layer_kind = LayerKind::Validity;
956 let mut total_num_specials = 0;
957 let mut all_dimension = 0;
958 let mut all_has_empty_lists = false;
959 let mut all_num_values = 0;
960 for layer in layers {
961 has_nulls |= layer.has_nulls();
962 match layer {
963 RawRepDef::Validity(_) => {
964 layer_kind = LayerKind::Validity;
965 }
966 RawRepDef::Offsets(OffsetDesc {
967 num_specials,
968 has_empty_lists,
969 ..
970 }) => {
971 all_has_empty_lists |= *has_empty_lists;
972 layer_kind = LayerKind::Offsets;
973 total_num_specials += num_specials;
974 }
975 RawRepDef::Fsl(FslDesc { dimension, .. }) => {
976 layer_kind = LayerKind::Fsl;
977 all_dimension = *dimension;
978 }
979 }
980 collected.push(layer);
981 all_num_values += layer.num_values();
982 }
983
984 if !has_nulls {
986 match layer_kind {
987 LayerKind::Validity => {
988 return RawRepDef::Validity(ValidityDesc {
989 validity: None,
990 num_values: all_num_values,
991 });
992 }
993 LayerKind::Fsl => {
994 return RawRepDef::Fsl(FslDesc {
995 validity: None,
996 num_values: all_num_values,
997 dimension: all_dimension,
998 })
999 }
1000 LayerKind::Offsets => {}
1001 }
1002 }
1003
1004 let mut validity_builder = if has_nulls {
1006 BooleanBufferBuilder::new(all_num_values)
1007 } else {
1008 BooleanBufferBuilder::new(0)
1009 };
1010 let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
1011 let mut all_offsets = Vec::with_capacity(all_num_values);
1012 all_offsets.push(0);
1013 all_offsets
1014 } else {
1015 Vec::new()
1016 };
1017
1018 for layer in collected {
1019 match layer {
1020 RawRepDef::Validity(ValidityDesc {
1021 validity: Some(validity),
1022 ..
1023 }) => {
1024 validity_builder.append_buffer(validity);
1025 }
1026 RawRepDef::Validity(ValidityDesc {
1027 validity: None,
1028 num_values,
1029 }) => {
1030 validity_builder.append_n(*num_values, true);
1031 }
1032 RawRepDef::Fsl(FslDesc {
1033 validity,
1034 num_values,
1035 ..
1036 }) => {
1037 if let Some(validity) = validity {
1038 validity_builder.append_buffer(validity);
1039 } else {
1040 validity_builder.append_n(*num_values, true);
1041 }
1042 }
1043 RawRepDef::Offsets(OffsetDesc {
1044 offsets,
1045 validity: Some(validity),
1046 has_empty_lists,
1047 ..
1048 }) => {
1049 all_has_empty_lists |= has_empty_lists;
1050 validity_builder.append_buffer(validity);
1051 let last = *all_offsets.last().unwrap();
1052 all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1053 }
1054 RawRepDef::Offsets(OffsetDesc {
1055 offsets,
1056 validity: None,
1057 has_empty_lists,
1058 num_values,
1059 ..
1060 }) => {
1061 all_has_empty_lists |= has_empty_lists;
1062 if has_nulls {
1063 validity_builder.append_n(*num_values, true);
1064 }
1065 let last = *all_offsets.last().unwrap();
1066 all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1067 }
1068 }
1069 }
1070 let validity = if has_nulls {
1071 Some(validity_builder.finish())
1072 } else {
1073 None
1074 };
1075 match layer_kind {
1076 LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
1077 validity,
1078 num_values: all_num_values,
1079 dimension: all_dimension,
1080 }),
1081 LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
1082 validity,
1083 num_values: all_num_values,
1084 }),
1085 LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
1086 offsets: all_offsets.into(),
1087 validity,
1088 has_empty_lists: all_has_empty_lists,
1089 num_values: all_num_values,
1090 num_specials: total_num_specials,
1091 }),
1092 }
1093 }
1094
1095 pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
1098 assert!(!builders.is_empty());
1099 if builders.iter().all(|b| b.is_empty()) {
1100 return SerializedRepDefs::empty(
1102 builders
1103 .first()
1104 .unwrap()
1105 .repdefs
1106 .iter()
1107 .map(|_| DefinitionInterpretation::AllValidItem)
1108 .collect::<Vec<_>>(),
1109 );
1110 }
1111
1112 let num_layers = builders[0].num_layers();
1113 let combined_layers = (0..num_layers)
1114 .map(|layer_index| {
1115 Self::concat_layers(
1116 builders.iter().map(|b| &b.repdefs[layer_index]),
1117 builders.len(),
1118 )
1119 })
1120 .collect::<Vec<_>>();
1121 debug_assert!(builders
1122 .iter()
1123 .all(|b| b.num_layers() == builders[0].num_layers()));
1124
1125 let total_len = combined_layers.last().unwrap().num_values()
1126 + combined_layers
1127 .iter()
1128 .map(|l| l.num_specials())
1129 .sum::<usize>();
1130 let max_rep = combined_layers.iter().map(|l| l.max_rep()).sum::<u16>();
1131 let max_def = combined_layers.iter().map(|l| l.max_def()).sum::<u16>();
1132
1133 let mut context = SerializerContext::new(total_len, num_layers, max_rep, max_def);
1134 for layer in combined_layers.into_iter() {
1135 match layer {
1136 RawRepDef::Validity(def) => {
1137 context.record_validity(&def);
1138 }
1139 RawRepDef::Offsets(rep) => {
1140 context.record_offsets(&rep);
1141 }
1142 RawRepDef::Fsl(fsl) => {
1143 context.record_fsl(&fsl);
1144 }
1145 }
1146 }
1147 context.build()
1148 }
1149}
1150
/// Reconstructs offsets and validity, one layer at a time, from serialized rep/def levels.
///
/// Layers are consumed in `def_meaning` order, starting at `def_meaning[0]`. Each list
/// layer compacts the level buffers in place so the next layer sees only its own entries.
#[derive(Debug)]
pub struct RepDefUnraveler {
    rep_levels: Option<LevelBuffer>,
    def_levels: Option<LevelBuffer>,
    // levels_to_rep[d]: number of list layers at or before the layer that owns def level d.
    levels_to_rep: Vec<u16>,
    // Meaning of each layer's def levels.
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Highest def level belonging to the layers unraveled so far.
    current_def_cmp: u16,
    // Number of list layers unraveled so far.
    current_rep_cmp: u16,
    // Index into `def_meaning` of the next layer to unravel.
    current_layer: usize,
}
1170
1171impl RepDefUnraveler {
1172 pub fn new(
1174 rep_levels: Option<LevelBuffer>,
1175 def_levels: Option<LevelBuffer>,
1176 def_meaning: Arc<[DefinitionInterpretation]>,
1177 ) -> Self {
1178 let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
1179 let mut rep_counter = 0;
1180 levels_to_rep.push(0);
1182 for meaning in def_meaning.as_ref() {
1183 match meaning {
1184 DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
1185 }
1187 DefinitionInterpretation::NullableItem => {
1188 levels_to_rep.push(rep_counter);
1190 }
1191 DefinitionInterpretation::NullableList => {
1192 rep_counter += 1;
1193 levels_to_rep.push(rep_counter);
1194 }
1195 DefinitionInterpretation::EmptyableList => {
1196 rep_counter += 1;
1197 levels_to_rep.push(rep_counter);
1198 }
1199 DefinitionInterpretation::NullableAndEmptyableList => {
1200 rep_counter += 1;
1201 levels_to_rep.push(rep_counter);
1202 levels_to_rep.push(rep_counter);
1203 }
1204 }
1205 }
1206 Self {
1207 rep_levels,
1208 def_levels,
1209 current_def_cmp: 0,
1210 current_rep_cmp: 0,
1211 levels_to_rep,
1212 current_layer: 0,
1213 def_meaning,
1214 }
1215 }
1216
1217 pub fn is_all_valid(&self) -> bool {
1218 self.def_meaning[self.current_layer].is_all_valid()
1219 }
1220
1221 pub fn max_lists(&self) -> usize {
1227 debug_assert!(
1228 self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
1229 );
1230 self.rep_levels
1231 .as_ref()
1232 .map(|levels| levels.len())
1234 .unwrap_or(0)
1235 }
1236
1237 pub fn unravel_offsets<T: ArrowNativeType>(
1242 &mut self,
1243 offsets: &mut Vec<T>,
1244 validity: Option<&mut BooleanBufferBuilder>,
1245 ) -> Result<()> {
1246 let rep_levels = self
1247 .rep_levels
1248 .as_mut()
1249 .expect("Expected repetition level but data didn't contain repetition");
1250 let valid_level = self.current_def_cmp;
1251 let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
1252 DefinitionInterpretation::NullableList => {
1253 self.current_def_cmp += 1;
1254 (valid_level + 1, 0)
1255 }
1256 DefinitionInterpretation::EmptyableList => {
1257 self.current_def_cmp += 1;
1258 (0, valid_level + 1)
1259 }
1260 DefinitionInterpretation::NullableAndEmptyableList => {
1261 self.current_def_cmp += 2;
1262 (valid_level + 1, valid_level + 2)
1263 }
1264 DefinitionInterpretation::AllValidList => (0, 0),
1265 _ => unreachable!(),
1266 };
1267 self.current_layer += 1;
1268
1269 let mut max_level = null_level.max(empty_level);
1273 let upper_null = max_level;
1276 for level in self.def_meaning[self.current_layer..].iter() {
1277 match level {
1278 DefinitionInterpretation::NullableItem => {
1279 max_level += 1;
1280 }
1281 DefinitionInterpretation::AllValidItem => {}
1282 _ => {
1283 break;
1284 }
1285 }
1286 }
1287
1288 let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);
1289
1290 offsets.pop();
1298
1299 let to_offset = |val: usize| {
1300 T::from_usize(val)
1301 .ok_or_else(|| Error::invalid_input("A single batch had more than i32::MAX values and so a large container type is required", location!()))
1302 };
1303 self.current_rep_cmp += 1;
1304 if let Some(def_levels) = &mut self.def_levels {
1305 assert!(rep_levels.len() == def_levels.len());
1306 let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
1309 Box::new(|is_valid| validity.append(is_valid))
1310 } else {
1311 Box::new(|_| {})
1312 };
1313 let mut read_idx = 0;
1317 let mut write_idx = 0;
1318 while read_idx < rep_levels.len() {
1319 unsafe {
1322 let rep_val = *rep_levels.get_unchecked(read_idx);
1323 if rep_val != 0 {
1324 let def_val = *def_levels.get_unchecked(read_idx);
1325 *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1327 *def_levels.get_unchecked_mut(write_idx) = def_val;
1328 write_idx += 1;
1329
1330 if def_val == 0 {
1331 offsets.push(to_offset(curlen)?);
1333 curlen += 1;
1334 push_validity(true);
1335 } else if def_val > max_level {
1336 } else if def_val == null_level || def_val > upper_null {
1338 offsets.push(to_offset(curlen)?);
1340 push_validity(false);
1341 } else if def_val == empty_level {
1342 offsets.push(to_offset(curlen)?);
1344 push_validity(true);
1345 } else {
1346 offsets.push(to_offset(curlen)?);
1348 curlen += 1;
1349 push_validity(true);
1350 }
1351 } else {
1352 curlen += 1;
1353 }
1354 read_idx += 1;
1355 }
1356 }
1357 offsets.push(to_offset(curlen)?);
1358 rep_levels.truncate(write_idx);
1359 def_levels.truncate(write_idx);
1360 Ok(())
1361 } else {
1362 let mut read_idx = 0;
1364 let mut write_idx = 0;
1365 let old_offsets_len = offsets.len();
1366 while read_idx < rep_levels.len() {
1367 unsafe {
1369 let rep_val = *rep_levels.get_unchecked(read_idx);
1370 if rep_val != 0 {
1371 offsets.push(to_offset(curlen)?);
1373 *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1374 write_idx += 1;
1375 }
1376 curlen += 1;
1377 read_idx += 1;
1378 }
1379 }
1380 let num_new_lists = offsets.len() - old_offsets_len;
1381 offsets.push(to_offset(curlen)?);
1382 rep_levels.truncate(offsets.len() - 1);
1383 if let Some(validity) = validity {
1384 validity.append_n(num_new_lists, true);
1387 }
1388 Ok(())
1389 }
1390 }
1391
1392 pub fn skip_validity(&mut self) {
1393 debug_assert!(
1394 self.def_meaning[self.current_layer] == DefinitionInterpretation::AllValidItem
1395 );
1396 self.current_layer += 1;
1397 }
1398
1399 pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
1401 debug_assert!(
1402 self.def_meaning[self.current_layer] != DefinitionInterpretation::AllValidItem
1403 );
1404 self.current_layer += 1;
1405
1406 let def_levels = &self.def_levels.as_ref().unwrap();
1407
1408 let current_def_cmp = self.current_def_cmp;
1409 self.current_def_cmp += 1;
1410
1411 for is_valid in def_levels.iter().filter_map(|&level| {
1412 if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
1413 Some(level <= current_def_cmp)
1414 } else {
1415 None
1416 }
1417 }) {
1418 validity.append(is_valid);
1419 }
1420 }
1421
1422 pub fn decimate(&mut self, dimension: usize) {
1423 if self.rep_levels.is_some() {
1424 todo!("Not yet supported FSL<...List<...>>");
1436 }
1437 let Some(def_levels) = self.def_levels.as_mut() else {
1438 return;
1439 };
1440 let mut read_idx = 0;
1441 let mut write_idx = 0;
1442 while read_idx < def_levels.len() {
1443 unsafe {
1444 *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
1445 }
1446 write_idx += 1;
1447 read_idx += dimension;
1448 }
1449 def_levels.truncate(write_idx);
1450 }
1451}
1452
/// Unravels several `RepDefUnraveler`s in sequence, concatenating their offsets and
/// validity into single buffers.
#[derive(Debug)]
pub struct CompositeRepDefUnraveler {
    unravelers: Vec<RepDefUnraveler>,
}
1470
1471impl CompositeRepDefUnraveler {
1472 pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
1473 Self { unravelers }
1474 }
1475
1476 pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
1480 let is_all_valid = self
1481 .unravelers
1482 .iter()
1483 .all(|unraveler| unraveler.is_all_valid());
1484
1485 if is_all_valid {
1486 for unraveler in self.unravelers.iter_mut() {
1487 unraveler.skip_validity();
1488 }
1489 None
1490 } else {
1491 let mut validity = BooleanBufferBuilder::new(num_values);
1492 for unraveler in self.unravelers.iter_mut() {
1493 unraveler.unravel_validity(&mut validity);
1494 }
1495 Some(NullBuffer::new(validity.finish()))
1496 }
1497 }
1498
1499 pub fn unravel_fsl_validity(
1500 &mut self,
1501 num_values: usize,
1502 dimension: usize,
1503 ) -> Option<NullBuffer> {
1504 for unraveler in self.unravelers.iter_mut() {
1505 unraveler.decimate(dimension);
1506 }
1507 self.unravel_validity(num_values)
1508 }
1509
1510 pub fn unravel_offsets<T: ArrowNativeType>(
1512 &mut self,
1513 ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
1514 let mut is_all_valid = true;
1515 let mut max_num_lists = 0;
1516 for unraveler in self.unravelers.iter() {
1517 is_all_valid &= unraveler.is_all_valid();
1518 max_num_lists += unraveler.max_lists();
1519 }
1520
1521 let mut validity = if is_all_valid {
1522 None
1523 } else {
1524 Some(BooleanBufferBuilder::new(max_num_lists))
1527 };
1528
1529 let mut offsets = Vec::with_capacity(max_num_lists + 1);
1530
1531 for unraveler in self.unravelers.iter_mut() {
1532 unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
1533 }
1534
1535 Ok((
1536 OffsetBuffer::new(ScalarBuffer::from(offsets)),
1537 validity.map(|mut v| NullBuffer::new(v.finish())),
1538 ))
1539 }
1540}
1541
/// Encodes paired `(repetition, definition)` levels into single control words
/// of width `W` (`u8`, `u16` or `u32`), written little-endian.
///
/// Within a word, the masked repetition level sits in the high bits (shifted
/// left by `def_width`) and the masked definition level in the low bits.
#[derive(Debug)]
pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
    // Source of `(rep, def)` level pairs.
    repdef: I,
    // Bit width of the definition field; the rep level is shifted left by this.
    def_width: usize,
    // A repetition level equal to this marks the start of a new row.
    max_rep: u16,
    // Definition levels `<=` this are reported as visible.
    max_visible_def: u16,
    // Mask applied to each repetition level before packing.
    rep_mask: u16,
    // Mask applied to each definition level before packing.
    def_mask: u16,
    // Bits used by the repetition field (exposed via `ControlWordIterator::bits_rep`).
    bits_rep: u8,
    // Bits used by the definition field (exposed via `ControlWordIterator::bits_def`).
    bits_def: u8,
    // Marker for the word type `W`; nothing of type `W` is stored.
    phantom: std::marker::PhantomData<W>,
}
1559
1560impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
1561 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1562 let next = self.repdef.next()?;
1563 let control_word: u8 =
1564 (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
1565 buf.push(control_word);
1566 let is_new_row = next.0 == self.max_rep;
1567 let is_visible = next.1 <= self.max_visible_def;
1568 let is_valid_item = next.1 == 0;
1569 Some(ControlWordDesc {
1570 is_new_row,
1571 is_visible,
1572 is_valid_item,
1573 })
1574 }
1575}
1576
1577impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
1578 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1579 let next = self.repdef.next()?;
1580 let control_word: u16 =
1581 ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
1582 let control_word = control_word.to_le_bytes();
1583 buf.push(control_word[0]);
1584 buf.push(control_word[1]);
1585 let is_new_row = next.0 == self.max_rep;
1586 let is_visible = next.1 <= self.max_visible_def;
1587 let is_valid_item = next.1 == 0;
1588 Some(ControlWordDesc {
1589 is_new_row,
1590 is_visible,
1591 is_valid_item,
1592 })
1593 }
1594}
1595
1596impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
1597 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1598 let next = self.repdef.next()?;
1599 let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
1600 + ((next.1 & self.def_mask) as u32);
1601 let control_word = control_word.to_le_bytes();
1602 buf.push(control_word[0]);
1603 buf.push(control_word[1]);
1604 buf.push(control_word[2]);
1605 buf.push(control_word[3]);
1606 let is_new_row = next.0 == self.max_rep;
1607 let is_visible = next.1 <= self.max_visible_def;
1608 let is_valid_item = next.1 == 0;
1609 Some(ControlWordDesc {
1610 is_new_row,
1611 is_visible,
1612 is_valid_item,
1613 })
1614 }
1615}
1616
/// Encodes a single stream of levels (either repetition or definition levels,
/// never both) into control words of width `W`, written little-endian.
#[derive(Debug)]
pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
    // Source of levels.
    repdef: I,
    // Mask applied to each level before it is written.
    level_mask: u16,
    // Non-zero when the stream holds repetition levels.
    bits_rep: u8,
    // Non-zero when the stream holds definition levels; when zero every item is valid.
    bits_def: u8,
    // Repetition level marking a new row; 0 means every word is a new row.
    max_rep: u16,
    // Marker for the word type `W`; nothing of type `W` is stored.
    phantom: std::marker::PhantomData<W>,
}
1627
1628impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
1629 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1630 let next = self.repdef.next()?;
1631 buf.push((next & self.level_mask) as u8);
1632 let is_new_row = self.max_rep == 0 || next == self.max_rep;
1633 let is_valid_item = next == 0 || self.bits_def == 0;
1634 Some(ControlWordDesc {
1635 is_new_row,
1636 is_visible: true,
1639 is_valid_item,
1640 })
1641 }
1642}
1643
1644impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
1645 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1646 let next = self.repdef.next().unwrap() & self.level_mask;
1647 let control_word = next.to_le_bytes();
1648 buf.push(control_word[0]);
1649 buf.push(control_word[1]);
1650 let is_new_row = self.max_rep == 0 || next == self.max_rep;
1651 let is_valid_item = next == 0 || self.bits_def == 0;
1652 Some(ControlWordDesc {
1653 is_new_row,
1654 is_visible: true,
1655 is_valid_item,
1656 })
1657 }
1658}
1659
1660impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
1661 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1662 let next = self.repdef.next()?;
1663 let next = (next & self.level_mask) as u32;
1664 let control_word = next.to_le_bytes();
1665 buf.push(control_word[0]);
1666 buf.push(control_word[1]);
1667 buf.push(control_word[2]);
1668 buf.push(control_word[3]);
1669 let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
1670 let is_valid_item = next == 0 || self.bits_def == 0;
1671 Some(ControlWordDesc {
1672 is_new_row,
1673 is_visible: true,
1674 is_valid_item,
1675 })
1676 }
1677}
1678
/// Placeholder encoder used when there are neither repetition nor definition
/// levels; it writes no bytes and only counts words.
#[derive(Debug)]
pub struct NilaryControlWordIterator {
    // Total number of words to yield.
    len: usize,
    // Number of words yielded so far.
    idx: usize,
}
1685
1686impl NilaryControlWordIterator {
1687 fn append_next(&mut self) -> Option<ControlWordDesc> {
1688 if self.idx == self.len {
1689 None
1690 } else {
1691 self.idx += 1;
1692 Some(ControlWordDesc {
1693 is_new_row: true,
1694 is_visible: true,
1695 is_valid_item: true,
1696 })
1697 }
1698 }
1699}
1700
/// Returns a mask with the lowest `width` bits set.
///
/// Widths of 16 or more saturate to `u16::MAX`. The previous `(1 << width) - 1`
/// overflowed the shift (panicking in debug builds) at `width == 16`.
fn get_mask(width: u16) -> u16 {
    if width >= u16::BITS as u16 {
        u16::MAX
    } else {
        (1u16 << width) - 1
    }
}
1705
/// A [`BinaryControlWordIterator`] whose `(rep, def)` pairs come from zipping
/// two borrowed `u16` level slices, producing words of type `T`.
type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
    T,
>;
1712
/// Control-word encoder, specialised by which levels are present and by word
/// width (8, 16 or 32 bits). Constructed by `build_control_word_iterator`.
#[derive(Debug)]
pub enum ControlWordIterator<'a> {
    /// Repetition and definition levels packed into 1-byte words.
    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
    /// Repetition and definition levels packed into 2-byte words.
    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
    /// Repetition and definition levels packed into 4-byte words.
    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
    /// A single level stream (repetition or definition) in 1-byte words.
    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
    /// A single level stream (repetition or definition) in 2-byte words.
    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
    /// A single level stream (repetition or definition) in 4-byte words.
    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
    /// No levels at all; yields a fixed number of descriptors and writes no bytes.
    Nilary(NilaryControlWordIterator),
}
1732
/// Summary of a single control word, produced while encoding or decoding.
#[derive(Debug)]
pub struct ControlWordDesc {
    /// The word starts a new row (rep level equals the max rep, or there is no repetition).
    pub is_new_row: bool,
    /// The definition level is at or below the maximum visible definition level.
    pub is_visible: bool,
    /// The definition level is 0 (or there are no definition levels).
    pub is_valid_item: bool,
}
1740
1741impl ControlWordIterator<'_> {
1742 pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1746 match self {
1747 Self::Binary8(iter) => iter.append_next(buf),
1748 Self::Binary16(iter) => iter.append_next(buf),
1749 Self::Binary32(iter) => iter.append_next(buf),
1750 Self::Unary8(iter) => iter.append_next(buf),
1751 Self::Unary16(iter) => iter.append_next(buf),
1752 Self::Unary32(iter) => iter.append_next(buf),
1753 Self::Nilary(iter) => iter.append_next(),
1754 }
1755 }
1756
1757 pub fn has_repetition(&self) -> bool {
1759 match self {
1760 Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
1761 Self::Unary8(iter) => iter.bits_rep > 0,
1762 Self::Unary16(iter) => iter.bits_rep > 0,
1763 Self::Unary32(iter) => iter.bits_rep > 0,
1764 Self::Nilary(_) => false,
1765 }
1766 }
1767
1768 pub fn bytes_per_word(&self) -> usize {
1770 match self {
1771 Self::Binary8(_) => 1,
1772 Self::Binary16(_) => 2,
1773 Self::Binary32(_) => 4,
1774 Self::Unary8(_) => 1,
1775 Self::Unary16(_) => 2,
1776 Self::Unary32(_) => 4,
1777 Self::Nilary(_) => 0,
1778 }
1779 }
1780
1781 pub fn bits_rep(&self) -> u8 {
1783 match self {
1784 Self::Binary8(iter) => iter.bits_rep,
1785 Self::Binary16(iter) => iter.bits_rep,
1786 Self::Binary32(iter) => iter.bits_rep,
1787 Self::Unary8(iter) => iter.bits_rep,
1788 Self::Unary16(iter) => iter.bits_rep,
1789 Self::Unary32(iter) => iter.bits_rep,
1790 Self::Nilary(_) => 0,
1791 }
1792 }
1793
1794 pub fn bits_def(&self) -> u8 {
1796 match self {
1797 Self::Binary8(iter) => iter.bits_def,
1798 Self::Binary16(iter) => iter.bits_def,
1799 Self::Binary32(iter) => iter.bits_def,
1800 Self::Unary8(iter) => iter.bits_def,
1801 Self::Unary16(iter) => iter.bits_def,
1802 Self::Unary32(iter) => iter.bits_def,
1803 Self::Nilary(_) => 0,
1804 }
1805 }
1806}
1807
1808pub fn build_control_word_iterator<'a>(
1812 rep: Option<&'a [u16]>,
1813 max_rep: u16,
1814 def: Option<&'a [u16]>,
1815 max_def: u16,
1816 max_visible_def: u16,
1817 len: usize,
1818) -> ControlWordIterator<'a> {
1819 let rep_width = if max_rep == 0 {
1820 0
1821 } else {
1822 log_2_ceil(max_rep as u32) as u16
1823 };
1824 let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
1825 let def_width = if max_def == 0 {
1826 0
1827 } else {
1828 log_2_ceil(max_def as u32) as u16
1829 };
1830 let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
1831 let total_width = rep_width + def_width;
1832 match (rep, def) {
1833 (Some(rep), Some(def)) => {
1834 let iter = rep.iter().copied().zip(def.iter().copied());
1835 let def_width = def_width as usize;
1836 if total_width <= 8 {
1837 ControlWordIterator::Binary8(BinaryControlWordIterator {
1838 repdef: iter,
1839 rep_mask,
1840 def_mask,
1841 def_width,
1842 max_rep,
1843 max_visible_def,
1844 bits_rep: rep_width as u8,
1845 bits_def: def_width as u8,
1846 phantom: std::marker::PhantomData,
1847 })
1848 } else if total_width <= 16 {
1849 ControlWordIterator::Binary16(BinaryControlWordIterator {
1850 repdef: iter,
1851 rep_mask,
1852 def_mask,
1853 def_width,
1854 max_rep,
1855 max_visible_def,
1856 bits_rep: rep_width as u8,
1857 bits_def: def_width as u8,
1858 phantom: std::marker::PhantomData,
1859 })
1860 } else {
1861 ControlWordIterator::Binary32(BinaryControlWordIterator {
1862 repdef: iter,
1863 rep_mask,
1864 def_mask,
1865 def_width,
1866 max_rep,
1867 max_visible_def,
1868 bits_rep: rep_width as u8,
1869 bits_def: def_width as u8,
1870 phantom: std::marker::PhantomData,
1871 })
1872 }
1873 }
1874 (Some(lev), None) => {
1875 let iter = lev.iter().copied();
1876 if total_width <= 8 {
1877 ControlWordIterator::Unary8(UnaryControlWordIterator {
1878 repdef: iter,
1879 level_mask: rep_mask,
1880 bits_rep: total_width as u8,
1881 bits_def: 0,
1882 max_rep,
1883 phantom: std::marker::PhantomData,
1884 })
1885 } else if total_width <= 16 {
1886 ControlWordIterator::Unary16(UnaryControlWordIterator {
1887 repdef: iter,
1888 level_mask: rep_mask,
1889 bits_rep: total_width as u8,
1890 bits_def: 0,
1891 max_rep,
1892 phantom: std::marker::PhantomData,
1893 })
1894 } else {
1895 ControlWordIterator::Unary32(UnaryControlWordIterator {
1896 repdef: iter,
1897 level_mask: rep_mask,
1898 bits_rep: total_width as u8,
1899 bits_def: 0,
1900 max_rep,
1901 phantom: std::marker::PhantomData,
1902 })
1903 }
1904 }
1905 (None, Some(lev)) => {
1906 let iter = lev.iter().copied();
1907 if total_width <= 8 {
1908 ControlWordIterator::Unary8(UnaryControlWordIterator {
1909 repdef: iter,
1910 level_mask: def_mask,
1911 bits_rep: 0,
1912 bits_def: total_width as u8,
1913 max_rep: 0,
1914 phantom: std::marker::PhantomData,
1915 })
1916 } else if total_width <= 16 {
1917 ControlWordIterator::Unary16(UnaryControlWordIterator {
1918 repdef: iter,
1919 level_mask: def_mask,
1920 bits_rep: 0,
1921 bits_def: total_width as u8,
1922 max_rep: 0,
1923 phantom: std::marker::PhantomData,
1924 })
1925 } else {
1926 ControlWordIterator::Unary32(UnaryControlWordIterator {
1927 repdef: iter,
1928 level_mask: def_mask,
1929 bits_rep: 0,
1930 bits_def: total_width as u8,
1931 max_rep: 0,
1932 phantom: std::marker::PhantomData,
1933 })
1934 }
1935 }
1936 (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
1937 }
1938}
1939
/// Control-word decoder, selected from the rep/def bit widths by
/// `ControlWordParser::new`.
///
/// The numeric suffix is the word width in bits. `BOTH*` words carry both
/// levels with payload `(bits_to_shift, mask_to_apply)`:
/// - the repetition level is `word >> bits_to_shift`;
/// - the definition level is `word & mask_to_apply`.
///
/// `REP*` / `DEF*` words hold a single level, and `NIL` occupies no bytes.
#[derive(Copy, Clone, Debug)]
pub enum ControlWordParser {
    BOTH8(u8, u32),
    BOTH16(u8, u32),
    BOTH32(u8, u32),
    REP8,
    REP16,
    REP32,
    DEF8,
    DEF16,
    DEF32,
    NIL,
}
1958
1959impl ControlWordParser {
1960 fn parse_both<const WORD_SIZE: u8>(
1961 src: &[u8],
1962 dst_rep: &mut Vec<u16>,
1963 dst_def: &mut Vec<u16>,
1964 bits_to_shift: u8,
1965 mask_to_apply: u32,
1966 ) {
1967 match WORD_SIZE {
1968 1 => {
1969 let word = src[0];
1970 let rep = word >> bits_to_shift;
1971 let def = word & (mask_to_apply as u8);
1972 dst_rep.push(rep as u16);
1973 dst_def.push(def as u16);
1974 }
1975 2 => {
1976 let word = u16::from_le_bytes([src[0], src[1]]);
1977 let rep = word >> bits_to_shift;
1978 let def = word & mask_to_apply as u16;
1979 dst_rep.push(rep);
1980 dst_def.push(def);
1981 }
1982 4 => {
1983 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
1984 let rep = word >> bits_to_shift;
1985 let def = word & mask_to_apply;
1986 dst_rep.push(rep as u16);
1987 dst_def.push(def as u16);
1988 }
1989 _ => unreachable!(),
1990 }
1991 }
1992
1993 fn parse_desc_both<const WORD_SIZE: u8>(
1994 src: &[u8],
1995 bits_to_shift: u8,
1996 mask_to_apply: u32,
1997 max_rep: u16,
1998 max_visible_def: u16,
1999 ) -> ControlWordDesc {
2000 match WORD_SIZE {
2001 1 => {
2002 let word = src[0];
2003 let rep = word >> bits_to_shift;
2004 let def = word & (mask_to_apply as u8);
2005 let is_visible = def as u16 <= max_visible_def;
2006 let is_new_row = rep as u16 == max_rep;
2007 let is_valid_item = def == 0;
2008 ControlWordDesc {
2009 is_visible,
2010 is_new_row,
2011 is_valid_item,
2012 }
2013 }
2014 2 => {
2015 let word = u16::from_le_bytes([src[0], src[1]]);
2016 let rep = word >> bits_to_shift;
2017 let def = word & mask_to_apply as u16;
2018 let is_visible = def <= max_visible_def;
2019 let is_new_row = rep == max_rep;
2020 let is_valid_item = def == 0;
2021 ControlWordDesc {
2022 is_visible,
2023 is_new_row,
2024 is_valid_item,
2025 }
2026 }
2027 4 => {
2028 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2029 let rep = word >> bits_to_shift;
2030 let def = word & mask_to_apply;
2031 let is_visible = def as u16 <= max_visible_def;
2032 let is_new_row = rep as u16 == max_rep;
2033 let is_valid_item = def == 0;
2034 ControlWordDesc {
2035 is_visible,
2036 is_new_row,
2037 is_valid_item,
2038 }
2039 }
2040 _ => unreachable!(),
2041 }
2042 }
2043
2044 fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
2045 match WORD_SIZE {
2046 1 => {
2047 let word = src[0];
2048 dst.push(word as u16);
2049 }
2050 2 => {
2051 let word = u16::from_le_bytes([src[0], src[1]]);
2052 dst.push(word);
2053 }
2054 4 => {
2055 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2056 dst.push(word as u16);
2057 }
2058 _ => unreachable!(),
2059 }
2060 }
2061
2062 fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
2063 match WORD_SIZE {
2064 1 => ControlWordDesc {
2065 is_new_row: src[0] as u16 == max_rep,
2066 is_visible: true,
2067 is_valid_item: true,
2068 },
2069 2 => ControlWordDesc {
2070 is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
2071 is_visible: true,
2072 is_valid_item: true,
2073 },
2074 4 => ControlWordDesc {
2075 is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
2076 is_visible: true,
2077 is_valid_item: true,
2078 },
2079 _ => unreachable!(),
2080 }
2081 }
2082
2083 fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
2084 match WORD_SIZE {
2085 1 => ControlWordDesc {
2086 is_new_row: true,
2087 is_visible: true,
2088 is_valid_item: src[0] == 0,
2089 },
2090 2 => ControlWordDesc {
2091 is_new_row: true,
2092 is_visible: true,
2093 is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
2094 },
2095 4 => ControlWordDesc {
2096 is_new_row: true,
2097 is_visible: true,
2098 is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
2099 },
2100 _ => unreachable!(),
2101 }
2102 }
2103
2104 pub fn bytes_per_word(&self) -> usize {
2106 match self {
2107 Self::BOTH8(..) => 1,
2108 Self::BOTH16(..) => 2,
2109 Self::BOTH32(..) => 4,
2110 Self::REP8 => 1,
2111 Self::REP16 => 2,
2112 Self::REP32 => 4,
2113 Self::DEF8 => 1,
2114 Self::DEF16 => 2,
2115 Self::DEF32 => 4,
2116 Self::NIL => 0,
2117 }
2118 }
2119
2120 pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
2127 match self {
2128 Self::BOTH8(bits_to_shift, mask_to_apply) => {
2129 Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2130 }
2131 Self::BOTH16(bits_to_shift, mask_to_apply) => {
2132 Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2133 }
2134 Self::BOTH32(bits_to_shift, mask_to_apply) => {
2135 Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2136 }
2137 Self::REP8 => Self::parse_one::<1>(src, dst_rep),
2138 Self::REP16 => Self::parse_one::<2>(src, dst_rep),
2139 Self::REP32 => Self::parse_one::<4>(src, dst_rep),
2140 Self::DEF8 => Self::parse_one::<1>(src, dst_def),
2141 Self::DEF16 => Self::parse_one::<2>(src, dst_def),
2142 Self::DEF32 => Self::parse_one::<4>(src, dst_def),
2143 Self::NIL => {}
2144 }
2145 }
2146
2147 pub fn has_rep(&self) -> bool {
2149 match self {
2150 Self::BOTH8(..)
2151 | Self::BOTH16(..)
2152 | Self::BOTH32(..)
2153 | Self::REP8
2154 | Self::REP16
2155 | Self::REP32 => true,
2156 Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
2157 }
2158 }
2159
2160 pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
2162 match self {
2163 Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
2164 src,
2165 *bits_to_shift,
2166 *mask_to_apply,
2167 max_rep,
2168 max_visible_def,
2169 ),
2170 Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
2171 src,
2172 *bits_to_shift,
2173 *mask_to_apply,
2174 max_rep,
2175 max_visible_def,
2176 ),
2177 Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
2178 src,
2179 *bits_to_shift,
2180 *mask_to_apply,
2181 max_rep,
2182 max_visible_def,
2183 ),
2184 Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
2185 Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
2186 Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
2187 Self::DEF8 => Self::parse_def_desc_one::<1>(src),
2188 Self::DEF16 => Self::parse_def_desc_one::<2>(src),
2189 Self::DEF32 => Self::parse_def_desc_one::<4>(src),
2190 Self::NIL => ControlWordDesc {
2191 is_new_row: true,
2192 is_valid_item: true,
2193 is_visible: true,
2194 },
2195 }
2196 }
2197
2198 pub fn new(bits_rep: u8, bits_def: u8) -> Self {
2200 let total_bits = bits_rep + bits_def;
2201
2202 enum WordSize {
2203 One,
2204 Two,
2205 Four,
2206 }
2207
2208 let word_size = if total_bits <= 8 {
2209 WordSize::One
2210 } else if total_bits <= 16 {
2211 WordSize::Two
2212 } else {
2213 WordSize::Four
2214 };
2215
2216 match (bits_rep > 0, bits_def > 0, word_size) {
2217 (false, false, _) => Self::NIL,
2218 (false, true, WordSize::One) => Self::DEF8,
2219 (false, true, WordSize::Two) => Self::DEF16,
2220 (false, true, WordSize::Four) => Self::DEF32,
2221 (true, false, WordSize::One) => Self::REP8,
2222 (true, false, WordSize::Two) => Self::REP16,
2223 (true, false, WordSize::Four) => Self::REP32,
2224 (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
2225 (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
2226 (true, true, WordSize::Four) => {
2227 Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
2228 }
2229 }
2230 }
2231}
2232
2233#[cfg(test)]
2234mod tests {
2235 use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
2236
2237 use crate::repdef::{
2238 CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
2239 };
2240
2241 use super::RepDefBuilder;
2242
2243 fn validity(values: &[bool]) -> NullBuffer {
2244 NullBuffer::from_iter(values.iter().copied())
2245 }
2246
2247 fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
2248 OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
2249 }
2250
2251 fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
2252 OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
2253 }
2254
2255 #[test]
2256 fn test_repdef_basic() {
2257 let mut builder = RepDefBuilder::default();
2259 builder.add_offsets(
2260 offsets_64(&[0, 2, 2, 5]),
2261 Some(validity(&[true, false, true])),
2262 );
2263 builder.add_offsets(
2264 offsets_64(&[0, 1, 3, 5, 5, 9]),
2265 Some(validity(&[true, true, true, false, true])),
2266 );
2267 builder.add_validity_bitmap(validity(&[
2268 true, true, true, false, false, false, true, true, false,
2269 ]));
2270
2271 let repdefs = RepDefBuilder::serialize(vec![builder]);
2272 let rep = repdefs.repetition_levels.unwrap();
2273 let def = repdefs.definition_levels.unwrap();
2274
2275 assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2276 assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2277
2278 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2281 Some(rep.as_ref().to_vec()),
2282 Some(def.as_ref().to_vec()),
2283 repdefs.def_meaning.into(),
2284 )]);
2285
2286 assert_eq!(
2289 unraveler.unravel_validity(9),
2290 Some(validity(&[
2291 true, true, true, false, false, false, true, true, false
2292 ]))
2293 );
2294 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2295 assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 5, 9]).inner());
2296 assert_eq!(val, Some(validity(&[true, true, true, false, true])));
2297 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2298 assert_eq!(off.inner(), offsets_32(&[0, 2, 2, 5]).inner());
2299 assert_eq!(val, Some(validity(&[true, false, true])));
2300 }
2301
2302 #[test]
2303 fn test_repdef_simple_null_empty_list() {
2304 let check = |repdefs: SerializedRepDefs, last_def: DefinitionInterpretation| {
2305 let rep = repdefs.repetition_levels.unwrap();
2306 let def = repdefs.definition_levels.unwrap();
2307
2308 assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2309 assert_eq!([0, 0, 2, 0, 1, 0], *def);
2310 assert_eq!(
2311 vec![DefinitionInterpretation::NullableItem, last_def,],
2312 repdefs.def_meaning
2313 );
2314 };
2315
2316 let mut builder = RepDefBuilder::default();
2320 builder.add_offsets(
2321 offsets_32(&[0, 2, 2, 5]),
2322 Some(validity(&[true, false, true])),
2323 );
2324 builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2325
2326 let repdefs = RepDefBuilder::serialize(vec![builder]);
2327
2328 check(repdefs, DefinitionInterpretation::NullableList);
2329
2330 let mut builder = RepDefBuilder::default();
2332 builder.add_offsets(offsets_32(&[0, 2, 2, 5]), None);
2333 builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2334
2335 let repdefs = RepDefBuilder::serialize(vec![builder]);
2336
2337 check(repdefs, DefinitionInterpretation::EmptyableList);
2338 }
2339
2340 #[test]
2341 fn test_repdef_empty_list_at_end() {
2342 let mut builder = RepDefBuilder::default();
2344 builder.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
2345 builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2346
2347 let repdefs = RepDefBuilder::serialize(vec![builder]);
2348
2349 let rep = repdefs.repetition_levels.unwrap();
2350 let def = repdefs.definition_levels.unwrap();
2351
2352 assert_eq!([1, 0, 1, 0, 0, 1], *rep);
2353 assert_eq!([0, 0, 0, 1, 0, 2], *def);
2354 assert_eq!(
2355 vec![
2356 DefinitionInterpretation::NullableItem,
2357 DefinitionInterpretation::EmptyableList,
2358 ],
2359 repdefs.def_meaning
2360 );
2361 }
2362
2363 #[test]
2364 fn test_repdef_abnormal_nulls() {
2365 let mut builder = RepDefBuilder::default();
2368 builder.add_offsets(
2369 offsets_32(&[0, 2, 5, 8]),
2370 Some(validity(&[true, false, true])),
2371 );
2372 builder.add_no_null(5);
2375
2376 let repdefs = RepDefBuilder::serialize(vec![builder]);
2377
2378 let rep = repdefs.repetition_levels.unwrap();
2379 let def = repdefs.definition_levels.unwrap();
2380
2381 assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2382 assert_eq!([0, 0, 1, 0, 0, 0], *def);
2383
2384 assert_eq!(
2385 vec![
2386 DefinitionInterpretation::AllValidItem,
2387 DefinitionInterpretation::NullableList,
2388 ],
2389 repdefs.def_meaning
2390 );
2391 }
2392
2393 #[test]
2394 fn test_repdef_fsl() {
2395 let mut builder = RepDefBuilder::default();
2396 builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2397 builder.add_fsl(None, 2, 4);
2398 builder.add_validity_bitmap(validity(&[
2399 true, false, true, false, true, false, true, false,
2400 ]));
2401
2402 let repdefs = RepDefBuilder::serialize(vec![builder]);
2403
2404 assert_eq!(
2405 vec![
2406 DefinitionInterpretation::NullableItem,
2407 DefinitionInterpretation::AllValidItem,
2408 DefinitionInterpretation::NullableItem
2409 ],
2410 repdefs.def_meaning
2411 );
2412
2413 assert!(repdefs.repetition_levels.is_none());
2414
2415 let def = repdefs.definition_levels.unwrap();
2416
2417 assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def);
2418
2419 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2420 None,
2421 Some(def.as_ref().to_vec()),
2422 repdefs.def_meaning.into(),
2423 )]);
2424
2425 assert_eq!(
2426 unraveler.unravel_validity(8),
2427 Some(validity(&[
2428 true, false, true, false, false, false, false, false
2429 ]))
2430 );
2431 assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2432 assert_eq!(
2433 unraveler.unravel_fsl_validity(2, 2),
2434 Some(validity(&[true, false]))
2435 );
2436 }
2437
2438 #[test]
2439 fn test_repdef_fsl_allvalid_item() {
2440 let mut builder = RepDefBuilder::default();
2441 builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2442 builder.add_fsl(None, 2, 4);
2443 builder.add_no_null(8);
2444
2445 let repdefs = RepDefBuilder::serialize(vec![builder]);
2446
2447 assert_eq!(
2448 vec![
2449 DefinitionInterpretation::AllValidItem,
2450 DefinitionInterpretation::AllValidItem,
2451 DefinitionInterpretation::NullableItem
2452 ],
2453 repdefs.def_meaning
2454 );
2455
2456 assert!(repdefs.repetition_levels.is_none());
2457
2458 let def = repdefs.definition_levels.unwrap();
2459
2460 assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def);
2461
2462 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2463 None,
2464 Some(def.as_ref().to_vec()),
2465 repdefs.def_meaning.into(),
2466 )]);
2467
2468 assert_eq!(unraveler.unravel_validity(8), None);
2469 assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2470 assert_eq!(
2471 unraveler.unravel_fsl_validity(2, 2),
2472 Some(validity(&[true, false]))
2473 );
2474 }
2475
2476 #[test]
2477 fn test_repdef_sliced_offsets() {
2478 let mut builder = RepDefBuilder::default();
2481 builder.add_offsets(
2482 offsets_32(&[5, 7, 7, 10]),
2483 Some(validity(&[true, false, true])),
2484 );
2485 builder.add_no_null(5);
2486
2487 let repdefs = RepDefBuilder::serialize(vec![builder]);
2488
2489 let rep = repdefs.repetition_levels.unwrap();
2490 let def = repdefs.definition_levels.unwrap();
2491
2492 assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2493 assert_eq!([0, 0, 1, 0, 0, 0], *def);
2494
2495 assert_eq!(
2496 vec![
2497 DefinitionInterpretation::AllValidItem,
2498 DefinitionInterpretation::NullableList,
2499 ],
2500 repdefs.def_meaning
2501 );
2502 }
2503
2504 #[test]
2505 fn test_repdef_complex_null_empty() {
2506 let mut builder = RepDefBuilder::default();
2507 builder.add_offsets(
2508 offsets_32(&[0, 4, 4, 4, 6]),
2509 Some(validity(&[true, false, true, true])),
2510 );
2511 builder.add_offsets(
2512 offsets_32(&[0, 1, 1, 2, 2, 2, 3]),
2513 Some(validity(&[true, false, true, false, true, true])),
2514 );
2515 builder.add_no_null(3);
2516
2517 let repdefs = RepDefBuilder::serialize(vec![builder]);
2518
2519 let rep = repdefs.repetition_levels.unwrap();
2520 let def = repdefs.definition_levels.unwrap();
2521
2522 assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep);
2523 assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def);
2524 }
2525
2526 #[test]
2527 fn test_repdef_empty_list_no_null() {
2528 let mut builder = RepDefBuilder::default();
2531 builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2532 builder.add_no_null(6);
2533
2534 let repdefs = RepDefBuilder::serialize(vec![builder]);
2535
2536 let rep = repdefs.repetition_levels.unwrap();
2537 let def = repdefs.definition_levels.unwrap();
2538
2539 assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2540 assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2541
2542 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2543 Some(rep.as_ref().to_vec()),
2544 Some(def.as_ref().to_vec()),
2545 repdefs.def_meaning.into(),
2546 )]);
2547
2548 assert_eq!(unraveler.unravel_validity(6), None);
2549 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2550 assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2551 assert_eq!(val, None);
2552 }
2553
2554 #[test]
2555 fn test_repdef_all_valid() {
2556 let mut builder = RepDefBuilder::default();
2557 builder.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
2558 builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2559 builder.add_no_null(9);
2560
2561 let repdefs = RepDefBuilder::serialize(vec![builder]);
2562 let rep = repdefs.repetition_levels.unwrap();
2563 assert!(repdefs.definition_levels.is_none());
2564
2565 assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep);
2566
2567 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2568 Some(rep.as_ref().to_vec()),
2569 None,
2570 repdefs.def_meaning.into(),
2571 )]);
2572
2573 assert_eq!(unraveler.unravel_validity(9), None);
2574 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2575 assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 7, 9]).inner());
2576 assert_eq!(val, None);
2577 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2578 assert_eq!(off.inner(), offsets_32(&[0, 2, 3, 5]).inner());
2579 assert_eq!(val, None);
2580 }
2581
2582 #[test]
2583 fn test_only_empty_lists() {
2584 let mut builder = RepDefBuilder::default();
2585 builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2586 builder.add_no_null(6);
2587
2588 let repdefs = RepDefBuilder::serialize(vec![builder]);
2589
2590 let rep = repdefs.repetition_levels.unwrap();
2591 let def = repdefs.definition_levels.unwrap();
2592
2593 assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2594 assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2595
2596 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2597 Some(rep.as_ref().to_vec()),
2598 Some(def.as_ref().to_vec()),
2599 repdefs.def_meaning.into(),
2600 )]);
2601
2602 assert_eq!(unraveler.unravel_validity(6), None);
2603 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2604 assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2605 assert_eq!(val, None);
2606 }
2607
#[test]
fn test_only_null_lists() {
    // Four lists over six items where the two lists without items are null
    // (per the list validity) rather than empty.
    let list_validity = validity(&[true, false, false, true]);

    let mut builder = RepDefBuilder::default();
    builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), Some(list_validity.clone()));
    builder.add_no_null(6);

    let repdefs = RepDefBuilder::serialize(vec![builder]);
    let rep = repdefs.repetition_levels.unwrap();
    let def = repdefs.definition_levels.unwrap();

    // One extra level per null list: 6 items + 2 specials = 8 levels.
    assert_eq!(&rep[..], &[1, 0, 0, 0, 1, 1, 1, 0]);
    assert_eq!(&def[..], &[0, 0, 0, 0, 1, 1, 0, 0]);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        Some(rep.to_vec()),
        Some(def.to_vec()),
        repdefs.def_meaning.into(),
    )]);

    // The items are all valid; the nulls live only at the list layer.
    assert!(unraveler.unravel_validity(6).is_none());
    let (offsets, unraveled_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(offsets.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
    assert_eq!(unraveled_validity, Some(list_validity));
}
2636
#[test]
fn test_null_and_empty_lists() {
    // Four lists over six items: list 1 is null and list 2 is empty but valid,
    // so both kinds of special appear in the same page.
    let expected_offsets = offsets_32(&[0, 4, 4, 4, 6]);
    let expected_validity = validity(&[true, false, true, true]);

    let mut builder = RepDefBuilder::default();
    builder.add_offsets(expected_offsets.clone(), Some(expected_validity.clone()));
    builder.add_no_null(6);

    let repdefs = RepDefBuilder::serialize(vec![builder]);
    let rep = repdefs.repetition_levels.unwrap();
    let def = repdefs.definition_levels.unwrap();

    // The null list gets def level 1 and the empty list def level 2.
    assert_eq!(&rep[..], &[1, 0, 0, 0, 1, 1, 1, 0]);
    assert_eq!(&def[..], &[0, 0, 0, 0, 1, 2, 0, 0]);

    let only_page = RepDefUnraveler::new(
        Some(rep.to_vec()),
        Some(def.to_vec()),
        repdefs.def_meaning.into(),
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![only_page]);

    assert_eq!(unraveler.unravel_validity(6), None);
    let (offsets, list_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(offsets.inner(), expected_offsets.inner());
    assert_eq!(list_validity, Some(expected_validity));
}
2665
#[test]
fn test_repdef_no_rep() {
    // Three layers and no lists: an all-valid layer of 5 entries followed by
    // two validity bitmaps. Without offsets there are no repetition levels.
    let middle_validity = validity(&[false, false, true, true, true]);
    let inner_validity = validity(&[false, true, true, true, false]);

    let mut builder = RepDefBuilder::default();
    builder.add_no_null(5);
    builder.add_validity_bitmap(middle_validity);
    builder.add_validity_bitmap(inner_validity);

    let repdefs = RepDefBuilder::serialize(vec![builder]);
    assert!(repdefs.repetition_levels.is_none());
    let def = repdefs.definition_levels.unwrap();

    // Nulls from the first bitmap get def level 2 and take precedence over
    // nulls from the second bitmap, which get def level 1.
    assert_eq!([2, 2, 0, 0, 1], *def);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        None,
        Some(def.to_vec()),
        repdefs.def_meaning.into(),
    )]);

    // Layers come back last-added first. The inner result also marks entries
    // that were null one layer up (entry 0 and entry 1). The all-valid
    // outermost layer yields no validity at all.
    let expected_layers = [
        Some(validity(&[false, false, true, true, false])),
        Some(validity(&[false, false, true, true, true])),
        None,
    ];
    for expected_layer in expected_layers {
        assert_eq!(unraveler.unravel_validity(5), expected_layer);
    }
}
2695
#[test]
fn test_composite_unravel() {
    // Page 1: three lists over 5 items; the middle list is null.
    let mut first_builder = RepDefBuilder::default();
    first_builder.add_offsets(
        offsets_64(&[0, 2, 2, 5]),
        Some(validity(&[true, false, true])),
    );
    first_builder.add_no_null(5);
    let first = RepDefBuilder::serialize(vec![first_builder]);

    // Page 2: five lists over 9 items with no nulls and no empty lists.
    let mut second_builder = RepDefBuilder::default();
    second_builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
    second_builder.add_no_null(9);
    let second = RepDefBuilder::serialize(vec![second_builder]);

    let first_rep = first.repetition_levels.as_deref().unwrap();
    let first_def = first.definition_levels.as_deref().unwrap();
    let second_rep = second.repetition_levels.as_deref().unwrap();
    // Nothing in page 2 is null or empty, so it has no definition levels.
    assert!(second.definition_levels.is_none());

    assert_eq!([1, 0, 1, 1, 0, 0], *first_rep);
    assert_eq!([0, 0, 1, 0, 0, 0], *first_def);
    assert_eq!([1, 1, 0, 1, 0, 1, 0, 1, 0], *second_rep);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![
        RepDefUnraveler::new(
            first.repetition_levels.map(|levels| levels.to_vec()),
            first.definition_levels.map(|levels| levels.to_vec()),
            first.def_meaning.into(),
        ),
        RepDefUnraveler::new(
            second.repetition_levels.map(|levels| levels.to_vec()),
            second.definition_levels.map(|levels| levels.to_vec()),
            second.def_meaning.into(),
        ),
    ]);

    assert!(unraveler.unravel_validity(9).is_none());

    // The pages are stitched together. Page 2's offsets are shifted by page 1's
    // 5 items, and because page 2 has no list validity its lists come back
    // as valid.
    let (offsets, list_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(
        offsets.inner(),
        offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]).inner()
    );
    assert_eq!(
        list_validity,
        Some(validity(&[true, false, true, true, true, true, true, true]))
    );
}
2744
#[test]
fn test_repdef_multiple_builders() {
    // Two builders with the same layering (two offset layers, then an item
    // validity bitmap) are serialized into a single set of levels.
    let mut first = RepDefBuilder::default();
    first.add_offsets(offsets_64(&[0, 2]), None);
    first.add_offsets(offsets_64(&[0, 1, 3]), None);
    first.add_validity_bitmap(validity(&[true, true, true]));

    // The second builder adds a null outer list, a null inner list and null items.
    let mut second = RepDefBuilder::default();
    second.add_offsets(offsets_64(&[0, 0, 3]), Some(validity(&[false, true])));
    second.add_offsets(
        offsets_64(&[0, 2, 2, 6]),
        Some(validity(&[true, false, true])),
    );
    second.add_validity_bitmap(validity(&[false, false, false, true, true, false]));

    let serialized = RepDefBuilder::serialize(vec![first, second]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    // The first 3 levels come from the first builder and the remaining 8 from the second.
    assert_eq!([2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep_levels);
    assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def_levels);
}
2769
#[test]
fn test_slicer() {
    // Four lists over 30 items: [0, 2), a null list, [2, 30), and an empty
    // (but valid) list at the end. The two specials add two extra levels, so
    // the rep and def buffers each hold 32 u16 levels (64 bytes).
    let mut builder = RepDefBuilder::default();
    builder.add_offsets(
        offsets_64(&[0, 2, 2, 30, 30]),
        Some(validity(&[true, false, true, true])),
    );
    builder.add_no_null(30);

    let repdefs = RepDefBuilder::serialize(vec![builder]);

    let mut rep_slicer = repdefs.rep_slicer().unwrap();

    // First 5 items include the null list, so we get 6 levels (12 bytes)
    assert_eq!(rep_slicer.slice_next(5).len(), 12);
    // Next 20 items are plain values: 20 levels (40 bytes)
    assert_eq!(rep_slicer.slice_next(20).len(), 40);
    // Last 5 items plus the trailing empty list: 6 levels (12 bytes)
    assert_eq!(rep_slicer.slice_rest().len(), 12);

    // The definition levels must be cut at the same boundaries. This must use
    // `def_slicer`: calling `rep_slicer` again would only re-test the
    // repetition buffer.
    let mut def_slicer = repdefs.def_slicer().unwrap();

    // First 5 items include the null list, so we get 6 levels (12 bytes)
    assert_eq!(def_slicer.slice_next(5).len(), 12);
    // Next 20 items are plain values: 20 levels (40 bytes)
    assert_eq!(def_slicer.slice_next(20).len(), 40);
    // Last 5 items plus the trailing empty list: 6 levels (12 bytes)
    assert_eq!(def_slicer.slice_rest().len(), 12);
}
2799
#[test]
fn test_control_words() {
    /// Packs `rep`/`def` into control words and compares the bytes with
    /// `expected_values`. It also checks the reported word geometry, then
    /// parses the bytes back and requires an exact round trip.
    ///
    /// An empty `rep` or `def` slice is passed to the encoder as "absent".
    fn check(
        rep: &[u16],
        def: &[u16],
        expected_values: Vec<u8>,
        expected_bytes_per_word: usize,
        expected_bits_rep: u8,
        expected_bits_def: u8,
    ) {
        let num_vals = rep.len().max(def.len());
        let max_rep = rep.iter().copied().max().unwrap_or(0);
        let max_def = def.iter().copied().max().unwrap_or(0);

        let mut words = super::build_control_word_iterator(
            (!rep.is_empty()).then_some(rep),
            max_rep,
            (!def.is_empty()).then_some(def),
            max_def,
            max_def + 1,
            expected_values.len(),
        );
        assert_eq!(words.bytes_per_word(), expected_bytes_per_word);
        assert_eq!(words.bits_rep(), expected_bits_rep);
        assert_eq!(words.bits_def(), expected_bits_def);

        let mut encoded = Vec::with_capacity(num_vals * words.bytes_per_word());
        for _level in 0..num_vals {
            words.append_next(&mut encoded);
        }
        // Every level has been written, so the iterator must now be exhausted.
        assert!(words.append_next(&mut encoded).is_none());
        assert_eq!(expected_values, encoded);

        let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);
        let mut decoded_rep = Vec::with_capacity(num_vals);
        let mut decoded_def = Vec::with_capacity(num_vals);

        // `chunks_exact` panics on a chunk size of 0, which is the word size
        // when there are neither rep nor def levels.
        if expected_bytes_per_word != 0 {
            for word in encoded.chunks_exact(expected_bytes_per_word) {
                parser.parse(word, &mut decoded_rep, &mut decoded_def);
            }
        }

        assert_eq!(rep, decoded_rep.as_slice());
        assert_eq!(def, decoded_def.as_slice());
    }

    let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];

    // 4 rep bits + 4 def bits fit in one byte: rep in the high nibble, def in the low.
    let def = &[5_u16, 3, 1, 2, 12, 15, 0, 2];
    check(
        rep,
        def,
        vec![
            0b0000_0101, // rep 0, def 5
            0b0111_0011, // rep 7, def 3
            0b0011_0001, // rep 3, def 1
            0b0010_0010, // rep 2, def 2
            0b1001_1100, // rep 9, def 12
            0b1000_1111, // rep 8, def 15
            0b1100_0000, // rep 12, def 0
            0b0101_0010, // rep 5, def 2
        ],
        1,
        4,
        4,
    );

    // A max def of 22 needs 5 bits, so 4 + 5 = 9 bits spill into a second byte.
    // Each word is (rep << 5) | def stored low byte first.
    let def = &[5_u16, 3, 1, 2, 12, 22, 0, 2];
    check(
        rep,
        def,
        vec![
            0b0000_0101, 0b0000_0000, // rep 0, def 5
            0b1110_0011, 0b0000_0000, // rep 7, def 3
            0b0110_0001, 0b0000_0000, // rep 3, def 1
            0b0100_0010, 0b0000_0000, // rep 2, def 2
            0b0010_1100, 0b0000_0001, // rep 9, def 12
            0b0001_0110, 0b0000_0001, // rep 8, def 22
            0b1000_0000, 0b0000_0001, // rep 12, def 0
            0b1010_0010, 0b0000_0000, // rep 5, def 2
        ],
        2,
        4,
        5,
    );

    // A single kind of level is written as-is, one byte per level,
    // whether it is passed as rep or as def.
    let levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
    let single_kind = vec![
        0b0000_0000,
        0b0000_0111,
        0b0000_0011,
        0b0000_0010,
        0b0000_1001,
        0b0000_1000,
        0b0000_1100,
        0b0000_0101,
    ];
    check(levels, &[], single_kind.clone(), 1, 4, 0);
    check(&[], levels, single_kind, 1, 0, 4);

    // With no levels at all the words are zero-sized and nothing is written.
    check(&[], &[], Vec::default(), 0, 0, 0);
}
2903
#[test]
fn test_control_words_rep_index() {
    /// Walks the control-word iterator (built with a max visible level of 2).
    /// For each emitted word it checks whether the word starts a new row and
    /// whether it is visible, then checks that nothing is left over.
    fn check(
        rep: &[u16],
        def: &[u16],
        expected_new_rows: Vec<bool>,
        expected_is_visible: Vec<bool>,
    ) {
        let num_vals = rep.len().max(def.len());
        let max_rep = rep.iter().copied().max().unwrap_or(0);
        let max_def = def.iter().copied().max().unwrap_or(0);

        let mut words = super::build_control_word_iterator(
            (!rep.is_empty()).then_some(rep),
            max_rep,
            (!def.is_empty()).then_some(def),
            max_def,
            2,
            expected_new_rows.len(),
        );

        let mut sink = Vec::with_capacity(num_vals * words.bytes_per_word());
        for (new_row, visible) in expected_new_rows.into_iter().zip(expected_is_visible) {
            let desc = words.append_next(&mut sink).unwrap();
            assert_eq!(desc.is_new_row, new_row);
            assert_eq!(desc.is_visible, visible);
        }
        assert!(words.append_next(&mut sink).is_none());
    }

    let rep = &[2_u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
    let def = &[0_u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];

    // When rep levels are present, the expected row starts are exactly the
    // positions where rep == 2 (the max rep).
    let rep_row_starts = vec![
        true, false, false, true, true, false, false, false, false, true, false,
    ];

    // Rep and def: only the word with def 3 (above the visible level 2) is hidden.
    check(
        rep,
        def,
        rep_row_starts.clone(),
        vec![
            true, true, true, false, true, true, true, true, true, true, true,
        ],
    );
    // Rep only: every word is visible.
    check(rep, &[], rep_row_starts, vec![true; 11]);
    // Def only: every word is expected to start a row and be visible.
    check(&[], def, vec![true; 11], vec![true; 11]);
    // Neither: 11 words, all new rows and all visible.
    check(&[], &[], vec![true; 11], vec![true; 11]);
}
2983
#[test]
fn regress_empty_list_case() {
    // An outer validity layer over three entries (the middle one null). Under it
    // are three lists that are all null and together hold zero items.
    let outer_validity = validity(&[true, false, true]);
    let list_validity = validity(&[false, false, false]);

    let mut builder = RepDefBuilder::default();
    builder.add_validity_bitmap(outer_validity.clone());
    builder.add_offsets(offsets_32(&[0, 0, 0, 0]), Some(list_validity.clone()));
    builder.add_no_null(0);

    let repdefs = RepDefBuilder::serialize(vec![builder]);
    let rep = repdefs.repetition_levels.unwrap();
    let def = repdefs.definition_levels.unwrap();

    // One special level per outer entry. The null outer entry gets def 2 and
    // the null lists under valid entries get def 1.
    assert_eq!(&rep[..], &[1, 1, 1]);
    assert_eq!(&def[..], &[1, 2, 1]);

    let single_page = RepDefUnraveler::new(
        Some(rep.to_vec()),
        Some(def.to_vec()),
        repdefs.def_meaning.into(),
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![single_page]);

    // Innermost first: zero items, then the three null lists, then the outer layer.
    assert_eq!(unraveler.unravel_validity(0), None);
    let (offsets, unraveled_lists) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(offsets.inner(), offsets_32(&[0, 0, 0, 0]).inner());
    assert_eq!(unraveled_lists, Some(list_validity));
    let unraveled_outer = unraveler.unravel_validity(3).unwrap();
    assert_eq!(unraveled_outer.inner(), outer_validity.inner());
}
3015
#[test]
fn regress_list_ends_null_case() {
    // Outer lists: [0, 1), [1, 2) and a trailing null list.
    // Inner lists: [0, 1) and a trailing null list, with a single item underneath.
    let outer_validity = validity(&[true, true, false]);
    let inner_validity = validity(&[true, false]);

    let mut builder = RepDefBuilder::default();
    builder.add_offsets(offsets_64(&[0, 1, 2, 2]), Some(outer_validity.clone()));
    builder.add_offsets(offsets_64(&[0, 1, 1]), Some(inner_validity.clone()));
    builder.add_no_null(1);

    let repdefs = RepDefBuilder::serialize(vec![builder]);
    let (rep, def) = (
        repdefs.repetition_levels.unwrap(),
        repdefs.definition_levels.unwrap(),
    );

    // Every level starts a new outer list. The levels are the real item (def 0),
    // the null inner list (def 1) and the null outer list (def 2).
    assert_eq!(&rep[..], &[2, 2, 2]);
    assert_eq!(&def[..], &[0, 1, 2]);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        Some(rep.to_vec()),
        Some(def.to_vec()),
        repdefs.def_meaning.into(),
    )]);

    assert_eq!(unraveler.unravel_validity(1), None);

    // Offsets come back innermost first, converted to i32.
    let (inner_offsets, unraveled_inner) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(inner_offsets.inner(), offsets_32(&[0, 1, 1]).inner());
    assert_eq!(unraveled_inner, Some(inner_validity));

    let (outer_offsets, unraveled_outer) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(outer_offsets.inner(), offsets_32(&[0, 1, 2, 2]).inner());
    assert_eq!(unraveled_outer, Some(outer_validity));
}
3047}