1use arrow_array::OffsetSizeTrait;
13use byteorder::{ByteOrder, LittleEndian};
14use core::panic;
15
16use crate::compression::{
17 BlockCompressor, BlockDecompressor, MiniBlockDecompressor, VariablePerValueDecompressor,
18};
19
20use crate::buffer::LanceBuffer;
21use crate::data::{BlockInfo, DataBlock, VariableWidthBlock};
22use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
23use crate::encodings::logical::primitive::miniblock::{
24 MAX_MINIBLOCK_VALUES, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
25};
26use crate::format::pb21::CompressiveEncoding;
27use crate::format::pb21::compressive_encoding::Compression;
28use crate::format::{ProtobufUtils21, pb21};
29
30use lance_core::utils::bit::pad_bytes_to;
31use lance_core::{Error, Result};
32
/// Mini-block compressor for variable-width (binary / string) data.
///
/// Values are grouped into chunks whose serialized size is kept near
/// `minichunk_size` bytes; each chunk stores its own rebased offsets
/// followed by the value bytes (see `chunk_offsets`).
#[derive(Debug)]
pub struct BinaryMiniBlockEncoder {
    // Target serialized chunk size in bytes (defaults to `AIM_MINICHUNK_SIZE`).
    minichunk_size: i64,
}
37
38impl Default for BinaryMiniBlockEncoder {
39 fn default() -> Self {
40 Self {
41 minichunk_size: *AIM_MINICHUNK_SIZE,
42 }
43 }
44}
45
/// Default target size of a binary mini-block chunk, in bytes.
const DEFAULT_AIM_MINICHUNK_SIZE: i64 = 4 * 1024;

