1use std::{
84 iter::{Copied, Zip},
85 sync::Arc,
86};
87
88use arrow_array::OffsetSizeTrait;
89use arrow_buffer::{
90 ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
91};
92use lance_core::{utils::bit::log_2_ceil, Error, Result};
93use snafu::location;
94
95use crate::buffer::LanceBuffer;
96
/// Growable buffer of repetition or definition levels, one `u16` per level.
pub type LevelBuffer = Vec<u16>;
100
/// Raw description of one list layer, as recorded by `RepDefBuilder::add_offsets`.
#[derive(Clone, Debug)]
struct OffsetDesc {
    /// Normalized offsets: they start at 0 and null lists contribute no length.
    offsets: Arc<[i64]>,
    /// Indices of the lists that are null or empty.
    specials: Arc<[SpecialOffset]>,
    /// List-level validity, or `None` when no validity buffer was supplied.
    validity: Option<BooleanBuffer>,
    /// True when at least one valid list has zero length.
    has_empty_lists: bool,
    /// Number of lists in this layer (`offsets.len() - 1`).
    num_values: usize,
}
111
/// Raw description of a plain validity layer.
#[derive(Clone, Debug)]
struct ValidityDesc {
    /// Validity bitmap, or `None` when the layer was added without nulls.
    validity: Option<BooleanBuffer>,
    /// Number of values covered by this layer.
    num_values: usize,
}
119
/// Raw description of a fixed-size-list layer.
#[derive(Clone, Debug)]
struct FslDesc {
    /// Validity of the fixed-size lists, or `None` when no validity was supplied.
    validity: Option<BooleanBuffer>,
    /// Number of items in each fixed-size list.
    dimension: usize,
    /// Number of fixed-size lists in this layer.
    num_values: usize,
}
129
/// One layer of raw (not yet serialized) repetition/definition information.
#[derive(Clone, Debug)]
enum RawRepDef {
    /// A variable-length list layer.
    Offsets(OffsetDesc),
    /// A plain validity layer.
    Validity(ValidityDesc),
    /// A fixed-size-list layer.
    Fsl(FslDesc),
}
139
140impl RawRepDef {
141 fn has_nulls(&self) -> bool {
143 match self {
144 Self::Offsets(OffsetDesc { validity, .. }) => validity.is_some(),
145 Self::Validity(ValidityDesc { validity, .. }) => validity.is_some(),
146 Self::Fsl(FslDesc { validity, .. }) => validity.is_some(),
147 }
148 }
149
150 fn num_values(&self) -> usize {
152 match self {
153 Self::Offsets(OffsetDesc { num_values, .. }) => *num_values,
154 Self::Validity(ValidityDesc { num_values, .. }) => *num_values,
155 Self::Fsl(FslDesc { num_values, .. }) => *num_values,
156 }
157 }
158}
159
/// Repetition and definition levels produced by serializing one or more builders.
#[derive(Debug)]
pub struct SerializedRepDefs {
    /// Repetition levels, or `None` when none were produced.
    pub repetition_levels: Option<Arc<[u16]>>,
    /// Definition levels, or `None` when none were produced.
    pub definition_levels: Option<Arc<[u16]>>,
    /// Special records that have not yet been merged into the level buffers
    /// (see `collapse_specials`).
    pub special_records: Vec<SpecialRecord>,
    /// Interpretation of each layer's definition levels.
    pub def_meaning: Vec<DefinitionInterpretation>,
    /// Sum of the definition levels used by the layers that precede the first
    /// list layer in `def_meaning`, or `None` when there is no list layer.
    pub max_visible_level: Option<u16>,
}
187
188impl SerializedRepDefs {
189 pub fn new(
190 repetition_levels: Option<LevelBuffer>,
191 definition_levels: Option<LevelBuffer>,
192 special_records: Vec<SpecialRecord>,
193 def_meaning: Vec<DefinitionInterpretation>,
194 ) -> Self {
195 let first_list = def_meaning.iter().position(|level| level.is_list());
196 let max_visible_level = first_list.map(|first_list| {
197 def_meaning
198 .iter()
199 .map(|level| level.num_def_levels())
200 .take(first_list)
201 .sum::<u16>()
202 });
203 Self {
204 repetition_levels: repetition_levels.map(Arc::from),
205 definition_levels: definition_levels.map(Arc::from),
206 special_records,
207 def_meaning,
208 max_visible_level,
209 }
210 }
211
212 pub fn empty(def_meaning: Vec<DefinitionInterpretation>) -> Self {
214 Self {
215 repetition_levels: None,
216 definition_levels: None,
217 special_records: Vec::new(),
218 def_meaning,
219 max_visible_level: None,
220 }
221 }
222
223 pub fn rep_slicer(&self) -> Option<RepDefSlicer> {
224 self.repetition_levels
225 .as_ref()
226 .map(|rep| RepDefSlicer::new(self, rep.clone()))
227 }
228
229 pub fn def_slicer(&self) -> Option<RepDefSlicer> {
230 self.definition_levels
231 .as_ref()
232 .map(|def| RepDefSlicer::new(self, def.clone()))
233 }
234
235 pub fn collapse_specials(self) -> Self {
238 if self.special_records.is_empty() {
239 return self;
240 }
241
242 let rep = self.repetition_levels.unwrap();
244
245 let new_len = rep.len() + self.special_records.len();
246
247 let mut new_rep = Vec::with_capacity(new_len);
248 let mut new_def = Vec::with_capacity(new_len);
249
250 if let Some(def) = self.definition_levels {
254 let mut def_itr = def.iter();
255 let mut rep_itr = rep.iter();
256 let mut special_itr = self.special_records.into_iter().peekable();
257 let mut last_special = None;
258
259 for idx in 0..new_len {
260 if let Some(special) = special_itr.peek() {
261 if special.pos == idx {
262 new_rep.push(special.rep_level);
263 new_def.push(special.def_level);
264 special_itr.next();
265 last_special = Some(new_rep.last_mut().unwrap());
266 } else {
267 let rep = if let Some(last_special) = last_special {
268 let rep = *last_special;
269 *last_special = *rep_itr.next().unwrap();
270 rep
271 } else {
272 *rep_itr.next().unwrap()
273 };
274 new_rep.push(rep);
275 new_def.push(*def_itr.next().unwrap());
276 last_special = None;
277 }
278 } else {
279 let rep = if let Some(last_special) = last_special {
280 let rep = *last_special;
281 *last_special = *rep_itr.next().unwrap();
282 rep
283 } else {
284 *rep_itr.next().unwrap()
285 };
286 new_rep.push(rep);
287 new_def.push(*def_itr.next().unwrap());
288 last_special = None;
289 }
290 }
291 } else {
292 let mut rep_itr = rep.iter();
293 let mut special_itr = self.special_records.into_iter().peekable();
294 let mut last_special = None;
295
296 for idx in 0..new_len {
297 if let Some(special) = special_itr.peek() {
298 if special.pos == idx {
299 new_rep.push(special.rep_level);
300 new_def.push(special.def_level);
301 special_itr.next();
302 last_special = Some(new_rep.last_mut().unwrap());
303 } else {
304 let rep = if let Some(last_special) = last_special {
305 let rep = *last_special;
306 *last_special = *rep_itr.next().unwrap();
307 rep
308 } else {
309 *rep_itr.next().unwrap()
310 };
311 new_rep.push(rep);
312 new_def.push(0);
313 last_special = None;
314 }
315 } else {
316 let rep = if let Some(last_special) = last_special {
317 let rep = *last_special;
318 *last_special = *rep_itr.next().unwrap();
319 rep
320 } else {
321 *rep_itr.next().unwrap()
322 };
323 new_rep.push(rep);
324 new_def.push(0);
325 last_special = None;
326 }
327 }
328 }
329
330 Self {
331 repetition_levels: Some(new_rep.into()),
332 definition_levels: Some(new_def.into()),
333 special_records: Vec::new(),
334 def_meaning: self.def_meaning,
335 max_visible_level: self.max_visible_level,
336 }
337 }
338}
339
/// Cursor that hands out consecutive slices of a level buffer.
#[derive(Debug)]
pub struct RepDefSlicer<'a> {
    /// The serialized levels this slicer was created from.
    repdef: &'a SerializedRepDefs,
    /// The levels being sliced, held as a `LanceBuffer`.
    to_slice: LanceBuffer,
    /// Index of the next level that has not been handed out yet.
    current: usize,
}
353
354impl<'a> RepDefSlicer<'a> {
356 fn new(repdef: &'a SerializedRepDefs, levels: Arc<[u16]>) -> Self {
357 Self {
358 repdef,
359 to_slice: LanceBuffer::reinterpret_slice(levels),
360 current: 0,
361 }
362 }
363
364 pub fn num_levels(&self) -> usize {
365 self.to_slice.len() / 2
366 }
367
368 pub fn num_levels_remaining(&self) -> usize {
369 self.num_levels() - self.current
370 }
371
372 pub fn all_levels(&self) -> &LanceBuffer {
373 &self.to_slice
374 }
375
376 pub fn slice_rest(&mut self) -> LanceBuffer {
385 let start = self.current;
386 let remaining = self.num_levels_remaining();
387 self.current = self.num_levels();
388 self.to_slice.slice_with_length(start * 2, remaining * 2)
389 }
390
391 pub fn slice_next(&mut self, num_values: usize) -> LanceBuffer {
393 let start = self.current;
394 let Some(max_visible_level) = self.repdef.max_visible_level else {
395 self.current = start + num_values;
397 return self.to_slice.slice_with_length(start * 2, num_values * 2);
398 };
399 if let Some(def) = self.repdef.definition_levels.as_ref() {
400 let mut def_itr = def[start..].iter();
404 let mut num_taken = 0;
405 let mut num_passed = 0;
406 while num_taken < num_values {
407 let def_level = *def_itr.next().unwrap();
408 if def_level <= max_visible_level {
409 num_taken += 1;
410 }
411 num_passed += 1;
412 }
413 self.current = start + num_passed;
414 self.to_slice.slice_with_length(start * 2, num_passed * 2)
415 } else {
416 self.current = start + num_values;
418 self.to_slice.slice_with_length(start * 2, num_values * 2)
419 }
420 }
421}
422
/// A rep/def entry for a null or empty list, kept apart from the regular
/// level buffers until `SerializedRepDefs::collapse_specials` merges it in.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct SpecialRecord {
    /// Index this record occupies in the merged level buffers.
    pos: usize,
    /// Definition level written for this record.
    def_level: u16,
    /// Repetition level written for this record.
    rep_level: u16,
}
447
/// How the definition levels of a single layer should be interpreted.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum DefinitionInterpretation {
    AllValidItem,
    AllValidList,
    NullableItem,
    NullableList,
    EmptyableList,
    NullableAndEmptyableList,
}

impl DefinitionInterpretation {
    /// Number of definition levels this layer consumes (0, 1 or 2).
    pub fn num_def_levels(&self) -> u16 {
        match self {
            Self::AllValidItem | Self::AllValidList => 0,
            Self::NullableItem | Self::NullableList | Self::EmptyableList => 1,
            Self::NullableAndEmptyableList => 2,
        }
    }

    /// True for the variants that cannot be null; `EmptyableList` counts as
    /// all-valid.
    pub fn is_all_valid(&self) -> bool {
        match self {
            Self::AllValidItem | Self::AllValidList | Self::EmptyableList => true,
            Self::NullableItem | Self::NullableList | Self::NullableAndEmptyableList => false,
        }
    }

