1use std::{
110 iter::{Copied, Zip},
111 sync::Arc,
112};
113
114use arrow_array::OffsetSizeTrait;
115use arrow_buffer::{
116 ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
117};
118use lance_core::{utils::bit::log_2_ceil, Error, Result};
119use snafu::location;
120
121use crate::buffer::LanceBuffer;
122
/// Owned buffer of repetition or definition levels, one `u16` per entry.
pub type LevelBuffer = Vec<u16>;

/// Marker offset for "special" definition levels while serializing.
///
/// Within this file the serializer adds this value to the definition level of
/// null/empty list entries so later passes can recognise them with
/// `def > SPECIAL_THRESHOLD` and copy them through untouched; the offset is
/// subtracted again by `SerializerContext::normalize_specials`.
const SPECIAL_THRESHOLD: u16 = u16::MAX / 2;
136
/// One list (offsets) layer as collected by `RepDefBuilder`.
#[derive(Clone, Debug)]
struct OffsetDesc {
    /// Normalized offsets: the first entry is 0 and there is one more entry than lists.
    offsets: Arc<[i64]>,
    /// Per-list validity bitmap; `None` when no validity was supplied for the layer.
    validity: Option<BooleanBuffer>,
    /// True when at least one valid list in the layer has length zero.
    has_empty_lists: bool,
    /// Number of lists in the layer (`offsets.len() - 1`).
    num_values: usize,
    /// Number of lists in the layer that are null or empty.
    num_specials: usize,
}
147
/// One plain validity layer (no offsets, no fixed-size-list dimension).
#[derive(Clone, Debug)]
struct ValidityDesc {
    /// Validity bitmap; `None` means the layer was added as "no nulls".
    validity: Option<BooleanBuffer>,
    /// Number of values the layer describes.
    num_values: usize,
}
155
/// One fixed-size-list layer.
#[derive(Clone, Debug)]
struct FslDesc {
    /// Validity bitmap of the fixed-size lists; `None` when none was supplied.
    validity: Option<BooleanBuffer>,
    /// Number of items in each fixed-size list; used as the level multiplier
    /// when the layer is serialized.
    dimension: usize,
    /// Number of fixed-size lists described by the layer.
    num_values: usize,
}
165
/// A single, not-yet-serialized layer of repetition/definition information.
#[derive(Clone, Debug)]
enum RawRepDef {
    /// A variable-length list layer.
    Offsets(OffsetDesc),
    /// A plain validity layer.
    Validity(ValidityDesc),
    /// A fixed-size-list layer.
    Fsl(FslDesc),
}
175
176impl RawRepDef {
177 fn has_nulls(&self) -> bool {
179 match self {
180 Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
181 Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
182 Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
183 }
184 }
185
186 fn num_values(&self) -> usize {
188 match self {
189 Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
190 Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
191 Self::Fsl(FslDesc { num_values, .. }) => *num_values,
192 }
193 }
194
195 fn num_specials(&self) -> usize {
197 match self {
198 Self::Offsets(OffsetDesc { num_specials, .. }) => *num_specials,
199 _ => 0,
200 }
201 }
202
203 fn max_def(&self) -> u16 {
205 match self {
206 Self::Offsets(OffsetDesc {
207 has_empty_lists,
208 validity,
209 ..
210 }) => {
211 let mut max_def = 0;
212 if *has_empty_lists {
213 max_def += 1;
214 }
215 if validity.is_some() {
216 max_def += 1;
217 }
218 max_def
219 }
220 Self::Validity(ValidityDesc { validity: None, .. }) => 0,
221 Self::Validity(ValidityDesc { .. }) => 1,
222 Self::Fsl(FslDesc { validity: None, .. }) => 0,
223 Self::Fsl(FslDesc { .. }) => 1,
224 }
225 }
226
227 fn max_rep(&self) -> u16 {
229 match self {
230 Self::Offsets(_) => 1,
231 _ => 0,
232 }
233 }
234}
235
/// Serialized repetition and definition levels together with their interpretation.
#[derive(Debug)]
pub struct SerializedRepDefs {
    /// Repetition levels, or `None` when there are none.
    pub repetition_levels: Option<Arc<[u16]>>,
    /// Definition levels, or `None` when there are none.
    pub definition_levels: Option<Arc<[u16]>>,
    /// Meaning of the definition levels, one entry per layer.
    pub def_meaning: Vec<DefinitionInterpretation>,
    /// Sum of the definition levels contributed by the `def_meaning` entries that
    /// precede the first list entry; `None` when `def_meaning` contains no list.
    /// `RepDefSlicer::slice_next` counts a level as a value only when its
    /// definition level is `<=` this number.
    pub max_visible_level: Option<u16>,
}
258
259impl SerializedRepDefs {
260 pub fn new(
261 repetition_levels: Option<LevelBuffer>,
262 definition_levels: Option<LevelBuffer>,
263 def_meaning: Vec<DefinitionInterpretation>,
264 ) -> Self {
265 let first_list = def_meaning.iter().position(|level| level.is_list());
266 let max_visible_level = first_list.map(|first_list| {
267 def_meaning
268 .iter()
269 .map(|level| level.num_def_levels())
270 .take(first_list)
271 .sum::<u16>()
272 });
273 Self {
274 repetition_levels: repetition_levels.map(Arc::from),
275 definition_levels: definition_levels.map(Arc::from),
276 def_meaning,
277 max_visible_level,
278 }
279 }
280
281 pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
283 Self {
284 repetition_levels: None,
285 definition_levels: None,
286 def_meaning,
287 max_visible_level: None,
288 }
289 }
290
291 pub fn rep_slicer(&self) -> Option<RepDefSlicer<'_>> {
292 self.repetition_levels
293 .as_ref()
294 .map(|rep| RepDefSlicer::new(self, rep.clone()))
295 }
296
297 pub fn def_slicer(&self) -> Option<RepDefSlicer<'_>> {
298 self.definition_levels
299 .as_ref()
300 .map(|def| RepDefSlicer::new(self, def.clone()))
301 }
302}
303
/// Cursor that hands out consecutive slices of one level buffer (repetition or
/// definition) belonging to a `SerializedRepDefs`.
#[derive(Debug)]
pub struct RepDefSlicer<'a> {
    /// The serialized levels this slicer was created from; consulted for
    /// `max_visible_level` and the definition levels when slicing by value count.
    repdef: &'a SerializedRepDefs,
    /// The levels being sliced, held as a byte buffer (two bytes per `u16` level).
    to_slice: LanceBuffer,
    /// Index, in levels, of the next level that has not been handed out yet.
    current: usize,
}
317
318impl<'a> RepDefSlicer<'a> {
320 fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
321 Self {
322 repdef,
323 to_slice: LanceBuffer::reinterpret_slice(levels),
324 current: 0,
325 }
326 }
327
328 pub fn num_levels(&self) -> usize {
329 self.to_slice.len() / 2
330 }
331
332 pub fn num_levels_remaining(&self) -> usize {
333 self.num_levels() - self.current
334 }
335
336 pub fn all_levels(&self) -> &LanceBuffer {
337 &self.to_slice
338 }
339
340 pub fn slice_rest(&mut self) -> LanceBuffer {
349 let start = self.current;
350 let remaining = self.num_levels_remaining();
351 self.current = self.num_levels();
352 self.to_slice.slice_with_length(start * 2, remaining * 2)
353 }
354
355 pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
357 let start = self.current;
358 let Some(max_visible_level) = self.repdef.max_visible_level else {
359 self.current = start + num_values;
361 return self.to_slice.slice_with_length(start * 2, num_values * 2);
362 };
363 if let Some(def) = self.repdef.definition_levels.as_ref() {
364 let mut def_itr = def[start..].iter();
368 let mut num_taken = 0;
369 let mut num_passed = 0;
370 while num_taken < num_values {
371 let def_level = *def_itr.next().unwrap();
372 if def_level <= max_visible_level {
373 num_taken += 1;
374 }
375 num_passed += 1;
376 }
377 self.current = start + num_passed;
378 self.to_slice.slice_with_length(start * 2, num_passed * 2)
379 } else {
380 self.current = start + num_values;
382 self.to_slice.slice_with_length(start * 2, num_values * 2)
383 }
384 }
385}
386
/// How one layer's definition levels are to be interpreted.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DefinitionInterpretation {
    /// A non-list layer without nulls; uses no definition level.
    AllValidItem,
    /// A list layer without nulls or empty lists; uses no definition level.
    AllValidList,
    /// A non-list layer that may contain nulls; uses one definition level.
    NullableItem,
    /// A list layer that may contain null lists; uses one definition level.
    NullableList,
    /// A list layer that may contain empty lists; uses one definition level.
    EmptyableList,
    /// A list layer that may contain both null and empty lists; uses two
    /// definition levels.
    NullableAndEmptyableList,
}

impl DefinitionInterpretation {
    /// Number of definition levels this interpretation consumes (0, 1 or 2).
    pub fn num_def_levels(&self) -> u16 {
        match self {
            Self::AllValidItem | Self::AllValidList => 0,
            Self::NullableItem | Self::NullableList | Self::EmptyableList => 1,
            Self::NullableAndEmptyableList => 2,
        }
    }

    /// True when the layer cannot contain nulls. Empty lists do not count as
    /// nulls, so `EmptyableList` is reported as all-valid.
    pub fn is_all_valid(&self) -> bool {
        match self {
            Self::AllValidItem | Self::AllValidList | Self::EmptyableList => true,
            Self::NullableItem | Self::NullableList | Self::NullableAndEmptyableList => false,
        }
    }

