1use std::{convert::TryInto, sync::Arc};
13
14use arrow_array::types::UInt64Type;
15
16use lance_core::{datatypes::Field, Error, Result};
17use snafu::location;
18
19use crate::{
20 buffer::LanceBuffer,
21 compression::{
22 DefaultCompressionStrategy, FixedPerValueDecompressor, MiniBlockDecompressor,
23 VariablePerValueDecompressor,
24 },
25 data::{
26 BlockInfo, DataBlock, DataBlockBuilder, FixedWidthDataBlock, StructDataBlock,
27 VariableWidthBlock,
28 },
29 encodings::logical::primitive::{
30 fullzip::{PerValueCompressor, PerValueDataBlock},
31 miniblock::{MiniBlockCompressed, MiniBlockCompressor},
32 },
33 format::{
34 pb21::{compressive_encoding::Compression, CompressiveEncoding, PackedStruct},
35 ProtobufUtils21,
36 },
37 statistics::{GetStat, Stat},
38};
39
40use super::value::{ValueDecompressor, ValueEncoder};
41
42fn struct_data_block_to_fixed_width_data_block(
46 struct_data_block: StructDataBlock,
47 bits_per_values: &[u64],
48) -> DataBlock {
49 let data_size = struct_data_block.expect_single_stat::<UInt64Type>(Stat::DataSize);
50 let mut output = Vec::with_capacity(data_size as usize);
51 let num_values = struct_data_block.children[0].num_values();
52
53 for i in 0..num_values as usize {
54 for (j, child) in struct_data_block.children.iter().enumerate() {
55 let bytes_per_value = (bits_per_values[j] / 8) as usize;
56 let this_data = child
57 .as_fixed_width_ref()
58 .unwrap()
59 .data
60 .slice_with_length(bytes_per_value * i, bytes_per_value);
61 output.extend_from_slice(&this_data);
62 }
63 }
64
65 DataBlock::FixedWidth(FixedWidthDataBlock {
66 bits_per_value: bits_per_values.iter().copied().sum(),
67 data: LanceBuffer::from(output),
68 num_values,
69 block_info: BlockInfo::default(),
70 })
71}
72
/// Mini-block compressor for struct data blocks.
///
/// Its `MiniBlockCompressor` implementation packs the struct's children into a
/// single fixed-width block and compresses that block with `ValueEncoder`.
/// The type carries no state.
#[derive(Debug, Default)]
pub struct PackedStructFixedWidthMiniBlockEncoder {}
75
76impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder {
77 fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
78 match data {
79 DataBlock::Struct(struct_data_block) => {
80 let bits_per_values = struct_data_block.children.iter().map(|data_block| data_block.as_fixed_width_ref().unwrap().bits_per_value).collect::<Vec<_>>();
81
82 let data_block = struct_data_block_to_fixed_width_data_block(struct_data_block, &bits_per_values);
84
85 let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box<dyn MiniBlockCompressor>;
87 let (value_miniblock_compressed, value_array_encoding) =
88 value_miniblock_compressor.compress(data_block)?;
89
90 Ok((
91 value_miniblock_compressed,
92 ProtobufUtils21::packed_struct(value_array_encoding, bits_per_values),
93 ))
94 }
95 _ => Err(Error::InvalidInput {
96 source: format!(
97 "Cannot compress a data block of type {} with PackedStructFixedWidthBlockEncoder",
98 data.name()
99 )
100 .into(),
101 location: location!(),
102 }),
103 }
104 }
105}
106
/// Mini-block decompressor that splits packed fixed-width rows back into one
/// fixed-width block per struct child.
#[derive(Debug)]
pub struct PackedStructFixedWidthMiniBlockDecompressor {
    /// Width in bits of each child, in child order (taken from the encoding
    /// description's `bits_per_value`).
    bits_per_values: Vec<u64>,
    /// Decompressor for the packed rows themselves.
    array_encoding: Box<dyn MiniBlockDecompressor>,
}
112
113impl PackedStructFixedWidthMiniBlockDecompressor {
114 pub fn new(description: &PackedStruct) -> Self {
115 let array_encoding: Box<dyn MiniBlockDecompressor> = match description.values.as_ref().unwrap().compression.as_ref().unwrap() {
116 Compression::Flat(flat) => Box::new(ValueDecompressor::from_flat(flat)),
117 _ => panic!("Currently only `ArrayEncoding::Flat` is supported in packed struct encoding in Lance 2.1."),
118 };
119 Self {
120 bits_per_values: description.bits_per_value.clone(),
121 array_encoding,
122 }
123 }
124}
125
126impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor {
127 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
128 assert_eq!(data.len(), 1);
129 let encoded_data_block = self.array_encoding.decompress(data, num_values)?;
130 let DataBlock::FixedWidth(encoded_data_block) = encoded_data_block else {
131 panic!("ValueDecompressor should output FixedWidth DataBlock")
132 };
133
134 let bytes_per_values = self
135 .bits_per_values
136 .iter()
137 .map(|bits_per_value| *bits_per_value as usize / 8)
138 .collect::<Vec<_>>();
139
140 assert!(encoded_data_block.bits_per_value % 8 == 0);
141 let encoded_bytes_per_row = (encoded_data_block.bits_per_value / 8) as usize;
142
143 let mut prefix_sum = vec![0; self.bits_per_values.len()];
145 for i in 0..(self.bits_per_values.len() - 1) {
146 prefix_sum[i + 1] = prefix_sum[i] + bytes_per_values[i];
147 }
148
149 let mut children_data_block = vec![];
150 for i in 0..self.bits_per_values.len() {
151 let child_buf_size = bytes_per_values[i] * num_values as usize;
152 let mut child_buf: Vec<u8> = Vec::with_capacity(child_buf_size);
153
154 for j in 0..num_values as usize {
155 let this_value = encoded_data_block.data.slice_with_length(
157 prefix_sum[i] + (j * encoded_bytes_per_row),
158 bytes_per_values[i],
159 );
160
161 child_buf.extend_from_slice(&this_value);
162 }
163
164 let child = DataBlock::FixedWidth(FixedWidthDataBlock {
165 data: LanceBuffer::from(child_buf),
166 bits_per_value: self.bits_per_values[i],
167 num_values,
168 block_info: BlockInfo::default(),
169 });
170 children_data_block.push(child);
171 }
172 Ok(DataBlock::Struct(StructDataBlock {
173 children: children_data_block,
174 block_info: BlockInfo::default(),
175 validity: None,
176 }))
177 }
178}
179
/// Compressed data of one struct child, as held by the variable packed-struct
/// encoder while it assembles rows.
#[derive(Debug)]
enum VariablePackedFieldData {
    /// A child whose values all have the same width.
    Fixed {
        /// The child's fixed-width values.
        block: FixedWidthDataBlock,
    },
    /// A child whose values vary in length.
    Variable {
        /// The child's values together with their offsets.
        block: VariableWidthBlock,
        /// Width in bits of the length prefix written before each value.
        bits_per_length: u64,
    },
}
190
191impl VariablePackedFieldData {
192 fn append_row_bytes(&self, row_idx: usize, output: &mut Vec<u8>) -> Result<()> {
193 match self {
194 Self::Fixed { block } => {
195 let bits_per_value = block.bits_per_value;
196 if bits_per_value % 8 != 0 {
197 return Err(Error::invalid_input(
198 "Packed struct variable encoding requires byte-aligned fixed-width children",
199 location!(),
200 ));
201 }
202 let bytes_per_value = (bits_per_value / 8) as usize;
203 let start = row_idx.checked_mul(bytes_per_value).ok_or_else(|| {
204 Error::invalid_input("Packed struct row size overflow", location!())
205 })?;
206 let end = start + bytes_per_value;
207 let data = block.data.as_ref();
208 if end > data.len() {
209 return Err(Error::invalid_input(
210 "Packed struct fixed child out of bounds",
211 location!(),
212 ));
213 }
214 output.extend_from_slice(&data[start..end]);
215 Ok(())
216 }
217 Self::Variable {
218 block,
219 bits_per_length,
220 } => {
221 if bits_per_length % 8 != 0 {
222 return Err(Error::invalid_input(
223 "Packed struct variable children must have byte-aligned length prefixes",
224 location!(),
225 ));
226 }
227 let prefix_bytes = (*bits_per_length / 8) as usize;
228 if !(prefix_bytes == 4 || prefix_bytes == 8) {
229 return Err(Error::invalid_input(
230 "Packed struct variable children must use 32 or 64-bit length prefixes",
231 location!(),
232 ));
233 }
234 match block.bits_per_offset {
235 32 => {
236 let offsets = block.offsets.borrow_to_typed_slice::<u32>();
237 let start = offsets[row_idx] as usize;
238 let end = offsets[row_idx + 1] as usize;
239 if end > block.data.len() {
240 return Err(Error::invalid_input(
241 "Packed struct variable child offsets out of bounds",
242 location!(),
243 ));
244 }
245 let len = (end - start) as u32;
246 if prefix_bytes != std::mem::size_of::<u32>() {
247 return Err(Error::invalid_input(
248 "Packed struct variable child length prefix mismatch",
249 location!(),
250 ));
251 }
252 output.extend_from_slice(&len.to_le_bytes());
253 output.extend_from_slice(&block.data[start..end]);
254 Ok(())
255 }
256 64 => {
257 let offsets = block.offsets.borrow_to_typed_slice::<u64>();
258 let start = offsets[row_idx] as usize;
259 let end = offsets[row_idx + 1] as usize;
260 if end > block.data.len() {
261 return Err(Error::invalid_input(
262 "Packed struct variable child offsets out of bounds",
263 location!(),
264 ));
265 }
266 let len = (end - start) as u64;
267 if prefix_bytes != std::mem::size_of::<u64>() {
268 return Err(Error::invalid_input(
269 "Packed struct variable child length prefix mismatch",
270 location!(),
271 ));
272 }
273 output.extend_from_slice(&len.to_le_bytes());
274 output.extend_from_slice(&block.data[start..end]);
275 Ok(())
276 }
277 _ => Err(Error::invalid_input(
278 "Packed struct variable child must use 32 or 64-bit offsets",
279 location!(),
280 )),
281 }
282 }
283 }
284 }
285}
286
/// Per-value compressor that packs a struct with any mix of fixed-width and
/// variable-width children into one variable-width block of rows.
#[derive(Debug)]
pub struct PackedStructVariablePerValueEncoder {
    /// Strategy used to create the per-value compressor for each child.
    strategy: DefaultCompressionStrategy,
    /// Field metadata, one entry per struct child, in child order.
    fields: Vec<Field>,
}
292
293impl PackedStructVariablePerValueEncoder {
294 pub fn new(strategy: DefaultCompressionStrategy, fields: Vec<Field>) -> Self {
295 Self { strategy, fields }
296 }
297}
298
299impl PerValueCompressor for PackedStructVariablePerValueEncoder {
300 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
301 let DataBlock::Struct(struct_block) = data else {
302 return Err(Error::invalid_input(
303 "Packed struct encoder requires Struct data block",
304 location!(),
305 ));
306 };
307
308 if struct_block.children.is_empty() {
309 return Err(Error::invalid_input(
310 "Packed struct encoder requires at least one child field",
311 location!(),
312 ));
313 }
314 if struct_block.children.len() != self.fields.len() {
315 return Err(Error::invalid_input(
316 "Struct field metadata does not match number of children",
317 location!(),
318 ));
319 }
320
321 let num_values = struct_block.children[0].num_values();
322 for child in struct_block.children.iter() {
323 if child.num_values() != num_values {
324 return Err(Error::invalid_input(
325 "Packed struct children must have matching value counts",
326 location!(),
327 ));
328 }
329 }
330
331 let mut field_data = Vec::with_capacity(self.fields.len());
332 let mut field_metadata = Vec::with_capacity(self.fields.len());
333
334 for (field, child_block) in self.fields.iter().zip(struct_block.children.into_iter()) {
335 let compressor = crate::compression::CompressionStrategy::create_per_value(
336 &self.strategy,
337 field,
338 &child_block,
339 )?;
340 let (compressed, encoding) = compressor.compress(child_block)?;
341 match compressed {
342 PerValueDataBlock::Fixed(block) => {
343 field_metadata.push(ProtobufUtils21::packed_struct_field_fixed(
344 encoding,
345 block.bits_per_value,
346 ));
347 field_data.push(VariablePackedFieldData::Fixed { block });
348 }
349 PerValueDataBlock::Variable(block) => {
350 let bits_per_length = block.bits_per_offset as u64;
351 field_metadata.push(ProtobufUtils21::packed_struct_field_variable(
352 encoding,
353 bits_per_length,
354 ));
355 field_data.push(VariablePackedFieldData::Variable {
356 block,
357 bits_per_length,
358 });
359 }
360 }
361 }
362
363 let mut row_data: Vec<u8> = Vec::new();
364 let mut row_offsets: Vec<u64> = Vec::with_capacity(num_values as usize + 1);
365 row_offsets.push(0);
366 let mut total_bytes: usize = 0;
367 let mut max_row_len: usize = 0;
368 for row in 0..num_values as usize {
369 let start = row_data.len();
370 for field in &field_data {
371 field.append_row_bytes(row, &mut row_data)?;
372 }
373 let end = row_data.len();
374 let row_len = end - start;
375 max_row_len = max_row_len.max(row_len);
376 total_bytes = total_bytes.checked_add(row_len).ok_or_else(|| {
377 Error::invalid_input("Packed struct row data size overflow", location!())
378 })?;
379 row_offsets.push(end as u64);
380 }
381 debug_assert_eq!(total_bytes, row_data.len());
382
383 let use_u32_offsets = total_bytes <= u32::MAX as usize && max_row_len <= u32::MAX as usize;
384 let bits_per_offset = if use_u32_offsets { 32 } else { 64 };
385 let offsets_buffer = if use_u32_offsets {
386 let offsets_u32 = row_offsets
387 .iter()
388 .map(|&offset| offset as u32)
389 .collect::<Vec<_>>();
390 LanceBuffer::reinterpret_vec(offsets_u32)
391 } else {
392 LanceBuffer::reinterpret_vec(row_offsets)
393 };
394
395 let data_block = VariableWidthBlock {
396 data: LanceBuffer::from(row_data),
397 bits_per_offset,
398 offsets: offsets_buffer,
399 num_values,
400 block_info: BlockInfo::new(),
401 };
402
403 Ok((
404 PerValueDataBlock::Variable(data_block),
405 ProtobufUtils21::packed_struct_variable(field_metadata),
406 ))
407 }
408}
409
/// How one field of a variable packed struct is laid out inside a row, plus
/// the decompressor that decodes the field once it has been separated out.
#[derive(Debug)]
pub(crate) enum VariablePackedStructFieldKind {
    /// The field occupies a constant number of bits in every row.
    Fixed {
        /// Width of each value in bits.
        bits_per_value: u64,
        /// Decodes the gathered fixed-width values.
        decompressor: Arc<dyn FixedPerValueDecompressor>,
    },
    /// The field is stored as a length prefix followed by that many bytes.
    Variable {
        /// Width in bits of the length prefix.
        bits_per_length: u64,
        /// Decodes the gathered variable-width values.
        decompressor: Arc<dyn VariablePerValueDecompressor>,
    },
}
421
/// Decoding information for a single field of a variable packed struct.
#[derive(Debug)]
pub(crate) struct VariablePackedStructFieldDecoder {
    /// Layout of the field within a row and the decompressor for its values.
    pub(crate) kind: VariablePackedStructFieldKind,
}
426
/// Per-value decompressor that splits variable-width packed rows back into
/// one data block per struct field.
#[derive(Debug)]
pub struct PackedStructVariablePerValueDecompressor {
    /// Field decoders, in the order the fields appear inside each row.
    fields: Vec<VariablePackedStructFieldDecoder>,
}
431
432impl PackedStructVariablePerValueDecompressor {
433 pub(crate) fn new(fields: Vec<VariablePackedStructFieldDecoder>) -> Self {
434 Self { fields }
435 }
436}
437
/// Collects the values of one field while packed rows are being split apart.
enum FieldAccumulator {
    /// Accumulates values of a fixed-width field.
    Fixed {
        /// Builder the per-row values are appended to.
        builder: DataBlockBuilder,
        /// Width of each value in bits.
        bits_per_value: u64,
    },
    /// Accumulates values of a variable-width field with 32-bit length prefixes.
    Variable32 {
        /// Builder the per-row values are appended to.
        builder: DataBlockBuilder,
    },
    /// Accumulates values of a variable-width field with 64-bit length prefixes.
    Variable64 {
        /// Builder the per-row values are appended to.
        builder: DataBlockBuilder,
    },
}
450
451impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor {
452 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
453 let num_values = data.num_values;
454 let offsets_u64 = match data.bits_per_offset {
455 32 => data
456 .offsets
457 .borrow_to_typed_slice::<u32>()
458 .iter()
459 .map(|v| *v as u64)
460 .collect::<Vec<_>>(),
461 64 => data
462 .offsets
463 .borrow_to_typed_slice::<u64>()
464 .as_ref()
465 .to_vec(),
466 _ => {
467 return Err(Error::invalid_input(
468 "Packed struct row offsets must be 32 or 64 bits",
469 location!(),
470 ))
471 }
472 };
473
474 if offsets_u64.len() != num_values as usize + 1 {
475 return Err(Error::invalid_input(
476 "Packed struct row offsets length mismatch",
477 location!(),
478 ));
479 }
480
481 let mut accumulators = Vec::with_capacity(self.fields.len());
482 for field in &self.fields {
483 match &field.kind {
484 VariablePackedStructFieldKind::Fixed { bits_per_value, .. } => {
485 if bits_per_value % 8 != 0 {
486 return Err(Error::invalid_input(
487 "Packed struct fixed child must be byte-aligned",
488 location!(),
489 ));
490 }
491 let bytes_per_value = bits_per_value.checked_div(8).ok_or_else(|| {
492 Error::invalid_input(
493 "Invalid bits per value for packed struct field",
494 location!(),
495 )
496 })?;
497 let estimate = bytes_per_value.checked_mul(num_values).ok_or_else(|| {
498 Error::invalid_input(
499 "Packed struct fixed child allocation overflow",
500 location!(),
501 )
502 })?;
503 accumulators.push(FieldAccumulator::Fixed {
504 builder: DataBlockBuilder::with_capacity_estimate(estimate),
505 bits_per_value: *bits_per_value,
506 });
507 }
508 VariablePackedStructFieldKind::Variable {
509 bits_per_length, ..
510 } => match bits_per_length {
511 32 => accumulators.push(FieldAccumulator::Variable32 {
512 builder: DataBlockBuilder::with_capacity_estimate(data.data.len() as u64),
513 }),
514 64 => accumulators.push(FieldAccumulator::Variable64 {
515 builder: DataBlockBuilder::with_capacity_estimate(data.data.len() as u64),
516 }),
517 _ => {
518 return Err(Error::invalid_input(
519 "Packed struct variable child must use 32 or 64-bit length prefixes",
520 location!(),
521 ))
522 }
523 },
524 }
525 }
526
527 for row_idx in 0..num_values as usize {
528 let row_start = offsets_u64[row_idx] as usize;
529 let row_end = offsets_u64[row_idx + 1] as usize;
530 if row_end > data.data.len() || row_start > row_end {
531 return Err(Error::invalid_input(
532 "Packed struct row bounds exceed buffer",
533 location!(),
534 ));
535 }
536 let mut cursor = row_start;
537 for (field, accumulator) in self.fields.iter().zip(accumulators.iter_mut()) {
538 match (&field.kind, accumulator) {
539 (
540 VariablePackedStructFieldKind::Fixed { bits_per_value, .. },
541 FieldAccumulator::Fixed {
542 builder,
543 bits_per_value: acc_bits,
544 },
545 ) => {
546 debug_assert_eq!(bits_per_value, acc_bits);
547 let bytes_per_value = (bits_per_value / 8) as usize;
548 let end = cursor + bytes_per_value;
549 if end > row_end {
550 return Err(Error::invalid_input(
551 "Packed struct fixed child exceeds row bounds",
552 location!(),
553 ));
554 }
555 let value_block = DataBlock::FixedWidth(FixedWidthDataBlock {
556 data: LanceBuffer::from(data.data[cursor..end].to_vec()),
557 bits_per_value: *bits_per_value,
558 num_values: 1,
559 block_info: BlockInfo::new(),
560 });
561 builder.append(&value_block, 0..1);
562 cursor = end;
563 }
564 (
565 VariablePackedStructFieldKind::Variable {
566 bits_per_length, ..
567 },
568 FieldAccumulator::Variable32 { builder },
569 ) => {
570 if *bits_per_length != 32 {
571 return Err(Error::invalid_input(
572 "Packed struct length prefix size mismatch",
573 location!(),
574 ));
575 }
576 let end = cursor + std::mem::size_of::<u32>();
577 if end > row_end {
578 return Err(Error::invalid_input(
579 "Packed struct variable child length prefix out of bounds",
580 location!(),
581 ));
582 }
583 let len = u32::from_le_bytes(
584 data.data[cursor..end]
585 .try_into()
586 .expect("slice has exact length"),
587 ) as usize;
588 cursor = end;
589 let value_end = cursor + len;
590 if value_end > row_end {
591 return Err(Error::invalid_input(
592 "Packed struct variable child exceeds row bounds",
593 location!(),
594 ));
595 }
596 let value_block = DataBlock::VariableWidth(VariableWidthBlock {
597 data: LanceBuffer::from(data.data[cursor..value_end].to_vec()),
598 bits_per_offset: 32,
599 offsets: LanceBuffer::reinterpret_vec(vec![0_u32, len as u32]),
600 num_values: 1,
601 block_info: BlockInfo::new(),
602 });
603 builder.append(&value_block, 0..1);
604 cursor = value_end;
605 }
606 (
607 VariablePackedStructFieldKind::Variable {
608 bits_per_length, ..
609 },
610 FieldAccumulator::Variable64 { builder },
611 ) => {
612 if *bits_per_length != 64 {
613 return Err(Error::invalid_input(
614 "Packed struct length prefix size mismatch",
615 location!(),
616 ));
617 }
618 let end = cursor + std::mem::size_of::<u64>();
619 if end > row_end {
620 return Err(Error::invalid_input(
621 "Packed struct variable child length prefix out of bounds",
622 location!(),
623 ));
624 }
625 let len = u64::from_le_bytes(
626 data.data[cursor..end]
627 .try_into()
628 .expect("slice has exact length"),
629 ) as usize;
630 cursor = end;
631 let value_end = cursor + len;
632 if value_end > row_end {
633 return Err(Error::invalid_input(
634 "Packed struct variable child exceeds row bounds",
635 location!(),
636 ));
637 }
638 let value_block = DataBlock::VariableWidth(VariableWidthBlock {
639 data: LanceBuffer::from(data.data[cursor..value_end].to_vec()),
640 bits_per_offset: 64,
641 offsets: LanceBuffer::reinterpret_vec(vec![0_u64, len as u64]),
642 num_values: 1,
643 block_info: BlockInfo::new(),
644 });
645 builder.append(&value_block, 0..1);
646 cursor = value_end;
647 }
648 _ => {
649 return Err(Error::invalid_input(
650 "Packed struct accumulator kind mismatch",
651 location!(),
652 ))
653 }
654 }
655 }
656 if cursor != row_end {
657 return Err(Error::invalid_input(
658 "Packed struct row parsing did not consume full row",
659 location!(),
660 ));
661 }
662 }
663
664 let mut children = Vec::with_capacity(self.fields.len());
665 for (field, accumulator) in self.fields.iter().zip(accumulators.into_iter()) {
666 match (field, accumulator) {
667 (
668 VariablePackedStructFieldDecoder {
669 kind: VariablePackedStructFieldKind::Fixed { decompressor, .. },
670 },
671 FieldAccumulator::Fixed { builder, .. },
672 ) => {
673 let DataBlock::FixedWidth(block) = builder.finish() else {
674 panic!("Expected fixed-width datablock from builder");
675 };
676 let decoded = decompressor.decompress(block, num_values)?;
677 children.push(decoded);
678 }
679 (
680 VariablePackedStructFieldDecoder {
681 kind:
682 VariablePackedStructFieldKind::Variable {
683 bits_per_length,
684 decompressor,
685 },
686 },
687 FieldAccumulator::Variable32 { builder },
688 ) => {
689 let DataBlock::VariableWidth(mut block) = builder.finish() else {
690 panic!("Expected variable-width datablock from builder");
691 };
692 debug_assert_eq!(block.bits_per_offset, 32);
693 block.bits_per_offset = (*bits_per_length) as u8;
694 let decoded = decompressor.decompress(block)?;
695 children.push(decoded);
696 }
697 (
698 VariablePackedStructFieldDecoder {
699 kind:
700 VariablePackedStructFieldKind::Variable {
701 bits_per_length,
702 decompressor,
703 },
704 },
705 FieldAccumulator::Variable64 { builder },
706 ) => {
707 let DataBlock::VariableWidth(mut block) = builder.finish() else {
708 panic!("Expected variable-width datablock from builder");
709 };
710 debug_assert_eq!(block.bits_per_offset, 64);
711 block.bits_per_offset = (*bits_per_length) as u8;
712 let decoded = decompressor.decompress(block)?;
713 children.push(decoded);
714 }
715 _ => {
716 return Err(Error::invalid_input(
717 "Packed struct accumulator mismatch during finalize",
718 location!(),
719 ))
720 }
721 }
722 }
723
724 Ok(DataBlock::Struct(StructDataBlock {
725 children,
726 block_info: BlockInfo::new(),
727 validity: None,
728 }))
729 }
730}
731
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        compression::CompressionStrategy,
        compression::{DefaultCompressionStrategy, DefaultDecompressionStrategy},
        statistics::ComputeStat,
        version::LanceFileVersion,
    };
    use arrow_array::{
        Array, ArrayRef, BinaryArray, Int32Array, Int64Array, LargeStringArray, StringArray,
    };
    use arrow_schema::{DataType, Field as ArrowField, Fields};
    use std::sync::Arc;

    /// Converts an `Int64Array` into a fixed-width data block.
    fn fixed_block_from_array(array: Int64Array) -> FixedWidthDataBlock {
        let num_values = array.len() as u64;
        let DataBlock::FixedWidth(block) =
            DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values)
        else {
            panic!("Expected fixed-width data block");
        };
        block
    }

    /// Converts an `Int32Array` into a fixed-width data block.
    fn fixed_i32_block_from_array(array: Int32Array) -> FixedWidthDataBlock {
        let num_values = array.len() as u64;
        let DataBlock::FixedWidth(block) =
            DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values)
        else {
            panic!("Expected fixed-width data block");
        };
        block
    }

    /// Converts a `StringArray` into a variable-width data block.
    fn variable_block_from_string_array(array: StringArray) -> VariableWidthBlock {
        let num_values = array.len() as u64;
        let DataBlock::VariableWidth(block) =
            DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values)
        else {
            panic!("Expected variable-width block");
        };
        block
    }

    /// Converts a `LargeStringArray` into a variable-width data block.
    fn variable_block_from_large_string_array(array: LargeStringArray) -> VariableWidthBlock {
        let num_values = array.len() as u64;
        let DataBlock::VariableWidth(block) =
            DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values)
        else {
            panic!("Expected variable-width block");
        };
        block
    }

    /// Converts a `BinaryArray` into a variable-width data block.
    fn variable_block_from_binary_array(array: BinaryArray) -> VariableWidthBlock {
        let num_values = array.len() as u64;
        let DataBlock::VariableWidth(block) =
            DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values)
        else {
            panic!("Expected variable-width block");
        };
        block
    }

    /// Wraps child blocks in a struct data block with no validity.
    fn struct_block_of(children: Vec<DataBlock>) -> DataBlock {
        DataBlock::Struct(StructDataBlock {
            children,
            block_info: BlockInfo::new(),
            validity: None,
        })
    }

    /// Builds the lance field for a non-nullable struct named "item".
    fn item_struct_field(children: Vec<ArrowField>) -> Result<Field> {
        let arrow_fields: Fields = children.into();
        let arrow_struct = ArrowField::new("item", DataType::Struct(arrow_fields), false);
        Field::try_from(&arrow_struct)
    }

    /// Compresses `data_block` with the V2_2 default strategy, decompresses
    /// the result with the default decompression strategy, and returns the
    /// decoded struct block.
    fn round_trip_v22(struct_field: &Field, data_block: DataBlock) -> Result<StructDataBlock> {
        let compression_strategy =
            DefaultCompressionStrategy::new().with_version(LanceFileVersion::V2_2);
        let compressor = compression_strategy.create_per_value(struct_field, &data_block)?;
        let (compressed, encoding) = compressor.compress(data_block)?;

        let PerValueDataBlock::Variable(zipped) = compressed else {
            panic!("expected variable-width packed struct output");
        };

        let decompression_strategy = DefaultDecompressionStrategy::default();
        let decompressor =
            crate::compression::DecompressionStrategy::create_variable_per_value_decompressor(
                &decompression_strategy,
                &encoding,
            )?;

        let DataBlock::Struct(decoded_struct) = decompressor.decompress(zipped)? else {
            panic!("expected struct datablock after decode");
        };
        Ok(decoded_struct)
    }

    #[test]
    fn variable_packed_struct_round_trip() -> Result<()> {
        let struct_field = item_struct_field(vec![
            ArrowField::new("id", DataType::UInt32, false),
            ArrowField::new("name", DataType::Utf8, true),
        ])?;

        // Fixed-width child: three u32 ids.
        let ids = vec![1_u32, 2, 42];
        let id_bytes: Vec<u8> = ids.iter().flat_map(|value| value.to_le_bytes()).collect();
        let mut id_block = FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(ids),
            bits_per_value: 32,
            num_values: 3,
            block_info: BlockInfo::new(),
        };
        id_block.compute_stat();

        // Variable-width child: "a", "bcz" and an empty string.
        let name_offsets = vec![0_i32, 1, 4, 4];
        let name_bytes = b"abcz".to_vec();
        let mut name_block = VariableWidthBlock {
            data: LanceBuffer::from(name_bytes.clone()),
            bits_per_offset: 32,
            offsets: LanceBuffer::reinterpret_vec(name_offsets.clone()),
            num_values: 3,
            block_info: BlockInfo::new(),
        };
        name_block.compute_stat();

        let data_block = struct_block_of(vec![
            DataBlock::FixedWidth(id_block),
            DataBlock::VariableWidth(name_block),
        ]);
        let decoded_struct = round_trip_v22(&struct_field, data_block)?;

        let decoded_id = decoded_struct.children[0].as_fixed_width_ref().unwrap();
        assert_eq!(decoded_id.bits_per_value, 32);
        assert_eq!(decoded_id.data.as_ref(), id_bytes.as_slice());

        let decoded_name = decoded_struct.children[1].as_variable_width_ref().unwrap();
        assert_eq!(decoded_name.bits_per_offset, 32);
        let decoded_offsets = decoded_name.offsets.borrow_to_typed_slice::<i32>();
        assert_eq!(decoded_offsets.as_ref(), name_offsets.as_slice());
        assert_eq!(decoded_name.data.as_ref(), name_bytes.as_slice());

        Ok(())
    }

    #[test]
    fn variable_packed_struct_large_utf8_round_trip() -> Result<()> {
        let struct_field = item_struct_field(vec![
            ArrowField::new("value", DataType::Int64, false),
            ArrowField::new("text", DataType::LargeUtf8, false),
        ])?;

        let id_block = fixed_block_from_array(Int64Array::from(vec![10, 20, 30, 40]));
        let payload_block = variable_block_from_large_string_array(LargeStringArray::from(vec![
            "alpha",
            "a considerably longer payload for testing",
            "mid",
            "z",
        ]));

        let data_block = struct_block_of(vec![
            DataBlock::FixedWidth(id_block.clone()),
            DataBlock::VariableWidth(payload_block.clone()),
        ]);
        let decoded_struct = round_trip_v22(&struct_field, data_block)?;

        let decoded_id = decoded_struct.children[0].as_fixed_width_ref().unwrap();
        assert_eq!(decoded_id.bits_per_value, 64);
        assert_eq!(decoded_id.data.as_ref(), id_block.data.as_ref());

        let decoded_payload = decoded_struct.children[1].as_variable_width_ref().unwrap();
        assert_eq!(decoded_payload.bits_per_offset, 64);
        let actual_offsets = decoded_payload.offsets.borrow_to_typed_slice::<i64>();
        let expected_offsets = payload_block.offsets.borrow_to_typed_slice::<i64>();
        assert_eq!(actual_offsets.as_ref(), expected_offsets.as_ref());
        assert_eq!(decoded_payload.data.as_ref(), payload_block.data.as_ref());

        Ok(())
    }

    #[test]
    fn variable_packed_struct_multi_variable_round_trip() -> Result<()> {
        let struct_field = item_struct_field(vec![
            ArrowField::new("category", DataType::Utf8, false),
            ArrowField::new("payload", DataType::Binary, false),
            ArrowField::new("count", DataType::Int32, false),
        ])?;

        let category_block = variable_block_from_string_array(StringArray::from(vec![
            "red", "blue", "green", "red",
        ]));
        let payload_values: Vec<Vec<u8>> =
            vec![vec![0x01, 0x02], vec![], vec![0x05, 0x06, 0x07], vec![0xff]];
        let payload_block = variable_block_from_binary_array(BinaryArray::from_iter_values(
            payload_values.iter().map(|v| v.as_slice()),
        ));
        let count_block = fixed_i32_block_from_array(Int32Array::from(vec![1, 2, 3, 4]));

        let data_block = struct_block_of(vec![
            DataBlock::VariableWidth(category_block.clone()),
            DataBlock::VariableWidth(payload_block.clone()),
            DataBlock::FixedWidth(count_block.clone()),
        ]);
        let decoded_struct = round_trip_v22(&struct_field, data_block)?;

        let decoded_category = decoded_struct.children[0].as_variable_width_ref().unwrap();
        assert_eq!(decoded_category.bits_per_offset, 32);
        let actual_category_offsets = decoded_category.offsets.borrow_to_typed_slice::<i32>();
        let expected_category_offsets = category_block.offsets.borrow_to_typed_slice::<i32>();
        assert_eq!(
            actual_category_offsets.as_ref(),
            expected_category_offsets.as_ref()
        );
        assert_eq!(decoded_category.data.as_ref(), category_block.data.as_ref());

        let decoded_payload = decoded_struct.children[1].as_variable_width_ref().unwrap();
        assert_eq!(decoded_payload.bits_per_offset, 32);
        let actual_payload_offsets = decoded_payload.offsets.borrow_to_typed_slice::<i32>();
        let expected_payload_offsets = payload_block.offsets.borrow_to_typed_slice::<i32>();
        assert_eq!(
            actual_payload_offsets.as_ref(),
            expected_payload_offsets.as_ref()
        );
        assert_eq!(decoded_payload.data.as_ref(), payload_block.data.as_ref());

        let decoded_count = decoded_struct.children[2].as_fixed_width_ref().unwrap();
        assert_eq!(decoded_count.bits_per_value, 32);
        assert_eq!(decoded_count.data.as_ref(), count_block.data.as_ref());

        Ok(())
    }

    #[test]
    fn variable_packed_struct_requires_v22() {
        let struct_field = item_struct_field(vec![
            ArrowField::new("value", DataType::Int64, false),
            ArrowField::new("text", DataType::Utf8, false),
        ])
        .unwrap();

        let value_block = fixed_block_from_array(Int64Array::from(vec![1, 2, 3]));
        let text_block =
            variable_block_from_string_array(StringArray::from(vec!["a", "bb", "ccc"]));
        let data_block = struct_block_of(vec![
            DataBlock::FixedWidth(value_block),
            DataBlock::VariableWidth(text_block),
        ]);

        // Creating the compressor under V2_1 must fail.
        let compression_strategy =
            DefaultCompressionStrategy::new().with_version(LanceFileVersion::V2_1);
        let result =
            CompressionStrategy::create_per_value(&compression_strategy, &struct_field, &data_block);

        assert!(matches!(result, Err(Error::NotSupported { .. })));
    }
}