    /// True for the list variants, false for the item variants.
    pub fn is_list(&self) -> bool {
        match self {
            Self::AllValidList
            | Self::NullableList
            | Self::EmptyableList
            | Self::NullableAndEmptyableList => true,
            Self::AllValidItem | Self::NullableItem => false,
        }
    }
}
502
/// Mutable state used while turning raw layers into rep/def levels.
struct SerializerContext {
    /// Offsets of the most recently recorded list layer, mapped to positions
    /// in the level buffers (`None` until a list layer is recorded).
    last_offsets: Option<Vec<usize>>,
    /// Same as `last_offsets` but with each entry shifted by the number of
    /// empty lists seen before it.
    last_offsets_full: Option<Vec<usize>>,
    /// Special records accumulated so far, ordered by position.
    specials: Vec<SpecialRecord>,
    /// Interpretation of each layer, in the order the layers were recorded.
    def_meaning: Vec<DefinitionInterpretation>,
    /// Repetition levels (empty when no offsets were expected).
    rep_levels: LevelBuffer,
    /// Definition levels (empty when no nulls were expected).
    def_levels: LevelBuffer,
    /// Repetition level that the next list layer will use.
    current_rep: u16,
    /// First definition level that the next layer will use.
    current_def: u16,
    /// Product of the dimensions of the fixed-size-list layers recorded so far.
    current_multiplier: usize,
    /// Set once a validity buffer has actually been recorded.
    has_nulls: bool,
}
548
549impl SerializerContext {
550 fn new(len: usize, has_nulls: bool, has_offsets: bool, num_layers: usize) -> Self {
551 let def_meaning = Vec::with_capacity(num_layers);
552 Self {
553 last_offsets: None,
554 last_offsets_full: None,
555 rep_levels: if has_offsets {
556 vec![0; len]
557 } else {
558 LevelBuffer::default()
559 },
560 def_levels: if has_nulls {
561 vec![0; len]
562 } else {
563 LevelBuffer::default()
564 },
565 def_meaning,
566 current_rep: 1,
567 current_def: 1,
568 current_multiplier: 1,
569 has_nulls: false,
570 specials: Vec::default(),
571 }
572 }
573
574 fn checkout_def(&mut self, meaning: DefinitionInterpretation) -> u16 {
575 let def = self.current_def;
576 self.current_def += meaning.num_def_levels();
577 self.def_meaning.push(meaning);
578 def
579 }
580
/// Records one list layer: marks list starts in `rep_levels`, reserves the
/// layer's definition levels, and merges the layer's null/empty lists into
/// `self.specials`.
///
/// Panics (via `todo!`) if a fixed-size-list layer has already been recorded.
fn record_offsets(&mut self, offset_desc: &OffsetDesc) {
    if self.current_multiplier != 1 {
        todo!("List<...FSL<...>> not yet supported");
    }
    let rep_level = self.current_rep;
    // Reserve definition levels for this layer. A level of 0 stands for
    // "this layer has no null lists" / "this layer has no empty lists".
    let (null_list_level, empty_list_level) =
        match (offset_desc.validity.is_some(), offset_desc.has_empty_lists) {
            (true, true) => {
                let level =
                    self.checkout_def(DefinitionInterpretation::NullableAndEmptyableList);
                (level, level + 1)
            }
            (true, false) => (self.checkout_def(DefinitionInterpretation::NullableList), 0),
            (false, true) => (
                0,
                self.checkout_def(DefinitionInterpretation::EmptyableList),
            ),
            (false, false) => {
                self.checkout_def(DefinitionInterpretation::AllValidList);
                (0, 0)
            }
        };
    self.current_rep += 1;
    if let Some(last_offsets) = &self.last_offsets {
        // A list layer was recorded before this one: this layer's offsets
        // index into that layer's lists, so translate them through
        // `last_offsets` to get positions in the level buffers.
        let last_offsets_full = self.last_offsets_full.as_ref().unwrap();
        let mut new_last_off = Vec::with_capacity(offset_desc.offsets.len());
        let mut new_last_off_full = Vec::with_capacity(offset_desc.offsets.len());
        let mut empties_seen = 0;
        for off in offset_desc.offsets.windows(2) {
            let offset_ctx = last_offsets[off[0] as usize];
            let offset_ctx_end = last_offsets[off[1] as usize];
            new_last_off.push(offset_ctx);
            new_last_off_full.push(last_offsets_full[off[0] as usize] + empties_seen);
            if off[0] == off[1] {
                // Zero-length list in this layer.
                empties_seen += 1;
            } else if offset_ctx == offset_ctx_end {
                // Non-empty list that maps to an empty range of the level
                // buffers: raise the repetition level of the existing special
                // record at that position instead of touching `rep_levels`.
                let matching_special_idx = self
                    .specials
                    .binary_search_by_key(&offset_ctx, |spec| spec.pos)
                    .unwrap();
                self.specials[matching_special_idx].rep_level = rep_level;
            } else {
                // Regular list: mark its first entry with this layer's level.
                self.rep_levels[offset_ctx] = rep_level;
            }
        }
        new_last_off.push(last_offsets[*offset_desc.offsets.last().unwrap() as usize]);
        new_last_off_full.push(
            last_offsets_full[*offset_desc.offsets.last().unwrap() as usize] + empties_seen,
        );
        self.last_offsets = Some(new_last_off);
        self.last_offsets_full = Some(new_last_off_full);
    } else {
        // First list layer recorded: offsets are used directly as positions
        // in the level buffers.
        let mut new_last_off = Vec::with_capacity(offset_desc.offsets.len());
        let mut new_last_off_full = Vec::with_capacity(offset_desc.offsets.len());
        let mut empties_seen = 0;
        for off in offset_desc.offsets.windows(2) {
            new_last_off.push(off[0] as usize);
            new_last_off_full.push(off[0] as usize + empties_seen);
            if off[0] == off[1] {
                empties_seen += 1;
            } else {
                self.rep_levels[off[0] as usize] = rep_level;
            }
        }
        new_last_off.push(*offset_desc.offsets.last().unwrap() as usize);
        new_last_off_full.push(*offset_desc.offsets.last().unwrap() as usize + empties_seen);
        self.last_offsets = Some(new_last_off);
        self.last_offsets_full = Some(new_last_off_full);
    }

    // Merge the specials accumulated so far with this layer's specials,
    // keeping the result ordered by position. Existing specials are shifted
    // by the number of new specials inserted ahead of them.
    let last_offsets_full = self.last_offsets_full.as_ref().unwrap();
    let num_combined_specials = self.specials.len() + offset_desc.specials.len();
    let mut new_specials = Vec::with_capacity(num_combined_specials);
    let mut new_inserted = 0;
    let mut old_specials_itr = self.specials.iter().peekable();
    let mut specials_itr = offset_desc.specials.iter().peekable();
    for _ in 0..num_combined_specials {
        if let Some(old_special) = old_specials_itr.peek() {
            let old_special_pos = old_special.pos + new_inserted;
            if let Some(new_special) = specials_itr.peek() {
                let new_special_pos = last_offsets_full[new_special.pos()];
                if old_special_pos < new_special_pos {
                    let mut old_special = *old_specials_itr.next().unwrap();
                    old_special.pos = old_special_pos;
                    new_specials.push(old_special);
                } else {
                    // On a tie the new special is emitted first.
                    let new_special = specials_itr.next().unwrap();
                    new_specials.push(SpecialRecord {
                        pos: new_special_pos,
                        def_level: if matches!(new_special, SpecialOffset::EmptyList(_)) {
                            empty_list_level
                        } else {
                            null_list_level
                        },
                        rep_level,
                    });
                    new_inserted += 1;
                }
            } else {
                // Only existing specials remain.
                let mut old_special = *old_specials_itr.next().unwrap();
                old_special.pos = old_special_pos;
                new_specials.push(old_special);
            }
        } else {
            // Only new specials remain.
            let new_special = specials_itr.next().unwrap();
            new_specials.push(SpecialRecord {
                pos: last_offsets_full[new_special.pos()],
                def_level: if matches!(new_special, SpecialOffset::EmptyList(_)) {
                    empty_list_level
                } else {
                    null_list_level
                },
                rep_level,
            });
            new_inserted += 1;
        }
    }
    self.specials = new_specials;
}
708
709 fn do_record_validity(&mut self, validity: &BooleanBuffer, null_level: u16) {
710 self.has_nulls = true;
711 assert!(!self.def_levels.is_empty());
712 if let Some(last_offsets) = &self.last_offsets {
713 last_offsets
714 .windows(2)
715 .zip(validity.iter())
716 .for_each(|(w, valid)| {
717 let start = w[0] * self.current_multiplier;
718 let end = w[1] * self.current_multiplier;
719 if !valid {
720 self.def_levels[start..end].fill(null_level);
721 }
722 });
723 } else if self.current_multiplier == 1 {
724 self.def_levels
725 .iter_mut()
726 .zip(validity.iter())
727 .for_each(|(def, valid)| {
728 if !valid {
729 *def = null_level;
730 }
731 });
732 } else {
733 self.def_levels
734 .iter_mut()
735 .zip(
736 validity
737 .iter()
738 .flat_map(|v| std::iter::repeat_n(v, self.current_multiplier)),
739 )
740 .for_each(|(def, valid)| {
741 if !valid {
742 *def = null_level;
743 }
744 });
745 }
746 }
747
748 fn record_validity_buf(&mut self, validity: &Option<BooleanBuffer>) {
749 if let Some(validity) = validity {
750 let def_level = self.checkout_def(DefinitionInterpretation::NullableItem);
751 self.do_record_validity(validity, def_level);
752 } else {
753 self.checkout_def(DefinitionInterpretation::AllValidItem);
754 }
755 }
756
757 fn record_validity(&mut self, validity_desc: &ValidityDesc) {
758 self.record_validity_buf(&validity_desc.validity)
759 }
760
761 fn record_fsl(&mut self, fsl_desc: &FslDesc) {
762 self.current_multiplier *= fsl_desc.dimension;
763 self.record_validity_buf(&fsl_desc.validity);
764 }
765
766 fn build(self) -> SerializedRepDefs {
767 let definition_levels = if self.has_nulls {
768 Some(self.def_levels)
769 } else {
770 None
771 };
772 let repetition_levels = if self.current_rep > 1 {
773 Some(self.rep_levels)
774 } else {
775 None
776 };
777 SerializedRepDefs::new(
778 repetition_levels,
779 definition_levels,
780 self.specials,
781 self.def_meaning,
782 )
783 }
784}
785
/// Marks a list that is either null or empty; the payload is the list's index.
#[derive(Debug, Copy, Clone)]
enum SpecialOffset {
    NullList(usize),
    EmptyList(usize),
}

