1use std::{convert::TryInto, sync::Arc};
13
14use arrow_array::types::UInt64Type;
15
16use lance_core::{datatypes::Field, Error, Result};
17use snafu::location;
18
19use crate::{
20 buffer::LanceBuffer,
21 compression::{
22 DefaultCompressionStrategy, FixedPerValueDecompressor, MiniBlockDecompressor,
23 VariablePerValueDecompressor,
24 },
25 data::{
26 BlockInfo, DataBlock, DataBlockBuilder, FixedWidthDataBlock, StructDataBlock,
27 VariableWidthBlock,
28 },
29 encodings::logical::primitive::{
30 fullzip::{PerValueCompressor, PerValueDataBlock},
31 miniblock::{MiniBlockCompressed, MiniBlockCompressor},
32 },
33 format::{
34 pb21::{compressive_encoding::Compression, CompressiveEncoding, PackedStruct},
35 ProtobufUtils21,
36 },
37 statistics::{GetStat, Stat},
38};
39
40use super::value::{ValueDecompressor, ValueEncoder};
41
42fn struct_data_block_to_fixed_width_data_block(
46 struct_data_block: StructDataBlock,
47 bits_per_values: &[u64],
48) -> DataBlock {
49 let data_size = struct_data_block.expect_single_stat::<UInt64Type>(Stat::DataSize);
50 let mut output = Vec::with_capacity(data_size as usize);
51 let num_values = struct_data_block.children[0].num_values();
52
53 for i in 0..num_values as usize {
54 for (j, child) in struct_data_block.children.iter().enumerate() {
55 let bytes_per_value = (bits_per_values[j] / 8) as usize;
56 let this_data = child
57 .as_fixed_width_ref()
58 .unwrap()
59 .data
60 .slice_with_length(bytes_per_value * i, bytes_per_value);
61 output.extend_from_slice(&this_data);
62 }
63 }
64
65 DataBlock::FixedWidth(FixedWidthDataBlock {
66 bits_per_value: bits_per_values.iter().copied().sum(),
67 data: LanceBuffer::from(output),
68 num_values,
69 block_info: BlockInfo::default(),
70 })
71}
72
73#[derive(Debug, Default)]
74pub struct PackedStructFixedWidthMiniBlockEncoder {}
75
76impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder {
77 fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
78 match data {
79 DataBlock::Struct(struct_data_block) => {
80 let bits_per_values = struct_data_block.children.iter().map(|data_block| data_block.as_fixed_width_ref().unwrap().bits_per_value).collect::<Vec<_>>();
81
82 let data_block = struct_data_block_to_fixed_width_data_block(struct_data_block, &bits_per_values);
84
85 let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box<dyn MiniBlockCompressor>;
87 let (value_miniblock_compressed, value_array_encoding) =
88 value_miniblock_compressor.compress(data_block)?;
89
90 Ok((
91 value_miniblock_compressed,
92 ProtobufUtils21::packed_struct(value_array_encoding, bits_per_values),
93 ))
94 }
95 _ => Err(Error::InvalidInput {
96 source: format!(
97 "Cannot compress a data block of type {} with PackedStructFixedWidthBlockEncoder",
98 data.name()
99 )
100 .into(),
101 location: location!(),
102 }),
103 }
104 }
105}
106
107#[derive(Debug)]
108pub struct PackedStructFixedWidthMiniBlockDecompressor {
109 bits_per_values: Vec<u64>,
110 array_encoding: Box<dyn MiniBlockDecompressor>,
111}
112
113impl PackedStructFixedWidthMiniBlockDecompressor {
114 pub fn new(description: &PackedStruct) -> Self {
115 let array_encoding: Box<dyn MiniBlockDecompressor> = match description.values.as_ref().unwrap().compression.as_ref().unwrap() {
116 Compression::Flat(flat) => Box::new(ValueDecompressor::from_flat(flat)),
117 _ => panic!("Currently only `ArrayEncoding::Flat` is supported in packed struct encoding in Lance 2.1."),
118 };
119 Self {
120 bits_per_values: description.bits_per_value.clone(),
121 array_encoding,
122 }
123 }
124}
125
126impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor {
127 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
128 assert_eq!(data.len(), 1);
129 let encoded_data_block = self.array_encoding.decompress(data, num_values)?;
130 let DataBlock::FixedWidth(encoded_data_block) = encoded_data_block else {
131 panic!("ValueDecompressor should output FixedWidth DataBlock")
132 };
133
134 let bytes_per_values = self
135 .bits_per_values
136 .iter()
137 .map(|bits_per_value| *bits_per_value as usize / 8)
138 .collect::<Vec<_>>();
139
140 assert!(encoded_data_block.bits_per_value % 8 == 0);
141 let encoded_bytes_per_row = (encoded_data_block.bits_per_value / 8) as usize;
142
143 let mut prefix_sum = vec![0; self.bits_per_values.len()];
145 for i in 0..(self.bits_per_values.len() - 1) {
146 prefix_sum[i + 1] = prefix_sum[i] + bytes_per_values[i];
147 }
148
149 let mut children_data_block = vec![];
150 for i in 0..self.bits_per_values.len() {
151 let child_buf_size = bytes_per_values[i] * num_values as usize;
152 let mut child_buf: Vec<u8> = Vec::with_capacity(child_buf_size);
153
154 for j in 0..num_values as usize {
155 let this_value = encoded_data_block.data.slice_with_length(
157 prefix_sum[i] + (j * encoded_bytes_per_row),
158 bytes_per_values[i],
159 );
160
161 child_buf.extend_from_slice(&this_value);
162 }
163
164 let child = DataBlock::FixedWidth(FixedWidthDataBlock {
165 data: LanceBuffer::from(child_buf),
166 bits_per_value: self.bits_per_values[i],
167 num_values,
168 block_info: BlockInfo::default(),
169 });
170 children_data_block.push(child);
171 }
172 Ok(DataBlock::Struct(StructDataBlock {
173 children: children_data_block,
174 block_info: BlockInfo::default(),
175 validity: None,
176 }))
177 }
178}
179
180#[derive(Debug)]
181enum VariablePackedFieldData {
182 Fixed {
183 block: FixedWidthDataBlock,
184 },
185 Variable {
186 block: VariableWidthBlock,
187 bits_per_length: u64,
188 },
189}
190
191impl VariablePackedFieldData {
192 fn append_row_bytes(&self, row_idx: usize, output: &mut Vec<u8>) -> Result<()> {
193 match self {
194 Self::Fixed { block } => {
195 let bits_per_value = block.bits_per_value;
196 if bits_per_value % 8 != 0 {
197 return Err(Error::invalid_input(
198 "Packed struct variable encoding requires byte-aligned fixed-width children",
199 location!(),
200 ));
201 }
202 let bytes_per_value = (bits_per_value / 8) as usize;
203 let start = row_idx.checked_mul(bytes_per_value).ok_or_else(|| {
204 Error::invalid_input("Packed struct row size overflow", location!())
205 })?;
206 let end = start + bytes_per_value;
207 let data = block.data.as_ref();
208 if end > data.len() {
209 return Err(Error::invalid_input(
210 "Packed struct fixed child out of bounds",
211 location!(),
212 ));
213 }
214 output.extend_from_slice(&data[start..end]);
215 Ok(())
216 }
217 Self::Variable {
218 block,
219 bits_per_length,
220 } => {
221 if bits_per_length % 8 != 0 {
222 return Err(Error::invalid_input(
223 "Packed struct variable children must have byte-aligned length prefixes",
224 location!(),
225 ));
226 }
227 let prefix_bytes = (*bits_per_length / 8) as usize;
228 if !(prefix_bytes == 4 || prefix_bytes == 8) {
229 return Err(Error::invalid_input(
230 "Packed struct variable children must use 32 or 64-bit length prefixes",
231 location!(),
232 ));
233 }
234 match block.bits_per_offset {
235 32 => {
236 let offsets = block.offsets.borrow_to_typed_slice::<u32>();
237 let start = offsets[row_idx] as usize;
238 let end = offsets[row_idx + 1] as usize;
239 if end > block.data.len() {
240 return Err(Error::invalid_input(
241 "Packed struct variable child offsets out of bounds",
242 location!(),
243 ));
244 }
245 let len = (end - start) as u32;
246 if prefix_bytes != std::mem::size_of::<u32>() {
247 return Err(Error::invalid_input(
248 "Packed struct variable child length prefix mismatch",
249 location!(),
250 ));
251 }
252 output.extend_from_slice(&len.to_le_bytes());
253 output.extend_from_slice(&block.data[start..end]);
254 Ok(())
255 }
256 64 => {
257 let offsets = block.offsets.borrow_to_typed_slice::<u64>();
258 let start = offsets[row_idx] as usize;
259 let end = offsets[row_idx + 1] as usize;
260 if end > block.data.len() {
261 return Err(Error::invalid_input(
262 "Packed struct variable child offsets out of bounds",
263 location!(),
264 ));
265 }
266 let len = (end - start) as u64;
267 if prefix_bytes != std::mem::size_of::<u64>() {
268 return Err(Error::invalid_input(
269 "Packed struct variable child length prefix mismatch",
270 location!(),
271 ));
272 }
273 output.extend_from_slice(&len.to_le_bytes());
274 output.extend_from_slice(&block.data[start..end]);
275 Ok(())
276 }
277 _ => Err(Error::invalid_input(
278 "Packed struct variable child must use 32 or 64-bit offsets",
279 location!(),
280 )),
281 }
282 }
283 }
284 }
285}
286
287#[derive(Debug)]
288pub struct PackedStructVariablePerValueEncoder {
289 strategy: DefaultCompressionStrategy,
290 fields: Vec<Field>,
291}
292
293impl PackedStructVariablePerValueEncoder {
294 pub fn new(strategy: DefaultCompressionStrategy, fields: Vec<Field>) -> Self {
295 Self { strategy, fields }
296 }
297}
298
299impl PerValueCompressor for PackedStructVariablePerValueEncoder {
300 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
301 let DataBlock::Struct(struct_block) = data else {
302 return Err(Error::invalid_input(
303 "Packed struct encoder requires Struct data block",
304 location!(),
305 ));
306 };
307
308 if struct_block.children.is_empty() {
309 return Err(Error::invalid_input(
310 "Packed struct encoder requires at least one child field",
311 location!(),
312 ));
313 }
314 if struct_block.children.len() != self.fields.len() {
315 return Err(Error::invalid_input(
316 "Struct field metadata does not match number of children",
317 location!(),
318 ));
319 }
320
321 let num_values = struct_block.children[0].num_values();
322 for child in struct_block.children.iter() {
323 if child.num_values() != num_values {
324 return Err(Error::invalid_input(
325 "Packed struct children must have matching value counts",
326 location!(),
327 ));
328 }
329 }
330
331 let mut field_data = Vec::with_capacity(self.fields.len());
332 let mut field_metadata = Vec::with_capacity(self.fields.len());
333
334 for (field, child_block) in self.fields.iter().zip(struct_block.children.into_iter()) {
335 let compressor = crate::compression::CompressionStrategy::create_per_value(
336 &self.strategy,
337 field,
338 &child_block,
339 )?;
340 let (compressed, encoding) = compressor.compress(child_block)?;
341 match compressed {
342 PerValueDataBlock::Fixed(block) => {
343 field_metadata.push(ProtobufUtils21::packed_struct_field_fixed(
344 encoding,
345 block.bits_per_value,
346 ));
347 field_data.push(VariablePackedFieldData::Fixed { block });
348 }
349 PerValueDataBlock::Variable(block) => {
350 let bits_per_length = block.bits_per_offset as u64;
351 field_metadata.push(ProtobufUtils21::packed_struct_field_variable(
352 encoding,
353 bits_per_length,
354 ));
355 field_data.push(VariablePackedFieldData::Variable {
356 block,
357 bits_per_length,
358 });
359 }
360 }
361 }
362
363 let mut row_data: Vec<u8> = Vec::new();
364 let mut row_offsets: Vec<u64> = Vec::with_capacity(num_values as usize + 1);
365 row_offsets.push(0);
366 let mut total_bytes: usize = 0;
367 let mut max_row_len: usize = 0;
368 for row in 0..num_values as usize {
369 let start = row_data.len();
370 for field in &field_data {
371 field.append_row_bytes(row, &mut row_data)?;
372 }
373 let end = row_data.len();
374 let row_len = end - start;
375 max_row_len = max_row_len.max(row_len);
376 total_bytes = total_bytes.checked_add(row_len).ok_or_else(|| {
377 Error::invalid_input("Packed struct row data size overflow", location!())
378 })?;
379 row_offsets.push(end as u64);
380 }
381 debug_assert_eq!(total_bytes, row_data.len());
382
383 let use_u32_offsets = total_bytes <= u32::MAX as usize && max_row_len <= u32::MAX as usize;
384 let bits_per_offset = if use_u32_offsets { 32 } else { 64 };
385 let offsets_buffer = if use_u32_offsets {
386 let offsets_u32 = row_offsets
387 .iter()
388 .map(|&offset| offset as u32)
389 .collect::<Vec<_>>();
390 LanceBuffer::reinterpret_vec(offsets_u32)
391 } else {
392 LanceBuffer::reinterpret_vec(row_offsets)
393 };
394
395 let data_block = VariableWidthBlock {
396 data: LanceBuffer::from(row_data),
397 bits_per_offset,
398 offsets: offsets_buffer,
399 num_values,
400 block_info: BlockInfo::new(),
401 };
402
403 Ok((
404 PerValueDataBlock::Variable(data_block),
405 ProtobufUtils21::packed_struct_variable(field_metadata),
406 ))
407 }
408}
409
410#[derive(Debug)]
411pub(crate) enum VariablePackedStructFieldKind {
412 Fixed {
413 bits_per_value: u64,
414 decompressor: Arc<dyn FixedPerValueDecompressor>,
415 },
416 Variable {
417 bits_per_length: u64,
418 decompressor: Arc<dyn VariablePerValueDecompressor>,
419 },
420}
421
422#[derive(Debug)]
423pub(crate) struct VariablePackedStructFieldDecoder {
424 pub(crate) kind: VariablePackedStructFieldKind,
425}
426
427#[derive(Debug)]
428pub struct PackedStructVariablePerValueDecompressor {
429 fields: Vec<VariablePackedStructFieldDecoder>,
430}
431
432impl PackedStructVariablePerValueDecompressor {
433 pub(crate) fn new(fields: Vec<VariablePackedStructFieldDecoder>) -> Self {
434 Self { fields }
435 }
436}
437
438enum FieldAccumulator {
439 Fixed {
440 builder: DataBlockBuilder,
441 bits_per_value: u64,
442 },
443 Variable32 {
444 builder: DataBlockBuilder,
445 },
446 Variable64 {
447 builder: DataBlockBuilder,
448 },
449}
450
451impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor {
452 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
453 let num_values = data.num_values;
454 let offsets_u64 = match data.bits_per_offset {
455 32 => data
456 .offsets
457 .borrow_to_typed_slice::<u32>()
458 .iter()
459 .map(|v| *v as u64)
460 .collect::<Vec<_>>(),
461 64 => data
462 .offsets
463 .borrow_to_typed_slice::<u64>()
464 .as_ref()
465 .to_vec(),
466 _ => {
467 return Err(Error::invalid_input(
468 "Packed struct row offsets must be 32 or 64 bits",
469 location!(),
470 ))
471 }
472 };
473
474 if offsets_u64.len() != num_values as usize + 1 {
475 return Err(Error::invalid_input(
476 "Packed struct row offsets length mismatch",
477 location!(),
478 ));
479 }
480
481 let mut accumulators = Vec::with_capacity(self.fields.len());
482 for field in &self.fields {
483 match &field.kind {
484 VariablePackedStructFieldKind::Fixed { bits_per_value, .. } => {
485 if bits_per_value % 8 != 0 {
486 return Err(Error::invalid_input(
487 "Packed struct fixed child must be byte-aligned",
488 location!(),
489 ));
490 }
491 let bytes_per_value = bits_per_value.checked_div(8).ok_or_else(|| {
492 Error::invalid_input(
493 "Invalid bits per value for packed struct field",
494 location!(),
495 )
496 })?;
497 let estimate = bytes_per_value.checked_mul(num_values).ok_or_else(|| {
498 Error::invalid_input(
499 "Packed struct fixed child allocation overflow",
500 location!(),
501 )
502 })?;
503 accumulators.push(FieldAccumulator::Fixed {
504 builder: DataBlockBuilder::with_capacity_estimate(estimate),
505 bits_per_value: *bits_per_value,
506 });
507 }
508 VariablePackedStructFieldKind::Variable {
509 bits_per_length, ..
510 } => match bits_per_length {
511 32 => accumulators.push(FieldAccumulator::Variable32 {
512 builder: DataBlockBuilder::with_capacity_estimate(data.data.len() as u64),
513 }),
514 64 => accumulators.push(FieldAccumulator::Variable64 {
515 builder: DataBlockBuilder::with_capacity_estimate(data.data.len() as u64),
516 }),
517 _ => {
518 return Err(Error::invalid_input(
519 "Packed struct variable child must use 32 or 64-bit length prefixes",
520 location!(),
521 ))
522 }
523 },
524 }
525 }
526
527 for row_idx in 0..num_values as usize {
528 let row_start = offsets_u64[row_idx] as usize;
529 let row_end = offsets_u64[row_idx + 1] as usize;
530 if row_end > data.data.len() || row_start > row_end {
531 return Err(Error::invalid_input(
532 "Packed struct row bounds exceed buffer",
533 location!(),
534 ));
535 }
536 let mut cursor = row_start;
537 for (field, accumulator) in self.fields.iter().zip(accumulators.iter_mut()) {
538 match (&field.kind, accumulator) {
539 (
540 VariablePackedStructFieldKind::Fixed { bits_per_value, .. },
541 FieldAccumulator::Fixed {
542 builder,
543 bits_per_value: acc_bits,
544 },
545 ) => {
546 debug_assert_eq!(bits_per_value, acc_bits);
547 let bytes_per_value = (bits_per_value / 8) as usize;
548 let end = cursor + bytes_per_value;
549 if end > row_end {
550 return Err(Error::invalid_input(
551 "Packed struct fixed child exceeds row bounds",
552 location!(),
553 ));
554 }
555 let value_block = DataBlock::FixedWidth(FixedWidthDataBlock {
556 data: LanceBuffer::from(data.data[cursor..end].to_vec()),
557 bits_per_value: *bits_per_value,
558 num_values: 1,
559 block_info: BlockInfo::new(),
560 });
561 builder.append(&value_block, 0..1);
562 cursor = end;
563 }
564 (
565 VariablePackedStructFieldKind::Variable {
566 bits_per_length, ..
567 },
568 FieldAccumulator::Variable32 { builder },
569 ) => {
570 if *bits_per_length != 32 {
571 return Err(Error::invalid_input(
572 "Packed struct length prefix size mismatch",
573 location!(),
574 ));
575 }
576 let end = cursor + std::mem::size_of::<u32>();
577 if end > row_end {
578 return Err(Error::invalid_input(
579 "Packed struct variable child length prefix out of bounds",
580 location!(),
581 ));
582 }
583 let len = u32::from_le_bytes(
584 data.data[cursor..end]
585 .try_into()
586 .expect("slice has exact length"),
587 ) as usize;
588 cursor = end;
589 let value_end = cursor + len;
590 if value_end > row_end {
591 return Err(Error::invalid_input(
592 "Packed struct variable child exceeds row bounds",
593 location!(),
594 ));
595 }
596 let value_block = DataBlock::VariableWidth(VariableWidthBlock {
597 data: LanceBuffer::from(data.data[cursor..value_end].to_vec()),
598 bits_per_offset: 32,
599 offsets: LanceBuffer::reinterpret_vec(vec![0_u32, len as u32]),
600 num_values: 1,
601 block_info: BlockInfo::new(),
602 });
603 builder.append(&value_block, 0..1);
604 cursor = value_end;
605 }
606 (
607 VariablePackedStructFieldKind::Variable {
608 bits_per_length, ..
609 },
610 FieldAccumulator::Variable64 { builder },
611 ) => {
612 if *bits_per_length != 64 {
613 return Err(Error::invalid_input(
614 "Packed struct length prefix size mismatch",
615 location!(),
616 ));
617 }
618 let end = cursor + std::mem::size_of::<u64>();
619 if end > row_end {
620 return Err(Error::invalid_input(
621 "Packed struct variable child length prefix out of bounds",
622 location!(),
623 ));
624 }
625 let len = u64::from_le_bytes(
626 data.data[cursor..end]
627 .try_into()
628 .expect("slice has exact length"),
629 ) as usize;
630 cursor = end;
631 let value_end = cursor + len;
632 if value_end > row_end {
633 return Err(Error::invalid_input(
634 "Packed struct variable child exceeds row bounds",
635 location!(),
636 ));
637 }
638 let value_block = DataBlock::VariableWidth(VariableWidthBlock {
639 data: LanceBuffer::from(data.data[cursor..value_end].to_vec()),
640 bits_per_offset: 64,
641 offsets: LanceBuffer::reinterpret_vec(vec![0_u64, len as u64]),
642 num_values: 1,
643 block_info: BlockInfo::new(),
644 });
645 builder.append(&value_block, 0..1);
646 cursor = value_end;
647 }
648 _ => {
649 return Err(Error::invalid_input(
650 "Packed struct accumulator kind mismatch",
651 location!(),
652 ))
653 }
654 }
655 }
656 if cursor != row_end {
657 return Err(Error::invalid_input(
658 "Packed struct row parsing did not consume full row",
659 location!(),
660 ));
661 }
662 }
663
664 let mut children = Vec::with_capacity(self.fields.len());
665 for (field, accumulator) in self.fields.iter().zip(accumulators.into_iter()) {
666 match (field, accumulator) {
667 (
668 VariablePackedStructFieldDecoder {
669 kind: VariablePackedStructFieldKind::Fixed { decompressor, .. },
670 },
671 FieldAccumulator::Fixed { builder, .. },
672 ) => {
673 let DataBlock::FixedWidth(block) = builder.finish() else {
674 panic!("Expected fixed-width datablock from builder");
675 };
676 let decoded = decompressor.decompress(block, num_values)?;
677 children.push(decoded);
678 }
679 (
680 VariablePackedStructFieldDecoder {
681 kind:
682 VariablePackedStructFieldKind::Variable {
683 bits_per_length,
684 decompressor,
685 },
686 },
687 FieldAccumulator::Variable32 { builder },
688 ) => {
689 let DataBlock::VariableWidth(mut block) = builder.finish() else {
690 panic!("Expected variable-width datablock from builder");
691 };
692 debug_assert_eq!(block.bits_per_offset, 32);
693 block.bits_per_offset = (*bits_per_length) as u8;
694 let decoded = decompressor.decompress(block)?;
695 children.push(decoded);
696 }
697 (
698 VariablePackedStructFieldDecoder {
699 kind:
700 VariablePackedStructFieldKind::Variable {
701 bits_per_length,
702 decompressor,
703 },
704 },
705 FieldAccumulator::Variable64 { builder },
706 ) => {
707 let DataBlock::VariableWidth(mut block) = builder.finish() else {
708 panic!("Expected variable-width datablock from builder");
709 };
710 debug_assert_eq!(block.bits_per_offset, 64);
711 block.bits_per_offset = (*bits_per_length) as u8;
712 let decoded = decompressor.decompress(block)?;
713 children.push(decoded);
714 }
715 _ => {
716 return Err(Error::invalid_input(
717 "Packed struct accumulator mismatch during finalize",
718 location!(),
719 ))
720 }
721 }
722 }
723
724 Ok(DataBlock::Struct(StructDataBlock {
725 children,
726 block_info: BlockInfo::new(),
727 validity: None,
728 }))
729 }
730}
731
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        compression::CompressionStrategy,
        compression::{DefaultCompressionStrategy, DefaultDecompressionStrategy},
        constants::PACKED_STRUCT_META_KEY,
        statistics::ComputeStat,
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };
    use arrow_array::{
        Array, ArrayRef, BinaryArray, Int32Array, Int64Array, LargeStringArray, StringArray,
        StructArray, UInt32Array,
    };
    use arrow_schema::{DataType, Field as ArrowField, Fields};
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Converts a single Arrow array into a `DataBlock`.
    fn block_from_array(array: ArrayRef) -> DataBlock {
        let num_values = array.len() as u64;
        DataBlock::from_arrays(&[array], num_values)
    }

    /// Unwraps a fixed-width block, panicking on any other kind.
    fn expect_fixed(block: DataBlock) -> FixedWidthDataBlock {
        match block {
            DataBlock::FixedWidth(block) => block,
            _ => panic!("Expected fixed-width data block"),
        }
    }

    /// Unwraps a variable-width block, panicking on any other kind.
    fn expect_variable(block: DataBlock) -> VariableWidthBlock {
        match block {
            DataBlock::VariableWidth(block) => block,
            _ => panic!("Expected variable-width block"),
        }
    }

    fn fixed_block_from_array(array: Int64Array) -> FixedWidthDataBlock {
        expect_fixed(block_from_array(Arc::new(array)))
    }

    fn fixed_i32_block_from_array(array: Int32Array) -> FixedWidthDataBlock {
        expect_fixed(block_from_array(Arc::new(array)))
    }

    fn variable_block_from_string_array(array: StringArray) -> VariableWidthBlock {
        expect_variable(block_from_array(Arc::new(array)))
    }

    fn variable_block_from_large_string_array(array: LargeStringArray) -> VariableWidthBlock {
        expect_variable(block_from_array(Arc::new(array)))
    }

    fn variable_block_from_binary_array(array: BinaryArray) -> VariableWidthBlock {
        expect_variable(block_from_array(Arc::new(array)))
    }

    /// Compresses `data_block` with the per-value compressor that the V2.2
    /// strategy selects for `struct_field`. It then decodes the result with the
    /// default decompression strategy and returns the decoded struct.
    fn packed_struct_round_trip(
        struct_field: &Field,
        data_block: DataBlock,
    ) -> Result<StructDataBlock> {
        let compression_strategy =
            DefaultCompressionStrategy::new().with_version(LanceFileVersion::V2_2);
        let compressor = crate::compression::CompressionStrategy::create_per_value(
            &compression_strategy,
            struct_field,
            &data_block,
        )?;
        let (compressed, encoding) = compressor.compress(data_block)?;

        let PerValueDataBlock::Variable(zipped) = compressed else {
            panic!("expected variable-width packed struct output");
        };

        let decompression_strategy = DefaultDecompressionStrategy::default();
        let decompressor =
            crate::compression::DecompressionStrategy::create_variable_per_value_decompressor(
                &decompression_strategy,
                &encoding,
            )?;

        let DataBlock::Struct(decoded_struct) = decompressor.decompress(zipped)? else {
            panic!("expected struct datablock after decode");
        };
        Ok(decoded_struct)
    }

    #[test]
    fn variable_packed_struct_round_trip() -> Result<()> {
        let arrow_fields: Fields = vec![
            ArrowField::new("id", DataType::UInt32, false),
            ArrowField::new("name", DataType::Utf8, true),
        ]
        .into();
        let arrow_struct = ArrowField::new("item", DataType::Struct(arrow_fields), false);
        let struct_field = Field::try_from(&arrow_struct)?;

        let ids = vec![1_u32, 2, 42];
        let id_bytes = ids
            .iter()
            .flat_map(|value| value.to_le_bytes())
            .collect::<Vec<_>>();
        let mut id_block = FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(ids),
            bits_per_value: 32,
            num_values: 3,
            block_info: BlockInfo::new(),
        };
        id_block.compute_stat();
        let id_block = DataBlock::FixedWidth(id_block);

        let name_offsets = vec![0_i32, 1, 4, 4];
        let name_bytes = b"abcz".to_vec();
        let mut name_block = VariableWidthBlock {
            data: LanceBuffer::from(name_bytes.clone()),
            bits_per_offset: 32,
            offsets: LanceBuffer::reinterpret_vec(name_offsets.clone()),
            num_values: 3,
            block_info: BlockInfo::new(),
        };
        name_block.compute_stat();
        let name_block = DataBlock::VariableWidth(name_block);

        let struct_block = StructDataBlock {
            children: vec![id_block, name_block],
            block_info: BlockInfo::new(),
            validity: None,
        };

        let decoded_struct =
            packed_struct_round_trip(&struct_field, DataBlock::Struct(struct_block))?;

        let decoded_id = decoded_struct.children[0].as_fixed_width_ref().unwrap();
        assert_eq!(decoded_id.bits_per_value, 32);
        assert_eq!(decoded_id.data.as_ref(), id_bytes.as_slice());

        let decoded_name = decoded_struct.children[1].as_variable_width_ref().unwrap();
        assert_eq!(decoded_name.bits_per_offset, 32);
        let decoded_offsets = decoded_name.offsets.borrow_to_typed_slice::<i32>();
        assert_eq!(decoded_offsets.as_ref(), name_offsets.as_slice());
        assert_eq!(decoded_name.data.as_ref(), name_bytes.as_slice());

        Ok(())
    }

    #[test]
    fn variable_packed_struct_large_utf8_round_trip() -> Result<()> {
        let arrow_fields: Fields = vec![
            ArrowField::new("value", DataType::Int64, false),
            ArrowField::new("text", DataType::LargeUtf8, false),
        ]
        .into();
        let arrow_struct = ArrowField::new("item", DataType::Struct(arrow_fields), false);
        let struct_field = Field::try_from(&arrow_struct)?;

        let id_block = fixed_block_from_array(Int64Array::from(vec![10, 20, 30, 40]));
        let payload_array = LargeStringArray::from(vec![
            "alpha",
            "a considerably longer payload for testing",
            "mid",
            "z",
        ]);
        let payload_block = variable_block_from_large_string_array(payload_array);

        let struct_block = StructDataBlock {
            children: vec![
                DataBlock::FixedWidth(id_block.clone()),
                DataBlock::VariableWidth(payload_block.clone()),
            ],
            block_info: BlockInfo::new(),
            validity: None,
        };

        let decoded_struct =
            packed_struct_round_trip(&struct_field, DataBlock::Struct(struct_block))?;

        let decoded_id = decoded_struct.children[0].as_fixed_width_ref().unwrap();
        assert_eq!(decoded_id.bits_per_value, 64);
        assert_eq!(decoded_id.data.as_ref(), id_block.data.as_ref());

        let decoded_payload = decoded_struct.children[1].as_variable_width_ref().unwrap();
        assert_eq!(decoded_payload.bits_per_offset, 64);
        assert_eq!(
            decoded_payload
                .offsets
                .borrow_to_typed_slice::<i64>()
                .as_ref(),
            payload_block
                .offsets
                .borrow_to_typed_slice::<i64>()
                .as_ref()
        );
        assert_eq!(decoded_payload.data.as_ref(), payload_block.data.as_ref());

        Ok(())
    }

    #[tokio::test]
    async fn variable_packed_struct_utf8_round_trip() {
        let fields = Fields::from(vec![
            Arc::new(ArrowField::new("id", DataType::UInt32, false)),
            Arc::new(ArrowField::new("uri", DataType::Utf8, false)),
            Arc::new(ArrowField::new("long_text", DataType::LargeUtf8, false)),
        ]);

        let mut meta = HashMap::new();
        meta.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());

        let array = Arc::new(StructArray::from(vec![
            (
                fields[0].clone(),
                Arc::new(UInt32Array::from(vec![1, 2, 3])) as ArrayRef,
            ),
            (
                fields[1].clone(),
                Arc::new(StringArray::from(vec![
                    Some("a"),
                    Some("b"),
                    Some("/tmp/x"),
                ])) as ArrayRef,
            ),
            (
                fields[2].clone(),
                Arc::new(LargeStringArray::from(vec![
                    Some("alpha"),
                    Some("a considerably longer payload for testing"),
                    Some("mid"),
                ])) as ArrayRef,
            ),
        ]));

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_expected_encoding("variable_packed_struct");

        check_round_trip_encoding_of_data(vec![array], &test_cases, meta).await;
    }

    #[test]
    fn variable_packed_struct_multi_variable_round_trip() -> Result<()> {
        let arrow_fields: Fields = vec![
            ArrowField::new("category", DataType::Utf8, false),
            ArrowField::new("payload", DataType::Binary, false),
            ArrowField::new("count", DataType::Int32, false),
        ]
        .into();
        let arrow_struct = ArrowField::new("item", DataType::Struct(arrow_fields), false);
        let struct_field = Field::try_from(&arrow_struct)?;

        let category_array = StringArray::from(vec!["red", "blue", "green", "red"]);
        let category_block = variable_block_from_string_array(category_array);
        let payload_values: Vec<Vec<u8>> =
            vec![vec![0x01, 0x02], vec![], vec![0x05, 0x06, 0x07], vec![0xff]];
        let payload_array =
            BinaryArray::from_iter_values(payload_values.iter().map(|v| v.as_slice()));
        let payload_block = variable_block_from_binary_array(payload_array);
        let count_block = fixed_i32_block_from_array(Int32Array::from(vec![1, 2, 3, 4]));

        let struct_block = StructDataBlock {
            children: vec![
                DataBlock::VariableWidth(category_block.clone()),
                DataBlock::VariableWidth(payload_block.clone()),
                DataBlock::FixedWidth(count_block.clone()),
            ],
            block_info: BlockInfo::new(),
            validity: None,
        };

        let decoded_struct =
            packed_struct_round_trip(&struct_field, DataBlock::Struct(struct_block))?;

        let decoded_category = decoded_struct.children[0].as_variable_width_ref().unwrap();
        assert_eq!(decoded_category.bits_per_offset, 32);
        assert_eq!(
            decoded_category
                .offsets
                .borrow_to_typed_slice::<i32>()
                .as_ref(),
            category_block
                .offsets
                .borrow_to_typed_slice::<i32>()
                .as_ref()
        );
        assert_eq!(decoded_category.data.as_ref(), category_block.data.as_ref());

        let decoded_payload = decoded_struct.children[1].as_variable_width_ref().unwrap();
        assert_eq!(decoded_payload.bits_per_offset, 32);
        assert_eq!(
            decoded_payload
                .offsets
                .borrow_to_typed_slice::<i32>()
                .as_ref(),
            payload_block
                .offsets
                .borrow_to_typed_slice::<i32>()
                .as_ref()
        );
        assert_eq!(decoded_payload.data.as_ref(), payload_block.data.as_ref());

        let decoded_count = decoded_struct.children[2].as_fixed_width_ref().unwrap();
        assert_eq!(decoded_count.bits_per_value, 32);
        assert_eq!(decoded_count.data.as_ref(), count_block.data.as_ref());

        Ok(())
    }

    #[test]
    fn variable_packed_struct_requires_v22() {
        let arrow_fields: Fields = vec![
            ArrowField::new("value", DataType::Int64, false),
            ArrowField::new("text", DataType::Utf8, false),
        ]
        .into();
        let arrow_struct = ArrowField::new("item", DataType::Struct(arrow_fields), false);
        let struct_field = Field::try_from(&arrow_struct).unwrap();

        let value_block = fixed_block_from_array(Int64Array::from(vec![1, 2, 3]));
        let text_block =
            variable_block_from_string_array(StringArray::from(vec!["a", "bb", "ccc"]));

        let struct_block = StructDataBlock {
            children: vec![
                DataBlock::FixedWidth(value_block),
                DataBlock::VariableWidth(text_block),
            ],
            block_info: BlockInfo::new(),
            validity: None,
        };

        let compression_strategy =
            DefaultCompressionStrategy::new().with_version(LanceFileVersion::V2_1);
        let result =
            compression_strategy.create_per_value(&struct_field, &DataBlock::Struct(struct_block));

        assert!(matches!(result, Err(Error::NotSupported { .. })));
    }
}