1use std::{
110 iter::{Copied, Zip},
111 sync::Arc,
112};
113
114use arrow_array::OffsetSizeTrait;
115use arrow_buffer::{
116 ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
117};
118use lance_core::{utils::bit::log_2_ceil, Error, Result};
119use snafu::location;
120
121use crate::buffer::LanceBuffer;
122
/// A buffer of repetition or definition levels, one `u16` per level.
pub type LevelBuffer = Vec<u16>;

/// Definition levels strictly greater than this value are temporary "special" markers
/// written while serializing (null / empty lists, or levels hidden by them).  They are
/// skipped by the later recording passes and `normalize_specials` subtracts this value
/// back out before the levels are emitted.
const SPECIAL_THRESHOLD: u16 = u16::MAX >> 1;
135
/// A list layer, as recorded by `RepDefBuilder::add_offsets`.
#[derive(Clone, Debug)]
struct OffsetDesc {
    /// Offsets normalized to start at 0.  Null lists are given zero length here so any
    /// values they covered are excluded.
    offsets: Arc<[i64]>,
    /// Validity of each list; `None` when the layer had no validity bitmap.
    validity: Option<BooleanBuffer>,
    /// True if at least one valid list in this layer is empty.
    has_empty_lists: bool,
    /// Number of lists in this layer (one less than the number of offsets).
    num_values: usize,
    /// Number of lists that are null or empty.  Each of these contributes a single
    /// level but no items.
    num_specials: usize,
}

/// A plain validity layer (struct or leaf item nullability).
#[derive(Clone, Debug)]
struct ValidityDesc {
    /// Validity bitmap; `None` means every value is valid.
    validity: Option<BooleanBuffer>,
    /// Number of values described by this layer.
    num_values: usize,
}

/// A fixed-size-list layer.
#[derive(Clone, Debug)]
struct FslDesc {
    /// Validity of each fixed-size list; `None` means every list is valid.
    validity: Option<BooleanBuffer>,
    /// Number of child items in every fixed-size list.
    dimension: usize,
    /// Number of fixed-size lists in this layer.
    num_values: usize,
}

/// One layer of repetition / definition information, in the raw form collected by
/// `RepDefBuilder` before serialization.
#[derive(Clone, Debug)]
enum RawRepDef {
    Offsets(OffsetDesc),
    Validity(ValidityDesc),
    Fsl(FslDesc),
}
174
175impl RawRepDef {
176 fn has_nulls(&self) -> bool {
178 match self {
179 Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
180 Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
181 Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
182 }
183 }
184
185 fn num_values(&self) -> usize {
187 match self {
188 Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
189 Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
190 Self::Fsl(FslDesc { num_values, .. }) => *num_values,
191 }
192 }
193
194 fn num_specials(&self) -> usize {
196 match self {
197 Self::Offsets(OffsetDesc { num_specials, .. }) => *num_specials,
198 _ => 0,
199 }
200 }
201
202 fn max_def(&self) -> u16 {
204 match self {
205 Self::Offsets(OffsetDesc {
206 has_empty_lists,
207 validity,
208 ..
209 }) => {
210 let mut max_def = 0;
211 if *has_empty_lists {
212 max_def += 1;
213 }
214 if validity.is_some() {
215 max_def += 1;
216 }
217 max_def
218 }
219 Self::Validity(ValidityDesc { validity: None, .. }) => 0,
220 Self::Validity(ValidityDesc { .. }) => 1,
221 Self::Fsl(FslDesc { validity: None, .. }) => 0,
222 Self::Fsl(FslDesc { .. }) => 1,
223 }
224 }
225
226 fn max_rep(&self) -> u16 {
228 match self {
229 Self::Offsets(_) => 1,
230 _ => 0,
231 }
232 }
233}
234
235#[derive(Debug)]
238pub struct SerializedRepDefs {
239 pub repetition_levels: Option<Arc<[u16]>>,
243 pub definition_levels: Option<Arc<[u16]>>,
247 pub def_meaning: Vec<DefinitionInterpretation>,
249 pub max_visible_level: Option<u16>,
256}
257
258impl SerializedRepDefs {
259 pub fn new(
260 repetition_levels: Option<LevelBuffer>,
261 definition_levels: Option<LevelBuffer>,
262 def_meaning: Vec<DefinitionInterpretation>,
263 ) -> Self {
264 let first_list = def_meaning.iter().position(|level| level.is_list());
265 let max_visible_level = first_list.map(|first_list| {
266 def_meaning
267 .iter()
268 .map(|level| level.num_def_levels())
269 .take(first_list)
270 .sum::<u16>()
271 });
272 Self {
273 repetition_levels: repetition_levels.map(Arc::from),
274 definition_levels: definition_levels.map(Arc::from),
275 def_meaning,
276 max_visible_level,
277 }
278 }
279
280 pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
282 Self {
283 repetition_levels: None,
284 definition_levels: None,
285 def_meaning,
286 max_visible_level: None,
287 }
288 }
289
290 pub fn rep_slicer(&self) -> Option<RepDefSlicer> {
291 self.repetition_levels
292 .as_ref()
293 .map(|rep| RepDefSlicer::new(self, rep.clone()))
294 }
295
296 pub fn def_slicer(&self) -> Option<RepDefSlicer> {
297 self.definition_levels
298 .as_ref()
299 .map(|def| RepDefSlicer::new(self, def.clone()))
300 }
301}
302
303#[derive(Debug)]
311pub struct RepDefSlicer<'a> {
312 repdef: &'a SerializedRepDefs,
313 to_slice: LanceBuffer,
314 current: usize,
315}
316
317impl<'a> RepDefSlicer<'a> {
319 fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
320 Self {
321 repdef,
322 to_slice: LanceBuffer::reinterpret_slice(levels),
323 current: 0,
324 }
325 }
326
327 pub fn num_levels(&self) -> usize {
328 self.to_slice.len() / 2
329 }
330
331 pub fn num_levels_remaining(&self) -> usize {
332 self.num_levels() - self.current
333 }
334
335 pub fn all_levels(&self) -> &LanceBuffer {
336 &self.to_slice
337 }
338
339 pub fn slice_rest(&mut self) -> LanceBuffer {
348 let start = self.current;
349 let remaining = self.num_levels_remaining();
350 self.current = self.num_levels();
351 self.to_slice.slice_with_length(start * 2, remaining * 2)
352 }
353
354 pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
356 let start = self.current;
357 let Some(max_visible_level) = self.repdef.max_visible_level else {
358 self.current = start + num_values;
360 return self.to_slice.slice_with_length(start * 2, num_values * 2);
361 };
362 if let Some(def) = self.repdef.definition_levels.as_ref() {
363 let mut def_itr = def[start..].iter();
367 let mut num_taken = 0;
368 let mut num_passed = 0;
369 while num_taken < num_values {
370 let def_level = *def_itr.next().unwrap();
371 if def_level <= max_visible_level {
372 num_taken += 1;
373 }
374 num_passed += 1;
375 }
376 self.current = start + num_passed;
377 self.to_slice.slice_with_length(start * 2, num_passed * 2)
378 } else {
379 self.current = start + num_values;
381 self.to_slice.slice_with_length(start * 2, num_values * 2)
382 }
383 }
384}
385
/// How the definition levels belonging to one layer should be interpreted.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DefinitionInterpretation {
    AllValidItem,
    AllValidList,
    NullableItem,
    NullableList,
    EmptyableList,
    NullableAndEmptyableList,
}

impl DefinitionInterpretation {
    /// Number of definition levels this layer occupies.
    pub fn num_def_levels(&self) -> u16 {
        match self {
            Self::AllValidItem | Self::AllValidList => 0,
            Self::NullableItem | Self::NullableList | Self::EmptyableList => 1,
            Self::NullableAndEmptyableList => 2,
        }
    }

    /// True if this layer can never contain nulls.  Empty lists are not nulls.
    pub fn is_all_valid(&self) -> bool {
        match self {
            Self::AllValidItem | Self::AllValidList | Self::EmptyableList => true,
            Self::NullableItem | Self::NullableList | Self::NullableAndEmptyableList => false,
        }
    }