impl SpecialOffset {
    /// Index of the list this marker refers to.
    fn pos(&self) -> usize {
        match self {
            Self::NullList(pos) | Self::EmptyList(pos) => *pos,
        }
    }
}
802
/// Collects raw validity, offset and fixed-size-list layers so they can later
/// be serialized into repetition and definition levels.
#[derive(Clone, Default, Debug)]
pub struct RepDefBuilder {
    /// Layers in the order they were added.
    repdefs: Vec<RawRepDef>,
    /// Length the next layer is expected to have, or `None` before any layer
    /// has been added.
    len: Option<usize>,
}
819
820impl RepDefBuilder {
821 fn check_validity_len(&mut self, incoming_len: usize) {
822 if let Some(len) = self.len {
823 assert_eq!(incoming_len, len);
824 }
825 self.len = Some(incoming_len);
826 }
827
828 fn num_layers(&self) -> usize {
829 self.repdefs.len()
830 }
831
832 fn is_empty(&self) -> bool {
835 self.repdefs
836 .iter()
837 .all(|r| matches!(r, RawRepDef::Validity(ValidityDesc { validity: None, .. })))
838 }
839
840 pub fn is_simple_validity(&self) -> bool {
842 self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
843 }
844
845 pub fn has_nulls(&self) -> bool {
850 self.repdefs.iter().any(|rd| {
851 matches!(
852 rd,
853 RawRepDef::Validity(ValidityDesc {
854 validity: Some(_),
855 ..
856 }) | RawRepDef::Fsl(FslDesc {
857 validity: Some(_),
858 ..
859 })
860 )
861 })
862 }
863
864 pub fn has_offsets(&self) -> bool {
865 self.repdefs
866 .iter()
867 .any(|rd| matches!(rd, RawRepDef::Offsets(OffsetDesc { .. })))
868 }
869
870 pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
872 self.check_validity_len(validity.len());
873 self.repdefs.push(RawRepDef::Validity(ValidityDesc {
874 num_values: validity.len(),
875 validity: Some(validity.into_inner()),
876 }));
877 }
878
879 pub fn add_no_null(&mut self, len: usize) {
881 self.check_validity_len(len);
882 self.repdefs.push(RawRepDef::Validity(ValidityDesc {
883 validity: None,
884 num_values: len,
885 }));
886 }
887
888 pub fn add_fsl(&mut self, validity: Option<NullBuffer>, dimension: usize, num_values: usize) {
889 if let Some(len) = self.len {
890 assert_eq!(num_values, len);
891 }
892 self.len = Some(num_values * dimension);
893 debug_assert!(validity.is_none() || validity.as_ref().unwrap().len() == num_values);
894 self.repdefs.push(RawRepDef::Fsl(FslDesc {
895 num_values,
896 validity: validity.map(|v| v.into_inner()),
897 dimension,
898 }))
899 }
900
901 fn check_offset_len(&mut self, offsets: &[i64]) {
902 if let Some(len) = self.len {
903 assert!(offsets.len() == len + 1);
904 }
905 self.len = Some(offsets[offsets.len() - 1] as usize);
906 }
907
908 pub fn add_offsets<O: OffsetSizeTrait>(
915 &mut self,
916 offsets: OffsetBuffer<O>,
917 validity: Option<NullBuffer>,
918 ) -> bool {
919 let mut has_garbage_values = false;
920 if O::IS_LARGE {
921 let inner = offsets.into_inner();
922 let len = inner.len();
923 let i64_buff = ScalarBuffer::<i64>::new(inner.into_inner(), 0, len);
924 let mut normalized = Vec::with_capacity(len);
925 normalized.push(0_i64);
926 let mut specials = Vec::new();
927 let mut has_empty_lists = false;
928 let mut last_off = 0;
929 if let Some(validity) = validity.as_ref() {
930 for (idx, (off, valid)) in i64_buff.windows(2).zip(validity.iter()).enumerate() {
931 let len: i64 = off[1] - off[0];
932 match (valid, len == 0) {
933 (false, is_empty) => {
934 specials.push(SpecialOffset::NullList(idx));
935 has_garbage_values |= !is_empty;
936 }
937 (true, true) => {
938 has_empty_lists = true;
939 specials.push(SpecialOffset::EmptyList(idx));
940 }
941 _ => {
942 last_off += len;
943 }
944 }
945 normalized.push(last_off);
946 }
947 } else {
948 for (idx, off) in i64_buff.windows(2).enumerate() {
949 let len: i64 = off[1] - off[0];
950 if len == 0 {
951 has_empty_lists = true;
952 specials.push(SpecialOffset::EmptyList(idx));
953 }
954 last_off += len;
955 normalized.push(last_off);
956 }
957 };
958 self.check_offset_len(&normalized);
959 self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
960 num_values: normalized.len() - 1,
961 offsets: normalized.into(),
962 validity: validity.map(|v| v.into_inner()),
963 has_empty_lists,
964 specials: specials.into(),
965 }));
966 has_garbage_values
967 } else {
968 let inner = offsets.into_inner();
969 let len = inner.len();
970 let scalar_off = ScalarBuffer::<i32>::new(inner.into_inner(), 0, len);
971 let mut casted = Vec::with_capacity(len);
972 casted.push(0);
973 let mut has_empty_lists = false;
974 let mut specials = Vec::new();
975 let mut last_off: i64 = 0;
976 if let Some(validity) = validity.as_ref() {
977 for (idx, (off, valid)) in scalar_off.windows(2).zip(validity.iter()).enumerate() {
978 let len = (off[1] - off[0]) as i64;
979 match (valid, len == 0) {
980 (false, is_empty) => {
981 specials.push(SpecialOffset::NullList(idx));
982 has_garbage_values |= !is_empty;
983 }
984 (true, true) => {
985 has_empty_lists = true;
986 specials.push(SpecialOffset::EmptyList(idx));
987 }
988 _ => {
989 last_off += len;
990 }
991 }
992 casted.push(last_off);
993 }
994 } else {
995 for (idx, off) in scalar_off.windows(2).enumerate() {
996 let len = (off[1] - off[0]) as i64;
997 if len == 0 {
998 has_empty_lists = true;
999 specials.push(SpecialOffset::EmptyList(idx));
1000 }
1001 last_off += len;
1002 casted.push(last_off);
1003 }
1004 };
1005 self.check_offset_len(&casted);
1006 self.repdefs.push(RawRepDef::Offsets(OffsetDesc {
1007 num_values: casted.len() - 1,
1008 offsets: casted.into(),
1009 validity: validity.map(|v| v.into_inner()),
1010 has_empty_lists,
1011 specials: specials.into(),
1012 }));
1013 has_garbage_values
1014 }
1015 }
1016
1017 fn concat_layers<'a>(
1029 layers: impl Iterator<Item = &'a RawRepDef>,
1030 num_layers: usize,
1031 ) -> RawRepDef {
1032 enum LayerKind {
1033 Validity,
1034 Fsl,
1035 Offsets,
1036 }
1037
1038 let mut collected = Vec::with_capacity(num_layers);
1041 let mut has_nulls = false;
1042 let mut layer_kind = LayerKind::Validity;
1043 let mut num_specials = 0;
1044 let mut all_dimension = 0;
1045 let mut all_has_empty_lists = false;
1046 let mut all_num_values = 0;
1047 for layer in layers {
1048 has_nulls |= layer.has_nulls();
1049 match layer {
1050 RawRepDef::Validity(_) => {
1051 layer_kind = LayerKind::Validity;
1052 }
1053 RawRepDef::Offsets(OffsetDesc {
1054 specials,
1055 has_empty_lists,
1056 ..
1057 }) => {
1058 all_has_empty_lists |= *has_empty_lists;
1059 layer_kind = LayerKind::Offsets;
1060 num_specials += specials.len();
1061 }
1062 RawRepDef::Fsl(FslDesc { dimension, .. }) => {
1063 layer_kind = LayerKind::Fsl;
1064 all_dimension = *dimension;
1065 }
1066 }
1067 collected.push(layer);
1068 all_num_values += layer.num_values();
1069 }
1070
1071 if !has_nulls {
1073 match layer_kind {
1074 LayerKind::Validity => {
1075 return RawRepDef::Validity(ValidityDesc {
1076 validity: None,
1077 num_values: all_num_values,
1078 });
1079 }
1080 LayerKind::Fsl => {
1081 return RawRepDef::Fsl(FslDesc {
1082 validity: None,
1083 num_values: all_num_values,
1084 dimension: all_dimension,
1085 })
1086 }
1087 LayerKind::Offsets => {}
1088 }
1089 }
1090
1091 let mut validity_builder = if has_nulls {
1093 BooleanBufferBuilder::new(all_num_values)
1094 } else {
1095 BooleanBufferBuilder::new(0)
1096 };
1097 let mut all_offsets = if matches!(layer_kind, LayerKind::Offsets) {
1098 let mut all_offsets = Vec::with_capacity(all_num_values);
1099 all_offsets.push(0);
1100 all_offsets
1101 } else {
1102 Vec::new()
1103 };
1104 let mut all_specials = Vec::with_capacity(num_specials);
1105
1106 for layer in collected {
1107 match layer {
1108 RawRepDef::Validity(ValidityDesc {
1109 validity: Some(validity),
1110 ..
1111 }) => {
1112 validity_builder.append_buffer(validity);
1113 }
1114 RawRepDef::Validity(ValidityDesc {
1115 validity: None,
1116 num_values,
1117 }) => {
1118 validity_builder.append_n(*num_values, true);
1119 }
1120 RawRepDef::Fsl(FslDesc {
1121 validity,
1122 num_values,
1123 ..
1124 }) => {
1125 if let Some(validity) = validity {
1126 validity_builder.append_buffer(validity);
1127 } else {
1128 validity_builder.append_n(*num_values, true);
1129 }
1130 }
1131 RawRepDef::Offsets(OffsetDesc {
1132 offsets,
1133 validity: Some(validity),
1134 has_empty_lists,
1135 specials,
1136 ..
1137 }) => {
1138 all_has_empty_lists |= has_empty_lists;
1139 validity_builder.append_buffer(validity);
1140 let existing_lists = all_offsets.len() - 1;
1141 let last = *all_offsets.last().unwrap();
1142 all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1143 all_specials.extend(specials.iter().map(|s| match s {
1144 SpecialOffset::NullList(pos) => {
1145 SpecialOffset::NullList(*pos + existing_lists)
1146 }
1147 SpecialOffset::EmptyList(pos) => {
1148 SpecialOffset::EmptyList(*pos + existing_lists)
1149 }
1150 }));
1151 }
1152 RawRepDef::Offsets(OffsetDesc {
1153 offsets,
1154 validity: None,
1155 has_empty_lists,
1156 num_values,
1157 specials,
1158 }) => {
1159 all_has_empty_lists |= has_empty_lists;
1160 if has_nulls {
1161 validity_builder.append_n(*num_values, true);
1162 }
1163 let last = *all_offsets.last().unwrap();
1164 let existing_lists = all_offsets.len() - 1;
1165 all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
1166 all_specials.extend(specials.iter().map(|s| match s {
1167 SpecialOffset::NullList(pos) => {
1168 SpecialOffset::NullList(*pos + existing_lists)
1169 }
1170 SpecialOffset::EmptyList(pos) => {
1171 SpecialOffset::EmptyList(*pos + existing_lists)
1172 }
1173 }));
1174 }
1175 }
1176 }
1177 let validity = if has_nulls {
1178 Some(validity_builder.finish())
1179 } else {
1180 None
1181 };
1182 match layer_kind {
1183 LayerKind::Fsl => RawRepDef::Fsl(FslDesc {
1184 validity,
1185 num_values: all_num_values,
1186 dimension: all_dimension,
1187 }),
1188 LayerKind::Validity => RawRepDef::Validity(ValidityDesc {
1189 validity,
1190 num_values: all_num_values,
1191 }),
1192 LayerKind::Offsets => RawRepDef::Offsets(OffsetDesc {
1193 offsets: all_offsets.into(),
1194 validity,
1195 has_empty_lists: all_has_empty_lists,
1196 num_values: all_num_values,
1197 specials: all_specials.into(),
1198 }),
1199 }
1200 }
1201
1202 pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
1205 assert!(!builders.is_empty());
1206 if builders.iter().all(|b| b.is_empty()) {
1207 return SerializedRepDefs::empty(
1209 builders
1210 .first()
1211 .unwrap()
1212 .repdefs
1213 .iter()
1214 .map(|_| DefinitionInterpretation::AllValidItem)
1215 .collect::<Vec<_>>(),
1216 );
1217 }
1218 let has_nulls = builders.iter().any(|b| b.has_nulls());
1219 let has_offsets = builders.iter().any(|b| b.has_offsets());
1220 let total_len = builders.iter().map(|b| b.len.unwrap()).sum();
1221 let num_layers = builders[0].num_layers();
1222 let mut context = SerializerContext::new(total_len, has_nulls, has_offsets, num_layers);
1223 let combined_layers = (0..num_layers)
1224 .map(|layer_index| {
1225 Self::concat_layers(
1226 builders.iter().map(|b| &b.repdefs[layer_index]),
1227 builders.len(),
1228 )
1229 })
1230 .collect::<Vec<_>>();
1231 debug_assert!(builders
1232 .iter()
1233 .all(|b| b.num_layers() == builders[0].num_layers()));
1234 for layer in combined_layers.into_iter().rev() {
1235 match layer {
1236 RawRepDef::Validity(def) => {
1237 context.record_validity(&def);
1238 }
1239 RawRepDef::Offsets(rep) => {
1240 context.record_offsets(&rep);
1241 }
1242 RawRepDef::Fsl(fsl) => {
1243 context.record_fsl(&fsl);
1244 }
1245 }
1246 }
1247 context.build().collapse_specials()
1248 }
1249}
1250
/// Turns serialized repetition/definition levels back into offsets and
/// validity, one layer at a time.
#[derive(Debug)]
pub struct RepDefUnraveler {
    /// Remaining repetition levels; compacted in place by `unravel_offsets`.
    rep_levels: Option<LevelBuffer>,
    /// Remaining definition levels; compacted in place as layers are unraveled.
    def_levels: Option<LevelBuffer>,
    /// Maps a definition level to a repetition count, built in `new` from
    /// `def_meaning`.
    levels_to_rep: Vec<u16>,
    /// Interpretation of each layer's definition levels.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Definition level compared against when unraveling the current layer.
    current_def_cmp: u16,
    /// Number of list layers unraveled so far.
    current_rep_cmp: u16,
    /// Index into `def_meaning` of the layer to be unraveled next.
    current_layer: usize,
}
1270
1271impl RepDefUnraveler {
1272 pub fn new(
1274 rep_levels: Option<LevelBuffer>,
1275 def_levels: Option<LevelBuffer>,
1276 def_meaning: Arc<[DefinitionInterpretation]>,
1277 ) -> Self {
1278 let mut levels_to_rep = Vec::with_capacity(def_meaning.len());
1279 let mut rep_counter = 0;
1280 levels_to_rep.push(0);
1282 for meaning in def_meaning.as_ref() {
1283 match meaning {
1284 DefinitionInterpretation::AllValidItem | DefinitionInterpretation::AllValidList => {
1285 }
1287 DefinitionInterpretation::NullableItem => {
1288 levels_to_rep.push(rep_counter);
1290 }
1291 DefinitionInterpretation::NullableList => {
1292 rep_counter += 1;
1293 levels_to_rep.push(rep_counter);
1294 }
1295 DefinitionInterpretation::EmptyableList => {
1296 rep_counter += 1;
1297 levels_to_rep.push(rep_counter);
1298 }
1299 DefinitionInterpretation::NullableAndEmptyableList => {
1300 rep_counter += 1;
1301 levels_to_rep.push(rep_counter);
1302 levels_to_rep.push(rep_counter);
1303 }
1304 }
1305 }
1306 Self {
1307 rep_levels,
1308 def_levels,
1309 current_def_cmp: 0,
1310 current_rep_cmp: 0,
1311 levels_to_rep,
1312 current_layer: 0,
1313 def_meaning,
1314 }
1315 }
1316
1317 pub fn is_all_valid(&self) -> bool {
1318 self.def_meaning[self.current_layer].is_all_valid()
1319 }
1320
1321 pub fn max_lists(&self) -> usize {
1327 debug_assert!(
1328 self.def_meaning[self.current_layer] != DefinitionInterpretation::NullableItem
1329 );
1330 self.rep_levels
1331 .as_ref()
1332 .map(|levels| levels.len())
1334 .unwrap_or(0)
1335 }
1336
1337 pub fn unravel_offsets<T: ArrowNativeType>(
1342 &mut self,
1343 offsets: &mut Vec<T>,
1344 validity: Option<&mut BooleanBufferBuilder>,
1345 ) -> Result<()> {
1346 let rep_levels = self
1347 .rep_levels
1348 .as_mut()
1349 .expect("Expected repetition level but data didn't contain repetition");
1350 let valid_level = self.current_def_cmp;
1351 let (null_level, empty_level) = match self.def_meaning[self.current_layer] {
1352 DefinitionInterpretation::NullableList => {
1353 self.current_def_cmp += 1;
1354 (valid_level + 1, 0)
1355 }
1356 DefinitionInterpretation::EmptyableList => {
1357 self.current_def_cmp += 1;
1358 (0, valid_level + 1)
1359 }
1360 DefinitionInterpretation::NullableAndEmptyableList => {
1361 self.current_def_cmp += 2;
1362 (valid_level + 1, valid_level + 2)
1363 }
1364 DefinitionInterpretation::AllValidList => (0, 0),
1365 _ => unreachable!(),
1366 };
1367 let max_level = null_level.max(empty_level);
1368 self.current_layer += 1;
1369
1370 let mut curlen: usize = offsets.last().map(|o| o.as_usize()).unwrap_or(0);
1371
1372 offsets.pop();
1380
1381 let to_offset = |val: usize| {
1382 T::from_usize(val)
1383 .ok_or_else(|| Error::invalid_input("A single batch had more than i32::MAX values and so a large container type is required", location!()))
1384 };
1385 self.current_rep_cmp += 1;
1386 if let Some(def_levels) = &mut self.def_levels {
1387 assert!(rep_levels.len() == def_levels.len());
1388 let mut push_validity: Box<dyn FnMut(bool)> = if let Some(validity) = validity {
1391 Box::new(|is_valid| validity.append(is_valid))
1392 } else {
1393 Box::new(|_| {})
1394 };
1395 let mut read_idx = 0;
1399 let mut write_idx = 0;
1400 while read_idx < rep_levels.len() {
1401 unsafe {
1404 let rep_val = *rep_levels.get_unchecked(read_idx);
1405 if rep_val != 0 {
1406 let def_val = *def_levels.get_unchecked(read_idx);
1407 *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1409 *def_levels.get_unchecked_mut(write_idx) = def_val;
1410 write_idx += 1;
1411
1412 if def_val == 0 {
1413 offsets.push(to_offset(curlen)?);
1415 curlen += 1;
1416 push_validity(true);
1417 } else if def_val > max_level {
1418 } else if def_val == null_level {
1420 offsets.push(to_offset(curlen)?);
1422 push_validity(false);
1423 } else if def_val == empty_level {
1424 offsets.push(to_offset(curlen)?);
1426 push_validity(true);
1427 } else {
1428 offsets.push(to_offset(curlen)?);
1430 curlen += 1;
1431 push_validity(true);
1432 }
1433 } else {
1434 curlen += 1;
1435 }
1436 read_idx += 1;
1437 }
1438 }
1439 offsets.push(to_offset(curlen)?);
1440 rep_levels.truncate(write_idx);
1441 def_levels.truncate(write_idx);
1442 Ok(())
1443 } else {
1444 let mut read_idx = 0;
1446 let mut write_idx = 0;
1447 let old_offsets_len = offsets.len();
1448 while read_idx < rep_levels.len() {
1449 unsafe {
1451 let rep_val = *rep_levels.get_unchecked(read_idx);
1452 if rep_val != 0 {
1453 offsets.push(to_offset(curlen)?);
1455 *rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
1456 write_idx += 1;
1457 }
1458 curlen += 1;
1459 read_idx += 1;
1460 }
1461 }
1462 let num_new_lists = offsets.len() - old_offsets_len;
1463 offsets.push(to_offset(curlen)?);
1464 rep_levels.truncate(offsets.len() - 1);
1465 if let Some(validity) = validity {
1466 validity.append_n(num_new_lists, true);
1469 }
1470 Ok(())
1471 }
1472 }
1473
1474 pub fn skip_validity(&mut self) {
1475 debug_assert!(
1476 self.def_meaning[self.current_layer] == DefinitionInterpretation::AllValidItem
1477 );
1478 self.current_layer += 1;
1479 }
1480
1481 pub fn unravel_validity(&mut self, validity: &mut BooleanBufferBuilder) {
1483 debug_assert!(
1484 self.def_meaning[self.current_layer] != DefinitionInterpretation::AllValidItem
1485 );
1486 self.current_layer += 1;
1487
1488 let def_levels = &self.def_levels.as_ref().unwrap();
1489
1490 let current_def_cmp = self.current_def_cmp;
1491 self.current_def_cmp += 1;
1492
1493 for is_valid in def_levels.iter().filter_map(|&level| {
1494 if self.levels_to_rep[level as usize] <= self.current_rep_cmp {
1495 Some(level <= current_def_cmp)
1496 } else {
1497 None
1498 }
1499 }) {
1500 validity.append(is_valid);
1501 }
1502 }
1503
1504 pub fn decimate(&mut self, dimension: usize) {
1505 if self.rep_levels.is_some() {
1506 todo!("Not yet supported FSL<...List<...>>");
1518 }
1519 let Some(def_levels) = self.def_levels.as_mut() else {
1520 return;
1521 };
1522 let mut read_idx = 0;
1523 let mut write_idx = 0;
1524 while read_idx < def_levels.len() {
1525 unsafe {
1526 *def_levels.get_unchecked_mut(write_idx) = *def_levels.get_unchecked(read_idx);
1527 }
1528 write_idx += 1;
1529 read_idx += dimension;
1530 }
1531 def_levels.truncate(write_idx);
1532 }
1533}
1534
/// Drives several [`RepDefUnraveler`]s as if they were one.
///
/// Every operation on this type visits the wrapped unravelers in order and
/// appends their output to a single shared builder.
#[derive(Debug)]
pub struct CompositeRepDefUnraveler {
    /// The wrapped unravelers, visited in order by every operation
    unravelers: Vec<RepDefUnraveler>,
}
1552
1553impl CompositeRepDefUnraveler {
1554 pub fn new(unravelers: Vec<RepDefUnraveler>) -> Self {
1555 Self { unravelers }
1556 }
1557
1558 pub fn unravel_validity(&mut self, num_values: usize) -> Option<NullBuffer> {
1562 let is_all_valid = self
1563 .unravelers
1564 .iter()
1565 .all(|unraveler| unraveler.is_all_valid());
1566
1567 if is_all_valid {
1568 for unraveler in self.unravelers.iter_mut() {
1569 unraveler.skip_validity();
1570 }
1571 None
1572 } else {
1573 let mut validity = BooleanBufferBuilder::new(num_values);
1574 for unraveler in self.unravelers.iter_mut() {
1575 unraveler.unravel_validity(&mut validity);
1576 }
1577 Some(NullBuffer::new(validity.finish()))
1578 }
1579 }
1580
1581 pub fn unravel_fsl_validity(
1582 &mut self,
1583 num_values: usize,
1584 dimension: usize,
1585 ) -> Option<NullBuffer> {
1586 for unraveler in self.unravelers.iter_mut() {
1587 unraveler.decimate(dimension);
1588 }
1589 self.unravel_validity(num_values)
1590 }
1591
1592 pub fn unravel_offsets<T: ArrowNativeType>(
1594 &mut self,
1595 ) -> Result<(OffsetBuffer<T>, Option<NullBuffer>)> {
1596 let mut is_all_valid = true;
1597 let mut max_num_lists = 0;
1598 for unraveler in self.unravelers.iter() {
1599 is_all_valid &= unraveler.is_all_valid();
1600 max_num_lists += unraveler.max_lists();
1601 }
1602
1603 let mut validity = if is_all_valid {
1604 None
1605 } else {
1606 Some(BooleanBufferBuilder::new(max_num_lists))
1609 };
1610
1611 let mut offsets = Vec::with_capacity(max_num_lists + 1);
1612
1613 for unraveler in self.unravelers.iter_mut() {
1614 unraveler.unravel_offsets(&mut offsets, validity.as_mut())?;
1615 }
1616
1617 Ok((
1618 OffsetBuffer::new(ScalarBuffer::from(offsets)),
1619 validity.map(|mut v| NullBuffer::new(v.finish())),
1620 ))
1621 }
1622}
1623
/// Produces control words that pack a repetition level and a definition level
/// into a single word of type `W`.
///
/// The masked repetition level is shifted left by `def_width` and the masked
/// definition level occupies the low bits.
#[derive(Debug)]
pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
    /// Source of `(repetition, definition)` level pairs
    repdef: I,
    /// Number of bits the repetition level is shifted left by when packing
    def_width: usize,
    /// A repetition level equal to this value marks the start of a new row
    max_rep: u16,
    /// Definition levels at or below this value are reported as visible
    max_visible_def: u16,
    /// Mask applied to the repetition level before packing
    rep_mask: u16,
    /// Mask applied to the definition level before packing
    def_mask: u16,
    /// Bit width of the repetition part (exposed via `ControlWordIterator::bits_rep`)
    bits_rep: u8,
    /// Bit width of the definition part (exposed via `ControlWordIterator::bits_def`)
    bits_def: u8,
    /// Ties the iterator to its control word type `W` without storing one
    phantom: std::marker::PhantomData<W>,
}
1641
1642impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
1643 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1644 let next = self.repdef.next()?;
1645 let control_word: u8 =
1646 (((next.0 & self.rep_mask) as u8) << self.def_width) + ((next.1 & self.def_mask) as u8);
1647 buf.push(control_word);
1648 let is_new_row = next.0 == self.max_rep;
1649 let is_visible = next.1 <= self.max_visible_def;
1650 let is_valid_item = next.1 == 0;
1651 Some(ControlWordDesc {
1652 is_new_row,
1653 is_visible,
1654 is_valid_item,
1655 })
1656 }
1657}
1658
1659impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
1660 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1661 let next = self.repdef.next()?;
1662 let control_word: u16 =
1663 ((next.0 & self.rep_mask) << self.def_width) + (next.1 & self.def_mask);
1664 let control_word = control_word.to_le_bytes();
1665 buf.push(control_word[0]);
1666 buf.push(control_word[1]);
1667 let is_new_row = next.0 == self.max_rep;
1668 let is_visible = next.1 <= self.max_visible_def;
1669 let is_valid_item = next.1 == 0;
1670 Some(ControlWordDesc {
1671 is_new_row,
1672 is_visible,
1673 is_valid_item,
1674 })
1675 }
1676}
1677
1678impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
1679 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1680 let next = self.repdef.next()?;
1681 let control_word: u32 = (((next.0 & self.rep_mask) as u32) << self.def_width)
1682 + ((next.1 & self.def_mask) as u32);
1683 let control_word = control_word.to_le_bytes();
1684 buf.push(control_word[0]);
1685 buf.push(control_word[1]);
1686 buf.push(control_word[2]);
1687 buf.push(control_word[3]);
1688 let is_new_row = next.0 == self.max_rep;
1689 let is_visible = next.1 <= self.max_visible_def;
1690 let is_valid_item = next.1 == 0;
1691 Some(ControlWordDesc {
1692 is_new_row,
1693 is_visible,
1694 is_valid_item,
1695 })
1696 }
1697}
1698
/// Produces control words from a single stream of levels, each stored in a
/// word of type `W`.
#[derive(Debug)]
pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
    /// Source of levels
    repdef: I,
    /// Mask applied to each level before it is written
    level_mask: u16,
    /// Bit width reported as the repetition width (see `ControlWordIterator::bits_rep`)
    bits_rep: u8,
    /// Bit width reported as the definition width; when zero every item is reported valid
    bits_def: u8,
    /// A level equal to this value marks a new row; when zero every word is a new row
    max_rep: u16,
    /// Ties the iterator to its control word type `W` without storing one
    phantom: std::marker::PhantomData<W>,
}
1709
1710impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
1711 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1712 let next = self.repdef.next()?;
1713 buf.push((next & self.level_mask) as u8);
1714 let is_new_row = self.max_rep == 0 || next == self.max_rep;
1715 let is_valid_item = next == 0 || self.bits_def == 0;
1716 Some(ControlWordDesc {
1717 is_new_row,
1718 is_visible: true,
1721 is_valid_item,
1722 })
1723 }
1724}
1725
1726impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
1727 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1728 let next = self.repdef.next().unwrap() & self.level_mask;
1729 let control_word = next.to_le_bytes();
1730 buf.push(control_word[0]);
1731 buf.push(control_word[1]);
1732 let is_new_row = self.max_rep == 0 || next == self.max_rep;
1733 let is_valid_item = next == 0 || self.bits_def == 0;
1734 Some(ControlWordDesc {
1735 is_new_row,
1736 is_visible: true,
1737 is_valid_item,
1738 })
1739 }
1740}
1741
1742impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
1743 fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1744 let next = self.repdef.next()?;
1745 let next = (next & self.level_mask) as u32;
1746 let control_word = next.to_le_bytes();
1747 buf.push(control_word[0]);
1748 buf.push(control_word[1]);
1749 buf.push(control_word[2]);
1750 buf.push(control_word[3]);
1751 let is_new_row = self.max_rep == 0 || next as u16 == self.max_rep;
1752 let is_valid_item = next == 0 || self.bits_def == 0;
1753 Some(ControlWordDesc {
1754 is_new_row,
1755 is_visible: true,
1756 is_valid_item,
1757 })
1758 }
1759}
1760
/// Control word "iterator" for data with neither repetition nor definition
/// levels: it writes no bytes and only counts items.
#[derive(Debug)]
pub struct NilaryControlWordIterator {
    /// Total number of items to report
    len: usize,
    /// Number of items reported so far
    idx: usize,
}
1767
1768impl NilaryControlWordIterator {
1769 fn append_next(&mut self) -> Option<ControlWordDesc> {
1770 if self.idx == self.len {
1771 None
1772 } else {
1773 self.idx += 1;
1774 Some(ControlWordDesc {
1775 is_new_row: true,
1776 is_visible: true,
1777 is_valid_item: true,
1778 })
1779 }
1780 }
1781}
1782
/// Returns a mask with the lowest `width` bits set.
///
/// A `width` of 16 or more yields `u16::MAX`. The plain `(1 << width) - 1`
/// formulation overflows the shift for `width == 16`, which panics in debug
/// builds.
fn get_mask(width: u16) -> u16 {
    1u16.checked_shl(u32::from(width))
        .map_or(u16::MAX, |high_bit| high_bit - 1)
}
1787
/// A [`BinaryControlWordIterator`] whose level pairs come from zipping two
/// `u16` slices (copied element by element); `T` is the control word type.
type SpecificBinaryControlWordIterator<'a, T> = BinaryControlWordIterator<
    Zip<Copied<std::slice::Iter<'a, u16>>, Copied<std::slice::Iter<'a, u16>>>,
    T,