/// Target mini-block chunk size, overridable at process start through the
/// `LANCE_BINARY_MINIBLOCK_CHUNK_SIZE` environment variable.
///
/// A missing, non-unicode, or unparsable value silently falls back to
/// [`DEFAULT_AIM_MINICHUNK_SIZE`].
pub static AIM_MINICHUNK_SIZE: std::sync::LazyLock<i64> = std::sync::LazyLock::new(|| {
    std::env::var("LANCE_BINARY_MINIBLOCK_CHUNK_SIZE")
        .ok()
        .and_then(|raw| raw.parse::<i64>().ok())
        .unwrap_or(DEFAULT_AIM_MINICHUNK_SIZE)
});
54
/// Splits variable-width data into mini-block chunks.
///
/// Each produced chunk is laid out as `(num_values + 1)` offsets — rebased so
/// that the first value's bytes begin immediately after the offsets region —
/// followed by the value bytes, padded up to `alignment`.  Returns a single
/// concatenated buffer holding all chunks, plus per-chunk metadata.
fn chunk_offsets<N: OffsetSizeTrait>(
    offsets: &[N],
    data: &[u8],
    alignment: usize,
    minichunk_size: i64,
) -> (Vec<LanceBuffer>, Vec<MiniBlockChunk>) {
    // Book-keeping for one chunk, gathered during the boundary-finding pass.
    #[derive(Debug)]
    struct ChunkInfo {
        // Index (into `offsets`) of this chunk's first offset.
        chunk_start_offset_in_orig_idx: usize,
        // Index (into `offsets`) of this chunk's last offset (inclusive).
        chunk_last_offset_in_orig_idx: usize,
        // Byte position, within the chunk, where the value bytes begin
        // (i.e. the size of the chunk's offsets region).
        bytes_start_offset: usize,
        // Total chunk size after padding up to `alignment`.
        padded_chunk_size: usize,
    }

    let byte_width: usize = N::get_byte_width();
    let mut chunks_info = vec![];
    let mut chunks = vec![];
    let mut last_offset_in_orig_idx = 0;
    // First pass: decide chunk boundaries without copying any data.
    loop {
        let this_last_offset_in_orig_idx =
            search_next_offset_idx(offsets, last_offset_in_orig_idx, minichunk_size);

        let num_values_in_this_chunk = this_last_offset_in_orig_idx - last_offset_in_orig_idx;
        let chunk_bytes = offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx];
        // A chunk serializes (num_values + 1) offsets plus the raw value bytes.
        let this_chunk_size =
            (num_values_in_this_chunk + 1) * byte_width + chunk_bytes.to_usize().unwrap();

        let padded_chunk_size = this_chunk_size.next_multiple_of(alignment);
        debug_assert!(padded_chunk_size > 0);

        let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * byte_width;
        chunks_info.push(ChunkInfo {
            chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
            chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
            bytes_start_offset: this_chunk_bytes_start_offset,
            padded_chunk_size,
        });
        chunks.push(MiniBlockChunk {
            // log_num_values == 0 marks the final (possibly non-power-of-two)
            // chunk; every earlier chunk holds an exact power of two values
            // (guaranteed by `search_next_offset_idx`), so trailing_zeros is
            // its log2.
            log_num_values: if this_last_offset_in_orig_idx == offsets.len() - 1 {
                0
            } else {
                num_values_in_this_chunk.trailing_zeros() as u8
            },
            buffer_sizes: vec![padded_chunk_size as u32],
        });
        if this_last_offset_in_orig_idx == offsets.len() - 1 {
            break;
        }
        last_offset_in_orig_idx = this_last_offset_in_orig_idx;
    }

    let output_total_bytes = chunks_info
        .iter()
        .map(|chunk_info| chunk_info.padded_chunk_size)
        .sum::<usize>();

    let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

    // Second pass: serialize each chunk (rebased offsets, then bytes, then pad).
    for chunk in chunks_info {
        // Rebase offsets so the first value starts right after the offsets
        // region of this chunk.
        let this_chunk_offsets: Vec<N> = offsets
            [chunk.chunk_start_offset_in_orig_idx..=chunk.chunk_last_offset_in_orig_idx]
            .iter()
            .map(|offset| {
                *offset - offsets[chunk.chunk_start_offset_in_orig_idx]
                    + N::from_usize(chunk.bytes_start_offset).unwrap()
            })
            .collect();

        let this_chunk_offsets = LanceBuffer::reinterpret_vec(this_chunk_offsets);
        output.extend_from_slice(&this_chunk_offsets);

        let start_in_orig = offsets[chunk.chunk_start_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        let end_in_orig = offsets[chunk.chunk_last_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        output.extend_from_slice(&data[start_in_orig..end_in_orig]);

        // Arbitrary, recognizable filler byte; padding is never read back.
        const PAD_BYTE: u8 = 72;
        // `output` starts empty and every previous chunk was padded to
        // `alignment`, so padding relative to the global length is correct.
        let pad_len = pad_bytes_to(output.len(), alignment);

        if pad_len > 0_usize {
            output.extend(std::iter::repeat_n(PAD_BYTE, pad_len));
        }
    }
    (vec![LanceBuffer::reinterpret_vec(output)], chunks)
}
151
/// Finds the index (into `offsets`) of the last offset of the next chunk that
/// starts at `last_offset_idx`.
///
/// The candidate chunk grows by doubling its value count, so every non-final
/// chunk holds a power-of-two number of values whose serialized size
/// (offsets + value bytes) fits within `minichunk_size` where possible.
/// Returning `offsets.len() - 1` means "take everything that remains" and
/// produces the final chunk.
fn search_next_offset_idx<N: OffsetSizeTrait>(
    offsets: &[N],
    last_offset_idx: usize,
    minichunk_size: i64,
) -> usize {
    // Number of values left after `last_offset_idx` (offsets has one extra entry).
    let remaining_values = offsets.len().saturating_sub(last_offset_idx + 1);
    if remaining_values <= 1 {
        return offsets.len() - 1;
    }

    // `num_values` is the last accepted power-of-two count; `new_num_values`
    // the next candidate to try.
    let mut num_values = 2;
    let mut new_num_values = num_values * 2;
    loop {
        if last_offset_idx + new_num_values >= offsets.len() {
            // The candidate overshoots the end: either the whole remainder
            // fits in one chunk, or we fall back to the last accepted count.
            let existing_bytes = offsets[offsets.len() - 1] - offsets[last_offset_idx];
            // (offsets.len() - last_offset_idx) == remaining values + 1,
            // i.e. the number of offsets the chunk would serialize.
            let new_size = existing_bytes
                + N::from_usize((offsets.len() - last_offset_idx) * N::get_byte_width()).unwrap();
            if new_size.to_i64().unwrap() <= minichunk_size {
                return offsets.len() - 1;
            } else {
                return last_offset_idx + num_values;
            }
        }
        let existing_bytes = offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx];
        let new_size =
            existing_bytes + N::from_usize((new_num_values + 1) * N::get_byte_width()).unwrap();
        if new_size.to_i64().unwrap() <= minichunk_size {
            // Candidate fits; stop doubling once the next step would exceed
            // the per-chunk value limit.
            if new_num_values * 2 > MAX_MINIBLOCK_VALUES as usize {
                break;
            }
            num_values = new_num_values;
            new_num_values *= 2;
        } else {
            break;
        }
    }
    last_offset_idx + num_values
}
201
202impl BinaryMiniBlockEncoder {
203 pub fn new(minichunk_size: Option<i64>) -> Self {
204 Self {
205 minichunk_size: minichunk_size.unwrap_or(*AIM_MINICHUNK_SIZE),
206 }
207 }
208
209 fn chunk_data(&self, data: VariableWidthBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
213 match data.bits_per_offset {
216 32 => {
217 let offsets = data.offsets.borrow_to_typed_slice::<i32>();
218 let (buffers, chunks) =
219 chunk_offsets(offsets.as_ref(), &data.data, 4, self.minichunk_size);
220 (
221 MiniBlockCompressed {
222 data: buffers,
223 chunks,
224 num_values: data.num_values,
225 },
226 ProtobufUtils21::variable(ProtobufUtils21::flat(32, None), None),
227 )
228 }
229 64 => {
230 let offsets = data.offsets.borrow_to_typed_slice::<i64>();
231 let (buffers, chunks) =
232 chunk_offsets(offsets.as_ref(), &data.data, 8, self.minichunk_size);
233 (
234 MiniBlockCompressed {
235 data: buffers,
236 chunks,
237 num_values: data.num_values,
238 },
239 ProtobufUtils21::variable(ProtobufUtils21::flat(64, None), None),
240 )
241 }
242 _ => panic!("Unsupported bits_per_offset={}", data.bits_per_offset),
243 }
244 }
245}
246
247impl MiniBlockCompressor for BinaryMiniBlockEncoder {
248 fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
249 match data {
250 DataBlock::VariableWidth(variable_width) => Ok(self.chunk_data(variable_width)),
251 _ => Err(Error::invalid_input_source(
252 format!(
253 "Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
254 data.name()
255 )
256 .into(),
257 )),
258 }
259 }
260}
261
/// Decompressor counterpart of `BinaryMiniBlockEncoder`.
#[derive(Debug)]
pub struct BinaryMiniBlockDecompressor {
    // Width of the stored offsets: 32 or 64.
    bits_per_offset: u8,
}
266
267impl BinaryMiniBlockDecompressor {
268 pub fn new(bits_per_offset: u8) -> Self {
269 assert!(bits_per_offset == 32 || bits_per_offset == 64);
270 Self { bits_per_offset }
271 }
272
273 pub fn from_variable(variable: &pb21::Variable) -> Self {
274 if let Compression::Flat(flat) = variable
275 .offsets
276 .as_ref()
277 .unwrap()
278 .compression
279 .as_ref()
280 .unwrap()
281 {
282 Self {
283 bits_per_offset: flat.bits_per_value as u8,
284 }
285 } else {
286 panic!("Unsupported offsets compression: {:?}", variable.offsets);
287 }
288 }
289}
290
impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
    /// Decompresses one binary mini-block chunk.
    ///
    /// A chunk buffer holds `(num_values + 1)` offsets — the first entry being
    /// the byte position where the value bytes begin — followed by the value
    /// bytes.  The offsets are rebased to start at zero and the value bytes
    /// are copied out (dropping any chunk padding).
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        // Binary mini-blocks always use exactly one buffer per chunk.
        assert_eq!(data.len(), 1);
        let data = data.into_iter().next().unwrap();

        if self.bits_per_offset == 64 {
            // Need at least two u64 offsets (i.e. at least one value).
            assert!(data.len() >= 16);

            let offsets_buffer = data.borrow_to_typed_slice::<u64>();
            let offsets = offsets_buffer.as_ref();

            // Rebase so the first value starts at byte 0 of the output data.
            let result_offsets = offsets[0..(num_values + 1) as usize]
                .iter()
                .map(|offset| offset - offsets[0])
                .collect::<Vec<u64>>();

            Ok(DataBlock::VariableWidth(VariableWidthBlock {
                // Copy only the value-byte region [offsets[0], offsets[num_values]).
                data: LanceBuffer::from(
                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
                ),
                offsets: LanceBuffer::reinterpret_vec(result_offsets),
                bits_per_offset: 64,
                num_values,
                block_info: BlockInfo::new(),
            }))
        } else {
            // 32-bit offsets: need at least two u32 offsets (one value).
            assert!(data.len() >= 8);

            let offsets_buffer = data.borrow_to_typed_slice::<u32>();
            let offsets = offsets_buffer.as_ref();

            // Rebase so the first value starts at byte 0 of the output data.
            let result_offsets = offsets[0..(num_values + 1) as usize]
                .iter()
                .map(|offset| offset - offsets[0])
                .collect::<Vec<u32>>();

            Ok(DataBlock::VariableWidth(VariableWidthBlock {
                // Copy only the value-byte region [offsets[0], offsets[num_values]).
                data: LanceBuffer::from(
                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
                ),
                offsets: LanceBuffer::reinterpret_vec(result_offsets),
                bits_per_offset: 32,
                num_values,
                block_info: BlockInfo::new(),
            }))
        }
    }
}
345
/// Encoder that stores variable-width data essentially as-is: as a block
/// compressor it prepends a small self-describing header (offset width and
/// byte start); as a per-value compressor it passes the block through.
#[derive(Debug, Default)]
pub struct VariableEncoder {}
357
358impl BlockCompressor for VariableEncoder {
359 fn compress(&self, mut data: DataBlock) -> Result<LanceBuffer> {
360 match data {
361 DataBlock::VariableWidth(ref mut variable_width_data) => {
362 match variable_width_data.bits_per_offset {
363 32 => {
364 let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u32>();
365 let offsets = offsets.as_ref();
366 let bytes_start_offset = 4 + 4 + std::mem::size_of_val(offsets) as u32;
369
370 let output_total_bytes =
371 bytes_start_offset as usize + variable_width_data.data.len();
372 let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
373
374 output.extend_from_slice(&(32_u32).to_le_bytes());
376
377 output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
379
380 output.extend_from_slice(&variable_width_data.offsets);
382
383 output.extend_from_slice(&variable_width_data.data);
385 Ok(LanceBuffer::from(output))
386 }
387 64 => {
388 let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u64>();
389 let offsets = offsets.as_ref();
390 let bytes_start_offset = 8 + 8 + std::mem::size_of_val(offsets) as u64;
393
394 let output_total_bytes =
395 bytes_start_offset as usize + variable_width_data.data.len();
396 let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);
397
398 output.extend_from_slice(&(64_u64).to_le_bytes());
400
401 output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
403
404 output.extend_from_slice(&variable_width_data.offsets);
406
407 output.extend_from_slice(&variable_width_data.data);
409 Ok(LanceBuffer::from(output))
410 }
411 _ => {
412 panic!(
413 "BinaryBlockEncoder does not work with {} bits per offset VariableWidth DataBlock.",
414 variable_width_data.bits_per_offset
415 );
416 }
417 }
418 }
419 _ => {
420 panic!("BinaryBlockEncoder can only work with Variable Width DataBlock.");
421 }
422 }
423 }
424}
425
426impl PerValueCompressor for VariableEncoder {
427 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
428 let DataBlock::VariableWidth(variable) = data else {
429 panic!("BinaryPerValueCompressor can only work with Variable Width DataBlock.");
430 };
431
432 let encoding = ProtobufUtils21::variable(
433 ProtobufUtils21::flat(variable.bits_per_offset as u64, None),
434 None,
435 );
436 Ok((PerValueDataBlock::Variable(variable), encoding))
437 }
438}
439
/// Per-value decompressor for variable-width data stored uncompressed.
#[derive(Debug, Default)]
pub struct VariableDecoder {}
442
443impl VariablePerValueDecompressor for VariableDecoder {
444 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
445 Ok(DataBlock::VariableWidth(data))
446 }
447}
448
/// Block decompressor counterpart of `VariableEncoder`'s `BlockCompressor`
/// impl; also understands a legacy header layout (see `decompress`).
#[derive(Debug, Default)]
pub struct BinaryBlockDecompressor {}
451
impl BlockDecompressor for BinaryBlockDecompressor {
    /// Decodes a buffer produced by `VariableEncoder` (new scheme) or by a
    /// legacy layout that also embedded `num_values` in the header.
    ///
    /// Old scheme: `[bits_per_offset: u8][num_values: u32|u64][bytes_start: u32|u64]`.
    /// New scheme: `[bits_per_offset: u32|u64 LE][bytes_start: u32|u64 LE]`.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        // In the new scheme bits_per_offset is 32 or 64 stored little-endian,
        // so bytes 1..4 are always zero; any non-zero byte there implies the
        // old scheme (whose bytes 1..4 hold the low bytes of num_values).
        // NOTE(review): an old-scheme buffer whose num_values is a multiple of
        // 2^24 would be misdetected as new-scheme -- presumably such pages do
        // not occur; confirm against the legacy writer.
        let is_old_scheme = data[1] != 0 || data[2] != 0 || data[3] != 0;