    /// True for every list flavour; false for item layers.
    pub fn is_list(&self) -> bool {
        !matches!(self, Self::AllValidItem | Self::NullableItem)
    }
}
440
/// Working state used while turning a stack of `RawRepDef` layers into flat repetition
/// and definition level buffers.
#[derive(Debug)]
struct SerializerContext {
    // Interpretation of each layer, pushed in the order layers are recorded
    // (`build` reverses it before emitting).
    def_meaning: Vec<DefinitionInterpretation>,
    // Current repetition levels; left empty by `new` when max_rep is 0.
    rep_levels: LevelBuffer,
    // Scratch buffer each pass writes into before it is swapped with `rep_levels`.
    spare_rep: LevelBuffer,
    // Current definition levels; left empty by `new` when max_def is 0.
    def_levels: LevelBuffer,
    // Scratch buffer each pass writes into before it is swapped with `def_levels`.
    spare_def: LevelBuffer,
    // Next repetition level to hand out; starts at max_rep and drops per list layer.
    current_rep: u16,
    // Next definition level to hand out; starts at max_def and drops per layer.
    current_def: u16,
    // Length bookkeeping for the levels written so far.  Note: a validity pass sets
    // this to the validity length, which does not include special levels.
    current_len: usize,
    // Number of special (null / empty list) levels inserted so far.
    current_num_specials: usize,
}
465
466impl SerializerContext {
467 fn new(len: usize, num_layers: usize, max_rep: u16, max_def: u16) -> Self {
468 let def_meaning = Vec::with_capacity(num_layers);
469 Self {
470 rep_levels: if max_rep > 0 {
471 vec![0; len]
472 } else {
473 LevelBuffer::default()
474 },
475 spare_rep: if max_rep > 0 {
476 vec![0; len]
477 } else {
478 LevelBuffer::default()
479 },
480 def_levels: if max_def > 0 {
481 vec![0; len]
482 } else {
483 LevelBuffer::default()
484 },
485 spare_def: if max_def > 0 {
486 vec![0; len]
487 } else {
488 LevelBuffer::default()
489 },
490 def_meaning,
491 current_rep: max_rep,
492 current_def: max_def,
493 current_len: 0,
494 current_num_specials: 0,
495 }
496 }
497
498 fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
499 let def = self.current_def;
500 self.current_def -= meaning.num_def_levels();
501 self.def_meaning.push(meaning);
502 def
503 }
504
505 fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
506 let rep_level = self.current_rep;
507 let (null_list_level, empty_list_level) =
508 match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
509 (true, true) => {
510 let level =
511 self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
512 (level - 1, level)
513 }
514 (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
515 (false, true) => (
516 0,
517 self.checkout_def(DefinitionInterpretation::EmptyableList),
518 ),
519 (false, false) => {
520 self.checkout_def(DefinitionInterpretation::AllValidList);
521 (0, 0)
522 }
523 };
524 self.current_rep -= 1;
525
526 if let Some(validity) = &offset_desc.validity {
527 self.do_record_validity(validity, null_list_level);
528 }
529
530 let mut new_len = 0;
535 assert!(self.rep_levels.len() >= offset_desc.num_values - 1 + self.current_num_specials);
536 if self.def_levels.is_empty() {
537 let mut write_itr = self.spare_rep.iter_mut();
538 let mut read_iter = self.rep_levels.iter().copied();
539 for w in offset_desc.offsets.windows(2) {
540 let len = w[1] - w[0];
541 assert!(len > 0);
543 let rep = read_iter.next().unwrap();
544 let list_level = if rep == 0 { rep_level } else { rep };
545 *write_itr.next().unwrap() = list_level;
546
547 for _ in 1..len {
548 *write_itr.next().unwrap() = 0;
549 }
550 new_len += len as usize;
551 }
552 std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
553 } else {
554 assert!(
555 self.def_levels.len() >= offset_desc.num_values - 1 + self.current_num_specials
556 );
557 let mut def_write_itr = self.spare_def.iter_mut();
558 let mut rep_write_itr = self.spare_rep.iter_mut();
559 let mut rep_read_itr = self.rep_levels.iter().copied();
560 let mut def_read_itr = self.def_levels.iter().copied();
561 let specials_to_pass = self.current_num_specials;
562 let mut specials_passed = 0;
563
564 for w in offset_desc.offsets.windows(2) {
565 let mut def = def_read_itr.next().unwrap();
566 while def > SPECIAL_THRESHOLD {
568 *def_write_itr.next().unwrap() = def;
569 *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
570 def = def_read_itr.next().unwrap();
571 new_len += 1;
572 specials_passed += 1;
573 }
574
575 let len = w[1] - w[0];
576 let rep = rep_read_itr.next().unwrap();
577
578 let list_level = if rep == 0 { rep_level } else { rep };
582
583 if def == 0 && len > 0 {
584 *def_write_itr.next().unwrap() = 0;
586 *rep_write_itr.next().unwrap() = list_level;
587
588 for _ in 1..len {
589 *def_write_itr.next().unwrap() = 0;
590 *rep_write_itr.next().unwrap() = 0;
591 }
592
593 new_len += len as usize;
594 } else if def == 0 {
595 *def_write_itr.next().unwrap() = empty_list_level + SPECIAL_THRESHOLD;
597 *rep_write_itr.next().unwrap() = list_level;
598 new_len += 1;
599 } else {
600 *def_write_itr.next().unwrap() = def + SPECIAL_THRESHOLD;
603 *rep_write_itr.next().unwrap() = list_level;
604 new_len += 1;
605 }
606 }
607
608 while specials_passed < specials_to_pass {
610 *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
611 *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
612 new_len += 1;
613 specials_passed += 1;
614 }
615 std::mem::swap(&mut self.def_levels, &mut self.spare_def);
616 std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
617 }
618
619 self.current_len = new_len;
620 self.current_num_specials += offset_desc.num_specials;
621 }
622
623 fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
624 assert!(self.def_levels.len() >= validity.len() + self.current_num_specials);
625 debug_assert!(
626 self.current_len == 0 || self.current_len == validity.len() + self.current_num_specials
627 );
628 self.current_len = validity.len();
629
630 let mut def_read_itr = self.def_levels.iter().copied();
631 let mut def_write_itr = self.spare_def.iter_mut();
632
633 let specials_to_pass = self.current_num_specials;
634 let mut specials_passed = 0;
635
636 for incoming_validity in validity.iter() {
637 let mut def = def_read_itr.next().unwrap();
638 while def > SPECIAL_THRESHOLD {
639 *def_write_itr.next().unwrap() = def;
640 def = def_read_itr.next().unwrap();
641 specials_passed += 1;
642 }
643 if def == 0 && !incoming_validity {
644 *def_write_itr.next().unwrap() = null_level;
645 } else {
646 *def_write_itr.next().unwrap() = def;
647 }
648 }
649
650 while specials_passed < specials_to_pass {
651 *def_write_itr.next().unwrap() = def_read_itr.next().unwrap();
652 specials_passed += 1;
653 }
654
655 std::mem::swap(&mut self.def_levels, &mut self.spare_def);
656 }
657
658 fn multiply_levels(&mut self, multiplier: usize) {
659 let old_len = self.current_len;
660 self.current_len =
662 (self.current_len - self.current_num_specials) * multiplier + self.current_num_specials;
663
664 if self.rep_levels.is_empty() && self.def_levels.is_empty() {
665 return;
667 } else if self.rep_levels.is_empty() {
668 assert!(self.def_levels.len() >= self.current_len);
669 let mut def_read_itr = self.def_levels.iter().copied();
671 let mut def_write_itr = self.spare_def.iter_mut();
672 for _ in 0..old_len {
673 let mut def = def_read_itr.next().unwrap();
674 while def > SPECIAL_THRESHOLD {
675 *def_write_itr.next().unwrap() = def;
676 def = def_read_itr.next().unwrap();
677 }
678 for _ in 0..multiplier {
679 *def_write_itr.next().unwrap() = def;
680 }
681 }
682 } else if self.def_levels.is_empty() {
683 assert!(self.rep_levels.len() >= self.current_len);
684 let mut rep_read_itr = self.rep_levels.iter().copied();
686 let mut rep_write_itr = self.spare_rep.iter_mut();
687 for _ in 0..old_len {
688 let rep = rep_read_itr.next().unwrap();
689 for _ in 0..multiplier {
690 *rep_write_itr.next().unwrap() = rep;
691 }
692 }
693 } else {
694 assert!(self.rep_levels.len() >= self.current_len);
695 assert!(self.def_levels.len() >= self.current_len);
696 let mut rep_read_itr = self.rep_levels.iter().copied();
697 let mut def_read_itr = self.def_levels.iter().copied();
698 let mut rep_write_itr = self.spare_rep.iter_mut();
699 let mut def_write_itr = self.spare_def.iter_mut();
700 for _ in 0..old_len {
701 let mut def = def_read_itr.next().unwrap();
702 while def > SPECIAL_THRESHOLD {
703 *def_write_itr.next().unwrap() = def;
704 *rep_write_itr.next().unwrap() = rep_read_itr.next().unwrap();
705 def = def_read_itr.next().unwrap();
706 }
707 let rep = rep_read_itr.next().unwrap();
708 for _ in 0..multiplier {
709 *def_write_itr.next().unwrap() = def;
710 *rep_write_itr.next().unwrap() = rep;
711 }
712 }
713 }
714 std::mem::swap(&mut self.def_levels, &mut self.spare_def);
715 std::mem::swap(&mut self.rep_levels, &mut self.spare_rep);
716 }
717
718 fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
719 if let Some(validity) = validity {
720 let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
721 self.do_record_validity(validity, def_level);
722 } else {
723 self.checkout_def(DefinitionInterpretation::AllValidItem);
724 }
725 }
726
727 fn record_validity(&mut self, validity_desc: &ValidityDesc) {
728 self.record_validity_buf(&validity_desc.validity)
729 }
730
731 fn record_fsl(&mut self, fsl_desc: &FslDesc) {
732 self.record_validity_buf(&fsl_desc.validity);
733 self.multiply_levels(fsl_desc.dimension);
734 }
735
736 fn normalize_specials(&mut self) {
737 for def in self.def_levels.iter_mut() {
738 if *def > SPECIAL_THRESHOLD {
739 *def -= SPECIAL_THRESHOLD;
740 }
741 }
742 }
743
744 fn build(mut self) -> SerializedRepDefs {
745 if self.current_len == 0 {
746 return SerializedRepDefs::new(None, None, self.def_meaning);
747 }
748
749 self.normalize_specials();
750
751 let definition_levels = if self.def_levels.is_empty() {
752 None
753 } else {
754 Some(self.def_levels)
755 };
756 let repetition_levels = if self.rep_levels.is_empty() {
757 None
758 } else {
759 Some(self.rep_levels)
760 };
761
762 let def_meaning = self.def_meaning.into_iter().rev().collect::<Vec<_>>();
764
765 SerializedRepDefs::new(repetition_levels, definition_levels, def_meaning)
766 }
767}
768
/// Collects the repetition / definition layers of one array (or array chunk) so they
/// can later be combined and serialized with `RepDefBuilder::serialize`.
#[derive(Clone, Default, Debug)]
pub struct RepDefBuilder {
    // One entry per layer, in the order the layers were added.
    repdefs: Vec<RawRepDef>,
    // Number of values the next layer is expected to describe, once known.
    len: Option<usize>,
}
785
786impl RepDefBuilder {
787 fn check_validity_len(&mut self, incoming_len: usize) {
788 if let Some(len) = self.len {
789 assert_eq!(incoming_len, len);
790 } else {
791 self.len = Some(incoming_len);
793 }
794 }
795
796 fn num_layers(&self) -> usize {
797 self.repdefs.len()
798 }
799
800 fn is_empty(&self) -> bool {
803 self.repdefs
804 .iter()
805 .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
806 }
807
808 pub fn is_simple_validity(&self) -> bool {
810 self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
811 }
812
813 pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
815 self.check_validity_len(validity.len());
816 self.repdefs.push(RawRepDef::Validity(ValidityDesc {
817 num_values: validity.len(),
818 validity: Some(validity.into_inner()),
819 }));
820 }
821
822 pub fn add_no_null(&mut self, len: usize) {
824 self.check_validity_len(len);
825 self.repdefs.push(RawRepDef::Validity(ValidityDesc {
826 validity: None,
827 num_values: len,
828 }));
829 }
830
831 pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
832 if let Some(len) = self.len {
833 assert_eq!(num_values, len);
834 }
835 self.len = Some(num_values * dimension);
836 debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
837 self.repdefs.push(RawRepDef::Fsl(FslDesc {
838 num_values,
839 validity: validity.map(|v| v.into_inner()),
840 dimension,
841 }))
842 }
843
844 fn check_offset_len(&mut self, offsets: &[i64]) {
845 if let Some(len) = self.len {
846 assert!(offsets.len() == len + 1);
847 }
848 self.len = Some(offsets[offsets.len() - 1] as usize);
849 }
850
851 pub fn add_offsets<O: OffsetSizeTrait>(
858 &mut self,
859 offsets: OffsetBuffer<O>,
860 validity: Option<NullBuffer>,
861 ) -> bool {
862 let mut has_garbage_values = false;
863 let mut num_specials = 0;
864 if O::IS_LARGE {
865 let inner = offsets.into_inner();
866 let len = inner.len();
867 let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, len);
868 let mut normalized = Vec::with_capacity(len);
869 normalized.push(0_i64);
870 let mut has_empty_lists = false;
871 let mut last_off = 0;
872 if let Some(validity) = validity.as_ref() {
873 for (off, valid) in i64_buff.windows(2).zip(validity.iter()) {
874 let len: i64 = off[1] - off[0];
875 match (valid, len == 0) {
876 (false, is_empty) => {
877 num_specials += 1;
878 has_garbage_values |= !is_empty;
879 }
880 (true, true) => {
881 num_specials += 1;
882 has_empty_lists = true;
883 }
884 _ => {
885 last_off += len;
886 }
887 }
888 normalized.push(last_off);
889 }
890 } else {
891 for off in i64_buff.windows(2) {
892 let len: i64 = off[1] - off[0];
893 if len == 0 {
894 num_specials += 1;
895 has_empty_lists = true;
896 }
897 last_off += len;
898 normalized.push(last_off);
899 }
900 };
901 self.check_offset_len(&normalized);
902 self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
903 num_values: normalized.len() - 1,
904 offsets: normalized.into(),
905 validity: validity.map(|v| v.into_inner()),
906 has_empty_lists,
907 num_specials,
908 }));
909 has_garbage_values
910 } else {
911 let inner = offsets.into_inner();
912 let len = inner.len();
913 let scalar_off = ScalarBuffer::<i32>::new(inner.into_inner(), 0, len);
914 let mut casted = Vec::with_capacity(len);
915 casted.push(0);
916 let mut has_empty_lists = false;
917 let mut num_specials = 0;
918 let mut last_off: i64 = 0;
919 if let Some(validity) = validity.as_ref() {
920 for (off, valid) in scalar_off.windows(2).zip(validity.iter()) {
921 let len = (off[1] - off[0]) as i64;
922 match (valid, len == 0) {
923 (false, is_empty) => {
924 num_specials += 1;
925 has_garbage_values |= !is_empty;
926 }
927 (true, true) => {
928 num_specials += 1;
929 has_empty_lists = true;
930 }
931 _ => {
932 last_off += len;
933 }
934 }
935 casted.push(last_off);
936 }
937 } else {
938 for off in scalar_off.windows(2) {
939 let len = (off[1] - off[0]) as i64;
940 if len == 0 {
941 num_specials += 1;
942 has_empty_lists = true;
943 }
944 last_off += len;
945 casted.push(last_off);
946 }
947 };
948 self.check_offset_len(&casted);
949 self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
950 num_values: casted.len() - 1,
951 offsets: casted.into(),
952 validity: validity.map(|v| v.into_inner()),
953 has_empty_lists,
954 num_specials,
955 }));
956 has_garbage_values
957 }
958 }
959
960 fn concat_layers<'a>(
972 layers: impl Iterator<Item = &'a RawRepDef>,
973 num_layers: usize,
974 ) -> RawRepDef {
975 enum LayerKind {
976 Validity,
977 Fsl,
978 Offsets,
979 }
980
981 let mut collected = Vec::with_capacity(num_layers);
984 let mut has_nulls = false;
985 let mut layer_kind = LayerKind::Validity;
986 let mut total_num_specials = 0;
987 let mut all_dimension = 0;
988 let mut all_has_empty_lists = false;
989 let mut all_num_values = 0;
990 for layer in layers {
991 has_nulls |= layer.has_nulls();
992 match layer {
993 RawRepDef::Validity(_) => {
994 layer_kind = LayerKind::Validity;
995 }
996 RawRepDef::Offsets(OffsetDesc {
997 num_specials,
998 has_empty_lists,
999 ..
1000 }) => {
1001 all_has_empty_lists |= *has_empty_lists;
1002 layer_kind = LayerKind::Offsets;
1003 total_num_specials += num_specials;
1004 }
1005 RawRepDef::Fsl(FslDesc { dimension, .. }) => {
1006 layer_kind = LayerKind::Fsl;
1007 all_dimension = *dimension;
1008 }
1009 }
1010 collected.push(layer);
1011 all_num_values += layer.num_values();
1012 }
1013
1014 if !has_nulls {
1016 match layer_kind {
1017 LayerKind::Validity => {
1018 return RawRepDef::Validity(ValidityDesc {
1019 validity: None,
1020 num_values: all_num_values,
1021 });
1022 }
1023 LayerKind::Fsl => {
1024 return RawRepDef::Fsl(FslDesc {
1025 validity: None,
1026 num_values: all_num_values,
1027 dimension: all_dimension,
1028 })
1029 }
1030 LayerKind::Offsets => {}
1031 }
1032 }
1033
1034 let mut validity_builder = if has_nulls {
1036 BooleanBufferBuilder::new(all_num_values)
1037 } else {
1038 BooleanBufferBuilder::new(0)
1039 };
1040 let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
1041 let mut all_offsets = Vec::with_capacity(all_num_values);
1042 all_offsets.push(0);
1043 all_offsets
1044 } else {
1045 Vec::new()
1046 };
1047
1048 for layer in collected {
1049 match layer {
1050 RawRepDef::Validity(ValidityDesc {
1051 validity: Some(validity),
1052 ..
1053 }) => {
1054 validity_builder.append_buffer(validity);
1055 }
1056 RawRepDef::Validity(ValidityDesc {
1057 validity: None,
1058 num_values,
1059 }) => {
1060 validity_builder.append_n(*num_values, true);
1061 }
1062 RawRepDef::Fsl(FslDesc {
1063 validity,
1064 num_values,
1065 ..
1066 }) => {
1067 if let Some(validity) = validity {
1068 validity_builder.append_buffer(validity);
1069 } else {
1070 validity_builder.append_n(*num_values, true);
1071 }
1072 }
1073 RawRepDef::Offsets(OffsetDesc {
1074 offsets,
1075 validity: Some(validity),
1076 has_empty_lists,
1077 ..
1078 }) => {
1079 all_has_empty_lists |= has_empty_lists;
1080 validity_builder.append_buffer(validity);
1081 let last = *all_offsets.last().unwrap();
1082 all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1083 }
1084 RawRepDef::Offsets(OffsetDesc {
1085 offsets,
1086 validity: None,
1087 has_empty_lists,
1088 num_values,
1089 ..
1090 }) => {
1091 all_has_empty_lists |= has_empty_lists;
1092 if has_nulls {
1093 validity_builder.append_n(*num_values, true);
1094 }
1095 let last = *all_offsets.last().unwrap();
1096 all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1097 }
1098 }
1099 }
1100 let validity = if has_nulls {
1101 Some(validity_builder.finish())
1102 } else {
1103 None
1104 };
1105 match layer_kind {
1106 LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
1107 validity,
1108 num_values: all_num_values,
1109 dimension: all_dimension,
1110 }),
1111 LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
1112 validity,
1113 num_values: all_num_values,
1114 }),
1115 LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
1116 offsets: all_offsets.into(),
1117 validity,
1118 has_empty_lists: all_has_empty_lists,
1119 num_values: all_num_values,
1120 num_specials: total_num_specials,
1121 }),
1122 }
1123 }
1124
1125 pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
1128 assert!(!builders.is_empty());
1129 if builders.iter().all(|b| b.is_empty()) {
1130 return SerializedRepDefs::empty(
1132 builders
1133 .first()
1134 .unwrap()
1135 .repdefs
1136 .iter()
1137 .map(|_| DefinitionInterpretation::AllValidItem)
1138 .collect::<Vec<_>>(),
1139 );
1140 }
1141
1142 let num_layers = builders[0].num_layers();
1143 let combined_layers = (0..num_layers)
1144 .map(|layer_index| {
1145 Self::concat_layers(
1146 builders.iter().map(|b| &b.repdefs[layer_index]),
1147 builders.len(),
1148 )
1149 })
1150 .collect::<Vec<_>>();
1151 debug_assert!(builders
1152 .iter()
1153 .all(|b| b.num_layers() == builders[0].num_layers()));
1154
1155 let total_len = combined_layers.last().unwrap().num_values()
1156 + combined_layers
1157 .iter()
1158 .map(|l| l.num_specials())
1159 .sum::<usize>();
1160 let max_rep = combined_layers.iter().map(|l| l.max_rep()).sum::<u16>();
1161 let max_def = combined_layers.iter().map(|l| l.max_def()).sum::<u16>();
1162
1163 let mut context = SerializerContext::new(total_len, num_layers, max_rep, max_def);
1164 for layer in combined_layers.into_iter() {
1165 match layer {
1166 RawRepDef::Validity(def) => {
1167 context.record_validity(&def);
1168 }
1169 RawRepDef::Offsets(rep) => {
1170 context.record_offsets(&rep);
1171 }
1172 RawRepDef::Fsl(fsl) => {
1173 context.record_fsl(&fsl);
1174 }
1175 }
1176 }
1177 context.build()
1178 }
1179}
1180
/// Converts repetition / definition levels back into validity bitmaps and offsets,
/// one layer at a time.
#[derive(Debug)]
pub struct RepDefUnraveler {
    // Remaining repetition levels; `None` when the data has no repetition levels.
    rep_levels: Option<LevelBuffer>,
    // Remaining definition levels; `None` when the data has no definition levels.
    def_levels: Option<LevelBuffer>,
    // For each definition level, the repetition depth from which it is visible
    // (built in `new`).
    levels_to_rep: Vec<u16>,
    // Interpretation of each layer, indexed by `current_layer`.
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Highest definition level that still counts as valid at the current layer.
    current_def_cmp: u16,
    // Number of list layers unraveled so far.
    current_rep_cmp: u16,
    // Index into `def_meaning` of the next layer to unravel.
    current_layer: usize,
}
1200
1201impl RepDefUnraveler {
1202 pub fn new(
1204 rep_levels: Option<LevelBuffer>,
1205 def_levels: Option<LevelBuffer>,
1206 def_meaning: Arc<[DefinitionInterpretation]>,
1207 ) -> Self {
1208 let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
1209 let mut rep_counter = 0;
1210 levels_to_rep.push(0);
1212 for meaning in def_meaning.as_ref() {
1213 match meaning {
1214 DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
1215 }
1217 DefinitionInterpretation::NullableItem => {
1218 levels_to_rep.push(rep_counter);
1220 }
1221 DefinitionInterpretation::NullableList => {
1222 rep_counter += 1;
1223 levels_to_rep.push(rep_counter);
1224 }
1225 DefinitionInterpretation::EmptyableList => {
1226 rep_counter += 1;
1227 levels_to_rep.push(rep_counter);
1228 }
1229 DefinitionInterpretation::NullableAndEmptyableList => {
1230 rep_counter += 1;
1231 levels_to_rep.push(rep_counter);
1232 levels_to_rep.push(rep_counter);
1233 }
1234 }
1235 }
1236 Self {
1237 rep_levels,
1238 def_levels,
1239 current_def_cmp: 0,
1240 current_rep_cmp: 0,
1241 levels_to_rep,
1242 current_layer: 0,
1243 def_meaning,
1244 }
1245 }
1246
1247 pub fn is_all_valid(&self) -> bool {
1248 self.def_meaning[self.current_layer].is_all_valid()
1249 }
1250
1251 pub fn max_lists(&self) -> usize {
1257 debug_assert!(
1258 self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
1259 );
1260 self.rep_levels
1261 .as_ref()
1262 .map(|levels| levels.len())
1264 .unwrap_or(0)
1265 }
1266
1267 pub fn unravel_offsets<T: ArrowNativeType>(
1272 &mut self,
1273 offsets: &mut Vec<T>,
1274 validity: Option<&mut BooleanBufferBuilder>,
1275 ) -> Result<()> {
1276 let rep_levels = self
1277 .rep_levels
1278 .as_mut()
1279 .expect("Expected repetition level but data didn't contain repetition");
1280 let valid_level = self.current_def_cmp;
1281 let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
1282 DefinitionInterpretation::NullableList => {
1283 self.current_def_cmp += 1;
1284 (valid_level + 1, 0)
1285 }
1286 DefinitionInterpretation::EmptyableList => {
1287 self.current_def_cmp += 1;
1288 (0, valid_level + 1)
1289 }
1290 DefinitionInterpretation::NullableAndEmptyableList => {
1291 self.current_def_cmp += 2;
1292 (valid_level + 1, valid_level + 2)
1293 }
1294 DefinitionInterpretation::AllValidList => (0, 0),
1295 _ => unreachable!(),
1296 };
1297 self.current_layer += 1;
1298
1299 let mut max_level = null_level.max(empty_level);
1303 let upper_null = max_level;
1306 for level in self.def_meaning[self.current_layer..].iter() {
1307 match level {
1308 DefinitionInterpretation::NullableItem => {
1309 max_level += 1;
1310 }
1311 DefinitionInterpretation::AllValidItem => {}
1312 _ => {
1313 break;
1314 }
1315 }
1316 }
1317
1318 let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);
1319
1320 offsets.pop();
1328
1329 let to_offset = |val: usize| {
1330 T::from_usize(val)
1331 .ok_or_else(|| Error::invalid_input("A single batch had more than i32::MAX values and so a large container type is required", location!()))
1332 };
1333 self.current_rep_cmp += 1;
1334 if let Some(def_levels) = &mut self.def_levels {
1335 assert!(rep_levels.len() == def_levels.len());
1336 let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
1339 Box::new(|is_valid| validity.append(is_valid))
1340 } else {
1341 Box::new(|_| {})
1342 };
1343 let mut read_idx = 0;
1347 let mut write_idx = 0;
1348 while read_idx < rep_levels.len() {
1349 unsafe {
1352 let rep_val = *rep_levels.get_unchecked(read_idx);
1353 if rep_val != 0 {
1354 let def_val = *def_levels.get_unchecked(read_idx);
1355 *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1357 *def_levels.get_unchecked_mut(write_idx) = def_val;
1358 write_idx += 1;
1359
1360 if def_val == 0 {
1361 offsets.push(to_offset(curlen)?);
1363 curlen += 1;
1364 push_validity(true);
1365 } else if def_val > max_level {
1366 } else if def_val == null_level || def_val > upper_null {
1368 offsets.push(to_offset(curlen)?);
1370 push_validity(false);
1371 } else if def_val == empty_level {
1372 offsets.push(to_offset(curlen)?);
1374 push_validity(true);
1375 } else {
1376 offsets.push(to_offset(curlen)?);
1378 curlen += 1;
1379 push_validity(true);
1380 }
1381 } else {
1382 curlen += 1;
1383 }
1384 read_idx += 1;
1385 }
1386 }
1387 offsets.push(to_offset(curlen)?);
1388 rep_levels.truncate(write_idx);
1389 def_levels.truncate(write_idx);
1390 Ok(())
1391 } else {
1392 let mut read_idx = 0;
1394 let mut write_idx = 0;
1395 let old_offsets_len = offsets.len();
1396 while read_idx < rep_levels.len() {
1397 unsafe {
1399 let rep_val = *rep_levels.get_unchecked(read_idx);
1400 if rep_val != 0 {
1401 offsets.push(to_offset(curlen)?);
1403 *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1404 write_idx += 1;
1405 }
1406 curlen += 1;
1407 read_idx += 1;
1408 }
1409 }
1410 let num_new_lists = offsets.len() - old_offsets_len;
1411 offsets.push(to_offset(curlen)?);
1412 rep_levels.truncate(offsets.len() - 1);
1413 if let Some(validity) = validity {
1414 validity.append_n(num_new_lists, true);
1417 }
1418 Ok(())
1419 }
1420 }
1421
1422 pub fn skip_validity(&mut self) {
1423 debug_assert!(
1424 self.def_meaning[self.current_layer] == DefinitionInterpretation::AllValidItem
1425 );
1426 self.current_layer += 1;
1427 }
1428
1429 pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
1431 debug_assert!(
1432 self.def_meaning[self.current_layer] != DefinitionInterpretation::AllValidItem
1433 );
1434 self.current_layer += 1;
1435
1436 let def_levels = &self.def_levels.as_ref().unwrap();
1437
1438 let current_def_cmp = self.current_def_cmp;
1439 self.current_def_cmp += 1;
1440
1441 for is_valid in def_levels.iter().filter_map(|&level| {
1442 if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
1443 Some(level <= current_def_cmp)
1444 } else {
1445 None
1446 }
1447 }) {
1448 validity.append(is_valid);
1449 }
1450 }
1451
1452 pub fn decimate(&mut self, dimension: usize) {
1453 if self.rep_levels.is_some() {
1454 todo!("Not yet supported FSL<...List<...>>");
1466 }
1467 let Some(def_levels) = self.def_levels.as_mut() else {
1468 return;
1469 };
1470 let mut read_idx = 0;
1471 let mut write_idx = 0;
1472 while read_idx < def_levels.len() {
1473 unsafe {
1474 *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
1475 }
1476 write_idx += 1;
1477 read_idx += dimension;
1478 }
1479 def_levels.truncate(write_idx);
1480 }
1481}
1482
/// Unravels repetition/definition levels spread over several [`RepDefUnraveler`]s.
///
/// Every operation is applied to each wrapped unraveler in order, and their outputs
/// (validity bits, offsets) are concatenated into a single result.
#[derive(Debug)]
pub struct CompositeRepDefUnraveler {
    // Unravelers whose outputs are concatenated, in this order.
    unravelers: Vec<RepDefUnraveler>,
}
1500
1501impl CompositeRepDefUnraveler {
1502 pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
1503 Self { unravelers }
1504 }
1505
1506 pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
1510 let is_all_valid = self
1511 .unravelers
1512 .iter()
1513 .all(|unraveler| unraveler.is_all_valid());
1514
1515 if is_all_valid {
1516 for unraveler in self.unravelers.iter_mut() {
1517 unraveler.skip_validity();
1518 }
1519 None
1520 } else {
1521 let mut validity = BooleanBufferBuilder::new(num_values);
1522 for unraveler in self.unravelers.iter_mut() {
1523 unraveler.unravel_validity(&mut validity);
1524 }
1525 Some(NullBuffer::new(validity.finish()))
1526 }
1527 }
1528
1529 pub fn unravel_fsl_validity(
1530 &mut self,
1531 num_values: usize,
1532 dimension: usize,
1533 ) -> Option<NullBuffer> {
1534 for unraveler in self.unravelers.iter_mut() {
1535 unraveler.decimate(dimension);
1536 }
1537 self.unravel_validity(num_values)
1538 }
1539
1540 pub fn unravel_offsets<T: ArrowNativeType>(
1542 &mut self,
1543 ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
1544 let mut is_all_valid = true;
1545 let mut max_num_lists = 0;
1546 for unraveler in self.unravelers.iter() {
1547 is_all_valid &= unraveler.is_all_valid();
1548 max_num_lists += unraveler.max_lists();
1549 }
1550
1551 let mut validity = if is_all_valid {
1552 None
1553 } else {
1554 Some(BooleanBufferBuilder::new(max_num_lists))
1557 };
1558
1559 let mut offsets = Vec::with_capacity(max_num_lists + 1);
1560
1561 for unraveler in self.unravelers.iter_mut() {
1562 unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
1563 }
1564
1565 Ok((
1566 OffsetBuffer::new(ScalarBuffer::from(offsets)),
1567 validity.map(|mut v| NullBuffer::new(v.finish())),
1568 ))
1569 }
1570}
1571
/// Encodes paired (repetition, definition) levels into control words of type `W`.
///
/// Each word holds the masked repetition level shifted left by `def_width`, plus the
/// masked definition level in the low `def_width` bits. `W` (`u8`, `u16` or `u32`)
/// selects which `append_next` implementation, and therefore which word width, is used.
#[derive(Debug)]
pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
    // Source of `(repetition level, definition level)` pairs.
    repdef: I,
    // Number of low bits reserved for the definition level (shift applied to the rep level).
    def_width: usize,
    // A repetition level equal to this marks the start of a new row.
    max_rep: u16,
    // Definition levels at or below this are reported as visible.
    max_visible_def: u16,
    // Masks applied to the repetition / definition level before packing.
    rep_mask: u16,
    def_mask: u16,
    // Bit widths reported through `ControlWordIterator::bits_rep` / `bits_def`.
    bits_rep: u8,
    bits_def: u8,
    // Marker for the control word type; no value of `W` is stored.
    phantom: std::marker::PhantomData<W>,
}
1589
1590impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
1591 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1592 let next = self.repdef.next()?;
1593 let control_word: u8 =
1594 (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
1595 buf.push(control_word);
1596 let is_new_row = next.0 == self.max_rep;
1597 let is_visible = next.1 <= self.max_visible_def;
1598 let is_valid_item = next.1 == 0;
1599 Some(ControlWordDesc {
1600 is_new_row,
1601 is_visible,
1602 is_valid_item,
1603 })
1604 }
1605}
1606
1607impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
1608 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1609 let next = self.repdef.next()?;
1610 let control_word: u16 =
1611 ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
1612 let control_word = control_word.to_le_bytes();
1613 buf.push(control_word[0]);
1614 buf.push(control_word[1]);
1615 let is_new_row = next.0 == self.max_rep;
1616 let is_visible = next.1 <= self.max_visible_def;
1617 let is_valid_item = next.1 == 0;
1618 Some(ControlWordDesc {
1619 is_new_row,
1620 is_visible,
1621 is_valid_item,
1622 })
1623 }
1624}
1625
1626impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
1627 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1628 let next = self.repdef.next()?;
1629 let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
1630 + ((next.1 & self.def_mask) as u32);
1631 let control_word = control_word.to_le_bytes();
1632 buf.push(control_word[0]);
1633 buf.push(control_word[1]);
1634 buf.push(control_word[2]);
1635 buf.push(control_word[3]);
1636 let is_new_row = next.0 == self.max_rep;
1637 let is_visible = next.1 <= self.max_visible_def;
1638 let is_valid_item = next.1 == 0;
1639 Some(ControlWordDesc {
1640 is_new_row,
1641 is_visible,
1642 is_valid_item,
1643 })
1644 }
1645}
1646
/// Encodes a single stream of levels (repetition only, or definition only) into control
/// words of type `W` (`u8`, `u16` or `u32`).
#[derive(Debug)]
pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
    // Source of levels.
    repdef: I,
    // Mask applied to each level before it is written.
    level_mask: u16,
    // Bit widths reported through `ControlWordIterator`; exactly one of them is non-zero
    // when built by `build_control_word_iterator`. `bits_def == 0` makes every item valid.
    bits_rep: u8,
    bits_def: u8,
    // A level equal to this marks a new row; 0 means every entry is a new row.
    max_rep: u16,
    // Marker for the control word type; no value of `W` is stored.
    phantom: std::marker::PhantomData<W>,
}
1657
1658impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
1659 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1660 let next = self.repdef.next()?;
1661 buf.push((next & self.level_mask) as u8);
1662 let is_new_row = self.max_rep == 0 || next == self.max_rep;
1663 let is_valid_item = next == 0 || self.bits_def == 0;
1664 Some(ControlWordDesc {
1665 is_new_row,
1666 is_visible: true,
1669 is_valid_item,
1670 })
1671 }
1672}
1673
1674impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
1675 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1676 let next = self.repdef.next().unwrap() & self.level_mask;
1677 let control_word = next.to_le_bytes();
1678 buf.push(control_word[0]);
1679 buf.push(control_word[1]);
1680 let is_new_row = self.max_rep == 0 || next == self.max_rep;
1681 let is_valid_item = next == 0 || self.bits_def == 0;
1682 Some(ControlWordDesc {
1683 is_new_row,
1684 is_visible: true,
1685 is_valid_item,
1686 })
1687 }
1688}
1689
1690impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
1691 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1692 let next = self.repdef.next()?;
1693 let next = (next & self.level_mask) as u32;
1694 let control_word = next.to_le_bytes();
1695 buf.push(control_word[0]);
1696 buf.push(control_word[1]);
1697 buf.push(control_word[2]);
1698 buf.push(control_word[3]);
1699 let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
1700 let is_valid_item = next == 0 || self.bits_def == 0;
1701 Some(ControlWordDesc {
1702 is_new_row,
1703 is_visible: true,
1704 is_valid_item,
1705 })
1706 }
1707}
1708
/// Control word "iterator" for data with neither repetition nor definition levels.
///
/// It writes no bytes and only counts out `len` entries.
#[derive(Debug)]
pub struct NilaryControlWordIterator {
    // Total number of entries to yield.
    len: usize,
    // Number of entries yielded so far.
    idx: usize,
}
1715
1716impl NilaryControlWordIterator {
1717 fn append_next(&mut self) -> Option<ControlWordDesc> {
1718 if self.idx == self.len {
1719 None
1720 } else {
1721 self.idx += 1;
1722 Some(ControlWordDesc {
1723 is_new_row: true,
1724 is_visible: true,
1725 is_valid_item: true,
1726 })
1727 }
1728 }
1729}
1730
/// Returns a `u16` mask with the low `width` bits set (`0` for `width == 0`).
///
/// `width` may be anything from 0 to 16. The shift is done in `u32` so that a full 16-bit
/// mask does not overflow (`1u16 << 16` would).
fn get_mask(width: u16) -> u16 {
    debug_assert!(width <= 16, "mask width {width} exceeds 16 bits");
    ((1u32 << width) - 1) as u16
}
1735
/// A [`BinaryControlWordIterator`] that zips borrowed repetition and definition level
/// slices; `T` is the control word type (`u8`, `u16` or `u32`).
type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
    T,
