1use std::{convert::TryInto, sync::Arc};
13
14use arrow_array::types::UInt64Type;
15
16use lance_core::{Error, Result, datatypes::Field};
17
18use crate::{
19 buffer::LanceBuffer,
20 compression::{
21 DefaultCompressionStrategy, FixedPerValueDecompressor, MiniBlockDecompressor,
22 VariablePerValueDecompressor,
23 },
24 data::{
25 BlockInfo, DataBlock, DataBlockBuilder, FixedWidthDataBlock, StructDataBlock,
26 VariableWidthBlock,
27 },
28 encodings::logical::primitive::{
29 fullzip::{PerValueCompressor, PerValueDataBlock},
30 miniblock::{MiniBlockCompressed, MiniBlockCompressor},
31 },
32 format::{
33 ProtobufUtils21,
34 pb21::{CompressiveEncoding, PackedStruct, compressive_encoding::Compression},
35 },
36 statistics::{GetStat, Stat},
37};
38
39use super::value::{ValueDecompressor, ValueEncoder};
40
41fn struct_data_block_to_fixed_width_data_block(
45 struct_data_block: StructDataBlock,
46 bits_per_values: &[u64],
47) -> DataBlock {
48 let data_size = struct_data_block.expect_single_stat::<UInt64Type>(Stat::DataSize);
49 let mut output = Vec::with_capacity(data_size as usize);
50 let num_values = struct_data_block.children[0].num_values();
51
52 for i in 0..num_values as usize {
53 for (j, child) in struct_data_block.children.iter().enumerate() {
54 let bytes_per_value = (bits_per_values[j] / 8) as usize;
55 let this_data = child
56 .as_fixed_width_ref()
57 .unwrap()
58 .data
59 .slice_with_length(bytes_per_value * i, bytes_per_value);
60 output.extend_from_slice(&this_data);
61 }
62 }
63
64 DataBlock::FixedWidth(FixedWidthDataBlock {
65 bits_per_value: bits_per_values.iter().copied().sum(),
66 data: LanceBuffer::from(output),
67 num_values,
68 block_info: BlockInfo::default(),
69 })
70}
71
/// Stateless [`MiniBlockCompressor`] for structs whose children are all fixed-width.
///
/// The children are zipped row by row into a single fixed-width block, which is then
/// compressed with a [`ValueEncoder`].
#[derive(Debug, Default)]
pub struct PackedStructFixedWidthMiniBlockEncoder {}
74
75impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder {
76 fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
77 match data {
78 DataBlock::Struct(struct_data_block) => {
79 let bits_per_values = struct_data_block.children.iter().map(|data_block| data_block.as_fixed_width_ref().unwrap().bits_per_value).collect::<Vec<_>>();
80
81 let data_block = struct_data_block_to_fixed_width_data_block(struct_data_block, &bits_per_values);
83
84 let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box<dyn MiniBlockCompressor>;
86 let (value_miniblock_compressed, value_array_encoding) =
87 value_miniblock_compressor.compress(data_block)?;
88
89 Ok((
90 value_miniblock_compressed,
91 ProtobufUtils21::packed_struct(value_array_encoding, bits_per_values),
92 ))
93 }
94 _ => Err(Error::invalid_input_source(format!(
95 "Cannot compress a data block of type {} with PackedStructFixedWidthBlockEncoder",
96 data.name()
97 )
98 .into())),
99 }
100 }
101}
102
/// Mini-block decompressor that splits a packed fixed-width block back into struct children.
#[derive(Debug)]
pub struct PackedStructFixedWidthMiniBlockDecompressor {
    /// Width, in bits, of each child's values, in child order.
    bits_per_values: Vec<u64>,
    /// Decompressor for the packed values buffer.
    array_encoding: Box<dyn MiniBlockDecompressor>,
}
108
109impl PackedStructFixedWidthMiniBlockDecompressor {
110 pub fn new(description: &PackedStruct) -> Self {
111 let array_encoding: Box<dyn MiniBlockDecompressor> = match description
112 .values
113 .as_ref()
114 .unwrap()
115 .compression
116 .as_ref()
117 .unwrap()
118 {
119 Compression::Flat(flat) => Box::new(ValueDecompressor::from_flat(flat)),
120 _ => panic!(
121 "Currently only `ArrayEncoding::Flat` is supported in packed struct encoding in Lance 2.1."
122 ),
123 };
124 Self {
125 bits_per_values: description.bits_per_value.clone(),
126 array_encoding,
127 }
128 }
129}
130
131impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor {
132 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
133 assert_eq!(data.len(), 1);
134 let encoded_data_block = self.array_encoding.decompress(data, num_values)?;
135 let DataBlock::FixedWidth(encoded_data_block) = encoded_data_block else {
136 panic!("ValueDecompressor should output FixedWidth DataBlock")
137 };
138
139 let bytes_per_values = self
140 .bits_per_values
141 .iter()
142 .map(|bits_per_value| *bits_per_value as usize / 8)
143 .collect::<Vec<_>>();
144
145 assert!(encoded_data_block.bits_per_value % 8 == 0);
146 let encoded_bytes_per_row = (encoded_data_block.bits_per_value / 8) as usize;
147
148 let mut prefix_sum = vec![0; self.bits_per_values.len()];
150 for i in 0..(self.bits_per_values.len() - 1) {
151 prefix_sum[i + 1] = prefix_sum[i] + bytes_per_values[i];
152 }
153
154 let mut children_data_block = vec![];
155 for i in 0..self.bits_per_values.len() {
156 let child_buf_size = bytes_per_values[i] * num_values as usize;
157 let mut child_buf: Vec<u8> = Vec::with_capacity(child_buf_size);
158
159 for j in 0..num_values as usize {
160 let this_value = encoded_data_block.data.slice_with_length(
162 prefix_sum[i] + (j * encoded_bytes_per_row),
163 bytes_per_values[i],
164 );
165
166 child_buf.extend_from_slice(&this_value);
167 }
168
169 let child = DataBlock::FixedWidth(FixedWidthDataBlock {
170 data: LanceBuffer::from(child_buf),
171 bits_per_value: self.bits_per_values[i],
172 num_values,
173 block_info: BlockInfo::default(),
174 });
175 children_data_block.push(child);
176 }
177 Ok(DataBlock::Struct(StructDataBlock {
178 children: children_data_block,
179 block_info: BlockInfo::default(),
180 validity: None,
181 }))
182 }
183}
184
/// A compressed struct child, held while its values are zipped into packed rows.
#[derive(Debug)]
enum VariablePackedFieldData {
    /// Child whose compressed values all have the same width.
    Fixed {
        block: FixedWidthDataBlock,
    },
    /// Child whose compressed values vary in length.
    Variable {
        block: VariableWidthBlock,
        /// Width, in bits, of the length prefix written in front of each value.
        bits_per_length: u64,
    },
}
195
196impl VariablePackedFieldData {
197 fn append_row_bytes(&self, row_idx: usize, output: &mut Vec<u8>) -> Result<()> {
198 match self {
199 Self::Fixed { block } => {
200 let bits_per_value = block.bits_per_value;
201 if bits_per_value % 8 != 0 {
202 return Err(Error::invalid_input(
203 "Packed struct variable encoding requires byte-aligned fixed-width children",
204 ));
205 }
206 let bytes_per_value = (bits_per_value / 8) as usize;
207 let start = row_idx
208 .checked_mul(bytes_per_value)
209 .ok_or_else(|| Error::invalid_input("Packed struct row size overflow"))?;
210 let end = start + bytes_per_value;
211 let data = block.data.as_ref();
212 if end > data.len() {
213 return Err(Error::invalid_input(
214 "Packed struct fixed child out of bounds",
215 ));
216 }
217 output.extend_from_slice(&data[start..end]);
218 Ok(())
219 }
220 Self::Variable {
221 block,
222 bits_per_length,
223 } => {
224 if bits_per_length % 8 != 0 {
225 return Err(Error::invalid_input(
226 "Packed struct variable children must have byte-aligned length prefixes",
227 ));
228 }
229 let prefix_bytes = (*bits_per_length / 8) as usize;
230 if !(prefix_bytes == 4 || prefix_bytes == 8) {
231 return Err(Error::invalid_input(
232 "Packed struct variable children must use 32 or 64-bit length prefixes",
233 ));
234 }
235 match block.bits_per_offset {
236 32 => {
237 let offsets = block.offsets.borrow_to_typed_slice::<u32>();
238 let start = offsets[row_idx] as usize;
239 let end = offsets[row_idx + 1] as usize;
240 if end > block.data.len() {
241 return Err(Error::invalid_input(
242 "Packed struct variable child offsets out of bounds",
243 ));
244 }
245 let len = (end - start) as u32;
246 if prefix_bytes != std::mem::size_of::<u32>() {
247 return Err(Error::invalid_input(
248 "Packed struct variable child length prefix mismatch",
249 ));
250 }
251 output.extend_from_slice(&len.to_le_bytes());
252 output.extend_from_slice(&block.data[start..end]);
253 Ok(())
254 }
255 64 => {
256 let offsets = block.offsets.borrow_to_typed_slice::<u64>();
257 let start = offsets[row_idx] as usize;
258 let end = offsets[row_idx + 1] as usize;
259 if end > block.data.len() {
260 return Err(Error::invalid_input(
261 "Packed struct variable child offsets out of bounds",
262 ));
263 }
264 let len = (end - start) as u64;
265 if prefix_bytes != std::mem::size_of::<u64>() {
266 return Err(Error::invalid_input(
267 "Packed struct variable child length prefix mismatch",
268 ));
269 }
270 output.extend_from_slice(&len.to_le_bytes());
271 output.extend_from_slice(&block.data[start..end]);
272 Ok(())
273 }
274 _ => Err(Error::invalid_input(
275 "Packed struct variable child must use 32 or 64-bit offsets",
276 )),
277 }
278 }
279 }
280 }
281}
282
/// Per-value compressor that packs a struct's children row by row into variable-width rows.
#[derive(Debug)]
pub struct PackedStructVariablePerValueEncoder {
    /// Strategy used to create a per-value compressor for each child.
    strategy: DefaultCompressionStrategy,
    /// Field metadata for the struct's children, one entry per child, in child order.
    fields: Vec<Field>,
}
288
impl PackedStructVariablePerValueEncoder {
    /// Creates an encoder that compresses each child described by `fields` using `strategy`.
    pub fn new(strategy: DefaultCompressionStrategy, fields: Vec<Field>) -> Self {
        Self { strategy, fields }
    }
}
294
295impl PerValueCompressor for PackedStructVariablePerValueEncoder {
296 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
297 let DataBlock::Struct(struct_block) = data else {
298 return Err(Error::invalid_input(
299 "Packed struct encoder requires Struct data block",
300 ));
301 };
302
303 if struct_block.children.is_empty() {
304 return Err(Error::invalid_input(
305 "Packed struct encoder requires at least one child field",
306 ));
307 }
308 if struct_block.children.len() != self.fields.len() {
309 return Err(Error::invalid_input(
310 "Struct field metadata does not match number of children",
311 ));
312 }
313
314 let num_values = struct_block.children[0].num_values();
315 for child in struct_block.children.iter() {
316 if child.num_values() != num_values {
317 return Err(Error::invalid_input(
318 "Packed struct children must have matching value counts",
319 ));
320 }
321 }
322
323 let mut field_data = Vec::with_capacity(self.fields.len());
324 let mut field_metadata = Vec::with_capacity(self.fields.len());
325
326 for (field, child_block) in self.fields.iter().zip(struct_block.children.into_iter()) {
327 let compressor = crate::compression::CompressionStrategy::create_per_value(
328 &self.strategy,
329 field,
330 &child_block,
331 )?;
332 let (compressed, encoding) = compressor.compress(child_block)?;
333 match compressed {
334 PerValueDataBlock::Fixed(block) => {
335 field_metadata.push(ProtobufUtils21::packed_struct_field_fixed(
336 encoding,
337 block.bits_per_value,
338 ));
339 field_data.push(VariablePackedFieldData::Fixed { block });
340 }
341 PerValueDataBlock::Variable(block) => {
342 let bits_per_length = block.bits_per_offset as u64;
343 field_metadata.push(ProtobufUtils21::packed_struct_field_variable(
344 encoding,
345 bits_per_length,
346 ));
347 field_data.push(VariablePackedFieldData::Variable {
348 block,
349 bits_per_length,
350 });
351 }
352 }
353 }
354
355 let mut row_data: Vec<u8> = Vec::new();
356 let mut row_offsets: Vec<u64> = Vec::with_capacity(num_values as usize + 1);
357 row_offsets.push(0);
358 let mut total_bytes: usize = 0;
359 let mut max_row_len: usize = 0;
360 for row in 0..num_values as usize {
361 let start = row_data.len();
362 for field in &field_data {
363 field.append_row_bytes(row, &mut row_data)?;
364 }
365 let end = row_data.len();
366 let row_len = end - start;
367 max_row_len = max_row_len.max(row_len);
368 total_bytes = total_bytes
369 .checked_add(row_len)
370 .ok_or_else(|| Error::invalid_input("Packed struct row data size overflow"))?;
371 row_offsets.push(end as u64);
372 }
373 debug_assert_eq!(total_bytes, row_data.len());
374
375 let use_u32_offsets = total_bytes <= u32::MAX as usize && max_row_len <= u32::MAX as usize;
376 let bits_per_offset = if use_u32_offsets { 32 } else { 64 };
377 let offsets_buffer = if use_u32_offsets {
378 let offsets_u32 = row_offsets
379 .iter()
380 .map(|&offset| offset as u32)
381 .collect::<Vec<_>>();
382 LanceBuffer::reinterpret_vec(offsets_u32)
383 } else {
384 LanceBuffer::reinterpret_vec(row_offsets)
385 };
386
387 let data_block = VariableWidthBlock {
388 data: LanceBuffer::from(row_data),
389 bits_per_offset,
390 offsets: offsets_buffer,
391 num_values,
392 block_info: BlockInfo::new(),
393 };
394
395 Ok((
396 PerValueDataBlock::Variable(data_block),
397 ProtobufUtils21::packed_struct_variable(field_metadata),
398 ))
399 }
400}
401
/// How one child of a variable packed struct is laid out in a row and decompressed.
#[derive(Debug)]
pub(crate) enum VariablePackedStructFieldKind {
    /// The child occupies the same number of bits in every row.
    Fixed {
        /// Width, in bits, of the child's value inside a row.
        bits_per_value: u64,
        /// Decompressor applied to the child's values once they are unzipped.
        decompressor: Arc<dyn FixedPerValueDecompressor>,
    },
    /// The child is stored as a length prefix followed by that many bytes.
    Variable {
        /// Width, in bits, of the length prefix.
        bits_per_length: u64,
        /// Decompressor applied to the child's values once they are unzipped.
        decompressor: Arc<dyn VariablePerValueDecompressor>,
    },
}
413
/// Decoding description for a single child of a variable packed struct.
#[derive(Debug)]
pub(crate) struct VariablePackedStructFieldDecoder {
    /// Layout of the child inside a packed row, plus the decompressor for its values.
    pub(crate) kind: VariablePackedStructFieldKind,
}
418
/// Per-value decompressor that unzips packed struct rows back into one block per child.
#[derive(Debug)]
pub struct PackedStructVariablePerValueDecompressor {
    /// Decoders for the struct's children, in the order their bytes appear in a row.
    fields: Vec<VariablePackedStructFieldDecoder>,
}
423
impl PackedStructVariablePerValueDecompressor {
    /// Creates a decompressor from the per-child decoders, given in row order.
    pub(crate) fn new(fields: Vec<VariablePackedStructFieldDecoder>) -> Self {
        Self { fields }
    }
}
429
/// Per-child scratch state used while packed rows are being unzipped.
///
/// Every variant owns a builder that collects the child's values, plus a one-value block
/// that is appended in place of a value when a row is empty.
enum FieldAccumulator {
    /// Accumulates a fixed-width child.
    Fixed {
        builder: DataBlockBuilder,
        /// Width, in bits, of each value of this child.
        bits_per_value: u64,
        empty_value: DataBlock,
    },
    /// Accumulates a variable-width child with 32-bit length prefixes.
    Variable32 {
        builder: DataBlockBuilder,
        empty_value: DataBlock,
    },
    /// Accumulates a variable-width child with 64-bit length prefixes.
    Variable64 {
        builder: DataBlockBuilder,
        empty_value: DataBlock,
    },
}
445
446impl FieldAccumulator {
447 fn append_empty(&mut self) {
451 match self {
452 Self::Fixed {
453 builder,
454 empty_value,
455 ..
456 } => builder.append(empty_value, 0..1),
457 Self::Variable32 {
458 builder,
459 empty_value,
460 } => builder.append(empty_value, 0..1),
461 Self::Variable64 {
462 builder,
463 empty_value,
464 } => builder.append(empty_value, 0..1),
465 }
466 }
467}
468
469impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor {
470 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
471 let num_values = data.num_values;
472 let offsets_u64 = match data.bits_per_offset {
473 32 => data
474 .offsets
475 .borrow_to_typed_slice::<u32>()
476 .iter()
477 .map(|v| *v as u64)
478 .collect::<Vec<_>>(),
479 64 => data
480 .offsets
481 .borrow_to_typed_slice::<u64>()
482 .as_ref()
483 .to_vec(),
484 _ => {
485 return Err(Error::invalid_input(
486 "Packed struct row offsets must be 32 or 64 bits",
487 ));
488 }
489 };
490
491 if offsets_u64.len() != num_values as usize + 1 {
492 return Err(Error::invalid_input(
493 "Packed struct row offsets length mismatch",
494 ));
495 }
496
497 let mut accumulators = Vec::with_capacity(self.fields.len());
498 for field in &self.fields {
499 match &field.kind {
500 VariablePackedStructFieldKind::Fixed { bits_per_value, .. } => {
501 if bits_per_value % 8 != 0 {
502 return Err(Error::invalid_input(
503 "Packed struct fixed child must be byte-aligned",
504 ));
505 }
506 let bytes_per_value = bits_per_value.checked_div(8).ok_or_else(|| {
507 Error::invalid_input("Invalid bits per value for packed struct field")
508 })?;
509 let estimate = bytes_per_value.checked_mul(num_values).ok_or_else(|| {
510 Error::invalid_input("Packed struct fixed child allocation overflow")
511 })?;
512 let empty_value = DataBlock::FixedWidth(FixedWidthDataBlock {
513 data: LanceBuffer::from(vec![0_u8; bytes_per_value as usize]),
514 bits_per_value: *bits_per_value,
515 num_values: 1,
516 block_info: BlockInfo::new(),
517 });
518 accumulators.push(FieldAccumulator::Fixed {
519 builder: DataBlockBuilder::with_capacity_estimate(estimate),
520 bits_per_value: *bits_per_value,
521 empty_value,
522 });
523 }
524 VariablePackedStructFieldKind::Variable {
525 bits_per_length, ..
526 } => match bits_per_length {
527 32 => accumulators.push(FieldAccumulator::Variable32 {
528 builder: DataBlockBuilder::with_capacity_estimate(data.data.len() as u64),
529 empty_value: DataBlock::VariableWidth(VariableWidthBlock {
530 data: LanceBuffer::empty(),
531 bits_per_offset: 32,
532 offsets: LanceBuffer::reinterpret_vec(vec![0_u32, 0_u32]),
533 num_values: 1,
534 block_info: BlockInfo::new(),
535 }),
536 }),
537 64 => accumulators.push(FieldAccumulator::Variable64 {
538 builder: DataBlockBuilder::with_capacity_estimate(data.data.len() as u64),
539 empty_value: DataBlock::VariableWidth(VariableWidthBlock {
540 data: LanceBuffer::empty(),
541 bits_per_offset: 64,
542 offsets: LanceBuffer::reinterpret_vec(vec![0_u64, 0_u64]),
543 num_values: 1,
544 block_info: BlockInfo::new(),
545 }),
546 }),
547 _ => {
548 return Err(Error::invalid_input(
549 "Packed struct variable child must use 32 or 64-bit length prefixes",
550 ));
551 }
552 },
553 }
554 }
555
556 for row_idx in 0..num_values as usize {
557 let row_start = offsets_u64[row_idx] as usize;
558 let row_end = offsets_u64[row_idx + 1] as usize;
559 if row_end > data.data.len() || row_start > row_end {
560 return Err(Error::invalid_input(
561 "Packed struct row bounds exceed buffer",
562 ));
563 }
564 if row_start == row_end {
565 for accumulator in accumulators.iter_mut() {
566 accumulator.append_empty();
567 }
568 continue;
569 }
570 let mut cursor = row_start;
571 for (field, accumulator) in self.fields.iter().zip(accumulators.iter_mut()) {
572 match (&field.kind, accumulator) {
573 (
574 VariablePackedStructFieldKind::Fixed { bits_per_value, .. },
575 FieldAccumulator::Fixed {
576 builder,
577 bits_per_value: acc_bits,
578 ..
579 },
580 ) => {
581 debug_assert_eq!(bits_per_value, acc_bits);
582 let bytes_per_value = (bits_per_value / 8) as usize;
583 let end = cursor + bytes_per_value;
584 if end > row_end {
585 return Err(Error::invalid_input(
586 "Packed struct fixed child exceeds row bounds",
587 ));
588 }
589 let value_block = DataBlock::FixedWidth(FixedWidthDataBlock {
590 data: LanceBuffer::from(data.data[cursor..end].to_vec()),
591 bits_per_value: *bits_per_value,
592 num_values: 1,
593 block_info: BlockInfo::new(),
594 });
595 builder.append(&value_block, 0..1);
596 cursor = end;
597 }
598 (
599 VariablePackedStructFieldKind::Variable {
600 bits_per_length, ..
601 },
602 FieldAccumulator::Variable32 { builder, .. },
603 ) => {
604 if *bits_per_length != 32 {
605 return Err(Error::invalid_input(
606 "Packed struct length prefix size mismatch",
607 ));
608 }
609 let end = cursor + std::mem::size_of::<u32>();
610 if end > row_end {
611 return Err(Error::invalid_input(
612 "Packed struct variable child length prefix out of bounds",
613 ));
614 }
615 let len = u32::from_le_bytes(
616 data.data[cursor..end]
617 .try_into()
618 .expect("slice has exact length"),
619 ) as usize;
620 cursor = end;
621 let value_end = cursor + len;
622 if value_end > row_end {
623 return Err(Error::invalid_input(
624 "Packed struct variable child exceeds row bounds",
625 ));
626 }
627 let value_block = DataBlock::VariableWidth(VariableWidthBlock {
628 data: LanceBuffer::from(data.data[cursor..value_end].to_vec()),
629 bits_per_offset: 32,
630 offsets: LanceBuffer::reinterpret_vec(vec![0_u32, len as u32]),
631 num_values: 1,
632 block_info: BlockInfo::new(),
633 });
634 builder.append(&value_block, 0..1);
635 cursor = value_end;
636 }
637 (
638 VariablePackedStructFieldKind::Variable {
639 bits_per_length, ..
640 },
641 FieldAccumulator::Variable64 { builder, .. },
642 ) => {
643 if *bits_per_length != 64 {
644 return Err(Error::invalid_input(
645 "Packed struct length prefix size mismatch",
646 ));
647 }
648 let end = cursor + std::mem::size_of::<u64>();
649 if end > row_end {
650 return Err(Error::invalid_input(
651 "Packed struct variable child length prefix out of bounds",
652 ));
653 }
654 let len = u64::from_le_bytes(
655 data.data[cursor..end]
656 .try_into()
657 .expect("slice has exact length"),
658 ) as usize;
659 cursor = end;
660 let value_end = cursor + len;
661 if value_end > row_end {
662 return Err(Error::invalid_input(
663 "Packed struct variable child exceeds row bounds",
664 ));
665 }
666 let value_block = DataBlock::VariableWidth(VariableWidthBlock {
667 data: LanceBuffer::from(data.data[cursor..value_end].to_vec()),
668 bits_per_offset: 64,
669 offsets: LanceBuffer::reinterpret_vec(vec![0_u64, len as u64]),
670 num_values: 1,
671 block_info: BlockInfo::new(),
672 });
673 builder.append(&value_block, 0..1);
674 cursor = value_end;
675 }
676 _ => {
677 return Err(Error::invalid_input(
678 "Packed struct accumulator kind mismatch",
679 ));
680 }
681 }
682 }
683 if cursor != row_end {
684 return Err(Error::invalid_input(
685 "Packed struct row parsing did not consume full row",
686 ));
687 }
688 }
689
690 let mut children = Vec::with_capacity(self.fields.len());
691 for (field, accumulator) in self.fields.iter().zip(accumulators.into_iter()) {
692 match (field, accumulator) {
693 (
694 VariablePackedStructFieldDecoder {
695 kind: VariablePackedStructFieldKind::Fixed { decompressor, .. },
696 },
697 FieldAccumulator::Fixed { builder, .. },
698 ) => {
699 let DataBlock::FixedWidth(block) = builder.finish() else {
700 panic!("Expected fixed-width datablock from builder");
701 };
702 let decoded = decompressor.decompress(block, num_values)?;
703 children.push(decoded);
704 }
705 (
706 VariablePackedStructFieldDecoder {
707 kind:
708 VariablePackedStructFieldKind::Variable {
709 bits_per_length,
710 decompressor,
711 },
712 },
713 FieldAccumulator::Variable32 { builder, .. },
714 ) => {
715 let DataBlock::VariableWidth(mut block) = builder.finish() else {
716 panic!("Expected variable-width datablock from builder");
717 };
718 debug_assert_eq!(block.bits_per_offset, 32);
719 block.bits_per_offset = (*bits_per_length) as u8;
720 let decoded = decompressor.decompress(block)?;
721 children.push(decoded);
722 }
723 (
724 VariablePackedStructFieldDecoder {
725 kind:
726 VariablePackedStructFieldKind::Variable {
727 bits_per_length,
728 decompressor,
729 },
730 },
731 FieldAccumulator::Variable64 { builder, .. },
732 ) => {
733 let DataBlock::VariableWidth(mut block) = builder.finish() else {
734 panic!("Expected variable-width datablock from builder");
735 };
736 debug_assert_eq!(block.bits_per_offset, 64);
737 block.bits_per_offset = (*bits_per_length) as u8;
738 let decoded = decompressor.decompress(block)?;
739 children.push(decoded);
740 }
741 _ => {
742 return Err(Error::invalid_input(
743 "Packed struct accumulator mismatch during finalize",
744 ));
745 }
746 }
747 }
748
749 Ok(DataBlock::Struct(StructDataBlock {
750 children,
751 block_info: BlockInfo::new(),
752 validity: None,
753 }))
754 }
755}
756
757#[cfg(test)]
758mod tests {
759 use super::*;
760 use crate::{
761 compression::CompressionStrategy,
762 compression::{DefaultCompressionStrategy, DefaultDecompressionStrategy},
763 constants::PACKED_STRUCT_META_KEY,
764 statistics::ComputeStat,
765 testing::{TestCases, check_round_trip_encoding_of_data},
766 version::LanceFileVersion,
767 };
768 use arrow_array::{
769 Array, ArrayRef, BinaryArray, Int32Array, Int64Array, LargeStringArray, StringArray,
770 StructArray, UInt32Array,
771 };
772 use arrow_schema::{DataType, Field as ArrowField, Fields};
773 use std::collections::HashMap;
774 use std::sync::Arc;
775
776 fn fixed_block_from_array(array: Int64Array) -> FixedWidthDataBlock {
777 let num_values = array.len() as u64;
778 let block = DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values);
779 match block {
780 DataBlock::FixedWidth(block) => block,
781 _ => panic!("Expected fixed-width data block"),
782 }
783 }
784
785 fn fixed_i32_block_from_array(array: Int32Array) -> FixedWidthDataBlock {
786 let num_values = array.len() as u64;
787 let block = DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values);
788 match block {
789 DataBlock::FixedWidth(block) => block,
790 _ => panic!("Expected fixed-width data block"),
791 }
792 }
793
794 fn variable_block_from_string_array(array: StringArray) -> VariableWidthBlock {
795 let num_values = array.len() as u64;
796 let block = DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values);
797 match block {
798 DataBlock::VariableWidth(block) => block,
799 _ => panic!("Expected variable-width block"),
800 }
801 }
802
803 fn variable_block_from_large_string_array(array: LargeStringArray) -> VariableWidthBlock {
804 let num_values = array.len() as u64;
805 let block = DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values);
806 match block {
807 DataBlock::VariableWidth(block) => block,
808 _ => panic!("Expected variable-width block"),
809 }
810 }
811
812 fn variable_block_from_binary_array(array: BinaryArray) -> VariableWidthBlock {
813 let num_values = array.len() as u64;
814 let block = DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values);
815 match block {
816 DataBlock::VariableWidth(block) => block,
817 _ => panic!("Expected variable-width block"),
818 }
819 }
820
/// Round-trips a struct with a `UInt32` child and a `Utf8` child through the per-value
/// compressor created by the default strategy and checks that both children come back
/// byte-for-byte.
#[test]
fn variable_packed_struct_round_trip() -> Result<()> {
    let arrow_fields: Fields = vec![
        ArrowField::new("id", DataType::UInt32, false),
        ArrowField::new("name", DataType::Utf8, true),
    ]
    .into();
    let arrow_struct = ArrowField::new("item", DataType::Struct(arrow_fields), false);
    let struct_field = Field::try_from(&arrow_struct)?;

    let ids = vec![1_u32, 2, 42];
    // Expected little-endian bytes of the ids, captured before `ids` is moved into the block.
    let id_bytes = ids
        .iter()
        .flat_map(|value| value.to_le_bytes())
        .collect::<Vec<_>>();
    let mut id_block = FixedWidthDataBlock {
        data: LanceBuffer::reinterpret_vec(ids),
        bits_per_value: 32,
        num_values: 3,
        block_info: BlockInfo::new(),
    };
    id_block.compute_stat();
    let id_block = DataBlock::FixedWidth(id_block);

    // Offsets [0, 1, 4, 4] over "abcz" describe the values "a", "bcz" and an empty string.
    let name_offsets = vec![0_i32, 1, 4, 4];
    let name_bytes = b"abcz".to_vec();
    let mut name_block = VariableWidthBlock {
        data: LanceBuffer::from(name_bytes.clone()),
        bits_per_offset: 32,
        offsets: LanceBuffer::reinterpret_vec(name_offsets.clone()),
        num_values: 3,
        block_info: BlockInfo::new(),
    };
    name_block.compute_stat();
    let name_block = DataBlock::VariableWidth(name_block);

    let struct_block = StructDataBlock {
        children: vec![id_block, name_block],
        block_info: BlockInfo::new(),
        validity: None,
    };

    let data_block = DataBlock::Struct(struct_block);

    let compression_strategy =
        DefaultCompressionStrategy::new().with_version(LanceFileVersion::V2_2);
    let compressor = crate::compression::CompressionStrategy::create_per_value(
        &compression_strategy,
        &struct_field,
        &data_block,
    )?;
    let (compressed, encoding) = compressor.compress(data_block)?;

    let PerValueDataBlock::Variable(zipped) = compressed else {
        panic!("expected variable-width packed struct output");
    };

    // Decode using only the encoding description returned by the compressor.
    let decompression_strategy = DefaultDecompressionStrategy::default();
    let decompressor =
        crate::compression::DecompressionStrategy::create_variable_per_value_decompressor(
            &decompression_strategy,
            &encoding,
        )?;
    let decoded = decompressor.decompress(zipped)?;

    let DataBlock::Struct(decoded_struct) = decoded else {
        panic!("expected struct datablock after decode");
    };

    let decoded_id = decoded_struct.children[0].as_fixed_width_ref().unwrap();
    assert_eq!(decoded_id.bits_per_value, 32);
    assert_eq!(decoded_id.data.as_ref(), id_bytes.as_slice());

    let decoded_name = decoded_struct.children[1].as_variable_width_ref().unwrap();
    assert_eq!(decoded_name.bits_per_offset, 32);
    let decoded_offsets = decoded_name.offsets.borrow_to_typed_slice::<i32>();
    assert_eq!(decoded_offsets.as_ref(), name_offsets.as_slice());
    assert_eq!(decoded_name.data.as_ref(), name_bytes.as_slice());

    Ok(())
}
902
/// Round-trips a struct with an `Int64` child and a `LargeUtf8` child and checks that the
/// decoded variable-width child keeps its 64-bit offsets and original bytes.
#[test]
fn variable_packed_struct_large_utf8_round_trip() -> Result<()> {
    let arrow_fields: Fields = vec![
        ArrowField::new("value", DataType::Int64, false),
        ArrowField::new("text", DataType::LargeUtf8, false),
    ]
    .into();
    let arrow_struct = ArrowField::new("item", DataType::Struct(arrow_fields), false);
    let struct_field = Field::try_from(&arrow_struct)?;

    let id_block = fixed_block_from_array(Int64Array::from(vec![10, 20, 30, 40]));
    let payload_array = LargeStringArray::from(vec![
        "alpha",
        "a considerably longer payload for testing",
        "mid",
        "z",
    ]);
    let payload_block = variable_block_from_large_string_array(payload_array);

    // The blocks are cloned so the originals stay available for the final comparisons.
    let struct_block = StructDataBlock {
        children: vec![
            DataBlock::FixedWidth(id_block.clone()),
            DataBlock::VariableWidth(payload_block.clone()),
        ],
        block_info: BlockInfo::new(),
        validity: None,
    };

    let data_block = DataBlock::Struct(struct_block);

    let compression_strategy =
        DefaultCompressionStrategy::new().with_version(LanceFileVersion::V2_2);
    let compressor = crate::compression::CompressionStrategy::create_per_value(
        &compression_strategy,
        &struct_field,
        &data_block,
    )?;
    let (compressed, encoding) = compressor.compress(data_block)?;

    let PerValueDataBlock::Variable(zipped) = compressed else {
        panic!("expected variable-width packed struct output");
    };

    // Decode using only the encoding description returned by the compressor.
    let decompression_strategy = DefaultDecompressionStrategy::default();
    let decompressor =
        crate::compression::DecompressionStrategy::create_variable_per_value_decompressor(
            &decompression_strategy,
            &encoding,
        )?;
    let decoded = decompressor.decompress(zipped)?;

    let DataBlock::Struct(decoded_struct) = decoded else {
        panic!("expected struct datablock after decode");
    };

    let decoded_id = decoded_struct.children[0].as_fixed_width_ref().unwrap();
    assert_eq!(decoded_id.bits_per_value, 64);
    assert_eq!(decoded_id.data.as_ref(), id_block.data.as_ref());

    let decoded_payload = decoded_struct.children[1].as_variable_width_ref().unwrap();
    assert_eq!(decoded_payload.bits_per_offset, 64);
    assert_eq!(
        decoded_payload
            .offsets
            .borrow_to_typed_slice::<i64>()
            .as_ref(),
        payload_block
            .offsets
            .borrow_to_typed_slice::<i64>()
            .as_ref()
    );
    assert_eq!(decoded_payload.data.as_ref(), payload_block.data.as_ref());

    Ok(())
}
978
/// Round-trips a struct array with `UInt32`, `Utf8` and `LargeUtf8` children through the
/// shared round-trip test helper, with the packed-struct metadata key set and the
/// `variable_packed_struct` encoding expected.
#[tokio::test]
async fn variable_packed_struct_utf8_round_trip() {
    let fields = Fields::from(vec![
        Arc::new(ArrowField::new("id", DataType::UInt32, false)),
        Arc::new(ArrowField::new("uri", DataType::Utf8, false)),
        Arc::new(ArrowField::new("long_text", DataType::LargeUtf8, false)),
    ]);

    // Field metadata passed to the round-trip helper; sets the packed-struct key to "true".
    let mut meta = HashMap::new();
    meta.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());

    let array = Arc::new(StructArray::from(vec![
        (
            fields[0].clone(),
            Arc::new(UInt32Array::from(vec![1, 2, 3])) as ArrayRef,
        ),
        (
            fields[1].clone(),
            Arc::new(StringArray::from(vec![
                Some("a"),
                Some("b"),
                Some("/tmp/x"),
            ])) as ArrayRef,
        ),
        (
            fields[2].clone(),
            Arc::new(LargeStringArray::from(vec![
                Some("alpha"),
                Some("a considerably longer payload for testing"),
                Some("mid"),
            ])) as ArrayRef,
        ),
    ]));

    let test_cases = TestCases::default()
        .with_min_file_version(LanceFileVersion::V2_2)
        .with_expected_encoding("variable_packed_struct");

    check_round_trip_encoding_of_data(vec![array], &test_cases, meta).await;
}
1021
/// Round-trips a packed struct holding two variable-width children (`Utf8`, `Binary`) and one
/// fixed-width child (`Int32`).
///
/// The struct block is compressed with the per-value compressor selected for file version 2.2,
/// decompressed again, and every decoded child is compared against the block it was built from.
#[test]
fn variable_packed_struct_multi_variable_round_trip() -> Result<()> {
    use crate::compression::{CompressionStrategy, DecompressionStrategy};

    // Schema: struct<category: utf8, payload: binary, count: int32>, all non-nullable.
    let child_fields = Fields::from(vec![
        ArrowField::new("category", DataType::Utf8, false),
        ArrowField::new("payload", DataType::Binary, false),
        ArrowField::new("count", DataType::Int32, false),
    ]);
    let item_field = ArrowField::new("item", DataType::Struct(child_fields), false);
    let struct_field = Field::try_from(&item_field)?;

    // Source blocks; the originals are kept so the decoded output can be compared to them.
    let category_block =
        variable_block_from_string_array(StringArray::from(vec!["red", "blue", "green", "red"]));
    // The second payload is deliberately empty.
    let payload_bytes: [&[u8]; 4] = [&[0x01, 0x02], &[], &[0x05, 0x06, 0x07], &[0xff]];
    let payload_block =
        variable_block_from_binary_array(BinaryArray::from_iter_values(payload_bytes));
    let count_block = fixed_i32_block_from_array(Int32Array::from(vec![1, 2, 3, 4]));

    let data_block = DataBlock::Struct(StructDataBlock {
        children: vec![
            DataBlock::VariableWidth(category_block.clone()),
            DataBlock::VariableWidth(payload_block.clone()),
            DataBlock::FixedWidth(count_block.clone()),
        ],
        block_info: BlockInfo::new(),
        validity: None,
    });

    // Encode.
    let write_strategy = DefaultCompressionStrategy::new().with_version(LanceFileVersion::V2_2);
    let compressor =
        CompressionStrategy::create_per_value(&write_strategy, &struct_field, &data_block)?;
    let (compressed, encoding) = compressor.compress(data_block)?;
    let zipped = match compressed {
        PerValueDataBlock::Variable(zipped) => zipped,
        _ => panic!("expected variable-width packed struct output"),
    };

    // Decode using only the encoding description produced by the compressor.
    let read_strategy = DefaultDecompressionStrategy::default();
    let decompressor =
        DecompressionStrategy::create_variable_per_value_decompressor(&read_strategy, &encoding)?;
    let decoded_struct = match decompressor.decompress(zipped)? {
        DataBlock::Struct(decoded_struct) => decoded_struct,
        _ => panic!("expected struct datablock after decode"),
    };

    // Children 0 and 1 are variable-width: 32-bit offsets, identical offsets and bytes.
    for (index, expected) in [(0, &category_block), (1, &payload_block)] {
        let actual = decoded_struct.children[index]
            .as_variable_width_ref()
            .unwrap();
        assert_eq!(actual.bits_per_offset, 32);
        assert_eq!(
            actual.offsets.borrow_to_typed_slice::<i32>().as_ref(),
            expected.offsets.borrow_to_typed_slice::<i32>().as_ref()
        );
        assert_eq!(actual.data.as_ref(), expected.data.as_ref());
    }

    // Child 2 is fixed-width: 32 bits per value, identical bytes.
    let actual_count = decoded_struct.children[2].as_fixed_width_ref().unwrap();
    assert_eq!(actual_count.bits_per_value, 32);
    assert_eq!(actual_count.data.as_ref(), count_block.data.as_ref());

    Ok(())
}
1113
/// Asking a file-version-2.1 compression strategy for a per-value compressor over a struct that
/// mixes a fixed-width and a variable-width child must fail with `Error::NotSupported`.
#[test]
fn variable_packed_struct_requires_v22() {
    // Schema: struct<value: int64, text: utf8>, all non-nullable.
    let child_fields = Fields::from(vec![
        ArrowField::new("value", DataType::Int64, false),
        ArrowField::new("text", DataType::Utf8, false),
    ]);
    let item_field = ArrowField::new("item", DataType::Struct(child_fields), false);
    let struct_field = Field::try_from(&item_field).unwrap();

    // One fixed-width child followed by one variable-width child.
    let data_block = DataBlock::Struct(StructDataBlock {
        children: vec![
            DataBlock::FixedWidth(fixed_block_from_array(Int64Array::from(vec![1, 2, 3]))),
            DataBlock::VariableWidth(variable_block_from_string_array(StringArray::from(
                vec!["a", "bb", "ccc"],
            ))),
        ],
        block_info: BlockInfo::new(),
        validity: None,
    });

    let v2_1_strategy = DefaultCompressionStrategy::new().with_version(LanceFileVersion::V2_1);
    let result = crate::compression::CompressionStrategy::create_per_value(
        &v2_1_strategy,
        &struct_field,
        &data_block,
    );

    assert!(matches!(result, Err(Error::NotSupported { .. })));
}
1144
/// Decodes three packed rows where the middle row occupies zero bytes.
///
/// The test expects the empty row to come back as `0` in the fixed-width child and as a
/// zero-length entry in the variable-width child, while the surrounding rows decode normally.
#[test]
fn variable_packed_struct_decompress_empty_row() -> Result<()> {
    use crate::compression::DecompressionStrategy;

    let strategy = DefaultDecompressionStrategy::default();

    // Per-child decompressors: a flat 32-bit fixed child and a variable child with flat
    // 32-bit offsets.
    let fixed_encoding = ProtobufUtils21::flat(32, None);
    let fixed_decompressor = Arc::from(
        DecompressionStrategy::create_fixed_per_value_decompressor(&strategy, &fixed_encoding)?,
    );
    let variable_encoding = ProtobufUtils21::variable(ProtobufUtils21::flat(32, None), None);
    let variable_decompressor = Arc::from(
        DecompressionStrategy::create_variable_per_value_decompressor(
            &strategy,
            &variable_encoding,
        )?,
    );

    let fixed_child = VariablePackedStructFieldDecoder {
        kind: VariablePackedStructFieldKind::Fixed {
            bits_per_value: 32,
            decompressor: fixed_decompressor,
        },
    };
    let variable_child = VariablePackedStructFieldDecoder {
        kind: VariablePackedStructFieldKind::Variable {
            bits_per_length: 32,
            decompressor: variable_decompressor,
        },
    };
    let decompressor =
        PackedStructVariablePerValueDecompressor::new(vec![fixed_child, variable_child]);

    // Row 0 (bytes 0..9):  fixed = 1, length = 1, payload = "a"
    // Row 1 (bytes 9..9):  empty
    // Row 2 (bytes 9..17): fixed = 2, length = 0, no payload
    let row_segments: [&[u8]; 5] = [
        &1_u32.to_le_bytes(),
        &1_u32.to_le_bytes(),
        b"a",
        &2_u32.to_le_bytes(),
        &0_u32.to_le_bytes(),
    ];
    let row_data: Vec<u8> = row_segments.concat();

    let packed_rows = VariableWidthBlock {
        data: LanceBuffer::from(row_data),
        bits_per_offset: 32,
        offsets: LanceBuffer::reinterpret_vec(vec![0_u32, 9_u32, 9_u32, 17_u32]),
        num_values: 3,
        block_info: BlockInfo::new(),
    };

    let decoded_struct = match decompressor.decompress(packed_rows)? {
        DataBlock::Struct(decoded_struct) => decoded_struct,
        _ => panic!("expected struct output"),
    };

    // Fixed child: the empty row is expected to decode as 0.
    let fixed = decoded_struct.children[0].as_fixed_width_ref().unwrap();
    assert_eq!(fixed.bits_per_value, 32);
    let fixed_values = fixed.data.borrow_to_typed_slice::<u32>();
    assert_eq!(fixed_values.as_ref(), &[1, 0, 2]);

    // Variable child: only row 0 contributes bytes, so the offsets stay at 1 afterwards.
    let variable = decoded_struct.children[1].as_variable_width_ref().unwrap();
    assert_eq!(variable.bits_per_offset, 32);
    let variable_offsets = variable.offsets.borrow_to_typed_slice::<u32>();
    assert_eq!(variable_offsets.as_ref(), &[0_u32, 1_u32, 1_u32, 1_u32]);
    assert_eq!(variable.data.as_ref(), b"a");

    Ok(())
}
1213}