        let (bits_per_offset, bytes_start_offset, offset_start) = if is_old_scheme {
            // Old scheme: single-byte offset width, then num_values, then
            // the byte position where the value bytes begin.
            let bits_per_offset = data[0];
            match bits_per_offset {
                32 => {
                    debug_assert_eq!(LittleEndian::read_u32(&data[1..5]), num_values as u32);
                    let bytes_start_offset = LittleEndian::read_u32(&data[5..9]);
                    // Offsets region starts right after the 9-byte header.
                    (bits_per_offset, bytes_start_offset as u64, 9)
                }
                64 => {
                    debug_assert_eq!(LittleEndian::read_u64(&data[1..9]), num_values);
                    let bytes_start_offset = LittleEndian::read_u64(&data[9..17]);
                    // Offsets region starts right after the 17-byte header.
                    (bits_per_offset, bytes_start_offset, 17)
                }
                _ => {
                    return Err(Error::invalid_input_source(
                        format!("Unsupported bits_per_offset={}", bits_per_offset).into(),
                    ));
                }
            }
        } else {
            // New scheme: offset width stored as a full word; reading the
            // first 4 bytes is enough since the value is 32 or 64.
            let bits_per_offset = LittleEndian::read_u32(&data[0..4]) as u8;
            match bits_per_offset {
                32 => {
                    let bytes_start_offset = LittleEndian::read_u32(&data[4..8]);
                    // Offsets region starts right after the 8-byte header.
                    (bits_per_offset, bytes_start_offset as u64, 8)
                }
                64 => {
                    let bytes_start_offset = LittleEndian::read_u64(&data[8..16]);
                    // Offsets region starts right after the 16-byte header.
                    (bits_per_offset, bytes_start_offset, 16)
                }
                _ => {
                    return Err(Error::invalid_input_source(
                        format!("Unsupported bits_per_offset={}", bits_per_offset).into(),
                    ));
                }
            }
        };