>;
1742
/// Encodes repetition and/or definition levels as fixed-width control words.
///
/// * `Binary*`: each word packs a repetition level and a definition level.
/// * `Unary*`: each word holds a single kind of level (repetition or definition).
/// * `Nilary`: there are no levels; no bytes are written.
///
/// The numeric suffix is the word width in bits (8, 16 or 32).
#[derive(Debug)]
pub enum ControlWordIterator<'a> {
    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
    Nilary(NilaryControlWordIterator),
}
1762
/// Summary of a single control word, as produced while encoding or parsing.
#[derive(Debug)]
pub struct ControlWordDesc {
    /// The repetition level equals the maximum repetition level (always true when there
    /// are no repetition levels), i.e. this entry starts a new row.
    pub is_new_row: bool,
    /// The definition level is at or below the maximum visible definition level (always
    /// true unless both repetition and definition levels are present).
    pub is_visible: bool,
    /// The definition level is 0 (always true when there are no definition levels).
    pub is_valid_item: bool,
}
1770
1771impl ControlWordIterator<'_> {
1772 pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1776 match self {
1777 Self::Binary8(iter) => iter.append_next(buf),
1778 Self::Binary16(iter) => iter.append_next(buf),
1779 Self::Binary32(iter) => iter.append_next(buf),
1780 Self::Unary8(iter) => iter.append_next(buf),
1781 Self::Unary16(iter) => iter.append_next(buf),
1782 Self::Unary32(iter) => iter.append_next(buf),
1783 Self::Nilary(iter) => iter.append_next(),
1784 }
1785 }
1786
1787 pub fn has_repetition(&self) -> bool {
1789 match self {
1790 Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
1791 Self::Unary8(iter) => iter.bits_rep > 0,
1792 Self::Unary16(iter) => iter.bits_rep > 0,
1793 Self::Unary32(iter) => iter.bits_rep > 0,
1794 Self::Nilary(_) => false,
1795 }
1796 }
1797
1798 pub fn bytes_per_word(&self) -> usize {
1800 match self {
1801 Self::Binary8(_) => 1,
1802 Self::Binary16(_) => 2,
1803 Self::Binary32(_) => 4,
1804 Self::Unary8(_) => 1,
1805 Self::Unary16(_) => 2,
1806 Self::Unary32(_) => 4,
1807 Self::Nilary(_) => 0,
1808 }
1809 }
1810
1811 pub fn bits_rep(&self) -> u8 {
1813 match self {
1814 Self::Binary8(iter) => iter.bits_rep,
1815 Self::Binary16(iter) => iter.bits_rep,
1816 Self::Binary32(iter) => iter.bits_rep,
1817 Self::Unary8(iter) => iter.bits_rep,
1818 Self::Unary16(iter) => iter.bits_rep,
1819 Self::Unary32(iter) => iter.bits_rep,
1820 Self::Nilary(_) => 0,
1821 }
1822 }
1823
1824 pub fn bits_def(&self) -> u8 {
1826 match self {
1827 Self::Binary8(iter) => iter.bits_def,
1828 Self::Binary16(iter) => iter.bits_def,
1829 Self::Binary32(iter) => iter.bits_def,
1830 Self::Unary8(iter) => iter.bits_def,
1831 Self::Unary16(iter) => iter.bits_def,
1832 Self::Unary32(iter) => iter.bits_def,
1833 Self::Nilary(_) => 0,
1834 }
1835 }
1836}
1837
1838pub fn build_control_word_iterator<'a>(
1842 rep: Option<&'a [u16]>,
1843 max_rep: u16,
1844 def: Option<&'a [u16]>,
1845 max_def: u16,
1846 max_visible_def: u16,
1847 len: usize,
1848) -> ControlWordIterator<'a> {
1849 let rep_width = if max_rep == 0 {
1850 0
1851 } else {
1852 log_2_ceil(max_rep as u32) as u16
1853 };
1854 let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
1855 let def_width = if max_def == 0 {
1856 0
1857 } else {
1858 log_2_ceil(max_def as u32) as u16
1859 };
1860 let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
1861 let total_width = rep_width + def_width;
1862 match (rep, def) {
1863 (Some(rep), Some(def)) => {
1864 let iter = rep.iter().copied().zip(def.iter().copied());
1865 let def_width = def_width as usize;
1866 if total_width <= 8 {
1867 ControlWordIterator::Binary8(BinaryControlWordIterator {
1868 repdef: iter,
1869 rep_mask,
1870 def_mask,
1871 def_width,
1872 max_rep,
1873 max_visible_def,
1874 bits_rep: rep_width as u8,
1875 bits_def: def_width as u8,
1876 phantom: std::marker::PhantomData,
1877 })
1878 } else if total_width <= 16 {
1879 ControlWordIterator::Binary16(BinaryControlWordIterator {
1880 repdef: iter,
1881 rep_mask,
1882 def_mask,
1883 def_width,
1884 max_rep,
1885 max_visible_def,
1886 bits_rep: rep_width as u8,
1887 bits_def: def_width as u8,
1888 phantom: std::marker::PhantomData,
1889 })
1890 } else {
1891 ControlWordIterator::Binary32(BinaryControlWordIterator {
1892 repdef: iter,
1893 rep_mask,
1894 def_mask,
1895 def_width,
1896 max_rep,
1897 max_visible_def,
1898 bits_rep: rep_width as u8,
1899 bits_def: def_width as u8,
1900 phantom: std::marker::PhantomData,
1901 })
1902 }
1903 }
1904 (Some(lev), None) => {
1905 let iter = lev.iter().copied();
1906 if total_width <= 8 {
1907 ControlWordIterator::Unary8(UnaryControlWordIterator {
1908 repdef: iter,
1909 level_mask: rep_mask,
1910 bits_rep: total_width as u8,
1911 bits_def: 0,
1912 max_rep,
1913 phantom: std::marker::PhantomData,
1914 })
1915 } else if total_width <= 16 {
1916 ControlWordIterator::Unary16(UnaryControlWordIterator {
1917 repdef: iter,
1918 level_mask: rep_mask,
1919 bits_rep: total_width as u8,
1920 bits_def: 0,
1921 max_rep,
1922 phantom: std::marker::PhantomData,
1923 })
1924 } else {
1925 ControlWordIterator::Unary32(UnaryControlWordIterator {
1926 repdef: iter,
1927 level_mask: rep_mask,
1928 bits_rep: total_width as u8,
1929 bits_def: 0,
1930 max_rep,
1931 phantom: std::marker::PhantomData,
1932 })
1933 }
1934 }
1935 (None, Some(lev)) => {
1936 let iter = lev.iter().copied();
1937 if total_width <= 8 {
1938 ControlWordIterator::Unary8(UnaryControlWordIterator {
1939 repdef: iter,
1940 level_mask: def_mask,
1941 bits_rep: 0,
1942 bits_def: total_width as u8,
1943 max_rep: 0,
1944 phantom: std::marker::PhantomData,
1945 })
1946 } else if total_width <= 16 {
1947 ControlWordIterator::Unary16(UnaryControlWordIterator {
1948 repdef: iter,
1949 level_mask: def_mask,
1950 bits_rep: 0,
1951 bits_def: total_width as u8,
1952 max_rep: 0,
1953 phantom: std::marker::PhantomData,
1954 })
1955 } else {
1956 ControlWordIterator::Unary32(UnaryControlWordIterator {
1957 repdef: iter,
1958 level_mask: def_mask,
1959 bits_rep: 0,
1960 bits_def: total_width as u8,
1961 max_rep: 0,
1962 phantom: std::marker::PhantomData,
1963 })
1964 }
1965 }
1966 (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
1967 }
1968}
1969
/// Decodes control words laid out the way [`ControlWordIterator`] writes them.
///
/// The numeric suffix is the word width in bits; words are little endian.
/// `BOTH*` variants hold `(bits_to_shift, mask_to_apply)`: the repetition level is the word
/// shifted right by `bits_to_shift`, and the definition level is the word masked with
/// `mask_to_apply`. [`ControlWordParser::new`] sets these to the definition bit width and
/// the matching low-bit mask.
#[derive(Copy, Clone, Debug)]
pub enum ControlWordParser {
    // Words carrying both a repetition and a definition level.
    BOTH8(u8, u32),
    BOTH16(u8, u32),
    BOTH32(u8, u32),
    // Words carrying only a repetition level.
    REP8,
    REP16,
    REP32,
    // Words carrying only a definition level.
    DEF8,
    DEF16,
    DEF32,
    // No control words at all (zero bytes per word).
    NIL,
}
1988
1989impl ControlWordParser {
1990 fn parse_both<const WORD_SIZE: u8>(
1991 src: &[u8],
1992 dst_rep: &mut Vec<u16>,
1993 dst_def: &mut Vec<u16>,
1994 bits_to_shift: u8,
1995 mask_to_apply: u32,
1996 ) {
1997 match WORD_SIZE {
1998 1 => {
1999 let word = src[0];
2000 let rep = word >> bits_to_shift;
2001 let def = word & (mask_to_apply as u8);
2002 dst_rep.push(rep as u16);
2003 dst_def.push(def as u16);
2004 }
2005 2 => {
2006 let word = u16::from_le_bytes([src[0], src[1]]);
2007 let rep = word >> bits_to_shift;
2008 let def = word & mask_to_apply as u16;
2009 dst_rep.push(rep);
2010 dst_def.push(def);
2011 }
2012 4 => {
2013 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2014 let rep = word >> bits_to_shift;
2015 let def = word & mask_to_apply;
2016 dst_rep.push(rep as u16);
2017 dst_def.push(def as u16);
2018 }
2019 _ => unreachable!(),
2020 }
2021 }
2022
2023 fn parse_desc_both<const WORD_SIZE: u8>(
2024 src: &[u8],
2025 bits_to_shift: u8,
2026 mask_to_apply: u32,
2027 max_rep: u16,
2028 max_visible_def: u16,
2029 ) -> ControlWordDesc {
2030 match WORD_SIZE {
2031 1 => {
2032 let word = src[0];
2033 let rep = word >> bits_to_shift;
2034 let def = word & (mask_to_apply as u8);
2035 let is_visible = def as u16 <= max_visible_def;
2036 let is_new_row = rep as u16 == max_rep;
2037 let is_valid_item = def == 0;
2038 ControlWordDesc {
2039 is_visible,
2040 is_new_row,
2041 is_valid_item,
2042 }
2043 }
2044 2 => {
2045 let word = u16::from_le_bytes([src[0], src[1]]);
2046 let rep = word >> bits_to_shift;
2047 let def = word & mask_to_apply as u16;
2048 let is_visible = def <= max_visible_def;
2049 let is_new_row = rep == max_rep;
2050 let is_valid_item = def == 0;
2051 ControlWordDesc {
2052 is_visible,
2053 is_new_row,
2054 is_valid_item,
2055 }
2056 }
2057 4 => {
2058 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2059 let rep = word >> bits_to_shift;
2060 let def = word & mask_to_apply;
2061 let is_visible = def as u16 <= max_visible_def;
2062 let is_new_row = rep as u16 == max_rep;
2063 let is_valid_item = def == 0;
2064 ControlWordDesc {
2065 is_visible,
2066 is_new_row,
2067 is_valid_item,
2068 }
2069 }
2070 _ => unreachable!(),
2071 }
2072 }
2073
2074 fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
2075 match WORD_SIZE {
2076 1 => {
2077 let word = src[0];
2078 dst.push(word as u16);
2079 }
2080 2 => {
2081 let word = u16::from_le_bytes([src[0], src[1]]);
2082 dst.push(word);
2083 }
2084 4 => {
2085 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2086 dst.push(word as u16);
2087 }
2088 _ => unreachable!(),
2089 }
2090 }
2091
2092 fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
2093 match WORD_SIZE {
2094 1 => ControlWordDesc {
2095 is_new_row: src[0] as u16 == max_rep,
2096 is_visible: true,
2097 is_valid_item: true,
2098 },
2099 2 => ControlWordDesc {
2100 is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
2101 is_visible: true,
2102 is_valid_item: true,
2103 },
2104 4 => ControlWordDesc {
2105 is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
2106 is_visible: true,
2107 is_valid_item: true,
2108 },
2109 _ => unreachable!(),
2110 }
2111 }
2112
2113 fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
2114 match WORD_SIZE {
2115 1 => ControlWordDesc {
2116 is_new_row: true,
2117 is_visible: true,
2118 is_valid_item: src[0] == 0,
2119 },
2120 2 => ControlWordDesc {
2121 is_new_row: true,
2122 is_visible: true,
2123 is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
2124 },
2125 4 => ControlWordDesc {
2126 is_new_row: true,
2127 is_visible: true,
2128 is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
2129 },
2130 _ => unreachable!(),
2131 }
2132 }
2133
2134 pub fn bytes_per_word(&self) -> usize {
2136 match self {
2137 Self::BOTH8(..) => 1,
2138 Self::BOTH16(..) => 2,
2139 Self::BOTH32(..) => 4,
2140 Self::REP8 => 1,
2141 Self::REP16 => 2,
2142 Self::REP32 => 4,
2143 Self::DEF8 => 1,
2144 Self::DEF16 => 2,
2145 Self::DEF32 => 4,
2146 Self::NIL => 0,
2147 }
2148 }
2149
2150 pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
2157 match self {
2158 Self::BOTH8(bits_to_shift, mask_to_apply) => {
2159 Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2160 }
2161 Self::BOTH16(bits_to_shift, mask_to_apply) => {
2162 Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2163 }
2164 Self::BOTH32(bits_to_shift, mask_to_apply) => {
2165 Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2166 }
2167 Self::REP8 => Self::parse_one::<1>(src, dst_rep),
2168 Self::REP16 => Self::parse_one::<2>(src, dst_rep),
2169 Self::REP32 => Self::parse_one::<4>(src, dst_rep),
2170 Self::DEF8 => Self::parse_one::<1>(src, dst_def),
2171 Self::DEF16 => Self::parse_one::<2>(src, dst_def),
2172 Self::DEF32 => Self::parse_one::<4>(src, dst_def),
2173 Self::NIL => {}
2174 }
2175 }
2176
2177 pub fn has_rep(&self) -> bool {
2179 match self {
2180 Self::BOTH8(..)
2181 | Self::BOTH16(..)
2182 | Self::BOTH32(..)
2183 | Self::REP8
2184 | Self::REP16
2185 | Self::REP32 => true,
2186 Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
2187 }
2188 }
2189
2190 pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
2192 match self {
2193 Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
2194 src,
2195 *bits_to_shift,
2196 *mask_to_apply,
2197 max_rep,
2198 max_visible_def,
2199 ),
2200 Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
2201 src,
2202 *bits_to_shift,
2203 *mask_to_apply,
2204 max_rep,
2205 max_visible_def,
2206 ),
2207 Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
2208 src,
2209 *bits_to_shift,
2210 *mask_to_apply,
2211 max_rep,
2212 max_visible_def,
2213 ),
2214 Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
2215 Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
2216 Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
2217 Self::DEF8 => Self::parse_def_desc_one::<1>(src),
2218 Self::DEF16 => Self::parse_def_desc_one::<2>(src),
2219 Self::DEF32 => Self::parse_def_desc_one::<4>(src),
2220 Self::NIL => ControlWordDesc {
2221 is_new_row: true,
2222 is_valid_item: true,
2223 is_visible: true,
2224 },
2225 }
2226 }
2227
2228 pub fn new(bits_rep: u8, bits_def: u8) -> Self {
2230 let total_bits = bits_rep + bits_def;
2231
2232 enum WordSize {
2233 One,
2234 Two,
2235 Four,
2236 }
2237
2238 let word_size = if total_bits <= 8 {
2239 WordSize::One
2240 } else if total_bits <= 16 {
2241 WordSize::Two
2242 } else {
2243 WordSize::Four
2244 };
2245
2246 match (bits_rep > 0, bits_def > 0, word_size) {
2247 (false, false, _) => Self::NIL,
2248 (false, true, WordSize::One) => Self::DEF8,
2249 (false, true, WordSize::Two) => Self::DEF16,
2250 (false, true, WordSize::Four) => Self::DEF32,
2251 (true, false, WordSize::One) => Self::REP8,
2252 (true, false, WordSize::Two) => Self::REP16,
2253 (true, false, WordSize::Four) => Self::REP32,
2254 (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
2255 (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
2256 (true, true, WordSize::Four) => {
2257 Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
2258 }
2259 }
2260 }
2261}
2262
2263#[cfg(test)]
2264mod tests {
2265 use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
2266
2267 use crate::repdef::{
2268 CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
2269 };
2270
2271 use super::RepDefBuilder;
2272
2273 fn validity(values: &[bool]) -> NullBuffer {
2274 NullBuffer::from_iter(values.iter().copied())
2275 }
2276
2277 fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
2278 OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
2279 }
2280
2281 fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
2282 OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
2283 }
2284
2285 #[test]
2286 fn test_repdef_basic() {
2287 let mut builder = RepDefBuilder::default();
2289 builder.add_offsets(
2290 offsets_64(&[0, 2, 2, 5]),
2291 Some(validity(&[true, false, true])),
2292 );
2293 builder.add_offsets(
2294 offsets_64(&[0, 1, 3, 5, 5, 9]),
2295 Some(validity(&[true, true, true, false, true])),
2296 );
2297 builder.add_validity_bitmap(validity(&[
2298 true, true, true, false, false, false, true, true, false,
2299 ]));
2300
2301 let repdefs = RepDefBuilder::serialize(vec![builder]);
2302 let rep = repdefs.repetition_levels.unwrap();
2303 let def = repdefs.definition_levels.unwrap();
2304
2305 assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
2306 assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);
2307
2308 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2311 Some(rep.as_ref().to_vec()),
2312 Some(def.as_ref().to_vec()),
2313 repdefs.def_meaning.into(),
2314 )]);
2315
2316 assert_eq!(
2319 unraveler.unravel_validity(9),
2320 Some(validity(&[
2321 true, true, true, false, false, false, true, true, false
2322 ]))
2323 );
2324 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2325 assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 5, 9]).inner());
2326 assert_eq!(val, Some(validity(&[true, true, true, false, true])));
2327 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2328 assert_eq!(off.inner(), offsets_32(&[0, 2, 2, 5]).inner());
2329 assert_eq!(val, Some(validity(&[true, false, true])));
2330 }
2331
2332 #[test]
2333 fn test_repdef_simple_null_empty_list() {
2334 let check = |repdefs: SerializedRepDefs, last_def: DefinitionInterpretation| {
2335 let rep = repdefs.repetition_levels.unwrap();
2336 let def = repdefs.definition_levels.unwrap();
2337
2338 assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2339 assert_eq!([0, 0, 2, 0, 1, 0], *def);
2340 assert_eq!(
2341 vec![DefinitionInterpretation::NullableItem, last_def,],
2342 repdefs.def_meaning
2343 );
2344 };
2345
2346 let mut builder = RepDefBuilder::default();
2350 builder.add_offsets(
2351 offsets_32(&[0, 2, 2, 5]),
2352 Some(validity(&[true, false, true])),
2353 );
2354 builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2355
2356 let repdefs = RepDefBuilder::serialize(vec![builder]);
2357
2358 check(repdefs, DefinitionInterpretation::NullableList);
2359
2360 let mut builder = RepDefBuilder::default();
2362 builder.add_offsets(offsets_32(&[0, 2, 2, 5]), None);
2363 builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2364
2365 let repdefs = RepDefBuilder::serialize(vec![builder]);
2366
2367 check(repdefs, DefinitionInterpretation::EmptyableList);
2368 }
2369
2370 #[test]
2371 fn test_repdef_empty_list_at_end() {
2372 let mut builder = RepDefBuilder::default();
2374 builder.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
2375 builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
2376
2377 let repdefs = RepDefBuilder::serialize(vec![builder]);
2378
2379 let rep = repdefs.repetition_levels.unwrap();
2380 let def = repdefs.definition_levels.unwrap();
2381
2382 assert_eq!([1, 0, 1, 0, 0, 1], *rep);
2383 assert_eq!([0, 0, 0, 1, 0, 2], *def);
2384 assert_eq!(
2385 vec![
2386 DefinitionInterpretation::NullableItem,
2387 DefinitionInterpretation::EmptyableList,
2388 ],
2389 repdefs.def_meaning
2390 );
2391 }
2392
2393 #[test]
2394 fn test_repdef_abnormal_nulls() {
2395 let mut builder = RepDefBuilder::default();
2398 builder.add_offsets(
2399 offsets_32(&[0, 2, 5, 8]),
2400 Some(validity(&[true, false, true])),
2401 );
2402 builder.add_no_null(5);
2405
2406 let repdefs = RepDefBuilder::serialize(vec![builder]);
2407
2408 let rep = repdefs.repetition_levels.unwrap();
2409 let def = repdefs.definition_levels.unwrap();
2410
2411 assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2412 assert_eq!([0, 0, 1, 0, 0, 0], *def);
2413
2414 assert_eq!(
2415 vec![
2416 DefinitionInterpretation::AllValidItem,
2417 DefinitionInterpretation::NullableList,
2418 ],
2419 repdefs.def_meaning
2420 );
2421 }
2422
2423 #[test]
2424 fn test_repdef_fsl() {
2425 let mut builder = RepDefBuilder::default();
2426 builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2427 builder.add_fsl(None, 2, 4);
2428 builder.add_validity_bitmap(validity(&[
2429 true, false, true, false, true, false, true, false,
2430 ]));
2431
2432 let repdefs = RepDefBuilder::serialize(vec![builder]);
2433
2434 assert_eq!(
2435 vec![
2436 DefinitionInterpretation::NullableItem,
2437 DefinitionInterpretation::AllValidItem,
2438 DefinitionInterpretation::NullableItem
2439 ],
2440 repdefs.def_meaning
2441 );
2442
2443 assert!(repdefs.repetition_levels.is_none());
2444
2445 let def = repdefs.definition_levels.unwrap();
2446
2447 assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def);
2448
2449 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2450 None,
2451 Some(def.as_ref().to_vec()),
2452 repdefs.def_meaning.into(),
2453 )]);
2454
2455 assert_eq!(
2456 unraveler.unravel_validity(8),
2457 Some(validity(&[
2458 true, false, true, false, false, false, false, false
2459 ]))
2460 );
2461 assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2462 assert_eq!(
2463 unraveler.unravel_fsl_validity(2, 2),
2464 Some(validity(&[true, false]))
2465 );
2466 }
2467
2468 #[test]
2469 fn test_repdef_fsl_allvalid_item() {
2470 let mut builder = RepDefBuilder::default();
2471 builder.add_fsl(Some(validity(&[true, false])), 2, 2);
2472 builder.add_fsl(None, 2, 4);
2473 builder.add_no_null(8);
2474
2475 let repdefs = RepDefBuilder::serialize(vec![builder]);
2476
2477 assert_eq!(
2478 vec![
2479 DefinitionInterpretation::AllValidItem,
2480 DefinitionInterpretation::AllValidItem,
2481 DefinitionInterpretation::NullableItem
2482 ],
2483 repdefs.def_meaning
2484 );
2485
2486 assert!(repdefs.repetition_levels.is_none());
2487
2488 let def = repdefs.definition_levels.unwrap();
2489
2490 assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def);
2491
2492 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2493 None,
2494 Some(def.as_ref().to_vec()),
2495 repdefs.def_meaning.into(),
2496 )]);
2497
2498 assert_eq!(unraveler.unravel_validity(8), None);
2499 assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
2500 assert_eq!(
2501 unraveler.unravel_fsl_validity(2, 2),
2502 Some(validity(&[true, false]))
2503 );
2504 }
2505
2506 #[test]
2507 fn test_repdef_sliced_offsets() {
2508 let mut builder = RepDefBuilder::default();
2511 builder.add_offsets(
2512 offsets_32(&[5, 7, 7, 10]),
2513 Some(validity(&[true, false, true])),
2514 );
2515 builder.add_no_null(5);
2516
2517 let repdefs = RepDefBuilder::serialize(vec![builder]);
2518
2519 let rep = repdefs.repetition_levels.unwrap();
2520 let def = repdefs.definition_levels.unwrap();
2521
2522 assert_eq!([1, 0, 1, 1, 0, 0], *rep);
2523 assert_eq!([0, 0, 1, 0, 0, 0], *def);
2524
2525 assert_eq!(
2526 vec![
2527 DefinitionInterpretation::AllValidItem,
2528 DefinitionInterpretation::NullableList,
2529 ],
2530 repdefs.def_meaning
2531 );
2532 }
2533
2534 #[test]
2535 fn test_repdef_complex_null_empty() {
2536 let mut builder = RepDefBuilder::default();
2537 builder.add_offsets(
2538 offsets_32(&[0, 4, 4, 4, 6]),
2539 Some(validity(&[true, false, true, true])),
2540 );
2541 builder.add_offsets(
2542 offsets_32(&[0, 1, 1, 2, 2, 2, 3]),
2543 Some(validity(&[true, false, true, false, true, true])),
2544 );
2545 builder.add_no_null(3);
2546
2547 let repdefs = RepDefBuilder::serialize(vec![builder]);
2548
2549 let rep = repdefs.repetition_levels.unwrap();
2550 let def = repdefs.definition_levels.unwrap();
2551
2552 assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep);
2553 assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def);
2554 }
2555
2556 #[test]
2557 fn test_repdef_empty_list_no_null() {
2558 let mut builder = RepDefBuilder::default();
2561 builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2562 builder.add_no_null(6);
2563
2564 let repdefs = RepDefBuilder::serialize(vec![builder]);
2565
2566 let rep = repdefs.repetition_levels.unwrap();
2567 let def = repdefs.definition_levels.unwrap();
2568
2569 assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2570 assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2571
2572 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2573 Some(rep.as_ref().to_vec()),
2574 Some(def.as_ref().to_vec()),
2575 repdefs.def_meaning.into(),
2576 )]);
2577
2578 assert_eq!(unraveler.unravel_validity(6), None);
2579 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2580 assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2581 assert_eq!(val, None);
2582 }
2583
2584 #[test]
2585 fn test_repdef_all_valid() {
2586 let mut builder = RepDefBuilder::default();
2587 builder.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
2588 builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
2589 builder.add_no_null(9);
2590
2591 let repdefs = RepDefBuilder::serialize(vec![builder]);
2592 let rep = repdefs.repetition_levels.unwrap();
2593 assert!(repdefs.definition_levels.is_none());
2594
2595 assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep);
2596
2597 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2598 Some(rep.as_ref().to_vec()),
2599 None,
2600 repdefs.def_meaning.into(),
2601 )]);
2602
2603 assert_eq!(unraveler.unravel_validity(9), None);
2604 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2605 assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 7, 9]).inner());
2606 assert_eq!(val, None);
2607 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2608 assert_eq!(off.inner(), offsets_32(&[0, 2, 3, 5]).inner());
2609 assert_eq!(val, None);
2610 }
2611
2612 #[test]
2613 fn test_only_empty_lists() {
2614 let mut builder = RepDefBuilder::default();
2615 builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
2616 builder.add_no_null(6);
2617
2618 let repdefs = RepDefBuilder::serialize(vec![builder]);
2619
2620 let rep = repdefs.repetition_levels.unwrap();
2621 let def = repdefs.definition_levels.unwrap();
2622
2623 assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
2624 assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);
2625
2626 let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
2627 Some(rep.as_ref().to_vec()),
2628 Some(def.as_ref().to_vec()),
2629 repdefs.def_meaning.into(),
2630 )]);
2631
2632 assert_eq!(unraveler.unravel_validity(6), None);
2633 let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
2634 assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
2635 assert_eq!(val, None);
2636 }
2637
/// Null lists are encoded through the definition levels and come back as the
/// list layer's validity when unraveled.
#[test]
fn test_only_null_lists() {
    let list_validity = || validity(&[true, false, false, true]);

    let mut rd_builder = RepDefBuilder::default();
    rd_builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), Some(list_validity()));
    rd_builder.add_no_null(6);

    let serialized = RepDefBuilder::serialize(vec![rd_builder]);
    let (rep_levels, def_levels) = (
        serialized.repetition_levels.unwrap(),
        serialized.definition_levels.unwrap(),
    );

    // The two null lists (1 and 2) each occupy one level with def 1.
    assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep_levels);
    assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def_levels);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        Some(rep_levels.to_vec()),
        Some(def_levels.to_vec()),
        serialized.def_meaning.into(),
    )]);

    assert_eq!(unraveler.unravel_validity(6), None);
    let (offsets, nulls) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(offsets.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
    assert_eq!(nulls, Some(list_validity()));
}
2666
/// A null list and an empty list in the same layer must get distinct
/// definition levels so both can be reconstructed.
#[test]
fn test_null_and_empty_lists() {
    let list_validity = || validity(&[true, false, true, true]);

    let mut rd_builder = RepDefBuilder::default();
    rd_builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), Some(list_validity()));
    rd_builder.add_no_null(6);

    let serialized = RepDefBuilder::serialize(vec![rd_builder]);
    let (rep_levels, def_levels) = (
        serialized.repetition_levels.unwrap(),
        serialized.definition_levels.unwrap(),
    );

    // List 1 is null (def 1) while list 2 is valid but empty (def 2).
    assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep_levels);
    assert_eq!([0, 0, 0, 0, 1, 2, 0, 0], *def_levels);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        Some(rep_levels.to_vec()),
        Some(def_levels.to_vec()),
        serialized.def_meaning.into(),
    )]);

    assert_eq!(unraveler.unravel_validity(6), None);
    let (offsets, nulls) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(offsets.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
    assert_eq!(nulls, Some(list_validity()));
}
2695
/// With no list layers only definition levels are produced, and the validity
/// layers are recovered in reverse order of insertion.
#[test]
fn test_repdef_no_rep() {
    let mut rd_builder = RepDefBuilder::default();
    rd_builder.add_no_null(5);
    rd_builder.add_validity_bitmap(validity(&[false, false, true, true, true]));
    rd_builder.add_validity_bitmap(validity(&[false, true, true, true, false]));

    let serialized = RepDefBuilder::serialize(vec![rd_builder]);
    assert!(serialized.repetition_levels.is_none());
    let def_levels = serialized.definition_levels.unwrap();
    assert_eq!([2, 2, 0, 0, 1], *def_levels);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        None,
        Some(def_levels.to_vec()),
        serialized.def_meaning.into(),
    )]);

    // The last bitmap added comes back first. Item 1 is reported null there
    // even though that bitmap marked it valid, because the layer added before
    // it marked item 1 null. The no-null layer comes back last as `None`.
    let expected = [
        Some(validity(&[false, false, true, true, false])),
        Some(validity(&[false, false, true, true, true])),
        None,
    ];
    for want in expected {
        assert_eq!(unraveler.unravel_validity(5), want);
    }
}
2725
/// Two independently serialized pages are unraveled as one. The second page's
/// offsets are rebased after the first page's items, and its missing validity
/// is reported as all-valid.
#[test]
fn test_composite_unravel() {
    // Page 1: three lists over 5 items; the middle list is null.
    let mut first_page = RepDefBuilder::default();
    first_page.add_offsets(
        offsets_64(&[0, 2, 2, 5]),
        Some(validity(&[true, false, true])),
    );
    first_page.add_no_null(5);
    let repdef1 = RepDefBuilder::serialize(vec![first_page]);

    // Page 2: five non-null lists over 9 items.
    let mut second_page = RepDefBuilder::default();
    second_page.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
    second_page.add_no_null(9);
    let repdef2 = RepDefBuilder::serialize(vec![second_page]);

    assert_eq!(
        [1, 0, 1, 1, 0, 0],
        **repdef1.repetition_levels.as_ref().unwrap()
    );
    assert_eq!(
        [0, 0, 1, 0, 0, 0],
        **repdef1.definition_levels.as_ref().unwrap()
    );
    assert_eq!(
        [1, 1, 0, 1, 0, 1, 0, 1, 0],
        **repdef2.repetition_levels.as_ref().unwrap()
    );
    assert!(repdef2.definition_levels.is_none());

    let unravelers = vec![repdef1, repdef2]
        .into_iter()
        .map(|page| {
            RepDefUnraveler::new(
                page.repetition_levels.map(|levels| levels.to_vec()),
                page.definition_levels.map(|levels| levels.to_vec()),
                page.def_meaning.into(),
            )
        })
        .collect::<Vec<_>>();
    let mut unraveler = CompositeRepDefUnraveler::new(unravelers);

    assert!(unraveler.unravel_validity(9).is_none());
    let (offsets, nulls) = unraveler.unravel_offsets::<i32>().unwrap();
    // Page 2's offsets (1, 3, 5, 7, 9) appear shifted by page 1's 5 items.
    assert_eq!(
        offsets.inner(),
        offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]).inner()
    );
    // Page 2 carries no validity of its own, so its five lists read as valid.
    assert_eq!(
        nulls,
        Some(validity(&[true, false, true, true, true, true, true, true]))
    );
}
2774
/// Levels from several builders are serialized into one stream. This mixes an
/// all-valid builder with one that has nulls at every layer.
#[test]
fn test_repdef_multiple_builders() {
    // All valid: one outer list holding two inner lists of 1 and 2 items.
    let mut all_valid = RepDefBuilder::default();
    all_valid.add_offsets(offsets_64(&[0, 2]), None);
    all_valid.add_offsets(offsets_64(&[0, 1, 3]), None);
    all_valid.add_validity_bitmap(validity(&[true, true, true]));

    // With nulls: outer list 0 is null, inner list 1 is null, and several
    // leaf items are null.
    let mut with_nulls = RepDefBuilder::default();
    with_nulls.add_offsets(offsets_64(&[0, 0, 3]), Some(validity(&[false, true])));
    with_nulls.add_offsets(
        offsets_64(&[0, 2, 2, 6]),
        Some(validity(&[true, false, true])),
    );
    with_nulls.add_validity_bitmap(validity(&[false, false, false, true, true, false]));

    let serialized = RepDefBuilder::serialize(vec![all_valid, with_nulls]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep_levels);
    assert_eq!([0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def_levels);
}
2799
/// Slicing serialized levels by item count must account for the extra levels
/// that null and empty lists add. This is checked for both the repetition and
/// the definition levels.
#[test]
fn test_slicer() {
    let mut builder = RepDefBuilder::default();
    // Four lists over 30 items: list 1 is null and list 3 is empty, so each
    // adds one level with no item behind it (32 levels in total).
    builder.add_offsets(
        offsets_64(&[0, 2, 2, 30, 30]),
        Some(validity(&[true, false, true, true])),
    );
    builder.add_no_null(30);

    let repdefs = RepDefBuilder::serialize(vec![builder]);

    // Levels are u16, so every level is 2 bytes in the sliced buffers.
    let mut rep_slicer = repdefs.rep_slicer().unwrap();

    // The first 5 items span the null list: 6 levels (12 bytes).
    assert_eq!(rep_slicer.slice_next(5).len(), 12);
    // The next 20 items are plain: 20 levels (40 bytes).
    assert_eq!(rep_slicer.slice_next(20).len(), 40);
    // The last 5 items plus the trailing empty list: 6 levels (12 bytes).
    assert_eq!(rep_slicer.slice_rest().len(), 12);

    // The definition levels must be sliced at the same item boundaries. The
    // slicer must come from `def_slicer()`; re-using `rep_slicer()` here would
    // leave the definition levels untested.
    let mut def_slicer = repdefs.def_slicer().unwrap();

    assert_eq!(def_slicer.slice_next(5).len(), 12);
    assert_eq!(def_slicer.slice_next(20).len(), 40);
    assert_eq!(def_slicer.slice_rest().len(), 12);
}
2829
/// Round-trips rep/def levels through the control-word iterator and parser.
/// It checks the chosen word width, the per-level bit widths and the exact
/// bytes produced.
#[test]
fn test_control_words() {
    // Encodes `rep`/`def` (an empty slice means "absent") and checks the word
    // layout and encoded bytes. It then parses the words back and checks that
    // the original levels are recovered.
    fn check(
        rep: &[u16],
        def: &[u16],
        expected_values: Vec<u8>,
        expected_bytes_per_word: usize,
        expected_bits_rep: u8,
        expected_bits_def: u8,
    ) {
        let level_count = usize::max(rep.len(), def.len());
        let max_rep = rep.iter().copied().max().unwrap_or(0);
        let max_def = def.iter().copied().max().unwrap_or(0);

        let mut words = super::build_control_word_iterator(
            (!rep.is_empty()).then_some(rep),
            max_rep,
            (!def.is_empty()).then_some(def),
            max_def,
            max_def + 1,
            expected_values.len(),
        );
        assert_eq!(words.bytes_per_word(), expected_bytes_per_word);
        assert_eq!(words.bits_rep(), expected_bits_rep);
        assert_eq!(words.bits_def(), expected_bits_def);

        let mut encoded = Vec::with_capacity(level_count * words.bytes_per_word());
        for _word in 0..level_count {
            words.append_next(&mut encoded);
        }
        // The iterator must be exhausted after exactly `level_count` words.
        assert!(words.append_next(&mut encoded).is_none());
        assert_eq!(expected_values, encoded);

        let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);
        let mut decoded_rep = Vec::with_capacity(level_count);
        let mut decoded_def = Vec::with_capacity(level_count);

        // `chunks_exact` panics on a chunk size of 0, and zero-width words only
        // occur when there are no levels at all, so skip parsing in that case.
        if expected_bytes_per_word != 0 {
            for word in encoded.chunks_exact(expected_bytes_per_word) {
                parser.parse(word, &mut decoded_rep, &mut decoded_def);
            }
        }

        assert_eq!(rep, decoded_rep.as_slice());
        assert_eq!(def, decoded_def.as_slice());
    }

    // Max rep 12 and max def 15 both fit in 4 bits, so each word is one byte:
    // rep in the high nibble, def in the low nibble.
    let nibble_rep: &[u16] = &[0, 7, 3, 2, 9, 8, 12, 5];
    let nibble_def: &[u16] = &[5, 3, 1, 2, 12, 15, 0, 2];
    check(
        nibble_rep,
        nibble_def,
        vec![
            0b0111_0011 & 0 | 0b0000_0101, // rep 0, def 5
            0b0111_0011,                   // rep 7, def 3
            0b0011_0001,                   // rep 3, def 1
            0b0010_0010,                   // rep 2, def 2
            0b1001_1100,                   // rep 9, def 12
            0b1000_1111,                   // rep 8, def 15
            0b1100_0000,                   // rep 12, def 0
            0b0101_0010,                   // rep 5, def 2
        ],
        1,
        4,
        4,
    );

    // Max def 22 needs 5 bits, so 4 + 5 = 9 bits no longer fit in a byte.
    // Each word is (rep << 5) | def, stored as 2 bytes, low byte first.
    let wide_rep: &[u16] = &[0, 7, 3, 2, 9, 8, 12, 5];
    let wide_def: &[u16] = &[5, 3, 1, 2, 12, 22, 0, 2];
    check(
        wide_rep,
        wide_def,
        vec![
            0b0000_0101, 0b0000_0000, // rep 0, def 5
            0b1110_0011, 0b0000_0000, // rep 7, def 3
            0b0110_0001, 0b0000_0000, // rep 3, def 1
            0b0100_0010, 0b0000_0000, // rep 2, def 2
            0b0010_1100, 0b0000_0001, // rep 9, def 12
            0b0001_0110, 0b0000_0001, // rep 8, def 22
            0b1000_0000, 0b0000_0001, // rep 12, def 0
            0b1010_0010, 0b0000_0000, // rep 5, def 2
        ],
        2,
        4,
        5,
    );

    // With only one kind of level, each 4-bit level fills its own byte, and the
    // layout is the same whether the levels are rep or def.
    let single_levels: &[u16] = &[0, 7, 3, 2, 9, 8, 12, 5];
    let single_bytes: Vec<u8> = vec![
        0b0000_0000,
        0b0000_0111,
        0b0000_0011,
        0b0000_0010,
        0b0000_1001,
        0b0000_1000,
        0b0000_1100,
        0b0000_0101,
    ];
    check(single_levels, &[], single_bytes.clone(), 1, 4, 0);
    check(&[], single_levels, single_bytes, 1, 0, 4);

    // No levels at all: zero-width words and no output.
    check(&[], &[], Vec::new(), 0, 0, 0);
}
2933
/// Checks the per-word `is_new_row` and `is_visible` flags that the
/// control-word iterator reports, with rep and def levels present or absent.
#[test]
fn test_control_words_rep_index() {
    // Drives the iterator over `rep`/`def` (an empty slice means "absent")
    // and compares each word's flags against the expected vectors.
    fn check(
        rep: &[u16],
        def: &[u16],
        expected_new_rows: Vec<bool>,
        expected_is_visible: Vec<bool>,
    ) {
        let level_count = rep.len().max(def.len());
        let max_rep = rep.iter().copied().max().unwrap_or(0);
        let max_def = def.iter().copied().max().unwrap_or(0);

        let rep_arg = (!rep.is_empty()).then_some(rep);
        let def_arg = (!def.is_empty()).then_some(def);
        let mut words = super::build_control_word_iterator(
            rep_arg,
            max_rep,
            def_arg,
            max_def,
            2,
            expected_new_rows.len(),
        );

        let mut sink = Vec::with_capacity(level_count * words.bytes_per_word());
        for (idx, &new_row) in expected_new_rows.iter().enumerate() {
            let desc = words.append_next(&mut sink).unwrap();
            assert_eq!(desc.is_new_row, new_row);
            assert_eq!(desc.is_visible, expected_is_visible[idx]);
        }
        assert!(words.append_next(&mut sink).is_none());
    }

    let rep: &[u16] = &[2, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
    let def: &[u16] = &[0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];
    // Rows start exactly where the rep level reaches its maximum (2).
    let rep_new_rows = vec![
        true, false, false, true, true, false, false, false, false, true, false,
    ];

    // Both levels present: the only item with a def level above the `2`
    // passed to the iterator (def 3 at index 3) is reported as not visible.
    check(
        rep,
        def,
        rep_new_rows.clone(),
        vec![
            true, true, true, false, true, true, true, true, true, true, true,
        ],
    );
    // Rep only: the same row starts, and everything is visible.
    check(rep, &[], rep_new_rows, vec![true; 11]);
    // Without rep levels, every item starts a new row and is visible.
    check(&[], def, vec![true; 11], vec![true; 11]);
    check(&[], &[], vec![true; 11], vec![true; 11]);
}
3013
/// Regression: an outer validity layer over a list layer in which every list
/// is null, which leaves zero leaf values.
#[test]
fn regress_empty_list_case() {
    let mut rd_builder = RepDefBuilder::default();
    rd_builder.add_validity_bitmap(validity(&[true, false, true]));
    rd_builder.add_offsets(
        offsets_32(&[0, 0, 0, 0]),
        Some(validity(&[false, false, false])),
    );
    rd_builder.add_no_null(0);

    let serialized = RepDefBuilder::serialize(vec![rd_builder]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    // One level per row. Rows 0 and 2 are null lists (def 1); row 1 is null
    // in the outer layer (def 2).
    assert_eq!([1, 1, 1], *rep_levels);
    assert_eq!([1, 2, 1], *def_levels);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        Some(rep_levels.to_vec()),
        Some(def_levels.to_vec()),
        serialized.def_meaning.into(),
    )]);

    // Layers come back innermost first: leaf, then list, then outer validity.
    assert_eq!(unraveler.unravel_validity(0), None);
    let (offsets, list_nulls) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(offsets.inner(), offsets_32(&[0, 0, 0, 0]).inner());
    assert_eq!(list_nulls, Some(validity(&[false, false, false])));
    let outer_nulls = unraveler.unravel_validity(3).unwrap();
    assert_eq!(outer_nulls.inner(), validity(&[true, false, true]).inner());
}
3045
/// Regression: nested lists where the final list at each layer is null.
#[test]
fn regress_list_ends_null_case() {
    let mut rd_builder = RepDefBuilder::default();
    rd_builder.add_offsets(
        offsets_64(&[0, 1, 2, 2]),
        Some(validity(&[true, true, false])),
    );
    rd_builder.add_offsets(offsets_64(&[0, 1, 1]), Some(validity(&[true, false])));
    rd_builder.add_no_null(1);

    let serialized = RepDefBuilder::serialize(vec![rd_builder]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    assert_eq!([2, 2, 2], *rep_levels);
    assert_eq!([0, 1, 2], *def_levels);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        Some(rep_levels.to_vec()),
        Some(def_levels.to_vec()),
        serialized.def_meaning.into(),
    )]);

    assert_eq!(unraveler.unravel_validity(1), None);

    // Inner list layer first, then the outer one; each ends in a null list.
    for (want_offsets, want_validity) in [
        (offsets_32(&[0, 1, 1]), validity(&[true, false])),
        (offsets_32(&[0, 1, 2, 2]), validity(&[true, true, false])),
    ] {
        let (offsets, nulls) = unraveler.unravel_offsets::<i32>().unwrap();
        assert_eq!(offsets.inner(), want_offsets.inner());
        assert_eq!(nulls, Some(want_validity));
    }
}
3077}