    /// True for every list interpretation, false for the two item interpretations.
    pub fn is_list(&self) -> bool {
        !matches!(self, Self::AllValidItem | Self::NullableItem)
    }
}
441
/// Working state used while turning a stack of layers into rep/def levels.
///
/// Each pass reads from the active buffers (`rep_levels` / `def_levels`),
/// writes into the spare buffers and then swaps them.
#[derive(Debug)]
struct SerializerContext {
    /// Interpretation of each layer, in the order layers were recorded
    /// (reversed by `build` unless it returns early for an empty context).
    def_meaning: Vec<DefinitionInterpretation>,
    /// Active repetition levels (empty when no repetition levels are needed).
    rep_levels: LevelBuffer,
    /// Scratch buffer that receives the next version of `rep_levels`.
    spare_rep: LevelBuffer,
    /// Active definition levels (empty when no definition levels are needed).
    def_levels: LevelBuffer,
    /// Scratch buffer that receives the next version of `def_levels`.
    spare_def: LevelBuffer,
    /// Repetition level the next list layer will use; starts at the maximum
    /// and is decremented per list layer.
    current_rep: u16,
    /// Definition level the next layer will check out; starts at the maximum
    /// and decreases as layers check out levels.
    current_def: u16,
    /// Number of level entries currently in use, as tracked by the record passes.
    current_len: usize,
    /// Number of special (null/empty list) entries recorded by earlier list layers.
    current_num_specials: usize,
}
466
467impl SerializerContext {
468 fn new(len: usize, num_layers: usize, max_rep: u16, max_def: u16) -> Self {
469 let def_meaning = Vec::with_capacity(num_layers);
470 Self {
471 rep_levels: if max_rep > 0 {
472 vec![0; len]
473 } else {
474 LevelBuffer::default()
475 },
476 spare_rep: if max_rep > 0 {
477 vec![0; len]
478 } else {
479 LevelBuffer::default()
480 },
481 def_levels: if max_def > 0 {
482 vec![0; len]
483 } else {
484 LevelBuffer::default()
485 },
486 spare_def: if max_def > 0 {
487 vec![0; len]
488 } else {
489 LevelBuffer::default()
490 },
491 def_meaning,
492 current_rep: max_rep,
493 current_def: max_def,
494 current_len: 0,
495 current_num_specials: 0,
496 }
497 }
498
499 fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
500 let def = self.current_def;
501 self.current_def -= meaning.num_def_levels();
502 self.def_meaning.push(meaning);
503 def
504 }
505
    /// Records one list layer.
    ///
    /// Checks out the layer's repetition level and definition level(s), applies
    /// the layer's validity (if any), and then expands the existing per-list
    /// levels into per-item levels using `offset_desc.offsets`. Levels are read
    /// from the active buffers and written into the spare buffers, which are
    /// swapped at the end so nothing that still has to be read is overwritten.
    fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
        // Repetition level that marks the start of a list of this layer.
        let rep_level = self.current_rep;
        // Definition levels for "null list" and "empty list"; 0 when not applicable.
        let (null_list_level, empty_list_level) =
            match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
                (true, true) => {
                    let level =
                        self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
                    (level - 1, level)
                }
                (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
                (false, true) => (
                    0,
                    self.checkout_def(DefinitionInterpretation::EmptyableList),
                ),
                (false, false) => {
                    self.checkout_def(DefinitionInterpretation::AllValidList);
                    (0, 0)
                }
            };
        self.current_rep -= 1;

        // Mark null lists in the definition levels before expanding.
        if let Some(validity) = &offset_desc.validity {
            self.do_record_validity(validity, null_list_level);
        }

        let mut new_len = 0;
        assert!(self.rep_levels.len() >= (offset_desc.num_values + self.current_num_specials) - 1);
        if self.def_levels.is_empty() {
            // No definition levels at all: only repetition levels are expanded.
            let mut write_itr = self.spare_rep.iter_mut();
            let mut read_iter = self.rep_levels.iter().copied();
            for w in offset_desc.offsets.windows(2) {
                let len = w[1] - w[0];
                // An empty list would have required definition levels.
                assert!(len > 0);
                let rep = read_iter.next().unwrap();
                // Keep a rep level set by an earlier list layer; otherwise use ours.
                let list_level = if rep == 0 { rep_level } else { rep };
                *write_itr.next().unwrap() = list_level;

                // Remaining items of the list continue the current list.
                for _ in 1..len {
                    *write_itr.next().unwrap() = 0;
                }
                new_len += len as usize;
            }
            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
        } else {
            assert!(
                self.def_levels.len() >= (offset_desc.num_values + self.current_num_specials) - 1
            );
            let mut def_write_itr = self.spare_def.iter_mut();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut def_read_itr = self.def_levels.iter().copied();
            let specials_to_pass = self.current_num_specials;
            let mut specials_passed = 0;

            for w in offset_desc.offsets.windows(2) {
                let mut def = def_read_itr.next().unwrap();
                // Copy through special entries produced by earlier list layers.
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                    def = def_read_itr.next().unwrap();
                    new_len += 1;
                    specials_passed += 1;
                }

                let len = w[1] - w[0];
                let rep = rep_read_itr.next().unwrap();

                // Keep a rep level set by an earlier list layer; otherwise use ours.
                let list_level = if rep == 0 { rep_level } else { rep };

                if def == 0 && len > 0 {
                    // Valid, non-empty list: one entry per item.
                    *def_write_itr.next().unwrap() = 0;
                    *rep_write_itr.next().unwrap() = list_level;

                    for _ in 1..len {
                        *def_write_itr.next().unwrap() = 0;
                        *rep_write_itr.next().unwrap() = 0;
                    }

                    new_len += len as usize;
                } else if def == 0 {
                    // Valid but empty list: a single special entry.
                    *def_write_itr.next().unwrap() = empty_list_level + SPECIAL_THRESHOLD;
                    *rep_write_itr.next().unwrap() = list_level;
                    new_len += 1;
                } else {
                    // Non-zero definition level (e.g. a null list): a single
                    // special entry carrying that level.
                    *def_write_itr.next().unwrap() = def + SPECIAL_THRESHOLD;
                    *rep_write_itr.next().unwrap() = list_level;
                    new_len += 1;
                }
            }

            // Copy any special entries that trail the last list.
            while specials_passed < specials_to_pass {
                *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
                *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                new_len += 1;
                specials_passed += 1;
            }
            std::mem::swap(&mut self.def_levels, &mut self.spare_def);
            std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
        }

        self.current_len = new_len;
        self.current_num_specials += offset_desc.num_specials;
    }
623
624 fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
625 assert!(self.def_levels.len() >= validity.len() + self.current_num_specials);
626 debug_assert!(
627 self.current_len == 0 || self.current_len == validity.len() + self.current_num_specials
628 );
629 self.current_len = validity.len();
630
631 let mut def_read_itr = self.def_levels.iter().copied();
632 let mut def_write_itr = self.spare_def.iter_mut();
633
634 let specials_to_pass = self.current_num_specials;
635 let mut specials_passed = 0;
636
637 for incoming_validity in validity.iter() {
638 let mut def = def_read_itr.next().unwrap();
639 while def > SPECIAL_THRESHOLD {
640 *def_write_itr.next().unwrap() = def;
641 def = def_read_itr.next().unwrap();
642 specials_passed += 1;
643 }
644 if def == 0 && !incoming_validity {
645 *def_write_itr.next().unwrap() = null_level;
646 } else {
647 *def_write_itr.next().unwrap() = def;
648 }
649 }
650
651 while specials_passed < specials_to_pass {
652 *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
653 specials_passed += 1;
654 }
655
656 std::mem::swap(&mut self.def_levels, &mut self.spare_def);
657 }
658
    /// Repeats every non-special level `multiplier` times (used for
    /// fixed-size-list layers); special entries are copied once, unchanged.
    ///
    /// Reads from the active buffers, writes to the spare buffers and swaps them.
    /// Does nothing besides updating `current_len` when there are no levels.
    fn multiply_levels(&mut self, multiplier: usize) {
        let old_len = self.current_len;
        // Regular entries are multiplied; special entries are kept as-is.
        self.current_len =
            (self.current_len - self.current_num_specials) * multiplier + self.current_num_specials;

        if self.rep_levels.is_empty() && self.def_levels.is_empty() {
            // No levels at all, nothing to expand.
            return;
        } else if self.rep_levels.is_empty() {
            // Only definition levels.
            assert!(self.def_levels.len() >= self.current_len);
            let mut def_read_itr = self.def_levels.iter().copied();
            let mut def_write_itr = self.spare_def.iter_mut();
            for _ in 0..old_len {
                let mut def = def_read_itr.next().unwrap();
                // Pass special entries through untouched.
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    def = def_read_itr.next().unwrap();
                }
                for _ in 0..multiplier {
                    *def_write_itr.next().unwrap() = def;
                }
            }
        } else if self.def_levels.is_empty() {
            // Only repetition levels (so there can be no special entries to skip).
            assert!(self.rep_levels.len() >= self.current_len);
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            for _ in 0..old_len {
                let rep = rep_read_itr.next().unwrap();
                for _ in 0..multiplier {
                    *rep_write_itr.next().unwrap() = rep;
                }
            }
        } else {
            // Both kinds of levels; they are advanced in lock step.
            assert!(self.rep_levels.len() >= self.current_len);
            assert!(self.def_levels.len() >= self.current_len);
            let mut rep_read_itr = self.rep_levels.iter().copied();
            let mut def_read_itr = self.def_levels.iter().copied();
            let mut rep_write_itr = self.spare_rep.iter_mut();
            let mut def_write_itr = self.spare_def.iter_mut();
            for _ in 0..old_len {
                let mut def = def_read_itr.next().unwrap();
                // Pass special entries (and their rep level) through untouched.
                while def > SPECIAL_THRESHOLD {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
                    def = def_read_itr.next().unwrap();
                }
                let rep = rep_read_itr.next().unwrap();
                for _ in 0..multiplier {
                    *def_write_itr.next().unwrap() = def;
                    *rep_write_itr.next().unwrap() = rep;
                }
            }
        }
        std::mem::swap(&mut self.def_levels, &mut self.spare_def);
        std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
    }
