use arrow_array::OffsetSizeTrait;
use bytemuck::cast_slice;
use byteorder::{ByteOrder, LittleEndian};
use core::panic;
use snafu::location;

use crate::compression::{
    BlockCompressor, BlockDecompressor, MiniBlockDecompressor, VariablePerValueDecompressor,
};

use crate::buffer::LanceBuffer;
use crate::data::{BlockInfo, DataBlock, VariableWidthBlock};
use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
use crate::encodings::logical::primitive::miniblock::{
    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
};
use crate::format::pb21::compressive_encoding::Compression;
use crate::format::pb21::CompressiveEncoding;
use crate::format::{pb21, ProtobufUtils21};

use lance_core::utils::bit::pad_bytes_to;
use lance_core::{Error, Result};
#[derive(Debug, Default)]
pub struct BinaryMiniBlockEncoder {}

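// Each mini-block chunk aims to stay within this many bytes, counting both the chunk's
// offsets and its value bytes.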
const AIM_MINICHUNK_SIZE: i64 = 4 * 1024;

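/// Splits `offsets` / `data` into mini-block chunks of roughly `AIM_MINICHUNK_SIZE` bytes.
///
/// Each chunk stores its own offset array (rebased so that the first value's bytes start
/// immediately after the offsets) followed by the value bytes, padded out to `alignment`.
/// Returns a single concatenated buffer plus the per-chunk metadata.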
fn chunk_offsets<N: OffsetSizeTrait>(
    offsets: &[N],
    data: &[u8],
    alignment: usize,
) -> (Vec<LanceBuffer>, Vec<MiniBlockChunk>) {
    #[derive(Debug)]
    struct ChunkInfo {
        // Index into `offsets` where this chunk starts.
        chunk_start_offset_in_orig_idx: usize,
        // Index into `offsets` of the last offset covered by this chunk.
        chunk_last_offset_in_orig_idx: usize,
        // Offset (within the chunk) at which the value bytes begin, i.e. right after the
        // chunk's own offset array.
        bytes_start_offset: usize,
        // Total chunk size after padding to `alignment`.
        padded_chunk_size: usize,
    }

    let byte_width: usize = N::get_byte_width();
    let mut chunks_info = vec![];
    let mut chunks = vec![];
    let mut last_offset_in_orig_idx = 0;
    loop {
        let this_last_offset_in_orig_idx = search_next_offset_idx(offsets, last_offset_in_orig_idx);

        let num_values_in_this_chunk = this_last_offset_in_orig_idx - last_offset_in_orig_idx;
        let chunk_bytes = offsets[this_last_offset_in_orig_idx] - offsets[last_offset_in_orig_idx];
        let this_chunk_size =
            (num_values_in_this_chunk + 1) * byte_width + chunk_bytes.to_usize().unwrap();

        let padded_chunk_size = this_chunk_size.next_multiple_of(alignment);

        let this_chunk_bytes_start_offset = (num_values_in_this_chunk + 1) * byte_width;
        chunks_info.push(ChunkInfo {
            chunk_start_offset_in_orig_idx: last_offset_in_orig_idx,
            chunk_last_offset_in_orig_idx: this_last_offset_in_orig_idx,
            bytes_start_offset: this_chunk_bytes_start_offset,
            padded_chunk_size,
        });
        chunks.push(MiniBlockChunk {
            // The final chunk reports `log_num_values == 0`; every other chunk holds a
            // power-of-two number of values.
            log_num_values: if this_last_offset_in_orig_idx == offsets.len() - 1 {
                0
            } else {
                num_values_in_this_chunk.trailing_zeros() as u8
            },
            buffer_sizes: vec![padded_chunk_size as u16],
        });
        if this_last_offset_in_orig_idx == offsets.len() - 1 {
            break;
        }
        last_offset_in_orig_idx = this_last_offset_in_orig_idx;
    }

    let output_total_bytes = chunks_info
        .iter()
        .map(|chunk_info| chunk_info.padded_chunk_size)
        .sum::<usize>();

    let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

    for chunk in chunks_info {
        // Rebase this chunk's offsets so they point into the chunk itself.
        let this_chunk_offsets: Vec<N> = offsets
            [chunk.chunk_start_offset_in_orig_idx..=chunk.chunk_last_offset_in_orig_idx]
            .iter()
            .map(|offset| {
                *offset - offsets[chunk.chunk_start_offset_in_orig_idx]
                    + N::from_usize(chunk.bytes_start_offset).unwrap()
            })
            .collect();

        let this_chunk_offsets = LanceBuffer::reinterpret_vec(this_chunk_offsets);
        output.extend_from_slice(&this_chunk_offsets);

        let start_in_orig = offsets[chunk.chunk_start_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        let end_in_orig = offsets[chunk.chunk_last_offset_in_orig_idx]
            .to_usize()
            .unwrap();
        output.extend_from_slice(&data[start_in_orig..end_in_orig]);

        // Pad each chunk out to `alignment` so the next chunk's offsets stay aligned.  The
        // pad contents are never interpreted.
        const PAD_BYTE: u8 = 72;
        let pad_len = pad_bytes_to(output.len(), alignment);

        if pad_len > 0_usize {
            output.extend(std::iter::repeat_n(PAD_BYTE, pad_len));
        }
    }
    (vec![LanceBuffer::reinterpret_vec(output)], chunks)
}

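/// Finds the index of the last offset to include in the next mini-chunk, starting from
/// `last_offset_idx` and growing the candidate value count by powers of two while the chunk
/// (offsets plus value bytes) stays within `AIM_MINICHUNK_SIZE`.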
fn search_next_offset_idx<N: OffsetSizeTrait>(offsets: &[N], last_offset_idx: usize) -> usize {
    let mut num_values = 1;
    let mut new_num_values = num_values * 2;
    loop {
        if last_offset_idx + new_num_values >= offsets.len() {
            // Not enough values left to double again; take the rest of the array if it fits,
            // otherwise fall back to the previous count.
            let existing_bytes = offsets[offsets.len() - 1] - offsets[last_offset_idx];
            let new_size = existing_bytes
                + N::from_usize((offsets.len() - last_offset_idx) * N::get_byte_width()).unwrap();
            if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE {
                return offsets.len() - 1;
            } else {
                return last_offset_idx + num_values;
            }
        }
        let existing_bytes = offsets[last_offset_idx + new_num_values] - offsets[last_offset_idx];
        let new_size =
            existing_bytes + N::from_usize((new_num_values + 1) * N::get_byte_width()).unwrap();
        if new_size.to_i64().unwrap() <= AIM_MINICHUNK_SIZE {
            num_values = new_num_values;
            new_num_values *= 2;
        } else {
            break;
        }
    }
    last_offset_idx + new_num_values
}

impl BinaryMiniBlockEncoder {
    fn chunk_data(&self, data: VariableWidthBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
        // Chunks are padded to the offset width: 4 bytes for 32-bit offsets, 8 bytes for
        // 64-bit offsets.
        match data.bits_per_offset {
            32 => {
                let offsets = data.offsets.borrow_to_typed_slice::<i32>();
                let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 4);
                (
                    MiniBlockCompressed {
                        data: buffers,
                        chunks,
                        num_values: data.num_values,
                    },
                    ProtobufUtils21::variable(ProtobufUtils21::flat(32, None), None),
                )
            }
            64 => {
                let offsets = data.offsets.borrow_to_typed_slice::<i64>();
                let (buffers, chunks) = chunk_offsets(offsets.as_ref(), &data.data, 8);
                (
                    MiniBlockCompressed {
                        data: buffers,
                        chunks,
                        num_values: data.num_values,
                    },
                    ProtobufUtils21::variable(ProtobufUtils21::flat(64, None), None),
                )
            }
            _ => panic!("Unsupported bits_per_offset={}", data.bits_per_offset),
        }
    }
}

impl MiniBlockCompressor for BinaryMiniBlockEncoder {
    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
        match data {
            DataBlock::VariableWidth(variable_width) => Ok(self.chunk_data(variable_width)),
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
                    data.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }
}

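/// Decompresses mini-block chunks produced by [`BinaryMiniBlockEncoder`].
///
/// Each chunk begins with `num_values + 1` offsets (32- or 64-bit) followed by the value
/// bytes; the offsets are relative to the start of the chunk.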
#[derive(Debug)]
pub struct BinaryMiniBlockDecompressor {
    bits_per_offset: u8,
}

impl BinaryMiniBlockDecompressor {
    pub fn new(bits_per_offset: u8) -> Self {
        assert!(bits_per_offset == 32 || bits_per_offset == 64);
        Self { bits_per_offset }
    }

    pub fn from_variable(variable: &pb21::Variable) -> Self {
        if let Compression::Flat(flat) = variable
            .offsets
            .as_ref()
            .unwrap()
            .compression
            .as_ref()
            .unwrap()
        {
            Self {
                bits_per_offset: flat.bits_per_value as u8,
            }
        } else {
            panic!("Unsupported offsets compression: {:?}", variable.offsets);
        }
    }
}

impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        assert_eq!(data.len(), 1);
        let data = data.into_iter().next().unwrap();

        if self.bits_per_offset == 64 {
            // A chunk must contain at least two 64-bit offsets.
            assert!(data.len() >= 16);

            let offsets_buffer = data.borrow_to_typed_slice::<u64>();
            let offsets = offsets_buffer.as_ref();

            // Rebase the offsets so the returned block starts at zero.
            let result_offsets = offsets[0..(num_values + 1) as usize]
                .iter()
                .map(|offset| offset - offsets[0])
                .collect::<Vec<u64>>();

            Ok(DataBlock::VariableWidth(VariableWidthBlock {
                data: LanceBuffer::from(
                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
                ),
                offsets: LanceBuffer::reinterpret_vec(result_offsets),
                bits_per_offset: 64,
                num_values,
                block_info: BlockInfo::new(),
            }))
        } else {
            // A chunk must contain at least two 32-bit offsets.
            assert!(data.len() >= 8);

            let offsets_buffer = data.borrow_to_typed_slice::<u32>();
            let offsets = offsets_buffer.as_ref();

            // Rebase the offsets so the returned block starts at zero.
            let result_offsets = offsets[0..(num_values + 1) as usize]
                .iter()
                .map(|offset| offset - offsets[0])
                .collect::<Vec<u32>>();

            Ok(DataBlock::VariableWidth(VariableWidthBlock {
                data: LanceBuffer::from(
                    data[offsets[0] as usize..offsets[num_values as usize] as usize].to_vec(),
                ),
                offsets: LanceBuffer::reinterpret_vec(result_offsets),
                bits_per_offset: 32,
                num_values,
                block_info: BlockInfo::new(),
            }))
        }
    }
}

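/// Encodes a `VariableWidthBlock` as a single self-describing block:
///
/// ```text
/// [bits_per_offset: u8] [num_values: u32|u64 LE] [bytes_start_offset: u32|u64 LE]
/// [offsets ...] [value bytes ...]
/// ```
///
/// `bytes_start_offset` is the byte position at which the value bytes begin, i.e. the size
/// of the header plus the offsets array.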
#[derive(Debug, Default)]
pub struct VariableEncoder {}

impl BlockCompressor for VariableEncoder {
    fn compress(&self, mut data: DataBlock) -> Result<LanceBuffer> {
        match data {
            DataBlock::VariableWidth(ref mut variable_width_data) => {
                let num_values: u64 = variable_width_data.num_values;
                match variable_width_data.bits_per_offset {
                    32 => {
                        let num_values: u32 = num_values
                            .try_into()
                            .expect("The maximum number of values BinaryBlockEncoder can work with is u32::MAX");

                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u32>();
                        let offsets = offsets.as_ref();
                        // Header: 1 byte tag + 4 byte num_values + 4 byte bytes_start_offset,
                        // followed by the offsets array.
                        let bytes_start_offset = 1 + 4 + 4 + std::mem::size_of_val(offsets) as u32;

                        let output_total_bytes =
                            bytes_start_offset as usize + variable_width_data.data.len();
                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

                        // Tag byte recording the offset width.
                        output.push(32_u8);
                        output.extend_from_slice(&(num_values).to_le_bytes());
                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
                        output.extend_from_slice(cast_slice(offsets));
                        output.extend_from_slice(&variable_width_data.data);
                        Ok(LanceBuffer::from(output))
                    }
                    64 => {
                        let offsets = variable_width_data.offsets.borrow_to_typed_slice::<u64>();
                        let offsets = offsets.as_ref();
                        // Header: 1 byte tag + 8 byte num_values + 8 byte bytes_start_offset,
                        // followed by the offsets array.
                        let bytes_start_offset = 1 + 8 + 8 + std::mem::size_of_val(offsets) as u64;

                        let output_total_bytes =
                            bytes_start_offset as usize + variable_width_data.data.len();
                        let mut output: Vec<u8> = Vec::with_capacity(output_total_bytes);

                        // Tag byte recording the offset width.
                        output.push(64_u8);
                        output.extend_from_slice(&(num_values).to_le_bytes());
                        output.extend_from_slice(&(bytes_start_offset).to_le_bytes());
                        output.extend_from_slice(cast_slice(offsets));
                        output.extend_from_slice(&variable_width_data.data);
                        Ok(LanceBuffer::from(output))
                    }
                    _ => {
                        panic!(
                            "BinaryBlockEncoder does not work with {} bits per offset VariableWidth DataBlock.",
                            variable_width_data.bits_per_offset
                        );
                    }
                }
            }
            _ => {
                panic!("BinaryBlockEncoder can only work with VariableWidth DataBlock.");
            }
        }
    }
}

impl PerValueCompressor for VariableEncoder {
    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
        let DataBlock::VariableWidth(variable) = data else {
            panic!("BinaryPerValueCompressor can only work with VariableWidth DataBlock.");
        };

        let encoding = ProtobufUtils21::variable(
            ProtobufUtils21::flat(variable.bits_per_offset as u64, None),
            None,
        );
        Ok((PerValueDataBlock::Variable(variable), encoding))
    }
}

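/// Per-value decompressor for uncompressed variable-width data. The values are already laid
/// out as a `VariableWidthBlock`, so decompression is a pass-through.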
#[derive(Debug, Default)]
pub struct VariableDecoder {}

impl VariablePerValueDecompressor for VariableDecoder {
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
        Ok(DataBlock::VariableWidth(data))
    }
}

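/// Decodes the block layout written by [`VariableEncoder`] in its `BlockCompressor` role:
/// a one-byte offset-width tag, the value count, the position where value bytes begin, the
/// offsets array, and finally the value bytes.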
#[derive(Debug, Default)]
pub struct BinaryBlockDecompressor {}

impl BlockDecompressor for BinaryBlockDecompressor {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        // The first byte records the offset width used by the encoder.
        let bits_per_offset = data[0];
        match bits_per_offset {
            32 => {
                let stored_num_values = LittleEndian::read_u32(&data[1..5]);
                debug_assert_eq!(num_values, stored_num_values as u64);

                let bytes_start_offset = LittleEndian::read_u32(&data[5..9]);

                // The offsets sit between the 9-byte header and the start of the value bytes.
                let offsets = data.slice_with_length(9, bytes_start_offset as usize - 9);

                let data = data.slice_with_length(
                    bytes_start_offset as usize,
                    data.len() - bytes_start_offset as usize,
                );

                Ok(DataBlock::VariableWidth(VariableWidthBlock {
                    data,
                    offsets,
                    bits_per_offset: 32,
                    num_values,
                    block_info: BlockInfo::new(),
                }))
            }
            64 => {
                let stored_num_values = LittleEndian::read_u64(&data[1..9]);
                debug_assert_eq!(num_values, stored_num_values);

                let bytes_start_offset = LittleEndian::read_u64(&data[9..17]);

                // The offsets sit between the 17-byte header and the start of the value bytes.
                let offsets = data.slice_with_length(17, bytes_start_offset as usize - 17);

                let data = data.slice_with_length(
                    bytes_start_offset as usize,
                    data.len() - bytes_start_offset as usize,
                );

                Ok(DataBlock::VariableWidth(VariableWidthBlock {
                    data,
                    offsets,
                    bits_per_offset: 64,
                    num_values,
                    block_info: BlockInfo::new(),
                }))
            }
            _ => panic!("Unsupported bits_per_offset={}", bits_per_offset),
        }
    }
}

#[cfg(test)]
pub mod tests {
    use arrow_array::{
        builder::{LargeStringBuilder, StringBuilder},
        ArrayRef, StringArray,
    };
    use arrow_schema::{DataType, Field};

    use crate::constants::{
        COMPRESSION_META_KEY, STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
        STRUCTURAL_ENCODING_MINIBLOCK,
    };
    use rstest::rstest;
    use std::{collections::HashMap, sync::Arc, vec};

    use crate::{
        testing::{
            check_round_trip_encoding_generated, check_round_trip_encoding_of_data,
            check_round_trip_encoding_random, FnArrayGeneratorProvider, TestCases,
        },
        version::LanceFileVersion,
    };

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_utf8_binary(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let field = Field::new("", DataType::Utf8, false);
        check_round_trip_encoding_random(field, version).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let field = Field::new("", data_type, false).with_metadata(field_metadata);
        check_round_trip_encoding_random(field, version).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary_fsst(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Binary, DataType::Utf8)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_1).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_large_binary_fsst(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::LargeBinary, DataType::LargeUtf8)] data_type: DataType,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        field_metadata.insert(COMPRESSION_META_KEY.to_string(), "fsst".into());
        let field = Field::new("", data_type, true).with_metadata(field_metadata);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_1).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_large_binary(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let field = Field::new("", DataType::LargeBinary, true);
        check_round_trip_encoding_random(field, version).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_large_utf8() {
        let field = Field::new("", DataType::LargeUtf8, true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_small_strings(
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
    ) {
        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );
        let field = Field::new("", DataType::Utf8, true).with_metadata(field_metadata);
        check_round_trip_encoding_generated(
            field,
            Box::new(FnArrayGeneratorProvider::new(move || {
                lance_datagen::array::utf8_prefix_plus_counter("user_", false)
            })),
            LanceFileVersion::V2_1,
        )
        .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_binary(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
        #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
        structural_encoding: &str,
        #[values(DataType::Utf8, DataType::Binary)] data_type: DataType,
    ) {
        let string_array = StringArray::from(vec![Some("abc"), None, Some("pqr"), None, Some("m")]);
        let string_array = arrow_cast::cast(&string_array, &data_type).unwrap();

        let mut field_metadata = HashMap::new();
        field_metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            structural_encoding.into(),
        );

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..3)
            .with_indices(vec![0, 1, 3, 4])
            .with_file_version(version);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            field_metadata,
        )
        .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_sliced_utf8(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let string_array = StringArray::from(vec![Some("abc"), Some("de"), None, Some("fgh")]);
        let string_array = string_array.slice(1, 3);

        let test_cases = TestCases::default()
            .with_range(0..1)
            .with_range(0..2)
            .with_range(1..2)
            .with_file_version(version);
        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_bigger_than_max_page_size() {
        // A single 32MiB value, written with a 1MiB max page size.
        let big_string = String::from_iter((0..(32 * 1024 * 1024)).map(|_| '0'));
        let string_array = StringArray::from(vec![
            Some(big_string),
            Some("abc".to_string()),
            None,
            None,
            Some("xyz".to_string()),
        ]);

        let test_cases = TestCases::default().with_max_page_size(1024 * 1024);

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &test_cases,
            HashMap::new(),
        )
        .await;

        // Many ~1MB values whose combined size far exceeds the default page size.
        let big_string = String::from_iter((0..(1000 * 1000)).map(|_| '0'));
        let string_array = StringArray::from_iter_values((0..90).map(|_| big_string.clone()));

        check_round_trip_encoding_of_data(
            vec![Arc::new(string_array)],
            &TestCases::default(),
            HashMap::new(),
        )
        .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_empty_strings(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        // Exercise empty strings and nulls in several different orders.
        let values = [Some("abc"), Some(""), None];
        for order in [[0, 1, 2], [1, 0, 2], [2, 0, 1]] {
            let mut string_builder = StringBuilder::new();
            for idx in order {
                string_builder.append_option(values[idx]);
            }
            let string_array = Arc::new(string_builder.finish());
            let test_cases = TestCases::default()
                .with_indices(vec![1])
                .with_indices(vec![0])
                .with_indices(vec![2])
                .with_indices(vec![0, 1])
                .with_file_version(version);
            check_round_trip_encoding_of_data(
                vec![string_array.clone()],
                &test_cases,
                HashMap::new(),
            )
            .await;
            let test_cases = test_cases.with_batch_size(1);
            check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new())
                .await;
        }

        // An array containing only empty strings and nulls.
        let string_array = Arc::new(StringArray::from(vec![Some(""), None, Some("")]));

        let test_cases = TestCases::default().with_range(0..2).with_indices(vec![1]);
        check_round_trip_encoding_of_data(vec![string_array.clone()], &test_cases, HashMap::new())
            .await;
        let test_cases = test_cases.with_batch_size(1);
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    #[ignore]
    // Ignored by default: this writes roughly 5GiB of string data.
    async fn test_jumbo_string(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        // 5000 copies of a 1MiB string.
        let mut string_builder = LargeStringBuilder::new();
        let giant_string = String::from_iter((0..(1024 * 1024)).map(|_| '0'));
        for _ in 0..5000 {
            string_builder.append_option(Some(&giant_string));
        }
        let giant_array = Arc::new(string_builder.finish()) as ArrayRef;
        let arrs = vec![giant_array];

        // Skip per-value validation for this much data.
        let test_cases = TestCases::default()
            .without_validation()
            .with_file_version(version);
        check_round_trip_encoding_of_data(arrs, &test_cases, HashMap::new()).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_binary_dictionary_encoding(
        #[values(true, false)] with_nulls: bool,
        #[values(100, 500, 35000)] dict_size: u32,
    ) {
        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
        let strings = (0..dict_size)
            .map(|i| i.to_string())
            .collect::<Vec<String>>();

        let repeated_strings: Vec<_> = strings
            .iter()
            .cycle()
            .take(70000)
            .enumerate()
            .map(|(i, s)| {
                if with_nulls && i % 7 == 0 {
                    None
                } else {
                    Some(s.clone())
                }
            })
            .collect();
        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_binary_encoding_verification() {
        use lance_datagen::{ByteCount, RowCount};

        let test_cases = TestCases::default()
            .with_expected_encoding("variable")
            .with_file_version(LanceFileVersion::V2_1);

        // Small random strings with default settings.
        let arr_small = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(10), false))
            .into_batch_rows(RowCount::from(1000))
            .unwrap()
            .column(0)
            .clone();
        check_round_trip_encoding_of_data(vec![arr_small], &test_cases, HashMap::new()).await;

        // Larger strings with compression explicitly disabled.
        let metadata_explicit =
            HashMap::from([("lance-encoding:compression".to_string(), "none".to_string())]);
        let arr_large = lance_datagen::gen_batch()
            .anon_col(lance_datagen::array::rand_utf8(ByteCount::from(50), false))
            .into_batch_rows(RowCount::from(2000))
            .unwrap()
            .column(0)
            .clone();
        check_round_trip_encoding_of_data(vec![arr_large], &test_cases, metadata_explicit).await;
    }

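    // Hand-built mini-block chunks are handed to the decompressor through deliberately
    // misaligned buffers to check that decompression does not require the input to be
    // aligned to the offset width.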
    #[test]
    fn test_binary_miniblock_with_misaligned_buffer() {
        use super::BinaryMiniBlockDecompressor;
        use crate::buffer::LanceBuffer;
        use crate::compression::MiniBlockDecompressor;
        use crate::data::DataBlock;

        {
            // 32-bit offsets.
            let decompressor = BinaryMiniBlockDecompressor {
                bits_per_offset: 32,
            };

            // Build a chunk by hand: three u32 offsets followed by the value bytes and two
            // trailing pad bytes.
            let mut test_data = Vec::new();
            test_data.extend_from_slice(&12u32.to_le_bytes());
            test_data.extend_from_slice(&15u32.to_le_bytes());
            test_data.extend_from_slice(&20u32.to_le_bytes());
            test_data.extend_from_slice(b"ABCXYZ");
            test_data.extend_from_slice(&[0, 0]);

            // Prepend one byte so the chunk does not start at a 4-byte aligned address.
            let mut padded = Vec::with_capacity(test_data.len() + 1);
            padded.push(0xFF);
            padded.extend_from_slice(&test_data);

            let bytes = bytes::Bytes::from(padded);
            let misaligned = bytes.slice(1..);
            let buffer = LanceBuffer::from_bytes(misaligned, 1);

            // Confirm the setup actually produced a misaligned pointer.
            let ptr = buffer.as_ref().as_ptr();
            assert_ne!(
                ptr.align_offset(4),
                0,
                "Test setup: buffer should be misaligned for u32"
            );

            let result = decompressor.decompress(vec![buffer], 2);
            assert!(
                result.is_ok(),
                "Decompression should succeed with misaligned buffer"
            );

            if let Ok(DataBlock::VariableWidth(block)) = result {
                assert_eq!(block.num_values, 2);
                assert_eq!(&block.data.as_ref()[..6], b"ABCXYZ");
            } else {
                panic!("Expected VariableWidth block");
            }
        }

        {
            // 64-bit offsets.
            let decompressor = BinaryMiniBlockDecompressor {
                bits_per_offset: 64,
            };

            // Three u64 offsets followed by the value bytes and six trailing pad bytes.
            let mut test_data = Vec::new();
            test_data.extend_from_slice(&24u64.to_le_bytes());
            test_data.extend_from_slice(&29u64.to_le_bytes());
            test_data.extend_from_slice(&40u64.to_le_bytes());
            test_data.extend_from_slice(b"HelloWorld");
            test_data.extend_from_slice(&[0, 0, 0, 0, 0, 0]);

            // Prepend three bytes so the chunk is not 8-byte aligned.
            let mut padded = Vec::with_capacity(test_data.len() + 3);
            padded.extend_from_slice(&[0xFF, 0xFF, 0xFF]);
            padded.extend_from_slice(&test_data);

            let bytes = bytes::Bytes::from(padded);
            let misaligned = bytes.slice(3..);
            let buffer = LanceBuffer::from_bytes(misaligned, 1);

            // Confirm the setup actually produced a misaligned pointer.
            let ptr = buffer.as_ref().as_ptr();
            assert_ne!(
                ptr.align_offset(8),
                0,
                "Test setup: buffer should be misaligned for u64"
            );

            let result = decompressor.decompress(vec![buffer], 2);
            assert!(
                result.is_ok(),
                "Decompression should succeed with misaligned u64 buffer"
            );

            if let Ok(DataBlock::VariableWidth(block)) = result {
                assert_eq!(block.num_values, 2);
                assert_eq!(&block.data.as_ref()[..10], b"HelloWorld");
            } else {
                panic!("Expected VariableWidth block");
            }
        }
    }
}