1use arrow_array::OffsetSizeTrait;
13use byteorder::{ByteOrder, LittleEndian};
14use core::panic;
15use snafu::location;
16
17use crate::compression::{
18 BlockCompressor, BlockDecompressor, MiniBlockDecompressor, VariablePerValueDecompressor,
19};
20
21use crate::buffer::LanceBuffer;
22use crate::data::{BlockInfo, DataBlock, VariableWidthBlock};
23use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
24use crate::encodings::logical::primitive::miniblock::{
25 MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
26};
27use crate::format::pb21::compressive_encoding::Compression;
28use crate::format::pb21::CompressiveEncoding;
29use crate::format::{pb21, ProtobufUtils21};
30
31use lance_core::utils::bit::pad_bytes_to;
32use lance_core::{Error, Result};
33
/// Mini-block compressor for variable-width (binary / string) data.
///
/// Values are split into small chunks; each chunk stores its own rebased
/// offsets followed by the value bytes (see `chunk_offsets`).
#[derive(Debug, Default)]
pub struct BinaryMiniBlockEncoder {}

// Target (uncompressed) size of a single mini-block chunk, in bytes.
const AIM_MINICHUNK_SIZE: i64 = 4 * 1024;
38
/// Splits `offsets` / `data` into mini-block chunks.
///
/// Each chunk is laid out as `[rebased offsets][value bytes][padding]`, where the
/// rebased offsets point *within* the chunk (the first value's bytes begin
/// immediately after the offsets array). Returns a single concatenated buffer
/// plus one `MiniBlockChunk` descriptor per chunk.
///
/// `alignment` is the byte boundary each chunk is padded to (callers pass 4 for
/// 32-bit offsets and 8 for 64-bit offsets).
fn chunk_offsets<N: OffsetSizeTrait>(
    offsets: &[N],
    data: &[u8],
    alignment: usize,
) -> (Vec<LanceBuffer>, Vec<MiniBlockChunk>) {
    #[derive(Debug)]
    struct ChunkInfo {
        // Index (into `offsets`) of the first offset belonging to this chunk.
        chunk_start_offset_in_orig_idx: usize,
        // Index (into `offsets`) of the last offset belonging to this chunk.
        chunk_last_offset_in_orig_idx: usize,
        // Byte position, inside the chunk, where the value bytes begin
        // (right after the rebased offsets array).
        bytes_start_offset: usize,
        // Chunk size after padding up to `alignment`.
        padded_chunk_size: usize,
    }

    let byte_width: usize = N::get_byte_width();
    let mut chunks_info = vec![];
    let mut chunks = vec![];
    let mut last_offset_in_orig_idx = 0;
    loop {
        // Greedily pick how many values go in this chunk while staying near
        // AIM_MINICHUNK_SIZE (power-of-two counts, except possibly the tail).
        let this_last_offset_in_orig_idx = search_next_offset_idx(offsets, last_offset_in_orig_idx);

        let num_values_in_this_chunk = this_last_offset_in_orig_idx - last_offset_in_orig_idx;
        let chunk_bytes = offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx];
        // A chunk stores num_values + 1 offsets followed by the value bytes.
        let this_chunk_size =
            (num_values_in_this_chunk + 1) * byte_width + chunk_bytes.to_usize().unwrap();

        let padded_chunk_size = this_chunk_size.next_multiple_of(alignment);
        debug_assert!(padded_chunk_size > 0);

        let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * byte_width;
        chunks_info.push(ChunkInfo {
            chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
            chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
            bytes_start_offset: this_chunk_bytes_start_offset,
            padded_chunk_size,
        });
        chunks.push(MiniBlockChunk {
            // log_num_values == 0 marks the final chunk (which may hold a
            // non-power-of-two count). Other chunks hold a power-of-two count,
            // so trailing_zeros is exactly log2(num_values).
            log_num_values: if this_last_offset_in_orig_idx == offsets.len() - 1 {
                0
            } else {
                num_values_in_this_chunk.trailing_zeros() as u8
            },
            buffer_sizes: vec![padded_chunk_size as u16],
        });
        if this_last_offset_in_orig_idx == offsets.len() - 1 {
            break;
        }
        last_offset_in_orig_idx = this_last_offset_in_orig_idx;
    }

    let output_total_bytes = chunks_info
        .iter()
        .map(|chunk_info| chunk_info.padded_chunk_size)
        .sum::<usize>();

    let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

    // Second pass: materialize each chunk into the output buffer.
    for chunk in chunks_info {
        // Rebase this chunk's offsets so they point within the chunk itself:
        // subtract the chunk's first original offset, then shift past the
        // offsets array.
        let this_chunk_offsets: Vec<N> = offsets
            [chunk.chunk_start_offset_in_orig_idx..=chunk.chunk_last_offset_in_orig_idx]
            .iter()
            .map(|offset| {
                *offset - offsets[chunk.chunk_start_offset_in_orig_idx]
                    + N::from_usize(chunk.bytes_start_offset).unwrap()
            })
            .collect();

        let this_chunk_offsets = LanceBuffer::reinterpret_vec(this_chunk_offsets);
        output.extend_from_slice(&this_chunk_offsets);

        let start_in_orig = offsets[chunk.chunk_start_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        let end_in_orig = offsets[chunk.chunk_last_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        output.extend_from_slice(&data[start_in_orig..end_in_orig]);

        // Arbitrary, recognizable pad byte; padding is never read back.
        const PAD_BYTE: u8 = 72;
        let pad_len = pad_bytes_to(output.len(), alignment);

        if pad_len > 0_usize {
            output.extend(std::iter::repeat_n(PAD_BYTE, pad_len));
        }
    }
    (vec![LanceBuffer::reinterpret_vec(output)], chunks)
}
133
134fn search_next_offset_idx<N: OffsetSizeTrait>(offsets: &[N], last_offset_idx: usize) -> usize {
139 let mut num_values = 1;
140 let mut new_num_values = num_values * 2;
141 loop {
142 if last_offset_idx + new_num_values >= offsets.len() {
143 let existing_bytes = offsets[offsets.len() - 1] - offsets[last_offset_idx];
144 let new_size = existing_bytes
146 + N::from_usize((offsets.len() - last_offset_idx) * N::get_byte_width()).unwrap();
147 if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE {
148 return offsets.len() - 1;
150 } else {
151 return last_offset_idx + num_values;
153 }
154 }
155 let existing_bytes = offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx];
156 let new_size =
157 existing_bytes + N::from_usize((new_num_values + 1) * N::get_byte_width()).unwrap();
158 if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE {
159 num_values = new_num_values;
160 new_num_values *= 2;
161 } else {
162 break;
163 }
164 }
165 last_offset_idx + new_num_values
166}
167
168impl BinaryMiniBlockEncoder {
169 fn chunk_data(&self, data: VariableWidthBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
173 match data.bits_per_offset {
176 32 => {
177 let offsets = data.offsets.borrow_to_typed_slice::<i32>();
178 let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 4);
179 (
180 MiniBlockCompressed {
181 data: buffers,
182 chunks,
183 num_values: data.num_values,
184 },
185 ProtobufUtils21::variable(ProtobufUtils21::flat(32, None), None),
186 )
187 }
188 64 => {
189 let offsets = data.offsets.borrow_to_typed_slice::<i64>();
190 let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 8);
191 (
192 MiniBlockCompressed {
193 data: buffers,
194 chunks,
195 num_values: data.num_values,
196 },
197 ProtobufUtils21::variable(ProtobufUtils21::flat(64, None), None),
198 )
199 }
200 _ => panic!("Unsupported bits_per_offset={}", data.bits_per_offset),
201 }
202 }
203}
204
205impl MiniBlockCompressor for BinaryMiniBlockEncoder {
206 fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
207 match data {
208 DataBlock::VariableWidth(variable_width) => Ok(self.chunk_data(variable_width)),
209 _ => Err(Error::InvalidInput {
210 source: format!(
211 "Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
212 data.name()
213 )
214 .into(),
215 location: location!(),
216 }),
217 }
218 }
219}
220
/// Decompressor for mini-block encoded variable-width data.
#[derive(Debug)]
pub struct BinaryMiniBlockDecompressor {
    // Width of the stored offsets; only 32 and 64 are supported.
    bits_per_offset: u8,
}
225
226impl BinaryMiniBlockDecompressor {
227 pub fn new(bits_per_offset: u8) -> Self {
228 assert!(bits_per_offset == 32 || bits_per_offset == 64);
229 Self { bits_per_offset }
230 }
231
232 pub fn from_variable(variable: &pb21::Variable) -> Self {
233 if let Compression::Flat(flat) = variable
234 .offsets
235 .as_ref()
236 .unwrap()
237 .compression
238 .as_ref()
239 .unwrap()
240 {
241 Self {
242 bits_per_offset: flat.bits_per_value as u8,
243 }
244 } else {
245 panic!("Unsupported offsets compression: {:?}", variable.offsets);
246 }
247 }
248}
249
250impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
251 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
256 assert_eq!(data.len(), 1);
257 let data = data.into_iter().next().unwrap();
258
259 if self.bits_per_offset == 64 {
260 assert!(data.len() >= 16);
262
263 let offsets_buffer = data.borrow_to_typed_slice::<u64>();
264 let offsets = offsets_buffer.as_ref();
265
266 let result_offsets = offsets[0..(num_values + 1) as usize]
267 .iter()
268 .map(|offset| offset - offsets[0])
269 .collect::<Vec<u64>>();
270
271 Ok(DataBlock::VariableWidth(VariableWidthBlock {
272 data: LanceBuffer::from(
273 data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
274 ),
275 offsets: LanceBuffer::reinterpret_vec(result_offsets),
276 bits_per_offset: 64,
277 num_values,
278 block_info: BlockInfo::new(),
279 }))
280 } else {
281 assert!(data.len() >= 8);
283
284 let offsets_buffer = data.borrow_to_typed_slice::<u32>();
285 let offsets = offsets_buffer.as_ref();
286
287 let result_offsets = offsets[0..(num_values + 1) as usize]
288 .iter()
289 .map(|offset| offset - offsets[0])
290 .collect::<Vec<u32>>();
291
292 Ok(DataBlock::VariableWidth(VariableWidthBlock {
293 data: LanceBuffer::from(
294 data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
295 ),
296 offsets: LanceBuffer::reinterpret_vec(result_offsets),
297 bits_per_offset: 32,
298 num_values,
299 block_info: BlockInfo::new(),
300 }))
301 }
302 }
303}
304
/// Block / per-value compressor for variable-width data.
///
/// As a block compressor it writes `[bits_per_offset][bytes_start_offset]
/// [raw offsets][raw bytes]`, with the two header words sized to the offset
/// width (see the `BlockCompressor` impl below).
#[derive(Debug, Default)]
pub struct VariableEncoder {}
316
317impl BlockCompressor for VariableEncoder {
318 fn compress(&self, mut data: DataBlock) -> Result<LanceBuffer> {
319 match data {
320 DataBlock::VariableWidth(ref mut variable_width_data) => {
321 match variable_width_data.bits_per_offset {
322 32 => {
323 let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u32>();
324 let offsets = offsets.as_ref();
325 let bytes_start_offset = 4 + 4 + std::mem::size_of_val(offsets) as u32;
328
329 let output_total_bytes =
330 bytes_start_offset as usize + variable_width_data.data.len();
331 let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
332
333 output.extend_from_slice(&(32_u32).to_le_bytes());
335
336 output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
338
339 output.extend_from_slice(&variable_width_data.offsets);
341
342 output.extend_from_slice(&variable_width_data.data);
344 Ok(LanceBuffer::from(output))
345 }
346 64 => {
347 let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u64>();
348 let offsets = offsets.as_ref();
349 let bytes_start_offset = 8 + 8 + std::mem::size_of_val(offsets) as u64;
352
353 let output_total_bytes =
354 bytes_start_offset as usize + variable_width_data.data.len();
355 let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
356
357 output.extend_from_slice(&(64_u64).to_le_bytes());
359
360 output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
362
363 output.extend_from_slice(&variable_width_data.offsets);
365
366 output.extend_from_slice(&variable_width_data.data);
368 Ok(LanceBuffer::from(output))
369 }
370 _ => {
371 panic!("BinaryBlockEncoder does not work with {} bits per offset VariableWidth DataBlock.",
372 variable_width_data.bits_per_offset);
373 }
374 }
375 }
376 _ => {
377 panic!("BinaryBlockEncoder can only work with Variable Width DataBlock.");
378 }
379 }
380 }
381}
382
383impl PerValueCompressor for VariableEncoder {
384 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
385 let DataBlock::VariableWidth(variable) = data else {
386 panic!("BinaryPerValueCompressor can only work with Variable Width DataBlock.");
387 };
388
389 let encoding = ProtobufUtils21::variable(
390 ProtobufUtils21::flat(variable.bits_per_offset as u64, None),
391 None,
392 );
393 Ok((PerValueDataBlock::Variable(variable), encoding))
394 }
395}
396
/// Per-value decompressor for variable-width data; the stored layout already
/// matches `VariableWidthBlock`, so decompression is a no-op rewrap.
#[derive(Debug, Default)]
pub struct VariableDecoder {}
399
400impl VariablePerValueDecompressor for VariableDecoder {
401 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
402 Ok(DataBlock::VariableWidth(data))
403 }
404}
405
/// Block decompressor for buffers written by `VariableEncoder` (and a legacy
/// header layout — see the `BlockDecompressor` impl).
#[derive(Debug, Default)]
pub struct BinaryBlockDecompressor {}
408
impl BlockDecompressor for BinaryBlockDecompressor {
    /// Decodes a variable-width block buffer back into a `VariableWidthBlock`,
    /// slicing (not copying) the offsets and value bytes out of `data`.
    ///
    /// Two header layouts are supported:
    ///   - old scheme: `[bits_per_offset: u8][num_values: u32/u64 LE][bytes_start: u32/u64 LE]`
    ///   - new scheme: `[bits_per_offset: u32/u64 LE][bytes_start: u32/u64 LE]`
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        // In the new scheme the first word is 32 or 64 stored little-endian, so
        // bytes 1..=3 are always zero; a non-zero byte there implies the old
        // scheme (whose bytes 1..=3 hold the low bytes of num_values).
        // NOTE(review): an old-scheme buffer whose num_values has all three low
        // bytes zero (e.g. a multiple of 2^24) would be misdetected as the new
        // scheme — presumably ruled out by writers; verify.
        let is_old_scheme = data[1] != 0 || data[2] != 0 || data[3] != 0;