718
719 fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
720 if let Some(validity) = validity {
721 let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
722 self.do_record_validity(validity, def_level);
723 } else {
724 self.checkout_def(DefinitionInterpretation::AllValidItem);
725 }
726 }
727
728 fn record_validity(&mut self, validity_desc: &ValidityDesc) {
729 self.record_validity_buf(&validity_desc.validity)
730 }
731
732 fn record_fsl(&mut self, fsl_desc: &FslDesc) {
733 self.record_validity_buf(&fsl_desc.validity);
734 self.multiply_levels(fsl_desc.dimension);
735 }
736
737 fn normalize_specials(&mut self) {
738 for def in self.def_levels.iter_mut() {
739 if *def > SPECIAL_THRESHOLD {
740 *def -= SPECIAL_THRESHOLD;
741 }
742 }
743 }
744
745 fn build(mut self) -> SerializedRepDefs {
746 if self.current_len == 0 {
747 return SerializedRepDefs::new(None, None, self.def_meaning);
748 }
749
750 self.normalize_specials();
751
752 let definition_levels = if self.def_levels.is_empty() {
753 None
754 } else {
755 Some(self.def_levels)
756 };
757 let repetition_levels = if self.rep_levels.is_empty() {
758 None
759 } else {
760 Some(self.rep_levels)
761 };
762
763 let def_meaning = self.def_meaning.into_iter().rev().collect::<Vec<_>>();
765
766 SerializedRepDefs::new(repetition_levels, definition_levels, def_meaning)
767 }
768}
769
/// Collects the layers (validity, offsets, fixed-size-list) of one array so
/// they can later be serialized into repetition and definition levels.
#[derive(Clone, Default, Debug)]
pub struct RepDefBuilder {
    /// The layers, in the order they were added.
    repdefs: Vec<RawRepDef>,
    /// Number of values the next layer is expected to describe, once known.
    /// Set by the first layer and updated by list / fixed-size-list layers.
    len: Option<usize>,
}
786
787impl RepDefBuilder {
788 fn check_validity_len(&mut self, incoming_len: usize) {
789 if let Some(len) = self.len {
790 assert_eq!(incoming_len, len);
791 } else {
792 self.len = Some(incoming_len);
794 }
795 }
796
797 fn num_layers(&self) -> usize {
798 self.repdefs.len()
799 }
800
801 pub fn is_empty(&self) -> bool {
804 self.repdefs
805 .iter()
806 .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
807 }
808
809 pub fn is_simple_validity(&self) -> bool {
811 self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
812 }
813
814 pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
816 self.check_validity_len(validity.len());
817 self.repdefs.push(RawRepDef::Validity(ValidityDesc {
818 num_values: validity.len(),
819 validity: Some(validity.into_inner()),
820 }));
821 }
822
823 pub fn add_no_null(&mut self, len: usize) {
825 self.check_validity_len(len);
826 self.repdefs.push(RawRepDef::Validity(ValidityDesc {
827 validity: None,
828 num_values: len,
829 }));
830 }
831
832 pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
833 if let Some(len) = self.len {
834 assert_eq!(num_values, len);
835 }
836 self.len = Some(num_values * dimension);
837 debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
838 self.repdefs.push(RawRepDef::Fsl(FslDesc {
839 num_values,
840 validity: validity.map(|v| v.into_inner()),
841 dimension,
842 }))
843 }
844
845 fn check_offset_len(&mut self, offsets: &[i64]) {
846 if let Some(len) = self.len {
847 assert!(offsets.len() == len + 1);
848 }
849 self.len = Some(offsets[offsets.len() - 1] as usize);
850 }
851
852 fn do_add_offsets(
853 &mut self,
854 lengths: impl Iterator<Item = i64>,
855 validity: Option<NullBuffer>,
856 capacity: usize,
857 ) -> bool {
858 let mut num_specials = 0;
859 let mut has_empty_lists = false;
860 let mut has_garbage_values = false;
861 let mut last_off: i64 = 0;
862
863 let mut normalized_offsets = Vec::with_capacity(capacity);
864 normalized_offsets.push(0);
865
866 if let Some(ref validity) = validity {
867 for (len, is_valid) in lengths.zip(validity.iter()) {
868 match (is_valid, len == 0) {
869 (false, is_empty) => {
870 num_specials += 1;
871 has_garbage_values |= !is_empty;
872 }
873 (true, true) => {
874 num_specials += 1;
875 has_empty_lists = true;
876 }
877 _ => {
878 last_off += len;
879 }
880 }
881 normalized_offsets.push(last_off);
882 }
883 } else {
884 for len in lengths {
885 if len == 0 {
886 num_specials += 1;
887 has_empty_lists = true;
888 }
889 last_off += len;
890 normalized_offsets.push(last_off);
891 }
892 }
893
894 self.check_offset_len(&normalized_offsets);
895 self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
896 num_values: normalized_offsets.len() - 1,
897 offsets: normalized_offsets.into(),
898 validity: validity.map(|v| v.into_inner()),
899 has_empty_lists,
900 num_specials: num_specials as usize,
901 }));
902
903 has_garbage_values
904 }
905
906 pub fn add_offsets<O: OffsetSizeTrait>(
913 &mut self,
914 offsets: OffsetBuffer<O>,
915 validity: Option<NullBuffer>,
916 ) -> bool {
917 let inner = offsets.into_inner();
918 let buffer_len = inner.len();
919
920 if O::IS_LARGE {
921 let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, buffer_len);
922 let lengths = i64_buff.windows(2).map(|off| off[1] - off[0]);
923 self.do_add_offsets(lengths, validity, buffer_len)
924 } else {
925 let i32_buff = ScalarBuffer::<i32>::new(inner.into_inner(), 0, buffer_len);
926 let lengths = i32_buff.windows(2).map(|off| (off[1] - off[0]) as i64);
927 self.do_add_offsets(lengths, validity, buffer_len)
928 }
929 }
930
931 fn concat_layers<'a>(
943 layers: impl Iterator<Item = &'a RawRepDef>,
944 num_layers: usize,
945 ) -> RawRepDef {
946 enum LayerKind {
947 Validity,
948 Fsl,
949 Offsets,
950 }
951
952 let mut collected = Vec::with_capacity(num_layers);
955 let mut has_nulls = false;
956 let mut layer_kind = LayerKind::Validity;
957 let mut total_num_specials = 0;
958 let mut all_dimension = 0;
959 let mut all_has_empty_lists = false;
960 let mut all_num_values = 0;
961 for layer in layers {
962 has_nulls |= layer.has_nulls();
963 match layer {
964 RawRepDef::Validity(_) => {
965 layer_kind = LayerKind::Validity;
966 }
967 RawRepDef::Offsets(OffsetDesc {
968 num_specials,
969 has_empty_lists,
970 ..
971 }) => {
972 all_has_empty_lists |= *has_empty_lists;
973 layer_kind = LayerKind::Offsets;
974 total_num_specials += num_specials;
975 }
976 RawRepDef::Fsl(FslDesc { dimension, .. }) => {
977 layer_kind = LayerKind::Fsl;
978 all_dimension = *dimension;
979 }
980 }
981 collected.push(layer);
982 all_num_values += layer.num_values();
983 }
984
985 if !has_nulls {
987 match layer_kind {
988 LayerKind::Validity => {
989 return RawRepDef::Validity(ValidityDesc {
990 validity: None,
991 num_values: all_num_values,
992 });
993 }
994 LayerKind::Fsl => {
995 return RawRepDef::Fsl(FslDesc {
996 validity: None,
997 num_values: all_num_values,
998 dimension: all_dimension,
999 })
1000 }
1001 LayerKind::Offsets => {}
1002 }
1003 }
1004
1005 let mut validity_builder = if has_nulls {
1007 BooleanBufferBuilder::new(all_num_values)
1008 } else {
1009 BooleanBufferBuilder::new(0)
1010 };
1011 let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
1012 let mut all_offsets = Vec::with_capacity(all_num_values);
1013 all_offsets.push(0);
1014 all_offsets
1015 } else {
1016 Vec::new()
1017 };
1018
1019 for layer in collected {
1020 match layer {
1021 RawRepDef::Validity(ValidityDesc {
1022 validity: Some(validity),
1023 ..
1024 }) => {
1025 validity_builder.append_buffer(validity);
1026 }
1027 RawRepDef::Validity(ValidityDesc {
1028 validity: None,
1029 num_values,
1030 }) => {
1031 validity_builder.append_n(*num_values, true);
1032 }
1033 RawRepDef::Fsl(FslDesc {
1034 validity,
1035 num_values,
1036 ..
1037 }) => {
1038 if let Some(validity) = validity {
1039 validity_builder.append_buffer(validity);
1040 } else {
1041 validity_builder.append_n(*num_values, true);
1042 }
1043 }
1044 RawRepDef::Offsets(OffsetDesc {
1045 offsets,
1046 validity: Some(validity),
1047 has_empty_lists,
1048 ..
1049 }) => {
1050 all_has_empty_lists |= has_empty_lists;
1051 validity_builder.append_buffer(validity);
1052 let last = *all_offsets.last().unwrap();
1053 all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1054 }
1055 RawRepDef::Offsets(OffsetDesc {
1056 offsets,
1057 validity: None,
1058 has_empty_lists,
1059 num_values,
1060 ..
1061 }) => {
1062 all_has_empty_lists |= has_empty_lists;
1063 if has_nulls {
1064 validity_builder.append_n(*num_values, true);
1065 }
1066 let last = *all_offsets.last().unwrap();
1067 all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1068 }
1069 }
1070 }
1071 let validity = if has_nulls {
1072 Some(validity_builder.finish())
1073 } else {
1074 None
1075 };
1076 match layer_kind {
1077 LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
1078 validity,
1079 num_values: all_num_values,
1080 dimension: all_dimension,
1081 }),
1082 LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
1083 validity,
1084 num_values: all_num_values,
1085 }),
1086 LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
1087 offsets: all_offsets.into(),
1088 validity,
1089 has_empty_lists: all_has_empty_lists,
1090 num_values: all_num_values,
1091 num_specials: total_num_specials,
1092 }),
1093 }
1094 }
1095
1096 pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
1099 assert!(!builders.is_empty());
1100 if builders.iter().all(|b| b.is_empty()) {
1101 return SerializedRepDefs::empty(
1103 builders
1104 .first()
1105 .unwrap()
1106 .repdefs
1107 .iter()
1108 .map(|_| DefinitionInterpretation::AllValidItem)
1109 .collect::<Vec<_>>(),
1110 );
1111 }
1112
1113 let num_layers = builders[0].num_layers();
1114 let combined_layers = (0..num_layers)
1115 .map(|layer_index| {
1116 Self::concat_layers(
1117 builders.iter().map(|b| &b.repdefs[layer_index]),
1118 builders.len(),
1119 )
1120 })
1121 .collect::<Vec<_>>();
1122 debug_assert!(builders
1123 .iter()
1124 .all(|b| b.num_layers() == builders[0].num_layers()));
1125
1126 let total_len = combined_layers.last().unwrap().num_values()
1127 + combined_layers
1128 .iter()
1129 .map(|l| l.num_specials())
1130 .sum::<usize>();
1131 let max_rep = combined_layers.iter().map(|l| l.max_rep()).sum::<u16>();
1132 let max_def = combined_layers.iter().map(|l| l.max_def()).sum::<u16>();
1133
1134 let mut context = SerializerContext::new(total_len, num_layers, max_rep, max_def);
1135 for layer in combined_layers.into_iter() {
1136 match layer {
1137 RawRepDef::Validity(def) => {
1138 context.record_validity(&def);
1139 }
1140 RawRepDef::Offsets(rep) => {
1141 context.record_offsets(&rep);
1142 }
1143 RawRepDef::Fsl(fsl) => {
1144 context.record_fsl(&fsl);
1145 }
1146 }
1147 }
1148 context.build()
1149 }
1150}
1151
/// Turns serialized repetition/definition levels back into validity bitmaps
/// and list offsets, one layer at a time.
#[derive(Debug)]
pub struct RepDefUnraveler {
    /// Remaining repetition levels; compacted by every `unravel_offsets` call.
    rep_levels: Option<LevelBuffer>,
    /// Remaining definition levels; compacted alongside `rep_levels`.
    def_levels: Option<LevelBuffer>,
    /// Indexed by definition level: a counter of list layers derived from
    /// `def_meaning` in `new`. `unravel_validity` only emits a validity bit
    /// for levels whose entry is `<= current_rep_cmp`.
    levels_to_rep: Vec<u16>,
    /// Interpretation of the definition levels, one entry per layer.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Highest definition level that still counts as valid for the current
    /// layer; grows as layers are unraveled.
    current_def_cmp: u16,
    /// Number of list layers unraveled so far.
    current_rep_cmp: u16,
    /// Index into `def_meaning` of the layer to be unraveled next.
    current_layer: usize,
}
1171
1172impl RepDefUnraveler {
1173 pub fn new(
1175 rep_levels: Option<LevelBuffer>,
1176 def_levels: Option<LevelBuffer>,
1177 def_meaning: Arc<[DefinitionInterpretation]>,
1178 ) -> Self {
1179 let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
1180 let mut rep_counter = 0;
1181 levels_to_rep.push(0);
1183 for meaning in def_meaning.as_ref() {
1184 match meaning {
1185 DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
1186 }
1188 DefinitionInterpretation::NullableItem => {
1189 levels_to_rep.push(rep_counter);
1191 }
1192 DefinitionInterpretation::NullableList => {
1193 rep_counter += 1;
1194 levels_to_rep.push(rep_counter);
1195 }
1196 DefinitionInterpretation::EmptyableList => {
1197 rep_counter += 1;
1198 levels_to_rep.push(rep_counter);
1199 }
1200 DefinitionInterpretation::NullableAndEmptyableList => {
1201 rep_counter += 1;
1202 levels_to_rep.push(rep_counter);
1203 levels_to_rep.push(rep_counter);
1204 }
1205 }
1206 }
1207 Self {
1208 rep_levels,
1209 def_levels,
1210 current_def_cmp: 0,
1211 current_rep_cmp: 0,
1212 levels_to_rep,
1213 current_layer: 0,
1214 def_meaning,
1215 }
1216 }
1217
1218 pub fn is_all_valid(&self) -> bool {
1219 self.def_meaning[self.current_layer].is_all_valid()
1220 }
1221
1222 pub fn max_lists(&self) -> usize {
1228 debug_assert!(
1229 self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
1230 );
1231 self.rep_levels
1232 .as_ref()
1233 .map(|levels| levels.len())
1235 .unwrap_or(0)
1236 }
1237
1238 pub fn unravel_offsets<T: ArrowNativeType>(
1243 &mut self,
1244 offsets: &mut Vec<T>,
1245 validity: Option<&mut BooleanBufferBuilder>,
1246 ) -> Result<()> {
1247 let rep_levels = self
1248 .rep_levels
1249 .as_mut()
1250 .expect("Expected repetition level but data didn't contain repetition");
1251 let valid_level = self.current_def_cmp;
1252 let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
1253 DefinitionInterpretation::NullableList => {
1254 self.current_def_cmp += 1;
1255 (valid_level + 1, 0)
1256 }
1257 DefinitionInterpretation::EmptyableList => {
1258 self.current_def_cmp += 1;
1259 (0, valid_level + 1)
1260 }
1261 DefinitionInterpretation::NullableAndEmptyableList => {
1262 self.current_def_cmp += 2;
1263 (valid_level + 1, valid_level + 2)
1264 }
1265 DefinitionInterpretation::AllValidList => (0, 0),
1266 _ => unreachable!(),
1267 };
1268 self.current_layer += 1;
1269
1270 let mut max_level = null_level.max(empty_level);
1274 let upper_null = max_level;
1277 for level in self.def_meaning[self.current_layer..].iter() {
1278 match level {
1279 DefinitionInterpretation::NullableItem => {
1280 max_level += 1;
1281 }
1282 DefinitionInterpretation::AllValidItem => {}
1283 _ => {
1284 break;
1285 }
1286 }
1287 }
1288
1289 let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);
1290
1291 offsets.pop();
1299
1300 let to_offset = |val: usize| {
1301 T::from_usize(val)
1302 .ok_or_else(|| Error::invalid_input("A single batch had more than i32::MAX values and so a large container type is required", location!()))
1303 };
1304 self.current_rep_cmp += 1;
1305 if let Some(def_levels) = &mut self.def_levels {
1306 assert!(rep_levels.len() == def_levels.len());
1307 let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
1310 Box::new(|is_valid| validity.append(is_valid))
1311 } else {
1312 Box::new(|_| {})
1313 };
1314 let mut read_idx = 0;
1318 let mut write_idx = 0;
1319 while read_idx < rep_levels.len() {
1320 unsafe {
1323 let rep_val = *rep_levels.get_unchecked(read_idx);
1324 if rep_val != 0 {
1325 let def_val = *def_levels.get_unchecked(read_idx);
1326 *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1328 *def_levels.get_unchecked_mut(write_idx) = def_val;
1329 write_idx += 1;
1330
1331 if def_val == 0 {
1332 offsets.push(to_offset(curlen)?);
1334 curlen += 1;
1335 push_validity(true);
1336 } else if def_val > max_level {
1337 } else if def_val == null_level || def_val > upper_null {
1339 offsets.push(to_offset(curlen)?);
1341 push_validity(false);
1342 } else if def_val == empty_level {
1343 offsets.push(to_offset(curlen)?);
1345 push_validity(true);
1346 } else {
1347 offsets.push(to_offset(curlen)?);
1349 curlen += 1;
1350 push_validity(true);
1351 }
1352 } else {
1353 curlen += 1;
1354 }
1355 read_idx += 1;
1356 }
1357 }
1358 offsets.push(to_offset(curlen)?);
1359 rep_levels.truncate(write_idx);
1360 def_levels.truncate(write_idx);
1361 Ok(())
1362 } else {
1363 let mut read_idx = 0;
1365 let mut write_idx = 0;
1366 let old_offsets_len = offsets.len();
1367 while read_idx < rep_levels.len() {
1368 unsafe {
1370 let rep_val = *rep_levels.get_unchecked(read_idx);
1371 if rep_val != 0 {
1372 offsets.push(to_offset(curlen)?);
1374 *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1375 write_idx += 1;
1376 }
1377 curlen += 1;
1378 read_idx += 1;
1379 }
1380 }
1381 let num_new_lists = offsets.len() - old_offsets_len;
1382 offsets.push(to_offset(curlen)?);
1383 rep_levels.truncate(offsets.len() - 1);
1384 if let Some(validity) = validity {
1385 validity.append_n(num_new_lists, true);
1388 }
1389 Ok(())
1390 }
1391 }
1392
1393 pub fn skip_validity(&mut self) {
1394 debug_assert!(
1395 self.def_meaning[self.current_layer] == DefinitionInterpretation::AllValidItem
1396 );
1397 self.current_layer += 1;
1398 }
1399
1400 pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
1402 debug_assert!(
1403 self.def_meaning[self.current_layer] != DefinitionInterpretation::AllValidItem
1404 );
1405 self.current_layer += 1;
1406
1407 let def_levels = &self.def_levels.as_ref().unwrap();
1408
1409 let current_def_cmp = self.current_def_cmp;
1410 self.current_def_cmp += 1;
1411
1412 for is_valid in def_levels.iter().filter_map(|&level| {
1413 if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
1414 Some(level <= current_def_cmp)
1415 } else {
1416 None
1417 }
1418 }) {
1419 validity.append(is_valid);
1420 }
1421 }
1422
1423 pub fn decimate(&mut self, dimension: usize) {
1424 if self.rep_levels.is_some() {
1425 todo!("Not yet supported FSL<...List<...>>");
1437 }
1438 let Some(def_levels) = self.def_levels.as_mut() else {
1439 return;
1440 };
1441 let mut read_idx = 0;
1442 let mut write_idx = 0;
1443 while read_idx < def_levels.len() {
1444 unsafe {
1445 *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
1446 }
1447 write_idx += 1;
1448 read_idx += dimension;
1449 }
1450 def_levels.truncate(write_idx);
1451 }
1452}
1453
/// Drives several `RepDefUnraveler`s as one, appending the output of each, in
/// order, to a shared validity bitmap / offsets buffer.
#[derive(Debug)]
pub struct CompositeRepDefUnraveler {
    /// The unravelers, in the order their output is concatenated.
    unravelers: Vec<RepDefUnraveler>,
}
1471
1472impl CompositeRepDefUnraveler {
1473 pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
1474 Self { unravelers }
1475 }
1476
1477 pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
1481 let is_all_valid = self
1482 .unravelers
1483 .iter()
1484 .all(|unraveler| unraveler.is_all_valid());
1485
1486 if is_all_valid {
1487 for unraveler in self.unravelers.iter_mut() {
1488 unraveler.skip_validity();
1489 }
1490 None
1491 } else {
1492 let mut validity = BooleanBufferBuilder::new(num_values);
1493 for unraveler in self.unravelers.iter_mut() {
1494 unraveler.unravel_validity(&mut validity);
1495 }
1496 Some(NullBuffer::new(validity.finish()))
1497 }
1498 }
1499
1500 pub fn unravel_fsl_validity(
1501 &mut self,
1502 num_values: usize,
1503 dimension: usize,
1504 ) -> Option<NullBuffer> {
1505 for unraveler in self.unravelers.iter_mut() {
1506 unraveler.decimate(dimension);
1507 }
1508 self.unravel_validity(num_values)
1509 }
1510
1511 pub fn unravel_offsets<T: ArrowNativeType>(
1513 &mut self,
1514 ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
1515 let mut is_all_valid = true;
1516 let mut max_num_lists = 0;
1517 for unraveler in self.unravelers.iter() {
1518 is_all_valid &= unraveler.is_all_valid();
1519 max_num_lists += unraveler.max_lists();
1520 }
1521
1522 let mut validity = if is_all_valid {
1523 None
1524 } else {
1525 Some(BooleanBufferBuilder::new(max_num_lists))
1528 };
1529
1530 let mut offsets = Vec::with_capacity(max_num_lists + 1);
1531
1532 for unraveler in self.unravelers.iter_mut() {
1533 unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
1534 }
1535
1536 Ok((
1537 OffsetBuffer::new(ScalarBuffer::from(offsets)),
1538 validity.map(|mut v| NullBuffer::new(v.finish())),
1539 ))
1540 }
1541}
1542
/// Packs `(repetition, definition)` level pairs into control words of type `W`.
///
/// `W` is a marker only; the packing itself lives in the `append_next` impls for
/// `u8`, `u16` and `u32`.
#[derive(Debug)]
pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
    /// Source of `(repetition level, definition level)` pairs.
    repdef: I,
    /// Number of bits the repetition level is shifted left by when packing.
    def_width: usize,
    /// Repetition level that marks the start of a new row.
    max_rep: u16,
    /// Largest definition level that is still reported as visible.
    max_visible_def: u16,
    /// Mask applied to the repetition level before packing.
    rep_mask: u16,
    /// Mask applied to the definition level before packing.
    def_mask: u16,
    /// Number of repetition bits, as reported by `ControlWordIterator::bits_rep`.
    bits_rep: u8,
    /// Number of definition bits, as reported by `ControlWordIterator::bits_def`.
    bits_def: u8,
    /// Ties the iterator to its control word type without storing one.
    phantom: std::marker::PhantomData<W>,
}
1560
1561impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
1562 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1563 let next = self.repdef.next()?;
1564 let control_word: u8 =
1565 (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
1566 buf.push(control_word);
1567 let is_new_row = next.0 == self.max_rep;
1568 let is_visible = next.1 <= self.max_visible_def;
1569 let is_valid_item = next.1 == 0;
1570 Some(ControlWordDesc {
1571 is_new_row,
1572 is_visible,
1573 is_valid_item,
1574 })
1575 }
1576}
1577
1578impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
1579 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1580 let next = self.repdef.next()?;
1581 let control_word: u16 =
1582 ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
1583 let control_word = control_word.to_le_bytes();
1584 buf.push(control_word[0]);
1585 buf.push(control_word[1]);
1586 let is_new_row = next.0 == self.max_rep;
1587 let is_visible = next.1 <= self.max_visible_def;
1588 let is_valid_item = next.1 == 0;
1589 Some(ControlWordDesc {
1590 is_new_row,
1591 is_visible,
1592 is_valid_item,
1593 })
1594 }
1595}
1596
1597impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
1598 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1599 let next = self.repdef.next()?;
1600 let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
1601 + ((next.1 & self.def_mask) as u32);
1602 let control_word = control_word.to_le_bytes();
1603 buf.push(control_word[0]);
1604 buf.push(control_word[1]);
1605 buf.push(control_word[2]);
1606 buf.push(control_word[3]);
1607 let is_new_row = next.0 == self.max_rep;
1608 let is_visible = next.1 <= self.max_visible_def;
1609 let is_valid_item = next.1 == 0;
1610 Some(ControlWordDesc {
1611 is_new_row,
1612 is_visible,
1613 is_valid_item,
1614 })
1615 }
1616}
1617
/// Turns a stream of single levels (repetition only, or definition only) into control
/// words of type `W`.
///
/// `W` is a marker only; the encoding lives in the `append_next` impls for `u8`, `u16`
/// and `u32`.
#[derive(Debug)]
pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
    /// Source of levels.
    repdef: I,
    /// Mask applied to each level before it is written.
    level_mask: u16,
    /// Number of repetition bits; zero when the levels are definition levels.
    bits_rep: u8,
    /// Number of definition bits; when zero every item is reported as valid.
    bits_def: u8,
    /// Level that marks a new row; when zero every word is reported as a new row.
    max_rep: u16,
    /// Ties the iterator to its control word type without storing one.
    phantom: std::marker::PhantomData<W>,
}
1628
1629impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
1630 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1631 let next = self.repdef.next()?;
1632 buf.push((next & self.level_mask) as u8);
1633 let is_new_row = self.max_rep == 0 || next == self.max_rep;
1634 let is_valid_item = next == 0 || self.bits_def == 0;
1635 Some(ControlWordDesc {
1636 is_new_row,
1637 is_visible: true,
1640 is_valid_item,
1641 })
1642 }
1643}
1644
1645impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
1646 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1647 let next = self.repdef.next().unwrap() & self.level_mask;
1648 let control_word = next.to_le_bytes();
1649 buf.push(control_word[0]);
1650 buf.push(control_word[1]);
1651 let is_new_row = self.max_rep == 0 || next == self.max_rep;
1652 let is_valid_item = next == 0 || self.bits_def == 0;
1653 Some(ControlWordDesc {
1654 is_new_row,
1655 is_visible: true,
1656 is_valid_item,
1657 })
1658 }
1659}
1660
1661impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
1662 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1663 let next = self.repdef.next()?;
1664 let next = (next & self.level_mask) as u32;
1665 let control_word = next.to_le_bytes();
1666 buf.push(control_word[0]);
1667 buf.push(control_word[1]);
1668 buf.push(control_word[2]);
1669 buf.push(control_word[3]);
1670 let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
1671 let is_valid_item = next == 0 || self.bits_def == 0;
1672 Some(ControlWordDesc {
1673 is_new_row,
1674 is_visible: true,
1675 is_valid_item,
1676 })
1677 }
1678}
1679
/// Control word iterator for data with neither repetition nor definition levels.
///
/// It writes no bytes; it only counts out `len` descriptors.
#[derive(Debug)]
pub struct NilaryControlWordIterator {
    /// Total number of descriptors to hand out.
    len: usize,
    /// Number of descriptors handed out so far.
    idx: usize,
}
1686
1687impl NilaryControlWordIterator {
1688 fn append_next(&mut self) -> Option<ControlWordDesc> {
1689 if self.idx == self.len {
1690 None
1691 } else {
1692 self.idx += 1;
1693 Some(ControlWordDesc {
1694 is_new_row: true,
1695 is_visible: true,
1696 is_valid_item: true,
1697 })
1698 }
1699 }
1700}
1701
/// Returns a mask with the lowest `width` bits set.
///
/// Widths of 16 or more yield `u16::MAX` instead of overflowing the shift, so levels
/// that need the full 16 bits still get a usable mask.
fn get_mask(width: u16) -> u16 {
    match 1u16.checked_shl(u32::from(width)) {
        Some(top_bit) => top_bit - 1,
        // The shift left the 16-bit range: every bit belongs to the mask.
        None => u16::MAX,
    }
}
1706
/// A [`BinaryControlWordIterator`] fed by two zipped, copied `u16` slice iterators.
/// `T` is the control word type.
type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
    T,