        // Offsets occupy [offset_start, bytes_start_offset) ...
        let offsets =
            data.slice_with_length(offset_start, bytes_start_offset as usize - offset_start);

        // ... and the value bytes take the rest of the buffer.
        let data = data.slice_with_length(
            bytes_start_offset as usize,
            data.len() - bytes_start_offset as usize,
        );

        Ok(DataBlock::VariableWidth(VariableWidthBlock {
            data,
            offsets,
            bits_per_offset,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}
526
#[cfg(test)]
pub mod tests {
    use arrow_array::{
        ArrayRef, StringArray,
        builder::{LargeStringBuilder, StringBuilder},
    };
    use arrow_schema::{DataType, Field};

    use crate::{
        constants::{
            COMPRESSION_META_KEY, STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
            STRUCTURAL_ENCODING_MINIBLOCK,
        },
        testing::check_specific_random,
    };
    use rstest::rstest;
    use std::{collections::HashMap, sync::Arc, vec};

    use crate::{
        testing::{
            FnArrayGeneratorProvider, TestCases, check_basic_random,
            check_round_trip_encoding_of_data,
        },
        version::LanceFileVersion,
    };

    // Round-trips random utf8 data through the 2.1 file version.
    #[test_log::test(tokio::test)]
    async fn test_utf8_binary() {
        let field = Field::new("", DataType::Utf8, false);
        check_specific_random(
            field,
            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
        )
        .await;
    }

    // Random binary/utf8 round trip under both structural encodings.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let field = Field::new("", data_type, false).with_metadata(field_metadata);
        check_basic_random(field).await;
    }

    // Same as above but forcing FSST compression via field metadata.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary_fsst(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Binary, DataType::Utf8)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
        check_specific_random(field, test_cases).await;
    }

    // FSST with large (64-bit offset) binary/utf8 types.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_fsst_large_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::LargeBinary, DataType::LargeUtf8)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        check_specific_random(
            field,
            TestCases::basic().with_min_file_version(LanceFileVersion::V2_1),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_large_binary() {
        let field = Field::new("", DataType::LargeBinary, true);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_large_utf8() {
        let field = Field::new("", DataType::LargeUtf8, true);
        check_basic_random(field).await;
    }

    // Short generated strings ("user_<counter>") under both structural encodings.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_small_strings(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        use crate::testing::check_basic_generated;

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        let field = Field::new("", DataType::Utf8, true).with_metadata(field_metadata);
        check_basic_generated(
            field,
            Box::new(FnArrayGeneratorProvider::new(move || {
                lance_datagen::array::utf8_prefix_plus_counter("user_", false)
            })),
        )
        .await;
    }

    // Small hand-built array with nulls; exercises ranged and indexed reads.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_binary(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
        let string_array = arrow_cast::cast(&string_array, &data_type).unwrap();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![0, 1, 3, 4]);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            field_metadata,
        )
        .await;
    }

    // Arrays produced by slicing (non-zero offset into the parent buffers).
    #[test_log::test(tokio::test)]
    async fn test_sliced_utf8() {
        let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]);
        let string_array = string_array.slice(1, 3);

        let test_cases = TestCases::default()
            .with_range(0..1)
            .with_range(0..2)
            .with_range(1..2);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    // Values larger than a page force the encoder down its oversized paths.
    #[test_log::test(tokio::test)]
    async fn test_bigger_than_max_page_size() {
        // One 32 MiB value with a 1 MiB page-size cap.
        let big_string = String::from_iter((0..(32 * 1024 * 1024)).map(|_| '0'));
        let string_array = StringArray::from(vec![
            Some(big_string),
            Some("abc".to_string()),
            None,
            None,
            Some("xyz".to_string()),
        ]);

        let test_cases = TestCases::default().with_max_page_size(1024 * 1024);

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;

        // 90 values of ~1 MB each with the default page size.
        let big_string = String::from_iter((0..(1000 * 1000)).map(|_| '0'));
        let string_array = StringArray::from_iter_values((0..90).map(|_| big_string.clone()));

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &TestCases::default(),
            HashMap::new(),
        )
        .await;
    }

    // Empty strings mixed with nulls, in every relative order.
    #[test_log::test(tokio::test)]
    async fn test_empty_strings() {
        let values = [Some("abc"), Some(""), None];
        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
            let mut string_builder = StringBuilder::new();
            for idx in order {
                string_builder.append_option(values[idx]);
            }
            let string_array = Arc::new(string_builder.finish());
            let test_cases = TestCases::default()
                .with_indices(vec![1])
                .with_indices(vec![0])
                .with_indices(vec![2])
                .with_indices(vec![0, 1]);
            check_round_trip_encoding_of_data(
                vec![string_array.clone()],
                &test_cases,
                HashMap::new(),
            )
            .await;
            let test_cases = test_cases.with_batch_size(1);
            check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new())
                .await;
        }

        // All-empty / all-null array (zero data bytes overall).
        let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    // ~5 GB of data; too slow/heavy for CI, hence #[ignore].
    #[test_log::test(tokio::test)]
    #[ignore]
    async fn test_jumbo_string() {
        let mut string_builder = LargeStringBuilder::new();
        let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
        for _ in 0..5000 {
            string_builder.append_option(Some(&giant_string));
        }
        let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
        let arrs = vec![giant_array];

        // Validation would be too expensive at this size.
        let test_cases = TestCases::default().without_validation();
        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
    }

    // Highly repetitive data at several cardinalities, with and without nulls.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary_dictionary_encoding(
        #[values(true, false)] with_nulls: bool,
        #[values(100, 500, 35000)] dict_size: u32,
    ) {
        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
        let strings = (0..dict_size)
            .map(|i| i.to_string())
            .collect::<Vec<String>>();

        let repeated_strings: Vec<_> = strings
            .iter()
            .cycle()
            .take(70000)
            .enumerate()
            .map(|(i, s)| {
                if with_nulls && i % 7 == 0 {
                    None
                } else {
                    Some(s.clone())
                }
            })
            .collect();
        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    // Asserts the chosen encoding is "variable" in 2.1 files.
    #[test_log::test(tokio::test)]
    async fn test_binary_encoding_verification() {
        use lance_datagen::{ByteCount, RowCount};

        let test_cases = TestCases::default()
            .with_expected_encoding("variable")
            .with_min_file_version(LanceFileVersion::V2_1);

        let arr_small = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(10), false))
            .into_batch_rows(RowCount::from(1000))
            .unwrap()
            .column(0)
            .clone();
        check_round_trip_encoding_of_data(vec![arr_small], &test_cases, HashMap::new()).await;

        // Explicitly disabling compression must still select "variable".
        let metadata_explicit =
            HashMap::from([("lance-encoding:compression".to_string(), "none".to_string())]);
        let arr_large = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(50), false))
            .into_batch_rows(RowCount::from(2000))
            .unwrap()
            .column(0)
            .clone();
        check_round_trip_encoding_of_data(vec![arr_large], &test_cases, metadata_explicit).await;
    }

    // Regression test: the decompressor must tolerate buffers whose start is
    // not aligned to the offset width (borrow_to_typed_slice should copy).
    #[test]
    fn test_binary_miniblock_with_misaligned_buffer() {
        use super::BinaryMiniBlockDecompressor;
        use crate::buffer::LanceBuffer;
        use crate::compression::MiniBlockDecompressor;
        use crate::data::DataBlock;

        {
            let decompressor = BinaryMiniBlockDecompressor {
                bits_per_offset: 32,
            };

            // Chunk layout: three u32 offsets (12 = 3 * 4 is where the value
            // bytes begin), values "ABC" (12..15) and "XYZ..." (15..20),
            // then two filler bytes.
            let mut test_data = Vec::new();
            test_data.extend_from_slice(&12u32.to_le_bytes());
            test_data.extend_from_slice(&15u32.to_le_bytes());
            test_data.extend_from_slice(&20u32.to_le_bytes());
            test_data.extend_from_slice(b"ABCXYZ");
            test_data.extend_from_slice(&[0, 0]);
            // Prepend one byte and slice it off so the chunk starts at an
            // address that is not 4-byte aligned.
            let mut padded = Vec::with_capacity(test_data.len() + 1);
            padded.push(0xFF);
            padded.extend_from_slice(&test_data);

            let bytes = bytes::Bytes::from(padded);
            let misaligned = bytes.slice(1..);
            let buffer = LanceBuffer::from_bytes(misaligned, 1);

            let ptr = buffer.as_ref().as_ptr();
            assert_ne!(
                ptr.align_offset(4),
                0,
                "Test setup: buffer should be misaligned for u32"
            );

            let result = decompressor.decompress(vec![buffer], 2);
            assert!(
                result.is_ok(),
                "Decompression should succeed with misaligned buffer"
            );

            if let Ok(DataBlock::VariableWidth(block)) = result {
                assert_eq!(block.num_values, 2);
                assert_eq!(&block.data.as_ref()[..6], b"ABCXYZ");
            } else {
                panic!("Expected VariableWidth block");
            }
        }

        {
            let decompressor = BinaryMiniBlockDecompressor {
                bits_per_offset: 64,
            };

            // Chunk layout: three u64 offsets (24 = 3 * 8 is where the value
            // bytes begin), values "Hello" (24..29) and "World..." (29..40),
            // then six filler bytes.
            let mut test_data = Vec::new();
            test_data.extend_from_slice(&24u64.to_le_bytes());
            test_data.extend_from_slice(&29u64.to_le_bytes());
            test_data.extend_from_slice(&40u64.to_le_bytes());
            test_data.extend_from_slice(b"HelloWorld");
            test_data.extend_from_slice(&[0, 0, 0, 0, 0, 0]);
            // Prepend three bytes and slice them off so the chunk starts at
            // an address that is not 8-byte aligned.
            let mut padded = Vec::with_capacity(test_data.len() + 3);
            padded.extend_from_slice(&[0xFF, 0xFF, 0xFF]);
            padded.extend_from_slice(&test_data);

            let bytes = bytes::Bytes::from(padded);
            let misaligned = bytes.slice(3..);
            let buffer = LanceBuffer::from_bytes(misaligned, 1);

            let ptr = buffer.as_ref().as_ptr();
            assert_ne!(
                ptr.align_offset(8),
                0,
                "Test setup: buffer should be misaligned for u64"
            );

            let result = decompressor.decompress(vec![buffer], 2);
            assert!(
                result.is_ok(),
                "Decompression should succeed with misaligned u64 buffer"
            );

            if let Ok(DataBlock::VariableWidth(block)) = result {
                assert_eq!(block.num_values, 2);
                assert_eq!(&block.data.as_ref()[..10], b"HelloWorld");
            } else {
                panic!("Expected VariableWidth block");
            }
        }
    }
}