use arrow_array::OffsetSizeTrait;
use byteorder::{ByteOrder, LittleEndian};
use core::panic;
use snafu::location;

use crate::compression::{
    BlockCompressor, BlockDecompressor, MiniBlockDecompressor, VariablePerValueDecompressor,
};

use crate::buffer::LanceBuffer;
use crate::data::{BlockInfo, DataBlock, VariableWidthBlock};
use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
use crate::encodings::logical::primitive::miniblock::{
    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_VALUES,
};
use crate::format::pb21::compressive_encoding::Compression;
use crate::format::pb21::CompressiveEncoding;
use crate::format::{pb21, ProtobufUtils21};

use lance_core::utils::bit::pad_bytes_to;
use lance_core::{Error, Result};

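/// Compresses variable-width (binary / string) data into mini-block chunks.
///
/// Each chunk stores its own rebased offsets followed by the value bytes, so
/// every chunk can be decoded independently of its neighbors (see
/// [`chunk_offsets`]).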
#[derive(Debug)]
pub struct BinaryMiniBlockEncoder {
    minichunk_size: i64,
}

impl Default for BinaryMiniBlockEncoder {
    fn default() -> Self {
        Self {
            minichunk_size: *AIM_MINICHUNK_SIZE,
        }
    }
}

const DEFAULT_AIM_MINICHUNK_SIZE: i64 = 4 * 1024;

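/// Target size of a mini-chunk, in bytes. Overridable through the
/// `LANCE_BINARY_MINIBLOCK_CHUNK_SIZE` environment variable; values that are
/// missing or fail to parse fall back to the 4 KiB default.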
pub static AIM_MINICHUNK_SIZE: std::sync::LazyLock<i64> = std::sync::LazyLock::new(|| {
    std::env::var("LANCE_BINARY_MINIBLOCK_CHUNK_SIZE")
        .unwrap_or_else(|_| DEFAULT_AIM_MINICHUNK_SIZE.to_string())
        .parse::<i64>()
        .unwrap_or(DEFAULT_AIM_MINICHUNK_SIZE)
});

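/// Splits `offsets` / `data` into chunks of roughly `minichunk_size` bytes,
/// returning the packed chunk buffer and the per-chunk metadata.
///
/// Each chunk is self-contained and laid out as:
///
/// ```text
/// [(num_values + 1) offsets, rebased to point just past the offsets section]
/// [value bytes]
/// [padding up to `alignment`]
/// ```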
fn chunk_offsets<N: OffsetSizeTrait>(
    offsets: &[N],
    data: &[u8],
    alignment: usize,
    minichunk_size: i64,
) -> (Vec<LanceBuffer>, Vec<MiniBlockChunk>) {
    #[derive(Debug)]
    struct ChunkInfo {
        // Index (into `offsets`) of this chunk's first offset
        chunk_start_offset_in_orig_idx: usize,
        // Index (into `offsets`) of this chunk's last offset
        chunk_last_offset_in_orig_idx: usize,
        // Where the value bytes begin inside the chunk (right after the offsets)
        bytes_start_offset: usize,
        // Total chunk size after padding to `alignment`
        padded_chunk_size: usize,
    }

    let byte_width: usize = N::get_byte_width();
    let mut chunks_info = vec![];
    let mut chunks = vec![];
    let mut last_offset_in_orig_idx = 0;
    loop {
        let this_last_offset_in_orig_idx =
            search_next_offset_idx(offsets, last_offset_in_orig_idx, minichunk_size);

        let num_values_in_this_chunk = this_last_offset_in_orig_idx - last_offset_in_orig_idx;
        let chunk_bytes = offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx];
        // A chunk stores (num_values + 1) offsets followed by the value bytes
        let this_chunk_size =
            (num_values_in_this_chunk + 1) * byte_width + chunk_bytes.to_usize().unwrap();

        let padded_chunk_size = this_chunk_size.next_multiple_of(alignment);
        debug_assert!(padded_chunk_size > 0);

        let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * byte_width;
        chunks_info.push(ChunkInfo {
            chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
            chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
            bytes_start_offset: this_chunk_bytes_start_offset,
            padded_chunk_size,
        });
        chunks.push(MiniBlockChunk {
            // A `log_num_values` of 0 marks the final chunk, which holds all
            // remaining values; every other chunk holds a power-of-two count.
            log_num_values: if this_last_offset_in_orig_idx == offsets.len() - 1 {
                0
            } else {
                num_values_in_this_chunk.trailing_zeros() as u8
            },
            buffer_sizes: vec![padded_chunk_size as u32],
        });
        if this_last_offset_in_orig_idx == offsets.len() - 1 {
            break;
        }
        last_offset_in_orig_idx = this_last_offset_in_orig_idx;
    }

    let output_total_bytes = chunks_info
        .iter()
        .map(|chunk_info| chunk_info.padded_chunk_size)
        .sum::<usize>();

    let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

    for chunk in chunks_info {
        // Rebase this chunk's offsets so they point just past the offsets section
        let this_chunk_offsets: Vec<N> = offsets
            [chunk.chunk_start_offset_in_orig_idx..=chunk.chunk_last_offset_in_orig_idx]
            .iter()
            .map(|offset| {
                *offset - offsets[chunk.chunk_start_offset_in_orig_idx]
                    + N::from_usize(chunk.bytes_start_offset).unwrap()
            })
            .collect();

        let this_chunk_offsets = LanceBuffer::reinterpret_vec(this_chunk_offsets);
        output.extend_from_slice(&this_chunk_offsets);

        let start_in_orig = offsets[chunk.chunk_start_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        let end_in_orig = offsets[chunk.chunk_last_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        output.extend_from_slice(&data[start_in_orig..end_in_orig]);

        // Pad the chunk up to `alignment`
        const PAD_BYTE: u8 = 72;
        let pad_len = pad_bytes_to(output.len(), alignment);

        if pad_len > 0 {
            output.extend(std::iter::repeat_n(PAD_BYTE, pad_len));
        }
    }
    (vec![LanceBuffer::reinterpret_vec(output)], chunks)
}

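/// Returns the index (into `offsets`) of the last offset of the next chunk.
///
/// Doubles the candidate value count while the chunk (offsets plus value
/// bytes) still fits within `minichunk_size`, never growing past
/// `MAX_MINIBLOCK_VALUES`. As a result every chunk holds a power-of-two
/// number of values, except possibly the final chunk, which holds whatever
/// remains.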
fn search_next_offset_idx<N: OffsetSizeTrait>(
    offsets: &[N],
    last_offset_idx: usize,
    minichunk_size: i64,
) -> usize {
    let mut num_values = 1;
    let mut new_num_values = num_values * 2;
    loop {
        if last_offset_idx + new_num_values >= offsets.len() {
            // Not enough values left to double again: either everything that
            // remains fits in one chunk, or we fall back to the last count
            // that was known to fit.
            let existing_bytes = offsets[offsets.len() - 1] - offsets[last_offset_idx];
            let new_size = existing_bytes
                + N::from_usize((offsets.len() - last_offset_idx) * N::get_byte_width()).unwrap();
            if new_size.to_i64().unwrap() <= minichunk_size {
                return offsets.len() - 1;
            } else {
                return last_offset_idx + num_values;
            }
        }
        let existing_bytes = offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx];
        let new_size =
            existing_bytes + N::from_usize((new_num_values + 1) * N::get_byte_width()).unwrap();
        if new_size.to_i64().unwrap() <= minichunk_size {
            // Stop doubling once another doubling would exceed MAX_MINIBLOCK_VALUES
            if new_num_values * 2 > MAX_MINIBLOCK_VALUES as usize {
                break;
            }
            num_values = new_num_values;
            new_num_values *= 2;
        } else {
            break;
        }
    }
    last_offset_idx + num_values
}

impl BinaryMiniBlockEncoder {
    pub fn new(minichunk_size: Option<i64>) -> Self {
        Self {
            minichunk_size: minichunk_size.unwrap_or(*AIM_MINICHUNK_SIZE),
        }
    }

    // Chunks are aligned to the offset width: 4 bytes for 32-bit offsets and
    // 8 bytes for 64-bit offsets.
    fn chunk_data(&self, data: VariableWidthBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
        match data.bits_per_offset {
            32 => {
                let offsets = data.offsets.borrow_to_typed_slice::<i32>();
                let (buffers, chunks) =
                    chunk_offsets(offsets.as_ref(), &data.data, 4, self.minichunk_size);
                (
                    MiniBlockCompressed {
                        data: buffers,
                        chunks,
                        num_values: data.num_values,
                    },
                    ProtobufUtils21::variable(ProtobufUtils21::flat(32, None), None),
                )
            }
            64 => {
                let offsets = data.offsets.borrow_to_typed_slice::<i64>();
                let (buffers, chunks) =
                    chunk_offsets(offsets.as_ref(), &data.data, 8, self.minichunk_size);
                (
                    MiniBlockCompressed {
                        data: buffers,
                        chunks,
                        num_values: data.num_values,
                    },
                    ProtobufUtils21::variable(ProtobufUtils21::flat(64, None), None),
                )
            }
            _ => panic!("Unsupported bits_per_offset={}", data.bits_per_offset),
        }
    }
}

impl MiniBlockCompressor for BinaryMiniBlockEncoder {
    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
        match data {
            DataBlock::VariableWidth(variable_width) => Ok(self.chunk_data(variable_width)),
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
                    data.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }
}

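/// Decompresses a single chunk produced by [`BinaryMiniBlockEncoder`].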
#[derive(Debug)]
pub struct BinaryMiniBlockDecompressor {
    bits_per_offset: u8,
}

impl BinaryMiniBlockDecompressor {
    pub fn new(bits_per_offset: u8) -> Self {
        assert!(bits_per_offset == 32 || bits_per_offset == 64);
        Self { bits_per_offset }
    }

    pub fn from_variable(variable: &pb21::Variable) -> Self {
        if let Compression::Flat(flat) = variable
            .offsets
            .as_ref()
            .unwrap()
            .compression
            .as_ref()
            .unwrap()
        {
            Self {
                bits_per_offset: flat.bits_per_value as u8,
            }
        } else {
            panic!("Unsupported offsets compression: {:?}", variable.offsets);
        }
    }
}

impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        // Mini-block binary data always arrives as a single buffer
        assert_eq!(data.len(), 1);
        let data = data.into_iter().next().unwrap();

        if self.bits_per_offset == 64 {
            // A chunk holds at least one value, hence at least two u64 offsets
            assert!(data.len() >= 16);

            let offsets_buffer = data.borrow_to_typed_slice::<u64>();
            let offsets = offsets_buffer.as_ref();

            // Rebase the chunk-relative offsets so they start at zero
            let result_offsets = offsets[0..(num_values + 1) as usize]
                .iter()
                .map(|offset| offset - offsets[0])
                .collect::<Vec<u64>>();

            Ok(DataBlock::VariableWidth(VariableWidthBlock {
                data: LanceBuffer::from(
                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
                ),
                offsets: LanceBuffer::reinterpret_vec(result_offsets),
                bits_per_offset: 64,
                num_values,
                block_info: BlockInfo::new(),
            }))
        } else {
            // A chunk holds at least one value, hence at least two u32 offsets
            assert!(data.len() >= 8);

            let offsets_buffer = data.borrow_to_typed_slice::<u32>();
            let offsets = offsets_buffer.as_ref();

            // Rebase the chunk-relative offsets so they start at zero
            let result_offsets = offsets[0..(num_values + 1) as usize]
                .iter()
                .map(|offset| offset - offsets[0])
                .collect::<Vec<u32>>();

            Ok(DataBlock::VariableWidth(VariableWidthBlock {
                data: LanceBuffer::from(
                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
                ),
                offsets: LanceBuffer::reinterpret_vec(result_offsets),
                bits_per_offset: 32,
                num_values,
                block_info: BlockInfo::new(),
            }))
        }
    }
}

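/// Encodes a variable-width block as one self-describing buffer.
///
/// The [`BlockCompressor`] impl writes:
///
/// ```text
/// [bits_per_offset: u32 or u64 LE][bytes_start_offset: u32 or u64 LE]
/// [offsets][value bytes]
/// ```
///
/// where `bytes_start_offset` is the byte position at which the value bytes
/// begin. The [`PerValueCompressor`] impl passes the block through unchanged.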
#[derive(Debug, Default)]
pub struct VariableEncoder {}

impl BlockCompressor for VariableEncoder {
    fn compress(&self, mut data: DataBlock) -> Result<LanceBuffer> {
        match data {
            DataBlock::VariableWidth(ref mut variable_width_data) => {
                match variable_width_data.bits_per_offset {
                    32 => {
                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u32>();
                        let offsets = offsets.as_ref();
                        // 4-byte bits_per_offset + 4-byte bytes_start_offset + the offsets
                        let bytes_start_offset = 4 + 4 + std::mem::size_of_val(offsets) as u32;

                        let output_total_bytes =
                            bytes_start_offset as usize + variable_width_data.data.len();
                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

                        // Write the bits_per_offset header
                        output.extend_from_slice(&(32_u32).to_le_bytes());

                        // Write where the value bytes begin
                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());

                        // Write the offsets
                        output.extend_from_slice(&variable_width_data.offsets);

                        // Write the value bytes
                        output.extend_from_slice(&variable_width_data.data);
                        Ok(LanceBuffer::from(output))
                    }
                    64 => {
                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u64>();
                        let offsets = offsets.as_ref();
                        // 8-byte bits_per_offset + 8-byte bytes_start_offset + the offsets
                        let bytes_start_offset = 8 + 8 + std::mem::size_of_val(offsets) as u64;

                        let output_total_bytes =
                            bytes_start_offset as usize + variable_width_data.data.len();
                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

                        // Write the bits_per_offset header
                        output.extend_from_slice(&(64_u64).to_le_bytes());

                        // Write where the value bytes begin
                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());

                        // Write the offsets
                        output.extend_from_slice(&variable_width_data.offsets);

                        // Write the value bytes
                        output.extend_from_slice(&variable_width_data.data);
                        Ok(LanceBuffer::from(output))
                    }
                    _ => {
                        panic!(
                            "BinaryBlockEncoder does not work with {} bits per offset VariableWidth DataBlock.",
                            variable_width_data.bits_per_offset
                        );
                    }
                }
            }
            _ => {
                panic!("BinaryBlockEncoder can only work with Variable Width DataBlock.");
            }
        }
    }
}

impl PerValueCompressor for VariableEncoder {
    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
        let DataBlock::VariableWidth(variable) = data else {
            panic!("BinaryPerValueCompressor can only work with Variable Width DataBlock.");
        };

        let encoding = ProtobufUtils21::variable(
            ProtobufUtils21::flat(variable.bits_per_offset as u64, None),
            None,
        );
        Ok((PerValueDataBlock::Variable(variable), encoding))
    }
}

#[derive(Debug, Default)]
pub struct VariableDecoder {}

impl VariablePerValueDecompressor for VariableDecoder {
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
        Ok(DataBlock::VariableWidth(data))
    }
}

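/// Decompresses blocks written by [`VariableEncoder`].
///
/// Two header layouts exist. The current layout begins with `bits_per_offset`
/// as a little-endian u32 (or u64), so bytes 1..=3 are always zero. The
/// legacy layout stored `bits_per_offset` in a single byte followed by
/// `num_values`, which in practice makes at least one of those bytes
/// non-zero; this is the property `is_old_scheme` checks below.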
#[derive(Debug, Default)]
pub struct BinaryBlockDecompressor {}

impl BlockDecompressor for BinaryBlockDecompressor {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        // In the current layout the first four bytes hold bits_per_offset as a
        // little-endian u32 (32 or 64), so bytes 1..=3 are zero; the legacy
        // layout stored num_values there instead.
        let is_old_scheme = data[1] != 0 || data[2] != 0 || data[3] != 0;

        let (bits_per_offset, bytes_start_offset, offset_start) = if is_old_scheme {
            // Legacy layout: [bits_per_offset: u8][num_values][bytes_start_offset]
            let bits_per_offset = data[0];
            match bits_per_offset {
                32 => {
                    debug_assert_eq!(LittleEndian::read_u32(&data[1..5]), num_values as u32);
                    let bytes_start_offset = LittleEndian::read_u32(&data[5..9]);
                    (bits_per_offset, bytes_start_offset as u64, 9)
                }
                64 => {
                    debug_assert_eq!(LittleEndian::read_u64(&data[1..9]), num_values);
                    let bytes_start_offset = LittleEndian::read_u64(&data[9..17]);
                    (bits_per_offset, bytes_start_offset, 17)
                }
                _ => {
                    return Err(Error::InvalidInput {
                        source: format!("Unsupported bits_per_offset={}", bits_per_offset).into(),
                        location: location!(),
                    });
                }
            }
        } else {
            // Current layout: [bits_per_offset][bytes_start_offset], both u32 or u64 LE
            let bits_per_offset = LittleEndian::read_u32(&data[0..4]) as u8;
            match bits_per_offset {
                32 => {
                    let bytes_start_offset = LittleEndian::read_u32(&data[4..8]);
                    (bits_per_offset, bytes_start_offset as u64, 8)
                }
                64 => {
                    let bytes_start_offset = LittleEndian::read_u64(&data[8..16]);
                    (bits_per_offset, bytes_start_offset, 16)
                }
                _ => {
                    return Err(Error::InvalidInput {
                        source: format!("Unsupported bits_per_offset={}", bits_per_offset).into(),
                        location: location!(),
                    });
                }
            }
        };

        // The offsets sit between the header and the value bytes
        let offsets =
            data.slice_with_length(offset_start, bytes_start_offset as usize - offset_start);

        let data = data.slice_with_length(
            bytes_start_offset as usize,
            data.len() - bytes_start_offset as usize,
        );

        Ok(DataBlock::VariableWidth(VariableWidthBlock {
            data,
            offsets,
            bits_per_offset,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}

#[cfg(test)]
pub mod tests {
    use arrow_array::{
        builder::{LargeStringBuilder, StringBuilder},
        ArrayRef, StringArray,
    };
    use arrow_schema::{DataType, Field};

    use crate::{
        constants::{
            COMPRESSION_META_KEY, STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
            STRUCTURAL_ENCODING_MINIBLOCK,
        },
        testing::check_specific_random,
    };
    use rstest::rstest;
    use std::{collections::HashMap, sync::Arc, vec};

    use crate::{
        testing::{
            check_basic_random, check_round_trip_encoding_of_data, FnArrayGeneratorProvider,
            TestCases,
        },
        version::LanceFileVersion,
    };

    #[test_log::test(tokio::test)]
    async fn test_utf8_binary() {
        let field = Field::new("", DataType::Utf8, false);
        check_specific_random(
            field,
            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
        )
        .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let field = Field::new("", data_type, false).with_metadata(field_metadata);
        check_basic_random(field).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary_fsst(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Binary, DataType::Utf8)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
        check_specific_random(field, test_cases).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_fsst_large_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::LargeBinary, DataType::LargeUtf8)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        check_specific_random(
            field,
            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_large_binary() {
        let field = Field::new("", DataType::LargeBinary, true);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_large_utf8() {
        let field = Field::new("", DataType::LargeUtf8, true);
        check_basic_random(field).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_small_strings(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        use crate::testing::check_basic_generated;

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        let field = Field::new("", DataType::Utf8, true).with_metadata(field_metadata);
        check_basic_generated(
            field,
            Box::new(FnArrayGeneratorProvider::new(move || {
                lance_datagen::array::utf8_prefix_plus_counter("user_", false)
            })),
        )
        .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
        let string_array = arrow_cast::cast(&string_array, &data_type).unwrap();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![0, 1, 3, 4]);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            field_metadata,
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_sliced_utf8() {
        let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]);
        let string_array = string_array.slice(1, 3);

        let test_cases = TestCases::default()
            .with_range(0..1)
            .with_range(0..2)
            .with_range(1..2);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_bigger_than_max_page_size() {
        // A single 32 MiB string, much larger than the 1 MiB page size below
        let big_string = String::from_iter((0..(32 * 1024 * 1024)).map(|_| '0'));
        let string_array = StringArray::from(vec![
            Some(big_string),
            Some("abc".to_string()),
            None,
            None,
            Some("xyz".to_string()),
        ]);

        let test_cases = TestCases::default().with_max_page_size(1024 * 1024);

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;

        // ~90 MB of string data in a single array, enough to spill across
        // multiple pages even at the default page size
        let big_string = String::from_iter((0..(1000 * 1000)).map(|_| '0'));
        let string_array = StringArray::from_iter_values((0..90).map(|_| big_string.clone()));

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &TestCases::default(),
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_empty_strings() {
        // Scenario 1: a mix of non-empty, empty, and null values
        let values = [Some("abc"), Some(""), None];
        // Exercise the empty string at the beginning, middle, and end
        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
            let mut string_builder = StringBuilder::new();
            for idx in order {
                string_builder.append_option(values[idx]);
            }
            let string_array = Arc::new(string_builder.finish());
            let test_cases = TestCases::default()
                .with_indices(vec![1])
                .with_indices(vec![0])
                .with_indices(vec![2])
                .with_indices(vec![0, 1]);
            check_round_trip_encoding_of_data(
                vec![string_array.clone()],
                &test_cases,
                HashMap::new(),
            )
            .await;
            let test_cases = test_cases.with_batch_size(1);
            check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new())
                .await;
        }

        // Scenario 2: no value has any bytes, so there is nothing to encode,
        // but the round trip should still succeed
        let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    // Ignored by default: this writes roughly 5 GiB of string data
    #[ignore]
    async fn test_jumbo_string() {
        let mut string_builder = LargeStringBuilder::new();
        // A 1 MiB string, repeated 5000 times below
        let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
        for _ in 0..5000 {
            string_builder.append_option(Some(&giant_string));
        }
        let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
        let arrs = vec![giant_array];

        // Skip row-by-row validation on this much data
        let test_cases = TestCases::default().without_validation();
        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary_dictionary_encoding(
        #[values(true, false)] with_nulls: bool,
        #[values(100, 500, 35000)] dict_size: u32,
    ) {
        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
        let strings = (0..dict_size)
            .map(|i| i.to_string())
            .collect::<Vec<String>>();

        let repeated_strings: Vec<_> = strings
            .iter()
            .cycle()
            .take(70000)
            .enumerate()
            .map(|(i, s)| {
                if with_nulls && i % 7 == 0 {
                    None
                } else {
                    Some(s.clone())
                }
            })
            .collect();
        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_binary_encoding_verification() {
        use lance_datagen::{ByteCount, RowCount};

        let test_cases = TestCases::default()
            .with_expected_encoding("variable")
            .with_min_file_version(LanceFileVersion::V2_1);

        // Small random strings should land on the variable encoding
        let arr_small = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(10), false))
            .into_batch_rows(RowCount::from(1000))
            .unwrap()
            .column(0)
            .clone();
        check_round_trip_encoding_of_data(vec![arr_small], &test_cases, HashMap::new()).await;

        // Explicitly disable compression so the plain variable encoding is used
        let metadata_explicit =
            HashMap::from([("lance-encoding:compression".to_string(), "none".to_string())]);
        let arr_large = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(50), false))
            .into_batch_rows(RowCount::from(2000))
            .unwrap()
            .column(0)
            .clone();
        check_round_trip_encoding_of_data(vec![arr_large], &test_cases, metadata_explicit).await;
    }

    #[test]
    fn test_binary_miniblock_with_misaligned_buffer() {
        use super::BinaryMiniBlockDecompressor;
        use crate::buffer::LanceBuffer;
        use crate::compression::MiniBlockDecompressor;
        use crate::data::DataBlock;

        // Case 1: 32-bit offsets
        {
            let decompressor = BinaryMiniBlockDecompressor {
                bits_per_offset: 32,
            };

            // Build a chunk with 2 values: 3 u32 offsets (12 bytes) then the data
            let mut test_data = Vec::new();
            test_data.extend_from_slice(&12u32.to_le_bytes()); // offsets[0]: values start after the 12 offset bytes
            test_data.extend_from_slice(&15u32.to_le_bytes()); // offsets[1]: first value is "ABC"
            test_data.extend_from_slice(&20u32.to_le_bytes()); // offsets[2]: second value is "XYZ" plus two bytes
            test_data.extend_from_slice(b"ABCXYZ");
            test_data.extend_from_slice(&[0, 0]);

            // Prepend one byte so the offsets are no longer 4-byte aligned
            let mut padded = Vec::with_capacity(test_data.len() + 1);
            padded.push(0xFF);
            padded.extend_from_slice(&test_data);

            let bytes = bytes::Bytes::from(padded);
            let misaligned = bytes.slice(1..);
            let buffer = LanceBuffer::from_bytes(misaligned, 1);

            let ptr = buffer.as_ref().as_ptr();
            assert_ne!(
                ptr.align_offset(4),
                0,
                "Test setup: buffer should be misaligned for u32"
            );

            let result = decompressor.decompress(vec![buffer], 2);
            assert!(
                result.is_ok(),
                "Decompression should succeed with misaligned buffer"
            );

            if let Ok(DataBlock::VariableWidth(block)) = result {
                assert_eq!(block.num_values, 2);
                assert_eq!(&block.data.as_ref()[..6], b"ABCXYZ");
            } else {
                panic!("Expected VariableWidth block");
            }
        }

        // Case 2: 64-bit offsets
        {
            let decompressor = BinaryMiniBlockDecompressor {
                bits_per_offset: 64,
            };

            // Build a chunk with 2 values: 3 u64 offsets (24 bytes) then the data
            let mut test_data = Vec::new();
            test_data.extend_from_slice(&24u64.to_le_bytes()); // offsets[0]: values start after the 24 offset bytes
            test_data.extend_from_slice(&29u64.to_le_bytes()); // offsets[1]: first value is "Hello"
            test_data.extend_from_slice(&40u64.to_le_bytes()); // offsets[2]: second value is "World" plus six bytes
            test_data.extend_from_slice(b"HelloWorld");
            test_data.extend_from_slice(&[0, 0, 0, 0, 0, 0]);

            // Prepend three bytes so the offsets are no longer 8-byte aligned
            let mut padded = Vec::with_capacity(test_data.len() + 3);
            padded.extend_from_slice(&[0xFF, 0xFF, 0xFF]);
            padded.extend_from_slice(&test_data);

            let bytes = bytes::Bytes::from(padded);
            let misaligned = bytes.slice(3..);
            let buffer = LanceBuffer::from_bytes(misaligned, 1);

            let ptr = buffer.as_ref().as_ptr();
            assert_ne!(
                ptr.align_offset(8),
                0,
                "Test setup: buffer should be misaligned for u64"
            );

            let result = decompressor.decompress(vec![buffer], 2);
            assert!(
                result.is_ok(),
                "Decompression should succeed with misaligned u64 buffer"
            );

            if let Ok(DataBlock::VariableWidth(block)) = result {
                assert_eq!(block.num_values, 2);
                assert_eq!(&block.data.as_ref()[..10], b"HelloWorld");
            } else {
                panic!("Expected VariableWidth block");
            }
        }
    }
}