>;
1794
/// An iterator that serializes repetition / definition levels as control words.
///
/// The variant fixes both how many level streams are combined (two for
/// `Binary*`, one for `Unary*`, none for `Nilary`) and how many bytes each
/// control word occupies (1, 2 or 4 for the `8`, `16` and `32` suffixes, and 0
/// for `Nilary`).
#[derive(Debug)]
pub enum ControlWordIterator<'a> {
    /// Repetition and definition packed into one byte
    Binary8(SpecificBinaryControlWordIterator<'a, u8>),
    /// Repetition and definition packed into two bytes
    Binary16(SpecificBinaryControlWordIterator<'a, u16>),
    /// Repetition and definition packed into four bytes
    Binary32(SpecificBinaryControlWordIterator<'a, u32>),
    /// A single level stream, one byte per word
    Unary8(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u8>),
    /// A single level stream, two bytes per word
    Unary16(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u16>),
    /// A single level stream, four bytes per word
    Unary32(UnaryControlWordIterator<Copied<std::slice::Iter<'a, u16>>, u32>),
    /// No levels at all; nothing is written
    Nilary(NilaryControlWordIterator),
}
1814
/// Describes a single control word.
#[derive(Debug)]
pub struct ControlWordDesc {
    /// Whether the word starts a new row (its repetition level equals the maximum,
    /// or there is no repetition at all)
    pub is_new_row: bool,
    /// Whether the word is visible (its definition level does not exceed the
    /// maximum visible level; always true when only one level stream is present)
    pub is_visible: bool,
    /// Whether the word denotes a valid item (its definition level is zero, or
    /// there are no definition levels)
    pub is_valid_item: bool,
}
1822
1823impl ControlWordIterator<'_> {
1824 pub fn append_next(&mut self, buf: &mut Vec<u8>) -> Option<ControlWordDesc> {
1828 match self {
1829 Self::Binary8(iter) => iter.append_next(buf),
1830 Self::Binary16(iter) => iter.append_next(buf),
1831 Self::Binary32(iter) => iter.append_next(buf),
1832 Self::Unary8(iter) => iter.append_next(buf),
1833 Self::Unary16(iter) => iter.append_next(buf),
1834 Self::Unary32(iter) => iter.append_next(buf),
1835 Self::Nilary(iter) => iter.append_next(),
1836 }
1837 }
1838
1839 pub fn has_repetition(&self) -> bool {
1841 match self {
1842 Self::Binary8(_) | Self::Binary16(_) | Self::Binary32(_) => true,
1843 Self::Unary8(iter) => iter.bits_rep > 0,
1844 Self::Unary16(iter) => iter.bits_rep > 0,
1845 Self::Unary32(iter) => iter.bits_rep > 0,
1846 Self::Nilary(_) => false,
1847 }
1848 }
1849
1850 pub fn bytes_per_word(&self) -> usize {
1852 match self {
1853 Self::Binary8(_) => 1,
1854 Self::Binary16(_) => 2,
1855 Self::Binary32(_) => 4,
1856 Self::Unary8(_) => 1,
1857 Self::Unary16(_) => 2,
1858 Self::Unary32(_) => 4,
1859 Self::Nilary(_) => 0,
1860 }
1861 }
1862
1863 pub fn bits_rep(&self) -> u8 {
1865 match self {
1866 Self::Binary8(iter) => iter.bits_rep,
1867 Self::Binary16(iter) => iter.bits_rep,
1868 Self::Binary32(iter) => iter.bits_rep,
1869 Self::Unary8(iter) => iter.bits_rep,
1870 Self::Unary16(iter) => iter.bits_rep,
1871 Self::Unary32(iter) => iter.bits_rep,
1872 Self::Nilary(_) => 0,
1873 }
1874 }
1875
1876 pub fn bits_def(&self) -> u8 {
1878 match self {
1879 Self::Binary8(iter) => iter.bits_def,
1880 Self::Binary16(iter) => iter.bits_def,
1881 Self::Binary32(iter) => iter.bits_def,
1882 Self::Unary8(iter) => iter.bits_def,
1883 Self::Unary16(iter) => iter.bits_def,
1884 Self::Unary32(iter) => iter.bits_def,
1885 Self::Nilary(_) => 0,
1886 }
1887 }
1888}
1889
1890pub fn build_control_word_iterator<'a>(
1894 rep: Option<&'a [u16]>,
1895 max_rep: u16,
1896 def: Option<&'a [u16]>,
1897 max_def: u16,
1898 max_visible_def: u16,
1899 len: usize,
1900) -> ControlWordIterator<'a> {
1901 let rep_width = if max_rep == 0 {
1902 0
1903 } else {
1904 log_2_ceil(max_rep as u32) as u16
1905 };
1906 let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
1907 let def_width = if max_def == 0 {
1908 0
1909 } else {
1910 log_2_ceil(max_def as u32) as u16
1911 };
1912 let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
1913 let total_width = rep_width + def_width;
1914 match (rep, def) {
1915 (Some(rep), Some(def)) => {
1916 let iter = rep.iter().copied().zip(def.iter().copied());
1917 let def_width = def_width as usize;
1918 if total_width <= 8 {
1919 ControlWordIterator::Binary8(BinaryControlWordIterator {
1920 repdef: iter,
1921 rep_mask,
1922 def_mask,
1923 def_width,
1924 max_rep,
1925 max_visible_def,
1926 bits_rep: rep_width as u8,
1927 bits_def: def_width as u8,
1928 phantom: std::marker::PhantomData,
1929 })
1930 } else if total_width <= 16 {
1931 ControlWordIterator::Binary16(BinaryControlWordIterator {
1932 repdef: iter,
1933 rep_mask,
1934 def_mask,
1935 def_width,
1936 max_rep,
1937 max_visible_def,
1938 bits_rep: rep_width as u8,
1939 bits_def: def_width as u8,
1940 phantom: std::marker::PhantomData,
1941 })
1942 } else {
1943 ControlWordIterator::Binary32(BinaryControlWordIterator {
1944 repdef: iter,
1945 rep_mask,
1946 def_mask,
1947 def_width,
1948 max_rep,
1949 max_visible_def,
1950 bits_rep: rep_width as u8,
1951 bits_def: def_width as u8,
1952 phantom: std::marker::PhantomData,
1953 })
1954 }
1955 }
1956 (Some(lev), None) => {
1957 let iter = lev.iter().copied();
1958 if total_width <= 8 {
1959 ControlWordIterator::Unary8(UnaryControlWordIterator {
1960 repdef: iter,
1961 level_mask: rep_mask,
1962 bits_rep: total_width as u8,
1963 bits_def: 0,
1964 max_rep,
1965 phantom: std::marker::PhantomData,
1966 })
1967 } else if total_width <= 16 {
1968 ControlWordIterator::Unary16(UnaryControlWordIterator {
1969 repdef: iter,
1970 level_mask: rep_mask,
1971 bits_rep: total_width as u8,
1972 bits_def: 0,
1973 max_rep,
1974 phantom: std::marker::PhantomData,
1975 })
1976 } else {
1977 ControlWordIterator::Unary32(UnaryControlWordIterator {
1978 repdef: iter,
1979 level_mask: rep_mask,
1980 bits_rep: total_width as u8,
1981 bits_def: 0,
1982 max_rep,
1983 phantom: std::marker::PhantomData,
1984 })
1985 }
1986 }
1987 (None, Some(lev)) => {
1988 let iter = lev.iter().copied();
1989 if total_width <= 8 {
1990 ControlWordIterator::Unary8(UnaryControlWordIterator {
1991 repdef: iter,
1992 level_mask: def_mask,
1993 bits_rep: 0,
1994 bits_def: total_width as u8,
1995 max_rep: 0,
1996 phantom: std::marker::PhantomData,
1997 })
1998 } else if total_width <= 16 {
1999 ControlWordIterator::Unary16(UnaryControlWordIterator {
2000 repdef: iter,
2001 level_mask: def_mask,
2002 bits_rep: 0,
2003 bits_def: total_width as u8,
2004 max_rep: 0,
2005 phantom: std::marker::PhantomData,
2006 })
2007 } else {
2008 ControlWordIterator::Unary32(UnaryControlWordIterator {
2009 repdef: iter,
2010 level_mask: def_mask,
2011 bits_rep: 0,
2012 bits_def: total_width as u8,
2013 max_rep: 0,
2014 phantom: std::marker::PhantomData,
2015 })
2016 }
2017 }
2018 (None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator { len, idx: 0 }),
2019 }
2020}
2021
/// Parses serialized control words back into repetition / definition levels.
///
/// The numeric suffix is the control word size in bits (1, 2 or 4 bytes).
#[derive(Copy, Clone, Debug)]
pub enum ControlWordParser {
    /// Both levels in one byte; fields are (bits to shift right to get the
    /// repetition level, mask to apply to get the definition level)
    BOTH8(u8, u32),
    /// Both levels in two bytes; same fields as `BOTH8`
    BOTH16(u8, u32),
    /// Both levels in four bytes; same fields as `BOTH8`
    BOTH32(u8, u32),
    /// Repetition level only, one byte
    REP8,
    /// Repetition level only, two bytes
    REP16,
    /// Repetition level only, four bytes
    REP32,
    /// Definition level only, one byte
    DEF8,
    /// Definition level only, two bytes
    DEF16,
    /// Definition level only, four bytes
    DEF32,
    /// No levels; control words occupy no bytes
    NIL,
}
2040
2041impl ControlWordParser {
2042 fn parse_both<const WORD_SIZE: u8>(
2043 src: &[u8],
2044 dst_rep: &mut Vec<u16>,
2045 dst_def: &mut Vec<u16>,
2046 bits_to_shift: u8,
2047 mask_to_apply: u32,
2048 ) {
2049 match WORD_SIZE {
2050 1 => {
2051 let word = src[0];
2052 let rep = word >> bits_to_shift;
2053 let def = word & (mask_to_apply as u8);
2054 dst_rep.push(rep as u16);
2055 dst_def.push(def as u16);
2056 }
2057 2 => {
2058 let word = u16::from_le_bytes([src[0], src[1]]);
2059 let rep = word >> bits_to_shift;
2060 let def = word & mask_to_apply as u16;
2061 dst_rep.push(rep);
2062 dst_def.push(def);
2063 }
2064 4 => {
2065 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2066 let rep = word >> bits_to_shift;
2067 let def = word & mask_to_apply;
2068 dst_rep.push(rep as u16);
2069 dst_def.push(def as u16);
2070 }
2071 _ => unreachable!(),
2072 }
2073 }
2074
2075 fn parse_desc_both<const WORD_SIZE: u8>(
2076 src: &[u8],
2077 bits_to_shift: u8,
2078 mask_to_apply: u32,
2079 max_rep: u16,
2080 max_visible_def: u16,
2081 ) -> ControlWordDesc {
2082 match WORD_SIZE {
2083 1 => {
2084 let word = src[0];
2085 let rep = word >> bits_to_shift;
2086 let def = word & (mask_to_apply as u8);
2087 let is_visible = def as u16 <= max_visible_def;
2088 let is_new_row = rep as u16 == max_rep;
2089 let is_valid_item = def == 0;
2090 ControlWordDesc {
2091 is_visible,
2092 is_new_row,
2093 is_valid_item,
2094 }
2095 }
2096 2 => {
2097 let word = u16::from_le_bytes([src[0], src[1]]);
2098 let rep = word >> bits_to_shift;
2099 let def = word & mask_to_apply as u16;
2100 let is_visible = def <= max_visible_def;
2101 let is_new_row = rep == max_rep;
2102 let is_valid_item = def == 0;
2103 ControlWordDesc {
2104 is_visible,
2105 is_new_row,
2106 is_valid_item,
2107 }
2108 }
2109 4 => {
2110 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2111 let rep = word >> bits_to_shift;
2112 let def = word & mask_to_apply;
2113 let is_visible = def as u16 <= max_visible_def;
2114 let is_new_row = rep as u16 == max_rep;
2115 let is_valid_item = def == 0;
2116 ControlWordDesc {
2117 is_visible,
2118 is_new_row,
2119 is_valid_item,
2120 }
2121 }
2122 _ => unreachable!(),
2123 }
2124 }
2125
2126 fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
2127 match WORD_SIZE {
2128 1 => {
2129 let word = src[0];
2130 dst.push(word as u16);
2131 }
2132 2 => {
2133 let word = u16::from_le_bytes([src[0], src[1]]);
2134 dst.push(word);
2135 }
2136 4 => {
2137 let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
2138 dst.push(word as u16);
2139 }
2140 _ => unreachable!(),
2141 }
2142 }
2143
2144 fn parse_rep_desc_one<const WORD_SIZE: u8>(src: &[u8], max_rep: u16) -> ControlWordDesc {
2145 match WORD_SIZE {
2146 1 => ControlWordDesc {
2147 is_new_row: src[0] as u16 == max_rep,
2148 is_visible: true,
2149 is_valid_item: true,
2150 },
2151 2 => ControlWordDesc {
2152 is_new_row: u16::from_le_bytes([src[0], src[1]]) == max_rep,
2153 is_visible: true,
2154 is_valid_item: true,
2155 },
2156 4 => ControlWordDesc {
2157 is_new_row: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == max_rep,
2158 is_visible: true,
2159 is_valid_item: true,
2160 },
2161 _ => unreachable!(),
2162 }
2163 }
2164
2165 fn parse_def_desc_one<const WORD_SIZE: u8>(src: &[u8]) -> ControlWordDesc {
2166 match WORD_SIZE {
2167 1 => ControlWordDesc {
2168 is_new_row: true,
2169 is_visible: true,
2170 is_valid_item: src[0] == 0,
2171 },
2172 2 => ControlWordDesc {
2173 is_new_row: true,
2174 is_visible: true,
2175 is_valid_item: u16::from_le_bytes([src[0], src[1]]) == 0,
2176 },
2177 4 => ControlWordDesc {
2178 is_new_row: true,
2179 is_visible: true,
2180 is_valid_item: u32::from_le_bytes([src[0], src[1], src[2], src[3]]) as u16 == 0,
2181 },
2182 _ => unreachable!(),
2183 }
2184 }
2185
2186 pub fn bytes_per_word(&self) -> usize {
2188 match self {
2189 Self::BOTH8(..) => 1,
2190 Self::BOTH16(..) => 2,
2191 Self::BOTH32(..) => 4,
2192 Self::REP8 => 1,
2193 Self::REP16 => 2,
2194 Self::REP32 => 4,
2195 Self::DEF8 => 1,
2196 Self::DEF16 => 2,
2197 Self::DEF32 => 4,
2198 Self::NIL => 0,
2199 }
2200 }
2201
2202 pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
2209 match self {
2210 Self::BOTH8(bits_to_shift, mask_to_apply) => {
2211 Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2212 }
2213 Self::BOTH16(bits_to_shift, mask_to_apply) => {
2214 Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2215 }
2216 Self::BOTH32(bits_to_shift, mask_to_apply) => {
2217 Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
2218 }
2219 Self::REP8 => Self::parse_one::<1>(src, dst_rep),
2220 Self::REP16 => Self::parse_one::<2>(src, dst_rep),
2221 Self::REP32 => Self::parse_one::<4>(src, dst_rep),
2222 Self::DEF8 => Self::parse_one::<1>(src, dst_def),
2223 Self::DEF16 => Self::parse_one::<2>(src, dst_def),
2224 Self::DEF32 => Self::parse_one::<4>(src, dst_def),
2225 Self::NIL => {}
2226 }
2227 }
2228
2229 pub fn has_rep(&self) -> bool {
2231 match self {
2232 Self::BOTH8(..)
2233 | Self::BOTH16(..)
2234 | Self::BOTH32(..)
2235 | Self::REP8
2236 | Self::REP16
2237 | Self::REP32 => true,
2238 Self::DEF8 | Self::DEF16 | Self::DEF32 | Self::NIL => false,
2239 }
2240 }
2241
2242 pub fn parse_desc(&self, src: &[u8], max_rep: u16, max_visible_def: u16) -> ControlWordDesc {
2244 match self {
2245 Self::BOTH8(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<1>(
2246 src,
2247 *bits_to_shift,
2248 *mask_to_apply,
2249 max_rep,
2250 max_visible_def,
2251 ),
2252 Self::BOTH16(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<2>(
2253 src,
2254 *bits_to_shift,
2255 *mask_to_apply,
2256 max_rep,
2257 max_visible_def,
2258 ),
2259 Self::BOTH32(bits_to_shift, mask_to_apply) => Self::parse_desc_both::<4>(
2260 src,
2261 *bits_to_shift,
2262 *mask_to_apply,
2263 max_rep,
2264 max_visible_def,
2265 ),
2266 Self::REP8 => Self::parse_rep_desc_one::<1>(src, max_rep),
2267 Self::REP16 => Self::parse_rep_desc_one::<2>(src, max_rep),
2268 Self::REP32 => Self::parse_rep_desc_one::<4>(src, max_rep),
2269 Self::DEF8 => Self::parse_def_desc_one::<1>(src),
2270 Self::DEF16 => Self::parse_def_desc_one::<2>(src),
2271 Self::DEF32 => Self::parse_def_desc_one::<4>(src),
2272 Self::NIL => ControlWordDesc {
2273 is_new_row: true,
2274 is_valid_item: true,
2275 is_visible: true,
2276 },
2277 }
2278 }
2279
2280 pub fn new(bits_rep: u8, bits_def: u8) -> Self {
2282 let total_bits = bits_rep + bits_def;
2283
2284 enum WordSize {
2285 One,
2286 Two,
2287 Four,
2288 }
2289
2290 let word_size = if total_bits <= 8 {
2291 WordSize::One
2292 } else if total_bits <= 16 {
2293 WordSize::Two
2294 } else {
2295 WordSize::Four
2296 };
2297
2298 match (bits_rep > 0, bits_def > 0, word_size) {
2299 (false, false, _) => Self::NIL,
2300 (false, true, WordSize::One) => Self::DEF8,
2301 (false, true, WordSize::Two) => Self::DEF16,
2302 (false, true, WordSize::Four) => Self::DEF32,
2303 (true, false, WordSize::One) => Self::REP8,
2304 (true, false, WordSize::Two) => Self::REP16,
2305 (true, false, WordSize::Four) => Self::REP32,
2306 (true, true, WordSize::One) => Self::BOTH8(bits_def, get_mask(bits_def as u16) as u32),
2307 (true, true, WordSize::Two) => Self::BOTH16(bits_def, get_mask(bits_def as u16) as u32),
2308 (true, true, WordSize::Four) => {
2309 Self::BOTH32(bits_def, get_mask(bits_def as u16) as u32)
2310 }
2311 }
2312 }
2313}
2314
2315#[cfg(test)]
2316mod tests {
2317 use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
2318
2319 use crate::repdef::{
2320 CompositeRepDefUnraveler, DefinitionInterpretation, RepDefUnraveler, SerializedRepDefs,
2321 };
2322
2323 use super::RepDefBuilder;
2324
2325 fn validity(values: &[bool]) -> NullBuffer {
2326 NullBuffer::from_iter(values.iter().copied())
2327 }
2328
2329 fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
2330 OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
2331 }
2332
2333 fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
2334 OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
2335 }
2336
// Round trip through two nullable list layers and a nullable item layer:
// serialize, check the raw levels, then unravel validity and both offset layers.
#[test]
fn test_repdef_basic() {
    let mut builder = RepDefBuilder::default();
    // First list layer: 3 lists, the middle one null
    builder.add_offsets(
        offsets_64(&[0, 2, 2, 5]),
        Some(validity(&[true, false, true])),
    );
    // Second list layer: 5 lists, the fourth one null
    builder.add_offsets(
        offsets_64(&[0, 1, 3, 5, 5, 9]),
        Some(validity(&[true, true, true, false, true])),
    );
    // 9 items, some null
    builder.add_validity_bitmap(validity(&[
        true, true, true, false, false, false, true, true, false,
    ]));

    let repdefs = RepDefBuilder::serialize(vec![builder]);
    let rep = repdefs.repetition_levels.unwrap();
    let def = repdefs.definition_levels.unwrap();

    // 11 entries for 9 items: the expected levels hold one extra entry per null list
    assert_eq!(vec![0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1], *def);
    assert_eq!(vec![2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0], *rep);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        Some(rep.as_ref().to_vec()),
        Some(def.as_ref().to_vec()),
        repdefs.def_meaning.into(),
    )]);

    // Layers come back in the reverse of the order they were added: items first
    assert_eq!(
        unraveler.unravel_validity(9),
        Some(validity(&[
            true, true, true, false, false, false, true, true, false
        ]))
    );
    let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 5, 9]).inner());
    assert_eq!(val, Some(validity(&[true, true, true, false, true])));
    let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(off.inner(), offsets_32(&[0, 2, 2, 5]).inner());
    assert_eq!(val, Some(validity(&[true, false, true])));
}
2381
// A zero-length list in the middle serializes to the same levels whether it is
// marked null or merely empty; only the reported list interpretation differs.
#[test]
fn test_repdef_simple_null_empty_list() {
    // Shared expectations; `last_def` is the interpretation expected for the list layer
    let check = |repdefs: SerializedRepDefs, last_def: DefinitionInterpretation| {
        let rep = repdefs.repetition_levels.unwrap();
        let def = repdefs.definition_levels.unwrap();

        assert_eq!([1, 0, 1, 1, 0, 0], *rep);
        assert_eq!([0, 0, 2, 0, 1, 0], *def);
        assert!(repdefs.special_records.is_empty());
        assert_eq!(
            vec![DefinitionInterpretation::NullableItem, last_def,],
            repdefs.def_meaning
        );
    };

    // Case 1: the middle list is null
    let mut builder = RepDefBuilder::default();
    builder.add_offsets(
        offsets_32(&[0, 2, 2, 5]),
        Some(validity(&[true, false, true])),
    );
    builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

    let repdefs = RepDefBuilder::serialize(vec![builder]);

    check(repdefs, DefinitionInterpretation::NullableList);

    // Case 2: same offsets but no list validity, so the middle list is empty
    let mut builder = RepDefBuilder::default();
    builder.add_offsets(offsets_32(&[0, 2, 2, 5]), None);
    builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

    let repdefs = RepDefBuilder::serialize(vec![builder]);

    check(repdefs, DefinitionInterpretation::EmptyableList);
}
2420
// An empty list in the last position still gets an entry in the serialized levels.
#[test]
fn test_repdef_empty_list_at_end() {
    let mut builder = RepDefBuilder::default();
    // Three lists of lengths 2, 3 and 0
    builder.add_offsets(offsets_32(&[0, 2, 5, 5]), None);
    builder.add_validity_bitmap(validity(&[true, true, true, false, true]));

    let repdefs = RepDefBuilder::serialize(vec![builder]);

    let rep = repdefs.repetition_levels.unwrap();
    let def = repdefs.definition_levels.unwrap();

    // The final entry (rep 1, def 2) is the trailing empty list
    assert_eq!([1, 0, 1, 0, 0, 1], *rep);
    assert_eq!([0, 0, 0, 1, 0, 2], *def);
    assert!(repdefs.special_records.is_empty());
    assert_eq!(
        vec![
            DefinitionInterpretation::NullableItem,
            DefinitionInterpretation::EmptyableList,
        ],
        repdefs.def_meaning
    );
}
2444
// A null list whose offsets still span values (2..5 here): the expected levels
// contain a single entry for the null list and none for the values behind it.
#[test]
fn test_repdef_abnormal_nulls() {
    let mut builder = RepDefBuilder::default();
    builder.add_offsets(
        offsets_32(&[0, 2, 5, 8]),
        Some(validity(&[true, false, true])),
    );
    // Only 5 items are declared even though the offsets run to 8
    builder.add_no_null(5);

    let repdefs = RepDefBuilder::serialize(vec![builder]);

    let rep = repdefs.repetition_levels.unwrap();
    let def = repdefs.definition_levels.unwrap();

    assert_eq!([1, 0, 1, 1, 0, 0], *rep);
    assert_eq!([0, 0, 1, 0, 0, 0], *def);

    assert_eq!(
        vec![
            DefinitionInterpretation::AllValidItem,
            DefinitionInterpretation::NullableList,
        ],
        repdefs.def_meaning
    );
}
2474
// Two fixed-size-list layers (the first with nulls, the second without) over
// eight nullable items; there is no repetition, only definition levels.
#[test]
fn test_repdef_fsl() {
    let mut builder = RepDefBuilder::default();
    builder.add_fsl(Some(validity(&[true, false])), 2, 2);
    builder.add_fsl(None, 2, 4);
    builder.add_validity_bitmap(validity(&[
        true, false, true, false, true, false, true, false,
    ]));

    let repdefs = RepDefBuilder::serialize(vec![builder]);

    assert_eq!(
        vec![
            DefinitionInterpretation::NullableItem,
            DefinitionInterpretation::AllValidItem,
            DefinitionInterpretation::NullableItem
        ],
        repdefs.def_meaning
    );

    assert!(repdefs.repetition_levels.is_none());

    let def = repdefs.definition_levels.unwrap();

    // The last four items are expected at level 2 regardless of their own validity
    assert_eq!([0, 1, 0, 1, 2, 2, 2, 2], *def);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        None,
        Some(def.as_ref().to_vec()),
        repdefs.def_meaning.into(),
    )]);

    // Item validity: everything at a non-zero level unravels as null
    assert_eq!(
        unraveler.unravel_validity(8),
        Some(validity(&[
            true, false, true, false, false, false, false, false
        ]))
    );
    // The all-valid layer yields no bitmap
    assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
    assert_eq!(
        unraveler.unravel_fsl_validity(2, 2),
        Some(validity(&[true, false]))
    );
}
2519
// Same fixed-size-list layout as `test_repdef_fsl` but with items that have no
// nulls, so only the first fixed-size-list layer contributes a definition level.
#[test]
fn test_repdef_fsl_allvalid_item() {
    let mut builder = RepDefBuilder::default();
    builder.add_fsl(Some(validity(&[true, false])), 2, 2);
    builder.add_fsl(None, 2, 4);
    builder.add_no_null(8);

    let repdefs = RepDefBuilder::serialize(vec![builder]);

    assert_eq!(
        vec![
            DefinitionInterpretation::AllValidItem,
            DefinitionInterpretation::AllValidItem,
            DefinitionInterpretation::NullableItem
        ],
        repdefs.def_meaning
    );

    assert!(repdefs.repetition_levels.is_none());

    let def = repdefs.definition_levels.unwrap();

    assert_eq!([0, 0, 0, 0, 1, 1, 1, 1], *def);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        None,
        Some(def.as_ref().to_vec()),
        repdefs.def_meaning.into(),
    )]);

    // The two all-valid layers yield no bitmap; only the last layer does
    assert_eq!(unraveler.unravel_validity(8), None);
    assert_eq!(unraveler.unravel_fsl_validity(4, 2), None);
    assert_eq!(
        unraveler.unravel_fsl_validity(2, 2),
        Some(validity(&[true, false]))
    );
}
2557
// Offsets that do not start at zero (as in a sliced array) are expected to
// serialize to the same levels as the equivalent zero-based offsets [0, 2, 2, 5].
#[test]
fn test_repdef_sliced_offsets() {
    let mut builder = RepDefBuilder::default();
    builder.add_offsets(
        offsets_32(&[5, 7, 7, 10]),
        Some(validity(&[true, false, true])),
    );
    builder.add_no_null(5);

    let repdefs = RepDefBuilder::serialize(vec![builder]);

    let rep = repdefs.repetition_levels.unwrap();
    let def = repdefs.definition_levels.unwrap();

    assert_eq!([1, 0, 1, 1, 0, 0], *rep);
    assert_eq!([0, 0, 1, 0, 0, 0], *def);

    assert_eq!(
        vec![
            DefinitionInterpretation::AllValidItem,
            DefinitionInterpretation::NullableList,
        ],
        repdefs.def_meaning
    );
}
2585
// Two list layers that each contain both null and empty (zero-length, valid)
// lists; the expected definition levels use four distinct non-zero values.
#[test]
fn test_repdef_complex_null_empty() {
    let mut builder = RepDefBuilder::default();
    // First layer: lengths 4, 0 (null), 0 (valid, i.e. empty), 2
    builder.add_offsets(
        offsets_32(&[0, 4, 4, 4, 6]),
        Some(validity(&[true, false, true, true])),
    );
    // Second layer: lengths 1, 0 (null), 1, 0 (null), 0 (valid, i.e. empty), 1
    builder.add_offsets(
        offsets_32(&[0, 1, 1, 2, 2, 2, 3]),
        Some(validity(&[true, false, true, false, true, true])),
    );
    builder.add_no_null(3);

    let repdefs = RepDefBuilder::serialize(vec![builder]);

    let rep = repdefs.repetition_levels.unwrap();
    let def = repdefs.definition_levels.unwrap();

    assert_eq!([2, 1, 1, 1, 2, 2, 2, 1], *rep);
    assert_eq!([0, 1, 0, 1, 3, 4, 2, 0], *def);
}
2607
// Empty lists without any nulls: definition levels are still produced, but
// unraveling yields no validity bitmaps and reproduces the original offsets.
#[test]
fn test_repdef_empty_list_no_null() {
    let mut builder = RepDefBuilder::default();
    // Four lists of lengths 4, 0, 0 and 2; no list validity
    builder.add_offsets(offsets_32(&[0, 4, 4, 4, 6]), None);
    builder.add_no_null(6);

    let repdefs = RepDefBuilder::serialize(vec![builder]);

    let rep = repdefs.repetition_levels.unwrap();
    let def = repdefs.definition_levels.unwrap();

    assert_eq!([1, 0, 0, 0, 1, 1, 1, 0], *rep);
    assert_eq!([0, 0, 0, 0, 1, 1, 0, 0], *def);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        Some(rep.as_ref().to_vec()),
        Some(def.as_ref().to_vec()),
        repdefs.def_meaning.into(),
    )]);

    assert_eq!(unraveler.unravel_validity(6), None);
    let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(off.inner(), offsets_32(&[0, 4, 4, 4, 6]).inner());
    assert_eq!(val, None);
}
2635
// Two list layers with no nulls and no empty lists: no definition levels are
// produced at all, and unraveling works from repetition levels alone.
#[test]
fn test_repdef_all_valid() {
    let mut builder = RepDefBuilder::default();
    builder.add_offsets(offsets_64(&[0, 2, 3, 5]), None);
    builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]), None);
    builder.add_no_null(9);

    let repdefs = RepDefBuilder::serialize(vec![builder]);
    let rep = repdefs.repetition_levels.unwrap();
    assert!(repdefs.definition_levels.is_none());

    assert_eq!([2, 1, 0, 2, 0, 2, 0, 1, 0], *rep);

    let mut unraveler = CompositeRepDefUnraveler::new(vec![RepDefUnraveler::new(
        Some(rep.as_ref().to_vec()),
        None,
        repdefs.def_meaning.into(),
    )]);

    // Layers come back in the reverse of the order they were added
    assert_eq!(unraveler.unravel_validity(9), None);
    let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(off.inner(), offsets_32(&[0, 1, 3, 5, 7, 9]).inner());
    assert_eq!(val, None);
    let (off, val) = unraveler.unravel_offsets::<i32>().unwrap();
    assert_eq!(off.inner(), offsets_32(&[0, 2, 3, 5]).inner());
    assert_eq!(val, None);
}
2663
#[test]
/// Validity-only layers (no lists) produce definition levels but no
/// repetition levels, and unravel back in reverse order of insertion.
fn test_repdef_no_rep() {
    let mut builder = RepDefBuilder::default();
    builder.add_no_null(5);
    let middle_validity = validity(&[false, false, true, true, true]);
    builder.add_validity_bitmap(middle_validity);
    let innermost_validity = validity(&[false, true, true, true, false]);
    builder.add_validity_bitmap(innermost_validity);

    let serialized = RepDefBuilder::serialize(vec![builder]);
    assert!(serialized.repetition_levels.is_none());
    let def_levels = serialized.definition_levels.unwrap();

    let expected_def: [u16; 5] = [2, 2, 0, 0, 1];
    assert_eq!(expected_def, *def_levels);

    let layer = RepDefUnraveler::new(
        None,
        Some(def_levels.to_vec()),
        serialized.def_meaning.into(),
    );
    let mut unraveler = CompositeRepDefUnraveler::new(vec![layer]);

    // Last layer added comes back first. Slots that are null in the layer
    // above are reported as null here too (slot 1 was valid in the bitmap
    // passed in, but comes back as invalid).
    let innermost = unraveler.unravel_validity(5);
    assert_eq!(
        innermost,
        Some(validity(&[false, false, true, true, false]))
    );

    let middle = unraveler.unravel_validity(5);
    assert_eq!(middle, Some(validity(&[false, false, true, true, true])));

    // The first layer was added with `add_no_null`, so it has no validity.
    let outermost = unraveler.unravel_validity(5);
    assert_eq!(outermost, None);
}
2693
#[test]
/// Two independently serialized list layers are stitched together by a
/// composite unraveler into one offsets buffer and one validity buffer.
fn test_composite_unravel() {
    // First page: 3 lists, the middle one null.
    let mut first_builder = RepDefBuilder::default();
    let first_offsets = offsets_64(&[0, 2, 2, 5]);
    let first_validity = validity(&[true, false, true]);
    first_builder.add_offsets(first_offsets, Some(first_validity));
    let first = RepDefBuilder::serialize(vec![first_builder]);

    // Second page: 5 lists, all valid.
    let mut second_builder = RepDefBuilder::default();
    let second_offsets = offsets_64(&[0, 1, 3, 5, 7, 9]);
    second_builder.add_offsets(second_offsets, None);
    let second = RepDefBuilder::serialize(vec![second_builder]);

    let first_rep = first.repetition_levels.map(|levels| levels.to_vec());
    let first_def = first.definition_levels.map(|levels| levels.to_vec());
    let first_unraveler = RepDefUnraveler::new(first_rep, first_def, first.def_meaning.into());

    let second_rep = second.repetition_levels.map(|levels| levels.to_vec());
    let second_def = second.definition_levels.map(|levels| levels.to_vec());
    let second_unraveler =
        RepDefUnraveler::new(second_rep, second_def, second.def_meaning.into());

    let mut composite = CompositeRepDefUnraveler::new(vec![first_unraveler, second_unraveler]);

    let (offsets, list_validity) = composite.unravel_offsets::<i32>().unwrap();

    // The second page's offsets continue from the end of the first (5).
    let expected_offsets = offsets_32(&[0, 2, 2, 5, 6, 8, 10, 12, 14]);
    assert_eq!(offsets.inner(), expected_offsets.inner());

    // Only the first page had a null list; the all-valid second page is
    // filled in as valid.
    let expected_validity = validity(&[true, false, true, true, true, true, true, true]);
    assert_eq!(list_validity, Some(expected_validity));
}
2730
#[test]
/// Serializing several builders at once concatenates their levels, even when
/// one builder is fully valid and the other has nulls at every layer.
fn test_repdef_multiple_builders() {
    // First builder: two list layers over 3 items, no nulls and no empty
    // lists anywhere.
    let mut all_valid = RepDefBuilder::default();
    all_valid.add_offsets(offsets_64(&[0, 2]), None);
    all_valid.add_offsets(offsets_64(&[0, 1, 3]), None);
    let all_valid_items = validity(&[true, true, true]);
    all_valid.add_validity_bitmap(all_valid_items);

    // Second builder: null lists at both list layers plus null items.
    let mut with_nulls = RepDefBuilder::default();
    let outer_validity = validity(&[false, true]);
    with_nulls.add_offsets(offsets_64(&[0, 0, 3]), Some(outer_validity));
    let inner_validity = validity(&[true, false, true]);
    with_nulls.add_offsets(offsets_64(&[0, 2, 2, 6]), Some(inner_validity));
    let item_validity = validity(&[false, false, false, true, true, false]);
    with_nulls.add_validity_bitmap(item_validity);

    let serialized = RepDefBuilder::serialize(vec![all_valid, with_nulls]);
    let rep_levels = serialized.repetition_levels.unwrap();
    let def_levels = serialized.definition_levels.unwrap();

    // First 3 entries belong to the first builder, the remaining 8 to the second.
    let expected_rep: [u16; 11] = [2, 1, 0, 2, 2, 0, 1, 1, 0, 0, 0];
    let expected_def: [u16; 11] = [0, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];
    assert_eq!(expected_rep, *rep_levels);
    assert_eq!(expected_def, *def_levels);
}
2755
#[test]
/// Slices the serialized repetition and definition levels by item count and
/// checks the byte length of every slice.
///
/// Levels are `u16`, so each level occupies two bytes in a slice.
fn test_slicer() {
    let mut builder = RepDefBuilder::default();
    // 4 lists over 30 items: the second list is null and the last is empty.
    builder.add_offsets(
        offsets_64(&[0, 2, 2, 30, 30]),
        Some(validity(&[true, false, true, true])),
    );
    builder.add_no_null(30);

    let repdefs = RepDefBuilder::serialize(vec![builder]);

    let mut rep_slicer = repdefs.rep_slicer().unwrap();

    // The first 5 items yield 6 levels (12 bytes); presumably the extra
    // level is the null list that sits after item 2.
    assert_eq!(rep_slicer.slice_next(5).len(), 12);
    // The next 20 items are plain: 20 levels, 40 bytes.
    assert_eq!(rep_slicer.slice_next(20).len(), 40);
    // The last 5 items yield 6 levels again; presumably the extra level is
    // the trailing empty list.
    assert_eq!(rep_slicer.slice_rest().len(), 12);

    // Slice the definition levels this time. The original built this slicer
    // from `rep_slicer()` as well, so the definition levels were never tested.
    // NOTE(review): assumes `def_slicer()` exists next to `rep_slicer()` on
    // the serialized rep/defs -- confirm.
    let mut def_slicer = repdefs.def_slicer().unwrap();

    // Same expectations as for the repetition levels.
    assert_eq!(def_slicer.slice_next(5).len(), 12);
    assert_eq!(def_slicer.slice_next(20).len(), 40);
    assert_eq!(def_slicer.slice_rest().len(), 12);
}
2785
#[test]
/// Packs rep/def levels into control words, checks the exact bytes and the
/// reported bit widths, then parses the words back into the original levels.
fn test_control_words() {
    // Runs one round trip. An empty `rep` or `def` slice means "no levels of
    // that kind" and is passed to the iterator as `None`.
    fn check(
        rep: &[u16],
        def: &[u16],
        expected_values: Vec<u8>,
        expected_bytes_per_word: usize,
        expected_bits_rep: u8,
        expected_bits_def: u8,
    ) {
        let word_count = rep.len().max(def.len());
        let rep_ceiling = rep.iter().copied().max().unwrap_or(0);
        let def_ceiling = def.iter().copied().max().unwrap_or(0);

        let rep_arg = (!rep.is_empty()).then_some(rep);
        let def_arg = (!def.is_empty()).then_some(def);

        let mut words = super::build_control_word_iterator(
            rep_arg,
            rep_ceiling,
            def_arg,
            def_ceiling,
            def_ceiling + 1,
            expected_values.len(),
        );
        assert_eq!(words.bytes_per_word(), expected_bytes_per_word);
        assert_eq!(words.bits_rep(), expected_bits_rep);
        assert_eq!(words.bits_def(), expected_bits_def);

        let mut packed = Vec::with_capacity(word_count * words.bytes_per_word());
        for _ in 0..word_count {
            words.append_next(&mut packed);
        }
        // After `word_count` words the iterator must be exhausted.
        assert!(words.append_next(&mut packed).is_none());

        assert_eq!(expected_values, packed);

        let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);

        let mut parsed_rep = Vec::with_capacity(word_count);
        let mut parsed_def = Vec::with_capacity(word_count);

        // `chunks_exact` panics on a chunk size of zero, so skip parsing when
        // the words are zero bytes wide.
        if expected_bytes_per_word != 0 {
            for word in packed.chunks_exact(expected_bytes_per_word) {
                parser.parse(word, &mut parsed_rep, &mut parsed_def);
            }
        }

        assert_eq!(rep, parsed_rep.as_slice());
        assert_eq!(def, parsed_def.as_slice());
    }

    // These levels serve as the repetition levels in the two-sided cases and
    // as the only levels in the one-sided cases.
    let levels = &[0_u16, 7, 3, 2, 9, 8, 12, 5];

    // 4 bits of rep + 4 bits of def fit in one byte: rep in the high nibble,
    // def in the low nibble.
    let def_four_bits = &[5_u16, 3, 1, 2, 12, 15, 0, 2];
    let one_byte_words = vec![
        0b00000101, 0b01110011, 0b00110001, 0b00100010,
        0b10011100, 0b10001111, 0b11000000, 0b01010010,
    ];
    check(levels, def_four_bits, one_byte_words, 1, 4, 4);

    // A def of 22 needs 5 bits, so each word spills into two bytes, written
    // low byte first.
    let def_five_bits = &[5_u16, 3, 1, 2, 12, 22, 0, 2];
    let two_byte_words = vec![
        0b00000101, 0b00000000,
        0b11100011, 0b00000000,
        0b01100001, 0b00000000,
        0b01000010, 0b00000000,
        0b00101100, 0b00000001,
        0b00010110, 0b00000001,
        0b10000000, 0b00000001,
        0b10100010, 0b00000000,
    ];
    check(levels, def_five_bits, two_byte_words, 2, 4, 5);

    // With only one kind of level, each word is just that level.
    let single_level_words = vec![
        0b00000000, 0b00000111, 0b00000011, 0b00000010,
        0b00001001, 0b00001000, 0b00001100, 0b00000101,
    ];
    // Rep only.
    check(levels, &[], single_level_words.clone(), 1, 4, 0);
    // Def only.
    check(&[], levels, single_level_words, 1, 0, 4);

    // Neither rep nor def: zero-width words and no bytes at all.
    check(&[], &[], Vec::default(), 0, 0, 0);
}
2889
#[test]
/// Checks the per-word `is_new_row` / `is_visible` flags reported by the
/// control word iterator for every combination of present/absent rep and def.
fn test_control_words_rep_index() {
    // Pulls one word per expected flag and compares the reported descriptor.
    // `expected_is_visible` must be at least as long as `expected_new_rows`.
    fn check(
        rep: &[u16],
        def: &[u16],
        expected_new_rows: Vec<bool>,
        expected_is_visible: Vec<bool>,
    ) {
        let level_count = rep.len().max(def.len());
        let rep_ceiling = rep.iter().copied().max().unwrap_or(0);
        let def_ceiling = def.iter().copied().max().unwrap_or(0);

        // Empty slices are passed on as "no levels of this kind".
        let rep_arg = (!rep.is_empty()).then_some(rep);
        let def_arg = (!def.is_empty()).then_some(def);

        // NOTE(review): the literal 2 is presumably the highest definition
        // level still considered visible -- confirm against the iterator builder.
        let mut words = super::build_control_word_iterator(
            rep_arg,
            rep_ceiling,
            def_arg,
            def_ceiling,
            2,
            expected_new_rows.len(),
        );

        let mut packed = Vec::with_capacity(level_count * words.bytes_per_word());
        for (idx, &want_new_row) in expected_new_rows.iter().enumerate() {
            let desc = words.append_next(&mut packed).unwrap();
            assert_eq!(desc.is_new_row, want_new_row);
            assert_eq!(desc.is_visible, expected_is_visible[idx]);
        }
        // Exactly one word per expected flag, then the iterator is done.
        assert!(words.append_next(&mut packed).is_none());
    }

    let rep = &[2_u16, 1, 0, 2, 2, 0, 1, 1, 0, 2, 0];
    let def = &[0_u16, 0, 0, 3, 1, 1, 2, 1, 0, 0, 1];

    // A new row is expected wherever rep hits its maximum (2).
    let row_starts = vec![
        true, false, false, true, true, false, false, false, false, true, false,
    ];
    // Only the entry whose def (3) exceeds 2 is expected to be invisible.
    let visible_with_def = vec![
        true, true, true, false, true, true, true, true, true, true, true,
    ];

    // Rep and def together.
    check(rep, def, row_starts.clone(), visible_with_def);
    // Rep only: same row starts, everything visible.
    check(rep, &[], row_starts, vec![true; 11]);
    // Def only: every entry is its own row and visible.
    check(&[], def, vec![true; 11], vec![true; 11]);
    // Neither: every entry is its own row and visible.
    check(&[], &[], vec![true; 11], vec![true; 11]);
}
2969}