        let (bits_per_offset, bytes_start_offset, offset_start) = if is_old_scheme {
            // Old scheme: single-byte width marker, then num_values (validated
            // in debug builds only), then the byte offset where values start.
            let bits_per_offset = data[0];
            match bits_per_offset {
                32 => {
                    debug_assert_eq!(LittleEndian::read_u32(&data[1..5]), num_values as u32);
                    let bytes_start_offset = LittleEndian::read_u32(&data[5..9]);
                    (bits_per_offset, bytes_start_offset as u64, 9)
                }
                64 => {
                    debug_assert_eq!(LittleEndian::read_u64(&data[1..9]), num_values);
                    let bytes_start_offset = LittleEndian::read_u64(&data[9..17]);
                    (bits_per_offset, bytes_start_offset, 17)
                }
                _ => {
                    return Err(Error::InvalidInput {
                        source: format!("Unsupported bits_per_offset={}", bits_per_offset).into(),
                        location: location!(),
                    });
                }
            }
        } else {
            // New scheme: width marker is a full offset-sized word; the second
            // word gives where the value bytes start.
            let bits_per_offset = LittleEndian::read_u32(&data[0..4]) as u8;
            match bits_per_offset {
                32 => {
                    let bytes_start_offset = LittleEndian::read_u32(&data[4..8]);
                    (bits_per_offset, bytes_start_offset as u64, 8)
                }
                64 => {
                    let bytes_start_offset = LittleEndian::read_u64(&data[8..16]);
                    (bits_per_offset, bytes_start_offset, 16)
                }
                _ => {
                    return Err(Error::InvalidInput {
                        source: format!("Unsupported bits_per_offset={}", bits_per_offset).into(),
                        location: location!(),
                    });
                }
            }
        };