>;
1713
/// An iterator that writes control words for borrowed level slices, in one of seven
/// concrete layouts.
///
/// The variant fixes which levels are present and how many bytes each word takes.
#[derive(Debug)]
pub enum ControlWordIterator<'a> {
    /// Repetition and definition levels packed into one byte.
    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
    /// Repetition and definition levels packed into two bytes.
    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
    /// Repetition and definition levels packed into four bytes.
    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
    /// A single kind of level, one byte per word.
    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
    /// A single kind of level, two bytes per word.
    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
    /// A single kind of level, four bytes per word.
    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
    /// No levels at all; words take zero bytes.
    Nilary(NilaryControlWordIterator),
}
/// Describes a single control word.
#[derive(Debug)]
pub struct ControlWordDesc {
    /// True when the repetition level equals the maximum repetition level, or when the
    /// word carries no repetition level.
    pub is_new_row: bool,
    /// True when the definition level is at or below the maximum visible definition
    /// level; always true for words that carry only one kind of level or none.
    pub is_visible: bool,
    /// True when the definition level is zero, or when the word carries no definition
    /// level.
    pub is_valid_item: bool,
}
1741
1742impl ControlWordIterator<'_> {
1743 pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1747 match self {
1748 Self::Binary8(iter) => iter.append_next(buf),
1749 Self::Binary16(iter) => iter.append_next(buf),
1750 Self::Binary32(iter) => iter.append_next(buf),
1751 Self::Unary8(iter) => iter.append_next(buf),
1752 Self::Unary16(iter) => iter.append_next(buf),
1753 Self::Unary32(iter) => iter.append_next(buf),
1754 Self::Nilary(iter) => iter.append_next(),
1755 }
1756 }
1757
1758 pub fn has_repetition(&self) -> bool {
1760 match self {
1761 Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
1762 Self::Unary8(iter) => iter.bits_rep > 0,
1763 Self::Unary16(iter) => iter.bits_rep > 0,
1764 Self::Unary32(iter) => iter.bits_rep > 0,
1765 Self::Nilary(_) => false,
1766 }
1767 }
1768
1769 pub fn bytes_per_word(&self) -> usize {
1771 match self {
1772 Self::Binary8(_) => 1,
1773 Self::Binary16(_) => 2,
1774 Self::Binary32(_) => 4,
1775 Self::Unary8(_) => 1,
1776 Self::Unary16(_) => 2,
1777 Self::Unary32(_) => 4,
1778 Self::Nilary(_) => 0,
1779 }
1780 }
1781
1782 pub fn bits_rep(&self) -> u8 {
1784 match self {
1785 Self::Binary8(iter) => iter.bits_rep,
1786 Self::Binary16(iter) => iter.bits_rep,
1787 Self::Binary32(iter) => iter.bits_rep,
1788 Self::Unary8(iter) => iter.bits_rep,
1789 Self::Unary16(iter) => iter.bits_rep,
1790 Self::Unary32(iter) => iter.bits_rep,
1791 Self::Nilary(_) => 0,
1792 }
1793 }
1794
1795 pub fn bits_def(&self) -> u8 {
1797 match self {
1798 Self::Binary8(iter) => iter.bits_def,
1799 Self::Binary16(iter) => iter.bits_def,
1800 Self::Binary32(iter) => iter.bits_def,
1801 Self::Unary8(iter) => iter.bits_def,
1802 Self::Unary16(iter) => iter.bits_def,
1803 Self::Unary32(iter) => iter.bits_def,
1804 Self::Nilary(_) => 0,
1805 }
1806 }
1807}
1808
1809pub fn build_control_word_iterator<'a>(
1813 rep: Option<&'a [u16]>,
1814 max_rep: u16,
1815 def: Option<&'a [u16]>,
1816 max_def: u16,
1817 max_visible_def: u16,
1818 len: usize,
1819) -> ControlWordIterator<'a> {
1820 let rep_width = if max_rep == 0 {
1821 0
1822 } else {
1823 log_2_ceil(max_rep as u32) as u16
1824 };
1825 let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
1826 let def_width = if max_def == 0 {
1827 0
1828 } else {
1829 log_2_ceil(max_def as u32) as u16
1830 };
1831 let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
1832 let total_width = rep_width + def_width;
1833 match (rep, def) {
1834 (Some(rep), Some(def)) => {
1835 let iter = rep.iter().copied().zip(def.iter().copied());
1836 let def_width = def_width as usize;
1837 if total_width <= 8 {
1838 ControlWordIterator::Binary8(BinaryControlWordIterator {
1839 repdef: iter,
1840 rep_mask,
1841 def_mask,
1842 def_width,
1843 max_rep,
1844 max_visible_def,
1845 bits_rep: rep_width as u8,
1846 bits_def: def_width as u8,
1847 phantom: std::marker::PhantomData,
1848 })
1849 } else if total_width <= 16 {
1850 ControlWordIterator::Binary16(BinaryControlWordIterator {
1851 repdef: iter,
1852 rep_mask,
1853 def_mask,
1854 def_width,
1855 max_rep,
1856 max_visible_def,
1857 bits_rep: rep_width as u8,
1858 bits_def: def_width as u8,
1859 phantom: std::marker::PhantomData,
1860 })
1861 } else {
1862 ControlWordIterator::Binary32(BinaryControlWordIterator {
1863 repdef: iter,
1864 rep_mask,
1865 def_mask,
1866 def_width,
1867 max_rep,
1868 max_visible_def,
1869 bits_rep: rep_width as u8,
1870 bits_def: def_width as u8,
1871 phantom: std::marker::PhantomData,
1872 })
1873 }
1874 }
1875 (Some(lev), None) => {
1876 let iter = lev.iter().copied();
1877 if total_width <= 8 {
1878 ControlWordIterator::Unary8(UnaryControlWordIterator {
1879 repdef: iter,
1880 level_mask: rep_mask,
1881 bits_rep: total_width as u8,
1882 bits_def: 0,
1883 max_rep,
1884 phantom: std::marker::PhantomData,
1885 })
1886 } else if total_width <= 16 {
1887 ControlWordIterator::Unary16(UnaryControlWordIterator {
1888 repdef: iter,
1889 level_mask: rep_mask,
1890 bits_rep: total_width as u8,
1891 bits_def: 0,
1892 max_rep,
1893 phantom: std::marker::PhantomData,
1894 })
1895 } else {
1896 ControlWordIterator::Unary32(UnaryControlWordIterator {
1897 repdef: iter,
1898 level_mask: rep_mask,
1899 bits_rep: total_width as u8,
1900 bits_def: 0,
1901 max_rep,
1902 phantom: std::marker::PhantomData,
1903 })
1904 }
1905 }
1906 (None, Some(lev)) => {
1907 let iter = lev.iter().copied();
1908 if total_width <= 8 {
1909 ControlWordIterator::Unary8(UnaryControlWordIterator {
1910 repdef: iter,
1911 level_mask: def_mask,
1912 bits_rep: 0,
1913 bits_def: total_width as u8,
1914 max_rep: 0,
1915 phantom: std::marker::PhantomData,
1916 })
1917 } else if total_width <= 16 {
1918 ControlWordIterator::Unary16(UnaryControlWordIterator {
1919 repdef: iter,
1920 level_mask: def_mask,
1921 bits_rep: 0,
1922 bits_def: total_width as u8,
1923 max_rep: 0,
1924 phantom: std::marker::PhantomData,
1925 })
1926 } else {
1927 ControlWordIterator::Unary32(UnaryControlWordIterator {
1928 repdef: iter,
1929 level_mask: def_mask,
1930 bits_rep: 0,
1931 bits_def: total_width as u8,
1932 max_rep: 0,
1933 phantom: std::marker::PhantomData,
1934 })
1935 }
1936 }
1937 (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
1938 }
1939}
1940
/// Decodes control words back into repetition and/or definition levels.
///
/// The variant fixes which levels a word carries and how many bytes it occupies.  The
/// `BOTH*` variants hold `(bits to shift right to reach the repetition level, mask that
/// isolates the definition level)`.
#[derive(Copy, Clone, Debug)]
pub enum ControlWordParser {
    /// Repetition and definition levels in a one-byte word.
    BOTH8(u8, u32),
    /// Repetition and definition levels in a two-byte word.
    BOTH16(u8, u32),
    /// Repetition and definition levels in a four-byte word.
    BOTH32(u8, u32),
    /// Repetition level only, one-byte word.
    REP8,
    /// Repetition level only, two-byte word.
    REP16,
    /// Repetition level only, four-byte word.
    REP32,
    /// Definition level only, one-byte word.
    DEF8,
    /// Definition level only, two-byte word.
    DEF16,
    /// Definition level only, four-byte word.
    DEF32,
    /// No levels; words occupy zero bytes.
    NIL,
}
1959
1960impl ControlWordParser {
1961 fn parse_both<const WORD_SIZE: u8>(
1962 src: &[u8],
1963 dst_rep: &mut Vec<u16>,
1964 dst_def: &mut Vec<u16>,
1965 bits_to_shift: u8,
1966 mask_to_apply: u32,
1967 ) {
1968 match WORD_SIZE {
1969 1 => {
1970 let word = src[0];
1971 let rep = word >> bits_to_shift;
1972 let def = word & (mask_to_apply as u8);
1973 dst_rep.push(rep as u16);
1974 dst_def.push(def as u16);
1975 }
1976 2 => {
1977 let word = u16::from_le_bytes([src[0], src[1]]);
1978 let rep = word >> bits_to_shift;
1979 let def = word & mask_to_apply as u16;
1980 dst_rep.push(rep);
1981 dst_def.push(def);
1982 }
1983 4 => {
1984 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
1985 let rep = word >> bits_to_shift;
1986 let def = word & mask_to_apply;
1987 dst_rep.push(rep as u16);
1988 dst_def.push(def as u16);
1989 }
1990 _ => unreachable!(),
1991 }
1992 }
1993
1994 fn parse_desc_both<const WORD_SIZE: u8>(
1995 src: &[u8],
1996 bits_to_shift: u8,
1997 mask_to_apply: u32,
1998 max_rep: u16,
1999 max_visible_def: u16,
2000 ) -> ControlWordDesc {
2001 match WORD_SIZE {
2002 1 => {
2003 let word = src[0];
2004 let rep = word >> bits_to_shift;
2005 let def = word & (mask_to_apply as u8);
2006 let is_visible = def as u16 <= max_visible_def;
2007 let is_new_row = rep as u16 == max_rep;
2008 let is_valid_item = def == 0;
2009 ControlWordDesc {
2010 is_visible,
2011 is_new_row,
2012 is_valid_item,
2013 }
2014 }
2015 2 => {
2016 let word = u16::from_le_bytes([src[0], src[1]]);
2017 let rep = word >> bits_to_shift;
2018 let def = word & mask_to_apply as u16;
2019 let is_visible = def <= max_visible_def;
2020 let is_new_row = rep == max_rep;
2021 let is_valid_item = def == 0;
2022 ControlWordDesc {
2023 is_visible,
2024 is_new_row,
2025 is_valid_item,
2026 }
2027 }
2028 4 => {
2029 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2030 let rep = word >> bits_to_shift;
2031 let def = word & mask_to_apply;
2032 let is_visible = def as u16 <= max_visible_def;
2033 let is_new_row = rep as u16 == max_rep;
2034 let is_valid_item = def == 0;
2035 ControlWordDesc {
2036 is_visible,
2037 is_new_row,
2038 is_valid_item,
2039 }
2040 }
2041 _ => unreachable!(),
2042 }
2043 }
2044
2045 fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
2046 match WORD_SIZE {
2047 1 => {
2048 let word = src[0];
2049 dst.push(word as u16);
2050 }
2051 2 => {
2052 let word = u16::from_le_bytes([src[0], src[1]]);
2053 dst.push(word);
2054 }
2055 4 => {
2056 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2057 dst.push(word as u16);
2058 }
2059 _ => unreachable!(),
2060 }
2061 }
2062
2063 fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
2064 match WORD_SIZE {
2065 1 => ControlWordDesc {
2066 is_new_row: src[0] as u16 == max_rep,
2067 is_visible: true,
2068 is_valid_item: true,
2069 },
2070 2 => ControlWordDesc {
2071 is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
2072 is_visible: true,
2073 is_valid_item: true,
2074 },
2075 4 => ControlWordDesc {
2076 is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
2077 is_visible: true,
2078 is_valid_item: true,
2079 },
2080 _ => unreachable!(),
2081 }
2082 }
2083
2084 fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
2085 match WORD_SIZE {
2086 1 => ControlWordDesc {
2087 is_new_row: true,
2088 is_visible: true,
2089 is_valid_item: src[0] == 0,
2090 },
2091 2 => ControlWordDesc {
2092 is_new_row: true,
2093 is_visible: true,
2094 is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
2095 },
2096 4 => ControlWordDesc {
2097 is_new_row: true,
2098 is_visible: true,
2099 is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
2100 },
2101 _ => unreachable!(),
2102 }
2103 }
2104
2105 pub fn bytes_per_word(&self) -> usize {
2107 match self {
2108 Self::BOTH8(..) => 1,
2109 Self::BOTH16(..) => 2,
2110 Self::BOTH32(..) => 4,
2111 Self::REP8 => 1,
2112 Self::REP16 => 2,
2113 Self::REP32 => 4,
2114 Self::DEF8 => 1,
2115 Self::DEF16 => 2,
2116 Self::DEF32 => 4,
2117 Self::NIL => 0,
2118 }
2119 }
2120
2121 pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
2128 match self {
2129 Self::BOTH8(bits_to_shift, mask_to_apply) => {
2130 Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2131 }
2132 Self::BOTH16(bits_to_shift, mask_to_apply) => {
2133 Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2134 }
2135 Self::BOTH32(bits_to_shift, mask_to_apply) => {
2136 Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2137 }
2138 Self::REP8 => Self::parse_one::<1>(src, dst_rep),
2139 Self::REP16 => Self::parse_one::<2>(src, dst_rep),
2140 Self::REP32 => Self::parse_one::<4>(src, dst_rep),
2141 Self::DEF8 => Self::parse_one::<1>(src, dst_def),
2142 Self::DEF16 => Self::parse_one::<2>(src, dst_def),
2143 Self::DEF32 => Self::parse_one::<4>(src, dst_def),
2144 Self::NIL => {}
2145 }
2146 }
2147
2148 pub fn has_rep(&self) -> bool {
2150 match self {
2151 Self::BOTH8(..)
2152 | Self::BOTH16(..)
2153 | Self::BOTH32(..)
2154 | Self::REP8
2155 | Self::REP16
2156 | Self::REP32 => true,
2157 Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
2158 }
2159 }
2160
2161 pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
2163 match self {
2164 Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
2165 src,
2166 *bits_to_shift,
2167 *mask_to_apply,
2168 max_rep,
2169 max_visible_def,
2170 ),
2171 Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
2172 src,
2173 *bits_to_shift,
2174 *mask_to_apply,
2175 max_rep,
2176 max_visible_def,
2177 ),
2178 Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
2179 src,
2180 *bits_to_shift,
2181 *mask_to_apply,
2182 max_rep,
2183 max_visible_def,
2184 ),
2185 Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
2186 Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
2187 Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
2188 Self::DEF8 => Self::parse_def_desc_one::<1>(src),
2189 Self::DEF16 => Self::parse_def_desc_one::<2>(src),
2190 Self::DEF32 => Self::parse_def_desc_one::<4>(src),
2191 Self::NIL => ControlWordDesc {
2192 is_new_row: true,
2193 is_valid_item: true,
2194 is_visible: true,
2195 },
2196 }
2197 }
2198
2199 pub fn new(bits_rep: u8, bits_def: u8) -> Self {
2201 let total_bits = bits_rep + bits_def;
2202
2203 enum WordSize {
2204 One,
2205 Two,
2206 Four,
2207 }
2208
2209 let word_size = if total_bits <= 8 {
2210 WordSize::One
2211 } else if total_bits <= 16 {
2212 WordSize::Two
2213 } else {
2214 WordSize::Four
2215 };
2216
2217 match (bits_rep > 0, bits_def > 0, word_size) {
2218 (false, false, _) => Self::NIL,
2219 (false, true, WordSize::One) => Self::DEF8,
2220 (false, true, WordSize::Two) => Self::DEF16,
2221 (false, true, WordSize::Four) => Self::DEF32,
2222 (true, false, WordSize::One) => Self::REP8,
2223 (true, false, WordSize::Two) => Self::REP16,
2224 (true, false, WordSize::Four) => Self::REP32,
2225 (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
2226 (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
2227 (true, true, WordSize::Four) => {
2228 Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
2229 }
2230 }
2231 }
2232}
2233
2234#[cfg(test)]
2235mod tests {
2236 use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
2237
2238 use crate::repdef::{
2239 CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
2240 };
2241
2242 use super::RepDefBuilder;
2243
2244 fn validity(values: &[bool]) -> NullBuffer {
2245 NullBuffer::from_iter(values.iter().copied())
2246 }
2247
2248 fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
2249 OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
2250 }
2251
2252 fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
2253 OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
2254 }
2255
#[test]
fn test_repdef_basic() {
    // Two nullable list layers on top of nine nullable items.
    let item_flags = [
        true, true, true, false, false, false, true, true, false,
    ];

    let mut repdef_builder = RepDefBuilder::default();
    repdef_builder.add_offsets(
        offsets_64(&[0, 2, 2, 5]),
        Some(validity(&[true, false, true])),
    );
    repdef_builder.add_offsets(
        offsets_64(&[0, 1, 3, 5, 5, 9]),
        Some(validity(&[true, true, true, false, true])),
    );
    repdef_builder.add_validity_bitmap(validity(&item_flags));

    let serialized = RepDefBuilder::serialize(vec![repdef_builder]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def_levels);
    assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep_levels);

    // Round trip: unravel from the innermost layer (items) outwards.
    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        Some(rep_levels.as_ref().to_vec()),
        Some(def_levels.as_ref().to_vec()),
        serialized.def_meaning.into(),
    )]);

    assert_eq!(unraveler.unravel_validity(9), Some(validity(&item_flags)));

    let (inner_offsets, inner_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(
        inner_offsets.inner(),
        offsets_32(&[0, 1, 3, 5, 5, 9]).inner()
    );
    assert_eq!(
        inner_validity,
        Some(validity(&[true, true, true, false, true]))
    );

    let (outer_offsets, outer_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(outer_offsets.inner(), offsets_32(&[0, 2, 2, 5]).inner());
    assert_eq!(outer_validity, Some(validity(&[true, false, true])));
}
2302
#[test]
fn test_repdef_simple_null_empty_list() {
    // Both scenarios must serialize to the same levels; only the meaning of the list
    // layer differs.
    fn check(repdefs: SerializedRepDefs, last_def: DefinitionInterpretation) {
        let rep_levels = repdefs.repetition_levels.unwrap();
        let def_levels = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 1, 1, 0, 0], *rep_levels);
        assert_eq!([0, 0, 2, 0, 1, 0], *def_levels);
        assert_eq!(
            vec![DefinitionInterpretation::NullableItem, last_def],
            repdefs.def_meaning
        );
    }

    let list_offsets = [0, 2, 2, 5];
    let item_flags = [true, true, true, false, true];

    // Scenario 1: the zero-length middle list is marked null.
    let mut with_null_list = RepDefBuilder::default();
    with_null_list.add_offsets(
        offsets_32(&list_offsets),
        Some(validity(&[true, false, true])),
    );
    with_null_list.add_validity_bitmap(validity(&item_flags));
    check(
        RepDefBuilder::serialize(vec![with_null_list]),
        DefinitionInterpretation::NullableList,
    );

    // Scenario 2: no list validity, so the zero-length middle list is an empty list.
    let mut with_empty_list = RepDefBuilder::default();
    with_empty_list.add_offsets(offsets_32(&list_offsets), None);
    with_empty_list.add_validity_bitmap(validity(&item_flags));
    check(
        RepDefBuilder::serialize(vec![with_empty_list]),
        DefinitionInterpretation::EmptyableList,
    );
}
2340
#[test]
fn test_repdef_empty_list_at_end() {
    // The last list (offsets 5..5) has no items.
    let mut repdef_builder = RepDefBuilder::default();
    repdef_builder.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
    repdef_builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

    let serialized = RepDefBuilder::serialize(vec![repdef_builder]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([1, 0, 1, 0, 0, 1], *rep_levels);
    assert_eq!([0, 0, 0, 1, 0, 2], *def_levels);

    let expected_meaning = vec![
        DefinitionInterpretation::NullableItem,
        DefinitionInterpretation::EmptyableList,
    ];
    assert_eq!(expected_meaning, serialized.def_meaning);
}
2363
#[test]
fn test_repdef_abnormal_nulls() {
    // The null middle list still spans offsets 2..5, yet only five items are registered.
    let mut repdef_builder = RepDefBuilder::default();
    repdef_builder.add_offsets(
        offsets_32(&[0, 2, 5, 8]),
        Some(validity(&[true, false, true])),
    );
    repdef_builder.add_no_null(5);

    let serialized = RepDefBuilder::serialize(vec![repdef_builder]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([1, 0, 1, 1, 0, 0], *rep_levels);
    assert_eq!([0, 0, 1, 0, 0, 0], *def_levels);

    let expected_meaning = vec![
        DefinitionInterpretation::AllValidItem,
        DefinitionInterpretation::NullableList,
    ];
    assert_eq!(expected_meaning, serialized.def_meaning);
}
2393
#[test]
fn test_repdef_fsl() {
    // Two fixed-size-list layers (dimension 2 each) over eight nullable items; only the
    // outer layer has validity.
    let mut repdef_builder = RepDefBuilder::default();
    repdef_builder.add_fsl(Some(validity(&[true, false])), 2, 2);
    repdef_builder.add_fsl(None, 2, 4);
    repdef_builder.add_validity_bitmap(validity(&[
        true, false, true, false, true, false, true, false,
    ]));

    let serialized = RepDefBuilder::serialize(vec![repdef_builder]);

    let expected_meaning = vec![
        DefinitionInterpretation::NullableItem,
        DefinitionInterpretation::AllValidItem,
        DefinitionInterpretation::NullableItem,
    ];
    assert_eq!(expected_meaning, serialized.def_meaning);

    assert!(serialized.repetition_levels.is_none());

    let def_levels = serialized.definition_levels.unwrap();
    assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def_levels);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        None,
        Some(def_levels.as_ref().to_vec()),
        serialized.def_meaning.into(),
    )]);

    // Items first, then the inner FSL layer, then the outer one.
    let item_validity = unraveler.unravel_validity(8);
    assert_eq!(
        item_validity,
        Some(validity(&[
            true, false, true, false, false, false, false, false
        ]))
    );

    let inner_validity = unraveler.unravel_fsl_validity(4, 2);
    assert_eq!(inner_validity, None);

    let outer_validity = unraveler.unravel_fsl_validity(2, 2);
    assert_eq!(outer_validity, Some(validity(&[true, false])));
}
2438
#[test]
fn test_repdef_fsl_allvalid_item() {
    // Same FSL layout as the nullable-item case, but the eight items have no nulls.
    let mut repdef_builder = RepDefBuilder::default();
    repdef_builder.add_fsl(Some(validity(&[true, false])), 2, 2);
    repdef_builder.add_fsl(None, 2, 4);
    repdef_builder.add_no_null(8);

    let serialized = RepDefBuilder::serialize(vec![repdef_builder]);

    let expected_meaning = vec![
        DefinitionInterpretation::AllValidItem,
        DefinitionInterpretation::AllValidItem,
        DefinitionInterpretation::NullableItem,
    ];
    assert_eq!(expected_meaning, serialized.def_meaning);

    assert!(serialized.repetition_levels.is_none());

    let def_levels = serialized.definition_levels.unwrap();
    assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def_levels);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        None,
        Some(def_levels.as_ref().to_vec()),
        serialized.def_meaning.into(),
    )]);

    // Items and the inner FSL layer are all-valid; only the outer layer reports nulls.
    let item_validity = unraveler.unravel_validity(8);
    assert_eq!(item_validity, None);

    let inner_validity = unraveler.unravel_fsl_validity(4, 2);
    assert_eq!(inner_validity, None);

    let outer_validity = unraveler.unravel_fsl_validity(2, 2);
    assert_eq!(outer_validity, Some(validity(&[true, false])));
}
2476
#[test]
fn test_repdef_sliced_offsets() {
    // Offsets that start at 5 instead of 0.
    let mut repdef_builder = RepDefBuilder::default();
    repdef_builder.add_offsets(
        offsets_32(&[5, 7, 7, 10]),
        Some(validity(&[true, false, true])),
    );
    repdef_builder.add_no_null(5);

    let serialized = RepDefBuilder::serialize(vec![repdef_builder]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([1, 0, 1, 1, 0, 0], *rep_levels);
    assert_eq!([0, 0, 1, 0, 0, 0], *def_levels);

    let expected_meaning = vec![
        DefinitionInterpretation::AllValidItem,
        DefinitionInterpretation::NullableList,
    ];
    assert_eq!(expected_meaning, serialized.def_meaning);
}
2504
#[test]
fn test_repdef_complex_null_empty() {
    // Two list layers, each mixing null lists with empty lists, over three valid items.
    let mut repdef_builder = RepDefBuilder::default();
    repdef_builder.add_offsets(
        offsets_32(&[0, 4, 4, 4, 6]),
        Some(validity(&[true, false, true, true])),
    );
    repdef_builder.add_offsets(
        offsets_32(&[0, 1, 1, 2, 2, 2, 3]),
        Some(validity(&[true, false, true, false, true, true])),
    );
    repdef_builder.add_no_null(3);

    let serialized = RepDefBuilder::serialize(vec![repdef_builder]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep_levels);
    assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def_levels);
}
2526
#[test]
fn test_repdef_empty_list_no_null() {
    // Two zero-length lists, no list validity, six valid items.
    let list_offsets = [0, 4, 4, 4, 6];

    let mut repdef_builder = RepDefBuilder::default();
    repdef_builder.add_offsets(offsets_32(&list_offsets), None);
    repdef_builder.add_no_null(6);

    let serialized = RepDefBuilder::serialize(vec![repdef_builder]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep_levels);
    assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def_levels);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        Some(rep_levels.as_ref().to_vec()),
        Some(def_levels.as_ref().to_vec()),
        serialized.def_meaning.into(),
    )]);

    assert_eq!(unraveler.unravel_validity(6), None);

    let (unraveled_offsets, unraveled_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(unraveled_offsets.inner(), offsets_32(&list_offsets).inner());
    assert_eq!(unraveled_validity, None);
}
2554
#[test]
fn test_repdef_all_valid() {
    // Two list layers with no nulls and no empty lists: definition levels are not needed.
    let mut repdef_builder = RepDefBuilder::default();
    repdef_builder.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
    repdef_builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
    repdef_builder.add_no_null(9);

    let serialized = RepDefBuilder::serialize(vec![repdef_builder]);
    let rep_levels = serialized.repetition_levels.unwrap();
    assert!(serialized.definition_levels.is_none());

    assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep_levels);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        Some(rep_levels.as_ref().to_vec()),
        None,
        serialized.def_meaning.into(),
    )]);

    assert_eq!(unraveler.unravel_validity(9), None);

    // The inner list layer comes back first, then the outer one.
    let (inner_offsets, inner_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(
        inner_offsets.inner(),
        offsets_32(&[0, 1, 3, 5, 7, 9]).inner()
    );
    assert_eq!(inner_validity, None);

    let (outer_offsets, outer_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(outer_offsets.inner(), offsets_32(&[0, 2, 3, 5]).inner());
    assert_eq!(outer_validity, None);
}
2582
#[test]
fn test_only_empty_lists() {
    // Zero-length lists without any list validity.
    let list_offsets = [0, 4, 4, 4, 6];

    let mut repdef_builder = RepDefBuilder::default();
    repdef_builder.add_offsets(offsets_32(&list_offsets), None);
    repdef_builder.add_no_null(6);

    let serialized = RepDefBuilder::serialize(vec![repdef_builder]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep_levels);
    assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def_levels);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        Some(rep_levels.as_ref().to_vec()),
        Some(def_levels.as_ref().to_vec()),
        serialized.def_meaning.into(),
    )]);

    assert_eq!(unraveler.unravel_validity(6), None);

    let (unraveled_offsets, unraveled_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(unraveled_offsets.inner(), offsets_32(&list_offsets).inner());
    assert_eq!(unraveled_validity, None);
}
2608
#[test]
fn test_only_null_lists() {
    // Both zero-length lists are marked null.
    let list_offsets = [0, 4, 4, 4, 6];
    let list_flags = [true, false, false, true];

    let mut repdef_builder = RepDefBuilder::default();
    repdef_builder.add_offsets(offsets_32(&list_offsets), Some(validity(&list_flags)));
    repdef_builder.add_no_null(6);

    let serialized = RepDefBuilder::serialize(vec![repdef_builder]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep_levels);
    assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def_levels);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        Some(rep_levels.as_ref().to_vec()),
        Some(def_levels.as_ref().to_vec()),
        serialized.def_meaning.into(),
    )]);

    assert_eq!(unraveler.unravel_validity(6), None);

    let (unraveled_offsets, unraveled_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(unraveled_offsets.inner(), offsets_32(&list_offsets).inner());
    assert_eq!(unraveled_validity, Some(validity(&list_flags)));
}
2637
/// A list layer containing both a null list and an empty list must give the
/// two cases distinct definition levels and restore them on unravel.
#[test]
fn test_null_and_empty_lists() {
    let mut repdef_builder = RepDefBuilder::default();
    // Four lists over six items; the 2nd list is null, the 3rd is empty.
    repdef_builder.add_offsets(
        offsets_32(&[0, 4, 4, 4, 6]),
        Some(validity(&[true, false, true, true])),
    );
    repdef_builder.add_no_null(6);

    let serialized = RepDefBuilder::serialize(vec![repdef_builder]);
    let rep = serialized.repetition_levels.unwrap();
    let def = serialized.definition_levels.unwrap();

    // The null list is written with def level 1, the empty list with 2.
    let expected_rep: [u16; 8] = [1, 0, 0, 0, 1, 1, 1, 0];
    let expected_def: [u16; 8] = [0, 0, 0, 0, 1, 2, 0, 0];
    assert_eq!(expected_rep, *rep);
    assert_eq!(expected_def, *def);

    let level_unraveler = RepDefUnraveler::new(
        Some(rep.to_vec()),
        Some(def.to_vec()),
        serialized.def_meaning.into(),
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![level_unraveler]);

    // Items carry no nulls.
    assert!(unraveler.unravel_validity(6).is_none());

    let (list_offsets, list_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    let expected_offsets = offsets_32(&[0, 4, 4, 4, 6]);
    let expected_validity = Some(validity(&[true, false, true, true]));
    assert_eq!(list_offsets.inner(), expected_offsets.inner());
    assert_eq!(list_validity, expected_validity);
}
2666
/// With only validity layers (no list layers) there are no repetition
/// levels, and the definition levels alone must reproduce every layer's
/// validity when unraveled.
#[test]
fn test_repdef_no_rep() {
    let mut repdef_builder = RepDefBuilder::default();
    // Three layers over five values: all-valid, then two nullable layers.
    repdef_builder.add_no_null(5);
    repdef_builder.add_validity_bitmap(validity(&[false, false, true, true, true]));
    repdef_builder.add_validity_bitmap(validity(&[false, true, true, true, false]));

    let serialized = RepDefBuilder::serialize(vec![repdef_builder]);
    assert!(serialized.repetition_levels.is_none());

    let def = serialized.definition_levels.unwrap();
    let expected_def: [u16; 5] = [2, 2, 0, 0, 1];
    assert_eq!(expected_def, *def);

    let level_unraveler =
        RepDefUnraveler::new(None, Some(def.to_vec()), serialized.def_meaning.into());
    let mut unraveler = CompositeRepDefUnraveler::new(vec![level_unraveler]);

    // Layers come back in reverse of the order they were added. The last
    // layer added also reports null wherever the layer before it was null.
    let last_added = unraveler.unravel_validity(5);
    assert_eq!(
        last_added,
        Some(validity(&[false, false, true, true, false]))
    );

    let middle = unraveler.unravel_validity(5);
    assert_eq!(middle, Some(validity(&[false, false, true, true, true])));

    let first_added = unraveler.unravel_validity(5);
    assert_eq!(first_added, None);
}
2696
/// Two independently serialized rep/def sets, one with definition levels and
/// one without, must unravel through a single composite unraveler into one
/// combined list layer.
#[test]
fn test_composite_unravel() {
    // First set: three lists over five items, the middle list is null.
    let mut first_builder = RepDefBuilder::default();
    first_builder.add_offsets(
        offsets_64(&[0, 2, 2, 5]),
        Some(validity(&[true, false, true])),
    );
    first_builder.add_no_null(5);
    let repdef1 = RepDefBuilder::serialize(vec![first_builder]);

    // Second set: five lists over nine items, nothing null or empty.
    let mut second_builder = RepDefBuilder::default();
    second_builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
    second_builder.add_no_null(9);
    let repdef2 = RepDefBuilder::serialize(vec![second_builder]);

    let rep1 = repdef1.repetition_levels.clone().unwrap();
    let def1 = repdef1.definition_levels.clone().unwrap();
    let rep2 = repdef2.repetition_levels.clone().unwrap();
    // With no nulls and no empty lists the second set needs no def levels.
    assert!(repdef2.definition_levels.is_none());

    let expected_rep1: [u16; 6] = [1, 0, 1, 1, 0, 0];
    let expected_def1: [u16; 6] = [0, 0, 1, 0, 0, 0];
    let expected_rep2: [u16; 9] = [1, 1, 0, 1, 0, 1, 0, 1, 0];
    assert_eq!(expected_rep1, *rep1);
    assert_eq!(expected_def1, *def1);
    assert_eq!(expected_rep2, *rep2);

    let unravel1 = RepDefUnraveler::new(
        repdef1.repetition_levels.map(|levels| levels.to_vec()),
        repdef1.definition_levels.map(|levels| levels.to_vec()),
        repdef1.def_meaning.into(),
    );
    let unravel2 = RepDefUnraveler::new(
        repdef2.repetition_levels.map(|levels| levels.to_vec()),
        repdef2.definition_levels.map(|levels| levels.to_vec()),
        repdef2.def_meaning.into(),
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![unravel1, unravel2]);

    assert!(unraveler.unravel_validity(9).is_none());

    // The second set's offsets continue from where the first set ended (5),
    // and its lists are reported as valid in the combined validity.
    let (list_offsets, list_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    let expected_offsets = offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]);
    let expected_validity = Some(validity(&[
        true, false, true, true, true, true, true, true,
    ]));
    assert_eq!(list_offsets.inner(), expected_offsets.inner());
    assert_eq!(list_validity, expected_validity);
}
2745
/// Serializing several builders together must produce one concatenated set of
/// repetition and definition levels covering both inputs.
#[test]
fn test_repdef_multiple_builders() {
    // First input: two list layers and an item layer, with nothing null.
    let mut plain = RepDefBuilder::default();
    plain.add_offsets(offsets_64(&[0, 2]), None);
    plain.add_offsets(offsets_64(&[0, 1, 3]), None);
    plain.add_validity_bitmap(validity(&[true, true, true]));

    // Second input: the same layer shape, with nulls at every layer.
    let mut nullable = RepDefBuilder::default();
    nullable.add_offsets(offsets_64(&[0, 0, 3]), Some(validity(&[false, true])));
    nullable.add_offsets(
        offsets_64(&[0, 2, 2, 6]),
        Some(validity(&[true, false, true])),
    );
    nullable.add_validity_bitmap(validity(&[false, false, false, true, true, false]));

    let serialized = RepDefBuilder::serialize(vec![plain, nullable]);
    let rep = serialized.repetition_levels.unwrap();
    let def = serialized.definition_levels.unwrap();

    // The first three slots come from `plain`, the remaining eight from
    // `nullable`.
    let expected_rep: [u16; 11] = [2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0];
    let expected_def: [u16; 11] = [0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];
    assert_eq!(expected_rep, *rep);
    assert_eq!(expected_def, *def);
}
2770
/// Slicing the serialized levels by item count must also carry along the
/// slots that belong to null or empty lists, which have no item of their own.
///
/// Both the repetition and the definition levels are sliced and must yield
/// the same byte lengths, since there is one level of each per slot.
#[test]
fn test_slicer() {
    let mut builder = RepDefBuilder::default();
    // Four lists over 30 items: [0, 2), a null list, [2, 30), an empty list.
    builder.add_offsets(
        offsets_64(&[0, 2, 2, 30, 30]),
        Some(validity(&[true, false, true, true])),
    );
    builder.add_no_null(30);

    let repdefs = RepDefBuilder::serialize(vec![builder]);

    let mut rep_slicer = repdefs.rep_slicer().unwrap();

    // Levels are u16, so two bytes each. The first 5 items span the null
    // list, giving 6 levels (12 bytes).
    assert_eq!(rep_slicer.slice_next(5).len(), 12);
    // The next 20 items are plain: 20 levels (40 bytes).
    assert_eq!(rep_slicer.slice_next(20).len(), 40);
    // The last 5 items are followed by the empty list: 6 levels (12 bytes).
    assert_eq!(rep_slicer.slice_rest().len(), 12);

    // Slice the definition levels this time. The original built this slicer
    // from `rep_slicer()` as well, so the definition levels were never
    // exercised.
    // NOTE(review): assumes `def_slicer()` exists alongside `rep_slicer()`
    // on the serialized rep/def type - confirm against its definition.
    let mut def_slicer = repdefs.def_slicer().unwrap();

    assert_eq!(def_slicer.slice_next(5).len(), 12);
    assert_eq!(def_slicer.slice_next(20).len(), 40);
    assert_eq!(def_slicer.slice_rest().len(), 12);
}
2800
/// Checks that rep/def levels are packed into control words of the expected
/// width and byte content, and that parsing those words returns the
/// original levels.
#[test]
fn test_control_words() {
    /// Packs `rep` and `def` into control words, compares the iterator's
    /// reported widths and emitted bytes with the expectations, then parses
    /// the bytes back and compares them with the inputs.
    ///
    /// An empty `rep` or `def` slice means that kind of level is absent.
    fn check(
        rep: &[u16],
        def: &[u16],
        expected_values: Vec<u8>,
        expected_bytes_per_word: usize,
        expected_bits_rep: u8,
        expected_bits_def: u8,
    ) {
        let word_count = rep.len().max(def.len());
        let max_rep = rep.iter().copied().max().unwrap_or(0);
        let max_def = def.iter().copied().max().unwrap_or(0);

        let in_rep = (!rep.is_empty()).then_some(rep);
        let in_def = (!def.is_empty()).then_some(def);

        let mut words = super::build_control_word_iterator(
            in_rep,
            max_rep,
            in_def,
            max_def,
            max_def + 1,
            expected_values.len(),
        );
        assert_eq!(words.bytes_per_word(), expected_bytes_per_word);
        assert_eq!(words.bits_rep(), expected_bits_rep);
        assert_eq!(words.bits_def(), expected_bits_def);

        let mut packed = Vec::with_capacity(word_count * words.bytes_per_word());
        for _ in 0..word_count {
            words.append_next(&mut packed);
        }
        // The iterator must be exhausted after exactly `word_count` words.
        assert!(words.append_next(&mut packed).is_none());

        assert_eq!(expected_values, packed);

        let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);

        let mut parsed_rep = Vec::with_capacity(word_count);
        let mut parsed_def = Vec::with_capacity(word_count);

        // `chunks_exact` panics on a chunk size of zero, so skip parsing when
        // the words are zero bytes wide.
        if expected_bytes_per_word != 0 {
            for word in packed.chunks_exact(expected_bytes_per_word) {
                parser.parse(word, &mut parsed_rep, &mut parsed_def);
            }
        }

        assert_eq!(rep, parsed_rep.as_slice());
        assert_eq!(def, parsed_def.as_slice());
    }

    // 4 bits of rep plus 4 bits of def fit in one byte, rep in the high bits.
    let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
    let def = &[5_u16, 3, 1, 2, 12, 15, 0, 2];
    let one_byte_words = vec![
        0b00000101, // rep 0, def 5
        0b01110011, // rep 7, def 3
        0b00110001, // rep 3, def 1
        0b00100010, // rep 2, def 2
        0b10011100, // rep 9, def 12
        0b10001111, // rep 8, def 15
        0b11000000, // rep 12, def 0
        0b01010010, // rep 5, def 2
    ];
    check(rep, def, one_byte_words, 1, 4, 4);

    // A def of 22 needs 5 bits, so each word grows to two bytes, written low
    // byte first.
    let rep = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
    let def = &[5_u16, 3, 1, 2, 12, 22, 0, 2];
    let two_byte_words = vec![
        0b00000101, 0b00000000, // rep 0, def 5
        0b11100011, 0b00000000, // rep 7, def 3
        0b01100001, 0b00000000, // rep 3, def 1
        0b01000010, 0b00000000, // rep 2, def 2
        0b00101100, 0b00000001, // rep 9, def 12
        0b00010110, 0b00000001, // rep 8, def 22
        0b10000000, 0b00000001, // rep 12, def 0
        0b10100010, 0b00000000, // rep 5, def 2
    ];
    check(rep, def, two_byte_words, 2, 4, 5);

    // With only one kind of level present, each word is just that level.
    let levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];
    let single_level_words = vec![
        0b00000000, 0b00000111, 0b00000011, 0b00000010, 0b00001001, 0b00001000, 0b00001100,
        0b00000101,
    ];
    // Rep only.
    check(levels, &[], single_level_words.clone(), 1, 4, 0);
    // Def only.
    check(&[], levels, single_level_words, 1, 0, 4);

    // Neither rep nor def: zero-width words and no bytes at all.
    check(&[], &[], Vec::new(), 0, 0, 0);
}
2904
/// Checks the `is_new_row` and `is_visible` flags reported for each control
/// word as it is appended, across every combination of rep/def presence.
#[test]
fn test_control_words_rep_index() {
    /// Builds a control word iterator over `rep`/`def` with a maximum visible
    /// definition level of 2 and compares each word's flags with the
    /// expectations.
    ///
    /// An empty `rep` or `def` slice means that kind of level is absent.
    fn check(
        rep: &[u16],
        def: &[u16],
        expected_new_rows: Vec<bool>,
        expected_is_visible: Vec<bool>,
    ) {
        let level_count = rep.len().max(def.len());
        let max_rep = rep.iter().copied().max().unwrap_or(0);
        let max_def = def.iter().copied().max().unwrap_or(0);

        let in_rep = (!rep.is_empty()).then_some(rep);
        let in_def = (!def.is_empty()).then_some(def);

        let mut words = super::build_control_word_iterator(
            in_rep,
            max_rep,
            in_def,
            max_def,
            2,
            expected_new_rows.len(),
        );

        let mut packed = Vec::with_capacity(level_count * words.bytes_per_word());
        for (idx, want_new_row) in expected_new_rows.iter().copied().enumerate() {
            let word_desc = words.append_next(&mut packed).unwrap();
            assert_eq!(word_desc.is_new_row, want_new_row);
            assert_eq!(word_desc.is_visible, expected_is_visible[idx]);
        }
        // Exactly one word per expected flag, and no more.
        assert!(words.append_next(&mut packed).is_none());
    }

    let rep = &[2_u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
    let def = &[0_u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];

    // Rep and def: rows start where rep equals the max rep level (2), and the
    // single slot with def 3 (above the visible maximum) is invisible.
    check(
        rep,
        def,
        vec![
            true, false, false, true, true, false, false, false, false, true, false,
        ],
        vec![
            true, true, true, false, true, true, true, true, true, true, true,
        ],
    );

    // Rep only: same row starts, everything visible.
    check(
        rep,
        &[],
        vec![
            true, false, false, true, true, false, false, false, false, true, false,
        ],
        vec![true; 11],
    );

    // Def only: every slot starts a new row and all are reported visible.
    check(&[], def, vec![true; 11], vec![true; 11]);

    // Neither: every slot is a new, visible row.
    check(&[], &[], vec![true; 11], vec![true; 11]);
}
2984
/// Regression: a nullable layer wrapping a list layer in which every list is
/// null and there are no items at all must still round-trip.
#[test]
fn regress_empty_list_case() {
    let mut repdef_builder = RepDefBuilder::default();
    // Outer validity layer: the middle row is null.
    repdef_builder.add_validity_bitmap(validity(&[true, false, true]));
    // List layer: three null lists, all with zero length.
    repdef_builder.add_offsets(
        offsets_32(&[0, 0, 0, 0]),
        Some(validity(&[false, false, false])),
    );
    // Item layer: no items.
    repdef_builder.add_no_null(0);

    let serialized = RepDefBuilder::serialize(vec![repdef_builder]);
    let rep = serialized.repetition_levels.unwrap();
    let def = serialized.definition_levels.unwrap();

    // One slot per row; the row that is null at the outer layer gets the
    // higher definition level.
    let expected_rep: [u16; 3] = [1, 1, 1];
    let expected_def: [u16; 3] = [1, 2, 1];
    assert_eq!(expected_rep, *rep);
    assert_eq!(expected_def, *def);

    let level_unraveler = RepDefUnraveler::new(
        Some(rep.to_vec()),
        Some(def.to_vec()),
        serialized.def_meaning.into(),
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![level_unraveler]);

    // Item layer.
    assert_eq!(unraveler.unravel_validity(0), None);

    // List layer.
    let (list_offsets, list_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    let expected_offsets = offsets_32(&[0, 0, 0, 0]);
    assert_eq!(list_offsets.inner(), expected_offsets.inner());
    assert_eq!(list_validity, Some(validity(&[false, false, false])));

    // Outer validity layer.
    let outer_validity = unraveler.unravel_validity(3).unwrap();
    let expected_outer = validity(&[true, false, true]);
    assert_eq!(outer_validity.inner(), expected_outer.inner());
}
3016
/// Regression: nested lists where both the outer and the inner list layer end
/// with a null list must round-trip.
#[test]
fn regress_list_ends_null_case() {
    let mut repdef_builder = RepDefBuilder::default();
    // Outer lists: two single-element lists followed by a null list.
    repdef_builder.add_offsets(
        offsets_64(&[0, 1, 2, 2]),
        Some(validity(&[true, true, false])),
    );
    // Inner lists: one single-item list followed by a null list.
    repdef_builder.add_offsets(offsets_64(&[0, 1, 1]), Some(validity(&[true, false])));
    repdef_builder.add_no_null(1);

    let serialized = RepDefBuilder::serialize(vec![repdef_builder]);
    let rep = serialized.repetition_levels.unwrap();
    let def = serialized.definition_levels.unwrap();

    // Every slot starts a new outer list: the item, the null inner list, and
    // the null outer list.
    let expected_rep: [u16; 3] = [2, 2, 2];
    let expected_def: [u16; 3] = [0, 1, 2];
    assert_eq!(expected_rep, *rep);
    assert_eq!(expected_def, *def);

    let level_unraveler = RepDefUnraveler::new(
        Some(rep.to_vec()),
        Some(def.to_vec()),
        serialized.def_meaning.into(),
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![level_unraveler]);

    // Item layer.
    assert_eq!(unraveler.unravel_validity(1), None);

    // Inner list layer comes back first.
    let (inner_offsets, inner_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    let expected_inner_offsets = offsets_32(&[0, 1, 1]);
    assert_eq!(inner_offsets.inner(), expected_inner_offsets.inner());
    assert_eq!(inner_validity, Some(validity(&[true, false])));

    // Then the outer list layer.
    let (outer_offsets, outer_validity) = unraveler.unravel_offsets::<i32>().unwrap();
    let expected_outer_offsets = offsets_32(&[0, 1, 2, 2]);
    assert_eq!(outer_offsets.inner(), expected_outer_offsets.inner());
    assert_eq!(outer_validity, Some(validity(&[true, true, false])));
}
3048}