use arrow_array::OffsetSizeTrait;
use bytemuck::cast_slice;
use byteorder::{ByteOrder, LittleEndian};
use core::panic;
use snafu::location;

use crate::compression::{
    BlockCompressor, BlockDecompressor, MiniBlockDecompressor, VariablePerValueDecompressor,
};

use crate::buffer::LanceBuffer;
use crate::data::{BlockInfo, DataBlock, VariableWidthBlock};
use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
use crate::encodings::logical::primitive::miniblock::{
    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
};
use crate::format::pb21::compressive_encoding::Compression;
use crate::format::pb21::CompressiveEncoding;
use crate::format::{pb21, ProtobufUtils21};

use lance_core::utils::bit::pad_bytes_to;
use lance_core::{Error, Result};

/// Compresses variable-width (binary / string) data into miniblock chunks.
#[derive(Debug, Default)]
pub struct BinaryMiniBlockEncoder {}

// Target size, in bytes, of each miniblock chunk (offsets plus data).
const AIM_MINICHUNK_SIZE: i64 = 4 * 1024;
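// Splits variable-width values into chunks of roughly `AIM_MINICHUNK_SIZE` bytes. Each chunk
// is self-contained: it stores its own `num_values + 1` offsets, rebased so that the first
// data byte sits immediately after the offsets, followed by the value bytes and alignment
// padding.
//
// A small worked example (a sketch, assuming 32-bit offsets and 4-byte alignment): the values
// "abc" and "de" have original offsets [0, 3, 5]. The offsets section is 3 * 4 = 12 bytes, so
// the rebased offsets become [12, 15, 17] and the chunk is laid out as
//
//   [12, 15, 17] ++ b"abcde" ++ 3 padding bytes   (17 bytes, padded up to 20)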
fn chunk_offsets<N: OffsetSizeTrait>(
    offsets: &[N],
    data: &[u8],
    alignment: usize,
) -> (Vec<LanceBuffer>, Vec<MiniBlockChunk>) {
    #[derive(Debug)]
    struct ChunkInfo {
        // Index (into `offsets`) of the first offset in this chunk.
        chunk_start_offset_in_orig_idx: usize,
        // Index (into `offsets`) of the last offset in this chunk.
        chunk_last_offset_in_orig_idx: usize,
        // Where this chunk's data bytes begin, relative to the start of the chunk.
        bytes_start_offset: usize,
        // The chunk size after padding to `alignment`.
        padded_chunk_size: usize,
    }

    let byte_width: usize = N::get_byte_width();
    let mut chunks_info = vec![];
    let mut chunks = vec![];
    let mut last_offset_in_orig_idx = 0;
    loop {
        let this_last_offset_in_orig_idx = search_next_offset_idx(offsets, last_offset_in_orig_idx);

        let num_values_in_this_chunk = this_last_offset_in_orig_idx - last_offset_in_orig_idx;
        let chunk_bytes = offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx];
        let this_chunk_size =
            (num_values_in_this_chunk + 1) * byte_width + chunk_bytes.to_usize().unwrap();

        let padded_chunk_size = this_chunk_size.next_multiple_of(alignment);
        debug_assert!(padded_chunk_size > 0);

        let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * byte_width;
        chunks_info.push(ChunkInfo {
            chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
            chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
            bytes_start_offset: this_chunk_bytes_start_offset,
            padded_chunk_size,
        });
        chunks.push(MiniBlockChunk {
            // The final chunk records 0, meaning "whatever values remain".
            log_num_values: if this_last_offset_in_orig_idx == offsets.len() - 1 {
                0
            } else {
                num_values_in_this_chunk.trailing_zeros() as u8
            },
            buffer_sizes: vec![padded_chunk_size as u16],
        });
        if this_last_offset_in_orig_idx == offsets.len() - 1 {
            break;
        }
        last_offset_in_orig_idx = this_last_offset_in_orig_idx;
    }

    let output_total_bytes = chunks_info
        .iter()
        .map(|chunk_info| chunk_info.padded_chunk_size)
        .sum::<usize>();

    let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

    for chunk in chunks_info {
        // Rebase the offsets so they point past this chunk's own offsets section.
        let this_chunk_offsets: Vec<N> = offsets
            [chunk.chunk_start_offset_in_orig_idx..=chunk.chunk_last_offset_in_orig_idx]
            .iter()
            .map(|offset| {
                *offset - offsets[chunk.chunk_start_offset_in_orig_idx]
                    + N::from_usize(chunk.bytes_start_offset).unwrap()
            })
            .collect();

        let this_chunk_offsets = LanceBuffer::reinterpret_vec(this_chunk_offsets);
        output.extend_from_slice(&this_chunk_offsets);

        let start_in_orig = offsets[chunk.chunk_start_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        let end_in_orig = offsets[chunk.chunk_last_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        output.extend_from_slice(&data[start_in_orig..end_in_orig]);

        // Arbitrary (but recognizable) padding byte.
        const PAD_BYTE: u8 = 72;
        let pad_len = pad_bytes_to(output.len(), alignment);

        if pad_len > 0_usize {
            output.extend(std::iter::repeat_n(PAD_BYTE, pad_len));
        }
    }
    (vec![LanceBuffer::reinterpret_vec(output)], chunks)
}
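// Returns the index of the offset that should end the next chunk, starting from
// `last_offset_idx`. The candidate value count doubles each iteration (2, 4, 8, ...) as long
// as the resulting chunk (offsets plus data bytes) stays within `AIM_MINICHUNK_SIZE`; keeping
// the count a power of two is what lets `chunk_offsets` record it as `log_num_values`. When
// fewer values remain than the next doubling, the remainder becomes the final chunk if it fits.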
fn search_next_offset_idx<N: OffsetSizeTrait>(offsets: &[N], last_offset_idx: usize) -> usize {
    let mut num_values = 1;
    let mut new_num_values = num_values * 2;
    loop {
        if last_offset_idx + new_num_values >= offsets.len() {
            // Not enough values remain for another doubling; take the rest if it fits.
            let existing_bytes = offsets[offsets.len() - 1] - offsets[last_offset_idx];
            let new_size = existing_bytes
                + N::from_usize((offsets.len() - last_offset_idx) * N::get_byte_width()).unwrap();
            if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE {
                return offsets.len() - 1;
            } else {
                return last_offset_idx + num_values;
            }
        }
        let existing_bytes = offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx];
        let new_size =
            existing_bytes + N::from_usize((new_num_values + 1) * N::get_byte_width()).unwrap();
        if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE {
            num_values = new_num_values;
            new_num_values *= 2;
        } else {
            break;
        }
    }
    last_offset_idx + new_num_values
}

impl BinaryMiniBlockEncoder {
    // Chunks the block's values, using 4-byte alignment for 32-bit offsets and
    // 8-byte alignment for 64-bit offsets.
    fn chunk_data(&self, data: VariableWidthBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
        match data.bits_per_offset {
            32 => {
                let offsets = data.offsets.borrow_to_typed_slice::<i32>();
                let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 4);
                (
                    MiniBlockCompressed {
                        data: buffers,
                        chunks,
                        num_values: data.num_values,
                    },
                    ProtobufUtils21::variable(ProtobufUtils21::flat(32, None), None),
                )
            }
            64 => {
                let offsets = data.offsets.borrow_to_typed_slice::<i64>();
                let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 8);
                (
                    MiniBlockCompressed {
                        data: buffers,
                        chunks,
                        num_values: data.num_values,
                    },
                    ProtobufUtils21::variable(ProtobufUtils21::flat(64, None), None),
                )
            }
            _ => panic!("Unsupported bits_per_offset={}", data.bits_per_offset),
        }
    }
}

impl MiniBlockCompressor for BinaryMiniBlockEncoder {
    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
        match data {
            DataBlock::VariableWidth(variable_width) => Ok(self.chunk_data(variable_width)),
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
                    data.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }
}
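// The encoding returned above is, roughly, `Variable { offsets: Flat { bits_per_value: 32 | 64 } }`;
// `BinaryMiniBlockDecompressor::from_variable` below reads that same shape back to recover the
// offset width.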
#[derive(Debug)]
pub struct BinaryMiniBlockDecompressor {
    bits_per_offset: u8,
}

impl BinaryMiniBlockDecompressor {
    pub fn new(bits_per_offset: u8) -> Self {
        assert!(bits_per_offset == 32 || bits_per_offset == 64);
        Self { bits_per_offset }
    }

    pub fn from_variable(variable: &pb21::Variable) -> Self {
        if let Compression::Flat(flat) = variable
            .offsets
            .as_ref()
            .unwrap()
            .compression
            .as_ref()
            .unwrap()
        {
            Self {
                bits_per_offset: flat.bits_per_value as u8,
            }
        } else {
            panic!("Unsupported offsets compression: {:?}", variable.offsets);
        }
    }
}

impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        // Each chunk is stored in a single buffer.
        assert_eq!(data.len(), 1);
        let data = data.into_iter().next().unwrap();

        if self.bits_per_offset == 64 {
            // A chunk stores at least two u64 offsets (num_values + 1, with num_values >= 1).
            assert!(data.len() >= 16);

            let offsets_buffer = data.borrow_to_typed_slice::<u64>();
            let offsets = offsets_buffer.as_ref();

            // Rebase the chunk-local offsets so the returned block's offsets start at zero.
            let result_offsets = offsets[0..(num_values + 1) as usize]
                .iter()
                .map(|offset| offset - offsets[0])
                .collect::<Vec<u64>>();

            Ok(DataBlock::VariableWidth(VariableWidthBlock {
                data: LanceBuffer::from(
                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
                ),
                offsets: LanceBuffer::reinterpret_vec(result_offsets),
                bits_per_offset: 64,
                num_values,
                block_info: BlockInfo::new(),
            }))
        } else {
            // A chunk stores at least two u32 offsets.
            assert!(data.len() >= 8);

            let offsets_buffer = data.borrow_to_typed_slice::<u32>();
            let offsets = offsets_buffer.as_ref();

            let result_offsets = offsets[0..(num_values + 1) as usize]
                .iter()
                .map(|offset| offset - offsets[0])
                .collect::<Vec<u32>>();

            Ok(DataBlock::VariableWidth(VariableWidthBlock {
                data: LanceBuffer::from(
                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
                ),
                offsets: LanceBuffer::reinterpret_vec(result_offsets),
                bits_per_offset: 32,
                num_values,
                block_info: BlockInfo::new(),
            }))
        }
    }
}

/// Encoder for variable-width data used by both the block (all-in-one-buffer) path and the
/// per-value (fullzip) path.
#[derive(Debug, Default)]
pub struct VariableEncoder {}
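// The `BlockCompressor` impl below serializes the whole block into one buffer. A sketch of the
// layout, as inferred from the code (all integers little-endian):
//
//   [bits_per_offset: u8, 32 or 64]
//   [num_values:         u32 or u64]
//   [bytes_start_offset: u32 or u64]   (absolute offset of the data bytes in this buffer)
//   [offsets: the block's offsets buffer, 4 or 8 bytes each]
//   [data:    the value bytes]
//
// `BinaryBlockDecompressor` parses the header back in the same order.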
impl BlockCompressor for VariableEncoder {
    fn compress(&self, mut data: DataBlock) -> Result<LanceBuffer> {
        match data {
            DataBlock::VariableWidth(ref mut variable_width_data) => {
                let num_values: u64 = variable_width_data.num_values;
                match variable_width_data.bits_per_offset {
                    32 => {
                        let num_values: u32 = num_values
                            .try_into()
                            .expect("The maximum number of values BinaryBlockEncoder can work with is u32::MAX");

                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u32>();
                        let offsets = offsets.as_ref();
                        // 1 byte for bits_per_offset, 4 for num_values, 4 for
                        // bytes_start_offset, then the offsets themselves.
                        let bytes_start_offset = 1 + 4 + 4 + std::mem::size_of_val(offsets) as u32;

                        let output_total_bytes =
                            bytes_start_offset as usize + variable_width_data.data.len();
                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

                        // Offset width marker.
                        output.push(32_u8);

                        // Number of values.
                        output.extend_from_slice(&(num_values).to_le_bytes());

                        // Where the data bytes begin.
                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());

                        // Offsets, then the data bytes themselves.
                        output.extend_from_slice(cast_slice(offsets));

                        output.extend_from_slice(&variable_width_data.data);
                        Ok(LanceBuffer::from(output))
                    }
                    64 => {
                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u64>();
                        let offsets = offsets.as_ref();
                        // 1 byte for bits_per_offset, 8 for num_values, 8 for
                        // bytes_start_offset, then the offsets themselves.
                        let bytes_start_offset = 1 + 8 + 8 + std::mem::size_of_val(offsets) as u64;

                        let output_total_bytes =
                            bytes_start_offset as usize + variable_width_data.data.len();
                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

                        // Offset width marker.
                        output.push(64_u8);

                        // Number of values.
                        output.extend_from_slice(&(num_values).to_le_bytes());

                        // Where the data bytes begin.
                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());

                        // Offsets, then the data bytes themselves.
                        output.extend_from_slice(cast_slice(offsets));

                        output.extend_from_slice(&variable_width_data.data);
                        Ok(LanceBuffer::from(output))
                    }
                    _ => {
                        panic!("BinaryBlockEncoder does not work with {} bits per offset VariableWidth DataBlock.",
                            variable_width_data.bits_per_offset);
                    }
                }
            }
            _ => {
                panic!("BinaryBlockEncoder can only work with Variable Width DataBlock.");
            }
        }
    }
}

impl PerValueCompressor for VariableEncoder {
    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
        let DataBlock::VariableWidth(variable) = data else {
            panic!("BinaryPerValueCompressor can only work with Variable Width DataBlock.");
        };

        let encoding = ProtobufUtils21::variable(
            ProtobufUtils21::flat(variable.bits_per_offset as u64, None),
            None,
        );
        Ok((PerValueDataBlock::Variable(variable), encoding))
    }
}

#[derive(Debug, Default)]
pub struct VariableDecoder {}

impl VariablePerValueDecompressor for VariableDecoder {
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
        Ok(DataBlock::VariableWidth(data))
    }
}

#[derive(Debug, Default)]
pub struct BinaryBlockDecompressor {}

impl BlockDecompressor for BinaryBlockDecompressor {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        // The first byte records the offset width.
        let bits_per_offset = data[0];
        match bits_per_offset {
            32 => {
                // The stored value count is kept mostly as a sanity check.
                let stored_num_values = LittleEndian::read_u32(&data[1..5]);
                debug_assert_eq!(num_values, stored_num_values as u64);

                // Where the data bytes begin (header plus offsets).
                let bytes_start_offset = LittleEndian::read_u32(&data[5..9]);

                // The offsets sit between the header and the data bytes.
                let offsets = data.slice_with_length(9, bytes_start_offset as usize - 9);

                let data = data.slice_with_length(
                    bytes_start_offset as usize,
                    data.len() - bytes_start_offset as usize,
                );

                Ok(DataBlock::VariableWidth(VariableWidthBlock {
                    data,
                    offsets,
                    bits_per_offset: 32,
                    num_values,
                    block_info: BlockInfo::new(),
                }))
            }
            64 => {
                let stored_num_values = LittleEndian::read_u64(&data[1..9]);
                debug_assert_eq!(num_values, stored_num_values);

                let bytes_start_offset = LittleEndian::read_u64(&data[9..17]);

                let offsets = data.slice_with_length(17, bytes_start_offset as usize - 17);

                let data = data.slice_with_length(
                    bytes_start_offset as usize,
                    data.len() - bytes_start_offset as usize,
                );

                Ok(DataBlock::VariableWidth(VariableWidthBlock {
                    data,
                    offsets,
                    bits_per_offset: 64,
                    num_values,
                    block_info: BlockInfo::new(),
                }))
            }
            _ => panic!("Unsupported bits_per_offset={}", bits_per_offset),
        }
    }
}
#[cfg(test)]
pub mod tests {
    use arrow_array::{
        builder::{LargeStringBuilder, StringBuilder},
        ArrayRef, StringArray,
    };
    use arrow_schema::{DataType, Field};

    use crate::{
        constants::{
            COMPRESSION_META_KEY, STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
            STRUCTURAL_ENCODING_MINIBLOCK,
        },
        testing::check_specific_random,
    };
    use rstest::rstest;
    use std::{collections::HashMap, sync::Arc, vec};

    use crate::{
        testing::{
            check_basic_random, check_round_trip_encoding_of_data, FnArrayGeneratorProvider,
            TestCases,
        },
        version::LanceFileVersion,
    };

    #[test_log::test(tokio::test)]
    async fn test_utf8_binary() {
        let field = Field::new("", DataType::Utf8, false);
        check_specific_random(
            field,
            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
        )
        .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let field = Field::new("", data_type, false).with_metadata(field_metadata);
        check_basic_random(field).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary_fsst(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Binary, DataType::Utf8)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
        check_specific_random(field, test_cases).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_fsst_large_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::LargeBinary, DataType::LargeUtf8)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        check_specific_random(
            field,
            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_large_binary() {
        let field = Field::new("", DataType::LargeBinary, true);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_large_utf8() {
        let field = Field::new("", DataType::LargeUtf8, true);
        check_basic_random(field).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_small_strings(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        use crate::testing::check_basic_generated;

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        let field = Field::new("", DataType::Utf8, true).with_metadata(field_metadata);
        check_basic_generated(
            field,
            Box::new(FnArrayGeneratorProvider::new(move || {
                lance_datagen::array::utf8_prefix_plus_counter("user_", false)
            })),
        )
        .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
        let string_array = arrow_cast::cast(&string_array, &data_type).unwrap();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![0, 1, 3, 4]);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            field_metadata,
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_sliced_utf8() {
        let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]);
        let string_array = string_array.slice(1, 3);

        let test_cases = TestCases::default()
            .with_range(0..1)
            .with_range(0..2)
            .with_range(1..2);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_bigger_than_max_page_size() {
        // A single 32 MiB value, bigger than the 1 MiB max page size set below.
        let big_string = String::from_iter((0..(32 * 1024 * 1024)).map(|_| '0'));
        let string_array = StringArray::from(vec![
            Some(big_string),
            Some("abc".to_string()),
            None,
            None,
            Some("xyz".to_string()),
        ]);

        let test_cases = TestCases::default().with_max_page_size(1024 * 1024);

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;

        // Ninety ~1 MB values (roughly 90 MB total) in a single array.
        let big_string = String::from_iter((0..(1000 * 1000)).map(|_| '0'));
        let string_array = StringArray::from_iter_values((0..90).map(|_| big_string.clone()));

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &TestCases::default(),
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_empty_strings() {
        // A non-empty value, an empty value, and a null, in a few different orders.
        let values = [Some("abc"), Some(""), None];
        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
            let mut string_builder = StringBuilder::new();
            for idx in order {
                string_builder.append_option(values[idx]);
            }
            let string_array = Arc::new(string_builder.finish());
            let test_cases = TestCases::default()
                .with_indices(vec![1])
                .with_indices(vec![0])
                .with_indices(vec![2])
                .with_indices(vec![0, 1]);
            check_round_trip_encoding_of_data(
                vec![string_array.clone()],
                &test_cases,
                HashMap::new(),
            )
            .await;
            let test_cases = test_cases.with_batch_size(1);
            check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new())
                .await;
        }

        // Only empty strings and nulls.
        let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    #[ignore]
    async fn test_jumbo_string() {
        // 5000 values of 1 MiB each (roughly 5 GiB of string data).
        let mut string_builder = LargeStringBuilder::new();
        let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
        for _ in 0..5000 {
            string_builder.append_option(Some(&giant_string));
        }
        let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
        let arrs = vec![giant_array];

        // Skip per-value validation for this very large dataset.
        let test_cases = TestCases::default().without_validation();
        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
    }

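    // A minimal, self-contained round-trip sketch: chunk a tiny 32-bit-offset block with
    // `BinaryMiniBlockEncoder` and feed the single resulting chunk buffer straight back into
    // `BinaryMiniBlockDecompressor`. The expected sizes follow the chunk layout described
    // above `chunk_offsets`.
    #[test]
    fn test_binary_miniblock_single_chunk_round_trip() {
        use super::{BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder};
        use crate::buffer::LanceBuffer;
        use crate::compression::{MiniBlockCompressor, MiniBlockDecompressor};
        use crate::data::{BlockInfo, DataBlock, VariableWidthBlock};

        // Two values, "abc" and "de", with 32-bit offsets [0, 3, 5].
        let block = DataBlock::VariableWidth(VariableWidthBlock {
            data: LanceBuffer::from(b"abcde".to_vec()),
            offsets: LanceBuffer::reinterpret_vec(vec![0i32, 3, 5]),
            bits_per_offset: 32,
            num_values: 2,
            block_info: BlockInfo::new(),
        });

        let encoder = BinaryMiniBlockEncoder::default();
        let (compressed, _encoding) = MiniBlockCompressor::compress(&encoder, block).unwrap();
        assert_eq!(compressed.num_values, 2);
        assert_eq!(compressed.chunks.len(), 1);
        // 3 offsets * 4 bytes + 5 data bytes = 17, padded to 4-byte alignment = 20.
        assert_eq!(compressed.chunks[0].buffer_sizes, vec![20]);

        // With a single chunk, the compressed data is exactly that chunk's buffer.
        let decompressor = BinaryMiniBlockDecompressor::new(32);
        let decoded = decompressor.decompress(compressed.data, 2).unwrap();
        let DataBlock::VariableWidth(decoded) = decoded else {
            panic!("Expected a VariableWidth block");
        };
        assert_eq!(decoded.num_values, 2);
        assert_eq!(decoded.bits_per_offset, 32);
        assert_eq!(decoded.data.as_ref(), b"abcde");
    }
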
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary_dictionary_encoding(
        #[values(true, false)] with_nulls: bool,
        #[values(100, 500, 35000)] dict_size: u32,
    ) {
        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
        let strings = (0..dict_size)
            .map(|i| i.to_string())
            .collect::<Vec<String>>();

        let repeated_strings: Vec<_> = strings
            .iter()
            .cycle()
            .take(70000)
            .enumerate()
            .map(|(i, s)| {
                if with_nulls && i % 7 == 0 {
                    None
                } else {
                    Some(s.clone())
                }
            })
            .collect();
        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_binary_encoding_verification() {
        use lance_datagen::{ByteCount, RowCount};

        let test_cases = TestCases::default()
            .with_expected_encoding("variable")
            .with_min_file_version(LanceFileVersion::V2_1);

        // Small random strings (10 bytes each).
        let arr_small = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(10), false))
            .into_batch_rows(RowCount::from(1000))
            .unwrap()
            .column(0)
            .clone();
        check_round_trip_encoding_of_data(vec![arr_small], &test_cases, HashMap::new()).await;

        // Larger strings with compression explicitly disabled via field metadata.
        let metadata_explicit =
            HashMap::from([("lance-encoding:compression".to_string(), "none".to_string())]);
        let arr_large = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(50), false))
            .into_batch_rows(RowCount::from(2000))
            .unwrap()
            .column(0)
            .clone();
        check_round_trip_encoding_of_data(vec![arr_large], &test_cases, metadata_explicit).await;
    }

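    // A minimal, self-contained round-trip sketch for the all-in-one-buffer path, pairing
    // `VariableEncoder` (as a `BlockCompressor`) with `BinaryBlockDecompressor`. The expected
    // length follows the header layout sketched above the `BlockCompressor` impl.
    #[test]
    fn test_binary_block_round_trip() {
        use super::{BinaryBlockDecompressor, VariableEncoder};
        use crate::buffer::LanceBuffer;
        use crate::compression::{BlockCompressor, BlockDecompressor};
        use crate::data::{BlockInfo, DataBlock, VariableWidthBlock};

        // Two values, "abc" and "de", with 32-bit offsets [0, 3, 5].
        let block = DataBlock::VariableWidth(VariableWidthBlock {
            data: LanceBuffer::from(b"abcde".to_vec()),
            offsets: LanceBuffer::reinterpret_vec(vec![0u32, 3, 5]),
            bits_per_offset: 32,
            num_values: 2,
            block_info: BlockInfo::new(),
        });

        // Fully qualified because `VariableEncoder` also implements `PerValueCompressor`.
        let compressed = BlockCompressor::compress(&VariableEncoder::default(), block).unwrap();
        // Header: 1 (offset width) + 4 (num_values) + 4 (bytes_start_offset) + 12 (offsets),
        // then 5 data bytes.
        assert_eq!(compressed.len(), 21 + 5);

        let decompressor = BinaryBlockDecompressor::default();
        let decoded = BlockDecompressor::decompress(&decompressor, compressed, 2).unwrap();
        let DataBlock::VariableWidth(decoded) = decoded else {
            panic!("Expected a VariableWidth block");
        };
        assert_eq!(decoded.num_values, 2);
        assert_eq!(decoded.bits_per_offset, 32);
        assert_eq!(decoded.data.as_ref(), b"abcde");
    }
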
    #[test]
    fn test_binary_miniblock_with_misaligned_buffer() {
        use super::BinaryMiniBlockDecompressor;
        use crate::buffer::LanceBuffer;
        use crate::compression::MiniBlockDecompressor;
        use crate::data::DataBlock;

        // 32-bit offsets
        {
            let decompressor = BinaryMiniBlockDecompressor {
                bits_per_offset: 32,
            };

            // Build a chunk by hand: three u32 offsets (12 bytes) followed by the value
            // bytes, so the values are "ABC" at [12..15] and "XYZ" (plus two padding
            // bytes) at [15..20].
            let mut test_data = Vec::new();
            test_data.extend_from_slice(&12u32.to_le_bytes()); // offset[0]
            test_data.extend_from_slice(&15u32.to_le_bytes()); // offset[1]
            test_data.extend_from_slice(&20u32.to_le_bytes()); // offset[2]
            test_data.extend_from_slice(b"ABCXYZ"); // value bytes
            test_data.extend_from_slice(&[0, 0]); // padding

            // Prepend a byte and slice it off so the chunk is not 4-byte aligned.
            let mut padded = Vec::with_capacity(test_data.len() + 1);
            padded.push(0xFF);
            padded.extend_from_slice(&test_data);

            let bytes = bytes::Bytes::from(padded);
            let misaligned = bytes.slice(1..);
            let buffer = LanceBuffer::from_bytes(misaligned, 1);

            let ptr = buffer.as_ref().as_ptr();
            assert_ne!(
                ptr.align_offset(4),
                0,
                "Test setup: buffer should be misaligned for u32"
            );

            // Decompress two values; this should cope with the misaligned input.
            let result = decompressor.decompress(vec![buffer], 2);
            assert!(
                result.is_ok(),
                "Decompression should succeed with misaligned buffer"
            );

            // Check the decompressed contents.
            if let Ok(DataBlock::VariableWidth(block)) = result {
                assert_eq!(block.num_values, 2);
                assert_eq!(&block.data.as_ref()[..6], b"ABCXYZ");
            } else {
                panic!("Expected VariableWidth block");
            }
        }

        // 64-bit offsets
        {
            let decompressor = BinaryMiniBlockDecompressor {
                bits_per_offset: 64,
            };

            // Three u64 offsets (24 bytes) followed by "Hello" at [24..29] and
            // "World" (plus six padding bytes) at [29..40].
            let mut test_data = Vec::new();
            test_data.extend_from_slice(&24u64.to_le_bytes()); // offset[0]
            test_data.extend_from_slice(&29u64.to_le_bytes()); // offset[1]
            test_data.extend_from_slice(&40u64.to_le_bytes()); // offset[2]
            test_data.extend_from_slice(b"HelloWorld"); // value bytes
            test_data.extend_from_slice(&[0, 0, 0, 0, 0, 0]); // padding

            // Prepend three bytes and slice them off so the chunk is not 8-byte aligned.
            let mut padded = Vec::with_capacity(test_data.len() + 3);
            padded.extend_from_slice(&[0xFF, 0xFF, 0xFF]);
            padded.extend_from_slice(&test_data);

            let bytes = bytes::Bytes::from(padded);
            let misaligned = bytes.slice(3..);
            let buffer = LanceBuffer::from_bytes(misaligned, 1);

            let ptr = buffer.as_ref().as_ptr();
            assert_ne!(
                ptr.align_offset(8),
                0,
                "Test setup: buffer should be misaligned for u64"
            );

            let result = decompressor.decompress(vec![buffer], 2);
            assert!(
                result.is_ok(),
                "Decompression should succeed with misaligned u64 buffer"
            );

            if let Ok(DataBlock::VariableWidth(block)) = result {
                assert_eq!(block.num_values, 2);
                assert_eq!(&block.data.as_ref()[..10], b"HelloWorld");
            } else {
                panic!("Expected VariableWidth block");
            }
        }
    }
}