        // Offsets live between the header and the value bytes.
        let offsets =
            data.slice_with_length(offset_start, bytes_start_offset as usize - offset_start);

        // Everything after `bytes_start_offset` is the value bytes.
        let data = data.slice_with_length(
            bytes_start_offset as usize,
            data.len() - bytes_start_offset as usize,
        );

        Ok(DataBlock::VariableWidth(VariableWidthBlock {
            data,
            offsets,
            bits_per_offset,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}
485
#[cfg(test)]
pub mod tests {
    //! Round-trip and unit tests for the binary / variable-width encoders and
    //! decoders in this module.

    use arrow_array::{
        builder::{LargeStringBuilder, StringBuilder},
        ArrayRef, StringArray,
    };
    use arrow_schema::{DataType, Field};

    use crate::{
        constants::{
            COMPRESSION_META_KEY, STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
            STRUCTURAL_ENCODING_MINIBLOCK,
        },
        testing::check_specific_random,
    };
    use rstest::rstest;
    use std::{collections::HashMap, sync::Arc, vec};

    use crate::{
        testing::{
            check_basic_random, check_round_trip_encoding_of_data, FnArrayGeneratorProvider,
            TestCases,
        },
        version::LanceFileVersion,
    };

    // Random utf8 round trip against the 2.1 file format.
    #[test_log::test(tokio::test)]
    async fn test_utf8_binary() {
        let field = Field::new("", DataType::Utf8, false);
        check_specific_random(
            field,
            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
        )
        .await;
    }

    // Round trip for both structural encodings and both 32-bit-offset types.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let field = Field::new("", data_type, false).with_metadata(field_metadata);
        check_basic_random(field).await;
    }

    // Round trip with FSST compression requested via field metadata.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary_fsst(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Binary, DataType::Utf8)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
        check_specific_random(field, test_cases).await;
    }

    // Same as above, but with 64-bit-offset (Large*) types.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_fsst_large_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::LargeBinary, DataType::LargeUtf8)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        check_specific_random(
            field,
            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_large_binary() {
        let field = Field::new("", DataType::LargeBinary, true);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_large_utf8() {
        let field = Field::new("", DataType::LargeUtf8, true);
        check_basic_random(field).await;
    }

    // Deterministic short strings ("user_0", "user_1", ...) rather than random data.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_small_strings(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        use crate::testing::check_basic_generated;

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        let field = Field::new("", DataType::Utf8, true).with_metadata(field_metadata);
        check_basic_generated(
            field,
            Box::new(FnArrayGeneratorProvider::new(move || {
                lance_datagen::array::utf8_prefix_plus_counter("user_", false)
            })),
        )
        .await;
    }

    // Small hand-built array with nulls; exercises range and take paths.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
        let string_array = arrow_cast::cast(&string_array, &data_type).unwrap();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![0, 1, 3, 4]);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            field_metadata,
        )
        .await;
    }

    // A sliced array: offsets do not start at zero.
    #[test_log::test(tokio::test)]
    async fn test_sliced_utf8() {
        let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]);
        let string_array = string_array.slice(1, 3);

        let test_cases = TestCases::default()
            .with_range(0..1)
            .with_range(0..2)
            .with_range(1..2);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    // Values larger than the max page size must still round trip.
    #[test_log::test(tokio::test)]
    async fn test_bigger_than_max_page_size() {
        // One 32MiB value with a 1MiB page-size cap.
        let big_string = String::from_iter((0..(32 * 1024 * 1024)).map(|_| '0'));
        let string_array = StringArray::from(vec![
            Some(big_string),
            Some("abc".to_string()),
            None,
            None,
            Some("xyz".to_string()),
        ]);

        let test_cases = TestCases::default().with_max_page_size(1024 * 1024);

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;

        // Many ~1MB values, enough to overflow i32 offsets in total size.
        let big_string = String::from_iter((0..(1000 * 1000)).map(|_| '0'));
        let string_array = StringArray::from_iter_values((0..90).map(|_| big_string.clone()));

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &TestCases::default(),
            HashMap::new(),
        )
        .await;
    }

    // Empty strings and nulls in various orders (zero-length values have
    // historically been an edge case for offset handling).
    #[test_log::test(tokio::test)]
    async fn test_empty_strings() {
        let values = [Some("abc"), Some(""), None];
        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
            let mut string_builder = StringBuilder::new();
            for idx in order {
                string_builder.append_option(values[idx]);
            }
            let string_array = Arc::new(string_builder.finish());
            let test_cases = TestCases::default()
                .with_indices(vec![1])
                .with_indices(vec![0])
                .with_indices(vec![2])
                .with_indices(vec![0, 1]);
            check_round_trip_encoding_of_data(
                vec![string_array.clone()],
                &test_cases,
                HashMap::new(),
            )
            .await;
            let test_cases = test_cases.with_batch_size(1);
            check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new())
                .await;
        }

        // An array with only empty strings and nulls.
        let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    // 5000 values of ~1MiB each (~5GiB) — far too heavy to run routinely.
    #[ignore] async fn test_jumbo_string() {
        let mut string_builder = LargeStringBuilder::new();
        let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
        for _ in 0..5000 {
            string_builder.append_option(Some(&giant_string));
        }
        let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
        let arrs = vec![giant_array];

        // Skip value validation; the point is just that encoding succeeds.
        let test_cases = TestCases::default().without_validation();
        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
    }

    // Repeating values should trigger dictionary-style encoding paths; vary
    // cardinality and null presence.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary_dictionary_encoding(
        #[values(true, false)] with_nulls: bool,
        #[values(100, 500, 35000)] dict_size: u32,
    ) {
        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
        let strings = (0..dict_size)
            .map(|i| i.to_string())
            .collect::<Vec<String>>();

        let repeated_strings: Vec<_> = strings
            .iter()
            .cycle()
            .take(70000)
            .enumerate()
            .map(|(i, s)| {
                if with_nulls && i % 7 == 0 {
                    None
                } else {
                    Some(s.clone())
                }
            })
            .collect();
        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    // Asserts the chosen encoding is reported as "variable".
    #[test_log::test(tokio::test)]
    async fn test_binary_encoding_verification() {
        use lance_datagen::{ByteCount, RowCount};

        let test_cases = TestCases::default()
            .with_expected_encoding("variable")
            .with_min_file_version(LanceFileVersion::V2_1);

        let arr_small = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(10), false))
            .into_batch_rows(RowCount::from(1000))
            .unwrap()
            .column(0)
            .clone();
        check_round_trip_encoding_of_data(vec![arr_small], &test_cases, HashMap::new()).await;

        // Explicitly disable compression so "variable" is still chosen for
        // larger values.
        let metadata_explicit =
            HashMap::from([("lance-encoding:compression".to_string(), "none".to_string())]);
        let arr_large = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(50), false))
            .into_batch_rows(RowCount::from(2000))
            .unwrap()
            .column(0)
            .clone();
        check_round_trip_encoding_of_data(vec![arr_large], &test_cases, metadata_explicit).await;
    }

    // Mini-block decompression must tolerate buffers whose start address is
    // not aligned for the offset type.
    #[test]
    fn test_binary_miniblock_with_misaligned_buffer() {
        use super::BinaryMiniBlockDecompressor;
        use crate::buffer::LanceBuffer;
        use crate::compression::MiniBlockDecompressor;
        use crate::data::DataBlock;

        // 32-bit offsets.
        {
            let decompressor = BinaryMiniBlockDecompressor {
                bits_per_offset: 32,
            };

            // Chunk layout: 3 offsets (12, 15, 20) then value bytes. The value
            // bytes start at 12 (3 offsets * 4 bytes), so value 0 is "ABC" and
            // value 1 is "XYZ" plus two trailing pad bytes.
            let mut test_data = Vec::new();
            test_data.extend_from_slice(&12u32.to_le_bytes());
            test_data.extend_from_slice(&15u32.to_le_bytes());
            test_data.extend_from_slice(&20u32.to_le_bytes());
            test_data.extend_from_slice(b"ABCXYZ");
            test_data.extend_from_slice(&[0, 0]);
            // Prepend one byte then slice it off so the resulting buffer start
            // is misaligned for u32 access.
            let mut padded = Vec::with_capacity(test_data.len() + 1);
            padded.push(0xFF);
            padded.extend_from_slice(&test_data);

            let bytes = bytes::Bytes::from(padded);
            let misaligned = bytes.slice(1..);
            let buffer = LanceBuffer::from_bytes(misaligned, 1);

            let ptr = buffer.as_ref().as_ptr();
            assert_ne!(
                ptr.align_offset(4),
                0,
                "Test setup: buffer should be misaligned for u32"
            );

            let result = decompressor.decompress(vec![buffer], 2);
            assert!(
                result.is_ok(),
                "Decompression should succeed with misaligned buffer"
            );

            if let Ok(DataBlock::VariableWidth(block)) = result {
                assert_eq!(block.num_values, 2);
                assert_eq!(&block.data.as_ref()[..6], b"ABCXYZ");
            } else {
                panic!("Expected VariableWidth block");
            }
        }

        // 64-bit offsets.
        {
            let decompressor = BinaryMiniBlockDecompressor {
                bits_per_offset: 64,
            };

            // Chunk layout: 3 offsets (24, 29, 40) then value bytes starting at
            // 24 (3 offsets * 8 bytes): "Hello", then "World" plus pad bytes.
            let mut test_data = Vec::new();
            test_data.extend_from_slice(&24u64.to_le_bytes());
            test_data.extend_from_slice(&29u64.to_le_bytes());
            test_data.extend_from_slice(&40u64.to_le_bytes());
            test_data.extend_from_slice(b"HelloWorld");
            test_data.extend_from_slice(&[0, 0, 0, 0, 0, 0]);
            // Prepend three bytes then slice them off so the buffer start is
            // misaligned for u64 access.
            let mut padded = Vec::with_capacity(test_data.len() + 3);
            padded.extend_from_slice(&[0xFF, 0xFF, 0xFF]);
            padded.extend_from_slice(&test_data);

            let bytes = bytes::Bytes::from(padded);
            let misaligned = bytes.slice(3..);
            let buffer = LanceBuffer::from_bytes(misaligned, 1);

            let ptr = buffer.as_ref().as_ptr();
            assert_ne!(
                ptr.align_offset(8),
                0,
                "Test setup: buffer should be misaligned for u64"
            );

            let result = decompressor.decompress(vec![buffer], 2);
            assert!(
                result.is_ok(),
                "Decompression should succeed with misaligned u64 buffer"
            );

            if let Ok(DataBlock::VariableWidth(block)) = result {
                assert_eq!(block.num_values, 2);
                assert_eq!(&block.data.as_ref()[..10], b"HelloWorld");
            } else {
                panic!("Expected VariableWidth block");
            }
        }
    }
}